mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
Cleanup old source files
This commit is contained in:
parent
6905154f5e
commit
19b4b98b44
423
src/DHT.cpp
423
src/DHT.cpp
@ -1,423 +0,0 @@
|
||||
#include "DHT.h"
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
#include <stdio.h>
|
||||
|
||||
/*
|
||||
* determining destination rank and index by hash of key
|
||||
*
|
||||
* return values by reference
|
||||
*/
|
||||
static void determine_dest(uint64_t hash, int comm_size, unsigned int table_size, unsigned int *dest_rank, unsigned int *index, unsigned int index_count) {
|
||||
uint64_t tmp;
|
||||
int char_hop = 9-index_count;
|
||||
unsigned int i;
|
||||
for (i = 0; i < index_count ; i++) {
|
||||
tmp = 0;
|
||||
memcpy(&tmp,(unsigned char *)&hash+i, char_hop);
|
||||
index[i] = (unsigned int) (tmp % table_size);
|
||||
}
|
||||
*dest_rank = (unsigned int) (hash % comm_size);
|
||||
}
|
||||
|
||||
/**
|
||||
* set write flag to 1
|
||||
*/
|
||||
static void set_flag(char* flag_byte) {
|
||||
*flag_byte = 0;
|
||||
*flag_byte |= (1 << 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* return 1 if write flag is set
|
||||
* else 0
|
||||
*/
|
||||
static int read_flag(char flag_byte) {
|
||||
if ((flag_byte & 0x01) == 0x01) {
|
||||
return 1;
|
||||
} else return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* allocating memory for DHT object and buckets.
|
||||
* creating MPI window for OSC
|
||||
* filling DHT object with passed by parameters, window, 2 counters for R/W errors and 2 pointers with allocated memory for further use
|
||||
* return DHT object
|
||||
*/
|
||||
DHT* DHT_create(MPI_Comm comm, unsigned int size, int data_size, int key_size, uint64_t(*hash_func) (int, void*)) {
|
||||
DHT *object;
|
||||
MPI_Win window;
|
||||
void* mem_alloc;
|
||||
int comm_size, tmp;
|
||||
|
||||
tmp = (int) ceil(log2(size));
|
||||
if (tmp%8 != 0) tmp = tmp + (8-(tmp%8));
|
||||
|
||||
object = (DHT*) malloc(sizeof(DHT));
|
||||
if (object == NULL) return NULL;
|
||||
|
||||
//every memory allocation has 1 additional byte for flags etc.
|
||||
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, &mem_alloc) != 0) return NULL;
|
||||
if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;
|
||||
|
||||
memset(mem_alloc, '\0', size * (1 + data_size + key_size));
|
||||
|
||||
if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), (1 + data_size + key_size), MPI_INFO_NULL, comm, &window) != 0) return NULL;
|
||||
|
||||
object->data_size = data_size;
|
||||
object->key_size = key_size;
|
||||
object->table_size = size;
|
||||
object->window = window;
|
||||
object->hash_func = hash_func;
|
||||
object->comm_size = comm_size;
|
||||
object->communicator = comm;
|
||||
object->read_misses = 0;
|
||||
object->collisions = 0;
|
||||
object->recv_entry = malloc(1 + data_size + key_size);
|
||||
object->send_entry = malloc(1 + data_size + key_size);
|
||||
object->index_count = 9-(tmp/8);
|
||||
object->index = (unsigned int*) malloc((9-(tmp/8))*sizeof(int));
|
||||
object->mem_alloc = mem_alloc;
|
||||
|
||||
DHT_stats *stats;
|
||||
|
||||
stats = (DHT_stats*) malloc(sizeof(DHT_stats));
|
||||
if (stats == NULL) return NULL;
|
||||
|
||||
object->stats = stats;
|
||||
object->stats->writes_local = (int*) calloc(comm_size, sizeof(int));
|
||||
object->stats->old_writes = 0;
|
||||
object->stats->read_misses = 0;
|
||||
object->stats->collisions = 0;
|
||||
object->stats->w_access = 0;
|
||||
object->stats->r_access = 0;
|
||||
|
||||
return object;
|
||||
}
|
||||
|
||||
/*
|
||||
* puts passed by data with key to DHT
|
||||
*
|
||||
* returning DHT_MPI_ERROR = -1 if MPI error occurred
|
||||
* else DHT_SUCCESS = 0 if success
|
||||
*/
|
||||
int DHT_write(DHT *table, void* send_key, void* send_data) {
|
||||
unsigned int dest_rank, i;
|
||||
int result = DHT_SUCCESS;
|
||||
|
||||
table->stats->w_access++;
|
||||
|
||||
//determine destination rank and index by hash of key
|
||||
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
|
||||
|
||||
//concatenating key with data to write entry to DHT
|
||||
set_flag((char *) table->send_entry);
|
||||
memcpy((char *) table->send_entry + 1, (char *) send_key, table->key_size);
|
||||
memcpy((char *) table->send_entry + table->key_size + 1, (char *) send_data, table->data_size);
|
||||
|
||||
//locking window of target rank with exclusive lock
|
||||
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
for (i = 0; i < table->index_count; i++)
|
||||
{
|
||||
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
|
||||
//increment collision counter if receiving key doesn't match sending key
|
||||
//,entry has write flag + last index is reached
|
||||
|
||||
if (read_flag(*(char *)table->recv_entry)) {
|
||||
if (memcmp(send_key, (char *) table->recv_entry + 1, table->key_size) != 0) {
|
||||
if (i == (table->index_count)-1) {
|
||||
table->collisions += 1;
|
||||
table->stats->collisions += 1;
|
||||
result = DHT_WRITE_SUCCESS_WITH_COLLISION;
|
||||
break;
|
||||
}
|
||||
} else break;
|
||||
} else {
|
||||
table->stats->writes_local[dest_rank]++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//put data to DHT
|
||||
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
|
||||
//unlock window of target rank
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* gets data from the DHT by key
|
||||
*
|
||||
* return DHT_SUCCESS = 0 if success
|
||||
* DHT_MPI_ERROR = -1 if MPI error occurred
|
||||
* DHT_READ_ERROR = -2 if receiving key doesn't match sending key
|
||||
*/
|
||||
int DHT_read(DHT *table, void* send_key, void* destination) {
|
||||
unsigned int dest_rank, i;
|
||||
|
||||
table->stats->r_access++;
|
||||
|
||||
//determine destination rank and index by hash of key
|
||||
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, table->table_size, &dest_rank, table->index, table->index_count);
|
||||
|
||||
//locking window of target rank with shared lock
|
||||
if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) return DHT_MPI_ERROR;
|
||||
//receive data
|
||||
for (i = 0; i < table->index_count; i++) {
|
||||
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, MPI_BYTE, dest_rank, table->index[i], 1 + table->data_size + table->key_size, MPI_BYTE, table->window) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
|
||||
//increment read error counter if write flag isn't set or key doesn't match passed by key + last index reached
|
||||
//else copy data to dereference of passed by destination pointer
|
||||
|
||||
if ((read_flag(*(char *) table->recv_entry)) == 0) {
|
||||
table->read_misses += 1;
|
||||
table->stats->read_misses += 1;
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
return DHT_READ_ERROR;
|
||||
}
|
||||
|
||||
if (memcmp(((char*)table->recv_entry) + 1, send_key, table->key_size) != 0) {
|
||||
if (i == (table->index_count)-1) {
|
||||
table->read_misses += 1;
|
||||
table->stats->read_misses += 1;
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
return DHT_READ_ERROR;
|
||||
}
|
||||
} else break;
|
||||
}
|
||||
|
||||
//unlock window of target rank
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
|
||||
|
||||
memcpy((char *) destination, (char *) table->recv_entry + table->key_size + 1, table->data_size);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_to_file(DHT* table, char* filename) {
|
||||
//open file
|
||||
MPI_File file;
|
||||
if (MPI_File_open(table->communicator, filename, MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
int rank;
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
//write header (key_size and data_size)
|
||||
if (rank == 0) {
|
||||
if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
|
||||
if (MPI_File_write(file, &table->data_size, 1, MPI_INT, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
|
||||
}
|
||||
|
||||
if (MPI_File_seek_shared(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
char* ptr;
|
||||
int bucket_size = table->key_size + table->data_size + 1;
|
||||
|
||||
//iterate over local memory
|
||||
for (unsigned int i = 0; i < table->table_size; i++) {
|
||||
ptr = (char *) table->mem_alloc + (i * bucket_size);
|
||||
//if bucket has been written to (checked by written_flag)...
|
||||
if (read_flag(*ptr)) {
|
||||
//write key and data to file
|
||||
if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_WRITE_ERROR;
|
||||
}
|
||||
}
|
||||
//close file
|
||||
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_from_file(DHT* table, char* filename) {
|
||||
MPI_File file;
|
||||
MPI_Offset f_size;
|
||||
int e_size, m_size, cur_pos, rank, offset;
|
||||
char* buffer;
|
||||
void* key;
|
||||
void* data;
|
||||
|
||||
if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &file) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
e_size = table->key_size + table->data_size;
|
||||
m_size = e_size > DHT_HEADER_SIZE ? e_size : DHT_HEADER_SIZE;
|
||||
buffer = (char *) malloc(m_size);
|
||||
|
||||
if (MPI_File_read(file, buffer, DHT_HEADER_SIZE, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
|
||||
|
||||
if (*(int *) buffer != table->key_size) return DHT_WRONG_FILE;
|
||||
if (*(int *) (buffer + 4) != table->data_size) return DHT_WRONG_FILE;
|
||||
|
||||
offset = e_size*table->comm_size;
|
||||
|
||||
if (MPI_File_seek(file, DHT_HEADER_SIZE, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
|
||||
cur_pos = DHT_HEADER_SIZE + (rank * e_size);
|
||||
|
||||
while(cur_pos < f_size) {
|
||||
if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) return DHT_FILE_IO_ERROR;
|
||||
MPI_Offset tmp;
|
||||
MPI_File_get_position(file, &tmp);
|
||||
if (MPI_File_read(file, buffer, e_size, MPI_BYTE, MPI_STATUS_IGNORE) != 0) return DHT_FILE_READ_ERROR;
|
||||
key = buffer;
|
||||
data = (buffer+table->key_size);
|
||||
if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;
|
||||
|
||||
cur_pos += offset;
|
||||
}
|
||||
|
||||
free (buffer);
|
||||
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* frees up memory and accumulate counter
|
||||
*/
|
||||
int DHT_free(DHT* table, int* collision_counter, int* readerror_counter) {
|
||||
int buf;
|
||||
|
||||
if (collision_counter != NULL) {
|
||||
buf = 0;
|
||||
if (MPI_Reduce(&table->collisions, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
*collision_counter = buf;
|
||||
}
|
||||
if (readerror_counter != NULL) {
|
||||
buf = 0;
|
||||
if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
*readerror_counter = buf;
|
||||
}
|
||||
if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
|
||||
free(table->recv_entry);
|
||||
free(table->send_entry);
|
||||
free(table->index);
|
||||
|
||||
free(table->stats->writes_local);
|
||||
free(table->stats);
|
||||
|
||||
free(table);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* prints a table with statistics about current use of DHT
|
||||
* for each participating process and summed up results containing:
|
||||
* 1. occupied buckets (in respect to the memory of this process)
|
||||
* 2. free buckets (in respect to the memory of this process)
|
||||
* 3. calls of DHT_write (w_access)
|
||||
* 4. calls of DHT_read (r_access)
|
||||
* 5. read misses (see DHT_READ_ERROR)
|
||||
* 6. collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
|
||||
* 3-6 will reset with every call of this function
|
||||
* finally the amount of new written entries is printed out (in relation to last call of this funtion)
|
||||
*/
|
||||
int DHT_print_statistics(DHT *table) {
|
||||
int *written_buckets;
|
||||
int *read_misses, sum_read_misses;
|
||||
int *collisions, sum_collisions;
|
||||
int sum_w_access, sum_r_access, *w_access, *r_access;
|
||||
int rank;
|
||||
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
//disable possible warning of unitialized variable, which is not the case
|
||||
//since we're using MPI_Gather to obtain all values only on rank 0
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
||||
|
||||
//obtaining all values from all processes in the communicator
|
||||
if (rank == 0) read_misses = (int*) malloc(table->comm_size*sizeof(int));
|
||||
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
table->stats->read_misses = 0;
|
||||
|
||||
if (rank == 0) collisions = (int*) malloc(table->comm_size*sizeof(int));
|
||||
if (MPI_Gather(&table->stats->collisions, 1, MPI_INT, collisions, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->collisions, &sum_collisions, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
table->stats->collisions = 0;
|
||||
|
||||
if (rank == 0) w_access = (int*) malloc(table->comm_size*sizeof(int));
|
||||
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
table->stats->w_access = 0;
|
||||
|
||||
if (rank == 0) r_access = (int*) malloc(table->comm_size*sizeof(int));
|
||||
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
table->stats->r_access = 0;
|
||||
|
||||
if (rank == 0) written_buckets = (int*) calloc(table->comm_size, sizeof(int));
|
||||
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, MPI_INT, MPI_SUM, 0, table->communicator) != 0) return DHT_MPI_ERROR;
|
||||
|
||||
if (rank == 0) { //only process with rank 0 will print out results as a table
|
||||
int sum_written_buckets = 0;
|
||||
|
||||
for (int i=0; i < table->comm_size; i++) {
|
||||
sum_written_buckets += written_buckets[i];
|
||||
}
|
||||
|
||||
int members = 7;
|
||||
int padsize = (members*12)+1;
|
||||
char pad[padsize+1];
|
||||
|
||||
memset(pad, '-', padsize*sizeof(char));
|
||||
pad[padsize]= '\0';
|
||||
printf("\n");
|
||||
printf("%-35s||resets with every call of this function\n", " ");
|
||||
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n",
|
||||
"rank",
|
||||
"occupied",
|
||||
"free",
|
||||
"w_access",
|
||||
"r_access",
|
||||
"read misses",
|
||||
"collisions");
|
||||
printf("%s\n", pad);
|
||||
for (int i = 0; i < table->comm_size; i++) {
|
||||
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
|
||||
i,
|
||||
written_buckets[i],
|
||||
table->table_size-written_buckets[i],
|
||||
w_access[i],
|
||||
r_access[i],
|
||||
read_misses[i],
|
||||
collisions[i]);
|
||||
}
|
||||
printf("%s\n", pad);
|
||||
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n",
|
||||
"sum",
|
||||
sum_written_buckets,
|
||||
(table->table_size*table->comm_size)-sum_written_buckets,
|
||||
sum_w_access,
|
||||
sum_r_access,
|
||||
sum_read_misses,
|
||||
sum_collisions);
|
||||
|
||||
printf("%s\n", pad);
|
||||
printf("%s %d\n",
|
||||
"new entries:",
|
||||
sum_written_buckets - table->stats->old_writes);
|
||||
|
||||
printf("\n");
|
||||
fflush(stdout);
|
||||
|
||||
table->stats->old_writes = sum_written_buckets;
|
||||
}
|
||||
|
||||
//enable warning again
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
MPI_Barrier(table->communicator);
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
112
src/DHT.h
112
src/DHT.h
@ -1,112 +0,0 @@
|
||||
/*
|
||||
* File: DHT.h
|
||||
* Author: max luebke
|
||||
*
|
||||
* Created on 16. November 2017, 09:14
|
||||
*/
|
||||
|
||||
#ifndef DHT_H
|
||||
#define DHT_H
|
||||
|
||||
#include <mpi.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#define DHT_MPI_ERROR -1
|
||||
#define DHT_READ_ERROR -2
|
||||
#define DHT_SUCCESS 0
|
||||
#define DHT_WRITE_SUCCESS_WITH_COLLISION 1
|
||||
|
||||
#define DHT_WRONG_FILE 11
|
||||
#define DHT_FILE_IO_ERROR 12
|
||||
#define DHT_FILE_READ_ERROR 13
|
||||
#define DHT_FILE_WRITE_ERROR 14
|
||||
|
||||
#define DHT_HEADER_SIZE 8
|
||||
|
||||
typedef struct {;
|
||||
int *writes_local, old_writes;
|
||||
int read_misses, collisions;
|
||||
int w_access, r_access;
|
||||
} DHT_stats;
|
||||
|
||||
typedef struct {
|
||||
MPI_Win window;
|
||||
int data_size;
|
||||
int key_size;
|
||||
unsigned int table_size;
|
||||
MPI_Comm communicator;
|
||||
int comm_size;
|
||||
uint64_t(*hash_func) (int, void*);
|
||||
void* recv_entry;
|
||||
void* send_entry;
|
||||
void* mem_alloc;
|
||||
int read_misses;
|
||||
int collisions;
|
||||
unsigned int *index;
|
||||
unsigned int index_count;
|
||||
DHT_stats *stats;
|
||||
} DHT;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* parameters:
|
||||
* MPI_Comm comm - communicator of processes that are holding the DHT
|
||||
* int size_per_process - number of buckets each process will create
|
||||
* int data_size - size of data in bytes
|
||||
* int key_size - size of key in bytes
|
||||
* *hash_func - pointer to hashfunction
|
||||
*
|
||||
* return:
|
||||
* NULL if error during initialization
|
||||
* DHT* if success
|
||||
*/
|
||||
extern DHT* DHT_create(MPI_Comm comm, unsigned int size_per_process, int data_size, int key_size, uint64_t(*hash_func)(int, void*));
|
||||
|
||||
/*
|
||||
* parameters:
|
||||
* DHT *table - DHT_object created by DHT_create
|
||||
* void* data - pointer to data
|
||||
* void* - pointer to key
|
||||
*
|
||||
* return:
|
||||
* error value (see above)
|
||||
*/
|
||||
extern int DHT_write(DHT *table, void* key, void* data);
|
||||
|
||||
/*
|
||||
* parameters:
|
||||
* DHT *table - DHT_object created by DHT_create
|
||||
* void* key - pointer to key
|
||||
* void* destination - pointer which will hold the resulting data from DHT
|
||||
*
|
||||
* return:
|
||||
* error value (see above)
|
||||
*/
|
||||
extern int DHT_read(DHT *table, void* key, void* destination);
|
||||
|
||||
extern int DHT_to_file(DHT *table, char* filename);
|
||||
|
||||
extern int DHT_from_file(DHT *table, char* filename);
|
||||
|
||||
/*
|
||||
* parameters:
|
||||
* DHT *table - DHT_object created by DHT_create
|
||||
* int* collision_counter - pointer which will hold the total count of collisions
|
||||
* int* readerror_counter - pointer which will hold the total count of read errors
|
||||
*
|
||||
* return:
|
||||
* error value (see above)
|
||||
*/
|
||||
extern int DHT_free(DHT *table, int* collision_counter, int* readerror_counter);
|
||||
|
||||
/*
|
||||
* parameters:
|
||||
* DHT *table - DHT_object created by DHT_create
|
||||
*
|
||||
* return:
|
||||
* error value (DHT_SUCCESS or DHT_MPI_ERROR)
|
||||
*/
|
||||
extern int DHT_print_statistics(DHT *table);
|
||||
|
||||
#endif /* DHT_H */
|
||||
@ -1,176 +0,0 @@
|
||||
#include "dht_wrapper.h"
|
||||
#include <openssl/md5.h>
|
||||
|
||||
using namespace poet;
|
||||
|
||||
/*init globals*/
|
||||
//bool dht_enabled;
|
||||
//int dht_snaps;
|
||||
//int dht_strategy;
|
||||
//int dht_significant_digits;
|
||||
//std::string dht_file;
|
||||
//std::vector<int> dht_significant_digits_vector;
|
||||
//std::vector<string> prop_type_vector;
|
||||
//bool dht_logarithm;
|
||||
//uint64_t dht_size_per_process;
|
||||
uint64_t dht_hits, dht_miss, dht_collision;
|
||||
RInside *R_DHT;
|
||||
std::vector<bool> dht_flags;
|
||||
DHT *dht_object;
|
||||
|
||||
double *fuzzing_buffer;
|
||||
|
||||
//bool dt_differ;
|
||||
/*functions*/
|
||||
|
||||
uint64_t get_md5(int key_size, void *key) {
|
||||
MD5_CTX ctx;
|
||||
unsigned char sum[MD5_DIGEST_LENGTH];
|
||||
uint64_t retval, *v1, *v2;
|
||||
|
||||
MD5_Init(&ctx);
|
||||
MD5_Update(&ctx, key, key_size);
|
||||
MD5_Final(sum, &ctx);
|
||||
|
||||
v1 = (uint64_t *)&sum[0];
|
||||
v2 = (uint64_t *)&sum[8];
|
||||
retval = *v1 ^ *v2;
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
// double Round_off(double N, double n) {
|
||||
// double result;
|
||||
// R["roundsig"] = n;
|
||||
// R["roundin"] = N;
|
||||
//
|
||||
// result = R.parseEval("signif(roundin, digits=roundsig)");
|
||||
//
|
||||
// return result;
|
||||
//}
|
||||
|
||||
/*
|
||||
* Stores fuzzed version of key in fuzzing_buffer
|
||||
*/
|
||||
void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params) {
|
||||
unsigned int i = 0;
|
||||
// introduce fuzzing to allow more hits in DHT
|
||||
for (i = 0; i < (unsigned int)var_count; i++) {
|
||||
if (params->dht_prop_type_vector[i] == "act") {
|
||||
// with log10
|
||||
if (params->dht_log) {
|
||||
if (((double *)key)[i] < 0)
|
||||
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in "
|
||||
"key!"
|
||||
<< endl;
|
||||
else if (((double *)key)[i] == 0)
|
||||
fuzzing_buffer[i] = 0;
|
||||
else
|
||||
fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])),
|
||||
params->dht_signif_vector[i]);
|
||||
} else {
|
||||
// without log10
|
||||
fuzzing_buffer[i] =
|
||||
ROUND((((double *)key)[i]), params->dht_signif_vector[i]);
|
||||
}
|
||||
} else if (params->dht_prop_type_vector[i] == "logact") {
|
||||
fuzzing_buffer[i] =
|
||||
ROUND((((double *)key)[i]), params->dht_signif_vector[i]);
|
||||
} else if (params->dht_prop_type_vector[i] == "ignore") {
|
||||
fuzzing_buffer[i] = 0;
|
||||
} else {
|
||||
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong "
|
||||
"prop_type!"
|
||||
<< endl;
|
||||
}
|
||||
}
|
||||
if (params->dt_differ)
|
||||
fuzzing_buffer[var_count] = dt;
|
||||
}
|
||||
|
||||
void check_dht(int length, std::vector<bool> &out_result_index,
|
||||
double *work_package, double dt, t_simparams *params) {
|
||||
void *key;
|
||||
int res;
|
||||
int var_count = params->dht_prop_type_vector.size();
|
||||
for (int i = 0; i < length; i++) {
|
||||
key = (void *)&(work_package[i * var_count]);
|
||||
|
||||
// fuzz data (round, logarithm etc.)
|
||||
fuzz_for_dht(var_count, key, dt, params);
|
||||
|
||||
// overwrite input with data from DHT, IF value is found in DHT
|
||||
res = DHT_read(dht_object, fuzzing_buffer, key);
|
||||
|
||||
if (res == DHT_SUCCESS) {
|
||||
// flag that this line is replaced by DHT-value, do not simulate!!
|
||||
out_result_index[i] = false;
|
||||
dht_hits++;
|
||||
} else if (res == DHT_READ_ERROR) {
|
||||
// this line is untouched, simulation is needed
|
||||
out_result_index[i] = true;
|
||||
dht_miss++;
|
||||
} else {
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
// RUNNING CIRCLES WHILE SCREAMING
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fill_dht(int length, std::vector<bool> &result_index, double *work_package,
|
||||
double *results, double dt, t_simparams *params) {
|
||||
void *key;
|
||||
void *data;
|
||||
int res;
|
||||
int var_count = params->dht_prop_type_vector.size();
|
||||
|
||||
for (int i = 0; i < length; i++) {
|
||||
key = (void *)&(work_package[i * var_count]);
|
||||
data = (void *)&(results[i * var_count]);
|
||||
|
||||
if (result_index[i]) {
|
||||
// If true -> was simulated, needs to be inserted into dht
|
||||
|
||||
// fuzz data (round, logarithm etc.)
|
||||
fuzz_for_dht(var_count, key, dt, params);
|
||||
|
||||
res = DHT_write(dht_object, fuzzing_buffer, data);
|
||||
|
||||
if (res != DHT_SUCCESS) {
|
||||
if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) {
|
||||
dht_collision++;
|
||||
} else {
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
// RUNNING CIRCLES WHILE SCREAMING
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void print_statistics() {
|
||||
int res;
|
||||
|
||||
res = DHT_print_statistics(dht_object);
|
||||
|
||||
if (res != DHT_SUCCESS) {
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
// RUNNING CIRCLES WHILE SCREAMING
|
||||
}
|
||||
}
|
||||
|
||||
int table_to_file(char *filename) {
|
||||
int res = DHT_to_file(dht_object, filename);
|
||||
return res;
|
||||
}
|
||||
|
||||
int file_to_table(char *filename) {
|
||||
|
||||
int res = DHT_from_file(dht_object, filename);
|
||||
if (res != DHT_SUCCESS)
|
||||
return res;
|
||||
|
||||
DHT_print_statistics(dht_object);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
@ -1,55 +0,0 @@
|
||||
#pragma once
|
||||
#include "DHT.h"
|
||||
#include "util/RRuntime.h"
|
||||
#include "util/SimParams.h"
|
||||
#include <math.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
using namespace std;
|
||||
using namespace poet;
|
||||
|
||||
/*Functions*/
|
||||
uint64_t get_md5(int key_size, void *key);
|
||||
void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params);
|
||||
void check_dht(int length, std::vector<bool> &out_result_index,
|
||||
double *work_package, double dt, t_simparams *params);
|
||||
void fill_dht(int length, std::vector<bool> &result_index, double *work_package,
|
||||
double *results, double dt, t_simparams *params);
|
||||
void print_statistics();
|
||||
int table_to_file(char *filename);
|
||||
int file_to_table(char *filename);
|
||||
|
||||
/*globals*/
|
||||
//extern bool dht_enabled;
|
||||
//extern int dht_snaps;
|
||||
//extern std::string dht_file;
|
||||
//extern bool dt_differ;
|
||||
|
||||
// Default DHT Size per process in Byte (defaults to 1 GiB)
|
||||
#define DHT_SIZE_PER_PROCESS 1073741824
|
||||
|
||||
// sets default dht access and distribution strategy
|
||||
#define DHT_STRATEGY 0
|
||||
// 0 -> DHT is on workers, access from workers only
|
||||
// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED
|
||||
// YET!
|
||||
|
||||
#define ROUND(value, signif) \
|
||||
(((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))
|
||||
|
||||
//extern int dht_strategy;
|
||||
//extern int dht_significant_digits;
|
||||
//extern std::vector<int> dht_significant_digits_vector;
|
||||
//extern std::vector<string> prop_type_vector;
|
||||
//extern bool dht_logarithm;
|
||||
//extern uint64_t dht_size_per_process;
|
||||
|
||||
// global DHT object, can be NULL if not initialized, check strategy
|
||||
extern DHT *dht_object;
|
||||
|
||||
// DHT Performance counter
|
||||
extern uint64_t dht_hits, dht_miss, dht_collision;
|
||||
|
||||
extern double *fuzzing_buffer;
|
||||
extern std::vector<bool> dht_flags;
|
||||
@ -1,9 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#define BUFFER_OFFSET 5
|
||||
|
||||
/*Globals*/
|
||||
extern double* mpi_buffer;
|
||||
extern double* mpi_buffer_results;
|
||||
|
||||
extern uint32_t work_package_size;
|
||||
@ -1,44 +0,0 @@
|
||||
#include "r_utils.h"
|
||||
|
||||
/* This function converts a pure double dataframe into a double array.
|
||||
buffer <- double array, needs to be allocated before
|
||||
df <- reference to input dataframe
|
||||
*/
|
||||
void convert_R_Dataframe_2_C_buffer(double* buffer, Rcpp::DataFrame &df)
|
||||
{
|
||||
size_t rowCount = df.nrow();
|
||||
size_t colCount = df.ncol();
|
||||
|
||||
for (size_t i = 0; i < rowCount; i++)
|
||||
{
|
||||
for (size_t j = 0; j < colCount; j++)
|
||||
{
|
||||
/* Access column vector j and extract value of line i */
|
||||
Rcpp::DoubleVector col = df[j];
|
||||
buffer[i * colCount + j] = col[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* This function converts a double array into a double dataframe.
|
||||
buffer <- input double array
|
||||
df <- reference to output dataframe, needs to be of fitting size, structure will be taken from it
|
||||
*/
|
||||
void convert_C_buffer_2_R_Dataframe(double* buffer, Rcpp::DataFrame &df)
|
||||
{
|
||||
size_t rowCount = df.nrow();
|
||||
size_t colCount = df.ncol();
|
||||
|
||||
for (size_t i = 0; i < rowCount; i++)
|
||||
{
|
||||
for (size_t j = 0; j < colCount; j++)
|
||||
{
|
||||
/* Access column vector j and extract value of line i */
|
||||
Rcpp::DoubleVector col = df[j];
|
||||
col[i] = buffer[i * colCount + j];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -1,6 +0,0 @@
|
||||
#pragma once
|
||||
#include <RInside.h>
|
||||
|
||||
/*Functions*/
|
||||
void convert_R_Dataframe_2_C_buffer(double* buffer, Rcpp::DataFrame &df);
|
||||
void convert_C_buffer_2_R_Dataframe(double* buffer, Rcpp::DataFrame &df);
|
||||
301
src/worker.cpp
301
src/worker.cpp
@ -1,301 +0,0 @@
|
||||
#include "worker.h"
|
||||
#include "dht_wrapper.h"
|
||||
#include "global_buffer.h"
|
||||
#include "model/Grid.h"
|
||||
#include "util/RRuntime.h"
|
||||
#include <Rcpp.h>
|
||||
#include <iostream>
|
||||
#include <mpi.h>
|
||||
|
||||
using namespace poet;
|
||||
using namespace Rcpp;
|
||||
|
||||
void worker_function(t_simparams *params) {
|
||||
RRuntime R = *(static_cast<RRuntime *>(params->R));
|
||||
Grid grid = *(static_cast<Grid *>(params->grid));
|
||||
int world_rank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
||||
MPI_Status probe_status;
|
||||
int count;
|
||||
|
||||
int local_work_package_size;
|
||||
int iteration;
|
||||
double dt, current_sim_time;
|
||||
|
||||
double idle_a, idle_b;
|
||||
double cummul_idle = 0.f;
|
||||
|
||||
double dht_get_start = 0, dht_get_end = 0;
|
||||
double dht_fill_start = 0, dht_fill_end = 0;
|
||||
double phreeqc_time_start = 0, phreeqc_time_end = 0;
|
||||
int phreeqc_count = 0;
|
||||
|
||||
// timing[0] -> phreeqc
|
||||
// timing[1] -> dht_get
|
||||
// timing[2] -> dht_fill
|
||||
double timing[3];
|
||||
timing[0] = 0.0;
|
||||
timing[1] = 0.0;
|
||||
timing[2] = 0.0;
|
||||
|
||||
// dht_perf[0] -> hits
|
||||
// dht_perf[1] -> miss
|
||||
// dht_perf[2] -> collisions
|
||||
uint64_t dht_perf[3];
|
||||
|
||||
if (params->dht_enabled) {
|
||||
dht_flags.resize(params->wp_size, true); // set size
|
||||
dht_flags.assign(params->wp_size,
|
||||
true); // assign all elements to true (default)
|
||||
dht_hits = 0;
|
||||
dht_miss = 0;
|
||||
dht_collision = 0;
|
||||
|
||||
// MDL: This code has now been moved to kin.cpp
|
||||
// /*Load significance vector from R setup file (or set default)*/
|
||||
// bool signif_vector_exists = R.parseEval("exists('signif_vector')");
|
||||
// if (signif_vector_exists)
|
||||
// {
|
||||
// dht_significant_digits_vector =
|
||||
// as<std::vector<int>>(R["signif_vector"]);
|
||||
// } else
|
||||
// {
|
||||
// dht_significant_digits_vector.assign(dht_object->key_size /
|
||||
// sizeof(double), dht_significant_digits);
|
||||
// }
|
||||
|
||||
// /*Load property type vector from R setup file (or set default)*/
|
||||
// bool prop_type_vector_exists = R.parseEval("exists('prop_type')");
|
||||
// if (prop_type_vector_exists)
|
||||
// {
|
||||
// prop_type_vector = as<std::vector<string>>(R["prop_type"]);
|
||||
// } else
|
||||
// {
|
||||
// prop_type_vector.assign(dht_object->key_size / sizeof(double),
|
||||
// "normal");
|
||||
// }
|
||||
}
|
||||
|
||||
// initialization of helper variables
|
||||
iteration = 0;
|
||||
dt = 0;
|
||||
current_sim_time = 0;
|
||||
local_work_package_size = 0;
|
||||
|
||||
/*worker loop*/
|
||||
while (1) {
|
||||
/*Wait for Message*/
|
||||
idle_a = MPI_Wtime();
|
||||
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
|
||||
idle_b = MPI_Wtime();
|
||||
|
||||
if (probe_status.MPI_TAG == TAG_WORK) { /* do work */
|
||||
|
||||
cummul_idle += idle_b - idle_a;
|
||||
|
||||
/* get number of doubles sent */
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
|
||||
|
||||
/* receive */
|
||||
MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
// decrement count of work_package by BUFFER_OFFSET
|
||||
count -= BUFFER_OFFSET;
|
||||
// check for changes on all additional variables given by the 'header' of
|
||||
// mpi_buffer
|
||||
if (mpi_buffer[count] != local_work_package_size) { // work_package_size
|
||||
local_work_package_size = mpi_buffer[count];
|
||||
R["work_package_size"] = local_work_package_size;
|
||||
R.parseEvalQ("mysetup$work_package_size <- work_package_size");
|
||||
}
|
||||
if (mpi_buffer[count + 1] !=
|
||||
iteration) { // current iteration of simulation
|
||||
iteration = mpi_buffer[count + 1];
|
||||
R["iter"] = iteration;
|
||||
R.parseEvalQ("mysetup$iter <- iter");
|
||||
}
|
||||
if (mpi_buffer[count + 2] != dt) { // current timestep size
|
||||
dt = mpi_buffer[count + 2];
|
||||
R["dt"] = dt;
|
||||
R.parseEvalQ("mysetup$dt <- dt");
|
||||
}
|
||||
if (mpi_buffer[count + 3] !=
|
||||
current_sim_time) { // current simulation time ('age' of simulation)
|
||||
current_sim_time = mpi_buffer[count + 3];
|
||||
R["simulation_time"] = current_sim_time;
|
||||
R.parseEvalQ("mysetup$simulation_time <- simulation_time");
|
||||
}
|
||||
/* 4th double value is currently a placeholder */
|
||||
// if (mpi_buffer[count+4] != placeholder) {
|
||||
// placeholder = mpi_buffer[count+4];
|
||||
// R["mysetup$placeholder"] = placeholder;
|
||||
// }
|
||||
|
||||
/* get df with right structure to fill in work package */
|
||||
// R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
|
||||
// R["skeleton"] = grid.buildDataFrame(work_package_size);
|
||||
//// R.parseEval("print(rownames(tmp2)[1:5])");
|
||||
//// R.parseEval("print(head(tmp2, 2))");
|
||||
//// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
|
||||
|
||||
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
|
||||
// R.setBufferDataFrame("skeleton");
|
||||
|
||||
if (params->dht_enabled) {
|
||||
// DEBUG
|
||||
// cout << "RANK " << world_rank << " start checking DHT\n";
|
||||
|
||||
// resize helper vector dht_flags of work_package_size changes
|
||||
if ((int)dht_flags.size() != local_work_package_size) {
|
||||
dht_flags.resize(local_work_package_size, true); // set size
|
||||
dht_flags.assign(local_work_package_size,
|
||||
true); // assign all elements to true (default)
|
||||
}
|
||||
|
||||
dht_get_start = MPI_Wtime();
|
||||
check_dht(local_work_package_size, dht_flags, mpi_buffer, dt, params);
|
||||
dht_get_end = MPI_Wtime();
|
||||
|
||||
// DEBUG
|
||||
// cout << "RANK " << world_rank << " checking DHT complete \n";
|
||||
|
||||
R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
|
||||
// R.parseEvalQ("print(head(dht_flags))");
|
||||
}
|
||||
|
||||
/* work */
|
||||
// R.from_C_domain(mpi_buffer);
|
||||
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
|
||||
// R["work_package_full"] = R.getBufferDataFrame();
|
||||
// R["work_package"] = buffer;
|
||||
|
||||
// DEBUG
|
||||
// R.parseEvalQ("print(head(work_package_full))");
|
||||
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
|
||||
|
||||
grid.importWP(mpi_buffer, params->wp_size);
|
||||
|
||||
if (params->dht_enabled) {
|
||||
R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
|
||||
} else {
|
||||
R.parseEvalQ("work_package <- work_package_full");
|
||||
}
|
||||
|
||||
// DEBUG
|
||||
// R.parseEvalQ("print(head(work_package),2)");
|
||||
|
||||
// R.parseEvalQ("rownames(work_package) <- work_package$id");
|
||||
// R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
|
||||
// colnames(work_package)"); R.parseEvalQ("id_store <-
|
||||
// rownames(work_package)"); //"[, ncol(work_package)]");
|
||||
// R.parseEvalQ("work_package$id <- NULL");
|
||||
R.parseEvalQ("work_package <- as.matrix(work_package)");
|
||||
|
||||
unsigned int nrows = R.parseEval("nrow(work_package)");
|
||||
|
||||
if (nrows > 0) {
|
||||
/*Single Line error Workaround*/
|
||||
if (nrows <= 1) {
|
||||
// duplicate line to enable correct simmulation
|
||||
R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), "
|
||||
"times = 2), ]");
|
||||
}
|
||||
|
||||
phreeqc_count++;
|
||||
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
// MDL
|
||||
// R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
|
||||
// 2)); cat('RCpp: worker_function:', local_rank, ' \n')");
|
||||
R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, "
|
||||
"data = work_package))");
|
||||
phreeqc_time_end = MPI_Wtime();
|
||||
// R.parseEvalQ("result$id <- id_store");
|
||||
} else {
|
||||
// cout << "Work-Package is empty, skipping phreeqc!" << endl;
|
||||
}
|
||||
|
||||
if (params->dht_enabled) {
|
||||
R.parseEvalQ("result_full <- work_package_full");
|
||||
if (nrows > 0)
|
||||
R.parseEvalQ("result_full[dht_flags,] <- result");
|
||||
} else {
|
||||
R.parseEvalQ("result_full <- result");
|
||||
}
|
||||
|
||||
// R.setBufferDataFrame("result_full");
|
||||
////Rcpp::DataFrame result = R.parseEval("result_full");
|
||||
////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
|
||||
// R.to_C_domain(mpi_buffer_results);
|
||||
|
||||
grid.exportWP(mpi_buffer_results);
|
||||
/* send results to master */
|
||||
MPI_Request send_req;
|
||||
MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK,
|
||||
MPI_COMM_WORLD, &send_req);
|
||||
|
||||
if (params->dht_enabled) {
|
||||
dht_fill_start = MPI_Wtime();
|
||||
fill_dht(local_work_package_size, dht_flags, mpi_buffer,
|
||||
mpi_buffer_results, dt, params);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
timing[1] += dht_get_end - dht_get_start;
|
||||
timing[2] += dht_fill_end - dht_fill_start;
|
||||
}
|
||||
|
||||
timing[0] += phreeqc_time_end - phreeqc_time_start;
|
||||
|
||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||
|
||||
} else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */
|
||||
/* before death, submit profiling/timings to master*/
|
||||
|
||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
// timings
|
||||
MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
|
||||
|
||||
MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
|
||||
MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
|
||||
|
||||
if (params->dht_enabled) {
|
||||
// dht_perf
|
||||
dht_perf[0] = dht_hits;
|
||||
dht_perf[1] = dht_miss;
|
||||
dht_perf[2] = dht_collision;
|
||||
MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
|
||||
MPI_COMM_WORLD);
|
||||
}
|
||||
break;
|
||||
|
||||
} else if ((probe_status.MPI_TAG == TAG_DHT_STATS)) {
|
||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_STATS, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
print_statistics();
|
||||
MPI_Barrier(MPI_COMM_WORLD);
|
||||
|
||||
} else if ((probe_status.MPI_TAG == TAG_DHT_STORE)) {
|
||||
char *outdir;
|
||||
MPI_Get_count(&probe_status, MPI_CHAR, &count);
|
||||
outdir = (char *)calloc(count + 1, sizeof(char));
|
||||
MPI_Recv(outdir, count, MPI_CHAR, 0, TAG_DHT_STORE, MPI_COMM_WORLD,
|
||||
MPI_STATUS_IGNORE);
|
||||
int res = table_to_file((char *)outdir);
|
||||
if (res != DHT_SUCCESS) {
|
||||
if (world_rank == 2)
|
||||
cerr << "CPP: Worker: Error in writing current state of DHT to file "
|
||||
"(TAG_DHT_STORE)"
|
||||
<< endl;
|
||||
} else {
|
||||
if (world_rank == 2)
|
||||
cout << "CPP: Worker: Successfully written DHT to file " << outdir
|
||||
<< endl;
|
||||
}
|
||||
free(outdir);
|
||||
MPI_Barrier(MPI_COMM_WORLD);
|
||||
}
|
||||
}
|
||||
}
|
||||
18
src/worker.h
18
src/worker.h
@ -1,18 +0,0 @@
|
||||
#pragma once
|
||||
#include "util/SimParams.h"
|
||||
#include "util/RRuntime.h"
|
||||
#include "model/Grid.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace poet;
|
||||
/*Functions*/
|
||||
void worker_function(t_simparams *params);
|
||||
|
||||
|
||||
/*Globals*/
|
||||
#define TAG_WORK 42
|
||||
#define TAG_FINISH 43
|
||||
#define TAG_TIMING 44
|
||||
#define TAG_DHT_PERF 45
|
||||
#define TAG_DHT_STATS 46
|
||||
#define TAG_DHT_STORE 47
|
||||
Loading…
x
Reference in New Issue
Block a user