mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-15 20:38:23 +01:00
what is the problem?
This commit is contained in:
parent
5bfb95c2fc
commit
74cd827c68
@ -77,7 +77,8 @@ master_iteration_end <- function(setup, state_T, state_C) {
|
||||
|
||||
ai_surrogate_info <- list(
|
||||
prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
|
||||
valid_predictions = if (exists("validity_vector")) validity_vector else NULL
|
||||
predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
|
||||
predictions = if (exists("predictions")) predictions else NULL
|
||||
)
|
||||
|
||||
SaveRObj(x = list(
|
||||
|
||||
@ -142,7 +142,7 @@ mass_balance <- function(x, y) {
|
||||
}
|
||||
|
||||
validate_predictions <- function(predictors, prediction) {
|
||||
epsilon <- 1E-7
|
||||
epsilon <- 1E-5
|
||||
mb <- mass_balance(predictors, prediction)
|
||||
msgm("Mass balance mean:", mean(mb))
|
||||
msgm("Mass balance variance:", var(mb))
|
||||
|
||||
@ -21,6 +21,7 @@ target_sources(POETLib
|
||||
target_include_directories(
|
||||
POETLib PUBLIC
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}"
|
||||
"${CMAKE_CURRENT_BINARY_DIR}"
|
||||
)
|
||||
|
||||
option(USE_AI_SURROGATE "Compiles the AI surrogate functions with Python.h integration" OFF)
|
||||
|
||||
@ -36,6 +36,49 @@ int Python_Keras_setup(std::string functions_file_path) {
|
||||
return py_functions_initialized;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief Sets the ai surrogate runtime parameters
|
||||
* @param R Global R environment
|
||||
* @param params Gobal runtime parameters struct
|
||||
* @param init_list List with initial data
|
||||
*/
|
||||
void set_ai_surrogate_runtime_params(RInsidePOET& R, RuntimeParameters& params, InitialList& init_list) {
|
||||
/* AI surrogate training and inference parameters. (Can be set by declaring a
|
||||
variable of the same name in one of the the R input scripts)*/
|
||||
params.Keras_training_always_use_CPU = false; // Default will use GPU if detected
|
||||
params.Keras_training_always_use_CPU = false; // Default will use GPU if detected
|
||||
params.use_Keras_predictions = false; // Default inference function is custom C++ / Eigen implementation
|
||||
params.batch_size = 2560; // default value determined in test on the UP Turing cluster
|
||||
params.training_epochs = 20; //
|
||||
params.training_data_size = init_list.getDiffusionInit().n_rows *
|
||||
init_list.getDiffusionInit().n_cols; // Default value is number of cells in field
|
||||
params.save_model_path = ""; // Model is only saved if a path is set in the input field
|
||||
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
|
||||
params.batch_size = R["batch_size"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"training_epochs\")"))) {
|
||||
params.training_epochs = R["training_epochs"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"training_data_size\")"))) {
|
||||
params.training_data_size = R["training_data_size"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"use_Keras_predictions\")"))) {
|
||||
params.use_Keras_predictions = R["use_Keras_predictions"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"Keras_predictions_always_use_CPU\")"))) {
|
||||
params.Keras_predictions_always_use_CPU = R["Keras_predictions_always_use_CPU"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"Keras_training_always_use_CPU\")"))) {
|
||||
params.Keras_training_always_use_CPU = R["Keras_training_always_use_CPU"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"save_model_path\")"))) {
|
||||
params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
|
||||
std::cout << "AI: Model will be saved as \"" << params.save_model_path << "\"" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Loads the user-supplied Keras model
|
||||
* @param model_file_path Path to a .keras file that the user must supply as
|
||||
@ -160,12 +203,19 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>> x, int
|
||||
* @return Predictions that the neural network made from the input values x. The predictions are
|
||||
* represented as a vector similar to the representation from the Field.AsVector() method
|
||||
*/
|
||||
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size) {
|
||||
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
|
||||
std::mutex* Eigen_model_mutex) {
|
||||
|
||||
std::cout << "GETTING DIMS" << std::endl;
|
||||
|
||||
// Convert input data to Eigen matrix
|
||||
const int num_samples = x[0].size();
|
||||
const int num_features = x.size();
|
||||
|
||||
std::cout << "SETTING MATRIX" << num_samples << num_features << std::endl;
|
||||
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
|
||||
|
||||
std::cout << "SETTING VALUES" << std::endl;
|
||||
for (int i = 0; i < num_samples; ++i) {
|
||||
for (int j = 0; j < num_features; ++j) {
|
||||
full_input_matrix(j, i) = x[j][i];
|
||||
@ -173,30 +223,50 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
|
||||
}
|
||||
|
||||
std::vector<double> result;
|
||||
std::cout << "RESERVING RESULT" << std::endl;
|
||||
|
||||
result.reserve(num_samples * num_features);
|
||||
|
||||
if (num_features != model.weight_matrices[0].cols()) {
|
||||
throw std::runtime_error("Input data size " + std::to_string(num_features) + \
|
||||
" does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
|
||||
}
|
||||
|
||||
std::cout << "MAKING BATCHESS OF SIZE " << batch_size << std::endl;
|
||||
int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
|
||||
std::cout << "LOOKING MUTEX"<< std::endl;
|
||||
|
||||
Eigen_model_mutex->lock();
|
||||
std::cout << "STARTING CALCULATIONS"<< std::endl;
|
||||
for (int batch = 0; batch < num_batches; ++batch) {
|
||||
std::cout << "BATCH "<< batch << std::endl;
|
||||
int start_idx = batch * batch_size;
|
||||
int end_idx = std::min((batch + 1) * batch_size, num_samples);
|
||||
int current_batch_size = end_idx - start_idx;
|
||||
// Extract the current input data batch
|
||||
Eigen::MatrixXd batch_data(num_features, current_batch_size);
|
||||
std::cout << "BATCH SIZE CALCULATED "<< batch << std::endl;
|
||||
Eigen::MatrixXd batch_data(num_features, current_batch_size);
|
||||
std::cout << "BATCH INPUT DECLARED "<< batch << std::endl;
|
||||
batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size);
|
||||
|
||||
std::cout << "BATCH INPUT SET "<< batch << std::endl;
|
||||
// Predict
|
||||
std::cout << "BATCH INPUT CLCULATE "<< batch << std::endl;
|
||||
batch_data = eigen_inference_batched(batch_data, model);
|
||||
std::cout << "RESULT INSERT "<< batch << std::endl;
|
||||
result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size());
|
||||
}
|
||||
std::cout << "UNLOCKING MUTEX"<< std::endl;
|
||||
Eigen_model_mutex->unlock();
|
||||
return result;
|
||||
}
|
||||
|
||||
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer, std::vector<std::vector<double>>& new_values) {
|
||||
|
||||
/**
|
||||
* @brief Appends data from one matrix (column major std::vector<std::vector<double>>) to another
|
||||
* @param training_data_buffer Matrix that the values are appended to
|
||||
* @param new_values Matrix that is appended
|
||||
*/
|
||||
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
|
||||
std::vector<std::vector<double>>& new_values) {
|
||||
// Initialize training data buffer if empty
|
||||
if (training_data_buffer.size() == 0) {
|
||||
training_data_buffer = new_values;
|
||||
@ -208,12 +278,19 @@ void training_data_buffer_append(std::vector<std::vector<double>>& training_data
|
||||
}
|
||||
}
|
||||
|
||||
void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vector<double>> y, int batch_size, int epochs,
|
||||
std::string save_model_path) {
|
||||
|
||||
/**
|
||||
* @brief Uses the Python environment with Keras' default functions to train the model
|
||||
* @param x Training data features
|
||||
* @param y Training data targets
|
||||
* @param params Global runtime paramters
|
||||
*/
|
||||
void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vector<double>> y,
|
||||
const RuntimeParameters& params) {
|
||||
// Prepare data for python
|
||||
PyObject* py_df_x = vector_to_numpy_array(x);
|
||||
PyObject* py_df_y = vector_to_numpy_array(y);
|
||||
|
||||
|
||||
// Get the model and training function from the global python interpreter
|
||||
PyObject* py_main_module = PyImport_AddModule("__main__");
|
||||
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
|
||||
@ -222,7 +299,7 @@ void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vec
|
||||
|
||||
// Build the function arguments as four python objects and an integer
|
||||
PyObject* args = Py_BuildValue("(OOOiis)",
|
||||
py_keras_model, py_df_x, py_df_y, batch_size, epochs, save_model_path.c_str());
|
||||
py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs, params.save_model_path.c_str());
|
||||
|
||||
|
||||
// Call the Python training function
|
||||
@ -235,7 +312,6 @@ void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vec
|
||||
Py_DECREF(args);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief Function for threadsafe parallel training and weight updating.
|
||||
* The function waits conditionally until the training data buffer is full.
|
||||
@ -248,8 +324,8 @@ void Python_keras_train(std::vector<std::vector<double>> x, std::vector<std::vec
|
||||
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
|
||||
* when a training run can start
|
||||
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
|
||||
* @param batch_size Batch size for Keras' training function
|
||||
* @param epochs Number of training epochs
|
||||
* @param end_training Signals end of program to wind down thread gracefully
|
||||
* @param params Global runtime paramters
|
||||
* @return 0 if function was succesful
|
||||
*/
|
||||
void parallel_training(EigenModel* Eigen_model,
|
||||
@ -257,9 +333,8 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
TrainingData* training_data_buffer,
|
||||
std::mutex* training_data_buffer_mutex,
|
||||
std::condition_variable* training_data_buffer_full,
|
||||
bool* start_training,
|
||||
int batch_size, int epochs, int training_data_size,
|
||||
bool use_Keras_predictions, std::string save_model_path) {
|
||||
bool* start_training, bool* end_training,
|
||||
const RuntimeParameters& params) {
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
|
||||
// Conditional waiting:
|
||||
@ -271,6 +346,10 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
// Wait for a signal on training_data_buffer_full but starts the next round immediately.
|
||||
training_data_buffer_full->wait(lock, [start_training] { return *start_training;});
|
||||
|
||||
if (end_training) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset the waiting predicate
|
||||
*start_training = false;
|
||||
|
||||
@ -282,19 +361,19 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
for (int col = 0; col < training_data_buffer->x.size(); col++) {
|
||||
// Copy data from the front of the buffer to the training inputs
|
||||
std::copy(training_data_buffer->x[col].begin(),
|
||||
training_data_buffer->x[col].begin() + training_data_size,
|
||||
training_data_buffer->x[col].begin() + params.training_data_size,
|
||||
std::back_inserter(inputs[col]));
|
||||
// Remove copied data from the front of the buffer
|
||||
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin(),
|
||||
training_data_buffer->x[col].begin() + training_data_size);
|
||||
training_data_buffer->x[col].begin() + params.training_data_size);
|
||||
|
||||
// Copy data from the front of the buffer to the training targets
|
||||
std::copy(training_data_buffer->y[col].begin(),
|
||||
training_data_buffer->y[col].begin() + training_data_size,
|
||||
training_data_buffer->y[col].begin() + params.training_data_size,
|
||||
std::back_inserter(targets[col]));
|
||||
// Remove copied data from the front of the buffer
|
||||
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(),
|
||||
training_data_buffer->y[col].begin() + training_data_size);
|
||||
training_data_buffer->y[col].begin() + params.training_data_size);
|
||||
|
||||
}
|
||||
// Unlock the training_data_buffer_mutex
|
||||
@ -306,9 +385,9 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
PyGILState_STATE gstate = PyGILState_Ensure();
|
||||
|
||||
// Start training
|
||||
Python_keras_train(inputs, targets, batch_size, epochs, save_model_path);
|
||||
Python_keras_train(inputs, targets, params);
|
||||
|
||||
if (!use_Keras_predictions) {
|
||||
if (!params.use_Keras_predictions) {
|
||||
std::cout << "AI: Training thread: Update shared model weights" << std::endl;
|
||||
Eigen_model_mutex->lock();
|
||||
Python_Keras_set_weights_as_Eigen(*Eigen_model);
|
||||
@ -321,6 +400,7 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
}
|
||||
}
|
||||
|
||||
std::thread python_train_thread;
|
||||
/**
|
||||
* @brief Starts a thread for parallel training and weight updating. This Wrapper function
|
||||
* ensures, that the main POET program can be built without pthread support if the AI
|
||||
@ -329,8 +409,11 @@ void parallel_training(EigenModel* Eigen_model,
|
||||
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
|
||||
* @param training_data_buffer Pointer to the Training data struct with which the model is trained
|
||||
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
|
||||
* @param batch_size Batch size for Keras' training function
|
||||
* @param epochs Number of training epochs
|
||||
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
|
||||
* when a training run can start
|
||||
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
|
||||
* @param end_training Signals end of program to wind down thread gracefully
|
||||
* @param params Global runtime paramters
|
||||
* @return 0 if function was succesful
|
||||
*/
|
||||
int Python_Keras_training_thread(EigenModel* Eigen_model,
|
||||
@ -338,18 +421,14 @@ int Python_Keras_training_thread(EigenModel* Eigen_model,
|
||||
TrainingData* training_data_buffer,
|
||||
std::mutex* training_data_buffer_mutex,
|
||||
std::condition_variable* training_data_buffer_full,
|
||||
bool* start_training,
|
||||
int batch_size, int epochs, int training_data_size,
|
||||
bool use_Keras_predictions,
|
||||
std::string save_model_path) {
|
||||
bool* start_training, bool* end_training,
|
||||
const RuntimeParameters& params) {
|
||||
|
||||
PyThreadState *_save = PyEval_SaveThread();
|
||||
std::thread training_thread(parallel_training, Eigen_model, Eigen_model_mutex,
|
||||
training_data_buffer, training_data_buffer_mutex,
|
||||
training_data_buffer_full, start_training,
|
||||
batch_size, epochs, training_data_size,
|
||||
use_Keras_predictions, save_model_path);
|
||||
training_thread.detach();
|
||||
python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_mutex,
|
||||
training_data_buffer, training_data_buffer_mutex,
|
||||
training_data_buffer_full, start_training, end_training,
|
||||
params);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -453,8 +532,8 @@ void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) {
|
||||
}
|
||||
|
||||
// Add to EigenModel
|
||||
eigen_model.weight_matrices.emplace_back(weight_matrix);
|
||||
eigen_model.biases.emplace_back(bias_vector);
|
||||
eigen_model.weight_matrices.push_back(weight_matrix);
|
||||
eigen_model.biases.push_back(bias_vector);
|
||||
}
|
||||
|
||||
// Clean up
|
||||
@ -464,8 +543,37 @@ void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) {
|
||||
PyGILState_Release(gstate);
|
||||
}
|
||||
|
||||
void Python_finalize() {
|
||||
Py_Finalize();
|
||||
|
||||
/**
|
||||
* @brief Joins the training thread and winds down the Python environment gracefully
|
||||
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
|
||||
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
|
||||
* @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
|
||||
* when a training run can start
|
||||
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
|
||||
* @param end_training Signals end of program to wind down thread gracefully */
|
||||
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
|
||||
std::condition_variable* training_data_buffer_full,
|
||||
bool* start_training, bool* end_training) {
|
||||
// Acquire the Python GIL
|
||||
PyGILState_STATE gstate = PyGILState_Ensure();
|
||||
// Define training as over
|
||||
*end_training = true;
|
||||
Eigen_model_mutex->lock();
|
||||
training_data_buffer_mutex->lock();
|
||||
|
||||
// Wake up and join training thread
|
||||
*start_training = true;
|
||||
training_data_buffer_full->notify_one();
|
||||
|
||||
// Checking first.
|
||||
// Might be useful if options are added to disable training
|
||||
if (python_train_thread.joinable()) {
|
||||
python_train_thread.join();
|
||||
}
|
||||
|
||||
//Finalize Python
|
||||
Py_FinalizeEx();
|
||||
}
|
||||
|
||||
}
|
||||
@ -16,6 +16,9 @@
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "poet.hpp"
|
||||
#include "Base/RInsidePOET.hpp"
|
||||
#include "Init/InitialList.hpp"
|
||||
|
||||
// PhreeqC definition of pi clashes with Eigen macros so we have to temporarily undef it
|
||||
#pragma push_macro("pi")
|
||||
@ -23,7 +26,6 @@
|
||||
#include <Eigen/Dense>
|
||||
#pragma pop_macro("pi")
|
||||
|
||||
|
||||
namespace poet {
|
||||
// Define an aligned allocator for std::vector
|
||||
template<typename T>
|
||||
@ -44,42 +46,47 @@ struct TrainingData {
|
||||
|
||||
int Python_Keras_setup(std::string functions_file_path);
|
||||
|
||||
void Python_finalize();
|
||||
void set_ai_surrogate_runtime_params(RInsidePOET& R, RuntimeParameters& params, InitialList& init_list);
|
||||
|
||||
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
|
||||
std::condition_variable* training_data_buffer_full, bool* start_training, bool* end_training);
|
||||
|
||||
int Python_Keras_load_model(std::string model_file_path);
|
||||
|
||||
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>> x, int batch_size);
|
||||
|
||||
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer, std::vector<std::vector<double>>& new_values);
|
||||
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
|
||||
std::vector<std::vector<double>>& new_values);
|
||||
|
||||
int Python_Keras_training_thread(EigenModel* Eigen_model,
|
||||
std::mutex* Eigen_model_mutex,
|
||||
TrainingData* training_data_buffer,
|
||||
std::mutex* training_data_buffer_mutex,
|
||||
std::condition_variable* training_data_buffer_full,
|
||||
bool* start_training,
|
||||
int batch_size, int epochs, int training_data_size,
|
||||
bool use_Keras_predictions, std::string save_model_path);
|
||||
bool* start_training, bool* end_training,
|
||||
const RuntimeParameters& params);
|
||||
|
||||
void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model);
|
||||
|
||||
Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<Eigen::MatrixXd>& input_batch, const EigenModel& model);
|
||||
|
||||
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size);
|
||||
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
|
||||
std::mutex* Eigen_model_mutex);
|
||||
|
||||
// Otherwise, define the necessary stubs
|
||||
#else
|
||||
inline void Python_Keras_setup(std::string functions_file_path){}
|
||||
inline void Python_finalize(){}
|
||||
inline void set_ai_surrogate_runtime_params(RInsidePOET&, RuntimeParameters&, InitialList&){}
|
||||
inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){}
|
||||
inline void Python_Keras_load_model(std::string model_file_path){}
|
||||
inline void training_data_buffer_append(std::vector<std::vector<double>>&, std::vector<std::vector<double>>&){}
|
||||
inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>, int){return {};}
|
||||
inline int Python_Keras_training_thread(EigenModel*, std::mutex*,
|
||||
TrainingData*, std::mutex*,
|
||||
std::condition_variable*, bool*,
|
||||
int, int, int, bool, std::string){return {};}
|
||||
std::condition_variable*, RuntimeParameters&,
|
||||
bool*, bool*){return {};}
|
||||
inline void Python_Keras_set_weights_as_Eigen(EigenModel&){}
|
||||
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int){return {};}
|
||||
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int, std::mutex*){return {};}
|
||||
#endif
|
||||
} // namespace poet
|
||||
|
||||
|
||||
102
src/poet.cpp
102
src/poet.cpp
@ -42,7 +42,6 @@
|
||||
#include <condition_variable>
|
||||
#include "Chemistry/SurrogateModels/AI_functions.hpp"
|
||||
#include <CLI/CLI.hpp>
|
||||
|
||||
#include <poet.hpp>
|
||||
#include <vector>
|
||||
|
||||
@ -74,19 +73,6 @@ static void init_global_functions(RInside &R) {
|
||||
}
|
||||
|
||||
|
||||
/* Global variables for the AI surrogate model */
|
||||
|
||||
/* For the weights and biases of the model
|
||||
* to use in an inference function with Eigen */
|
||||
std::mutex Eigen_model_mutex;
|
||||
static EigenModel Eigen_model;
|
||||
|
||||
/* For the training data */
|
||||
std::mutex training_data_buffer_mutex;
|
||||
std::condition_variable training_data_buffer_full;
|
||||
bool start_training;
|
||||
TrainingData training_data_buffer;
|
||||
|
||||
// HACK: this is a step back as the order and also the count of fields is
|
||||
// predefined, but it will change in the future
|
||||
// static inline void writeFieldsToR(RInside &R, const Field &trans,
|
||||
@ -298,7 +284,33 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms,
|
||||
|
||||
if (params.print_progress) {
|
||||
chem.setProgressBarPrintout(true);
|
||||
}
|
||||
|
||||
/* For the weights and biases of the AI surrogate
|
||||
* model to use in an inference function with Eigen */
|
||||
std::mutex Eigen_model_mutex;
|
||||
static EigenModel Eigen_model;
|
||||
/* For the training data */
|
||||
std::mutex training_data_buffer_mutex;
|
||||
std::condition_variable training_data_buffer_full;
|
||||
bool start_training, end_training;
|
||||
TrainingData training_data_buffer;
|
||||
if (params.use_ai_surrogate) {
|
||||
MSG("AI: Initialize model");
|
||||
Python_Keras_load_model(R["model_file_path"]);
|
||||
MSG("AI: Initialize training thread");
|
||||
|
||||
Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
|
||||
&training_data_buffer, &training_data_buffer_mutex,
|
||||
&training_data_buffer_full, &start_training, &end_training,
|
||||
params);
|
||||
if (!params.use_Keras_predictions) {
|
||||
MSG("AI: Use custom C++ prediction function");
|
||||
Python_Keras_set_weights_as_Eigen(Eigen_model);
|
||||
}
|
||||
MSG("AI: Surrogate model initialized");
|
||||
}
|
||||
|
||||
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
|
||||
R["field_nrow"] = chem.getField().GetRequestedVecSize();
|
||||
|
||||
@ -338,7 +350,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms,
|
||||
R["TMP"] = Python_Keras_predict(R["predictors_scaled"], params.batch_size);
|
||||
|
||||
} else { // Predict with custom Eigen function
|
||||
R["TMP"] = Eigen_predict(Eigen_model, R["predictors_scaled"], params.batch_size);
|
||||
R["TMP"] = Eigen_predict(Eigen_model, R["predictors_scaled"], params.batch_size, &Eigen_model_mutex);
|
||||
}
|
||||
|
||||
// Apply postprocessing
|
||||
@ -464,6 +476,12 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms,
|
||||
profiling["diffusion"] = diffusion_profiling;
|
||||
|
||||
chem.MasterLoopBreak();
|
||||
|
||||
if (params.use_ai_surrogate) {
|
||||
MSG("Finalize Python and wind down training thread");
|
||||
Python_finalize(&Eigen_model_mutex, &training_data_buffer_mutex,
|
||||
&training_data_buffer_full, &start_training, &end_training);
|
||||
}
|
||||
|
||||
return profiling;
|
||||
}
|
||||
@ -607,57 +625,13 @@ int main(int argc, char *argv[]) {
|
||||
if (!Rcpp::as<bool>(R.parseEval("exists(\"model_file_path\")"))) {
|
||||
throw std::runtime_error("AI surrogate input script must contain a value for model_file_path");
|
||||
}
|
||||
|
||||
/* AI surrogate training and inference parameters. (Can be set by declaring a
|
||||
variable of the same name in one of the the R input scripts)*/
|
||||
|
||||
run_params.use_Keras_predictions = false;
|
||||
run_params.batch_size = 2560; // default value determined in test on the UP Turing cluster
|
||||
run_params.training_epochs = 20; //
|
||||
run_params.training_data_size = init_list.getDiffusionInit().n_rows *
|
||||
init_list.getDiffusionInit().n_cols; // Default value is number of cells in field
|
||||
run_params.save_model_path = ""; // Model is only saved if a path is set in the input field
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
|
||||
run_params.batch_size = R["batch_size"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"training_epochs\")"))) {
|
||||
run_params.training_epochs = R["training_epochs"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"training_data_size\")"))) {
|
||||
run_params.training_data_size = R["training_data_size"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"use_Keras_predictions\")"))) {
|
||||
run_params.use_Keras_predictions = R["use_Keras_predictions"];
|
||||
}
|
||||
if (Rcpp::as<bool>(R.parseEval("exists(\"save_model_path\")"))) {
|
||||
run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
|
||||
MSG("AI: Model will be saved as \"" + run_params.save_model_path + "\"");
|
||||
}
|
||||
|
||||
|
||||
|
||||
set_ai_surrogate_runtime_params(R, run_params, init_list);
|
||||
|
||||
MSG("AI: Initialize Python for AI surrogate functions");
|
||||
std::string python_keras_file = std::string(SRC_DIR) +
|
||||
"/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py";
|
||||
Python_Keras_setup(python_keras_file);
|
||||
|
||||
MSG("AI: Initialize model");
|
||||
Python_Keras_load_model(R["model_file_path"]);
|
||||
|
||||
MSG("AI: Initialize training thread");
|
||||
Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
|
||||
&training_data_buffer, &training_data_buffer_mutex,
|
||||
&training_data_buffer_full, &start_training,
|
||||
run_params.batch_size, run_params.training_epochs,
|
||||
run_params.training_data_size, run_params.use_Keras_predictions,
|
||||
run_params.save_model_path);
|
||||
|
||||
|
||||
if (!run_params.use_Keras_predictions) {
|
||||
MSG("AI: Use custom C++ prediction function");
|
||||
Python_Keras_set_weights_as_Eigen(Eigen_model);
|
||||
}
|
||||
|
||||
MSG("AI: Surrogate model initialized");
|
||||
}
|
||||
|
||||
MSG("Init done on process with rank " + std::to_string(MY_RANK));
|
||||
@ -675,10 +649,6 @@ int main(int argc, char *argv[]) {
|
||||
"'/timings.', setup$out_ext));";
|
||||
R.parseEval(r_vis_code);
|
||||
|
||||
if (run_params.use_ai_surrogate) {
|
||||
Python_finalize();
|
||||
}
|
||||
|
||||
MSG("Done! Results are stored as R objects into <" + run_params.out_dir +
|
||||
"/timings." + run_params.out_ext);
|
||||
}
|
||||
|
||||
@ -72,6 +72,8 @@ struct RuntimeParameters {
|
||||
bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate
|
||||
|
||||
bool use_Keras_predictions; // Can be set in the R input script
|
||||
bool Keras_predictions_always_use_CPU; // Can be set in the R input script
|
||||
bool Keras_training_always_use_CPU; // Can be set in the R input script
|
||||
int batch_size; // Can be set in the R input script
|
||||
int training_epochs; // Can be set in the R input script
|
||||
int training_data_size; // Can be set in the R input script
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user