mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
Introducing TransportSim and Parser
This commit is contained in:
parent
3f710bfea6
commit
b461e05d5a
@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})
|
|||||||
|
|
||||||
#define program libraries
|
#define program libraries
|
||||||
|
|
||||||
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp)
|
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp util/Parser.cpp model/TransportSim.cpp)
|
||||||
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
|
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
|
||||||
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
|
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
|
||||||
|
|
||||||
|
|||||||
369
src/kin.cpp
369
src/kin.cpp
@ -12,70 +12,72 @@
|
|||||||
// #include "global_buffer.h"
|
// #include "global_buffer.h"
|
||||||
#include "model/ChemSim.h"
|
#include "model/ChemSim.h"
|
||||||
#include "model/Grid.h"
|
#include "model/Grid.h"
|
||||||
|
#include "util/Parser.h"
|
||||||
#include "util/RRuntime.h"
|
#include "util/RRuntime.h"
|
||||||
#include "util/SimParams.h"
|
#include "util/SimParams.h"
|
||||||
|
#include "model/TransportSim.h"
|
||||||
// #include "worker.h"
|
// #include "worker.h"
|
||||||
|
|
||||||
#define DHT_SIZE_PER_PROCESS 1073741824
|
//#define DHT_SIZE_PER_PROCESS 1073741824
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace poet;
|
using namespace poet;
|
||||||
using namespace Rcpp;
|
using namespace Rcpp;
|
||||||
|
|
||||||
double *mpi_buffer;
|
// double *mpi_buffer;
|
||||||
double *mpi_buffer_results;
|
// double *mpi_buffer_results;
|
||||||
|
|
||||||
uint32_t work_package_size;
|
// uint32_t work_package_size;
|
||||||
#define WORK_PACKAGE_SIZE_DEFAULT 5
|
// #define WORK_PACKAGE_SIZE_DEFAULT 5
|
||||||
|
|
||||||
bool store_result;
|
// bool store_result;
|
||||||
|
|
||||||
std::set<std::string> paramList() {
|
// std::set<std::string> paramList() {
|
||||||
std::set<std::string> options;
|
// std::set<std::string> options;
|
||||||
// global
|
// // global
|
||||||
options.insert("work-package-size");
|
// options.insert("work-package-size");
|
||||||
// only DHT
|
// // only DHT
|
||||||
options.insert("dht-signif");
|
// options.insert("dht-signif");
|
||||||
options.insert("dht-strategy");
|
// options.insert("dht-strategy");
|
||||||
options.insert("dht-size");
|
// options.insert("dht-size");
|
||||||
options.insert("dht-snaps");
|
// options.insert("dht-snaps");
|
||||||
options.insert("dht-file");
|
// options.insert("dht-file");
|
||||||
|
|
||||||
return options;
|
// return options;
|
||||||
}
|
// }
|
||||||
|
|
||||||
std::set<std::string> flagList() {
|
// std::set<std::string> flagList() {
|
||||||
std::set<std::string> options;
|
// std::set<std::string> options;
|
||||||
// global
|
// // global
|
||||||
options.insert("ignore-result");
|
// options.insert("ignore-result");
|
||||||
// only DHT
|
// // only DHT
|
||||||
options.insert("dht");
|
// options.insert("dht");
|
||||||
options.insert("dht-log");
|
// options.insert("dht-log");
|
||||||
|
|
||||||
return options;
|
// return options;
|
||||||
}
|
// }
|
||||||
|
|
||||||
std::list<std::string> checkOptions(argh::parser cmdl) {
|
// std::list<std::string> checkOptions(argh::parser cmdl) {
|
||||||
std::list<std::string> retList;
|
// std::list<std::string> retList;
|
||||||
std::set<std::string> flist = flagList();
|
// std::set<std::string> flist = flagList();
|
||||||
std::set<std::string> plist = paramList();
|
// std::set<std::string> plist = paramList();
|
||||||
|
|
||||||
for (auto &flag : cmdl.flags()) {
|
// for (auto &flag : cmdl.flags()) {
|
||||||
if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
|
// if (!(flist.find(flag) != flist.end())) retList.push_back(flag);
|
||||||
}
|
// }
|
||||||
|
|
||||||
for (auto ¶m : cmdl.params()) {
|
// for (auto ¶m : cmdl.params()) {
|
||||||
if (!(plist.find(param.first) != plist.end()))
|
// if (!(plist.find(param.first) != plist.end()))
|
||||||
retList.push_back(param.first);
|
// retList.push_back(param.first);
|
||||||
}
|
// }
|
||||||
|
|
||||||
return retList;
|
// return retList;
|
||||||
}
|
// }
|
||||||
|
|
||||||
typedef struct {
|
// typedef struct {
|
||||||
char has_work;
|
// char has_work;
|
||||||
double *send_addr;
|
// double *send_addr;
|
||||||
} worker_struct;
|
// } worker_struct;
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry,
|
double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry,
|
||||||
@ -83,35 +85,20 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
double cummul_transport = 0.f;
|
double cummul_transport = 0.f;
|
||||||
double cummul_chemistry = 0.f;
|
double cummul_chemistry = 0.f;
|
||||||
double cummul_workers = 0.f;
|
|
||||||
double cummul_chemistry_master = 0.f;
|
|
||||||
|
|
||||||
double cummul_master_seq = 0.f;
|
double cummul_master_seq = 0.f;
|
||||||
double cummul_master_seq_loop = 0.f;
|
|
||||||
double master_idle = 0.f;
|
|
||||||
|
|
||||||
double master_send_a, master_send_b;
|
|
||||||
double cummul_master_send = 0.f;
|
|
||||||
double master_recv_a, master_recv_b;
|
|
||||||
double cummul_master_recv = 0.f;
|
|
||||||
|
|
||||||
double sim_a_seq, sim_b_seq, sim_c_seq, sim_d_seq;
|
|
||||||
double idle_a, idle_b;
|
|
||||||
|
|
||||||
double sim_c_chemistry, sim_d_chemistry;
|
|
||||||
double sim_e_chemistry, sim_f_chemistry;
|
|
||||||
|
|
||||||
argh::parser cmdl(argv);
|
argh::parser cmdl(argv);
|
||||||
int dht_significant_digits;
|
int dht_significant_digits;
|
||||||
// cout << "CPP: Start Init (MPI)" << endl;
|
// cout << "CPP: Start Init (MPI)" << endl;
|
||||||
|
|
||||||
t_simparams params;
|
t_simparams params;
|
||||||
|
int world_size, world_rank;
|
||||||
|
|
||||||
MPI_Init(&argc, &argv);
|
MPI_Init(&argc, &argv);
|
||||||
|
|
||||||
MPI_Comm_size(MPI_COMM_WORLD, &(params.world_size));
|
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||||
|
|
||||||
MPI_Comm_rank(MPI_COMM_WORLD, &(params.world_rank));
|
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
||||||
|
|
||||||
/*Create custom Communicator with all processes except 0 (the master) for DHT
|
/*Create custom Communicator with all processes except 0 (the master) for DHT
|
||||||
* storage*/
|
* storage*/
|
||||||
@ -121,109 +108,90 @@ int main(int argc, char *argv[]) {
|
|||||||
int *process_ranks;
|
int *process_ranks;
|
||||||
|
|
||||||
// make a list of processes in the new communicator
|
// make a list of processes in the new communicator
|
||||||
process_ranks = (int *)malloc(params.world_size * sizeof(int));
|
process_ranks = (int *)malloc(world_size * sizeof(int));
|
||||||
for (int I = 1; I < params.world_size; I++) process_ranks[I - 1] = I;
|
for (int I = 1; I < world_size; I++) process_ranks[I - 1] = I;
|
||||||
|
|
||||||
// get the group under MPI_COMM_WORLD
|
// get the group under MPI_COMM_WORLD
|
||||||
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
|
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
|
||||||
// create the new group
|
// create the new group
|
||||||
MPI_Group_incl(group_world, params.world_size - 1, process_ranks, &dht_group);
|
MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group);
|
||||||
// create the new communicator
|
// create the new communicator
|
||||||
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
|
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
|
||||||
free(process_ranks); // cleanup
|
free(process_ranks); // cleanup
|
||||||
// cout << "Done";
|
// cout << "Done";
|
||||||
|
|
||||||
if (cmdl[{"help", "h"}]) {
|
// if (cmdl[{"help", "h"}]) {
|
||||||
if (params.world_rank == 0) {
|
// if (params.world_rank == 0) {
|
||||||
cout << "Todo" << endl
|
// cout << "Todo" << endl
|
||||||
<< "See README.md for further information." << endl;
|
// << "See README.md for further information." << endl;
|
||||||
}
|
// }
|
||||||
MPI_Finalize();
|
// MPI_Finalize();
|
||||||
return EXIT_SUCCESS;
|
// return EXIT_SUCCESS;
|
||||||
}
|
// }
|
||||||
|
|
||||||
/*INIT is now done separately in an R file provided here as argument!*/
|
// /*INIT is now done separately in an R file provided here as argument!*/
|
||||||
if (!cmdl(2)) {
|
// if (!cmdl(2)) {
|
||||||
if (params.world_rank == 0) {
|
// if (params.world_rank == 0) {
|
||||||
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
|
// cerr << "ERROR. Kin needs 2 positional arguments: " << endl
|
||||||
<< "1) the R script defining your simulation and" << endl
|
// << "1) the R script defining your simulation and" << endl
|
||||||
<< "2) the directory prefix where to save results and profiling"
|
// << "2) the directory prefix where to save results and profiling"
|
||||||
<< endl;
|
// << endl;
|
||||||
}
|
// }
|
||||||
MPI_Finalize();
|
// MPI_Finalize();
|
||||||
return EXIT_FAILURE;
|
// return EXIT_FAILURE;
|
||||||
}
|
// }
|
||||||
|
|
||||||
std::list<std::string> optionsError = checkOptions(cmdl);
|
// std::list<std::string> optionsError = checkOptions(cmdl);
|
||||||
if (!optionsError.empty()) {
|
// if (!optionsError.empty()) {
|
||||||
if (params.world_rank == 0) {
|
// if (params.world_rank == 0) {
|
||||||
cerr << "Unrecognized option(s):\n" << endl;
|
// cerr << "Unrecognized option(s):\n" << endl;
|
||||||
for (auto option : optionsError) {
|
// for (auto option : optionsError) {
|
||||||
cerr << option << endl;
|
// cerr << option << endl;
|
||||||
}
|
// }
|
||||||
cerr << "\nMake sure to use available options. Exiting!" << endl;
|
// cerr << "\nMake sure to use available options. Exiting!" << endl;
|
||||||
}
|
// }
|
||||||
MPI_Finalize();
|
// MPI_Finalize();
|
||||||
return EXIT_FAILURE;
|
// return EXIT_FAILURE;
|
||||||
}
|
// }
|
||||||
|
|
||||||
/*Parse DHT arguments*/
|
// /*Parse DHT arguments*/
|
||||||
params.dht_enabled = cmdl["dht"];
|
// params.dht_enabled = cmdl["dht"];
|
||||||
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
// // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
||||||
|
|
||||||
if (params.dht_enabled) {
|
// if (params.dht_enabled) {
|
||||||
cmdl("dht-strategy", 0) >> params.dht_strategy;
|
// cmdl("dht-strategy", 0) >> params.dht_strategy;
|
||||||
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
// // cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
||||||
|
|
||||||
cmdl("dht-signif", 5) >> dht_significant_digits;
|
// cmdl("dht-signif", 5) >> dht_significant_digits;
|
||||||
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
|
// // cout << "CPP: DHT significant digits = " << dht_significant_digits <<
|
||||||
// endl;
|
// // endl;
|
||||||
|
|
||||||
params.dht_log = !(cmdl["dht-nolog"]);
|
// params.dht_log = !(cmdl["dht-nolog"]);
|
||||||
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
|
// // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ?
|
||||||
// : "OFF" ) << endl;
|
// "ON"
|
||||||
|
// // : "OFF" ) << endl;
|
||||||
|
|
||||||
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process;
|
// cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> params.dht_size_per_process;
|
||||||
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
|
// // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process
|
||||||
// endl;
|
// <<
|
||||||
|
// // endl;
|
||||||
|
|
||||||
cmdl("dht-snaps", 0) >> params.dht_snaps;
|
// cmdl("dht-snaps", 0) >> params.dht_snaps;
|
||||||
|
|
||||||
cmdl("dht-file") >> params.dht_file;
|
// cmdl("dht-file") >> params.dht_file;
|
||||||
}
|
// }
|
||||||
|
|
||||||
/*Parse work package size*/
|
// /*Parse work package size*/
|
||||||
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size;
|
// cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> params.wp_size;
|
||||||
|
|
||||||
/*Parse output options*/
|
// /*Parse output options*/
|
||||||
store_result = !cmdl["ignore-result"];
|
// store_result = !cmdl["ignore-result"];
|
||||||
|
|
||||||
if (params.world_rank == 0) {
|
|
||||||
cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF")
|
|
||||||
<< endl;
|
|
||||||
cout << "CPP: Work Package Size: " << params.wp_size << endl;
|
|
||||||
cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n';
|
|
||||||
|
|
||||||
if (params.dht_enabled) {
|
|
||||||
cout << "CPP: DHT strategy is " << params.dht_strategy << endl;
|
|
||||||
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
|
|
||||||
"defined) = "
|
|
||||||
<< dht_significant_digits << endl;
|
|
||||||
cout << "CPP: DHT logarithm before rounding: "
|
|
||||||
<< (params.dht_log ? "ON" : "OFF") << endl;
|
|
||||||
cout << "CPP: DHT size per process (Byte) = "
|
|
||||||
<< params.dht_size_per_process << endl;
|
|
||||||
cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl;
|
|
||||||
cout << "CPP: DHT load file is " << params.dht_file << endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cout << "CPP: R Init (RInside) on process " << params.world_rank << endl;
|
|
||||||
RRuntime R(argc, argv);
|
RRuntime R(argc, argv);
|
||||||
params.R = &R;
|
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
|
||||||
|
|
||||||
// if local_rank == 0 then master else worker
|
// if local_rank == 0 then master else worker
|
||||||
R["local_rank"] = params.world_rank;
|
// R["local_rank"] = params.world_rank;
|
||||||
|
|
||||||
/*Loading Dependencies*/
|
/*Loading Dependencies*/
|
||||||
std::string r_load_dependencies =
|
std::string r_load_dependencies =
|
||||||
@ -233,37 +201,71 @@ int main(int argc, char *argv[]) {
|
|||||||
"source('parallel_r_library.R');";
|
"source('parallel_r_library.R');";
|
||||||
R.parseEvalQ(r_load_dependencies);
|
R.parseEvalQ(r_load_dependencies);
|
||||||
|
|
||||||
std::string filesim;
|
Parser parser(argv, world_rank, world_size);
|
||||||
cmdl(1) >> filesim; // <- first positional argument
|
int pret = parser.parseCmdl();
|
||||||
R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
|
|
||||||
R.parseEvalQ(
|
|
||||||
"source(filesim)"); // eval the init string, ignoring any returns
|
|
||||||
|
|
||||||
if (params.world_rank ==
|
if (pret == PARSER_ERROR) {
|
||||||
0) { // only rank 0 initializes goes through the whole initialization
|
MPI_Finalize();
|
||||||
cmdl(2) >> params.out_dir; // <- second positional argument
|
return EXIT_FAILURE;
|
||||||
R["fileout"] =
|
} else if (pret == PARSER_HELP) {
|
||||||
wrap(params.out_dir); // assign a char* (string) to 'fileout'
|
MPI_Finalize();
|
||||||
|
return EXIT_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
parser.parseR(R);
|
||||||
|
params = parser.getParams();
|
||||||
|
|
||||||
|
// if (params.world_rank == 0) {
|
||||||
|
// cout << "CPP: Complete results storage is " << (params.store_result ? "ON" :
|
||||||
|
// "OFF")
|
||||||
|
// << endl;
|
||||||
|
// cout << "CPP: Work Package Size: " << params.wp_size << endl;
|
||||||
|
// cout << "CPP: DHT is " << (params.dht_enabled ? "ON" : "OFF") << '\n';
|
||||||
|
|
||||||
|
// if (params.dht_enabled) {
|
||||||
|
// cout << "CPP: DHT strategy is " << params.dht_strategy << endl;
|
||||||
|
// // cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
|
||||||
|
// // "defined) = "
|
||||||
|
// // << dht_significant_digits << endl;
|
||||||
|
// cout << "CPP: DHT logarithm before rounding: "
|
||||||
|
// << (params.dht_log ? "ON" : "OFF") << endl;
|
||||||
|
// cout << "CPP: DHT size per process (Byte) = "
|
||||||
|
// << params.dht_size_per_process << endl;
|
||||||
|
// cout << "CPP: DHT save snapshots is " << params.dht_snaps << endl;
|
||||||
|
// cout << "CPP: DHT load file is " << params.dht_file << endl;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// std::string filesim;
|
||||||
|
// cmdl(1) >> filesim; // <- first positional argument
|
||||||
|
// R["filesim"] = wrap(filesim); // assign a char* (string) to 'filesim'
|
||||||
|
// R.parseEvalQ(
|
||||||
|
// "source(filesim)"); // eval the init string, ignoring any returns
|
||||||
|
|
||||||
|
// only rank 0 initializes goes through the whole initialization
|
||||||
|
if (world_rank == 0) {
|
||||||
|
// cmdl(2) >> params.out_dir; // <- second positional argument
|
||||||
|
// R["fileout"] =
|
||||||
|
// wrap(params.out_dir); // assign a char* (string) to 'fileout'
|
||||||
|
|
||||||
// Note: R::sim_init() checks if the directory already exists,
|
// Note: R::sim_init() checks if the directory already exists,
|
||||||
// if not it makes it
|
// if not it makes it
|
||||||
|
|
||||||
// pass the boolean "store_result" to the R process
|
// pass the boolean "store_result" to the R process
|
||||||
R["store_result"] = store_result;
|
// R["store_result"] = store_result;
|
||||||
|
|
||||||
// get timestep vector from grid_init function ...
|
// get timestep vector from grid_init function ...
|
||||||
std::string master_init_code = "mysetup <- master_init(setup=setup)";
|
std::string master_init_code = "mysetup <- master_init(setup=setup)";
|
||||||
R.parseEval(master_init_code);
|
R.parseEval(master_init_code);
|
||||||
|
|
||||||
params.dt_differ =
|
params.dt_differ = R.parseEval("mysetup$dt_differ");
|
||||||
R.parseEval("mysetup$dt_differ"); // TODO: Set in DHTWrapper
|
|
||||||
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
||||||
} else { // workers will only read the setup DataFrame defined by input file
|
} else { // workers will only read the setup DataFrame defined by input file
|
||||||
R.parseEval("mysetup <- setup");
|
R.parseEval("mysetup <- setup");
|
||||||
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
MPI_Bcast(&(params.dt_differ), 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (params.world_rank == 0) {
|
if (world_rank == 0) {
|
||||||
cout << "CPP: R init done on process with rank " << params.world_rank
|
cout << "CPP: R init done on process with rank " << params.world_rank
|
||||||
<< endl;
|
<< endl;
|
||||||
}
|
}
|
||||||
@ -279,8 +281,8 @@ int main(int argc, char *argv[]) {
|
|||||||
// Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
|
// Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
|
||||||
|
|
||||||
/* Init Parallel helper functions */
|
/* Init Parallel helper functions */
|
||||||
R["n_procs"] = params.world_size - 1; /* worker count */
|
// R["n_procs"] = params.world_size - 1; /* worker count */
|
||||||
R["work_package_size"] = params.wp_size;
|
// R["work_package_size"] = params.wp_size;
|
||||||
|
|
||||||
// Removed additional field for ID in previous versions
|
// Removed additional field for ID in previous versions
|
||||||
// if (params.world_rank == 0) {
|
// if (params.world_rank == 0) {
|
||||||
@ -293,21 +295,24 @@ int main(int argc, char *argv[]) {
|
|||||||
// (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
|
// (double *)calloc(params.wp_size * (grid.getCols()), sizeof(double));
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (params.world_rank == 0) {
|
// if (params.world_rank == 0) {
|
||||||
cout << "CPP: parallel init completed (buffers allocated)!" << endl;
|
// cout << "CPP: parallel init completed (buffers allocated)!" << endl;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// MDL: pass to R the DHT stuff (basically, only for storing of
|
// MDL: pass to R the DHT stuff (basically, only for storing of
|
||||||
// simulation parameters). These 2 variables are always defined:
|
// simulation parameters). These 2 variables are always defined:
|
||||||
R["dht_enabled"] = params.dht_enabled;
|
// R["dht_enabled"] = params.dht_enabled;
|
||||||
R["dht_log"] = params.dht_log;
|
// R["dht_log"] = params.dht_log;
|
||||||
|
|
||||||
|
params.R = &R;
|
||||||
|
|
||||||
if (params.dht_enabled) {
|
if (params.dht_enabled) {
|
||||||
// cout << "\nCreating DHT\n";
|
// cout << "\nCreating DHT\n";
|
||||||
// determine size of dht entries
|
// determine size of dht entries
|
||||||
// int dht_data_size = grid.getCols() * sizeof(double);
|
// int dht_data_size = grid.getCols() * sizeof(double);
|
||||||
// int dht_key_size =
|
// int dht_key_size =
|
||||||
// grid.getCols() * sizeof(double) + (params.dt_differ * sizeof(double));
|
// grid.getCols() * sizeof(double) + (params.dt_differ *
|
||||||
|
// sizeof(double));
|
||||||
|
|
||||||
// // // determine bucket count for preset memory usage
|
// // // determine bucket count for preset memory usage
|
||||||
// // // bucket size is key + value + 1 byte for status
|
// // // bucket size is key + value + 1 byte for status
|
||||||
@ -320,8 +325,7 @@ int main(int argc, char *argv[]) {
|
|||||||
if (signif_vector_exists) {
|
if (signif_vector_exists) {
|
||||||
params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
|
params.dht_signif_vector = as<std::vector<int>>(R["signif_vector"]);
|
||||||
} else {
|
} else {
|
||||||
// params.dht_signif_vector.assign(dht_object->key_size / sizeof(double),
|
params.dht_signif_vector.assign(grid.getCols(), dht_significant_digits);
|
||||||
// dht_significant_digits);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*Load property type vector from R setup file (or set default)*/
|
/*Load property type vector from R setup file (or set default)*/
|
||||||
@ -329,9 +333,7 @@ int main(int argc, char *argv[]) {
|
|||||||
if (prop_type_vector_exists) {
|
if (prop_type_vector_exists) {
|
||||||
params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
|
params.dht_prop_type_vector = as<std::vector<string>>(R["prop_type"]);
|
||||||
} else {
|
} else {
|
||||||
// params.dht_prop_type_vector.assign(dht_object->key_size /
|
params.dht_prop_type_vector.assign(grid.getCols(), "act");
|
||||||
// sizeof(double),
|
|
||||||
// "act");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (params.world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
@ -386,6 +388,7 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
if (params.world_rank == 0) { /* This is executed by the master */
|
if (params.world_rank == 0) { /* This is executed by the master */
|
||||||
ChemMaster master(¶ms, R, grid);
|
ChemMaster master(¶ms, R, grid);
|
||||||
|
TransportSim trans(R);
|
||||||
|
|
||||||
Rcpp::NumericVector master_send;
|
Rcpp::NumericVector master_send;
|
||||||
Rcpp::NumericVector master_recv;
|
Rcpp::NumericVector master_recv;
|
||||||
@ -412,22 +415,22 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
cout << "CPP: Calling Advection" << endl;
|
cout << "CPP: Calling Advection" << endl;
|
||||||
|
|
||||||
sim_b_transport = MPI_Wtime();
|
trans.runIteration();
|
||||||
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
|
// sim_b_transport = MPI_Wtime();
|
||||||
sim_a_transport = MPI_Wtime();
|
// R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
|
||||||
|
// sim_a_transport = MPI_Wtime();
|
||||||
|
|
||||||
if (iter == 1) master.prepareSimulation();
|
// if (iter == 1) master.prepareSimulation();
|
||||||
|
|
||||||
cout << "CPP: Chemistry" << endl;
|
cout << "CPP: Chemistry" << endl;
|
||||||
/*Fallback for sequential execution*/
|
/*Fallback for sequential execution*/
|
||||||
sim_b_chemistry = MPI_Wtime();
|
|
||||||
|
|
||||||
if (params.world_size == 1) {
|
if (params.world_size == 1) {
|
||||||
master.runSeq();
|
master.runSeq();
|
||||||
} else { /*send work to workers*/
|
} else { /*send work to workers*/
|
||||||
master.runIteration();
|
master.runPar();
|
||||||
}
|
}
|
||||||
sim_a_chemistry = MPI_Wtime();
|
|
||||||
double master_seq_a = MPI_Wtime();
|
double master_seq_a = MPI_Wtime();
|
||||||
// MDL master_iteration_end just writes on disk state_T and
|
// MDL master_iteration_end just writes on disk state_T and
|
||||||
// state_C after every iteration if the cmdline option
|
// state_C after every iteration if the cmdline option
|
||||||
@ -435,8 +438,8 @@ int main(int argc, char *argv[]) {
|
|||||||
// store_result is TRUE)
|
// store_result is TRUE)
|
||||||
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
|
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
|
||||||
|
|
||||||
cummul_transport += sim_a_transport - sim_b_transport;
|
cummul_transport += trans.getTransportTime();
|
||||||
cummul_chemistry += sim_a_chemistry - sim_b_chemistry;
|
cummul_chemistry += master.getChemistryTime();
|
||||||
|
|
||||||
cout << endl
|
cout << endl
|
||||||
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
|
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
|
||||||
@ -457,8 +460,9 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
} // END SIMULATION LOOP
|
} // END SIMULATION LOOP
|
||||||
|
|
||||||
|
cout << "CPP: finished simulation loop" << endl;
|
||||||
|
|
||||||
sim_end = MPI_Wtime();
|
sim_end = MPI_Wtime();
|
||||||
master.finishSimulation();
|
|
||||||
|
|
||||||
Rcpp::NumericVector phreeqc_time;
|
Rcpp::NumericVector phreeqc_time;
|
||||||
Rcpp::NumericVector dht_get_time;
|
Rcpp::NumericVector dht_get_time;
|
||||||
@ -483,6 +487,9 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
double idle_worker_tmp;
|
double idle_worker_tmp;
|
||||||
|
|
||||||
|
cout << "CPP: Advising worker to stop work and collect data from them"
|
||||||
|
<< endl;
|
||||||
|
|
||||||
for (int p = 0; p < params.world_size - 1; p++) {
|
for (int p = 0; p < params.world_size - 1; p++) {
|
||||||
/* ATTENTION Worker p has rank p+1 */
|
/* ATTENTION Worker p has rank p+1 */
|
||||||
/* Send termination message to worker */
|
/* Send termination message to worker */
|
||||||
@ -570,8 +577,8 @@ int main(int argc, char *argv[]) {
|
|||||||
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
|
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
|
||||||
R.parseEval(r_vis_code);
|
R.parseEval(r_vis_code);
|
||||||
} else { /*This is executed by the workers*/
|
} else { /*This is executed by the workers*/
|
||||||
ChemWorker worker(¶ms, R, grid);
|
ChemWorker worker(¶ms, R, grid, dht_comm);
|
||||||
worker.prepareSimulation(dht_comm);
|
// worker.prepareSimulation(dht_comm);
|
||||||
worker.loop();
|
worker.loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,7 +594,7 @@ int main(int argc, char *argv[]) {
|
|||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
free(mpi_buffer);
|
// free(mpi_buffer);
|
||||||
MPI_Finalize();
|
MPI_Finalize();
|
||||||
|
|
||||||
if (params.world_rank == 0) {
|
if (params.world_rank == 0) {
|
||||||
|
|||||||
@ -15,9 +15,32 @@ ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_)
|
|||||||
: ChemSim(params, R_, grid_) {
|
: ChemSim(params, R_, grid_) {
|
||||||
this->wp_size = params->wp_size;
|
this->wp_size = params->wp_size;
|
||||||
this->out_dir = params->out_dir;
|
this->out_dir = params->out_dir;
|
||||||
|
|
||||||
|
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
|
||||||
|
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
|
||||||
|
sizeof(double));
|
||||||
|
|
||||||
|
R.parseEvalQ(
|
||||||
|
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_C), "
|
||||||
|
"package_size=work_package_size)");
|
||||||
|
|
||||||
|
// we only sort once the vector
|
||||||
|
R.parseEvalQ("ordered_ids <- order(wp_ids)");
|
||||||
|
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
|
||||||
|
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
|
||||||
|
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
|
||||||
|
|
||||||
|
mpi_buffer =
|
||||||
|
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ChemMaster::runIteration() {
|
ChemMaster::~ChemMaster() {
|
||||||
|
free(mpi_buffer);
|
||||||
|
free(workerlist);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ChemMaster::runPar() {
|
||||||
|
double chem_a, chem_b;
|
||||||
double seq_a, seq_b, seq_c, seq_d;
|
double seq_a, seq_b, seq_c, seq_d;
|
||||||
double worker_chemistry_a, worker_chemistry_b;
|
double worker_chemistry_a, worker_chemistry_b;
|
||||||
double sim_e_chemistry, sim_f_chemistry;
|
double sim_e_chemistry, sim_f_chemistry;
|
||||||
@ -25,6 +48,8 @@ void ChemMaster::runIteration() {
|
|||||||
int free_workers;
|
int free_workers;
|
||||||
int i_pkgs;
|
int i_pkgs;
|
||||||
|
|
||||||
|
chem_a = MPI_Wtime();
|
||||||
|
|
||||||
seq_a = MPI_Wtime();
|
seq_a = MPI_Wtime();
|
||||||
grid.shuffleAndExport(mpi_buffer);
|
grid.shuffleAndExport(mpi_buffer);
|
||||||
// retrieve data from R runtime
|
// retrieve data from R runtime
|
||||||
@ -68,6 +93,9 @@ void ChemMaster::runIteration() {
|
|||||||
chem_master += sim_f_chemistry - sim_e_chemistry;
|
chem_master += sim_f_chemistry - sim_e_chemistry;
|
||||||
seq_d = MPI_Wtime();
|
seq_d = MPI_Wtime();
|
||||||
seq_t += seq_d - seq_c;
|
seq_t += seq_d - seq_c;
|
||||||
|
|
||||||
|
chem_b = MPI_Wtime();
|
||||||
|
chem_t += chem_b - chem_a;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
|
void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
|
||||||
@ -180,30 +208,6 @@ void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
|
|||||||
/* end visual progress */
|
/* end visual progress */
|
||||||
}
|
}
|
||||||
|
|
||||||
void ChemMaster::prepareSimulation() {
|
|
||||||
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
|
|
||||||
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
|
|
||||||
sizeof(double));
|
|
||||||
|
|
||||||
R.parseEvalQ(
|
|
||||||
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
|
|
||||||
"package_size=work_package_size)");
|
|
||||||
|
|
||||||
// we only sort once the vector
|
|
||||||
R.parseEvalQ("ordered_ids <- order(wp_ids)");
|
|
||||||
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
|
|
||||||
R.parseEval("stat_wp_sizes(wp_sizes_vector)");
|
|
||||||
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
|
|
||||||
|
|
||||||
mpi_buffer =
|
|
||||||
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
|
|
||||||
}
|
|
||||||
|
|
||||||
void ChemMaster::finishSimulation() {
|
|
||||||
free(mpi_buffer);
|
|
||||||
free(workerlist);
|
|
||||||
}
|
|
||||||
|
|
||||||
double ChemMaster::getSendTime() { return this->send_t; }
|
double ChemMaster::getSendTime() { return this->send_t; }
|
||||||
|
|
||||||
double ChemMaster::getRecvTime() { return this->recv_t; }
|
double ChemMaster::getRecvTime() { return this->recv_t; }
|
||||||
|
|||||||
@ -1,9 +1,12 @@
|
|||||||
#include "ChemSim.h"
|
#include "ChemSim.h"
|
||||||
|
|
||||||
|
#include <Rcpp.h>
|
||||||
|
#include <mpi.h>
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
#include "../util/RRuntime.h"
|
#include "../util/RRuntime.h"
|
||||||
#include "Grid.h"
|
#include "Grid.h"
|
||||||
#include <Rcpp.h>
|
|
||||||
#include <iostream>
|
|
||||||
#include <mpi.h>
|
|
||||||
|
|
||||||
using namespace Rcpp;
|
using namespace Rcpp;
|
||||||
using namespace poet;
|
using namespace poet;
|
||||||
@ -17,7 +20,16 @@ ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ChemSim::runSeq() {
|
void ChemSim::runSeq() {
|
||||||
|
double chem_a, chem_b;
|
||||||
|
|
||||||
|
chem_a = MPI_Wtime();
|
||||||
|
|
||||||
R.parseEvalQ(
|
R.parseEvalQ(
|
||||||
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
|
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
|
||||||
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
|
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
|
||||||
|
|
||||||
|
chem_b = MPI_Wtime();
|
||||||
|
chem_t += chem_b - chem_a;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double ChemSim::getChemistryTime() { return this->chem_t; }
|
||||||
|
|||||||
@ -22,7 +22,9 @@ namespace poet {
|
|||||||
class ChemSim {
|
class ChemSim {
|
||||||
public:
|
public:
|
||||||
ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
|
ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
|
||||||
|
|
||||||
void runSeq();
|
void runSeq();
|
||||||
|
double getChemistryTime();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
double current_sim_time = 0;
|
double current_sim_time = 0;
|
||||||
@ -47,16 +49,17 @@ protected:
|
|||||||
worker_struct *workerlist;
|
worker_struct *workerlist;
|
||||||
|
|
||||||
double *mpi_buffer;
|
double *mpi_buffer;
|
||||||
|
|
||||||
|
double chem_t = 0.f;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ChemMaster : public ChemSim {
|
class ChemMaster : public ChemSim {
|
||||||
public:
|
public:
|
||||||
ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);
|
ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);
|
||||||
|
~ChemMaster();
|
||||||
|
|
||||||
void prepareSimulation();
|
void runPar();
|
||||||
void finishSimulation();
|
void profile();
|
||||||
|
|
||||||
void runIteration();
|
|
||||||
|
|
||||||
double getSendTime();
|
double getSendTime();
|
||||||
double getRecvTime();
|
double getRecvTime();
|
||||||
@ -83,9 +86,9 @@ private:
|
|||||||
|
|
||||||
class ChemWorker : public ChemSim {
|
class ChemWorker : public ChemSim {
|
||||||
public:
|
public:
|
||||||
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_);
|
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm);
|
||||||
|
~ChemWorker();
|
||||||
|
|
||||||
void prepareSimulation(MPI_Comm dht_comm);
|
|
||||||
void loop();
|
void loop();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -10,15 +10,15 @@ using namespace poet;
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace Rcpp;
|
using namespace Rcpp;
|
||||||
|
|
||||||
ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_)
|
ChemWorker::ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_,
|
||||||
|
MPI_Comm dht_comm)
|
||||||
: params(params_), ChemSim(params_, R_, grid_) {
|
: params(params_), ChemSim(params_, R_, grid_) {
|
||||||
this->dt_differ = params->dt_differ;
|
this->dt_differ = params->dt_differ;
|
||||||
this->dht_enabled = params->dht_enabled;
|
this->dht_enabled = params->dht_enabled;
|
||||||
this->dht_size_per_process = params->dht_size_per_process;
|
this->dht_size_per_process = params->dht_size_per_process;
|
||||||
this->dht_file = params->dht_file;
|
this->dht_file = params->dht_file;
|
||||||
}
|
this->dht_snaps = params->dht_snaps;
|
||||||
|
|
||||||
void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
|
|
||||||
mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
|
mpi_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
|
||||||
sizeof(double));
|
sizeof(double));
|
||||||
mpi_buffer_results =
|
mpi_buffer_results =
|
||||||
@ -59,6 +59,12 @@ void ChemWorker::prepareSimulation(MPI_Comm dht_comm) {
|
|||||||
timing[2] = 0.0;
|
timing[2] = 0.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ChemWorker::~ChemWorker() {
|
||||||
|
free(mpi_buffer);
|
||||||
|
free(mpi_buffer_results);
|
||||||
|
delete dht;
|
||||||
|
}
|
||||||
|
|
||||||
void ChemWorker::loop() {
|
void ChemWorker::loop() {
|
||||||
MPI_Status probe_status;
|
MPI_Status probe_status;
|
||||||
while (1) {
|
while (1) {
|
||||||
@ -69,11 +75,11 @@ void ChemWorker::loop() {
|
|||||||
if (probe_status.MPI_TAG == TAG_WORK) {
|
if (probe_status.MPI_TAG == TAG_WORK) {
|
||||||
idle_t += idle_b - idle_a;
|
idle_t += idle_b - idle_a;
|
||||||
doWork(probe_status);
|
doWork(probe_status);
|
||||||
|
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
|
||||||
|
postIter();
|
||||||
} else if (probe_status.MPI_TAG == TAG_FINISH) {
|
} else if (probe_status.MPI_TAG == TAG_FINISH) {
|
||||||
finishWork();
|
finishWork();
|
||||||
break;
|
break;
|
||||||
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
|
|
||||||
postIter();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -265,11 +271,12 @@ void ChemWorker::postIter() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ChemWorker::writeFile() {
|
void ChemWorker::writeFile() {
|
||||||
|
cout.flush();
|
||||||
std::stringstream out;
|
std::stringstream out;
|
||||||
out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
|
out << out_dir << "/iter_" << setfill('0') << setw(3) << iteration << ".dht";
|
||||||
int res = dht->tableToFile(out.str().c_str());
|
int res = dht->tableToFile(out.str().c_str());
|
||||||
if (res != DHT_SUCCESS && world_rank == 2)
|
if (res != DHT_SUCCESS && world_rank == 2)
|
||||||
cerr << "CPP: Worker: Errir in writing current state of DHT to file."
|
cerr << "CPP: Worker: Error in writing current state of DHT to file."
|
||||||
<< endl;
|
<< endl;
|
||||||
else if (world_rank == 2)
|
else if (world_rank == 2)
|
||||||
cout << "CPP: Worker: Successfully written DHT to file " << out.str()
|
cout << "CPP: Worker: Successfully written DHT to file " << out.str()
|
||||||
@ -298,9 +305,6 @@ void ChemWorker::readFile() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ChemWorker::finishWork() {
|
void ChemWorker::finishWork() {
|
||||||
if (dht_enabled && dht_snaps > 0) writeFile();
|
|
||||||
|
|
||||||
double dht_perf[3];
|
|
||||||
/* before death, submit profiling/timings to master*/
|
/* before death, submit profiling/timings to master*/
|
||||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
|
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
|
||||||
MPI_STATUS_IGNORE);
|
MPI_STATUS_IGNORE);
|
||||||
@ -313,6 +317,7 @@ void ChemWorker::finishWork() {
|
|||||||
|
|
||||||
if (dht_enabled) {
|
if (dht_enabled) {
|
||||||
// dht_perf
|
// dht_perf
|
||||||
|
double dht_perf[3];
|
||||||
dht_perf[0] = dht->getHits();
|
dht_perf[0] = dht->getHits();
|
||||||
dht_perf[1] = dht->getMisses();
|
dht_perf[1] = dht->getMisses();
|
||||||
dht_perf[2] = dht->getEvictions();
|
dht_perf[2] = dht->getEvictions();
|
||||||
@ -320,7 +325,9 @@ void ChemWorker::finishWork() {
|
|||||||
MPI_COMM_WORLD);
|
MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(mpi_buffer);
|
if (dht_enabled && dht_snaps > 0) writeFile();
|
||||||
free(mpi_buffer_results);
|
|
||||||
delete dht;
|
// free(mpi_buffer);
|
||||||
|
// free(mpi_buffer_results);
|
||||||
|
// delete dht;
|
||||||
}
|
}
|
||||||
|
|||||||
19
src/model/TransportSim.cpp
Normal file
19
src/model/TransportSim.cpp
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
#include "TransportSim.h"
|
||||||
|
|
||||||
|
#include <mpi.h>
|
||||||
|
|
||||||
|
using namespace poet;
|
||||||
|
|
||||||
|
TransportSim::TransportSim(RRuntime &R_) : R(_R) {}
|
||||||
|
|
||||||
|
void TransportSim::runIteration() {
|
||||||
|
double sim_a_transport, sim_b_transport;
|
||||||
|
|
||||||
|
sim_b_transport = MPI_Wtime();
|
||||||
|
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
|
||||||
|
sim_a_transport = MPI_Wtime();
|
||||||
|
|
||||||
|
transport_t += sim_a_transport - sim_b_transport;
|
||||||
|
}
|
||||||
|
|
||||||
|
double TransportSim::getTransportTime { return this->transport_t; }
|
||||||
22
src/model/TransportSim.h
Normal file
22
src/model/TransportSim.h
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
#ifndef TRANSPORT_SIM_H
|
||||||
|
#define TRANSPORT_SIM_H
|
||||||
|
|
||||||
|
#include "../util/RRuntime.h"
|
||||||
|
|
||||||
|
namespace poet {
|
||||||
|
class TransportSim {
|
||||||
|
public:
|
||||||
|
TransportSim(RRuntime &R);
|
||||||
|
|
||||||
|
void runIteration();
|
||||||
|
|
||||||
|
double getTransportTime();
|
||||||
|
|
||||||
|
private:
|
||||||
|
RRuntime &R;
|
||||||
|
|
||||||
|
double transport_t = 0.f;
|
||||||
|
};
|
||||||
|
} // namespace poet
|
||||||
|
|
||||||
|
#endif // TRANSPORT_SIM_H
|
||||||
144
src/util/Parser.cpp
Normal file
144
src/util/Parser.cpp
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
#include "Parser.h"
|
||||||
|
|
||||||
|
#include <Rcpp.h>
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
using namespace poet;
|
||||||
|
using namespace std;
|
||||||
|
using namespace Rcpp;
|
||||||
|
|
||||||
|
Parser::Parser(char *argv[], int world_rank_, int world_size_)
|
||||||
|
: cmdl(argv), world_rank(world_rank_), world_size(world_size_) {
|
||||||
|
this->simparams.world_rank = world_rank_;
|
||||||
|
this->simparams.world_size = world_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
int Parser::parseCmdl() {
|
||||||
|
// if user asked for help
|
||||||
|
if (cmdl[{"help", "h"}]) {
|
||||||
|
if (world_rank == 0) {
|
||||||
|
cout << "Todo" << endl
|
||||||
|
<< "See README.md for further information." << endl;
|
||||||
|
}
|
||||||
|
return PARSER_HELP;
|
||||||
|
}
|
||||||
|
// if positional arguments are missing
|
||||||
|
else if (!cmdl(2)) {
|
||||||
|
if (world_rank == 0) {
|
||||||
|
cerr << "ERROR. Kin needs 2 positional arguments: " << endl
|
||||||
|
<< "1) the R script defining your simulation and" << endl
|
||||||
|
<< "2) the directory prefix where to save results and profiling"
|
||||||
|
<< endl;
|
||||||
|
}
|
||||||
|
return PARSER_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::list<std::string> optionsError = checkOptions(cmdl);
|
||||||
|
if (!optionsError.empty()) {
|
||||||
|
if (world_rank == 0) {
|
||||||
|
cerr << "Unrecognized option(s):\n" << endl;
|
||||||
|
for (auto option : optionsError) {
|
||||||
|
cerr << option << endl;
|
||||||
|
}
|
||||||
|
cerr << "\nMake sure to use available options. Exiting!" << endl;
|
||||||
|
}
|
||||||
|
return PARSER_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*Parse DHT arguments*/
|
||||||
|
simparams.dht_enabled = cmdl["dht"];
|
||||||
|
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
||||||
|
|
||||||
|
if (simparams.dht_enabled) {
|
||||||
|
cmdl("dht-strategy", 0) >> simparams.dht_strategy;
|
||||||
|
// cout << "CPP: DHT strategy is " << dht_strategy << endl;
|
||||||
|
|
||||||
|
cmdl("dht-signif", 5) >> dht_significant_digits;
|
||||||
|
// cout << "CPP: DHT significant digits = " << dht_significant_digits <<
|
||||||
|
// endl;
|
||||||
|
|
||||||
|
simparams.dht_log = !(cmdl["dht-nolog"]);
|
||||||
|
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
|
||||||
|
// : "OFF" ) << endl;
|
||||||
|
|
||||||
|
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> simparams.dht_size_per_process;
|
||||||
|
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
|
||||||
|
// endl;
|
||||||
|
|
||||||
|
cmdl("dht-snaps", 0) >> simparams.dht_snaps;
|
||||||
|
|
||||||
|
cmdl("dht-file") >> simparams.dht_file;
|
||||||
|
}
|
||||||
|
/*Parse work package size*/
|
||||||
|
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
|
||||||
|
|
||||||
|
/*Parse output options*/
|
||||||
|
simparams.store_result = !cmdl["ignore-result"];
|
||||||
|
|
||||||
|
if (world_rank == 0) {
|
||||||
|
cout << "CPP: Complete results storage is "
|
||||||
|
<< (simparams.store_result ? "ON" : "OFF") << endl;
|
||||||
|
cout << "CPP: Work Package Size: " << simparams.wp_size << endl;
|
||||||
|
cout << "CPP: DHT is " << (simparams.dht_enabled ? "ON" : "OFF") << '\n';
|
||||||
|
|
||||||
|
if (simparams.dht_enabled) {
|
||||||
|
cout << "CPP: DHT strategy is " << simparams.dht_strategy << endl;
|
||||||
|
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
|
||||||
|
"defined) = "
|
||||||
|
<< dht_significant_digits << endl;
|
||||||
|
cout << "CPP: DHT logarithm before rounding: "
|
||||||
|
<< (simparams.dht_log ? "ON" : "OFF") << endl;
|
||||||
|
cout << "CPP: DHT size per process (Byte) = "
|
||||||
|
<< simparams.dht_size_per_process << endl;
|
||||||
|
cout << "CPP: DHT save snapshots is " << simparams.dht_snaps << endl;
|
||||||
|
cout << "CPP: DHT load file is " << simparams.dht_file << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cmdl(1) >> simparams.filesim;
|
||||||
|
cmdl(2) >> simparams.out_dir;
|
||||||
|
|
||||||
|
return PARSER_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Parser::parseR(RRuntime &R) {
|
||||||
|
// if local_rank == 0 then master else worker
|
||||||
|
R["local_rank"] = simparams.world_rank;
|
||||||
|
// assign a char* (string) to 'filesim'
|
||||||
|
R["filesim"] = wrap(simparams.filesim);
|
||||||
|
// assign a char* (string) to 'fileout'
|
||||||
|
R["fileout"] = wrap(simparams.out_dir);
|
||||||
|
// pass the boolean "store_result" to the R process
|
||||||
|
R["store_result"] = simparams.store_result;
|
||||||
|
// worker count
|
||||||
|
R["n_procs"] = simparams.world_size - 1;
|
||||||
|
// work package size
|
||||||
|
R["work_package_size"] = simparams.wp_size;
|
||||||
|
// dht enabled?
|
||||||
|
R["dht_enabled"] = simparams.dht_enabled;
|
||||||
|
// log before rounding?
|
||||||
|
R["dht_log"] = simparams.dht_log;
|
||||||
|
|
||||||
|
// eval the init string, ignoring any returns
|
||||||
|
R.parseEvalQ("source(filesim)");
|
||||||
|
}
|
||||||
|
|
||||||
|
t_simparams Parser::getParams() { return this->simparams; }
|
||||||
|
|
||||||
|
std::list<std::string> Parser::checkOptions(argh::parser cmdl) {
|
||||||
|
std::list<std::string> retList;
|
||||||
|
// std::set<std::string> flist = flagList();
|
||||||
|
// std::set<std::string> plist = paramList();
|
||||||
|
|
||||||
|
for (auto &flag : cmdl.flags()) {
|
||||||
|
if (!(flaglist.find(flag) != flaglist.end())) retList.push_back(flag);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto ¶m : cmdl.params()) {
|
||||||
|
if (!(paramlist.find(param.first) != paramlist.end()))
|
||||||
|
retList.push_back(param.first);
|
||||||
|
}
|
||||||
|
|
||||||
|
return retList;
|
||||||
|
}
|
||||||
42
src/util/Parser.h
Normal file
42
src/util/Parser.h
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
#ifndef PARSER_H
|
||||||
|
#define PARSER_H
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "../argh.h"
|
||||||
|
#include "RRuntime.h"
|
||||||
|
#include "SimParams.h"
|
||||||
|
|
||||||
|
#define PARSER_OK 0
|
||||||
|
#define PARSER_ERROR 1
|
||||||
|
#define PARSER_HELP 2
|
||||||
|
|
||||||
|
#define DHT_SIZE_PER_PROCESS 1073741824
|
||||||
|
#define WORK_PACKAGE_SIZE_DEFAULT 5
|
||||||
|
|
||||||
|
namespace poet {
|
||||||
|
class Parser {
|
||||||
|
public:
|
||||||
|
Parser(char *argv[], int world_rank, int world_size);
|
||||||
|
int parseCmdl();
|
||||||
|
void parseR(RRuntime &R);
|
||||||
|
|
||||||
|
t_simparams getParams();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::list<std::string> checkOptions(argh::parser cmdl);
|
||||||
|
std::set<std::string> flaglist{"ignore-result", "dht", "dht-nolog"};
|
||||||
|
std::set<std::string> paramlist{"work-package-size", "dht-signif",
|
||||||
|
"dht-strategy", "dht-size",
|
||||||
|
"dht-snaps", "dht-file"};
|
||||||
|
argh::parser cmdl;
|
||||||
|
|
||||||
|
t_simparams simparams;
|
||||||
|
|
||||||
|
int world_rank;
|
||||||
|
int world_size;
|
||||||
|
|
||||||
|
int dht_significant_digits;
|
||||||
|
};
|
||||||
|
} // namespace poet
|
||||||
|
#endif // PARSER_H
|
||||||
@ -21,8 +21,11 @@ typedef struct {
|
|||||||
|
|
||||||
unsigned int wp_size;
|
unsigned int wp_size;
|
||||||
|
|
||||||
|
std::string filesim;
|
||||||
std::string out_dir;
|
std::string out_dir;
|
||||||
|
|
||||||
|
bool store_result;
|
||||||
|
|
||||||
void* R;
|
void* R;
|
||||||
void* grid;
|
void* grid;
|
||||||
} t_simparams;
|
} t_simparams;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user