mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-15 12:28:22 +01:00
feat: Integrate DHT library as a submodule
This commit is contained in:
parent
5a7779e8de
commit
c040f95235
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -5,3 +5,6 @@
|
||||
[submodule "ext/iphreeqc"]
|
||||
path = ext/iphreeqc
|
||||
url = ../iphreeqc.git
|
||||
[submodule "ext/dht"]
|
||||
path = ext/dht
|
||||
url = https://gitup.uni-potsdam.de/mluebke/dht_ucx.git
|
||||
|
||||
@ -19,8 +19,6 @@ find_package(MPI REQUIRED)
|
||||
|
||||
find_package(RRuntime REQUIRED)
|
||||
|
||||
add_subdirectory(src)
|
||||
|
||||
option(POET_PREPROCESS_BENCHS "Preprocess benchmarks" ON)
|
||||
|
||||
if (POET_PREPROCESS_BENCHS)
|
||||
@ -32,6 +30,9 @@ set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
|
||||
|
||||
add_subdirectory(ext/tug EXCLUDE_FROM_ALL)
|
||||
add_subdirectory(ext/iphreeqc EXCLUDE_FROM_ALL)
|
||||
add_subdirectory(ext/dht EXCLUDE_FROM_ALL)
|
||||
|
||||
add_subdirectory(src)
|
||||
|
||||
option(POET_ENABLE_TESTING "Build test suite for POET" OFF)
|
||||
|
||||
|
||||
1
ext/dht
Submodule
1
ext/dht
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit cc9f027d35d771c2ef1c547b8403f642ae781e48
|
||||
@ -9,7 +9,6 @@ add_library(POETLib
|
||||
Chemistry/MasterFunctions.cpp
|
||||
Chemistry/WorkerFunctions.cpp
|
||||
Chemistry/SurrogateModels/DHT_Wrapper.cpp
|
||||
Chemistry/SurrogateModels/DHT.c
|
||||
Chemistry/SurrogateModels/HashFunctions.cpp
|
||||
Chemistry/SurrogateModels/InterpolationModule.cpp
|
||||
Chemistry/SurrogateModels/ProximityHashTable.cpp
|
||||
@ -22,6 +21,8 @@ target_link_libraries(
|
||||
PUBLIC IPhreeqcPOET
|
||||
PUBLIC tug
|
||||
PUBLIC MPI::MPI_C
|
||||
PUBLIC LUCX::PHT
|
||||
PUBLIC LUCX::nolock
|
||||
)
|
||||
|
||||
include(FetchContent)
|
||||
|
||||
@ -1,703 +0,0 @@
|
||||
/*
|
||||
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
|
||||
**
|
||||
** POET is free software; you can redistribute it and/or modify it under the
|
||||
** terms of the GNU General Public License as published by the Free Software
|
||||
** Foundation; either version 2 of the License, or (at your option) any later
|
||||
** version.
|
||||
**
|
||||
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
**
|
||||
** You should have received a copy of the GNU General Public License along with
|
||||
** this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include "DHT.h"
|
||||
|
||||
#include <mpi.h>
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <math.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/**
 * Map a 64-bit hash to its owning rank and the candidate bucket indices.
 *
 * The hash is split into index_count overlapping windows, each shifted one
 * byte to the right of the previous one; every window is reduced modulo
 * table_size to yield one candidate bucket index. The owning rank is the
 * full hash modulo the communicator size.
 *
 * @param hash        64-bit hash of the key.
 * @param comm_size   Number of participating processes.
 * @param table_size  Buckets per process.
 * @param dest_rank   Out: rank owning the bucket.
 * @param index       Out: array of index_count candidate bucket indices.
 * @param index_count Number of candidate indices to produce.
 */
static void determine_dest(uint64_t hash, int comm_size,
                           unsigned int table_size, unsigned int *dest_rank,
                           uint64_t *index, unsigned int index_count) {
  /* Width of one index window in bytes: the 8 hash bytes shared across
     index_count windows shifted by one byte each. */
  const int index_size = (int)sizeof(uint64_t) - ((int)index_count - 1);
  /* Build the window mask explicitly: when index_size covers all 8 bytes
     (index_count == 1) the former `1ULL << 64` was undefined behavior. */
  const uint64_t mask = (index_size >= (int)sizeof(uint64_t))
                            ? UINT64_MAX
                            : ((1ULL << (index_size * 8)) - 1);
  for (unsigned int i = 0; i < index_count; i++) {
    /* Sliding window of the hash, shifted i bytes to the right. */
    const uint64_t tmp_index = (hash >> (i * 8)) & mask;
    index[i] = (uint64_t)(tmp_index % table_size);
  }
  *dest_rank = (unsigned int)(hash % comm_size);
}
|
||||
|
||||
/* Mark a bucket's flag byte as "written": clear it and raise bit 0. */
static void set_flag(char *flag_byte) { *flag_byte = (char)0x01; }
|
||||
|
||||
/* Return 1 if the "written" bit (bit 0) of a bucket's flag byte is set,
 * 0 otherwise. */
static int read_flag(char flag_byte) { return (flag_byte & 0x01) ? 1 : 0; }
|
||||
|
||||
/**
 * @brief Allocate and initialize a DHT instance.
 *
 * Allocates the local bucket memory with MPI_Alloc_mem, zeroes it (an
 * all-zero flag byte means "empty"), exposes it through an MPI window for
 * one-sided access and fills the handle struct.
 *
 * @param comm      Communicator of all participating processes.
 * @param size      Number of buckets per process.
 * @param data_size Size of a bucket's data part in bytes.
 * @param key_size  Size of a bucket's key part in bytes.
 * @param hash_func Hash function taking (key_size, key) and returning a
 *                  64-bit hash.
 * @return Handle for all further DHT operations, or NULL on failure.
 *         Partially acquired resources are released before returning NULL
 *         (the original leaked them on every error path).
 */
DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
                unsigned int key_size,
                uint64_t (*hash_func)(int, const void *)) {
  DHT *object;
  MPI_Win window;
  void *mem_alloc;
  int comm_size, index_bytes;

  // calculate how many bytes of the hash are needed to address the count
  // of buckets per process, rounded up to whole bytes
  index_bytes = (int)ceil(log2(size));
  if (index_bytes % 8 != 0)
    index_bytes = index_bytes + (8 - (index_bytes % 8));

  // allocate memory for dht-object
  object = (DHT *)malloc(sizeof(DHT));
  if (object == NULL)
    return NULL;

  // every bucket carries 1 additional leading byte for the written flag
  if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL,
                    &mem_alloc) != 0) {
    free(object);
    return NULL;
  }
  if (MPI_Comm_size(comm, &comm_size) != 0) {
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // MPI_Alloc_mem does not zero the memory; an unset flag byte marks a
  // bucket as empty, so clear everything here
  memset(mem_alloc, '\0', size * (1 + data_size + key_size));

  // create the window on the allocated memory; the displacement unit is
  // one whole bucket
  if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size),
                     (1 + data_size + key_size), MPI_INFO_NULL, comm,
                     &window) != 0) {
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // fill dht-object
  object->data_size = data_size;
  object->key_size = key_size;
  object->table_size = size;
  object->window = window;
  object->hash_func = hash_func;
  object->comm_size = comm_size;
  object->communicator = comm;
  object->read_misses = 0;
  object->evictions = 0;
  object->recv_entry = malloc(1 + data_size + key_size);
  object->send_entry = malloc(1 + data_size + key_size);
  object->index_count = 9 - (index_bytes / 8);
  object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
  object->mem_alloc = mem_alloc;
  object->sum_idx = 0;
  object->cnt_idx = 0;

  // the scratch allocations were previously unchecked; clean up fully if
  // any of them failed
  if (object->recv_entry == NULL || object->send_entry == NULL ||
      object->index == NULL) {
    free(object->recv_entry);
    free(object->send_entry);
    free(object->index);
    MPI_Win_free(&window);
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // if set, initialize dht_stats
#ifdef DHT_STATISTICS
  DHT_stats *stats;

  stats = (DHT_stats *)malloc(sizeof(DHT_stats));
  if (stats == NULL) {
    free(object->recv_entry);
    free(object->send_entry);
    free(object->index);
    MPI_Win_free(&window);
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  object->stats = stats;
  object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
  object->stats->old_writes = 0;
  object->stats->read_misses = 0;
  object->stats->evictions = 0;
  object->stats->w_access = 0;
  object->stats->r_access = 0;
#endif

  return object;
}
|
||||
|
||||
/**
 * @brief Register the merge function used by DHT_write_accumulate().
 *
 * The callback receives (incoming size, incoming data, stored size, stored
 * data) and merges the incoming data into the stored buffer; its return
 * value is reported back through DHT_write_accumulate().
 */
void DHT_set_accumulate_callback(DHT *table,
                                 int (*callback_func)(int, void *, int,
                                                      void *)) {
  table->accumulate_callback = callback_func;
}
|
||||
|
||||
/**
 * @brief Merge data into a bucket via the registered accumulate callback.
 *
 * Probes the candidate indices under an exclusive lock until an empty
 * bucket or a matching key is found (evicting the last candidate if all
 * are occupied by foreign keys), merges the incoming data with the stored
 * data through table->accumulate_callback, and puts the result back if the
 * callback returned 0.
 *
 * @param table        DHT handle.
 * @param send_key     Key of the bucket (key_size bytes).
 * @param data_size    Size of the incoming data handed to the callback.
 * @param send_data    Incoming data to accumulate.
 * @param proc         Optional out: rank the bucket lives on.
 * @param index        Optional out: bucket index used.
 * @param callback_ret Out: return value of the accumulate callback.
 * @return DHT_SUCCESS, DHT_WRITE_SUCCESS_WITH_EVICTION, or DHT_MPI_ERROR.
 *         On MPI failure the window lock is now released before returning
 *         (the original returned with the exclusive lock still held).
 */
int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
                         void *send_data, uint32_t *proc, uint32_t *index,
                         int *callback_ret) {
  unsigned int dest_rank, i;
  int result = DHT_SUCCESS;

#ifdef DHT_STATISTICS
  table->stats->w_access++;
#endif

  // determine destination rank and candidate indices by hash of key
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);

  // prepare the outgoing bucket: flag byte followed by the key; the data
  // part is filled after the merge below
  set_flag((char *)table->send_entry);
  memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);

  // locking window of target rank with exclusive lock
  if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0) {
      // release the lock before bailing out (was leaked before)
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }
    if (MPI_Win_flush(dest_rank, table->window) != 0) {
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }

    // occupied bucket: matching key -> reuse it; foreign key on the last
    // candidate -> evict it
    if (read_flag(*(char *)table->recv_entry)) {
      if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
          0) {
        if (i == (table->index_count) - 1) {
          table->evictions += 1;
#ifdef DHT_STATISTICS
          table->stats->evictions += 1;
#endif
          result = DHT_WRITE_SUCCESS_WITH_EVICTION;
          break;
        }
      } else
        break;
    } else {
#ifdef DHT_STATISTICS
      table->stats->writes_local[dest_rank]++;
#endif
      break;
    }
  }

  // track the mean number of probes for DHT_get_used_idx_factor()
  table->cnt_idx += 1;
  table->sum_idx += (i + 1);

  // eviction: accumulate onto zeroed data; otherwise accumulate onto the
  // data currently stored in the bucket
  if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
    memset((char *)table->send_entry + 1 + table->key_size, '\0',
           table->data_size);
  } else {
    memcpy((char *)table->send_entry + 1 + table->key_size,
           (char *)table->recv_entry + 1 + table->key_size, table->data_size);
  }

  *callback_ret = table->accumulate_callback(
      data_size, (char *)send_data, table->data_size,
      (char *)table->send_entry + 1 + table->key_size);

  // put merged data to DHT (with last selected index by value i) only if
  // the callback signaled success
  if (*callback_ret == 0) {
    if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0) {
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }
  }
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;

  if (proc) {
    *proc = dest_rank;
  }

  if (index) {
    *index = table->index[i];
  }

  return result;
}
|
||||
|
||||
/**
 * @brief Write a key/data pair into the DHT.
 *
 * Probes the candidate indices under an exclusive lock until an empty
 * bucket or a bucket with a matching key is found; if all candidates hold
 * foreign keys, the last one is overwritten (eviction).
 *
 * @param table    DHT handle.
 * @param send_key Key (key_size bytes).
 * @param send_data Data (data_size bytes).
 * @param proc     Optional out: rank the bucket lives on.
 * @param index    Optional out: bucket index used.
 * @return DHT_SUCCESS, DHT_WRITE_SUCCESS_WITH_EVICTION, or DHT_MPI_ERROR.
 *         On MPI failure the window lock is now released before returning
 *         (the original returned with the exclusive lock still held).
 */
int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
              uint32_t *index) {
  unsigned int dest_rank, i;
  int result = DHT_SUCCESS;

#ifdef DHT_STATISTICS
  table->stats->w_access++;
#endif

  // determine destination rank and candidate indices by hash of key
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);

  // concatenating flag, key and data to form the bucket to send
  set_flag((char *)table->send_entry);
  memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
  memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
         table->data_size);

  // locking window of target rank with exclusive lock
  if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0) {
      // release the lock before bailing out (was leaked before)
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }
    if (MPI_Win_flush(dest_rank, table->window) != 0) {
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }

    // occupied bucket: matching key -> overwrite; foreign key on the last
    // candidate -> evict
    if (read_flag(*(char *)table->recv_entry)) {
      if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
          0) {
        if (i == (table->index_count) - 1) {
          table->evictions += 1;
#ifdef DHT_STATISTICS
          table->stats->evictions += 1;
#endif
          result = DHT_WRITE_SUCCESS_WITH_EVICTION;
          break;
        }
      } else
        break;
    } else {
#ifdef DHT_STATISTICS
      table->stats->writes_local[dest_rank]++;
#endif
      break;
    }
  }

  // track the mean number of probes for DHT_get_used_idx_factor()
  table->cnt_idx += 1;
  table->sum_idx += (i + 1);

  // put data to DHT (with last selected index by value i)
  if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
              MPI_BYTE, dest_rank, table->index[i],
              1 + table->data_size + table->key_size, MPI_BYTE,
              table->window) != 0) {
    MPI_Win_unlock(dest_rank, table->window);
    return DHT_MPI_ERROR;
  }
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;

  if (proc) {
    *proc = dest_rank;
  }

  if (index) {
    *index = table->index[i];
  }

  return result;
}
|
||||
|
||||
/**
 * @brief Read the data stored under a key.
 *
 * Probes the candidate indices under a shared lock; a bucket that is empty
 * terminates the search immediately (the key cannot live at a later
 * candidate), a foreign key on the last candidate is a miss.
 *
 * @param table       DHT handle.
 * @param send_key    Key to look up (key_size bytes).
 * @param destination Out: receives data_size bytes on success.
 * @return DHT_SUCCESS, DHT_READ_MISS, or DHT_MPI_ERROR. On MPI failure the
 *         window lock is now released before returning (the original
 *         returned with the shared lock still held on Get/flush errors).
 */
int DHT_read(DHT *table, const void *send_key, void *destination) {
  unsigned int dest_rank, i;

#ifdef DHT_STATISTICS
  table->stats->r_access++;
#endif

  // determine destination rank and candidate indices by hash of key
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);

  // locking window of target rank with shared lock
  if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // receive data
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0) {
      // release the lock before bailing out (was leaked before)
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }
    if (MPI_Win_flush(dest_rank, table->window) != 0) {
      MPI_Win_unlock(dest_rank, table->window);
      return DHT_MPI_ERROR;
    }

    // empty bucket: the key cannot exist further down the probe sequence
    if ((read_flag(*(char *)table->recv_entry)) == 0) {
      table->read_misses += 1;
#ifdef DHT_STATISTICS
      table->stats->read_misses += 1;
#endif
      // unlock window and return
      if (MPI_Win_unlock(dest_rank, table->window) != 0)
        return DHT_MPI_ERROR;
      return DHT_READ_MISS;
    }

    // foreign key: miss only once the last candidate index is reached
    if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) !=
        0) {
      if (i == (table->index_count) - 1) {
        table->read_misses += 1;
#ifdef DHT_STATISTICS
        table->stats->read_misses += 1;
#endif
        // unlock window and return
        if (MPI_Win_unlock(dest_rank, table->window) != 0)
          return DHT_MPI_ERROR;
        return DHT_READ_MISS;
      }
    } else
      break;
  }

  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;

  // matching key was found: copy data into the caller's buffer
  memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1,
         table->data_size);

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Read a bucket directly by (rank, index) without key lookup.
 *
 * Fetches one whole bucket under a shared lock and copies its data part to
 * the caller. The bucket's flag and key are not checked; the caller is
 * expected to pass a location previously returned by a write.
 *
 * @param table       DHT handle.
 * @param proc        Rank the bucket lives on.
 * @param index       Bucket index on that rank.
 * @param destination Out: receives data_size bytes.
 * @return DHT_SUCCESS or DHT_MPI_ERROR. On a failed MPI_Get the window
 *         lock is now released before returning (the original returned
 *         with the shared lock still held).
 */
int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
                      void *destination) {
  const uint32_t bucket_size = table->data_size + table->key_size + 1;

#ifdef DHT_STATISTICS
  table->stats->r_access++;
#endif

  // locking window of target rank with shared lock
  if (MPI_Win_lock(MPI_LOCK_SHARED, proc, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // receive data
  if (MPI_Get(table->recv_entry, bucket_size, MPI_BYTE, proc, index,
              bucket_size, MPI_BYTE, table->window) != 0) {
    // release the lock before bailing out (was leaked before)
    MPI_Win_unlock(proc, table->window);
    return DHT_MPI_ERROR;
  }

  // unlock window of target rank (also completes the MPI_Get)
  if (MPI_Win_unlock(proc, table->window) != 0)
    return DHT_MPI_ERROR;

  // copy the data part into the caller's buffer
  memcpy((char *)destination, (char *)table->recv_entry + 1 + table->key_size,
         table->data_size);

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Dump all written buckets of the DHT to a shared file.
 *
 * Collective over table->communicator. Rank 0 writes an 8-byte header
 * (key_size, data_size as two ints); afterwards every rank appends the
 * key+data of each locally written bucket via the shared file pointer.
 * The flag byte itself is not written.
 *
 * @param table    DHT handle.
 * @param filename Path of the output file.
 * @return DHT_SUCCESS, DHT_FILE_IO_ERROR or DHT_FILE_WRITE_ERROR.
 */
int DHT_to_file(DHT *table, const char *filename) {
  MPI_File file;
  int rank;
  const int bucket_size = table->key_size + table->data_size + 1;

  // open file for (collective) writing, creating it if necessary
  if (MPI_File_open(table->communicator, filename,
                    MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL,
                    &file) != 0)
    return DHT_FILE_IO_ERROR;

  MPI_Comm_rank(table->communicator, &rank);

  // header (key_size and data_size) is written once, by rank 0 only
  if (rank == 0) {
    if (MPI_File_write_shared(file, &table->key_size, 1, MPI_INT,
                              MPI_STATUS_IGNORE) != 0)
      return DHT_FILE_WRITE_ERROR;
    if (MPI_File_write_shared(file, &table->data_size, 1, MPI_INT,
                              MPI_STATUS_IGNORE) != 0)
      return DHT_FILE_WRITE_ERROR;
  }

  // make sure the header precedes all bucket records
  MPI_Barrier(table->communicator);

  // walk the local bucket array and append every occupied bucket
  for (unsigned int bucket = 0; bucket < table->table_size; bucket++) {
    char *entry = (char *)table->mem_alloc + (bucket * bucket_size);
    // only buckets carrying the written flag are persisted
    if (read_flag(*entry)) {
      // skip the flag byte: write key and data only
      if (MPI_File_write_shared(file, entry + 1, bucket_size - 1, MPI_BYTE,
                                MPI_STATUS_IGNORE) != 0)
        return DHT_FILE_WRITE_ERROR;
    }
  }

  MPI_Barrier(table->communicator);

  // collective close
  if (MPI_File_close(&file) != 0)
    return DHT_FILE_IO_ERROR;

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Restore DHT contents from a file written by DHT_to_file().
 *
 * Collective over table->communicator. Validates the 8-byte header
 * (key_size, data_size) against the current table, then every rank reads
 * every comm_size-th record — starting at its own rank offset — and
 * re-inserts it with DHT_write().
 *
 * @param table    DHT handle.
 * @param filename Path of the input file.
 * @return DHT_SUCCESS, DHT_WRONG_FILE, DHT_FILE_IO_ERROR,
 *         DHT_FILE_READ_ERROR or DHT_MPI_ERROR. The read buffer is now
 *         freed on every error path (the original leaked it).
 */
int DHT_from_file(DHT *table, const char *filename) {
  MPI_File file;
  MPI_Offset f_size;
  int bucket_size, buffer_size, cur_pos, rank, offset;
  char *buffer;
  void *key;
  void *data;

  // open file
  if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY,
                    MPI_INFO_NULL, &file) != 0)
    return DHT_FILE_IO_ERROR;

  // get file size
  if (MPI_File_get_size(file, &f_size) != 0)
    return DHT_FILE_IO_ERROR;

  MPI_Comm_rank(table->communicator, &rank);

  // one record on disk is key + data (no flag byte)
  bucket_size = table->key_size + table->data_size;
  // buffer must hold either one record or the file header, whichever is
  // larger
  buffer_size =
      bucket_size > DHT_FILEHEADER_SIZE ? bucket_size : DHT_FILEHEADER_SIZE;
  buffer = (char *)malloc(buffer_size);
  if (buffer == NULL)
    return DHT_FILE_READ_ERROR;

  // read file header
  if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE,
                    MPI_STATUS_IGNORE) != 0) {
    free(buffer);
    return DHT_FILE_READ_ERROR;
  }

  // the header's key and data sizes must match the current table layout
  if (*(int *)buffer != table->key_size) {
    free(buffer);
    return DHT_WRONG_FILE;
  }
  if (*(int *)(buffer + 4) != table->data_size) {
    free(buffer);
    return DHT_WRONG_FILE;
  }

  // stride between two records handled by the same rank
  offset = bucket_size * table->comm_size;

  // seek behind header of DHT file
  if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0) {
    free(buffer);
    return DHT_FILE_IO_ERROR;
  }

  // each rank starts at its own record: header + rank * bucket_size
  cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size);

  // loop over file and re-insert every record with DHT_write
  while (cur_pos < f_size) {
    if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) {
      free(buffer);
      return DHT_FILE_IO_ERROR;
    }
    // (a dead MPI_File_get_position probe was removed here; it only read
    // the position into an unused temporary)
    if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE,
                      MPI_STATUS_IGNORE) != 0) {
      free(buffer);
      return DHT_FILE_READ_ERROR;
    }
    // extract key and data and write to DHT
    key = buffer;
    data = (buffer + table->key_size);
    if (DHT_write(table, key, data, NULL, NULL) == DHT_MPI_ERROR) {
      free(buffer);
      return DHT_MPI_ERROR;
    }

    // advance to this rank's next record
    cur_pos += offset;
  }

  free(buffer);
  if (MPI_File_close(&file) != 0)
    return DHT_FILE_IO_ERROR;

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Destroy a DHT and release all its resources.
 *
 * Collective over table->communicator (MPI_Win_free and the reductions are
 * collective). Optionally reduces the global eviction and read-miss
 * counters onto rank 0 before tearing the table down.
 *
 * @param table             DHT handle; invalid after this call.
 * @param eviction_counter  Optional out: sum of evictions (valid on rank 0).
 * @param readerror_counter Optional out: sum of read misses (valid on rank 0).
 * @return DHT_SUCCESS or DHT_MPI_ERROR.
 */
int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
  int reduced;

  if (eviction_counter != NULL) {
    reduced = 0;
    if (MPI_Reduce(&table->evictions, &reduced, 1, MPI_INT, MPI_SUM, 0,
                   table->communicator) != 0)
      return DHT_MPI_ERROR;
    *eviction_counter = reduced;
  }
  if (readerror_counter != NULL) {
    reduced = 0;
    if (MPI_Reduce(&table->read_misses, &reduced, 1, MPI_INT, MPI_SUM, 0,
                   table->communicator) != 0)
      return DHT_MPI_ERROR;
    *readerror_counter = reduced;
  }

  // tear down MPI resources first, then the heap allocations
  if (MPI_Win_free(&(table->window)) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Free_mem(table->mem_alloc) != 0)
    return DHT_MPI_ERROR;
  free(table->recv_entry);
  free(table->send_entry);
  free(table->index);

#ifdef DHT_STATISTICS
  free(table->stats->writes_local);
  free(table->stats);
#endif
  free(table);

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Return the maximum (over all ranks) mean probe count per access.
 *
 * Collective over table->communicator: each rank computes the average
 * number of probed indices per write since the last reset; the maximum is
 * reduced onto rank 0 and broadcast back so every rank returns the same
 * value.
 *
 * @param table      DHT handle.
 * @param with_reset Nonzero to reset the local probe counters afterwards.
 * @return Maximum mean probe count across all ranks; 0.0f if this rank has
 *         recorded no accesses yet (the original divided by zero and could
 *         propagate NaN through the reduction).
 */
float DHT_get_used_idx_factor(DHT *table, int with_reset) {
  int rank;
  MPI_Comm_rank(table->communicator, &rank);

  // guard against cnt_idx == 0: no accesses recorded yet
  float my_avg_idx =
      (table->cnt_idx == 0)
          ? 0.0f
          : (float)table->sum_idx / (float)table->cnt_idx;

  float max_mean_index;

  MPI_Reduce(&my_avg_idx, &max_mean_index, 1, MPI_FLOAT, MPI_MAX, 0,
             table->communicator);

  // distribute the maximum so all ranks agree on the result
  MPI_Bcast(&max_mean_index, 1, MPI_FLOAT, 0, table->communicator);

  if (with_reset) {
    table->sum_idx = 0;
    table->cnt_idx = 0;
  }

  return max_mean_index;
}
|
||||
|
||||
/**
 * @brief Empty the DHT: clear every local bucket on every rank.
 *
 * Collective over table->communicator. Zeroing the flag bytes marks all
 * buckets as empty; the probe-count statistics are reset as well.
 *
 * @param table DHT handle.
 * @return DHT_SUCCESS.
 */
int DHT_flush(DHT *table) {
  // synchronize so no rank wipes memory while another still accesses it
  MPI_Barrier(table->communicator);

  // a zeroed flag byte marks a bucket as empty
  const size_t local_bytes =
      table->table_size * (1 + table->data_size + table->key_size);
  memset(table->mem_alloc, '\0', local_bytes);

  // reset probe-count tracking
  table->sum_idx = 0;
  table->cnt_idx = 0;

  return DHT_SUCCESS;
}
|
||||
|
||||
/**
 * @brief Gather per-rank usage statistics and print a table on rank 0.
 *
 * Collective over table->communicator. Only compiled-in with
 * DHT_STATISTICS; without it the function is a no-op returning
 * DHT_SUCCESS (the original fell off the end of a non-void function,
 * which is undefined behavior). The per-call counters (w/r accesses,
 * read misses, evictions) are reset after printing.
 *
 * @param table DHT handle.
 * @return DHT_SUCCESS or DHT_MPI_ERROR.
 */
int DHT_print_statistics(DHT *table) {
#ifdef DHT_STATISTICS
  int *written_buckets;
  int *read_misses, sum_read_misses;
  int *evictions, sum_evictions;
  int sum_w_access, sum_r_access, *w_access, *r_access;
  int rank;
  MPI_Comm_rank(table->communicator, &rank);

  // disable possible warning of uninitialized variable, which is not the
  // case since MPI_Gather/MPI_Reduce only use the recv buffers on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"

  // obtaining all values from all processes in the communicator
  if (rank == 0)
    read_misses = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1,
                 MPI_INT, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT,
                 MPI_SUM, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->read_misses = 0;

  if (rank == 0)
    evictions = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM,
                 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->evictions = 0;

  if (rank == 0)
    w_access = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->w_access = 0;

  if (rank == 0)
    r_access = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->r_access = 0;

  if (rank == 0)
    written_buckets = (int *)calloc(table->comm_size, sizeof(int));
  if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size,
                 MPI_INT, MPI_SUM, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;

  if (rank == 0) { // only process with rank 0 prints the result table
    int sum_written_buckets = 0;

    for (int i = 0; i < table->comm_size; i++) {
      sum_written_buckets += written_buckets[i];
    }

    int members = 7;
    int padsize = (members * 12) + 1;
    char pad[padsize + 1];

    memset(pad, '-', padsize * sizeof(char));
    pad[padsize] = '\0';
    printf("\n");
    printf("%-35s||resets with every call of this function\n", " ");
    printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied",
           "free", "w_access", "r_access", "read misses", "evictions");
    printf("%s\n", pad);
    for (int i = 0; i < table->comm_size; i++) {
      printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i,
             written_buckets[i], table->table_size - written_buckets[i],
             w_access[i], r_access[i], read_misses[i], evictions[i]);
    }
    printf("%s\n", pad);
    printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum",
           sum_written_buckets,
           (table->table_size * table->comm_size) - sum_written_buckets,
           sum_w_access, sum_r_access, sum_read_misses, sum_evictions);

    printf("%s\n", pad);
    printf("%s %d\n",
           "new entries:", sum_written_buckets - table->stats->old_writes);

    printf("\n");
    fflush(stdout);

    table->stats->old_writes = sum_written_buckets;

    // the gathered arrays were previously leaked on every call
    free(read_misses);
    free(evictions);
    free(w_access);
    free(r_access);
    free(written_buckets);
  }

  // enable warning again
#pragma GCC diagnostic pop

  MPI_Barrier(table->communicator);
  return DHT_SUCCESS;
#else
  // statistics support not compiled in: nothing to do
  (void)table;
  return DHT_SUCCESS;
#endif
}
|
||||
@ -1,295 +0,0 @@
|
||||
/*
|
||||
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
|
||||
**
|
||||
** POET is free software; you can redistribute it and/or modify it under the
|
||||
** terms of the GNU General Public License as published by the Free Software
|
||||
** Foundation; either version 2 of the License, or (at your option) any later
|
||||
** version.
|
||||
**
|
||||
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
**
|
||||
** You should have received a copy of the GNU General Public License along with
|
||||
** this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file DHT.h
|
||||
* @author Max Lübke (mluebke@uni-potsdam.de)
|
||||
* @brief API to interact with the DHT
|
||||
* @version 0.1
|
||||
* @date 16 Nov 2017
|
||||
*
|
||||
* This file implements the creation of a DHT by using the MPI
|
||||
* one-sided-communication. There is also the possibility to write or read data
|
||||
* from or to the DHT. In addition, the current state of the DHT can be written
|
||||
* to a file and read in again later.
|
||||
*/
|
||||
|
||||
#ifndef DHT_H
|
||||
#define DHT_H
|
||||
|
||||
#include <mpi.h>
|
||||
#include <stdint.h>
|
||||
|
||||
/** Returned if some error in MPI routine occurs. */
|
||||
#define DHT_MPI_ERROR -1
|
||||
/** Returned by a call of DHT_read if no bucket with given key was found. */
|
||||
#define DHT_READ_MISS -2
|
||||
/** Returned by DHT_write if a bucket was evicted. */
|
||||
#define DHT_WRITE_SUCCESS_WITH_EVICTION -3
|
||||
/** Returned when no errors occured. */
|
||||
#define DHT_SUCCESS 0
|
||||
|
||||
/** Returned by DHT_from_file if the given file does not match expected file. */
|
||||
#define DHT_WRONG_FILE -11
|
||||
/** Returned by DHT file operations if MPI file operation fails. */
|
||||
#define DHT_FILE_IO_ERROR -12
|
||||
/** Returned by DHT file operations if error occured in MPI_Read operation. */
|
||||
#define DHT_FILE_READ_ERROR -13
|
||||
/** Returned by DHT file operations if error occured in MPI_Write operation. */
|
||||
#define DHT_FILE_WRITE_ERROR -14
|
||||
|
||||
/** Size of the file header in byte. */
|
||||
#define DHT_FILEHEADER_SIZE 8
|
||||
|
||||
/**
|
||||
* Internal struct to store statistics about read and write accesses and also
|
||||
* read misses and evictions.
|
||||
* <b>All values will be resetted to zero after a call of
|
||||
* DHT_print_statistics().</b>
|
||||
* Internal use only!
|
||||
*
|
||||
* @todo There's maybe a better solution than DHT_print_statistics and this
|
||||
* struct
|
||||
*/
|
||||
/**
 * Per-process usage counters, reset to zero by DHT_print_statistics().
 * Internal use only; member order is part of the ABI and must not change.
 */
typedef struct {
  /** Per-destination-rank count of buckets this process wrote. */
  int *writes_local;
  /** Total writes as of the previous DHT_print_statistics() call. */
  int old_writes;
  /** Number of read misses since the last reset. */
  int read_misses;
  /** Number of evicted buckets since the last reset. */
  int evictions;
  /** Number of DHT_write() calls by this process since the last reset. */
  int w_access;
  /** Number of DHT_read() calls by this process since the last reset. */
  int r_access;
} DHT_stats;
|
||||
|
||||
/**
|
||||
* Struct which serves as a handler or so called \a DHT-object. Will
|
||||
* be created by DHT_create and must be passed as a parameter to every following
|
||||
* function. Stores all relevant data.
|
||||
* Do not touch outside DHT functions!
|
||||
*/
|
||||
typedef struct {
|
||||
/** Created MPI Window, which serves as the DHT memory area of the process. */
|
||||
MPI_Win window;
|
||||
/** Size of the data of a bucket entry in byte. */
|
||||
int data_size;
|
||||
/** Size of the key of a bucket entry in byte. */
|
||||
int key_size;
|
||||
/** Count of buckets for each process. */
|
||||
unsigned int table_size;
|
||||
/** MPI communicator of all participating processes. */
|
||||
MPI_Comm communicator;
|
||||
/** Size of the MPI communicator respectively all participating processes. */
|
||||
int comm_size;
|
||||
/** Pointer to a hashfunction. */
|
||||
uint64_t (*hash_func)(int, const void *);
|
||||
/** Pre-allocated memory where a bucket can be received. */
|
||||
void *recv_entry;
|
||||
/** Pre-allocated memory where a bucket to send can be stored. */
|
||||
void *send_entry;
|
||||
/** Allocated memory on which the MPI window was created. */
|
||||
void *mem_alloc;
|
||||
/** Count of read misses over all time. */
|
||||
int read_misses;
|
||||
/** Count of evictions over all time. */
|
||||
int evictions;
|
||||
/** Array of indeces where a bucket can be stored. */
|
||||
uint64_t *index;
|
||||
/** Count of possible indeces. */
|
||||
unsigned int index_count;
|
||||
|
||||
int (*accumulate_callback)(int, void *, int, void *);
|
||||
|
||||
size_t sum_idx;
|
||||
size_t cnt_idx;
|
||||
#ifdef DHT_STATISTICS
|
||||
/** Detailed statistics of the usage of the DHT. */
|
||||
DHT_stats *stats;
|
||||
#endif
|
||||
} DHT;
|
||||
|
||||
extern void DHT_set_accumulate_callback(DHT *table,
|
||||
int (*callback_func)(int, void *, int,
|
||||
void *));
|
||||
|
||||
extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
|
||||
void *data, uint32_t *proc, uint32_t *index,
|
||||
int *callback_ret);
|
||||
|
||||
/**
|
||||
* @brief Create a DHT.
|
||||
*
|
||||
* When calling this function, the required memory is allocated and a
|
||||
* MPI_Window is created. This allows the execution of MPI_Get and
|
||||
* MPI_Put operations for one-sided communication. Then the number of
|
||||
* indexes is calculated and finally all relevant data is entered into the
|
||||
* \a DHT-object which is returned.
|
||||
*
|
||||
* @param comm MPI communicator which addresses all participating process of the
|
||||
* DHT.
|
||||
* @param size_per_process Number of buckets per process.
|
||||
* @param data_size Size of data in byte.
|
||||
* @param key_size Size of the key in byte.
|
||||
* @param hash_func Pointer to a hash function. This function must take the size
|
||||
* of the key and a pointer to the key as input parameters and return a 64 bit
|
||||
* hash.
|
||||
* @return DHT* The returned value is the \a DHT-object which serves as a handle
|
||||
* for all DHT operations. If an error occured NULL is returned.
|
||||
*/
|
||||
extern DHT *DHT_create(MPI_Comm comm, uint64_t size_per_process,
|
||||
unsigned int data_size, unsigned int key_size,
|
||||
uint64_t (*hash_func)(int, const void *));
|
||||
|
||||
/**
|
||||
* @brief Write data into DHT.
|
||||
*
|
||||
* When DHT_write is called, the address window is locked with a
|
||||
* LOCK_EXCLUSIVE for write access. Now the first bucket is received
|
||||
* using MPI_Get and it is checked if the bucket is empty or if the received key
|
||||
* matches the passed key. If this is the case, the data of the bucket is
|
||||
* overwritten with the new value. If not, the function continues with the next
|
||||
* index until no more indexes are available. When the last index is reached and
|
||||
* there are no more indexes available, the last examined bucket is replaced.
|
||||
* After successful writing, the memory window is released and the function
|
||||
* returns.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param key Pointer to the key.
|
||||
* @param data Pointer to the data.
|
||||
* @param proc If not NULL, returns the process number written to.
|
||||
* @param index If not NULL, returns the index of the bucket where the data was
|
||||
* written to.
|
||||
* @return int Returns either DHT_SUCCESS on success or correspondending error
|
||||
* value on eviction or error.
|
||||
*/
|
||||
extern int DHT_write(DHT *table, void *key, void *data, uint32_t *proc,
|
||||
uint32_t *index);
|
||||
|
||||
/**
|
||||
* @brief Read data from DHT.
|
||||
*
|
||||
* At the beginning, the target process and all possible indices are determined.
|
||||
* After that a SHARED lock on the address window for read access is done
|
||||
* and the first entry is retrieved. Now the received key is compared
|
||||
* with the key passed to the function. If they coincide the correct data
|
||||
* was found. If not it continues with the next index. If the last
|
||||
* possible bucket is reached and the keys still do not match the read
|
||||
* error counter is incremented. After the window has been released
|
||||
* again, the function returns with a corresponding return value (read
|
||||
* error or error-free read). The data to be read out is also written to
|
||||
* the memory area of the passed pointer.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param key Pointer to the key.
|
||||
* @param destination Pointer to memory area where retreived data should be
|
||||
* stored.
|
||||
* @return int Returns either DHT_SUCCESS on success or correspondending error
|
||||
* value on read miss or error.
|
||||
*/
|
||||
extern int DHT_read(DHT *table, const void *key, void *destination);
|
||||
|
||||
extern int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
|
||||
void *destination);
|
||||
/**
|
||||
* @brief Write current state of DHT to file.
|
||||
*
|
||||
* All contents are written as a memory dump, so that no conversion takes place.
|
||||
* First, an attempt is made to open or create a file. If this is successful the
|
||||
* file header consisting of data and key size is written. Then each process
|
||||
* reads its memory area of the DHT and each bucket that was marked as written
|
||||
* is added to the file using MPI file operations.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param filename Name of the file to write to.
|
||||
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
|
||||
* opened/closed or DHT_WRITE_ERROR if file is not writable.
|
||||
*/
|
||||
extern int DHT_to_file(DHT *table, const char *filename);
|
||||
|
||||
/**
|
||||
* @brief Read state of DHT from file.
|
||||
*
|
||||
* One needs a previously written DHT file (by DHT_from_file).
|
||||
* First of all, an attempt is made to open the specified file. If this is
|
||||
* succeeded the file header is read and compared with the current values of the
|
||||
* DHT. If the data and key sizes do not differ, one can continue. Each process
|
||||
* reads one line of the file and writes it to the DHT with DHT_write. This
|
||||
* happens until no more lines are left. The writing is done by the
|
||||
* implementation of DHT_write.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param filename Name of the file to read from.
|
||||
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
|
||||
* opened/closed, DHT_READ_MISS if file is not readable or DHT_WRONG_FILE if
|
||||
* file doesn't match expectation. This is possible if the data size or key size
|
||||
* is different.
|
||||
*/
|
||||
extern int DHT_from_file(DHT *table, const char *filename);
|
||||
|
||||
/**
|
||||
* @brief Free ressources of DHT.
|
||||
*
|
||||
* Finally, to free all resources after using the DHT, the function
|
||||
* DHT_free must be used. This will free the MPI\_Window, as well as the
|
||||
* associated memory. Also all internal variables are released. Optionally, the
|
||||
* count of evictions and read misses can also be obtained.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param eviction_counter \a optional: Pointer to integer where the count of
|
||||
* evictions should be stored.
|
||||
* @param readerror_counter \a optional: Pointer to integer where the count of
|
||||
* read errors should be stored.
|
||||
* @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on
|
||||
* internal MPI error.
|
||||
*/
|
||||
extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
|
||||
|
||||
/**
|
||||
* @brief Prints a table with statistics about current use of DHT.
|
||||
*
|
||||
* These statistics are from each participated process and also summed up over
|
||||
* all processes. Detailed statistics are:
|
||||
* -# occupied buckets (in respect to the memory of this process)
|
||||
* -# free buckets (in respect to the memory of this process)
|
||||
* -# calls of DHT_write (w_access)
|
||||
* -# calls of DHT_read (r_access)
|
||||
* -# read misses (see DHT_READ_MISS)
|
||||
* -# collisions (see DHT_WRITE_SUCCESS_WITH_EVICTION)
|
||||
* 3-6 will reset with every call of this function finally the amount of new
|
||||
* written entries is printed out (since the last call of this funtion).
|
||||
*
|
||||
* This is done by collective MPI operations with the root process with rank 0,
|
||||
* which will also print a table with all informations to stdout.
|
||||
*
|
||||
* Also, as this function was implemented for a special case (POET project) one
|
||||
* need to define DHT_STATISTICS to the compiler macros to use this
|
||||
* function (eg. <emph>gcc -DDHT_STATISTICS ... </emph>).
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
|
||||
* error.
|
||||
*/
|
||||
extern int DHT_print_statistics(DHT *table);
|
||||
|
||||
extern float DHT_get_used_idx_factor(DHT *table, int with_reset);
|
||||
|
||||
extern int DHT_flush(DHT *table);
|
||||
|
||||
#endif /* DHT_H */
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "DHT_Wrapper.hpp"
|
||||
|
||||
#include "Chemistry/SurrogateModels/HashFunctions.hpp"
|
||||
#include "Init/InitialList.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
@ -34,6 +35,8 @@
|
||||
#include <stdexcept>
|
||||
#include <vector>
|
||||
|
||||
#include <LUCX/DHT.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace poet {
|
||||
@ -48,7 +51,8 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
: key_count(key_indices.size()), data_count(data_count),
|
||||
input_key_elements(key_indices), communicator(dht_comm),
|
||||
key_species(key_species), output_names(_output_names), hooks(_hooks),
|
||||
with_interp(_with_interp), has_het_ids(_has_het_ids) {
|
||||
with_interp(_with_interp), has_het_ids(_has_het_ids),
|
||||
dht_object(new DHT) {
|
||||
// initialize DHT object
|
||||
// key size = count of key elements + timestep
|
||||
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
|
||||
@ -57,8 +61,9 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
sizeof(double);
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
|
||||
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
|
||||
&poet::Murmur2_64A);
|
||||
|
||||
int status = DHT_create(key_size, data_size, buckets_per_process,
|
||||
&poet::Murmur2_64A, dht_comm, dht_object.get());
|
||||
|
||||
dht_signif_vector = key_species.getValues();
|
||||
|
||||
@ -82,10 +87,8 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
}
|
||||
}
|
||||
|
||||
DHT_Wrapper::~DHT_Wrapper() {
|
||||
// free DHT
|
||||
DHT_free(dht_object, NULL, NULL);
|
||||
}
|
||||
DHT_Wrapper::~DHT_Wrapper() { DHT_free(this->dht_object.get(), NULL); }
|
||||
|
||||
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
-> const DHT_ResultObject & {
|
||||
|
||||
@ -100,8 +103,8 @@ auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
auto &key_vector = dht_results.keys[i];
|
||||
|
||||
// overwrite input with data from DHT, IF value is found in DHT
|
||||
int res =
|
||||
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
|
||||
int res = DHT_read(this->dht_object.get(), key_vector.data(),
|
||||
bucket_writer.data());
|
||||
|
||||
switch (res) {
|
||||
case DHT_SUCCESS:
|
||||
@ -159,7 +162,7 @@ void DHT_Wrapper::fillDHT(const WorkPackage &work_package) {
|
||||
// fuzz data (round, logarithm etc.)
|
||||
|
||||
// insert simulated data with fuzzed key into DHT
|
||||
int res = DHT_write(this->dht_object, key.data(),
|
||||
int res = DHT_write(this->dht_object.get(), key.data(),
|
||||
const_cast<double *>(data.data()), &proc, &index);
|
||||
|
||||
dht_results.locations[i] = {proc, index};
|
||||
@ -240,12 +243,12 @@ DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
|
||||
// }
|
||||
|
||||
int DHT_Wrapper::tableToFile(const char *filename) {
|
||||
int res = DHT_to_file(dht_object, filename);
|
||||
int res = DHT_to_file(dht_object.get(), filename);
|
||||
return res;
|
||||
}
|
||||
|
||||
int DHT_Wrapper::fileToTable(const char *filename) {
|
||||
int res = DHT_from_file(dht_object, filename);
|
||||
int res = DHT_from_file(dht_object.get(), filename);
|
||||
if (res != DHT_SUCCESS)
|
||||
return res;
|
||||
|
||||
@ -259,7 +262,7 @@ int DHT_Wrapper::fileToTable(const char *filename) {
|
||||
void DHT_Wrapper::printStatistics() {
|
||||
int res;
|
||||
|
||||
res = DHT_print_statistics(dht_object);
|
||||
res = DHT_print_statistics(dht_object.get());
|
||||
|
||||
if (res != DHT_SUCCESS) {
|
||||
// MPI ERROR ... WHAT TO DO NOW?
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "Init/InitialList.hpp"
|
||||
#include "LookupKey.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <array>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
@ -37,9 +38,7 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <LUCX/DHT.h>
|
||||
|
||||
#include <mpi.h>
|
||||
|
||||
@ -194,7 +193,7 @@ public:
|
||||
|
||||
auto getDataCount() { return this->data_count; }
|
||||
auto getCommunicator() { return this->communicator; }
|
||||
DHT *getDHT() { return this->dht_object; };
|
||||
DHT *getDHT() { return this->dht_object.get(); };
|
||||
|
||||
DHT_ResultObject &getDHTResults() { return this->dht_results; }
|
||||
|
||||
@ -227,7 +226,8 @@ private:
|
||||
uint32_t key_count;
|
||||
uint32_t data_count;
|
||||
|
||||
DHT *dht_object;
|
||||
std::unique_ptr<DHT> dht_object;
|
||||
|
||||
MPI_Comm communicator;
|
||||
|
||||
LookupKey fuzzForDHT(const std::vector<double> &cell, double dt);
|
||||
|
||||
@ -15,9 +15,7 @@
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <LUCX/PHT.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <unordered_map>
|
||||
@ -71,9 +69,9 @@ public:
|
||||
void getEntriesFromLocation(const PHT_Result &locations,
|
||||
const std::vector<uint32_t> &signif);
|
||||
|
||||
void writeStats() { DHT_print_statistics(this->prox_ht); }
|
||||
void writeStats() { PHT_print_statistics(this->prox_ht.get()); }
|
||||
|
||||
DHT *getDHTObject() { return this->prox_ht; }
|
||||
PHT *getDHTObject() { return this->prox_ht.get(); }
|
||||
|
||||
auto getPHTWriteTime() const -> double { return this->pht_write_t; };
|
||||
auto getPHTReadTime() const -> double { return this->pht_read_t; };
|
||||
@ -104,7 +102,7 @@ private:
|
||||
static bool similarityCheck(const LookupKey &fine, const LookupKey &coarse,
|
||||
const std::vector<uint32_t> &signif);
|
||||
|
||||
char *bucket_store;
|
||||
std::unique_ptr<char[]> bucket_store;
|
||||
|
||||
class Cache
|
||||
: private std::unordered_map<LookupKey, PHT_Result, LookupKeyHasher> {
|
||||
@ -127,7 +125,7 @@ private:
|
||||
};
|
||||
|
||||
Cache localCache;
|
||||
DHT *prox_ht;
|
||||
std::unique_ptr<PHT> prox_ht;
|
||||
|
||||
std::uint32_t dht_evictions = 0;
|
||||
|
||||
@ -166,7 +164,7 @@ public:
|
||||
|
||||
enum result_status { RES_OK, INSUFFICIENT_DATA, NOT_NEEDED };
|
||||
|
||||
DHT *getDHTObject() { return this->pht->getDHTObject(); }
|
||||
PHT *getDHTObject() { return this->pht->getDHTObject(); }
|
||||
|
||||
struct InterpolationResult {
|
||||
std::vector<std::vector<double>> results;
|
||||
@ -211,7 +209,7 @@ public:
|
||||
|
||||
void writePHTStats() { this->pht->writeStats(); }
|
||||
void dumpPHTState(const std::string &filename) {
|
||||
DHT_to_file(this->pht->getDHTObject(), filename.c_str());
|
||||
PHT_to_file(this->pht->getDHTObject(), filename.c_str());
|
||||
}
|
||||
|
||||
static constexpr std::uint32_t COARSE_DIFF = 2;
|
||||
|
||||
@ -3,31 +3,24 @@
|
||||
|
||||
#include "DHT_Wrapper.hpp"
|
||||
#include "DataStructures/NamedVector.hpp"
|
||||
#include "HashFunctions.hpp"
|
||||
#include "LookupKey.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
#include <Rcpp.h>
|
||||
#include <Rcpp/proxy/ProtectedProxy.h>
|
||||
#include <Rinternals.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
#include <cmath>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <mpi.h>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <LUCX/PHT.h>
|
||||
|
||||
namespace poet {
|
||||
|
||||
|
||||
@ -2,20 +2,17 @@
|
||||
|
||||
#include "DHT_Wrapper.hpp"
|
||||
#include "HashFunctions.hpp"
|
||||
#include "LUCX/DHT.h"
|
||||
#include "LookupKey.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <LUCX/PHT.h>
|
||||
|
||||
namespace poet {
|
||||
|
||||
@ -23,86 +20,37 @@ ProximityHashTable::ProximityHashTable(uint32_t key_size, uint32_t data_size,
|
||||
uint32_t entry_count,
|
||||
uint32_t size_per_process,
|
||||
MPI_Comm communicator_)
|
||||
: communicator(communicator_) {
|
||||
: communicator(communicator_), prox_ht(new PHT) {
|
||||
|
||||
data_size *= entry_count;
|
||||
data_size += sizeof(bucket_indicator);
|
||||
|
||||
#ifdef POET_PHT_ADD
|
||||
data_size += sizeof(std::uint64_t);
|
||||
#endif
|
||||
|
||||
bucket_store = new char[data_size];
|
||||
bucket_store = std::make_unique<char[]>(data_size * entry_count);
|
||||
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(size_per_process / (data_size + key_size));
|
||||
static_cast<std::uint32_t>(size_per_process / (data_size * entry_count));
|
||||
|
||||
this->prox_ht = DHT_create(communicator, buckets_per_process, data_size,
|
||||
key_size, &poet::Murmur2_64A);
|
||||
int status =
|
||||
PHT_create(key_size, data_size, buckets_per_process, entry_count,
|
||||
&poet::Murmur2_64A, communicator, this->prox_ht.get());
|
||||
|
||||
DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function);
|
||||
if (status != PHT_SUCCESS) {
|
||||
throw std::runtime_error("Failed to create PHT.");
|
||||
}
|
||||
}
|
||||
|
||||
ProximityHashTable::~ProximityHashTable() {
|
||||
delete[] bucket_store;
|
||||
if (prox_ht) {
|
||||
DHT_free(prox_ht, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
int ProximityHashTable::PHT_callback_function(int in_data_size, void *in_data,
|
||||
int out_data_size,
|
||||
void *out_data) {
|
||||
const int max_elements_per_bucket =
|
||||
static_cast<int>((out_data_size - sizeof(bucket_indicator)
|
||||
#ifdef POET_PHT_ADD
|
||||
- sizeof(std::uint64_t)
|
||||
#endif
|
||||
) /
|
||||
in_data_size);
|
||||
DHT_Location *input = reinterpret_cast<DHT_Location *>(in_data);
|
||||
|
||||
bucket_indicator *occupied_buckets =
|
||||
reinterpret_cast<bucket_indicator *>(out_data);
|
||||
DHT_Location *pairs = reinterpret_cast<DHT_Location *>(occupied_buckets + 1);
|
||||
|
||||
if (*occupied_buckets == max_elements_per_bucket) {
|
||||
return INTERP_CB_FULL;
|
||||
}
|
||||
|
||||
for (bucket_indicator i = 0; i < *occupied_buckets; i++) {
|
||||
if (pairs[i] == *input) {
|
||||
return INTERP_CB_ALREADY_IN;
|
||||
}
|
||||
}
|
||||
|
||||
pairs[(*occupied_buckets)++] = *input;
|
||||
|
||||
return INTERP_CB_OK;
|
||||
PHT_free(this->prox_ht.get(), NULL);
|
||||
}
|
||||
|
||||
void ProximityHashTable::writeLocationToPHT(LookupKey key,
|
||||
DHT_Location location) {
|
||||
|
||||
double start = MPI_Wtime();
|
||||
|
||||
// if (localCache[key].first) {
|
||||
// return;
|
||||
// }
|
||||
int status =
|
||||
PHT_write(this->prox_ht.get(), key.data(), &location, NULL, NULL);
|
||||
|
||||
int ret_val;
|
||||
|
||||
int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location),
|
||||
&location, NULL, NULL, &ret_val);
|
||||
|
||||
if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) {
|
||||
if (status == PHT_WRITE_SUCCESS_WITH_EVICTION) {
|
||||
this->dht_evictions++;
|
||||
}
|
||||
|
||||
// if (ret_val == INTERP_CB_FULL) {
|
||||
// localCache(key, {});
|
||||
// }
|
||||
|
||||
this->pht_write_t += MPI_Wtime() - start;
|
||||
}
|
||||
|
||||
@ -117,47 +65,44 @@ const ProximityHashTable::PHT_Result &ProximityHashTable::query(
|
||||
return (lookup_results = cache_ret.second);
|
||||
}
|
||||
|
||||
int res = DHT_read(prox_ht, key.data(), bucket_store);
|
||||
std::uint32_t slots_read;
|
||||
int status =
|
||||
PHT_read(prox_ht.get(), key.data(), bucket_store.get(), &slots_read);
|
||||
|
||||
this->pht_read_t += MPI_Wtime() - start_r;
|
||||
|
||||
if (res != DHT_SUCCESS) {
|
||||
if (status == PHT_READ_MISS || slots_read < min_entries_needed) {
|
||||
this->lookup_results.size = 0;
|
||||
return lookup_results;
|
||||
}
|
||||
|
||||
auto *bucket_element_count =
|
||||
reinterpret_cast<bucket_indicator *>(bucket_store);
|
||||
auto *bucket_elements =
|
||||
reinterpret_cast<DHT_Location *>(bucket_element_count + 1);
|
||||
DHT_Location *bucket_elements =
|
||||
reinterpret_cast<DHT_Location *>(bucket_store.get());
|
||||
|
||||
if (*bucket_element_count < min_entries_needed) {
|
||||
this->lookup_results.size = 0;
|
||||
return lookup_results;
|
||||
}
|
||||
|
||||
lookup_results.size = *bucket_element_count;
|
||||
auto locations = std::vector<DHT_Location>(
|
||||
bucket_elements, bucket_elements + *(bucket_element_count));
|
||||
lookup_results.size = slots_read;
|
||||
|
||||
lookup_results.in_values.clear();
|
||||
lookup_results.in_values.reserve(*bucket_element_count);
|
||||
lookup_results.in_values.resize(slots_read);
|
||||
|
||||
lookup_results.out_values.clear();
|
||||
lookup_results.out_values.reserve(*bucket_element_count);
|
||||
lookup_results.out_values.resize(slots_read);
|
||||
|
||||
for (const auto &loc : locations) {
|
||||
double start_g = MPI_Wtime();
|
||||
DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data());
|
||||
this->pht_gather_dht_t += MPI_Wtime() - start_g;
|
||||
for (std::uint32_t i = 0; i < slots_read; i++) {
|
||||
DHT_Location &loc = bucket_elements[i];
|
||||
int status =
|
||||
DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data());
|
||||
|
||||
if (status == DHT_READ_MISS) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto *buffer = reinterpret_cast<double *>(dht_buffer.data());
|
||||
|
||||
lookup_results.in_values.push_back(
|
||||
std::vector<double>(buffer, buffer + input_count));
|
||||
lookup_results.in_values[i] =
|
||||
std::vector<double>(buffer, buffer + input_count);
|
||||
|
||||
buffer += input_count;
|
||||
lookup_results.out_values.push_back(
|
||||
std::vector<double>(buffer, buffer + output_count));
|
||||
lookup_results.out_values[i] =
|
||||
std::vector<double>(buffer, buffer + output_count);
|
||||
}
|
||||
|
||||
if (lookup_results.size != 0) {
|
||||
|
||||
@ -246,17 +246,6 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
|
||||
const auto max_mean_idx =
|
||||
DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
|
||||
|
||||
if (max_mean_idx >= 2) {
|
||||
DHT_flush(this->interp->getDHTObject());
|
||||
DHT_flush(this->dht->getDHT());
|
||||
if (this->comm_rank == 2) {
|
||||
std::cout << "Flushed both DHT and PHT!\n\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RInsidePOET::getInstance().parseEvalQ("gc()");
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user