From 5ce40617b8b9b980338c4bbc9ab28c13e5f7671f Mon Sep 17 00:00:00 2001 From: Max Luebke Date: Wed, 3 Apr 2024 21:03:10 +0000 Subject: [PATCH] Refactor R functions and how they are called --- R_lib/kin_r_library.R | 188 +++++------------------------------------- src/poet.cpp | 106 ++++++++++-------------- src/poet.hpp.in | 1 + 3 files changed, 67 insertions(+), 228 deletions(-) diff --git a/R_lib/kin_r_library.R b/R_lib/kin_r_library.R index c8bdedb73..33e6ef1dc 100644 --- a/R_lib/kin_r_library.R +++ b/R_lib/kin_r_library.R @@ -15,35 +15,19 @@ ### this program; if not, write to the Free Software Foundation, Inc., 51 ### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -## Simple function to check file extension. It is needed to check if -## the GridFile is SUM (MUFITS format) or rds/RData -FileExt <- function(x) { - pos <- regexpr("\\.([[:alnum:]]+)$", x) - ifelse(pos > -1L, substring(x, pos + 1L), "") -} - -master_init <- function(setup) { - msgm("Process with rank 0 reading GRID properties") - +master_init <- function(setup, out_dir) { ## Setup the directory where we will store the results - verb <- FALSE - # if (local_rank == 0) { - verb <- TRUE ## verbosity loading MUFITS results - if (!dir.exists(fileout)) { - dir.create(fileout) - msgm("created directory ", fileout) - } else { - msgm("dir ", fileout, " already exists, I will overwrite!") - } - if (!exists("store_result")) { - msgm("store_result doesn't exist!") - } else { - msgm("store_result is ", store_result) - } - # } else { - - # } + if (!dir.exists(out_dir)) { + dir.create(out_dir) + msgm("created directory ", out_dir) + } else { + msgm("dir ", out_dir, " already exists, I will overwrite!") + } + if (!exists("setup$store_result")) { + msgm("store_result doesn't exist!") + } else { + msgm("store_result is ", setup$store_result) + } setup$iter <- 1 setup$timesteps <- setup$timesteps @@ -67,8 +51,8 @@ master_init <- function(setup) { ## This function, called only by master, stores on disk the last ## calculated time step if store_result is TRUE and increments the ## iteration counter -master_iteration_end <- function(setup,iter) { - # iter <- setup$iter +master_iteration_end <- function(setup, state_T, state_C) { + iter <- setup$iter # print(iter) ## max digits for iterations dgts <- as.integer(ceiling(log10(iter))) @@ -80,164 +64,34 @@ master_iteration_end <- function(setup,iter) { if (setup$store_result) { if (iter %in% setup$out_save) { nameout <- paste0(fileout, "/iter_", sprintf(fmt = fmt, iter), ".rds") - info <- list( - tr_req_dt = as.integer(1) - ## tr_allow_dt = setup$allowed_dt, - ## tr_inniter = as.integer(setup$inniter) - ) saveRDS(list( - T = setup$state_T, C = setup$state_C, - simtime = as.integer(0), - tr_info = info + T = state_T, C = state_C, + simtime = as.integer(setup$simulation_time) ), file = nameout) msgm("results stored in <", nameout, ">") } } + ## Add last time step to simulation time + setup$simulation_time <- setup$simulation_time + setup$dt_differ + msgm("done iteration", iter, "/", length(setup$timesteps)) setup$iter <- setup$iter + 1 return(setup) } -## function for the workers to compute chemistry through PHREEQC -slave_chemistry <- function(setup, data) { - base <- setup$base - first <- setup$first - prop <- setup$prop - immobile <- setup$immobile - kin <- setup$kin - ann <- setup$ann - - iter <- setup$iter - timesteps <- setup$timesteps - dt <- timesteps[iter] - - state_T <- data ## not the global field, but the work-package - - ## treat special H+/pH, e-/pe cases - state_T <- RedModRphree::Act2pH(state_T) - - ## reduction of the problem - if (setup$reduce) { - reduced <- ReduceStateOmit(state_T, omit = setup$ann) - } else { - reduced <- state_T - } - - ## form the PHREEQC input script for the current work package - inplist <- SplitMultiKin( - data = reduced, procs = 1, base = base, first = first, - ann = ann, prop = prop, minerals = immobile, kin = kin, dt = dt - ) - - ## if (local_rank==1 & iter==1) - ## RPhreeWriteInp("FirstInp", inplist) - - tmpC <- RunPQC(inplist, procs = 1, second = TRUE) - - ## recompose after the reduction - if (setup$reduce) { - state_C <- RecomposeState(tmpC, reduced) - } else { - state_C <- tmpC - } - - ## the next line is needed since we don't need all columns of - ## PHREEQC output - return(state_C[, prop]) -} - -## This function, called by master -master_chemistry <- function(setup, data) { - state_T <- setup$state_T - - msgm(" chemistry iteration", setup$iter) - - ## treat special H+/pH, e-/pe cases - state_T <- RedModRphree::Act2pH(state_T) - - ## reduction of the problem - if (setup$reduce) { - reduced <- ReduceStateOmit(state_T, omit = setup$ann) - } else { - reduced <- state_T - } - - ## inject data from workers - res_C <- data - - rownames(res_C) <- NULL - - ## print(res_C) - - if (nrow(res_C) > nrow(reduced)) { - res_C <- res_C[seq(2, nrow(res_C), by = 2), ] - } - - ## recompose after the reduction - if (setup$reduce) { - state_C <- RecomposeState(res_C, reduced) - } else { - state_C <- res_C - } - - setup$state_C <- state_C - setup$reduced <- reduced - - return(setup) -} - - -## Adapted version for "reduction" -ReduceStateOmit <- function(data, omit = NULL, sign = 6) { - require(mgcv) - - rem <- colnames(data) - if (is.list(omit)) { - indomi <- match(names(omit), colnames(data)) - datao <- data[, -indomi] - } else { - datao <- data - } - - datao <- signif(datao, sign) - red <- mgcv::uniquecombs(datao) - inds <- attr(red, "index") - now <- ncol(red) - - - ## reattach the omitted column(s) - ## FIXME: control if more than one ann is present - if (is.list(omit)) { - red <- cbind(red, rep(data[1, indomi], nrow(red))) - - colnames(red)[now + 1] <- names(omit) - - ret <- red[, colnames(data)] - } else { - ret <- red - } - rownames(ret) <- NULL - attr(ret, "index") <- inds - return(ret) -} - - ## Attach the name of the calling function to the message displayed on ## R's stdout msgm <- function(...) { - # if (local_rank == 0) { - fname <- as.list(sys.call(-1))[[1]] - prefix <- paste0("R: ", fname, " ::") + prefix <- paste0("R: ") cat(paste(prefix, ..., "\n")) - # } invisible() } ## Function called by master R process to store on disk all relevant ## parameters for the simulation -StoreSetup <- function(setup) { +StoreSetup <- function(setup, filesim, out_dir) { to_store <- vector(mode = "list", length = 4) ## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT") names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline") diff --git a/src/poet.cpp b/src/poet.cpp index adb5dfcd9..253062c88 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -29,8 +29,13 @@ #include #include +#include +#include +#include #include +#include #include +#include #include #include "Base/argh.hpp" @@ -43,6 +48,21 @@ using namespace Rcpp; static int MY_RANK = 0; +static std::unique_ptr global_rt_setup; + +// we need some layz evaluation, as we can't define the functions before the R +// runtime is initialized +static std::optional master_init_R; +static std::optional master_iteration_end_R; +static std::optional store_setup_R; + +static void init_global_functions(RInside &R) { + R.parseEval(kin_r_library); + master_init_R = Rcpp::Function("master_init"); + master_iteration_end_R = Rcpp::Function("master_iteration_end"); + store_setup_R = Rcpp::Function("StoreSetup"); +} + // HACK: this is a step back as the order and also the count of fields is // predefined, but it will change in the future void writeFieldsToR(RInside &R, const Field &trans, const Field &chem) { @@ -126,12 +146,6 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R, cmdl("interp-min", 5) >> params.interp_min_entries; cmdl("interp-bucket-entries", 20) >> params.interp_bucket_entries; - /*Parse output options*/ - // simparams.store_result = !cmdl["ignore-result"]; - - /*Parse output options*/ - // simparams.store_result = !cmdl["ignore-result"]; - if (MY_RANK == 0) { // MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result)); MSG("Work Package Size: " + std::to_string(params.work_package_size)); @@ -162,21 +176,20 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R, std::string init_file; std::string runtime_file; - std::string out_dir; cmdl(1) >> runtime_file; cmdl(2) >> init_file; - cmdl(3) >> out_dir; + cmdl(3) >> params.out_dir; // chem_params.dht_outdir = out_dir; /* distribute information to R runtime */ // if local_rank == 0 then master else worker - R["local_rank"] = MY_RANK; + // R["local_rank"] = MY_RANK; // assign a char* (string) to 'filesim' - R["filesim"] = wrap(runtime_file); + // R["filesim"] = wrap(runtime_file); // assign a char* (string) to 'fileout' - R["fileout"] = wrap(out_dir); + // R["fileout"] = wrap(out_dir); // pass the boolean "store_result" to the R process // R["store_result"] = simparams.store_result; // // worker count @@ -195,23 +208,17 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R, Rcpp::List init_params_ = readRDS(init_file); params.init_params = init_params_; - Rcpp::List runtime_params = - source(runtime_file, Rcpp::Named("local", true)); - runtime_params = runtime_params["value"]; - R[r_runtime_parameters] = runtime_params; + global_rt_setup = std::make_unique(); + *global_rt_setup = source(runtime_file, Rcpp::Named("local", true)); + *global_rt_setup = global_rt_setup->operator[]("value"); params.timesteps = - Rcpp::as>(runtime_params["timesteps"]); + Rcpp::as>(global_rt_setup->operator[]("timesteps")); } catch (const std::exception &e) { ERRMSG("Error while parsing R scripts: " + std::string(e.what())); return ParseRet::PARSER_ERROR; } - // eval the init string, ignoring any returns - // R.parseEvalQ("source(filesim)"); - // R.parseEvalQ("mysetup <- setup"); - - // this->chem_params.initFromR(R); return ParseRet::PARSER_OK; } @@ -236,8 +243,6 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters ¶ms, for (uint32_t iter = 1; iter < maxiter + 1; iter++) { double start_t = MPI_Wtime(); uint32_t tick = 0; - // cout << "CPP: Evaluating next time step" << endl; - // R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)"); const double &dt = params.timesteps[iter - 1]; @@ -246,12 +251,8 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters ¶ms, /* displaying iteration number, with C++ and R iterator */ MSG("Going through iteration " + std::to_string(iter)); - // MSG("R's $iter: " + - // std::to_string((uint32_t)(R.parseEval("mysetup$iter"))) + - // ". Iteration"); /* run transport */ - // TODO: transport to diffusion diffusion.simulate(dt); chem.getField().update(diffusion.getField()); @@ -262,24 +263,15 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters ¶ms, chem.simulate(dt); - writeFieldsToR(R, diffusion.getField(), chem.getField()); - // R["store_result"] = true; - // R.parseEval("mysetup$store_result <- TRUE"); diffusion.getField().update(chem.getField()); - R["req_dt"] = dt; - R["simtime"] = (sim_time += dt); - - R.parseEval("mysetup$req_dt <- req_dt"); - R.parseEval("mysetup$simtime <- simtime"); - - R["iter"] = iter; - // MDL master_iteration_end just writes on disk state_T and // state_C after every iteration if the cmdline option // --ignore-results is not given (and thus the R variable // store_result is TRUE) - R.parseEval("mysetup <- master_iteration_end(setup=mysetup, iter)"); + *global_rt_setup = master_iteration_end_R.value()( + *global_rt_setup, diffusion.getField().asSEXP(), + chem.getField().asSEXP()); MSG("End of *coupling* iteration " + std::to_string(iter) + "/" + std::to_string(maxiter)); @@ -350,10 +342,6 @@ int main(int argc, char *argv[]) { MSG("Running POET version " + std::string(poet_version)); } - /*Loading Dependencies*/ - // TODO: kann raus - R.parseEvalQ(kin_r_library); - RuntimeParameters run_params; switch (parseInitValues(argv, R, run_params)) { @@ -370,6 +358,10 @@ int main(int argc, char *argv[]) { MSG("RInside initialized on process " + std::to_string(MY_RANK)); + std::cout << std::flush; + + MPI_Barrier(MPI_COMM_WORLD); + ChemistryModule chemistry(run_params.work_package_size, init_list.getChemistryInit(), MPI_COMM_WORLD); @@ -385,26 +377,22 @@ int main(int argc, char *argv[]) { chemistry.masterEnableSurrogates(surr_setup); if (MY_RANK > 0) { - chemistry.WorkerLoop(); } else { + + init_global_functions(R); + // R.parseEvalQ("mysetup <- setup"); // // if (MY_RANK == 0) { // get timestep vector from // // grid_init function ... // - std::string master_init_code = "mysetup <- master_init(setup=mysetup)"; - R.parseEval(master_init_code); - - // run_params.initVectorParams(R); + *global_rt_setup = + master_init_R.value()(*global_rt_setup, run_params.out_dir); // MDL: store all parameters - if (MY_RANK == 0) { - MSG("Calling R Function to store calling parameters"); - // R.parseEvalQ("StoreSetup(setup=mysetup)"); - } + // MSG("Calling R Function to store calling parameters"); + // R.parseEvalQ("StoreSetup(setup=mysetup)"); - if (MY_RANK == 0) { - MSG("Init done on process with rank " + std::to_string(MY_RANK)); - } + MSG("Init done on process with rank " + std::to_string(MY_RANK)); // MPI_Barrier(MPI_COMM_WORLD); @@ -417,8 +405,6 @@ int main(int argc, char *argv[]) { MSG("finished simulation loop"); - MSG("start timing profiling"); - // R["simtime"] = dSimTime; // R.parseEvalQ("profiling$simtime <- simtime"); @@ -428,10 +414,8 @@ int main(int argc, char *argv[]) { r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));"; R.parseEval(r_vis_code); - // MSG("Done! Results are stored as R objects into <" + - // run_params.getOutDir() - // + - // "/timings.rds>"); + MSG("Done! Results are stored as R objects into <" + run_params.out_dir + + "/timings.rds>"); } } diff --git a/src/poet.hpp.in b/src/poet.hpp.in index 80db8d436..f9a86be77 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -49,6 +49,7 @@ constexpr uint32_t CHEM_DEFAULT_WORK_PACKAGE_SIZE = 32; constexpr uint32_t CHEM_DHT_SIZE_PER_PROCESS_MB = 1.5E3; struct RuntimeParameters { + std::string out_dir; std::vector timesteps; bool print_progressbar;