Compare commits

...

16 Commits

Author SHA1 Message Date
rastogi
5edb8dc63b Rollback error 2025-10-24 12:37:22 +02:00
rastogi
040ae1181b Modified IO functions 2025-10-24 12:36:59 +02:00
rastogi
8d04a675af Added rrmse threshold values 2025-10-24 12:36:47 +02:00
Max Lübke
8aa006862a feat(grid): enable cell_ID integration in chemistry data flow 2025-10-24 10:55:49 +02:00
rastogi
0d126b9b3b feat(control): dynamic prototype, penalty_iteration, error while disabling surrogate fixed 2025-10-24 10:55:49 +02:00
rastogi
101e31cc5d rollback implemented, triggered when MAPE exceeds epsilon 2025-10-24 10:55:49 +02:00
Max Lübke
91f5430381 cleanup computeStats() 2025-10-24 10:55:49 +02:00
Max Lübke
2b60f423fe fix mssing handling of different storage locatios of work packages 2025-10-24 10:55:49 +02:00
Max Lübke
934bf50527 formatting MasterFunctions.cpp 2025-10-24 10:55:49 +02:00
Max Lübke
425c3bcf78 stuff in computeStats 2025-10-24 10:55:49 +02:00
Max Lübke
c20aadd9ab Add fgcs dolo benchmark 2025-10-24 10:55:49 +02:00
Max Lübke
815cf7767d rename control work package and subsitute interpolated work package 2025-10-24 10:55:49 +02:00
rastogi
089f8863ac Initial control-loop setup. Unit tests for MAPE and RMSE added.
Note: computeStats function is not working correctly yet.
2025-10-24 10:55:49 +02:00
rastogi
6800335cc0 computeStats not working correctly, Unit Tests added 2025-10-24 10:55:49 +02:00
Max Lübke
30dd0b4290 feat: Implement checkpointing
Co-authored-by: hmars-t <hmars-t@users.noreply.github.com>
2025-10-24 10:55:49 +02:00
rastogi
573361cdbc Initial commit 2025-10-24 10:55:49 +02:00
17 changed files with 1232 additions and 417 deletions

View File

@ -60,6 +60,9 @@ pqc_to_grid <- function(pqc_mat, grid) {
# Convert the result matrix to a data frame # Convert the result matrix to a data frame
res_df <- as.data.frame(result_mat) res_df <- as.data.frame(result_mat)
# Add cell_ID column to beginning of res_df
res_df <- cbind(cell_ID = seq(0, nrow(res_df) - 1), res_df)
# Remove all columns which only contain NaN # Remove all columns which only contain NaN
# res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)] # res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)]

View File

@ -1,4 +1,4 @@
list( list(
timesteps = rep(50, 100), timesteps = rep(50, 100),
store_result = TRUE store_result = TRUE
) )

48
bench/dolo/dolo_fgcs.pqi Normal file
View File

@ -0,0 +1,48 @@
SOLUTION 1
units mol/kgw
water 1
temperature 25
pH 7
pe 4
PURE 1
Calcite 0.0 1
END
RUN_CELLS
-cells 1
END
COPY solution 1 2
#PURE 2
# O2g -0.1675 10
KINETICS 2
Calcite
-m 0.00207
-parms 0.05
-tol 1e-10
Dolomite
-m 0.0
-parms 0.01
-tol 1e-10
END
SOLUTION 3
units mol/kgw
water 1
temp 25
Mg 0.001
Cl 0.002
END
SOLUTION 4
units mol/kgw
water 1
temp 25
Mg 0.002
Cl 0.004
END
RUN_CELLS
-cells 2-4
END

116
bench/dolo/dolo_fgcs_3.R Normal file
View File

@ -0,0 +1,116 @@
rows <- 400
cols <- 400
grid_def <- matrix(2, nrow = rows, ncol = cols)
# Define grid configuration for POET model
grid_setup <- list(
pqc_in_file = "./dolo_fgcs.pqi",
pqc_db_file = "./phreeqc_kin.dat", # Path to the database file for Phreeqc
grid_def = grid_def, # Definition of the grid, containing IDs according to the Phreeqc input script
grid_size = c(5, 5), # Size of the grid in meters
constant_cells = c() # IDs of cells with constant concentration
)
bound_def_we <- list(
"type" = rep("constant", rows),
"sol_id" = rep(1, rows),
"cell" = seq(1, rows)
)
bound_def_ns <- list(
"type" = rep("constant", cols),
"sol_id" = rep(1, cols),
"cell" = seq(1, cols)
)
diffusion_setup <- list(
boundaries = list(
"W" = bound_def_we,
"E" = bound_def_we,
"N" = bound_def_ns,
"S" = bound_def_ns
),
inner_boundaries = list(
"row" = floor(rows / 2),
"col" = floor(cols / 2),
"sol_id" = c(3)
),
alpha_x = 1e-6,
alpha_y = 1e-6
)
check_sign_cal_dol_dht <- function(old, new) {
if ((old["Calcite"] == 0) != (new["Calcite"] == 0)) {
return(TRUE)
}
if ((old["Dolomite"] == 0) != (new["Dolomite"] == 0)) {
return(TRUE)
}
return(FALSE)
}
check_sign_cal_dol_interp <- function(to_interp, data_set) {
dht_species <- c(
"H" = 3,
"O" = 3,
"C" = 6,
"Ca" = 6,
"Cl" = 3,
"Mg" = 5,
"Calcite" = 4,
"Dolomite" = 4
)
data_set <- as.data.frame(do.call(rbind, data_set), check.names = FALSE, optional = TRUE)
names(data_set) <- names(dht_species)
cal <- (data_set$Calcite == 0) == (to_interp["Calcite"] == 0)
dol <- (data_set$Dolomite == 0) == (to_interp["Dolomite"] == 0)
cal_dol_same_sig <- cal == dol
return(rev(which(!cal_dol_same_sig)))
}
check_neg_cal_dol <- function(result) {
neg_sign <- (result["Calcite"] < 0) || (result["Dolomite"] < 0)
return(neg_sign)
}
# Optional when using Interpolation (example with less key species and custom
# significant digits)
pht_species <- c(
"C" = 3,
"Ca" = 3,
"Mg" = 3,
"Cl" = 3,
"Calcite" = 3,
"Dolomite" = 3
)
dht_species <- c(
"H" = 3,
"O" = 3,
"C" = 6,
"Ca" = 6,
"Cl" = 3,
"Mg" = 5,
"Calcite" = 4,
"Dolomite" = 4)
chemistry_setup <- list(
dht_species = dht_species,
pht_species = pht_species,
hooks = list(
dht_fill = check_sign_cal_dol_dht,
interp_pre = check_sign_cal_dol_interp,
interp_post = check_neg_cal_dol
)
)
# Define a setup list for simulation configuration
setup <- list(
Grid = grid_setup, # Parameters related to the grid structure
Diffusion = diffusion_setup, # Parameters related to the diffusion process
Chemistry = chemistry_setup # Parameters related to the chemistry process
)

View File

@ -1,8 +1,28 @@
include(FetchContent)
FetchContent_Declare(
cli11
QUIET
GIT_REPOSITORY https://github.com/CLIUtils/CLI11.git
GIT_TAG v2.4.2
)
FetchContent_Declare(
highfive
QUIET
GIT_REPOSITORY https://github.com/highfive-devs/highfive.git
GIT_TAG v3.0.0
)
FetchContent_MakeAvailable(cli11)
FetchContent_MakeAvailable(highfive)
add_library(POETLib add_library(POETLib
Init/InitialList.cpp Init/InitialList.cpp
Init/GridInit.cpp Init/GridInit.cpp
Init/DiffusionInit.cpp Init/DiffusionInit.cpp
Init/ChemistryInit.cpp Init/ChemistryInit.cpp
IO/checkpoint.cpp
IO/StatsIO.cpp
DataStructures/Field.cpp DataStructures/Field.cpp
Transport/DiffusionModule.cpp Transport/DiffusionModule.cpp
Chemistry/ChemistryModule.cpp Chemistry/ChemistryModule.cpp
@ -24,24 +44,16 @@ elseif (POET_TUG_APPROACH STREQUAL "Explicit")
target_compile_definitions(POETLib PRIVATE POET_TUG_FTCS) target_compile_definitions(POETLib PRIVATE POET_TUG_FTCS)
endif() endif()
target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}") target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_BINARY_DIR}")
target_link_libraries( target_link_libraries(
POETLib POETLib
PUBLIC RRuntime PUBLIC RRuntime
PUBLIC IPhreeqcPOET PUBLIC IPhreeqcPOET
PUBLIC tug PUBLIC tug
PUBLIC MPI::MPI_C PUBLIC MPI::MPI_C
PUBLIC HighFive::HighFive
) )
include(FetchContent)
FetchContent_Declare(
cli11
QUIET
GIT_REPOSITORY https://github.com/CLIUtils/CLI11.git
GIT_TAG v2.4.2
)
FetchContent_MakeAvailable(cli11)
# add_library(poetlib # add_library(poetlib
# Base/Grid.cpp # Base/Grid.cpp

View File

@ -12,6 +12,8 @@
#include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp" #include "SurrogateModels/Interpolation.hpp"
#include "poet.hpp"
#include "PhreeqcRunner.hpp" #include "PhreeqcRunner.hpp"
#include <array> #include <array>
#include <cstdint> #include <cstdint>
@ -21,390 +23,441 @@
#include <string> #include <string>
#include <vector> #include <vector>
namespace poet { namespace poet
/** {
* \brief Wrapper around PhreeqcRM to provide POET specific parallelization with
* easy access.
*/
class ChemistryModule {
public:
/** /**
* Creates a new instance of Chemistry module with given grid cell count, work * \brief Wrapper around PhreeqcRM to provide POET specific parallelization with
* package size and communicator. * easy access.
*
* This constructor shall only be called by the master. To create workers, see
* ChemistryModule::createWorker .
*
* When the use of parallelization is intended, the nxyz value shall be set to
* 1 to save memory and only one node is needed for initialization.
*
* \param nxyz Count of grid cells to allocate and initialize for each
* process. For parellel use set to 1 at the master.
* \param wp_size Count of grid cells to fill each work package at maximum.
* \param communicator MPI communicator to distribute work in.
*/ */
ChemistryModule(uint32_t wp_size, class ChemistryModule
const InitialList::ChemistryInit chem_params, {
MPI_Comm communicator); public:
/**
* Creates a new instance of Chemistry module with given grid cell count, work
* package size and communicator.
*
* This constructor shall only be called by the master. To create workers, see
* ChemistryModule::createWorker .
*
* When the use of parallelization is intended, the nxyz value shall be set to
* 1 to save memory and only one node is needed for initialization.
*
* \param nxyz Count of grid cells to allocate and initialize for each
* process. For parellel use set to 1 at the master.
* \param wp_size Count of grid cells to fill each work package at maximum.
* \param communicator MPI communicator to distribute work in.
*/
ChemistryModule(uint32_t wp_size,
const InitialList::ChemistryInit chem_params,
MPI_Comm communicator);
/** /**
* Deconstructor, which frees DHT data structure if used. * Deconstructor, which frees DHT data structure if used.
*/ */
~ChemistryModule(); ~ChemistryModule();
void masterSetField(Field field); void masterSetField(Field field);
/** /**
* Run the chemical simulation with parameters set. * Run the chemical simulation with parameters set.
*/ */
void simulate(double dt); void simulate(double dt);
/** /**
* Returns all known species names, including not only aqueous species, but * Returns all known species names, including not only aqueous species, but
* also equilibrium, exchange, surface and kinetic reactants. * also equilibrium, exchange, surface and kinetic reactants.
*/ */
// auto GetPropNames() const { return this->prop_names; } // auto GetPropNames() const { return this->prop_names; }
/** /**
* Return the accumulated runtime in seconds for chemical simulation. * Return the accumulated runtime in seconds for chemical simulation.
*/ */
auto GetChemistryTime() const { return this->chem_t; } auto GetChemistryTime() const { return this->chem_t; }
void setFilePadding(std::uint32_t maxiter) { void setFilePadding(std::uint32_t maxiter)
this->file_pad = {
static_cast<std::uint8_t>(std::ceil(std::log10(maxiter + 1))); this->file_pad =
} static_cast<std::uint8_t>(std::ceil(std::log10(maxiter + 1)));
}
struct SurrogateSetup { struct SurrogateSetup
std::vector<std::string> prop_names; {
std::array<double, 2> base_totals; std::vector<std::string> prop_names;
bool has_het_ids; std::array<double, 2> base_totals;
bool has_het_ids;
bool dht_enabled; bool dht_enabled;
std::uint32_t dht_size_mb; std::uint32_t dht_size_mb;
int dht_snaps; int dht_snaps;
std::string dht_out_dir; std::string dht_out_dir;
bool interp_enabled; bool interp_enabled;
std::uint32_t interp_bucket_size; std::uint32_t interp_bucket_size;
std::uint32_t interp_size_mb; std::uint32_t interp_size_mb;
std::uint32_t interp_min_entries; std::uint32_t interp_min_entries;
bool ai_surrogate_enabled; bool ai_surrogate_enabled;
}; };
void masterEnableSurrogates(const SurrogateSetup &setup) { void masterEnableSurrogates(const SurrogateSetup &setup)
// FIXME: This is a hack to get the prop_names and prop_count from the setup {
this->prop_names = setup.prop_names; // FIXME: This is a hack to get the prop_names and prop_count from the setup
this->prop_count = setup.prop_names.size(); this->prop_names = setup.prop_names;
this->prop_count = setup.prop_names.size();
this->dht_enabled = setup.dht_enabled; this->dht_enabled = setup.dht_enabled;
this->interp_enabled = setup.interp_enabled; this->interp_enabled = setup.interp_enabled;
this->ai_surrogate_enabled = setup.ai_surrogate_enabled; this->ai_surrogate_enabled = setup.ai_surrogate_enabled;
this->base_totals = setup.base_totals; this->base_totals = setup.base_totals;
if (this->dht_enabled || this->interp_enabled) { if (this->dht_enabled || this->interp_enabled)
this->initializeDHT(setup.dht_size_mb, this->params.dht_species, {
setup.has_het_ids); this->initializeDHT(setup.dht_size_mb, this->params.dht_species,
setup.has_het_ids);
if (setup.dht_snaps != DHT_SNAPS_DISABLED) { if (setup.dht_snaps != DHT_SNAPS_DISABLED)
this->setDHTSnapshots(setup.dht_snaps, setup.dht_out_dir); {
this->setDHTSnapshots(setup.dht_snaps, setup.dht_out_dir);
}
}
if (this->interp_enabled)
{
this->initializeInterp(setup.interp_bucket_size, setup.interp_size_mb,
setup.interp_min_entries,
this->params.interp_species);
} }
} }
if (this->interp_enabled) { /**
this->initializeInterp(setup.interp_bucket_size, setup.interp_size_mb, * Intended to alias input parameters for grid initialization with a single
setup.interp_min_entries, * value per species.
this->params.interp_species); */
using SingleCMap = std::unordered_map<std::string, double>;
/**
* Intended to alias input parameters for grid initialization with mutlitple
* values per species.
*/
using VectorCMap = std::unordered_map<std::string, std::vector<double>>;
/**
* Enumerating DHT file options
*/
enum
{
DHT_SNAPS_DISABLED = 0, //!< disabled file output
DHT_SNAPS_SIMEND, //!< only output of snapshot after simulation
DHT_SNAPS_ITEREND //!< output snapshots after each iteration
};
/**
* **Only called by workers!** Start the worker listening loop.
*/
void WorkerLoop();
/**
* **Called by master** Advise the workers to break the loop.
*/
void MasterLoopBreak();
/**
* **Master only** Return count of grid cells.
*/
auto GetNCells() const { return this->n_cells; }
/**
* **Master only** Return work package size.
*/
auto GetWPSize() const { return this->wp_size; }
/**
* **Master only** Return the time in seconds the master spent waiting for any
* free worker.
*/
auto GetMasterIdleTime() const { return this->idle_t; }
/**
* **Master only** Return the time in seconds the master spent in sequential
* part of the simulation, including times for shuffling/unshuffling field
* etc.
*/
auto GetMasterSequentialTime() const { return this->seq_t; }
/**
* **Master only** Return the time in seconds the master spent in the
* send/receive loop.
*/
auto GetMasterLoopTime() const { return this->send_recv_t; }
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to run Phreeqc simulation.
*
* \return Vector of all accumulated Phreeqc timings.
*/
std::vector<double> GetWorkerPhreeqcTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to get values from the DHT.
*
* \return Vector of all accumulated DHT get times.
*/
std::vector<double> GetWorkerDHTGetTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to write values to the DHT.
*
* \return Vector of all accumulated DHT fill times.
*/
std::vector<double> GetWorkerDHTFillTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers waiting for work packages from the master.
*
* \return Vector of all accumulated waiting times.
*/
std::vector<double> GetWorkerIdleTimings() const;
/**
* **Master only** Collect and return DHT hits of all workers.
*
* \return Vector of all count of DHT hits.
*/
std::vector<uint32_t> GetWorkerDHTHits() const;
/**
* **Master only** Collect and return DHT evictions of all workers.
*
* \return Vector of all count of DHT evictions.
*/
std::vector<uint32_t> GetWorkerDHTEvictions() const;
/**
* **Master only** Returns the current state of the chemical field.
*
* \return Reference to the chemical field.
*/
Field &getField() { return this->chem_field; }
/**
* **Master only** Enable/disable progress bar.
*
* \param enabled True if print progressbar, false if not.
*/
void setProgressBarPrintout(bool enabled)
{
this->print_progessbar = enabled;
};
/**
* **Master only** Set the ai surrogate validity vector from R
*/
void set_ai_surrogate_validity_vector(std::vector<int> r_vector);
std::vector<uint32_t> GetWorkerInterpolationCalls() const;
std::vector<double> GetWorkerInterpolationWriteTimings() const;
std::vector<double> GetWorkerInterpolationReadTimings() const;
std::vector<double> GetWorkerInterpolationGatherTimings() const;
std::vector<double> GetWorkerInterpolationFunctionCallTimings() const;
std::vector<uint32_t> GetWorkerPHTCacheHits() const;
std::vector<int> ai_surrogate_validity_vector;
RuntimeParameters *runtime_params = nullptr;
uint32_t control_iteration_counter = 0;
struct error_stats
{
std::vector<double> mape;
std::vector<double> rrsme;
uint32_t iteration;
error_stats(size_t species_count, size_t iter)
: mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) {}
};
std::vector<SimulationErrorStats> error_history;
void computeSpeciesErrors(const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values,
uint32_t size_per_prop,
uint32_t species_count,
SimulationErrorStats &species_error_stats);
static void computeStats(const std::vector<double> &pqc_vector,
const std::vector<double> &sur_vector,
uint32_t size_per_prop, uint32_t species_count,
error_stats &stats);
protected:
void initializeDHT(uint32_t size_mb,
const NamedVector<std::uint32_t> &key_species,
bool has_het_ids);
void setDHTSnapshots(int type, const std::string &out_dir);
void setDHTReadFile(const std::string &input_file);
void initializeInterp(std::uint32_t bucket_size, std::uint32_t size_mb,
std::uint32_t min_entries,
const NamedVector<std::uint32_t> &key_species);
enum
{
CHEM_FIELD_INIT,
CHEM_DHT_ENABLE,
CHEM_DHT_SIGNIF_VEC,
CHEM_DHT_SNAPS,
CHEM_DHT_READ_FILE,
CHEM_INTERP,
CHEM_IP_ENABLE,
CHEM_IP_MIN_ENTRIES,
CHEM_IP_SIGNIF_VEC,
CHEM_WORK_LOOP,
CHEM_PERF,
CHEM_BREAK_MAIN_LOOP,
CHEM_AI_BCAST_VALIDITY
};
enum
{
LOOP_WORK,
LOOP_END,
LOOP_CTRL
};
enum
{
WORKER_PHREEQC,
WORKER_DHT_GET,
WORKER_DHT_FILL,
WORKER_IDLE,
WORKER_IP_WRITE,
WORKER_IP_READ,
WORKER_IP_GATHER,
WORKER_IP_FC,
WORKER_DHT_HITS,
WORKER_DHT_EVICTIONS,
WORKER_PHT_CACHE_HITS,
WORKER_IP_CALLS
};
std::vector<uint32_t> interp_calls;
std::vector<uint32_t> dht_hits;
std::vector<uint32_t> dht_evictions;
struct worker_s
{
double phreeqc_t = 0.;
double dht_get = 0.;
double dht_fill = 0.;
double idle_t = 0.;
};
struct worker_info_s
{
char has_work = 0;
double *send_addr;
double *surrogate_addr;
};
using worker_list_t = std::vector<struct worker_info_s>;
using workpointer_t = std::vector<double>::iterator;
void MasterRunParallel(double dt);
void MasterRunSequential();
void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer, workpointer_t &sur_pointer,
int &pkg_to_send, int &count_pkgs, int &free_workers,
double dt, uint32_t iteration, uint32_t control_iteration,
const std::vector<uint32_t> &wp_sizes_vector);
void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send,
int &free_workers);
std::vector<double> MasterGatherWorkerTimings(int type) const;
std::vector<uint32_t> MasterGatherWorkerMetrics(int type) const;
void WorkerProcessPkgs(struct worker_s &timings, uint32_t &iteration);
void WorkerDoWork(MPI_Status &probe_status, int double_count,
struct worker_s &timings);
void WorkerPostIter(MPI_Status &prope_status, uint32_t iteration);
void WorkerPostSim(uint32_t iteration);
void WorkerWriteDHTDump(uint32_t iteration);
void WorkerReadDHTDump(const std::string &dht_input_file);
void WorkerPerfToMaster(int type, const struct worker_s &timings);
void WorkerMetricsToMaster(int type);
void WorkerRunWorkPackage(WorkPackage &work_package, double dSimTime,
double dTimestep);
std::vector<uint32_t> CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const;
std::vector<double> shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count);
void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count, std::vector<double> &out_field);
std::vector<std::int32_t>
parseDHTSpeciesVec(const NamedVector<std::uint32_t> &key_species,
const std::vector<std::string> &to_compare) const;
void BCastStringVec(std::vector<std::string> &io);
int comm_size, comm_rank;
MPI_Comm group_comm;
bool is_sequential;
bool is_master;
uint32_t wp_size;
bool dht_enabled{false};
int dht_snaps_type{DHT_SNAPS_DISABLED};
std::string dht_file_out_dir;
poet::DHT_Wrapper *dht = nullptr;
bool ht_fill{false};
bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp;
bool ai_surrogate_enabled{false};
static constexpr uint32_t BUFFER_OFFSET = 6;
inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const
{
MPI_Bcast(buf, count, datatype, 0, this->group_comm);
} }
}
/** inline void PropagateFunctionType(int &type) const
* Intended to alias input parameters for grid initialization with a single {
* value per species. ChemBCast(&type, 1, MPI_INT);
*/ }
using SingleCMap = std::unordered_map<std::string, double>; double simtime = 0.;
/**
* Intended to alias input parameters for grid initialization with mutlitple
* values per species.
*/
using VectorCMap = std::unordered_map<std::string, std::vector<double>>;
/**
* Enumerating DHT file options
*/
enum {
DHT_SNAPS_DISABLED = 0, //!< disabled file output
DHT_SNAPS_SIMEND, //!< only output of snapshot after simulation
DHT_SNAPS_ITEREND //!< output snapshots after each iteration
};
/**
* **Only called by workers!** Start the worker listening loop.
*/
void WorkerLoop();
/**
* **Called by master** Advise the workers to break the loop.
*/
void MasterLoopBreak();
/**
* **Master only** Return count of grid cells.
*/
auto GetNCells() const { return this->n_cells; }
/**
* **Master only** Return work package size.
*/
auto GetWPSize() const { return this->wp_size; }
/**
* **Master only** Return the time in seconds the master spent waiting for any
* free worker.
*/
auto GetMasterIdleTime() const { return this->idle_t; }
/**
* **Master only** Return the time in seconds the master spent in sequential
* part of the simulation, including times for shuffling/unshuffling field
* etc.
*/
auto GetMasterSequentialTime() const { return this->seq_t; }
/**
* **Master only** Return the time in seconds the master spent in the
* send/receive loop.
*/
auto GetMasterLoopTime() const { return this->send_recv_t; }
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to run Phreeqc simulation.
*
* \return Vector of all accumulated Phreeqc timings.
*/
std::vector<double> GetWorkerPhreeqcTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to get values from the DHT.
*
* \return Vector of all accumulated DHT get times.
*/
std::vector<double> GetWorkerDHTGetTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to write values to the DHT.
*
* \return Vector of all accumulated DHT fill times.
*/
std::vector<double> GetWorkerDHTFillTimings() const;
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers waiting for work packages from the master.
*
* \return Vector of all accumulated waiting times.
*/
std::vector<double> GetWorkerIdleTimings() const;
/**
* **Master only** Collect and return DHT hits of all workers.
*
* \return Vector of all count of DHT hits.
*/
std::vector<uint32_t> GetWorkerDHTHits() const;
/**
* **Master only** Collect and return DHT evictions of all workers.
*
* \return Vector of all count of DHT evictions.
*/
std::vector<uint32_t> GetWorkerDHTEvictions() const;
/**
* **Master only** Returns the current state of the chemical field.
*
* \return Reference to the chemical field.
*/
Field &getField() { return this->chem_field; }
/**
* **Master only** Enable/disable progress bar.
*
* \param enabled True if print progressbar, false if not.
*/
void setProgressBarPrintout(bool enabled) {
this->print_progessbar = enabled;
};
/**
* **Master only** Set the ai surrogate validity vector from R
*/
void set_ai_surrogate_validity_vector(std::vector<int> r_vector);
std::vector<uint32_t> GetWorkerInterpolationCalls() const;
std::vector<double> GetWorkerInterpolationWriteTimings() const;
std::vector<double> GetWorkerInterpolationReadTimings() const;
std::vector<double> GetWorkerInterpolationGatherTimings() const;
std::vector<double> GetWorkerInterpolationFunctionCallTimings() const;
std::vector<uint32_t> GetWorkerPHTCacheHits() const;
std::vector<int> ai_surrogate_validity_vector;
protected:
void initializeDHT(uint32_t size_mb,
const NamedVector<std::uint32_t> &key_species,
bool has_het_ids);
void setDHTSnapshots(int type, const std::string &out_dir);
void setDHTReadFile(const std::string &input_file);
void initializeInterp(std::uint32_t bucket_size, std::uint32_t size_mb,
std::uint32_t min_entries,
const NamedVector<std::uint32_t> &key_species);
enum {
CHEM_FIELD_INIT,
CHEM_DHT_ENABLE,
CHEM_DHT_SIGNIF_VEC,
CHEM_DHT_SNAPS,
CHEM_DHT_READ_FILE,
CHEM_IP_ENABLE,
CHEM_IP_MIN_ENTRIES,
CHEM_IP_SIGNIF_VEC,
CHEM_WORK_LOOP,
CHEM_PERF,
CHEM_BREAK_MAIN_LOOP,
CHEM_AI_BCAST_VALIDITY
};
enum { LOOP_WORK, LOOP_END };
enum {
WORKER_PHREEQC,
WORKER_DHT_GET,
WORKER_DHT_FILL,
WORKER_IDLE,
WORKER_IP_WRITE,
WORKER_IP_READ,
WORKER_IP_GATHER,
WORKER_IP_FC,
WORKER_DHT_HITS,
WORKER_DHT_EVICTIONS,
WORKER_PHT_CACHE_HITS,
WORKER_IP_CALLS
};
std::vector<uint32_t> interp_calls;
std::vector<uint32_t> dht_hits;
std::vector<uint32_t> dht_evictions;
struct worker_s {
double phreeqc_t = 0.;
double dht_get = 0.;
double dht_fill = 0.;
double idle_t = 0.; double idle_t = 0.;
double seq_t = 0.;
double send_recv_t = 0.;
std::array<double, 2> base_totals{0};
bool print_progessbar{false};
std::uint8_t file_pad{1};
double chem_t{0.};
uint32_t n_cells = 0;
uint32_t prop_count = 0;
std::vector<std::string> prop_names;
Field chem_field;
const InitialList::ChemistryInit params;
std::unique_ptr<PhreeqcRunner> pqc_runner;
std::vector<double> sur_shuffled;
}; };
struct worker_info_s {
char has_work = 0;
double *send_addr;
};
using worker_list_t = std::vector<struct worker_info_s>;
using workpointer_t = std::vector<double>::iterator;
void MasterRunParallel(double dt);
void MasterRunSequential();
void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer,
int &pkg_to_send, int &count_pkgs, int &free_workers,
double dt, uint32_t iteration,
const std::vector<uint32_t> &wp_sizes_vector);
void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send,
int &free_workers);
std::vector<double> MasterGatherWorkerTimings(int type) const;
std::vector<uint32_t> MasterGatherWorkerMetrics(int type) const;
void WorkerProcessPkgs(struct worker_s &timings, uint32_t &iteration);
void WorkerDoWork(MPI_Status &probe_status, int double_count,
struct worker_s &timings);
void WorkerPostIter(MPI_Status &prope_status, uint32_t iteration);
void WorkerPostSim(uint32_t iteration);
void WorkerWriteDHTDump(uint32_t iteration);
void WorkerReadDHTDump(const std::string &dht_input_file);
void WorkerPerfToMaster(int type, const struct worker_s &timings);
void WorkerMetricsToMaster(int type);
void WorkerRunWorkPackage(WorkPackage &work_package, double dSimTime,
double dTimestep);
std::vector<uint32_t> CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const;
std::vector<double> shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count);
void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count,
uint32_t wp_count, std::vector<double> &out_field);
std::vector<std::int32_t>
parseDHTSpeciesVec(const NamedVector<std::uint32_t> &key_species,
const std::vector<std::string> &to_compare) const;
void BCastStringVec(std::vector<std::string> &io);
int comm_size, comm_rank;
MPI_Comm group_comm;
bool is_sequential;
bool is_master;
uint32_t wp_size;
bool dht_enabled{false};
int dht_snaps_type{DHT_SNAPS_DISABLED};
std::string dht_file_out_dir;
poet::DHT_Wrapper *dht = nullptr;
bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp;
bool ai_surrogate_enabled{false};
static constexpr uint32_t BUFFER_OFFSET = 5;
inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const {
MPI_Bcast(buf, count, datatype, 0, this->group_comm);
}
inline void PropagateFunctionType(int &type) const {
ChemBCast(&type, 1, MPI_INT);
}
double simtime = 0.;
double idle_t = 0.;
double seq_t = 0.;
double send_recv_t = 0.;
std::array<double, 2> base_totals{0};
bool print_progessbar{false};
std::uint8_t file_pad{1};
double chem_t{0.};
uint32_t n_cells = 0;
uint32_t prop_count = 0;
std::vector<std::string> prop_names;
Field chem_field;
const InitialList::ChemistryInit params;
std::unique_ptr<PhreeqcRunner> pqc_runner;
};
} // namespace poet } // namespace poet
#endif // CHEMISTRYMODULE_H_ #endif // CHEMISTRYMODULE_H_

View File

@ -3,6 +3,7 @@
#include <algorithm> #include <algorithm>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <iomanip>
#include <mpi.h> #include <mpi.h>
#include <vector> #include <vector>
@ -159,6 +160,52 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerPHTCacheHits() const {
return ret; return ret;
} }
void poet::ChemistryModule::computeSpeciesErrors(
const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values, uint32_t size_per_prop,
uint32_t species_count, SimulationErrorStats &species_error_stats) {
for (uint32_t i = 0; i < species_count; ++i) {
double err_sum = 0.0;
double sqr_err_sum = 0.0;
uint32_t base_idx = i * size_per_prop;
if (i > 1) {
std::cerr << "---- Species [" << i << "] " << this->prop_names[i]
<< " ----\n";
}
for (uint32_t j = 0; j < size_per_prop; ++j) {
const double pqc_value = pqc_vector[base_idx + j];
const double sur_value = sur_vector[base_idx + j];
// Print raw input values
if (i > 1) {
std::cerr << " index " << j << " | ref=" << ref_value
<< " | sur=" << sur_value << '\n';
}
if (ref_value == 0.0) {
if (sur_value != 0.0) {
err_sum += 1.0;
sqr_err_sum += 1.0;
}
// Both zero: skip
} else {
double alpha = 1.0 - (sur_value / pqc_value);
err_sum += std::abs(alpha);
sqr_err_sum += alpha * alpha;
}
}
stats.mape[i] = 100.0 * (err_sum / size_per_prop);
stats.rrsme[i] =
(size_per_prop > 0) ? std::sqrt(sqr_err_sum / size_per_prop) : 0.0;
std::cerr << " -> MAPE=" << species_error_stats.mape[i]
<< " RRSME=" << species_error_stats.rrmse[i] << "\n\n";
}
}
inline std::vector<int> shuffleVector(const std::vector<int> &in_vector, inline std::vector<int> shuffleVector(const std::vector<int> &in_vector,
uint32_t size_per_prop, uint32_t size_per_prop,
uint32_t wp_count) { uint32_t wp_count) {
@ -175,14 +222,14 @@ inline std::vector<int> shuffleVector(const std::vector<int> &in_vector,
inline std::vector<double> shuffleField(const std::vector<double> &in_field, inline std::vector<double> shuffleField(const std::vector<double> &in_field,
uint32_t size_per_prop, uint32_t size_per_prop,
uint32_t prop_count, uint32_t species_count,
uint32_t wp_count) { uint32_t wp_count) {
std::vector<double> out_buffer(in_field.size()); std::vector<double> out_buffer(in_field.size());
uint32_t write_i = 0; uint32_t write_i = 0;
for (uint32_t i = 0; i < wp_count; i++) { for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) { for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) { for (uint32_t k = 0; k < species_count; k++) {
out_buffer[(write_i * prop_count) + k] = out_buffer[(write_i * species_count) + k] =
in_field[(k * size_per_prop) + j]; in_field[(k * size_per_prop) + j];
} }
write_i++; write_i++;
@ -192,15 +239,15 @@ inline std::vector<double> shuffleField(const std::vector<double> &in_field,
} }
inline void unshuffleField(const std::vector<double> &in_buffer, inline void unshuffleField(const std::vector<double> &in_buffer,
uint32_t size_per_prop, uint32_t prop_count, uint32_t size_per_prop, uint32_t species_count,
uint32_t wp_count, std::vector<double> &out_field) { uint32_t wp_count, std::vector<double> &out_field) {
uint32_t read_i = 0; uint32_t read_i = 0;
for (uint32_t i = 0; i < wp_count; i++) { for (uint32_t i = 0; i < wp_count; i++) {
for (uint32_t j = i; j < size_per_prop; j += wp_count) { for (uint32_t j = i; j < size_per_prop; j += wp_count) {
for (uint32_t k = 0; k < prop_count; k++) { for (uint32_t k = 0; k < species_count; k++) {
out_field[(k * size_per_prop) + j] = out_field[(k * size_per_prop) + j] =
in_buffer[(read_i * prop_count) + k]; in_buffer[(read_i * species_count) + k];
} }
read_i++; read_i++;
} }
@ -227,15 +274,17 @@ inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) {
} }
inline void poet::ChemistryModule::MasterSendPkgs( inline void poet::ChemistryModule::MasterSendPkgs(
worker_list_t &w_list, workpointer_t &work_pointer, int &pkg_to_send, worker_list_t &w_list, workpointer_t &work_pointer,
int &count_pkgs, int &free_workers, double dt, uint32_t iteration, workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs,
int &free_workers, double dt, uint32_t iteration, uint32_t control_interval,
const std::vector<uint32_t> &wp_sizes_vector) { const std::vector<uint32_t> &wp_sizes_vector) {
/* declare variables */ /* declare variables */
int local_work_package_size; int local_work_package_size;
/* search for free workers and send work */ /* search for free workers and send work */
for (int p = 0; p < this->comm_size - 1; p++) { for (int p = 0; p < this->comm_size - 1; p++) {
if (w_list[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { if (w_list[p].has_work == 0 && pkg_to_send > 0) /* worker is free */
{
/* to enable different work_package_size, set local copy of /* to enable different work_package_size, set local copy of
* work_package_size to pre-calculated work package size vector */ * work_package_size to pre-calculated work package size vector */
@ -244,6 +293,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
/* note current processed work package in workerlist */ /* note current processed work package in workerlist */
w_list[p].send_addr = work_pointer.base(); w_list[p].send_addr = work_pointer.base();
w_list[p].surrogate_addr = sur_pointer.base();
/* push work pointer to next work package */ /* push work pointer to next work package */
const uint32_t end_of_wp = local_work_package_size * this->prop_count; const uint32_t end_of_wp = local_work_package_size * this->prop_count;
@ -251,6 +301,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
std::copy(work_pointer, work_pointer + end_of_wp, send_buffer.begin()); std::copy(work_pointer, work_pointer + end_of_wp, send_buffer.begin());
work_pointer += end_of_wp; work_pointer += end_of_wp;
sur_pointer += end_of_wp;
// fill send buffer starting with work_package ... // fill send buffer starting with work_package ...
// followed by: work_package_size // followed by: work_package_size
@ -262,9 +313,12 @@ inline void poet::ChemistryModule::MasterSendPkgs(
// current time of simulation (age) in seconds // current time of simulation (age) in seconds
send_buffer[end_of_wp + 3] = this->simtime; send_buffer[end_of_wp + 3] = this->simtime;
// current work package start location in field // current work package start location in field
uint32_t wp_start_index = std::accumulate(wp_sizes_vector.begin(), std::next(wp_sizes_vector.begin(), count_pkgs), 0); uint32_t wp_start_index =
std::accumulate(wp_sizes_vector.begin(),
std::next(wp_sizes_vector.begin(), count_pkgs), 0);
send_buffer[end_of_wp + 4] = wp_start_index; send_buffer[end_of_wp + 4] = wp_start_index;
// whether this iteration is a control iteration
send_buffer[end_of_wp + 5] = control_iteration;
/* ATTENTION Worker p has rank p+1 */ /* ATTENTION Worker p has rank p+1 */
// MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1, // MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
@ -288,6 +342,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
int need_to_receive = 1; int need_to_receive = 1;
double idle_a, idle_b; double idle_a, idle_b;
int p, size; int p, size;
double recv_a, recv_b;
MPI_Status probe_status; MPI_Status probe_status;
// master_recv_a = MPI_Wtime(); // master_recv_a = MPI_Wtime();
@ -297,12 +352,12 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
// only of there are still packages to send and free workers are available // only of there are still packages to send and free workers are available
if (to_send && free_workers > 0) if (to_send && free_workers > 0)
// non blocking probing // non blocking probing
MPI_Iprobe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &need_to_receive, MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &need_to_receive,
&probe_status); &probe_status);
else { else {
idle_a = MPI_Wtime(); idle_a = MPI_Wtime();
// blocking probing // blocking probing
MPI_Probe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &probe_status); MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime(); idle_b = MPI_Wtime();
this->idle_t += idle_b - idle_a; this->idle_t += idle_b - idle_a;
} }
@ -311,12 +366,33 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
* receive */ * receive */
if (need_to_receive) { if (need_to_receive) {
p = probe_status.MPI_SOURCE; p = probe_status.MPI_SOURCE;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size); if (probe_status.MPI_TAG == LOOP_WORK) {
MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK, MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
this->group_comm, MPI_STATUS_IGNORE); MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK,
w_list[p - 1].has_work = 0; this->group_comm, MPI_STATUS_IGNORE);
pkg_to_recv -= 1; w_list[p - 1].has_work = 0;
free_workers++; pkg_to_recv -= 1;
free_workers++;
}
if (probe_status.MPI_TAG == LOOP_CTRL) {
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
// layout of buffer is [phreeqc][surrogate]
std::vector<double> recv_buffer(size);
MPI_Recv(recv_buffer.data(), size, MPI_DOUBLE, p, LOOP_CTRL,
this->group_comm, MPI_STATUS_IGNORE);
std::copy(recv_buffer.begin(), recv_buffer.begin() + (size / 2),
w_list[p - 1].send_addr);
std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size,
w_list[p - 1].surrogate_addr);
w_list[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
}
} }
} }
} }
@ -376,11 +452,42 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
if (this->ai_surrogate_enabled) { if (this->ai_surrogate_enabled) {
ftype = CHEM_AI_BCAST_VALIDITY; ftype = CHEM_AI_BCAST_VALIDITY;
PropagateFunctionType(ftype); PropagateFunctionType(ftype);
this->ai_surrogate_validity_vector = shuffleVector(this->ai_surrogate_validity_vector, this->ai_surrogate_validity_vector =
this->n_cells, shuffleVector(this->ai_surrogate_validity_vector, this->n_cells,
wp_sizes_vector.size()); wp_sizes_vector.size());
ChemBCast(&this->ai_surrogate_validity_vector.front(), this->n_cells, MPI_INT); ChemBCast(&this->ai_surrogate_validity_vector.front(), this->n_cells,
} MPI_INT);
}
ftype = CHEM_WORK_LOOP;
PropagateFunctionType(ftype);
ftype = CHEM_INTERP;
PropagateFunctionType(ftype);
int interp_flag = 0;
int ht_fill_flag = 0;
if (this->runtime_params->global_iter <
this->runtime_params->control_interval ||
this->runtime_params->rollback_enabled) {
this->interp_enabled = false;
this->ht_fill = true;
interp_flag = 0;
ht_fill_flag = 1;
} else {
this->interp_enabled = true;
this->ht_fill = false;
interp_flag = 1;
ht_fill_flag = 0;
}
ChemBCast(&interp_flag, 1, MPI_INT);
ChemBCast(&ht_fill_flag, 1, MPI_INT);
/* end time measurement of broadcasting interpolation status */
ctrl_bcast_b = MPI_Wtime();
this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a;
ftype = CHEM_WORK_LOOP; ftype = CHEM_WORK_LOOP;
PropagateFunctionType(ftype); PropagateFunctionType(ftype);
@ -389,20 +496,32 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
static uint32_t iteration = 0; static uint32_t iteration = 0;
uint32_t control_logic_enabled =
this->runtime_params->control_interval_enabled ? 1 : 0;
if (control_logic_enabled) {
sur_shuffled.clear();
sur_shuffled.reserve(this->n_cells * this->prop_count);
}
/* start time measurement of sequential part */ /* start time measurement of sequential part */
seq_a = MPI_Wtime(); seq_a = MPI_Wtime();
/* shuffle grid */ /* shuffle grid */
// grid.shuffleAndExport(mpi_buffer); // grid.shuffleAndExport(mpi_buffer);
std::vector<double> mpi_buffer = std::vector<double> mpi_buffer =
shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count, shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count,
wp_sizes_vector.size()); wp_sizes_vector.size());
this->sur_shuffled.resize(mpi_buffer.size());
/* setup local variables */ /* setup local variables */
pkg_to_send = wp_sizes_vector.size(); pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size(); pkg_to_recv = wp_sizes_vector.size();
workpointer_t work_pointer = mpi_buffer.begin(); workpointer_t work_pointer = mpi_buffer.begin();
workpointer_t sur_pointer = sur_shuffled.begin();
worker_list_t worker_list(this->comm_size - 1); worker_list_t worker_list(this->comm_size - 1);
free_workers = this->comm_size - 1; free_workers = this->comm_size - 1;
@ -425,8 +544,9 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
// while there are still packages to send // while there are still packages to send
if (pkg_to_send > 0) { if (pkg_to_send > 0) {
// send packages to all free workers ... // send packages to all free workers ...
MasterSendPkgs(worker_list, work_pointer, pkg_to_send, i_pkgs, MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send,
free_workers, dt, iteration, wp_sizes_vector); i_pkgs, free_workers, dt, iteration, control_iteration,
wp_sizes_vector);
} }
// ... and try to receive them from workers who has finished their work // ... and try to receive them from workers who has finished their work
MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers); MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers);
@ -451,6 +571,30 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
/* do master stuff */ /* do master stuff */
/* start time measurement of control logic */
ctrl_a = MPI_Wtime();
if (control_logic_enabled && !this->runtime_params->rollback_enabled) {
std::vector<double> sur_unshuffled{sur_shuffled};
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
wp_sizes_vector.size(), sur_unshuffled);
SimulationErrorStats stats(this->prop_count,
this->runtime_params->global_iter,
this->runtime_params->rollback_counter);
computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells,
this->prop_count, stats);
error_history.push_back(stats);
}
/* end time measurement of control logic */
ctrl_b = MPI_Wtime();
this->ctrl_t += ctrl_b - ctrl_a;
/* start time measurement of master chemistry */ /* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime(); sim_e_chemistry = MPI_Wtime();
@ -498,4 +642,4 @@ void poet::ChemistryModule::masterSetField(Field field) {
PropagateFunctionType(ftype); PropagateFunctionType(ftype);
ChemBCast(&this->prop_count, 1, MPI_UINT32_T); ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
} }

View File

@ -9,6 +9,7 @@
#include <cstdint> #include <cstdint>
#include <iomanip> #include <iomanip>
#include <iostream> #include <iostream>
#include <limits>
#include <mpi.h> #include <mpi.h>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
@ -58,6 +59,16 @@ void poet::ChemistryModule::WorkerLoop() {
MPI_INT, 0, this->group_comm); MPI_INT, 0, this->group_comm);
break; break;
} }
case CHEM_INTERP: {
int interp_flag = 0;
int ht_fill_flag = 0;
ChemBCast(&interp_flag, 1, MPI_INT);
ChemBCast(&ht_fill_flag, 1, MPI_INT);
this->interp_enabled = (interp_flag == 1);
this->ht_fill = (ht_fill_flag == 1);
break;
}
case CHEM_WORK_LOOP: { case CHEM_WORK_LOOP: {
WorkerProcessPkgs(timings, iteration); WorkerProcessPkgs(timings, iteration);
break; break;
@ -123,12 +134,14 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
double dht_get_start, dht_get_end; double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end; double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end; double dht_fill_start, dht_fill_end;
double ctrl_time_c, ctrl_time_d;
uint32_t iteration; uint32_t iteration;
double dt; double dt;
double current_sim_time; double current_sim_time;
uint32_t wp_start_index; uint32_t wp_start_index;
int count = double_count; int count = double_count;
bool control_logic_enabled = false;
std::vector<double> mpi_buffer(count); std::vector<double> mpi_buffer(count);
/* receive */ /* receive */
@ -137,7 +150,6 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
/* decrement count of work_package by BUFFER_OFFSET */ /* decrement count of work_package by BUFFER_OFFSET */
count -= BUFFER_OFFSET; count -= BUFFER_OFFSET;
/* check for changes on all additional variables given by the 'header' of /* check for changes on all additional variables given by the 'header' of
* mpi_buffer */ * mpi_buffer */
@ -156,6 +168,8 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
// current work package start location in field // current work package start location in field
wp_start_index = mpi_buffer[count + 4]; wp_start_index = mpi_buffer[count + 4];
control_logic_enabled = (mpi_buffer[count + 5] == 1);
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
s_curr_wp.input[wp_i] = s_curr_wp.input[wp_i] =
std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i, std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i,
@ -163,7 +177,7 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
} }
// std::cout << this->comm_rank << ":" << counter++ << std::endl; // std::cout << this->comm_rank << ":" << counter++ << std::endl;
if (dht_enabled || interp_enabled) { if (dht_enabled || interp_enabled || ht_fill) {
dht->prepareKeys(s_curr_wp.input, dt); dht->prepareKeys(s_curr_wp.input, dt);
} }
@ -188,36 +202,89 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
} }
} }
/* if control iteration: create copy surrogate results (output and mappings)
and then set them to zero, give this to phreeqc */
poet::WorkPackage s_curr_wp_control = s_curr_wp;
if (control_logic_enabled) {
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
s_curr_wp_control.output[wp_i] =
std::vector<double>(this->prop_count, 0.0);
s_curr_wp_control.mapping[wp_i] = 0;
}
}
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
WorkerRunWorkPackage(s_curr_wp, current_sim_time, dt); WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp,
current_sim_time, dt);
phreeqc_time_end = MPI_Wtime(); phreeqc_time_end = MPI_Wtime();
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { if (control_logic_enabled) {
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), /* start time measurement for copying control workpackage */
mpi_buffer.begin() + this->prop_count * wp_i); ctrl_time_c = MPI_Wtime();
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
mpi_buffer.resize(count + sur_wp_offset);
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
std::copy(s_curr_wp_control.output[wp_i].begin(),
s_curr_wp_control.output[wp_i].end(),
mpi_buffer.begin() + this->prop_count * wp_i);
}
// s_curr_wp only contains the interpolated data
// copy surrogate output after the the pqc output, mpi_buffer[pqc][interp]
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
if (s_curr_wp.mapping[wp_i] !=
CHEM_PQC) // only copy if surrogate was used
{
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
} else {
// if pqc was used, copy pqc results again
std::copy(s_curr_wp_control.output[wp_i].begin(),
s_curr_wp_control.output[wp_i].end(),
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
}
}
count += sur_wp_offset;
/* end time measurement for copying control workpackage */
ctrl_time_d = MPI_Wtime();
timings.ctrl_t += ctrl_time_d - ctrl_time_c;
} else {
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
mpi_buffer.begin() + this->prop_count * wp_i);
}
} }
/* send results to master */ /* send results to master */
MPI_Request send_req; MPI_Request send_req;
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, MPI_COMM_WORLD,
int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK;
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD,
&send_req); &send_req);
if (dht_enabled || interp_enabled) { if (dht_enabled || interp_enabled || ht_fill) {
/* write results to DHT */ /* write results to DHT */
dht_fill_start = MPI_Wtime(); dht_fill_start = MPI_Wtime();
dht->fillDHT(s_curr_wp); dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp);
dht_fill_end = MPI_Wtime(); dht_fill_end = MPI_Wtime();
if (interp_enabled) { if (interp_enabled || ht_fill) {
interp->writePairs(); interp->writePairs();
} }
timings.dht_fill += dht_fill_end - dht_fill_start; timings.dht_fill += dht_fill_end - dht_fill_start;
} }
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
MPI_Wait(&send_req, MPI_STATUS_IGNORE); MPI_Wait(&send_req, MPI_STATUS_IGNORE);
} }
@ -318,12 +385,21 @@ void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
if (work_package.mapping[wp_id] != CHEM_PQC) { if (work_package.mapping[wp_id] != CHEM_PQC) {
to_ignore.push_back(wp_id); to_ignore.push_back(wp_id);
} }
// HACK: remove the first element (cell_id) before sending to phreeqc
inout_chem[wp_id].erase(inout_chem[wp_id].begin(),
inout_chem[wp_id].begin() + 1);
} }
this->pqc_runner->run(inout_chem, dTimestep, to_ignore); this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
if (work_package.mapping[wp_id] == CHEM_PQC) { if (work_package.mapping[wp_id] == CHEM_PQC) {
work_package.output[wp_id] = inout_chem[wp_id]; // HACK: as we removed the first element (cell_id) before sending to
// phreeqc, copy back with an offset of 1
work_package.output[wp_id] = work_package.input[wp_id];
std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(),
work_package.output[wp_id].begin() + 1);
} }
} }
} }
@ -336,6 +412,11 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
this->group_comm); this->group_comm);
break; break;
} }
case WORKER_CTRL_ITER: {
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
case WORKER_DHT_GET: { case WORKER_DHT_GET: {
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm); this->group_comm);

9
src/IO/Datatypes.hpp Normal file
View File

@ -0,0 +1,9 @@
#pragma once
#include <cstdint>
#include <DataStructures/Field.hpp>
struct Checkpoint_s {
poet::Field &field;
uint32_t iteration;
};

9
src/IO/HDF5Functions.hpp Normal file
View File

@ -0,0 +1,9 @@
#pragma once
#include <string>
#include "Datatypes.hpp"
int write_checkpoint(const std::string &dir_path, const std::string &file_name, struct Checkpoint_s &&checkpoint);
int read_checkpoint(const std::string &dir_path, const std::string &file_name, struct Checkpoint_s &checkpoint);

45
src/IO/StatsIO.cpp Normal file
View File

@ -0,0 +1,45 @@
#include "IO/StatsIO.hpp"
#include <fstream>
#include <iostream>
#include <string>
#include <iomanip>
#include <filesystem>
namespace poet
{
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
const std::vector<std::string> &species_names,
const std::string &out_dir,
const std::string &filename)
{
std::filesystem::path full_path = std::filesystem::path(out_dir) / filename;
std::ofstream out(full_path);
if (!out.is_open())
{
std::cerr << "Could not open " << filename << " !" << std::endl;
return;
}
// header
out << "Iteration, Species, MAPE, RRSME \n";
out << std::string(75, '-') << "\n";
// data rows
for (size_t i = 0; i < all_stats.size(); ++i)
{
for (size_t j = 0; j < species_names.size(); ++j)
{
out << all_stats[i].iteration << ",\t"
<< species_names[j] << ",\t"
<< all_stats[i].mape[j] << ",\t"
<< all_stats[i].rrsme[j] << "\n";
}
out << "\n";
}
out.close();
std::cout << "Stats written to " << filename << "\n";
}
} // namespace poet

10
src/IO/StatsIO.hpp Normal file
View File

@ -0,0 +1,10 @@
#include <string>
#include "Chemistry/ChemistryModule.hpp"
namespace poet
{
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
const std::vector<std::string> &species_names,
const std::string &out_dir,
const std::string &filename);
} // namespace poet

42
src/IO/checkpoint.cpp Normal file
View File

@ -0,0 +1,42 @@
#include "IO/Datatypes.hpp"
#include <cstdint>
#include <highfive/H5Easy.hpp>
#include <filesystem>
namespace fs = std::filesystem;
int write_checkpoint(const std::string &dir_path, const std::string &file_name, struct Checkpoint_s &&checkpoint){
if (!fs::exists(dir_path)) {
std::cerr << "Directory does not exist: " << dir_path << std::endl;
return -1;
}
fs::path file_path = fs::path(dir_path) / file_name;
// TODO: errorhandling
H5Easy::File file(file_path, H5Easy::File::Overwrite);
H5Easy::dump(file, "/MetaParam/Iterations", checkpoint.iteration);
H5Easy::dump(file, "/Grid/Names", checkpoint.field.GetProps());
H5Easy::dump(file, "/Grid/Chemistry", checkpoint.field.As2DVector());
return 0;
}
int read_checkpoint(const std::string &dir_path, const std::string &file_name, struct Checkpoint_s &checkpoint){
fs::path file_path = fs::path(dir_path) / file_name;
if (!fs::exists(file_path)) {
std::cerr << "File does not exist: " << file_path << std::endl;
return -1;
}
H5Easy::File file(file_path, H5Easy::File::ReadOnly);
checkpoint.iteration = H5Easy::load<uint32_t>(file, "/MetaParam/Iterations");
checkpoint.field = H5Easy::load<std::vector<std::vector<double>>>(file, "/Grid/Chemistry");
return 0;
}

View File

@ -26,6 +26,9 @@
#include "CLI/CLI.hpp" #include "CLI/CLI.hpp"
#include "Chemistry/ChemistryModule.hpp" #include "Chemistry/ChemistryModule.hpp"
#include "DataStructures/Field.hpp" #include "DataStructures/Field.hpp"
#include "IO/Datatypes.hpp"
#include "IO/HDF5Functions.hpp"
#include "IO/StatsIO.hpp"
#include "Init/InitialList.hpp" #include "Init/InitialList.hpp"
#include "Transport/DiffusionModule.hpp" #include "Transport/DiffusionModule.hpp"
@ -236,7 +239,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
// R["dht_log"] = simparams.dht_log; // R["dht_log"] = simparams.dht_log;
try { try {
Rcpp::List init_params_(ReadRObj_R(init_file)); Rcpp::List init_params_(ReadRObj_R(init_file));
params.init_params = init_params_; params.init_params = init_params_;
@ -249,7 +251,12 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
params.timesteps = params.timesteps =
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps")); Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
params.control_interval =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_interval"));
params.checkpoint_interval =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("checkpoint_interval"));
params.mape_threshold = Rcpp::as<std::vector<double>>(
global_rt_setup->operator[]("mape_threshold"));
} catch (const std::exception &e) { } catch (const std::exception &e) {
ERRMSG("Error while parsing R scripts: " + std::string(e.what())); ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
return ParseRet::PARSER_ERROR; return ParseRet::PARSER_ERROR;
@ -277,7 +284,42 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem) {
*global_rt_setup = R["setup"]; *global_rt_setup = R["setup"];
} }
static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params, bool triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters &params,
uint32_t &current_iteration) {
const auto &mape = chem.error_history.back().mape;
const auto &props = chem.getField().GetProps();
for (uint32_t i = 0; i < params.mape_threshold.size(); ++i) {
// Skip invalid entries
if (mape[i] == 0) {
continue;
}
bool mape_exceeded = mape[i] > params.mape_threshold[i];
if (mape_exceeded) {
uint32_t rollback_iter =
((current_iteration - 1) / params.checkpoint_interval) *
params.checkpoint_interval;
MSG("[THRESHOLD EXCEEDED] " + props[i] +
" has MAPE = " + std::to_string(mape[i]) +
" exceeding threshold = " + std::to_string(params.mape_threshold[i]) +
" → rolling back to iteration " + std::to_string(rollback_iter));
Checkpoint_s checkpoint_read{.field = chem.getField()};
read_checkpoint(params.out_dir,
"checkpoint" + std::to_string(rollback_iter) + ".hdf5",
checkpoint_read);
current_iteration = checkpoint_read.iteration;
return true; // rollback happened
}
}
MSG("All species are within their MAPE and RRMSE thresholds.");
return false;
}
static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
DiffusionModule &diffusion, DiffusionModule &diffusion,
ChemistryModule &chem) { ChemistryModule &chem) {
@ -290,9 +332,28 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
} }
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps()); R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
params.next_penalty_check = params.penalty_iteration;
/* SIMULATION LOOP */ /* SIMULATION LOOP */
double dSimTime{0}; double dSimTime{0};
double write_chk = 0.0;
double write_stats = 0.0;
double read_chk = 0.0;
for (uint32_t iter = 1; iter < maxiter + 1; iter++) { for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
// Rollback countdowm
if (params.rollback_enabled) {
if (params.sur_disabled_counter > 0) {
--params.sur_disabled_counter;
MSG("Rollback counter: " + std::to_string(params.sur_disabled_counter));
} else {
params.rollback_enabled = false;
}
}
params.control_iteration_active = (iter % params.control_iteration == 0 /* && iter != 0 */);
double start_t = MPI_Wtime(); double start_t = MPI_Wtime();
const double &dt = params.timesteps[iter - 1]; const double &dt = params.timesteps[iter - 1];
@ -308,6 +369,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
/* run transport */ /* run transport */
diffusion.simulate(dt); diffusion.simulate(dt);
chem.runtime_params = &params;
chem.getField().update(diffusion.getField()); chem.getField().update(diffusion.getField());
// MSG("Chemistry start"); // MSG("Chemistry start");
@ -393,10 +456,50 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
// store_result is TRUE) // store_result is TRUE)
call_master_iter_end(R, diffusion.getField(), chem.getField()); call_master_iter_end(R, diffusion.getField(), chem.getField());
// TODO: write checkpoint
// checkpoint struct --> field and iteration
diffusion.getField().update(chem.getField()); diffusion.getField().update(chem.getField());
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" + MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
std::to_string(maxiter)); std::to_string(maxiter));
double write_chk_start, write_chk_end;
double write_stats_start, write_stats_end;
double read_chk_start, read_chk_end;
if (params.control_interval_enabled) {
write_chk_start = MPI_Wtime();
MSG("Writing checkpoint of iteration " + std::to_string(iter));
write_checkpoint(params.out_dir,
"checkpoint" + std::to_string(iter) + ".hdf5",
{.field = chem.getField(), .iteration = iter});
write_chk_end = MPI_Wtime();
if (!params.rollback_enabled) {
write_stats_start = MPI_Wtime();
writeStatsToCSV(chem.error_history, chem.getField().GetProps(),
params.out_dir, "stats_overview");
write_stats_end = MPI_Wtime();
read_chk_start = MPI_Wtime();
params.rollback_enabled = triggerRollbackIfExceeded(chem, params, iter);
read_chk_end = MPI_Wtime();
if (params.rollback_enabled) {
params.rollback_counter++;
params.sur_disabled_counter = params.control_interval;
MSG("Interpolation disabled for the next " +
std::to_string(params.control_interval) + ".");
}
}
}
write_chk += write_chk_end - write_chk_start;
write_stats += write_stats_end - write_stats_start;
read_chk += read_chk_end - read_chk_start;
// MSG(); // MSG();
} // END SIMULATION LOOP } // END SIMULATION LOOP
@ -413,6 +516,16 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
Rcpp::List diffusion_profiling; Rcpp::List diffusion_profiling;
diffusion_profiling["simtime"] = diffusion.getTransportTime(); diffusion_profiling["simtime"] = diffusion.getTransportTime();
Rcpp::List ctrl_profiling;
ctrl_profiling["write_checkpoint"] = write_chk;
ctrl_profiling["read_checkpoint"] = read_chk;
ctrl_profiling["write_metrics"] = write_stats;
ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime();
ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime();
ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime();
ctrl_profiling["ctrl_logic_worker"] =
Rcpp::wrap(chem.GetWorkerControlTimings());
if (params.use_dht) { if (params.use_dht) {
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits()); chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
chem_profiling["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions()); chem_profiling["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
@ -439,6 +552,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
profiling["simtime"] = dSimTime; profiling["simtime"] = dSimTime;
profiling["chemistry"] = chem_profiling; profiling["chemistry"] = chem_profiling;
profiling["diffusion"] = diffusion_profiling; profiling["diffusion"] = diffusion_profiling;
profiling["ctrl_logic"] = ctrl_profiling;
chem.MasterLoopBreak(); chem.MasterLoopBreak();

View File

@ -41,6 +41,7 @@ static const inline std::string r_runtime_parameters = "mysetup";
struct RuntimeParameters { struct RuntimeParameters {
std::string out_dir; std::string out_dir;
std::vector<double> timesteps; std::vector<double> timesteps;
std::vector<double> species_epsilon;
Rcpp::List init_params; Rcpp::List init_params;
@ -51,6 +52,15 @@ struct RuntimeParameters {
bool print_progress = false; bool print_progress = false;
bool rollback_enabled = false;
bool control_interval_enabled = false;
std::uint32_t global_iter = 0;
std::uint32_t sur_disabled_counter = 0;
std::uint32_t rollback_counter = 0;
std::uint32_t checkpoint_interval = 0;
std::uint32_t control_interval = 0;
std::vector<double> mape_threshold;
static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32; static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT; std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;

View File

@ -12,7 +12,7 @@ get_filename_component(TEST_RInsideSourceFile "RInsidePOET_funcs.R" REALPATH)
configure_file(testDataStructures.hpp.in testDataStructures.hpp) configure_file(testDataStructures.hpp.in testDataStructures.hpp)
target_include_directories(testPOET PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") target_include_directories(testPOET PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
add_custom_target(check add_custom_target(check_poet
COMMAND $<TARGET_FILE:testPOET> COMMAND $<TARGET_FILE:testPOET>
DEPENDS testPOET DEPENDS testPOET
) )

119
test/testStats.cpp Normal file
View File

@ -0,0 +1,119 @@
#include <cmath>
#include <cstddef>
#include <doctest/doctest.h>
#include <vector>
#include <Chemistry/ChemistryModule.hpp>
TEST_CASE("Stats calculation")
{
std::vector<double> real =
{
2, 2, 2, 2, // species 1
2.0, 0.01, 0.7, 0.5, // species 2
0.0, 0.0, 0.0, 0.0, // species 3
0.0, 0.0, 0.0, 0.0, // species 4
-2.5, -0.02, -0.7, -0.5, // species 5
7.7, 6.01, 4.7, 0.5 // species 6
};
std::vector<double> pred =
{
2, 2, 2, 2,
2.0, 0.02, 0.6, 0.5,
0.1, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0,
2.5, 0.01, 0.6, 0.5,
2.8, 0.02, 0.7, 0.5
};
poet::ChemistryModule::error_stats stats(6, 5);
poet::ChemistryModule::computeStats(real, pred, /*size_per_prop*/ 4, /*species_count*/ 6, stats);
SUBCASE("Non-zero values")
{
// species 1 is ID, should stay 0
CHECK_EQ(stats.mape[0], 0);
CHECK_EQ(stats.rrsme[0], 0);
/*
mape species 2
cell0: |(2.0 - 2.0)/2.0| = 0
cell1: |(0.01 - 0.02)/0.01| = 1
cell2: |(0.7 - 0.6)/0.7| = 0.142857143
cell3: |(0.5 - 0.5)/0.5| = 0
mape = 1.142857143/ 4 = 0.285714286 *100
rrsme species 1
squared err sum = 1.02040816
rrsme = sqrt(1.02040816/4) = 0.50507627
*/
CHECK_EQ(stats.mape[1], doctest::Approx(28.5714286).epsilon(1e-6));
CHECK_EQ(stats.rrsme[1], doctest::Approx(0.50507627).epsilon(1e-6));
}
SUBCASE("Zero-denominator case")
{
/*
mape species 3
cell0: |(0.0 - 0.1)/0.0|
cell1: |(0.0 - 0.0)/0.0|
cell2: |(0.0 - 0.0)/0.0|
cell3: |(0.0 - 0.0)/0.0|
mape = 1 *100
rrsme = 1
*/
CHECK_EQ(stats.mape[2], 100.0);
CHECK_EQ(stats.rrsme[2], 1.0);
}
SUBCASE("True and predicted values are zero")
{
/*
mape species 4
cell0: |(0.0 - 0.0)/0.0|
cell1: |(0.0 - 0.0)/0.0|
cell2: |(0.0 - 0.0)/0.0|
cell3: |(0.0 - 0.0)/0.0|
mape = 0.0
rrsme = 0.0
*/
CHECK_EQ(stats.mape[3], 0.0);
CHECK_EQ(stats.rrsme[3], 0.0);
}
SUBCASE("Negative values")
{
/*
mape species 5
cell0: |(-2.5 - 2.5)/-2.5| = 2
cell1: |(-0.02 - 0.01)/-0.02| = 1.5
cell2: |(-0.7 - 0.6)/-0.7| = 1.85714286
cell3: |(-0.5 - 0.5)/-0.5| = 2
mape = (100.0 / 4) * 7.35714286 = 183.92857143
rrsme = sqrt(13.6989796 / 4) = 1.85060663
*/
CHECK_EQ(stats.mape[4], doctest::Approx(183.92857143).epsilon(1e-6));
CHECK_EQ(stats.rrsme[4], doctest::Approx(1.85060663).epsilon(1e-6));
}
SUBCASE("Large differences")
{
/*
mape species 6
cell0: |(7.7 - 2.8)/7.7| = 0.63636364
cell1: |(6.01 - 0.02)/6.01| = 0.99667221
cell2: |(4.7 - 0.7)/4.7| = 0.85106383
cell3: |(0.5 - 0.5)/0.5| = 0
mape = (100.0 / 4) * 2.48409968 = 62.102492
rrsme = sqrt(2,12262382 / 4) = 0.72846136
*/
CHECK_EQ(stats.mape[5], doctest::Approx(62.102492).epsilon(1e-6));
CHECK_EQ(stats.rrsme[5], doctest::Approx(0.72846136).epsilon(1e-6));
}
}