poet/src/poet.cpp

520 lines
16 KiB
C++

/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** Copyright (C) 2023-2024 Max Luebke (University of Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "Base/Macros.hpp"
#include "Base/RInsidePOET.hpp"
// #include "Chemistry/ChemistryModule.hpp"
#include "Chemistry/ChemistryModule.hpp"
#include "DataStructures/Field.hpp"
#include "Init/InitialList.hpp"
#include "Transport/DiffusionModule.hpp"
#include <RInside.h>
#include <R_ext/Boolean.h>
#include <Rcpp/DataFrame.h>
#include <Rcpp/vector/instantiation.h>
#include <poet.hpp>
#include <Rcpp.h>
#include <cstdlib>
#include <string>
#include <mpi.h>
#include "Base/argh.hpp"
using namespace std;
using namespace poet;
using namespace Rcpp;
static int MY_RANK = 0;
// HACK: this is a step back as the order and also the count of fields is
// predefined, but it will change in the future
void writeFieldsToR(RInside &R, const Field &trans, const Field &chem) {
Rcpp::DataFrame t_field(trans.asSEXP());
R["TMP"] = t_field;
R.parseEval("mysetup$state_T <- TMP");
R["TMP"] = chem.asSEXP();
R.parseEval("mysetup$state_C <- TMP");
}
enum ParseRet { PARSER_OK, PARSER_ERROR, PARSER_HELP };
ParseRet parseInitValues(char **argv, RInsidePOET &R,
RuntimeParameters &params) {
// initialize argh object
argh::parser cmdl(argv);
// if user asked for help
if (cmdl[{"help", "h"}]) {
if (MY_RANK == 0) {
MSG("Todo");
MSG("See README.md for further information.");
}
return ParseRet::PARSER_HELP;
}
// if positional arguments are missing
if (!cmdl(3)) {
if (MY_RANK == 0) {
ERRMSG("POET needs 3 positional arguments: ");
ERRMSG("1) the R script defining your simulation runtime.");
ERRMSG("2) the initial .rds file generated by poet_init.");
ERRMSG("3) the directory prefix where to save results and profiling");
}
return ParseRet::PARSER_ERROR;
}
// parse flags and check for unknown
for (const auto &option : cmdl.flags()) {
if (!(flaglist.find(option) != flaglist.end())) {
if (MY_RANK == 0) {
ERRMSG("Unrecognized option: " + option);
ERRMSG("Make sure to use available options. Exiting!");
}
return ParseRet::PARSER_ERROR;
}
}
// parse parameters and check for unknown
for (const auto &option : cmdl.params()) {
if (!(paramlist.find(option.first) != paramlist.end())) {
if (MY_RANK == 0) {
ERRMSG("Unrecognized option: " + option.first);
ERRMSG("Make sure to use available options. Exiting!");
}
return ParseRet::PARSER_ERROR;
}
}
// simparams.print_progressbar = cmdl[{"P", "progress"}];
// // simparams.print_progressbar = cmdl[{"P", "progress"}];
// /* Parse DHT arguments */
// chem_params.use_dht = cmdl["dht"];
// chem_params.use_interp = cmdl["interp"];
// // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
// cmdl("dht-size", DHT_SIZE_PER_PROCESS_MB) >> chem_params.dht_size;
// // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
// // endl;
// cmdl("dht-snaps", 0) >> chem_params.dht_snaps;
// cmdl("dht-file") >> chem_params.dht_file;
// /*Parse work package size*/
// cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
// cmdl("interp-size", 100) >> chem_params.pht_size;
// cmdl("interp-min", 5) >> chem_params.interp_min_entries;
// cmdl("interp-bucket-entries", 20) >> chem_params.pht_max_entries;
// /*Parse output options*/
// simparams.store_result = !cmdl["ignore-result"];
// /*Parse work package size*/
// cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
// chem_params.use_interp = cmdl["interp"];
// cmdl("interp-size", 100) >> chem_params.pht_size;
// cmdl("interp-min", 5) >> chem_params.interp_min_entries;
// cmdl("interp-bucket-entries", 20) >> chem_params.pht_max_entries;
// /*Parse output options*/
// simparams.store_result = !cmdl["ignore-result"];
// if (simparams.world_rank == 0) {
// MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
// MSG("Work Package Size: " + std::to_string(simparams.wp_size));
// MSG("DHT is " + BOOL_PRINT(chem_params.use_dht));
// if (chem_params.use_dht) {
// MSG("DHT strategy is " + std::to_string(simparams.dht_strategy));
// // MDL: these should be outdated (?)
// // MSG("DHT key default digits (ignored if 'signif_vector' is "
// // "defined) = "
// // << simparams.dht_significant_digits);
// // MSG("DHT logarithm before rounding: "
// // << (simparams.dht_log ? "ON" : "OFF"));
// MSG("DHT size per process (Megabyte) = " +
// std::to_string(chem_params.dht_size));
// MSG("DHT save snapshots is " + BOOL_PRINT(chem_params.dht_snaps));
// MSG("DHT load file is " + chem_params.dht_file);
// }
// if (chem_params.use_interp) {
// MSG("PHT interpolation enabled: " +
// BOOL_PRINT(chem_params.use_interp)); MSG("PHT interp-size = " +
// std::to_string(chem_params.pht_size)); MSG("PHT interp-min = " +
// std::to_string(chem_params.interp_min_entries));
// MSG("PHT interp-bucket-entries = " +
// std::to_string(chem_params.pht_max_entries));
// }
// }
std::string init_file;
std::string runtime_file;
std::string out_dir;
cmdl(1) >> runtime_file;
cmdl(2) >> init_file;
cmdl(3) >> out_dir;
// chem_params.dht_outdir = out_dir;
/* distribute information to R runtime */
// if local_rank == 0 then master else worker
R["local_rank"] = MY_RANK;
// assign a char* (string) to 'filesim'
R["filesim"] = wrap(runtime_file);
// assign a char* (string) to 'fileout'
R["fileout"] = wrap(out_dir);
// pass the boolean "store_result" to the R process
// R["store_result"] = simparams.store_result;
// // worker count
// R["n_procs"] = simparams.world_size - 1;
// // work package size
// R["work_package_size"] = simparams.wp_size;
// // dht enabled?
// R["dht_enabled"] = chem_params.use_dht;
// // log before rounding?
// R["dht_log"] = simparams.dht_log;
try {
Rcpp::Function source("source");
Rcpp::Function readRDS("readRDS");
Rcpp::List init_params_ = readRDS(init_file);
params.init_params = init_params_;
Rcpp::List runtime_params =
source(runtime_file, Rcpp::Named("local", true));
runtime_params = runtime_params["value"];
R[r_runtime_parameters] = runtime_params;
params.timesteps =
Rcpp::as<std::vector<double>>(runtime_params["timesteps"]);
} catch (const std::exception &e) {
ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
return ParseRet::PARSER_ERROR;
}
// eval the init string, ignoring any returns
// R.parseEvalQ("source(filesim)");
// R.parseEvalQ("mysetup <- setup");
// this->chem_params.initFromR(R);
return ParseRet::PARSER_OK;
}
// void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
// const std::string &database_path) {
// chem.SetErrorHandlerMode(1);
// chem.SetComponentH2O(false);
// chem.SetRebalanceFraction(0.5);
// chem.SetRebalanceByCell(true);
// chem.UseSolutionDensityVolume(false);
// chem.SetPartitionUZSolids(false);
// // Set concentration units
// // 1, mg/L; 2, mol/L; 3, kg/kgs
// chem.SetUnitsSolution(2);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsPPassemblage(1);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsExchange(1);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsSurface(1);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsGasPhase(1);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsSSassemblage(1);
// // 0, mol/L cell; 1, mol/L water; 2 mol/L rock
// chem.SetUnitsKinetics(1);
// // Set representative volume
// std::vector<double> rv;
// rv.resize(wp_size, 1.0);
// chem.SetRepresentativeVolume(rv);
// // Set initial porosity
// std::vector<double> por;
// por.resize(wp_size, 1);
// chem.SetPorosity(por);
// // Set initial saturation
// std::vector<double> sat;
// sat.resize(wp_size, 1.0);
// chem.SetSaturation(sat);
// // Load database
// chem.LoadDatabase(database_path);
// }
static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters &params,
DiffusionModule &diffusion,
ChemistryModule &chem) {
/* Iteration Count is dynamic, retrieving value from R (is only needed by
* master for the following loop) */
uint32_t maxiter = params.timesteps.size();
double sim_time = .0;
// ChemistryModule chem(nxyz_master, params.getNumParams().wp_size, maxiter,
// params.getChemParams(), MPI_COMM_WORLD);
// set_chem_parameters(chem, nxyz_master,
// params.getChemParams().database_path);
// chem.RunInitFile(params.getChemParams().input_script);
// poet::ChemistryModule::SingleCMap init_df =
// DFToHashMap(d_params.initial_t);
// chem.initializeField(diffusion.getField());
// if (params.getNumParams().print_progressbar) {
chem.setProgressBarPrintout(true);
// }
/* SIMULATION LOOP */
double dSimTime{0};
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
double start_t = MPI_Wtime();
uint32_t tick = 0;
// cout << "CPP: Evaluating next time step" << endl;
// R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
const double &dt = params.timesteps[iter - 1];
// cout << "CPP: Next time step is " << dt << "[s]" << endl;
MSG("Next time step is " + std::to_string(dt) + " [s]");
/* displaying iteration number, with C++ and R iterator */
MSG("Going through iteration " + std::to_string(iter));
// MSG("R's $iter: " +
// std::to_string((uint32_t)(R.parseEval("mysetup$iter"))) +
// ". Iteration");
/* run transport */
// TODO: transport to diffusion
diffusion.simulate(dt);
chem.GetField().update(diffusion.getField());
// chem.getfield().update(diffusion.getfield());
MSG("Chemistry step");
chem.simulate(dt);
writeFieldsToR(R, diffusion.getField(), chem.GetField());
// R["store_result"] = true;
// R.parseEval("mysetup$store_result <- TRUE");
diffusion.getField().update(chem.GetField());
R["req_dt"] = dt;
R["simtime"] = (sim_time += dt);
R.parseEval("mysetup$req_dt <- req_dt");
R.parseEval("mysetup$simtime <- simtime");
R["iter"] = iter;
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEval("mysetup <- master_iteration_end(setup=mysetup, iter)");
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
std::to_string(maxiter));
MSG();
// MPI_Barrier(MPI_COMM_WORLD);
double end_t = MPI_Wtime();
dSimTime += end_t - start_t;
} // END SIMULATION LOOP
Rcpp::List chem_profiling;
chem_profiling["simtime"] = chem.GetChemistryTime();
chem_profiling["loop"] = chem.GetMasterLoopTime();
chem_profiling["sequential"] = chem.GetMasterSequentialTime();
chem_profiling["idle_master"] = chem.GetMasterIdleTime();
chem_profiling["idle_worker"] = Rcpp::wrap(chem.GetWorkerIdleTimings());
chem_profiling["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings());
Rcpp::List diffusion_profiling;
diffusion_profiling["simtime"] = diffusion.getTransportTime();
Rcpp::List profiling;
profiling["simtime"] = dSimTime;
profiling["chemistry"] = chem_profiling;
profiling["diffusion"] = diffusion_profiling;
// if (params.getChemParams().use_dht) {
// R["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
// R.parseEvalQ("profiling$dht_hits <- dht_hits");
// R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
// R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
// R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
// R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
// R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
// R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
// }
// if (params.getChemParams().use_interp) {
// R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
// R.parseEvalQ("profiling$interp_write <- interp_w");
// R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
// R.parseEvalQ("profiling$interp_read <- interp_r");
// R["interp_g"] =
// Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
// R.parseEvalQ("profiling$interp_gather <- interp_g");
// R["interp_fc"] =
// Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
// R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
// R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
// R.parseEvalQ("profiling$interp_calls <- interp_calls");
// R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
// R.parseEvalQ("profiling$interp_cached <- interp_cached");
// }
chem.MasterLoopBreak();
return profiling;
}
int main(int argc, char *argv[]) {
double dSimTime, sim_end;
int world_size, world_rank;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
RInsidePOET &R = RInsidePOET::getInstance();
if (world_rank == 0) {
MSG("Running POET version " + std::string(poet_version));
}
/*Loading Dependencies*/
// TODO: kann raus
R.parseEvalQ(kin_r_library);
RuntimeParameters run_params;
switch (parseInitValues(argv, R, run_params)) {
case ParseRet::PARSER_ERROR:
case ParseRet::PARSER_HELP:
MPI_Finalize();
return 0;
case ParseRet::PARSER_OK:
break;
}
InitialList init_list(R);
init_list.importList(run_params.init_params);
MSG("RInside initialized on process " + std::to_string(world_rank));
if (world_rank > 0) {
ChemistryModule worker(1, init_list.getChemistryInit(), MPI_COMM_WORLD);
worker.WorkerLoop();
MPI_Barrier(MPI_COMM_WORLD);
MSG("finished, cleanup of process " + std::to_string(world_rank));
MPI_Finalize();
return EXIT_SUCCESS;
}
// R.parseEvalQ("mysetup <- setup");
// // if (world_rank == 0) { // get timestep vector from
// // grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=mysetup)";
R.parseEval(master_init_code);
// run_params.initVectorParams(R);
// MDL: store all parameters
if (world_rank == 0) {
MSG("Calling R Function to store calling parameters");
// R.parseEvalQ("StoreSetup(setup=mysetup)");
}
if (world_rank == 0) {
MSG("Init done on process with rank " + std::to_string(world_rank));
}
// MPI_Barrier(MPI_COMM_WORLD);
DiffusionModule diffusion(init_list.getDiffusionInit(),
init_list.getInitialGrid());
ChemistryModule chemistry(1, init_list.getChemistryInit(), MPI_COMM_WORLD);
chemistry.masterSetField(init_list.getInitialGrid());
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
MSG("finished simulation loop");
MSG("start timing profiling");
// R["simtime"] = dSimTime;
// R.parseEvalQ("profiling$simtime <- simtime");
R["profiling"] = profiling;
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
// MSG("Done! Results are stored as R objects into <" + run_params.getOutDir()
// +
// "/timings.rds>");
MPI_Barrier(MPI_COMM_WORLD);
MSG("finished, cleanup of process " + std::to_string(world_rank));
MPI_Finalize();
if (world_rank == 0) {
MSG("done, bye!");
}
exit(0);
}