From ef77d755ab86381d4c4eb879033a4dfdefc23dd6 Mon Sep 17 00:00:00 2001 From: Hannes Signer Date: Wed, 8 Jan 2025 17:17:06 +0100 Subject: [PATCH] prototype naa online training --- src/CMakeLists.txt | 12 +- src/Chemistry/ChemistryModule.hpp | 2 + .../SurrogateModels/AI_functions.cpp | 128 ++++-- src/Chemistry/SurrogateModels/serializer.cpp | 15 +- src/poet.cpp | 39 +- src/poet.hpp | 412 ++++++++++++++++++ src/poet.hpp.in | 8 + 7 files changed, 567 insertions(+), 49 deletions(-) create mode 100644 src/poet.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1feb6b86d..f89163c07 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -18,6 +18,9 @@ target_sources(POETLib Chemistry/SurrogateModels/ProximityHashTable.cpp ) +# add_compile_options(-fsanitize=address) +# add_link_options(-fsanitize=address) + target_include_directories( POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}" @@ -45,14 +48,10 @@ if(USE_AI_SURROGATE) find_package(Python3 COMPONENTS Interpreter Development NumPy REQUIRED) find_package(Threads REQUIRED) - - set_source_files_properties( - Chemistry/SurrogateModels/AI_functions.cpp - PROPERTIES COMPILE_FLAGS "-O3 -march=native -mtune=native" - ) target_sources(POETLib PRIVATE + Chemistry/SurrogateModels/serializer.cpp Chemistry/SurrogateModels/AI_functions.cpp ) @@ -64,6 +63,7 @@ if(USE_AI_SURROGATE) target_link_libraries(POETLib PUBLIC "${Python3_LIBRARIES}" PUBLIC Threads::Threads pthread + PUBLIC naaice::middleware ) endif() # USE_AI_SURROGATE @@ -139,7 +139,7 @@ configure_file(poet.hpp.in poet.hpp @ONLY) add_executable(poet poet.cpp) -target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11 naaice::middleware) +target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11) target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") add_executable(poet_init initializer.cpp) diff --git a/src/Chemistry/ChemistryModule.hpp b/src/Chemistry/ChemistryModule.hpp index c06293572..4dc642b81 100644 --- a/src/Chemistry/ChemistryModule.hpp +++ b/src/Chemistry/ChemistryModule.hpp @@ -83,7 +83,9 @@ public: std::uint32_t interp_bucket_size; std::uint32_t interp_size_mb; std::uint32_t interp_min_entries; + // ? extra struct for AI surrogate options? bool ai_surrogate_enabled; + bool naa_enabled; }; void masterEnableSurrogates(const SurrogateSetup &setup) { diff --git a/src/Chemistry/SurrogateModels/AI_functions.cpp b/src/Chemistry/SurrogateModels/AI_functions.cpp index d4023caec..b3e5ffef9 100644 --- a/src/Chemistry/SurrogateModels/AI_functions.cpp +++ b/src/Chemistry/SurrogateModels/AI_functions.cpp @@ -478,7 +478,7 @@ void Python_Keras_train(std::vector> &x, } } - // Choose the correct model to traimn if clustering is used + // Choose the correct model to train if clustering is used if (train_cluster == 1) { if (!model_path.empty()) { model_path.insert(model_path.length() - 6, "_reaction"); @@ -637,6 +637,7 @@ void parallel_training(EigenModel *Eigen_model, // Acquire the Python GIL PyGILState_STATE gstate = PyGILState_Ensure(); + // Start training Python_Keras_train(inputs, targets, train_cluster, model_name, params); @@ -669,17 +670,12 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive, bool *start_training, bool *end_training, const RuntimeParameters ¶ms, naa_handle *handle){ + fprintf(stdout, "In naa_training\n"); + + // initialize models with weights from pretrained keras model // declare memory regions for model weights, training and target data - // Initialize training data input and targets - std::vector> inputs( - training_data_buffer->x.size(), - std::vector(params.training_data_size)); - std::vector> targets( - training_data_buffer->x.size(), - std::vector(params.training_data_size)); - PyGILState_STATE gstate = PyGILState_Ensure(); Eigen_model_mutex->lock(); @@ -696,7 +692,15 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive, Eigen_model_mutex->unlock(); PyGILState_Release(gstate); - // determine size for reuired memory regions + // Initialize training data input and targets + std::vector> inputs( + Eigen_model->biases[Eigen_model->biases.size()-1].size(), // number of species + std::vector(params.training_data_size)); + std::vector> targets( + Eigen_model->biases[Eigen_model->biases.size()-1].size(), + std::vector(params.training_data_size)); + + // determine size for required memory regions size_t modelSize = calculateStructSize(Eigen_model, 'E'); size_t modelSizeReactive = calculateStructSize(Eigen_model_reactive, 'E'); @@ -723,14 +727,16 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive, } // create memory regions - struct naa_param_t weight_region[] = { + struct naa_param_t input_regions[] = { {(void *)serializedModel, modelSize}, {(void *)serializedTrainingData, trainingDataSize}, {(void *)serializedTargetData, targetDataSize}}; + struct naa_param_t output_regions[] = {{(void *)serializedModel, modelSize}}; + printf("-- Setting Up Connection --\n"); // function code encode the used ai model - if (naa_create(1, weight_region, 1, weight_region, 0, handle)) { + if (naa_create(1, input_regions, 3, output_regions, 1, handle)) { fprintf(stderr, "Error during naa_create. Exiting.\n"); exit(EXIT_FAILURE); } @@ -826,31 +832,101 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive, } else { int res = serializeModelWeights(Eigen_model, serializedModel); } + + // checksum serializeModel + double checksum_model = 0; + for(size_t i = 0; i < Eigen_model->weight_matrices.size(); i++){ + checksum_model += Eigen_model->weight_matrices[i].sum(); + } + for(size_t j=0; jbiases.size(); j++){ + checksum_model += Eigen_model->biases[j].sum(); + } + + fprintf(stdout, "Checksum model: %f\n", checksum_model); + int res1 = serializeTrainingData(&inputs, serializedTrainingData); int res2 = serializeTrainingData(&targets, serializedTargetData); + // std::vector> deserializedTrainingData = deserializeTrainingData(serializedTrainingData); + // std::vector> deserializedTargetData = deserializeTrainingData(serializedTargetData); + + + // calculate checksum of inputs + // double checksum_inputs = 0; + // for (size_t i = 0; i < inputs.size(); i++) { + // for (size_t j = 0; j < inputs[i].size(); j++) { + // checksum_inputs += inputs[i][j]; + // // fprintf(stdout, "inputs: %f\n", inputs[i][j]); + // } + // } - printf("-- RPC Invocation --\n"); - if (naa_invoke(handle)) { - fprintf(stderr, "Error during naa_invoke. Exiting.\n"); + // // calculate checksum of inputs + // double checksum_targets = 0; + // for (size_t i = 0; i < targets.size(); i++) { + // for (size_t j = 0; j < targets[i].size(); j++) { + // checksum_targets += targets[i][j]; + // // fprintf(stdout, "inputs: %f\n", inputs[i][j]); + // } + // } + + // double checksum_training = 0; + // for (size_t i = 0; i < deserializedTrainingData.size(); i++) { + // for (size_t j = 0; j < deserializedTrainingData[i].size(); j++) { + // checksum_training += deserializedTrainingData[i][j]; + // // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]); + // } + // } + + // double checksum_testing = 0; + // for (size_t i = 0; i < deserializedTargetData.size(); i++) { + // for (size_t j = 0; j < deserializedTargetData[i].size(); j++) { + // checksum_testing += deserializedTargetData[i][j]; + // // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]); + // } + // } + + // fprintf(stdout, "Checksum inputs: %f\n", checksum_inputs); + // fprintf(stdout, "Checksum training: %f\n", checksum_training); + // fprintf(stdout, "Checksum targets: %f\n", checksum_targets); + // fprintf(stdout, "Checksum testing: %f\n", checksum_testing); + + printf("-- RPC Invocation --\n"); + if (naa_invoke(handle)) { + fprintf(stderr, "Error during naa_invoke. Exiting.\n"); + exit(EXIT_FAILURE); + } + + // naa_wait with new weights + naa_status status; + if (naa_wait(handle, &status)) { + fprintf(stderr, "Error occurred during naa_wait. Exiting.\n"); exit(EXIT_FAILURE); } - // TODO: naa_wait with new weights - - // TODO: update model weights with received weights + printf("Bytes received: %d, RPC Return code: %d\n", + status.bytes_received, status.naa_error); + // update model weights with received weights EigenModel deserializedModel = deserializeModelWeights(serializedModel, modelSize); fprintf(stdout, "After deserialization: %f\n", deserializedModel.weight_matrices[0](0, 0)); - for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) { - for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) { - fprintf(stdout, "model: %f, deserializedModel: %f\n", - Eigen_model->weight_matrices[0](i, j), - deserializedModel.weight_matrices[0](i, j)); - } - } + Eigen_model_mutex->lock(); + + Eigen_model->weight_matrices = deserializedModel.weight_matrices; + Eigen_model->biases = deserializedModel.biases; + + Eigen_model_mutex->unlock(); + + + + // for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) { + // for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) { + // fprintf(stdout, "model: %f, deserializedModel: %f\n", + // Eigen_model->weight_matrices[0](i, j), + // deserializedModel.weight_matrices[0](i, j)); + // } + // } } printf("-- Cleaning Up --\n"); @@ -917,7 +993,7 @@ int Python_Keras_training_thread( * @param weights Vector of model weights from keras as returned by * Python_Keras_get_weights() */ - // ? check if updating was succesful -> hash about values? + // ? check if updating was successful -> hash about values? void update_weights( EigenModel *model, const std::vector>> &weights) { diff --git a/src/Chemistry/SurrogateModels/serializer.cpp b/src/Chemistry/SurrogateModels/serializer.cpp index 95ce67086..162e57cfa 100644 --- a/src/Chemistry/SurrogateModels/serializer.cpp +++ b/src/Chemistry/SurrogateModels/serializer.cpp @@ -35,7 +35,7 @@ size_t calculateStructSize(void *struct_pointer, char type){ } } else if (type == 'T') { struct_size += sizeof(size_t); // number of vectors - struct_size += sizeof(size_t); // length of vector + struct_size += static_cast>*>(struct_pointer)->size() * sizeof(size_t); // length of vector for (const std::vector &vector: *static_cast>*>(struct_pointer)){ struct_size += vector.size() * sizeof(double); @@ -56,7 +56,6 @@ int serializeModelWeights(const EigenModel *model, char *memory){ size_counter += sizeof(size_t); for (const Eigen::MatrixXd &matrix : model->weight_matrices) { size_t rows = matrix.rows(), cols = matrix.cols(); - fprintf(stdout, "rows: %zu, cols: %zu\n", rows, cols); std::memcpy(memory, &rows, sizeof(size_t)); memory += sizeof(size_t); size_counter += sizeof(size_t); @@ -125,9 +124,8 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) { } // interpret memory as Eigen::MatrixXd (more efficient than memcpy?) - Eigen::MatrixXd temp = Eigen::Map( + matrix = Eigen::Map( reinterpret_cast(memory), rows, cols); - matrix = temp; // copy data to matrix memory += rows * cols * sizeof(double); size_counter += rows * cols * sizeof(double); } @@ -161,7 +159,6 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) { } // same procedure as for the matrices - // TODO: delete temp variable Eigen::VectorXd temp = Eigen::Map( reinterpret_cast(memory), size); bias = temp; // Kopieren der Daten @@ -173,14 +170,14 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) { } -int serializeTrainingData(std::vector> data, char *memory){ +int serializeTrainingData(std::vector> *data, char *memory){ - size_t num_vectors = data.size(); + size_t num_vectors = data->size(); std::memcpy(memory, &num_vectors, sizeof(size_t)); memory += sizeof(size_t); - for (const std::vector &vector : data) { + for (const std::vector &vector : *data) { size_t size = vector.size(); std::memcpy(memory, &size, sizeof(size_t)); memory += sizeof(size_t); @@ -196,6 +193,7 @@ std::vector> deserializeTrainingData(char* data){ std::vector> deserialized_data; size_t num_vectors; std::memcpy(&num_vectors, data, sizeof(size_t)); + fprintf(stdout, "num_vectors: %zu\n", num_vectors); data += sizeof(size_t); for (size_t i = 0; i < num_vectors; i++) { @@ -205,6 +203,7 @@ std::vector> deserializeTrainingData(char* data){ std::vector vector(size); std::memcpy(vector.data(), data, size * sizeof(double)); + data += size * sizeof(double); deserialized_data.push_back(vector); diff --git a/src/poet.cpp b/src/poet.cpp index 7656659b7..e7eeea906 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -46,9 +47,7 @@ #include #include -extern "C"{ -#include -} + using namespace std; using namespace poet; @@ -77,6 +76,11 @@ static void init_global_functions(RInside &R) { SaveRObj_R = DEFunc("SaveRObj"); } +void signalHandler(int signum) { + fprintf(stderr, "Terminate program by user.\n"); + exit(signum); +} + // HACK: this is a step back as the order and also the count of fields is // predefined, but it will change in the future @@ -310,18 +314,30 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::vector cluster_labels; if (params.use_ai_surrogate) { MSG("AI: Initialize model"); + struct naa_handle *handle = NULL; + if(params.use_naa){ + handle = (naa_handle*) calloc(1, sizeof(struct naa_handle)); + if(!handle){ + throw std::runtime_error("Could not allocate memory for NAA handle"); + } + + // TODO: create regions here (size could be determined from run_params (size of trainings buffer)) + struct naa_param_t *model_weights; + } // Initiate two models from one file Python_Keras_load_model(R["model_file_path"], R["model_reactive_file_path"], params.use_clustering); + if (!params.disable_training) { MSG("AI: Initialize training thread"); // TODO add naa_handle as optional parameter which is NULL per default + Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive, &Eigen_model_mutex, &training_data_buffer, &training_data_buffer_mutex, &training_data_buffer_full, &start_training, - &end_training, params); + &end_training, params, handle); } if (!params.use_Keras_predictions) { // Initialize Eigen model for custom inference function @@ -390,12 +406,11 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, R["TMP"] = Rcpp::wrap(chem.getField().AsVector()); R.parseEval(std::string("predictors <- ") + "set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)"); - // Apply preprocessing MSG("AI Preprocessing"); R.parseEval("predictors_scaled <- preprocess(predictors)"); std::vector> predictors_scaled = R["predictors_scaled"]; - + // Get K-Means cluster assignements based on the preprocessed data if (params.use_clustering) { R.parseEval("cluster_labels <- assign_clusters(predictors_scaled)"); @@ -425,6 +440,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, // Validate prediction and write valid predictions to chem field MSG("AI: Validate"); R.parseEval("validity_vector <- validate_predictions(predictors, predictions)"); + // R.parseEval("print(head(predictors, 30))"); MSG("AI: Marking valid"); chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector")); @@ -470,8 +486,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::vector> invalid_x = R.parseEval("get_invalid_values(predictors_scaled, validity_vector)"); - + R.parseEval("target_scaled <- preprocess(state_C[ai_surrogate_species])"); + // R.parseEval("print(head(state_C[ai_surrogate_species], 30))"); std::vector> invalid_y = R.parseEval("get_invalid_values(target_scaled, validity_vector)"); @@ -599,6 +616,8 @@ std::vector getSpeciesNames(const Field &&field, int root, } int main(int argc, char *argv[]) { + signal(SIGINT, signalHandler); + int world_size; // Threadsafe MPI is necessary for the AI surrogate @@ -656,9 +675,11 @@ int main(int argc, char *argv[]) { run_params.interp_bucket_entries, run_params.interp_size, run_params.interp_min_entries, - run_params.use_ai_surrogate}; + run_params.use_ai_surrogate, + run_params.use_naa}; // TODO add option for naa training + chemistry.masterEnableSurrogates(surr_setup); if (MY_RANK > 0) { @@ -685,6 +706,7 @@ int main(int argc, char *argv[]) { chemistry.masterSetField(init_list.getInitialGrid()); if (run_params.use_ai_surrogate) { + // Load default function implementations R.parseEvalQ(ai_surrogate_r_library); /* Use dht species for model input and output */ @@ -704,7 +726,6 @@ int main(int argc, char *argv[]) { variable of the same name in one of the the R input scripts)*/ run_params.training_data_size = init_list.getDiffusionInit().n_rows * init_list.getDiffusionInit().n_cols; // Default value is number of cells in field - // ? error handling for all if statements if (Rcpp::as(R.parseEval("exists(\"batch_size\")"))) { run_params.batch_size = R["batch_size"]; } diff --git a/src/poet.hpp b/src/poet.hpp new file mode 100644 index 000000000..bea9c48d2 --- /dev/null +++ b/src/poet.hpp @@ -0,0 +1,412 @@ +/* +** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of +** Potsdam) +** +** Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam) +** +** Copyright (C) 2023-2024 Max Luebke (University of Potsdam) +** +** POET is free software; you can redistribute it and/or modify it under the +** terms of the GNU General Public License as published by the Free Software +** Foundation; either version 2 of the License, or (at your option) any later +** version. +** +** POET is distributed in the hope that it will be useful, but WITHOUT ANY +** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +** A PARTICULAR PURPOSE. See the GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License along with +** this program; if not, write to the Free Software Foundation, Inc., 51 +** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#pragma once + +#include +#include +#include +#include + +#include + +#define SRC_DIR "/mnt/scratch/signer/poet" +#define CUDA_SRC_DIR "" +#define USE_NAA "USE_NAA;ON" + +// target and source IP address for NAA support +// #ifdef USE_NAA +#define SOURCE_IP "10.3.10.41" +#define TARGET_IP "10.3.10.42" +// #endif + +static const char *poet_version = "naaice/v0.3-105-g13ad41d"; + +// using the Raw string literal to avoid escaping the quotes +static const inline std::string kin_r_library = R"(### Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam) +### +### POET is free software; you can redistribute it and/or modify it under the +### terms of the GNU General Public License as published by the Free Software +### Foundation; either version 2 of the License, or (at your option) any later +### version. +### +### POET is distributed in the hope that it will be useful, but WITHOUT ANY +### WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +### A PARTICULAR PURPOSE. See the GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License along with +### this program; if not, write to the Free Software Foundation, Inc., 51 +### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +master_init <- function(setup, out_dir, init_field) { + ## Setup the directory where we will store the results + if (!dir.exists(out_dir)) { + dir.create(out_dir) + msgm("created directory ", out_dir) + } else { + msgm("dir ", out_dir, " already exists, I will overwrite!") + } + if (is.null(setup$store_result)) { + msgm("store_result doesn't exist!") + } else { + msgm("store_result is ", setup$store_result) + } + + setup$iter <- 1 + setup$timesteps <- setup$timesteps + setup$maxiter <- length(setup$timesteps) + setup$iterations <- setup$maxiter + setup$simulation_time <- 0 + + dgts <- as.integer(ceiling(log10(setup$maxiter))) + ## string format to use in sprintf + fmt <- paste0("%0", dgts, "d") + + if (is.null(setup[["store_result"]])) { + setup$store_result <- TRUE + } + + if (setup$store_result) { + init_field_out <- paste0(out_dir, "/iter_", sprintf(fmt = fmt, 0), ".", setup$out_ext) + init_field <- data.frame(init_field, check.names = FALSE) + SaveRObj(x = init_field, path = init_field_out) + msgm("Stored initial field in ", init_field_out) + if (is.null(setup[["out_save"]])) { + setup$out_save <- seq(1, setup$iterations) + } + } + + setup$out_dir <- out_dir + + return(setup) +} + +## This function, called only by master, stores on disk the last +## calculated time step if store_result is TRUE and increments the +## iteration counter +master_iteration_end <- function(setup, state_T, state_C) { + iter <- setup$iter + # print(iter) + ## max digits for iterations + dgts <- as.integer(ceiling(log10(setup$maxiter + 1))) + ## string format to use in sprintf + fmt <- paste0("%0", dgts, "d") + + ## Write on disk state_T and state_C after every iteration + ## comprised in setup$out_save + if (setup$store_result) { + if (iter %in% setup$out_save) { + nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".", setup$out_ext) + state_T <- data.frame(state_T, check.names = FALSE) + state_C <- data.frame(state_C, check.names = FALSE) + + ai_surrogate_info <- list( + prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL, + predictions_validity = if (exists("validity_vector")) validity_vector else NULL, + cluster_labels = if (exists("cluster_labels")) cluster_labels else NULL, + predictions = if (exists("predictions")) predictions else NULL + ) + + SaveRObj(x = list( + T = state_T, + C = state_C, + simtime = as.integer(setup$simulation_time), + totaltime = as.integer(totaltime), + ai_surrogate_info = ai_surrogate_info + ), path = nameout) + msgm("results stored in <", nameout, ">") + } + } + ## Add last time step to simulation time + setup$simulation_time <- setup$simulation_time + setup$timesteps[iter] + + msgm("done iteration", iter, "/", length(setup$timesteps)) + setup$iter <- setup$iter + 1 + return(setup) +} + + +## Attach the name of the calling function to the message displayed on +## R's stdout +msgm <- function(...) { + prefix <- paste0("R: ") + cat(paste(prefix, ..., "\n")) + invisible() +} + + +## Function called by master R process to store on disk all relevant +## parameters for the simulation +StoreSetup <- function(setup, filesim, out_dir) { + to_store <- vector(mode = "list", length = 4) + ## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT") + names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline") + + ## read the setup R file, which is sourced in kin.cpp + tmpbuff <- file(filesim, "r") + setupfile <- readLines(tmpbuff) + close.connection(tmpbuff) + + to_store$Sim <- setupfile + + ## to_store$Flow <- list( + ## snapshots = setup$snapshots, + ## gridfile = setup$gridfile, + ## phase = setup$phase, + ## density = setup$density, + ## dt_differ = setup$dt_differ, + ## prolong = setup$prolong, + ## maxiter = setup$maxiter, + ## saved_iter = setup$iter_output, + ## out_save = setup$out_save ) + + to_store$Transport <- setup$diffusion + + ## to_store$Chemistry <- list( + ## nprocs = n_procs, + ## wp_size = work_package_size, + ## base = setup$base, + ## first = setup$first, + ## init = setup$initsim, + ## db = db, + ## kin = setup$kin, + ## ann = setup$ann) + + if (dht_enabled) { + to_store$DHT <- list( + enabled = dht_enabled, + log = dht_log + ## signif = dht_final_signif, + ## proptype = dht_final_proptype + ) + } else { + to_store$DHT <- FALSE + } + + if (dht_enabled) { + to_store$DHT <- list( + enabled = dht_enabled, + log = dht_log + # signif = dht_final_signif, + # proptype = dht_final_proptype + ) + } else { + to_store$DHT <- FALSE + } + + saveRDS(to_store, file = paste0(fileout, "/setup.rds")) + msgm("initialization stored in ", paste0(fileout, "/setup.rds")) +} + +GetWorkPackageSizesVector <- function(n_packages, package_size, len) { + ids <- rep(1:n_packages, times = package_size, each = 1)[1:len] + return(as.integer(table(ids))) +} + + +## Handler to read R objs from binary files using either builtin +## readRDS() or qs::qread() based on file extension +ReadRObj <- function(path) { + ## code borrowed from tools::file_ext() + pos <- regexpr("\\.([[:alnum:]]+)$", path) + extension <- ifelse(pos > -1L, substring(path, pos + 1L), "") + + switch(extension, + rds = readRDS(path), + qs = qs::qread(path) + ) +} + +## Handler to store R objs to binary files using either builtin +## saveRDS() or qs::qsave() based on file extension +SaveRObj <- function(x, path) { + msgm("Storing to", path) + ## code borrowed from tools::file_ext() + pos <- regexpr("\\.([[:alnum:]]+)$", path) + extension <- ifelse(pos > -1L, substring(path, pos + 1L), "") + + switch(extension, + rds = saveRDS(object = x, file = path), + qs = qs::qsave(x = x, file = path) + ) +} +)"; +static const inline std::string init_r_library = R"(### Copyright (C) 2018-2024 Marco De Lucia, Max Luebke (GFZ Potsdam, University of Potsdam) +### +### POET is free software; you can redistribute it and/or modify it under the +### terms of the GNU General Public License as published by the Free Software +### Foundation; either version 2 of the License, or (at your option) any later +### version. +### +### POET is distributed in the hope that it will be useful, but WITHOUT ANY +### WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +### A PARTICULAR PURPOSE. See the GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License along with +### this program; if not, write to the Free Software Foundation, Inc., 51 +### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +##' @param pqc_mat matrix, containing IDs and PHREEQC outputs +##' @param grid matrix, zonation referring to pqc_mat$ID +##' @return a data.frame +pqc_to_grid <- function(pqc_mat, grid) { + # Convert the input DataFrame to a matrix + pqc_mat <- as.matrix(pqc_mat) + + # Flatten the matrix into a vector + id_vector <- as.integer(t(grid)) + + # Find the matching rows in the matrix + row_indices <- match(id_vector, pqc_mat[, "ID"]) + + # Extract the matching rows from pqc_mat to size of grid matrix + result_mat <- pqc_mat[row_indices, ] + + # Convert the result matrix to a data frame + res_df <- as.data.frame(result_mat) + + # Remove all columns which only contain NaN + res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)] + + # Remove row names + rownames(res_df) <- NULL + + return(res_df) +} + + +##' @param pqc_mat matrix, +##' @param transport_spec column name of species in pqc_mat +##' @param id +##' @title +##' @return +resolve_pqc_bound <- function(pqc_mat, transport_spec, id) { + df <- as.data.frame(pqc_mat, check.names = FALSE) + value <- df[df$ID == id, transport_spec] + + if (is.nan(value)) { + value <- 0 + } + + return(value) +} + +##' @title +##' @param init_grid +##' @param new_names +##' @return +add_missing_transport_species <- function(init_grid, new_names) { + # add 'ID' to new_names front, as it is not a transport species but required + new_names <- c("ID", new_names) + sol_length <- length(new_names) + + new_grid <- data.frame(matrix(0, nrow = nrow(init_grid), ncol = sol_length)) + names(new_grid) <- new_names + + matching_cols <- intersect(names(init_grid), new_names) + + # Copy matching columns from init_grid to new_grid + new_grid[, matching_cols] <- init_grid[, matching_cols] + + + # Add missing columns to new_grid + append_df <- init_grid[, !(names(init_grid) %in% new_names)] + new_grid <- cbind(new_grid, append_df) + + return(new_grid) +} +)"; +static const inline std::string ai_surrogate_r_library = + R"(## This file contains default function implementations for the ai surrogate. +## To use pre-/postprocessing it is recommended to override these functions +## with custom implementations via the input script. The path to the R-file +## See the barite_50.R file as an example and the general README for more +## information. + +preprocess <- function(df) { + return(df) +} + +postprocess <- function(df) { + return(df) +} + +set_valid_predictions <- function(temp_field, prediction, validity) { + temp_field[validity == 1, ] <- prediction[validity == 1, ] + return(temp_field) +} + +get_invalid_values <- function(df, validity) { + return(df[validity == 0, ]) +} + +set_field <- function(temp_field, columns, rows, column_name_limit, + byrow = FALSE) { + temp_field <- matrix(temp_field, nrow = rows, byrow = byrow) + temp_field <- setNames(data.frame(temp_field), columns) + temp_field <- temp_field[column_name_limit] + return(temp_field) +} +)"; +static const inline std::string r_runtime_parameters = "mysetup"; + +struct RuntimeParameters { + std::string out_dir; + std::vector timesteps; + + Rcpp::List init_params; + + bool as_rds = false; + std::string out_ext; // MDL added to accomodate for qs::qsave/qread + + bool print_progress = false; + + static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32; + std::uint32_t work_package_size; + + bool use_dht = false; + static constexpr std::uint32_t DHT_SIZE_DEFAULT = 1.5E3; + std::uint32_t dht_size; + static constexpr std::uint8_t DHT_SNAPS_DEFAULT = 0; + std::uint8_t dht_snaps; + + bool use_interp = false; + static constexpr std::uint32_t INTERP_SIZE_DEFAULT = 100; + std::uint32_t interp_size; + static constexpr std::uint32_t INTERP_MIN_ENTRIES_DEFAULT = 5; + std::uint32_t interp_min_entries; + static constexpr std::uint32_t INTERP_BUCKET_ENTRIES_DEFAULT = 20; + std::uint32_t interp_bucket_entries; + + /*AI surriogate configuration*/ + bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate + bool disable_training = false; // Can be set in the R input script + bool use_clustering = false; // Can be set in the R input script + bool use_Keras_predictions = false; // Can be set in the R input script + bool train_only_invalid = false; // Can be set in the R input script + int batch_size = 2560; // default value determined in test on the UP Turing cluster + int training_epochs = 20;; // Can be set in the R input script + int training_data_size; // Can be set in the R input script + bool use_naa = true; + std::string save_model_path = ""; // Can be set in the R input script + std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake +}; diff --git a/src/poet.hpp.in b/src/poet.hpp.in index 9d343dccf..221b80001 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -31,6 +31,13 @@ #define SRC_DIR "@CMAKE_SOURCE_DIR@" #define CUDA_SRC_DIR "@CUDA_TOOLKIT_ROOT_DIR@" +#define USE_NAA "@USE_NAA@" + +// target and source IP address for NAA support +// #ifdef USE_NAA +#define SOURCE_IP "@SOURCE_IP@" +#define TARGET_IP "@TARGET_IP@" +// #endif static const char *poet_version = "@POET_VERSION@"; @@ -78,6 +85,7 @@ struct RuntimeParameters { int batch_size = 2560; // default value determined in test on the UP Turing cluster int training_epochs = 20;; // Can be set in the R input script int training_data_size; // Can be set in the R input script + bool use_naa = false; std::string save_model_path = ""; // Can be set in the R input script std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake };