From 59e99b3d0adfef258300602498e0ebc57ac73b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbke?= Date: Wed, 13 Jan 2021 21:02:57 +0100 Subject: [PATCH] documentation in DHT and compiler macro DHT_STATISTICS --- src/DHT/DHT.cpp | 761 ++++++++++++++++++++++------------------ src/DHT/DHT.h | 308 +++++++++++----- src/DHT/DHT_Wrapper.cpp | 18 +- src/DHT/DHT_Wrapper.h | 2 - 4 files changed, 663 insertions(+), 426 deletions(-) diff --git a/src/DHT/DHT.cpp b/src/DHT/DHT.cpp index 3a988cdad..2ffb3d9ad 100644 --- a/src/DHT/DHT.cpp +++ b/src/DHT/DHT.cpp @@ -1,423 +1,510 @@ #include "DHT.h" -#include -#include + #include #include +#include +#include -/* - * determining destination rank and index by hash of key +/** + * @brief Determine destination rank and index. * - * return values by reference + * This is done by looping over all possbile indices. First of all, set a + * temporary index to zero and copy count of bytes for each index into the + * memory area of the temporary index. After that the current index is + * calculated by the temporary index modulo the table size. The destination rank + * of the process is simply determined by hash modulo the communicator size. + * + * @param hash Calculated 64 bit hash. + * @param comm_size Communicator size. + * @param table_size Count of buckets per process. + * @param dest_rank Reference to the destination rank variable. + * @param index Pointer to the array index. + * @param index_count Count of possible indeces. */ -static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) { - uint64_t tmp; - int char_hop = 9-index_count; - unsigned int i; - for (i = 0; i < index_count ; i++) { - tmp = 0; - memcpy(&tmp,(unsigned char *)&hash+i, char_hop); - index[i] = (unsigned int) (tmp % table_size); - } - *dest_rank = (unsigned int) (hash % comm_size); +static void determine_dest(uint64_t hash, int comm_size, + unsigned int table_size, unsigned int *dest_rank, + unsigned int *index, unsigned int index_count) { + /** temporary index */ + uint64_t tmp_index; + /** how many bytes for one index? */ + int index_size = 9 - index_count; + for (unsigned int i = 0; i < index_count; i++) { + tmp_index = 0; + memcpy(&tmp_index, (unsigned char *)&hash + i, index_size); + index[i] = (unsigned int)(tmp_index % table_size); + } + *dest_rank = (unsigned int)(hash % comm_size); } /** - * set write flag to 1 + * @brief Set the occupied flag. + * + * This will set the first bit of a bucket to 1. + * + * @param flag_byte First byte of a bucket. */ -static void set_flag(char* flag_byte) { - *flag_byte = 0; - *flag_byte |= (1 << 0); -} +static void set_flag(char *flag_byte) { + *flag_byte = 0; + *flag_byte |= (1 << 0); +} /** - * return 1 if write flag is set - * else 0 + * @brief Get the occupied flag. + * + * This function determines whether the occupied flag of a bucket was set or + * not. + * + * @param flag_byte First byte of a bucket. + * @return int Returns 1 for true or 0 for false. */ static int read_flag(char flag_byte) { - if ((flag_byte & 0x01) == 0x01) { - return 1; - } else return 0; + if ((flag_byte & 0x01) == 0x01) { + return 1; + } else + return 0; } -/* - * allocating memory for DHT object and buckets. - * creating MPI window for OSC - * filling DHT object with passed by parameters, window, 2 counters for R/W errors and 2 pointers with allocated memory for further use - * return DHT object - */ -DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t(*hash_func) (int, void*)) { - DHT *object; - MPI_Win window; - void* mem_alloc; - int comm_size, tmp; +DHT *DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, + uint64_t (*hash_func)(int, void *)) { + DHT *object; + MPI_Win window; + void *mem_alloc; + int comm_size, index_bytes; - tmp = (int) ceil(log2(size)); - if (tmp%8 != 0) tmp = tmp + (8-(tmp%8)); + // calculate how much bytes for the index are needed to address count of + // buckets per process + index_bytes = (int)ceil(log2(size)); + if (index_bytes % 8 != 0) index_bytes = index_bytes + (8 - (index_bytes % 8)); - object = (DHT*) malloc(sizeof(DHT)); - if (object == NULL) return NULL; + // allocate memory for dht-object + object = (DHT *)malloc(sizeof(DHT)); + if (object == NULL) return NULL; - //every memory allocation has 1 additional byte for flags etc. - if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL; - if (MPI_Comm_size(comm, &comm_size) != 0) return NULL; + // every memory allocation has 1 additional byte for flags etc. + if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, + &mem_alloc) != 0) + return NULL; + if (MPI_Comm_size(comm, &comm_size) != 0) return NULL; - memset(mem_alloc, '\0', size * (1 + data_size + key_size)); + // since MPI_Alloc_mem doesn't provide memory allocation with the memory set + // to zero, we're doing this here + memset(mem_alloc, '\0', size * (1 + data_size + key_size)); - if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL; + // create windows on previously allocated memory + if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), + (1 + data_size + key_size), MPI_INFO_NULL, comm, + &window) != 0) + return NULL; - object->data_size = data_size; - object->key_size = key_size; - object->table_size = size; - object->window = window; - object->hash_func = hash_func; - object->comm_size = comm_size; - object->communicator = comm; - object->read_misses = 0; - object->collisions = 0; - object->recv_entry = malloc(1 + data_size + key_size); - object->send_entry = malloc(1 + data_size + key_size); - object->index_count = 9-(tmp/8); - object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int)); - object->mem_alloc = mem_alloc; + // fill dht-object + object->data_size = data_size; + object->key_size = key_size; + object->table_size = size; + object->window = window; + object->hash_func = hash_func; + object->comm_size = comm_size; + object->communicator = comm; + object->read_misses = 0; + object->evictions = 0; + object->recv_entry = malloc(1 + data_size + key_size); + object->send_entry = malloc(1 + data_size + key_size); + object->index_count = 9 - (index_bytes / 8); + object->index = (unsigned int *)malloc((9 - (index_bytes / 8)) * sizeof(int)); + object->mem_alloc = mem_alloc; - DHT_stats *stats; + // if set, initialize dht_stats +#ifdef DHT_STATISTICS + DHT_stats *stats; - stats = (DHT_stats*) malloc(sizeof(DHT_stats)); - if (stats == NULL) return NULL; + stats = (DHT_stats *)malloc(sizeof(DHT_stats)); + if (stats == NULL) return NULL; - object->stats = stats; - object->stats->writes_local = (int*) calloc(comm_size, sizeof(int)); - object->stats->old_writes = 0; - object->stats->read_misses = 0; - object->stats->collisions = 0; - object->stats->w_access = 0; - object->stats->r_access = 0; + object->stats = stats; + object->stats->writes_local = (int *)calloc(comm_size, sizeof(int)); + object->stats->old_writes = 0; + object->stats->read_misses = 0; + object->stats->evictions = 0; + object->stats->w_access = 0; + object->stats->r_access = 0; +#endif - return object; + return object; } -/* - * puts passed by data with key to DHT - * - * returning DHT_MPI_ERROR = -1 if MPI error occurred - * else DHT_SUCCESS = 0 if success - */ -int DHT_write(DHT *table, void* send_key, void* send_data) { - unsigned int dest_rank, i; - int result = DHT_SUCCESS; +int DHT_write(DHT *table, void *send_key, void *send_data) { + unsigned int dest_rank, i; + int result = DHT_SUCCESS; - table->stats->w_access++; +#ifdef DHT_STATISTICS + table->stats->w_access++; +#endif - //determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); + // determine destination rank and index by hash of key + determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, + table->table_size, &dest_rank, table->index, + table->index_count); - //concatenating key with data to write entry to DHT - set_flag((char *) table->send_entry); - memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size); - memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size); + // concatenating key with data to write entry to DHT + set_flag((char *)table->send_entry); + memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size); + memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data, + table->data_size); - //locking window of target rank with exclusive lock - if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) - return DHT_MPI_ERROR; - for (i = 0; i < table->index_count; i++) - { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + // locking window of target rank with exclusive lock + if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) + return DHT_MPI_ERROR; + for (i = 0; i < table->index_count; i++) { + if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, + MPI_BYTE, dest_rank, table->index[i], + 1 + table->data_size + table->key_size, MPI_BYTE, + table->window) != 0) + return DHT_MPI_ERROR; + if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - //increment collision counter if receiving key doesn't match sending key - //,entry has write flag + last index is reached - - if (read_flag(*(char *)table->recv_entry)) { - if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) { - if (i == (table->index_count)-1) { - table->collisions += 1; - table->stats->collisions += 1; - result = DHT_WRITE_SUCCESS_WITH_COLLISION; - break; - } - } else break; - } else { - table->stats->writes_local[dest_rank]++; - break; + // increment eviction counter if receiving key doesn't match sending key + // entry has write flag and last index is reached. + if (read_flag(*(char *)table->recv_entry)) { + if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) != + 0) { + if (i == (table->index_count) - 1) { + table->evictions += 1; +#ifdef DHT_STATISTICS + table->stats->evictions += 1; +#endif + result = DHT_WRITE_SUCCESS_WITH_COLLISION; + break; } + } else + break; + } else { +#ifdef DHT_STATISTICS + table->stats->writes_local[dest_rank]++; +#endif + break; } + } - //put data to DHT - if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - //unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + // put data to DHT (with last selected index by value i) + if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, + MPI_BYTE, dest_rank, table->index[i], + 1 + table->data_size + table->key_size, MPI_BYTE, + table->window) != 0) + return DHT_MPI_ERROR; + // unlock window of target rank + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return result; + return result; } -/* - * gets data from the DHT by key - * - * return DHT_SUCCESS = 0 if success - * DHT_MPI_ERROR = -1 if MPI error occurred - * DHT_READ_ERROR = -2 if receiving key doesn't match sending key - */ -int DHT_read(DHT *table, void* send_key, void* destination) { - unsigned int dest_rank, i; +int DHT_read(DHT *table, void *send_key, void *destination) { + unsigned int dest_rank, i; - table->stats->r_access++; +#ifdef DHT_STATISTICS + table->stats->r_access++; +#endif - //determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); + // determine destination rank and index by hash of key + determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, + table->table_size, &dest_rank, table->index, + table->index_count); - //locking window of target rank with shared lock - if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR; - //receive data - for (i = 0; i < table->index_count; i++) { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - - //increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached - //else copy data to dereference of passed by destination pointer + // locking window of target rank with shared lock + if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) + return DHT_MPI_ERROR; + // receive data + for (i = 0; i < table->index_count; i++) { + if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, + MPI_BYTE, dest_rank, table->index[i], + 1 + table->data_size + table->key_size, MPI_BYTE, + table->window) != 0) + return DHT_MPI_ERROR; + if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - if ((read_flag(*(char *) table->recv_entry)) == 0) { - table->read_misses += 1; - table->stats->read_misses += 1; - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return DHT_READ_ERROR; - } - - if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) { - if (i == (table->index_count)-1) { - table->read_misses += 1; - table->stats->read_misses += 1; - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return DHT_READ_ERROR; - } - } else break; + // increment read error counter if write flag isn't set ... + if ((read_flag(*(char *)table->recv_entry)) == 0) { + table->read_misses += 1; +#ifdef DHT_STATISTICS + table->stats->read_misses += 1; +#endif + // unlock window and return + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + return DHT_READ_ERROR; } - //unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + // ... or key doesn't match passed by key and last index reached. + if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) != + 0) { + if (i == (table->index_count) - 1) { + table->read_misses += 1; +#ifdef DHT_STATISTICS + table->stats->read_misses += 1; +#endif + // unlock window an return + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + return DHT_READ_ERROR; + } + } else + break; + } - memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size); + // unlock window of target rank + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return DHT_SUCCESS; + // if matching key was found copy data into memory of passed pointer + memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1, + table->data_size); + + return DHT_SUCCESS; } -int DHT_to_file(DHT* table, const char* filename) { - //open file - MPI_File file; - if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; +int DHT_to_file(DHT *table, const char *filename) { + // open file + MPI_File file; + if (MPI_File_open(table->communicator, filename, + MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, + &file) != 0) + return DHT_FILE_IO_ERROR; - int rank; - MPI_Comm_rank(table->communicator, &rank); + int rank; + MPI_Comm_rank(table->communicator, &rank); - //write header (key_size and data_size) - if (rank == 0) { - if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; - if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; + // write header (key_size and data_size) + if (rank == 0) { + if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != + 0) + return DHT_FILE_WRITE_ERROR; + if (MPI_File_write(file, &table->data_size, 1, MPI_INT, + MPI_STATUS_IGNORE) != 0) + return DHT_FILE_WRITE_ERROR; + } + + // seek file pointer behind header for all processes + if (MPI_File_seek_shared(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0) + return DHT_FILE_IO_ERROR; + + char *ptr; + int bucket_size = table->key_size + table->data_size + 1; + + // iterate over local memory + for (unsigned int i = 0; i < table->table_size; i++) { + ptr = (char *)table->mem_alloc + (i * bucket_size); + // if bucket has been written to (checked by written_flag)... + if (read_flag(*ptr)) { + // write key and data to file + if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, + MPI_STATUS_IGNORE) != 0) + return DHT_FILE_WRITE_ERROR; } + } + // close file + if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; - if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - - char* ptr; - int bucket_size = table->key_size + table->data_size + 1; - - //iterate over local memory - for (unsigned int i = 0; i < table->table_size; i++) { - ptr = (char *) table->mem_alloc + (i * bucket_size); - //if bucket has been written to (checked by written_flag)... - if (read_flag(*ptr)) { - //write key and data to file - if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; - } - } - //close file - if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; - - return DHT_SUCCESS; + return DHT_SUCCESS; } -int DHT_from_file(DHT* table, const char* filename) { - MPI_File file; - MPI_Offset f_size; - int e_size, m_size, cur_pos, rank, offset; - char* buffer; - void* key; - void* data; +int DHT_from_file(DHT *table, const char *filename) { + MPI_File file; + MPI_Offset f_size; + int bucket_size, buffer_size, cur_pos, rank, offset; + char *buffer; + void *key; + void *data; - if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; + // open file + if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, + MPI_INFO_NULL, &file) != 0) + return DHT_FILE_IO_ERROR; - if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR; + // get file size + if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR; - MPI_Comm_rank(table->communicator, &rank); + MPI_Comm_rank(table->communicator, &rank); - e_size = table->key_size + table->data_size; - m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE; - buffer = (char *) malloc(m_size); + // calculate bucket size + bucket_size = table->key_size + table->data_size; + // buffer size is either bucket size or, if bucket size is smaller than the + // file header, the size of DHT_FILEHEADER_SIZE + buffer_size = + bucket_size > DHT_FILEHEADER_SIZE ? bucket_size : DHT_FILEHEADER_SIZE; + // allocate buffer + buffer = (char *)malloc(buffer_size); - if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; + // read file header + if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE, + MPI_STATUS_IGNORE) != 0) + return DHT_FILE_READ_ERROR; - if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE; - if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE; + // compare if written header data and key size matches current sizes + if (*(int *)buffer != table->key_size) return DHT_WRONG_FILE; + if (*(int *)(buffer + 4) != table->data_size) return DHT_WRONG_FILE; - offset = e_size*table->comm_size; + // set offset for each process + offset = bucket_size * table->comm_size; - if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - cur_pos = DHT_HEADER_SIZE + (rank * e_size); + // seek behind header of DHT file + if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0) + return DHT_FILE_IO_ERROR; + + // current position is rank * bucket_size + OFFSET + cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size); - while(cur_pos < f_size) { - if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - MPI_Offset tmp; - MPI_File_get_position(file, &tmp); - if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; - key = buffer; - data = (buffer+table->key_size); - if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR; + // loop over file and write data to DHT with DHT_write + while (cur_pos < f_size) { + if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) + return DHT_FILE_IO_ERROR; + // TODO: really necessary? + MPI_Offset tmp; + MPI_File_get_position(file, &tmp); + if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE, MPI_STATUS_IGNORE) != + 0) + return DHT_FILE_READ_ERROR; + // extract key and data and write to DHT + key = buffer; + data = (buffer + table->key_size); + if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR; - cur_pos += offset; - } + // increment current position + cur_pos += offset; + } - free (buffer); - if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; + free(buffer); + if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; - return DHT_SUCCESS; + return DHT_SUCCESS; } -/* - * frees up memory and accumulate counter - */ -int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) { - int buf; +int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) { + int buf; - if (collision_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - *collision_counter = buf; - } - if (readerror_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - *readerror_counter = buf; - } - if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR; - if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR; - free(table->recv_entry); - free(table->send_entry); - free(table->index); + if (eviction_counter != NULL) { + buf = 0; + if (MPI_Reduce(&table->evictions, &buf, 1, MPI_INT, MPI_SUM, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + *eviction_counter = buf; + } + if (readerror_counter != NULL) { + buf = 0; + if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + *readerror_counter = buf; + } + if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR; + if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR; + free(table->recv_entry); + free(table->send_entry); + free(table->index); - free(table->stats->writes_local); - free(table->stats); +#ifdef DHT_STATISTICS + free(table->stats->writes_local); + free(table->stats); +#endif + free(table); - free(table); - - return DHT_SUCCESS; + return DHT_SUCCESS; } -/* - * prints a table with statistics about current use of DHT - * for each participating process and summed up results containing: - * 1. occupied buckets (in respect to the memory of this process) - * 2. free buckets (in respect to the memory of this process) - * 3. calls of DHT_write (w_access) - * 4. calls of DHT_read (r_access) - * 5. read misses (see DHT_READ_ERROR) - * 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION) - * 3-6 will reset with every call of this function - * finally the amount of new written entries is printed out (in relation to last call of this funtion) - */ +#ifdef DHT_STATISTICS int DHT_print_statistics(DHT *table) { - int *written_buckets; - int *read_misses, sum_read_misses; - int *collisions, sum_collisions; - int sum_w_access, sum_r_access, *w_access, *r_access; - int rank; + int *written_buckets; + int *read_misses, sum_read_misses; + int *evictions, sum_evictions; + int sum_w_access, sum_r_access, *w_access, *r_access; + int rank; + MPI_Comm_rank(table->communicator, &rank); - MPI_Comm_rank(table->communicator, &rank); +// disable possible warning of unitialized variable, which is not the case +// since we're using MPI_Gather to obtain all values only on rank 0 +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wmaybe-uninitialized" - //disable possible warning of unitialized variable, which is not the case - //since we're using MPI_Gather to obtain all values only on rank 0 - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" + // obtaining all values from all processes in the communicator + if (rank == 0) read_misses = (int *)malloc(table->comm_size * sizeof(int)); + if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, + MPI_INT, 0, table->communicator) != 0) + return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, + MPI_SUM, 0, table->communicator) != 0) + return DHT_MPI_ERROR; + table->stats->read_misses = 0; - //obtaining all values from all processes in the communicator - if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->read_misses = 0; - - if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->collisions = 0; + if (rank == 0) evictions = (int *)malloc(table->comm_size * sizeof(int)); + if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM, + 0, table->communicator) != 0) + return DHT_MPI_ERROR; + table->stats->evictions = 0; - if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->w_access = 0; + if (rank == 0) w_access = (int *)malloc(table->comm_size * sizeof(int)); + if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + table->stats->w_access = 0; - if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->r_access = 0; - - if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int)); - if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + if (rank == 0) r_access = (int *)malloc(table->comm_size * sizeof(int)); + if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, + table->communicator) != 0) + return DHT_MPI_ERROR; + table->stats->r_access = 0; - if (rank == 0) { //only process with rank 0 will print out results as a table - int sum_written_buckets = 0; + if (rank == 0) written_buckets = (int *)calloc(table->comm_size, sizeof(int)); + if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, + MPI_INT, MPI_SUM, 0, table->communicator) != 0) + return DHT_MPI_ERROR; - for (int i=0; i < table->comm_size; i++) { - sum_written_buckets += written_buckets[i]; - } + if (rank == 0) { // only process with rank 0 will print out results as a + // table + int sum_written_buckets = 0; - int members = 7; - int padsize = (members*12)+1; - char pad[padsize+1]; - - memset(pad, '-', padsize*sizeof(char)); - pad[padsize]= '\0'; - printf("\n"); - printf("%-35s||resets with every call of this function\n", " "); - printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", - "rank", - "occupied", - "free", - "w_access", - "r_access", - "read misses", - "collisions"); - printf("%s\n", pad); - for (int i = 0; i < table->comm_size; i++) { - printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", - i, - written_buckets[i], - table->table_size-written_buckets[i], - w_access[i], - r_access[i], - read_misses[i], - collisions[i]); - } - printf("%s\n", pad); - printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", - "sum", - sum_written_buckets, - (table->table_size*table->comm_size)-sum_written_buckets, - sum_w_access, - sum_r_access, - sum_read_misses, - sum_collisions); - - printf("%s\n", pad); - printf("%s %d\n", - "new entries:", - sum_written_buckets - table->stats->old_writes); - - printf("\n"); - fflush(stdout); - - table->stats->old_writes = sum_written_buckets; + for (int i = 0; i < table->comm_size; i++) { + sum_written_buckets += written_buckets[i]; } - //enable warning again - #pragma GCC diagnostic pop + int members = 7; + int padsize = (members * 12) + 1; + char pad[padsize + 1]; - MPI_Barrier(table->communicator); - return DHT_SUCCESS; -} \ No newline at end of file + memset(pad, '-', padsize * sizeof(char)); + pad[padsize] = '\0'; + printf("\n"); + printf("%-35s||resets with every call of this function\n", " "); + printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied", + "free", "w_access", "r_access", "read misses", "evictions"); + printf("%s\n", pad); + for (int i = 0; i < table->comm_size; i++) { + printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i, + written_buckets[i], table->table_size - written_buckets[i], + w_access[i], r_access[i], read_misses[i], evictions[i]); + } + printf("%s\n", pad); + printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum", + sum_written_buckets, + (table->table_size * table->comm_size) - sum_written_buckets, + sum_w_access, sum_r_access, sum_read_misses, sum_evictions); + + printf("%s\n", pad); + printf("%s %d\n", + "new entries:", sum_written_buckets - table->stats->old_writes); + + printf("\n"); + fflush(stdout); + + table->stats->old_writes = sum_written_buckets; + } + +// enable warning again +#pragma GCC diagnostic pop + + MPI_Barrier(table->communicator); + return DHT_SUCCESS; +} +#endif \ No newline at end of file diff --git a/src/DHT/DHT.h b/src/DHT/DHT.h index 308281650..751224407 100644 --- a/src/DHT/DHT.h +++ b/src/DHT/DHT.h @@ -1,8 +1,14 @@ -/* - * File: DHT.h - * Author: max luebke +/** + * @file DHT.h + * @author Max Lübke (mluebke@uni-potsdam.de) + * @brief API to interact with the DHT + * @version 0.1 + * @date 16 Nov 2017 * - * Created on 16. November 2017, 09:14 + * This file implements the creation of a DHT by using the MPI + * one-sided-communication. There is also the possibility to write or read data + * from or to the DHT. In addition, the current state of the DHT can be written + * to a file and read in again later. */ #ifndef DHT_H @@ -11,102 +17,242 @@ #include #include +/** Returned if some error in MPI routine occurs. */ #define DHT_MPI_ERROR -1 +/** Returned by a call of DHT_read if no bucket with given key was found. */ #define DHT_READ_ERROR -2 +/** Returned by DHT_write if a bucket was evicted. */ +#define DHT_WRITE_SUCCESS_WITH_COLLISION -3 +/** Returned when no errors occured. */ #define DHT_SUCCESS 0 -#define DHT_WRITE_SUCCESS_WITH_COLLISION 1 -#define DHT_WRONG_FILE 11 -#define DHT_FILE_IO_ERROR 12 -#define DHT_FILE_READ_ERROR 13 -#define DHT_FILE_WRITE_ERROR 14 +/** Returned by DHT_from_file if the given file does not match expected file. */ +#define DHT_WRONG_FILE -11 +/** Returned by DHT file operations if MPI file operation fails. */ +#define DHT_FILE_IO_ERROR -12 +/** Returned by DHT file operations if error occured in MPI_Read operation. */ +#define DHT_FILE_READ_ERROR -13 +/** Returned by DHT file operations if error occured in MPI_Write operation. */ +#define DHT_FILE_WRITE_ERROR -14 -#define DHT_HEADER_SIZE 8 +/** Size of the file header in byte. */ +#define DHT_FILEHEADER_SIZE 8 -typedef struct {; - int *writes_local, old_writes; - int read_misses, collisions; - int w_access, r_access; +/** + * Internal struct to store statistics about read and write accesses and also + * read misses and evictions. + * All values will be resetted to zero after a call of + * DHT_print_statistics(). + * Internal use only! + * + * @todo There's maybe a better solution than DHT_print_statistics and this + * struct + */ +typedef struct { + /** Count of writes to specific process this process did. */ + int* writes_local; + /** Writes after last call of DHT_print_statistics. */ + int old_writes; + /** How many read misses occur? */ + int read_misses; + /** How many buckets where evicted? */ + int evictions; + /** How many calls of DHT_write() did this process? */ + int w_access; + /** How many calls of DHT_read() did this process? */ + int r_access; } DHT_stats; +/** + * Struct which serves as a handler or so called \a DHT-object. Will + * be created by DHT_create and must be passed as a parameter to every following + * function. Stores all relevant data. + * Do not touch outside DHT functions! + */ typedef struct { - MPI_Win window; - int data_size; - int key_size; - unsigned int table_size; - MPI_Comm communicator; - int comm_size; - uint64_t(*hash_func) (int, void*); - void* recv_entry; - void* send_entry; - void* mem_alloc; - int read_misses; - int collisions; - unsigned int *index; - unsigned int index_count; - DHT_stats *stats; + /** Created MPI Window, which serves as the DHT memory area of the process. */ + MPI_Win window; + /** Size of the data of a bucket entry in byte. */ + int data_size; + /** Size of the key of a bucket entry in byte. */ + int key_size; + /** Count of buckets for each process. */ + unsigned int table_size; + /** MPI communicator of all participating processes. */ + MPI_Comm communicator; + /** Size of the MPI communicator respectively all participating processes. */ + int comm_size; + /** Pointer to a hashfunction. */ + uint64_t (*hash_func)(int, void*); + /** Pre-allocated memory where a bucket can be received. */ + void* recv_entry; + /** Pre-allocated memory where a bucket to send can be stored. */ + void* send_entry; + /** Allocated memory on which the MPI window was created. */ + void* mem_alloc; + /** Count of read misses over all time. */ + int read_misses; + /** Count of evictions over all time. */ + int evictions; + /** Array of indeces where a bucket can be stored. */ + unsigned int* index; + /** Count of possible indeces. */ + unsigned int index_count; +#ifdef DHT_STATISTICS + /** Detailed statistics of the usage of the DHT. */ + DHT_stats* stats; +#endif } DHT; - - -/* - * parameters: - * MPI_Comm comm - communicator of processes that are holding the DHT - * int size_per_process - number of buckets each process will create - * int data_size - size of data in bytes - * int key_size - size of key in bytes - * *hash_func - pointer to hashfunction +/** + * @brief Create a DHT. * - * return: - * NULL if error during initialization - * DHT* if success - */ -extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*)); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * void* data - pointer to data - * void* - pointer to key + * When calling this function, the required memory is allocated and a + * MPI_Window is created. This allows the execution of MPI_Get and + * MPI_Put operations for one-sided communication. Then the number of + * indexes is calculated and finally all relevant data is entered into the + * \a DHT-object which is returned. * - * return: - * error value (see above) + * @param comm MPI communicator which addresses all participating process of the + * DHT. + * @param size_per_process Number of buckets per process. + * @param data_size Size of data in byte. + * @param key_size Size of the key in byte. + * @param hash_func Pointer to a hash function. This function must take the size + * of the key and a pointer to the key as input parameters and return a 64 bit + * hash. + * @return DHT* The returned value is the \a DHT-object which serves as a handle + * for all DHT operations. If an error occured NULL is returned. */ -extern int DHT_write(DHT *table, void* key, void* data); +extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, + int data_size, int key_size, + uint64_t (*hash_func)(int, void*)); -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * void* key - pointer to key - * void* destination - pointer which will hold the resulting data from DHT +/** + * @brief Write data into DHT. * - * return: - * error value (see above) - */ -extern int DHT_read(DHT *table, void* key, void* destination); - -extern int DHT_to_file(DHT *table, const char* filename); - -extern int DHT_from_file(DHT *table, const char* filename); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * int* collision_counter - pointer which will hold the total count of collisions - * int* readerror_counter - pointer which will hold the total count of read errors + * When DHT_write is called, the address window is locked with a + * LOCK_EXCLUSIVE for write access. Now the first bucket is received + * using MPI_Get and it is checked if the bucket is empty or if the received key + * matches the passed key. If this is the case, the data of the bucket is + * overwritten with the new value. If not, the function continues with the next + * index until no more indexes are available. When the last index is reached and + * there are no more indexes available, the last examined bucket is replaced. + * After successful writing, the memory window is released and the function + * returns. * - * return: - * error value (see above) + * @param table Pointer to the \a DHT-object. + * @param key Pointer to the key. + * @param data Pointer to the data. + * @return int Returns either DHT_SUCCESS on success or correspondending error + * value on eviction or error. */ -extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter); +extern int DHT_write(DHT* table, void* key, void* data); -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * - * return: - * error value (DHT_SUCCESS or DHT_MPI_ERROR) +/** + * @brief Read data from DHT. + * + * At the beginning, the target process and all possible indices are determined. + * After that a SHARED lock on the address window for read access is done + * and the first entry is retrieved. Now the received key is compared + * with the key passed to the function. If they coincide the correct data + * was found. If not it continues with the next index. If the last + * possible bucket is reached and the keys still do not match the read + * error counter is incremented. After the window has been released + * again, the function returns with a corresponding return value (read + * error or error-free read). The data to be read out is also written to + * the memory area of the passed pointer. + * + * @param table Pointer to the \a DHT-object. + * @param key Pointer to the key. + * @param destination Pointer to memory area where retreived data should be + * stored. + * @return int Returns either DHT_SUCCESS on success or correspondending error + * value on read miss or error. */ -extern int DHT_print_statistics(DHT *table); +extern int DHT_read(DHT* table, void* key, void* destination); + +/** + * @brief Write current state of DHT to file. + * + * All contents are written as a memory dump, so that no conversion takes place. + * First, an attempt is made to open or create a file. If this is successful the + * file header consisting of data and key size is written. Then each process + * reads its memory area of the DHT and each bucket that was marked as written + * is added to the file using MPI file operations. + * + * @param table Pointer to the \a DHT-object. + * @param filename Name of the file to write to. + * @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be + * opened/closed or DHT_WRITE_ERROR if file is not writable. + */ +extern int DHT_to_file(DHT* table, const char* filename); + +/** + * @brief Read state of DHT from file. + * + * One needs a previously written DHT file (by DHT_from_file). + * First of all, an attempt is made to open the specified file. If this is + * succeeded the file header is read and compared with the current values of the + * DHT. If the data and key sizes do not differ, one can continue. Each process + * reads one line of the file and writes it to the DHT with DHT_write. This + * happens until no more lines are left. The writing is done by the + * implementation of DHT_write. + * + * @param table Pointer to the \a DHT-object. + * @param filename Name of the file to read from. + * @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be + * opened/closed, DHT_READ_ERROR if file is not readable or DHT_WRONG_FILE if + * file doesn't match expectation. This is possible if the data size or key size + * is different. + */ +extern int DHT_from_file(DHT* table, const char* filename); + +/** + * @brief Free ressources of DHT. + * + * Finally, to free all resources after using the DHT, the function + * DHT_free must be used. This will free the MPI\_Window, as well as the + * associated memory. Also all internal variables are released. Optionally, the + * count of evictions and read misses can also be obtained. + * + * @param table Pointer to the \a DHT-object. + * @param eviction_counter \a optional: Pointer to integer where the count of + * evictions should be stored. + * @param readerror_counter \a optional: Pointer to integer where the count of + * read errors should be stored. + * @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on + * internal MPI error. + */ +extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter); + +/** + * @brief Prints a table with statistics about current use of DHT. + * + * These statistics are from each participated process and also summed up over + * all processes. Detailed statistics are: + * -# occupied buckets (in respect to the memory of this process) + * -# free buckets (in respect to the memory of this process) + * -# calls of DHT_write (w_access) + * -# calls of DHT_read (r_access) + * -# read misses (see DHT_READ_ERROR) + * -# collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION) + * 3-6 will reset with every call of this function finally the amount of new + * written entries is printed out (since the last call of this funtion). + * + * This is done by collective MPI operations with the root process with rank 0, + * which will also print a table with all informations to stdout. + * + * Also, as this function was implemented for a special case (POET project) one + * need to define DHT_STATISTICS to the compiler macros to use this + * function (eg. gcc -DDHT_STATISTICS ... ). + * @param table Pointer to the \a DHT-object. + * @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI + * error. + */ + +#ifdef DHT_STATISTICS +extern int DHT_print_statistics(DHT* table); +#endif #endif /* DHT_H */ \ No newline at end of file diff --git a/src/DHT/DHT_Wrapper.cpp b/src/DHT/DHT_Wrapper.cpp index 4cdc77a88..0e2b8edef 100644 --- a/src/DHT/DHT_Wrapper.cpp +++ b/src/DHT/DHT_Wrapper.cpp @@ -1,10 +1,14 @@ #include "DHT_Wrapper.h" -#include "DHT.h" + #include + #include +#include "DHT.h" + using namespace poet; using namespace std; + uint64_t get_md5(int key_size, void *key) { MD5_CTX ctx; unsigned char sum[MD5_DIGEST_LENGTH]; @@ -105,10 +109,11 @@ int DHT_Wrapper::tableToFile(const char *filename) { int DHT_Wrapper::fileToTable(const char *filename) { int res = DHT_from_file(dht_object, filename); - if (res != DHT_SUCCESS) - return res; + if (res != DHT_SUCCESS) return res; +#ifdef DHT_STATISTICS DHT_print_statistics(dht_object); +#endif return DHT_SUCCESS; } @@ -116,7 +121,9 @@ int DHT_Wrapper::fileToTable(const char *filename) { void DHT_Wrapper::printStatistics() { int res; +#ifdef DHT_STATISTICS res = DHT_print_statistics(dht_object); +#endif if (res != DHT_SUCCESS) { // MPI ERROR ... WHAT TO DO NOW? @@ -160,6 +167,5 @@ void DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) { << endl; } } - if (dt_differ) - fuzzing_buffer[var_count] = dt; -} + if (dt_differ) fuzzing_buffer[var_count] = dt; +} \ No newline at end of file diff --git a/src/DHT/DHT_Wrapper.h b/src/DHT/DHT_Wrapper.h index 7c5218af2..cbabf33a6 100644 --- a/src/DHT/DHT_Wrapper.h +++ b/src/DHT/DHT_Wrapper.h @@ -12,8 +12,6 @@ #define ROUND(value, signif) \ (((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif)) -uint64_t get_md5(int key_size, void *key); - namespace poet { class DHT_Wrapper { public: