From a6e3aa4fe337e3629682569dc2624245318a6b8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbke?= Date: Thu, 17 Dec 2020 18:16:13 +0100 Subject: [PATCH] Declare new type SimParams as struct --- src/dht_wrapper.cpp | 220 ++++++++++++++++++++++--------------------- src/dht_wrapper.h | 57 ++++++----- src/kin.cpp | 215 +++++++++++++++++++++--------------------- src/util/SimParams.h | 26 +++++ src/worker.cpp | 26 +++-- src/worker.h | 3 +- 6 files changed, 294 insertions(+), 253 deletions(-) create mode 100644 src/util/SimParams.h diff --git a/src/dht_wrapper.cpp b/src/dht_wrapper.cpp index 832aea641..f03707c45 100644 --- a/src/dht_wrapper.cpp +++ b/src/dht_wrapper.cpp @@ -4,15 +4,15 @@ using namespace poet; /*init globals*/ -bool dht_enabled; -int dht_snaps; -int dht_strategy; -int dht_significant_digits; -std::string dht_file; -std::vector dht_significant_digits_vector; -std::vector prop_type_vector; -bool dht_logarithm; -uint64_t dht_size_per_process; +//bool dht_enabled; +//int dht_snaps; +//int dht_strategy; +//int dht_significant_digits; +//std::string dht_file; +//std::vector dht_significant_digits_vector; +//std::vector prop_type_vector; +//bool dht_logarithm; +//uint64_t dht_size_per_process; uint64_t dht_hits, dht_miss, dht_collision; RInside *R_DHT; std::vector dht_flags; @@ -20,26 +20,26 @@ DHT *dht_object; double *fuzzing_buffer; -bool dt_differ; +//bool dt_differ; /*functions*/ uint64_t get_md5(int key_size, void *key) { - MD5_CTX ctx; - unsigned char sum[MD5_DIGEST_LENGTH]; - uint64_t retval, *v1, *v2; + MD5_CTX ctx; + unsigned char sum[MD5_DIGEST_LENGTH]; + uint64_t retval, *v1, *v2; - MD5_Init(&ctx); - MD5_Update(&ctx, key, key_size); - MD5_Final(sum, &ctx); + MD5_Init(&ctx); + MD5_Update(&ctx, key, key_size); + MD5_Final(sum, &ctx); - v1 = (uint64_t *)&sum[0]; - v2 = (uint64_t *)&sum[8]; - retval = *v1 ^ *v2; + v1 = (uint64_t *)&sum[0]; + v2 = (uint64_t *)&sum[8]; + retval = *v1 ^ *v2; - return retval; + return retval; } -//double Round_off(double N, double n) { +// double Round_off(double N, double n) { // double result; // R["roundsig"] = n; // R["roundin"] = N; @@ -50,119 +50,127 @@ uint64_t get_md5(int key_size, void *key) { //} /* -* Stores fuzzed version of key in fuzzing_buffer -*/ -void fuzz_for_dht(int var_count, void *key, double dt) { - unsigned int i = 0; - //introduce fuzzing to allow more hits in DHT - for (i = 0; i < (unsigned int)var_count; i++) { - if (prop_type_vector[i] == "act") { - //with log10 - if (dht_logarithm) { - if (((double *)key)[i] < 0) - cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value at key!" << endl; - else if (((double *)key)[i] == 0) - fuzzing_buffer[i] = 0; - else - fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), dht_significant_digits_vector[i]); - } else { - //without log10 - fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]); - } - } else if (prop_type_vector[i] == "logact") { - fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]); - } else if (prop_type_vector[i] == "ignore") { - fuzzing_buffer[i] = 0; - } else { - cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong prop_type!" << endl; - } + * Stores fuzzed version of key in fuzzing_buffer + */ +void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params) { + unsigned int i = 0; + // introduce fuzzing to allow more hits in DHT + for (i = 0; i < (unsigned int)var_count; i++) { + if (params->dht_prop_type_vector[i] == "act") { + // with log10 + if (params->dht_log) { + if (((double *)key)[i] < 0) + cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Negative value in " + "key!" + << endl; + else if (((double *)key)[i] == 0) + fuzzing_buffer[i] = 0; + else + fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), + params->dht_signif_vector[i]); + } else { + // without log10 + fuzzing_buffer[i] = + ROUND((((double *)key)[i]), params->dht_signif_vector[i]); + } + } else if (params->dht_prop_type_vector[i] == "logact") { + fuzzing_buffer[i] = + ROUND((((double *)key)[i]), params->dht_signif_vector[i]); + } else if (params->dht_prop_type_vector[i] == "ignore") { + fuzzing_buffer[i] = 0; + } else { + cerr << "dht_wrapper.cpp::fuzz_for_dht(): Warning! Probably wrong " + "prop_type!" + << endl; } - if (dt_differ) - fuzzing_buffer[var_count] = dt; + } + if (params->dt_differ) + fuzzing_buffer[var_count] = dt; } -void check_dht(int length, std::vector &out_result_index, double *work_package, double dt) { - void *key; - int res; - int var_count = prop_type_vector.size(); +void check_dht(int length, std::vector &out_result_index, + double *work_package, double dt, t_simparams *params) { + void *key; + int res; + int var_count = params->dht_prop_type_vector.size(); + for (int i = 0; i < length; i++) { + key = (void *)&(work_package[i * var_count]); - for (int i = 0; i < length; i++) { - key = (void *)&(work_package[i * var_count]); + // fuzz data (round, logarithm etc.) + fuzz_for_dht(var_count, key, dt, params); - //fuzz data (round, logarithm etc.) - fuzz_for_dht(var_count, key, dt); + // overwrite input with data from DHT, IF value is found in DHT + res = DHT_read(dht_object, fuzzing_buffer, key); - //overwrite input with data from DHT, IF value is found in DHT - res = DHT_read(dht_object, fuzzing_buffer, key); - - if (res == DHT_SUCCESS) { - //flag that this line is replaced by DHT-value, do not simulate!! - out_result_index[i] = false; - dht_hits++; - } else if (res == DHT_READ_ERROR) { - //this line is untouched, simulation is needed - out_result_index[i] = true; - dht_miss++; - } else { - //MPI ERROR ... WHAT TO DO NOW? - //RUNNING CIRCLES WHILE SCREAMING - } + if (res == DHT_SUCCESS) { + // flag that this line is replaced by DHT-value, do not simulate!! + out_result_index[i] = false; + dht_hits++; + } else if (res == DHT_READ_ERROR) { + // this line is untouched, simulation is needed + out_result_index[i] = true; + dht_miss++; + } else { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING } + } } -void fill_dht(int length, std::vector &result_index, double *work_package, double *results, double dt) { - void *key; - void *data; - int res; - int var_count = prop_type_vector.size(); +void fill_dht(int length, std::vector &result_index, double *work_package, + double *results, double dt, t_simparams *params) { + void *key; + void *data; + int res; + int var_count = params->dht_prop_type_vector.size(); - for (int i = 0; i < length; i++) { - key = (void *)&(work_package[i * var_count]); - data = (void *)&(results[i * var_count]); + for (int i = 0; i < length; i++) { + key = (void *)&(work_package[i * var_count]); + data = (void *)&(results[i * var_count]); - if (result_index[i]) { - //If true -> was simulated, needs to be inserted into dht + if (result_index[i]) { + // If true -> was simulated, needs to be inserted into dht - //fuzz data (round, logarithm etc.) - fuzz_for_dht(var_count, key, dt); + // fuzz data (round, logarithm etc.) + fuzz_for_dht(var_count, key, dt, params); - res = DHT_write(dht_object, fuzzing_buffer, data); + res = DHT_write(dht_object, fuzzing_buffer, data); - if (res != DHT_SUCCESS) { - if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) { - dht_collision++; - } else { - //MPI ERROR ... WHAT TO DO NOW? - //RUNNING CIRCLES WHILE SCREAMING - } - } + if (res != DHT_SUCCESS) { + if (res == DHT_WRITE_SUCCESS_WITH_COLLISION) { + dht_collision++; + } else { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING } + } } + } } void print_statistics() { - int res; + int res; - res = DHT_print_statistics(dht_object); + res = DHT_print_statistics(dht_object); - if (res != DHT_SUCCESS) { - //MPI ERROR ... WHAT TO DO NOW? - //RUNNING CIRCLES WHILE SCREAMING - } + if (res != DHT_SUCCESS) { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING + } } int table_to_file(char *filename) { - int res = DHT_to_file(dht_object, filename); - return res; + int res = DHT_to_file(dht_object, filename); + return res; } int file_to_table(char *filename) { - int res = DHT_from_file(dht_object, filename); - if (res != DHT_SUCCESS) - return res; + int res = DHT_from_file(dht_object, filename); + if (res != DHT_SUCCESS) + return res; - DHT_print_statistics(dht_object); + DHT_print_statistics(dht_object); - return DHT_SUCCESS; + return DHT_SUCCESS; } diff --git a/src/dht_wrapper.h b/src/dht_wrapper.h index 2d16c06dc..f6e6cb7f5 100644 --- a/src/dht_wrapper.h +++ b/src/dht_wrapper.h @@ -1,50 +1,55 @@ #pragma once +#include "DHT.h" #include "util/RRuntime.h" +#include "util/SimParams.h" +#include #include #include -#include -#include "DHT.h" using namespace std; using namespace poet; /*Functions*/ -uint64_t get_md5(int key_size, void* key); -void fuzz_for_dht(int var_count, void *key, double dt); -void check_dht(int length, std::vector &out_result_index, double *work_package, double dt); -void fill_dht(int length, std::vector &result_index, double *work_package, double *results, double dt); +uint64_t get_md5(int key_size, void *key); +void fuzz_for_dht(int var_count, void *key, double dt, t_simparams *params); +void check_dht(int length, std::vector &out_result_index, + double *work_package, double dt, t_simparams *params); +void fill_dht(int length, std::vector &result_index, double *work_package, + double *results, double dt, t_simparams *params); void print_statistics(); -int table_to_file(char* filename); -int file_to_table(char* filename); +int table_to_file(char *filename); +int file_to_table(char *filename); /*globals*/ -extern bool dht_enabled; -extern int dht_snaps; -extern std::string dht_file; -extern bool dt_differ; +//extern bool dht_enabled; +//extern int dht_snaps; +//extern std::string dht_file; +//extern bool dt_differ; -//Default DHT Size per process in Byte (defaults to 1 GiB) +// Default DHT Size per process in Byte (defaults to 1 GiB) #define DHT_SIZE_PER_PROCESS 1073741824 -//sets default dht access and distribution strategy +// sets default dht access and distribution strategy #define DHT_STRATEGY 0 // 0 -> DHT is on workers, access from workers only -// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED YET! +// 1 -> DHT is on workers + master, access from master only !NOT IMPLEMENTED +// YET! -#define ROUND(value,signif) (((int) (pow(10.0, (double) signif) * value)) * pow(10.0, (double) -signif)) +#define ROUND(value, signif) \ + (((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif)) -extern int dht_strategy; -extern int dht_significant_digits; -extern std::vector dht_significant_digits_vector; -extern std::vector prop_type_vector; -extern bool dht_logarithm; -extern uint64_t dht_size_per_process; +//extern int dht_strategy; +//extern int dht_significant_digits; +//extern std::vector dht_significant_digits_vector; +//extern std::vector prop_type_vector; +//extern bool dht_logarithm; +//extern uint64_t dht_size_per_process; -//global DHT object, can be NULL if not initialized, check strategy -extern DHT* dht_object; +// global DHT object, can be NULL if not initialized, check strategy +extern DHT *dht_object; -//DHT Performance counter +// DHT Performance counter extern uint64_t dht_hits, dht_miss, dht_collision; -extern double* fuzzing_buffer; +extern double *fuzzing_buffer; extern std::vector dht_flags; diff --git a/src/kin.cpp b/src/kin.cpp index 9b2e15ada..cdddb1ad0 100644 --- a/src/kin.cpp +++ b/src/kin.cpp @@ -11,9 +11,10 @@ #include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced #include "dht_wrapper.h" #include "global_buffer.h" -#include "util/RRuntime.h" -#include "worker.h" #include "model/Grid.h" +#include "util/RRuntime.h" +#include "util/SimParams.h" +#include "worker.h" using namespace std; using namespace poet; @@ -100,41 +101,40 @@ int main(int argc, char *argv[]) { double sim_e_chemistry, sim_f_chemistry; argh::parser cmdl(argv); - + int dht_significant_digits; // cout << "CPP: Start Init (MPI)" << endl; + t_simparams params; + MPI_Init(&argc, &argv); - int world_size; - MPI_Comm_size(MPI_COMM_WORLD, &world_size); + MPI_Comm_size(MPI_COMM_WORLD, &(params.world_size)); - int world_rank; - MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + MPI_Comm_rank(MPI_COMM_WORLD, &(params.world_rank)); /*Create custom Communicator with all processes except 0 (the master) for DHT * storage*/ // only needed if strategy == 0, but done anyway - MPI_Group group_world; - MPI_Group dht_group; + MPI_Group dht_group, group_world; MPI_Comm dht_comm; int *process_ranks; // make a list of processes in the new communicator - process_ranks = (int *)malloc(world_size * sizeof(int)); - for (int I = 1; I < world_size; I++) + process_ranks = (int *)malloc(params.world_size * sizeof(int)); + for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I; // get the group under MPI_COMM_WORLD MPI_Comm_group(MPI_COMM_WORLD, &group_world); // create the new group - MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group); + MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group); // create the new communicator MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm); free(process_ranks); // cleanup // cout << "Done"; if (cmdl[{"help", "h"}]) { - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "Todo" << endl << "See README.md for further information." << endl; } @@ -144,7 +144,7 @@ int main(int argc, char *argv[]) { /*INIT is now done separately in an R file provided here as argument!*/ if (!cmdl(2)) { - if (world_rank == 0) { + if (params.world_rank == 0) { cerr << "ERROR. Kin needs 2 positional arguments: " << endl << "1) the R script defining your simulation and" << endl << "2) the directory prefix where to save results and profiling" @@ -156,7 +156,7 @@ int main(int argc, char *argv[]) { std::list optionsError = checkOptions(cmdl); if (!optionsError.empty()) { - if (world_rank == 0) { + if (params.world_rank == 0) { cerr << "Unrecognized option(s):\n" << endl; for (auto option : optionsError) { cerr << option << endl; @@ -168,61 +168,61 @@ int main(int argc, char *argv[]) { } /*Parse DHT arguments*/ - dht_enabled = cmdl["dht"]; + params.dht_enabled = cmdl["dht"]; // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; - if (dht_enabled) { - cmdl("dht-strategy", 0) >> dht_strategy; + if (params.dht_enabled) { + cmdl("dht-strategy", 0) >> params.dht_strategy; // cout << "CPP: DHT strategy is " << dht_strategy << endl; cmdl("dht-signif", 5) >> dht_significant_digits; // cout << "CPP: DHT significant digits = " << dht_significant_digits << // endl; - dht_logarithm = cmdl["dht-log"]; + params.dht_log = cmdl["dht-log"]; // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON" // : "OFF" ) << endl; - cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> dht_size_per_process; + cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process; // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << // endl; - cmdl("dht-snaps", 0) >> dht_snaps; + cmdl("dht-snaps", 0) >> params.dht_snaps; - cmdl("dht-file") >> dht_file; + cmdl("dht-file") >> params.dht_file; } /*Parse work package size*/ - cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> work_package_size; + cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size; /*Parse output options*/ store_result = !cmdl["ignore-result"]; - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF") << endl; - cout << "CPP: Work Package Size: " << work_package_size << endl; - cout << "CPP: DHT is " << (dht_enabled ? "ON" : "OFF") << '\n'; + cout << "CPP: Work Package Size: " << params.wp_size << endl; + cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n'; - if (dht_enabled) { - cout << "CPP: DHT strategy is " << dht_strategy << endl; + if (params.dht_enabled) { + cout << "CPP: DHT strategy is " << params.dht_strategy << endl; cout << "CPP: DHT key default digits (ignored if 'signif_vector' is " "defined) = " << dht_significant_digits << endl; cout << "CPP: DHT logarithm before rounding: " - << (dht_logarithm ? "ON" : "OFF") << endl; - cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process - << endl; - cout << "CPP: DHT save snapshots is " << dht_snaps << endl; - cout << "CPP: DHT load file is " << dht_file << endl; + << (params.dht_log ? "ON" : "OFF") << endl; + cout << "CPP: DHT size per process (Byte) = " + << params.dht_size_per_process << endl; + cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl; + cout << "CPP: DHT load file is " << params.dht_file << endl; } } - cout << "CPP: R Init (RInside) on process " << world_rank << endl; + cout << "CPP: R Init (RInside) on process " << params.world_rank << endl; RRuntime R(argc, argv); // if local_rank == 0 then master else worker - R["local_rank"] = world_rank; + R["local_rank"] = params.world_rank; /*Loading Dependencies*/ std::string r_load_dependencies = "suppressMessages(library(Rmufits));" @@ -236,11 +236,10 @@ int main(int argc, char *argv[]) { R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim' R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns - std::string out_dir; - if (world_rank == + if (params.world_rank == 0) { // only rank 0 initializes goes through the whole initialization - cmdl(2) >> out_dir; // <- second positional argument - R["fileout"] = wrap(out_dir); // assign a char* (string) to 'fileout' + cmdl(2) >> params.out_dir; // <- second positional argument + R["fileout"] = wrap(params.out_dir); // assign a char* (string) to 'fileout' // Note: R::sim_init() checks if the directory already exists, // if not it makes it @@ -252,15 +251,17 @@ int main(int argc, char *argv[]) { std::string master_init_code = "mysetup <- master_init(setup=setup)"; R.parseEval(master_init_code); - dt_differ = R.parseEval("mysetup$dt_differ"); - MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); + params.dt_differ = + R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper + MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } else { // workers will only read the setup DataFrame defined by input file R.parseEval("mysetup <- setup"); - MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); + MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } - if (world_rank == 0) { - cout << "CPP: R init done on process with rank " << world_rank << endl; + if (params.world_rank == 0) { + cout << "CPP: R init done on process with rank " << params.world_rank + << endl; } // initialize chemistry on all processes @@ -270,63 +271,64 @@ int main(int argc, char *argv[]) { Grid grid(R); grid.init(); /* Retrieve state_C from R context for MPI buffer generation */ - //Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); + // Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); /* Init Parallel helper functions */ - R["n_procs"] = world_size - 1; /* worker count */ - R["work_package_size"] = work_package_size; + R["n_procs"] = params.world_size - 1; /* worker count */ + R["work_package_size"] = params.wp_size; // Removed additional field for ID in previous versions - if (world_rank == 0) { + if (params.world_rank == 0) { mpi_buffer = (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); } else { mpi_buffer = (double *)calloc( - (work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); + (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); mpi_buffer_results = - (double *)calloc(work_package_size * (grid.getCols()), sizeof(double)); + (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double)); } - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "CPP: parallel init completed (buffers allocated)!" << endl; } // MDL: pass to R the DHT stuff (basically, only for storing of // simulation parameters). These 2 variables are always defined: - R["dht_enabled"] = dht_enabled; - R["dht_log"] = dht_logarithm; + R["dht_enabled"] = params.dht_enabled; + R["dht_log"] = params.dht_log; - if (dht_enabled) { + if (params.dht_enabled) { // cout << "\nCreating DHT\n"; // determine size of dht entries int dht_data_size = grid.getCols() * sizeof(double); int dht_key_size = - grid.getCols() * sizeof(double) + (dt_differ * sizeof(double)); + grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double)); // determine bucket count for preset memory usage // bucket size is key + value + 1 byte for status int dht_buckets_per_process = - dht_size_per_process / (1 + dht_data_size + dht_key_size); + params.dht_size_per_process / (1 + dht_data_size + dht_key_size); // MDL : following code moved here from worker.cpp /*Load significance vector from R setup file (or set default)*/ bool signif_vector_exists = R.parseEval("exists('signif_vector')"); if (signif_vector_exists) { - dht_significant_digits_vector = as>(R["signif_vector"]); + params.dht_signif_vector = as>(R["signif_vector"]); } else { - dht_significant_digits_vector.assign( - dht_object->key_size / sizeof(double), dht_significant_digits); + params.dht_signif_vector.assign(dht_object->key_size / sizeof(double), + dht_significant_digits); } /*Load property type vector from R setup file (or set default)*/ bool prop_type_vector_exists = R.parseEval("exists('prop_type')"); if (prop_type_vector_exists) { - prop_type_vector = as>(R["prop_type"]); + params.dht_prop_type_vector = as>(R["prop_type"]); } else { - prop_type_vector.assign(dht_object->key_size / sizeof(double), "act"); + params.dht_prop_type_vector.assign(dht_object->key_size / sizeof(double), + "act"); } - if (world_rank == 0) { + if (params.world_rank == 0) { // print only on master, values are equal on all workes cout << "CPP: dht_data_size: " << dht_data_size << "\n"; cout << "CPP: dht_key_size: " << dht_key_size << "\n"; @@ -345,12 +347,12 @@ int main(int argc, char *argv[]) { // MDL: pass to R the DHT stuff. These variables exist // only if dht_enabled is true - R["dht_final_signif"] = dht_significant_digits_vector; - R["dht_final_proptype"] = prop_type_vector; + R["dht_final_signif"] = params.dht_signif_vector; + R["dht_final_proptype"] = params.dht_prop_type_vector; } - if (dht_strategy == 0) { - if (world_rank != 0) { + if (params.dht_strategy == 0) { + if (params.world_rank != 0) { dht_object = DHT_create(dht_comm, dht_buckets_per_process, dht_data_size, dht_key_size, get_md5); @@ -362,20 +364,20 @@ int main(int argc, char *argv[]) { dht_data_size, dht_key_size, get_md5); } - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "CPP: DHT successfully created!" << endl; } } // MDL: store all parameters - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "CPP: Calling R Function to store calling parameters" << endl; R.parseEvalQ("StoreSetup(setup=mysetup)"); } MPI_Barrier(MPI_COMM_WORLD); - if (world_rank == 0) { /* This is executed by the master */ + if (params.world_rank == 0) { /* This is executed by the master */ Rcpp::NumericVector master_send; Rcpp::NumericVector master_recv; @@ -383,7 +385,7 @@ int main(int argc, char *argv[]) { sim_a_seq = MPI_Wtime(); worker_struct *workerlist = - (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); + (worker_struct *)calloc(params.world_size - 1, sizeof(worker_struct)); int need_to_receive; MPI_Status probe_status; double *timings; @@ -394,7 +396,7 @@ int main(int argc, char *argv[]) { // a temporary send buffer double *send_buffer; send_buffer = (double *)calloc( - (work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); + (params.wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); // helper variables int iteration; @@ -440,7 +442,7 @@ int main(int argc, char *argv[]) { /*Fallback for sequential execution*/ sim_b_chemistry = MPI_Wtime(); - if (world_size == 1) { + if (params.world_size == 1) { // MDL : the transformation of values into pH and pe // takes now place in master_advection() so the // following line is unneeded @@ -473,7 +475,7 @@ int main(int argc, char *argv[]) { // R.parseEval("tmp <- // shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)"); // Rcpp::DataFrame chemistry_data = R.parseEval("tmp"); - + // convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data); // cout << "CPP: shuffle_field() done" << endl; grid.shuffleAndExport(mpi_buffer); @@ -482,7 +484,7 @@ int main(int argc, char *argv[]) { int pkg_to_send = n_wp; int pkg_to_recv = n_wp; size_t colCount = grid.getCols(); - int free_workers = world_size - 1; + int free_workers = params.world_size - 1; double *work_pointer = mpi_buffer; sim_c_chemistry = MPI_Wtime(); @@ -524,7 +526,7 @@ int main(int argc, char *argv[]) { if (pkg_to_send > 0) { master_send_a = MPI_Wtime(); /*search for free workers and send work*/ - for (int p = 0; p < world_size - 1; p++) { + for (int p = 0; p < params.world_size - 1; p++) { if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { @@ -614,12 +616,13 @@ int main(int argc, char *argv[]) { cummul_workers += sim_d_chemistry - sim_c_chemistry; // convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data); - //R.from_C_domain(mpi_buffer); + // R.from_C_domain(mpi_buffer); - //R["chemistry_data"] = R.getBufferDataFrame(); + // R["chemistry_data"] = R.getBufferDataFrame(); ///* unshuffle results */ - //R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)"); + // R.parseEval("result <- unshuffle_field(chemistry_data, + // ordered_ids)"); grid.importAndUnshuffle(mpi_buffer); /* do master stuff */ @@ -644,18 +647,18 @@ int main(int argc, char *argv[]) { << endl << endl; - if (dht_enabled) { - for (int i = 1; i < world_size; i++) { + if (params.dht_enabled) { + for (int i = 1; i < params.world_size; i++) { MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD); } MPI_Barrier(MPI_COMM_WORLD); - if (dht_snaps == 2) { + if (params.dht_snaps == 2) { std::stringstream outfile; - outfile << out_dir << "/iter_" << std::setfill('0') << std::setw(3) - << iter << ".dht"; - for (int i = 1; i < world_size; i++) { + outfile << params.out_dir << "/iter_" << std::setfill('0') + << std::setw(3) << iter << ".dht"; + for (int i = 1; i < params.world_size; i++) { MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i, TAG_DHT_STORE, MPI_COMM_WORLD); } @@ -675,11 +678,11 @@ int main(int argc, char *argv[]) { sim_end = MPI_Wtime(); - if (dht_enabled && dht_snaps > 0) { + if (params.dht_enabled && params.dht_snaps > 0) { cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl; std::string outfile; - outfile = out_dir + ".dht"; - for (int i = 1; i < world_size; i++) { + outfile = params.out_dir + ".dht"; + for (int i = 1; i < params.world_size; i++) { MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE, MPI_COMM_WORLD); } @@ -697,7 +700,7 @@ int main(int argc, char *argv[]) { timings = (double *)calloc(3, sizeof(double)); - if (dht_enabled) { + if (params.dht_enabled) { dht_hits = 0; dht_miss = 0; dht_collision = 0; @@ -706,7 +709,7 @@ int main(int argc, char *argv[]) { double idle_worker_tmp; - for (int p = 0; p < world_size - 1; p++) { + for (int p = 0; p < params.world_size - 1; p++) { /* ATTENTION Worker p has rank p+1 */ /* Send termination message to worker */ MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD); @@ -723,7 +726,7 @@ int main(int argc, char *argv[]) { MPI_COMM_WORLD, MPI_STATUS_IGNORE); idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1)); - if (dht_enabled) { + if (params.dht_enabled) { dht_get_time.push_back(timings[1], "w" + to_string(p + 1)); dht_fill_time.push_back(timings[2], "w" + to_string(p + 1)); @@ -770,7 +773,7 @@ int main(int argc, char *argv[]) { R["phreeqc_count"] = phreeqc_counts; R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count"); - if (dht_enabled) { + if (params.dht_enabled) { R["dht_hits"] = dht_hits; R.parseEvalQ("profiling$dht_hits <- dht_hits"); R["dht_miss"] = dht_miss; @@ -786,47 +789,47 @@ int main(int argc, char *argv[]) { free(workerlist); free(timings); - if (dht_enabled) + if (params.dht_enabled) free(dht_perfs); - cout << "CPP: Done! Results are stored as R objects into <" << out_dir - << "/timings.rds>" << endl; + cout << "CPP: Done! Results are stored as R objects into <" + << params.out_dir << "/timings.rds>" << endl; /*exporting results and profiling data*/ std::string r_vis_code; r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));"; R.parseEval(r_vis_code); } else { /*This is executed by the workers*/ - if (!dht_file.empty()) { - int res = file_to_table((char *)dht_file.c_str()); + if (!params.dht_file.empty()) { + int res = file_to_table((char *)params.dht_file.c_str()); if (res != DHT_SUCCESS) { if (res == DHT_WRONG_FILE) { - if (world_rank == 2) + if (params.world_rank == 2) cerr << "CPP: Worker: Wrong File" << endl; } else { - if (world_rank == 2) + if (params.world_rank == 2) cerr << "CPP: Worker: Error in loading current state of DHT from " "file" << endl; } return EXIT_FAILURE; } else { - if (world_rank == 2) + if (params.world_rank == 2) cout << "CPP: Worker: Successfully loaded state of DHT from file " - << dht_file << endl; + << params.dht_file << endl; std::cout.flush(); } } - worker_function(R, grid); + worker_function(R, grid, ¶ms); free(mpi_buffer_results); } - cout << "CPP: finished, cleanup of process " << world_rank << endl; + cout << "CPP: finished, cleanup of process " << params.world_rank << endl; - if (dht_enabled) { + if (params.dht_enabled) { - if (dht_strategy == 0) { - if (world_rank != 0) { + if (params.dht_strategy == 0) { + if (params.world_rank != 0) { DHT_free(dht_object, NULL, NULL); } } else { @@ -837,7 +840,7 @@ int main(int argc, char *argv[]) { free(mpi_buffer); MPI_Finalize(); - if (world_rank == 0) { + if (params.world_rank == 0) { cout << "CPP: done, bye!" << endl; } diff --git a/src/util/SimParams.h b/src/util/SimParams.h new file mode 100644 index 000000000..f54d864cd --- /dev/null +++ b/src/util/SimParams.h @@ -0,0 +1,26 @@ +#ifndef SIMPARAMS_H +#define SIMPARAMS_H + +#include +#include + +typedef struct { + int world_size; + int world_rank; + + bool dht_enabled; + bool dht_log; + bool dt_differ; + int dht_snaps; + int dht_strategy; + unsigned int dht_size_per_process; + std::vector dht_signif_vector; + std::vector dht_prop_type_vector; + std::string dht_file; + + unsigned int wp_size; + + std::string out_dir; +} t_simparams; + +#endif // SIMPARAMS_H diff --git a/src/worker.cpp b/src/worker.cpp index d53fc5d5d..0bc36dfb6 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -10,7 +10,7 @@ using namespace poet; using namespace Rcpp; -void worker_function(RRuntime &R, Grid &grid) { +void worker_function(RRuntime &R, Grid &grid, t_simparams *params) { int world_rank; MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Status probe_status; @@ -41,9 +41,9 @@ void worker_function(RRuntime &R, Grid &grid) { // dht_perf[2] -> collisions uint64_t dht_perf[3]; - if (dht_enabled) { - dht_flags.resize(work_package_size, true); // set size - dht_flags.assign(work_package_size, + if (params->dht_enabled) { + dht_flags.resize(params->wp_size, true); // set size + dht_flags.assign(params->wp_size, true); // assign all elements to true (default) dht_hits = 0; dht_miss = 0; @@ -140,7 +140,7 @@ void worker_function(RRuntime &R, Grid &grid) { ////Rcpp::DataFrame buffer = R.parseEval("tmp2"); // R.setBufferDataFrame("skeleton"); - if (dht_enabled) { + if (params->dht_enabled) { // DEBUG // cout << "RANK " << world_rank << " start checking DHT\n"; @@ -152,7 +152,7 @@ void worker_function(RRuntime &R, Grid &grid) { } dht_get_start = MPI_Wtime(); - check_dht(local_work_package_size, dht_flags, mpi_buffer, dt); + check_dht(local_work_package_size, dht_flags, mpi_buffer, dt, params); dht_get_end = MPI_Wtime(); // DEBUG @@ -162,7 +162,6 @@ void worker_function(RRuntime &R, Grid &grid) { // R.parseEvalQ("print(head(dht_flags))"); } - /* work */ // R.from_C_domain(mpi_buffer); ////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer); @@ -173,9 +172,9 @@ void worker_function(RRuntime &R, Grid &grid) { // R.parseEvalQ("print(head(work_package_full))"); // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )"); - grid.importWP(mpi_buffer, work_package_size); + grid.importWP(mpi_buffer, params->wp_size); - if (dht_enabled) { + if (params->dht_enabled) { R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); } else { R.parseEvalQ("work_package <- work_package_full"); @@ -215,7 +214,7 @@ void worker_function(RRuntime &R, Grid &grid) { // cout << "Work-Package is empty, skipping phreeqc!" << endl; } - if (dht_enabled) { + if (params->dht_enabled) { R.parseEvalQ("result_full <- work_package_full"); if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result"); @@ -234,10 +233,10 @@ void worker_function(RRuntime &R, Grid &grid) { MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD, &send_req); - if (dht_enabled) { + if (params->dht_enabled) { dht_fill_start = MPI_Wtime(); fill_dht(local_work_package_size, dht_flags, mpi_buffer, - mpi_buffer_results, dt); + mpi_buffer_results, dt, params); dht_fill_end = MPI_Wtime(); timing[1] += dht_get_end - dht_get_start; @@ -248,7 +247,6 @@ void worker_function(RRuntime &R, Grid &grid) { MPI_Wait(&send_req, MPI_STATUS_IGNORE); - } else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */ /* before death, submit profiling/timings to master*/ @@ -261,7 +259,7 @@ void worker_function(RRuntime &R, Grid &grid) { MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD); MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD); - if (dht_enabled) { + if (params->dht_enabled) { // dht_perf dht_perf[0] = dht_hits; dht_perf[1] = dht_miss; diff --git a/src/worker.h b/src/worker.h index 349333811..435cfef7d 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,11 +1,12 @@ #pragma once +#include "util/SimParams.h" #include "util/RRuntime.h" #include "model/Grid.h" using namespace std; using namespace poet; /*Functions*/ -void worker_function(RRuntime &R, Grid &grid); +void worker_function(RRuntime &R, Grid &grid, t_simparams *params); /*Globals*/