From e603a5a995fa14aa8750772d26dad501c96e6304 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Max=20L=C3=BCbke?=
Date: Thu, 9 Jan 2025 12:18:05 +0100
Subject: [PATCH] feat: add index tracking and flushing functionality to DHT

---
Review notes (text in this section is not part of the commit message):

* DHT_get_used_idx_factor(): a rank without writes now contributes 0 instead
  of 0/0 (NaN); MPI_Reduce + MPI_Bcast became one MPI_Allreduce; the unused
  rank query was dropped.
* DHT_flush(): the wipe size is computed in size_t and a second barrier
  keeps ranks from writing into memory that a peer is still zeroing.
* DHT.h: Doxygen documentation added for both new functions.
* Hunk headers and the diffstat were recalculated for the changes above. The
  commit id in the first line and the blob ids on the "index" lines still
  name the original revision.
* This patch was re-wrapped from a whitespace-collapsed copy. Indentation of
  context and removed lines is a best-effort reconstruction. The "From:"
  address, the '#include' targets in WorkerFunctions.cpp and the template
  arguments of the std::vector members in Interpolation.hpp were already
  missing and must be restored from the repository before applying. The
  first -/+ pair in the second DHT.h hunk differs only in whitespace that
  could not be recovered.

 src/Chemistry/SurrogateModels/DHT.c           | 49 +++++++++++++
 src/Chemistry/SurrogateModels/DHT.h           | 68 +++++++++----------
 .../SurrogateModels/Interpolation.hpp         |  2 +
 src/Chemistry/WorkerFunctions.cpp             | 17 ++++-
 4 files changed, 95 insertions(+), 41 deletions(-)

diff --git a/src/Chemistry/SurrogateModels/DHT.c b/src/Chemistry/SurrogateModels/DHT.c
index e17ff6fa0..465ba7a13 100644
--- a/src/Chemistry/SurrogateModels/DHT.c
+++ b/src/Chemistry/SurrogateModels/DHT.c
@@ -105,6 +105,8 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
   object->index_count = 9 - (index_bytes / 8);
   object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
   object->mem_alloc = mem_alloc;
+  object->sum_idx = 0;
+  object->cnt_idx = 0;
 
   // if set, initialize dht_stats
 #ifdef DHT_STATISTICS
@@ -189,6 +191,9 @@ int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
     }
   }
 
+  table->cnt_idx += 1;
+  table->sum_idx += (i + 1);
+
   if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
     memset((char *)table->send_entry + 1 + table->key_size, '\0',
            table->data_size);
@@ -279,6 +284,9 @@ int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
     }
   }
 
+  table->cnt_idx += 1;
+  table->sum_idx += (i + 1);
+
   // put data to DHT (with last selected index by value i)
   if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
               MPI_BYTE, dest_rank, table->index[i],
@@ -550,6 +558,47 @@ int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
   return DHT_SUCCESS;
 }
 
+float DHT_get_used_idx_factor(DHT *table, int with_reset) {
+  // a rank without writes reports 0 instead of 0/0 (NaN), which would make
+  // the outcome of MPI_MAX unreliable
+  float my_avg_idx = 0.0f;
+  if (table->cnt_idx > 0) {
+    my_avg_idx = (float)table->sum_idx / (float)table->cnt_idx;
+  }
+
+  float max_mean_index = 0.0f;
+
+  // all ranks need the result: reduce and distribute in a single collective
+  MPI_Allreduce(&my_avg_idx, &max_mean_index, 1, MPI_FLOAT, MPI_MAX,
+                table->communicator);
+
+  if (with_reset) {
+    table->sum_idx = 0;
+    table->cnt_idx = 0;
+  }
+
+  return max_mean_index;
+}
+
+int DHT_flush(DHT *table) {
+  // make sure all processes are synchronized
+  MPI_Barrier(table->communicator);
+
+  // wipe local memory with zeros; multiply in size_t to avoid a narrower
+  // intermediate product wrapping around for large tables
+  memset(table->mem_alloc, '\0',
+         (size_t)table->table_size *
+             (size_t)(1 + table->data_size + table->key_size));
+
+  table->sum_idx = 0;
+  table->cnt_idx = 0;
+
+  // nobody may write again until every rank has finished wiping
+  MPI_Barrier(table->communicator);
+
+  return DHT_SUCCESS;
+}
+
 int DHT_print_statistics(DHT *table) {
 #ifdef DHT_STATISTICS
   int *written_buckets;
diff --git a/src/Chemistry/SurrogateModels/DHT.h b/src/Chemistry/SurrogateModels/DHT.h
index ba0022bff..b0822ed88 100644
--- a/src/Chemistry/SurrogateModels/DHT.h
+++ b/src/Chemistry/SurrogateModels/DHT.h
@@ -117,6 +117,9 @@ typedef struct {
   unsigned int index_count;
 
   int (*accumulate_callback)(int, void *, int, void *);
+
+  size_t sum_idx;
+  size_t cnt_idx;
 #ifdef DHT_STATISTICS
   /** Detailed statistics of the usage of the DHT. */
   DHT_stats *stats;
@@ -125,10 +128,11 @@ typedef struct {
 
 extern void DHT_set_accumulate_callback(DHT *table,
                                         int (*callback_func)(int, void *, int,
-                                                             void *));
+                                                             void *));
 
 extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
-                                void *data, uint32_t *proc, uint32_t *index, int *callback_ret);
+                                void *data, uint32_t *proc, uint32_t *index,
+                                int *callback_ret);
 
 /**
  * @brief Create a DHT.
@@ -284,44 +288,28 @@ extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
  */
 extern int DHT_print_statistics(DHT *table);
 
-/**
- * @brief Determine destination rank and index.
- *
- * This is done by looping over all possbile indices. First of all, set a
- * temporary index to zero and copy count of bytes for each index into the
- * memory area of the temporary index. After that the current index is
- * calculated by the temporary index modulo the table size. The destination rank
- * of the process is simply determined by hash modulo the communicator size.
- *
- * @param hash Calculated 64 bit hash.
- * @param comm_size Communicator size.
- * @param table_size Count of buckets per process.
- * @param dest_rank Reference to the destination rank variable.
- * @param index Pointer to the array index.
- * @param index_count Count of possible indeces.
- */
-static void determine_dest(uint64_t hash, int comm_size,
-                           unsigned int table_size, unsigned int *dest_rank,
-                           uint64_t *index, unsigned int index_count);
+/**
+ * @brief Get the largest mean bucket index used for writes across all ranks.
+ *
+ * Each rank divides its accumulated (1-based) write index sum by its number
+ * of writes; a rank without writes contributes 0. The maximum of these means
+ * over the communicator is returned on every rank. This is a collective call.
+ *
+ * @param table Pointer to the DHT object.
+ * @param with_reset If non-zero, the local index counters are zeroed.
+ * @return float Maximum of the per-rank mean indices.
+ */
+extern float DHT_get_used_idx_factor(DHT *table, int with_reset);
 
-/**
- * @brief Set the occupied flag.
- *
- * This will set the first bit of a bucket to 1.
- *
- * @param flag_byte First byte of a bucket.
- */
-static void set_flag(char *flag_byte);
-
-/**
- * @brief Get the occupied flag.
- *
- * This function determines whether the occupied flag of a bucket was set or
- * not.
- *
- * @param flag_byte First byte of a bucket.
- * @return int Returns 1 for true or 0 for false.
- */
-static int read_flag(char flag_byte);
+/**
+ * @brief Wipe the local part of the table and reset the index counters.
+ *
+ * This is a collective call: all ranks synchronize before and after zeroing
+ * their local memory.
+ *
+ * @param table Pointer to the DHT object.
+ * @return int Returns DHT_SUCCESS.
+ */
+extern int DHT_flush(DHT *table);
 
 #endif /* DHT_H */
diff --git a/src/Chemistry/SurrogateModels/Interpolation.hpp b/src/Chemistry/SurrogateModels/Interpolation.hpp
index efa79e4f0..c00daf4eb 100644
--- a/src/Chemistry/SurrogateModels/Interpolation.hpp
+++ b/src/Chemistry/SurrogateModels/Interpolation.hpp
@@ -166,6 +166,8 @@ public:
 
   enum result_status { RES_OK, INSUFFICIENT_DATA, NOT_NEEDED };
 
+  DHT *getDHTObject() { return this->pht->getDHTObject(); }
+
   struct InterpolationResult {
     std::vector> results;
     std::vector status;
diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp
index a971ddd6c..b7eb6096c 100644
--- a/src/Chemistry/WorkerFunctions.cpp
+++ b/src/Chemistry/WorkerFunctions.cpp
@@ -9,7 +9,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -247,6 +246,22 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
           << std::setw(this->file_pad) << iteration << ".pht";
       interp->dumpPHTState(out.str());
     }
+
+    // collective: every rank obtains the same maximum mean write index, so all
+    // ranks take the same branch below
+    const auto max_mean_idx =
+        DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
+
+    if (max_mean_idx >= 2) {
+      DHT_flush(this->interp->getDHTObject());
+      // NOTE(review): assumes this->dht is non-null whenever this code is
+      // reached -- confirm
+      DHT_flush(this->dht->getDHT());
+      // NOTE(review): only rank 2 reports the flush -- confirm this is intended
+      if (this->comm_rank == 2) {
+        std::cout << "Flushed both DHT and PHT!\n\n";
+      }
+    }
   }
 
   RInsidePOET::getInstance().parseEvalQ("gc()");