mirror of
https://git.gfz-potsdam.de/naaice/poet.git
synced 2025-12-16 04:48:23 +01:00
Compare commits
39 Commits
main
...
archive/ic
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c226834db | ||
|
|
8d02293cf5 | ||
|
|
f04d1c491d | ||
|
|
751faeabc8 | ||
|
|
680069b8ac | ||
|
|
d10bf50f66 | ||
|
|
534cc0f6bc | ||
|
|
d5e11c851b | ||
|
|
b0a4e7d64f | ||
|
|
2c17604882 | ||
|
|
7828b00268 | ||
|
|
6abf18ae73 | ||
|
|
64568d5074 | ||
|
|
b0dbe9e7b8 | ||
|
|
e3b7824dd4 | ||
|
|
b3e879635a | ||
|
|
9c9abdb7ef | ||
|
|
be7a861c78 | ||
|
|
106f9c519e | ||
|
|
b1277bac22 | ||
|
|
0c31a9c372 | ||
|
|
32b2c5c741 | ||
|
|
115dd87ba3 | ||
|
|
272f010047 | ||
|
|
4f0a8f3742 | ||
|
|
7187582910 | ||
|
|
3a2769b1f6 | ||
|
|
7dfe4efc23 | ||
|
|
75f1036e3e | ||
|
|
358aa51836 | ||
|
|
d6c4be6ea3 | ||
|
|
b78d7d1a9b | ||
|
|
031a2e237b | ||
|
|
201564d4d1 | ||
|
|
43b70b089e | ||
|
|
664d92adf6 | ||
|
|
9f6b27206e | ||
|
|
5e9639c72d | ||
|
|
430769f247 |
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -8,3 +8,6 @@
|
||||
[submodule "ext/doctest"]
|
||||
path = ext/doctest
|
||||
url = https://github.com/doctest/doctest.git
|
||||
[submodule "ext/DHT"]
|
||||
path = ext/DHT
|
||||
url = git@gitup.uni-potsdam.de:mluebke/dht_ucx.git
|
||||
|
||||
@ -19,8 +19,9 @@ get_poet_version()
|
||||
# set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS") add_definitions(${GCC_CXX_FLAGS})
|
||||
|
||||
find_package(MPI REQUIRED)
|
||||
|
||||
find_package(OpenMP)
|
||||
find_package(RRuntime REQUIRED)
|
||||
find_package(OpenSSL REQUIRED)
|
||||
|
||||
add_subdirectory(src)
|
||||
add_subdirectory(bench)
|
||||
@ -31,6 +32,10 @@ set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
|
||||
add_subdirectory(ext/tug EXCLUDE_FROM_ALL)
|
||||
add_subdirectory(ext/phreeqcrm EXCLUDE_FROM_ALL)
|
||||
|
||||
option(DHT_WITH_MPI "" ON)
|
||||
mark_as_advanced(DHT_WITH_MPI)
|
||||
add_subdirectory(ext/DHT EXCLUDE_FROM_ALL)
|
||||
|
||||
option(POET_ENABLE_TESTING "Build test suite for POET" OFF)
|
||||
|
||||
if (POET_ENABLE_TESTING)
|
||||
|
||||
@ -1,3 +1,3 @@
|
||||
add_subdirectory(dolo)
|
||||
add_subdirectory(surfex)
|
||||
add_subdirectory(barite)
|
||||
#add_subdirectory(surfex)
|
||||
#add_subdirectory(barite)
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
install(FILES
|
||||
dolo_diffu_inner.R
|
||||
dolo_diffu_inner_large.R
|
||||
# dolo_diffu_inner.R
|
||||
# dolo_diffu_inner_large.R
|
||||
dolo_inner.pqi
|
||||
dolo_interp_long.R
|
||||
# dolo_interp_long.R
|
||||
dolo_adv.R
|
||||
phreeqc_kin.dat
|
||||
DESTINATION
|
||||
share/poet/bench/dolo
|
||||
|
||||
219
bench/dolo/dolo_adv.R
Normal file
219
bench/dolo/dolo_adv.R
Normal file
@ -0,0 +1,219 @@
|
||||
## Time-stamp: "Last modified 2023-09-06 10:58:16 mluebke"
|
||||
|
||||
database <- normalizePath("../share/poet/bench/dolo/phreeqc_kin.dat")
|
||||
input_script <- normalizePath("../share/poet/bench/dolo/dolo_inner.pqi")
|
||||
|
||||
#################################################################
|
||||
## Section 1 ##
|
||||
## Grid initialization ##
|
||||
#################################################################
|
||||
|
||||
n <- 1500
|
||||
m <- 500
|
||||
|
||||
types <- c("scratch", "phreeqc", "rds")
|
||||
|
||||
init_cell <- list(
|
||||
"H" = 110.683,
|
||||
"O" = 55.3413,
|
||||
"Charge" = -5.0822e-19,
|
||||
"C(4)" = 1.2279E-4,
|
||||
"Ca" = 1.2279E-4,
|
||||
"Cl" = 0,
|
||||
"Mg" = 0,
|
||||
"O2g" = 0.499957,
|
||||
"Calcite" = 2.07e-4,
|
||||
"Dolomite" = 0
|
||||
)
|
||||
|
||||
grid <- list(
|
||||
n_cells = c(n, m),
|
||||
s_cells = c(n, m),
|
||||
type = types[1]
|
||||
)
|
||||
|
||||
##################################################################
|
||||
## Section 2 ##
|
||||
## Advection parameters ##
|
||||
##################################################################
|
||||
|
||||
## initial conditions
|
||||
|
||||
## HACK: We need the chemical initialization here, as chem module initialization
|
||||
## depends on transport until now. This will change in the future.
|
||||
init_adv <- c(
|
||||
"H" = 110.124,
|
||||
"O" = 55.0622,
|
||||
"Charge" = -1.216307659761E-09,
|
||||
"C(4)" = 1.230067028174E-04,
|
||||
"Ca" = 1.230067028174E-04,
|
||||
"Cl" = 0,
|
||||
"Mg" = 0
|
||||
)
|
||||
|
||||
## list of boundary conditions/inner nodes
|
||||
vecinj_adv <- list(
|
||||
list(
|
||||
"H" = 110.683,
|
||||
"O" = 55.3413,
|
||||
"Charge" = 1.90431e-16,
|
||||
"C(4)" = 0,
|
||||
"Ca" = 0,
|
||||
"Cl" = 0.002,
|
||||
"Mg" = 0.001
|
||||
),
|
||||
list(
|
||||
# "H" = 110.124,
|
||||
# "O" = 55.0622,
|
||||
# "Charge" = -1.216307659761E-09,
|
||||
# "C(4)" = 1.230067028174E-04,
|
||||
# "Ca" = 1.230067028174E-04,
|
||||
# "Cl" = 0,
|
||||
# "Mg" = 0
|
||||
"H" = 110.124,
|
||||
"O" = 55.0622,
|
||||
"Charge" = -1.217e-09,
|
||||
"C(4)" = 0,
|
||||
"Ca" = 0,
|
||||
"Cl" = 0,
|
||||
"Mg" = 0
|
||||
)
|
||||
)
|
||||
|
||||
vecinj_inner <- list(
|
||||
# l1 = c(2, 1, 1)
|
||||
)
|
||||
|
||||
# Create a list to store grid cell information
|
||||
flux_list <- list()
|
||||
|
||||
# Function to get the index of a grid cell given its row and column
|
||||
get_index <- function(row, col) {
|
||||
index <- (row - 1) * m + col
|
||||
if (index < 1) {
|
||||
index <- -1
|
||||
} else if (index > n * m) {
|
||||
index <- -1
|
||||
}
|
||||
return(index)
|
||||
}
|
||||
|
||||
flux_val <- 0.005
|
||||
# Loop through each row and column to populate the flux_list
|
||||
for (row in 1:n) {
|
||||
for (col in 1:m) {
|
||||
index <- get_index(row, col)
|
||||
|
||||
# Initialize the connections for the current cell
|
||||
flux <- c()
|
||||
|
||||
# Check and add connections to the east, south, west, and north cells
|
||||
# east
|
||||
flux <- c(flux, -flux_val)
|
||||
|
||||
# south
|
||||
flux <- c(flux, -flux_val)
|
||||
|
||||
# west
|
||||
flux <- c(flux, flux_val)
|
||||
|
||||
# north
|
||||
flux <- c(flux, flux_val)
|
||||
|
||||
# Store the connections in the flux_list
|
||||
flux_list[[index]] <- flux
|
||||
}
|
||||
}
|
||||
|
||||
vecinj <- do.call(rbind.data.frame, vecinj_adv)
|
||||
names(vecinj) <- names(init_adv)
|
||||
|
||||
advection <- list(
|
||||
init = init_adv,
|
||||
vecinj = vecinj,
|
||||
vecinj_inner = vecinj_inner,
|
||||
const_flux = flux_list
|
||||
)
|
||||
|
||||
#################################################################
|
||||
## Section 3 ##
|
||||
## Chemistry module (Phreeqc) ##
|
||||
#################################################################
|
||||
|
||||
|
||||
## # optional when using DHT
|
||||
dht_species <- c(
|
||||
"H" = 3,
|
||||
"O" = 3,
|
||||
"Charge" = 3,
|
||||
"C(4)" = 6,
|
||||
"Ca" = 6,
|
||||
"Cl" = 3,
|
||||
"Mg" = 5,
|
||||
"Calcite" = 4,
|
||||
"Dolomite" = 4
|
||||
)
|
||||
|
||||
check_sign_cal_dol_dht <- function(old, new) {
|
||||
if ((old["Calcite"] == 0) != (new["Calcite"] == 0)) {
|
||||
return(TRUE)
|
||||
}
|
||||
if ((old["Dolomite"] == 0) != (new["Dolomite"] == 0)) {
|
||||
return(TRUE)
|
||||
}
|
||||
return(FALSE)
|
||||
}
|
||||
|
||||
fuzz_input_dht_keys <- function(input) {
|
||||
return(input[names(dht_species)])
|
||||
}
|
||||
|
||||
check_sign_cal_dol_interp <- function(to_interp, data_set) {
|
||||
data_set <- as.data.frame(do.call(rbind, data_set), check.names = FALSE, optional = TRUE)
|
||||
names(data_set) <- names(dht_species)
|
||||
cal <- (data_set$Calcite == 0) == (to_interp["Calcite"] == 0)
|
||||
dol <- (data_set$Dolomite == 0) == (to_interp["Dolomite"] == 0)
|
||||
|
||||
cal_dol_same_sig <- cal == dol
|
||||
return(rev(which(!cal_dol_same_sig)))
|
||||
}
|
||||
|
||||
check_neg_cal_dol <- function(result) {
|
||||
neg_sign <- (result["Calcite"] <- 0) || (result["Dolomite"] < 0)
|
||||
return(any(neg_sign))
|
||||
}
|
||||
|
||||
hooks <- list(
|
||||
dht_fill = check_sign_cal_dol_dht,
|
||||
dht_fuzz = fuzz_input_dht_keys,
|
||||
interp_pre_func = check_sign_cal_dol_interp,
|
||||
interp_post_func = check_neg_cal_dol
|
||||
)
|
||||
|
||||
chemistry <- list(
|
||||
database = database,
|
||||
input_script = input_script,
|
||||
dht_species = dht_species,
|
||||
hooks = hooks
|
||||
)
|
||||
|
||||
#################################################################
|
||||
## Section 4 ##
|
||||
## Putting all those things together ##
|
||||
#################################################################
|
||||
|
||||
|
||||
iterations <- 500
|
||||
dt <- 600
|
||||
|
||||
out_save <- c(1, iterations)
|
||||
|
||||
setup <- list(
|
||||
grid = grid,
|
||||
advection = advection,
|
||||
chemistry = chemistry,
|
||||
iterations = iterations,
|
||||
timesteps = rep(dt, iterations),
|
||||
store_result = TRUE,
|
||||
out_save = out_save
|
||||
)
|
||||
@ -1265,7 +1265,7 @@ Calcite
|
||||
10 moles=0
|
||||
20 IF ((M<=0) and (SI("Calcite")<0)) then goto 200
|
||||
30 R=8.314462 # in J*K-1*mol-1
|
||||
40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin
|
||||
40 deltaT=1/TK-1/298.15 # wird f<EFBFBD>r 40<34>C berechnet; TK is temp in Kelvin
|
||||
50 e=2.718282 # Eulersche Zahl
|
||||
## mechanism 1 (acid)
|
||||
60 Ea=14400 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol
|
||||
@ -1287,7 +1287,7 @@ Dolomite
|
||||
10 moles=0
|
||||
20 IF ((M<=0) and (SI("Dolomite")<0)) then goto 200
|
||||
30 R=8.314462 # in J*K-1*mol-1
|
||||
40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin
|
||||
40 deltaT=1/TK-1/298.15 # wird f<EFBFBD>r 40<34>C berechnet; TK is temp in Kelvin
|
||||
50 e=2.718282 # Eulersche Zahl
|
||||
## mechanism 1 (acid)
|
||||
60 Ea=36100 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol
|
||||
|
||||
1
ext/DHT
Submodule
1
ext/DHT
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit a22a6b2eee7c34441d6810d47ac2d9b25ce183e5
|
||||
@ -1,49 +1,85 @@
|
||||
add_library(poetlib
|
||||
set(POET_SOURCES
|
||||
Base/Grid.cpp
|
||||
Base/SimParams.cpp
|
||||
Chemistry/ChemistryModule.cpp
|
||||
Chemistry/MasterFunctions.cpp
|
||||
Chemistry/WorkerFunctions.cpp
|
||||
Chemistry/SurrogateModels/DHT_Wrapper.cpp
|
||||
Chemistry/SurrogateModels/DHT.c
|
||||
Chemistry/SurrogateModels/HashFunctions.cpp
|
||||
Chemistry/SurrogateModels/InterpolationModule.cpp
|
||||
Chemistry/SurrogateModels/ProximityHashTable.cpp
|
||||
DataStructures/Field.cpp
|
||||
Transport/DiffusionModule.cpp
|
||||
Transport/AdvectionModule.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(poetlib PUBLIC
|
||||
MPI::MPI_C
|
||||
${MATH_LIBRARY}
|
||||
RRuntime
|
||||
PhreeqcRM
|
||||
tug
|
||||
)
|
||||
|
||||
target_compile_definitions(poetlib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
|
||||
|
||||
mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)
|
||||
set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE)
|
||||
|
||||
option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
|
||||
|
||||
if(POET_DHT_DEBUG)
|
||||
target_compile_definitions(poetlib PRIVATE DHT_STATISTICS)
|
||||
endif()
|
||||
|
||||
option(POET_PHT_ADDITIONAL_INFO "Enables additional information in the PHT" OFF)
|
||||
|
||||
if (POET_PHT_ADDITIONAL_INFO)
|
||||
target_compile_definitions(poetlib PRIVATE POET_PHT_ADD)
|
||||
endif()
|
||||
# option(POET_USE_DHT_MPI "Use MPI for DHT" OFF)
|
||||
#
|
||||
# if (NOT POET_USE_DHT_MPI)
|
||||
# target_compile_definitions(poetlib PUBLIC POET_DHT_UCX)
|
||||
# target_link_libraries(poetlib PUBLIC DHT_UCX)
|
||||
# else()
|
||||
# target_link_libraries(poetlib PUBLIC DHT_MPI)
|
||||
# endif()
|
||||
#
|
||||
# target_compile_definitions(poetlib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
|
||||
#
|
||||
# mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)
|
||||
# set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE)
|
||||
#
|
||||
# option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
|
||||
#
|
||||
# if(POET_DHT_DEBUG)
|
||||
# target_compile_definitions(poetlib PRIVATE DHT_STATISTICS)
|
||||
# endif()
|
||||
#
|
||||
# option(POET_PHT_ADDITIONAL_INFO "Enables additional information in the PHT" OFF)
|
||||
#
|
||||
# if (POET_PHT_ADDITIONAL_INFO)
|
||||
# target_compile_definitions(poetlib PRIVATE POET_PHT_ADD)
|
||||
# endif()
|
||||
#
|
||||
# if (OpenMP_FOUND)
|
||||
# target_link_libraries(poetlib PUBLIC OpenMP::OpenMP_CXX)
|
||||
# endif()
|
||||
|
||||
file(READ "${PROJECT_SOURCE_DIR}/R_lib/kin_r_library.R" R_KIN_LIB )
|
||||
|
||||
configure_file(poet.hpp.in poet.hpp @ONLY)
|
||||
|
||||
add_executable(poet poet.cpp)
|
||||
target_link_libraries(poet PRIVATE poetlib MPI::MPI_C RRuntime)
|
||||
target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
add_executable(poet_coarse poet.cpp ${POET_SOURCES})
|
||||
target_link_libraries(poet_coarse PRIVATE
|
||||
MPI::MPI_C
|
||||
${MATH_LIBRARY}
|
||||
RRuntime
|
||||
PhreeqcRM
|
||||
tug
|
||||
OpenSSL::Crypto
|
||||
MPIDHT::coarse
|
||||
)
|
||||
target_include_directories(poet_coarse PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
install(TARGETS poet_coarse DESTINATION bin)
|
||||
|
||||
install(TARGETS poet DESTINATION bin)
|
||||
add_executable(poet_fine poet.cpp ${POET_SOURCES})
|
||||
target_link_libraries(poet_fine PRIVATE
|
||||
MPI::MPI_C
|
||||
${MATH_LIBRARY}
|
||||
RRuntime
|
||||
PhreeqcRM
|
||||
tug
|
||||
OpenSSL::Crypto
|
||||
MPIDHT::fine
|
||||
)
|
||||
target_include_directories(poet_fine PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
install(TARGETS poet_fine DESTINATION bin)
|
||||
|
||||
add_executable(poet_no poet.cpp ${POET_SOURCES})
|
||||
target_link_libraries(poet_no PRIVATE
|
||||
MPI::MPI_C
|
||||
${MATH_LIBRARY}
|
||||
RRuntime
|
||||
PhreeqcRM
|
||||
tug
|
||||
OpenSSL::Crypto
|
||||
MPIDHT::nolock
|
||||
)
|
||||
target_include_directories(poet_no PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
install(TARGETS poet_no DESTINATION bin)
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
#include "ChemistryModule.hpp"
|
||||
|
||||
#include "SurrogateModels/DHT_Wrapper.hpp"
|
||||
#include "SurrogateModels/Interpolation.hpp"
|
||||
// #include "SurrogateModels/Interpolation.hpp"
|
||||
|
||||
#include <PhreeqcRM.h>
|
||||
|
||||
@ -338,12 +338,12 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
|
||||
|
||||
this->dht_enabled = this->params.use_dht;
|
||||
|
||||
if (this->params.use_interp) {
|
||||
initializeInterp(this->params.pht_max_entries, this->params.pht_size,
|
||||
this->params.interp_min_entries,
|
||||
this->params.pht_signifs);
|
||||
this->interp_enabled = this->params.use_interp;
|
||||
}
|
||||
// if (this->params.use_interp) {
|
||||
// initializeInterp(this->params.pht_max_entries, this->params.pht_size,
|
||||
// this->params.interp_min_entries,
|
||||
// this->params.pht_signifs);
|
||||
// this->interp_enabled = this->params.use_interp;
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@ -453,47 +453,47 @@ void poet::ChemistryModule::setDHTReadFile(const std::string &input_file) {
|
||||
}
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::initializeInterp(
|
||||
std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t min_entries,
|
||||
const NamedVector<std::uint32_t> &key_species) {
|
||||
// void poet::ChemistryModule::initializeInterp(
|
||||
// std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t
|
||||
// min_entries, const NamedVector<std::uint32_t> &key_species) {
|
||||
|
||||
if (!this->is_master) {
|
||||
// if (!this->is_master) {
|
||||
|
||||
constexpr uint32_t MB_FACTOR = 1E6;
|
||||
// constexpr uint32_t MB_FACTOR = 1E6;
|
||||
|
||||
assert(this->dht);
|
||||
// assert(this->dht);
|
||||
|
||||
this->interp_enabled = true;
|
||||
// this->interp_enabled = true;
|
||||
|
||||
auto map_copy = key_species;
|
||||
// auto map_copy = key_species;
|
||||
|
||||
if (key_species.empty()) {
|
||||
map_copy = this->dht->getKeySpecies();
|
||||
for (std::size_t i = 0; i < map_copy.size(); i++) {
|
||||
const std::uint32_t signif =
|
||||
map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF
|
||||
? InterpolationModule::COARSE_DIFF
|
||||
: 0);
|
||||
map_copy[i] = signif;
|
||||
}
|
||||
}
|
||||
// if (key_species.empty()) {
|
||||
// map_copy = this->dht->getKeySpecies();
|
||||
// for (std::size_t i = 0; i < map_copy.size(); i++) {
|
||||
// const std::uint32_t signif =
|
||||
// map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF
|
||||
// ? InterpolationModule::COARSE_DIFF
|
||||
// : 0);
|
||||
// map_copy[i] = signif;
|
||||
// }
|
||||
// }
|
||||
|
||||
auto key_indices =
|
||||
parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames());
|
||||
// auto key_indices =
|
||||
// parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames());
|
||||
|
||||
if (this->interp) {
|
||||
this->interp.reset();
|
||||
}
|
||||
// if (this->interp) {
|
||||
// this->interp.reset();
|
||||
// }
|
||||
|
||||
const uint64_t pht_size = size_mb * MB_FACTOR;
|
||||
// const uint64_t pht_size = size_mb * MB_FACTOR;
|
||||
|
||||
interp = std::make_unique<poet::InterpolationModule>(
|
||||
bucket_size, pht_size, min_entries, *(this->dht), map_copy, key_indices,
|
||||
this->prop_names, this->params.hooks);
|
||||
// interp = std::make_unique<poet::InterpolationModule>(
|
||||
// bucket_size, pht_size, min_entries, *(this->dht), map_copy,
|
||||
// key_indices, this->prop_names, this->params.hooks);
|
||||
|
||||
interp->setInterpolationFunction(inverseDistanceWeighting);
|
||||
}
|
||||
}
|
||||
// interp->setInterpolationFunction(inverseDistanceWeighting);
|
||||
// }
|
||||
// }
|
||||
|
||||
std::vector<double>
|
||||
poet::ChemistryModule::shuffleField(const std::vector<double> &in_field,
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
#include "../Base/SimParams.hpp"
|
||||
#include "../DataStructures/DataStructures.hpp"
|
||||
#include "SurrogateModels/DHT_Wrapper.hpp"
|
||||
#include "SurrogateModels/Interpolation.hpp"
|
||||
// #include "SurrogateModels/Interpolation.hpp"
|
||||
|
||||
#include <IrmResult.h>
|
||||
#include <PhreeqcRM.h>
|
||||
@ -208,6 +208,8 @@ public:
|
||||
*/
|
||||
std::vector<uint32_t> GetWorkerDHTEvictions() const;
|
||||
|
||||
std::vector<uint32_t> GetWorkerDHTCorruptBuckets() const;
|
||||
|
||||
/**
|
||||
* **Master only** Returns the current state of the chemical field.
|
||||
*
|
||||
@ -273,6 +275,7 @@ protected:
|
||||
WORKER_IP_FC,
|
||||
WORKER_DHT_HITS,
|
||||
WORKER_DHT_EVICTIONS,
|
||||
WORKER_DHT_CORRUPT,
|
||||
WORKER_PHT_CACHE_HITS,
|
||||
WORKER_IP_CALLS
|
||||
};
|
||||
@ -280,6 +283,7 @@ protected:
|
||||
std::vector<uint32_t> interp_calls;
|
||||
std::vector<uint32_t> dht_hits;
|
||||
std::vector<uint32_t> dht_evictions;
|
||||
std::vector<uint32_t> corrupt_buckets;
|
||||
|
||||
struct worker_s {
|
||||
double phreeqc_t = 0.;
|
||||
@ -354,7 +358,7 @@ protected:
|
||||
poet::DHT_Wrapper *dht = nullptr;
|
||||
|
||||
bool interp_enabled{false};
|
||||
std::unique_ptr<poet::InterpolationModule> interp;
|
||||
// std::unique_ptr<poet::InterpolationModule> interp;
|
||||
|
||||
static constexpr uint32_t BUFFER_OFFSET = 5;
|
||||
|
||||
|
||||
@ -97,6 +97,25 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<uint32_t>
|
||||
poet::ChemistryModule::GetWorkerDHTCorruptBuckets() const {
|
||||
int type = CHEM_PERF;
|
||||
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
|
||||
type = WORKER_DHT_CORRUPT;
|
||||
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
|
||||
|
||||
MPI_Status probe;
|
||||
MPI_Probe(MPI_ANY_SOURCE, WORKER_DHT_CORRUPT, this->group_comm, &probe);
|
||||
int count;
|
||||
MPI_Get_count(&probe, MPI_UINT32_T, &count);
|
||||
|
||||
std::vector<uint32_t> ret(count);
|
||||
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE,
|
||||
WORKER_DHT_CORRUPT, this->group_comm, NULL);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<double>
|
||||
poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const {
|
||||
int type = CHEM_PERF;
|
||||
|
||||
@ -1,661 +0,0 @@
|
||||
/// Time-stamp: "Last modified 2023-08-10 11:50:46 mluebke"
|
||||
/*
|
||||
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
|
||||
**
|
||||
** POET is free software; you can redistribute it and/or modify it under the
|
||||
** terms of the GNU General Public License as published by the Free Software
|
||||
** Foundation; either version 2 of the License, or (at your option) any later
|
||||
** version.
|
||||
**
|
||||
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
**
|
||||
** You should have received a copy of the GNU General Public License along with
|
||||
** this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include "DHT.h"
|
||||
|
||||
#include <mpi.h>
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <math.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
static void determine_dest(uint64_t hash, int comm_size,
|
||||
unsigned int table_size, unsigned int *dest_rank,
|
||||
uint64_t *index, unsigned int index_count) {
|
||||
/** temporary index */
|
||||
uint64_t tmp_index;
|
||||
/** how many bytes do we need for one index? */
|
||||
int index_size = sizeof(double) - (index_count - 1);
|
||||
for (int i = 0; i < index_count; i++) {
|
||||
tmp_index = (hash >> (i * 8)) & ((1ULL << (index_size * 8)) - 1);
|
||||
/* memcpy(&tmp_index, (char *)&hash + i, index_size); */
|
||||
index[i] = (uint64_t)(tmp_index % table_size);
|
||||
}
|
||||
*dest_rank = (unsigned int)(hash % comm_size);
|
||||
}
|
||||
|
||||
static void set_flag(char *flag_byte) {
|
||||
*flag_byte = 0;
|
||||
*flag_byte |= (1 << 0);
|
||||
}
|
||||
|
||||
static int read_flag(char flag_byte) {
|
||||
if ((flag_byte & 0x01) == 0x01) {
|
||||
return 1;
|
||||
} else
|
||||
return 0;
|
||||
}
|
||||
|
||||
DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
|
||||
unsigned int key_size,
|
||||
uint64_t (*hash_func)(int, const void *)) {
|
||||
DHT *object;
|
||||
MPI_Win window;
|
||||
void *mem_alloc;
|
||||
int comm_size, index_bytes;
|
||||
|
||||
// calculate how much bytes for the index are needed to address count of
|
||||
// buckets per process
|
||||
index_bytes = (int)ceil(log2(size));
|
||||
if (index_bytes % 8 != 0)
|
||||
index_bytes = index_bytes + (8 - (index_bytes % 8));
|
||||
|
||||
// allocate memory for dht-object
|
||||
object = (DHT *)malloc(sizeof(DHT));
|
||||
if (object == NULL)
|
||||
return NULL;
|
||||
|
||||
// every memory allocation has 1 additional byte for flags etc.
|
||||
if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL,
|
||||
&mem_alloc) != 0)
|
||||
return NULL;
|
||||
if (MPI_Comm_size(comm, &comm_size) != 0)
|
||||
return NULL;
|
||||
|
||||
// since MPI_Alloc_mem doesn't provide memory allocation with the memory set
|
||||
// to zero, we're doing this here
|
||||
memset(mem_alloc, '\0', size * (1 + data_size + key_size));
|
||||
|
||||
// create windows on previously allocated memory
|
||||
if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size),
|
||||
(1 + data_size + key_size), MPI_INFO_NULL, comm,
|
||||
&window) != 0)
|
||||
return NULL;
|
||||
|
||||
// fill dht-object
|
||||
object->data_size = data_size;
|
||||
object->key_size = key_size;
|
||||
object->table_size = size;
|
||||
object->window = window;
|
||||
object->hash_func = hash_func;
|
||||
object->comm_size = comm_size;
|
||||
object->communicator = comm;
|
||||
object->read_misses = 0;
|
||||
object->evictions = 0;
|
||||
object->recv_entry = malloc(1 + data_size + key_size);
|
||||
object->send_entry = malloc(1 + data_size + key_size);
|
||||
object->index_count = 9 - (index_bytes / 8);
|
||||
object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
|
||||
object->mem_alloc = mem_alloc;
|
||||
|
||||
// if set, initialize dht_stats
|
||||
#ifdef DHT_STATISTICS
|
||||
DHT_stats *stats;
|
||||
|
||||
stats = (DHT_stats *)malloc(sizeof(DHT_stats));
|
||||
if (stats == NULL)
|
||||
return NULL;
|
||||
|
||||
object->stats = stats;
|
||||
object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
|
||||
object->stats->old_writes = 0;
|
||||
object->stats->read_misses = 0;
|
||||
object->stats->evictions = 0;
|
||||
object->stats->w_access = 0;
|
||||
object->stats->r_access = 0;
|
||||
#endif
|
||||
|
||||
return object;
|
||||
}
|
||||
|
||||
void DHT_set_accumulate_callback(DHT *table,
|
||||
int (*callback_func)(int, void *, int,
|
||||
void *)) {
|
||||
table->accumulate_callback = callback_func;
|
||||
}
|
||||
|
||||
int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
|
||||
void *send_data, uint32_t *proc, uint32_t *index,
|
||||
int *callback_ret) {
|
||||
unsigned int dest_rank, i;
|
||||
int result = DHT_SUCCESS;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->w_access++;
|
||||
#endif
|
||||
|
||||
// determine destination rank and index by hash of key
|
||||
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
|
||||
table->table_size, &dest_rank, table->index,
|
||||
table->index_count);
|
||||
|
||||
// concatenating key with data to write entry to DHT
|
||||
set_flag((char *)table->send_entry);
|
||||
memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
|
||||
/* memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
|
||||
*/
|
||||
/* table->data_size); */
|
||||
|
||||
// locking window of target rank with exclusive lock
|
||||
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
for (i = 0; i < table->index_count; i++) {
|
||||
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
|
||||
MPI_BYTE, dest_rank, table->index[i],
|
||||
1 + table->data_size + table->key_size, MPI_BYTE,
|
||||
table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Win_flush(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// increment eviction counter if receiving key doesn't match sending key
|
||||
// entry has write flag and last index is reached.
|
||||
if (read_flag(*(char *)table->recv_entry)) {
|
||||
if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
|
||||
0) {
|
||||
if (i == (table->index_count) - 1) {
|
||||
table->evictions += 1;
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->evictions += 1;
|
||||
#endif
|
||||
result = DHT_WRITE_SUCCESS_WITH_EVICTION;
|
||||
break;
|
||||
}
|
||||
} else
|
||||
break;
|
||||
} else {
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->writes_local[dest_rank]++;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
|
||||
memset((char *)table->send_entry + 1 + table->key_size, '\0',
|
||||
table->data_size);
|
||||
} else {
|
||||
memcpy((char *)table->send_entry + 1 + table->key_size,
|
||||
(char *)table->recv_entry + 1 + table->key_size, table->data_size);
|
||||
}
|
||||
|
||||
*callback_ret = table->accumulate_callback(
|
||||
data_size, (char *)send_data, table->data_size,
|
||||
(char *)table->send_entry + 1 + table->key_size);
|
||||
|
||||
// put data to DHT (with last selected index by value i)
|
||||
if (*callback_ret == 0) {
|
||||
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
|
||||
MPI_BYTE, dest_rank, table->index[i],
|
||||
1 + table->data_size + table->key_size, MPI_BYTE,
|
||||
table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
}
|
||||
// unlock window of target rank
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
if (proc) {
|
||||
*proc = dest_rank;
|
||||
}
|
||||
|
||||
if (index) {
|
||||
*index = table->index[i];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
|
||||
uint32_t *index) {
|
||||
unsigned int dest_rank, i;
|
||||
int result = DHT_SUCCESS;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->w_access++;
|
||||
#endif
|
||||
|
||||
// determine destination rank and index by hash of key
|
||||
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
|
||||
table->table_size, &dest_rank, table->index,
|
||||
table->index_count);
|
||||
|
||||
// concatenating key with data to write entry to DHT
|
||||
set_flag((char *)table->send_entry);
|
||||
memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
|
||||
memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
|
||||
table->data_size);
|
||||
|
||||
// locking window of target rank with exclusive lock
|
||||
if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
for (i = 0; i < table->index_count; i++) {
|
||||
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
|
||||
MPI_BYTE, dest_rank, table->index[i],
|
||||
1 + table->data_size + table->key_size, MPI_BYTE,
|
||||
table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Win_flush(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// increment eviction counter if receiving key doesn't match sending key
|
||||
// entry has write flag and last index is reached.
|
||||
if (read_flag(*(char *)table->recv_entry)) {
|
||||
if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
|
||||
0) {
|
||||
if (i == (table->index_count) - 1) {
|
||||
table->evictions += 1;
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->evictions += 1;
|
||||
#endif
|
||||
result = DHT_WRITE_SUCCESS_WITH_EVICTION;
|
||||
break;
|
||||
}
|
||||
} else
|
||||
break;
|
||||
} else {
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->writes_local[dest_rank]++;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// put data to DHT (with last selected index by value i)
|
||||
if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
|
||||
MPI_BYTE, dest_rank, table->index[i],
|
||||
1 + table->data_size + table->key_size, MPI_BYTE,
|
||||
table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
// unlock window of target rank
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
if (proc) {
|
||||
*proc = dest_rank;
|
||||
}
|
||||
|
||||
if (index) {
|
||||
*index = table->index[i];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
int DHT_read(DHT *table, const void *send_key, void *destination) {
|
||||
unsigned int dest_rank, i;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->r_access++;
|
||||
#endif
|
||||
|
||||
// determine destination rank and index by hash of key
|
||||
determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
|
||||
table->table_size, &dest_rank, table->index,
|
||||
table->index_count);
|
||||
|
||||
// locking window of target rank with shared lock
|
||||
if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
// receive data
|
||||
for (i = 0; i < table->index_count; i++) {
|
||||
if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
|
||||
MPI_BYTE, dest_rank, table->index[i],
|
||||
1 + table->data_size + table->key_size, MPI_BYTE,
|
||||
table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Win_flush(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// increment read error counter if write flag isn't set ...
|
||||
if ((read_flag(*(char *)table->recv_entry)) == 0) {
|
||||
table->read_misses += 1;
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->read_misses += 1;
|
||||
#endif
|
||||
// unlock window and return
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
return DHT_READ_MISS;
|
||||
}
|
||||
|
||||
// ... or key doesn't match passed by key and last index reached.
|
||||
if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) !=
|
||||
0) {
|
||||
if (i == (table->index_count) - 1) {
|
||||
table->read_misses += 1;
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->read_misses += 1;
|
||||
#endif
|
||||
// unlock window an return
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
return DHT_READ_MISS;
|
||||
}
|
||||
} else
|
||||
break;
|
||||
}
|
||||
|
||||
// unlock window of target rank
|
||||
if (MPI_Win_unlock(dest_rank, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// if matching key was found copy data into memory of passed pointer
|
||||
memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1,
|
||||
table->data_size);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
|
||||
void *destination) {
|
||||
const uint32_t bucket_size = table->data_size + table->key_size + 1;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
table->stats->r_access++;
|
||||
#endif
|
||||
|
||||
// locking window of target rank with shared lock
|
||||
if (MPI_Win_lock(MPI_LOCK_SHARED, proc, 0, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
// receive data
|
||||
if (MPI_Get(table->recv_entry, bucket_size, MPI_BYTE, proc, index,
|
||||
bucket_size, MPI_BYTE, table->window) != 0) {
|
||||
return DHT_MPI_ERROR;
|
||||
}
|
||||
|
||||
// unlock window of target rank
|
||||
if (MPI_Win_unlock(proc, table->window) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// if matching key was found copy data into memory of passed pointer
|
||||
memcpy((char *)destination, (char *)table->recv_entry + 1 + table->key_size,
|
||||
table->data_size);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_to_file(DHT *table, const char *filename) {
|
||||
// open file
|
||||
MPI_File file;
|
||||
if (MPI_File_open(table->communicator, filename,
|
||||
MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL,
|
||||
&file) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
int rank;
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
// write header (key_size and data_size)
|
||||
if (rank == 0) {
|
||||
if (MPI_File_write_shared(file, &table->key_size, 1, MPI_INT,
|
||||
MPI_STATUS_IGNORE) != 0)
|
||||
return DHT_FILE_WRITE_ERROR;
|
||||
if (MPI_File_write_shared(file, &table->data_size, 1, MPI_INT,
|
||||
MPI_STATUS_IGNORE) != 0)
|
||||
return DHT_FILE_WRITE_ERROR;
|
||||
}
|
||||
|
||||
MPI_Barrier(table->communicator);
|
||||
|
||||
char *ptr;
|
||||
int bucket_size = table->key_size + table->data_size + 1;
|
||||
|
||||
// iterate over local memory
|
||||
for (unsigned int i = 0; i < table->table_size; i++) {
|
||||
ptr = (char *)table->mem_alloc + (i * bucket_size);
|
||||
// if bucket has been written to (checked by written_flag)...
|
||||
if (read_flag(*ptr)) {
|
||||
// write key and data to file
|
||||
if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE,
|
||||
MPI_STATUS_IGNORE) != 0)
|
||||
return DHT_FILE_WRITE_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
MPI_Barrier(table->communicator);
|
||||
|
||||
// close file
|
||||
if (MPI_File_close(&file) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_from_file(DHT *table, const char *filename) {
|
||||
MPI_File file;
|
||||
MPI_Offset f_size;
|
||||
int bucket_size, buffer_size, cur_pos, rank, offset;
|
||||
char *buffer;
|
||||
void *key;
|
||||
void *data;
|
||||
|
||||
// open file
|
||||
if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY,
|
||||
MPI_INFO_NULL, &file) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
// get file size
|
||||
if (MPI_File_get_size(file, &f_size) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
// calculate bucket size
|
||||
bucket_size = table->key_size + table->data_size;
|
||||
// buffer size is either bucket size or, if bucket size is smaller than the
|
||||
// file header, the size of DHT_FILEHEADER_SIZE
|
||||
buffer_size =
|
||||
bucket_size > DHT_FILEHEADER_SIZE ? bucket_size : DHT_FILEHEADER_SIZE;
|
||||
// allocate buffer
|
||||
buffer = (char *)malloc(buffer_size);
|
||||
|
||||
// read file header
|
||||
if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE,
|
||||
MPI_STATUS_IGNORE) != 0)
|
||||
return DHT_FILE_READ_ERROR;
|
||||
|
||||
// compare if written header data and key size matches current sizes
|
||||
if (*(int *)buffer != table->key_size)
|
||||
return DHT_WRONG_FILE;
|
||||
if (*(int *)(buffer + 4) != table->data_size)
|
||||
return DHT_WRONG_FILE;
|
||||
|
||||
// set offset for each process
|
||||
offset = bucket_size * table->comm_size;
|
||||
|
||||
// seek behind header of DHT file
|
||||
if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
// current position is rank * bucket_size + OFFSET
|
||||
cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size);
|
||||
|
||||
// loop over file and write data to DHT with DHT_write
|
||||
while (cur_pos < f_size) {
|
||||
if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
// TODO: really necessary?
|
||||
MPI_Offset tmp;
|
||||
MPI_File_get_position(file, &tmp);
|
||||
if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE, MPI_STATUS_IGNORE) !=
|
||||
0)
|
||||
return DHT_FILE_READ_ERROR;
|
||||
// extract key and data and write to DHT
|
||||
key = buffer;
|
||||
data = (buffer + table->key_size);
|
||||
if (DHT_write(table, key, data, NULL, NULL) == DHT_MPI_ERROR)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
// increment current position
|
||||
cur_pos += offset;
|
||||
}
|
||||
|
||||
free(buffer);
|
||||
if (MPI_File_close(&file) != 0)
|
||||
return DHT_FILE_IO_ERROR;
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
|
||||
int buf;
|
||||
|
||||
if (eviction_counter != NULL) {
|
||||
buf = 0;
|
||||
if (MPI_Reduce(&table->evictions, &buf, 1, MPI_INT, MPI_SUM, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
*eviction_counter = buf;
|
||||
}
|
||||
if (readerror_counter != NULL) {
|
||||
buf = 0;
|
||||
if (MPI_Reduce(&table->read_misses, &buf, 1, MPI_INT, MPI_SUM, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
*readerror_counter = buf;
|
||||
}
|
||||
if (MPI_Win_free(&(table->window)) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Free_mem(table->mem_alloc) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
free(table->recv_entry);
|
||||
free(table->send_entry);
|
||||
free(table->index);
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
free(table->stats->writes_local);
|
||||
free(table->stats);
|
||||
#endif
|
||||
free(table);
|
||||
|
||||
return DHT_SUCCESS;
|
||||
}
|
||||
|
||||
int DHT_print_statistics(DHT *table) {
|
||||
#ifdef DHT_STATISTICS
|
||||
int *written_buckets;
|
||||
int *read_misses, sum_read_misses;
|
||||
int *evictions, sum_evictions;
|
||||
int sum_w_access, sum_r_access, *w_access, *r_access;
|
||||
int rank;
|
||||
MPI_Comm_rank(table->communicator, &rank);
|
||||
|
||||
// disable possible warning of unitialized variable, which is not the case
|
||||
// since we're using MPI_Gather to obtain all values only on rank 0
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
||||
|
||||
// obtaining all values from all processes in the communicator
|
||||
if (rank == 0)
|
||||
read_misses = (int *)malloc(table->comm_size * sizeof(int));
|
||||
if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1,
|
||||
MPI_INT, 0, table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT,
|
||||
MPI_SUM, 0, table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
table->stats->read_misses = 0;
|
||||
|
||||
if (rank == 0)
|
||||
evictions = (int *)malloc(table->comm_size * sizeof(int));
|
||||
if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM,
|
||||
0, table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
table->stats->evictions = 0;
|
||||
|
||||
if (rank == 0)
|
||||
w_access = (int *)malloc(table->comm_size * sizeof(int));
|
||||
if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
table->stats->w_access = 0;
|
||||
|
||||
if (rank == 0)
|
||||
r_access = (int *)malloc(table->comm_size * sizeof(int));
|
||||
if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0,
|
||||
table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
table->stats->r_access = 0;
|
||||
|
||||
if (rank == 0)
|
||||
written_buckets = (int *)calloc(table->comm_size, sizeof(int));
|
||||
if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size,
|
||||
MPI_INT, MPI_SUM, 0, table->communicator) != 0)
|
||||
return DHT_MPI_ERROR;
|
||||
|
||||
if (rank == 0) { // only process with rank 0 will print out results as a
|
||||
// table
|
||||
int sum_written_buckets = 0;
|
||||
|
||||
for (int i = 0; i < table->comm_size; i++) {
|
||||
sum_written_buckets += written_buckets[i];
|
||||
}
|
||||
|
||||
int members = 7;
|
||||
int padsize = (members * 12) + 1;
|
||||
char pad[padsize + 1];
|
||||
|
||||
memset(pad, '-', padsize * sizeof(char));
|
||||
pad[padsize] = '\0';
|
||||
printf("\n");
|
||||
printf("%-35s||resets with every call of this function\n", " ");
|
||||
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied",
|
||||
"free", "w_access", "r_access", "read misses", "evictions");
|
||||
printf("%s\n", pad);
|
||||
for (int i = 0; i < table->comm_size; i++) {
|
||||
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i,
|
||||
written_buckets[i], table->table_size - written_buckets[i],
|
||||
w_access[i], r_access[i], read_misses[i], evictions[i]);
|
||||
}
|
||||
printf("%s\n", pad);
|
||||
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum",
|
||||
sum_written_buckets,
|
||||
(table->table_size * table->comm_size) - sum_written_buckets,
|
||||
sum_w_access, sum_r_access, sum_read_misses, sum_evictions);
|
||||
|
||||
printf("%s\n", pad);
|
||||
printf("%s %d\n",
|
||||
"new entries:", sum_written_buckets - table->stats->old_writes);
|
||||
|
||||
printf("\n");
|
||||
fflush(stdout);
|
||||
|
||||
table->stats->old_writes = sum_written_buckets;
|
||||
}
|
||||
|
||||
// enable warning again
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
MPI_Barrier(table->communicator);
|
||||
return DHT_SUCCESS;
|
||||
#endif
|
||||
}
|
||||
@ -1,327 +0,0 @@
|
||||
/*
|
||||
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
|
||||
**
|
||||
** POET is free software; you can redistribute it and/or modify it under the
|
||||
** terms of the GNU General Public License as published by the Free Software
|
||||
** Foundation; either version 2 of the License, or (at your option) any later
|
||||
** version.
|
||||
**
|
||||
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
**
|
||||
** You should have received a copy of the GNU General Public License along with
|
||||
** this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file DHT.h
|
||||
* @author Max Lübke (mluebke@uni-potsdam.de)
|
||||
* @brief API to interact with the DHT
|
||||
* @version 0.1
|
||||
* @date 16 Nov 2017
|
||||
*
|
||||
* This file implements the creation of a DHT by using the MPI
|
||||
* one-sided-communication. There is also the possibility to write or read data
|
||||
* from or to the DHT. In addition, the current state of the DHT can be written
|
||||
* to a file and read in again later.
|
||||
*/
|
||||
|
||||
#ifndef DHT_H
|
||||
#define DHT_H
|
||||
|
||||
#include <mpi.h>
|
||||
#include <stdint.h>
|
||||
|
||||
/** Returned if some error in MPI routine occurs. */
|
||||
#define DHT_MPI_ERROR -1
|
||||
/** Returned by a call of DHT_read if no bucket with given key was found. */
|
||||
#define DHT_READ_MISS -2
|
||||
/** Returned by DHT_write if a bucket was evicted. */
|
||||
#define DHT_WRITE_SUCCESS_WITH_EVICTION -3
|
||||
/** Returned when no errors occured. */
|
||||
#define DHT_SUCCESS 0
|
||||
|
||||
/** Returned by DHT_from_file if the given file does not match expected file. */
|
||||
#define DHT_WRONG_FILE -11
|
||||
/** Returned by DHT file operations if MPI file operation fails. */
|
||||
#define DHT_FILE_IO_ERROR -12
|
||||
/** Returned by DHT file operations if error occured in MPI_Read operation. */
|
||||
#define DHT_FILE_READ_ERROR -13
|
||||
/** Returned by DHT file operations if error occured in MPI_Write operation. */
|
||||
#define DHT_FILE_WRITE_ERROR -14
|
||||
|
||||
/** Size of the file header in byte. */
|
||||
#define DHT_FILEHEADER_SIZE 8
|
||||
|
||||
/**
|
||||
* Internal struct to store statistics about read and write accesses and also
|
||||
* read misses and evictions.
|
||||
* <b>All values will be resetted to zero after a call of
|
||||
* DHT_print_statistics().</b>
|
||||
* Internal use only!
|
||||
*
|
||||
* @todo There's maybe a better solution than DHT_print_statistics and this
|
||||
* struct
|
||||
*/
|
||||
typedef struct {
|
||||
/** Count of writes to specific process this process did. */
|
||||
int *writes_local;
|
||||
/** Writes after last call of DHT_print_statistics. */
|
||||
int old_writes;
|
||||
/** How many read misses occur? */
|
||||
int read_misses;
|
||||
/** How many buckets where evicted? */
|
||||
int evictions;
|
||||
/** How many calls of DHT_write() did this process? */
|
||||
int w_access;
|
||||
/** How many calls of DHT_read() did this process? */
|
||||
int r_access;
|
||||
} DHT_stats;
|
||||
|
||||
/**
|
||||
* Struct which serves as a handler or so called \a DHT-object. Will
|
||||
* be created by DHT_create and must be passed as a parameter to every following
|
||||
* function. Stores all relevant data.
|
||||
* Do not touch outside DHT functions!
|
||||
*/
|
||||
typedef struct {
|
||||
/** Created MPI Window, which serves as the DHT memory area of the process. */
|
||||
MPI_Win window;
|
||||
/** Size of the data of a bucket entry in byte. */
|
||||
int data_size;
|
||||
/** Size of the key of a bucket entry in byte. */
|
||||
int key_size;
|
||||
/** Count of buckets for each process. */
|
||||
unsigned int table_size;
|
||||
/** MPI communicator of all participating processes. */
|
||||
MPI_Comm communicator;
|
||||
/** Size of the MPI communicator respectively all participating processes. */
|
||||
int comm_size;
|
||||
/** Pointer to a hashfunction. */
|
||||
uint64_t (*hash_func)(int, const void *);
|
||||
/** Pre-allocated memory where a bucket can be received. */
|
||||
void *recv_entry;
|
||||
/** Pre-allocated memory where a bucket to send can be stored. */
|
||||
void *send_entry;
|
||||
/** Allocated memory on which the MPI window was created. */
|
||||
void *mem_alloc;
|
||||
/** Count of read misses over all time. */
|
||||
int read_misses;
|
||||
/** Count of evictions over all time. */
|
||||
int evictions;
|
||||
/** Array of indeces where a bucket can be stored. */
|
||||
uint64_t *index;
|
||||
/** Count of possible indeces. */
|
||||
unsigned int index_count;
|
||||
|
||||
int (*accumulate_callback)(int, void *, int, void *);
|
||||
#ifdef DHT_STATISTICS
|
||||
/** Detailed statistics of the usage of the DHT. */
|
||||
DHT_stats *stats;
|
||||
#endif
|
||||
} DHT;
|
||||
|
||||
extern void DHT_set_accumulate_callback(DHT *table,
|
||||
int (*callback_func)(int, void *, int,
|
||||
void *));
|
||||
|
||||
extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
|
||||
void *data, uint32_t *proc, uint32_t *index, int *callback_ret);
|
||||
|
||||
/**
|
||||
* @brief Create a DHT.
|
||||
*
|
||||
* When calling this function, the required memory is allocated and a
|
||||
* MPI_Window is created. This allows the execution of MPI_Get and
|
||||
* MPI_Put operations for one-sided communication. Then the number of
|
||||
* indexes is calculated and finally all relevant data is entered into the
|
||||
* \a DHT-object which is returned.
|
||||
*
|
||||
* @param comm MPI communicator which addresses all participating process of the
|
||||
* DHT.
|
||||
* @param size_per_process Number of buckets per process.
|
||||
* @param data_size Size of data in byte.
|
||||
* @param key_size Size of the key in byte.
|
||||
* @param hash_func Pointer to a hash function. This function must take the size
|
||||
* of the key and a pointer to the key as input parameters and return a 64 bit
|
||||
* hash.
|
||||
* @return DHT* The returned value is the \a DHT-object which serves as a handle
|
||||
* for all DHT operations. If an error occured NULL is returned.
|
||||
*/
|
||||
extern DHT *DHT_create(MPI_Comm comm, uint64_t size_per_process,
|
||||
unsigned int data_size, unsigned int key_size,
|
||||
uint64_t (*hash_func)(int, const void *));
|
||||
|
||||
/**
|
||||
* @brief Write data into DHT.
|
||||
*
|
||||
* When DHT_write is called, the address window is locked with a
|
||||
* LOCK_EXCLUSIVE for write access. Now the first bucket is received
|
||||
* using MPI_Get and it is checked if the bucket is empty or if the received key
|
||||
* matches the passed key. If this is the case, the data of the bucket is
|
||||
* overwritten with the new value. If not, the function continues with the next
|
||||
* index until no more indexes are available. When the last index is reached and
|
||||
* there are no more indexes available, the last examined bucket is replaced.
|
||||
* After successful writing, the memory window is released and the function
|
||||
* returns.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param key Pointer to the key.
|
||||
* @param data Pointer to the data.
|
||||
* @param proc If not NULL, returns the process number written to.
|
||||
* @param index If not NULL, returns the index of the bucket where the data was
|
||||
* written to.
|
||||
* @return int Returns either DHT_SUCCESS on success or correspondending error
|
||||
* value on eviction or error.
|
||||
*/
|
||||
extern int DHT_write(DHT *table, void *key, void *data, uint32_t *proc,
|
||||
uint32_t *index);
|
||||
|
||||
/**
|
||||
* @brief Read data from DHT.
|
||||
*
|
||||
* At the beginning, the target process and all possible indices are determined.
|
||||
* After that a SHARED lock on the address window for read access is done
|
||||
* and the first entry is retrieved. Now the received key is compared
|
||||
* with the key passed to the function. If they coincide the correct data
|
||||
* was found. If not it continues with the next index. If the last
|
||||
* possible bucket is reached and the keys still do not match the read
|
||||
* error counter is incremented. After the window has been released
|
||||
* again, the function returns with a corresponding return value (read
|
||||
* error or error-free read). The data to be read out is also written to
|
||||
* the memory area of the passed pointer.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param key Pointer to the key.
|
||||
* @param destination Pointer to memory area where retreived data should be
|
||||
* stored.
|
||||
* @return int Returns either DHT_SUCCESS on success or correspondending error
|
||||
* value on read miss or error.
|
||||
*/
|
||||
extern int DHT_read(DHT *table, const void *key, void *destination);
|
||||
|
||||
extern int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
|
||||
void *destination);
|
||||
/**
|
||||
* @brief Write current state of DHT to file.
|
||||
*
|
||||
* All contents are written as a memory dump, so that no conversion takes place.
|
||||
* First, an attempt is made to open or create a file. If this is successful the
|
||||
* file header consisting of data and key size is written. Then each process
|
||||
* reads its memory area of the DHT and each bucket that was marked as written
|
||||
* is added to the file using MPI file operations.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param filename Name of the file to write to.
|
||||
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
|
||||
* opened/closed or DHT_WRITE_ERROR if file is not writable.
|
||||
*/
|
||||
extern int DHT_to_file(DHT *table, const char *filename);
|
||||
|
||||
/**
|
||||
* @brief Read state of DHT from file.
|
||||
*
|
||||
* One needs a previously written DHT file (by DHT_from_file).
|
||||
* First of all, an attempt is made to open the specified file. If this is
|
||||
* succeeded the file header is read and compared with the current values of the
|
||||
* DHT. If the data and key sizes do not differ, one can continue. Each process
|
||||
* reads one line of the file and writes it to the DHT with DHT_write. This
|
||||
* happens until no more lines are left. The writing is done by the
|
||||
* implementation of DHT_write.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param filename Name of the file to read from.
|
||||
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
|
||||
* opened/closed, DHT_READ_MISS if file is not readable or DHT_WRONG_FILE if
|
||||
* file doesn't match expectation. This is possible if the data size or key size
|
||||
* is different.
|
||||
*/
|
||||
extern int DHT_from_file(DHT *table, const char *filename);
|
||||
|
||||
/**
|
||||
* @brief Free ressources of DHT.
|
||||
*
|
||||
* Finally, to free all resources after using the DHT, the function
|
||||
* DHT_free must be used. This will free the MPI\_Window, as well as the
|
||||
* associated memory. Also all internal variables are released. Optionally, the
|
||||
* count of evictions and read misses can also be obtained.
|
||||
*
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @param eviction_counter \a optional: Pointer to integer where the count of
|
||||
* evictions should be stored.
|
||||
* @param readerror_counter \a optional: Pointer to integer where the count of
|
||||
* read errors should be stored.
|
||||
* @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on
|
||||
* internal MPI error.
|
||||
*/
|
||||
extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
|
||||
|
||||
/**
|
||||
* @brief Prints a table with statistics about current use of DHT.
|
||||
*
|
||||
* These statistics are from each participated process and also summed up over
|
||||
* all processes. Detailed statistics are:
|
||||
* -# occupied buckets (in respect to the memory of this process)
|
||||
* -# free buckets (in respect to the memory of this process)
|
||||
* -# calls of DHT_write (w_access)
|
||||
* -# calls of DHT_read (r_access)
|
||||
* -# read misses (see DHT_READ_MISS)
|
||||
* -# collisions (see DHT_WRITE_SUCCESS_WITH_EVICTION)
|
||||
* 3-6 will reset with every call of this function finally the amount of new
|
||||
* written entries is printed out (since the last call of this funtion).
|
||||
*
|
||||
* This is done by collective MPI operations with the root process with rank 0,
|
||||
* which will also print a table with all informations to stdout.
|
||||
*
|
||||
* Also, as this function was implemented for a special case (POET project) one
|
||||
* need to define DHT_STATISTICS to the compiler macros to use this
|
||||
* function (eg. <emph>gcc -DDHT_STATISTICS ... </emph>).
|
||||
* @param table Pointer to the \a DHT-object.
|
||||
* @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
|
||||
* error.
|
||||
*/
|
||||
extern int DHT_print_statistics(DHT *table);
|
||||
|
||||
/**
|
||||
* @brief Determine destination rank and index.
|
||||
*
|
||||
* This is done by looping over all possbile indices. First of all, set a
|
||||
* temporary index to zero and copy count of bytes for each index into the
|
||||
* memory area of the temporary index. After that the current index is
|
||||
* calculated by the temporary index modulo the table size. The destination rank
|
||||
* of the process is simply determined by hash modulo the communicator size.
|
||||
*
|
||||
* @param hash Calculated 64 bit hash.
|
||||
* @param comm_size Communicator size.
|
||||
* @param table_size Count of buckets per process.
|
||||
* @param dest_rank Reference to the destination rank variable.
|
||||
* @param index Pointer to the array index.
|
||||
* @param index_count Count of possible indeces.
|
||||
*/
|
||||
static void determine_dest(uint64_t hash, int comm_size,
|
||||
unsigned int table_size, unsigned int *dest_rank,
|
||||
uint64_t *index, unsigned int index_count);
|
||||
|
||||
/**
|
||||
* @brief Set the occupied flag.
|
||||
*
|
||||
* This will set the first bit of a bucket to 1.
|
||||
*
|
||||
* @param flag_byte First byte of a bucket.
|
||||
*/
|
||||
static void set_flag(char *flag_byte);
|
||||
|
||||
/**
|
||||
* @brief Get the occupied flag.
|
||||
*
|
||||
* This function determines whether the occupied flag of a bucket was set or
|
||||
* not.
|
||||
*
|
||||
* @param flag_byte First byte of a bucket.
|
||||
* @return int Returns 1 for true or 0 for false.
|
||||
*/
|
||||
static int read_flag(char flag_byte);
|
||||
|
||||
#endif /* DHT_H */
|
||||
@ -21,6 +21,8 @@
|
||||
*/
|
||||
|
||||
#include "DHT_Wrapper.hpp"
|
||||
#include "HashFunctions.hpp"
|
||||
#include "LUCX/DHT.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
@ -28,7 +30,7 @@
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <mpi.h>
|
||||
#include <stdexcept>
|
||||
#include <vector>
|
||||
|
||||
@ -49,13 +51,36 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
// initialize DHT object
|
||||
// key size = count of key elements + timestep
|
||||
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
|
||||
uint32_t data_size =
|
||||
(data_count + (with_interp ? input_key_elements.size() : 0)) *
|
||||
sizeof(double);
|
||||
// uint32_t data_size =
|
||||
// (data_count + (with_interp ? input_key_elements.size() : 0)) *
|
||||
// sizeof(double);
|
||||
uint32_t data_size = data_count * sizeof(double);
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(dht_size / (data_size + key_size));
|
||||
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
|
||||
&poet::Murmur2_64A);
|
||||
|
||||
#ifdef POET_DHT_UCX
|
||||
const ucx_ep_args_mpi_t ucx_bcast_mpi_args = {.comm = dht_comm};
|
||||
const DHT_init_t dht_init = {
|
||||
.key_size = static_cast<int>(key_size),
|
||||
.data_size = static_cast<int>(data_size),
|
||||
.bucket_count = static_cast<unsigned int>(buckets_per_process),
|
||||
.hash_func = &poet::md5_sum,
|
||||
.bcast_func = UCX_INIT_BSTRAP_MPI,
|
||||
.bcast_func_args = &ucx_bcast_mpi_args};
|
||||
dht_object = DHT_create(&dht_init);
|
||||
#else
|
||||
const DHT_init_t dht_init = {
|
||||
.key_size = static_cast<int>(key_size),
|
||||
.data_size = static_cast<int>(data_size),
|
||||
.bucket_count = static_cast<unsigned int>(buckets_per_process),
|
||||
.hash_func = &poet::md5_sum,
|
||||
.comm = dht_comm};
|
||||
dht_object = DHT_create(&dht_init);
|
||||
#endif
|
||||
|
||||
if (dht_object == nullptr) {
|
||||
throw std::runtime_error("DHT_create failed");
|
||||
}
|
||||
|
||||
dht_signif_vector = key_species.getValues();
|
||||
|
||||
@ -81,7 +106,11 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
|
||||
|
||||
DHT_Wrapper::~DHT_Wrapper() {
|
||||
// free DHT
|
||||
DHT_free(dht_object, NULL, NULL);
|
||||
#ifdef POET_DHT_UCX
|
||||
DHT_free(dht_object, NULL);
|
||||
#else
|
||||
DHT_free(dht_object, NULL);
|
||||
#endif
|
||||
}
|
||||
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
-> const DHT_ResultObject & {
|
||||
@ -111,6 +140,9 @@ auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
|
||||
break;
|
||||
case DHT_READ_MISS:
|
||||
break;
|
||||
case DHT_READ_CORRUPT:
|
||||
this->corrupt_buckets++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -232,14 +264,14 @@ DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
|
||||
// }
|
||||
|
||||
int DHT_Wrapper::tableToFile(const char *filename) {
|
||||
int res = DHT_to_file(dht_object, filename);
|
||||
return res;
|
||||
// int res = DHT_to_file(dht_object, filename);
|
||||
// return res;
|
||||
}
|
||||
|
||||
int DHT_Wrapper::fileToTable(const char *filename) {
|
||||
int res = DHT_from_file(dht_object, filename);
|
||||
if (res != DHT_SUCCESS)
|
||||
return res;
|
||||
// int res = DHT_from_file(dht_object, filename);
|
||||
// if (res != DHT_SUCCESS)
|
||||
// return res;
|
||||
|
||||
#ifdef DHT_STATISTICS
|
||||
DHT_print_statistics(dht_object);
|
||||
|
||||
@ -39,9 +39,11 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <LUCX/DHT.h>
|
||||
|
||||
#ifdef POET_DHT_UCX
|
||||
#include <LUCX/Bootstrap.h>
|
||||
#endif
|
||||
|
||||
#include <mpi.h>
|
||||
|
||||
@ -187,9 +189,12 @@ public:
|
||||
*/
|
||||
auto getEvictions() { return this->dht_evictions; };
|
||||
|
||||
auto getCorruptBuckets() { return this->corrupt_buckets; }
|
||||
|
||||
void resetCounter() {
|
||||
this->dht_hits = 0;
|
||||
this->dht_evictions = 0;
|
||||
this->corrupt_buckets = 0;
|
||||
}
|
||||
|
||||
void SetSignifVector(std::vector<uint32_t> signif_vec);
|
||||
@ -251,6 +256,7 @@ private:
|
||||
|
||||
uint32_t dht_hits = 0;
|
||||
uint32_t dht_evictions = 0;
|
||||
uint32_t corrupt_buckets = 0;
|
||||
|
||||
NamedVector<std::uint32_t> key_species;
|
||||
|
||||
|
||||
@ -25,6 +25,9 @@
|
||||
*/
|
||||
|
||||
#include "HashFunctions.hpp"
|
||||
#include <cstdint>
|
||||
|
||||
#include <openssl/md5.h>
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
|
||||
@ -93,3 +96,21 @@ uint64_t poet::Murmur2_64A(int len, const void *key) {
|
||||
|
||||
return h;
|
||||
}
|
||||
|
||||
uint64_t poet::md5_sum(int len, const void *key) {
|
||||
MD5_CTX ctx;
|
||||
unsigned char sum[MD5_DIGEST_LENGTH];
|
||||
uint64_t retval;
|
||||
uint64_t *v1;
|
||||
uint64_t *v2;
|
||||
|
||||
MD5_Init(&ctx);
|
||||
MD5_Update(&ctx, key, len);
|
||||
MD5_Final(sum, &ctx);
|
||||
|
||||
v1 = (uint64_t *)&sum[0];
|
||||
v2 = (uint64_t *)&sum[8];
|
||||
retval = *v1 ^ *v2;
|
||||
|
||||
return retval;
|
||||
}
|
||||
@ -31,6 +31,7 @@ namespace poet {
|
||||
constexpr uint32_t HASH_SEED = 80 + 79 + 69 + 84;
|
||||
|
||||
uint64_t Murmur2_64A(int len, const void *key);
|
||||
uint64_t md5_sum(int len, const void *key);
|
||||
|
||||
} // namespace poet
|
||||
|
||||
|
||||
@ -9,20 +9,16 @@
|
||||
#include "LookupKey.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mpi.h>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <DHT_ucx/DHT.h>
|
||||
#include <DHT_ucx/UCX_bcast_functions.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
|
||||
@ -3,30 +3,20 @@
|
||||
|
||||
#include "../../DataStructures/DataStructures.hpp"
|
||||
#include "DHT_Wrapper.hpp"
|
||||
#include "HashFunctions.hpp"
|
||||
#include "LookupKey.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
#include <Rcpp.h>
|
||||
#include <Rinternals.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <mpi.h>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
|
||||
namespace poet {
|
||||
|
||||
InterpolationModule::InterpolationModule(
|
||||
|
||||
@ -6,17 +6,12 @@
|
||||
#include "LookupKey.hpp"
|
||||
#include "Rounding.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
extern "C" {
|
||||
#include "DHT.h"
|
||||
}
|
||||
#include <DHT_ucx/DHT.h>
|
||||
#include <DHT_ucx/UCX_bcast_functions.h>
|
||||
|
||||
namespace poet {
|
||||
|
||||
@ -38,16 +33,23 @@ ProximityHashTable::ProximityHashTable(uint32_t key_size, uint32_t data_size,
|
||||
uint32_t buckets_per_process =
|
||||
static_cast<std::uint32_t>(size_per_process / (data_size + key_size));
|
||||
|
||||
this->prox_ht = DHT_create(communicator, buckets_per_process, data_size,
|
||||
key_size, &poet::Murmur2_64A);
|
||||
ucx_ep_args_mpi_t mpi_bcast_args = {.comm = communicator};
|
||||
DHT_init_t dht_init_args = {.key_size = static_cast<int>(key_size),
|
||||
.data_size = static_cast<int>(data_size),
|
||||
.bucket_count = buckets_per_process,
|
||||
.hash_func = &poet::Murmur2_64A,
|
||||
.bcast_func = UCX_INIT_BSTRAP_MPI,
|
||||
.bcast_func_args = &mpi_bcast_args};
|
||||
|
||||
DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function);
|
||||
this->prox_ht = DHT_create(&dht_init_args);
|
||||
|
||||
// DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function);
|
||||
}
|
||||
|
||||
ProximityHashTable::~ProximityHashTable() {
|
||||
delete[] bucket_store;
|
||||
if (prox_ht) {
|
||||
DHT_free(prox_ht, NULL, NULL);
|
||||
DHT_free(prox_ht, NULL, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,12 +95,12 @@ void ProximityHashTable::writeLocationToPHT(LookupKey key,
|
||||
|
||||
int ret_val;
|
||||
|
||||
int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location),
|
||||
&location, NULL, NULL, &ret_val);
|
||||
// int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location),
|
||||
// &location, NULL, NULL, &ret_val);
|
||||
|
||||
if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) {
|
||||
this->dht_evictions++;
|
||||
}
|
||||
// if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) {
|
||||
// this->dht_evictions++;
|
||||
// }
|
||||
|
||||
// if (ret_val == INTERP_CB_FULL) {
|
||||
// localCache(key, {});
|
||||
@ -148,7 +150,7 @@ const ProximityHashTable::PHT_Result &ProximityHashTable::query(
|
||||
|
||||
for (const auto &loc : locations) {
|
||||
double start_g = MPI_Wtime();
|
||||
DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data());
|
||||
// DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data());
|
||||
this->pht_gather_dht_t += MPI_Wtime() - start_g;
|
||||
|
||||
auto *buffer = reinterpret_cast<double *>(dht_buffer.data());
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
#include "ChemistryModule.hpp"
|
||||
#include "SurrogateModels/DHT_Wrapper.hpp"
|
||||
#include "SurrogateModels/Interpolation.hpp"
|
||||
// #include "SurrogateModels/Interpolation.hpp"
|
||||
|
||||
#include <IrmResult.h>
|
||||
#include <algorithm>
|
||||
@ -173,9 +173,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
timings.dht_get += dht_get_end - dht_get_start;
|
||||
}
|
||||
|
||||
if (interp_enabled) {
|
||||
interp->tryInterpolation(s_curr_wp);
|
||||
}
|
||||
// if (interp_enabled) {
|
||||
// interp->tryInterpolation(s_curr_wp);
|
||||
// }
|
||||
|
||||
phreeqc_time_start = MPI_Wtime();
|
||||
|
||||
@ -201,9 +201,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
|
||||
dht->fillDHT(s_curr_wp);
|
||||
dht_fill_end = MPI_Wtime();
|
||||
|
||||
if (interp_enabled) {
|
||||
interp->writePairs();
|
||||
}
|
||||
// if (interp_enabled) {
|
||||
// interp->writePairs();
|
||||
// }
|
||||
timings.dht_fill += dht_fill_end - dht_fill_start;
|
||||
}
|
||||
|
||||
@ -220,24 +220,26 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
|
||||
if (this->dht_enabled) {
|
||||
dht_hits.push_back(dht->getHits());
|
||||
dht_evictions.push_back(dht->getEvictions());
|
||||
corrupt_buckets.push_back(dht->getCorruptBuckets());
|
||||
dht->resetCounter();
|
||||
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
dht->printStatistics();
|
||||
}
|
||||
|
||||
if (this->interp_enabled) {
|
||||
std::stringstream out;
|
||||
interp_calls.push_back(interp->getInterpolationCount());
|
||||
interp->resetCounter();
|
||||
interp->writePHTStats();
|
||||
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
}
|
||||
// if (this->interp_enabled) {
|
||||
// std::stringstream out;
|
||||
// interp_calls.push_back(interp->getInterpolationCount());
|
||||
// interp->resetCounter();
|
||||
// interp->writePHTStats();
|
||||
// if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
|
||||
// out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
// << std::setw(this->file_pad) << iteration << ".pht";
|
||||
// interp->dumpPHTState(out.str());
|
||||
// }
|
||||
// }
|
||||
|
||||
RInsidePOET::getInstance().parseEvalQ("gc()");
|
||||
}
|
||||
@ -246,12 +248,12 @@ void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
|
||||
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
WorkerWriteDHTDump(iteration);
|
||||
}
|
||||
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
std::stringstream out;
|
||||
out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
<< std::setw(this->file_pad) << iteration << ".pht";
|
||||
interp->dumpPHTState(out.str());
|
||||
}
|
||||
// if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
|
||||
// std::stringstream out;
|
||||
// out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
|
||||
// << std::setw(this->file_pad) << iteration << ".pht";
|
||||
// interp->dumpPHTState(out.str());
|
||||
// }
|
||||
}
|
||||
|
||||
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
|
||||
@ -348,26 +350,26 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
|
||||
this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_WRITE: {
|
||||
double val = interp->getPHTWriteTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_READ: {
|
||||
double val = interp->getPHTReadTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_GATHER: {
|
||||
double val = interp->getDHTGatherTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_FC: {
|
||||
double val = interp->getInterpolationTime();
|
||||
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm);
|
||||
break;
|
||||
}
|
||||
// case WORKER_IP_WRITE: {
|
||||
// double val = interp->getPHTWriteTime();
|
||||
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
// this->group_comm); break;
|
||||
// }
|
||||
// case WORKER_IP_READ: {
|
||||
// double val = interp->getPHTReadTime();
|
||||
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
// this->group_comm); break;
|
||||
// }
|
||||
// case WORKER_IP_GATHER: {
|
||||
// double val = interp->getDHTGatherTime();
|
||||
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
// this->group_comm); break;
|
||||
// }
|
||||
// case WORKER_IP_FC: {
|
||||
// double val = interp->getInterpolationTime();
|
||||
// MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
|
||||
// this->group_comm); break;
|
||||
// }
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
@ -402,15 +404,19 @@ void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
|
||||
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
|
||||
break;
|
||||
}
|
||||
case WORKER_DHT_CORRUPT: {
|
||||
reduce_and_send(corrupt_buckets, WORKER_DHT_CORRUPT);
|
||||
break;
|
||||
}
|
||||
case WORKER_IP_CALLS: {
|
||||
reduce_and_send(interp_calls, WORKER_IP_CALLS);
|
||||
return;
|
||||
}
|
||||
case WORKER_PHT_CACHE_HITS: {
|
||||
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
|
||||
reduce_and_send(input, WORKER_PHT_CACHE_HITS);
|
||||
return;
|
||||
}
|
||||
// case WORKER_PHT_CACHE_HITS: {
|
||||
// std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
|
||||
// reduce_and_send(input, WORKER_PHT_CACHE_HITS);
|
||||
// return;
|
||||
// }
|
||||
default: {
|
||||
throw std::runtime_error("Unknown perf type in master's message.");
|
||||
}
|
||||
|
||||
250
src/Transport/AdvectionModule.cpp
Normal file
250
src/Transport/AdvectionModule.cpp
Normal file
@ -0,0 +1,250 @@
|
||||
#include "AdvectionModule.hpp"
|
||||
|
||||
#include "../Base/Macros.hpp"
|
||||
|
||||
#include <cmath>
|
||||
#include <csignal>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <limits>
|
||||
#include <mpi.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <Rcpp.h>
|
||||
|
||||
#ifdef _OPENMP
|
||||
#include <omp.h>
|
||||
#else
|
||||
#define omp_get_thread_num() 0
|
||||
#endif
|
||||
|
||||
namespace poet {
|
||||
|
||||
inline std::array<std::int32_t, 4> getIndices(std::int32_t index,
|
||||
std::int32_t n, std::int32_t m) {
|
||||
std::array<std::int32_t, 4> indices;
|
||||
|
||||
// east index
|
||||
indices[0] = (index % n == n - 1 ? -1 : index + 1);
|
||||
// south index
|
||||
indices[1] = (index + n >= m * n ? -1 : index + n);
|
||||
// west index
|
||||
indices[2] = (index % n == 0 ? -1 : index - 1);
|
||||
// north index
|
||||
indices[3] = (index - n < 0 ? -1 : index - n);
|
||||
|
||||
return indices;
|
||||
}
|
||||
|
||||
inline double getFluxApplyConc(bool inbound, std::uint32_t curr_index,
|
||||
std::int32_t neighbor_index,
|
||||
const std::vector<double> &conc, double bc) {
|
||||
// On inbound flux and non-boundary condition
|
||||
if (inbound) {
|
||||
if (neighbor_index == -1) {
|
||||
// Return boundary condition value
|
||||
return bc;
|
||||
}
|
||||
return conc[neighbor_index];
|
||||
}
|
||||
|
||||
// inbound flow from boundary condition (open) or outbound flow
|
||||
return conc[curr_index];
|
||||
}
|
||||
|
||||
inline double
|
||||
AdvectionModule::calcDeltaConc(std::size_t cell_index, double bc_val,
|
||||
const std::vector<double> &spec_vec,
|
||||
const std::vector<double> &flux) {
|
||||
const auto neighbor_indices =
|
||||
getIndices(cell_index, this->n_cells[0], this->n_cells[1]);
|
||||
double ds{.0};
|
||||
for (std::size_t neighbor_i = 0; neighbor_i < neighbor_indices.size();
|
||||
neighbor_i++) {
|
||||
const double &curr_flux = flux[neighbor_i];
|
||||
const bool inbound = curr_flux > 0;
|
||||
const double flux_apply_val = getFluxApplyConc(
|
||||
inbound, cell_index, neighbor_indices[neighbor_i], spec_vec, bc_val);
|
||||
ds += curr_flux * flux_apply_val;
|
||||
}
|
||||
|
||||
return ds;
|
||||
}
|
||||
|
||||
std::vector<double>
|
||||
AdvectionModule::CFLTimeVec(double req_dt,
|
||||
const std::vector<double> &max_fluxes) {
|
||||
const auto field_size = this->n_cells[0] * this->n_cells[1];
|
||||
|
||||
double cfl = std::numeric_limits<double>::max();
|
||||
|
||||
for (std::size_t i = 0; i < field_size; i++) {
|
||||
const double dt =
|
||||
(cell_volume * density[i] * water_saturation[i] * porosity[i]) /
|
||||
max_fluxes[i];
|
||||
if (dt < cfl) {
|
||||
cfl = dt;
|
||||
}
|
||||
}
|
||||
|
||||
if (cfl < req_dt) {
|
||||
const std::uint32_t niter = std::floor(req_dt / cfl);
|
||||
std::vector<double> time_vec(niter + 1, cfl);
|
||||
time_vec[niter] = req_dt - (cfl * niter);
|
||||
return time_vec;
|
||||
}
|
||||
|
||||
return {req_dt};
|
||||
}
|
||||
|
||||
void AdvectionModule::simulate(double dt) {
|
||||
double start_t = MPI_Wtime();
|
||||
|
||||
const auto &n = this->n_cells[0];
|
||||
const auto &m = this->n_cells[1];
|
||||
|
||||
// HACK: constant flux for this moment imported from R runtime
|
||||
|
||||
// RInsidePOET &R = RInsidePOET::getInstance();
|
||||
// const auto flux_list =
|
||||
// Rcpp::as<Rcpp::DataFrame>(R.parseEval("mysetup$advection$const_flux"));
|
||||
// std::vector<std::vector<double>> flux(flux_list.size());
|
||||
// auto parse_end = std::chrono::steady_clock::now();
|
||||
|
||||
// MSG("Parsing took " +
|
||||
// std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
// parse_end - parse_start)
|
||||
// .count()));
|
||||
|
||||
MSG("Advection time step requested: " + std::to_string(dt));
|
||||
|
||||
std::vector<double> max_fluxes(flux.size());
|
||||
|
||||
// #pragma omp parallel for schedule(dynamic)
|
||||
for (std::size_t i = 0; i < max_fluxes.size(); i++) {
|
||||
double cumflux = .0;
|
||||
for (std::size_t j = 0; j < flux[i].size(); j++) {
|
||||
cumflux += std::abs(flux[i][j]);
|
||||
}
|
||||
max_fluxes[i] = cumflux;
|
||||
}
|
||||
|
||||
const auto time_vec = CFLTimeVec(dt, max_fluxes);
|
||||
|
||||
MSG("CFL yielding " + std::to_string(time_vec.size()) + " inner iterations");
|
||||
|
||||
auto field_vec = t_field.As2DVector();
|
||||
|
||||
// iterate over all inactive cells and set defined values, as chemistry module
|
||||
// won't skip those cells until now
|
||||
|
||||
for (const auto &inac_cell : this->inactive_cells) {
|
||||
for (std::size_t species_i = 0; species_i < field_vec.size(); species_i++) {
|
||||
field_vec[species_i][inac_cell.first] = inac_cell.second[species_i];
|
||||
}
|
||||
}
|
||||
|
||||
// #pragma omp parallel for schedule(dynamic)
|
||||
for (std::size_t species_i = 0; species_i < field_vec.size(); species_i++) {
|
||||
auto &species = field_vec[species_i];
|
||||
std::vector<double> spec_copy = species;
|
||||
for (const double &curr_dt : time_vec) {
|
||||
for (std::size_t cell_i = 0; cell_i < n * m; cell_i++) {
|
||||
|
||||
// if inactive cell -> skip
|
||||
if (this->inactive_cells.find(cell_i) != this->inactive_cells.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
double delta_conc = calcDeltaConc(
|
||||
cell_i, this->boundary_condition[species_i], species, flux[cell_i]);
|
||||
spec_copy[cell_i] +=
|
||||
(curr_dt * delta_conc) / (porosity[cell_i] * cell_volume);
|
||||
|
||||
// if (species_i == 0 &&
|
||||
// (spec_copy[cell_i] < 110.124 || spec_copy[cell_i] > 110.683)) {
|
||||
// raise(SIGABRT);
|
||||
// }
|
||||
}
|
||||
species = spec_copy;
|
||||
}
|
||||
}
|
||||
|
||||
t_field = field_vec;
|
||||
double end_t = MPI_Wtime();
|
||||
|
||||
MSG("Advection took " + std::to_string(end_t - start_t) + "s");
|
||||
|
||||
this->transport_t += end_t - start_t;
|
||||
}
|
||||
|
||||
void AdvectionModule::initializeParams(RInsidePOET &R) {
|
||||
const std::uint32_t n = this->n_cells[0] =
|
||||
R.parseEval("mysetup$grid$n_cells[1]");
|
||||
const std::uint32_t m = this->n_cells[1] =
|
||||
R.parseEval("mysetup$grid$n_cells[2]");
|
||||
|
||||
const std::uint32_t x = this->s_cells[0] =
|
||||
R.parseEval("mysetup$grid$s_cells[1]");
|
||||
const std::uint32_t y = this->s_cells[1] =
|
||||
R.parseEval("mysetup$grid$s_cells[2]");
|
||||
|
||||
this->field_size = n * m;
|
||||
this->cell_volume = (s_cells[0] * s_cells[1]) / this->field_size;
|
||||
|
||||
// HACK: For now, we neglect porisity, density and water saturation of the
|
||||
// cell
|
||||
this->porosity = std::vector<double>(field_size, 1.);
|
||||
this->density = std::vector<double>(field_size, 1.);
|
||||
this->water_saturation = std::vector<double>(field_size, 1.);
|
||||
|
||||
const auto init_vec =
|
||||
Rcpp::as<Rcpp::NumericVector>(R.parseEval("mysetup$advection$init"));
|
||||
|
||||
// Initialize prop names and according values
|
||||
const auto prop_names = Rcpp::as<std::vector<std::string>>(init_vec.names());
|
||||
std::vector<std::vector<double>> init_field;
|
||||
init_field.reserve(prop_names.size());
|
||||
|
||||
for (const auto &val : init_vec) {
|
||||
init_field.push_back(std::vector<double>(field_size, val));
|
||||
}
|
||||
|
||||
// Set inner constant cells. Is it needed?
|
||||
const Rcpp::DataFrame vecinj = R.parseEval("mysetup$advection$vecinj");
|
||||
const Rcpp::DataFrame inner_vecinj =
|
||||
R.parseEval("mysetup$advection$vecinj_inner");
|
||||
|
||||
for (std::size_t i = 0; i < prop_names.size(); i++) {
|
||||
const Rcpp::NumericVector species_vecinj = vecinj[prop_names[i]];
|
||||
this->boundary_condition.push_back(species_vecinj[0]);
|
||||
}
|
||||
|
||||
// same logic as for diffusion module applied
|
||||
for (std::size_t i = 0; i < inner_vecinj.size(); i++) {
|
||||
const Rcpp::NumericVector tuple = inner_vecinj[i];
|
||||
const std::uint32_t cell_index = (tuple[2] - 1) + ((tuple[1] - 1) * n);
|
||||
std::vector<double> curr_cell_state(prop_names.size());
|
||||
for (std::size_t prop_i = 0; prop_i < prop_names.size(); prop_i++) {
|
||||
const Rcpp::NumericVector curr_prop_vec_inj = vecinj[prop_names[prop_i]];
|
||||
init_field[prop_i][cell_index] = curr_prop_vec_inj[tuple[0] - 1];
|
||||
curr_cell_state[prop_i] = curr_prop_vec_inj[tuple[0] - 1];
|
||||
}
|
||||
this->inactive_cells.insert({cell_index, std::move(curr_cell_state)});
|
||||
}
|
||||
|
||||
this->t_field = Field(field_size, init_field, prop_names);
|
||||
|
||||
// FIXME: parse velocities in instantiation of class
|
||||
const Rcpp::List rcpp_flux_list = R.parseEval("mysetup$advection$const_flux");
|
||||
this->flux.resize(rcpp_flux_list.size());
|
||||
|
||||
for (std::size_t i = 0; i < rcpp_flux_list.size(); i++) {
|
||||
const auto flux_2d = Rcpp::as<std::vector<double>>(rcpp_flux_list[i]);
|
||||
this->flux[i] = flux_2d;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace poet
|
||||
111
src/Transport/AdvectionModule.hpp
Normal file
111
src/Transport/AdvectionModule.hpp
Normal file
@ -0,0 +1,111 @@
|
||||
/*
|
||||
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
|
||||
** Potsdam)
|
||||
**
|
||||
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
|
||||
**
|
||||
** POET is free software; you can redistribute it and/or modify it under the
|
||||
** terms of the GNU General Public License as published by the Free Software
|
||||
** Foundation; either version 2 of the License, or (at your option) any later
|
||||
** version.
|
||||
**
|
||||
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
**
|
||||
** You should have received a copy of the GNU General Public License along with
|
||||
** this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#ifndef ADVECTION_MODULE_H
|
||||
#define ADVECTION_MODULE_H
|
||||
|
||||
#include "../Base/RInsidePOET.hpp"
|
||||
#include "../DataStructures/DataStructures.hpp"
|
||||
|
||||
#include <Rcpp.h>
|
||||
#include <array>
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace poet {
|
||||
/**
|
||||
* @brief Class describing transport simulation using advection
|
||||
*
|
||||
* Quick and dirty implementation, assuming homogenous and constant porisity,
|
||||
* pressure and temperature. Open boundary conditions are assumed.
|
||||
*
|
||||
*/
|
||||
|
||||
class AdvectionModule {
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new TransportSim object
|
||||
*
|
||||
* The instance will only be initialized with given R object.
|
||||
*
|
||||
* @param R RRuntime object
|
||||
*/
|
||||
AdvectionModule(RInsidePOET &R) { initializeParams(R); }
|
||||
|
||||
/**
|
||||
* @brief Run simulation for one iteration
|
||||
*
|
||||
* This will simply call the R function 'master_advection'
|
||||
*
|
||||
*/
|
||||
void simulate(double dt);
|
||||
|
||||
/**
|
||||
* @brief Get the transport time
|
||||
*
|
||||
* @return double time spent in transport
|
||||
*/
|
||||
double getTransportTime() const { return this->transport_t; }
|
||||
|
||||
/**
|
||||
* \brief Returns the current diffusion field.
|
||||
*
|
||||
* \return Reference to the diffusion field.
|
||||
*/
|
||||
Field &getField() { return this->t_field; }
|
||||
|
||||
private:
|
||||
void initializeParams(RInsidePOET &R);
|
||||
|
||||
double calcDeltaConc(std::size_t cell_index, double bc_val,
|
||||
const std::vector<double> &spec_vec,
|
||||
const std::vector<double> &flux);
|
||||
|
||||
std::vector<double> CFLTimeVec(double req_dt,
|
||||
const std::vector<double> &max_fluxes);
|
||||
|
||||
std::array<std::uint32_t, 2> n_cells;
|
||||
std::array<double, 2> s_cells;
|
||||
std::uint32_t field_size;
|
||||
double cell_volume;
|
||||
|
||||
std::vector<double> porosity;
|
||||
std::vector<double> density;
|
||||
std::vector<double> water_saturation;
|
||||
|
||||
std::map<std::uint32_t, std::vector<double>> inactive_cells;
|
||||
std::vector<double> boundary_condition;
|
||||
|
||||
// FIXME: parse velocities in instantiation of class
|
||||
std::vector<std::vector<double>> flux;
|
||||
|
||||
Field t_field;
|
||||
|
||||
/**
|
||||
* @brief time spent for transport
|
||||
*
|
||||
*/
|
||||
double transport_t = 0.f;
|
||||
};
|
||||
} // namespace poet
|
||||
|
||||
#endif // ADVECTION_MODULE_H
|
||||
57
src/poet.cpp
57
src/poet.cpp
@ -23,6 +23,7 @@
|
||||
#include "Base/RInsidePOET.hpp"
|
||||
#include "Base/SimParams.hpp"
|
||||
#include "Chemistry/ChemistryModule.hpp"
|
||||
#include "Transport/AdvectionModule.hpp"
|
||||
#include "Transport/DiffusionModule.hpp"
|
||||
|
||||
#include <poet.hpp>
|
||||
@ -34,6 +35,7 @@
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <vector>
|
||||
|
||||
#include <mpi.h>
|
||||
@ -114,11 +116,14 @@ void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
|
||||
chem.LoadDatabase(database_path);
|
||||
}
|
||||
|
||||
inline double RunMasterLoop(SimParams ¶ms, RInside &R,
|
||||
inline double RunMasterLoop(SimParams ¶ms, RInsidePOET &R,
|
||||
const GridParams &g_params, uint32_t nxyz_master) {
|
||||
|
||||
DiffusionParams d_params{R};
|
||||
DiffusionModule diffusion(d_params, g_params);
|
||||
// DiffusionParams d_params{R};
|
||||
// DiffusionModule diffusion(d_params, g_params);
|
||||
|
||||
AdvectionModule advection(R);
|
||||
|
||||
/* Iteration Count is dynamic, retrieving value from R (is only needed by
|
||||
* master for the following loop) */
|
||||
uint32_t maxiter = R.parseEval("mysetup$iterations");
|
||||
@ -131,8 +136,7 @@ inline double RunMasterLoop(SimParams ¶ms, RInside &R,
|
||||
set_chem_parameters(chem, nxyz_master, params.getChemParams().database_path);
|
||||
chem.RunInitFile(params.getChemParams().input_script);
|
||||
|
||||
poet::ChemistryModule::SingleCMap init_df = DFToHashMap(d_params.initial_t);
|
||||
chem.initializeField(diffusion.getField());
|
||||
chem.initializeField(advection.getField());
|
||||
|
||||
if (params.getNumParams().print_progressbar) {
|
||||
chem.setProgressBarPrintout(true);
|
||||
@ -161,17 +165,17 @@ inline double RunMasterLoop(SimParams ¶ms, RInside &R,
|
||||
|
||||
/* run transport */
|
||||
// TODO: transport to diffusion
|
||||
diffusion.simulate(dt);
|
||||
advection.simulate(dt);
|
||||
|
||||
chem.getField().update(diffusion.getField());
|
||||
chem.getField().update(advection.getField());
|
||||
|
||||
MSG("Chemistry step");
|
||||
|
||||
chem.SetTimeStep(dt);
|
||||
chem.RunCells();
|
||||
|
||||
writeFieldsToR(R, diffusion.getField(), chem.GetField());
|
||||
diffusion.getField().update(chem.GetField());
|
||||
writeFieldsToR(R, advection.getField(), chem.GetField());
|
||||
advection.getField().update(chem.GetField());
|
||||
|
||||
R["req_dt"] = dt;
|
||||
R["simtime"] = (sim_time += dt);
|
||||
@ -214,7 +218,7 @@ inline double RunMasterLoop(SimParams ¶ms, RInside &R,
|
||||
R["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings());
|
||||
R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
|
||||
|
||||
R["simtime_transport"] = diffusion.getTransportTime();
|
||||
R["simtime_transport"] = advection.getTransportTime();
|
||||
R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
|
||||
|
||||
// R["phreeqc_count"] = phreeqc_counts;
|
||||
@ -225,29 +229,30 @@ inline double RunMasterLoop(SimParams ¶ms, RInside &R,
|
||||
R.parseEvalQ("profiling$dht_hits <- dht_hits");
|
||||
R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
|
||||
R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
|
||||
R["dht_corrupt_buckets"] = Rcpp::wrap(chem.GetWorkerDHTCorruptBuckets());
|
||||
R.parseEvalQ("profiling$dht_corrupt_buckets <- dht_corrupt_buckets");
|
||||
R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
|
||||
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
|
||||
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
|
||||
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
|
||||
}
|
||||
if (params.getChemParams().use_interp) {
|
||||
R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
|
||||
R.parseEvalQ("profiling$interp_write <- interp_w");
|
||||
R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
|
||||
R.parseEvalQ("profiling$interp_read <- interp_r");
|
||||
R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
|
||||
R.parseEvalQ("profiling$interp_gather <- interp_g");
|
||||
R["interp_fc"] =
|
||||
Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
|
||||
R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
|
||||
R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
|
||||
R.parseEvalQ("profiling$interp_calls <- interp_calls");
|
||||
R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
|
||||
R.parseEvalQ("profiling$interp_cached <- interp_cached");
|
||||
}
|
||||
// if (params.getChemParams().use_interp) {
|
||||
// R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
|
||||
// R.parseEvalQ("profiling$interp_write <- interp_w");
|
||||
// R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
|
||||
// R.parseEvalQ("profiling$interp_read <- interp_r");
|
||||
// R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
|
||||
// R.parseEvalQ("profiling$interp_gather <- interp_g");
|
||||
// R["interp_fc"] =
|
||||
// Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
|
||||
// R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
|
||||
// R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
|
||||
// R.parseEvalQ("profiling$interp_calls <- interp_calls");
|
||||
// R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
|
||||
// R.parseEvalQ("profiling$interp_cached <- interp_cached");
|
||||
// }
|
||||
|
||||
chem.MasterLoopBreak();
|
||||
diffusion.end();
|
||||
|
||||
return dSimTime;
|
||||
}
|
||||
|
||||
@ -7,10 +7,15 @@ target_link_libraries(testPOET doctest poetlib)
|
||||
target_include_directories(testPOET PRIVATE "${PROJECT_SOURCE_DIR}/src")
|
||||
|
||||
get_filename_component(TEST_RInsideSourceFile "RInsidePOET_funcs.R" REALPATH)
|
||||
configure_file(testDataStructures.hpp.in testDataStructures.hpp)
|
||||
get_filename_component(TEST_SampleInputScript "${PROJECT_SOURCE_DIR}/test/advection/dolo_adv.R" REALPATH)
|
||||
configure_file(testInputFiles.hpp.in InputFiles.hpp)
|
||||
target_include_directories(testPOET PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
|
||||
add_custom_target(check
|
||||
COMMAND $<TARGET_FILE:testPOET>
|
||||
DEPENDS testPOET
|
||||
)
|
||||
|
||||
add_executable(advection advection/testAdvection.cpp)
|
||||
target_include_directories(advection PRIVATE "${CMAKE_CURRENT_BINARY_DIR}" "${PROJECT_SOURCE_DIR}/src")
|
||||
target_link_libraries(advection poetlib)
|
||||
|
||||
121
test/advection/dolo_adv.R
Normal file
121
test/advection/dolo_adv.R
Normal file
@ -0,0 +1,121 @@
|
||||
## Time-stamp: "Last modified 2023-08-22 12:44:53 mluebke"
|
||||
|
||||
#################################################################
|
||||
## Section 1 ##
|
||||
## Grid initialization ##
|
||||
#################################################################
|
||||
|
||||
n <- 5
|
||||
m <- 5
|
||||
|
||||
grid <- list(
|
||||
n_cells = c(n, m),
|
||||
s_cells = c(n, m)
|
||||
)
|
||||
|
||||
##################################################################
|
||||
## Section 2 ##
|
||||
## Advection parameters ##
|
||||
##################################################################
|
||||
|
||||
## initial conditions
|
||||
|
||||
## HACK: We need the chemical initialization here, as chem module initialization
|
||||
## depends on transport until now. This will change in the future.
|
||||
init_adv <- c(
|
||||
"H" = 110.124,
|
||||
"O" = 55.0622,
|
||||
"Charge" = -1.216307659761E-09,
|
||||
"C(4)" = 1.230067028174E-04,
|
||||
"Ca" = 1.230067028174E-04,
|
||||
"Cl" = 0,
|
||||
"Mg" = 0
|
||||
)
|
||||
|
||||
## list of boundary conditions/inner nodes
|
||||
vecinj_adv <- list(
|
||||
list(
|
||||
"H" = 110.124,
|
||||
"O" = 55.0622,
|
||||
"Charge" = -1.217e-09,
|
||||
"C(4)" = 0,
|
||||
"Ca" = 0,
|
||||
"Cl" = 0,
|
||||
"Mg" = 0
|
||||
),
|
||||
list(
|
||||
"H" = 110.683,
|
||||
"O" = 55.3413,
|
||||
"Charge" = 1.90431e-16,
|
||||
"C(4)" = 0,
|
||||
"Ca" = 0,
|
||||
"Cl" = 0.002,
|
||||
"Mg" = 0.001
|
||||
)
|
||||
)
|
||||
|
||||
vecinj_inner <- list(
|
||||
l1 = c(2, 1, 1)
|
||||
)
|
||||
|
||||
# Create a list to store grid cell information
|
||||
flux_list <- list()
|
||||
|
||||
# Function to get the index of a grid cell given its row and column
|
||||
get_index <- function(row, col) {
|
||||
index <- (row - 1) * m + col
|
||||
if (index < 1) {
|
||||
index <- -1
|
||||
} else if (index > n * m) {
|
||||
index <- -1
|
||||
}
|
||||
return(index)
|
||||
}
|
||||
|
||||
flux_val <- 1
|
||||
|
||||
# Loop through each row and column to populate the flux_list
|
||||
for (row in 1:n) {
|
||||
for (col in 1:m) {
|
||||
index <- get_index(row, col)
|
||||
|
||||
# Initialize the connections for the current cell
|
||||
flux <- c()
|
||||
|
||||
# Check and add connections to the east, south, west, and north cells
|
||||
# east
|
||||
flux <- c(flux, -flux_val)
|
||||
|
||||
# south
|
||||
flux <- c(flux, -flux_val)
|
||||
|
||||
# west
|
||||
flux <- c(flux, flux_val)
|
||||
|
||||
# north
|
||||
flux <- c(flux, flux_val)
|
||||
|
||||
# Store the connections in the flux_list
|
||||
flux_list[[index]] <- flux
|
||||
}
|
||||
}
|
||||
|
||||
vecinj <- do.call(rbind.data.frame, vecinj_adv)
|
||||
names(vecinj) <- names(init_adv)
|
||||
|
||||
advection <- list(
|
||||
init = init_adv,
|
||||
vecinj = vecinj,
|
||||
vecinj_inner = vecinj_inner,
|
||||
const_flux = flux_list
|
||||
)
|
||||
|
||||
#################################################################
|
||||
## Section 4 ##
|
||||
## Putting all those things together ##
|
||||
#################################################################
|
||||
|
||||
setup <- list(
|
||||
grid = grid,
|
||||
advection = advection
|
||||
)
|
||||
31
test/advection/testAdvection.cpp
Normal file
31
test/advection/testAdvection.cpp
Normal file
@ -0,0 +1,31 @@
|
||||
#include <Transport/AdvectionModule.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#include <string>
|
||||
|
||||
#include "InputFiles.hpp"
|
||||
|
||||
using namespace poet;
|
||||
|
||||
constexpr std::size_t MAX_ITER = 10;
|
||||
constexpr double DT = 200;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
auto &R = RInsidePOET::getInstance();
|
||||
|
||||
R["input_script"] = SampleInputScript;
|
||||
R.parseEvalQ("source(input_script)");
|
||||
R.parseEval("mysetup <- setup");
|
||||
|
||||
AdvectionModule adv(R);
|
||||
R["field"] = adv.getField().asSEXP();
|
||||
R.parseEval("saveRDS(field, 'adv_0.rds')");
|
||||
|
||||
for (std::size_t i = 0; i < MAX_ITER; i++) {
|
||||
adv.simulate(DT);
|
||||
const std::string save_q =
|
||||
"saveRDS(field, 'adv_" + std::to_string(i + 1) + ".rds')";
|
||||
R["field"] = adv.getField().asSEXP();
|
||||
R.parseEval(save_q);
|
||||
}
|
||||
}
|
||||
@ -1,6 +0,0 @@
|
||||
#ifndef TESTRINSIDEPOET_H_
|
||||
#define TESTRINSIDEPOET_H_
|
||||
|
||||
inline const char *RInside_source_file = "@TEST_RInsideSourceFile@";
|
||||
|
||||
#endif // TESTRINSIDEPOET_H_
|
||||
@ -11,7 +11,7 @@
|
||||
#include <Base/RInsidePOET.hpp>
|
||||
#include <DataStructures/DataStructures.hpp>
|
||||
|
||||
#include "testDataStructures.hpp"
|
||||
#include <InputFiles.hpp>
|
||||
|
||||
using namespace poet;
|
||||
|
||||
|
||||
7
test/testInputFiles.hpp.in
Normal file
7
test/testInputFiles.hpp.in
Normal file
@ -0,0 +1,7 @@
|
||||
#ifndef TESTINPUTFILES_H_
|
||||
#define TESTINPUTFILES_H_
|
||||
|
||||
inline const char *RInside_source_file = "@TEST_RInsideSourceFile@";
|
||||
inline const char *SampleInputScript = "@TEST_SampleInputScript@";
|
||||
|
||||
#endif // TESTINPUTFILES_H_
|
||||
@ -7,7 +7,7 @@
|
||||
#include <Base/RInsidePOET.hpp>
|
||||
#include <DataStructures/DataStructures.hpp>
|
||||
|
||||
#include "testDataStructures.hpp"
|
||||
#include <InputFiles.hpp>
|
||||
|
||||
TEST_CASE("NamedVector") {
|
||||
RInsidePOET &R = RInsidePOET::getInstance();
|
||||
|
||||
144
util/Advection2D.R
Normal file
144
util/Advection2D.R
Normal file
@ -0,0 +1,144 @@
|
||||
## Time-stamp: "Last modified 2023-08-18 21:46:35 delucia"
|
||||
|
||||
library(ReacTran)
|
||||
library(deSolve)
|
||||
library(rootSolve)
|
||||
options(width=114)
|
||||
|
||||
|
||||
Centre <- function(N) {
|
||||
N2 <- ceiling(N/2)
|
||||
N2*(N+1)-N
|
||||
}
|
||||
|
||||
# The model equations
|
||||
HydraulicCharge <- function(t, y, parms) {
|
||||
CONC <- matrix(nrow = parms["Np"], ncol = parms["Np"], data = y)
|
||||
## cat("CONC[N2,N2]=",CONC[N2,N2],"\n")
|
||||
CONC[ parms["N2p"], parms["N2p"] ] <- parms["injp"]
|
||||
dCONC <- tran.2D(CONC, dx = parms["dxp"], dy = parms["dxp"],
|
||||
D.x=parms["Kp"],
|
||||
D.y=parms["Kp"],
|
||||
C.x.up = rep(parms["boundp"], parms["Np"]),
|
||||
C.x.down = rep(parms["boundp"], parms["Np"]),
|
||||
C.y.up = rep(parms["boundp"], parms["Np"]),
|
||||
C.y.down = rep(parms["boundp"], parms["Np"]))$dC
|
||||
dCONC[ Centre(parms["Np"]) ] <- 0
|
||||
return(list(dCONC))
|
||||
}
|
||||
|
||||
|
||||
DarcyVec <- function(v, bound, delta) {
|
||||
aug <- c(bound, v, bound)
|
||||
grad <- diff(aug)/delta
|
||||
}
|
||||
|
||||
|
||||
Darcy <- function(h, dx, boundary, K) {
|
||||
nx <- ncol(h)
|
||||
ny <- nrow(h)
|
||||
## nx+1 x-components of \vec{U}
|
||||
Ux <- -K*apply(h, 1, DarcyVec, bound=boundary, delta=dx)
|
||||
## ny+1 y-components of \vec{U}
|
||||
Uy <- -K*apply(h, 2, DarcyVec, bound=boundary, delta=dx)
|
||||
list(Ux=t(Ux), Uy=Uy)
|
||||
}
|
||||
|
||||
## To reassemble Darcy velocities from cell interfaces to cell centres
|
||||
RollSums <- function(vec) {
|
||||
vec[-length(vec)] + vec[-1]
|
||||
}
|
||||
|
||||
|
||||
##' @title Compute Darcy velocities assuming steady flow
|
||||
##' @param N number of cells per side in 2D square grid
|
||||
##' @param L domain size (m)
|
||||
##' @param inj_h fixed hydraulic charge at domain center
|
||||
##' @param bound_h fixed hydraulic charge at all boundaries
|
||||
##' @param K permeability
|
||||
##' @return
|
||||
##' @author
|
||||
GenerateVelocities <- function(N, L, inj_h=20, bound_h=1, K=1E-4) {
|
||||
require(ReacTran)
|
||||
require(deSolve)
|
||||
|
||||
## Construction of the 2D grid
|
||||
x.grid <- setup.grid.1D(x.up = 0, L = L, N = N)
|
||||
y.grid <- setup.grid.1D(x.up = 0, L = L, N = N)
|
||||
grid2D <- setup.grid.2D(x.grid, y.grid)
|
||||
|
||||
dx <- L/N
|
||||
|
||||
## rough "center" of square domain
|
||||
N2 <- ceiling(N/2)
|
||||
|
||||
## initial condition: "boundary" everywhere...
|
||||
y <- matrix(nrow = N, ncol = N, data = bound_h)
|
||||
## except in the central point!
|
||||
y[N2, N2] <- inj_h
|
||||
|
||||
pars <- c(Kp=K, boundp = bound_h, Np=N, N2p=N2, injp=inj_h, dxp=dx)
|
||||
cat("\n:: calling ode.2D...")
|
||||
outc <- deSolve::ode.2D(y = y, func = HydraulicCharge,
|
||||
times = c(0, 1E7), parms = pars,
|
||||
dim = c(N, N), lrw = 71485484)
|
||||
|
||||
cat(" [ DONE ]\n")
|
||||
charge <- matrix(outc[2,-1], nrow = N, ncol = N)
|
||||
|
||||
|
||||
cat(":: Computing charge gradient...")
|
||||
U <- Darcy(charge, dx=dx, boundary=bound_h, K=K)
|
||||
cat(" [ DONE ]\n")
|
||||
|
||||
## Reassemble the total velocities per cell centre
|
||||
fx <- -t(apply(U$Ux, 1, RollSums))
|
||||
fy <- -apply(U$Uy, 2, RollSums)
|
||||
|
||||
x <- y <- seq(dx/2, L - dx/2, length=N)
|
||||
xx <- rep(x, N)
|
||||
yy <- rep(y, each=N)
|
||||
|
||||
norms <- sqrt(fx**2+fy**2)
|
||||
tab <- data.frame(x=xx, y=yy, norm=as.numeric(norms), ux=as.numeric(fx), uy=as.numeric(fy))
|
||||
|
||||
list(Charge=charge, outc=outc, U=U, xy=tab)
|
||||
}
|
||||
|
||||
|
||||
PlotFlow <- function(tab, skip=1, scale=TRUE, arr.len=0.05,
|
||||
arr.lwd = 1, ...) {
|
||||
if (scale) {
|
||||
tab$ux <- scale(tab$ux, center=FALSE)
|
||||
tab$uy <- scale(tab$uy, center=FALSE)
|
||||
}
|
||||
|
||||
ngrid <- sqrt(nrow(tab))
|
||||
dx <- 2*tab$x[1]
|
||||
fieldlen <- dx*ngrid
|
||||
|
||||
plot(0, 0, "n", xlim = c(0,fieldlen), ylim = c(0,fieldlen),
|
||||
asp = 1, xlab="EASTING", ylab="NORTHING", las=1, ...)
|
||||
|
||||
arrows(x0 = tab$x, y0 = tab$y,
|
||||
x1 = tab$x - tab$uy, y1 = tab$y - tab$ux,
|
||||
length=arr.len, lwd=arr.lwd)
|
||||
|
||||
invisible()
|
||||
}
|
||||
|
||||
|
||||
## square domain of 100x100 m discretized in 101x101 cells
|
||||
aa <- GenerateVelocities(N=201, L=100, inj_h=20, bound_h=1, K=1E-2)
|
||||
|
||||
## plot hydraulic charge (via deSolve's method!)
|
||||
image(aa$outc, ask = FALSE, main ="Hydraulic charge",
|
||||
legend = TRUE, add.contour = FALSE, subset = time == 1E7,
|
||||
xlab="", ylab="", axes=FALSE, asp=1)
|
||||
|
||||
x11()
|
||||
|
||||
PlotFlow(aa$xy, skip=1, scale=TRUE, arr.lwd = 0.5)
|
||||
|
||||
|
||||
image(matrix(log10(aa$xy$norm), ncol=201), asp=1)
|
||||
Loading…
x
Reference in New Issue
Block a user