diff --git a/app/poet.cpp b/app/poet.cpp index 472d64b8a..490768cc2 100644 --- a/app/poet.cpp +++ b/app/poet.cpp @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include #include #include @@ -88,7 +88,7 @@ int main(int argc, char *argv[]) { // HACK: we disable master_init and dt_differ propagation here for testing // purposes // - // bool dt_differ; + bool dt_differ = false; R.parseEvalQ("mysetup <- setup"); if (world_rank == 0) { // get timestep vector from // grid_init function ... // @@ -97,15 +97,15 @@ int main(int argc, char *argv[]) { // dt_differ = R.parseEval("mysetup$dt_differ"); // // ... and broadcast it to every other rank unequal to 0 - // MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); - // } - // /* workers will only read the setup DataFrame defined by input file */ - // else { - // R.parseEval("mysetup <- setup"); - // MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); + MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); + } + /* workers will only read the setup DataFrame defined by input file */ + else { + // R.parseEval("mysetup <- setup"); + MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); } - // params.setDtDiffer(dt_differ); + params.setDtDiffer(dt_differ); // initialize chemistry on all processes // TODO: einlesen einer initial matrix (DataFrame) @@ -114,7 +114,6 @@ int main(int argc, char *argv[]) { // TODO: Grid anpassen - Grid grid(R, poet::GridParams(R)); // grid.init_from_R(); @@ -170,11 +169,11 @@ int main(int argc, char *argv[]) { /* Fallback for sequential execution */ // TODO: use new grid if (world_size == 1) { - master.ChemSim::run(); + master.ChemSim::run(dt); } /* otherwise run parallel */ else { - master.run(); + master.run(dt); } // MDL master_iteration_end just writes on disk state_T and diff --git a/data/SimDol2D_diffu.R b/data/SimDol2D_diffu.R index 8e12c234d..b46a0be87 100644 --- a/data/SimDol2D_diffu.R +++ b/data/SimDol2D_diffu.R @@ -3,8 +3,8 @@ ## Grid initialization ## ################################################################# -n <- 5 -m <- 5 +n <- 50 +m <- 50 types <- c("scratch", "phreeqc", "rds") @@ -45,7 +45,7 @@ init_cell <- list( grid <- list( n_cells = c(n, m), - s_cells = c(1,1), + s_cells = c(n,m), type = types[1], init_cell = as.data.frame(init_cell), props = names(init_cell), @@ -156,7 +156,7 @@ selout <- c( # TODO: dt and iterations -iterations <- 500 +iterations <- 10 setup <- list( # bound = myboundmat, diff --git a/include/poet/ChemSim.hpp b/include/poet/ChemSim.hpp index a9f5b7c85..0fdd6c1b3 100644 --- a/include/poet/ChemSim.hpp +++ b/include/poet/ChemSim.hpp @@ -2,7 +2,7 @@ ** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of ** Potsdam) ** -** Copyright (C) 2018-2021 Marco De Lucia, Max Luebke (GFZ Potsdam) +** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam) ** ** POET is free software; you can redistribute it and/or modify it under the ** terms of the GNU General Public License as published by the Free Software @@ -25,6 +25,7 @@ #include "Grid.hpp" #include "RRuntime.hpp" #include "SimParams.hpp" +#include #include #include @@ -79,7 +80,7 @@ public: * @todo change function name. Maybe 'slave' to 'seq'. * */ - virtual void run(); + virtual void run(double dt); /** * @brief End simulation @@ -150,7 +151,7 @@ protected: * @brief Stores information about size of the current work package * */ - std::vector wp_sizes_vector; + std::vector wp_sizes_vector; /** * @brief Absolute path to output path @@ -249,7 +250,7 @@ public: * The main tasks are instrumented with time measurements. * */ - void run() override; + void run(double dt) override; /** * @brief End chemistry simulation. @@ -361,6 +362,12 @@ private: */ void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers); + void shuffleField(const std::vector &in_field, uint32_t size_per_prop, + uint32_t prop_count, double *out_buffer); + + void unshuffleField(const double *in_buffer, uint32_t size_per_prop, + uint32_t prop_count, std::vector &out_field); + /** * @brief Indicating usage of DHT * diff --git a/src/ChemMaster.cpp b/src/ChemMaster.cpp index f1a416dfa..e67e5a3ae 100644 --- a/src/ChemMaster.cpp +++ b/src/ChemMaster.cpp @@ -2,7 +2,7 @@ ** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of ** Potsdam) ** -** Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam) +** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam) ** ** POET is free software; you can redistribute it and/or modify it under the ** terms of the GNU General Public License as published by the Free Software @@ -18,12 +18,16 @@ ** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include "poet/DiffusionModule.hpp" #include +#include #include #include #include +#include +#include using namespace poet; using namespace std; @@ -38,23 +42,34 @@ ChemMaster::ChemMaster(SimParams ¶ms, RRuntime &R_, Grid &grid_) this->out_dir = params.getOutDir(); + uint32_t grid_size = grid.getTotalCellCount() * this->prop_names.size(); + /* allocate memory */ workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); send_buffer = (double *)calloc( - (wp_size * (grid.getSpeciesCount())) + BUFFER_OFFSET, sizeof(double)); - mpi_buffer = (double *)calloc(grid.getGridCellsCount(GRID_Y_DIR) * - grid.getGridCellsCount(GRID_X_DIR), - sizeof(double)); + (wp_size * this->prop_names.size()) + BUFFER_OFFSET, sizeof(double)); + mpi_buffer = (double *)calloc(grid_size, sizeof(double)); + + // R.parseEvalQ("wp_ids <- distribute_work_packages(len=length(mysetup$prop), + // " + // "package_size=work_package_size)"); + + // // we only sort once the vector + // R.parseEvalQ("ordered_ids <- order(wp_ids)"); + // R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); + // R.parseEval("stat_wp_sizes(wp_sizes_vector)"); + // wp_sizes_vector = as>(R["wp_sizes_vector"]); /* calculate distribution of work packages */ - R.parseEvalQ("wp_ids <- distribute_work_packages(len=length(mysetup$prop), " - "package_size=work_package_size)"); + uint32_t mod_pkgs = grid_size % this->wp_size; + uint32_t n_packages = (uint32_t)(grid.getTotalCellCount() / this->wp_size) + + (mod_pkgs != 0 ? 1 : 0); - // we only sort once the vector - R.parseEvalQ("ordered_ids <- order(wp_ids)"); - R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); - R.parseEval("stat_wp_sizes(wp_sizes_vector)"); - wp_sizes_vector = as>(R["wp_sizes_vector"]); + this->wp_sizes_vector = + std::vector(n_packages - mod_pkgs, this->wp_size); + for (uint32_t i = 0; i < mod_pkgs; i++) { + this->wp_sizes_vector.push_back(this->wp_size - 1); + } } ChemMaster::~ChemMaster() { @@ -62,7 +77,7 @@ ChemMaster::~ChemMaster() { free(workerlist); } -void ChemMaster::run() { +void ChemMaster::run(double dt) { /* declare most of the needed variables here */ double chem_a, chem_b; double seq_a, seq_b, seq_c, seq_d; @@ -78,14 +93,37 @@ void ChemMaster::run() { /* start time measurement of sequential part */ seq_a = MPI_Wtime(); + std::vector &field = this->state->mem; + + for (uint32_t i = 0; i < this->prop_names.size(); i++) { + try { + std::vector t_prop_vec = this->grid.getSpeciesByName( + this->prop_names[i], poet::DIFFUSION_MODULE_NAME); + + std::copy(t_prop_vec.begin(), t_prop_vec.end(), + field.begin() + (i * this->n_cells_per_prop)); + } catch (...) { + continue; + } + } + + // HACK: transfer the field into R data structure serving as input for phreeqc + R["TMP_T"] = field; + + R.parseEvalQ("mysetup$state_T <- setNames(data.frame(matrix(TMP_T, " + "ncol=length(mysetup$grid$props), nrow=" + + std::to_string(this->n_cells_per_prop) + + ")), mysetup$grid$props)"); + /* shuffle grid */ - grid.shuffleAndExport(mpi_buffer); + // grid.shuffleAndExport(mpi_buffer); + this->shuffleField(field, this->n_cells_per_prop, this->prop_names.size(), + mpi_buffer); /* retrieve needed data from R runtime */ iteration = (int)R.parseEval("mysetup$iter"); - dt = (double)R.parseEval("mysetup$requested_dt"); - current_sim_time = - (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt"); + // dt = (double)R.parseEval("mysetup$requested_dt"); + current_sim_time = (double)R.parseEval("mysetup$simulation_time") - dt; /* setup local variables */ pkg_to_send = wp_sizes_vector.size(); @@ -126,14 +164,23 @@ void ChemMaster::run() { seq_c = MPI_Wtime(); /* unshuffle grid */ - grid.importAndUnshuffle(mpi_buffer); + // grid.importAndUnshuffle(mpi_buffer); + this->unshuffleField(mpi_buffer, this->n_cells_per_prop, + this->prop_names.size(), field); /* do master stuff */ /* start time measurement of master chemistry */ sim_e_chemistry = MPI_Wtime(); - R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); + // HACK: We don't need to call master_chemistry here since our result is + // already written to the memory as a data frame + + // R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); + R["TMP_T"] = Rcpp::wrap(field); + R.parseEval(std::string("mysetup$state_C <- setNames(data.frame(matrix(TMP_T, nrow=" + + to_string(this->n_cells_per_prop) + + ")), mysetup$grid$props)")); /* end time measurement of master chemistry */ sim_f_chemistry = MPI_Wtime(); @@ -369,6 +416,39 @@ void ChemMaster::end() { free(dht_perfs); } +void ChemMaster::shuffleField(const std::vector &in_field, + uint32_t size_per_prop, uint32_t prop_count, + double *out_buffer) { + uint32_t wp_count = this->wp_sizes_vector.size(); + uint32_t write_i = 0; + for (uint32_t i = 0; i < wp_count; i++) { + for (uint32_t j = i; j < size_per_prop; j += wp_count) { + for (uint32_t k = 0; k < prop_count; k++) { + out_buffer[(write_i * prop_count) + k] = + in_field[(k * size_per_prop) + j]; + } + write_i++; + } + } +} + +void ChemMaster::unshuffleField(const double *in_buffer, uint32_t size_per_prop, + uint32_t prop_count, + std::vector &out_field) { + uint32_t wp_count = this->wp_sizes_vector.size(); + uint32_t read_i = 0; + + for (uint32_t i = 0; i < wp_count; i++) { + for (uint32_t j = i; j < size_per_prop; j += wp_count) { + for (uint32_t k = 0; k < prop_count; k++) { + out_field[(k * size_per_prop) + j] = + in_buffer[(read_i * prop_count) + k]; + } + read_i++; + } + } +} + double ChemMaster::getSendTime() { return this->send_t; } double ChemMaster::getRecvTime() { return this->recv_t; } diff --git a/src/ChemSim.cpp b/src/ChemSim.cpp index 5dc155a46..da7c46b8c 100644 --- a/src/ChemSim.cpp +++ b/src/ChemSim.cpp @@ -58,7 +58,7 @@ ChemSim::ChemSim(SimParams ¶ms, RRuntime &R_, Grid &grid_) } } -void ChemSim::run() { +void ChemSim::run(double dt) { double chem_a, chem_b; /* start time measuring */ diff --git a/src/ChemWorker.cpp b/src/ChemWorker.cpp index 97b7728d3..93acc694e 100644 --- a/src/ChemWorker.cpp +++ b/src/ChemWorker.cpp @@ -18,12 +18,15 @@ ** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include "poet/SimParams.hpp" #include #include +#include #include #include +#include using namespace poet; using namespace std; @@ -41,19 +44,19 @@ ChemWorker::ChemWorker(SimParams ¶ms, RRuntime &R_, Grid &grid_, this->dht_file = params.getDHTFile(); - mpi_buffer = (double *)calloc((wp_size * (grid.getSpeciesCount())) + BUFFER_OFFSET, - sizeof(double)); + mpi_buffer = (double *)calloc( + (wp_size * (this->prop_names.size())) + BUFFER_OFFSET, sizeof(double)); mpi_buffer_results = - (double *)calloc(wp_size * (grid.getSpeciesCount()), sizeof(double)); + (double *)calloc(wp_size * (this->prop_names.size()), sizeof(double)); if (world_rank == 1) cout << "CPP: Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF") << endl; if (dht_enabled) { - int data_size = grid.getSpeciesCount() * sizeof(double); + int data_size = this->prop_names.size() * sizeof(double); int key_size = - grid.getSpeciesCount() * sizeof(double) + (dt_differ * sizeof(double)); + this->prop_names.size() * sizeof(double) + (dt_differ * sizeof(double)); int dht_buckets_per_process = dht_size_per_process / (1 + data_size + key_size); @@ -66,9 +69,11 @@ ChemWorker::ChemWorker(SimParams ¶ms, RRuntime &R_, Grid &grid_, dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size, key_size); - if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl; + if (world_rank == 1) + cout << "CPP: Worker: DHT created!" << endl; - if (!dht_file.empty()) readFile(); + if (!dht_file.empty()) + readFile(); // set size dht_flags.resize(wp_size, true); // assign all elements to true (default) @@ -83,7 +88,8 @@ ChemWorker::ChemWorker(SimParams ¶ms, RRuntime &R_, Grid &grid_, ChemWorker::~ChemWorker() { free(mpi_buffer); free(mpi_buffer_results); - if (dht_enabled) delete dht; + if (dht_enabled) + delete dht; } void ChemWorker::loop() { @@ -114,6 +120,8 @@ void ChemWorker::doWork(MPI_Status &probe_status) { int count; int local_work_package_size = 0; + static int counter = 1; + double dht_get_start, dht_get_end; double phreeqc_time_start, phreeqc_time_end; double dht_fill_start, dht_fill_end; @@ -127,12 +135,12 @@ void ChemWorker::doWork(MPI_Status &probe_status) { /* decrement count of work_package by BUFFER_OFFSET */ count -= BUFFER_OFFSET; - + /* check for changes on all additional variables given by the 'header' of * mpi_buffer */ // work_package_size - if (mpi_buffer[count] != local_work_package_size) { // work_package_size + if (mpi_buffer[count] != local_work_package_size) { // work_package_size local_work_package_size = mpi_buffer[count]; R["work_package_size"] = local_work_package_size; R.parseEvalQ("mysetup$work_package_size <- work_package_size"); @@ -167,9 +175,9 @@ void ChemWorker::doWork(MPI_Status &probe_status) { if (dht_enabled) { /* resize helper vector dht_flags of work_package_size changes */ if ((int)dht_flags.size() != local_work_package_size) { - dht_flags.resize(local_work_package_size, true); // set size + dht_flags.resize(local_work_package_size, true); // set size dht_flags.assign(local_work_package_size, - true); // assign all elements to true (default) + true); // assign all elements to true (default) } /* check for values in DHT */ @@ -182,7 +190,19 @@ void ChemWorker::doWork(MPI_Status &probe_status) { } /* Convert grid to R runtime */ - grid.importWP(mpi_buffer, wp_size); + // grid.importWP(mpi_buffer, wp_size); + size_t rowCount = local_work_package_size; + size_t colCount = this->prop_names.size(); + + std::vector> input(colCount); + + for (size_t i = 0; i < rowCount; i++) { + for (size_t j = 0; j < colCount; j++) { + input[j].push_back(mpi_buffer[i * colCount + j]); + } + } + + R["work_package_full"] = Rcpp::as(Rcpp::wrap(input)); if (dht_enabled) { R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); @@ -198,15 +218,13 @@ void ChemWorker::doWork(MPI_Status &probe_status) { /*Single Line error Workaround*/ if (nrows <= 1) { // duplicate line to enable correct simmulation - R.parseEvalQ( - "work_package <- work_package[rep(1:nrow(work_package), " - "times = 2), ]"); + R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), " + "times = 2), ]"); } /* Run PHREEQC */ phreeqc_time_start = MPI_Wtime(); - R.parseEvalQ( - "result <- as.data.frame(slave_chemistry(setup=mysetup, " - "data = work_package))"); + R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, " + "data = work_package))"); phreeqc_time_end = MPI_Wtime(); } else { // undefined behaviour, isn't it? @@ -216,7 +234,8 @@ void ChemWorker::doWork(MPI_Status &probe_status) { if (dht_enabled) { R.parseEvalQ("result_full <- work_package_full"); - if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result"); + if (nrows > 0) + R.parseEvalQ("result_full[dht_flags,] <- result"); } else { R.parseEvalQ("result_full <- result"); } @@ -312,5 +331,6 @@ void ChemWorker::finishWork() { MPI_Send(dht_perf, 3, MPI_INT, 0, TAG_DHT_PERF, MPI_COMM_WORLD); } - if (dht_enabled && dht_snaps > 0) writeFile(); + if (dht_enabled && dht_snaps > 0) + writeFile(); }