diff --git a/bench/barite/barite_het_rt.R b/bench/barite/barite_het_rt.R index a0b63df67..beb3a5c45 100644 --- a/bench/barite/barite_het_rt.R +++ b/bench/barite/barite_het_rt.R @@ -1,4 +1,4 @@ list( timesteps = rep(50, 100), store_result = TRUE -) \ No newline at end of file +) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ca39b6106..1886a9f43 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -24,7 +24,7 @@ elseif (POET_TUG_APPROACH STREQUAL "Explicit") target_compile_definitions(POETLib PRIVATE POET_TUG_FTCS) endif() -target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}") +target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_BINARY_DIR}") target_link_libraries( POETLib PUBLIC RRuntime diff --git a/src/Chemistry/ChemistryModule.hpp b/src/Chemistry/ChemistryModule.hpp index 4bf925400..22547a212 100644 --- a/src/Chemistry/ChemistryModule.hpp +++ b/src/Chemistry/ChemistryModule.hpp @@ -12,6 +12,8 @@ #include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/Interpolation.hpp" +#include "poet.hpp" + #include "PhreeqcRunner.hpp" #include #include @@ -249,6 +251,8 @@ public: std::vector ai_surrogate_validity_vector; + RuntimeParameters *runtime_params = nullptr; + protected: void initializeDHT(uint32_t size_mb, const NamedVector &key_species, @@ -275,7 +279,7 @@ protected: CHEM_AI_BCAST_VALIDITY }; - enum { LOOP_WORK, LOOP_END }; + enum { LOOP_WORK, LOOP_END, WITH_REL_ERROR }; enum { WORKER_PHREEQC, @@ -316,7 +320,7 @@ protected: void MasterSendPkgs(worker_list_t &w_list, workpointer_t &work_pointer, int &pkg_to_send, int &count_pkgs, int &free_workers, - double dt, uint32_t iteration, + double dt, uint32_t iteration, uint32_t control_iter, const std::vector &wp_sizes_vector); void MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send, int &free_workers); @@ -373,7 +377,7 @@ protected: bool ai_surrogate_enabled{false}; - static constexpr uint32_t BUFFER_OFFSET = 5; + static constexpr uint32_t BUFFER_OFFSET = 6; inline void ChemBCast(void *buf, int count, MPI_Datatype datatype) const { MPI_Bcast(buf, count, datatype, 0, this->group_comm); diff --git a/src/Chemistry/MasterFunctions.cpp b/src/Chemistry/MasterFunctions.cpp index fce7b4139..63858c530 100644 --- a/src/Chemistry/MasterFunctions.cpp +++ b/src/Chemistry/MasterFunctions.cpp @@ -7,7 +7,8 @@ #include std::vector -poet::ChemistryModule::MasterGatherWorkerMetrics(int type) const { +poet::ChemistryModule::MasterGatherWorkerMetrics(int type) const +{ MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); uint32_t dummy; @@ -21,7 +22,8 @@ poet::ChemistryModule::MasterGatherWorkerMetrics(int type) const { } std::vector -poet::ChemistryModule::MasterGatherWorkerTimings(int type) const { +poet::ChemistryModule::MasterGatherWorkerTimings(int type) const +{ MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); double dummy; @@ -34,31 +36,36 @@ poet::ChemistryModule::MasterGatherWorkerTimings(int type) const { return timings; } -std::vector poet::ChemistryModule::GetWorkerPhreeqcTimings() const { +std::vector poet::ChemistryModule::GetWorkerPhreeqcTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_PHREEQC); } -std::vector poet::ChemistryModule::GetWorkerDHTGetTimings() const { +std::vector poet::ChemistryModule::GetWorkerDHTGetTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_DHT_GET); } -std::vector poet::ChemistryModule::GetWorkerDHTFillTimings() const { +std::vector poet::ChemistryModule::GetWorkerDHTFillTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_DHT_FILL); } -std::vector poet::ChemistryModule::GetWorkerIdleTimings() const { +std::vector poet::ChemistryModule::GetWorkerIdleTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_IDLE); } -std::vector poet::ChemistryModule::GetWorkerDHTHits() const { +std::vector poet::ChemistryModule::GetWorkerDHTHits() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); type = WORKER_DHT_HITS; @@ -76,7 +83,8 @@ std::vector poet::ChemistryModule::GetWorkerDHTHits() const { return ret; } -std::vector poet::ChemistryModule::GetWorkerDHTEvictions() const { +std::vector poet::ChemistryModule::GetWorkerDHTEvictions() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); type = WORKER_DHT_EVICTIONS; @@ -95,35 +103,40 @@ std::vector poet::ChemistryModule::GetWorkerDHTEvictions() const { } std::vector -poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const { +poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_IP_WRITE); } std::vector -poet::ChemistryModule::GetWorkerInterpolationReadTimings() const { +poet::ChemistryModule::GetWorkerInterpolationReadTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_IP_READ); } std::vector -poet::ChemistryModule::GetWorkerInterpolationGatherTimings() const { +poet::ChemistryModule::GetWorkerInterpolationGatherTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_IP_GATHER); } std::vector -poet::ChemistryModule::GetWorkerInterpolationFunctionCallTimings() const { +poet::ChemistryModule::GetWorkerInterpolationFunctionCallTimings() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); return MasterGatherWorkerTimings(WORKER_IP_FC); } std::vector -poet::ChemistryModule::GetWorkerInterpolationCalls() const { +poet::ChemistryModule::GetWorkerInterpolationCalls() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); type = WORKER_IP_CALLS; @@ -141,7 +154,8 @@ poet::ChemistryModule::GetWorkerInterpolationCalls() const { return ret; } -std::vector poet::ChemistryModule::GetWorkerPHTCacheHits() const { +std::vector poet::ChemistryModule::GetWorkerPHTCacheHits() const +{ int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); type = WORKER_PHT_CACHE_HITS; @@ -161,11 +175,14 @@ std::vector poet::ChemistryModule::GetWorkerPHTCacheHits() const { inline std::vector shuffleVector(const std::vector &in_vector, uint32_t size_per_prop, - uint32_t wp_count) { + uint32_t wp_count) +{ std::vector out_buffer(in_vector.size()); uint32_t write_i = 0; - for (uint32_t i = 0; i < wp_count; i++) { - for (uint32_t j = i; j < size_per_prop; j += wp_count) { + for (uint32_t i = 0; i < wp_count; i++) + { + for (uint32_t j = i; j < size_per_prop; j += wp_count) + { out_buffer[write_i] = in_vector[j]; write_i++; } @@ -175,14 +192,18 @@ inline std::vector shuffleVector(const std::vector &in_vector, inline std::vector shuffleField(const std::vector &in_field, uint32_t size_per_prop, - uint32_t prop_count, - uint32_t wp_count) { + uint32_t species_count, + uint32_t wp_count) +{ std::vector out_buffer(in_field.size()); uint32_t write_i = 0; - for (uint32_t i = 0; i < wp_count; i++) { - for (uint32_t j = i; j < size_per_prop; j += wp_count) { - for (uint32_t k = 0; k < prop_count; k++) { - out_buffer[(write_i * prop_count) + k] = + for (uint32_t i = 0; i < wp_count; i++) + { + for (uint32_t j = i; j < size_per_prop; j += wp_count) + { + for (uint32_t k = 0; k < species_count; k++) + { + out_buffer[(write_i * species_count) + k] = in_field[(k * size_per_prop) + j]; } write_i++; @@ -192,28 +213,34 @@ inline std::vector shuffleField(const std::vector &in_field, } inline void unshuffleField(const std::vector &in_buffer, - uint32_t size_per_prop, uint32_t prop_count, - uint32_t wp_count, std::vector &out_field) { + uint32_t size_per_prop, uint32_t species_count, + uint32_t wp_count, std::vector &out_field) +{ uint32_t read_i = 0; - for (uint32_t i = 0; i < wp_count; i++) { - for (uint32_t j = i; j < size_per_prop; j += wp_count) { - for (uint32_t k = 0; k < prop_count; k++) { + for (uint32_t i = 0; i < wp_count; i++) + { + for (uint32_t j = i; j < size_per_prop; j += wp_count) + { + for (uint32_t k = 0; k < species_count; k++) + { out_field[(k * size_per_prop) + j] = - in_buffer[(read_i * prop_count) + k]; + in_buffer[(read_i * species_count) + k]; } read_i++; } } } -inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) { +inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) +{ /* visual progress */ double progress = (float)(count_pkgs + 1) / n_wp; std::cout << "["; int pos = barWidth * progress; - for (int iprog = 0; iprog < barWidth; ++iprog) { + for (int iprog = 0; iprog < barWidth; ++iprog) + { if (iprog < pos) std::cout << "="; else if (iprog == pos) @@ -228,14 +255,17 @@ inline void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70) { inline void poet::ChemistryModule::MasterSendPkgs( worker_list_t &w_list, workpointer_t &work_pointer, int &pkg_to_send, - int &count_pkgs, int &free_workers, double dt, uint32_t iteration, - const std::vector &wp_sizes_vector) { + int &count_pkgs, int &free_workers, double dt, uint32_t iteration, uint32_t control_iteration, + const std::vector &wp_sizes_vector) +{ /* declare variables */ int local_work_package_size; /* search for free workers and send work */ - for (int p = 0; p < this->comm_size - 1; p++) { - if (w_list[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { + for (int p = 0; p < this->comm_size - 1; p++) + { + if (w_list[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ + { /* to enable different work_package_size, set local copy of * work_package_size to pre-calculated work package size vector */ @@ -264,7 +294,8 @@ inline void poet::ChemistryModule::MasterSendPkgs( // current work package start location in field uint32_t wp_start_index = std::accumulate(wp_sizes_vector.begin(), std::next(wp_sizes_vector.begin(), count_pkgs), 0); send_buffer[end_of_wp + 4] = wp_start_index; - + // whether this iteration is a control iteration + send_buffer[end_of_wp + 5] = control_iteration; /* ATTENTION Worker p has rank p+1 */ // MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1, @@ -283,7 +314,8 @@ inline void poet::ChemistryModule::MasterSendPkgs( inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, int &pkg_to_recv, bool to_send, - int &free_workers) { + int &free_workers) +{ /* declare most of the variables here */ int need_to_receive = 1; double idle_a, idle_b; @@ -293,37 +325,74 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, // master_recv_a = MPI_Wtime(); /* start to loop as long there are packages to recv and the need to receive */ - while (need_to_receive && pkg_to_recv > 0) { + while (need_to_receive && pkg_to_recv > 0) + { // only of there are still packages to send and free workers are available if (to_send && free_workers > 0) // non blocking probing - MPI_Iprobe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &need_to_receive, + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &need_to_receive, &probe_status); - else { + else + { idle_a = MPI_Wtime(); // blocking probing - MPI_Probe(MPI_ANY_SOURCE, LOOP_WORK, MPI_COMM_WORLD, &probe_status); + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status); idle_b = MPI_Wtime(); this->idle_t += idle_b - idle_a; } /* if need_to_receive was set to true above, so there is a message to * receive */ - if (need_to_receive) { + if (need_to_receive) + { p = probe_status.MPI_SOURCE; - MPI_Get_count(&probe_status, MPI_DOUBLE, &size); - MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK, - this->group_comm, MPI_STATUS_IGNORE); - w_list[p - 1].has_work = 0; - pkg_to_recv -= 1; - free_workers++; + if (probe_status.MPI_TAG == LOOP_WORK) + { + MPI_Get_count(&probe_status, MPI_DOUBLE, &size); + MPI_Recv(w_list[p - 1].send_addr, size, MPI_DOUBLE, p, LOOP_WORK, + this->group_comm, MPI_STATUS_IGNORE); + w_list[p - 1].has_work = 0; + pkg_to_recv -= 1; + free_workers++; + } + if (probe_status.MPI_TAG == WITH_REL_ERROR) + { + MPI_Get_count(&probe_status, MPI_DOUBLE, &size); + + std::cout << "[Master] Probed rel error from worker " << p + << ", size = " << size << std::endl; + + int half = size/2; + + std::vector rel_err_buffer(size); + std::vector rel_error(half); + MPI_Recv(rel_err_buffer.data(), size, MPI_DOUBLE, p, WITH_REL_ERROR, + this->group_comm, MPI_STATUS_IGNORE); + + + + std::copy(rel_err_buffer.begin(), rel_err_buffer.begin() + half, + w_list[p - 1].send_addr); + + std::copy(rel_err_buffer.begin() + half, rel_err_buffer.end(), rel_error.begin()); + + std::cout << "[Master] Received rel error buffer from worker " << p + << ", first value = " << (rel_err_buffer.empty() ? -1 : rel_err_buffer[0]) + << std::endl; + + w_list[p - 1].has_work = 0; + pkg_to_recv -= 1; + free_workers++; + } } } } -void poet::ChemistryModule::simulate(double dt) { +void poet::ChemistryModule::simulate(double dt) +{ double start_t{MPI_Wtime()}; - if (this->is_sequential) { + if (this->is_sequential) + { MasterRunSequential(); return; } @@ -333,7 +402,8 @@ void poet::ChemistryModule::simulate(double dt) { this->chem_t += end_t - start_t; } -void poet::ChemistryModule::MasterRunSequential() { +void poet::ChemistryModule::MasterRunSequential() +{ // std::vector shuffled_field = // shuffleField(chem_field.AsVector(), n_cells, prop_count, 1); @@ -360,7 +430,8 @@ void poet::ChemistryModule::MasterRunSequential() { // chem_field = out_vec; } -void poet::ChemistryModule::MasterRunParallel(double dt) { +void poet::ChemistryModule::MasterRunParallel(double dt) +{ /* declare most of the needed variables here */ double seq_a, seq_b, seq_c, seq_d; double worker_chemistry_a, worker_chemistry_b; @@ -373,14 +444,15 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { const std::vector wp_sizes_vector = CalculateWPSizesVector(this->n_cells, this->wp_size); - if (this->ai_surrogate_enabled) { + if (this->ai_surrogate_enabled) + { ftype = CHEM_AI_BCAST_VALIDITY; PropagateFunctionType(ftype); this->ai_surrogate_validity_vector = shuffleVector(this->ai_surrogate_validity_vector, - this->n_cells, + this->n_cells, wp_sizes_vector.size()); ChemBCast(&this->ai_surrogate_validity_vector.front(), this->n_cells, MPI_INT); - } + } ftype = CHEM_WORK_LOOP; PropagateFunctionType(ftype); @@ -388,6 +460,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { MPI_Barrier(this->group_comm); static uint32_t iteration = 0; + uint32_t control_iteration = static_cast(this->runtime_params->control_iteration_active ? 1 : 0); /* start time measurement of sequential part */ seq_a = MPI_Wtime(); @@ -417,20 +490,47 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { /* start send/recv loop */ // while there are still packages to recv - while (pkg_to_recv > 0) { + while (pkg_to_recv > 0) + { // print a progressbar to stdout - if (print_progessbar) { + if (print_progessbar) + { printProgressbar((int)i_pkgs, (int)wp_sizes_vector.size()); } // while there are still packages to send - if (pkg_to_send > 0) { + if (pkg_to_send > 0) + { // send packages to all free workers ... MasterSendPkgs(worker_list, work_pointer, pkg_to_send, i_pkgs, - free_workers, dt, iteration, wp_sizes_vector); + free_workers, dt, iteration, control_iteration, wp_sizes_vector); } // ... and try to receive them from workers who has finished their work MasterRecvPkgs(worker_list, pkg_to_recv, pkg_to_send > 0, free_workers); } + // to do: Statistik + + /* if control_iteration_active is true receive rel. error data and compare with epsilon */ + if (this->runtime_params->control_iteration_active) + { + + // do Statistik + /** + int rel_err_offset = size / 2; // or calculate as needed + + for (std::size_t ep_i = 0; ep_i < this->runtime_params->species_epsilon.size(); ep_i++) + { + if (rel_err_buffer[rel_err_offset + ep_i] > this->runtime_params->species_epsilon[ep_i]) + { + std::cout << "[Master] At least one relative error exceeded epsilon threshold!" + << std::endl; + std::cout << "value: " << rel_err_buffer[rel_err_offset + ep_i] << " epsilon: " + << this->runtime_params->species_epsilon[ep_i] << std::endl; + break; + } + } + */ + } + // Just to complete the progressbar std::cout << std::endl; @@ -461,7 +561,8 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { /* end time measurement of whole chemistry simulation */ /* advise workers to end chemistry iteration */ - for (int i = 1; i < this->comm_size; i++) { + for (int i = 1; i < this->comm_size; i++) + { MPI_Send(NULL, 0, MPI_DOUBLE, i, LOOP_END, this->group_comm); } @@ -469,28 +570,32 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { iteration++; } -void poet::ChemistryModule::MasterLoopBreak() { +void poet::ChemistryModule::MasterLoopBreak() +{ int type = CHEM_BREAK_MAIN_LOOP; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); } std::vector poet::ChemistryModule::CalculateWPSizesVector(uint32_t n_cells, - uint32_t wp_size) const { + uint32_t wp_size) const +{ bool mod_pkgs = (n_cells % wp_size) != 0; uint32_t n_packages = (uint32_t)(n_cells / wp_size) + static_cast(mod_pkgs); std::vector wp_sizes_vector(n_packages, 0); - for (int i = 0; i < n_cells; i++) { + for (int i = 0; i < n_cells; i++) + { wp_sizes_vector[i % n_packages] += 1; } return wp_sizes_vector; } -void poet::ChemistryModule::masterSetField(Field field) { +void poet::ChemistryModule::masterSetField(Field field) +{ this->chem_field = field; this->prop_count = field.GetProps().size(); diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp index b7eb6096c..a45a5bc23 100644 --- a/src/Chemistry/WorkerFunctions.cpp +++ b/src/Chemistry/WorkerFunctions.cpp @@ -10,414 +10,559 @@ #include #include #include +#include #include #include #include -namespace poet { +namespace poet +{ -inline std::string get_string(int root, MPI_Comm communicator) { - int count; - MPI_Bcast(&count, 1, MPI_INT, root, communicator); + inline std::string get_string(int root, MPI_Comm communicator) + { + int count; + MPI_Bcast(&count, 1, MPI_INT, root, communicator); - char *buffer = new char[count + 1]; - MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); + char *buffer = new char[count + 1]; + MPI_Bcast(buffer, count, MPI_CHAR, root, communicator); - buffer[count] = '\0'; + buffer[count] = '\0'; - std::string ret_str(buffer); - delete[] buffer; + std::string ret_str(buffer); + delete[] buffer; - return ret_str; -} + return ret_str; + } -void poet::ChemistryModule::WorkerLoop() { - struct worker_s timings; + void poet::ChemistryModule::WorkerLoop() + { + struct worker_s timings; - // HACK: defining the worker iteration count here, which will increment after - // each CHEM_ITER_END message - uint32_t iteration = 1; - bool loop = true; + // HACK: defining the worker iteration count here, which will increment after + // each CHEM_ITER_END message + uint32_t iteration = 1; + bool loop = true; - while (loop) { - int func_type; - PropagateFunctionType(func_type); + while (loop) + { + int func_type; + PropagateFunctionType(func_type); - switch (func_type) { - case CHEM_FIELD_INIT: { - ChemBCast(&this->prop_count, 1, MPI_UINT32_T); - if (this->ai_surrogate_enabled) { - this->ai_surrogate_validity_vector.resize( - this->n_cells); // resize statt reserve? - } - break; - } - case CHEM_AI_BCAST_VALIDITY: { - // Receive the index vector of valid ai surrogate predictions - MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, - MPI_INT, 0, this->group_comm); - break; - } - case CHEM_WORK_LOOP: { - WorkerProcessPkgs(timings, iteration); - break; - } - case CHEM_PERF: { - int type; - ChemBCast(&type, 1, MPI_INT); - if (type < WORKER_DHT_HITS) { - WorkerPerfToMaster(type, timings); + switch (func_type) + { + case CHEM_FIELD_INIT: + { + ChemBCast(&this->prop_count, 1, MPI_UINT32_T); + if (this->ai_surrogate_enabled) + { + this->ai_surrogate_validity_vector.resize( + this->n_cells); // resize statt reserve? + } break; } - WorkerMetricsToMaster(type); - break; - } - case CHEM_BREAK_MAIN_LOOP: { - WorkerPostSim(iteration); - loop = false; - break; - } - default: { - throw std::runtime_error("Worker received unknown tag from master."); - } - } - } -} - -void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, - uint32_t &iteration) { - MPI_Status probe_status; - bool loop = true; - - MPI_Barrier(this->group_comm); - - while (loop) { - double idle_a = MPI_Wtime(); - MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); - double idle_b = MPI_Wtime(); - - switch (probe_status.MPI_TAG) { - case LOOP_WORK: { - timings.idle_t += idle_b - idle_a; - int count; - MPI_Get_count(&probe_status, MPI_DOUBLE, &count); - - WorkerDoWork(probe_status, count, timings); - break; - } - case LOOP_END: { - WorkerPostIter(probe_status, iteration); - iteration++; - loop = false; - break; - } - } - } -} - -void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, - int double_count, - struct worker_s &timings) { - static int counter = 1; - - double dht_get_start, dht_get_end; - double phreeqc_time_start, phreeqc_time_end; - double dht_fill_start, dht_fill_end; - - uint32_t iteration; - double dt; - double current_sim_time; - uint32_t wp_start_index; - int count = double_count; - std::vector mpi_buffer(count); - - /* receive */ - MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, - MPI_STATUS_IGNORE); - - /* decrement count of work_package by BUFFER_OFFSET */ - count -= BUFFER_OFFSET; - - /* check for changes on all additional variables given by the 'header' of - * mpi_buffer */ - - // work_package_size - poet::WorkPackage s_curr_wp(mpi_buffer[count]); - - // current iteration of simulation - iteration = mpi_buffer[count + 1]; - - // current timestep size - dt = mpi_buffer[count + 2]; - - // current simulation time ('age' of simulation) - current_sim_time = mpi_buffer[count + 3]; - - // current work package start location in field - wp_start_index = mpi_buffer[count + 4]; - - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { - s_curr_wp.input[wp_i] = - std::vector(mpi_buffer.begin() + this->prop_count * wp_i, - mpi_buffer.begin() + this->prop_count * (wp_i + 1)); - } - - // std::cout << this->comm_rank << ":" << counter++ << std::endl; - if (dht_enabled || interp_enabled) { - dht->prepareKeys(s_curr_wp.input, dt); - } - - if (dht_enabled) { - /* check for values in DHT */ - dht_get_start = MPI_Wtime(); - dht->checkDHT(s_curr_wp); - dht_get_end = MPI_Wtime(); - timings.dht_get += dht_get_end - dht_get_start; - } - - if (interp_enabled) { - interp->tryInterpolation(s_curr_wp); - } - - if (this->ai_surrogate_enabled) { - // Map valid predictions from the ai surrogate in the workpackage - for (int i = 0; i < s_curr_wp.size; i++) { - if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) { - s_curr_wp.mapping[i] = CHEM_AISURR; + case CHEM_AI_BCAST_VALIDITY: + { + // Receive the index vector of valid ai surrogate predictions + MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, + MPI_INT, 0, this->group_comm); + break; + } + case CHEM_WORK_LOOP: + { + WorkerProcessPkgs(timings, iteration); + break; + } + case CHEM_PERF: + { + int type; + ChemBCast(&type, 1, MPI_INT); + if (type < WORKER_DHT_HITS) + { + WorkerPerfToMaster(type, timings); + break; + } + WorkerMetricsToMaster(type); + break; + } + case CHEM_BREAK_MAIN_LOOP: + { + WorkerPostSim(iteration); + loop = false; + break; + } + default: + { + throw std::runtime_error("Worker received unknown tag from master."); + } } } } - phreeqc_time_start = MPI_Wtime(); + void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings, + uint32_t &iteration) + { + MPI_Status probe_status; + bool loop = true; - WorkerRunWorkPackage(s_curr_wp, current_sim_time, dt); + MPI_Barrier(this->group_comm); - phreeqc_time_end = MPI_Wtime(); + while (loop) + { + double idle_a = MPI_Wtime(); + MPI_Probe(0, MPI_ANY_TAG, this->group_comm, &probe_status); + double idle_b = MPI_Wtime(); - for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { - std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(), - mpi_buffer.begin() + this->prop_count * wp_i); - } + switch (probe_status.MPI_TAG) + { + case LOOP_WORK: + { + timings.idle_t += idle_b - idle_a; + int count; + MPI_Get_count(&probe_status, MPI_DOUBLE, &count); - /* send results to master */ - MPI_Request send_req; - MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, MPI_COMM_WORLD, - &send_req); - - if (dht_enabled || interp_enabled) { - /* write results to DHT */ - dht_fill_start = MPI_Wtime(); - dht->fillDHT(s_curr_wp); - dht_fill_end = MPI_Wtime(); - - if (interp_enabled) { - interp->writePairs(); + WorkerDoWork(probe_status, count, timings); + break; + } + case LOOP_END: + { + WorkerPostIter(probe_status, iteration); + iteration++; + loop = false; + break; + } + } } - timings.dht_fill += dht_fill_end - dht_fill_start; } - timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; + void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, + int double_count, + struct worker_s &timings) + { + static int counter = 1; - MPI_Wait(&send_req, MPI_STATUS_IGNORE); -} + double dht_get_start, dht_get_end; + double phreeqc_time_start, phreeqc_time_end; + double dht_fill_start, dht_fill_end; -void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, - uint32_t iteration) { - MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, - MPI_STATUS_IGNORE); + uint32_t iteration; + double dt; + double current_sim_time; + uint32_t wp_start_index; + int count = double_count; + bool control_iteration_active = false; + std::vector mpi_buffer(count); - if (this->dht_enabled) { - dht_hits.push_back(dht->getHits()); - dht_evictions.push_back(dht->getEvictions()); - dht->resetCounter(); + /* receive */ + MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, + MPI_STATUS_IGNORE); - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { + /* decrement count of work_package by BUFFER_OFFSET */ + count -= BUFFER_OFFSET; + /* check for changes on all additional variables given by the 'header' of + * mpi_buffer */ + + // work_package_size + poet::WorkPackage s_curr_wp(mpi_buffer[count]); + + // current iteration of simulation + iteration = mpi_buffer[count + 1]; + + // current timestep size + dt = mpi_buffer[count + 2]; + + // current simulation time ('age' of simulation) + current_sim_time = mpi_buffer[count + 3]; + + // current work package start location in field + wp_start_index = mpi_buffer[count + 4]; + + control_iteration_active = (mpi_buffer[count + 5] == 1); + + for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) + { + s_curr_wp.input[wp_i] = + std::vector(mpi_buffer.begin() + this->prop_count * wp_i, + mpi_buffer.begin() + this->prop_count * (wp_i + 1)); + } + + // std::cout << this->comm_rank << ":" << counter++ << std::endl; + if (dht_enabled || interp_enabled) + { + dht->prepareKeys(s_curr_wp.input, dt); + } + + if (dht_enabled) + { + /* check for values in DHT */ + dht_get_start = MPI_Wtime(); + dht->checkDHT(s_curr_wp); + dht_get_end = MPI_Wtime(); + timings.dht_get += dht_get_end - dht_get_start; + } + + if (interp_enabled) + { + interp->tryInterpolation(s_curr_wp); + } + + if (this->ai_surrogate_enabled) + { + // Map valid predictions from the ai surrogate in the workpackage + for (int i = 0; i < s_curr_wp.size; i++) + { + if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) + { + s_curr_wp.mapping[i] = CHEM_AISURR; + } + } + } + + /* if control iteration: create copy surrogate results (output and mappings) and then set them to zero, + give this to phreeqc */ + + poet::WorkPackage s_curr_wp_pqc = s_curr_wp; + + if (control_iteration_active) + { + for (std::size_t wp_i = 0; wp_i < s_curr_wp_pqc.size; wp_i++) + { + s_curr_wp_pqc.output[wp_i] = std::vector(this->prop_count, 0.0); + s_curr_wp_pqc.mapping[wp_i] = 0; + } + } + + phreeqc_time_start = MPI_Wtime(); + + WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_pqc : s_curr_wp, current_sim_time, dt); + + phreeqc_time_end = MPI_Wtime(); + + if (control_iteration_active) + { + // increase size for relative error + std::size_t rel_error_size = s_curr_wp.size * this->prop_count; + + // extend mpi_buffer, for rel. error for every species + mpi_buffer.resize(count + rel_error_size); + std::size_t offset = count; + count += rel_error_size; + + // calc rel. error if phreeqc != surrogate + for (std::size_t wp_i = 0; wp_i < s_curr_wp_pqc.size; wp_i++) + { + const auto &surrogate_result = s_curr_wp.output[wp_i]; + const auto &phreeqc_result = s_curr_wp_pqc.output[wp_i]; + + // std::cout << "surrogate_result.size() " << surrogate_result.size() << ", phreeqc_result " << phreeqc_result.size() << std::endl; + + // fill NaNs + if (surrogate_result.size() == 0) + { + for (std::size_t i = 0; i < this->prop_count; i++) + { + mpi_buffer[offset++] = std::numeric_limits::quiet_NaN(); + } + } + + // compute rel error + if (surrogate_result.size() == phreeqc_result.size()) + { + for (std::size_t i = 0; i < this->prop_count; i++) + { + double ref = phreeqc_result[i]; + double surrogate = surrogate_result[i]; + + if (std::abs(ref) > 1e-9) + { + mpi_buffer[offset++] = std::abs((surrogate - ref) / ref); + } + else + { + mpi_buffer[offset++] = 0.0; + } + } + } + } + } + + poet::WorkPackage &s_curr_wp_copy = control_iteration_active ? s_curr_wp_pqc : s_curr_wp; + + for (std::size_t wp_i = 0; wp_i < s_curr_wp_copy.size; wp_i++) + { + std::copy(s_curr_wp_copy.output[wp_i].begin(), s_curr_wp_copy.output[wp_i].end(), + mpi_buffer.begin() + this->prop_count * wp_i); + } + + /* send results to master */ + MPI_Request send_req; + + int mpi_tag = control_iteration_active ? WITH_REL_ERROR : LOOP_WORK; + MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req); + + if (control_iteration_active) + { + std::cout << "[Worker " << this->comm_rank << "] Sent results." << std::endl; + } + + if (dht_enabled || interp_enabled) + { + /* write results to DHT */ + dht_fill_start = MPI_Wtime(); + dht->fillDHT(s_curr_wp_copy); + dht_fill_end = MPI_Wtime(); + + if (interp_enabled) + { + interp->writePairs(); + } + timings.dht_fill += dht_fill_end - dht_fill_start; + } + + timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; + + MPI_Wait(&send_req, MPI_STATUS_IGNORE); + } + + void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, + uint32_t iteration) + { + MPI_Recv(NULL, 0, MPI_DOUBLE, 0, LOOP_END, this->group_comm, + MPI_STATUS_IGNORE); + + if (this->dht_enabled) + { + dht_hits.push_back(dht->getHits()); + dht_evictions.push_back(dht->getEvictions()); + dht->resetCounter(); + + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) + { + WorkerWriteDHTDump(iteration); + } + } + + if (this->interp_enabled) + { + std::stringstream out; + interp_calls.push_back(interp->getInterpolationCount()); + interp->resetCounter(); + interp->writePHTStats(); + if (this->dht_snaps_type == DHT_SNAPS_ITEREND) + { + out << this->dht_file_out_dir << "/iter_" << std::setfill('0') + << std::setw(this->file_pad) << iteration << ".pht"; + interp->dumpPHTState(out.str()); + } + + const auto max_mean_idx = + DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); + + if (max_mean_idx >= 2) + { + DHT_flush(this->interp->getDHTObject()); + DHT_flush(this->dht->getDHT()); + if (this->comm_rank == 2) + { + std::cout << "Flushed both DHT and PHT!\n\n"; + } + } + } + + RInsidePOET::getInstance().parseEvalQ("gc()"); + } + + void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) + { + if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) + { WorkerWriteDHTDump(iteration); } - } - - if (this->interp_enabled) { - std::stringstream out; - interp_calls.push_back(interp->getInterpolationCount()); - interp->resetCounter(); - interp->writePHTStats(); - if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { + if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) + { + std::stringstream out; out << this->dht_file_out_dir << "/iter_" << std::setfill('0') << std::setw(this->file_pad) << iteration << ".pht"; interp->dumpPHTState(out.str()); } + } - const auto max_mean_idx = - DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); + void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) + { + std::stringstream out; + out << this->dht_file_out_dir << "/iter_" << std::setfill('0') + << std::setw(this->file_pad) << iteration << ".dht"; + int res = dht->tableToFile(out.str().c_str()); + if (res != DHT_SUCCESS && this->comm_rank == 2) + std::cerr + << "CPP: Worker: Error in writing current state of DHT to file.\n"; + else if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() + << "\n"; + } - if (max_mean_idx >= 2) { - DHT_flush(this->interp->getDHTObject()); - DHT_flush(this->dht->getDHT()); - if (this->comm_rank == 2) { - std::cout << "Flushed both DHT and PHT!\n\n"; + void poet::ChemistryModule::WorkerReadDHTDump( + const std::string &dht_input_file) + { + int res = dht->fileToTable((char *)dht_input_file.c_str()); + if (res != DHT_SUCCESS) + { + if (res == DHT_WRONG_FILE) + { + if (this->comm_rank == 1) + std::cerr + << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; + } + else + { + if (this->comm_rank == 1) + std::cerr << "CPP: Worker: Error in loading current state of DHT from " + "file. Continue with empty DHT ...\n"; + } + } + else + { + if (this->comm_rank == 2) + std::cout << "CPP: Worker: Successfully loaded state of DHT from file " + << dht_input_file << "\n"; + } + } + + void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, + double dSimTime, + double dTimestep) + { + + std::vector> inout_chem = work_package.input; + std::vector to_ignore; + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) + { + if (work_package.mapping[wp_id] != CHEM_PQC) + { + to_ignore.push_back(wp_id); + } + } + this->pqc_runner->run(inout_chem, dTimestep, to_ignore); + + for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) + { + if (work_package.mapping[wp_id] == CHEM_PQC) + { + work_package.output[wp_id] = inout_chem[wp_id]; } } } - RInsidePOET::getInstance().parseEvalQ("gc()"); -} - -void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) { - if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { - WorkerWriteDHTDump(iteration); - } - if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { - std::stringstream out; - out << this->dht_file_out_dir << "/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".pht"; - interp->dumpPHTState(out.str()); - } -} - -void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) { - std::stringstream out; - out << this->dht_file_out_dir << "/iter_" << std::setfill('0') - << std::setw(this->file_pad) << iteration << ".dht"; - int res = dht->tableToFile(out.str().c_str()); - if (res != DHT_SUCCESS && this->comm_rank == 2) - std::cerr - << "CPP: Worker: Error in writing current state of DHT to file.\n"; - else if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() - << "\n"; -} - -void poet::ChemistryModule::WorkerReadDHTDump( - const std::string &dht_input_file) { - int res = dht->fileToTable((char *)dht_input_file.c_str()); - if (res != DHT_SUCCESS) { - if (res == DHT_WRONG_FILE) { - if (this->comm_rank == 1) - std::cerr - << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n"; - } else { - if (this->comm_rank == 1) - std::cerr << "CPP: Worker: Error in loading current state of DHT from " - "file. Continue with empty DHT ...\n"; + void poet::ChemistryModule::WorkerPerfToMaster(int type, + const struct worker_s &timings) + { + switch (type) + { + case WORKER_PHREEQC: + { + MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_DHT_GET: + { + MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_DHT_FILL: + { + MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_IDLE: + { + MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } + case WORKER_IP_WRITE: + { + double val = interp->getPHTWriteTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_READ: + { + double val = interp->getPHTReadTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_GATHER: + { + double val = interp->getDHTGatherTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + case WORKER_IP_FC: + { + double val = interp->getInterpolationTime(); + MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); + break; + } + default: + { + throw std::runtime_error("Unknown perf type in master's message."); } - } else { - if (this->comm_rank == 2) - std::cout << "CPP: Worker: Successfully loaded state of DHT from file " - << dht_input_file << "\n"; - } -} - -void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, - double dSimTime, - double dTimestep) { - - std::vector> inout_chem = work_package.input; - std::vector to_ignore; - - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { - if (work_package.mapping[wp_id] != CHEM_PQC) { - to_ignore.push_back(wp_id); } } - this->pqc_runner->run(inout_chem, dTimestep, to_ignore); - for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) { - if (work_package.mapping[wp_id] == CHEM_PQC) { - work_package.output[wp_id] = inout_chem[wp_id]; + void poet::ChemistryModule::WorkerMetricsToMaster(int type) + { + MPI_Comm worker_comm = dht->getCommunicator(); + int worker_rank; + MPI_Comm_rank(worker_comm, &worker_rank); + + MPI_Comm &group_comm = this->group_comm; + + auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( + std::vector &send_buffer, int tag) + { + std::vector to_master(send_buffer.size()); + MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), + MPI_UINT32_T, MPI_SUM, 0, worker_comm); + + if (worker_rank == 0) + { + MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, + group_comm); + } + }; + + switch (type) + { + case WORKER_DHT_HITS: + { + reduce_and_send(dht_hits, WORKER_DHT_HITS); + break; + } + case WORKER_DHT_EVICTIONS: + { + reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); + break; + } + case WORKER_IP_CALLS: + { + reduce_and_send(interp_calls, WORKER_IP_CALLS); + return; + } + case WORKER_PHT_CACHE_HITS: + { + std::vector input = this->interp->getPHTLocalCacheHits(); + reduce_and_send(input, WORKER_PHT_CACHE_HITS); + return; + } + default: + { + throw std::runtime_error("Unknown perf type in master's message."); + } } } -} - -void poet::ChemistryModule::WorkerPerfToMaster(int type, - const struct worker_s &timings) { - switch (type) { - case WORKER_PHREEQC: { - MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_GET: { - MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_DHT_FILL: { - MPI_Gather(&timings.dht_fill, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IDLE: { - MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, - this->group_comm); - break; - } - case WORKER_IP_WRITE: { - double val = interp->getPHTWriteTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_READ: { - double val = interp->getPHTReadTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_GATHER: { - double val = interp->getDHTGatherTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - case WORKER_IP_FC: { - double val = interp->getInterpolationTime(); - MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); - break; - } - default: { - throw std::runtime_error("Unknown perf type in master's message."); - } - } -} - -void poet::ChemistryModule::WorkerMetricsToMaster(int type) { - MPI_Comm worker_comm = dht->getCommunicator(); - int worker_rank; - MPI_Comm_rank(worker_comm, &worker_rank); - - MPI_Comm &group_comm = this->group_comm; - - auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( - std::vector &send_buffer, int tag) { - std::vector to_master(send_buffer.size()); - MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), - MPI_UINT32_T, MPI_SUM, 0, worker_comm); - - if (worker_rank == 0) { - MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, - group_comm); - } - }; - - switch (type) { - case WORKER_DHT_HITS: { - reduce_and_send(dht_hits, WORKER_DHT_HITS); - break; - } - case WORKER_DHT_EVICTIONS: { - reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); - break; - } - case WORKER_IP_CALLS: { - reduce_and_send(interp_calls, WORKER_IP_CALLS); - return; - } - case WORKER_PHT_CACHE_HITS: { - std::vector input = this->interp->getPHTLocalCacheHits(); - reduce_and_send(input, WORKER_PHT_CACHE_HITS); - return; - } - default: { - throw std::runtime_error("Unknown perf type in master's message."); - } - } -} } // namespace poet diff --git a/src/poet.cpp b/src/poet.cpp index 742f4c395..87d5a2796 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -249,6 +249,10 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) { params.timesteps = Rcpp::as>(global_rt_setup->operator[]("timesteps")); + params.control_iteration = + Rcpp::as(global_rt_setup->operator[]("control_iteration")); + params.species_epsilon = + Rcpp::as>(global_rt_setup->operator[]("species_epsilon")); } catch (const std::exception &e) { ERRMSG("Error while parsing R scripts: " + std::string(e.what())); @@ -277,7 +281,7 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem) { *global_rt_setup = R["setup"]; } -static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, +static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, DiffusionModule &diffusion, ChemistryModule &chem) { @@ -291,8 +295,12 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps()); /* SIMULATION LOOP */ + double dSimTime{0}; for (uint32_t iter = 1; iter < maxiter + 1; iter++) { + + params.control_iteration_active = (iter % params.control_iteration == 0); + double start_t = MPI_Wtime(); const double &dt = params.timesteps[iter - 1]; @@ -308,6 +316,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters ¶ms, /* run transport */ diffusion.simulate(dt); + chem.runtime_params = ¶ms; + chem.getField().update(diffusion.getField()); // MSG("Chemistry start"); diff --git a/src/poet.hpp.in b/src/poet.hpp.in index 9462f6d7e..c08199b6d 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -41,6 +41,7 @@ static const inline std::string r_runtime_parameters = "mysetup"; struct RuntimeParameters { std::string out_dir; std::vector timesteps; + std::vector species_epsilon; Rcpp::List init_params; @@ -51,6 +52,9 @@ struct RuntimeParameters { bool print_progress = false; + bool control_iteration_active = false; + std::uint32_t control_iteration = 25; + static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32; std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;