Mirror of https://git.gfz-potsdam.de/naaice/poet.git (synced 2025-12-15 20:38:23 +01:00)
Substitute ChemSim

parent 4eb4702cff
commit 1d9c1d5710
8  .gitignore  vendored
@@ -137,12 +137,14 @@ vignettes/*.pdf
# R package: bookdown caching files
/*_files/

### vscode ###
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace

### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide

# End of https://www.toptal.com/developers/gitignore/api/c,c++,r,cmake
@@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})

#define program libraries

add_library(POET_Libs STATIC util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp model/Grid.cpp)
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp)
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
423  src/DHT/DHT.cpp  Normal file
@@ -0,0 +1,423 @@
#include "DHT.h"
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <stdio.h>

/*
 * determining destination rank and index by hash of key
 *
 * return values by reference
 */
static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) {
  uint64_t tmp;
  int char_hop = 9 - index_count;
  unsigned int i;
  for (i = 0; i < index_count; i++) {
    tmp = 0;
    memcpy(&tmp, (unsigned char *)&hash + i, char_hop);
    index[i] = (unsigned int)(tmp % table_size);
  }
  *dest_rank = (unsigned int)(hash % comm_size);
}
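A note on how determine_dest derives its candidate bucket indices: it slides a byte window across the 8-byte hash, so each of the index_count probes reuses most of the hash bits but starts one byte further in. A minimal standalone sketch of the same derivation (hash value, table size, and probe count are invented for illustration):

#include <cstdint>
#include <cstring>
#include <cstdio>

// Illustration only: mirrors determine_dest() for a hypothetical
// 4-probe table of 1000003 buckets.
int main() {
  const uint64_t hash = 0x9e3779b97f4a7c15ULL;  // arbitrary example hash
  const unsigned table_size = 1000003;
  const unsigned index_count = 4;        // number of candidate buckets
  const int char_hop = 9 - index_count;  // bytes taken per probe (here 5)

  for (unsigned i = 0; i < index_count; i++) {
    uint64_t tmp = 0;
    std::memcpy(&tmp, (const unsigned char *)&hash + i, char_hop);
    std::printf("probe %u -> bucket %u\n", i, (unsigned)(tmp % table_size));
  }
}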
/**
 * set write flag to 1
 */
static void set_flag(char *flag_byte) {
  *flag_byte = 0;
  *flag_byte |= (1 << 0);
}

/**
 * return 1 if write flag is set,
 * else 0
 */
static int read_flag(char flag_byte) {
  if ((flag_byte & 0x01) == 0x01) {
    return 1;
  } else return 0;
}

/*
 * allocates memory for the DHT object and its buckets,
 * creates the MPI window for one-sided communication (OSC),
 * and fills the DHT object with the passed parameters, the window,
 * two counters for R/W errors and two pointers to allocated scratch
 * memory for further use.
 * returns the DHT object
 */
DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t (*hash_func)(int, void*)) {
  DHT *object;
  MPI_Win window;
  void *mem_alloc;
  int comm_size, tmp;

  tmp = (int) ceil(log2(size));
  if (tmp % 8 != 0) tmp = tmp + (8 - (tmp % 8));

  object = (DHT*) malloc(sizeof(DHT));
  if (object == NULL) return NULL;

  // every memory allocation has 1 additional byte for flags etc.
  if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL;
  if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;

  memset(mem_alloc, '\0', size * (1 + data_size + key_size));

  if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL;

  object->data_size = data_size;
  object->key_size = key_size;
  object->table_size = size;
  object->window = window;
  object->hash_func = hash_func;
  object->comm_size = comm_size;
  object->communicator = comm;
  object->read_misses = 0;
  object->collisions = 0;
  object->recv_entry = malloc(1 + data_size + key_size);
  object->send_entry = malloc(1 + data_size + key_size);
  object->index_count = 9 - (tmp / 8);
  object->index = (unsigned int*) malloc((9 - (tmp / 8)) * sizeof(int));
  object->mem_alloc = mem_alloc;

  DHT_stats *stats;

  stats = (DHT_stats*) malloc(sizeof(DHT_stats));
  if (stats == NULL) return NULL;

  object->stats = stats;
  object->stats->writes_local = (int*) calloc(comm_size, sizeof(int));
  object->stats->old_writes = 0;
  object->stats->read_misses = 0;
  object->stats->collisions = 0;
  object->stats->w_access = 0;
  object->stats->r_access = 0;

  return object;
}
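For orientation, a sketch of how a table is created on top of this API; the communicator, sizes, and the FNV-style hash here are illustrative stand-ins, not POET's actual configuration (POET passes the MD5-based hash from DHT_Wrapper, shown further down):

#include <mpi.h>
#include <stdint.h>
#include "DHT.h"

// Illustrative hash: fold the key bytes into 64 bits (FNV-1a style).
static uint64_t toy_hash(int key_size, void *key) {
  uint64_t h = 1469598103934665603ULL;
  for (int i = 0; i < key_size; i++) {
    h ^= ((unsigned char *)key)[i];
    h *= 1099511628211ULL;
  }
  return h;
}

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  // 1024 buckets per process, 16-byte values, 8-byte keys (example sizes)
  DHT *table = DHT_create(MPI_COMM_WORLD, 1024, 16, 8, &toy_hash);
  if (table == NULL) MPI_Abort(MPI_COMM_WORLD, 1);
  DHT_free(table, NULL, NULL);
  MPI_Finalize();
}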
/*
 * puts the passed data, addressed by key, into the DHT
 *
 * returns DHT_MPI_ERROR = -1 if an MPI error occurred,
 * else DHT_SUCCESS = 0 on success
 */
int DHT_write(DHT *table, void *send_key, void *send_data) {
  unsigned int dest_rank, i;
  int result = DHT_SUCCESS;

  table->stats->w_access++;

  // determine destination rank and index by hash of key
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);

  // concatenate key with data to write entry to DHT
  set_flag((char *) table->send_entry);
  memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size);
  memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size);

  // lock window of target rank with an exclusive lock
  if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
    if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;

    // increment the collision counter if the received key doesn't match the
    // sent key, the entry has its write flag set, and the last index is reached
    if (read_flag(*(char *) table->recv_entry)) {
      if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) {
        if (i == (table->index_count) - 1) {
          table->collisions += 1;
          table->stats->collisions += 1;
          result = DHT_WRITE_SUCCESS_WITH_COLLISION;
          break;
        }
      } else break;
    } else {
      table->stats->writes_local[dest_rank]++;
      break;
    }
  }

  // put data to DHT
  if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;

  return result;
}
/*
 * gets data from the DHT by key
 *
 * returns DHT_SUCCESS = 0 on success,
 * DHT_MPI_ERROR = -1 if an MPI error occurred,
 * DHT_READ_ERROR = -2 if the received key doesn't match the sent key
 */
int DHT_read(DHT *table, void *send_key, void *destination) {
  unsigned int dest_rank, i;

  table->stats->r_access++;

  // determine destination rank and index by hash of key
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);

  // lock window of target rank with a shared lock
  if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR;
  // receive data
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
    if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;

    // increment the read-miss counter if the write flag isn't set, or if the
    // key doesn't match the passed key and the last index is reached;
    // otherwise copy the data into the passed destination buffer
    if ((read_flag(*(char *) table->recv_entry)) == 0) {
      table->read_misses += 1;
      table->stats->read_misses += 1;
      if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
      return DHT_READ_ERROR;
    }

    if (memcmp(((char *) table->recv_entry) + 1, send_key, table->key_size) != 0) {
      if (i == (table->index_count) - 1) {
        table->read_misses += 1;
        table->stats->read_misses += 1;
        if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
        return DHT_READ_ERROR;
      }
    } else break;
  }

  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;

  memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size);

  return DHT_SUCCESS;
}
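Taken together, a write followed by a read of the same key round-trips the value. A sketch continuing the DHT_create example above (8-byte key, 16-byte value assumed):

// Sketch: round-trip through the table created in the earlier example.
double key = 3.14;             // 8-byte key
double value[2] = {1.0, 2.0};  // 16-byte value
double out[2];

int w = DHT_write(table, &key, value);
// w is DHT_SUCCESS or DHT_WRITE_SUCCESS_WITH_COLLISION (an eviction)

int r = DHT_read(table, &key, out);
if (r == DHT_SUCCESS) {
  // out now holds {1.0, 2.0} -- unless another rank evicted the entry
} else if (r == DHT_READ_ERROR) {
  // miss: slot empty or occupied by a different key
}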
int DHT_to_file(DHT* table, const char* filename) {
  // open file
  MPI_File file;
  if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;

  int rank;
  MPI_Comm_rank(table->communicator, &rank);

  // write header (key_size and data_size)
  if (rank == 0) {
    if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
    if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
  }

  if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;

  char* ptr;
  int bucket_size = table->key_size + table->data_size + 1;

  // iterate over local memory
  for (unsigned int i = 0; i < table->table_size; i++) {
    ptr = (char *) table->mem_alloc + (i * bucket_size);
    // if bucket has been written to (checked by written_flag)...
    if (read_flag(*ptr)) {
      // write key and data to file
      if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
    }
  }
  // close file
  if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;

  return DHT_SUCCESS;
}

int DHT_from_file(DHT* table, const char* filename) {
  MPI_File file;
  MPI_Offset f_size;
  int e_size, m_size, cur_pos, rank, offset;
  char* buffer;
  void* key;
  void* data;

  if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;

  if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;

  MPI_Comm_rank(table->communicator, &rank);

  e_size = table->key_size + table->data_size;
  m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE;
  buffer = (char *) malloc(m_size);

  if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;

  if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE;
  if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE;

  offset = e_size * table->comm_size;

  if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
  cur_pos = DHT_HEADER_SIZE + (rank * e_size);

  while (cur_pos < f_size) {
    if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
    MPI_Offset tmp;
    MPI_File_get_position(file, &tmp);
    if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
    key = buffer;
    data = (buffer + table->key_size);
    if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;

    cur_pos += offset;
  }

  free(buffer);
  if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;

  return DHT_SUCCESS;
}
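The snapshot format the two functions above agree on is deliberately simple: an 8-byte header holding key_size and data_size as two ints, followed by packed key+data records of every occupied bucket (the per-bucket flag byte is not stored). A usage sketch; the filename is illustrative, and both calls are collective over table->communicator:

// Sketch: persist the table at a checkpoint and restore it later.
if (DHT_to_file(table, "checkpoint.dht") != DHT_SUCCESS) {
  // handle I/O failure
}

// ... e.g. in a later run, after DHT_create with identical sizes ...
int res = DHT_from_file(table, "checkpoint.dht");
if (res == DHT_WRONG_FILE) {
  // header mismatch: file was written with other key/data sizes
}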
/*
 * frees up memory and accumulates the counters
 */
int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) {
  int buf;

  if (collision_counter != NULL) {
    buf = 0;
    if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
    *collision_counter = buf;
  }
  if (readerror_counter != NULL) {
    buf = 0;
    if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
    *readerror_counter = buf;
  }
  if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
  if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
  free(table->recv_entry);
  free(table->send_entry);
  free(table->index);

  free(table->stats->writes_local);
  free(table->stats);

  free(table);

  return DHT_SUCCESS;
}
/*
 * prints a table with statistics about the current use of the DHT
 * for each participating process and as summed-up results, containing:
 * 1. occupied buckets (with respect to the memory of this process)
 * 2. free buckets (with respect to the memory of this process)
 * 3. calls of DHT_write (w_access)
 * 4. calls of DHT_read (r_access)
 * 5. read misses (see DHT_READ_ERROR)
 * 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
 * 3-6 reset with every call of this function.
 * finally the number of newly written entries is printed out
 * (relative to the last call of this function)
 */
int DHT_print_statistics(DHT *table) {
  int *written_buckets;
  int *read_misses, sum_read_misses;
  int *collisions, sum_collisions;
  int sum_w_access, sum_r_access, *w_access, *r_access;
  int rank;

  MPI_Comm_rank(table->communicator, &rank);

  // disable a possible warning about uninitialized variables, which is a
  // false positive since we're using MPI_Gather to obtain all values only on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"

  // obtain all values from all processes in the communicator
  if (rank == 0) read_misses = (int*) malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  table->stats->read_misses = 0;

  if (rank == 0) collisions = (int*) malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  table->stats->collisions = 0;

  if (rank == 0) w_access = (int*) malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  table->stats->w_access = 0;

  if (rank == 0) r_access = (int*) malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
  table->stats->r_access = 0;

  if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int));
  if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;

  if (rank == 0) { // only process with rank 0 will print out results as a table
    int sum_written_buckets = 0;

    for (int i = 0; i < table->comm_size; i++) {
      sum_written_buckets += written_buckets[i];
    }

    int members = 7;
    int padsize = (members * 12) + 1;
    char pad[padsize + 1];

    memset(pad, '-', padsize * sizeof(char));
    pad[padsize] = '\0';
    printf("\n");
    printf("%-35s||resets with every call of this function\n", " ");
    printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n",
           "rank",
           "occupied",
           "free",
           "w_access",
           "r_access",
           "read misses",
           "collisions");
    printf("%s\n", pad);
    for (int i = 0; i < table->comm_size; i++) {
      printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
             i,
             written_buckets[i],
             table->table_size - written_buckets[i],
             w_access[i],
             r_access[i],
             read_misses[i],
             collisions[i]);
    }
    printf("%s\n", pad);
    printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
           "sum",
           sum_written_buckets,
           (table->table_size * table->comm_size) - sum_written_buckets,
           sum_w_access,
           sum_r_access,
           sum_read_misses,
           sum_collisions);

    printf("%s\n", pad);
    printf("%s %d\n",
           "new entries:",
           sum_written_buckets - table->stats->old_writes);

    printf("\n");
    fflush(stdout);

    table->stats->old_writes = sum_written_buckets;
  }

  // re-enable the warning
#pragma GCC diagnostic pop

  MPI_Barrier(table->communicator);
  return DHT_SUCCESS;
}
112  src/DHT/DHT.h  Normal file
@@ -0,0 +1,112 @@
/*
 * File:   DHT.h
 * Author: max luebke
 *
 * Created on 16. November 2017, 09:14
 */

#ifndef DHT_H
#define DHT_H

#include <mpi.h>
#include <stdint.h>

#define DHT_MPI_ERROR -1
#define DHT_READ_ERROR -2
#define DHT_SUCCESS 0
#define DHT_WRITE_SUCCESS_WITH_COLLISION 1

#define DHT_WRONG_FILE 11
#define DHT_FILE_IO_ERROR 12
#define DHT_FILE_READ_ERROR 13
#define DHT_FILE_WRITE_ERROR 14

#define DHT_HEADER_SIZE 8

typedef struct {
  int *writes_local, old_writes;
  int read_misses, collisions;
  int w_access, r_access;
} DHT_stats;

typedef struct {
  MPI_Win window;
  int data_size;
  int key_size;
  unsigned int table_size;
  MPI_Comm communicator;
  int comm_size;
  uint64_t (*hash_func)(int, void*);
  void* recv_entry;
  void* send_entry;
  void* mem_alloc;
  int read_misses;
  int collisions;
  unsigned int *index;
  unsigned int index_count;
  DHT_stats *stats;
} DHT;

/*
 * parameters:
 * MPI_Comm comm - communicator of the processes holding the DHT
 * int size_per_process - number of buckets each process will create
 * int data_size - size of data in bytes
 * int key_size - size of key in bytes
 * *hash_func - pointer to hash function
 *
 * return:
 * NULL if an error occurred during initialization
 * DHT* on success
 */
extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t (*hash_func)(int, void*));

/*
 * parameters:
 * DHT *table - DHT object created by DHT_create
 * void* key - pointer to key
 * void* data - pointer to data
 *
 * return:
 * error value (see above)
 */
extern int DHT_write(DHT *table, void* key, void* data);

/*
 * parameters:
 * DHT *table - DHT object created by DHT_create
 * void* key - pointer to key
 * void* destination - pointer which will hold the resulting data from the DHT
 *
 * return:
 * error value (see above)
 */
extern int DHT_read(DHT *table, void* key, void* destination);

extern int DHT_to_file(DHT *table, const char* filename);

extern int DHT_from_file(DHT *table, const char* filename);

/*
 * parameters:
 * DHT *table - DHT object created by DHT_create
 * int* collision_counter - pointer which will hold the total count of collisions
 * int* readerror_counter - pointer which will hold the total count of read errors
 *
 * return:
 * error value (see above)
 */
extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter);

/*
 * parameters:
 * DHT *table - DHT object created by DHT_create
 *
 * return:
 * error value (DHT_SUCCESS or DHT_MPI_ERROR)
 */
extern int DHT_print_statistics(DHT *table);

#endif /* DHT_H */
165  src/DHT/DHT_Wrapper.cpp  Normal file
@@ -0,0 +1,165 @@
#include "DHT_Wrapper.h"
#include "DHT.h"
#include <openssl/md5.h>
#include <iostream>

using namespace poet;
using namespace std;
uint64_t get_md5(int key_size, void *key) {
  MD5_CTX ctx;
  unsigned char sum[MD5_DIGEST_LENGTH];
  uint64_t retval, *v1, *v2;

  MD5_Init(&ctx);
  MD5_Update(&ctx, key, key_size);
  MD5_Final(sum, &ctx);

  v1 = (uint64_t *)&sum[0];
  v2 = (uint64_t *)&sum[8];
  retval = *v1 ^ *v2;

  return retval;
}
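get_md5 folds the 128-bit MD5 digest into the 64-bit value the DHT expects by XOR-ing the two digest halves; key distribution, not cryptographic strength, is what matters here. Worth noting: the MD5_Init/MD5_Update/MD5_Final interface is deprecated since OpenSSL 3.0. A sketch of the same fold through the EVP API (get_md5_evp is a hypothetical name, not part of the commit):

#include <openssl/evp.h>
#include <cstdint>
#include <cstring>

// Sketch: same XOR fold as get_md5(), via the one-shot EVP_Digest call.
uint64_t get_md5_evp(int key_size, void *key) {
  unsigned char sum[EVP_MAX_MD_SIZE];
  unsigned int len = 0;
  EVP_Digest(key, key_size, sum, &len, EVP_md5(), nullptr);
  uint64_t v1, v2;
  std::memcpy(&v1, &sum[0], 8);  // memcpy also avoids the strict-aliasing
  std::memcpy(&v2, &sum[8], 8);  // casts used in the original
  return v1 ^ v2;
}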
DHT_Wrapper::DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm,
                         int buckets_per_process, int data_size, int key_size) {
  dht_object =
      DHT_create(dht_comm, buckets_per_process, data_size, key_size, &get_md5);
  fuzzing_buffer = (double *)malloc(key_size);

  this->dt_differ = params->dt_differ;
  this->dht_log = params->dht_log;
  this->dht_signif_vector = params->dht_signif_vector;
  this->dht_prop_type_vector = params->dht_prop_type_vector;
}

DHT_Wrapper::~DHT_Wrapper() {
  DHT_free(dht_object, NULL, NULL);
  free(fuzzing_buffer);
}
void DHT_Wrapper::checkDHT(int length, std::vector<bool> &out_result_index,
                           double *work_package, double dt) {
  void *key;
  int res;
  int var_count = dht_prop_type_vector.size();
  for (int i = 0; i < length; i++) {
    key = (void *)&(work_package[i * var_count]);

    // fuzz data (round, logarithm etc.)
    fuzzForDHT(var_count, key, dt);

    // overwrite input with data from DHT, IF value is found in DHT
    res = DHT_read(dht_object, fuzzing_buffer, key);

    if (res == DHT_SUCCESS) {
      // flag that this line is replaced by the DHT value, do not simulate!!
      out_result_index[i] = false;
      dht_hits++;
    } else if (res == DHT_READ_ERROR) {
      // this line is untouched, simulation is needed
      out_result_index[i] = true;
      dht_miss++;
    } else {
      // MPI ERROR ... WHAT TO DO NOW?
      // RUNNING CIRCLES WHILE SCREAMING
    }
  }
}
void DHT_Wrapper::fillDHT(int length, std::vector<bool> &result_index,
                          double *work_package, double *results, double dt) {
  void *key;
  void *data;
  int res;
  int var_count = dht_prop_type_vector.size();

  for (int i = 0; i < length; i++) {
    key = (void *)&(work_package[i * var_count]);
    data = (void *)&(results[i * var_count]);

    if (result_index[i]) {
      // If true -> was simulated, needs to be inserted into dht

      // fuzz data (round, logarithm etc.)
      fuzzForDHT(var_count, key, dt);

      res = DHT_write(dht_object, fuzzing_buffer, data);

      if (res != DHT_SUCCESS) {
        if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) {
          dht_evictions++;
        } else {
          // MPI ERROR ... WHAT TO DO NOW?
          // RUNNING CIRCLES WHILE SCREAMING
        }
      }
    }
  }
}
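checkDHT and fillDHT are the two halves of the lookup-before-simulate pattern: a worker first substitutes cached results into its work package, runs the expensive chemistry only for the rows still flagged true, then feeds the fresh results back into the table. A sketch of the intended composition (simulate_rows and the surrounding variables are illustrative, not names from this commit):

// Sketch: call sequence around one work package of wp_rows rows.
// simulate_rows() stands in for the actual chemistry solver call.
std::vector<bool> needs_sim(wp_rows);

// 1. replace rows whose fuzzed key is already cached
wrapper.checkDHT(wp_rows, needs_sim, work_package, dt);

// 2. run the expensive simulation only where needed
simulate_rows(work_package, results, needs_sim);

// 3. cache the freshly simulated rows for future iterations
wrapper.fillDHT(wp_rows, needs_sim, work_package, results, dt);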
int DHT_Wrapper::tableToFile(const char *filename) {
  int res = DHT_to_file(dht_object, filename);
  return res;
}

int DHT_Wrapper::fileToTable(const char *filename) {
  int res = DHT_from_file(dht_object, filename);
  if (res != DHT_SUCCESS)
    return res;

  DHT_print_statistics(dht_object);

  return DHT_SUCCESS;
}

void DHT_Wrapper::printStatistics() {
  int res;

  res = DHT_print_statistics(dht_object);

  if (res != DHT_SUCCESS) {
    // MPI ERROR ... WHAT TO DO NOW?
    // RUNNING CIRCLES WHILE SCREAMING
  }
}

uint64_t DHT_Wrapper::getHits() { return this->dht_hits; }

uint64_t DHT_Wrapper::getMisses() { return this->dht_miss; }

uint64_t DHT_Wrapper::getEvictions() { return this->dht_evictions; }
void DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
  unsigned int i = 0;
  // introduce fuzzing to allow more hits in DHT
  for (i = 0; i < (unsigned int)var_count; i++) {
    if (dht_prop_type_vector[i] == "act") {
      // with log10
      if (dht_log) {
        if (((double *)key)[i] < 0)
          cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in "
                  "key!"
               << endl;
        else if (((double *)key)[i] == 0)
          fuzzing_buffer[i] = 0;
        else
          fuzzing_buffer[i] =
              ROUND(-(std::log10(((double *)key)[i])), dht_signif_vector[i]);
      } else {
        // without log10
        fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
      }
    } else if (dht_prop_type_vector[i] == "logact") {
      fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_signif_vector[i]);
    } else if (dht_prop_type_vector[i] == "ignore") {
      fuzzing_buffer[i] = 0;
    } else {
      cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong "
              "prop_type!"
           << endl;
    }
  }
  if (dt_differ)
    fuzzing_buffer[var_count] = dt;
}
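The point of the fuzzing is to make near-identical chemical states hash to the same key. For an "act" property with dht_log enabled and, say, 3 significant digits, two activities that differ only beyond the third decimal of their negative log10 collapse to one cache entry. A worked sketch of that collapse (values invented):

#include <cmath>
#include <cstdio>

// Same truncation as the ROUND macro in DHT_Wrapper.h
static double round_signif(double value, int signif) {
  return ((int)(std::pow(10.0, (double)signif) * value)) *
         std::pow(10.0, (double)-signif);
}

int main() {
  // two nearby activities from successive iterations (invented values)
  double a = 1.2340e-5, b = 1.2345e-5;
  // "act" with dht_log: the key component is -log10(value), 3 digits kept
  std::printf("%f\n", round_signif(-std::log10(a), 3));  // 4.908
  std::printf("%f\n", round_signif(-std::log10(b), 3));  // 4.908 -> same key
}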
56  src/DHT/DHT_Wrapper.h  Normal file
@@ -0,0 +1,56 @@
#ifndef DHT_WRAPPER_H
#define DHT_WRAPPER_H

#include <mpi.h>

#include <string>
#include <vector>

#include "../util/SimParams.h"
#include "DHT.h"

#define ROUND(value, signif) \
  (((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))

uint64_t get_md5(int key_size, void *key);

namespace poet {
class DHT_Wrapper {
 public:
  DHT_Wrapper(t_simparams *params, MPI_Comm dht_comm, int buckets_per_process,
              int data_size, int key_size);
  ~DHT_Wrapper();

  void checkDHT(int length, std::vector<bool> &out_result_index,
                double *work_package, double dt);
  void fillDHT(int length, std::vector<bool> &result_index,
               double *work_package, double *results, double dt);

  int tableToFile(const char *filename);
  int fileToTable(const char *filename);

  void printStatistics();

  uint64_t getHits();
  uint64_t getMisses();
  uint64_t getEvictions();

 private:
  void fuzzForDHT(int var_count, void *key, double dt);

  DHT *dht_object;

  uint64_t dht_hits = 0;
  uint64_t dht_miss = 0;
  uint64_t dht_evictions = 0;

  double *fuzzing_buffer;

  bool dt_differ;
  bool dht_log;
  std::vector<int> dht_signif_vector;
  std::vector<std::string> dht_prop_type_vector;
};
}  // namespace poet

#endif  // DHT_WRAPPER_H
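One caveat worth noting about the ROUND macro above: despite its name it truncates toward zero (the int cast drops the fractional part rather than rounding to nearest), and the unparenthesized value and signif arguments would expand badly for compound expressions like a + b. Both are harmless for the plain values POET passes in, but a hardened variant would look like this (a sketch, not part of the commit):

#include <cmath>

// Sketch: inline function instead of the macro -- rounds to nearest and
// evaluates its arguments exactly once.
inline double round_signif_nearest(double value, int signif) {
  const double scale = std::pow(10.0, signif);
  return std::round(value * scale) / scale;
}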
472  src/kin.cpp
@@ -1,20 +1,22 @@
#include <Rcpp.h>
#include <mpi.h>  // mpi header file

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

#include <Rcpp.h>

#include <mpi.h>  // mpi header file

#include "DHT.h"  // MPI-DHT Implementation
#include "argh.h"  // Argument handler https://github.com/adishavit/argh BSD-licenced
#include "dht_wrapper.h"
#include "global_buffer.h"
// #include "DHT.h"  // MPI-DHT Implementation
#include "argh.h"  // Argument handler https://github.com/adishavit/argh BSD-licenced
// #include "dht_wrapper.h"
// #include "global_buffer.h"
#include "model/ChemSim.h"
#include "model/Grid.h"
#include "util/RRuntime.h"
#include "util/SimParams.h"
#include "worker.h"
// #include "worker.h"

#define DHT_SIZE_PER_PROCESS 1073741824

using namespace std;
using namespace poet;
@@ -59,8 +61,7 @@ std::list<std::string> checkOptions(argh::parser cmdl) {
  std::set<std::string> plist = paramList();

  for (auto &flag : cmdl.flags()) {
    if (!(flist.find(flag) != flist.end()))
      retList.push_back(flag);
    if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
  }

  for (auto &param : cmdl.params()) {
@@ -85,7 +86,7 @@ int main(int argc, char *argv[]) {
  double cummul_workers = 0.f;
  double cummul_chemistry_master = 0.f;

  double cummul_master_seq_pre_loop = 0.f;
  double cummul_master_seq = 0.f;
  double cummul_master_seq_loop = 0.f;
  double master_idle = 0.f;
@@ -121,8 +122,7 @@ int main(int argc, char *argv[]) {

  // make a list of processes in the new communicator
  process_ranks = (int *)malloc(params.world_size * sizeof(int));
  for (int I = 1; I < params.world_size; I++)
    process_ranks[I - 1] = I;
  for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I;

  // get the group under MPI_COMM_WORLD
  MPI_Comm_group(MPI_COMM_WORLD, &group_world);
@@ -130,7 +130,7 @@ int main(int argc, char *argv[]) {
  MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group);
  // create the new communicator
  MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
  free(process_ranks); // cleanup
  free(process_ranks);  // cleanup
  // cout << "Done";

  if (cmdl[{"help", "h"}]) {
@@ -226,21 +226,24 @@ int main(int argc, char *argv[]) {
  R["local_rank"] = params.world_rank;

  /*Loading Dependencies*/
  std::string r_load_dependencies = "suppressMessages(library(Rmufits));"
                                    "suppressMessages(library(RedModRphree));"
                                    "source('kin_r_library.R');"
                                    "source('parallel_r_library.R');";
  std::string r_load_dependencies =
      "suppressMessages(library(Rmufits));"
      "suppressMessages(library(RedModRphree));"
      "source('kin_r_library.R');"
      "source('parallel_r_library.R');";
  R.parseEvalQ(r_load_dependencies);

  std::string filesim;
  cmdl(1) >> filesim; // <- first positional argument
  R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
  R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns
  cmdl(1) >> filesim;            // <- first positional argument
  R["filesim"] = wrap(filesim);  // assign a char* (string) to 'filesim'
  R.parseEvalQ(
      "source(filesim)");  // eval the init string, ignoring any returns

  if (params.world_rank ==
      0) {  // only rank 0 goes through the whole initialization
    cmdl(2) >> params.out_dir; // <- second positional argument
    R["fileout"] = wrap(params.out_dir); // assign a char* (string) to 'fileout'
      0) {  // only rank 0 goes through the whole initialization
    cmdl(2) >> params.out_dir;  // <- second positional argument
    R["fileout"] =
        wrap(params.out_dir);  // assign a char* (string) to 'fileout'

    // Note: R::sim_init() checks if the directory already exists,
    // if not it makes it
@@ -253,9 +256,9 @@ int main(int argc, char *argv[]) {
    R.parseEval(master_init_code);

    params.dt_differ =
        R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
        R.parseEval("mysetup$dt_differ");  // TODO: Set in DHTWrapper
    MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
  } else { // workers will only read the setup DataFrame defined by input file
  } else {  // workers will only read the setup DataFrame defined by input file
    R.parseEval("mysetup <- setup");
    MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
  }
@@ -280,15 +283,15 @@ int main(int argc, char *argv[]) {
  R["work_package_size"] = params.wp_size;

  // Removed additional field for ID in previous versions
  if (params.world_rank == 0) {
    mpi_buffer =
        (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
  } else {
    mpi_buffer = (double *)calloc(
        (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
    mpi_buffer_results =
        (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
  }
  // if (params.world_rank == 0) {
  //   mpi_buffer =
  //       (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
  // } else {
  //   mpi_buffer = (double *)calloc(
  //       (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
  //   mpi_buffer_results =
  //       (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
  // }

  if (params.world_rank == 0) {
    cout << "CPP: parallel init completed (buffers allocated)!" << endl;
@@ -302,14 +305,14 @@ int main(int argc, char *argv[]) {
  if (params.dht_enabled) {
    // cout << "\nCreating DHT\n";
    // determine size of dht entries
    int dht_data_size = grid.getCols() * sizeof(double);
    int dht_key_size =
        grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
    // int dht_data_size = grid.getCols() * sizeof(double);
    // int dht_key_size =
    //     grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));

    // determine bucket count for preset memory usage
    // bucket size is key + value + 1 byte for status
    int dht_buckets_per_process =
        params.dht_size_per_process / (1 + dht_data_size + dht_key_size);
    // // // determine bucket count for preset memory usage
    // // // bucket size is key + value + 1 byte for status
    // int dht_buckets_per_process =
    //     params.dht_size_per_process / (1 + dht_data_size + dht_key_size);

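The sizing arithmetic above (kept here in commented-out form) still describes how the table is dimensioned: each bucket holds one status byte plus key plus value, and the bucket count is simply the per-process memory budget divided by that. A worked sketch with illustrative numbers (32 transported properties, dt included in the key, the 1 GiB DHT_SIZE_PER_PROCESS default):

// Illustrative numbers only, not POET's actual configuration.
int cols = 32;
int dht_data_size = cols * sizeof(double);                   // 256 bytes
int dht_key_size  = cols * sizeof(double) + sizeof(double);  // 264 bytes
long budget = 1073741824L;                                   // 1 GiB
long buckets = budget / (1 + dht_data_size + dht_key_size);
// buckets == 1073741824 / 521, roughly 2.06 million buckets per process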
    // MDL : following code moved here from worker.cpp
    /*Load significance vector from R setup file (or set default)*/
@@ -317,8 +320,8 @@ int main(int argc, char *argv[]) {
    if (signif_vector_exists) {
      params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
    } else {
      params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
                                      dht_significant_digits);
      // params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
      //                                 dht_significant_digits);
    }

    /*Load property type vector from R setup file (or set default)*/
@@ -326,22 +329,24 @@ int main(int argc, char *argv[]) {
    if (prop_type_vector_exists) {
      params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
    } else {
      params.dht_prop_type_vector.assign(dht_object->key_size / sizeof(double),
                                         "act");
      // params.dht_prop_type_vector.assign(dht_object->key_size /
      // sizeof(double),
      //                                    "act");
    }

    if (params.world_rank == 0) {
      // print only on master, values are equal on all workers
      cout << "CPP: dht_data_size: " << dht_data_size << "\n";
      cout << "CPP: dht_key_size: " << dht_key_size << "\n";
      cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process
           << endl;
      // // print only on master, values are equal on all workers
      // cout << "CPP: dht_data_size: " << dht_data_size << "\n";
      // cout << "CPP: dht_key_size: " << dht_key_size << "\n";
      // cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process
      //      << endl;

      // MDL: new output on signif_vector and prop_type
      if (signif_vector_exists) {
        cout << "CPP: using problem-specific rounding digits: " << endl;
        R.parseEval("print(data.frame(prop=prop, type=prop_type, "
                    "digits=signif_vector))");
        R.parseEval(
            "print(data.frame(prop=prop, type=prop_type, "
            "digits=signif_vector))");
      } else {
        cout << "CPP: using DHT default rounding digits = "
             << dht_significant_digits << endl;
@@ -353,22 +358,22 @@ int main(int argc, char *argv[]) {
      R["dht_final_proptype"] = params.dht_prop_type_vector;
    }

    if (params.dht_strategy == 0) {
      if (params.world_rank != 0) {
        dht_object = DHT_create(dht_comm, dht_buckets_per_process,
                                dht_data_size, dht_key_size, get_md5);
    // if (params.dht_strategy == 0) {
    //   if (params.world_rank != 0) {
    //     dht_object = DHT_create(dht_comm, dht_buckets_per_process,
    //                             dht_data_size, dht_key_size, get_md5);

        // storing for access from worker and callback functions
        fuzzing_buffer = (double *)malloc(dht_key_size);
      }
    } else {
      dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process,
                              dht_data_size, dht_key_size, get_md5);
    }
    // // storing for access from worker and callback functions
    //     fuzzing_buffer = (double *)malloc(dht_key_size);
    //   }
    // } else {
    //   dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process,
    //                           dht_data_size, dht_key_size, get_md5);
    // }

    if (params.world_rank == 0) {
      cout << "CPP: DHT successfully created!" << endl;
    }
    // if (params.world_rank == 0) {
    //   cout << "CPP: DHT successfully created!" << endl;
    // }
  }

  // MDL: store all parameters
@@ -380,52 +385,23 @@ int main(int argc, char *argv[]) {
  MPI_Barrier(MPI_COMM_WORLD);

  if (params.world_rank == 0) { /* This is executed by the master */
    ChemMaster master(&params, R, grid);

    Rcpp::NumericVector master_send;
    Rcpp::NumericVector master_recv;

    sim_a_seq = MPI_Wtime();

    worker_struct *workerlist =
        (worker_struct *)calloc(params.world_size - 1, sizeof(worker_struct));
    int need_to_receive;
    MPI_Status probe_status;
    double *timings;
    uint64_t *dht_perfs = NULL;

    int local_work_package_size;

    // a temporary send buffer
    double *send_buffer;
    send_buffer = (double *)calloc(
        (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));

    // helper variables
    int iteration;
    double dt, current_sim_time;

    int n_wp = 1;  // holds the actual number of wp which is
                   // computed later in R::distribute_work_packages()
    std::vector<int> wp_sizes_vector;  // vector with the sizes of
                                       // each package

    sim_start = MPI_Wtime();

    // Iteration Count is dynamic, retrieving value from R (is only needed by
    // master for the following loop)
    uint32_t maxiter = R.parseEval("mysetup$maxiter");

    sim_b_seq = MPI_Wtime();

    cummul_master_seq_pre_loop += sim_b_seq - sim_a_seq;

    /*SIMULATION LOOP*/
    for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
      sim_a_seq = MPI_Wtime();

      cummul_master_send = 0.f;
      cummul_master_recv = 0.f;

      cout << "CPP: Evaluating next time step" << endl;
      R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
@@ -440,201 +416,19 @@ int main(int argc, char *argv[]) {
      R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
      sim_a_transport = MPI_Wtime();

      if (iter == 1) master.prepareSimulation();

      cout << "CPP: Chemistry" << endl;
      /*Fallback for sequential execution*/
      sim_b_chemistry = MPI_Wtime();

      if (params.world_size == 1) {
        // MDL : the transformation of values into pH and pe
        // now takes place in master_advection() so the
        // following line is unneeded
        // R.parseEvalQ("mysetup$state_T <-
        // RedModRphree::Act2pH(mysetup$state_T)");
        R.parseEvalQ(
            "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
        R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
        master.runSeq();
      } else { /*send work to workers*/

        // NEW: only in the first iteration we call
        // R::distribute_work_packages()!!
        if (iter == 1) {
          R.parseEvalQ(
              "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
              "package_size=work_package_size)");

          // we only sort the vector once
          R.parseEvalQ("ordered_ids <- order(wp_ids)");
          R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
          n_wp = (int)R.parseEval("length(wp_sizes_vector)");
          wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
          cout << "CPP: Total number of work packages: " << n_wp << endl;
          R.parseEval("stat_wp_sizes(wp_sizes_vector)");
        }

        /* shuffle and extract data
           MDL: we now apply Act2pH directly in master_advection
        */
        // R.parseEval("tmp <-
        // shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
        // Rcpp::DataFrame chemistry_data = R.parseEval("tmp");

        // convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
        // cout << "CPP: shuffle_field() done" << endl;
        grid.shuffleAndExport(mpi_buffer);
        /* send and receive work; this is done by counting
         * the wp */
        int pkg_to_send = n_wp;
        int pkg_to_recv = n_wp;
        size_t colCount = grid.getCols();
        int free_workers = params.world_size - 1;
        double *work_pointer = mpi_buffer;
        sim_c_chemistry = MPI_Wtime();

        /* visual progress */
        float progress = 0.0;
        int barWidth = 70;

        // retrieve data from R runtime
        iteration = (int)R.parseEval("mysetup$iter");
        dt = (double)R.parseEval("mysetup$requested_dt");
        current_sim_time =
            (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");

        int count_pkgs = 0;

        sim_b_seq = MPI_Wtime();

        sim_c_chemistry = MPI_Wtime();

        while (pkg_to_recv > 0)  // start dispatching work packages
        {
          /* visual progress */
          progress = (float)(count_pkgs + 1) / n_wp;

          cout << "[";
          int pos = barWidth * progress;
          for (int iprog = 0; iprog < barWidth; ++iprog) {
            if (iprog < pos)
              cout << "=";
            else if (iprog == pos)
              cout << ">";
            else
              cout << " ";
          }
          std::cout << "] " << int(progress * 100.0) << " %\r";
          std::cout.flush();
          /* end visual progress */

          if (pkg_to_send > 0) {
            master_send_a = MPI_Wtime();
            /*search for free workers and send work*/
            for (int p = 0; p < params.world_size - 1; p++) {
              if (workerlist[p].has_work == 0 &&
                  pkg_to_send > 0) /* worker is free */ {

                // to enable different work_package_size, set local copy of
                // work_package_size to either global work_package size or
                // remaining 'to_send' packages: to_send >= work_package_size ?
                // local_work_package_size = work_package_size :
                // local_work_package_size = to_send;

                local_work_package_size = (int)wp_sizes_vector[count_pkgs];
                count_pkgs++;

                // cout << "CPP: sending pkg n. " << count_pkgs << " with size "
                //      << local_work_package_size << endl;

                /*push pointer forward to next work package, after taking the
                 * current one*/
                workerlist[p].send_addr = work_pointer;

                int end_of_wp = local_work_package_size * colCount;
                work_pointer = &(work_pointer[end_of_wp]);

                // fill send buffer starting with work_package ...
                std::memcpy(send_buffer, workerlist[p].send_addr,
                            (end_of_wp) * sizeof(double));
                // followed by: work_package_size
                send_buffer[end_of_wp] = (double)local_work_package_size;
                // current iteration of simulation
                send_buffer[end_of_wp + 1] = (double)iteration;
                // size of timestep in seconds
                send_buffer[end_of_wp + 2] = dt;
                // current time of simulation (age) in seconds
                send_buffer[end_of_wp + 3] = current_sim_time;
                // placeholder for work_package_count
                send_buffer[end_of_wp + 4] = 0.;

                /* ATTENTION Worker p has rank p+1 */
                MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE,
                         p + 1, TAG_WORK, MPI_COMM_WORLD);

                workerlist[p].has_work = 1;
                free_workers--;
                pkg_to_send -= 1;
              }
            }
            master_send_b = MPI_Wtime();
            cummul_master_send += master_send_b - master_send_a;
          }

          /*check if there are results to receive and receive them*/
          need_to_receive = 1;
          master_recv_a = MPI_Wtime();
          while (need_to_receive && pkg_to_recv > 0) {
            if (pkg_to_send > 0 && free_workers > 0)
              MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
                         &need_to_receive, &probe_status);
            else {
              idle_a = MPI_Wtime();
              MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
                        &probe_status);
              idle_b = MPI_Wtime();
              master_idle += idle_b - idle_a;
            }

            if (need_to_receive) {
              int p = probe_status.MPI_SOURCE;
              int size;
              MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
              MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p,
                       TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
              workerlist[p - 1].has_work = 0;
              pkg_to_recv -= 1;
              free_workers++;
            }
          }
          master_recv_b = MPI_Wtime();
          cummul_master_recv += master_recv_b - master_recv_a;
        }

        sim_c_seq = MPI_Wtime();

        // don't overwrite last progress
        cout << endl;

        sim_d_chemistry = MPI_Wtime();
        cummul_workers += sim_d_chemistry - sim_c_chemistry;

        // convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
        // R.from_C_domain(mpi_buffer);

        // R["chemistry_data"] = R.getBufferDataFrame();

        ///* unshuffle results */
        // R.parseEval("result <- unshuffle_field(chemistry_data,
        // ordered_ids)");

        grid.importAndUnshuffle(mpi_buffer);
        /* do master stuff */
        sim_e_chemistry = MPI_Wtime();
        R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
        sim_f_chemistry = MPI_Wtime();
        cummul_chemistry_master += sim_f_chemistry - sim_e_chemistry;
        master.runIteration();
      }
      sim_a_chemistry = MPI_Wtime();

      double master_seq_a = MPI_Wtime();
      // MDL master_iteration_end just writes state_T and
      // state_C to disk after every iteration if the cmdline option
      // --ignore-results is not given (and thus the R variable
@@ -649,48 +443,22 @@ int main(int argc, char *argv[]) {
           << endl
           << endl;

      if (params.dht_enabled) {
        for (int i = 1; i < params.world_size; i++) {
          MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
        }
      double master_seq_b = MPI_Wtime();

        MPI_Barrier(MPI_COMM_WORLD);
      cummul_master_seq += master.getSeqTime() + (master_seq_b - master_seq_a);
      master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
      master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));

      if (params.dht_snaps == 2) {
        std::stringstream outfile;
        outfile << params.out_dir << "/iter_" << std::setfill('0')
                << std::setw(3) << iter << ".dht";
        for (int i = 1; i < params.world_size; i++) {
          MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i,
                   TAG_DHT_STORE, MPI_COMM_WORLD);
        }
        MPI_Barrier(MPI_COMM_WORLD);
      }
      for (int i = 1; i < params.world_size; i++) {
        MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
      }

      sim_d_seq = MPI_Wtime();
      MPI_Barrier(MPI_COMM_WORLD);

      cummul_master_seq_loop +=
          ((sim_b_seq - sim_a_seq) - (sim_a_transport - sim_b_transport)) +
          (sim_d_seq - sim_c_seq);
      master_send.push_back(cummul_master_send, "it_" + to_string(iter));
      master_recv.push_back(cummul_master_recv, "it_" + to_string(iter));

    }  // END SIMULATION LOOP
    }  // END SIMULATION LOOP

    sim_end = MPI_Wtime();

    if (params.dht_enabled && params.dht_snaps > 0) {
      cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl;
      std::string outfile;
      outfile = params.out_dir + ".dht";
      for (int i = 1; i < params.world_size; i++) {
        MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE,
                 MPI_COMM_WORLD);
      }
      MPI_Barrier(MPI_COMM_WORLD);
      cout << "CPP: Master: ... done" << endl;
    }
    master.finishSimulation();

    Rcpp::NumericVector phreeqc_time;
    Rcpp::NumericVector dht_get_time;
@@ -702,6 +470,10 @@ int main(int argc, char *argv[]) {

    timings = (double *)calloc(3, sizeof(double));

    int dht_hits = 0;
    int dht_miss = 0;
    int dht_collision = 0;

    if (params.dht_enabled) {
      dht_hits = 0;
      dht_miss = 0;
@@ -748,23 +520,21 @@ int main(int argc, char *argv[]) {
    R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
    R["simtime_chemistry"] = cummul_chemistry;
    R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
    R["simtime_workers"] = cummul_workers;
    R["simtime_workers"] = master.getWorkerTime();
    R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
    R["simtime_chemistry_master"] = cummul_chemistry_master;
    R["simtime_chemistry_master"] = master.getChemMasterTime();
    R.parseEvalQ(
        "profiling$simtime_chemistry_master <- simtime_chemistry_master");

    R["seq_master_prep"] = cummul_master_seq_pre_loop;
    R.parseEvalQ("profiling$seq_master_prep <- seq_master_prep");
    R["seq_master_loop"] = cummul_master_seq_loop;
    R.parseEvalQ("profiling$seq_master_loop <- seq_master_loop");
    R["seq_master"] = cummul_master_seq;
    R.parseEvalQ("profiling$seq_master <- seq_master");

    // R["master_send"] = master_send;
    // R.parseEvalQ("profiling$master_send <- master_send");
    // R["master_recv"] = master_recv;
    // R.parseEvalQ("profiling$master_recv <- master_recv");

    R["idle_master"] = master_idle;
    R["idle_master"] = master.getIdleTime();
    R.parseEvalQ("profiling$idle_master <- idle_master");
    R["idle_worker"] = idle_worker;
    R.parseEvalQ("profiling$idle_worker <- idle_worker");
@@ -788,11 +558,9 @@ int main(int argc, char *argv[]) {
      R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
    }

    free(workerlist);
    free(timings);

    if (params.dht_enabled)
      free(dht_perfs);
    if (params.dht_enabled) free(dht_perfs);

    cout << "CPP: Done! Results are stored as R objects into <"
         << params.out_dir << "/timings.rds>" << endl;
@@ -802,42 +570,22 @@ int main(int argc, char *argv[]) {
    r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
    R.parseEval(r_vis_code);
  } else { /*This is executed by the workers*/
    if (!params.dht_file.empty()) {
      int res = file_to_table((char *)params.dht_file.c_str());
      if (res != DHT_SUCCESS) {
        if (res == DHT_WRONG_FILE) {
          if (params.world_rank == 2)
            cerr << "CPP: Worker: Wrong File" << endl;
        } else {
          if (params.world_rank == 2)
            cerr << "CPP: Worker: Error in loading current state of DHT from "
                    "file"
                 << endl;
        }
        return EXIT_FAILURE;
      } else {
        if (params.world_rank == 2)
          cout << "CPP: Worker: Successfully loaded state of DHT from file "
               << params.dht_file << endl;
        std::cout.flush();
      }
    }
    worker_function(&params);
    free(mpi_buffer_results);
    ChemWorker worker(&params, R, grid);
    worker.prepareSimulation(dht_comm);
    worker.loop();
  }

  cout << "CPP: finished, cleanup of process " << params.world_rank << endl;

  if (params.dht_enabled) {

    if (params.dht_strategy == 0) {
      if (params.world_rank != 0) {
        DHT_free(dht_object, NULL, NULL);
      }
    } else {
      DHT_free(dht_object, NULL, NULL);
    }
  }
  // if (params.dht_enabled) {
  //   if (params.dht_strategy == 0) {
  //     if (params.world_rank != 0) {
  //       DHT_free(dht_object, NULL, NULL);
  //     }
  //   } else {
  //     DHT_free(dht_object, NULL, NULL);
  //   }
  // }

  free(mpi_buffer);
  MPI_Finalize();
217  src/model/ChemMaster.cpp  Normal file
@ -0,0 +1,217 @@
|
||||
#include <Rcpp.h>
|
||||
#include <mpi.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "ChemSim.h"
|
||||
|
||||
using namespace poet;
|
||||
using namespace std;
|
||||
using namespace Rcpp;
|
||||
|
||||
#define TAG_WORK 42
|
||||
|
||||
ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_)
|
||||
: ChemSim(params, R_, grid_) {
|
||||
this->wp_size = params->wp_size;
|
||||
this->out_dir = params->out_dir;
|
||||
}

void ChemMaster::runIteration() {
  double seq_a, seq_b, seq_c, seq_d;
  double worker_chemistry_a, worker_chemistry_b;
  double sim_e_chemistry, sim_f_chemistry;
  int pkg_to_send, pkg_to_recv;
  int free_workers;
  int i_pkgs;

  seq_a = MPI_Wtime();
  grid.shuffleAndExport(mpi_buffer);
  // retrieve data from R runtime
  iteration = (int)R.parseEval("mysetup$iter");
  dt = (double)R.parseEval("mysetup$requested_dt");
  current_sim_time =
      (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");

  // set up local variables
  pkg_to_send = wp_sizes_vector.size();
  pkg_to_recv = wp_sizes_vector.size();
  work_pointer = mpi_buffer;
  free_workers = world_size - 1;
  i_pkgs = 0;

  seq_b = MPI_Wtime();
  seq_t += seq_b - seq_a;

  worker_chemistry_a = MPI_Wtime();
  while (pkg_to_recv > 0) {
    // TODO: Progressbar into IO instance.
    printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
    if (pkg_to_send > 0) {
      sendPkgs(pkg_to_send, i_pkgs, free_workers);
    }
    recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers);
  }

  // Just to complete the progress bar
  cout << endl;

  worker_chemistry_b = MPI_Wtime();
  worker_t = worker_chemistry_b - worker_chemistry_a;

  seq_c = MPI_Wtime();
  grid.importAndUnshuffle(mpi_buffer);
  /* do master stuff */
  sim_e_chemistry = MPI_Wtime();
  R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
  sim_f_chemistry = MPI_Wtime();
  chem_master += sim_f_chemistry - sim_e_chemistry;
  seq_d = MPI_Wtime();
  seq_t += seq_d - seq_c;
}
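
// Wire format of one work package (implied by sendPkgs() below and by
// ChemWorker::doWork()): the grid rows are followed by BUFFER_OFFSET = 5
// header doubles.
//
//   [ wp_rows * n_cols doubles | wp_size | iteration | dt | sim_time | 0. ]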

void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
                          int &free_workers) {
  double master_send_a, master_send_b;
  int local_work_package_size;
  int end_of_wp;

  // start time measurement
  master_send_a = MPI_Wtime();
  /* search for free workers and send work */
  for (int p = 0; p < world_size - 1; p++) {
    if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ {
      // to enable different work_package_size, set local copy of
      // work_package_size to either global work_package size or
      // remaining 'to_send' packages to_send >= work_package_size ?
      // local_work_package_size = work_package_size :
      // local_work_package_size = to_send;

      local_work_package_size = (int)wp_sizes_vector[count_pkgs];
      count_pkgs++;

      // cout << "CPP: sending pkg n. " << count_pkgs << " with size "
      //      << local_work_package_size << endl;

      /* push pointer forward to next work package, after taking the
       * current one */
      workerlist[p].send_addr = work_pointer;

      end_of_wp = local_work_package_size * grid.getCols();
      work_pointer = &(work_pointer[end_of_wp]);

      // fill send buffer starting with work_package ...
      std::memcpy(send_buffer, workerlist[p].send_addr,
                  (end_of_wp) * sizeof(double));
      // followed by: work_package_size
      send_buffer[end_of_wp] = (double)local_work_package_size;
      // current iteration of simulation
      send_buffer[end_of_wp + 1] = (double)iteration;
      // size of timestep in seconds
      send_buffer[end_of_wp + 2] = dt;
      // current time of simulation (age) in seconds
      send_buffer[end_of_wp + 3] = current_sim_time;
      // placeholder for work_package_count
      send_buffer[end_of_wp + 4] = 0.;

      /* ATTENTION: worker p has rank p+1 */
      MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
               TAG_WORK, MPI_COMM_WORLD);

      workerlist[p].has_work = 1;
      free_workers--;
      pkg_to_send -= 1;
    }
  }
  master_send_b = MPI_Wtime();
  send_t += master_send_b - master_send_a;
}

void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) {
  int need_to_receive = 1;
  double master_recv_a, master_recv_b;
  double idle_a, idle_b;
  int p, size;

  MPI_Status probe_status;
  master_recv_a = MPI_Wtime();
  while (need_to_receive && pkg_to_recv > 0) {
    if (to_send && free_workers > 0)
      MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive,
                 &probe_status);
    else {
      idle_a = MPI_Wtime();
      MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status);
      idle_b = MPI_Wtime();
      master_idle += idle_b - idle_a;
    }

    if (need_to_receive) {
      p = probe_status.MPI_SOURCE;
      MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
      MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      workerlist[p - 1].has_work = 0;
      pkg_to_recv -= 1;
      free_workers++;
    }
  }
  master_recv_b = MPI_Wtime();
  recv_t += master_recv_b - master_recv_a;
}
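
// Probe strategy: while there are still packages to send and at least one
// free worker, the master polls with MPI_Iprobe so it can return to
// sendPkgs() immediately; once nothing can be dispatched it blocks in
// MPI_Probe and books the wait as master_idle.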

void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
  /* visual progress */
  double progress = (double)(count_pkgs + 1) / n_wp;

  cout << "[";
  int pos = (int)(barWidth * progress);
  for (int iprog = 0; iprog < barWidth; ++iprog) {
    if (iprog < pos)
      cout << "=";
    else if (iprog == pos)
      cout << ">";
    else
      cout << " ";
  }
  std::cout << "] " << int(progress * 100.0) << " %\r";
  std::cout.flush();
  /* end visual progress */
}

void ChemMaster::prepareSimulation() {
  workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
  send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
                                 sizeof(double));

  R.parseEvalQ(
      "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
      "package_size=work_package_size)");

  // sort the vector only once
  R.parseEvalQ("ordered_ids <- order(wp_ids)");
  R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
  R.parseEval("stat_wp_sizes(wp_sizes_vector)");
  wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);

  mpi_buffer =
      (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
}
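
// Buffer sizing: mpi_buffer holds the full (rows x cols) grid, while
// send_buffer only needs one maximum-size work package plus the
// BUFFER_OFFSET header doubles appended by sendPkgs().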

void ChemMaster::finishSimulation() {
  free(mpi_buffer);
  free(workerlist);
}

double ChemMaster::getSendTime() { return this->send_t; }

double ChemMaster::getRecvTime() { return this->recv_t; }

double ChemMaster::getIdleTime() { return this->master_idle; }

double ChemMaster::getWorkerTime() { return this->worker_t; }

double ChemMaster::getChemMasterTime() { return this->chem_master; }

double ChemMaster::getSeqTime() { return this->seq_t; }

23
src/model/ChemSim.cpp
Normal file
@ -0,0 +1,23 @@
#include "ChemSim.h"

#include "../util/RRuntime.h"
#include "Grid.h"

#include <Rcpp.h>
#include <iostream>
#include <mpi.h>

using namespace Rcpp;
using namespace poet;

ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_)
    : R(R_), grid(grid_) {
  this->world_rank = params->world_rank;
  this->world_size = params->world_size;
  this->wp_size = params->wp_size;
  this->out_dir = params->out_dir;
}

void ChemSim::runSeq() {
  R.parseEvalQ(
      "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
  R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
}
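
// runSeq() is the single-process fallback: the whole grid is pushed through
// slave_chemistry() and master_chemistry() in one R call each, without any
// MPI traffic.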

115
src/model/ChemSim.h
Normal file
@ -0,0 +1,115 @@
#ifndef CHEMSIM_H
#define CHEMSIM_H

#include "../DHT/DHT_Wrapper.h"
#include "../util/RRuntime.h"
#include "../util/SimParams.h"
#include "Grid.h"

#include <mpi.h>
#include <string>
#include <vector>

#define BUFFER_OFFSET 5
#define TAG_WORK 42
#define TAG_FINISH 43
#define TAG_TIMING 44
#define TAG_DHT_PERF 45
#define TAG_DHT_STATS 46
#define TAG_DHT_STORE 47
#define TAG_DHT_ITER 48

namespace poet {
class ChemSim {
 public:
  ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
  void runSeq();

 protected:
  double current_sim_time = 0;
  int iteration = 0;
  // dt is read from R as a double and shipped in the double-typed MPI
  // header, so it must not be truncated to int here
  double dt = 0;

  int world_rank;
  int world_size;
  unsigned int wp_size;

  RRuntime &R;
  Grid &grid;

  std::vector<int> wp_sizes_vector;
  std::string out_dir;

  double *send_buffer;
  typedef struct {
    char has_work;
    double *send_addr;
  } worker_struct;
  worker_struct *workerlist;

  double *mpi_buffer;
};

class ChemMaster : public ChemSim {
 public:
  ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);

  void prepareSimulation();
  void finishSimulation();

  void runIteration();

  double getSendTime();
  double getRecvTime();
  double getIdleTime();
  double getWorkerTime();
  double getChemMasterTime();
  double getSeqTime();

 private:
  void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70);
  void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers);
  void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);

  double *work_pointer;

  double send_t = 0.0;
  double recv_t = 0.0;
  double master_idle = 0.0;
  double worker_t = 0.0;
  double chem_master = 0.0;
  double seq_t = 0.0;
};

class ChemWorker : public ChemSim {
 public:
  ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_);

  void prepareSimulation(MPI_Comm dht_comm);
  void loop();

 private:
  void doWork(MPI_Status &probe_status);
  void postIter();
  void finishWork();
  void writeFile();
  void readFile();

  bool dht_enabled;
  bool dt_differ;
  int dht_snaps;
  std::string dht_file;
  unsigned int dht_size_per_process;
  std::vector<bool> dht_flags;
  t_simparams *params;

  double *mpi_buffer_results;

  DHT_Wrapper *dht = nullptr;  // stays null when the DHT is disabled

  double timing[3];
  double idle_t = 0.0;
  int phreeqc_count = 0;
};
} // namespace poet
#endif // CHEMSIM_H
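
// Typical dispatch from main() (a sketch; the worker branch appears verbatim
// in the main() diff above, the master branch is the assumed counterpart):
//
//   if (params.world_rank == 0) {
//     ChemMaster master(&params, R, grid);
//     master.prepareSimulation();
//     while (/* iterations remain */) master.runIteration();
//     master.finishSimulation();
//   } else {
//     ChemWorker worker(&params, R, grid);
//     worker.prepareSimulation(dht_comm);
//     worker.loop();
//   }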

325
src/model/ChemWorker.cpp
Normal file
@ -0,0 +1,325 @@
#include <Rcpp.h>
#include <mpi.h>

#include <iomanip>   // setfill/setw in writeFile()
#include <iostream>
#include <sstream>   // std::stringstream in writeFile()
#include <string>

#include "ChemSim.h"

using namespace poet;
using namespace std;
using namespace Rcpp;

ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_)
    : ChemSim(params_, R_, grid_), params(params_) {
  this->dt_differ = params->dt_differ;
  this->dht_enabled = params->dht_enabled;
  this->dht_size_per_process = params->dht_size_per_process;
  this->dht_file = params->dht_file;
  // dht_snaps is read in postIter()/finishWork(); initialize it here
  // (assuming t_simparams carries the dht_snaps option)
  this->dht_snaps = params->dht_snaps;
}

void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
  mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
                                sizeof(double));
  mpi_buffer_results =
      (double *)calloc(wp_size * (grid.getCols()), sizeof(double));

  if (world_rank == 1)
    cout << "Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF") << endl;

  if (dht_enabled) {
    int data_size = grid.getCols() * sizeof(double);
    int key_size =
        grid.getCols() * sizeof(double) + (dt_differ * sizeof(double));
    int dht_buckets_per_process =
        dht_size_per_process / (1 + data_size + key_size);

    if (world_rank == 1)
      cout << "CPP: Worker: data size: " << data_size << " bytes" << endl
           << "CPP: Worker: key size: " << key_size << " bytes" << endl
           << "CPP: Worker: buckets per process " << dht_buckets_per_process
           << endl;

    dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size,
                          key_size);

    if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl;
  }

  // only attempt to restore a snapshot when the DHT actually exists
  if (dht_enabled && !dht_file.empty()) readFile();

  // set size
  dht_flags.resize(params->wp_size, true);
  // assign all elements to true (default)
  dht_flags.assign(params->wp_size, true);

  timing[0] = 0.0;
  timing[1] = 0.0;
  timing[2] = 0.0;
}
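
// Sizing note: a DHT bucket stores one flag byte plus key and data, so the
// per-process bucket count above is dht_size_per_process / (1 + data + key).
// When timestep sizes can differ between iterations (dt_differ), dt becomes
// part of the key, since the same input concentrations may then map to
// different results.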

void ChemWorker::loop() {
  MPI_Status probe_status;
  while (1) {
    double idle_a = MPI_Wtime();
    MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
    double idle_b = MPI_Wtime();

    if (probe_status.MPI_TAG == TAG_WORK) {
      idle_t += idle_b - idle_a;
      doWork(probe_status);
    } else if (probe_status.MPI_TAG == TAG_FINISH) {
      finishWork();
      break;
    } else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
      postIter();
    }
  }
}

void ChemWorker::doWork(MPI_Status &probe_status) {
  int count;
  int local_work_package_size = 0;

  double dht_get_start = 0.0, dht_get_end = 0.0;
  // initialized so the timing update at the end stays well-defined when the
  // package is empty and PHREEQC is skipped
  double phreeqc_time_start = 0.0, phreeqc_time_end = 0.0;
  double dht_fill_start, dht_fill_end;

  /* get number of doubles sent */
  MPI_Get_count(&probe_status, MPI_DOUBLE, &count);

  /* receive */
  MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
           MPI_STATUS_IGNORE);

  // decrement count of work_package by BUFFER_OFFSET
  count -= BUFFER_OFFSET;
  // check for changes on all additional variables given by the 'header' of
  // mpi_buffer

  // work_package_size
  if (mpi_buffer[count] != local_work_package_size) { // work_package_size
    local_work_package_size = (int)mpi_buffer[count];
    R["work_package_size"] = local_work_package_size;
    R.parseEvalQ("mysetup$work_package_size <- work_package_size");
  }

  // current iteration of simulation
  if (mpi_buffer[count + 1] != iteration) {
    iteration = (int)mpi_buffer[count + 1];
    R["iter"] = iteration;
    R.parseEvalQ("mysetup$iter <- iter");
  }

  // current timestep size
  if (mpi_buffer[count + 2] != dt) {
    dt = mpi_buffer[count + 2];
    R["dt"] = dt;
    R.parseEvalQ("mysetup$dt <- dt");
  }

  // current simulation time ('age' of simulation)
  if (mpi_buffer[count + 3] != current_sim_time) {
    current_sim_time = mpi_buffer[count + 3];
    R["simulation_time"] = current_sim_time;
    R.parseEvalQ("mysetup$simulation_time <- simulation_time");
  }
  /* 4th double value is currently a placeholder */
  // if (mpi_buffer[count+4] != placeholder) {
  //   placeholder = mpi_buffer[count+4];
  //   R["mysetup$placeholder"] = placeholder;
  // }

  /* get df with right structure to fill in work package */
  // R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
  // R["skeleton"] = grid.buildDataFrame(work_package_size);
  //// R.parseEval("print(rownames(tmp2)[1:5])");
  //// R.parseEval("print(head(tmp2, 2))");
  //// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");

  //// Rcpp::DataFrame buffer = R.parseEval("tmp2");
  // R.setBufferDataFrame("skeleton");

  if (dht_enabled) {
    // DEBUG
    // cout << "RANK " << world_rank << " start checking DHT\n";

    // resize helper vector dht_flags if work_package_size changes
    if ((int)dht_flags.size() != local_work_package_size) {
      dht_flags.resize(local_work_package_size, true); // set size
      dht_flags.assign(local_work_package_size,
                       true); // assign all elements to true (default)
    }

    dht_get_start = MPI_Wtime();
    dht->checkDHT(local_work_package_size, dht_flags, mpi_buffer, dt);
    dht_get_end = MPI_Wtime();

    // DEBUG
    // cout << "RANK " << world_rank << " checking DHT complete \n";

    R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
    // R.parseEvalQ("print(head(dht_flags))");
  }

  /* work */
  // R.from_C_domain(mpi_buffer);
  //// convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
  // R["work_package_full"] = R.getBufferDataFrame();
  // R["work_package"] = buffer;

  // DEBUG
  // R.parseEvalQ("print(head(work_package_full))");
  // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");

  grid.importWP(mpi_buffer, params->wp_size);

  if (params->dht_enabled) {
    R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
  } else {
    R.parseEvalQ("work_package <- work_package_full");
  }

  // DEBUG
  // R.parseEvalQ("print(head(work_package),2)");

  // R.parseEvalQ("rownames(work_package) <- work_package$id");
  // R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
  // colnames(work_package)"); R.parseEvalQ("id_store <-
  // rownames(work_package)"); //"[, ncol(work_package)]");
  // R.parseEvalQ("work_package$id <- NULL");
  R.parseEvalQ("work_package <- as.matrix(work_package)");

  unsigned int nrows = R.parseEval("nrow(work_package)");

  if (nrows > 0) {
    /* single-line error workaround */
    if (nrows <= 1) {
      // duplicate the line to enable a correct simulation
      R.parseEvalQ(
          "work_package <- work_package[rep(1:nrow(work_package), "
          "times = 2), ]");
    }

    phreeqc_count++;

    phreeqc_time_start = MPI_Wtime();
    // MDL
    // R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
    // 2)); cat('RCpp: worker_function:', local_rank, ' \n')");
    R.parseEvalQ(
        "result <- as.data.frame(slave_chemistry(setup=mysetup, "
        "data = work_package))");
    phreeqc_time_end = MPI_Wtime();
    // R.parseEvalQ("result$id <- id_store");
  } else {
    // cout << "Work-Package is empty, skipping phreeqc!" << endl;
  }

  if (dht_enabled) {
    R.parseEvalQ("result_full <- work_package_full");
    if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result");
  } else {
    R.parseEvalQ("result_full <- result");
  }

  // R.setBufferDataFrame("result_full");
  //// Rcpp::DataFrame result = R.parseEval("result_full");
  //// convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
  // R.to_C_domain(mpi_buffer_results);

  grid.exportWP(mpi_buffer_results);
  /* send results to master */
  MPI_Request send_req;
  MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
            &send_req);

  if (dht_enabled) {
    dht_fill_start = MPI_Wtime();
    dht->fillDHT(local_work_package_size, dht_flags, mpi_buffer,
                 mpi_buffer_results, dt);
    dht_fill_end = MPI_Wtime();

    timing[1] += dht_get_end - dht_get_start;
    timing[2] += dht_fill_end - dht_fill_start;
  }

  timing[0] += phreeqc_time_end - phreeqc_time_start;

  MPI_Wait(&send_req, MPI_STATUS_IGNORE);
}
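
// doWork() pipeline in short: receive a package, decode the trailing header,
// mark DHT hits in dht_flags, run slave_chemistry() only on the misses,
// merge results back into the full package, stream it to the master with a
// non-blocking send, and overlap that send with the DHT fill.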

void ChemWorker::postIter() {
  MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_ITER, MPI_COMM_WORLD,
           MPI_STATUS_IGNORE);
  if (dht_enabled) {
    dht->printStatistics();

    if (dht_snaps == 2) {
      writeFile();
    }
  }
  // synchronize all processes
  MPI_Barrier(MPI_COMM_WORLD);
}

void ChemWorker::writeFile() {
  std::stringstream out;
  out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
  int res = dht->tableToFile(out.str().c_str());
  if (res != DHT_SUCCESS && world_rank == 2)
    cerr << "CPP: Worker: Error in writing current state of DHT to file."
         << endl;
  else if (world_rank == 2)
    cout << "CPP: Worker: Successfully wrote DHT to file " << out.str()
         << endl;
}

void ChemWorker::readFile() {
  int res = dht->fileToTable((char *)dht_file.c_str());
  if (res != DHT_SUCCESS) {
    if (res == DHT_WRONG_FILE) {
      if (world_rank == 1)
        cerr << "CPP: Worker: Wrong file layout! Continuing with empty DHT ..."
             << endl;
    } else {
      if (world_rank == 1)
        cerr << "CPP: Worker: Error in loading current state of DHT from "
                "file. Continuing with empty DHT ..."
             << endl;
    }
  } else {
    if (world_rank == 1)
      cout << "CPP: Worker: Successfully loaded state of DHT from file "
           << dht_file << endl;
    std::cout.flush();
  }
}

void ChemWorker::finishWork() {
  if (dht_enabled && dht_snaps > 0) writeFile();

  // the counters are transmitted as MPI_UNSIGNED_LONG_LONG below, so the
  // buffer type must match the MPI datatype
  unsigned long long dht_perf[3];
  /* before death, submit profiling/timings to master */
  MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
           MPI_STATUS_IGNORE);

  // timings
  MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);

  MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
  MPI_Send(&idle_t, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);

  if (dht_enabled) {
    // dht_perf
    dht_perf[0] = dht->getHits();
    dht_perf[1] = dht->getMisses();
    dht_perf[2] = dht->getEvictions();
    MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
             MPI_COMM_WORLD);
  }

  free(mpi_buffer);
  free(mpi_buffer_results);
  delete dht;
}
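
// Shutdown protocol implied above: on TAG_FINISH the worker optionally
// snapshots its DHT partition, then reports timing[3], phreeqc_count,
// idle_t and (if enabled) the hit/miss/eviction counters back to rank 0
// before releasing its buffers.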