feat: option to save current ai model after training

This commit is contained in:
hans 2024-10-12 16:04:24 +02:00
parent 0017a20e82
commit a289fc7790
11 changed files with 146 additions and 171 deletions

View File

@ -244,7 +244,7 @@ process of POET remains mostly the same as shown above, but the CMake
option `-DUSE_AI_SURROGATE=ON` must be set.
To use the AI surrogate, you must declare several values in the R input
scipt. This can be either done directly in the input script or in an
script. This can be either done directly in the input script or in an
additional file. This file can be provided by adding the file path as the
element `ai_surrogate_input_script` to the `chemistry_setup` list in the
R input script.
@ -255,7 +255,7 @@ the AI surrogate model is initialized.
- `validate_predictions(predictors, prediction)` [*function*]: Returns a boolean
vector of length `nrow(predictions)`. The output of this function defines
which predictions are considered valid and which are rejected. Regular
siumulation will only be done for the rejected values, and the results
simulation will only be done for the rejected values, and the results
will be added to the training data buffer of the AI surrogate model.
Can e.g. be implemented as a mass balance threshold between the predictors
and the prediction.
@ -263,7 +263,7 @@ and the prediction.
The following variables and functions can be declared:
- `batch_size` [*int*]: Batch size for the inference and training functions,
defualts to 2560.
defaults to 2560.
- `training_epochs` [*int*]: Number of training epochs with each training data
set, defaults to 20.
@ -274,14 +274,14 @@ data from the front of the buffer. Defaults to the size of the Field.
- `use_Keras_predictions` [*bool*]: Decides if the Keras prediction function
should be used instead of the custom C++ implementation (Keras might be faster
for larger models, especially on GPU). Defualts to false.
for larger models, especially on GPU). Defaults to false.
- `preprocess(df, backtransform = FALSE, outputs = FALSE)` [*function*]:
Returns the scaled/transformed/backtransformed dataframe. The `backtransform`
Returns the scaled/transformed/backtransformed data frame. The `backtransform`
flag signals if the current processing step is applied to data that is
assumed to be scaled and expects backtransformed values. The `outputs`
flag signals if the current processing step is applied to the output
or tatget of the model. This can be used to eg. skip these processing
or target of the model. This can be used to e.g. skip these processing
steps and only scale the model input. The default implementation uses no
transformations.

View File

@ -19,4 +19,12 @@ set_valid_predictions <- function(temp_field, prediction, validity) {
## Select the rows of `df` whose prediction was rejected.
##
## @param df Data frame with one row per prediction.
## @param validity Vector with one entry per row of `df`; rows where the
##   entry equals 0 (or FALSE) are returned.
## @return A data frame holding only the rejected rows. `drop = FALSE` keeps
##   the result a data frame even when `df` has a single column, so callers
##   always receive a list of columns.
get_invalid_values <- function(df, validity) {
  df[validity == 0, , drop = FALSE]
}
}
## Reshape a flat field vector into a named data frame and keep only the
## requested columns.
##
## @param temp_field Flat vector holding the field values.
## @param columns Column names assigned to the reshaped field.
## @param rows Number of rows of the reshaped field.
## @param column_name_limit Names of the columns to keep.
## @param byrow Passed to `matrix()`: fill the matrix row-wise if TRUE.
## @return Data frame containing the columns named in `column_name_limit`.
set_field <- function(temp_field, columns, rows, column_name_limit,
                      byrow = FALSE) {
  field_df <- data.frame(matrix(temp_field, nrow = rows, byrow = byrow))
  names(field_df) <- columns
  field_df[column_name_limit]
}

View File

@ -76,8 +76,7 @@ master_iteration_end <- function(setup, state_T, state_C) {
state_C <- data.frame(state_C, check.names = FALSE)
ai_surrogate_info <- list(
prediction_time = if (exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL,
training_time = if (exists("ai_training_time")) as.integer(ai_training_time) else NULL,
prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
valid_predictions = if (exists("validity_vector")) validity_vector else NULL
)

View File

@ -47,8 +47,7 @@ dht_species <- c(
)
chemistry_setup <- list(
dht_species = dht_species,
ai_surrogate_input_script = "./barite_200ai_surrogate_input_script.R"
dht_species = dht_species
)
# Define a setup list for simulation configuration

View File

@ -1,48 +0,0 @@
## Load the pretrained model stored next to the input script.
## File paths relative to the input script are built from the global
## variable "ai_surrogate_base_path".
initiate_model <- function() {
  model_path <- paste0(ai_surrogate_base_path, "model_min_max_float64.keras")
  load_model_tf(normalizePath(model_path))
}
## Min-max scaling and its inverse.
##
## @param x Numeric values to transform.
## @param min Lower bound used for scaling.
## @param max Upper bound used for scaling.
## @param backtransform If TRUE, map scaled values back to the original
##   range; otherwise scale `x` relative to `min` and `max`.
## @return The transformed values.
scale_min_max <- function(x, min, max, backtransform) {
  span <- max - min
  if (!backtransform) {
    return((x - min) / span)
  }
  (x * span) + min
}
## Scale (or backtransform) every column of `df` with the min/max bounds
## stored in "min_max_bounds.rds" next to the input script.
##
## @param df Data frame whose columns are looked up by name in the bounds.
## @param backtransform Forwarded to `scale_min_max()`.
## @param outputs Accepted for interface compatibility; not used here.
## @return `df` with every column transformed.
preprocess <- function(df, backtransform = FALSE, outputs = FALSE) {
  bounds_path <- paste0(ai_surrogate_base_path, "min_max_bounds.rds")
  bounds <- readRDS(normalizePath(bounds_path))
  for (species in colnames(df)) {
    df[species] <- lapply(
      df[species],
      scale_min_max,
      bounds$min[species],
      bounds$max[species],
      backtransform
    )
  }
  df
}
## Absolute mass balance error between model input and prediction.
##
## @param predictors Data frame with columns Ba, Barite, Sr and Celestite.
## @param prediction Data frame with the same columns.
## @return Per-row sum of the absolute change of (Ba + Barite) and of
##   (Sr + Celestite).
mass_balance <- function(predictors, prediction) {
  barium_drift <- abs(prediction$Ba + prediction$Barite -
    predictors$Ba - predictors$Barite)
  strontium_drift <- abs(prediction$Sr + prediction$Celestite -
    predictors$Sr - predictors$Celestite)
  barium_drift + strontium_drift
}
## Accept predictions whose mass balance error is below a fixed threshold.
##
## @param predictors Data frame of model inputs.
## @param prediction Data frame of model predictions.
## @return Logical vector, TRUE where the mass balance error is below
##   `epsilon`. Mean, variance and number of accepted rows are logged.
validate_predictions <- function(predictors, prediction) {
  epsilon <- 3e-5
  mb <- mass_balance(predictors, prediction)
  is_valid <- mb < epsilon
  msgm("Mass balance mean:", mean(mb))
  msgm("Mass balance variance:", var(mb))
  msgm("Rows where mass balance meets threshold", epsilon, ":",
       sum(is_valid))
  is_valid
}

View File

@ -5,6 +5,10 @@
## Path of the Keras model file to load, built relative to the global
## variable "ai_surrogate_base_path". This variable is mandatory.
model_file_path <- normalizePath(paste0(ai_surrogate_base_path,
"barite_50ai_all.keras"))
## Batch size for the inference and training functions
## (overrides the built-in default).
batch_size <- 1280
## Number of training epochs with each training data set.
training_epochs <- 20
## If set, the current model is saved to this path after training.
## NOTE(review): relative path, presumably resolved against the working
## directory of the run - confirm.
save_model_path <- "current_model.keras"
scale_min_max <- function(x, min, max, backtransform) {
if (backtransform) {
return((x * (max - min)) + min)

View File

@ -7,7 +7,6 @@ def initiate_model(model_file_path):
return model
def training_step(model, x, y, x_val, y_val, batch_size, epochs):
epochs = 2000 # This is a constant parameter during all experiments
history = model.fit(x, y,
epochs=epochs,
batch_size=batch_size,
@ -21,13 +20,15 @@ def prediction_step(model, x, batch_size):
def get_weights(model):
weights = model.get_weights()
#return weights
return [w.astype(np.float64) for w in weights]
return weights
def training_step(model, x, y, batch_size, epochs):
def training_step(model, x, y, batch_size, epochs, output_file_path=None):
    """Fit the model on one training data set and optionally save it.

    Args:
        model: Model object providing ``fit`` and ``save``.
        x: Training inputs, passed to ``model.fit``.
        y: Training targets, passed to ``model.fit``.
        batch_size: Batch size passed to ``model.fit``.
        epochs: Number of epochs passed to ``model.fit``.
        output_file_path: Where to save the model after training. Nothing is
            saved when this is empty or ``None`` (the default, so callers
            that do not pass a path keep working). A missing ``.keras``
            suffix is appended.

    Returns:
        The history object returned by ``model.fit``.
    """
    history = model.fit(x, y,
                        epochs=epochs,
                        batch_size=batch_size)
    print(history, flush=True)
    if output_file_path:
        if not output_file_path.endswith(".keras"):
            output_file_path += ".keras"
        model.save(output_file_path)
    return history

View File

@ -43,6 +43,9 @@ int Python_Keras_setup(std::string functions_file_path) {
* @return 0 if function was successful
*/
int Python_Keras_load_model(std::string model_file_path) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Initialize Keras model
int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" +
model_file_path + "\")").c_str());
@ -50,6 +53,9 @@ int Python_Keras_load_model(std::string model_file_path) {
PyErr_Print(); // Ensure that python errors make it to stdout
throw std::runtime_error("Keras model could not be loaded from: " + model_file_path);
}
// Release the Python GIL
PyGILState_Release(gstate);
return py_model_loaded;
}
@ -147,7 +153,63 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>> x, int
}
void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vector<double>> y, int batch_size, int epochs) {
/**
 * @brief Uses the Eigen representation of the Keras model weights for fast inference
 * @param model Eigen representation of the model weights
 * @param x 2D-Matrix with the content of a Field object, indexed as x[feature][sample]
 * @param batch_size size of the mini-batches that are passed to eigen_inference_batched()
 * @return Predictions that the neural network made from the input values x. The predictions are
 * represented as a vector similar to the representation from the Field.AsVector() method
 * @throws std::runtime_error if the number of features does not match the model input layer,
 * if batch_size is not positive, or if the columns of x differ in length
 */
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size) {
  const int num_features = x.size();
  // Guard against empty input: x[0] must not be touched if there are no columns
  const int num_samples = x.empty() ? 0 : x[0].size();

  // Validate everything before copying any data
  if (num_features != model.weight_matrices[0].cols()) {
    throw std::runtime_error("Input data size " + std::to_string(num_features) + \
      " does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
  }
  if (batch_size <= 0) {
    throw std::runtime_error("Batch size must be positive, got " + std::to_string(batch_size));
  }
  for (int j = 0; j < num_features; ++j) {
    if (x[j].size() != static_cast<std::size_t>(num_samples)) {
      throw std::runtime_error("Input column " + std::to_string(j) + " has " +
        std::to_string(x[j].size()) + " values, expected " + std::to_string(num_samples));
    }
  }

  // Convert input data to Eigen matrix
  Eigen::MatrixXd full_input_matrix(num_features, num_samples);
  for (int i = 0; i < num_samples; ++i) {
    for (int j = 0; j < num_features; ++j) {
      full_input_matrix(j, i) = x[j][i];
    }
  }

  std::vector<double> result;
  result.reserve(num_samples * num_features);

  int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
  for (int batch = 0; batch < num_batches; ++batch) {
    int start_idx = batch * batch_size;
    int end_idx = std::min((batch + 1) * batch_size, num_samples);
    int current_batch_size = end_idx - start_idx;
    // Extract the current input data batch
    Eigen::MatrixXd batch_data(num_features, current_batch_size);
    batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size);
    // Predict
    batch_data = eigen_inference_batched(batch_data, model);
    // Append the batch output in Eigen's storage order
    result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size());
  }
  return result;
}
/**
 * @brief Appends new column-wise values to a training data buffer
 * @param training_data_buffer Buffer with one vector per column; it takes a copy of
 * new_values if it is still empty
 * @param new_values Values to append, one vector per column
 * @throws std::runtime_error if both are non-empty and their column counts differ
 */
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer, std::vector<std::vector<double>>& new_values) {
  // Initialize training data buffer if empty
  if (training_data_buffer.empty()) {
    training_data_buffer = new_values;
    return;
  }
  // Nothing to append
  if (new_values.empty()) {
    return;
  }
  // Indexing new_values[col] below is only safe if the column counts agree
  if (new_values.size() != training_data_buffer.size()) {
    throw std::runtime_error("Training data buffer has " + std::to_string(training_data_buffer.size()) +
      " columns but new values have " + std::to_string(new_values.size()));
  }
  for (std::size_t col = 0; col < training_data_buffer.size(); ++col) {
    training_data_buffer[col].insert(training_data_buffer[col].end(),
                                     new_values[col].begin(), new_values[col].end());
  }
}
void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vector<double>> y, int batch_size, int epochs,
std::string save_model_path) {
// Prepare data for python
PyObject* py_df_x = vector_to_numpy_array(x);
PyObject* py_df_y = vector_to_numpy_array(y);
@ -159,9 +221,10 @@ void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vec
PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOii)",
py_keras_model, py_df_x, py_df_y, batch_size, epochs);
PyObject* args = Py_BuildValue("(OOOiis)",
py_keras_model, py_df_x, py_df_y, batch_size, epochs, save_model_path.c_str());
// Call the Python training function
PyObject *py_rv = PyObject_CallObject(py_training_function, args);
@ -196,7 +259,7 @@ void parallel_training(EigenModel* Eigen_model,
std::condition_variable* training_data_buffer_full,
bool* start_training,
int batch_size, int epochs, int training_data_size,
bool use_Keras_predictions) {
bool use_Keras_predictions, std::string save_model_path) {
while (true) {
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
// Conditional waiting:
@ -238,10 +301,12 @@ void parallel_training(EigenModel* Eigen_model,
lock.unlock();
std::cout << "AI: Training thread: Start training" << std::endl;
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Start training
Python_keras_train(inputs, targets, batch_size, epochs);
Python_keras_train(inputs, targets, batch_size, epochs, save_model_path);
if (!use_Keras_predictions) {
std::cout << "AI: Training thread: Update shared model weights" << std::endl;
@ -275,13 +340,15 @@ int Python_Keras_training_thread(EigenModel* Eigen_model,
std::condition_variable* training_data_buffer_full,
bool* start_training,
int batch_size, int epochs, int training_data_size,
bool use_Keras_predictions) {
bool use_Keras_predictions,
std::string save_model_path) {
PyThreadState *_save = PyEval_SaveThread();
std::thread training_thread(parallel_training, Eigen_model, Eigen_model_mutex,
training_data_buffer, training_data_buffer_mutex,
training_data_buffer_full, start_training,
batch_size, epochs, training_data_size,
use_Keras_predictions);
use_Keras_predictions, save_model_path);
training_thread.detach();
return 0;
}
@ -312,6 +379,9 @@ Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<Eigen::MatrixXd>& input
* @return A EigenModel struct containing the model weights and biases as aligned Eigen matrices
*/
void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
@ -390,49 +460,8 @@ void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) {
// Clean up
Py_DECREF(py_weights_list);
Py_DECREF(args);
}
/**
* @brief Uses the Eigen representation of the Keras model weights for fast inference
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size) {
// Convert input data to Eigen matrix
const int num_samples = x[0].size();
const int num_features = x.size();
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
for (int i = 0; i < num_samples; ++i) {
for (int j = 0; j < num_features; ++j) {
full_input_matrix(j, i) = x[j][i];
}
}
std::vector<double> result;
result.reserve(num_samples * num_features);
if (num_features != model.weight_matrices[0].cols()) {
throw std::runtime_error("Input data size " + std::to_string(num_features) + \
" does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
}
int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
for (int batch = 0; batch < num_batches; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, num_samples);
int current_batch_size = end_idx - start_idx;
// Extract the current input data batch
Eigen::MatrixXd batch_data(num_features, current_batch_size);
batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size);
// Predict
batch_data = eigen_inference_batched(batch_data, model);
result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size());
}
return result;
// Release the Python GIL
PyGILState_Release(gstate);
}
void Python_finalize() {

View File

@ -50,6 +50,8 @@ int Python_Keras_load_model(std::string model_file_path);
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>> x, int batch_size);
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer, std::vector<std::vector<double>>& new_values);
int Python_Keras_training_thread(EigenModel* Eigen_model,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
@ -57,7 +59,7 @@ int Python_Keras_training_thread(EigenModel* Eigen_model,
std::condition_variable* training_data_buffer_full,
bool* start_training,
int batch_size, int epochs, int training_data_size,
bool use_Keras_predictions);
bool use_Keras_predictions, std::string save_model_path);
void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model);
@ -70,11 +72,12 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
inline void Python_Keras_setup(std::string functions_file_path){}
inline void Python_finalize(){}
inline void Python_Keras_load_model(std::string model_file_path){}
inline void training_data_buffer_append(std::vector<std::vector<double>>&, std::vector<std::vector<double>>&){}
inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>, int){return {};}
inline int Python_Keras_training_thread(EigenModel*, std::mutex*,
TrainingData*, std::mutex*,
std::condition_variable*, bool*,
int, int, int, bool){return {};}
int, int, int, bool, std::string){return {};}
inline void Python_Keras_set_weights_as_Eigen(EigenModel&){}
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int){return {};}
#endif

View File

@ -300,12 +300,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
chem.setProgressBarPrintout(true);
}
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
R["field_nrow"] = chem.getField().GetRequestedVecSize();
/* SIMULATION LOOP */
double dSimTime{0};
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
double start_t = MPI_Wtime();
const double &dt = params.timesteps[iter - 1];
// cout << "CPP: Next time step is " << dt << "[s]" << endl;
@ -325,17 +326,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
double ai_start_t = MPI_Wtime();
// Get current values from the tug field for the ai predictions
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
R.parseEval(std::string("predictors <- ") +
"set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)");
R.parseEval("predictors <- matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) + ")");
R.parseEval("predictors <- setNames(data.frame(predictors), TMP_PROPS)");
R.parseEval("predictors <- predictors[ai_surrogate_species]");
// Apply preprocessing
MSG("AI Preprocessing");
R.parseEval("predictors_scaled <- preprocess(predictors)");
MSG("AI: Predict");
if (params.use_Keras_predictions) { // Predict with Keras default function
R["TMP"] = Python_Keras_predict(R["predictors_scaled"], params.batch_size);
@ -346,18 +343,14 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
// Apply postprocessing
MSG("AI: Postprocesing");
R.parseEval(std::string("predictions_scaled <- ") +
"set_field(TMP, ai_surrogate_species, field_nrow, ai_surrogate_species, byrow = TRUE)");
R.parseEval("predictions <- postprocess(predictions_scaled)");
R.parseEval(std::string("predictions_scaled <- matrix(TMP, ") +
"nrow=nrow(predictors), byrow = TRUE)");
R.parseEval(std::string("predictions_scaled <- ") +
"setNames(data.frame(predictions_scaled), colnames(predictors))");
// Validate prediction and write valid predictions to chem field
MSG("AI: Validate");
R.parseEval("validity_vector <- validate_predictions(predictors, predictions)");
MSG("AI: Marking valid");
chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
std::vector<std::vector<double>> RTempField =
@ -370,27 +363,18 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
MSG("AI: Update field with AI predictions");
chem.getField().update(predictions_field);
// store time for output file
double ai_end_t = MPI_Wtime();
R["ai_prediction_time"] = ai_end_t - ai_start_t;
// Add to training data buffer:
// Input values for which the predictions were invalid
MSG("AI: Add invalid input data to training data buffer");
std::vector<std::vector<double>> R_temp_x =
std::vector<std::vector<double>> invalid_x =
R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
training_data_buffer_mutex.lock();
// Initialize training data buffer if empty
if (training_data_buffer.x.size() == 0) {
training_data_buffer.x = R_temp_x;
} else { // otherwise append
for (int col = 0; col < training_data_buffer.x.size(); col++) {
training_data_buffer.x[col].insert(training_data_buffer.x[col].end(),
R_temp_x[col].begin(), R_temp_x[col].end());
}
}
training_data_buffer_append(training_data_buffer.x, invalid_x);
training_data_buffer_mutex.unlock();
double ai_end_t = MPI_Wtime();
R["ai_prediction_time"] = ai_end_t - ai_start_t;
}
// Run simulation step
@ -405,30 +389,19 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
// True values for invalid predictions
MSG("AI: Add invalid target data to training data buffer");
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
R.parseEval("targets <- matrix(TMP, nrow=" +
std::to_string(chem.getField().GetRequestedVecSize()) + ")");
R.parseEval("targets <- setNames(data.frame(targets), TMP_PROPS)");
R.parseEval("targets <- predictors[ai_surrogate_species]");
R.parseEval(std::string("targets <- ") +
"set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)");
R.parseEval("target_scaled <- preprocess(targets)");
std::vector<std::vector<double>> R_temp_y =
std::vector<std::vector<double>> invalid_y =
R.parseEval("get_invalid_values(target_scaled, validity_vector)");
training_data_buffer_mutex.lock();
// Initialize training data buffer if empty
if (training_data_buffer.y.size() == 0) {
training_data_buffer.y = R_temp_y;
} else { // otherwise append
for (int col = 0; col < training_data_buffer.y.size(); col++) {
training_data_buffer.y[col].insert(training_data_buffer.y[col].end(),
R_temp_y[col].begin(), R_temp_y[col].end());
}
}
training_data_buffer_append(training_data_buffer.y, invalid_y);
training_data_buffer_mutex.unlock();
// Signal to training thread if training data buffer is full
if (training_data_buffer.y[0].size() > params.training_data_size) {
if (training_data_buffer.y[0].size() >= params.training_data_size) {
start_training = true;
training_data_buffer_full.notify_one();
}
@ -632,15 +605,16 @@ int main(int argc, char *argv[]) {
MSG("AI: Sourcing user-provided script");
R.parseEvalQ(ai_surrogate_input_script);
if (!Rcpp::as<bool>(R.parseEval("exists(\"model_file_path\")"))) {
throw std::runtime_error("AI surrogate input script must contain variable model_file_path!");
throw std::runtime_error("AI surrogate input script must contain a value for model_file_path");
}
/* AI surrogate training and inference parameters. (Can be set by declaring a
variable of the same name in one of the R input scripts)*/
run_params.use_Keras_predictions = false;
run_params.batch_size = 2560; // default value determined in tests wtih the barite benchmark
run_params.batch_size = 2560; // default value determined in tests with the barite benchmark
run_params.training_epochs = 5; // made up value. TODO: Set to useful default
run_params.training_data_size = 2500; // TODO: How to set this from chemistry field size?
run_params.training_data_size = 2500; // TODO: How to set this from chemistry field size?
run_params.save_model_path = ""; // Model is only saved if a path is set in the input field
if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
run_params.batch_size = R["batch_size"];
}
@ -653,6 +627,9 @@ int main(int argc, char *argv[]) {
if (Rcpp::as<bool>(R.parseEval("exists(\"use_Keras_predictions\")"))) {
run_params.use_Keras_predictions = R["use_Keras_predictions"];
}
if (Rcpp::as<bool>(R.parseEval("exists(\"save_model_path\")"))) {
run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
}
MSG("AI: Initialize Python for AI surrogate functions");
@ -662,18 +639,20 @@ int main(int argc, char *argv[]) {
MSG("AI: Initialize model");
Python_Keras_load_model(R["model_file_path"]);
if (!run_params.use_Keras_predictions) {
MSG("AI: Uses custom C++ prediction function");
Python_Keras_set_weights_as_Eigen(Eigen_model);
}
MSG("AI: Initialize training thread");
Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
&training_data_buffer, &training_data_buffer_mutex,
&training_data_buffer_full, &start_training,
run_params.batch_size, run_params.training_epochs,
run_params.training_data_size, run_params.use_Keras_predictions);
run_params.training_data_size, run_params.use_Keras_predictions,
run_params.save_model_path);
if (!run_params.use_Keras_predictions) {
MSG("AI: Use custom C++ prediction function");
Python_Keras_set_weights_as_Eigen(Eigen_model);
}
MSG("AI: Surrogate model initialized");
}

View File

@ -75,4 +75,5 @@ struct RuntimeParameters {
int batch_size; // Can be set in the R input script
int training_epochs; // Can be set in the R input script
int training_data_size; // Can be set in the R input script
std::string save_model_path; // Can be set in the R input script
};