Added Cl threshold in WorkerDoWork()

This commit is contained in:
rastogi 2025-12-09 16:54:04 +01:00
parent be5b42392a
commit 36b6f8d859
5 changed files with 54 additions and 57 deletions

View File

@ -4,6 +4,8 @@ SOLUTION 1
temperature 25 temperature 25
pH 7 pH 7
pe 4 pe 4
Mg 1e-12
Cl 2e-12
PURE 1 PURE 1
Calcite 0.0 1 Calcite 0.0 1
END END

View File

@ -6,7 +6,7 @@ mape_threshold <- rep(0.0035, 13)
mape_threshold[5] <- 1 #Charge mape_threshold[5] <- 1 #Charge
zero_abs <- 1e-13 zero_abs <- 1e-13
rb_limit <- 3 rb_limit <- 3
rb_interval_limit <- 100 rb_interval_limit <- 200
#ctrl_cell_ids <- seq(0, (400*400)/2 - 1, by = 401) #ctrl_cell_ids <- seq(0, (400*400)/2 - 1, by = 401)
#out_save <- seq(500, iterations, by = 500) #out_save <- seq(500, iterations, by = 500)

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
#SBATCH --job-name=p1_eps0035_r1 #SBATCH --job-name=p1_eps0035_200
#SBATCH --output=p1_eps0035_r2_%j.out #SBATCH --output=p1_eps0035_200_%j.out
#SBATCH --error=p1_eps0035_r2_%j.err #SBATCH --error=p1_eps0035_200_%j.err
#SBATCH --partition=long #SBATCH --partition=long
#SBATCH --nodes=6 #SBATCH --nodes=6
#SBATCH --ntasks-per-node=24 #SBATCH --ntasks-per-node=24
@ -15,5 +15,5 @@ module purge
module load cmake gcc openmpi module load cmake gcc openmpi
#mpirun -n 144 ./poet dolo_fgcs_3.R dolo_fgcs_3.qs2 dolo_only_pqc #mpirun -n 144 ./poet dolo_fgcs_3.R dolo_fgcs_3.qs2 dolo_only_pqc
mpirun -n 144 ./poet --interp dolo_fgcs_3_rt.R dolo_fgcs_3.qs2 p1_eps0035_r2 mpirun -n 144 ./poet --interp dolo_fgcs_3_rt.R dolo_fgcs_3.qs2 p1_eps0035_200
#mpirun -n 144 ./poet --interp barite_fgcs_4_new/barite_fgcs_4_new_rt.R barite_fgcs_4_new/barite_fgcs_4_new.qs2 barite #mpirun -n 144 ./poet --interp barite_fgcs_4_new/barite_fgcs_4_new_rt.R barite_fgcs_4_new/barite_fgcs_4_new.qs2 barite

View File

@ -6,7 +6,7 @@
namespace poet { namespace poet {
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_TOTAL }; enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_TOTAL };
enum CHEMISTRY_OUT_SOURCE { CHEM_PQC, CHEM_DHT, CHEM_INTERP, CHEM_AISURR }; enum CHEMISTRY_OUT_SOURCE { CHEM_PQC, CHEM_DHT, CHEM_INTERP, CHEM_AISURR, CHEM_SKIP };
struct WorkPackage { struct WorkPackage {
std::size_t size; std::size_t size;

View File

@ -48,15 +48,14 @@ void poet::ChemistryModule::WorkerLoop() {
case CHEM_FIELD_INIT: { case CHEM_FIELD_INIT: {
ChemBCast(&this->prop_count, 1, MPI_UINT32_T); ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
if (this->ai_surrogate_enabled) { if (this->ai_surrogate_enabled) {
this->ai_surrogate_validity_vector.resize( this->ai_surrogate_validity_vector.resize(this->n_cells); // resize statt reserve?
this->n_cells); // resize statt reserve?
} }
break; break;
} }
case CHEM_AI_BCAST_VALIDITY: { case CHEM_AI_BCAST_VALIDITY: {
// Receive the index vector of valid ai surrogate predictions // Receive the index vector of valid ai surrogate predictions
MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, MPI_INT, 0,
MPI_INT, 0, this->group_comm); this->group_comm);
break; break;
} }
case CHEM_CTRL_ENABLE: { case CHEM_CTRL_ENABLE: {
@ -130,17 +129,16 @@ void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
} }
} }
void poet::ChemistryModule::copyPkgs(const WorkPackage &wp, void poet::ChemistryModule::copyPkgs(const WorkPackage &wp,
std::vector<double> &mpi_buffer, std::vector<double> &mpi_buffer,
std::size_t offset) { std::size_t offset) {
for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) { for (std::size_t wp_i = 0; wp_i < wp.size; wp_i++) {
std::copy(wp.output[wp_i].begin(), wp.output[wp_i].end(), std::copy(wp.output[wp_i].begin(), wp.output[wp_i].end(),
mpi_buffer.begin() + offset + this->prop_count * wp_i); mpi_buffer.begin() + offset + this->prop_count * wp_i);
} }
} }
void poet::ChemistryModule::copyCtrlPkgs(const WorkPackage &pqc_wp, void poet::ChemistryModule::copyCtrlPkgs(const WorkPackage &pqc_wp,
const WorkPackage &surr_wp, const WorkPackage &surr_wp,
std::vector<double> &mpi_buffer, std::vector<double> &mpi_buffer, int &count) {
int &count) {
std::size_t wp_offset = surr_wp.size * this->prop_count; std::size_t wp_offset = surr_wp.size * this->prop_count;
mpi_buffer.resize(count + wp_offset); mpi_buffer.resize(count + wp_offset);
@ -162,8 +160,7 @@ void poet::ChemistryModule::copyCtrlPkgs(const WorkPackage &pqc_wp,
count += wp_offset; count += wp_offset;
} }
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status, int double_count,
int double_count,
struct worker_s &timings) { struct worker_s &timings) {
static int counter = 1; static int counter = 1;
@ -180,6 +177,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
int flags; int flags;
std::vector<double> mpi_buffer(count); std::vector<double> mpi_buffer(count);
const int CL_INDEX = 7;
const double CL_THRESHOLD = 1e-10;
/* receive */ /* receive */
MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm, MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
@ -216,6 +216,16 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
mpi_buffer.begin() + this->prop_count * (wp_i + 1)); mpi_buffer.begin() + this->prop_count * (wp_i + 1));
} }
/* skip simulation of cells cells where Cl concentration is below threshold */
/*
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
if (s_curr_wp.input[wp_i][CL_INDEX] < CL_THRESHOLD) {
s_curr_wp.mapping[wp_i] = CHEM_SKIP;
s_curr_wp.output[wp_i] = s_curr_wp.input[wp_i];
}
}
*/
// std::cout << this->comm_rank << ":" << counter++ << std::endl; // std::cout << this->comm_rank << ":" << counter++ << std::endl;
if (dht_enabled || interp_enabled || stab_enabled) { if (dht_enabled || interp_enabled || stab_enabled) {
dht->prepareKeys(s_curr_wp.input, dt); dht->prepareKeys(s_curr_wp.input, dt);
@ -250,8 +260,7 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
if (ctrl_enabled) { if (ctrl_enabled) {
ctrl_cp_start = MPI_Wtime(); ctrl_cp_start = MPI_Wtime();
for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) { for (std::size_t wp_i = 0; wp_i < s_curr_wp_control.size; wp_i++) {
s_curr_wp_control.output[wp_i] = s_curr_wp_control.output[wp_i] = std::vector<double>(this->prop_count, 0.0);
std::vector<double>(this->prop_count, 0.0);
s_curr_wp_control.mapping[wp_i] = CHEM_PQC; s_curr_wp_control.mapping[wp_i] = CHEM_PQC;
} }
ctrl_cp_end = MPI_Wtime(); ctrl_cp_end = MPI_Wtime();
@ -260,8 +269,8 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
WorkerRunWorkPackage(ctrl_enabled ? s_curr_wp_control : s_curr_wp, WorkerRunWorkPackage(ctrl_enabled ? s_curr_wp_control : s_curr_wp, current_sim_time,
current_sim_time, dt); dt);
phreeqc_time_end = MPI_Wtime(); phreeqc_time_end = MPI_Wtime();
@ -278,8 +287,7 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
MPI_Request send_req; MPI_Request send_req;
int mpi_tag = ctrl_enabled ? LOOP_CTRL : LOOP_WORK; int mpi_tag = ctrl_enabled ? LOOP_CTRL : LOOP_WORK;
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, mpi_tag, MPI_COMM_WORLD, &send_req);
&send_req);
if (dht_enabled || interp_enabled || stab_enabled) { if (dht_enabled || interp_enabled || stab_enabled) {
/* write results to DHT */ /* write results to DHT */
@ -297,19 +305,18 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
MPI_Wait(&send_req, MPI_STATUS_IGNORE); MPI_Wait(&send_req, MPI_STATUS_IGNORE);
} }
void poet::ChemistryModule::WorkerPostIter(MPI_Status &probe_status, void poet::ChemistryModule::WorkerPostIter(MPI_Status &probe_status, uint32_t iteration) {
uint32_t iteration) {
int size, flush = 0; int size, flush = 0;
MPI_Get_count(&probe_status, MPI_INT, &size); MPI_Get_count(&probe_status, MPI_INT, &size);
if (size == 1) { if (size == 1) {
MPI_Recv(&flush, size, MPI_INT, probe_status.MPI_SOURCE, LOOP_END, MPI_Recv(&flush, size, MPI_INT, probe_status.MPI_SOURCE, LOOP_END, this->group_comm,
this->group_comm, MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
} else { } else {
MPI_Recv(NULL, 0, MPI_INT, probe_status.MPI_SOURCE, LOOP_END, MPI_Recv(NULL, 0, MPI_INT, probe_status.MPI_SOURCE, LOOP_END, this->group_comm,
this->group_comm, MPI_STATUS_IGNORE); MPI_STATUS_IGNORE);
} }
if (this->dht_enabled) { if (this->dht_enabled) {
@ -333,8 +340,7 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &probe_status,
interp->dumpPHTState(out.str()); interp->dumpPHTState(out.str());
} }
const auto max_mean_idx = const auto max_mean_idx = DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
if (max_mean_idx >= 2 || flush) { if (max_mean_idx >= 2 || flush) {
DHT_flush(this->interp->getDHTObject()); DHT_flush(this->interp->getDHTObject());
@ -366,21 +372,17 @@ void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
<< std::setw(this->file_pad) << iteration << ".dht"; << std::setw(this->file_pad) << iteration << ".dht";
int res = dht->tableToFile(out.str().c_str()); int res = dht->tableToFile(out.str().c_str());
if (res != DHT_SUCCESS && this->comm_rank == 2) if (res != DHT_SUCCESS && this->comm_rank == 2)
std::cerr std::cerr << "CPP: Worker: Error in writing current state of DHT to file.\n";
<< "CPP: Worker: Error in writing current state of DHT to file.\n";
else if (this->comm_rank == 2) else if (this->comm_rank == 2)
std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() std::cout << "CPP: Worker: Successfully written DHT to file " << out.str() << "\n";
<< "\n";
} }
void poet::ChemistryModule::WorkerReadDHTDump( void poet::ChemistryModule::WorkerReadDHTDump(const std::string &dht_input_file) {
const std::string &dht_input_file) {
int res = dht->fileToTable((char *)dht_input_file.c_str()); int res = dht->fileToTable((char *)dht_input_file.c_str());
if (res != DHT_SUCCESS) { if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) { if (res == DHT_WRONG_FILE) {
if (this->comm_rank == 1) if (this->comm_rank == 1)
std::cerr std::cerr << "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
<< "CPP: Worker: Wrong file layout! Continue with empty DHT ...\n";
} else { } else {
if (this->comm_rank == 1) if (this->comm_rank == 1)
std::cerr << "CPP: Worker: Error in loading current state of DHT from " std::cerr << "CPP: Worker: Error in loading current state of DHT from "
@ -394,8 +396,7 @@ void poet::ChemistryModule::WorkerReadDHTDump(
} }
void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package, void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
double dSimTime, double dSimTime, double dTimestep) {
double dTimestep) {
std::vector<std::vector<double>> inout_chem = work_package.input; std::vector<std::vector<double>> inout_chem = work_package.input;
std::vector<std::size_t> to_ignore; std::vector<std::size_t> to_ignore;
@ -406,8 +407,7 @@ void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
} }
// HACK: remove the first element (cell_id) before sending to phreeqc // HACK: remove the first element (cell_id) before sending to phreeqc
inout_chem[wp_id].erase(inout_chem[wp_id].begin(), inout_chem[wp_id].erase(inout_chem[wp_id].begin(), inout_chem[wp_id].begin() + 1);
inout_chem[wp_id].begin() + 1);
} }
this->pqc_runner->run(inout_chem, dTimestep, to_ignore); this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
@ -423,8 +423,7 @@ void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
} }
} }
void poet::ChemistryModule::WorkerPerfToMaster(int type, void poet::ChemistryModule::WorkerPerfToMaster(int type, const struct worker_s &timings) {
const struct worker_s &timings) {
switch (type) { switch (type) {
case WORKER_PHREEQC: { case WORKER_PHREEQC: {
MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, MPI_Gather(&timings.phreeqc_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
@ -432,13 +431,11 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
break; break;
} }
case WORKER_CTRL_ITER: { case WORKER_CTRL_ITER: {
MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, MPI_Gather(&timings.ctrl_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
this->group_comm);
break; break;
} }
case WORKER_DHT_GET: { case WORKER_DHT_GET: {
MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, MPI_Gather(&timings.dht_get, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
this->group_comm);
break; break;
} }
case WORKER_DHT_FILL: { case WORKER_DHT_FILL: {
@ -447,8 +444,7 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
break; break;
} }
case WORKER_IDLE: { case WORKER_IDLE: {
MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, MPI_Gather(&timings.idle_t, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
this->group_comm);
break; break;
} }
case WORKER_IP_WRITE: { case WORKER_IP_WRITE: {
@ -484,15 +480,14 @@ void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
MPI_Comm &group_comm = this->group_comm; MPI_Comm &group_comm = this->group_comm;
auto reduce_and_send = [&worker_rank, &worker_comm, &group_comm]( auto reduce_and_send = [&worker_rank, &worker_comm,
std::vector<std::uint32_t> &send_buffer, int tag) { &group_comm](std::vector<std::uint32_t> &send_buffer, int tag) {
std::vector<uint32_t> to_master(send_buffer.size()); std::vector<uint32_t> to_master(send_buffer.size());
MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), MPI_Reduce(send_buffer.data(), to_master.data(), send_buffer.size(), MPI_UINT32_T,
MPI_UINT32_T, MPI_SUM, 0, worker_comm); MPI_SUM, 0, worker_comm);
if (worker_rank == 0) { if (worker_rank == 0) {
MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, MPI_Send(to_master.data(), to_master.size(), MPI_UINT32_T, 0, tag, group_comm);
group_comm);
} }
}; };