From 9374b267739b89f71b822ec1b1d5766960e30780 Mon Sep 17 00:00:00 2001 From: rastogi Date: Wed, 15 Oct 2025 10:15:21 +0200 Subject: [PATCH] Control component with minimum features --- src/Chemistry/ChemistryModule.hpp | 46 +- src/Chemistry/MasterFunctions.cpp | 128 +++-- src/Chemistry/SurrogateModels/DHT_Wrapper.cpp | 538 ++++++++++-------- .../SurrogateModels/InterpolationModule.cpp | 255 +++++---- src/Chemistry/WorkerFunctions.cpp | 40 +- src/IO/StatsIO.cpp | 28 +- src/IO/StatsIO.hpp | 2 +- src/poet.cpp | 118 ++-- src/poet.hpp.in | 20 +- 9 files changed, 664 insertions(+), 511 deletions(-) diff --git a/src/Chemistry/ChemistryModule.hpp b/src/Chemistry/ChemistryModule.hpp index 836b0f237..c6f57bbec 100644 --- a/src/Chemistry/ChemistryModule.hpp +++ b/src/Chemistry/ChemistryModule.hpp @@ -185,6 +185,13 @@ namespace poet */ auto GetMasterLoopTime() const { return this->send_recv_t; } + + auto GetMasterCtrlLogicTime() const { return this->ctrl_t; } + + auto GetMasterCtrlBcastTime() const { return this->bcast_ctrl_t; } + + auto GetMasterRecvCtrlLogicTime() const { return this->recv_ctrl_t; } + /** * **Master only** Collect and return all accumulated timings recorded by * workers to run Phreeqc simulation. @@ -214,6 +221,8 @@ namespace poet */ std::vector GetWorkerIdleTimings() const; + std::vector GetWorkerControlTimings() const; + /** * **Master only** Collect and return DHT hits of all workers. * @@ -262,25 +271,29 @@ namespace poet std::vector ai_surrogate_validity_vector; RuntimeParameters *runtime_params = nullptr; - uint32_t control_iteration_counter = 0; - struct error_stats + struct SimulationErrorStats { std::vector mape; - std::vector rrsme; - uint32_t iteration; + std::vector rrmse; + uint32_t iteration; // iterations in simulation after rollbacks + uint32_t rollback_count; - error_stats(size_t species_count, size_t iter) - : mape(species_count, 0.0), rrsme(species_count, 0.0), iteration(iter) {} + SimulationErrorStats(size_t species_count, uint32_t iter, uint32_t counter) + : mape(species_count, 0.0), + rrmse(species_count, 0.0), + iteration(iter), + rollback_count(counter){} }; - std::vector error_stats_history; + std::vector error_history; + + static void computeSpeciesErrors(const std::vector &reference_values, + const std::vector &surrogate_values, + uint32_t size_per_prop, + uint32_t species_count, + SimulationErrorStats &species_error_stats); - static void computeStats(const std::vector &pqc_vector, - const std::vector &sur_vector, - uint32_t size_per_prop, uint32_t species_count, - error_stats &stats); - protected: void initializeDHT(uint32_t size_mb, const NamedVector &key_species, @@ -319,6 +332,7 @@ namespace poet enum { WORKER_PHREEQC, + WORKER_CTRL_ITER, WORKER_DHT_GET, WORKER_DHT_FILL, WORKER_IDLE, @@ -342,6 +356,7 @@ namespace poet double dht_get = 0.; double dht_fill = 0.; double idle_t = 0.; + double ctrl_t = 0.; }; struct worker_info_s @@ -410,6 +425,7 @@ namespace poet poet::DHT_Wrapper *dht = nullptr; + bool dht_fill_during_rollback{false}; bool interp_enabled{false}; std::unique_ptr interp; @@ -431,6 +447,10 @@ namespace poet double seq_t = 0.; double send_recv_t = 0.; + double ctrl_t = 0.; + double bcast_ctrl_t = 0.; + double recv_ctrl_t = 0.; + std::array base_totals{0}; bool print_progessbar{false}; @@ -449,7 +469,7 @@ namespace poet std::unique_ptr pqc_runner; - std::vector sur_shuffled; + std::vector sur_shuffled; }; } // namespace poet diff --git a/src/Chemistry/MasterFunctions.cpp b/src/Chemistry/MasterFunctions.cpp index 683985134..c2710bf8b 100644 --- a/src/Chemistry/MasterFunctions.cpp +++ b/src/Chemistry/MasterFunctions.cpp @@ -41,6 +41,12 @@ std::vector poet::ChemistryModule::GetWorkerPhreeqcTimings() const { return MasterGatherWorkerTimings(WORKER_PHREEQC); } +std::vector poet::ChemistryModule::GetWorkerControlTimings() const { + int type = CHEM_PERF; + MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); + return MasterGatherWorkerTimings(WORKER_CTRL_ITER); +} + std::vector poet::ChemistryModule::GetWorkerDHTGetTimings() const { int type = CHEM_PERF; MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm); @@ -160,35 +166,35 @@ std::vector poet::ChemistryModule::GetWorkerPHTCacheHits() const { return ret; } -void poet::ChemistryModule::computeStats(const std::vector &pqc_vector, - const std::vector &sur_vector, +void poet::ChemistryModule::computeSpeciesErrors(const std::vector &reference_values, + const std::vector &surrogate_values, uint32_t size_per_prop, uint32_t species_count, - error_stats &stats) { + SimulationErrorStats &species_error_stats) { for (uint32_t i = 0; i < species_count; ++i) { double err_sum = 0.0; double sqr_err_sum = 0.0; uint32_t base_idx = i * size_per_prop; for (uint32_t j = 0; j < size_per_prop; ++j) { - const double pqc_value = pqc_vector[base_idx + j]; - const double sur_value = sur_vector[base_idx + j]; + const double ref_value = reference_values[base_idx + j]; + const double sur_value = surrogate_values[base_idx + j]; - if (pqc_value == 0.0) { + if (ref_value == 0.0) { if (sur_value != 0.0) { err_sum += 1.0; sqr_err_sum += 1.0; } // Both zero: skip } else { - double alpha = 1.0 - (sur_value / pqc_value); + double alpha = 1.0 - (sur_value / ref_value); err_sum += std::abs(alpha); sqr_err_sum += alpha * alpha; } } - stats.mape[i] = 100.0 * (err_sum / size_per_prop); - stats.rrsme[i] = + species_error_stats.mape[i] = 100.0 * (err_sum / size_per_prop); + species_error_stats.rrmse[i] = (size_per_prop > 0) ? std::sqrt(sqr_err_sum / size_per_prop) : 0.0; } } @@ -264,7 +270,7 @@ inline void poet::ChemistryModule::MasterSendPkgs( worker_list_t &w_list, workpointer_t &work_pointer, workpointer_t &sur_pointer, int &pkg_to_send, int &count_pkgs, int &free_workers, double dt, uint32_t iteration, - uint32_t control_iteration, const std::vector &wp_sizes_vector) { + uint32_t control_interval, const std::vector &wp_sizes_vector) { /* declare variables */ int local_work_package_size; @@ -305,7 +311,7 @@ inline void poet::ChemistryModule::MasterSendPkgs( std::next(wp_sizes_vector.begin(), count_pkgs), 0); send_buffer[end_of_wp + 4] = wp_start_index; // whether this iteration is a control iteration - send_buffer[end_of_wp + 5] = control_iteration; + send_buffer[end_of_wp + 5] = control_interval; /* ATTENTION Worker p has rank p+1 */ // MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p + 1, @@ -329,6 +335,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, int need_to_receive = 1; double idle_a, idle_b; int p, size; + double recv_a, recv_b; MPI_Status probe_status; // master_recv_a = MPI_Wtime(); @@ -361,6 +368,7 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, free_workers++; } if (probe_status.MPI_TAG == LOOP_CTRL) { + recv_a = MPI_Wtime(); MPI_Get_count(&probe_status, MPI_DOUBLE, &size); // layout of buffer is [phreeqc][surrogate] @@ -378,6 +386,8 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list, w_list[p - 1].has_work = 0; pkg_to_recv -= 1; free_workers++; + recv_b = MPI_Wtime(); + this->recv_ctrl_t += recv_b - recv_a; } } } @@ -432,6 +442,10 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { int i_pkgs; int ftype; + double ctrl_a, ctrl_b; + double worker_ctrl_a, worker_ctrl_b; + double ctrl_bcast_a, ctrl_bcast_b; + const std::vector wp_sizes_vector = CalculateWPSizesVector(this->n_cells, this->wp_size); @@ -445,28 +459,44 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { MPI_INT); } - ftype = CHEM_WORK_LOOP; - PropagateFunctionType(ftype); + /* start time measurement of broadcasting interpolation status */ + ctrl_bcast_a = MPI_Wtime(); ftype = CHEM_INTERP; PropagateFunctionType(ftype); - if(this->runtime_params->rollback_simulation){ + int interp_flag = 0; + int dht_fill_flag = 0; + + if(this->runtime_params->rollback_enabled){ this->interp_enabled = false; - int interp_flag = 0; - ChemBCast(&interp_flag, 1, MPI_INT); - } else { + this->dht_fill_during_rollback = true; + interp_flag = 0; + dht_fill_flag = 1; + } + else { this->interp_enabled = true; - int interp_flag = 1; - ChemBCast(&interp_flag, 1, MPI_INT); + this->dht_fill_during_rollback = false; + interp_flag = 1; + dht_fill_flag = 0; } + ChemBCast(&interp_flag, 1, MPI_INT); + ChemBCast(&dht_fill_flag, 1, MPI_INT); + + /* end time measurement of broadcasting interpolation status */ + ctrl_bcast_b = MPI_Wtime(); + this->bcast_ctrl_t += ctrl_bcast_b - ctrl_bcast_a; + + ftype = CHEM_WORK_LOOP; + PropagateFunctionType(ftype); MPI_Barrier(this->group_comm); static uint32_t iteration = 0; - uint32_t control_iteration = static_cast( - this->runtime_params->control_iteration_active ? 1 : 0); - if (control_iteration) { + + uint32_t control_logic_enabled = this->runtime_params->control_interval_enabled ? 1 : 0; + + if (control_logic_enabled) { sur_shuffled.clear(); sur_shuffled.reserve(this->n_cells * this->prop_count); } @@ -512,7 +542,7 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { if (pkg_to_send > 0) { // send packages to all free workers ... MasterSendPkgs(worker_list, work_pointer, sur_pointer, pkg_to_send, - i_pkgs, free_workers, dt, iteration, control_iteration, + i_pkgs, free_workers, dt, iteration, control_logic_enabled, wp_sizes_vector); } // ... and try to receive them from workers who has finished their work @@ -522,39 +552,43 @@ void poet::ChemistryModule::MasterRunParallel(double dt) { // Just to complete the progressbar std::cout << std::endl; - /* stop time measurement of chemistry time needed for send/recv loop */ - worker_chemistry_b = MPI_Wtime(); - this->send_recv_t += worker_chemistry_b - worker_chemistry_a; + /* stop time measurement of chemistry time needed for send/recv loop */ + worker_chemistry_b = MPI_Wtime(); + this->send_recv_t += worker_chemistry_b - worker_chemistry_a; - /* start time measurement of sequential part */ - seq_c = MPI_Wtime(); + /* start time measurement of sequential part */ + seq_c = MPI_Wtime(); - /* unshuffle grid */ - // grid.importAndUnshuffle(mpi_buffer); - std::vector out_vec{mpi_buffer}; - unshuffleField(mpi_buffer, this->n_cells, this->prop_count, - wp_sizes_vector.size(), out_vec); - chem_field = out_vec; + /* unshuffle grid */ + // grid.importAndUnshuffle(mpi_buffer); + std::vector out_vec{mpi_buffer}; + unshuffleField(mpi_buffer, this->n_cells, this->prop_count, + wp_sizes_vector.size(), out_vec); + chem_field = out_vec; - /* do master stuff */ + /* do master stuff */ - if (control_iteration) { - control_iteration_counter++; + /* start time measurement of control logic */ + ctrl_a = MPI_Wtime(); - std::vector sur_unshuffled{sur_shuffled}; + if (control_logic_enabled && !this->runtime_params->rollback_enabled) { - unshuffleField(sur_shuffled, this->n_cells, this->prop_count, - wp_sizes_vector.size(), sur_unshuffled); + std::vector sur_unshuffled{sur_shuffled};; - error_stats stats(this->prop_count, control_iteration_counter * - runtime_params->control_iteration); + unshuffleField(sur_shuffled, this->n_cells, this->prop_count, + wp_sizes_vector.size(), sur_unshuffled); - computeStats(out_vec, sur_unshuffled, this->n_cells, this->prop_count, - stats); - error_stats_history.push_back(stats); + SimulationErrorStats stats(this->prop_count, this->runtime_params->global_iter, this->runtime_params->rollback_counter); + + computeSpeciesErrors(out_vec, sur_unshuffled, this->n_cells, this->prop_count, stats); + + error_history.push_back(stats); + } + + /* end time measurement of control logic */ + ctrl_b = MPI_Wtime(); + this->ctrl_t += ctrl_b - ctrl_a; - // to do: control values to epsilon - } /* start time measurement of master chemistry */ sim_e_chemistry = MPI_Wtime(); diff --git a/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp b/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp index 83db27ff8..8ee59bf8f 100644 --- a/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp +++ b/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp @@ -36,315 +36,357 @@ using namespace std; -namespace poet { +namespace poet +{ -DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size, - const NamedVector &key_species, - const std::vector &key_indices, - const std::vector &_output_names, - const InitialList::ChemistryHookFunctions &_hooks, - uint32_t data_count, bool _with_interp, - bool _has_het_ids) - : key_count(key_indices.size()), data_count(data_count), - input_key_elements(key_indices), communicator(dht_comm), - key_species(key_species), output_names(_output_names), hooks(_hooks), - with_interp(_with_interp), has_het_ids(_has_het_ids) { - // initialize DHT object - // key size = count of key elements + timestep - uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement); - uint32_t data_size = - (data_count + (with_interp ? input_key_elements.size() : 0)) * - sizeof(double); - uint32_t buckets_per_process = - static_cast(dht_size / (data_size + key_size)); - dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size, - &poet::Murmur2_64A); + DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size, + const NamedVector &key_species, + const std::vector &key_indices, + const std::vector &_output_names, + const InitialList::ChemistryHookFunctions &_hooks, + uint32_t data_count, bool _with_interp, + bool _has_het_ids) + : key_count(key_indices.size()), data_count(data_count), + input_key_elements(key_indices), communicator(dht_comm), + key_species(key_species), output_names(_output_names), hooks(_hooks), + with_interp(_with_interp), has_het_ids(_has_het_ids) + { + // initialize DHT object + // key size = count of key elements + timestep + uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement); + uint32_t data_size = + (data_count + (with_interp ? input_key_elements.size() : 0)) * + sizeof(double); + uint32_t buckets_per_process = + static_cast(dht_size / (data_size + key_size)); + dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size, + &poet::Murmur2_64A); - dht_signif_vector = key_species.getValues(); + dht_signif_vector = key_species.getValues(); - // this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT); + // this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT); - this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT); + this->dht_prop_type_vector.resize(key_count, DHT_TYPE_DEFAULT); - const auto key_names = key_species.getNames(); + const auto key_names = key_species.getNames(); - auto tot_h = std::find(key_names.begin(), key_names.end(), "H"); - if (tot_h != key_names.end()) { - this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL; - } - auto tot_o = std::find(key_names.begin(), key_names.end(), "O"); - if (tot_o != key_names.end()) { - this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL; - } - auto charge = std::find(key_names.begin(), key_names.end(), "Charge"); - if (charge != key_names.end()) { - this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE; - } -} - -DHT_Wrapper::~DHT_Wrapper() { - // free DHT - DHT_free(dht_object, NULL, NULL); -} -auto DHT_Wrapper::checkDHT(WorkPackage &work_package) - -> const DHT_ResultObject & { - - const auto length = work_package.size; - - std::vector bucket_writer( - this->data_count + (with_interp ? input_key_elements.size() : 0)); - - // loop over every grid cell contained in work package - for (int i = 0; i < length; i++) { - // point to current grid cell - auto &key_vector = dht_results.keys[i]; - - // overwrite input with data from DHT, IF value is found in DHT - int res = - DHT_read(this->dht_object, key_vector.data(), bucket_writer.data()); - - switch (res) { - case DHT_SUCCESS: - work_package.output[i] = - (with_interp - ? inputAndRatesToOutput(bucket_writer, work_package.input[i]) - : bucket_writer); - work_package.mapping[i] = CHEM_DHT; - this->dht_hits++; - break; - case DHT_READ_MISS: - break; + auto tot_h = std::find(key_names.begin(), key_names.end(), "H"); + if (tot_h != key_names.end()) + { + this->dht_prop_type_vector[tot_h - key_names.begin()] = DHT_TYPE_TOTAL; + } + auto tot_o = std::find(key_names.begin(), key_names.end(), "O"); + if (tot_o != key_names.end()) + { + this->dht_prop_type_vector[tot_o - key_names.begin()] = DHT_TYPE_TOTAL; + } + auto charge = std::find(key_names.begin(), key_names.end(), "Charge"); + if (charge != key_names.end()) + { + this->dht_prop_type_vector[charge - key_names.begin()] = DHT_TYPE_CHARGE; } } - return dht_results; -} + DHT_Wrapper::~DHT_Wrapper() + { + // free DHT + DHT_free(dht_object, NULL, NULL); + } + auto DHT_Wrapper::checkDHT(WorkPackage &work_package) + -> const DHT_ResultObject & + { -void DHT_Wrapper::fillDHT(const WorkPackage &work_package) { + const auto length = work_package.size; - const auto length = work_package.size; + std::vector bucket_writer( + this->data_count + (with_interp ? input_key_elements.size() : 0)); - // loop over every grid cell contained in work package - dht_results.locations.resize(length); - dht_results.filledDHT = std::vector(length, false); - for (int i = 0; i < length; i++) { - // If true grid cell was simulated, needs to be inserted into dht - if (work_package.mapping[i] != CHEM_PQC) { - continue; - } + // loop over every grid cell contained in work package + for (int i = 0; i < length; i++) + { + // point to current grid cell + auto &key_vector = dht_results.keys[i]; - if (work_package.input[i][0] != 2) { - continue; - } + // overwrite input with data from DHT, IF value is found in DHT + int res = + DHT_read(this->dht_object, key_vector.data(), bucket_writer.data()); - // check if calcite or dolomite is absent and present, resp.n and vice - // versa in input/output. If this is the case -> Do not write to DHT! - // HACK: hardcoded, should be fixed! - if (hooks.dht_fill.isValid()) { - NamedVector old_values(output_names, work_package.input[i]); - NamedVector new_values(output_names, work_package.output[i]); - - if (Rcpp::as(hooks.dht_fill(old_values, new_values))) { - continue; + switch (res) + { + case DHT_SUCCESS: + work_package.output[i] = + (with_interp + ? inputAndRatesToOutput(bucket_writer, work_package.input[i]) + : bucket_writer); + work_package.mapping[i] = CHEM_DHT; + this->dht_hits++; + break; + case DHT_READ_MISS: + break; } } - uint32_t proc, index; - auto &key = dht_results.keys[i]; - const auto data = - (with_interp ? outputToInputAndRates(work_package.input[i], - work_package.output[i]) - : work_package.output[i]); - // void *data = (void *)&(work_package[i * this->data_count]); - // fuzz data (round, logarithm etc.) + return dht_results; + } - // insert simulated data with fuzzed key into DHT - int res = DHT_write(this->dht_object, key.data(), - const_cast(data.data()), &proc, &index); + void DHT_Wrapper::fillDHT(const WorkPackage &work_package) + { - dht_results.locations[i] = {proc, index}; + const auto length = work_package.size; - // if data was successfully written ... - if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) { - dht_evictions++; + // loop over every grid cell contained in work package + dht_results.locations.resize(length); + dht_results.filledDHT = std::vector(length, false); + for (int i = 0; i < length; i++) + { + // If true grid cell was simulated, needs to be inserted into dht + if (work_package.mapping[i] != CHEM_PQC) + { + continue; + } + + if (work_package.input[i][1] != 2) + { + continue; + } + + // check if calcite or dolomite is absent and present, resp.n and vice + // versa in input/output. If this is the case -> Do not write to DHT! + // HACK: hardcoded, should be fixed! + if (hooks.dht_fill.isValid()) + { + NamedVector old_values(output_names, work_package.input[i]); + NamedVector new_values(output_names, work_package.output[i]); + + if (Rcpp::as(hooks.dht_fill(old_values, new_values))) + { + continue; + } + } + + uint32_t proc, index; + auto &key = dht_results.keys[i]; + const auto data = + (with_interp ? outputToInputAndRates(work_package.input[i], + work_package.output[i]) + : work_package.output[i]); + // void *data = (void *)&(work_package[i * this->data_count]); + // fuzz data (round, logarithm etc.) + + // insert simulated data with fuzzed key into DHT + int res = DHT_write(this->dht_object, key.data(), + const_cast(data.data()), &proc, &index); + + dht_results.locations[i] = {proc, index}; + + // if data was successfully written ... + if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) + { + dht_evictions++; + } + + dht_results.filledDHT[i] = true; + } + } + + inline std::vector + DHT_Wrapper::outputToInputAndRates(const std::vector &old_results, + const std::vector &new_results) + { + const int prefix_size = this->input_key_elements.size(); + + std::vector output(prefix_size + this->data_count); + std::copy(new_results.begin(), new_results.end(), + output.begin() + prefix_size); + + for (int i = 0; i < prefix_size; i++) + { + const int data_elem_i = input_key_elements[i]; + output[i] = old_results[data_elem_i]; + output[prefix_size + data_elem_i] -= old_results[data_elem_i]; } - dht_results.filledDHT[i] = true; - } -} - -inline std::vector -DHT_Wrapper::outputToInputAndRates(const std::vector &old_results, - const std::vector &new_results) { - const int prefix_size = this->input_key_elements.size(); - - std::vector output(prefix_size + this->data_count); - std::copy(new_results.begin(), new_results.end(), - output.begin() + prefix_size); - - for (int i = 0; i < prefix_size; i++) { - const int data_elem_i = input_key_elements[i]; - output[i] = old_results[data_elem_i]; - output[prefix_size + data_elem_i] -= old_results[data_elem_i]; + return output; } - return output; -} + inline std::vector + DHT_Wrapper::inputAndRatesToOutput(const std::vector &dht_data, + const std::vector &input_values) + { + const int prefix_size = this->input_key_elements.size(); -inline std::vector -DHT_Wrapper::inputAndRatesToOutput(const std::vector &dht_data, - const std::vector &input_values) { - const int prefix_size = this->input_key_elements.size(); + std::vector output(input_values); - std::vector output(input_values); + for (int i = 0; i < prefix_size; i++) + { + const int data_elem_i = input_key_elements[i]; + output[data_elem_i] += dht_data[i]; + } - for (int i = 0; i < prefix_size; i++) { - const int data_elem_i = input_key_elements[i]; - output[data_elem_i] += dht_data[i]; + return output; } - return output; -} + inline std::vector + DHT_Wrapper::outputToRates(const std::vector &old_results, + const std::vector &new_results) + { + std::vector output(new_results); -inline std::vector -DHT_Wrapper::outputToRates(const std::vector &old_results, - const std::vector &new_results) { - std::vector output(new_results); + for (const auto &data_elem_i : input_key_elements) + { + output[data_elem_i] -= old_results[data_elem_i]; + } - for (const auto &data_elem_i : input_key_elements) { - output[data_elem_i] -= old_results[data_elem_i]; + return output; } - return output; -} + inline std::vector + DHT_Wrapper::ratesToOutput(const std::vector &dht_data, + const std::vector &input_values) + { + std::vector output(input_values); -inline std::vector -DHT_Wrapper::ratesToOutput(const std::vector &dht_data, - const std::vector &input_values) { - std::vector output(input_values); + for (const auto &data_elem_i : input_key_elements) + { + output[data_elem_i] += dht_data[data_elem_i]; + } - for (const auto &data_elem_i : input_key_elements) { - output[data_elem_i] += dht_data[data_elem_i]; + return output; } - return output; -} + // void DHT_Wrapper::resultsToWP(std::vector &work_package) { + // for (int i = 0; i < dht_results.length; i++) { + // if (!dht_results.needPhreeqc[i]) { + // std::copy(dht_results.results[i].begin(), dht_results.results[i].end(), + // work_package.begin() + (data_count * i)); + // } + // } + // } -// void DHT_Wrapper::resultsToWP(std::vector &work_package) { -// for (int i = 0; i < dht_results.length; i++) { -// if (!dht_results.needPhreeqc[i]) { -// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(), -// work_package.begin() + (data_count * i)); -// } -// } -// } - -int DHT_Wrapper::tableToFile(const char *filename) { - int res = DHT_to_file(dht_object, filename); - return res; -} - -int DHT_Wrapper::fileToTable(const char *filename) { - int res = DHT_from_file(dht_object, filename); - if (res != DHT_SUCCESS) + int DHT_Wrapper::tableToFile(const char *filename) + { + int res = DHT_to_file(dht_object, filename); return res; + } + + int DHT_Wrapper::fileToTable(const char *filename) + { + int res = DHT_from_file(dht_object, filename); + if (res != DHT_SUCCESS) + return res; #ifdef DHT_STATISTICS - DHT_print_statistics(dht_object); + DHT_print_statistics(dht_object); #endif - return DHT_SUCCESS; -} - -void DHT_Wrapper::printStatistics() { - int res; - - res = DHT_print_statistics(dht_object); - - if (res != DHT_SUCCESS) { - // MPI ERROR ... WHAT TO DO NOW? - // RUNNING CIRCLES WHILE SCREAMING + return DHT_SUCCESS; } -} -LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector &cell, - double dt) { - const auto c_zero_val = std::pow(10, AQUEOUS_EXP); + void DHT_Wrapper::printStatistics() + { + int res; - NamedVector input_nv(this->output_names, cell); + res = DHT_print_statistics(dht_object); - const std::vector eval_vec = - Rcpp::as>(hooks.dht_fuzz(input_nv)); - assert(eval_vec.size() == this->key_count); - LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0}); + if (res != DHT_SUCCESS) + { + // MPI ERROR ... WHAT TO DO NOW? + // RUNNING CIRCLES WHILE SCREAMING + } + } - DHT_Rounder rounder; + LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector &cell, + double dt) + { + const auto c_zero_val = std::pow(10, AQUEOUS_EXP); - int totals_i = 0; - // introduce fuzzing to allow more hits in DHT - // loop over every variable of grid cell - for (std::uint32_t i = 0; i < eval_vec.size(); i++) { - double curr_key = eval_vec[i]; - if (curr_key != 0) { - if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) { - curr_key -= base_totals[totals_i++]; + NamedVector input_nv(this->output_names, cell); + + const std::vector eval_vec = + Rcpp::as>(hooks.dht_fuzz(input_nv)); + assert(eval_vec.size() == this->key_count); + LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0}); + + DHT_Rounder rounder; + + int totals_i = 0; + // introduce fuzzing to allow more hits in DHT + // loop over every variable of grid cell + for (std::uint32_t i = 0; i < eval_vec.size(); i++) + { + double curr_key = eval_vec[i]; + if (curr_key != 0) + { + if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) + { + curr_key -= base_totals[totals_i++]; + } + vecFuzz[i] = + rounder.round(curr_key, dht_signif_vector[i], + this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL); } - vecFuzz[i] = - rounder.round(curr_key, dht_signif_vector[i], - this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL); } - } - // add timestep to the end of the key as double value - vecFuzz[this->key_count].fp_element = dt; - if (has_het_ids) { - vecFuzz[this->key_count + 1].fp_element = cell[0]; + // add timestep to the end of the key as double value + vecFuzz[this->key_count].fp_element = dt; + if (has_het_ids) + { + vecFuzz[this->key_count + 1].fp_element = cell[0]; + } + + return vecFuzz; } - return vecFuzz; -} + LookupKey DHT_Wrapper::fuzzForDHT(const std::vector &cell, double dt) + { + const auto c_zero_val = std::pow(10, AQUEOUS_EXP); -LookupKey DHT_Wrapper::fuzzForDHT(const std::vector &cell, double dt) { - const auto c_zero_val = std::pow(10, AQUEOUS_EXP); + LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0}); + DHT_Rounder rounder; - LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0}); - DHT_Rounder rounder; - - int totals_i = 0; - // introduce fuzzing to allow more hits in DHT - // loop over every variable of grid cell - for (std::uint32_t i = 0; i < input_key_elements.size(); i++) { - if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) { - continue; - } - double curr_key = cell[input_key_elements[i]]; - if (curr_key != 0) { - if (curr_key < c_zero_val && - this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) { + int totals_i = 0; + // introduce fuzzing to allow more hits in DHT + // loop over every variable of grid cell + for (std::uint32_t i = 0; i < input_key_elements.size(); i++) + { + if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) + { continue; } - if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) { - curr_key -= base_totals[totals_i++]; + double curr_key = cell[input_key_elements[i]]; + if (curr_key != 0) + { + if (curr_key < c_zero_val && + this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) + { + continue; + } + if (this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL) + { + curr_key -= base_totals[totals_i++]; + } + vecFuzz[i] = + rounder.round(curr_key, dht_signif_vector[i], + this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL); } - vecFuzz[i] = - rounder.round(curr_key, dht_signif_vector[i], - this->dht_prop_type_vector[i] == DHT_TYPE_TOTAL); } - } - // add timestep to the end of the key as double value - vecFuzz[this->key_count].fp_element = dt; - if (has_het_ids) { - vecFuzz[this->key_count + 1].fp_element = cell[0]; + // add timestep to the end of the key as double value + vecFuzz[this->key_count].fp_element = dt; + if (has_het_ids) + { + vecFuzz[this->key_count + 1].fp_element = cell[0]; + } + + return vecFuzz; } - return vecFuzz; -} + void poet::DHT_Wrapper::SetSignifVector(std::vector signif_vec) + { + if (signif_vec.size() != this->key_count) + { + throw std::runtime_error( + "Significant vector size mismatches count of key elements."); + } -void poet::DHT_Wrapper::SetSignifVector(std::vector signif_vec) { - if (signif_vec.size() != this->key_count) { - throw std::runtime_error( - "Significant vector size mismatches count of key elements."); + this->dht_signif_vector = signif_vec; } - - this->dht_signif_vector = signif_vec; -} } // namespace poet diff --git a/src/Chemistry/SurrogateModels/InterpolationModule.cpp b/src/Chemistry/SurrogateModels/InterpolationModule.cpp index db730d00d..e3b93c599 100644 --- a/src/Chemistry/SurrogateModels/InterpolationModule.cpp +++ b/src/Chemistry/SurrogateModels/InterpolationModule.cpp @@ -25,152 +25,175 @@ #include #include -extern "C" { +extern "C" +{ #include "DHT.h" } -namespace poet { +namespace poet +{ -InterpolationModule::InterpolationModule( - std::uint32_t entries_per_bucket, std::uint64_t size_per_process, - std::uint32_t min_entries_needed, DHT_Wrapper &dht, - const NamedVector &interp_key_signifs, - const std::vector &dht_key_indices, - const std::vector &_out_names, - const InitialList::ChemistryHookFunctions &_hooks) - : dht_instance(dht), key_signifs(interp_key_signifs), - key_indices(dht_key_indices), min_entries_needed(min_entries_needed), - dht_names(dht.getKeySpecies().getNames()), out_names(_out_names), - hooks(_hooks) { + InterpolationModule::InterpolationModule( + std::uint32_t entries_per_bucket, std::uint64_t size_per_process, + std::uint32_t min_entries_needed, DHT_Wrapper &dht, + const NamedVector &interp_key_signifs, + const std::vector &dht_key_indices, + const std::vector &_out_names, + const InitialList::ChemistryHookFunctions &_hooks) + : dht_instance(dht), key_signifs(interp_key_signifs), + key_indices(dht_key_indices), min_entries_needed(min_entries_needed), + dht_names(dht.getKeySpecies().getNames()), out_names(_out_names), + hooks(_hooks) + { - initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process, - dht.getCommunicator()); + initPHT(this->key_signifs.size(), entries_per_bucket, size_per_process, + dht.getCommunicator()); - pht->setSourceDHT(dht.getDHT()); -} + pht->setSourceDHT(dht.getDHT()); + } -void InterpolationModule::initPHT(std::uint32_t key_count, - std::uint32_t entries_per_bucket, - std::uint32_t size_per_process, - MPI_Comm communicator) { - uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double); - uint32_t data_size = sizeof(DHT_Location); + void InterpolationModule::initPHT(std::uint32_t key_count, + std::uint32_t entries_per_bucket, + std::uint32_t size_per_process, + MPI_Comm communicator) + { + uint32_t key_size = key_count * sizeof(Lookup_Keyelement) + sizeof(double); + uint32_t data_size = sizeof(DHT_Location); - pht = std::make_unique( - key_size, data_size, entries_per_bucket, size_per_process, communicator); -} + pht = std::make_unique( + key_size, data_size, entries_per_bucket, size_per_process, communicator); + } -void InterpolationModule::writePairs() { - const auto in = this->dht_instance.getDHTResults(); - for (int i = 0; i < in.filledDHT.size(); i++) { - if (in.filledDHT[i]) { - const auto coarse_key = roundKey(in.keys[i]); - pht->writeLocationToPHT(coarse_key, in.locations[i]); + void InterpolationModule::writePairs() + { + const auto in = this->dht_instance.getDHTResults(); + for (int i = 0; i < in.filledDHT.size(); i++) + { + if (in.filledDHT[i]) + { + const auto coarse_key = roundKey(in.keys[i]); + pht->writeLocationToPHT(coarse_key, in.locations[i]); + } } } -} -void InterpolationModule::tryInterpolation(WorkPackage &work_package) { - interp_result.status.resize(work_package.size, NOT_NEEDED); + void InterpolationModule::tryInterpolation(WorkPackage &work_package) + { + interp_result.status.resize(work_package.size, NOT_NEEDED); - const auto dht_results = this->dht_instance.getDHTResults(); + const auto dht_results = this->dht_instance.getDHTResults(); - for (int wp_i = 0; wp_i < work_package.size; wp_i++) { - if (work_package.input[wp_i][0] != 2) { - interp_result.status[wp_i] = INSUFFICIENT_DATA; - continue; - } - - if (work_package.mapping[wp_i] != CHEM_PQC) { - interp_result.status[wp_i] = NOT_NEEDED; - continue; - } - - const auto rounded_key = roundKey(dht_results.keys[wp_i]); - - auto pht_result = - pht->query(rounded_key, this->min_entries_needed, - dht_instance.getInputCount(), dht_instance.getOutputCount()); - - if (pht_result.size < this->min_entries_needed) { - interp_result.status[wp_i] = INSUFFICIENT_DATA; - continue; - } - - if (hooks.interp_pre.isValid()) { - NamedVector nv_in(this->out_names, work_package.input[wp_i]); - - std::vector rm_indices = Rcpp::as>( - hooks.interp_pre(nv_in, pht_result.in_values)); - - pht_result.size -= rm_indices.size(); - - if (pht_result.size < this->min_entries_needed) { + for (int wp_i = 0; wp_i < work_package.size; wp_i++) + { + if (work_package.input[wp_i][1] != 2) + { interp_result.status[wp_i] = INSUFFICIENT_DATA; continue; } - for (const auto &index : rm_indices) { - pht_result.in_values.erase( - std::next(pht_result.in_values.begin(), index - 1)); - pht_result.out_values.erase( - std::next(pht_result.out_values.begin(), index - 1)); + if (work_package.mapping[wp_i] != CHEM_PQC) + { + interp_result.status[wp_i] = NOT_NEEDED; + continue; } - } -#ifdef POET_PHT_ADD - this->pht->incrementReadCounter(roundKey(rounded_key)); -#endif + const auto rounded_key = roundKey(dht_results.keys[wp_i]); - const int cell_id = static_cast(work_package.input[wp_i][0]); + auto pht_result = + pht->query(rounded_key, this->min_entries_needed, + dht_instance.getInputCount(), dht_instance.getOutputCount()); - if (!to_calc_cache.contains(cell_id)) { - const std::vector &to_calc = dht_instance.getKeyElements(); - std::vector keep_indices; + if (pht_result.size < this->min_entries_needed) + { + interp_result.status[wp_i] = INSUFFICIENT_DATA; + continue; + } - for (std::size_t i = 0; i < to_calc.size(); i++) { - if (!std::isnan(work_package.input[wp_i][to_calc[i]])) { - keep_indices.push_back(to_calc[i]); + if (hooks.interp_pre.isValid()) + { + NamedVector nv_in(this->out_names, work_package.input[wp_i]); + + std::vector rm_indices = Rcpp::as>( + hooks.interp_pre(nv_in, pht_result.in_values)); + + pht_result.size -= rm_indices.size(); + + if (pht_result.size < this->min_entries_needed) + { + interp_result.status[wp_i] = INSUFFICIENT_DATA; + continue; + } + + for (const auto &index : rm_indices) + { + pht_result.in_values.erase( + std::next(pht_result.in_values.begin(), index - 1)); + pht_result.out_values.erase( + std::next(pht_result.out_values.begin(), index - 1)); } } - to_calc_cache[cell_id] = keep_indices; +#ifdef POET_PHT_ADD + this->pht->incrementReadCounter(roundKey(rounded_key)); +#endif + + const int cell_id = static_cast(work_package.input[wp_i][1]); + + if (!to_calc_cache.contains(cell_id)) + { + const std::vector &to_calc = dht_instance.getKeyElements(); + std::vector keep_indices; + + for (std::size_t i = 0; i < to_calc.size(); i++) + { + if (!std::isnan(work_package.input[wp_i][to_calc[i]])) + { + keep_indices.push_back(to_calc[i]); + } + } + + to_calc_cache[cell_id] = keep_indices; + } + + double start_fc = MPI_Wtime(); + + work_package.output[wp_i] = + f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i], + pht_result.in_values, pht_result.out_values); + + if (hooks.interp_post.isValid()) + { + NamedVector nv_result(this->out_names, work_package.output[wp_i]); + if (Rcpp::as(hooks.interp_post(nv_result))) + { + interp_result.status[wp_i] = INSUFFICIENT_DATA; + continue; + } + } + + // interp_result.results[i][0] = mean_water; + this->interp_t += MPI_Wtime() - start_fc; + + this->interpolations++; + + work_package.mapping[wp_i] = CHEM_INTERP; + interp_result.status[wp_i] = RES_OK; } + } - double start_fc = MPI_Wtime(); - - work_package.output[wp_i] = - f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i], - pht_result.in_values, pht_result.out_values); - - if (hooks.interp_post.isValid()) { - NamedVector nv_result(this->out_names, work_package.output[wp_i]); - if (Rcpp::as(hooks.interp_post(nv_result))) { - interp_result.status[wp_i] = INSUFFICIENT_DATA; - continue; + void InterpolationModule::resultsToWP(std::vector &work_package) const + { + for (uint32_t i = 0; i < interp_result.status.size(); i++) + { + if (interp_result.status[i] == RES_OK) + { + const std::size_t length = + interp_result.results[i].end() - interp_result.results[i].begin(); + std::copy(interp_result.results[i].begin(), + interp_result.results[i].end(), + work_package.begin() + (length * i)); } } - - // interp_result.results[i][0] = mean_water; - this->interp_t += MPI_Wtime() - start_fc; - - this->interpolations++; - - work_package.mapping[wp_i] = CHEM_INTERP; - interp_result.status[wp_i] = RES_OK; } -} - -void InterpolationModule::resultsToWP(std::vector &work_package) const { - for (uint32_t i = 0; i < interp_result.status.size(); i++) { - if (interp_result.status[i] == RES_OK) { - const std::size_t length = - interp_result.results[i].end() - interp_result.results[i].begin(); - std::copy(interp_result.results[i].begin(), - interp_result.results[i].end(), - work_package.begin() + (length * i)); - } - } -} } // namespace poet diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp index b354f986d..4406ec65d 100644 --- a/src/Chemistry/WorkerFunctions.cpp +++ b/src/Chemistry/WorkerFunctions.cpp @@ -69,9 +69,14 @@ namespace poet } case CHEM_INTERP: { - int interp_flag; + int interp_flag = 0; + int dht_fill_flag = 0; + ChemBCast(&interp_flag, 1, MPI_INT); + ChemBCast(&dht_fill_flag, 1, MPI_INT); + this->interp_enabled = (interp_flag == 1); + this->dht_fill_during_rollback = (dht_fill_flag == 1); break; } case CHEM_WORK_LOOP: @@ -150,13 +155,14 @@ namespace poet double dht_get_start, dht_get_end; double phreeqc_time_start, phreeqc_time_end; double dht_fill_start, dht_fill_end; + double ctrl_time_c, ctrl_time_d; uint32_t iteration; double dt; double current_sim_time; uint32_t wp_start_index; int count = double_count; - bool control_iteration_active = false; + bool control_logic_enabled = false; std::vector mpi_buffer(count); /* receive */ @@ -183,7 +189,7 @@ namespace poet // current work package start location in field wp_start_index = mpi_buffer[count + 4]; - control_iteration_active = (mpi_buffer[count + 5] == 1); + control_logic_enabled = (mpi_buffer[count + 5] == 1); for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) { @@ -229,7 +235,7 @@ namespace poet poet::WorkPackage s_curr_wp_control = s_curr_wp; - if (control_iteration_active) + if (control_logic_enabled) { for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) { @@ -240,12 +246,15 @@ namespace poet phreeqc_time_start = MPI_Wtime(); - WorkerRunWorkPackage(control_iteration_active ? s_curr_wp_control : s_curr_wp, current_sim_time, dt); + WorkerRunWorkPackage(control_logic_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time, dt); phreeqc_time_end = MPI_Wtime(); - if (control_iteration_active) - { + if (control_logic_enabled) + { + /* start time measurement for copying control workpackage */ + ctrl_time_c = MPI_Wtime(); + std::size_t sur_wp_offset = s_curr_wp.size * this->prop_count; mpi_buffer.resize(count + sur_wp_offset); @@ -275,6 +284,10 @@ namespace poet } count += sur_wp_offset; + + /* end time measurement for copying control workpackage */ + ctrl_time_d = MPI_Wtime(); + timings.ctrl_t += ctrl_time_d - ctrl_time_c; } else { @@ -288,14 +301,14 @@ namespace poet /* send results to master */ MPI_Request send_req; - int mpi_tag = control_iteration_active ? LOOP_CTRL : LOOP_WORK; + int mpi_tag = control_logic_enabled ? LOOP_CTRL : LOOP_WORK; MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req); - if (dht_enabled || interp_enabled) + if (dht_enabled || interp_enabled || dht_fill_during_rollback) { /* write results to DHT */ dht_fill_start = MPI_Wtime(); - dht->fillDHT(control_iteration_active ? s_curr_wp_control : s_curr_wp); + dht->fillDHT(control_logic_enabled ? s_curr_wp_control : s_curr_wp); dht_fill_end = MPI_Wtime(); if (interp_enabled) @@ -306,7 +319,6 @@ namespace poet } timings.phreeqc_t += phreeqc_time_end - phreeqc_time_start; - MPI_Wait(&send_req, MPI_STATUS_IGNORE); } @@ -460,6 +472,12 @@ namespace poet this->group_comm); break; } + case WORKER_CTRL_ITER: + { + MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, + this->group_comm); + break; + } case WORKER_DHT_GET: { MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, diff --git a/src/IO/StatsIO.cpp b/src/IO/StatsIO.cpp index 4312a46dd..5155ffd1f 100644 --- a/src/IO/StatsIO.cpp +++ b/src/IO/StatsIO.cpp @@ -2,10 +2,11 @@ #include #include #include +#include // for std::setw and std::setprecision namespace poet { - void writeStatsToCSV(const std::vector &all_stats, + void writeStatsToCSV(const std::vector &all_stats, const std::vector &species_names, const std::string &filename) { @@ -17,21 +18,32 @@ namespace poet } // header - out << "Iteration, Species, MAPE, RRSME \n"; + out << std::left << std::setw(15) << "Iteration" + << std::setw(15) << "Rollback" + << std::setw(15) << "Species" + << std::setw(15) << "MAPE" + << std::setw(15) << "RRSME" << "\n"; + out << std::string(75, '-') << "\n"; // separator line + + // data rows for (size_t i = 0; i < all_stats.size(); ++i) { for (size_t j = 0; j < species_names.size(); ++j) { - out << all_stats[i].iteration << ",\t" - << species_names[j] << ",\t" - << all_stats[i].mape[j] << ",\t" - << all_stats[i].rrsme[j] << "\n"; + out << std::left + << std::setw(15) << all_stats[i].iteration + << std::setw(15) << all_stats[i].rollback_count + << std::setw(15) << species_names[j] + << std::setw(15) << all_stats[i].mape[j] + << std::setw(15) << all_stats[i].rrmse[j] + << "\n"; } - out << std::endl; + out << "\n"; // blank line between iterations } out.close(); std::cout << "Stats written to " << filename << "\n"; } -} // namespace poet \ No newline at end of file +} + // namespace poet \ No newline at end of file diff --git a/src/IO/StatsIO.hpp b/src/IO/StatsIO.hpp index a865cc64a..a7fd1c606 100644 --- a/src/IO/StatsIO.hpp +++ b/src/IO/StatsIO.hpp @@ -3,7 +3,7 @@ namespace poet { - void writeStatsToCSV(const std::vector &all_stats, + void writeStatsToCSV(const std::vector &all_stats, const std::vector &species_names, const std::string &filename); } // namespace poet diff --git a/src/poet.cpp b/src/poet.cpp index 0f558b5d7..4cc308788 100644 --- a/src/poet.cpp +++ b/src/poet.cpp @@ -253,7 +253,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) try { - Rcpp::List init_params_(ReadRObj_R(init_file)); params.init_params = init_params_; @@ -266,14 +265,12 @@ int parseInitValues(int argc, char **argv, RuntimeParameters ¶ms) params.timesteps = Rcpp::as>(global_rt_setup->operator[]("timesteps")); - params.control_iteration = - Rcpp::as(global_rt_setup->operator[]("control_iteration")); - params.species_epsilon = - Rcpp::as>(global_rt_setup->operator[]("species_epsilon")); - params.penalty_iteration = - Rcpp::as(global_rt_setup->operator[]("penalty_iteration")); - params.max_penalty_iteration = - Rcpp::as(global_rt_setup->operator[]("max_penalty_iteration")); + params.control_interval = + Rcpp::as(global_rt_setup->operator[]("control_interval")); + params.checkpoint_interval = + Rcpp::as(global_rt_setup->operator[]("checkpoint_interval")); + params.mape_threshold = + Rcpp::as>(global_rt_setup->operator[]("mape_threshold")); } catch (const std::exception &e) { @@ -304,53 +301,38 @@ void call_master_iter_end(RInside &R, const Field &trans, const Field &chem) *global_rt_setup = R["setup"]; } -bool checkAndRollback(ChemistryModule &chem, RuntimeParameters ¶ms, uint32_t &iter) +bool triggerRollbackIfExceeded(ChemistryModule &chem, RuntimeParameters ¶ms, uint32_t ¤t_iteration) { - const std::vector &latest_mape = chem.error_stats_history.back().mape; + const std::vector &mape_values = chem.error_history.back().mape; - for (uint32_t j = 0; j < params.species_epsilon.size(); j++) + for (uint32_t i = 0; i < params.mape_threshold.size(); i++) { - if (params.species_epsilon[j] < latest_mape[j] && latest_mape[j] != 0) + // Skip if no meaningful MAPE value + if(mape_values[i] == 0){ + continue; + } + if (mape_values[i] > params.mape_threshold[i]) { - uint32_t rollback_iter = iter - (iter % params.control_iteration); + uint32_t rollback_iteration = ((current_iteration - 1) / params.checkpoint_interval) * params.checkpoint_interval; - std::cout << chem.getField().GetProps()[j] << " with a MAPE value of " << latest_mape[j] << " exceeds epsilon of " - << params.species_epsilon[j] << "! " << std::endl; + MSG("[THRESHOLD EXCEEDED] " + chem.getField().GetProps()[i] + " has MAPE = " + + std::to_string(mape_values[i]) + " exceeding threshold = " + std::to_string(params.mape_threshold[i]) + + " → rolling back to iteration " + std::to_string(rollback_iteration)); Checkpoint_s checkpoint_read{.field = chem.getField()}; - read_checkpoint("checkpoint" + std::to_string(rollback_iter) + ".hdf5", checkpoint_read); - iter = checkpoint_read.iteration; + read_checkpoint("checkpoint" + std::to_string(rollback_iteration) + ".hdf5", checkpoint_read); + current_iteration = checkpoint_read.iteration; + // Rollback happend return true; - } + } } - MSG("All spezies are below their threshold values"); + + MSG("All species are within their error thresholds."); return false; } -void updatePenaltyLogic(RuntimeParameters ¶ms, bool roolback_happend) -{ - if (roolback_happend) - { - params.rollback_simulation = true; - params.penalty_counter = params.penalty_iteration; - std::cout << "Penalty counter reset to: " << params.penalty_counter << std::endl; - MSG("Rollback! Penalty phase started for " + std::to_string(params.penalty_iteration) + " iterations."); - } - else - { - if (params.rollback_simulation && params.penalty_counter == 0) - { - params.rollback_simulation = false; - MSG("Penalty phase ended. Interpolation re-enabled."); - } - else if (!params.rollback_simulation) - { - params.penalty_iteration = std::min(params.penalty_iteration *= 2, params.max_penalty_iteration); - MSG("Stable surrogate phase detected. Penalty iteration doubled to " + std::to_string(params.penalty_iteration) + " iterations."); - } - } -} + static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, DiffusionModule &diffusion, @@ -367,21 +349,25 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, } R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps()); - params.next_penalty_check = params.penalty_iteration; - /* SIMULATION LOOP */ double dSimTime{0}; + double chkTime = 0.0; + for (uint32_t iter = 1; iter < maxiter + 1; iter++) { - // Penalty countdown - if (params.rollback_simulation && params.penalty_counter > 0) - { - params.penalty_counter--; - std::cout << "Penalty counter: " << params.penalty_counter << std::endl; + // Rollback countdowm + if (params.rollback_enabled) { + if (params.sur_disabled_counter > 0) { + --params.sur_disabled_counter; + MSG("Rollback counter: " + std::to_string(params.sur_disabled_counter)); + } else { + params.rollback_enabled = false; + } } - params.control_iteration_active = (iter % params.control_iteration == 0 /* && iter != 0 */); + params.global_iter = iter; + params.control_interval_enabled = (iter % params.control_interval == 0); double start_t = MPI_Wtime(); @@ -495,20 +481,27 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, MSG("End of *coupling* iteration " + std::to_string(iter) + "/" + std::to_string(maxiter)); - if (iter % params.control_iteration == 0) - { - writeStatsToCSV(chem.error_stats_history, chem.getField().GetProps(), "stats_overview"); + double chk_start = MPI_Wtime(); + + if(iter % params.checkpoint_interval == 0){ + MSG("Writing checkpoint of iteration " + std::to_string(iter)); write_checkpoint("checkpoint" + std::to_string(iter) + ".hdf5", {.field = chem.getField(), .iteration = iter}); } - if (iter == params.next_penalty_check) + if (params.control_interval_enabled && !params.rollback_enabled) { - bool roolback_happend = checkAndRollback(chem, params, iter); - updatePenaltyLogic(params, roolback_happend); + writeStatsToCSV(chem.error_history, chem.getField().GetProps(), "stats_overview"); - params.next_penalty_check = iter + params.penalty_iteration; + if(triggerRollbackIfExceeded(chem, params, iter)){ + params.rollback_enabled = true; + params.rollback_counter ++; + params.sur_disabled_counter = params.control_interval; + MSG("Interpolation disabled for the next " + std::to_string(params.control_interval) + "."); + } } + double chk_end = MPI_Wtime(); + chkTime += chk_end - chk_start; // MSG(); } // END SIMULATION LOOP @@ -526,6 +519,13 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, Rcpp::List diffusion_profiling; diffusion_profiling["simtime"] = diffusion.getTransportTime(); + Rcpp::List ctrl_profiling; + ctrl_profiling["checkpointing_time"] = chkTime; + ctrl_profiling["ctrl_logic_master"] = chem.GetMasterCtrlLogicTime(); + ctrl_profiling["bcast_ctrl_logic_master"] = chem.GetMasterCtrlBcastTime(); + ctrl_profiling["recv_ctrl_logic_maser"] = chem.GetMasterRecvCtrlLogicTime(); + ctrl_profiling["ctrl_logic_worker"] = Rcpp::wrap(chem.GetWorkerControlTimings()); + if (params.use_dht) { chem_profiling["dht_hits"] = Rcpp::wrap(chem.GetWorkerDHTHits()); @@ -554,6 +554,8 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, RuntimeParameters ¶ms, profiling["simtime"] = dSimTime; profiling["chemistry"] = chem_profiling; profiling["diffusion"] = diffusion_profiling; + profiling["ctrl_logic"] = ctrl_profiling; + chem.MasterLoopBreak(); diff --git a/src/poet.hpp.in b/src/poet.hpp.in index a5b82c150..6f9f0fabf 100644 --- a/src/poet.hpp.in +++ b/src/poet.hpp.in @@ -38,10 +38,10 @@ static const inline std::string ai_surrogate_r_library = R"(@R_AI_SURROGATE_LIB@)"; static const inline std::string r_runtime_parameters = "mysetup"; -struct RuntimeParameters { +struct RuntimeParameters +{ std::string out_dir; std::vector timesteps; - std::vector species_epsilon; Rcpp::List init_params; @@ -52,13 +52,15 @@ struct RuntimeParameters { bool print_progress = false; - std::uint32_t penalty_iteration = 0; - std::uint32_t max_penalty_iteration = 0; - std::uint32_t penalty_counter = 0; - std::uint32_t next_penalty_check = 0; - bool rollback_simulation = false; - bool control_iteration_active = false; - std::uint32_t control_iteration = 1; + bool rollback_enabled = false; + bool control_interval_enabled = false; + std::uint32_t global_iter = 0; + std::uint32_t sur_disabled_counter = 0; + std::uint32_t rollback_counter = 0; + std::uint32_t checkpoint_interval = 0; + std::uint32_t control_interval = 0; + std::vector mape_threshold; + std::vector rrmse_threshold; static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32; std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;