use better communicator creation

This commit is contained in:
Max Lübke 2021-01-28 15:16:35 +01:00
parent b96131fa71
commit b85575a69b

View File

@ -100,22 +100,13 @@ int main(int argc, char *argv[]) {
/*Create custom Communicator with all processes except 0 (the master) for DHT
* storage*/
// only needed if strategy == 0, but done anyway
MPI_Group dht_group, group_world;
MPI_Comm dht_comm;
int *process_ranks;
// make a list of processes in the new communicator
process_ranks = (int *)malloc(world_size * sizeof(int));
for (int I = 1; I < world_size; I++) process_ranks[I - 1] = I;
// get the group under MPI_COMM_WORLD
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
// create the new group
MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group);
// create the new communicator
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
free(process_ranks); // cleanup
if (world_rank == 0) {
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, world_rank, &dht_comm);
} else {
MPI_Comm_split(MPI_COMM_WORLD, 1, world_rank, &dht_comm);
}
// cout << "Done";
// if (cmdl[{"help", "h"}]) {
@ -430,188 +421,187 @@ int main(int argc, char *argv[]) {
master.run();
}
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
// cummul_transport += trans.getTransportTime();
// cummul_chemistry += master.getChemistryTime();
// cummul_transport += trans.getTransportTime();
// cummul_chemistry += master.getChemistryTime();
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
// master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
// master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));
// master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
// master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
} // END SIMULATION LOOP
cout << "CPP: finished simulation loop" << endl;
sim_end = MPI_Wtime();
cout << "CPP: start timing profiling" << endl;
R.parseEvalQ("profiling <- list()");
R["simtime"] = sim_end - sim_start;
R.parseEvalQ("profiling$simtime <- simtime");
trans.end();
if (params.world_size == 1) {
master.ChemSim::end();
} else {
master.end();
}
MPI_Barrier(MPI_COMM_WORLD);
// Rcpp::NumericVector phreeqc_time;
// Rcpp::NumericVector dht_get_time;
// Rcpp::NumericVector dht_fill_time;
// Rcpp::IntegerVector phreeqc_counts;
// Rcpp::NumericVector idle_worker;
} // END SIMULATION LOOP
// int phreeqc_tmp;
cout << "CPP: finished simulation loop" << endl;
// timings = (double *)calloc(3, sizeof(double));
sim_end = MPI_Wtime();
// int dht_hits = 0;
// int dht_miss = 0;
// int dht_collision = 0;
cout << "CPP: start timing profiling" << endl;
// if (params.dht_enabled) {
// dht_hits = 0;
// dht_miss = 0;
// dht_collision = 0;
// dht_perfs = (uint64_t *)calloc(3, sizeof(uint64_t));
// }
R.parseEvalQ("profiling <- list()");
// double idle_worker_tmp;
R["simtime"] = sim_end - sim_start;
R.parseEvalQ("profiling$simtime <- simtime");
// for (int p = 0; p < params.world_size - 1; p++) {
// /* ATTENTION Worker p has rank p+1 */
// /* Send termination message to worker */
// MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
trans.end();
// MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
if (params.world_size == 1) {
master.ChemSim::end();
} else {
master.end();
// MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
// MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
// if (params.dht_enabled) {
// dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
// dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
// MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p + 1, TAG_DHT_PERF,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// dht_hits += dht_perfs[0];
// dht_miss += dht_perfs[1];
// dht_collision += dht_perfs[2];
// }
// }
// R.parseEvalQ("profiling <- list()");
// R["simtime"] = sim_end - sim_start;
// R.parseEvalQ("profiling$simtime <- simtime");
// R["simtime_transport"] = cummul_transport;
// R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["simtime_chemistry"] = cummul_chemistry;
// R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
// R["simtime_workers"] = master.getWorkerTime();
// R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
// R["simtime_chemistry_master"] = master.getChemMasterTime();
// R.parseEvalQ(
// "profiling$simtime_chemistry_master <- simtime_chemistry_master");
// R["seq_master"] = cummul_master_seq;
// R.parseEvalQ("profiling$seq_master <- seq_master");
// // R["master_send"] = master_send;
// // R.parseEvalQ("profiling$master_send <- master_send");
// // R["master_recv"] = master_recv;
// // R.parseEvalQ("profiling$master_recv <- master_recv");
// R["idle_master"] = master.getIdleTime();
// R.parseEvalQ("profiling$idle_master <- idle_master");
// R["idle_worker"] = idle_worker;
// R.parseEvalQ("profiling$idle_worker <- idle_worker");
// R["phreeqc_time"] = phreeqc_time;
// R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
// R["phreeqc_count"] = phreeqc_counts;
// R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
// if (params.dht_enabled) {
// R["dht_hits"] = dht_hits;
// R.parseEvalQ("profiling$dht_hits <- dht_hits");
// R["dht_miss"] = dht_miss;
// R.parseEvalQ("profiling$dht_miss <- dht_miss");
// R["dht_collision"] = dht_collision;
// R.parseEvalQ("profiling$dht_collisions <- dht_collision");
// R["dht_get_time"] = dht_get_time;
// R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
// R["dht_fill_time"] = dht_fill_time;
// R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
// }
// free(timings);
// if (params.dht_enabled) free(dht_perfs);
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.out_dir << "/timings.rds>" << endl;
/*exporting results and profiling data*/
// std::string r_vis_code;
// r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
// R.parseEval(r_vis_code);
} else { /*This is executed by the workers*/
ChemWorker worker(&params, R, grid, dht_comm);
// worker.prepareSimulation(dht_comm);
worker.loop();
}
// Rcpp::NumericVector phreeqc_time;
// Rcpp::NumericVector dht_get_time;
// Rcpp::NumericVector dht_fill_time;
// Rcpp::IntegerVector phreeqc_counts;
// Rcpp::NumericVector idle_worker;
// int phreeqc_tmp;
// timings = (double *)calloc(3, sizeof(double));
// int dht_hits = 0;
// int dht_miss = 0;
// int dht_collision = 0;
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
// if (params.dht_enabled) {
// dht_hits = 0;
// dht_miss = 0;
// dht_collision = 0;
// dht_perfs = (uint64_t *)calloc(3, sizeof(uint64_t));
// }
// double idle_worker_tmp;
// for (int p = 0; p < params.world_size - 1; p++) {
// /* ATTENTION Worker p has rank p+1 */
// /* Send termination message to worker */
// MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
// MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
// MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
// MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
// if (params.dht_enabled) {
// dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
// dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
// MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p + 1, TAG_DHT_PERF,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// dht_hits += dht_perfs[0];
// dht_miss += dht_perfs[1];
// dht_collision += dht_perfs[2];
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// DHT_free(dht_object, NULL, NULL);
// }
// } else {
// DHT_free(dht_object, NULL, NULL);
// }
// }
// R.parseEvalQ("profiling <- list()");
// free(mpi_buffer);
MPI_Finalize();
// R["simtime"] = sim_end - sim_start;
// R.parseEvalQ("profiling$simtime <- simtime");
// R["simtime_transport"] = cummul_transport;
// R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["simtime_chemistry"] = cummul_chemistry;
// R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
// R["simtime_workers"] = master.getWorkerTime();
// R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
// R["simtime_chemistry_master"] = master.getChemMasterTime();
// R.parseEvalQ(
// "profiling$simtime_chemistry_master <- simtime_chemistry_master");
if (params.world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
// R["seq_master"] = cummul_master_seq;
// R.parseEvalQ("profiling$seq_master <- seq_master");
// // R["master_send"] = master_send;
// // R.parseEvalQ("profiling$master_send <- master_send");
// // R["master_recv"] = master_recv;
// // R.parseEvalQ("profiling$master_recv <- master_recv");
// R["idle_master"] = master.getIdleTime();
// R.parseEvalQ("profiling$idle_master <- idle_master");
// R["idle_worker"] = idle_worker;
// R.parseEvalQ("profiling$idle_worker <- idle_worker");
// R["phreeqc_time"] = phreeqc_time;
// R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
// R["phreeqc_count"] = phreeqc_counts;
// R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
// if (params.dht_enabled) {
// R["dht_hits"] = dht_hits;
// R.parseEvalQ("profiling$dht_hits <- dht_hits");
// R["dht_miss"] = dht_miss;
// R.parseEvalQ("profiling$dht_miss <- dht_miss");
// R["dht_collision"] = dht_collision;
// R.parseEvalQ("profiling$dht_collisions <- dht_collision");
// R["dht_get_time"] = dht_get_time;
// R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
// R["dht_fill_time"] = dht_fill_time;
// R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
// }
// free(timings);
// if (params.dht_enabled) free(dht_perfs);
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <" << params.out_dir
<< "/timings.rds>" << endl;
/*exporting results and profiling data*/
// std::string r_vis_code;
// r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
// R.parseEval(r_vis_code);
}
else { /*This is executed by the workers*/
ChemWorker worker(&params, R, grid, dht_comm);
// worker.prepareSimulation(dht_comm);
worker.loop();
}
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
// if (params.dht_enabled) {
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// DHT_free(dht_object, NULL, NULL);
// }
// } else {
// DHT_free(dht_object, NULL, NULL);
// }
// }
// free(mpi_buffer);
MPI_Finalize();
if (params.world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
exit(0);
exit(0);
}