Mirror of https://git.gfz-potsdam.de/naaice/poet.git (synced 2025-12-16 12:54:50 +01:00)

Rollback error

This commit is contained in:
parent 040ae1181b
commit 5edb8dc63b
@@ -274,7 +274,13 @@ namespace poet
: mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) {}
};

std::vector<error_stats> error_stats_history;
std::vector<SimulationErrorStats> error_history;

void computeSpeciesErrors(const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values,
uint32_t size_per_prop,
uint32_t species_count,
SimulationErrorStats &species_error_stats);

static void computeStats(const std::vector<double> &pqc_vector,
const std::vector<double> &sur_vector,
@@ -410,6 +416,7 @@ namespace poet

poet::DHT_Wrapper *dht = nullptr;

bool ht_fill{false};
bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp;


@@ -160,21 +160,31 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerPHTCacheHits() const {
return ret;
}

void poet::ChemistryModule::computeStats(const std::vector<double> &pqc_vector,
const std::vector<double> &sur_vector,
uint32_t size_per_prop,
uint32_t species_count,
error_stats &stats) {
void poet::ChemistryModule::computeSpeciesErrors(
const std::vector<double> &reference_values,
const std::vector<double> &surrogate_values, uint32_t size_per_prop,
uint32_t species_count, SimulationErrorStats &species_error_stats) {
for (uint32_t i = 0; i < species_count; ++i) {
double err_sum = 0.0;
double sqr_err_sum = 0.0;
uint32_t base_idx = i * size_per_prop;

if (i > 1) {
std::cerr << "---- Species [" << i << "] " << this->prop_names[i]
<< " ----\n";
}

for (uint32_t j = 0; j < size_per_prop; ++j) {
const double pqc_value = pqc_vector[base_idx + j];
const double sur_value = sur_vector[base_idx + j];

if (pqc_value == 0.0) {
// Print raw input values
if (i > 1) {
std::cerr << " index " << j << " | ref=" << ref_value
<< " | sur=" << sur_value << '\n';
}

if (ref_value == 0.0) {
if (sur_value != 0.0) {
err_sum += 1.0;
sqr_err_sum += 1.0;
@@ -190,6 +200,9 @@ void poet::ChemistryModule::computeStats(const std::vector<double> &pqc_vector,
stats.mape[i] = 100.0 * (err_sum / size_per_prop);
stats.rrsme[i] =
(size_per_prop > 0) ? std::sqrt(sqr_err_sum / size_per_prop) : 0.0;

std::cerr << " -> MAPE=" << species_error_stats.mape[i]
<< " RRSME=" << species_error_stats.rrmse[i] << "\n\n";
}
}

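The hunk above renames computeStats to computeSpeciesErrors and reduces each species to two metrics: MAPE (mean absolute percentage error) and RRMSE (root relative mean squared error, spelled rrsme in the member initializer). A minimal standalone sketch of that per-species reduction follows; it is illustrative only and not part of this commit, the helper name is invented, and the handling of zero reference values is an assumption based on the branches visible above.

// Illustrative sketch (not part of this commit): per-species MAPE and RRMSE
// over flat vectors holding size_per_prop cells per species, species-major.
#include <cmath>
#include <cstdint>
#include <vector>

struct SpeciesErrors { double mape; double rrmse; };

inline SpeciesErrors speciesError(const std::vector<double> &ref,
                                  const std::vector<double> &sur,
                                  std::uint32_t species,
                                  std::uint32_t size_per_prop) {
  if (size_per_prop == 0) return {0.0, 0.0};
  double err_sum = 0.0;
  double sqr_err_sum = 0.0;
  const std::uint32_t base = species * size_per_prop;
  for (std::uint32_t j = 0; j < size_per_prop; ++j) {
    const double r = ref[base + j];
    const double s = sur[base + j];
    // Assumption: a zero reference counts as full (100 %) error when the
    // surrogate is non-zero, mirroring the err_sum += 1.0 branch above.
    const double rel = (r == 0.0) ? (s != 0.0 ? 1.0 : 0.0)
                                  : std::abs((r - s) / r);
    err_sum += rel;
    sqr_err_sum += rel * rel;
  }
  const double n = static_cast<double>(size_per_prop);
  return {100.0 * (err_sum / n),        // MAPE in percent
          std::sqrt(sqr_err_sum / n)};  // RRMSE
}
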
@ -263,8 +276,8 @@ inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) {
|
||||
inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
worker_list_t &w_list, workpointer_t &work_pointer,
|
||||
workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs,
|
||||
int &free_workers, double dt, uint32_t iteration,
|
||||
uint32_t control_iteration, const std::vector<uint32_t> &wp_sizes_vector) {
|
||||
int &free_workers, double dt, uint32_t iteration, uint32_t control_interval,
|
||||
const std::vector<uint32_t> &wp_sizes_vector) {
|
||||
/* declare variables */
|
||||
int local_work_package_size;
|
||||
|
||||
@ -329,6 +342,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
int need_to_receive = 1;
|
||||
double idle_a, idle_b;
|
||||
int p, size;
|
||||
double recv_a, recv_b;
|
||||
|
||||
MPI_Status probe_status;
|
||||
// master_recv_a = MPI_Wtime();
|
||||
@ -451,22 +465,41 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
ftype = CHEM_INTERP;
|
||||
PropagateFunctionType(ftype);
|
||||
|
||||
if(this->runtime_params->rollback_simulation){
|
||||
int interp_flag = 0;
|
||||
int ht_fill_flag = 0;
|
||||
|
||||
if (this->runtime_params->global_iter <
|
||||
this->runtime_params->control_interval ||
|
||||
this->runtime_params->rollback_enabled) {
|
||||
this->interp_enabled = false;
|
||||
int interp_flag = 0;
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
this->ht_fill = true;
|
||||
interp_flag = 0;
|
||||
ht_fill_flag = 1;
|
||||
} else {
|
||||
this->interp_enabled = true;
|
||||
int interp_flag = 1;
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
this->ht_fill = false;
|
||||
interp_flag = 1;
|
||||
ht_fill_flag = 0;
|
||||
}
|
||||
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
ChemBCast(&ht_fill_flag, 1, MPI_INT);
|
||||
|
||||
/* end time measurement of broadcasting interpolation status */
|
||||
ctrl_bcast_b = MPI_Wtime();
|
||||
this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a;
|
||||
|
||||
ftype = CHEM_WORK_LOOP;
|
||||
PropagateFunctionType(ftype);
|
||||
|
||||
MPI_Barrier(this->group_comm);
|
||||
|
||||
static uint32_t iteration = 0;
|
||||
uint32_t control_iteration = static_cast<uint32_t>(
|
||||
this->runtime_params->control_iteration_active ? 1 : 0);
|
||||
if (control_iteration) {
|
||||
|
||||
uint32_t control_logic_enabled =
|
||||
this->runtime_params->control_interval_enabled ? 1 : 0;
|
||||
|
||||
if (control_logic_enabled) {
|
||||
sur_shuffled.clear();
|
||||
sur_shuffled.reserve(this->n_cells * this->prop_count);
|
||||
}
|
||||
@ -538,24 +571,30 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
|
||||
/* do master stuff */
|
||||
|
||||
if (control_iteration) {
|
||||
control_iteration_counter++;
|
||||
/* start time measurement of control logic */
|
||||
ctrl_a = MPI_Wtime();
|
||||
|
||||
if (control_logic_enabled && !this->runtime_params->rollback_enabled) {
|
||||
|
||||
std::vector<double> sur_unshuffled{sur_shuffled};
|
||||
|
||||
unshuffleField(sur_shuffled, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), sur_unshuffled);
|
||||
|
||||
error_stats stats(this->prop_count, control_iteration_counter *
|
||||
runtime_params->control_iteration);
|
||||
SimulationErrorStats stats(this->prop_count,
|
||||
this->runtime_params->global_iter,
|
||||
this->runtime_params->rollback_counter);
|
||||
|
||||
computeStats(out_vec, sur_unshuffled, this->n_cells, this->prop_count,
|
||||
stats);
|
||||
error_stats_history.push_back(stats);
|
||||
computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells,
|
||||
this->prop_count, stats);
|
||||
|
||||
// to do: control values to epsilon
|
||||
error_history.push_back(stats);
|
||||
}
|
||||
|
||||
/* end time measurement of control logic */
|
||||
ctrl_b = MPI_Wtime();
|
||||
this->ctrl_t += ctrl_b - ctrl_a;
|
||||
|
||||
/* start time measurement of master chemistry */
|
||||
sim_e_chemistry = MPI_Wtime();
|
||||
|
||||
|
||||
@ -9,556 +9,496 @@
|
||||
#include <cstdint>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <mpi.h>
|
||||
#include <limits>
|
||||
#include <mpi.h>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace poet
|
||||
{
|
||||
namespace poet {
|
||||
|
||||
inline std::string get_string(int root, MPI_Comm communicator)
|
||||
{
|
||||
int count;
|
||||
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
|
||||
inline std::string get_string(int root, MPI_Comm communicator) {
|
||||
int count;
|
||||
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
|
||||
|
||||
char *buffer = new char[count + 1];
|
||||
MPI_Bcast(buffer, count, MPI_CHAR, root, communicator);
|
||||
char *buffer = new char[count + 1];
|
||||
MPI_Bcast(buffer, count, MPI_CHAR, root, communicator);
|
||||
|
||||
buffer[count] = '\0';
|
||||
buffer[count] = '\0';
|
||||
|
||||
std::string ret_str(buffer);
|
||||
delete[] buffer;
|
||||
std::string ret_str(buffer);
|
||||
delete[] buffer;
|
||||
|
||||
return ret_str;
|
||||
return ret_str;
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerLoop() {
|
||||
struct worker_s timings;
|
||||
|
||||
// HACK: defining the worker iteration count here, which will increment after
|
||||
// each CHEM_ITER_END message
|
||||
uint32_t iteration = 1;
|
||||
bool loop = true;
|
||||
|
||||
while (loop) {
|
||||
int func_type;
|
||||
PropagateFunctionType(func_type);
|
||||
|
||||
switch (func_type) {
|
||||
case CHEM_FIELD_INIT: {
|
||||
ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
|
||||
if (this->ai_surrogate_enabled) {
|
||||
this->ai_surrogate_validity_vector.resize(
|
||||
this->n_cells); // resize statt reserve?
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CHEM_AI_BCAST_VALIDITY: {
|
||||
// Receive the index vector of valid ai surrogate predictions
|
||||
MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells,
|
||||
MPI_INT, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case CHEM_INTERP: {
|
||||
int interp_flag = 0;
|
||||
int ht_fill_flag = 0;
|
||||
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
ChemBCast(&ht_fill_flag, 1, MPI_INT);
|
||||
this->interp_enabled = (interp_flag == 1);
|
||||
this->ht_fill = (ht_fill_flag == 1);
|
||||
break;
|
||||
}
|
||||
case CHEM_WORK_LOOP: {
|
||||
WorkerProcessPkgs(timings, iteration);
|
||||
break;
|
||||
}
|
||||
case CHEM_PERF: {
|
||||
int type;
|
||||
ChemBCast(&type, 1, MPI_INT);
|
||||
if (type < WORKER_DHT_HITS) {
|
||||
WorkerPerfToMaster(type, timings);
|
||||
break;
|
||||
}
|
||||
WorkerMetricsToMaster(type);
|
||||
break;
|
||||
}
|
||||
case CHEM_BREAK_MAIN_LOOP: {
|
||||
WorkerPostSim(iteration);
|
||||
loop = false;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw std::runtime_error("Worker received unknown tag from master.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
|
||||
uint32_t &iteration) {
|
||||
MPI_Status probe_status;
|
||||
bool loop = true;
|
||||
|
||||
MPI_Barrier(this->group_comm);
|
||||
|
||||
while (loop) {
|
||||
double idle_a = MPI_Wtime();
|
||||
MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status);
|
||||
double idle_b = MPI_Wtime();
|
||||
|
||||
switch (probe_status.MPI_TAG) {
|
||||
case LOOP_WORK: {
|
||||
timings.idle_t += idle_b - idle_a;
|
||||
int count;
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
|
||||
|
||||
WorkerDoWork(probe_status, count, timings);
|
||||
break;
|
||||
}
|
||||
case LOOP_END: {
|
||||
WorkerPostIter(probe_status, iteration);
|
||||
iteration++;
|
||||
loop = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
int double_count,
|
||||
struct worker_s &timings) {
|
||||
static int counter = 1;
|
||||
|
||||
double dht_get_start, dht_get_end;
|
||||
double phreeqc_time_start, phreeqc_time_end;
|
||||
double dht_fill_start, dht_fill_end;
|
||||
double ctrl_time_c, ctrl_time_d;
|
||||
|
||||
uint32_t iteration;
|
||||
double dt;
|
||||
double current_sim_time;
|
||||
uint32_t wp_start_index;
|
||||
int count = double_count;
|
||||
bool control_logic_enabled = false;
|
||||
std::vector<double> mpi_buffer(count);
|
||||
|
||||
/* receive */
|
||||
MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
/* decrement count of work_package by BUFFER_OFFSET */
|
||||
count -= BUFFER_OFFSET;
|
||||
/* check for changes on all additional variables given by the 'header' of
|
||||
* mpi_buffer */
|
||||
|
||||
// work_package_size
|
||||
poet::WorkPackage s_curr_wp(mpi_buffer[count]);
|
||||
|
||||
// current iteration of simulation
|
||||
iteration = mpi_buffer[count + 1];
|
||||
|
||||
// current timestep size
|
||||
dt = mpi_buffer[count + 2];
|
||||
|
||||
// current simulation time ('age' of simulation)
|
||||
current_sim_time = mpi_buffer[count + 3];
|
||||
|
||||
// current work package start location in field
|
||||
wp_start_index = mpi_buffer[count + 4];
|
||||
|
||||
control_logic_enabled = (mpi_buffer[count + 5] == 1);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
|
||||
s_curr_wp.input[wp_i] =
|
||||
std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i,
|
||||
mpi_buffer.begin() + this->prop_count * (wp_i + 1));
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerLoop()
|
||||
{
|
||||
struct worker_s timings;
|
||||
// std::cout << this->comm_rank << ":" << counter++ << std::endl;
|
||||
if (dht_enabled || interp_enabled || ht_fill) {
|
||||
dht->prepareKeys(s_curr_wp.input, dt);
|
||||
}
|
||||
|
||||
// HACK: defining the worker iteration count here, which will increment after
|
||||
// each CHEM_ITER_END message
|
||||
uint32_t iteration = 1;
|
||||
bool loop = true;
|
||||
if (dht_enabled) {
|
||||
/* check for values in DHT */
|
||||
dht_get_start = MPI_Wtime();
|
||||
dht->checkDHT(s_curr_wp);
|
||||
dht_get_end = MPI_Wtime();
|
||||
timings.dht_get += dht_get_end - dht_get_start;
|
||||
}
|
||||
|
||||
while (loop)
|
||||
{
|
||||
int func_type;
|
||||
PropagateFunctionType(func_type);
|
||||
if (interp_enabled) {
|
||||
interp->tryInterpolation(s_curr_wp);
|
||||
}
|
||||
|
||||
switch (func_type)
|
||||
{
|
||||
case CHEM_FIELD_INIT:
|
||||
{
|
||||
ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
|
||||
if (this->ai_surrogate_enabled)
|
||||
{
|
||||
this->ai_surrogate_validity_vector.resize(
|
||||
this->n_cells); // resize statt reserve?
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CHEM_AI_BCAST_VALIDITY:
|
||||
{
|
||||
// Receive the index vector of valid ai surrogate predictions
|
||||
MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells,
|
||||
MPI_INT, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case CHEM_INTERP:
|
||||
{
|
||||
int interp_flag;
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
this->interp_enabled = (interp_flag == 1);
|
||||
break;
|
||||
}
|
||||
case CHEM_WORK_LOOP:
|
||||
{
|
||||
WorkerProcessPkgs(timings, iteration);
|
||||
break;
|
||||
}
|
||||
case CHEM_PERF:
|
||||
{
|
||||
int type;
|
||||
ChemBCast(&type, 1, MPI_INT);
|
||||
if (type < WORKER_DHT_HITS)
|
||||
{
|
||||
WorkerPerfToMaster(type, timings);
|
||||
break;
|
||||
}
|
||||
WorkerMetricsToMaster(type);
|
||||
break;
|
||||
}
|
||||
case CHEM_BREAK_MAIN_LOOP:
|
||||
{
|
||||
WorkerPostSim(iteration);
|
||||
loop = false;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw std::runtime_error("Worker received unknown tag from master.");
|
||||
}
|
||||
}
|
||||
if (this->ai_surrogate_enabled) {
|
||||
// Map valid predictions from the ai surrogate in the workpackage
|
||||
for (int i = 0; i < s_curr_wp.size; i++) {
|
||||
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
|
||||
s_curr_wp.mapping[i] = CHEM_AISURR;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
|
||||
uint32_t &iteration)
|
||||
{
|
||||
MPI_Status probe_status;
|
||||
bool loop = true;
|
||||
/* if control iteration: create copy surrogate results (output and mappings)
|
||||
and then set them to zero, give this to phreeqc */
|
||||
|
||||
MPI_Barrier(this->group_comm);
|
||||
poet::WorkPackage s_curr_wp_control = s_curr_wp;
|
||||
|
||||
while (loop)
|
||||
{
|
||||
double idle_a = MPI_Wtime();
|
||||
MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status);
|
||||
double idle_b = MPI_Wtime();
|
||||
if (control_logic_enabled) {
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
|
||||
s_curr_wp_control.output[wp_i] =
|
||||
std::vector<double>(this->prop_count, 0.0);
|
||||
s_curr_wp_control.mapping[wp_i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
switch (probe_status.MPI_TAG)
|
||||
{
|
||||
case LOOP_WORK:
|
||||
{
|
||||
timings.idle_t += idle_b - idle_a;
|
||||
int count;
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
|
||||
WorkerDoWork(probe_status, count, timings);
|
||||
break;
|
||||
}
|
||||
case LOOP_END:
|
||||
{
|
||||
WorkerPostIter(probe_status, iteration);
|
||||
iteration++;
|
||||
loop = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp,
|
||||
current_sim_time, dt);
|
||||
|
||||
phreeqc_time_end = MPI_Wtime();
|
||||
|
||||
if (control_logic_enabled) {
|
||||
/* start time measurement for copying control workpackage */
|
||||
ctrl_time_c = MPI_Wtime();
|
||||
|
||||
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
|
||||
|
||||
mpi_buffer.resize(count + sur_wp_offset);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(),
|
||||
s_curr_wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
int double_count,
|
||||
struct worker_s &timings)
|
||||
{
|
||||
static int counter = 1;
|
||||
// s_curr_wp only contains the interpolated data
|
||||
// copy surrogate output after the the pqc output, mpi_buffer[pqc][interp]
|
||||
|
||||
double dht_get_start, dht_get_end;
|
||||
double phreeqc_time_start, phreeqc_time_end;
|
||||
double dht_fill_start, dht_fill_end;
|
||||
|
||||
uint32_t iteration;
|
||||
double dt;
|
||||
double current_sim_time;
|
||||
uint32_t wp_start_index;
|
||||
int count = double_count;
|
||||
bool control_iteration_active = false;
|
||||
std::vector<double> mpi_buffer(count);
|
||||
|
||||
/* receive */
|
||||
MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
/* decrement count of work_package by BUFFER_OFFSET */
|
||||
count -= BUFFER_OFFSET;
|
||||
/* check for changes on all additional variables given by the 'header' of
|
||||
* mpi_buffer */
|
||||
|
||||
// work_package_size
|
||||
poet::WorkPackage s_curr_wp(mpi_buffer[count]);
|
||||
|
||||
// current iteration of simulation
|
||||
iteration = mpi_buffer[count + 1];
|
||||
|
||||
// current timestep size
|
||||
dt = mpi_buffer[count + 2];
|
||||
|
||||
// current simulation time ('age' of simulation)
|
||||
current_sim_time = mpi_buffer[count + 3];
|
||||
|
||||
// current work package start location in field
|
||||
wp_start_index = mpi_buffer[count + 4];
|
||||
|
||||
control_iteration_active = (mpi_buffer[count + 5] == 1);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
s_curr_wp.input[wp_i] =
|
||||
std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i,
|
||||
mpi_buffer.begin() + this->prop_count * (wp_i + 1));
|
||||
}
|
||||
|
||||
// std::cout << this->comm_rank << ":" << counter++ << std::endl;
|
||||
if (dht_enabled || interp_enabled)
|
||||
{
|
||||
dht->prepareKeys(s_curr_wp.input, dt);
|
||||
}
|
||||
|
||||
if (dht_enabled)
|
||||
{
|
||||
/* check for values in DHT */
|
||||
dht_get_start = MPI_Wtime();
|
||||
dht->checkDHT(s_curr_wp);
|
||||
dht_get_end = MPI_Wtime();
|
||||
timings.dht_get += dht_get_end - dht_get_start;
|
||||
}
|
||||
|
||||
if (interp_enabled)
|
||||
{
|
||||
interp->tryInterpolation(s_curr_wp);
|
||||
}
|
||||
|
||||
if (this->ai_surrogate_enabled)
|
||||
{
|
||||
// Map valid predictions from the ai surrogate in the workpackage
|
||||
for (int i = 0; i < s_curr_wp.size; i++)
|
||||
{
|
||||
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1)
|
||||
{
|
||||
s_curr_wp.mapping[i] = CHEM_AISURR;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if control iteration: create copy surrogate results (output and mappings) and then set them to zero,
|
||||
give this to phreeqc */
|
||||
|
||||
poet::WorkPackage s_curr_wp_control = s_curr_wp;
|
||||
|
||||
if (control_iteration_active)
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
|
||||
{
|
||||
s_curr_wp_control.output[wp_i] = std::vector<double>(this->prop_count, 0.0);
|
||||
s_curr_wp_control.mapping[wp_i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
|
||||
WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
|
||||
|
||||
phreeqc_time_end = MPI_Wtime();
|
||||
|
||||
if (control_iteration_active)
|
||||
{
|
||||
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
|
||||
|
||||
mpi_buffer.resize(count + sur_wp_offset);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
|
||||
{
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
// s_curr_wp only contains the interpolated data
|
||||
// copy surrogate output after the the pqc output, mpi_buffer[pqc][interp]
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
if (s_curr_wp.mapping[wp_i] != CHEM_PQC) // only copy if surrogate was used
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
|
||||
if (s_curr_wp.mapping[wp_i] !=
|
||||
CHEM_PQC) // only copy if surrogate was used
|
||||
{
|
||||
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
|
||||
} else
|
||||
{
|
||||
} else {
|
||||
// if pqc was used, copy pqc results again
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(),
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(),
|
||||
s_curr_wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
count += sur_wp_offset;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
}
|
||||
|
||||
/* send results to master */
|
||||
MPI_Request send_req;
|
||||
count += sur_wp_offset;
|
||||
|
||||
int mpi_tag = control_iteration_active ? LOOP_CTRL : LOOP_WORK;
|
||||
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req);
|
||||
|
||||
if (dht_enabled || interp_enabled)
|
||||
{
|
||||
/* write results to DHT */
|
||||
dht_fill_start = MPI_Wtime();
|
||||
dht->fillDHT(control_iteration_active ? s_curr_wp_control : s_curr_wp);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
if (interp_enabled)
|
||||
{
|
||||
interp->writePairs();
|
||||
}
|
||||
timings.dht_fill += dht_fill_end - dht_fill_start;
|
||||
/* end time measurement for copying control workpackage */
|
||||
ctrl_time_d = MPI_Wtime();
|
||||
timings.ctrl_t += ctrl_time_d - ctrl_time_c;
|
||||
} else {
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
|
||||
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
|
||||
|
||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
uint32_t iteration)
|
||||
{
|
||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
/* send results to master */
|
||||
MPI_Request send_req;
|
||||
|
||||
if (this->dht_enabled)
|
||||
{
|
||||
dht_hits.push_back(dht->getHits());
|
||||
dht_evictions.push_back(dht->getEvictions());
|
||||
dht->resetCounter();
|
||||
int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK;
|
||||
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD,
|
||||
&send_req);
|
||||
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND)
|
||||
{
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
if (dht_enabled || interp_enabled || ht_fill) {
|
||||
/* write results to DHT */
|
||||
dht_fill_start = MPI_Wtime();
|
||||
dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
if (interp_enabled || ht_fill) {
|
||||
interp->writePairs();
|
||||
}
|
||||
|
||||
if (this->interp_enabled)
|
||||
{
|
||||
std::stringstream out;
|
||||
interp_calls.push_back(interp->getInterpolationCount());
|
||||
interp->resetCounter();
|
||||
interp->writePHTStats();
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND)
|
||||
{
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
|
||||
const auto max_mean_idx =
|
||||
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
|
||||
|
||||
if (max_mean_idx >= 2)
|
||||
{
|
||||
DHT_flush(this->interp->getDHTObject());
|
||||
DHT_flush(this->dht->getDHT());
|
||||
if (this->comm_rank == 2)
|
||||
{
|
||||
std::cout << "Flushed both DHT and PHT!\n\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RInsidePOET::getInstance().parseEvalQ("gc()");
|
||||
timings.dht_fill += dht_fill_end - dht_fill_start;
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration)
|
||||
{
|
||||
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND)
|
||||
{
|
||||
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
|
||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
uint32_t iteration) {
|
||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
if (this->dht_enabled) {
|
||||
dht_hits.push_back(dht->getHits());
|
||||
dht_evictions.push_back(dht->getEvictions());
|
||||
dht->resetCounter();
|
||||
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND)
|
||||
{
|
||||
std::stringstream out;
|
||||
}
|
||||
|
||||
if (this->interp_enabled) {
|
||||
std::stringstream out;
|
||||
interp_calls.push_back(interp->getInterpolationCount());
|
||||
interp->resetCounter();
|
||||
interp->writePHTStats();
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
|
||||
const auto max_mean_idx =
|
||||
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
|
||||
|
||||
if (max_mean_idx >= 2) {
|
||||
DHT_flush(this->interp->getDHTObject());
|
||||
DHT_flush(this->dht->getDHT());
|
||||
if (this->comm_rank == 2) {
|
||||
std::cout << "Flushed both DHT and PHT!\n\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration)
|
||||
{
|
||||
RInsidePOET::getInstance().parseEvalQ("gc()");
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
|
||||
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
std::stringstream out;
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".dht";
|
||||
int res = dht->tableToFile(out.str().c_str());
|
||||
if (res != DHT_SUCCESS && this->comm_rank == 2)
|
||||
std::cerr
|
||||
<< "CPP: Worker: Error in writing current state of DHT to file.\n";
|
||||
else if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully written DHT to file " << out.str()
|
||||
<< "\n";
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
|
||||
std::stringstream out;
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".dht";
|
||||
int res = dht->tableToFile(out.str().c_str());
|
||||
if (res != DHT_SUCCESS && this->comm_rank == 2)
|
||||
std::cerr
|
||||
<< "CPP: Worker: Error in writing current state of DHT to file.\n";
|
||||
else if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully written DHT to file " << out.str()
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerReadDHTDump(
|
||||
const std::string &dht_input_file) {
|
||||
int res = dht->fileToTable((char *)dht_input_file.c_str());
|
||||
if (res != DHT_SUCCESS) {
|
||||
if (res == DHT_WRONG_FILE) {
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr
|
||||
<< "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
|
||||
} else {
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr << "CPP: Worker: Error in loading current state of DHT from "
|
||||
"file. Continue with empty DHT ...\n";
|
||||
}
|
||||
} else {
|
||||
if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully loaded state of DHT from file "
|
||||
<< dht_input_file << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
|
||||
double dSimTime,
|
||||
double dTimestep) {
|
||||
|
||||
std::vector<std::vector<double>> inout_chem = work_package.input;
|
||||
std::vector<std::size_t> to_ignore;
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
|
||||
if (work_package.mapping[wp_id] != CHEM_PQC) {
|
||||
to_ignore.push_back(wp_id);
|
||||
}
|
||||
|
||||
// HACK: remove the first element (cell_id) before sending to phreeqc
|
||||
inout_chem[wp_id].erase(inout_chem[wp_id].begin(),
|
||||
inout_chem[wp_id].begin() + 1);
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerReadDHTDump(
|
||||
const std::string &dht_input_file)
|
||||
{
|
||||
int res = dht->fileToTable((char *)dht_input_file.c_str());
|
||||
if (res != DHT_SUCCESS)
|
||||
{
|
||||
if (res == DHT_WRONG_FILE)
|
||||
{
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr
|
||||
<< "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr << "CPP: Worker: Error in loading current state of DHT from "
|
||||
"file. Continue with empty DHT ...\n";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully loaded state of DHT from file "
|
||||
<< dht_input_file << "\n";
|
||||
this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
|
||||
if (work_package.mapping[wp_id] == CHEM_PQC) {
|
||||
// HACK: as we removed the first element (cell_id) before sending to
|
||||
// phreeqc, copy back with an offset of 1
|
||||
work_package.output[wp_id] = work_package.input[wp_id];
|
||||
std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(),
|
||||
work_package.output[wp_id].begin() + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
|
||||
double dSimTime,
|
||||
double dTimestep)
|
||||
{
|
||||
|
||||
std::vector<std::vector<double>> inout_chem = work_package.input;
|
||||
std::vector<std::size_t> to_ignore;
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++)
|
||||
{
|
||||
if (work_package.mapping[wp_id] != CHEM_PQC)
|
||||
{
|
||||
to_ignore.push_back(wp_id);
|
||||
}
|
||||
|
||||
// HACK: remove the first element (cell_id) before sending to phreeqc
|
||||
inout_chem[wp_id].erase(
|
||||
inout_chem[wp_id].begin(), inout_chem[wp_id].begin() + 1);
|
||||
void poet::ChemistryModule::WorkerPerfToMaster(int type,
|
||||
const struct worker_s &timings) {
|
||||
switch (type) {
|
||||
case WORKER_PHREEQC: {
|
||||
MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
|
||||
this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++)
|
||||
{
|
||||
if (work_package.mapping[wp_id] == CHEM_PQC)
|
||||
{
|
||||
// HACK: as we removed the first element (cell_id) before sending to phreeqc,
|
||||
// copy back with an offset of 1
|
||||
work_package.output[wp_id] = work_package.input[wp_id];
|
||||
std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(),
|
||||
work_package.output[wp_id].begin() + 1);
|
||||
}
|
||||
}
|
||||
case WORKER_CTRL_ITER: {
|
||||
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPerfToMaster(int type,
|
||||
const struct worker_s &timings)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case WORKER_PHREEQC:
|
||||
{
|
||||
MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_GET:
|
||||
{
|
||||
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_FILL:
|
||||
{
|
||||
MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IDLE:
|
||||
{
|
||||
MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_WRITE:
|
||||
{
|
||||
double val = interp->getPHTWriteTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_READ:
|
||||
{
|
||||
double val = interp->getPHTReadTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_GATHER:
|
||||
{
|
||||
double val = interp->getDHTGatherTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_FC:
|
||||
{
|
||||
double val = interp->getInterpolationTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
case WORKER_DHT_GET: {
|
||||
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerMetricsToMaster(int type)
|
||||
{
|
||||
MPI_Comm worker_comm = dht->getCommunicator();
|
||||
int worker_rank;
|
||||
MPI_Comm_rank(worker_comm, &worker_rank);
|
||||
|
||||
MPI_Comm &group_comm = this->group_comm;
|
||||
|
||||
auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm](
|
||||
std::vector<std::uint32_t> &send_buffer, int tag)
|
||||
{
|
||||
std::vector<uint32_t> to_master(send_buffer.size());
|
||||
MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(),
|
||||
MPI_UINT32_T, MPI_SUM, 0, worker_comm);
|
||||
|
||||
if (worker_rank == 0)
|
||||
{
|
||||
MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag,
|
||||
group_comm);
|
||||
}
|
||||
};
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case WORKER_DHT_HITS:
|
||||
{
|
||||
reduce_and_send(dht_hits, WORKER_DHT_HITS);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_EVICTIONS:
|
||||
{
|
||||
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_CALLS:
|
||||
{
|
||||
reduce_and_send(interp_calls, WORKER_IP_CALLS);
|
||||
return;
|
||||
}
|
||||
case WORKER_PHT_CACHE_HITS:
|
||||
{
|
||||
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
|
||||
reduce_and_send(input, WORKER_PHT_CACHE_HITS);
|
||||
return;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
case WORKER_DHT_FILL: {
|
||||
MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IDLE: {
|
||||
MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_WRITE: {
|
||||
double val = interp->getPHTWriteTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_READ: {
|
||||
double val = interp->getPHTReadTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_GATHER: {
|
||||
double val = interp->getDHTGatherTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_FC: {
|
||||
double val = interp->getInterpolationTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
|
||||
MPI_Comm worker_comm = dht->getCommunicator();
|
||||
int worker_rank;
|
||||
MPI_Comm_rank(worker_comm, &worker_rank);
|
||||
|
||||
MPI_Comm &group_comm = this->group_comm;
|
||||
|
||||
auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm](
|
||||
std::vector<std::uint32_t> &send_buffer, int tag) {
|
||||
std::vector<uint32_t> to_master(send_buffer.size());
|
||||
MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(),
|
||||
MPI_UINT32_T, MPI_SUM, 0, worker_comm);
|
||||
|
||||
if (worker_rank == 0) {
|
||||
MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag,
|
||||
group_comm);
|
||||
}
|
||||
};
|
||||
|
||||
switch (type) {
|
||||
case WORKER_DHT_HITS: {
|
||||
reduce_and_send(dht_hits, WORKER_DHT_HITS);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_EVICTIONS: {
|
||||
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_CALLS: {
|
||||
reduce_and_send(interp_calls, WORKER_IP_CALLS);
|
||||
return;
|
||||
}
|
||||
case WORKER_PHT_CACHE_HITS: {
|
||||
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
|
||||
reduce_and_send(input, WORKER_PHT_CACHE_HITS);
|
||||
return;
|
||||
}
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace poet
|
||||
|
||||
src/poet.cpp (243 changed lines)
@ -68,8 +68,7 @@ static poet::DEFunc ReadRObj_R;
|
||||
static poet::DEFunc SaveRObj_R;
|
||||
static poet::DEFunc source_R;
|
||||
|
||||
static void init_global_functions(RInside &R)
|
||||
{
|
||||
static void init_global_functions(RInside &R) {
|
||||
R.parseEval(kin_r_library);
|
||||
master_init_R = DEFunc("master_init");
|
||||
master_iteration_end_R = DEFunc("master_iteration_end");
|
||||
@ -92,15 +91,9 @@ static void init_global_functions(RInside &R)
|
||||
// R.parseEval("mysetup$state_C <- TMP");
|
||||
// }
|
||||
|
||||
enum ParseRet
|
||||
{
|
||||
PARSER_OK,
|
||||
PARSER_ERROR,
|
||||
PARSER_HELP
|
||||
};
|
||||
enum ParseRet { PARSER_OK, PARSER_ERROR, PARSER_HELP };
|
||||
|
||||
int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
{
|
||||
int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) {
|
||||
|
||||
CLI::App app{"POET - Potsdam rEactive Transport simulator"};
|
||||
|
||||
@ -182,12 +175,9 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
"Output directory of the simulation")
|
||||
->required();
|
||||
|
||||
try
|
||||
{
|
||||
try {
|
||||
app.parse(argc, argv);
|
||||
}
|
||||
catch (const CLI::ParseError &e)
|
||||
{
|
||||
} catch (const CLI::ParseError &e) {
|
||||
app.exit(e);
|
||||
return -1;
|
||||
}
|
||||
@ -199,16 +189,14 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
if (params.as_qs)
|
||||
params.out_ext = "qs";
|
||||
|
||||
if (MY_RANK == 0)
|
||||
{
|
||||
if (MY_RANK == 0) {
|
||||
// MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
|
||||
MSG("Output format/extension is " + params.out_ext);
|
||||
MSG("Work Package Size: " + std::to_string(params.work_package_size));
|
||||
MSG("DHT is " + BOOL_PRINT(params.use_dht));
|
||||
MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate));
|
||||
|
||||
if (params.use_dht)
|
||||
{
|
||||
if (params.use_dht) {
|
||||
// MSG("DHT strategy is " + std::to_string(simparams.dht_strategy));
|
||||
// MDL: these should be outdated (?)
|
||||
// MSG("DHT key default digits (ignored if 'signif_vector' is "
|
||||
@ -222,8 +210,7 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
// MSG("DHT load file is " + chem_params.dht_file);
|
||||
}
|
||||
|
||||
if (params.use_interp)
|
||||
{
|
||||
if (params.use_interp) {
|
||||
MSG("PHT interpolation enabled: " + BOOL_PRINT(params.use_interp));
|
||||
MSG("PHT interp-size = " + std::to_string(params.interp_size));
|
||||
MSG("PHT interp-min = " + std::to_string(params.interp_min_entries));
|
||||
@ -251,9 +238,7 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
// // log before rounding?
|
||||
// R["dht_log"] = simparams.dht_log;
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
try {
|
||||
Rcpp::List init_params_(ReadRObj_R(init_file));
|
||||
params.init_params = init_params_;
|
||||
|
||||
@ -270,13 +255,9 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("control_interval"));
|
||||
params.checkpoint_interval =
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("checkpoint_interval"));
|
||||
params.mape_threshold =
|
||||
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("mape_threshold"));
|
||||
params.rrmse_threshold =
|
||||
Rcpp::as<std::vector<double>>(global_rt_setup->operator[]("rrmse_threshold"));
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
params.mape_threshold = Rcpp::as<std::vector<double>>(
|
||||
global_rt_setup->operator[]("mape_threshold"));
|
||||
} catch (const std::exception &e) {
|
||||
ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
|
||||
return ParseRet::PARSER_ERROR;
|
||||
}
|
||||
@ -286,8 +267,7 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms)
|
||||
|
||||
// HACK: this is a step back as the order and also the count of fields is
|
||||
// predefined, but it will change in the future
|
||||
void call_master_iter_end(RInside &R, const Field &trans, const Field &chem)
|
||||
{
|
||||
void call_master_iter_end(RInside &R, const Field &trans, const Field &chem) {
|
||||
R["TMP"] = Rcpp::wrap(trans.AsVector());
|
||||
R["TMP_PROPS"] = Rcpp::wrap(trans.GetProps());
|
||||
R.parseEval(std::string("state_T <- setNames(data.frame(matrix(TMP, nrow=" +
|
||||
@@ -304,53 +284,50 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem)
*global_rt_setup = R["setup"];
}

bool checkAndRollback(ChemistryModule &chem, RuntimeParameters &params, uint32_t &iter)
{
const auto &mape = chem.error_history.back().mape;
const auto &rrmse = chem.error_history.back().rrmse;
const auto &props = chem.getField().GetProps();
bool triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters &params,
uint32_t &current_iteration) {
const auto &mape = chem.error_history.back().mape;
const auto &props = chem.getField().GetProps();

for (uint32_t i = 0; i < params.mape_threshold.size(); ++i)
{
// Skip invalid entries
if ((mape[i] == 0 && rrmse[i] == 0))
continue;

bool mape_exceeded = mape[i] > params.mape_threshold[i];
bool rrmse_exceeded = rrmse[i] > params.rrmse_threshold[i];

if (mape_exceeded || rrmse_exceeded)
{
uint32_t rollback_iter = ((current_iteration - 1) / params.checkpoint_interval) * params.checkpoint_interval;
std::string metric = mape_exceeded ? "MAPE" : "RRMSE";
double value = mape_exceeded ? mape[i] : rrmse[i];
double threshold = mape_exceeded ? params.mape_threshold[i] : params.rrmse_threshold[i];

MSG("[THRESHOLD EXCEEDED] " + props[i] + " has " + metric + " = " +
std::to_string(value) + " exceeding threshold = " + std::to_string(threshold) +
" → rolling back to iteration " + std::to_string(rollback_iter));

Checkpoint_s checkpoint_read{.field = chem.getField()};
read_checkpoint(params.out_dir, "checkpoint" + std::to_string(rollback_iter) + ".hdf5", checkpoint_read);
current_iteration = checkpoint_read.iteration;
return true; // rollback happened
}
for (uint32_t i = 0; i < params.mape_threshold.size(); ++i) {
// Skip invalid entries
if (mape[i] == 0) {
continue;
}
MSG("All species are within their MAPE and RRMSE thresholds.");
return false;

bool mape_exceeded = mape[i] > params.mape_threshold[i];

if (mape_exceeded) {
uint32_t rollback_iter =
((current_iteration - 1) / params.checkpoint_interval) *
params.checkpoint_interval;

MSG("[THRESHOLD EXCEEDED] " + props[i] +
" has MAPE = " + std::to_string(mape[i]) +
" exceeding threshold = " + std::to_string(params.mape_threshold[i]) +
" → rolling back to iteration " + std::to_string(rollback_iter));

Checkpoint_s checkpoint_read{.field = chem.getField()};
read_checkpoint(params.out_dir,
"checkpoint" + std::to_string(rollback_iter) + ".hdf5",
checkpoint_read);
current_iteration = checkpoint_read.iteration;
return true; // rollback happened
}
}
MSG("All species are within their MAPE and RRMSE thresholds.");
return false;
}

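triggerRollbackIfExceeded rolls back to the most recent checkpoint written strictly before the failing iteration, via ((current_iteration - 1) / checkpoint_interval) * checkpoint_interval. A small self-contained example of that arithmetic (illustrative only; the interval value of 10 is assumed):

// Illustrative only: the rollback target is the latest checkpoint written
// strictly before the failing iteration (checkpoints every 10 iterations here).
#include <cstdint>
#include <iostream>

int main() {
  const std::uint32_t checkpoint_interval = 10;
  for (std::uint32_t iter : {10u, 11u, 19u, 20u, 21u}) {
    const std::uint32_t rollback_iter =
        ((iter - 1) / checkpoint_interval) * checkpoint_interval;
    std::cout << "iteration " << iter << " -> rollback to " << rollback_iter << '\n';
  }
  // Prints: 10 -> 0, 11 -> 10, 19 -> 10, 20 -> 10, 21 -> 20
  return 0;
}
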
static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
DiffusionModule &diffusion,
|
||||
ChemistryModule &chem)
|
||||
{
|
||||
ChemistryModule &chem) {
|
||||
|
||||
/* Iteration Count is dynamic, retrieving value from R (is only needed by
|
||||
* master for the following loop) */
|
||||
uint32_t maxiter = params.timesteps.size();
|
||||
|
||||
if (params.print_progress)
|
||||
{
|
||||
if (params.print_progress) {
|
||||
chem.setProgressBarPrintout(true);
|
||||
}
|
||||
R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
|
||||
@@ -360,13 +337,19 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
/* SIMULATION LOOP */

double dSimTime{0};
for (uint32_t iter = 1; iter < maxiter + 1; iter++)
{
// Penalty countdown
if (params.rollback_simulation && params.penalty_counter > 0)
{
params.penalty_counter--;
std::cout << "Penalty counter: " << params.penalty_counter << std::endl;
double write_chk = 0.0;
double write_stats = 0.0;
double read_chk = 0.0;

for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
// Rollback countdown
if (params.rollback_enabled) {
if (params.sur_disabled_counter > 0) {
--params.sur_disabled_counter;
MSG("Rollback counter: " + std::to_string(params.sur_disabled_counter));
} else {
params.rollback_enabled = false;
}
}

params.control_iteration_active = (iter % params.control_iteration == 0 /* && iter != 0 */);
@ -391,8 +374,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
chem.getField().update(diffusion.getField());
|
||||
|
||||
// MSG("Chemistry start");
|
||||
if (params.use_ai_surrogate)
|
||||
{
|
||||
if (params.use_ai_surrogate) {
|
||||
double ai_start_t = MPI_Wtime();
|
||||
// Save current values from the tug field as predictor for the ai step
|
||||
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
|
||||
@ -443,8 +425,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
chem.simulate(dt);
|
||||
|
||||
/* AI surrogate iterative training*/
|
||||
if (params.use_ai_surrogate)
|
||||
{
|
||||
if (params.use_ai_surrogate) {
|
||||
double ai_start_t = MPI_Wtime();
|
||||
|
||||
R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
|
||||
@@ -483,20 +464,41 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters &params,
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
std::to_string(maxiter));

double chk_start = MPI_Wtime();
double write_chk_start, write_chk_end;
double write_stats_start, write_stats_end;
double read_chk_start, read_chk_end;

if(iter % params.checkpoint_interval == 0){
if (params.control_interval_enabled) {

write_chk_start = MPI_Wtime();
MSG("Writing checkpoint of iteration " + std::to_string(iter));
write_checkpoint(params.out_dir, "checkpoint" + std::to_string(iter) + ".hdf5",
write_checkpoint(params.out_dir,
"checkpoint" + std::to_string(iter) + ".hdf5",
{.field = chem.getField(), .iteration = iter});
write_chk_end = MPI_Wtime();

if (!params.rollback_enabled) {
write_stats_start = MPI_Wtime();
writeStatsToCSV(chem.error_history, chem.getField().GetProps(),
params.out_dir, "stats_overview");
write_stats_end = MPI_Wtime();

read_chk_start = MPI_Wtime();
params.rollback_enabled = triggerRollbackIfExceeded(chem, params, iter);
read_chk_end = MPI_Wtime();

if (params.rollback_enabled) {
params.rollback_counter++;
params.sur_disabled_counter = params.control_interval;
MSG("Interpolation disabled for the next " +
std::to_string(params.control_interval) + ".");
}
}
}

if (iter == params.next_penalty_check)
{
writeStatsToCSV(chem.error_history, chem.getField().GetProps(), params.out_dir,"stats_overview");

params.next_penalty_check = iter + params.penalty_iteration;
}
write_chk += write_chk_end - write_chk_start;
write_stats += write_stats_end - write_stats_start;
read_chk += read_chk_end - read_chk_start;

// MSG();
} // END SIMULATION LOOP
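Taken together, the block above writes a checkpoint every checkpoint_interval iterations, evaluates the surrogate error at that point when control_interval_enabled is set, and, if triggerRollbackIfExceeded fires, suspends interpolation for the next control_interval iterations via sur_disabled_counter. A compact sketch of that decision, with state fields mirroring RuntimeParameters (illustrative only, not code from this commit):

// Illustrative only: one control step of the rollback schedule used above.
#include <cstdint>

struct ControlState {  // mirrors the relevant RuntimeParameters fields
  bool rollback_enabled = false;
  std::uint32_t sur_disabled_counter = 0;
  std::uint32_t rollback_counter = 0;
};

// Returns true when the loop should restart from the restored checkpoint.
bool control_step(std::uint32_t iter, std::uint32_t checkpoint_interval,
                  std::uint32_t control_interval, bool threshold_exceeded,
                  ControlState &st) {
  if (checkpoint_interval == 0 || iter % checkpoint_interval != 0)
    return false;  // between checkpoints: nothing to decide
  if (st.rollback_enabled)
    return false;  // still inside the post-rollback penalty window
  if (!threshold_exceeded)
    return false;  // metrics within their thresholds
  st.rollback_enabled = true;
  st.rollback_counter++;
  st.sur_disabled_counter = control_interval;  // interpolation off for this window
  return true;
}
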
@ -514,8 +516,17 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
Rcpp::List diffusion_profiling;
|
||||
diffusion_profiling["simtime"] = diffusion.getTransportTime();
|
||||
|
||||
if (params.use_dht)
|
||||
{
|
||||
Rcpp::List ctrl_profiling;
|
||||
ctrl_profiling["write_checkpoint"] = write_chk;
|
||||
ctrl_profiling["read_checkpoint"] = read_chk;
|
||||
ctrl_profiling["write_metrics"] = write_stats;
|
||||
ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime();
|
||||
ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime();
|
||||
ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime();
|
||||
ctrl_profiling["ctrl_logic_worker"] =
|
||||
Rcpp::wrap(chem.GetWorkerControlTimings());
|
||||
|
||||
if (params.use_dht) {
|
||||
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
|
||||
chem_profiling["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
|
||||
chem_profiling["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
|
||||
@ -523,8 +534,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
Rcpp::wrap(chem.GetWorkerDHTFillTimings());
|
||||
}
|
||||
|
||||
if (params.use_interp)
|
||||
{
|
||||
if (params.use_interp) {
|
||||
chem_profiling["interp_w"] =
|
||||
Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
|
||||
chem_profiling["interp_r"] =
|
||||
@ -542,6 +552,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
profiling["simtime"] = dSimTime;
|
||||
profiling["chemistry"] = chem_profiling;
|
||||
profiling["diffusion"] = diffusion_profiling;
|
||||
profiling["ctrl_logic"] = ctrl_profiling;
|
||||
|
||||
chem.MasterLoopBreak();
|
||||
|
||||
@ -549,8 +560,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
}
|
||||
|
||||
std::vector<std::string> getSpeciesNames(const Field &&field, int root,
|
||||
MPI_Comm comm)
|
||||
{
|
||||
MPI_Comm comm) {
|
||||
std::uint32_t n_elements;
|
||||
std::uint32_t n_string_size;
|
||||
|
||||
@ -560,13 +570,11 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
|
||||
const bool is_master = root == rank;
|
||||
|
||||
// first, the master sends all the species names iterative
|
||||
if (is_master)
|
||||
{
|
||||
if (is_master) {
|
||||
n_elements = field.GetProps().size();
|
||||
MPI_Bcast(&n_elements, 1, MPI_UINT32_T, root, MPI_COMM_WORLD);
|
||||
|
||||
for (std::uint32_t i = 0; i < n_elements; i++)
|
||||
{
|
||||
for (std::uint32_t i = 0; i < n_elements; i++) {
|
||||
n_string_size = field.GetProps()[i].size();
|
||||
MPI_Bcast(&n_string_size, 1, MPI_UINT32_T, root, MPI_COMM_WORLD);
|
||||
MPI_Bcast(const_cast<char *>(field.GetProps()[i].c_str()), n_string_size,
|
||||
@ -581,8 +589,7 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
|
||||
|
||||
std::vector<std::string> species_names_out(n_elements);
|
||||
|
||||
for (std::uint32_t i = 0; i < n_elements; i++)
|
||||
{
|
||||
for (std::uint32_t i = 0; i < n_elements; i++) {
|
||||
MPI_Bcast(&n_string_size, 1, MPI_UINT32_T, root, MPI_COMM_WORLD);
|
||||
|
||||
char recv_buf[n_string_size];
|
||||
@ -595,8 +602,7 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
|
||||
return species_names_out;
|
||||
}
|
||||
|
||||
std::array<double, 2> getBaseTotals(Field &&field, int root, MPI_Comm comm)
|
||||
{
|
||||
std::array<double, 2> getBaseTotals(Field &&field, int root, MPI_Comm comm) {
|
||||
std::array<double, 2> base_totals;
|
||||
|
||||
int rank;
|
||||
@ -604,8 +610,7 @@ std::array<double, 2> getBaseTotals(Field &&field, int root, MPI_Comm comm)
|
||||
|
||||
const bool is_master = root == rank;
|
||||
|
||||
if (is_master)
|
||||
{
|
||||
if (is_master) {
|
||||
const auto h_col = field["H"];
|
||||
const auto o_col = field["O"];
|
||||
|
||||
@ -620,8 +625,7 @@ std::array<double, 2> getBaseTotals(Field &&field, int root, MPI_Comm comm)
|
||||
return base_totals;
|
||||
}
|
||||
|
||||
bool getHasID(Field &&field, int root, MPI_Comm comm)
|
||||
{
|
||||
bool getHasID(Field &&field, int root, MPI_Comm comm) {
|
||||
bool has_id;
|
||||
|
||||
int rank;
|
||||
@ -629,8 +633,7 @@ bool getHasID(Field &&field, int root, MPI_Comm comm)
|
||||
|
||||
const bool is_master = root == rank;
|
||||
|
||||
if (is_master)
|
||||
{
|
||||
if (is_master) {
|
||||
const auto ID_field = field["ID"];
|
||||
|
||||
std::set<double> unique_IDs(ID_field.begin(), ID_field.end());
|
||||
@ -647,8 +650,7 @@ bool getHasID(Field &&field, int root, MPI_Comm comm)
|
||||
return has_id;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int main(int argc, char *argv[]) {
|
||||
int world_size;
|
||||
|
||||
MPI_Init(&argc, &argv);
|
||||
@ -659,8 +661,7 @@ int main(int argc, char *argv[])
|
||||
|
||||
RInsidePOET &R = RInsidePOET::getInstance();
|
||||
|
||||
if (MY_RANK == 0)
|
||||
{
|
||||
if (MY_RANK == 0) {
|
||||
MSG("Running POET version " + std::string(poet_version));
|
||||
}
|
||||
|
||||
@ -668,8 +669,7 @@ int main(int argc, char *argv[])
|
||||
|
||||
RuntimeParameters run_params;
|
||||
|
||||
if (parseInitValues(argc, argv, run_params) != 0)
|
||||
{
|
||||
if (parseInitValues(argc, argv, run_params) != 0) {
|
||||
MPI_Finalize();
|
||||
return 0;
|
||||
}
|
||||
@ -711,12 +711,9 @@ int main(int argc, char *argv[])
|
||||
|
||||
chemistry.masterEnableSurrogates(surr_setup);
|
||||
|
||||
if (MY_RANK > 0)
|
||||
{
|
||||
if (MY_RANK > 0) {
|
||||
chemistry.WorkerLoop();
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// R.parseEvalQ("mysetup <- setup");
|
||||
// // if (MY_RANK == 0) { // get timestep vector from
|
||||
// // grid_init function ... //
|
||||
@ -730,8 +727,7 @@ int main(int argc, char *argv[])
|
||||
R["out_ext"] = run_params.out_ext;
|
||||
R["out_dir"] = run_params.out_dir;
|
||||
|
||||
if (run_params.use_ai_surrogate)
|
||||
{
|
||||
if (run_params.use_ai_surrogate) {
|
||||
/* Incorporate ai surrogate from R */
|
||||
R.parseEvalQ(ai_surrogate_r_library);
|
||||
/* Use dht species for model input and output */
|
||||
@ -780,8 +776,7 @@ int main(int argc, char *argv[])
|
||||
|
||||
MPI_Finalize();
|
||||
|
||||
if (MY_RANK == 0)
|
||||
{
|
||||
if (MY_RANK == 0) {
|
||||
MSG("done, bye!");
|
||||
}
|
||||
|
||||
|
||||
@@ -52,13 +52,14 @@ struct RuntimeParameters {

bool print_progress = false;

std::uint32_t penalty_iteration = 0;
std::uint32_t max_penalty_iteration = 0;
std::uint32_t penalty_counter = 0;
std::uint32_t next_penalty_check = 0;
bool rollback_simulation = false;
bool control_iteration_active = false;
std::uint32_t control_iteration = 1;
bool rollback_enabled = false;
bool control_interval_enabled = false;
std::uint32_t global_iter = 0;
std::uint32_t sur_disabled_counter = 0;
std::uint32_t rollback_counter = 0;
std::uint32_t checkpoint_interval = 0;
std::uint32_t control_interval = 0;
std::vector<double> mape_threshold;

static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;
