Compare commits

...

54 Commits

Author SHA1 Message Date
straile
9df43999b0 docs: fix typo 2024-11-22 09:38:27 +01:00
straile
efdeccce17 docs: typo 2024-11-10 12:31:57 +01:00
straile
fc689383d4 feat: remove sklearn k means 2024-11-09 16:26:00 +01:00
straile
f648f618de docs: add info about ReLU and C++ inference 2024-11-08 19:18:17 +01:00
straile
8adeffa315 refactor: remove obsolete elements from EigenModel struct 2024-11-08 12:53:59 +01:00
straile
f64f8fa465 fix: declaration of validity vector 2024-11-04 17:18:41 +01:00
straile
3894cd4165 refactor: setting r variables 2024-11-04 16:11:14 +01:00
straile
47c9ffff2b test: add Charge 2024-11-04 16:01:32 +01:00
straile
c4d6197e9b fix: merge issues 2024-11-04 15:56:09 +01:00
straile
8551e07450 fix: heap issues in R 2024-11-04 15:46:58 +01:00
straile
110d5c810b fix: roll back to functioning state 2024-11-04 15:40:41 +01:00
straile
c4da86ef98 fix: dynamic training buffer column number 2024-11-03 18:02:32 +01:00
straile
1c4b949ce9 feat: cluster labels from R function 2024-11-02 17:37:15 +01:00
straile
81723f81f8 feat: manipulate ai_surrogate_species 2024-11-02 17:08:22 +01:00
straile
4d5a7aadfb test: remove species for ai 2024-11-01 19:56:00 +01:00
straile
b32927cff0 refactor: be more consistent with int and size_t 2024-10-31 19:56:14 +01:00
straile
9091117e67 docs: add k-means user script variables to readme 2024-10-30 16:51:20 +01:00
straile
8062e7528b docs: add k-means user script variables to readme 2024-10-30 16:48:00 +01:00
straile
bf5501867b fix: typo in function stub for compilation without -DUSE_AI_SURROGATE 2024-10-27 22:25:14 +01:00
straile
062cdb5256 feat: K-Means option fully implemented 2024-10-27 18:12:32 +01:00
straile
361b34d11d feat: input script option to use k means, fill training data buffer accordingly 2024-10-26 13:33:01 +02:00
straile
b4d093d205 feat: cluster labels in training data buffer 2024-10-25 18:58:31 +02:00
straile
51b3608b68 feat: C++ kmeans 2024-10-25 13:20:01 +02:00
straile
2f0b84bb3e fix: lock mutex before signal to end program 2024-10-23 18:25:32 +02:00
straile
4d254250e1 fix: set training wait predicate with buffer threshold check 2024-10-20 12:38:58 +02:00
straile
997ae32092 refactor: pre/postprocess as separate functions 2024-10-19 18:44:44 +02:00
straile
f746a566cc timekeeping & a lot else 2024-10-19 18:29:41 +02:00
straile
a1c954df43 refactor: get training data targets directly from state_C 2024-10-19 14:43:31 +02:00
straile
29858bb6d5 fix: mutex around 'start_training' when filling training buffer 2024-10-18 17:51:38 +02:00
straile
50f820dc94 docs: CUDA found message 2024-10-17 10:51:49 +02:00
straile
db36a99462 fix: cuda not required 2024-10-17 10:21:45 +02:00
straile
589773731a feat: ai surrogate inference time keeping 2024-10-16 15:59:04 +02:00
straile
7ae203c7dc feat: disable training via input script 2024-10-15 15:35:27 +02:00
straile
7925534b5e fix: Python_Finalize() no longer potentially blocking 2024-10-15 13:07:28 +02:00
straile
b127fbe7b3 fix: tenacious double free error at end of program 2024-10-15 12:27:41 +02:00
straile
a4a1eedcac fWF 2024-10-15 11:36:12 +02:00
straile
8691370abb Eigen model works 2024-10-15 11:17:30 +02:00
straile
e99a114bc3 revert all 2024-10-14 20:28:57 +02:00
straile
c323705f34 what is the problem? 2024-10-14 20:03:14 +02:00
straile
74cd827c68 what is the problem? 2024-10-14 19:58:34 +02:00
hans
5bfb95c2fc feat: training data buffer size defaults to field size 2024-10-12 17:58:51 +02:00
hans
a289fc7790 feat: option to save current ai model after training 2024-10-12 16:04:24 +02:00
straile
0017a20e82 docs: update descriptions of ai surrogate options and parameters 2024-10-11 12:34:43 +02:00
straile
f7d3a7ea65 fix: function stubs for compilation without AI surrogate 2024-10-11 12:12:38 +02:00
straile
84c86a85f5 Feat: AI Surrogate inference (with Keras or C++) and parallel training 2024-10-11 11:25:00 +02:00
straile
c2535b03a7 Feat: Conditional waiting and training data buffer 2024-10-10 20:11:52 +02:00
straile
0bf51d0f02 Fix: Eigen double free 2024-10-10 14:17:28 +02:00
straile
3464ada1f1 Docs: Description of Buid flag 2024-10-09 17:33:12 +02:00
straile
d839ae4d5e Validate predictions 2024-10-09 17:23:14 +02:00
straile
e2d96ca9b6 Feat: Build flag for AI surrogate 2024-10-09 15:25:03 +02:00
straile
a0fe891f99 Eigen inference but slow 2024-10-07 22:01:29 +02:00
straile
394e7caa49 Python prediction 2024-10-07 13:40:43 +02:00
straile
c0456e2f14 Removing AI surrogate R lib 2024-10-07 10:01:37 +02:00
straile
b4b4d76c74 Include Python.h 2024-10-07 09:50:42 +02:00
18 changed files with 1458 additions and 346 deletions

View File

@ -16,7 +16,6 @@ list(APPEND CMAKE_MODULE_PATH "${POET_SOURCE_DIR}/CMake")
get_poet_version()
find_package(MPI REQUIRED)
find_package(RRuntime REQUIRED)
add_subdirectory(src)

136
README.md
View File

@ -98,6 +98,11 @@ following available options:
slowed down significantly. Defaults to _OFF_.
- **POET_PREPROCESS_BENCHS**=*boolean* - enables the preprocessing of
predefined models/benchmarks. Defaults to *ON*.
- **USE_AI_SURROGATE**=*boolean* - includes the functions of the AI
surrogate model. When active, CMake relies on `find_package()` to find
an a implementation of `Threads` and a Python environment where Numpy
and Keras need to be installed. Defaults to _OFF_.
### Example: Build from scratch
@ -231,37 +236,81 @@ mpirun -n 4 ./poet --dht --dht-snaps=2 barite_het_rt.R barite_het.rds output
### Example: Preparing Environment and Running with AI surrogate
To run the AI surrogate, you need to install the R package `keras3`. The
compilation process of POET remains the same as shown above.
To run the AI surrogate, you need to have a Keras installed in your
Python environment. The implementation in POET is agnostic to the exact
Keras version, but the provided model file must match your Keras version.
Using Keras 3 with `.keras` model files is recommended. The compilation
process of POET remains mostly the same as shown above, but the CMake
option `-DUSE_AI_SURROGATE=ON` must be set.
In the following code block, the installation process on the Turing Cluster is
shown. `miniconda` is used to create a virtual environment to install
tensorflow/keras. Please adapt the installation process to your needs.
To use the AI surrogate, you must declare several values in the R input
script. This can be either done directly in the input script or in an
additional file. This file can be provided by adding the file path as the
element `ai_surrogate_input_script` to the `chemistry_setup` list in the
R input script.
<!-- Start an R interactive session and install the required packages: -->
The following variables and functions must be declared:
- `model_file_path` [*string*]: Path to the Keras model file with which
the AI surrogate model is initialized.
```sh
# First, install the required R packages
R -e "install.packages('keras3', repos='https://cloud.r-project.org/')"
- `validate_predictions(predictors, prediction)` [*function*]: Must return a
boolean vector of length `nrow(predictions)`. The output of this function
defines which predictions are considered valid and which are rejected.
the predictors and predictions are passed in their original original (not
transformed) scale. Regular simulation will only be done for the rejected
values. The input data of the rejected rows and the respective true results
from simulation will be added to the training data buffer of the AI surrogate
model. Can eg. be implemented as a mass balance threshold between the
predictors and the prediction.
# manually create a virtual environment to install keras/python using conda,
# as this is somehow broken on the Turing Cluster when using the `keras::install_keras()` function
cd poet
# create a virtual environment in the .ai directory with python 3.11
conda create -p ./.ai python=3.11
conda activate ./.ai
The following variables and functions can be declared:
- `batch_size` [*int*]: Batch size for the inference and training functions,
defaults to 2560.
# install tensorflow and keras
pip install keras tensorflow[and-cuda]
- `training_epochs` [*int*]: Number of training epochs with each training data
set, defaults to 20.
# add conda's python path to the R environment
# make sure to have the conda environment activated
echo -e "RETICULATE_PYTHON=$(which python)\n" >> ~/.Renviron
```
- `training_data_size` [*int*]: Size of the training data buffer. After
the buffer has been filled, the model starts training and removes this amount
of data from the front of the buffer. Defaults to the size of the Field.
After setup the R environment, recompile POET and you're ready to run the AI
surrogate.
- `use_Keras_predictions` [*bool*]: Decides if the Keras prediction function
should be used instead of the custom C++ implementation. Keras might be faster
for larger models, especially on GPU. The C++ inference function assumes that
the Keras model is a standrad feed forward network with either 32 or 64 bit
precision and ReLU activation. Any model that deviates from this architecture
should activate the Keras prediction function to ensure correct calculation.
Defaults to false.
- `disable_training` [*bool*]: Deactivates the training functions. Defaults to
false.
- `train_only_invalid` [*bool*]: Use only the data from PHREEQC for training
instead of the whole field (which might contain the models own predictions).
Defaults to false.
- `save_model_path` [*string*]: After each training step the current model
is saved to this path as a .keras file.
- `preprocess(df)` [*function*]:
Returns the scaled/transformed data frame. The default implementation uses no
scaling or transformations.
- `postprocess(df)` [*function*]: Returns the rescaled/backtransformed data frame.
The combination of preprocess() and postprocess() is expected to be idempotent.
The default implementation uses no scaling or transformations.
- `assign_clusters(df)` [*function*]: Must return a vector of length
`nrow(predictions)` that contains cluster labels as 0/1. According to these
labels, two separate models will be used for inference and training. Cluster
assignments can e.g. be done for the reactive and non reactive parts of the
field.
- `model_reactive_file_path` [*string*]: Path to the Keras model file with
which the AI surrogate model for the reactive cluster is initialized. If
ommitted, the models for both clusters will be initialized from
`model_file_path`
```sh
cd <installation_dir>/bin
@ -312,44 +361,3 @@ important information from the OpenMPI Man Page:
For example, on platforms that support it, the clock_gettime()
function will be used to obtain a monotonic clock value with whatever
precision is supported on that platform (e.g., nanoseconds).
## Additional functions for the AI surrogate
The AI surrogate can be activated for any benchmark and is by default
initiated as a sequential keras model with three hidden layer of depth
48, 96, 24 with relu activation and adam optimizer. All functions in
`ai_surrogate_model.R` can be overridden by adding custom definitions
via an R file in the input script. This is done by adding the path to
this file in the input script. Simply add the path as an element
called `ai_surrogate_input_script` to the `chemistry_setup` list.
Please use the global variable `ai_surrogate_base_path` as a base path
when relative filepaths are used in custom funtions.
**There is currently no default implementation to determine the
validity of predicted values.** This means, that every input script
must include an R source file with a custom function
`validate_predictions(predictors, prediction)`. Examples for custom
functions can be found for the barite_200 benchmark
The functions can be defined as follows:
`validate_predictions(predictors, prediction)`: Returns a boolean
index vector that signals for each row in the predictions if the
values are considered valid. Can eg. be implemented as a mass balance
threshold between the predictors and the prediction.
`initiate_model()`: Returns a keras model. Can be used to load
pretrained models.
`preprocess(df, backtransform = FALSE, outputs = FALSE)`: Returns the
scaled/transformed/backtransformed dataframe. The `backtransform` flag
signals if the current processing step is applied to data that's
assumed to be scaled and expects backtransformed values. The `outputs`
flag signals if the current processing step is applied to the output
or tatget of the model. This can be used to eg. skip these processing
steps and only scale the model input.
`training_step (model, predictor, target, validity)`: Trains the model
after each iteration. `validity` is the bool index vector given by
`validate_predictions` and can eg. be used to only train on values
that have not been valid predictions.

View File

@ -1,75 +0,0 @@
## This file contains default function implementations for the ai surrogate.
## To load pretrained models, use pre-/postprocessing or change hyperparameters
## it is recommended to override these functions with custom implementations via
## the input script. The path to the R-file containing the functions mus be set
## in the variable "ai_surrogate_input_script". See the barite_200.R file as an
## example and the general README for more information.
require(keras3)
require(tensorflow)
initiate_model <- function() {
hidden_layers <- c(48, 96, 24)
activation <- "relu"
loss <- "mean_squared_error"
input_length <- length(ai_surrogate_species)
output_length <- length(ai_surrogate_species)
## Creates a new sequential model from scratch
model <- keras_model_sequential()
## Input layer defined by input data shape
model %>% layer_dense(units = input_length,
activation = activation,
input_shape = input_length,
dtype = "float32")
for (layer_size in hidden_layers) {
model %>% layer_dense(units = layer_size,
activation = activation,
dtype = "float32")
}
## Output data defined by output data shape
model %>% layer_dense(units = output_length,
activation = activation,
dtype = "float32")
model %>% compile(loss = loss,
optimizer = "adam")
return(model)
}
gpu_info <- function() {
msgm(tf_gpu_configured())
}
prediction_step <- function(model, predictors) {
prediction <- predict(model, as.matrix(predictors))
colnames(prediction) <- colnames(predictors)
return(as.data.frame(prediction))
}
preprocess <- function(df, backtransform = FALSE, outputs = FALSE) {
return(df)
}
postprocess <- function(df, backtransform = TRUE, outputs = TRUE) {
return(df)
}
set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
training_step <- function(model, predictor, target, validity) {
msgm("Training:")
x <- as.matrix(predictor)
y <- as.matrix(target[colnames(x)])
model %>% fit(x, y)
model %>% save_model_tf(paste0(out_dir, "/current_model.keras"))
}

View File

@ -0,0 +1,30 @@
## This file contains default function implementations for the ai surrogate.
## To use pre-/postprocessing it is recommended to override these functions
## with custom implementations via the input script. The path to the R-file
## See the barite_50.R file as an example and the general README for more
## information.
preprocess <- function(df) {
return(df)
}
postprocess <- function(df) {
return(df)
}
set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
get_invalid_values <- function(df, validity) {
return(df[validity == 0, ])
}
set_field <- function(temp_field, columns, rows, column_name_limit,
byrow = FALSE) {
temp_field <- matrix(temp_field, nrow = rows, byrow = byrow)
temp_field <- setNames(data.frame(temp_field), columns)
temp_field <- temp_field[column_name_limit]
return(temp_field)
}

View File

@ -76,9 +76,10 @@ master_iteration_end <- function(setup, state_T, state_C) {
state_C <- data.frame(state_C, check.names = FALSE)
ai_surrogate_info <- list(
prediction_time = if (exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL,
training_time = if (exists("ai_training_time")) as.integer(ai_training_time) else NULL,
valid_predictions = if (exists("validity_vector")) validity_vector else NULL
prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
cluster_labels = if (exists("cluster_labels")) cluster_labels else NULL,
predictions = if (exists("predictions")) predictions else NULL
)
SaveRObj(x = list(

3
bench/barite/barite_200.R Normal file → Executable file
View File

@ -47,8 +47,7 @@ dht_species <- c(
)
chemistry_setup <- list(
dht_species = dht_species,
ai_surrogate_input_script = "./barite_200ai_surrogate_input_script.R"
dht_species = dht_species
)
# Define a setup list for simulation configuration

View File

@ -1,48 +0,0 @@
## load a pretrained model from tensorflow file
## Use the global variable "ai_surrogate_base_path" when using file paths
## relative to the input script
initiate_model <- function() {
init_model <- normalizePath(paste0(ai_surrogate_base_path,
"model_min_max_float64.keras"))
return(load_model_tf(init_model))
}
scale_min_max <- function(x, min, max, backtransform) {
if (backtransform) {
return((x * (max - min)) + min)
} else {
return((x - min) / (max - min))
}
}
preprocess <- function(df, backtransform = FALSE, outputs = FALSE) {
minmax_file <- normalizePath(paste0(ai_surrogate_base_path,
"min_max_bounds.rds"))
global_minmax <- readRDS(minmax_file)
for (column in colnames(df)) {
df[column] <- lapply(df[column],
scale_min_max,
global_minmax$min[column],
global_minmax$max[column],
backtransform)
}
return(df)
}
mass_balance <- function(predictors, prediction) {
dBa <- abs(prediction$Ba + prediction$Barite -
predictors$Ba - predictors$Barite)
dSr <- abs(prediction$Sr + prediction$Celestite -
predictors$Sr - predictors$Celestite)
return(dBa + dSr)
}
validate_predictions <- function(predictors, prediction) {
epsilon <- 3e-5
mb <- mass_balance(predictors, prediction)
msgm("Mass balance mean:", mean(mb))
msgm("Mass balance variance:", var(mb))
msgm("Rows where mass balance meets threshold", epsilon, ":",
sum(mb < epsilon))
return(mb < epsilon)
}

0
bench/barite/barite_50ai.R Normal file → Executable file
View File

0
bench/barite/barite_50ai_rt.R Normal file → Executable file
View File

View File

@ -1,18 +1,13 @@
## Time-stamp: "Last modified 2024-05-30 13:27:06 delucia"
## load a pretrained model from tensorflow file
## Use the global variable "ai_surrogate_base_path" when using file paths
## relative to the input script
initiate_model <- function() {
require(keras3)
require(tensorflow)
init_model <- normalizePath(paste0(ai_surrogate_base_path,
model_file_path <- normalizePath(paste0(ai_surrogate_base_path,
"barite_50ai_all.keras"))
Model <- keras3::load_model(init_model)
msgm("Loaded model:")
print(str(Model))
return(Model)
}
batch_size <- 1280
training_epochs <- 20
save_model_path <- "current_model.keras"
scale_min_max <- function(x, min, max, backtransform) {
if (backtransform) {
@ -22,22 +17,105 @@ scale_min_max <- function(x, min, max, backtransform) {
}
}
minmax <- list(min = c(H = 111.012433592824, O = 55.5062185549492, Charge = -3.1028354471876e-08,
Ba = 1.87312878574393e-141, Cl = 0, `S(6)` = 4.24227510643685e-07,
Sr = 0.00049382996130541, Barite = 0.000999542409828586, Celestite = 0.244801877115968),
max = c(H = 111.012433679682, O = 55.5087003521685, Charge = 5.27666636082035e-07,
Ba = 0.0908849779513762, Cl = 0.195697626449355, `S(6)` = 0.000620774752665846,
Sr = 0.0558680070692722, Barite = 0.756779139057097, Celestite = 1.00075422160624
))
## Apply decimal logarithms handling negative values
Safelog <- function (vec) {
rv <- range(vec)
if (max(abs(rv)) < 1) {
ret <- vec
ret[vec == 0] <- 0
ret[vec > 0] <- log10(vec[vec > 0])
ret[vec < 0] <- -log10(-vec[vec < 0])
} else {
ret <- log10(abs(vec))
}
ret
}
Safeexp <- function (vec) {
ret <- vec
ret[vec == 0] <- 0
ret[vec > 0] <- -10^(-vec[vec > 0])
ret[vec < 0] <- 10^(vec[vec < 0])
ret
}
##' @title Apply transformations to cols of a data.frame
##' @param df named data.frame
##' @param tlist list of function names
##' @return data.frame with the columns specified in tlist and the
##' transformed numerical values
##' @author MDL
TransfList <- function (df, tlist) {
vars <- intersect(colnames(df), names(tlist))
lens <- sapply(tlist[vars], length)
n <- max(lens)
filledlist <- lapply(tlist[vars],
function(x)
if (length(x) < n)
return(c(x, rep("I", n - length(x))))
else
return(x))
tmp <- df[, vars]
for (i in seq_len(n)) {
tmp <- as.data.frame(sapply(vars, function(x)
do.call(filledlist[[x]][i], list(tmp[, x]))))
}
return(tmp)
}
##' This function applies some specified string substitution such as
##' 's/log/exp/' so that from a list of "forward" transformation
##' functions it computes a "backward" one
##' @title Apply back transformations to cols of a data.frame
##' @param df named data.frame
##' @param tlist list of function names
##' @return data.frame with the columns specified in tlist and the
##' backtransformed numerical values
##' @author MDL
BackTransfList <- function (df, tlist) {
vars <- intersect(colnames(df), names(tlist))
lens <- sapply(tlist[vars], length)
n <- max(lens)
filledlist <- lapply(tlist[vars],
function(x)
if (length(x) < n)
return(c(x, rep("I", n - length(x))))
else
return(x))
rlist <- lapply(filledlist, rev)
rlist <- lapply(rlist, sub, pattern = "log", replacement = "exp")
rlist <- lapply(rlist, sub, pattern = "1p", replacement = "m1")
rlist <- lapply(rlist, sub, pattern = "Mul", replacement = "Div")
tmp <- df[, vars]
for (i in seq_len(n)) {
tmp <- as.data.frame(sapply(vars, function(x)
do.call(rlist[[x]][i], list(tmp[, x]))))
}
return(tmp)
}
tlist <- list("H" = "log1p", "O" = "log1p", "Charge" = "Safelog",
"Ba" = "log1p", "Cl" = "log1p", "S(6)" = "log1p",
"Sr" = "log1p", "Barite" = "log1p", "Celestite" = "log1p")
minmaxlog <- list(min = c(H = 4.71860987935512, O = 4.03435069501537,
Charge = -14.5337693764617, Ba = 1.87312878574393e-141,
Cl = 0, `S(6)` = 4.2422742065922e-07,
Sr = 0.00049370806741832, Barite = 0.000999043199940672,
Celestite = 0.218976382406766),
max = c(H = 4.71860988013054, O = 4.03439461483133,
Charge = 12.9230752782909, Ba = 0.086989273200771,
Cl = 0.178729802869381, `S(6)` = 0.000620582151722819,
Sr = 0.0543631841661421, Barite = 0.56348209786429,
Celestite = 0.69352422027466))
preprocess <- function(df) {
if (!is.data.frame(df))
df <- as.data.frame(df, check.names = FALSE)
as.data.frame(lapply(colnames(df),
function(x) scale_min_max(x=df[x],
min=minmax$min[x],
max=minmax$max[x],
tlog <- TransfList(df, tlist)
as.data.frame(lapply(colnames(tlog),
function(x) scale_min_max(x = tlog[x],
min = minmaxlog$min[x],
max = minmaxlog$max[x],
backtransform = FALSE)),
check.names = FALSE)
}
@ -45,25 +123,26 @@ preprocess <- function(df) {
postprocess <- function(df) {
if (!is.data.frame(df))
df <- as.data.frame(df, check.names = FALSE)
as.data.frame(lapply(colnames(df),
ret <- as.data.frame(lapply(colnames(df),
function(x) scale_min_max(x = df[x],
min=minmax$min[x],
max=minmax$max[x],
min = minmaxlog$min[x],
max = minmaxlog$max[x],
backtransform = TRUE)),
check.names = FALSE)
BackTransfList(ret, tlist)
}
mass_balance <- function(predictors, prediction) {
dBa <- abs(prediction$Ba + prediction$Barite -
predictors$Ba - predictors$Barite)
dSr <- abs(prediction$Sr + prediction$Celestite -
predictors$Sr - predictors$Celestite)
mass_balance <- function(x, y) {
dBa <- abs(y$Ba + y$Barite -
(x$Ba + x$Barite))
dSr <- abs(y$Sr + y$Celestite -
(x$Sr + x$Celestite))
return(dBa + dSr)
}
validate_predictions <- function(predictors, prediction) {
epsilon <- 1E-7
epsilon <- 1E-5
mb <- mass_balance(predictors, prediction)
msgm("Mass balance mean:", mean(mb))
msgm("Mass balance variance:", var(mb))
@ -72,19 +151,3 @@ validate_predictions <- function(predictors, prediction) {
sum(ret))
return(ret)
}
training_step <- function(model, predictor, target, validity) {
msgm("Starting incremental training:")
## x <- as.matrix(predictor)
## y <- as.matrix(target[colnames(x)])
history <- model %>% keras3::fit(x = data.matrix(predictor),
y = data.matrix(target),
epochs = 10, verbose=1)
keras3::save_model(model,
filepath = paste0(out_dir, "/current_model.keras"),
overwrite=TRUE)
return(model)
}

View File

@ -18,7 +18,55 @@ target_sources(POETLib
Chemistry/SurrogateModels/ProximityHashTable.cpp
)
target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}")
target_include_directories(
POETLib PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_BINARY_DIR}"
)
option(USE_AI_SURROGATE "Compiles the AI surrogate functions with Python.h integration" OFF)
if(USE_AI_SURROGATE)
add_definitions(-DUSE_AI_SURROGATE)
message("Building with AI surrogate functions")
message(" -- Needs Python (with Numpy & Keras) and Threads libraries")
find_package(CUDA)
IF (${CUDA_FOUND})
message(" -- Setting TensorFlow CUDA path to: ${CUDA_TOOLKIT_ROOT_DIR}")
ELSE (${CUDA_FOUND})
message(" -- No CUDA installation found for TensorFlow ")
ENDIF (${CUDA_FOUND})
# make sure to use the python installation from the conda environment
if(DEFINED ENV{CONDA_PREFIX})
set(Python3_EXECUTABLE "$ENV{CONDA_PREFIX}/bin/python3")
endif()
find_package(Python3 COMPONENTS Interpreter Development NumPy REQUIRED)
find_package(Threads REQUIRED)
set_source_files_properties(
Chemistry/SurrogateModels/AI_functions.cpp
PROPERTIES COMPILE_FLAGS "-O3 -march=native -mtune=native"
)
target_sources(POETLib
PRIVATE
Chemistry/SurrogateModels/AI_functions.cpp
)
target_include_directories(POETLib PUBLIC
"${Python3_INCLUDE_DIRS}"
"${Python3_NumPy_INCLUDE_DIRS}"
)
target_link_libraries(POETLib
PUBLIC "${Python3_LIBRARIES}"
PUBLIC Threads::Threads pthread
)
endif() # USE_AI_SURROGATE
target_link_libraries(
POETLib
PUBLIC RRuntime
@ -27,6 +75,10 @@ target_link_libraries(
PUBLIC MPI::MPI_C
)
# Define Python API version
target_compile_definitions(POETLib PRIVATE NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION)
include(FetchContent)
FetchContent_Declare(
cli11
@ -80,10 +132,11 @@ target_compile_definitions(POETLib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
file(READ "${PROJECT_SOURCE_DIR}/R_lib/kin_r_library.R" R_KIN_LIB )
file(READ "${PROJECT_SOURCE_DIR}/R_lib/init_r_lib.R" R_INIT_LIB)
file(READ "${PROJECT_SOURCE_DIR}/R_lib/ai_surrogate_model.R" R_AI_SURROGATE_LIB)
file(READ "${PROJECT_SOURCE_DIR}/R_lib/ai_surrogate_model_functions.R" R_AI_SURROGATE_LIB)
configure_file(poet.hpp.in poet.hpp @ONLY)
add_executable(poet poet.cpp)
target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11)
target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")

View File

@ -0,0 +1,43 @@
import tensorflow as tf
import numpy as np
import os
os.environ["TF_XLA_FLAGS"] = "--tf_xla_cpu_global_jit"
os.environ["XLA_FLAGS"] = "--xla_gpu_cuda_data_dir=" + cuda_dir
def initiate_model(model_file_path):
print("AI: Model loaded from: " + model_file_path, flush=True)
model = tf.keras.models.load_model(model_file_path)
return model
def prediction_step(model, model_reactive, x, cluster_labels, batch_size):
# Catch input size mismatches
model_input_shape = model.input_shape[1:]
if model_input_shape != x.shape[1:]:
print(f"Input data size {x.shape[1:]} does not match model input size {model_input_shape}",
flush=True)
# Predict separately if clustering is used
if cluster_labels:
cluster_labels = np.asarray(cluster_labels, dtype=bool)
# Combine results
prediction = np.zeros_like(x)
prediction[cluster_labels] = model_reactive.predict(x[cluster_labels], batch_size)
prediction[~cluster_labels] = model.predict(x[~cluster_labels], batch_size)
else:
prediction = model.predict(x, batch_size)
return np.array(prediction, dtype=np.float64)
def get_weights(model):
weights = model.get_weights()
return weights
def training_step(model, x, y, batch_size, epochs,
output_file_path):
history = model.fit(x, y,
epochs=epochs,
batch_size=batch_size)
if output_file_path:
model.save(output_file_path)
return history

View File

@ -0,0 +1,757 @@
#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <Python.h>
#include <numpy/arrayobject.h>
#include <Eigen/Dense>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "AI_functions.hpp"
#include "poet.hpp"
using namespace std;
namespace poet {
/**
* @brief Loads the Python interpreter and functions
* @param functions_file_path Path to the Python file where the AI surrogate
* functions are defined
* @return 0 if function was succesful
*/
int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir) {
// Initialize Python functions
Py_Initialize();
// Import numpy functions
_import_array();
PyRun_SimpleString(("cuda_dir = \"" + cuda_src_dir + "\"").c_str()) ;
FILE* fp = fopen(functions_file_path.c_str(), "r");
int py_functions_initialized = PyRun_SimpleFile(fp, functions_file_path.c_str());
fclose(fp);
if (py_functions_initialized != 0) {
PyErr_Print();
throw std::runtime_error(std::string("AI surrogate Python functions could not be loaded." ) +
"Are tensorflow and numpy installed?");
}
return py_functions_initialized;
}
/**
* @brief Loads the user-supplied Keras model
* @param model_file_path Path to a .keras file that the user must supply as
* a variable "model_file_path" in the R input script
* @return 0 if function was succesful
*/
int Python_Keras_load_model(std::string model, std::string model_reactive, bool use_clustering) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Initialize Keras default model
int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" +
model + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print();
throw std::runtime_error("Keras model could not be loaded from: " + model);
}
if (use_clustering) {
// Initialize second Keras model that will be used for the "reaction" cluster
py_model_loaded = PyRun_SimpleString(("model_reactive = initiate_model(\"" +
model_reactive + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print();
throw std::runtime_error("Keras model could not be loaded from: " + model_reactive);
}
}
// Release the Python GIL
PyGILState_Release(gstate);
return py_model_loaded;
}
/**
* @brief Converts the std::vector 2D matrix representation of a POET Field object to a numpy array
* for use in the Python AI surrogate functions
* @param field 2D-Matrix with the content of a Field object
* @return Numpy representation of the input vector
*/
PyObject* vector_to_numpy_array(const std::vector<std::vector<double>>& field) {
npy_intp dims[2] = {static_cast<npy_intp>(field[0].size()),
static_cast<npy_intp>(field.size())};
PyObject* np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64);
double* data = static_cast<double*>(PyArray_DATA((PyArrayObject*)np_array));
// write field data to numpy array
for (size_t i = 0; i < field.size(); ++i) {
for (size_t j = 0; j < field[i].size(); ++j) {
data[j * field.size() + i] = field[i][j];
}
}
return np_array;
}
/**
* @brief Converts a Pyton matrix object to a std::vector vector
* @param py_matrix Pyobject that must be a 2D matrix
* @result Vector that can be used similar to the return value of the Field object
* Field.AsVector() method.
*/
std::vector<double> numpy_array_to_vector(PyObject* py_array) {
std::vector<double> result;
if (!PyArray_Check(py_array)) {
std::cerr << "The model's output is not a numpy array." << std::endl;
return result;
}
// Cast generic PyObject to PyArrayObject
PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(py_array);
// Get shape
int numDims = PyArray_NDIM(np_array);
npy_intp* shape = PyArray_SHAPE(np_array);
if (numDims != 2) {
std::cerr << "The model's predictions are not a 2D matrix." << std::endl;
return result;
}
// Copy data into std::vector format
double* data = static_cast<double*>(PyArray_DATA(np_array));
npy_intp size = PyArray_SIZE(np_array);
result.resize(size);
std::copy(data, data + size, result.begin());
return result;
}
/**
* @brief Uses the Python Keras functions to calculate predictions from a neural network.
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
std::vector<int>& cluster_labels) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Prepare data for Python
PyObject* py_df_x = vector_to_numpy_array(x);
// Prepare cluster label vector for Python
PyObject* py_cluster_list = PyList_New(cluster_labels.size());
for (size_t i = 0; i < cluster_labels.size(); i++) {
PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
PyList_SET_ITEM(py_cluster_list, i, py_int);
}
// Get the model and inference function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step");
// Get secod model if clustering is used
PyObject* py_keras_model_reactive = Py_None;;
if (cluster_labels.size() > 0) {
py_keras_model_reactive = PyDict_GetItemString(py_global_dict, "model_reactive");
}
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOOi)",
py_keras_model, py_keras_model_reactive, py_df_x, py_cluster_list, batch_size);
// Call the Python inference function
PyObject* py_predictions = PyObject_CallObject(py_inference_function, args);
// Check py_rv and return as 2D vector
std::vector<double> predictions = numpy_array_to_vector(py_predictions);
// Clean up
PyErr_Print();
Py_XDECREF(py_df_x);
Py_XDECREF(py_cluster_list);
Py_XDECREF(args);
Py_XDECREF(py_predictions);
// Release the Python GIL
PyGILState_Release(gstate);
return predictions;
}
/**
* @brief Uses Eigen for fast inference with the weights and biases of a neural network.
* This function assumes ReLU activation for each layer.
* @param input_batch Batch of input data that must fit the size of the neural networks input layer
* @param model Struct of aligned Eigen vectors that hold the neural networks weights and biases.
* Only supports simple fully connected feed forward networks.
* @return The batch of predictions made with the neural network weights and biases and the data
* in input_batch
*/
Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<const Eigen::MatrixXd>& input_batch, const EigenModel& model) {
Eigen::MatrixXd current_layer = input_batch;
// Process all hidden layers
for (size_t layer = 0; layer < model.weight_matrices.size() - 1; ++layer) {
current_layer = (model.weight_matrices[layer] * current_layer);
current_layer = current_layer.colwise() + model.biases[layer];
current_layer = current_layer.array().max(0.0);
}
// Process output layer (without ReLU)
size_t output_layer = model.weight_matrices.size() - 1;
return (model.weight_matrices[output_layer] * current_layer).colwise() + model.biases[output_layer];
}
/**
* @brief Uses the Eigen representation of the two different Keras model weights for fast inference
* @param model The model for the non reactive cluster of the field (label 0)
* @param model_reactive The model for the non reactive cluster of the field (label 1)
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @param Eigen_model_mutex Mutex that locks the model during inference and prevents updaties from
* the training thread
* @param cluster_labels K-Means cluster label dor each row in the field
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Eigen_predict_clustered(const EigenModel& model, const EigenModel& model_reactive,
std::vector<std::vector<double>>& x, int batch_size,
std::mutex* Eigen_model_mutex, std::vector<int>& cluster_labels) {
const int num_samples = x[0].size();
const int num_features = x.size();
if (num_features != model.weight_matrices[0].cols() ||
num_features != model_reactive.weight_matrices[0].cols()) {
throw std::runtime_error("Input data size " + std::to_string(num_features) +
" does not match model input layer sizes" + std::to_string(model.weight_matrices[0].cols()) +
" / " + std::to_string(model_reactive.weight_matrices[0].cols()));
}
// Convert input data to Eigen matrix
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
for (size_t i = 0; i < num_samples; ++i) {
for (size_t j = 0; j < num_features; ++j) {
full_input_matrix(j, i) = x[j][i];
}
}
// Create indices for each cluster
std::vector<int> cluster_0_indices, cluster_1_indices;
for (size_t i = 0; i < cluster_labels.size(); ++i) {
if (cluster_labels[i] == 0) {
cluster_0_indices.push_back(i);
} else {
cluster_1_indices.push_back(i);
}
}
// Prepare matrices for each cluster
Eigen::MatrixXd input_matrix(num_features, cluster_0_indices.size());
Eigen::MatrixXd input_matrix_reactive(num_features, cluster_1_indices.size());
// Split data according to cluster labels
for (size_t i = 0; i < cluster_0_indices.size(); ++i) {
input_matrix.col(i) = full_input_matrix.col(cluster_0_indices[i]);
}
for (size_t i = 0; i < cluster_1_indices.size(); ++i) {
input_matrix_reactive.col(i) = full_input_matrix.col(cluster_1_indices[i]);
}
// Process each cluster
std::vector<double> result(num_samples * model.weight_matrices.back().rows());
Eigen_model_mutex->lock();
if (!cluster_0_indices.empty()) {
int num_batches_0 = std::ceil(static_cast<double>(cluster_0_indices.size()) / batch_size);
for (int batch = 0; batch < num_batches_0; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, static_cast<int>(cluster_0_indices.size()));
int current_batch_size = end_idx - start_idx;
Eigen::MatrixXd batch_data = input_matrix.block(0, start_idx, num_features, current_batch_size);
Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model);
// Store results in their original positions
for (size_t i = 0; i < current_batch_size; ++i) {
int original_idx = cluster_0_indices[start_idx + i];
for (size_t j = 0; j < batch_result.rows(); ++j) {
result[original_idx * batch_result.rows() + j] = batch_result(j, i);
}
}
}
}
// Process cluster 1
if (!cluster_1_indices.empty()) {
int num_batches_1 = std::ceil(static_cast<double>(cluster_1_indices.size()) / batch_size);
for (int batch = 0; batch < num_batches_1; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, static_cast<int>(cluster_1_indices.size()));
int current_batch_size = end_idx - start_idx;
Eigen::MatrixXd batch_data = input_matrix_reactive.block(0, start_idx, num_features, current_batch_size);
Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model_reactive);
// Store results in their original positions
for (size_t i = 0; i < current_batch_size; ++i) {
int original_idx = cluster_1_indices[start_idx + i];
for (size_t j = 0; j < batch_result.rows(); ++j) {
result[original_idx * batch_result.rows() + j] = batch_result(j, i);
}
}
}
}
Eigen_model_mutex->unlock();
return result;
}
/**
* @brief Uses the Eigen representation of the tKeras model weights for fast inference
* @param model The model weights and biases
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @param Eigen_model_mutex Mutex that locks the model during inference and prevents updaties from
* the training thread
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
std::mutex* Eigen_model_mutex) {
// Convert input data to Eigen matrix
const int num_samples = x[0].size();
const int num_features = x.size();
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
for (size_t i = 0; i < num_samples; ++i) {
for (size_t j = 0; j < num_features; ++j) {
full_input_matrix(j, i) = x[j][i];
}
}
std::vector<double> result;
result.reserve(num_samples * num_features);
if (num_features != model.weight_matrices[0].cols()) {
throw std::runtime_error("Input data size " + std::to_string(num_features) + \
" does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
}
int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
Eigen_model_mutex->lock();
for (int batch = 0; batch < num_batches; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, num_samples);
int current_batch_size = end_idx - start_idx;
// Extract the current input data batch
Eigen::MatrixXd batch_data(num_features, current_batch_size);
batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size);
// Predict
batch_data = eigen_inference_batched(batch_data, model);
result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size());
}
Eigen_model_mutex->unlock();
return result;
}
/**
* @brief Appends data from one matrix (column major std::vector<std::vector<double>>) to another
* @param training_data_buffer Matrix that the values are appended to
* @param new_values Matrix that is appended
*/
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
std::vector<std::vector<double>>& new_values) {
// Initialize training data buffer if empty
if (training_data_buffer.size() == 0) {
training_data_buffer = new_values;
} else { // otherwise append
for (size_t col = 0; col < training_data_buffer.size(); col++) {
training_data_buffer[col].insert(training_data_buffer[col].end(),
new_values[col].begin(), new_values[col].end());
}
}
}
/**
* @brief Appends data from one int vector to another based on a mask vector
* @param labels Vector that the values are appended to
* @param new_labels Values that are appended
* @param validity Mask vector that defines how many and which values are appended
*/
void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& new_labels,
std::vector<int> validity) {
// Calculate new buffer size from number of valid elements in mask
int n_invalid = validity.size();
for (size_t i = 0; i < validity.size(); i++) {
n_invalid -= validity[i];
}
// Resize label vector to hold non valid elements
int end_index = labels_buffer.size();
int new_size = end_index + n_invalid;
labels_buffer.resize(new_size);
// Iterate over mask to transfer cluster labels
for (size_t i = 0; i < validity.size(); ++i) {
// Append only the labels of invalid rows
if (!validity[i]) {
labels_buffer[end_index] = new_labels[i];
end_index++;
}
}
}
/**
* @brief Uses the Python environment with Keras' default functions to train the model
* @param x Training data features
* @param y Training data targets
* @param params Global runtime paramters
*/
void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::vector<double>>& y,
int train_cluster, std::string model_name, const RuntimeParameters& params) {
// Prepare data for python
PyObject* py_df_x = vector_to_numpy_array(x);
PyObject* py_df_y = vector_to_numpy_array(y);
// Make sure that model output file name .keras file
std::string model_path = params.save_model_path;
if (!model_path.empty()) {
if (model_path.length() >= 6 && model_path.substr(model_path.length() - 6) != ".keras") {
model_path += ".keras";
}
}
// Choose the correct model to train if clustering is used
if (train_cluster == 1) {
if (!model_path.empty()) {
model_path.insert(model_path.length() - 6, "_reaction");
std::cout << "MODEL SAVED AS:" << model_path << std::endl;
}
}
// Get the model and training function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, model_name.c_str());
PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOiis)",
py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs,
model_path.c_str());
// Call the Python training function
PyObject *py_rv = PyObject_CallObject(py_training_function, args);
// Clean up
PyErr_Print();
Py_DECREF(py_df_x);
Py_DECREF(py_df_y);
Py_DECREF(args);
}
/**
* @brief Function for threadsafe parallel training and weight updating.
* The function waits conditionally until the training data buffer is full.
* It then clears the buffer and starts training, after training it writes the new weights to
* the Eigen model.
* @param Eigen_model Pointer to the EigenModel struct that will be updates with new weights
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
* @param training_data_buffer Pointer to the Training data struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
* when a training run can start
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
* @param end_training Signals end of program to wind down thread gracefully
* @param params Global runtime paramters
* @return 0 if function was succesful
*/
void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params) {
while (true) {
// Conditional waiting:
// - Sleeps until a signal arrives on training_data_buffer_full
// - Releases the lock on training_data_buffer_mutex while sleeping
// - Lambda function with start_training checks if it was a spurious wakeup
// - Reaquires the lock on training_data_buffer_mutex after waking up
// - If start_training has been set to true while the thread was active, it does NOT
// wait for a signal on training_data_buffer_full but starts the next round immediately.
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
training_data_buffer_full->wait(lock, [start_training] { return *start_training;});
// Return if program is about to end
if (*end_training) {
return;
}
// Get the necessary training data
std::cout << "AI: Training thread: Getting training data" << std::endl;
// Initialize training data input and targets
std::vector<std::vector<double>> inputs(training_data_buffer->x.size(),
std::vector<double>(params.training_data_size));
std::vector<std::vector<double>> targets(training_data_buffer->x.size(),
std::vector<double>(params.training_data_size));
int buffer_size = training_data_buffer->x[0].size();
// If clustering is used, check the current cluster
int n_cluster_reactive = 0;
int train_cluster = -1; // Default value for non clustered training (all data is used)
if (params.use_clustering) {
for (size_t i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer->cluster_labels[i];
}
train_cluster = n_cluster_reactive >= params.training_data_size;
}
int buffer_row = 0;
int copied_row = 0;
while (copied_row < params.training_data_size) {
if ((train_cluster == -1) ||
(train_cluster == training_data_buffer->cluster_labels[buffer_row])) {
for (size_t col = 0; col < training_data_buffer->x.size(); col++) {
// Copy and remove from training data buffer
inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin() +
buffer_row);
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin() +
buffer_row);
}
// Remove from cluster label buffer
if (params.use_clustering) {
training_data_buffer->cluster_labels.erase(
training_data_buffer->cluster_labels.begin() + buffer_row);
}
copied_row++;
} else {
buffer_row++;
}
}
// Set the waiting predicate to immediately continue training if enough elements of any cluster remain
if (train_cluster == 1) {
*start_training = ((n_cluster_reactive - params.training_data_size) >= params.training_data_size) ||
((buffer_size - n_cluster_reactive) >= params.training_data_size);
} else {
*start_training = (buffer_size - n_cluster_reactive - params.training_data_size)
>= params.training_data_size;
}
//update number of training runs
training_data_buffer->n_training_runs += 1;
// Unlock the training_data_buffer_mutex
lock.unlock();
std::string model_name = "model";
if (train_cluster == 1) {
model_name = "model_reactive";
}
std::cout << "AI: Training thread: Start training " << model_name << std::endl;
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Start training
Python_Keras_train(inputs, targets, train_cluster, model_name, params);
if (!params.use_Keras_predictions) {
std::cout << "AI: Training thread: Update shared model weights" << std::endl;
std::vector<std::vector<std::vector<double>>> cpp_weights = Python_Keras_get_weights(model_name);
Eigen_model_mutex->lock();
if (train_cluster == 1) {
update_weights(Eigen_model_reactive, cpp_weights);
} else {
update_weights(Eigen_model, cpp_weights);
}
Eigen_model_mutex->unlock();
}
// Release the Python GIL
PyGILState_Release(gstate);
std::cout << "AI: Training thread: Finished training, waiting for new data" << std::endl;
}
}
std::thread python_train_thread;
/**
* @brief Starts a thread for parallel training and weight updating. This Wrapper function
* ensures, that the main POET program can be built without pthread support if the AI
* surrogate functions are disabled during compilation.
* @param Eigen_model Pointer to the EigenModel struct that will be updates with new weights
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
* @param training_data_buffer Pointer to the Training data struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
* when a training run can start
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
* @param end_training Signals end of program to wind down thread gracefully
* @param params Global runtime paramters
* @return 0 if function was succesful
*/
int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params) {
PyThreadState *_save = PyEval_SaveThread();
python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_reactive,
Eigen_model_mutex, training_data_buffer,
training_data_buffer_mutex, training_data_buffer_full,
start_training, end_training, params);
return 0;
}
/**
* @brief Updates the EigenModels weigths and biases from the weight vector
* @param model Pounter to an EigenModel struct
* @param weights Vector of model weights from keras as returned by Python_Keras_get_weights()
*/
void update_weights(EigenModel* model,
const std::vector<std::vector<std::vector<double>>>& weights) {
size_t num_layers = weights.size() / 2;
for (size_t i = 0; i < weights.size(); i += 2) {
// Fill current weight matrix
size_t rows = weights[i][0].size();
size_t cols = weights[i].size();
for (size_t j = 0; j < cols; ++j) {
for (size_t k = 0; k < rows; ++k) {
model->weight_matrices[i / 2](k, j) = weights[i][j][k];
}
}
// Fill bias vector
size_t bias_size = weights[i + 1][0].size();
for (size_t j = 0; j < bias_size; ++j) {
model->biases[i / 2](j) = weights[i + 1][0][j];
}
}
}
/**
* @brief Converts the weights and biases from the Python Keras model to C++ vectors
* @return A vector containing the model weights and biases
*/
std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string model_name) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, model_name.c_str());
PyObject* py_get_weights_function = PyDict_GetItemString(py_global_dict, "get_weights");
PyObject* args = Py_BuildValue("(O)", py_keras_model);
// Call Python function
PyObject* py_weights_list = PyObject_CallObject(py_get_weights_function, args);
if (!py_weights_list) {
PyErr_Print();
throw std::runtime_error("Failed to get weights from Keras model");
}
// Container for the extracted weights
std::vector<std::vector<std::vector<double>>> cpp_weights;
// Iterate through the layers (weights and biases)
Py_ssize_t num_layers = PyList_Size(py_weights_list);
for (Py_ssize_t i = 0; i < num_layers; ++i) {
PyObject* py_weight_array = PyList_GetItem(py_weights_list, i);
if (!PyArray_Check(py_weight_array)) {
throw std::runtime_error("Weight is not a NumPy array.");
}
PyArrayObject* weight_np = reinterpret_cast<PyArrayObject*>(py_weight_array);
int dtype = PyArray_TYPE(weight_np);
// If array is 2D it's a weight matrix
if (PyArray_NDIM(weight_np) == 2) {
int num_rows = PyArray_DIM(weight_np, 0);
int num_cols = PyArray_DIM(weight_np, 1);
std::vector<std::vector<double>> weight_matrix(num_rows, std::vector<double>(num_cols));
// Handle different precision settings
if (dtype == NPY_FLOAT32) {
float* weight_data_float = static_cast<float*>(PyArray_DATA(weight_np));
for (size_t r = 0; r < num_rows; ++r) {
for (size_t c = 0; c < num_cols; ++c) {
weight_matrix[r][c] = static_cast<double>(weight_data_float[r * num_cols + c]);
}
}
} else if (dtype == NPY_DOUBLE) {
double* weight_data_double = static_cast<double*>(PyArray_DATA(weight_np));
for (size_t r = 0; r < num_rows; ++r) {
for (size_t c = 0; c < num_cols; ++c) {
weight_matrix[r][c] = weight_data_double[r * num_cols + c];
}
}
} else {
throw std::runtime_error("Unsupported data type for weights. Must be NPY_FLOAT32 or NPY_DOUBLE.");
}
cpp_weights.push_back(weight_matrix);
// If array is 1D it's a bias vector
} else if (PyArray_NDIM(weight_np) == 1) {
int num_elements = PyArray_DIM(weight_np, 0);
std::vector<std::vector<double>> bias_vector(1, std::vector<double>(num_elements));
// Handle different precision settings
if (dtype == NPY_FLOAT32) {
float* bias_data_float = static_cast<float*>(PyArray_DATA(weight_np));
for (size_t j = 0; j < num_elements; ++j) {
bias_vector[0][j] = static_cast<double>(bias_data_float[j]);
}
} else if (dtype == NPY_DOUBLE) {
double* bias_data_double = static_cast<double*>(PyArray_DATA(weight_np));
for (size_t j = 0; j < num_elements; ++j) {
bias_vector[0][j] = bias_data_double[j];
}
} else {
throw std::runtime_error("Unsupported data type for biases. Must be NPY_FLOAT32 or NPY_DOUBLE.");
}
cpp_weights.push_back(bias_vector);
}
}
// Clean up
Py_DECREF(py_weights_list);
Py_DECREF(args);
// Release Python GIL
PyGILState_Release(gstate);
return cpp_weights;
}
/**
* @brief Joins the training thread and winds down the Python environment gracefully
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
* when a training run can start
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
* @param end_training Signals end of program to wind down thread gracefully */
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training) {
training_data_buffer_mutex->lock();
// Define training as over
*end_training = true;
// Wake up and join training thread
*start_training = true;
training_data_buffer_mutex->unlock();
training_data_buffer_full->notify_one();
if (python_train_thread.joinable()) {
python_train_thread.join();
}
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
//Finalize Python
Py_FinalizeEx();
}
} //namespace poet

View File

@ -0,0 +1,112 @@
/**
* @file AI_functions.hpp
* @author Hans Straile (straile@uni-potsdam.de)
* @brief API for the AI/Machine Learning based chemistry surrogate model with functions to initialize a neural network and use it for training and inference via Keras for Python .
* @version 0.1
* @date 01 Nov 2024
*
* This file implements functions to train and predict with a neural network.
* All functions are based on a user supplied Keras model. Pyhton.h is used for model
* training with Keras and can be used for inference. The default inference funtion
* is implemented with Eigen matrices in C++. All functions use 2 different models
* to process data separately according to a K-means cluster assignement. This file
* alo contains the functions for the K-means algorithm.
*
*/
#ifndef AI_FUNCTIONS_H
#define AI_FUNCTIONS_H
#include <string>
#include <vector>
#include "poet.hpp"
// PhreeqC definition of pi clashes with Eigen macros
// so we have to temporarily undef it
#pragma push_macro("pi")
#undef pi
#include <Eigen/Dense>
#pragma pop_macro("pi")
namespace poet {
struct EigenModel {
// Custom struct for the Keras weight matrices and
// bias vectors
std::vector<Eigen::MatrixXd> weight_matrices;
std::vector<Eigen::VectorXd> biases;
};
struct TrainingData {
std::vector<std::vector<double>> x;
std::vector<std::vector<double>> y;
std::vector<int> cluster_labels;
int n_training_runs = 0;
};
// Ony declare the actual functions if flag is set
#ifdef USE_AI_SURROGATE
int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir);
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full, bool* start_training,
bool* end_training);
int Python_Keras_load_model(std::string model, std::string model_reactive,
bool use_clustering);
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
std::vector<int>& cluster_labels);
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
std::vector<std::vector<double>>& new_values);
void cluster_labels_append(std::vector<int>& labels, std::vector<int>& new_labels,
std::vector<int> validity);
int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params);
void update_weights(EigenModel* model, const std::vector<std::vector<std::vector<double>>>& weights);
std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string model_name);
std::vector<double> Eigen_predict_clustered(const EigenModel& model, const EigenModel& model_reactive,
std::vector<std::vector<double>>& x,
int batch_size, std::mutex* Eigen_model_mutex,
std::vector<int>& cluster_labels);
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
std::mutex* Eigen_model_mutex);
// Otherwise, define the necessary stubs
#else
inline void Python_Keras_setup(std::string, std::string){}
inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){}
inline void Python_Keras_load_model(std::string, std::string, bool){}
inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int,
std::vector<int>&){return {};}
inline void training_data_buffer_append(std::vector<std::vector<double>>&,
std::vector<std::vector<double>>&){}
inline void cluster_labels_append(std::vector<int>&, std::vector<int>&, std::vector<int>){}
inline int Python_Keras_training_thread(EigenModel*, EigenModel*, std::mutex*,
TrainingData*, std::mutex*, std::condition_variable*,
bool*, bool*, const RuntimeParameters&){return {};}
inline void update_weights(EigenModel*, const std::vector<std::vector<std::vector<double>>>&){}
inline std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string){return {};}
inline std::vector<double> Eigen_predict_clustered(const EigenModel&, const EigenModel&,
std::vector<std::vector<double>>&, int,
std::mutex*, std::vector<int>&){return {};}
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int,
std::mutex*){return {};}
#endif
} // namespace poet
#endif // AI_FUNCTIONS_HPP

View File

@ -161,6 +161,15 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
mpi_buffer.begin() + this->prop_count * (wp_i + 1));
}
if (this->ai_surrogate_enabled) {
// Map valid predictions from the ai surrogate in the workpackage
for (int i = 0; i < s_curr_wp.size; i++) {
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
s_curr_wp.mapping[i] = CHEM_AISURR;
}
}
}
// std::cout << this->comm_rank << ":" << counter++ << std::endl;
if (dht_enabled || interp_enabled) {
dht->prepareKeys(s_curr_wp.input, dt);
@ -178,16 +187,6 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
interp->tryInterpolation(s_curr_wp);
}
if (this->ai_surrogate_enabled) {
// Map valid predictions from the ai surrogate in the workpackage
for (int i = 0; i < s_curr_wp.size; i++) {
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
s_curr_wp.mapping[i] = CHEM_AISURR;
}
}
}
phreeqc_time_start = MPI_Wtime();
WorkerRunWorkPackage(s_curr_wp, current_sim_time, dt);

View File

@ -51,7 +51,7 @@ void InitialList::initChemistry(const Rcpp::List &chem) {
// Get base path
ai_surrogate_input_script_path = ai_surrogate_input_script_path.substr(0, ai_surrogate_input_script_path.find_last_of('/') + 1);
// Add the filepath as a global variable in R to enable relative filepaths in the R script
fileContent += "\nai_surrogate_base_path <- \"" + ai_surrogate_input_script_path + "\"";
fileContent.insert(0, "ai_surrogate_base_path <- \"" + ai_surrogate_input_script_path + "\"\n");
this->ai_surrogate_input_script = fileContent;
}

View File

@ -28,7 +28,6 @@
#include "DataStructures/Field.hpp"
#include "Init/InitialList.hpp"
#include "Transport/DiffusionModule.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <Rcpp/DataFrame.h>
@ -39,9 +38,10 @@
#include <memory>
#include <mpi.h>
#include <string>
#include <mutex>
#include <condition_variable>
#include "Chemistry/SurrogateModels/AI_functions.hpp"
#include <CLI/CLI.hpp>
#include <poet.hpp>
#include <vector>
@ -72,6 +72,7 @@ static void init_global_functions(RInside &R) {
SaveRObj_R = DEFunc("SaveRObj");
}
// HACK: this is a step back as the order and also the count of fields is
// predefined, but it will change in the future
// static inline void writeFieldsToR(RInside &R, const Field &trans,
@ -182,6 +183,12 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
MSG("Work Package Size: " + std::to_string(params.work_package_size));
MSG("DHT is " + BOOL_PRINT(params.use_dht));
MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate));
#ifndef USE_AI_SURROGATE
if (params.use_ai_surrogate) {
throw std::runtime_error("AI Surrogate functions can only be used if they are included during compile time.\n \
Please use the CMake flag -DUSE_AI_SURROGATE=ON.");
}
#endif
if (params.use_dht) {
// MSG("DHT strategy is " + std::to_string(simparams.dht_strategy));
@ -278,7 +285,77 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
if (params.print_progress) {
chem.setProgressBarPrintout(true);
}
/* For the weights and biases of the AI surrogate
* model to use in an inference function with Eigen */
std::mutex Eigen_model_mutex;
static EigenModel Eigen_model;
/* With K Means clustering, a second model will be used
* only for the reactive part of the field (and the first
* model only for the non-reactive part) */
static EigenModel Eigen_model_reactive;
/* For the training data */
std::mutex training_data_buffer_mutex;
std::condition_variable training_data_buffer_full;
bool start_training, end_training;
TrainingData training_data_buffer;
std::vector<int> cluster_labels;
if (params.use_ai_surrogate) {
MSG("AI: Initialize model");
// Initiate two models from one file
Python_Keras_load_model(R["model_file_path"], R["model_reactive_file_path"],
params.use_clustering);
if (!params.disable_training) {
MSG("AI: Initialize training thread");
Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive,
&Eigen_model_mutex, &training_data_buffer,
&training_data_buffer_mutex,
&training_data_buffer_full, &start_training,
&end_training, params);
}
if (!params.use_Keras_predictions) {
// Initialize Eigen model for custom inference function
MSG("AI: Use custom C++ prediction function");
// Get Keras weights from Python
std::vector<std::vector<std::vector<double>>> cpp_weights = Python_Keras_get_weights("model");
// Set model size
size_t num_layers = cpp_weights.size() / 2;
Eigen_model.weight_matrices.resize(num_layers);
Eigen_model.biases.resize(num_layers);
for (size_t i = 0; i < cpp_weights.size(); i += 2) {
size_t rows = cpp_weights[i][0].size();
size_t cols = cpp_weights[i].size();
Eigen_model.weight_matrices[i / 2].resize(rows, cols);
size_t bias_size = cpp_weights[i + 1][0].size();
Eigen_model.biases[i / 2].resize(bias_size);
}
// Set initial model weights
update_weights(&Eigen_model, cpp_weights);
if (params.use_clustering) {
// Initialize Eigen model for reactive part of the field
cpp_weights = Python_Keras_get_weights("model_reactive");
num_layers = cpp_weights.size() / 2;
Eigen_model_reactive.weight_matrices.resize(num_layers);
Eigen_model_reactive.biases.resize(num_layers);
for (size_t i = 0; i < cpp_weights.size(); i += 2) {
size_t rows = cpp_weights[i][0].size();
size_t cols = cpp_weights[i].size();
Eigen_model_reactive.weight_matrices[i / 2].resize(rows, cols);
size_t bias_size = cpp_weights[i + 1][0].size();
Eigen_model_reactive.biases[i / 2].resize(bias_size);
}
update_weights(&Eigen_model_reactive, cpp_weights);
}
}
MSG("AI: Surrogate model initialized");
}
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
R["field_nrow"] = chem.getField().GetRequestedVecSize();
/* SIMULATION LOOP */
double dSimTime{0};
@ -299,77 +376,72 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
chem.getField().update(diffusion.getField());
MSG("Chemistry step");
if (params.use_ai_surrogate) {
double ai_start_t = MPI_Wtime();
// Save current values from the tug field as predictor for the ai step
// Get current values from the tug field for the ai predictions
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
R.parseEval(
std::string("predictors <- setNames(data.frame(matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) +
")), TMP_PROPS)"));
R.parseEval("predictors <- predictors[ai_surrogate_species]");
R.parseEval(std::string("predictors <- ") +
"set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)");
// Apply preprocessing
MSG("AI Preprocessing");
R.parseEval("predictors_scaled <- preprocess(predictors)");
std::vector<std::vector<double>> predictors_scaled = R["predictors_scaled"];
// Predict
MSG("AI Predict");
R.parseEval(
"aipreds_scaled <- prediction_step(model, predictors_scaled)");
// Get K-Means cluster assignements based on the preprocessed data
if (params.use_clustering) {
R.parseEval("cluster_labels <- assign_clusters(predictors_scaled)");
cluster_labels = Rcpp::as<std::vector<int>>(R["cluster_labels"]);
}
MSG("AI: Predict");
if (params.use_Keras_predictions) { // Predict with Keras default function
R["TMP"] = Python_Keras_predict(predictors_scaled, params.batch_size, cluster_labels);
} else { // Predict with custom Eigen function
if (params.use_clustering) {
R["TMP"] = Eigen_predict_clustered(Eigen_model, Eigen_model_reactive,
predictors_scaled, params.batch_size,
&Eigen_model_mutex, cluster_labels);
} else {
R["TMP"] = Eigen_predict(Eigen_model, predictors_scaled, params.batch_size,
&Eigen_model_mutex);
}
}
// Apply postprocessing
MSG("AI Postprocesing");
R.parseEval("aipreds <- postprocess(aipreds_scaled)");
MSG("AI: Postprocesing");
R.parseEval(std::string("predictions_scaled <- ") +
"set_field(TMP, ai_surrogate_species, field_nrow, ai_surrogate_species, byrow = TRUE)");
R.parseEval("predictions <- postprocess(predictions_scaled)");
// Validate prediction and write valid predictions to chem field
MSG("AI Validate");
R.parseEval(
"validity_vector <- validate_predictions(predictors, aipreds)");
MSG("AI: Validate");
R.parseEval("validity_vector <- validate_predictions(predictors, predictions)");
MSG("AI Marking accepted");
MSG("AI: Marking valid");
chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
MSG("AI TempField");
std::vector<std::vector<double>> RTempField =
R.parseEval("set_valid_predictions(predictors,\
aipreds,\
validity_vector)");
R.parseEval("set_valid_predictions(predictors, predictions, validity_vector)");
MSG("AI Set Field");
Field predictions_field =
Field(R.parseEval("nrow(predictors)"), RTempField,
R.parseEval("colnames(predictors)"));
Field(R.parseEval("nrow(predictions)"), RTempField,
R.parseEval("colnames(predictions)"));
MSG("AI Update");
MSG("AI: Update field with AI predictions");
chem.getField().update(predictions_field);
// store time for output file
double ai_end_t = MPI_Wtime();
R["ai_prediction_time"] = ai_end_t - ai_start_t;
}
// Run simulation step
MSG("Simulate chemistry");
chem.simulate(dt);
/* AI surrogate iterative training*/
if (params.use_ai_surrogate) {
double ai_start_t = MPI_Wtime();
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
R.parseEval(
std::string("targets <- setNames(data.frame(matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) +
")), TMP_PROPS)"));
R.parseEval("targets <- targets[ai_surrogate_species]");
// TODO: Check how to get the correct columns
R.parseEval("target_scaled <- preprocess(targets)");
MSG("AI: incremental training");
R.parseEval("model <- training_step(model, predictors_scaled, "
"target_scaled, validity_vector)");
double ai_end_t = MPI_Wtime();
R["ai_training_time"] = ai_end_t - ai_start_t;
}
// MPI_Barrier(MPI_COMM_WORLD);
double end_t = MPI_Wtime();
dSimTime += end_t - start_t;
@ -381,6 +453,46 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
// store_result is TRUE)
call_master_iter_end(R, diffusion.getField(), chem.getField());
/* AI surrogate iterative training*/
if (params.use_ai_surrogate && !params.disable_training) {
// Add values to training data buffer
MSG("AI: Add to training data buffer");
if (!params.train_only_invalid) {
// Use all values if not specified otherwise
R.parseEval("validity_vector[] <- 0");
}
std::vector<std::vector<double>> invalid_x =
R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
R.parseEval("target_scaled <- preprocess(state_C[ai_surrogate_species])");
std::vector<std::vector<double>> invalid_y =
R.parseEval("get_invalid_values(target_scaled, validity_vector)");
training_data_buffer_mutex.lock();
training_data_buffer_append(training_data_buffer.x, invalid_x);
training_data_buffer_append(training_data_buffer.y, invalid_y);
// If clustering is used, Add cluster labels to buffer and
// count buffer size according to the cluster assignments
int n_cluster_reactive = 0;
size_t buffer_size = training_data_buffer.x[0].size();
if (params.use_clustering) {
cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
R["validity_vector"]);
for (size_t i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer.cluster_labels[i];
}
}
// Signal to the training thread if enough training data is present
if ((n_cluster_reactive >= params.training_data_size) ||
(buffer_size - n_cluster_reactive >= params.training_data_size)) {
start_training = true;
training_data_buffer_full.notify_one();
}
training_data_buffer_mutex.unlock();
}
diffusion.getField().update(chem.getField());
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
@ -426,6 +538,12 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
profiling["chemistry"] = chem_profiling;
profiling["diffusion"] = diffusion_profiling;
if (params.use_ai_surrogate) {
MSG("Finalize Python and wind down training thread");
Python_finalize(&Eigen_model_mutex, &training_data_buffer_mutex,
&training_data_buffer_full, &start_training, &end_training);
}
chem.MasterLoopBreak();
return profiling;
@ -477,7 +595,11 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
int main(int argc, char *argv[]) {
int world_size;
MPI_Init(&argc, &argv);
// Threadsafe MPI is necessary for the AI surrogate
// training thread
int provided;
int required = MPI_THREAD_FUNNELED;
MPI_Init_thread(&argc, &argv, required, &provided);
{
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
@ -548,8 +670,15 @@ int main(int argc, char *argv[]) {
R["out_ext"] = run_params.out_ext;
R["out_dir"] = run_params.out_dir;
// MPI_Barrier(MPI_COMM_WORLD);
DiffusionModule diffusion(init_list.getDiffusionInit(),
init_list.getInitialGrid());
chemistry.masterSetField(init_list.getInitialGrid());
if (run_params.use_ai_surrogate) {
/* Incorporate ai surrogate from R */
// Load default function implementations
R.parseEvalQ(ai_surrogate_r_library);
/* Use dht species for model input and output */
R["ai_surrogate_species"] =
@ -558,23 +687,53 @@ int main(int argc, char *argv[]) {
const std::string ai_surrogate_input_script =
init_list.getChemistryInit().ai_surrogate_input_script;
MSG("AI: sourcing user-provided script");
MSG("AI: Sourcing user-provided script");
R.parseEvalQ(ai_surrogate_input_script);
if (!Rcpp::as<bool>(R.parseEval("exists(\"model_file_path\")"))) {
throw std::runtime_error("AI surrogate input script must contain a value for model_file_path");
}
MSG("AI: initialize AI model");
R.parseEval("model <- initiate_model()");
R.parseEval("gpu_info()");
/* AI surrogate training and inference parameters. (Can be set by declaring a
variable of the same name in one of the the R input scripts)*/
run_params.training_data_size = init_list.getDiffusionInit().n_rows *
init_list.getDiffusionInit().n_cols; // Default value is number of cells in field
if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
run_params.batch_size = R["batch_size"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"training_epochs\")"))) {
run_params.training_epochs = R["training_epochs"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"training_data_size\")"))) {
run_params.training_data_size = R["training_data_size"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"use_Keras_predictions\")"))) {
run_params.use_Keras_predictions = R["use_Keras_predictions"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"disable_training\")"))) {
run_params.disable_training = R["disable_training"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"train_only_invalid\")"))) {
run_params.train_only_invalid = R["train_only_invalid"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"save_model_path\")"))) {
run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
MSG("AI: Model will be saved as \"" + run_params.save_model_path + "\"");
}
if (Rcpp::as<bool>(R.parseEval("exists(\"assign_clusters\")"))) {
run_params.use_clustering = true;
MSG("Clustering will be used for the AI surrogate")
}
if (!Rcpp::as<bool>(R.parseEval("exists(\"model_reactive_file_path\")"))) {
R.parseEval("model_reactive_file_path <- model_file_path");
}
MSG("AI: Initialize Python for the AI surrogate functions");
std::string python_keras_file = std::string(SRC_DIR) +
"/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py";
Python_Keras_setup(python_keras_file, run_params.cuda_src_dir);
}
MSG("Init done on process with rank " + std::to_string(MY_RANK));
// MPI_Barrier(MPI_COMM_WORLD);
DiffusionModule diffusion(init_list.getDiffusionInit(),
init_list.getInitialGrid());
chemistry.masterSetField(init_list.getInitialGrid());
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
MSG("finished simulation loop");

View File

@ -29,11 +29,13 @@
#include <Rcpp.h>
#define SRC_DIR "@CMAKE_SOURCE_DIR@"
#define CUDA_SRC_DIR "@CUDA_TOOLKIT_ROOT_DIR@"
static const char *poet_version = "@POET_VERSION@";
// using the Raw string literal to avoid escaping the quotes
static const inline std::string kin_r_library = R"(@R_KIN_LIB@)";
static const inline std::string init_r_library = R"(@R_INIT_LIB@)";
static const inline std::string ai_surrogate_r_library =
R"(@R_AI_SURROGATE_LIB@)";
@ -67,5 +69,15 @@ struct RuntimeParameters {
static constexpr std::uint32_t INTERP_BUCKET_ENTRIES_DEFAULT = 20;
std::uint32_t interp_bucket_entries;
bool use_ai_surrogate = false;
/*AI surriogate configuration*/
bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate
bool disable_training = false; // Can be set in the R input script
bool use_clustering = false; // Can be set in the R input script
bool use_Keras_predictions = false; // Can be set in the R input script
bool train_only_invalid = false; // Can be set in the R input script
int batch_size = 2560; // default value determined in test on the UP Turing cluster
int training_epochs = 20;; // Can be set in the R input script
int training_data_size; // Can be set in the R input script
std::string save_model_path = ""; // Can be set in the R input script
std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake
};