From 8d0be5ae0da1a67eed12679b18dacde79cf1cf38 Mon Sep 17 00:00:00 2001 From: Marco De Lucia Date: Tue, 11 Jun 2024 16:50:02 +0200 Subject: [PATCH] feat: fast serialization/storage using qs package via `--qs` flag --- README.md | 97 +++++++++++++++++++++------------------ R_lib/kin_r_library.R | 54 +++++++++++++++++----- src/poet.cpp | 103 ++++++++++++++++++++++++++---------------- src/poet.hpp.in | 3 +- 4 files changed, 160 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index b17d73c12..ec75dceaf 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # POET @@ -87,7 +87,7 @@ follows: $ R # install R dependencies -> install.packages(c("Rcpp", "RInside")) +> install.packages(c("Rcpp", "RInside","qs")) > q(save="no") # cd into POET project root @@ -133,13 +133,14 @@ With the installation of POET, two executables are provided: - `poet` - the main executable to run simulations - `poet_init` - a preprocessor to generate input files for POET from R scripts -Preprocessed benchmarks can be found in the `share/poet` directory with an -according *runtime* setup. More on those files and how to create them later. +Preprocessed benchmarks can be found in the `share/poet` directory +with an according *runtime* setup. More on those files and how to +create them later. ## Running -Run POET by `mpirun ./poet [OPTIONS] ` -where: +Run POET by `mpirun ./poet [OPTIONS] +` where: - **OPTIONS** - POET options (explained below) - **RUNFILE** - Runtime parameters described as R script @@ -154,8 +155,9 @@ The following parameters can be set: |-----------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------| | **--work-package-size=** | _1..n_ | size of work packages (defaults to _5_) | | **-P, --progress** | | show progress bar | -| **--ai-surrogate** | | activates the AI surrogate chemistry model (defaults to _OFF_) | +| **--ai-surrogate** | | activates the AI surrogate chemistry model (defaults to _OFF_) | | **--dht** | | enabling DHT usage (defaults to _OFF_) | +| **--qs** | | store results using qs::qsave() (.qs extension) instead of default RDS (.rds) | | **--dht-strategy=** | _0-1_ | change DHT strategy. **NOT IMPLEMENTED YET** (Defaults to _0_) | | **--dht-size=** | _1-n_ | size of DHT per process involved in megabyte (defaults to _1000 MByte_) | | **--dht-snaps=** | _0-2_ | disable or enable storage of DHT snapshots | @@ -253,12 +255,13 @@ produce any valid predictions. ## Defining a model -In order to provide a model to POET, you need to setup a R script which can then -be used by `poet_init` to generate the simulation input. Which parameters are -required can be found in the -[Wiki](https://git.gfz-potsdam.de/naaice/poet/-/wikis/Initialization). We try to -keep the document up-to-date. However, if you encounter missing information or -need help, please get in touch with us via the issue tracker or E-Mail. +In order to provide a model to POET, you need to setup a R script +which can then be used by `poet_init` to generate the simulation +input. Which parameters are required can be found in the +[Wiki](https://git.gfz-potsdam.de/naaice/poet/-/wikis/Initialization). +We try to keep the document up-to-date. However, if you encounter +missing information or need help, please get in touch with us via the +issue tracker or E-Mail. `poet_init` can be used as follows: @@ -268,46 +271,50 @@ need help, please get in touch with us via the issue tracker or E-Mail. where: -- **output** - name of the output file (defaults to the input file name - with the extension `.rds`) -- **setwd** - set the working directory to the directory of the input file (e.g. - to allow relative paths in the input script). However, the output file - will be stored in the directory from which `poet_init` was called. +- **output** - name of the output file (defaults to the input file + name with the extension `.rds`) +- **setwd** - set the working directory to the directory of the input + file (e.g. to allow relative paths in the input script). However, + the output file will be stored in the directory from which + `poet_init` was called. ## Additional functions for the AI surrogate -The AI surrogate can be activated for any benchmark and is by default initiated -as a sequential keras model with three hidden layer of depth 48, 96, 24 with -relu activation and adam optimizer. All functions in `ai_surrogate_model.R` can -be overridden by adding custom definitions via an R file in the input script. -This is done by adding the path to this file in the input script. Simply add the -path as an element called `ai_surrogate_input_script` to the `chemistry_setup` -list. Please use the global variable `ai_surrogate_base_path` as a base path +The AI surrogate can be activated for any benchmark and is by default +initiated as a sequential keras model with three hidden layer of depth +48, 96, 24 with relu activation and adam optimizer. All functions in +`ai_surrogate_model.R` can be overridden by adding custom definitions +via an R file in the input script. This is done by adding the path to +this file in the input script. Simply add the path as an element +called `ai_surrogate_input_script` to the `chemistry_setup` list. +Please use the global variable `ai_surrogate_base_path` as a base path when relative filepaths are used in custom funtions. -**There is currently no default implementation to determine the validity of -predicted values.** This means, that every input script must include an R source -file with a custom function `validate_predictions(predictors, prediction)`. -Examples for custom functions can be found for the barite_200 benchmark +**There is currently no default implementation to determine the +validity of predicted values.** This means, that every input script +must include an R source file with a custom function +`validate_predictions(predictors, prediction)`. Examples for custom +functions can be found for the barite_200 benchmark -The functions can be defined as follows: +The functions can be defined as follows: -`validate_predictions(predictors, prediction)`: Returns a boolean index vector -that signals for each row in the predictions if the values are considered valid. -Can eg. be implemented as a mass balance threshold between the predictors and -the prediction. +`validate_predictions(predictors, prediction)`: Returns a boolean +index vector that signals for each row in the predictions if the +values are considered valid. Can eg. be implemented as a mass balance +threshold between the predictors and the prediction. -`initiate_model()`: Returns a keras model. Can be used to load pretrained -models. +`initiate_model()`: Returns a keras model. Can be used to load +pretrained models. `preprocess(df, backtransform = FALSE, outputs = FALSE)`: Returns the -scaled/transformed/backtransformed dataframe. The `backtransform` flag signals -if the current processing step is applied to data that's assumed to be scaled -and expects backtransformed values. The `outputs` flag signals if the current -processing step is applied to the output or tatget of the model. This can be -used to eg. skip these processing steps and only scale the model input. +scaled/transformed/backtransformed dataframe. The `backtransform` flag +signals if the current processing step is applied to data that's +assumed to be scaled and expects backtransformed values. The `outputs` +flag signals if the current processing step is applied to the output +or tatget of the model. This can be used to eg. skip these processing +steps and only scale the model input. -`training_step (model, predictor, target, validity)`: Trains the model after -each iteration. `validity` is the bool index vector given by -`validate_predictions` and can eg. be used to only train on values that have not -been valid predictions. \ No newline at end of file +`training_step (model, predictor, target, validity)`: Trains the model +after each iteration. `validity` is the bool index vector given by +`validate_predictions` and can eg. be used to only train on values +that have not been valid predictions. diff --git a/R_lib/kin_r_library.R b/R_lib/kin_r_library.R index cb8eaecd3..143c72df5 100644 --- a/R_lib/kin_r_library.R +++ b/R_lib/kin_r_library.R @@ -1,4 +1,4 @@ -## Time-stamp: "Last modified 2023-08-15 11:58:23 delucia" +## Time-stamp: "Last modified 2024-06-11 14:26:33 delucia" ### Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam) ### @@ -35,14 +35,18 @@ master_init <- function(setup, out_dir, init_field) { setup$iterations <- setup$maxiter setup$simulation_time <- 0 + dgts <- as.integer(ceiling(log10(setup$maxiter))) + ## string format to use in sprintf + fmt <- paste0("%0", dgts, "d") + if (is.null(setup[["store_result"]])) { setup$store_result <- TRUE } if (setup$store_result) { - init_field_out <- paste0(out_dir, "/iter_0.rds") + init_field_out <- paste0(out_dir, "/iter_", sprintf(fmt = fmt, 0), ".", setup$out_ext) init_field <- data.frame(init_field, check.names = FALSE) - saveRDS(init_field, file = init_field_out) + SaveRObj(x = init_field, path = init_field_out) msgm("Stored initial field in ", init_field_out) if (is.null(setup[["out_save"]])) { setup$out_save <- seq(1, setup$iterations) @@ -69,7 +73,7 @@ master_iteration_end <- function(setup, state_T, state_C) { ## comprised in setup$out_save if (setup$store_result) { if (iter %in% setup$out_save) { - nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".rds") + nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".", setup$out_ext) state_T <- data.frame(state_T, check.names = FALSE) state_C <- data.frame(state_C, check.names = FALSE) @@ -77,13 +81,14 @@ master_iteration_end <- function(setup, state_T, state_C) { prediction_time = if(exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL, training_time = if(exists("ai_training_time")) as.integer(ai_training_time) else NULL, valid_predictions = if(exists("validity_vector")) validity_vector else NULL) - saveRDS(list( - T = state_T, - C = state_C, - simtime = as.integer(setup$simulation_time), - totaltime = as.integer(totaltime), - ai_surrogate_info = ai_surrogate_info - ), file = nameout) + + SaveRObj(x = list( + T = state_T, + C = state_C, + simtime = as.integer(setup$simulation_time), + totaltime = as.integer(totaltime), + ai_surrogate_info = ai_surrogate_info + ), path = nameout) msgm("results stored in <", nameout, ">") } } @@ -172,3 +177,30 @@ GetWorkPackageSizesVector <- function(n_packages, package_size, len) { ids <- rep(1:n_packages, times = package_size, each = 1)[1:len] return(as.integer(table(ids))) } + + +## Handler to read R objs from binary files using either builtin +## readRDS() or qs::qread() based on file extension +ReadRObj <- function(path) { + ## code borrowed from tools::file_ext() + pos <- regexpr("\\.([[:alnum:]]+)$", path) + extension <- ifelse(pos > -1L, substring(path, pos + 1L), "") + + switch(extension, + rds = readRDS(path), + qs = qs::qread(path)) +} + +## Handler to store R objs to binary files using either builtin +## saveRDS() or qs::qsave() based on file extension +SaveRObj <- function(x, path) { + msgm("Storing to", path) + ## code borrowed from tools::file_ext() + pos <- regexpr("\\.([[:alnum:]]+)$", path) + extension <- ifelse(pos > -1L, substring(path, pos + 1L), "") + + switch(extension, + rds = saveRDS(object = x, file=path), + qs = qs::qsave(x=x, file = path)) +} + diff --git a/src/poet.cpp b/src/poet.cpp index 4a0abc2c1..06e15b16a 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -52,17 +52,23 @@ static int MY_RANK = 0; static std::unique_ptr global_rt_setup; -// we need some layz evaluation, as we can't define the functions before the R -// runtime is initialized +// we need some lazy evaluation, as we can't define the functions +// before the R runtime is initialized static std::optional master_init_R; static std::optional master_iteration_end_R; static std::optional store_setup_R; +static std::optional ReadRObj_R; +static std::optional SaveRObj_R; +static std::optional source_R; static void init_global_functions(RInside &R) { R.parseEval(kin_r_library); - master_init_R = Rcpp::Function("master_init"); + master_init_R = Rcpp::Function("master_init"); master_iteration_end_R = Rcpp::Function("master_iteration_end"); - store_setup_R = Rcpp::Function("StoreSetup"); + store_setup_R = Rcpp::Function("StoreSetup"); + source_R = Rcpp::Function("source"); + ReadRObj_R = Rcpp::Function("ReadRObj"); + SaveRObj_R = Rcpp::Function("SaveRObj"); } // HACK: this is a step back as the order and also the count of fields is @@ -150,8 +156,16 @@ ParseRet parseInitValues(char **argv, RuntimeParameters ¶ms) { params.use_ai_surrogate = cmdl["ai-surrogate"]; + // MDL: optional flag "qs" to switch to qsave() + params.out_ext = "rds"; + if (cmdl["qs"]) { + MSG("Enabled output"); + params.out_ext = "qs"; + } + if (MY_RANK == 0) { // MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result)); + MSG("Output format/extension is " + params.out_ext); MSG("Work Package Size: " + std::to_string(params.work_package_size)); MSG("DHT is " + BOOL_PRINT(params.use_dht)); MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate)); @@ -207,18 +221,22 @@ ParseRet parseInitValues(char **argv, RuntimeParameters ¶ms) { // R["dht_log"] = simparams.dht_log; try { - Rcpp::Function source("source"); - Rcpp::Function readRDS("readRDS"); + // Rcpp::Function source("source"); + // Rcpp::Function ReadRObj("ReadRObj"); + // Rcpp::Function SaveRObj("SaveRObj"); - Rcpp::List init_params_ = readRDS(init_file); + Rcpp::List init_params_ = ReadRObj_R.value()(init_file); params.init_params = init_params_; - + global_rt_setup = std::make_unique(); - *global_rt_setup = source(runtime_file, Rcpp::Named("local", true)); + *global_rt_setup = source_R.value()(runtime_file, Rcpp::Named("local", true)); *global_rt_setup = global_rt_setup->operator[]("value"); + // MDL add "out_ext" for output format to R setup + (*global_rt_setup)["out_ext"] = params.out_ext; + params.timesteps = - Rcpp::as>(global_rt_setup->operator[]("timesteps")); + Rcpp::as>(global_rt_setup->operator[]("timesteps")); } catch (const std::exception &e) { ERRMSG("Error while parsing R scripts: " + std::string(e.what())); @@ -450,18 +468,21 @@ std::vector getSpeciesNames(const Field &&field, int root, int main(int argc, char *argv[]) { int world_size; - + MPI_Init(&argc, &argv); { MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK); - + RInsidePOET &R = RInsidePOET::getInstance(); - + if (MY_RANK == 0) { MSG("Running POET version " + std::string(poet_version)); } + + + init_global_functions(R); RuntimeParameters run_params; @@ -473,19 +494,19 @@ int main(int argc, char *argv[]) { case ParseRet::PARSER_OK: break; } - + InitialList init_list(R); init_list.importList(run_params.init_params, MY_RANK != 0); - + MSG("RInside initialized on process " + std::to_string(MY_RANK)); - + std::cout << std::flush; - + MPI_Barrier(MPI_COMM_WORLD); - + ChemistryModule chemistry(run_params.work_package_size, init_list.getChemistryInit(), MPI_COMM_WORLD); - + const ChemistryModule::SurrogateSetup surr_setup = { getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD), run_params.use_dht, @@ -501,56 +522,58 @@ int main(int argc, char *argv[]) { if (MY_RANK > 0) { chemistry.WorkerLoop(); } else { - init_global_functions(R); // R.parseEvalQ("mysetup <- setup"); // // if (MY_RANK == 0) { // get timestep vector from // // grid_init function ... // *global_rt_setup = - master_init_R.value()(*global_rt_setup, run_params.out_dir, - init_list.getInitialGrid().asSEXP()); + master_init_R.value()(*global_rt_setup, run_params.out_dir, + init_list.getInitialGrid().asSEXP()); // MDL: store all parameters // MSG("Calling R Function to store calling parameters"); // R.parseEvalQ("StoreSetup(setup=mysetup)"); + R["out_ext"] = run_params.out_ext; + R["out_dir"] = run_params.out_dir; + if (run_params.use_ai_surrogate) { /* Incorporate ai surrogate from R */ R.parseEvalQ(ai_surrogate_r_library); /* Use dht species for model input and output */ R["ai_surrogate_species"] = init_list.getChemistryInit().dht_species.getNames(); - R["out_dir"] = run_params.out_dir; - + const std::string ai_surrogate_input_script = init_list.getChemistryInit().ai_surrogate_input_script; - - MSG("AI: sourcing user-provided script"); - R.parseEvalQ(ai_surrogate_input_script); - + + MSG("AI: sourcing user-provided script"); + R.parseEvalQ(ai_surrogate_input_script); + MSG("AI: initialize AI model"); - R.parseEval("model <- initiate_model()"); + R.parseEval("model <- initiate_model()"); R.parseEval("gpu_info()"); - } - + } + MSG("Init done on process with rank " + std::to_string(MY_RANK)); - + // MPI_Barrier(MPI_COMM_WORLD); - + DiffusionModule diffusion(init_list.getDiffusionInit(), init_list.getInitialGrid()); - + chemistry.masterSetField(init_list.getInitialGrid()); - + Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry); - + MSG("finished simulation loop"); - + R["profiling"] = profiling; R["setup"] = *global_rt_setup; + R["setup$out_ext"] = run_params.out_ext; string r_vis_code; r_vis_code = - "saveRDS(profiling, file=paste0(setup$out_dir,'/timings.rds'));"; + "SaveRObj(x = profiling, path = paste0(out_dir, '/timings.', setup$out_ext));"; R.parseEval(r_vis_code); - + MSG("Done! Results are stored as R objects into <" + run_params.out_dir + - "/timings.rds>"); + "/timings." + run_params.out_ext); } } diff --git a/src/poet.hpp.in b/src/poet.hpp.in index cca89e264..660a9e074 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -39,7 +39,7 @@ static const inline std::string ai_surrogate_r_library = R"(@R_AI_SURROGATE_LIB@ static const inline std::string r_runtime_parameters = "mysetup"; const std::set flaglist{"ignore-result", "dht", "P", "progress", - "interp", "ai-surrogate"}; + "interp", "ai-surrogate", "qs"}; const std::set paramlist{ "work-package-size", "dht-strategy", "dht-size", "dht-snaps", "dht-file", "interp-size", "interp-min", "interp-bucket-entries"}; @@ -51,6 +51,7 @@ constexpr uint32_t CHEM_DHT_SIZE_PER_PROCESS_MB = 1.5E3; struct RuntimeParameters { std::string out_dir; std::vector timesteps; + std::string out_ext; // MDL added to accomodate for qs::qsave/qread bool print_progressbar; uint32_t work_package_size;