Control component with minimum features

This commit is contained in:
rastogi 2025-10-15 10:15:21 +02:00
parent 104301dd5f
commit 1944b0fc76
14 changed files with 664 additions and 511 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -185,6 +185,13 @@ namespace poet
*/
auto GetMasterLoopTime() const { return this->send_recv_t; }
auto GetMasterCtrlLogicTime() const { return this->ctrl_t; }
auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; }
auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; }
/**
* **Master only** Collect and return all accumulated timings recorded by
* workers to run Phreeqc simulation.
@ -214,6 +221,8 @@ namespace poet
*/
std::vector<double> GetWorkerIdleTimings() const;
std::vector<double> GetWorkerControlTimings() const;
/**
* **Master only** Collect and return DHT hits of all workers.
*
@ -262,25 +271,29 @@ namespace poet
std::vector<int> ai_surrogate_validity_vector;
RuntimeParameters *runtime_params = nullptr;
uint32_t control_iteration_counter = 0;
struct error_stats
struct SimulationErrorStats
{
std::vector<double> mape;
std::vector<double> rrsme;
uint32_t iteration;
std::vector<double> rrmse;
uint32_t iteration; // iterations in simulation after rollbacks
uint32_t rollback_count;
error_stats(size_t species_count, size_t iter)
: mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) {}
SimulationErrorStats(size_t species_count, uint32_t iter, uint32_t counter)
: mape(species_count, 0.0),
rrmse(species_count, 0.0),
iteration(iter),
rollback_count(counter){}
};
std::vector<error_stats> error_stats_history;
std::vector<SimulationErrorStats> error_history;
static void computeSpeciesErrors(const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values,
uint32_t size_per_prop,
uint32_t species_count,
SimulationErrorStats &species_error_stats);
static void computeStats(const std::vector<double> &pqc_vector,
const std::vector<double> &sur_vector,
uint32_t size_per_prop, uint32_t species_count,
error_stats &stats);
protected:
void initializeDHT(uint32_t size_mb,
const NamedVector<std::uint32_t> &key_species,
@ -319,6 +332,7 @@ namespace poet
enum
{
WORKER_PHREEQC,
WORKER_CTRL_ITER,
WORKER_DHT_GET,
WORKER_DHT_FILL,
WORKER_IDLE,
@ -342,6 +356,7 @@ namespace poet
double dht_get = 0.;
double dht_fill = 0.;
double idle_t = 0.;
double ctrl_t = 0.;
};
struct worker_info_s
@ -410,6 +425,7 @@ namespace poet
poet::DHT_Wrapper *dht = nullptr;
bool dht_fill_during_rollback{false};
bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp;
@ -431,6 +447,10 @@ namespace poet
double seq_t = 0.;
double send_recv_t = 0.;
double ctrl_t = 0.;
double bcast_ctrl_t = 0.;
double recv_ctrl_t = 0.;
std::array<double, 2> base_totals{0};
bool print_progessbar{false};
@ -449,7 +469,7 @@ namespace poet
std::unique_ptr<PhreeqcRunner> pqc_runner;
std::vector<double> sur_shuffled;
std::vector<double> sur_shuffled;
};
} // namespace poet

View File

@ -41,6 +41,12 @@ std::vector<double> poet::ChemistryModule::GetWorkerPhreeqcTimings() const {
return MasterGatherWorkerTimings(WORKER_PHREEQC);
}
std::vector<double> poet::ChemistryModule::GetWorkerControlTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
return MasterGatherWorkerTimings(WORKER_CTRL_ITER);
}
std::vector<double> poet::ChemistryModule::GetWorkerDHTGetTimings() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
@ -160,35 +166,35 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerPHTCacheHits() const {
return ret;
}
void poet::ChemistryModule::computeStats(const std::vector<double> &pqc_vector,
const std::vector<double> &sur_vector,
void poet::ChemistryModule::computeSpeciesErrors(const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values,
uint32_t size_per_prop,
uint32_t species_count,
error_stats &stats) {
SimulationErrorStats &species_error_stats) {
for (uint32_t i = 0; i < species_count; ++i) {
double err_sum = 0.0;
double sqr_err_sum = 0.0;
uint32_t base_idx = i * size_per_prop;
for (uint32_t j = 0; j < size_per_prop; ++j) {
const double pqc_value = pqc_vector[base_idx + j];
const double sur_value = sur_vector[base_idx + j];
const double ref_value = reference_values[base_idx + j];
const double sur_value = surrogate_values[base_idx + j];
if (pqc_value == 0.0) {
if (ref_value == 0.0) {
if (sur_value != 0.0) {
err_sum += 1.0;
sqr_err_sum += 1.0;
}
// Both zero: skip
} else {
double alpha = 1.0 - (sur_value / pqc_value);
double alpha = 1.0 - (sur_value / ref_value);
err_sum += std::abs(alpha);
sqr_err_sum += alpha * alpha;
}
}
stats.mape[i] = 100.0 * (err_sum / size_per_prop);
stats.rrsme[i] =
species_error_stats.mape[i] = 100.0 * (err_sum / size_per_prop);
species_error_stats.rrmse[i] =
(size_per_prop > 0) ? std::sqrt(sqr_err_sum / size_per_prop) : 0.0;
}
}
@ -264,7 +270,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
worker_list_t &w_list, workpointer_t &work_pointer,
workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs,
int &free_workers, double dt, uint32_t iteration,
uint32_t control_iteration, const std::vector<uint32_t> &wp_sizes_vector) {
uint32_t control_interval, const std::vector<uint32_t> &wp_sizes_vector) {
/* declare variables */
int local_work_package_size;
@ -305,7 +311,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
std::next(wp_sizes_vector.begin(), count_pkgs), 0);
send_buffer[end_of_wp + 4] = wp_start_index;
// whether this iteration is a control iteration
send_buffer[end_of_wp + 5] = control_iteration;
send_buffer[end_of_wp + 5] = control_interval;
/* ATTENTION Worker p has rank p+1 */
// MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
@ -329,6 +335,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
int need_to_receive = 1;
double idle_a, idle_b;
int p, size;
double recv_a, recv_b;
MPI_Status probe_status;
// master_recv_a = MPI_Wtime();
@ -361,6 +368,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
free_workers++;
}
if (probe_status.MPI_TAG == LOOP_CTRL) {
recv_a = MPI_Wtime();
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
// layout of buffer is [phreeqc][surrogate]
@ -378,6 +386,8 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
w_list[p - 1].has_work = 0;
pkg_to_recv -= 1;
free_workers++;
recv_b = MPI_Wtime();
this->recv_ctrl_t += recv_b - recv_a;
}
}
}
@ -432,6 +442,10 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
int i_pkgs;
int ftype;
double ctrl_a, ctrl_b;
double worker_ctrl_a, worker_ctrl_b;
double ctrl_bcast_a, ctrl_bcast_b;
const std::vector<uint32_t> wp_sizes_vector =
CalculateWPSizesVector(this->n_cells, this->wp_size);
@ -445,28 +459,44 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
MPI_INT);
}
ftype = CHEM_WORK_LOOP;
PropagateFunctionType(ftype);
/* start time measurement of broadcasting interpolation status */
ctrl_bcast_a = MPI_Wtime();
ftype = CHEM_INTERP;
PropagateFunctionType(ftype);
if(this->runtime_params->rollback_simulation){
int interp_flag = 0;
int dht_fill_flag = 0;
if(this->runtime_params->rollback_enabled){
this->interp_enabled = false;
int interp_flag = 0;
ChemBCast(&interp_flag, 1, MPI_INT);
} else {
this->dht_fill_during_rollback = true;
interp_flag = 0;
dht_fill_flag = 1;
}
else {
this->interp_enabled = true;
int interp_flag = 1;
ChemBCast(&interp_flag, 1, MPI_INT);
this->dht_fill_during_rollback = false;
interp_flag = 1;
dht_fill_flag = 0;
}
ChemBCast(&interp_flag, 1, MPI_INT);
ChemBCast(&dht_fill_flag, 1, MPI_INT);
/* end time measurement of broadcasting interpolation status */
ctrl_bcast_b = MPI_Wtime();
this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a;
ftype = CHEM_WORK_LOOP;
PropagateFunctionType(ftype);
MPI_Barrier(this->group_comm);
static uint32_t iteration = 0;
uint32_t control_iteration = static_cast<uint32_t>(
this->runtime_params->control_iteration_active ? 1 : 0);
if (control_iteration) {
uint32_t control_logic_enabled = this->runtime_params->control_interval_enabled ? 1 : 0;
if (control_logic_enabled) {
sur_shuffled.clear();
sur_shuffled.reserve(this->n_cells * this->prop_count);
}
@ -512,7 +542,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
if (pkg_to_send > 0) {
// send packages to all free workers ...
MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send,
i_pkgs, free_workers, dt, iteration, control_iteration,
i_pkgs, free_workers, dt, iteration, control_logic_enabled,
wp_sizes_vector);
}
// ... and try to receive them from workers who has finished their work
@ -522,39 +552,43 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
// Just to complete the progressbar
std::cout << std::endl;
/* stop time measurement of chemistry time needed for send/recv loop */
worker_chemistry_b = MPI_Wtime();
this->send_recv_t += worker_chemistry_b - worker_chemistry_a;
/* stop time measurement of chemistry time needed for send/recv loop */
worker_chemistry_b = MPI_Wtime();
this->send_recv_t += worker_chemistry_b - worker_chemistry_a;
/* start time measurement of sequential part */
seq_c = MPI_Wtime();
/* start time measurement of sequential part */
seq_c = MPI_Wtime();
/* unshuffle grid */
// grid.importAndUnshuffle(mpi_buffer);
std::vector<double> out_vec{mpi_buffer};
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
wp_sizes_vector.size(), out_vec);
chem_field = out_vec;
/* unshuffle grid */
// grid.importAndUnshuffle(mpi_buffer);
std::vector<double> out_vec{mpi_buffer};
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
wp_sizes_vector.size(), out_vec);
chem_field = out_vec;
/* do master stuff */
/* do master stuff */
if (control_iteration) {
control_iteration_counter++;
/* start time measurement of control logic */
ctrl_a = MPI_Wtime();
std::vector<double> sur_unshuffled{sur_shuffled};
if (control_logic_enabled && !this->runtime_params->rollback_enabled) {
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
wp_sizes_vector.size(), sur_unshuffled);
std::vector<double> sur_unshuffled{sur_shuffled};;
error_stats stats(this->prop_count, control_iteration_counter *
runtime_params->control_iteration);
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
wp_sizes_vector.size(), sur_unshuffled);
computeStats(out_vec, sur_unshuffled, this->n_cells, this->prop_count,
stats);
error_stats_history.push_back(stats);
SimulationErrorStats stats(this->prop_count, this->runtime_params->global_iter, this->runtime_params->rollback_counter);
computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells, this->prop_count, stats);
error_history.push_back(stats);
}
/* end time measurement of control logic */
ctrl_b = MPI_Wtime();
this->ctrl_t += ctrl_b - ctrl_a;
// to do: control values to epsilon
}
/* start time measurement of master chemistry */
sim_e_chemistry = MPI_Wtime();

View File

@ -36,315 +36,357 @@
using namespace std;
namespace poet {
namespace poet
{
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
const NamedVector<std::uint32_t> &key_species,
const std::vector<std::int32_t> &key_indices,
const std::vector<std::string> &_output_names,
const InitialList::ChemistryHookFunctions &_hooks,
uint32_t data_count, bool _with_interp,
bool _has_het_ids)
: key_count(key_indices.size()), data_count(data_count),
input_key_elements(key_indices), communicator(dht_comm),
key_species(key_species), output_names(_output_names), hooks(_hooks),
with_interp(_with_interp), has_het_ids(_has_het_ids) {
// initialize DHT object
// key size = count of key elements + timestep
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
uint32_t data_size =
(data_count + (with_interp ? input_key_elements.size() : 0)) *
sizeof(double);
uint32_t buckets_per_process =
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
&poet::Murmur2_64A);
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
const NamedVector<std::uint32_t> &key_species,
const std::vector<std::int32_t> &key_indices,
const std::vector<std::string> &_output_names,
const InitialList::ChemistryHookFunctions &_hooks,
uint32_t data_count, bool _with_interp,
bool _has_het_ids)
: key_count(key_indices.size()), data_count(data_count),
input_key_elements(key_indices), communicator(dht_comm),
key_species(key_species), output_names(_output_names), hooks(_hooks),
with_interp(_with_interp), has_het_ids(_has_het_ids)
{
// initialize DHT object
// key size = count of key elements + timestep
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
uint32_t data_size =
(data_count + (with_interp ? input_key_elements.size() : 0)) *
sizeof(double);
uint32_t buckets_per_process =
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
&poet::Murmur2_64A);
dht_signif_vector = key_species.getValues();
dht_signif_vector = key_species.getValues();
// this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
// this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
const auto key_names = key_species.getNames();
const auto key_names = key_species.getNames();
auto tot_h = std::find(key_names.begin(), key_names.end(), "H");
if (tot_h != key_names.end()) {
this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto tot_o = std::find(key_names.begin(), key_names.end(), "O");
if (tot_o != key_names.end()) {
this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto charge = std::find(key_names.begin(), key_names.end(), "Charge");
if (charge != key_names.end()) {
this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE;
}
}
DHT_Wrapper::~DHT_Wrapper() {
// free DHT
DHT_free(dht_object, NULL, NULL);
}
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
-> const DHT_ResultObject & {
const auto length = work_package.size;
std::vector<double> bucket_writer(
this->data_count + (with_interp ? input_key_elements.size() : 0));
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// point to current grid cell
auto &key_vector = dht_results.keys[i];
// overwrite input with data from DHT, IF value is found in DHT
int res =
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
switch (res) {
case DHT_SUCCESS:
work_package.output[i] =
(with_interp
? inputAndRatesToOutput(bucket_writer, work_package.input[i])
: bucket_writer);
work_package.mapping[i] = CHEM_DHT;
this->dht_hits++;
break;
case DHT_READ_MISS:
break;
auto tot_h = std::find(key_names.begin(), key_names.end(), "H");
if (tot_h != key_names.end())
{
this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto tot_o = std::find(key_names.begin(), key_names.end(), "O");
if (tot_o != key_names.end())
{
this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL;
}
auto charge = std::find(key_names.begin(), key_names.end(), "Charge");
if (charge != key_names.end())
{
this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE;
}
}
return dht_results;
}
DHT_Wrapper::~DHT_Wrapper()
{
// free DHT
DHT_free(dht_object, NULL, NULL);
}
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
-> const DHT_ResultObject &
{
void DHT_Wrapper::fillDHT(const WorkPackage &work_package) {
const auto length = work_package.size;
const auto length = work_package.size;
std::vector<double> bucket_writer(
this->data_count + (with_interp ? input_key_elements.size() : 0));
// loop over every grid cell contained in work package
dht_results.locations.resize(length);
dht_results.filledDHT = std::vector<bool>(length, false);
for (int i = 0; i < length; i++) {
// If true grid cell was simulated, needs to be inserted into dht
if (work_package.mapping[i] != CHEM_PQC) {
continue;
}
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++)
{
// point to current grid cell
auto &key_vector = dht_results.keys[i];
if (work_package.input[i][0] != 2) {
continue;
}
// overwrite input with data from DHT, IF value is found in DHT
int res =
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
// check if calcite or dolomite is absent and present, resp.n and vice
// versa in input/output. If this is the case -> Do not write to DHT!
// HACK: hardcoded, should be fixed!
if (hooks.dht_fill.isValid()) {
NamedVector<double> old_values(output_names, work_package.input[i]);
NamedVector<double> new_values(output_names, work_package.output[i]);
if (Rcpp::as<bool>(hooks.dht_fill(old_values, new_values))) {
continue;
switch (res)
{
case DHT_SUCCESS:
work_package.output[i] =
(with_interp
? inputAndRatesToOutput(bucket_writer, work_package.input[i])
: bucket_writer);
work_package.mapping[i] = CHEM_DHT;
this->dht_hits++;
break;
case DHT_READ_MISS:
break;
}
}
uint32_t proc, index;
auto &key = dht_results.keys[i];
const auto data =
(with_interp ? outputToInputAndRates(work_package.input[i],
work_package.output[i])
: work_package.output[i]);
// void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
return dht_results;
}
// insert simulated data with fuzzed key into DHT
int res = DHT_write(this->dht_object, key.data(),
const_cast<double *>(data.data()), &proc, &index);
void DHT_Wrapper::fillDHT(const WorkPackage &work_package)
{
dht_results.locations[i] = {proc, index};
const auto length = work_package.size;
// if data was successfully written ...
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
dht_evictions++;
// loop over every grid cell contained in work package
dht_results.locations.resize(length);
dht_results.filledDHT = std::vector<bool>(length, false);
for (int i = 0; i < length; i++)
{
// If true grid cell was simulated, needs to be inserted into dht
if (work_package.mapping[i] != CHEM_PQC)
{
continue;
}
if (work_package.input[i][1] != 2)
{
continue;
}
// check if calcite or dolomite is absent and present, resp.n and vice
// versa in input/output. If this is the case -> Do not write to DHT!
// HACK: hardcoded, should be fixed!
if (hooks.dht_fill.isValid())
{
NamedVector<double> old_values(output_names, work_package.input[i]);
NamedVector<double> new_values(output_names, work_package.output[i]);
if (Rcpp::as<bool>(hooks.dht_fill(old_values, new_values)))
{
continue;
}
}
uint32_t proc, index;
auto &key = dht_results.keys[i];
const auto data =
(with_interp ? outputToInputAndRates(work_package.input[i],
work_package.output[i])
: work_package.output[i]);
// void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
// insert simulated data with fuzzed key into DHT
int res = DHT_write(this->dht_object, key.data(),
const_cast<double *>(data.data()), &proc, &index);
dht_results.locations[i] = {proc, index};
// if data was successfully written ...
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION))
{
dht_evictions++;
}
dht_results.filledDHT[i] = true;
}
}
inline std::vector<double>
DHT_Wrapper::outputToInputAndRates(const std::vector<double> &old_results,
const std::vector<double> &new_results)
{
const int prefix_size = this->input_key_elements.size();
std::vector<double> output(prefix_size + this->data_count);
std::copy(new_results.begin(), new_results.end(),
output.begin() + prefix_size);
for (int i = 0; i < prefix_size; i++)
{
const int data_elem_i = input_key_elements[i];
output[i] = old_results[data_elem_i];
output[prefix_size + data_elem_i] -= old_results[data_elem_i];
}
dht_results.filledDHT[i] = true;
}
}
inline std::vector<double>
DHT_Wrapper::outputToInputAndRates(const std::vector<double> &old_results,
const std::vector<double> &new_results) {
const int prefix_size = this->input_key_elements.size();
std::vector<double> output(prefix_size + this->data_count);
std::copy(new_results.begin(), new_results.end(),
output.begin() + prefix_size);
for (int i = 0; i < prefix_size; i++) {
const int data_elem_i = input_key_elements[i];
output[i] = old_results[data_elem_i];
output[prefix_size + data_elem_i] -= old_results[data_elem_i];
return output;
}
return output;
}
inline std::vector<double>
DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data,
const std::vector<double> &input_values)
{
const int prefix_size = this->input_key_elements.size();
inline std::vector<double>
DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data,
const std::vector<double> &input_values) {
const int prefix_size = this->input_key_elements.size();
std::vector<double> output(input_values);
std::vector<double> output(input_values);
for (int i = 0; i < prefix_size; i++)
{
const int data_elem_i = input_key_elements[i];
output[data_elem_i] += dht_data[i];
}
for (int i = 0; i < prefix_size; i++) {
const int data_elem_i = input_key_elements[i];
output[data_elem_i] += dht_data[i];
return output;
}
return output;
}
inline std::vector<double>
DHT_Wrapper::outputToRates(const std::vector<double> &old_results,
const std::vector<double> &new_results)
{
std::vector<double> output(new_results);
inline std::vector<double>
DHT_Wrapper::outputToRates(const std::vector<double> &old_results,
const std::vector<double> &new_results) {
std::vector<double> output(new_results);
for (const auto &data_elem_i : input_key_elements)
{
output[data_elem_i] -= old_results[data_elem_i];
}
for (const auto &data_elem_i : input_key_elements) {
output[data_elem_i] -= old_results[data_elem_i];
return output;
}
return output;
}
inline std::vector<double>
DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
const std::vector<double> &input_values)
{
std::vector<double> output(input_values);
inline std::vector<double>
DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
const std::vector<double> &input_values) {
std::vector<double> output(input_values);
for (const auto &data_elem_i : input_key_elements)
{
output[data_elem_i] += dht_data[data_elem_i];
}
for (const auto &data_elem_i : input_key_elements) {
output[data_elem_i] += dht_data[data_elem_i];
return output;
}
return output;
}
// void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
// for (int i = 0; i < dht_results.length; i++) {
// if (!dht_results.needPhreeqc[i]) {
// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
// work_package.begin() + (data_count * i));
// }
// }
// }
// void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
// for (int i = 0; i < dht_results.length; i++) {
// if (!dht_results.needPhreeqc[i]) {
// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
// work_package.begin() + (data_count * i));
// }
// }
// }
int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename);
return res;
}
int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
int DHT_Wrapper::tableToFile(const char *filename)
{
int res = DHT_to_file(dht_object, filename);
return res;
}
int DHT_Wrapper::fileToTable(const char *filename)
{
int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS)
return res;
#ifdef DHT_STATISTICS
DHT_print_statistics(dht_object);
DHT_print_statistics(dht_object);
#endif
return DHT_SUCCESS;
}
void DHT_Wrapper::printStatistics() {
int res;
res = DHT_print_statistics(dht_object);
if (res != DHT_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
return DHT_SUCCESS;
}
}
LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
double dt) {
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
void DHT_Wrapper::printStatistics()
{
int res;
NamedVector<double> input_nv(this->output_names, cell);
res = DHT_print_statistics(dht_object);
const std::vector<double> eval_vec =
Rcpp::as<std::vector<double>>(hooks.dht_fuzz(input_nv));
assert(eval_vec.size() == this->key_count);
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
if (res != DHT_SUCCESS)
{
// MPI ERROR ... WHAT TO DO NOW?
// RUNNING CIRCLES WHILE SCREAMING
}
}
DHT_Rounder rounder;
LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
double dt)
{
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < eval_vec.size(); i++) {
double curr_key = eval_vec[i];
if (curr_key != 0) {
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
curr_key -= base_totals[totals_i++];
NamedVector<double> input_nv(this->output_names, cell);
const std::vector<double> eval_vec =
Rcpp::as<std::vector<double>>(hooks.dht_fuzz(input_nv));
assert(eval_vec.size() == this->key_count);
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
DHT_Rounder rounder;
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < eval_vec.size(); i++)
{
double curr_key = eval_vec[i];
if (curr_key != 0)
{
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL)
{
curr_key -= base_totals[totals_i++];
}
vecFuzz[i] =
rounder.round(curr_key, dht_signif_vector[i],
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
}
vecFuzz[i] =
rounder.round(curr_key, dht_signif_vector[i],
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
}
}
// add timestep to the end of the key as double value
vecFuzz[this->key_count].fp_element = dt;
if (has_het_ids) {
vecFuzz[this->key_count + 1].fp_element = cell[0];
// add timestep to the end of the key as double value
vecFuzz[this->key_count].fp_element = dt;
if (has_het_ids)
{
vecFuzz[this->key_count + 1].fp_element = cell[0];
}
return vecFuzz;
}
return vecFuzz;
}
LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt)
{
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt) {
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
DHT_Rounder rounder;
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
DHT_Rounder rounder;
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < input_key_elements.size(); i++) {
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) {
continue;
}
double curr_key = cell[input_key_elements[i]];
if (curr_key != 0) {
if (curr_key < c_zero_val &&
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) {
int totals_i = 0;
// introduce fuzzing to allow more hits in DHT
// loop over every variable of grid cell
for (std::uint32_t i = 0; i < input_key_elements.size(); i++)
{
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM)
{
continue;
}
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
curr_key -= base_totals[totals_i++];
double curr_key = cell[input_key_elements[i]];
if (curr_key != 0)
{
if (curr_key < c_zero_val &&
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT)
{
continue;
}
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL)
{
curr_key -= base_totals[totals_i++];
}
vecFuzz[i] =
rounder.round(curr_key, dht_signif_vector[i],
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
}
vecFuzz[i] =
rounder.round(curr_key, dht_signif_vector[i],
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
}
}
// add timestep to the end of the key as double value
vecFuzz[this->key_count].fp_element = dt;
if (has_het_ids) {
vecFuzz[this->key_count + 1].fp_element = cell[0];
// add timestep to the end of the key as double value
vecFuzz[this->key_count].fp_element = dt;
if (has_het_ids)
{
vecFuzz[this->key_count + 1].fp_element = cell[0];
}
return vecFuzz;
}
return vecFuzz;
}
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec)
{
if (signif_vec.size() != this->key_count)
{
throw std::runtime_error(
"Significant vector size mismatches count of key elements.");
}
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec) {
if (signif_vec.size() != this->key_count) {
throw std::runtime_error(
"Significant vector size mismatches count of key elements.");
this->dht_signif_vector = signif_vec;
}
this->dht_signif_vector = signif_vec;
}
} // namespace poet

View File

@ -25,152 +25,175 @@
#include <utility>
#include <vector>
extern "C" {
extern "C"
{
#include "DHT.h"
}
namespace poet {
namespace poet
{
InterpolationModule::InterpolationModule(
std::uint32_t entries_per_bucket, std::uint64_t size_per_process,
std::uint32_t min_entries_needed, DHT_Wrapper &dht,
const NamedVector<std::uint32_t> &interp_key_signifs,
const std::vector<std::int32_t> &dht_key_indices,
const std::vector<std::string> &_out_names,
const InitialList::ChemistryHookFunctions &_hooks)
: dht_instance(dht), key_signifs(interp_key_signifs),
key_indices(dht_key_indices), min_entries_needed(min_entries_needed),
dht_names(dht.getKeySpecies().getNames()), out_names(_out_names),
hooks(_hooks) {
InterpolationModule::InterpolationModule(
std::uint32_t entries_per_bucket, std::uint64_t size_per_process,
std::uint32_t min_entries_needed, DHT_Wrapper &dht,
const NamedVector<std::uint32_t> &interp_key_signifs,
const std::vector<std::int32_t> &dht_key_indices,
const std::vector<std::string> &_out_names,
const InitialList::ChemistryHookFunctions &_hooks)
: dht_instance(dht), key_signifs(interp_key_signifs),
key_indices(dht_key_indices), min_entries_needed(min_entries_needed),
dht_names(dht.getKeySpecies().getNames()), out_names(_out_names),
hooks(_hooks)
{
initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process,
dht.getCommunicator());
initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process,
dht.getCommunicator());
pht->setSourceDHT(dht.getDHT());
}
pht->setSourceDHT(dht.getDHT());
}
void InterpolationModule::initPHT(std::uint32_t key_count,
std::uint32_t entries_per_bucket,
std::uint32_t size_per_process,
MPI_Comm communicator) {
uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double);
uint32_t data_size = sizeof(DHT_Location);
void InterpolationModule::initPHT(std::uint32_t key_count,
std::uint32_t entries_per_bucket,
std::uint32_t size_per_process,
MPI_Comm communicator)
{
uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double);
uint32_t data_size = sizeof(DHT_Location);
pht = std::make_unique<ProximityHashTable>(
key_size, data_size, entries_per_bucket, size_per_process, communicator);
}
pht = std::make_unique<ProximityHashTable>(
key_size, data_size, entries_per_bucket, size_per_process, communicator);
}
void InterpolationModule::writePairs() {
const auto in = this->dht_instance.getDHTResults();
for (int i = 0; i < in.filledDHT.size(); i++) {
if (in.filledDHT[i]) {
const auto coarse_key = roundKey(in.keys[i]);
pht->writeLocationToPHT(coarse_key, in.locations[i]);
void InterpolationModule::writePairs()
{
const auto in = this->dht_instance.getDHTResults();
for (int i = 0; i < in.filledDHT.size(); i++)
{
if (in.filledDHT[i])
{
const auto coarse_key = roundKey(in.keys[i]);
pht->writeLocationToPHT(coarse_key, in.locations[i]);
}
}
}
}
void InterpolationModule::tryInterpolation(WorkPackage &work_package) {
interp_result.status.resize(work_package.size, NOT_NEEDED);
void InterpolationModule::tryInterpolation(WorkPackage &work_package)
{
interp_result.status.resize(work_package.size, NOT_NEEDED);
const auto dht_results = this->dht_instance.getDHTResults();
const auto dht_results = this->dht_instance.getDHTResults();
for (int wp_i = 0; wp_i < work_package.size; wp_i++) {
if (work_package.input[wp_i][0] != 2) {
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
if (work_package.mapping[wp_i] != CHEM_PQC) {
interp_result.status[wp_i] = NOT_NEEDED;
continue;
}
const auto rounded_key = roundKey(dht_results.keys[wp_i]);
auto pht_result =
pht->query(rounded_key, this->min_entries_needed,
dht_instance.getInputCount(), dht_instance.getOutputCount());
if (pht_result.size < this->min_entries_needed) {
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
if (hooks.interp_pre.isValid()) {
NamedVector<double> nv_in(this->out_names, work_package.input[wp_i]);
std::vector<int> rm_indices = Rcpp::as<std::vector<int>>(
hooks.interp_pre(nv_in, pht_result.in_values));
pht_result.size -= rm_indices.size();
if (pht_result.size < this->min_entries_needed) {
for (int wp_i = 0; wp_i < work_package.size; wp_i++)
{
if (work_package.input[wp_i][1] != 2)
{
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
for (const auto &index : rm_indices) {
pht_result.in_values.erase(
std::next(pht_result.in_values.begin(), index - 1));
pht_result.out_values.erase(
std::next(pht_result.out_values.begin(), index - 1));
if (work_package.mapping[wp_i] != CHEM_PQC)
{
interp_result.status[wp_i] = NOT_NEEDED;
continue;
}
}
#ifdef POET_PHT_ADD
this->pht->incrementReadCounter(roundKey(rounded_key));
#endif
const auto rounded_key = roundKey(dht_results.keys[wp_i]);
const int cell_id = static_cast<int>(work_package.input[wp_i][0]);
auto pht_result =
pht->query(rounded_key, this->min_entries_needed,
dht_instance.getInputCount(), dht_instance.getOutputCount());
if (!to_calc_cache.contains(cell_id)) {
const std::vector<std::int32_t> &to_calc = dht_instance.getKeyElements();
std::vector<std::int32_t> keep_indices;
if (pht_result.size < this->min_entries_needed)
{
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
for (std::size_t i = 0; i < to_calc.size(); i++) {
if (!std::isnan(work_package.input[wp_i][to_calc[i]])) {
keep_indices.push_back(to_calc[i]);
if (hooks.interp_pre.isValid())
{
NamedVector<double> nv_in(this->out_names, work_package.input[wp_i]);
std::vector<int> rm_indices = Rcpp::as<std::vector<int>>(
hooks.interp_pre(nv_in, pht_result.in_values));
pht_result.size -= rm_indices.size();
if (pht_result.size < this->min_entries_needed)
{
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
for (const auto &index : rm_indices)
{
pht_result.in_values.erase(
std::next(pht_result.in_values.begin(), index - 1));
pht_result.out_values.erase(
std::next(pht_result.out_values.begin(), index - 1));
}
}
to_calc_cache[cell_id] = keep_indices;
#ifdef POET_PHT_ADD
this->pht->incrementReadCounter(roundKey(rounded_key));
#endif
const int cell_id = static_cast<int>(work_package.input[wp_i][1]);
if (!to_calc_cache.contains(cell_id))
{
const std::vector<std::int32_t> &to_calc = dht_instance.getKeyElements();
std::vector<std::int32_t> keep_indices;
for (std::size_t i = 0; i < to_calc.size(); i++)
{
if (!std::isnan(work_package.input[wp_i][to_calc[i]]))
{
keep_indices.push_back(to_calc[i]);
}
}
to_calc_cache[cell_id] = keep_indices;
}
double start_fc = MPI_Wtime();
work_package.output[wp_i] =
f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i],
pht_result.in_values, pht_result.out_values);
if (hooks.interp_post.isValid())
{
NamedVector<double> nv_result(this->out_names, work_package.output[wp_i]);
if (Rcpp::as<bool>(hooks.interp_post(nv_result)))
{
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
}
}
// interp_result.results[i][0] = mean_water;
this->interp_t += MPI_Wtime() - start_fc;
this->interpolations++;
work_package.mapping[wp_i] = CHEM_INTERP;
interp_result.status[wp_i] = RES_OK;
}
}
double start_fc = MPI_Wtime();
work_package.output[wp_i] =
f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i],
pht_result.in_values, pht_result.out_values);
if (hooks.interp_post.isValid()) {
NamedVector<double> nv_result(this->out_names, work_package.output[wp_i]);
if (Rcpp::as<bool>(hooks.interp_post(nv_result))) {
interp_result.status[wp_i] = INSUFFICIENT_DATA;
continue;
void InterpolationModule::resultsToWP(std::vector<double> &work_package) const
{
for (uint32_t i = 0; i < interp_result.status.size(); i++)
{
if (interp_result.status[i] == RES_OK)
{
const std::size_t length =
interp_result.results[i].end() - interp_result.results[i].begin();
std::copy(interp_result.results[i].begin(),
interp_result.results[i].end(),
work_package.begin() + (length * i));
}
}
// interp_result.results[i][0] = mean_water;
this->interp_t += MPI_Wtime() - start_fc;
this->interpolations++;
work_package.mapping[wp_i] = CHEM_INTERP;
interp_result.status[wp_i] = RES_OK;
}
}
void InterpolationModule::resultsToWP(std::vector<double> &work_package) const {
for (uint32_t i = 0; i < interp_result.status.size(); i++) {
if (interp_result.status[i] == RES_OK) {
const std::size_t length =
interp_result.results[i].end() - interp_result.results[i].begin();
std::copy(interp_result.results[i].begin(),
interp_result.results[i].end(),
work_package.begin() + (length * i));
}
}
}
} // namespace poet

View File

@ -69,9 +69,14 @@ namespace poet
}
case CHEM_INTERP:
{
int interp_flag;
int interp_flag = 0;
int dht_fill_flag = 0;
ChemBCast(&interp_flag, 1, MPI_INT);
ChemBCast(&dht_fill_flag, 1, MPI_INT);
this->interp_enabled = (interp_flag == 1);
this->dht_fill_during_rollback = (dht_fill_flag == 1);
break;
}
case CHEM_WORK_LOOP:
@ -150,13 +155,14 @@ namespace poet
double dht_get_start, dht_get_end;
double phreeqc_time_start, phreeqc_time_end;
double dht_fill_start, dht_fill_end;
double ctrl_time_c, ctrl_time_d;
uint32_t iteration;
double dt;
double current_sim_time;
uint32_t wp_start_index;
int count = double_count;
bool control_iteration_active = false;
bool control_logic_enabled = false;
std::vector<double> mpi_buffer(count);
/* receive */
@ -183,7 +189,7 @@ namespace poet
// current work package start location in field
wp_start_index = mpi_buffer[count + 4];
control_iteration_active = (mpi_buffer[count + 5] == 1);
control_logic_enabled = (mpi_buffer[count + 5] == 1);
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
{
@ -229,7 +235,7 @@ namespace poet
poet::WorkPackage s_curr_wp_control = s_curr_wp;
if (control_iteration_active)
if (control_logic_enabled)
{
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
{
@ -240,12 +246,15 @@ namespace poet
phreeqc_time_start = MPI_Wtime();
WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
phreeqc_time_end = MPI_Wtime();
if (control_iteration_active)
{
if (control_logic_enabled)
{
/* start time measurement for copying control workpackage */
ctrl_time_c = MPI_Wtime();
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
mpi_buffer.resize(count + sur_wp_offset);
@ -275,6 +284,10 @@ namespace poet
}
count += sur_wp_offset;
/* end time measurement for copying control workpackage */
ctrl_time_d = MPI_Wtime();
timings.ctrl_t += ctrl_time_d - ctrl_time_c;
}
else
{
@ -288,14 +301,14 @@ namespace poet
/* send results to master */
MPI_Request send_req;
int mpi_tag = control_iteration_active ? LOOP_CTRL : LOOP_WORK;
int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK;
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req);
if (dht_enabled || interp_enabled)
if (dht_enabled || interp_enabled || dht_fill_during_rollback)
{
/* write results to DHT */
dht_fill_start = MPI_Wtime();
dht->fillDHT(control_iteration_active ? s_curr_wp_control : s_curr_wp);
dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp);
dht_fill_end = MPI_Wtime();
if (interp_enabled)
@ -306,7 +319,6 @@ namespace poet
}
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
}
@ -460,6 +472,12 @@ namespace poet
this->group_comm);
break;
}
case WORKER_CTRL_ITER:
{
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
this->group_comm);
break;
}
case WORKER_DHT_GET:
{
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,

View File

@ -2,10 +2,11 @@
#include <fstream>
#include <iostream>
#include <string>
#include <iomanip> // for std::setw and std::setprecision
namespace poet
{
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
void writeStatsToCSV(const std::vector<ChemistryModule::SimulationErrorStats> &all_stats,
const std::vector<std::string> &species_names,
const std::string &filename)
{
@ -17,21 +18,32 @@ namespace poet
}
// header
out << "Iteration, Species, MAPE, RRSME \n";
out << std::left << std::setw(15) << "Iteration"
<< std::setw(15) << "Rollback"
<< std::setw(15) << "Species"
<< std::setw(15) << "MAPE"
<< std::setw(15) << "RRSME" << "\n";
out << std::string(75, '-') << "\n"; // separator line
// data rows
for (size_t i = 0; i < all_stats.size(); ++i)
{
for (size_t j = 0; j < species_names.size(); ++j)
{
out << all_stats[i].iteration << ",\t"
<< species_names[j] << ",\t"
<< all_stats[i].mape[j] << ",\t"
<< all_stats[i].rrsme[j] << "\n";
out << std::left
<< std::setw(15) << all_stats[i].iteration
<< std::setw(15) << all_stats[i].rollback_count
<< std::setw(15) << species_names[j]
<< std::setw(15) << all_stats[i].mape[j]
<< std::setw(15) << all_stats[i].rrmse[j]
<< "\n";
}
out << std::endl;
out << "\n"; // blank line between iterations
}
out.close();
std::cout << "Stats written to " << filename << "\n";
}
} // namespace poet
}
// namespace poet

View File

@ -3,7 +3,7 @@
namespace poet
{
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
void writeStatsToCSV(const std::vector<ChemistryModule::SimulationErrorStats> &all_stats,
const std::vector<std::string> &species_names,
const std::string &filename);
} // namespace poet

View File

@ -253,7 +253,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params)
try
{
Rcpp::List init_params_(ReadRObj_R(init_file));
params.init_params = init_params_;
@ -266,14 +265,12 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params)
params.timesteps =
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
params.control_iteration =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_iteration"));
params.species_epsilon =
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("species_epsilon"));
params.penalty_iteration =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("penalty_iteration"));
params.max_penalty_iteration =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("max_penalty_iteration"));
params.control_interval =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_interval"));
params.checkpoint_interval =
Rcpp::as<uint32_t>(global_rt_setup->operator[]("checkpoint_interval"));
params.mape_threshold =
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("mape_threshold"));
}
catch (const std::exception &e)
{
@ -304,53 +301,38 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem)
*global_rt_setup = R["setup"];
}
bool checkAndRollback(ChemistryModule &chem, RuntimeParameters &params, uint32_t &iter)
bool triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters &params, uint32_t &current_iteration)
{
const std::vector<double> &latest_mape = chem.error_stats_history.back().mape;
const std::vector<double> &mape_values = chem.error_history.back().mape;
for (uint32_t j = 0; j < params.species_epsilon.size(); j++)
for (uint32_t i = 0; i < params.mape_threshold.size(); i++)
{
if (params.species_epsilon[j] < latest_mape[j] && latest_mape[j] != 0)
// Skip if no meaningful MAPE value
if(mape_values[i] == 0){
continue;
}
if (mape_values[i] > params.mape_threshold[i])
{
uint32_t rollback_iter = iter - (iter % params.control_iteration);
uint32_t rollback_iteration = ((current_iteration - 1) / params.checkpoint_interval) * params.checkpoint_interval;
std::cout << chem.getField().GetProps()[j] << " with a MAPE value of " << latest_mape[j] << " exceeds epsilon of "
<< params.species_epsilon[j] << "! " << std::endl;
MSG("[THRESHOLD EXCEEDED] " + chem.getField().GetProps()[i] + " has MAPE = " +
std::to_string(mape_values[i]) + " exceeding threshold = " + std::to_string(params.mape_threshold[i]) +
" → rolling back to iteration " + std::to_string(rollback_iteration));
Checkpoint_s checkpoint_read{.field = chem.getField()};
read_checkpoint("checkpoint" + std::to_string(rollback_iter) + ".hdf5", checkpoint_read);
iter = checkpoint_read.iteration;
read_checkpoint("checkpoint" + std::to_string(rollback_iteration) + ".hdf5", checkpoint_read);
current_iteration = checkpoint_read.iteration;
// Rollback happend
return true;
}
}
}
MSG("All spezies are below their threshold values");
MSG("All species are within their error thresholds.");
return false;
}
void updatePenaltyLogic(RuntimeParameters &params, bool roolback_happend)
{
if (roolback_happend)
{
params.rollback_simulation = true;
params.penalty_counter = params.penalty_iteration;
std::cout << "Penalty counter reset to: " << params.penalty_counter << std::endl;
MSG("Rollback! Penalty phase started for " + std::to_string(params.penalty_iteration) + " iterations.");
}
else
{
if (params.rollback_simulation && params.penalty_counter == 0)
{
params.rollback_simulation = false;
MSG("Penalty phase ended. Interpolation re-enabled.");
}
else if (!params.rollback_simulation)
{
params.penalty_iteration = std::min(params.penalty_iteration *= 2, params.max_penalty_iteration);
MSG("Stable surrogate phase detected. Penalty iteration doubled to " + std::to_string(params.penalty_iteration) + " iterations.");
}
}
}
static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
DiffusionModule &diffusion,
@ -367,21 +349,25 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
}
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
params.next_penalty_check = params.penalty_iteration;
/* SIMULATION LOOP */
double dSimTime{0};
double chkTime = 0.0;
for (uint32_t iter = 1; iter < maxiter + 1; iter++)
{
// Penalty countdown
if (params.rollback_simulation && params.penalty_counter > 0)
{
params.penalty_counter--;
std::cout << "Penalty counter: " << params.penalty_counter << std::endl;
// Rollback countdowm
if (params.rollback_enabled) {
if (params.sur_disabled_counter > 0) {
--params.sur_disabled_counter;
MSG("Rollback counter: " + std::to_string(params.sur_disabled_counter));
} else {
params.rollback_enabled = false;
}
}
params.control_iteration_active = (iter % params.control_iteration == 0 /* && iter != 0 */);
params.global_iter = iter;
params.control_interval_enabled = (iter % params.control_interval == 0);
double start_t = MPI_Wtime();
@ -495,20 +481,27 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
std::to_string(maxiter));
if (iter % params.control_iteration == 0)
{
writeStatsToCSV(chem.error_stats_history, chem.getField().GetProps(), "stats_overview");
double chk_start = MPI_Wtime();
if(iter % params.checkpoint_interval == 0){
MSG("Writing checkpoint of iteration " + std::to_string(iter));
write_checkpoint("checkpoint" + std::to_string(iter) + ".hdf5",
{.field = chem.getField(), .iteration = iter});
}
if (iter == params.next_penalty_check)
if (params.control_interval_enabled && !params.rollback_enabled)
{
bool roolback_happend = checkAndRollback(chem, params, iter);
updatePenaltyLogic(params, roolback_happend);
writeStatsToCSV(chem.error_history, chem.getField().GetProps(), "stats_overview");
params.next_penalty_check = iter + params.penalty_iteration;
if(triggerRollbackIfExceeded(chem, params, iter)){
params.rollback_enabled = true;
params.rollback_counter ++;
params.sur_disabled_counter = params.control_interval;
MSG("Interpolation disabled for the next " + std::to_string(params.control_interval) + ".");
}
}
double chk_end = MPI_Wtime();
chkTime += chk_end - chk_start;
// MSG();
} // END SIMULATION LOOP
@ -526,6 +519,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
Rcpp::List diffusion_profiling;
diffusion_profiling["simtime"] = diffusion.getTransportTime();
Rcpp::List ctrl_profiling;
ctrl_profiling["checkpointing_time"] = chkTime;
ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime();
ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime();
ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime();
ctrl_profiling["ctrl_logic_worker"] = Rcpp::wrap(chem.GetWorkerControlTimings());
if (params.use_dht)
{
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
@ -554,6 +554,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
profiling["simtime"] = dSimTime;
profiling["chemistry"] = chem_profiling;
profiling["diffusion"] = diffusion_profiling;
profiling["ctrl_logic"] = ctrl_profiling;
chem.MasterLoopBreak();

View File

@ -38,10 +38,10 @@ static const inline std::string ai_surrogate_r_library =
R"(@R_AI_SURROGATE_LIB@)";
static const inline std::string r_runtime_parameters = "mysetup";
struct RuntimeParameters {
struct RuntimeParameters
{
std::string out_dir;
std::vector<double> timesteps;
std::vector<double> species_epsilon;
Rcpp::List init_params;
@ -52,13 +52,15 @@ struct RuntimeParameters {
bool print_progress = false;
std::uint32_t penalty_iteration = 0;
std::uint32_t max_penalty_iteration = 0;
std::uint32_t penalty_counter = 0;
std::uint32_t next_penalty_check = 0;
bool rollback_simulation = false;
bool control_iteration_active = false;
std::uint32_t control_iteration = 1;
bool rollback_enabled = false;
bool control_interval_enabled = false;
std::uint32_t global_iter = 0;
std::uint32_t sur_disabled_counter = 0;
std::uint32_t rollback_counter = 0;
std::uint32_t checkpoint_interval = 0;
std::uint32_t control_interval = 0;
std::vector<double> mape_threshold;
std::vector<double> rrmse_threshold;
static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;