feat: enable parallelization with tug

This commit is contained in:
Max Lübke 2022-10-26 14:57:32 +02:00
parent ba80f0be20
commit 8dd2bd192d
6 changed files with 167 additions and 61 deletions

View File

@ -20,10 +20,10 @@
#include <Rcpp.h> #include <Rcpp.h>
#include <poet/ChemSim.hpp> #include <poet/ChemSim.hpp>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp> #include <poet/Grid.hpp>
#include <poet/RRuntime.hpp> #include <poet/RRuntime.hpp>
#include <poet/SimParams.hpp> #include <poet/SimParams.hpp>
#include <poet/DiffusionModule.hpp>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
@ -88,7 +88,7 @@ int main(int argc, char *argv[]) {
// HACK: we disable master_init and dt_differ propagation here for testing // HACK: we disable master_init and dt_differ propagation here for testing
// purposes // purposes
// //
// bool dt_differ; bool dt_differ = false;
R.parseEvalQ("mysetup <- setup"); R.parseEvalQ("mysetup <- setup");
if (world_rank == 0) { // get timestep vector from if (world_rank == 0) { // get timestep vector from
// grid_init function ... // // grid_init function ... //
@ -97,15 +97,15 @@ int main(int argc, char *argv[]) {
// dt_differ = R.parseEval("mysetup$dt_differ"); // dt_differ = R.parseEval("mysetup$dt_differ");
// // ... and broadcast it to every other rank unequal to 0 // // ... and broadcast it to every other rank unequal to 0
// MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
// } }
// /* workers will only read the setup DataFrame defined by input file */ /* workers will only read the setup DataFrame defined by input file */
// else { else {
// R.parseEval("mysetup <- setup"); // R.parseEval("mysetup <- setup");
// MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
} }
// params.setDtDiffer(dt_differ); params.setDtDiffer(dt_differ);
// initialize chemistry on all processes // initialize chemistry on all processes
// TODO: einlesen einer initial matrix (DataFrame) // TODO: einlesen einer initial matrix (DataFrame)
@ -114,7 +114,6 @@ int main(int argc, char *argv[]) {
// TODO: Grid anpassen // TODO: Grid anpassen
Grid grid(R, poet::GridParams(R)); Grid grid(R, poet::GridParams(R));
// grid.init_from_R(); // grid.init_from_R();
@ -170,11 +169,11 @@ int main(int argc, char *argv[]) {
/* Fallback for sequential execution */ /* Fallback for sequential execution */
// TODO: use new grid // TODO: use new grid
if (world_size == 1) { if (world_size == 1) {
master.ChemSim::run(); master.ChemSim::run(dt);
} }
/* otherwise run parallel */ /* otherwise run parallel */
else { else {
master.run(); master.run(dt);
} }
// MDL master_iteration_end just writes on disk state_T and // MDL master_iteration_end just writes on disk state_T and

View File

@ -3,8 +3,8 @@
## Grid initialization ## ## Grid initialization ##
################################################################# #################################################################
n <- 5 n <- 50
m <- 5 m <- 50
types <- c("scratch", "phreeqc", "rds") types <- c("scratch", "phreeqc", "rds")
@ -45,7 +45,7 @@ init_cell <- list(
grid <- list( grid <- list(
n_cells = c(n, m), n_cells = c(n, m),
s_cells = c(1,1), s_cells = c(n,m),
type = types[1], type = types[1],
init_cell = as.data.frame(init_cell), init_cell = as.data.frame(init_cell),
props = names(init_cell), props = names(init_cell),
@ -156,7 +156,7 @@ selout <- c(
# TODO: dt and iterations # TODO: dt and iterations
iterations <- 500 iterations <- 10
setup <- list( setup <- list(
# bound = myboundmat, # bound = myboundmat,

View File

@ -2,7 +2,7 @@
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of ** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam) ** Potsdam)
** **
** Copyright (C) 2018-2021 Marco De Lucia, Max Luebke (GFZ Potsdam) ** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
** **
** POET is free software; you can redistribute it and/or modify it under the ** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software ** terms of the GNU General Public License as published by the Free Software
@ -25,6 +25,7 @@
#include "Grid.hpp" #include "Grid.hpp"
#include "RRuntime.hpp" #include "RRuntime.hpp"
#include "SimParams.hpp" #include "SimParams.hpp"
#include <bits/stdint-uintn.h>
#include <cstdint> #include <cstdint>
#include <mpi.h> #include <mpi.h>
@ -79,7 +80,7 @@ public:
* @todo change function name. Maybe 'slave' to 'seq'. * @todo change function name. Maybe 'slave' to 'seq'.
* *
*/ */
virtual void run(); virtual void run(double dt);
/** /**
* @brief End simulation * @brief End simulation
@ -150,7 +151,7 @@ protected:
* @brief Stores information about size of the current work package * @brief Stores information about size of the current work package
* *
*/ */
std::vector<int> wp_sizes_vector; std::vector<uint32_t> wp_sizes_vector;
/** /**
* @brief Absolute path to output path * @brief Absolute path to output path
@ -249,7 +250,7 @@ public:
* The main tasks are instrumented with time measurements. * The main tasks are instrumented with time measurements.
* *
*/ */
void run() override; void run(double dt) override;
/** /**
* @brief End chemistry simulation. * @brief End chemistry simulation.
@ -361,6 +362,12 @@ private:
*/ */
void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers); void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);
void shuffleField(const std::vector<double> &in_field, uint32_t size_per_prop,
uint32_t prop_count, double *out_buffer);
void unshuffleField(const double *in_buffer, uint32_t size_per_prop,
uint32_t prop_count, std::vector<double> &out_field);
/** /**
* @brief Indicating usage of DHT * @brief Indicating usage of DHT
* *

View File

@ -2,7 +2,7 @@
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of ** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam) ** Potsdam)
** **
** Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam) ** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
** **
** POET is free software; you can redistribute it and/or modify it under the ** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software ** terms of the GNU General Public License as published by the Free Software
@ -18,12 +18,16 @@
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
#include "poet/DiffusionModule.hpp"
#include <Rcpp.h> #include <Rcpp.h>
#include <bits/stdint-uintn.h>
#include <iostream> #include <iostream>
#include <poet/ChemSim.hpp> #include <poet/ChemSim.hpp>
#include <poet/Grid.hpp> #include <poet/Grid.hpp>
#include <string>
#include <vector>
using namespace poet; using namespace poet;
using namespace std; using namespace std;
@ -38,23 +42,34 @@ ChemMaster::ChemMaster(SimParams &params, RRuntime &R_, Grid &grid_)
this->out_dir = params.getOutDir(); this->out_dir = params.getOutDir();
uint32_t grid_size = grid.getTotalCellCount() * this->prop_names.size();
/* allocate memory */ /* allocate memory */
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc( send_buffer = (double *)calloc(
(wp_size * (grid.getSpeciesCount())) + BUFFER_OFFSET, sizeof(double)); (wp_size * this->prop_names.size()) + BUFFER_OFFSET, sizeof(double));
mpi_buffer = (double *)calloc(grid.getGridCellsCount(GRID_Y_DIR) * mpi_buffer = (double *)calloc(grid_size, sizeof(double));
grid.getGridCellsCount(GRID_X_DIR),
sizeof(double)); // R.parseEvalQ("wp_ids <- distribute_work_packages(len=length(mysetup$prop),
// "
// "package_size=work_package_size)");
// // we only sort once the vector
// R.parseEvalQ("ordered_ids <- order(wp_ids)");
// R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
// R.parseEval("stat_wp_sizes(wp_sizes_vector)");
// wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
/* calculate distribution of work packages */ /* calculate distribution of work packages */
R.parseEvalQ("wp_ids <- distribute_work_packages(len=length(mysetup$prop), " uint32_t mod_pkgs = grid_size % this->wp_size;
"package_size=work_package_size)"); uint32_t n_packages = (uint32_t)(grid.getTotalCellCount() / this->wp_size) +
(mod_pkgs != 0 ? 1 : 0);
// we only sort once the vector this->wp_sizes_vector =
R.parseEvalQ("ordered_ids <- order(wp_ids)"); std::vector<uint32_t>(n_packages - mod_pkgs, this->wp_size);
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); for (uint32_t i = 0; i < mod_pkgs; i++) {
R.parseEval("stat_wp_sizes(wp_sizes_vector)"); this->wp_sizes_vector.push_back(this->wp_size - 1);
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]); }
} }
ChemMaster::~ChemMaster() { ChemMaster::~ChemMaster() {
@ -62,7 +77,7 @@ ChemMaster::~ChemMaster() {
free(workerlist); free(workerlist);
} }
void ChemMaster::run() { void ChemMaster::run(double dt) {
/* declare most of the needed variables here */ /* declare most of the needed variables here */
double chem_a, chem_b; double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d; double seq_a, seq_b, seq_c, seq_d;
@ -78,14 +93,37 @@ void ChemMaster::run() {
/* start time measurement of sequential part */ /* start time measurement of sequential part */
seq_a = MPI_Wtime(); seq_a = MPI_Wtime();
std::vector<double> &field = this->state->mem;
for (uint32_t i = 0; i < this->prop_names.size(); i++) {
try {
std::vector<double> t_prop_vec = this->grid.getSpeciesByName(
this->prop_names[i], poet::DIFFUSION_MODULE_NAME);
std::copy(t_prop_vec.begin(), t_prop_vec.end(),
field.begin() + (i * this->n_cells_per_prop));
} catch (...) {
continue;
}
}
// HACK: transfer the field into R data structure serving as input for phreeqc
R["TMP_T"] = field;
R.parseEvalQ("mysetup$state_T <- setNames(data.frame(matrix(TMP_T, "
"ncol=length(mysetup$grid$props), nrow=" +
std::to_string(this->n_cells_per_prop) +
")), mysetup$grid$props)");
/* shuffle grid */ /* shuffle grid */
grid.shuffleAndExport(mpi_buffer); // grid.shuffleAndExport(mpi_buffer);
this->shuffleField(field, this->n_cells_per_prop, this->prop_names.size(),
mpi_buffer);
/* retrieve needed data from R runtime */ /* retrieve needed data from R runtime */
iteration = (int)R.parseEval("mysetup$iter"); iteration = (int)R.parseEval("mysetup$iter");
dt = (double)R.parseEval("mysetup$requested_dt"); // dt = (double)R.parseEval("mysetup$requested_dt");
current_sim_time = current_sim_time = (double)R.parseEval("mysetup$simulation_time") - dt;
(double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");
/* setup local variables */ /* setup local variables */
pkg_to_send = wp_sizes_vector.size(); pkg_to_send = wp_sizes_vector.size();
@ -126,14 +164,23 @@ void ChemMaster::run() {
seq_c = MPI_Wtime(); seq_c = MPI_Wtime();
/* unshuffle grid */ /* unshuffle grid */
grid.importAndUnshuffle(mpi_buffer); // grid.importAndUnshuffle(mpi_buffer);
this->unshuffleField(mpi_buffer, this->n_cells_per_prop,
this->prop_names.size(), field);
/* do master stuff */ /* do master stuff */
/* start time measurement of master chemistry */ /* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime(); sim_e_chemistry = MPI_Wtime();
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); // HACK: We don't need to call master_chemistry here since our result is
// already written to the memory as a data frame
// R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
R["TMP_T"] = Rcpp::wrap(field);
R.parseEval(std::string("mysetup$state_C <- setNames(data.frame(matrix(TMP_T, nrow=" +
to_string(this->n_cells_per_prop) +
")), mysetup$grid$props)"));
/* end time measurement of master chemistry */ /* end time measurement of master chemistry */
sim_f_chemistry = MPI_Wtime(); sim_f_chemistry = MPI_Wtime();
@ -369,6 +416,39 @@ void ChemMaster::end() {
free(dht_perfs); free(dht_perfs);
} }
void ChemMaster::shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop, uint32_t prop_count,
double *out_buffer) {
uint32_t wp_count = this->wp_sizes_vector.size();
uint32_t write_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_buffer[(write_i * prop_count) + k] =
in_field[(k * size_per_prop) + j];
}
write_i++;
}
}
}
void ChemMaster::unshuffleField(const double *in_buffer, uint32_t size_per_prop,
uint32_t prop_count,
std::vector<double> &out_field) {
uint32_t wp_count = this->wp_sizes_vector.size();
uint32_t read_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_field[(k * size_per_prop) + j] =
in_buffer[(read_i * prop_count) + k];
}
read_i++;
}
}
}
double ChemMaster::getSendTime() { return this->send_t; } double ChemMaster::getSendTime() { return this->send_t; }
double ChemMaster::getRecvTime() { return this->recv_t; } double ChemMaster::getRecvTime() { return this->recv_t; }

View File

@ -58,7 +58,7 @@ ChemSim::ChemSim(SimParams &params, RRuntime &R_, Grid &grid_)
} }
} }
void ChemSim::run() { void ChemSim::run(double dt) {
double chem_a, chem_b; double chem_a, chem_b;
/* start time measuring */ /* start time measuring */

View File

@ -18,12 +18,15 @@
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
#include "poet/SimParams.hpp"
#include <Rcpp.h> #include <Rcpp.h>
#include <iostream> #include <iostream>
#include <ostream>
#include <string> #include <string>
#include <poet/ChemSim.hpp> #include <poet/ChemSim.hpp>
#include <vector>
using namespace poet; using namespace poet;
using namespace std; using namespace std;
@ -41,19 +44,19 @@ ChemWorker::ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_,
this->dht_file = params.getDHTFile(); this->dht_file = params.getDHTFile();
mpi_buffer = (double *)calloc((wp_size * (grid.getSpeciesCount())) + BUFFER_OFFSET, mpi_buffer = (double *)calloc(
sizeof(double)); (wp_size * (this->prop_names.size())) + BUFFER_OFFSET, sizeof(double));
mpi_buffer_results = mpi_buffer_results =
(double *)calloc(wp_size * (grid.getSpeciesCount()), sizeof(double)); (double *)calloc(wp_size * (this->prop_names.size()), sizeof(double));
if (world_rank == 1) if (world_rank == 1)
cout << "CPP: Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF") cout << "CPP: Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF")
<< endl; << endl;
if (dht_enabled) { if (dht_enabled) {
int data_size = grid.getSpeciesCount() * sizeof(double); int data_size = this->prop_names.size() * sizeof(double);
int key_size = int key_size =
grid.getSpeciesCount() * sizeof(double) + (dt_differ * sizeof(double)); this->prop_names.size() * sizeof(double) + (dt_differ * sizeof(double));
int dht_buckets_per_process = int dht_buckets_per_process =
dht_size_per_process / (1 + data_size + key_size); dht_size_per_process / (1 + data_size + key_size);
@ -66,9 +69,11 @@ ChemWorker::ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_,
dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size, dht = new DHT_Wrapper(params, dht_comm, dht_buckets_per_process, data_size,
key_size); key_size);
if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl; if (world_rank == 1)
cout << "CPP: Worker: DHT created!" << endl;
if (!dht_file.empty()) readFile(); if (!dht_file.empty())
readFile();
// set size // set size
dht_flags.resize(wp_size, true); dht_flags.resize(wp_size, true);
// assign all elements to true (default) // assign all elements to true (default)
@ -83,7 +88,8 @@ ChemWorker::ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_,
ChemWorker::~ChemWorker() { ChemWorker::~ChemWorker() {
free(mpi_buffer); free(mpi_buffer);
free(mpi_buffer_results); free(mpi_buffer_results);
if (dht_enabled) delete dht; if (dht_enabled)
delete dht;
} }
void ChemWorker::loop() { void ChemWorker::loop() {
@ -114,6 +120,8 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
int count; int count;
int local_work_package_size = 0; int local_work_package_size = 0;
static int counter = 1;
double dht_get_start, dht_get_end; double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end; double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end; double dht_fill_start, dht_fill_end;
@ -127,12 +135,12 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
/* decrement count of work_package by BUFFER_OFFSET */ /* decrement count of work_package by BUFFER_OFFSET */
count -= BUFFER_OFFSET; count -= BUFFER_OFFSET;
/* check for changes on all additional variables given by the 'header' of /* check for changes on all additional variables given by the 'header' of
* mpi_buffer */ * mpi_buffer */
// work_package_size // work_package_size
if (mpi_buffer[count] != local_work_package_size) { // work_package_size if (mpi_buffer[count] != local_work_package_size) { // work_package_size
local_work_package_size = mpi_buffer[count]; local_work_package_size = mpi_buffer[count];
R["work_package_size"] = local_work_package_size; R["work_package_size"] = local_work_package_size;
R.parseEvalQ("mysetup$work_package_size <- work_package_size"); R.parseEvalQ("mysetup$work_package_size <- work_package_size");
@ -167,9 +175,9 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
if (dht_enabled) { if (dht_enabled) {
/* resize helper vector dht_flags of work_package_size changes */ /* resize helper vector dht_flags of work_package_size changes */
if ((int)dht_flags.size() != local_work_package_size) { if ((int)dht_flags.size() != local_work_package_size) {
dht_flags.resize(local_work_package_size, true); // set size dht_flags.resize(local_work_package_size, true); // set size
dht_flags.assign(local_work_package_size, dht_flags.assign(local_work_package_size,
true); // assign all elements to true (default) true); // assign all elements to true (default)
} }
/* check for values in DHT */ /* check for values in DHT */
@ -182,7 +190,19 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
} }
/* Convert grid to R runtime */ /* Convert grid to R runtime */
grid.importWP(mpi_buffer, wp_size); // grid.importWP(mpi_buffer, wp_size);
size_t rowCount = local_work_package_size;
size_t colCount = this->prop_names.size();
std::vector<std::vector<double>> input(colCount);
for (size_t i = 0; i < rowCount; i++) {
for (size_t j = 0; j < colCount; j++) {
input[j].push_back(mpi_buffer[i * colCount + j]);
}
}
R["work_package_full"] = Rcpp::as<Rcpp::DataFrame>(Rcpp::wrap(input));
if (dht_enabled) { if (dht_enabled) {
R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
@ -198,15 +218,13 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
/*Single Line error Workaround*/ /*Single Line error Workaround*/
if (nrows <= 1) { if (nrows <= 1) {
// duplicate line to enable correct simmulation // duplicate line to enable correct simmulation
R.parseEvalQ( R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), "
"work_package <- work_package[rep(1:nrow(work_package), " "times = 2), ]");
"times = 2), ]");
} }
/* Run PHREEQC */ /* Run PHREEQC */
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
R.parseEvalQ( R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, "
"result <- as.data.frame(slave_chemistry(setup=mysetup, " "data = work_package))");
"data = work_package))");
phreeqc_time_end = MPI_Wtime(); phreeqc_time_end = MPI_Wtime();
} else { } else {
// undefined behaviour, isn't it? // undefined behaviour, isn't it?
@ -216,7 +234,8 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
if (dht_enabled) { if (dht_enabled) {
R.parseEvalQ("result_full <- work_package_full"); R.parseEvalQ("result_full <- work_package_full");
if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result"); if (nrows > 0)
R.parseEvalQ("result_full[dht_flags,] <- result");
} else { } else {
R.parseEvalQ("result_full <- result"); R.parseEvalQ("result_full <- result");
} }
@ -312,5 +331,6 @@ void ChemWorker::finishWork() {
MPI_Send(dht_perf, 3, MPI_INT, 0, TAG_DHT_PERF, MPI_COMM_WORLD); MPI_Send(dht_perf, 3, MPI_INT, 0, TAG_DHT_PERF, MPI_COMM_WORLD);
} }
if (dht_enabled && dht_snaps > 0) writeFile(); if (dht_enabled && dht_snaps > 0)
writeFile();
} }