feat: fast serialization/storage using qs package via --qs flag

This commit is contained in:
Marco De Lucia 2024-06-11 16:50:02 +02:00
parent edf936f3d0
commit 8d0be5ae0d
4 changed files with 160 additions and 97 deletions

View File

@ -1,5 +1,5 @@
<!-- <!--
Time-stamp: "Last modified 2023-08-02 13:55:11 mluebke" Time-stamp: "Last modified 2024-09-12 11:39:28 delucia"
--> -->
# POET # POET
@ -87,7 +87,7 @@ follows:
$ R $ R
# install R dependencies # install R dependencies
> install.packages(c("Rcpp", "RInside")) > install.packages(c("Rcpp", "RInside","qs"))
> q(save="no") > q(save="no")
# cd into POET project root # cd into POET project root
@ -133,13 +133,14 @@ With the installation of POET, two executables are provided:
- `poet` - the main executable to run simulations - `poet` - the main executable to run simulations
- `poet_init` - a preprocessor to generate input files for POET from R scripts - `poet_init` - a preprocessor to generate input files for POET from R scripts
Preprocessed benchmarks can be found in the `share/poet` directory with an Preprocessed benchmarks can be found in the `share/poet` directory
according *runtime* setup. More on those files and how to create them later. with an according *runtime* setup. More on those files and how to
create them later.
## Running ## Running
Run POET by `mpirun ./poet [OPTIONS] <RUNFILE> <SIMFILE> <OUTPUT_DIRECTORY>` Run POET by `mpirun ./poet [OPTIONS] <RUNFILE> <SIMFILE>
where: <OUTPUT_DIRECTORY>` where:
- **OPTIONS** - POET options (explained below) - **OPTIONS** - POET options (explained below)
- **RUNFILE** - Runtime parameters described as R script - **RUNFILE** - Runtime parameters described as R script
@ -154,8 +155,9 @@ The following parameters can be set:
|-----------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------| |-----------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------|
| **--work-package-size=** | _1..n_ | size of work packages (defaults to _5_) | | **--work-package-size=** | _1..n_ | size of work packages (defaults to _5_) |
| **-P, --progress** | | show progress bar | | **-P, --progress** | | show progress bar |
| **--ai-surrogate** | | activates the AI surrogate chemistry model (defaults to _OFF_) | | **--ai-surrogate** | | activates the AI surrogate chemistry model (defaults to _OFF_) |
| **--dht** | | enabling DHT usage (defaults to _OFF_) | | **--dht** | | enabling DHT usage (defaults to _OFF_) |
| **--qs** | | store results using qs::qsave() (.qs extension) instead of default RDS (.rds) |
| **--dht-strategy=** | _0-1_ | change DHT strategy. **NOT IMPLEMENTED YET** (Defaults to _0_) | | **--dht-strategy=** | _0-1_ | change DHT strategy. **NOT IMPLEMENTED YET** (Defaults to _0_) |
| **--dht-size=** | _1-n_ | size of DHT per process involved in megabyte (defaults to _1000 MByte_) | | **--dht-size=** | _1-n_ | size of DHT per process involved in megabyte (defaults to _1000 MByte_) |
| **--dht-snaps=** | _0-2_ | disable or enable storage of DHT snapshots | | **--dht-snaps=** | _0-2_ | disable or enable storage of DHT snapshots |
@ -253,12 +255,13 @@ produce any valid predictions.
## Defining a model ## Defining a model
In order to provide a model to POET, you need to setup a R script which can then In order to provide a model to POET, you need to setup a R script
be used by `poet_init` to generate the simulation input. Which parameters are which can then be used by `poet_init` to generate the simulation
required can be found in the input. Which parameters are required can be found in the
[Wiki](https://git.gfz-potsdam.de/naaice/poet/-/wikis/Initialization). We try to [Wiki](https://git.gfz-potsdam.de/naaice/poet/-/wikis/Initialization).
keep the document up-to-date. However, if you encounter missing information or We try to keep the document up-to-date. However, if you encounter
need help, please get in touch with us via the issue tracker or E-Mail. missing information or need help, please get in touch with us via the
issue tracker or E-Mail.
`poet_init` can be used as follows: `poet_init` can be used as follows:
@ -268,46 +271,50 @@ need help, please get in touch with us via the issue tracker or E-Mail.
where: where:
- **output** - name of the output file (defaults to the input file name - **output** - name of the output file (defaults to the input file
with the extension `.rds`) name with the extension `.rds`)
- **setwd** - set the working directory to the directory of the input file (e.g. - **setwd** - set the working directory to the directory of the input
to allow relative paths in the input script). However, the output file file (e.g. to allow relative paths in the input script). However,
will be stored in the directory from which `poet_init` was called. the output file will be stored in the directory from which
`poet_init` was called.
## Additional functions for the AI surrogate ## Additional functions for the AI surrogate
The AI surrogate can be activated for any benchmark and is by default initiated The AI surrogate can be activated for any benchmark and is by default
as a sequential keras model with three hidden layer of depth 48, 96, 24 with initiated as a sequential keras model with three hidden layer of depth
relu activation and adam optimizer. All functions in `ai_surrogate_model.R` can 48, 96, 24 with relu activation and adam optimizer. All functions in
be overridden by adding custom definitions via an R file in the input script. `ai_surrogate_model.R` can be overridden by adding custom definitions
This is done by adding the path to this file in the input script. Simply add the via an R file in the input script. This is done by adding the path to
path as an element called `ai_surrogate_input_script` to the `chemistry_setup` this file in the input script. Simply add the path as an element
list. Please use the global variable `ai_surrogate_base_path` as a base path called `ai_surrogate_input_script` to the `chemistry_setup` list.
Please use the global variable `ai_surrogate_base_path` as a base path
when relative filepaths are used in custom funtions. when relative filepaths are used in custom funtions.
**There is currently no default implementation to determine the validity of **There is currently no default implementation to determine the
predicted values.** This means, that every input script must include an R source validity of predicted values.** This means, that every input script
file with a custom function `validate_predictions(predictors, prediction)`. must include an R source file with a custom function
Examples for custom functions can be found for the barite_200 benchmark `validate_predictions(predictors, prediction)`. Examples for custom
functions can be found for the barite_200 benchmark
The functions can be defined as follows: The functions can be defined as follows:
`validate_predictions(predictors, prediction)`: Returns a boolean index vector `validate_predictions(predictors, prediction)`: Returns a boolean
that signals for each row in the predictions if the values are considered valid. index vector that signals for each row in the predictions if the
Can eg. be implemented as a mass balance threshold between the predictors and values are considered valid. Can eg. be implemented as a mass balance
the prediction. threshold between the predictors and the prediction.
`initiate_model()`: Returns a keras model. Can be used to load pretrained `initiate_model()`: Returns a keras model. Can be used to load
models. pretrained models.
`preprocess(df, backtransform = FALSE, outputs = FALSE)`: Returns the `preprocess(df, backtransform = FALSE, outputs = FALSE)`: Returns the
scaled/transformed/backtransformed dataframe. The `backtransform` flag signals scaled/transformed/backtransformed dataframe. The `backtransform` flag
if the current processing step is applied to data that's assumed to be scaled signals if the current processing step is applied to data that's
and expects backtransformed values. The `outputs` flag signals if the current assumed to be scaled and expects backtransformed values. The `outputs`
processing step is applied to the output or tatget of the model. This can be flag signals if the current processing step is applied to the output
used to eg. skip these processing steps and only scale the model input. or tatget of the model. This can be used to eg. skip these processing
steps and only scale the model input.
`training_step (model, predictor, target, validity)`: Trains the model after `training_step (model, predictor, target, validity)`: Trains the model
each iteration. `validity` is the bool index vector given by after each iteration. `validity` is the bool index vector given by
`validate_predictions` and can eg. be used to only train on values that have not `validate_predictions` and can eg. be used to only train on values
been valid predictions. that have not been valid predictions.

View File

@ -1,4 +1,4 @@
## Time-stamp: "Last modified 2023-08-15 11:58:23 delucia" ## Time-stamp: "Last modified 2024-06-11 14:26:33 delucia"
### Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam) ### Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam)
### ###
@ -35,14 +35,18 @@ master_init <- function(setup, out_dir, init_field) {
setup$iterations <- setup$maxiter setup$iterations <- setup$maxiter
setup$simulation_time <- 0 setup$simulation_time <- 0
dgts <- as.integer(ceiling(log10(setup$maxiter)))
## string format to use in sprintf
fmt <- paste0("%0", dgts, "d")
if (is.null(setup[["store_result"]])) { if (is.null(setup[["store_result"]])) {
setup$store_result <- TRUE setup$store_result <- TRUE
} }
if (setup$store_result) { if (setup$store_result) {
init_field_out <- paste0(out_dir, "/iter_0.rds") init_field_out <- paste0(out_dir, "/iter_", sprintf(fmt = fmt, 0), ".", setup$out_ext)
init_field <- data.frame(init_field, check.names = FALSE) init_field <- data.frame(init_field, check.names = FALSE)
saveRDS(init_field, file = init_field_out) SaveRObj(x = init_field, path = init_field_out)
msgm("Stored initial field in ", init_field_out) msgm("Stored initial field in ", init_field_out)
if (is.null(setup[["out_save"]])) { if (is.null(setup[["out_save"]])) {
setup$out_save <- seq(1, setup$iterations) setup$out_save <- seq(1, setup$iterations)
@ -69,7 +73,7 @@ master_iteration_end <- function(setup, state_T, state_C) {
## comprised in setup$out_save ## comprised in setup$out_save
if (setup$store_result) { if (setup$store_result) {
if (iter %in% setup$out_save) { if (iter %in% setup$out_save) {
nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".rds") nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".", setup$out_ext)
state_T <- data.frame(state_T, check.names = FALSE) state_T <- data.frame(state_T, check.names = FALSE)
state_C <- data.frame(state_C, check.names = FALSE) state_C <- data.frame(state_C, check.names = FALSE)
@ -77,13 +81,14 @@ master_iteration_end <- function(setup, state_T, state_C) {
prediction_time = if(exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL, prediction_time = if(exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL,
training_time = if(exists("ai_training_time")) as.integer(ai_training_time) else NULL, training_time = if(exists("ai_training_time")) as.integer(ai_training_time) else NULL,
valid_predictions = if(exists("validity_vector")) validity_vector else NULL) valid_predictions = if(exists("validity_vector")) validity_vector else NULL)
saveRDS(list(
T = state_T, SaveRObj(x = list(
C = state_C, T = state_T,
simtime = as.integer(setup$simulation_time), C = state_C,
totaltime = as.integer(totaltime), simtime = as.integer(setup$simulation_time),
ai_surrogate_info = ai_surrogate_info totaltime = as.integer(totaltime),
), file = nameout) ai_surrogate_info = ai_surrogate_info
), path = nameout)
msgm("results stored in <", nameout, ">") msgm("results stored in <", nameout, ">")
} }
} }
@ -172,3 +177,30 @@ GetWorkPackageSizesVector <- function(n_packages, package_size, len) {
ids <- rep(1:n_packages, times = package_size, each = 1)[1:len] ids <- rep(1:n_packages, times = package_size, each = 1)[1:len]
return(as.integer(table(ids))) return(as.integer(table(ids)))
} }
## Handler to read R objs from binary files using either builtin
## readRDS() or qs::qread() based on file extension
ReadRObj <- function(path) {
## code borrowed from tools::file_ext()
pos <- regexpr("\\.([[:alnum:]]+)$", path)
extension <- ifelse(pos > -1L, substring(path, pos + 1L), "")
switch(extension,
rds = readRDS(path),
qs = qs::qread(path))
}
## Handler to store R objs to binary files using either builtin
## saveRDS() or qs::qsave() based on file extension
SaveRObj <- function(x, path) {
msgm("Storing to", path)
## code borrowed from tools::file_ext()
pos <- regexpr("\\.([[:alnum:]]+)$", path)
extension <- ifelse(pos > -1L, substring(path, pos + 1L), "")
switch(extension,
rds = saveRDS(object = x, file=path),
qs = qs::qsave(x=x, file = path))
}

View File

@ -52,17 +52,23 @@ static int MY_RANK = 0;
static std::unique_ptr<Rcpp::List> global_rt_setup; static std::unique_ptr<Rcpp::List> global_rt_setup;
// we need some layz evaluation, as we can't define the functions before the R // we need some lazy evaluation, as we can't define the functions
// runtime is initialized // before the R runtime is initialized
static std::optional<Rcpp::Function> master_init_R; static std::optional<Rcpp::Function> master_init_R;
static std::optional<Rcpp::Function> master_iteration_end_R; static std::optional<Rcpp::Function> master_iteration_end_R;
static std::optional<Rcpp::Function> store_setup_R; static std::optional<Rcpp::Function> store_setup_R;
static std::optional<Rcpp::Function> ReadRObj_R;
static std::optional<Rcpp::Function> SaveRObj_R;
static std::optional<Rcpp::Function> source_R;
static void init_global_functions(RInside &R) { static void init_global_functions(RInside &R) {
R.parseEval(kin_r_library); R.parseEval(kin_r_library);
master_init_R = Rcpp::Function("master_init"); master_init_R = Rcpp::Function("master_init");
master_iteration_end_R = Rcpp::Function("master_iteration_end"); master_iteration_end_R = Rcpp::Function("master_iteration_end");
store_setup_R = Rcpp::Function("StoreSetup"); store_setup_R = Rcpp::Function("StoreSetup");
source_R = Rcpp::Function("source");
ReadRObj_R = Rcpp::Function("ReadRObj");
SaveRObj_R = Rcpp::Function("SaveRObj");
} }
// HACK: this is a step back as the order and also the count of fields is // HACK: this is a step back as the order and also the count of fields is
@ -150,8 +156,16 @@ ParseRet parseInitValues(char **argv, RuntimeParameters &params) {
params.use_ai_surrogate = cmdl["ai-surrogate"]; params.use_ai_surrogate = cmdl["ai-surrogate"];
// MDL: optional flag "qs" to switch to qsave()
params.out_ext = "rds";
if (cmdl["qs"]) {
MSG("Enabled <qs> output");
params.out_ext = "qs";
}
if (MY_RANK == 0) { if (MY_RANK == 0) {
// MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result)); // MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
MSG("Output format/extension is " + params.out_ext);
MSG("Work Package Size: " + std::to_string(params.work_package_size)); MSG("Work Package Size: " + std::to_string(params.work_package_size));
MSG("DHT is " + BOOL_PRINT(params.use_dht)); MSG("DHT is " + BOOL_PRINT(params.use_dht));
MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate)); MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate));
@ -207,18 +221,22 @@ ParseRet parseInitValues(char **argv, RuntimeParameters &params) {
// R["dht_log"] = simparams.dht_log; // R["dht_log"] = simparams.dht_log;
try { try {
Rcpp::Function source("source"); // Rcpp::Function source("source");
Rcpp::Function readRDS("readRDS"); // Rcpp::Function ReadRObj("ReadRObj");
// Rcpp::Function SaveRObj("SaveRObj");
Rcpp::List init_params_ = readRDS(init_file); Rcpp::List init_params_ = ReadRObj_R.value()(init_file);
params.init_params = init_params_; params.init_params = init_params_;
global_rt_setup = std::make_unique<Rcpp::List>(); global_rt_setup = std::make_unique<Rcpp::List>();
*global_rt_setup = source(runtime_file, Rcpp::Named("local", true)); *global_rt_setup = source_R.value()(runtime_file, Rcpp::Named("local", true));
*global_rt_setup = global_rt_setup->operator[]("value"); *global_rt_setup = global_rt_setup->operator[]("value");
// MDL add "out_ext" for output format to R setup
(*global_rt_setup)["out_ext"] = params.out_ext;
params.timesteps = params.timesteps =
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps")); Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
} catch (const std::exception &e) { } catch (const std::exception &e) {
ERRMSG("Error while parsing R scripts: " + std::string(e.what())); ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
@ -450,18 +468,21 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int world_size; int world_size;
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
{ {
MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK); MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK);
RInsidePOET &R = RInsidePOET::getInstance(); RInsidePOET &R = RInsidePOET::getInstance();
if (MY_RANK == 0) { if (MY_RANK == 0) {
MSG("Running POET version " + std::string(poet_version)); MSG("Running POET version " + std::string(poet_version));
} }
init_global_functions(R);
RuntimeParameters run_params; RuntimeParameters run_params;
@ -473,19 +494,19 @@ int main(int argc, char *argv[]) {
case ParseRet::PARSER_OK: case ParseRet::PARSER_OK:
break; break;
} }
InitialList init_list(R); InitialList init_list(R);
init_list.importList(run_params.init_params, MY_RANK != 0); init_list.importList(run_params.init_params, MY_RANK != 0);
MSG("RInside initialized on process " + std::to_string(MY_RANK)); MSG("RInside initialized on process " + std::to_string(MY_RANK));
std::cout << std::flush; std::cout << std::flush;
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
ChemistryModule chemistry(run_params.work_package_size, ChemistryModule chemistry(run_params.work_package_size,
init_list.getChemistryInit(), MPI_COMM_WORLD); init_list.getChemistryInit(), MPI_COMM_WORLD);
const ChemistryModule::SurrogateSetup surr_setup = { const ChemistryModule::SurrogateSetup surr_setup = {
getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD), getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
run_params.use_dht, run_params.use_dht,
@ -501,56 +522,58 @@ int main(int argc, char *argv[]) {
if (MY_RANK > 0) { if (MY_RANK > 0) {
chemistry.WorkerLoop(); chemistry.WorkerLoop();
} else { } else {
init_global_functions(R);
// R.parseEvalQ("mysetup <- setup"); // R.parseEvalQ("mysetup <- setup");
// // if (MY_RANK == 0) { // get timestep vector from // // if (MY_RANK == 0) { // get timestep vector from
// // grid_init function ... // // // grid_init function ... //
*global_rt_setup = *global_rt_setup =
master_init_R.value()(*global_rt_setup, run_params.out_dir, master_init_R.value()(*global_rt_setup, run_params.out_dir,
init_list.getInitialGrid().asSEXP()); init_list.getInitialGrid().asSEXP());
// MDL: store all parameters // MDL: store all parameters
// MSG("Calling R Function to store calling parameters"); // MSG("Calling R Function to store calling parameters");
// R.parseEvalQ("StoreSetup(setup=mysetup)"); // R.parseEvalQ("StoreSetup(setup=mysetup)");
R["out_ext"] = run_params.out_ext;
R["out_dir"] = run_params.out_dir;
if (run_params.use_ai_surrogate) { if (run_params.use_ai_surrogate) {
/* Incorporate ai surrogate from R */ /* Incorporate ai surrogate from R */
R.parseEvalQ(ai_surrogate_r_library); R.parseEvalQ(ai_surrogate_r_library);
/* Use dht species for model input and output */ /* Use dht species for model input and output */
R["ai_surrogate_species"] = init_list.getChemistryInit().dht_species.getNames(); R["ai_surrogate_species"] = init_list.getChemistryInit().dht_species.getNames();
R["out_dir"] = run_params.out_dir;
const std::string ai_surrogate_input_script = init_list.getChemistryInit().ai_surrogate_input_script; const std::string ai_surrogate_input_script = init_list.getChemistryInit().ai_surrogate_input_script;
MSG("AI: sourcing user-provided script"); MSG("AI: sourcing user-provided script");
R.parseEvalQ(ai_surrogate_input_script); R.parseEvalQ(ai_surrogate_input_script);
MSG("AI: initialize AI model"); MSG("AI: initialize AI model");
R.parseEval("model <- initiate_model()"); R.parseEval("model <- initiate_model()");
R.parseEval("gpu_info()"); R.parseEval("gpu_info()");
} }
MSG("Init done on process with rank " + std::to_string(MY_RANK)); MSG("Init done on process with rank " + std::to_string(MY_RANK));
// MPI_Barrier(MPI_COMM_WORLD); // MPI_Barrier(MPI_COMM_WORLD);
DiffusionModule diffusion(init_list.getDiffusionInit(), DiffusionModule diffusion(init_list.getDiffusionInit(),
init_list.getInitialGrid()); init_list.getInitialGrid());
chemistry.masterSetField(init_list.getInitialGrid()); chemistry.masterSetField(init_list.getInitialGrid());
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry); Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
MSG("finished simulation loop"); MSG("finished simulation loop");
R["profiling"] = profiling; R["profiling"] = profiling;
R["setup"] = *global_rt_setup; R["setup"] = *global_rt_setup;
R["setup$out_ext"] = run_params.out_ext;
string r_vis_code; string r_vis_code;
r_vis_code = r_vis_code =
"saveRDS(profiling, file=paste0(setup$out_dir,'/timings.rds'));"; "SaveRObj(x = profiling, path = paste0(out_dir, '/timings.', setup$out_ext));";
R.parseEval(r_vis_code); R.parseEval(r_vis_code);
MSG("Done! Results are stored as R objects into <" + run_params.out_dir + MSG("Done! Results are stored as R objects into <" + run_params.out_dir +
"/timings.rds>"); "/timings." + run_params.out_ext);
} }
} }

View File

@ -39,7 +39,7 @@ static const inline std::string ai_surrogate_r_library = R"(@R_AI_SURROGATE_LIB@
static const inline std::string r_runtime_parameters = "mysetup"; static const inline std::string r_runtime_parameters = "mysetup";
const std::set<std::string> flaglist{"ignore-result", "dht", "P", "progress", const std::set<std::string> flaglist{"ignore-result", "dht", "P", "progress",
"interp", "ai-surrogate"}; "interp", "ai-surrogate", "qs"};
const std::set<std::string> paramlist{ const std::set<std::string> paramlist{
"work-package-size", "dht-strategy", "dht-size", "dht-snaps", "work-package-size", "dht-strategy", "dht-size", "dht-snaps",
"dht-file", "interp-size", "interp-min", "interp-bucket-entries"}; "dht-file", "interp-size", "interp-min", "interp-bucket-entries"};
@ -51,6 +51,7 @@ constexpr uint32_t CHEM_DHT_SIZE_PER_PROCESS_MB = 1.5E3;
struct RuntimeParameters { struct RuntimeParameters {
std::string out_dir; std::string out_dir;
std::vector<double> timesteps; std::vector<double> timesteps;
std::string out_ext; // MDL added to accomodate for qs::qsave/qread
bool print_progressbar; bool print_progressbar;
uint32_t work_package_size; uint32_t work_package_size;