Mirror of https://git.gfz-potsdam.de/naaice/poet.git, synced 2025-12-16 04:48:23 +01:00
feat: cluster labels in training data buffer
parent 51b3608b68
commit b4d093d205
@@ -16,7 +16,9 @@ def initiate_model(model_file_path):
     model = tf.keras.models.load_model(model_file_path)
     return model
 
-def prediction_step(model, x, batch_size):
+def prediction_step(model, x, batch_size, cluster_labels):
+
+
 
     prediction = model.predict(x, batch_size)
     return np.array(prediction, dtype=np.float64)
@@ -25,21 +27,14 @@ def get_weights(model):
     weights = model.get_weights()
     return weights
 
-def training_step(model, x, y, batch_size, epochs, output_file_path):
+def training_step(model, x, y, cluster_labels, batch_size, epochs, output_file_path):
     # Check clustering of input data
     # and only train for the cluster where nothing is happening
-    labels = k_means(x)
-    n = int(np.sqrt(len(labels)))
-    for row in range(n):
-        row_values = []
-        for col in range(n):
-            row_values.append(labels[((n - (row + 1)) * n) + col])
-        print("".join(map(str, row_values)), flush=True)
-
-    x = x[labels==labels[-1]]
-    y = y[labels==labels[-1]]
+    cluster_labels = np.array(cluster_labels, dtype=bool)
+    x = x[cluster_labels]
+    y = y[cluster_labels]
 
-    print("Relevant Cluster is: " + str(labels[-1]), flush=True)
+    print("SUM CLABEL: " + str(sum(cluster_labels)), flush=True)
     print("Data size is: " + str(len(x)), flush=True)
 
     history = model.fit(x, y,
@@ -154,7 +154,7 @@ std::vector<vector<double>> calculate_new_clusters(const std::vector<std::vector
  * @param iterations The number of cluster update steps
  * @return A vector that contains the assigned cluster for each of the rows in field
  */
-std::vector<int> kMeans(std::vector<std::vector<double>>& field, int k, int iterations) {
+std::vector<int> K_Means(std::vector<std::vector<double>>& field, int k, int iterations) {
   // Initialize cluster centers by selecting random points from the field
   srand(time(0));
   std::vector<vector<double>> clusters;
@@ -239,12 +239,19 @@ std::vector<double> numpy_array_to_vector(PyObject* py_array) {
  * @return Predictions that the neural network made from the input values x. The predictions are
  * represented as a vector similar to the representation from the Field.AsVector() method
  */
-std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size) {
+std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
+                                         std::vector<int>& cluster_labels) {
   // Acquire the Python GIL
   PyGILState_STATE gstate = PyGILState_Ensure();
 
-  // Prepare data for python
+  // Prepare data for Python
   PyObject* py_df_x = vector_to_numpy_array(x);
 
+  // Prepare cluster label vector for Python
+  PyObject* py_cluster_list = PyList_New(cluster_labels.size());
+  for (size_t i = 0; i < cluster_labels.size(); i++) {
+    PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
+    PyList_SET_ITEM(py_cluster_list, i, py_int);
+  }
+
   // Get the model and training function from the global python interpreter
   PyObject* py_main_module = PyImport_AddModule("__main__");
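Editor's aside: the list-building loop above is duplicated verbatim in Python_Keras_train further down. A minimal sketch of a shared helper, using only the CPython API already in play here; the name int_vector_to_py_list and the error handling are illustrative, not part of the commit:

#include <Python.h>
#include <vector>

// Convert a C++ int vector to a new Python list (new reference, or NULL on error)
static PyObject* int_vector_to_py_list(const std::vector<int>& values) {
  PyObject* py_list = PyList_New(static_cast<Py_ssize_t>(values.size()));
  if (py_list == NULL) {
    return NULL;  // Allocation failed; a Python exception is already set
  }
  for (size_t i = 0; i < values.size(); i++) {
    PyObject* py_int = PyLong_FromLong(values[i]);
    if (py_int == NULL) {
      Py_DECREF(py_list);  // Discard the partially filled list
      return NULL;
    }
    PyList_SET_ITEM(py_list, i, py_int);  // Steals the reference to py_int
  }
  return py_list;
}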
@@ -252,8 +259,8 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
   PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model_no_reaction");
   PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step");
   // Build the function arguments as a tuple of Python objects and an integer
-  PyObject* args = Py_BuildValue("(OOi)",
-                                 py_keras_model, py_df_x, batch_size);
+  PyObject* args = Py_BuildValue("(OOiO)",
+                                 py_keras_model, py_df_x, batch_size, py_cluster_list);
 
   // Call the Python inference function
   PyObject *py_predictions = PyObject_CallObject(py_inference_function, args);
@@ -264,6 +271,7 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
   // Clean up
   PyErr_Print(); // Ensure that Python errors make it to stdout
   Py_XDECREF(py_df_x);
+  Py_XDECREF(py_cluster_list);
   Py_XDECREF(args);
   Py_XDECREF(py_predictions);
@@ -304,7 +312,7 @@ Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<const Eigen::MatrixXd>&
  * represented as a vector similar to the representation from the Field.AsVector() method
  */
 std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>>& x, int batch_size,
-                                  std::mutex* Eigen_model_mutex) {
+                                  std::mutex* Eigen_model_mutex, std::vector<int>& cluster_labels) {
   // Convert input data to Eigen matrix
   const int num_samples = x[0].size();
   const int num_features = x.size();
@@ -361,6 +369,42 @@ void training_data_buffer_append(std::vector<std::vector<double>>& training_data
     }
 }
 
+/**
+ * @brief Appends data from one int vector to another based on a mask vector
+ * @param labels_buffer Vector that the values are appended to
+ * @param new_labels Values that are appended
+ * @param validity Mask vector that defines how many and which values are appended
+ */
+void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& new_labels,
+                           std::vector<int> validity) {
+  // Calculate new buffer size from number of valid elements in mask
+  int n_invalid = validity.size();
+  for (int i = 0; i < validity.size(); i++) {
+    n_invalid -= validity[i];
+  }
+
+  // Interpret the reactive cluster as the one at the origin of the field
+  // TODO: Is that always correct?
+  int reactive_cluster = new_labels[0];
+  // Resize label vector to hold the non-valid elements
+  int end_index = labels_buffer.size();
+  int new_size = end_index + n_invalid;
+  labels_buffer.resize(new_size);
+  // Iterate over mask to transfer cluster labels
+  for (int i = 0; i < validity.size(); ++i) {
+    // Append only the labels of invalid rows
+    if (!validity[i]) {
+      int label = new_labels[i];
+      // Always define the reactive cluster as cluster 1
+      if (reactive_cluster == 0) {
+        label = 1 - label;
+      }
+      labels_buffer[end_index] = label;
+      end_index++;
+    }
+  }
+}
+
 
 /**
  * @brief Uses the Python environment with Keras' default functions to train the model
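Editor's aside: a standalone check of the behaviour documented above, as a minimal sketch assuming cluster_labels_append is linked in; the input values are made up for illustration:

#include <cassert>
#include <vector>

int main() {
  std::vector<int> buffer;                      // Label buffer, initially empty
  std::vector<int> new_labels = {0, 0, 1, 1};   // new_labels[0] == 0, so cluster 0 is reactive
  std::vector<int> validity   = {1, 0, 1, 0};   // Rows 1 and 3 had invalid predictions

  cluster_labels_append(buffer, new_labels, validity);

  // Only the two invalid rows are appended, and because the reactive cluster
  // was 0, each label is flipped so the reactive cluster is stored as 1.
  assert(buffer.size() == 2);
  assert(buffer[0] == 1);  // flipped from 0
  assert(buffer[1] == 0);  // flipped from 1
  return 0;
}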
@@ -369,11 +413,18 @@ void training_data_buffer_append(std::vector<std::vector<double>>& training_data
  * @param params Global runtime parameters
  */
 void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::vector<double>>& y,
-                        const RuntimeParameters& params) {
+                        std::vector<int>& cluster_labels, const RuntimeParameters& params) {
   // Prepare data for Python
   PyObject* py_df_x = vector_to_numpy_array(x);
   PyObject* py_df_y = vector_to_numpy_array(y);
 
+  // Prepare cluster label vector for Python
+  PyObject* py_cluster_list = PyList_New(cluster_labels.size());
+  for (size_t i = 0; i < cluster_labels.size(); i++) {
+    PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
+    PyList_SET_ITEM(py_cluster_list, i, py_int);
+  }
+
   // Get the model and training function from the global python interpreter
   PyObject* py_main_module = PyImport_AddModule("__main__");
   PyObject* py_global_dict = PyModule_GetDict(py_main_module);
@@ -381,9 +432,9 @@ void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::ve
   PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");
 
   // Build the function arguments as Python objects, two integers and a string
-  PyObject* args = Py_BuildValue("(OOOiis)",
-                                 py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs,
-                                 params.save_model_path.c_str());
+  PyObject* args = Py_BuildValue("(OOOOiis)",
+                                 py_keras_model, py_df_x, py_df_y, py_cluster_list, params.batch_size,
+                                 params.training_epochs, params.save_model_path.c_str());
 
 
   // Call the Python training function
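Editor's aside: in a Py_BuildValue format string, O packs a PyObject* (incrementing its refcount), i an int, and s a C string, so "(OOOOiis)" matches the seven parameters of the new training_step signature. A self-contained sketch with made-up values:

#include <Python.h>

// Illustrative only: mirrors the argument tuple built for training_step.
// Returns a new reference that the caller must release with Py_XDECREF.
PyObject* build_training_args(PyObject* model, PyObject* x, PyObject* y, PyObject* labels) {
  return Py_BuildValue("(OOOOiis)",
                       model, x, y, labels,
                       /* batch_size */ 32,
                       /* epochs */ 10,
                       /* output_file_path */ "model_checkpoint.keras");
}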
@@ -433,7 +484,6 @@ void parallel_training(EigenModel* Eigen_model,
   if (*end_training) {
     return;
   }
-
   // Get the necessary training data
   std::cout << "AI: Training thread: Getting training data" << std::endl;
   // Initialize training data input and targets
@@ -456,7 +506,16 @@ void parallel_training(EigenModel* Eigen_model,
     training_data_buffer->y[col].erase(training_data_buffer->y[col].begin(),
                                        training_data_buffer->y[col].begin() + params.training_data_size);
   }
+  // Initialize training data buffer labels
+  std::vector<int> cluster_labels(training_data_buffer->cluster_labels.begin(),
+                                  training_data_buffer->cluster_labels.begin() +
+                                  params.training_data_size);
+  // Remove copied values from the front of the buffer
+  training_data_buffer->cluster_labels.erase(training_data_buffer->cluster_labels.begin(),
+                                             training_data_buffer->cluster_labels.begin() +
+                                             params.training_data_size);
+
   // Set the waiting predicate to false if buffer is below threshold
   *start_training = training_data_buffer->y[0].size() >= params.training_data_size;
   // Update number of training runs
   training_data_buffer->n_training_runs += 1;
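Editor's aside: the copy-then-erase idiom above now appears for x, y and cluster_labels alike. A sketch of how it could be factored out; take_front is a hypothetical helper, not part of the commit:

#include <vector>

// Move the first n elements out of buffer and return them
template <typename T>
std::vector<T> take_front(std::vector<T>& buffer, std::size_t n) {
  std::vector<T> front(buffer.begin(), buffer.begin() + n);
  buffer.erase(buffer.begin(), buffer.begin() + n);
  return front;
}

// Usage, mirroring the hunk above:
//   std::vector<int> cluster_labels =
//       take_front(training_data_buffer->cluster_labels, params.training_data_size);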
@@ -467,9 +526,8 @@ void parallel_training(EigenModel* Eigen_model,
 
   // Acquire the Python GIL
   PyGILState_STATE gstate = PyGILState_Ensure();
-
   // Start training
-  Python_Keras_train(inputs, targets, params);
+  Python_Keras_train(inputs, targets, cluster_labels, params);
 
   if (!params.use_Keras_predictions) {
     std::cout << "AI: Training thread: Update shared model weights" << std::endl;
@@ -508,7 +566,6 @@ int Python_Keras_training_thread(EigenModel* Eigen_model,
                                  std::condition_variable* training_data_buffer_full,
                                  bool* start_training, bool* end_training,
                                  const RuntimeParameters& params) {
-
   PyThreadState *_save = PyEval_SaveThread();
   python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_mutex,
                                     training_data_buffer, training_data_buffer_mutex,
@@ -580,6 +637,7 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights() {
     PyArrayObject* weight_np = reinterpret_cast<PyArrayObject*>(py_weight_array);
     int dtype = PyArray_TYPE(weight_np);
 
+    // If array is 2D it's a weight matrix
     if (PyArray_NDIM(weight_np) == 2) {
       int num_rows = PyArray_DIM(weight_np, 0);
       int num_cols = PyArray_DIM(weight_np, 1);
@@ -605,8 +663,8 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights() {
       }
       cpp_weights.push_back(weight_matrix);
 
+    // If array is 1D it's a bias vector
     } else if (PyArray_NDIM(weight_np) == 1) {
-      // 1D bias vectors
       int num_elements = PyArray_DIM(weight_np, 0);
       std::vector<std::vector<double>> bias_vector(1, std::vector<double>(num_elements));
@@ -625,8 +683,6 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights() {
         throw std::runtime_error("Unsupported data type for biases. Must be NPY_FLOAT32 or NPY_DOUBLE.");
       }
       cpp_weights.push_back(bias_vector);
-    } else {
-      throw std::runtime_error("Unsupported weight dimension. Only 1D and 2D arrays are supported.");
     }
   }
   // Clean up
@@ -34,6 +34,7 @@ struct EigenModel {
 struct TrainingData {
   std::vector<std::vector<double>> x;
   std::vector<std::vector<double>> y;
+  std::vector<int> cluster_labels;
   int n_training_runs = 0;
 };
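Editor's aside: with cluster_labels added, the buffers in TrainingData are presumably expected to stay row-aligned (one label per buffered sample). A debug-only sketch of that invariant, assuming this header is included; the helper name is illustrative, not part of the commit:

#include <cassert>

inline void assert_training_buffer_aligned(const poet::TrainingData& buffer) {
  // y holds one column per output variable; every column must have
  // exactly one entry per buffered cluster label
  for (const auto& column : buffer.y) {
    assert(column.size() == buffer.cluster_labels.size());
  }
}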
@@ -43,17 +44,22 @@ struct TrainingData {
 int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir);
 
 void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
-                     std::condition_variable* training_data_buffer_full, bool* start_training, bool* end_training);
+                     std::condition_variable* training_data_buffer_full, bool* start_training,
+                     bool* end_training);
 
 int Python_Keras_load_model(std::string model_reaction, std::string model_no_reaction);
 
-std::vector<int> kMeans(std::vector<std::vector<double>>& field, int k, int maxIterations = 100);
+std::vector<int> K_Means(std::vector<std::vector<double>>& field, int k, int maxIterations = 100);
 
-std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size);
+std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
+                                         std::vector<int>& cluster_labels);
 
 void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
                                  std::vector<std::vector<double>>& new_values);
 
+void cluster_labels_append(std::vector<int>& labels, std::vector<int>& new_labels,
+                           std::vector<int> validity);
+
 int Python_Keras_training_thread(EigenModel* Eigen_model,
                                  std::mutex* Eigen_model_mutex,
                                  TrainingData* training_data_buffer,
@@ -66,17 +72,21 @@ void update_weights(EigenModel* model, const std::vector<std::vector<std::vector
 
 std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights();
 
-std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>>& x, int batch_size,
-                                  std::mutex* Eigen_model_mutex);
+std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>>& x,
+                                  int batch_size, std::mutex* Eigen_model_mutex,
+                                  std::vector<int>& cluster_labels);
 
 // Otherwise, define the necessary stubs
 #else
 inline void Python_Keras_setup(std::string, std::string){}
 inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){}
 inline void Python_Keras_load_model(std::string, std::string){}
-inline std::vector<int> kMeans(std::vector<std::vector<double>>&, int, int) {return {};}
-inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int){return {};}
-inline void training_data_buffer_append(std::vector<std::vector<double>>&, std::vector<std::vector<double>>&){}
+inline std::vector<int> K_Means(std::vector<std::vector<double>>&, int, int) {return {};}
+inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int,
+                                                std::vector<int>&){return {};}
+inline void training_data_buffer_append(std::vector<std::vector<double>>&,
+                                        std::vector<std::vector<double>>&){}
+inline void cluster_labels_append(std::vector<int>&, std::vector<int>&, std::vector<int>){}
 inline int Python_Keras_training_thread(EigenModel*, std::mutex*,
                                         TrainingData*, std::mutex*,
                                         std::condition_variable*,
@@ -84,7 +94,8 @@ inline int Python_Keras_training_thread(EigenModel*, std::mutex*,
 
 inline void update_weights(EigenModel*, const std::vector<std::vector<std::vector<double>>>&){}
 inline std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(){return {};}
-inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>&, int, std::mutex*){return {};}
+inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>&, int,
+                                         std::mutex*, std::vector<int>&){return {};}
 #endif
 } // namespace poet
src/poet.cpp (25 changed lines)
@@ -295,6 +295,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   std::condition_variable training_data_buffer_full;
   bool start_training, end_training;
   TrainingData training_data_buffer;
+  std::vector<int> cluster_labels(chem.getField().GetRequestedVecSize());
   if (params.use_ai_surrogate) {
     MSG("AI: Initialize model");
@@ -365,30 +366,28 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
     R.parseEval("predictors_scaled <- preprocess(predictors)");
     std::vector<std::vector<double>> predictors_scaled = R["predictors_scaled"];
 
-
-
-
-    // get k means
-    MSG("KMEANSSSSS:");
-    std::vector<int> labels = kMeans(predictors_scaled, 2, 300);
+    // Get K-Means cluster assignments based on the preprocessed data
+    cluster_labels = K_Means(predictors_scaled, 2, 300);
     /*
     int size = (int)(std::sqrt(chem.getField().GetRequestedVecSize()));
 
     MSG("SIZE: " + std::to_string(size));
 
     for (int row = size; row > 0; row--) {
       for (int column = 0; column < size; column++) {
-        std::cout << labels[((row - 1) * size) + column];
+        std::cout << cluster_labels[((row - 1) * size) + column];
       }
       std::cout << std::endl;
     }
     */
 
     MSG("AI: Predict");
     if (params.use_Keras_predictions) { // Predict with Keras default function
-      R["TMP"] = Python_Keras_predict(predictors_scaled, params.batch_size);
+      R["TMP"] = Python_Keras_predict(predictors_scaled, params.batch_size, cluster_labels);
 
     } else { // Predict with custom Eigen function
-      R["TMP"] = Eigen_predict(Eigen_model, predictors_scaled, params.batch_size, &Eigen_model_mutex);
+      R["TMP"] = Eigen_predict(Eigen_model, predictors_scaled, params.batch_size,
+                               &Eigen_model_mutex, cluster_labels);
     }
 
     // Apply postprocessing
@@ -438,8 +437,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
     if (params.use_ai_surrogate && !params.disable_training) {
       // Add values for which the predictions were invalid
      // to training data buffer
-      MSG("AI: Add invalid predictions to training data buffer");
-
+      MSG("AI: Add invalid predictions to training data buffer");
      std::vector<std::vector<double>> invalid_x =
          R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
@@ -450,8 +448,9 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
       training_data_buffer_mutex.lock();
       training_data_buffer_append(training_data_buffer.x, invalid_x);
       training_data_buffer_append(training_data_buffer.y, invalid_y);
+      cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
+                            R["validity_vector"]);
       // Signal to training thread if training data buffer is full
-
       if (training_data_buffer.y[0].size() >= params.training_data_size) {
         start_training = true;
         training_data_buffer_full.notify_one();
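Editor's aside: the handshake around start_training and training_data_buffer_full reduces to the standard condition-variable pattern. A minimal sketch, with the waiting side paraphrased from parallel_training rather than copied from the repo:

#include <condition_variable>
#include <mutex>

std::mutex training_data_buffer_mutex;
std::condition_variable training_data_buffer_full;
bool start_training = false;

// Producer (master loop): flag the full buffer, then wake the training thread
void signal_buffer_full() {
  {
    std::lock_guard<std::mutex> lock(training_data_buffer_mutex);
    start_training = true;
  }
  training_data_buffer_full.notify_one();
}

// Consumer (training thread): sleep until the predicate holds
void wait_for_training_data() {
  std::unique_lock<std::mutex> lock(training_data_buffer_mutex);
  training_data_buffer_full.wait(lock, [] { return start_training; });
}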