added Grid Class

Max Lübke 2020-12-16 14:15:14 +01:00
parent 7b4c3a4bce
commit 41b15e14e8
8 changed files with 164 additions and 72 deletions

View File

@@ -10,7 +10,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
find_package(MPI REQUIRED)
set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS -Wall -O3")
set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS")
add_definitions(${GCC_CXX_FLAGS})
# prepare R environment (Rcpp + RInside)
@@ -89,7 +89,7 @@ include_directories(${MPI_CXX_INCLUDE_DIRS})
#define program libraries
add_library(POET_Libs OBJECT util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp)
add_library(POET_Libs STATIC util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp model/Grid.cpp)
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)

View File

@@ -39,20 +39,20 @@ uint64_t get_md5(int key_size, void *key) {
return retval;
}
double Round_off(RRuntime R, double N, double n) {
double result;
R["roundsig"] = n;
R["roundin"] = N;
result = R.parseEval("signif(roundin, digits=roundsig)");
return result;
}
//double Round_off(double N, double n) {
// double result;
// R["roundsig"] = n;
// R["roundin"] = N;
//
// result = R.parseEval("signif(roundin, digits=roundsig)");
//
// return result;
//}
/*
* Stores fuzzed version of key in fuzzing_buffer
*/
void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) {
void fuzz_for_dht(int var_count, void *key, double dt) {
unsigned int i = 0;
//introduce fuzzing to allow more hits in DHT
for (i = 0; i < (unsigned int)var_count; i++) {
@@ -64,15 +64,12 @@ void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) {
else if (((double *)key)[i] == 0)
fuzzing_buffer[i] = 0;
else
//fuzzing_buffer[i] = Round_off(R, std::log10(((double *)key)[i]), dht_significant_digits_vector[i] - 1);
fuzzing_buffer[i] = ROUND(-(std::log10(((double *)key)[i])), dht_significant_digits_vector[i]);
} else {
//without log10
//fuzzing_buffer[i] = Round_off(R, ((double *)key)[i], dht_significant_digits_vector[i]);
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]);
}
} else if (prop_type_vector[i] == "logact") {
//fuzzing_buffer[i] = Round_off(R, ((double *)key)[i], dht_significant_digits_vector[i]);
fuzzing_buffer[i] = ROUND((((double *)key)[i]), dht_significant_digits_vector[i]);
} else if (prop_type_vector[i] == "ignore") {
fuzzing_buffer[i] = 0;
@@ -84,19 +81,16 @@ void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) {
fuzzing_buffer[var_count] = dt;
}
void check_dht(RRuntime R, int length, std::vector<bool> &out_result_index, double *work_package) {
void check_dht(int length, std::vector<bool> &out_result_index, double *work_package, double dt) {
void *key;
int res;
int var_count = prop_type_vector.size();
double dt;
dt = R.parseEval("mysetup$dt");
for (int i = 0; i < length; i++) {
key = (void *)&(work_package[i * var_count]);
//fuzz data (round, logarithm etc.)
fuzz_for_dht(R, var_count, key, dt);
fuzz_for_dht(var_count, key, dt);
//overwrite input with data from DHT, IF value is found in DHT
res = DHT_read(dht_object, fuzzing_buffer, key);
@@ -116,14 +110,11 @@ void check_dht(RRuntime R, int length, std::vector<bool> &out_result_index, doub
}
}
void fill_dht(RRuntime R, int length, std::vector<bool> &result_index, double *work_package, double *results) {
void fill_dht(int length, std::vector<bool> &result_index, double *work_package, double *results, double dt) {
void *key;
void *data;
int res;
int var_count = prop_type_vector.size();
double dt;
dt = R.parseEval("mysetup$dt");
for (int i = 0; i < length; i++) {
key = (void *)&(work_package[i * var_count]);
@@ -133,7 +124,7 @@ void fill_dht(RRuntime R, int length, std::vector<bool> &result_index, double *w
//If true -> was simulated, needs to be inserted into dht
//fuzz data (round, logarithm etc.)
fuzz_for_dht(R, var_count, key, dt);
fuzz_for_dht(var_count, key, dt);
res = DHT_write(dht_object, fuzzing_buffer, data);
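For context on the fuzzing above: before a state is hashed as a DHT key, each variable is rounded to a fixed number of significant digits (optionally after a log10 transform, as the branches show), so that numerically close states map to the same DHT entry. The ROUND macro itself is not defined in this diff; the standalone sketch below is a hypothetical stand-in that mimics R's signif(), which the Round_off() helper above evaluates through the R runtime.

#include <cmath>
#include <cstdio>

// Hypothetical stand-in for the project's ROUND macro: round x to `digits`
// significant digits, mirroring R's signif(x, digits).
static double round_significant(double x, int digits) {
  if (x == 0.0)
    return 0.0;
  const double scale =
      std::pow(10.0, digits - std::ceil(std::log10(std::fabs(x))));
  return std::round(x * scale) / scale;
}

int main() {
  std::printf("%g\n", round_significant(1.23456e-5, 3)); // 1.23e-05
  std::printf("%g\n", round_significant(-0.004567, 2));  // -0.0046
  return 0;
}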

View File

@@ -10,9 +10,9 @@ using namespace poet;
/*Functions*/
uint64_t get_md5(int key_size, void* key);
void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt);
void check_dht(RRuntime R, int length, std::vector<bool> &out_result_index, double *work_package);
void fill_dht(RRuntime R, int length, std::vector<bool> &result_index, double *work_package, double *results);
void fuzz_for_dht(int var_count, void *key, double dt);
void check_dht(int length, std::vector<bool> &out_result_index, double *work_package, double dt);
void fill_dht(int length, std::vector<bool> &result_index, double *work_package, double *results, double dt);
void print_statistics();
int table_to_file(char* filename);
int file_to_table(char* filename);

View File

@@ -11,8 +11,9 @@
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
#include "dht_wrapper.h"
#include "global_buffer.h"
#include "worker.h"
#include "util/RRuntime.h"
#include "worker.h"
#include "model/Grid.h"
using namespace std;
using namespace poet;
@@ -266,8 +267,10 @@ int main(int argc, char *argv[]) {
std::string init_chemistry_code = "mysetup <- init_chemistry(setup=mysetup)";
R.parseEval(init_chemistry_code);
Grid grid(R);
grid.init();
/* Retrieve state_C from R context for MPI buffer generation */
Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
//Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
/* Init Parallel helper functions */
R["n_procs"] = world_size - 1; /* worker count */
@@ -276,12 +279,12 @@ int main(int argc, char *argv[]) {
// Removed additional field for ID in previous versions
if (world_rank == 0) {
mpi_buffer =
(double *)calloc(state_C.nrow() * (state_C.ncol()), sizeof(double));
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
} else {
mpi_buffer = (double *)calloc(
(work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double));
(work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
mpi_buffer_results =
(double *)calloc(work_package_size * (state_C.ncol()), sizeof(double));
(double *)calloc(work_package_size * (grid.getCols()), sizeof(double));
}
if (world_rank == 0) {
@@ -296,9 +299,9 @@ int main(int argc, char *argv[]) {
if (dht_enabled) {
// cout << "\nCreating DHT\n";
// determine size of dht entries
int dht_data_size = state_C.ncol() * sizeof(double);
int dht_data_size = grid.getCols() * sizeof(double);
int dht_key_size =
state_C.ncol() * sizeof(double) + (dt_differ * sizeof(double));
grid.getCols() * sizeof(double) + (dt_differ * sizeof(double));
// determine bucket count for preset memory usage
// bucket size is key + value + 1 byte for status
@@ -391,7 +394,7 @@ int main(int argc, char *argv[]) {
// a temporary send buffer
double *send_buffer;
send_buffer = (double *)calloc(
(work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double));
(work_package_size * (grid.getCols())) + BUFFER_OFFSET, sizeof(double));
// helper variables
int iteration;
@@ -469,19 +472,16 @@ int main(int argc, char *argv[]) {
*/
// R.parseEval("tmp <-
// shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)");
R.setBufferDataFrame("tmp");
R.to_C_domain(mpi_buffer);
//Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
//convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
// Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
// convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
// cout << "CPP: shuffle_field() done" << endl;
grid.shuffleAndExport(mpi_buffer);
/* send and receive work; this is done by counting
* the wp */
int pkg_to_send = n_wp;
int pkg_to_recv = n_wp;
size_t colCount = R.getBufferNCol();
size_t colCount = grid.getCols();
int free_workers = world_size - 1;
double *work_pointer = mpi_buffer;
sim_c_chemistry = MPI_Wtime();
@@ -522,7 +522,6 @@ int main(int argc, char *argv[]) {
/* end visual progress */
if (pkg_to_send > 0) {
master_send_a = MPI_Wtime();
/*search for free workers and send work*/
for (int p = 0; p < world_size - 1; p++) {
@@ -614,14 +613,15 @@ int main(int argc, char *argv[]) {
sim_d_chemistry = MPI_Wtime();
cummul_workers += sim_d_chemistry - sim_c_chemistry;
//convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
R.from_C_domain(mpi_buffer);
// convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
//R.from_C_domain(mpi_buffer);
R["chemistry_data"] = R.getBufferDataFrame();
//R["chemistry_data"] = R.getBufferDataFrame();
/* unshuffle results */
R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)");
///* unshuffle results */
//R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)");
grid.importAndUnshuffle(mpi_buffer);
/* do master stuff */
sim_e_chemistry = MPI_Wtime();
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
@@ -649,7 +649,7 @@ int main(int argc, char *argv[]) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
}
// MPI_Barrier(MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
if (dht_snaps == 2) {
std::stringstream outfile;
@@ -817,7 +817,7 @@ int main(int argc, char *argv[]) {
std::cout.flush();
}
}
worker_function(R);
worker_function(R, grid);
free(mpi_buffer_results);
}

src/model/Grid.cpp (new file, 61 lines added)
View File

@@ -0,0 +1,61 @@
#include "Grid.h"
#include "Rcpp.h"
using namespace poet;
using namespace Rcpp;
/**
* At the moment, init() only declares and defines a variable called GRID_TMP
* inside the R runtime, since the whole grid initialization and management is
* done by the R runtime. This may change in the future.
*/
void Grid::init() {
R.parseEval("GRID_TMP <- mysetup$state_C");
this->ncol = R.parseEval("ncol(GRID_TMP)");
this->nrow = R.parseEval("nrow(GRID_TMP)");
}
/**
* Returns the number of elements (columns) stored per grid cell.
*/
unsigned int Grid::getCols() { return this->ncol; }
/**
* Returns the number of grid cells (rows).
*/
unsigned int Grid::getRows() { return this->nrow; }
void Grid::shuffleAndExport(double *buffer) {
R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)");
R.setBufferDataFrame("tmp");
R.to_C_domain(buffer);
}
void Grid::importAndUnshuffle(double *buffer) {
R.setBufferDataFrame("GRID_TMP");
R.from_C_domain(buffer);
R["GRID_CHEM_DATA"] = R.getBufferDataFrame();
R.parseEval("result <- unshuffle_field(GRID_CHEM_DATA, ordered_ids)");
}
void Grid::importWP(double *buffer, unsigned int p_size) {
R["GRID_WP_SKELETON"] = getSkeletonDataFrame(p_size);
R.setBufferDataFrame("GRID_WP_SKELETON");
R.from_C_domain(buffer);
R["work_package_full"] = R.getBufferDataFrame();
}
void Grid::exportWP(double *buffer) {
R.setBufferDataFrame("result_full");
R.to_C_domain(buffer);
}
/**
* Create a data frame with the given number of rows.
*
* @return A skeleton data frame: only its column structure matters, the cell
* contents are irrelevant.
*/
Rcpp::DataFrame Grid::getSkeletonDataFrame(unsigned int rows) {
R["GRID_ROWS"] = rows;
Rcpp::DataFrame tmp = R.parseEval("head(GRID_TMP,GRID_ROWS)");
return tmp;
}
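A note on why the skeleton is needed: work packages travel as flat double buffers laid out one grid cell after another, exactly as check_dht()/fill_dht() index them with work_package[i * var_count]. from_C_domain() can only deserialize such a buffer into a data frame that already has the right number of rows and the grid's columns, which head(GRID_TMP, p_size) (via getSkeletonDataFrame()) supplies. A small plain C++ illustration of that assumed layout, independent of the project code:

#include <cstdio>
#include <vector>

int main() {
  const unsigned int p_size = 3; // grid cells in one work package
  const unsigned int ncol = 4;   // variables stored per grid cell

  // flat, cell-major layout: cell i, variable j sits at buffer[i * ncol + j]
  std::vector<double> buffer(p_size * ncol);
  for (unsigned int i = 0; i < p_size; ++i)
    for (unsigned int j = 0; j < ncol; ++j)
      buffer[i * ncol + j] = 100.0 * i + j;

  // reading back one cell corresponds to one row of the skeleton data frame
  const unsigned int cell = 1;
  for (unsigned int j = 0; j < ncol; ++j)
    std::printf("cell %u, var %u = %g\n", cell, j, buffer[cell * ncol + j]);
  return 0;
}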

src/model/Grid.h (new file, 32 lines added)
View File

@@ -0,0 +1,32 @@
#ifndef GRID_H
#define GRID_H
#include "../util/RRuntime.h"
#include <Rcpp.h>
namespace poet {
class Grid {
public:
Grid(RRuntime &R) : R(R){};
void init();
unsigned int getCols();
unsigned int getRows();
void shuffleAndExport(double *buffer);
void importAndUnshuffle(double *buffer);
void importWP(double *buffer, unsigned int p_size);
void exportWP(double *buffer);
private:
RRuntime R;
unsigned int ncol;
unsigned int nrow;
Rcpp::DataFrame getSkeletonDataFrame(unsigned int rows);
};
} // namespace poet
#endif // GRID_H
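Taken together, the class is wired up in main.cpp as shown further above; the condensed sketch below only restates that flow and assumes the project's RRuntime and the R-side setup (mysetup, ordered_ids, shuffle_field()/unshuffle_field()) are already initialized. It is not part of the commit.

#include "model/Grid.h"
#include "util/RRuntime.h"
#include <cstdlib>

using namespace poet;

// Condensed master-side flow, assuming an initialized R runtime `R`.
void master_chemistry_step(RRuntime &R) {
  Grid grid(R);
  grid.init(); // copies mysetup$state_C into GRID_TMP and caches nrow/ncol

  // MPI buffer sized from the grid instead of querying state_C directly
  double *mpi_buffer =
      (double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));

  grid.shuffleAndExport(mpi_buffer);   // shuffle_field() + copy to the C buffer
  /* ... hand out work packages, collect worker results ... */
  grid.importAndUnshuffle(mpi_buffer); // copy back + unshuffle_field() into `result`

  free(mpi_buffer);
}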

View File

@@ -1,15 +1,16 @@
#include "worker.h"
#include "dht_wrapper.h"
#include "global_buffer.h"
#include "model/Grid.h"
#include "util/RRuntime.h"
#include <Rcpp.h>
#include <iostream>
#include <mpi.h>
#include <Rcpp.h>
using namespace poet;
using namespace Rcpp;
void worker_function(RRuntime R) {
void worker_function(RRuntime &R, Grid &grid) {
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Status probe_status;
@@ -130,13 +131,14 @@ void worker_function(RRuntime R) {
// }
/* get df with right structure to fill in work package */
R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
// R.parseEval("print(rownames(tmp2)[1:5])");
// R.parseEval("print(head(tmp2, 2))");
// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
// R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
// R["skeleton"] = grid.buildDataFrame(work_package_size);
//// R.parseEval("print(rownames(tmp2)[1:5])");
//// R.parseEval("print(head(tmp2, 2))");
//// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
//Rcpp::DataFrame buffer = R.parseEval("tmp2");
R.setBufferDataFrame("skeleton");
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
// R.setBufferDataFrame("skeleton");
if (dht_enabled) {
// DEBUG
@@ -150,7 +152,7 @@ void worker_function(RRuntime R) {
}
dht_get_start = MPI_Wtime();
check_dht(R, local_work_package_size, dht_flags, mpi_buffer);
check_dht(local_work_package_size, dht_flags, mpi_buffer, dt);
dht_get_end = MPI_Wtime();
// DEBUG
@@ -160,16 +162,19 @@ void worker_function(RRuntime R) {
// R.parseEvalQ("print(head(dht_flags))");
}
/* work */
R.from_C_domain(mpi_buffer);
//convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
R["work_package_full"] = R.getBufferDataFrame();
// R.from_C_domain(mpi_buffer);
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
// R["work_package_full"] = R.getBufferDataFrame();
// R["work_package"] = buffer;
// DEBUG
// R.parseEvalQ("print(head(work_package_full))");
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
grid.importWP(mpi_buffer, work_package_size);
if (dht_enabled) {
R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
} else {
@@ -218,11 +223,12 @@ void worker_function(RRuntime R) {
R.parseEvalQ("result_full <- result");
}
R.setBufferDataFrame("result_full");
//Rcpp::DataFrame result = R.parseEval("result_full");
//convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
R.to_C_domain(mpi_buffer_results);
// R.setBufferDataFrame("result_full");
////Rcpp::DataFrame result = R.parseEval("result_full");
////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
// R.to_C_domain(mpi_buffer_results);
grid.exportWP(mpi_buffer_results);
/* send results to master */
MPI_Request send_req;
MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK,
@@ -230,8 +236,8 @@ void worker_function(RRuntime R) {
if (dht_enabled) {
dht_fill_start = MPI_Wtime();
fill_dht(R, local_work_package_size, dht_flags, mpi_buffer,
mpi_buffer_results);
fill_dht(local_work_package_size, dht_flags, mpi_buffer,
mpi_buffer_results, dt);
dht_fill_end = MPI_Wtime();
timing[1] += dht_get_end - dht_get_start;
@@ -242,6 +248,7 @@ void worker_function(RRuntime R) {
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
} else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */
/* before death, submit profiling/timings to master*/
@@ -262,7 +269,6 @@ void worker_function(RRuntime R) {
MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
MPI_COMM_WORLD);
}
break;
} else if ((probe_status.MPI_TAG == TAG_DHT_STATS)) {
@@ -270,6 +276,7 @@ void worker_function(RRuntime R) {
MPI_STATUS_IGNORE);
print_statistics();
MPI_Barrier(MPI_COMM_WORLD);
} else if ((probe_status.MPI_TAG == TAG_DHT_STORE)) {
char *outdir;
MPI_Get_count(&probe_status, MPI_CHAR, &count);

View File

@@ -1,10 +1,11 @@
#pragma once
#include "util/RRuntime.h"
#include "model/Grid.h"
using namespace std;
using namespace poet;
/*Functions*/
void worker_function(RRuntime R);
void worker_function(RRuntime &R, Grid &grid);
/*Globals*/