diff --git a/src/Chemistry/SurrogateModels/AI_functions.cpp b/src/Chemistry/SurrogateModels/AI_functions.cpp index 4b40b9107..8823de1d4 100644 --- a/src/Chemistry/SurrogateModels/AI_functions.cpp +++ b/src/Chemistry/SurrogateModels/AI_functions.cpp @@ -1,68 +1,81 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "AI_functions.hpp" +#include "Base/Macros.hpp" +#include "naaice_ap2.h" #include "poet.hpp" +#include "serializer.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include using namespace std; namespace poet { /** - * @brief Loads the Python interpreter and functions + * @brief Loads the Python interpreter and functions * @param functions_file_path Path to the Python file where the AI surrogate * functions are defined * @return 0 if function was succesful */ -int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir) { +int Python_Keras_setup(std::string functions_file_path, + std::string cuda_src_dir) { // Initialize Python functions Py_Initialize(); // Import numpy functions _import_array(); - PyRun_SimpleString(("cuda_dir = \"" + cuda_src_dir + "\"").c_str()) ; - FILE* fp = fopen(functions_file_path.c_str(), "r"); - int py_functions_initialized = PyRun_SimpleFile(fp, functions_file_path.c_str()); + PyRun_SimpleString(("cuda_dir = \"" + cuda_src_dir + "\"").c_str()); + FILE *fp = fopen(functions_file_path.c_str(), "r"); + int py_functions_initialized = + PyRun_SimpleFile(fp, functions_file_path.c_str()); fclose(fp); if (py_functions_initialized != 0) { PyErr_Print(); - throw std::runtime_error(std::string("AI surrogate Python functions could not be loaded." ) + - "Are tensorflow and numpy installed?"); + throw std::runtime_error( + std::string("AI surrogate Python functions could not be loaded.") + + "Are tensorflow and numpy installed?"); } return py_functions_initialized; } /** - * @brief Loads the user-supplied Keras model + * @brief Loads the user-supplied Keras model * @param model_file_path Path to a .keras file that the user must supply as * a variable "model_file_path" in the R input script * @return 0 if function was succesful */ -int Python_Keras_load_model(std::string model, std::string model_reactive, bool use_clustering) { +int Python_Keras_load_model(std::string model, std::string model_reactive, + bool use_clustering) { // Acquire the Python GIL PyGILState_STATE gstate = PyGILState_Ensure(); // Initialize Keras default model - int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" + - model + "\")").c_str()); + int py_model_loaded = + PyRun_SimpleString(("model = initiate_model(\"" + model + "\")").c_str()); if (py_model_loaded != 0) { PyErr_Print(); throw std::runtime_error("Keras model could not be loaded from: " + model); } if (use_clustering) { - // Initialize second Keras model that will be used for the "reaction" cluster - py_model_loaded = PyRun_SimpleString(("model_reactive = initiate_model(\"" + - model_reactive + "\")").c_str()); + // Initialize second Keras model that will be used for the "reaction" + // cluster + py_model_loaded = PyRun_SimpleString( + ("model_reactive = initiate_model(\"" + model_reactive + "\")") + .c_str()); if (py_model_loaded != 0) { PyErr_Print(); - throw std::runtime_error("Keras model could not be loaded from: " + model_reactive); + throw std::runtime_error("Keras model could not be loaded from: " + + model_reactive); } } // Release the Python GIL @@ 
-71,17 +84,17 @@ int Python_Keras_load_model(std::string model, std::string model_reactive, bool } /** - * @brief Converts the std::vector 2D matrix representation of a POET Field object to a numpy array - * for use in the Python AI surrogate functions + * @brief Converts the std::vector 2D matrix representation of a POET Field + * object to a numpy array for use in the Python AI surrogate functions * @param field 2D-Matrix with the content of a Field object * @return Numpy representation of the input vector */ -PyObject* vector_to_numpy_array(const std::vector>& field) { - npy_intp dims[2] = {static_cast(field[0].size()), +PyObject *vector_to_numpy_array(const std::vector> &field) { + npy_intp dims[2] = {static_cast(field[0].size()), static_cast(field.size())}; - - PyObject* np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64); - double* data = static_cast(PyArray_DATA((PyArrayObject*)np_array)); + + PyObject *np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64); + double *data = static_cast(PyArray_DATA((PyArrayObject *)np_array)); // write field data to numpy array for (size_t i = 0; i < field.size(); ++i) { for (size_t j = 0; j < field[i].size(); ++j) { @@ -94,28 +107,28 @@ PyObject* vector_to_numpy_array(const std::vector>& field) { /** * @brief Converts a Pyton matrix object to a std::vector vector * @param py_matrix Pyobject that must be a 2D matrix - * @result Vector that can be used similar to the return value of the Field object - * Field.AsVector() method. + * @result Vector that can be used similar to the return value of the Field + * object Field.AsVector() method. */ -std::vector numpy_array_to_vector(PyObject* py_array) { +std::vector numpy_array_to_vector(PyObject *py_array) { std::vector result; if (!PyArray_Check(py_array)) { std::cerr << "The model's output is not a numpy array." << std::endl; return result; } // Cast generic PyObject to PyArrayObject - PyArrayObject* np_array = reinterpret_cast(py_array); - + PyArrayObject *np_array = reinterpret_cast(py_array); + // Get shape int numDims = PyArray_NDIM(np_array); - npy_intp* shape = PyArray_SHAPE(np_array); + npy_intp *shape = PyArray_SHAPE(np_array); if (numDims != 2) { std::cerr << "The model's predictions are not a 2D matrix." << std::endl; return result; } // Copy data into std::vector format - double* data = static_cast(PyArray_DATA(np_array)); + double *data = static_cast(PyArray_DATA(np_array)); npy_intp size = PyArray_SIZE(np_array); result.resize(size); std::copy(data, data + size, result.begin()); @@ -123,198 +136,232 @@ std::vector numpy_array_to_vector(PyObject* py_array) { } /** - * @brief Uses the Python Keras functions to calculate predictions from a neural network. + * @brief Uses the Python Keras functions to calculate predictions from a neural + * network. * @param x 2D-Matrix with the content of a Field object - * @param batch_size size for mini-batches that are used in the Keras model.predict() method - * @return Predictions that the neural network made from the input values x. The predictions are - * represented as a vector similar to the representation from the Field.AsVector() method + * @param batch_size size for mini-batches that are used in the Keras + * model.predict() method + * @return Predictions that the neural network made from the input values x. 
The + * predictions are represented as a vector similar to the representation from + * the Field.AsVector() method */ -std::vector Python_Keras_predict(std::vector>& x, int batch_size, - std::vector& cluster_labels) { +std::vector Python_Keras_predict(std::vector> &x, + int batch_size, + std::vector &cluster_labels) { // Acquire the Python GIL PyGILState_STATE gstate = PyGILState_Ensure(); // Prepare data for Python - PyObject* py_df_x = vector_to_numpy_array(x); + PyObject *py_df_x = vector_to_numpy_array(x); // Prepare cluster label vector for Python - PyObject* py_cluster_list = PyList_New(cluster_labels.size()); + PyObject *py_cluster_list = PyList_New(cluster_labels.size()); for (size_t i = 0; i < cluster_labels.size(); i++) { - PyObject* py_int = PyLong_FromLong(cluster_labels[i]); + PyObject *py_int = PyLong_FromLong(cluster_labels[i]); PyList_SET_ITEM(py_cluster_list, i, py_int); } - + // Get the model and inference function from the global python interpreter - PyObject* py_main_module = PyImport_AddModule("__main__"); - PyObject* py_global_dict = PyModule_GetDict(py_main_module); - PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model"); - PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step"); - + PyObject *py_main_module = PyImport_AddModule("__main__"); + PyObject *py_global_dict = PyModule_GetDict(py_main_module); + PyObject *py_keras_model = PyDict_GetItemString(py_global_dict, "model"); + PyObject *py_inference_function = + PyDict_GetItemString(py_global_dict, "prediction_step"); + // Get secod model if clustering is used - PyObject* py_keras_model_reactive = Py_None;; + PyObject *py_keras_model_reactive = Py_None; + ; if (cluster_labels.size() > 0) { - py_keras_model_reactive = PyDict_GetItemString(py_global_dict, "model_reactive"); + py_keras_model_reactive = + PyDict_GetItemString(py_global_dict, "model_reactive"); } // Build the function arguments as four python objects and an integer - PyObject* args = Py_BuildValue("(OOOOi)", - py_keras_model, py_keras_model_reactive, py_df_x, py_cluster_list, batch_size); + PyObject *args = + Py_BuildValue("(OOOOi)", py_keras_model, py_keras_model_reactive, py_df_x, + py_cluster_list, batch_size); // Call the Python inference function - PyObject* py_predictions = PyObject_CallObject(py_inference_function, args); + PyObject *py_predictions = PyObject_CallObject(py_inference_function, args); // Check py_rv and return as 2D vector - std::vector predictions = numpy_array_to_vector(py_predictions); - + std::vector predictions = numpy_array_to_vector(py_predictions); + // Clean up - PyErr_Print(); + PyErr_Print(); Py_XDECREF(py_df_x); Py_XDECREF(py_cluster_list); Py_XDECREF(args); Py_XDECREF(py_predictions); - + // Release the Python GIL PyGILState_Release(gstate); return predictions; } /** - * @brief Uses Eigen for fast inference with the weights and biases of a neural network. - * This function assumes ReLU activation for each layer. - * @param input_batch Batch of input data that must fit the size of the neural networks input layer - * @param model Struct of aligned Eigen vectors that hold the neural networks weights and biases. - * Only supports simple fully connected feed forward networks. - * @return The batch of predictions made with the neural network weights and biases and the data - * in input_batch + * @brief Uses Eigen for fast inference with the weights and biases of a neural + * network. This function assumes ReLU activation for each layer. 
+ * @param input_batch Batch of input data that must fit the size of the neural + * networks input layer + * @param model Struct of aligned Eigen vectors that hold the neural networks + * weights and biases. Only supports simple fully connected feed forward + * networks. + * @return The batch of predictions made with the neural network weights and + * biases and the data in input_batch */ -Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref& input_batch, const EigenModel& model) { - Eigen::MatrixXd current_layer = input_batch; +Eigen::MatrixXd +eigen_inference_batched(const Eigen::Ref &input_batch, + const EigenModel &model) { + Eigen::MatrixXd current_layer = input_batch; - // Process all hidden layers - for (size_t layer = 0; layer < model.weight_matrices.size() - 1; ++layer) { - current_layer = (model.weight_matrices[layer] * current_layer); - current_layer = current_layer.colwise() + model.biases[layer]; - current_layer = current_layer.array().max(0.0); - } + // Process all hidden layers + for (size_t layer = 0; layer < model.weight_matrices.size() - 1; ++layer) { + current_layer = (model.weight_matrices[layer] * current_layer); + current_layer = current_layer.colwise() + model.biases[layer]; + current_layer = current_layer.array().max(0.0); + } - // Process output layer (without ReLU) - size_t output_layer = model.weight_matrices.size() - 1; - return (model.weight_matrices[output_layer] * current_layer).colwise() + model.biases[output_layer]; + // Process output layer (without ReLU) + size_t output_layer = model.weight_matrices.size() - 1; + return (model.weight_matrices[output_layer] * current_layer).colwise() + + model.biases[output_layer]; } /** - * @brief Uses the Eigen representation of the two different Keras model weights for fast inference + * @brief Uses the Eigen representation of the two different Keras model weights + * for fast inference * @param model The model for the non reactive cluster of the field (label 0) - * @param model_reactive The model for the non reactive cluster of the field (label 1) + * @param model_reactive The model for the non reactive cluster of the field + * (label 1) * @param x 2D-Matrix with the content of a Field object - * @param batch_size size for mini-batches that are used in the Keras model.predict() method - * @param Eigen_model_mutex Mutex that locks the model during inference and prevents updaties from - * the training thread + * @param batch_size size for mini-batches that are used in the Keras + * model.predict() method + * @param Eigen_model_mutex Mutex that locks the model during inference and + * prevents updaties from the training thread * @param cluster_labels K-Means cluster label dor each row in the field - * @return Predictions that the neural network made from the input values x. The predictions are - * represented as a vector similar to the representation from the Field.AsVector() method + * @return Predictions that the neural network made from the input values x. 
The + * predictions are represented as a vector similar to the representation from + * the Field.AsVector() method */ -std::vector Eigen_predict_clustered(const EigenModel& model, const EigenModel& model_reactive, - std::vector>& x, int batch_size, - std::mutex* Eigen_model_mutex, std::vector& cluster_labels) { - const int num_samples = x[0].size(); - const int num_features = x.size(); - if (num_features != model.weight_matrices[0].cols() || - num_features != model_reactive.weight_matrices[0].cols()) { - throw std::runtime_error("Input data size " + std::to_string(num_features) + - " does not match model input layer sizes" + std::to_string(model.weight_matrices[0].cols()) + - " / " + std::to_string(model_reactive.weight_matrices[0].cols())); - } +std::vector Eigen_predict_clustered(const EigenModel &model, + const EigenModel &model_reactive, + std::vector> &x, + int batch_size, + std::mutex *Eigen_model_mutex, + std::vector &cluster_labels) { + const int num_samples = x[0].size(); + const int num_features = x.size(); + if (num_features != model.weight_matrices[0].cols() || + num_features != model_reactive.weight_matrices[0].cols()) { + throw std::runtime_error( + "Input data size " + std::to_string(num_features) + + " does not match model input layer sizes" + + std::to_string(model.weight_matrices[0].cols()) + " / " + + std::to_string(model_reactive.weight_matrices[0].cols())); + } - // Convert input data to Eigen matrix - Eigen::MatrixXd full_input_matrix(num_features, num_samples); - for (size_t i = 0; i < num_samples; ++i) { - for (size_t j = 0; j < num_features; ++j) { - full_input_matrix(j, i) = x[j][i]; + // Convert input data to Eigen matrix + Eigen::MatrixXd full_input_matrix(num_features, num_samples); + for (size_t i = 0; i < num_samples; ++i) { + for (size_t j = 0; j < num_features; ++j) { + full_input_matrix(j, i) = x[j][i]; + } + } + + // Create indices for each cluster + std::vector cluster_0_indices, cluster_1_indices; + for (size_t i = 0; i < cluster_labels.size(); ++i) { + if (cluster_labels[i] == 0) { + cluster_0_indices.push_back(i); + } else { + cluster_1_indices.push_back(i); + } + } + + // Prepare matrices for each cluster + Eigen::MatrixXd input_matrix(num_features, cluster_0_indices.size()); + Eigen::MatrixXd input_matrix_reactive(num_features, cluster_1_indices.size()); + + // Split data according to cluster labels + for (size_t i = 0; i < cluster_0_indices.size(); ++i) { + input_matrix.col(i) = full_input_matrix.col(cluster_0_indices[i]); + } + for (size_t i = 0; i < cluster_1_indices.size(); ++i) { + input_matrix_reactive.col(i) = full_input_matrix.col(cluster_1_indices[i]); + } + // Process each cluster + std::vector result(num_samples * model.weight_matrices.back().rows()); + Eigen_model_mutex->lock(); + + if (!cluster_0_indices.empty()) { + int num_batches_0 = + std::ceil(static_cast(cluster_0_indices.size()) / batch_size); + for (int batch = 0; batch < num_batches_0; ++batch) { + int start_idx = batch * batch_size; + int end_idx = std::min((batch + 1) * batch_size, + static_cast(cluster_0_indices.size())); + int current_batch_size = end_idx - start_idx; + + Eigen::MatrixXd batch_data = + input_matrix.block(0, start_idx, num_features, current_batch_size); + Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model); + + // Store results in their original positions + for (size_t i = 0; i < current_batch_size; ++i) { + int original_idx = cluster_0_indices[start_idx + i]; + for (size_t j = 0; j < batch_result.rows(); ++j) { + result[original_idx * 
batch_result.rows() + j] = batch_result(j, i); } + } } + } - // Create indices for each cluster - std::vector cluster_0_indices, cluster_1_indices; - for (size_t i = 0; i < cluster_labels.size(); ++i) { - if (cluster_labels[i] == 0) { - cluster_0_indices.push_back(i); - } else { - cluster_1_indices.push_back(i); + // Process cluster 1 + if (!cluster_1_indices.empty()) { + int num_batches_1 = + std::ceil(static_cast(cluster_1_indices.size()) / batch_size); + for (int batch = 0; batch < num_batches_1; ++batch) { + int start_idx = batch * batch_size; + int end_idx = std::min((batch + 1) * batch_size, + static_cast(cluster_1_indices.size())); + int current_batch_size = end_idx - start_idx; + + Eigen::MatrixXd batch_data = input_matrix_reactive.block( + 0, start_idx, num_features, current_batch_size); + Eigen::MatrixXd batch_result = + eigen_inference_batched(batch_data, model_reactive); + + // Store results in their original positions + for (size_t i = 0; i < current_batch_size; ++i) { + int original_idx = cluster_1_indices[start_idx + i]; + for (size_t j = 0; j < batch_result.rows(); ++j) { + result[original_idx * batch_result.rows() + j] = batch_result(j, i); } + } } + } - // Prepare matrices for each cluster - Eigen::MatrixXd input_matrix(num_features, cluster_0_indices.size()); - Eigen::MatrixXd input_matrix_reactive(num_features, cluster_1_indices.size()); - - // Split data according to cluster labels - for (size_t i = 0; i < cluster_0_indices.size(); ++i) { - input_matrix.col(i) = full_input_matrix.col(cluster_0_indices[i]); - } - for (size_t i = 0; i < cluster_1_indices.size(); ++i) { - input_matrix_reactive.col(i) = full_input_matrix.col(cluster_1_indices[i]); - } - // Process each cluster - std::vector result(num_samples * model.weight_matrices.back().rows()); - Eigen_model_mutex->lock(); - - if (!cluster_0_indices.empty()) { - int num_batches_0 = std::ceil(static_cast(cluster_0_indices.size()) / batch_size); - for (int batch = 0; batch < num_batches_0; ++batch) { - int start_idx = batch * batch_size; - int end_idx = std::min((batch + 1) * batch_size, static_cast(cluster_0_indices.size())); - int current_batch_size = end_idx - start_idx; - - Eigen::MatrixXd batch_data = input_matrix.block(0, start_idx, num_features, current_batch_size); - Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model); - - // Store results in their original positions - for (size_t i = 0; i < current_batch_size; ++i) { - int original_idx = cluster_0_indices[start_idx + i]; - for (size_t j = 0; j < batch_result.rows(); ++j) { - result[original_idx * batch_result.rows() + j] = batch_result(j, i); - } - } - } - } - - // Process cluster 1 - if (!cluster_1_indices.empty()) { - int num_batches_1 = std::ceil(static_cast(cluster_1_indices.size()) / batch_size); - for (int batch = 0; batch < num_batches_1; ++batch) { - int start_idx = batch * batch_size; - int end_idx = std::min((batch + 1) * batch_size, static_cast(cluster_1_indices.size())); - int current_batch_size = end_idx - start_idx; - - Eigen::MatrixXd batch_data = input_matrix_reactive.block(0, start_idx, num_features, current_batch_size); - Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model_reactive); - - // Store results in their original positions - for (size_t i = 0; i < current_batch_size; ++i) { - int original_idx = cluster_1_indices[start_idx + i]; - for (size_t j = 0; j < batch_result.rows(); ++j) { - result[original_idx * batch_result.rows() + j] = batch_result(j, i); - } - } - } - } - - 
Eigen_model_mutex->unlock();
-  return result;
 }

 /**
- * @brief Uses the Eigen representation of the tKeras model weights for fast inference
+ * @brief Uses the Eigen representation of the Keras model weights for fast
+ * inference
  * @param model The model weights and biases
  * @param x 2D-Matrix with the content of a Field object
- * @param batch_size size for mini-batches that are used in the Keras model.predict() method
- * @param Eigen_model_mutex Mutex that locks the model during inference and prevents updaties from
- * the training thread
- * @return Predictions that the neural network made from the input values x. The predictions are
- * represented as a vector similar to the representation from the Field.AsVector() method
+ * @param batch_size size for mini-batches that are used in the Keras
+ * model.predict() method
+ * @param Eigen_model_mutex Mutex that locks the model during inference and
+ * prevents updates from the training thread
+ * @return Predictions that the neural network made from the input values x. The
+ * predictions are represented as a vector similar to the representation from
+ * the Field.AsVector() method
 */
-std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
-                                  std::mutex* Eigen_model_mutex) {
+std::vector<double> Eigen_predict(const EigenModel &model,
+                                  std::vector<std::vector<double>> x,
+                                  int batch_size,
+                                  std::mutex *Eigen_model_mutex) {
   // Convert input data to Eigen matrix
   const int num_samples = x[0].size();
   const int num_features = x.size();
@@ -328,10 +375,11 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
   std::vector<double> result;
   result.reserve(num_samples * num_features);
-
+
   if (num_features != model.weight_matrices[0].cols()) {
-    throw std::runtime_error("Input data size " + std::to_string(num_features) + \
-      " does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
+    throw std::runtime_error("Input data size " + std::to_string(num_features) +
+                             " does not match model input layer of size " +
+                             std::to_string(model.weight_matrices[0].cols()));
   }
   int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
@@ -342,32 +390,35 @@ std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
   Eigen_model_mutex->unlock();
   return result;
 }
-
-
 /**
- * @brief Appends data from one matrix (column major std::vector<std::vector<double>>) to another
+ * @brief Appends data from one matrix (column major
+ * std::vector<std::vector<double>>) to another
  * @param training_data_buffer Matrix that the values are appended to
  * @param new_values Matrix that is appended
  */
-void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
-                                 std::vector<std::vector<double>>& new_values) {
+void training_data_buffer_append(
+    std::vector<std::vector<double>> &training_data_buffer,
+    std::vector<std::vector<double>> &new_values) {
   // Initialize training data buffer if empty
   if (training_data_buffer.size() == 0) {
     training_data_buffer = new_values;
   } else { // otherwise append
     for (size_t col = 0; col < training_data_buffer.size(); col++) {
       training_data_buffer[col].insert(training_data_buffer[col].end(),
-                                       new_values[col].begin(), new_values[col].end());
+                                       new_values[col].begin(),
+                                       new_values[col].end());
     }
   }
 }
@@ -376,9 +427,11 @@ void training_data_buffer_append(std::vector<std::vector<double>>& training_data
 * @brief Appends data from one int vector to another based on a mask vector
 * @param labels Vector that the values are appended to
 * @param new_labels Values that are appended
- * @param validity Mask vector that defines how many and which values are appended
+ * @param validity Mask vector that 
defines how many and which values are + * appended */ -void cluster_labels_append(std::vector& labels_buffer, std::vector& new_labels, +void cluster_labels_append(std::vector &labels_buffer, + std::vector &new_labels, std::vector validity) { // Calculate new buffer size from number of valid elements in mask int n_invalid = validity.size(); @@ -387,37 +440,40 @@ void cluster_labels_append(std::vector& labels_buffer, std::vector& ne } // Resize label vector to hold non valid elements - int end_index = labels_buffer.size(); + int end_index = labels_buffer.size(); int new_size = end_index + n_invalid; labels_buffer.resize(new_size); // Iterate over mask to transfer cluster labels for (size_t i = 0; i < validity.size(); ++i) { - // Append only the labels of invalid rows + // Append only the labels of invalid rows if (!validity[i]) { - labels_buffer[end_index] = new_labels[i]; + labels_buffer[end_index] = new_labels[i]; end_index++; } } } - /** - * @brief Uses the Python environment with Keras' default functions to train the model + * @brief Uses the Python environment with Keras' default functions to train the + * model * @param x Training data features * @param y Training data targets * @param params Global runtime paramters */ -void Python_Keras_train(std::vector>& x, std::vector>& y, - int train_cluster, std::string model_name, const RuntimeParameters& params) { +void Python_Keras_train(std::vector> &x, + std::vector> &y, int train_cluster, + std::string model_name, + const RuntimeParameters ¶ms) { // Prepare data for python - PyObject* py_df_x = vector_to_numpy_array(x); - PyObject* py_df_y = vector_to_numpy_array(y); + PyObject *py_df_x = vector_to_numpy_array(x); + PyObject *py_df_y = vector_to_numpy_array(y); // Make sure that model output file name .keras file - std::string model_path = params.save_model_path; + std::string model_path = params.save_model_path; if (!model_path.empty()) { - if (model_path.length() >= 6 && model_path.substr(model_path.length() - 6) != ".keras") { - model_path += ".keras"; + if (model_path.length() >= 6 && + model_path.substr(model_path.length() - 6) != ".keras") { + model_path += ".keras"; } } @@ -430,21 +486,23 @@ void Python_Keras_train(std::vector>& x, std::vector>& x, std::vector lock(*training_data_buffer_mutex); - training_data_buffer_full->wait(lock, [start_training] { return *start_training;}); + training_data_buffer_full->wait( + lock, [start_training] { return *start_training; }); // Return if program is about to end if (*end_training) { return; @@ -490,19 +557,30 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive // Get the necessary training data std::cout << "AI: Training thread: Getting training data" << std::endl; // Initialize training data input and targets - std::vector> inputs(training_data_buffer->x.size(), - std::vector(params.training_data_size)); - std::vector> targets(training_data_buffer->x.size(), - std::vector(params.training_data_size)); + std::vector> inputs( + training_data_buffer->x.size(), + std::vector(params.training_data_size)); + std::vector> targets( + training_data_buffer->x.size(), + std::vector(params.training_data_size)); + + fprintf(stdout, "x.size: %zu\n", training_data_buffer->x.size()); + fprintf(stdout, "params.training_data_size: %d\n", params.training_data_size); int buffer_size = training_data_buffer->x[0].size(); + fprintf(stdout, "training_data_buffer->x[0].size: %zu\n", training_data_buffer->x[0].size()); + sleep(10); + // If clustering is used, check the current cluster 
int n_cluster_reactive = 0; - int train_cluster = -1; // Default value for non clustered training (all data is used) + int train_cluster = + -1; // Default value for non clustered training (all data is used) if (params.use_clustering) { for (size_t i = 0; i < buffer_size; i++) { - n_cluster_reactive += training_data_buffer->cluster_labels[i]; + n_cluster_reactive += training_data_buffer->cluster_labels[i]; } + // ? set train_cluster to true only if all labels in training_data_buffer + // have the corresponding cluster_label? train_cluster = n_cluster_reactive >= params.training_data_size; } int buffer_row = 0; @@ -514,15 +592,15 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive // Copy and remove from training data buffer inputs[col][copied_row] = training_data_buffer->x[col][buffer_row]; targets[col][copied_row] = training_data_buffer->y[col][buffer_row]; - training_data_buffer->x[col].erase(training_data_buffer->x[col].begin() + - buffer_row); - training_data_buffer->y[col].erase(training_data_buffer->y[col].begin() + - buffer_row); + training_data_buffer->x[col].erase( + training_data_buffer->x[col].begin() + buffer_row); + training_data_buffer->y[col].erase( + training_data_buffer->y[col].begin() + buffer_row); } // Remove from cluster label buffer if (params.use_clustering) { training_data_buffer->cluster_labels.erase( - training_data_buffer->cluster_labels.begin() + buffer_row); + training_data_buffer->cluster_labels.begin() + buffer_row); } copied_row++; } else { @@ -530,34 +608,41 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive } } - // Set the waiting predicate to immediately continue training if enough elements of any cluster remain + // Set the waiting predicate to immediately continue training if enough + // elements of any cluster remain if (train_cluster == 1) { - *start_training = ((n_cluster_reactive - params.training_data_size) >= params.training_data_size) || - ((buffer_size - n_cluster_reactive) >= params.training_data_size); + *start_training = + ((n_cluster_reactive - params.training_data_size) >= + params.training_data_size) || + ((buffer_size - n_cluster_reactive) >= params.training_data_size); } else { - *start_training = (buffer_size - n_cluster_reactive - params.training_data_size) - >= params.training_data_size; + *start_training = + (buffer_size - n_cluster_reactive - params.training_data_size) >= + params.training_data_size; } - - //update number of training runs + + // update number of training runs training_data_buffer->n_training_runs += 1; - // Unlock the training_data_buffer_mutex + // Unlock the training_data_buffer_mutex lock.unlock(); std::string model_name = "model"; if (train_cluster == 1) { model_name = "model_reactive"; } - std::cout << "AI: Training thread: Start training " << model_name << std::endl; + std::cout << "AI: Training thread: Start training " << model_name + << std::endl; // Acquire the Python GIL PyGILState_STATE gstate = PyGILState_Ensure(); // Start training Python_Keras_train(inputs, targets, train_cluster, model_name, params); - + if (!params.use_Keras_predictions) { - std::cout << "AI: Training thread: Update shared model weights" << std::endl; - std::vector>> cpp_weights = Python_Keras_get_weights(model_name); + std::cout << "AI: Training thread: Update shared model weights" + << std::endl; + std::vector>> cpp_weights = + Python_Keras_get_weights(model_name); Eigen_model_mutex->lock(); if (train_cluster == 1) { update_weights(Eigen_model_reactive, cpp_weights); 
@@ -566,91 +651,203 @@ void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive } Eigen_model_mutex->unlock(); } - + // Release the Python GIL PyGILState_Release(gstate); - std::cout << "AI: Training thread: Finished training, waiting for new data" << std::endl; + std::cout << "AI: Training thread: Finished training, waiting for new data" + << std::endl; } } +void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive, + std::mutex *Eigen_model_mutex, + TrainingData *training_data_buffer, + std::mutex *training_data_buffer_mutex, + std::condition_variable *training_data_buffer_full, + bool *start_training, bool *end_training, + const RuntimeParameters ¶ms, naa_handle *handle){ + Eigen_model_mutex->lock(); + training_data_buffer_mutex->lock(); + size_t model_size = calculateStructSize(Eigen_model, 'E'); + // TODO: initialize models with weights from keras model + std::string model_name = "model"; + std::vector>> model_weight = + Python_Keras_get_weights(model_name); + update_weights(Eigen_model, model_weight); + fprintf(stdout, "example weight: %f\n", + Eigen_model->weight_matrices[0](0, 0)); + model_size = calculateStructSize(Eigen_model, 'E'); + + + fprintf(stdout, "size after: %zu\n", model_size); + char *serializedData = (char *)calloc(model_size, sizeof(char)); + if (serializedData == NULL) { + exit(EXIT_FAILURE); + } + int res = serializeModelWeights(Eigen_model, serializedData); + + struct naa_param_t weight_region[] = {(void*) serializedData, model_size}; + + printf("-- Setting Up Connection --\n"); + if (naa_create(1, weight_region, 1, + weight_region, 0, handle)) { + fprintf(stderr, "Error during naa_create. Exiting.\n"); + exit(EXIT_FAILURE); + } + + printf("-- RPC Invocation --\n"); + if (naa_invoke(handle)) { + fprintf(stderr, "Error during naa_invoke. Exiting.\n"); + exit(EXIT_FAILURE); + } + + // TODO: naa_wait with new weights + + + + + EigenModel deserializedModel = deserializeModelWeights(serializedData, model_size); + fprintf(stdout, "After deserialization: %f\n", deserializedModel.weight_matrices[0](0,0)); + + for(int i=0;iweight_matrices[0].rows();i++){ + for (int j=0;jweight_matrices[0].cols();j++){ + fprintf(stdout, "model: %f, deserializedModel: %f\n", + Eigen_model->weight_matrices[0](i, j), deserializedModel.weight_matrices[0](i,j)); + } + } + + free(serializedData); + serializedData = nullptr; + // naa_finalize: clean up connection. + printf("-- Cleaning Up --\n"); + naa_finalize(handle); + // fprintf(stdout, "after free\n"); + // TODO: determine the struct sizes of EigenModel and TrainData + // TODO: initialize RDMA memory regions (two sections: model weights and training data) + + // TODO: establish connection to NAA server + + // TODO: if training buffer is full + + + // TODO: serialize EigenModel and TrainData + // TODO: run invoke call from naaice API + // TODO: wait for results + // TODO: update model weights (dont forget locking) + // TODO: wait for next training buffer iteration + + Eigen_model_mutex->unlock(); + training_data_buffer_mutex->unlock(); + + +} + std::thread python_train_thread; +std::thread naa_train_thread; /** - * @brief Starts a thread for parallel training and weight updating. This Wrapper function - * ensures, that the main POET program can be built without pthread support if the AI - * surrogate functions are disabled during compilation. 
- * @param Eigen_model Pointer to the EigenModel struct that will be updates with new weights
- * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
- * @param training_data_buffer Pointer to the Training data struct with which the model is trained
- * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
- * @param training_data_buffer_full Conditional waiting variable with wich the main thread signals
- * when a training run can start
- * @param start_training Conditional waiting predicate to mitigate against spurious wakeups
- * @param end_training Signals end of program to wind down thread gracefully
+ * @brief Starts a thread for parallel training and weight updating. This
+ * wrapper function ensures that the main POET program can be built without
+ * pthread support if the AI surrogate functions are disabled during
+ * compilation.
+ * @param Eigen_model Pointer to the EigenModel struct that will be updated with
+ * new weights
+ * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel
+ * struct
+ * @param training_data_buffer Pointer to the Training data struct with which
+ * the model is trained
+ * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the
+ * training data struct
+ * @param training_data_buffer_full Conditional waiting variable with which the
+ * main thread signals when a training run can start
+ * @param start_training Conditional waiting predicate to mitigate against
+ * spurious wakeups
+ * @param end_training Signals end of program to wind down thread gracefully
 * @param params Global runtime paramters
 * @return 0 if function was succesful
 */
-int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
-                                 std::mutex* Eigen_model_mutex,
-                                 TrainingData* training_data_buffer,
-                                 std::mutex* training_data_buffer_mutex,
-                                 std::condition_variable* training_data_buffer_full,
-                                 bool* start_training, bool* end_training,
-                                 const RuntimeParameters& params) {
-  PyThreadState *_save = PyEval_SaveThread();
-  python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_reactive,
-                                    Eigen_model_mutex, training_data_buffer,
-                                    training_data_buffer_mutex, training_data_buffer_full,
-                                    start_training, end_training, params);
+int Python_Keras_training_thread(
+    EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
+    std::mutex *Eigen_model_mutex, TrainingData *training_data_buffer,
+    std::mutex *training_data_buffer_mutex,
+    std::condition_variable *training_data_buffer_full, bool *start_training,
+    bool *end_training, const RuntimeParameters &params, naa_handle *handle) {
+  MSG("In Python_Keras_training_thread");
+  PyThreadState *_save = PyEval_SaveThread(); // ? 
+  // check if naa is activated and if so, we use training on naa server
+  if (params.use_naa) {
+    if (handle != NULL) {
+      MSG("NAA Accelerator is used for online training");
+      naa_train_thread = std::thread(
+          naa_training, Eigen_model, Eigen_model_reactive, Eigen_model_mutex,
+          training_data_buffer, training_data_buffer_mutex,
+          training_data_buffer_full, start_training, end_training, params, handle);
+    }
+  } else {
+    python_train_thread = std::thread(
+        parallel_training, Eigen_model, Eigen_model_reactive, Eigen_model_mutex,
+        training_data_buffer, training_data_buffer_mutex,
+        training_data_buffer_full, start_training, end_training, params);
+  }
+  fprintf(stdout, "End of Python_Keras_training_thread\n");
   return 0;
 }
-
 /**
 * @brief Updates the EigenModels weigths and biases from the weight vector
- * @param model Pounter to an EigenModel struct
- * @param weights Vector of model weights from keras as returned by Python_Keras_get_weights()
+ * @param model Pointer to an EigenModel struct
+ * @param weights Vector of model weights from keras as returned by
+ * Python_Keras_get_weights()
 */
-void update_weights(EigenModel* model,
-                    const std::vector<std::vector<std::vector<double>>>& weights) {
-  size_t num_layers = weights.size() / 2;
+// ? check if updating was successful -> hash about values?
+void update_weights(
+    EigenModel *model,
+    const std::vector<std::vector<std::vector<double>>> &weights) {
+  MSG("In update_weights");
+  size_t num_layers =
+      weights.size() / 2; // half length because it contains weights and biases
   for (size_t i = 0; i < weights.size(); i += 2) {
-    // Fill current weight matrix
-    size_t rows = weights[i][0].size();
-    size_t cols = weights[i].size();
-    for (size_t j = 0; j < cols; ++j) {
-      for (size_t k = 0; k < rows; ++k) {
-        model->weight_matrices[i / 2](k, j) = weights[i][j][k];
-      }
-    }
-    // Fill bias vector
-    size_t bias_size = weights[i + 1][0].size();
-    for (size_t j = 0; j < bias_size; ++j) {
-      model->biases[i / 2](j) = weights[i + 1][0][j];
+    // Fill current weight matrix
+    size_t rows = weights[i][0].size();
+    size_t cols = weights[i].size();
+    for (size_t j = 0; j < cols; ++j) {
+      for (size_t k = 0; k < rows; ++k) {
+        model->weight_matrices[i / 2](k, j) = weights[i][j][k];
       }
+    }
+    // Fill bias vector
+    size_t bias_size = weights[i + 1][0].size();
+    for (size_t j = 0; j < bias_size; ++j) {
+      model->biases[i / 2](j) = weights[i + 1][0][j];
+    }
   }
 }
-
 /**
- * @brief Converts the weights and biases from the Python Keras model to C++ vectors
- * @return A vector containing the model weights and biases
+ * @brief Converts the weights and biases from the Python Keras model to C++
+ * vectors
+ * @return A vector containing the model weights and biases
 */
-std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string model_name) {
+std::vector<std::vector<std::vector<double>>>
+Python_Keras_get_weights(std::string model_name) {
   // Acquire the Python GIL
+  fprintf(stdout, "In Python_Keras_get_weights\n");
   PyGILState_STATE gstate = PyGILState_Ensure();
-  PyObject* py_main_module = PyImport_AddModule("__main__");
-  PyObject* py_global_dict = PyModule_GetDict(py_main_module);
-  PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, model_name.c_str());
-  PyObject* py_get_weights_function = PyDict_GetItemString(py_global_dict, "get_weights");
-  PyObject* args = Py_BuildValue("(O)", py_keras_model);
-
+  PyObject *py_main_module = PyImport_AddModule("__main__");
+  PyObject *py_global_dict = PyModule_GetDict(py_main_module);
+  PyObject *py_keras_model =
+      PyDict_GetItemString(py_global_dict, model_name.c_str());
+  PyObject *py_get_weights_function = 
PyDict_GetItemString(py_global_dict, "get_weights"); + PyObject *args = Py_BuildValue("(O)", py_keras_model); + // Call Python function - PyObject* py_weights_list = PyObject_CallObject(py_get_weights_function, args); - + PyObject *py_weights_list = + PyObject_CallObject(py_get_weights_function, args); + if (!py_weights_list) { - PyErr_Print(); - throw std::runtime_error("Failed to get weights from Keras model"); + PyErr_Print(); + throw std::runtime_error("Failed to get weights from Keras model"); } // Container for the extracted weights @@ -659,58 +856,68 @@ std::vector>> Python_Keras_get_weights(std::stri // Iterate through the layers (weights and biases) Py_ssize_t num_layers = PyList_Size(py_weights_list); for (Py_ssize_t i = 0; i < num_layers; ++i) { - PyObject* py_weight_array = PyList_GetItem(py_weights_list, i); + PyObject *py_weight_array = PyList_GetItem(py_weights_list, i); if (!PyArray_Check(py_weight_array)) { throw std::runtime_error("Weight is not a NumPy array."); } - PyArrayObject* weight_np = reinterpret_cast(py_weight_array); + PyArrayObject *weight_np = + reinterpret_cast(py_weight_array); int dtype = PyArray_TYPE(weight_np); // If array is 2D it's a weight matrix if (PyArray_NDIM(weight_np) == 2) { int num_rows = PyArray_DIM(weight_np, 0); int num_cols = PyArray_DIM(weight_np, 1); - - std::vector> weight_matrix(num_rows, std::vector(num_cols)); + + std::vector> weight_matrix( + num_rows, std::vector(num_cols)); // Handle different precision settings if (dtype == NPY_FLOAT32) { - float* weight_data_float = static_cast(PyArray_DATA(weight_np)); + // + float *weight_data_float = + static_cast(PyArray_DATA(weight_np)); for (size_t r = 0; r < num_rows; ++r) { for (size_t c = 0; c < num_cols; ++c) { - weight_matrix[r][c] = static_cast(weight_data_float[r * num_cols + c]); + weight_matrix[r][c] = + static_cast(weight_data_float[r * num_cols + c]); } } } else if (dtype == NPY_DOUBLE) { - double* weight_data_double = static_cast(PyArray_DATA(weight_np)); + double *weight_data_double = + static_cast(PyArray_DATA(weight_np)); for (size_t r = 0; r < num_rows; ++r) { for (size_t c = 0; c < num_cols; ++c) { weight_matrix[r][c] = weight_data_double[r * num_cols + c]; } } } else { - throw std::runtime_error("Unsupported data type for weights. Must be NPY_FLOAT32 or NPY_DOUBLE."); + throw std::runtime_error("Unsupported data type for weights. Must be " + "NPY_FLOAT32 or NPY_DOUBLE."); } cpp_weights.push_back(weight_matrix); - // If array is 1D it's a bias vector + // If array is 1D it's a bias vector } else if (PyArray_NDIM(weight_np) == 1) { int num_elements = PyArray_DIM(weight_np, 0); - std::vector> bias_vector(1, std::vector(num_elements)); + std::vector> bias_vector( + 1, std::vector(num_elements)); // Handle different precision settings if (dtype == NPY_FLOAT32) { - float* bias_data_float = static_cast(PyArray_DATA(weight_np)); + float *bias_data_float = static_cast(PyArray_DATA(weight_np)); for (size_t j = 0; j < num_elements; ++j) { bias_vector[0][j] = static_cast(bias_data_float[j]); } } else if (dtype == NPY_DOUBLE) { - double* bias_data_double = static_cast(PyArray_DATA(weight_np)); + double *bias_data_double = + static_cast(PyArray_DATA(weight_np)); for (size_t j = 0; j < num_elements; ++j) { bias_vector[0][j] = bias_data_double[j]; } } else { - throw std::runtime_error("Unsupported data type for biases. Must be NPY_FLOAT32 or NPY_DOUBLE."); + throw std::runtime_error("Unsupported data type for biases. 
Must be " + "NPY_FLOAT32 or NPY_DOUBLE."); } cpp_weights.push_back(bias_vector); } @@ -724,18 +931,22 @@ std::vector>> Python_Keras_get_weights(std::stri return cpp_weights; } - /** - * @brief Joins the training thread and winds down the Python environment gracefully - * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct - * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct - * @param training_data_buffer_full Conditional waiting variable with wich the main thread signals - * when a training run can start - * @param start_training Conditional waiting predicate to mitigate against spurious wakeups + * @brief Joins the training thread and winds down the Python environment + * gracefully + * @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel + * struct + * @param training_data_buffer_mutex Mutex to ensure threadsafe access to the + * training data struct + * @param training_data_buffer_full Conditional waiting variable with wich the + * main thread signals when a training run can start + * @param start_training Conditional waiting predicate to mitigate against + * spurious wakeups * @param end_training Signals end of program to wind down thread gracefully */ -void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex, - std::condition_variable* training_data_buffer_full, - bool* start_training, bool* end_training) { +void Python_finalize(std::mutex *Eigen_model_mutex, + std::mutex *training_data_buffer_mutex, + std::condition_variable *training_data_buffer_full, + bool *start_training, bool *end_training) { training_data_buffer_mutex->lock(); // Define training as over *end_training = true; @@ -743,15 +954,15 @@ void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_bu *start_training = true; training_data_buffer_mutex->unlock(); training_data_buffer_full->notify_one(); - + if (python_train_thread.joinable()) { python_train_thread.join(); } // Acquire the Python GIL PyGILState_STATE gstate = PyGILState_Ensure(); - //Finalize Python + // Finalize Python Py_FinalizeEx(); } -} //namespace poet \ No newline at end of file +} // namespace poet \ No newline at end of file diff --git a/src/Chemistry/SurrogateModels/AI_functions.hpp b/src/Chemistry/SurrogateModels/AI_functions.hpp index cb6b5432a..4776486fb 100644 --- a/src/Chemistry/SurrogateModels/AI_functions.hpp +++ b/src/Chemistry/SurrogateModels/AI_functions.hpp @@ -17,9 +17,14 @@ #ifndef AI_FUNCTIONS_H #define AI_FUNCTIONS_H +#include +#include #include #include #include "poet.hpp" +extern "C"{ +#include +} // PhreeqC definition of pi clashes with Eigen macros // so we have to temporarily undef it @@ -44,7 +49,7 @@ struct TrainingData { int n_training_runs = 0; }; -// Ony declare the actual functions if flag is set +// Only declare the actual functions if flag is set #ifdef USE_AI_SURROGATE int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir); @@ -71,7 +76,7 @@ int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_mode std::mutex* training_data_buffer_mutex, std::condition_variable* training_data_buffer_full, bool* start_training, bool* end_training, - const RuntimeParameters& params); + const RuntimeParameters& params, naa_handle *handle); void update_weights(EigenModel* model, const std::vector>>& weights); @@ -84,7 +89,6 @@ std::vector Eigen_predict_clustered(const EigenModel& model, const Eigen std::vector Eigen_predict(const 
EigenModel& model, std::vector> x, int batch_size, std::mutex* Eigen_model_mutex); - // Otherwise, define the necessary stubs #else inline void Python_Keras_setup(std::string, std::string){} @@ -97,7 +101,7 @@ inline void training_data_buffer_append(std::vector>&, inline void cluster_labels_append(std::vector&, std::vector&, std::vector){} inline int Python_Keras_training_thread(EigenModel*, EigenModel*, std::mutex*, TrainingData*, std::mutex*, std::condition_variable*, - bool*, bool*, const RuntimeParameters&){return {};} + bool*, bool*, const RuntimeParameters&, naa_handle*){return {};} inline void update_weights(EigenModel*, const std::vector>>&){} inline std::vector>> Python_Keras_get_weights(std::string){return {};} diff --git a/src/Chemistry/SurrogateModels/serializer.cpp b/src/Chemistry/SurrogateModels/serializer.cpp new file mode 100644 index 000000000..b7fac83d4 --- /dev/null +++ b/src/Chemistry/SurrogateModels/serializer.cpp @@ -0,0 +1,221 @@ +#include "serializer.hpp" +#include "AI_functions.hpp" +#include +#include +#include + +using namespace std; +namespace poet{ +size_t calculateStructSize(void *struct_pointer, char type){ + + size_t struct_size = 0; + if (type == 'E') { + struct_size += sizeof(size_t); // number of matrices + struct_size += + static_cast(struct_pointer)->weight_matrices.size() * + 2 * sizeof(size_t); // dimensions of matrices + struct_size += sizeof(size_t); // number of vectors + struct_size += static_cast(struct_pointer)->biases.size() * + sizeof(size_t); // length of vectors + + for (const Eigen::MatrixXd &matrix : + static_cast(struct_pointer)->weight_matrices) { + // fprintf(stderr, "matrix size: rows:%td, cols: %td\n", matrix.rows(), matrix.cols()); + struct_size += matrix.size() * sizeof(double); + // fprintf(stderr, "matrix size %td\n", matrix.size()); + + } + for (const Eigen::VectorXd &bias : + static_cast(struct_pointer)->biases) { + struct_size += bias.size() * sizeof(double); + // fprintf(stderr, "matrix size %td\n", bias.size()); + + } + } else if (type == 'T') { + } + +return struct_size; + +} + + +int serializeModelWeights(const EigenModel *model, char *memory){ + + size_t num_matrices = model->weight_matrices.size(); + size_t size_counter = 0; + std::memcpy(memory, &num_matrices, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + for (const Eigen::MatrixXd &matrix : model->weight_matrices) { + size_t rows = matrix.rows(), cols = matrix.cols(); + fprintf(stdout, "rows: %zu, cols: %zu\n", rows, cols); + std::memcpy(memory, &rows, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + std::memcpy(memory, &cols, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + std::memcpy(memory, matrix.data(), rows * cols * sizeof(double)); + memory += rows * cols * sizeof(double); + size_counter += rows * cols * sizeof(double); + } + + // Serialisierung der Bias-Vektoren + size_t num_biases = model->biases.size(); + std::memcpy(memory, &num_biases, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + for (const Eigen::VectorXd &bias : model->biases) { + size_t size = bias.size(); + std::memcpy(memory, &size, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + std::memcpy(memory, bias.data(), size * sizeof(double)); + memory += size * sizeof(double); + size_counter += size * sizeof(double); + } + fprintf(stdout, "serializer size: %zu\n", size_counter); + return 0; +} + +// EigenModel deserializeModelWeights(char 
*memory, size_t buffer_size){ + +// EigenModel deserializedModel; + + +// size_t num_matrices; +// size_t size_counter = 0; +// std::memcpy(&num_matrices, memory, sizeof(size_t)); +// fprintf(stdout, "number of matrices: %zu\n", num_matrices); + +// memory += sizeof(size_t); +// size_counter += sizeof(size_t); +// deserializedModel.weight_matrices.resize(num_matrices); +// for (Eigen::MatrixXd &matrix : deserializedModel.weight_matrices) { +// size_t rows, cols; +// std::memcpy(&rows, memory, sizeof(size_t)); +// memory += sizeof(size_t); +// size_counter += sizeof(size_t); +// std::memcpy(&cols, memory, sizeof(size_t)); +// fprintf(stdout, "rows: %zu, cols: %zu\n", rows, cols); +// memory += sizeof(size_t); +// size_counter += sizeof(size_t); +// fprintf(stdout, "rows before: %td, cols before: %td\n", matrix.rows(), matrix.cols()); +// matrix.resize(rows, cols); +// std::memcpy(matrix.data(), memory, rows * cols * sizeof(double)); + +// memory += rows * cols * sizeof(double); +// size_counter += rows * cols * sizeof(double); +// } +// fprintf(stdout, "deserialized size of matrices: %zu\n", size_counter); +// size_t num_biases; +// std::memcpy(&num_biases, memory, sizeof(size_t)); +// memory += sizeof(size_t); +// size_counter += sizeof(size_t); + +// fprintf(stdout, "number of biases: %zu\n", num_biases); +// deserializedModel.biases.resize(num_biases); +// for (Eigen::VectorXd &bias : deserializedModel.biases) { +// size_t size; +// std::memcpy(&size, memory, sizeof(size_t)); +// fprintf(stdout, "bias length: %zu\n", size); +// memory += sizeof(size_t); +// size_counter += sizeof(size_t); +// bias.resize(size); +// std::memcpy(bias.data(), memory, size * sizeof(double)); +// memory += size * sizeof(double); +// size_counter += size * sizeof(double); +// } +// fprintf(stdout, "deserialized size: %zu\n", size_counter); +// if(size_counter > buffer_size){ +// fprintf(stderr, "buffer corrupted!\n"); +// } +// return deserializedModel; +// } +EigenModel deserializeModelWeights(char *memory, size_t buffer_size) { + EigenModel deserializedModel; + size_t size_counter = 0; + + // Anzahl Matrizen + size_t num_matrices; + if (buffer_size < sizeof(size_t)) { + fprintf(stderr, "Buffer too small.\n"); + return deserializedModel; + } + std::memcpy(&num_matrices, memory, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + deserializedModel.weight_matrices.resize(num_matrices); + + // Matrizen deserialisieren + for (Eigen::MatrixXd &matrix : deserializedModel.weight_matrices) { + size_t rows, cols; + + // Buffer-Check + if (size_counter + 2 * sizeof(size_t) > buffer_size) { + fprintf(stderr, "Buffer too small for matrix dimensions.\n"); + return deserializedModel; + } + + std::memcpy(&rows, memory, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + + std::memcpy(&cols, memory, sizeof(size_t)); + memory += sizeof(size_t); + size_counter += sizeof(size_t); + + if (size_counter + rows * cols * sizeof(double) > buffer_size) { + fprintf(stderr, "Buffer too small for matrix data.\n"); + return deserializedModel; + } + + // Kopiere Daten in neue Matrix + Eigen::MatrixXd temp = Eigen::Map( + reinterpret_cast(memory), rows, cols); + matrix = temp; // Kopieren der Daten + memory += rows * cols * sizeof(double); + size_counter += rows * cols * sizeof(double); + } + + // Anzahl Biases + size_t num_biases; + if (size_counter + sizeof(size_t) > buffer_size) { + fprintf(stderr, "Buffer too small for biases.\n"); + return deserializedModel; + } + 
std::memcpy(&num_biases, memory, sizeof(size_t));
+  memory += sizeof(size_t);
+  size_counter += sizeof(size_t);
+  deserializedModel.biases.resize(num_biases);
+
+  // Deserialize the bias vectors
+  for (Eigen::VectorXd &bias : deserializedModel.biases) {
+    size_t size;
+    if (size_counter + sizeof(size_t) > buffer_size) {
+      fprintf(stderr, "Buffer too small for bias size.\n");
+      return deserializedModel;
+    }
+
+    std::memcpy(&size, memory, sizeof(size_t));
+    memory += sizeof(size_t);
+    size_counter += sizeof(size_t);
+
+    if (size_counter + size * sizeof(double) > buffer_size) {
+      fprintf(stderr, "Buffer too small for bias data.\n");
+      return deserializedModel;
+    }
+
+    // Copy the data into a new vector
+    Eigen::VectorXd temp = Eigen::Map<Eigen::VectorXd>(
+        reinterpret_cast<double *>(memory), size);
+    bias = temp; // copy the data
+    memory += size * sizeof(double);
+    size_counter += size * sizeof(double);
+  }
+
+  return deserializedModel;
+}
+
+
+} // namespace poet
\ No newline at end of file
diff --git a/src/Chemistry/SurrogateModels/serializer.hpp b/src/Chemistry/SurrogateModels/serializer.hpp
new file mode 100644
index 000000000..3c8dbfe61
--- /dev/null
+++ b/src/Chemistry/SurrogateModels/serializer.hpp
@@ -0,0 +1,56 @@
+#ifndef SERIALIZER_H
+#define SERIALIZER_H
+
+#include "AI_functions.hpp"
+#include
+
+namespace poet {
+
+/**
+ * @brief Serialize the weights and biases of the model into a memory location
+ * to send them via RDMA
+ *
+ * @param model: Struct of EigenModel containing the weights and biases of the
+ * model
+ * @param memory: Pointer to the memory location where the serialized data will
+ * be stored
+ * @return int: 0 if the serialization was successful, -1 otherwise
+ */
+int serializeModelWeights(const EigenModel *model, char *memory);
+
+/**
+ * @brief Deserialize the weights and biases of the model from a memory location
+ *
+ * @param memory Pointer to the memory location where the serialized data is stored
+ * @return EigenModel struct containing the weights and biases of the model
+ */
+EigenModel deserializeModelWeights(char* memory, size_t buffer_size);
+
+/**
+ * @brief Serialize the training data into a memory location to send it via RDMA
+ *
+ * @param data Struct of TrainingData containing the training data
+ * @param memory Pointer to the memory location where the serialized data will be stored
+ * @return int: 0 if the serialization was successful, -1 otherwise
+ */
+int serializeTrainingData(const TrainingData& data, void *memory);
+
+/**
+ * @brief Deserialize the training data from a memory location
+ *
+ * @param data Pointer to the memory location where the serialized data is stored
+ * @return TrainingData struct containing the training data
+ */
+TrainingData deserializeTrainingData(void* data);
+
+/**
+ * @brief Calculates the size of the stored elements in the EigenModel and
+ * TrainingData structs
+ *
+ * @param struct_pointer: pointer to the struct
+ * @param type: determines the struct type: 'E' for EigenModel, 'T' for TrainingData
+ * @return size_t: size of stored elements
+ */
+size_t calculateStructSize(void* struct_pointer, char type);
+}
+#endif
\ No newline at end of file
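
Reviewer note (illustration only, not part of the patch): a minimal round-trip sketch for the new serializer. It assumes EigenModel is default-constructible and only needs the weight_matrices (std::vector of Eigen::MatrixXd) and biases (std::vector of Eigen::VectorXd) members that the serializer code above reads; calculateStructSize(), serializeModelWeights() and deserializeModelWeights() are the functions declared in serializer.hpp.

// round_trip_example.cpp -- illustrative sketch, builds on assumptions above
#include "serializer.hpp"
#include <Eigen/Dense>
#include <cassert>
#include <cstdlib>

using namespace poet;

int main() {
  // Tiny two-layer model: 3 inputs -> 4 hidden units -> 2 outputs.
  EigenModel model;
  model.weight_matrices = {Eigen::MatrixXd::Random(4, 3),
                           Eigen::MatrixXd::Random(2, 4)};
  model.biases = {Eigen::VectorXd::Random(4), Eigen::VectorXd::Random(2)};

  // 'E' selects the EigenModel layout in calculateStructSize().
  size_t buffer_size = calculateStructSize(&model, 'E');
  char *buffer = static_cast<char *>(std::calloc(buffer_size, sizeof(char)));
  if (buffer == nullptr) {
    return EXIT_FAILURE;
  }

  serializeModelWeights(&model, buffer);
  EigenModel copy = deserializeModelWeights(buffer, buffer_size);

  // The deserialized copy should reproduce the original weights and biases.
  assert(copy.weight_matrices[0].isApprox(model.weight_matrices[0]));
  assert(copy.weight_matrices[1].isApprox(model.weight_matrices[1]));
  assert(copy.biases[1].isApprox(model.biases[1]));

  std::free(buffer);
  return 0;
}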
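
Reviewer note (illustration only, not part of the patch): the layout contract between Python_Keras_get_weights() and update_weights() as read from the code above. The nested weight vector alternates [W0, b0, W1, b1, ...], each W indexed as [input][output] (the Keras kernel orientation), and update_weights() transposes it into (outputs x inputs) Eigen matrices so the Eigen inference path can compute W * x + b on column-major sample batches. The sketch assumes USE_AI_SURROGATE is defined (otherwise only the stubs in AI_functions.hpp exist) and that the EigenModel matrices are pre-sized, since update_weights() writes element-wise without resizing.

// weight_layout_example.cpp -- illustrative sketch, builds on assumptions above
#include "AI_functions.hpp"
#include <cassert>
#include <vector>

using namespace poet;

int main() {
  // One dense layer with 3 inputs and 2 outputs; matrices must be pre-sized.
  EigenModel model;
  model.weight_matrices = {Eigen::MatrixXd::Zero(2, 3)};
  model.biases = {Eigen::VectorXd::Zero(2)};

  // Keras-style nesting: weights[0][input][output] and weights[1][0][output].
  std::vector<std::vector<std::vector<double>>> weights = {
      {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}}, // W0: 3 inputs x 2 outputs
      {{0.5, -0.5}}};                       // b0: 2 outputs

  update_weights(&model, weights);

  // Element (output k, input j) of the Eigen matrix now holds weights[0][j][k].
  assert(model.weight_matrices[0](0, 2) == 5.0);
  assert(model.weight_matrices[0](1, 0) == 2.0);
  assert(model.biases[0](1) == -0.5);
  return 0;
}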