From 3e0c931c4169f4b503373641e9562f4d744ad1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbke?= Date: Wed, 16 Dec 2020 14:15:14 +0100 Subject: [PATCH] added Grid Class --- src/CMakeLists.txt | 4 +-- src/dht_wrapper.cpp | 37 +++++++++++---------------- src/dht_wrapper.h | 6 ++--- src/kin.cpp | 48 +++++++++++++++++------------------ src/model/Grid.cpp | 61 +++++++++++++++++++++++++++++++++++++++++++++ src/model/Grid.h | 32 ++++++++++++++++++++++++ src/worker.cpp | 45 +++++++++++++++++++-------------- src/worker.h | 3 ++- 8 files changed, 164 insertions(+), 72 deletions(-) create mode 100644 src/model/Grid.cpp create mode 100644 src/model/Grid.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 35b93c905..4eb121d63 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) find_package(MPI REQUIRED) -set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS -Wall -O3") +set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS") add_definitions(${GCC_CXX_FLAGS}) # prepare R environment (Rcpp + RInside) @@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS}) #define program libraries -add_library(POET_Libs OBJECT util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp) +add_library(POET_Libs STATIC util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp model/Grid.cpp) target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS}) target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto) diff --git a/src/dht_wrapper.cpp b/src/dht_wrapper.cpp index 5042cfb6e..832aea641 100644 --- a/src/dht_wrapper.cpp +++ b/src/dht_wrapper.cpp @@ -39,20 +39,20 @@ uint64_t get_md5(int key_size, void *key) { return retval; } -double Round_off(RRuntime R, double N, double n) { - double result; - R["roundsig"] = n; - R["roundin"] = N; - - result = R.parseEval("signif(roundin, digits=roundsig)"); - - return result; -} +//double Round_off(double N, double n) { +// double result; +// R["roundsig"] = n; +// R["roundin"] = N; +// +// result = R.parseEval("signif(roundin, digits=roundsig)"); +// +// return result; +//} /* * Stores fuzzed version of key in fuzzing_buffer */ -void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) { +void fuzz_for_dht(int var_count, void *key, double dt) { unsigned int i = 0; //introduce fuzzing to allow more hits in DHT for (i = 0; i < (unsigned int)var_count; i++) { @@ -64,15 +64,12 @@ void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) { else if (((double *)key)[i] == 0) fuzzing_buffer[i] = 0; else - //fuzzing_buffer[i] = Round_off(R, std::log10(((double *)key)[i]), dht_significant_digits_vector[i] - 1); fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), dht_significant_digits_vector[i]); } else { //without log10 - //fuzzing_buffer[i] = Round_off(R, ((double *)key)[i], dht_significant_digits_vector[i]); fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]); } } else if (prop_type_vector[i] == "logact") { - //fuzzing_buffer[i] = Round_off(R, ((double *)key)[i], dht_significant_digits_vector[i]); fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]); } else if (prop_type_vector[i] == "ignore") { fuzzing_buffer[i] = 0; @@ -84,19 +81,16 @@ void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) { fuzzing_buffer[var_count] = dt; } -void check_dht(RRuntime R, int length, std::vector &out_result_index, double *work_package) { +void check_dht(int length, std::vector &out_result_index, double *work_package, double dt) { void *key; int res; int var_count = prop_type_vector.size(); - double dt; - - dt = R.parseEval("mysetup$dt"); for (int i = 0; i < length; i++) { key = (void *)&(work_package[i * var_count]); //fuzz data (round, logarithm etc.) - fuzz_for_dht(R, var_count, key, dt); + fuzz_for_dht(var_count, key, dt); //overwrite input with data from DHT, IF value is found in DHT res = DHT_read(dht_object, fuzzing_buffer, key); @@ -116,14 +110,11 @@ void check_dht(RRuntime R, int length, std::vector &out_result_index, doub } } -void fill_dht(RRuntime R, int length, std::vector &result_index, double *work_package, double *results) { +void fill_dht(int length, std::vector &result_index, double *work_package, double *results, double dt) { void *key; void *data; int res; int var_count = prop_type_vector.size(); - double dt; - - dt = R.parseEval("mysetup$dt"); for (int i = 0; i < length; i++) { key = (void *)&(work_package[i * var_count]); @@ -133,7 +124,7 @@ void fill_dht(RRuntime R, int length, std::vector &result_index, double *w //If true -> was simulated, needs to be inserted into dht //fuzz data (round, logarithm etc.) - fuzz_for_dht(R, var_count, key, dt); + fuzz_for_dht(var_count, key, dt); res = DHT_write(dht_object, fuzzing_buffer, data); diff --git a/src/dht_wrapper.h b/src/dht_wrapper.h index 36aed4585..2d16c06dc 100644 --- a/src/dht_wrapper.h +++ b/src/dht_wrapper.h @@ -10,9 +10,9 @@ using namespace poet; /*Functions*/ uint64_t get_md5(int key_size, void* key); -void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt); -void check_dht(RRuntime R, int length, std::vector &out_result_index, double *work_package); -void fill_dht(RRuntime R, int length, std::vector &result_index, double *work_package, double *results); +void fuzz_for_dht(int var_count, void *key, double dt); +void check_dht(int length, std::vector &out_result_index, double *work_package, double dt); +void fill_dht(int length, std::vector &result_index, double *work_package, double *results, double dt); void print_statistics(); int table_to_file(char* filename); int file_to_table(char* filename); diff --git a/src/kin.cpp b/src/kin.cpp index 7786f50ef..9b2e15ada 100644 --- a/src/kin.cpp +++ b/src/kin.cpp @@ -11,8 +11,9 @@ #include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced #include "dht_wrapper.h" #include "global_buffer.h" -#include "worker.h" #include "util/RRuntime.h" +#include "worker.h" +#include "model/Grid.h" using namespace std; using namespace poet; @@ -266,8 +267,10 @@ int main(int argc, char *argv[]) { std::string init_chemistry_code = "mysetup <- init_chemistry(setup=mysetup)"; R.parseEval(init_chemistry_code); + Grid grid(R); + grid.init(); /* Retrieve state_C from R context for MPI buffer generation */ - Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); + //Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); /* Init Parallel helper functions */ R["n_procs"] = world_size - 1; /* worker count */ @@ -276,12 +279,12 @@ int main(int argc, char *argv[]) { // Removed additional field for ID in previous versions if (world_rank == 0) { mpi_buffer = - (double *)calloc(state_C.nrow() * (state_C.ncol()), sizeof(double)); + (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double)); } else { mpi_buffer = (double *)calloc( - (work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double)); + (work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); mpi_buffer_results = - (double *)calloc(work_package_size * (state_C.ncol()), sizeof(double)); + (double *)calloc(work_package_size * (grid.getCols()), sizeof(double)); } if (world_rank == 0) { @@ -296,9 +299,9 @@ int main(int argc, char *argv[]) { if (dht_enabled) { // cout << "\nCreating DHT\n"; // determine size of dht entries - int dht_data_size = state_C.ncol() * sizeof(double); + int dht_data_size = grid.getCols() * sizeof(double); int dht_key_size = - state_C.ncol() * sizeof(double) + (dt_differ * sizeof(double)); + grid.getCols() * sizeof(double) + (dt_differ * sizeof(double)); // determine bucket count for preset memory usage // bucket size is key + value + 1 byte for status @@ -391,7 +394,7 @@ int main(int argc, char *argv[]) { // a temporary send buffer double *send_buffer; send_buffer = (double *)calloc( - (work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double)); + (work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double)); // helper variables int iteration; @@ -469,19 +472,16 @@ int main(int argc, char *argv[]) { */ // R.parseEval("tmp <- // shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)"); - R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)"); - R.setBufferDataFrame("tmp"); - R.to_C_domain(mpi_buffer); - //Rcpp::DataFrame chemistry_data = R.parseEval("tmp"); - - //convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data); + // Rcpp::DataFrame chemistry_data = R.parseEval("tmp"); + + // convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data); // cout << "CPP: shuffle_field() done" << endl; - + grid.shuffleAndExport(mpi_buffer); /* send and receive work; this is done by counting * the wp */ int pkg_to_send = n_wp; int pkg_to_recv = n_wp; - size_t colCount = R.getBufferNCol(); + size_t colCount = grid.getCols(); int free_workers = world_size - 1; double *work_pointer = mpi_buffer; sim_c_chemistry = MPI_Wtime(); @@ -522,7 +522,6 @@ int main(int argc, char *argv[]) { /* end visual progress */ if (pkg_to_send > 0) { - master_send_a = MPI_Wtime(); /*search for free workers and send work*/ for (int p = 0; p < world_size - 1; p++) { @@ -614,14 +613,15 @@ int main(int argc, char *argv[]) { sim_d_chemistry = MPI_Wtime(); cummul_workers += sim_d_chemistry - sim_c_chemistry; - //convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data); - R.from_C_domain(mpi_buffer); + // convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data); + //R.from_C_domain(mpi_buffer); - R["chemistry_data"] = R.getBufferDataFrame(); + //R["chemistry_data"] = R.getBufferDataFrame(); - /* unshuffle results */ - R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)"); + ///* unshuffle results */ + //R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)"); + grid.importAndUnshuffle(mpi_buffer); /* do master stuff */ sim_e_chemistry = MPI_Wtime(); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); @@ -649,7 +649,7 @@ int main(int argc, char *argv[]) { MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD); } - // MPI_Barrier(MPI_COMM_WORLD); + MPI_Barrier(MPI_COMM_WORLD); if (dht_snaps == 2) { std::stringstream outfile; @@ -817,7 +817,7 @@ int main(int argc, char *argv[]) { std::cout.flush(); } } - worker_function(R); + worker_function(R, grid); free(mpi_buffer_results); } diff --git a/src/model/Grid.cpp b/src/model/Grid.cpp new file mode 100644 index 000000000..ae61ff3d0 --- /dev/null +++ b/src/model/Grid.cpp @@ -0,0 +1,61 @@ +#include "Grid.h" +#include "Rcpp.h" + +using namespace poet; +using namespace Rcpp; +/** + * At this moment init will only declare and define a variable inside the R + * runtime called grid_tmp since the whole Grid initialization and management is + * done by the R runtime. This may change in the future. + */ +void Grid::init() { + R.parseEval("GRID_TMP <- mysetup$state_C"); + this->ncol = R.parseEval("ncol(GRID_TMP)"); + this->nrow = R.parseEval("nrow(GRID_TMP)"); +} + +/** + * Returns the number of elements for each gridcell. + */ +unsigned int Grid::getCols() { return this->ncol; } + +/** + * Returns the number of gridcells. + */ +unsigned int Grid::getRows() { return this->nrow; } + +void Grid::shuffleAndExport(double *buffer) { + R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)"); + R.setBufferDataFrame("tmp"); + R.to_C_domain(buffer); +} + +void Grid::importAndUnshuffle(double *buffer) { + R.setBufferDataFrame("GRID_TMP"); + R.from_C_domain(buffer); + R["GRID_CHEM_DATA"] = R.getBufferDataFrame(); + R.parseEval("result <- unshuffle_field(GRID_CHEM_DATA, ordered_ids)"); +} + +void Grid::importWP(double *buffer, unsigned int p_size) { + R["GRID_WP_SKELETON"] = getSkeletonDataFrame(p_size); + R.setBufferDataFrame("GRID_WP_SKELETON"); + R.from_C_domain(buffer); + R["work_package_full"] = R.getBufferDataFrame(); +} +void Grid::exportWP(double *buffer) { + R.setBufferDataFrame("result_full"); + R.to_C_domain(buffer); +} +/** + * Create a data frame with n rows. + * + * @return Can be seen as a skeleton. The content of the data frame might be + * irrelevant. + */ +Rcpp::DataFrame Grid::getSkeletonDataFrame(unsigned int rows) { + R["GRID_ROWS"] = rows; + + Rcpp::DataFrame tmp = R.parseEval("head(GRID_TMP,GRID_ROWS)"); + return tmp; +} diff --git a/src/model/Grid.h b/src/model/Grid.h new file mode 100644 index 000000000..a2c066505 --- /dev/null +++ b/src/model/Grid.h @@ -0,0 +1,32 @@ +#ifndef GRID_H +#define GRID_H + +#include "../util/RRuntime.h" +#include + +namespace poet { +class Grid { + +public: + Grid(RRuntime &R) : R(R){}; + + void init(); + + unsigned int getCols(); + unsigned int getRows(); + + void shuffleAndExport(double *buffer); + void importAndUnshuffle(double *buffer); + + void importWP(double *buffer, unsigned int p_size); + void exportWP(double *buffer); + +private: + RRuntime R; + unsigned int ncol; + unsigned int nrow; + + Rcpp::DataFrame getSkeletonDataFrame(unsigned int rows); +}; +} // namespace poet +#endif // GRID_H diff --git a/src/worker.cpp b/src/worker.cpp index d6b21064e..d53fc5d5d 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -1,15 +1,16 @@ #include "worker.h" #include "dht_wrapper.h" #include "global_buffer.h" +#include "model/Grid.h" #include "util/RRuntime.h" +#include #include #include -#include using namespace poet; using namespace Rcpp; -void worker_function(RRuntime R) { +void worker_function(RRuntime &R, Grid &grid) { int world_rank; MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Status probe_status; @@ -130,13 +131,14 @@ void worker_function(RRuntime R) { // } /* get df with right structure to fill in work package */ - R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)"); - // R.parseEval("print(rownames(tmp2)[1:5])"); - // R.parseEval("print(head(tmp2, 2))"); - // R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))"); + // R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)"); + // R["skeleton"] = grid.buildDataFrame(work_package_size); + //// R.parseEval("print(rownames(tmp2)[1:5])"); + //// R.parseEval("print(head(tmp2, 2))"); + //// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))"); - //Rcpp::DataFrame buffer = R.parseEval("tmp2"); - R.setBufferDataFrame("skeleton"); + ////Rcpp::DataFrame buffer = R.parseEval("tmp2"); + // R.setBufferDataFrame("skeleton"); if (dht_enabled) { // DEBUG @@ -150,7 +152,7 @@ void worker_function(RRuntime R) { } dht_get_start = MPI_Wtime(); - check_dht(R, local_work_package_size, dht_flags, mpi_buffer); + check_dht(local_work_package_size, dht_flags, mpi_buffer, dt); dht_get_end = MPI_Wtime(); // DEBUG @@ -160,16 +162,19 @@ void worker_function(RRuntime R) { // R.parseEvalQ("print(head(dht_flags))"); } + /* work */ - R.from_C_domain(mpi_buffer); - //convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer); - R["work_package_full"] = R.getBufferDataFrame(); + // R.from_C_domain(mpi_buffer); + ////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer); + // R["work_package_full"] = R.getBufferDataFrame(); // R["work_package"] = buffer; // DEBUG // R.parseEvalQ("print(head(work_package_full))"); // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )"); + grid.importWP(mpi_buffer, work_package_size); + if (dht_enabled) { R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); } else { @@ -218,11 +223,12 @@ void worker_function(RRuntime R) { R.parseEvalQ("result_full <- result"); } - R.setBufferDataFrame("result_full"); - //Rcpp::DataFrame result = R.parseEval("result_full"); - //convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result); - R.to_C_domain(mpi_buffer_results); + // R.setBufferDataFrame("result_full"); + ////Rcpp::DataFrame result = R.parseEval("result_full"); + ////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result); + // R.to_C_domain(mpi_buffer_results); + grid.exportWP(mpi_buffer_results); /* send results to master */ MPI_Request send_req; MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, @@ -230,8 +236,8 @@ void worker_function(RRuntime R) { if (dht_enabled) { dht_fill_start = MPI_Wtime(); - fill_dht(R, local_work_package_size, dht_flags, mpi_buffer, - mpi_buffer_results); + fill_dht(local_work_package_size, dht_flags, mpi_buffer, + mpi_buffer_results, dt); dht_fill_end = MPI_Wtime(); timing[1] += dht_get_end - dht_get_start; @@ -242,6 +248,7 @@ void worker_function(RRuntime R) { MPI_Wait(&send_req, MPI_STATUS_IGNORE); + } else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */ /* before death, submit profiling/timings to master*/ @@ -262,7 +269,6 @@ void worker_function(RRuntime R) { MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF, MPI_COMM_WORLD); } - break; } else if ((probe_status.MPI_TAG == TAG_DHT_STATS)) { @@ -270,6 +276,7 @@ void worker_function(RRuntime R) { MPI_STATUS_IGNORE); print_statistics(); MPI_Barrier(MPI_COMM_WORLD); + } else if ((probe_status.MPI_TAG == TAG_DHT_STORE)) { char *outdir; MPI_Get_count(&probe_status, MPI_CHAR, &count); diff --git a/src/worker.h b/src/worker.h index 72faa87e8..349333811 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,10 +1,11 @@ #pragma once #include "util/RRuntime.h" +#include "model/Grid.h" using namespace std; using namespace poet; /*Functions*/ -void worker_function(RRuntime R); +void worker_function(RRuntime &R, Grid &grid); /*Globals*/