mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 04:48:23 +01:00
517 lines
18 KiB
C++
517 lines
18 KiB
C++
/*
|
|
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
|
|
** Potsdam)
|
|
**
|
|
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
|
|
**
|
|
** Copyright (C) 2023-2024 Max Luebke (University of Potsdam)
|
|
**
|
|
** POET is free software; you can redistribute it and/or modify it under the
|
|
** terms of the GNU General Public License as published by the Free Software
|
|
** Foundation; either version 2 of the License, or (at your option) any later
|
|
** version.
|
|
**
|
|
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
|
**
|
|
** You should have received a copy of the GNU General Public License along with
|
|
** this program; if not, write to the Free Software Foundation, Inc., 51
|
|
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include "Base/Macros.hpp"
|
|
#include "Base/RInsidePOET.hpp"
|
|
#include "Chemistry/ChemistryModule.hpp"
|
|
#include "DataStructures/Field.hpp"
|
|
#include "Init/InitialList.hpp"
|
|
#include "Transport/DiffusionModule.hpp"
|
|
|
|
#include <RInside.h>
|
|
#include <Rcpp.h>
|
|
#include <Rcpp/DataFrame.h>
|
|
#include <Rcpp/Function.h>
|
|
#include <Rcpp/vector/instantiation.h>
|
|
#include <cstdlib>
|
|
#include <memory>
|
|
#include <mpi.h>
|
|
#include <optional>
|
|
#include <string>
|
|
|
|
#include "Base/argh.hpp"
|
|
|
|
#include <poet.hpp>
|
|
|
|
using namespace std;
|
|
using namespace poet;
|
|
using namespace Rcpp;
|
|
|
|
static int MY_RANK = 0;
|
|
|
|
static std::unique_ptr<Rcpp::List> global_rt_setup;
|
|
|
|
// we need some layz evaluation, as we can't define the functions before the R
|
|
// runtime is initialized
|
|
static std::optional<Rcpp::Function> master_init_R;
|
|
static std::optional<Rcpp::Function> master_iteration_end_R;
|
|
static std::optional<Rcpp::Function> store_setup_R;
|
|
|
|
static void init_global_functions(RInside &R) {
|
|
R.parseEval(kin_r_library);
|
|
master_init_R = Rcpp::Function("master_init");
|
|
master_iteration_end_R = Rcpp::Function("master_iteration_end");
|
|
store_setup_R = Rcpp::Function("StoreSetup");
|
|
}
|
|
|
|
// HACK: this is a step back as the order and also the count of fields is
|
|
// predefined, but it will change in the future
|
|
// static inline void writeFieldsToR(RInside &R, const Field &trans,
|
|
// const Field &chem) {
|
|
|
|
// Rcpp::DataFrame t_field(trans.asSEXP());
|
|
// R["TMP"] = t_field;
|
|
// R.parseEval("mysetup$state_T <- TMP");
|
|
|
|
// R["TMP"] = chem.asSEXP();
|
|
// R.parseEval("mysetup$state_C <- TMP");
|
|
// }
|
|
|
|
enum ParseRet { PARSER_OK, PARSER_ERROR, PARSER_HELP };
|
|
|
|
ParseRet parseInitValues(char **argv, RuntimeParameters ¶ms) {
|
|
// initialize argh object
|
|
argh::parser cmdl(argv);
|
|
|
|
// if user asked for help
|
|
if (cmdl[{"help", "h"}]) {
|
|
if (MY_RANK == 0) {
|
|
MSG("Todo");
|
|
MSG("See README.md for further information.");
|
|
}
|
|
|
|
return ParseRet::PARSER_HELP;
|
|
}
|
|
// if positional arguments are missing
|
|
if (!cmdl(3)) {
|
|
if (MY_RANK == 0) {
|
|
ERRMSG("POET needs 3 positional arguments: ");
|
|
ERRMSG("1) the R script defining your simulation runtime.");
|
|
ERRMSG("2) the initial .rds file generated by poet_init.");
|
|
ERRMSG("3) the directory prefix where to save results and profiling");
|
|
}
|
|
return ParseRet::PARSER_ERROR;
|
|
}
|
|
|
|
// parse flags and check for unknown
|
|
for (const auto &option : cmdl.flags()) {
|
|
if (!(flaglist.find(option) != flaglist.end())) {
|
|
if (MY_RANK == 0) {
|
|
ERRMSG("Unrecognized option: " + option);
|
|
ERRMSG("Make sure to use available options. Exiting!");
|
|
}
|
|
return ParseRet::PARSER_ERROR;
|
|
}
|
|
}
|
|
|
|
// parse parameters and check for unknown
|
|
for (const auto &option : cmdl.params()) {
|
|
if (!(paramlist.find(option.first) != paramlist.end())) {
|
|
if (MY_RANK == 0) {
|
|
ERRMSG("Unrecognized option: " + option.first);
|
|
ERRMSG("Make sure to use available options. Exiting!");
|
|
}
|
|
return ParseRet::PARSER_ERROR;
|
|
}
|
|
}
|
|
|
|
params.print_progressbar = cmdl[{"P", "progress"}];
|
|
|
|
/*Parse work package size*/
|
|
cmdl("work-package-size", CHEM_DEFAULT_WORK_PACKAGE_SIZE) >>
|
|
params.work_package_size;
|
|
|
|
/* Parse DHT arguments */
|
|
params.use_dht = cmdl["dht"];
|
|
params.use_interp = cmdl["interp"];
|
|
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
|
|
|
|
cmdl("dht-size", CHEM_DHT_SIZE_PER_PROCESS_MB) >> params.dht_size;
|
|
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
|
|
// endl;
|
|
|
|
cmdl("dht-snaps", 0) >> params.dht_snaps;
|
|
|
|
params.use_interp = cmdl["interp"];
|
|
cmdl("interp-size", 100) >> params.interp_size;
|
|
cmdl("interp-min", 5) >> params.interp_min_entries;
|
|
cmdl("interp-bucket-entries", 20) >> params.interp_bucket_entries;
|
|
|
|
params.use_ai_surrogate = cmdl["ai-surrogate"];
|
|
|
|
if (MY_RANK == 0) {
|
|
// MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
|
|
MSG("Work Package Size: " + std::to_string(params.work_package_size));
|
|
MSG("DHT is " + BOOL_PRINT(params.use_dht));
|
|
MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate));
|
|
|
|
if (params.use_dht) {
|
|
// MSG("DHT strategy is " + std::to_string(simparams.dht_strategy));
|
|
// MDL: these should be outdated (?)
|
|
// MSG("DHT key default digits (ignored if 'signif_vector' is "
|
|
// "defined) = "
|
|
// << simparams.dht_significant_digits);
|
|
// MSG("DHT logarithm before rounding: "
|
|
// << (simparams.dht_log ? "ON" : "OFF"));
|
|
MSG("DHT size per process (Megabyte) = " +
|
|
std::to_string(params.dht_size));
|
|
MSG("DHT save snapshots is " + BOOL_PRINT(params.dht_snaps));
|
|
// MSG("DHT load file is " + chem_params.dht_file);
|
|
}
|
|
|
|
if (params.use_interp) {
|
|
MSG("PHT interpolation enabled: " + BOOL_PRINT(params.use_interp));
|
|
MSG("PHT interp-size = " + std::to_string(params.interp_size));
|
|
MSG("PHT interp-min = " + std::to_string(params.interp_min_entries));
|
|
MSG("PHT interp-bucket-entries = " +
|
|
std::to_string(params.interp_bucket_entries));
|
|
}
|
|
}
|
|
|
|
std::string init_file;
|
|
std::string runtime_file;
|
|
|
|
cmdl(1) >> runtime_file;
|
|
cmdl(2) >> init_file;
|
|
cmdl(3) >> params.out_dir;
|
|
|
|
// chem_params.dht_outdir = out_dir;
|
|
|
|
/* distribute information to R runtime */
|
|
// if local_rank == 0 then master else worker
|
|
// R["local_rank"] = MY_RANK;
|
|
// assign a char* (string) to 'filesim'
|
|
// R["filesim"] = wrap(runtime_file);
|
|
// assign a char* (string) to 'fileout'
|
|
// R["fileout"] = wrap(out_dir);
|
|
// pass the boolean "store_result" to the R process
|
|
// R["store_result"] = simparams.store_result;
|
|
// // worker count
|
|
// R["n_procs"] = simparams.world_size - 1;
|
|
// // work package size
|
|
// R["work_package_size"] = simparams.wp_size;
|
|
// // dht enabled?
|
|
// R["dht_enabled"] = chem_params.use_dht;
|
|
// // log before rounding?
|
|
// R["dht_log"] = simparams.dht_log;
|
|
|
|
try {
|
|
Rcpp::Function source("source");
|
|
Rcpp::Function readRDS("readRDS");
|
|
|
|
Rcpp::List init_params_ = readRDS(init_file);
|
|
params.init_params = init_params_;
|
|
|
|
global_rt_setup = std::make_unique<Rcpp::List>();
|
|
*global_rt_setup = source(runtime_file, Rcpp::Named("local", true));
|
|
*global_rt_setup = global_rt_setup->operator[]("value");
|
|
|
|
params.timesteps =
|
|
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
|
|
|
|
} catch (const std::exception &e) {
|
|
ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
|
|
return ParseRet::PARSER_ERROR;
|
|
}
|
|
|
|
return ParseRet::PARSER_OK;
|
|
}
|
|
|
|
// HACK: this is a step back as the order and also the count of fields is
|
|
// predefined, but it will change in the future
|
|
void call_master_iter_end(RInside &R, const Field &trans, const Field &chem) {
|
|
R["TMP"] = Rcpp::wrap(trans.AsVector());
|
|
R["TMP_PROPS"] = Rcpp::wrap(trans.GetProps());
|
|
R.parseEval(std::string("state_T <- setNames(data.frame(matrix(TMP, nrow=" +
|
|
std::to_string(trans.GetRequestedVecSize()) +
|
|
")), TMP_PROPS)"));
|
|
|
|
R["TMP"] = Rcpp::wrap(chem.AsVector());
|
|
R["TMP_PROPS"] = Rcpp::wrap(chem.GetProps());
|
|
R.parseEval(std::string("state_C <- setNames(data.frame(matrix(TMP, nrow=" +
|
|
std::to_string(chem.GetRequestedVecSize()) +
|
|
")), TMP_PROPS)"));
|
|
R["setup"] = *global_rt_setup;
|
|
R.parseEval("setup <- master_iteration_end(setup, state_T, state_C)");
|
|
*global_rt_setup = R["setup"];
|
|
}
|
|
|
|
static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms,
|
|
DiffusionModule &diffusion,
|
|
ChemistryModule &chem) {
|
|
|
|
/* Iteration Count is dynamic, retrieving value from R (is only needed by
|
|
* master for the following loop) */
|
|
uint32_t maxiter = params.timesteps.size();
|
|
|
|
if (params.print_progressbar) {
|
|
chem.setProgressBarPrintout(true);
|
|
}
|
|
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
|
|
|
|
/* SIMULATION LOOP */
|
|
double dSimTime{0};
|
|
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
|
|
double start_t = MPI_Wtime();
|
|
|
|
const double &dt = params.timesteps[iter - 1];
|
|
|
|
// cout << "CPP: Next time step is " << dt << "[s]" << endl;
|
|
MSG("Next time step is " + std::to_string(dt) + " [s]");
|
|
|
|
/* displaying iteration number, with C++ and R iterator */
|
|
MSG("Going through iteration " + std::to_string(iter));
|
|
|
|
/* run transport */
|
|
diffusion.simulate(dt);
|
|
|
|
chem.getField().update(diffusion.getField());
|
|
|
|
MSG("Chemistry step");
|
|
if (params.use_ai_surrogate) {
|
|
double ai_start_t = MPI_Wtime();
|
|
// Save current values from the tug field as predictor for the ai step
|
|
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
|
|
R.parseEval(std::string(
|
|
"predictors <- setNames(data.frame(matrix(TMP, nrow=" +
|
|
std::to_string(chem.getField().GetRequestedVecSize()) + ")), TMP_PROPS)"));
|
|
R.parseEval("predictors <- predictors[ai_surrogate_species]");
|
|
|
|
// Predict
|
|
R.parseEval("predictors_scaled <- preprocess(predictors)");
|
|
|
|
R.parseEval("print('PREDICTORS:')");
|
|
R.parseEval("print(head(predictors))");
|
|
|
|
R.parseEval("prediction <- preprocess(prediction_step(model, predictors_scaled),\
|
|
backtransform = TRUE,\
|
|
outputs = TRUE)");
|
|
|
|
// Validate prediction and write valid predictions to chem field
|
|
R.parseEval("validity_vector <- validate_predictions(predictors,\
|
|
prediction)");
|
|
chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
|
|
|
|
std::vector<std::vector<double>> RTempField = R.parseEval("set_valid_predictions(predictors,\
|
|
prediction,\
|
|
validity_vector)");
|
|
|
|
Field predictions_field = Field(R.parseEval("nrow(predictors)"),
|
|
RTempField,
|
|
R.parseEval("names(predictors)"));
|
|
chem.getField().update(predictions_field);
|
|
double ai_end_t = MPI_Wtime();
|
|
R["ai_prediction_time"] = ai_end_t - ai_start_t;
|
|
}
|
|
|
|
chem.simulate(dt);
|
|
|
|
/* AI surrogate iterative training*/
|
|
if (params.use_ai_surrogate) {
|
|
double ai_start_t = MPI_Wtime();
|
|
|
|
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
|
|
R.parseEval(std::string(
|
|
"targets <- setNames(data.frame(matrix(TMP, nrow=" +
|
|
std::to_string(chem.getField().GetRequestedVecSize()) + ")), TMP_PROPS)"));
|
|
R.parseEval("targets <- targets[ai_surrogate_species]");
|
|
|
|
// TODO: Check how to get the correct columns
|
|
R.parseEval("target_scaled <- preprocess(targets, outputs = TRUE)");
|
|
|
|
R.parseEval("print('TARGET:')");
|
|
R.parseEval("print(head(target_scaled))");
|
|
|
|
R.parseEval("training_step(model, predictors_scaled, target_scaled, validity_vector)");
|
|
double ai_end_t = MPI_Wtime();
|
|
R["ai_training_time"] = ai_end_t - ai_start_t;
|
|
}
|
|
|
|
// MPI_Barrier(MPI_COMM_WORLD);
|
|
double end_t = MPI_Wtime();
|
|
dSimTime += end_t - start_t;
|
|
R["totaltime"] = dSimTime;
|
|
|
|
// MDL master_iteration_end just writes on disk state_T and
|
|
// state_C after every iteration if the cmdline option
|
|
// --ignore-results is not given (and thus the R variable
|
|
// store_result is TRUE)
|
|
call_master_iter_end(R, diffusion.getField(), chem.getField());
|
|
|
|
diffusion.getField().update(chem.getField());
|
|
|
|
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
|
|
std::to_string(maxiter));
|
|
MSG();
|
|
} // END SIMULATION LOOP
|
|
|
|
Rcpp::List chem_profiling;
|
|
chem_profiling["simtime"] = chem.GetChemistryTime();
|
|
chem_profiling["loop"] = chem.GetMasterLoopTime();
|
|
chem_profiling["sequential"] = chem.GetMasterSequentialTime();
|
|
chem_profiling["idle_master"] = chem.GetMasterIdleTime();
|
|
chem_profiling["idle_worker"] = Rcpp::wrap(chem.GetWorkerIdleTimings());
|
|
chem_profiling["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings());
|
|
|
|
Rcpp::List diffusion_profiling;
|
|
diffusion_profiling["simtime"] = diffusion.getTransportTime();
|
|
|
|
if (params.use_dht) {
|
|
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
|
|
chem_profiling["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
|
|
chem_profiling["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
|
|
chem_profiling["dht_fill_time"] =
|
|
Rcpp::wrap(chem.GetWorkerDHTFillTimings());
|
|
}
|
|
|
|
if (params.use_interp) {
|
|
chem_profiling["interp_w"] =
|
|
Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
|
|
chem_profiling["interp_r"] =
|
|
Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
|
|
chem_profiling["interp_g"] =
|
|
Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
|
|
chem_profiling["interp_fc"] =
|
|
Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
|
|
chem_profiling["interp_calls"] =
|
|
Rcpp::wrap(chem.GetWorkerInterpolationCalls());
|
|
chem_profiling["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
|
|
}
|
|
|
|
Rcpp::List profiling;
|
|
profiling["simtime"] = dSimTime;
|
|
profiling["chemistry"] = chem_profiling;
|
|
profiling["diffusion"] = diffusion_profiling;
|
|
|
|
chem.MasterLoopBreak();
|
|
|
|
return profiling;
|
|
}
|
|
|
|
int main(int argc, char *argv[]) {
|
|
int world_size;
|
|
|
|
MPI_Init(&argc, &argv);
|
|
|
|
{
|
|
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
|
MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK);
|
|
|
|
RInsidePOET &R = RInsidePOET::getInstance();
|
|
|
|
if (MY_RANK == 0) {
|
|
MSG("Running POET version " + std::string(poet_version));
|
|
}
|
|
|
|
RuntimeParameters run_params;
|
|
|
|
switch (parseInitValues(argv, run_params)) {
|
|
case ParseRet::PARSER_ERROR:
|
|
case ParseRet::PARSER_HELP:
|
|
MPI_Finalize();
|
|
return 0;
|
|
case ParseRet::PARSER_OK:
|
|
break;
|
|
}
|
|
|
|
InitialList init_list(R);
|
|
init_list.importList(run_params.init_params, MY_RANK != 0);
|
|
|
|
MSG("RInside initialized on process " + std::to_string(MY_RANK));
|
|
|
|
std::cout << std::flush;
|
|
|
|
MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
ChemistryModule chemistry(run_params.work_package_size,
|
|
init_list.getChemistryInit(), MPI_COMM_WORLD);
|
|
|
|
const ChemistryModule::SurrogateSetup surr_setup = {
|
|
init_list.getInitialGrid().GetProps(),
|
|
run_params.use_dht,
|
|
run_params.dht_size,
|
|
run_params.use_interp,
|
|
run_params.interp_bucket_entries,
|
|
run_params.interp_size,
|
|
run_params.interp_min_entries,
|
|
run_params.use_ai_surrogate};
|
|
|
|
chemistry.masterEnableSurrogates(surr_setup);
|
|
|
|
if (MY_RANK > 0) {
|
|
chemistry.WorkerLoop();
|
|
} else {
|
|
init_global_functions(R);
|
|
// R.parseEvalQ("mysetup <- setup");
|
|
// // if (MY_RANK == 0) { // get timestep vector from
|
|
// // grid_init function ... //
|
|
*global_rt_setup =
|
|
master_init_R.value()(*global_rt_setup, run_params.out_dir,
|
|
init_list.getInitialGrid().asSEXP());
|
|
|
|
// MDL: store all parameters
|
|
// MSG("Calling R Function to store calling parameters");
|
|
// R.parseEvalQ("StoreSetup(setup=mysetup)");
|
|
if (run_params.use_ai_surrogate) {
|
|
/* Incorporate ai surrogate from R */
|
|
R.parseEvalQ(ai_surrogate_r_library);
|
|
/* Use dht species for model input and output */
|
|
R["ai_surrogate_species"] = init_list.getChemistryInit().dht_species.getNames();
|
|
R["out_dir"] = run_params.out_dir;
|
|
|
|
const std::string ai_surrogate_input_script_path = init_list.getChemistryInit().ai_surrogate_input_script;
|
|
|
|
if (!ai_surrogate_input_script_path.empty()) {
|
|
R["ai_surrogate_base_path"] = ai_surrogate_input_script_path.substr(0, ai_surrogate_input_script_path.find_last_of('/') + 1);
|
|
R.parseEvalQ("source('" + ai_surrogate_input_script_path + "')");
|
|
}
|
|
R.parseEval("model <- initiate_model()");
|
|
R.parseEval("gpu_info()");
|
|
}
|
|
|
|
MSG("Init done on process with rank " + std::to_string(MY_RANK));
|
|
|
|
// MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
DiffusionModule diffusion(init_list.getDiffusionInit(),
|
|
init_list.getInitialGrid());
|
|
|
|
chemistry.masterSetField(init_list.getInitialGrid());
|
|
|
|
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
|
|
|
|
MSG("finished simulation loop");
|
|
|
|
R["profiling"] = profiling;
|
|
R["setup"] = *global_rt_setup;
|
|
|
|
string r_vis_code;
|
|
r_vis_code =
|
|
"saveRDS(profiling, file=paste0(setup$out_dir,'/timings.rds'));";
|
|
R.parseEval(r_vis_code);
|
|
|
|
MSG("Done! Results are stored as R objects into <" + run_params.out_dir +
|
|
"/timings.rds>");
|
|
}
|
|
}
|
|
|
|
MSG("finished, cleanup of process " + std::to_string(MY_RANK));
|
|
|
|
MPI_Finalize();
|
|
|
|
if (MY_RANK == 0) {
|
|
MSG("done, bye!");
|
|
}
|
|
|
|
exit(0);
|
|
}
|