diff --git a/.gitignore b/.gitignore
index a1bcba0ce..0889d14d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -137,12 +137,14 @@ vignettes/*.pdf
 # R package: bookdown caching files
 /*_files/
 
-### vscode ###
+### VisualStudioCode ###
 .vscode/*
-!.vscode/settings.json
 !.vscode/tasks.json
 !.vscode/launch.json
-!.vscode/extensions.json
 *.code-workspace
 
+### VisualStudioCode Patch ###
+# Ignore all local history of files
+.history
+.ionide
+
 # End of https://www.toptal.com/developers/gitignore/api/c,c++,r,cmake
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4eb121d63..f0f865eef 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})
 
 #define program libraries
-add_library(POET_Libs STATIC util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp model/Grid.cpp)
+add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp)
 
 target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
 target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
diff --git a/src/DHT/DHT.cpp b/src/DHT/DHT.cpp
new file mode 100644
index 000000000..3a988cdad
--- /dev/null
+++ b/src/DHT/DHT.cpp
@@ -0,0 +1,423 @@
+#include "DHT.h"
+#include <math.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/*
+ * determine destination rank and probe indices from the hash of a key
+ *
+ * results are returned by reference
+ */
+static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size,
+                           unsigned int *dest_rank, unsigned int *index,
+                           unsigned int index_count) {
+  uint64_t tmp;
+  int char_hop = 9 - index_count;
+  unsigned int i;
+  for (i = 0; i < index_count; i++) {
+    tmp = 0;
+    memcpy(&tmp, (unsigned char *)&hash + i, char_hop);
+    index[i] = (unsigned int)(tmp % table_size);
+  }
+  *dest_rank = (unsigned int)(hash % comm_size);
+}
+
+/**
+ * set write flag to 1
+ */
+static void set_flag(char *flag_byte) {
+  *flag_byte = 0;
+  *flag_byte |= (1 << 0);
+}
+
+/**
+ * return 1 if the write flag is set, else 0
+ */
+static int read_flag(char flag_byte) {
+  if ((flag_byte & 0x01) == 0x01) {
+    return 1;
+  } else return 0;
+}
+
+/*
+ * allocates memory for the DHT object and its buckets,
+ * creates the MPI window for one-sided communication,
+ * and fills the DHT object with the passed parameters, the window,
+ * two R/W error counters and two scratch entry buffers.
+ * returns the DHT object
+ */
+DHT *DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size,
+                uint64_t (*hash_func)(int, void *)) {
+  DHT *object;
+  MPI_Win window;
+  void *mem_alloc;
+  int comm_size, tmp;
+
+  tmp = (int)ceil(log2(size));
+  if (tmp % 8 != 0) tmp = tmp + (8 - (tmp % 8));
+
+  object = (DHT *)malloc(sizeof(DHT));
+  if (object == NULL) return NULL;
+
+  // every bucket is allocated with 1 additional leading byte for the occupied flag
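+  // Resulting bucket layout in the window memory:
+  //   [ 1 flag byte | key (key_size bytes) | data (data_size bytes) ]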
+ if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL; + if (MPI_Comm_size(comm, &comm_size) != 0) return NULL; + + memset(mem_alloc, '\0', size * (1 + data_size + key_size)); + + if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL; + + object->data_size = data_size; + object->key_size = key_size; + object->table_size = size; + object->window = window; + object->hash_func = hash_func; + object->comm_size = comm_size; + object->communicator = comm; + object->read_misses = 0; + object->collisions = 0; + object->recv_entry = malloc(1 + data_size + key_size); + object->send_entry = malloc(1 + data_size + key_size); + object->index_count = 9-(tmp/8); + object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int)); + object->mem_alloc = mem_alloc; + + DHT_stats *stats; + + stats = (DHT_stats*) malloc(sizeof(DHT_stats)); + if (stats == NULL) return NULL; + + object->stats = stats; + object->stats->writes_local = (int*) calloc(comm_size, sizeof(int)); + object->stats->old_writes = 0; + object->stats->read_misses = 0; + object->stats->collisions = 0; + object->stats->w_access = 0; + object->stats->r_access = 0; + + return object; +} + +/* + * puts passed by data with key to DHT + * + * returning DHT_MPI_ERROR = -1 if MPI error occurred + * else DHT_SUCCESS = 0 if success + */ +int DHT_write(DHT *table, void* send_key, void* send_data) { + unsigned int dest_rank, i; + int result = DHT_SUCCESS; + + table->stats->w_access++; + + //determine destination rank and index by hash of key + determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); + + //concatenating key with data to write entry to DHT + set_flag((char *) table->send_entry); + memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size); + memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size); + + //locking window of target rank with exclusive lock + if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) + return DHT_MPI_ERROR; + for (i = 0; i < table->index_count; i++) + { + if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; + if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + + //increment collision counter if receiving key doesn't match sending key + //,entry has write flag + last index is reached + + if (read_flag(*(char *)table->recv_entry)) { + if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) { + if (i == (table->index_count)-1) { + table->collisions += 1; + table->stats->collisions += 1; + result = DHT_WRITE_SUCCESS_WITH_COLLISION; + break; + } + } else break; + } else { + table->stats->writes_local[dest_rank]++; + break; + } + } + + //put data to DHT + if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; + //unlock window of target rank + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + + return result; +} + +/* + * gets data from the DHT by key + * + * return DHT_SUCCESS = 0 if success + * DHT_MPI_ERROR = -1 if MPI error occurred + * DHT_READ_ERROR = -2 if receiving key 
doesn't match sending key + */ +int DHT_read(DHT *table, void* send_key, void* destination) { + unsigned int dest_rank, i; + + table->stats->r_access++; + + //determine destination rank and index by hash of key + determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); + + //locking window of target rank with shared lock + if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR; + //receive data + for (i = 0; i < table->index_count; i++) { + if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; + if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + + //increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached + //else copy data to dereference of passed by destination pointer + + if ((read_flag(*(char *) table->recv_entry)) == 0) { + table->read_misses += 1; + table->stats->read_misses += 1; + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + return DHT_READ_ERROR; + } + + if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) { + if (i == (table->index_count)-1) { + table->read_misses += 1; + table->stats->read_misses += 1; + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + return DHT_READ_ERROR; + } + } else break; + } + + //unlock window of target rank + if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; + + memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size); + + return DHT_SUCCESS; +} + +int DHT_to_file(DHT* table, const char* filename) { + //open file + MPI_File file; + if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; + + int rank; + MPI_Comm_rank(table->communicator, &rank); + + //write header (key_size and data_size) + if (rank == 0) { + if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; + if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; + } + + if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; + + char* ptr; + int bucket_size = table->key_size + table->data_size + 1; + + //iterate over local memory + for (unsigned int i = 0; i < table->table_size; i++) { + ptr = (char *) table->mem_alloc + (i * bucket_size); + //if bucket has been written to (checked by written_flag)... + if (read_flag(*ptr)) { + //write key and data to file + if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; + } + } + //close file + if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; + + return DHT_SUCCESS; +} + +int DHT_from_file(DHT* table, const char* filename) { + MPI_File file; + MPI_Offset f_size; + int e_size, m_size, cur_pos, rank, offset; + char* buffer; + void* key; + void* data; + + if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; + + if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR; + + MPI_Comm_rank(table->communicator, &rank); + + e_size = table->key_size + table->data_size; + m_size = e_size > DHT_HEADER_SIZE ? 
e_size : DHT_HEADER_SIZE; + buffer = (char *) malloc(m_size); + + if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; + + if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE; + if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE; + + offset = e_size*table->comm_size; + + if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; + cur_pos = DHT_HEADER_SIZE + (rank * e_size); + + while(cur_pos < f_size) { + if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; + MPI_Offset tmp; + MPI_File_get_position(file, &tmp); + if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; + key = buffer; + data = (buffer+table->key_size); + if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR; + + cur_pos += offset; + } + + free (buffer); + if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; + + return DHT_SUCCESS; +} + +/* + * frees up memory and accumulate counter + */ +int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) { + int buf; + + if (collision_counter != NULL) { + buf = 0; + if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + *collision_counter = buf; + } + if (readerror_counter != NULL) { + buf = 0; + if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + *readerror_counter = buf; + } + if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR; + if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR; + free(table->recv_entry); + free(table->send_entry); + free(table->index); + + free(table->stats->writes_local); + free(table->stats); + + free(table); + + return DHT_SUCCESS; +} + +/* + * prints a table with statistics about current use of DHT + * for each participating process and summed up results containing: + * 1. occupied buckets (in respect to the memory of this process) + * 2. free buckets (in respect to the memory of this process) + * 3. calls of DHT_write (w_access) + * 4. calls of DHT_read (r_access) + * 5. read misses (see DHT_READ_ERROR) + * 6. 
collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION) + * 3-6 will reset with every call of this function + * finally the amount of new written entries is printed out (in relation to last call of this funtion) + */ +int DHT_print_statistics(DHT *table) { + int *written_buckets; + int *read_misses, sum_read_misses; + int *collisions, sum_collisions; + int sum_w_access, sum_r_access, *w_access, *r_access; + int rank; + + MPI_Comm_rank(table->communicator, &rank); + + //disable possible warning of unitialized variable, which is not the case + //since we're using MPI_Gather to obtain all values only on rank 0 + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" + + //obtaining all values from all processes in the communicator + if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int)); + if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + table->stats->read_misses = 0; + + if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int)); + if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + table->stats->collisions = 0; + + if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int)); + if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + table->stats->w_access = 0; + + if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int)); + if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; + if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + table->stats->r_access = 0; + + if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int)); + if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; + + if (rank == 0) { //only process with rank 0 will print out results as a table + int sum_written_buckets = 0; + + for (int i=0; i < table->comm_size; i++) { + sum_written_buckets += written_buckets[i]; + } + + int members = 7; + int padsize = (members*12)+1; + char pad[padsize+1]; + + memset(pad, '-', padsize*sizeof(char)); + pad[padsize]= '\0'; + printf("\n"); + printf("%-35s||resets with every call of this function\n", " "); + printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", + "rank", + "occupied", + "free", + "w_access", + "r_access", + "read misses", + "collisions"); + printf("%s\n", pad); + for (int i = 0; i < table->comm_size; i++) { + printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", + i, + written_buckets[i], + table->table_size-written_buckets[i], + w_access[i], + r_access[i], + read_misses[i], + collisions[i]); + } + printf("%s\n", pad); + printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", + "sum", + sum_written_buckets, + (table->table_size*table->comm_size)-sum_written_buckets, + sum_w_access, + sum_r_access, + sum_read_misses, + 
           sum_collisions);
+
+    printf("%s\n", pad);
+    printf("%s %d\n",
+           "new entries:",
+           sum_written_buckets - table->stats->old_writes);
+
+    printf("\n");
+    fflush(stdout);
+
+    table->stats->old_writes = sum_written_buckets;
+  }
+
+  //enable warning again
+  #pragma GCC diagnostic pop
+
+  MPI_Barrier(table->communicator);
+  return DHT_SUCCESS;
+}
\ No newline at end of file
diff --git a/src/DHT/DHT.h b/src/DHT/DHT.h
new file mode 100644
index 000000000..308281650
--- /dev/null
+++ b/src/DHT/DHT.h
@@ -0,0 +1,112 @@
+/*
+ * File:   DHT.h
+ * Author: max luebke
+ *
+ * Created on 16. November 2017, 09:14
+ */
+
+#ifndef DHT_H
+#define DHT_H
+
+#include <mpi.h>
+#include <stdint.h>
+
+#define DHT_MPI_ERROR -1
+#define DHT_READ_ERROR -2
+#define DHT_SUCCESS 0
+#define DHT_WRITE_SUCCESS_WITH_COLLISION 1
+
+#define DHT_WRONG_FILE 11
+#define DHT_FILE_IO_ERROR 12
+#define DHT_FILE_READ_ERROR 13
+#define DHT_FILE_WRITE_ERROR 14
+
+#define DHT_HEADER_SIZE 8
+
+typedef struct {
+  int *writes_local, old_writes;
+  int read_misses, collisions;
+  int w_access, r_access;
+} DHT_stats;
+
+typedef struct {
+  MPI_Win window;
+  int data_size;
+  int key_size;
+  unsigned int table_size;
+  MPI_Comm communicator;
+  int comm_size;
+  uint64_t (*hash_func)(int, void *);
+  void *recv_entry;
+  void *send_entry;
+  void *mem_alloc;
+  int read_misses;
+  int collisions;
+  unsigned int *index;
+  unsigned int index_count;
+  DHT_stats *stats;
+} DHT;
+
+
+/*
+ * parameters:
+ * MPI_Comm comm - communicator of processes that are holding the DHT
+ * int size_per_process - number of buckets each process will create
+ * int data_size - size of data in bytes
+ * int key_size - size of key in bytes
+ * *hash_func - pointer to hash function
+ *
+ * return:
+ * NULL if error during initialization
+ * DHT* if success
+ */
+extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*));
+
+/*
+ * parameters:
+ * DHT *table - DHT object created by DHT_create
+ * void* key - pointer to key
+ * void* data - pointer to data
+ *
+ * return:
+ * error value (see above)
+ */
+extern int DHT_write(DHT *table, void* key, void* data);
+
+/*
+ * parameters:
+ * DHT *table - DHT object created by DHT_create
+ * void* key - pointer to key
+ * void* destination - pointer which will hold the resulting data from DHT
+ *
+ * return:
+ * error value (see above)
+ */
+extern int DHT_read(DHT *table, void* key, void* destination);
+
+extern int DHT_to_file(DHT *table, const char* filename);
+
+extern int DHT_from_file(DHT *table, const char* filename);
+
+/*
+ * parameters:
+ * DHT *table - DHT object created by DHT_create
+ * int* collision_counter - pointer which will hold the total count of collisions
+ * int* readerror_counter - pointer which will hold the total count of read errors
+ *
+ * return:
+ * error value (see above)
+ */
+extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter);
+
+/*
+ * parameters:
+ * DHT *table - DHT object created by DHT_create
+ *
+ * return:
+ * error value (DHT_SUCCESS or DHT_MPI_ERROR)
+ */
+extern int DHT_print_statistics(DHT *table);
+
+#endif /* DHT_H */
\ No newline at end of file
diff --git a/src/DHT/DHT_Wrapper.cpp b/src/DHT/DHT_Wrapper.cpp
new file mode 100644
index 000000000..4cdc77a88
--- /dev/null
+++ b/src/DHT/DHT_Wrapper.cpp
@@ -0,0 +1,165 @@
+#include "DHT_Wrapper.h"
+#include "DHT.h"
+#include <openssl/md5.h>
+#include <iostream>
+
+using namespace poet;
+using namespace std;
+
+uint64_t get_md5(int key_size, void *key) {
+  MD5_CTX ctx;
+  unsigned char
sum[MD5_DIGEST_LENGTH]; + uint64_t retval, *v1, *v2; + + MD5_Init(&ctx); + MD5_Update(&ctx, key, key_size); + MD5_Final(sum, &ctx); + + v1 = (uint64_t *)&sum[0]; + v2 = (uint64_t *)&sum[8]; + retval = *v1 ^ *v2; + + return retval; +} + +DHT_Wrapper::DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm, + int buckets_per_process, int data_size, int key_size) { + dht_object = + DHT_create(dht_comm, buckets_per_process, data_size, key_size, &get_md5); + fuzzing_buffer = (double *)malloc(key_size); + + this->dt_differ = params->dt_differ; + this->dht_log = params->dht_log; + this->dht_signif_vector = params->dht_signif_vector; + this->dht_prop_type_vector = params->dht_prop_type_vector; +} + +DHT_Wrapper::~DHT_Wrapper() { + DHT_free(dht_object, NULL, NULL); + free(fuzzing_buffer); +} + +void DHT_Wrapper::checkDHT(int length, std::vector &out_result_index, + double *work_package, double dt) { + void *key; + int res; + int var_count = dht_prop_type_vector.size(); + for (int i = 0; i < length; i++) { + key = (void *)&(work_package[i * var_count]); + + // fuzz data (round, logarithm etc.) + fuzzForDHT(var_count, key, dt); + + // overwrite input with data from DHT, IF value is found in DHT + res = DHT_read(dht_object, fuzzing_buffer, key); + + if (res == DHT_SUCCESS) { + // flag that this line is replaced by DHT-value, do not simulate!! + out_result_index[i] = false; + dht_hits++; + } else if (res == DHT_READ_ERROR) { + // this line is untouched, simulation is needed + out_result_index[i] = true; + dht_miss++; + } else { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING + } + } +} + +void DHT_Wrapper::fillDHT(int length, std::vector &result_index, + double *work_package, double *results, double dt) { + void *key; + void *data; + int res; + int var_count = dht_prop_type_vector.size(); + + for (int i = 0; i < length; i++) { + key = (void *)&(work_package[i * var_count]); + data = (void *)&(results[i * var_count]); + + if (result_index[i]) { + // If true -> was simulated, needs to be inserted into dht + + // fuzz data (round, logarithm etc.) + fuzzForDHT(var_count, key, dt); + + res = DHT_write(dht_object, fuzzing_buffer, data); + + if (res != DHT_SUCCESS) { + if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) { + dht_evictions++; + } else { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING + } + } + } + } +} + +int DHT_Wrapper::tableToFile(const char *filename) { + int res = DHT_to_file(dht_object, filename); + return res; +} + +int DHT_Wrapper::fileToTable(const char *filename) { + int res = DHT_from_file(dht_object, filename); + if (res != DHT_SUCCESS) + return res; + + DHT_print_statistics(dht_object); + + return DHT_SUCCESS; +} + +void DHT_Wrapper::printStatistics() { + int res; + + res = DHT_print_statistics(dht_object); + + if (res != DHT_SUCCESS) { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING + } +} + +uint64_t DHT_Wrapper::getHits() { return this->dht_hits; } + +uint64_t DHT_Wrapper::getMisses() { return this->dht_miss; } + +uint64_t DHT_Wrapper::getEvictions() { return this->dht_evictions; } + +void DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) { + unsigned int i = 0; + // introduce fuzzing to allow more hits in DHT + for (i = 0; i < (unsigned int)var_count; i++) { + if (dht_prop_type_vector[i] == "act") { + // with log10 + if (dht_log) { + if (((double *)key)[i] < 0) + cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in " + "key!" 
             << endl;
+        else if (((double *)key)[i] == 0)
+          fuzzing_buffer[i] = 0;
+        else
+          fuzzing_buffer[i] =
+              ROUND(-(std::log10(((double *)key)[i])), dht_signif_vector[i]);
+      } else {
+        // without log10
+        fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
+      }
+    } else if (dht_prop_type_vector[i] == "logact") {
+      fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
+    } else if (dht_prop_type_vector[i] == "ignore") {
+      fuzzing_buffer[i] = 0;
+    } else {
+      cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong "
+              "prop_type!"
+           << endl;
+    }
+  }
+  if (dt_differ)
+    fuzzing_buffer[var_count] = dt;
+}
diff --git a/src/DHT/DHT_Wrapper.h b/src/DHT/DHT_Wrapper.h
new file mode 100644
index 000000000..7c5218af2
--- /dev/null
+++ b/src/DHT/DHT_Wrapper.h
@@ -0,0 +1,56 @@
+#ifndef DHT_WRAPPER_H
+#define DHT_WRAPPER_H
+
+#include <cmath>
+
+#include <string>
+#include <vector>
+
+#include "../util/SimParams.h"
+#include "DHT.h"
+
+#define ROUND(value, signif) \
+  (((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))
+
+uint64_t get_md5(int key_size, void *key);
+
+namespace poet {
+class DHT_Wrapper {
+ public:
+  DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm, int buckets_per_process,
+              int data_size, int key_size);
+  ~DHT_Wrapper();
+
+  void checkDHT(int length, std::vector<bool> &out_result_index,
+                double *work_package, double dt);
+  void fillDHT(int length, std::vector<bool> &result_index,
+               double *work_package, double *results, double dt);
+
+  int tableToFile(const char *filename);
+  int fileToTable(const char *filename);
+
+  void printStatistics();
+
+  uint64_t getHits();
+  uint64_t getMisses();
+  uint64_t getEvictions();
+
+ private:
+  void fuzzForDHT(int var_count, void *key, double dt);
+
+  DHT *dht_object;
+
+  uint64_t dht_hits = 0;
+  uint64_t dht_miss = 0;
+  uint64_t dht_evictions = 0;
+
+  double *fuzzing_buffer;
+
+  bool dt_differ;
+  bool dht_log;
+  std::vector<int> dht_signif_vector;
+  std::vector<std::string> dht_prop_type_vector;
+};
+}  // namespace poet
+
+#endif  // DHT_WRAPPER_H
diff --git a/src/kin.cpp b/src/kin.cpp
index 8cdfa47f7..abfb6023d 100644
--- a/src/kin.cpp
+++ b/src/kin.cpp
@@ -1,20 +1,22 @@
+#include
+#include <mpi.h> // mpi header file
+
 #include <iostream>
 #include <list>
 #include <set>
 #include <string>
-#include
-
-#include <mpi.h> // mpi header file
-
-#include "DHT.h" // MPI-DHT Implementation
-#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licensed
-#include "dht_wrapper.h"
-#include "global_buffer.h"
+// #include "DHT.h" // MPI-DHT Implementation
+#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licensed
+// #include "dht_wrapper.h"
+// #include "global_buffer.h"
+#include "model/ChemSim.h"
 #include "model/Grid.h"
 #include "util/RRuntime.h"
 #include "util/SimParams.h"
-#include "worker.h"
+
+#define DHT_SIZE_PER_PROCESS 1073741824
 
 using namespace std;
 using namespace poet;
@@ -59,8 +61,7 @@ std::list<std::string> checkOptions(argh::parser cmdl) {
   std::set<std::string> plist = paramList();
 
   for (auto &flag : cmdl.flags()) {
-    if (!(flist.find(flag) != flist.end()))
-      retList.push_back(flag);
+    if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
   }
 
   for (auto &param : cmdl.params()) {
@@ -85,7 +86,7 @@ int main(int argc, char *argv[]) {
   double cummul_workers = 0.f;
   double cummul_chemistry_master = 0.f;
 
-  double cummul_master_seq_pre_loop = 0.f;
+  double cummul_master_seq = 0.f;
   double cummul_master_seq_loop = 0.f;
 
   double master_idle = 0.f;
@@ -121,8 +122,7 @@ int main(int argc, char *argv[]) {
   // make a list of processes in the new communicator
process_ranks = (int *)malloc(params.world_size * sizeof(int)); - for (int I = 1; I < params.world_size; I++) - process_ranks[I - 1] = I; + for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I; // get the group under MPI_COMM_WORLD MPI_Comm_group(MPI_COMM_WORLD, &group_world); @@ -130,7 +130,7 @@ int main(int argc, char *argv[]) { MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group); // create the new communicator MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm); - free(process_ranks); // cleanup + free(process_ranks); // cleanup // cout << "Done"; if (cmdl[{"help", "h"}]) { @@ -226,21 +226,24 @@ int main(int argc, char *argv[]) { R["local_rank"] = params.world_rank; /*Loading Dependencies*/ - std::string r_load_dependencies = "suppressMessages(library(Rmufits));" - "suppressMessages(library(RedModRphree));" - "source('kin_r_library.R');" - "source('parallel_r_library.R');"; + std::string r_load_dependencies = + "suppressMessages(library(Rmufits));" + "suppressMessages(library(RedModRphree));" + "source('kin_r_library.R');" + "source('parallel_r_library.R');"; R.parseEvalQ(r_load_dependencies); std::string filesim; - cmdl(1) >> filesim; // <- first positional argument - R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim' - R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns + cmdl(1) >> filesim; // <- first positional argument + R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim' + R.parseEvalQ( + "source(filesim)"); // eval the init string, ignoring any returns if (params.world_rank == - 0) { // only rank 0 initializes goes through the whole initialization - cmdl(2) >> params.out_dir; // <- second positional argument - R["fileout"] = wrap(params.out_dir); // assign a char* (string) to 'fileout' + 0) { // only rank 0 initializes goes through the whole initialization + cmdl(2) >> params.out_dir; // <- second positional argument + R["fileout"] = + wrap(params.out_dir); // assign a char* (string) to 'fileout' // Note: R::sim_init() checks if the directory already exists, // if not it makes it @@ -253,9 +256,9 @@ int main(int argc, char *argv[]) { R.parseEval(master_init_code); params.dt_differ = - R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper + R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); - } else { // workers will only read the setup DataFrame defined by input file + } else { // workers will only read the setup DataFrame defined by input file R.parseEval("mysetup <- setup"); MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } @@ -280,15 +283,15 @@ int main(int argc, char *argv[]) { R["work_package_size"] = params.wp_size; // Removed additional field for ID in previous versions - if (params.world_rank == 0) { - mpi_buffer = - (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); - } else { - mpi_buffer = (double *)calloc( - (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); - mpi_buffer_results = - (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double)); - } + // if (params.world_rank == 0) { + // mpi_buffer = + // (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); + // } else { + // mpi_buffer = (double *)calloc( + // (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); + // mpi_buffer_results = + // (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double)); + // } if (params.world_rank 
== 0) { cout << "CPP: parallel init completed (buffers allocated)!" << endl; @@ -302,14 +305,14 @@ int main(int argc, char *argv[]) { if (params.dht_enabled) { // cout << "\nCreating DHT\n"; // determine size of dht entries - int dht_data_size = grid.getCols() * sizeof(double); - int dht_key_size = - grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double)); + // int dht_data_size = grid.getCols() * sizeof(double); + // int dht_key_size = + // grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double)); - // determine bucket count for preset memory usage - // bucket size is key + value + 1 byte for status - int dht_buckets_per_process = - params.dht_size_per_process / (1 + dht_data_size + dht_key_size); + // // // determine bucket count for preset memory usage + // // // bucket size is key + value + 1 byte for status + // int dht_buckets_per_process = + // params.dht_size_per_process / (1 + dht_data_size + dht_key_size); // MDL : following code moved here from worker.cpp /*Load significance vector from R setup file (or set default)*/ @@ -317,8 +320,8 @@ int main(int argc, char *argv[]) { if (signif_vector_exists) { params.dht_signif_vector = as>(R["signif_vector"]); } else { - params.dht_signif_vector.assign(dht_object->key_size / sizeof(double), - dht_significant_digits); + // params.dht_signif_vector.assign(dht_object->key_size / sizeof(double), + // dht_significant_digits); } /*Load property type vector from R setup file (or set default)*/ @@ -326,22 +329,24 @@ int main(int argc, char *argv[]) { if (prop_type_vector_exists) { params.dht_prop_type_vector = as>(R["prop_type"]); } else { - params.dht_prop_type_vector.assign(dht_object->key_size / sizeof(double), - "act"); + // params.dht_prop_type_vector.assign(dht_object->key_size / + // sizeof(double), + // "act"); } if (params.world_rank == 0) { - // print only on master, values are equal on all workes - cout << "CPP: dht_data_size: " << dht_data_size << "\n"; - cout << "CPP: dht_key_size: " << dht_key_size << "\n"; - cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process - << endl; + // // print only on master, values are equal on all workes + // cout << "CPP: dht_data_size: " << dht_data_size << "\n"; + // cout << "CPP: dht_key_size: " << dht_key_size << "\n"; + // cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process + // << endl; // MDL: new output on signif_vector and prop_type if (signif_vector_exists) { cout << "CPP: using problem-specific rounding digits: " << endl; - R.parseEval("print(data.frame(prop=prop, type=prop_type, " - "digits=signif_vector))"); + R.parseEval( + "print(data.frame(prop=prop, type=prop_type, " + "digits=signif_vector))"); } else { cout << "CPP: using DHT default rounding digits = " << dht_significant_digits << endl; @@ -353,22 +358,22 @@ int main(int argc, char *argv[]) { R["dht_final_proptype"] = params.dht_prop_type_vector; } - if (params.dht_strategy == 0) { - if (params.world_rank != 0) { - dht_object = DHT_create(dht_comm, dht_buckets_per_process, - dht_data_size, dht_key_size, get_md5); + // if (params.dht_strategy == 0) { + // if (params.world_rank != 0) { + // dht_object = DHT_create(dht_comm, dht_buckets_per_process, + // dht_data_size, dht_key_size, get_md5); - // storing for access from worker and callback functions - fuzzing_buffer = (double *)malloc(dht_key_size); - } - } else { - dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process, - dht_data_size, dht_key_size, get_md5); - } + // // storing for access from worker and callback 
functions + // fuzzing_buffer = (double *)malloc(dht_key_size); + // } + // } else { + // dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process, + // dht_data_size, dht_key_size, get_md5); + // } - if (params.world_rank == 0) { - cout << "CPP: DHT successfully created!" << endl; - } + // if (params.world_rank == 0) { + // cout << "CPP: DHT successfully created!" << endl; + // } } // MDL: store all parameters @@ -380,52 +385,23 @@ int main(int argc, char *argv[]) { MPI_Barrier(MPI_COMM_WORLD); if (params.world_rank == 0) { /* This is executed by the master */ + ChemMaster master(¶ms, R, grid); Rcpp::NumericVector master_send; Rcpp::NumericVector master_recv; - sim_a_seq = MPI_Wtime(); - - worker_struct *workerlist = - (worker_struct *)calloc(params.world_size - 1, sizeof(worker_struct)); - int need_to_receive; - MPI_Status probe_status; double *timings; uint64_t *dht_perfs = NULL; - int local_work_package_size; - // a temporary send buffer - double *send_buffer; - send_buffer = (double *)calloc( - (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); - - // helper variables - int iteration; - double dt, current_sim_time; - - int n_wp = 1; // holds the actual number of wp which is - // computed later in R::distribute_work_packages() - std::vector wp_sizes_vector; // vector with the sizes of - // each package - sim_start = MPI_Wtime(); // Iteration Count is dynamic, retrieving value from R (is only needed by // master for the following loop) uint32_t maxiter = R.parseEval("mysetup$maxiter"); - sim_b_seq = MPI_Wtime(); - - cummul_master_seq_pre_loop += sim_b_seq - sim_a_seq; - /*SIMULATION LOOP*/ for (uint32_t iter = 1; iter < maxiter + 1; iter++) { - sim_a_seq = MPI_Wtime(); - - cummul_master_send = 0.f; - cummul_master_recv = 0.f; - cout << "CPP: Evaluating next time step" << endl; R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)"); @@ -440,201 +416,19 @@ int main(int argc, char *argv[]) { R.parseEvalQ("mysetup <- master_advection(setup=mysetup)"); sim_a_transport = MPI_Wtime(); + if (iter == 1) master.prepareSimulation(); + cout << "CPP: Chemistry" << endl; /*Fallback for sequential execution*/ sim_b_chemistry = MPI_Wtime(); if (params.world_size == 1) { - // MDL : the transformation of values into pH and pe - // takes now place in master_advection() so the - // following line is unneeded - // R.parseEvalQ("mysetup$state_T <- - // RedModRphree::Act2pH(mysetup$state_T)"); - R.parseEvalQ( - "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)"); - R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); + master.runSeq(); } else { /*send work to workers*/ - - // NEW: only in the first iteration we call - // R::distribute_work_packages()!! 
- if (iter == 1) { - R.parseEvalQ( - "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), " - "package_size=work_package_size)"); - - // we only sort once the vector - R.parseEvalQ("ordered_ids <- order(wp_ids)"); - R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); - n_wp = (int)R.parseEval("length(wp_sizes_vector)"); - wp_sizes_vector = as>(R["wp_sizes_vector"]); - cout << "CPP: Total number of work packages: " << n_wp << endl; - R.parseEval("stat_wp_sizes(wp_sizes_vector)"); - } - - /* shuffle and extract data - MDL: we now apply :Act2pH directly in master_advection - */ - // R.parseEval("tmp <- - // shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)"); - // Rcpp::DataFrame chemistry_data = R.parseEval("tmp"); - - // convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data); - // cout << "CPP: shuffle_field() done" << endl; - grid.shuffleAndExport(mpi_buffer); - /* send and receive work; this is done by counting - * the wp */ - int pkg_to_send = n_wp; - int pkg_to_recv = n_wp; - size_t colCount = grid.getCols(); - int free_workers = params.world_size - 1; - double *work_pointer = mpi_buffer; - sim_c_chemistry = MPI_Wtime(); - - /* visual progress */ - float progress = 0.0; - int barWidth = 70; - - // retrieve data from R runtime - iteration = (int)R.parseEval("mysetup$iter"); - dt = (double)R.parseEval("mysetup$requested_dt"); - current_sim_time = - (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt"); - - int count_pkgs = 0; - - sim_b_seq = MPI_Wtime(); - - sim_c_chemistry = MPI_Wtime(); - - while (pkg_to_recv > 0) // start dispatching work packages - { - /* visual progress */ - progress = (float)(count_pkgs + 1) / n_wp; - - cout << "["; - int pos = barWidth * progress; - for (int iprog = 0; iprog < barWidth; ++iprog) { - if (iprog < pos) - cout << "="; - else if (iprog == pos) - cout << ">"; - else - cout << " "; - } - std::cout << "] " << int(progress * 100.0) << " %\r"; - std::cout.flush(); - /* end visual progress */ - - if (pkg_to_send > 0) { - master_send_a = MPI_Wtime(); - /*search for free workers and send work*/ - for (int p = 0; p < params.world_size - 1; p++) { - if (workerlist[p].has_work == 0 && - pkg_to_send > 0) /* worker is free */ { - - // to enable different work_package_size, set local copy of - // work_package_size to either global work_package size or - // remaining 'to_send' packages to_send >= work_package_size ? - // local_work_package_size = work_package_size : - // local_work_package_size = to_send; - - local_work_package_size = (int)wp_sizes_vector[count_pkgs]; - count_pkgs++; - - // cout << "CPP: sending pkg n. " << count_pkgs << " with size " - // << local_work_package_size << endl; - - /*push pointer forward to next work package, after taking the - * current one*/ - workerlist[p].send_addr = work_pointer; - - int end_of_wp = local_work_package_size * colCount; - work_pointer = &(work_pointer[end_of_wp]); - - // fill send buffer starting with work_package ... 
- std::memcpy(send_buffer, workerlist[p].send_addr, - (end_of_wp) * sizeof(double)); - // followed by: work_package_size - send_buffer[end_of_wp] = (double)local_work_package_size; - // current iteration of simulation - send_buffer[end_of_wp + 1] = (double)iteration; - // size of timestep in seconds - send_buffer[end_of_wp + 2] = dt; - // current time of simulation (age) in seconds - send_buffer[end_of_wp + 3] = current_sim_time; - // placeholder for work_package_count - send_buffer[end_of_wp + 4] = 0.; - - /* ATTENTION Worker p has rank p+1 */ - MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, - p + 1, TAG_WORK, MPI_COMM_WORLD); - - workerlist[p].has_work = 1; - free_workers--; - pkg_to_send -= 1; - } - } - master_send_b = MPI_Wtime(); - cummul_master_send += master_send_b - master_send_a; - } - - /*check if there are results to receive and receive them*/ - need_to_receive = 1; - master_recv_a = MPI_Wtime(); - while (need_to_receive && pkg_to_recv > 0) { - - if (pkg_to_send > 0 && free_workers > 0) - MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, - &need_to_receive, &probe_status); - else { - idle_a = MPI_Wtime(); - MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, - &probe_status); - idle_b = MPI_Wtime(); - master_idle += idle_b - idle_a; - } - - if (need_to_receive) { - int p = probe_status.MPI_SOURCE; - int size; - MPI_Get_count(&probe_status, MPI_DOUBLE, &size); - MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, - TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - workerlist[p - 1].has_work = 0; - pkg_to_recv -= 1; - free_workers++; - } - } - master_recv_b = MPI_Wtime(); - cummul_master_recv += master_recv_b - master_recv_a; - } - - sim_c_seq = MPI_Wtime(); - - // don't overwrite last progress - cout << endl; - - sim_d_chemistry = MPI_Wtime(); - cummul_workers += sim_d_chemistry - sim_c_chemistry; - - // convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data); - // R.from_C_domain(mpi_buffer); - - // R["chemistry_data"] = R.getBufferDataFrame(); - - ///* unshuffle results */ - // R.parseEval("result <- unshuffle_field(chemistry_data, - // ordered_ids)"); - - grid.importAndUnshuffle(mpi_buffer); - /* do master stuff */ - sim_e_chemistry = MPI_Wtime(); - R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); - sim_f_chemistry = MPI_Wtime(); - cummul_chemistry_master += sim_f_chemistry - sim_e_chemistry; + master.runIteration(); } sim_a_chemistry = MPI_Wtime(); - + double master_seq_a = MPI_Wtime(); // MDL master_iteration_end just writes on disk state_T and // state_C after every iteration if the cmdline option // --ignore-results is not given (and thus the R variable @@ -649,48 +443,22 @@ int main(int argc, char *argv[]) { << endl << endl; - if (params.dht_enabled) { - for (int i = 1; i < params.world_size; i++) { - MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD); - } + double master_seq_b = MPI_Wtime(); - MPI_Barrier(MPI_COMM_WORLD); + cummul_master_seq += master.getSeqTime() + (master_seq_b - master_seq_a); + master_send.push_back(master.getSendTime(), "it_" + to_string(iter)); + master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter)); - if (params.dht_snaps == 2) { - std::stringstream outfile; - outfile << params.out_dir << "/iter_" << std::setfill('0') - << std::setw(3) << iter << ".dht"; - for (int i = 1; i < params.world_size; i++) { - MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i, - TAG_DHT_STORE, MPI_COMM_WORLD); - } - MPI_Barrier(MPI_COMM_WORLD); - } + for (int i = 1; i < 
params.world_size; i++) { + MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD); } - sim_d_seq = MPI_Wtime(); + MPI_Barrier(MPI_COMM_WORLD); - cummul_master_seq_loop += - ((sim_b_seq - sim_a_seq) - (sim_a_transport - sim_b_transport)) + - (sim_d_seq - sim_c_seq); - master_send.push_back(cummul_master_send, "it_" + to_string(iter)); - master_recv.push_back(cummul_master_recv, "it_" + to_string(iter)); - - } // END SIMULATION LOOP + } // END SIMULATION LOOP sim_end = MPI_Wtime(); - - if (params.dht_enabled && params.dht_snaps > 0) { - cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl; - std::string outfile; - outfile = params.out_dir + ".dht"; - for (int i = 1; i < params.world_size; i++) { - MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE, - MPI_COMM_WORLD); - } - MPI_Barrier(MPI_COMM_WORLD); - cout << "CPP: Master: ... done" << endl; - } + master.finishSimulation(); Rcpp::NumericVector phreeqc_time; Rcpp::NumericVector dht_get_time; @@ -702,6 +470,10 @@ int main(int argc, char *argv[]) { timings = (double *)calloc(3, sizeof(double)); + int dht_hits = 0; + int dht_miss = 0; + int dht_collision = 0; + if (params.dht_enabled) { dht_hits = 0; dht_miss = 0; @@ -748,23 +520,21 @@ int main(int argc, char *argv[]) { R.parseEvalQ("profiling$simtime_transport <- simtime_transport"); R["simtime_chemistry"] = cummul_chemistry; R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry"); - R["simtime_workers"] = cummul_workers; + R["simtime_workers"] = master.getWorkerTime(); R.parseEvalQ("profiling$simtime_workers <- simtime_workers"); - R["simtime_chemistry_master"] = cummul_chemistry_master; + R["simtime_chemistry_master"] = master.getChemMasterTime(); R.parseEvalQ( "profiling$simtime_chemistry_master <- simtime_chemistry_master"); - R["seq_master_prep"] = cummul_master_seq_pre_loop; - R.parseEvalQ("profiling$seq_master_prep <- seq_master_prep"); - R["seq_master_loop"] = cummul_master_seq_loop; - R.parseEvalQ("profiling$seq_master_loop <- seq_master_loop"); + R["seq_master"] = cummul_master_seq; + R.parseEvalQ("profiling$seq_master <- seq_master"); // R["master_send"] = master_send; // R.parseEvalQ("profiling$master_send <- master_send"); // R["master_recv"] = master_recv; // R.parseEvalQ("profiling$master_recv <- master_recv"); - R["idle_master"] = master_idle; + R["idle_master"] = master.getIdleTime(); R.parseEvalQ("profiling$idle_master <- idle_master"); R["idle_worker"] = idle_worker; R.parseEvalQ("profiling$idle_worker <- idle_worker"); @@ -788,11 +558,9 @@ int main(int argc, char *argv[]) { R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time"); } - free(workerlist); free(timings); - if (params.dht_enabled) - free(dht_perfs); + if (params.dht_enabled) free(dht_perfs); cout << "CPP: Done! 
Results are stored as R objects into <" << params.out_dir << "/timings.rds>" << endl; @@ -802,42 +570,22 @@ int main(int argc, char *argv[]) { r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));"; R.parseEval(r_vis_code); } else { /*This is executed by the workers*/ - if (!params.dht_file.empty()) { - int res = file_to_table((char *)params.dht_file.c_str()); - if (res != DHT_SUCCESS) { - if (res == DHT_WRONG_FILE) { - if (params.world_rank == 2) - cerr << "CPP: Worker: Wrong File" << endl; - } else { - if (params.world_rank == 2) - cerr << "CPP: Worker: Error in loading current state of DHT from " - "file" - << endl; - } - return EXIT_FAILURE; - } else { - if (params.world_rank == 2) - cout << "CPP: Worker: Successfully loaded state of DHT from file " - << params.dht_file << endl; - std::cout.flush(); - } - } - worker_function(¶ms); - free(mpi_buffer_results); + ChemWorker worker(¶ms, R, grid); + worker.prepareSimulation(dht_comm); + worker.loop(); } cout << "CPP: finished, cleanup of process " << params.world_rank << endl; - if (params.dht_enabled) { - - if (params.dht_strategy == 0) { - if (params.world_rank != 0) { - DHT_free(dht_object, NULL, NULL); - } - } else { - DHT_free(dht_object, NULL, NULL); - } - } + // if (params.dht_enabled) { + // if (params.dht_strategy == 0) { + // if (params.world_rank != 0) { + // DHT_free(dht_object, NULL, NULL); + // } + // } else { + // DHT_free(dht_object, NULL, NULL); + // } + // } free(mpi_buffer); MPI_Finalize(); diff --git a/src/model/ChemMaster.cpp b/src/model/ChemMaster.cpp new file mode 100644 index 000000000..fee86113a --- /dev/null +++ b/src/model/ChemMaster.cpp @@ -0,0 +1,217 @@ +#include +#include + +#include + +#include "ChemSim.h" + +using namespace poet; +using namespace std; +using namespace Rcpp; + +#define TAG_WORK 42 + +ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_) + : ChemSim(params, R_, grid_) { + this->wp_size = params->wp_size; + this->out_dir = params->out_dir; +} + +void ChemMaster::runIteration() { + double seq_a, seq_b, seq_c, seq_d; + double worker_chemistry_a, worker_chemistry_b; + double sim_e_chemistry, sim_f_chemistry; + int pkg_to_send, pkg_to_recv; + int free_workers; + int i_pkgs; + + seq_a = MPI_Wtime(); + grid.shuffleAndExport(mpi_buffer); + // retrieve data from R runtime + iteration = (int)R.parseEval("mysetup$iter"); + dt = (double)R.parseEval("mysetup$requested_dt"); + current_sim_time = + (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt"); + + // setup local variables + pkg_to_send = wp_sizes_vector.size(); + pkg_to_recv = wp_sizes_vector.size(); + work_pointer = mpi_buffer; + free_workers = world_size - 1; + i_pkgs = 0; + + seq_b = MPI_Wtime(); + seq_t += seq_b - seq_a; + + worker_chemistry_a = MPI_Wtime(); + while (pkg_to_recv > 0) { + // TODO: Progressbar into IO instance. 
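+    // Each pass: hand new work packages to idle workers first (sendPkgs),
+    // then collect finished results (recvPkgs); recvPkgs blocks only when
+    // no package can currently be sent.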
+ printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size()); + if (pkg_to_send > 0) { + sendPkgs(pkg_to_send, i_pkgs, free_workers); + } + recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers); + } + + // Just to complete the progressbar + cout << endl; + + worker_chemistry_b = MPI_Wtime(); + worker_t = worker_chemistry_b - worker_chemistry_a; + + seq_c = MPI_Wtime(); + grid.importAndUnshuffle(mpi_buffer); + /* do master stuff */ + sim_e_chemistry = MPI_Wtime(); + R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); + sim_f_chemistry = MPI_Wtime(); + chem_master += sim_f_chemistry - sim_e_chemistry; + seq_d = MPI_Wtime(); + seq_t += seq_d - seq_c; +} + +void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs, + int &free_workers) { + double master_send_a, master_send_b; + int local_work_package_size; + int end_of_wp; + + // start time measurement + master_send_a = MPI_Wtime(); + /*search for free workers and send work*/ + for (int p = 0; p < world_size - 1; p++) { + if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { + // to enable different work_package_size, set local copy of + // work_package_size to either global work_package size or + // remaining 'to_send' packages to_send >= work_package_size ? + // local_work_package_size = work_package_size : + // local_work_package_size = to_send; + + local_work_package_size = (int)wp_sizes_vector[count_pkgs]; + count_pkgs++; + + // cout << "CPP: sending pkg n. " << count_pkgs << " with size " + // << local_work_package_size << endl; + + /*push pointer forward to next work package, after taking the + * current one*/ + workerlist[p].send_addr = work_pointer; + + end_of_wp = local_work_package_size * grid.getCols(); + work_pointer = &(work_pointer[end_of_wp]); + + // fill send buffer starting with work_package ... 
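+      // send_buffer layout: the work-package rows (end_of_wp doubles)
+      // followed by the BUFFER_OFFSET = 5 header doubles: wp_size,
+      // iteration, dt, simulation time, placeholder -- unpacked again
+      // in ChemWorker::doWork()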
+      std::memcpy(send_buffer, workerlist[p].send_addr,
+                  (end_of_wp) * sizeof(double));
+      // followed by: work_package_size
+      send_buffer[end_of_wp] = (double)local_work_package_size;
+      // current iteration of simulation
+      send_buffer[end_of_wp + 1] = (double)iteration;
+      // size of timestep in seconds
+      send_buffer[end_of_wp + 2] = dt;
+      // current time of simulation (age) in seconds
+      send_buffer[end_of_wp + 3] = current_sim_time;
+      // placeholder for work_package_count
+      send_buffer[end_of_wp + 4] = 0.;
+
+      /* ATTENTION: worker p has rank p+1 */
+      MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
+               TAG_WORK, MPI_COMM_WORLD);
+
+      workerlist[p].has_work = 1;
+      free_workers--;
+      pkg_to_send -= 1;
+    }
+  }
+  master_send_b = MPI_Wtime();
+  send_t += master_send_b - master_send_a;
+}
+
+void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) {
+  int need_to_receive = 1;
+  double master_recv_a, master_recv_b;
+  double idle_a, idle_b;
+  int p, size;
+
+  MPI_Status probe_status;
+  master_recv_a = MPI_Wtime();
+  while (need_to_receive && pkg_to_recv > 0) {
+    if (to_send && free_workers > 0)
+      MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive,
+                 &probe_status);
+    else {
+      idle_a = MPI_Wtime();
+      MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status);
+      idle_b = MPI_Wtime();
+      master_idle += idle_b - idle_a;
+    }
+
+    if (need_to_receive) {
+      p = probe_status.MPI_SOURCE;
+      MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
+      MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK,
+               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+      workerlist[p - 1].has_work = 0;
+      pkg_to_recv -= 1;
+      free_workers++;
+    }
+  }
+  master_recv_b = MPI_Wtime();
+  recv_t += master_recv_b - master_recv_a;
+}
+
+void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
+  /* visual progress */
+  double progress = (float)(count_pkgs + 1) / n_wp;
+
+  cout << "[";
+  int pos = barWidth * progress;
+  for (int iprog = 0; iprog < barWidth; ++iprog) {
+    if (iprog < pos)
+      cout << "=";
+    else if (iprog == pos)
+      cout << ">";
+    else
+      cout << " ";
+  }
+  std::cout << "] " << int(progress * 100.0) << " %\r";
+  std::cout.flush();
+  /* end visual progress */
+}
+
+void ChemMaster::prepareSimulation() {
+  workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
+  send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
+                                 sizeof(double));
+
+  R.parseEvalQ(
+      "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
+      "package_size=work_package_size)");
+
+  // we only sort the vector once
+  R.parseEvalQ("ordered_ids <- order(wp_ids)");
+  R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
+  R.parseEval("stat_wp_sizes(wp_sizes_vector)");
+  wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
+
+  mpi_buffer =
+      (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
+}
+
+void ChemMaster::finishSimulation() {
+  free(mpi_buffer);
+  free(workerlist);
+}
+
+double ChemMaster::getSendTime() { return this->send_t; }
+
+double ChemMaster::getRecvTime() { return this->recv_t; }
+
+double ChemMaster::getIdleTime() { return this->master_idle; }
+
+double ChemMaster::getWorkerTime() { return this->worker_t; }
+
+double ChemMaster::getChemMasterTime() { return this->chem_master; }
+
+double ChemMaster::getSeqTime() { return this->seq_t; }
diff --git a/src/model/ChemSim.cpp b/src/model/ChemSim.cpp
new file mode 100644
index 000000000..62b783481
--- /dev/null
+++ b/src/model/ChemSim.cpp
@@ -0,0 +1,23 @@
+#include "ChemSim.h" +#include "../util/RRuntime.h" +#include "Grid.h" +#include +#include +#include + +using namespace Rcpp; +using namespace poet; + +ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_) + : R(R_), grid(grid_) { + this->world_rank = params->world_rank; + this->world_size = params->world_size; + this->wp_size = params->wp_size; + this->out_dir = params->out_dir; +} + +void ChemSim::runSeq() { + R.parseEvalQ( + "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)"); + R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); +} diff --git a/src/model/ChemSim.h b/src/model/ChemSim.h new file mode 100644 index 000000000..0d2c6a615 --- /dev/null +++ b/src/model/ChemSim.h @@ -0,0 +1,115 @@ +#ifndef CHEMSIM_H +#define CHEMSIM_H + +#include "../DHT/DHT_Wrapper.h" +#include "../util/RRuntime.h" +#include "../util/SimParams.h" +#include "Grid.h" + +#include +#include + +#define BUFFER_OFFSET 5 +#define TAG_WORK 42 +#define TAG_FINISH 43 +#define TAG_TIMING 44 +#define TAG_DHT_PERF 45 +#define TAG_DHT_STATS 46 +#define TAG_DHT_STORE 47 +#define TAG_DHT_ITER 48 + +namespace poet { +class ChemSim { +public: + ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_); + void runSeq(); + +protected: + double current_sim_time = 0; + int iteration = 0; + int dt = 0; + + int world_rank; + int world_size; + unsigned int wp_size; + + RRuntime &R; + Grid &grid; + + std::vector wp_sizes_vector; + std::string out_dir; + + double *send_buffer; + typedef struct { + char has_work; + double *send_addr; + } worker_struct; + worker_struct *workerlist; + + double *mpi_buffer; +}; + +class ChemMaster : public ChemSim { +public: + ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_); + + void prepareSimulation(); + void finishSimulation(); + + void runIteration(); + + double getSendTime(); + double getRecvTime(); + double getIdleTime(); + double getWorkerTime(); + double getChemMasterTime(); + double getSeqTime(); + +private: + void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70); + void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers); + void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers); + + unsigned int wp_size; + double *work_pointer; + + double send_t = 0.f; + double recv_t = 0.f; + double master_idle = 0.f; + double worker_t = 0.f; + double chem_master = 0.f; + double seq_t = 0.f; +}; + +class ChemWorker : public ChemSim { +public: + ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_); + + void prepareSimulation(MPI_Comm dht_comm); + void loop(); + +private: + void doWork(MPI_Status &probe_status); + void postIter(); + void finishWork(); + void writeFile(); + void readFile(); + + bool dht_enabled; + bool dt_differ; + int dht_snaps; + std::string dht_file; + unsigned int dht_size_per_process; + std::vector dht_flags; + t_simparams *params; + + double *mpi_buffer_results; + + DHT_Wrapper *dht; + + double timing[3]; + double idle_t = 0.f; + int phreeqc_count = 0; +}; +} // namespace poet +#endif // CHEMSIM_H diff --git a/src/model/ChemWorker.cpp b/src/model/ChemWorker.cpp new file mode 100644 index 000000000..8649fbc11 --- /dev/null +++ b/src/model/ChemWorker.cpp @@ -0,0 +1,325 @@ +#include +#include + +#include +#include + +#include "ChemSim.h" + +using namespace poet; +using namespace std; +using namespace Rcpp; + +ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_) + : params(params_), ChemSim(params_, R_, grid_) { + this->dt_differ = params->dt_differ; + this->dht_enabled = 
+
+ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_)
+    : ChemSim(params_, R_, grid_), params(params_) {
+  this->dt_differ = params->dt_differ;
+  this->dht_enabled = params->dht_enabled;
+  this->dht_size_per_process = params->dht_size_per_process;
+  this->dht_file = params->dht_file;
+  // snapshot policy (assumed to be carried in t_simparams)
+  this->dht_snaps = params->dht_snaps;
+}
+
+void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
+  mpi_buffer = (double *)calloc((wp_size * grid.getCols()) + BUFFER_OFFSET,
+                                sizeof(double));
+  mpi_buffer_results =
+      (double *)calloc(wp_size * grid.getCols(), sizeof(double));
+
+  if (world_rank == 1)
+    cout << "Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF") << endl;
+
+  if (dht_enabled) {
+    int data_size = grid.getCols() * sizeof(double);
+    int key_size =
+        grid.getCols() * sizeof(double) + (dt_differ * sizeof(double));
+    int dht_buckets_per_process =
+        dht_size_per_process / (1 + data_size + key_size);
+
+    if (world_rank == 1)
+      cout << "CPP: Worker: data size: " << data_size << " bytes" << endl
+           << "CPP: Worker: key size: " << key_size << " bytes" << endl
+           << "CPP: Worker: buckets per process: " << dht_buckets_per_process
+           << endl;
+
+    dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size,
+                          key_size);
+
+    if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl;
+  }
+
+  // restoring a DHT snapshot only makes sense when the DHT exists
+  if (dht_enabled && !dht_file.empty()) readFile();
+
+  // set size and default all flags to true
+  dht_flags.assign(params->wp_size, true);
+
+  timing[0] = 0.0;
+  timing[1] = 0.0;
+  timing[2] = 0.0;
+}
+
+void ChemWorker::loop() {
+  MPI_Status probe_status;
+  while (1) {
+    double idle_a = MPI_Wtime();
+    MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
+    double idle_b = MPI_Wtime();
+
+    if (probe_status.MPI_TAG == TAG_WORK) {
+      idle_t += idle_b - idle_a;
+      doWork(probe_status);
+    } else if (probe_status.MPI_TAG == TAG_FINISH) {
+      finishWork();
+      break;
+    } else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
+      postIter();
+    }
+  }
+}
+
+void ChemWorker::doWork(MPI_Status &probe_status) {
+  int count;
+  // persists across calls so R is only updated when the header values change
+  static int local_work_package_size = 0;
+
+  double dht_get_start, dht_get_end;
+  double phreeqc_time_start = 0.0, phreeqc_time_end = 0.0;
+  double dht_fill_start, dht_fill_end;
+
+  /* get number of doubles sent */
+  MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
+
+  /* receive */
+  MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
+           MPI_STATUS_IGNORE);
+
+  // decrement count of work_package by BUFFER_OFFSET
+  count -= BUFFER_OFFSET;
+  // check for changes on all additional variables given by the 'header' of
+  // mpi_buffer
+
+  // work_package_size
+  if (mpi_buffer[count] != local_work_package_size) {
+    local_work_package_size = mpi_buffer[count];
+    R["work_package_size"] = local_work_package_size;
+    R.parseEvalQ("mysetup$work_package_size <- work_package_size");
+  }
+
+  // current iteration of simulation
+  if (mpi_buffer[count + 1] != iteration) {
+    iteration = mpi_buffer[count + 1];
+    R["iter"] = iteration;
+    R.parseEvalQ("mysetup$iter <- iter");
+  }
+
+  // current timestep size
+  if (mpi_buffer[count + 2] != dt) {
+    dt = mpi_buffer[count + 2];
+    R["dt"] = dt;
+    R.parseEvalQ("mysetup$dt <- dt");
+  }
+
+  // current simulation time ('age' of simulation)
+  if (mpi_buffer[count + 3] != current_sim_time) {
+    current_sim_time = mpi_buffer[count + 3];
+    R["simulation_time"] = current_sim_time;
+    R.parseEvalQ("mysetup$simulation_time <- simulation_time");
+  }
+  /* the 5th header double (offset 4) is currently a placeholder */
+  // if (mpi_buffer[count+4] != placeholder) {
+  //   placeholder = mpi_buffer[count+4];
+  //   R["mysetup$placeholder"] = placeholder;
+  // }
+
+  /* get df with right structure to fill in work package */
+  // R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
+  // R["skeleton"] = grid.buildDataFrame(work_package_size);
+  // R.parseEval("print(rownames(tmp2)[1:5])");
+  // R.parseEval("print(head(tmp2, 2))");
+  // R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
+  // Rcpp::DataFrame buffer = R.parseEval("tmp2");
+  // R.setBufferDataFrame("skeleton");
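+
+  // DHT lookup: checkDHT() hashes each row's key (the cell's input state,
+  // plus dt when timestep sizes differ -- cf. key_size in prepareSimulation)
+  // and clears the row's flag on a hit; the cached result is written back
+  // into mpi_buffer, which the result_full merge further below relies on.
+  // Only rows whose flag stays true go through PHREEQC, e.g.
+  //
+  //   dht_flags = {false, true, false, true}
+  //     => rows 1 and 3 go through slave_chemistry(),
+  //        rows 0 and 2 were served from the distributed cache.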
+
+  if (dht_enabled) {
+    // DEBUG
+    // cout << "RANK " << world_rank << " start checking DHT\n";
+
+    // resize helper vector dht_flags if work_package_size changed,
+    // defaulting all flags to true
+    if ((int)dht_flags.size() != local_work_package_size) {
+      dht_flags.assign(local_work_package_size, true);
+    }
+
+    dht_get_start = MPI_Wtime();
+    dht->checkDHT(local_work_package_size, dht_flags, mpi_buffer, dt);
+    dht_get_end = MPI_Wtime();
+
+    // DEBUG
+    // cout << "RANK " << world_rank << " checking DHT complete\n";
+
+    R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
+    // R.parseEvalQ("print(head(dht_flags))");
+  }
+
+  /* work */
+  // R.from_C_domain(mpi_buffer);
+  // convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
+  // R["work_package_full"] = R.getBufferDataFrame();
+  // R["work_package"] = buffer;
+
+  // DEBUG
+  // R.parseEvalQ("print(head(work_package_full))");
+  // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
+
+  grid.importWP(mpi_buffer, params->wp_size);
+
+  if (params->dht_enabled) {
+    R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
+  } else {
+    R.parseEvalQ("work_package <- work_package_full");
+  }
+
+  // DEBUG
+  // R.parseEvalQ("print(head(work_package),2)");
+
+  // R.parseEvalQ("rownames(work_package) <- work_package$id");
+  // R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
+  //             colnames(work_package)");
+  // R.parseEvalQ("id_store <- rownames(work_package)"); // "[, ncol(work_package)]"
+  // R.parseEvalQ("work_package$id <- NULL");
+  R.parseEvalQ("work_package <- as.matrix(work_package)");
+
+  unsigned int nrows = R.parseEval("nrow(work_package)");
+
+  if (nrows > 0) {
+    /* single-row error workaround */
+    if (nrows == 1) {
+      // duplicate the line to enable a correct simulation
+      R.parseEvalQ(
+          "work_package <- work_package[rep(1:nrow(work_package), "
+          "times = 2), ]");
+    }
+
+    phreeqc_count++;
+
+    phreeqc_time_start = MPI_Wtime();
+    // MDL
+    // R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
+    //              2)); cat('RCpp: worker_function:', local_rank, ' \n')");
+    R.parseEvalQ(
+        "result <- as.data.frame(slave_chemistry(setup=mysetup, "
+        "data = work_package))");
+    phreeqc_time_end = MPI_Wtime();
+    // R.parseEvalQ("result$id <- id_store");
+  } else {
+    // cout << "Work-Package is empty, skipping phreeqc!" << endl;
+  }
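+
+  // The duplication above most likely works around R's default drop=TRUE:
+  // subsetting a matrix down to a single row yields a plain vector, which
+  // breaks code expecting a matrix. An alternative fix would be explicit
+  // drop = FALSE indexing, e.g.
+  //
+  //   work_package <- work_package[flagged_rows, , drop = FALSE]
+  //
+  // (flagged_rows is illustrative; this change keeps the original
+  // duplicate-row approach.)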
+
+  if (dht_enabled) {
+    R.parseEvalQ("result_full <- work_package_full");
+    if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result");
+  } else {
+    R.parseEvalQ("result_full <- result");
+  }
+
+  // R.setBufferDataFrame("result_full");
+  // Rcpp::DataFrame result = R.parseEval("result_full");
+  // convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
+  // R.to_C_domain(mpi_buffer_results);
+
+  grid.exportWP(mpi_buffer_results);
+  /* send results to master */
+  MPI_Request send_req;
+  MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
+            &send_req);
+
+  if (dht_enabled) {
+    dht_fill_start = MPI_Wtime();
+    dht->fillDHT(local_work_package_size, dht_flags, mpi_buffer,
+                 mpi_buffer_results, dt);
+    dht_fill_end = MPI_Wtime();
+
+    timing[1] += dht_get_end - dht_get_start;
+    timing[2] += dht_fill_end - dht_fill_start;
+  }
+
+  timing[0] += phreeqc_time_end - phreeqc_time_start;
+
+  MPI_Wait(&send_req, MPI_STATUS_IGNORE);
+}
+
+void ChemWorker::postIter() {
+  MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_ITER, MPI_COMM_WORLD,
+           MPI_STATUS_IGNORE);
+  if (dht_enabled) {
+    dht->printStatistics();
+
+    if (dht_snaps == 2) {
+      writeFile();
+    }
+  }
+  // synchronize all processes
+  MPI_Barrier(MPI_COMM_WORLD);
+}
+
+void ChemWorker::writeFile() {
+  std::stringstream out;
+  out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
+  int res = dht->tableToFile(out.str().c_str());
+  if (res != DHT_SUCCESS && world_rank == 2)
+    cerr << "CPP: Worker: Error writing current state of DHT to file."
+         << endl;
+  else if (world_rank == 2)
+    cout << "CPP: Worker: Successfully wrote DHT to file " << out.str()
+         << endl;
+}
+
+void ChemWorker::readFile() {
+  int res = dht->fileToTable((char *)dht_file.c_str());
+  if (res != DHT_SUCCESS) {
+    if (res == DHT_WRONG_FILE) {
+      if (world_rank == 1)
+        cerr << "CPP: Worker: Wrong file layout! Continuing with empty DHT ..."
+             << endl;
+    } else {
+      if (world_rank == 1)
+        cerr << "CPP: Worker: Error loading current state of DHT from "
+                "file. Continuing with empty DHT ..."
+             << endl;
+    }
+  } else {
+    if (world_rank == 2)
+      cout << "CPP: Worker: Successfully loaded state of DHT from file "
+           << dht_file << endl;
+    std::cout.flush();
+  }
+}
+
+void ChemWorker::finishWork() {
+  if (dht_enabled && dht_snaps > 0) writeFile();
+
+  // hit/miss/eviction counters; integral type matches MPI_UNSIGNED_LONG_LONG
+  unsigned long long dht_perf[3];
+  /* before death, submit profiling/timings to master */
+  MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
+           MPI_STATUS_IGNORE);
+
+  // timings
+  MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
+
+  MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
+  MPI_Send(&idle_t, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
+
+  if (dht_enabled) {
+    // dht_perf
+    dht_perf[0] = dht->getHits();
+    dht_perf[1] = dht->getMisses();
+    dht_perf[2] = dht->getEvictions();
+    MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
+             MPI_COMM_WORLD);
+  }
+
+  free(mpi_buffer);
+  free(mpi_buffer_results);
+  delete dht;
+}
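+
+// Shutdown handshake, master side (illustrative sketch mirroring
+// finishWork(); the real counterpart lives in ChemMaster and may differ):
+//
+//   for (int p = 1; p < world_size; p++)
+//     MPI_Send(NULL, 0, MPI_DOUBLE, p, TAG_FINISH, MPI_COMM_WORLD);
+//   for (int p = 1; p < world_size; p++) {
+//     MPI_Recv(timing, 3, MPI_DOUBLE, p, TAG_TIMING, MPI_COMM_WORLD,
+//              MPI_STATUS_IGNORE);
+//     MPI_Recv(&phreeqc_count, 1, MPI_INT, p, TAG_TIMING, MPI_COMM_WORLD,
+//              MPI_STATUS_IGNORE);
+//     MPI_Recv(&idle_t, 1, MPI_DOUBLE, p, TAG_TIMING, MPI_COMM_WORLD,
+//              MPI_STATUS_IGNORE);
+//     if (dht_enabled)
+//       MPI_Recv(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, p, TAG_DHT_PERF,
+//                MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+//   }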