update naa functions

This commit is contained in:
Hannes Signer 2025-02-13 13:38:28 +01:00
parent d6338b7cae
commit cc7e70f3f1
5 changed files with 66 additions and 79 deletions

View File

@ -12,6 +12,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
include("CMake/POET_Scripts.cmake")
include(FetchContent)
@ -22,7 +23,7 @@ get_poet_version()
find_package(MPI REQUIRED)
find_package(RRuntime REQUIRED)
set(USE_NAA "USE_NAA" ON)
set(USE_NAA ON)
FetchContent_Declare(naa-communication-prototype
GIT_REPOSITORY gfz:naaice/naa-communication-prototype
GIT_TAG cmake-version)
@ -33,6 +34,7 @@ if(USE_NAA)
set(TARGET_IP "10.3.10.42")
endif()
add_subdirectory(src)
option(POET_PREPROCESS_BENCHS "Preprocess benchmarks" ON)

View File

@ -13,6 +13,7 @@ def initiate_model(model_file_path):
def prediction_step(model, model_reactive, x, cluster_labels, batch_size):
# Catch input size mismatches
model_input_shape = model.input_shape[1:]
x = np.transpose(x)
if model_input_shape != x.shape[1:]:
print(f"Input data size {x.shape[1:]} does not match model input size {model_input_shape}",
flush=True)
@ -33,8 +34,17 @@ def get_weights(model):
weights = model.get_weights()
return weights
def set_weights(model, weights):
for i in range(len(weights)):
weights[i] = np.squeeze(weights[i])
model.set_weights(weights)
print("Weight succesful set!")
return 0
def training_step(model, x, y, batch_size, epochs,
output_file_path):
x = np.transpose(x)
y = np.transpose(y)
history = model.fit(x, y,
epochs=epochs,
batch_size=batch_size)

View File

@ -92,16 +92,16 @@ int Python_Keras_load_model(std::string model, std::string model_reactive,
* @return Numpy representation of the input vector
*/
PyObject* vector_to_numpy_array(const std::vector<std::vector<double>>& field) {
npy_intp dims[2] = {static_cast<npy_intp>(field.size()), // Zeilenanzahl
static_cast<npy_intp>(field[0].size())}; // Spaltenanzahl
npy_intp dims[2] = {static_cast<npy_intp>(field.size()), // rows
static_cast<npy_intp>(field[0].size())}; // cols
PyObject* np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64);
double* data = static_cast<double*>(PyArray_DATA((PyArrayObject*)np_array));
// Schreibe die Daten in das Numpy-Array (korrekte Reihenfolge)
// write data to numpy array
for (size_t i = 0; i < field.size(); ++i) {
for (size_t j = 0; j < field[i].size(); ++j) {
data[i * field[0].size() + j] = field[i][j]; // Korrekte Indizes
data[i * field[0].size() + j] = field[i][j];
}
}
@ -666,6 +666,21 @@ void parallel_training(EigenModel *Eigen_model,
}
}
void checkSumCppWeights(std::vector<std::vector<std::vector<double>>> cpp_weights, std::string model_name){
double checksum_cpp_weights = 0;
for(size_t i = 0; i < cpp_weights.size(); i++){
for(size_t j = 0; j < cpp_weights[i].size(); j++){
for(size_t k = 0; k < cpp_weights[i][j].size(); k++){
checksum_cpp_weights += cpp_weights[i][j][k];
if(cpp_weights[i][j][k] > 10){
fprintf(stdout, "Weight %s: %f\n", model_name.c_str(), cpp_weights[i][j][k]);
}
}
}
}
fprintf(stdout, "Checksum cpp weights %s: %f\n", model_name.c_str(), checksum_cpp_weights);
}
void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
std::mutex *Eigen_model_mutex,
TrainingData *training_data_buffer,
@ -680,7 +695,7 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
// initialize models with weights from pretrained keras model
// declare memory regions for model weights, training and target data
PyGILState_STATE gstate = PyGILState_Ensure();
Eigen_model_mutex->lock();
std::vector<std::vector<std::vector<double>>> modelWeight =
@ -694,7 +709,7 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
}
Eigen_model_mutex->unlock();
PyGILState_Release(gstate);
// Initialize training data input and targets
std::vector<std::vector<double>> inputs(
@ -850,48 +865,6 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
int res1 = serializeTrainingData(&inputs, serializedTrainingData);
int res2 = serializeTrainingData(&targets, serializedTargetData);
// std::vector<std::vector<double>> deserializedTrainingData = deserializeTrainingData(serializedTrainingData);
// std::vector<std::vector<double>> deserializedTargetData = deserializeTrainingData(serializedTargetData);
// calculate checksum of inputs
// double checksum_inputs = 0;
// for (size_t i = 0; i < inputs.size(); i++) {
// for (size_t j = 0; j < inputs[i].size(); j++) {
// checksum_inputs += inputs[i][j];
// // fprintf(stdout, "inputs: %f\n", inputs[i][j]);
// }
// }
// // calculate checksum of inputs
// double checksum_targets = 0;
// for (size_t i = 0; i < targets.size(); i++) {
// for (size_t j = 0; j < targets[i].size(); j++) {
// checksum_targets += targets[i][j];
// // fprintf(stdout, "inputs: %f\n", inputs[i][j]);
// }
// }
// double checksum_training = 0;
// for (size_t i = 0; i < deserializedTrainingData.size(); i++) {
// for (size_t j = 0; j < deserializedTrainingData[i].size(); j++) {
// checksum_training += deserializedTrainingData[i][j];
// // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]);
// }
// }
// double checksum_testing = 0;
// for (size_t i = 0; i < deserializedTargetData.size(); i++) {
// for (size_t j = 0; j < deserializedTargetData[i].size(); j++) {
// checksum_testing += deserializedTargetData[i][j];
// // fprintf(stdout, "inputs: %f\n", deserializedTrainingData[i][j]);
// }
// }
// fprintf(stdout, "Checksum inputs: %f\n", checksum_inputs);
// fprintf(stdout, "Checksum training: %f\n", checksum_training);
// fprintf(stdout, "Checksum targets: %f\n", checksum_targets);
// fprintf(stdout, "Checksum testing: %f\n", checksum_testing);
printf("-- RPC Invocation --\n");
if (naa_invoke(handle)) {
@ -922,24 +895,22 @@ void naa_training(EigenModel *Eigen_model, EigenModel *Eigen_model_reactive,
std::vector<std::vector<std::vector<double>>> cpp_weights =
Python_Keras_get_weights(model_name);
fprintf(stdout, "size of cpp weights: %zu\n", cpp_weights.size());
for(size_t i = 0; i<cpp_weights.size(); i++){
fprintf(stdout, "size of cpp weights: %zu\n", cpp_weights[i].size());
fprintf(stdout, "size of cpp weights: %zu\n", cpp_weights[i][0].size());
}
Python_Keras_get_weights("model");
checkSumCppWeights(cpp_weights, "before");
Python_keras_set_weights(model_name, cpp_weights);
size_t size_cpp_weights = calculateStructSize(&cpp_weights, 'C');
// fprintf(stdout, "size of cpp weight vector: %zu\n", size_cpp_weights);
char *serializedCPPData = (char *)calloc(size_cpp_weights, sizeof(char));
int res = serializeCPPWeights(cpp_weights, serializedCPPData);
std::vector<std::vector<std::vector<double>>> cpp_weights_deserialized =
deserializeCPPWeights(serializedCPPData);
checkSumCppWeights(cpp_weights_deserialized, "after");
// for (int i = 0; i < Eigen_model->weight_matrices[0].rows(); i++) {
// for (int j = 0; j < Eigen_model->weight_matrices[0].cols(); j++) {
// fprintf(stdout, "model: %f, deserializedModel: %f\n",
// Eigen_model->weight_matrices[0](i, j),
// deserializedModel.weight_matrices[0](i, j));
// }
// }
Python_keras_set_weights(model_name, cpp_weights_deserialized);
// TODO: switch from EigenModel to cpp_weighta
}
printf("-- Cleaning Up --\n");
@ -1160,9 +1131,6 @@ int Python_keras_set_weights(std::string model_name, std::vector<std::vector<std
PyObject* shape_str = PyObject_Repr(shape); // Get a string representation of the shape
PyObject* shape_utf8 = PyUnicode_AsEncodedString(shape_str, "utf-8", "strict");
const char* shape_bytes = PyBytes_AS_STRING(shape_utf8);
// Print the shape
std::cout << "Shape of numpy array at index " << i << ": " << shape_bytes << std::endl;
// Clean up
Py_DECREF(shape);

View File

@ -31,6 +31,7 @@
#include <Eigen/src/Core/util/Constants.h>
#include <chrono>
#include "poet.hpp"
#include "tug/Boundary.hpp"
#include "tug/Grid.hpp"
#include "tug/Simulation.hpp"

View File

@ -329,17 +329,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
Python_Keras_load_model(R["model_file_path"], R["model_reactive_file_path"],
params.use_clustering);
if (!params.disable_training) {
MSG("AI: Initialize training thread");
// TODO add naa_handle as optional parameter which is NULL per default
Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive,
&Eigen_model_mutex, &training_data_buffer,
&training_data_buffer_mutex,
&training_data_buffer_full, &start_training,
&end_training, params, handle);
}
if (!params.use_Keras_predictions) {
if (!params.use_Keras_predictions || params.use_naa) {
// Initialize Eigen model for custom inference function
MSG("AI: Use custom C++ prediction function");
// Get Keras weights from Python
@ -373,6 +364,17 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
update_weights(&Eigen_model_reactive, cpp_weights);
}
}
if (!params.disable_training) {
MSG("AI: Initialize training thread");
Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive,
&Eigen_model_mutex, &training_data_buffer,
&training_data_buffer_mutex,
&training_data_buffer_full, &start_training,
&end_training, params, handle);
}
MSG("AI: Surrogate model initialized");
}
@ -487,6 +489,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
std::vector<std::vector<double>> invalid_x =
R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
R.parseEval("target_scaled <- preprocess(state_C[ai_surrogate_species])");
// R.parseEval("print(head(state_C[ai_surrogate_species], 30))");
std::vector<std::vector<double>> invalid_y =
@ -500,6 +504,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
// count buffer size according to the cluster assignments
int n_cluster_reactive = 0;
size_t buffer_size = training_data_buffer.x[0].size();
fprintf(stdout, "Buffer size: %zu\n", buffer_size);
if (params.use_clustering) {
cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
R["validity_vector"]);
@ -512,6 +517,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
(buffer_size - n_cluster_reactive >= params.training_data_size)) {
start_training = true;
training_data_buffer_full.notify_one();
fprintf(stdout, "Signaling training thread\n");
}
training_data_buffer_mutex.unlock();
}
@ -762,7 +768,7 @@ int main(int argc, char *argv[]) {
}
MSG("Init done on process with rank " + std::to_string(MY_RANK));
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
MSG("finished simulation loop");