Update build process to use external library for DHT_MPI too

This commit is contained in:
Max Lübke 2024-03-07 13:58:04 +01:00
parent 7828b00268
commit 2c17604882
9 changed files with 135 additions and 112 deletions

View File

@ -31,6 +31,9 @@ set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
add_subdirectory(ext/tug EXCLUDE_FROM_ALL)
add_subdirectory(ext/phreeqcrm EXCLUDE_FROM_ALL)
option(DHT_WITH_MPI "" ON)
mark_as_advanced(DHT_WITH_MPI)
add_subdirectory(ext/DHT EXCLUDE_FROM_ALL)
option(POET_ENABLE_TESTING "Build test suite for POET" OFF)

@ -1 +1 @@
Subproject commit cdc7ebb0e76ebe3dae2ff7c2cd6ae2c6a7394980
Subproject commit f0a1924195a093c5f56753ca424179106f9a4d82

View File

@ -6,8 +6,8 @@ add_library(poetlib
Chemistry/WorkerFunctions.cpp
Chemistry/SurrogateModels/DHT_Wrapper.cpp
Chemistry/SurrogateModels/HashFunctions.cpp
Chemistry/SurrogateModels/InterpolationModule.cpp
Chemistry/SurrogateModels/ProximityHashTable.cpp
# Chemistry/SurrogateModels/InterpolationModule.cpp
# Chemistry/SurrogateModels/ProximityHashTable.cpp
DataStructures/Field.cpp
Transport/DiffusionModule.cpp
Transport/AdvectionModule.cpp
@ -19,10 +19,18 @@ target_link_libraries(poetlib PUBLIC
RRuntime
PhreeqcRM
tug
DHT_UCX
OpenSSL::Crypto
)
option(POET_USE_DHT_MPI "Use MPI for DHT" OFF)
if (NOT POET_USE_DHT_MPI)
target_compile_definitions(poetlib PUBLIC POET_DHT_UCX)
target_link_libraries(poetlib PUBLIC DHT_UCX)
else()
target_link_libraries(poetlib PUBLIC DHT_MPI)
endif()
target_compile_definitions(poetlib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)

View File

@ -1,7 +1,7 @@
#include "ChemistryModule.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp"
// #include "SurrogateModels/Interpolation.hpp"
#include <PhreeqcRM.h>
@ -338,12 +338,12 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
this->dht_enabled = this->params.use_dht;
if (this->params.use_interp) {
initializeInterp(this->params.pht_max_entries, this->params.pht_size,
this->params.interp_min_entries,
this->params.pht_signifs);
this->interp_enabled = this->params.use_interp;
}
// if (this->params.use_interp) {
// initializeInterp(this->params.pht_max_entries, this->params.pht_size,
// this->params.interp_min_entries,
// this->params.pht_signifs);
// this->interp_enabled = this->params.use_interp;
// }
}
}
@ -453,47 +453,47 @@ void poet::ChemistryModule::setDHTReadFile(const std::string &input_file) {
}
}
void poet::ChemistryModule::initializeInterp(
std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t min_entries,
const NamedVector<std::uint32_t> &key_species) {
// void poet::ChemistryModule::initializeInterp(
// std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t
// min_entries, const NamedVector<std::uint32_t> &key_species) {
if (!this->is_master) {
// if (!this->is_master) {
constexpr uint32_t MB_FACTOR = 1E6;
// constexpr uint32_t MB_FACTOR = 1E6;
assert(this->dht);
// assert(this->dht);
this->interp_enabled = true;
// this->interp_enabled = true;
auto map_copy = key_species;
// auto map_copy = key_species;
if (key_species.empty()) {
map_copy = this->dht->getKeySpecies();
for (std::size_t i = 0; i < map_copy.size(); i++) {
const std::uint32_t signif =
map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF
? InterpolationModule::COARSE_DIFF
: 0);
map_copy[i] = signif;
}
}
// if (key_species.empty()) {
// map_copy = this->dht->getKeySpecies();
// for (std::size_t i = 0; i < map_copy.size(); i++) {
// const std::uint32_t signif =
// map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF
// ? InterpolationModule::COARSE_DIFF
// : 0);
// map_copy[i] = signif;
// }
// }
auto key_indices =
parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames());
// auto key_indices =
// parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames());
if (this->interp) {
this->interp.reset();
}
// if (this->interp) {
// this->interp.reset();
// }
const uint64_t pht_size = size_mb * MB_FACTOR;
// const uint64_t pht_size = size_mb * MB_FACTOR;
interp = std::make_unique<poet::InterpolationModule>(
bucket_size, pht_size, min_entries, *(this->dht), map_copy, key_indices,
this->prop_names, this->params.hooks);
// interp = std::make_unique<poet::InterpolationModule>(
// bucket_size, pht_size, min_entries, *(this->dht), map_copy,
// key_indices, this->prop_names, this->params.hooks);
interp->setInterpolationFunction(inverseDistanceWeighting);
}
}
// interp->setInterpolationFunction(inverseDistanceWeighting);
// }
// }
std::vector<double>
poet::ChemistryModule::shuffleField(const std::vector<double> &in_field,

View File

@ -6,7 +6,7 @@
#include "../Base/SimParams.hpp"
#include "../DataStructures/DataStructures.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp"
// #include "SurrogateModels/Interpolation.hpp"
#include <IrmResult.h>
#include <PhreeqcRM.h>
@ -354,7 +354,7 @@ protected:
poet::DHT_Wrapper *dht = nullptr;
bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp;
// std::unique_ptr<poet::InterpolationModule> interp;
static constexpr uint32_t BUFFER_OFFSET = 5;

View File

@ -21,7 +21,6 @@
*/
#include "DHT_Wrapper.hpp"
#include "DHT_ucx/UCX_bcast_functions.h"
#include "HashFunctions.hpp"
#include <algorithm>
@ -30,7 +29,6 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <mpi.h>
#include <stdexcept>
#include <vector>
@ -52,12 +50,14 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
// initialize DHT object
// key size = count of key elements + timestep
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
uint32_t data_size =
(data_count + (with_interp ? input_key_elements.size() : 0)) *
sizeof(double);
// uint32_t data_size =
// (data_count + (with_interp ? input_key_elements.size() : 0)) *
// sizeof(double);
uint32_t data_size = data_count * sizeof(double);
uint32_t buckets_per_process =
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
#ifdef POET_DHT_UCX
const ucx_ep_args_mpi_t ucx_bcast_mpi_args = {.comm = dht_comm};
const DHT_init_t dht_init = {
.key_size = static_cast<int>(key_size),
@ -67,6 +67,10 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
.bcast_func = UCX_INIT_BSTRAP_MPI,
.bcast_func_args = &ucx_bcast_mpi_args};
dht_object = DHT_create(&dht_init);
#else
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
poet::md5_sum);
#endif
if (dht_object == nullptr) {
throw std::runtime_error("DHT_create failed");
@ -96,7 +100,11 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
DHT_Wrapper::~DHT_Wrapper() {
// free DHT
#ifdef POET_DHT_UCX
DHT_free(dht_object, NULL, NULL, NULL);
#else
DHT_free(dht_object, NULL, NULL);
#endif
}
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
-> const DHT_ResultObject & {

View File

@ -39,8 +39,12 @@
#include <utility>
#include <vector>
#ifdef POET_DHT_UCX
#include <DHT_ucx/DHT.h>
#include <DHT_ucx/UCX_bcast_functions.h>
#else
#include <DHT_mpi/DHT.h>
#endif
#include <mpi.h>

View File

@ -2,7 +2,7 @@
#include "ChemistryModule.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp"
// #include "SurrogateModels/Interpolation.hpp"
#include <IrmResult.h>
#include <algorithm>
@ -173,9 +173,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
timings.dht_get += dht_get_end - dht_get_start;
}
if (interp_enabled) {
interp->tryInterpolation(s_curr_wp);
}
// if (interp_enabled) {
// interp->tryInterpolation(s_curr_wp);
// }
phreeqc_time_start = MPI_Wtime();
@ -201,9 +201,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
dht->fillDHT(s_curr_wp);
dht_fill_end = MPI_Wtime();
if (interp_enabled) {
interp->writePairs();
}
// if (interp_enabled) {
// interp->writePairs();
// }
timings.dht_fill += dht_fill_end - dht_fill_start;
}
@ -227,17 +227,17 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
}
}
if (this->interp_enabled) {
std::stringstream out;
interp_calls.push_back(interp->getInterpolationCount());
interp->resetCounter();
interp->writePHTStats();
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
<< std::setw(this->file_pad) << iteration << ".pht";
interp->dumpPHTState(out.str());
}
}
// if (this->interp_enabled) {
// std::stringstream out;
// interp_calls.push_back(interp->getInterpolationCount());
// interp->resetCounter();
// interp->writePHTStats();
// if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
// out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
// << std::setw(this->file_pad) << iteration << ".pht";
// interp->dumpPHTState(out.str());
// }
// }
RInsidePOET::getInstance().parseEvalQ("gc()");
}
@ -246,12 +246,12 @@ void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
WorkerWriteDHTDump(iteration);
}
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
std::stringstream out;
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
<< std::setw(this->file_pad) << iteration << ".pht";
interp->dumpPHTState(out.str());
}
// if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
// std::stringstream out;
// out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
// << std::setw(this->file_pad) << iteration << ".pht";
// interp->dumpPHTState(out.str());
// }
}
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
@ -348,26 +348,26 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
this->group_comm);
break;
}
case WORKER_IP_WRITE: {
double val = interp->getPHTWriteTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
break;
}
case WORKER_IP_READ: {
double val = interp->getPHTReadTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
break;
}
case WORKER_IP_GATHER: {
double val = interp->getDHTGatherTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
break;
}
case WORKER_IP_FC: {
double val = interp->getInterpolationTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
break;
}
// case WORKER_IP_WRITE: {
// double val = interp->getPHTWriteTime();
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
// this->group_comm); break;
// }
// case WORKER_IP_READ: {
// double val = interp->getPHTReadTime();
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
// this->group_comm); break;
// }
// case WORKER_IP_GATHER: {
// double val = interp->getDHTGatherTime();
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
// this->group_comm); break;
// }
// case WORKER_IP_FC: {
// double val = interp->getInterpolationTime();
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
// this->group_comm); break;
// }
default: {
throw std::runtime_error("Unknown perf type in master's message.");
}
@ -406,11 +406,11 @@ void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
reduce_and_send(interp_calls, WORKER_IP_CALLS);
return;
}
case WORKER_PHT_CACHE_HITS: {
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
reduce_and_send(input, WORKER_PHT_CACHE_HITS);
return;
}
// case WORKER_PHT_CACHE_HITS: {
// std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
// reduce_and_send(input, WORKER_PHT_CACHE_HITS);
// return;
// }
default: {
throw std::runtime_error("Unknown perf type in master's message.");
}

View File

@ -233,21 +233,21 @@ inline double RunMasterLoop(SimParams &params, RInsidePOET &R,
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
if (params.getChemParams().use_interp) {
R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
R.parseEvalQ("profiling$interp_write <- interp_w");
R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
R.parseEvalQ("profiling$interp_read <- interp_r");
R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
R.parseEvalQ("profiling$interp_gather <- interp_g");
R["interp_fc"] =
Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
R.parseEvalQ("profiling$interp_calls <- interp_calls");
R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
R.parseEvalQ("profiling$interp_cached <- interp_cached");
}
// if (params.getChemParams().use_interp) {
// R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
// R.parseEvalQ("profiling$interp_write <- interp_w");
// R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
// R.parseEvalQ("profiling$interp_read <- interp_r");
// R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
// R.parseEvalQ("profiling$interp_gather <- interp_g");
// R["interp_fc"] =
// Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
// R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
// R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
// R.parseEvalQ("profiling$interp_calls <- interp_calls");
// R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
// R.parseEvalQ("profiling$interp_cached <- interp_cached");
// }
chem.MasterLoopBreak();