Add support for tracking and reporting corrupt DHT buckets

This commit is contained in:
Max Lübke 2024-03-19 11:38:17 +01:00
parent b0a4e7d64f
commit d5e11c851b
7 changed files with 43 additions and 1 deletions

@ -1 +1 @@
Subproject commit f0a1924195a093c5f56753ca424179106f9a4d82
Subproject commit 19d4510fdcb9cecf3d8df427eea4966053835a82

View File

@ -208,6 +208,8 @@ public:
*/
std::vector<uint32_t> GetWorkerDHTEvictions() const;
std::vector<uint32_t> GetWorkerDHTCorruptBuckets() const;
/**
* **Master only** Returns the current state of the chemical field.
*
@ -273,6 +275,7 @@ protected:
WORKER_IP_FC,
WORKER_DHT_HITS,
WORKER_DHT_EVICTIONS,
WORKER_DHT_CORRUPT,
WORKER_PHT_CACHE_HITS,
WORKER_IP_CALLS
};
@ -280,6 +283,7 @@ protected:
std::vector<uint32_t> interp_calls;
std::vector<uint32_t> dht_hits;
std::vector<uint32_t> dht_evictions;
std::vector<uint32_t> corrupt_buckets;
struct worker_s {
double phreeqc_t = 0.;

View File

@ -97,6 +97,25 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
return ret;
}
std::vector<uint32_t>
poet::ChemistryModule::GetWorkerDHTCorruptBuckets() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
type = WORKER_DHT_CORRUPT;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
MPI_Status probe;
MPI_Probe(MPI_ANY_SOURCE, WORKER_DHT_CORRUPT, this->group_comm, &probe);
int count;
MPI_Get_count(&probe, MPI_UINT32_T, &count);
std::vector<uint32_t> ret(count);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE,
WORKER_DHT_CORRUPT, this->group_comm, NULL);
return ret;
}
std::vector<double>
poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const {
int type = CHEM_PERF;

View File

@ -21,6 +21,7 @@
*/
#include "DHT_Wrapper.hpp"
#include "DHT_ucx/DHT.h"
#include "HashFunctions.hpp"
#include <algorithm>
@ -134,6 +135,11 @@ auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
break;
case DHT_READ_MISS:
break;
#if POET_DHT_UCX
case DHT_READ_CORRUPT:
this->corrupt_buckets++;
break;
#endif
}
}

View File

@ -190,9 +190,12 @@ public:
*/
auto getEvictions() { return this->dht_evictions; };
auto getCorruptBuckets() { return this->corrupt_buckets; }
void resetCounter() {
this->dht_hits = 0;
this->dht_evictions = 0;
this->corrupt_buckets = 0;
}
void SetSignifVector(std::vector<uint32_t> signif_vec);
@ -254,6 +257,7 @@ private:
uint32_t dht_hits = 0;
uint32_t dht_evictions = 0;
uint32_t corrupt_buckets = 0;
NamedVector<std::uint32_t> key_species;

View File

@ -220,11 +220,13 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
if (this->dht_enabled) {
dht_hits.push_back(dht->getHits());
dht_evictions.push_back(dht->getEvictions());
corrupt_buckets.push_back(dht->getCorruptBuckets());
dht->resetCounter();
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
WorkerWriteDHTDump(iteration);
}
dht->printStatistics();
}
// if (this->interp_enabled) {
@ -402,6 +404,10 @@ void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
break;
}
case WORKER_DHT_CORRUPT: {
reduce_and_send(corrupt_buckets, WORKER_DHT_CORRUPT);
break;
}
case WORKER_IP_CALLS: {
reduce_and_send(interp_calls, WORKER_IP_CALLS);
return;

View File

@ -35,6 +35,7 @@
#include <cstring>
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <mpi.h>
@ -228,6 +229,8 @@ inline double RunMasterLoop(SimParams &params, RInsidePOET &R,
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
R["dht_corrupt_buckets"] = Rcpp::wrap(chem.GetWorkerDHTCorruptBuckets());
R.parseEvalQ("profiling$dht_corrupt_buckets <- dht_corrupt_buckets");
R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());