mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
Declare new type SimParams as struct
This commit is contained in:
parent
41b15e14e8
commit
0b3583e59f
@ -4,15 +4,15 @@
|
|||||||
using namespace poet;
|
using namespace poet;
|
||||||
|
|
||||||
/*init globals*/
|
/*init globals*/
|
||||||
bool dht_enabled;
|
//bool dht_enabled;
|
||||||
int dht_snaps;
|
//int dht_snaps;
|
||||||
int dht_strategy;
|
//int dht_strategy;
|
||||||
int dht_significant_digits;
|
//int dht_significant_digits;
|
||||||
std::string dht_file;
|
//std::string dht_file;
|
||||||
std::vector<int> dht_significant_digits_vector;
|
//std::vector<int> dht_significant_digits_vector;
|
||||||
std::vector<string> prop_type_vector;
|
//std::vector<string> prop_type_vector;
|
||||||
bool dht_logarithm;
|
//bool dht_logarithm;
|
||||||
uint64_t dht_size_per_process;
|
//uint64_t dht_size_per_process;
|
||||||
uint64_t dht_hits, dht_miss, dht_collision;
|
uint64_t dht_hits, dht_miss, dht_collision;
|
||||||
RInside *R_DHT;
|
RInside *R_DHT;
|
||||||
std::vector<bool> dht_flags;
|
std::vector<bool> dht_flags;
|
||||||
@ -20,26 +20,26 @@ DHT *dht_object;
|
|||||||
|
|
||||||
double *fuzzing_buffer;
|
double *fuzzing_buffer;
|
||||||
|
|
||||||
bool dt_differ;
|
//bool dt_differ;
|
||||||
/*functions*/
|
/*functions*/
|
||||||
|
|
||||||
uint64_t get_md5(int key_size, void *key) {
|
uint64_t get_md5(int key_size, void *key) {
|
||||||
MD5_CTX ctx;
|
MD5_CTX ctx;
|
||||||
unsigned char sum[MD5_DIGEST_LENGTH];
|
unsigned char sum[MD5_DIGEST_LENGTH];
|
||||||
uint64_t retval, *v1, *v2;
|
uint64_t retval, *v1, *v2;
|
||||||
|
|
||||||
MD5_Init(&ctx);
|
MD5_Init(&ctx);
|
||||||
MD5_Update(&ctx, key, key_size);
|
MD5_Update(&ctx, key, key_size);
|
||||||
MD5_Final(sum, &ctx);
|
MD5_Final(sum, &ctx);
|
||||||
|
|
||||||
v1 = (uint64_t *)&sum[0];
|
v1 = (uint64_t *)&sum[0];
|
||||||
v2 = (uint64_t *)&sum[8];
|
v2 = (uint64_t *)&sum[8];
|
||||||
retval = *v1 ^ *v2;
|
retval = *v1 ^ *v2;
|
||||||
|
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
//double Round_off(double N, double n) {
|
// double Round_off(double N, double n) {
|
||||||
// double result;
|
// double result;
|
||||||
// R["roundsig"] = n;
|
// R["roundsig"] = n;
|
||||||
// R["roundin"] = N;
|
// R["roundin"] = N;
|
||||||
@ -50,119 +50,127 @@ uint64_t get_md5(int key_size, void *key) {
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Stores fuzzed version of key in fuzzing_buffer
|
* Stores fuzzed version of key in fuzzing_buffer
|
||||||
*/
|
*/
|
||||||
void fuzz_for_dht(int var_count, void *key, double dt) {
|
void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params) {
|
||||||
unsigned int i = 0;
|
unsigned int i = 0;
|
||||||
//introduce fuzzing to allow more hits in DHT
|
// introduce fuzzing to allow more hits in DHT
|
||||||
for (i = 0; i < (unsigned int)var_count; i++) {
|
for (i = 0; i < (unsigned int)var_count; i++) {
|
||||||
if (prop_type_vector[i] == "act") {
|
if (params->dht_prop_type_vector[i] == "act") {
|
||||||
//with log10
|
// with log10
|
||||||
if (dht_logarithm) {
|
if (params->dht_log) {
|
||||||
if (((double *)key)[i] < 0)
|
if (((double *)key)[i] < 0)
|
||||||
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value at key!" << endl;
|
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in "
|
||||||
else if (((double *)key)[i] == 0)
|
"key!"
|
||||||
fuzzing_buffer[i] = 0;
|
<< endl;
|
||||||
else
|
else if (((double *)key)[i] == 0)
|
||||||
fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), dht_significant_digits_vector[i]);
|
fuzzing_buffer[i] = 0;
|
||||||
} else {
|
else
|
||||||
//without log10
|
fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])),
|
||||||
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]);
|
params->dht_signif_vector[i]);
|
||||||
}
|
} else {
|
||||||
} else if (prop_type_vector[i] == "logact") {
|
// without log10
|
||||||
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]);
|
fuzzing_buffer[i] =
|
||||||
} else if (prop_type_vector[i] == "ignore") {
|
ROUND((((double *)key)[i]), params->dht_signif_vector[i]);
|
||||||
fuzzing_buffer[i] = 0;
|
}
|
||||||
} else {
|
} else if (params->dht_prop_type_vector[i] == "logact") {
|
||||||
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong prop_type!" << endl;
|
fuzzing_buffer[i] =
|
||||||
}
|
ROUND((((double *)key)[i]), params->dht_signif_vector[i]);
|
||||||
|
} else if (params->dht_prop_type_vector[i] == "ignore") {
|
||||||
|
fuzzing_buffer[i] = 0;
|
||||||
|
} else {
|
||||||
|
cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong "
|
||||||
|
"prop_type!"
|
||||||
|
<< endl;
|
||||||
}
|
}
|
||||||
if (dt_differ)
|
}
|
||||||
fuzzing_buffer[var_count] = dt;
|
if (params->dt_differ)
|
||||||
|
fuzzing_buffer[var_count] = dt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void check_dht(int length, std::vector<bool> &out_result_index, double *work_package, double dt) {
|
void check_dht(int length, std::vector<bool> &out_result_index,
|
||||||
void *key;
|
double *work_package, double dt, t_simparams *params) {
|
||||||
int res;
|
void *key;
|
||||||
int var_count = prop_type_vector.size();
|
int res;
|
||||||
|
int var_count = params->dht_prop_type_vector.size();
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
key = (void *)&(work_package[i * var_count]);
|
||||||
|
|
||||||
for (int i = 0; i < length; i++) {
|
// fuzz data (round, logarithm etc.)
|
||||||
key = (void *)&(work_package[i * var_count]);
|
fuzz_for_dht(var_count, key, dt, params);
|
||||||
|
|
||||||
//fuzz data (round, logarithm etc.)
|
// overwrite input with data from DHT, IF value is found in DHT
|
||||||
fuzz_for_dht(var_count, key, dt);
|
res = DHT_read(dht_object, fuzzing_buffer, key);
|
||||||
|
|
||||||
//overwrite input with data from DHT, IF value is found in DHT
|
if (res == DHT_SUCCESS) {
|
||||||
res = DHT_read(dht_object, fuzzing_buffer, key);
|
// flag that this line is replaced by DHT-value, do not simulate!!
|
||||||
|
out_result_index[i] = false;
|
||||||
if (res == DHT_SUCCESS) {
|
dht_hits++;
|
||||||
//flag that this line is replaced by DHT-value, do not simulate!!
|
} else if (res == DHT_READ_ERROR) {
|
||||||
out_result_index[i] = false;
|
// this line is untouched, simulation is needed
|
||||||
dht_hits++;
|
out_result_index[i] = true;
|
||||||
} else if (res == DHT_READ_ERROR) {
|
dht_miss++;
|
||||||
//this line is untouched, simulation is needed
|
} else {
|
||||||
out_result_index[i] = true;
|
// MPI ERROR ... WHAT TO DO NOW?
|
||||||
dht_miss++;
|
// RUNNING CIRCLES WHILE SCREAMING
|
||||||
} else {
|
|
||||||
//MPI ERROR ... WHAT TO DO NOW?
|
|
||||||
//RUNNING CIRCLES WHILE SCREAMING
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void fill_dht(int length, std::vector<bool> &result_index, double *work_package, double *results, double dt) {
|
void fill_dht(int length, std::vector<bool> &result_index, double *work_package,
|
||||||
void *key;
|
double *results, double dt, t_simparams *params) {
|
||||||
void *data;
|
void *key;
|
||||||
int res;
|
void *data;
|
||||||
int var_count = prop_type_vector.size();
|
int res;
|
||||||
|
int var_count = params->dht_prop_type_vector.size();
|
||||||
|
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
key = (void *)&(work_package[i * var_count]);
|
key = (void *)&(work_package[i * var_count]);
|
||||||
data = (void *)&(results[i * var_count]);
|
data = (void *)&(results[i * var_count]);
|
||||||
|
|
||||||
if (result_index[i]) {
|
if (result_index[i]) {
|
||||||
//If true -> was simulated, needs to be inserted into dht
|
// If true -> was simulated, needs to be inserted into dht
|
||||||
|
|
||||||
//fuzz data (round, logarithm etc.)
|
// fuzz data (round, logarithm etc.)
|
||||||
fuzz_for_dht(var_count, key, dt);
|
fuzz_for_dht(var_count, key, dt, params);
|
||||||
|
|
||||||
res = DHT_write(dht_object, fuzzing_buffer, data);
|
res = DHT_write(dht_object, fuzzing_buffer, data);
|
||||||
|
|
||||||
if (res != DHT_SUCCESS) {
|
if (res != DHT_SUCCESS) {
|
||||||
if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) {
|
if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) {
|
||||||
dht_collision++;
|
dht_collision++;
|
||||||
} else {
|
} else {
|
||||||
//MPI ERROR ... WHAT TO DO NOW?
|
// MPI ERROR ... WHAT TO DO NOW?
|
||||||
//RUNNING CIRCLES WHILE SCREAMING
|
// RUNNING CIRCLES WHILE SCREAMING
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void print_statistics() {
|
void print_statistics() {
|
||||||
int res;
|
int res;
|
||||||
|
|
||||||
res = DHT_print_statistics(dht_object);
|
res = DHT_print_statistics(dht_object);
|
||||||
|
|
||||||
if (res != DHT_SUCCESS) {
|
if (res != DHT_SUCCESS) {
|
||||||
//MPI ERROR ... WHAT TO DO NOW?
|
// MPI ERROR ... WHAT TO DO NOW?
|
||||||
//RUNNING CIRCLES WHILE SCREAMING
|
// RUNNING CIRCLES WHILE SCREAMING
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int table_to_file(char *filename) {
|
int table_to_file(char *filename) {
|
||||||
int res = DHT_to_file(dht_object, filename);
|
int res = DHT_to_file(dht_object, filename);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
int file_to_table(char *filename) {
|
int file_to_table(char *filename) {
|
||||||
|
|
||||||
int res = DHT_from_file(dht_object, filename);
|
int res = DHT_from_file(dht_object, filename);
|
||||||
if (res != DHT_SUCCESS)
|
if (res != DHT_SUCCESS)
|
||||||
return res;
|
return res;
|
||||||
|
|
||||||
DHT_print_statistics(dht_object);
|
DHT_print_statistics(dht_object);
|
||||||
|
|
||||||
return DHT_SUCCESS;
|
return DHT_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,50 +1,55 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include "DHT.h"
|
||||||
#include "util/RRuntime.h"
|
#include "util/RRuntime.h"
|
||||||
|
#include "util/SimParams.h"
|
||||||
|
#include <math.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <math.h>
|
|
||||||
#include "DHT.h"
|
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace poet;
|
using namespace poet;
|
||||||
|
|
||||||
/*Functions*/
|
/*Functions*/
|
||||||
uint64_t get_md5(int key_size, void* key);
|
uint64_t get_md5(int key_size, void *key);
|
||||||
void fuzz_for_dht(int var_count, void *key, double dt);
|
void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params);
|
||||||
void check_dht(int length, std::vector<bool> &out_result_index, double *work_package, double dt);
|
void check_dht(int length, std::vector<bool> &out_result_index,
|
||||||
void fill_dht(int length, std::vector<bool> &result_index, double *work_package, double *results, double dt);
|
double *work_package, double dt, t_simparams *params);
|
||||||
|
void fill_dht(int length, std::vector<bool> &result_index, double *work_package,
|
||||||
|
double *results, double dt, t_simparams *params);
|
||||||
void print_statistics();
|
void print_statistics();
|
||||||
int table_to_file(char* filename);
|
int table_to_file(char *filename);
|
||||||
int file_to_table(char* filename);
|
int file_to_table(char *filename);
|
||||||
|
|
||||||
/*globals*/
|
/*globals*/
|
||||||
extern bool dht_enabled;
|
//extern bool dht_enabled;
|
||||||
extern int dht_snaps;
|
//extern int dht_snaps;
|
||||||
extern std::string dht_file;
|
//extern std::string dht_file;
|
||||||
extern bool dt_differ;
|
//extern bool dt_differ;
|
||||||
|
|
||||||
//Default DHT Size per process in Byte (defaults to 1 GiB)
|
// Default DHT Size per process in Byte (defaults to 1 GiB)
|
||||||
#define DHT_SIZE_PER_PROCESS 1073741824
|
#define DHT_SIZE_PER_PROCESS 1073741824
|
||||||
|
|
||||||
//sets default dht access and distribution strategy
|
// sets default dht access and distribution strategy
|
||||||
#define DHT_STRATEGY 0
|
#define DHT_STRATEGY 0
|
||||||
// 0 -> DHT is on workers, access from workers only
|
// 0 -> DHT is on workers, access from workers only
|
||||||
// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED YET!
|
// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED
|
||||||
|
// YET!
|
||||||
|
|
||||||
#define ROUND(value,signif) (((int) (pow(10.0, (double) signif) * value)) * pow(10.0, (double) -signif))
|
#define ROUND(value, signif) \
|
||||||
|
(((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))
|
||||||
|
|
||||||
extern int dht_strategy;
|
//extern int dht_strategy;
|
||||||
extern int dht_significant_digits;
|
//extern int dht_significant_digits;
|
||||||
extern std::vector<int> dht_significant_digits_vector;
|
//extern std::vector<int> dht_significant_digits_vector;
|
||||||
extern std::vector<string> prop_type_vector;
|
//extern std::vector<string> prop_type_vector;
|
||||||
extern bool dht_logarithm;
|
//extern bool dht_logarithm;
|
||||||
extern uint64_t dht_size_per_process;
|
//extern uint64_t dht_size_per_process;
|
||||||
|
|
||||||
//global DHT object, can be NULL if not initialized, check strategy
|
// global DHT object, can be NULL if not initialized, check strategy
|
||||||
extern DHT* dht_object;
|
extern DHT *dht_object;
|
||||||
|
|
||||||
//DHT Performance counter
|
// DHT Performance counter
|
||||||
extern uint64_t dht_hits, dht_miss, dht_collision;
|
extern uint64_t dht_hits, dht_miss, dht_collision;
|
||||||
|
|
||||||
extern double* fuzzing_buffer;
|
extern double *fuzzing_buffer;
|
||||||
extern std::vector<bool> dht_flags;
|
extern std::vector<bool> dht_flags;
|
||||||
|
|||||||
215
src/kin.cpp
215
src/kin.cpp
@ -11,9 +11,10 @@
|
|||||||
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
|
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
|
||||||
#include "dht_wrapper.h"
|
#include "dht_wrapper.h"
|
||||||
#include "global_buffer.h"
|
#include "global_buffer.h"
|
||||||
#include "util/RRuntime.h"
|
|
||||||
#include "worker.h"
|
|
||||||
#include "model/Grid.h"
|
#include "model/Grid.h"
|
||||||
|
#include "util/RRuntime.h"
|
||||||
|
#include "util/SimParams.h"
|
||||||
|
#include "worker.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace poet;
|
using namespace poet;
|
||||||
@ -100,41 +101,40 @@ int main(int argc, char *argv[]) {
|
|||||||
double sim_e_chemistry, sim_f_chemistry;
|
double sim_e_chemistry, sim_f_chemistry;
|
||||||
|
|
||||||
argh::parser cmdl(argv);
|
argh::parser cmdl(argv);
|
||||||
|
int dht_significant_digits;
|
||||||
// cout << "CPP: Start Init (MPI)" << endl;
|
// cout << "CPP: Start Init (MPI)" << endl;
|
||||||
|
|
||||||
|
t_simparams params;
|
||||||
|
|
||||||
MPI_Init(&argc, &argv);
|
MPI_Init(&argc, &argv);
|
||||||
|
|
||||||
int world_size;
|
MPI_Comm_size(MPI_COMM_WORLD, &(params.world_size));
|
||||||
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
|
||||||
|
|
||||||
int world_rank;
|
MPI_Comm_rank(MPI_COMM_WORLD, &(params.world_rank));
|
||||||
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
|
||||||
|
|
||||||
/*Create custom Communicator with all processes except 0 (the master) for DHT
|
/*Create custom Communicator with all processes except 0 (the master) for DHT
|
||||||
* storage*/
|
* storage*/
|
||||||
// only needed if strategy == 0, but done anyway
|
// only needed if strategy == 0, but done anyway
|
||||||
MPI_Group group_world;
|
MPI_Group dht_group, group_world;
|
||||||
MPI_Group dht_group;
|
|
||||||
MPI_Comm dht_comm;
|
MPI_Comm dht_comm;
|
||||||
int *process_ranks;
|
int *process_ranks;
|
||||||
|
|
||||||
// make a list of processes in the new communicator
|
// make a list of processes in the new communicator
|
||||||
process_ranks = (int *)malloc(world_size * sizeof(int));
|
process_ranks = (int *)malloc(params.world_size * sizeof(int));
|
||||||
for (int I = 1; I < world_size; I++)
|
for (int I = 1; I < params.world_size; I++)
|
||||||
process_ranks[I - 1] = I;
|
process_ranks[I - 1] = I;
|
||||||
|
|
||||||
// get the group under MPI_COMM_WORLD
|
// get the group under MPI_COMM_WORLD
|
||||||
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
|
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
|
||||||
// create the new group
|
// create the new group
|
||||||
MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group);
|
MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group);
|
||||||
// create the new communicator
|
// create the new communicator
|
||||||
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
|
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
|
||||||
free(process_ranks); // cleanup
|
free(process_ranks); // cleanup
|
||||||
// cout << "Done";
|
// cout << "Done";
|
||||||
|
|
||||||
if (cmdl[{"help", "h"}]) {
|
if (cmdl[{"help", "h"}]) {
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "Todo" << endl
|
cout << "Todo" << endl
|
||||||
<< "See README.md for further information." << endl;
|
<< "See README.md for further information." << endl;
|
||||||
}
|
}
|
||||||
@ -144,7 +144,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
/*INIT is now done separately in an R file provided here as argument!*/
|
/*INIT is now done separately in an R file provided here as argument!*/
|
||||||
if (!cmdl(2)) {
|
if (!cmdl(2)) {
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
|
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
|
||||||
<< "1) the R script defining your simulation and" << endl
|
<< "1) the R script defining your simulation and" << endl
|
||||||
<< "2) the directory prefix where to save results and profiling"
|
<< "2) the directory prefix where to save results and profiling"
|
||||||
@ -156,7 +156,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
std::list<std::string> optionsError = checkOptions(cmdl);
|
std::list<std::string> optionsError = checkOptions(cmdl);
|
||||||
if (!optionsError.empty()) {
|
if (!optionsError.empty()) {
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cerr << "Unrecognized option(s):\n" << endl;
|
cerr << "Unrecognized option(s):\n" << endl;
|
||||||
for (auto option : optionsError) {
|
for (auto option : optionsError) {
|
||||||
cerr << option << endl;
|
cerr << option << endl;
|
||||||
@ -168,61 +168,61 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*Parse DHT arguments*/
|
/*Parse DHT arguments*/
|
||||||
dht_enabled = cmdl["dht"];
|
params.dht_enabled = cmdl["dht"];
|
||||||
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
cmdl("dht-strategy", 0) >> dht_strategy;
|
cmdl("dht-strategy", 0) >> params.dht_strategy;
|
||||||
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
||||||
|
|
||||||
cmdl("dht-signif", 5) >> dht_significant_digits;
|
cmdl("dht-signif", 5) >> dht_significant_digits;
|
||||||
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
|
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
|
||||||
// endl;
|
// endl;
|
||||||
|
|
||||||
dht_logarithm = cmdl["dht-log"];
|
params.dht_log = cmdl["dht-log"];
|
||||||
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
|
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
|
||||||
// : "OFF" ) << endl;
|
// : "OFF" ) << endl;
|
||||||
|
|
||||||
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> dht_size_per_process;
|
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process;
|
||||||
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
|
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
|
||||||
// endl;
|
// endl;
|
||||||
|
|
||||||
cmdl("dht-snaps", 0) >> dht_snaps;
|
cmdl("dht-snaps", 0) >> params.dht_snaps;
|
||||||
|
|
||||||
cmdl("dht-file") >> dht_file;
|
cmdl("dht-file") >> params.dht_file;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*Parse work package size*/
|
/*Parse work package size*/
|
||||||
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> work_package_size;
|
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size;
|
||||||
|
|
||||||
/*Parse output options*/
|
/*Parse output options*/
|
||||||
store_result = !cmdl["ignore-result"];
|
store_result = !cmdl["ignore-result"];
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF")
|
cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF")
|
||||||
<< endl;
|
<< endl;
|
||||||
cout << "CPP: Work Package Size: " << work_package_size << endl;
|
cout << "CPP: Work Package Size: " << params.wp_size << endl;
|
||||||
cout << "CPP: DHT is " << (dht_enabled ? "ON" : "OFF") << '\n';
|
cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n';
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
cout << "CPP: DHT strategy is " << params.dht_strategy << endl;
|
||||||
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
|
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
|
||||||
"defined) = "
|
"defined) = "
|
||||||
<< dht_significant_digits << endl;
|
<< dht_significant_digits << endl;
|
||||||
cout << "CPP: DHT logarithm before rounding: "
|
cout << "CPP: DHT logarithm before rounding: "
|
||||||
<< (dht_logarithm ? "ON" : "OFF") << endl;
|
<< (params.dht_log ? "ON" : "OFF") << endl;
|
||||||
cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process
|
cout << "CPP: DHT size per process (Byte) = "
|
||||||
<< endl;
|
<< params.dht_size_per_process << endl;
|
||||||
cout << "CPP: DHT save snapshots is " << dht_snaps << endl;
|
cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl;
|
||||||
cout << "CPP: DHT load file is " << dht_file << endl;
|
cout << "CPP: DHT load file is " << params.dht_file << endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
|
cout << "CPP: R Init (RInside) on process " << params.world_rank << endl;
|
||||||
RRuntime R(argc, argv);
|
RRuntime R(argc, argv);
|
||||||
|
|
||||||
// if local_rank == 0 then master else worker
|
// if local_rank == 0 then master else worker
|
||||||
R["local_rank"] = world_rank;
|
R["local_rank"] = params.world_rank;
|
||||||
|
|
||||||
/*Loading Dependencies*/
|
/*Loading Dependencies*/
|
||||||
std::string r_load_dependencies = "suppressMessages(library(Rmufits));"
|
std::string r_load_dependencies = "suppressMessages(library(Rmufits));"
|
||||||
@ -236,11 +236,10 @@ int main(int argc, char *argv[]) {
|
|||||||
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
|
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
|
||||||
R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns
|
R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns
|
||||||
|
|
||||||
std::string out_dir;
|
if (params.world_rank ==
|
||||||
if (world_rank ==
|
|
||||||
0) { // only rank 0 initializes goes through the whole initialization
|
0) { // only rank 0 initializes goes through the whole initialization
|
||||||
cmdl(2) >> out_dir; // <- second positional argument
|
cmdl(2) >> params.out_dir; // <- second positional argument
|
||||||
R["fileout"] = wrap(out_dir); // assign a char* (string) to 'fileout'
|
R["fileout"] = wrap(params.out_dir); // assign a char* (string) to 'fileout'
|
||||||
|
|
||||||
// Note: R::sim_init() checks if the directory already exists,
|
// Note: R::sim_init() checks if the directory already exists,
|
||||||
// if not it makes it
|
// if not it makes it
|
||||||
@ -252,15 +251,17 @@ int main(int argc, char *argv[]) {
|
|||||||
std::string master_init_code = "mysetup <- master_init(setup=setup)";
|
std::string master_init_code = "mysetup <- master_init(setup=setup)";
|
||||||
R.parseEval(master_init_code);
|
R.parseEval(master_init_code);
|
||||||
|
|
||||||
dt_differ = R.parseEval("mysetup$dt_differ");
|
params.dt_differ =
|
||||||
MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
|
||||||
|
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
||||||
} else { // workers will only read the setup DataFrame defined by input file
|
} else { // workers will only read the setup DataFrame defined by input file
|
||||||
R.parseEval("mysetup <- setup");
|
R.parseEval("mysetup <- setup");
|
||||||
MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: R init done on process with rank " << world_rank << endl;
|
cout << "CPP: R init done on process with rank " << params.world_rank
|
||||||
|
<< endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize chemistry on all processes
|
// initialize chemistry on all processes
|
||||||
@ -270,63 +271,64 @@ int main(int argc, char *argv[]) {
|
|||||||
Grid grid(R);
|
Grid grid(R);
|
||||||
grid.init();
|
grid.init();
|
||||||
/* Retrieve state_C from R context for MPI buffer generation */
|
/* Retrieve state_C from R context for MPI buffer generation */
|
||||||
//Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
|
// Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
|
||||||
|
|
||||||
/* Init Parallel helper functions */
|
/* Init Parallel helper functions */
|
||||||
R["n_procs"] = world_size - 1; /* worker count */
|
R["n_procs"] = params.world_size - 1; /* worker count */
|
||||||
R["work_package_size"] = work_package_size;
|
R["work_package_size"] = params.wp_size;
|
||||||
|
|
||||||
// Removed additional field for ID in previous versions
|
// Removed additional field for ID in previous versions
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
mpi_buffer =
|
mpi_buffer =
|
||||||
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
|
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
|
||||||
} else {
|
} else {
|
||||||
mpi_buffer = (double *)calloc(
|
mpi_buffer = (double *)calloc(
|
||||||
(work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
|
(params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
|
||||||
mpi_buffer_results =
|
mpi_buffer_results =
|
||||||
(double *)calloc(work_package_size * (grid.getCols()), sizeof(double));
|
(double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: parallel init completed (buffers allocated)!" << endl;
|
cout << "CPP: parallel init completed (buffers allocated)!" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
// MDL: pass to R the DHT stuff (basically, only for storing of
|
// MDL: pass to R the DHT stuff (basically, only for storing of
|
||||||
// simulation parameters). These 2 variables are always defined:
|
// simulation parameters). These 2 variables are always defined:
|
||||||
R["dht_enabled"] = dht_enabled;
|
R["dht_enabled"] = params.dht_enabled;
|
||||||
R["dht_log"] = dht_logarithm;
|
R["dht_log"] = params.dht_log;
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
// cout << "\nCreating DHT\n";
|
// cout << "\nCreating DHT\n";
|
||||||
// determine size of dht entries
|
// determine size of dht entries
|
||||||
int dht_data_size = grid.getCols() * sizeof(double);
|
int dht_data_size = grid.getCols() * sizeof(double);
|
||||||
int dht_key_size =
|
int dht_key_size =
|
||||||
grid.getCols() * sizeof(double) + (dt_differ * sizeof(double));
|
grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
|
||||||
|
|
||||||
// determine bucket count for preset memory usage
|
// determine bucket count for preset memory usage
|
||||||
// bucket size is key + value + 1 byte for status
|
// bucket size is key + value + 1 byte for status
|
||||||
int dht_buckets_per_process =
|
int dht_buckets_per_process =
|
||||||
dht_size_per_process / (1 + dht_data_size + dht_key_size);
|
params.dht_size_per_process / (1 + dht_data_size + dht_key_size);
|
||||||
|
|
||||||
// MDL : following code moved here from worker.cpp
|
// MDL : following code moved here from worker.cpp
|
||||||
/*Load significance vector from R setup file (or set default)*/
|
/*Load significance vector from R setup file (or set default)*/
|
||||||
bool signif_vector_exists = R.parseEval("exists('signif_vector')");
|
bool signif_vector_exists = R.parseEval("exists('signif_vector')");
|
||||||
if (signif_vector_exists) {
|
if (signif_vector_exists) {
|
||||||
dht_significant_digits_vector = as<std::vector<int>>(R["signif_vector"]);
|
params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
|
||||||
} else {
|
} else {
|
||||||
dht_significant_digits_vector.assign(
|
params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
|
||||||
dht_object->key_size / sizeof(double), dht_significant_digits);
|
dht_significant_digits);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*Load property type vector from R setup file (or set default)*/
|
/*Load property type vector from R setup file (or set default)*/
|
||||||
bool prop_type_vector_exists = R.parseEval("exists('prop_type')");
|
bool prop_type_vector_exists = R.parseEval("exists('prop_type')");
|
||||||
if (prop_type_vector_exists) {
|
if (prop_type_vector_exists) {
|
||||||
prop_type_vector = as<std::vector<string>>(R["prop_type"]);
|
params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
|
||||||
} else {
|
} else {
|
||||||
prop_type_vector.assign(dht_object->key_size / sizeof(double), "act");
|
params.dht_prop_type_vector.assign(dht_object->key_size / sizeof(double),
|
||||||
|
"act");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
// print only on master, values are equal on all workes
|
// print only on master, values are equal on all workes
|
||||||
cout << "CPP: dht_data_size: " << dht_data_size << "\n";
|
cout << "CPP: dht_data_size: " << dht_data_size << "\n";
|
||||||
cout << "CPP: dht_key_size: " << dht_key_size << "\n";
|
cout << "CPP: dht_key_size: " << dht_key_size << "\n";
|
||||||
@ -345,12 +347,12 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
// MDL: pass to R the DHT stuff. These variables exist
|
// MDL: pass to R the DHT stuff. These variables exist
|
||||||
// only if dht_enabled is true
|
// only if dht_enabled is true
|
||||||
R["dht_final_signif"] = dht_significant_digits_vector;
|
R["dht_final_signif"] = params.dht_signif_vector;
|
||||||
R["dht_final_proptype"] = prop_type_vector;
|
R["dht_final_proptype"] = params.dht_prop_type_vector;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dht_strategy == 0) {
|
if (params.dht_strategy == 0) {
|
||||||
if (world_rank != 0) {
|
if (params.world_rank != 0) {
|
||||||
dht_object = DHT_create(dht_comm, dht_buckets_per_process,
|
dht_object = DHT_create(dht_comm, dht_buckets_per_process,
|
||||||
dht_data_size, dht_key_size, get_md5);
|
dht_data_size, dht_key_size, get_md5);
|
||||||
|
|
||||||
@ -362,20 +364,20 @@ int main(int argc, char *argv[]) {
|
|||||||
dht_data_size, dht_key_size, get_md5);
|
dht_data_size, dht_key_size, get_md5);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: DHT successfully created!" << endl;
|
cout << "CPP: DHT successfully created!" << endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MDL: store all parameters
|
// MDL: store all parameters
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: Calling R Function to store calling parameters" << endl;
|
cout << "CPP: Calling R Function to store calling parameters" << endl;
|
||||||
R.parseEvalQ("StoreSetup(setup=mysetup)");
|
R.parseEvalQ("StoreSetup(setup=mysetup)");
|
||||||
}
|
}
|
||||||
|
|
||||||
MPI_Barrier(MPI_COMM_WORLD);
|
MPI_Barrier(MPI_COMM_WORLD);
|
||||||
|
|
||||||
if (world_rank == 0) { /* This is executed by the master */
|
if (params.world_rank == 0) { /* This is executed by the master */
|
||||||
|
|
||||||
Rcpp::NumericVector master_send;
|
Rcpp::NumericVector master_send;
|
||||||
Rcpp::NumericVector master_recv;
|
Rcpp::NumericVector master_recv;
|
||||||
@ -383,7 +385,7 @@ int main(int argc, char *argv[]) {
|
|||||||
sim_a_seq = MPI_Wtime();
|
sim_a_seq = MPI_Wtime();
|
||||||
|
|
||||||
worker_struct *workerlist =
|
worker_struct *workerlist =
|
||||||
(worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
|
(worker_struct *)calloc(params.world_size - 1, sizeof(worker_struct));
|
||||||
int need_to_receive;
|
int need_to_receive;
|
||||||
MPI_Status probe_status;
|
MPI_Status probe_status;
|
||||||
double *timings;
|
double *timings;
|
||||||
@ -394,7 +396,7 @@ int main(int argc, char *argv[]) {
|
|||||||
// a temporary send buffer
|
// a temporary send buffer
|
||||||
double *send_buffer;
|
double *send_buffer;
|
||||||
send_buffer = (double *)calloc(
|
send_buffer = (double *)calloc(
|
||||||
(work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
|
(params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
|
||||||
|
|
||||||
// helper variables
|
// helper variables
|
||||||
int iteration;
|
int iteration;
|
||||||
@ -440,7 +442,7 @@ int main(int argc, char *argv[]) {
|
|||||||
/*Fallback for sequential execution*/
|
/*Fallback for sequential execution*/
|
||||||
sim_b_chemistry = MPI_Wtime();
|
sim_b_chemistry = MPI_Wtime();
|
||||||
|
|
||||||
if (world_size == 1) {
|
if (params.world_size == 1) {
|
||||||
// MDL : the transformation of values into pH and pe
|
// MDL : the transformation of values into pH and pe
|
||||||
// takes now place in master_advection() so the
|
// takes now place in master_advection() so the
|
||||||
// following line is unneeded
|
// following line is unneeded
|
||||||
@ -473,7 +475,7 @@ int main(int argc, char *argv[]) {
|
|||||||
// R.parseEval("tmp <-
|
// R.parseEval("tmp <-
|
||||||
// shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
|
// shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
|
||||||
// Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
|
// Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
|
||||||
|
|
||||||
// convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
|
// convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
|
||||||
// cout << "CPP: shuffle_field() done" << endl;
|
// cout << "CPP: shuffle_field() done" << endl;
|
||||||
grid.shuffleAndExport(mpi_buffer);
|
grid.shuffleAndExport(mpi_buffer);
|
||||||
@ -482,7 +484,7 @@ int main(int argc, char *argv[]) {
|
|||||||
int pkg_to_send = n_wp;
|
int pkg_to_send = n_wp;
|
||||||
int pkg_to_recv = n_wp;
|
int pkg_to_recv = n_wp;
|
||||||
size_t colCount = grid.getCols();
|
size_t colCount = grid.getCols();
|
||||||
int free_workers = world_size - 1;
|
int free_workers = params.world_size - 1;
|
||||||
double *work_pointer = mpi_buffer;
|
double *work_pointer = mpi_buffer;
|
||||||
sim_c_chemistry = MPI_Wtime();
|
sim_c_chemistry = MPI_Wtime();
|
||||||
|
|
||||||
@ -524,7 +526,7 @@ int main(int argc, char *argv[]) {
|
|||||||
if (pkg_to_send > 0) {
|
if (pkg_to_send > 0) {
|
||||||
master_send_a = MPI_Wtime();
|
master_send_a = MPI_Wtime();
|
||||||
/*search for free workers and send work*/
|
/*search for free workers and send work*/
|
||||||
for (int p = 0; p < world_size - 1; p++) {
|
for (int p = 0; p < params.world_size - 1; p++) {
|
||||||
if (workerlist[p].has_work == 0 &&
|
if (workerlist[p].has_work == 0 &&
|
||||||
pkg_to_send > 0) /* worker is free */ {
|
pkg_to_send > 0) /* worker is free */ {
|
||||||
|
|
||||||
@ -614,12 +616,13 @@ int main(int argc, char *argv[]) {
|
|||||||
cummul_workers += sim_d_chemistry - sim_c_chemistry;
|
cummul_workers += sim_d_chemistry - sim_c_chemistry;
|
||||||
|
|
||||||
// convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
|
// convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
|
||||||
//R.from_C_domain(mpi_buffer);
|
// R.from_C_domain(mpi_buffer);
|
||||||
|
|
||||||
//R["chemistry_data"] = R.getBufferDataFrame();
|
// R["chemistry_data"] = R.getBufferDataFrame();
|
||||||
|
|
||||||
///* unshuffle results */
|
///* unshuffle results */
|
||||||
//R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)");
|
// R.parseEval("result <- unshuffle_field(chemistry_data,
|
||||||
|
// ordered_ids)");
|
||||||
|
|
||||||
grid.importAndUnshuffle(mpi_buffer);
|
grid.importAndUnshuffle(mpi_buffer);
|
||||||
/* do master stuff */
|
/* do master stuff */
|
||||||
@ -644,18 +647,18 @@ int main(int argc, char *argv[]) {
|
|||||||
<< endl
|
<< endl
|
||||||
<< endl;
|
<< endl;
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
for (int i = 1; i < world_size; i++) {
|
for (int i = 1; i < params.world_size; i++) {
|
||||||
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
|
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
MPI_Barrier(MPI_COMM_WORLD);
|
MPI_Barrier(MPI_COMM_WORLD);
|
||||||
|
|
||||||
if (dht_snaps == 2) {
|
if (params.dht_snaps == 2) {
|
||||||
std::stringstream outfile;
|
std::stringstream outfile;
|
||||||
outfile << out_dir << "/iter_" << std::setfill('0') << std::setw(3)
|
outfile << params.out_dir << "/iter_" << std::setfill('0')
|
||||||
<< iter << ".dht";
|
<< std::setw(3) << iter << ".dht";
|
||||||
for (int i = 1; i < world_size; i++) {
|
for (int i = 1; i < params.world_size; i++) {
|
||||||
MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i,
|
MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i,
|
||||||
TAG_DHT_STORE, MPI_COMM_WORLD);
|
TAG_DHT_STORE, MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
@ -675,11 +678,11 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
sim_end = MPI_Wtime();
|
sim_end = MPI_Wtime();
|
||||||
|
|
||||||
if (dht_enabled && dht_snaps > 0) {
|
if (params.dht_enabled && params.dht_snaps > 0) {
|
||||||
cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl;
|
cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl;
|
||||||
std::string outfile;
|
std::string outfile;
|
||||||
outfile = out_dir + ".dht";
|
outfile = params.out_dir + ".dht";
|
||||||
for (int i = 1; i < world_size; i++) {
|
for (int i = 1; i < params.world_size; i++) {
|
||||||
MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE,
|
MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE,
|
||||||
MPI_COMM_WORLD);
|
MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
@ -697,7 +700,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
timings = (double *)calloc(3, sizeof(double));
|
timings = (double *)calloc(3, sizeof(double));
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
dht_hits = 0;
|
dht_hits = 0;
|
||||||
dht_miss = 0;
|
dht_miss = 0;
|
||||||
dht_collision = 0;
|
dht_collision = 0;
|
||||||
@ -706,7 +709,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
double idle_worker_tmp;
|
double idle_worker_tmp;
|
||||||
|
|
||||||
for (int p = 0; p < world_size - 1; p++) {
|
for (int p = 0; p < params.world_size - 1; p++) {
|
||||||
/* ATTENTION Worker p has rank p+1 */
|
/* ATTENTION Worker p has rank p+1 */
|
||||||
/* Send termination message to worker */
|
/* Send termination message to worker */
|
||||||
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
|
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
|
||||||
@ -723,7 +726,7 @@ int main(int argc, char *argv[]) {
|
|||||||
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
||||||
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
|
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
|
dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
|
||||||
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
|
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
|
||||||
|
|
||||||
@ -770,7 +773,7 @@ int main(int argc, char *argv[]) {
|
|||||||
R["phreeqc_count"] = phreeqc_counts;
|
R["phreeqc_count"] = phreeqc_counts;
|
||||||
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
|
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
R["dht_hits"] = dht_hits;
|
R["dht_hits"] = dht_hits;
|
||||||
R.parseEvalQ("profiling$dht_hits <- dht_hits");
|
R.parseEvalQ("profiling$dht_hits <- dht_hits");
|
||||||
R["dht_miss"] = dht_miss;
|
R["dht_miss"] = dht_miss;
|
||||||
@ -786,47 +789,47 @@ int main(int argc, char *argv[]) {
|
|||||||
free(workerlist);
|
free(workerlist);
|
||||||
free(timings);
|
free(timings);
|
||||||
|
|
||||||
if (dht_enabled)
|
if (params.dht_enabled)
|
||||||
free(dht_perfs);
|
free(dht_perfs);
|
||||||
|
|
||||||
cout << "CPP: Done! Results are stored as R objects into <" << out_dir
|
cout << "CPP: Done! Results are stored as R objects into <"
|
||||||
<< "/timings.rds>" << endl;
|
<< params.out_dir << "/timings.rds>" << endl;
|
||||||
/*exporting results and profiling data*/
|
/*exporting results and profiling data*/
|
||||||
|
|
||||||
std::string r_vis_code;
|
std::string r_vis_code;
|
||||||
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
|
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
|
||||||
R.parseEval(r_vis_code);
|
R.parseEval(r_vis_code);
|
||||||
} else { /*This is executed by the workers*/
|
} else { /*This is executed by the workers*/
|
||||||
if (!dht_file.empty()) {
|
if (!params.dht_file.empty()) {
|
||||||
int res = file_to_table((char *)dht_file.c_str());
|
int res = file_to_table((char *)params.dht_file.c_str());
|
||||||
if (res != DHT_SUCCESS) {
|
if (res != DHT_SUCCESS) {
|
||||||
if (res == DHT_WRONG_FILE) {
|
if (res == DHT_WRONG_FILE) {
|
||||||
if (world_rank == 2)
|
if (params.world_rank == 2)
|
||||||
cerr << "CPP: Worker: Wrong File" << endl;
|
cerr << "CPP: Worker: Wrong File" << endl;
|
||||||
} else {
|
} else {
|
||||||
if (world_rank == 2)
|
if (params.world_rank == 2)
|
||||||
cerr << "CPP: Worker: Error in loading current state of DHT from "
|
cerr << "CPP: Worker: Error in loading current state of DHT from "
|
||||||
"file"
|
"file"
|
||||||
<< endl;
|
<< endl;
|
||||||
}
|
}
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
} else {
|
} else {
|
||||||
if (world_rank == 2)
|
if (params.world_rank == 2)
|
||||||
cout << "CPP: Worker: Successfully loaded state of DHT from file "
|
cout << "CPP: Worker: Successfully loaded state of DHT from file "
|
||||||
<< dht_file << endl;
|
<< params.dht_file << endl;
|
||||||
std::cout.flush();
|
std::cout.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
worker_function(R, grid);
|
worker_function(R, grid, ¶ms);
|
||||||
free(mpi_buffer_results);
|
free(mpi_buffer_results);
|
||||||
}
|
}
|
||||||
|
|
||||||
cout << "CPP: finished, cleanup of process " << world_rank << endl;
|
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
|
|
||||||
if (dht_strategy == 0) {
|
if (params.dht_strategy == 0) {
|
||||||
if (world_rank != 0) {
|
if (params.world_rank != 0) {
|
||||||
DHT_free(dht_object, NULL, NULL);
|
DHT_free(dht_object, NULL, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -837,7 +840,7 @@ int main(int argc, char *argv[]) {
|
|||||||
free(mpi_buffer);
|
free(mpi_buffer);
|
||||||
MPI_Finalize();
|
MPI_Finalize();
|
||||||
|
|
||||||
if (world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
cout << "CPP: done, bye!" << endl;
|
cout << "CPP: done, bye!" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
26
src/util/SimParams.h
Normal file
26
src/util/SimParams.h
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
#ifndef SIMPARAMS_H
|
||||||
|
#define SIMPARAMS_H
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int world_size;
|
||||||
|
int world_rank;
|
||||||
|
|
||||||
|
bool dht_enabled;
|
||||||
|
bool dht_log;
|
||||||
|
bool dt_differ;
|
||||||
|
int dht_snaps;
|
||||||
|
int dht_strategy;
|
||||||
|
unsigned int dht_size_per_process;
|
||||||
|
std::vector<int> dht_signif_vector;
|
||||||
|
std::vector<std::string> dht_prop_type_vector;
|
||||||
|
std::string dht_file;
|
||||||
|
|
||||||
|
unsigned int wp_size;
|
||||||
|
|
||||||
|
std::string out_dir;
|
||||||
|
} t_simparams;
|
||||||
|
|
||||||
|
#endif // SIMPARAMS_H
|
||||||
@ -10,7 +10,7 @@
|
|||||||
using namespace poet;
|
using namespace poet;
|
||||||
using namespace Rcpp;
|
using namespace Rcpp;
|
||||||
|
|
||||||
void worker_function(RRuntime &R, Grid &grid) {
|
void worker_function(RRuntime &R, Grid &grid, t_simparams *params) {
|
||||||
int world_rank;
|
int world_rank;
|
||||||
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
||||||
MPI_Status probe_status;
|
MPI_Status probe_status;
|
||||||
@ -41,9 +41,9 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
// dht_perf[2] -> collisions
|
// dht_perf[2] -> collisions
|
||||||
uint64_t dht_perf[3];
|
uint64_t dht_perf[3];
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
dht_flags.resize(work_package_size, true); // set size
|
dht_flags.resize(params->wp_size, true); // set size
|
||||||
dht_flags.assign(work_package_size,
|
dht_flags.assign(params->wp_size,
|
||||||
true); // assign all elements to true (default)
|
true); // assign all elements to true (default)
|
||||||
dht_hits = 0;
|
dht_hits = 0;
|
||||||
dht_miss = 0;
|
dht_miss = 0;
|
||||||
@ -140,7 +140,7 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
|
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
|
||||||
// R.setBufferDataFrame("skeleton");
|
// R.setBufferDataFrame("skeleton");
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
// DEBUG
|
// DEBUG
|
||||||
// cout << "RANK " << world_rank << " start checking DHT\n";
|
// cout << "RANK " << world_rank << " start checking DHT\n";
|
||||||
|
|
||||||
@ -152,7 +152,7 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dht_get_start = MPI_Wtime();
|
dht_get_start = MPI_Wtime();
|
||||||
check_dht(local_work_package_size, dht_flags, mpi_buffer, dt);
|
check_dht(local_work_package_size, dht_flags, mpi_buffer, dt, params);
|
||||||
dht_get_end = MPI_Wtime();
|
dht_get_end = MPI_Wtime();
|
||||||
|
|
||||||
// DEBUG
|
// DEBUG
|
||||||
@ -162,7 +162,6 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
// R.parseEvalQ("print(head(dht_flags))");
|
// R.parseEvalQ("print(head(dht_flags))");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* work */
|
/* work */
|
||||||
// R.from_C_domain(mpi_buffer);
|
// R.from_C_domain(mpi_buffer);
|
||||||
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
|
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
|
||||||
@ -173,9 +172,9 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
// R.parseEvalQ("print(head(work_package_full))");
|
// R.parseEvalQ("print(head(work_package_full))");
|
||||||
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
|
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
|
||||||
|
|
||||||
grid.importWP(mpi_buffer, work_package_size);
|
grid.importWP(mpi_buffer, params->wp_size);
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
|
R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
|
||||||
} else {
|
} else {
|
||||||
R.parseEvalQ("work_package <- work_package_full");
|
R.parseEvalQ("work_package <- work_package_full");
|
||||||
@ -215,7 +214,7 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
// cout << "Work-Package is empty, skipping phreeqc!" << endl;
|
// cout << "Work-Package is empty, skipping phreeqc!" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
R.parseEvalQ("result_full <- work_package_full");
|
R.parseEvalQ("result_full <- work_package_full");
|
||||||
if (nrows > 0)
|
if (nrows > 0)
|
||||||
R.parseEvalQ("result_full[dht_flags,] <- result");
|
R.parseEvalQ("result_full[dht_flags,] <- result");
|
||||||
@ -234,10 +233,10 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK,
|
MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK,
|
||||||
MPI_COMM_WORLD, &send_req);
|
MPI_COMM_WORLD, &send_req);
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
dht_fill_start = MPI_Wtime();
|
dht_fill_start = MPI_Wtime();
|
||||||
fill_dht(local_work_package_size, dht_flags, mpi_buffer,
|
fill_dht(local_work_package_size, dht_flags, mpi_buffer,
|
||||||
mpi_buffer_results, dt);
|
mpi_buffer_results, dt, params);
|
||||||
dht_fill_end = MPI_Wtime();
|
dht_fill_end = MPI_Wtime();
|
||||||
|
|
||||||
timing[1] += dht_get_end - dht_get_start;
|
timing[1] += dht_get_end - dht_get_start;
|
||||||
@ -248,7 +247,6 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
|
|
||||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||||
|
|
||||||
|
|
||||||
} else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */
|
} else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */
|
||||||
/* before death, submit profiling/timings to master*/
|
/* before death, submit profiling/timings to master*/
|
||||||
|
|
||||||
@ -261,7 +259,7 @@ void worker_function(RRuntime &R, Grid &grid) {
|
|||||||
MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
|
MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
|
||||||
MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
|
MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
|
||||||
|
|
||||||
if (dht_enabled) {
|
if (params->dht_enabled) {
|
||||||
// dht_perf
|
// dht_perf
|
||||||
dht_perf[0] = dht_hits;
|
dht_perf[0] = dht_hits;
|
||||||
dht_perf[1] = dht_miss;
|
dht_perf[1] = dht_miss;
|
||||||
|
|||||||
@ -1,11 +1,12 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include "util/SimParams.h"
|
||||||
#include "util/RRuntime.h"
|
#include "util/RRuntime.h"
|
||||||
#include "model/Grid.h"
|
#include "model/Grid.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace poet;
|
using namespace poet;
|
||||||
/*Functions*/
|
/*Functions*/
|
||||||
void worker_function(RRuntime &R, Grid &grid);
|
void worker_function(RRuntime &R, Grid &grid, t_simparams *params);
|
||||||
|
|
||||||
|
|
||||||
/*Globals*/
|
/*Globals*/
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user