BREAKING CHANGE: Introduce ChemistryModule as extension of PhreeqcRM

This commit is contained in:
Max Luebke 2023-02-14 16:12:14 +01:00 committed by Max Lübke
parent a493054f9d
commit e6819b59bc
26 changed files with 2082 additions and 1759 deletions

View File

@ -2,9 +2,15 @@ get_poet_version()
configure_file(poet.h.in poet.h)
add_executable(poet poet.cpp)
if (POET_USE_PRM_BACKEND)
set(poet_SRC poet_prm.cpp)
else()
set(poet_SRC poet.cpp)
endif()
add_executable(poet ${poet_SRC})
target_include_directories(poet PUBLIC "${CMAKE_CURRENT_BINARY_DIR}")
target_link_libraries(poet PUBLIC poet_lib MPI::MPI_C)
target_link_libraries(poet PUBLIC poet_lib MPI::MPI_CXX)
#target_compile_definitions(poet PRIVATE OMPI_SKIP_MPICXX)
install(TARGETS poet DESTINATION bin)

View File

@ -18,11 +18,12 @@
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/ChemistryModule.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <Rcpp/internal/wrap.h>
#include <cstdint>
#include <poet/ChemSimPar.hpp>
#include <poet/ChemSimSeq.hpp>
#include <cstdlib>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp>
#include <poet/SimParams.hpp>
@ -39,9 +40,65 @@ using namespace std;
using namespace poet;
using namespace Rcpp;
template <class ChemistryInstance>
poet::ChemistryModule::SingleCMap DFToHashMap(const Rcpp::DataFrame &df) {
std::unordered_map<std::string, double> out_map;
vector<string> col_names = Rcpp::as<vector<string>>(df.names());
for (const auto &name : col_names) {
double val = df[name.c_str()];
out_map.insert({name, val});
}
return out_map;
}
void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
const std::string &database_path) {
chem.SetErrorHandlerMode(1);
chem.SetComponentH2O(false);
chem.SetRebalanceFraction(0.5);
chem.SetRebalanceByCell(true);
chem.UseSolutionDensityVolume(false);
chem.SetPartitionUZSolids(false);
// Set concentration units
// 1, mg/L; 2, mol/L; 3, kg/kgs
chem.SetUnitsSolution(2);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsPPassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsExchange(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSurface(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsGasPhase(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSSassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsKinetics(1);
// Set representative volume
std::vector<double> rv;
rv.resize(wp_size, 1.0);
chem.SetRepresentativeVolume(rv);
// Set initial porosity
std::vector<double> por;
por.resize(wp_size, 1);
chem.SetPorosity(por);
// Set initial saturation
std::vector<double> sat;
sat.resize(wp_size, 1.0);
chem.SetSaturation(sat);
// Load database
chem.LoadDatabase(database_path);
}
inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
ChemistryParams &chem_params) {
ChemistryParams &chem_params,
const GridParams &g_params, uint32_t nxyz_master) {
DiffusionModule diffusion(poet::DiffusionParams(R), grid);
/* Iteration Count is dynamic, retrieving value from R (is only needed by
@ -50,8 +107,31 @@ inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
double sim_time = .0;
ChemistryInstance C(params, R, grid);
C.InitModule(chem_params);
ChemistryModule chem(nxyz_master, params.getNumParams().wp_size,
MPI_COMM_WORLD);
set_chem_parameters(chem, nxyz_master, chem_params.database_path);
chem.RunInitFile(chem_params.input_script);
chem.InitializeField(grid.GetTotalCellCount(), DFToHashMap(g_params.init_df));
if (params.getNumParams().dht_enabled) {
chem.SetDHTEnabled(true, params.getNumParams().dht_size_per_process);
if (!params.getDHTSignifVector().empty()) {
chem.SetDHTSignifVector(params.getDHTSignifVector());
}
if (!params.getDHTPropTypeVector().empty()) {
chem.SetDHTPropTypeVector(params.getDHTPropTypeVector());
}
if (!params.getDHTFile().empty()) {
chem.ReadDHTFile(params.getDHTFile());
}
if (params.getNumParams().dht_snaps > 0) {
chem.SetDHTSnaps(params.getNumParams().dht_snaps, params.getOutDir());
}
}
StateMemory *chem_state = grid.RegisterState("state_C", chem.GetPropNames());
chem_state->mem = chem.GetField();
/* SIMULATION LOOP */
double dStartTime = MPI_Wtime();
@ -77,7 +157,13 @@ inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
cout << "CPP: Chemistry" << endl;
C.Simulate(dt);
chem.SetTimeStep(dt);
chem.SetField(chem_state->mem);
chem.SetTimeStep(dt);
chem.RunCells();
chem_state->mem = chem.GetField();
grid.PreModuleFieldCopy(tick++);
R["req_dt"] = dt;
@ -97,12 +183,50 @@ inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
<< endl
<< endl;
MPI_Barrier(MPI_COMM_WORLD);
// MPI_Barrier(MPI_COMM_WORLD);
} // END SIMULATION LOOP
R.parseEvalQ("profiling <- list()");
C.End();
R["simtime_chemistry"] = chem.GetChemistryTime();
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["chemistry_loop"] = chem.GetMasterLoopTime();
R.parseEvalQ("profiling$chemistry_loop <- chemistry_loop");
R["chemistry_sequential"] = chem.GetMasterSequentialTime();
R.parseEvalQ("profiling$simtime_sequential <- chemistry_sequential");
R["idle_master"] = chem.GetMasterIdleTime();
R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = Rcpp::wrap(chem.GetWorkerIdleTimings());
R.parseEvalQ("profiling$idle_worker <- idle_worker");
R["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings());
R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
R["simtime_transport"] = diffusion.getTransportTime();
R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["phreeqc_count"] = phreeqc_counts;
// R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
if (params.getNumParams().dht_enabled) {
R["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = Rcpp::wrap(chem.GetWorkerDHTMiss());
R.parseEvalQ("profiling$dht_miss <- dht_miss");
R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
chem.MasterLoopBreak();
diffusion.end();
return MPI_Wtime() - dStartTime;
@ -119,20 +243,36 @@ int main(int argc, char *argv[]) {
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
/*Create custom Communicator with all processes except 0 (the master) for DHT
* storage */
MPI_Comm dht_comm;
if (world_rank == 0) {
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, world_rank, &dht_comm);
} else {
MPI_Comm_split(MPI_COMM_WORLD, 1, world_rank, &dht_comm);
}
if (world_rank == 0) {
cout << "Running POET in version " << poet_version << endl << endl;
}
if (world_rank > 0) {
{
uint32_t c_size;
MPI_Bcast(&c_size, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
char *buffer = new char[c_size + 1];
MPI_Bcast(buffer, c_size, MPI_CHAR, 0, MPI_COMM_WORLD);
buffer[c_size] = '\0';
// ChemistryModule worker(nxyz, nxyz, MPI_COMM_WORLD);
ChemistryModule worker =
poet::ChemistryModule::createWorker(MPI_COMM_WORLD);
set_chem_parameters(worker, worker.GetWPSize(), std::string(buffer));
delete[] buffer;
worker.WorkerLoop();
}
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();
return EXIT_SUCCESS;
}
/* initialize R runtime */
RInside R(argc, argv);
@ -154,42 +294,18 @@ int main(int argc, char *argv[]) {
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
// HACK: we disable master_init and dt_differ propagation here for testing
// purposes
//
bool dt_differ = false;
R.parseEvalQ("mysetup <- setup");
if (world_rank == 0) { // get timestep vector from
// grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code);
// dt_differ = R.parseEval("mysetup$dt_differ");
// // ... and broadcast it to every other rank unequal to 0
MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
}
/* workers will only read the setup DataFrame defined by input file */
else {
// R.parseEval("mysetup <- setup");
MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
}
params.setDtDiffer(dt_differ);
// initialize chemistry on all processes
// TODO: einlesen einer initial matrix (DataFrame)
// std::string init_chemistry_code = "mysetup <-
// init_chemistry(setup=mysetup)"; R.parseEval(init_chemistry_code);
// TODO: Grid anpassen
// if (world_rank == 0) { // get timestep vector from
// grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code);
Grid grid;
GridParams g_params(R);
grid.InitModuleFromParams(GridParams(R));
grid.PushbackModuleFlow(poet::DIFFUSION_MODULE_NAME,
poet::BaseChemModule::CHEMISTRY_MODULE_NAME);
grid.PushbackModuleFlow(poet::BaseChemModule::CHEMISTRY_MODULE_NAME,
poet::DIFFUSION_MODULE_NAME);
grid.InitModuleFromParams(g_params);
grid.PushbackModuleFlow(poet::DIFFUSION_MODULE_NAME, CHEMISTRY_MODULE_NAME);
grid.PushbackModuleFlow(CHEMISTRY_MODULE_NAME, poet::DIFFUSION_MODULE_NAME);
params.initVectorParams(R, grid.GetSpeciesCount());
@ -203,38 +319,35 @@ int main(int argc, char *argv[]) {
cout << "CPP: Init done on process with rank " << world_rank << endl;
}
MPI_Barrier(MPI_COMM_WORLD);
// MPI_Barrier(MPI_COMM_WORLD);
poet::ChemistryParams chem_params(R);
/* THIS IS EXECUTED BY THE MASTER */
if (world_rank == 0) {
if (world_size == 1) {
dSimTime = RunMasterLoop<ChemSeq>(params, R, grid, chem_params);
} else {
dSimTime = RunMasterLoop<ChemMaster>(params, R, grid, chem_params);
}
std::string db_path = chem_params.database_path;
uint32_t c_size = db_path.size();
MPI_Bcast(&c_size, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
cout << "CPP: finished simulation loop" << endl;
MPI_Bcast(db_path.data(), c_size, MPI_CHAR, 0, MPI_COMM_WORLD);
uint32_t nxyz_master = (world_size == 1 ? grid.GetTotalCellCount() : 1);
cout << "CPP: start timing profiling" << endl;
dSimTime = RunMasterLoop(params, R, grid, chem_params, g_params, nxyz_master);
R["simtime"] = dSimTime;
R.parseEvalQ("profiling$simtime <- simtime");
cout << "CPP: finished simulation loop" << endl;
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: start timing profiling" << endl;
cout << "CPP: Done! Results are stored as R objects into <"
<< params.getOutDir() << "/timings.rds>" << endl;
}
/* THIS IS EXECUTED BY THE WORKERS */
else {
ChemWorker worker(params, R, grid, dht_comm);
worker.InitModule(chem_params);
worker.loop();
}
R["simtime"] = dSimTime;
R.parseEvalQ("profiling$simtime <- simtime");
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.getOutDir() << "/timings.rds>" << endl;
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();

View File

@ -1,6 +1,10 @@
#ifndef POET_H
#define POET_H
#include "poet/ChemistryModule.hpp"
#include <Rcpp.h>
const char *poet_version = "@POET_VERSION@";
const char *CHEMISTRY_MODULE_NAME = "state_C";
#endif // POET_H

287
app/poet_prm.cpp Normal file
View File

@ -0,0 +1,287 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/ChemistryModule.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <Rcpp/internal/wrap.h>
#include <cstdint>
#include <cstdlib>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp>
#include <poet/SimParams.hpp>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <mpi.h>
#include <poet.h>
using namespace std;
using namespace poet;
using namespace Rcpp;
void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
const std::string &database_path) {
chem.SetErrorHandlerMode(1);
chem.SetComponentH2O(false);
chem.SetRebalanceFraction(0.5);
chem.SetRebalanceByCell(true);
chem.UseSolutionDensityVolume(false);
chem.SetPartitionUZSolids(false);
// Set concentration units
// 1, mg/L; 2, mol/L; 3, kg/kgs
chem.SetUnitsSolution(2);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsPPassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsExchange(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSurface(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsGasPhase(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSSassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsKinetics(1);
// Set representative volume
std::vector<double> rv;
rv.resize(wp_size, 1.0);
chem.SetRepresentativeVolume(rv);
// Set initial porosity
std::vector<double> por;
por.resize(wp_size, 1);
chem.SetPorosity(por);
// Set initial saturation
std::vector<double> sat;
sat.resize(wp_size, 1.0);
chem.SetSaturation(sat);
// Load database
chem.LoadDatabase(database_path);
}
inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
ChemistryParams &chem_params,
const GridParams &g_params, uint32_t nxyz_master) {
DiffusionModule diffusion(poet::DiffusionParams(R), grid);
/* Iteration Count is dynamic, retrieving value from R (is only needed by
* master for the following loop) */
uint32_t maxiter = R.parseEval("mysetup$iterations");
double sim_time = .0;
ChemistryModule chem(grid.GetTotalCellCount(), MPI_COMM_WORLD);
set_chem_parameters(chem, nxyz_master, chem_params.database_path);
chem.RunInitFile(chem_params.input_script);
chem.SetSelectedOutputOn(true);
chem.SetTimeStep(0);
chem.RunCells();
StateMemory *chem_state = grid.RegisterState("state_C", chem.GetPropNames());
auto &chem_field = chem_state->mem;
chem_field = chem.GetField();
/* SIMULATION LOOP */
double dStartTime = MPI_Wtime();
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
uint32_t tick = 0;
// cout << "CPP: Evaluating next time step" << endl;
// R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
double dt = Rcpp::as<double>(
R.parseEval("mysetup$timesteps[" + std::to_string(iter) + "]"));
cout << "CPP: Next time step is " << dt << "[s]" << endl;
/* displaying iteration number, with C++ and R iterator */
cout << "CPP: Going through iteration " << iter << endl;
cout << "CPP: R's $iter: " << ((uint32_t)(R.parseEval("mysetup$iter")))
<< ". Iteration" << endl;
/* run transport */
// TODO: transport to diffusion
diffusion.simulate(dt);
grid.PreModuleFieldCopy(tick++);
cout << "CPP: Chemistry" << endl;
chem.SetTimeStep(dt);
chem.SetConcentrations(chem_field);
chem.SetTimeStep(dt);
chem.RunCells();
chem_field = chem.GetField();
grid.WriteFieldsToR(R);
grid.PreModuleFieldCopy(tick++);
R["req_dt"] = dt;
R["simtime"] = (sim_time += dt);
R.parseEval("mysetup$req_dt <- req_dt");
R.parseEval("mysetup$simtime <- simtime");
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
// MPI_Barrier(MPI_COMM_WORLD);
} // END SIMULATION LOOP
R.parseEvalQ("profiling <- list()");
R["simtime_chemistry"] = chem.GetChemistryTime();
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
chem.MpiWorkerBreak();
diffusion.end();
return MPI_Wtime() - dStartTime;
}
int main(int argc, char *argv[]) {
double dSimTime, sim_end;
int world_size, world_rank;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
if (world_rank == 0) {
cout << "Running POET in version " << poet_version << endl << endl;
}
if (world_rank > 0) {
{
uint32_t nxyz;
MPI_Bcast(&nxyz, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
ChemistryModule worker(nxyz, MPI_COMM_WORLD);
worker.MpiWorker();
}
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();
return EXIT_SUCCESS;
}
/* initialize R runtime */
RInside R(argc, argv);
/*Loading Dependencies*/
// TODO: kann raus
std::string r_load_dependencies = "source('../R_lib/kin_r_library.R');";
R.parseEvalQ(r_load_dependencies);
SimParams params(world_rank, world_size);
int pret = params.parseFromCmdl(argv, R);
if (pret == poet::PARSER_ERROR) {
MPI_Finalize();
return EXIT_FAILURE;
} else if (pret == poet::PARSER_HELP) {
MPI_Finalize();
return EXIT_SUCCESS;
}
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
R.parseEvalQ("mysetup <- setup");
// if (world_rank == 0) { // get timestep vector from
// grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code);
Grid grid;
GridParams g_params(R);
grid.InitModuleFromParams(g_params);
grid.PushbackModuleFlow(poet::DIFFUSION_MODULE_NAME, CHEMISTRY_MODULE_NAME);
grid.PushbackModuleFlow(CHEMISTRY_MODULE_NAME, poet::DIFFUSION_MODULE_NAME);
params.initVectorParams(R, grid.GetSpeciesCount());
// MDL: store all parameters
if (world_rank == 0) {
cout << "CPP: Calling R Function to store calling parameters" << endl;
R.parseEvalQ("StoreSetup(setup=mysetup)");
}
if (world_rank == 0) {
cout << "CPP: Init done on process with rank " << world_rank << endl;
}
// MPI_Barrier(MPI_COMM_WORLD);
poet::ChemistryParams chem_params(R);
/* THIS IS EXECUTED BY THE MASTER */
uint32_t nxyz = grid.GetTotalCellCount();
MPI_Bcast(&nxyz, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
uint32_t nxyz_master = grid.GetTotalCellCount();
dSimTime = RunMasterLoop(params, R, grid, chem_params, g_params, nxyz_master);
cout << "CPP: finished simulation loop" << endl;
cout << "CPP: start timing profiling" << endl;
R["simtime"] = dSimTime;
R.parseEvalQ("profiling$simtime <- simtime");
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.getOutDir() << "/timings.rds>" << endl;
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();
if (world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
exit(0);
}

View File

@ -1,13 +1,6 @@
SELECTED_OUTPUT
-high_precision true
-reset false
-time
-soln
-temperature true
-water true
-pH
-pe
-totals C Ca Cl Mg
-kinetic_reactants Calcite Dolomite
-equilibrium O2g

@ -1 +1 @@
Subproject commit cc5ef0ff37233d24108a793a74abe649dcae54b7
Subproject commit f862889b693507458d450c6319abe5e1dce030f7

View File

@ -1,273 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef CHEMSIMPAR_H
#define CHEMSIMPAR_H
#include "ChemSimSeq.hpp"
#include "DHT_Wrapper.hpp"
#include "Grid.hpp"
#include "RInside.h"
#include "SimParams.hpp"
#include "poet/PhreeqcWrapper.hpp"
#include <array>
#include <cstdint>
#include <mpi.h>
#include <string>
#include <vector>
/** Number of data elements that are kept free at each work package */
#define BUFFER_OFFSET 5
/** Message tag indicating work */
#define TAG_WORK 42
/** Message tag indicating to finish loop */
#define TAG_FINISH 43
/** Message tag indicating timing profiling */
#define TAG_TIMING 44
/** Message tag indicating collecting DHT performance */
#define TAG_DHT_PERF 45
/** Message tag indicating simulation reached the end of an itertation */
#define TAG_DHT_ITER 47
namespace poet {
/**
* @brief Class providing execution of master chemistry
*
* Providing member functions to run an iteration and to end a simulation. Also
* a loop to send and recv pkgs from workers is implemented.
*
*/
class ChemMaster : public BaseChemModule {
public:
/**
* @brief Construct a new ChemMaster object
*
* The following steps are executed to create a new object of ChemMaster:
* -# all needed simulation parameters are extracted
* -# memory is allocated
* -# distribution of work packages is calculated
*
* @param params Simulation parameters as SimParams object
* @param R_ R runtime
* @param grid_ Grid object
*/
ChemMaster(SimParams &params, RInside &R_, Grid &grid_);
/**
* @brief Destroy the ChemMaster object
*
* By freeing ChemMaster all buffers allocated in the Constructor are freed.
*
*/
~ChemMaster();
/**
* @brief Run iteration of simulation in parallel mode
*
* To run the chemistry simulation parallel following steps are done:
*
* -# 'Shuffle' the grid by previously calculated distribution of work
* packages. Convert R grid to C memory area.
* -# Start the send/recv loop.
* Detailed description in sendPkgs respectively in recvPkgs.
* -# 'Unshuffle'
* the grid and convert C memory area to R grid.
* -# Run 'master_chemistry'
*
* The main tasks are instrumented with time measurements.
*
*/
void Simulate(double dt);
/**
* @brief End chemistry simulation.
*
* Notify the worker to finish their 'work'-loop. This is done by sending
* every worker an empty message with the tag TAG_FINISH. Now the master will
* receive measured times and DHT metrics from all worker one after another.
* Finally he will write all data to the R runtime and return this function.
*
*/
void End();
/**
* @brief Get the send time
*
* Time spent in send loop.
*
* @return double sent time in seconds
*/
double getSendTime();
/**
* @brief Get the recv time
*
* Time spent in recv loop.
*
* @return double recv time in seconds
*/
double getRecvTime();
/**
* @brief Get the idle time
*
* Time master was idling in MPI_Probe of recv loop.
*
* @return double idle time in seconds
*/
double getIdleTime();
/**
* @brief Get the Worker time
*
* Time spent in whole send/recv loop.
*
* @return double worker time in seconds
*/
double getWorkerTime();
/**
* @brief Get the ChemMaster time
*
* Time spent in 'master_chemistry' R function.
*
* @return double ChemMaster time in seconds
*/
double getChemMasterTime();
/**
* @brief Get the sequential time
*
* Time master executed code which must be run sequential.
*
* @return double seqntial time in seconds.
*/
double getSeqTime();
private:
void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70);
inline void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers,
double *&work_pointer, const double &dt,
const uint32_t iteration);
inline void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);
void shuffleField(const std::vector<double> &in_field, uint32_t size_per_prop,
uint32_t prop_count, double *out_buffer);
void unshuffleField(const double *in_buffer, uint32_t size_per_prop,
uint32_t prop_count, std::vector<double> &out_field);
uint32_t wp_size;
bool dht_enabled;
double send_t = 0.f;
double recv_t = 0.f;
double master_idle = 0.f;
double worker_t = 0.f;
double chem_master = 0.f;
double seq_t = 0.f;
typedef struct {
char has_work;
double *send_addr;
} worker_struct;
worker_struct *workerlist;
double *send_buffer;
double *mpi_buffer;
std::vector<uint32_t> wp_sizes_vector;
poet::StateMemory *state;
};
/**
* @brief Class providing execution of worker chemistry
*
* Providing mainly a function to loop and wait for messages from the master.
*
*/
class ChemWorker : public BaseChemModule {
public:
/**
* @brief Construct a new ChemWorker object
*
* The following steps are executed to create a new object of ChemWorker:
* -# all needed simulation parameters are extracted
* -# memory is allocated
* -# Preparetion to create a DHT
* -# and finally create a new DHT_Wrapper
*
* @param params Simulation parameters as SimParams object
* @param R_ R runtime
* @param grid_ Grid object
* @param dht_comm Communicator addressing all processes marked as worker
*/
ChemWorker(SimParams &params, RInside &R_, Grid &grid_, MPI_Comm dht_comm);
void InitModule(poet::ChemistryParams &chem_params);
/**
* @brief Destroy the ChemWorker object
*
* Therefore all buffers are freed and the DHT_Wrapper object is destroyed.
*
*/
~ChemWorker();
/**
* @brief Start the 'work' loop
*
* Loop in an endless loop. At the beginning probe for a message from the
* master process. If there is a receivable message evaluate the message tag.
*
*/
void loop();
private:
void doWork(MPI_Status &probe_status);
void postIter();
void finishWork();
void writeFile();
void readFile();
uint32_t wp_size;
uint32_t dht_size_per_process;
uint32_t iteration = 0;
bool dht_enabled;
bool dt_differ;
int dht_snaps;
std::string dht_file;
std::vector<bool> dht_flags;
double *mpi_buffer_results;
DHT_Wrapper *dht;
std::array<double, 3> timing;
double idle_t = 0.f;
uint32_t phreeqc_count = 0;
double *mpi_buffer;
uint32_t ncomps;
std::string out_dir;
PhreeqcWrapper *phreeqc_rm = std::nullptr_t();
};
} // namespace poet
#endif // CHEMSIMPAR_H

View File

@ -1,131 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef CHEMSIMSEQ_H
#define CHEMSIMSEQ_H
#include "DHT_Wrapper.hpp"
#include "Grid.hpp"
#include "PhreeqcWrapper.hpp"
#include "RInside.h"
#include "SimParams.hpp"
#include <bits/stdint-uintn.h>
#include <cstddef>
#include <cstdint>
#include <mpi.h>
#include <string>
#include <vector>
/** Number of data elements that are kept free at each work package */
#define BUFFER_OFFSET 5
/** Message tag indicating work */
#define TAG_WORK 42
/** Message tag indicating to finish loop */
#define TAG_FINISH 43
/** Message tag indicating timing profiling */
#define TAG_TIMING 44
/** Message tag indicating collecting DHT performance */
#define TAG_DHT_PERF 45
/** Message tag indicating simulation reached the end of an itertation */
#define TAG_DHT_ITER 47
namespace poet {
/**
* @brief Base class of the chemical simulation
*
* Providing member functions to run an iteration and to end a simulation. Also
* containing basic parameters for simulation.
*
*/
class BaseChemModule {
public:
BaseChemModule(SimParams &params, RInside &R_, Grid &grid_);
virtual void InitModule(poet::ChemistryParams &chem_params){};
static constexpr const char *CHEMISTRY_MODULE_NAME = "state_c";
protected:
int world_rank;
int world_size;
RInside &R;
Grid &grid;
double chem_t = 0.f;
double current_sim_time = 0;
uint32_t n_cells_per_prop;
std::vector<std::string> prop_names;
};
class ChemSeq : public BaseChemModule {
public:
/**
* @brief Construct a new ChemSim object
*
* Creating a new instance of class ChemSim will just extract simulation
* parameters from SimParams object.
*
* @param params SimParams object
* @param R_ R runtime
* @param grid_ Initialized grid
*/
ChemSeq(SimParams &params, RInside &R_, Grid &grid_);
~ChemSeq();
void InitModule(poet::ChemistryParams &chem_params);
/**
* @brief Run iteration of simulation in sequential mode
*
* This will call the correspondending R function slave_chemistry, followed by
* the execution of master_chemistry.
*
* @todo change function name. Maybe 'slave' to 'seq'.
*
*/
void Simulate(double dt);
/**
* @brief End simulation
*
* End the simulation by distribute the measured runtime of simulation to the
* R runtime.
*
*/
void End();
/**
* @brief Get the Chemistry Time
*
* @return double Runtime of sequential chemistry simulation in seconds
*/
double getChemistryTime();
private:
poet::StateMemory *state;
PhreeqcWrapper *phreeqc_rm = std::nullptr_t();
};
} // namespace poet
#endif // CHEMSIMSEQ_H

View File

@ -0,0 +1,413 @@
#ifndef CHEMISTRYMODULE_H_
#define CHEMISTRYMODULE_H_
#include "IrmResult.h"
#include "PhreeqcRM.h"
#include "poet/DHT_Wrapper.hpp"
#include <cstddef>
#include <cstdint>
#include <mpi.h>
#include <string>
#include <unordered_map>
#include <vector>
namespace poet {
/**
* \brief Wrapper around PhreeqcRM to provide POET specific parallelization with
* easy access.
*/
class ChemistryModule : public PhreeqcRM {
public:
#ifdef POET_USE_PRM
/**
* Creates a new instance of Chemistry module with given grid cell count and
* using MPI communicator.
*
* Constructor is just a wrapper around PhreeqcRM's constructor, so just calls
* the base class constructor.
*
* \param nxyz Count of grid cells.
* \param communicator MPI communicator where work will be distributed.
*/
ChemistryModule(uint32_t nxyz, MPI_Comm communicator)
: PhreeqcRM(nxyz, communicator), n_cells(nxyz){};
#else
/**
* Creates a new instance of Chemistry module with given grid cell count, work
* package size and communicator.
*
* This constructor shall only be called by the master. To create workers, see
* ChemistryModule::createWorker .
*
* When the use of parallelization is intended, the nxyz value shall be set to
* 1 to save memory and only one node is needed for initialization.
*
* \param nxyz Count of grid cells to allocate and initialize for each
* process. For parellel use set to 1 at the master.
* \param wp_size Count of grid cells to fill each work package at maximum.
* \param communicator MPI communicator to distribute work in.
*/
ChemistryModule(uint32_t nxyz, uint32_t wp_size, MPI_Comm communicator);
/**
* Deconstructor, which frees DHT data structure if used.
*/
~ChemistryModule();
#endif
/**
* Parses the input script and extract information needed during runtime.
*
* **Only run by master**.
*
* Database must be loaded beforehand.
*
* \param input_script_path Path to input script to parse.
*/
void RunInitFile(const std::string &input_script_path);
/**
* Run the chemical simulation with parameters set.
*/
void RunCells();
/**
* Returns the chemical field.
*/
auto GetField() const { return this->field; }
/**
* Returns all known species names, including not only aqueous species, but
* also equilibrium, exchange, surface and kinetic reactants.
*/
auto GetPropNames() const { return this->prop_names; }
/**
* Return the accumulated runtime in seconds for chemical simulation.
*/
auto GetChemistryTime() const { return this->chem_t; }
#ifndef POET_USE_PRM
/**
* Create a new worker instance inside given MPI communicator.
*
* Wraps communication needed before instanciation can take place.
*
* \param communicator MPI communicator to distribute work in.
*
* \returns A worker instance with fixed work package size.
*/
static ChemistryModule createWorker(MPI_Comm communicator);
/**
* Default work package size.
*/
static constexpr uint32_t CHEM_DEFAULT_WP_SIZE = 5;
/**
* Intended to alias input parameters for grid initialization with a single
* value per species.
*/
using SingleCMap = std::unordered_map<std::string, double>;
/**
* Intended to alias input parameters for grid initialization with mutlitple
* values per species.
*/
using VectorCMap = std::unordered_map<std::string, std::vector<double>>;
/**
* Enumerating DHT file options
*/
enum {
DHT_FILES_DISABLED = 0, //!< disabled file output
DHT_FILES_SIMEND, //!< only output of snapshot after simulation
DHT_FILES_ITEREND //!< output snapshots after each iteration
};
/**
* Initializes field with a "DataFrame" containing a single value for each
* species.
*
* Each species must have been previously found by
* ChemistryModule::RunInitFile.
*
* \exception std::domain_error Species name of map is not defined.
*
* \param n_cells Count of cells for each species.
* \param mapped_values Unordered map containing one double value for each
* specified species.
*/
void InitializeField(uint32_t n_cells, const SingleCMap &mapped_values);
/**
* Initializes field with a "DataFrame" containing a vector of values for each
* species.
*
* Each species must have been previously found by
* ChemistryModule::RunInitFile.
*
* There is no check if vector length matches count of grid cells defined.
*
* \exception std::domain_error Species name of map is not defined.
*
* \param mapped_values Unordered map containing a vector of multiple values.
* Size of the vectors shall be the count of grid cells defined previously.
*/
void InitializeField(const VectorCMap &mapped_values);
/**
* **Only called by workers!** Start the worker listening loop.
*/
void WorkerLoop();
/**
* **Called by master** Advise the workers to break the loop.
*/
void MasterLoopBreak();
/**
* **Master only** Enables DHT usage for the workers.
*
* If called multiple times enabling the DHT, the current state of DHT will be
* overwritten with a new instance of the DHT.
*
* \param enable Enables or disables the usage of the DHT.
* \param size_mb Size in megabyte to allocate for the DHT if enabled.
*/
void SetDHTEnabled(bool enable, uint32_t size_mb);
/**
* **Master only** Set DHT snapshots to specific mode.
*
* \param type DHT snapshot mode.
* \param out_dir Path to output DHT snapshots.
*/
void SetDHTSnaps(int type, const std::string &out_dir);
/**
* **Master only** Set the vector with significant digits to round before
* inserting into DHT.
*
* \param signif_vec Vector defining significant digit for each species. Order
* is defined by prop_type vector (ChemistryModule::GetPropNames).
*/
void SetDHTSignifVector(std::vector<uint32_t> signif_vec);
/**
* **Master only** Set the DHT rounding type of each species. See
* DHT_PROP_TYPES enumemartion for explanation.
*
* \param proptype_vec Vector defining DHT prop type for each species.
*/
void SetDHTPropTypeVector(std::vector<uint32_t> proptype_vec);
/**
* **Master only** Load the state of the DHT from given file.
*
* \param input_file File to load data from.
*/
void ReadDHTFile(const std::string &input_file);
/**
* Overwrite the current field state by another field.
*
* There are no checks if new vector dimensions matches expected sizes etc.
*
* \param field New input field. Current field state of the instance will be
* overwritten.
*/
void SetField(const std::vector<double> &field) { this->field = field; }
/**
* **Master only** Return count of grid cells.
*/
auto GetNCells() const { return this->n_cells; }
/**
* **Master only** Return work package size.
*/
auto GetWPSize() const { return this->wp_size; }
/**
* **Master only** Return the time in seconds the master spent waiting for any
* free worker.
*/
auto GetMasterIdleTime() const { return this->idle_t; }
/**
* **Master only** Return the time in seconds the master spent in sequential
* part of the simulation, including times for shuffling/unshuffling field
* etc.
*/
auto GetMasterSequentialTime() const { return this->seq_t; }
/**
* **Master only** Return the time in seconds the master spent in the
* send/receive loop.
*/
auto GetMasterLoopTime() const { return this->send_recv_t; }
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to run Phreeqc simulation.
*
* \return Vector of all accumulated Phreeqc timings.
*/
std::vector<double> GetWorkerPhreeqcTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to get values from the DHT.
*
* \return Vector of all accumulated DHT get times.
*/
std::vector<double> GetWorkerDHTGetTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to write values to the DHT.
*
* \return Vector of all accumulated DHT fill times.
*/
std::vector<double> GetWorkerDHTFillTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers waiting for work packages from the master.
*
* \return Vector of all accumulated waiting times.
*/
std::vector<double> GetWorkerIdleTimings() const;
/**
* **Master only** Collect and return DHT hits of all workers.
*
* \return Vector of all count of DHT hits.
*/
std::vector<uint32_t> GetWorkerDHTHits() const;
/**
* **Master only** Collect and return DHT misses of all workers.
*
* \return Vector of all count of DHT misses.
*/
std::vector<uint32_t> GetWorkerDHTMiss() const;
/**
* **Master only** Collect and return DHT evictions of all workers.
*
* \return Vector of all count of DHT evictions.
*/
std::vector<uint32_t> GetWorkerDHTEvictions() const;
#endif
protected:
#ifdef POET_USE_PRM
void PrmToPoetField(std::vector<double> &field);
#else
enum {
CHEM_INIT,
CHEM_WP_SIZE,
CHEM_DHT_ENABLE,
CHEM_DHT_SIGNIF_VEC,
CHEM_DHT_PROP_TYPE_VEC,
CHEM_DHT_SNAPS,
CHEM_DHT_READ_FILE,
CHEM_WORK_LOOP,
CHEM_PERF,
CHEM_BREAK_MAIN_LOOP
};
enum { LOOP_WORK, LOOP_END };
enum {
WORKER_PHREEQC,
WORKER_DHT_GET,
WORKER_DHT_FILL,
WORKER_IDLE,
WORKER_DHT_HITS,
WORKER_DHT_MISS,
WORKER_DHT_EVICTIONS
};
struct worker_s {
double phreeqc_t = 0.;
double dht_get = 0.;
double dht_fill = 0.;
double idle_t = 0.;
};
struct worker_info_s {
char has_work = 0;
double *send_addr;
};
using worker_list_t = std::vector<struct worker_info_s>;
using workpointer_t = std::vector<double>::iterator;
void MasterRunParallel();
void MasterRunSequential();
void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer,
int &pkg_to_send, int &count_pkgs, int &free_workers,
double dt, uint32_t iteration,
const std::vector<uint32_t> &wp_sizes_vector);
void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send,
int &free_workers);
std::vector<double> MasterGatherWorkerTimings(int type) const;
std::vector<uint32_t> MasterGatherWorkerMetrics(int type) const;
void WorkerProcessPkgs(struct worker_s &timings, uint32_t &iteration);
void WorkerDoWork(MPI_Status &probe_status, int double_count,
struct worker_s &timings);
void WorkerPostIter(MPI_Status &prope_status, uint32_t iteration);
void WorkerPostSim(uint32_t iteration);
void WorkerWriteDHTDump(uint32_t iteration);
void WorkerReadDHTDump(const std::string &dht_input_file);
void WorkerPerfToMaster(int type, const struct worker_s &timings);
void WorkerMetricsToMaster(int type);
IRM_RESULT WorkerRunWorkPackage(std::vector<double> &vecWP,
std::vector<int32_t> &vecMapping,
double dSimTime, double dTimestep);
void GetWPFromInternals(std::vector<double> &vecWP, uint32_t wp_size);
void SetInternalsFromWP(const std::vector<double> &vecWP, uint32_t wp_size);
std::vector<uint32_t> CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const;
int comm_size, comm_rank;
MPI_Comm group_comm;
bool is_sequential;
bool is_master;
uint32_t wp_size;
bool dht_enabled = false;
int dht_snaps_type = DHT_FILES_DISABLED;
std::string dht_file_out_dir;
poet::DHT_Wrapper *dht = std::nullptr_t();
static constexpr uint32_t BUFFER_OFFSET = 5;
inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const {
MPI_Bcast(buf, count, datatype, 0, this->group_comm);
}
inline void PropagateFunctionType(int &type) const {
ChemBCast(&type, 1, MPI_INT);
}
double simtime = 0.;
double idle_t = 0.;
double seq_t = 0.;
double send_recv_t = 0.;
#endif
double chem_t = 0.;
uint32_t n_cells = 0;
uint32_t prop_count = 0;
std::vector<std::string> prop_names;
std::vector<double> field;
std::vector<uint32_t> speciesPerModule;
static constexpr uint32_t MODULE_COUNT = 5;
};
} // namespace poet
#endif // CHEMISTRYMODULE_H_

View File

@ -2,7 +2,7 @@
#define DHT_TYPES_H_
namespace poet {
enum { DHT_TYPE_DEFAULT, DHT_TYPE_ACT, DHT_TYPE_IGNORE };
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_ACT, DHT_TYPE_IGNORE };
}
#endif // DHT_TYPES_H_

View File

@ -197,9 +197,9 @@ private:
std::vector<DHT_Keyelement> fuzzForDHT(int var_count, void *key, double dt);
uint64_t dht_hits = 0;
uint64_t dht_miss = 0;
uint64_t dht_evictions = 0;
uint32_t dht_hits = 0;
uint32_t dht_miss = 0;
uint32_t dht_evictions = 0;
std::vector<uint32_t> dht_signif_vector;
std::vector<uint32_t> dht_prop_type_vector;

View File

@ -1,50 +0,0 @@
#ifndef PHREEQCWRAPPER_H_
#define PHREEQCWRAPPER_H_
#include "IrmResult.h"
#include "poet/SimParams.hpp"
#include <PhreeqcRM.h>
#include <cstdint>
#include <string>
#include <vector>
namespace poet {
class PhreeqcWrapper : public PhreeqcRM {
public:
PhreeqcWrapper(uint32_t inxyz);
void SetupAndLoadDB(const poet::ChemistryParams &chemPar);
void InitFromFile(const std::string &strInputFile);
auto GetSpeciesCount() -> uint32_t;
auto GetSpeciesCountByModule() -> const std::vector<uint32_t> &;
auto GetSpeciesNamesFull() -> const std::vector<std::string> &;
void SetInternalsFromWP(const std::vector<double> &vecWP,
uint32_t iCurrWPSize);
void GetWPFromInternals(std::vector<double> &vecWP, uint32_t iCurrWPSize);
auto ReplaceTotalsByPotentials(const std::vector<double> &vecWP,
uint32_t iCurrWPSize) -> std::vector<double>;
IRM_RESULT RunWorkPackage(std::vector<double> &vecWP,
std::vector<int32_t> &vecMapping, double dSimTime,
double dTimestep);
private:
void InitInternals();
inline void SetMappingForWP(uint32_t iCurrWPSize);
static constexpr int MODULE_COUNT = 5;
static constexpr uint32_t DHT_SELOUT = 100;
uint32_t iWPSize;
bool bInactiveCells = false;
uint32_t iSpeciesCount;
std::vector<uint32_t> vecSpeciesPerModule;
std::vector<std::string> vecSpeciesNames;
std::vector<int> vecDefMapping;
};
} // namespace poet
#endif // PHREEQCWRAPPER_H_

View File

@ -1,12 +0,0 @@
#include <poet/ChemSimSeq.hpp>
poet::BaseChemModule::BaseChemModule(SimParams &params, RInside &R_,
Grid &grid_)
: R(R_), grid(grid_) {
t_simparams tmp = params.getNumParams();
this->world_rank = tmp.world_rank;
this->world_size = tmp.world_size;
this->prop_names = this->grid.GetPropNames();
this->n_cells_per_prop = this->grid.GetTotalCellCount();
}

View File

@ -7,14 +7,10 @@ file(GLOB poet_lib_SRC
find_library(MATH_LIBRARY m)
find_library(CRYPTO_LIBRARY crypto)
option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
add_library(poet_lib ${poet_lib_SRC})
target_include_directories(poet_lib PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(poet_lib PUBLIC
MPI::MPI_C ${MATH_LIBRARY} ${CRYPTO_LIBRARY} RRuntime tug PhreeqcRM DataStructures)
MPI::MPI_CXX ${MATH_LIBRARY} ${CRYPTO_LIBRARY} RRuntime tug PhreeqcRM DataStructures ChemistryModule)
target_compile_definitions(poet_lib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
if(POET_DHT_DEBUG)
target_compile_definitions(poet_lib PRIVATE DHT_STATISTICS)
endif()
add_subdirectory(ChemistryModule)

View File

@ -1,445 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/ChemSimPar.hpp"
#include "poet/ChemSimSeq.hpp"
#include "poet/DiffusionModule.hpp"
#include "poet/Grid.hpp"
#include "poet/SimParams.hpp"
#include <Rcpp.h>
#include <array>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
using namespace poet;
using namespace std;
using namespace Rcpp;
ChemMaster::ChemMaster(SimParams &params, RInside &R_, Grid &grid_)
: BaseChemModule(params, R_, grid_) {
t_simparams tmp = params.getNumParams();
this->wp_size = tmp.wp_size;
this->dht_enabled = tmp.dht_enabled;
uint32_t grid_size = grid.GetTotalCellCount() * this->prop_names.size();
/* allocate memory */
this->workerlist = new worker_struct[this->world_size - 1];
std::memset(this->workerlist, '\0', sizeof(worker_struct) * (world_size - 1));
this->send_buffer =
new double[this->wp_size * this->prop_names.size() + BUFFER_OFFSET];
this->mpi_buffer = new double[grid_size];
/* calculate distribution of work packages */
uint32_t mod_pkgs = grid.GetTotalCellCount() % this->wp_size;
uint32_t n_packages = (uint32_t)(grid.GetTotalCellCount() / this->wp_size) +
(mod_pkgs != 0 ? 1 : 0);
Rcpp::Function wp_f("GetWorkPackageSizesVector");
this->wp_sizes_vector = Rcpp::as<std::vector<uint32_t>>(
wp_f(n_packages, this->wp_size, grid.GetTotalCellCount()));
this->state = this->grid.RegisterState(
poet::BaseChemModule::CHEMISTRY_MODULE_NAME, this->prop_names);
std::vector<double> &field = this->state->mem;
field.resize(this->n_cells_per_prop * this->prop_names.size());
for (uint32_t i = 0; i < this->prop_names.size(); i++) {
std::vector<double> prop_vec =
this->grid.GetSpeciesByName(this->prop_names[i]);
std::copy(prop_vec.begin(), prop_vec.end(),
field.begin() + (i * this->n_cells_per_prop));
}
}
ChemMaster::~ChemMaster() {
delete this->workerlist;
delete this->send_buffer;
delete this->mpi_buffer;
}
void ChemMaster::Simulate(double dt) {
/* declare most of the needed variables here */
double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
double sim_e_chemistry, sim_f_chemistry;
int pkg_to_send, pkg_to_recv;
int free_workers;
int i_pkgs;
uint32_t iteration;
/* start time measurement of whole chemistry simulation */
chem_a = MPI_Wtime();
/* start time measurement of sequential part */
seq_a = MPI_Wtime();
std::vector<double> &field = this->state->mem;
// HACK: transfer the field into R data structure serving as input for phreeqc
R["TMP_T"] = field;
R.parseEvalQ("mysetup$state_T <- setNames(data.frame(matrix(TMP_T, "
"ncol=length(mysetup$grid$props), nrow=" +
std::to_string(this->n_cells_per_prop) +
")), mysetup$grid$props)");
/* shuffle grid */
// grid.shuffleAndExport(mpi_buffer);
this->shuffleField(field, this->n_cells_per_prop, this->prop_names.size(),
mpi_buffer);
/* retrieve needed data from R runtime */
iteration = (int)R.parseEval("mysetup$iter");
// dt = (double)R.parseEval("mysetup$requested_dt");
/* setup local variables */
pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size();
double *work_pointer = mpi_buffer;
free_workers = world_size - 1;
i_pkgs = 0;
/* end time measurement of sequential part */
seq_b = MPI_Wtime();
seq_t += seq_b - seq_a;
/* start time measurement of chemistry time needed for send/recv loop */
worker_chemistry_a = MPI_Wtime();
/* start send/recv loop */
// while there are still packages to recv
while (pkg_to_recv > 0) {
// print a progressbar to stdout
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
// while there are still packages to send
if (pkg_to_send > 0) {
// send packages to all free workers ...
sendPkgs(pkg_to_send, i_pkgs, free_workers, work_pointer, dt, iteration);
}
// ... and try to receive them from workers who has finished their work
recvPkgs(pkg_to_recv, pkg_to_send > 0, free_workers);
}
// Just to complete the progressbar
cout << endl;
/* stop time measurement of chemistry time needed for send/recv loop */
worker_chemistry_b = MPI_Wtime();
worker_t = worker_chemistry_b - worker_chemistry_a;
/* start time measurement of sequential part */
seq_c = MPI_Wtime();
/* unshuffle grid */
// grid.importAndUnshuffle(mpi_buffer);
this->unshuffleField(mpi_buffer, this->n_cells_per_prop,
this->prop_names.size(), field);
/* do master stuff */
/* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime();
// HACK: We don't need to call master_chemistry here since our result is
// already written to the memory as a data frame
// R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
R["TMP_T"] = Rcpp::wrap(field);
R.parseEval(std::string(
"mysetup$state_C <- setNames(data.frame(matrix(TMP_T, nrow=" +
to_string(this->n_cells_per_prop) + ")), mysetup$grid$props)"));
/* end time measurement of master chemistry */
sim_f_chemistry = MPI_Wtime();
chem_master += sim_f_chemistry - sim_e_chemistry;
/* end time measurement of sequential part */
seq_d = MPI_Wtime();
seq_t += seq_d - seq_c;
/* end time measurement of whole chemistry simulation */
chem_b = MPI_Wtime();
chem_t += chem_b - chem_a;
/* advise workers to end chemistry iteration */
for (int i = 1; i < world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
}
this->current_sim_time += dt;
}
inline void ChemMaster::sendPkgs(int &pkg_to_send, int &count_pkgs,
int &free_workers, double *&work_pointer,
const double &dt, const uint32_t iteration) {
/* declare variables */
double master_send_a, master_send_b;
int local_work_package_size;
int end_of_wp;
/* start time measurement */
master_send_a = MPI_Wtime();
/* search for free workers and send work */
for (int p = 0; p < world_size - 1; p++) {
if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ {
/* to enable different work_package_size, set local copy of
* work_package_size to pre-calculated work package size vector */
local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++;
/* note current processed work package in workerlist */
workerlist[p].send_addr = work_pointer;
/* push work pointer to next work package */
end_of_wp = local_work_package_size * grid.GetSpeciesCount();
work_pointer = &(work_pointer[end_of_wp]);
// fill send buffer starting with work_package ...
std::memcpy(send_buffer, workerlist[p].send_addr,
(end_of_wp) * sizeof(double));
// followed by: work_package_size
send_buffer[end_of_wp] = (double)local_work_package_size;
// current iteration of simulation
send_buffer[end_of_wp + 1] = (double)iteration;
// size of timestep in seconds
send_buffer[end_of_wp + 2] = dt;
// current time of simulation (age) in seconds
send_buffer[end_of_wp + 3] = current_sim_time;
// placeholder for work_package_count
send_buffer[end_of_wp + 4] = 0.;
/* ATTENTION Worker p has rank p+1 */
MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
TAG_WORK, MPI_COMM_WORLD);
/* Mark that worker has work to do */
workerlist[p].has_work = 1;
free_workers--;
pkg_to_send -= 1;
}
}
master_send_b = MPI_Wtime();
send_t += master_send_b - master_send_a;
}
inline void ChemMaster::recvPkgs(int &pkg_to_recv, bool to_send,
int &free_workers) {
/* declare most of the variables here */
int need_to_receive = 1;
double master_recv_a, master_recv_b;
double idle_a, idle_b;
int p, size;
MPI_Status probe_status;
master_recv_a = MPI_Wtime();
/* start to loop as long there are packages to recv and the need to receive
*/
while (need_to_receive && pkg_to_recv > 0) {
// only of there are still packages to send and free workers are available
if (to_send && free_workers > 0)
// non blocking probing
MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive,
&probe_status);
else {
idle_a = MPI_Wtime();
// blocking probing
MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime();
master_idle += idle_b - idle_a;
}
/* if need_to_receive was set to true above, so there is a message to
* receive */
if (need_to_receive) {
p = probe_status.MPI_SOURCE;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p, TAG_WORK,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
workerlist[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
}
}
master_recv_b = MPI_Wtime();
recv_t += master_recv_b - master_recv_a;
}
void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
/* visual progress */
double progress = (float)(count_pkgs + 1) / n_wp;
cout << "[";
int pos = barWidth * progress;
for (int iprog = 0; iprog < barWidth; ++iprog) {
if (iprog < pos)
cout << "=";
else if (iprog == pos)
cout << ">";
else
cout << " ";
}
std::cout << "] " << int(progress * 100.0) << " %\r";
std::cout.flush();
/* end visual progress */
}
void ChemMaster::End() {
R["simtime_chemistry"] = chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
Rcpp::NumericVector phreeqc_time;
Rcpp::NumericVector dht_get_time;
Rcpp::NumericVector dht_fill_time;
Rcpp::IntegerVector phreeqc_counts;
Rcpp::NumericVector idle_worker;
int phreeqc_tmp;
std::array<double, 3> timings;
std::array<int, 3> dht_perfs;
int dht_hits = 0;
int dht_miss = 0;
int dht_evictions = 0;
double idle_worker_tmp;
/* loop over all workers *
* ATTENTION Worker p has rank p+1 */
for (int p = 0; p < world_size - 1; p++) {
/* Send termination message to worker */
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
/* ... and receive all timings and metrics from each worker */
MPI_Recv(timings.data(), 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
if (dht_enabled) {
dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
MPI_Recv(dht_perfs.data(), 3, MPI_INT, p + 1, TAG_DHT_PERF,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
dht_hits += dht_perfs[0];
dht_miss += dht_perfs[1];
dht_evictions += dht_perfs[2];
}
}
/* distribute all data to the R runtime */
R["simtime_chemistry"] = chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["simtime_workers"] = worker_t;
R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
R["simtime_chemistry_master"] = chem_master;
R.parseEvalQ(
"profiling$simtime_chemistry_master <- simtime_chemistry_master");
R["seq_master"] = seq_t;
R.parseEvalQ("profiling$seq_master <- seq_master");
R["idle_master"] = master_idle;
R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = idle_worker;
R.parseEvalQ("profiling$idle_worker <- idle_worker");
R["phreeqc_time"] = phreeqc_time;
R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
R["phreeqc_count"] = phreeqc_counts;
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
if (dht_enabled) {
R["dht_hits"] = dht_hits;
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = dht_miss;
R.parseEvalQ("profiling$dht_miss <- dht_miss");
R["dht_evictions"] = dht_evictions;
R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
R["dht_get_time"] = dht_get_time;
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = dht_fill_time;
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
}
void ChemMaster::shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop, uint32_t prop_count,
double *out_buffer) {
uint32_t wp_count = this->wp_sizes_vector.size();
uint32_t write_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_buffer[(write_i * prop_count) + k] =
in_field[(k * size_per_prop) + j];
}
write_i++;
}
}
}
void ChemMaster::unshuffleField(const double *in_buffer, uint32_t size_per_prop,
uint32_t prop_count,
std::vector<double> &out_field) {
uint32_t wp_count = this->wp_sizes_vector.size();
uint32_t read_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_field[(k * size_per_prop) + j] =
in_buffer[(read_i * prop_count) + k];
}
read_i++;
}
}
}
double ChemMaster::getSendTime() { return this->send_t; }
double ChemMaster::getRecvTime() { return this->recv_t; }
double ChemMaster::getIdleTime() { return this->master_idle; }
double ChemMaster::getWorkerTime() { return this->worker_t; }
double ChemMaster::getChemMasterTime() { return this->chem_master; }
double ChemMaster::getSeqTime() { return this->seq_t; }

View File

@ -1,112 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/DiffusionModule.hpp"
#include "poet/SimParams.hpp"
#include <poet/ChemSimSeq.hpp>
#include <poet/Grid.hpp>
#include <Rcpp.h>
#include <algorithm>
#include <bits/stdint-uintn.h>
#include <iostream>
#include <string>
#include <vector>
using namespace Rcpp;
using namespace poet;
ChemSeq::ChemSeq(SimParams &params, RInside &R_, Grid &grid_)
: BaseChemModule(params, R_, grid_) {
this->state = this->grid.RegisterState(
poet::BaseChemModule::CHEMISTRY_MODULE_NAME, this->prop_names);
std::vector<double> &field = this->state->mem;
field.resize(this->n_cells_per_prop * this->prop_names.size());
for (uint32_t i = 0; i < this->prop_names.size(); i++) {
std::vector<double> prop_vec =
this->grid.GetSpeciesByName(this->prop_names[i]);
std::copy(prop_vec.begin(), prop_vec.end(),
field.begin() + (i * this->n_cells_per_prop));
}
}
poet::ChemSeq::~ChemSeq() {
if (this->phreeqc_rm) {
delete this->phreeqc_rm;
}
}
void poet::ChemSeq::InitModule(poet::ChemistryParams &chem_params) {
this->phreeqc_rm = new PhreeqcWrapper(this->n_cells_per_prop);
this->phreeqc_rm->SetupAndLoadDB(chem_params);
this->phreeqc_rm->InitFromFile(chem_params.input_script);
}
void ChemSeq::Simulate(double dt) {
double chem_a, chem_b;
/* start time measuring */
chem_a = MPI_Wtime();
std::vector<double> &field = this->state->mem;
// HACK: transfer the field into R data structure serving as input for phreeqc
R["TMP_T"] = field;
R.parseEvalQ("mysetup$state_T <- setNames(data.frame(matrix(TMP_T, "
"ncol=length(mysetup$grid$props), nrow=" +
std::to_string(this->n_cells_per_prop) +
")), mysetup$grid$props)");
this->phreeqc_rm->SetInternalsFromWP(field, this->n_cells_per_prop);
this->phreeqc_rm->SetTime(0);
this->phreeqc_rm->SetTimeStep(dt);
this->phreeqc_rm->RunCells();
// HACK: we will copy resulting field into global grid field. Maybe we will
// find a more performant way here ...
std::vector<double> vecSimResult;
this->phreeqc_rm->GetWPFromInternals(vecSimResult, this->n_cells_per_prop);
std::copy(vecSimResult.begin(), vecSimResult.end(), field.begin());
R["TMP_C"] = field;
R.parseEvalQ("mysetup$state_C <- setNames(data.frame(matrix(TMP_C, "
"ncol=length(mysetup$grid$props), nrow=" +
std::to_string(this->n_cells_per_prop) +
")), mysetup$grid$props)");
/* end time measuring */
chem_b = MPI_Wtime();
chem_t += chem_b - chem_a;
}
void ChemSeq::End() {
R["simtime_chemistry"] = this->chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
}
double ChemSeq::getChemistryTime() { return this->chem_t; }

View File

@ -1,305 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/ChemSimPar.hpp"
#include "poet/DHT_Wrapper.hpp"
#include "poet/SimParams.hpp"
#include <Rcpp.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <mpi.h>
#include <ostream>
#include <string>
#include <vector>
using namespace poet;
using namespace std;
using namespace Rcpp;
ChemWorker::ChemWorker(SimParams &params, RInside &R_, Grid &grid_,
MPI_Comm dht_comm)
: BaseChemModule(params, R_, grid_) {
t_simparams tmp = params.getNumParams();
this->wp_size = tmp.wp_size;
this->out_dir = params.getOutDir();
this->dt_differ = tmp.dt_differ;
this->dht_enabled = tmp.dht_enabled;
this->dht_size_per_process = tmp.dht_size_per_process;
this->dht_snaps = tmp.dht_snaps;
this->dht_file = params.getDHTFile();
this->mpi_buffer =
new double[(this->wp_size * this->prop_names.size()) + BUFFER_OFFSET];
this->mpi_buffer_results = new double[wp_size * this->prop_names.size()];
if (world_rank == 1)
cout << "CPP: Worker: DHT usage is " << (dht_enabled ? "ON" : "OFF")
<< endl;
if (this->dht_enabled) {
uint32_t iKeyCount = this->prop_names.size() + (dt_differ);
uint32_t iDataCount = this->prop_names.size();
if (world_rank == 1)
cout << "CPP: Worker: data count: " << iDataCount << " entries" << endl
<< "CPP: Worker: key count: " << iKeyCount << " entries" << endl
<< "CPP: Worker: memory per process "
<< params.getNumParams().dht_size_per_process / std::pow(10, 6)
<< " MByte" << endl;
dht = new DHT_Wrapper(params, dht_comm,
params.getNumParams().dht_size_per_process, iKeyCount,
iDataCount);
if (world_rank == 1)
cout << "CPP: Worker: DHT created!" << endl;
if (!dht_file.empty())
readFile();
// set size
dht_flags.resize(wp_size, true);
}
this->timing.fill(0.0);
}
ChemWorker::~ChemWorker() {
delete this->mpi_buffer;
delete this->mpi_buffer_results;
if (dht_enabled)
delete this->dht;
if (this->phreeqc_rm) {
delete this->phreeqc_rm;
}
}
void ChemWorker::loop() {
MPI_Status probe_status;
while (1) {
double idle_a = MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
double idle_b = MPI_Wtime();
/* there is a work package to receive */
if (probe_status.MPI_TAG == TAG_WORK) {
idle_t += idle_b - idle_a;
doWork(probe_status);
}
/* end of iteration */
else if (probe_status.MPI_TAG == TAG_DHT_ITER) {
postIter();
}
/* end of simulation */
else if (probe_status.MPI_TAG == TAG_FINISH) {
finishWork();
break;
}
}
}
void poet::ChemWorker::InitModule(poet::ChemistryParams &chem_params) {
this->phreeqc_rm = new PhreeqcWrapper(this->wp_size);
this->phreeqc_rm->SetupAndLoadDB(chem_params);
this->phreeqc_rm->InitFromFile(chem_params.input_script);
}
void ChemWorker::doWork(MPI_Status &probe_status) {
int count;
int local_work_package_size = 0;
static int counter = 1;
double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end;
double dt;
bool bNoPhreeqc = false;
/* get number of doubles to be received */
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
/* receive */
MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
/* decrement count of work_package by BUFFER_OFFSET */
count -= BUFFER_OFFSET;
/* check for changes on all additional variables given by the 'header' of
* mpi_buffer */
// work_package_size
local_work_package_size = mpi_buffer[count];
// current iteration of simulation
this->iteration = mpi_buffer[count + 1];
// current timestep size
dt = mpi_buffer[count + 2];
// current simulation time ('age' of simulation)
current_sim_time = mpi_buffer[count + 3];
/* 4th double value is currently a placeholder */
// placeholder = mpi_buffer[count+4];
std::vector<double> vecCurrWP(
mpi_buffer,
mpi_buffer + (local_work_package_size * this->prop_names.size()));
std::vector<int32_t> vecMappingWP(this->wp_size);
DHT_ResultObject DHT_Results;
for (uint32_t i = 0; i < local_work_package_size; i++) {
vecMappingWP[i] = i;
}
if (local_work_package_size != this->wp_size) {
std::vector<double> vecFiller(
(this->wp_size - local_work_package_size) * this->prop_names.size(), 0);
vecCurrWP.insert(vecCurrWP.end(), vecFiller.begin(), vecFiller.end());
// set all remaining cells to inactive
for (int i = local_work_package_size; i < this->wp_size; i++) {
vecMappingWP[i] = -1;
}
}
if (dht_enabled) {
/* check for values in DHT */
dht_get_start = MPI_Wtime();
DHT_Results = dht->checkDHT(local_work_package_size, dt, vecCurrWP);
dht_get_end = MPI_Wtime();
DHT_Results.ResultsToMapping(vecMappingWP);
}
phreeqc_time_start = MPI_Wtime();
this->phreeqc_rm->RunWorkPackage(vecCurrWP, vecMappingWP, current_sim_time,
dt);
phreeqc_time_end = MPI_Wtime();
if (dht_enabled) {
DHT_Results.ResultsToWP(vecCurrWP);
}
/* send results to master */
MPI_Request send_req;
MPI_Isend(vecCurrWP.data(), count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
&send_req);
if (dht_enabled) {
/* write results to DHT */
dht_fill_start = MPI_Wtime();
dht->fillDHT(local_work_package_size, DHT_Results, vecCurrWP);
dht_fill_end = MPI_Wtime();
timing[1] += dht_get_end - dht_get_start;
timing[2] += dht_fill_end - dht_fill_start;
}
timing[0] += phreeqc_time_end - phreeqc_time_start;
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
}
void ChemWorker::postIter() {
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_ITER, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if (dht_enabled) {
dht->printStatistics();
if (dht_snaps == 2) {
writeFile();
}
}
// synchronize all processes
MPI_Barrier(MPI_COMM_WORLD);
}
void ChemWorker::writeFile() {
cout.flush();
std::stringstream out;
out << out_dir << "/iter_" << setfill('0') << setw(3) << this->iteration
<< ".dht";
int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && world_rank == 2)
cerr << "CPP: Worker: Error in writing current state of DHT to file."
<< endl;
else if (world_rank == 2)
cout << "CPP: Worker: Successfully written DHT to file " << out.str()
<< endl;
}
void ChemWorker::readFile() {
int res = dht->fileToTable((char *)dht_file.c_str());
if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) {
if (world_rank == 1)
cerr << "CPP: Worker: Wrong file layout! Continue with empty DHT ..."
<< endl;
} else {
if (world_rank == 1)
cerr << "CPP: Worker: Error in loading current state of DHT from "
"file. Continue with empty DHT ..."
<< endl;
}
} else {
if (world_rank == 2)
cout << "CPP: Worker: Successfully loaded state of DHT from file "
<< dht_file << endl;
std::cout.flush();
}
}
void ChemWorker::finishWork() {
/* before death, submit profiling/timings to master*/
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
// timings
MPI_Send(timing.data(), timing.size(), MPI_DOUBLE, 0, TAG_TIMING,
MPI_COMM_WORLD);
MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
MPI_Send(&idle_t, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
if (dht_enabled) {
// dht_perf
int dht_perf[3];
dht_perf[0] = dht->getHits();
dht_perf[1] = dht->getMisses();
dht_perf[2] = dht->getEvictions();
MPI_Send(dht_perf, 3, MPI_INT, 0, TAG_DHT_PERF, MPI_COMM_WORLD);
}
if (dht_enabled && dht_snaps > 0)
writeFile();
}

View File

@ -0,0 +1,32 @@
option(POET_USE_PRM_BACKEND "Use PhreeqcRM parallelization instead of poet's." OFF)
option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
list(APPEND CHEM_MODEL_SRC "ChemistryModule.cpp" )
mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)
set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE)
if(POET_USE_PRM_BACKEND)
set(PHREEQCRM_BUILD_MPI ON CACHE BOOL "" FORCE)
target_compile_definitions(poet_lib PRIVATE POET_USE_PRM)
else()
set(PHREEQCRM_BUILD_MPI OFF CACHE BOOL "" FORCE)
list(APPEND CHEM_MODEL_SRC "WorkerFunctions.cpp" "MasterFunctions.cpp" "DHT.c" "DHT_Wrapper.cpp" "HashFunctions.cpp")
endif()
add_library(ChemistryModule ${CHEM_MODEL_SRC})
target_include_directories(ChemistryModule PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(ChemistryModule
PUBLIC MPI::MPI_CXX PhreeqcRM
PRIVATE ${MATH_LIBRARY} ${CRYPTO_LIBRARY}
)
target_compile_definitions(ChemistryModule PUBLIC OMPI_SKIP_MPICXX)
if(POET_DHT_DEBUG)
target_compile_definitions(ChemistryModule PRIVATE DHT_STATISTICS)
endif()
if(POET_USE_PRM_BACKEND)
target_compile_definitions(ChemistryModule PUBLIC POET_USE_PRM)
endif()

View File

@ -0,0 +1,362 @@
#include "poet/ChemistryModule.hpp"
#include "PhreeqcRM.h"
#include "poet/DHT_Wrapper.hpp"
#include <cassert>
#include <cstdint>
#include <mpi.h>
#include <stdexcept>
#include <string>
#include <vector>
#ifndef POET_USE_PRM
poet::ChemistryModule::ChemistryModule(uint32_t nxyz, uint32_t wp_size,
MPI_Comm communicator)
: PhreeqcRM(nxyz, 1), group_comm(communicator), wp_size(wp_size) {
MPI_Comm_size(communicator, &this->comm_size);
MPI_Comm_rank(communicator, &this->comm_rank);
this->is_sequential = (this->comm_size == 1);
this->is_master = (this->comm_rank == 0);
if (!is_sequential && is_master) {
MPI_Bcast(&wp_size, 1, MPI_UINT32_T, 0, this->group_comm);
}
}
poet::ChemistryModule::~ChemistryModule() {
if (dht) {
delete dht;
}
}
poet::ChemistryModule
poet::ChemistryModule::createWorker(MPI_Comm communicator) {
uint32_t wp_size;
MPI_Bcast(&wp_size, 1, MPI_UINT32_T, 0, communicator);
return ChemistryModule(wp_size, wp_size, communicator);
}
#endif
void poet::ChemistryModule::RunInitFile(const std::string &input_script_path) {
#ifndef POET_USE_PRM
if (this->is_master) {
int f_type = CHEM_INIT;
PropagateFunctionType(f_type);
int count = input_script_path.size();
ChemBCast(&count, 1, MPI_INT);
ChemBCast(const_cast<char *>(input_script_path.data()), count, MPI_CHAR);
}
#endif
this->RunFile(true, true, false, input_script_path);
this->RunString(true, false, false, "DELETE; -all; PRINT; -warnings 0;");
this->FindComponents();
std::vector<std::string> props;
this->speciesPerModule.reserve(this->MODULE_COUNT);
std::vector<std::string> curr_prop_names = this->GetComponents();
props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
this->speciesPerModule.push_back(curr_prop_names.size());
curr_prop_names = this->GetEquilibriumPhases();
props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
char equilibrium = (curr_prop_names.empty() ? -1 : 1);
this->speciesPerModule.push_back(curr_prop_names.size());
curr_prop_names = this->GetExchangeNames();
props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
char exchange = (curr_prop_names.empty() ? -1 : 1);
this->speciesPerModule.push_back(curr_prop_names.size());
curr_prop_names = this->GetSurfaceNames();
props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
char surface = (curr_prop_names.empty() ? -1 : 1);
this->speciesPerModule.push_back(curr_prop_names.size());
// curr_prop_names = this->GetGasComponents();
// props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
// char gas = (curr_prop_names.empty() ? -1 : 1);
// this->speciesPerModule.push_back(curr_prop_names.size());
// curr_prop_names = this->GetSolidSolutionNames();
// props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
// char ssolutions = (curr_prop_names.empty() ? -1 : 1);
// this->speciesPerModule.push_back(curr_prop_names.size());
curr_prop_names = this->GetKineticReactions();
props.insert(props.end(), curr_prop_names.begin(), curr_prop_names.end());
char kinetics = (curr_prop_names.empty() ? -1 : 1);
this->speciesPerModule.push_back(curr_prop_names.size());
this->prop_count = props.size();
#ifdef POET_USE_PRM
std::vector<int> ic1;
ic1.resize(this->nxyz * 7, -1);
// TODO: hardcoded reaction modules
for (int i = 0; i < nxyz; i++) {
ic1[i] = 1; // Solution 1
ic1[nxyz + i] = equilibrium; // Equilibrium 1
ic1[2 * nxyz + i] = exchange; // Exchange none
ic1[3 * nxyz + i] = surface; // Surface none
ic1[4 * nxyz + i] = -1; // Gas phase none
ic1[5 * nxyz + i] = -1; // Solid solutions none
ic1[6 * nxyz + i] = kinetics; // Kinetics 1
}
this->InitialPhreeqc2Module(ic1);
#else
if (!this->is_master || this->is_sequential) {
std::vector<int> ic1;
ic1.resize(this->nxyz * 7, -1);
// TODO: hardcoded reaction modules
for (int i = 0; i < nxyz; i++) {
ic1[i] = 1; // Solution 1
ic1[nxyz + i] = equilibrium; // Equilibrium 1
ic1[2 * nxyz + i] = exchange; // Exchange none
ic1[3 * nxyz + i] = surface; // Surface none
ic1[4 * nxyz + i] = -1; // Gas phase none
ic1[5 * nxyz + i] = -1; // Solid solutions none
ic1[6 * nxyz + i] = kinetics; // Kinetics 1
}
this->InitialPhreeqc2Module(ic1);
}
#endif
this->prop_names = props;
}
#ifndef POET_USE_PRM
void poet::ChemistryModule::InitializeField(
uint32_t n_cells, const poet::ChemistryModule::SingleCMap &mapped_values) {
if (this->is_master) {
this->field.reserve(this->prop_count * n_cells);
for (const std::string &prop_name : this->prop_names) {
const auto m_it = mapped_values.find(prop_name);
if (m_it == mapped_values.end()) {
throw std::domain_error(
"Prop names vector does not match any key in given map.");
}
const std::vector<double> field_row(n_cells, m_it->second);
const auto chem_field_end = this->field.end();
this->field.insert(chem_field_end, field_row.begin(), field_row.end());
}
this->n_cells = n_cells;
}
}
void poet::ChemistryModule::InitializeField(
const poet::ChemistryModule::VectorCMap &mapped_values) {
if (this->is_master) {
for (const std::string &prop_name : this->prop_names) {
const auto m_it = mapped_values.find(prop_name);
if (m_it == mapped_values.end()) {
throw std::domain_error(
"Prop names vector does not match any key in given map.");
}
const auto field_row = m_it->second;
const auto chem_field_end = this->field.end();
this->field.insert(chem_field_end, field_row.begin(), field_row.end());
}
this->n_cells = mapped_values.begin()->second.size();
}
}
void poet::ChemistryModule::SetDHTEnabled(bool enable, uint32_t size_mb) {
constexpr uint32_t MB_FACTOR = 1E6;
if (this->is_master) {
int ftype = CHEM_DHT_ENABLE;
PropagateFunctionType(ftype);
ChemBCast(&enable, 1, MPI_CXX_BOOL);
ChemBCast(&size_mb, 1, MPI_UINT32_T);
}
this->dht_enabled = enable;
if (enable) {
MPI_Comm dht_comm;
if (this->is_master) {
MPI_Comm_split(this->group_comm, MPI_UNDEFINED, this->comm_rank,
&dht_comm);
return;
}
MPI_Comm_split(this->group_comm, 1, this->comm_rank, &dht_comm);
if (this->dht) {
delete this->dht;
}
const uint32_t dht_size = size_mb * MB_FACTOR;
this->dht =
new DHT_Wrapper(dht_comm, dht_size, this->prop_count, this->prop_count);
}
}
void poet::ChemistryModule::SetDHTSnaps(int type, const std::string &out_dir) {
if (this->is_master) {
int ftype = CHEM_DHT_SNAPS;
PropagateFunctionType(ftype);
int str_size = out_dir.size();
ChemBCast(&type, 1, MPI_INT);
ChemBCast(&str_size, 1, MPI_INT);
ChemBCast(const_cast<char *>(out_dir.data()), str_size, MPI_CHAR);
}
this->dht_file_out_dir = out_dir;
}
void poet::ChemistryModule::SetDHTSignifVector(
std::vector<uint32_t> signif_vec) {
if (this->is_master) {
if (signif_vec.size() != this->prop_count) {
throw std::runtime_error(
"Significant vector sizes mismatches prop count.");
}
int ftype = CHEM_DHT_SIGNIF_VEC;
PropagateFunctionType(ftype);
ChemBCast(signif_vec.data(), signif_vec.size(), MPI_UINT32_T);
return;
}
this->dht->SetSignifVector(signif_vec);
}
void poet::ChemistryModule::SetDHTPropTypeVector(
std::vector<uint32_t> proptype_vec) {
if (this->is_master) {
if (proptype_vec.size() != this->prop_count) {
throw std::runtime_error("Prop type vector sizes mismatches prop count.");
}
int ftype = CHEM_DHT_PROP_TYPE_VEC;
PropagateFunctionType(ftype);
ChemBCast(proptype_vec.data(), proptype_vec.size(), MPI_UINT32_T);
return;
}
this->dht->SetPropTypeVector(proptype_vec);
}
void poet::ChemistryModule::ReadDHTFile(const std::string &input_file) {
if (this->is_master) {
int ftype = CHEM_DHT_READ_FILE;
PropagateFunctionType(ftype);
int str_size = input_file.size();
ChemBCast(&str_size, 1, MPI_INT);
ChemBCast(const_cast<char *>(input_file.data()), str_size, MPI_CHAR);
}
if (!this->dht_enabled) {
throw std::runtime_error("DHT file cannot be read. DHT is not enabled.");
}
if (!this->is_master) {
WorkerReadDHTDump(input_file);
}
}
void poet::ChemistryModule::GetWPFromInternals(std::vector<double> &vecWP,
uint32_t wp_size) {
std::vector<double> vecCurrOutput;
vecWP.clear();
vecWP.reserve(this->prop_count * wp_size);
this->GetConcentrations(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
if (this->speciesPerModule[1] != 0) {
this->GetPPhaseMoles(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
}
// NOTE: Block for 'Surface' and 'Exchange' is missing because of missing
// Getters @ PhreeqcRM
// ...
// BLOCK_END
if (this->speciesPerModule[4] != 0) {
this->GetKineticsMoles(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
}
}
void poet::ChemistryModule::SetInternalsFromWP(const std::vector<double> &vecWP,
uint32_t wp_size) {
uint32_t iCurrElements;
auto itStart = vecWP.begin();
auto itEnd = itStart;
// this->SetMappingForWP(iCurrWPSize);
int nchem = this->GetChemistryCellCount();
iCurrElements = this->speciesPerModule[0];
itEnd += iCurrElements * wp_size;
this->SetConcentrations(std::vector<double>(itStart, itEnd));
itStart = itEnd;
// Equlibirum Phases
if ((iCurrElements = this->speciesPerModule[1]) != 0) {
itEnd += iCurrElements * wp_size;
this->SetPPhaseMoles(std::vector<double>(itStart, itEnd));
itStart = itEnd;
}
// // NOTE: Block for 'Surface' and 'Exchange' is missing because of missing
// // setters @ PhreeqcRM
// // ...
// // BLOCK_END
if ((iCurrElements = this->speciesPerModule[4]) != 0) {
itEnd += iCurrElements * wp_size;
this->SetKineticsMoles(std::vector<double>(itStart, itEnd));
itStart = itEnd;
}
}
#else //POET_USE_PRM
inline void poet::ChemistryModule::PrmToPoetField(std::vector<double> &field) {
GetConcentrations(field);
int col = GetSelectedOutputColumnCount();
int rows = GetSelectedOutputRowCount();
field.reserve(field.size() + 3 * rows);
std::vector<double> so;
GetSelectedOutput(so);
for (int j = 0; j < col; j += 2) {
const auto start = so.begin() + (j * rows);
const auto end = start + rows;
field.insert(field.end(), start, end);
}
}
void poet::ChemistryModule::RunCells() {
PhreeqcRM::RunCells();
PrmToPoetField(this->field);
}
#endif

View File

@ -0,0 +1,360 @@
#include "PhreeqcRM.h"
#include "poet/ChemistryModule.hpp"
#include <algorithm>
#include <cstdint>
#include <mpi.h>
#include <stdexcept>
#include <vector>
std::vector<uint32_t>
poet::ChemistryModule::MasterGatherWorkerMetrics(int type) const {
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
uint32_t dummy;
std::vector<uint32_t> metrics(this->comm_size);
MPI_Gather(&dummy, 1, MPI_UINT32_T, metrics.data(), 1, MPI_UINT32_T, 0,
this->group_comm);
metrics.erase(metrics.begin());
return metrics;
}
std::vector<double>
poet::ChemistryModule::MasterGatherWorkerTimings(int type) const {
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
double dummy;
std::vector<double> timings(this->comm_size);
MPI_Gather(&dummy, 1, MPI_DOUBLE, timings.data(), 1, MPI_DOUBLE, 0,
this->group_comm);
timings.erase(timings.begin());
return timings;
}
std::vector<double> poet::ChemistryModule::GetWorkerPhreeqcTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerTimings(WORKER_PHREEQC);
}
std::vector<double> poet::ChemistryModule::GetWorkerDHTGetTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerTimings(WORKER_DHT_GET);
}
std::vector<double> poet::ChemistryModule::GetWorkerDHTFillTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerTimings(WORKER_DHT_FILL);
}
std::vector<double> poet::ChemistryModule::GetWorkerIdleTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerTimings(WORKER_IDLE);
}
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTHits() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_HITS);
}
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTMiss() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_MISS);
}
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_EVICTIONS);
}
inline std::vector<double> shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop,
uint32_t prop_count,
uint32_t wp_count) {
std::vector<double> out_buffer(in_field.size());
uint32_t write_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_buffer[(write_i * prop_count) + k] =
in_field[(k * size_per_prop) + j];
}
write_i++;
}
}
return out_buffer;
}
inline void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count, std::vector<double> &out_field) {
uint32_t read_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_field[(k * size_per_prop) + j] =
in_buffer[(read_i * prop_count) + k];
}
read_i++;
}
}
}
inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) {
/* visual progress */
double progress = (float)(count_pkgs + 1) / n_wp;
std::cout << "[";
int pos = barWidth * progress;
for (int iprog = 0; iprog < barWidth; ++iprog) {
if (iprog < pos)
std::cout << "=";
else if (iprog == pos)
std::cout << ">";
else
std::cout << " ";
}
std::cout << "] " << int(progress * 100.0) << " %\r";
std::cout.flush();
/* end visual progress */
}
inline void poet::ChemistryModule::MasterSendPkgs(
worker_list_t &w_list, workpointer_t &work_pointer, int &pkg_to_send,
int &count_pkgs, int &free_workers, double dt, uint32_t iteration,
const std::vector<uint32_t> &wp_sizes_vector) {
/* declare variables */
int local_work_package_size;
/* search for free workers and send work */
for (int p = 0; p < this->comm_size - 1; p++) {
if (w_list[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ {
/* to enable different work_package_size, set local copy of
* work_package_size to pre-calculated work package size vector */
local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++;
/* note current processed work package in workerlist */
w_list[p].send_addr = work_pointer.base();
/* push work pointer to next work package */
const uint32_t end_of_wp = local_work_package_size * this->prop_count;
std::vector<double> send_buffer(end_of_wp + this->BUFFER_OFFSET);
std::copy(work_pointer, work_pointer + end_of_wp, send_buffer.begin());
work_pointer += end_of_wp;
// fill send buffer starting with work_package ...
// followed by: work_package_size
send_buffer[end_of_wp] = (double)local_work_package_size;
// current iteration of simulation
send_buffer[end_of_wp + 1] = (double)iteration;
// size of timestep in seconds
send_buffer[end_of_wp + 2] = dt;
// current time of simulation (age) in seconds
send_buffer[end_of_wp + 3] = this->simtime;
// placeholder for work_package_count
send_buffer[end_of_wp + 4] = 0.;
/* ATTENTION Worker p has rank p+1 */
// MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
// LOOP_WORK, this->group_comm);
MPI_Send(send_buffer.data(), send_buffer.size(), MPI_DOUBLE, p + 1,
LOOP_WORK, this->group_comm);
/* Mark that worker has work to do */
w_list[p].has_work = 1;
free_workers--;
pkg_to_send -= 1;
}
}
}
inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
int &pkg_to_recv,
bool to_send,
int &free_workers) {
/* declare most of the variables here */
int need_to_receive = 1;
double idle_a, idle_b;
int p, size;
MPI_Status probe_status;
// master_recv_a = MPI_Wtime();
/* start to loop as long there are packages to recv and the need to receive
*/
while (need_to_receive && pkg_to_recv > 0) {
// only of there are still packages to send and free workers are available
if (to_send && free_workers > 0)
// non blocking probing
MPI_Iprobe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &need_to_receive,
&probe_status);
else {
idle_a = MPI_Wtime();
// blocking probing
MPI_Probe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime();
this->idle_t += idle_b - idle_a;
}
/* if need_to_receive was set to true above, so there is a message to
* receive */
if (need_to_receive) {
p = probe_status.MPI_SOURCE;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK,
this->group_comm, MPI_STATUS_IGNORE);
w_list[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
}
}
}
void poet::ChemistryModule::RunCells() {
if (this->is_sequential) {
MasterRunSequential();
return;
}
MasterRunParallel();
}
void poet::ChemistryModule::MasterRunSequential() {
SetInternalsFromWP(this->field, this->nxyz);
PhreeqcRM::RunCells();
GetWPFromInternals(this->field, this->nxyz);
}
void poet::ChemistryModule::MasterRunParallel() {
/* declare most of the needed variables here */
double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
double sim_e_chemistry, sim_f_chemistry;
int pkg_to_send, pkg_to_recv;
int free_workers;
int i_pkgs;
int ftype = CHEM_WORK_LOOP;
PropagateFunctionType(ftype);
MPI_Barrier(this->group_comm);
double dt = this->PhreeqcRM::GetTimeStep();
static uint32_t iteration = 0;
/* start time measurement of whole chemistry simulation */
chem_a = MPI_Wtime();
/* start time measurement of sequential part */
seq_a = MPI_Wtime();
const std::vector<uint32_t> wp_sizes_vector =
CalculateWPSizesVector(this->n_cells, this->wp_size);
/* shuffle grid */
// grid.shuffleAndExport(mpi_buffer);
std::vector<double> mpi_buffer = shuffleField(
this->field, this->n_cells, this->prop_count, wp_sizes_vector.size());
/* setup local variables */
pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size();
workpointer_t work_pointer = mpi_buffer.begin();
worker_list_t worker_list(this->comm_size - 1);
free_workers = this->comm_size - 1;
i_pkgs = 0;
/* end time measurement of sequential part */
seq_b = MPI_Wtime();
seq_t += seq_b - seq_a;
/* start time measurement of chemistry time needed for send/recv loop */
worker_chemistry_a = MPI_Wtime();
/* start send/recv loop */
// while there are still packages to recv
while (pkg_to_recv > 0) {
// print a progressbar to stdout
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
// while there are still packages to send
if (pkg_to_send > 0) {
// send packages to all free workers ...
MasterSendPkgs(worker_list, work_pointer, pkg_to_send, i_pkgs,
free_workers, dt, iteration, wp_sizes_vector);
}
// ... and try to receive them from workers who has finished their work
MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers);
}
// Just to complete the progressbar
std::cout << std::endl;
/* stop time measurement of chemistry time needed for send/recv loop */
worker_chemistry_b = MPI_Wtime();
this->send_recv_t += worker_chemistry_b - worker_chemistry_a;
/* start time measurement of sequential part */
seq_c = MPI_Wtime();
/* unshuffle grid */
// grid.importAndUnshuffle(mpi_buffer);
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
wp_sizes_vector.size(), this->field);
/* do master stuff */
/* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime();
/* end time measurement of sequential part */
seq_d = MPI_Wtime();
this->seq_t += seq_d - seq_c;
/* end time measurement of whole chemistry simulation */
/* advise workers to end chemistry iteration */
for (int i = 1; i < this->comm_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, LOOP_END, this->group_comm);
}
chem_b = MPI_Wtime();
this->chem_t += chem_b - chem_a;
this->simtime += dt;
iteration++;
}
void poet::ChemistryModule::MasterLoopBreak() {
int type = CHEM_BREAK_MAIN_LOOP;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
}
std::vector<uint32_t>
poet::ChemistryModule::CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const {
bool mod_pkgs = (n_cells % wp_size) != 0;
uint32_t n_packages = (uint32_t)(n_cells / wp_size) + mod_pkgs;
std::vector<uint32_t> wp_sizes_vector(n_packages, 0);
for (int i = 0; i < n_cells; i++) {
wp_sizes_vector[i % n_packages] += 1;
}
return wp_sizes_vector;
}

View File

@ -0,0 +1,422 @@
#include "IrmResult.h"
#include "poet/ChemistryModule.hpp"
#include <cstddef>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <mpi.h>
#include <stdexcept>
#include <string>
#include <vector>
inline std::string get_string(int root, MPI_Comm communicator) {
int count;
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
char *buffer = new char[count + 1];
MPI_Bcast(buffer, count, MPI_CHAR, root, communicator);
buffer[count] = '\0';
std::string ret_str(buffer);
delete[] buffer;
return ret_str;
}
void poet::ChemistryModule::WorkerLoop() {
struct worker_s timings;
// HACK: defining the worker iteration count here, which will increment after
// each CHEM_ITER_END message
uint32_t iteration = 1;
bool loop = true;
while (loop) {
int func_type;
PropagateFunctionType(func_type);
switch (func_type) {
case CHEM_INIT: {
RunInitFile(get_string(0, this->group_comm));
break;
}
case CHEM_DHT_ENABLE: {
bool enable;
ChemBCast(&enable, 1, MPI_CXX_BOOL);
uint32_t size_mb;
ChemBCast(&size_mb, 1, MPI_UINT32_T);
SetDHTEnabled(enable, size_mb);
break;
}
case CHEM_DHT_SIGNIF_VEC: {
std::vector<uint32_t> input_vec(this->prop_count);
ChemBCast(input_vec.data(), this->prop_count, MPI_UINT32_T);
SetDHTSignifVector(input_vec);
break;
}
case CHEM_DHT_PROP_TYPE_VEC: {
std::vector<uint32_t> input_vec(this->prop_count);
ChemBCast(input_vec.data(), this->prop_count, MPI_UINT32_T);
SetDHTPropTypeVector(input_vec);
break;
}
case CHEM_DHT_SNAPS: {
int type;
ChemBCast(&type, 1, MPI_INT);
SetDHTSnaps(type, get_string(0, this->group_comm));
break;
}
case CHEM_DHT_READ_FILE: {
ReadDHTFile(get_string(0, this->group_comm));
break;
}
case CHEM_WORK_LOOP: {
WorkerProcessPkgs(timings, iteration);
break;
}
case CHEM_PERF: {
int type;
ChemBCast(&type, 1, MPI_INT);
if (type < WORKER_DHT_HITS) {
WorkerPerfToMaster(type, timings);
break;
}
WorkerMetricsToMaster(type);
break;
}
case CHEM_BREAK_MAIN_LOOP: {
WorkerPostSim(iteration);
loop = false;
break;
}
default: {
throw std::runtime_error("Worker received unknown tag from master.");
}
}
}
}
void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
uint32_t &iteration) {
MPI_Status probe_status;
bool loop = true;
MPI_Barrier(this->group_comm);
while (loop) {
double idle_a = MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status);
double idle_b = MPI_Wtime();
switch (probe_status.MPI_TAG) {
case LOOP_WORK: {
timings.idle_t += idle_b - idle_a;
int count;
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
WorkerDoWork(probe_status, count, timings);
break;
}
case LOOP_END: {
WorkerPostIter(probe_status, iteration);
iteration++;
loop = false;
break;
}
}
}
}
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
int double_count,
struct worker_s &timings) {
int local_work_package_size = 0;
static int counter = 1;
double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end;
uint32_t iteration;
double dt;
double current_sim_time;
const uint32_t n_cells_times_props = this->prop_count * this->wp_size;
std::vector<double> vecCurrWP(n_cells_times_props + BUFFER_OFFSET);
int count = double_count;
/* receive */
MPI_Recv(vecCurrWP.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
MPI_STATUS_IGNORE);
/* decrement count of work_package by BUFFER_OFFSET */
count -= BUFFER_OFFSET;
/* check for changes on all additional variables given by the 'header' of
* mpi_buffer */
// work_package_size
local_work_package_size = vecCurrWP[count];
// current iteration of simulation
iteration = vecCurrWP[count + 1];
// current timestep size
dt = vecCurrWP[count + 2];
// current simulation time ('age' of simulation)
current_sim_time = vecCurrWP[count + 3];
/* 4th double value is currently a placeholder */
// placeholder = mpi_buffer[count+4];
// std::vector<double> vecCurrWP(
// mpi_buffer,
// mpi_buffer + (local_work_package_size * this->prop_names.size()));
vecCurrWP.resize(n_cells_times_props);
std::vector<int32_t> vecMappingWP(this->wp_size);
DHT_ResultObject DHT_Results;
for (uint32_t i = 0; i < local_work_package_size; i++) {
vecMappingWP[i] = i;
}
if (local_work_package_size != this->wp_size) {
// std::vector<double> vecFiller(
// (this->wp_size - local_work_package_size) * prop_count, 0);
// vecCurrWP.insert(vecCurrWP.end(), vecFiller.begin(), vecFiller.end());
// set all remaining cells to inactive
for (int i = local_work_package_size; i < this->wp_size; i++) {
vecMappingWP[i] = -1;
}
}
if (dht_enabled) {
/* check for values in DHT */
dht_get_start = MPI_Wtime();
DHT_Results = dht->checkDHT(local_work_package_size, dt, vecCurrWP);
dht_get_end = MPI_Wtime();
DHT_Results.ResultsToMapping(vecMappingWP);
}
phreeqc_time_start = MPI_Wtime();
WorkerRunWorkPackage(vecCurrWP, vecMappingWP, current_sim_time, dt);
phreeqc_time_end = MPI_Wtime();
if (dht_enabled) {
DHT_Results.ResultsToWP(vecCurrWP);
}
/* send results to master */
MPI_Request send_req;
MPI_Isend(vecCurrWP.data(), count, MPI_DOUBLE, 0, LOOP_WORK, MPI_COMM_WORLD,
&send_req);
if (dht_enabled) {
/* write results to DHT */
dht_fill_start = MPI_Wtime();
dht->fillDHT(local_work_package_size, DHT_Results, vecCurrWP);
dht_fill_end = MPI_Wtime();
timings.dht_get += dht_get_end - dht_get_start;
timings.dht_fill += dht_fill_end - dht_fill_start;
}
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
}
void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
uint32_t iteration) {
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm,
MPI_STATUS_IGNORE);
if (this->dht_enabled) {
this->dht->printStatistics();
if (this->dht_snaps_type == DHT_FILES_ITEREND) {
WorkerWriteDHTDump(iteration);
}
}
}
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
/* before death, submit profiling/timings to master*/
// double timings_serialized[4];
// timings_serialized[0] = timings.phreeqc_t;
// timings_serialized[1] = timings.dht_get;
// timings_serialized[2] = timings.dht_fill;
// timings_serialized[3] = timings.idle_t;
// // timings
// MPI_Send(timings_serialized, 4, MPI_DOUBLE, 0, 0, this->group_comm);
// // MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
// // MPI_Send(&idle_t, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
// if (this->dht_enabled) {
// // dht_perf
// int dht_perf[3];
// dht_perf[0] = dht->getHits();
// dht_perf[1] = dht->getMisses();
// dht_perf[2] = dht->getEvictions();
// MPI_Send(dht_perf, 3, MPI_INT, 0, 0, this->group_comm);
// }
if (this->dht_enabled && this->dht_snaps_type > DHT_FILES_SIMEND) {
WorkerWriteDHTDump(iteration);
}
}
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
std::stringstream out;
out << this->dht_file_out_dir << "/iter_" << std::setfill('0') << std::setw(4)
<< iteration << ".dht";
int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && this->comm_rank == 2)
std::cerr
<< "CPP: Worker: Error in writing current state of DHT to file.\n";
else if (this->comm_rank == 2)
std::cout << "CPP: Worker: Successfully written DHT to file " << out.str()
<< "\n";
}
void poet::ChemistryModule::WorkerReadDHTDump(
const std::string &dht_input_file) {
int res = dht->fileToTable((char *)dht_input_file.c_str());
if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) {
if (this->comm_rank == 1)
std::cerr
<< "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
} else {
if (this->comm_rank == 1)
std::cerr << "CPP: Worker: Error in loading current state of DHT from "
"file. Continue with empty DHT ...\n";
}
} else {
if (this->comm_rank == 2)
std::cout << "CPP: Worker: Successfully loaded state of DHT from file "
<< dht_input_file << "\n";
}
}
IRM_RESULT
poet::ChemistryModule::WorkerRunWorkPackage(std::vector<double> &vecWP,
std::vector<int32_t> &vecMapping,
double dSimTime, double dTimestep) {
if (this->wp_size != vecMapping.size()) {
return IRM_INVALIDARG;
}
if ((this->wp_size * this->prop_count) != vecWP.size()) {
return IRM_INVALIDARG;
}
// check if we actually need to start phreeqc
bool bRunPhreeqc = false;
for (const auto &aMappingNum : vecMapping) {
if (aMappingNum != -1) {
bRunPhreeqc = true;
break;
}
}
if (!bRunPhreeqc) {
return IRM_OK;
}
std::vector<double> vecCopy;
vecCopy = vecWP;
for (uint32_t i = 0; i < this->prop_count; i++) {
for (uint32_t j = 0; j < this->wp_size; j++) {
vecWP[(i * this->wp_size) + j] = vecCopy[(j * this->prop_count) + i];
}
}
IRM_RESULT result;
this->PhreeqcRM::CreateMapping(vecMapping);
this->SetInternalsFromWP(vecWP, this->wp_size);
this->PhreeqcRM::SetTime(dSimTime);
this->PhreeqcRM::SetTimeStep(dTimestep);
result = this->PhreeqcRM::RunCells();
this->GetWPFromInternals(vecWP, this->wp_size);
vecCopy = vecWP;
for (uint32_t i = 0; i < this->prop_count; i++) {
for (uint32_t j = 0; j < this->wp_size; j++) {
vecWP[(j * this->prop_count) + i] = vecCopy[(i * this->wp_size) + j];
}
}
return result;
}
void poet::ChemistryModule::WorkerPerfToMaster(int type,
const struct worker_s &timings) {
switch (type) {
case WORKER_PHREEQC: {
MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
case WORKER_DHT_GET: {
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
case WORKER_DHT_FILL: {
MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
case WORKER_IDLE: {
MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
default: {
throw std::runtime_error("Unknown perf type in master's message.");
}
}
}
void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
uint32_t value;
switch (type) {
case WORKER_DHT_HITS: {
value = dht->getHits();
break;
}
case WORKER_DHT_MISS: {
value = dht->getMisses();
break;
}
case WORKER_DHT_EVICTIONS: {
value = dht->getEvictions();
break;
}
default: {
throw std::runtime_error("Unknown perf type in master's message.");
}
}
MPI_Gather(&value, 1, MPI_UINT32_T, NULL, 1, MPI_UINT32_T, 0,
this->group_comm);
}

View File

@ -25,7 +25,6 @@
#include <algorithm>
#include <bits/stdint-intn.h>
#include <cstdint>
#include <poet/ChemSimSeq.hpp>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp>

View File

@ -1,336 +0,0 @@
#include "poet/PhreeqcWrapper.hpp"
#include "IPhreeqc.hpp"
#include "IrmResult.h"
#include "PhreeqcRM.h"
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <stdexcept>
#include <string>
#include <vector>
poet::PhreeqcWrapper::PhreeqcWrapper(uint32_t inxyz)
: PhreeqcRM(inxyz, 1), iWPSize(inxyz) {
this->vecDefMapping.resize(inxyz);
for (uint32_t i = 0; i < inxyz; i++) {
this->vecDefMapping[i] = i;
}
}
void poet::PhreeqcWrapper::SetupAndLoadDB(
const poet::ChemistryParams &chemPar) {
// TODO: hardcoded options ...
this->SetErrorHandlerMode(1);
this->SetComponentH2O(false);
this->SetRebalanceFraction(0.5);
this->SetRebalanceByCell(true);
this->UseSolutionDensityVolume(false);
this->SetPartitionUZSolids(false);
// Set concentration units
// 1, mg/L; 2, mol/L; 3, kg/kgs
this->SetUnitsSolution(2);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsPPassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsExchange(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsSurface(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsGasPhase(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsSSassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
this->SetUnitsKinetics(1);
// Set representative volume
std::vector<double> rv;
rv.resize(this->iWPSize, 1.0);
this->SetRepresentativeVolume(rv);
// Set initial porosity
std::vector<double> por;
por.resize(this->iWPSize, 1);
this->SetPorosity(por);
// Set initial saturation
std::vector<double> sat;
sat.resize(this->iWPSize, 1.0);
this->SetSaturation(sat);
// Load database
this->LoadDatabase(chemPar.database_path);
}
void poet::PhreeqcWrapper::InitFromFile(const std::string &strInputFile) {
this->RunFile(true, true, false, strInputFile);
// MDL: this is run only by the workers
this->RunString(true, false, true, "DELETE; -all; PRINT; -warnings 0;");
this->FindComponents();
std::vector<int> ic1;
ic1.resize(this->iWPSize * 7, -1);
// TODO: hardcoded reaction modules
for (int i = 0; i < nxyz; i++) {
ic1[i] = 1; // Solution 1
ic1[nxyz + i] = 1; // Equilibrium 1
ic1[2 * nxyz + i] = -1; // Exchange none
ic1[3 * nxyz + i] = -1; // Surface none
ic1[4 * nxyz + i] = -1; // Gas phase none
ic1[5 * nxyz + i] = -1; // Solid solutions none
ic1[6 * nxyz + i] = 1; // Kinetics 1
}
this->InitialPhreeqc2Module(ic1);
// Initial equilibration of cells
// double time = 0.0;
// double time_step = 0.0;
// this->SetTime(time);
// this->SetTimeStep(time_step);
// this->RunCells();
this->InitInternals();
}
void poet::PhreeqcWrapper::InitInternals() {
uint32_t sum = 0, curr_count;
std::vector<std::string> vecCurrSpecies;
this->vecSpeciesPerModule.reserve(this->MODULE_COUNT);
vecCurrSpecies = this->GetComponents();
this->vecSpeciesNames.insert(this->vecSpeciesNames.end(),
vecCurrSpecies.begin(), vecCurrSpecies.end());
this->vecSpeciesPerModule.push_back(vecCurrSpecies.size());
vecCurrSpecies = this->GetEquilibriumPhases();
this->vecSpeciesNames.insert(this->vecSpeciesNames.end(),
vecCurrSpecies.begin(), vecCurrSpecies.end());
this->vecSpeciesPerModule.push_back(vecCurrSpecies.size());
vecCurrSpecies = this->GetExchangeSpecies();
this->vecSpeciesNames.insert(this->vecSpeciesNames.end(),
vecCurrSpecies.begin(), vecCurrSpecies.end());
this->vecSpeciesPerModule.push_back(vecCurrSpecies.size());
vecCurrSpecies = this->GetSurfaceSpecies();
this->vecSpeciesNames.insert(this->vecSpeciesNames.end(),
vecCurrSpecies.begin(), vecCurrSpecies.end());
this->vecSpeciesPerModule.push_back(vecCurrSpecies.size());
vecCurrSpecies = this->GetKineticReactions();
this->vecSpeciesNames.insert(this->vecSpeciesNames.end(),
vecCurrSpecies.begin(), vecCurrSpecies.end());
this->vecSpeciesPerModule.push_back(vecCurrSpecies.size());
this->iSpeciesCount = this->vecSpeciesNames.size();
}
auto poet::PhreeqcWrapper::GetSpeciesCount() -> uint32_t {
return this->iSpeciesCount;
}
auto poet::PhreeqcWrapper::GetSpeciesCountByModule()
-> const std::vector<uint32_t> & {
return this->vecSpeciesPerModule;
}
auto poet::PhreeqcWrapper::GetSpeciesNamesFull()
-> const std::vector<std::string> & {
return this->vecSpeciesNames;
}
void poet::PhreeqcWrapper::SetInternalsFromWP(const std::vector<double> &vecWP,
uint32_t iCurrWPSize) {
uint32_t iCurrElements;
auto itStart = vecWP.begin();
auto itEnd = itStart;
// this->SetMappingForWP(iCurrWPSize);
int nchem = this->GetChemistryCellCount();
iCurrElements = this->vecSpeciesPerModule[0];
itEnd += iCurrElements * this->iWPSize;
std::vector<double> out;
this->GetConcentrations(out);
this->SetConcentrations(std::vector<double>(itStart, itEnd));
itStart = itEnd;
// Equlibirum Phases
if ((iCurrElements = this->vecSpeciesPerModule[1]) != 0) {
itEnd += iCurrElements * this->iWPSize;
this->GetPPhaseMoles(out);
this->SetPPhaseMoles(std::vector<double>(itStart, itEnd));
itStart = itEnd;
}
// // NOTE: Block for 'Surface' and 'Exchange' is missing because of missing
// // setters @ PhreeqcRM
// // ...
// // BLOCK_END
if ((iCurrElements = this->vecSpeciesPerModule[4]) != 0) {
itEnd += iCurrElements * this->iWPSize;
this->GetKineticsMoles(out);
this->SetKineticsMoles(std::vector<double>(itStart, itEnd));
itStart = itEnd;
}
}
void poet::PhreeqcWrapper::GetWPFromInternals(std::vector<double> &vecWP,
uint32_t iCurrWPSize) {
std::vector<double> vecCurrOutput;
vecWP.clear();
vecWP.reserve(this->iSpeciesCount * this->iWPSize);
this->GetConcentrations(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
if (this->vecSpeciesPerModule[1] != 0) {
this->GetPPhaseMoles(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
}
// NOTE: Block for 'Surface' and 'Exchange' is missing because of missing
// Getters @ PhreeqcRM
// ...
// BLOCK_END
if (this->vecSpeciesPerModule[4] != 0) {
this->GetKineticsMoles(vecCurrOutput);
vecWP.insert(vecWP.end(), vecCurrOutput.begin(), vecCurrOutput.end());
}
}
auto poet::PhreeqcWrapper::ReplaceTotalsByPotentials(
const std::vector<double> &vecWP, uint32_t iCurrWPSize)
-> std::vector<double> {
uint32_t iPropsPerCell = this->vecSpeciesNames.size();
int iphreeqcResult;
std::vector<double> vecOutput;
vecOutput.reserve((iPropsPerCell - 1) * iCurrWPSize);
auto itCStart = vecWP.begin();
auto itCEnd = itCStart + (this->vecSpeciesPerModule[0]);
uint32_t iDiff = iPropsPerCell - this->vecSpeciesPerModule[0];
for (uint32_t i = 0; i < iCurrWPSize; i++) {
std::vector<double> vecCIn(itCStart, itCEnd);
double pH, pe;
// FIXME: Hardcoded temperatures and pressures here!
IPhreeqc *util_ptr = this->Concentrations2Utility(
vecCIn, std::vector<double>(1, 25.0), std::vector<double>(1, 1.0));
std::string sInput = "SELECTED_OUTPUT " + std::to_string(this->DHT_SELOUT) +
"; -pH; -pe;RUN_CELLS; -cells 1; -time_step 0";
iphreeqcResult = util_ptr->RunString(sInput.c_str());
if (iphreeqcResult != 0) {
throw std::runtime_error("IPhreeqc Utility returned non-zero value: " +
std::to_string(iphreeqcResult));
}
util_ptr->SetCurrentSelectedOutputUserNumber(this->DHT_SELOUT);
int vtype;
static std::string svalue(100, '\0');
iphreeqcResult = util_ptr->GetSelectedOutputValue2(
1, 0, &vtype, &pH, svalue.data(), svalue.capacity());
if (iphreeqcResult != 0) {
throw std::runtime_error("IPhreeqc Utility returned non-zero value: " +
std::to_string(iphreeqcResult));
}
iphreeqcResult = util_ptr->GetSelectedOutputValue2(
1, 1, &vtype, &pe, svalue.data(), svalue.capacity());
if (iphreeqcResult != 0) {
throw std::runtime_error("IPhreeqc Utility returned non-zero value: " +
std::to_string(iphreeqcResult));
}
vecOutput.insert(vecOutput.end(), vecCIn.begin() + 3, vecCIn.end());
vecOutput.push_back(pH);
vecOutput.push_back(pe);
vecOutput.insert(vecOutput.end(), itCEnd, itCEnd + iDiff);
itCStart = itCEnd + iDiff;
itCEnd = itCStart + (this->vecSpeciesPerModule[0]);
}
return vecOutput;
}
IRM_RESULT
poet::PhreeqcWrapper::RunWorkPackage(std::vector<double> &vecWP,
std::vector<int32_t> &vecMapping,
double dSimTime, double dTimestep) {
if (this->iWPSize != vecMapping.size()) {
return IRM_INVALIDARG;
}
if ((this->iWPSize * this->iSpeciesCount) != vecWP.size()) {
return IRM_INVALIDARG;
}
// check if we actually need to start phreeqc
bool bRunPhreeqc = false;
for (const auto &aMappingNum : vecMapping) {
if (aMappingNum != -1) {
bRunPhreeqc = true;
break;
}
}
if (!bRunPhreeqc) {
return IRM_OK;
}
std::vector<double> vecCopy;
vecCopy = vecWP;
for (uint32_t i = 0; i < this->iSpeciesCount; i++) {
for (uint32_t j = 0; j < this->iWPSize; j++) {
vecWP[(i * this->iWPSize) + j] = vecCopy[(j * this->iSpeciesCount) + i];
}
}
IRM_RESULT result;
this->CreateMapping(vecMapping);
this->SetInternalsFromWP(vecWP, this->iWPSize);
this->SetTime(dSimTime);
this->SetTimeStep(dTimestep);
result = this->PhreeqcRM::RunCells();
this->GetWPFromInternals(vecWP, this->iWPSize);
vecCopy = vecWP;
for (uint32_t i = 0; i < this->iSpeciesCount; i++) {
for (uint32_t j = 0; j < this->iWPSize; j++) {
vecWP[(j * this->iSpeciesCount) + i] = vecCopy[(i * this->iWPSize) + j];
}
}
return result;
}
inline void poet::PhreeqcWrapper::SetMappingForWP(uint32_t iCurrWPSize) {
if (iCurrWPSize == this->iWPSize) {
this->CreateMapping(this->vecDefMapping);
return;
}
std::vector<int> vecCurrMapping(this->vecDefMapping);
for (uint32_t i = iCurrWPSize; i < vecCurrMapping.size(); i++) {
vecCurrMapping[i] = -1;
}
this->CreateMapping(vecCurrMapping);
}