prototype naa online training

This commit is contained in:
Hannes Signer 2025-01-08 17:17:06 +01:00
parent d6d6db8b93
commit ef77d755ab
7 changed files with 567 additions and 49 deletions

View File

@@ -18,6 +18,9 @@ target_sources(POETLib
   Chemistry/SurrogateModels/ProximityHashTable.cpp
 )
+# add_compile_options(-fsanitize=address)
+# add_link_options(-fsanitize=address)
 target_include_directories(
   POETLib PUBLIC
   "${CMAKE_CURRENT_SOURCE_DIR}"
@@ -46,13 +49,9 @@ if(USE_AI_SURROGATE)
   find_package(Threads REQUIRED)
-  set_source_files_properties(
-    Chemistry/SurrogateModels/AI_functions.cpp
-    PROPERTIES COMPILE_FLAGS "-O3 -march=native -mtune=native"
-  )
   target_sources(POETLib
     PRIVATE
+    Chemistry/SurrogateModels/serializer.cpp
     Chemistry/SurrogateModels/AI_functions.cpp
   )
@@ -64,6 +63,7 @@ if(USE_AI_SURROGATE)
   target_link_libraries(POETLib
     PUBLIC "${Python3_LIBRARIES}"
     PUBLIC Threads::Threads pthread
+    PUBLIC naaice::middleware
   )
 endif() # USE_AI_SURROGATE
@@ -139,7 +139,7 @@ configure_file(poet.hpp.in poet.hpp @ONLY)
 add_executable(poet poet.cpp)
-target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11 naaice::middleware)
+target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11)
 target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
 add_executable(poet_init initializer.cpp)

View File

@@ -83,7 +83,9 @@ public:
   std::uint32_t interp_bucket_size;
   std::uint32_t interp_size_mb;
   std::uint32_t interp_min_entries;
+  // ? extra struct for AI surrogate options?
   bool ai_surrogate_enabled;
+  bool naa_enabled;
 };

 void masterEnableSurrogates(const SurrogateSetup &setup) {

View File

@@ -478,7 +478,7 @@ void Python_Keras_train(std::vector<std::vector<double>> &x,
     }
   }
-  // Choose the correct model to traimn if clustering is used
+  // Choose the correct model to train if clustering is used
   if (train_cluster == 1) {
     if (!model_path.empty()) {
       model_path.insert(model_path.length() - 6, "_reaction");
@@ -637,6 +637,7 @@ void parallel_training(EigenModel *Eigen_model,
   // Acquire the Python GIL
   PyGILState_STATE gstate = PyGILState_Ensure();
+
   // Start training
   Python_Keras_train(inputs, targets, train_cluster, model_name, params);
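Editor's note: the training thread has to hold the Python GIL while it calls into Keras; the Ensure/Release pair above can be wrapped in a small RAII guard so the release also happens on early returns or exceptions. A minimal sketch, not part of this patch, assuming Python.h is already included by AI_functions.cpp:

// Sketch only: RAII wrapper for the GIL pattern used in parallel_training.
struct ScopedGIL {
  PyGILState_STATE state;
  ScopedGIL() : state(PyGILState_Ensure()) {}
  ~ScopedGIL() { PyGILState_Release(state); }
};

// usage:
// {
//   ScopedGIL gil;
//   Python_Keras_train(inputs, targets, train_cluster, model_name, params);
// }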
@@ -669,17 +670,12 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
                   bool *start_training, bool *end_training,
                   const RuntimeParameters &params, naa_handle *handle){
+  fprintf(stdout, "In naa_training\n");
   // initialize models with weights from pretrained keras model
   // declare memory regions for model weights, training and target data
-  // Initialize training data input and targets
-  std::vector<std::vector<double>> inputs(
-      training_data_buffer->x.size(),
-      std::vector<double>(params.training_data_size));
-  std::vector<std::vector<double>> targets(
-      training_data_buffer->x.size(),
-      std::vector<double>(params.training_data_size));
   PyGILState_STATE gstate = PyGILState_Ensure();
   Eigen_model_mutex->lock();
@@ -696,7 +692,15 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
   Eigen_model_mutex->unlock();
   PyGILState_Release(gstate);

-  // determine size for reuired memory regions
+  // Initialize training data input and targets
+  std::vector<std::vector<double>> inputs(
+      Eigen_model->biases[Eigen_model->biases.size()-1].size(), // number of species
+      std::vector<double>(params.training_data_size));
+  std::vector<std::vector<double>> targets(
+      Eigen_model->biases[Eigen_model->biases.size()-1].size(),
+      std::vector<double>(params.training_data_size));
+
+  // determine size for required memory regions
   size_t modelSize = calculateStructSize(Eigen_model, 'E');
   size_t modelSizeReactive = calculateStructSize(Eigen_model_reactive, 'E');
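Editor's note: the row count used for the new inputs and targets buffers is the length of the model's last bias vector, i.e. the width of the output layer, which the patch treats as the number of species. A slightly more readable equivalent, under the same assumptions as the patch (biases is a std::vector of Eigen::VectorXd, one per layer):

// Sketch: the last bias vector has one element per output species.
const size_t n_species = Eigen_model->biases.back().size();
std::vector<std::vector<double>> inputs(
    n_species, std::vector<double>(params.training_data_size));
std::vector<std::vector<double>> targets(
    n_species, std::vector<double>(params.training_data_size));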
@@ -723,14 +727,16 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
   }

   // create memory regions
-  struct naa_param_t weight_region[] = {
+  struct naa_param_t input_regions[] = {
       {(void *)serializedModel, modelSize},
       {(void *)serializedTrainingData, trainingDataSize},
       {(void *)serializedTargetData, targetDataSize}};
+  struct naa_param_t output_regions[] = {{(void *)serializedModel, modelSize}};

   printf("-- Setting Up Connection --\n");
   // function code encode the used ai model
-  if (naa_create(1, weight_region, 1, weight_region, 0, handle)) {
+  if (naa_create(1, input_regions, 3, output_regions, 1, handle)) {
     fprintf(stderr, "Error during naa_create. Exiting.\n");
     exit(EXIT_FAILURE);
   }
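Editor's note: read together with the invocation and wait calls further down, the region setup follows a register/invoke/wait pattern: three input regions (serialized model, training data, target data) and one output region (the model buffer, which comes back overwritten with the trained weights). The sketch below only rearranges calls that appear in this patch; reading naa_create's integer arguments as function code, input-region count and output-region count is an assumption, not documented here.

// Sketch of the NAA round trip as used in naa_training (argument meaning inferred).
struct naa_param_t input_regions[] = {
    {(void *)serializedModel, modelSize},
    {(void *)serializedTrainingData, trainingDataSize},
    {(void *)serializedTargetData, targetDataSize}};
struct naa_param_t output_regions[] = {{(void *)serializedModel, modelSize}};

if (naa_create(1, input_regions, 3, output_regions, 1, handle)) {  // register regions
  fprintf(stderr, "Error during naa_create. Exiting.\n");
  exit(EXIT_FAILURE);
}
if (naa_invoke(handle)) {                                          // start remote training
  exit(EXIT_FAILURE);
}
naa_status status;
if (naa_wait(handle, &status)) {                                   // block until weights are back
  exit(EXIT_FAILURE);
}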
@@ -826,8 +832,62 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
   } else {
     int res = serializeModelWeights(Eigen_model, serializedModel);
   }
+
+  // checksum serializeModel
+  double checksum_model = 0;
+  for(size_t i = 0; i < Eigen_model->weight_matrices.size(); i++){
+    checksum_model += Eigen_model->weight_matrices[i].sum();
+  }
+  for(size_t j=0; j<Eigen_model->biases.size(); j++){
+    checksum_model += Eigen_model->biases[j].sum();
+  }
+  fprintf(stdout, "Checksum model: %f\n", checksum_model);
+
   int res1 = serializeTrainingData(&inputs, serializedTrainingData);
   int res2 = serializeTrainingData(&targets, serializedTargetData);
+
+  // std::vector<std::vector<double>> deserializedTrainingData = deserializeTrainingData(serializedTrainingData);
+  // std::vector<std::vector<double>> deserializedTargetData = deserializeTrainingData(serializedTargetData);
+
+  // calculate checksum of inputs
+  // double checksum_inputs = 0;
+  // for (size_t i = 0; i < inputs.size(); i++) {
+  //   for (size_t j = 0; j < inputs[i].size(); j++) {
+  //     checksum_inputs += inputs[i][j];
+  //     // fprintf(stdout, "inputs: %f\n", inputs[i][j]);
+  //   }
+  // }
+
+  // // calculate checksum of inputs
+  // double checksum_targets = 0;
+  // for (size_t i = 0; i < targets.size(); i++) {
+  //   for (size_t j = 0; j < targets[i].size(); j++) {
+  //     checksum_targets += targets[i][j];
+  //     // fprintf(stdout, "inputs: %f\n", inputs[i][j]);
+  //   }
+  // }
+
+  // double checksum_training = 0;
+  // for (size_t i = 0; i < deserializedTrainingData.size(); i++) {
+  //   for (size_t j = 0; j < deserializedTrainingData[i].size(); j++) {
+  //     checksum_training += deserializedTrainingData[i][j];
+  //     // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]);
+  //   }
+  // }
+
+  // double checksum_testing = 0;
+  // for (size_t i = 0; i < deserializedTargetData.size(); i++) {
+  //   for (size_t j = 0; j < deserializedTargetData[i].size(); j++) {
+  //     checksum_testing += deserializedTargetData[i][j];
+  //     // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]);
+  //   }
+  // }
+
+  // fprintf(stdout, "Checksum inputs: %f\n", checksum_inputs);
+  // fprintf(stdout, "Checksum training: %f\n", checksum_training);
+  // fprintf(stdout, "Checksum targets: %f\n", checksum_targets);
+  // fprintf(stdout, "Checksum testing: %f\n", checksum_testing);

   printf("-- RPC Invocation --\n");
   if (naa_invoke(handle)) {
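Editor's note: the checksum block above (and the commented-out variants that follow it) all do the same reduction by hand; they could be folded into two small helpers. A sketch, not part of the patch, using the EigenModel layout as it appears in this file (weight_matrices of Eigen::MatrixXd and biases of Eigen::VectorXd):

// Sketch only: debug checksums for the buffers handed to the NAA.
static double checksum(const std::vector<std::vector<double>> &rows) {
  double sum = 0.0;
  for (const auto &row : rows)
    for (double v : row) sum += v;
  return sum;
}

static double checksum(const EigenModel &m) {
  double sum = 0.0;
  for (const auto &w : m.weight_matrices) sum += w.sum();
  for (const auto &b : m.biases) sum += b.sum();
  return sum;
}

// usage: fprintf(stdout, "Checksum model: %f\n", checksum(*Eigen_model));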
@@ -835,22 +895,38 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
     exit(EXIT_FAILURE);
   }

-  // TODO: naa_wait with new weights
-  // TODO: update model weights with received weights
+  // naa_wait with new weights
+  naa_status status;
+  if (naa_wait(handle, &status)) {
+    fprintf(stderr, "Error occurred during naa_wait. Exiting.\n");
+    exit(EXIT_FAILURE);
+  }
+
+  printf("Bytes received: %d, RPC Return code: %d\n",
+         status.bytes_received, status.naa_error);
+
+  // update model weights with received weights
   EigenModel deserializedModel =
       deserializeModelWeights(serializedModel, modelSize);
   fprintf(stdout, "After deserialization: %f\n",
           deserializedModel.weight_matrices[0](0, 0));
-  for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) {
-    for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) {
-      fprintf(stdout, "model: %f, deserializedModel: %f\n",
-              Eigen_model->weight_matrices[0](i, j),
-              deserializedModel.weight_matrices[0](i, j));
-    }
-  }
+  Eigen_model_mutex->lock();
+
+  Eigen_model->weight_matrices = deserializedModel.weight_matrices;
+  Eigen_model->biases = deserializedModel.biases;
+
+  Eigen_model_mutex->unlock();
+
+  // for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) {
+  //   for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) {
+  //     fprintf(stdout, "model: %f, deserializedModel: %f\n",
+  //             Eigen_model->weight_matrices[0](i, j),
+  //             deserializedModel.weight_matrices[0](i, j));
+  //   }
+  // }
   }

   printf("-- Cleaning Up --\n");
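Editor's note: the hand-over of the received weights is done under Eigen_model_mutex with manual lock()/unlock(); a scoped lock keeps the critical section exception-safe. Sketch only, assuming Eigen_model_mutex points to a std::mutex as suggested by its use elsewhere in this file:

// Sketch: scoped version of the weight update above.
{
  std::lock_guard<std::mutex> guard(*Eigen_model_mutex);
  Eigen_model->weight_matrices = std::move(deserializedModel.weight_matrices);
  Eigen_model->biases = std::move(deserializedModel.biases);
}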
@@ -917,7 +993,7 @@ int Python_Keras_training_thread(
  * @param weights Vector of model weights from keras as returned by
  * Python_Keras_get_weights()
  */
-// ? check if updating was succesful -> hash about values?
+// ? check if updating was successful -> hash about values?
 void update_weights(
     EigenModel *model,
     const std::vector<std::vector<std::vector<double>>> &weights) {

View File

@@ -35,7 +35,7 @@ size_t calculateStructSize(void *struct_pointer, char type){
     }
   } else if (type == 'T') {
     struct_size += sizeof(size_t); // number of vectors
-    struct_size += sizeof(size_t); // length of vector
+    struct_size += static_cast<std::vector<std::vector<double>>*>(struct_pointer)->size() * sizeof(size_t); // length of vector
     for (const std::vector<double> &vector:
          *static_cast<std::vector<std::vector<double>>*>(struct_pointer)){
       struct_size += vector.size() * sizeof(double);
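Editor's note: the corrected line accounts for the fact that the 'T' (training data) buffer stores one length field per vector, not a single one. The layout written by serializeTrainingData is [num_vectors][len_0][len_0 doubles][len_1][len_1 doubles]..., so the matching size computation looks like this (sketch restating the fixed logic):

// Sketch: size of the serialized training-data buffer, matching the layout above.
size_t trainingBufferSize(const std::vector<std::vector<double>> &data) {
  size_t bytes = sizeof(size_t);               // number of vectors
  bytes += data.size() * sizeof(size_t);       // one length field per vector
  for (const auto &v : data)
    bytes += v.size() * sizeof(double);        // payload
  return bytes;
}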
@@ -56,7 +56,6 @@ int serializeModelWeights(const EigenModel *model, char *memory){
   size_counter += sizeof(size_t);
   for (const Eigen::MatrixXd &matrix : model->weight_matrices) {
     size_t rows = matrix.rows(), cols = matrix.cols();
-    fprintf(stdout, "rows: %zu, cols: %zu\n", rows, cols);
     std::memcpy(memory, &rows, sizeof(size_t));
     memory += sizeof(size_t);
     size_counter += sizeof(size_t);
@@ -125,9 +124,8 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) {
     }

     // interpret memory as Eigen::MatrixXd (more efficient than memcpy?)
-    Eigen::MatrixXd temp = Eigen::Map<Eigen::MatrixXd>(
+    matrix = Eigen::Map<Eigen::MatrixXd>(
         reinterpret_cast<double*>(memory), rows, cols);
-    matrix = temp; // copy data to matrix
     memory += rows * cols * sizeof(double);
     size_counter += rows * cols * sizeof(double);
   }
@@ -161,7 +159,6 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) {
     }

     // same procedure as for the matrices
-    // TODO: delete temp variable
     Eigen::VectorXd temp = Eigen::Map<Eigen::VectorXd>(
         reinterpret_cast<double*>(memory), size);
     bias = temp; // copy the data
@@ -173,14 +170,14 @@ EigenModel deserializeModelWeights(char *memory, size_t buffer_size) {
 }

-int serializeTrainingData(std::vector<std::vector<double>> data, char *memory){
-  size_t num_vectors = data.size();
+int serializeTrainingData(std::vector<std::vector<double>> *data, char *memory){
+  size_t num_vectors = data->size();
   std::memcpy(memory, &num_vectors, sizeof(size_t));
   memory += sizeof(size_t);

-  for (const std::vector<double> &vector : data) {
+  for (const std::vector<double> &vector : *data) {
     size_t size = vector.size();
     std::memcpy(memory, &size, sizeof(size_t));
     memory += sizeof(size_t);
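Editor's note: with serializeTrainingData now taking a pointer, a quick round-trip check against deserializeTrainingData can be written directly on top of the functions in this file. Sketch only; it assumes the declarations from the serializer header are visible:

// Sketch: serialize/deserialize round trip for the training-data format.
std::vector<std::vector<double>> sample = {{1.0, 2.0, 3.0}, {4.0, 5.0}};
std::vector<char> buffer(calculateStructSize(&sample, 'T'));
serializeTrainingData(&sample, buffer.data());
std::vector<std::vector<double>> copy = deserializeTrainingData(buffer.data());
// copy is expected to compare equal to sample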
@@ -196,6 +193,7 @@ std::vector<std::vector<double>> deserializeTrainingData(char* data){
   std::vector<std::vector<double>> deserialized_data;
   size_t num_vectors;
   std::memcpy(&num_vectors, data, sizeof(size_t));
+  fprintf(stdout, "num_vectors: %zu\n", num_vectors);
   data += sizeof(size_t);

   for (size_t i = 0; i < num_vectors; i++) {
@@ -205,6 +203,7 @@ std::vector<std::vector<double>> deserializeTrainingData(char* data){
     std::vector<double> vector(size);
     std::memcpy(vector.data(), data, size * sizeof(double));
     data += size * sizeof(double);
+
     deserialized_data.push_back(vector);

View File

@@ -33,6 +33,7 @@
 #include <Rcpp/DataFrame.h>
 #include <Rcpp/Function.h>
 #include <Rcpp/vector/instantiation.h>
+#include <csignal>
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
@@ -46,9 +47,7 @@
 #include <vector>
 #include <stdio.h>

-extern "C"{
-#include <naaice.h>
-}

 using namespace std;
 using namespace poet;
@@ -77,6 +76,11 @@ static void init_global_functions(RInside &R) {
   SaveRObj_R = DEFunc("SaveRObj");
 }

+void signalHandler(int signum) {
+  fprintf(stderr, "Terminate program by user.\n");
+  exit(signum);
+}
+
 // HACK: this is a step back as the order and also the count of fields is
 // predefined, but it will change in the future
@@ -310,18 +314,30 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   std::vector<int> cluster_labels;
   if (params.use_ai_surrogate) {
     MSG("AI: Initialize model");
+
+    struct naa_handle *handle = NULL;
+    if(params.use_naa){
+      handle = (naa_handle*) calloc(1, sizeof(struct naa_handle));
+      if(!handle){
+        throw std::runtime_error("Could not allocate memory for NAA handle");
+      }
+      // TODO: create regions here (size could be determined from run_params (size of trainings buffer))
+      struct naa_param_t *model_weights;
+    }
+
     // Initiate two models from one file
     Python_Keras_load_model(R["model_file_path"], R["model_reactive_file_path"],
                             params.use_clustering);
     if (!params.disable_training) {
       MSG("AI: Initialize training thread");
       // TODO add naa_handle as optional parameter which is NULL per default
       Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive,
                                    &Eigen_model_mutex, &training_data_buffer,
                                    &training_data_buffer_mutex,
                                    &training_data_buffer_full, &start_training,
-                                   &end_training, params);
+                                   &end_training, params, handle);
     }
     if (!params.use_Keras_predictions) {
       // Initialize Eigen model for custom inference function
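Editor's note: the handle is calloc'd above but never visibly freed in this hunk; if nothing in the naa_* API takes ownership, tying it to a scope guard avoids a leak on the error paths. Sketch only; the assumption that plain free() is the correct release for a handle obtained this way is not confirmed by the patch.

// Sketch: scope-bound release of the NAA handle (assumes free() is sufficient).
std::unique_ptr<struct naa_handle, void (*)(void *)> handle_guard(handle, &std::free);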
@@ -390,7 +406,6 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
   R.parseEval(std::string("predictors <- ") +
               "set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)");
-
   // Apply preprocessing
   MSG("AI Preprocessing");
   R.parseEval("predictors_scaled <- preprocess(predictors)");
@@ -425,6 +440,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   // Validate prediction and write valid predictions to chem field
   MSG("AI: Validate");
   R.parseEval("validity_vector <- validate_predictions(predictors, predictions)");
+  // R.parseEval("print(head(predictors, 30))");
   MSG("AI: Marking valid");
   chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
@@ -472,6 +488,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
       R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
   R.parseEval("target_scaled <- preprocess(state_C[ai_surrogate_species])");
+  // R.parseEval("print(head(state_C[ai_surrogate_species], 30))");

   std::vector<std::vector<double>> invalid_y =
       R.parseEval("get_invalid_values(target_scaled, validity_vector)");
@@ -599,6 +616,8 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
 }

 int main(int argc, char *argv[]) {
+  signal(SIGINT, signalHandler);
+
   int world_size;

   // Threadsafe MPI is necessary for the AI surrogate
@@ -656,9 +675,11 @@ int main(int argc, char *argv[]) {
                                    run_params.interp_bucket_entries,
                                    run_params.interp_size,
                                    run_params.interp_min_entries,
-                                   run_params.use_ai_surrogate};
+                                   run_params.use_ai_surrogate,
+                                   run_params.use_naa};

   // TODO add option for naa training
   chemistry.masterEnableSurrogates(surr_setup);

   if (MY_RANK > 0) {
@@ -685,6 +706,7 @@ int main(int argc, char *argv[]) {
   chemistry.masterSetField(init_list.getInitialGrid());
+
   if (run_params.use_ai_surrogate) {
     // Load default function implementations
     R.parseEvalQ(ai_surrogate_r_library);
     /* Use dht species for model input and output */
@@ -704,7 +726,6 @@ int main(int argc, char *argv[]) {
        variable of the same name in one of the R input scripts)*/
     run_params.training_data_size = init_list.getDiffusionInit().n_rows *
         init_list.getDiffusionInit().n_cols; // Default value is number of cells in field
-    // ? error handling for all if statements
     if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
       run_params.batch_size = R["batch_size"];
     }
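Editor's note: the exists()/assign pattern used here for batch_size is repeated for every optional variable from the R input script; it could be factored into a small helper. Sketch only: readOptional is a hypothetical name, and it assumes an RInside-style parseEval as used above (the code in this file actually uses RInsidePOET).

// Sketch: read an optional variable from the R input script, keep the default otherwise.
template <typename T>
static void readOptional(RInside &R, const std::string &name, T &out) {
  if (Rcpp::as<bool>(R.parseEval("exists(\"" + name + "\")"))) {
    out = Rcpp::as<T>(R.parseEval(name));
  }
}

// usage: readOptional(R, "batch_size", run_params.batch_size);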

src/poet.hpp (new file, 412 lines added)
View File

@@ -0,0 +1,412 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** Copyright (C) 2023-2024 Max Luebke (University of Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#pragma once
#include <cstdint>
#include <set>
#include <string>
#include <vector>
#include <Rcpp.h>
#define SRC_DIR "/mnt/scratch/signer/poet"
#define CUDA_SRC_DIR ""
#define USE_NAA "USE_NAA;ON"
// target and source IP address for NAA support
// #ifdef USE_NAA
#define SOURCE_IP "10.3.10.41"
#define TARGET_IP "10.3.10.42"
// #endif
static const char *poet_version = "naaice/v0.3-105-g13ad41d";
// using the Raw string literal to avoid escaping the quotes
static const inline std::string kin_r_library = R"(### Copyright (C) 2018-2023 Marco De Lucia, Max Luebke (GFZ Potsdam)
###
### POET is free software; you can redistribute it and/or modify it under the
### terms of the GNU General Public License as published by the Free Software
### Foundation; either version 2 of the License, or (at your option) any later
### version.
###
### POET is distributed in the hope that it will be useful, but WITHOUT ANY
### WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
### A PARTICULAR PURPOSE. See the GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License along with
### this program; if not, write to the Free Software Foundation, Inc., 51
### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
master_init <- function(setup, out_dir, init_field) {
## Setup the directory where we will store the results
if (!dir.exists(out_dir)) {
dir.create(out_dir)
msgm("created directory ", out_dir)
} else {
msgm("dir ", out_dir, " already exists, I will overwrite!")
}
if (is.null(setup$store_result)) {
msgm("store_result doesn't exist!")
} else {
msgm("store_result is ", setup$store_result)
}
setup$iter <- 1
setup$timesteps <- setup$timesteps
setup$maxiter <- length(setup$timesteps)
setup$iterations <- setup$maxiter
setup$simulation_time <- 0
dgts <- as.integer(ceiling(log10(setup$maxiter)))
## string format to use in sprintf
fmt <- paste0("%0", dgts, "d")
if (is.null(setup[["store_result"]])) {
setup$store_result <- TRUE
}
if (setup$store_result) {
init_field_out <- paste0(out_dir, "/iter_", sprintf(fmt = fmt, 0), ".", setup$out_ext)
init_field <- data.frame(init_field, check.names = FALSE)
SaveRObj(x = init_field, path = init_field_out)
msgm("Stored initial field in ", init_field_out)
if (is.null(setup[["out_save"]])) {
setup$out_save <- seq(1, setup$iterations)
}
}
setup$out_dir <- out_dir
return(setup)
}
## This function, called only by master, stores on disk the last
## calculated time step if store_result is TRUE and increments the
## iteration counter
master_iteration_end <- function(setup, state_T, state_C) {
iter <- setup$iter
# print(iter)
## max digits for iterations
dgts <- as.integer(ceiling(log10(setup$maxiter + 1)))
## string format to use in sprintf
fmt <- paste0("%0", dgts, "d")
## Write on disk state_T and state_C after every iteration
## comprised in setup$out_save
if (setup$store_result) {
if (iter %in% setup$out_save) {
nameout <- paste0(setup$out_dir, "/iter_", sprintf(fmt = fmt, iter), ".", setup$out_ext)
state_T <- data.frame(state_T, check.names = FALSE)
state_C <- data.frame(state_C, check.names = FALSE)
ai_surrogate_info <- list(
prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
cluster_labels = if (exists("cluster_labels")) cluster_labels else NULL,
predictions = if (exists("predictions")) predictions else NULL
)
SaveRObj(x = list(
T = state_T,
C = state_C,
simtime = as.integer(setup$simulation_time),
totaltime = as.integer(totaltime),
ai_surrogate_info = ai_surrogate_info
), path = nameout)
msgm("results stored in <", nameout, ">")
}
}
## Add last time step to simulation time
setup$simulation_time <- setup$simulation_time + setup$timesteps[iter]
msgm("done iteration", iter, "/", length(setup$timesteps))
setup$iter <- setup$iter + 1
return(setup)
}
## Attach the name of the calling function to the message displayed on
## R's stdout
msgm <- function(...) {
prefix <- paste0("R: ")
cat(paste(prefix, ..., "\n"))
invisible()
}
## Function called by master R process to store on disk all relevant
## parameters for the simulation
StoreSetup <- function(setup, filesim, out_dir) {
to_store <- vector(mode = "list", length = 4)
## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT")
names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline")
## read the setup R file, which is sourced in kin.cpp
tmpbuff <- file(filesim, "r")
setupfile <- readLines(tmpbuff)
close.connection(tmpbuff)
to_store$Sim <- setupfile
## to_store$Flow <- list(
## snapshots = setup$snapshots,
## gridfile = setup$gridfile,
## phase = setup$phase,
## density = setup$density,
## dt_differ = setup$dt_differ,
## prolong = setup$prolong,
## maxiter = setup$maxiter,
## saved_iter = setup$iter_output,
## out_save = setup$out_save )
to_store$Transport <- setup$diffusion
## to_store$Chemistry <- list(
## nprocs = n_procs,
## wp_size = work_package_size,
## base = setup$base,
## first = setup$first,
## init = setup$initsim,
## db = db,
## kin = setup$kin,
## ann = setup$ann)
if (dht_enabled) {
to_store$DHT <- list(
enabled = dht_enabled,
log = dht_log
## signif = dht_final_signif,
## proptype = dht_final_proptype
)
} else {
to_store$DHT <- FALSE
}
if (dht_enabled) {
to_store$DHT <- list(
enabled = dht_enabled,
log = dht_log
# signif = dht_final_signif,
# proptype = dht_final_proptype
)
} else {
to_store$DHT <- FALSE
}
saveRDS(to_store, file = paste0(fileout, "/setup.rds"))
msgm("initialization stored in ", paste0(fileout, "/setup.rds"))
}
GetWorkPackageSizesVector <- function(n_packages, package_size, len) {
ids <- rep(1:n_packages, times = package_size, each = 1)[1:len]
return(as.integer(table(ids)))
}
## Handler to read R objs from binary files using either builtin
## readRDS() or qs::qread() based on file extension
ReadRObj <- function(path) {
## code borrowed from tools::file_ext()
pos <- regexpr("\\.([[:alnum:]]+)$", path)
extension <- ifelse(pos > -1L, substring(path, pos + 1L), "")
switch(extension,
rds = readRDS(path),
qs = qs::qread(path)
)
}
## Handler to store R objs to binary files using either builtin
## saveRDS() or qs::qsave() based on file extension
SaveRObj <- function(x, path) {
msgm("Storing to", path)
## code borrowed from tools::file_ext()
pos <- regexpr("\\.([[:alnum:]]+)$", path)
extension <- ifelse(pos > -1L, substring(path, pos + 1L), "")
switch(extension,
rds = saveRDS(object = x, file = path),
qs = qs::qsave(x = x, file = path)
)
}
)";
static const inline std::string init_r_library = R"(### Copyright (C) 2018-2024 Marco De Lucia, Max Luebke (GFZ Potsdam, University of Potsdam)
###
### POET is free software; you can redistribute it and/or modify it under the
### terms of the GNU General Public License as published by the Free Software
### Foundation; either version 2 of the License, or (at your option) any later
### version.
###
### POET is distributed in the hope that it will be useful, but WITHOUT ANY
### WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
### A PARTICULAR PURPOSE. See the GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License along with
### this program; if not, write to the Free Software Foundation, Inc., 51
### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##' @param pqc_mat matrix, containing IDs and PHREEQC outputs
##' @param grid matrix, zonation referring to pqc_mat$ID
##' @return a data.frame
pqc_to_grid <- function(pqc_mat, grid) {
# Convert the input DataFrame to a matrix
pqc_mat <- as.matrix(pqc_mat)
# Flatten the matrix into a vector
id_vector <- as.integer(t(grid))
# Find the matching rows in the matrix
row_indices <- match(id_vector, pqc_mat[, "ID"])
# Extract the matching rows from pqc_mat to size of grid matrix
result_mat <- pqc_mat[row_indices, ]
# Convert the result matrix to a data frame
res_df <- as.data.frame(result_mat)
# Remove all columns which only contain NaN
res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)]
# Remove row names
rownames(res_df) <- NULL
return(res_df)
}
##' @param pqc_mat matrix,
##' @param transport_spec column name of species in pqc_mat
##' @param id
##' @title
##' @return
resolve_pqc_bound <- function(pqc_mat, transport_spec, id) {
df <- as.data.frame(pqc_mat, check.names = FALSE)
value <- df[df$ID == id, transport_spec]
if (is.nan(value)) {
value <- 0
}
return(value)
}
##' @title
##' @param init_grid
##' @param new_names
##' @return
add_missing_transport_species <- function(init_grid, new_names) {
# add 'ID' to new_names front, as it is not a transport species but required
new_names <- c("ID", new_names)
sol_length <- length(new_names)
new_grid <- data.frame(matrix(0, nrow = nrow(init_grid), ncol = sol_length))
names(new_grid) <- new_names
matching_cols <- intersect(names(init_grid), new_names)
# Copy matching columns from init_grid to new_grid
new_grid[, matching_cols] <- init_grid[, matching_cols]
# Add missing columns to new_grid
append_df <- init_grid[, !(names(init_grid) %in% new_names)]
new_grid <- cbind(new_grid, append_df)
return(new_grid)
}
)";
static const inline std::string ai_surrogate_r_library =
R"(## This file contains default function implementations for the ai surrogate.
## To use pre-/postprocessing it is recommended to override these functions
## with custom implementations via the input script, which also provides the
## path to the R-file. See the barite_50.R file as an example and the general
## README for more
## information.
preprocess <- function(df) {
return(df)
}
postprocess <- function(df) {
return(df)
}
set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
get_invalid_values <- function(df, validity) {
return(df[validity == 0, ])
}
set_field <- function(temp_field, columns, rows, column_name_limit,
byrow = FALSE) {
temp_field <- matrix(temp_field, nrow = rows, byrow = byrow)
temp_field <- setNames(data.frame(temp_field), columns)
temp_field <- temp_field[column_name_limit]
return(temp_field)
}
)";
static const inline std::string r_runtime_parameters = "mysetup";
struct RuntimeParameters {
std::string out_dir;
std::vector<double> timesteps;
Rcpp::List init_params;
bool as_rds = false;
std::string out_ext; // MDL added to accommodate qs::qsave/qread
bool print_progress = false;
static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
std::uint32_t work_package_size;
bool use_dht = false;
static constexpr std::uint32_t DHT_SIZE_DEFAULT = 1.5E3;
std::uint32_t dht_size;
static constexpr std::uint8_t DHT_SNAPS_DEFAULT = 0;
std::uint8_t dht_snaps;
bool use_interp = false;
static constexpr std::uint32_t INTERP_SIZE_DEFAULT = 100;
std::uint32_t interp_size;
static constexpr std::uint32_t INTERP_MIN_ENTRIES_DEFAULT = 5;
std::uint32_t interp_min_entries;
static constexpr std::uint32_t INTERP_BUCKET_ENTRIES_DEFAULT = 20;
std::uint32_t interp_bucket_entries;
/* AI surrogate configuration */
bool use_ai_surrogate = false; // Can be set with command line flag --ai-surrogate
bool disable_training = false; // Can be set in the R input script
bool use_clustering = false; // Can be set in the R input script
bool use_Keras_predictions = false; // Can be set in the R input script
bool train_only_invalid = false; // Can be set in the R input script
int batch_size = 2560; // default value determined in test on the UP Turing cluster
int training_epochs = 20; // Can be set in the R input script
int training_data_size; // Can be set in the R input script
bool use_naa = true;
std::string save_model_path = ""; // Can be set in the R input script
std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake
};

View File

@@ -31,6 +31,13 @@
 #define SRC_DIR "@CMAKE_SOURCE_DIR@"
 #define CUDA_SRC_DIR "@CUDA_TOOLKIT_ROOT_DIR@"
+#define USE_NAA "@USE_NAA@"
+
+// target and source IP address for NAA support
+// #ifdef USE_NAA
+#define SOURCE_IP "@SOURCE_IP@"
+#define TARGET_IP "@TARGET_IP@"
+// #endif

 static const char *poet_version = "@POET_VERSION@";
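Editor's note: USE_NAA is configured as a string literal here, so the commented-out #ifdef guard would hold whenever the header was generated, regardless of the CMake option's value; checking the configured text at run time is one way to honour the option. Sketch only, assuming the "name;value" form seen in the generated src/poet.hpp ("USE_NAA;ON"):

// Sketch: interpret the configured USE_NAA string ("USE_NAA;ON" vs. "USE_NAA;OFF").
#include <cstring>
static inline bool naa_enabled_at_configure_time() {
  return std::strstr(USE_NAA, ";ON") != nullptr;
}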
@@ -78,6 +85,7 @@ struct RuntimeParameters {
   int batch_size = 2560; // default value determined in test on the UP Turing cluster
   int training_epochs = 20; // Can be set in the R input script
   int training_data_size; // Can be set in the R input script
+  bool use_naa = false;
   std::string save_model_path = ""; // Can be set in the R input script
   std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake
 };