test measurements

This commit is contained in:
Hannes Signer 2025-03-05 18:00:43 +01:00
parent cc7e70f3f1
commit 7c1f08bcad
2 changed files with 88 additions and 15 deletions

View File

@ -23,15 +23,21 @@ get_poet_version()
find_package(MPI REQUIRED)
find_package(RRuntime REQUIRED)
set(USE_NAA ON)
option(USE_NAA "Use NAA for communication" OFF)
FetchContent_Declare(naa-communication-prototype
GIT_REPOSITORY gfz:naaice/naa-communication-prototype
GIT_TAG cmake-version)
FetchContent_MakeAvailable(naa-communication-prototype)
set(SOURCE_IP "10.3.10.41")
set(TARGET_IP "10.3.10.42")
if(USE_NAA)
FetchContent_MakeAvailable(naa-communication-prototype)
set(SOURCE_IP "10.3.10.41")
set(TARGET_IP "10.3.10.42")
set(USE_NAA_VALUE true)
else()
set(USE_NAA_VALUE false)
endif()

View File

@ -7,6 +7,7 @@
#include <Eigen/src/Core/Matrix.h>
#include <Python.h>
#include <Rmath.h>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
@ -19,6 +20,9 @@
#include <thread>
#include <vector>
#define HOST_MEASUREMENT 0
#define NAA_MEASUREMENT 1
using namespace std;
namespace poet {
@ -438,7 +442,8 @@ void training_data_buffer_append(
void cluster_labels_append(std::vector<int> &labels_buffer,
std::vector<int> &new_labels,
std::vector<int> validity) {
// Calculate new buffer size from number of valid elements in mask
// Calculate new buffer size from number of valid elements in mask
int n_invalid = validity.size();
for (size_t i = 0; i < validity.size(); i++) {
n_invalid -= validity[i];
@ -542,6 +547,18 @@ void parallel_training(EigenModel *Eigen_model,
std::condition_variable *training_data_buffer_full,
bool *start_training, bool *end_training,
const RuntimeParameters &params) {
#ifdef HOST_MEASUREMENT
FILE *logfile = fopen("host_measurements.txt", "a");
if (logfile == NULL) {
logfile = fopen("host_measurements.txt", "w");
fprintf(logfile, "repetition host_time\n");
} else {
logfile = fopen("host_measurements.txt", "a");
}
#endif
while (true) {
// Conditional waiting:
// - Sleeps until a signal arrives on training_data_buffer_full
@ -555,6 +572,9 @@ void parallel_training(EigenModel *Eigen_model,
// n_cluster_reactive: number of elements in the reactive cluster
// buffer_size: size of the whole buffer of training data
// params.training_data_size: number of elements required to start online training
const auto start_t = std::chrono::high_resolution_clock::now();
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
training_data_buffer_full->wait(
lock, [start_training] { return *start_training; });
@ -659,6 +679,17 @@ void parallel_training(EigenModel *Eigen_model,
Eigen_model_mutex->unlock();
}
#ifdef HOST_MEASUREMENT
auto end_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds difference = end_t - start_t;
fprintf(logfile, "%d, %ld\n", training_data_buffer->n_training_runs, difference.count());
if(training_data_buffer->n_training_runs == 10){
fclose(logfile);
exit(0);
}
#endif
// Release the Python GIL
PyGILState_Release(gstate);
std::cout << "AI: Training thread: Finished training, waiting for new data"
@ -691,11 +722,19 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
fprintf(stdout, "In naa_training\n");
#ifdef NAA_MEASUREMENT
FILE *logfile = fopen("naa_measurements.txt", "a");
if (logfile == NULL) {
logfile = fopen("naa_measurements.txt", "w");
fprintf(logfile, "repetition, model_size, training_data_size, target_data_size, serialization_time\n");
} else {
logfile = fopen("naa_measurements.txt", "a");
}
#endif
// initialize models with weights from pretrained keras model
// declare memory regions for model weights, training and target data
Eigen_model_mutex->lock();
std::vector<std::vector<std::vector<double>>> modelWeight =
@ -706,6 +745,9 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
if(params.use_clustering == true){
modelWeightReactive = Python_Keras_get_weights("model_reactive"); // ? correct
update_weights(Eigen_model_reactive, modelWeightReactive);
const auto start_t = std::chrono::high_resolution_clock::now();
}
Eigen_model_mutex->unlock();
@ -732,6 +774,9 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
std::cout << "training data size: " << trainingDataSize << std::endl;
std::cout << "target data size: " << targetDataSize << std::endl;
// TODO: change to floats and use vectors instead of calloc!!!
auto start_calloc_t = std::chrono::high_resolution_clock::now();
char *serializedModel = (char *)calloc(modelSize, sizeof(char));
if (serializedModel == NULL) {
exit(EXIT_FAILURE);
@ -744,6 +789,8 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
if (serializedTargetData == NULL) {
exit(EXIT_FAILURE);
}
auto end_calloc_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds difference_calloc = end_calloc_t - start_calloc_t;
// create memory regions
struct naa_param_t input_regions[] = {
@ -846,11 +893,14 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
// three memory regions: model weights, predicted data, true data
// model weight region is an input and output memory region
auto start_serialization_weights_t = std::chrono::high_resolution_clock::now();
if(train_cluster == 1){
int res = serializeModelWeights(Eigen_model_reactive, serializedModel);
} else {
int res = serializeModelWeights(Eigen_model, serializedModel);
}
auto end_serialization_weights_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds difference_serialization_weights = end_serialization_weights_t - start_serialization_weights_t;
// checksum serializeModel
double checksum_model = 0;
@ -863,8 +913,11 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
fprintf(stdout, "Checksum model: %f\n", checksum_model);
auto start_serialization_data_t = std::chrono::high_resolution_clock::now();
int res1 = serializeTrainingData(&inputs, serializedTrainingData);
int res2 = serializeTrainingData(&targets, serializedTargetData);
auto end_serialization_data_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds difference_serialization_data = end_serialization_data_t - start_serialization_data_t;
printf("-- RPC Invocation --\n");
if (naa_invoke(handle)) {
@ -883,6 +936,7 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
status.bytes_received, status.naa_error);
// update model weights with received weights
auto start_deserialization_t = std::chrono::high_resolution_clock::now();
EigenModel deserializedModel =
deserializeModelWeights(serializedModel, modelSize);
@ -893,24 +947,37 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
Eigen_model_mutex->unlock();
auto end_deserialization_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds difference_deserialization_t = end_deserialization_t - start_deserialization_t;
std::vector<std::vector<std::vector<double>>> cpp_weights =
Python_Keras_get_weights("model");
checkSumCppWeights(cpp_weights, "before");
auto start_deserialization2_t = std::chrono::high_resolution_clock::now();
size_t size_cpp_weights = calculateStructSize(&cpp_weights, 'C');
// fprintf(stdout, "size of cpp weight vector: %zu\n", size_cpp_weights);
char *serializedCPPData = (char *)calloc(size_cpp_weights, sizeof(char));
int res = serializeCPPWeights(cpp_weights, serializedCPPData);
std::vector<std::vector<std::vector<double>>> cpp_weights_deserialized =
deserializeCPPWeights(serializedCPPData);
checkSumCppWeights(cpp_weights_deserialized, "after");
// checkSumCppWeights(cpp_weights_deserialized, "after");
Python_keras_set_weights(model_name, cpp_weights_deserialized);
auto end_deserialization2_t = std::chrono::high_resolution_clock::now();
std::chrono::nanoseconds update_keras_model = end_deserialization2_t - start_deserialization2_t;
// TODO: switch from EigenModel to cpp_weights
#ifdef NAA_MEASUREMENT
std::chrono::nanoseconds difference_serialization =
difference_calloc + difference_serialization_weights +
difference_serialization_data + difference_deserialization_t +
update_keras_model;
fprintf(logfile, "%d, %zu, %zu, %zu, %ld\n", training_data_buffer->n_training_runs, modelSize, trainingDataSize, targetDataSize, difference_serialization.count());
if(training_data_buffer->n_training_runs == 10){
fclose(logfile);
exit(0);
}
#endif
}
printf("-- Cleaning Up --\n");