Introducing TransportSim and Parser

This commit is contained in:
Max Lübke 2021-01-12 14:48:25 +01:00
parent 19b4b98b44
commit 5fed0a4a3e
11 changed files with 491 additions and 228 deletions

View File

@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})
#define program libraries
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp)
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp util/Parser.cpp model/TransportSim.cpp)
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)

View File

@ -12,70 +12,72 @@
// #include "global_buffer.h"
#include "model/ChemSim.h"
#include "model/Grid.h"
#include "util/Parser.h"
#include "util/RRuntime.h"
#include "util/SimParams.h"
#include "model/TransportSim.h"
// #include "worker.h"
#define DHT_SIZE_PER_PROCESS 1073741824
//#define DHT_SIZE_PER_PROCESS 1073741824
using namespace std;
using namespace poet;
using namespace Rcpp;
double *mpi_buffer;
double *mpi_buffer_results;
// double *mpi_buffer;
// double *mpi_buffer_results;
uint32_t work_package_size;
#define WORK_PACKAGE_SIZE_DEFAULT 5
// uint32_t work_package_size;
// #define WORK_PACKAGE_SIZE_DEFAULT 5
bool store_result;
// bool store_result;
std::set<std::string> paramList() {
std::set<std::string> options;
// global
options.insert("work-package-size");
// only DHT
options.insert("dht-signif");
options.insert("dht-strategy");
options.insert("dht-size");
options.insert("dht-snaps");
options.insert("dht-file");
// std::set<std::string> paramList() {
// std::set<std::string> options;
// // global
// options.insert("work-package-size");
// // only DHT
// options.insert("dht-signif");
// options.insert("dht-strategy");
// options.insert("dht-size");
// options.insert("dht-snaps");
// options.insert("dht-file");
return options;
}
// return options;
// }
std::set<std::string> flagList() {
std::set<std::string> options;
// global
options.insert("ignore-result");
// only DHT
options.insert("dht");
options.insert("dht-log");
// std::set<std::string> flagList() {
// std::set<std::string> options;
// // global
// options.insert("ignore-result");
// // only DHT
// options.insert("dht");
// options.insert("dht-log");
return options;
}
// return options;
// }
std::list<std::string> checkOptions(argh::parser cmdl) {
std::list<std::string> retList;
std::set<std::string> flist = flagList();
std::set<std::string> plist = paramList();
// std::list<std::string> checkOptions(argh::parser cmdl) {
// std::list<std::string> retList;
// std::set<std::string> flist = flagList();
// std::set<std::string> plist = paramList();
for (auto &flag : cmdl.flags()) {
if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
}
// for (auto &flag : cmdl.flags()) {
// if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
// }
for (auto &param : cmdl.params()) {
if (!(plist.find(param.first) != plist.end()))
retList.push_back(param.first);
}
// for (auto &param : cmdl.params()) {
// if (!(plist.find(param.first) != plist.end()))
// retList.push_back(param.first);
// }
return retList;
}
// return retList;
// }
typedef struct {
char has_work;
double *send_addr;
} worker_struct;
// typedef struct {
// char has_work;
// double *send_addr;
// } worker_struct;
int main(int argc, char *argv[]) {
double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry,
@ -83,35 +85,20 @@ int main(int argc, char *argv[]) {
double cummul_transport = 0.f;
double cummul_chemistry = 0.f;
double cummul_workers = 0.f;
double cummul_chemistry_master = 0.f;
double cummul_master_seq = 0.f;
double cummul_master_seq_loop = 0.f;
double master_idle = 0.f;
double master_send_a, master_send_b;
double cummul_master_send = 0.f;
double master_recv_a, master_recv_b;
double cummul_master_recv = 0.f;
double sim_a_seq, sim_b_seq, sim_c_seq, sim_d_seq;
double idle_a, idle_b;
double sim_c_chemistry, sim_d_chemistry;
double sim_e_chemistry, sim_f_chemistry;
argh::parser cmdl(argv);
int dht_significant_digits;
// cout << "CPP: Start Init (MPI)" << endl;
t_simparams params;
int world_size, world_rank;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &(params.world_size));
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &(params.world_rank));
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
/*Create custom Communicator with all processes except 0 (the master) for DHT
* storage*/
@ -121,109 +108,90 @@ int main(int argc, char *argv[]) {
int *process_ranks;
// make a list of processes in the new communicator
process_ranks = (int *)malloc(params.world_size * sizeof(int));
for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I;
process_ranks = (int *)malloc(world_size * sizeof(int));
for (int I = 1; I < world_size; I++) process_ranks[I - 1] = I;
// get the group under MPI_COMM_WORLD
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
// create the new group
MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group);
MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group);
// create the new communicator
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
free(process_ranks); // cleanup
// cout << "Done";
if (cmdl[{"help", "h"}]) {
if (params.world_rank == 0) {
cout << "Todo" << endl
<< "See README.md for further information." << endl;
}
MPI_Finalize();
return EXIT_SUCCESS;
}
// if (cmdl[{"help", "h"}]) {
// if (params.world_rank == 0) {
// cout << "Todo" << endl
// << "See README.md for further information." << endl;
// }
// MPI_Finalize();
// return EXIT_SUCCESS;
// }
/*INIT is now done separately in an R file provided here as argument!*/
if (!cmdl(2)) {
if (params.world_rank == 0) {
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
<< "1) the R script defining your simulation and" << endl
<< "2) the directory prefix where to save results and profiling"
<< endl;
}
MPI_Finalize();
return EXIT_FAILURE;
}
// /*INIT is now done separately in an R file provided here as argument!*/
// if (!cmdl(2)) {
// if (params.world_rank == 0) {
// cerr << "ERROR. Kin needs 2 positional arguments: " << endl
// << "1) the R script defining your simulation and" << endl
// << "2) the directory prefix where to save results and profiling"
// << endl;
// }
// MPI_Finalize();
// return EXIT_FAILURE;
// }
std::list<std::string> optionsError = checkOptions(cmdl);
if (!optionsError.empty()) {
if (params.world_rank == 0) {
cerr << "Unrecognized option(s):\n" << endl;
for (auto option : optionsError) {
cerr << option << endl;
}
cerr << "\nMake sure to use available options. Exiting!" << endl;
}
MPI_Finalize();
return EXIT_FAILURE;
}
// std::list<std::string> optionsError = checkOptions(cmdl);
// if (!optionsError.empty()) {
// if (params.world_rank == 0) {
// cerr << "Unrecognized option(s):\n" << endl;
// for (auto option : optionsError) {
// cerr << option << endl;
// }
// cerr << "\nMake sure to use available options. Exiting!" << endl;
// }
// MPI_Finalize();
// return EXIT_FAILURE;
// }
/*Parse DHT arguments*/
params.dht_enabled = cmdl["dht"];
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
// /*Parse DHT arguments*/
// params.dht_enabled = cmdl["dht"];
// // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
if (params.dht_enabled) {
cmdl("dht-strategy", 0) >> params.dht_strategy;
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
// if (params.dht_enabled) {
// cmdl("dht-strategy", 0) >> params.dht_strategy;
// // cout << "CPP: DHT strategy is " << dht_strategy << endl;
cmdl("dht-signif", 5) >> dht_significant_digits;
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
// endl;
// cmdl("dht-signif", 5) >> dht_significant_digits;
// // cout << "CPP: DHT significant digits = " << dht_significant_digits <<
// // endl;
params.dht_log = !(cmdl["dht-nolog"]);
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
// : "OFF" ) << endl;
// params.dht_log = !(cmdl["dht-nolog"]);
// // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ?
// "ON"
// // : "OFF" ) << endl;
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process;
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
// endl;
// cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process;
// // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process
// <<
// // endl;
cmdl("dht-snaps", 0) >> params.dht_snaps;
// cmdl("dht-snaps", 0) >> params.dht_snaps;
cmdl("dht-file") >> params.dht_file;
}
// cmdl("dht-file") >> params.dht_file;
// }
/*Parse work package size*/
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size;
// /*Parse work package size*/
// cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size;
/*Parse output options*/
store_result = !cmdl["ignore-result"];
// /*Parse output options*/
// store_result = !cmdl["ignore-result"];
if (params.world_rank == 0) {
cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF")
<< endl;
cout << "CPP: Work Package Size: " << params.wp_size << endl;
cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n';
if (params.dht_enabled) {
cout << "CPP: DHT strategy is " << params.dht_strategy << endl;
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
"defined) = "
<< dht_significant_digits << endl;
cout << "CPP: DHT logarithm before rounding: "
<< (params.dht_log ? "ON" : "OFF") << endl;
cout << "CPP: DHT size per process (Byte) = "
<< params.dht_size_per_process << endl;
cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl;
cout << "CPP: DHT load file is " << params.dht_file << endl;
}
}
cout << "CPP: R Init (RInside) on process " << params.world_rank << endl;
RRuntime R(argc, argv);
params.R = &R;
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
// if local_rank == 0 then master else worker
R["local_rank"] = params.world_rank;
// R["local_rank"] = params.world_rank;
/*Loading Dependencies*/
std::string r_load_dependencies =
@ -233,37 +201,71 @@ int main(int argc, char *argv[]) {
"source('parallel_r_library.R');";
R.parseEvalQ(r_load_dependencies);
std::string filesim;
cmdl(1) >> filesim; // <- first positional argument
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
R.parseEvalQ(
"source(filesim)"); // eval the init string, ignoring any returns
Parser parser(argv, world_rank, world_size);
int pret = parser.parseCmdl();
if (params.world_rank ==
0) { // only rank 0 initializes goes through the whole initialization
cmdl(2) >> params.out_dir; // <- second positional argument
R["fileout"] =
wrap(params.out_dir); // assign a char* (string) to 'fileout'
if (pret == PARSER_ERROR) {
MPI_Finalize();
return EXIT_FAILURE;
} else if (pret == PARSER_HELP) {
MPI_Finalize();
return EXIT_SUCCESS;
}
parser.parseR(R);
params = parser.getParams();
// if (params.world_rank == 0) {
// cout << "CPP: Complete results storage is " << (params.store_result ? "ON" :
// "OFF")
// << endl;
// cout << "CPP: Work Package Size: " << params.wp_size << endl;
// cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n';
// if (params.dht_enabled) {
// cout << "CPP: DHT strategy is " << params.dht_strategy << endl;
// // cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
// // "defined) = "
// // << dht_significant_digits << endl;
// cout << "CPP: DHT logarithm before rounding: "
// << (params.dht_log ? "ON" : "OFF") << endl;
// cout << "CPP: DHT size per process (Byte) = "
// << params.dht_size_per_process << endl;
// cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl;
// cout << "CPP: DHT load file is " << params.dht_file << endl;
// }
// }
// std::string filesim;
// cmdl(1) >> filesim; // <- first positional argument
// R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
// R.parseEvalQ(
// "source(filesim)"); // eval the init string, ignoring any returns
// only rank 0 initializes goes through the whole initialization
if (world_rank == 0) {
// cmdl(2) >> params.out_dir; // <- second positional argument
// R["fileout"] =
// wrap(params.out_dir); // assign a char* (string) to 'fileout'
// Note: R::sim_init() checks if the directory already exists,
// if not it makes it
// pass the boolean "store_result" to the R process
R["store_result"] = store_result;
// R["store_result"] = store_result;
// get timestep vector from grid_init function ...
std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code);
params.dt_differ =
R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
params.dt_differ = R.parseEval("mysetup$dt_differ");
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
} else { // workers will only read the setup DataFrame defined by input file
R.parseEval("mysetup <- setup");
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
}
if (params.world_rank == 0) {
if (world_rank == 0) {
cout << "CPP: R init done on process with rank " << params.world_rank
<< endl;
}
@ -279,8 +281,8 @@ int main(int argc, char *argv[]) {
// Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
/* Init Parallel helper functions */
R["n_procs"] = params.world_size - 1; /* worker count */
R["work_package_size"] = params.wp_size;
// R["n_procs"] = params.world_size - 1; /* worker count */
// R["work_package_size"] = params.wp_size;
// Removed additional field for ID in previous versions
// if (params.world_rank == 0) {
@ -293,21 +295,24 @@ int main(int argc, char *argv[]) {
// (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
// }
if (params.world_rank == 0) {
cout << "CPP: parallel init completed (buffers allocated)!" << endl;
}
// if (params.world_rank == 0) {
// cout << "CPP: parallel init completed (buffers allocated)!" << endl;
// }
// MDL: pass to R the DHT stuff (basically, only for storing of
// simulation parameters). These 2 variables are always defined:
R["dht_enabled"] = params.dht_enabled;
R["dht_log"] = params.dht_log;
// R["dht_enabled"] = params.dht_enabled;
// R["dht_log"] = params.dht_log;
params.R = &R;
if (params.dht_enabled) {
// cout << "\nCreating DHT\n";
// determine size of dht entries
// int dht_data_size = grid.getCols() * sizeof(double);
// int dht_key_size =
// grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
// grid.getCols() * sizeof(double) + (params.dt_differ *
// sizeof(double));
// // // determine bucket count for preset memory usage
// // // bucket size is key + value + 1 byte for status
@ -320,8 +325,7 @@ int main(int argc, char *argv[]) {
if (signif_vector_exists) {
params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
} else {
// params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
// dht_significant_digits);
params.dht_signif_vector.assign(grid.getCols(), dht_significant_digits);
}
/*Load property type vector from R setup file (or set default)*/
@ -329,9 +333,7 @@ int main(int argc, char *argv[]) {
if (prop_type_vector_exists) {
params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
} else {
// params.dht_prop_type_vector.assign(dht_object->key_size /
// sizeof(double),
// "act");
params.dht_prop_type_vector.assign(grid.getCols(), "act");
}
if (params.world_rank == 0) {
@ -386,6 +388,7 @@ int main(int argc, char *argv[]) {
if (params.world_rank == 0) { /* This is executed by the master */
ChemMaster master(&params, R, grid);
TransportSim trans(R);
Rcpp::NumericVector master_send;
Rcpp::NumericVector master_recv;
@ -412,22 +415,22 @@ int main(int argc, char *argv[]) {
cout << "CPP: Calling Advection" << endl;
sim_b_transport = MPI_Wtime();
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
sim_a_transport = MPI_Wtime();
trans.runIteration();
// sim_b_transport = MPI_Wtime();
// R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
// sim_a_transport = MPI_Wtime();
if (iter == 1) master.prepareSimulation();
// if (iter == 1) master.prepareSimulation();
cout << "CPP: Chemistry" << endl;
/*Fallback for sequential execution*/
sim_b_chemistry = MPI_Wtime();
if (params.world_size == 1) {
master.runSeq();
} else { /*send work to workers*/
master.runIteration();
master.runPar();
}
sim_a_chemistry = MPI_Wtime();
double master_seq_a = MPI_Wtime();
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
@ -435,8 +438,8 @@ int main(int argc, char *argv[]) {
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
cummul_transport += sim_a_transport - sim_b_transport;
cummul_chemistry += sim_a_chemistry - sim_b_chemistry;
cummul_transport += trans.getTransportTime();
cummul_chemistry += master.getChemistryTime();
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
@ -457,8 +460,9 @@ int main(int argc, char *argv[]) {
} // END SIMULATION LOOP
cout << "CPP: finished simulation loop" << endl;
sim_end = MPI_Wtime();
master.finishSimulation();
Rcpp::NumericVector phreeqc_time;
Rcpp::NumericVector dht_get_time;
@ -483,6 +487,9 @@ int main(int argc, char *argv[]) {
double idle_worker_tmp;
cout << "CPP: Advising worker to stop work and collect data from them"
<< endl;
for (int p = 0; p < params.world_size - 1; p++) {
/* ATTENTION Worker p has rank p+1 */
/* Send termination message to worker */
@ -570,8 +577,8 @@ int main(int argc, char *argv[]) {
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
} else { /*This is executed by the workers*/
ChemWorker worker(&params, R, grid);
worker.prepareSimulation(dht_comm);
ChemWorker worker(&params, R, grid, dht_comm);
// worker.prepareSimulation(dht_comm);
worker.loop();
}
@ -587,7 +594,7 @@ int main(int argc, char *argv[]) {
// }
// }
free(mpi_buffer);
// free(mpi_buffer);
MPI_Finalize();
if (params.world_rank == 0) {

View File

@ -15,9 +15,32 @@ ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_)
: ChemSim(params, R_, grid_) {
this->wp_size = params->wp_size;
this->out_dir = params->out_dir;
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double));
R.parseEvalQ(
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_C), "
"package_size=work_package_size)");
// we only sort once the vector
R.parseEvalQ("ordered_ids <- order(wp_ids)");
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
}
void ChemMaster::runIteration() {
ChemMaster::~ChemMaster() {
free(mpi_buffer);
free(workerlist);
}
void ChemMaster::runPar() {
double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
double sim_e_chemistry, sim_f_chemistry;
@ -25,6 +48,8 @@ void ChemMaster::runIteration() {
int free_workers;
int i_pkgs;
chem_a = MPI_Wtime();
seq_a = MPI_Wtime();
grid.shuffleAndExport(mpi_buffer);
// retrieve data from R runtime
@ -68,6 +93,9 @@ void ChemMaster::runIteration() {
chem_master += sim_f_chemistry - sim_e_chemistry;
seq_d = MPI_Wtime();
seq_t += seq_d - seq_c;
chem_b = MPI_Wtime();
chem_t += chem_b - chem_a;
}
void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
@ -180,30 +208,6 @@ void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
/* end visual progress */
}
void ChemMaster::prepareSimulation() {
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double));
R.parseEvalQ(
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
"package_size=work_package_size)");
// we only sort once the vector
R.parseEvalQ("ordered_ids <- order(wp_ids)");
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
}
void ChemMaster::finishSimulation() {
free(mpi_buffer);
free(workerlist);
}
double ChemMaster::getSendTime() { return this->send_t; }
double ChemMaster::getRecvTime() { return this->recv_t; }

View File

@ -1,9 +1,12 @@
#include "ChemSim.h"
#include <Rcpp.h>
#include <mpi.h>
#include <iostream>
#include "../util/RRuntime.h"
#include "Grid.h"
#include <Rcpp.h>
#include <iostream>
#include <mpi.h>
using namespace Rcpp;
using namespace poet;
@ -17,7 +20,16 @@ ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_)
}
void ChemSim::runSeq() {
double chem_a, chem_b;
chem_a = MPI_Wtime();
R.parseEvalQ(
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
chem_b = MPI_Wtime();
chem_t += chem_b - chem_a;
}
double ChemSim::getChemistryTime() { return this->chem_t; }

View File

@ -22,7 +22,9 @@ namespace poet {
class ChemSim {
public:
ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
void runSeq();
double getChemistryTime();
protected:
double current_sim_time = 0;
@ -47,16 +49,17 @@ protected:
worker_struct *workerlist;
double *mpi_buffer;
double chem_t = 0.f;
};
class ChemMaster : public ChemSim {
public:
ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);
~ChemMaster();
void prepareSimulation();
void finishSimulation();
void runIteration();
void runPar();
void profile();
double getSendTime();
double getRecvTime();
@ -83,9 +86,9 @@ private:
class ChemWorker : public ChemSim {
public:
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_);
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm);
~ChemWorker();
void prepareSimulation(MPI_Comm dht_comm);
void loop();
private:

View File

@ -10,15 +10,15 @@ using namespace poet;
using namespace std;
using namespace Rcpp;
ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_)
ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_,
MPI_Comm dht_comm)
: params(params_), ChemSim(params_, R_, grid_) {
this->dt_differ = params->dt_differ;
this->dht_enabled = params->dht_enabled;
this->dht_size_per_process = params->dht_size_per_process;
this->dht_file = params->dht_file;
}
this->dht_snaps = params->dht_snaps;
void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double));
mpi_buffer_results =
@ -59,6 +59,12 @@ void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
timing[2] = 0.0;
}
ChemWorker::~ChemWorker() {
free(mpi_buffer);
free(mpi_buffer_results);
delete dht;
}
void ChemWorker::loop() {
MPI_Status probe_status;
while (1) {
@ -69,11 +75,11 @@ void ChemWorker::loop() {
if (probe_status.MPI_TAG == TAG_WORK) {
idle_t += idle_b - idle_a;
doWork(probe_status);
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
postIter();
} else if (probe_status.MPI_TAG == TAG_FINISH) {
finishWork();
break;
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
postIter();
}
}
}
@ -265,11 +271,12 @@ void ChemWorker::postIter() {
}
void ChemWorker::writeFile() {
cout.flush();
std::stringstream out;
out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && world_rank == 2)
cerr << "CPP: Worker: Errir in writing current state of DHT to file."
cerr << "CPP: Worker: Error in writing current state of DHT to file."
<< endl;
else if (world_rank == 2)
cout << "CPP: Worker: Successfully written DHT to file " << out.str()
@ -298,9 +305,6 @@ void ChemWorker::readFile() {
}
void ChemWorker::finishWork() {
if (dht_enabled && dht_snaps > 0) writeFile();
double dht_perf[3];
/* before death, submit profiling/timings to master*/
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
@ -313,6 +317,7 @@ void ChemWorker::finishWork() {
if (dht_enabled) {
// dht_perf
double dht_perf[3];
dht_perf[0] = dht->getHits();
dht_perf[1] = dht->getMisses();
dht_perf[2] = dht->getEvictions();
@ -320,7 +325,9 @@ void ChemWorker::finishWork() {
MPI_COMM_WORLD);
}
free(mpi_buffer);
free(mpi_buffer_results);
delete dht;
if (dht_enabled && dht_snaps > 0) writeFile();
// free(mpi_buffer);
// free(mpi_buffer_results);
// delete dht;
}

View File

@ -0,0 +1,19 @@
#include "TransportSim.h"
#include <mpi.h>
using namespace poet;
TransportSim::TransportSim(RRuntime &R_) : R(_R) {}
void TransportSim::runIteration() {
double sim_a_transport, sim_b_transport;
sim_b_transport = MPI_Wtime();
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
sim_a_transport = MPI_Wtime();
transport_t += sim_a_transport - sim_b_transport;
}
double TransportSim::getTransportTime { return this->transport_t; }

22
src/model/TransportSim.h Normal file
View File

@ -0,0 +1,22 @@
#ifndef TRANSPORT_SIM_H
#define TRANSPORT_SIM_H
#include "../util/RRuntime.h"
namespace poet {
class TransportSim {
public:
TransportSim(RRuntime &R);
void runIteration();
double getTransportTime();
private:
RRuntime &R;
double transport_t = 0.f;
};
} // namespace poet
#endif // TRANSPORT_SIM_H

144
src/util/Parser.cpp Normal file
View File

@ -0,0 +1,144 @@
#include "Parser.h"
#include <Rcpp.h>
#include <iostream>
using namespace poet;
using namespace std;
using namespace Rcpp;
Parser::Parser(char *argv[], int world_rank_, int world_size_)
: cmdl(argv), world_rank(world_rank_), world_size(world_size_) {
this->simparams.world_rank = world_rank_;
this->simparams.world_size = world_size;
}
int Parser::parseCmdl() {
// if user asked for help
if (cmdl[{"help", "h"}]) {
if (world_rank == 0) {
cout << "Todo" << endl
<< "See README.md for further information." << endl;
}
return PARSER_HELP;
}
// if positional arguments are missing
else if (!cmdl(2)) {
if (world_rank == 0) {
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
<< "1) the R script defining your simulation and" << endl
<< "2) the directory prefix where to save results and profiling"
<< endl;
}
return PARSER_ERROR;
}
std::list<std::string> optionsError = checkOptions(cmdl);
if (!optionsError.empty()) {
if (world_rank == 0) {
cerr << "Unrecognized option(s):\n" << endl;
for (auto option : optionsError) {
cerr << option << endl;
}
cerr << "\nMake sure to use available options. Exiting!" << endl;
}
return PARSER_ERROR;
}
/*Parse DHT arguments*/
simparams.dht_enabled = cmdl["dht"];
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
if (simparams.dht_enabled) {
cmdl("dht-strategy", 0) >> simparams.dht_strategy;
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
cmdl("dht-signif", 5) >> dht_significant_digits;
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
// endl;
simparams.dht_log = !(cmdl["dht-nolog"]);
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
// : "OFF" ) << endl;
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> simparams.dht_size_per_process;
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
// endl;
cmdl("dht-snaps", 0) >> simparams.dht_snaps;
cmdl("dht-file") >> simparams.dht_file;
}
/*Parse work package size*/
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
/*Parse output options*/
simparams.store_result = !cmdl["ignore-result"];
if (world_rank == 0) {
cout << "CPP: Complete results storage is "
<< (simparams.store_result ? "ON" : "OFF") << endl;
cout << "CPP: Work Package Size: " << simparams.wp_size << endl;
cout << "CPP: DHT is " << (simparams.dht_enabled ? "ON" : "OFF") << '\n';
if (simparams.dht_enabled) {
cout << "CPP: DHT strategy is " << simparams.dht_strategy << endl;
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
"defined) = "
<< dht_significant_digits << endl;
cout << "CPP: DHT logarithm before rounding: "
<< (simparams.dht_log ? "ON" : "OFF") << endl;
cout << "CPP: DHT size per process (Byte) = "
<< simparams.dht_size_per_process << endl;
cout << "CPP: DHT save snapshots is " << simparams.dht_snaps << endl;
cout << "CPP: DHT load file is " << simparams.dht_file << endl;
}
}
cmdl(1) >> simparams.filesim;
cmdl(2) >> simparams.out_dir;
return PARSER_OK;
}
void Parser::parseR(RRuntime &R) {
// if local_rank == 0 then master else worker
R["local_rank"] = simparams.world_rank;
// assign a char* (string) to 'filesim'
R["filesim"] = wrap(simparams.filesim);
// assign a char* (string) to 'fileout'
R["fileout"] = wrap(simparams.out_dir);
// pass the boolean "store_result" to the R process
R["store_result"] = simparams.store_result;
// worker count
R["n_procs"] = simparams.world_size - 1;
// work package size
R["work_package_size"] = simparams.wp_size;
// dht enabled?
R["dht_enabled"] = simparams.dht_enabled;
// log before rounding?
R["dht_log"] = simparams.dht_log;
// eval the init string, ignoring any returns
R.parseEvalQ("source(filesim)");
}
t_simparams Parser::getParams() { return this->simparams; }
std::list<std::string> Parser::checkOptions(argh::parser cmdl) {
std::list<std::string> retList;
// std::set<std::string> flist = flagList();
// std::set<std::string> plist = paramList();
for (auto &flag : cmdl.flags()) {
if (!(flaglist.find(flag) != flaglist.end())) retList.push_back(flag);
}
for (auto &param : cmdl.params()) {
if (!(paramlist.find(param.first) != paramlist.end()))
retList.push_back(param.first);
}
return retList;
}

42
src/util/Parser.h Normal file
View File

@ -0,0 +1,42 @@
#ifndef PARSER_H
#define PARSER_H
#include <string>
#include "../argh.h"
#include "RRuntime.h"
#include "SimParams.h"
#define PARSER_OK 0
#define PARSER_ERROR 1
#define PARSER_HELP 2
#define DHT_SIZE_PER_PROCESS 1073741824
#define WORK_PACKAGE_SIZE_DEFAULT 5
namespace poet {
class Parser {
public:
Parser(char *argv[], int world_rank, int world_size);
int parseCmdl();
void parseR(RRuntime &R);
t_simparams getParams();
private:
std::list<std::string> checkOptions(argh::parser cmdl);
std::set<std::string> flaglist{"ignore-result", "dht", "dht-nolog"};
std::set<std::string> paramlist{"work-package-size", "dht-signif",
"dht-strategy", "dht-size",
"dht-snaps", "dht-file"};
argh::parser cmdl;
t_simparams simparams;
int world_rank;
int world_size;
int dht_significant_digits;
};
} // namespace poet
#endif // PARSER_H

View File

@ -21,8 +21,11 @@ typedef struct {
unsigned int wp_size;
std::string filesim;
std::string out_dir;
bool store_result;
void* R;
void* grid;
} t_simparams;