first draft for naa_training

This commit is contained in:
Hannes Signer 2024-12-23 23:29:04 +01:00
parent 05e8f11d82
commit d6d6db8b93

View File

@ -6,6 +6,7 @@
#include <Eigen/Dense> #include <Eigen/Dense>
#include <Eigen/src/Core/Matrix.h> #include <Eigen/src/Core/Matrix.h>
#include <Python.h> #include <Python.h>
#include <Rmath.h>
#include <condition_variable> #include <condition_variable>
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
@ -477,7 +478,7 @@ void Python_Keras_train(std::vector<std::vector<double>> &x,
} }
} }
// Choose the correct model to train if clustering is used // Choose the correct model to train if clustering is used
if (train_cluster == 1) { if (train_cluster == 1) {
if (!model_path.empty()) { if (!model_path.empty()) {
model_path.insert(model_path.length() - 6, "_reaction"); model_path.insert(model_path.length() - 6, "_reaction");
@ -547,6 +548,9 @@ void parallel_training(EigenModel *Eigen_model,
// does NOT // does NOT
// wait for a signal on training_data_buffer_full but starts the next // wait for a signal on training_data_buffer_full but starts the next
// round immediately. // round immediately.
// n_cluster_reactive: number of elements in the reactive cluster
// buffer_size: size of the whole buffer of training data
// params.training_data_size: number of elements required to start online training
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex); std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
training_data_buffer_full->wait( training_data_buffer_full->wait(
lock, [start_training] { return *start_training; }); lock, [start_training] { return *start_training; });
@ -564,12 +568,7 @@ void parallel_training(EigenModel *Eigen_model,
training_data_buffer->x.size(), training_data_buffer->x.size(),
std::vector<double>(params.training_data_size)); std::vector<double>(params.training_data_size));
fprintf(stdout, "x.size: %zu\n", training_data_buffer->x.size());
fprintf(stdout, "params.training_data_size: %d\n", params.training_data_size);
int buffer_size = training_data_buffer->x[0].size(); int buffer_size = training_data_buffer->x[0].size();
fprintf(stdout, "training_data_buffer->x[0].size: %zu\n", training_data_buffer->x[0].size());
sleep(10);
// If clustering is used, check the current cluster // If clustering is used, check the current cluster
int n_cluster_reactive = 0; int n_cluster_reactive = 0;
@ -579,8 +578,7 @@ void parallel_training(EigenModel *Eigen_model,
for (size_t i = 0; i < buffer_size; i++) { for (size_t i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer->cluster_labels[i]; n_cluster_reactive += training_data_buffer->cluster_labels[i];
} }
// ? set train_cluster to true only if all labels in training_data_buffer
// have the corresponding cluster_label?
train_cluster = n_cluster_reactive >= params.training_data_size; train_cluster = n_cluster_reactive >= params.training_data_size;
} }
int buffer_row = 0; int buffer_row = 0;
@ -609,14 +607,18 @@ void parallel_training(EigenModel *Eigen_model,
} }
// Set the waiting predicate to immediately continue training if enough // Set the waiting predicate to immediately continue training if enough
// elements of any cluster remain // elements of any cluster remain
if (train_cluster == 1) { if (train_cluster == 1) {
*start_training = *start_training =
// if clustering is active, check if after one training run still
// enough data of at least one cluster is left
((n_cluster_reactive - params.training_data_size) >= ((n_cluster_reactive - params.training_data_size) >=
params.training_data_size) || params.training_data_size) ||
((buffer_size - n_cluster_reactive) >= params.training_data_size); ((buffer_size - n_cluster_reactive) >= params.training_data_size);
} else { } else {
*start_training = *start_training =
// if no clustering is active, check if there are still
// enough data for another training run
(buffer_size - n_cluster_reactive - params.training_data_size) >= (buffer_size - n_cluster_reactive - params.training_data_size) >=
params.training_data_size; params.training_data_size;
} }
@ -666,80 +668,197 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
std::condition_variable *training_data_buffer_full, std::condition_variable *training_data_buffer_full,
bool *start_training, bool *end_training, bool *start_training, bool *end_training,
const RuntimeParameters &params, naa_handle *handle){ const RuntimeParameters &params, naa_handle *handle){
Eigen_model_mutex->lock();
training_data_buffer_mutex->lock(); // initialize models with weights from pretrained keras model
size_t model_size = calculateStructSize(Eigen_model, 'E'); // declare memory regions for model weights, training and target data
// TODO: initialize models with weights from keras model
std::string model_name = "model";
std::vector<std::vector<std::vector<double>>> model_weight =
Python_Keras_get_weights(model_name);
update_weights(Eigen_model, model_weight);
fprintf(stdout, "example weight: %f\n",
Eigen_model->weight_matrices[0](0, 0));
model_size = calculateStructSize(Eigen_model, 'E');
// Initialize training data input and targets
fprintf(stdout, "size after: %zu\n", model_size); std::vector<std::vector<double>> inputs(
char *serializedData = (char *)calloc(model_size, sizeof(char)); training_data_buffer->x.size(),
if (serializedData == NULL) { std::vector<double>(params.training_data_size));
exit(EXIT_FAILURE); std::vector<std::vector<double>> targets(
} training_data_buffer->x.size(),
int res = serializeModelWeights(Eigen_model, serializedData); std::vector<double>(params.training_data_size));
struct naa_param_t weight_region[] = {(void*) serializedData, model_size}; PyGILState_STATE gstate = PyGILState_Ensure();
Eigen_model_mutex->lock();
printf("-- Setting Up Connection --\n");
if (naa_create(1, weight_region, 1, std::vector<std::vector<std::vector<double>>> modelWeight =
weight_region, 0, handle)) { Python_Keras_get_weights("model");
std::vector<std::vector<std::vector<double>>> modelWeightReactive;
update_weights(Eigen_model, modelWeight);
if(params.use_clustering == true){
modelWeightReactive = Python_Keras_get_weights("model_reactive"); // TODO: confirm this is the correct model name
update_weights(Eigen_model_reactive, modelWeightReactive);
}
Eigen_model_mutex->unlock();
PyGILState_Release(gstate);
// determine size for required memory regions
size_t modelSize = calculateStructSize(Eigen_model, 'E');
size_t modelSizeReactive = calculateStructSize(Eigen_model_reactive, 'E');
modelSize = modelSize > modelSizeReactive ? modelSize : modelSizeReactive;
size_t trainingDataSize = calculateStructSize(&inputs, 'T');
size_t targetDataSize = calculateStructSize(&targets, 'T');
std::cout << "model size: " << modelSize << std::endl;
std::cout << "training data size: " << trainingDataSize << std::endl;
std::cout << "target data size: " << targetDataSize << std::endl;
char *serializedModel = (char *)calloc(modelSize, sizeof(char));
if (serializedModel == NULL) {
exit(EXIT_FAILURE);
}
char *serializedTrainingData = (char *)calloc(trainingDataSize, sizeof(char));
if (serializedTrainingData == NULL) {
exit(EXIT_FAILURE);
}
char *serializedTargetData = (char *)calloc(targetDataSize, sizeof(char));
if (serializedTargetData == NULL) {
exit(EXIT_FAILURE);
}
// create memory regions
struct naa_param_t weight_region[] = {
{(void *)serializedModel, modelSize},
{(void *)serializedTrainingData, trainingDataSize},
{(void *)serializedTargetData, targetDataSize}};
printf("-- Setting Up Connection --\n");
// the function code encodes which AI model is used
if (naa_create(1, weight_region, 1, weight_region, 0, handle)) {
fprintf(stderr, "Error during naa_create. Exiting.\n"); fprintf(stderr, "Error during naa_create. Exiting.\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
}
while(true){
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
training_data_buffer_full->wait(
lock, [start_training] { return *start_training; });
// Return if program is about to end
if (*end_training) {
return;
} }
printf("-- RPC Invocation --\n"); // Get the necessary training data
if (naa_invoke(handle)) { std::cout << "AI: Training thread: Getting training data" << std::endl;
fprintf(stderr, "Error during naa_invoke. Exiting.\n"); int buffer_size = training_data_buffer->x[0].size();
exit(EXIT_FAILURE);
// If clustering is used, check the current cluster
int n_cluster_reactive = 0;
int train_cluster =
-1; // Default value for non clustered training (all data is used)
if (params.use_clustering) {
for (size_t i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer->cluster_labels[i];
}
train_cluster = n_cluster_reactive >= params.training_data_size;
}
int buffer_row = 0;
int copied_row = 0;
while (copied_row < params.training_data_size) {
if ((train_cluster == -1) ||
(train_cluster == training_data_buffer->cluster_labels[buffer_row])) {
for (size_t col = 0; col < training_data_buffer->x.size(); col++) {
// Copy and remove from training data buffer
inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
training_data_buffer->x[col].erase(
training_data_buffer->x[col].begin() + buffer_row);
training_data_buffer->y[col].erase(
training_data_buffer->y[col].begin() + buffer_row);
}
// Remove from cluster label buffer
if (params.use_clustering) {
training_data_buffer->cluster_labels.erase(
training_data_buffer->cluster_labels.begin() + buffer_row);
}
copied_row++;
} else {
buffer_row++;
}
} }
// TODO: naa_wait with new weights // Set the waiting predicate to immediately continue training if enough
// elements of any cluster remain
if (train_cluster == 1) {
*start_training =
// if clustering is active, check if after one training run still
EigenModel deserializedModel = deserializeModelWeights(serializedData, model_size); // enough data of at least one cluster is left
fprintf(stdout, "After deserialization: %f\n", deserializedModel.weight_matrices[0](0,0)); ((n_cluster_reactive - params.training_data_size) >=
params.training_data_size) ||
for(int i=0;i<Eigen_model->weight_matrices[0].rows();i++){ ((buffer_size - n_cluster_reactive) >= params.training_data_size);
for (int j=0;j<Eigen_model->weight_matrices[0].cols();j++){ } else {
fprintf(stdout, "model: %f, deserializedModel: %f\n", *start_training =
Eigen_model->weight_matrices[0](i, j), deserializedModel.weight_matrices[0](i,j)); // if no clustering is active, check if there are still
// enough data for another training run
(buffer_size - n_cluster_reactive - params.training_data_size) >=
params.training_data_size;
} }
}
free(serializedData); // update number of training runs
serializedData = nullptr; training_data_buffer->n_training_runs += 1;
// naa_finalize: clean up connection. // Unlock the training_data_buffer_mutex
printf("-- Cleaning Up --\n"); lock.unlock();
naa_finalize(handle);
// fprintf(stdout, "after free\n");
// TODO: determine the struct sizes of EigenModel and TrainData
// TODO: initialize RDMA memory regions (two sections: model weights and training data)
// TODO: establish connection to NAA server
// TODO: if training buffer is full // initialize models with weights from pretrained keras model
std::string model_name = "model";
if (train_cluster == 1) {
model_name = "model_reactive";
}
std::cout << "AI: Training thread: Start training " << model_name
<< std::endl;
// data serialization
// three memory regions: model weights, predicted data, true data
// model weight region is an input and output memory region
// TODO: serialize EigenModel and TrainData if(train_cluster == 1){
// TODO: run invoke call from naaice API int res = serializeModelWeights(Eigen_model_reactive, serializedModel);
// TODO: wait for results } else {
// TODO: update model weights (dont forget locking) int res = serializeModelWeights(Eigen_model, serializedModel);
// TODO: wait for next training buffer iteration }
int res1 = serializeTrainingData(&inputs, serializedTrainingData);
Eigen_model_mutex->unlock(); int res2 = serializeTrainingData(&targets, serializedTargetData);
training_data_buffer_mutex->unlock();
printf("-- RPC Invocation --\n");
if (naa_invoke(handle)) {
fprintf(stderr, "Error during naa_invoke. Exiting.\n");
exit(EXIT_FAILURE);
}
// TODO: naa_wait with new weights
// TODO: update model weights with received weights
EigenModel deserializedModel =
deserializeModelWeights(serializedModel, modelSize);
fprintf(stdout, "After deserialization: %f\n",
deserializedModel.weight_matrices[0](0, 0));
for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) {
for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) {
fprintf(stdout, "model: %f, deserializedModel: %f\n",
Eigen_model->weight_matrices[0](i, j),
deserializedModel.weight_matrices[0](i, j));
}
}
}
printf("-- Cleaning Up --\n");
naa_finalize(handle);
free(serializedModel);
free(serializedTrainingData);
free(serializedTargetData);
} }
std::thread python_train_thread; std::thread python_train_thread;