Merge branch 'merge-ma-changes' into nico-daos

This commit is contained in:
Max Luebke 2023-07-27 15:22:22 +02:00
commit 6084c8ac80
39 changed files with 1640 additions and 1339 deletions

View File

@ -24,7 +24,6 @@ find_package(RRuntime REQUIRED)
add_subdirectory(src)
add_subdirectory(R_lib)
add_subdirectory(data)
add_subdirectory(app)
add_subdirectory(bench)

View File

@ -1,5 +1,5 @@
<!--
Time-stamp: "Last modified 2023-01-19 12:06:10 delucia"
Time-stamp: "Last modified 2023-07-27 15:18:32 mluebke"
-->
# POET Daos
@ -40,4 +40,4 @@ mpirun -n 4 -x DAOS_POOL ./poet --dht ../bench/dolo_diffu_inner/dolo_diffu_inner
- src/ChemistryModule/DaosKeyValue.c implements the underlying framework to connect and disconnect from the DAOS server, as well as call the read and write operations
- src/ChemistryModule/DHT_Wrapper.cpp was slightly modified to use DaosKeyValue.c instead of DHT.c
- include/poet/ was with the header file of DaosKeyValue extend and DHT_Wrapper.hpp was modified as well
- include/poet/ was with the header file of DaosKeyValue extend and DHT_Wrapper.hpp was modified as well

View File

@ -264,9 +264,9 @@ StoreSetup <- function(setup) {
if (dht_enabled) {
to_store$DHT <- list(
enabled = dht_enabled,
log = dht_log,
signif = dht_final_signif,
proptype = dht_final_proptype
log = dht_log
#signif = dht_final_signif,
#proptype = dht_final_proptype
)
} else {
to_store$DHT <- FALSE

View File

@ -1,14 +1,7 @@
configure_file(poet.h.in poet.h)
if(POET_USE_PRM_BACKEND)
set(poet_SRC poet_prm.cpp)
else()
set(poet_SRC poet.cpp)
endif()
add_executable(poet ${poet_SRC})
add_executable(poet poet.cpp)
target_include_directories(poet PUBLIC "${CMAKE_CURRENT_BINARY_DIR}")
target_link_libraries(poet PUBLIC poet_lib MPI::MPI_CXX)
#target_compile_definitions(poet PRIVATE OMPI_SKIP_MPICXX)
install(TARGETS poet DESTINATION bin)

View File

@ -19,12 +19,12 @@
*/
#include "poet/ChemistryModule.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <cstdint>
#include <cstdlib>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp>
#include <poet/RInsidePOET.hpp>
#include <poet/SimParams.hpp>
#include <cstring>
@ -70,7 +70,7 @@ void writeFieldsToR(RInside &R, const Field &trans, const Field &chem) {
void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
const std::string &database_path) {
chem.SetErrorHandlerMode(1);
chem.SetComponentH2O(true);
chem.SetComponentH2O(false);
chem.SetRebalanceFraction(0.5);
chem.SetRebalanceByCell(true);
chem.UseSolutionDensityVolume(false);
@ -123,7 +123,7 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
double sim_time = .0;
ChemistryModule chem(nxyz_master, params.getNumParams().wp_size,
ChemistryModule chem(nxyz_master, params.getNumParams().wp_size, maxiter,
MPI_COMM_WORLD);
set_chem_parameters(chem, nxyz_master, chem_params.database_path);
chem.RunInitFile(chem_params.input_script);
@ -131,15 +131,16 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
poet::ChemistryModule::SingleCMap init_df = DFToHashMap(d_params.initial_t);
chem.initializeField(diffusion.getField());
if (params.getNumParams().print_progressbar) {
chem.setProgressBarPrintout(true);
}
if (params.getNumParams().dht_enabled) {
chem.SetDHTEnabled(true, params.getNumParams().dht_size_per_process,
chem_params.dht_species);
if (!chem_params.dht_signif.empty()) {
chem.SetDHTSignifVector(chem_params.dht_signif);
}
if (!params.getDHTPropTypeVector().empty()) {
chem.SetDHTPropTypeVector(params.getDHTPropTypeVector());
}
if (!params.getDHTFile().empty()) {
chem.ReadDHTFile(params.getDHTFile());
}
@ -150,8 +151,9 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
/* SIMULATION LOOP */
double dStartTime = MPI_Wtime();
double dSimTime{0};
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
double start_t = MPI_Wtime();
uint32_t tick = 0;
// cout << "CPP: Evaluating next time step" << endl;
// R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
@ -199,7 +201,8 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
<< endl;
// MPI_Barrier(MPI_COMM_WORLD);
double end_t = MPI_Wtime();
dSimTime += end_t - start_t;
} // END SIMULATION LOOP
R.parseEvalQ("profiling <- list()");
@ -231,8 +234,6 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
if (params.getNumParams().dht_enabled) {
R["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = Rcpp::wrap(chem.GetWorkerDHTMiss());
R.parseEvalQ("profiling$dht_miss <- dht_miss");
R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
@ -244,7 +245,7 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
chem.MasterLoopBreak();
diffusion.end();
return MPI_Wtime() - dStartTime;
return dSimTime;
}
int main(int argc, char *argv[]) {
@ -288,8 +289,7 @@ int main(int argc, char *argv[]) {
return EXIT_SUCCESS;
}
/* initialize R runtime */
RInside R(argc, argv);
RInsidePOET &R = RInsidePOET::getInstance();
/*Loading Dependencies*/
// TODO: kann raus

View File

@ -1,285 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/ChemistryModule.hpp"
#include <RInside.h>
#include <Rcpp.h>
#include <cstdint>
#include <cstdlib>
#include <poet/DiffusionModule.hpp>
#include <poet/Grid.hpp>
#include <poet/SimParams.hpp>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <mpi.h>
#include <poet.h>
using namespace std;
using namespace poet;
using namespace Rcpp;
void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
const std::string &database_path) {
chem.SetErrorHandlerMode(1);
chem.SetComponentH2O(false);
chem.SetRebalanceFraction(0.5);
chem.SetRebalanceByCell(true);
chem.UseSolutionDensityVolume(false);
chem.SetPartitionUZSolids(false);
// Set concentration units
// 1, mg/L; 2, mol/L; 3, kg/kgs
chem.SetUnitsSolution(2);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsPPassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsExchange(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSurface(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsGasPhase(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsSSassemblage(1);
// 0, mol/L cell; 1, mol/L water; 2 mol/L rock
chem.SetUnitsKinetics(1);
// Set representative volume
std::vector<double> rv;
rv.resize(wp_size, 1.0);
chem.SetRepresentativeVolume(rv);
// Set initial porosity
std::vector<double> por;
por.resize(wp_size, 1);
chem.SetPorosity(por);
// Set initial saturation
std::vector<double> sat;
sat.resize(wp_size, 1.0);
chem.SetSaturation(sat);
// Load database
chem.LoadDatabase(database_path);
}
inline double RunMasterLoop(SimParams &params, RInside &R, Grid &grid,
ChemistryParams &chem_params,
const GridParams &g_params, uint32_t nxyz_master) {
DiffusionModule diffusion(poet::DiffusionParams(R), grid);
/* Iteration Count is dynamic, retrieving value from R (is only needed by
* master for the following loop) */
uint32_t maxiter = R.parseEval("mysetup$iterations");
double sim_time = .0;
ChemistryModule chem(grid.GetTotalCellCount(), MPI_COMM_WORLD);
set_chem_parameters(chem, nxyz_master, chem_params.database_path);
chem.RunInitFile(chem_params.input_script);
chem.SetTimeStep(0);
chem.RunCells();
StateMemory *chem_state = grid.RegisterState("state_C", chem.GetPropNames());
auto &chem_field = chem_state->mem;
chem_field = chem.GetField();
/* SIMULATION LOOP */
double dStartTime = MPI_Wtime();
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
uint32_t tick = 0;
// cout << "CPP: Evaluating next time step" << endl;
// R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
double dt = Rcpp::as<double>(
R.parseEval("mysetup$timesteps[" + std::to_string(iter) + "]"));
cout << "CPP: Next time step is " << dt << "[s]" << endl;
/* displaying iteration number, with C++ and R iterator */
cout << "CPP: Going through iteration " << iter << endl;
cout << "CPP: R's $iter: " << ((uint32_t)(R.parseEval("mysetup$iter")))
<< ". Iteration" << endl;
/* run transport */
// TODO: transport to diffusion
diffusion.simulate(dt);
grid.PreModuleFieldCopy(tick++);
cout << "CPP: Chemistry" << endl;
chem.SetTimeStep(dt);
chem.SetConcentrations(chem_field);
chem.SetTimeStep(dt);
chem.RunCells();
chem_field = chem.GetField();
grid.WriteFieldsToR(R);
grid.PreModuleFieldCopy(tick++);
R["req_dt"] = dt;
R["simtime"] = (sim_time += dt);
R.parseEval("mysetup$req_dt <- req_dt");
R.parseEval("mysetup$simtime <- simtime");
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
// MPI_Barrier(MPI_COMM_WORLD);
} // END SIMULATION LOOP
R.parseEvalQ("profiling <- list()");
R["simtime_chemistry"] = chem.GetChemistryTime();
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
chem.MpiWorkerBreak();
diffusion.end();
return MPI_Wtime() - dStartTime;
}
int main(int argc, char *argv[]) {
double dSimTime, sim_end;
int world_size, world_rank;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
if (world_rank == 0) {
cout << "Running POET in version " << poet_version << endl << endl;
}
if (world_rank > 0) {
{
uint32_t nxyz;
MPI_Bcast(&nxyz, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
ChemistryModule worker(nxyz, MPI_COMM_WORLD);
worker.MpiWorker();
}
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();
return EXIT_SUCCESS;
}
/* initialize R runtime */
RInside R(argc, argv);
/*Loading Dependencies*/
// TODO: kann raus
std::string r_load_dependencies = "source('../R_lib/kin_r_library.R');";
R.parseEvalQ(r_load_dependencies);
SimParams params(world_rank, world_size);
int pret = params.parseFromCmdl(argv, R);
if (pret == poet::PARSER_ERROR) {
MPI_Finalize();
return EXIT_FAILURE;
} else if (pret == poet::PARSER_HELP) {
MPI_Finalize();
return EXIT_SUCCESS;
}
cout << "CPP: R Init (RInside) on process " << world_rank << endl;
R.parseEvalQ("mysetup <- setup");
// if (world_rank == 0) { // get timestep vector from
// grid_init function ... //
std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code);
Grid grid;
GridParams g_params(R);
grid.InitModuleFromParams(g_params);
grid.PushbackModuleFlow(poet::DIFFUSION_MODULE_NAME, CHEMISTRY_MODULE_NAME);
grid.PushbackModuleFlow(CHEMISTRY_MODULE_NAME, poet::DIFFUSION_MODULE_NAME);
params.initVectorParams(R, grid.GetSpeciesCount());
// MDL: store all parameters
if (world_rank == 0) {
cout << "CPP: Calling R Function to store calling parameters" << endl;
R.parseEvalQ("StoreSetup(setup=mysetup)");
}
if (world_rank == 0) {
cout << "CPP: Init done on process with rank " << world_rank << endl;
}
// MPI_Barrier(MPI_COMM_WORLD);
poet::ChemistryParams chem_params(R);
/* THIS IS EXECUTED BY THE MASTER */
uint32_t nxyz = grid.GetTotalCellCount();
MPI_Bcast(&nxyz, 1, MPI_UINT32_T, 0, MPI_COMM_WORLD);
uint32_t nxyz_master = grid.GetTotalCellCount();
dSimTime = RunMasterLoop(params, R, grid, chem_params, g_params, nxyz_master);
cout << "CPP: finished simulation loop" << endl;
cout << "CPP: start timing profiling" << endl;
R["simtime"] = dSimTime;
R.parseEvalQ("profiling$simtime <- simtime");
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.getOutDir() << "/timings.rds>" << endl;
MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: finished, cleanup of process " << world_rank << endl;
MPI_Finalize();
if (world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
exit(0);
}

View File

@ -1,3 +1,3 @@
add_subdirectory(dolo_diffu_inner)
add_subdirectory(dolo)
add_subdirectory(surfex)
add_subdirectory(barite)

View File

@ -2,6 +2,8 @@ install(FILES
dolo_diffu_inner.R
dolo_diffu_inner_large.R
dolo_inner.pqi
dolo_interp_long.R
phreeqc_kin.dat
DESTINATION
share/poet/bench/dolo
)

View File

@ -1,6 +1,6 @@
## Time-stamp: "Last modified 2023-04-24 15:12:02 mluebke"
## Time-stamp: "Last modified 2023-07-27 14:58:19 mluebke"
database <- normalizePath("../share/poet/examples/phreeqc_kin.dat")
database <- normalizePath("../share/poet/bench/dolo/phreeqc_kin.dat")
input_script <- normalizePath("../share/poet/bench/dolo/dolo_inner.pqi")
#################################################################
@ -44,8 +44,8 @@ grid <- list(
## initial conditions
init_diffu <- list(
"H" = 0.000211313883539788,
"O" = 0.00398302904424952,
"H" = 110.683,
"O" = 55.3413,
"Charge" = -5.0822e-19,
"C(4)" = 1.2279E-4,
"Ca" = 1.2279E-4,
@ -67,8 +67,8 @@ alpha_diffu <- c(
## list of boundary conditions/inner nodes
vecinj_diffu <- list(
list(
"H" = 0.0001540445,
"O" = 0.002148006,
"H" = 110.683,
"O" = 55.3413,
"Charge" = 1.90431e-16,
"C(4)" = 0,
"Ca" = 0,
@ -76,8 +76,8 @@ vecinj_diffu <- list(
"Mg" = 0.001
),
list(
"H" = 0.0001610193,
"O" = 0.002386934,
"H" = 110.683,
"O" = 55.3413,
"Charge" = 1.90431e-16,
"C(4)" = 0,
"Ca" = 0.0,
@ -120,16 +120,21 @@ diffusion <- list(
## # Needed when using DHT
dht_species <- c("H", "O", "Charge", "C(4)", "Ca", "Cl", "Mg", "Calcite",
"Dolomite")
#dht_signif <- rep(15, length(dht_species))
dht_signif <- c(10, 10, 3, 5, 5, 5, 5, 5, 5)
dht_species <- c("H" = 10,
"O" = 10,
"Charge" = 3,
"C(4)" = 5,
"Ca" = 5,
"Cl" = 5,
"Mg" = 5,
"Calcite" = 5,
"Dolomite" =5
)
chemistry <- list(
database = database,
input_script = input_script,
dht_species = dht_species,
dht_signif = dht_signif
dht_species = dht_species
)
#################################################################

View File

@ -1,6 +1,6 @@
## Time-stamp: "Last modified 2023-04-24 15:43:26 mluebke"
## Time-stamp: "Last modified 2023-07-27 14:58:41 mluebke"
database <- normalizePath("../share/poet/examples/phreeqc_kin.dat")
database <- normalizePath("../share/poet/bench/dolo/phreeqc_kin.dat")
input_script <- normalizePath("../share/poet/bench/dolo/dolo_inner.pqi")
#################################################################
@ -118,18 +118,22 @@ diffusion <- list(
## Chemistry module (Phreeqc) ##
#################################################################
## # Needed when using DHT
dht_species <- c("H", "O", "Charge", "C(4)", "Ca", "Cl", "Mg", "Calcite",
"Dolomite")
#dht_signif <- rep(15, length(dht_species))
dht_signif <- c(10, 10, 3, 5, 5, 5, 5, 5, 5)
dht_species <- c("H" = 10,
"O" = 10,
"Charge" = 3,
"C(4)" = 5,
"Ca" = 5,
"Cl" = 5,
"Mg" = 5,
"Calcite" = 5,
"Dolomite" =5
)
chemistry <- list(
database = database,
input_script = input_script,
dht_species = dht_species,
dht_signif = dht_signif
dht_species = dht_species
)
#################################################################

View File

@ -0,0 +1,159 @@
## Time-stamp: "Last modified 2023-07-27 15:10:04 mluebke"
database <- normalizePath("../share/poet/bench/dolo/phreeqc_kin.dat")
input_script <- normalizePath("../share/poet/bench/dolo/dolo_inner.pqi")
#################################################################
## Section 1 ##
## Grid initialization ##
#################################################################
n <- 400
m <- 200
types <- c("scratch", "phreeqc", "rds")
init_cell <- list(
"H" = 110.683,
"O" = 55.3413,
"Charge" = -5.0822e-19,
"C" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0,
"O2g" = 0.499957,
"Calcite" = 2.07e-4,
"Dolomite" = 0
)
grid <- list(
n_cells = c(n, m),
s_cells = c(5, 2.5),
type = types[1],
init_cell = as.data.frame(init_cell, check.names = FALSE),
props = names(init_cell),
database = database,
input_script = input_script
)
##################################################################
## Section 2 ##
## Diffusion parameters and boundary conditions ##
##################################################################
## initial conditions
init_diffu <- list(
"H" = 1.110124E+02,
"O" = 5.550833E+01,
"Charge" = -1.216307659761E-09,
"C(4)" = 1.230067028174E-04,
"Ca" = 1.230067028174E-04,
"Cl" = 0,
"Mg" = 0
)
## diffusion coefficients
alpha_diffu <- c(
"H" = 1E-6,
"O" = 1E-6,
"Charge" = 1E-6,
"C(4)" = 1E-6,
"Ca" = 1E-6,
"Cl" = 1E-6,
"Mg" = 1E-6
)
## list of boundary conditions/inner nodes
vecinj_diffu <- list(
list(
"H" = 1.110124E+02,
"O" = 5.550796E+01,
"Charge" = -3.230390327801E-08,
"C(4)" = 0,
"Ca" = 0,
"Cl" = 0.002,
"Mg" = 0.001
),
list(
"H" = 110.683,
"O" = 55.3413,
"Charge" = 1.90431e-16,
"C(4)" = 0,
"Ca" = 0.0,
"Cl" = 0.004,
"Mg" = 0.002
),
init_diffu
)
vecinj_inner <- list(
l1 = c(1, floor(n / 2), floor(m / 2))
# l2 = c(2,1400,800),
# l3 = c(2,1600,800)
)
boundary <- list(
# "N" = c(1, rep(0, n-1)),
"N" = rep(3, n),
"E" = rep(3, m),
"S" = rep(3, n),
"W" = rep(3, m)
)
diffu_list <- names(alpha_diffu)
vecinj <- do.call(rbind.data.frame, vecinj_diffu)
names(vecinj) <- names(init_diffu)
diffusion <- list(
init = as.data.frame(init_diffu, check.names = FALSE),
vecinj = vecinj,
vecinj_inner = vecinj_inner,
vecinj_index = boundary,
alpha = alpha_diffu
)
#################################################################
## Section 3 ##
## Chemistry module (Phreeqc) ##
#################################################################
## # Needed when using DHT
dht_species <- c(
"H" = 3,
"O" = 3,
"Charge" = 3,
"C(4)" = 6,
"Ca" = 6,
"Cl" = 3,
"Mg" = 5,
"Calcite" = 4,
"Dolomite" = 4
)
chemistry <- list(
database = database,
input_script = input_script,
dht_species = dht_species
)
#################################################################
## Section 4 ##
## Putting all those things together ##
#################################################################
iterations <- 20000
dt <- 200
setup <- list(
grid = grid,
diffusion = diffusion,
chemistry = chemistry,
iterations = iterations,
timesteps = rep(dt, iterations),
store_result = TRUE,
out_save = c(1, seq(50, iterations, by = 50))
)

View File

@ -2,7 +2,7 @@
### SURFACE and with the RATES for Calcite and Dolomite to use with
### RedModRphree
### Time-stamp: "Last modified 2018-05-06 14:36:23 delucia"
### Time-stamp: "Last modified 2023-05-23 10:35:56 mluebke"
# PHREEQC.DAT for calculating pressure dependence of reactions, with
# molal volumina of aqueous species and of minerals, and
@ -1276,9 +1276,9 @@ Calcite
110 logK25=-5.81
120 mech_c=(10^logK25)*(e^(-Ea/R*deltaT))
130 rate=mech_a+mech_c
## 140 IF SI("Calcite")<0 then moles=parm(1)*rate*(1-SR("Calcite")) # dissolution
140 IF (SI("Calcite")<0 AND M>0) then moles=parm(1)*rate*(1-SR("Calcite")) # dissolution
## 145 IF SI("Calcite")>0 then moles=parm(1)*M*rate*(-1+SR("Calcite")) # precipitation
150 moles=parm(1)*rate*(1-SR("Calcite")) # precipitation
## 150 moles=parm(1)*rate*(1-SR("Calcite")) # precipitation
200 save moles*time
-end

View File

@ -1,8 +0,0 @@
install(
FILES
SimDol2D_diffu.R
SimDol1D_diffu.R
phreeqc_kin.dat
dol.pqi
DESTINATION
share/poet/examples)

View File

@ -1,180 +0,0 @@
#################################################################
## Section 1 ##
## Grid initialization ##
#################################################################
n <- 5
m <- 5
types <- c("scratch", "phreeqc", "rds")
# initsim <- c("SELECTED_OUTPUT", "-high_precision true",
# "-reset false",
# "USER_PUNCH",
# "-head C Ca Cl Mg pH pe O2g Calcite Dolomite",
# "10 PUNCH TOT(\"C\"), TOT(\"Ca\"), TOT(\"Cl\"), TOT(\"Mg\"), -LA(\"H+\"), -LA(\"e-\"), EQUI(\"O2g\"), EQUI(\"Calcite\"), EQUI(\"Dolomite\")",
# "SOLUTION 1",
# "units mol/kgw",
# "temp 25.0", "water 1",
# "pH 9.91 charge",
# "pe 4.0",
# "C 1.2279E-04",
# "Ca 1.2279E-04",
# "Cl 1E-12",
# "Mg 1E-12",
# "PURE 1",
# "O2g -0.6788 10.0",
# "Calcite 0.0 2.07E-3",
# "Dolomite 0.0 0.0",
# "END")
# needed if init type is set to "scratch"
# prop <- c("C", "Ca", "Cl", "Mg", "pH", "pe", "O2g", "Calcite", "Dolomite")
init_cell <- list(
"C" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0,
"pH" = 9.91,
"pe" = 4,
"O2g" = 10,
"Calcite" = 2.07e-4,
"Dolomite" = 0
)
grid <- list(
n_cells = n,
s_cells = n,
type = types[1],
init_cell = as.data.frame(init_cell),
props = names(init_cell),
init_script = NULL
)
##################################################################
## Section 2 ##
## Diffusion parameters and boundary conditions ##
##################################################################
init_diffu <- c(
"C" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0,
"pe" = 4,
"pH" = 7
)
alpha_diffu <- c(
"C" = 1E-4,
"Ca" = 1E-4,
"Cl" = 1E-4,
"Mg" = 1E-4,
"pe" = 1E-4,
"pH" = 1E-4
)
vecinj_diffu <- list(
list(
"C" = 0,
"Ca" = 0,
"Cl" = 0.002,
"Mg" = 0.001,
"pe" = 4,
"pH" = 9.91
)
)
boundary <- list(
"E" = 0,
"W" = 1
)
diffu_list <- names(alpha_diffu)
diffusion <- list(
init = init_diffu,
vecinj = do.call(rbind.data.frame, vecinj_diffu),
vecinj_index = boundary,
alpha = alpha_diffu
)
##################################################################
## Section 3 ##
## Phreeqc simulation ##
##################################################################
db <- RPhreeFile(system.file("extdata", "phreeqc_kin.dat",
package = "RedModRphree"
), is.db = TRUE)
phreeqc::phrLoadDatabaseString(db)
# NOTE: This won't be needed in the future either. Could also be done in a. pqi
# file
base <- c(
"SOLUTION 1",
"units mol/kgw",
"temp 25.0",
"water 1",
"pH 9.91 charge",
"pe 4.0",
"C 1.2279E-04",
"Ca 1.2279E-04",
"Mg 0.001",
"Cl 0.002",
"PURE 1",
"O2g -0.1675 10",
"KINETICS 1",
"-steps 100",
"-step_divide 100",
"-bad_step_max 2000",
"Calcite", "-m 0.000207",
"-parms 0.0032",
"Dolomite",
"-m 0.0",
"-parms 0.00032",
"END"
)
selout <- c(
"SELECTED_OUTPUT", "-high_precision true", "-reset false",
"-time", "-soln", "-temperature true", "-water true",
"-pH", "-pe", "-totals C Ca Cl Mg",
"-kinetic_reactants Calcite Dolomite", "-equilibrium O2g"
)
# Needed when using DHT
signif_vector <- c(7, 7, 7, 7, 7, 7, 7, 5, 5)
prop_type <- c("act", "act", "act", "act", "logact", "logact", "ignore", "act", "act")
prop <- names(init_cell)
iterations <- 500
setup <- list(
# bound = myboundmat,
base = base,
first = selout,
# initsim = initsim,
# Cf = 1,
grid = grid,
diffusion = diffusion,
prop = prop,
immobile = c(7, 8, 9),
kin = c(8, 9),
ann = list(O2g = -0.1675),
# phase = "FLUX1",
# density = "DEN1",
reduce = FALSE,
# snapshots = demodir, ## directory where we will read MUFITS SUM files
# gridfile = paste0(demodir, "/d2ascii.run.GRID.SUM")
# init = init,
# vecinj = vecinj,
# cinj = c(0,1),
# boundary = boundary,
# injections = FALSE,
iterations = iterations,
timesteps = rep(10, iterations)
)

View File

@ -1,213 +0,0 @@
database <- normalizePath("../share/poet/examples/phreeqc_kin.dat")
input_script <- normalizePath("../share/poet/examples/dol.pqi")
#################################################################
## Section 1 ##
## Grid initialization ##
#################################################################
n <- 100
m <- 100
types <- c("scratch", "phreeqc", "rds")
# initsim <- c("SELECTED_OUTPUT", "-high_precision true",
# "-reset false",
# "USER_PUNCH",
# "-head C Ca Cl Mg pH pe O2g Calcite Dolomite",
# "10 PUNCH TOT(\"C\"), TOT(\"Ca\"), TOT(\"Cl\"), TOT(\"Mg\"), -LA(\"H+\"), -LA(\"e-\"), EQUI(\"O2g\"), EQUI(\"Calcite\"), EQUI(\"Dolomite\")",
# "SOLUTION 1",
# "units mol/kgw",
# "temp 25.0", "water 1",
# "pH 9.91 charge",
# "pe 4.0",
# "C 1.2279E-04",
# "Ca 1.2279E-04",
# "Cl 1E-12",
# "Mg 1E-12",
# "PURE 1",
# "O2g -0.6788 10.0",
# "Calcite 0.0 2.07E-3",
# "Dolomite 0.0 0.0",
# "END")
# needed if init type is set to "scratch"
# prop <- c("C", "Ca", "Cl", "Mg", "pH", "pe", "O2g", "Calcite", "Dolomite")
init_cell <- list(
"H" = 110.683,
"O" = 55.3413,
"Charge" = -5.0822e-19,
"C" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0,
"O2g" = 0.499957,
"Calcite" = 2.07e-4,
"Dolomite" = 0
)
grid <- list(
n_cells = c(n, m),
s_cells = c(n, m),
type = types[1],
init_cell = as.data.frame(init_cell),
props = names(init_cell),
database = database,
input_script = input_script
)
##################################################################
## Section 2 ##
## Diffusion parameters and boundary conditions ##
##################################################################
init_diffu <- c(
"H" = 110.683,
"O" = 55.3413,
"Charge" = -5.0822e-19,
"C" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0
)
alpha_diffu <- c(
"H" = 1E-4,
"O" = 1E-4,
"Charge" = 1E-4,
"C" = 1E-4,
"Ca" = 1E-4,
"Cl" = 1E-4,
"Mg" = 1E-4
)
vecinj_diffu <- list(
list(
"H" = 110.683,
"O" = 55.3413,
"Charge" = 1.90431e-16,
"C" = 0,
"Ca" = 0,
"Cl" = 0.002,
"Mg" = 0.001
)
)
#inner_index <- c(5, 15, 25)
#inner_vecinj_index <- rep(1, 3)
#
#vecinj_inner <- cbind(inner_index, inner_vecinj_index)
vecinj_inner <- list(
l1 = c(1,2,2)
)
boundary <- list(
"N" = rep(0, n),
"E" = rep(0, n),
"S" = rep(0, n),
"W" = rep(0, n)
)
diffu_list <- names(alpha_diffu)
diffusion <- list(
init = init_diffu,
vecinj = do.call(rbind.data.frame, vecinj_diffu),
vecinj_inner = vecinj_inner,
vecinj_index = boundary,
alpha = alpha_diffu
)
#################################################################
## Section 3 ##
## Chemitry module (Phreeqc) ##
#################################################################
# db <- RPhreeFile(system.file("extdata", "phreeqc_kin.dat",
# package = "RedModRphree"
# ), is.db = TRUE)
#
# phreeqc::phrLoadDatabaseString(db)
# NOTE: This won't be needed in the future either. Could also be done in a. pqi
# file
base <- c(
"SOLUTION 1",
"units mol/kgw",
"temp 25.0",
"water 1",
"pH 9.91 charge",
"pe 4.0",
"C 1.2279E-04",
"Ca 1.2279E-04",
"Mg 0.001",
"Cl 0.002",
"PURE 1",
"O2g -0.1675 10",
"KINETICS 1",
"-steps 100",
"-step_divide 100",
"-bad_step_max 2000",
"Calcite", "-m 0.000207",
"-parms 0.0032",
"Dolomite",
"-m 0.0",
"-parms 0.00032",
"END"
)
selout <- c(
"SELECTED_OUTPUT", "-high_precision true", "-reset false",
"-time", "-soln", "-temperature true", "-water true",
"-pH", "-pe", "-totals C Ca Cl Mg",
"-kinetic_reactants Calcite Dolomite", "-equilibrium O2g"
)
# Needed when using DHT
signif_vector <- c(10, 10, 10, 7, 7, 7, 7, 0, 5, 5)
prop_type <- c("", "", "", "act", "act", "act", "act", "ignore", "", "")
prop <- names(init_cell)
chemistry <- list(
database = database,
input_script = input_script
)
#################################################################
## Section 4 ##
## Putting all those things together ##
#################################################################
iterations <- 10
setup <- list(
# bound = myboundmat,
base = base,
first = selout,
# initsim = initsim,
# Cf = 1,
grid = grid,
diffusion = diffusion,
chemistry = chemistry,
prop = prop,
immobile = c(7, 8, 9),
kin = c(8, 9),
ann = list(O2g = -0.1675),
# phase = "FLUX1",
# density = "DEN1",
reduce = FALSE,
# snapshots = demodir, ## directory where we will read MUFITS SUM files
# gridfile = paste0(demodir, "/d2ascii.run.GRID.SUM")
# init = init,
# vecinj = vecinj,
# cinj = c(0,1),
# boundary = boundary,
# injections = FALSE,
iterations = iterations,
timesteps = rep(10, iterations)
)

View File

@ -1,13 +0,0 @@
phreeqc:
RebalanceByCell: True
RebalanceFraction: 0.5
SolutionDensityVolume: False
PartitionUZSolids: False
Units:
Solution: "mg/L"
PPassamblage: "mol/L"
Exchange: "mol/L"
Surface: "mol/L"
GasPhase: "mol/L"
SSassemblage: "mol/L"
Kinetics: "mol/L"

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 73 KiB

@ -1 +1 @@
Subproject commit ba5dc40af119da015d36d2554ecd558ef9da1eb6
Subproject commit 89f713b273cd5340b2e8169523da04c2d7ad89c9

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-04-24 14:30:06 mluebke"
// Time-stamp: "Last modified 2023-07-27 15:05:26 mluebke"
#ifndef CHEMISTRYMODULE_H_
#define CHEMISTRYMODULE_H_
@ -24,20 +24,6 @@ namespace poet {
*/
class ChemistryModule : public PhreeqcRM {
public:
#ifdef POET_USE_PRM
/**
* Creates a new instance of Chemistry module with given grid cell count and
* using MPI communicator.
*
* Constructor is just a wrapper around PhreeqcRM's constructor, so just calls
* the base class constructor.
*
* \param nxyz Count of grid cells.
* \param communicator MPI communicator where work will be distributed.
*/
ChemistryModule(uint32_t nxyz, MPI_Comm communicator)
: PhreeqcRM(nxyz, communicator), n_cells(nxyz){};
#else
/**
* Creates a new instance of Chemistry module with given grid cell count, work
* package size and communicator.
@ -53,13 +39,13 @@ public:
* \param wp_size Count of grid cells to fill each work package at maximum.
* \param communicator MPI communicator to distribute work in.
*/
ChemistryModule(uint32_t nxyz, uint32_t wp_size, MPI_Comm communicator);
ChemistryModule(uint32_t nxyz, uint32_t wp_size, std::uint32_t maxiter,
MPI_Comm communicator);
/**
* Deconstructor, which frees DHT data structure if used.
*/
~ChemistryModule();
#endif
/**
* Parses the input script and extract information needed during runtime.
@ -92,8 +78,6 @@ public:
*/
auto GetChemistryTime() const { return this->chem_t; }
#ifndef POET_USE_PRM
/**
* Create a new worker instance inside given MPI communicator.
*
@ -126,9 +110,9 @@ public:
* Enumerating DHT file options
*/
enum {
DHT_FILES_DISABLED = 0, //!< disabled file output
DHT_FILES_SIMEND, //!< only output of snapshot after simulation
DHT_FILES_ITEREND //!< output snapshots after each iteration
DHT_SNAPS_DISABLED = 0, //!< disabled file output
DHT_SNAPS_SIMEND, //!< only output of snapshot after simulation
DHT_SNAPS_ITEREND //!< output snapshots after each iteration
};
/**
@ -162,7 +146,7 @@ public:
* \param size_mb Size in megabyte to allocate for the DHT if enabled.
* \param key_species Name of species to be used for key creation.
*/
void SetDHTEnabled(bool enable, uint32_t size_mb,
void SetDHTEnabled(bool enable, std::uint64_t size_mb,
const std::vector<std::string> &key_species);
/**
* **Master only** Set DHT snapshots to specific mode.
@ -179,13 +163,7 @@ public:
* is defined by prop_type vector (ChemistryModule::GetPropNames).
*/
void SetDHTSignifVector(std::vector<uint32_t> &signif_vec);
/**
* **Master only** Set the DHT rounding type of each species. See
* DHT_PROP_TYPES enumemartion for explanation.
*
* \param proptype_vec Vector defining DHT prop type for each species.
*/
void SetDHTPropTypeVector(std::vector<uint32_t> proptype_vec);
/**
* **Master only** Load the state of the DHT from given file.
*
@ -254,13 +232,6 @@ public:
*/
std::vector<uint32_t> GetWorkerDHTHits() const;
/**
* **Master only** Collect and return DHT misses of all workers.
*
* \return Vector of all count of DHT misses.
*/
std::vector<uint32_t> GetWorkerDHTMiss() const;
/**
* **Master only** Collect and return DHT evictions of all workers.
*
@ -274,18 +245,23 @@ public:
* \return Reference to the chemical field.
*/
Field &getField() { return this->chem_field; }
#endif
/**
* **Master only** Enable/disable progress bar.
*
* \param enabled True if print progressbar, false if not.
*/
void setProgressBarPrintout(bool enabled) {
this->print_progessbar = enabled;
};
protected:
#ifdef POET_USE_PRM
void PrmToPoetField(std::vector<double> &field);
#else
enum {
CHEM_INIT,
CHEM_WP_SIZE,
CHEM_INIT_SPECIES,
CHEM_DHT_ENABLE,
CHEM_DHT_SIGNIF_VEC,
CHEM_DHT_PROP_TYPE_VEC,
CHEM_DHT_SNAPS,
CHEM_DHT_READ_FILE,
CHEM_WORK_LOOP,
@ -301,10 +277,12 @@ protected:
WORKER_DHT_FILL,
WORKER_IDLE,
WORKER_DHT_HITS,
WORKER_DHT_MISS,
WORKER_DHT_EVICTIONS
WORKER_DHT_EVICTIONS,
};
std::vector<uint32_t> dht_hits;
std::vector<uint32_t> dht_evictions;
struct worker_s {
double phreeqc_t = 0.;
double dht_get = 0.;
@ -359,9 +337,11 @@ protected:
void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count, std::vector<double> &out_field);
std::vector<std::uint32_t>
std::vector<std::int32_t>
parseDHTSpeciesVec(const std::vector<std::string> &species_vec) const;
void BCastStringVec(std::vector<std::string> &io);
int comm_size, comm_rank;
MPI_Comm group_comm;
@ -370,7 +350,7 @@ protected:
uint32_t wp_size{CHEM_DEFAULT_WP_SIZE};
bool dht_enabled{false};
int dht_snaps_type{DHT_FILES_DISABLED};
int dht_snaps_type{DHT_SNAPS_DISABLED};
std::string dht_file_out_dir;
poet::DHT_Wrapper *dht = nullptr;
@ -391,9 +371,11 @@ protected:
std::array<double, 2> base_totals{0};
#endif
bool print_progessbar{false};
double chem_t = 0.;
std::uint32_t file_pad;
double chem_t{0.};
uint32_t n_cells = 0;
uint32_t prop_count = 0;

View File

@ -67,7 +67,7 @@
*/
typedef struct {
/** Count of writes to specific process this process did. */
int* writes_local;
int *writes_local;
/** Writes after last call of DHT_print_statistics. */
int old_writes;
/** How many read misses occur? */
@ -100,24 +100,24 @@ typedef struct {
/** Size of the MPI communicator respectively all participating processes. */
int comm_size;
/** Pointer to a hashfunction. */
uint64_t (*hash_func)(int, const void*);
uint64_t (*hash_func)(int, const void *);
/** Pre-allocated memory where a bucket can be received. */
void* recv_entry;
void *recv_entry;
/** Pre-allocated memory where a bucket to send can be stored. */
void* send_entry;
void *send_entry;
/** Allocated memory on which the MPI window was created. */
void* mem_alloc;
void *mem_alloc;
/** Count of read misses over all time. */
int read_misses;
/** Count of evictions over all time. */
int evictions;
/** Array of indeces where a bucket can be stored. */
uint64_t* index;
uint64_t *index;
/** Count of possible indeces. */
unsigned int index_count;
#ifdef DHT_STATISTICS
/** Detailed statistics of the usage of the DHT. */
DHT_stats* stats;
DHT_stats *stats;
#endif
} DHT;
@ -141,9 +141,9 @@ typedef struct {
* @return DHT* The returned value is the \a DHT-object which serves as a handle
* for all DHT operations. If an error occured NULL is returned.
*/
extern DHT* DHT_create(MPI_Comm comm, uint64_t size_per_process,
extern DHT *DHT_create(MPI_Comm comm, uint64_t size_per_process,
unsigned int data_size, unsigned int key_size,
uint64_t (*hash_func)(int, const void*));
uint64_t (*hash_func)(int, const void *));
/**
* @brief Write data into DHT.
@ -161,10 +161,14 @@ extern DHT* DHT_create(MPI_Comm comm, uint64_t size_per_process,
* @param table Pointer to the \a DHT-object.
* @param key Pointer to the key.
* @param data Pointer to the data.
* @param proc If not NULL, returns the process number written to.
* @param index If not NULL, returns the index of the bucket where the data was
* written to.
* @return int Returns either DHT_SUCCESS on success or correspondending error
* value on eviction or error.
*/
extern int DHT_write(DHT* table, void* key, void* data);
extern int DHT_write(DHT *table, void *key, void *data, uint32_t *proc,
uint32_t *index);
/**
* @brief Read data from DHT.
@ -187,7 +191,7 @@ extern int DHT_write(DHT* table, void* key, void* data);
* @return int Returns either DHT_SUCCESS on success or correspondending error
* value on read miss or error.
*/
extern int DHT_read(DHT* table, void* key, void* destination);
extern int DHT_read(DHT *table, const void *key, void *destination);
/**
* @brief Write current state of DHT to file.
@ -203,7 +207,7 @@ extern int DHT_read(DHT* table, void* key, void* destination);
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
* opened/closed or DHT_WRITE_ERROR if file is not writable.
*/
extern int DHT_to_file(DHT* table, const char* filename);
extern int DHT_to_file(DHT *table, const char *filename);
/**
* @brief Read state of DHT from file.
@ -223,7 +227,7 @@ extern int DHT_to_file(DHT* table, const char* filename);
* file doesn't match expectation. This is possible if the data size or key size
* is different.
*/
extern int DHT_from_file(DHT* table, const char* filename);
extern int DHT_from_file(DHT *table, const char *filename);
/**
* @brief Free ressources of DHT.
@ -241,7 +245,7 @@ extern int DHT_from_file(DHT* table, const char* filename);
* @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on
* internal MPI error.
*/
extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter);
extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
/**
* @brief Prints a table with statistics about current use of DHT.
@ -267,7 +271,7 @@ extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter);
* @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
* error.
*/
extern int DHT_print_statistics(DHT* table);
extern int DHT_print_statistics(DHT *table);
/**
* @brief Determine destination rank and index.
@ -286,8 +290,8 @@ extern int DHT_print_statistics(DHT* table);
* @param index_count Count of possible indeces.
*/
static void determine_dest(uint64_t hash, int comm_size,
unsigned int table_size, unsigned int* dest_rank,
uint64_t* index, unsigned int index_count);
unsigned int table_size, unsigned int *dest_rank,
uint64_t *index, unsigned int index_count);
/**
* @brief Set the occupied flag.
@ -296,7 +300,7 @@ static void determine_dest(uint64_t hash, int comm_size,
*
* @param flag_byte First byte of a bucket.
*/
static void set_flag(char* flag_byte);
static void set_flag(char *flag_byte);
/**
* @brief Get the occupied flag.

View File

@ -2,7 +2,7 @@
#define DHT_TYPES_H_
namespace poet {
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_IGNORE, DHT_TYPE_TOTAL };
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_TOTAL };
}
#endif // DHT_TYPES_H_

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-04-24 16:23:42 mluebke"
// Time-stamp: "Last modified 2023-07-27 15:19:33 mluebke"
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
@ -24,9 +24,13 @@
#define DHT_WRAPPER_H
#include "poet/DHT_Types.hpp"
#include "poet/LookupKey.hpp"
#include "poet/Rounding.hpp"
#include <array>
#include <cstdint>
#include <limits>
#include <string>
#include <utility>
#include <vector>
@ -43,35 +47,7 @@
namespace poet {
struct DHT_SCNotation {
std::int8_t exp : 8;
std::int64_t significant : 56;
};
union DHT_Keyelement {
double fp_elemet;
DHT_SCNotation sc_notation;
};
using DHT_ResultObject = struct DHTResobj {
uint32_t length;
std::vector<std::vector<DHT_Keyelement>> keys;
std::vector<std::vector<double>> results;
std::vector<bool> needPhreeqc;
};
/**
* @brief Return user-defined md5sum
*
* This function will calculate a hashsum with the help of md5sum. Therefore the
* md5sum for a given key is calculated and divided into two 64-bit parts. These
* will be XORed and returned as the hash.
*
* @param key_size Size of key in bytes
* @param key Pointer to key
* @return uint64_t Hashsum as an unsigned 64-bit integer
*/
static uint64_t get_md5(int key_size, void *key);
using DHT_Location = std::pair<std::uint32_t, std::uint32_t>;
/**
* @brief C++-Wrapper around DHT implementation
@ -82,6 +58,20 @@ static uint64_t get_md5(int key_size, void *key);
*/
class DHT_Wrapper {
public:
using DHT_ResultObject = struct DHTResobj {
uint32_t length;
std::vector<LookupKey> keys;
std::vector<std::vector<double>> results;
std::vector<double> old_values;
std::vector<bool> needPhreeqc;
std::vector<DHT_Location> locations;
};
static constexpr std::int32_t DHT_KEY_INPUT_CUSTOM =
std::numeric_limits<std::int32_t>::min();
static constexpr int DHT_KEY_SIGNIF_DEFAULT = 5;
/**
* @brief Construct a new dht wrapper object
*
@ -97,9 +87,9 @@ public:
* for key creation.
* @param data_count Count of data entries
*/
DHT_Wrapper(MPI_Comm dht_comm, uint32_t dht_size,
const std::vector<std::uint32_t> &key_indices,
uint32_t data_count);
DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
const std::vector<std::int32_t> &key_indices,
const std::vector<std::string> &key_names, uint32_t data_count);
/**
* @brief Destroy the dht wrapper object
*
@ -111,6 +101,9 @@ public:
*/
~DHT_Wrapper();
DHT_Wrapper &operator=(const DHT_Wrapper &) = delete;
DHT_Wrapper(const DHT_Wrapper &) = delete;
/**
* @brief Check if values of workpackage are stored in DHT
*
@ -129,7 +122,7 @@ public:
*/
auto checkDHT(int length, double dt, const std::vector<double> &work_package,
std::vector<std::uint32_t> &curr_mapping)
-> const poet::DHT_ResultObject &;
-> const DHT_ResultObject &;
/**
* @brief Write simulated values into DHT
@ -190,13 +183,6 @@ public:
*/
auto getHits() { return this->dht_hits; };
/**
* @brief Get the Misses object
*
* @return uint64_t Count of read misses
*/
auto getMisses() { return this->dht_miss; };
/**
* @brief Get the Evictions object
*
@ -204,13 +190,33 @@ public:
*/
auto getEvictions() { return this->dht_evictions; };
void SetSignifVector(std::vector<uint32_t> signif_vec);
void SetPropTypeVector(std::vector<uint32_t> prop_type_vec);
void setBaseTotals(const std::array<double, 2> &bt) {
this->base_totals = bt;
void resetCounter() {
this->dht_hits = 0;
this->dht_evictions = 0;
}
void SetSignifVector(std::vector<uint32_t> signif_vec);
auto GetSignifVector() { return this->dht_signif_vector; };
auto getDataCount() { return this->data_count; }
auto getCommunicator() { return this->communicator; }
DHT *getDHT() { return this->dht_object; };
DHT_ResultObject &getDHTResults() { return this->dht_results; }
const auto &getKeyElements() const { return this->input_key_elements; }
void setBaseTotals(double tot_h, double tot_o) {
this->base_totals = {tot_h, tot_o};
}
std::uint32_t getInputCount() const {
return this->input_key_elements.size();
}
std::uint32_t getOutputCount() const { return this->data_count; }
private:
uint32_t key_count;
uint32_t data_count;
@ -219,21 +225,27 @@ private:
uint32_t key_size = 0;
DAOSKV *daosKV_object;
DHT *dht_object;
MPI_Comm communicator;
std::vector<DHT_Keyelement> fuzzForDHT(int var_count, void *key, double dt);
LookupKey fuzzForDHT(int var_count, void *key, double dt);
std::vector<double>
outputToInputAndRates(const std::vector<double> &old_results,
const std::vector<double> &new_results);
std::vector<double>
inputAndRatesToOutput(const std::vector<double> &dht_data);
uint32_t dht_hits = 0;
uint32_t dht_miss = 0;
uint32_t dht_evictions = 0;
std::vector<uint32_t> dht_signif_vector;
std::vector<std::uint32_t> dht_prop_type_vector;
std::vector<std::uint32_t> input_key_elements;
std::vector<std::int32_t> input_key_elements;
static constexpr int DHT_KEY_SIGNIF_DEFAULT = 5;
static constexpr int DHT_KEY_SIGNIF_TOTALS = 10;
static constexpr int DHT_KEY_SIGNIF_CHARGE = 3;
std::vector<std::string> key_names;
DHT_ResultObject dht_results;

View File

@ -0,0 +1,50 @@
// Time-stamp: "Last modified 2023-07-26 10:20:10 mluebke"
#ifndef LOOKUPKEY_H_
#define LOOKUPKEY_H_
#include "poet/HashFunctions.hpp"
#include <cstdint>
#include <cstring>
#include <vector>
namespace poet {
struct Lookup_SC_notation {
std::int8_t exp : 8;
std::int64_t significant : 56;
};
union Lookup_Keyelement {
double fp_element;
Lookup_SC_notation sc_notation;
bool operator==(const Lookup_Keyelement &other) const {
return std::memcmp(this, &other, sizeof(Lookup_Keyelement)) == 0 ? true
: false;
}
};
class LookupKey : public std::vector<Lookup_Keyelement> {
public:
explicit LookupKey(size_type count);
using std::vector<Lookup_Keyelement>::vector;
std::vector<double> to_double() const;
static Lookup_SC_notation round_from_double(const double in,
std::uint32_t signif);
static double to_double(const Lookup_SC_notation in);
};
struct LookupKeyHasher {
std::uint64_t operator()(const LookupKey &k) const {
const uint32_t key_size = k.size() * sizeof(Lookup_Keyelement);
return poet::Murmur2_64A(key_size, k.data());
}
};
} // namespace poet
#endif // LOOKUPKEY_H_

View File

@ -0,0 +1,21 @@
#ifndef RPOET_H_
#define RPOET_H_
#include <RInside.h>
class RInsidePOET : public RInside {
public:
static RInsidePOET &getInstance() {
static RInsidePOET instance;
return instance;
}
RInsidePOET(RInsidePOET const &) = delete;
void operator=(RInsidePOET const &) = delete;
private:
RInsidePOET() : RInside(){};
};
#endif // RPOET_H_

91
include/poet/Rounding.hpp Normal file
View File

@ -0,0 +1,91 @@
#ifndef ROUNDING_H_
#define ROUNDING_H_
#include "LookupKey.hpp"
#include <cmath>
#include <cstdint>
namespace poet {
constexpr std::int8_t AQUEOUS_EXP = -13;
template <typename Input, typename Output, typename ConvertTo = double>
class IRounding {
public:
virtual Output round(const Input &, std::uint32_t signif) = 0;
virtual ConvertTo convert(const Output &) = 0;
};
class DHT_Rounder {
public:
Lookup_Keyelement round(const double &value, std::uint32_t signif,
bool is_ho) {
std::int8_t exp =
static_cast<std::int8_t>(std::floor(std::log10(std::fabs(value))));
if (!is_ho) {
if (exp < AQUEOUS_EXP) {
return {.sc_notation = {0, 0}};
}
std::int8_t diff = exp - signif + 1;
if (diff < AQUEOUS_EXP) {
signif -= AQUEOUS_EXP - diff;
}
}
Lookup_Keyelement elem;
elem.sc_notation.exp = exp;
elem.sc_notation.significant =
static_cast<std::int64_t>(value * std::pow(10, signif - exp - 1));
return elem;
}
double convert(const Lookup_Keyelement &key_elem) {
std::int32_t normalized_exp = static_cast<std::int32_t>(
-std::log10(std::fabs(key_elem.sc_notation.significant)));
// add stored exponent to normalized exponent
normalized_exp += key_elem.sc_notation.exp;
// return significant times 10 to the power of exponent
return key_elem.sc_notation.significant * std::pow(10., normalized_exp);
}
};
class PHT_Rounder : public IRounding<Lookup_Keyelement, Lookup_Keyelement> {
public:
Lookup_Keyelement round(const Lookup_Keyelement &value,
std::uint32_t signif) {
Lookup_Keyelement new_val = value;
std::uint32_t diff_signif =
static_cast<std::uint32_t>(
std::ceil(std::log10(value.sc_notation.significant))) -
signif;
new_val.sc_notation.significant = static_cast<int64_t>(
value.sc_notation.significant / std::pow(10., diff_signif));
if (new_val.sc_notation.significant == 0) {
new_val.sc_notation.exp = 0;
}
return new_val;
}
double convert(const Lookup_Keyelement &key_elem) {
std::int32_t normalized_exp = static_cast<std::int32_t>(
-std::log10(std::fabs(key_elem.sc_notation.significant)));
// add stored exponent to normalized exponent
normalized_exp += key_elem.sc_notation.exp;
// return significant times 10 to the power of exponent
return key_elem.sc_notation.significant * std::pow(10., normalized_exp);
}
};
} // namespace poet
#endif // ROUNDING_H_

View File

@ -26,15 +26,16 @@
#include <string_view>
#include <vector>
#include "ChemistryModule.hpp"
#include "argh.hpp" // Argument handler https://github.com/adishavit/argh
#include <RInside.h>
#include <Rcpp.h>
// BSD-licenced
/** Standard DHT Size. Defaults to 1 GB (1000 MB) */
constexpr uint32_t DHT_SIZE_PER_PROCESS_MB = 1E3;
constexpr uint32_t DHT_SIZE_PER_PROCESS_MB = 1.5E3;
/** Standard work package size */
#define WORK_PACKAGE_SIZE_DEFAULT 5
#define WORK_PACKAGE_SIZE_DEFAULT 32
namespace poet {
@ -67,6 +68,9 @@ typedef struct {
unsigned int wp_size;
/** indicates if resulting grid should be stored after every iteration */
bool store_result;
/** indicating whether the progress bar during chemistry simulation should be
* printed or not */
bool print_progressbar;
} t_simparams;
using GridParams = struct s_GridParams {
@ -190,16 +194,6 @@ public:
*/
auto getDHTSignifVector() const { return this->dht_signif_vector; };
/**
* @brief Get the DHT_Prop_Type_Vector
*
* Returns a vector indicating of which type a variable of a grid cell is.
*
* @return std::vector<std::string> Vector if strings defining a type of a
* variable
*/
auto getDHTPropTypeVector() const { return this->dht_prop_type_vector; };
/**
* @brief Return name of DHT snapshot.
*
@ -233,7 +227,8 @@ public:
private:
std::list<std::string> validateOptions(argh::parser cmdl);
const std::set<std::string> flaglist{"ignore-result", "dht", "dht-nolog"};
const std::set<std::string> flaglist{"ignore-result", "dht", "dht-nolog", "P",
"progress"};
const std::set<std::string> paramlist{"work-package-size", "dht-signif",
"dht-strategy", "dht-size",
"dht-snaps", "dht-file"};
@ -241,7 +236,6 @@ private:
t_simparams simparams;
std::vector<uint32_t> dht_signif_vector;
std::vector<uint32_t> dht_prop_type_vector;
std::string dht_file;
std::string filesim;

View File

@ -1,6 +1,4 @@
add_subdirectory(DataStructures)
file(GLOB poet_lib_SRC
file(GLOB_RECURSE poet_lib_SRC
CONFIGURE_DEPENDS
"*.cpp" "*.c")
@ -10,7 +8,15 @@ find_library(CRYPTO_LIBRARY crypto)
add_library(poet_lib ${poet_lib_SRC})
target_include_directories(poet_lib PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(poet_lib PUBLIC
MPI::MPI_CXX ${MATH_LIBRARY} RRuntime tug PhreeqcRM DataStructures ChemistryModule)
MPI::MPI_CXX ${MATH_LIBRARY} RRuntime PhreeqcRM tug)
target_compile_definitions(poet_lib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
add_subdirectory(ChemistryModule)
mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)
set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE)
option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
if(POET_DHT_DEBUG)
target_compile_definitions(poet_lib PRIVATE DHT_STATISTICS)
endif()

View File

@ -4,6 +4,7 @@
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <map>
#include <mpi.h>
@ -12,8 +13,10 @@
#include <utility>
#include <vector>
#ifndef POET_USE_PRM
constexpr uint32_t MB_FACTOR = 1E6;
poet::ChemistryModule::ChemistryModule(uint32_t nxyz, uint32_t wp_size,
std::uint32_t maxiter,
MPI_Comm communicator)
: PhreeqcRM(nxyz, 1), group_comm(communicator), wp_size(wp_size) {
@ -25,7 +28,10 @@ poet::ChemistryModule::ChemistryModule(uint32_t nxyz, uint32_t wp_size,
if (!is_sequential && is_master) {
MPI_Bcast(&wp_size, 1, MPI_UINT32_T, 0, this->group_comm);
MPI_Bcast(&maxiter, 1, MPI_UINT32_T, 0, this->group_comm);
}
this->file_pad = std::ceil(std::log10(maxiter + 1));
}
poet::ChemistryModule::~ChemistryModule() {
@ -39,13 +45,13 @@ poet::ChemistryModule::createWorker(MPI_Comm communicator) {
uint32_t wp_size;
MPI_Bcast(&wp_size, 1, MPI_UINT32_T, 0, communicator);
return ChemistryModule(wp_size, wp_size, communicator);
std::uint32_t maxiter;
MPI_Bcast(&maxiter, 1, MPI_UINT32_T, 0, communicator);
return ChemistryModule(wp_size, wp_size, maxiter, communicator);
}
#endif
void poet::ChemistryModule::RunInitFile(const std::string &input_script_path) {
#ifndef POET_USE_PRM
if (this->is_master) {
int f_type = CHEM_INIT;
PropagateFunctionType(f_type);
@ -54,7 +60,6 @@ void poet::ChemistryModule::RunInitFile(const std::string &input_script_path) {
ChemBCast(&count, 1, MPI_INT);
ChemBCast(const_cast<char *>(input_script_path.data()), count, MPI_CHAR);
}
#endif
this->RunFile(true, true, false, input_script_path);
this->RunString(true, false, false, "DELETE; -all; PRINT; -warnings 0;");
@ -69,7 +74,6 @@ void poet::ChemistryModule::RunInitFile(const std::string &input_script_path) {
char equilibrium = (speciesPerModule[3] == 0 ? -1 : 1);
char surface = (speciesPerModule[4] == 0 ? -1 : 1);
#ifdef POET_USE_PRM
std::vector<int> ic1;
ic1.resize(this->nxyz * 7, -1);
// TODO: hardcoded reaction modules
@ -84,25 +88,8 @@ void poet::ChemistryModule::RunInitFile(const std::string &input_script_path) {
}
this->InitialPhreeqc2Module(ic1);
#else
std::vector<int> ic1;
ic1.resize(this->nxyz * 7, -1);
// TODO: hardcoded reaction modules
for (int i = 0; i < nxyz; i++) {
ic1[i] = 1; // Solution 1
ic1[nxyz + i] = equilibrium; // Equilibrium 1
ic1[2 * nxyz + i] = exchange; // Exchange none
ic1[3 * nxyz + i] = surface; // Surface none
ic1[4 * nxyz + i] = -1; // Gas phase none
ic1[5 * nxyz + i] = -1; // Solid solutions none
ic1[6 * nxyz + i] = kinetics; // Kinetics 1
}
this->InitialPhreeqc2Module(ic1);
#endif
}
#ifndef POET_USE_PRM
void poet::ChemistryModule::initializeField(const Field &trans_field) {
if (is_master) {
@ -145,7 +132,9 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
}
// now sort the new values
std::sort(new_solution_names.begin() + 4, new_solution_names.end());
std::sort(new_solution_names.begin() + 3, new_solution_names.end());
this->SetPOETSolutionNames(new_solution_names);
this->speciesPerModule[0] = new_solution_names.size();
// and append other processes than solutions
std::vector<std::string> new_prop_names = new_solution_names;
@ -156,8 +145,6 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
this->prop_names = std::move(new_prop_names);
this->prop_count = prop_names.size();
this->SetPOETSolutionNames(new_solution_names);
if (is_master) {
this->n_cells = trans_field.GetRequestedVecSize();
chem_field = Field(n_cells);
@ -174,37 +161,58 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
std::vector<std::vector<double>> initial_values;
for (int i = 0; i < phreeqc_init.size(); i++) {
for (const auto &vec : trans_field.As2DVector()) {
initial_values.push_back(vec);
}
this->base_totals = {initial_values.at(0).at(0),
initial_values.at(1).at(0)};
ChemBCast(base_totals.data(), 2, MPI_DOUBLE);
for (int i = speciesPerModule[0]; i < phreeqc_init.size(); i++) {
std::vector<double> init(n_cells, phreeqc_init[i]);
initial_values.push_back(std::move(init));
}
chem_field.InitFromVec(initial_values, prop_names);
} else {
ChemBCast(base_totals.data(), 2, MPI_DOUBLE);
}
}
void poet::ChemistryModule::SetDHTEnabled(
bool enable, uint32_t size_mb,
bool enable, std::uint64_t size_mb,
const std::vector<std::string> &key_species) {
constexpr uint32_t MB_FACTOR = 1E6;
std::vector<std::uint32_t> key_inidices;
std::vector<std::int32_t> key_inidices;
std::vector<std::string> worker_copy;
if (this->is_master) {
int ftype = CHEM_DHT_ENABLE;
PropagateFunctionType(ftype);
ChemBCast(&enable, 1, MPI_CXX_BOOL);
ChemBCast(&size_mb, 1, MPI_UINT32_T);
ChemBCast(&size_mb, 1, MPI_UINT64_T);
// HACK: ugh, another problem with const qualifiers ...
auto non_const_keys = key_species;
BCastStringVec(non_const_keys);
int vec_size = key_species.size();
ChemBCast(&vec_size, 1, MPI_INT);
key_inidices = parseDHTSpeciesVec(key_species);
int vec_size = key_inidices.size();
ChemBCast(&vec_size, 1, MPI_INT);
ChemBCast(key_inidices.data(), key_inidices.size(), MPI_UINT32_T);
ChemBCast(key_inidices.data(), key_inidices.size(), MPI_INT32_T);
} else {
BCastStringVec(worker_copy);
int vec_size;
ChemBCast(&vec_size, 1, MPI_INT);
key_inidices.resize(vec_size);
ChemBCast(key_inidices.data(), vec_size, MPI_UINT32_T);
ChemBCast(key_inidices.data(), vec_size, MPI_INT32_T);
}
this->dht_enabled = enable;
@ -224,17 +232,17 @@ void poet::ChemistryModule::SetDHTEnabled(
delete this->dht;
}
const uint32_t dht_size = size_mb * MB_FACTOR;
const std::uint64_t dht_size = size_mb * MB_FACTOR;
this->dht =
new DHT_Wrapper(dht_comm, dht_size, key_inidices, this->prop_count);
// this->dht->setBaseTotals(this->base_totals);
this->dht = new DHT_Wrapper(dht_comm, dht_size, std::move(key_inidices),
std::move(worker_copy), this->prop_count);
this->dht->setBaseTotals(base_totals.at(0), base_totals.at(1));
}
}
inline std::vector<std::uint32_t> poet::ChemistryModule::parseDHTSpeciesVec(
inline std::vector<std::int32_t> poet::ChemistryModule::parseDHTSpeciesVec(
const std::vector<std::string> &species_vec) const {
std::vector<uint32_t> species_indices;
std::vector<int32_t> species_indices;
if (species_vec.empty()) {
species_indices.resize(this->prop_count);
@ -250,8 +258,8 @@ inline std::vector<std::uint32_t> poet::ChemistryModule::parseDHTSpeciesVec(
for (const auto &name : species_vec) {
auto it = std::find(this->prop_names.begin(), this->prop_names.end(), name);
if (it == prop_names.end()) {
throw std::runtime_error(
"DHT species name was not found in prop name vector!");
species_indices.push_back(DHT_Wrapper::DHT_KEY_INPUT_CUSTOM);
continue;
}
const std::uint32_t index = it - prop_names.begin();
species_indices.push_back(index);
@ -260,6 +268,32 @@ inline std::vector<std::uint32_t> poet::ChemistryModule::parseDHTSpeciesVec(
return species_indices;
}
void poet::ChemistryModule::BCastStringVec(std::vector<std::string> &io) {
if (this->is_master) {
int vec_size = io.size();
ChemBCast(&vec_size, 1, MPI_INT);
for (const auto &value : io) {
int buf_size = value.size() + 1;
ChemBCast(&buf_size, 1, MPI_INT);
ChemBCast(const_cast<char *>(value.c_str()), buf_size, MPI_CHAR);
}
} else {
int vec_size;
ChemBCast(&vec_size, 1, MPI_INT);
io.resize(vec_size);
for (int i = 0; i < vec_size; i++) {
int buf_size;
ChemBCast(&buf_size, 1, MPI_INT);
char buf[buf_size];
ChemBCast(buf, buf_size, MPI_CHAR);
io[i] = std::string{buf};
}
}
}
void poet::ChemistryModule::SetDHTSnaps(int type, const std::string &out_dir) {
if (this->is_master) {
int ftype = CHEM_DHT_SNAPS;
@ -297,23 +331,6 @@ void poet::ChemistryModule::SetDHTSignifVector(
this->dht->SetSignifVector(signif_vec);
}
void poet::ChemistryModule::SetDHTPropTypeVector(
std::vector<uint32_t> proptype_vec) {
if (this->is_master) {
if (proptype_vec.size() != this->prop_count) {
throw std::runtime_error("Prop type vector sizes mismatches prop count.");
}
int ftype = CHEM_DHT_PROP_TYPE_VEC;
PropagateFunctionType(ftype);
ChemBCast(proptype_vec.data(), proptype_vec.size(), MPI_UINT32_T);
return;
}
this->dht->SetPropTypeVector(proptype_vec);
}
void poet::ChemistryModule::ReadDHTFile(const std::string &input_file) {
if (this->is_master) {
int ftype = CHEM_DHT_READ_FILE;
@ -368,27 +385,3 @@ void poet::ChemistryModule::unshuffleField(const std::vector<double> &in_buffer,
}
}
}
#else // POET_USE_PRM
inline void poet::ChemistryModule::PrmToPoetField(std::vector<double> &field) {
this->getDumpedField(field);
}
void poet::ChemistryModule::RunCells() {
PhreeqcRM::RunCells();
std::vector<double> tmp_field;
PrmToPoetField(tmp_field);
this->field = tmp_field;
for (uint32_t i = 0; i < field.size(); i++) {
uint32_t back_i = static_cast<uint32_t>(i / this->nxyz);
uint32_t mod_i = i % this->nxyz;
field[i] = tmp_field[back_i + (mod_i * this->prop_count)];
}
}
#endif

View File

@ -1,226 +0,0 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/DHT_Wrapper.hpp"
#include "poet/DHT_Types.hpp"
#include "poet/HashFunctions.hpp"
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <math.h>
#include <stdexcept>
#include <vector>
using namespace poet;
using namespace std;
inline DHT_SCNotation round_key_element(double value, std::uint32_t signif) {
std::int8_t exp =
static_cast<std::int8_t>(std::floor(std::log10(std::fabs(value))));
std::int64_t significant =
static_cast<std::int64_t>(value * std::pow(10, signif - exp - 1));
DHT_SCNotation elem;
elem.exp = exp;
elem.significant = significant;
return elem;
}
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, uint32_t dht_size,
const std::vector<std::uint32_t> &key_indices,
uint32_t data_count)
: key_count(key_indices.size()), data_count(data_count),
input_key_elements(key_indices) {
// initialize DHT object
uint32_t key_size = (key_count + 1) * sizeof(DHT_Keyelement);
uint32_t data_size = data_count * sizeof(double);
uint32_t buckets_per_process = dht_size / (1 + data_size + key_size);
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
&poet::Murmur2_64A);
this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
this->dht_signif_vector[0] = DHT_KEY_SIGNIF_TOTALS;
this->dht_signif_vector[1] = DHT_KEY_SIGNIF_TOTALS;
this->dht_signif_vector[2] = DHT_KEY_SIGNIF_CHARGE;
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
this->dht_prop_type_vector[0] = DHT_TYPE_TOTAL;
this->dht_prop_type_vector[1] = DHT_TYPE_TOTAL;
this->dht_prop_type_vector[2] = DHT_TYPE_CHARGE;
}
DHT_Wrapper::~DHT_Wrapper() {
// free DHT
DHT_free(dht_object, NULL, NULL);
}
auto DHT_Wrapper::checkDHT(int length, double dt,
const std::vector<double> &work_package,
std::vector<std::uint32_t> &curr_mapping)
-> const poet::DHT_ResultObject & {
dht_results.length = length;
dht_results.keys.resize(length);
dht_results.results.resize(length);
dht_results.needPhreeqc.resize(length);
std::vector<std::uint32_t> new_mapping;
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// point to current grid cell
void *key = (void *)&(work_package[i * this->data_count]);
auto &data = dht_results.results[i];
auto &key_vector = dht_results.keys[i];
data.resize(this->data_count);
key_vector = fuzzForDHT(this->key_count, key, dt);
// overwrite input with data from DHT, IF value is found in DHT
int res = DHT_read(this->dht_object, key_vector.data(), data.data());
switch (res) {
case DHT_SUCCESS:
dht_results.needPhreeqc[i] = false;
this->dht_hits++;
break;
case DHT_READ_MISS:
dht_results.needPhreeqc[i] = true;
new_mapping.push_back(curr_mapping[i]);
this->dht_miss++;
break;
}
}
curr_mapping = std::move(new_mapping);
return dht_results;
}
void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// If true grid cell was simulated, needs to be inserted into dht
if (dht_results.needPhreeqc[i]) {
const auto &key = dht_results.keys[i];
void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
// insert simulated data with fuzzed key into DHT
int res = DHT_write(this->dht_object, (void *)key.data(), data);
// if data was successfully written ...
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
dht_evictions++;
}
}
}
}
void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
for (int i = 0; i < dht_results.length; i++) {
if (!dht_results.needPhreeqc[i]) {
std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
work_package.begin() + (data_count * i));
}
}
}
int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename);
return res;
}
int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
return res;
#ifdef DHT_STATISTICS
DHT_print_statistics(dht_object);
#endif
return DHT_SUCCESS;
}
void DHT_Wrapper::printStatistics() {
int res;
res = DHT_print_statistics(dht_object);
if (res != DHT_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
std::vector<DHT_Keyelement> DHT_Wrapper::fuzzForDHT(int var_count, void *key,
double dt) {
constexpr double zero_val = 10E-14;
std::vector<DHT_Keyelement> vecFuzz(var_count + 1);
std::memset(&vecFuzz[0], 0, sizeof(DHT_Keyelement) * var_count);
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < input_key_elements.size(); i++) {
double curr_key = ((double *)key)[input_key_elements[i]];
if (curr_key != 0) {
if (curr_key < zero_val &&
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) {
continue;
}
if (this->dht_prop_type_vector[i] == DHT_TYPE_IGNORE) {
continue;
}
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
curr_key -= base_totals[totals_i++];
}
vecFuzz[i].sc_notation = round_key_element(curr_key, dht_signif_vector[i]);
}
}
// if timestep differs over iterations set current current time step at the
// end of fuzzing buffer
vecFuzz[var_count].fp_elemet = dt;
return vecFuzz;
}
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec) {
if (signif_vec.size() != this->key_count) {
throw std::runtime_error(
"Significant vector size mismatches count of key elements.");
}
this->dht_signif_vector = signif_vec;
}
void poet::DHT_Wrapper::SetPropTypeVector(std::vector<uint32_t> prop_type_vec) {
if (prop_type_vec.size() != this->key_count) {
throw std::runtime_error(
"Prop type vector size mismatches count of key elements.");
}
this->dht_prop_type_vector = prop_type_vec;
}

View File

@ -2,6 +2,7 @@
#include "poet/ChemistryModule.hpp"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mpi.h>
#include <stdexcept>
@ -62,19 +63,71 @@ std::vector<double> poet::ChemistryModule::GetWorkerIdleTimings() const {
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTHits() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_HITS);
}
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTMiss() const {
int type = CHEM_PERF;
type = WORKER_DHT_HITS;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_MISS);
MPI_Status probe;
MPI_Probe(MPI_ANY_SOURCE, WORKER_DHT_HITS, this->group_comm, &probe);
int count;
MPI_Get_count(&probe, MPI_UINT32_T, &count);
std::vector<uint32_t> ret(count);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE, WORKER_DHT_HITS,
this->group_comm, NULL);
return ret;
}
std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerMetrics(WORKER_DHT_EVICTIONS);
type = WORKER_DHT_EVICTIONS;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
MPI_Status probe;
MPI_Probe(MPI_ANY_SOURCE, WORKER_DHT_EVICTIONS, this->group_comm, &probe);
int count;
MPI_Get_count(&probe, MPI_UINT32_T, &count);
std::vector<uint32_t> ret(count);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE, WORKER_DHT_EVICTIONS,
this->group_comm, NULL);
return ret;
}
inline std::vector<double> shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop,
uint32_t prop_count,
uint32_t wp_count) {
std::vector<double> out_buffer(in_field.size());
uint32_t write_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_buffer[(write_i * prop_count) + k] =
in_field[(k * size_per_prop) + j];
}
write_i++;
}
}
return out_buffer;
}
inline void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count, std::vector<double> &out_field) {
uint32_t read_i = 0;
for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) {
out_field[(k * size_per_prop) + j] =
in_buffer[(read_i * prop_count) + k];
}
read_i++;
}
}
}
inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) {
@ -190,12 +243,15 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
}
void poet::ChemistryModule::RunCells() {
double start_t{MPI_Wtime()};
if (this->is_sequential) {
MasterRunSequential();
return;
}
MasterRunParallel();
double end_t{MPI_Wtime()};
this->chem_t += end_t - start_t;
}
void poet::ChemistryModule::MasterRunSequential() {
@ -211,7 +267,6 @@ void poet::ChemistryModule::MasterRunSequential() {
void poet::ChemistryModule::MasterRunParallel() {
/* declare most of the needed variables here */
double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
double sim_e_chemistry, sim_f_chemistry;
@ -227,9 +282,6 @@ void poet::ChemistryModule::MasterRunParallel() {
double dt = this->PhreeqcRM::GetTimeStep();
static uint32_t iteration = 0;
/* start time measurement of whole chemistry simulation */
chem_a = MPI_Wtime();
/* start time measurement of sequential part */
seq_a = MPI_Wtime();
@ -240,7 +292,7 @@ void poet::ChemistryModule::MasterRunParallel() {
// grid.shuffleAndExport(mpi_buffer);
std::vector<double> mpi_buffer =
shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count,
wp_sizes_vector.size());
wp_sizes_vector.size());
/* setup local variables */
pkg_to_send = wp_sizes_vector.size();
@ -263,7 +315,9 @@ void poet::ChemistryModule::MasterRunParallel() {
// while there are still packages to recv
while (pkg_to_recv > 0) {
// print a progressbar to stdout
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
if (print_progessbar) {
printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size());
}
// while there are still packages to send
if (pkg_to_send > 0) {
// send packages to all free workers ...
@ -288,7 +342,7 @@ void poet::ChemistryModule::MasterRunParallel() {
// grid.importAndUnshuffle(mpi_buffer);
std::vector<double> out_vec{mpi_buffer};
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
wp_sizes_vector.size(), out_vec);
wp_sizes_vector.size(), out_vec);
chem_field.SetFromVector(out_vec);
/* do master stuff */
@ -306,8 +360,6 @@ void poet::ChemistryModule::MasterRunParallel() {
for (int i = 1; i < this->comm_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, LOOP_END, this->group_comm);
}
chem_b = MPI_Wtime();
this->chem_t += chem_b - chem_a;
this->simtime += dt;
iteration++;
@ -322,7 +374,8 @@ std::vector<uint32_t>
poet::ChemistryModule::CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const {
bool mod_pkgs = (n_cells % wp_size) != 0;
uint32_t n_packages = (uint32_t)(n_cells / wp_size) + mod_pkgs;
uint32_t n_packages =
(uint32_t)(n_cells / wp_size) + static_cast<int>(mod_pkgs);
std::vector<uint32_t> wp_sizes_vector(n_packages, 0);

View File

@ -1,3 +1,4 @@
/// Time-stamp: "Last modified 2023-06-28 15:58:19 mluebke"
/*
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
**
@ -15,10 +16,12 @@
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <mpi.h>
#include <poet/DHT.h>
#include <inttypes.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -52,7 +55,8 @@ static int read_flag(char flag_byte) {
}
DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
unsigned int key_size, uint64_t (*hash_func)(int, const void *)) {
unsigned int key_size,
uint64_t (*hash_func)(int, const void *)) {
DHT *object;
MPI_Win window;
void *mem_alloc;
@ -61,17 +65,20 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
// calculate how much bytes for the index are needed to address count of
// buckets per process
index_bytes = (int)ceil(log2(size));
if (index_bytes % 8 != 0) index_bytes = index_bytes + (8 - (index_bytes % 8));
if (index_bytes % 8 != 0)
index_bytes = index_bytes + (8 - (index_bytes % 8));
// allocate memory for dht-object
object = (DHT *)malloc(sizeof(DHT));
if (object == NULL) return NULL;
if (object == NULL)
return NULL;
// every memory allocation has 1 additional byte for flags etc.
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL,
&mem_alloc) != 0)
return NULL;
if (MPI_Comm_size(comm, &comm_size) != 0) return NULL;
if (MPI_Comm_size(comm, &comm_size) != 0)
return NULL;
// since MPI_Alloc_mem doesn't provide memory allocation with the memory set
// to zero, we're doing this here
@ -104,7 +111,8 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
DHT_stats *stats;
stats = (DHT_stats *)malloc(sizeof(DHT_stats));
if (stats == NULL) return NULL;
if (stats == NULL)
return NULL;
object->stats = stats;
object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
@ -118,7 +126,8 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
return object;
}
int DHT_write(DHT *table, void *send_key, void *send_data) {
int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
uint32_t *index) {
unsigned int dest_rank, i;
int result = DHT_SUCCESS;
@ -146,7 +155,8 @@ int DHT_write(DHT *table, void *send_key, void *send_data) {
1 + table->data_size + table->key_size, MPI_BYTE,
table->window) != 0)
return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
// increment eviction counter if receiving key doesn't match sending key
// entry has write flag and last index is reached.
@ -178,12 +188,21 @@ int DHT_write(DHT *table, void *send_key, void *send_data) {
table->window) != 0)
return DHT_MPI_ERROR;
// unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_unlock(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
if (proc) {
*proc = dest_rank;
}
if (index) {
*index = table->index[i];
}
return result;
}
int DHT_read(DHT *table, void *send_key, void *destination) {
int DHT_read(DHT *table, const void *send_key, void *destination) {
unsigned int dest_rank, i;
#ifdef DHT_STATISTICS
@ -205,7 +224,8 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
1 + table->data_size + table->key_size, MPI_BYTE,
table->window) != 0)
return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_flush(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
// increment read error counter if write flag isn't set ...
if ((read_flag(*(char *)table->recv_entry)) == 0) {
@ -214,7 +234,8 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
table->stats->read_misses += 1;
#endif
// unlock window and return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_unlock(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
return DHT_READ_MISS;
}
@ -227,7 +248,8 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
table->stats->read_misses += 1;
#endif
// unlock window an return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_unlock(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
return DHT_READ_MISS;
}
} else
@ -235,7 +257,8 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
}
// unlock window of target rank
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
if (MPI_Win_unlock(dest_rank, table->window) != 0)
return DHT_MPI_ERROR;
// if matching key was found copy data into memory of passed pointer
memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1,
@ -257,17 +280,15 @@ int DHT_to_file(DHT *table, const char *filename) {
// write header (key_size and data_size)
if (rank == 0) {
if (MPI_File_write(file, &table->key_size, 1, MPI_INT, MPI_STATUS_IGNORE) !=
0)
if (MPI_File_write_shared(file, &table->key_size, 1, MPI_INT,
MPI_STATUS_IGNORE) != 0)
return DHT_FILE_WRITE_ERROR;
if (MPI_File_write(file, &table->data_size, 1, MPI_INT,
MPI_STATUS_IGNORE) != 0)
if (MPI_File_write_shared(file, &table->data_size, 1, MPI_INT,
MPI_STATUS_IGNORE) != 0)
return DHT_FILE_WRITE_ERROR;
}
// seek file pointer behind header for all processes
if (MPI_File_seek_shared(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0)
return DHT_FILE_IO_ERROR;
MPI_Barrier(table->communicator);
char *ptr;
int bucket_size = table->key_size + table->data_size + 1;
@ -283,8 +304,12 @@ int DHT_to_file(DHT *table, const char *filename) {
return DHT_FILE_WRITE_ERROR;
}
}
MPI_Barrier(table->communicator);
// close file
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
if (MPI_File_close(&file) != 0)
return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
}
@ -303,7 +328,8 @@ int DHT_from_file(DHT *table, const char *filename) {
return DHT_FILE_IO_ERROR;
// get file size
if (MPI_File_get_size(file, &f_size) != 0) return DHT_FILE_IO_ERROR;
if (MPI_File_get_size(file, &f_size) != 0)
return DHT_FILE_IO_ERROR;
MPI_Comm_rank(table->communicator, &rank);
@ -322,8 +348,10 @@ int DHT_from_file(DHT *table, const char *filename) {
return DHT_FILE_READ_ERROR;
// compare if written header data and key size matches current sizes
if (*(int *)buffer != table->key_size) return DHT_WRONG_FILE;
if (*(int *)(buffer + 4) != table->data_size) return DHT_WRONG_FILE;
if (*(int *)buffer != table->key_size)
return DHT_WRONG_FILE;
if (*(int *)(buffer + 4) != table->data_size)
return DHT_WRONG_FILE;
// set offset for each process
offset = bucket_size * table->comm_size;
@ -348,14 +376,16 @@ int DHT_from_file(DHT *table, const char *filename) {
// extract key and data and write to DHT
key = buffer;
data = (buffer + table->key_size);
if (DHT_write(table, key, data) == DHT_MPI_ERROR) return DHT_MPI_ERROR;
if (DHT_write(table, key, data, NULL, NULL) == DHT_MPI_ERROR)
return DHT_MPI_ERROR;
// increment current position
cur_pos += offset;
}
free(buffer);
if (MPI_File_close(&file) != 0) return DHT_FILE_IO_ERROR;
if (MPI_File_close(&file) != 0)
return DHT_FILE_IO_ERROR;
return DHT_SUCCESS;
}
@ -377,8 +407,10 @@ int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
return DHT_MPI_ERROR;
*readerror_counter = buf;
}
if (MPI_Win_free(&(table->window)) != 0) return DHT_MPI_ERROR;
if (MPI_Free_mem(table->mem_alloc) != 0) return DHT_MPI_ERROR;
if (MPI_Win_free(&(table->window)) != 0)
return DHT_MPI_ERROR;
if (MPI_Free_mem(table->mem_alloc) != 0)
return DHT_MPI_ERROR;
free(table->recv_entry);
free(table->send_entry);
free(table->index);
@ -407,7 +439,8 @@ int DHT_print_statistics(DHT *table) {
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// obtaining all values from all processes in the communicator
if (rank == 0) read_misses = (int *)malloc(table->comm_size * sizeof(int));
if (rank == 0)
read_misses = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1,
MPI_INT, 0, table->communicator) != 0)
return DHT_MPI_ERROR;
@ -416,7 +449,8 @@ int DHT_print_statistics(DHT *table) {
return DHT_MPI_ERROR;
table->stats->read_misses = 0;
if (rank == 0) evictions = (int *)malloc(table->comm_size * sizeof(int));
if (rank == 0)
evictions = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
@ -425,7 +459,8 @@ int DHT_print_statistics(DHT *table) {
return DHT_MPI_ERROR;
table->stats->evictions = 0;
if (rank == 0) w_access = (int *)malloc(table->comm_size * sizeof(int));
if (rank == 0)
w_access = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
@ -434,7 +469,8 @@ int DHT_print_statistics(DHT *table) {
return DHT_MPI_ERROR;
table->stats->w_access = 0;
if (rank == 0) r_access = (int *)malloc(table->comm_size * sizeof(int));
if (rank == 0)
r_access = (int *)malloc(table->comm_size * sizeof(int));
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
table->communicator) != 0)
return DHT_MPI_ERROR;
@ -443,13 +479,14 @@ int DHT_print_statistics(DHT *table) {
return DHT_MPI_ERROR;
table->stats->r_access = 0;
if (rank == 0) written_buckets = (int *)calloc(table->comm_size, sizeof(int));
if (rank == 0)
written_buckets = (int *)calloc(table->comm_size, sizeof(int));
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size,
MPI_INT, MPI_SUM, 0, table->communicator) != 0)
return DHT_MPI_ERROR;
if (rank == 0) { // only process with rank 0 will print out results as a
// table
if (rank == 0) { // only process with rank 0 will print out results as a
// table
int sum_written_buckets = 0;
for (int i = 0; i < table->comm_size; i++) {

View File

@ -0,0 +1,295 @@
// Time-stamp: "Last modified 2023-07-26 10:35:30 mluebke"
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/DHT_Wrapper.hpp"
#include "poet/DHT_Types.hpp"
#include "poet/HashFunctions.hpp"
#include "poet/LookupKey.hpp"
#include "poet/Rounding.hpp"
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <vector>
using namespace std;
namespace poet {
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
const std::vector<std::int32_t> &key_indices,
const std::vector<std::string> &key_names,
uint32_t data_count)
: key_count(key_indices.size()), data_count(data_count),
input_key_elements(key_indices), communicator(dht_comm),
key_names(key_names) {
// initialize DHT object
// key size = count of key elements + timestep
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
uint32_t data_size =
(data_count + input_key_elements.size()) * sizeof(double);
uint32_t buckets_per_process =
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
&poet::Murmur2_64A);
this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
auto tot_h = std::find(key_names.begin(), key_names.end(), "H");
if (tot_h != key_names.end()) {
this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto tot_o = std::find(key_names.begin(), key_names.end(), "O");
if (tot_o != key_names.end()) {
this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto charge = std::find(key_names.begin(), key_names.end(), "Charge");
if (charge != key_names.end()) {
this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE;
}
}
DHT_Wrapper::~DHT_Wrapper() {
// free DHT
DHT_free(dht_object, NULL, NULL);
}
auto DHT_Wrapper::checkDHT(int length, double dt,
const std::vector<double> &work_package,
std::vector<std::uint32_t> &curr_mapping)
-> const DHT_ResultObject & {
dht_results.length = length;
dht_results.keys.resize(length);
dht_results.results.resize(length);
dht_results.needPhreeqc.resize(length);
std::vector<double> bucket_writer(this->data_count +
input_key_elements.size());
std::vector<std::uint32_t> new_mapping;
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// point to current grid cell
void *key = (void *)&(work_package[i * this->data_count]);
auto &data = dht_results.results[i];
auto &key_vector = dht_results.keys[i];
// data.resize(this->data_count);
key_vector = fuzzForDHT(this->key_count, key, dt);
// overwrite input with data from DHT, IF value is found in DHT
int res =
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
switch (res) {
case DHT_SUCCESS:
dht_results.results[i] = inputAndRatesToOutput(bucket_writer);
dht_results.needPhreeqc[i] = false;
this->dht_hits++;
break;
case DHT_READ_MISS:
dht_results.needPhreeqc[i] = true;
new_mapping.push_back(curr_mapping[i]);
dht_results.results[i] = std::vector<double>{
&work_package[i * this->data_count],
&work_package[i * this->data_count] + this->data_count};
// HACK: apply normalization to total H and O in results field of DHT
// dht_results.results[i][0] -= base_totals[0];
// dht_results.results[i][1] -= base_totals[1];
break;
}
}
curr_mapping = std::move(new_mapping);
dht_results.old_values = work_package;
return dht_results;
}
void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
// loop over every grid cell contained in work package
dht_results.locations.resize(length);
for (int i = 0; i < length; i++) {
// If true grid cell was simulated, needs to be inserted into dht
if (dht_results.needPhreeqc[i]) {
// check if calcite or dolomite is absent and present, resp.n and vice
// versa in input/output. If this is the case -> Do not write to DHT!
// HACK: hardcoded, should be fixed!
if ((dht_results.old_values[i * this->data_count + 7] == 0) !=
(work_package[i * this->data_count + 7] == 0)) {
dht_results.needPhreeqc[i] = false;
continue;
}
if ((dht_results.old_values[i * this->data_count + 9] == 0) !=
(work_package[i * this->data_count + 9] == 0)) {
dht_results.needPhreeqc[i] = false;
continue;
}
uint32_t proc, index;
const auto &key = dht_results.keys[i];
const auto curr_old_data = std::vector<double>(
dht_results.old_values.begin() + (i * this->data_count),
dht_results.old_values.begin() + ((i + 1) * this->data_count));
const auto curr_new_data = std::vector<double>(
work_package.begin() + (i * this->data_count),
work_package.begin() + ((i + 1) * this->data_count));
const auto data = outputToInputAndRates(curr_old_data, curr_new_data);
// void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
// insert simulated data with fuzzed key into DHT
int res = DHT_write(this->dht_object, (void *)(key.data()),
const_cast<double *>(data.data()), &proc, &index);
dht_results.locations[i] = {proc, index};
// if data was successfully written ...
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
dht_evictions++;
}
}
}
}
std::vector<double>
DHT_Wrapper::outputToInputAndRates(const std::vector<double> &old_results,
const std::vector<double> &new_results) {
const int prefix_size = this->input_key_elements.size();
std::vector<double> output(prefix_size + this->data_count);
std::copy(new_results.begin(), new_results.end(),
output.begin() + prefix_size);
for (int i = 0; i < prefix_size; i++) {
const int data_elem_i = input_key_elements[i];
output[i] = old_results[data_elem_i];
output[prefix_size + data_elem_i] -= old_results[data_elem_i];
}
return output;
}
std::vector<double>
DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data) {
const int prefix_size = this->input_key_elements.size();
std::vector<double> output{dht_data.begin() + prefix_size, dht_data.end()};
for (int i = 0; i < prefix_size; i++) {
const int data_elem_i = input_key_elements[i];
output[data_elem_i] += dht_data[i];
}
return output;
}
void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
for (int i = 0; i < dht_results.length; i++) {
if (!dht_results.needPhreeqc[i]) {
std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
work_package.begin() + (data_count * i));
}
}
}
int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename);
return res;
}
int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
return res;
#ifdef DHT_STATISTICS
DHT_print_statistics(dht_object);
#endif
return DHT_SUCCESS;
}
void DHT_Wrapper::printStatistics() {
int res;
res = DHT_print_statistics(dht_object);
if (res != DHT_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
LookupKey DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
const Lookup_Keyelement dummy = {.0};
LookupKey vecFuzz(var_count + 1, dummy);
DHT_Rounder rounder;
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < input_key_elements.size(); i++) {
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) {
continue;
}
double curr_key = ((double *)key)[input_key_elements[i]];
if (curr_key != 0) {
if (curr_key < c_zero_val &&
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) {
continue;
}
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
curr_key -= base_totals[totals_i++];
}
vecFuzz[i] =
rounder.round(curr_key, dht_signif_vector[i],
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
}
}
// add timestep to the end of the key as double value
vecFuzz[var_count].fp_element = dt;
return vecFuzz;
}
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec) {
if (signif_vec.size() != this->key_count) {
throw std::runtime_error(
"Significant vector size mismatches count of key elements.");
}
this->dht_signif_vector = signif_vec;
}
} // namespace poet

View File

@ -1,7 +1,8 @@
// Time-stamp: "Last modified 2023-04-24 16:55:34 mluebke"
// Time-stamp: "Last modified 2023-07-27 15:21:16 mluebke"
#include "IrmResult.h"
#include "poet/ChemistryModule.hpp"
#include "poet/DHT_Wrapper.hpp"
#include <algorithm>
#include <cstddef>
@ -14,6 +15,8 @@
#include <string>
#include <vector>
namespace poet {
inline std::string get_string(int root, MPI_Comm communicator) {
int count;
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
@ -55,8 +58,8 @@ void poet::ChemistryModule::WorkerLoop() {
bool enable;
ChemBCast(&enable, 1, MPI_CXX_BOOL);
uint32_t size_mb;
ChemBCast(&size_mb, 1, MPI_UINT32_T);
std::uint64_t size_mb;
ChemBCast(&size_mb, 1, MPI_UINT64_T);
std::vector<std::string> name_dummy;
@ -69,13 +72,6 @@ void poet::ChemistryModule::WorkerLoop() {
SetDHTSignifVector(input_vec);
break;
}
case CHEM_DHT_PROP_TYPE_VEC: {
std::vector<uint32_t> input_vec(this->prop_count);
ChemBCast(input_vec.data(), this->prop_count, MPI_UINT32_T);
SetDHTPropTypeVector(input_vec);
break;
}
case CHEM_DHT_SNAPS: {
int type;
ChemBCast(&type, 1, MPI_INT);
@ -245,23 +241,25 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm,
MPI_STATUS_IGNORE);
if (this->dht_enabled) {
this->dht->printStatistics();
dht_hits.push_back(dht->getHits());
dht_evictions.push_back(dht->getEvictions());
dht->resetCounter();
if (this->dht_snaps_type == DHT_FILES_ITEREND) {
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
WorkerWriteDHTDump(iteration);
}
}
}
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
if (this->dht_enabled && this->dht_snaps_type == DHT_FILES_SIMEND) {
if (this->dht_enabled && this->dht_snaps_type == DHT_SNAPS_SIMEND) {
WorkerWriteDHTDump(iteration);
}
}
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
std::stringstream out;
out << this->dht_file_out_dir << "/iter_" << std::setfill('0') << std::setw(3)
<< iteration << ".dht";
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
<< std::setw(this->file_pad) << iteration << ".dht";
int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && this->comm_rank == 2)
std::cerr
@ -355,24 +353,37 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
}
void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
uint32_t value;
MPI_Comm worker_comm = dht->getCommunicator();
int worker_rank;
MPI_Comm_rank(worker_comm, &worker_rank);
MPI_Comm &group_comm = this->group_comm;
auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm](
std::vector<std::uint32_t> &send_buffer, int tag) {
std::vector<uint32_t> to_master(send_buffer.size());
MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(),
MPI_UINT32_T, MPI_SUM, 0, worker_comm);
if (worker_rank == 0) {
MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag,
group_comm);
}
};
switch (type) {
case WORKER_DHT_HITS: {
value = dht->getHits();
break;
}
case WORKER_DHT_MISS: {
value = dht->getMisses();
reduce_and_send(dht_hits, WORKER_DHT_HITS);
break;
}
case WORKER_DHT_EVICTIONS: {
value = dht->getEvictions();
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
break;
}
default: {
throw std::runtime_error("Unknown perf type in master's message.");
}
}
MPI_Gather(&value, 1, MPI_UINT32_T, NULL, 1, MPI_UINT32_T, 0,
this->group_comm);
}
} // namespace poet

View File

@ -1,6 +0,0 @@
file(GLOB DataStructures_SRC
CONFIGURE_DEPENDS
"*.cpp" "*.c")
add_library(DataStructures ${DataStructures_SRC})
target_include_directories(DataStructures PUBLIC ${PROJECT_SOURCE_DIR}/include)

View File

@ -87,15 +87,14 @@ poet::ChemistryParams::s_ChemistryParams(RInside &R) {
Rcpp::as<std::string>(R.parseEval("mysetup$chemistry$database"));
this->input_script =
Rcpp::as<std::string>(R.parseEval("mysetup$chemistry$input_script"));
if (Rcpp::as<bool>(
R.parseEval("'dht_species' %in% names(mysetup$chemistry)"))) {
this->dht_species = Rcpp::as<std::vector<std::string>>(
auto dht_species = Rcpp::as<Rcpp::NumericVector>(
R.parseEval("mysetup$chemistry$dht_species"));
}
if (Rcpp::as<bool>(
R.parseEval("'dht_signif' %in% names(mysetup$chemistry)"))) {
this->dht_signif = Rcpp::as<std::vector<std::uint32_t>>(
R.parseEval("mysetup$chemistry$dht_signif"));
this->dht_species = Rcpp::as<std::vector<std::string>>(dht_species.names());
this->dht_signif = Rcpp::as<std::vector<std::uint32_t>>(dht_species);
}
}
@ -141,6 +140,8 @@ int SimParams::parseFromCmdl(char *argv[], RInside &R) {
return poet::PARSER_ERROR;
}
simparams.print_progressbar = cmdl[{"P", "progress"}];
/*Parse DHT arguments*/
simparams.dht_enabled = cmdl["dht"];
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
@ -218,55 +219,7 @@ int SimParams::parseFromCmdl(char *argv[], RInside &R) {
return poet::PARSER_OK;
}
void SimParams::initVectorParams(RInside &R) {
if (simparams.dht_enabled) {
/*Load significance vector from R setup file (or set default)*/
bool signif_vector_exists = R.parseEval("exists('signif_vector')");
if (signif_vector_exists) {
dht_signif_vector = as<std::vector<uint32_t>>(R["signif_vector"]);
}
/*Load property type vector from R setup file (or set default)*/
bool prop_type_vector_exists = R.parseEval("exists('prop_type')");
if (prop_type_vector_exists) {
std::vector<std::string> prop_type_R =
as<std::vector<string>>(R["prop_type"]);
this->dht_prop_type_vector.clear();
this->dht_prop_type_vector.reserve(prop_type_R.size());
for (const auto &type : prop_type_R) {
if (type == "act") {
this->dht_prop_type_vector.push_back(DHT_TYPE_CHARGE);
continue;
}
if (type == "ignore") {
this->dht_prop_type_vector.push_back(DHT_TYPE_IGNORE);
continue;
}
this->dht_prop_type_vector.push_back(DHT_TYPE_DEFAULT);
}
}
if (simparams.world_rank == 0) {
// MDL: new output on signif_vector and prop_type
if (signif_vector_exists) {
cout << "CPP: using problem-specific rounding digits: " << endl;
R.parseEval("print(data.frame(prop=prop, type=prop_type, "
"digits=signif_vector))");
} else {
cout << "CPP: using DHT default rounding digits = "
<< simparams.dht_significant_digits << endl;
}
// MDL: pass to R the DHT stuff. These variables exist
// only if dht_enabled is true
R["dht_final_signif"] = dht_signif_vector;
R["dht_final_proptype"] = dht_prop_type_vector;
}
}
}
void SimParams::initVectorParams(RInside &R) {}
std::list<std::string> SimParams::validateOptions(argh::parser cmdl) {
/* store all unknown parameters here */