From d825f33b4f00c96c20d27a82ec5991e8c500bf26 Mon Sep 17 00:00:00 2001 From: Hannes Signer Date: Wed, 10 Dec 2025 19:52:00 +0100 Subject: [PATCH] update submodule --- ext/ai-surrogate-poet | 2 +- src/poet.cpp | 110 ++++++++++++++++++++++++++++-------------- src/poet.hpp.in | 6 ++- 3 files changed, 79 insertions(+), 39 deletions(-) diff --git a/ext/ai-surrogate-poet b/ext/ai-surrogate-poet index 2dd2b8881..68c643bd0 160000 --- a/ext/ai-surrogate-poet +++ b/ext/ai-surrogate-poet @@ -1 +1 @@ -Subproject commit 2dd2b8881d6fe27b08a259d48ee8bca6188f049a +Subproject commit 68c643bd0b9dfd02d3d9eb7408621d48544937c9 diff --git a/src/poet.cpp b/src/poet.cpp index 08250387e..9d89f18a7 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -46,7 +46,6 @@ #include #include -#include #include #include @@ -166,6 +165,9 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) { app.add_flag("-c,--copy-non-reactive", params.copy_non_reactive_regions, "Copy non-reactive regions instead of computing them"); + app.add_option("-f,--fn", params.function_code, "Function code for the NAA") + ->check(CLI::PositiveNumber); + app.add_flag("--rds", params.as_rds, "Save output as .rds file instead of default .qs2"); @@ -311,8 +313,14 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, if (params.ai) { - ai_ctx = std::make_unique( - "/mnt/scratch/signer/poet/bench/barite/barite_trained.weights.h5"); + if (params.function_code == 1) { + ai_ctx = std::make_unique( + "./bench/barite/barite_trained.weights.h5"); + } else if (params.function_code == 2) { + ai_ctx = std::make_unique( + "./bench/dolo/dolomite_trained.weights.h5"); + } + R.parseEval( "mean <- as.numeric(standard$mean[ai_surrogate_species_output])"); R.parseEval( @@ -326,8 +334,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, // initialzie training backens only if retraining is desired if (params.ai_backend == PYTHON_BACKEND) { MSG("AI Surrogate with Python/keras backend 
enabled.") - ai_ctx->training_backend = - std::make_unique>(4 * params.batch_size); + std::cerr << "Not implemented" << std::endl; } else if (params.ai_backend == NAA_BACKEND) { MSG("AI Surrogate with NAA backend enabled.") ai_ctx->training_backend = @@ -339,7 +346,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params, ai_ctx->design_buffer, ai_ctx->results_buffer, ai_ctx->model, ai_ctx->meta_params, ai_ctx->scaler, ai_ctx->data_semaphore_write, ai_ctx->data_semaphore_read, ai_ctx->model_semaphore, - ai_ctx->training_is_running, 1); + ai_ctx->training_is_running, params.function_code); } } @@ -361,6 +368,21 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params, /* run transport */ diffusion.simulate(dt); + // validity_vector: + // vector with length = number of grid cells (1 indicates + // that values are copied into the next time step (no chemical reaction) + // or the ai surrogate prediction was good enough. In both cases the + // workers skip the exact simulation with PHREEQC) predictor_idx + // ai_validity_vector + // predictors: + // data frame with elements that are used as input for the ai surrogate + // model. The data frame can be smaller than the grid size if + // copy_non_reactive_regions option is enabled. In this case only the + // reactive data are used for ai prediction. + // targets: + // data frame with elements that are used as ai outputs for the + // retraining step. 
+ if (params.ai || params.copy_non_reactive_regions) { chem.getField().update(diffusion.getField()); @@ -370,46 +392,58 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::string("field <- setNames(data.frame(matrix(TMP, nrow=" + std::to_string(chem.getField().GetRequestedVecSize()) + ")), TMP_PROPS)")); + R.parseEval("print(head(field))"); R.parseEval("validity_vector <- rep(FALSE, nrow(field))"); + R.parseEval("length(validity_vector)"); + + R.parseEval("print(length(validity_vector))"); + if (params.copy_non_reactive_regions) { - R.parseEval("validity_vector <- field$Cl < 1e-14"); + R.parseEval( + "validity_vector <- field[[threshold$species]] < threshold$value"); } } - // MSG("Chemistry start"); + MSG("Chemistry start"); if (params.ai) { double ai_start_t = MPI_Wtime(); // deep copy field - R.parseEval("predictors <- data.frame(field)"); + R.parseEval("predictors <- field"); // get only ai related species + R.parseEval("print(head(predictors))"); + R.parseEval("print(ai_surrogate_species_input)"); R.parseEval("predictors <- predictors[ai_surrogate_species_input]"); // remove already copied values R.parseEval("predictors <- predictors[!validity_vector,]"); - R.parseEval( - "print(paste('Length of predictors:', length(predictors$H)))"); - // store row names of predictors R.parseEval("predictor_idx <- row.names(predictors)"); - R.parseEval("print(head(predictors))"); R.parseEval("predictors_scaled <- preprocess(predictors)"); + + R.parseEval("print(head(predictors_scaled))"); std::vector> predictors_scaled = R["predictors_scaled"]; std::vector predictions_scaled = ai_ctx->model.predict(predictors_scaled, params.batch_size, ai_ctx->model_semaphore); // features per cell - int n_samples = R.parseEval("nrow(predictors)"); int n_output_features = ai_ctx->model.weight_matrices.back().cols(); - std::cout << "n_output_features: " << n_output_features << std::endl; std::vector predictions_scaled_double(predictions_scaled.begin(), 
predictions_scaled.end()); + + std::cout << "First elements of predictions_scaled_double: "; + for (size_t i = 0; + i < std::min(size_t(10), predictions_scaled_double.size()); i++) { + std::cout << predictions_scaled_double[i] << " "; + } + std::cout << std::endl; + R["TMP"] = predictions_scaled_double; R["n_samples"] = n_samples; R["n_output"] = n_output_features; @@ -418,15 +452,14 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, "nrow=n_samples, ncol=n_output, byrow=TRUE)), " "ai_surrogate_species_output)"); // R.parseEval("print(head(predictions_scaled))"); + // R.parseEval("print(head(predictions_scaled))"); R.parseEval("predictions <- postprocess(predictions_scaled)"); - // R.parseEval("print(head(predictions))"); MSG("AI Validation"); R.parseEval("ai_validity_vector <- validate_predictions(predictors, " "predictions) "); - R.parseEval("print(length(predictor_idx))"); R.parseEval("print(length(ai_validity_vector))"); // get only indices where prediction was valid @@ -434,9 +467,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, // set in global validity vector all elements to true, where prediction // was possible - R.parseEval("validity_vector[predictor_idx] <- TRUE"); + R.parseEval("validity_vector[as.numeric(predictor_idx)] <- TRUE"); - R.parseEval("print(head(validity_vector))"); + R.parseEval("print(length(validity_vector))"); MSG("AI TempField"); // maybe row.names was overwritten by function calls ?? 
@@ -444,7 +477,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, // subset predictions to ai_validity_vector == TRUE R.parseEval("predictions <- predictions[ai_validity_vector,]"); // merge predicted values into field stored in R - R.parseEval("field[row.names(predictions),ai_surrogate_species_output] " + R.parseEval("field[as.numeric(row.names(predictions)),ai_surrogate_" + "species_output] " "<- predictions"); MSG("AI Set Field"); @@ -461,9 +495,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, if (params.copy_non_reactive_regions || params.ai) { MSG("Set copied or predicted values for the workers"); - R.parseEval( "print(paste('Number of valid cells:', sum(validity_vector)))"); + R.parseEval("print(head(validity_vector))"); chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector")); } @@ -478,18 +512,21 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::string("targets <- setNames(data.frame(matrix(TMP, nrow=" + std::to_string(chem.getField().GetRequestedVecSize()) + ")), TMP_PROPS)")); - + R.parseEval("print(paste('Length of validity_vector:', " + "length(ai_validity_vector)))"); R.parseEval("predictors_retraining <- " "get_invalid_values(predictors_scaled, ai_validity_vector)"); - R.parseEval("print(head(predictors_retraining))"); - R.parseEval("targets <- targets[predictor_idx, ]"); - R.parseEval("targets_retraining <- " - "get_invalid_values(targets[ai_surrogate_species_output], " - "ai_validity_vector)"); + R.parseEval("targets <- " + "targets[as.numeric(row.names(predictors_retraining)), " + "ai_surrogate_species_output]"); + R.parseEval("print(length(predictors_scaled$H))"); R.parseEval("print(length(ai_validity_vector))"); - R.parseEval("targets_retraining <- preprocess(targets_retraining)"); + R.parseEval("targets_retraining <- preprocess(targets)"); + + R.parseEval("print(head(predictors_retraining))"); + 
R.parseEval("print(head(targets_retraining))"); std::vector> predictors_retraining = R["predictors_retraining"]; @@ -505,9 +542,11 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::cout << "size of targets " << targets_retraining[0].size() << std::endl; - ai_ctx->design_buffer.addData(predictors_retraining); - ai_ctx->results_buffer.addData(targets_retraining); - + if (predictors_retraining[0].size() > 0 && + targets_retraining[0].size() > 0) { + ai_ctx->design_buffer.addData(predictors_retraining); + ai_ctx->results_buffer.addData(targets_retraining); + } size_t elements_design_buffer = ai_ctx->design_buffer.getSize() / (predictors_retraining.size() * sizeof(float)); @@ -520,9 +559,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::cout << "results_buffer_size: " << elements_results_buffer << std::endl; - if (elements_design_buffer >= - 20 * params.batch_size && // TODO: change to 4 * grid_size - elements_results_buffer >= 20 * params.batch_size && + if (elements_design_buffer >= 4 * params.batch_size && + elements_results_buffer >= 4 * params.batch_size && ai_ctx->training_is_running == false) { ai_ctx->data_semaphore_read.release(); } else { @@ -553,7 +591,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, std::cout << std::endl; - if (!params.disable_retraining) { + if (params.ai && !params.disable_retraining) { ai_ctx->training_backend->stop_training(ai_ctx->data_semaphore_read); } @@ -769,7 +807,7 @@ int main(int argc, char *argv[]) { R["out_ext"] = run_params.out_ext; R["out_dir"] = run_params.out_dir; - if (run_params.ai) { + if (run_params.ai || run_params.copy_non_reactive_regions) { /* Incorporate ai surrogate from R */ R.parseEvalQ(ai_surrogate_r_library); /* Use dht species for model input and output */ diff --git a/src/poet.hpp.in b/src/poet.hpp.in index ab7a4b55b..4b1c3c71f 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -84,9 +84,11 @@ struct 
RuntimeParameters { bool ai = false; bool disable_retraining = false; static constexpr std::uint8_t AI_BACKEND_DEFAULT = 1; - std::uint8_t ai_backend = 1; // 1 - python, 2 - naa + std::uint8_t ai_backend = AI_BACKEND_DEFAULT; // 1 - python, 2 - naa bool train_only_invalid = true; - int batch_size = 1000; + int batch_size = 200 * 200; + static constexpr std::uint8_t DEFAULT_FUNCTION_CODE = 0; + std::uint8_t function_code = DEFAULT_FUNCTION_CODE; static constexpr bool COPY_NON_REACTIVE_REGIONS = false; bool copy_non_reactive_regions = COPY_NON_REACTIVE_REGIONS;