From dc940b2f886ed8dedcca8121286ae30897ad555d Mon Sep 17 00:00:00 2001 From: rastogi Date: Thu, 23 Oct 2025 23:16:58 +0200 Subject: [PATCH] Wp data not being shuffled correctly. --- src/Chemistry/ChemistryModule.hpp | 33 +- src/Chemistry/MasterFunctions.cpp | 137 ++--- src/Chemistry/WorkerFunctions.cpp | 981 ++++++++++++++---------------- src/Control/ControlModule.cpp | 53 +- src/Control/ControlModule.hpp | 96 +-- src/poet.cpp | 34 +- src/poet.hpp.in | 6 - 7 files changed, 632 insertions(+), 708 deletions(-) diff --git a/src/Chemistry/ChemistryModule.hpp b/src/Chemistry/ChemistryModule.hpp index 73be52c60..8a96084c5 100644 --- a/src/Chemistry/ChemistryModule.hpp +++ b/src/Chemistry/ChemistryModule.hpp @@ -2,15 +2,16 @@ #ifndef CHEMISTRYMODULE_H_ #define CHEMISTRYMODULE_H_ +#include "ChemistryDefs.hpp" #include "DataStructures/Field.hpp" #include "DataStructures/NamedVector.hpp" #include "ChemistryDefs.hpp" #include "Control/ControlModule.hpp" #include "Init/InitialList.hpp" #include "NameDouble.h" +#include "PhreeqcRunner.hpp" #include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/Interpolation.hpp" -#include "PhreeqcRunner.hpp" #include #include @@ -174,12 +175,6 @@ public: */ auto GetMasterLoopTime() const { return this->send_recv_t; } - auto GetMasterCtrlLogicTime() const { return this->ctrl_t; } - - auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; } - - auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; } - /** * **Master only** Collect and return all accumulated timings recorded by * workers to run Phreeqc simulation. 
@@ -257,6 +252,8 @@ public: std::vector ai_surrogate_validity_vector; + void setControlModule(poet::ControlModule *ctrl) { control_module = ctrl; } + protected: void initializeDHT(uint32_t size_mb, const NamedVector &key_species, @@ -274,7 +271,8 @@ protected: CHEM_DHT_SIGNIF_VEC, CHEM_DHT_SNAPS, CHEM_DHT_READ_FILE, - CHEM_IP, // Control Flag + //CHEM_IP, // Control flag + CHEM_CTRL, // Control flag CHEM_IP_ENABLE, CHEM_IP_MIN_ENTRIES, CHEM_IP_SIGNIF_VEC, @@ -329,7 +327,7 @@ protected: void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer, workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs, int &free_workers, double dt, - uint32_t iteration, uint32_t control_iteration, + uint32_t iteration, const std::vector &wp_sizes_vector); void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send, int &free_workers); @@ -367,6 +365,10 @@ protected: void BCastStringVec(std::vector &io); + int packResultsIntoBuffer(std::vector &mpi_buffer, int base_count, + const WorkPackage &wp, + const WorkPackage &wp_control); + int comm_size, comm_rank; MPI_Comm group_comm; @@ -380,13 +382,12 @@ protected: poet::DHT_Wrapper *dht = nullptr; - bool dht_fill_during_rollback{false}; bool interp_enabled{false}; std::unique_ptr interp; bool ai_surrogate_enabled{false}; - static constexpr uint32_t BUFFER_OFFSET = 6; + static constexpr uint32_t BUFFER_OFFSET = 5; inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const { MPI_Bcast(buf, count, datatype, 0, this->group_comm); @@ -400,10 +401,6 @@ protected: double seq_t = 0.; double send_recv_t = 0.; - double ctrl_t = 0.; - double bcast_ctrl_t = 0.; - double recv_ctrl_t = 0.; - std::array base_totals{0}; bool print_progessbar{false}; @@ -422,9 +419,11 @@ protected: std::unique_ptr pqc_runner; - std::unique_ptr ctrl_module; + poet::ControlModule *control_module = nullptr; - //std::vector sur_shuffled; + bool control_enabled{false}; + + // std::vector sur_shuffled; }; } // namespace poet diff 
--git a/src/Chemistry/MasterFunctions.cpp b/src/Chemistry/MasterFunctions.cpp index 4c75fb3cd..0bed2310f 100644 --- a/src/Chemistry/MasterFunctions.cpp +++ b/src/Chemistry/MasterFunctions.cpp @@ -235,7 +235,7 @@ inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) { inline void poet::ChemistryModule::MasterSendPkgs( worker_list_t &w_list, workpointer_t &work_pointer, workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs, - int &free_workers, double dt, uint32_t iteration, uint32_t control_interval, + int &free_workers, double dt, uint32_t iteration, const std::vector &wp_sizes_vector) { /* declare variables */ int local_work_package_size; @@ -276,14 +276,23 @@ inline void poet::ChemistryModule::MasterSendPkgs( std::accumulate(wp_sizes_vector.begin(), std::next(wp_sizes_vector.begin(), count_pkgs), 0); send_buffer[end_of_wp + 4] = wp_start_index; - // whether this iteration is a control iteration - send_buffer[end_of_wp + 5] = control_interval; /* ATTENTION Worker p has rank p+1 */ // MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1, // LOOP_WORK, this->group_comm); MPI_Send(send_buffer.data(), send_buffer.size(), MPI_DOUBLE, p + 1, LOOP_WORK, this->group_comm); + + /* ---- DEBUG LOG (Sender side) ---- */ + std::cout << "[DEBUG][rank=" << p+1 + << "] sending WP " << (count_pkgs - 1) + << " to worker rank " << (p + 1) + << " | len=" << send_buffer.size() + << " | start index=" << wp_start_index + << " | second element=" << send_buffer[1] + << " | pkg size=" << local_work_package_size + << std::endl; + /* -------------------------------- */ /* Mark that worker has work to do */ w_list[p].has_work = 1; @@ -301,8 +310,9 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, int need_to_receive = 1; double idle_a, idle_b; int p, size; - double recv_a, recv_b; + std::vector recv_buffer; + recv_buffer.reserve(wp_size * prop_count * 2); MPI_Status probe_status; // master_recv_a = MPI_Wtime(); /* start to 
loop as long there are packages to recv and the need to receive @@ -320,41 +330,48 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, idle_b = MPI_Wtime(); this->idle_t += idle_b - idle_a; } - + if (!need_to_receive) { + continue; + } /* if need_to_receive was set to true above, so there is a message to * receive */ - if (need_to_receive) { - p = probe_status.MPI_SOURCE; - if (probe_status.MPI_TAG == LOOP_WORK) { - MPI_Get_count(&probe_status, MPI_DOUBLE, &size); - MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK, - this->group_comm, MPI_STATUS_IGNORE); - w_list[p - 1].has_work = 0; - pkg_to_recv -= 1; - free_workers++; - } - if (probe_status.MPI_TAG == LOOP_CTRL) { - recv_a = MPI_Wtime(); - MPI_Get_count(&probe_status, MPI_DOUBLE, &size); + p = probe_status.MPI_SOURCE; + bool handled = false; - // layout of buffer is [phreeqc][surrogate] - std::vector recv_buffer(size); + switch (probe_status.MPI_TAG) { + case LOOP_WORK: { + MPI_Get_count(&probe_status, MPI_DOUBLE, &size); + MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK, + this->group_comm, MPI_STATUS_IGNORE); + handled = true; + break; + } + case LOOP_CTRL: { + /* layout of buffer is [phreeqc][surrogate] */ + MPI_Get_count(&probe_status, MPI_DOUBLE, &size); + recv_buffer.resize(size); + MPI_Recv(recv_buffer.data(), size, MPI_DOUBLE, p, LOOP_CTRL, + this->group_comm, MPI_STATUS_IGNORE); - MPI_Recv(recv_buffer.data(), size, MPI_DOUBLE, p, LOOP_CTRL, - this->group_comm, MPI_STATUS_IGNORE); + int half = size / 2; + std::copy(recv_buffer.begin(), recv_buffer.begin() + half, + w_list[p - 1].send_addr); - std::copy(recv_buffer.begin(), recv_buffer.begin() + (size / 2), - w_list[p - 1].send_addr); + std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size, + w_list[p - 1].surrogate_addr); - std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size, - w_list[p - 1].surrogate_addr); - - w_list[p - 1].has_work = 0; - pkg_to_recv -= 1; - 
free_workers++; - recv_b = MPI_Wtime(); - this->recv_ctrl_t += recv_b - recv_a; - } + handled = true; + break; + } + default: { + throw std::runtime_error("Master received unknown MPI tag: " + + std::to_string(probe_status.MPI_TAG)); + } + } + if (handled) { + w_list[p - 1].has_work = 0; + pkg_to_recv -= 1; + free_workers++; } } } @@ -408,10 +425,6 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { int i_pkgs; int ftype; - double ctrl_a, ctrl_b; - double worker_ctrl_a, worker_ctrl_b; - double ctrl_bcast_a, ctrl_bcast_b; - const std::vector wp_sizes_vector = CalculateWPSizesVector(this->n_cells, this->wp_size); @@ -425,15 +438,18 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { MPI_INT); } - /* start time measurement of broadcasting interpolation status */ - ctrl_bcast_a = MPI_Wtime(); + /* Broadcast the control flag on every iteration: workers latch it, so + * they must also be told when control mode is switched off again. */ + int control_flag = control_module->GetControlIntervalEnabled() ? 1 : 0; + ftype = CHEM_CTRL; + PropagateFunctionType(ftype); + ChemBCast(&control_flag, 1, MPI_INT); + + /* ftype = CHEM_IP; PropagateFunctionType(ftype); ctrl_module->BCastControlFlags(); - /* end time measurement of broadcasting interpolation status */ - ctrl_bcast_b = MPI_Wtime(); - this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a; - +*/ ftype = CHEM_WORK_LOOP; PropagateFunctionType(ftype); @@ -441,32 +457,23 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { static uint32_t iteration = 0; - uint32_t control_logic_enabled = - ctrl_module->control_interval_enabled ?
1 : 0; - - if (control_logic_enabled) { - ctrl_module->sur_shuffled.clear(); - ctrl_module->sur_shuffled.reserve(this->n_cells * this->prop_count); - } - /* start time measurement of sequential part */ seq_a = MPI_Wtime(); /* shuffle grid */ // grid.shuffleAndExport(mpi_buffer); - std::vector mpi_buffer = shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count, wp_sizes_vector.size()); - ctrl_module->sur_shuffled.resize(mpi_buffer.size()); + std::vector mpi_surr_buffer{mpi_buffer}; /* setup local variables */ pkg_to_send = wp_sizes_vector.size(); pkg_to_recv = wp_sizes_vector.size(); workpointer_t work_pointer = mpi_buffer.begin(); - workpointer_t sur_pointer = ctrl_module->sur_shuffled.begin(); + workpointer_t sur_pointer = mpi_surr_buffer.begin(); worker_list_t worker_list(this->comm_size - 1); free_workers = this->comm_size - 1; @@ -490,8 +497,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { if (pkg_to_send > 0) { // send packages to all free workers ... MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send, - i_pkgs, free_workers, dt, iteration, control_logic_enabled, - wp_sizes_vector); + i_pkgs, free_workers, dt, iteration, wp_sizes_vector); } // ... and try to receive them from workers who has finished their work MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers); @@ -516,22 +522,17 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { /* do master stuff */ - /* start time measurement of control logic */ - ctrl_a = MPI_Wtime(); - - if (control_logic_enabled && !ctrl_module->rollback_enabled) { - std::cout << "[Master] Control logic enabled for this iteration." << std::endl; - std::vector sur_unshuffled{ctrl_module->sur_shuffled}; - unshuffleField(ctrl_module->sur_shuffled, this->n_cells, this->prop_count, + if (control_flag) { + std::cout << "[Master] Control logic enabled for this iteration." 
+ << std::endl; + std::vector sur_unshuffled{mpi_surr_buffer}; + unshuffleField(mpi_surr_buffer, this->n_cells, this->prop_count, wp_sizes_vector.size(), sur_unshuffled); - ctrl_module->computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells); + control_module->computeSpeciesErrors(out_vec, sur_unshuffled, + this->n_cells); } - /* end time measurement of control logic */ - ctrl_b = MPI_Wtime(); - this->ctrl_t += ctrl_b - ctrl_a; - /* start time measurement of master chemistry */ sim_e_chemistry = MPI_Wtime(); diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp index 8cf15fe92..6b9b73ceb 100644 --- a/src/Chemistry/WorkerFunctions.cpp +++ b/src/Chemistry/WorkerFunctions.cpp @@ -9,574 +9,487 @@ #include #include #include -#include #include +#include #include #include #include -namespace poet -{ +namespace poet { - inline std::string get_string(int root, MPI_Comm communicator) - { - int count; - MPI_Bcast(&count, 1, MPI_INT, root, communicator); +inline std::string get_string(int root, MPI_Comm communicator) { + int count; + MPI_Bcast(&count, 1, MPI_INT, root, communicator); - char *buffer = new char[count + 1]; - MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); + char *buffer = new char[count + 1]; + MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); - buffer[count] = '\0'; + buffer[count] = '\0'; - std::string ret_str(buffer); - delete[] buffer; + std::string ret_str(buffer); + delete[] buffer; - return ret_str; + return ret_str; +} + +void poet::ChemistryModule::WorkerLoop() { + struct worker_s timings; + + // HACK: defining the worker iteration count here, which will increment after + // each CHEM_ITER_END message + uint32_t iteration = 1; + bool loop = true; + while (loop) { + int func_type; + PropagateFunctionType(func_type); + + switch (func_type) { + case CHEM_FIELD_INIT: { + ChemBCast(&this->prop_count, 1, MPI_UINT32_T); + if (this->ai_surrogate_enabled) { + this->ai_surrogate_validity_vector.resize( + 
this->n_cells); // resize statt reserve? + } + break; + } + case CHEM_AI_BCAST_VALIDITY: { + // Receive the index vector of valid ai surrogate predictions + MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, + MPI_INT, 0, this->group_comm); + break; + } + case CHEM_CTRL: { + int control_flag = 0; + ChemBCast(&control_flag, 1, MPI_INT); + this->control_enabled = (control_flag == 1); + break; + } + case CHEM_WORK_LOOP: { + WorkerProcessPkgs(timings, iteration); + break; + } + case CHEM_PERF: { + int type; + ChemBCast(&type, 1, MPI_INT); + if (type < WORKER_DHT_HITS) { + WorkerPerfToMaster(type, timings); + break; + } + WorkerMetricsToMaster(type); + break; + } + case CHEM_BREAK_MAIN_LOOP: { + WorkerPostSim(iteration); + loop = false; + break; + } + default: { + throw std::runtime_error("Worker received unknown tag from master."); + } + } + } +} + +void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, + uint32_t &iteration) { + MPI_Status probe_status; + bool loop = true; + + MPI_Barrier(this->group_comm); + + while (loop) { + double idle_a = MPI_Wtime(); + MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); + double idle_b = MPI_Wtime(); + + switch (probe_status.MPI_TAG) { + case LOOP_WORK: { + timings.idle_t += idle_b - idle_a; + int count; + MPI_Get_count(&probe_status, MPI_DOUBLE, &count); + + WorkerDoWork(probe_status, count, timings); + break; + } + case LOOP_END: { + WorkerPostIter(probe_status, iteration); + iteration++; + loop = false; + break; + } + } + } +} + +void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, + int double_count, + struct worker_s &timings) { + static int counter = 1; + + double dht_get_start, dht_get_end; + double phreeqc_time_start, phreeqc_time_end; + double dht_fill_start, dht_fill_end; + + uint32_t iteration; + double dt; + double current_sim_time; + uint32_t wp_start_index; + int count = double_count; + std::vector mpi_buffer(count); + + /* receive */ + 
MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, + MPI_STATUS_IGNORE); + + /* decrement count of work_package by BUFFER_OFFSET */ + count -= BUFFER_OFFSET; + + /* check for changes on all additional variables given by the 'header' of + * mpi_buffer */ + + // work_package_size + poet::WorkPackage s_curr_wp(mpi_buffer[count]); + + // current iteration of simulation + iteration = mpi_buffer[count + 1]; + + // current timestep size + dt = mpi_buffer[count + 2]; + + // current simulation time ('age' of simulation) + current_sim_time = mpi_buffer[count + 3]; + + // current work package start location in field + wp_start_index = mpi_buffer[count + 4]; + + std::cout << "[DEBUG][rank=" << this->comm_rank << "] WP " << counter + << " len=" << count << " | second element: " << mpi_buffer[1] + << " | iteration=" << iteration << " | dt=" << dt + << " | simtime=" << current_sim_time + << " | start_index=" << wp_start_index << std::endl; + + for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { + s_curr_wp.input[wp_i] = + std::vector(mpi_buffer.begin() + this->prop_count * wp_i, + mpi_buffer.begin() + this->prop_count * (wp_i + 1)); } - void poet::ChemistryModule::WorkerLoop() - { - struct worker_s timings; - - // HACK: defining the worker iteration count here, which will increment after - // each CHEM_ITER_END message - uint32_t iteration = 1; - bool loop = true; - - while (loop) - { - int func_type; - PropagateFunctionType(func_type); - - switch (func_type) - { - case CHEM_FIELD_INIT: - { - ChemBCast(&this->prop_count, 1, MPI_UINT32_T); - if (this->ai_surrogate_enabled) - { - this->ai_surrogate_validity_vector.resize( - this->n_cells); // resize statt reserve? 
- } - break; - } - case CHEM_AI_BCAST_VALIDITY: - { - // Receive the index vector of valid ai surrogate predictions - MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, - MPI_INT, 0, this->group_comm); - break; - } - case CHEM_IP: - { - int interp_flag = 0; - int dht_fill_flag = 0; - - ChemBCast(&interp_flag, 1, MPI_INT); - ChemBCast(&dht_fill_flag, 1, MPI_INT); - - this->interp_enabled = (interp_flag == 1); - this->dht_fill_during_rollback = (dht_fill_flag == 1); - break; - } - case CHEM_WORK_LOOP: - { - WorkerProcessPkgs(timings, iteration); - break; - } - case CHEM_PERF: - { - int type; - ChemBCast(&type, 1, MPI_INT); - if (type < WORKER_DHT_HITS) - { - WorkerPerfToMaster(type, timings); - break; - } - WorkerMetricsToMaster(type); - break; - } - case CHEM_BREAK_MAIN_LOOP: - { - WorkerPostSim(iteration); - loop = false; - break; - } - default: - { - throw std::runtime_error("Worker received unknown tag from master."); - } - } - } - } - - void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, - uint32_t &iteration) - { - MPI_Status probe_status; - bool loop = true; - - MPI_Barrier(this->group_comm); - - while (loop) - { - double idle_a = MPI_Wtime(); - MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); - double idle_b = MPI_Wtime(); - - switch (probe_status.MPI_TAG) - { - case LOOP_WORK: - { - timings.idle_t += idle_b - idle_a; - int count; - MPI_Get_count(&probe_status, MPI_DOUBLE, &count); - - WorkerDoWork(probe_status, count, timings); - break; - } - case LOOP_END: - { - WorkerPostIter(probe_status, iteration); - iteration++; - loop = false; - break; - } - } - } - } - - void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, - int double_count, - struct worker_s &timings) - { - static int counter = 1; - - double dht_get_start, dht_get_end; - double phreeqc_time_start, phreeqc_time_end; - double dht_fill_start, dht_fill_end; - double ctrl_time_c, ctrl_time_d; - - uint32_t iteration; - double dt; - double 
current_sim_time; - uint32_t wp_start_index; - int count = double_count; - bool control_logic_enabled = false; - std::vector mpi_buffer(count); - - /* receive */ - MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, - MPI_STATUS_IGNORE); - - /* decrement count of work_package by BUFFER_OFFSET */ - count -= BUFFER_OFFSET; - /* check for changes on all additional variables given by the 'header' of - * mpi_buffer */ - - // work_package_size - poet::WorkPackage s_curr_wp(mpi_buffer[count]); - - // current iteration of simulation - iteration = mpi_buffer[count + 1]; - - // current timestep size - dt = mpi_buffer[count + 2]; - - // current simulation time ('age' of simulation) - current_sim_time = mpi_buffer[count + 3]; - - // current work package start location in field - wp_start_index = mpi_buffer[count + 4]; - - control_logic_enabled = (mpi_buffer[count + 5] == 1); - - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - s_curr_wp.input[wp_i] = - std::vector(mpi_buffer.begin() + this->prop_count * wp_i, - mpi_buffer.begin() + this->prop_count * (wp_i + 1)); - } - - // std::cout << this->comm_rank << ":" << counter++ << std::endl; - if (dht_enabled || interp_enabled) - { - dht->prepareKeys(s_curr_wp.input, dt); - } - - if (dht_enabled) - { - /* check for values in DHT */ - dht_get_start = MPI_Wtime(); - dht->checkDHT(s_curr_wp); - dht_get_end = MPI_Wtime(); - timings.dht_get += dht_get_end - dht_get_start; - } - - if (interp_enabled) - { - interp->tryInterpolation(s_curr_wp); - } - - if (this->ai_surrogate_enabled) - { - // Map valid predictions from the ai surrogate in the workpackage - for (int i = 0; i < s_curr_wp.size; i++) - { - if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) - { - s_curr_wp.mapping[i] = CHEM_AISURR; - } - } - } - - /* if control iteration: create copy surrogate results (output and mappings) and then set them to zero, - give this to phreeqc */ - - poet::WorkPackage s_curr_wp_control = 
s_curr_wp; - - if (control_logic_enabled) - { - for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) - { - s_curr_wp_control.output[wp_i] = std::vector(this->prop_count, 0.0); - s_curr_wp_control.mapping[wp_i] = 0; - } - } - - phreeqc_time_start = MPI_Wtime(); - - WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time, dt); - - phreeqc_time_end = MPI_Wtime(); - - if (control_logic_enabled) - { - /* start time measurement for copying control workpackage */ - ctrl_time_c = MPI_Wtime(); - - std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count; - - mpi_buffer.resize(count + sur_wp_offset); - - for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) - { - std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(), - mpi_buffer.begin() + this->prop_count * wp_i); - } - - // s_curr_wp only contains the interpolated data - // copy surrogate output after the the pqc output, mpi_buffer[pqc][interp] - - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - if (s_curr_wp.mapping[wp_i] != CHEM_PQC) // only copy if surrogate was used - { - std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), - mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i); - } else - { - // if pqc was used, copy pqc results again - std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(), - mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i); - } - - } - - count += sur_wp_offset; - - /* end time measurement for copying control workpackage */ - ctrl_time_d = MPI_Wtime(); - timings.ctrl_t += ctrl_time_d - ctrl_time_c; - } - else - { - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), - mpi_buffer.begin() + this->prop_count * wp_i); - } - } - - /* send results to master */ - MPI_Request send_req; - - int mpi_tag = control_logic_enabled ? 
LOOP_CTRL : LOOP_WORK; - MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req); - - if (dht_enabled || interp_enabled || dht_fill_during_rollback) - { - /* write results to DHT */ - dht_fill_start = MPI_Wtime(); - dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp); - dht_fill_end = MPI_Wtime(); - - if (interp_enabled) - { - interp->writePairs(); - } - timings.dht_fill += dht_fill_end - dht_fill_start; - } - - timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; - MPI_Wait(&send_req, MPI_STATUS_IGNORE); + // std::cout << this->comm_rank << ":" << counter++ << std::endl; + if (dht_enabled || interp_enabled) { + dht->prepareKeys(s_curr_wp.input, dt); } - void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, - uint32_t iteration) - { - MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, - MPI_STATUS_IGNORE); - - if (this->dht_enabled) - { - dht_hits.push_back(dht->getHits()); - dht_evictions.push_back(dht->getEvictions()); - dht->resetCounter(); - - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) - { - WorkerWriteDHTDump(iteration); - } - } - - if (this->interp_enabled) - { - std::stringstream out; - interp_calls.push_back(interp->getInterpolationCount()); - interp->resetCounter(); - interp->writePHTStats(); - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) - { - out << this->dht_file_out_dir << "/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".pht"; - interp->dumpPHTState(out.str()); - } - - const auto max_mean_idx = - DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); - - if (max_mean_idx >= 2) - { - DHT_flush(this->interp->getDHTObject()); - DHT_flush(this->dht->getDHT()); - if (this->comm_rank == 2) - { - std::cout << "Flushed both DHT and PHT!\n\n"; - } - } - } - - RInsidePOET::getInstance().parseEvalQ("gc()"); + if (dht_enabled) { + /* check for values in DHT */ + dht_get_start = MPI_Wtime(); + dht->checkDHT(s_curr_wp); + dht_get_end = MPI_Wtime(); + 
timings.dht_get += dht_get_end - dht_get_start; } - void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) - { - if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) - { + if (interp_enabled) { + interp->tryInterpolation(s_curr_wp); + } + + if (this->ai_surrogate_enabled) { + // Map valid predictions from the ai surrogate in the workpackage + for (int i = 0; i < s_curr_wp.size; i++) { + if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) { + s_curr_wp.mapping[i] = CHEM_AISURR; + } + } + } + + /* if control iteration: create copy surrogate results (output and mappings) + and then set them to zero, give this to phreeqc */ + + poet::WorkPackage s_curr_wp_control = s_curr_wp; + + if (control_enabled) { + for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) { + s_curr_wp_control.output[wp_i] = std::vector(prop_count, 0.0); + s_curr_wp_control.mapping[wp_i] = 0; + } + } + + phreeqc_time_start = MPI_Wtime(); + + WorkerRunWorkPackage(control_enabled ? s_curr_wp_control : s_curr_wp, + current_sim_time, dt); + + phreeqc_time_end = MPI_Wtime(); + + count = + packResultsIntoBuffer(mpi_buffer, count, s_curr_wp, s_curr_wp_control); + + /* send results to master */ + MPI_Request send_req; + + int mpi_tag = control_enabled ? LOOP_CTRL : LOOP_WORK; + MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, + &send_req); + + if (dht_enabled || interp_enabled) { + /* write results to DHT */ + dht_fill_start = MPI_Wtime(); + dht->fillDHT(control_enabled ? 
s_curr_wp_control : s_curr_wp); + dht_fill_end = MPI_Wtime(); + + if (interp_enabled) { + interp->writePairs(); + } + timings.dht_fill += dht_fill_end - dht_fill_start; + } + + timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; + MPI_Wait(&send_req, MPI_STATUS_IGNORE); +} + +int poet::ChemistryModule::packResultsIntoBuffer( + std::vector &mpi_buffer, int base_count, const WorkPackage &wp, + const WorkPackage &wp_control) { + if (control_enabled) { + std::size_t wp_offset = wp_control.size * prop_count; + mpi_buffer.resize(base_count + wp_offset); + + /* copy pqc outputs first */ + for (std::size_t wp_i = 0; wp_i < wp_control.size; wp_i++) { + std::copy(wp_control.output[wp_i].begin(), wp_control.output[wp_i].end(), + mpi_buffer.begin() + prop_count * wp_i); + } + + /* copy surrogate output, only if it contains interpolated data, after the + * the pqc output, layout = mpi_buffer[pqc][interp] */ + for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) { + const auto &wp_copy = wp.mapping[wp_i] != CHEM_PQC + ? 
wp.output[wp_i] + : wp_control.output[wp_i]; + + std::copy(wp_copy.begin(), wp_copy.end(), + mpi_buffer.begin() + wp_offset + prop_count * wp_i); + } + return base_count + static_cast<int>(wp_offset); + + } else { + for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) { + std::copy(wp.output[wp_i].begin(), wp.output[wp_i].end(), + mpi_buffer.begin() + prop_count * wp_i); + } + return base_count; + } +} + +void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, + uint32_t iteration) { + MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, + MPI_STATUS_IGNORE); + + if (this->dht_enabled) { + dht_hits.push_back(dht->getHits()); + dht_evictions.push_back(dht->getEvictions()); + dht->resetCounter(); + + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { WorkerWriteDHTDump(iteration); } - if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) - { - std::stringstream out; + } + + if (this->interp_enabled) { + std::stringstream out; + interp_calls.push_back(interp->getInterpolationCount()); + interp->resetCounter(); + interp->writePHTStats(); + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { out << this->dht_file_out_dir << "/iter_" << std::setfill('0') << std::setw(this->file_pad) << iteration << ".pht"; interp->dumpPHTState(out.str()); } + + const auto max_mean_idx = + DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); + + if (max_mean_idx >= 2) { + DHT_flush(this->interp->getDHTObject()); + DHT_flush(this->dht->getDHT()); + if (this->comm_rank == 2) { + std::cout << "Flushed both DHT and PHT!\n\n"; + } + } } - void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) - { + RInsidePOET::getInstance().parseEvalQ("gc()"); +} + +void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) { + if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { + WorkerWriteDHTDump(iteration); + } + if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { std::stringstream out; out << this->dht_file_out_dir <<
"/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".dht"; - int res = dht->tableToFile(out.str().c_str()); - if (res != DHT_SUCCESS && this->comm_rank == 2) - std::cerr - << "CPP: Worker: Error in writing current state of DHT to file.\n"; - else if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() - << "\n"; + << std::setw(this->file_pad) << iteration << ".pht"; + interp->dumpPHTState(out.str()); + } +} + +void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) { + std::stringstream out; + out << this->dht_file_out_dir << "/iter_" << std::setfill('0') + << std::setw(this->file_pad) << iteration << ".dht"; + int res = dht->tableToFile(out.str().c_str()); + if (res != DHT_SUCCESS && this->comm_rank == 2) + std::cerr + << "CPP: Worker: Error in writing current state of DHT to file.\n"; + else if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() + << "\n"; +} + +void poet::ChemistryModule::WorkerReadDHTDump( + const std::string &dht_input_file) { + int res = dht->fileToTable((char *)dht_input_file.c_str()); + if (res != DHT_SUCCESS) { + if (res == DHT_WRONG_FILE) { + if (this->comm_rank == 1) + std::cerr + << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; + } else { + if (this->comm_rank == 1) + std::cerr << "CPP: Worker: Error in loading current state of DHT from " + "file. 
Continue with empty DHT ...\n"; + } + } else { + if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully loaded state of DHT from file " + << dht_input_file << "\n"; + } +} + +void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, + double dSimTime, + double dTimestep) { + + std::vector> inout_chem = work_package.input; + std::vector to_ignore; + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { + if (work_package.mapping[wp_id] != CHEM_PQC) { + to_ignore.push_back(wp_id); + } + + // HACK: remove the first element (cell_id) before sending to phreeqc + inout_chem[wp_id].erase(inout_chem[wp_id].begin(), + inout_chem[wp_id].begin() + 1); } - void poet::ChemistryModule::WorkerReadDHTDump( - const std::string &dht_input_file) - { - int res = dht->fileToTable((char *)dht_input_file.c_str()); - if (res != DHT_SUCCESS) - { - if (res == DHT_WRONG_FILE) - { - if (this->comm_rank == 1) - std::cerr - << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; - } - else - { - if (this->comm_rank == 1) - std::cerr << "CPP: Worker: Error in loading current state of DHT from " - "file. 
Continue with empty DHT ...\n"; - } - } - else - { - if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully loaded state of DHT from file " - << dht_input_file << "\n"; + this->pqc_runner->run(inout_chem, dTimestep, to_ignore); + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { + if (work_package.mapping[wp_id] == CHEM_PQC) { + // HACK: as we removed the first element (cell_id) before sending to + // phreeqc, copy back with an offset of 1 + work_package.output[wp_id] = work_package.input[wp_id]; + std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(), + work_package.output[wp_id].begin() + 1); } } +} - void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, - double dSimTime, - double dTimestep) - { - - std::vector> inout_chem = work_package.input; - std::vector to_ignore; - - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) - { - if (work_package.mapping[wp_id] != CHEM_PQC) - { - to_ignore.push_back(wp_id); - } - - // HACK: remove the first element (cell_id) before sending to phreeqc - inout_chem[wp_id].erase( - inout_chem[wp_id].begin(), inout_chem[wp_id].begin() + 1); +void poet::ChemistryModule::WorkerPerfToMaster(int type, + const struct worker_s &timings) { + switch (type) { + case WORKER_PHREEQC: { + MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } - - this->pqc_runner->run(inout_chem, dTimestep, to_ignore); - - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) - { - if (work_package.mapping[wp_id] == CHEM_PQC) - { - // HACK: as we removed the first element (cell_id) before sending to phreeqc, - // copy back with an offset of 1 - work_package.output[wp_id] = work_package.input[wp_id]; - std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(), - work_package.output[wp_id].begin() + 1); - } - } + case WORKER_CTRL_ITER: { + MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + 
break; } - - void poet::ChemistryModule::WorkerPerfToMaster(int type, - const struct worker_s &timings) - { - switch (type) - { - case WORKER_PHREEQC: - { - MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_CTRL_ITER: - { - MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_GET: - { - MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_FILL: - { - MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IDLE: - { - MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IP_WRITE: - { - double val = interp->getPHTWriteTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_READ: - { - double val = interp->getPHTReadTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_GATHER: - { - double val = interp->getDHTGatherTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_FC: - { - double val = interp->getInterpolationTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - default: - { - throw std::runtime_error("Unknown perf type in master's message."); - } - } + case WORKER_DHT_GET: { + MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } - - void poet::ChemistryModule::WorkerMetricsToMaster(int type) - { - MPI_Comm worker_comm = dht->getCommunicator(); - int worker_rank; - MPI_Comm_rank(worker_comm, &worker_rank); - - MPI_Comm &group_comm = this->group_comm; - - auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( - std::vector &send_buffer, int tag) - { - std::vector 
to_master(send_buffer.size()); - MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), - MPI_UINT32_T, MPI_SUM, 0, worker_comm); - - if (worker_rank == 0) - { - MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, - group_comm); - } - }; - - switch (type) - { - case WORKER_DHT_HITS: - { - reduce_and_send(dht_hits, WORKER_DHT_HITS); - break; - } - case WORKER_DHT_EVICTIONS: - { - reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); - break; - } - case WORKER_IP_CALLS: - { - reduce_and_send(interp_calls, WORKER_IP_CALLS); - return; - } - case WORKER_PHT_CACHE_HITS: - { - std::vector input = this->interp->getPHTLocalCacheHits(); - reduce_and_send(input, WORKER_PHT_CACHE_HITS); - return; - } - default: - { - throw std::runtime_error("Unknown perf type in master's message."); - } - } + case WORKER_DHT_FILL: { + MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } + case WORKER_IDLE: { + MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_IP_WRITE: { + double val = interp->getPHTWriteTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_READ: { + double val = interp->getPHTReadTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_GATHER: { + double val = interp->getDHTGatherTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_FC: { + double val = interp->getInterpolationTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + default: { + throw std::runtime_error("Unknown perf type in master's message."); + } + } +} + +void poet::ChemistryModule::WorkerMetricsToMaster(int type) { + MPI_Comm worker_comm = dht->getCommunicator(); + int worker_rank; + MPI_Comm_rank(worker_comm, &worker_rank); + + MPI_Comm 
&group_comm = this->group_comm; + + auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( + std::vector &send_buffer, int tag) { + std::vector to_master(send_buffer.size()); + MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), + MPI_UINT32_T, MPI_SUM, 0, worker_comm); + + if (worker_rank == 0) { + MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, + group_comm); + } + }; + + switch (type) { + case WORKER_DHT_HITS: { + reduce_and_send(dht_hits, WORKER_DHT_HITS); + break; + } + case WORKER_DHT_EVICTIONS: { + reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); + break; + } + case WORKER_IP_CALLS: { + reduce_and_send(interp_calls, WORKER_IP_CALLS); + return; + } + case WORKER_PHT_CACHE_HITS: { + std::vector input = this->interp->getPHTLocalCacheHits(); + reduce_and_send(input, WORKER_PHT_CACHE_HITS); + return; + } + default: { + throw std::runtime_error("Unknown perf type in master's message."); + } + } +} } // namespace poet diff --git a/src/Control/ControlModule.cpp b/src/Control/ControlModule.cpp index a5a71d577..66b3a419c 100644 --- a/src/Control/ControlModule.cpp +++ b/src/Control/ControlModule.cpp @@ -4,15 +4,23 @@ #include "IO/StatsIO.hpp" #include -bool poet::ControlModule::isControlIteration(uint32_t iter) { +void poet::ControlModule::updateControlIteration(const uint32_t iter) { + + global_iteration = iter; + + if (control_interval == 0) { + control_interval_enabled = false; + return; + } + control_interval_enabled = (iter % control_interval == 0); if (control_interval_enabled) { - MSG("[Control] Control interval triggered at iteration " + + MSG("[Control] Control interval enabled at iteration " + std::to_string(iter)); } - return control_interval_enabled; } +/* void poet::ControlModule::beginIteration() { if (rollback_enabled) { if (sur_disabled_counter > 0) { @@ -23,19 +31,23 @@ void poet::ControlModule::beginIteration() { } } } +*/ -void poet::ControlModule::endIteration(uint32_t iter) { +void 
poet::ControlModule::endIteration(const uint32_t iter) { + + if (!control_interval_enabled) { + return; + } /* Writing a checkpointing */ - if (checkpoint_interval > 0 && iter % checkpoint_interval == 0) { + /* Control Logic*/ + if (control_interval_enabled && + checkpoint_interval > 0 /*&& !rollback_enabled*/) { MSG("Writing checkpoint of iteration " + std::to_string(iter)); write_checkpoint(out_dir, "checkpoint" + std::to_string(iter) + ".hdf5", {.field = chem->getField(), .iteration = iter}); - } + writeStatsToCSV(error_history, species_names, out_dir, "stats_overview"); - /* Control Logic*/ - if (control_interval_enabled && !rollback_enabled) { - writeStatsToCSV(error_history, species_names, out_dir, - "stats_overview"); + /* if (triggerRollbackIfExceeded(*chem, *params, iter)) { rollback_enabled = true; @@ -44,9 +56,12 @@ void poet::ControlModule::endIteration(uint32_t iter) { MSG("Interpolation disabled for the next " + std::to_string(control_interval) + "."); } + + */ } } +/* void poet::ControlModule::BCastControlFlags() { int interp_flag = rollback_enabled ? 0 : 1; int dht_fill_flag = rollback_enabled ? 
1 : 0; @@ -54,6 +69,9 @@ void poet::ControlModule::BCastControlFlags() { chem->ChemBCast(&dht_fill_flag, 1, MPI_INT); } +*/ + +/* bool poet::ControlModule::triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters ¶ms, uint32_t &iter) { @@ -91,17 +109,20 @@ bool poet::ControlModule::triggerRollbackIfExceeded(ChemistryModule &chem, } } MSG("All species are within their MAPE and RRMSE thresholds."); - return false; -} + return + false; +} +*/ void poet::ControlModule::computeSpeciesErrors( const std::vector &reference_values, - const std::vector &surrogate_values, uint32_t size_per_prop) { + const std::vector &surrogate_values, const uint32_t size_per_prop) { - SimulationErrorStats species_error_stats(species_count, params->global_iter, - rollback_counter); + SimulationErrorStats species_error_stats(this->species_names.size(), + global_iteration, + /*rollback_counter*/ 0); - for (uint32_t i = 0; i < species_count; ++i) { + for (uint32_t i = 0; i < this->species_names.size(); ++i) { double err_sum = 0.0; double sqr_err_sum = 0.0; uint32_t base_idx = i * size_per_prop; diff --git a/src/Control/ControlModule.hpp b/src/Control/ControlModule.hpp index 6bd848d07..14620c2e4 100644 --- a/src/Control/ControlModule.hpp +++ b/src/Control/ControlModule.hpp @@ -16,43 +16,24 @@ class ChemistryModule; class ControlModule { public: - ControlModule(RuntimeParameters *run_params, ChemistryModule *chem_module) - : params(run_params), chem(chem_module) {}; - /* Control configuration*/ - std::vector species_names; - uint32_t species_count = 0; - std::string out_dir; - bool rollback_enabled = false; - bool control_interval_enabled = false; + // std::uint32_t global_iter = 0; + // std::uint32_t sur_disabled_counter = 0; + // std::uint32_t rollback_counter = 0; - std::uint32_t global_iter = 0; - std::uint32_t sur_disabled_counter = 0; - std::uint32_t rollback_counter = 0; - std::uint32_t checkpoint_interval = 0; - std::uint32_t control_interval = 0; + void 
updateControlIteration(const uint32_t iter); - std::vector mape_threshold; - std::vector rrmse_threshold; + auto GetGlobalIteration() const noexcept { return global_iteration; } - double ctrl_t = 0.; - double bcast_ctrl_t = 0.; - double recv_ctrl_t = 0.; + // void beginIteration(); - /* Buffer for shuffled surrogate data */ - std::vector sur_shuffled; + void endIteration(const uint32_t iter); - bool isControlIteration(uint32_t iter); + // void BCastControlFlags(); - void beginIteration(); - - void endIteration(uint32_t iter); - - void BCastControlFlags(); - - bool triggerRollbackIfExceeded(ChemistryModule &chem, - RuntimeParameters ¶ms, uint32_t &iter); + //bool triggerRollbackIfExceeded(ChemistryModule &chem, + // RuntimeParameters ¶ms, uint32_t &iter); struct SimulationErrorStats { std::vector mape; @@ -60,14 +41,14 @@ public: uint32_t iteration; // iterations in simulation after rollbacks uint32_t rollback_count; - SimulationErrorStats(size_t species_count, uint32_t iter, uint32_t counter) + SimulationErrorStats(uint32_t species_count, uint32_t iter, uint32_t counter) : mape(species_count, 0.0), rrmse(species_count, 0.0), iteration(iter), rollback_count(counter) {} }; - static void computeSpeciesErrors(const std::vector &reference_values, + void computeSpeciesErrors(const std::vector &reference_values, const std::vector &surrogate_values, - uint32_t size_per_prop); + const uint32_t size_per_prop); std::vector error_history; @@ -75,34 +56,53 @@ public: std::string out_dir; std::uint32_t checkpoint_interval; std::uint32_t control_interval; - std::uint32_t species_count; - std::vector species_names; std::vector mape_threshold; - std::vector rrmse_threshold; }; void enableControlLogic(const ControlSetup &setup) { - out_dir = setup.out_dir; - checkpoint_interval = setup.checkpoint_interval; - control_interval = setup.control_interval; - species_count = setup.species_count; - - species_names = setup.species_names; - mape_threshold = setup.mape_threshold; - 
rrmse_threshold = setup.rrmse_threshold; + this->out_dir = setup.out_dir; + this->checkpoint_interval = setup.checkpoint_interval; + this->control_interval = setup.control_interval; + this->species_names = setup.species_names; + this->mape_threshold = setup.mape_threshold; } + bool GetControlIntervalEnabled() const { + return this->control_interval_enabled; + } + + auto GetControlInterval() const { return this->control_interval; } + + std::vector GetMapeThreshold() const { return this->mape_threshold; } + /* Profiling getters */ - auto GetMasterCtrlLogicTime() const { return this->ctrl_t; } + auto GetMasterCtrlLogicTime() const { return this->ctrl_time; } - auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; } + auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_time; } - auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; } + auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_time; } private: - RuntimeParameters *params; - ChemistryModule *chem; + bool rollback_enabled = false; + bool control_interval_enabled = false; + + poet::ChemistryModule *chem = nullptr; + + std::uint32_t checkpoint_interval = 0; + std::uint32_t control_interval = 0; + std::uint32_t global_iteration = 0; + std::vector mape_threshold; + + std::vector species_names; + std::string out_dir; + + double ctrl_time = 0.0; + double bcast_ctrl_time = 0.0; + double recv_ctrl_time = 0.0; + + /* Buffer for shuffled surrogate data */ + std::vector sur_shuffled; }; } // namespace poet diff --git a/src/poet.cpp b/src/poet.cpp index 48260f3c7..3525ced8d 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -25,7 +25,7 @@ #include "Base/RInsidePOET.hpp" #include "CLI/CLI.hpp" #include "Chemistry/ChemistryModule.hpp" -#include "Control/ControlManager.hpp" +#include "Control/ControlModule.hpp" #include "DataStructures/Field.hpp" #include "Init/InitialList.hpp" #include "Transport/DiffusionModule.hpp" @@ -255,8 +255,6 @@ int parseInitValues(int argc, char **argv, 
RuntimeParameters ¶ms) { Rcpp::as(global_rt_setup->operator[]("checkpoint_interval")); params.mape_threshold = Rcpp::as>( global_rt_setup->operator[]("mape_threshold")); - params.rrmse_threshold = Rcpp::as>( - global_rt_setup->operator[]("rrmse_threshold")); } catch (const std::exception &e) { ERRMSG("Error while parsing R scripts: " + std::string(e.what())); return ParseRet::PARSER_ERROR; @@ -300,7 +298,6 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, /* SIMULATION LOOP */ double dSimTime{0}; - double chkTime = 0.0; for (uint32_t iter = 1; iter < maxiter + 1; iter++) { // Rollback countdowm @@ -315,10 +312,10 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, } } */ - control.beginIteration(iter); + //control.beginIteration(iter); // params.global_iter = iter; - control.isControlIteration(iter); + control.updateControlIteration(iter); // params.control_interval_enabled = (iter % params.control_interval == 0); double start_t = MPI_Wtime(); @@ -431,8 +428,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, MSG("End of *coupling* iteration " + std::to_string(iter) + "/" + std::to_string(maxiter)); - double chk_start = MPI_Wtime(); - control.endIteration(iter) + control.endIteration(iter); /* if (iter % params.checkpoint_interval == 0) { MSG("Writing checkpoint of iteration " + std::to_string(iter)); @@ -457,8 +453,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, */ - double chk_end = MPI_Wtime(); - chkTime += chk_end - chk_start; + // MSG(); } // END SIMULATION LOOP @@ -476,13 +471,14 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, Rcpp::List diffusion_profiling; diffusion_profiling["simtime"] = diffusion.getTransportTime(); - Rcpp::List ctrl_profiling; + /*Rcpp::List ctrl_profiling; ctrl_profiling["checkpointing_time"] = chkTime; ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime(); ctrl_profiling["bcast_ctrl_logic_master"] = 
chem.GetMasterCtrlBcastTime(); ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime(); ctrl_profiling["ctrl_logic_worker"] = Rcpp::wrap(chem.GetWorkerControlTimings()); + */ if (params.use_dht) { chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits()); @@ -510,7 +506,7 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, profiling["simtime"] = dSimTime; profiling["chemistry"] = chem_profiling; profiling["diffusion"] = diffusion_profiling; - profiling["ctrl_logic"] = ctrl_profiling; + //profiling["ctrl_logic"] = ctrl_profiling; chem.MasterLoopBreak(); @@ -652,7 +648,10 @@ int main(int argc, char *argv[]) { ChemistryModule chemistry(run_params.work_package_size, init_list.getChemistryInit(), MPI_COMM_WORLD); - ControlModule control(&run_params, &chemistry); + + ControlModule control; + + chemistry.setControlModule(&control); const ChemistryModule::SurrogateSetup surr_setup = { getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD), @@ -674,14 +673,11 @@ int main(int argc, char *argv[]) { run_params.out_dir, // added run_params.checkpoint_interval, run_params.control_interval, - run_params.species_count, - run_params.species_names, - run_params.mape_threshold, - run_params.rrmse_threshold}; + getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD), + run_params.mape_threshold}; control.enableControlLogic(ctrl_setup); - if (MY_RANK > 0) { chemistry.WorkerLoop(); } else { @@ -725,7 +721,7 @@ int main(int argc, char *argv[]) { chemistry.masterSetField(init_list.getInitialGrid()); - Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry); + Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry, control); MSG("finished simulation loop"); diff --git a/src/poet.hpp.in b/src/poet.hpp.in index aea51966e..b5f807c1c 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -51,15 +51,9 @@ struct RuntimeParameters { bool print_progress = false; - bool rollback_enabled = false; - 
bool control_interval_enabled = false; - std::uint32_t global_iter = 0; - std::uint32_t sur_disabled_counter = 0; - std::uint32_t rollback_counter = 0; std::uint32_t checkpoint_interval = 0; std::uint32_t control_interval = 0; std::vector mape_threshold; - std::vector rrmse_threshold; static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32; std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;