Substitute ChemSim

This commit is contained in:
Max Lübke 2021-01-05 18:07:27 +01:00
parent cdcadced2a
commit 0ed1ea9fd8
11 changed files with 1552 additions and 366 deletions

8
.gitignore vendored
View File

@ -137,12 +137,14 @@ vignettes/*.pdf
# R package: bookdown caching files
/*_files/
### vscode ###
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/c,c++,r,cmake

View File

@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})
#define program libraries
add_library(POET_Libs STATIC util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp model/Grid.cpp)
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp)
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)

423
src/DHT/DHT.cpp Normal file
View File

@ -0,0 +1,423 @@
#include "DHT.h"
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <stdio.h>
/*
* determining destination rank and index by hash of key
*
* return values by reference
*/
static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) {
uint64_t tmp;
int char_hop = 9-index_count;
unsigned int i;
for (i = 0; i < index_count ; i++) {
tmp = 0;
memcpy(&tmp,(unsigned char *)&hash+i, char_hop);
index[i] = (unsigned int) (tmp % table_size);
}
*dest_rank = (unsigned int) (hash % comm_size);
}
/**
* set write flag to 1
*/
static void set_flag(char* flag_byte) {
*flag_byte = 0;
*flag_byte |= (1 << 0);
}
/**
* return 1 if write flag is set
* else 0
*/
static int read_flag(char flag_byte) {
if ((flag_byte & 0x01) == 0x01) {
return 1;
} else return 0;
}
/*
* allocating memory for DHT object and buckets.
* creating MPI window for OSC
* filling DHT object with passed by parameters, window, 2 counters for R/W errors and 2 pointers with allocated memory for further use
* return DHT object
*/
DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t(*hash_func) (int, void*)) {
DHT *object;
MPI_Win window;
void* mem_alloc;
int comm_size, tmp;
tmp = (int) ceil(log2(size));
if (tmp%8 != 0) tmp = tmp + (8-(tmp%8));
object = (DHT*) malloc(sizeof(DHT));
if (object == NULL) return NULL;
//every memory allocation has 1 additional byte for flags etc.
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL;
if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;
memset(mem_alloc, '\0', size * (1 + data_size + key_size));
if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL;
object->data_size = data_size;
object->key_size = key_size;
object->table_size = size;
object->window = window;
object->hash_func = hash_func;
object->comm_size = comm_size;
object->communicator = comm;
object->read_misses = 0;
object->collisions = 0;
object->recv_entry = malloc(1 + data_size + key_size);
object->send_entry = malloc(1 + data_size + key_size);
object->index_count = 9-(tmp/8);
object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int));
object->mem_alloc = mem_alloc;
DHT_stats *stats;
stats = (DHT_stats*) malloc(sizeof(DHT_stats));
if (stats == NULL) return NULL;
object->stats = stats;
object->stats->writes_local = (int*) calloc(comm_size, sizeof(int));
object->stats->old_writes = 0;
object->stats->read_misses = 0;
object->stats->collisions = 0;
object->stats->w_access = 0;
object->stats->r_access = 0;
return object;
}
/*
* puts passed by data with key to DHT
*
* returning DHT_MPI_ERROR = -1 if MPI error occurred
* else DHT_SUCCESS = 0 if success
*/
int DHT_write(DHT *table, void* send_key, void* send_data) {
unsigned int dest_rank, i;
int result = DHT_SUCCESS;
table->stats->w_access++;
//determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
//concatenating key with data to write entry to DHT
set_flag((char *) table->send_entry);
memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size);
memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size);
//locking window of target rank with exclusive lock
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
return DHT_MPI_ERROR;
for (i = 0; i < table->index_count; i++)
{
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
//increment collision counter if receiving key doesn't match sending key
//,entry has write flag + last index is reached
if (read_flag(*(char *)table->recv_entry)) {
if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) {
if (i == (table->index_count)-1) {
table->collisions += 1;
table->stats->collisions += 1;
result = DHT_WRITE_SUCCESS_WITH_COLLISION;
break;
}
} else break;
} else {
table->stats->writes_local[dest_rank]++;
break;
}
}
//put data to DHT
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
//unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return result;
}
/*
* gets data from the DHT by key
*
* return DHT_SUCCESS = 0 if success
* DHT_MPI_ERROR = -1 if MPI error occurred
* DHT_READ_ERROR = -2 if receiving key doesn't match sending key
*/
int DHT_read(DHT *table, void* send_key, void* destination) {
unsigned int dest_rank, i;
table->stats->r_access++;
//determine destination rank and index by hash of key
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
//locking window of target rank with shared lock
if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR;
//receive data
for (i = 0; i < table->index_count; i++) {
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
//increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached
//else copy data to dereference of passed by destination pointer
if ((read_flag(*(char *) table->recv_entry)) == 0) {
table->read_misses += 1;
table->stats->read_misses += 1;
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) {
if (i == (table->index_count)-1) {
table->read_misses += 1;
table->stats->read_misses += 1;
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
}
} else break;
}
//unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size);
return DHT_SUCCESS;
}
int DHT_to_file(DHT* table, const char* filename) {
//open file
MPI_File file;
if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
int rank;
MPI_Comm_rank(table->communicator, &rank);
//write header (key_size and data_size)
if (rank == 0) {
if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
}
if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
char* ptr;
int bucket_size = table->key_size + table->data_size + 1;
//iterate over local memory
for (unsigned int i = 0; i < table->table_size; i++) {
ptr = (char *) table->mem_alloc + (i * bucket_size);
//if bucket has been written to (checked by written_flag)...
if (read_flag(*ptr)) {
//write key and data to file
if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
}
}
//close file
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
}
int DHT_from_file(DHT* table, const char* filename) {
MPI_File file;
MPI_Offset f_size;
int e_size, m_size, cur_pos, rank, offset;
char* buffer;
void* key;
void* data;
if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;
MPI_Comm_rank(table->communicator, &rank);
e_size = table->key_size + table->data_size;
m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE;
buffer = (char *) malloc(m_size);
if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE;
if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE;
offset = e_size*table->comm_size;
if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
cur_pos = DHT_HEADER_SIZE + (rank * e_size);
while(cur_pos < f_size) {
if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
MPI_Offset tmp;
MPI_File_get_position(file, &tmp);
if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
key = buffer;
data = (buffer+table->key_size);
if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;
cur_pos += offset;
}
free (buffer);
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
}
/*
* frees up memory and accumulate counter
*/
int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) {
int buf;
if (collision_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
*collision_counter = buf;
}
if (readerror_counter != NULL) {
buf = 0;
if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
*readerror_counter = buf;
}
if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
free(table->recv_entry);
free(table->send_entry);
free(table->index);
free(table->stats->writes_local);
free(table->stats);
free(table);
return DHT_SUCCESS;
}
/*
* prints a table with statistics about current use of DHT
* for each participating process and summed up results containing:
* 1. occupied buckets (in respect to the memory of this process)
* 2. free buckets (in respect to the memory of this process)
* 3. calls of DHT_write (w_access)
* 4. calls of DHT_read (r_access)
* 5. read misses (see DHT_READ_ERROR)
* 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
* 3-6 will reset with every call of this function
* finally the amount of new written entries is printed out (in relation to last call of this funtion)
*/
int DHT_print_statistics(DHT *table) {
int *written_buckets;
int *read_misses, sum_read_misses;
int *collisions, sum_collisions;
int sum_w_access, sum_r_access, *w_access, *r_access;
int rank;
MPI_Comm_rank(table->communicator, &rank);
//disable possible warning of unitialized variable, which is not the case
//since we're using MPI_Gather to obtain all values only on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
//obtaining all values from all processes in the communicator
if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->read_misses = 0;
if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->collisions = 0;
if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->w_access = 0;
if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int));
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
table->stats->r_access = 0;
if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int));
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
if (rank == 0) { //only process with rank 0 will print out results as a table
int sum_written_buckets = 0;
for (int i=0; i < table->comm_size; i++) {
sum_written_buckets += written_buckets[i];
}
int members = 7;
int padsize = (members*12)+1;
char pad[padsize+1];
memset(pad, '-', padsize*sizeof(char));
pad[padsize]= '\0';
printf("\n");
printf("%-35s||resets with every call of this function\n", " ");
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n",
"rank",
"occupied",
"free",
"w_access",
"r_access",
"read misses",
"collisions");
printf("%s\n", pad);
for (int i = 0; i < table->comm_size; i++) {
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
i,
written_buckets[i],
table->table_size-written_buckets[i],
w_access[i],
r_access[i],
read_misses[i],
collisions[i]);
}
printf("%s\n", pad);
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
"sum",
sum_written_buckets,
(table->table_size*table->comm_size)-sum_written_buckets,
sum_w_access,
sum_r_access,
sum_read_misses,
sum_collisions);
printf("%s\n", pad);
printf("%s %d\n",
"new entries:",
sum_written_buckets - table->stats->old_writes);
printf("\n");
fflush(stdout);
table->stats->old_writes = sum_written_buckets;
}
//enable warning again
#pragma GCC diagnostic pop
MPI_Barrier(table->communicator);
return DHT_SUCCESS;
}

112
src/DHT/DHT.h Normal file
View File

@ -0,0 +1,112 @@
/*
* File: DHT.h
* Author: max luebke
*
* Created on 16. November 2017, 09:14
*/
#ifndef DHT_H
#define DHT_H
#include <mpi.h>
#include <stdint.h>
#define DHT_MPI_ERROR -1
#define DHT_READ_ERROR -2
#define DHT_SUCCESS 0
#define DHT_WRITE_SUCCESS_WITH_COLLISION 1
#define DHT_WRONG_FILE 11
#define DHT_FILE_IO_ERROR 12
#define DHT_FILE_READ_ERROR 13
#define DHT_FILE_WRITE_ERROR 14
#define DHT_HEADER_SIZE 8
typedef struct {;
int *writes_local, old_writes;
int read_misses, collisions;
int w_access, r_access;
} DHT_stats;
typedef struct {
MPI_Win window;
int data_size;
int key_size;
unsigned int table_size;
MPI_Comm communicator;
int comm_size;
uint64_t(*hash_func) (int, void*);
void* recv_entry;
void* send_entry;
void* mem_alloc;
int read_misses;
int collisions;
unsigned int *index;
unsigned int index_count;
DHT_stats *stats;
} DHT;
/*
* parameters:
* MPI_Comm comm - communicator of processes that are holding the DHT
* int size_per_process - number of buckets each process will create
* int data_size - size of data in bytes
* int key_size - size of key in bytes
* *hash_func - pointer to hashfunction
*
* return:
* NULL if error during initialization
* DHT* if success
*/
extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*));
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* void* data - pointer to data
* void* - pointer to key
*
* return:
* error value (see above)
*/
extern int DHT_write(DHT *table, void* key, void* data);
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* void* key - pointer to key
* void* destination - pointer which will hold the resulting data from DHT
*
* return:
* error value (see above)
*/
extern int DHT_read(DHT *table, void* key, void* destination);
extern int DHT_to_file(DHT *table, const char* filename);
extern int DHT_from_file(DHT *table, const char* filename);
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
* int* collision_counter - pointer which will hold the total count of collisions
* int* readerror_counter - pointer which will hold the total count of read errors
*
* return:
* error value (see above)
*/
extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter);
/*
* parameters:
* DHT *table - DHT_object created by DHT_create
*
* return:
* error value (DHT_SUCCESS or DHT_MPI_ERROR)
*/
extern int DHT_print_statistics(DHT *table);
#endif /* DHT_H */

165
src/DHT/DHT_Wrapper.cpp Normal file
View File

@ -0,0 +1,165 @@
#include "DHT_Wrapper.h"
#include "DHT.h"
#include <openssl/md5.h>
#include <iostream>
using namespace poet;
using namespace std;
uint64_t get_md5(int key_size, void *key) {
MD5_CTX ctx;
unsigned char sum[MD5_DIGEST_LENGTH];
uint64_t retval, *v1, *v2;
MD5_Init(&ctx);
MD5_Update(&ctx, key, key_size);
MD5_Final(sum, &ctx);
v1 = (uint64_t *)&sum[0];
v2 = (uint64_t *)&sum[8];
retval = *v1 ^ *v2;
return retval;
}
DHT_Wrapper::DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm,
int buckets_per_process, int data_size, int key_size) {
dht_object =
DHT_create(dht_comm, buckets_per_process, data_size, key_size, &get_md5);
fuzzing_buffer = (double *)malloc(key_size);
this->dt_differ = params->dt_differ;
this->dht_log = params->dht_log;
this->dht_signif_vector = params->dht_signif_vector;
this->dht_prop_type_vector = params->dht_prop_type_vector;
}
DHT_Wrapper::~DHT_Wrapper() {
DHT_free(dht_object, NULL, NULL);
free(fuzzing_buffer);
}
void DHT_Wrapper::checkDHT(int length, std::vector<bool> &out_result_index,
double *work_package, double dt) {
void *key;
int res;
int var_count = dht_prop_type_vector.size();
for (int i = 0; i < length; i++) {
key = (void *)&(work_package[i * var_count]);
// fuzz data (round, logarithm etc.)
fuzzForDHT(var_count, key, dt);
// overwrite input with data from DHT, IF value is found in DHT
res = DHT_read(dht_object, fuzzing_buffer, key);
if (res == DHT_SUCCESS) {
// flag that this line is replaced by DHT-value, do not simulate!!
out_result_index[i] = false;
dht_hits++;
} else if (res == DHT_READ_ERROR) {
// this line is untouched, simulation is needed
out_result_index[i] = true;
dht_miss++;
} else {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
}
void DHT_Wrapper::fillDHT(int length, std::vector<bool> &result_index,
double *work_package, double *results, double dt) {
void *key;
void *data;
int res;
int var_count = dht_prop_type_vector.size();
for (int i = 0; i < length; i++) {
key = (void *)&(work_package[i * var_count]);
data = (void *)&(results[i * var_count]);
if (result_index[i]) {
// If true -> was simulated, needs to be inserted into dht
// fuzz data (round, logarithm etc.)
fuzzForDHT(var_count, key, dt);
res = DHT_write(dht_object, fuzzing_buffer, data);
if (res != DHT_SUCCESS) {
if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) {
dht_evictions++;
} else {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
}
}
}
int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename);
return res;
}
int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
return res;
DHT_print_statistics(dht_object);
return DHT_SUCCESS;
}
void DHT_Wrapper::printStatistics() {
int res;
res = DHT_print_statistics(dht_object);
if (res != DHT_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
uint64_t DHT_Wrapper::getHits() { return this->dht_hits; }
uint64_t DHT_Wrapper::getMisses() { return this->dht_miss; }
uint64_t DHT_Wrapper::getEvictions() { return this->dht_evictions; }
void DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
unsigned int i = 0;
// introduce fuzzing to allow more hits in DHT
for (i = 0; i < (unsigned int)var_count; i++) {
if (dht_prop_type_vector[i] == "act") {
// with log10
if (dht_log) {
if (((double *)key)[i] < 0)
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in "
"key!"
<< endl;
else if (((double *)key)[i] == 0)
fuzzing_buffer[i] = 0;
else
fuzzing_buffer[i] =
ROUND(-(std::log10(((double *)key)[i])), dht_signif_vector[i]);
} else {
// without log10
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
}
} else if (dht_prop_type_vector[i] == "logact") {
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
} else if (dht_prop_type_vector[i] == "ignore") {
fuzzing_buffer[i] = 0;
} else {
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong "
"prop_type!"
<< endl;
}
}
if (dt_differ)
fuzzing_buffer[var_count] = dt;
}

56
src/DHT/DHT_Wrapper.h Normal file
View File

@ -0,0 +1,56 @@
#ifndef DHT_WRAPPER_H
#define DHT_WRAPPER_H
#include <mpi.h>
#include <string>
#include <vector>
#include "../util/SimParams.h"
#include "DHT.h"
#define ROUND(value, signif) \
(((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))
uint64_t get_md5(int key_size, void *key);
namespace poet {
class DHT_Wrapper {
public:
DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm, int buckets_per_process,
int data_size, int key_size);
~DHT_Wrapper();
void checkDHT(int length, std::vector<bool> &out_result_index,
double *work_package, double dt);
void fillDHT(int length, std::vector<bool> &result_index,
double *work_package, double *results, double dt);
int tableToFile(const char *filename);
int fileToTable(const char *filename);
void printStatistics();
uint64_t getHits();
uint64_t getMisses();
uint64_t getEvictions();
private:
void fuzzForDHT(int var_count, void *key, double dt);
DHT *dht_object;
uint64_t dht_hits = 0;
uint64_t dht_miss = 0;
uint64_t dht_evictions = 0;
double *fuzzing_buffer;
bool dt_differ;
bool dht_log;
std::vector<int> dht_signif_vector;
std::vector<std::string> dht_prop_type_vector;
};
} // namespace poet
#endif // DHT_WRAPPER_H

View File

@ -1,20 +1,22 @@
#include <Rcpp.h>
#include <mpi.h> // mpi header file
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <Rcpp.h>
#include <mpi.h> // mpi header file
#include "DHT.h" // MPI-DHT Implementation
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
#include "dht_wrapper.h"
#include "global_buffer.h"
// #include "DHT.h" // MPI-DHT Implementation
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
// #include "dht_wrapper.h"
// #include "global_buffer.h"
#include "model/ChemSim.h"
#include "model/Grid.h"
#include "util/RRuntime.h"
#include "util/SimParams.h"
#include "worker.h"
// #include "worker.h"
#define DHT_SIZE_PER_PROCESS 1073741824
using namespace std;
using namespace poet;
@ -59,8 +61,7 @@ std::list<std::string> checkOptions(argh::parser cmdl) {
std::set<std::string> plist = paramList();
for (auto &flag : cmdl.flags()) {
if (!(flist.find(flag) != flist.end()))
retList.push_back(flag);
if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
}
for (auto &param : cmdl.params()) {
@ -85,7 +86,7 @@ int main(int argc, char *argv[]) {
double cummul_workers = 0.f;
double cummul_chemistry_master = 0.f;
double cummul_master_seq_pre_loop = 0.f;
double cummul_master_seq = 0.f;
double cummul_master_seq_loop = 0.f;
double master_idle = 0.f;
@ -121,8 +122,7 @@ int main(int argc, char *argv[]) {
// make a list of processes in the new communicator
process_ranks = (int *)malloc(params.world_size * sizeof(int));
for (int I = 1; I < params.world_size; I++)
process_ranks[I - 1] = I;
for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I;
// get the group under MPI_COMM_WORLD
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
@ -130,7 +130,7 @@ int main(int argc, char *argv[]) {
MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group);
// create the new communicator
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
free(process_ranks); // cleanup
free(process_ranks); // cleanup
// cout << "Done";
if (cmdl[{"help", "h"}]) {
@ -226,21 +226,24 @@ int main(int argc, char *argv[]) {
R["local_rank"] = params.world_rank;
/*Loading Dependencies*/
std::string r_load_dependencies = "suppressMessages(library(Rmufits));"
"suppressMessages(library(RedModRphree));"
"source('kin_r_library.R');"
"source('parallel_r_library.R');";
std::string r_load_dependencies =
"suppressMessages(library(Rmufits));"
"suppressMessages(library(RedModRphree));"
"source('kin_r_library.R');"
"source('parallel_r_library.R');";
R.parseEvalQ(r_load_dependencies);
std::string filesim;
cmdl(1) >> filesim; // <- first positional argument
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns
cmdl(1) >> filesim; // <- first positional argument
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
R.parseEvalQ(
"source(filesim)"); // eval the init string, ignoring any returns
if (params.world_rank ==
0) { // only rank 0 initializes goes through the whole initialization
cmdl(2) >> params.out_dir; // <- second positional argument
R["fileout"] = wrap(params.out_dir); // assign a char* (string) to 'fileout'
0) { // only rank 0 initializes goes through the whole initialization
cmdl(2) >> params.out_dir; // <- second positional argument
R["fileout"] =
wrap(params.out_dir); // assign a char* (string) to 'fileout'
// Note: R::sim_init() checks if the directory already exists,
// if not it makes it
@ -253,9 +256,9 @@ int main(int argc, char *argv[]) {
R.parseEval(master_init_code);
params.dt_differ =
R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
} else { // workers will only read the setup DataFrame defined by input file
} else { // workers will only read the setup DataFrame defined by input file
R.parseEval("mysetup <- setup");
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
}
@ -280,15 +283,15 @@ int main(int argc, char *argv[]) {
R["work_package_size"] = params.wp_size;
// Removed additional field for ID in previous versions
if (params.world_rank == 0) {
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
} else {
mpi_buffer = (double *)calloc(
(params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
mpi_buffer_results =
(double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
}
// if (params.world_rank == 0) {
// mpi_buffer =
// (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
// } else {
// mpi_buffer = (double *)calloc(
// (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
// mpi_buffer_results =
// (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
// }
if (params.world_rank == 0) {
cout << "CPP: parallel init completed (buffers allocated)!" << endl;
@ -302,14 +305,14 @@ int main(int argc, char *argv[]) {
if (params.dht_enabled) {
// cout << "\nCreating DHT\n";
// determine size of dht entries
int dht_data_size = grid.getCols() * sizeof(double);
int dht_key_size =
grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
// int dht_data_size = grid.getCols() * sizeof(double);
// int dht_key_size =
// grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
// determine bucket count for preset memory usage
// bucket size is key + value + 1 byte for status
int dht_buckets_per_process =
params.dht_size_per_process / (1 + dht_data_size + dht_key_size);
// // // determine bucket count for preset memory usage
// // // bucket size is key + value + 1 byte for status
// int dht_buckets_per_process =
// params.dht_size_per_process / (1 + dht_data_size + dht_key_size);
// MDL : following code moved here from worker.cpp
/*Load significance vector from R setup file (or set default)*/
@ -317,8 +320,8 @@ int main(int argc, char *argv[]) {
if (signif_vector_exists) {
params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
} else {
params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
dht_significant_digits);
// params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
// dht_significant_digits);
}
/*Load property type vector from R setup file (or set default)*/
@ -326,22 +329,24 @@ int main(int argc, char *argv[]) {
if (prop_type_vector_exists) {
params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
} else {
params.dht_prop_type_vector.assign(dht_object->key_size / sizeof(double),
"act");
// params.dht_prop_type_vector.assign(dht_object->key_size /
// sizeof(double),
// "act");
}
if (params.world_rank == 0) {
// print only on master, values are equal on all workes
cout << "CPP: dht_data_size: " << dht_data_size << "\n";
cout << "CPP: dht_key_size: " << dht_key_size << "\n";
cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process
<< endl;
// // print only on master, values are equal on all workes
// cout << "CPP: dht_data_size: " << dht_data_size << "\n";
// cout << "CPP: dht_key_size: " << dht_key_size << "\n";
// cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process
// << endl;
// MDL: new output on signif_vector and prop_type
if (signif_vector_exists) {
cout << "CPP: using problem-specific rounding digits: " << endl;
R.parseEval("print(data.frame(prop=prop, type=prop_type, "
"digits=signif_vector))");
R.parseEval(
"print(data.frame(prop=prop, type=prop_type, "
"digits=signif_vector))");
} else {
cout << "CPP: using DHT default rounding digits = "
<< dht_significant_digits << endl;
@ -353,22 +358,22 @@ int main(int argc, char *argv[]) {
R["dht_final_proptype"] = params.dht_prop_type_vector;
}
if (params.dht_strategy == 0) {
if (params.world_rank != 0) {
dht_object = DHT_create(dht_comm, dht_buckets_per_process,
dht_data_size, dht_key_size, get_md5);
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// dht_object = DHT_create(dht_comm, dht_buckets_per_process,
// dht_data_size, dht_key_size, get_md5);
// storing for access from worker and callback functions
fuzzing_buffer = (double *)malloc(dht_key_size);
}
} else {
dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process,
dht_data_size, dht_key_size, get_md5);
}
// // storing for access from worker and callback functions
// fuzzing_buffer = (double *)malloc(dht_key_size);
// }
// } else {
// dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process,
// dht_data_size, dht_key_size, get_md5);
// }
if (params.world_rank == 0) {
cout << "CPP: DHT successfully created!" << endl;
}
// if (params.world_rank == 0) {
// cout << "CPP: DHT successfully created!" << endl;
// }
}
// MDL: store all parameters
@ -380,52 +385,23 @@ int main(int argc, char *argv[]) {
MPI_Barrier(MPI_COMM_WORLD);
if (params.world_rank == 0) { /* This is executed by the master */
ChemMaster master(&params, R, grid);
Rcpp::NumericVector master_send;
Rcpp::NumericVector master_recv;
sim_a_seq = MPI_Wtime();
worker_struct *workerlist =
(worker_struct *)calloc(params.world_size - 1, sizeof(worker_struct));
int need_to_receive;
MPI_Status probe_status;
double *timings;
uint64_t *dht_perfs = NULL;
int local_work_package_size;
// a temporary send buffer
double *send_buffer;
send_buffer = (double *)calloc(
(params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
// helper variables
int iteration;
double dt, current_sim_time;
int n_wp = 1; // holds the actual number of wp which is
// computed later in R::distribute_work_packages()
std::vector<int> wp_sizes_vector; // vector with the sizes of
// each package
sim_start = MPI_Wtime();
// Iteration Count is dynamic, retrieving value from R (is only needed by
// master for the following loop)
uint32_t maxiter = R.parseEval("mysetup$maxiter");
sim_b_seq = MPI_Wtime();
cummul_master_seq_pre_loop += sim_b_seq - sim_a_seq;
/*SIMULATION LOOP*/
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
sim_a_seq = MPI_Wtime();
cummul_master_send = 0.f;
cummul_master_recv = 0.f;
cout << "CPP: Evaluating next time step" << endl;
R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
@ -440,201 +416,19 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
sim_a_transport = MPI_Wtime();
if (iter == 1) master.prepareSimulation();
cout << "CPP: Chemistry" << endl;
/*Fallback for sequential execution*/
sim_b_chemistry = MPI_Wtime();
if (params.world_size == 1) {
// MDL : the transformation of values into pH and pe
// takes now place in master_advection() so the
// following line is unneeded
// R.parseEvalQ("mysetup$state_T <-
// RedModRphree::Act2pH(mysetup$state_T)");
R.parseEvalQ(
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
master.runSeq();
} else { /*send work to workers*/
// NEW: only in the first iteration we call
// R::distribute_work_packages()!!
if (iter == 1) {
R.parseEvalQ(
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
"package_size=work_package_size)");
// we only sort once the vector
R.parseEvalQ("ordered_ids <- order(wp_ids)");
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
n_wp = (int)R.parseEval("length(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
cout << "CPP: Total number of work packages: " << n_wp << endl;
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
}
/* shuffle and extract data
MDL: we now apply :Act2pH directly in master_advection
*/
// R.parseEval("tmp <-
// shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
// Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
// convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
// cout << "CPP: shuffle_field() done" << endl;
grid.shuffleAndExport(mpi_buffer);
/* send and receive work; this is done by counting
* the wp */
int pkg_to_send = n_wp;
int pkg_to_recv = n_wp;
size_t colCount = grid.getCols();
int free_workers = params.world_size - 1;
double *work_pointer = mpi_buffer;
sim_c_chemistry = MPI_Wtime();
/* visual progress */
float progress = 0.0;
int barWidth = 70;
// retrieve data from R runtime
iteration = (int)R.parseEval("mysetup$iter");
dt = (double)R.parseEval("mysetup$requested_dt");
current_sim_time =
(double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");
int count_pkgs = 0;
sim_b_seq = MPI_Wtime();
sim_c_chemistry = MPI_Wtime();
while (pkg_to_recv > 0) // start dispatching work packages
{
/* visual progress */
progress = (float)(count_pkgs + 1) / n_wp;
cout << "[";
int pos = barWidth * progress;
for (int iprog = 0; iprog < barWidth; ++iprog) {
if (iprog < pos)
cout << "=";
else if (iprog == pos)
cout << ">";
else
cout << " ";
}
std::cout << "] " << int(progress * 100.0) << " %\r";
std::cout.flush();
/* end visual progress */
if (pkg_to_send > 0) {
master_send_a = MPI_Wtime();
/*search for free workers and send work*/
for (int p = 0; p < params.world_size - 1; p++) {
if (workerlist[p].has_work == 0 &&
pkg_to_send > 0) /* worker is free */ {
// to enable different work_package_size, set local copy of
// work_package_size to either global work_package size or
// remaining 'to_send' packages to_send >= work_package_size ?
// local_work_package_size = work_package_size :
// local_work_package_size = to_send;
local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++;
// cout << "CPP: sending pkg n. " << count_pkgs << " with size "
// << local_work_package_size << endl;
/*push pointer forward to next work package, after taking the
* current one*/
workerlist[p].send_addr = work_pointer;
int end_of_wp = local_work_package_size * colCount;
work_pointer = &(work_pointer[end_of_wp]);
// fill send buffer starting with work_package ...
std::memcpy(send_buffer, workerlist[p].send_addr,
(end_of_wp) * sizeof(double));
// followed by: work_package_size
send_buffer[end_of_wp] = (double)local_work_package_size;
// current iteration of simulation
send_buffer[end_of_wp + 1] = (double)iteration;
// size of timestep in seconds
send_buffer[end_of_wp + 2] = dt;
// current time of simulation (age) in seconds
send_buffer[end_of_wp + 3] = current_sim_time;
// placeholder for work_package_count
send_buffer[end_of_wp + 4] = 0.;
/* ATTENTION Worker p has rank p+1 */
MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE,
p + 1, TAG_WORK, MPI_COMM_WORLD);
workerlist[p].has_work = 1;
free_workers--;
pkg_to_send -= 1;
}
}
master_send_b = MPI_Wtime();
cummul_master_send += master_send_b - master_send_a;
}
/*check if there are results to receive and receive them*/
need_to_receive = 1;
master_recv_a = MPI_Wtime();
while (need_to_receive && pkg_to_recv > 0) {
if (pkg_to_send > 0 && free_workers > 0)
MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
&need_to_receive, &probe_status);
else {
idle_a = MPI_Wtime();
MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
&probe_status);
idle_b = MPI_Wtime();
master_idle += idle_b - idle_a;
}
if (need_to_receive) {
int p = probe_status.MPI_SOURCE;
int size;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p,
TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
workerlist[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
}
}
master_recv_b = MPI_Wtime();
cummul_master_recv += master_recv_b - master_recv_a;
}
sim_c_seq = MPI_Wtime();
// don't overwrite last progress
cout << endl;
sim_d_chemistry = MPI_Wtime();
cummul_workers += sim_d_chemistry - sim_c_chemistry;
// convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
// R.from_C_domain(mpi_buffer);
// R["chemistry_data"] = R.getBufferDataFrame();
///* unshuffle results */
// R.parseEval("result <- unshuffle_field(chemistry_data,
// ordered_ids)");
grid.importAndUnshuffle(mpi_buffer);
/* do master stuff */
sim_e_chemistry = MPI_Wtime();
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
sim_f_chemistry = MPI_Wtime();
cummul_chemistry_master += sim_f_chemistry - sim_e_chemistry;
master.runIteration();
}
sim_a_chemistry = MPI_Wtime();
double master_seq_a = MPI_Wtime();
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
@ -649,48 +443,22 @@ int main(int argc, char *argv[]) {
<< endl
<< endl;
if (params.dht_enabled) {
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
}
double master_seq_b = MPI_Wtime();
MPI_Barrier(MPI_COMM_WORLD);
cummul_master_seq += master.getSeqTime() + (master_seq_b - master_seq_a);
master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));
if (params.dht_snaps == 2) {
std::stringstream outfile;
outfile << params.out_dir << "/iter_" << std::setfill('0')
<< std::setw(3) << iter << ".dht";
for (int i = 1; i < params.world_size; i++) {
MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i,
TAG_DHT_STORE, MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
}
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
}
sim_d_seq = MPI_Wtime();
MPI_Barrier(MPI_COMM_WORLD);
cummul_master_seq_loop +=
((sim_b_seq - sim_a_seq) - (sim_a_transport - sim_b_transport)) +
(sim_d_seq - sim_c_seq);
master_send.push_back(cummul_master_send, "it_" + to_string(iter));
master_recv.push_back(cummul_master_recv, "it_" + to_string(iter));
} // END SIMULATION LOOP
} // END SIMULATION LOOP
sim_end = MPI_Wtime();
if (params.dht_enabled && params.dht_snaps > 0) {
cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl;
std::string outfile;
outfile = params.out_dir + ".dht";
for (int i = 1; i < params.world_size; i++) {
MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE,
MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: Master: ... done" << endl;
}
master.finishSimulation();
Rcpp::NumericVector phreeqc_time;
Rcpp::NumericVector dht_get_time;
@ -702,6 +470,10 @@ int main(int argc, char *argv[]) {
timings = (double *)calloc(3, sizeof(double));
int dht_hits = 0;
int dht_miss = 0;
int dht_collision = 0;
if (params.dht_enabled) {
dht_hits = 0;
dht_miss = 0;
@ -748,23 +520,21 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
R["simtime_chemistry"] = cummul_chemistry;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["simtime_workers"] = cummul_workers;
R["simtime_workers"] = master.getWorkerTime();
R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
R["simtime_chemistry_master"] = cummul_chemistry_master;
R["simtime_chemistry_master"] = master.getChemMasterTime();
R.parseEvalQ(
"profiling$simtime_chemistry_master <- simtime_chemistry_master");
R["seq_master_prep"] = cummul_master_seq_pre_loop;
R.parseEvalQ("profiling$seq_master_prep <- seq_master_prep");
R["seq_master_loop"] = cummul_master_seq_loop;
R.parseEvalQ("profiling$seq_master_loop <- seq_master_loop");
R["seq_master"] = cummul_master_seq;
R.parseEvalQ("profiling$seq_master <- seq_master");
// R["master_send"] = master_send;
// R.parseEvalQ("profiling$master_send <- master_send");
// R["master_recv"] = master_recv;
// R.parseEvalQ("profiling$master_recv <- master_recv");
R["idle_master"] = master_idle;
R["idle_master"] = master.getIdleTime();
R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = idle_worker;
R.parseEvalQ("profiling$idle_worker <- idle_worker");
@ -788,11 +558,9 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
free(workerlist);
free(timings);
if (params.dht_enabled)
free(dht_perfs);
if (params.dht_enabled) free(dht_perfs);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.out_dir << "/timings.rds>" << endl;
@ -802,42 +570,22 @@ int main(int argc, char *argv[]) {
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
} else { /*This is executed by the workers*/
if (!params.dht_file.empty()) {
int res = file_to_table((char *)params.dht_file.c_str());
if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) {
if (params.world_rank == 2)
cerr << "CPP: Worker: Wrong File" << endl;
} else {
if (params.world_rank == 2)
cerr << "CPP: Worker: Error in loading current state of DHT from "
"file"
<< endl;
}
return EXIT_FAILURE;
} else {
if (params.world_rank == 2)
cout << "CPP: Worker: Successfully loaded state of DHT from file "
<< params.dht_file << endl;
std::cout.flush();
}
}
worker_function(&params);
free(mpi_buffer_results);
ChemWorker worker(&params, R, grid);
worker.prepareSimulation(dht_comm);
worker.loop();
}
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
if (params.dht_enabled) {
if (params.dht_strategy == 0) {
if (params.world_rank != 0) {
DHT_free(dht_object, NULL, NULL);
}
} else {
DHT_free(dht_object, NULL, NULL);
}
}
// if (params.dht_enabled) {
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// DHT_free(dht_object, NULL, NULL);
// }
// } else {
// DHT_free(dht_object, NULL, NULL);
// }
// }
free(mpi_buffer);
MPI_Finalize();

217
src/model/ChemMaster.cpp Normal file
View File

@ -0,0 +1,217 @@
#include <Rcpp.h>
#include <mpi.h>
#include <iostream>
#include "ChemSim.h"
using namespace poet;
using namespace std;
using namespace Rcpp;
#define TAG_WORK 42
ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_)
: ChemSim(params, R_, grid_) {
this->wp_size = params->wp_size;
this->out_dir = params->out_dir;
}
void ChemMaster::runIteration() {
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
double sim_e_chemistry, sim_f_chemistry;
int pkg_to_send, pkg_to_recv;
int free_workers;
int i_pkgs;
seq_a = MPI_Wtime();
grid.shuffleAndExport(mpi_buffer);
// retrieve data from R runtime
iteration = (int)R.parseEval("mysetup$iter");
dt = (double)R.parseEval("mysetup$requested_dt");
current_sim_time =
(double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");
// setup local variables
pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size();
work_pointer = mpi_buffer;
free_workers = world_size - 1;
i_pkgs = 0;
seq_b = MPI_Wtime();
seq_t += seq_b - seq_a;
worker_chemistry_a = MPI_Wtime();
while (pkg_to_recv > 0) {
// TODO: Progressbar into IO instance.
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
if (pkg_to_send > 0) {
sendPkgs(pkg_to_send, i_pkgs, free_workers);
}
recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers);
}
// Just to complete the progressbar
cout << endl;
worker_chemistry_b = MPI_Wtime();
worker_t = worker_chemistry_b - worker_chemistry_a;
seq_c = MPI_Wtime();
grid.importAndUnshuffle(mpi_buffer);
/* do master stuff */
sim_e_chemistry = MPI_Wtime();
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
sim_f_chemistry = MPI_Wtime();
chem_master += sim_f_chemistry - sim_e_chemistry;
seq_d = MPI_Wtime();
seq_t += seq_d - seq_c;
}
void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
int &free_workers) {
double master_send_a, master_send_b;
int local_work_package_size;
int end_of_wp;
// start time measurement
master_send_a = MPI_Wtime();
/*search for free workers and send work*/
for (int p = 0; p < world_size - 1; p++) {
if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ {
// to enable different work_package_size, set local copy of
// work_package_size to either global work_package size or
// remaining 'to_send' packages to_send >= work_package_size ?
// local_work_package_size = work_package_size :
// local_work_package_size = to_send;
local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++;
// cout << "CPP: sending pkg n. " << count_pkgs << " with size "
// << local_work_package_size << endl;
/*push pointer forward to next work package, after taking the
* current one*/
workerlist[p].send_addr = work_pointer;
end_of_wp = local_work_package_size * grid.getCols();
work_pointer = &(work_pointer[end_of_wp]);
// fill send buffer starting with work_package ...
std::memcpy(send_buffer, workerlist[p].send_addr,
(end_of_wp) * sizeof(double));
// followed by: work_package_size
send_buffer[end_of_wp] = (double)local_work_package_size;
// current iteration of simulation
send_buffer[end_of_wp + 1] = (double)iteration;
// size of timestep in seconds
send_buffer[end_of_wp + 2] = dt;
// current time of simulation (age) in seconds
send_buffer[end_of_wp + 3] = current_sim_time;
// placeholder for work_package_count
send_buffer[end_of_wp + 4] = 0.;
/* ATTENTION Worker p has rank p+1 */
MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
TAG_WORK, MPI_COMM_WORLD);
workerlist[p].has_work = 1;
free_workers--;
pkg_to_send -= 1;
}
}
master_send_b = MPI_Wtime();
send_t += master_send_b - master_send_a;
}
void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) {
int need_to_receive = 1;
double master_recv_a, master_recv_b;
double idle_a, idle_b;
int p, size;
MPI_Status probe_status;
master_recv_a = MPI_Wtime();
while (need_to_receive && pkg_to_recv > 0) {
if (to_send && free_workers > 0)
MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive,
&probe_status);
else {
idle_a = MPI_Wtime();
MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime();
master_idle += idle_b - idle_a;
}
if (need_to_receive) {
p = probe_status.MPI_SOURCE;
size;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
workerlist[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
}
}
master_recv_b = MPI_Wtime();
recv_t += master_recv_b - master_recv_a;
}
void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
/* visual progress */
double progress = (float)(count_pkgs + 1) / n_wp;
cout << "[";
int pos = barWidth * progress;
for (int iprog = 0; iprog < barWidth; ++iprog) {
if (iprog < pos)
cout << "=";
else if (iprog == pos)
cout << ">";
else
cout << " ";
}
std::cout << "] " << int(progress * 100.0) << " %\r";
std::cout.flush();
/* end visual progress */
}
void ChemMaster::prepareSimulation() {
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double));
R.parseEvalQ(
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
"package_size=work_package_size)");
// we only sort once the vector
R.parseEvalQ("ordered_ids <- order(wp_ids)");
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
}
void ChemMaster::finishSimulation() {
free(mpi_buffer);
free(workerlist);
}
double ChemMaster::getSendTime() { return this->send_t; }
double ChemMaster::getRecvTime() { return this->recv_t; }
double ChemMaster::getIdleTime() { return this->master_idle; }
double ChemMaster::getWorkerTime() { return this->worker_t; }
double ChemMaster::getChemMasterTime() { return this->chem_master; }
double ChemMaster::getSeqTime() { return this->seq_t; }

23
src/model/ChemSim.cpp Normal file
View File

@ -0,0 +1,23 @@
#include "ChemSim.h"
#include "../util/RRuntime.h"
#include "Grid.h"
#include <Rcpp.h>
#include <iostream>
#include <mpi.h>
using namespace Rcpp;
using namespace poet;
ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_)
: R(R_), grid(grid_) {
this->world_rank = params->world_rank;
this->world_size = params->world_size;
this->wp_size = params->wp_size;
this->out_dir = params->out_dir;
}
void ChemSim::runSeq() {
R.parseEvalQ(
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
}

115
src/model/ChemSim.h Normal file
View File

@ -0,0 +1,115 @@
#ifndef CHEMSIM_H
#define CHEMSIM_H
#include "../DHT/DHT_Wrapper.h"
#include "../util/RRuntime.h"
#include "../util/SimParams.h"
#include "Grid.h"
#include <mpi.h>
#include <vector>
#define BUFFER_OFFSET 5
#define TAG_WORK 42
#define TAG_FINISH 43
#define TAG_TIMING 44
#define TAG_DHT_PERF 45
#define TAG_DHT_STATS 46
#define TAG_DHT_STORE 47
#define TAG_DHT_ITER 48
namespace poet {
class ChemSim {
public:
ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
void runSeq();
protected:
double current_sim_time = 0;
int iteration = 0;
int dt = 0;
int world_rank;
int world_size;
unsigned int wp_size;
RRuntime &R;
Grid &grid;
std::vector<int> wp_sizes_vector;
std::string out_dir;
double *send_buffer;
typedef struct {
char has_work;
double *send_addr;
} worker_struct;
worker_struct *workerlist;
double *mpi_buffer;
};
class ChemMaster : public ChemSim {
public:
ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);
void prepareSimulation();
void finishSimulation();
void runIteration();
double getSendTime();
double getRecvTime();
double getIdleTime();
double getWorkerTime();
double getChemMasterTime();
double getSeqTime();
private:
void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70);
void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers);
void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);
unsigned int wp_size;
double *work_pointer;
double send_t = 0.f;
double recv_t = 0.f;
double master_idle = 0.f;
double worker_t = 0.f;
double chem_master = 0.f;
double seq_t = 0.f;
};
class ChemWorker : public ChemSim {
public:
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_);
void prepareSimulation(MPI_Comm dht_comm);
void loop();
private:
void doWork(MPI_Status &probe_status);
void postIter();
void finishWork();
void writeFile();
void readFile();
bool dht_enabled;
bool dt_differ;
int dht_snaps;
std::string dht_file;
unsigned int dht_size_per_process;
std::vector<bool> dht_flags;
t_simparams *params;
double *mpi_buffer_results;
DHT_Wrapper *dht;
double timing[3];
double idle_t = 0.f;
int phreeqc_count = 0;
};
} // namespace poet
#endif // CHEMSIM_H

325
src/model/ChemWorker.cpp Normal file
View File

@ -0,0 +1,325 @@
#include <Rcpp.h>
#include <mpi.h>
#include <iostream>
#include <string>
#include "ChemSim.h"
using namespace poet;
using namespace std;
using namespace Rcpp;
ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_)
: params(params_), ChemSim(params_, R_, grid_) {
this->dt_differ = params->dt_differ;
this->dht_enabled = params->dht_enabled;
this->dht_size_per_process = params->dht_size_per_process;
this->dht_file = params->dht_file;
}
void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double));
mpi_buffer_results =
(double *)calloc(wp_size * (grid.getCols()), sizeof(double));
if (world_rank == 1)
cout << "Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF") << endl;
if (dht_enabled) {
int data_size = grid.getCols() * sizeof(double);
int key_size =
grid.getCols() * sizeof(double) + (dt_differ * sizeof(double));
int dht_buckets_per_process =
dht_size_per_process / (1 + data_size + key_size);
if (world_rank == 1)
cout << "CPP: Worker: data size: " << data_size << " bytes" << endl
<< "CPP: Worker: key size: " << key_size << "bytes" << endl
<< "CPP: Worker: buckets per process " << dht_buckets_per_process
<< endl;
dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size,
key_size);
if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl;
}
if (!dht_file.empty()) readFile();
// set size
dht_flags.resize(params->wp_size, true);
// assign all elements to true (default)
dht_flags.assign(params->wp_size, true);
timing[0] = 0.0;
timing[1] = 0.0;
timing[2] = 0.0;
}
void ChemWorker::loop() {
MPI_Status probe_status;
while (1) {
double idle_a = MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
double idle_b = MPI_Wtime();
if (probe_status.MPI_TAG == TAG_WORK) {
idle_t += idle_b - idle_a;
doWork(probe_status);
} else if (probe_status.MPI_TAG == TAG_FINISH) {
finishWork();
break;
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
postIter();
}
}
}
void ChemWorker::doWork(MPI_Status &probe_status) {
int count;
int local_work_package_size = 0;
double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end;
/* get number of doubles sent */
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
/* receive */
MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
// decrement count of work_package by BUFFER_OFFSET
count -= BUFFER_OFFSET;
// check for changes on all additional variables given by the 'header' of
// mpi_buffer
// work_package_size
if (mpi_buffer[count] != local_work_package_size) { // work_package_size
local_work_package_size = mpi_buffer[count];
R["work_package_size"] = local_work_package_size;
R.parseEvalQ("mysetup$work_package_size <- work_package_size");
}
// current iteration of simulation
if (mpi_buffer[count + 1] != iteration) {
iteration = mpi_buffer[count + 1];
R["iter"] = iteration;
R.parseEvalQ("mysetup$iter <- iter");
}
// current timestep size
if (mpi_buffer[count + 2] != dt) {
dt = mpi_buffer[count + 2];
R["dt"] = dt;
R.parseEvalQ("mysetup$dt <- dt");
}
// current simulation time ('age' of simulation)
if (mpi_buffer[count + 3] != current_sim_time) {
current_sim_time = mpi_buffer[count + 3];
R["simulation_time"] = current_sim_time;
R.parseEvalQ("mysetup$simulation_time <- simulation_time");
}
/* 4th double value is currently a placeholder */
// if (mpi_buffer[count+4] != placeholder) {
// placeholder = mpi_buffer[count+4];
// R["mysetup$placeholder"] = placeholder;
// }
/* get df with right structure to fill in work package */
// R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
// R["skeleton"] = grid.buildDataFrame(work_package_size);
//// R.parseEval("print(rownames(tmp2)[1:5])");
//// R.parseEval("print(head(tmp2, 2))");
//// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
// R.setBufferDataFrame("skeleton");
if (dht_enabled) {
// DEBUG
// cout << "RANK " << world_rank << " start checking DHT\n";
// resize helper vector dht_flags of work_package_size changes
if ((int)dht_flags.size() != local_work_package_size) {
dht_flags.resize(local_work_package_size, true); // set size
dht_flags.assign(local_work_package_size,
true); // assign all elements to true (default)
}
dht_get_start = MPI_Wtime();
dht->checkDHT(local_work_package_size, dht_flags, mpi_buffer, dt);
dht_get_end = MPI_Wtime();
// DEBUG
// cout << "RANK " << world_rank << " checking DHT complete \n";
R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
// R.parseEvalQ("print(head(dht_flags))");
}
/* work */
// R.from_C_domain(mpi_buffer);
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
// R["work_package_full"] = R.getBufferDataFrame();
// R["work_package"] = buffer;
// DEBUG
// R.parseEvalQ("print(head(work_package_full))");
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
grid.importWP(mpi_buffer, params->wp_size);
if (params->dht_enabled) {
R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
} else {
R.parseEvalQ("work_package <- work_package_full");
}
// DEBUG
// R.parseEvalQ("print(head(work_package),2)");
// R.parseEvalQ("rownames(work_package) <- work_package$id");
// R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
// colnames(work_package)"); R.parseEvalQ("id_store <-
// rownames(work_package)"); //"[, ncol(work_package)]");
// R.parseEvalQ("work_package$id <- NULL");
R.parseEvalQ("work_package <- as.matrix(work_package)");
unsigned int nrows = R.parseEval("nrow(work_package)");
if (nrows > 0) {
/*Single Line error Workaround*/
if (nrows <= 1) {
// duplicate line to enable correct simmulation
R.parseEvalQ(
"work_package <- work_package[rep(1:nrow(work_package), "
"times = 2), ]");
}
phreeqc_count++;
phreeqc_time_start = MPI_Wtime();
// MDL
// R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
// 2)); cat('RCpp: worker_function:', local_rank, ' \n')");
R.parseEvalQ(
"result <- as.data.frame(slave_chemistry(setup=mysetup, "
"data = work_package))");
phreeqc_time_end = MPI_Wtime();
// R.parseEvalQ("result$id <- id_store");
} else {
// cout << "Work-Package is empty, skipping phreeqc!" << endl;
}
if (dht_enabled) {
R.parseEvalQ("result_full <- work_package_full");
if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result");
} else {
R.parseEvalQ("result_full <- result");
}
// R.setBufferDataFrame("result_full");
////Rcpp::DataFrame result = R.parseEval("result_full");
////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
// R.to_C_domain(mpi_buffer_results);
grid.exportWP(mpi_buffer_results);
/* send results to master */
MPI_Request send_req;
MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
&send_req);
if (dht_enabled) {
dht_fill_start = MPI_Wtime();
dht->fillDHT(local_work_package_size, dht_flags, mpi_buffer,
mpi_buffer_results, dt);
dht_fill_end = MPI_Wtime();
timing[1] += dht_get_end - dht_get_start;
timing[2] += dht_fill_end - dht_fill_start;
}
timing[0] += phreeqc_time_end - phreeqc_time_start;
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
}
void ChemWorker::postIter() {
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_ITER, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if (dht_enabled) {
dht->printStatistics();
if (dht_snaps == 2) {
writeFile();
}
}
// synchronize all processes
MPI_Barrier(MPI_COMM_WORLD);
}
void ChemWorker::writeFile() {
std::stringstream out;
out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && world_rank == 2)
cerr << "CPP: Worker: Errir in writing current state of DHT to file."
<< endl;
else if (world_rank == 2)
cout << "CPP: Worker: Successfully written DHT to file " << out.str()
<< endl;
}
void ChemWorker::readFile() {
int res = dht->fileToTable((char *)dht_file.c_str());
if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) {
if (world_rank == 1)
cerr << "CPP: Worker: Wrong file layout! Continue with empty DHT ..."
<< endl;
} else {
if (world_rank == 1)
cerr << "CPP: Worker: Error in loading current state of DHT from "
"file. Continue with empty DHT ..."
<< endl;
}
} else {
if (world_rank == 2)
cout << "CPP: Worker: Successfully loaded state of DHT from file "
<< dht_file << endl;
std::cout.flush();
}
}
void ChemWorker::finishWork() {
if (dht_enabled && dht_snaps > 0) writeFile();
double dht_perf[3];
/* before death, submit profiling/timings to master*/
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
// timings
MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
MPI_Send(&idle_t, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
if (dht_enabled) {
// dht_perf
dht_perf[0] = dht->getHits();
dht_perf[1] = dht->getMisses();
dht_perf[2] = dht->getEvictions();
MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
MPI_COMM_WORLD);
}
free(mpi_buffer);
free(mpi_buffer_results);
delete dht;
}