/* ** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of ** Potsdam) ** ** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam) ** ** Copyright (C) 2023-2024 Max Luebke (University of Potsdam) ** ** POET is free software; you can redistribute it and/or modify it under the ** terms of the GNU General Public License as published by the Free Software ** Foundation; either version 2 of the License, or (at your option) any later ** version. ** ** POET is distributed in the hope that it will be useful, but WITHOUT ANY ** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR ** A PARTICULAR PURPOSE. See the GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License along with ** this program; if not, write to the Free Software Foundation, Inc., 51 ** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "Base/Macros.hpp" #include "Base/RInsidePOET.hpp" #include "Chemistry/ChemistryModule.hpp" #include "DataStructures/Field.hpp" #include "Init/InitialList.hpp" #include "Transport/DiffusionModule.hpp" #include #include #include #include #include #include "Base/argh.hpp" #include using namespace std; using namespace poet; using namespace Rcpp; static int MY_RANK = 0; // HACK: this is a step back as the order and also the count of fields is // predefined, but it will change in the future void writeFieldsToR(RInside &R, const Field &trans, const Field &chem) { Rcpp::DataFrame t_field(trans.asSEXP()); R["TMP"] = t_field; R.parseEval("mysetup$state_T <- TMP"); R["TMP"] = chem.asSEXP(); R.parseEval("mysetup$state_C <- TMP"); } enum ParseRet { PARSER_OK, PARSER_ERROR, PARSER_HELP }; ParseRet parseInitValues(char **argv, RInsidePOET &R, RuntimeParameters ¶ms) { // initialize argh object argh::parser cmdl(argv); // if user asked for help if (cmdl[{"help", "h"}]) { if (MY_RANK == 0) { MSG("Todo"); MSG("See README.md for further information."); } return ParseRet::PARSER_HELP; } // if positional arguments are missing if (!cmdl(3)) { if (MY_RANK == 0) { ERRMSG("POET needs 3 positional arguments: "); ERRMSG("1) the R script defining your simulation runtime."); ERRMSG("2) the initial .rds file generated by poet_init."); ERRMSG("3) the directory prefix where to save results and profiling"); } return ParseRet::PARSER_ERROR; } // parse flags and check for unknown for (const auto &option : cmdl.flags()) { if (!(flaglist.find(option) != flaglist.end())) { if (MY_RANK == 0) { ERRMSG("Unrecognized option: " + option); ERRMSG("Make sure to use available options. Exiting!"); } return ParseRet::PARSER_ERROR; } } // parse parameters and check for unknown for (const auto &option : cmdl.params()) { if (!(paramlist.find(option.first) != paramlist.end())) { if (MY_RANK == 0) { ERRMSG("Unrecognized option: " + option.first); ERRMSG("Make sure to use available options. Exiting!"); } return ParseRet::PARSER_ERROR; } } params.print_progressbar = cmdl[{"P", "progress"}]; /*Parse work package size*/ cmdl("work-package-size", CHEM_DEFAULT_WORK_PACKAGE_SIZE) >> params.work_package_size; /* Parse DHT arguments */ params.use_dht = cmdl["dht"]; params.use_interp = cmdl["interp"]; // cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; cmdl("dht-size", CHEM_DHT_SIZE_PER_PROCESS_MB) >> params.dht_size; // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << // endl; cmdl("dht-snaps", 0) >> params.dht_snaps; params.use_interp = cmdl["interp"]; cmdl("interp-size", 100) >> params.interp_size; cmdl("interp-min", 5) >> params.interp_min_entries; cmdl("interp-bucket-entries", 20) >> params.interp_bucket_entries; /*Parse output options*/ // simparams.store_result = !cmdl["ignore-result"]; /*Parse output options*/ // simparams.store_result = !cmdl["ignore-result"]; if (MY_RANK == 0) { // MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result)); MSG("Work Package Size: " + std::to_string(params.work_package_size)); MSG("DHT is " + BOOL_PRINT(params.use_dht)); if (params.use_dht) { // MSG("DHT strategy is " + std::to_string(simparams.dht_strategy)); // MDL: these should be outdated (?) // MSG("DHT key default digits (ignored if 'signif_vector' is " // "defined) = " // << simparams.dht_significant_digits); // MSG("DHT logarithm before rounding: " // << (simparams.dht_log ? "ON" : "OFF")); MSG("DHT size per process (Megabyte) = " + std::to_string(params.dht_size)); MSG("DHT save snapshots is " + BOOL_PRINT(params.dht_snaps)); // MSG("DHT load file is " + chem_params.dht_file); } if (params.use_interp) { MSG("PHT interpolation enabled: " + BOOL_PRINT(params.use_interp)); MSG("PHT interp-size = " + std::to_string(params.interp_size)); MSG("PHT interp-min = " + std::to_string(params.interp_min_entries)); MSG("PHT interp-bucket-entries = " + std::to_string(params.interp_bucket_entries)); } } std::string init_file; std::string runtime_file; std::string out_dir; cmdl(1) >> runtime_file; cmdl(2) >> init_file; cmdl(3) >> out_dir; // chem_params.dht_outdir = out_dir; /* distribute information to R runtime */ // if local_rank == 0 then master else worker R["local_rank"] = MY_RANK; // assign a char* (string) to 'filesim' R["filesim"] = wrap(runtime_file); // assign a char* (string) to 'fileout' R["fileout"] = wrap(out_dir); // pass the boolean "store_result" to the R process // R["store_result"] = simparams.store_result; // // worker count // R["n_procs"] = simparams.world_size - 1; // // work package size // R["work_package_size"] = simparams.wp_size; // // dht enabled? // R["dht_enabled"] = chem_params.use_dht; // // log before rounding? // R["dht_log"] = simparams.dht_log; try { Rcpp::Function source("source"); Rcpp::Function readRDS("readRDS"); Rcpp::List init_params_ = readRDS(init_file); params.init_params = init_params_; Rcpp::List runtime_params = source(runtime_file, Rcpp::Named("local", true)); runtime_params = runtime_params["value"]; R[r_runtime_parameters] = runtime_params; params.timesteps = Rcpp::as>(runtime_params["timesteps"]); } catch (const std::exception &e) { ERRMSG("Error while parsing R scripts: " + std::string(e.what())); return ParseRet::PARSER_ERROR; } // eval the init string, ignoring any returns // R.parseEvalQ("source(filesim)"); // R.parseEvalQ("mysetup <- setup"); // this->chem_params.initFromR(R); return ParseRet::PARSER_OK; } static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters ¶ms, DiffusionModule &diffusion, ChemistryModule &chem) { /* Iteration Count is dynamic, retrieving value from R (is only needed by * master for the following loop) */ uint32_t maxiter = params.timesteps.size(); double sim_time = .0; if (params.print_progressbar) { chem.setProgressBarPrintout(true); } /* SIMULATION LOOP */ double dSimTime{0}; for (uint32_t iter = 1; iter < maxiter + 1; iter++) { double start_t = MPI_Wtime(); uint32_t tick = 0; // cout << "CPP: Evaluating next time step" << endl; // R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)"); const double &dt = params.timesteps[iter - 1]; // cout << "CPP: Next time step is " << dt << "[s]" << endl; MSG("Next time step is " + std::to_string(dt) + " [s]"); /* displaying iteration number, with C++ and R iterator */ MSG("Going through iteration " + std::to_string(iter)); // MSG("R's $iter: " + // std::to_string((uint32_t)(R.parseEval("mysetup$iter"))) + // ". Iteration"); /* run transport */ // TODO: transport to diffusion diffusion.simulate(dt); chem.GetField().update(diffusion.getField()); // chem.getfield().update(diffusion.getfield()); MSG("Chemistry step"); chem.simulate(dt); writeFieldsToR(R, diffusion.getField(), chem.GetField()); // R["store_result"] = true; // R.parseEval("mysetup$store_result <- TRUE"); diffusion.getField().update(chem.GetField()); R["req_dt"] = dt; R["simtime"] = (sim_time += dt); R.parseEval("mysetup$req_dt <- req_dt"); R.parseEval("mysetup$simtime <- simtime"); R["iter"] = iter; // MDL master_iteration_end just writes on disk state_T and // state_C after every iteration if the cmdline option // --ignore-results is not given (and thus the R variable // store_result is TRUE) R.parseEval("mysetup <- master_iteration_end(setup=mysetup, iter)"); MSG("End of *coupling* iteration " + std::to_string(iter) + "/" + std::to_string(maxiter)); MSG(); // MPI_Barrier(MPI_COMM_WORLD); double end_t = MPI_Wtime(); dSimTime += end_t - start_t; } // END SIMULATION LOOP Rcpp::List chem_profiling; chem_profiling["simtime"] = chem.GetChemistryTime(); chem_profiling["loop"] = chem.GetMasterLoopTime(); chem_profiling["sequential"] = chem.GetMasterSequentialTime(); chem_profiling["idle_master"] = chem.GetMasterIdleTime(); chem_profiling["idle_worker"] = Rcpp::wrap(chem.GetWorkerIdleTimings()); chem_profiling["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings()); Rcpp::List diffusion_profiling; diffusion_profiling["simtime"] = diffusion.getTransportTime(); if (params.use_dht) { chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits()); chem_profiling["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions()); chem_profiling["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings()); chem_profiling["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings()); } if (params.use_interp) { chem_profiling["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings()); chem_profiling["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings()); chem_profiling["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings()); chem_profiling["interp_fc"] = Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings()); chem_profiling["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls()); chem_profiling["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits()); } Rcpp::List profiling; profiling["simtime"] = dSimTime; profiling["chemistry"] = chem_profiling; profiling["diffusion"] = diffusion_profiling; chem.MasterLoopBreak(); return profiling; } int main(int argc, char *argv[]) { double dSimTime, sim_end; int world_size; MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK); RInsidePOET &R = RInsidePOET::getInstance(); if (MY_RANK == 0) { MSG("Running POET version " + std::string(poet_version)); } /*Loading Dependencies*/ // TODO: kann raus R.parseEvalQ(kin_r_library); RuntimeParameters run_params; switch (parseInitValues(argv, R, run_params)) { case ParseRet::PARSER_ERROR: case ParseRet::PARSER_HELP: MPI_Finalize(); return 0; case ParseRet::PARSER_OK: break; } InitialList init_list(R); init_list.importList(run_params.init_params); MSG("RInside initialized on process " + std::to_string(MY_RANK)); ChemistryModule chemistry(run_params.work_package_size, init_list.getChemistryInit(), MPI_COMM_WORLD); const ChemistryModule::SurrogateSetup surr_setup = { init_list.getInitialGrid().GetProps(), run_params.use_dht, run_params.dht_size, run_params.use_interp, run_params.interp_bucket_entries, run_params.interp_size, run_params.interp_min_entries}; chemistry.masterEnableSurrogates(surr_setup); if (MY_RANK > 0) { chemistry.WorkerLoop(); MPI_Barrier(MPI_COMM_WORLD); MSG("finished, cleanup of process " + std::to_string(MY_RANK)); MPI_Finalize(); return EXIT_SUCCESS; } // R.parseEvalQ("mysetup <- setup"); // // if (MY_RANK == 0) { // get timestep vector from // // grid_init function ... // std::string master_init_code = "mysetup <- master_init(setup=mysetup)"; R.parseEval(master_init_code); // run_params.initVectorParams(R); // MDL: store all parameters if (MY_RANK == 0) { MSG("Calling R Function to store calling parameters"); // R.parseEvalQ("StoreSetup(setup=mysetup)"); } if (MY_RANK == 0) { MSG("Init done on process with rank " + std::to_string(MY_RANK)); } // MPI_Barrier(MPI_COMM_WORLD); DiffusionModule diffusion(init_list.getDiffusionInit(), init_list.getInitialGrid()); chemistry.masterSetField(init_list.getInitialGrid()); Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry); MSG("finished simulation loop"); MSG("start timing profiling"); // R["simtime"] = dSimTime; // R.parseEvalQ("profiling$simtime <- simtime"); R["profiling"] = profiling; string r_vis_code; r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));"; R.parseEval(r_vis_code); // MSG("Done! Results are stored as R objects into <" + run_params.getOutDir() // + // "/timings.rds>"); MPI_Barrier(MPI_COMM_WORLD); MSG("finished, cleanup of process " + std::to_string(MY_RANK)); MPI_Finalize(); if (MY_RANK == 0) { MSG("done, bye!"); } exit(0); }