documentation in DHT and compiler macro DHT_STATISTICS

Max Lübke 2021-01-13 21:02:57 +01:00
parent 0705772204
commit 4c86f9fffd
4 changed files with 663 additions and 426 deletions


@@ -1,423 +1,510 @@
#include "DHT.h"
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
* determining destination rank and index by hash of key
/**
* @brief Determine destination rank and index.
*
* return values by reference
* This is done by looping over all possible indices. For each index, a
* temporary value is set to zero and the required number of hash bytes is
* copied into its memory area. The index is then calculated as this temporary
* value modulo the table size. The destination rank of the process is simply
* determined by the hash modulo the communicator size.
*
* @param hash Calculated 64 bit hash.
* @param comm_size Communicator size.
* @param table_size Count of buckets per process.
* @param dest_rank Reference to the destination rank variable.
* @param index Pointer to the array index.
* @param index_count Count of possible indices.
*/
static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) {
uint64_t tmp;
int char_hop = 9-index_count;
unsigned int i;
for (i = 0; i < index_count ; i++) {
tmp = 0;
memcpy(&tmp,(unsigned char *)&hash+i, char_hop);
index[i] = (unsigned int) (tmp % table_size);
}
*dest_rank = (unsigned int) (hash % comm_size);
static void determine_dest(uint64_t hash, int comm_size,
unsigned int table_size, unsigned int *dest_rank,
unsigned int *index, unsigned int index_count) {
/** temporary index */
uint64_t tmp_index;
/** how many bytes for one index? */
int index_size = 9 - index_count;
for (unsigned int i = 0; i < index_count; i++) {
tmp_index = 0;
memcpy(&tmp_index, (unsigned char *)&hash + i, index_size);
index[i] = (unsigned int)(tmp_index % table_size);
}
*dest_rank = (unsigned int)(hash % comm_size);
}
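/* Worked example (illustrative, not part of the original sources): with
* table_size = 65536, DHT_create yields index_count = 7 and therefore
* index_size = 2 here. The loop then slides a 2-byte window over the 8 hash
* bytes in memory order: index[0] is built from bytes 0-1, index[1] from
* bytes 1-2, ..., index[6] from bytes 6-7, each reduced modulo table_size. */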
/**
* set write flag to 1
* @brief Set the occupied flag.
*
* This will set the first bit of a bucket to 1.
*
* @param flag_byte First byte of a bucket.
*/
static void set_flag(char* flag_byte) {
*flag_byte = 0;
*flag_byte |= (1 << 0);
}
static void set_flag(char *flag_byte) {
*flag_byte = 0;
*flag_byte |= (1 << 0);
}
/**
* return 1 if write flag is set
* else 0
* @brief Get the occupied flag.
*
* This function determines whether the occupied flag of a bucket was set or
* not.
*
* @param flag_byte First byte of a bucket.
* @return int Returns 1 for true or 0 for false.
*/
static int read_flag(char flag_byte) {
if ((flag_byte & 0x01) == 0x01) {
return 1;
} else return 0;
if ((flag_byte & 0x01) == 0x01) {
return 1;
} else
return 0;
}
/*
* allocating memory for DHT object and buckets.
* creating MPI window for OSC
* filling DHT object with passed by parameters, window, 2 counters for R/W errors and 2 pointers with allocated memory for further use
* return DHT object
*/
DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t(*hash_func) (int, void*)) {
DHT *object;
MPI_Win window;
void* mem_alloc;
int comm_size, tmp;
DHT *DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size,
uint64_t (*hash_func)(int, void *)) {
DHT *object;
MPI_Win window;
void *mem_alloc;
int comm_size, index_bytes;
tmp = (int) ceil(log2(size));
if (tmp%8 != 0) tmp = tmp + (8-(tmp%8));
// calculate how many bytes are needed for an index to address the number of
// buckets per process
index_bytes = (int)ceil(log2(size));
if (index_bytes % 8 != 0) index_bytes = index_bytes + (8 - (index_bytes % 8));
object = (DHT*) malloc(sizeof(DHT));
if (object == NULL) return NULL;
// allocate memory for dht-object
object = (DHT *)malloc(sizeof(DHT));
if (object == NULL) return NULL;
//every memory allocation has 1 additional byte for flags etc.
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL;
if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;
// every memory allocation has 1 additional byte for flags etc.
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL,
&mem_alloc) != 0)
return NULL;
if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;
memset(mem_alloc, '\0', size * (1 + data_size + key_size));
// since MPI_Alloc_mem doesn't provide memory allocation with the memory set
// to zero, we're doing this here
memset(mem_alloc, '\0', size * (1 + data_size + key_size));
if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL;
// create windows on previously allocated memory
if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size),
(1 + data_size + key_size), MPI_INFO_NULL, comm,
&window) != 0)
return NULL;
object->data_size = data_size;
object->key_size = key_size;
object->table_size = size;
object->window = window;
object->hash_func = hash_func;
object->comm_size = comm_size;
object->communicator = comm;
object->read_misses = 0;
object->collisions = 0;
object->recv_entry = malloc(1 + data_size + key_size);
object->send_entry = malloc(1 + data_size + key_size);
object->index_count = 9-(tmp/8);
object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int));
object->mem_alloc = mem_alloc;
// fill dht-object
object->data_size = data_size;
object->key_size = key_size;
object->table_size = size;
object->window = window;
object->hash_func = hash_func;
object->comm_size = comm_size;
object->communicator = comm;
object->read_misses = 0;
object->evictions = 0;
object->recv_entry = malloc(1 + data_size + key_size);
object->send_entry = malloc(1 + data_size + key_size);
object->index_count = 9 - (index_bytes / 8);
object->index = (unsigned int *)malloc((9 - (index_bytes / 8)) * sizeof(int));
object->mem_alloc = mem_alloc;
DHT_stats *stats;
// if set, initialize dht_stats
#ifdef DHT_STATISTICS
DHT_stats *stats;
stats = (DHT_stats*) malloc(sizeof(DHT_stats));
if (stats == NULL) return NULL;
stats = (DHT_stats *)malloc(sizeof(DHT_stats));
if (stats == NULL) return NULL;
object->stats = stats;
object->stats->writes_local = (int*) calloc(comm_size, sizeof(int));
object->stats->old_writes = 0;
object->stats->read_misses = 0;
object->stats->collisions = 0;
object->stats->w_access = 0;
object->stats->r_access = 0;
object->stats = stats;
object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
object->stats->old_writes = 0;
object->stats->read_misses = 0;
object->stats->evictions = 0;
object->stats->w_access = 0;
object->stats->r_access = 0;
#endif
return object;
return object;
}
/*
* puts passed by data with key to DHT
*
* returning DHT_MPI_ERROR = -1 if MPI error occurred
* else DHT_SUCCESS = 0 if success
*/
int DHT_write(DHT *table, void* send_key, void* send_data) {
unsigned int dest_rank, i;
int result = DHT_SUCCESS;
int DHT_write(DHT *table, void *send_key, void *send_data) {
unsigned int dest_rank, i;
int result = DHT_SUCCESS;
table->stats->w_access++;
#ifdef DHT_STATISTICS
table->stats->w_access++;
#endif
//determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
// determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
table->table_size, &dest_rank, table->index,
table->index_count);
//concatenating key with data to write entry to DHT
set_flag((char *) table->send_entry);
memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size);
memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size);
// concatenating key with data to write entry to DHT
set_flag((char *)table->send_entry);
memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
table->data_size);
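/* resulting bucket layout (as allocated in DHT_create): byte 0 is the flag
* byte, bytes 1..key_size hold the key, the remaining data_size bytes hold
* the data */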
//locking window of target rank with exclusive lock
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
return DHT_MPI_ERROR;
for (i = 0; i < table->index_count; i++)
{
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
// locking window of target rank with exclusive lock
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
return DHT_MPI_ERROR;
for (i = 0; i < table->index_count; i++) {
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
MPI_BYTE, dest_rank, table->index[i],
1 + table->data_size + table->key_size, MPI_BYTE,
table->window) != 0)
return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
//increment collision counter if receiving key doesn't match sending key
//,entry has write flag + last index is reached
if (read_flag(*(char *)table->recv_entry)) {
if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) {
if (i == (table->index_count)-1) {
table->collisions += 1;
table->stats->collisions += 1;
result = DHT_WRITE_SUCCESS_WITH_COLLISION;
break;
}
} else break;
} else {
table->stats->writes_local[dest_rank]++;
break;
// increment eviction counter if the occupied flag is set, the received key
// doesn't match the key to be written, and the last index is reached.
if (read_flag(*(char *)table->recv_entry)) {
if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
0) {
if (i == (table->index_count) - 1) {
table->evictions += 1;
#ifdef DHT_STATISTICS
table->stats->evictions += 1;
#endif
result = DHT_WRITE_SUCCESS_WITH_COLLISION;
break;
}
} else
break;
} else {
#ifdef DHT_STATISTICS
table->stats->writes_local[dest_rank]++;
#endif
break;
}
}
//put data to DHT
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
//unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
// put entry to DHT (at the last selected index i)
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
MPI_BYTE, dest_rank, table->index[i],
1 + table->data_size + table->key_size, MPI_BYTE,
table->window) != 0)
return DHT_MPI_ERROR;
// unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return result;
return result;
}
/*
* gets data from the DHT by key
*
* return DHT_SUCCESS = 0 if success
* DHT_MPI_ERROR = -1 if MPI error occurred
* DHT_READ_ERROR = -2 if receiving key doesn't match sending key
*/
int DHT_read(DHT *table, void* send_key, void* destination) {
unsigned int dest_rank, i;
int DHT_read(DHT *table, void *send_key, void *destination) {
unsigned int dest_rank, i;
table->stats->r_access++;
#ifdef DHT_STATISTICS
table->stats->r_access++;
#endif
//determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
// determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
table->table_size, &dest_rank, table->index,
table->index_count);
//locking window of target rank with shared lock
if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR;
//receive data
for (i = 0; i < table->index_count; i++) {
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
//increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached
//else copy data to dereference of passed by destination pointer
// locking window of target rank with shared lock
if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0)
return DHT_MPI_ERROR;
// receive data
for (i = 0; i < table->index_count; i++) {
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
MPI_BYTE, dest_rank, table->index[i],
1 + table->data_size + table->key_size, MPI_BYTE,
table->window) != 0)
return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if ((read_flag(*(char *) table->recv_entry)) == 0) {
table->read_misses += 1;
table->stats->read_misses += 1;
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) {
if (i == (table->index_count)-1) {
table->read_misses += 1;
table->stats->read_misses += 1;
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
} else break;
// increment read error counter if write flag isn't set ...
if ((read_flag(*(char *)table->recv_entry)) == 0) {
table->read_misses += 1;
#ifdef DHT_STATISTICS
table->stats->read_misses += 1;
#endif
// unlock window and return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
//unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
// ... or the received key doesn't match the passed key and the last index is
// reached.
if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) !=
0) {
if (i == (table->index_count) - 1) {
table->read_misses += 1;
#ifdef DHT_STATISTICS
table->stats->read_misses += 1;
#endif
// unlock window and return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
} else
break;
}
memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size);
// unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_SUCCESS;
// if a matching key was found, copy the data into the memory of the passed
// pointer
memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1,
table->data_size);
return DHT_SUCCESS;
}
int DHT_to_file(DHT* table, const char* filename) {
//open file
MPI_File file;
if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
int DHT_to_file(DHT *table, const char *filename) {
// open file
MPI_File file;
if (MPI_File_open(table->communicator, filename,
MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL,
&file) != 0)
return DHT_FILE_IO_ERROR;
int rank;
MPI_Comm_rank(table->communicator, &rank);
int rank;
MPI_Comm_rank(table->communicator, &rank);
//write header (key_size and data_size)
if (rank == 0) {
if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
// write header (key_size and data_size)
if (rank == 0) {
if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) !=
0)
return DHT_FILE_WRITE_ERROR;
if (MPI_File_write(file, &table->data_size, 1, MPI_INT,
MPI_STATUS_IGNORE) != 0)
return DHT_FILE_WRITE_ERROR;
}
// seek file pointer behind header for all processes
if (MPI_File_seek_shared(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0)
return DHT_FILE_IO_ERROR;
char *ptr;
int bucket_size = table->key_size + table->data_size + 1;
// iterate over local memory
for (unsigned int i = 0; i < table->table_size; i++) {
ptr = (char *)table->mem_alloc + (i * bucket_size);
// if bucket has been written to (checked by written_flag)...
if (read_flag(*ptr)) {
// write key and data to file
if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE,
MPI_STATUS_IGNORE) != 0)
return DHT_FILE_WRITE_ERROR;
}
}
// close file
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
char* ptr;
int bucket_size = table->key_size + table->data_size + 1;
//iterate over local memory
for (unsigned int i = 0; i < table->table_size; i++) {
ptr = (char *) table->mem_alloc + (i * bucket_size);
//if bucket has been written to (checked by written_flag)...
if (read_flag(*ptr)) {
//write key and data to file
if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
}
}
//close file
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
return DHT_SUCCESS;
}
int DHT_from_file(DHT* table, const char* filename) {
MPI_File file;
MPI_Offset f_size;
int e_size, m_size, cur_pos, rank, offset;
char* buffer;
void* key;
void* data;
int DHT_from_file(DHT *table, const char *filename) {
MPI_File file;
MPI_Offset f_size;
int bucket_size, buffer_size, cur_pos, rank, offset;
char *buffer;
void *key;
void *data;
if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
// open file
if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY,
MPI_INFO_NULL, &file) != 0)
return DHT_FILE_IO_ERROR;
if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;
// get file size
if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;
MPI_Comm_rank(table->communicator, &rank);
MPI_Comm_rank(table->communicator, &rank);
e_size = table->key_size + table->data_size;
m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE;
buffer = (char *) malloc(m_size);
// calculate bucket size
bucket_size = table->key_size + table->data_size;
// the buffer must hold one bucket, but at least DHT_FILEHEADER_SIZE bytes so
// that it can also hold the file header
buffer_size =
bucket_size > DHT_FILEHEADER_SIZE ? bucket_size : DHT_FILEHEADER_SIZE;
// allocate buffer
buffer = (char *)malloc(buffer_size);
if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
// read file header
if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE,
MPI_STATUS_IGNORE) != 0)
return DHT_FILE_READ_ERROR;
if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE;
if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE;
// check whether the key and data sizes from the file header match the current sizes
if (*(int *)buffer != table->key_size) return DHT_WRONG_FILE;
if (*(int *)(buffer + 4) != table->data_size) return DHT_WRONG_FILE;
offset = e_size*table->comm_size;
// set offset for each process
offset = bucket_size * table->comm_size;
if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
cur_pos = DHT_HEADER_SIZE + (rank * e_size);
// seek behind header of DHT file
if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0)
return DHT_FILE_IO_ERROR;
// starting position is DHT_FILEHEADER_SIZE + rank * bucket_size
cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size);
while(cur_pos < f_size) {
if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
MPI_Offset tmp;
MPI_File_get_position(file, &tmp);
if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
key = buffer;
data = (buffer+table->key_size);
if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;
// loop over file and write data to DHT with DHT_write
while (cur_pos < f_size) {
if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0)
return DHT_FILE_IO_ERROR;
// TODO: really necessary?
MPI_Offset tmp;
MPI_File_get_position(file, &tmp);
if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE, MPI_STATUS_IGNORE) !=
0)
return DHT_FILE_READ_ERROR;
// extract key and data and write to DHT
key = buffer;
data = (buffer + table->key_size);
if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;
cur_pos += offset;
}
// increment current position
cur_pos += offset;
}
free (buffer);
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
free(buffer);
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
return DHT_SUCCESS;
}
/*
* frees up memory and accumulate counter
*/
int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) {
int buf;
int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
int buf;
if (collision_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
*collision_counter = buf;
}
if (readerror_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
*readerror_counter = buf;
}
if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
free(table->recv_entry);
free(table->send_entry);
free(table->index);
if (eviction_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->evictions, &buf, 1, MPI_INT, MPI_SUM, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
*eviction_counter = buf;
}
if (readerror_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
*readerror_counter = buf;
}
if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
free(table->recv_entry);
free(table->send_entry);
free(table->index);
free(table->stats->writes_local);
free(table->stats);
#ifdef DHT_STATISTICS
free(table->stats->writes_local);
free(table->stats);
#endif
free(table);
free(table);
return DHT_SUCCESS;
return DHT_SUCCESS;
}
/*
* prints a table with statistics about current use of DHT
* for each participating process and summed up results containing:
* 1. occupied buckets (in respect to the memory of this process)
* 2. free buckets (in respect to the memory of this process)
* 3. calls of DHT_write (w_access)
* 4. calls of DHT_read (r_access)
* 5. read misses (see DHT_READ_ERROR)
* 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
* 3-6 will reset with every call of this function
* finally the amount of new written entries is printed out (in relation to last call of this funtion)
*/
#ifdef DHT_STATISTICS
int DHT_print_statistics(DHT *table) {
int *written_buckets;
int *read_misses, sum_read_misses;
int *collisions, sum_collisions;
int sum_w_access, sum_r_access, *w_access, *r_access;
int rank;
int *written_buckets;
int *read_misses, sum_read_misses;
int *evictions, sum_evictions;
int sum_w_access, sum_r_access, *w_access, *r_access;
int rank;
MPI_Comm_rank(table->communicator, &rank);
MPI_Comm_rank(table->communicator, &rank);
// disable a possible warning about uninitialized variables, which is a false
// positive since we use MPI_Gather to obtain all values on rank 0 only
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
//disable possible warning of unitialized variable, which is not the case
//since we're using MPI_Gather to obtain all values only on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// obtaining all values from all processes in the communicator
if (rank == 0) read_misses = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1,
MPI_INT, 0, table->communicator) != 0)
return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT,
MPI_SUM, 0, table->communicator) != 0)
return DHT_MPI_ERROR;
table->stats->read_misses = 0;
//obtaining all values from all processes in the communicator
if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->read_misses = 0;
if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->collisions = 0;
if (rank == 0) evictions = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM,
0, table->communicator) != 0)
return DHT_MPI_ERROR;
table->stats->evictions = 0;
if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->w_access = 0;
if (rank == 0) w_access = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
table->stats->w_access = 0;
if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->r_access = 0;
if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int));
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (rank == 0) r_access = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
table->stats->r_access = 0;
if (rank == 0) { //only process with rank 0 will print out results as a table
int sum_written_buckets = 0;
if (rank == 0) written_buckets = (int *)calloc(table->comm_size, sizeof(int));
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size,
MPI_INT, MPI_SUM, 0, table->communicator) != 0)
return DHT_MPI_ERROR;
for (int i=0; i < table->comm_size; i++) {
sum_written_buckets += written_buckets[i];
}
if (rank == 0) { // only process with rank 0 will print out results as a
// table
int sum_written_buckets = 0;
int members = 7;
int padsize = (members*12)+1;
char pad[padsize+1];
memset(pad, '-', padsize*sizeof(char));
pad[padsize]= '\0';
printf("\n");
printf("%-35s||resets with every call of this function\n", " ");
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n",
"rank",
"occupied",
"free",
"w_access",
"r_access",
"read misses",
"collisions");
printf("%s\n", pad);
for (int i = 0; i < table->comm_size; i++) {
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
i,
written_buckets[i],
table->table_size-written_buckets[i],
w_access[i],
r_access[i],
read_misses[i],
collisions[i]);
}
printf("%s\n", pad);
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
"sum",
sum_written_buckets,
(table->table_size*table->comm_size)-sum_written_buckets,
sum_w_access,
sum_r_access,
sum_read_misses,
sum_collisions);
printf("%s\n", pad);
printf("%s %d\n",
"new entries:",
sum_written_buckets - table->stats->old_writes);
printf("\n");
fflush(stdout);
table->stats->old_writes = sum_written_buckets;
for (int i = 0; i < table->comm_size; i++) {
sum_written_buckets += written_buckets[i];
}
//enable warning again
#pragma GCC diagnostic pop
int members = 7;
int padsize = (members * 12) + 1;
char pad[padsize + 1];
MPI_Barrier(table->communicator);
return DHT_SUCCESS;
}
memset(pad, '-', padsize * sizeof(char));
pad[padsize] = '\0';
printf("\n");
printf("%-35s||resets with every call of this function\n", " ");
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied",
"free", "w_access", "r_access", "read misses", "evictions");
printf("%s\n", pad);
for (int i = 0; i < table->comm_size; i++) {
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i,
written_buckets[i], table->table_size - written_buckets[i],
w_access[i], r_access[i], read_misses[i], evictions[i]);
}
printf("%s\n", pad);
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum",
sum_written_buckets,
(table->table_size * table->comm_size) - sum_written_buckets,
sum_w_access, sum_r_access, sum_read_misses, sum_evictions);
printf("%s\n", pad);
printf("%s %d\n",
"new entries:", sum_written_buckets - table->stats->old_writes);
printf("\n");
fflush(stdout);
table->stats->old_writes = sum_written_buckets;
}
// enable warning again
#pragma GCC diagnostic pop
MPI_Barrier(table->communicator);
return DHT_SUCCESS;
}
#endif


@@ -1,8 +1,14 @@
/*
* File: DHT.h
* Author: max luebke
/**
* @file DHT.h
* @author Max Lübke (mluebke@uni-potsdam.de)
* @brief API to interact with the DHT
* @version 0.1
* @date 16 Nov 2017
*
* Created on 16. November 2017, 09:14
* This file implements the creation of a DHT using MPI one-sided
* communication. Data can be written to and read from the DHT. In addition,
* the current state of the DHT can be written to a file and read in again
* later.
*/
#ifndef DHT_H
@@ -11,102 +17,242 @@
#include <mpi.h>
#include <stdint.h>
/** Returned if some error in MPI routine occurs. */
#define DHT_MPI_ERROR -1
/** Returned by a call of DHT_read if no bucket with the given key was found. */
#define DHT_READ_ERROR -2
/** Returned by DHT_write if a bucket was evicted. */
#define DHT_WRITE_SUCCESS_WITH_COLLISION -3
/** Returned when no errors occurred. */
#define DHT_SUCCESS 0
#define DHT_WRITE_SUCCESS_WITH_COLLISION 1
#define DHT_WRONG_FILE 11
#define DHT_FILE_IO_ERROR 12
#define DHT_FILE_READ_ERROR 13
#define DHT_FILE_WRITE_ERROR 14
/** Returned by DHT_from_file if the given file does not match the expected format. */
#define DHT_WRONG_FILE -11
/** Returned by DHT file operations if MPI file operation fails. */
#define DHT_FILE_IO_ERROR -12
/** Returned by DHT file operations if an error occurred in an MPI read operation. */
#define DHT_FILE_READ_ERROR -13
/** Returned by DHT file operations if an error occurred in an MPI write operation. */
#define DHT_FILE_WRITE_ERROR -14
#define DHT_HEADER_SIZE 8
/** Size of the file header in byte. */
#define DHT_FILEHEADER_SIZE 8
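/* The file header written by DHT_to_file consists of two ints, key_size
* followed by data_size, which DHT_from_file reads back at byte offsets 0 and
* 4; together they make up the 8 bytes above. */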
typedef struct {;
int *writes_local, old_writes;
int read_misses, collisions;
int w_access, r_access;
/**
* Internal struct to store statistics about read and write accesses and also
* read misses and evictions.
* <b>All values will be reset to zero after a call of
* DHT_print_statistics().</b>
* Internal use only!
*
* @todo There may be a better solution than DHT_print_statistics and this
* struct.
*/
typedef struct {
/** Count of writes this process performed, indexed by destination rank. */
int* writes_local;
/** Total number of written buckets at the last call of DHT_print_statistics. */
int old_writes;
/** How many read misses occurred? */
int read_misses;
/** How many buckets were evicted? */
int evictions;
/** How many calls of DHT_write() did this process make? */
int w_access;
/** How many calls of DHT_read() did this process make? */
int r_access;
} DHT_stats;
/**
* Struct which serves as a handle, the so-called \a DHT-object. It is
* created by DHT_create and must be passed as a parameter to every following
* function. Stores all relevant data.
* Do not touch outside DHT functions!
*/
typedef struct {
MPI_Win window;
int data_size;
int key_size;
unsigned int table_size;
MPI_Comm communicator;
int comm_size;
uint64_t(*hash_func) (int, void*);
void* recv_entry;
void* send_entry;
void* mem_alloc;
int read_misses;
int collisions;
unsigned int *index;
unsigned int index_count;
DHT_stats *stats;
/** Created MPI Window, which serves as the DHT memory area of the process. */
MPI_Win window;
/** Size of the data of a bucket entry in byte. */
int data_size;
/** Size of the key of a bucket entry in byte. */
int key_size;
/** Count of buckets for each process. */
unsigned int table_size;
/** MPI communicator of all participating processes. */
MPI_Comm communicator;
/** Size of the MPI communicator, i.e. the number of participating processes. */
int comm_size;
/** Pointer to a hash function. */
uint64_t (*hash_func)(int, void*);
/** Pre-allocated memory where a bucket can be received. */
void* recv_entry;
/** Pre-allocated memory where a bucket to send can be stored. */
void* send_entry;
/** Allocated memory on which the MPI window was created. */
void* mem_alloc;
/** Count of read misses over all time. */
int read_misses;
/** Count of evictions over all time. */
int evictions;
/** Array of indices where a bucket can be stored. */
unsigned int* index;
/** Count of possible indices. */
unsigned int index_count;
#ifdef DHT_STATISTICS
/** Detailed statistics of the usage of the DHT. */
DHT_stats* stats;
#endif
} DHT;
/*
* parameters:
* MPI_Comm comm - communicator of processes that are holding the DHT
* int size_per_process - number of buckets each process will create
* int data_size - size of data in bytes
* int key_size - size of key in bytes
* *hash_func - pointer to hashfunction
/**
* @brief Create a DHT.
*
* return:
* NULL if error during initialization
* DHT* if success
*/
extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*));
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* void* data - pointer to data
* void* - pointer to key
* When calling this function, the required memory is allocated and an
* MPI window is created. This allows the execution of MPI_Get and
* MPI_Put operations for one-sided communication. Then the number of
* indices is calculated and finally all relevant data is entered into the
* \a DHT-object, which is returned.
*
* return:
* error value (see above)
* @param comm MPI communicator which addresses all participating process of the
* DHT.
* @param size_per_process Number of buckets per process.
* @param data_size Size of data in byte.
* @param key_size Size of the key in byte.
* @param hash_func Pointer to a hash function. This function must take the size
* of the key and a pointer to the key as input parameters and return a 64 bit
* hash.
* @return DHT* The returned value is the \a DHT-object, which serves as a handle
* for all DHT operations. If an error occurred, NULL is returned.
*/
extern int DHT_write(DHT *table, void* key, void* data);
extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process,
int data_size, int key_size,
uint64_t (*hash_func)(int, void*));
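/* Usage sketch (illustrative only, not part of this commit; my_hash and the
* chosen sizes are hypothetical). Any function matching the hash_func
* signature can be passed, e.g. a 64-bit FNV-1a over the raw key bytes:
*
*   static uint64_t my_hash(int key_size, void *key) {
*     uint64_t h = 0xcbf29ce484222325ULL;          // FNV-1a offset basis
*     const unsigned char *p = (const unsigned char *)key;
*     for (int i = 0; i < key_size; i++) {
*       h ^= p[i];
*       h *= 0x100000001b3ULL;                     // FNV-1a prime
*     }
*     return h;
*   }
*
*   // 1024 buckets per process, 8-byte keys, one double as data:
*   DHT *table = DHT_create(MPI_COMM_WORLD, 1024, sizeof(double),
*                           sizeof(uint64_t), my_hash);
*   if (table == NULL) MPI_Abort(MPI_COMM_WORLD, 1);
*/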
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* void* key - pointer to key
* void* destination - pointer which will hold the resulting data from DHT
/**
* @brief Write data into DHT.
*
* return:
* error value (see above)
*/
extern int DHT_read(DHT *table, void* key, void* destination);
extern int DHT_to_file(DHT *table, const char* filename);
extern int DHT_from_file(DHT *table, const char* filename);
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* int* collision_counter - pointer which will hold the total count of collisions
* int* readerror_counter - pointer which will hold the total count of read errors
* When DHT_write is called, the address window is locked with a
* LOCK_EXCLUSIVE for write access. The first bucket is then received
* using MPI_Get and it is checked whether the bucket is empty or the received
* key matches the passed key. If this is the case, the data of the bucket is
* overwritten with the new value. If not, the function continues with the next
* index until no more indices are available. When the last index is reached and
* no more indices are available, the last examined bucket is replaced.
* After successful writing, the memory window is released and the function
* returns.
*
* return:
* error value (see above)
* @param table Pointer to the \a DHT-object.
* @param key Pointer to the key.
* @param data Pointer to the data.
* @return int Returns either DHT_SUCCESS on success or the corresponding error
* value on eviction or error.
*/
extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter);
extern int DHT_write(DHT* table, void* key, void* data);
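/* Usage sketch (illustrative only; variable names are hypothetical):
*
*   uint64_t k = 42;
*   double v = 3.14;
*   int rc = DHT_write(table, &k, &v);
*   if (rc == DHT_MPI_ERROR) {
*     // handle MPI failure
*   } else if (rc == DHT_WRITE_SUCCESS_WITH_COLLISION) {
*     // the entry was stored, but an older entry with a different key
*     // was evicted in the process
*   }
*/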
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
*
* return:
* error value (DHT_SUCCESS or DHT_MPI_ERROR)
/**
* @brief Read data from DHT.
*
* At the beginning, the target process and all possible indices are determined.
* After that, the address window is locked with a SHARED lock for read access
* and the first entry is retrieved. The received key is then compared
* with the key passed to the function. If they match, the correct data
* was found. If not, the function continues with the next index. If the last
* possible bucket is reached and the keys still do not match, the read
* error counter is incremented. After the window has been released
* again, the function returns with a corresponding return value (read
* error or error-free read). On success, the retrieved data is written to
* the memory area of the passed pointer.
*
* @param table Pointer to the \a DHT-object.
* @param key Pointer to the key.
* @param destination Pointer to the memory area where the retrieved data should
* be stored.
* @return int Returns either DHT_SUCCESS on success or the corresponding error
* value on read miss or error.
*/
extern int DHT_print_statistics(DHT *table);
extern int DHT_read(DHT* table, void* key, void* destination);
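/* Usage sketch (illustrative only; variable names are hypothetical):
*
*   uint64_t k = 42;
*   double v;
*   int rc = DHT_read(table, &k, &v);
*   if (rc == DHT_READ_ERROR) {
*     // no bucket with this key was found, e.g. recompute the value
*   } else if (rc == DHT_SUCCESS) {
*     // v now holds the data stored under k
*   }
*/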
/**
* @brief Write current state of DHT to file.
*
* All contents are written as a memory dump, so that no conversion takes place.
* First, an attempt is made to open or create the file. If this is successful,
* the file header (key size followed by data size) is written. Then each process
* reads its memory area of the DHT and each bucket that was marked as written
* is added to the file using MPI file operations.
*
* @param table Pointer to the \a DHT-object.
* @param filename Name of the file to write to.
* @return int Returns DHT_SUCCESS on success, DHT_FILE_IO_ERROR if the file
* can't be opened/closed, or DHT_FILE_WRITE_ERROR if the file is not writable.
*/
extern int DHT_to_file(DHT* table, const char* filename);
/**
* @brief Read state of DHT from file.
*
* This requires a DHT file previously written by DHT_to_file.
* First of all, an attempt is made to open the specified file. If this
* succeeds, the file header is read and compared with the current values of the
* DHT. If the data and key sizes do not differ, reading can continue. Each
* process reads one entry of the file at a time (offset by its rank) and writes
* it to the DHT with DHT_write, until no entries are left.
*
* @param table Pointer to the \a DHT-object.
* @param filename Name of the file to read from.
* @return int Returns DHT_SUCCESS on success, DHT_FILE_IO_ERROR if the file
* can't be opened/closed, DHT_FILE_READ_ERROR if the file is not readable, or
* DHT_WRONG_FILE if the file doesn't match expectations, i.e. its key size or
* data size differs from the table's.
*/
extern int DHT_from_file(DHT* table, const char* filename);
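/* Usage sketch of a dump/restore round trip (illustrative only; the filename
* is hypothetical). The restoring table must use the same key and data sizes,
* otherwise DHT_from_file returns DHT_WRONG_FILE:
*
*   if (DHT_to_file(table, "table.dht") != DHT_SUCCESS)
*     return 1;  // handle error
*   // later, possibly in another run with matching key/data sizes:
*   if (DHT_from_file(table, "table.dht") != DHT_SUCCESS)
*     return 1;  // handle error
*/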
/**
* @brief Free ressources of DHT.
*
* Finally, to free all resources after using the DHT, the function
* DHT_free must be used. This will free the MPI window as well as the
* associated memory. All internal variables are released as well. Optionally,
* the counts of evictions and read misses can also be obtained.
*
* @param table Pointer to the \a DHT-object.
* @param eviction_counter \a optional: Pointer to integer where the count of
* evictions should be stored.
* @param readerror_counter \a optional: Pointer to integer where the count of
* read errors should be stored.
* @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on
* internal MPI error.
*/
extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter);
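/* Usage sketch (illustrative only; variable names such as rank are
* hypothetical): obtain the global counters and release all resources. The
* reduced sums are only valid on rank 0, the root of the internal MPI_Reduce:
*
*   int evictions = 0, read_misses = 0;
*   if (DHT_free(table, &evictions, &read_misses) != DHT_SUCCESS)
*     MPI_Abort(MPI_COMM_WORLD, 1);
*   if (rank == 0)
*     printf("evictions: %d, read misses: %d\n", evictions, read_misses);
*/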
/**
* @brief Prints a table with statistics about current use of DHT.
*
* These statistics are gathered from each participating process and also summed
* up over all processes. Detailed statistics are:
* -# occupied buckets (in respect to the memory of this process)
* -# free buckets (in respect to the memory of this process)
* -# calls of DHT_write (w_access)
* -# calls of DHT_read (r_access)
* -# read misses (see DHT_READ_ERROR)
* -# evictions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
*
* Items 3-6 are reset with every call of this function. Finally, the number of
* newly written entries (since the last call of this function) is printed.
*
* This is done by collective MPI operations with rank 0 as the root process,
* which also prints a table with all the information to stdout.
*
* Also, as this function was implemented for a special case (the POET project),
* one needs to define the compiler macro DHT_STATISTICS to use this
* function (e.g. <em>gcc -DDHT_STATISTICS ...</em>).
* @param table Pointer to the \a DHT-object.
* @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
* error.
*/
#ifdef DHT_STATISTICS
extern int DHT_print_statistics(DHT* table);
#endif
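/* Usage sketch (illustrative only): build with the macro enabled, e.g.
*   mpicc -DDHT_STATISTICS -c DHT.c
* and guard the call so the code also compiles without it:
*
*   #ifdef DHT_STATISTICS
*   if (DHT_print_statistics(table) != DHT_SUCCESS) {
*     // handle MPI error
*   }
*   #endif
*/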
#endif /* DHT_H */


@@ -1,10 +1,14 @@
#include "DHT_Wrapper.h"
#include "DHT.h"
#include <openssl/md5.h>
#include <iostream>
#include "DHT.h"
using namespace poet;
using namespace std;
uint64_t get_md5(int key_size, void *key) {
MD5_CTX ctx;
unsigned char sum[MD5_DIGEST_LENGTH];
@@ -105,10 +109,11 @@ int DHT_Wrapper::tableToFile(const char *filename) {
int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
return res;
if (res != DHT_SUCCESS) return res;
#ifdef DHT_STATISTICS
DHT_print_statistics(dht_object);
#endif
return DHT_SUCCESS;
}
@@ -116,7 +121,9 @@ int DHT_Wrapper::fileToTable(const char *filename) {
void DHT_Wrapper::printStatistics() {
int res = DHT_SUCCESS;  // initialize so the check below is defined when DHT_STATISTICS is off
#ifdef DHT_STATISTICS
res = DHT_print_statistics(dht_object);
#endif
if (res != DHT_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?
@@ -160,6 +167,5 @@ void DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
<< endl;
}
}
if (dt_differ)
fuzzing_buffer[var_count] = dt;
}
if (dt_differ) fuzzing_buffer[var_count] = dt;
}


@@ -12,8 +12,6 @@
#define ROUND(value, signif) \
(((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))
uint64_t get_md5(int key_size, void *key);
namespace poet {
class DHT_Wrapper {
public: