diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f0f865eef..d1b71ab99 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS}) #define program libraries -add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp) +add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp util/Parser.cpp model/TransportSim.cpp) target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS}) target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto) diff --git a/src/kin.cpp b/src/kin.cpp index 673b5c223..f67186f4a 100644 --- a/src/kin.cpp +++ b/src/kin.cpp @@ -12,70 +12,72 @@ // #include "global_buffer.h" #include "model/ChemSim.h" #include "model/Grid.h" +#include "util/Parser.h" #include "util/RRuntime.h" #include "util/SimParams.h" +#include "model/TransportSim.h" // #include "worker.h" -#define DHT_SIZE_PER_PROCESS 1073741824 +//#define DHT_SIZE_PER_PROCESS 1073741824 using namespace std; using namespace poet; using namespace Rcpp; -double *mpi_buffer; -double *mpi_buffer_results; +// double *mpi_buffer; +// double *mpi_buffer_results; -uint32_t work_package_size; -#define WORK_PACKAGE_SIZE_DEFAULT 5 +// uint32_t work_package_size; +// #define WORK_PACKAGE_SIZE_DEFAULT 5 -bool store_result; +// bool store_result; -std::set paramList() { - std::set options; - // global - options.insert("work-package-size"); - // only DHT - options.insert("dht-signif"); - options.insert("dht-strategy"); - options.insert("dht-size"); - options.insert("dht-snaps"); - options.insert("dht-file"); +// std::set paramList() { +// std::set options; +// // global +// options.insert("work-package-size"); +// // only DHT +// options.insert("dht-signif"); +// options.insert("dht-strategy"); +// options.insert("dht-size"); +// options.insert("dht-snaps"); +// options.insert("dht-file"); - return options; -} +// return options; +// } -std::set flagList() { - std::set options; - // global - options.insert("ignore-result"); - // only DHT - options.insert("dht"); - options.insert("dht-log"); +// std::set flagList() { +// std::set options; +// // global +// options.insert("ignore-result"); +// // only DHT +// options.insert("dht"); +// options.insert("dht-log"); - return options; -} +// return options; +// } -std::list checkOptions(argh::parser cmdl) { - std::list retList; - std::set flist = flagList(); - std::set plist = paramList(); +// std::list checkOptions(argh::parser cmdl) { +// std::list retList; +// std::set flist = flagList(); +// std::set plist = paramList(); - for (auto &flag : cmdl.flags()) { - if (!(flist.find(flag) != flist.end())) retList.push_back(flag); - } +// for (auto &flag : cmdl.flags()) { +// if (!(flist.find(flag) != flist.end())) retList.push_back(flag); +// } - for (auto ¶m : cmdl.params()) { - if (!(plist.find(param.first) != plist.end())) - retList.push_back(param.first); - } +// for (auto ¶m : cmdl.params()) { +// if (!(plist.find(param.first) != plist.end())) +// retList.push_back(param.first); +// } - return retList; -} +// return retList; +// } -typedef struct { - char has_work; - double *send_addr; -} worker_struct; +// typedef struct { +// char has_work; +// double *send_addr; +// } worker_struct; int main(int argc, char *argv[]) { double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry, @@ -83,35 +85,20 @@ int main(int argc, char *argv[]) { double cummul_transport = 0.f; double cummul_chemistry = 0.f; - double cummul_workers = 0.f; - double cummul_chemistry_master = 0.f; - double cummul_master_seq = 0.f; - double cummul_master_seq_loop = 0.f; - double master_idle = 0.f; - - double master_send_a, master_send_b; - double cummul_master_send = 0.f; - double master_recv_a, master_recv_b; - double cummul_master_recv = 0.f; - - double sim_a_seq, sim_b_seq, sim_c_seq, sim_d_seq; - double idle_a, idle_b; - - double sim_c_chemistry, sim_d_chemistry; - double sim_e_chemistry, sim_f_chemistry; argh::parser cmdl(argv); int dht_significant_digits; // cout << "CPP: Start Init (MPI)" << endl; t_simparams params; + int world_size, world_rank; MPI_Init(&argc, &argv); - MPI_Comm_size(MPI_COMM_WORLD, &(params.world_size)); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); - MPI_Comm_rank(MPI_COMM_WORLD, &(params.world_rank)); + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); /*Create custom Communicator with all processes except 0 (the master) for DHT * storage*/ @@ -121,109 +108,90 @@ int main(int argc, char *argv[]) { int *process_ranks; // make a list of processes in the new communicator - process_ranks = (int *)malloc(params.world_size * sizeof(int)); - for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I; + process_ranks = (int *)malloc(world_size * sizeof(int)); + for (int I = 1; I < world_size; I++) process_ranks[I - 1] = I; // get the group under MPI_COMM_WORLD MPI_Comm_group(MPI_COMM_WORLD, &group_world); // create the new group - MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group); + MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group); // create the new communicator MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm); free(process_ranks); // cleanup // cout << "Done"; - if (cmdl[{"help", "h"}]) { - if (params.world_rank == 0) { - cout << "Todo" << endl - << "See README.md for further information." << endl; - } - MPI_Finalize(); - return EXIT_SUCCESS; - } + // if (cmdl[{"help", "h"}]) { + // if (params.world_rank == 0) { + // cout << "Todo" << endl + // << "See README.md for further information." << endl; + // } + // MPI_Finalize(); + // return EXIT_SUCCESS; + // } - /*INIT is now done separately in an R file provided here as argument!*/ - if (!cmdl(2)) { - if (params.world_rank == 0) { - cerr << "ERROR. Kin needs 2 positional arguments: " << endl - << "1) the R script defining your simulation and" << endl - << "2) the directory prefix where to save results and profiling" - << endl; - } - MPI_Finalize(); - return EXIT_FAILURE; - } + // /*INIT is now done separately in an R file provided here as argument!*/ + // if (!cmdl(2)) { + // if (params.world_rank == 0) { + // cerr << "ERROR. Kin needs 2 positional arguments: " << endl + // << "1) the R script defining your simulation and" << endl + // << "2) the directory prefix where to save results and profiling" + // << endl; + // } + // MPI_Finalize(); + // return EXIT_FAILURE; + // } - std::list optionsError = checkOptions(cmdl); - if (!optionsError.empty()) { - if (params.world_rank == 0) { - cerr << "Unrecognized option(s):\n" << endl; - for (auto option : optionsError) { - cerr << option << endl; - } - cerr << "\nMake sure to use available options. Exiting!" << endl; - } - MPI_Finalize(); - return EXIT_FAILURE; - } + // std::list optionsError = checkOptions(cmdl); + // if (!optionsError.empty()) { + // if (params.world_rank == 0) { + // cerr << "Unrecognized option(s):\n" << endl; + // for (auto option : optionsError) { + // cerr << option << endl; + // } + // cerr << "\nMake sure to use available options. Exiting!" << endl; + // } + // MPI_Finalize(); + // return EXIT_FAILURE; + // } - /*Parse DHT arguments*/ - params.dht_enabled = cmdl["dht"]; - // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; + // /*Parse DHT arguments*/ + // params.dht_enabled = cmdl["dht"]; + // // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; - if (params.dht_enabled) { - cmdl("dht-strategy", 0) >> params.dht_strategy; - // cout << "CPP: DHT strategy is " << dht_strategy << endl; + // if (params.dht_enabled) { + // cmdl("dht-strategy", 0) >> params.dht_strategy; + // // cout << "CPP: DHT strategy is " << dht_strategy << endl; - cmdl("dht-signif", 5) >> dht_significant_digits; - // cout << "CPP: DHT significant digits = " << dht_significant_digits << - // endl; + // cmdl("dht-signif", 5) >> dht_significant_digits; + // // cout << "CPP: DHT significant digits = " << dht_significant_digits << + // // endl; - params.dht_log = !(cmdl["dht-nolog"]); - // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON" - // : "OFF" ) << endl; + // params.dht_log = !(cmdl["dht-nolog"]); + // // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? + // "ON" + // // : "OFF" ) << endl; - cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process; - // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << - // endl; + // cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process; + // // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process + // << + // // endl; - cmdl("dht-snaps", 0) >> params.dht_snaps; + // cmdl("dht-snaps", 0) >> params.dht_snaps; - cmdl("dht-file") >> params.dht_file; - } + // cmdl("dht-file") >> params.dht_file; + // } - /*Parse work package size*/ - cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size; + // /*Parse work package size*/ + // cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size; - /*Parse output options*/ - store_result = !cmdl["ignore-result"]; + // /*Parse output options*/ + // store_result = !cmdl["ignore-result"]; - if (params.world_rank == 0) { - cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF") - << endl; - cout << "CPP: Work Package Size: " << params.wp_size << endl; - cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n'; - - if (params.dht_enabled) { - cout << "CPP: DHT strategy is " << params.dht_strategy << endl; - cout << "CPP: DHT key default digits (ignored if 'signif_vector' is " - "defined) = " - << dht_significant_digits << endl; - cout << "CPP: DHT logarithm before rounding: " - << (params.dht_log ? "ON" : "OFF") << endl; - cout << "CPP: DHT size per process (Byte) = " - << params.dht_size_per_process << endl; - cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl; - cout << "CPP: DHT load file is " << params.dht_file << endl; - } - } - - cout << "CPP: R Init (RInside) on process " << params.world_rank << endl; RRuntime R(argc, argv); - params.R = &R; + cout << "CPP: R Init (RInside) on process " << world_rank << endl; // if local_rank == 0 then master else worker - R["local_rank"] = params.world_rank; + // R["local_rank"] = params.world_rank; /*Loading Dependencies*/ std::string r_load_dependencies = @@ -233,37 +201,71 @@ int main(int argc, char *argv[]) { "source('parallel_r_library.R');"; R.parseEvalQ(r_load_dependencies); - std::string filesim; - cmdl(1) >> filesim; // <- first positional argument - R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim' - R.parseEvalQ( - "source(filesim)"); // eval the init string, ignoring any returns + Parser parser(argv, world_rank, world_size); + int pret = parser.parseCmdl(); - if (params.world_rank == - 0) { // only rank 0 initializes goes through the whole initialization - cmdl(2) >> params.out_dir; // <- second positional argument - R["fileout"] = - wrap(params.out_dir); // assign a char* (string) to 'fileout' + if (pret == PARSER_ERROR) { + MPI_Finalize(); + return EXIT_FAILURE; + } else if (pret == PARSER_HELP) { + MPI_Finalize(); + return EXIT_SUCCESS; + } + + parser.parseR(R); + params = parser.getParams(); + + // if (params.world_rank == 0) { + // cout << "CPP: Complete results storage is " << (params.store_result ? "ON" : + // "OFF") + // << endl; + // cout << "CPP: Work Package Size: " << params.wp_size << endl; + // cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n'; + + // if (params.dht_enabled) { + // cout << "CPP: DHT strategy is " << params.dht_strategy << endl; + // // cout << "CPP: DHT key default digits (ignored if 'signif_vector' is " + // // "defined) = " + // // << dht_significant_digits << endl; + // cout << "CPP: DHT logarithm before rounding: " + // << (params.dht_log ? "ON" : "OFF") << endl; + // cout << "CPP: DHT size per process (Byte) = " + // << params.dht_size_per_process << endl; + // cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl; + // cout << "CPP: DHT load file is " << params.dht_file << endl; + // } + // } + + // std::string filesim; + // cmdl(1) >> filesim; // <- first positional argument + // R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim' + // R.parseEvalQ( + // "source(filesim)"); // eval the init string, ignoring any returns + + // only rank 0 initializes goes through the whole initialization + if (world_rank == 0) { + // cmdl(2) >> params.out_dir; // <- second positional argument + // R["fileout"] = + // wrap(params.out_dir); // assign a char* (string) to 'fileout' // Note: R::sim_init() checks if the directory already exists, // if not it makes it // pass the boolean "store_result" to the R process - R["store_result"] = store_result; + // R["store_result"] = store_result; // get timestep vector from grid_init function ... std::string master_init_code = "mysetup <- master_init(setup=setup)"; R.parseEval(master_init_code); - params.dt_differ = - R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper + params.dt_differ = R.parseEval("mysetup$dt_differ"); MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } else { // workers will only read the setup DataFrame defined by input file R.parseEval("mysetup <- setup"); MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } - if (params.world_rank == 0) { + if (world_rank == 0) { cout << "CPP: R init done on process with rank " << params.world_rank << endl; } @@ -279,8 +281,8 @@ int main(int argc, char *argv[]) { // Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); /* Init Parallel helper functions */ - R["n_procs"] = params.world_size - 1; /* worker count */ - R["work_package_size"] = params.wp_size; + // R["n_procs"] = params.world_size - 1; /* worker count */ + // R["work_package_size"] = params.wp_size; // Removed additional field for ID in previous versions // if (params.world_rank == 0) { @@ -293,21 +295,24 @@ int main(int argc, char *argv[]) { // (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double)); // } - if (params.world_rank == 0) { - cout << "CPP: parallel init completed (buffers allocated)!" << endl; - } + // if (params.world_rank == 0) { + // cout << "CPP: parallel init completed (buffers allocated)!" << endl; + // } // MDL: pass to R the DHT stuff (basically, only for storing of // simulation parameters). These 2 variables are always defined: - R["dht_enabled"] = params.dht_enabled; - R["dht_log"] = params.dht_log; + // R["dht_enabled"] = params.dht_enabled; + // R["dht_log"] = params.dht_log; + + params.R = &R; if (params.dht_enabled) { // cout << "\nCreating DHT\n"; // determine size of dht entries // int dht_data_size = grid.getCols() * sizeof(double); // int dht_key_size = - // grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double)); + // grid.getCols() * sizeof(double) + (params.dt_differ * + // sizeof(double)); // // // determine bucket count for preset memory usage // // // bucket size is key + value + 1 byte for status @@ -320,8 +325,7 @@ int main(int argc, char *argv[]) { if (signif_vector_exists) { params.dht_signif_vector = as>(R["signif_vector"]); } else { - // params.dht_signif_vector.assign(dht_object->key_size / sizeof(double), - // dht_significant_digits); + params.dht_signif_vector.assign(grid.getCols(), dht_significant_digits); } /*Load property type vector from R setup file (or set default)*/ @@ -329,9 +333,7 @@ int main(int argc, char *argv[]) { if (prop_type_vector_exists) { params.dht_prop_type_vector = as>(R["prop_type"]); } else { - // params.dht_prop_type_vector.assign(dht_object->key_size / - // sizeof(double), - // "act"); + params.dht_prop_type_vector.assign(grid.getCols(), "act"); } if (params.world_rank == 0) { @@ -386,6 +388,7 @@ int main(int argc, char *argv[]) { if (params.world_rank == 0) { /* This is executed by the master */ ChemMaster master(¶ms, R, grid); + TransportSim trans(R); Rcpp::NumericVector master_send; Rcpp::NumericVector master_recv; @@ -412,22 +415,22 @@ int main(int argc, char *argv[]) { cout << "CPP: Calling Advection" << endl; - sim_b_transport = MPI_Wtime(); - R.parseEvalQ("mysetup <- master_advection(setup=mysetup)"); - sim_a_transport = MPI_Wtime(); + trans.runIteration(); + // sim_b_transport = MPI_Wtime(); + // R.parseEvalQ("mysetup <- master_advection(setup=mysetup)"); + // sim_a_transport = MPI_Wtime(); - if (iter == 1) master.prepareSimulation(); + // if (iter == 1) master.prepareSimulation(); cout << "CPP: Chemistry" << endl; /*Fallback for sequential execution*/ - sim_b_chemistry = MPI_Wtime(); if (params.world_size == 1) { master.runSeq(); } else { /*send work to workers*/ - master.runIteration(); + master.runPar(); } - sim_a_chemistry = MPI_Wtime(); + double master_seq_a = MPI_Wtime(); // MDL master_iteration_end just writes on disk state_T and // state_C after every iteration if the cmdline option @@ -435,8 +438,8 @@ int main(int argc, char *argv[]) { // store_result is TRUE) R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)"); - cummul_transport += sim_a_transport - sim_b_transport; - cummul_chemistry += sim_a_chemistry - sim_b_chemistry; + cummul_transport += trans.getTransportTime(); + cummul_chemistry += master.getChemistryTime(); cout << endl << "CPP: End of *coupling* iteration " << iter << "/" << maxiter @@ -457,8 +460,9 @@ int main(int argc, char *argv[]) { } // END SIMULATION LOOP + cout << "CPP: finished simulation loop" << endl; + sim_end = MPI_Wtime(); - master.finishSimulation(); Rcpp::NumericVector phreeqc_time; Rcpp::NumericVector dht_get_time; @@ -483,6 +487,9 @@ int main(int argc, char *argv[]) { double idle_worker_tmp; + cout << "CPP: Advising worker to stop work and collect data from them" + << endl; + for (int p = 0; p < params.world_size - 1; p++) { /* ATTENTION Worker p has rank p+1 */ /* Send termination message to worker */ @@ -570,8 +577,8 @@ int main(int argc, char *argv[]) { r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));"; R.parseEval(r_vis_code); } else { /*This is executed by the workers*/ - ChemWorker worker(¶ms, R, grid); - worker.prepareSimulation(dht_comm); + ChemWorker worker(¶ms, R, grid, dht_comm); + // worker.prepareSimulation(dht_comm); worker.loop(); } @@ -587,7 +594,7 @@ int main(int argc, char *argv[]) { // } // } - free(mpi_buffer); + // free(mpi_buffer); MPI_Finalize(); if (params.world_rank == 0) { diff --git a/src/model/ChemMaster.cpp b/src/model/ChemMaster.cpp index fee86113a..c4a96396b 100644 --- a/src/model/ChemMaster.cpp +++ b/src/model/ChemMaster.cpp @@ -15,9 +15,32 @@ ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_) : ChemSim(params, R_, grid_) { this->wp_size = params->wp_size; this->out_dir = params->out_dir; + + workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); + send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET, + sizeof(double)); + + R.parseEvalQ( + "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_C), " + "package_size=work_package_size)"); + + // we only sort once the vector + R.parseEvalQ("ordered_ids <- order(wp_ids)"); + R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); + R.parseEval("stat_wp_sizes(wp_sizes_vector)"); + wp_sizes_vector = as>(R["wp_sizes_vector"]); + + mpi_buffer = + (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); } -void ChemMaster::runIteration() { +ChemMaster::~ChemMaster() { + free(mpi_buffer); + free(workerlist); +} + +void ChemMaster::runPar() { + double chem_a, chem_b; double seq_a, seq_b, seq_c, seq_d; double worker_chemistry_a, worker_chemistry_b; double sim_e_chemistry, sim_f_chemistry; @@ -25,6 +48,8 @@ void ChemMaster::runIteration() { int free_workers; int i_pkgs; + chem_a = MPI_Wtime(); + seq_a = MPI_Wtime(); grid.shuffleAndExport(mpi_buffer); // retrieve data from R runtime @@ -68,6 +93,9 @@ void ChemMaster::runIteration() { chem_master += sim_f_chemistry - sim_e_chemistry; seq_d = MPI_Wtime(); seq_t += seq_d - seq_c; + + chem_b = MPI_Wtime(); + chem_t += chem_b - chem_a; } void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs, @@ -180,30 +208,6 @@ void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) { /* end visual progress */ } -void ChemMaster::prepareSimulation() { - workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); - send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET, - sizeof(double)); - - R.parseEvalQ( - "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), " - "package_size=work_package_size)"); - - // we only sort once the vector - R.parseEvalQ("ordered_ids <- order(wp_ids)"); - R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); - R.parseEval("stat_wp_sizes(wp_sizes_vector)"); - wp_sizes_vector = as>(R["wp_sizes_vector"]); - - mpi_buffer = - (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); -} - -void ChemMaster::finishSimulation() { - free(mpi_buffer); - free(workerlist); -} - double ChemMaster::getSendTime() { return this->send_t; } double ChemMaster::getRecvTime() { return this->recv_t; } diff --git a/src/model/ChemSim.cpp b/src/model/ChemSim.cpp index 62b783481..8a322ef4b 100644 --- a/src/model/ChemSim.cpp +++ b/src/model/ChemSim.cpp @@ -1,9 +1,12 @@ #include "ChemSim.h" + +#include +#include + +#include + #include "../util/RRuntime.h" #include "Grid.h" -#include -#include -#include using namespace Rcpp; using namespace poet; @@ -17,7 +20,16 @@ ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_) } void ChemSim::runSeq() { + double chem_a, chem_b; + + chem_a = MPI_Wtime(); + R.parseEvalQ( "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)"); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); + + chem_b = MPI_Wtime(); + chem_t += chem_b - chem_a; } + +double ChemSim::getChemistryTime() { return this->chem_t; } diff --git a/src/model/ChemSim.h b/src/model/ChemSim.h index 0d2c6a615..5b9b2f8ae 100644 --- a/src/model/ChemSim.h +++ b/src/model/ChemSim.h @@ -22,7 +22,9 @@ namespace poet { class ChemSim { public: ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_); + void runSeq(); + double getChemistryTime(); protected: double current_sim_time = 0; @@ -47,16 +49,17 @@ protected: worker_struct *workerlist; double *mpi_buffer; + + double chem_t = 0.f; }; class ChemMaster : public ChemSim { public: ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_); + ~ChemMaster(); - void prepareSimulation(); - void finishSimulation(); - - void runIteration(); + void runPar(); + void profile(); double getSendTime(); double getRecvTime(); @@ -83,9 +86,9 @@ private: class ChemWorker : public ChemSim { public: - ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_); + ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm); + ~ChemWorker(); - void prepareSimulation(MPI_Comm dht_comm); void loop(); private: diff --git a/src/model/ChemWorker.cpp b/src/model/ChemWorker.cpp index 91f8937ec..20746db3e 100644 --- a/src/model/ChemWorker.cpp +++ b/src/model/ChemWorker.cpp @@ -10,15 +10,15 @@ using namespace poet; using namespace std; using namespace Rcpp; -ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_) +ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_, + MPI_Comm dht_comm) : params(params_), ChemSim(params_, R_, grid_) { this->dt_differ = params->dt_differ; this->dht_enabled = params->dht_enabled; this->dht_size_per_process = params->dht_size_per_process; this->dht_file = params->dht_file; -} + this->dht_snaps = params->dht_snaps; -void ChemWorker::prepareSimulation(MPI_Comm dht_comm) { mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); mpi_buffer_results = @@ -59,6 +59,12 @@ void ChemWorker::prepareSimulation(MPI_Comm dht_comm) { timing[2] = 0.0; } +ChemWorker::~ChemWorker() { + free(mpi_buffer); + free(mpi_buffer_results); + delete dht; +} + void ChemWorker::loop() { MPI_Status probe_status; while (1) { @@ -69,11 +75,11 @@ void ChemWorker::loop() { if (probe_status.MPI_TAG == TAG_WORK) { idle_t += idle_b - idle_a; doWork(probe_status); + } else if (probe_status.MPI_TAG == TAG_DHT_ITER) { + postIter(); } else if (probe_status.MPI_TAG == TAG_FINISH) { finishWork(); break; - } else if (probe_status.MPI_TAG == TAG_DHT_ITER) { - postIter(); } } } @@ -265,11 +271,12 @@ void ChemWorker::postIter() { } void ChemWorker::writeFile() { + cout.flush(); std::stringstream out; out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht"; int res = dht->tableToFile(out.str().c_str()); if (res != DHT_SUCCESS && world_rank == 2) - cerr << "CPP: Worker: Errir in writing current state of DHT to file." + cerr << "CPP: Worker: Error in writing current state of DHT to file." << endl; else if (world_rank == 2) cout << "CPP: Worker: Successfully written DHT to file " << out.str() @@ -298,9 +305,6 @@ void ChemWorker::readFile() { } void ChemWorker::finishWork() { - if (dht_enabled && dht_snaps > 0) writeFile(); - - double dht_perf[3]; /* before death, submit profiling/timings to master*/ MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD, MPI_STATUS_IGNORE); @@ -313,6 +317,7 @@ void ChemWorker::finishWork() { if (dht_enabled) { // dht_perf + double dht_perf[3]; dht_perf[0] = dht->getHits(); dht_perf[1] = dht->getMisses(); dht_perf[2] = dht->getEvictions(); @@ -320,7 +325,9 @@ void ChemWorker::finishWork() { MPI_COMM_WORLD); } - free(mpi_buffer); - free(mpi_buffer_results); - delete dht; + if (dht_enabled && dht_snaps > 0) writeFile(); + + // free(mpi_buffer); + // free(mpi_buffer_results); + // delete dht; } diff --git a/src/model/TransportSim.cpp b/src/model/TransportSim.cpp new file mode 100644 index 000000000..566bef69e --- /dev/null +++ b/src/model/TransportSim.cpp @@ -0,0 +1,19 @@ +#include "TransportSim.h" + +#include + +using namespace poet; + +TransportSim::TransportSim(RRuntime &R_) : R(_R) {} + +void TransportSim::runIteration() { + double sim_a_transport, sim_b_transport; + + sim_b_transport = MPI_Wtime(); + R.parseEvalQ("mysetup <- master_advection(setup=mysetup)"); + sim_a_transport = MPI_Wtime(); + + transport_t += sim_a_transport - sim_b_transport; +} + +double TransportSim::getTransportTime { return this->transport_t; } diff --git a/src/model/TransportSim.h b/src/model/TransportSim.h new file mode 100644 index 000000000..7ceb44daf --- /dev/null +++ b/src/model/TransportSim.h @@ -0,0 +1,22 @@ +#ifndef TRANSPORT_SIM_H +#define TRANSPORT_SIM_H + +#include "../util/RRuntime.h" + +namespace poet { +class TransportSim { + public: + TransportSim(RRuntime &R); + + void runIteration(); + + double getTransportTime(); + + private: + RRuntime &R; + + double transport_t = 0.f; +}; +} // namespace poet + +#endif // TRANSPORT_SIM_H \ No newline at end of file diff --git a/src/util/Parser.cpp b/src/util/Parser.cpp new file mode 100644 index 000000000..51b3c73a5 --- /dev/null +++ b/src/util/Parser.cpp @@ -0,0 +1,144 @@ +#include "Parser.h" + +#include + +#include + +using namespace poet; +using namespace std; +using namespace Rcpp; + +Parser::Parser(char *argv[], int world_rank_, int world_size_) + : cmdl(argv), world_rank(world_rank_), world_size(world_size_) { + this->simparams.world_rank = world_rank_; + this->simparams.world_size = world_size; +} + +int Parser::parseCmdl() { + // if user asked for help + if (cmdl[{"help", "h"}]) { + if (world_rank == 0) { + cout << "Todo" << endl + << "See README.md for further information." << endl; + } + return PARSER_HELP; + } + // if positional arguments are missing + else if (!cmdl(2)) { + if (world_rank == 0) { + cerr << "ERROR. Kin needs 2 positional arguments: " << endl + << "1) the R script defining your simulation and" << endl + << "2) the directory prefix where to save results and profiling" + << endl; + } + return PARSER_ERROR; + } + + std::list optionsError = checkOptions(cmdl); + if (!optionsError.empty()) { + if (world_rank == 0) { + cerr << "Unrecognized option(s):\n" << endl; + for (auto option : optionsError) { + cerr << option << endl; + } + cerr << "\nMake sure to use available options. Exiting!" << endl; + } + return PARSER_ERROR; + } + + /*Parse DHT arguments*/ + simparams.dht_enabled = cmdl["dht"]; + // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; + + if (simparams.dht_enabled) { + cmdl("dht-strategy", 0) >> simparams.dht_strategy; + // cout << "CPP: DHT strategy is " << dht_strategy << endl; + + cmdl("dht-signif", 5) >> dht_significant_digits; + // cout << "CPP: DHT significant digits = " << dht_significant_digits << + // endl; + + simparams.dht_log = !(cmdl["dht-nolog"]); + // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON" + // : "OFF" ) << endl; + + cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> simparams.dht_size_per_process; + // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << + // endl; + + cmdl("dht-snaps", 0) >> simparams.dht_snaps; + + cmdl("dht-file") >> simparams.dht_file; + } + /*Parse work package size*/ + cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size; + + /*Parse output options*/ + simparams.store_result = !cmdl["ignore-result"]; + + if (world_rank == 0) { + cout << "CPP: Complete results storage is " + << (simparams.store_result ? "ON" : "OFF") << endl; + cout << "CPP: Work Package Size: " << simparams.wp_size << endl; + cout << "CPP: DHT is " << (simparams.dht_enabled ? "ON" : "OFF") << '\n'; + + if (simparams.dht_enabled) { + cout << "CPP: DHT strategy is " << simparams.dht_strategy << endl; + cout << "CPP: DHT key default digits (ignored if 'signif_vector' is " + "defined) = " + << dht_significant_digits << endl; + cout << "CPP: DHT logarithm before rounding: " + << (simparams.dht_log ? "ON" : "OFF") << endl; + cout << "CPP: DHT size per process (Byte) = " + << simparams.dht_size_per_process << endl; + cout << "CPP: DHT save snapshots is " << simparams.dht_snaps << endl; + cout << "CPP: DHT load file is " << simparams.dht_file << endl; + } + } + + cmdl(1) >> simparams.filesim; + cmdl(2) >> simparams.out_dir; + + return PARSER_OK; +} + +void Parser::parseR(RRuntime &R) { + // if local_rank == 0 then master else worker + R["local_rank"] = simparams.world_rank; + // assign a char* (string) to 'filesim' + R["filesim"] = wrap(simparams.filesim); + // assign a char* (string) to 'fileout' + R["fileout"] = wrap(simparams.out_dir); + // pass the boolean "store_result" to the R process + R["store_result"] = simparams.store_result; + // worker count + R["n_procs"] = simparams.world_size - 1; + // work package size + R["work_package_size"] = simparams.wp_size; + // dht enabled? + R["dht_enabled"] = simparams.dht_enabled; + // log before rounding? + R["dht_log"] = simparams.dht_log; + + // eval the init string, ignoring any returns + R.parseEvalQ("source(filesim)"); +} + +t_simparams Parser::getParams() { return this->simparams; } + +std::list Parser::checkOptions(argh::parser cmdl) { + std::list retList; + // std::set flist = flagList(); + // std::set plist = paramList(); + + for (auto &flag : cmdl.flags()) { + if (!(flaglist.find(flag) != flaglist.end())) retList.push_back(flag); + } + + for (auto ¶m : cmdl.params()) { + if (!(paramlist.find(param.first) != paramlist.end())) + retList.push_back(param.first); + } + + return retList; +} diff --git a/src/util/Parser.h b/src/util/Parser.h new file mode 100644 index 000000000..d79a7ad35 --- /dev/null +++ b/src/util/Parser.h @@ -0,0 +1,42 @@ +#ifndef PARSER_H +#define PARSER_H + +#include + +#include "../argh.h" +#include "RRuntime.h" +#include "SimParams.h" + +#define PARSER_OK 0 +#define PARSER_ERROR 1 +#define PARSER_HELP 2 + +#define DHT_SIZE_PER_PROCESS 1073741824 +#define WORK_PACKAGE_SIZE_DEFAULT 5 + +namespace poet { +class Parser { + public: + Parser(char *argv[], int world_rank, int world_size); + int parseCmdl(); + void parseR(RRuntime &R); + + t_simparams getParams(); + + private: + std::list checkOptions(argh::parser cmdl); + std::set flaglist{"ignore-result", "dht", "dht-nolog"}; + std::set paramlist{"work-package-size", "dht-signif", + "dht-strategy", "dht-size", + "dht-snaps", "dht-file"}; + argh::parser cmdl; + + t_simparams simparams; + + int world_rank; + int world_size; + + int dht_significant_digits; +}; +} // namespace poet +#endif // PARSER_H \ No newline at end of file diff --git a/src/util/SimParams.h b/src/util/SimParams.h index 948dee481..8c982f17a 100644 --- a/src/util/SimParams.h +++ b/src/util/SimParams.h @@ -21,8 +21,11 @@ typedef struct { unsigned int wp_size; + std::string filesim; std::string out_dir; + bool store_result; + void* R; void* grid; } t_simparams;