feat: add index tracking and flushing functionality to DHT

This commit is contained in:
Max Lübke 2025-01-09 12:18:05 +01:00
parent dffbec674c
commit bcab85c331
4 changed files with 64 additions and 41 deletions

View File

@ -105,6 +105,8 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
object->index_count = 9 - (index_bytes / 8);
object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
object->mem_alloc = mem_alloc;
object->sum_idx = 0;
object->cnt_idx = 0;
// if set, initialize dht_stats
#ifdef DHT_STATISTICS
@ -189,6 +191,9 @@ int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
}
}
table->cnt_idx += 1;
table->sum_idx += (i + 1);
if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
memset((char *)table->send_entry + 1 + table->key_size, '\0',
table->data_size);
@ -279,6 +284,9 @@ int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
}
}
table->cnt_idx += 1;
table->sum_idx += (i + 1);
// put data to DHT (with last selected index by value i)
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
MPI_BYTE, dest_rank, table->index[i],
@ -550,6 +558,41 @@ int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
return DHT_SUCCESS;
}
float DHT_get_used_idx_factor(DHT *table, int with_reset) {
int rank;
MPI_Comm_rank(table->communicator, &rank);
float my_avg_idx = (float)table->sum_idx / (float)table->cnt_idx;
float max_mean_index;
MPI_Reduce(&my_avg_idx, &max_mean_index, 1, MPI_FLOAT, MPI_MAX, 0,
table->communicator);
MPI_Bcast(&max_mean_index, 1, MPI_FLOAT, 0, table->communicator);
if (!!with_reset) {
table->sum_idx = 0;
table->cnt_idx = 0;
}
return max_mean_index;
}
int DHT_flush(DHT *table) {
// make sure all processes are synchronized
MPI_Barrier(table->communicator);
// wipe local memory with zeros
memset(table->mem_alloc, '\0',
table->table_size * (1 + table->data_size + table->key_size));
table->sum_idx = 0;
table->cnt_idx = 0;
return DHT_SUCCESS;
}
int DHT_print_statistics(DHT *table) {
#ifdef DHT_STATISTICS
int *written_buckets;

View File

@ -117,6 +117,9 @@ typedef struct {
unsigned int index_count;
int (*accumulate_callback)(int, void *, int, void *);
size_t sum_idx;
size_t cnt_idx;
#ifdef DHT_STATISTICS
/** Detailed statistics of the usage of the DHT. */
DHT_stats *stats;
@ -125,10 +128,11 @@ typedef struct {
extern void DHT_set_accumulate_callback(DHT *table,
int (*callback_func)(int, void *, int,
void *));
void *));
extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
void *data, uint32_t *proc, uint32_t *index, int *callback_ret);
void *data, uint32_t *proc, uint32_t *index,
int *callback_ret);
/**
* @brief Create a DHT.
@ -284,44 +288,8 @@ extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
*/
extern int DHT_print_statistics(DHT *table);
/**
* @brief Determine destination rank and index.
*
* This is done by looping over all possbile indices. First of all, set a
* temporary index to zero and copy count of bytes for each index into the
* memory area of the temporary index. After that the current index is
* calculated by the temporary index modulo the table size. The destination rank
* of the process is simply determined by hash modulo the communicator size.
*
* @param hash Calculated 64 bit hash.
* @param comm_size Communicator size.
* @param table_size Count of buckets per process.
* @param dest_rank Reference to the destination rank variable.
* @param index Pointer to the array index.
* @param index_count Count of possible indeces.
*/
static void determine_dest(uint64_t hash, int comm_size,
unsigned int table_size, unsigned int *dest_rank,
uint64_t *index, unsigned int index_count);
extern float DHT_get_used_idx_factor(DHT *table, int with_reset);
/**
* @brief Set the occupied flag.
*
* This will set the first bit of a bucket to 1.
*
* @param flag_byte First byte of a bucket.
*/
static void set_flag(char *flag_byte);
/**
* @brief Get the occupied flag.
*
* This function determines whether the occupied flag of a bucket was set or
* not.
*
* @param flag_byte First byte of a bucket.
* @return int Returns 1 for true or 0 for false.
*/
static int read_flag(char flag_byte);
extern int DHT_flush(DHT *table);
#endif /* DHT_H */

View File

@ -166,6 +166,8 @@ public:
enum result_status { RES_OK, INSUFFICIENT_DATA, NOT_NEEDED };
DHT *getDHTObject() { return this->pht->getDHTObject(); }
struct InterpolationResult {
std::vector<std::vector<double>> results;
std::vector<result_status> status;

View File

@ -9,7 +9,6 @@
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <map>
#include <mpi.h>
#include <stdexcept>
#include <string>
@ -247,6 +246,17 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
<< std::setw(this->file_pad) << iteration << ".pht";
interp->dumpPHTState(out.str());
}
const auto max_mean_idx =
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
if (max_mean_idx >= 2) {
DHT_flush(this->interp->getDHTObject());
DHT_flush(this->dht->getDHT());
if (this->comm_rank == 2) {
std::cout << "Flushed both DHT and PHT!\n\n";
}
}
}
RInsidePOET::getInstance().parseEvalQ("gc()");