documented libraries

This commit is contained in:
Max Lübke 2021-02-02 17:22:11 +01:00
parent 991f825dc6
commit c8bb7262a0
8 changed files with 679 additions and 119 deletions

View File

@ -17,10 +17,14 @@ ChemMaster::ChemMaster(SimParams &params, RRuntime &R_, Grid &grid_)
this->out_dir = params.getOutDir(); this->out_dir = params.getOutDir();
/* allocate memory */
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct)); workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET, send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
sizeof(double)); sizeof(double));
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
/* calculate distribution of work packages */
R.parseEvalQ( R.parseEvalQ(
"wp_ids <- distribute_work_packages(len=nrow(mysetup$state_C), " "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_C), "
"package_size=work_package_size)"); "package_size=work_package_size)");
@ -30,9 +34,6 @@ ChemMaster::ChemMaster(SimParams &params, RRuntime &R_, Grid &grid_)
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
R.parseEval("stat_wp_sizes(wp_sizes_vector)"); R.parseEval("stat_wp_sizes(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]); wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
mpi_buffer =
(double *)calloc(grid.getRows() * grid.getCols(), sizeof(double));
} }
ChemMaster::~ChemMaster() { ChemMaster::~ChemMaster() {
@ -41,6 +42,7 @@ ChemMaster::~ChemMaster() {
} }
void ChemMaster::run() { void ChemMaster::run() {
/* declare most of the needed variables here */
double chem_a, chem_b; double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d; double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b; double worker_chemistry_a, worker_chemistry_b;
@ -49,83 +51,105 @@ void ChemMaster::run() {
int free_workers; int free_workers;
int i_pkgs; int i_pkgs;
/* start time measurement of whole chemistry simulation */
chem_a = MPI_Wtime(); chem_a = MPI_Wtime();
/* start time measurement of sequential part */
seq_a = MPI_Wtime(); seq_a = MPI_Wtime();
/* shuffle grid */
grid.shuffleAndExport(mpi_buffer); grid.shuffleAndExport(mpi_buffer);
// retrieve data from R runtime
/* retrieve needed data from R runtime */
iteration = (int)R.parseEval("mysetup$iter"); iteration = (int)R.parseEval("mysetup$iter");
dt = (double)R.parseEval("mysetup$requested_dt"); dt = (double)R.parseEval("mysetup$requested_dt");
current_sim_time = current_sim_time =
(double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt"); (double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");
// setup local variables /* setup local variables */
pkg_to_send = wp_sizes_vector.size(); pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size(); pkg_to_recv = wp_sizes_vector.size();
work_pointer = mpi_buffer; work_pointer = mpi_buffer;
free_workers = world_size - 1; free_workers = world_size - 1;
i_pkgs = 0; i_pkgs = 0;
/* end time measurement of sequential part */
seq_b = MPI_Wtime(); seq_b = MPI_Wtime();
seq_t += seq_b - seq_a; seq_t += seq_b - seq_a;
/* start time measurement of chemistry time needed for send/recv loop */
worker_chemistry_a = MPI_Wtime(); worker_chemistry_a = MPI_Wtime();
/* start send/recv loop */
// while there are still packages to recv
while (pkg_to_recv > 0) { while (pkg_to_recv > 0) {
// TODO: Progressbar into IO instance. // print a progressbar to stdout
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size()); printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
// while there are still packages to send
if (pkg_to_send > 0) { if (pkg_to_send > 0) {
// send packages to all free workers ...
sendPkgs(pkg_to_send, i_pkgs, free_workers); sendPkgs(pkg_to_send, i_pkgs, free_workers);
} }
// ... and try to receive them from workers who has finished their work
recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers); recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers);
} }
// Just to complete the progressbar // Just to complete the progressbar
cout << endl; cout << endl;
/* stop time measurement of chemistry time needed for send/recv loop */
worker_chemistry_b = MPI_Wtime(); worker_chemistry_b = MPI_Wtime();
worker_t = worker_chemistry_b - worker_chemistry_a; worker_t = worker_chemistry_b - worker_chemistry_a;
/* start time measurement of sequential part */
seq_c = MPI_Wtime(); seq_c = MPI_Wtime();
/* unshuffle grid */
grid.importAndUnshuffle(mpi_buffer); grid.importAndUnshuffle(mpi_buffer);
/* do master stuff */ /* do master stuff */
/* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime(); sim_e_chemistry = MPI_Wtime();
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
/* end time measurement of master chemistry */
sim_f_chemistry = MPI_Wtime(); sim_f_chemistry = MPI_Wtime();
chem_master += sim_f_chemistry - sim_e_chemistry; chem_master += sim_f_chemistry - sim_e_chemistry;
/* end time measurement of sequential part */
seq_d = MPI_Wtime(); seq_d = MPI_Wtime();
seq_t += seq_d - seq_c; seq_t += seq_d - seq_c;
/* end time measurement of whole chemistry simulation */
chem_b = MPI_Wtime(); chem_b = MPI_Wtime();
chem_t += chem_b - chem_a; chem_t += chem_b - chem_a;
} }
void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs, void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
int &free_workers) { int &free_workers) {
/* declare variables */
double master_send_a, master_send_b; double master_send_a, master_send_b;
int local_work_package_size; int local_work_package_size;
int end_of_wp; int end_of_wp;
// start time measurement /* start time measurement */
master_send_a = MPI_Wtime(); master_send_a = MPI_Wtime();
/*search for free workers and send work*/
/* search for free workers and send work */
for (int p = 0; p < world_size - 1; p++) { for (int p = 0; p < world_size - 1; p++) {
if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ {
// to enable different work_package_size, set local copy of /* to enable different work_package_size, set local copy of
// work_package_size to either global work_package size or * work_package_size to pre-calculated work package size vector */
// remaining 'to_send' packages to_send >= work_package_size ?
// local_work_package_size = work_package_size :
// local_work_package_size = to_send;
local_work_package_size = (int)wp_sizes_vector[count_pkgs]; local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++; count_pkgs++;
// cout << "CPP: sending pkg n. " << count_pkgs << " with size " /* note current processed work package in workerlist */
// << local_work_package_size << endl;
/*push pointer forward to next work package, after taking the
* current one*/
workerlist[p].send_addr = work_pointer; workerlist[p].send_addr = work_pointer;
/* push work pointer to next work package */
end_of_wp = local_work_package_size * grid.getCols(); end_of_wp = local_work_package_size * grid.getCols();
work_pointer = &(work_pointer[end_of_wp]); work_pointer = &(work_pointer[end_of_wp]);
@ -147,6 +171,7 @@ void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1, MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
TAG_WORK, MPI_COMM_WORLD); TAG_WORK, MPI_COMM_WORLD);
/* Mark that worker has work to do */
workerlist[p].has_work = 1; workerlist[p].has_work = 1;
free_workers--; free_workers--;
pkg_to_send -= 1; pkg_to_send -= 1;
@ -157,6 +182,7 @@ void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
} }
void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) { void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) {
/* declare most of the variables here */
int need_to_receive = 1; int need_to_receive = 1;
double master_recv_a, master_recv_b; double master_recv_a, master_recv_b;
double idle_a, idle_b; double idle_a, idle_b;
@ -164,20 +190,26 @@ void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers) {
MPI_Status probe_status; MPI_Status probe_status;
master_recv_a = MPI_Wtime(); master_recv_a = MPI_Wtime();
/* start to loop as long there are packages to recv and the need to receive
*/
while (need_to_receive && pkg_to_recv > 0) { while (need_to_receive && pkg_to_recv > 0) {
// only of there are still packages to send and free workers are available
if (to_send && free_workers > 0) if (to_send && free_workers > 0)
// non blocking probing
MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive, MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive,
&probe_status); &probe_status);
else { else {
idle_a = MPI_Wtime(); idle_a = MPI_Wtime();
// blocking probing
MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status); MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime(); idle_b = MPI_Wtime();
master_idle += idle_b - idle_a; master_idle += idle_b - idle_a;
} }
/* if need_to_receive was set to true above, so there is a message to
* receive */
if (need_to_receive) { if (need_to_receive) {
p = probe_status.MPI_SOURCE; p = probe_status.MPI_SOURCE;
size;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size); MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK, MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_COMM_WORLD, MPI_STATUS_IGNORE);
@ -210,8 +242,10 @@ void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
} }
void ChemMaster::end() { void ChemMaster::end() {
/* call end() from base class */
ChemSim::end(); ChemSim::end();
/* now we get to the part of the master */
double *timings; double *timings;
int *dht_perfs; int *dht_perfs;
@ -238,11 +272,13 @@ void ChemMaster::end() {
double idle_worker_tmp; double idle_worker_tmp;
/* loop over all workers *
* ATTENTION Worker p has rank p+1 */
for (int p = 0; p < world_size - 1; p++) { for (int p = 0; p < world_size - 1; p++) {
/* ATTENTION Worker p has rank p+1 */
/* Send termination message to worker */ /* Send termination message to worker */
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD); MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
/* ... and receive all timings and metrics from each worker */
MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD, MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
phreeqc_time.push_back(timings[0], "w" + to_string(p + 1)); phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
@ -263,11 +299,11 @@ void ChemMaster::end() {
MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
dht_hits += dht_perfs[0]; dht_hits += dht_perfs[0];
dht_miss += dht_perfs[1]; dht_miss += dht_perfs[1];
cout << "profiler miss = " << dht_miss << endl;
dht_collision += dht_perfs[2]; dht_collision += dht_perfs[2];
} }
} }
/* distribute all data to the R runtime */
R["simtime_chemistry"] = chem_t; R["simtime_chemistry"] = chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry"); R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["simtime_workers"] = worker_t; R["simtime_workers"] = worker_t;
@ -279,11 +315,6 @@ void ChemMaster::end() {
R["seq_master"] = seq_t; R["seq_master"] = seq_t;
R.parseEvalQ("profiling$seq_master <- seq_master"); R.parseEvalQ("profiling$seq_master <- seq_master");
// R["master_send"] = master_send;
// R.parseEvalQ("profiling$master_send <- master_send");
// R["master_recv"] = master_recv;
// R.parseEvalQ("profiling$master_recv <- master_recv");
R["idle_master"] = master_idle; R["idle_master"] = master_idle;
R.parseEvalQ("profiling$idle_master <- idle_master"); R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = idle_worker; R["idle_worker"] = idle_worker;
@ -308,6 +339,7 @@ void ChemMaster::end() {
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time"); R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
} }
/* do some cleanup */
free(timings); free(timings);
if (dht_enabled) free(dht_perfs); if (dht_enabled) free(dht_perfs);

View File

@ -19,13 +19,16 @@ ChemSim::ChemSim(SimParams &params, RRuntime &R_, Grid &grid_)
void ChemSim::run() { void ChemSim::run() {
double chem_a, chem_b; double chem_a, chem_b;
/* start time measuring */
chem_a = MPI_Wtime(); chem_a = MPI_Wtime();
R.parseEvalQ( R.parseEvalQ(
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)"); "result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
/* end time measuring */
chem_b = MPI_Wtime(); chem_b = MPI_Wtime();
chem_t += chem_b - chem_a; chem_t += chem_b - chem_a;
} }

View File

@ -10,112 +10,543 @@
#include "Grid.h" #include "Grid.h"
/** Number of data elements that are kept free at each work package */
#define BUFFER_OFFSET 5 #define BUFFER_OFFSET 5
/** Message tag indicating work */
#define TAG_WORK 42 #define TAG_WORK 42
/** Message tag indicating to finish loop */
#define TAG_FINISH 43 #define TAG_FINISH 43
/** Message tag indicating timing profiling */
#define TAG_TIMING 44 #define TAG_TIMING 44
/** Message tag indicating collecting DHT performance */
#define TAG_DHT_PERF 45 #define TAG_DHT_PERF 45
#define TAG_DHT_STATS 46 /** Message tag indicating simulation reached the end of an itertation */
#define TAG_DHT_STORE 47 #define TAG_DHT_ITER 47
#define TAG_DHT_ITER 48
namespace poet { namespace poet {
/**
* @brief Base class of the chemical simulation
*
* Providing member functions to run an iteration and to end a simulation. Also
* containing basic parameters for simulation.
*
*/
class ChemSim { class ChemSim {
public: public:
/**
* @brief Construct a new ChemSim object
*
* Creating a new instance of class ChemSim will just extract simulation
* parameters from SimParams object.
*
* @param params SimParams object
* @param R_ R runtime
* @param grid_ Initialized grid
*/
ChemSim(SimParams &params, RRuntime &R_, Grid &grid_); ChemSim(SimParams &params, RRuntime &R_, Grid &grid_);
/**
* @brief Run iteration of simulation in sequential mode
*
* This will call the correspondending R function slave_chemistry, followed by
* the execution of master_chemistry.
*
* @todo change function name. Maybe 'slave' to 'seq'.
*
*/
virtual void run(); virtual void run();
/**
* @brief End simulation
*
* End the simulation by distribute the measured runtime of simulation to the
* R runtime.
*
*/
virtual void end(); virtual void end();
/**
* @brief Get the Chemistry Time
*
* @return double Runtime of sequential chemistry simulation in seconds
*/
double getChemistryTime(); double getChemistryTime();
protected: protected:
/**
* @brief Current simulation time or 'age' of simulation
*
*/
double current_sim_time = 0; double current_sim_time = 0;
/**
* @brief Current iteration
*
*/
int iteration = 0; int iteration = 0;
/**
* @brief Current simulation timestep
*
*/
int dt = 0; int dt = 0;
/**
* @brief Rank of process in MPI_COMM_WORLD
*
*/
int world_rank; int world_rank;
/**
* @brief Size of communicator MPI_COMM_WORLD
*
*/
int world_size; int world_size;
/**
* @brief Number of grid cells in each work package
*
*/
unsigned int wp_size; unsigned int wp_size;
/**
* @brief Instance of RRuntime object
*
*/
RRuntime &R; RRuntime &R;
/**
* @brief Initialized grid object
*
*/
Grid &grid; Grid &grid;
/**
* @brief Stores information about size of the current work package
*
*/
std::vector<int> wp_sizes_vector; std::vector<int> wp_sizes_vector;
/**
* @brief Absolute path to output path
*
*/
std::string out_dir; std::string out_dir;
/**
* @brief Pointer to sending buffer
*
*/
double *send_buffer; double *send_buffer;
/**
* @brief Worker struct
*
* This struct contains information which worker as work and which work
* package he is working on.
*
*/
typedef struct { typedef struct {
char has_work; char has_work;
double *send_addr; double *send_addr;
} worker_struct; } worker_struct;
/**
* @brief Pointer to worker_struct
*
*/
worker_struct *workerlist; worker_struct *workerlist;
/**
* @brief Pointer to mpi_buffer
*
* Typically for the master this is a continous C memory area containing the
* grid. For worker the memory area will just have the size of a work package.
*
*/
double *mpi_buffer; double *mpi_buffer;
/**
* @brief Total chemistry runtime
*
*/
double chem_t = 0.f; double chem_t = 0.f;
}; };
/**
* @brief Class providing execution of master chemistry
*
* Providing member functions to run an iteration and to end a simulation. Also
* a loop to send and recv pkgs from workers is implemented.
*
*/
class ChemMaster : public ChemSim { class ChemMaster : public ChemSim {
public: public:
/**
* @brief Construct a new ChemMaster object
*
* The following steps are executed to create a new object of ChemMaster:
* -# all needed simulation parameters are extracted
* -# memory is allocated
* -# distribution of work packages is calculated
*
* @param params Simulation parameters as SimParams object
* @param R_ R runtime
* @param grid_ Grid object
*/
ChemMaster(SimParams &params, RRuntime &R_, Grid &grid_); ChemMaster(SimParams &params, RRuntime &R_, Grid &grid_);
/**
* @brief Destroy the ChemMaster object
*
* By freeing ChemMaster all buffers allocated in the Constructor are freed.
*
*/
~ChemMaster(); ~ChemMaster();
/**
* @brief Run iteration of simulation in parallel mode
*
* To run the chemistry simulation parallel following steps are done:
*
* -# 'Shuffle' the grid by previously calculated distribution of work
* packages. Convert R grid to C memory area.
* -# Start the send/recv loop.
* Detailed description in sendPkgs respectively in recvPkgs.
* -# 'Unshuffle'
* the grid and convert C memory area to R grid.
* -# Run 'master_chemistry'
*
* The main tasks are instrumented with time measurements.
*
*/
void run() override; void run() override;
/**
* @brief End chemistry simulation.
*
* Notify the worker to finish their 'work'-loop. This is done by sending
* every worker an empty message with the tag TAG_FINISH. Now the master will
* receive measured times and DHT metrics from all worker one after another.
* Finally he will write all data to the R runtime and return this function.
*
*/
void end() override; void end() override;
/**
* @brief Get the send time
*
* Time spent in send loop.
*
* @return double sent time in seconds
*/
double getSendTime(); double getSendTime();
/**
* @brief Get the recv time
*
* Time spent in recv loop.
*
* @return double recv time in seconds
*/
double getRecvTime(); double getRecvTime();
/**
* @brief Get the idle time
*
* Time master was idling in MPI_Probe of recv loop.
*
* @return double idle time in seconds
*/
double getIdleTime(); double getIdleTime();
/**
* @brief Get the Worker time
*
* Time spent in whole send/recv loop.
*
* @return double worker time in seconds
*/
double getWorkerTime(); double getWorkerTime();
/**
* @brief Get the ChemMaster time
*
* Time spent in 'master_chemistry' R function.
*
* @return double ChemMaster time in seconds
*/
double getChemMasterTime(); double getChemMasterTime();
/**
* @brief Get the sequential time
*
* Time master executed code which must be run sequential.
*
* @return double seqntial time in seconds.
*/
double getSeqTime(); double getSeqTime();
private: private:
/**
* @brief Print a progressbar
*
* Prints a progressbar to stdout according to count of processed work
* packages in this iteration.
*
* @param count_pkgs Last processed index of work package
* @param n_wp Number of work packages
* @param barWidth Width of the progressbar/Count of characters to display the
* bar
*/
void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70); void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70);
/**
* @brief Start send loop
*
* Send a work package to every free worker, which are noted in a worker
* struct. After a work package was sent move pointer on work grid to the next
* work package. Use MPI_Send to transfer work package to worker.
*
* @param pkg_to_send Pointer to variable containing how much work packages
* are still to send
* @param count_pkgs Pointer to variable indexing the current work package
* @param free_workers Pointer to variable with the count of free workers
*/
void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers); void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers);
/**
* @brief Start recv loop
*
* Receive processed work packages by worker. This is done by first probing
* for a message. If a message is receivable, receive it and put result into
* respective memory area. Continue, but now with a non blocking MPI_Probe. If
* a message is receivable or if no work packages are left to send, receive
* it. Otherwise or if all remaining work packages are received exit loop.
*
* @param pkg_to_recv Pointer to variable counting the to receiving work
* packages
* @param to_send Bool indicating if there are still work packages to send
* @param free_workers Pointer to worker to variable holding the number of
* free workers
*/
void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers); void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);
/**
* @brief Indicating usage of DHT
*
*/
bool dht_enabled; bool dht_enabled;
/**
* @brief Default number of grid cells in each work package
*
*/
unsigned int wp_size; unsigned int wp_size;
/**
* @brief Pointer to current to be processed work package
*
*/
double *work_pointer; double *work_pointer;
/**
* @brief Time spent in send loop
*
*/
double send_t = 0.f; double send_t = 0.f;
/**
* @brief Time spent in recv loop
*
*/
double recv_t = 0.f; double recv_t = 0.f;
/**
* @brief Time master is idling in MPI_Probe
*
*/
double master_idle = 0.f; double master_idle = 0.f;
/**
* @brief Time spent in send/recv loop
*
*/
double worker_t = 0.f; double worker_t = 0.f;
/**
* @brief Time spent in sequential chemistry part
*
*/
double chem_master = 0.f; double chem_master = 0.f;
/**
* @brief Time spent in sequential instructions
*
*/
double seq_t = 0.f; double seq_t = 0.f;
}; };
/**
* @brief Class providing execution of worker chemistry
*
* Providing mainly a function to loop and wait for messages from the master.
*
*/
class ChemWorker : public ChemSim { class ChemWorker : public ChemSim {
public: public:
/**
* @brief Construct a new ChemWorker object
*
* The following steps are executed to create a new object of ChemWorker:
* -# all needed simulation parameters are extracted
* -# memory is allocated
* -# Preparetion to create a DHT
* -# and finally create a new DHT_Wrapper
*
* @param params Simulation parameters as SimParams object
* @param R_ R runtime
* @param grid_ Grid object
* @param dht_comm Communicator addressing all processes marked as worker
*/
ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm); ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm);
/**
* @brief Destroy the ChemWorker object
*
* Therefore all buffers are freed and the DHT_Wrapper object is destroyed.
*
*/
~ChemWorker(); ~ChemWorker();
/**
* @brief Start the 'work' loop
*
* Loop in an endless loop. At the beginning probe for a message from the
* master process. If there is a receivable message evaluate the message tag.
*
*/
void loop(); void loop();
private: private:
/**
* @brief Evaluating message to receive as work package
*
* These steps are done:
*
* -# Receive message
* -# Evaluate message header containing information about work package size,
* current iteration and timestep, simulation age
* -# if DHT is enabled check DHT for previously simulated results
* -# run simulation of work package
* -# send results back to master
* -# if DHT is enabled write simulated grid points to DHT
*
* @param probe_status message status of produced by MPI_Probe in loop
*/
void doWork(MPI_Status &probe_status); void doWork(MPI_Status &probe_status);
/**
* @brief Action to do after iteration
*
* If DHT is enabled print statistics and if dht_snaps is set to 2 write DHT
* snapshots.
*
*/
void postIter(); void postIter();
/**
* @brief Message tag evaluates to TAG_FINISH
*
* Send all the collected timings and (possbile) DHT metrics to the master.
*
*/
void finishWork(); void finishWork();
/**
* @brief Write DHT snapshot
*
*/
void writeFile(); void writeFile();
/**
* @brief Read DHT snapshot
*
*/
void readFile(); void readFile();
/**
* @brief Indicates usage of DHT
*
*/
bool dht_enabled; bool dht_enabled;
/**
* @brief Boolean if dt differs between iterations
*
*/
bool dt_differ; bool dt_differ;
/**
* @brief Value between 0 and 2, indicating when to write DHT snapshots
*
*/
int dht_snaps; int dht_snaps;
/**
* @brief Absolute path to DHT snapshot file
*
*/
std::string dht_file; std::string dht_file;
/**
* @brief Count of bytes each process should allocate for the DHT
*
*/
unsigned int dht_size_per_process; unsigned int dht_size_per_process;
/**
* @brief Indicates which grid cells were previously simulated and don't need
* to be simulated now
*
*/
std::vector<bool> dht_flags; std::vector<bool> dht_flags;
/**
* @brief simulated results are stored here
*
*/
double *mpi_buffer_results; double *mpi_buffer_results;
/**
* @brief Instance of DHT_Wrapper
*
*/
DHT_Wrapper *dht; DHT_Wrapper *dht;
/**
* @brief Array to store timings
*
* The values are stored in following order
*
* -# PHREEQC time
* -# DHT_get time
* -# DHT_fill time
*
*/
double timing[3]; double timing[3];
/**
* @brief Time worker is idling in MPI_Probe
*
*/
double idle_t = 0.f; double idle_t = 0.f;
/**
* @brief Count of PHREEQC calls
*
*/
int phreeqc_count = 0; int phreeqc_count = 0;
}; };
} // namespace poet } // namespace poet

View File

@ -47,15 +47,14 @@ ChemWorker::ChemWorker(SimParams &params, RRuntime &R_, Grid &grid_,
key_size); key_size);
if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl; if (world_rank == 1) cout << "CPP: Worker: DHT created!" << endl;
if (!dht_file.empty()) readFile();
// set size
dht_flags.resize(wp_size, true);
// assign all elements to true (default)
dht_flags.assign(wp_size, true);
} }
if (!dht_file.empty()) readFile();
// set size
dht_flags.resize(wp_size, true);
// assign all elements to true (default)
dht_flags.assign(wp_size, true);
timing[0] = 0.0; timing[0] = 0.0;
timing[1] = 0.0; timing[1] = 0.0;
timing[2] = 0.0; timing[2] = 0.0;
@ -74,12 +73,17 @@ void ChemWorker::loop() {
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status); MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
double idle_b = MPI_Wtime(); double idle_b = MPI_Wtime();
/* there is a work package to receive */
if (probe_status.MPI_TAG == TAG_WORK) { if (probe_status.MPI_TAG == TAG_WORK) {
idle_t += idle_b - idle_a; idle_t += idle_b - idle_a;
doWork(probe_status); doWork(probe_status);
} else if (probe_status.MPI_TAG == TAG_DHT_ITER) { }
/* end of iteration */
else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
postIter(); postIter();
} else if (probe_status.MPI_TAG == TAG_FINISH) { }
/* end of simulation */
else if (probe_status.MPI_TAG == TAG_FINISH) {
finishWork(); finishWork();
break; break;
} }
@ -94,17 +98,18 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
double phreeqc_time_start, phreeqc_time_end; double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end; double dht_fill_start, dht_fill_end;
/* get number of doubles sent */ /* get number of doubles to be received */
MPI_Get_count(&probe_status, MPI_DOUBLE, &count); MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
/* receive */ /* receive */
MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD, MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
// decrement count of work_package by BUFFER_OFFSET /* decrement count of work_package by BUFFER_OFFSET */
count -= BUFFER_OFFSET; count -= BUFFER_OFFSET;
// check for changes on all additional variables given by the 'header' of
// mpi_buffer /* check for changes on all additional variables given by the 'header' of
* mpi_buffer */
// work_package_size // work_package_size
if (mpi_buffer[count] != local_work_package_size) { // work_package_size if (mpi_buffer[count] != local_work_package_size) { // work_package_size
@ -139,48 +144,24 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
// R["mysetup$placeholder"] = placeholder; // R["mysetup$placeholder"] = placeholder;
// } // }
/* get df with right structure to fill in work package */
// R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
// R["skeleton"] = grid.buildDataFrame(work_package_size);
//// R.parseEval("print(rownames(tmp2)[1:5])");
//// R.parseEval("print(head(tmp2, 2))");
//// R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
////Rcpp::DataFrame buffer = R.parseEval("tmp2");
// R.setBufferDataFrame("skeleton");
if (dht_enabled) { if (dht_enabled) {
// DEBUG /* resize helper vector dht_flags of work_package_size changes */
// cout << "RANK " << world_rank << " start checking DHT\n";
// resize helper vector dht_flags of work_package_size changes
if ((int)dht_flags.size() != local_work_package_size) { if ((int)dht_flags.size() != local_work_package_size) {
dht_flags.resize(local_work_package_size, true); // set size dht_flags.resize(local_work_package_size, true); // set size
dht_flags.assign(local_work_package_size, dht_flags.assign(local_work_package_size,
true); // assign all elements to true (default) true); // assign all elements to true (default)
} }
/* check for values in DHT */
dht_get_start = MPI_Wtime(); dht_get_start = MPI_Wtime();
dht->checkDHT(local_work_package_size, dht_flags, mpi_buffer, dt); dht->checkDHT(local_work_package_size, dht_flags, mpi_buffer, dt);
dht_get_end = MPI_Wtime(); dht_get_end = MPI_Wtime();
// DEBUG /* distribute dht_flags to R Runtime */
// cout << "RANK " << world_rank << " checking DHT complete \n";
R["dht_flags"] = as<LogicalVector>(wrap(dht_flags)); R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
// R.parseEvalQ("print(head(dht_flags))");
} }
/* work */ /* Convert grid to R runtime */
// R.from_C_domain(mpi_buffer);
////convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
// R["work_package_full"] = R.getBufferDataFrame();
// R["work_package"] = buffer;
// DEBUG
// R.parseEvalQ("print(head(work_package_full))");
// R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
grid.importWP(mpi_buffer, wp_size); grid.importWP(mpi_buffer, wp_size);
if (dht_enabled) { if (dht_enabled) {
@ -189,14 +170,6 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
R.parseEvalQ("work_package <- work_package_full"); R.parseEvalQ("work_package <- work_package_full");
} }
// DEBUG
// R.parseEvalQ("print(head(work_package),2)");
// R.parseEvalQ("rownames(work_package) <- work_package$id");
// R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
// colnames(work_package)"); R.parseEvalQ("id_store <-
// rownames(work_package)"); //"[, ncol(work_package)]");
// R.parseEvalQ("work_package$id <- NULL");
R.parseEvalQ("work_package <- as.matrix(work_package)"); R.parseEvalQ("work_package <- as.matrix(work_package)");
unsigned int nrows = R.parseEval("nrow(work_package)"); unsigned int nrows = R.parseEval("nrow(work_package)");
@ -209,22 +182,18 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
"work_package <- work_package[rep(1:nrow(work_package), " "work_package <- work_package[rep(1:nrow(work_package), "
"times = 2), ]"); "times = 2), ]");
} }
/* Run PHREEQC */
phreeqc_count++;
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
// MDL
// R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
// 2)); cat('RCpp: worker_function:', local_rank, ' \n')");
R.parseEvalQ( R.parseEvalQ(
"result <- as.data.frame(slave_chemistry(setup=mysetup, " "result <- as.data.frame(slave_chemistry(setup=mysetup, "
"data = work_package))"); "data = work_package))");
phreeqc_time_end = MPI_Wtime(); phreeqc_time_end = MPI_Wtime();
// R.parseEvalQ("result$id <- id_store");
} else { } else {
// cout << "Work-Package is empty, skipping phreeqc!" << endl; // undefined behaviour, isn't it?
} }
phreeqc_count++;
if (dht_enabled) { if (dht_enabled) {
R.parseEvalQ("result_full <- work_package_full"); R.parseEvalQ("result_full <- work_package_full");
if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result"); if (nrows > 0) R.parseEvalQ("result_full[dht_flags,] <- result");
@ -232,11 +201,7 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
R.parseEvalQ("result_full <- result"); R.parseEvalQ("result_full <- result");
} }
// R.setBufferDataFrame("result_full"); /* convert grid to C domain */
////Rcpp::DataFrame result = R.parseEval("result_full");
////convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
// R.to_C_domain(mpi_buffer_results);
grid.exportWP(mpi_buffer_results); grid.exportWP(mpi_buffer_results);
/* send results to master */ /* send results to master */
MPI_Request send_req; MPI_Request send_req;
@ -244,6 +209,7 @@ void ChemWorker::doWork(MPI_Status &probe_status) {
&send_req); &send_req);
if (dht_enabled) { if (dht_enabled) {
/* write results to DHT */
dht_fill_start = MPI_Wtime(); dht_fill_start = MPI_Wtime();
dht->fillDHT(local_work_package_size, dht_flags, mpi_buffer, dht->fillDHT(local_work_package_size, dht_flags, mpi_buffer,
mpi_buffer_results, dt); mpi_buffer_results, dt);
@ -327,8 +293,4 @@ void ChemWorker::finishWork() {
} }
if (dht_enabled && dht_snaps > 0) writeFile(); if (dht_enabled && dht_snaps > 0) writeFile();
// free(mpi_buffer);
// free(mpi_buffer_results);
// delete dht;
} }

View File

@ -2,25 +2,15 @@
using namespace poet; using namespace poet;
using namespace Rcpp; using namespace Rcpp;
/**
* At this moment init will only declare and define a variable inside the R
* runtime called grid_tmp since the whole Grid initialization and management is
* done by the R runtime. This may change in the future.
*/
void Grid::init() { void Grid::init() {
R.parseEval("GRID_TMP <- mysetup$state_C"); R.parseEval("GRID_TMP <- mysetup$state_C");
this->ncol = R.parseEval("ncol(GRID_TMP)"); this->ncol = R.parseEval("ncol(GRID_TMP)");
this->nrow = R.parseEval("nrow(GRID_TMP)"); this->nrow = R.parseEval("nrow(GRID_TMP)");
} }
/**
* Returns the number of elements for each gridcell.
*/
unsigned int Grid::getCols() { return this->ncol; } unsigned int Grid::getCols() { return this->ncol; }
/**
* Returns the number of gridcells.
*/
unsigned int Grid::getRows() { return this->nrow; } unsigned int Grid::getRows() { return this->nrow; }
void Grid::shuffleAndExport(double *buffer) { void Grid::shuffleAndExport(double *buffer) {
@ -36,8 +26,8 @@ void Grid::importAndUnshuffle(double *buffer) {
R.parseEval("result <- unshuffle_field(GRID_CHEM_DATA, ordered_ids)"); R.parseEval("result <- unshuffle_field(GRID_CHEM_DATA, ordered_ids)");
} }
void Grid::importWP(double *buffer, unsigned int p_size) { void Grid::importWP(double *buffer, unsigned int wp_size) {
R["GRID_WP_SKELETON"] = getSkeletonDataFrame(p_size); R["GRID_WP_SKELETON"] = getSkeletonDataFrame(wp_size);
R.setBufferDataFrame("GRID_WP_SKELETON"); R.setBufferDataFrame("GRID_WP_SKELETON");
R.from_C_domain(buffer); R.from_C_domain(buffer);
R["work_package_full"] = R.getBufferDataFrame(); R["work_package_full"] = R.getBufferDataFrame();
@ -46,12 +36,7 @@ void Grid::exportWP(double *buffer) {
R.setBufferDataFrame("result_full"); R.setBufferDataFrame("result_full");
R.to_C_domain(buffer); R.to_C_domain(buffer);
} }
/**
* Create a data frame with n rows.
*
* @return Can be seen as a skeleton. The content of the data frame might be
* irrelevant.
*/
Rcpp::DataFrame Grid::getSkeletonDataFrame(unsigned int rows) { Rcpp::DataFrame Grid::getSkeletonDataFrame(unsigned int rows) {
R["GRID_ROWS"] = rows; R["GRID_ROWS"] = rows;

View File

@ -5,26 +5,121 @@
#include <Rcpp.h> #include <Rcpp.h>
namespace poet { namespace poet {
/**
* @brief Class describing the grid
*
* Providing methods to shuffle and unshuffle grid (for the master) as also to
* import and export a work package (for worker).
*
* @todo find better abstraction
*
*/
class Grid { class Grid {
public: public:
/**
* @brief Construct a new Grid object
*
* This will call the default constructor and initializes private RRuntime
* with given R runtime.
*
* @param R
*/
Grid(RRuntime &R) : R(R){}; Grid(RRuntime &R) : R(R){};
/**
* @brief Init the grid
*
* At this moment init will only declare and define a variable inside the R
* runtime called grid_tmp since the whole Grid initialization and management
* is done by the R runtime. This may change in the future.
*
*/
void init(); void init();
/**
* @brief Returns the number of elements for each gridcell
*
* @return unsigned int Number of elements
*/
unsigned int getCols(); unsigned int getCols();
/**
* @brief Returns the number of grid cells
*
* @return unsigned int Number of grid cells
*/
unsigned int getRows(); unsigned int getRows();
/**
* @brief Shuffle the grid and export it to C memory area
*
* This will call shuffle_field inside R runtime, set the resulting grid as
* buffered data frame in RRuntime object and write R grid to continous C
* memory area.
*
* @param[in,out] buffer Pointer to C memory area
*/
void shuffleAndExport(double *buffer); void shuffleAndExport(double *buffer);
/**
* @brief Unshuffle the grid and import it from C memory area into R runtime
*
* Write C memory area into temporary R grid variable and unshuffle it.
*
* @param buffer Pointer to C memory area
*/
void importAndUnshuffle(double *buffer); void importAndUnshuffle(double *buffer);
void importWP(double *buffer, unsigned int p_size); /**
* @brief Import a C memory area as a work package into R runtime
*
* Get a skeleton from getSkeletonDataFrame inside R runtime and set this as
* buffer data frame of RRuntime object. Now convert C memory area to R data
* structure.
*
* @param buffer Pointer to C memory area
* @param wp_size Count of grid cells per work package
*/
void importWP(double *buffer, unsigned int wp_size);
/**
* @brief Export a work package from R runtime into C memory area
*
* Set buffer data frame in RRuntime object to data frame holding the results
* and convert this to C memory area.
*
* @param buffer Pointer to C memory area
*/
void exportWP(double *buffer); void exportWP(double *buffer);
private: private:
/**
* @brief Instance of RRuntime
*
*/
RRuntime R; RRuntime R;
/**
* @brief Number of columns of grid
*
*/
unsigned int ncol; unsigned int ncol;
/**
* @brief Number of rows of grid
*
*/
unsigned int nrow; unsigned int nrow;
/**
* @brief Get a SkeletonDataFrame
*
* Return a skeleton with \e n rows of current grid
*
* @param rows number of rows to return skeleton
* @return Rcpp::DataFrame Can be seen as a skeleton. The content of the data
* frame might be irrelevant.
*/
Rcpp::DataFrame getSkeletonDataFrame(unsigned int rows); Rcpp::DataFrame getSkeletonDataFrame(unsigned int rows);
}; };
} // namespace poet } // namespace poet

View File

@ -4,18 +4,57 @@
#include <RRuntime.h> #include <RRuntime.h>
namespace poet { namespace poet {
/**
* @brief Class describing transport simulation
*
* Offers simple methods to run an iteration and end the simulation.
*
*/
class TransportSim { class TransportSim {
public: public:
/**
* @brief Construct a new TransportSim object
*
* The instance will only be initialized with given R object.
*
* @param R RRuntime object
*/
TransportSim(RRuntime &R); TransportSim(RRuntime &R);
/**
* @brief Run simulation for one iteration
*
* This will simply call the R function 'master_advection'
*
*/
void run(); void run();
/**
* @brief End simulation
*
* All measured timings are distributed to the R runtime
*
*/
void end(); void end();
/**
* @brief Get the transport time
*
* @return double time spent in transport
*/
double getTransportTime(); double getTransportTime();
private: private:
/**
* @brief Instance of RRuntime
*
*/
RRuntime &R; RRuntime &R;
/**
* @brief time spent for transport
*
*/
double transport_t = 0.f; double transport_t = 0.f;
}; };
} // namespace poet } // namespace poet

View File

@ -18,24 +18,37 @@
/** Standard work package size */ /** Standard work package size */
#define WORK_PACKAGE_SIZE_DEFAULT 5 #define WORK_PACKAGE_SIZE_DEFAULT 5
namespace poet {
/**
* @brief Defining all simulation parameters
*
*/
typedef struct { typedef struct {
/** Count of processes in MPI_COMM_WORLD */
int world_size; int world_size;
/** rank of proces in MPI_COMM_WORLD */
int world_rank; int world_rank;
/** indicates if DHT should be used */
bool dht_enabled; bool dht_enabled;
/** apply logarithm to key before rounding */
bool dht_log; bool dht_log;
/** indicates if timestep dt differs between iterations */
bool dt_differ; bool dt_differ;
/** Indicates, when a DHT snapshot should be written */
int dht_snaps; int dht_snaps;
/** <b>not implemented</b>: How a DHT is distributed over processes */
int dht_strategy; int dht_strategy;
/** Size of DHt per process in byter */
unsigned int dht_size_per_process; unsigned int dht_size_per_process;
/** Default significant digit for rounding */
int dht_significant_digits; int dht_significant_digits;
/** Default work package size */
unsigned int wp_size; unsigned int wp_size;
/** indicates if resulting grid should be stored after every iteration */
bool store_result; bool store_result;
} t_simparams; } t_simparams;
namespace poet {
/** /**
* @brief Reads information from program arguments and R runtime * @brief Reads information from program arguments and R runtime
* *