From 1b2d942960287dd5b43c1015a8ab27088e095952 Mon Sep 17 00:00:00 2001 From: rastogi Date: Sun, 2 Nov 2025 22:26:14 +0100 Subject: [PATCH] feat: bcast control_cell_ids to workers --- src/Chemistry/ChemistryModule.hpp | 61 +- src/Chemistry/MasterFunctions.cpp | 56 +- src/Chemistry/WorkerFunctions.cpp | 960 +++++++++++++++--------------- src/Control/ControlModule.hpp | 2 +- src/poet.cpp | 29 +- 5 files changed, 554 insertions(+), 554 deletions(-) diff --git a/src/Chemistry/ChemistryModule.hpp b/src/Chemistry/ChemistryModule.hpp index 0948e52a4..dec7b2bdc 100644 --- a/src/Chemistry/ChemistryModule.hpp +++ b/src/Chemistry/ChemistryModule.hpp @@ -2,19 +2,16 @@ #ifndef CHEMISTRYMODULE_H_ #define CHEMISTRYMODULE_H_ +#include "ChemistryDefs.hpp" +#include "Control/ControlModule.hpp" #include "DataStructures/Field.hpp" #include "DataStructures/NamedVector.hpp" - -#include "ChemistryDefs.hpp" - #include "Init/InitialList.hpp" #include "NameDouble.h" +#include "PhreeqcRunner.hpp" #include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/Interpolation.hpp" -#include "poet.hpp" - -#include "PhreeqcRunner.hpp" #include #include #include @@ -24,6 +21,7 @@ #include namespace poet { +class ControlModule; /** * \brief Wrapper around PhreeqcRM to provide POET specific parallelization with * easy access. @@ -173,7 +171,7 @@ public: /** * **Master only** Return the time in seconds the master spent in the * send/receive loop. - */ + */ auto GetMasterLoopTime() const { return this->send_recv_t; } auto GetMasterRecvCtrlDataTime() const { return this->recv_ctrl_t; } @@ -211,6 +209,8 @@ public: */ std::vector GetWorkerIdleTimings() const; + std::vector GetWorkerControlTimings() const; + /** * **Master only** Collect and return DHT hits of all workers. * @@ -257,25 +257,15 @@ public: std::vector ai_surrogate_validity_vector; - RuntimeParameters *runtime_params = nullptr; - uint32_t control_iteration_counter = 0; + void SetControlModule(poet::ControlModule *ctrl) { control_module = ctrl; } - struct error_stats { - std::vector mape; - std::vector rrsme; - uint32_t iteration; + void SetDhtEnabled(bool enabled) { dht_enabled = enabled; } + bool GetDhtEnabled() const { return dht_enabled; } - error_stats(size_t species_count, size_t iter) - : mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) { - } - }; + void SetInterpEnabled(bool enabled) { interp_enabled = enabled; } + bool GetInterpEnabled() const { return interp_enabled; } - std::vector error_stats_history; - - static void computeStats(const std::vector &pqc_vector, - const std::vector &sur_vector, - uint32_t size_per_prop, uint32_t species_count, - error_stats &stats); + void SetWarmupEnabled(bool enabled) { warmup_enabled = enabled; } protected: void initializeDHT(uint32_t size_mb, @@ -290,12 +280,13 @@ protected: enum { CHEM_FIELD_INIT, - CHEM_DHT_ENABLE, + //CHEM_DHT_ENABLE, CHEM_DHT_SIGNIF_VEC, CHEM_DHT_SNAPS, CHEM_DHT_READ_FILE, - CHEM_INTERP, - CHEM_IP_ENABLE, + //CHEM_WARMUP_PHASE, // Control flag + //CHEM_CTRL_ENABLE, // Control flag + //CHEM_IP_ENABLE, CHEM_IP_MIN_ENTRIES, CHEM_IP_SIGNIF_VEC, CHEM_WORK_LOOP, @@ -308,6 +299,7 @@ protected: enum { WORKER_PHREEQC, + WORKER_CTRL_ITER, WORKER_DHT_GET, WORKER_DHT_FILL, WORKER_IDLE, @@ -330,6 +322,7 @@ protected: double dht_get = 0.; double dht_fill = 0.; double idle_t = 0.; + double ctrl_t = 0.; }; struct worker_info_s { @@ -347,7 +340,7 @@ protected: void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer, workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs, int &free_workers, double dt, - uint32_t iteration, uint32_t control_iteration, + uint32_t iteration, const std::vector &wp_sizes_vector); void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send, int &free_workers); @@ -385,6 +378,10 @@ protected: void BCastStringVec(std::vector &io); + int packResultsIntoBuffer(std::vector &mpi_buffer, int base_count, + const WorkPackage &wp, + const WorkPackage &wp_control); + int comm_size, comm_rank; MPI_Comm group_comm; @@ -412,6 +409,7 @@ protected: inline void PropagateFunctionType(int &type) const { ChemBCast(&type, 1, MPI_INT); } + double simtime = 0.; double idle_t = 0.; double seq_t = 0.; @@ -419,10 +417,9 @@ protected: double recv_ctrl_t = 0.; double shuf_t = 0.; - double metrics_t = 0. + double metrics_t = 0.; - std::array - base_totals{0}; + std::array base_totals{0}; bool print_progessbar{false}; @@ -442,8 +439,12 @@ protected: poet::ControlModule *control_module = nullptr; + std::vector mpi_surr_buffer; + bool control_enabled{false}; bool warmup_enabled{false}; + + // std::vector sur_shuffled; }; } // namespace poet diff --git a/src/Chemistry/MasterFunctions.cpp b/src/Chemistry/MasterFunctions.cpp index 06e5e9c02..b731188ce 100644 --- a/src/Chemistry/MasterFunctions.cpp +++ b/src/Chemistry/MasterFunctions.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -41,6 +40,12 @@ std::vector poet::ChemistryModule::GetWorkerPhreeqcTimings() const { return MasterGatherWorkerTimings(WORKER_PHREEQC); } +std::vector poet::ChemistryModule::GetWorkerControlTimings() const { + int type = CHEM_PERF; + MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); + return MasterGatherWorkerTimings(WORKER_CTRL_ITER); +} + std::vector poet::ChemistryModule::GetWorkerDHTGetTimings() const { int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); @@ -252,6 +257,8 @@ inline void poet::ChemistryModule::MasterSendPkgs( /* note current processed work package in workerlist */ w_list[p].send_addr = work_pointer.base(); w_list[p].surrogate_addr = sur_pointer.base(); + // this->control_enabled ? sur_pointer.base() : w_list[p].surrogate_addr = + // nullptr; /* push work pointer to next work package */ const uint32_t end_of_wp = local_work_package_size * this->prop_count; @@ -349,6 +356,11 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, std::copy(recv_buffer.begin(), recv_buffer.begin() + half, w_list[p - 1].send_addr); + /* + if (w_list[p - 1].surrogate_addr == nullptr) { + throw std::runtime_error("MasterRecvPkgs: surrogate_addr is null"); + }*/ + std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size, w_list[p - 1].surrogate_addr); recv_ctrl_b = MPI_Wtime(); @@ -418,6 +430,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { int free_workers; int i_pkgs; int ftype; + double shuf_a, shuf_b, metrics_a, metrics_b; const std::vector wp_sizes_vector = CalculateWPSizesVector(this->n_cells, this->wp_size); @@ -435,47 +448,34 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { ftype = CHEM_WORK_LOOP; PropagateFunctionType(ftype); - ftype = CHEM_INTERP; - PropagateFunctionType(ftype); - - if (this->runtime_params->rollback_simulation) { - this->interp_enabled = false; - int interp_flag = 0; - ChemBCast(&interp_flag, 1, MPI_INT); - } else { - this->interp_enabled = true; - int interp_flag = 1; - ChemBCast(&interp_flag, 1, MPI_INT); - } - MPI_Barrier(this->group_comm); - static uint32_t iteration = 0; - uint32_t control_iteration = static_cast( - this->runtime_params->control_iteration_active ? 1 : 0); - if (control_iteration) { - sur_shuffled.clear(); - sur_shuffled.reserve(this->n_cells * this->prop_count); + this->control_enabled = this->control_module->getControlIntervalEnabled(); + if (this->control_enabled) { + this->mpi_surr_buffer.assign(this->n_cells * this->prop_count, 0.0); } + static uint32_t iteration = 0; + /* start time measurement of sequential part */ seq_a = MPI_Wtime(); /* shuffle grid */ // grid.shuffleAndExport(mpi_buffer); - std::vector mpi_buffer = shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count, wp_sizes_vector.size()); - this->sur_shuffled.resize(mpi_buffer.size()); + //this->mpi_surr_buffer.resize(mpi_buffer.size()); /* setup local variables */ pkg_to_send = wp_sizes_vector.size(); pkg_to_recv = wp_sizes_vector.size(); workpointer_t work_pointer = mpi_buffer.begin(); - workpointer_t sur_pointer = sur_shuffled.begin(); + workpointer_t sur_pointer = this->mpi_surr_buffer.begin(); + //(this->control_enabled ? this->mpi_surr_buffer.begin() + // : mpi_buffer.end()); worker_list_t worker_list(this->comm_size - 1); free_workers = this->comm_size - 1; @@ -499,8 +499,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { if (pkg_to_send > 0) { // send packages to all free workers ... MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send, - i_pkgs, free_workers, dt, iteration, control_iteration, - wp_sizes_vector); + i_pkgs, free_workers, dt, iteration, wp_sizes_vector); } // ... and try to receive them from workers who has finished their work MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers); @@ -524,15 +523,13 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { chem_field = out_vec; /* do master stuff */ - - /* do master stuff */ - if (control_enabled) { + if (this->control_enabled) { std::cout << "[Master] Control logic enabled for this iteration." << std::endl; std::vector sur_unshuffled{mpi_surr_buffer}; shuf_a = MPI_Wtime(); - unshuffleField(mpi_surr_buffer, this->n_cells, this->prop_count, + unshuffleField(this->mpi_surr_buffer, this->n_cells, this->prop_count, wp_sizes_vector.size(), sur_unshuffled); shuf_b = MPI_Wtime(); this->shuf_t += shuf_b - shuf_a; @@ -550,7 +547,6 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { this->metrics_t += metrics_b - metrics_a; } - /* start time measurement of master chemistry */ sim_e_chemistry = MPI_Wtime(); diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp index b354f986d..ed56ea10a 100644 --- a/src/Chemistry/WorkerFunctions.cpp +++ b/src/Chemistry/WorkerFunctions.cpp @@ -9,556 +9,534 @@ #include #include #include -#include #include +#include #include #include #include -namespace poet -{ +namespace poet { - inline std::string get_string(int root, MPI_Comm communicator) - { - int count; - MPI_Bcast(&count, 1, MPI_INT, root, communicator); +inline std::string get_string(int root, MPI_Comm communicator) { + int count; + MPI_Bcast(&count, 1, MPI_INT, root, communicator); - char *buffer = new char[count + 1]; - MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); + char *buffer = new char[count + 1]; + MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); - buffer[count] = '\0'; + buffer[count] = '\0'; - std::string ret_str(buffer); - delete[] buffer; + std::string ret_str(buffer); + delete[] buffer; - return ret_str; + return ret_str; +} + +void poet::ChemistryModule::WorkerLoop() { + struct worker_s timings; + + // HACK: defining the worker iteration count here, which will increment after + // each CHEM_ITER_END message + uint32_t iteration = 1; + bool loop = true; + + while (loop) { + int func_type; + PropagateFunctionType(func_type); + + switch (func_type) { + case CHEM_FIELD_INIT: { + ChemBCast(&this->prop_count, 1, MPI_UINT32_T); + if (this->ai_surrogate_enabled) { + this->ai_surrogate_validity_vector.resize( + this->n_cells); // resize statt reserve? + } + break; + } + case CHEM_AI_BCAST_VALIDITY: { + // Receive the index vector of valid ai surrogate predictions + MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, + MPI_INT, 0, this->group_comm); + break; + } + /* + case CHEM_WARMUP_PHASE: { + int warmup_flag = 0; + ChemBCast(&warmup_flag, 1, MPI_INT); + this->warmup_enabled = (warmup_flag == 1); + //std::cout << "Warmup phase is " << this->warmup_enabled << std::endl; + break; + } + case CHEM_DHT_ENABLE: { + int dht_flag = 0; + ChemBCast(&dht_flag, 1, MPI_INT); + this->dht_enabled = (dht_flag == 1); + //std::cout << "DHT_enabled is " << this->dht_enabled << std::endl; + break; + } + case CHEM_IP_ENABLE: { + int interp_flag = 0; + ChemBCast(&interp_flag, 1, MPI_INT); + this->interp_enabled = (interp_flag == 1); + ; + std::cout << "Interp_enabled is " << this->interp_enabled << std::endl; + break; + } + case CHEM_CTRL_ENABLE: { + int control_flag = 0; + ChemBCast(&control_flag, 1, MPI_INT); + this->control_enabled = (control_flag == 1); + std::cout << "Control_enabled is " << this->control_enabled << std::endl; + break; + } + */ + case CHEM_WORK_LOOP: { + WorkerProcessPkgs(timings, iteration); + break; + } + case CHEM_PERF: { + int type; + ChemBCast(&type, 1, MPI_INT); + if (type < WORKER_DHT_HITS) { + WorkerPerfToMaster(type, timings); + break; + } + WorkerMetricsToMaster(type); + break; + } + case CHEM_BREAK_MAIN_LOOP: { + WorkerPostSim(iteration); + loop = false; + break; + } + default: { + throw std::runtime_error("Worker received unknown tag from master."); + } + } + } +} + +void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, + uint32_t &iteration) { + MPI_Status probe_status; + bool loop = true; + + MPI_Barrier(this->group_comm); + + while (loop) { + double idle_a = MPI_Wtime(); + MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); + double idle_b = MPI_Wtime(); + + switch (probe_status.MPI_TAG) { + case LOOP_WORK: { + timings.idle_t += idle_b - idle_a; + int count; + MPI_Get_count(&probe_status, MPI_DOUBLE, &count); + + WorkerDoWork(probe_status, count, timings); + break; + } + case LOOP_END: { + WorkerPostIter(probe_status, iteration); + iteration++; + loop = false; + break; + } + } + } +} + +void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, + int double_count, + struct worker_s &timings) { + static int counter = 1; + + double dht_get_start, dht_get_end; + double phreeqc_time_start, phreeqc_time_end; + double dht_fill_start, dht_fill_end; + double ctrl_cp_start, ctrl_cp_end, ctrl_start, ctrl_end; + + uint32_t iteration; + double dt; + double current_sim_time; + uint32_t wp_start_index; + int count = double_count; + int flags; + std::vector mpi_buffer(count); + + /* receive */ + MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, + MPI_STATUS_IGNORE); + + /* decrement count of work_package by BUFFER_OFFSET */ + count -= BUFFER_OFFSET; + /* check for changes on all additional variables given by the 'header' of + * mpi_buffer */ + + // work_package_size + poet::WorkPackage s_curr_wp(mpi_buffer[count]); + + // current iteration of simulation + iteration = mpi_buffer[count + 1]; + + // current timestep size + dt = mpi_buffer[count + 2]; + + // current simulation time ('age' of simulation) + current_sim_time = mpi_buffer[count + 3]; + + // current work package start location in field + wp_start_index = mpi_buffer[count + 4]; + + // read packed control flags + flags = static_cast(mpi_buffer[count + 5]); + this->interp_enabled = (flags & 1) != 0; + this->dht_enabled = (flags & 2) != 0; + this->warmup_enabled = (flags & 4) != 0; + this->control_enabled = (flags & 8) != 0; + + /*std::cout << "warmup_enabled is " << warmup_enabled << ", control_enabled is + " + << control_enabled << ", dht_enabled is " + << dht_enabled << ", interp_enabled is " << interp_enabled + << std::endl;*/ + + for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { + s_curr_wp.input[wp_i] = + std::vector(mpi_buffer.begin() + this->prop_count * wp_i, + mpi_buffer.begin() + this->prop_count * (wp_i + 1)); } - void poet::ChemistryModule::WorkerLoop() - { - struct worker_s timings; + // std::cout << this->comm_rank << ":" << counter++ << std::endl; + if (dht_enabled || interp_enabled || warmup_enabled) { + dht->prepareKeys(s_curr_wp.input, dt); + } - // HACK: defining the worker iteration count here, which will increment after - // each CHEM_ITER_END message - uint32_t iteration = 1; - bool loop = true; + if (dht_enabled) { + /* check for values in DHT */ + dht_get_start = MPI_Wtime(); + dht->checkDHT(s_curr_wp); + dht_get_end = MPI_Wtime(); + timings.dht_get += dht_get_end - dht_get_start; + } - while (loop) - { - int func_type; - PropagateFunctionType(func_type); + if (interp_enabled) { + interp->tryInterpolation(s_curr_wp); + } - switch (func_type) - { - case CHEM_FIELD_INIT: - { - ChemBCast(&this->prop_count, 1, MPI_UINT32_T); - if (this->ai_surrogate_enabled) - { - this->ai_surrogate_validity_vector.resize( - this->n_cells); // resize statt reserve? - } - break; - } - case CHEM_AI_BCAST_VALIDITY: - { - // Receive the index vector of valid ai surrogate predictions - MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, - MPI_INT, 0, this->group_comm); - break; - } - case CHEM_INTERP: - { - int interp_flag; - ChemBCast(&interp_flag, 1, MPI_INT); - this->interp_enabled = (interp_flag == 1); - break; - } - case CHEM_WORK_LOOP: - { - WorkerProcessPkgs(timings, iteration); - break; - } - case CHEM_PERF: - { - int type; - ChemBCast(&type, 1, MPI_INT); - if (type < WORKER_DHT_HITS) - { - WorkerPerfToMaster(type, timings); - break; - } - WorkerMetricsToMaster(type); - break; - } - case CHEM_BREAK_MAIN_LOOP: - { - WorkerPostSim(iteration); - loop = false; - break; - } - default: - { - throw std::runtime_error("Worker received unknown tag from master."); - } - } + if (this->ai_surrogate_enabled) { + // Map valid predictions from the ai surrogate in the workpackage + for (int i = 0; i < s_curr_wp.size; i++) { + if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) { + s_curr_wp.mapping[i] = CHEM_AISURR; } } + } - void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, - uint32_t &iteration) - { - MPI_Status probe_status; - bool loop = true; + /* if control iteration: create copy surrogate results (output and mappings) + and then set them to zero, give this to phreeqc */ - MPI_Barrier(this->group_comm); + - while (loop) - { - double idle_a = MPI_Wtime(); - MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); - double idle_b = MPI_Wtime(); + poet::WorkPackage s_curr_wp_control = s_curr_wp; - switch (probe_status.MPI_TAG) - { - case LOOP_WORK: - { - timings.idle_t += idle_b - idle_a; - int count; - MPI_Get_count(&probe_status, MPI_DOUBLE, &count); - WorkerDoWork(probe_status, count, timings); - break; - } - case LOOP_END: - { - WorkerPostIter(probe_status, iteration); - iteration++; - loop = false; - break; - } - } - } + + /* + if (control_enabled) { + ctrl_cp_start = MPI_Wtime(); + for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) { + s_curr_wp_control.output[wp_i] = std::vector(this->prop_count, 0.0); + s_curr_wp_control.mapping[wp_i] = CHEM_PQC; + } + ctrl_cp_end = MPI_Wtime(); + timings.ctrl_t += ctrl_cp_end - ctrl_cp_start; + } + */ + + phreeqc_time_start = MPI_Wtime(); + + WorkerRunWorkPackage(control_enabled ? s_curr_wp_control : s_curr_wp, + current_sim_time, dt); + + phreeqc_time_end = MPI_Wtime(); + + for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { + std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), + mpi_buffer.begin() + this->prop_count * wp_i); } - void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, - int double_count, - struct worker_s &timings) - { - static int counter = 1; + /* + if (control_enabled) { + ctrl_start = MPI_Wtime(); + std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count; - double dht_get_start, dht_get_end; - double phreeqc_time_start, phreeqc_time_end; - double dht_fill_start, dht_fill_end; + mpi_buffer.resize(count + sur_wp_offset); - uint32_t iteration; - double dt; - double current_sim_time; - uint32_t wp_start_index; - int count = double_count; - bool control_iteration_active = false; - std::vector mpi_buffer(count); - - /* receive */ - MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, - MPI_STATUS_IGNORE); - - /* decrement count of work_package by BUFFER_OFFSET */ - count -= BUFFER_OFFSET; - /* check for changes on all additional variables given by the 'header' of - * mpi_buffer */ - - // work_package_size - poet::WorkPackage s_curr_wp(mpi_buffer[count]); - - // current iteration of simulation - iteration = mpi_buffer[count + 1]; - - // current timestep size - dt = mpi_buffer[count + 2]; - - // current simulation time ('age' of simulation) - current_sim_time = mpi_buffer[count + 3]; - - // current work package start location in field - wp_start_index = mpi_buffer[count + 4]; - - control_iteration_active = (mpi_buffer[count + 5] == 1); - - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - s_curr_wp.input[wp_i] = - std::vector(mpi_buffer.begin() + this->prop_count * wp_i, - mpi_buffer.begin() + this->prop_count * (wp_i + 1)); + for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) { + std::copy(s_curr_wp_control.output[wp_i].begin(), + s_curr_wp_control.output[wp_i].end(), + mpi_buffer.begin() + this->prop_count * wp_i); } - // std::cout << this->comm_rank << ":" << counter++ << std::endl; - if (dht_enabled || interp_enabled) - { - dht->prepareKeys(s_curr_wp.input, dt); - } + // s_curr_wp only contains the interpolated data + // copy surrogate output after the the pqc output, mpi_buffer[pqc][interp] - if (dht_enabled) - { - /* check for values in DHT */ - dht_get_start = MPI_Wtime(); - dht->checkDHT(s_curr_wp); - dht_get_end = MPI_Wtime(); - timings.dht_get += dht_get_end - dht_get_start; - } - - if (interp_enabled) - { - interp->tryInterpolation(s_curr_wp); - } - - if (this->ai_surrogate_enabled) - { - // Map valid predictions from the ai surrogate in the workpackage - for (int i = 0; i < s_curr_wp.size; i++) - { - if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) - { - s_curr_wp.mapping[i] = CHEM_AISURR; - } - } - } - - /* if control iteration: create copy surrogate results (output and mappings) and then set them to zero, - give this to phreeqc */ - - poet::WorkPackage s_curr_wp_control = s_curr_wp; - - if (control_iteration_active) - { - for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) - { - s_curr_wp_control.output[wp_i] = std::vector(this->prop_count, 0.0); - s_curr_wp_control.mapping[wp_i] = 0; - } - } - - phreeqc_time_start = MPI_Wtime(); - - WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_control : s_curr_wp, current_sim_time, dt); - - phreeqc_time_end = MPI_Wtime(); - - if (control_iteration_active) - { - std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count; - - mpi_buffer.resize(count + sur_wp_offset); - - for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) - { - std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(), - mpi_buffer.begin() + this->prop_count * wp_i); - } - - // s_curr_wp only contains the interpolated data - // copy surrogate output after the the pqc output, mpi_buffer[pqc][interp] - - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - if (s_curr_wp.mapping[wp_i] != CHEM_PQC) // only copy if surrogate was used - { + for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { + // only copy if surrogate was used + if (s_curr_wp.mapping[wp_i] != CHEM_PQC) { std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i); - } else - { + } else { // if pqc was used, copy pqc results again - std::copy(s_curr_wp_control.output[wp_i].begin(), s_curr_wp_control.output[wp_i].end(), + std::copy(s_curr_wp_control.output[wp_i].begin(), + s_curr_wp_control.output[wp_i].end(), mpi_buffer.begin() + sur_wp_offset + this->prop_count * wp_i); } - - } - - count += sur_wp_offset; } - else - { - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) - { - std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), - mpi_buffer.begin() + this->prop_count * wp_i); - } + count += sur_wp_offset; + ctrl_end = MPI_Wtime(); + timings.ctrl_t += ctrl_end - ctrl_start; + } else { + + } +*/ + + /* send results to master */ + MPI_Request send_req; + + int mpi_tag = control_enabled ? LOOP_CTRL : LOOP_WORK; + MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, + &send_req); + + if (dht_enabled || interp_enabled || warmup_enabled) { + /* write results to DHT */ + dht_fill_start = MPI_Wtime(); + dht->fillDHT(control_enabled ? s_curr_wp_control : s_curr_wp); + dht_fill_end = MPI_Wtime(); + + if (interp_enabled || warmup_enabled) { + interp->writePairs(); } - - /* send results to master */ - MPI_Request send_req; - - int mpi_tag = control_iteration_active ? LOOP_CTRL : LOOP_WORK; - MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req); - - if (dht_enabled || interp_enabled) - { - /* write results to DHT */ - dht_fill_start = MPI_Wtime(); - dht->fillDHT(control_iteration_active ? s_curr_wp_control : s_curr_wp); - dht_fill_end = MPI_Wtime(); - - if (interp_enabled) - { - interp->writePairs(); - } - timings.dht_fill += dht_fill_end - dht_fill_start; - } - - timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; - - MPI_Wait(&send_req, MPI_STATUS_IGNORE); + timings.dht_fill += dht_fill_end - dht_fill_start; } - void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, - uint32_t iteration) - { - MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, - MPI_STATUS_IGNORE); + timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; + MPI_Wait(&send_req, MPI_STATUS_IGNORE); +} - if (this->dht_enabled) - { - dht_hits.push_back(dht->getHits()); - dht_evictions.push_back(dht->getEvictions()); - dht->resetCounter(); +void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, + uint32_t iteration) { + MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, + MPI_STATUS_IGNORE); - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) - { - WorkerWriteDHTDump(iteration); - } - } + if (this->dht_enabled) { + dht_hits.push_back(dht->getHits()); + dht_evictions.push_back(dht->getEvictions()); + dht->resetCounter(); - if (this->interp_enabled) - { - std::stringstream out; - interp_calls.push_back(interp->getInterpolationCount()); - interp->resetCounter(); - interp->writePHTStats(); - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) - { - out << this->dht_file_out_dir << "/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".pht"; - interp->dumpPHTState(out.str()); - } - - const auto max_mean_idx = - DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); - - if (max_mean_idx >= 2) - { - DHT_flush(this->interp->getDHTObject()); - DHT_flush(this->dht->getDHT()); - if (this->comm_rank == 2) - { - std::cout << "Flushed both DHT and PHT!\n\n"; - } - } - } - - RInsidePOET::getInstance().parseEvalQ("gc()"); - } - - void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) - { - if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) - { + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { WorkerWriteDHTDump(iteration); } - if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) - { - std::stringstream out; + } + + if (this->interp_enabled) { + std::stringstream out; + interp_calls.push_back(interp->getInterpolationCount()); + interp->resetCounter(); + interp->writePHTStats(); + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { out << this->dht_file_out_dir << "/iter_" << std::setfill('0') << std::setw(this->file_pad) << iteration << ".pht"; interp->dumpPHTState(out.str()); } + + const auto max_mean_idx = + DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); + + if (max_mean_idx >= 2) { + DHT_flush(this->interp->getDHTObject()); + DHT_flush(this->dht->getDHT()); + if (this->comm_rank == 2) { + std::cout << "Flushed both DHT and PHT!\n\n"; + } + } } - void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) - { + RInsidePOET::getInstance().parseEvalQ("gc()"); +} + +void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) { + if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { + WorkerWriteDHTDump(iteration); + } + if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { std::stringstream out; out << this->dht_file_out_dir << "/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".dht"; - int res = dht->tableToFile(out.str().c_str()); - if (res != DHT_SUCCESS && this->comm_rank == 2) - std::cerr - << "CPP: Worker: Error in writing current state of DHT to file.\n"; - else if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() - << "\n"; + << std::setw(this->file_pad) << iteration << ".pht"; + interp->dumpPHTState(out.str()); + } +} + +void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) { + std::stringstream out; + out << this->dht_file_out_dir << "/iter_" << std::setfill('0') + << std::setw(this->file_pad) << iteration << ".dht"; + int res = dht->tableToFile(out.str().c_str()); + if (res != DHT_SUCCESS && this->comm_rank == 2) + std::cerr + << "CPP: Worker: Error in writing current state of DHT to file.\n"; + else if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() + << "\n"; +} + +void poet::ChemistryModule::WorkerReadDHTDump( + const std::string &dht_input_file) { + int res = dht->fileToTable((char *)dht_input_file.c_str()); + if (res != DHT_SUCCESS) { + if (res == DHT_WRONG_FILE) { + if (this->comm_rank == 1) + std::cerr + << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; + } else { + if (this->comm_rank == 1) + std::cerr << "CPP: Worker: Error in loading current state of DHT from " + "file. Continue with empty DHT ...\n"; + } + } else { + if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully loaded state of DHT from file " + << dht_input_file << "\n"; + } +} + +void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, + double dSimTime, + double dTimestep) { + + std::vector> inout_chem = work_package.input; + std::vector to_ignore; + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { + if (work_package.mapping[wp_id] != CHEM_PQC) { + to_ignore.push_back(wp_id); + } + + // HACK: remove the first element (cell_id) before sending to phreeqc + inout_chem[wp_id].erase(inout_chem[wp_id].begin(), + inout_chem[wp_id].begin() + 1); } - void poet::ChemistryModule::WorkerReadDHTDump( - const std::string &dht_input_file) - { - int res = dht->fileToTable((char *)dht_input_file.c_str()); - if (res != DHT_SUCCESS) - { - if (res == DHT_WRONG_FILE) - { - if (this->comm_rank == 1) - std::cerr - << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; - } - else - { - if (this->comm_rank == 1) - std::cerr << "CPP: Worker: Error in loading current state of DHT from " - "file. Continue with empty DHT ...\n"; - } - } - else - { - if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully loaded state of DHT from file " - << dht_input_file << "\n"; + this->pqc_runner->run(inout_chem, dTimestep, to_ignore); + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { + if (work_package.mapping[wp_id] == CHEM_PQC) { + // HACK: as we removed the first element (cell_id) before sending to + // phreeqc, copy back with an offset of 1 + work_package.output[wp_id] = work_package.input[wp_id]; + std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(), + work_package.output[wp_id].begin() + 1); } } +} - void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, - double dSimTime, - double dTimestep) - { - - std::vector> inout_chem = work_package.input; - std::vector to_ignore; - - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) - { - if (work_package.mapping[wp_id] != CHEM_PQC) - { - to_ignore.push_back(wp_id); - } - - // HACK: remove the first element (cell_id) before sending to phreeqc - inout_chem[wp_id].erase( - inout_chem[wp_id].begin(), inout_chem[wp_id].begin() + 1); +void poet::ChemistryModule::WorkerPerfToMaster(int type, + const struct worker_s &timings) { + switch (type) { + case WORKER_PHREEQC: { + MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } - - this->pqc_runner->run(inout_chem, dTimestep, to_ignore); - - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) - { - if (work_package.mapping[wp_id] == CHEM_PQC) - { - // HACK: as we removed the first element (cell_id) before sending to phreeqc, - // copy back with an offset of 1 - work_package.output[wp_id] = work_package.input[wp_id]; - std::copy(inout_chem[wp_id].begin(), inout_chem[wp_id].end(), - work_package.output[wp_id].begin() + 1); - } - } + case WORKER_CTRL_ITER: { + MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } - - void poet::ChemistryModule::WorkerPerfToMaster(int type, - const struct worker_s &timings) - { - switch (type) - { - case WORKER_PHREEQC: - { - MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_GET: - { - MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_FILL: - { - MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IDLE: - { - MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IP_WRITE: - { - double val = interp->getPHTWriteTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_READ: - { - double val = interp->getPHTReadTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_GATHER: - { - double val = interp->getDHTGatherTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_FC: - { - double val = interp->getInterpolationTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - default: - { - throw std::runtime_error("Unknown perf type in master's message."); - } - } + case WORKER_DHT_GET: { + MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } - - void poet::ChemistryModule::WorkerMetricsToMaster(int type) - { - MPI_Comm worker_comm = dht->getCommunicator(); - int worker_rank; - MPI_Comm_rank(worker_comm, &worker_rank); - - MPI_Comm &group_comm = this->group_comm; - - auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( - std::vector &send_buffer, int tag) - { - std::vector to_master(send_buffer.size()); - MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), - MPI_UINT32_T, MPI_SUM, 0, worker_comm); - - if (worker_rank == 0) - { - MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, - group_comm); - } - }; - - switch (type) - { - case WORKER_DHT_HITS: - { - reduce_and_send(dht_hits, WORKER_DHT_HITS); - break; - } - case WORKER_DHT_EVICTIONS: - { - reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); - break; - } - case WORKER_IP_CALLS: - { - reduce_and_send(interp_calls, WORKER_IP_CALLS); - return; - } - case WORKER_PHT_CACHE_HITS: - { - std::vector input = this->interp->getPHTLocalCacheHits(); - reduce_and_send(input, WORKER_PHT_CACHE_HITS); - return; - } - default: - { - throw std::runtime_error("Unknown perf type in master's message."); - } - } + case WORKER_DHT_FILL: { + MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; } + case WORKER_IDLE: { + MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_IP_WRITE: { + double val = interp->getPHTWriteTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_READ: { + double val = interp->getPHTReadTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_GATHER: { + double val = interp->getDHTGatherTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_FC: { + double val = interp->getInterpolationTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + default: { + throw std::runtime_error("Unknown perf type in master's message."); + } + } +} + +void poet::ChemistryModule::WorkerMetricsToMaster(int type) { + MPI_Comm worker_comm = dht->getCommunicator(); + int worker_rank; + MPI_Comm_rank(worker_comm, &worker_rank); + + MPI_Comm &group_comm = this->group_comm; + + auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( + std::vector &send_buffer, int tag) { + std::vector to_master(send_buffer.size()); + MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), + MPI_UINT32_T, MPI_SUM, 0, worker_comm); + + if (worker_rank == 0) { + MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, + group_comm); + } + }; + + switch (type) { + case WORKER_DHT_HITS: { + reduce_and_send(dht_hits, WORKER_DHT_HITS); + break; + } + case WORKER_DHT_EVICTIONS: { + reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); + break; + } + case WORKER_IP_CALLS: { + reduce_and_send(interp_calls, WORKER_IP_CALLS); + return; + } + case WORKER_PHT_CACHE_HITS: { + std::vector input = this->interp->getPHTLocalCacheHits(); + reduce_and_send(input, WORKER_PHT_CACHE_HITS); + return; + } + default: { + throw std::runtime_error("Unknown perf type in master's message."); + } + } +} } // namespace poet diff --git a/src/Control/ControlModule.hpp b/src/Control/ControlModule.hpp index 2631e937a..e7afd1287 100644 --- a/src/Control/ControlModule.hpp +++ b/src/Control/ControlModule.hpp @@ -52,7 +52,7 @@ public: std::uint32_t control_interval; std::vector species_names; std::vector mape_threshold; - std::vector ctrl_cell_ids; + std::vector ctrl_cell_ids; }; void enableControlLogic(const ControlSetup &setup) { diff --git a/src/poet.cpp b/src/poet.cpp index bf04da0a8..36a648208 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -250,12 +250,13 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) { params.timesteps = Rcpp::as>(global_rt_setup->operator[]("timesteps")); - params.checkpoint_interval = Rcpp::as(global_rt_setup->operator[]("checkpoint_interval")); + params.checkpoint_interval = + Rcpp::as(global_rt_setup->operator[]("checkpoint_interval")); params.control_interval = Rcpp::as(global_rt_setup->operator[]("control_interval")); params.mape_threshold = Rcpp::as>( global_rt_setup->operator[]("mape_threshold")); - params.ctrl_cell_ids = Rcpp::as>( + params.ctrl_cell_ids = Rcpp::as>( global_rt_setup->operator[]("ctrl_cell_ids")); catch (const std::exception &e) { @@ -465,6 +466,30 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) { return profiling; } + static void getControlCellIds(const vector &ids, int root, + MPI_Comm comm) { + std::uint32_t n_ids = 0; + int rank; + MPI_Comm_rank(comm, &rank); + bool is_master = root == rank; + + if (is_master) { + n_ids = ids.size(); + } + // broadcast size of id vector + MPI_Bcast(n_ids, 1, MPI_UINT32_T, root, comm); + + // worker + if (!is_master) { + ids.resize(n_ids); + } + // broadcast control cell ids + if (n_ids > 0) { + MPI_Bcast(ids.data(), n_ids, MPI_UINT32_T, root, comm); + } + } + + std::vector getSpeciesNames(const Field &&field, int root, MPI_Comm comm) { std::uint32_t n_elements;