updates to ControlModule and Chemistry

This commit is contained in:
rastogi 2025-11-02 15:29:03 +01:00
parent 3fb6ca5209
commit c4da5ccfb8
3 changed files with 26 additions and 14 deletions

View File

@ -439,6 +439,8 @@ protected:
poet::ControlModule *control_module = nullptr; poet::ControlModule *control_module = nullptr;
std::vector<double> mpi_surr_buffer;
bool control_enabled{false}; bool control_enabled{false};
bool warmup_enabled{false}; bool warmup_enabled{false};

View File

@ -257,6 +257,8 @@ inline void poet::ChemistryModule::MasterSendPkgs(
/* note current processed work package in workerlist */ /* note current processed work package in workerlist */
w_list[p].send_addr = work_pointer.base(); w_list[p].send_addr = work_pointer.base();
w_list[p].surrogate_addr = sur_pointer.base(); w_list[p].surrogate_addr = sur_pointer.base();
// this->control_enabled ? sur_pointer.base() : w_list[p].surrogate_addr =
// nullptr;
/* push work pointer to next work package */ /* push work pointer to next work package */
const uint32_t end_of_wp = local_work_package_size * this->prop_count; const uint32_t end_of_wp = local_work_package_size * this->prop_count;
@ -354,6 +356,11 @@ inline void poet::ChemistryModule::MasterRecvPkgs(worker_list_t &w_list,
std::copy(recv_buffer.begin(), recv_buffer.begin() + half, std::copy(recv_buffer.begin(), recv_buffer.begin() + half,
w_list[p - 1].send_addr); w_list[p - 1].send_addr);
/*
if (w_list[p - 1].surrogate_addr == nullptr) {
throw std::runtime_error("MasterRecvPkgs: surrogate_addr is null");
}*/
std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size, std::copy(recv_buffer.begin() + (size / 2), recv_buffer.begin() + size,
w_list[p - 1].surrogate_addr); w_list[p - 1].surrogate_addr);
recv_ctrl_b = MPI_Wtime(); recv_ctrl_b = MPI_Wtime();
@ -443,6 +450,11 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
MPI_Barrier(this->group_comm); MPI_Barrier(this->group_comm);
this->control_enabled = this->control_module->getControlIntervalEnabled();
if (this->control_enabled) {
this->mpi_surr_buffer.assign(this->n_cells * this->prop_count, 0.0);
}
static uint32_t iteration = 0; static uint32_t iteration = 0;
/* start time measurement of sequential part */ /* start time measurement of sequential part */
@ -454,20 +466,16 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count, shuffleField(chem_field.AsVector(), this->n_cells, this->prop_count,
wp_sizes_vector.size()); wp_sizes_vector.size());
control_enabled = this->control_module->getControlIntervalEnabled() ? 1 : 0; //this->mpi_surr_buffer.resize(mpi_buffer.size());
std::vector<double> mpi_surr_buffer{mpi_buffer};
std::cout << "control_enabled is " << control_enabled << ", "
<< "warmup_enabled is " << warmup_enabled << ", "
<< "dht_enabled is " << dht_enabled << ", "
<< "interp_enabled is " << interp_enabled << std::endl;
/* setup local variables */ /* setup local variables */
pkg_to_send = wp_sizes_vector.size(); pkg_to_send = wp_sizes_vector.size();
pkg_to_recv = wp_sizes_vector.size(); pkg_to_recv = wp_sizes_vector.size();
workpointer_t work_pointer = mpi_buffer.begin(); workpointer_t work_pointer = mpi_buffer.begin();
workpointer_t sur_pointer = mpi_surr_buffer.begin(); workpointer_t sur_pointer = this->mpi_surr_buffer.begin();
//(this->control_enabled ? this->mpi_surr_buffer.begin()
// : mpi_buffer.end());
worker_list_t worker_list(this->comm_size - 1); worker_list_t worker_list(this->comm_size - 1);
free_workers = this->comm_size - 1; free_workers = this->comm_size - 1;
@ -515,13 +523,13 @@ void poet::ChemistryModule::MasterRunParallel(double dt) {
chem_field = out_vec; chem_field = out_vec;
/* do master stuff */ /* do master stuff */
if (control_enabled) { if (this->control_enabled) {
std::cout << "[Master] Control logic enabled for this iteration." std::cout << "[Master] Control logic enabled for this iteration."
<< std::endl; << std::endl;
std::vector<double> sur_unshuffled{mpi_surr_buffer}; std::vector<double> sur_unshuffled{mpi_surr_buffer};
shuf_a = MPI_Wtime(); shuf_a = MPI_Wtime();
unshuffleField(mpi_surr_buffer, this->n_cells, this->prop_count, unshuffleField(this->mpi_surr_buffer, this->n_cells, this->prop_count,
wp_sizes_vector.size(), sur_unshuffled); wp_sizes_vector.size(), sur_unshuffled);
shuf_b = MPI_Wtime(); shuf_b = MPI_Wtime();
this->shuf_t += shuf_b - shuf_a; this->shuf_t += shuf_b - shuf_a;

View File

@ -39,13 +39,13 @@ void poet::ControlModule::initiateWarmupPhase(bool dht_enabled,
chem->SetWarmupEnabled(true); chem->SetWarmupEnabled(true);
chem->SetDhtEnabled(false); chem->SetDhtEnabled(false);
chem->SetInterpEnabled(false); chem->SetInterpEnabled(false);
MSG("Warmup enabled until next control interval at iteration " + // MSG("Warmup enabled until next control interval at iteration " +
std::to_string(control_interval) + "."); // std::to_string(control_interval) + ".");
if (rollback_enabled) { if (rollback_enabled) {
if (sur_disabled_counter > 0) { if (sur_disabled_counter > 0) {
--sur_disabled_counter; --sur_disabled_counter;
MSG("Rollback counter: " + std::to_string(sur_disabled_counter)); //MSG("Rollback counter: " + std::to_string(sur_disabled_counter));
} else { } else {
rollback_enabled = false; rollback_enabled = false;
} }
@ -65,12 +65,14 @@ void poet::ControlModule::applyControlLogic(ChemistryModule &chem,
} }
writeCheckpointAndMetrics(chem, iter); writeCheckpointAndMetrics(chem, iter);
if (checkAndRollback(chem, iter) && rollback_count < 4) { if (checkAndRollback(chem, iter) /* && rollback_count < 4*/) {
rollback_enabled = true; rollback_enabled = true;
rollback_count++; rollback_count++;
sur_disabled_counter = control_interval; sur_disabled_counter = control_interval;
/*
MSG("Interpolation disabled for the next " + MSG("Interpolation disabled for the next " +
std::to_string(control_interval) + "."); std::to_string(control_interval) + ".");
*/
} }
} }