mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
Control component with minimum features
This commit is contained in:
parent
9f1d69982d
commit
9374b26773
@ -185,6 +185,13 @@ namespace poet
|
||||
*/
|
||||
auto GetMasterLoopTime() const { return this->send_recv_t; }
|
||||
|
||||
|
||||
auto GetMasterCtrlLogicTime() const { return this->ctrl_t; }
|
||||
|
||||
auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; }
|
||||
|
||||
auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; }
|
||||
|
||||
/**
|
||||
* **Master only** Collect and return all accumulated timings recorded by
|
||||
* workers to run Phreeqc simulation.
|
||||
@ -214,6 +221,8 @@ namespace poet
|
||||
*/
|
||||
std::vector<double> GetWorkerIdleTimings() const;
|
||||
|
||||
std::vector<double> GetWorkerControlTimings() const;
|
||||
|
||||
/**
|
||||
* **Master only** Collect and return DHT hits of all workers.
|
||||
*
|
||||
@ -262,24 +271,28 @@ namespace poet
|
||||
std::vector<int> ai_surrogate_validity_vector;
|
||||
|
||||
RuntimeParameters *runtime_params = nullptr;
|
||||
uint32_t control_iteration_counter = 0;
|
||||
|
||||
struct error_stats
|
||||
struct SimulationErrorStats
|
||||
{
|
||||
std::vector<double> mape;
|
||||
std::vector<double> rrsme;
|
||||
uint32_t iteration;
|
||||
std::vector<double> rrmse;
|
||||
uint32_t iteration; // iterations in simulation after rollbacks
|
||||
uint32_t rollback_count;
|
||||
|
||||
error_stats(size_t species_count, size_t iter)
|
||||
: mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) {}
|
||||
SimulationErrorStats(size_t species_count, uint32_t iter, uint32_t counter)
|
||||
: mape(species_count, 0.0),
|
||||
rrmse(species_count, 0.0),
|
||||
iteration(iter),
|
||||
rollback_count(counter){}
|
||||
};
|
||||
|
||||
std::vector<error_stats> error_stats_history;
|
||||
std::vector<SimulationErrorStats> error_history;
|
||||
|
||||
static void computeStats(const std::vector<double> &pqc_vector,
|
||||
const std::vector<double> &sur_vector,
|
||||
uint32_t size_per_prop, uint32_t species_count,
|
||||
error_stats &stats);
|
||||
static void computeSpeciesErrors(const std::vector<double> &reference_values,
|
||||
const std::vector<double> &surrogate_values,
|
||||
uint32_t size_per_prop,
|
||||
uint32_t species_count,
|
||||
SimulationErrorStats &species_error_stats);
|
||||
|
||||
protected:
|
||||
void initializeDHT(uint32_t size_mb,
|
||||
@ -319,6 +332,7 @@ namespace poet
|
||||
enum
|
||||
{
|
||||
WORKER_PHREEQC,
|
||||
WORKER_CTRL_ITER,
|
||||
WORKER_DHT_GET,
|
||||
WORKER_DHT_FILL,
|
||||
WORKER_IDLE,
|
||||
@ -342,6 +356,7 @@ namespace poet
|
||||
double dht_get = 0.;
|
||||
double dht_fill = 0.;
|
||||
double idle_t = 0.;
|
||||
double ctrl_t = 0.;
|
||||
};
|
||||
|
||||
struct worker_info_s
|
||||
@ -410,6 +425,7 @@ namespace poet
|
||||
|
||||
poet::DHT_Wrapper *dht = nullptr;
|
||||
|
||||
bool dht_fill_during_rollback{false};
|
||||
bool interp_enabled{false};
|
||||
std::unique_ptr<poet::InterpolationModule> interp;
|
||||
|
||||
@ -431,6 +447,10 @@ namespace poet
|
||||
double seq_t = 0.;
|
||||
double send_recv_t = 0.;
|
||||
|
||||
double ctrl_t = 0.;
|
||||
double bcast_ctrl_t = 0.;
|
||||
double recv_ctrl_t = 0.;
|
||||
|
||||
std::array<double, 2> base_totals{0};
|
||||
|
||||
bool print_progessbar{false};
|
||||
@ -449,7 +469,7 @@ namespace poet
|
||||
|
||||
std::unique_ptr<PhreeqcRunner> pqc_runner;
|
||||
|
||||
std::vector<double> sur_shuffled;
|
||||
std::vector<double> sur_shuffled;
|
||||
};
|
||||
} // namespace poet
|
||||
|
||||
|
||||
@ -41,6 +41,12 @@ std::vector<double> poet::ChemistryModule::GetWorkerPhreeqcTimings() const {
|
||||
return MasterGatherWorkerTimings(WORKER_PHREEQC);
|
||||
}
|
||||
|
||||
std::vector<double> poet::ChemistryModule::GetWorkerControlTimings() const {
|
||||
int type = CHEM_PERF;
|
||||
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
|
||||
return MasterGatherWorkerTimings(WORKER_CTRL_ITER);
|
||||
}
|
||||
|
||||
std::vector<double> poet::ChemistryModule::GetWorkerDHTGetTimings() const {
|
||||
int type = CHEM_PERF;
|
||||
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
|
||||
@ -160,35 +166,35 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerPHTCacheHits() const {
|
||||
return ret;
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::computeStats(const std::vector<double> &pqc_vector,
|
||||
const std::vector<double> &sur_vector,
|
||||
void poet::ChemistryModule::computeSpeciesErrors(const std::vector<double> &reference_values,
|
||||
const std::vector<double> &surrogate_values,
|
||||
uint32_t size_per_prop,
|
||||
uint32_t species_count,
|
||||
error_stats &stats) {
|
||||
SimulationErrorStats &species_error_stats) {
|
||||
for (uint32_t i = 0; i < species_count; ++i) {
|
||||
double err_sum = 0.0;
|
||||
double sqr_err_sum = 0.0;
|
||||
uint32_t base_idx = i * size_per_prop;
|
||||
|
||||
for (uint32_t j = 0; j < size_per_prop; ++j) {
|
||||
const double pqc_value = pqc_vector[base_idx + j];
|
||||
const double sur_value = sur_vector[base_idx + j];
|
||||
const double ref_value = reference_values[base_idx + j];
|
||||
const double sur_value = surrogate_values[base_idx + j];
|
||||
|
||||
if (pqc_value == 0.0) {
|
||||
if (ref_value == 0.0) {
|
||||
if (sur_value != 0.0) {
|
||||
err_sum += 1.0;
|
||||
sqr_err_sum += 1.0;
|
||||
}
|
||||
// Both zero: skip
|
||||
} else {
|
||||
double alpha = 1.0 - (sur_value / pqc_value);
|
||||
double alpha = 1.0 - (sur_value / ref_value);
|
||||
err_sum += std::abs(alpha);
|
||||
sqr_err_sum += alpha * alpha;
|
||||
}
|
||||
}
|
||||
|
||||
stats.mape[i] = 100.0 * (err_sum / size_per_prop);
|
||||
stats.rrsme[i] =
|
||||
species_error_stats.mape[i] = 100.0 * (err_sum / size_per_prop);
|
||||
species_error_stats.rrmse[i] =
|
||||
(size_per_prop > 0) ? std::sqrt(sqr_err_sum / size_per_prop) : 0.0;
|
||||
}
|
||||
}
|
||||
@ -264,7 +270,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
worker_list_t &w_list, workpointer_t &work_pointer,
|
||||
workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs,
|
||||
int &free_workers, double dt, uint32_t iteration,
|
||||
uint32_t control_iteration, const std::vector<uint32_t> &wp_sizes_vector) {
|
||||
uint32_t control_interval, const std::vector<uint32_t> &wp_sizes_vector) {
|
||||
/* declare variables */
|
||||
int local_work_package_size;
|
||||
|
||||
@ -305,7 +311,7 @@ inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
std::next(wp_sizes_vector.begin(), count_pkgs), 0);
|
||||
send_buffer[end_of_wp + 4] = wp_start_index;
|
||||
// whether this iteration is a control iteration
|
||||
send_buffer[end_of_wp + 5] = control_iteration;
|
||||
send_buffer[end_of_wp + 5] = control_interval;
|
||||
|
||||
/* ATTENTION Worker p has rank p+1 */
|
||||
// MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
|
||||
@ -329,6 +335,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
int need_to_receive = 1;
|
||||
double idle_a, idle_b;
|
||||
int p, size;
|
||||
double recv_a, recv_b;
|
||||
|
||||
MPI_Status probe_status;
|
||||
// master_recv_a = MPI_Wtime();
|
||||
@ -361,6 +368,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
free_workers++;
|
||||
}
|
||||
if (probe_status.MPI_TAG == LOOP_CTRL) {
|
||||
recv_a = MPI_Wtime();
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
|
||||
|
||||
// layout of buffer is [phreeqc][surrogate]
|
||||
@ -378,6 +386,8 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
w_list[p - 1].has_work = 0;
|
||||
pkg_to_recv -= 1;
|
||||
free_workers++;
|
||||
recv_b = MPI_Wtime();
|
||||
this->recv_ctrl_t += recv_b - recv_a;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -432,6 +442,10 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
int i_pkgs;
|
||||
int ftype;
|
||||
|
||||
double ctrl_a, ctrl_b;
|
||||
double worker_ctrl_a, worker_ctrl_b;
|
||||
double ctrl_bcast_a, ctrl_bcast_b;
|
||||
|
||||
const std::vector<uint32_t> wp_sizes_vector =
|
||||
CalculateWPSizesVector(this->n_cells, this->wp_size);
|
||||
|
||||
@ -445,28 +459,44 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
MPI_INT);
|
||||
}
|
||||
|
||||
ftype = CHEM_WORK_LOOP;
|
||||
PropagateFunctionType(ftype);
|
||||
/* start time measurement of broadcasting interpolation status */
|
||||
ctrl_bcast_a = MPI_Wtime();
|
||||
|
||||
ftype = CHEM_INTERP;
|
||||
PropagateFunctionType(ftype);
|
||||
|
||||
if(this->runtime_params->rollback_simulation){
|
||||
int interp_flag = 0;
|
||||
int dht_fill_flag = 0;
|
||||
|
||||
if(this->runtime_params->rollback_enabled){
|
||||
this->interp_enabled = false;
|
||||
int interp_flag = 0;
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
} else {
|
||||
this->interp_enabled = true;
|
||||
int interp_flag = 1;
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
this->dht_fill_during_rollback = true;
|
||||
interp_flag = 0;
|
||||
dht_fill_flag = 1;
|
||||
}
|
||||
else {
|
||||
this->interp_enabled = true;
|
||||
this->dht_fill_during_rollback = false;
|
||||
interp_flag = 1;
|
||||
dht_fill_flag = 0;
|
||||
}
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
ChemBCast(&dht_fill_flag, 1, MPI_INT);
|
||||
|
||||
/* end time measurement of broadcasting interpolation status */
|
||||
ctrl_bcast_b = MPI_Wtime();
|
||||
this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a;
|
||||
|
||||
ftype = CHEM_WORK_LOOP;
|
||||
PropagateFunctionType(ftype);
|
||||
|
||||
MPI_Barrier(this->group_comm);
|
||||
|
||||
static uint32_t iteration = 0;
|
||||
uint32_t control_iteration = static_cast<uint32_t>(
|
||||
this->runtime_params->control_iteration_active ? 1 : 0);
|
||||
if (control_iteration) {
|
||||
|
||||
uint32_t control_logic_enabled = this->runtime_params->control_interval_enabled ? 1 : 0;
|
||||
|
||||
if (control_logic_enabled) {
|
||||
sur_shuffled.clear();
|
||||
sur_shuffled.reserve(this->n_cells * this->prop_count);
|
||||
}
|
||||
@ -512,7 +542,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
if (pkg_to_send > 0) {
|
||||
// send packages to all free workers ...
|
||||
MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send,
|
||||
i_pkgs, free_workers, dt, iteration, control_iteration,
|
||||
i_pkgs, free_workers, dt, iteration, control_logic_enabled,
|
||||
wp_sizes_vector);
|
||||
}
|
||||
// ... and try to receive them from workers who has finished their work
|
||||
@ -522,39 +552,43 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
// Just to complete the progressbar
|
||||
std::cout << std::endl;
|
||||
|
||||
/* stop time measurement of chemistry time needed for send/recv loop */
|
||||
worker_chemistry_b = MPI_Wtime();
|
||||
this->send_recv_t += worker_chemistry_b - worker_chemistry_a;
|
||||
/* stop time measurement of chemistry time needed for send/recv loop */
|
||||
worker_chemistry_b = MPI_Wtime();
|
||||
this->send_recv_t += worker_chemistry_b - worker_chemistry_a;
|
||||
|
||||
/* start time measurement of sequential part */
|
||||
seq_c = MPI_Wtime();
|
||||
/* start time measurement of sequential part */
|
||||
seq_c = MPI_Wtime();
|
||||
|
||||
/* unshuffle grid */
|
||||
// grid.importAndUnshuffle(mpi_buffer);
|
||||
std::vector<double> out_vec{mpi_buffer};
|
||||
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), out_vec);
|
||||
chem_field = out_vec;
|
||||
/* unshuffle grid */
|
||||
// grid.importAndUnshuffle(mpi_buffer);
|
||||
std::vector<double> out_vec{mpi_buffer};
|
||||
unshuffleField(mpi_buffer, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), out_vec);
|
||||
chem_field = out_vec;
|
||||
|
||||
/* do master stuff */
|
||||
/* do master stuff */
|
||||
|
||||
if (control_iteration) {
|
||||
control_iteration_counter++;
|
||||
/* start time measurement of control logic */
|
||||
ctrl_a = MPI_Wtime();
|
||||
|
||||
std::vector<double> sur_unshuffled{sur_shuffled};
|
||||
if (control_logic_enabled && !this->runtime_params->rollback_enabled) {
|
||||
|
||||
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), sur_unshuffled);
|
||||
std::vector<double> sur_unshuffled{sur_shuffled};;
|
||||
|
||||
error_stats stats(this->prop_count, control_iteration_counter *
|
||||
runtime_params->control_iteration);
|
||||
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), sur_unshuffled);
|
||||
|
||||
computeStats(out_vec, sur_unshuffled, this->n_cells, this->prop_count,
|
||||
stats);
|
||||
error_stats_history.push_back(stats);
|
||||
SimulationErrorStats stats(this->prop_count, this->runtime_params->global_iter, this->runtime_params->rollback_counter);
|
||||
|
||||
computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells, this->prop_count, stats);
|
||||
|
||||
error_history.push_back(stats);
|
||||
}
|
||||
|
||||
/* end time measurement of control logic */
|
||||
ctrl_b = MPI_Wtime();
|
||||
this->ctrl_t += ctrl_b - ctrl_a;
|
||||
|
||||
// to do: control values to epsilon
|
||||
}
|
||||
|
||||
/* start time measurement of master chemistry */
|
||||
sim_e_chemistry = MPI_Wtime();
|
||||
|
||||
@ -36,315 +36,357 @@
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace poet {
|
||||
namespace poet
|
||||
{
|
||||
|
||||
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
const NamedVector<std::uint32_t> &key_species,
|
||||
const std::vector<std::int32_t> &key_indices,
|
||||
const std::vector<std::string> &_output_names,
|
||||
const InitialList::ChemistryHookFunctions &_hooks,
|
||||
uint32_t data_count, bool _with_interp,
|
||||
bool _has_het_ids)
|
||||
: key_count(key_indices.size()), data_count(data_count),
|
||||
input_key_elements(key_indices), communicator(dht_comm),
|
||||
key_species(key_species), output_names(_output_names), hooks(_hooks),
|
||||
with_interp(_with_interp), has_het_ids(_has_het_ids) {
|
||||
// initialize DHT object
|
||||
// key size = count of key elements + timestep
|
||||
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
|
||||
uint32_t data_size =
|
||||
(data_count + (with_interp ? input_key_elements.size() : 0)) *
|
||||
sizeof(double);
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
|
||||
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
|
||||
&poet::Murmur2_64A);
|
||||
DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
const NamedVector<std::uint32_t> &key_species,
|
||||
const std::vector<std::int32_t> &key_indices,
|
||||
const std::vector<std::string> &_output_names,
|
||||
const InitialList::ChemistryHookFunctions &_hooks,
|
||||
uint32_t data_count, bool _with_interp,
|
||||
bool _has_het_ids)
|
||||
: key_count(key_indices.size()), data_count(data_count),
|
||||
input_key_elements(key_indices), communicator(dht_comm),
|
||||
key_species(key_species), output_names(_output_names), hooks(_hooks),
|
||||
with_interp(_with_interp), has_het_ids(_has_het_ids)
|
||||
{
|
||||
// initialize DHT object
|
||||
// key size = count of key elements + timestep
|
||||
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
|
||||
uint32_t data_size =
|
||||
(data_count + (with_interp ? input_key_elements.size() : 0)) *
|
||||
sizeof(double);
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
|
||||
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
|
||||
&poet::Murmur2_64A);
|
||||
|
||||
dht_signif_vector = key_species.getValues();
|
||||
dht_signif_vector = key_species.getValues();
|
||||
|
||||
// this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
|
||||
// this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
|
||||
|
||||
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
|
||||
this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT);
|
||||
|
||||
const auto key_names = key_species.getNames();
|
||||
const auto key_names = key_species.getNames();
|
||||
|
||||
auto tot_h = std::find(key_names.begin(), key_names.end(), "H");
|
||||
if (tot_h != key_names.end()) {
|
||||
this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL;
|
||||
}
|
||||
auto tot_o = std::find(key_names.begin(), key_names.end(), "O");
|
||||
if (tot_o != key_names.end()) {
|
||||
this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL;
|
||||
}
|
||||
auto charge = std::find(key_names.begin(), key_names.end(), "Charge");
|
||||
if (charge != key_names.end()) {
|
||||
this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE;
|
||||
}
|
||||
}
|
||||
|
||||
DHT_Wrapper::~DHT_Wrapper() {
|
||||
// free DHT
|
||||
DHT_free(dht_object, NULL, NULL);
|
||||
}
|
||||
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
-> const DHT_ResultObject & {
|
||||
|
||||
const auto length = work_package.size;
|
||||
|
||||
std::vector<double> bucket_writer(
|
||||
this->data_count + (with_interp ? input_key_elements.size() : 0));
|
||||
|
||||
// loop over every grid cell contained in work package
|
||||
for (int i = 0; i < length; i++) {
|
||||
// point to current grid cell
|
||||
auto &key_vector = dht_results.keys[i];
|
||||
|
||||
// overwrite input with data from DHT, IF value is found in DHT
|
||||
int res =
|
||||
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
|
||||
|
||||
switch (res) {
|
||||
case DHT_SUCCESS:
|
||||
work_package.output[i] =
|
||||
(with_interp
|
||||
? inputAndRatesToOutput(bucket_writer, work_package.input[i])
|
||||
: bucket_writer);
|
||||
work_package.mapping[i] = CHEM_DHT;
|
||||
this->dht_hits++;
|
||||
break;
|
||||
case DHT_READ_MISS:
|
||||
break;
|
||||
auto tot_h = std::find(key_names.begin(), key_names.end(), "H");
|
||||
if (tot_h != key_names.end())
|
||||
{
|
||||
this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL;
|
||||
}
|
||||
auto tot_o = std::find(key_names.begin(), key_names.end(), "O");
|
||||
if (tot_o != key_names.end())
|
||||
{
|
||||
this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL;
|
||||
}
|
||||
auto charge = std::find(key_names.begin(), key_names.end(), "Charge");
|
||||
if (charge != key_names.end())
|
||||
{
|
||||
this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE;
|
||||
}
|
||||
}
|
||||
|
||||
return dht_results;
|
||||
}
|
||||
DHT_Wrapper::~DHT_Wrapper()
|
||||
{
|
||||
// free DHT
|
||||
DHT_free(dht_object, NULL, NULL);
|
||||
}
|
||||
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
-> const DHT_ResultObject &
|
||||
{
|
||||
|
||||
void DHT_Wrapper::fillDHT(const WorkPackage &work_package) {
|
||||
const auto length = work_package.size;
|
||||
|
||||
const auto length = work_package.size;
|
||||
std::vector<double> bucket_writer(
|
||||
this->data_count + (with_interp ? input_key_elements.size() : 0));
|
||||
|
||||
// loop over every grid cell contained in work package
|
||||
dht_results.locations.resize(length);
|
||||
dht_results.filledDHT = std::vector<bool>(length, false);
|
||||
for (int i = 0; i < length; i++) {
|
||||
// If true grid cell was simulated, needs to be inserted into dht
|
||||
if (work_package.mapping[i] != CHEM_PQC) {
|
||||
continue;
|
||||
}
|
||||
// loop over every grid cell contained in work package
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
// point to current grid cell
|
||||
auto &key_vector = dht_results.keys[i];
|
||||
|
||||
if (work_package.input[i][0] != 2) {
|
||||
continue;
|
||||
}
|
||||
// overwrite input with data from DHT, IF value is found in DHT
|
||||
int res =
|
||||
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
|
||||
|
||||
// check if calcite or dolomite is absent and present, resp.n and vice
|
||||
// versa in input/output. If this is the case -> Do not write to DHT!
|
||||
// HACK: hardcoded, should be fixed!
|
||||
if (hooks.dht_fill.isValid()) {
|
||||
NamedVector<double> old_values(output_names, work_package.input[i]);
|
||||
NamedVector<double> new_values(output_names, work_package.output[i]);
|
||||
|
||||
if (Rcpp::as<bool>(hooks.dht_fill(old_values, new_values))) {
|
||||
continue;
|
||||
switch (res)
|
||||
{
|
||||
case DHT_SUCCESS:
|
||||
work_package.output[i] =
|
||||
(with_interp
|
||||
? inputAndRatesToOutput(bucket_writer, work_package.input[i])
|
||||
: bucket_writer);
|
||||
work_package.mapping[i] = CHEM_DHT;
|
||||
this->dht_hits++;
|
||||
break;
|
||||
case DHT_READ_MISS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t proc, index;
|
||||
auto &key = dht_results.keys[i];
|
||||
const auto data =
|
||||
(with_interp ? outputToInputAndRates(work_package.input[i],
|
||||
work_package.output[i])
|
||||
: work_package.output[i]);
|
||||
// void *data = (void *)&(work_package[i * this->data_count]);
|
||||
// fuzz data (round, logarithm etc.)
|
||||
return dht_results;
|
||||
}
|
||||
|
||||
// insert simulated data with fuzzed key into DHT
|
||||
int res = DHT_write(this->dht_object, key.data(),
|
||||
const_cast<double *>(data.data()), &proc, &index);
|
||||
void DHT_Wrapper::fillDHT(const WorkPackage &work_package)
|
||||
{
|
||||
|
||||
dht_results.locations[i] = {proc, index};
|
||||
const auto length = work_package.size;
|
||||
|
||||
// if data was successfully written ...
|
||||
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
|
||||
dht_evictions++;
|
||||
// loop over every grid cell contained in work package
|
||||
dht_results.locations.resize(length);
|
||||
dht_results.filledDHT = std::vector<bool>(length, false);
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
// If true grid cell was simulated, needs to be inserted into dht
|
||||
if (work_package.mapping[i] != CHEM_PQC)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (work_package.input[i][1] != 2)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// check if calcite or dolomite is absent and present, resp.n and vice
|
||||
// versa in input/output. If this is the case -> Do not write to DHT!
|
||||
// HACK: hardcoded, should be fixed!
|
||||
if (hooks.dht_fill.isValid())
|
||||
{
|
||||
NamedVector<double> old_values(output_names, work_package.input[i]);
|
||||
NamedVector<double> new_values(output_names, work_package.output[i]);
|
||||
|
||||
if (Rcpp::as<bool>(hooks.dht_fill(old_values, new_values)))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t proc, index;
|
||||
auto &key = dht_results.keys[i];
|
||||
const auto data =
|
||||
(with_interp ? outputToInputAndRates(work_package.input[i],
|
||||
work_package.output[i])
|
||||
: work_package.output[i]);
|
||||
// void *data = (void *)&(work_package[i * this->data_count]);
|
||||
// fuzz data (round, logarithm etc.)
|
||||
|
||||
// insert simulated data with fuzzed key into DHT
|
||||
int res = DHT_write(this->dht_object, key.data(),
|
||||
const_cast<double *>(data.data()), &proc, &index);
|
||||
|
||||
dht_results.locations[i] = {proc, index};
|
||||
|
||||
// if data was successfully written ...
|
||||
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION))
|
||||
{
|
||||
dht_evictions++;
|
||||
}
|
||||
|
||||
dht_results.filledDHT[i] = true;
|
||||
}
|
||||
}
|
||||
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::outputToInputAndRates(const std::vector<double> &old_results,
|
||||
const std::vector<double> &new_results)
|
||||
{
|
||||
const int prefix_size = this->input_key_elements.size();
|
||||
|
||||
std::vector<double> output(prefix_size + this->data_count);
|
||||
std::copy(new_results.begin(), new_results.end(),
|
||||
output.begin() + prefix_size);
|
||||
|
||||
for (int i = 0; i < prefix_size; i++)
|
||||
{
|
||||
const int data_elem_i = input_key_elements[i];
|
||||
output[i] = old_results[data_elem_i];
|
||||
output[prefix_size + data_elem_i] -= old_results[data_elem_i];
|
||||
}
|
||||
|
||||
dht_results.filledDHT[i] = true;
|
||||
}
|
||||
}
|
||||
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::outputToInputAndRates(const std::vector<double> &old_results,
|
||||
const std::vector<double> &new_results) {
|
||||
const int prefix_size = this->input_key_elements.size();
|
||||
|
||||
std::vector<double> output(prefix_size + this->data_count);
|
||||
std::copy(new_results.begin(), new_results.end(),
|
||||
output.begin() + prefix_size);
|
||||
|
||||
for (int i = 0; i < prefix_size; i++) {
|
||||
const int data_elem_i = input_key_elements[i];
|
||||
output[i] = old_results[data_elem_i];
|
||||
output[prefix_size + data_elem_i] -= old_results[data_elem_i];
|
||||
return output;
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data,
|
||||
const std::vector<double> &input_values)
|
||||
{
|
||||
const int prefix_size = this->input_key_elements.size();
|
||||
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data,
|
||||
const std::vector<double> &input_values) {
|
||||
const int prefix_size = this->input_key_elements.size();
|
||||
std::vector<double> output(input_values);
|
||||
|
||||
std::vector<double> output(input_values);
|
||||
for (int i = 0; i < prefix_size; i++)
|
||||
{
|
||||
const int data_elem_i = input_key_elements[i];
|
||||
output[data_elem_i] += dht_data[i];
|
||||
}
|
||||
|
||||
for (int i = 0; i < prefix_size; i++) {
|
||||
const int data_elem_i = input_key_elements[i];
|
||||
output[data_elem_i] += dht_data[i];
|
||||
return output;
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::outputToRates(const std::vector<double> &old_results,
|
||||
const std::vector<double> &new_results)
|
||||
{
|
||||
std::vector<double> output(new_results);
|
||||
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::outputToRates(const std::vector<double> &old_results,
|
||||
const std::vector<double> &new_results) {
|
||||
std::vector<double> output(new_results);
|
||||
for (const auto &data_elem_i : input_key_elements)
|
||||
{
|
||||
output[data_elem_i] -= old_results[data_elem_i];
|
||||
}
|
||||
|
||||
for (const auto &data_elem_i : input_key_elements) {
|
||||
output[data_elem_i] -= old_results[data_elem_i];
|
||||
return output;
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
|
||||
const std::vector<double> &input_values)
|
||||
{
|
||||
std::vector<double> output(input_values);
|
||||
|
||||
inline std::vector<double>
|
||||
DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
|
||||
const std::vector<double> &input_values) {
|
||||
std::vector<double> output(input_values);
|
||||
for (const auto &data_elem_i : input_key_elements)
|
||||
{
|
||||
output[data_elem_i] += dht_data[data_elem_i];
|
||||
}
|
||||
|
||||
for (const auto &data_elem_i : input_key_elements) {
|
||||
output[data_elem_i] += dht_data[data_elem_i];
|
||||
return output;
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
// void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
|
||||
// for (int i = 0; i < dht_results.length; i++) {
|
||||
// if (!dht_results.needPhreeqc[i]) {
|
||||
// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
|
||||
// work_package.begin() + (data_count * i));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
|
||||
// for (int i = 0; i < dht_results.length; i++) {
|
||||
// if (!dht_results.needPhreeqc[i]) {
|
||||
// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
|
||||
// work_package.begin() + (data_count * i));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
int DHT_Wrapper::tableToFile(const char *filename) {
|
||||
int res = DHT_to_file(dht_object, filename);
|
||||
return res;
|
||||
}
|
||||
|
||||
int DHT_Wrapper::fileToTable(const char *filename) {
|
||||
int res = DHT_from_file(dht_object, filename);
|
||||
if (res != DHT_SUCCESS)
|
||||
int DHT_Wrapper::tableToFile(const char *filename)
|
||||
{
|
||||
int res = DHT_to_file(dht_object, filename);
|
||||
return res;
|
||||
}
|
||||
|
||||
int DHT_Wrapper::fileToTable(const char *filename)
|
||||
{
|
||||
int res = DHT_from_file(dht_object, filename);
|
||||
if (res != DHT_SUCCESS)
|
||||
return res;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
DHT_print_statistics(dht_object);
|
||||
DHT_print_statistics(dht_object);
|
||||
#endif
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
void DHT_Wrapper::printStatistics() {
|
||||
int res;
|
||||
|
||||
res = DHT_print_statistics(dht_object);
|
||||
|
||||
if (res != DHT_SUCCESS) {
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
// RUNNING CIRCLES WHILE SCREAMING
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
|
||||
double dt) {
|
||||
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
|
||||
void DHT_Wrapper::printStatistics()
|
||||
{
|
||||
int res;
|
||||
|
||||
NamedVector<double> input_nv(this->output_names, cell);
|
||||
res = DHT_print_statistics(dht_object);
|
||||
|
||||
const std::vector<double> eval_vec =
|
||||
Rcpp::as<std::vector<double>>(hooks.dht_fuzz(input_nv));
|
||||
assert(eval_vec.size() == this->key_count);
|
||||
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
|
||||
if (res != DHT_SUCCESS)
|
||||
{
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
// RUNNING CIRCLES WHILE SCREAMING
|
||||
}
|
||||
}
|
||||
|
||||
DHT_Rounder rounder;
|
||||
LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
|
||||
double dt)
|
||||
{
|
||||
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
|
||||
|
||||
int totals_i = 0;
|
||||
// introduce fuzzing to allow more hits in DHT
|
||||
// loop over every variable of grid cell
|
||||
for (std::uint32_t i = 0; i < eval_vec.size(); i++) {
|
||||
double curr_key = eval_vec[i];
|
||||
if (curr_key != 0) {
|
||||
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
|
||||
curr_key -= base_totals[totals_i++];
|
||||
NamedVector<double> input_nv(this->output_names, cell);
|
||||
|
||||
const std::vector<double> eval_vec =
|
||||
Rcpp::as<std::vector<double>>(hooks.dht_fuzz(input_nv));
|
||||
assert(eval_vec.size() == this->key_count);
|
||||
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
|
||||
|
||||
DHT_Rounder rounder;
|
||||
|
||||
int totals_i = 0;
|
||||
// introduce fuzzing to allow more hits in DHT
|
||||
// loop over every variable of grid cell
|
||||
for (std::uint32_t i = 0; i < eval_vec.size(); i++)
|
||||
{
|
||||
double curr_key = eval_vec[i];
|
||||
if (curr_key != 0)
|
||||
{
|
||||
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL)
|
||||
{
|
||||
curr_key -= base_totals[totals_i++];
|
||||
}
|
||||
vecFuzz[i] =
|
||||
rounder.round(curr_key, dht_signif_vector[i],
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
|
||||
}
|
||||
vecFuzz[i] =
|
||||
rounder.round(curr_key, dht_signif_vector[i],
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
|
||||
}
|
||||
}
|
||||
// add timestep to the end of the key as double value
|
||||
vecFuzz[this->key_count].fp_element = dt;
|
||||
if (has_het_ids) {
|
||||
vecFuzz[this->key_count + 1].fp_element = cell[0];
|
||||
// add timestep to the end of the key as double value
|
||||
vecFuzz[this->key_count].fp_element = dt;
|
||||
if (has_het_ids)
|
||||
{
|
||||
vecFuzz[this->key_count + 1].fp_element = cell[0];
|
||||
}
|
||||
|
||||
return vecFuzz;
|
||||
}
|
||||
|
||||
return vecFuzz;
|
||||
}
|
||||
LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt)
|
||||
{
|
||||
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
|
||||
|
||||
LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt) {
|
||||
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
|
||||
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
|
||||
DHT_Rounder rounder;
|
||||
|
||||
LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
|
||||
DHT_Rounder rounder;
|
||||
|
||||
int totals_i = 0;
|
||||
// introduce fuzzing to allow more hits in DHT
|
||||
// loop over every variable of grid cell
|
||||
for (std::uint32_t i = 0; i < input_key_elements.size(); i++) {
|
||||
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) {
|
||||
continue;
|
||||
}
|
||||
double curr_key = cell[input_key_elements[i]];
|
||||
if (curr_key != 0) {
|
||||
if (curr_key < c_zero_val &&
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) {
|
||||
int totals_i = 0;
|
||||
// introduce fuzzing to allow more hits in DHT
|
||||
// loop over every variable of grid cell
|
||||
for (std::uint32_t i = 0; i < input_key_elements.size(); i++)
|
||||
{
|
||||
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) {
|
||||
curr_key -= base_totals[totals_i++];
|
||||
double curr_key = cell[input_key_elements[i]];
|
||||
if (curr_key != 0)
|
||||
{
|
||||
if (curr_key < c_zero_val &&
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL)
|
||||
{
|
||||
curr_key -= base_totals[totals_i++];
|
||||
}
|
||||
vecFuzz[i] =
|
||||
rounder.round(curr_key, dht_signif_vector[i],
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
|
||||
}
|
||||
vecFuzz[i] =
|
||||
rounder.round(curr_key, dht_signif_vector[i],
|
||||
this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL);
|
||||
}
|
||||
}
|
||||
// add timestep to the end of the key as double value
|
||||
vecFuzz[this->key_count].fp_element = dt;
|
||||
if (has_het_ids) {
|
||||
vecFuzz[this->key_count + 1].fp_element = cell[0];
|
||||
// add timestep to the end of the key as double value
|
||||
vecFuzz[this->key_count].fp_element = dt;
|
||||
if (has_het_ids)
|
||||
{
|
||||
vecFuzz[this->key_count + 1].fp_element = cell[0];
|
||||
}
|
||||
|
||||
return vecFuzz;
|
||||
}
|
||||
|
||||
return vecFuzz;
|
||||
}
|
||||
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec)
|
||||
{
|
||||
if (signif_vec.size() != this->key_count)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"Significant vector size mismatches count of key elements.");
|
||||
}
|
||||
|
||||
void poet::DHT_Wrapper::SetSignifVector(std::vector<uint32_t> signif_vec) {
|
||||
if (signif_vec.size() != this->key_count) {
|
||||
throw std::runtime_error(
|
||||
"Significant vector size mismatches count of key elements.");
|
||||
this->dht_signif_vector = signif_vec;
|
||||
}
|
||||
|
||||
this->dht_signif_vector = signif_vec;
|
||||
}
|
||||
} // namespace poet
|
||||
|
||||
@ -25,152 +25,175 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
extern "C"
|
||||
{
|
||||
#include "DHT.h"
|
||||
}
|
||||
|
||||
namespace poet {
|
||||
namespace poet
|
||||
{
|
||||
|
||||
InterpolationModule::InterpolationModule(
|
||||
std::uint32_t entries_per_bucket, std::uint64_t size_per_process,
|
||||
std::uint32_t min_entries_needed, DHT_Wrapper &dht,
|
||||
const NamedVector<std::uint32_t> &interp_key_signifs,
|
||||
const std::vector<std::int32_t> &dht_key_indices,
|
||||
const std::vector<std::string> &_out_names,
|
||||
const InitialList::ChemistryHookFunctions &_hooks)
|
||||
: dht_instance(dht), key_signifs(interp_key_signifs),
|
||||
key_indices(dht_key_indices), min_entries_needed(min_entries_needed),
|
||||
dht_names(dht.getKeySpecies().getNames()), out_names(_out_names),
|
||||
hooks(_hooks) {
|
||||
InterpolationModule::InterpolationModule(
|
||||
std::uint32_t entries_per_bucket, std::uint64_t size_per_process,
|
||||
std::uint32_t min_entries_needed, DHT_Wrapper &dht,
|
||||
const NamedVector<std::uint32_t> &interp_key_signifs,
|
||||
const std::vector<std::int32_t> &dht_key_indices,
|
||||
const std::vector<std::string> &_out_names,
|
||||
const InitialList::ChemistryHookFunctions &_hooks)
|
||||
: dht_instance(dht), key_signifs(interp_key_signifs),
|
||||
key_indices(dht_key_indices), min_entries_needed(min_entries_needed),
|
||||
dht_names(dht.getKeySpecies().getNames()), out_names(_out_names),
|
||||
hooks(_hooks)
|
||||
{
|
||||
|
||||
initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process,
|
||||
dht.getCommunicator());
|
||||
initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process,
|
||||
dht.getCommunicator());
|
||||
|
||||
pht->setSourceDHT(dht.getDHT());
|
||||
}
|
||||
pht->setSourceDHT(dht.getDHT());
|
||||
}
|
||||
|
||||
void InterpolationModule::initPHT(std::uint32_t key_count,
|
||||
std::uint32_t entries_per_bucket,
|
||||
std::uint32_t size_per_process,
|
||||
MPI_Comm communicator) {
|
||||
uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double);
|
||||
uint32_t data_size = sizeof(DHT_Location);
|
||||
void InterpolationModule::initPHT(std::uint32_t key_count,
|
||||
std::uint32_t entries_per_bucket,
|
||||
std::uint32_t size_per_process,
|
||||
MPI_Comm communicator)
|
||||
{
|
||||
uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double);
|
||||
uint32_t data_size = sizeof(DHT_Location);
|
||||
|
||||
pht = std::make_unique<ProximityHashTable>(
|
||||
key_size, data_size, entries_per_bucket, size_per_process, communicator);
|
||||
}
|
||||
pht = std::make_unique<ProximityHashTable>(
|
||||
key_size, data_size, entries_per_bucket, size_per_process, communicator);
|
||||
}
|
||||
|
||||
void InterpolationModule::writePairs() {
|
||||
const auto in = this->dht_instance.getDHTResults();
|
||||
for (int i = 0; i < in.filledDHT.size(); i++) {
|
||||
if (in.filledDHT[i]) {
|
||||
const auto coarse_key = roundKey(in.keys[i]);
|
||||
pht->writeLocationToPHT(coarse_key, in.locations[i]);
|
||||
void InterpolationModule::writePairs()
|
||||
{
|
||||
const auto in = this->dht_instance.getDHTResults();
|
||||
for (int i = 0; i < in.filledDHT.size(); i++)
|
||||
{
|
||||
if (in.filledDHT[i])
|
||||
{
|
||||
const auto coarse_key = roundKey(in.keys[i]);
|
||||
pht->writeLocationToPHT(coarse_key, in.locations[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void InterpolationModule::tryInterpolation(WorkPackage &work_package) {
|
||||
interp_result.status.resize(work_package.size, NOT_NEEDED);
|
||||
void InterpolationModule::tryInterpolation(WorkPackage &work_package)
|
||||
{
|
||||
interp_result.status.resize(work_package.size, NOT_NEEDED);
|
||||
|
||||
const auto dht_results = this->dht_instance.getDHTResults();
|
||||
const auto dht_results = this->dht_instance.getDHTResults();
|
||||
|
||||
for (int wp_i = 0; wp_i < work_package.size; wp_i++) {
|
||||
if (work_package.input[wp_i][0] != 2) {
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (work_package.mapping[wp_i] != CHEM_PQC) {
|
||||
interp_result.status[wp_i] = NOT_NEEDED;
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto rounded_key = roundKey(dht_results.keys[wp_i]);
|
||||
|
||||
auto pht_result =
|
||||
pht->query(rounded_key, this->min_entries_needed,
|
||||
dht_instance.getInputCount(), dht_instance.getOutputCount());
|
||||
|
||||
if (pht_result.size < this->min_entries_needed) {
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hooks.interp_pre.isValid()) {
|
||||
NamedVector<double> nv_in(this->out_names, work_package.input[wp_i]);
|
||||
|
||||
std::vector<int> rm_indices = Rcpp::as<std::vector<int>>(
|
||||
hooks.interp_pre(nv_in, pht_result.in_values));
|
||||
|
||||
pht_result.size -= rm_indices.size();
|
||||
|
||||
if (pht_result.size < this->min_entries_needed) {
|
||||
for (int wp_i = 0; wp_i < work_package.size; wp_i++)
|
||||
{
|
||||
if (work_package.input[wp_i][1] != 2)
|
||||
{
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const auto &index : rm_indices) {
|
||||
pht_result.in_values.erase(
|
||||
std::next(pht_result.in_values.begin(), index - 1));
|
||||
pht_result.out_values.erase(
|
||||
std::next(pht_result.out_values.begin(), index - 1));
|
||||
if (work_package.mapping[wp_i] != CHEM_PQC)
|
||||
{
|
||||
interp_result.status[wp_i] = NOT_NEEDED;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef POET_PHT_ADD
|
||||
this->pht->incrementReadCounter(roundKey(rounded_key));
|
||||
#endif
|
||||
const auto rounded_key = roundKey(dht_results.keys[wp_i]);
|
||||
|
||||
const int cell_id = static_cast<int>(work_package.input[wp_i][0]);
|
||||
auto pht_result =
|
||||
pht->query(rounded_key, this->min_entries_needed,
|
||||
dht_instance.getInputCount(), dht_instance.getOutputCount());
|
||||
|
||||
if (!to_calc_cache.contains(cell_id)) {
|
||||
const std::vector<std::int32_t> &to_calc = dht_instance.getKeyElements();
|
||||
std::vector<std::int32_t> keep_indices;
|
||||
if (pht_result.size < this->min_entries_needed)
|
||||
{
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (std::size_t i = 0; i < to_calc.size(); i++) {
|
||||
if (!std::isnan(work_package.input[wp_i][to_calc[i]])) {
|
||||
keep_indices.push_back(to_calc[i]);
|
||||
if (hooks.interp_pre.isValid())
|
||||
{
|
||||
NamedVector<double> nv_in(this->out_names, work_package.input[wp_i]);
|
||||
|
||||
std::vector<int> rm_indices = Rcpp::as<std::vector<int>>(
|
||||
hooks.interp_pre(nv_in, pht_result.in_values));
|
||||
|
||||
pht_result.size -= rm_indices.size();
|
||||
|
||||
if (pht_result.size < this->min_entries_needed)
|
||||
{
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const auto &index : rm_indices)
|
||||
{
|
||||
pht_result.in_values.erase(
|
||||
std::next(pht_result.in_values.begin(), index - 1));
|
||||
pht_result.out_values.erase(
|
||||
std::next(pht_result.out_values.begin(), index - 1));
|
||||
}
|
||||
}
|
||||
|
||||
to_calc_cache[cell_id] = keep_indices;
|
||||
#ifdef POET_PHT_ADD
|
||||
this->pht->incrementReadCounter(roundKey(rounded_key));
|
||||
#endif
|
||||
|
||||
const int cell_id = static_cast<int>(work_package.input[wp_i][1]);
|
||||
|
||||
if (!to_calc_cache.contains(cell_id))
|
||||
{
|
||||
const std::vector<std::int32_t> &to_calc = dht_instance.getKeyElements();
|
||||
std::vector<std::int32_t> keep_indices;
|
||||
|
||||
for (std::size_t i = 0; i < to_calc.size(); i++)
|
||||
{
|
||||
if (!std::isnan(work_package.input[wp_i][to_calc[i]]))
|
||||
{
|
||||
keep_indices.push_back(to_calc[i]);
|
||||
}
|
||||
}
|
||||
|
||||
to_calc_cache[cell_id] = keep_indices;
|
||||
}
|
||||
|
||||
double start_fc = MPI_Wtime();
|
||||
|
||||
work_package.output[wp_i] =
|
||||
f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i],
|
||||
pht_result.in_values, pht_result.out_values);
|
||||
|
||||
if (hooks.interp_post.isValid())
|
||||
{
|
||||
NamedVector<double> nv_result(this->out_names, work_package.output[wp_i]);
|
||||
if (Rcpp::as<bool>(hooks.interp_post(nv_result)))
|
||||
{
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// interp_result.results[i][0] = mean_water;
|
||||
this->interp_t += MPI_Wtime() - start_fc;
|
||||
|
||||
this->interpolations++;
|
||||
|
||||
work_package.mapping[wp_i] = CHEM_INTERP;
|
||||
interp_result.status[wp_i] = RES_OK;
|
||||
}
|
||||
}
|
||||
|
||||
double start_fc = MPI_Wtime();
|
||||
|
||||
work_package.output[wp_i] =
|
||||
f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i],
|
||||
pht_result.in_values, pht_result.out_values);
|
||||
|
||||
if (hooks.interp_post.isValid()) {
|
||||
NamedVector<double> nv_result(this->out_names, work_package.output[wp_i]);
|
||||
if (Rcpp::as<bool>(hooks.interp_post(nv_result))) {
|
||||
interp_result.status[wp_i] = INSUFFICIENT_DATA;
|
||||
continue;
|
||||
void InterpolationModule::resultsToWP(std::vector<double> &work_package) const
|
||||
{
|
||||
for (uint32_t i = 0; i < interp_result.status.size(); i++)
|
||||
{
|
||||
if (interp_result.status[i] == RES_OK)
|
||||
{
|
||||
const std::size_t length =
|
||||
interp_result.results[i].end() - interp_result.results[i].begin();
|
||||
std::copy(interp_result.results[i].begin(),
|
||||
interp_result.results[i].end(),
|
||||
work_package.begin() + (length * i));
|
||||
}
|
||||
}
|
||||
|
||||
// interp_result.results[i][0] = mean_water;
|
||||
this->interp_t += MPI_Wtime() - start_fc;
|
||||
|
||||
this->interpolations++;
|
||||
|
||||
work_package.mapping[wp_i] = CHEM_INTERP;
|
||||
interp_result.status[wp_i] = RES_OK;
|
||||
}
|
||||
}
|
||||
|
||||
void InterpolationModule::resultsToWP(std::vector<double> &work_package) const {
|
||||
for (uint32_t i = 0; i < interp_result.status.size(); i++) {
|
||||
if (interp_result.status[i] == RES_OK) {
|
||||
const std::size_t length =
|
||||
interp_result.results[i].end() - interp_result.results[i].begin();
|
||||
std::copy(interp_result.results[i].begin(),
|
||||
interp_result.results[i].end(),
|
||||
work_package.begin() + (length * i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace poet
|
||||
|
||||
@ -69,9 +69,14 @@ namespace poet
|
||||
}
|
||||
case CHEM_INTERP:
|
||||
{
|
||||
int interp_flag;
|
||||
int interp_flag = 0;
|
||||
int dht_fill_flag = 0;
|
||||
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
ChemBCast(&dht_fill_flag, 1, MPI_INT);
|
||||
|
||||
this->interp_enabled = (interp_flag == 1);
|
||||
this->dht_fill_during_rollback = (dht_fill_flag == 1);
|
||||
break;
|
||||
}
|
||||
case CHEM_WORK_LOOP:
|
||||
@ -150,13 +155,14 @@ namespace poet
|
||||
double dht_get_start, dht_get_end;
|
||||
double phreeqc_time_start, phreeqc_time_end;
|
||||
double dht_fill_start, dht_fill_end;
|
||||
double ctrl_time_c, ctrl_time_d;
|
||||
|
||||
uint32_t iteration;
|
||||
double dt;
|
||||
double current_sim_time;
|
||||
uint32_t wp_start_index;
|
||||
int count = double_count;
|
||||
bool control_iteration_active = false;
|
||||
bool control_logic_enabled = false;
|
||||
std::vector<double> mpi_buffer(count);
|
||||
|
||||
/* receive */
|
||||
@ -183,7 +189,7 @@ namespace poet
|
||||
// current work package start location in field
|
||||
wp_start_index = mpi_buffer[count + 4];
|
||||
|
||||
control_iteration_active = (mpi_buffer[count + 5] == 1);
|
||||
control_logic_enabled = (mpi_buffer[count + 5] == 1);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
@ -229,7 +235,7 @@ namespace poet
|
||||
|
||||
poet::WorkPackage s_curr_wp_control = s_curr_wp;
|
||||
|
||||
if (control_iteration_active)
|
||||
if (control_logic_enabled)
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
|
||||
{
|
||||
@ -240,12 +246,15 @@ namespace poet
|
||||
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
|
||||
WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
|
||||
WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
|
||||
|
||||
phreeqc_time_end = MPI_Wtime();
|
||||
|
||||
if (control_iteration_active)
|
||||
if (control_logic_enabled)
|
||||
{
|
||||
/* start time measurement for copying control workpackage */
|
||||
ctrl_time_c = MPI_Wtime();
|
||||
|
||||
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
|
||||
|
||||
mpi_buffer.resize(count + sur_wp_offset);
|
||||
@ -275,6 +284,10 @@ namespace poet
|
||||
}
|
||||
|
||||
count += sur_wp_offset;
|
||||
|
||||
/* end time measurement for copying control workpackage */
|
||||
ctrl_time_d = MPI_Wtime();
|
||||
timings.ctrl_t += ctrl_time_d - ctrl_time_c;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -288,14 +301,14 @@ namespace poet
|
||||
/* send results to master */
|
||||
MPI_Request send_req;
|
||||
|
||||
int mpi_tag = control_iteration_active ? LOOP_CTRL : LOOP_WORK;
|
||||
int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK;
|
||||
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req);
|
||||
|
||||
if (dht_enabled || interp_enabled)
|
||||
if (dht_enabled || interp_enabled || dht_fill_during_rollback)
|
||||
{
|
||||
/* write results to DHT */
|
||||
dht_fill_start = MPI_Wtime();
|
||||
dht->fillDHT(control_iteration_active ? s_curr_wp_control : s_curr_wp);
|
||||
dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
if (interp_enabled)
|
||||
@ -306,7 +319,6 @@ namespace poet
|
||||
}
|
||||
|
||||
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
|
||||
|
||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||
}
|
||||
|
||||
@ -460,6 +472,12 @@ namespace poet
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_CTRL_ITER:
|
||||
{
|
||||
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_GET:
|
||||
{
|
||||
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
|
||||
@ -2,10 +2,11 @@
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <iomanip> // for std::setw and std::setprecision
|
||||
|
||||
namespace poet
|
||||
{
|
||||
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
|
||||
void writeStatsToCSV(const std::vector<ChemistryModule::SimulationErrorStats> &all_stats,
|
||||
const std::vector<std::string> &species_names,
|
||||
const std::string &filename)
|
||||
{
|
||||
@ -17,21 +18,32 @@ namespace poet
|
||||
}
|
||||
|
||||
// header
|
||||
out << "Iteration, Species, MAPE, RRSME \n";
|
||||
out << std::left << std::setw(15) << "Iteration"
|
||||
<< std::setw(15) << "Rollback"
|
||||
<< std::setw(15) << "Species"
|
||||
<< std::setw(15) << "MAPE"
|
||||
<< std::setw(15) << "RRSME" << "\n";
|
||||
|
||||
out << std::string(75, '-') << "\n"; // separator line
|
||||
|
||||
// data rows
|
||||
for (size_t i = 0; i < all_stats.size(); ++i)
|
||||
{
|
||||
for (size_t j = 0; j < species_names.size(); ++j)
|
||||
{
|
||||
out << all_stats[i].iteration << ",\t"
|
||||
<< species_names[j] << ",\t"
|
||||
<< all_stats[i].mape[j] << ",\t"
|
||||
<< all_stats[i].rrsme[j] << "\n";
|
||||
out << std::left
|
||||
<< std::setw(15) << all_stats[i].iteration
|
||||
<< std::setw(15) << all_stats[i].rollback_count
|
||||
<< std::setw(15) << species_names[j]
|
||||
<< std::setw(15) << all_stats[i].mape[j]
|
||||
<< std::setw(15) << all_stats[i].rrmse[j]
|
||||
<< "\n";
|
||||
}
|
||||
out << std::endl;
|
||||
out << "\n"; // blank line between iterations
|
||||
}
|
||||
|
||||
out.close();
|
||||
std::cout << "Stats written to " << filename << "\n";
|
||||
}
|
||||
} // namespace poet
|
||||
}
|
||||
// namespace poet
|
||||
@ -3,7 +3,7 @@
|
||||
|
||||
namespace poet
|
||||
{
|
||||
void writeStatsToCSV(const std::vector<ChemistryModule::error_stats> &all_stats,
|
||||
void writeStatsToCSV(const std::vector<ChemistryModule::SimulationErrorStats> &all_stats,
|
||||
const std::vector<std::string> &species_names,
|
||||
const std::string &filename);
|
||||
} // namespace poet
|
||||
|
||||
116
src/poet.cpp
116
src/poet.cpp
@ -253,7 +253,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
Rcpp::List init_params_(ReadRObj_R(init_file));
|
||||
params.init_params = init_params_;
|
||||
|
||||
@ -266,14 +265,12 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
|
||||
params.timesteps =
|
||||
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("timesteps"));
|
||||
params.control_iteration =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_iteration"));
|
||||
params.species_epsilon =
|
||||
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("species_epsilon"));
|
||||
params.penalty_iteration =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("penalty_iteration"));
|
||||
params.max_penalty_iteration =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("max_penalty_iteration"));
|
||||
params.control_interval =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_interval"));
|
||||
params.checkpoint_interval =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("checkpoint_interval"));
|
||||
params.mape_threshold =
|
||||
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("mape_threshold"));
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
@ -304,53 +301,38 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem)
|
||||
*global_rt_setup = R["setup"];
|
||||
}
|
||||
|
||||
bool checkAndRollback(ChemistryModule &chem, RuntimeParameters ¶ms, uint32_t &iter)
|
||||
bool triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters ¶ms, uint32_t ¤t_iteration)
|
||||
{
|
||||
const std::vector<double> &latest_mape = chem.error_stats_history.back().mape;
|
||||
const std::vector<double> &mape_values = chem.error_history.back().mape;
|
||||
|
||||
for (uint32_t j = 0; j < params.species_epsilon.size(); j++)
|
||||
for (uint32_t i = 0; i < params.mape_threshold.size(); i++)
|
||||
{
|
||||
if (params.species_epsilon[j] < latest_mape[j] && latest_mape[j] != 0)
|
||||
// Skip if no meaningful MAPE value
|
||||
if(mape_values[i] == 0){
|
||||
continue;
|
||||
}
|
||||
if (mape_values[i] > params.mape_threshold[i])
|
||||
{
|
||||
uint32_t rollback_iter = iter - (iter % params.control_iteration);
|
||||
uint32_t rollback_iteration = ((current_iteration - 1) / params.checkpoint_interval) * params.checkpoint_interval;
|
||||
|
||||
std::cout << chem.getField().GetProps()[j] << " with a MAPE value of " << latest_mape[j] << " exceeds epsilon of "
|
||||
<< params.species_epsilon[j] << "! " << std::endl;
|
||||
MSG("[THRESHOLD EXCEEDED] " + chem.getField().GetProps()[i] + " has MAPE = " +
|
||||
std::to_string(mape_values[i]) + " exceeding threshold = " + std::to_string(params.mape_threshold[i]) +
|
||||
" → rolling back to iteration " + std::to_string(rollback_iteration));
|
||||
|
||||
Checkpoint_s checkpoint_read{.field = chem.getField()};
|
||||
read_checkpoint("checkpoint" + std::to_string(rollback_iter) + ".hdf5", checkpoint_read);
|
||||
iter = checkpoint_read.iteration;
|
||||
read_checkpoint("checkpoint" + std::to_string(rollback_iteration) + ".hdf5", checkpoint_read);
|
||||
current_iteration = checkpoint_read.iteration;
|
||||
|
||||
// Rollback happend
|
||||
return true;
|
||||
}
|
||||
}
|
||||
MSG("All spezies are below their threshold values");
|
||||
|
||||
MSG("All species are within their error thresholds.");
|
||||
return false;
|
||||
}
|
||||
|
||||
void updatePenaltyLogic(RuntimeParameters ¶ms, bool roolback_happend)
|
||||
{
|
||||
if (roolback_happend)
|
||||
{
|
||||
params.rollback_simulation = true;
|
||||
params.penalty_counter = params.penalty_iteration;
|
||||
std::cout << "Penalty counter reset to: " << params.penalty_counter << std::endl;
|
||||
MSG("Rollback! Penalty phase started for " + std::to_string(params.penalty_iteration) + " iterations.");
|
||||
}
|
||||
else
|
||||
{
|
||||
if (params.rollback_simulation && params.penalty_counter == 0)
|
||||
{
|
||||
params.rollback_simulation = false;
|
||||
MSG("Penalty phase ended. Interpolation re-enabled.");
|
||||
}
|
||||
else if (!params.rollback_simulation)
|
||||
{
|
||||
params.penalty_iteration = std::min(params.penalty_iteration *= 2, params.max_penalty_iteration);
|
||||
MSG("Stable surrogate phase detected. Penalty iteration doubled to " + std::to_string(params.penalty_iteration) + " iterations.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
DiffusionModule &diffusion,
|
||||
@ -367,21 +349,25 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
}
|
||||
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
|
||||
|
||||
params.next_penalty_check = params.penalty_iteration;
|
||||
|
||||
/* SIMULATION LOOP */
|
||||
|
||||
double dSimTime{0};
|
||||
double chkTime = 0.0;
|
||||
|
||||
for (uint32_t iter = 1; iter < maxiter + 1; iter++)
|
||||
{
|
||||
// Penalty countdown
|
||||
if (params.rollback_simulation && params.penalty_counter > 0)
|
||||
{
|
||||
params.penalty_counter--;
|
||||
std::cout << "Penalty counter: " << params.penalty_counter << std::endl;
|
||||
// Rollback countdowm
|
||||
if (params.rollback_enabled) {
|
||||
if (params.sur_disabled_counter > 0) {
|
||||
--params.sur_disabled_counter;
|
||||
MSG("Rollback counter: " + std::to_string(params.sur_disabled_counter));
|
||||
} else {
|
||||
params.rollback_enabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
params.control_iteration_active = (iter % params.control_iteration == 0 /* && iter != 0 */);
|
||||
params.global_iter = iter;
|
||||
params.control_interval_enabled = (iter % params.control_interval == 0);
|
||||
|
||||
double start_t = MPI_Wtime();
|
||||
|
||||
@ -495,20 +481,27 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
|
||||
std::to_string(maxiter));
|
||||
|
||||
if (iter % params.control_iteration == 0)
|
||||
{
|
||||
writeStatsToCSV(chem.error_stats_history, chem.getField().GetProps(), "stats_overview");
|
||||
double chk_start = MPI_Wtime();
|
||||
|
||||
if(iter % params.checkpoint_interval == 0){
|
||||
MSG("Writing checkpoint of iteration " + std::to_string(iter));
|
||||
write_checkpoint("checkpoint" + std::to_string(iter) + ".hdf5",
|
||||
{.field = chem.getField(), .iteration = iter});
|
||||
}
|
||||
|
||||
if (iter == params.next_penalty_check)
|
||||
if (params.control_interval_enabled && !params.rollback_enabled)
|
||||
{
|
||||
bool roolback_happend = checkAndRollback(chem, params, iter);
|
||||
updatePenaltyLogic(params, roolback_happend);
|
||||
writeStatsToCSV(chem.error_history, chem.getField().GetProps(), "stats_overview");
|
||||
|
||||
params.next_penalty_check = iter + params.penalty_iteration;
|
||||
if(triggerRollbackIfExceeded(chem, params, iter)){
|
||||
params.rollback_enabled = true;
|
||||
params.rollback_counter ++;
|
||||
params.sur_disabled_counter = params.control_interval;
|
||||
MSG("Interpolation disabled for the next " + std::to_string(params.control_interval) + ".");
|
||||
}
|
||||
}
|
||||
double chk_end = MPI_Wtime();
|
||||
chkTime += chk_end - chk_start;
|
||||
|
||||
// MSG();
|
||||
} // END SIMULATION LOOP
|
||||
@ -526,6 +519,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
Rcpp::List diffusion_profiling;
|
||||
diffusion_profiling["simtime"] = diffusion.getTransportTime();
|
||||
|
||||
Rcpp::List ctrl_profiling;
|
||||
ctrl_profiling["checkpointing_time"] = chkTime;
|
||||
ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime();
|
||||
ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime();
|
||||
ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime();
|
||||
ctrl_profiling["ctrl_logic_worker"] = Rcpp::wrap(chem.GetWorkerControlTimings());
|
||||
|
||||
if (params.use_dht)
|
||||
{
|
||||
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
|
||||
@ -554,6 +554,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
profiling["simtime"] = dSimTime;
|
||||
profiling["chemistry"] = chem_profiling;
|
||||
profiling["diffusion"] = diffusion_profiling;
|
||||
profiling["ctrl_logic"] = ctrl_profiling;
|
||||
|
||||
|
||||
chem.MasterLoopBreak();
|
||||
|
||||
|
||||
@ -38,10 +38,10 @@ static const inline std::string ai_surrogate_r_library =
|
||||
R"(@R_AI_SURROGATE_LIB@)";
|
||||
static const inline std::string r_runtime_parameters = "mysetup";
|
||||
|
||||
struct RuntimeParameters {
|
||||
struct RuntimeParameters
|
||||
{
|
||||
std::string out_dir;
|
||||
std::vector<double> timesteps;
|
||||
std::vector<double> species_epsilon;
|
||||
|
||||
Rcpp::List init_params;
|
||||
|
||||
@ -52,13 +52,15 @@ struct RuntimeParameters {
|
||||
|
||||
bool print_progress = false;
|
||||
|
||||
std::uint32_t penalty_iteration = 0;
|
||||
std::uint32_t max_penalty_iteration = 0;
|
||||
std::uint32_t penalty_counter = 0;
|
||||
std::uint32_t next_penalty_check = 0;
|
||||
bool rollback_simulation = false;
|
||||
bool control_iteration_active = false;
|
||||
std::uint32_t control_iteration = 1;
|
||||
bool rollback_enabled = false;
|
||||
bool control_interval_enabled = false;
|
||||
std::uint32_t global_iter = 0;
|
||||
std::uint32_t sur_disabled_counter = 0;
|
||||
std::uint32_t rollback_counter = 0;
|
||||
std::uint32_t checkpoint_interval = 0;
|
||||
std::uint32_t control_interval = 0;
|
||||
std::vector<double> mape_threshold;
|
||||
std::vector<double> rrmse_threshold;
|
||||
|
||||
static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
|
||||
std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user