Feat: Conditional waiting and training data buffer

straile 2024-10-10 20:11:52 +02:00
parent 0bf51d0f02
commit c2535b03a7
7 changed files with 318 additions and 111 deletions

View File

@@ -16,7 +16,6 @@ list(APPEND CMAKE_MODULE_PATH "${POET_SOURCE_DIR}/CMake")
get_poet_version()
find_package(MPI REQUIRED)
find_package(RRuntime REQUIRED)
add_subdirectory(src)

View File

@@ -16,3 +16,7 @@ set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
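## Rows of df for which the surrogate prediction was rejected
## (validity flag 0); used to collect new training data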
get_invalid_values <- function(df, validity) {
return(df[validity == 0, ])
}

View File

@@ -1,23 +1,10 @@
## Time-stamp: "Last modified 2024-05-30 13:27:06 delucia"
## load a pretrained model from tensorflow file
## Use the global variable "ai_surrogate_base_path" when using file paths
## relative to the input script
model_file_path <- normalizePath(paste0(ai_surrogate_base_path,
"barite_50ai_all.keras"))
initiate_model <- function() {
require(keras3)
require(tensorflow)
init_model <- normalizePath(paste0(ai_surrogate_base_path,
"barite_50ai_all.keras"))
Model <- keras3::load_model(init_model)
msgm("Loaded model:")
print(str(Model))
return(Model)
}
scale_min_max <- function(x, min, max, backtransform) {
if (backtransform) {
return((x * (max - min)) + min)
@@ -26,43 +13,127 @@ scale_min_max <- function(x, min, max, backtransform) {
}
}
minmax <- list(min = c(H = 111.012433592824, O = 55.5062185549492, Charge = -3.1028354471876e-08,
Ba = 1.87312878574393e-141, Cl = 0, `S(6)` = 4.24227510643685e-07,
Sr = 0.00049382996130541, Barite = 0.000999542409828586, Celestite = 0.244801877115968),
max = c(H = 111.012433679682, O = 55.5087003521685, Charge = 5.27666636082035e-07,
Ba = 0.0908849779513762, Cl = 0.195697626449355, `S(6)` = 0.000620774752665846,
Sr = 0.0558680070692722, Barite = 0.756779139057097, Celestite = 1.00075422160624
))
## Apply decimal logarithms handling negative values
Safelog <- function (vec) {
rv <- range(vec)
if (max(abs(rv)) < 1) {
ret <- vec
ret[vec == 0] <- 0
ret[vec > 0] <- log10(vec[vec > 0])
ret[vec < 0] <- -log10(-vec[vec < 0])
} else {
ret <- log10(abs(vec))
}
ret
}
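## Inverse of Safelog for the small-range branch: positive entries map back
## to negative originals and vice versa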
Safeexp <- function (vec) {
ret <- vec
ret[vec == 0] <- 0
ret[vec > 0] <- -10^(-vec[vec > 0])
ret[vec < 0] <- 10^(vec[vec < 0])
ret
}
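## Round-trip check (illustrative): for vectors whose range lies inside
## (-1, 1), Safeexp(Safelog(c(-0.5, 0, 0.25))) returns c(-0.5, 0, 0.25)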
##' @title Apply transformations to cols of a data.frame
##' @param df named data.frame
##' @param tlist list of function names
##' @return data.frame with the columns specified in tlist and the
##' transformed numerical values
##' @author MDL
TransfList <- function (df, tlist) {
vars <- intersect(colnames(df), names(tlist))
lens <- sapply(tlist[vars], length)
n <- max(lens)
filledlist <- lapply(tlist[vars],
function(x)
if (length(x) < n)
return(c(x, rep("I", n - length(x))))
else
return(x))
tmp <- df[, vars]
for (i in seq_len(n)) {
tmp <- as.data.frame(sapply(vars, function(x)
do.call(filledlist[[x]][i], list(tmp[, x]))))
}
return(tmp)
}
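## Example (illustrative): TransfList(df, list(Ba = "log1p", Charge = "Safelog"))
## returns df[, c("Ba", "Charge")] with log1p() and Safelog() applied;
## shorter transform chains are padded with the identity function "I"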
##' This function applies some specified string substitution such as
##' 's/log/exp/' so that from a list of "forward" transformation
##' functions it computes a "backward" one
##' @title Apply back transformations to cols of a data.frame
##' @param df named data.frame
##' @param tlist list of function names
##' @return data.frame with the columns specified in tlist and the
##' backtransformed numerical values
##' @author MDL
BackTransfList <- function (df, tlist) {
vars <- intersect(colnames(df), names(tlist))
lens <- sapply(tlist[vars], length)
n <- max(lens)
filledlist <- lapply(tlist[vars],
function(x)
if (length(x) < n)
return(c(x, rep("I", n - length(x))))
else
return(x))
rlist <- lapply(filledlist, rev)
rlist <- lapply(rlist, sub, pattern = "log", replacement = "exp")
rlist <- lapply(rlist, sub, pattern = "1p", replacement = "m1")
rlist <- lapply(rlist, sub, pattern = "Mul", replacement = "Div")
tmp <- df[, vars]
for (i in seq_len(n)) {
tmp <- as.data.frame(sapply(vars, function(x)
do.call(rlist[[x]][i], list(tmp[, x]))))
}
return(tmp)
}
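## Illustration: reversing the chain for "log1p" yields "expm1" via the
## log->exp and 1p->m1 substitutions, and "Safelog" becomes "Safeexp",
## so each forward transform is undone in reverse order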
tlist <- list("H" = "log1p", "O" = "log1p", "Charge" = "Safelog",
"Ba" = "log1p", "Cl" = "log1p", "S(6)" = "log1p",
"Sr" = "log1p", "Barite" = "log1p", "Celestite" = "log1p")
minmaxlog <- list(min = c(H = 4.71860987935512, O = 4.03435069501537,
Charge = -14.5337693764617, Ba = 1.87312878574393e-141,
Cl = 0, `S(6)` = 4.2422742065922e-07,
Sr = 0.00049370806741832, Barite = 0.000999043199940672,
Celestite = 0.218976382406766),
max = c(H = 4.71860988013054, O = 4.03439461483133,
Charge = 12.9230752782909, Ba = 0.086989273200771,
Cl = 0.178729802869381, `S(6)` = 0.000620582151722819,
Sr = 0.0543631841661421, Barite = 0.56348209786429,
Celestite = 0.69352422027466))
preprocess <- function(df) {
if (!is.data.frame(df))
df <- as.data.frame(df, check.names = FALSE)
as.data.frame(lapply(colnames(df),
function(x) scale_min_max(x=df[x],
min=minmax$min[x],
max=minmax$max[x],
backtransform=FALSE)),
tlog <- TransfList(df, tlist)
as.data.frame(lapply(colnames(tlog),
function(x) scale_min_max(x = tlog[x],
min = minmaxlog$min[x],
max = minmaxlog$max[x],
backtransform = FALSE)),
check.names = FALSE)
}
postprocess <- function(df) {
if (!is.data.frame(df))
df <- as.data.frame(df, check.names = FALSE)
as.data.frame(lapply(colnames(df),
function(x) scale_min_max(x=df[x],
min=minmax$min[x],
max=minmax$max[x],
backtransform=TRUE)),
ret <- as.data.frame(lapply(colnames(df),
function(x) scale_min_max(x = df[x],
min = minmaxlog$min[x],
max = minmaxlog$max[x],
backtransform = TRUE)),
check.names = FALSE)
BackTransfList(ret, tlist)
}
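## Sanity check (illustrative): postprocess(preprocess(df)) should reproduce
## df up to floating point error, since the min-max scaling and the
## transform list are inverted in reverse order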
mass_balance <- function(predictors, prediction) {
dBa <- abs(prediction$Ba + prediction$Barite -
predictors$Ba - predictors$Barite)
dSr <- abs(prediction$Sr + prediction$Celestite -
predictors$Sr - predictors$Celestite)
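## Conservation check: total Ba (aqueous + Barite) and total Sr
## (aqueous + Celestite) must balance across a chemistry step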
mass_balance <- function(x, y) {
dBa <- abs(y$Ba + y$Barite -
(x$Ba + x$Barite))
dSr <- abs(y$Sr + y$Celestite -
(x$Sr + x$Celestite))
return(dBa + dSr)
}
@@ -76,19 +147,3 @@ validate_predictions <- function(predictors, prediction) {
sum(ret))
return(ret)
}
training_step <- function(model, predictor, target, validity) {
msgm("Starting incremental training:")
## x <- as.matrix(predictor)
## y <- as.matrix(target[colnames(x)])
history <- model %>% keras3::fit(x = data.matrix(predictor),
y = data.matrix(target),
epochs = 10, verbose=1)
keras3::save_model(model,
filepath = paste0(out_dir, "/current_model.keras"),
overwrite=TRUE)
return(model)
}

View File

@@ -27,12 +27,15 @@ option(USE_AI_SURROGATE "Compiles the AI surrogate functions with Python.h integ
if(USE_AI_SURROGATE)
add_definitions(-DUSE_AI_SURROGATE)
message("Building with AI surrogate functions")
message(" - Needs Python with Numpy and Keras and Threads libraries")
# make sure to use the python installation from the conda environment
if(DEFINED ENV{CONDA_PREFIX})
set(Python3_EXECUTABLE "$ENV{CONDA_PREFIX}/bin/python3")
endif()
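# (illustrative) configure from an activated conda environment so this
# interpreter is picked up:
#   conda activate <env> && cmake -DUSE_AI_SURROGATE=ON <source-dir>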
find_package(Python3 COMPONENTS Interpreter Development NumPy REQUIRED)
find_package(Threads REQUIRED)
set_source_files_properties(
Chemistry/SurrogateModels/AI_functions.cpp
PROPERTIES COMPILE_FLAGS "-O3 -march=native -mtune=native"
@@ -50,6 +53,7 @@ if(USE_AI_SURROGATE)
target_link_libraries(POETLib
PUBLIC "${Python3_LIBRARIES}"
PUBLIC Threads::Threads pthread
)
endif() # USE_AI_SURROGATE

View File

@@ -5,6 +5,9 @@
#include <Python.h>
#include <numpy/arrayobject.h>
#include <Eigen/Dense>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "AI_functions.hpp"
@@ -51,6 +54,95 @@ int Python_Keras_load_model(std::string model_file_path) {
return py_model_loaded;
}
/**
* @brief Function for thread-safe parallel training and weight updating.
* The function waits on a condition variable until the training data buffer is full.
* It then drains the buffer and starts training; after training it writes the new weights
* to the Eigen model.
* @param Eigen_model Pointer to the EigenModel struct that will be updated with new weights
* @param Eigen_model_mutex Mutex to ensure thread-safe access to the EigenModel struct
* @param training_data_buffer Pointer to the TrainingData struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure thread-safe access to the training data struct
* @param training_data_buffer_full Condition variable with which the main thread signals
* when a training run can start
* @param start_training Waiting predicate that guards against spurious wakeups
*/
void parallel_training(EigenModel* Eigen_model,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training) {
while (true) {
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
// Conditional waiting:
// Sleeps until a signal arrives on training_data_buffer_full.
// Releases the lock on training_data_buffer_mutex while sleeping.
// The lambda checking start_training filters out spurious wakeups.
// Reacquires the lock on training_data_buffer_mutex after waking up.
// If start_training was set to true while the thread was still busy, it does not
// wait for a signal on training_data_buffer_full but starts the next round immediately.
training_data_buffer_full->wait(lock, [start_training] { return *start_training;});
// Reset the waiting predicate
*start_training = false;
// Get the necessary training data
// Initialize training data input and targets
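// NOTE: the sizes are currently hardcoded: 9 appears to match the nine
// species columns of the barite example (see the R transform list), and
// 2000 is the number of rows consumed per training run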
std::vector<std::vector<double>> inputs(9);
std::vector<std::vector<double>> targets(9);
for (size_t col = 0; col < training_data_buffer->x.size(); col++) {
// Copy data from the front of the buffer to the training inputs
std::copy(training_data_buffer->x[col].begin(),
training_data_buffer->x[col].begin() + 2000,
std::back_inserter(inputs[col]));
// Remove copied data from the front of the buffer
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin(),
training_data_buffer->x[col].begin() + 2000);
// Copy data from the front of the buffer to the training targets
std::copy(training_data_buffer->y[col].begin(),
training_data_buffer->y[col].begin() + 2000,
std::back_inserter(targets[col]));
// Remove copied data from the front of the buffer
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(),
training_data_buffer->y[col].begin() + 2000);
}
std::cout << "Training data rows: " << training_data_buffer->y[0].size() << std::endl;
// Unlock the training_data_buffer_mutex
lock.unlock();
// TODO: Actual training
}
}
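// Producer counterpart: RunMasterLoop in poet.cpp appends invalid
// predictions to the buffer under training_data_buffer_mutex, sets
// start_training and calls notify_one() once more than 2000 rows are queued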
/**
* @brief Starts a thread for parallel training and weight updating. This wrapper function
* ensures that the main POET program can be built without pthread support if the AI
* surrogate functions are disabled during compilation.
* @param Eigen_model Pointer to the EigenModel struct that will be updated with new weights
* @param Eigen_model_mutex Mutex to ensure thread-safe access to the EigenModel struct
* @param training_data_buffer Pointer to the TrainingData struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure thread-safe access to the training data struct
* @param training_data_buffer_full Condition variable used to signal the training thread
* @param start_training Waiting predicate that guards against spurious wakeups
* @return 0 if the function was successful
*/
int Python_Keras_training_thread(EigenModel* Eigen_model,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training) {
std::thread training_thread(parallel_training, Eigen_model, Eigen_model_mutex,
training_data_buffer, training_data_buffer_mutex,
training_data_buffer_full, start_training);
training_thread.detach();
return 0;
}
/**
* @brief Converts the std::vector 2D matrix representation of a POET Field object to a numpy array
* for use in the Python AI surrogate functions
@@ -162,9 +254,7 @@ Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<Eigen::MatrixXd>& input
* @brief Converts the weights and biases from the Python Keras model to Eigen matrices
* @param eigen_model EigenModel struct that receives the model weights and biases as aligned Eigen matrices
*/
EigenModel Python_Keras_get_weights_as_Eigen() {
EigenModel eigen_model;
void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) {
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
@@ -179,6 +269,9 @@ EigenModel Python_Keras_get_weights_as_Eigen() {
throw std::runtime_error("Failed to get weights from Keras model");
}
// Clear old values
eigen_model.weight_matrices.clear();
eigen_model.biases.clear();
Py_ssize_t num_layers = PyList_Size(py_weights_list);
for (Py_ssize_t i = 0; i < num_layers; i += 2) {
// Get weight matrix
@@ -240,8 +333,6 @@ EigenModel Python_Keras_get_weights_as_Eigen() {
// Clean up
Py_DECREF(py_weights_list);
Py_DECREF(args);
return eigen_model;
}
/**

View File

@@ -32,6 +32,11 @@ struct EigenModel {
aligned_vector<Eigen::VectorXd> biases;
};
struct TrainingData {
std::vector<std::vector<double>> x;
std::vector<std::vector<double>> y;
};
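// Column-major layout: x[col][row] holds one scaled input value, one inner
// vector per species column; filled by the main loop, drained in batches by
// the training thread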
// Only declare the actual functions if the flag is set
#ifdef USE_AI_SURROGATE
@@ -41,13 +46,16 @@ void Python_finalize();
int Python_Keras_load_model(std::string model_file_path);
int Python_Keras_training_thread(EigenModel* Eigen_model,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training);
std::vector<double> Python_keras_predict(std::vector<std::vector<double>> x, int batch_size);
EigenModel Python_Keras_get_weights_as_Eigen();
std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights();
EigenModel transform_weights(const std::vector<std::vector<std::vector<double>>>& weights);
void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model);
Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<Eigen::MatrixXd>& input_batch, const EigenModel& model);
@@ -60,8 +68,11 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
inline void Python_Keras_setup(std::string functions_file_path){}
inline void Python_finalize(){}
inline int Python_Keras_load_model(std::string model_file_path){return {};}
inline int Python_Keras_training_thread(EigenModel*, std::mutex*,
TrainingData*, std::mutex*,
std::condition_variable*, bool*){return {};}
inline std::vector<double> Python_keras_predict(std::vector<std::vector<double>>, int){return {};}
inline EigenModel Python_Keras_get_weights_as_Eigen(){return {};}
inline void Python_Keras_set_weights_as_Eigen(EigenModel&){}
inline EigenModel transform_weights(const std::vector<std::vector<std::vector<double>>>){return {};}
inline std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(){return {};}
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int){return {};}

View File

@@ -28,8 +28,6 @@
#include "DataStructures/Field.hpp"
#include "Init/InitialList.hpp"
#include "Transport/DiffusionModule.hpp"
#include "Chemistry/SurrogateModels/AI_functions.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <Rcpp/DataFrame.h>
@@ -40,7 +38,9 @@
#include <memory>
#include <mpi.h>
#include <string>
#include <mutex>
#include <condition_variable>
#include "Chemistry/SurrogateModels/AI_functions.hpp"
#include <CLI/CLI.hpp>
#include <poet.hpp>
@@ -73,6 +73,20 @@ static void init_global_functions(RInside &R) {
SaveRObj_R = DEFunc("SaveRObj");
}
/* Global variables for the AI surrogate model */
/* For the weights and biases of the model
* to use in an inference function with Eigen */
std::mutex Eigen_model_mutex;
static EigenModel Eigen_model;
/* For the training data */
std::mutex training_data_buffer_mutex;
std::condition_variable training_data_buffer_full;
bool start_training = false;
TrainingData training_data_buffer;
// HACK: this is a step back as the order and also the count of fields is
// predefined, but it will change in the future
// static inline void writeFieldsToR(RInside &R, const Field &trans,
@@ -282,10 +296,6 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
* master for the following loop) */
uint32_t maxiter = params.timesteps.size();
/* Stores the weights and biases of the AI surrogate model
* for use in an inference function with Eigen */
EigenModel Eigen_model;
if (params.print_progress) {
chem.setProgressBarPrintout(true);
}
@@ -337,70 +347,97 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
MSG("Keras predictions:")
R.parseEval("print(head(predictions_scaled))");*/
static EigenModel Eigen_model;
Eigen_model = Python_Keras_get_weights_as_Eigen();
MSG("Eigen predictions")
MSG("AI: Predicting");
R["TMP"] = Eigen_predict(Eigen_model, R["predictors_scaled"], params.batch_size);
MSG("Eigen scaling")
R.parseEval(std::string("predictions_scaled <- matrix(TMP, ") +
"nrow=nrow(predictors), byrow = TRUE)");
R.parseEval(std::string("predictions_scaled <- ") +
"setNames(data.frame(predictions_scaled), colnames(predictors))");
R.parseEval("print(head(predictions_scaled))");
// Apply postprocessing
MSG("AI Postprocesing");
MSG("AI: Postprocesing");
R.parseEval("predictions <- postprocess(predictions_scaled)");
// Validate prediction and write valid predictions to chem field
MSG("AI Validate");
MSG("AI: Validate");
R.parseEval("validity_vector <- validate_predictions(predictors, predictions)");
MSG("AI Marking valid");
MSG("AI: Marking valid");
chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
std::vector<std::vector<double>> RTempField =
R.parseEval("set_valid_predictions(predictors, predictions, validity_vector)");
// ToDo: Set temp field for training data here
Field predictions_field =
Field(R.parseEval("nrow(predictors)"), RTempField,
R.parseEval("colnames(predictors)"));
MSG("AI Update field with AI predictions");
MSG("AI: Update field with AI predictions");
chem.getField().update(predictions_field);
// Add to training data buffer:
// Input values for which the predictions were invalid
MSG("AI: Add invalid input data to training data buffer");
std::vector<std::vector<double>> R_temp_x =
R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
training_data_buffer_mutex.lock();
// Initialize training data buffer if empty
if (training_data_buffer.x.size() == 0) {
training_data_buffer.x = R_temp_x;
} else { // otherwise append
for (size_t col = 0; col < training_data_buffer.x.size(); col++) {
training_data_buffer.x[col].insert(training_data_buffer.x[col].end(),
R_temp_x[col].begin(), R_temp_x[col].end());
}
}
training_data_buffer_mutex.unlock();
double ai_end_t = MPI_Wtime();
R["ai_prediction_time"] = ai_end_t - ai_start_t;
}
// Run simulation step
MSG("Simulate chemistry");
chem.simulate(dt);
/* AI surrogate iterative training*/
if (params.use_ai_surrogate) {
double ai_start_t = MPI_Wtime();
// Add to training data buffer targets:
// True values for invalid predictions
MSG("AI: Add invalid target data to training data buffer");
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
R.parseEval(
std::string("targets <- setNames(data.frame(matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) +
")), TMP_PROPS)"));
R.parseEval("targets <- targets[ai_surrogate_species]");
// TODO: Check how to get the correct columns
R.parseEval("targets <- matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) + ")");
R.parseEval("targets <- setNames(data.frame(targets), TMP_PROPS)");
R.parseEval("targets <- predictors[ai_surrogate_species]");
R.parseEval("target_scaled <- preprocess(targets)");
MSG("AI: incremental training");
R.parseEval("model <- training_step(model, predictors_scaled, "
"target_scaled, validity_vector)");
double ai_end_t = MPI_Wtime();
R["ai_training_time"] = ai_end_t - ai_start_t;
std::vector<std::vector<double>> R_temp_y =
R.parseEval("get_invalid_values(target_scaled, validity_vector)");
training_data_buffer_mutex.lock();
// Initialize training data buffer if empty
if (training_data_buffer.y.size() == 0) {
training_data_buffer.y = R_temp_y;
} else { // otherwise append
for (size_t col = 0; col < training_data_buffer.y.size(); col++) {
training_data_buffer.y[col].insert(training_data_buffer.y[col].end(),
R_temp_y[col].begin(), R_temp_y[col].end());
}
}
training_data_buffer_mutex.unlock();
// Signal the training thread if the training data buffer is full;
// size check and predicate update happen under the mutex to avoid
// racing with the buffer drain in parallel_training
training_data_buffer_mutex.lock();
const bool buffer_full = training_data_buffer.y[0].size() > 2000;
if (buffer_full) start_training = true;
training_data_buffer_mutex.unlock();
if (buffer_full) training_data_buffer_full.notify_one();
}
// MPI_Barrier(MPI_COMM_WORLD);
@@ -615,6 +652,12 @@ int main(int argc, char *argv[]) {
MSG("AI: Initialize model");
Python_Keras_load_model(R["model_file_path"]);
Python_Keras_set_weights_as_Eigen(Eigen_model);
MSG("AI: Initialize training thread");
Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
&training_data_buffer, &training_data_buffer_mutex,
&training_data_buffer_full, &start_training);
MSG("AI: Surrogate model initialized");
}