mirror of https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 04:48:23 +01:00

add training weight serializer functions

parent 13ad41d302, commit f76e438c30
@ -1,15 +1,21 @@
#include "AI_functions.hpp"
#include "Base/Macros.hpp"
#include "naaice_ap2.h"
#include "poet.hpp"
#include "serializer.hpp"
#include <Eigen/Dense>
#include <Eigen/src/Core/Matrix.h>
#include <Python.h>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <numpy/arrayobject.h>
#include <string>
#include <thread>
#include <vector>

using namespace std;
@ -21,18 +27,21 @@ namespace poet {
 * functions are defined
 * @return 0 if function was successful
 */
int Python_Keras_setup(std::string functions_file_path,
                       std::string cuda_src_dir) {
  // Initialize Python functions
  Py_Initialize();
  // Import numpy functions
  _import_array();
  PyRun_SimpleString(("cuda_dir = \"" + cuda_src_dir + "\"").c_str());
  FILE *fp = fopen(functions_file_path.c_str(), "r");
  int py_functions_initialized =
      PyRun_SimpleFile(fp, functions_file_path.c_str());
  fclose(fp);
  if (py_functions_initialized != 0) {
    PyErr_Print();
    throw std::runtime_error(
        std::string("AI surrogate Python functions could not be loaded.") +
        " Are tensorflow and numpy installed?");
  }
  return py_functions_initialized;
@ -44,25 +53,29 @@ int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir
 * a variable "model_file_path" in the R input script
 * @return 0 if function was successful
 */
int Python_Keras_load_model(std::string model, std::string model_reactive,
                            bool use_clustering) {
  // Acquire the Python GIL
  PyGILState_STATE gstate = PyGILState_Ensure();

  // Initialize Keras default model
  int py_model_loaded =
      PyRun_SimpleString(("model = initiate_model(\"" + model + "\")").c_str());
  if (py_model_loaded != 0) {
    PyErr_Print();
    throw std::runtime_error("Keras model could not be loaded from: " + model);
  }

  if (use_clustering) {
    // Initialize second Keras model that will be used for the "reaction"
    // cluster
    py_model_loaded = PyRun_SimpleString(
        ("model_reactive = initiate_model(\"" + model_reactive + "\")")
            .c_str());
    if (py_model_loaded != 0) {
      PyErr_Print();
      throw std::runtime_error("Keras model could not be loaded from: " +
                               model_reactive);
    }
  }
  // Release the Python GIL
@ -71,17 +84,17 @@ int Python_Keras_load_model(std::string model, std::string model_reactive, bool
}

/**
 * @brief Converts the std::vector 2D matrix representation of a POET Field
 * object to a numpy array for use in the Python AI surrogate functions
 * @param field 2D-Matrix with the content of a Field object
 * @return Numpy representation of the input vector
 */
PyObject *vector_to_numpy_array(const std::vector<std::vector<double>> &field) {
  npy_intp dims[2] = {static_cast<npy_intp>(field[0].size()),
                      static_cast<npy_intp>(field.size())};

  PyObject *np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64);
  double *data = static_cast<double *>(PyArray_DATA((PyArrayObject *)np_array));
  // write field data to numpy array
  for (size_t i = 0; i < field.size(); ++i) {
    for (size_t j = 0; j < field[i].size(); ++j) {
@ -94,28 +107,28 @@ PyObject* vector_to_numpy_array(const std::vector<std::vector<double>>& field) {
/**
 * @brief Converts a Python matrix object to a std::vector vector
 * @param py_matrix PyObject that must be a 2D matrix
 * @result Vector that can be used similar to the return value of the Field
 * object Field.AsVector() method.
 */
std::vector<double> numpy_array_to_vector(PyObject *py_array) {
  std::vector<double> result;
  if (!PyArray_Check(py_array)) {
    std::cerr << "The model's output is not a numpy array." << std::endl;
    return result;
  }
  // Cast generic PyObject to PyArrayObject
  PyArrayObject *np_array = reinterpret_cast<PyArrayObject *>(py_array);

  // Get shape
  int numDims = PyArray_NDIM(np_array);
  npy_intp *shape = PyArray_SHAPE(np_array);
  if (numDims != 2) {
    std::cerr << "The model's predictions are not a 2D matrix." << std::endl;
    return result;
  }

  // Copy data into std::vector format
  double *data = static_cast<double *>(PyArray_DATA(np_array));
  npy_intp size = PyArray_SIZE(np_array);
  result.resize(size);
  std::copy(data, data + size, result.begin());
@ -123,44 +136,52 @@ std::vector<double> numpy_array_to_vector(PyObject* py_array) {
}

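// Hedged usage sketch (not part of the commit): round-tripping a small field
// through the two converters above. Assumes Py_Initialize() and
// _import_array() have already run (see Python_Keras_setup()).
inline std::vector<double> numpy_roundtrip_demo() {
  std::vector<std::vector<double>> field = {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}};
  PyObject *np = vector_to_numpy_array(field);          // 2 x 3 numpy array
  std::vector<double> flat = numpy_array_to_vector(np); // empty on shape errors
  Py_XDECREF(np); // the converters do not manage the array's refcount
  return flat;
}
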
/**
 * @brief Uses the Python Keras functions to calculate predictions from a neural
 * network.
 * @param x 2D-Matrix with the content of a Field object
 * @param batch_size size for mini-batches that are used in the Keras
 * model.predict() method
 * @return Predictions that the neural network made from the input values x. The
 * predictions are represented as a vector similar to the representation from
 * the Field.AsVector() method
 */
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>> &x,
                                         int batch_size,
                                         std::vector<int> &cluster_labels) {
  // Acquire the Python GIL
  PyGILState_STATE gstate = PyGILState_Ensure();
  // Prepare data for Python
  PyObject *py_df_x = vector_to_numpy_array(x);

  // Prepare cluster label vector for Python
  PyObject *py_cluster_list = PyList_New(cluster_labels.size());
  for (size_t i = 0; i < cluster_labels.size(); i++) {
    PyObject *py_int = PyLong_FromLong(cluster_labels[i]);
    PyList_SET_ITEM(py_cluster_list, i, py_int);
  }

  // Get the model and inference function from the global python interpreter
  PyObject *py_main_module = PyImport_AddModule("__main__");
  PyObject *py_global_dict = PyModule_GetDict(py_main_module);
  PyObject *py_keras_model = PyDict_GetItemString(py_global_dict, "model");
  PyObject *py_inference_function =
      PyDict_GetItemString(py_global_dict, "prediction_step");

  // Get second model if clustering is used
  PyObject *py_keras_model_reactive = Py_None;
  if (cluster_labels.size() > 0) {
    py_keras_model_reactive =
        PyDict_GetItemString(py_global_dict, "model_reactive");
  }

  // Build the function arguments as four python objects and an integer
  PyObject *args =
      Py_BuildValue("(OOOOi)", py_keras_model, py_keras_model_reactive, py_df_x,
                    py_cluster_list, batch_size);

  // Call the Python inference function
  PyObject *py_predictions = PyObject_CallObject(py_inference_function, args);

  // Check py_predictions and return as 2D vector
  std::vector<double> predictions = numpy_array_to_vector(py_predictions);
@ -178,15 +199,19 @@ std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, in
}

/**
 * @brief Uses Eigen for fast inference with the weights and biases of a neural
 * network. This function assumes ReLU activation for each layer.
 * @param input_batch Batch of input data that must fit the size of the neural
 * network's input layer
 * @param model Struct of aligned Eigen vectors that hold the neural network's
 * weights and biases. Only supports simple fully connected feed forward
 * networks.
 * @return The batch of predictions made with the neural network weights and
 * biases and the data in input_batch
 */
Eigen::MatrixXd
eigen_inference_batched(const Eigen::Ref<const Eigen::MatrixXd> &input_batch,
                        const EigenModel &model) {
  Eigen::MatrixXd current_layer = input_batch;

  // Process all hidden layers
@ -198,31 +223,41 @@ Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<const Eigen::MatrixXd>&
  // Process output layer (without ReLU)
  size_t output_layer = model.weight_matrices.size() - 1;
  return (model.weight_matrices[output_layer] * current_layer).colwise() +
         model.biases[output_layer];
}

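// Hedged sketch (not part of the commit): a toy EigenModel with one hidden
// layer pushed through eigen_inference_batched(). Shapes follow the convention
// above: weight matrices are (out x in), samples are stored as columns.
inline Eigen::MatrixXd toy_inference_demo() {
  EigenModel toy;
  toy.weight_matrices.push_back(Eigen::MatrixXd::Random(4, 3)); // hidden: 3 -> 4
  toy.weight_matrices.push_back(Eigen::MatrixXd::Random(2, 4)); // output: 4 -> 2
  toy.biases.push_back(Eigen::VectorXd::Zero(4));
  toy.biases.push_back(Eigen::VectorXd::Zero(2));
  Eigen::MatrixXd batch = Eigen::MatrixXd::Random(3, 8); // 8 samples, 3 features
  return eigen_inference_batched(batch, toy);            // 2 x 8 predictions
}
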
/**
 * @brief Uses the Eigen representation of the two different Keras model weights
 * for fast inference
 * @param model The model for the non reactive cluster of the field (label 0)
 * @param model_reactive The model for the reactive cluster of the field
 * (label 1)
 * @param x 2D-Matrix with the content of a Field object
 * @param batch_size size for mini-batches that are used in the Keras
 * model.predict() method
 * @param Eigen_model_mutex Mutex that locks the model during inference and
 * prevents updates from the training thread
 * @param cluster_labels K-Means cluster label for each row in the field
 * @return Predictions that the neural network made from the input values x. The
 * predictions are represented as a vector similar to the representation from
 * the Field.AsVector() method
 */
std::vector<double> Eigen_predict_clustered(const EigenModel &model,
                                            const EigenModel &model_reactive,
                                            std::vector<std::vector<double>> &x,
                                            int batch_size,
                                            std::mutex *Eigen_model_mutex,
                                            std::vector<int> &cluster_labels) {
  const int num_samples = x[0].size();
  const int num_features = x.size();
  if (num_features != model.weight_matrices[0].cols() ||
      num_features != model_reactive.weight_matrices[0].cols()) {
    throw std::runtime_error(
        "Input data size " + std::to_string(num_features) +
        " does not match model input layer sizes " +
        std::to_string(model.weight_matrices[0].cols()) + " / " +
        std::to_string(model_reactive.weight_matrices[0].cols()));
  }

  // Convert input data to Eigen matrix
@ -259,13 +294,16 @@ std::vector<double> Eigen_predict_clustered(const EigenModel& model, const Eigen
  Eigen_model_mutex->lock();

  if (!cluster_0_indices.empty()) {
    int num_batches_0 =
        std::ceil(static_cast<double>(cluster_0_indices.size()) / batch_size);
    for (int batch = 0; batch < num_batches_0; ++batch) {
      int start_idx = batch * batch_size;
      int end_idx = std::min((batch + 1) * batch_size,
                             static_cast<int>(cluster_0_indices.size()));
      int current_batch_size = end_idx - start_idx;

      Eigen::MatrixXd batch_data =
          input_matrix.block(0, start_idx, num_features, current_batch_size);
      Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model);

      // Store results in their original positions
@ -280,14 +318,18 @@ std::vector<double> Eigen_predict_clustered(const EigenModel& model, const Eigen
  // Process cluster 1
  if (!cluster_1_indices.empty()) {
    int num_batches_1 =
        std::ceil(static_cast<double>(cluster_1_indices.size()) / batch_size);
    for (int batch = 0; batch < num_batches_1; ++batch) {
      int start_idx = batch * batch_size;
      int end_idx = std::min((batch + 1) * batch_size,
                             static_cast<int>(cluster_1_indices.size()));
      int current_batch_size = end_idx - start_idx;

      Eigen::MatrixXd batch_data = input_matrix_reactive.block(
          0, start_idx, num_features, current_batch_size);
      Eigen::MatrixXd batch_result =
          eigen_inference_batched(batch_data, model_reactive);

      // Store results in their original positions
      for (size_t i = 0; i < current_batch_size; ++i) {
@ -304,17 +346,22 @@ std::vector<double> Eigen_predict_clustered(const EigenModel& model, const Eigen
}

/**
 * @brief Uses the Eigen representation of the Keras model weights for fast
 * inference
 * @param model The model weights and biases
 * @param x 2D-Matrix with the content of a Field object
 * @param batch_size size for mini-batches that are used in the Keras
 * model.predict() method
 * @param Eigen_model_mutex Mutex that locks the model during inference and
 * prevents updates from the training thread
 * @return Predictions that the neural network made from the input values x. The
 * predictions are represented as a vector similar to the representation from
 * the Field.AsVector() method
 */
std::vector<double> Eigen_predict(const EigenModel &model,
                                  std::vector<std::vector<double>> x,
                                  int batch_size,
                                  std::mutex *Eigen_model_mutex) {
  // Convert input data to Eigen matrix
  const int num_samples = x[0].size();
  const int num_features = x.size();
@ -330,8 +377,9 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
  result.reserve(num_samples * num_features);

  if (num_features != model.weight_matrices[0].cols()) {
    throw std::runtime_error("Input data size " + std::to_string(num_features) +
                             " does not match model input layer of size " +
                             std::to_string(model.weight_matrices[0].cols()));
  }
  int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);

@ -342,32 +390,35 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vect
    int current_batch_size = end_idx - start_idx;
    // Extract the current input data batch
    Eigen::MatrixXd batch_data(num_features, current_batch_size);
    batch_data =
        full_input_matrix.block(0, start_idx, num_features, current_batch_size);
    // Predict
    batch_data = eigen_inference_batched(batch_data, model);

    result.insert(result.end(), batch_data.data(),
                  batch_data.data() + batch_data.size());
  }
  Eigen_model_mutex->unlock();
  return result;
}

/**
 * @brief Appends data from one matrix (column major
 * std::vector<std::vector<double>>) to another
 * @param training_data_buffer Matrix that the values are appended to
 * @param new_values Matrix that is appended
 */
void training_data_buffer_append(
    std::vector<std::vector<double>> &training_data_buffer,
    std::vector<std::vector<double>> &new_values) {
  // Initialize training data buffer if empty
  if (training_data_buffer.size() == 0) {
    training_data_buffer = new_values;
  } else { // otherwise append
    for (size_t col = 0; col < training_data_buffer.size(); col++) {
      training_data_buffer[col].insert(training_data_buffer[col].end(),
                                       new_values[col].begin(),
                                       new_values[col].end());
    }
  }
}
@ -376,9 +427,11 @@ void training_data_buffer_append(std::vector<std::vector<double>>& training_data
 * @brief Appends data from one int vector to another based on a mask vector
 * @param labels_buffer Vector that the values are appended to
 * @param new_labels Values that are appended
 * @param validity Mask vector that defines how many and which values are
 * appended
 */
void cluster_labels_append(std::vector<int> &labels_buffer,
                           std::vector<int> &new_labels,
                           std::vector<int> validity) {
  // Calculate new buffer size from number of valid elements in mask
  int n_invalid = validity.size();
@ -400,23 +453,26 @@ void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& ne
  }
}

/**
 * @brief Uses the Python environment with Keras' default functions to train the
 * model
 * @param x Training data features
 * @param y Training data targets
 * @param params Global runtime parameters
 */
void Python_Keras_train(std::vector<std::vector<double>> &x,
                        std::vector<std::vector<double>> &y, int train_cluster,
                        std::string model_name,
                        const RuntimeParameters &params) {
  // Prepare data for Python
  PyObject *py_df_x = vector_to_numpy_array(x);
  PyObject *py_df_y = vector_to_numpy_array(y);

  // Make sure that the model output file name is a .keras file
  std::string model_path = params.save_model_path;
  if (!model_path.empty()) {
    if (model_path.length() >= 6 &&
        model_path.substr(model_path.length() - 6) != ".keras") {
      model_path += ".keras";
    }
  }
@ -430,14 +486,16 @@ void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::ve
  }

  // Get the model and training function from the global python interpreter
  PyObject *py_main_module = PyImport_AddModule("__main__");
  PyObject *py_global_dict = PyModule_GetDict(py_main_module);
  PyObject *py_keras_model =
      PyDict_GetItemString(py_global_dict, model_name.c_str());
  PyObject *py_training_function =
      PyDict_GetItemString(py_global_dict, "training_step");

  // Build the function arguments as three python objects, two integers and a
  // string
  PyObject *args = Py_BuildValue("(OOOiis)", py_keras_model, py_df_x, py_df_y,
                                 params.batch_size, params.training_epochs,
                                 model_path.c_str());

  // Call the Python training function
@ -453,36 +511,45 @@ void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::ve
/**
 * @brief Function for threadsafe parallel training and weight updating.
 * The function waits conditionally until the training data buffer is full.
 * It then clears the buffer and starts training; after training it writes the
 * new weights to the Eigen model.
 * @param Eigen_model Pointer to the EigenModel struct that will be updated with
 * new weights
 * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel
 * struct
 * @param training_data_buffer Pointer to the Training data struct with which
 * the model is trained
 * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the
 * training data struct
 * @param training_data_buffer_full Conditional waiting variable with which the
 * main thread signals when a training run can start
 * @param start_training Conditional waiting predicate to mitigate against
 * spurious wakeups
 * @param end_training Signals end of program to wind down thread gracefully
 * @param params Global runtime parameters
 */
void parallel_training(EigenModel *Eigen_model,
                       EigenModel *Eigen_model_reactive,
                       std::mutex *Eigen_model_mutex,
                       TrainingData *training_data_buffer,
                       std::mutex *training_data_buffer_mutex,
                       std::condition_variable *training_data_buffer_full,
                       bool *start_training, bool *end_training,
                       const RuntimeParameters &params) {
  while (true) {
    // Conditional waiting:
    // - Sleeps until a signal arrives on training_data_buffer_full
    // - Releases the lock on training_data_buffer_mutex while sleeping
    // - Lambda function with start_training checks if it was a spurious wakeup
    // - Reacquires the lock on training_data_buffer_mutex after waking up
    // - If start_training has been set to true while the thread was active, it
    //   does NOT wait for a signal on training_data_buffer_full but starts the
    //   next round immediately.
    std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
    training_data_buffer_full->wait(
        lock, [start_training] { return *start_training; });
    // Return if program is about to end
    if (*end_training) {
      return;
@ -490,19 +557,30 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
    // Get the necessary training data
    std::cout << "AI: Training thread: Getting training data" << std::endl;
    // Initialize training data input and targets
    std::vector<std::vector<double>> inputs(
        training_data_buffer->x.size(),
        std::vector<double>(params.training_data_size));
    std::vector<std::vector<double>> targets(
        training_data_buffer->x.size(),
        std::vector<double>(params.training_data_size));

    fprintf(stdout, "x.size: %zu\n", training_data_buffer->x.size());
    fprintf(stdout, "params.training_data_size: %d\n", params.training_data_size);

    int buffer_size = training_data_buffer->x[0].size();
    fprintf(stdout, "training_data_buffer->x[0].size: %zu\n",
            training_data_buffer->x[0].size());
    sleep(10);

    // If clustering is used, check the current cluster
    int n_cluster_reactive = 0;
    int train_cluster =
        -1; // Default value for non clustered training (all data is used)
    if (params.use_clustering) {
      for (size_t i = 0; i < buffer_size; i++) {
        n_cluster_reactive += training_data_buffer->cluster_labels[i];
      }
      // ? set train_cluster to true only if all labels in training_data_buffer
      // have the corresponding cluster_label?
      train_cluster = n_cluster_reactive >= params.training_data_size;
    }
    int buffer_row = 0;
@ -514,10 +592,10 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
      // Copy and remove from training data buffer
      inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
      targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
      training_data_buffer->x[col].erase(
          training_data_buffer->x[col].begin() + buffer_row);
      training_data_buffer->y[col].erase(
          training_data_buffer->y[col].begin() + buffer_row);
    }
    // Remove from cluster label buffer
    if (params.use_clustering) {
@ -530,16 +608,20 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
      }
    }

    // Set the waiting predicate to immediately continue training if enough
    // elements of any cluster remain
    if (train_cluster == 1) {
      *start_training =
          ((n_cluster_reactive - params.training_data_size) >=
           params.training_data_size) ||
          ((buffer_size - n_cluster_reactive) >= params.training_data_size);
    } else {
      *start_training =
          (buffer_size - n_cluster_reactive - params.training_data_size) >=
          params.training_data_size;
    }

    // update number of training runs
    training_data_buffer->n_training_runs += 1;
    // Unlock the training_data_buffer_mutex
    lock.unlock();
@ -548,7 +630,8 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
    if (train_cluster == 1) {
      model_name = "model_reactive";
    }
    std::cout << "AI: Training thread: Start training " << model_name
              << std::endl;

    // Acquire the Python GIL
    PyGILState_STATE gstate = PyGILState_Ensure();
@ -556,8 +639,10 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
    Python_Keras_train(inputs, targets, train_cluster, model_name, params);

    if (!params.use_Keras_predictions) {
      std::cout << "AI: Training thread: Update shared model weights"
                << std::endl;
      std::vector<std::vector<std::vector<double>>> cpp_weights =
          Python_Keras_get_weights(model_name);
      Eigen_model_mutex->lock();
      if (train_cluster == 1) {
        update_weights(Eigen_model_reactive, cpp_weights);
@ -569,50 +654,157 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive
    // Release the Python GIL
    PyGILState_Release(gstate);
    std::cout << "AI: Training thread: Finished training, waiting for new data"
              << std::endl;
  }
}

void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
                  std::mutex *Eigen_model_mutex,
                  TrainingData *training_data_buffer,
                  std::mutex *training_data_buffer_mutex,
                  std::condition_variable *training_data_buffer_full,
                  bool *start_training, bool *end_training,
                  const RuntimeParameters &params, naa_handle *handle) {
  Eigen_model_mutex->lock();
  training_data_buffer_mutex->lock();
  size_t model_size = calculateStructSize(Eigen_model, 'E');
  // TODO: initialize models with weights from keras model
  std::string model_name = "model";
  std::vector<std::vector<std::vector<double>>> model_weight =
      Python_Keras_get_weights(model_name);
  update_weights(Eigen_model, model_weight);
  fprintf(stdout, "example weight: %f\n",
          Eigen_model->weight_matrices[0](0, 0));
  model_size = calculateStructSize(Eigen_model, 'E');

  fprintf(stdout, "size after: %zu\n", model_size);
  char *serializedData = (char *)calloc(model_size, sizeof(char));
  if (serializedData == NULL) {
    exit(EXIT_FAILURE);
  }
  int res = serializeModelWeights(Eigen_model, serializedData);

  struct naa_param_t weight_region[] = {{(void *)serializedData, model_size}};

  printf("-- Setting Up Connection --\n");
  if (naa_create(1, weight_region, 1, weight_region, 0, handle)) {
    fprintf(stderr, "Error during naa_create. Exiting.\n");
    exit(EXIT_FAILURE);
  }

  printf("-- RPC Invocation --\n");
  if (naa_invoke(handle)) {
    fprintf(stderr, "Error during naa_invoke. Exiting.\n");
    exit(EXIT_FAILURE);
  }

  // TODO: naa_wait with new weights

  EigenModel deserializedModel =
      deserializeModelWeights(serializedData, model_size);
  fprintf(stdout, "After deserialization: %f\n",
          deserializedModel.weight_matrices[0](0, 0));

  for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) {
    for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) {
      fprintf(stdout, "model: %f, deserializedModel: %f\n",
              Eigen_model->weight_matrices[0](i, j),
              deserializedModel.weight_matrices[0](i, j));
    }
  }

  free(serializedData);
  serializedData = nullptr;
  // naa_finalize: clean up connection.
  printf("-- Cleaning Up --\n");
  naa_finalize(handle);
  // TODO: determine the struct sizes of EigenModel and TrainData
  // TODO: initialize RDMA memory regions (two sections: model weights and
  //       training data)
  // TODO: establish connection to NAA server
  // TODO: if training buffer is full
  // TODO: serialize EigenModel and TrainData
  // TODO: run invoke call from naaice API
  // TODO: wait for results
  // TODO: update model weights (don't forget locking)
  // TODO: wait for next training buffer iteration

  Eigen_model_mutex->unlock();
  training_data_buffer_mutex->unlock();
}

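// Hedged sketch (not part of the commit): the producer side implied by the
// conditional waiting in parallel_training(). The main thread appends new
// training pairs under the buffer mutex, sets the predicate, and signals the
// worker; new_x and new_y are hypothetical batches in the buffer's
// column-major layout.
inline void signal_training_demo(TrainingData *buffer, std::mutex *buffer_mutex,
                                 std::condition_variable *buffer_full,
                                 bool *start_training,
                                 std::vector<std::vector<double>> &new_x,
                                 std::vector<std::vector<double>> &new_y) {
  {
    std::lock_guard<std::mutex> guard(*buffer_mutex);
    training_data_buffer_append(buffer->x, new_x);
    training_data_buffer_append(buffer->y, new_y);
    *start_training = true; // checked by the worker's wait() predicate
  }
  buffer_full->notify_one(); // wakes parallel_training() / naa_training()
}
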
std::thread python_train_thread;
std::thread naa_train_thread;
/**
 * @brief Starts a thread for parallel training and weight updating. This
 * wrapper function ensures that the main POET program can be built without
 * pthread support if the AI surrogate functions are disabled during
 * compilation.
 * @param Eigen_model Pointer to the EigenModel struct that will be updated with
 * new weights
 * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel
 * struct
 * @param training_data_buffer Pointer to the Training data struct with which
 * the model is trained
 * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the
 * training data struct
 * @param training_data_buffer_full Conditional waiting variable with which the
 * main thread signals when a training run can start
 * @param start_training Conditional waiting predicate to mitigate against
 * spurious wakeups
 * @param end_training Signals end of program to wind down thread gracefully
 * @param params Global runtime parameters
 * @return 0 if function was successful
 */
int Python_Keras_training_thread(
    EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
    std::mutex *Eigen_model_mutex, TrainingData *training_data_buffer,
    std::mutex *training_data_buffer_mutex,
    std::condition_variable *training_data_buffer_full, bool *start_training,
    bool *end_training, const RuntimeParameters &params, naa_handle *handle) {
  MSG("In Python_Keras_training_thread");
  PyThreadState *_save = PyEval_SaveThread(); // ?
  // Check if NAA is activated; if so, training runs on the NAA server
  if (params.use_naa) {
    if (!(handle == NULL)) {
      MSG("NAA Accelerator is used for online training");
      naa_train_thread = std::thread(
          naa_training, Eigen_model, Eigen_model_reactive, Eigen_model_mutex,
          training_data_buffer, training_data_buffer_mutex,
          training_data_buffer_full, start_training, end_training, params,
          handle);
    }
  } else {
    python_train_thread = std::thread(
        parallel_training, Eigen_model, Eigen_model_reactive, Eigen_model_mutex,
        training_data_buffer, training_data_buffer_mutex,
        training_data_buffer_full, start_training, end_training, params);
  }
  fprintf(stdout, "End of Python_Keras_training_thread\n");
  return 0;
}

/**
 * @brief Updates the EigenModel's weights and biases from the weight vector
 * @param model Pointer to an EigenModel struct
 * @param weights Vector of model weights from keras as returned by
 * Python_Keras_get_weights()
 */
void update_weights(
    EigenModel *model,
    const std::vector<std::vector<std::vector<double>>> &weights) {
  MSG("In update_weights");
  // ? check if updating was successful -> hash over values?
  size_t num_layers =
      weights.size() / 2; // half length because it contains weights and biases
  for (size_t i = 0; i < weights.size(); i += 2) {
    // Fill current weight matrix
    size_t rows = weights[i][0].size();
@ -630,23 +822,28 @@ void update_weights(EigenModel* model,
  }
}

/**
 * @brief Converts the weights and biases from the Python Keras model to C++
 * vectors
 * @return A vector containing the model weights and biases
 */
std::vector<std::vector<std::vector<double>>>
Python_Keras_get_weights(std::string model_name) {
  // Acquire the Python GIL
  fprintf(stdout, "In Python_Keras_get_weights\n");
  PyGILState_STATE gstate = PyGILState_Ensure();

  PyObject *py_main_module = PyImport_AddModule("__main__");
  PyObject *py_global_dict = PyModule_GetDict(py_main_module);
  PyObject *py_keras_model =
      PyDict_GetItemString(py_global_dict, model_name.c_str());
  PyObject *py_get_weights_function =
      PyDict_GetItemString(py_global_dict, "get_weights");
  PyObject *args = Py_BuildValue("(O)", py_keras_model);

  // Call Python function
  PyObject *py_weights_list =
      PyObject_CallObject(py_get_weights_function, args);

  if (!py_weights_list) {
    PyErr_Print();
@ -659,12 +856,13 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::stri
  // Iterate through the layers (weights and biases)
  Py_ssize_t num_layers = PyList_Size(py_weights_list);
  for (Py_ssize_t i = 0; i < num_layers; ++i) {
    PyObject *py_weight_array = PyList_GetItem(py_weights_list, i);
    if (!PyArray_Check(py_weight_array)) {
      throw std::runtime_error("Weight is not a NumPy array.");
    }

    PyArrayObject *weight_np =
        reinterpret_cast<PyArrayObject *>(py_weight_array);
    int dtype = PyArray_TYPE(weight_np);

    // If array is 2D it's a weight matrix
@ -672,45 +870,54 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::stri
      int num_rows = PyArray_DIM(weight_np, 0);
      int num_cols = PyArray_DIM(weight_np, 1);

      std::vector<std::vector<double>> weight_matrix(
          num_rows, std::vector<double>(num_cols));
      // Handle different precision settings
      if (dtype == NPY_FLOAT32) {
        float *weight_data_float =
            static_cast<float *>(PyArray_DATA(weight_np));
        for (size_t r = 0; r < num_rows; ++r) {
          for (size_t c = 0; c < num_cols; ++c) {
            weight_matrix[r][c] =
                static_cast<double>(weight_data_float[r * num_cols + c]);
          }
        }
      } else if (dtype == NPY_DOUBLE) {
        double *weight_data_double =
            static_cast<double *>(PyArray_DATA(weight_np));
        for (size_t r = 0; r < num_rows; ++r) {
          for (size_t c = 0; c < num_cols; ++c) {
            weight_matrix[r][c] = weight_data_double[r * num_cols + c];
          }
        }
      } else {
        throw std::runtime_error("Unsupported data type for weights. Must be "
                                 "NPY_FLOAT32 or NPY_DOUBLE.");
      }
      cpp_weights.push_back(weight_matrix);

      // If array is 1D it's a bias vector
    } else if (PyArray_NDIM(weight_np) == 1) {
      int num_elements = PyArray_DIM(weight_np, 0);
      std::vector<std::vector<double>> bias_vector(
          1, std::vector<double>(num_elements));

      // Handle different precision settings
      if (dtype == NPY_FLOAT32) {
        float *bias_data_float = static_cast<float *>(PyArray_DATA(weight_np));
        for (size_t j = 0; j < num_elements; ++j) {
          bias_vector[0][j] = static_cast<double>(bias_data_float[j]);
        }
      } else if (dtype == NPY_DOUBLE) {
        double *bias_data_double =
            static_cast<double *>(PyArray_DATA(weight_np));
        for (size_t j = 0; j < num_elements; ++j) {
          bias_vector[0][j] = bias_data_double[j];
        }
      } else {
        throw std::runtime_error("Unsupported data type for biases. Must be "
                                 "NPY_FLOAT32 or NPY_DOUBLE.");
      }
      cpp_weights.push_back(bias_vector);
    }
@ -724,18 +931,22 @@ std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::stri
  return cpp_weights;
}

/**
 * @brief Joins the training thread and winds down the Python environment
 * gracefully
 * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel
 * struct
 * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the
 * training data struct
 * @param training_data_buffer_full Conditional waiting variable with which the
 * main thread signals when a training run can start
 * @param start_training Conditional waiting predicate to mitigate against
 * spurious wakeups
 * @param end_training Signals end of program to wind down thread gracefully
 */
void Python_finalize(std::mutex *Eigen_model_mutex,
                     std::mutex *training_data_buffer_mutex,
                     std::condition_variable *training_data_buffer_full,
                     bool *start_training, bool *end_training) {
  training_data_buffer_mutex->lock();
  // Define training as over
  *end_training = true;
@ -750,8 +961,8 @@ void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_bu
  // Acquire the Python GIL
  PyGILState_STATE gstate = PyGILState_Ensure();
  // Finalize Python
  Py_FinalizeEx();
}

} // namespace poet
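// Hedged end-to-end sketch (not part of the commit): the lifecycle the
// functions above define. File paths, the RuntimeParameters contents and the
// naa_handle are hypothetical placeholders, and the poet:: qualification of
// EigenModel/TrainingData is an assumption.
inline void ai_surrogate_lifecycle_demo(const poet::RuntimeParameters &params,
                                        naa_handle *handle) {
  poet::EigenModel model, model_reactive;
  poet::TrainingData buffer;
  std::mutex model_mutex, buffer_mutex;
  std::condition_variable buffer_full;
  bool start_training = false, end_training = false;

  poet::Python_Keras_setup("ai_functions.py", "/opt/cuda"); // hypothetical paths
  poet::Python_Keras_load_model("model.keras", "model_reactive.keras", true);
  poet::Python_Keras_training_thread(&model, &model_reactive, &model_mutex,
                                     &buffer, &buffer_mutex, &buffer_full,
                                     &start_training, &end_training, params,
                                     handle);
  // ... simulation loop: Eigen_predict_clustered(...) plus buffer appends and
  //     buffer_full.notify_one() after each chemistry step ...
  poet::Python_finalize(&model_mutex, &buffer_mutex, &buffer_full,
                        &start_training, &end_training);
}
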
@ -17,9 +17,14 @@
#ifndef AI_FUNCTIONS_H
#define AI_FUNCTIONS_H

#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>
#include "poet.hpp"
extern "C" {
#include <naaice_ap2.h>
}

// PhreeqC definition of pi clashes with Eigen macros
// so we have to temporarily undef it
@ -44,7 +49,7 @@ struct TrainingData {
  int n_training_runs = 0;
};

// Only declare the actual functions if flag is set
#ifdef USE_AI_SURROGATE

int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir);
@ -71,7 +76,7 @@ int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_mode
                                 std::mutex* training_data_buffer_mutex,
                                 std::condition_variable* training_data_buffer_full,
                                 bool* start_training, bool* end_training,
                                 const RuntimeParameters& params, naa_handle *handle);

void update_weights(EigenModel* model, const std::vector<std::vector<std::vector<double>>>& weights);

@ -84,7 +89,6 @@ std::vector<double> Eigen_predict_clustered(const EigenModel& model, const Eigen
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
                                  std::mutex* Eigen_model_mutex);

// Otherwise, define the necessary stubs
#else
inline void Python_Keras_setup(std::string, std::string){}
@ -97,7 +101,7 @@ inline void training_data_buffer_append(std::vector<std::vector<double>>&,
inline void cluster_labels_append(std::vector<int>&, std::vector<int>&, std::vector<int>){}
inline int Python_Keras_training_thread(EigenModel*, EigenModel*, std::mutex*,
                        TrainingData*, std::mutex*, std::condition_variable*,
                        bool*, bool*, const RuntimeParameters&, naa_handle*){return {};}

inline void update_weights(EigenModel*, const std::vector<std::vector<std::vector<double>>>&){}
inline std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string){return {};}

src/Chemistry/SurrogateModels/serializer.cpp (new file, 221 lines)
@ -0,0 +1,221 @@
#include "serializer.hpp"
#include "AI_functions.hpp"
#include <Eigen/src/Core/Matrix.h>
#include <cstddef>
#include <cstdio>
#include <cstring> // for std::memcpy below

using namespace std;
namespace poet {

size_t calculateStructSize(void *struct_pointer, char type) {
  size_t struct_size = 0;
  if (type == 'E') {
    struct_size += sizeof(size_t); // number of matrices
    struct_size +=
        static_cast<EigenModel *>(struct_pointer)->weight_matrices.size() * 2 *
        sizeof(size_t);            // dimensions of matrices
    struct_size += sizeof(size_t); // number of vectors
    struct_size += static_cast<EigenModel *>(struct_pointer)->biases.size() *
                   sizeof(size_t); // length of vectors

    for (const Eigen::MatrixXd &matrix :
         static_cast<EigenModel *>(struct_pointer)->weight_matrices) {
      struct_size += matrix.size() * sizeof(double);
    }
    for (const Eigen::VectorXd &bias :
         static_cast<EigenModel *>(struct_pointer)->biases) {
      struct_size += bias.size() * sizeof(double);
    }
  } else if (type == 'T') {
    // TrainingData ('T') sizing is not implemented yet (see TODOs above)
  }

  return struct_size;
}

int serializeModelWeights(const EigenModel *model, char *memory){
|
||||
|
||||
size_t num_matrices = model->weight_matrices.size();
|
||||
size_t size_counter = 0;
|
||||
std::memcpy(memory, &num_matrices, sizeof(size_t));
|
||||
memory += sizeof(size_t);
|
||||
size_counter += sizeof(size_t);
|
||||
for (const Eigen::MatrixXd &matrix : model->weight_matrices) {
|
||||
size_t rows = matrix.rows(), cols = matrix.cols();
|
||||
fprintf(stdout, "rows: %zu, cols: %zu\n", rows, cols);
|
||||
std::memcpy(memory, &rows, sizeof(size_t));
|
||||
memory += sizeof(size_t);
|
||||
size_counter += sizeof(size_t);
|
||||
std::memcpy(memory, &cols, sizeof(size_t));
|
||||
memory += sizeof(size_t);
|
||||
size_counter += sizeof(size_t);
|
||||
std::memcpy(memory, matrix.data(), rows * cols * sizeof(double));
|
||||
memory += rows * cols * sizeof(double);
|
||||
size_counter += rows * cols * sizeof(double);
|
||||
}
|
||||
|
||||
// Serialisierung der Bias-Vektoren
|
||||
size_t num_biases = model->biases.size();
|
||||
std::memcpy(memory, &num_biases, sizeof(size_t));
|
||||
memory += sizeof(size_t);
|
||||
size_counter += sizeof(size_t);
|
||||
for (const Eigen::VectorXd &bias : model->biases) {
|
||||
size_t size = bias.size();
|
||||
std::memcpy(memory, &size, sizeof(size_t));
|
||||
memory += sizeof(size_t);
|
||||
size_counter += sizeof(size_t);
|
||||
std::memcpy(memory, bias.data(), size * sizeof(double));
|
||||
memory += size * sizeof(double);
|
||||
size_counter += size * sizeof(double);
|
||||
}
|
||||
fprintf(stdout, "serializer size: %zu\n", size_counter);
|
||||
return 0;
|
||||
}
EigenModel deserializeModelWeights(char *memory, size_t buffer_size) {
  EigenModel deserializedModel;
  size_t size_counter = 0;

  // Number of matrices
  size_t num_matrices;
  if (buffer_size < sizeof(size_t)) {
    fprintf(stderr, "Buffer too small.\n");
    return deserializedModel;
  }
  std::memcpy(&num_matrices, memory, sizeof(size_t));
  memory += sizeof(size_t);
  size_counter += sizeof(size_t);
  deserializedModel.weight_matrices.resize(num_matrices);

  // Deserialize the matrices
  for (Eigen::MatrixXd &matrix : deserializedModel.weight_matrices) {
    size_t rows, cols;

    // Buffer check
    if (size_counter + 2 * sizeof(size_t) > buffer_size) {
      fprintf(stderr, "Buffer too small for matrix dimensions.\n");
      return deserializedModel;
    }

    std::memcpy(&rows, memory, sizeof(size_t));
    memory += sizeof(size_t);
    size_counter += sizeof(size_t);

    std::memcpy(&cols, memory, sizeof(size_t));
    memory += sizeof(size_t);
    size_counter += sizeof(size_t);

    if (size_counter + rows * cols * sizeof(double) > buffer_size) {
      fprintf(stderr, "Buffer too small for matrix data.\n");
      return deserializedModel;
    }

    // Copy the data into a new matrix; Eigen's default column-major layout
    // matches the layout written by serializeModelWeights
    matrix = Eigen::Map<Eigen::MatrixXd>(reinterpret_cast<double *>(memory),
                                         rows, cols);
    memory += rows * cols * sizeof(double);
    size_counter += rows * cols * sizeof(double);
  }

  // Number of biases
  size_t num_biases;
  if (size_counter + sizeof(size_t) > buffer_size) {
    fprintf(stderr, "Buffer too small for biases.\n");
    return deserializedModel;
  }
  std::memcpy(&num_biases, memory, sizeof(size_t));
  memory += sizeof(size_t);
  size_counter += sizeof(size_t);
  deserializedModel.biases.resize(num_biases);

  // Deserialize the biases
  for (Eigen::VectorXd &bias : deserializedModel.biases) {
    size_t size;
    if (size_counter + sizeof(size_t) > buffer_size) {
      fprintf(stderr, "Buffer too small for bias size.\n");
      return deserializedModel;
    }

    std::memcpy(&size, memory, sizeof(size_t));
    memory += sizeof(size_t);
    size_counter += sizeof(size_t);

    if (size_counter + size * sizeof(double) > buffer_size) {
      fprintf(stderr, "Buffer too small for bias data.\n");
      return deserializedModel;
    }

    // Copy the data into a new vector
    bias = Eigen::Map<Eigen::VectorXd>(reinterpret_cast<double *>(memory), size);
    memory += size * sizeof(double);
    size_counter += size * sizeof(double);
  }

  return deserializedModel;
}

} // namespace poet
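
Taken together, calculateStructSize, serializeModelWeights and deserializeModelWeights support a simple round trip: size the buffer, write into it, read it back. A minimal usage sketch (not part of this commit; it assumes EigenModel exposes the weight_matrices and biases members used above):

#include "serializer.hpp"
#include <Eigen/Dense>
#include <cassert>
#include <vector>

int main() {
  poet::EigenModel model;
  model.weight_matrices = {Eigen::MatrixXd::Random(4, 3),
                           Eigen::MatrixXd::Random(2, 4)};
  model.biases = {Eigen::VectorXd::Random(4), Eigen::VectorXd::Random(2)};

  // calculateStructSize already accounts for the count and dimension
  // fields, so it is the full size of the serialized buffer.
  size_t size = poet::calculateStructSize(&model, 'E');
  std::vector<char> buffer(size);
  poet::serializeModelWeights(&model, buffer.data());

  poet::EigenModel copy = poet::deserializeModelWeights(buffer.data(), size);
  assert(copy.weight_matrices[1].isApprox(model.weight_matrices[1]));
  assert(copy.biases[0].isApprox(model.biases[0]));
  return 0;
}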
56	src/Chemistry/SurrogateModels/serializer.hpp	Normal file
@ -0,0 +1,56 @@
#ifndef SERIALIZER_H
#define SERIALIZER_H

#include "AI_functions.hpp"
#include <cstddef>

namespace poet {

/**
 * @brief Serialize the weights and biases of the model into a memory location
 * to send them via RDMA
 *
 * @param model: Struct of EigenModel containing the weights and biases of the
 * model
 * @param memory: Pointer to the memory location where the serialized data will
 * be stored
 * @return int: 0 if the serialization was successful, -1 otherwise
 */
int serializeModelWeights(const EigenModel *model, char *memory);

/**
 * @brief Deserialize the weights and biases of the model from a memory location
 *
 * @param memory: Pointer to the memory location where the serialized data is
 * stored
 * @param buffer_size: Size of that memory location in bytes, used to guard
 * against reading past the end of the buffer
 * @return EigenModel struct containing the weights and biases of the model
 */
EigenModel deserializeModelWeights(char* memory, size_t buffer_size);

/**
 * @brief Serialize the training data into a memory location to send it via RDMA
 *
 * @param data: Struct of TrainingData containing the training data
 * @param memory: Pointer to the memory location where the serialized data will
 * be stored
 * @return int: 0 if the serialization was successful, -1 otherwise
 */
int serializeTrainingData(const TrainingData& data, void *memory);

/**
 * @brief Deserialize the training data from a memory location
 *
 * @param data: Pointer to the memory location where the serialized data is
 * stored
 * @return TrainingData struct containing the training data
 */
TrainingData deserializeTrainingData(void* data);

/**
 * @brief Calculates the size of the stored elements in the EigenModel and
 * TrainingData structs
 *
 * @param struct_pointer: Pointer to the struct
 * @param type: Determines the struct type: 'E' for EigenModel, 'T' for
 * TrainingData
 * @return size_t: Size of the stored elements in bytes
 */
size_t calculateStructSize(void* struct_pointer, char type);
} // namespace poet
#endif
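
Because these buffers are copied byte-for-byte and shipped over RDMA, sender and receiver must agree on the width of size_t and on the floating-point representation; nothing in the format is endian-neutral. A hedged compile-time guard along these lines (illustrative, not part of this commit) could make that assumption explicit on both peers:

#include <cstddef>
#include <limits>

// The wire format stores counts and dimensions as raw size_t values and the
// payload as raw doubles, so both peers must match these layouts exactly.
static_assert(sizeof(std::size_t) == 8, "wire format assumes a 64-bit size_t");
static_assert(std::numeric_limits<double>::is_iec559,
              "wire format assumes IEEE-754 doubles");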