Refactor R functions and how they are called

This commit is contained in:
Max Luebke 2024-04-03 21:03:10 +00:00
parent 2b6f17f18c
commit 5ce40617b8
3 changed files with 67 additions and 228 deletions

View File

@ -15,35 +15,19 @@
### this program; if not, write to the Free Software Foundation, Inc., 51
### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
## Simple function to check file extension. It is needed to check if
## the GridFile is SUM (MUFITS format) or rds/RData
FileExt <- function(x) {
pos <- regexpr("\\.([[:alnum:]]+)$", x)
ifelse(pos > -1L, substring(x, pos + 1L), "")
}
master_init <- function(setup) {
msgm("Process with rank 0 reading GRID properties")
master_init <- function(setup, out_dir) {
## Setup the directory where we will store the results
verb <- FALSE
# if (local_rank == 0) {
verb <- TRUE ## verbosity loading MUFITS results
if (!dir.exists(fileout)) {
dir.create(fileout)
msgm("created directory ", fileout)
} else {
msgm("dir ", fileout, " already exists, I will overwrite!")
}
if (!exists("store_result")) {
msgm("store_result doesn't exist!")
} else {
msgm("store_result is ", store_result)
}
# } else {
# }
if (!dir.exists(out_dir)) {
dir.create(out_dir)
msgm("created directory ", out_dir)
} else {
msgm("dir ", out_dir, " already exists, I will overwrite!")
}
if (!exists("setup$store_result")) {
msgm("store_result doesn't exist!")
} else {
msgm("store_result is ", setup$store_result)
}
setup$iter <- 1
setup$timesteps <- setup$timesteps
@ -67,8 +51,8 @@ master_init <- function(setup) {
## This function, called only by master, stores on disk the last
## calculated time step if store_result is TRUE and increments the
## iteration counter
master_iteration_end <- function(setup,iter) {
# iter <- setup$iter
master_iteration_end <- function(setup, state_T, state_C) {
iter <- setup$iter
# print(iter)
## max digits for iterations
dgts <- as.integer(ceiling(log10(iter)))
@ -80,164 +64,34 @@ master_iteration_end <- function(setup,iter) {
if (setup$store_result) {
if (iter %in% setup$out_save) {
nameout <- paste0(fileout, "/iter_", sprintf(fmt = fmt, iter), ".rds")
info <- list(
tr_req_dt = as.integer(1)
## tr_allow_dt = setup$allowed_dt,
## tr_inniter = as.integer(setup$inniter)
)
saveRDS(list(
T = setup$state_T, C = setup$state_C,
simtime = as.integer(0),
tr_info = info
T = state_T, C = state_C,
simtime = as.integer(setup$simulation_time)
), file = nameout)
msgm("results stored in <", nameout, ">")
}
}
## Add last time step to simulation time
setup$simulation_time <- setup$simulation_time + setup$dt_differ
msgm("done iteration", iter, "/", length(setup$timesteps))
setup$iter <- setup$iter + 1
return(setup)
}
## function for the workers to compute chemistry through PHREEQC
slave_chemistry <- function(setup, data) {
base <- setup$base
first <- setup$first
prop <- setup$prop
immobile <- setup$immobile
kin <- setup$kin
ann <- setup$ann
iter <- setup$iter
timesteps <- setup$timesteps
dt <- timesteps[iter]
state_T <- data ## not the global field, but the work-package
## treat special H+/pH, e-/pe cases
state_T <- RedModRphree::Act2pH(state_T)
## reduction of the problem
if (setup$reduce) {
reduced <- ReduceStateOmit(state_T, omit = setup$ann)
} else {
reduced <- state_T
}
## form the PHREEQC input script for the current work package
inplist <- SplitMultiKin(
data = reduced, procs = 1, base = base, first = first,
ann = ann, prop = prop, minerals = immobile, kin = kin, dt = dt
)
## if (local_rank==1 & iter==1)
## RPhreeWriteInp("FirstInp", inplist)
tmpC <- RunPQC(inplist, procs = 1, second = TRUE)
## recompose after the reduction
if (setup$reduce) {
state_C <- RecomposeState(tmpC, reduced)
} else {
state_C <- tmpC
}
## the next line is needed since we don't need all columns of
## PHREEQC output
return(state_C[, prop])
}
## This function, called by master
master_chemistry <- function(setup, data) {
state_T <- setup$state_T
msgm(" chemistry iteration", setup$iter)
## treat special H+/pH, e-/pe cases
state_T <- RedModRphree::Act2pH(state_T)
## reduction of the problem
if (setup$reduce) {
reduced <- ReduceStateOmit(state_T, omit = setup$ann)
} else {
reduced <- state_T
}
## inject data from workers
res_C <- data
rownames(res_C) <- NULL
## print(res_C)
if (nrow(res_C) > nrow(reduced)) {
res_C <- res_C[seq(2, nrow(res_C), by = 2), ]
}
## recompose after the reduction
if (setup$reduce) {
state_C <- RecomposeState(res_C, reduced)
} else {
state_C <- res_C
}
setup$state_C <- state_C
setup$reduced <- reduced
return(setup)
}
## Adapted version for "reduction"
ReduceStateOmit <- function(data, omit = NULL, sign = 6) {
require(mgcv)
rem <- colnames(data)
if (is.list(omit)) {
indomi <- match(names(omit), colnames(data))
datao <- data[, -indomi]
} else {
datao <- data
}
datao <- signif(datao, sign)
red <- mgcv::uniquecombs(datao)
inds <- attr(red, "index")
now <- ncol(red)
## reattach the omitted column(s)
## FIXME: control if more than one ann is present
if (is.list(omit)) {
red <- cbind(red, rep(data[1, indomi], nrow(red)))
colnames(red)[now + 1] <- names(omit)
ret <- red[, colnames(data)]
} else {
ret <- red
}
rownames(ret) <- NULL
attr(ret, "index") <- inds
return(ret)
}
## Attach the name of the calling function to the message displayed on
## R's stdout
msgm <- function(...) {
# if (local_rank == 0) {
fname <- as.list(sys.call(-1))[[1]]
prefix <- paste0("R: ", fname, " ::")
prefix <- paste0("R: ")
cat(paste(prefix, ..., "\n"))
# }
invisible()
}
## Function called by master R process to store on disk all relevant
## parameters for the simulation
StoreSetup <- function(setup) {
StoreSetup <- function(setup, filesim, out_dir) {
to_store <- vector(mode = "list", length = 4)
## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT")
names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline")

View File

@ -29,8 +29,13 @@
#include <RInside.h>
#include <Rcpp.h>
#include <Rcpp/Function.h>
#include <Rcpp/vector/instantiation.h>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <mpi.h>
#include <optional>
#include <string>
#include "Base/argh.hpp"
@ -43,6 +48,21 @@ using namespace Rcpp;
static int MY_RANK = 0;
static std::unique_ptr<Rcpp::List> global_rt_setup;
// we need some layz evaluation, as we can't define the functions before the R
// runtime is initialized
static std::optional<Rcpp::Function> master_init_R;
static std::optional<Rcpp::Function> master_iteration_end_R;
static std::optional<Rcpp::Function> store_setup_R;
static void init_global_functions(RInside &R) {
R.parseEval(kin_r_library);
master_init_R = Rcpp::Function("master_init");
master_iteration_end_R = Rcpp::Function("master_iteration_end");
store_setup_R = Rcpp::Function("StoreSetup");
}
// HACK: this is a step back as the order and also the count of fields is
// predefined, but it will change in the future
void writeFieldsToR(RInside &R, const Field &trans, const Field &chem) {
@ -126,12 +146,6 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R,
cmdl("interp-min", 5) >> params.interp_min_entries;
cmdl("interp-bucket-entries", 20) >> params.interp_bucket_entries;
/*Parse output options*/
// simparams.store_result = !cmdl["ignore-result"];
/*Parse output options*/
// simparams.store_result = !cmdl["ignore-result"];
if (MY_RANK == 0) {
// MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
MSG("Work Package Size: " + std::to_string(params.work_package_size));
@ -162,21 +176,20 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R,
std::string init_file;
std::string runtime_file;
std::string out_dir;
cmdl(1) >> runtime_file;
cmdl(2) >> init_file;
cmdl(3) >> out_dir;
cmdl(3) >> params.out_dir;
// chem_params.dht_outdir = out_dir;
/* distribute information to R runtime */
// if local_rank == 0 then master else worker
R["local_rank"] = MY_RANK;
// R["local_rank"] = MY_RANK;
// assign a char* (string) to 'filesim'
R["filesim"] = wrap(runtime_file);
// R["filesim"] = wrap(runtime_file);
// assign a char* (string) to 'fileout'
R["fileout"] = wrap(out_dir);
// R["fileout"] = wrap(out_dir);
// pass the boolean "store_result" to the R process
// R["store_result"] = simparams.store_result;
// // worker count
@ -195,23 +208,17 @@ ParseRet parseInitValues(char **argv, RInsidePOET &R,
Rcpp::List init_params_ = readRDS(init_file);
params.init_params = init_params_;
Rcpp::List runtime_params =
source(runtime_file, Rcpp::Named("local", true));
runtime_params = runtime_params["value"];
R[r_runtime_parameters] = runtime_params;
global_rt_setup = std::make_unique<Rcpp::List>();
*global_rt_setup = source(runtime_file, Rcpp::Named("local", true));
*global_rt_setup = global_rt_setup->operator[]("value");
params.timesteps =
Rcpp::as<std::vector<double>>(runtime_params["timesteps"]);
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
} catch (const std::exception &e) {
ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
return ParseRet::PARSER_ERROR;
}
// eval the init string, ignoring any returns
// R.parseEvalQ("source(filesim)");
// R.parseEvalQ("mysetup <- setup");
// this->chem_params.initFromR(R);
return ParseRet::PARSER_OK;
}
@ -236,8 +243,6 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters &params,
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
double start_t = MPI_Wtime();
uint32_t tick = 0;
// cout << "CPP: Evaluating next time step" << endl;
// R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
const double &dt = params.timesteps[iter - 1];
@ -246,12 +251,8 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters &params,
/* displaying iteration number, with C++ and R iterator */
MSG("Going through iteration " + std::to_string(iter));
// MSG("R's $iter: " +
// std::to_string((uint32_t)(R.parseEval("mysetup$iter"))) +
// ". Iteration");
/* run transport */
// TODO: transport to diffusion
diffusion.simulate(dt);
chem.getField().update(diffusion.getField());
@ -262,24 +263,15 @@ static Rcpp::List RunMasterLoop(RInside &R, const RuntimeParameters &params,
chem.simulate(dt);
writeFieldsToR(R, diffusion.getField(), chem.getField());
// R["store_result"] = true;
// R.parseEval("mysetup$store_result <- TRUE");
diffusion.getField().update(chem.getField());
R["req_dt"] = dt;
R["simtime"] = (sim_time += dt);
R.parseEval("mysetup$req_dt <- req_dt");
R.parseEval("mysetup$simtime <- simtime");
R["iter"] = iter;
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEval("mysetup <- master_iteration_end(setup=mysetup, iter)");
*global_rt_setup = master_iteration_end_R.value()(
*global_rt_setup, diffusion.getField().asSEXP(),
chem.getField().asSEXP());
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
std::to_string(maxiter));
@ -350,10 +342,6 @@ int main(int argc, char *argv[]) {
MSG("Running POET version " + std::string(poet_version));
}
/*Loading Dependencies*/
// TODO: kann raus
R.parseEvalQ(kin_r_library);
RuntimeParameters run_params;
switch (parseInitValues(argv, R, run_params)) {
@ -370,6 +358,10 @@ int main(int argc, char *argv[]) {
MSG("RInside initialized on process " + std::to_string(MY_RANK));
std::cout << std::flush;
MPI_Barrier(MPI_COMM_WORLD);
ChemistryModule chemistry(run_params.work_package_size,
init_list.getChemistryInit(), MPI_COMM_WORLD);
@ -385,26 +377,22 @@ int main(int argc, char *argv[]) {
chemistry.masterEnableSurrogates(surr_setup);
if (MY_RANK > 0) {
chemistry.WorkerLoop();
} else {
init_global_functions(R);
// R.parseEvalQ("mysetup <- setup");
// // if (MY_RANK == 0) { // get timestep vector from
// // grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=mysetup)";
R.parseEval(master_init_code);
// run_params.initVectorParams(R);
*global_rt_setup =
master_init_R.value()(*global_rt_setup, run_params.out_dir);
// MDL: store all parameters
if (MY_RANK == 0) {
MSG("Calling R Function to store calling parameters");
// R.parseEvalQ("StoreSetup(setup=mysetup)");
}
// MSG("Calling R Function to store calling parameters");
// R.parseEvalQ("StoreSetup(setup=mysetup)");
if (MY_RANK == 0) {
MSG("Init done on process with rank " + std::to_string(MY_RANK));
}
MSG("Init done on process with rank " + std::to_string(MY_RANK));
// MPI_Barrier(MPI_COMM_WORLD);
@ -417,8 +405,6 @@ int main(int argc, char *argv[]) {
MSG("finished simulation loop");
MSG("start timing profiling");
// R["simtime"] = dSimTime;
// R.parseEvalQ("profiling$simtime <- simtime");
@ -428,10 +414,8 @@ int main(int argc, char *argv[]) {
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
// MSG("Done! Results are stored as R objects into <" +
// run_params.getOutDir()
// +
// "/timings.rds>");
MSG("Done! Results are stored as R objects into <" + run_params.out_dir +
"/timings.rds>");
}
}

View File

@ -49,6 +49,7 @@ constexpr uint32_t CHEM_DEFAULT_WORK_PACKAGE_SIZE = 32;
constexpr uint32_t CHEM_DHT_SIZE_PER_PROCESS_MB = 1.5E3;
struct RuntimeParameters {
std::string out_dir;
std::vector<double> timesteps;
bool print_progressbar;