From c040f9523566ce8f6ed73649c61f43b289271ddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20L=C3=BCbke?= Date: Thu, 6 Mar 2025 09:04:23 +0100 Subject: [PATCH] feat: Integrate DHT library as a submodule --- .gitmodules | 3 + CMakeLists.txt | 5 +- ext/dht | 1 + src/CMakeLists.txt | 3 +- src/Chemistry/SurrogateModels/DHT.c | 703 ------------------ src/Chemistry/SurrogateModels/DHT.h | 295 -------- src/Chemistry/SurrogateModels/DHT_Wrapper.cpp | 29 +- src/Chemistry/SurrogateModels/DHT_Wrapper.hpp | 10 +- .../SurrogateModels/Interpolation.hpp | 16 +- .../SurrogateModels/InterpolationModule.cpp | 9 +- .../SurrogateModels/ProximityHashTable.cpp | 129 +--- src/Chemistry/WorkerFunctions.cpp | 11 - 12 files changed, 75 insertions(+), 1139 deletions(-) create mode 160000 ext/dht delete mode 100644 src/Chemistry/SurrogateModels/DHT.c delete mode 100644 src/Chemistry/SurrogateModels/DHT.h diff --git a/.gitmodules b/.gitmodules index 26a3acc7f..91883e1a9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,3 +5,6 @@ [submodule "ext/iphreeqc"] path = ext/iphreeqc url = ../iphreeqc.git +[submodule "ext/dht"] + path = ext/dht + url = https://gitup.uni-potsdam.de/mluebke/dht_ucx.git diff --git a/CMakeLists.txt b/CMakeLists.txt index fa7f009a1..1e0b5b227 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,8 +19,6 @@ find_package(MPI REQUIRED) find_package(RRuntime REQUIRED) -add_subdirectory(src) - option(POET_PREPROCESS_BENCHS "Preprocess benchmarks" ON) if (POET_PREPROCESS_BENCHS) @@ -32,6 +30,9 @@ set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE) add_subdirectory(ext/tug EXCLUDE_FROM_ALL) add_subdirectory(ext/iphreeqc EXCLUDE_FROM_ALL) +add_subdirectory(ext/dht EXCLUDE_FROM_ALL) + +add_subdirectory(src) option(POET_ENABLE_TESTING "Build test suite for POET" OFF) diff --git a/ext/dht b/ext/dht new file mode 160000 index 000000000..cc9f027d3 --- /dev/null +++ b/ext/dht @@ -0,0 +1 @@ +Subproject commit cc9f027d35d771c2ef1c547b8403f642ae781e48 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7e740a0cd..f8d36b1c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,7 +9,6 @@ add_library(POETLib Chemistry/MasterFunctions.cpp Chemistry/WorkerFunctions.cpp Chemistry/SurrogateModels/DHT_Wrapper.cpp - Chemistry/SurrogateModels/DHT.c Chemistry/SurrogateModels/HashFunctions.cpp Chemistry/SurrogateModels/InterpolationModule.cpp Chemistry/SurrogateModels/ProximityHashTable.cpp @@ -22,6 +21,8 @@ target_link_libraries( PUBLIC IPhreeqcPOET PUBLIC tug PUBLIC MPI::MPI_C + PUBLIC LUCX::PHT + PUBLIC LUCX::nolock ) include(FetchContent) diff --git a/src/Chemistry/SurrogateModels/DHT.c b/src/Chemistry/SurrogateModels/DHT.c deleted file mode 100644 index 465ba7a13..000000000 --- a/src/Chemistry/SurrogateModels/DHT.c +++ /dev/null @@ -1,703 +0,0 @@ -/* -** Copyright (C) 2017-2021 Max Luebke (University of Potsdam) -** -** POET is free software; you can redistribute it and/or modify it under the -** terms of the GNU General Public License as published by the Free Software -** Foundation; either version 2 of the License, or (at your option) any later -** version. -** -** POET is distributed in the hope that it will be useful, but WITHOUT ANY -** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -** A PARTICULAR PURPOSE. See the GNU General Public License for more details. 
-** -** You should have received a copy of the GNU General Public License along with -** this program; if not, write to the Free Software Foundation, Inc., 51 -** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -*/ - -#include "DHT.h" - -#include - -#include -#include -#include -#include -#include -#include -#include - -static void determine_dest(uint64_t hash, int comm_size, - unsigned int table_size, unsigned int *dest_rank, - uint64_t *index, unsigned int index_count) { - /** temporary index */ - uint64_t tmp_index; - /** how many bytes do we need for one index? */ - int index_size = sizeof(double) - (index_count - 1); - for (int i = 0; i < index_count; i++) { - tmp_index = (hash >> (i * 8)) & ((1ULL << (index_size * 8)) - 1); - /* memcpy(&tmp_index, (char *)&hash + i, index_size); */ - index[i] = (uint64_t)(tmp_index % table_size); - } - *dest_rank = (unsigned int)(hash % comm_size); -} - -static void set_flag(char *flag_byte) { - *flag_byte = 0; - *flag_byte |= (1 << 0); -} - -static int read_flag(char flag_byte) { - if ((flag_byte & 0x01) == 0x01) { - return 1; - } else - return 0; -} - -DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size, - unsigned int key_size, - uint64_t (*hash_func)(int, const void *)) { - DHT *object; - MPI_Win window; - void *mem_alloc; - int comm_size, index_bytes; - - // calculate how much bytes for the index are needed to address count of - // buckets per process - index_bytes = (int)ceil(log2(size)); - if (index_bytes % 8 != 0) - index_bytes = index_bytes + (8 - (index_bytes % 8)); - - // allocate memory for dht-object - object = (DHT *)malloc(sizeof(DHT)); - if (object == NULL) - return NULL; - - // every memory allocation has 1 additional byte for flags etc. - if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL, - &mem_alloc) != 0) - return NULL; - if (MPI_Comm_size(comm, &comm_size) != 0) - return NULL; - - // since MPI_Alloc_mem doesn't provide memory allocation with the memory set - // to zero, we're doing this here - memset(mem_alloc, '\0', size * (1 + data_size + key_size)); - - // create windows on previously allocated memory - if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size), - (1 + data_size + key_size), MPI_INFO_NULL, comm, - &window) != 0) - return NULL; - - // fill dht-object - object->data_size = data_size; - object->key_size = key_size; - object->table_size = size; - object->window = window; - object->hash_func = hash_func; - object->comm_size = comm_size; - object->communicator = comm; - object->read_misses = 0; - object->evictions = 0; - object->recv_entry = malloc(1 + data_size + key_size); - object->send_entry = malloc(1 + data_size + key_size); - object->index_count = 9 - (index_bytes / 8); - object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t)); - object->mem_alloc = mem_alloc; - object->sum_idx = 0; - object->cnt_idx = 0; - - // if set, initialize dht_stats -#ifdef DHT_STATISTICS - DHT_stats *stats; - - stats = (DHT_stats *)malloc(sizeof(DHT_stats)); - if (stats == NULL) - return NULL; - - object->stats = stats; - object->stats->writes_local = (int *)calloc(comm_size, sizeof(int)); - object->stats->old_writes = 0; - object->stats->read_misses = 0; - object->stats->evictions = 0; - object->stats->w_access = 0; - object->stats->r_access = 0; -#endif - - return object; -} - -void DHT_set_accumulate_callback(DHT *table, - int (*callback_func)(int, void *, int, - void *)) { - table->accumulate_callback = callback_func; -} - -int DHT_write_accumulate(DHT *table, 
const void *send_key, int data_size, - void *send_data, uint32_t *proc, uint32_t *index, - int *callback_ret) { - unsigned int dest_rank, i; - int result = DHT_SUCCESS; - -#ifdef DHT_STATISTICS - table->stats->w_access++; -#endif - - // determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, - table->table_size, &dest_rank, table->index, - table->index_count); - - // concatenating key with data to write entry to DHT - set_flag((char *)table->send_entry); - memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size); - /* memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data, - */ - /* table->data_size); */ - - // locking window of target rank with exclusive lock - if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) - return DHT_MPI_ERROR; - for (i = 0; i < table->index_count; i++) { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, - MPI_BYTE, dest_rank, table->index[i], - 1 + table->data_size + table->key_size, MPI_BYTE, - table->window) != 0) - return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - // increment eviction counter if receiving key doesn't match sending key - // entry has write flag and last index is reached. - if (read_flag(*(char *)table->recv_entry)) { - if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) != - 0) { - if (i == (table->index_count) - 1) { - table->evictions += 1; -#ifdef DHT_STATISTICS - table->stats->evictions += 1; -#endif - result = DHT_WRITE_SUCCESS_WITH_EVICTION; - break; - } - } else - break; - } else { -#ifdef DHT_STATISTICS - table->stats->writes_local[dest_rank]++; -#endif - break; - } - } - - table->cnt_idx += 1; - table->sum_idx += (i + 1); - - if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) { - memset((char *)table->send_entry + 1 + table->key_size, '\0', - table->data_size); - } else { - memcpy((char *)table->send_entry + 1 + table->key_size, - (char *)table->recv_entry + 1 + table->key_size, table->data_size); - } - - *callback_ret = table->accumulate_callback( - data_size, (char *)send_data, table->data_size, - (char *)table->send_entry + 1 + table->key_size); - - // put data to DHT (with last selected index by value i) - if (*callback_ret == 0) { - if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, - MPI_BYTE, dest_rank, table->index[i], - 1 + table->data_size + table->key_size, MPI_BYTE, - table->window) != 0) - return DHT_MPI_ERROR; - } - // unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - if (proc) { - *proc = dest_rank; - } - - if (index) { - *index = table->index[i]; - } - - return result; -} - -int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc, - uint32_t *index) { - unsigned int dest_rank, i; - int result = DHT_SUCCESS; - -#ifdef DHT_STATISTICS - table->stats->w_access++; -#endif - - // determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, - table->table_size, &dest_rank, table->index, - table->index_count); - - // concatenating key with data to write entry to DHT - set_flag((char *)table->send_entry); - memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size); - memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data, - table->data_size); - - // locking window of target rank with exclusive lock - if 
(MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0) - return DHT_MPI_ERROR; - for (i = 0; i < table->index_count; i++) { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, - MPI_BYTE, dest_rank, table->index[i], - 1 + table->data_size + table->key_size, MPI_BYTE, - table->window) != 0) - return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - // increment eviction counter if receiving key doesn't match sending key - // entry has write flag and last index is reached. - if (read_flag(*(char *)table->recv_entry)) { - if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) != - 0) { - if (i == (table->index_count) - 1) { - table->evictions += 1; -#ifdef DHT_STATISTICS - table->stats->evictions += 1; -#endif - result = DHT_WRITE_SUCCESS_WITH_EVICTION; - break; - } - } else - break; - } else { -#ifdef DHT_STATISTICS - table->stats->writes_local[dest_rank]++; -#endif - break; - } - } - - table->cnt_idx += 1; - table->sum_idx += (i + 1); - - // put data to DHT (with last selected index by value i) - if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size, - MPI_BYTE, dest_rank, table->index[i], - 1 + table->data_size + table->key_size, MPI_BYTE, - table->window) != 0) - return DHT_MPI_ERROR; - // unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - if (proc) { - *proc = dest_rank; - } - - if (index) { - *index = table->index[i]; - } - - return result; -} - -int DHT_read(DHT *table, const void *send_key, void *destination) { - unsigned int dest_rank, i; - -#ifdef DHT_STATISTICS - table->stats->r_access++; -#endif - - // determine destination rank and index by hash of key - determine_dest(table->hash_func(table->key_size, send_key), table->comm_size, - table->table_size, &dest_rank, table->index, - table->index_count); - - // locking window of target rank with shared lock - if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0) - return DHT_MPI_ERROR; - // receive data - for (i = 0; i < table->index_count; i++) { - if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size, - MPI_BYTE, dest_rank, table->index[i], - 1 + table->data_size + table->key_size, MPI_BYTE, - table->window) != 0) - return DHT_MPI_ERROR; - if (MPI_Win_flush(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - // increment read error counter if write flag isn't set ... - if ((read_flag(*(char *)table->recv_entry)) == 0) { - table->read_misses += 1; -#ifdef DHT_STATISTICS - table->stats->read_misses += 1; -#endif - // unlock window and return - if (MPI_Win_unlock(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - return DHT_READ_MISS; - } - - // ... or key doesn't match passed by key and last index reached. 
- if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) != - 0) { - if (i == (table->index_count) - 1) { - table->read_misses += 1; -#ifdef DHT_STATISTICS - table->stats->read_misses += 1; -#endif - // unlock window an return - if (MPI_Win_unlock(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - return DHT_READ_MISS; - } - } else - break; - } - - // unlock window of target rank - if (MPI_Win_unlock(dest_rank, table->window) != 0) - return DHT_MPI_ERROR; - - // if matching key was found copy data into memory of passed pointer - memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1, - table->data_size); - - return DHT_SUCCESS; -} - -int DHT_read_location(DHT *table, uint32_t proc, uint32_t index, - void *destination) { - const uint32_t bucket_size = table->data_size + table->key_size + 1; - -#ifdef DHT_STATISTICS - table->stats->r_access++; -#endif - - // locking window of target rank with shared lock - if (MPI_Win_lock(MPI_LOCK_SHARED, proc, 0, table->window) != 0) - return DHT_MPI_ERROR; - // receive data - if (MPI_Get(table->recv_entry, bucket_size, MPI_BYTE, proc, index, - bucket_size, MPI_BYTE, table->window) != 0) { - return DHT_MPI_ERROR; - } - - // unlock window of target rank - if (MPI_Win_unlock(proc, table->window) != 0) - return DHT_MPI_ERROR; - - // if matching key was found copy data into memory of passed pointer - memcpy((char *)destination, (char *)table->recv_entry + 1 + table->key_size, - table->data_size); - - return DHT_SUCCESS; -} - -int DHT_to_file(DHT *table, const char *filename) { - // open file - MPI_File file; - if (MPI_File_open(table->communicator, filename, - MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, - &file) != 0) - return DHT_FILE_IO_ERROR; - - int rank; - MPI_Comm_rank(table->communicator, &rank); - - // write header (key_size and data_size) - if (rank == 0) { - if (MPI_File_write_shared(file, &table->key_size, 1, MPI_INT, - MPI_STATUS_IGNORE) != 0) - return DHT_FILE_WRITE_ERROR; - if (MPI_File_write_shared(file, &table->data_size, 1, MPI_INT, - MPI_STATUS_IGNORE) != 0) - return DHT_FILE_WRITE_ERROR; - } - - MPI_Barrier(table->communicator); - - char *ptr; - int bucket_size = table->key_size + table->data_size + 1; - - // iterate over local memory - for (unsigned int i = 0; i < table->table_size; i++) { - ptr = (char *)table->mem_alloc + (i * bucket_size); - // if bucket has been written to (checked by written_flag)... - if (read_flag(*ptr)) { - // write key and data to file - if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE, - MPI_STATUS_IGNORE) != 0) - return DHT_FILE_WRITE_ERROR; - } - } - - MPI_Barrier(table->communicator); - - // close file - if (MPI_File_close(&file) != 0) - return DHT_FILE_IO_ERROR; - - return DHT_SUCCESS; -} - -int DHT_from_file(DHT *table, const char *filename) { - MPI_File file; - MPI_Offset f_size; - int bucket_size, buffer_size, cur_pos, rank, offset; - char *buffer; - void *key; - void *data; - - // open file - if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY, - MPI_INFO_NULL, &file) != 0) - return DHT_FILE_IO_ERROR; - - // get file size - if (MPI_File_get_size(file, &f_size) != 0) - return DHT_FILE_IO_ERROR; - - MPI_Comm_rank(table->communicator, &rank); - - // calculate bucket size - bucket_size = table->key_size + table->data_size; - // buffer size is either bucket size or, if bucket size is smaller than the - // file header, the size of DHT_FILEHEADER_SIZE - buffer_size = - bucket_size > DHT_FILEHEADER_SIZE ? 
bucket_size : DHT_FILEHEADER_SIZE; - // allocate buffer - buffer = (char *)malloc(buffer_size); - - // read file header - if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE, - MPI_STATUS_IGNORE) != 0) - return DHT_FILE_READ_ERROR; - - // compare if written header data and key size matches current sizes - if (*(int *)buffer != table->key_size) - return DHT_WRONG_FILE; - if (*(int *)(buffer + 4) != table->data_size) - return DHT_WRONG_FILE; - - // set offset for each process - offset = bucket_size * table->comm_size; - - // seek behind header of DHT file - if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0) - return DHT_FILE_IO_ERROR; - - // current position is rank * bucket_size + OFFSET - cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size); - - // loop over file and write data to DHT with DHT_write - while (cur_pos < f_size) { - if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) - return DHT_FILE_IO_ERROR; - // TODO: really necessary? - MPI_Offset tmp; - MPI_File_get_position(file, &tmp); - if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE, MPI_STATUS_IGNORE) != - 0) - return DHT_FILE_READ_ERROR; - // extract key and data and write to DHT - key = buffer; - data = (buffer + table->key_size); - if (DHT_write(table, key, data, NULL, NULL) == DHT_MPI_ERROR) - return DHT_MPI_ERROR; - - // increment current position - cur_pos += offset; - } - - free(buffer); - if (MPI_File_close(&file) != 0) - return DHT_FILE_IO_ERROR; - - return DHT_SUCCESS; -} - -int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) { - int buf; - - if (eviction_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->evictions, &buf, 1, MPI_INT, MPI_SUM, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - *eviction_counter = buf; - } - if (readerror_counter != NULL) { - buf = 0; - if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - *readerror_counter = buf; - } - if (MPI_Win_free(&(table->window)) != 0) - return DHT_MPI_ERROR; - if (MPI_Free_mem(table->mem_alloc) != 0) - return DHT_MPI_ERROR; - free(table->recv_entry); - free(table->send_entry); - free(table->index); - -#ifdef DHT_STATISTICS - free(table->stats->writes_local); - free(table->stats); -#endif - free(table); - - return DHT_SUCCESS; -} - -float DHT_get_used_idx_factor(DHT *table, int with_reset) { - int rank; - MPI_Comm_rank(table->communicator, &rank); - - float my_avg_idx = (float)table->sum_idx / (float)table->cnt_idx; - - float max_mean_index; - - MPI_Reduce(&my_avg_idx, &max_mean_index, 1, MPI_FLOAT, MPI_MAX, 0, - table->communicator); - - MPI_Bcast(&max_mean_index, 1, MPI_FLOAT, 0, table->communicator); - - if (!!with_reset) { - table->sum_idx = 0; - table->cnt_idx = 0; - } - - return max_mean_index; -} - -int DHT_flush(DHT *table) { - // make sure all processes are synchronized - MPI_Barrier(table->communicator); - - // wipe local memory with zeros - memset(table->mem_alloc, '\0', - table->table_size * (1 + table->data_size + table->key_size)); - - table->sum_idx = 0; - table->cnt_idx = 0; - - return DHT_SUCCESS; -} - -int DHT_print_statistics(DHT *table) { -#ifdef DHT_STATISTICS - int *written_buckets; - int *read_misses, sum_read_misses; - int *evictions, sum_evictions; - int sum_w_access, sum_r_access, *w_access, *r_access; - int rank; - MPI_Comm_rank(table->communicator, &rank); - -// disable possible warning of unitialized variable, which is not the case -// since we're using MPI_Gather to obtain all values only on rank 0 -#pragma 
GCC diagnostic push -#pragma GCC diagnostic ignored "-Wmaybe-uninitialized" - - // obtaining all values from all processes in the communicator - if (rank == 0) - read_misses = (int *)malloc(table->comm_size * sizeof(int)); - if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1, - MPI_INT, 0, table->communicator) != 0) - return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT, - MPI_SUM, 0, table->communicator) != 0) - return DHT_MPI_ERROR; - table->stats->read_misses = 0; - - if (rank == 0) - evictions = (int *)malloc(table->comm_size * sizeof(int)); - if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM, - 0, table->communicator) != 0) - return DHT_MPI_ERROR; - table->stats->evictions = 0; - - if (rank == 0) - w_access = (int *)malloc(table->comm_size * sizeof(int)); - if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - table->stats->w_access = 0; - - if (rank == 0) - r_access = (int *)malloc(table->comm_size * sizeof(int)); - if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0, - table->communicator) != 0) - return DHT_MPI_ERROR; - table->stats->r_access = 0; - - if (rank == 0) - written_buckets = (int *)calloc(table->comm_size, sizeof(int)); - if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size, - MPI_INT, MPI_SUM, 0, table->communicator) != 0) - return DHT_MPI_ERROR; - - if (rank == 0) { // only process with rank 0 will print out results as a - // table - int sum_written_buckets = 0; - - for (int i = 0; i < table->comm_size; i++) { - sum_written_buckets += written_buckets[i]; - } - - int members = 7; - int padsize = (members * 12) + 1; - char pad[padsize + 1]; - - memset(pad, '-', padsize * sizeof(char)); - pad[padsize] = '\0'; - printf("\n"); - printf("%-35s||resets with every call of this function\n", " "); - printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied", - "free", "w_access", "r_access", "read misses", "evictions"); - printf("%s\n", pad); - for (int i = 0; i < table->comm_size; i++) { - printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i, - written_buckets[i], table->table_size - written_buckets[i], - w_access[i], r_access[i], read_misses[i], evictions[i]); - } - printf("%s\n", pad); - printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum", - sum_written_buckets, - (table->table_size * table->comm_size) - sum_written_buckets, - sum_w_access, sum_r_access, sum_read_misses, sum_evictions); - - printf("%s\n", pad); - printf("%s %d\n", - "new entries:", sum_written_buckets - table->stats->old_writes); - - printf("\n"); - fflush(stdout); - - table->stats->old_writes = sum_written_buckets; - } - -// enable warning again -#pragma GCC diagnostic pop - - MPI_Barrier(table->communicator); - return DHT_SUCCESS; -#endif -} diff --git a/src/Chemistry/SurrogateModels/DHT.h b/src/Chemistry/SurrogateModels/DHT.h deleted file mode 100644 index b0822ed88..000000000 --- a/src/Chemistry/SurrogateModels/DHT.h +++ /dev/null @@ -1,295 +0,0 @@ -/* -** Copyright (C) 
2017-2021 Max Luebke (University of Potsdam) -** -** POET is free software; you can redistribute it and/or modify it under the -** terms of the GNU General Public License as published by the Free Software -** Foundation; either version 2 of the License, or (at your option) any later -** version. -** -** POET is distributed in the hope that it will be useful, but WITHOUT ANY -** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -** A PARTICULAR PURPOSE. See the GNU General Public License for more details. -** -** You should have received a copy of the GNU General Public License along with -** this program; if not, write to the Free Software Foundation, Inc., 51 -** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -*/ - -/** - * @file DHT.h - * @author Max Lübke (mluebke@uni-potsdam.de) - * @brief API to interact with the DHT - * @version 0.1 - * @date 16 Nov 2017 - * - * This file implements the creation of a DHT by using the MPI - * one-sided-communication. There is also the possibility to write or read data - * from or to the DHT. In addition, the current state of the DHT can be written - * to a file and read in again later. - */ - -#ifndef DHT_H -#define DHT_H - -#include -#include - -/** Returned if some error in MPI routine occurs. */ -#define DHT_MPI_ERROR -1 -/** Returned by a call of DHT_read if no bucket with given key was found. */ -#define DHT_READ_MISS -2 -/** Returned by DHT_write if a bucket was evicted. */ -#define DHT_WRITE_SUCCESS_WITH_EVICTION -3 -/** Returned when no errors occured. */ -#define DHT_SUCCESS 0 - -/** Returned by DHT_from_file if the given file does not match expected file. */ -#define DHT_WRONG_FILE -11 -/** Returned by DHT file operations if MPI file operation fails. */ -#define DHT_FILE_IO_ERROR -12 -/** Returned by DHT file operations if error occured in MPI_Read operation. */ -#define DHT_FILE_READ_ERROR -13 -/** Returned by DHT file operations if error occured in MPI_Write operation. */ -#define DHT_FILE_WRITE_ERROR -14 - -/** Size of the file header in byte. */ -#define DHT_FILEHEADER_SIZE 8 - -/** - * Internal struct to store statistics about read and write accesses and also - * read misses and evictions. - * All values will be resetted to zero after a call of - * DHT_print_statistics(). - * Internal use only! - * - * @todo There's maybe a better solution than DHT_print_statistics and this - * struct - */ -typedef struct { - /** Count of writes to specific process this process did. */ - int *writes_local; - /** Writes after last call of DHT_print_statistics. */ - int old_writes; - /** How many read misses occur? */ - int read_misses; - /** How many buckets where evicted? */ - int evictions; - /** How many calls of DHT_write() did this process? */ - int w_access; - /** How many calls of DHT_read() did this process? */ - int r_access; -} DHT_stats; - -/** - * Struct which serves as a handler or so called \a DHT-object. Will - * be created by DHT_create and must be passed as a parameter to every following - * function. Stores all relevant data. - * Do not touch outside DHT functions! - */ -typedef struct { - /** Created MPI Window, which serves as the DHT memory area of the process. */ - MPI_Win window; - /** Size of the data of a bucket entry in byte. */ - int data_size; - /** Size of the key of a bucket entry in byte. */ - int key_size; - /** Count of buckets for each process. */ - unsigned int table_size; - /** MPI communicator of all participating processes. 
*/ - MPI_Comm communicator; - /** Size of the MPI communicator respectively all participating processes. */ - int comm_size; - /** Pointer to a hashfunction. */ - uint64_t (*hash_func)(int, const void *); - /** Pre-allocated memory where a bucket can be received. */ - void *recv_entry; - /** Pre-allocated memory where a bucket to send can be stored. */ - void *send_entry; - /** Allocated memory on which the MPI window was created. */ - void *mem_alloc; - /** Count of read misses over all time. */ - int read_misses; - /** Count of evictions over all time. */ - int evictions; - /** Array of indeces where a bucket can be stored. */ - uint64_t *index; - /** Count of possible indeces. */ - unsigned int index_count; - - int (*accumulate_callback)(int, void *, int, void *); - - size_t sum_idx; - size_t cnt_idx; -#ifdef DHT_STATISTICS - /** Detailed statistics of the usage of the DHT. */ - DHT_stats *stats; -#endif -} DHT; - -extern void DHT_set_accumulate_callback(DHT *table, - int (*callback_func)(int, void *, int, - void *)); - -extern int DHT_write_accumulate(DHT *table, const void *key, int send_size, - void *data, uint32_t *proc, uint32_t *index, - int *callback_ret); - -/** - * @brief Create a DHT. - * - * When calling this function, the required memory is allocated and a - * MPI_Window is created. This allows the execution of MPI_Get and - * MPI_Put operations for one-sided communication. Then the number of - * indexes is calculated and finally all relevant data is entered into the - * \a DHT-object which is returned. - * - * @param comm MPI communicator which addresses all participating process of the - * DHT. - * @param size_per_process Number of buckets per process. - * @param data_size Size of data in byte. - * @param key_size Size of the key in byte. - * @param hash_func Pointer to a hash function. This function must take the size - * of the key and a pointer to the key as input parameters and return a 64 bit - * hash. - * @return DHT* The returned value is the \a DHT-object which serves as a handle - * for all DHT operations. If an error occured NULL is returned. - */ -extern DHT *DHT_create(MPI_Comm comm, uint64_t size_per_process, - unsigned int data_size, unsigned int key_size, - uint64_t (*hash_func)(int, const void *)); - -/** - * @brief Write data into DHT. - * - * When DHT_write is called, the address window is locked with a - * LOCK_EXCLUSIVE for write access. Now the first bucket is received - * using MPI_Get and it is checked if the bucket is empty or if the received key - * matches the passed key. If this is the case, the data of the bucket is - * overwritten with the new value. If not, the function continues with the next - * index until no more indexes are available. When the last index is reached and - * there are no more indexes available, the last examined bucket is replaced. - * After successful writing, the memory window is released and the function - * returns. - * - * @param table Pointer to the \a DHT-object. - * @param key Pointer to the key. - * @param data Pointer to the data. - * @param proc If not NULL, returns the process number written to. - * @param index If not NULL, returns the index of the bucket where the data was - * written to. - * @return int Returns either DHT_SUCCESS on success or correspondending error - * value on eviction or error. - */ -extern int DHT_write(DHT *table, void *key, void *data, uint32_t *proc, - uint32_t *index); - -/** - * @brief Read data from DHT. 
- * - * At the beginning, the target process and all possible indices are determined. - * After that a SHARED lock on the address window for read access is done - * and the first entry is retrieved. Now the received key is compared - * with the key passed to the function. If they coincide the correct data - * was found. If not it continues with the next index. If the last - * possible bucket is reached and the keys still do not match the read - * error counter is incremented. After the window has been released - * again, the function returns with a corresponding return value (read - * error or error-free read). The data to be read out is also written to - * the memory area of the passed pointer. - * - * @param table Pointer to the \a DHT-object. - * @param key Pointer to the key. - * @param destination Pointer to memory area where retreived data should be - * stored. - * @return int Returns either DHT_SUCCESS on success or correspondending error - * value on read miss or error. - */ -extern int DHT_read(DHT *table, const void *key, void *destination); - -extern int DHT_read_location(DHT *table, uint32_t proc, uint32_t index, - void *destination); -/** - * @brief Write current state of DHT to file. - * - * All contents are written as a memory dump, so that no conversion takes place. - * First, an attempt is made to open or create a file. If this is successful the - * file header consisting of data and key size is written. Then each process - * reads its memory area of the DHT and each bucket that was marked as written - * is added to the file using MPI file operations. - * - * @param table Pointer to the \a DHT-object. - * @param filename Name of the file to write to. - * @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be - * opened/closed or DHT_WRITE_ERROR if file is not writable. - */ -extern int DHT_to_file(DHT *table, const char *filename); - -/** - * @brief Read state of DHT from file. - * - * One needs a previously written DHT file (by DHT_from_file). - * First of all, an attempt is made to open the specified file. If this is - * succeeded the file header is read and compared with the current values of the - * DHT. If the data and key sizes do not differ, one can continue. Each process - * reads one line of the file and writes it to the DHT with DHT_write. This - * happens until no more lines are left. The writing is done by the - * implementation of DHT_write. - * - * @param table Pointer to the \a DHT-object. - * @param filename Name of the file to read from. - * @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be - * opened/closed, DHT_READ_MISS if file is not readable or DHT_WRONG_FILE if - * file doesn't match expectation. This is possible if the data size or key size - * is different. - */ -extern int DHT_from_file(DHT *table, const char *filename); - -/** - * @brief Free ressources of DHT. - * - * Finally, to free all resources after using the DHT, the function - * DHT_free must be used. This will free the MPI\_Window, as well as the - * associated memory. Also all internal variables are released. Optionally, the - * count of evictions and read misses can also be obtained. - * - * @param table Pointer to the \a DHT-object. - * @param eviction_counter \a optional: Pointer to integer where the count of - * evictions should be stored. - * @param readerror_counter \a optional: Pointer to integer where the count of - * read errors should be stored. 
- * @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on - * internal MPI error. - */ -extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter); - -/** - * @brief Prints a table with statistics about current use of DHT. - * - * These statistics are from each participated process and also summed up over - * all processes. Detailed statistics are: - * -# occupied buckets (in respect to the memory of this process) - * -# free buckets (in respect to the memory of this process) - * -# calls of DHT_write (w_access) - * -# calls of DHT_read (r_access) - * -# read misses (see DHT_READ_MISS) - * -# collisions (see DHT_WRITE_SUCCESS_WITH_EVICTION) - * 3-6 will reset with every call of this function finally the amount of new - * written entries is printed out (since the last call of this funtion). - * - * This is done by collective MPI operations with the root process with rank 0, - * which will also print a table with all informations to stdout. - * - * Also, as this function was implemented for a special case (POET project) one - * need to define DHT_STATISTICS to the compiler macros to use this - * function (eg. gcc -DDHT_STATISTICS ... ). - * @param table Pointer to the \a DHT-object. - * @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI - * error. - */ -extern int DHT_print_statistics(DHT *table); - -extern float DHT_get_used_idx_factor(DHT *table, int with_reset); - -extern int DHT_flush(DHT *table); - -#endif /* DHT_H */ diff --git a/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp b/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp index 3b836d4f4..865a49794 100644 --- a/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp +++ b/src/Chemistry/SurrogateModels/DHT_Wrapper.cpp @@ -21,6 +21,7 @@ #include "DHT_Wrapper.hpp" +#include "Chemistry/SurrogateModels/HashFunctions.hpp" #include "Init/InitialList.hpp" #include "Rounding.hpp" @@ -34,6 +35,8 @@ #include #include +#include + using namespace std; namespace poet { @@ -48,7 +51,8 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size, : key_count(key_indices.size()), data_count(data_count), input_key_elements(key_indices), communicator(dht_comm), key_species(key_species), output_names(_output_names), hooks(_hooks), - with_interp(_with_interp), has_het_ids(_has_het_ids) { + with_interp(_with_interp), has_het_ids(_has_het_ids), + dht_object(new DHT) { // initialize DHT object // key size = count of key elements + timestep uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement); @@ -57,8 +61,9 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size, sizeof(double); uint32_t buckets_per_process = static_cast(dht_size / (data_size + key_size)); - dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size, - &poet::Murmur2_64A); + + int status = DHT_create(key_size, data_size, buckets_per_process, + &poet::Murmur2_64A, dht_comm, dht_object.get()); dht_signif_vector = key_species.getValues(); @@ -82,10 +87,8 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size, } } -DHT_Wrapper::~DHT_Wrapper() { - // free DHT - DHT_free(dht_object, NULL, NULL); -} +DHT_Wrapper::~DHT_Wrapper() { DHT_free(this->dht_object.get(), NULL); } + auto DHT_Wrapper::checkDHT(WorkPackage &work_package) -> const DHT_ResultObject & { @@ -100,8 +103,8 @@ auto DHT_Wrapper::checkDHT(WorkPackage &work_package) auto &key_vector = dht_results.keys[i]; // overwrite input with data from DHT, IF value is found in DHT - int res = - DHT_read(this->dht_object, 
key_vector.data(), bucket_writer.data()); + int res = DHT_read(this->dht_object.get(), key_vector.data(), + bucket_writer.data()); switch (res) { case DHT_SUCCESS: @@ -159,7 +162,7 @@ void DHT_Wrapper::fillDHT(const WorkPackage &work_package) { // fuzz data (round, logarithm etc.) // insert simulated data with fuzzed key into DHT - int res = DHT_write(this->dht_object, key.data(), + int res = DHT_write(this->dht_object.get(), key.data(), const_cast(data.data()), &proc, &index); dht_results.locations[i] = {proc, index}; @@ -240,12 +243,12 @@ DHT_Wrapper::ratesToOutput(const std::vector &dht_data, // } int DHT_Wrapper::tableToFile(const char *filename) { - int res = DHT_to_file(dht_object, filename); + int res = DHT_to_file(dht_object.get(), filename); return res; } int DHT_Wrapper::fileToTable(const char *filename) { - int res = DHT_from_file(dht_object, filename); + int res = DHT_from_file(dht_object.get(), filename); if (res != DHT_SUCCESS) return res; @@ -259,7 +262,7 @@ int DHT_Wrapper::fileToTable(const char *filename) { void DHT_Wrapper::printStatistics() { int res; - res = DHT_print_statistics(dht_object); + res = DHT_print_statistics(dht_object.get()); if (res != DHT_SUCCESS) { // MPI ERROR ... WHAT TO DO NOW? diff --git a/src/Chemistry/SurrogateModels/DHT_Wrapper.hpp b/src/Chemistry/SurrogateModels/DHT_Wrapper.hpp index 9449692b4..576f2f56d 100644 --- a/src/Chemistry/SurrogateModels/DHT_Wrapper.hpp +++ b/src/Chemistry/SurrogateModels/DHT_Wrapper.hpp @@ -30,6 +30,7 @@ #include "Init/InitialList.hpp" #include "LookupKey.hpp" +#include #include #include #include @@ -37,9 +38,7 @@ #include #include -extern "C" { -#include "DHT.h" -} +#include #include @@ -194,7 +193,7 @@ public: auto getDataCount() { return this->data_count; } auto getCommunicator() { return this->communicator; } - DHT *getDHT() { return this->dht_object; }; + DHT *getDHT() { return this->dht_object.get(); }; DHT_ResultObject &getDHTResults() { return this->dht_results; } @@ -227,7 +226,8 @@ private: uint32_t key_count; uint32_t data_count; - DHT *dht_object; + std::unique_ptr dht_object; + MPI_Comm communicator; LookupKey fuzzForDHT(const std::vector &cell, double dt); diff --git a/src/Chemistry/SurrogateModels/Interpolation.hpp b/src/Chemistry/SurrogateModels/Interpolation.hpp index c00daf4eb..781f5619a 100644 --- a/src/Chemistry/SurrogateModels/Interpolation.hpp +++ b/src/Chemistry/SurrogateModels/Interpolation.hpp @@ -15,9 +15,7 @@ #include #include -extern "C" { -#include "DHT.h" -} +#include #include #include @@ -71,9 +69,9 @@ public: void getEntriesFromLocation(const PHT_Result &locations, const std::vector &signif); - void writeStats() { DHT_print_statistics(this->prox_ht); } + void writeStats() { PHT_print_statistics(this->prox_ht.get()); } - DHT *getDHTObject() { return this->prox_ht; } + PHT *getDHTObject() { return this->prox_ht.get(); } auto getPHTWriteTime() const -> double { return this->pht_write_t; }; auto getPHTReadTime() const -> double { return this->pht_read_t; }; @@ -104,7 +102,7 @@ private: static bool similarityCheck(const LookupKey &fine, const LookupKey &coarse, const std::vector &signif); - char *bucket_store; + std::unique_ptr bucket_store; class Cache : private std::unordered_map { @@ -127,7 +125,7 @@ private: }; Cache localCache; - DHT *prox_ht; + std::unique_ptr prox_ht; std::uint32_t dht_evictions = 0; @@ -166,7 +164,7 @@ public: enum result_status { RES_OK, INSUFFICIENT_DATA, NOT_NEEDED }; - DHT *getDHTObject() { return this->pht->getDHTObject(); } + PHT *getDHTObject() { return 
this->pht->getDHTObject(); } struct InterpolationResult { std::vector> results; @@ -211,7 +209,7 @@ public: void writePHTStats() { this->pht->writeStats(); } void dumpPHTState(const std::string &filename) { - DHT_to_file(this->pht->getDHTObject(), filename.c_str()); + PHT_to_file(this->pht->getDHTObject(), filename.c_str()); } static constexpr std::uint32_t COARSE_DIFF = 2; diff --git a/src/Chemistry/SurrogateModels/InterpolationModule.cpp b/src/Chemistry/SurrogateModels/InterpolationModule.cpp index 0b6fbd1d8..ae30e3a89 100644 --- a/src/Chemistry/SurrogateModels/InterpolationModule.cpp +++ b/src/Chemistry/SurrogateModels/InterpolationModule.cpp @@ -3,31 +3,24 @@ #include "DHT_Wrapper.hpp" #include "DataStructures/NamedVector.hpp" -#include "HashFunctions.hpp" #include "LookupKey.hpp" -#include "Rounding.hpp" #include #include #include #include -#include #include #include #include #include -#include #include #include #include #include -#include #include -extern "C" { -#include "DHT.h" -} +#include namespace poet { diff --git a/src/Chemistry/SurrogateModels/ProximityHashTable.cpp b/src/Chemistry/SurrogateModels/ProximityHashTable.cpp index d63ccb7ae..6bb07caf6 100644 --- a/src/Chemistry/SurrogateModels/ProximityHashTable.cpp +++ b/src/Chemistry/SurrogateModels/ProximityHashTable.cpp @@ -2,20 +2,17 @@ #include "DHT_Wrapper.hpp" #include "HashFunctions.hpp" +#include "LUCX/DHT.h" #include "LookupKey.hpp" #include "Rounding.hpp" #include #include #include -#include #include -#include #include -extern "C" { -#include "DHT.h" -} +#include namespace poet { @@ -23,86 +20,37 @@ ProximityHashTable::ProximityHashTable(uint32_t key_size, uint32_t data_size, uint32_t entry_count, uint32_t size_per_process, MPI_Comm communicator_) - : communicator(communicator_) { + : communicator(communicator_), prox_ht(new PHT) { - data_size *= entry_count; - data_size += sizeof(bucket_indicator); - -#ifdef POET_PHT_ADD - data_size += sizeof(std::uint64_t); -#endif - - bucket_store = new char[data_size]; + bucket_store = std::make_unique(data_size * entry_count); uint32_t buckets_per_process = - static_cast(size_per_process / (data_size + key_size)); + static_cast(size_per_process / (data_size * entry_count)); - this->prox_ht = DHT_create(communicator, buckets_per_process, data_size, - key_size, &poet::Murmur2_64A); + int status = + PHT_create(key_size, data_size, buckets_per_process, entry_count, + &poet::Murmur2_64A, communicator, this->prox_ht.get()); - DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function); + if (status != PHT_SUCCESS) { + throw std::runtime_error("Failed to create PHT."); + } } ProximityHashTable::~ProximityHashTable() { - delete[] bucket_store; - if (prox_ht) { - DHT_free(prox_ht, NULL, NULL); - } -} - -int ProximityHashTable::PHT_callback_function(int in_data_size, void *in_data, - int out_data_size, - void *out_data) { - const int max_elements_per_bucket = - static_cast((out_data_size - sizeof(bucket_indicator) -#ifdef POET_PHT_ADD - - sizeof(std::uint64_t) -#endif - ) / - in_data_size); - DHT_Location *input = reinterpret_cast(in_data); - - bucket_indicator *occupied_buckets = - reinterpret_cast(out_data); - DHT_Location *pairs = reinterpret_cast(occupied_buckets + 1); - - if (*occupied_buckets == max_elements_per_bucket) { - return INTERP_CB_FULL; - } - - for (bucket_indicator i = 0; i < *occupied_buckets; i++) { - if (pairs[i] == *input) { - return INTERP_CB_ALREADY_IN; - } - } - - pairs[(*occupied_buckets)++] = *input; - - return INTERP_CB_OK; + PHT_free(this->prox_ht.get(), 
NULL); } void ProximityHashTable::writeLocationToPHT(LookupKey key, DHT_Location location) { - double start = MPI_Wtime(); - // if (localCache[key].first) { - // return; - // } + int status = + PHT_write(this->prox_ht.get(), key.data(), &location, NULL, NULL); - int ret_val; - - int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location), - &location, NULL, NULL, &ret_val); - - if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) { + if (status == PHT_WRITE_SUCCESS_WITH_EVICTION) { this->dht_evictions++; } - // if (ret_val == INTERP_CB_FULL) { - // localCache(key, {}); - // } - this->pht_write_t += MPI_Wtime() - start; } @@ -117,47 +65,44 @@ const ProximityHashTable::PHT_Result &ProximityHashTable::query( return (lookup_results = cache_ret.second); } - int res = DHT_read(prox_ht, key.data(), bucket_store); + std::uint32_t slots_read; + int status = + PHT_read(prox_ht.get(), key.data(), bucket_store.get(), &slots_read); + this->pht_read_t += MPI_Wtime() - start_r; - if (res != DHT_SUCCESS) { + if (status == PHT_READ_MISS || slots_read < min_entries_needed) { this->lookup_results.size = 0; return lookup_results; } - auto *bucket_element_count = - reinterpret_cast(bucket_store); - auto *bucket_elements = - reinterpret_cast(bucket_element_count + 1); + DHT_Location *bucket_elements = + reinterpret_cast(bucket_store.get()); - if (*bucket_element_count < min_entries_needed) { - this->lookup_results.size = 0; - return lookup_results; - } - - lookup_results.size = *bucket_element_count; - auto locations = std::vector( - bucket_elements, bucket_elements + *(bucket_element_count)); + lookup_results.size = slots_read; lookup_results.in_values.clear(); - lookup_results.in_values.reserve(*bucket_element_count); + lookup_results.in_values.resize(slots_read); - lookup_results.out_values.clear(); - lookup_results.out_values.reserve(*bucket_element_count); + lookup_results.out_values.resize(slots_read); - for (const auto &loc : locations) { - double start_g = MPI_Wtime(); - DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data()); - this->pht_gather_dht_t += MPI_Wtime() - start_g; + for (std::uint32_t i = 0; i < slots_read; i++) { + DHT_Location &loc = bucket_elements[i]; + int status = + DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data()); + + if (status == DHT_READ_MISS) { + continue; + } auto *buffer = reinterpret_cast(dht_buffer.data()); - lookup_results.in_values.push_back( - std::vector(buffer, buffer + input_count)); + lookup_results.in_values[i] = + std::vector(buffer, buffer + input_count); buffer += input_count; - lookup_results.out_values.push_back( - std::vector(buffer, buffer + output_count)); + lookup_results.out_values[i] = + std::vector(buffer, buffer + output_count); } if (lookup_results.size != 0) { diff --git a/src/Chemistry/WorkerFunctions.cpp b/src/Chemistry/WorkerFunctions.cpp index b7eb6096c..419a9e502 100644 --- a/src/Chemistry/WorkerFunctions.cpp +++ b/src/Chemistry/WorkerFunctions.cpp @@ -246,17 +246,6 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status, << std::setw(this->file_pad) << iteration << ".pht"; interp->dumpPHTState(out.str()); } - - const auto max_mean_idx = - DHT_get_used_idx_factor(this->interp->getDHTObject(), 1); - - if (max_mean_idx >= 2) { - DHT_flush(this->interp->getDHTObject()); - DHT_flush(this->dht->getDHT()); - if (this->comm_rank == 2) { - std::cout << "Flushed both DHT and PHT!\n\n"; - } - } } RInsidePOET::getInstance().parseEvalQ("gc()");
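
Reviewer note (appended, not part of the patch): the diff above replaces the removed in-tree DHT.c/DHT.h with the external LUCX library pulled in as the ext/dht submodule. The sketch below illustrates, outside of POET, the call style the wrapper now uses: DHT_create fills a caller-owned DHT object and reports success through its return value instead of returning a freshly allocated handle, and DHT_free takes a single trailing argument instead of the two counter out-parameters. It only uses calls and argument orders that appear in the diff; the include path, the placeholder hash function, the key/data sizes and the error handling are illustrative assumptions, not the actual POET configuration.

    // Minimal sketch of the post-patch LUCX DHT call style (assumptions noted above).
    #include "LUCX/DHT.h"   // header provided by the ext/dht submodule
    #include <mpi.h>

    #include <array>
    #include <cstdint>
    #include <cstdio>
    #include <memory>

    // Placeholder hash with the same signature as poet::Murmur2_64A, which the
    // wrapper passes to DHT_create; any 64-bit hash of (length, key) works here.
    static uint64_t toy_hash(int len, const void *key) {
      const auto *bytes = static_cast<const unsigned char *>(key);
      uint64_t h = 1469598103934665603ULL;            // FNV-1a
      for (int i = 0; i < len; ++i) { h ^= bytes[i]; h *= 1099511628211ULL; }
      return h;
    }

    int main(int argc, char **argv) {
      MPI_Init(&argc, &argv);

      const uint32_t key_size  = 4 * sizeof(double);  // e.g. key elements + timestep
      const uint32_t data_size = 8 * sizeof(double);  // e.g. simulation output per cell
      const uint32_t buckets_per_process = 1u << 16;

      // New style: the caller owns the DHT object (the wrapper holds it in a
      // unique_ptr) and DHT_create returns a status code.
      auto table = std::make_unique<DHT>();
      if (DHT_create(key_size, data_size, buckets_per_process,
                     &toy_hash, MPI_COMM_WORLD, table.get()) != DHT_SUCCESS) {
        MPI_Abort(MPI_COMM_WORLD, 1);
      }

      std::array<double, 4> key{1.0, 2.0, 3.0, 0.1};
      std::array<double, 8> data{};                   // stand-in for simulated results
      uint32_t proc = 0, index = 0;

      // proc/index report where the entry landed; the wrapper stores this pair so the
      // proximity hash table (see PHT_write above) can point back at the bucket later.
      DHT_write(table.get(), key.data(), data.data(), &proc, &index);

      std::array<double, 8> readback{};
      if (DHT_read(table.get(), key.data(), readback.data()) == DHT_SUCCESS) {
        std::printf("hit on rank %u, bucket %u\n", proc, index);
      }

      DHT_free(table.get(), NULL);                    // single trailing argument now
      MPI_Finalize();
      return 0;
    }

The same status-code pattern applies to the proximity hash table (PHT_create/PHT_write/PHT_read in ProximityHashTable.cpp above); note that consumers must now link the LUCX::PHT and LUCX::nolock targets added in src/CMakeLists.txt.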