feat: input script option to use k means, fill training data buffer accordingly

This commit is contained in:
straile 2024-10-26 13:33:01 +02:00
parent b4d093d205
commit 361b34d11d
6 changed files with 114 additions and 80 deletions

View File

@ -78,8 +78,7 @@ master_iteration_end <- function(setup, state_T, state_C) {
ai_surrogate_info <- list(
prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
predictions = if (exists("predictions")) predictions else NULL,
n_training_runs = if(exists("n_training_runs")) n_training_runs else NULL
predictions = if (exists("predictions")) predictions else NULL
)
SaveRObj(x = list(

View File

@ -17,7 +17,7 @@ def initiate_model(model_file_path):
return model
def prediction_step(model, x, batch_size, cluster_labels):
# TODO PREDICT ACCORDING TO CLUSTER
prediction = model.predict(x, batch_size)
return np.array(prediction, dtype=np.float64)
@ -27,15 +27,14 @@ def get_weights(model):
weights = model.get_weights()
return weights
def training_step(model, x, y, cluster_labels, batch_size, epochs, output_file_path):
def training_step(model, x, y, batch_size, epochs,
train_cluster, output_file_path):
# Check clustering of input data
# and only train for the cluster where nothing is happening
cluster_labels = np.array(cluster_labels, dtype=bool)
x = x[cluster_labels]
y = y[cluster_labels]
print("SUM CLABEL: " + str(sum(cluster_labels)), flush=True)
print("Data size is: " + str(len(x)), flush=True)
# TODO TRAIN ACCORDING TO CLUSTER
print("Training cluster: " + str(train_cluster), flush=True)
print("Training data size is: " + str(len(x)), flush=True)
history = model.fit(x, y,
epochs=epochs,

View File

@ -44,24 +44,27 @@ int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir
* a variable "model_file_path" in the R input script
* @return 0 if function was succesful
*/
int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction) {
int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction, bool use_clustering) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Initialize Keras model for the reaction cluster
int py_model_loaded = PyRun_SimpleString(("model_reaction = initiate_model(\"" +
// Initialize Keras default model
int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" +
model_reaction + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print(); // Ensure that python errors make it to stdout
throw std::runtime_error("Keras model could not be loaded from: " + model_reaction);
}
// Initialize Keras model for the no reaction cluster
if (use_clustering) {
// Initialize second Keras model that will be used for the no-reaction cluster
py_model_loaded = PyRun_SimpleString(("model_no_reaction = initiate_model(\"" +
model_no_reaction + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print(); // Ensure that python errors make it to stdout
PyErr_Print();
throw std::runtime_error("Keras model could not be loaded from: " + model_no_reaction);
}
}
// Release the Python GIL
PyGILState_Release(gstate);
return py_model_loaded;
@ -256,14 +259,14 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
// Get the model and training function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step");
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOiO)",
py_keras_model, py_df_x, batch_size, py_cluster_list);
// Call the Python training function
PyObject *py_predictions = PyObject_CallObject(py_inference_function, args);
PyObject* py_predictions = PyObject_CallObject(py_inference_function, args);
// Check py_rv and return as 2D vector
std::vector<double> predictions = numpy_array_to_vector(py_predictions);
@ -277,7 +280,6 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
// Release the Python GIL
PyGILState_Release(gstate);
return predictions;
}
@ -413,28 +415,21 @@ void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& ne
* @param params Global runtime paramters
*/
void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::vector<double>>& y,
std::vector<int>& cluster_labels, const RuntimeParameters& params) {
int train_cluster, const RuntimeParameters& params) {
// Prepare data for python
PyObject* py_df_x = vector_to_numpy_array(x);
PyObject* py_df_y = vector_to_numpy_array(y);
// Prepare cluster label vector for Python
PyObject* py_cluster_list = PyList_New(cluster_labels.size());
for (size_t i = 0; i < cluster_labels.size(); i++) {
PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
PyList_SET_ITEM(py_cluster_list, i, py_int);
}
// Get the model and training function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOOiis)",
py_keras_model, py_df_x, py_df_y, py_cluster_list, params.batch_size,
params.training_epochs, params.save_model_path.c_str());
PyObject* args = Py_BuildValue("(OOOiiis)",
py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs,
train_cluster, params.save_model_path.c_str());
// Call the Python training function
@ -487,36 +482,48 @@ void parallel_training(EigenModel* Eigen_model,
// Get the necessary training data
std::cout << "AI: Training thread: Getting training data" << std::endl;
// Initialize training data input and targets
std::vector<std::vector<double>> inputs(9);
std::vector<std::vector<double>> targets(9);
for (int col = 0; col < training_data_buffer->x.size(); col++) {
// Copy data from the front of the buffer to the training inputs
std::copy(training_data_buffer->x[col].begin(),
training_data_buffer->x[col].begin() + params.training_data_size,
std::back_inserter(inputs[col]));
// Remove copied data from the front of the buffer
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin(),
training_data_buffer->x[col].begin() + params.training_data_size);
std::vector<std::vector<double>> inputs(9, std::vector<double>(params.training_data_size));
std::vector<std::vector<double>> targets(9, std::vector<double>(params.training_data_size));
// Copy data from the front of the buffer to the training targets
std::copy(training_data_buffer->y[col].begin(),
training_data_buffer->y[col].begin() + params.training_data_size,
std::back_inserter(targets[col]));
// Remove copied data from the front of the buffer
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(),
training_data_buffer->y[col].begin() + params.training_data_size);
int buffer_size = training_data_buffer->x[0].size();
// If clustering is used, check the current cluster
int n_cluster_reactive;
int train_cluster = -1; // Default value for non clustered training where all data is used
if (params.use_k_means_clustering) {
for (int i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer->cluster_labels[i];
}
train_cluster = n_cluster_reactive >= params.training_data_size;
}
for (int col = 0; col < training_data_buffer->x.size(); col++) {
int buffer_row = 0;
int copied_row = 0;
while (copied_row < params.training_data_size &&
buffer_row < training_data_buffer->x[col].size()) {
if ((train_cluster == -1) ||
(train_cluster == training_data_buffer->cluster_labels[buffer_row])) {
// Copy and remove from training data buffer
inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin() +
buffer_row);
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin() +
buffer_row);
// Copy and remove from cluster label buffer
if (params.use_k_means_clustering && col == 0) {
training_data_buffer->cluster_labels.erase(
training_data_buffer->cluster_labels.begin() + buffer_row);
}
copied_row++;
} else {
buffer_row++;
}
}
}
// Initialize training data buffer labels
std::vector<int> cluster_labels(training_data_buffer->cluster_labels.begin(),
training_data_buffer->cluster_labels.begin() +
params.training_data_size);
// Remove copied values from the front of the buffer
training_data_buffer->cluster_labels.erase(training_data_buffer->cluster_labels.begin(),
training_data_buffer->cluster_labels.begin() +
params.training_data_size);
// Set the waiting predicate to false if buffer is below threshold
*start_training = training_data_buffer->y[0].size() >= params.training_data_size;
*start_training = (n_cluster_reactive >= params.training_data_size) ||
(buffer_size - n_cluster_reactive >= params.training_data_size);
//update number of training runs
training_data_buffer->n_training_runs += 1;
// Unlock the training_data_buffer_mutex
@ -527,7 +534,7 @@ void parallel_training(EigenModel* Eigen_model,
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Start training
Python_Keras_train(inputs, targets, cluster_labels, params);
Python_Keras_train(inputs, targets, train_cluster, params);
if (!params.use_Keras_predictions) {
std::cout << "AI: Training thread: Update shared model weights" << std::endl;
@ -611,7 +618,7 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights() {
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
PyObject* py_get_weights_function = PyDict_GetItemString(py_global_dict, "get_weights");
PyObject* args = Py_BuildValue("(O)", py_keras_model);

View File

@ -5,10 +5,13 @@
* @version 0.1
* @date 01 Nov 2024
*
* This file implements the creation of a DHT by using the MPI
* one-sided-communication. There is also the possibility to write or read data
* from or to the DHT. In addition, the current state of the DHT can be written
* to a file and read in again later.
* This file implements functions to train and predict with a neural network.
* All functions are based on a user supplied Keras model. Pyhton.h is used for model
* training with Keras and can be used for inference. The default inference funtion
* is implemented with Eigen matrices in C++. All functions use 2 different models
* to process data separately according to a K-means cluster assignement. This file
* alo contains the functions for the K-means algorithm.
*
*/
#ifndef AI_FUNCTIONS_H
@ -18,7 +21,8 @@
#include <vector>
#include "poet.hpp"
// PhreeqC definition of pi clashes with Eigen macros so we have to temporarily undef it
// PhreeqC definition of pi clashes with Eigen macros
// so we have to temporarily undef it
#pragma push_macro("pi")
#undef pi
#include <Eigen/Dense>
@ -27,8 +31,15 @@
namespace poet {
struct EigenModel {
// The first model will be used for all values if clustering is disabled
// or for the reactive part of the field if clustering is enabled
std::vector<Eigen::MatrixXd> weight_matrices;
std::vector<Eigen::VectorXd> biases;
// The other model will be used for the non-reactive cluster
// (if clustering is enabled)
std::vector<Eigen::MatrixXd> weight_matrices_no_reaction;
std::vector<Eigen::VectorXd> biases_no_reaction;
};
struct TrainingData {
@ -47,7 +58,8 @@ void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_bu
std::condition_variable* training_data_buffer_full, bool* start_training,
bool* end_training);
int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction);
int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction,
bool use_clustering);
std::vector<int> K_Means(std::vector<std::vector<double>>& field, int k, int maxIterations = 100);
@ -80,7 +92,7 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
#else
inline void Python_Keras_setup(std::string, std::string){}
inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){}
inline void Python_Keras_load_model(std::string, std::string){}
inline void Python_Keras_load_model(std::string, std::string, bool){}
inline std::vector<int> K_Means(std::vector<std::vector<double>>&, int, int) {return {};}
inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int,
std::vector<int>&){return {};}

View File

@ -295,12 +295,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
std::condition_variable training_data_buffer_full;
bool start_training, end_training;
TrainingData training_data_buffer;
std::vector<int> cluster_labels(chem.getField().GetRequestedVecSize());
std::vector<int> cluster_labels(chem.getField().GetRequestedVecSize(), -1);
if (params.use_ai_surrogate) {
MSG("AI: Initialize model");
// Initiate two models from one file TODO: Expand this for two input files
Python_Keras_load_model(R["model_file_path"], R["model_file_path"]);
Python_Keras_load_model(R["model_file_path"], R["model_file_path"],
params.use_k_means_clustering);
if (!params.disable_training) {
MSG("AI: Initialize training thread");
Python_Keras_training_thread(&Eigen_model, &Eigen_model_mutex,
@ -367,7 +368,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
std::vector<std::vector<double>> predictors_scaled = R["predictors_scaled"];
// Get K-Means cluster assignements based on the preprocessed data
if (params.use_k_means_clustering) {
cluster_labels = K_Means(predictors_scaled, 2, 300);
}
/*
int size = (int)(std::sqrt(chem.getField().GetRequestedVecSize()));
@ -448,15 +451,25 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
training_data_buffer_mutex.lock();
training_data_buffer_append(training_data_buffer.x, invalid_x);
training_data_buffer_append(training_data_buffer.y, invalid_y);
// If clustering is used, count the size of the buffer according
// to the cluster assignements
int n_cluster_reactive = 0;
int buffer_size = training_data_buffer.x[0].size();
if (params.use_k_means_clustering) {
cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
R["validity_vector"]);
// Signal to training thread if training data buffer is full
if (training_data_buffer.y[0].size() >= params.training_data_size) {
for (int i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer.cluster_labels[i];
}
}
// Signal to the training thread if enough training data is present
if ((n_cluster_reactive >= params.training_data_size) ||
(buffer_size - n_cluster_reactive >= params.training_data_size)) {
start_training = true;
training_data_buffer_full.notify_one();
}
training_data_buffer_mutex.unlock();
R["n_training_runs"] = training_data_buffer.n_training_runs;
}
diffusion.getField().update(chem.getField());
@ -683,6 +696,9 @@ int main(int argc, char *argv[]) {
run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
std::cout << "AI: Model will be saved as \"" << run_params.save_model_path << "\"" << std::endl;
}
if (Rcpp::as<bool>(R.parseEval("exists(\"use_k_means_clustering\")"))) {
run_params.use_k_means_clustering = R["use_k_means_clustering"];
}
MSG("AI: Initialize Python for AI surrogate functions");
std::string python_keras_file = std::string(SRC_DIR) +

View File

@ -72,6 +72,7 @@ struct RuntimeParameters {
/*AI surriogate configuration*/
bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate
bool disable_training; // Can be set in the R input script
bool use_k_means_clustering; // Can be set in the R input script
bool use_Keras_predictions; // Can be set in the R input script
int batch_size; // Can be set in the R input script
int training_epochs; // Can be set in the R input script