diff --git a/R_lib/kin_r_library.R b/R_lib/kin_r_library.R
index ad63ea005..e4c1a847a 100644
--- a/R_lib/kin_r_library.R
+++ b/R_lib/kin_r_library.R
@@ -78,8 +78,7 @@ master_iteration_end <- function(setup, state_T, state_C) {
   ai_surrogate_info <- list(
     prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
     predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
-    predictions = if (exists("predictions")) predictions else NULL,
-    n_training_runs = if(exists("n_training_runs")) n_training_runs else NULL
+    predictions = if (exists("predictions")) predictions else NULL
   )

   SaveRObj(x = list(
diff --git a/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py b/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py
index d1312659a..8d8a1e075 100644
--- a/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py
+++ b/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py
@@ -17,7 +17,7 @@ def initiate_model(model_file_path):
     return model

 def prediction_step(model, x, batch_size, cluster_labels):
-
+    # TODO PREDICT ACCORDING TO CLUSTER
     prediction = model.predict(x, batch_size)
     return np.array(prediction, dtype=np.float64)

@@ -27,15 +27,14 @@ def get_weights(model):
     weights = model.get_weights()
     return weights

-def training_step(model, x, y, cluster_labels, batch_size, epochs, output_file_path):
+def training_step(model, x, y, batch_size, epochs,
+                  train_cluster, output_file_path):
     # Check clustering of input data
     # and only train for the cluster where nothing is happening
-    cluster_labels = np.array(cluster_labels, dtype=bool)
-    x = x[cluster_labels]
-    y = y[cluster_labels]
-
-    print("SUM CLABEL: " + str(sum(cluster_labels)), flush=True)
-    print("Data size is: " + str(len(x)), flush=True)
+
+    # TODO TRAIN ACCORDING TO CLUSTER
+    print("Training cluster: " + str(train_cluster), flush=True)
+    print("Training data size is: " + str(len(x)), flush=True)

     history = model.fit(x, y,
                         epochs=epochs,
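Note on the embedding pattern used in the C++ changes below: the Python helpers above are not imported as a module. AI_functions.cpp executes them in the interpreter's __main__ namespace with PyRun_SimpleString and later fetches them back as borrowed references. A minimal standalone sketch of that round trip (illustrative only; the toy prediction_step stands in for the real Keras helpers):

    #include <Python.h>
    #include <iostream>

    int main() {
      Py_Initialize();
      // Define a function in __main__, as Python_Keras_setup does for the Keras helpers
      PyRun_SimpleString("def prediction_step(x):\n"
                         "    return x * 2\n");
      // Fetch it back as a borrowed reference, as Python_Keras_predict does
      PyObject* py_main_module = PyImport_AddModule("__main__");
      PyObject* py_global_dict = PyModule_GetDict(py_main_module);
      PyObject* py_fn = PyDict_GetItemString(py_global_dict, "prediction_step");
      PyObject* args = Py_BuildValue("(i)", 21);
      PyObject* rv = PyObject_CallObject(py_fn, args);
      std::cout << PyLong_AsLong(rv) << std::endl;  // prints 42
      Py_DECREF(args);
      Py_DECREF(rv);
      Py_Finalize();
      return 0;
    }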
diff --git a/src/Chemistry/SurrogateModels/AI_functions.cpp b/src/Chemistry/SurrogateModels/AI_functions.cpp
index ca17c2091..1237a7700 100644
--- a/src/Chemistry/SurrogateModels/AI_functions.cpp
+++ b/src/Chemistry/SurrogateModels/AI_functions.cpp
@@ -44,23 +44,26 @@ int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir
  * a variable "model_file_path" in the R input script
  * @return 0 if function was succesful
  */
-int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction) {
+int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction,
+                            bool use_clustering) {
   // Acquire the Python GIL
   PyGILState_STATE gstate = PyGILState_Ensure();
-  // Initialize Keras model for the reaction cluster
-  int py_model_loaded = PyRun_SimpleString(("model_reaction = initiate_model(\"" +
+  // Initialize Keras default model
+  int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" +
                                             model_reaction + "\")").c_str());
   if (py_model_loaded != 0) {
     PyErr_Print(); // Ensure that python errors make it to stdout
     throw std::runtime_error("Keras model could not be loaded from: " + model_reaction);
   }
-  // Initialize Keras model for the no reaction cluster
-  py_model_loaded = PyRun_SimpleString(("model_no_reaction = initiate_model(\"" +
-                                        model_no_reaction + "\")").c_str());
-  if (py_model_loaded != 0) {
-    PyErr_Print(); // Ensure that python errors make it to stdout
-    throw std::runtime_error("Keras model could not be loaded from: " +
-                             model_no_reaction);
-  }
+
+  if (use_clustering) {
+    // Initialize second Keras model that will be used for the no-reaction cluster
+    py_model_loaded = PyRun_SimpleString(("model_no_reaction = initiate_model(\"" +
+                                          model_no_reaction + "\")").c_str());
+    if (py_model_loaded != 0) {
+      PyErr_Print();
+      throw std::runtime_error("Keras model could not be loaded from: " + model_no_reaction);
+    }
+  }
   // Release the Python GIL
   PyGILState_Release(gstate);
@@ -256,14 +259,14 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
   // Get the model and training function from the global python interpreter
   PyObject* py_main_module = PyImport_AddModule("__main__");
   PyObject* py_global_dict = PyModule_GetDict(py_main_module);
-  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
+  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
   PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step");

-  // Build the function arguments as four python objects and an integer
+  // Build the function arguments as three python objects and an integer
   PyObject* args = Py_BuildValue("(OOiO)", py_keras_model, py_df_x, batch_size, py_cluster_list);

-  // Call the Python training function
-  PyObject *py_predictions = PyObject_CallObject(py_inference_function, args);
+  // Call the Python inference function
+  PyObject* py_predictions = PyObject_CallObject(py_inference_function, args);

   // Check py_rv and return as 2D vector
   std::vector<double> predictions = numpy_array_to_vector(py_predictions);
@@ -277,7 +280,6 @@
   // Release the Python GIL
   PyGILState_Release(gstate);
-
   return predictions;
 }
@@ -413,28 +415,21 @@ void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& ne
  * @param params Global runtime paramters
  */
 void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::vector<double>>& y,
-                        std::vector<int>& cluster_labels, const RuntimeParameters& params) {
+                        int train_cluster, const RuntimeParameters& params) {
   // Prepare data for python
   PyObject* py_df_x = vector_to_numpy_array(x);
   PyObject* py_df_y = vector_to_numpy_array(y);

-  // Prepare cluster label vector for Python
-  PyObject* py_cluster_list = PyList_New(cluster_labels.size());
-  for (size_t i = 0; i < cluster_labels.size(); i++) {
-    PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
-    PyList_SET_ITEM(py_cluster_list, i, py_int);
-  }
-
   // Get the model and training function from the global python interpreter
   PyObject* py_main_module = PyImport_AddModule("__main__");
   PyObject* py_global_dict = PyModule_GetDict(py_main_module);
-  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
+  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
   PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");

-  // Build the function arguments as four python objects and an integer
-  PyObject* args = Py_BuildValue("(OOOOiis)",
-      py_keras_model, py_df_x, py_df_y, py_cluster_list, params.batch_size,
-      params.training_epochs, params.save_model_path.c_str());
+  // Build the function arguments as three python objects, three integers and a string
+  PyObject* args = Py_BuildValue("(OOOiiis)",
+      py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs,
+      train_cluster, params.save_model_path.c_str());

   // Call the Python training function
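Because training runs on its own thread (parallel_training below) while the master loop also calls into the interpreter, every block of CPython calls in this file is bracketed by PyGILState_Ensure/PyGILState_Release. A condensed sketch of that locking discipline (hypothetical helper, error handling omitted):

    #include <Python.h>

    // Run a snippet in the embedded interpreter from any thread:
    // acquire the GIL, do the Python work, release the GIL exactly once.
    void run_python_snippet(const char* snippet) {
      PyGILState_STATE gstate = PyGILState_Ensure();
      PyRun_SimpleString(snippet);
      PyGILState_Release(gstate);
    }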
@@ -487,36 +482,48 @@ void parallel_training(EigenModel* Eigen_model,
     // Get the necessary training data
     std::cout << "AI: Training thread: Getting training data" << std::endl;
     // Initialize training data input and targets
-    std::vector<std::vector<double>> inputs(9);
-    std::vector<std::vector<double>> targets(9);
-    for (int col = 0; col < training_data_buffer->x.size(); col++) {
-      // Copy data from the front of the buffer to the training inputs
-      std::copy(training_data_buffer->x[col].begin(),
-                training_data_buffer->x[col].begin() + params.training_data_size,
-                std::back_inserter(inputs[col]));
-      // Remove copied data from the front of the buffer
-      training_data_buffer->x[col].erase(training_data_buffer->x[col].begin(),
-                                         training_data_buffer->x[col].begin() + params.training_data_size);
-
-      // Copy data from the front of the buffer to the training targets
-      std::copy(training_data_buffer->y[col].begin(),
-                training_data_buffer->y[col].begin() + params.training_data_size,
-                std::back_inserter(targets[col]));
-      // Remove copied data from the front of the buffer
-      training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(),
-                                         training_data_buffer->y[col].begin() + params.training_data_size);
-    }
-    // Initialize training data buffer labels
-    std::vector<int> cluster_labels(training_data_buffer->cluster_labels.begin(),
-                                    training_data_buffer->cluster_labels.begin() +
-                                    params.training_data_size);
-    // Remove copied values from the front of the buffer
-    training_data_buffer->cluster_labels.erase(training_data_buffer->cluster_labels.begin(),
-                                               training_data_buffer->cluster_labels.begin() +
-                                               params.training_data_size);
-
+    std::vector<std::vector<double>> inputs(9, std::vector<double>(params.training_data_size));
+    std::vector<std::vector<double>> targets(9, std::vector<double>(params.training_data_size));
+
+    int buffer_size = training_data_buffer->x[0].size();
+    // If clustering is used, check which cluster has enough data and train on that one
+    int n_cluster_reactive = 0;
+    int train_cluster = -1; // Default value for non-clustered training where all data is used
+    if (params.use_k_means_clustering) {
+      for (int i = 0; i < buffer_size; i++) {
+        n_cluster_reactive += training_data_buffer->cluster_labels[i];
+      }
+      train_cluster = n_cluster_reactive >= params.training_data_size;
+    }
+    // Copy matching rows to the training data and remove them from the buffer.
+    // The row loop is outermost so that the cluster labels stay aligned with
+    // the rows of every column while entries are being erased.
+    int buffer_row = 0;
+    int copied_row = 0;
+    while (copied_row < params.training_data_size &&
+           buffer_row < training_data_buffer->x[0].size()) {
+      if ((train_cluster == -1) ||
+          (train_cluster == training_data_buffer->cluster_labels[buffer_row])) {
+        for (int col = 0; col < training_data_buffer->x.size(); col++) {
+          inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
+          targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
+          training_data_buffer->x[col].erase(training_data_buffer->x[col].begin() + buffer_row);
+          training_data_buffer->y[col].erase(training_data_buffer->y[col].begin() + buffer_row);
+        }
+        if (params.use_k_means_clustering) {
+          training_data_buffer->cluster_labels.erase(
+              training_data_buffer->cluster_labels.begin() + buffer_row);
+        }
+        copied_row++;
+      } else {
+        buffer_row++;
+      }
+    }
+
     // Set the waiting predicate to false if buffer is below threshold
-    *start_training = training_data_buffer->y[0].size() >= params.training_data_size;
+    // (count only the rows that remain in the buffer after the removal above)
+    buffer_size -= copied_row;
+    if (train_cluster == 1) n_cluster_reactive -= copied_row;
+    *start_training = (n_cluster_reactive >= params.training_data_size) ||
+                      (buffer_size - n_cluster_reactive >= params.training_data_size);
     //update number of training runs
     training_data_buffer->n_training_runs += 1;
     // Unlock the training_data_buffer_mutex
@@ -527,7 +534,7 @@
     // Acquire the Python GIL
     PyGILState_STATE gstate = PyGILState_Ensure();
     // Start training
-    Python_Keras_train(inputs, targets, cluster_labels, params);
+    Python_Keras_train(inputs, targets, train_cluster, params);

     if (!params.use_Keras_predictions) {
       std::cout << "AI: Training thread: Update shared model weights" << std::endl;
@@ -611,7 +618,7 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights() {
   PyObject* py_main_module = PyImport_AddModule("__main__");
   PyObject* py_global_dict = PyModule_GetDict(py_main_module);
-  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
+  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
   PyObject* py_get_weights_function = PyDict_GetItemString(py_global_dict, "get_weights");

   PyObject* args = Py_BuildValue("(O)", py_keras_model);
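The cluster choice in parallel_training above is a count-and-compare: if the buffer holds at least training_data_size reactive rows (label 1) the reactive model is trained, otherwise the non-reactive one, and without clustering the sentinel -1 selects all rows. The same decision condensed into a standalone function (hypothetical name, mirroring the buffer fields):

    #include <numeric>
    #include <vector>

    // Returns -1 (use all rows), 1 (reactive cluster) or 0 (non-reactive cluster).
    int select_train_cluster(const std::vector<int>& cluster_labels,
                             bool use_clustering, int training_data_size) {
      if (!use_clustering) return -1;
      // Labels are 0/1, so their sum counts the reactive rows
      int n_reactive = std::accumulate(cluster_labels.begin(), cluster_labels.end(), 0);
      return n_reactive >= training_data_size ? 1 : 0;
    }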
diff --git a/src/Chemistry/SurrogateModels/AI_functions.hpp b/src/Chemistry/SurrogateModels/AI_functions.hpp
index 0df51342d..ade5e240d 100644
--- a/src/Chemistry/SurrogateModels/AI_functions.hpp
+++ b/src/Chemistry/SurrogateModels/AI_functions.hpp
@@ -5,10 +5,13 @@
  * @version 0.1
  * @date 01 Nov 2024
  *
- * This file implements the creation of a DHT by using the MPI
- * one-sided-communication. There is also the possibility to write or read data
- * from or to the DHT. In addition, the current state of the DHT can be written
- * to a file and read in again later.
+ * This file implements functions to train and predict with a neural network.
+ * All functions are based on a user-supplied Keras model. Python.h is used for
+ * model training with Keras and can be used for inference. The default inference
+ * function is implemented with Eigen matrices in C++. When clustering is enabled,
+ * two models are used to process data separately according to the K-means cluster
+ * assignment. This file also contains the functions for the K-means algorithm.
+ *
  */

 #ifndef AI_FUNCTIONS_H
@@ -18,7 +21,8 @@
 #include
 #include "poet.hpp"

-// PhreeqC definition of pi clashes with Eigen macros so we have to temporarily undef it
+// PhreeqC definition of pi clashes with Eigen macros
+// so we have to temporarily undef it
 #pragma push_macro("pi")
 #undef pi
 #include <Eigen/Dense>
@@ -27,8 +31,15 @@
 namespace poet {

 struct EigenModel {
-  std::vector<Eigen::MatrixXd> weight_matrices;
-  std::vector<Eigen::VectorXd> biases;
+  // The first model will be used for all values if clustering is disabled
+  // or for the reactive part of the field if clustering is enabled
+  std::vector<Eigen::MatrixXd> weight_matrices;
+  std::vector<Eigen::VectorXd> biases;
+
+  // The other model will be used for the non-reactive cluster
+  // (if clustering is enabled)
+  std::vector<Eigen::MatrixXd> weight_matrices_no_reaction;
+  std::vector<Eigen::VectorXd> biases_no_reaction;
 };

 struct TrainingData {
@@ -47,7 +58,8 @@ void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_bu
                      std::condition_variable* training_data_buffer_full,
                      bool* start_training, bool* end_training);

-int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction);
+int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction,
+                            bool use_clustering);

 std::vector<int> K_Means(std::vector<std::vector<double>>& field, int k, int maxIterations = 100);
@@ -80,7 +92,7 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<double
-inline int Python_Keras_load_model(std::string, std::string) {return 0;}
+inline int Python_Keras_load_model(std::string, std::string, bool) {return 0;}
 inline std::vector<int> K_Means(std::vector<std::vector<double>>&, int, int) {return {};}
 inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int, std::vector<int>&){return {};}
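Eigen_predict itself is not touched by this diff, but the extended EigenModel implies the usual dense forward pass over whichever weight set matches the cluster. A minimal sketch under the assumption of ReLU hidden layers (the real activations come from the user-supplied Keras model):

    #include <Eigen/Dense>
    #include <vector>

    // Hypothetical forward pass: x -> W_n(...ReLU(W_1 x + b_1)...) + b_n
    Eigen::VectorXd forward(const std::vector<Eigen::MatrixXd>& weights,
                            const std::vector<Eigen::VectorXd>& biases,
                            Eigen::VectorXd x) {
      for (size_t i = 0; i < weights.size(); i++) {
        x = weights[i] * x + biases[i];
        if (i + 1 < weights.size()) x = x.cwiseMax(0.0);  // ReLU on hidden layers
      }
      return x;
    }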
diff --git a/src/poet.cpp b/src/poet.cpp
index 75e089890..f1a747630 100644
--- a/src/poet.cpp
+++ b/src/poet.cpp
@@ -295,12 +295,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   std::condition_variable training_data_buffer_full;
   bool start_training, end_training;
   TrainingData training_data_buffer;
-  std::vector<int> cluster_labels(chem.getField().GetRequestedVecSize());
+  std::vector<int> cluster_labels(chem.getField().GetRequestedVecSize(), -1);

   if (params.use_ai_surrogate) {
     MSG("AI: Initialize model");
     // Initiate two models from one file TODO: Expand this for two input files
-    Python_Keras_load_model(R["model_file_path"], R["model_file_path"]);
+    Python_Keras_load_model(R["model_file_path"], R["model_file_path"],
+                            params.use_k_means_clustering);
     if (!params.disable_training) {
       MSG("AI: Initialize training thread");
       Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
@@ -367,7 +368,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
       std::vector<std::vector<double>> predictors_scaled = R["predictors_scaled"];

       // Get K-Means cluster assignements based on the preprocessed data
-      cluster_labels = K_Means(predictors_scaled, 2, 300);
+      if (params.use_k_means_clustering) {
+        cluster_labels = K_Means(predictors_scaled, 2, 300);
+      }

       /*
       int size = (int)(std::sqrt(chem.getField().GetRequestedVecSize()));
@@ -437,7 +440,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
     if (params.use_ai_surrogate && !params.disable_training) {
       // Add values for which the predictions were invalid
       // to training data buffer
-      MSG("AI: Add invalid predictions to training data buffer");
+      MSG("AI: Add invalid predictions to training data buffer");
       std::vector<std::vector<double>> invalid_x =
           R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
@@ -448,15 +451,25 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
       training_data_buffer_mutex.lock();
       training_data_buffer_append(training_data_buffer.x, invalid_x);
       training_data_buffer_append(training_data_buffer.y, invalid_y);
-      cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
+
+      // If clustering is used, count the size of the buffer according
+      // to the cluster assignments
+      int n_cluster_reactive = 0;
+      int buffer_size = training_data_buffer.x[0].size();
+      if (params.use_k_means_clustering) {
+        cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
                               R["validity_vector"]);
-      // Signal to training thread if training data buffer is full
-      if (training_data_buffer.y[0].size() >= params.training_data_size) {
+        for (int i = 0; i < buffer_size; i++) {
+          n_cluster_reactive += training_data_buffer.cluster_labels[i];
+        }
+      }
+      // Signal to the training thread if enough training data is present
+      if ((n_cluster_reactive >= params.training_data_size) ||
+          (buffer_size - n_cluster_reactive >= params.training_data_size)) {
         start_training = true;
         training_data_buffer_full.notify_one();
       }
       training_data_buffer_mutex.unlock();
-      R["n_training_runs"] = training_data_buffer.n_training_runs;
     }

     diffusion.getField().update(chem.getField());
@@ -683,6 +696,9 @@ int main(int argc, char *argv[]) {
       run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
       std::cout << "AI: Model will be saved as \"" << run_params.save_model_path << "\"" << std::endl;
     }
+    if (Rcpp::as<bool>(R.parseEval("exists(\"use_k_means_clustering\")"))) {
+      run_params.use_k_means_clustering = R["use_k_means_clustering"];
+    }

     MSG("AI: Initialize Python for AI surrogate functions");
     std::string python_keras_file = std::string(SRC_DIR) +
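All AI settings read in main() follow the same opt-in pattern: a default compiled into RuntimeParameters that is only overridden when the R input script defines the variable. The guard for the new flag, factored into a sketch (hypothetical helper around POET's embedded RInside session):

    #include <RInside.h>

    // Read a boolean flag from the embedded R session, falling back to a
    // default when the R input script does not define the variable.
    bool read_r_flag(RInside& R, const std::string& name, bool fallback) {
      if (Rcpp::as<bool>(R.parseEval("exists(\"" + name + "\")"))) {
        return Rcpp::as<bool>(R.parseEval(name));
      }
      return fallback;
    }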
diff --git a/src/poet.hpp.in b/src/poet.hpp.in
index bcea9a668..b3878d67f 100644
--- a/src/poet.hpp.in
+++ b/src/poet.hpp.in
@@ -72,6 +72,7 @@ struct RuntimeParameters {
   /*AI surriogate configuration*/
   bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate
   bool disable_training; // Can be set in the R input script
+  bool use_k_means_clustering = false; // Can be set in the R input script
   bool use_Keras_predictions; // Can be set in the R input script
   int batch_size; // Can be set in the R input script
   int training_epochs; // Can be set in the R input script
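For reference, the K_Means declaration in AI_functions.hpp corresponds to standard Lloyd's-algorithm clustering; its implementation is not part of this diff. A sketch of the assignment step that produces the 0/1 labels consumed by the training thread (illustrative only; assumes one sample per row):

    #include <limits>
    #include <vector>

    // Assignment step of Lloyd's algorithm: label every sample with the index
    // of its nearest centroid (squared Euclidean distance).
    std::vector<int> assign_clusters(const std::vector<std::vector<double>>& samples,
                                     const std::vector<std::vector<double>>& centroids) {
      std::vector<int> labels(samples.size(), 0);
      for (size_t i = 0; i < samples.size(); i++) {
        double best = std::numeric_limits<double>::max();
        for (size_t c = 0; c < centroids.size(); c++) {
          double dist = 0.0;
          for (size_t d = 0; d < samples[i].size(); d++) {
            const double diff = samples[i][d] - centroids[c][d];
            dist += diff * diff;
          }
          if (dist < best) {
            best = dist;
            labels[i] = static_cast<int>(c);
          }
        }
      }
      return labels;
    }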