mirror of https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 12:54:50 +01:00
DAOS implementation
This commit is contained in:
parent
e81e7ec2fa
commit
9014948805
200
README.md
@@ -4,199 +4,33 @@

# POET Daos

[POET](https://doi.org/10.5281/zenodo.4757913) is a coupled reactive transport
simulator implementing a parallel architecture and a fast, original MPI-based
Distributed Hash Table.

## Branch Information

## Parsed code documentation

A parsed version of POET's documentation can be found at [GitLab
Pages](https://naaice.git-pages.gfz-potsdam.de/poet).

## External Libraries

The following external header libraries are shipped with POET:

- **argh** - https://github.com/adishavit/argh (BSD license)
- **PhreeqcRM** with patches from GFZ -
  https://www.usgs.gov/software/phreeqc-version-3 -
  https://git.gfz-potsdam.de/mluebke/phreeqcrm-gfz
- **tug** - https://git.gfz-potsdam.de/sec34/tug

This branch replaces POET's MPI-based key-value store with the key-value store
provided by the DAOS API. It is meant to be run in an environment that is
connected to a DAOS server.
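
As an illustrative sketch (not part of the commit), this is roughly how the key-value calls declared in `include/poet/DaosKeyValue.h` (added below) are meant to be driven. It assumes a reachable DAOS server and the `DAOS_POOL` environment variable described under "Running", and it uses plain `double` arrays as stand-ins for POET's actual key and data layouts:

```c
/* Hypothetical usage sketch of the DaosKeyValue API introduced by this branch. */
#include <mpi.h>
#include <stdio.h>

#include "poet/DaosKeyValue.h"

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  /* connect to the pool named in DAOS_POOL and open/create the container */
  DAOSKV *kv = DAOSKV_create(MPI_COMM_WORLD);
  if (kv == NULL)
    MPI_Abort(MPI_COMM_WORLD, 1);

  double key[4] = {1.0, 2.0, 3.0, 4.0}; /* stand-in for a fuzzed key */
  double value[2] = {42.0, 23.0};       /* stand-in for simulated data */
  double fetched[2];

  /* store the value under the binary key ... */
  DAOSKV_write(kv, key, sizeof(key), value, sizeof(value));

  /* ... and read it back; DAOS_READ_MISS signals an unknown key */
  if (DAOSKV_read(kv, key, sizeof(key), fetched, sizeof(fetched)) == DAOS_SUCCESS)
    printf("hit: %f %f\n", fetched[0], fetched[1]);

  DAOSKV_free(kv); /* closes object, container and pool handles */
  MPI_Finalize();
  return 0;
}
```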

## Installation

### Requirements

To compile POET, the following software must be installed:

- C/C++ compiler (tested with GCC)
- MPI implementation (tested with OpenMPI and MVAPICH)
- R language and environment
- CMake 3.9+
- *optional*: `doxygen` with `dot` bindings for documentation

The following R libraries must then be installed, which will pull in the
needed dependencies automatically:

- [Rcpp](https://cran.r-project.org/web/packages/Rcpp/index.html)
- [RInside](https://cran.r-project.org/web/packages/RInside/index.html)

### Compiling source code

The generation of Makefiles is done with CMake. You should be able to generate
Makefiles by running:

```sh
mkdir build && cd build
cmake ..
```

This will create the directory `build`, process the CMake files
and generate Makefiles from them. You are now able to run `make` to start the
build process.

If everything went well you will find the executable at
`build/app/poet`, but it is recommended to install the POET project
structure to a desired `CMAKE_INSTALL_PREFIX` with `make install`.

During the generation of Makefiles, various options can be specified
via `cmake -D <option>=<value> [...]`. Currently, the
following options are available:

- **POET_DHT_Debug**=_boolean_ - toggles the output of detailed statistics about
  DHT usage. Defaults to _OFF_.
- **POET_ENABLE_TESTING**=_boolean_ - enables a small set of unit tests (more to
  come). Defaults to _OFF_.
- **POET_USE_PRM_BACKEND**=_boolean_ - use the PhreeqcRM parallelization instead
  of POET's own. Intended for debugging purposes for modellers.

### Example: Build from scratch

Assuming that only the C/C++ compiler, MPI libraries, R runtime
environment and CMake have been installed, POET can be installed as
follows:

```sh
# start R environment
$ R

# install R dependencies
> install.packages(c("Rcpp", "RInside"))
> q(save="no")

# cd into POET project root
$ cd <POET_dir>

# Build process
$ mkdir build && cd build
$ cmake -DCMAKE_INSTALL_PREFIX=/home/<user>/poet ..
$ make -j<max_numprocs>
$ make install
```

This will install a POET project structure into `/home/<user>/poet`,
hereinafter called `<POET_INSTALL_DIR>`. With this version of
POET we **do not recommend** installing to hierarchies like
`/usr/local/` etc.

The corresponding directory tree would look like this:

```sh
poet
├── bin
│   └── poet
├── R_lib
│   └── kin_r_library.R
└── share
    └── poet
        ├── bench
        │   ├── dolo_diffu_inner_large.R
        │   ├── dolo_diffu_inner.R
        │   └── dolo_inner.pqi
        └── examples
            ├── dol.pqi
            ├── phreeqc_kin.dat
            ├── SimDol1D_diffu.R
            └── SimDol2D_diffu.R
```

The R libraries are loaded at runtime and their paths are hardcoded as
absolute paths inside `poet.cpp`. So, if you consider moving
`bin/poet`, either change the paths of the R source files and recompile
POET, or also move `R_lib/*` relative to the binary.

To install this version of POET please follow the instructions in the main branch;
in addition to the required software already listed, DAOS needs to be built on the system.

## Running

Run POET by `mpirun ./poet <OPTIONS> <SIMFILE> <OUTPUT_DIRECTORY>`
where:

- **OPTIONS** - runtime parameters (explained below)
- **SIMFILE** - simulation described as an R script (e.g.
  `<POET_INSTALL_DIR>/share/examples/SimDol2D_diffu.R`)
- **OUTPUT_DIRECTORY** - path where all output of POET should be stored

### Runtime options

The following parameters can be set:

| Option | Value | Description |
|--------------------------|--------------|----------------------------------------------------------------------------------------------------------------------------|
| **--work-package-size=** | _1..n_ | size of work packages (defaults to _5_) |
| **--ignore-result** | | disables storing of simulation results |
| **--dht** | | enables DHT usage (defaults to _OFF_) |
| **--dht-signif=** | _1..n_ | sets rounding to a number of significant digits (defaults to _5_) (it is recommended to use `signif_vec` in the R input script) |
| **--dht-strategy=** | _0-1_ | changes the DHT strategy. **NOT IMPLEMENTED YET** (defaults to _0_) |
| **--dht-size=** | _1-n_ | size of the DHT per involved process in megabytes (defaults to _1000 MByte_) |
| **--dht-snaps=** | _0-2_ | disables or enables storage of DHT snapshots |
| **--dht-file=** | `<SNAPSHOT>` | initializes the DHT with the given snapshot file |

#### Additions to `dht-signif`

This option is only used if no vector is given in the setup file. For individual
values per column, use the R vector `signif_vector` in `SIMFILE`.

#### Additions to `dht-snaps`

The following values can be set:

- _0_ = snapshots are disabled
- _1_ = only stores a snapshot at the end of the simulation, named
  `<OUTPUT_DIRECTORY>.dht`
- _2_ = stores a snapshot at the end and after each iteration; per-iteration
  snapshot files are stored in `<DIRECTORY>/iter<n>.dht`

### Example: Running from scratch

We will continue the above example and start a simulation with
`SimDol2D_diffu.R`. A simple fixed-coefficient diffusion is used as transport.
It is a 2D, 100x100 grid, simulating 10 time steps. To start the simulation with
4 processes, `cd` into your previously installed POET directory
`<POET_INSTALL_DIR>/bin` and run:

To run POET with DAOS, the `--dht` (WIP) parameter needs to be set. Furthermore,
`-x DAOS_POOL` needs to be given as well, with `DAOS_POOL` being an environment
variable holding the name of a running pool, for example:

```sh
mpirun -n 4 ./poet ../share/poet/examples/SimDol2D_diffu.R output

export DAOS_POOL="<pool_name>"
mpirun -n 4 -x DAOS_POOL ./poet --dht ../bench/dolo_diffu_inner/dolo_diffu_inner.R output
```

After the simulation has finished, all data generated by POET will be found
in the directory `output`.

## Differences to the main branch

- src/ChemistryModule/CMakeLists.txt was modified to compile the DAOS code
- src/ChemistryModule/DaosKeyValue.c implements the underlying framework to connect to and disconnect from the DAOS server, as well as to call the read and write operations
- src/ChemistryModule/DHT_Wrapper.cpp was slightly modified to use DaosKeyValue.c instead of DHT.c

You might want to use the DHT to cache previously simulated data and reuse it
in later time steps. Just append `--dht` to the options of POET to activate
the DHT. Also, a DHT snapshot can be produced after each iteration; this is
done by appending the `--dht-snaps=<value>` option. The resulting call would
look like this:

```sh
mpirun -n 4 ./poet --dht --dht-snaps=2 ../share/poet/examples/SimDol2D_diffu.R output
```

## About the usage of MPI_Wtime()

The implemented time measurement functions use `MPI_Wtime()`. Some
important information from the OpenMPI man page:

For example, on platforms that support it, the clock_gettime()
function will be used to obtain a monotonic clock value with whatever
precision is supported on that platform (e.g., nanoseconds).
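
For illustration only (not part of the commit), a typical elapsed-time measurement with `MPI_Wtime()` looks like this:

```c
/* Illustrative sketch: timing a code section with MPI_Wtime(),
 * which returns seconds since an arbitrary point in the past. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  double start = MPI_Wtime();
  /* ... section to be timed ... */
  double elapsed = MPI_Wtime() - start;

  /* MPI_Wtick() reports the resolution of the clock in seconds */
  printf("elapsed: %f s (resolution: %e s)\n", elapsed, MPI_Wtick());

  MPI_Finalize();
  return 0;
}
```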

- include/poet/ was extended with the DaosKeyValue.h header file, and DHT_Wrapper.hpp was modified as well
@@ -29,9 +29,15 @@
#include <string>
#include <vector>

extern "C" {
#include "DHT.h"
}

#include <poet/DaosKeyValue.h>

#include "poet/DHT.h"

#include <mpi.h>
@@ -176,6 +182,7 @@ public:
   */
  void printStatistics();

  void changeNullCharacter(char* key);
  /**
   * @brief Get the Hits object
   *
@@ -208,7 +215,10 @@ private:
  uint32_t key_count;
  uint32_t data_count;

  DHT *dht_object;
  uint32_t data_size = 0;
  uint32_t key_size = 0;

  DAOSKV *daosKV_object;

  std::vector<DHT_Keyelement> fuzzForDHT(int var_count, void *key, double dt);
@@ -216,6 +226,7 @@ private:
  uint32_t dht_miss = 0;
  uint32_t dht_evictions = 0;

  std::vector<uint32_t> dht_signif_vector;
  std::vector<std::uint32_t> dht_prop_type_vector;
  std::vector<std::uint32_t> input_key_elements;
98
include/poet/DaosKeyValue.h
Normal file
@@ -0,0 +1,98 @@
/**
 * @file DaosKeyValue.h
 * @author Nico Sauerbrei (nico.sauerbrei@uni-potsdam.de)
 * @brief API to interact with DAOS
 * @version 0.1
 * @date 01 Jun 2023
 *
 * This file implements the communication between POET and the DAOS
 * key-value store.
 */

#ifndef DAOS_KEY_VALUE_H
#define DAOS_KEY_VALUE_H

#include <mpi.h>
#include <stdint.h>

#include <daos.h>

#define DAOS_SUCCESS 0
#define DAOS_ERROR -1
#define DAOS_MPI_ERROR -2
#define DAOS_READ_MISS -3

/**
 * Internal struct to store statistics about read and write accesses as well as
 * read misses and evictions.
 * <b>All values will be reset to zero after a call of
 * DHT_print_statistics().</b>
 * Internal use only!
 */
typedef struct
{
  /** Count of writes to a specific process this process did. */
  int *writes_local;
  /** Writes after the last call of DHT_print_statistics. */
  int old_writes;
  /** How many read misses occurred? */
  int read_misses;
  /** How many buckets were evicted? */
  int evictions;
  /** How many calls of DHT_write() did this process make? */
  int w_access;
  /** How many calls of DHT_read() did this process make? */
  int r_access;
} DAOSKV_stats;

/**
 * Struct which serves as a handler, the so-called \a DAOSKV-object. Will
 * be created by DAOSKV_create and must be passed as a parameter to every
 * following function. Stores all relevant data.
 * Do not touch outside the DAOSKV functions!
 */
typedef struct
{
  /** MPI communicator of all participating processes. */
  MPI_Comm communicator;
  /** Size of the MPI communicator, i.e. number of participating processes. */
  int comm_size;
  /** Rank of the process in the MPI communicator. */
  int rank;
  /** Count of read misses over all time. */
  int read_misses;
  /** Count of evictions over all time. */
  int evictions;

  /** Label of the DAOS container. */
  char *cont_label;
  /** DAOS pool handle. */
  daos_handle_t poh;
  /** DAOS container handle. */
  daos_handle_t coh;
  /** DAOS object handle. */
  daos_handle_t oh;

#ifdef DHT_STATISTICS
  /** Detailed statistics of the usage of the DHT. */
  DAOSKV_stats *stats;
#endif
} DAOSKV;

#ifdef __cplusplus
extern "C"
{
#endif

extern DAOSKV *DAOSKV_create(MPI_Comm comm);
extern int DAOSKV_free(DAOSKV *object);
extern int DAOSKV_write(DAOSKV *object, void *key, int key_size, void *send_data, int send_size);
extern int DAOSKV_read(DAOSKV *object, void *key, int key_size, void *recv_data, int recv_size);
extern int DAOSKV_remove(DAOSKV *object, void *key, int key_size);

#ifdef __cplusplus
}
#endif

#endif /* DAOS_KEY_VALUE_H */
@@ -11,13 +11,26 @@ if(POET_USE_PRM_BACKEND)
  target_compile_definitions(poet_lib PRIVATE POET_USE_PRM)
else()
  set(PHREEQCRM_BUILD_MPI OFF CACHE BOOL "" FORCE)
  list(APPEND CHEM_MODEL_SRC "WorkerFunctions.cpp" "MasterFunctions.cpp" "DHT.c" "DHT_Wrapper.cpp" "HashFunctions.cpp")
  list(APPEND CHEM_MODEL_SRC "WorkerFunctions.cpp" "MasterFunctions.cpp" "DHT_Wrapper.cpp" "HashFunctions.cpp" "DaosKeyValue.c" "DHT.c")
endif()

# ML: For Nico: provide includes + linking for DAOS
find_library(DAOS_LIB libdaos.so
  PATH_SUFFIXES lib lib64
)
find_path(DAOS_INCLUDE daos.h)

add_library(DAOS INTERFACE IMPORTED)
set_target_properties(DAOS PROPERTIES
  INTERFACE_LINK_LIBRARIES "${DAOS_LIB}"
  INTERFACE_INCLUDE_DIRECTORIES "${DAOS_INCLUDE}"
)

add_library(ChemistryModule ${CHEM_MODEL_SRC})
target_include_directories(ChemistryModule PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(ChemistryModule
  PUBLIC MPI::MPI_CXX PhreeqcRM
  PUBLIC MPI::MPI_CXX PhreeqcRM DAOS
  PRIVATE ${MATH_LIBRARY}
)
@@ -22,6 +22,8 @@
#include "poet/DHT_Types.hpp"
#include "poet/HashFunctions.hpp"

#include "poet/DaosKeyValue.h"

#include <algorithm>
#include <cmath>
#include <cstddef>
@@ -31,6 +33,8 @@
#include <math.h>
#include <stdexcept>
#include <vector>
#include <sstream>

using namespace poet;
using namespace std;
@@ -54,11 +58,10 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, uint32_t dht_size,
    : key_count(key_indices.size()), data_count(data_count),
      input_key_elements(key_indices) {
  // initialize DHT object
  uint32_t key_size = (key_count + 1) * sizeof(DHT_Keyelement);
  uint32_t data_size = data_count * sizeof(double);
  uint32_t buckets_per_process = dht_size / (1 + data_size + key_size);
  dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
                          &poet::Murmur2_64A);
  key_size = (key_count + 1) * sizeof(DHT_Keyelement);
  data_size = data_count * sizeof(double);

  daosKV_object = DAOSKV_create(dht_comm);

  this->dht_signif_vector.resize(key_size, DHT_KEY_SIGNIF_DEFAULT);
  this->dht_signif_vector[0] = DHT_KEY_SIGNIF_TOTALS;
@@ -73,7 +76,7 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, uint32_t dht_size,

DHT_Wrapper::~DHT_Wrapper() {
  // free DHT
  DHT_free(dht_object, NULL, NULL);
  DAOSKV_free(daosKV_object);
}
auto DHT_Wrapper::checkDHT(int length, double dt,
                           const std::vector<double> &work_package,
@@ -94,18 +97,20 @@ auto DHT_Wrapper::checkDHT(int length, double dt,
    auto &data = dht_results.results[i];
    auto &key_vector = dht_results.keys[i];

    data.resize(this->data_count);
    key_vector = fuzzForDHT(this->key_count, key, dt);

    // overwrite input with data from DHT, IF value is found in DHT
    int res = DHT_read(this->dht_object, key_vector.data(), data.data());

    int res = DAOSKV_read(this->daosKV_object,
                          key_vector.data(), key_vector.size(),
                          data.data(), data.size());

    switch (res) {
    case DHT_SUCCESS:
    case DAOS_SUCCESS:
      dht_results.needPhreeqc[i] = false;
      this->dht_hits++;
      break;
    case DHT_READ_MISS:
    case DAOS_READ_MISS:
      dht_results.needPhreeqc[i] = true;
      new_mapping.push_back(curr_mapping[i]);
      this->dht_miss++;
@@ -123,17 +128,14 @@ void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
  for (int i = 0; i < length; i++) {
    // If true grid cell was simulated, needs to be inserted into dht
    if (dht_results.needPhreeqc[i]) {
      const auto &key = dht_results.keys[i];
      auto &key = dht_results.keys[i];
      void *data = (void *)&(work_package[i * this->data_count]);
      // fuzz data (round, logarithm etc.)

      // insert simulated data with fuzzed key into DHT
      int res = DHT_write(this->dht_object, (void *)key.data(), data);
      int res = DAOSKV_write(this->daosKV_object,
                             (void *)key.data(), key.size(),
                             data, data_size);

      // if data was successfully written ...
      if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
        dht_evictions++;
      }
    }
  }
}
@@ -148,28 +150,30 @@ void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
}

int DHT_Wrapper::tableToFile(const char *filename) {
  int res = DHT_to_file(dht_object, filename);
  //int res = DHT_to_file(daosKV_object, filename);
  int res = 1;
  return res;
}

int DHT_Wrapper::fileToTable(const char *filename) {
  int res = DHT_from_file(dht_object, filename);
  //int res = DHT_from_file(daosKV_object, filename);
  int res = 1;
  if (res != DHT_SUCCESS)
    return res;

#ifdef DHT_STATISTICS
  DHT_print_statistics(dht_object);
  //DHT_print_statistics(daosKV_object);
#endif

  return DHT_SUCCESS;
}

void DHT_Wrapper::printStatistics() {
  int res;
  int res = 1;

  res = DHT_print_statistics(dht_object);
  //res = DHT_print_statistics(daosKV_object);

  if (res != DHT_SUCCESS) {
  if (res != DAOS_SUCCESS) {
    // MPI ERROR ... WHAT TO DO NOW?
    // RUNNING CIRCLES WHILE SCREAMING
  }
305
src/ChemistryModule/DaosKeyValue.c
Normal file
@@ -0,0 +1,305 @@
/*
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

#include "poet/DaosKeyValue.h"

#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <daos.h>

enum handleType
{
  HANDLE_POOL,
  HANDLE_CO,
};

#define ENUM_DESC_BUF 512
#define ENUM_DESC_NR 5

enum
{
  OBJ_DKEY,
  OBJ_AKEY
};

static inline void
handle_share(DAOSKV *object, int type)
{
  d_iov_t ghdl = {NULL, 0, 0};
  int rc;

  if (object->rank == 0)
  {
    /** fetch size of global handle */
    if (type == HANDLE_POOL)
      rc = daos_pool_local2global(object->poh, &ghdl);
    else
      rc = daos_cont_local2global(object->coh, &ghdl);
  }

  /** broadcast size of global handle to all peers */
  MPI_Bcast(&ghdl.iov_buf_len, 1, MPI_UINT64_T, 0, object->communicator);

  /** allocate buffer for global pool handle */
  ghdl.iov_buf = malloc(ghdl.iov_buf_len);
  ghdl.iov_len = ghdl.iov_buf_len;

  if (object->rank == 0)
  {
    /** generate actual global handle to share with peer tasks */
    if (type == HANDLE_POOL)
      rc = daos_pool_local2global(object->poh, &ghdl);
    else
      rc = daos_cont_local2global(object->coh, &ghdl);
  }

  /** broadcast global handle to all peers */
  MPI_Bcast(ghdl.iov_buf, ghdl.iov_len, MPI_BYTE, 0, object->communicator);

  if (object->rank != 0)
  {
    /** unpack global handle */
    if (type == HANDLE_POOL)
    {
      /* NB: Only pool_global2local are different */
      rc = daos_pool_global2local(ghdl, &object->poh);
    }
    else
    {
      rc = daos_cont_global2local(object->poh, ghdl, &object->coh);
    }
  }

  free(ghdl.iov_buf);

  MPI_Barrier(object->communicator);
}

DAOSKV *DAOSKV_create(MPI_Comm comm)
{
  DAOSKV *object;
  int rc;

  object = (DAOSKV *)malloc(sizeof(DAOSKV));
  if (object == NULL)
    return NULL;

  // fill daos-object
  object->communicator = comm;
  object->read_misses = 0;
  object->evictions = 0;

  MPI_Comm_rank(comm, &object->rank);
  MPI_Comm_size(comm, &object->comm_size);

  // if set, initialize the statistics struct
#ifdef DHT_STATISTICS
  DAOSKV_stats *stats;

  stats = (DAOSKV_stats *)malloc(sizeof(DAOSKV_stats));
  if (stats == NULL)
    return NULL;

  object->stats = stats;
  object->stats->writes_local = (int *)calloc(object->comm_size, sizeof(int));
  object->stats->old_writes = 0;
  object->stats->read_misses = 0;
  object->stats->evictions = 0;
  object->stats->w_access = 0;
  object->stats->r_access = 0;
#endif

  /** initialize the local DAOS stack */
  if (daos_init() != 0)
    return NULL;

  /** Call connect on rank 0 only and broadcast handle to others */
  if (object->rank == 0)
  {
    char *pool_name = getenv("DAOS_POOL");
    if (pool_name == NULL)
      printf("Pool label invalid\n");
    if (daos_pool_connect(pool_name, NULL, DAOS_PC_RW, &object->poh,
                          NULL, NULL) != 0)
      return NULL;
  }

  /** share pool handle with peer tasks */
  handle_share(object, HANDLE_POOL);

  /*
   * Create and open container on rank 0 and share the handle.
   *
   * Alternatively, one could create the container outside of this program
   * using the daos utility: daos cont create --pool=puuid
   * and pass the uuid to the app.
   */
  if (object->rank == 0)
  {
    /** create container */
    if (getenv("DAOS_CONT") != NULL)
    {
      object->cont_label = getenv("DAOS_CONT");
    }
    else
    {
      object->cont_label = "Poet_Pool2";
    }

    /** check & open container if it already exists */
    if (0 != daos_cont_open(object->poh, object->cont_label, DAOS_COO_RW, &object->coh, NULL, NULL))
    {
      /** create & open container */
      daos_cont_create_with_label(object->poh, object->cont_label, NULL, NULL, NULL);

      /** open container */
      if (daos_cont_open(object->poh, object->cont_label, DAOS_COO_RW, &object->coh, NULL, NULL) != 0)
        return NULL;
    }
  }
  /** share container handle with peer tasks */
  handle_share(object, HANDLE_CO);

  /** open object */
  daos_obj_id_t oid;

  oid.hi = 0;
  oid.lo = 4;

  daos_obj_generate_oid(object->coh, &oid, 0, OC_SX, 0, 0);

  daos_obj_open(object->coh, oid, DAOS_OO_RW, &object->oh, NULL);

  return object;
}

int DAOSKV_free(DAOSKV *object)
{
  MPI_Barrier(object->communicator);

  if (daos_obj_close(object->oh, NULL) != 0)
    return DAOS_ERROR;
  if (daos_cont_close(object->coh, NULL) != 0)
    return DAOS_ERROR;

  if (object->rank == 0)
  {
    daos_cont_destroy(object->poh, object->cont_label, 0, NULL);
  }
  if (daos_pool_disconnect(object->poh, NULL) != 0)
    return DAOS_ERROR;

  if (daos_fini() != 0)
    return DAOS_ERROR;

  return DAOS_SUCCESS;
}

int DAOSKV_write(DAOSKV *object, void *key, int key_size, void *send_data, int send_size)
{
  int rc;

  d_iov_t dkey;
  d_sg_list_t sgl;
  d_iov_t sg_iov;
  daos_iod_t iod;

  // set dkey
  d_iov_set(&dkey, key, key_size);

  d_iov_set(&sg_iov, send_data, send_size);
  sgl.sg_nr = 1;
  sgl.sg_nr_out = 0;
  sgl.sg_iovs = &sg_iov;

  // set akey
  // Here the same value for all entries, since the dkeys differ.
  // Maybe the other way around is better? Or a mix?
  int akey = 1;
  d_iov_set(&iod.iod_name, &akey, sizeof(int));

  iod.iod_nr = 1;                 /** has to be 1 for single value */
  iod.iod_size = send_size;       /** size of the single value */
  iod.iod_recxs = NULL;           /** recx is ignored for single value */
  iod.iod_type = DAOS_IOD_SINGLE; /** value type of the akey */

  rc = daos_obj_update(object->oh, DAOS_TX_NONE, 0, &dkey, 1, &iod, &sgl,
                       NULL);

  if (rc != 0)
    return DAOS_ERROR;

  return DAOS_SUCCESS;
}

int DAOSKV_read(DAOSKV *object, void *key, int key_size, void *recv_data, int recv_size)
{
  int rc;

  d_iov_t dkey;
  d_sg_list_t sgl;
  d_iov_t sg_iov;
  daos_iod_t iod;

  // set dkey
  d_iov_set(&dkey, key, key_size);

  d_iov_set(&sg_iov, recv_data, recv_size);
  sgl.sg_nr = 1;
  sgl.sg_nr_out = 0;
  sgl.sg_iovs = &sg_iov;

  // set akey
  // Here the same value for all entries, since the dkeys differ.
  // Maybe the other way around is better? Or a mix?
  int akey = 1;
  d_iov_set(&iod.iod_name, &akey, sizeof(int));

  iod.iod_nr = 1;                 /** 1 for single value */
  iod.iod_size = DAOS_REC_ANY;    /** size of the single value, set to DAOS_REC_ANY to check whether the key was written or not */
  iod.iod_recxs = NULL;           /** recx is ignored for single value */
  iod.iod_type = DAOS_IOD_SINGLE; /** value type of the akey */

  /** fetch a dkey */
  rc = daos_obj_fetch(object->oh, DAOS_TX_NONE, 0, &dkey, 1, &iod, &sgl,
                      NULL, NULL);
  if (rc != 0)
    return DAOS_ERROR;

  if (iod.iod_size == 0)
    return DAOS_READ_MISS;

  return DAOS_SUCCESS;
}

int DAOSKV_remove(DAOSKV *object, void *key, int key_size)
{
  d_iov_t dkey;

  // set dkey
  d_iov_set(&dkey, key, key_size);
  if (daos_obj_punch_dkeys(object->oh, DAOS_TX_NONE, 0, 1, &dkey, NULL) != 0)
    return DAOS_ERROR;

  return DAOS_SUCCESS;
}