mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
Wp data not being shuffled correctly.
This commit is contained in:
parent
71269166ea
commit
dc940b2f88
@ -2,15 +2,16 @@
|
||||
#ifndef CHEMISTRYMODULE_H_
|
||||
#define CHEMISTRYMODULE_H_
|
||||
|
||||
#include "ChemistryDefs.hpp"
|
||||
#include "DataStructures/Field.hpp"
|
||||
#include "DataStructures/NamedVector.hpp"
|
||||
#include "ChemistryDefs.hpp"
|
||||
#include "Control/ControlModule.hpp"
|
||||
#include "Init/InitialList.hpp"
|
||||
#include "NameDouble.h"
|
||||
#include "PhreeqcRunner.hpp"
|
||||
#include "SurrogateModels/DHT_Wrapper.hpp"
|
||||
#include "SurrogateModels/Interpolation.hpp"
|
||||
#include "PhreeqcRunner.hpp"
|
||||
|
||||
#include <array>
|
||||
#include <cstdint>
|
||||
@ -174,12 +175,6 @@ public:
|
||||
*/
|
||||
auto GetMasterLoopTime() const { return this->send_recv_t; }
|
||||
|
||||
auto GetMasterCtrlLogicTime() const { return this->ctrl_t; }
|
||||
|
||||
auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; }
|
||||
|
||||
auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; }
|
||||
|
||||
/**
|
||||
* **Master only** Collect and return all accumulated timings recorded by
|
||||
* workers to run Phreeqc simulation.
|
||||
@ -257,6 +252,8 @@ public:
|
||||
|
||||
std::vector<int> ai_surrogate_validity_vector;
|
||||
|
||||
void setControlModule(poet::ControlModule *ctrl) { control_module = ctrl; }
|
||||
|
||||
protected:
|
||||
void initializeDHT(uint32_t size_mb,
|
||||
const NamedVector<std::uint32_t> &key_species,
|
||||
@ -274,7 +271,8 @@ protected:
|
||||
CHEM_DHT_SIGNIF_VEC,
|
||||
CHEM_DHT_SNAPS,
|
||||
CHEM_DHT_READ_FILE,
|
||||
CHEM_IP, // Control Flag
|
||||
//CHEM_IP, // Control flag
|
||||
CHEM_CTRL, // Control flag
|
||||
CHEM_IP_ENABLE,
|
||||
CHEM_IP_MIN_ENTRIES,
|
||||
CHEM_IP_SIGNIF_VEC,
|
||||
@ -329,7 +327,7 @@ protected:
|
||||
void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer,
|
||||
workpointer_t &sur_pointer, int &pkg_to_send,
|
||||
int &count_pkgs, int &free_workers, double dt,
|
||||
uint32_t iteration, uint32_t control_iteration,
|
||||
uint32_t iteration,
|
||||
const std::vector<uint32_t> &wp_sizes_vector);
|
||||
void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send,
|
||||
int &free_workers);
|
||||
@ -367,6 +365,10 @@ protected:
|
||||
|
||||
void BCastStringVec(std::vector<std::string> &io);
|
||||
|
||||
int packResultsIntoBuffer(std::vector<double> &mpi_buffer, int base_count,
|
||||
const WorkPackage &wp,
|
||||
const WorkPackage &wp_control);
|
||||
|
||||
int comm_size, comm_rank;
|
||||
MPI_Comm group_comm;
|
||||
|
||||
@ -380,13 +382,12 @@ protected:
|
||||
|
||||
poet::DHT_Wrapper *dht = nullptr;
|
||||
|
||||
bool dht_fill_during_rollback{false};
|
||||
bool interp_enabled{false};
|
||||
std::unique_ptr<poet::InterpolationModule> interp;
|
||||
|
||||
bool ai_surrogate_enabled{false};
|
||||
|
||||
static constexpr uint32_t BUFFER_OFFSET = 6;
|
||||
static constexpr uint32_t BUFFER_OFFSET = 5;
|
||||
|
||||
inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const {
|
||||
MPI_Bcast(buf, count, datatype, 0, this->group_comm);
|
||||
@ -400,10 +401,6 @@ protected:
|
||||
double seq_t = 0.;
|
||||
double send_recv_t = 0.;
|
||||
|
||||
double ctrl_t = 0.;
|
||||
double bcast_ctrl_t = 0.;
|
||||
double recv_ctrl_t = 0.;
|
||||
|
||||
std::array<double, 2> base_totals{0};
|
||||
|
||||
bool print_progessbar{false};
|
||||
@ -422,9 +419,11 @@ protected:
|
||||
|
||||
std::unique_ptr<PhreeqcRunner> pqc_runner;
|
||||
|
||||
std::unique_ptr<poet::ControlModule> ctrl_module;
|
||||
poet::ControlModule *control_module = nullptr;
|
||||
|
||||
//std::vector<double> sur_shuffled;
|
||||
bool control_enabled{false};
|
||||
|
||||
// std::vector<double> sur_shuffled;
|
||||
};
|
||||
} // namespace poet
|
||||
|
||||
|
||||
@ -235,7 +235,7 @@ inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) {
|
||||
inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
worker_list_t &w_list, workpointer_t &work_pointer,
|
||||
workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs,
|
||||
int &free_workers, double dt, uint32_t iteration, uint32_t control_interval,
|
||||
int &free_workers, double dt, uint32_t iteration,
|
||||
const std::vector<uint32_t> &wp_sizes_vector) {
|
||||
/* declare variables */
|
||||
int local_work_package_size;
|
||||
@ -276,8 +276,6 @@ inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
std::accumulate(wp_sizes_vector.begin(),
|
||||
std::next(wp_sizes_vector.begin(), count_pkgs), 0);
|
||||
send_buffer[end_of_wp + 4] = wp_start_index;
|
||||
// whether this iteration is a control iteration
|
||||
send_buffer[end_of_wp + 5] = control_interval;
|
||||
|
||||
/* ATTENTION Worker p has rank p+1 */
|
||||
// MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1,
|
||||
@ -285,6 +283,17 @@ inline void poet::ChemistryModule::MasterSendPkgs(
|
||||
MPI_Send(send_buffer.data(), send_buffer.size(), MPI_DOUBLE, p + 1,
|
||||
LOOP_WORK, this->group_comm);
|
||||
|
||||
/* ---- DEBUG LOG (Sender side) ---- */
|
||||
std::cout << "[DEBUG][rank=" << p+1
|
||||
<< "] sending WP " << (count_pkgs - 1)
|
||||
<< " to worker rank " << (p + 1)
|
||||
<< " | len=" << send_buffer.size()
|
||||
<< " | start index=" << wp_start_index
|
||||
<< " | second element=" << send_buffer[1]
|
||||
<< " | pkg size=" << local_work_package_size
|
||||
<< std::endl;
|
||||
/* -------------------------------- */
|
||||
|
||||
/* Mark that worker has work to do */
|
||||
w_list[p].has_work = 1;
|
||||
free_workers--;
|
||||
@ -301,8 +310,9 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
int need_to_receive = 1;
|
||||
double idle_a, idle_b;
|
||||
int p, size;
|
||||
double recv_a, recv_b;
|
||||
std::vector<double> recv_buffer;
|
||||
|
||||
recv_buffer.reserve(wp_size * prop_count * 2);
|
||||
MPI_Status probe_status;
|
||||
// master_recv_a = MPI_Wtime();
|
||||
/* start to loop as long there are packages to recv and the need to receive
|
||||
@ -320,41 +330,48 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
|
||||
idle_b = MPI_Wtime();
|
||||
this->idle_t += idle_b - idle_a;
|
||||
}
|
||||
|
||||
if (!need_to_receive) {
|
||||
continue;
|
||||
}
|
||||
/* if need_to_receive was set to true above, so there is a message to
|
||||
* receive */
|
||||
if (need_to_receive) {
|
||||
p = probe_status.MPI_SOURCE;
|
||||
if (probe_status.MPI_TAG == LOOP_WORK) {
|
||||
bool handled = false;
|
||||
|
||||
switch (probe_status.MPI_TAG) {
|
||||
case LOOP_WORK: {
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
|
||||
MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK,
|
||||
this->group_comm, MPI_STATUS_IGNORE);
|
||||
w_list[p - 1].has_work = 0;
|
||||
pkg_to_recv -= 1;
|
||||
free_workers++;
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
if (probe_status.MPI_TAG == LOOP_CTRL) {
|
||||
recv_a = MPI_Wtime();
|
||||
case LOOP_CTRL: {
|
||||
/* layout of buffer is [phreeqc][surrogate] */
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
|
||||
|
||||
// layout of buffer is [phreeqc][surrogate]
|
||||
std::vector<double> recv_buffer(size);
|
||||
|
||||
recv_buffer.resize(size);
|
||||
MPI_Recv(recv_buffer.data(), size, MPI_DOUBLE, p, LOOP_CTRL,
|
||||
this->group_comm, MPI_STATUS_IGNORE);
|
||||
|
||||
std::copy(recv_buffer.begin(), recv_buffer.begin() + (size / 2),
|
||||
int half = size / 2;
|
||||
std::copy(recv_buffer.begin(), recv_buffer.begin() + half,
|
||||
w_list[p - 1].send_addr);
|
||||
|
||||
std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size,
|
||||
w_list[p - 1].surrogate_addr);
|
||||
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw std::runtime_error("Master received unknown MPI tag: " +
|
||||
std::to_string(probe_status.MPI_TAG));
|
||||
}
|
||||
}
|
||||
if (handled) {
|
||||
w_list[p - 1].has_work = 0;
|
||||
pkg_to_recv -= 1;
|
||||
free_workers++;
|
||||
recv_b = MPI_Wtime();
|
||||
this->recv_ctrl_t += recv_b - recv_a;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -408,10 +425,6 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
int i_pkgs;
|
||||
int ftype;
|
||||
|
||||
double ctrl_a, ctrl_b;
|
||||
double worker_ctrl_a, worker_ctrl_b;
|
||||
double ctrl_bcast_a, ctrl_bcast_b;
|
||||
|
||||
const std::vector<uint32_t> wp_sizes_vector =
|
||||
CalculateWPSizesVector(this->n_cells, this->wp_size);
|
||||
|
||||
@ -425,15 +438,18 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
MPI_INT);
|
||||
}
|
||||
|
||||
/* start time measurement of broadcasting interpolation status */
|
||||
ctrl_bcast_a = MPI_Wtime();
|
||||
uint32_t control_flag = control_module->GetControlIntervalEnabled();
|
||||
if (control_flag) {
|
||||
ftype = CHEM_CTRL;
|
||||
PropagateFunctionType(ftype);
|
||||
ChemBCast(&control_flag, 1, MPI_INT);
|
||||
}
|
||||
|
||||
/*
|
||||
ftype = CHEM_IP;
|
||||
PropagateFunctionType(ftype);
|
||||
ctrl_module->BCastControlFlags();
|
||||
/* end time measurement of broadcasting interpolation status */
|
||||
ctrl_bcast_b = MPI_Wtime();
|
||||
this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a;
|
||||
|
||||
*/
|
||||
ftype = CHEM_WORK_LOOP;
|
||||
PropagateFunctionType(ftype);
|
||||
|
||||
@ -441,32 +457,23 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
|
||||
static uint32_t iteration = 0;
|
||||
|
||||
uint32_t control_logic_enabled =
|
||||
ctrl_module->control_interval_enabled ? 1 : 0;
|
||||
|
||||
if (control_logic_enabled) {
|
||||
ctrl_module->sur_shuffled.clear();
|
||||
ctrl_module->sur_shuffled.reserve(this->n_cells * this->prop_count);
|
||||
}
|
||||
|
||||
/* start time measurement of sequential part */
|
||||
seq_a = MPI_Wtime();
|
||||
|
||||
/* shuffle grid */
|
||||
// grid.shuffleAndExport(mpi_buffer);
|
||||
|
||||
std::vector<double> mpi_buffer =
|
||||
shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size());
|
||||
|
||||
ctrl_module->sur_shuffled.resize(mpi_buffer.size());
|
||||
std::vector<double> mpi_surr_buffer{mpi_buffer};
|
||||
|
||||
/* setup local variables */
|
||||
pkg_to_send = wp_sizes_vector.size();
|
||||
pkg_to_recv = wp_sizes_vector.size();
|
||||
|
||||
workpointer_t work_pointer = mpi_buffer.begin();
|
||||
workpointer_t sur_pointer = ctrl_module->sur_shuffled.begin();
|
||||
workpointer_t sur_pointer = mpi_surr_buffer.begin();
|
||||
worker_list_t worker_list(this->comm_size - 1);
|
||||
|
||||
free_workers = this->comm_size - 1;
|
||||
@ -490,8 +497,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
if (pkg_to_send > 0) {
|
||||
// send packages to all free workers ...
|
||||
MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send,
|
||||
i_pkgs, free_workers, dt, iteration, control_logic_enabled,
|
||||
wp_sizes_vector);
|
||||
i_pkgs, free_workers, dt, iteration, wp_sizes_vector);
|
||||
}
|
||||
// ... and try to receive them from workers who has finished their work
|
||||
MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers);
|
||||
@ -516,22 +522,17 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
|
||||
|
||||
/* do master stuff */
|
||||
|
||||
/* start time measurement of control logic */
|
||||
ctrl_a = MPI_Wtime();
|
||||
|
||||
if (control_logic_enabled && !ctrl_module->rollback_enabled) {
|
||||
std::cout << "[Master] Control logic enabled for this iteration." << std::endl;
|
||||
std::vector<double> sur_unshuffled{ctrl_module->sur_shuffled};
|
||||
unshuffleField(ctrl_module->sur_shuffled, this->n_cells, this->prop_count,
|
||||
if (control_flag) {
|
||||
std::cout << "[Master] Control logic enabled for this iteration."
|
||||
<< std::endl;
|
||||
std::vector<double> sur_unshuffled{mpi_surr_buffer};
|
||||
unshuffleField(mpi_surr_buffer, this->n_cells, this->prop_count,
|
||||
wp_sizes_vector.size(), sur_unshuffled);
|
||||
|
||||
ctrl_module->computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells);
|
||||
control_module->computeSpeciesErrors(out_vec, sur_unshuffled,
|
||||
this->n_cells);
|
||||
}
|
||||
|
||||
/* end time measurement of control logic */
|
||||
ctrl_b = MPI_Wtime();
|
||||
this->ctrl_t += ctrl_b - ctrl_a;
|
||||
|
||||
/* start time measurement of master chemistry */
|
||||
sim_e_chemistry = MPI_Wtime();
|
||||
|
||||
|
||||
@ -9,17 +9,15 @@
|
||||
#include <cstdint>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <mpi.h>
|
||||
#include <limits>
|
||||
#include <mpi.h>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace poet
|
||||
{
|
||||
namespace poet {
|
||||
|
||||
inline std::string get_string(int root, MPI_Comm communicator)
|
||||
{
|
||||
inline std::string get_string(int root, MPI_Comm communicator) {
|
||||
int count;
|
||||
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
|
||||
|
||||
@ -32,102 +30,80 @@ namespace poet
|
||||
delete[] buffer;
|
||||
|
||||
return ret_str;
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerLoop()
|
||||
{
|
||||
void poet::ChemistryModule::WorkerLoop() {
|
||||
struct worker_s timings;
|
||||
|
||||
// HACK: defining the worker iteration count here, which will increment after
|
||||
// each CHEM_ITER_END message
|
||||
uint32_t iteration = 1;
|
||||
bool loop = true;
|
||||
|
||||
while (loop)
|
||||
{
|
||||
while (loop) {
|
||||
int func_type;
|
||||
PropagateFunctionType(func_type);
|
||||
|
||||
switch (func_type)
|
||||
{
|
||||
case CHEM_FIELD_INIT:
|
||||
{
|
||||
switch (func_type) {
|
||||
case CHEM_FIELD_INIT: {
|
||||
ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
|
||||
if (this->ai_surrogate_enabled)
|
||||
{
|
||||
if (this->ai_surrogate_enabled) {
|
||||
this->ai_surrogate_validity_vector.resize(
|
||||
this->n_cells); // resize statt reserve?
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CHEM_AI_BCAST_VALIDITY:
|
||||
{
|
||||
case CHEM_AI_BCAST_VALIDITY: {
|
||||
// Receive the index vector of valid ai surrogate predictions
|
||||
MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells,
|
||||
MPI_INT, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case CHEM_IP:
|
||||
{
|
||||
int interp_flag = 0;
|
||||
int dht_fill_flag = 0;
|
||||
|
||||
ChemBCast(&interp_flag, 1, MPI_INT);
|
||||
ChemBCast(&dht_fill_flag, 1, MPI_INT);
|
||||
|
||||
this->interp_enabled = (interp_flag == 1);
|
||||
this->dht_fill_during_rollback = (dht_fill_flag == 1);
|
||||
case CHEM_CTRL: {
|
||||
int control_flag = 0;
|
||||
ChemBCast(&control_flag, 1, MPI_INT);
|
||||
this->control_enabled = (control_flag == 1);
|
||||
break;
|
||||
}
|
||||
case CHEM_WORK_LOOP:
|
||||
{
|
||||
case CHEM_WORK_LOOP: {
|
||||
WorkerProcessPkgs(timings, iteration);
|
||||
break;
|
||||
}
|
||||
case CHEM_PERF:
|
||||
{
|
||||
case CHEM_PERF: {
|
||||
int type;
|
||||
ChemBCast(&type, 1, MPI_INT);
|
||||
if (type < WORKER_DHT_HITS)
|
||||
{
|
||||
if (type < WORKER_DHT_HITS) {
|
||||
WorkerPerfToMaster(type, timings);
|
||||
break;
|
||||
}
|
||||
WorkerMetricsToMaster(type);
|
||||
break;
|
||||
}
|
||||
case CHEM_BREAK_MAIN_LOOP:
|
||||
{
|
||||
case CHEM_BREAK_MAIN_LOOP: {
|
||||
WorkerPostSim(iteration);
|
||||
loop = false;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
default: {
|
||||
throw std::runtime_error("Worker received unknown tag from master.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
|
||||
uint32_t &iteration)
|
||||
{
|
||||
void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
|
||||
uint32_t &iteration) {
|
||||
MPI_Status probe_status;
|
||||
bool loop = true;
|
||||
|
||||
MPI_Barrier(this->group_comm);
|
||||
|
||||
while (loop)
|
||||
{
|
||||
while (loop) {
|
||||
double idle_a = MPI_Wtime();
|
||||
MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status);
|
||||
double idle_b = MPI_Wtime();
|
||||
|
||||
switch (probe_status.MPI_TAG)
|
||||
{
|
||||
case LOOP_WORK:
|
||||
{
|
||||
switch (probe_status.MPI_TAG) {
|
||||
case LOOP_WORK: {
|
||||
timings.idle_t += idle_b - idle_a;
|
||||
int count;
|
||||
MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
|
||||
@ -135,8 +111,7 @@ namespace poet
|
||||
WorkerDoWork(probe_status, count, timings);
|
||||
break;
|
||||
}
|
||||
case LOOP_END:
|
||||
{
|
||||
case LOOP_END: {
|
||||
WorkerPostIter(probe_status, iteration);
|
||||
iteration++;
|
||||
loop = false;
|
||||
@ -144,25 +119,22 @@ namespace poet
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
int double_count,
|
||||
struct worker_s &timings)
|
||||
{
|
||||
struct worker_s &timings) {
|
||||
static int counter = 1;
|
||||
|
||||
double dht_get_start, dht_get_end;
|
||||
double phreeqc_time_start, phreeqc_time_end;
|
||||
double dht_fill_start, dht_fill_end;
|
||||
double ctrl_time_c, ctrl_time_d;
|
||||
|
||||
uint32_t iteration;
|
||||
double dt;
|
||||
double current_sim_time;
|
||||
uint32_t wp_start_index;
|
||||
int count = double_count;
|
||||
bool control_logic_enabled = false;
|
||||
std::vector<double> mpi_buffer(count);
|
||||
|
||||
/* receive */
|
||||
@ -171,6 +143,7 @@ namespace poet
|
||||
|
||||
/* decrement count of work_package by BUFFER_OFFSET */
|
||||
count -= BUFFER_OFFSET;
|
||||
|
||||
/* check for changes on all additional variables given by the 'header' of
|
||||
* mpi_buffer */
|
||||
|
||||
@ -189,23 +162,24 @@ namespace poet
|
||||
// current work package start location in field
|
||||
wp_start_index = mpi_buffer[count + 4];
|
||||
|
||||
control_logic_enabled = (mpi_buffer[count + 5] == 1);
|
||||
std::cout << "[DEBUG][rank=" << this->comm_rank << "] WP " << counter
|
||||
<< " len=" << count << " | second element: " << mpi_buffer[1]
|
||||
<< " | iteration=" << iteration << " | dt=" << dt
|
||||
<< " | simtime=" << current_sim_time
|
||||
<< " | start_index=" << wp_start_index << std::endl;
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
|
||||
s_curr_wp.input[wp_i] =
|
||||
std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i,
|
||||
mpi_buffer.begin() + this->prop_count * (wp_i + 1));
|
||||
}
|
||||
|
||||
// std::cout << this->comm_rank << ":" << counter++ << std::endl;
|
||||
if (dht_enabled || interp_enabled)
|
||||
{
|
||||
if (dht_enabled || interp_enabled) {
|
||||
dht->prepareKeys(s_curr_wp.input, dt);
|
||||
}
|
||||
|
||||
if (dht_enabled)
|
||||
{
|
||||
if (dht_enabled) {
|
||||
/* check for values in DHT */
|
||||
dht_get_start = MPI_Wtime();
|
||||
dht->checkDHT(s_curr_wp);
|
||||
@ -213,106 +187,55 @@ namespace poet
|
||||
timings.dht_get += dht_get_end - dht_get_start;
|
||||
}
|
||||
|
||||
if (interp_enabled)
|
||||
{
|
||||
if (interp_enabled) {
|
||||
interp->tryInterpolation(s_curr_wp);
|
||||
}
|
||||
|
||||
if (this->ai_surrogate_enabled)
|
||||
{
|
||||
if (this->ai_surrogate_enabled) {
|
||||
// Map valid predictions from the ai surrogate in the workpackage
|
||||
for (int i = 0; i < s_curr_wp.size; i++)
|
||||
{
|
||||
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1)
|
||||
{
|
||||
for (int i = 0; i < s_curr_wp.size; i++) {
|
||||
if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
|
||||
s_curr_wp.mapping[i] = CHEM_AISURR;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if control iteration: create copy surrogate results (output and mappings) and then set them to zero,
|
||||
give this to phreeqc */
|
||||
/* if control iteration: create copy surrogate results (output and mappings)
|
||||
and then set them to zero, give this to phreeqc */
|
||||
|
||||
poet::WorkPackage s_curr_wp_control = s_curr_wp;
|
||||
|
||||
if (control_logic_enabled)
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
|
||||
{
|
||||
s_curr_wp_control.output[wp_i] = std::vector<double>(this->prop_count, 0.0);
|
||||
if (control_enabled) {
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
|
||||
s_curr_wp_control.output[wp_i] = std::vector<double>(prop_count, 0.0);
|
||||
s_curr_wp_control.mapping[wp_i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
|
||||
WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time, dt);
|
||||
WorkerRunWorkPackage(control_enabled ? s_curr_wp_control : s_curr_wp,
|
||||
current_sim_time, dt);
|
||||
|
||||
phreeqc_time_end = MPI_Wtime();
|
||||
|
||||
if (control_logic_enabled)
|
||||
{
|
||||
/* start time measurement for copying control workpackage */
|
||||
ctrl_time_c = MPI_Wtime();
|
||||
|
||||
std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count;
|
||||
|
||||
mpi_buffer.resize(count + sur_wp_offset);
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++)
|
||||
{
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
// s_curr_wp only contains the interpolated data
|
||||
// copy surrogate output after the the pqc output, mpi_buffer[pqc][interp]
|
||||
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
if (s_curr_wp.mapping[wp_i] != CHEM_PQC) // only copy if surrogate was used
|
||||
{
|
||||
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
|
||||
} else
|
||||
{
|
||||
// if pqc was used, copy pqc results again
|
||||
std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
count += sur_wp_offset;
|
||||
|
||||
/* end time measurement for copying control workpackage */
|
||||
ctrl_time_d = MPI_Wtime();
|
||||
timings.ctrl_t += ctrl_time_d - ctrl_time_c;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++)
|
||||
{
|
||||
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + this->prop_count * wp_i);
|
||||
}
|
||||
}
|
||||
count =
|
||||
packResultsIntoBuffer(mpi_buffer, count, s_curr_wp, s_curr_wp_control);
|
||||
|
||||
/* send results to master */
|
||||
MPI_Request send_req;
|
||||
|
||||
int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK;
|
||||
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req);
|
||||
int mpi_tag = control_enabled ? LOOP_CTRL : LOOP_WORK;
|
||||
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD,
|
||||
&send_req);
|
||||
|
||||
if (dht_enabled || interp_enabled || dht_fill_during_rollback)
|
||||
{
|
||||
if (dht_enabled || interp_enabled) {
|
||||
/* write results to DHT */
|
||||
dht_fill_start = MPI_Wtime();
|
||||
dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp);
|
||||
dht->fillDHT(control_enabled ? s_curr_wp_control : s_curr_wp);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
if (interp_enabled)
|
||||
{
|
||||
if (interp_enabled) {
|
||||
interp->writePairs();
|
||||
}
|
||||
timings.dht_fill += dht_fill_end - dht_fill_start;
|
||||
@ -320,34 +243,63 @@ namespace poet
|
||||
|
||||
timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start;
|
||||
MPI_Wait(&send_req, MPI_STATUS_IGNORE);
|
||||
}
|
||||
|
||||
int poet::ChemistryModule::packResultsIntoBuffer(
|
||||
std::vector<double> &mpi_buffer, int base_count, const WorkPackage &wp,
|
||||
const WorkPackage &wp_control) {
|
||||
if (control_enabled) {
|
||||
std::size_t wp_offset = wp_control.size * prop_count;
|
||||
mpi_buffer.resize(base_count + wp_offset);
|
||||
|
||||
/* copy pqc outputs first */
|
||||
for (std::size_t wp_i = 0; wp_i < wp_control.size; wp_i++) {
|
||||
std::copy(wp_control.output[wp_i].begin(), wp_control.output[wp_i].end(),
|
||||
mpi_buffer.begin() + prop_count * wp_i);
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
uint32_t iteration)
|
||||
{
|
||||
/* copy surrogate output, only if it contains interpolated data, after the
|
||||
* the pqc output, layout = mpi_buffer[pqc][interp] */
|
||||
for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) {
|
||||
const auto &wp_copy = wp.mapping[wp_i] != CHEM_PQC
|
||||
? wp.output[wp_i]
|
||||
: wp_control.output[wp_i];
|
||||
|
||||
std::copy(wp_copy.begin(), wp_copy.end(),
|
||||
mpi_buffer.begin() + wp_offset + prop_count * wp_i);
|
||||
}
|
||||
return base_count + static_cast<int>(wp_offset);
|
||||
|
||||
} else {
|
||||
for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) {
|
||||
std::copy(wp.output[wp_i].begin(), wp.output[wp_i].end(),
|
||||
mpi_buffer.begin() + prop_count + wp_i);
|
||||
}
|
||||
return base_count;
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
uint32_t iteration) {
|
||||
MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm,
|
||||
MPI_STATUS_IGNORE);
|
||||
|
||||
if (this->dht_enabled)
|
||||
{
|
||||
if (this->dht_enabled) {
|
||||
dht_hits.push_back(dht->getHits());
|
||||
dht_evictions.push_back(dht->getEvictions());
|
||||
dht->resetCounter();
|
||||
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND)
|
||||
{
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
}
|
||||
|
||||
if (this->interp_enabled)
|
||||
{
|
||||
if (this->interp_enabled) {
|
||||
std::stringstream out;
|
||||
interp_calls.push_back(interp->getInterpolationCount());
|
||||
interp->resetCounter();
|
||||
interp->writePHTStats();
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND)
|
||||
{
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
@ -356,37 +308,31 @@ namespace poet
|
||||
const auto max_mean_idx =
|
||||
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
|
||||
|
||||
if (max_mean_idx >= 2)
|
||||
{
|
||||
if (max_mean_idx >= 2) {
|
||||
DHT_flush(this->interp->getDHTObject());
|
||||
DHT_flush(this->dht->getDHT());
|
||||
if (this->comm_rank == 2)
|
||||
{
|
||||
if (this->comm_rank == 2) {
|
||||
std::cout << "Flushed both DHT and PHT!\n\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RInsidePOET::getInstance().parseEvalQ("gc()");
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration)
|
||||
{
|
||||
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND)
|
||||
{
|
||||
void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
|
||||
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND)
|
||||
{
|
||||
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
std::stringstream out;
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration)
|
||||
{
|
||||
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
|
||||
std::stringstream out;
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".dht";
|
||||
@ -397,138 +343,113 @@ namespace poet
|
||||
else if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully written DHT to file " << out.str()
|
||||
<< "\n";
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerReadDHTDump(
|
||||
const std::string &dht_input_file)
|
||||
{
|
||||
void poet::ChemistryModule::WorkerReadDHTDump(
|
||||
const std::string &dht_input_file) {
|
||||
int res = dht->fileToTable((char *)dht_input_file.c_str());
|
||||
if (res != DHT_SUCCESS)
|
||||
{
|
||||
if (res == DHT_WRONG_FILE)
|
||||
{
|
||||
if (res != DHT_SUCCESS) {
|
||||
if (res == DHT_WRONG_FILE) {
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr
|
||||
<< "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
if (this->comm_rank == 1)
|
||||
std::cerr << "CPP: Worker: Error in loading current state of DHT from "
|
||||
"file. Continue with empty DHT ...\n";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
if (this->comm_rank == 2)
|
||||
std::cout << "CPP: Worker: Successfully loaded state of DHT from file "
|
||||
<< dht_input_file << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
|
||||
void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
|
||||
double dSimTime,
|
||||
double dTimestep)
|
||||
{
|
||||
double dTimestep) {
|
||||
|
||||
std::vector<std::vector<double>> inout_chem = work_package.input;
|
||||
std::vector<std::size_t> to_ignore;
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++)
|
||||
{
|
||||
if (work_package.mapping[wp_id] != CHEM_PQC)
|
||||
{
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
|
||||
if (work_package.mapping[wp_id] != CHEM_PQC) {
|
||||
to_ignore.push_back(wp_id);
|
||||
}
|
||||
|
||||
// HACK: remove the first element (cell_id) before sending to phreeqc
|
||||
inout_chem[wp_id].erase(
|
||||
inout_chem[wp_id].begin(), inout_chem[wp_id].begin() + 1);
|
||||
inout_chem[wp_id].erase(inout_chem[wp_id].begin(),
|
||||
inout_chem[wp_id].begin() + 1);
|
||||
}
|
||||
|
||||
this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
|
||||
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++)
|
||||
{
|
||||
if (work_package.mapping[wp_id] == CHEM_PQC)
|
||||
{
|
||||
// HACK: as we removed the first element (cell_id) before sending to phreeqc,
|
||||
// copy back with an offset of 1
|
||||
for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
|
||||
if (work_package.mapping[wp_id] == CHEM_PQC) {
|
||||
// HACK: as we removed the first element (cell_id) before sending to
|
||||
// phreeqc, copy back with an offset of 1
|
||||
work_package.output[wp_id] = work_package.input[wp_id];
|
||||
std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(),
|
||||
work_package.output[wp_id].begin() + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerPerfToMaster(int type,
|
||||
const struct worker_s &timings)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case WORKER_PHREEQC:
|
||||
{
|
||||
void poet::ChemistryModule::WorkerPerfToMaster(int type,
|
||||
const struct worker_s &timings) {
|
||||
switch (type) {
|
||||
case WORKER_PHREEQC: {
|
||||
MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_CTRL_ITER:
|
||||
{
|
||||
case WORKER_CTRL_ITER: {
|
||||
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_GET:
|
||||
{
|
||||
case WORKER_DHT_GET: {
|
||||
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_FILL:
|
||||
{
|
||||
case WORKER_DHT_FILL: {
|
||||
MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IDLE:
|
||||
{
|
||||
case WORKER_IDLE: {
|
||||
MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_WRITE:
|
||||
{
|
||||
case WORKER_IP_WRITE: {
|
||||
double val = interp->getPHTWriteTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_READ:
|
||||
{
|
||||
case WORKER_IP_READ: {
|
||||
double val = interp->getPHTReadTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_GATHER:
|
||||
{
|
||||
case WORKER_IP_GATHER: {
|
||||
double val = interp->getDHTGatherTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_FC:
|
||||
{
|
||||
case WORKER_IP_FC: {
|
||||
double val = interp->getInterpolationTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerMetricsToMaster(int type)
|
||||
{
|
||||
void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
|
||||
MPI_Comm worker_comm = dht->getCommunicator();
|
||||
int worker_rank;
|
||||
MPI_Comm_rank(worker_comm, &worker_rank);
|
||||
@ -536,47 +457,39 @@ namespace poet
|
||||
MPI_Comm &group_comm = this->group_comm;
|
||||
|
||||
auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm](
|
||||
std::vector<std::uint32_t> &send_buffer, int tag)
|
||||
{
|
||||
std::vector<std::uint32_t> &send_buffer, int tag) {
|
||||
std::vector<uint32_t> to_master(send_buffer.size());
|
||||
MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(),
|
||||
MPI_UINT32_T, MPI_SUM, 0, worker_comm);
|
||||
|
||||
if (worker_rank == 0)
|
||||
{
|
||||
if (worker_rank == 0) {
|
||||
MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag,
|
||||
group_comm);
|
||||
}
|
||||
};
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case WORKER_DHT_HITS:
|
||||
{
|
||||
switch (type) {
|
||||
case WORKER_DHT_HITS: {
|
||||
reduce_and_send(dht_hits, WORKER_DHT_HITS);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_EVICTIONS:
|
||||
{
|
||||
case WORKER_DHT_EVICTIONS: {
|
||||
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_CALLS:
|
||||
{
|
||||
case WORKER_IP_CALLS: {
|
||||
reduce_and_send(interp_calls, WORKER_IP_CALLS);
|
||||
return;
|
||||
}
|
||||
case WORKER_PHT_CACHE_HITS:
|
||||
{
|
||||
case WORKER_PHT_CACHE_HITS: {
|
||||
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
|
||||
reduce_and_send(input, WORKER_PHT_CACHE_HITS);
|
||||
return;
|
||||
}
|
||||
default:
|
||||
{
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace poet
|
||||
|
||||
@ -4,15 +4,23 @@
|
||||
#include "IO/StatsIO.hpp"
|
||||
#include <cmath>
|
||||
|
||||
bool poet::ControlModule::isControlIteration(uint32_t iter) {
|
||||
void poet::ControlModule::updateControlIteration(const uint32_t iter) {
|
||||
|
||||
global_iteration = iter;
|
||||
|
||||
if (control_interval == 0) {
|
||||
control_interval_enabled = false;
|
||||
return;
|
||||
}
|
||||
|
||||
control_interval_enabled = (iter % control_interval == 0);
|
||||
if (control_interval_enabled) {
|
||||
MSG("[Control] Control interval triggered at iteration " +
|
||||
MSG("[Control] Control interval enabled at iteration " +
|
||||
std::to_string(iter));
|
||||
}
|
||||
return control_interval_enabled;
|
||||
}
|
||||
|
||||
/*
|
||||
void poet::ControlModule::beginIteration() {
|
||||
if (rollback_enabled) {
|
||||
if (sur_disabled_counter > 0) {
|
||||
@ -23,19 +31,23 @@ void poet::ControlModule::beginIteration() {
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
void poet::ControlModule::endIteration(uint32_t iter) {
|
||||
void poet::ControlModule::endIteration(const uint32_t iter) {
|
||||
|
||||
if (!control_interval_enabled) {
|
||||
return;
|
||||
}
|
||||
/* Writing a checkpointing */
|
||||
if (checkpoint_interval > 0 && iter % checkpoint_interval == 0) {
|
||||
/* Control Logic*/
|
||||
if (control_interval_enabled &&
|
||||
checkpoint_interval > 0 /*&& !rollback_enabled*/) {
|
||||
MSG("Writing checkpoint of iteration " + std::to_string(iter));
|
||||
write_checkpoint(out_dir, "checkpoint" + std::to_string(iter) + ".hdf5",
|
||||
{.field = chem->getField(), .iteration = iter});
|
||||
}
|
||||
writeStatsToCSV(error_history, species_names, out_dir, "stats_overview");
|
||||
|
||||
/* Control Logic*/
|
||||
if (control_interval_enabled && !rollback_enabled) {
|
||||
writeStatsToCSV(error_history, species_names, out_dir,
|
||||
"stats_overview");
|
||||
/*
|
||||
|
||||
if (triggerRollbackIfExceeded(*chem, *params, iter)) {
|
||||
rollback_enabled = true;
|
||||
@ -44,9 +56,12 @@ void poet::ControlModule::endIteration(uint32_t iter) {
|
||||
MSG("Interpolation disabled for the next " +
|
||||
std::to_string(control_interval) + ".");
|
||||
}
|
||||
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
void poet::ControlModule::BCastControlFlags() {
|
||||
int interp_flag = rollback_enabled ? 0 : 1;
|
||||
int dht_fill_flag = rollback_enabled ? 1 : 0;
|
||||
@ -54,6 +69,9 @@ void poet::ControlModule::BCastControlFlags() {
|
||||
chem->ChemBCast(&dht_fill_flag, 1, MPI_INT);
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
/*
|
||||
bool poet::ControlModule::triggerRollbackIfExceeded(ChemistryModule &chem,
|
||||
RuntimeParameters ¶ms,
|
||||
uint32_t &iter) {
|
||||
@ -91,17 +109,20 @@ bool poet::ControlModule::triggerRollbackIfExceeded(ChemistryModule &chem,
|
||||
}
|
||||
}
|
||||
MSG("All species are within their MAPE and RRMSE thresholds.");
|
||||
return false;
|
||||
}
|
||||
return
|
||||
|
||||
false;
|
||||
}
|
||||
*/
|
||||
void poet::ControlModule::computeSpeciesErrors(
|
||||
const std::vector<double> &reference_values,
|
||||
const std::vector<double> &surrogate_values, uint32_t size_per_prop) {
|
||||
const std::vector<double> &surrogate_values, const uint32_t size_per_prop) {
|
||||
|
||||
SimulationErrorStats species_error_stats(species_count, params->global_iter,
|
||||
rollback_counter);
|
||||
SimulationErrorStats species_error_stats(this->species_names.size(),
|
||||
global_iteration,
|
||||
/*rollback_counter*/ 0);
|
||||
|
||||
for (uint32_t i = 0; i < species_count; ++i) {
|
||||
for (uint32_t i = 0; i < this->species_names.size(); ++i) {
|
||||
double err_sum = 0.0;
|
||||
double sqr_err_sum = 0.0;
|
||||
uint32_t base_idx = i * size_per_prop;
|
||||
|
||||
@ -16,43 +16,24 @@ class ChemistryModule;
|
||||
class ControlModule {
|
||||
|
||||
public:
|
||||
ControlModule(RuntimeParameters *run_params, ChemistryModule *chem_module)
|
||||
: params(run_params), chem(chem_module) {};
|
||||
|
||||
/* Control configuration*/
|
||||
std::vector<std::string> species_names;
|
||||
uint32_t species_count = 0;
|
||||
std::string out_dir;
|
||||
|
||||
bool rollback_enabled = false;
|
||||
bool control_interval_enabled = false;
|
||||
// std::uint32_t global_iter = 0;
|
||||
// std::uint32_t sur_disabled_counter = 0;
|
||||
// std::uint32_t rollback_counter = 0;
|
||||
|
||||
std::uint32_t global_iter = 0;
|
||||
std::uint32_t sur_disabled_counter = 0;
|
||||
std::uint32_t rollback_counter = 0;
|
||||
std::uint32_t checkpoint_interval = 0;
|
||||
std::uint32_t control_interval = 0;
|
||||
void updateControlIteration(const uint32_t iter);
|
||||
|
||||
std::vector<double> mape_threshold;
|
||||
std::vector<double> rrmse_threshold;
|
||||
auto GetGlobalIteration() const noexcept { return global_iteration; }
|
||||
|
||||
double ctrl_t = 0.;
|
||||
double bcast_ctrl_t = 0.;
|
||||
double recv_ctrl_t = 0.;
|
||||
// void beginIteration();
|
||||
|
||||
/* Buffer for shuffled surrogate data */
|
||||
std::vector<double> sur_shuffled;
|
||||
void endIteration(const uint32_t iter);
|
||||
|
||||
bool isControlIteration(uint32_t iter);
|
||||
// void BCastControlFlags();
|
||||
|
||||
void beginIteration();
|
||||
|
||||
void endIteration(uint32_t iter);
|
||||
|
||||
void BCastControlFlags();
|
||||
|
||||
bool triggerRollbackIfExceeded(ChemistryModule &chem,
|
||||
RuntimeParameters ¶ms, uint32_t &iter);
|
||||
//bool triggerRollbackIfExceeded(ChemistryModule &chem,
|
||||
// RuntimeParameters ¶ms, uint32_t &iter);
|
||||
|
||||
struct SimulationErrorStats {
|
||||
std::vector<double> mape;
|
||||
@ -60,14 +41,14 @@ public:
|
||||
uint32_t iteration; // iterations in simulation after rollbacks
|
||||
uint32_t rollback_count;
|
||||
|
||||
SimulationErrorStats(size_t species_count, uint32_t iter, uint32_t counter)
|
||||
SimulationErrorStats(uint32_t species_count, uint32_t iter, uint32_t counter)
|
||||
: mape(species_count, 0.0), rrmse(species_count, 0.0), iteration(iter),
|
||||
rollback_count(counter) {}
|
||||
};
|
||||
|
||||
static void computeSpeciesErrors(const std::vector<double> &reference_values,
|
||||
void computeSpeciesErrors(const std::vector<double> &reference_values,
|
||||
const std::vector<double> &surrogate_values,
|
||||
uint32_t size_per_prop);
|
||||
const uint32_t size_per_prop);
|
||||
|
||||
std::vector<SimulationErrorStats> error_history;
|
||||
|
||||
@ -75,34 +56,53 @@ public:
|
||||
std::string out_dir;
|
||||
std::uint32_t checkpoint_interval;
|
||||
std::uint32_t control_interval;
|
||||
std::uint32_t species_count;
|
||||
|
||||
std::vector<std::string> species_names;
|
||||
std::vector<double> mape_threshold;
|
||||
std::vector<double> rrmse_threshold;
|
||||
};
|
||||
|
||||
void enableControlLogic(const ControlSetup &setup) {
|
||||
out_dir = setup.out_dir;
|
||||
checkpoint_interval = setup.checkpoint_interval;
|
||||
control_interval = setup.control_interval;
|
||||
species_count = setup.species_count;
|
||||
|
||||
species_names = setup.species_names;
|
||||
mape_threshold = setup.mape_threshold;
|
||||
rrmse_threshold = setup.rrmse_threshold;
|
||||
this->out_dir = setup.out_dir;
|
||||
this->checkpoint_interval = setup.checkpoint_interval;
|
||||
this->control_interval = setup.control_interval;
|
||||
this->species_names = setup.species_names;
|
||||
this->mape_threshold = setup.mape_threshold;
|
||||
}
|
||||
|
||||
bool GetControlIntervalEnabled() const {
|
||||
return this->control_interval_enabled;
|
||||
}
|
||||
|
||||
auto GetControlInterval() const { return this->control_interval; }
|
||||
|
||||
std::vector<double> GetMapeThreshold() const { return this->mape_threshold; }
|
||||
|
||||
/* Profiling getters */
|
||||
auto GetMasterCtrlLogicTime() const { return this->ctrl_t; }
|
||||
auto GetMasterCtrlLogicTime() const { return this->ctrl_time; }
|
||||
|
||||
auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; }
|
||||
auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_time; }
|
||||
|
||||
auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; }
|
||||
auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_time; }
|
||||
|
||||
private:
|
||||
RuntimeParameters *params;
|
||||
ChemistryModule *chem;
|
||||
bool rollback_enabled = false;
|
||||
bool control_interval_enabled = false;
|
||||
|
||||
poet::ChemistryModule *chem = nullptr;
|
||||
|
||||
std::uint32_t checkpoint_interval = 0;
|
||||
std::uint32_t control_interval = 0;
|
||||
std::uint32_t global_iteration = 0;
|
||||
std::vector<double> mape_threshold;
|
||||
|
||||
std::vector<std::string> species_names;
|
||||
std::string out_dir;
|
||||
|
||||
double ctrl_time = 0.0;
|
||||
double bcast_ctrl_time = 0.0;
|
||||
double recv_ctrl_time = 0.0;
|
||||
|
||||
/* Buffer for shuffled surrogate data */
|
||||
std::vector<double> sur_shuffled;
|
||||
};
|
||||
|
||||
} // namespace poet
|
||||
|
||||
34
src/poet.cpp
34
src/poet.cpp
@ -25,7 +25,7 @@
|
||||
#include "Base/RInsidePOET.hpp"
|
||||
#include "CLI/CLI.hpp"
|
||||
#include "Chemistry/ChemistryModule.hpp"
|
||||
#include "Control/ControlManager.hpp"
|
||||
#include "Control/ControlModule.hpp"
|
||||
#include "DataStructures/Field.hpp"
|
||||
#include "Init/InitialList.hpp"
|
||||
#include "Transport/DiffusionModule.hpp"
|
||||
@ -255,8 +255,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) {
|
||||
Rcpp::as<uint32_t>(global_rt_setup->operator[]("checkpoint_interval"));
|
||||
params.mape_threshold = Rcpp::as<std::vector<double>>(
|
||||
global_rt_setup->operator[]("mape_threshold"));
|
||||
params.rrmse_threshold = Rcpp::as<std::vector<double>>(
|
||||
global_rt_setup->operator[]("rrmse_threshold"));
|
||||
} catch (const std::exception &e) {
|
||||
ERRMSG("Error while parsing R scripts: " + std::string(e.what()));
|
||||
return ParseRet::PARSER_ERROR;
|
||||
@ -300,7 +298,6 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
/* SIMULATION LOOP */
|
||||
|
||||
double dSimTime{0};
|
||||
double chkTime = 0.0;
|
||||
|
||||
for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
|
||||
// Rollback countdowm
|
||||
@ -315,10 +312,10 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
}
|
||||
}
|
||||
*/
|
||||
control.beginIteration(iter);
|
||||
//control.beginIteration(iter);
|
||||
|
||||
// params.global_iter = iter;
|
||||
control.isControlIteration(iter);
|
||||
control.updateControlIteration(iter);
|
||||
// params.control_interval_enabled = (iter % params.control_interval == 0);
|
||||
|
||||
double start_t = MPI_Wtime();
|
||||
@ -431,8 +428,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
|
||||
std::to_string(maxiter));
|
||||
|
||||
double chk_start = MPI_Wtime();
|
||||
control.endIteration(iter)
|
||||
control.endIteration(iter);
|
||||
/*
|
||||
if (iter % params.checkpoint_interval == 0) {
|
||||
MSG("Writing checkpoint of iteration " + std::to_string(iter));
|
||||
@ -457,8 +453,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
|
||||
*/
|
||||
|
||||
double chk_end = MPI_Wtime();
|
||||
chkTime += chk_end - chk_start;
|
||||
|
||||
|
||||
// MSG();
|
||||
} // END SIMULATION LOOP
|
||||
@ -476,13 +471,14 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
Rcpp::List diffusion_profiling;
|
||||
diffusion_profiling["simtime"] = diffusion.getTransportTime();
|
||||
|
||||
Rcpp::List ctrl_profiling;
|
||||
/*Rcpp::List ctrl_profiling;
|
||||
ctrl_profiling["checkpointing_time"] = chkTime;
|
||||
ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime();
|
||||
ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime();
|
||||
ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime();
|
||||
ctrl_profiling["ctrl_logic_worker"] =
|
||||
Rcpp::wrap(chem.GetWorkerControlTimings());
|
||||
*/
|
||||
|
||||
if (params.use_dht) {
|
||||
chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits());
|
||||
@ -510,7 +506,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms,
|
||||
profiling["simtime"] = dSimTime;
|
||||
profiling["chemistry"] = chem_profiling;
|
||||
profiling["diffusion"] = diffusion_profiling;
|
||||
profiling["ctrl_logic"] = ctrl_profiling;
|
||||
//profiling["ctrl_logic"] = ctrl_profiling;
|
||||
|
||||
chem.MasterLoopBreak();
|
||||
|
||||
@ -652,7 +648,10 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
ChemistryModule chemistry(run_params.work_package_size,
|
||||
init_list.getChemistryInit(), MPI_COMM_WORLD);
|
||||
ControlModule control(&run_params, &chemistry);
|
||||
|
||||
ControlModule control;
|
||||
|
||||
chemistry.setControlModule(&control);
|
||||
|
||||
const ChemistryModule::SurrogateSetup surr_setup = {
|
||||
getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
|
||||
@ -674,14 +673,11 @@ int main(int argc, char *argv[]) {
|
||||
run_params.out_dir, // added
|
||||
run_params.checkpoint_interval,
|
||||
run_params.control_interval,
|
||||
run_params.species_count,
|
||||
run_params.species_names,
|
||||
run_params.mape_threshold,
|
||||
run_params.rrmse_threshold};
|
||||
getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
|
||||
run_params.mape_threshold};
|
||||
|
||||
control.enableControlLogic(ctrl_setup);
|
||||
|
||||
|
||||
if (MY_RANK > 0) {
|
||||
chemistry.WorkerLoop();
|
||||
} else {
|
||||
@ -725,7 +721,7 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
chemistry.masterSetField(init_list.getInitialGrid());
|
||||
|
||||
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
|
||||
Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry, control);
|
||||
|
||||
MSG("finished simulation loop");
|
||||
|
||||
|
||||
@ -51,15 +51,9 @@ struct RuntimeParameters {
|
||||
|
||||
bool print_progress = false;
|
||||
|
||||
bool rollback_enabled = false;
|
||||
bool control_interval_enabled = false;
|
||||
std::uint32_t global_iter = 0;
|
||||
std::uint32_t sur_disabled_counter = 0;
|
||||
std::uint32_t rollback_counter = 0;
|
||||
std::uint32_t checkpoint_interval = 0;
|
||||
std::uint32_t control_interval = 0;
|
||||
std::vector<double> mape_threshold;
|
||||
std::vector<double> rrmse_threshold;
|
||||
|
||||
static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
|
||||
std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user