diff --git a/src/DHT.cpp b/src/DHT.cpp deleted file mode 100644 index 546f7aa50..000000000 --- a/src/DHT.cpp +++ /dev/null @@ -1,423 +0,0 @@ -#include "DHT.h" -#include -#include -#include -#include - -/* - * determining destination rank and index by hash of key - * - * return values by reference - */ -static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) { - uint64_t tmp; - int char_hop = 9-index_count; - unsigned int i; - for (i = 0; i < index_count ; i++) { - tmp = 0; - memcpy(&tmp,(unsigned char *)&hash+i, char_hop); - index[i] = (unsigned int) (tmp % table_size); - } - *dest_rank = (unsigned int) (hash % comm_size); -} - -/** - * set write flag to 1 - */ -static void set_flag(char* flag_byte) { - *flag_byte = 0; - *flag_byte |= (1 << 0); -} - -/** - * return 1 if write flag is set - * else 0 - */ -static int read_flag(char flag_byte) { - if ((flag_byte & 0x01) == 0x01) { - return 1; - } else return 0; -} - -/* - * allocating memory for DHT object and buckets. - * creating MPI window for OSC - * filling DHT object with passed by parameters, window, 2 counters for R/W errors and 2 pointers with allocated memory for further use - * return DHT object - */ -DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t(*hash_func) (int, void*)) { - DHT *object; - MPI_Win window; - void* mem_alloc; - int comm_size, tmp; - - tmp = (int) ceil(log2(size)); - if (tmp%8 != 0) tmp = tmp + (8-(tmp%8)); - - object = (DHT*) malloc(sizeof(DHT)); - if (object == NULL) return NULL; - - //every memory allocation has 1 additional byte for flags etc. - if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL; - if (MPI_Comm_size(comm, &comm_size) != 0) return NULL; - - memset(mem_alloc, '\0', size * (1 + data_size + key_size)); - - if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL; - - object->data_size = data_size; - object->key_size = key_size; - object->table_size = size; - object->window = window; - object->hash_func = hash_func; - object->comm_size = comm_size; - object->communicator = comm; - object->read_misses = 0; - object->collisions = 0; - object->recv_entry = malloc(1 + data_size + key_size); - object->send_entry = malloc(1 + data_size + key_size); - object->index_count = 9-(tmp/8); - object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int)); - object->mem_alloc = mem_alloc; - - DHT_stats *stats; - - stats = (DHT_stats*) malloc(sizeof(DHT_stats)); - if (stats == NULL) return NULL; - - object->stats = stats; - object->stats->writes_local = (int*) calloc(comm_size, sizeof(int)); - object->stats->old_writes = 0; - object->stats->read_misses = 0; - object->stats->collisions = 0; - object->stats->w_access = 0; - object->stats->r_access = 0; - - return object; -} - -/* - * puts passed by data with key to DHT - * - * returning DHT_MPI_ERROR = -1 if MPI error occurred - * else DHT_SUCCESS = 0 if success - */ -int DHT_write(DHT *table, void* send_key, void* send_data) { - unsigned int dest_rank, i; - int result = DHT_SUCCESS; - - table->stats->w_access++; - - //determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); - - //concatenating key with data to write entry to DHT - set_flag((char *) table->send_entry); - memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size); - memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size); - - //locking window of target rank with exclusive lock - if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) - return DHT_MPI_ERROR; - for (i = 0; i < table->index_count; i++) - { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - - //increment collision counter if receiving key doesn't match sending key - //,entry has write flag + last index is reached - - if (read_flag(*(char *)table->recv_entry)) { - if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) { - if (i == (table->index_count)-1) { - table->collisions += 1; - table->stats->collisions += 1; - result = DHT_WRITE_SUCCESS_WITH_COLLISION; - break; - } - } else break; - } else { - table->stats->writes_local[dest_rank]++; - break; - } - } - - //put data to DHT - if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - //unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - - return result; -} - -/* - * gets data from the DHT by key - * - * return DHT_SUCCESS = 0 if success - * DHT_MPI_ERROR = -1 if MPI error occurred - * DHT_READ_ERROR = -2 if receiving key doesn't match sending key - */ -int DHT_read(DHT *table, void* send_key, void* destination) { - unsigned int dest_rank, i; - - table->stats->r_access++; - - //determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count); - - //locking window of target rank with shared lock - if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR; - //receive data - for (i = 0; i < table->index_count; i++) { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - - //increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached - //else copy data to dereference of passed by destination pointer - - if ((read_flag(*(char *) table->recv_entry)) == 0) { - table->read_misses += 1; - table->stats->read_misses += 1; - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return DHT_READ_ERROR; - } - - if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) { - if (i == (table->index_count)-1) { - table->read_misses += 1; - table->stats->read_misses += 1; - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - return DHT_READ_ERROR; - } - } else break; - } - - //unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR; - - memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size); - - return DHT_SUCCESS; -} - -int DHT_to_file(DHT* table, char* filename) { - //open file - MPI_File file; - if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; - - int rank; - MPI_Comm_rank(table->communicator, &rank); - - //write header (key_size and data_size) - if (rank == 0) { - if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; - if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; - } - - if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - - char* ptr; - int bucket_size = table->key_size + table->data_size + 1; - - //iterate over local memory - for (unsigned int i = 0; i < table->table_size; i++) { - ptr = (char *) table->mem_alloc + (i * bucket_size); - //if bucket has been written to (checked by written_flag)... - if (read_flag(*ptr)) { - //write key and data to file - if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR; - } - } - //close file - if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; - - return DHT_SUCCESS; -} - -int DHT_from_file(DHT* table, char* filename) { - MPI_File file; - MPI_Offset f_size; - int e_size, m_size, cur_pos, rank, offset; - char* buffer; - void* key; - void* data; - - if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR; - - if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR; - - MPI_Comm_rank(table->communicator, &rank); - - e_size = table->key_size + table->data_size; - m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE; - buffer = (char *) malloc(m_size); - - if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; - - if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE; - if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE; - - offset = e_size*table->comm_size; - - if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - cur_pos = DHT_HEADER_SIZE + (rank * e_size); - - while(cur_pos < f_size) { - if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR; - MPI_Offset tmp; - MPI_File_get_position(file, &tmp); - if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR; - key = buffer; - data = (buffer+table->key_size); - if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR; - - cur_pos += offset; - } - - free (buffer); - if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR; - - return DHT_SUCCESS; -} - -/* - * frees up memory and accumulate counter - */ -int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) { - int buf; - - if (collision_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - *collision_counter = buf; - } - if (readerror_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - *readerror_counter = buf; - } - if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR; - if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR; - free(table->recv_entry); - free(table->send_entry); - free(table->index); - - free(table->stats->writes_local); - free(table->stats); - - free(table); - - return DHT_SUCCESS; -} - -/* - * prints a table with statistics about current use of DHT - * for each participating process and summed up results containing: - * 1. occupied buckets (in respect to the memory of this process) - * 2. free buckets (in respect to the memory of this process) - * 3. calls of DHT_write (w_access) - * 4. calls of DHT_read (r_access) - * 5. read misses (see DHT_READ_ERROR) - * 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION) - * 3-6 will reset with every call of this function - * finally the amount of new written entries is printed out (in relation to last call of this funtion) - */ -int DHT_print_statistics(DHT *table) { - int *written_buckets; - int *read_misses, sum_read_misses; - int *collisions, sum_collisions; - int sum_w_access, sum_r_access, *w_access, *r_access; - int rank; - - MPI_Comm_rank(table->communicator, &rank); - - //disable possible warning of unitialized variable, which is not the case - //since we're using MPI_Gather to obtain all values only on rank 0 - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" - - //obtaining all values from all processes in the communicator - if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->read_misses = 0; - - if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->collisions = 0; - - if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->w_access = 0; - - if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int)); - if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - table->stats->r_access = 0; - - if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int)); - if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR; - - if (rank == 0) { //only process with rank 0 will print out results as a table - int sum_written_buckets = 0; - - for (int i=0; i < table->comm_size; i++) { - sum_written_buckets += written_buckets[i]; - } - - int members = 7; - int padsize = (members*12)+1; - char pad[padsize+1]; - - memset(pad, '-', padsize*sizeof(char)); - pad[padsize]= '\0'; - printf("\n"); - printf("%-35s||resets with every call of this function\n", " "); - printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", - "rank", - "occupied", - "free", - "w_access", - "r_access", - "read misses", - "collisions"); - printf("%s\n", pad); - for (int i = 0; i < table->comm_size; i++) { - printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", - i, - written_buckets[i], - table->table_size-written_buckets[i], - w_access[i], - r_access[i], - read_misses[i], - collisions[i]); - } - printf("%s\n", pad); - printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", - "sum", - sum_written_buckets, - (table->table_size*table->comm_size)-sum_written_buckets, - sum_w_access, - sum_r_access, - sum_read_misses, - sum_collisions); - - printf("%s\n", pad); - printf("%s %d\n", - "new entries:", - sum_written_buckets - table->stats->old_writes); - - printf("\n"); - fflush(stdout); - - table->stats->old_writes = sum_written_buckets; - } - - //enable warning again - #pragma GCC diagnostic pop - - MPI_Barrier(table->communicator); - return DHT_SUCCESS; -} \ No newline at end of file diff --git a/src/DHT.h b/src/DHT.h deleted file mode 100644 index 1b118e690..000000000 --- a/src/DHT.h +++ /dev/null @@ -1,112 +0,0 @@ -/* - * File: DHT.h - * Author: max luebke - * - * Created on 16. November 2017, 09:14 - */ - -#ifndef DHT_H -#define DHT_H - -#include -#include - -#define DHT_MPI_ERROR -1 -#define DHT_READ_ERROR -2 -#define DHT_SUCCESS 0 -#define DHT_WRITE_SUCCESS_WITH_COLLISION 1 - -#define DHT_WRONG_FILE 11 -#define DHT_FILE_IO_ERROR 12 -#define DHT_FILE_READ_ERROR 13 -#define DHT_FILE_WRITE_ERROR 14 - -#define DHT_HEADER_SIZE 8 - -typedef struct {; - int *writes_local, old_writes; - int read_misses, collisions; - int w_access, r_access; -} DHT_stats; - -typedef struct { - MPI_Win window; - int data_size; - int key_size; - unsigned int table_size; - MPI_Comm communicator; - int comm_size; - uint64_t(*hash_func) (int, void*); - void* recv_entry; - void* send_entry; - void* mem_alloc; - int read_misses; - int collisions; - unsigned int *index; - unsigned int index_count; - DHT_stats *stats; -} DHT; - - - -/* - * parameters: - * MPI_Comm comm - communicator of processes that are holding the DHT - * int size_per_process - number of buckets each process will create - * int data_size - size of data in bytes - * int key_size - size of key in bytes - * *hash_func - pointer to hashfunction - * - * return: - * NULL if error during initialization - * DHT* if success - */ -extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*)); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * void* data - pointer to data - * void* - pointer to key - * - * return: - * error value (see above) - */ -extern int DHT_write(DHT *table, void* key, void* data); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * void* key - pointer to key - * void* destination - pointer which will hold the resulting data from DHT - * - * return: - * error value (see above) - */ -extern int DHT_read(DHT *table, void* key, void* destination); - -extern int DHT_to_file(DHT *table, char* filename); - -extern int DHT_from_file(DHT *table, char* filename); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * int* collision_counter - pointer which will hold the total count of collisions - * int* readerror_counter - pointer which will hold the total count of read errors - * - * return: - * error value (see above) - */ -extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter); - -/* - * parameters: - * DHT *table - DHT_object created by DHT_create - * - * return: - * error value (DHT_SUCCESS or DHT_MPI_ERROR) - */ -extern int DHT_print_statistics(DHT *table); - -#endif /* DHT_H */ \ No newline at end of file diff --git a/src/dht_wrapper.cpp b/src/dht_wrapper.cpp deleted file mode 100644 index f03707c45..000000000 --- a/src/dht_wrapper.cpp +++ /dev/null @@ -1,176 +0,0 @@ -#include "dht_wrapper.h" -#include - -using namespace poet; - -/*init globals*/ -//bool dht_enabled; -//int dht_snaps; -//int dht_strategy; -//int dht_significant_digits; -//std::string dht_file; -//std::vector dht_significant_digits_vector; -//std::vector prop_type_vector; -//bool dht_logarithm; -//uint64_t dht_size_per_process; -uint64_t dht_hits, dht_miss, dht_collision; -RInside *R_DHT; -std::vector dht_flags; -DHT *dht_object; - -double *fuzzing_buffer; - -//bool dt_differ; -/*functions*/ - -uint64_t get_md5(int key_size, void *key) { - MD5_CTX ctx; - unsigned char sum[MD5_DIGEST_LENGTH]; - uint64_t retval, *v1, *v2; - - MD5_Init(&ctx); - MD5_Update(&ctx, key, key_size); - MD5_Final(sum, &ctx); - - v1 = (uint64_t *)&sum[0]; - v2 = (uint64_t *)&sum[8]; - retval = *v1 ^ *v2; - - return retval; -} - -// double Round_off(double N, double n) { -// double result; -// R["roundsig"] = n; -// R["roundin"] = N; -// -// result = R.parseEval("signif(roundin, digits=roundsig)"); -// -// return result; -//} - -/* - * Stores fuzzed version of key in fuzzing_buffer - */ -void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params) { - unsigned int i = 0; - // introduce fuzzing to allow more hits in DHT - for (i = 0; i < (unsigned int)var_count; i++) { - if (params->dht_prop_type_vector[i] == "act") { - // with log10 - if (params->dht_log) { - if (((double *)key)[i] < 0) - cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in " - "key!" - << endl; - else if (((double *)key)[i] == 0) - fuzzing_buffer[i] = 0; - else - fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), - params->dht_signif_vector[i]); - } else { - // without log10 - fuzzing_buffer[i] = - ROUND((((double *)key)[i]), params->dht_signif_vector[i]); - } - } else if (params->dht_prop_type_vector[i] == "logact") { - fuzzing_buffer[i] = - ROUND((((double *)key)[i]), params->dht_signif_vector[i]); - } else if (params->dht_prop_type_vector[i] == "ignore") { - fuzzing_buffer[i] = 0; - } else { - cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong " - "prop_type!" - << endl; - } - } - if (params->dt_differ) - fuzzing_buffer[var_count] = dt; -} - -void check_dht(int length, std::vector &out_result_index, - double *work_package, double dt, t_simparams *params) { - void *key; - int res; - int var_count = params->dht_prop_type_vector.size(); - for (int i = 0; i < length; i++) { - key = (void *)&(work_package[i * var_count]); - - // fuzz data (round, logarithm etc.) - fuzz_for_dht(var_count, key, dt, params); - - // overwrite input with data from DHT, IF value is found in DHT - res = DHT_read(dht_object, fuzzing_buffer, key); - - if (res == DHT_SUCCESS) { - // flag that this line is replaced by DHT-value, do not simulate!! - out_result_index[i] = false; - dht_hits++; - } else if (res == DHT_READ_ERROR) { - // this line is untouched, simulation is needed - out_result_index[i] = true; - dht_miss++; - } else { - // MPI ERROR ... WHAT TO DO NOW? - // RUNNING CIRCLES WHILE SCREAMING - } - } -} - -void fill_dht(int length, std::vector &result_index, double *work_package, - double *results, double dt, t_simparams *params) { - void *key; - void *data; - int res; - int var_count = params->dht_prop_type_vector.size(); - - for (int i = 0; i < length; i++) { - key = (void *)&(work_package[i * var_count]); - data = (void *)&(results[i * var_count]); - - if (result_index[i]) { - // If true -> was simulated, needs to be inserted into dht - - // fuzz data (round, logarithm etc.) - fuzz_for_dht(var_count, key, dt, params); - - res = DHT_write(dht_object, fuzzing_buffer, data); - - if (res != DHT_SUCCESS) { - if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) { - dht_collision++; - } else { - // MPI ERROR ... WHAT TO DO NOW? - // RUNNING CIRCLES WHILE SCREAMING - } - } - } - } -} - -void print_statistics() { - int res; - - res = DHT_print_statistics(dht_object); - - if (res != DHT_SUCCESS) { - // MPI ERROR ... WHAT TO DO NOW? - // RUNNING CIRCLES WHILE SCREAMING - } -} - -int table_to_file(char *filename) { - int res = DHT_to_file(dht_object, filename); - return res; -} - -int file_to_table(char *filename) { - - int res = DHT_from_file(dht_object, filename); - if (res != DHT_SUCCESS) - return res; - - DHT_print_statistics(dht_object); - - return DHT_SUCCESS; -} diff --git a/src/dht_wrapper.h b/src/dht_wrapper.h deleted file mode 100644 index f6e6cb7f5..000000000 --- a/src/dht_wrapper.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once -#include "DHT.h" -#include "util/RRuntime.h" -#include "util/SimParams.h" -#include -#include -#include - -using namespace std; -using namespace poet; - -/*Functions*/ -uint64_t get_md5(int key_size, void *key); -void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params); -void check_dht(int length, std::vector &out_result_index, - double *work_package, double dt, t_simparams *params); -void fill_dht(int length, std::vector &result_index, double *work_package, - double *results, double dt, t_simparams *params); -void print_statistics(); -int table_to_file(char *filename); -int file_to_table(char *filename); - -/*globals*/ -//extern bool dht_enabled; -//extern int dht_snaps; -//extern std::string dht_file; -//extern bool dt_differ; - -// Default DHT Size per process in Byte (defaults to 1 GiB) -#define DHT_SIZE_PER_PROCESS 1073741824 - -// sets default dht access and distribution strategy -#define DHT_STRATEGY 0 -// 0 -> DHT is on workers, access from workers only -// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED -// YET! - -#define ROUND(value, signif) \ - (((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif)) - -//extern int dht_strategy; -//extern int dht_significant_digits; -//extern std::vector dht_significant_digits_vector; -//extern std::vector prop_type_vector; -//extern bool dht_logarithm; -//extern uint64_t dht_size_per_process; - -// global DHT object, can be NULL if not initialized, check strategy -extern DHT *dht_object; - -// DHT Performance counter -extern uint64_t dht_hits, dht_miss, dht_collision; - -extern double *fuzzing_buffer; -extern std::vector dht_flags; diff --git a/src/global_buffer.h b/src/global_buffer.h deleted file mode 100644 index 69cc742f2..000000000 --- a/src/global_buffer.h +++ /dev/null @@ -1,9 +0,0 @@ -#pragma once - -#define BUFFER_OFFSET 5 - -/*Globals*/ -extern double* mpi_buffer; -extern double* mpi_buffer_results; - -extern uint32_t work_package_size; \ No newline at end of file diff --git a/src/r_utils.cpp b/src/r_utils.cpp deleted file mode 100644 index eafa33fa6..000000000 --- a/src/r_utils.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "r_utils.h" - -/* This function converts a pure double dataframe into a double array. - buffer <- double array, needs to be allocated before - df <- reference to input dataframe -*/ -void convert_R_Dataframe_2_C_buffer(double* buffer, Rcpp::DataFrame &df) -{ - size_t rowCount = df.nrow(); - size_t colCount = df.ncol(); - - for (size_t i = 0; i < rowCount; i++) - { - for (size_t j = 0; j < colCount; j++) - { - /* Access column vector j and extract value of line i */ - Rcpp::DoubleVector col = df[j]; - buffer[i * colCount + j] = col[i]; - } - } -} - -/* This function converts a double array into a double dataframe. - buffer <- input double array - df <- reference to output dataframe, needs to be of fitting size, structure will be taken from it -*/ -void convert_C_buffer_2_R_Dataframe(double* buffer, Rcpp::DataFrame &df) -{ - size_t rowCount = df.nrow(); - size_t colCount = df.ncol(); - - for (size_t i = 0; i < rowCount; i++) - { - for (size_t j = 0; j < colCount; j++) - { - /* Access column vector j and extract value of line i */ - Rcpp::DoubleVector col = df[j]; - col[i] = buffer[i * colCount + j]; - } - } -} - - - diff --git a/src/r_utils.h b/src/r_utils.h deleted file mode 100644 index 95000e97f..000000000 --- a/src/r_utils.h +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once -#include - -/*Functions*/ -void convert_R_Dataframe_2_C_buffer(double* buffer, Rcpp::DataFrame &df); -void convert_C_buffer_2_R_Dataframe(double* buffer, Rcpp::DataFrame &df); \ No newline at end of file diff --git a/src/worker.cpp b/src/worker.cpp deleted file mode 100644 index 60643530d..000000000 --- a/src/worker.cpp +++ /dev/null @@ -1,301 +0,0 @@ -#include "worker.h" -#include "dht_wrapper.h" -#include "global_buffer.h" -#include "model/Grid.h" -#include "util/RRuntime.h" -#include -#include -#include - -using namespace poet; -using namespace Rcpp; - -void worker_function(t_simparams *params) { - RRuntime R = *(static_cast(params->R)); - Grid grid = *(static_cast(params->grid)); - int world_rank; - MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); - MPI_Status probe_status; - int count; - - int local_work_package_size; - int iteration; - double dt, current_sim_time; - - double idle_a, idle_b; - double cummul_idle = 0.f; - - double dht_get_start = 0, dht_get_end = 0; - double dht_fill_start = 0, dht_fill_end = 0; - double phreeqc_time_start = 0, phreeqc_time_end = 0; - int phreeqc_count = 0; - - // timing[0] -> phreeqc - // timing[1] -> dht_get - // timing[2] -> dht_fill - double timing[3]; - timing[0] = 0.0; - timing[1] = 0.0; - timing[2] = 0.0; - - // dht_perf[0] -> hits - // dht_perf[1] -> miss - // dht_perf[2] -> collisions - uint64_t dht_perf[3]; - - if (params->dht_enabled) { - dht_flags.resize(params->wp_size, true); // set size - dht_flags.assign(params->wp_size, - true); // assign all elements to true (default) - dht_hits = 0; - dht_miss = 0; - dht_collision = 0; - - // MDL: This code has now been moved to kin.cpp - // /*Load significance vector from R setup file (or set default)*/ - // bool signif_vector_exists = R.parseEval("exists('signif_vector')"); - // if (signif_vector_exists) - // { - // dht_significant_digits_vector = - // as>(R["signif_vector"]); - // } else - // { - // dht_significant_digits_vector.assign(dht_object->key_size / - // sizeof(double), dht_significant_digits); - // } - - // /*Load property type vector from R setup file (or set default)*/ - // bool prop_type_vector_exists = R.parseEval("exists('prop_type')"); - // if (prop_type_vector_exists) - // { - // prop_type_vector = as>(R["prop_type"]); - // } else - // { - // prop_type_vector.assign(dht_object->key_size / sizeof(double), - // "normal"); - // } - } - - // initialization of helper variables - iteration = 0; - dt = 0; - current_sim_time = 0; - local_work_package_size = 0; - - /*worker loop*/ - while (1) { - /*Wait for Message*/ - idle_a = MPI_Wtime(); - MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status); - idle_b = MPI_Wtime(); - - if (probe_status.MPI_TAG == TAG_WORK) { /* do work */ - - cummul_idle += idle_b - idle_a; - - /* get number of doubles sent */ - MPI_Get_count(&probe_status, MPI_DOUBLE, &count); - - /* receive */ - MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - - // decrement count of work_package by BUFFER_OFFSET - count -= BUFFER_OFFSET; - // check for changes on all additional variables given by the 'header' of - // mpi_buffer - if (mpi_buffer[count] != local_work_package_size) { // work_package_size - local_work_package_size = mpi_buffer[count]; - R["work_package_size"] = local_work_package_size; - R.parseEvalQ("mysetup$work_package_size <- work_package_size"); - } - if (mpi_buffer[count + 1] != - iteration) { // current iteration of simulation - iteration = mpi_buffer[count + 1]; - R["iter"] = iteration; - R.parseEvalQ("mysetup$iter <- iter"); - } - if (mpi_buffer[count + 2] != dt) { // current timestep size - dt = mpi_buffer[count + 2]; - R["dt"] = dt; - R.parseEvalQ("mysetup$dt <- dt"); - } - if (mpi_buffer[count + 3] != - current_sim_time) { // current simulation time ('age' of simulation) - current_sim_time = mpi_buffer[count + 3]; - R["simulation_time"] = current_sim_time; - R.parseEvalQ("mysetup$simulation_time <- simulation_time"); - } - /* 4th double value is currently a placeholder */ - // if (mpi_buffer[count+4] != placeholder) { - // placeholder = mpi_buffer[count+4]; - // R["mysetup$placeholder"] = placeholder; - // } - - /* get df with right structure to fill in work package */ - // R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)"); - // R["skeleton"] = grid.buildDataFrame(work_package_size); - //// R.parseEval("print(rownames(tmp2)[1:5])"); - //// R.parseEval("print(head(tmp2, 2))"); - //// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))"); - - ////Rcpp::DataFrame buffer = R.parseEval("tmp2"); - // R.setBufferDataFrame("skeleton"); - - if (params->dht_enabled) { - // DEBUG - // cout << "RANK " << world_rank << " start checking DHT\n"; - - // resize helper vector dht_flags of work_package_size changes - if ((int)dht_flags.size() != local_work_package_size) { - dht_flags.resize(local_work_package_size, true); // set size - dht_flags.assign(local_work_package_size, - true); // assign all elements to true (default) - } - - dht_get_start = MPI_Wtime(); - check_dht(local_work_package_size, dht_flags, mpi_buffer, dt, params); - dht_get_end = MPI_Wtime(); - - // DEBUG - // cout << "RANK " << world_rank << " checking DHT complete \n"; - - R["dht_flags"] = as(wrap(dht_flags)); - // R.parseEvalQ("print(head(dht_flags))"); - } - - /* work */ - // R.from_C_domain(mpi_buffer); - ////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer); - // R["work_package_full"] = R.getBufferDataFrame(); - // R["work_package"] = buffer; - - // DEBUG - // R.parseEvalQ("print(head(work_package_full))"); - // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )"); - - grid.importWP(mpi_buffer, params->wp_size); - - if (params->dht_enabled) { - R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); - } else { - R.parseEvalQ("work_package <- work_package_full"); - } - - // DEBUG - // R.parseEvalQ("print(head(work_package),2)"); - - // R.parseEvalQ("rownames(work_package) <- work_package$id"); - // R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in% - // colnames(work_package)"); R.parseEvalQ("id_store <- - // rownames(work_package)"); //"[, ncol(work_package)]"); - // R.parseEvalQ("work_package$id <- NULL"); - R.parseEvalQ("work_package <- as.matrix(work_package)"); - - unsigned int nrows = R.parseEval("nrow(work_package)"); - - if (nrows > 0) { - /*Single Line error Workaround*/ - if (nrows <= 1) { - // duplicate line to enable correct simmulation - R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), " - "times = 2), ]"); - } - - phreeqc_count++; - - phreeqc_time_start = MPI_Wtime(); - // MDL - // R.parseEvalQ("print('Work_package:\n'); print(head(work_package , - // 2)); cat('RCpp: worker_function:', local_rank, ' \n')"); - R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, " - "data = work_package))"); - phreeqc_time_end = MPI_Wtime(); - // R.parseEvalQ("result$id <- id_store"); - } else { - // cout << "Work-Package is empty, skipping phreeqc!" << endl; - } - - if (params->dht_enabled) { - R.parseEvalQ("result_full <- work_package_full"); - if (nrows > 0) - R.parseEvalQ("result_full[dht_flags,] <- result"); - } else { - R.parseEvalQ("result_full <- result"); - } - - // R.setBufferDataFrame("result_full"); - ////Rcpp::DataFrame result = R.parseEval("result_full"); - ////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result); - // R.to_C_domain(mpi_buffer_results); - - grid.exportWP(mpi_buffer_results); - /* send results to master */ - MPI_Request send_req; - MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, - MPI_COMM_WORLD, &send_req); - - if (params->dht_enabled) { - dht_fill_start = MPI_Wtime(); - fill_dht(local_work_package_size, dht_flags, mpi_buffer, - mpi_buffer_results, dt, params); - dht_fill_end = MPI_Wtime(); - - timing[1] += dht_get_end - dht_get_start; - timing[2] += dht_fill_end - dht_fill_start; - } - - timing[0] += phreeqc_time_end - phreeqc_time_start; - - MPI_Wait(&send_req, MPI_STATUS_IGNORE); - - } else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */ - /* before death, submit profiling/timings to master*/ - - MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - - // timings - MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD); - - MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD); - MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD); - - if (params->dht_enabled) { - // dht_perf - dht_perf[0] = dht_hits; - dht_perf[1] = dht_miss; - dht_perf[2] = dht_collision; - MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF, - MPI_COMM_WORLD); - } - break; - - } else if ((probe_status.MPI_TAG == TAG_DHT_STATS)) { - MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_STATS, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - print_statistics(); - MPI_Barrier(MPI_COMM_WORLD); - - } else if ((probe_status.MPI_TAG == TAG_DHT_STORE)) { - char *outdir; - MPI_Get_count(&probe_status, MPI_CHAR, &count); - outdir = (char *)calloc(count + 1, sizeof(char)); - MPI_Recv(outdir, count, MPI_CHAR, 0, TAG_DHT_STORE, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - int res = table_to_file((char *)outdir); - if (res != DHT_SUCCESS) { - if (world_rank == 2) - cerr << "CPP: Worker: Error in writing current state of DHT to file " - "(TAG_DHT_STORE)" - << endl; - } else { - if (world_rank == 2) - cout << "CPP: Worker: Successfully written DHT to file " << outdir - << endl; - } - free(outdir); - MPI_Barrier(MPI_COMM_WORLD); - } - } -} diff --git a/src/worker.h b/src/worker.h deleted file mode 100644 index 076ede6e2..000000000 --- a/src/worker.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once -#include "util/SimParams.h" -#include "util/RRuntime.h" -#include "model/Grid.h" - -using namespace std; -using namespace poet; -/*Functions*/ -void worker_function(t_simparams *params); - - -/*Globals*/ -#define TAG_WORK 42 -#define TAG_FINISH 43 -#define TAG_TIMING 44 -#define TAG_DHT_PERF 45 -#define TAG_DHT_STATS 46 -#define TAG_DHT_STORE 47