diff --git a/R_lib/kin_r_library.R b/R_lib/kin_r_library.R index 088dec409..e4c1a847a 100644 --- a/R_lib/kin_r_library.R +++ b/R_lib/kin_r_library.R @@ -77,7 +77,8 @@ master_iteration_end <- function(setup, state_T, state_C) { ai_surrogate_info <- list( prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL, - valid_predictions = if (exists("validity_vector")) validity_vector else NULL + predictions_validity = if (exists("validity_vector")) validity_vector else NULL, + predictions = if (exists("predictions")) predictions else NULL ) SaveRObj(x = list( diff --git a/bench/barite/barite_50ai_surr_mdl.R b/bench/barite/barite_50ai_surr_mdl.R index b7aa92cbe..4fde236b9 100644 --- a/bench/barite/barite_50ai_surr_mdl.R +++ b/bench/barite/barite_50ai_surr_mdl.R @@ -142,7 +142,7 @@ mass_balance <- function(x, y) { } validate_predictions <- function(predictors, prediction) { - epsilon <- 1E-7 + epsilon <- 1E-5 mb <- mass_balance(predictors, prediction) msgm("Mass balance mean:", mean(mb)) msgm("Mass balance variance:", var(mb)) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 526d9da37..8aa8b9882 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,6 +21,7 @@ target_sources(POETLib target_include_directories( POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}" + "${CMAKE_CURRENT_BINARY_DIR}" ) option(USE_AI_SURROGATE "Compiles the AI surrogate functions with Python.h integration" OFF) diff --git a/src/Chemistry/SurrogateModels/AI_functions.cpp b/src/Chemistry/SurrogateModels/AI_functions.cpp index d7f05c660..d359870bb 100644 --- a/src/Chemistry/SurrogateModels/AI_functions.cpp +++ b/src/Chemistry/SurrogateModels/AI_functions.cpp @@ -36,6 +36,49 @@ int Python_Keras_setup(std::string functions_file_path) { return py_functions_initialized; } + +/** + * @brief Sets the ai surrogate runtime parameters + * @param R Global R environment + * @param params Gobal runtime parameters struct + * @param init_list List with initial data + */ +void set_ai_surrogate_runtime_params(RInsidePOET& R, RuntimeParameters& params, InitialList& init_list) { + /* AI surrogate training and inference parameters. (Can be set by declaring a + variable of the same name in one of the the R input scripts)*/ + params.Keras_training_always_use_CPU = false; // Default will use GPU if detected + params.Keras_training_always_use_CPU = false; // Default will use GPU if detected + params.use_Keras_predictions = false; // Default inference function is custom C++ / Eigen implementation + params.batch_size = 2560; // default value determined in test on the UP Turing cluster + params.training_epochs = 20; // + params.training_data_size = init_list.getDiffusionInit().n_rows * + init_list.getDiffusionInit().n_cols; // Default value is number of cells in field + params.save_model_path = ""; // Model is only saved if a path is set in the input field + + if (Rcpp::as(R.parseEval("exists(\"batch_size\")"))) { + params.batch_size = R["batch_size"]; + } + if (Rcpp::as(R.parseEval("exists(\"training_epochs\")"))) { + params.training_epochs = R["training_epochs"]; + } + if (Rcpp::as(R.parseEval("exists(\"training_data_size\")"))) { + params.training_data_size = R["training_data_size"]; + } + if (Rcpp::as(R.parseEval("exists(\"use_Keras_predictions\")"))) { + params.use_Keras_predictions = R["use_Keras_predictions"]; + } + if (Rcpp::as(R.parseEval("exists(\"Keras_predictions_always_use_CPU\")"))) { + params.Keras_predictions_always_use_CPU = R["Keras_predictions_always_use_CPU"]; + } + if (Rcpp::as(R.parseEval("exists(\"Keras_training_always_use_CPU\")"))) { + params.Keras_training_always_use_CPU = R["Keras_training_always_use_CPU"]; + } + if (Rcpp::as(R.parseEval("exists(\"save_model_path\")"))) { + params.save_model_path = Rcpp::as(R["save_model_path"]); + std::cout << "AI: Model will be saved as \"" << params.save_model_path << "\"" << std::endl; + } +} + /** * @brief Loads the user-supplied Keras model * @param model_file_path Path to a .keras file that the user must supply as @@ -160,12 +203,19 @@ std::vector Python_Keras_predict(std::vector> x, int * @return Predictions that the neural network made from the input values x. The predictions are * represented as a vector similar to the representation from the Field.AsVector() method */ -std::vector Eigen_predict(const EigenModel& model, std::vector> x, int batch_size) { +std::vector Eigen_predict(const EigenModel& model, std::vector> x, int batch_size, + std::mutex* Eigen_model_mutex) { + + std::cout << "GETTING DIMS" << std::endl; + // Convert input data to Eigen matrix const int num_samples = x[0].size(); const int num_features = x.size(); + std::cout << "SETTING MATRIX" << num_samples << num_features << std::endl; Eigen::MatrixXd full_input_matrix(num_features, num_samples); + + std::cout << "SETTING VALUES" << std::endl; for (int i = 0; i < num_samples; ++i) { for (int j = 0; j < num_features; ++j) { full_input_matrix(j, i) = x[j][i]; @@ -173,30 +223,50 @@ std::vector Eigen_predict(const EigenModel& model, std::vector result; + std::cout << "RESERVING RESULT" << std::endl; + result.reserve(num_samples * num_features); if (num_features != model.weight_matrices[0].cols()) { throw std::runtime_error("Input data size " + std::to_string(num_features) + \ " does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols())); } - + std::cout << "MAKING BATCHESS OF SIZE " << batch_size << std::endl; int num_batches = std::ceil(static_cast(num_samples) / batch_size); + std::cout << "LOOKING MUTEX"<< std::endl; + + Eigen_model_mutex->lock(); + std::cout << "STARTING CALCULATIONS"<< std::endl; for (int batch = 0; batch < num_batches; ++batch) { + std::cout << "BATCH "<< batch << std::endl; int start_idx = batch * batch_size; int end_idx = std::min((batch + 1) * batch_size, num_samples); int current_batch_size = end_idx - start_idx; // Extract the current input data batch - Eigen::MatrixXd batch_data(num_features, current_batch_size); + std::cout << "BATCH SIZE CALCULATED "<< batch << std::endl; + Eigen::MatrixXd batch_data(num_features, current_batch_size); + std::cout << "BATCH INPUT DECLARED "<< batch << std::endl; batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size); - + std::cout << "BATCH INPUT SET "<< batch << std::endl; // Predict + std::cout << "BATCH INPUT CLCULATE "<< batch << std::endl; batch_data = eigen_inference_batched(batch_data, model); + std::cout << "RESULT INSERT "<< batch << std::endl; result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size()); } + std::cout << "UNLOCKING MUTEX"<< std::endl; + Eigen_model_mutex->unlock(); return result; } -void training_data_buffer_append(std::vector>& training_data_buffer, std::vector>& new_values) { + +/** + * @brief Appends data from one matrix (column major std::vector>) to another + * @param training_data_buffer Matrix that the values are appended to + * @param new_values Matrix that is appended + */ +void training_data_buffer_append(std::vector>& training_data_buffer, + std::vector>& new_values) { // Initialize training data buffer if empty if (training_data_buffer.size() == 0) { training_data_buffer = new_values; @@ -208,12 +278,19 @@ void training_data_buffer_append(std::vector>& training_data } } -void Python_keras_train(std::vector> x, std::vector> y, int batch_size, int epochs, - std::string save_model_path) { + +/** + * @brief Uses the Python environment with Keras' default functions to train the model + * @param x Training data features + * @param y Training data targets + * @param params Global runtime paramters + */ +void Python_keras_train(std::vector> x, std::vector> y, + const RuntimeParameters& params) { // Prepare data for python PyObject* py_df_x = vector_to_numpy_array(x); PyObject* py_df_y = vector_to_numpy_array(y); - + // Get the model and training function from the global python interpreter PyObject* py_main_module = PyImport_AddModule("__main__"); PyObject* py_global_dict = PyModule_GetDict(py_main_module); @@ -222,7 +299,7 @@ void Python_keras_train(std::vector> x, std::vector> x, std::vector> x, std::vector lock(*training_data_buffer_mutex); // Conditional waiting: @@ -271,6 +346,10 @@ void parallel_training(EigenModel* Eigen_model, // Wait for a signal on training_data_buffer_full but starts the next round immediately. training_data_buffer_full->wait(lock, [start_training] { return *start_training;}); + if (end_training) { + return; + } + // Reset the waiting predicate *start_training = false; @@ -282,19 +361,19 @@ void parallel_training(EigenModel* Eigen_model, for (int col = 0; col < training_data_buffer->x.size(); col++) { // Copy data from the front of the buffer to the training inputs std::copy(training_data_buffer->x[col].begin(), - training_data_buffer->x[col].begin() + training_data_size, + training_data_buffer->x[col].begin() + params.training_data_size, std::back_inserter(inputs[col])); // Remove copied data from the front of the buffer training_data_buffer->x[col].erase(training_data_buffer->x[col].begin(), - training_data_buffer->x[col].begin() + training_data_size); + training_data_buffer->x[col].begin() + params.training_data_size); // Copy data from the front of the buffer to the training targets std::copy(training_data_buffer->y[col].begin(), - training_data_buffer->y[col].begin() + training_data_size, + training_data_buffer->y[col].begin() + params.training_data_size, std::back_inserter(targets[col])); // Remove copied data from the front of the buffer training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(), - training_data_buffer->y[col].begin() + training_data_size); + training_data_buffer->y[col].begin() + params.training_data_size); } // Unlock the training_data_buffer_mutex @@ -306,9 +385,9 @@ void parallel_training(EigenModel* Eigen_model, PyGILState_STATE gstate = PyGILState_Ensure(); // Start training - Python_keras_train(inputs, targets, batch_size, epochs, save_model_path); + Python_keras_train(inputs, targets, params); - if (!use_Keras_predictions) { + if (!params.use_Keras_predictions) { std::cout << "AI: Training thread: Update shared model weights" << std::endl; Eigen_model_mutex->lock(); Python_Keras_set_weights_as_Eigen(*Eigen_model); @@ -321,6 +400,7 @@ void parallel_training(EigenModel* Eigen_model, } } +std::thread python_train_thread; /** * @brief Starts a thread for parallel training and weight updating. This Wrapper function * ensures, that the main POET program can be built without pthread support if the AI @@ -329,8 +409,11 @@ void parallel_training(EigenModel* Eigen_model, * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct * @param training_data_buffer Pointer to the Training data struct with which the model is trained * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct - * @param batch_size Batch size for Keras' training function - * @param epochs Number of training epochs + * @param training_data_buffer_full Conditional waiting variable with wich the main thread signals + * when a training run can start + * @param start_training Conditional waiting predicate to mitigate against spurious wakeups + * @param end_training Signals end of program to wind down thread gracefully + * @param params Global runtime paramters * @return 0 if function was succesful */ int Python_Keras_training_thread(EigenModel* Eigen_model, @@ -338,18 +421,14 @@ int Python_Keras_training_thread(EigenModel* Eigen_model, TrainingData* training_data_buffer, std::mutex* training_data_buffer_mutex, std::condition_variable* training_data_buffer_full, - bool* start_training, - int batch_size, int epochs, int training_data_size, - bool use_Keras_predictions, - std::string save_model_path) { + bool* start_training, bool* end_training, + const RuntimeParameters& params) { PyThreadState *_save = PyEval_SaveThread(); - std::thread training_thread(parallel_training, Eigen_model, Eigen_model_mutex, - training_data_buffer, training_data_buffer_mutex, - training_data_buffer_full, start_training, - batch_size, epochs, training_data_size, - use_Keras_predictions, save_model_path); - training_thread.detach(); + python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_mutex, + training_data_buffer, training_data_buffer_mutex, + training_data_buffer_full, start_training, end_training, + params); return 0; } @@ -453,8 +532,8 @@ void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) { } // Add to EigenModel - eigen_model.weight_matrices.emplace_back(weight_matrix); - eigen_model.biases.emplace_back(bias_vector); + eigen_model.weight_matrices.push_back(weight_matrix); + eigen_model.biases.push_back(bias_vector); } // Clean up @@ -464,8 +543,37 @@ void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model) { PyGILState_Release(gstate); } -void Python_finalize() { - Py_Finalize(); + +/** + * @brief Joins the training thread and winds down the Python environment gracefully + * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct + * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct + * @param training_data_buffer_full Conditional waiting variable with wich the main thread signals + * when a training run can start + * @param start_training Conditional waiting predicate to mitigate against spurious wakeups + * @param end_training Signals end of program to wind down thread gracefully */ +void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex, + std::condition_variable* training_data_buffer_full, + bool* start_training, bool* end_training) { + // Acquire the Python GIL + PyGILState_STATE gstate = PyGILState_Ensure(); + // Define training as over + *end_training = true; + Eigen_model_mutex->lock(); + training_data_buffer_mutex->lock(); + + // Wake up and join training thread + *start_training = true; + training_data_buffer_full->notify_one(); + + // Checking first. + // Might be useful if options are added to disable training + if (python_train_thread.joinable()) { + python_train_thread.join(); + } + + //Finalize Python + Py_FinalizeEx(); } } \ No newline at end of file diff --git a/src/Chemistry/SurrogateModels/AI_functions.hpp b/src/Chemistry/SurrogateModels/AI_functions.hpp index dd54fbc53..69a9f4c20 100644 --- a/src/Chemistry/SurrogateModels/AI_functions.hpp +++ b/src/Chemistry/SurrogateModels/AI_functions.hpp @@ -16,6 +16,9 @@ #include #include +#include "poet.hpp" +#include "Base/RInsidePOET.hpp" +#include "Init/InitialList.hpp" // PhreeqC definition of pi clashes with Eigen macros so we have to temporarily undef it #pragma push_macro("pi") @@ -23,7 +26,6 @@ #include #pragma pop_macro("pi") - namespace poet { // Define an aligned allocator for std::vector template @@ -44,42 +46,47 @@ struct TrainingData { int Python_Keras_setup(std::string functions_file_path); -void Python_finalize(); +void set_ai_surrogate_runtime_params(RInsidePOET& R, RuntimeParameters& params, InitialList& init_list); + +void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex, + std::condition_variable* training_data_buffer_full, bool* start_training, bool* end_training); int Python_Keras_load_model(std::string model_file_path); std::vector Python_Keras_predict(std::vector> x, int batch_size); -void training_data_buffer_append(std::vector>& training_data_buffer, std::vector>& new_values); +void training_data_buffer_append(std::vector>& training_data_buffer, + std::vector>& new_values); int Python_Keras_training_thread(EigenModel* Eigen_model, std::mutex* Eigen_model_mutex, TrainingData* training_data_buffer, std::mutex* training_data_buffer_mutex, std::condition_variable* training_data_buffer_full, - bool* start_training, - int batch_size, int epochs, int training_data_size, - bool use_Keras_predictions, std::string save_model_path); + bool* start_training, bool* end_training, + const RuntimeParameters& params); void Python_Keras_set_weights_as_Eigen(EigenModel& eigen_model); Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref& input_batch, const EigenModel& model); -std::vector Eigen_predict(const EigenModel& model, std::vector> x, int batch_size); +std::vector Eigen_predict(const EigenModel& model, std::vector> x, int batch_size, + std::mutex* Eigen_model_mutex); // Otherwise, define the necessary stubs #else inline void Python_Keras_setup(std::string functions_file_path){} -inline void Python_finalize(){} +inline void set_ai_surrogate_runtime_params(RInsidePOET&, RuntimeParameters&, InitialList&){} +inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){} inline void Python_Keras_load_model(std::string model_file_path){} inline void training_data_buffer_append(std::vector>&, std::vector>&){} inline std::vector Python_Keras_predict(std::vector>, int){return {};} inline int Python_Keras_training_thread(EigenModel*, std::mutex*, TrainingData*, std::mutex*, - std::condition_variable*, bool*, - int, int, int, bool, std::string){return {};} + std::condition_variable*, RuntimeParameters&, + bool*, bool*){return {};} inline void Python_Keras_set_weights_as_Eigen(EigenModel&){} -inline std::vector Eigen_predict(const EigenModel&, std::vector>, int){return {};} +inline std::vector Eigen_predict(const EigenModel&, std::vector>, int, std::mutex*){return {};} #endif } // namespace poet diff --git a/src/poet.cpp b/src/poet.cpp index 0af3b8fe9..864b2f17b 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -42,7 +42,6 @@ #include #include "Chemistry/SurrogateModels/AI_functions.hpp" #include - #include #include @@ -74,19 +73,6 @@ static void init_global_functions(RInside &R) { } -/* Global variables for the AI surrogate model */ - - /* For the weights and biases of the model - * to use in an inference function with Eigen */ -std::mutex Eigen_model_mutex; -static EigenModel Eigen_model; - -/* For the training data */ -std::mutex training_data_buffer_mutex; -std::condition_variable training_data_buffer_full; -bool start_training; -TrainingData training_data_buffer; - // HACK: this is a step back as the order and also the count of fields is // predefined, but it will change in the future // static inline void writeFieldsToR(RInside &R, const Field &trans, @@ -298,7 +284,33 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, if (params.print_progress) { chem.setProgressBarPrintout(true); + } + + /* For the weights and biases of the AI surrogate + * model to use in an inference function with Eigen */ + std::mutex Eigen_model_mutex; + static EigenModel Eigen_model; + /* For the training data */ + std::mutex training_data_buffer_mutex; + std::condition_variable training_data_buffer_full; + bool start_training, end_training; + TrainingData training_data_buffer; + if (params.use_ai_surrogate) { + MSG("AI: Initialize model"); + Python_Keras_load_model(R["model_file_path"]); + MSG("AI: Initialize training thread"); + + Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex, + &training_data_buffer, &training_data_buffer_mutex, + &training_data_buffer_full, &start_training, &end_training, + params); + if (!params.use_Keras_predictions) { + MSG("AI: Use custom C++ prediction function"); + Python_Keras_set_weights_as_Eigen(Eigen_model); + } + MSG("AI: Surrogate model initialized"); } + R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps()); R["field_nrow"] = chem.getField().GetRequestedVecSize(); @@ -338,7 +350,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, R["TMP"] = Python_Keras_predict(R["predictors_scaled"], params.batch_size); } else { // Predict with custom Eigen function - R["TMP"] = Eigen_predict(Eigen_model, R["predictors_scaled"], params.batch_size); + R["TMP"] = Eigen_predict(Eigen_model, R["predictors_scaled"], params.batch_size, &Eigen_model_mutex); } // Apply postprocessing @@ -464,6 +476,12 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, profiling["diffusion"] = diffusion_profiling; chem.MasterLoopBreak(); + + if (params.use_ai_surrogate) { + MSG("Finalize Python and wind down training thread"); + Python_finalize(&Eigen_model_mutex, &training_data_buffer_mutex, + &training_data_buffer_full, &start_training, &end_training); + } return profiling; } @@ -607,57 +625,13 @@ int main(int argc, char *argv[]) { if (!Rcpp::as(R.parseEval("exists(\"model_file_path\")"))) { throw std::runtime_error("AI surrogate input script must contain a value for model_file_path"); } - - /* AI surrogate training and inference parameters. (Can be set by declaring a - variable of the same name in one of the the R input scripts)*/ - - run_params.use_Keras_predictions = false; - run_params.batch_size = 2560; // default value determined in test on the UP Turing cluster - run_params.training_epochs = 20; // - run_params.training_data_size = init_list.getDiffusionInit().n_rows * - init_list.getDiffusionInit().n_cols; // Default value is number of cells in field - run_params.save_model_path = ""; // Model is only saved if a path is set in the input field - if (Rcpp::as(R.parseEval("exists(\"batch_size\")"))) { - run_params.batch_size = R["batch_size"]; - } - if (Rcpp::as(R.parseEval("exists(\"training_epochs\")"))) { - run_params.training_epochs = R["training_epochs"]; - } - if (Rcpp::as(R.parseEval("exists(\"training_data_size\")"))) { - run_params.training_data_size = R["training_data_size"]; - } - if (Rcpp::as(R.parseEval("exists(\"use_Keras_predictions\")"))) { - run_params.use_Keras_predictions = R["use_Keras_predictions"]; - } - if (Rcpp::as(R.parseEval("exists(\"save_model_path\")"))) { - run_params.save_model_path = Rcpp::as(R["save_model_path"]); - MSG("AI: Model will be saved as \"" + run_params.save_model_path + "\""); - } - - + + set_ai_surrogate_runtime_params(R, run_params, init_list); + MSG("AI: Initialize Python for AI surrogate functions"); std::string python_keras_file = std::string(SRC_DIR) + "/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py"; Python_Keras_setup(python_keras_file); - - MSG("AI: Initialize model"); - Python_Keras_load_model(R["model_file_path"]); - - MSG("AI: Initialize training thread"); - Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex, - &training_data_buffer, &training_data_buffer_mutex, - &training_data_buffer_full, &start_training, - run_params.batch_size, run_params.training_epochs, - run_params.training_data_size, run_params.use_Keras_predictions, - run_params.save_model_path); - - - if (!run_params.use_Keras_predictions) { - MSG("AI: Use custom C++ prediction function"); - Python_Keras_set_weights_as_Eigen(Eigen_model); - } - - MSG("AI: Surrogate model initialized"); } MSG("Init done on process with rank " + std::to_string(MY_RANK)); @@ -675,10 +649,6 @@ int main(int argc, char *argv[]) { "'/timings.', setup$out_ext));"; R.parseEval(r_vis_code); - if (run_params.use_ai_surrogate) { - Python_finalize(); - } - MSG("Done! Results are stored as R objects into <" + run_params.out_dir + "/timings." + run_params.out_ext); } diff --git a/src/poet.hpp.in b/src/poet.hpp.in index c81bf7512..31afa1180 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -72,6 +72,8 @@ struct RuntimeParameters { bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate bool use_Keras_predictions; // Can be set in the R input script + bool Keras_predictions_always_use_CPU; // Can be set in the R input script + bool Keras_training_always_use_CPU; // Can be set in the R input script int batch_size; // Can be set in the R input script int training_epochs; // Can be set in the R input script int training_data_size; // Can be set in the R input script