Compare commits

...

39 Commits

Author SHA1 Message Date
Max Lübke
6c226834db refactor: Migrate DHT implementation to LUCX 2025-03-03 10:14:57 +01:00
Max Luebke
8d02293cf5 refactor(dht): remove unused preprocessor directive for UCX 2025-02-27 21:43:56 +01:00
Max Luebke
f04d1c491d chore(DHT): update submodule to latest commit 2025-02-27 20:06:10 +01:00
Max Lübke
751faeabc8 fix(dht): pass communicator to DHT_create 2025-02-26 13:22:37 +01:00
Max Lübke
680069b8ac refactor: Replace DHT_ucx with LUCX/DHT in DHT_Wrapper 2025-02-26 13:09:28 +01:00
Max Lübke
d10bf50f66 Update benchmark 2024-04-02 22:53:52 +02:00
Max Lübke
534cc0f6bc Fix CFL-condition for now 2024-03-19 11:38:48 +01:00
Max Lübke
d5e11c851b Add support for tracking and reporting corrupt DHT buckets 2024-03-19 11:38:17 +01:00
Max Lübke
b0a4e7d64f Update to only store first and last iteration of advection model 2024-03-07 14:26:09 +01:00
Max Lübke
2c17604882 Update build process to use external library for DHT_MPI too 2024-03-07 13:58:04 +01:00
Max Lübke
7828b00268 Add OpenSSL dependency and implement MD5 hash function 2024-03-07 12:29:03 +01:00
Max Lübke
6abf18ae73 Implement basic functionality using DHT_UCX 2024-03-07 12:00:17 +01:00
Max Lübke
64568d5074 Add DHT submodule 2024-03-07 11:41:02 +01:00
Max Lübke
b0dbe9e7b8 Update parameters in dolo_adv.R 2024-03-07 11:32:31 +01:00
Max Lübke
e3b7824dd4 Fix variable time steps in AdvectionModule.cpp 2024-03-07 11:29:50 +01:00
Max Lübke
b3e879635a Update flux_val and iterations values 2024-03-06 16:06:44 +01:00
Max Lübke
9c9abdb7ef Update include statement in test cases 2024-03-06 13:20:24 +00:00
Max Lübke
be7a861c78 Add OpenMP support and fix include paths 2024-03-06 13:06:53 +00:00
Max Lübke
106f9c519e Update grid size and flux value in dolo_adv.R 2024-03-06 12:57:56 +00:00
Max Lübke
b1277bac22 Fix parsing of velocities in AdvectionModule constructor by using List over DataFrame 2024-03-06 12:57:56 +00:00
Max Lübke
0c31a9c372 add benchmark for nico 2024-03-06 12:57:56 +00:00
Max Lübke
32b2c5c741 try closed boundary 2024-03-06 12:57:56 +00:00
Max Lübke
115dd87ba3 add output of time measurements 2024-03-06 12:57:45 +00:00
Max Lübke
272f010047 load constant velocities at instantiation 2024-03-06 12:57:36 +00:00
Max Lübke
4f0a8f3742 add openmp pragmas 2024-03-06 12:57:25 +00:00
Max Lübke
7187582910 Revert "define fluxes and max fluxes as class members"
This reverts commit d218697c3bc7769c285d8f1e865bf0bd20cf2299.
2024-03-06 12:57:13 +00:00
Max Lübke
3a2769b1f6 add Phreeqc input script to install target 2024-03-06 12:57:06 +00:00
Max Lübke
7dfe4efc23 define fluxes and max fluxes as class members 2024-03-06 12:57:06 +00:00
Max Luebke
75f1036e3e remove all unusable benchmarks for sake of clarity 2024-03-06 12:56:57 +00:00
Max Luebke
358aa51836 rename header containing file paths for tests 2024-03-06 12:56:57 +00:00
Max Luebke
d6c4be6ea3 fix wrong indexing 2024-03-06 12:56:34 +00:00
Max Luebke
b78d7d1a9b provide additional test input script 2024-03-06 12:56:22 +00:00
Max Luebke
031a2e237b Implement advection into POET 2024-03-06 12:56:22 +00:00
Max Luebke
201564d4d1 Store inactive cells with predefined values (bc) 2024-03-06 12:56:10 +00:00
Marco De Lucia
43b70b089e reverted PlotFlow in Advection2D.R 2024-03-06 12:56:10 +00:00
Marco De Lucia
664d92adf6 added Advection2D.R to util 2024-03-06 12:56:10 +00:00
Max Lübke
9f6b27206e Implemented CFL condition 2024-03-06 12:56:10 +00:00
Max Lübke
5e9639c72d First advection with fixed timestep and volume 2024-03-06 12:55:53 +00:00
Max Luebke
430769f247 prep advection 2024-03-06 12:55:25 +00:00
32 changed files with 1222 additions and 1200 deletions

3
.gitmodules vendored
View File

@ -8,3 +8,6 @@
[submodule "ext/doctest"] [submodule "ext/doctest"]
path = ext/doctest path = ext/doctest
url = https://github.com/doctest/doctest.git url = https://github.com/doctest/doctest.git
[submodule "ext/DHT"]
path = ext/DHT
url = git@gitup.uni-potsdam.de:mluebke/dht_ucx.git

View File

@ -19,8 +19,9 @@ get_poet_version()
# set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS") add_definitions(${GCC_CXX_FLAGS}) # set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS") add_definitions(${GCC_CXX_FLAGS})
find_package(MPI REQUIRED) find_package(MPI REQUIRED)
find_package(OpenMP)
find_package(RRuntime REQUIRED) find_package(RRuntime REQUIRED)
find_package(OpenSSL REQUIRED)
add_subdirectory(src) add_subdirectory(src)
add_subdirectory(bench) add_subdirectory(bench)
@ -31,6 +32,10 @@ set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
add_subdirectory(ext/tug EXCLUDE_FROM_ALL) add_subdirectory(ext/tug EXCLUDE_FROM_ALL)
add_subdirectory(ext/phreeqcrm EXCLUDE_FROM_ALL) add_subdirectory(ext/phreeqcrm EXCLUDE_FROM_ALL)
option(DHT_WITH_MPI "" ON)
mark_as_advanced(DHT_WITH_MPI)
add_subdirectory(ext/DHT EXCLUDE_FROM_ALL)
option(POET_ENABLE_TESTING "Build test suite for POET" OFF) option(POET_ENABLE_TESTING "Build test suite for POET" OFF)
if (POET_ENABLE_TESTING) if (POET_ENABLE_TESTING)

View File

@ -1,3 +1,3 @@
add_subdirectory(dolo) add_subdirectory(dolo)
add_subdirectory(surfex) #add_subdirectory(surfex)
add_subdirectory(barite) #add_subdirectory(barite)

View File

@ -1,8 +1,9 @@
install(FILES install(FILES
dolo_diffu_inner.R # dolo_diffu_inner.R
dolo_diffu_inner_large.R # dolo_diffu_inner_large.R
dolo_inner.pqi dolo_inner.pqi
dolo_interp_long.R # dolo_interp_long.R
dolo_adv.R
phreeqc_kin.dat phreeqc_kin.dat
DESTINATION DESTINATION
share/poet/bench/dolo share/poet/bench/dolo

219
bench/dolo/dolo_adv.R Normal file
View File

@ -0,0 +1,219 @@
## Time-stamp: "Last modified 2023-09-06 10:58:16 mluebke"
database <- normalizePath("../share/poet/bench/dolo/phreeqc_kin.dat")
input_script <- normalizePath("../share/poet/bench/dolo/dolo_inner.pqi")
#################################################################
## Section 1 ##
## Grid initialization ##
#################################################################
n <- 1500
m <- 500
types <- c("scratch", "phreeqc", "rds")
init_cell <- list(
"H" = 110.683,
"O" = 55.3413,
"Charge" = -5.0822e-19,
"C(4)" = 1.2279E-4,
"Ca" = 1.2279E-4,
"Cl" = 0,
"Mg" = 0,
"O2g" = 0.499957,
"Calcite" = 2.07e-4,
"Dolomite" = 0
)
grid <- list(
n_cells = c(n, m),
s_cells = c(n, m),
type = types[1]
)
##################################################################
## Section 2 ##
## Advection parameters ##
##################################################################
## initial conditions
## HACK: We need the chemical initialization here, as chem module initialization
## depends on transport until now. This will change in the future.
# Species totals every interior cell starts from (transported species only,
# no mineral phases).
init_adv <- c(
  "H" = 110.124,
  "O" = 55.0622,
  "Charge" = -1.216307659761E-09,
  "C(4)" = 1.230067028174E-04,
  "Ca" = 1.230067028174E-04,
  "Cl" = 0,
  "Mg" = 0
)
## list of boundary conditions/inner nodes
# Element 1: injection fluid (non-zero Cl/Mg); element 2: background fluid.
vecinj_adv <- list(
  list(
    "H" = 110.683,
    "O" = 55.3413,
    "Charge" = 1.90431e-16,
    "C(4)" = 0,
    "Ca" = 0,
    "Cl" = 0.002,
    "Mg" = 0.001
  ),
  list(
    # Previous values kept for reference:
    # "H" = 110.124,
    # "O" = 55.0622,
    # "Charge" = -1.216307659761E-09,
    # "C(4)" = 1.230067028174E-04,
    # "Ca" = 1.230067028174E-04,
    # "Cl" = 0,
    # "Mg" = 0
    "H" = 110.124,
    "O" = 55.0622,
    "Charge" = -1.217e-09,
    "C(4)" = 0,
    "Ca" = 0,
    "Cl" = 0,
    "Mg" = 0
  )
)
# No fixed inner (source) nodes in this benchmark; example kept as comment.
vecinj_inner <- list(
  # l1 = c(2, 1, 1)
)
# Container for the per-cell flux vectors, filled below.
flux_list <- list()

# Linear index of grid cell (row, col) in row-major order, or -1 when the
# result falls outside 1..(n * m).  Uses the file-level grid dimensions
# `n` and `m`.
get_index <- function(row, col) {
  idx <- (row - 1) * m + col
  if (idx < 1 || idx > n * m) {
    return(-1)
  }
  idx
}
# Constant flux magnitude applied on every cell face.
flux_val <- 0.005
# Every cell gets the same four face fluxes in the order east, south, west,
# north (outflow negative, inflow positive).  Building the list directly is
# equivalent to the former nested row/column loop (which assigned the same
# vector to every index 1..n*m) but avoids n * m iterations of list growth.
flux_list <- rep(
  list(c(-flux_val, -flux_val, flux_val, flux_val)),
  n * m
)
# Stack the boundary-condition lists into one data frame (one row per
# boundary type).  rbind.data.frame would normalize names like "C(4)", so
# the species names are restored from init_adv afterwards.
vecinj <- do.call(rbind.data.frame, vecinj_adv)
names(vecinj) <- names(init_adv)
# Bundle everything the advection module needs.
advection <- list(
  init = init_adv,
  vecinj = vecinj,
  vecinj_inner = vecinj_inner,
  const_flux = flux_list
)
#################################################################
## Section 3 ##
## Chemistry module (Phreeqc) ##
#################################################################
## # optional when using DHT
# Key species for the DHT lookup; each value is the number of significant
# digits kept when that species is rounded into the hash key (presumably --
# TODO confirm against the DHT wrapper implementation).
dht_species <- c(
  "H" = 3,
  "O" = 3,
  "Charge" = 3,
  "C(4)" = 6,
  "Ca" = 6,
  "Cl" = 3,
  "Mg" = 5,
  "Calcite" = 4,
  "Dolomite" = 4
)
# TRUE when the zero/non-zero status of Calcite or Dolomite differs between
# the old and the new result vector; used as the `dht_fill` hook to decide
# whether an entry should be written to the DHT.
check_sign_cal_dol_dht <- function(old, new) {
  calcite_flip <- (old["Calcite"] == 0) != (new["Calcite"] == 0)
  dolomite_flip <- (old["Dolomite"] == 0) != (new["Dolomite"] == 0)
  calcite_flip || dolomite_flip
}
# Subset/reorder the input to exactly the DHT key species (file-level
# `dht_species`) so keys are always built in a canonical order.
fuzz_input_dht_keys <- function(input) {
  key_names <- names(dht_species)
  input[key_names]
}
# Return, in reverse order, the indices of data-set rows whose
# Calcite/Dolomite zero-status pattern disagrees with the query point
# `to_interp` (presumably filtered out by the caller before interpolation --
# confirm against the interpolation module).
check_sign_cal_dol_interp <- function(to_interp, data_set) {
  tbl <- as.data.frame(do.call(rbind, data_set), check.names = FALSE, optional = TRUE)
  names(tbl) <- names(dht_species)
  cal_match <- (tbl$Calcite == 0) == (to_interp["Calcite"] == 0)
  dol_match <- (tbl$Dolomite == 0) == (to_interp["Dolomite"] == 0)
  rev(which(cal_match != dol_match))
}
# TRUE when either mineral amount in `result` is negative (invalid state).
# BUGFIX: the original wrote `(result["Calcite"] <- 0)`, i.e. an assignment
# instead of the comparison `< 0`, so a negative Calcite value was silently
# overwritten with 0 and never flagged.
check_neg_cal_dol <- function(result) {
  neg_sign <- (result["Calcite"] < 0) || (result["Dolomite"] < 0)
  return(any(neg_sign))
}
# Hook functions handed to the chemistry module; the list names are the
# contract expected by the caller.
hooks <- list(
  dht_fill = check_sign_cal_dol_dht,
  dht_fuzz = fuzz_input_dht_keys,
  interp_pre_func = check_sign_cal_dol_interp,
  interp_post_func = check_neg_cal_dol
)
# Chemistry module configuration.
chemistry <- list(
  database = database,
  input_script = input_script,
  dht_species = dht_species,
  hooks = hooks
)
#################################################################
## Section 4 ##
## Putting all those things together ##
#################################################################
# Number of coupled iterations and the constant time step per iteration
# (unit presumably seconds -- TODO confirm in the simulation core).
iterations <- 500
dt <- 600
# Store only the first and the last iteration of the run.
out_save <- c(1, iterations)
# Top-level simulation description consumed by POET.
setup <- list(
  grid = grid,
  advection = advection,
  chemistry = chemistry,
  iterations = iterations,
  timesteps = rep(dt, iterations),
  store_result = TRUE,
  out_save = out_save
)

View File

@ -1265,7 +1265,7 @@ Calcite
10 moles=0 10 moles=0
20 IF ((M<=0) and (SI("Calcite")<0)) then goto 200 20 IF ((M<=0) and (SI("Calcite")<0)) then goto 200
30 R=8.314462 # in J*K-1*mol-1 30 R=8.314462 # in J*K-1*mol-1
40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin 40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin
50 e=2.718282 # Eulersche Zahl 50 e=2.718282 # Eulersche Zahl
## mechanism 1 (acid) ## mechanism 1 (acid)
60 Ea=14400 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol 60 Ea=14400 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol
@ -1277,7 +1277,7 @@ Calcite
120 mech_c=(10^logK25)*(e^(-Ea/R*deltaT)) 120 mech_c=(10^logK25)*(e^(-Ea/R*deltaT))
130 rate=mech_a+mech_c 130 rate=mech_a+mech_c
140 IF (SI("Calcite")<0 AND M>0) then moles=parm(1)*rate*(1-SR("Calcite")) # dissolution 140 IF (SI("Calcite")<0 AND M>0) then moles=parm(1)*rate*(1-SR("Calcite")) # dissolution
## 145 IF SI("Calcite")>0 then moles=parm(1)*M*rate*(-1+SR("Calcite")) # precipitation ## 145 IF SI("Calcite")>0 then moles=parm(1)*M*rate*(-1+SR("Calcite")) # precipitation
## 150 moles=parm(1)*rate*(1-SR("Calcite")) # precipitation ## 150 moles=parm(1)*rate*(1-SR("Calcite")) # precipitation
200 save moles*time 200 save moles*time
-end -end
@ -1287,7 +1287,7 @@ Dolomite
10 moles=0 10 moles=0
20 IF ((M<=0) and (SI("Dolomite")<0)) then goto 200 20 IF ((M<=0) and (SI("Dolomite")<0)) then goto 200
30 R=8.314462 # in J*K-1*mol-1 30 R=8.314462 # in J*K-1*mol-1
40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin 40 deltaT=1/TK-1/298.15 # wird für 40°C berechnet; TK is temp in Kelvin
50 e=2.718282 # Eulersche Zahl 50 e=2.718282 # Eulersche Zahl
## mechanism 1 (acid) ## mechanism 1 (acid)
60 Ea=36100 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol 60 Ea=36100 # Aktivierungsenergie in J/mol => 65.0 in KJ/mol

1
ext/DHT Submodule

@ -0,0 +1 @@
Subproject commit a22a6b2eee7c34441d6810d47ac2d9b25ce183e5

View File

@ -1,49 +1,85 @@
add_library(poetlib set(POET_SOURCES
Base/Grid.cpp Base/Grid.cpp
Base/SimParams.cpp Base/SimParams.cpp
Chemistry/ChemistryModule.cpp Chemistry/ChemistryModule.cpp
Chemistry/MasterFunctions.cpp Chemistry/MasterFunctions.cpp
Chemistry/WorkerFunctions.cpp Chemistry/WorkerFunctions.cpp
Chemistry/SurrogateModels/DHT_Wrapper.cpp Chemistry/SurrogateModels/DHT_Wrapper.cpp
Chemistry/SurrogateModels/DHT.c
Chemistry/SurrogateModels/HashFunctions.cpp Chemistry/SurrogateModels/HashFunctions.cpp
Chemistry/SurrogateModels/InterpolationModule.cpp
Chemistry/SurrogateModels/ProximityHashTable.cpp
DataStructures/Field.cpp DataStructures/Field.cpp
Transport/DiffusionModule.cpp Transport/DiffusionModule.cpp
Transport/AdvectionModule.cpp
) )
target_link_libraries(poetlib PUBLIC # option(POET_USE_DHT_MPI "Use MPI for DHT" OFF)
MPI::MPI_C #
${MATH_LIBRARY} # if (NOT POET_USE_DHT_MPI)
RRuntime # target_compile_definitions(poetlib PUBLIC POET_DHT_UCX)
PhreeqcRM # target_link_libraries(poetlib PUBLIC DHT_UCX)
tug # else()
) # target_link_libraries(poetlib PUBLIC DHT_MPI)
# endif()
target_compile_definitions(poetlib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX) #
# target_compile_definitions(poetlib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP) #
set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE) # mark_as_advanced(PHREEQCRM_BUILD_MPI PHREEQCRM_DISABLE_OPENMP)
# set(PHREEQCRM_DISABLE_OPENMP ON CACHE BOOL "" FORCE)
option(POET_DHT_DEBUG "Build with DHT debug info" OFF) #
# option(POET_DHT_DEBUG "Build with DHT debug info" OFF)
if(POET_DHT_DEBUG) #
target_compile_definitions(poetlib PRIVATE DHT_STATISTICS) # if(POET_DHT_DEBUG)
endif() # target_compile_definitions(poetlib PRIVATE DHT_STATISTICS)
# endif()
option(POET_PHT_ADDITIONAL_INFO "Enables additional information in the PHT" OFF) #
# option(POET_PHT_ADDITIONAL_INFO "Enables additional information in the PHT" OFF)
if (POET_PHT_ADDITIONAL_INFO) #
target_compile_definitions(poetlib PRIVATE POET_PHT_ADD) # if (POET_PHT_ADDITIONAL_INFO)
endif() # target_compile_definitions(poetlib PRIVATE POET_PHT_ADD)
# endif()
#
# if (OpenMP_FOUND)
# target_link_libraries(poetlib PUBLIC OpenMP::OpenMP_CXX)
# endif()
file(READ "${PROJECT_SOURCE_DIR}/R_lib/kin_r_library.R" R_KIN_LIB ) file(READ "${PROJECT_SOURCE_DIR}/R_lib/kin_r_library.R" R_KIN_LIB )
configure_file(poet.hpp.in poet.hpp @ONLY) configure_file(poet.hpp.in poet.hpp @ONLY)
add_executable(poet poet.cpp) add_executable(poet_coarse poet.cpp ${POET_SOURCES})
target_link_libraries(poet PRIVATE poetlib MPI::MPI_C RRuntime) target_link_libraries(poet_coarse PRIVATE
target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") MPI::MPI_C
${MATH_LIBRARY}
RRuntime
PhreeqcRM
tug
OpenSSL::Crypto
MPIDHT::coarse
)
target_include_directories(poet_coarse PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
install(TARGETS poet_coarse DESTINATION bin)
install(TARGETS poet DESTINATION bin) add_executable(poet_fine poet.cpp ${POET_SOURCES})
target_link_libraries(poet_fine PRIVATE
MPI::MPI_C
${MATH_LIBRARY}
RRuntime
PhreeqcRM
tug
OpenSSL::Crypto
MPIDHT::fine
)
target_include_directories(poet_fine PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
install(TARGETS poet_fine DESTINATION bin)
add_executable(poet_no poet.cpp ${POET_SOURCES})
target_link_libraries(poet_no PRIVATE
MPI::MPI_C
${MATH_LIBRARY}
RRuntime
PhreeqcRM
tug
OpenSSL::Crypto
MPIDHT::nolock
)
target_include_directories(poet_no PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
install(TARGETS poet_no DESTINATION bin)

View File

@ -1,7 +1,7 @@
#include "ChemistryModule.hpp" #include "ChemistryModule.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp" // #include "SurrogateModels/Interpolation.hpp"
#include <PhreeqcRM.h> #include <PhreeqcRM.h>
@ -338,12 +338,12 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
this->dht_enabled = this->params.use_dht; this->dht_enabled = this->params.use_dht;
if (this->params.use_interp) { // if (this->params.use_interp) {
initializeInterp(this->params.pht_max_entries, this->params.pht_size, // initializeInterp(this->params.pht_max_entries, this->params.pht_size,
this->params.interp_min_entries, // this->params.interp_min_entries,
this->params.pht_signifs); // this->params.pht_signifs);
this->interp_enabled = this->params.use_interp; // this->interp_enabled = this->params.use_interp;
} // }
} }
} }
@ -453,47 +453,47 @@ void poet::ChemistryModule::setDHTReadFile(const std::string &input_file) {
} }
} }
void poet::ChemistryModule::initializeInterp( // void poet::ChemistryModule::initializeInterp(
std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t min_entries, // std::uint32_t bucket_size, std::uint32_t size_mb, std::uint32_t
const NamedVector<std::uint32_t> &key_species) { // min_entries, const NamedVector<std::uint32_t> &key_species) {
if (!this->is_master) { // if (!this->is_master) {
constexpr uint32_t MB_FACTOR = 1E6; // constexpr uint32_t MB_FACTOR = 1E6;
assert(this->dht); // assert(this->dht);
this->interp_enabled = true; // this->interp_enabled = true;
auto map_copy = key_species; // auto map_copy = key_species;
if (key_species.empty()) { // if (key_species.empty()) {
map_copy = this->dht->getKeySpecies(); // map_copy = this->dht->getKeySpecies();
for (std::size_t i = 0; i < map_copy.size(); i++) { // for (std::size_t i = 0; i < map_copy.size(); i++) {
const std::uint32_t signif = // const std::uint32_t signif =
map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF // map_copy[i] - (map_copy[i] > InterpolationModule::COARSE_DIFF
? InterpolationModule::COARSE_DIFF // ? InterpolationModule::COARSE_DIFF
: 0); // : 0);
map_copy[i] = signif; // map_copy[i] = signif;
} // }
} // }
auto key_indices = // auto key_indices =
parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames()); // parseDHTSpeciesVec(map_copy, dht->getKeySpecies().getNames());
if (this->interp) { // if (this->interp) {
this->interp.reset(); // this->interp.reset();
} // }
const uint64_t pht_size = size_mb * MB_FACTOR; // const uint64_t pht_size = size_mb * MB_FACTOR;
interp = std::make_unique<poet::InterpolationModule>( // interp = std::make_unique<poet::InterpolationModule>(
bucket_size, pht_size, min_entries, *(this->dht), map_copy, key_indices, // bucket_size, pht_size, min_entries, *(this->dht), map_copy,
this->prop_names, this->params.hooks); // key_indices, this->prop_names, this->params.hooks);
interp->setInterpolationFunction(inverseDistanceWeighting); // interp->setInterpolationFunction(inverseDistanceWeighting);
} // }
} // }
std::vector<double> std::vector<double>
poet::ChemistryModule::shuffleField(const std::vector<double> &in_field, poet::ChemistryModule::shuffleField(const std::vector<double> &in_field,

View File

@ -6,7 +6,7 @@
#include "../Base/SimParams.hpp" #include "../Base/SimParams.hpp"
#include "../DataStructures/DataStructures.hpp" #include "../DataStructures/DataStructures.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp" // #include "SurrogateModels/Interpolation.hpp"
#include <IrmResult.h> #include <IrmResult.h>
#include <PhreeqcRM.h> #include <PhreeqcRM.h>
@ -208,6 +208,8 @@ public:
*/ */
std::vector<uint32_t> GetWorkerDHTEvictions() const; std::vector<uint32_t> GetWorkerDHTEvictions() const;
std::vector<uint32_t> GetWorkerDHTCorruptBuckets() const;
/** /**
* **Master only** Returns the current state of the chemical field. * **Master only** Returns the current state of the chemical field.
* *
@ -273,6 +275,7 @@ protected:
WORKER_IP_FC, WORKER_IP_FC,
WORKER_DHT_HITS, WORKER_DHT_HITS,
WORKER_DHT_EVICTIONS, WORKER_DHT_EVICTIONS,
WORKER_DHT_CORRUPT,
WORKER_PHT_CACHE_HITS, WORKER_PHT_CACHE_HITS,
WORKER_IP_CALLS WORKER_IP_CALLS
}; };
@ -280,6 +283,7 @@ protected:
std::vector<uint32_t> interp_calls; std::vector<uint32_t> interp_calls;
std::vector<uint32_t> dht_hits; std::vector<uint32_t> dht_hits;
std::vector<uint32_t> dht_evictions; std::vector<uint32_t> dht_evictions;
std::vector<uint32_t> corrupt_buckets;
struct worker_s { struct worker_s {
double phreeqc_t = 0.; double phreeqc_t = 0.;
@ -354,7 +358,7 @@ protected:
poet::DHT_Wrapper *dht = nullptr; poet::DHT_Wrapper *dht = nullptr;
bool interp_enabled{false}; bool interp_enabled{false};
std::unique_ptr<poet::InterpolationModule> interp; // std::unique_ptr<poet::InterpolationModule> interp;
static constexpr uint32_t BUFFER_OFFSET = 5; static constexpr uint32_t BUFFER_OFFSET = 5;

View File

@ -97,6 +97,25 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
return ret; return ret;
} }
std::vector<uint32_t>
poet::ChemistryModule::GetWorkerDHTCorruptBuckets() const {
int type = CHEM_PERF;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
type = WORKER_DHT_CORRUPT;
MPI_Bcast(&type, 1, MPI_INT, 0, this->group_comm);
MPI_Status probe;
MPI_Probe(MPI_ANY_SOURCE, WORKER_DHT_CORRUPT, this->group_comm, &probe);
int count;
MPI_Get_count(&probe, MPI_UINT32_T, &count);
std::vector<uint32_t> ret(count);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE,
WORKER_DHT_CORRUPT, this->group_comm, NULL);
return ret;
}
std::vector<double> std::vector<double>
poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const { poet::ChemistryModule::GetWorkerInterpolationWriteTimings() const {
int type = CHEM_PERF; int type = CHEM_PERF;

View File

@ -1,661 +0,0 @@
/// Time-stamp: "Last modified 2023-08-10 11:50:46 mluebke"
/*
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "DHT.h"
#include <mpi.h>
#include <inttypes.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/*
 * Map a key hash to its owning rank and to index_count candidate bucket
 * indices inside that rank's table.  The candidates are byte-shifted
 * views of the hash, each reduced modulo table_size.
 */
static void determine_dest(uint64_t hash, int comm_size,
                           unsigned int table_size, unsigned int *dest_rank,
                           uint64_t *index, unsigned int index_count) {
  /* how many bytes of the hash contribute to one index */
  const int index_size = (int)sizeof(uint64_t) - ((int)index_count - 1);
  for (unsigned int i = 0; i < index_count; i++) {
    /* BUGFIX: shifting a 64-bit value by 64 is undefined behaviour, which
     * the original hit when index_count == 1 (index_size == 8); use a full
     * mask in that case instead of (1ULL << 64) - 1. */
    const uint64_t mask = (index_size >= 8)
                              ? UINT64_MAX
                              : ((1ULL << (index_size * 8)) - 1);
    const uint64_t tmp_index = (hash >> (i * 8)) & mask;
    index[i] = tmp_index % table_size;
  }
  *dest_rank = (unsigned int)(hash % comm_size);
}
/* Mark a bucket as occupied by setting bit 0 of its flag byte (the
 * remaining bits are cleared, matching the original clear-then-set). */
static void set_flag(char *flag_byte) {
  *flag_byte = (char)0x01;
}
/* Return 1 when bit 0 (the occupancy flag) of the flag byte is set,
 * 0 otherwise. */
static int read_flag(char flag_byte) {
  return (flag_byte & 0x01) ? 1 : 0;
}
/*
 * Create a distributed hash table over communicator `comm` with `size`
 * buckets per rank, each bucket holding a 1-byte occupancy flag, a key of
 * key_size bytes and a payload of data_size bytes.  `hash_func` maps
 * (key_size, key) to a 64-bit hash.  Returns NULL on failure.
 *
 * BUGFIX: the original leaked `object`, the RMA memory and the scratch
 * buffers on every error path; all early returns now release what was
 * already allocated.  The scratch buffers are allocated before the window
 * so a malloc failure never requires a collective MPI_Win_free.
 */
DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
                unsigned int key_size,
                uint64_t (*hash_func)(int, const void *)) {
  DHT *object;
  MPI_Win window;
  void *mem_alloc;
  int comm_size, index_bytes;

  // bytes needed to address `size` buckets, rounded up to whole bytes
  index_bytes = (int)ceil(log2(size));
  if (index_bytes % 8 != 0)
    index_bytes = index_bytes + (8 - (index_bytes % 8));

  // allocate memory for dht-object
  object = (DHT *)malloc(sizeof(DHT));
  if (object == NULL)
    return NULL;

  // every bucket has 1 additional byte for the occupancy flag
  if (MPI_Alloc_mem(size * (1 + data_size + key_size), MPI_INFO_NULL,
                    &mem_alloc) != 0) {
    free(object);
    return NULL;
  }

  if (MPI_Comm_size(comm, &comm_size) != 0) {
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // scratch buffers for one bucket each (flag + key + data)
  object->recv_entry = malloc(1 + data_size + key_size);
  object->send_entry = malloc(1 + data_size + key_size);
  object->index_count = 9 - (index_bytes / 8);
  object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
  if (object->recv_entry == NULL || object->send_entry == NULL ||
      object->index == NULL) {
    free(object->recv_entry);
    free(object->send_entry);
    free(object->index);
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // MPI_Alloc_mem doesn't zero the memory, so clear all occupancy flags
  memset(mem_alloc, '\0', size * (1 + data_size + key_size));

  // expose the bucket array via an RMA window; displacement unit is one
  // full bucket
  if (MPI_Win_create(mem_alloc, size * (1 + data_size + key_size),
                     (1 + data_size + key_size), MPI_INFO_NULL, comm,
                     &window) != 0) {
    free(object->recv_entry);
    free(object->send_entry);
    free(object->index);
    MPI_Free_mem(mem_alloc);
    free(object);
    return NULL;
  }

  // fill dht-object
  object->data_size = data_size;
  object->key_size = key_size;
  object->table_size = size;
  object->window = window;
  object->hash_func = hash_func;
  object->comm_size = comm_size;
  object->communicator = comm;
  object->read_misses = 0;
  object->evictions = 0;
  object->mem_alloc = mem_alloc;

// if set, initialize dht_stats
#ifdef DHT_STATISTICS
  DHT_stats *stats;
  stats = (DHT_stats *)malloc(sizeof(DHT_stats));
  if (stats == NULL) {
    /* NOTE(review): the window cannot be freed here on a single rank
     * (MPI_Win_free is collective); local buffers are released, the
     * window/RMA memory intentionally leak as in the original. */
    free(object->recv_entry);
    free(object->send_entry);
    free(object->index);
    free(object);
    return NULL;
  }
  object->stats = stats;
  object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
  object->stats->old_writes = 0;
  object->stats->read_misses = 0;
  object->stats->evictions = 0;
  object->stats->w_access = 0;
  object->stats->r_access = 0;
#endif

  return object;
}
/* Register the merge function used by DHT_write_accumulate().  The callback
 * is invoked as (incoming size, incoming data, stored size, stored data)
 * and returns 0 to accept the merged entry for write-back.  NOTE(review):
 * argument semantics inferred from DHT_write_accumulate's call site --
 * confirm against DHT.h. */
void DHT_set_accumulate_callback(DHT *table,
                                 int (*cb)(int, void *, int, void *)) {
  table->accumulate_callback = cb;
}
/*
 * Merge `send_data` into the bucket for `send_key` using the callback
 * registered via DHT_set_accumulate_callback().
 *
 * The key's hash selects the owning rank and index_count candidate slots.
 * Under an exclusive lock the current bucket contents (or zeros, when the
 * write will evict a foreign key) are copied into the send buffer, the
 * callback folds `send_data` into them, and the combined entry is written
 * back only when the callback returns 0 (reported through *callback_ret).
 *
 * Returns DHT_SUCCESS, DHT_WRITE_SUCCESS_WITH_EVICTION or DHT_MPI_ERROR.
 * If non-NULL, *proc and *index receive the bucket's rank and slot.
 *
 * NOTE(review): every DHT_MPI_ERROR return after MPI_Win_lock leaves the
 * window locked -- confirm callers treat MPI errors as fatal.
 */
int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
                         void *send_data, uint32_t *proc, uint32_t *index,
                         int *callback_ret) {
  unsigned int dest_rank, i;
  int result = DHT_SUCCESS;
#ifdef DHT_STATISTICS
  table->stats->w_access++;
#endif
  // determine destination rank and candidate indices from the key's hash
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);
  // build the entry header: occupancy flag followed by the key
  set_flag((char *)table->send_entry);
  memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
  /* memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
   */
  /* table->data_size); */
  // lock the target rank's window exclusively for the read-modify-write
  if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // probe candidate slots: stop at an empty slot or a matching key; if the
  // last slot holds a foreign key it gets evicted
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0)
      return DHT_MPI_ERROR;
    if (MPI_Win_flush(dest_rank, table->window) != 0)
      return DHT_MPI_ERROR;
    // increment eviction counter if the received key doesn't match the
    // sent key, the entry has its write flag set and the last index is
    // reached
    if (read_flag(*(char *)table->recv_entry)) {
      if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
          0) {
        if (i == (table->index_count) - 1) {
          table->evictions += 1;
#ifdef DHT_STATISTICS
          table->stats->evictions += 1;
#endif
          result = DHT_WRITE_SUCCESS_WITH_EVICTION;
          break;
        }
      } else
        break;
    } else {
#ifdef DHT_STATISTICS
      table->stats->writes_local[dest_rank]++;
#endif
      break;
    }
  }
  // seed the merge buffer: zeros on eviction, otherwise the current data
  if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
    memset((char *)table->send_entry + 1 + table->key_size, '\0',
           table->data_size);
  } else {
    memcpy((char *)table->send_entry + 1 + table->key_size,
           (char *)table->recv_entry + 1 + table->key_size, table->data_size);
  }
  // let the callback fold send_data into the buffered bucket data
  *callback_ret = table->accumulate_callback(
      data_size, (char *)send_data, table->data_size,
      (char *)table->send_entry + 1 + table->key_size);
  // write back only if the callback accepted the merge (returned 0),
  // using the last probed slot i
  if (*callback_ret == 0) {
    if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0)
      return DHT_MPI_ERROR;
  }
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;
  if (proc) {
    *proc = dest_rank;
  }
  if (index) {
    *index = table->index[i];
  }
  return result;
}
/*
 * Insert (send_key -> send_data) into the table.
 *
 * The key's hash selects the owning rank and index_count candidate slots on
 * that rank.  Under an exclusive lock the slots are probed in order; the
 * entry is written to the first slot that is either empty or already holds
 * the same key.  If all candidates hold foreign keys, the last one is
 * overwritten and the eviction counter is incremented.
 *
 * Returns DHT_SUCCESS, DHT_WRITE_SUCCESS_WITH_EVICTION or DHT_MPI_ERROR.
 * If non-NULL, *proc and *index receive the bucket's rank and slot.
 *
 * NOTE(review): every DHT_MPI_ERROR return after MPI_Win_lock leaves the
 * window locked -- confirm callers treat MPI errors as fatal.
 */
int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
              uint32_t *index) {
  unsigned int dest_rank, i;
  int result = DHT_SUCCESS;
#ifdef DHT_STATISTICS
  table->stats->w_access++;
#endif
  // determine destination rank and candidate indices from the key's hash
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);
  // build the entry: occupancy flag, then key, then data
  set_flag((char *)table->send_entry);
  memcpy((char *)table->send_entry + 1, (char *)send_key, table->key_size);
  memcpy((char *)table->send_entry + table->key_size + 1, (char *)send_data,
         table->data_size);
  // locking window of target rank with exclusive lock
  if (MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // probe candidate slots: stop at an empty slot or a matching key; if the
  // last slot holds a foreign key it gets evicted
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0)
      return DHT_MPI_ERROR;
    if (MPI_Win_flush(dest_rank, table->window) != 0)
      return DHT_MPI_ERROR;
    // increment eviction counter if the received key doesn't match the
    // sent key, the entry has its write flag set and the last index is
    // reached
    if (read_flag(*(char *)table->recv_entry)) {
      if (memcmp(send_key, (char *)table->recv_entry + 1, table->key_size) !=
          0) {
        if (i == (table->index_count) - 1) {
          table->evictions += 1;
#ifdef DHT_STATISTICS
          table->stats->evictions += 1;
#endif
          result = DHT_WRITE_SUCCESS_WITH_EVICTION;
          break;
        }
      } else
        break;
    } else {
#ifdef DHT_STATISTICS
      table->stats->writes_local[dest_rank]++;
#endif
      break;
    }
  }
  // put data to DHT (with last selected index by value i)
  if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
              MPI_BYTE, dest_rank, table->index[i],
              1 + table->data_size + table->key_size, MPI_BYTE,
              table->window) != 0)
    return DHT_MPI_ERROR;
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;
  if (proc) {
    *proc = dest_rank;
  }
  if (index) {
    *index = table->index[i];
  }
  return result;
}
/*
 * Look up `send_key` and copy the stored payload into `destination`
 * (which must hold at least data_size bytes).
 *
 * Probes the candidate slots on the owning rank under a shared lock.
 * Returns DHT_SUCCESS on a hit, DHT_READ_MISS when no slot holds the key,
 * DHT_MPI_ERROR on MPI failure.
 *
 * NOTE(review): DHT_MPI_ERROR returns from MPI_Get/MPI_Win_flush leave the
 * window locked -- confirm callers treat MPI errors as fatal.
 */
int DHT_read(DHT *table, const void *send_key, void *destination) {
  unsigned int dest_rank, i;
#ifdef DHT_STATISTICS
  table->stats->r_access++;
#endif
  // determine destination rank and candidate indices from the key's hash
  determine_dest(table->hash_func(table->key_size, send_key), table->comm_size,
                 table->table_size, &dest_rank, table->index,
                 table->index_count);
  // locking window of target rank with shared lock
  if (MPI_Win_lock(MPI_LOCK_SHARED, dest_rank, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // receive data
  for (i = 0; i < table->index_count; i++) {
    if (MPI_Get(table->recv_entry, 1 + table->data_size + table->key_size,
                MPI_BYTE, dest_rank, table->index[i],
                1 + table->data_size + table->key_size, MPI_BYTE,
                table->window) != 0)
      return DHT_MPI_ERROR;
    if (MPI_Win_flush(dest_rank, table->window) != 0)
      return DHT_MPI_ERROR;
    // an unset write flag means the slot is empty: definite miss ...
    if ((read_flag(*(char *)table->recv_entry)) == 0) {
      table->read_misses += 1;
#ifdef DHT_STATISTICS
      table->stats->read_misses += 1;
#endif
      // unlock window and return
      if (MPI_Win_unlock(dest_rank, table->window) != 0)
        return DHT_MPI_ERROR;
      return DHT_READ_MISS;
    }
    // ... or the key doesn't match and the last candidate slot is reached
    if (memcmp(((char *)table->recv_entry) + 1, send_key, table->key_size) !=
        0) {
      if (i == (table->index_count) - 1) {
        table->read_misses += 1;
#ifdef DHT_STATISTICS
        table->stats->read_misses += 1;
#endif
        // unlock window and return
        if (MPI_Win_unlock(dest_rank, table->window) != 0)
          return DHT_MPI_ERROR;
        return DHT_READ_MISS;
      }
    } else
      break;
  }
  // unlock window of target rank
  if (MPI_Win_unlock(dest_rank, table->window) != 0)
    return DHT_MPI_ERROR;
  // matching key was found: copy the payload into the caller's buffer
  memcpy((char *)destination, (char *)table->recv_entry + table->key_size + 1,
         table->data_size);
  return DHT_SUCCESS;
}
/*
 * Fetch the bucket at (proc, index) directly, bypassing hashing, and copy
 * its payload into `destination`.  Neither the occupancy flag nor the key
 * is verified -- callers must pass a location previously returned by
 * DHT_write()/DHT_write_accumulate().
 *
 * Returns DHT_SUCCESS or DHT_MPI_ERROR.
 *
 * NOTE(review): an MPI_Get failure returns with the window still locked --
 * confirm callers treat MPI errors as fatal.
 */
int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
                      void *destination) {
  const uint32_t bucket_size = table->data_size + table->key_size + 1;
#ifdef DHT_STATISTICS
  table->stats->r_access++;
#endif
  // locking window of target rank with shared lock
  if (MPI_Win_lock(MPI_LOCK_SHARED, proc, 0, table->window) != 0)
    return DHT_MPI_ERROR;
  // receive the whole bucket (flag + key + data)
  if (MPI_Get(table->recv_entry, bucket_size, MPI_BYTE, proc, index,
              bucket_size, MPI_BYTE, table->window) != 0) {
    return DHT_MPI_ERROR;
  }
  // unlock window of target rank
  if (MPI_Win_unlock(proc, table->window) != 0)
    return DHT_MPI_ERROR;
  // copy the payload (skipping flag byte and key) to the caller's buffer
  memcpy((char *)destination, (char *)table->recv_entry + 1 + table->key_size,
         table->data_size);
  return DHT_SUCCESS;
}
/**
 * Dump the current table contents to a file as a raw memory dump.
 *
 * Rank 0 first writes an 8-byte header (key_size then data_size, as two
 * ints); afterwards every process appends each of its occupied buckets
 * (key + data, without the leading flag byte) through the shared file
 * pointer. Record order in the file is therefore unspecified.
 *
 * @param table    Pointer to the DHT object.
 * @param filename File to create/overwrite; collective over the
 *                 table's communicator.
 * @return DHT_SUCCESS, DHT_FILE_IO_ERROR if the file cannot be opened or
 *         closed, DHT_FILE_WRITE_ERROR on a failed write.
 *
 * NOTE(review): on a write error this returns without closing the file;
 * since MPI_Barrier and MPI_File_close are collective, the remaining
 * ranks may then block — confirm the intended error strategy.
 */
int DHT_to_file(DHT *table, const char *filename) {
  // open file
  MPI_File file;
  if (MPI_File_open(table->communicator, filename,
                    MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL,
                    &file) != 0)
    return DHT_FILE_IO_ERROR;
  int rank;
  MPI_Comm_rank(table->communicator, &rank);
  // write header (key_size and data_size)
  if (rank == 0) {
    if (MPI_File_write_shared(file, &table->key_size, 1, MPI_INT,
                              MPI_STATUS_IGNORE) != 0)
      return DHT_FILE_WRITE_ERROR;
    if (MPI_File_write_shared(file, &table->data_size, 1, MPI_INT,
                              MPI_STATUS_IGNORE) != 0)
      return DHT_FILE_WRITE_ERROR;
  }
  // ensure the header is written before any bucket records follow
  MPI_Barrier(table->communicator);
  char *ptr;
  int bucket_size = table->key_size + table->data_size + 1;
  // iterate over local memory
  for (unsigned int i = 0; i < table->table_size; i++) {
    ptr = (char *)table->mem_alloc + (i * bucket_size);
    // if bucket has been written to (checked by written_flag)...
    if (read_flag(*ptr)) {
      // write key and data to file, skipping the flag byte
      if (MPI_File_write_shared(file, ptr + 1, bucket_size - 1, MPI_BYTE,
                                MPI_STATUS_IGNORE) != 0)
        return DHT_FILE_WRITE_ERROR;
    }
  }
  MPI_Barrier(table->communicator);
  // close file
  if (MPI_File_close(&file) != 0)
    return DHT_FILE_IO_ERROR;
  return DHT_SUCCESS;
}
/**
 * Restore table contents from a file previously written by DHT_to_file.
 *
 * Every process opens the file collectively, validates the header (the
 * stored key/data sizes must match this table), then reads every
 * comm_size-th record starting at its own rank and re-inserts each
 * key/data pair via DHT_write.
 *
 * @param table    Pointer to the DHT object.
 * @param filename File to read from; collective over the table's
 *                 communicator.
 * @return DHT_SUCCESS, DHT_WRONG_FILE on a header mismatch,
 *         DHT_FILE_IO_ERROR / DHT_FILE_READ_ERROR on file errors,
 *         DHT_MPI_ERROR if re-insertion fails.
 *
 * Fixes: the read buffer is now freed on every error path (it leaked
 * before), malloc is checked, file offsets use MPI_Offset so files larger
 * than 2 GiB don't truncate, and a dead MPI_File_get_position probe was
 * removed.
 */
int DHT_from_file(DHT *table, const char *filename) {
  MPI_File file;
  MPI_Offset f_size, cur_pos, offset;
  int bucket_size, buffer_size, rank;
  char *buffer;
  void *key;
  void *data;
  // open file
  if (MPI_File_open(table->communicator, filename, MPI_MODE_RDONLY,
                    MPI_INFO_NULL, &file) != 0)
    return DHT_FILE_IO_ERROR;
  // get file size
  if (MPI_File_get_size(file, &f_size) != 0)
    return DHT_FILE_IO_ERROR;
  MPI_Comm_rank(table->communicator, &rank);
  // a file record holds key and data only; the occupied-flag byte is not
  // stored on disk
  bucket_size = table->key_size + table->data_size;
  // buffer must be large enough for both a record and the file header
  buffer_size =
      bucket_size > DHT_FILEHEADER_SIZE ? bucket_size : DHT_FILEHEADER_SIZE;
  buffer = (char *)malloc(buffer_size);
  if (buffer == NULL)
    return DHT_FILE_IO_ERROR;
  // read file header
  if (MPI_File_read(file, buffer, DHT_FILEHEADER_SIZE, MPI_BYTE,
                    MPI_STATUS_IGNORE) != 0) {
    free(buffer);
    return DHT_FILE_READ_ERROR;
  }
  // compare if written header data and key size matches current sizes
  if (*(int *)buffer != table->key_size) {
    free(buffer);
    return DHT_WRONG_FILE;
  }
  if (*(int *)(buffer + 4) != table->data_size) {
    free(buffer);
    return DHT_WRONG_FILE;
  }
  // stride between two records owned by this rank
  offset = (MPI_Offset)bucket_size * table->comm_size;
  // seek behind header of DHT file
  if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0) {
    free(buffer);
    return DHT_FILE_IO_ERROR;
  }
  // this rank's first record sits right behind the header
  cur_pos = DHT_FILEHEADER_SIZE + ((MPI_Offset)rank * bucket_size);
  // loop over file and write data to DHT with DHT_write
  while (cur_pos < f_size) {
    if (MPI_File_seek(file, cur_pos, MPI_SEEK_SET) != 0) {
      free(buffer);
      return DHT_FILE_IO_ERROR;
    }
    if (MPI_File_read(file, buffer, bucket_size, MPI_BYTE, MPI_STATUS_IGNORE) !=
        0) {
      free(buffer);
      return DHT_FILE_READ_ERROR;
    }
    // record layout: [key][data]
    key = buffer;
    data = (buffer + table->key_size);
    if (DHT_write(table, key, data, NULL, NULL) == DHT_MPI_ERROR) {
      free(buffer);
      return DHT_MPI_ERROR;
    }
    // advance to this rank's next record
    cur_pos += offset;
  }
  free(buffer);
  if (MPI_File_close(&file) != 0)
    return DHT_FILE_IO_ERROR;
  return DHT_SUCCESS;
}
/**
 * Release every resource owned by the table.
 *
 * Optionally reduces the lifetime eviction and read-miss counters across
 * the communicator: the sum is delivered on rank 0, other ranks receive 0
 * (MPI_Reduce leaves the receive buffer untouched on non-root ranks).
 *
 * @param table             Pointer to the DHT object; invalid afterwards.
 * @param eviction_counter  Optional: receives the summed eviction count.
 * @param readerror_counter Optional: receives the summed read-miss count.
 * @return DHT_SUCCESS or DHT_MPI_ERROR.
 */
int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
  int reduced;
  // Hand the summed eviction count back to the caller if requested.
  if (eviction_counter != NULL) {
    reduced = 0;
    if (MPI_Reduce(&table->evictions, &reduced, 1, MPI_INT, MPI_SUM, 0,
                   table->communicator) != 0)
      return DHT_MPI_ERROR;
    *eviction_counter = reduced;
  }
  // Same for the read-miss count.
  if (readerror_counter != NULL) {
    reduced = 0;
    if (MPI_Reduce(&table->read_misses, &reduced, 1, MPI_INT, MPI_SUM, 0,
                   table->communicator) != 0)
      return DHT_MPI_ERROR;
    *readerror_counter = reduced;
  }
  // Tear down the MPI objects first, then the heap allocations.
  if (MPI_Win_free(&(table->window)) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Free_mem(table->mem_alloc) != 0)
    return DHT_MPI_ERROR;
  free(table->recv_entry);
  free(table->send_entry);
  free(table->index);
#ifdef DHT_STATISTICS
  free(table->stats->writes_local);
  free(table->stats);
#endif
  free(table);
  return DHT_SUCCESS;
}
/**
 * Gather usage statistics from all ranks and print a summary table on
 * rank 0. The per-call counters (w_access, r_access, read misses,
 * evictions) are reset to zero by this call on every rank.
 *
 * Only functional when compiled with -DDHT_STATISTICS; otherwise this is
 * a collective no-op returning DHT_SUCCESS.
 *
 * Fixes: rank 0 now frees the gathered per-rank arrays (they leaked on
 * every call), and the function no longer falls off the end of a non-void
 * function when DHT_STATISTICS is undefined (undefined behavior).
 *
 * @param table Pointer to the DHT object.
 * @return DHT_SUCCESS or DHT_MPI_ERROR.
 */
int DHT_print_statistics(DHT *table) {
#ifdef DHT_STATISTICS
  int *written_buckets;
  int *read_misses, sum_read_misses;
  int *evictions, sum_evictions;
  int sum_w_access, sum_r_access, *w_access, *r_access;
  int rank;
  MPI_Comm_rank(table->communicator, &rank);
// disable possible warning of uninitialized variable, which is not the case
// since we're using MPI_Gather to obtain all values only on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
  // obtaining all values from all processes in the communicator
  if (rank == 0)
    read_misses = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->read_misses, 1, MPI_INT, read_misses, 1,
                 MPI_INT, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->read_misses, &sum_read_misses, 1, MPI_INT,
                 MPI_SUM, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->read_misses = 0;
  if (rank == 0)
    evictions = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM,
                 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->evictions = 0;
  if (rank == 0)
    w_access = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->w_access = 0;
  if (rank == 0)
    r_access = (int *)malloc(table->comm_size * sizeof(int));
  if (MPI_Gather(&table->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (MPI_Reduce(&table->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0,
                 table->communicator) != 0)
    return DHT_MPI_ERROR;
  table->stats->r_access = 0;
  if (rank == 0)
    written_buckets = (int *)calloc(table->comm_size, sizeof(int));
  if (MPI_Reduce(table->stats->writes_local, written_buckets, table->comm_size,
                 MPI_INT, MPI_SUM, 0, table->communicator) != 0)
    return DHT_MPI_ERROR;
  if (rank == 0) { // only process with rank 0 will print out results as a
                   // table
    int sum_written_buckets = 0;
    for (int i = 0; i < table->comm_size; i++) {
      sum_written_buckets += written_buckets[i];
    }
    int members = 7;
    int padsize = (members * 12) + 1;
    char pad[padsize + 1];
    memset(pad, '-', padsize * sizeof(char));
    pad[padsize] = '\0';
    printf("\n");
    printf("%-35s||resets with every call of this function\n", " ");
    printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied",
           "free", "w_access", "r_access", "read misses", "evictions");
    printf("%s\n", pad);
    for (int i = 0; i < table->comm_size; i++) {
      printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", i,
             written_buckets[i], table->table_size - written_buckets[i],
             w_access[i], r_access[i], read_misses[i], evictions[i]);
    }
    printf("%s\n", pad);
    printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d\n", "sum",
           sum_written_buckets,
           (table->table_size * table->comm_size) - sum_written_buckets,
           sum_w_access, sum_r_access, sum_read_misses, sum_evictions);
    printf("%s\n", pad);
    printf("%s %d\n",
           "new entries:", sum_written_buckets - table->stats->old_writes);
    printf("\n");
    fflush(stdout);
    table->stats->old_writes = sum_written_buckets;
    // release the gathered per-rank arrays (only rank 0 allocated them)
    free(written_buckets);
    free(read_misses);
    free(evictions);
    free(w_access);
    free(r_access);
  }
// enable warning again
#pragma GCC diagnostic pop
  MPI_Barrier(table->communicator);
#else
  // statistics support compiled out: nothing to do
  (void)table;
#endif
  return DHT_SUCCESS;
}

View File

@ -1,327 +0,0 @@
/*
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
/**
 * @file DHT.h
 * @author Max Lübke (mluebke@uni-potsdam.de)
 * @brief API to interact with the DHT
 * @version 0.1
 * @date 16 Nov 2017
 *
 * This file implements the creation of a DHT by using MPI
 * one-sided communication. There is also the possibility to write or read data
 * from or to the DHT. In addition, the current state of the DHT can be written
 * to a file and read in again later.
 */
#ifndef DHT_H
#define DHT_H
#include <mpi.h>
#include <stdint.h>
/** Returned if some error in an MPI routine occurs. */
#define DHT_MPI_ERROR -1
/** Returned by a call of DHT_read if no bucket with given key was found. */
#define DHT_READ_MISS -2
/** Returned by DHT_write if a bucket was evicted. */
#define DHT_WRITE_SUCCESS_WITH_EVICTION -3
/** Returned when no errors occurred. */
#define DHT_SUCCESS 0
/** Returned by DHT_from_file if the given file does not match expected file. */
#define DHT_WRONG_FILE -11
/** Returned by DHT file operations if MPI file operation fails. */
#define DHT_FILE_IO_ERROR -12
/** Returned by DHT file operations if error occurred in MPI_Read operation. */
#define DHT_FILE_READ_ERROR -13
/** Returned by DHT file operations if error occurred in MPI_Write operation. */
#define DHT_FILE_WRITE_ERROR -14
/** Size of the file header in byte. */
#define DHT_FILEHEADER_SIZE 8
/**
 * Internal struct to store statistics about read and write accesses and also
 * read misses and evictions.
 * <b>All values will be reset to zero after a call of
 * DHT_print_statistics().</b>
 * Internal use only!
 *
 * @todo There's maybe a better solution than DHT_print_statistics and this
 * struct
 */
typedef struct {
  /** Count of writes to specific process this process did. */
  int *writes_local;
  /** Writes after last call of DHT_print_statistics. */
  int old_writes;
  /** How many read misses occurred? */
  int read_misses;
  /** How many buckets were evicted? */
  int evictions;
  /** How many calls of DHT_write() did this process? */
  int w_access;
  /** How many calls of DHT_read() did this process? */
  int r_access;
} DHT_stats;
/**
 * Struct which serves as a handler or so called \a DHT-object. Will
 * be created by DHT_create and must be passed as a parameter to every following
 * function. Stores all relevant data.
 * Do not touch outside DHT functions!
 */
typedef struct {
  /** Created MPI window, which serves as the DHT memory area of the process. */
  MPI_Win window;
  /** Size of the data of a bucket entry in byte. */
  int data_size;
  /** Size of the key of a bucket entry in byte. */
  int key_size;
  /** Count of buckets for each process. */
  unsigned int table_size;
  /** MPI communicator of all participating processes. */
  MPI_Comm communicator;
  /** Size of the MPI communicator respectively all participating processes. */
  int comm_size;
  /** Pointer to a hash function. */
  uint64_t (*hash_func)(int, const void *);
  /** Pre-allocated memory where a bucket can be received. */
  void *recv_entry;
  /** Pre-allocated memory where a bucket to send can be stored. */
  void *send_entry;
  /** Allocated memory on which the MPI window was created. */
  void *mem_alloc;
  /** Count of read misses over all time. */
  int read_misses;
  /** Count of evictions over all time. */
  int evictions;
  /** Array of indices where a bucket can be stored. */
  uint64_t *index;
  /** Count of possible indices. */
  unsigned int index_count;
  /** Optional callback used by DHT_write_accumulate to merge new data into
   * an existing bucket. */
  int (*accumulate_callback)(int, void *, int, void *);
#ifdef DHT_STATISTICS
  /** Detailed statistics of the usage of the DHT. */
  DHT_stats *stats;
#endif
} DHT;
/**
 * @brief Register the callback used by DHT_write_accumulate to merge data
 * into an existing bucket.
 */
extern void DHT_set_accumulate_callback(DHT *table,
                                        int (*callback_func)(int, void *, int,
                                                             void *));
/**
 * @brief Like DHT_write, but merges data into an existing bucket via the
 * registered accumulate callback; the callback's return value is delivered
 * through callback_ret.
 */
extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
                                void *data, uint32_t *proc, uint32_t *index, int *callback_ret);
/**
 * @brief Create a DHT.
 *
 * When calling this function, the required memory is allocated and a
 * MPI_Window is created. This allows the execution of MPI_Get and
 * MPI_Put operations for one-sided communication. Then the number of
 * indexes is calculated and finally all relevant data is entered into the
 * \a DHT-object which is returned.
 *
 * @param comm MPI communicator which addresses all participating process of the
 * DHT.
 * @param size_per_process Number of buckets per process.
 * @param data_size Size of data in byte.
 * @param key_size Size of the key in byte.
 * @param hash_func Pointer to a hash function. This function must take the size
 * of the key and a pointer to the key as input parameters and return a 64 bit
 * hash.
 * @return DHT* The returned value is the \a DHT-object which serves as a handle
 * for all DHT operations. If an error occurred NULL is returned.
 */
extern DHT *DHT_create(MPI_Comm comm, uint64_t size_per_process,
                       unsigned int data_size, unsigned int key_size,
                       uint64_t (*hash_func)(int, const void *));
/**
 * @brief Write data into DHT.
 *
 * When DHT_write is called, the address window is locked with a
 * LOCK_EXCLUSIVE for write access. Now the first bucket is received
 * using MPI_Get and it is checked if the bucket is empty or if the received key
 * matches the passed key. If this is the case, the data of the bucket is
 * overwritten with the new value. If not, the function continues with the next
 * index until no more indexes are available. When the last index is reached and
 * there are no more indexes available, the last examined bucket is replaced.
 * After successful writing, the memory window is released and the function
 * returns.
 *
 * @param table Pointer to the \a DHT-object.
 * @param key Pointer to the key.
 * @param data Pointer to the data.
 * @param proc If not NULL, returns the process number written to.
 * @param index If not NULL, returns the index of the bucket where the data was
 * written to.
 * @return int Returns either DHT_SUCCESS on success or corresponding error
 * value on eviction or error.
 */
extern int DHT_write(DHT *table, void *key, void *data, uint32_t *proc,
                     uint32_t *index);
/**
 * @brief Read data from DHT.
 *
 * At the beginning, the target process and all possible indices are determined.
 * After that a SHARED lock on the address window for read access is done
 * and the first entry is retrieved. Now the received key is compared
 * with the key passed to the function. If they coincide the correct data
 * was found. If not it continues with the next index. If the last
 * possible bucket is reached and the keys still do not match the read
 * error counter is incremented. After the window has been released
 * again, the function returns with a corresponding return value (read
 * error or error-free read). The data to be read out is also written to
 * the memory area of the passed pointer.
 *
 * @param table Pointer to the \a DHT-object.
 * @param key Pointer to the key.
 * @param destination Pointer to memory area where retrieved data should be
 * stored.
 * @return int Returns either DHT_SUCCESS on success or corresponding error
 * value on read miss or error.
 */
extern int DHT_read(DHT *table, const void *key, void *destination);
/**
 * @brief Read the bucket at a known (process, index) location directly,
 * without hashing or key comparison.
 */
extern int DHT_read_location(DHT *table, uint32_t proc, uint32_t index,
                             void *destination);
/**
 * @brief Write current state of DHT to file.
 *
 * All contents are written as a memory dump, so that no conversion takes place.
 * First, an attempt is made to open or create a file. If this is successful the
 * file header consisting of data and key size is written. Then each process
 * reads its memory area of the DHT and each bucket that was marked as written
 * is added to the file using MPI file operations.
 *
 * @param table Pointer to the \a DHT-object.
 * @param filename Name of the file to write to.
 * @return int Returns DHT_SUCCESS on success, DHT_FILE_IO_ERROR if file can't
 * be opened/closed or DHT_WRITE_ERROR if file is not writable.
 */
extern int DHT_to_file(DHT *table, const char *filename);
/**
 * @brief Read state of DHT from file.
 *
 * One needs a previously written DHT file (by DHT_to_file).
 * First of all, an attempt is made to open the specified file. If this is
 * succeeded the file header is read and compared with the current values of the
 * DHT. If the data and key sizes do not differ, one can continue. Each process
 * reads one line of the file and writes it to the DHT with DHT_write. This
 * happens until no more lines are left. The writing is done by the
 * implementation of DHT_write.
 *
 * @param table Pointer to the \a DHT-object.
 * @param filename Name of the file to read from.
 * @return int Returns DHT_SUCCESS on success, DHT_FILE_IO_ERROR if file can't
 * be opened/closed, DHT_READ_MISS if file is not readable or DHT_WRONG_FILE if
 * file doesn't match expectation. This is possible if the data size or key size
 * is different.
 */
extern int DHT_from_file(DHT *table, const char *filename);
/**
 * @brief Free resources of DHT.
 *
 * Finally, to free all resources after using the DHT, the function
 * DHT_free must be used. This will free the MPI_Window, as well as the
 * associated memory. Also all internal variables are released. Optionally, the
 * count of evictions and read misses can also be obtained.
 *
 * @param table Pointer to the \a DHT-object.
 * @param eviction_counter \a optional: Pointer to integer where the count of
 * evictions should be stored.
 * @param readerror_counter \a optional: Pointer to integer where the count of
 * read errors should be stored.
 * @return int Returns either DHT_SUCCESS on success or DHT_MPI_ERROR on
 * internal MPI error.
 */
extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
/**
 * @brief Prints a table with statistics about current use of DHT.
 *
 * These statistics are from each participating process and also summed up over
 * all processes. Detailed statistics are:
 * -# occupied buckets (in respect to the memory of this process)
 * -# free buckets (in respect to the memory of this process)
 * -# calls of DHT_write (w_access)
 * -# calls of DHT_read (r_access)
 * -# read misses (see DHT_READ_MISS)
 * -# collisions (see DHT_WRITE_SUCCESS_WITH_EVICTION)
 * Items 3-6 reset with every call of this function. Finally the amount of
 * newly written entries is printed out (since the last call of this function).
 *
 * This is done by collective MPI operations with the root process with rank 0,
 * which will also print a table with all information to stdout.
 *
 * Also, as this function was implemented for a special case (POET project) one
 * need to define DHT_STATISTICS to the compiler macros to use this
 * function (eg. <emph>gcc -DDHT_STATISTICS ... </emph>).
 * @param table Pointer to the \a DHT-object.
 * @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
 * error.
 */
extern int DHT_print_statistics(DHT *table);
/**
 * @brief Determine destination rank and index.
 *
 * This is done by looping over all possible indices. First of all, set a
 * temporary index to zero and copy count of bytes for each index into the
 * memory area of the temporary index. After that the current index is
 * calculated by the temporary index modulo the table size. The destination rank
 * of the process is simply determined by hash modulo the communicator size.
 *
 * @param hash Calculated 64 bit hash.
 * @param comm_size Communicator size.
 * @param table_size Count of buckets per process.
 * @param dest_rank Reference to the destination rank variable.
 * @param index Pointer to the array index.
 * @param index_count Count of possible indices.
 */
static void determine_dest(uint64_t hash, int comm_size,
                           unsigned int table_size, unsigned int *dest_rank,
                           uint64_t *index, unsigned int index_count);
/**
 * @brief Set the occupied flag.
 *
 * This will set the first bit of a bucket to 1.
 *
 * @param flag_byte First byte of a bucket.
 */
static void set_flag(char *flag_byte);
/**
 * @brief Get the occupied flag.
 *
 * This function determines whether the occupied flag of a bucket was set or
 * not.
 *
 * @param flag_byte First byte of a bucket.
 * @return int Returns 1 for true or 0 for false.
 */
static int read_flag(char flag_byte);
#endif /* DHT_H */

View File

@ -21,6 +21,8 @@
*/ */
#include "DHT_Wrapper.hpp" #include "DHT_Wrapper.hpp"
#include "HashFunctions.hpp"
#include "LUCX/DHT.h"
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
@ -28,7 +30,7 @@
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>
#include <iostream> #include <mpi.h>
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
@ -49,13 +51,36 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
// initialize DHT object // initialize DHT object
// key size = count of key elements + timestep // key size = count of key elements + timestep
uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement); uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
uint32_t data_size = // uint32_t data_size =
(data_count + (with_interp ? input_key_elements.size() : 0)) * // (data_count + (with_interp ? input_key_elements.size() : 0)) *
sizeof(double); // sizeof(double);
uint32_t data_size = data_count * sizeof(double);
uint32_t buckets_per_process = uint32_t buckets_per_process =
static_cast<std::uint32_t>(dht_size / (data_size + key_size)); static_cast<std::uint32_t>(dht_size / (data_size + key_size));
dht_object = DHT_create(dht_comm, buckets_per_process, data_size, key_size,
&poet::Murmur2_64A); #ifdef POET_DHT_UCX
const ucx_ep_args_mpi_t ucx_bcast_mpi_args = {.comm = dht_comm};
const DHT_init_t dht_init = {
.key_size = static_cast<int>(key_size),
.data_size = static_cast<int>(data_size),
.bucket_count = static_cast<unsigned int>(buckets_per_process),
.hash_func = &poet::md5_sum,
.bcast_func = UCX_INIT_BSTRAP_MPI,
.bcast_func_args = &ucx_bcast_mpi_args};
dht_object = DHT_create(&dht_init);
#else
const DHT_init_t dht_init = {
.key_size = static_cast<int>(key_size),
.data_size = static_cast<int>(data_size),
.bucket_count = static_cast<unsigned int>(buckets_per_process),
.hash_func = &poet::md5_sum,
.comm = dht_comm};
dht_object = DHT_create(&dht_init);
#endif
if (dht_object == nullptr) {
throw std::runtime_error("DHT_create failed");
}
dht_signif_vector = key_species.getValues(); dht_signif_vector = key_species.getValues();
@ -81,7 +106,11 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
DHT_Wrapper::~DHT_Wrapper() { DHT_Wrapper::~DHT_Wrapper() {
// free DHT // free DHT
DHT_free(dht_object, NULL, NULL); #ifdef POET_DHT_UCX
DHT_free(dht_object, NULL);
#else
DHT_free(dht_object, NULL);
#endif
} }
auto DHT_Wrapper::checkDHT(WorkPackage &work_package) auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
-> const DHT_ResultObject & { -> const DHT_ResultObject & {
@ -111,6 +140,9 @@ auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
break; break;
case DHT_READ_MISS: case DHT_READ_MISS:
break; break;
case DHT_READ_CORRUPT:
this->corrupt_buckets++;
break;
} }
} }
@ -232,14 +264,14 @@ DHT_Wrapper::ratesToOutput(const std::vector<double> &dht_data,
// } // }
int DHT_Wrapper::tableToFile(const char *filename) { int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename); // int res = DHT_to_file(dht_object, filename);
return res; // return res;
} }
int DHT_Wrapper::fileToTable(const char *filename) { int DHT_Wrapper::fileToTable(const char *filename) {
int res = DHT_from_file(dht_object, filename); // int res = DHT_from_file(dht_object, filename);
if (res != DHT_SUCCESS) // if (res != DHT_SUCCESS)
return res; // return res;
#ifdef DHT_STATISTICS #ifdef DHT_STATISTICS
DHT_print_statistics(dht_object); DHT_print_statistics(dht_object);

View File

@ -39,9 +39,11 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
extern "C" { #include <LUCX/DHT.h>
#include "DHT.h"
} #ifdef POET_DHT_UCX
#include <LUCX/Bootstrap.h>
#endif
#include <mpi.h> #include <mpi.h>
@ -187,9 +189,12 @@ public:
*/ */
auto getEvictions() { return this->dht_evictions; }; auto getEvictions() { return this->dht_evictions; };
auto getCorruptBuckets() { return this->corrupt_buckets; }
void resetCounter() { void resetCounter() {
this->dht_hits = 0; this->dht_hits = 0;
this->dht_evictions = 0; this->dht_evictions = 0;
this->corrupt_buckets = 0;
} }
void SetSignifVector(std::vector<uint32_t> signif_vec); void SetSignifVector(std::vector<uint32_t> signif_vec);
@ -251,6 +256,7 @@ private:
uint32_t dht_hits = 0; uint32_t dht_hits = 0;
uint32_t dht_evictions = 0; uint32_t dht_evictions = 0;
uint32_t corrupt_buckets = 0;
NamedVector<std::uint32_t> key_species; NamedVector<std::uint32_t> key_species;

View File

@ -25,6 +25,9 @@
*/ */
#include "HashFunctions.hpp" #include "HashFunctions.hpp"
#include <cstdint>
#include <openssl/md5.h>
#if defined(_MSC_VER) #if defined(_MSC_VER)
@ -93,3 +96,21 @@ uint64_t poet::Murmur2_64A(int len, const void *key) {
return h; return h;
} }
uint64_t poet::md5_sum(int len, const void *key) {
MD5_CTX ctx;
unsigned char sum[MD5_DIGEST_LENGTH];
uint64_t retval;
uint64_t *v1;
uint64_t *v2;
MD5_Init(&ctx);
MD5_Update(&ctx, key, len);
MD5_Final(sum, &ctx);
v1 = (uint64_t *)&sum[0];
v2 = (uint64_t *)&sum[8];
retval = *v1 ^ *v2;
return retval;
}

View File

@ -31,6 +31,7 @@ namespace poet {
constexpr uint32_t HASH_SEED = 80 + 79 + 69 + 84; constexpr uint32_t HASH_SEED = 80 + 79 + 69 + 84;
uint64_t Murmur2_64A(int len, const void *key); uint64_t Murmur2_64A(int len, const void *key);
uint64_t md5_sum(int len, const void *key);
} // namespace poet } // namespace poet

View File

@ -9,20 +9,16 @@
#include "LookupKey.hpp" #include "LookupKey.hpp"
#include "Rounding.hpp" #include "Rounding.hpp"
#include <cassert>
#include <iostream>
#include <list> #include <list>
#include <memory> #include <memory>
#include <mpi.h> #include <mpi.h>
#include <string> #include <string>
#include <utility> #include <utility>
extern "C" { #include <DHT_ucx/DHT.h>
#include "DHT.h" #include <DHT_ucx/UCX_bcast_functions.h>
}
#include <cstdint> #include <cstdint>
#include <functional>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>

View File

@ -3,30 +3,20 @@
#include "../../DataStructures/DataStructures.hpp" #include "../../DataStructures/DataStructures.hpp"
#include "DHT_Wrapper.hpp" #include "DHT_Wrapper.hpp"
#include "HashFunctions.hpp"
#include "LookupKey.hpp" #include "LookupKey.hpp"
#include "Rounding.hpp"
#include <Rcpp.h> #include <Rcpp.h>
#include <Rinternals.h> #include <Rinternals.h>
#include <algorithm> #include <algorithm>
#include <array>
#include <cassert>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <functional>
#include <iterator> #include <iterator>
#include <memory> #include <memory>
#include <mpi.h> #include <mpi.h>
#include <string> #include <string>
#include <utility>
#include <vector> #include <vector>
extern "C" {
#include "DHT.h"
}
namespace poet { namespace poet {
InterpolationModule::InterpolationModule( InterpolationModule::InterpolationModule(

View File

@ -6,17 +6,12 @@
#include "LookupKey.hpp" #include "LookupKey.hpp"
#include "Rounding.hpp" #include "Rounding.hpp"
#include <cassert>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <iostream>
#include <memory>
#include <unordered_set>
#include <vector> #include <vector>
extern "C" { #include <DHT_ucx/DHT.h>
#include "DHT.h" #include <DHT_ucx/UCX_bcast_functions.h>
}
namespace poet { namespace poet {
@ -38,16 +33,23 @@ ProximityHashTable::ProximityHashTable(uint32_t key_size, uint32_t data_size,
uint32_t buckets_per_process = uint32_t buckets_per_process =
static_cast<std::uint32_t>(size_per_process / (data_size + key_size)); static_cast<std::uint32_t>(size_per_process / (data_size + key_size));
this->prox_ht = DHT_create(communicator, buckets_per_process, data_size, ucx_ep_args_mpi_t mpi_bcast_args = {.comm = communicator};
key_size, &poet::Murmur2_64A); DHT_init_t dht_init_args = {.key_size = static_cast<int>(key_size),
.data_size = static_cast<int>(data_size),
.bucket_count = buckets_per_process,
.hash_func = &poet::Murmur2_64A,
.bcast_func = UCX_INIT_BSTRAP_MPI,
.bcast_func_args = &mpi_bcast_args};
DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function); this->prox_ht = DHT_create(&dht_init_args);
// DHT_set_accumulate_callback(this->prox_ht, PHT_callback_function);
} }
ProximityHashTable::~ProximityHashTable() { ProximityHashTable::~ProximityHashTable() {
delete[] bucket_store; delete[] bucket_store;
if (prox_ht) { if (prox_ht) {
DHT_free(prox_ht, NULL, NULL); DHT_free(prox_ht, NULL, NULL, NULL);
} }
} }
@ -93,12 +95,12 @@ void ProximityHashTable::writeLocationToPHT(LookupKey key,
int ret_val; int ret_val;
int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location), // int status = DHT_write_accumulate(prox_ht, key.data(), sizeof(location),
&location, NULL, NULL, &ret_val); // &location, NULL, NULL, &ret_val);
if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) { // if (status == DHT_WRITE_SUCCESS_WITH_EVICTION) {
this->dht_evictions++; // this->dht_evictions++;
} // }
// if (ret_val == INTERP_CB_FULL) { // if (ret_val == INTERP_CB_FULL) {
// localCache(key, {}); // localCache(key, {});
@ -148,7 +150,7 @@ const ProximityHashTable::PHT_Result &ProximityHashTable::query(
for (const auto &loc : locations) { for (const auto &loc : locations) {
double start_g = MPI_Wtime(); double start_g = MPI_Wtime();
DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data()); // DHT_read_location(source_dht, loc.first, loc.second, dht_buffer.data());
this->pht_gather_dht_t += MPI_Wtime() - start_g; this->pht_gather_dht_t += MPI_Wtime() - start_g;
auto *buffer = reinterpret_cast<double *>(dht_buffer.data()); auto *buffer = reinterpret_cast<double *>(dht_buffer.data());

View File

@ -2,7 +2,7 @@
#include "ChemistryModule.hpp" #include "ChemistryModule.hpp"
#include "SurrogateModels/DHT_Wrapper.hpp" #include "SurrogateModels/DHT_Wrapper.hpp"
#include "SurrogateModels/Interpolation.hpp" // #include "SurrogateModels/Interpolation.hpp"
#include <IrmResult.h> #include <IrmResult.h>
#include <algorithm> #include <algorithm>
@ -173,9 +173,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
timings.dht_get += dht_get_end - dht_get_start; timings.dht_get += dht_get_end - dht_get_start;
} }
if (interp_enabled) { // if (interp_enabled) {
interp->tryInterpolation(s_curr_wp); // interp->tryInterpolation(s_curr_wp);
} // }
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
@ -201,9 +201,9 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
dht->fillDHT(s_curr_wp); dht->fillDHT(s_curr_wp);
dht_fill_end = MPI_Wtime(); dht_fill_end = MPI_Wtime();
if (interp_enabled) { // if (interp_enabled) {
interp->writePairs(); // interp->writePairs();
} // }
timings.dht_fill += dht_fill_end - dht_fill_start; timings.dht_fill += dht_fill_end - dht_fill_start;
} }
@ -220,24 +220,26 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
if (this->dht_enabled) { if (this->dht_enabled) {
dht_hits.push_back(dht->getHits()); dht_hits.push_back(dht->getHits());
dht_evictions.push_back(dht->getEvictions()); dht_evictions.push_back(dht->getEvictions());
corrupt_buckets.push_back(dht->getCorruptBuckets());
dht->resetCounter(); dht->resetCounter();
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
WorkerWriteDHTDump(iteration); WorkerWriteDHTDump(iteration);
} }
dht->printStatistics();
} }
if (this->interp_enabled) { // if (this->interp_enabled) {
std::stringstream out; // std::stringstream out;
interp_calls.push_back(interp->getInterpolationCount()); // interp_calls.push_back(interp->getInterpolationCount());
interp->resetCounter(); // interp->resetCounter();
interp->writePHTStats(); // interp->writePHTStats();
if (this->dht_snaps_type == DHT_SNAPS_ITEREND) { // if (this->dht_snaps_type == DHT_SNAPS_ITEREND) {
out << this->dht_file_out_dir << "/iter_" << std::setfill('0') // out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
<< std::setw(this->file_pad) << iteration << ".pht"; // << std::setw(this->file_pad) << iteration << ".pht";
interp->dumpPHTState(out.str()); // interp->dumpPHTState(out.str());
} // }
} // }
RInsidePOET::getInstance().parseEvalQ("gc()"); RInsidePOET::getInstance().parseEvalQ("gc()");
} }
@ -246,12 +248,12 @@ void poet::ChemistryModule::WorkerPostSim(uint32_t iteration) {
if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { if (this->dht_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
WorkerWriteDHTDump(iteration); WorkerWriteDHTDump(iteration);
} }
if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) { // if (this->interp_enabled && this->dht_snaps_type >= DHT_SNAPS_ITEREND) {
std::stringstream out; // std::stringstream out;
out << this->dht_file_out_dir << "/iter_" << std::setfill('0') // out << this->dht_file_out_dir << "/iter_" << std::setfill('0')
<< std::setw(this->file_pad) << iteration << ".pht"; // << std::setw(this->file_pad) << iteration << ".pht";
interp->dumpPHTState(out.str()); // interp->dumpPHTState(out.str());
} // }
} }
void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) { void poet::ChemistryModule::WorkerWriteDHTDump(uint32_t iteration) {
@ -348,26 +350,26 @@ void poet::ChemistryModule::WorkerPerfToMaster(int type,
this->group_comm); this->group_comm);
break; break;
} }
case WORKER_IP_WRITE: { // case WORKER_IP_WRITE: {
double val = interp->getPHTWriteTime(); // double val = interp->getPHTWriteTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); // MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
break; // this->group_comm); break;
} // }
case WORKER_IP_READ: { // case WORKER_IP_READ: {
double val = interp->getPHTReadTime(); // double val = interp->getPHTReadTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); // MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
break; // this->group_comm); break;
} // }
case WORKER_IP_GATHER: { // case WORKER_IP_GATHER: {
double val = interp->getDHTGatherTime(); // double val = interp->getDHTGatherTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); // MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
break; // this->group_comm); break;
} // }
case WORKER_IP_FC: { // case WORKER_IP_FC: {
double val = interp->getInterpolationTime(); // double val = interp->getInterpolationTime();
MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0, this->group_comm); // MPI_Gather(&val, 1, MPI_DOUBLE, NULL, 1, MPI_DOUBLE, 0,
break; // this->group_comm); break;
} // }
default: { default: {
throw std::runtime_error("Unknown perf type in master's message."); throw std::runtime_error("Unknown perf type in master's message.");
} }
@ -402,15 +404,19 @@ void poet::ChemistryModule::WorkerMetricsToMaster(int type) {
reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS); reduce_and_send(dht_evictions, WORKER_DHT_EVICTIONS);
break; break;
} }
case WORKER_DHT_CORRUPT: {
reduce_and_send(corrupt_buckets, WORKER_DHT_CORRUPT);
break;
}
case WORKER_IP_CALLS: { case WORKER_IP_CALLS: {
reduce_and_send(interp_calls, WORKER_IP_CALLS); reduce_and_send(interp_calls, WORKER_IP_CALLS);
return; return;
} }
case WORKER_PHT_CACHE_HITS: { // case WORKER_PHT_CACHE_HITS: {
std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits(); // std::vector<std::uint32_t> input = this->interp->getPHTLocalCacheHits();
reduce_and_send(input, WORKER_PHT_CACHE_HITS); // reduce_and_send(input, WORKER_PHT_CACHE_HITS);
return; // return;
} // }
default: { default: {
throw std::runtime_error("Unknown perf type in master's message."); throw std::runtime_error("Unknown perf type in master's message.");
} }

View File

@ -0,0 +1,250 @@
#include "AdvectionModule.hpp"
#include "../Base/Macros.hpp"
#include <cmath>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <mpi.h>
#include <string>
#include <vector>
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#else
#define omp_get_thread_num() 0
#endif
namespace poet {
// Compute the linear indices of the four von-Neumann neighbours of `index`
// on an n x m grid stored row-major with n cells per row.
// Result order: east, south, west, north; a neighbour outside the grid is -1.
inline std::array<std::int32_t, 4> getIndices(std::int32_t index,
                                              std::int32_t n, std::int32_t m) {
  const std::int32_t col = index % n;
  const bool has_east = (col != n - 1);
  const bool has_south = (index + n < m * n);
  const bool has_west = (col != 0);
  const bool has_north = (index >= n);
  return {has_east ? index + 1 : -1, has_south ? index + n : -1,
          has_west ? index - 1 : -1, has_north ? index - n : -1};
}
// Select the concentration an interface flux acts on (upwind scheme).
// An inbound flux carries the neighbour's concentration, or the boundary
// value `bc` when the neighbour index is -1 (grid edge). An outbound flux
// always carries the current cell's own concentration.
inline double getFluxApplyConc(bool inbound, std::uint32_t curr_index,
                               std::int32_t neighbor_index,
                               const std::vector<double> &conc, double bc) {
  if (!inbound) {
    // Outbound flow (or inbound through the open boundary handling below
    // never reaches here): use the cell's own value.
    return conc[curr_index];
  }
  return (neighbor_index == -1) ? bc : conc[neighbor_index];
}
// Accumulate the rate of concentration change of one cell: for each of the
// four neighbour interfaces, multiply the interface flux with the upwind
// concentration (neighbour or boundary value when inbound, own value when
// outbound) and sum the contributions.
inline double
AdvectionModule::calcDeltaConc(std::size_t cell_index, double bc_val,
                               const std::vector<double> &spec_vec,
                               const std::vector<double> &flux) {
  const auto neighbors =
      getIndices(cell_index, this->n_cells[0], this->n_cells[1]);
  double delta{0.0};
  for (std::size_t i = 0; i < neighbors.size(); ++i) {
    const bool inbound = flux[i] > 0;
    delta += flux[i] *
             getFluxApplyConc(inbound, cell_index, neighbors[i], spec_vec,
                              bc_val);
  }
  return delta;
}
/**
 * @brief Split a requested time step into CFL-stable sub-steps.
 *
 * Per cell, the largest stable step is
 * (cell_volume * density * water_saturation * porosity) / cumulative flux;
 * the global limit `cfl` is the minimum over all cells. If `req_dt` exceeds
 * the limit it is split into floor(req_dt / cfl) steps of length cfl plus,
 * if nonzero, one remainder step; otherwise {req_dt} is returned unchanged.
 *
 * Fixes: cells without any flux are skipped (no division by zero), and a
 * zero-length trailing step is no longer emitted when req_dt is an exact
 * multiple of the CFL limit.
 *
 * @param req_dt     requested total time step
 * @param max_fluxes cumulative absolute flux per cell
 * @return vector of sub-step lengths summing to req_dt
 */
std::vector<double>
AdvectionModule::CFLTimeVec(double req_dt,
                            const std::vector<double> &max_fluxes) {
  const auto field_size = this->n_cells[0] * this->n_cells[1];
  double cfl = std::numeric_limits<double>::max();
  for (std::size_t i = 0; i < field_size; i++) {
    // A cell without any flux imposes no CFL restriction.
    if (max_fluxes[i] <= 0.) {
      continue;
    }
    const double dt =
        (cell_volume * density[i] * water_saturation[i] * porosity[i]) /
        max_fluxes[i];
    if (dt < cfl) {
      cfl = dt;
    }
  }
  if (cfl < req_dt) {
    const std::uint32_t niter = std::floor(req_dt / cfl);
    std::vector<double> time_vec(niter, cfl);
    const double remainder = req_dt - (cfl * niter);
    // Only append the remainder when it is an actual (nonzero) time step.
    if (remainder > 0.) {
      time_vec.push_back(remainder);
    }
    return time_vec;
  }
  return {req_dt};
}
/**
 * @brief Advance all species by one advection step of requested length `dt`.
 *
 * Explicit upwind scheme on the n x m grid: the step is split into
 * CFL-stable sub-steps, inactive (fixed-composition) cells are pinned to
 * their prescribed values, and each sub-step updates every active cell from
 * the previous sub-step's concentrations. Wall-clock time is accumulated in
 * transport_t.
 */
void AdvectionModule::simulate(double dt) {
  double start_t = MPI_Wtime();
  const auto &n = this->n_cells[0];
  const auto &m = this->n_cells[1];
  // HACK: constant flux for this moment imported from R runtime
  // RInsidePOET &R = RInsidePOET::getInstance();
  // const auto flux_list =
  //     Rcpp::as<Rcpp::DataFrame>(R.parseEval("mysetup$advection$const_flux"));
  // std::vector<std::vector<double>> flux(flux_list.size());
  // auto parse_end = std::chrono::steady_clock::now();
  // MSG("Parsing took " +
  //     std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
  //                        parse_end - parse_start)
  //                        .count()));
  MSG("Advection time step requested: " + std::to_string(dt));
  // Cumulative absolute flux per cell, used to derive the CFL limit.
  std::vector<double> max_fluxes(flux.size());
  // #pragma omp parallel for schedule(dynamic)
  for (std::size_t i = 0; i < max_fluxes.size(); i++) {
    double cumflux = .0;
    for (std::size_t j = 0; j < flux[i].size(); j++) {
      cumflux += std::abs(flux[i][j]);
    }
    max_fluxes[i] = cumflux;
  }
  const auto time_vec = CFLTimeVec(dt, max_fluxes);
  MSG("CFL yielding " + std::to_string(time_vec.size()) + " inner iterations");
  auto field_vec = t_field.As2DVector();
  // iterate over all inactive cells and set defined values, as chemistry module
  // won't skip those cells until now
  for (const auto &inac_cell : this->inactive_cells) {
    for (std::size_t species_i = 0; species_i < field_vec.size(); species_i++) {
      field_vec[species_i][inac_cell.first] = inac_cell.second[species_i];
    }
  }
  // #pragma omp parallel for schedule(dynamic)
  for (std::size_t species_i = 0; species_i < field_vec.size(); species_i++) {
    auto &species = field_vec[species_i];
    // spec_copy receives the updates; `species` keeps the previous sub-step's
    // values so each sub-step reads a consistent state.
    std::vector<double> spec_copy = species;
    for (const double &curr_dt : time_vec) {
      for (std::size_t cell_i = 0; cell_i < n * m; cell_i++) {
        // if inactive cell -> skip
        if (this->inactive_cells.find(cell_i) != this->inactive_cells.end()) {
          continue;
        }
        double delta_conc = calcDeltaConc(
            cell_i, this->boundary_condition[species_i], species, flux[cell_i]);
        spec_copy[cell_i] +=
            (curr_dt * delta_conc) / (porosity[cell_i] * cell_volume);
        // if (species_i == 0 &&
        //     (spec_copy[cell_i] < 110.124 || spec_copy[cell_i] > 110.683)) {
        //   raise(SIGABRT);
        // }
      }
      // Commit this sub-step before the next one reads `species`.
      species = spec_copy;
    }
  }
  t_field = field_vec;
  double end_t = MPI_Wtime();
  MSG("Advection took " + std::to_string(end_t - start_t) + "s");
  this->transport_t += end_t - start_t;
}
/**
 * @brief Read all simulation parameters from the R runtime.
 *
 * Pulls grid dimensions, per-species initial values, boundary/injection
 * compositions (`vecinj`, `vecinj_inner`) and the constant flux field from
 * `mysetup$grid` / `mysetup$advection`, then builds the initial field.
 *
 * @param R RRuntime object holding `mysetup`
 */
void AdvectionModule::initializeParams(RInsidePOET &R) {
  const std::uint32_t n = this->n_cells[0] =
      R.parseEval("mysetup$grid$n_cells[1]");
  const std::uint32_t m = this->n_cells[1] =
      R.parseEval("mysetup$grid$n_cells[2]");
  // NOTE(review): x and y are assigned but never used below.
  const std::uint32_t x = this->s_cells[0] =
      R.parseEval("mysetup$grid$s_cells[1]");
  const std::uint32_t y = this->s_cells[1] =
      R.parseEval("mysetup$grid$s_cells[2]");
  this->field_size = n * m;
  // Uniform cell volume: total domain size divided by the number of cells.
  this->cell_volume = (s_cells[0] * s_cells[1]) / this->field_size;
  // HACK: For now, we neglect porisity, density and water saturation of the
  // cell
  this->porosity = std::vector<double>(field_size, 1.);
  this->density = std::vector<double>(field_size, 1.);
  this->water_saturation = std::vector<double>(field_size, 1.);
  const auto init_vec =
      Rcpp::as<Rcpp::NumericVector>(R.parseEval("mysetup$advection$init"));
  // Initialize prop names and according values
  const auto prop_names = Rcpp::as<std::vector<std::string>>(init_vec.names());
  // One vector per species, each filled homogeneously with its initial value.
  std::vector<std::vector<double>> init_field;
  init_field.reserve(prop_names.size());
  for (const auto &val : init_vec) {
    init_field.push_back(std::vector<double>(field_size, val));
  }
  // Set inner constant cells. Is it needed?
  const Rcpp::DataFrame vecinj = R.parseEval("mysetup$advection$vecinj");
  const Rcpp::DataFrame inner_vecinj =
      R.parseEval("mysetup$advection$vecinj_inner");
  // Row 1 of vecinj provides the open-boundary composition per species.
  for (std::size_t i = 0; i < prop_names.size(); i++) {
    const Rcpp::NumericVector species_vecinj = vecinj[prop_names[i]];
    this->boundary_condition.push_back(species_vecinj[0]);
  }
  // same logic as for diffusion module applied
  // Each inner_vecinj entry is a tuple (vecinj row, grid row, grid column),
  // 1-based; the addressed cell gets that composition and is marked inactive.
  for (std::size_t i = 0; i < inner_vecinj.size(); i++) {
    const Rcpp::NumericVector tuple = inner_vecinj[i];
    const std::uint32_t cell_index = (tuple[2] - 1) + ((tuple[1] - 1) * n);
    std::vector<double> curr_cell_state(prop_names.size());
    for (std::size_t prop_i = 0; prop_i < prop_names.size(); prop_i++) {
      const Rcpp::NumericVector curr_prop_vec_inj = vecinj[prop_names[prop_i]];
      init_field[prop_i][cell_index] = curr_prop_vec_inj[tuple[0] - 1];
      curr_cell_state[prop_i] = curr_prop_vec_inj[tuple[0] - 1];
    }
    this->inactive_cells.insert({cell_index, std::move(curr_cell_state)});
  }
  this->t_field = Field(field_size, init_field, prop_names);
  // FIXME: parse velocities in instantiation of class
  const Rcpp::List rcpp_flux_list = R.parseEval("mysetup$advection$const_flux");
  this->flux.resize(rcpp_flux_list.size());
  for (std::size_t i = 0; i < rcpp_flux_list.size(); i++) {
    const auto flux_2d = Rcpp::as<std::vector<double>>(rcpp_flux_list[i]);
    this->flux[i] = flux_2d;
  }
}
} // namespace poet

View File

@ -0,0 +1,111 @@
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
** Potsdam)
**
** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
**
** POET is free software; you can redistribute it and/or modify it under the
** terms of the GNU General Public License as published by the Free Software
** Foundation; either version 2 of the License, or (at your option) any later
** version.
**
** POET is distributed in the hope that it will be useful, but WITHOUT ANY
** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License along with
** this program; if not, write to the Free Software Foundation, Inc., 51
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#ifndef ADVECTION_MODULE_H
#define ADVECTION_MODULE_H
#include "../Base/RInsidePOET.hpp"
#include "../DataStructures/DataStructures.hpp"
#include <Rcpp.h>
#include <array>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>
namespace poet {
/**
* @brief Class describing transport simulation using advection
*
* Quick and dirty implementation, assuming homogenous and constant porisity,
* pressure and temperature. Open boundary conditions are assumed.
*
*/
class AdvectionModule {
public:
  /**
   * @brief Construct a new AdvectionModule object
   *
   * The instance will only be initialized with the given R object; grid,
   * flux and boundary parameters are read from `mysetup` in the R runtime.
   *
   * @param R RRuntime object
   */
  AdvectionModule(RInsidePOET &R) { initializeParams(R); }
  /**
   * @brief Run simulation for one time step
   *
   * Explicit advection update of all species, internally sub-stepped
   * according to the CFL condition when `dt` is too large.
   *
   * @param dt requested time step length
   */
  void simulate(double dt);
  /**
   * @brief Get the transport time
   *
   * @return double time spent in transport
   */
  double getTransportTime() const { return this->transport_t; }
  /**
   * \brief Returns the current transport field.
   *
   * \return Reference to the field.
   */
  Field &getField() { return this->t_field; }
private:
  // Read grid geometry, initial/boundary conditions and fluxes from R.
  void initializeParams(RInsidePOET &R);
  // Rate of concentration change of one cell (sum of flux * upwind conc).
  double calcDeltaConc(std::size_t cell_index, double bc_val,
                       const std::vector<double> &spec_vec,
                       const std::vector<double> &flux);
  // Split req_dt into CFL-stable sub-steps.
  std::vector<double> CFLTimeVec(double req_dt,
                                 const std::vector<double> &max_fluxes);
  std::array<std::uint32_t, 2> n_cells; // grid dimensions (cells per side)
  std::array<double, 2> s_cells;        // physical domain size
  std::uint32_t field_size;             // n_cells[0] * n_cells[1]
  double cell_volume;                   // domain size / field_size
  std::vector<double> porosity;         // per-cell porosity (currently all 1)
  std::vector<double> density;          // per-cell density (currently all 1)
  std::vector<double> water_saturation; // per-cell saturation (currently all 1)
  // Cells with fixed composition (cell index -> per-species values);
  // skipped by the advection update.
  std::map<std::uint32_t, std::vector<double>> inactive_cells;
  std::vector<double> boundary_condition; // per-species open-boundary value
  // FIXME: parse velocities in instantiation of class
  std::vector<std::vector<double>> flux; // per cell: east, south, west, north
  Field t_field; // current transport field
  /**
   * @brief time spent for transport
   *
   */
  double transport_t = 0.f;
};
} // namespace poet
#endif // ADVECTION_MODULE_H

View File

@ -23,6 +23,7 @@
#include "Base/RInsidePOET.hpp" #include "Base/RInsidePOET.hpp"
#include "Base/SimParams.hpp" #include "Base/SimParams.hpp"
#include "Chemistry/ChemistryModule.hpp" #include "Chemistry/ChemistryModule.hpp"
#include "Transport/AdvectionModule.hpp"
#include "Transport/DiffusionModule.hpp" #include "Transport/DiffusionModule.hpp"
#include <poet.hpp> #include <poet.hpp>
@ -34,6 +35,7 @@
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <unistd.h>
#include <vector> #include <vector>
#include <mpi.h> #include <mpi.h>
@ -114,11 +116,14 @@ void set_chem_parameters(poet::ChemistryModule &chem, uint32_t wp_size,
chem.LoadDatabase(database_path); chem.LoadDatabase(database_path);
} }
inline double RunMasterLoop(SimParams &params, RInside &R, inline double RunMasterLoop(SimParams &params, RInsidePOET &R,
const GridParams &g_params, uint32_t nxyz_master) { const GridParams &g_params, uint32_t nxyz_master) {
DiffusionParams d_params{R}; // DiffusionParams d_params{R};
DiffusionModule diffusion(d_params, g_params); // DiffusionModule diffusion(d_params, g_params);
AdvectionModule advection(R);
/* Iteration Count is dynamic, retrieving value from R (is only needed by /* Iteration Count is dynamic, retrieving value from R (is only needed by
* master for the following loop) */ * master for the following loop) */
uint32_t maxiter = R.parseEval("mysetup$iterations"); uint32_t maxiter = R.parseEval("mysetup$iterations");
@ -131,8 +136,7 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
set_chem_parameters(chem, nxyz_master, params.getChemParams().database_path); set_chem_parameters(chem, nxyz_master, params.getChemParams().database_path);
chem.RunInitFile(params.getChemParams().input_script); chem.RunInitFile(params.getChemParams().input_script);
poet::ChemistryModule::SingleCMap init_df = DFToHashMap(d_params.initial_t); chem.initializeField(advection.getField());
chem.initializeField(diffusion.getField());
if (params.getNumParams().print_progressbar) { if (params.getNumParams().print_progressbar) {
chem.setProgressBarPrintout(true); chem.setProgressBarPrintout(true);
@ -161,17 +165,17 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
/* run transport */ /* run transport */
// TODO: transport to diffusion // TODO: transport to diffusion
diffusion.simulate(dt); advection.simulate(dt);
chem.getField().update(diffusion.getField()); chem.getField().update(advection.getField());
MSG("Chemistry step"); MSG("Chemistry step");
chem.SetTimeStep(dt); chem.SetTimeStep(dt);
chem.RunCells(); chem.RunCells();
writeFieldsToR(R, diffusion.getField(), chem.GetField()); writeFieldsToR(R, advection.getField(), chem.GetField());
diffusion.getField().update(chem.GetField()); advection.getField().update(chem.GetField());
R["req_dt"] = dt; R["req_dt"] = dt;
R["simtime"] = (sim_time += dt); R["simtime"] = (sim_time += dt);
@ -214,7 +218,7 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
R["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings()); R["phreeqc_time"] = Rcpp::wrap(chem.GetWorkerPhreeqcTimings());
R.parseEvalQ("profiling$phreeqc <- phreeqc_time"); R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
R["simtime_transport"] = diffusion.getTransportTime(); R["simtime_transport"] = advection.getTransportTime();
R.parseEvalQ("profiling$simtime_transport <- simtime_transport"); R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["phreeqc_count"] = phreeqc_counts; // R["phreeqc_count"] = phreeqc_counts;
@ -225,29 +229,30 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
R.parseEvalQ("profiling$dht_hits <- dht_hits"); R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions()); R["dht_evictions"] = Rcpp::wrap(chem.GetWorkerDHTEvictions());
R.parseEvalQ("profiling$dht_evictions <- dht_evictions"); R.parseEvalQ("profiling$dht_evictions <- dht_evictions");
R["dht_corrupt_buckets"] = Rcpp::wrap(chem.GetWorkerDHTCorruptBuckets());
R.parseEvalQ("profiling$dht_corrupt_buckets <- dht_corrupt_buckets");
R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings()); R["dht_get_time"] = Rcpp::wrap(chem.GetWorkerDHTGetTimings());
R.parseEvalQ("profiling$dht_get_time <- dht_get_time"); R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings()); R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time"); R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
} }
if (params.getChemParams().use_interp) { // if (params.getChemParams().use_interp) {
R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings()); // R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
R.parseEvalQ("profiling$interp_write <- interp_w"); // R.parseEvalQ("profiling$interp_write <- interp_w");
R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings()); // R["interp_r"] = Rcpp::wrap(chem.GetWorkerInterpolationReadTimings());
R.parseEvalQ("profiling$interp_read <- interp_r"); // R.parseEvalQ("profiling$interp_read <- interp_r");
R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings()); // R["interp_g"] = Rcpp::wrap(chem.GetWorkerInterpolationGatherTimings());
R.parseEvalQ("profiling$interp_gather <- interp_g"); // R.parseEvalQ("profiling$interp_gather <- interp_g");
R["interp_fc"] = // R["interp_fc"] =
Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings()); // Rcpp::wrap(chem.GetWorkerInterpolationFunctionCallTimings());
R.parseEvalQ("profiling$interp_function_calls <- interp_fc"); // R.parseEvalQ("profiling$interp_function_calls <- interp_fc");
R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls()); // R["interp_calls"] = Rcpp::wrap(chem.GetWorkerInterpolationCalls());
R.parseEvalQ("profiling$interp_calls <- interp_calls"); // R.parseEvalQ("profiling$interp_calls <- interp_calls");
R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits()); // R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
R.parseEvalQ("profiling$interp_cached <- interp_cached"); // R.parseEvalQ("profiling$interp_cached <- interp_cached");
} // }
chem.MasterLoopBreak(); chem.MasterLoopBreak();
diffusion.end();
return dSimTime; return dSimTime;
} }

View File

@ -7,10 +7,15 @@ target_link_libraries(testPOET doctest poetlib)
target_include_directories(testPOET PRIVATE "${PROJECT_SOURCE_DIR}/src") target_include_directories(testPOET PRIVATE "${PROJECT_SOURCE_DIR}/src")
get_filename_component(TEST_RInsideSourceFile "RInsidePOET_funcs.R" REALPATH) get_filename_component(TEST_RInsideSourceFile "RInsidePOET_funcs.R" REALPATH)
configure_file(testDataStructures.hpp.in testDataStructures.hpp) get_filename_component(TEST_SampleInputScript "${PROJECT_SOURCE_DIR}/test/advection/dolo_adv.R" REALPATH)
configure_file(testInputFiles.hpp.in InputFiles.hpp)
target_include_directories(testPOET PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") target_include_directories(testPOET PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
add_custom_target(check add_custom_target(check
COMMAND $<TARGET_FILE:testPOET> COMMAND $<TARGET_FILE:testPOET>
DEPENDS testPOET DEPENDS testPOET
) )
add_executable(advection advection/testAdvection.cpp)
target_include_directories(advection PRIVATE "${CMAKE_CURRENT_BINARY_DIR}" "${PROJECT_SOURCE_DIR}/src")
target_link_libraries(advection poetlib)

121
test/advection/dolo_adv.R Normal file
View File

@ -0,0 +1,121 @@
## Time-stamp: "Last modified 2023-08-22 12:44:53 mluebke"
#################################################################
## Section 1 ##
## Grid initialization ##
#################################################################
## Grid dimensions: n x m cells; the physical domain size (s_cells) equals
## the cell counts, i.e. unit-sized cells.
n <- 5
m <- 5
grid <- list(
  n_cells = c(n, m),
  s_cells = c(n, m)
)
##################################################################
##                    Advection parameters                      ##
##################################################################
## initial conditions
## HACK: We need the chemical initialization here, as chem module initialization
## depends on transport until now. This will change in the future.
## Per-species initial values applied uniformly to every cell.
init_adv <- c(
  "H" = 110.124,
  "O" = 55.0622,
  "Charge" = -1.216307659761E-09,
  "C(4)" = 1.230067028174E-04,
  "Ca" = 1.230067028174E-04,
  "Cl" = 0,
  "Mg" = 0
)
## List of compositions: row 1 is used as the open-boundary condition by the
## advection module; further rows can be referenced by vecinj_inner entries
## (here row 2 carries the Cl/Mg-bearing injection fluid).
vecinj_adv <- list(
  list(
    "H" = 110.124,
    "O" = 55.0622,
    "Charge" = -1.217e-09,
    "C(4)" = 0,
    "Ca" = 0,
    "Cl" = 0,
    "Mg" = 0
  ),
  list(
    "H" = 110.683,
    "O" = 55.3413,
    "Charge" = 1.90431e-16,
    "C(4)" = 0,
    "Cl" = 0.002,
    "Ca" = 0,
    "Mg" = 0.001
  )
)
## Fixed inner cells: each entry is c(composition row, grid row, grid column),
## 1-based — c(2, 1, 1) pins cell (1, 1) to composition row 2 above.
vecinj_inner <- list(
  l1 = c(2, 1, 1)
)
# Create a list to store grid cell information
flux_list <- list()
## Map a (row, col) pair to the linear index of a grid cell.
## Out-of-range indices yield -1 (open boundary). The grid dimensions can
## now be passed explicitly; they default to the global `n` and `m`, so all
## existing two-argument callers behave exactly as before.
get_index <- function(row, col, n_rows = n, n_cols = m) {
  index <- (row - 1) * n_cols + col
  if (index < 1 || index > n_rows * n_cols) {
    return(-1)
  }
  index
}
## Constant, uniform flow field: every cell gets the same interface fluxes.
flux_val <- 1
## Populate flux_list: for each cell, the four interface fluxes in the order
## (east, south, west, north) expected by the advection module. Positive
## values are inbound, negative outbound, so flow runs from west/north
## towards east/south. Building the vector directly avoids the previous
## grow-by-c() pattern.
for (row in 1:n) {
  for (col in 1:m) {
    index <- get_index(row, col)
    flux_list[[index]] <- c(-flux_val, -flux_val, flux_val, flux_val)
  }
}
## Assemble the compositions into a data frame: one row per composition in
## vecinj_adv, columns named after the species of init_adv.
vecinj <- do.call(rbind.data.frame, vecinj_adv)
names(vecinj) <- names(init_adv)
## Everything the AdvectionModule reads via mysetup$advection$... .
advection <- list(
  init = init_adv,
  vecinj = vecinj,
  vecinj_inner = vecinj_inner,
  const_flux = flux_list
)
#################################################################
##              Putting all those things together              ##
#################################################################
setup <- list(
  grid = grid,
  advection = advection
)

View File

@ -0,0 +1,31 @@
#include <Transport/AdvectionModule.hpp>
#include <cstddef>
#include <string>
#include "InputFiles.hpp"
using namespace poet;
// Number of advection iterations to run and the requested time step length.
constexpr std::size_t MAX_ITER = 10;
constexpr double DT = 200;
// Standalone driver for the AdvectionModule: sources the sample input script,
// runs MAX_ITER advection steps and saves the field after each step as
// adv_<i>.rds for offline inspection in R.
int main(int argc, char **argv) {
  auto &R = RInsidePOET::getInstance();
  // Load the test setup into the R runtime under the name the module expects.
  R["input_script"] = SampleInputScript;
  R.parseEvalQ("source(input_script)");
  R.parseEval("mysetup <- setup");
  AdvectionModule adv(R);
  // Snapshot of the initial state (iteration 0).
  R["field"] = adv.getField().asSEXP();
  R.parseEval("saveRDS(field, 'adv_0.rds')");
  for (std::size_t i = 0; i < MAX_ITER; i++) {
    adv.simulate(DT);
    const std::string save_q =
        "saveRDS(field, 'adv_" + std::to_string(i + 1) + ".rds')";
    // Push the updated field to R before saving it.
    R["field"] = adv.getField().asSEXP();
    R.parseEval(save_q);
  }
}

View File

@ -1,6 +0,0 @@
#ifndef TESTRINSIDEPOET_H_
#define TESTRINSIDEPOET_H_
inline const char *RInside_source_file = "@TEST_RInsideSourceFile@";
#endif // TESTRINSIDEPOET_H_

View File

@ -11,7 +11,7 @@
#include <Base/RInsidePOET.hpp> #include <Base/RInsidePOET.hpp>
#include <DataStructures/DataStructures.hpp> #include <DataStructures/DataStructures.hpp>
#include "testDataStructures.hpp" #include <InputFiles.hpp>
using namespace poet; using namespace poet;

View File

@ -0,0 +1,7 @@
#ifndef TESTINPUTFILES_H_
#define TESTINPUTFILES_H_
// Template header: the @...@ placeholders are substituted by CMake's
// configure_file() (see the test CMakeLists.txt) with absolute paths to the
// R input scripts used by the tests.
inline const char *RInside_source_file = "@TEST_RInsideSourceFile@";
inline const char *SampleInputScript = "@TEST_SampleInputScript@";
#endif // TESTINPUTFILES_H_

View File

@ -7,7 +7,7 @@
#include <Base/RInsidePOET.hpp> #include <Base/RInsidePOET.hpp>
#include <DataStructures/DataStructures.hpp> #include <DataStructures/DataStructures.hpp>
#include "testDataStructures.hpp" #include <InputFiles.hpp>
TEST_CASE("NamedVector") { TEST_CASE("NamedVector") {
RInsidePOET &R = RInsidePOET::getInstance(); RInsidePOET &R = RInsidePOET::getInstance();

144
util/Advection2D.R Normal file
View File

@ -0,0 +1,144 @@
## Time-stamp: "Last modified 2023-08-18 21:46:35 delucia"
library(ReacTran)
library(deSolve)
library(rootSolve)
options(width=114)
## Linear (column-major) index of the central cell of an N x N matrix,
## i.e. the cell at row/column ceiling(N / 2).
Centre <- function(N) {
  mid <- ceiling(N / 2)
  (mid - 1) * N + mid
}
## Model equations for deSolve::ode.2D: rate of change of the hydraulic
## charge. Diffusion-like flow with permeability Kp, a fixed charge `injp`
## at the domain centre and fixed charge `boundp` on all four boundaries.
HydraulicCharge <- function(t, y, parms) {
  np <- parms[["Np"]]
  bound_row <- rep(parms[["boundp"]], np)
  charge <- matrix(y, nrow = np, ncol = np)
  ## pin the injection cell to its fixed charge before computing transport
  charge[parms[["N2p"]], parms[["N2p"]]] <- parms[["injp"]]
  d_charge <- tran.2D(charge,
                      dx = parms[["dxp"]], dy = parms[["dxp"]],
                      D.x = parms[["Kp"]], D.y = parms[["Kp"]],
                      C.x.up = bound_row, C.x.down = bound_row,
                      C.y.up = bound_row, C.y.down = bound_row)$dC
  ## the fixed-charge centre cell does not evolve
  d_charge[Centre(np)] <- 0
  list(d_charge)
}
## Charge gradients along one grid line: augment the profile with the
## boundary value on both ends, then take finite differences over the cell
## size. Returns length(v) + 1 interface gradients. (The previous version
## ended in a superfluous assignment whose value happened to be returned.)
##
## v      numeric vector of hydraulic-charge values along one grid line
## bound  boundary charge assumed just outside both ends
## delta  grid spacing
DarcyVec <- function(v, bound, delta) {
  diff(c(bound, v, bound)) / delta
}
## Darcy velocity components at the cell interfaces of a hydraulic-charge
## matrix `h`: U = -K * grad(h), assuming the fixed charge `boundary` just
## outside every edge and uniform grid spacing `dx`. Returns list(Ux, Uy)
## with one extra interface per row/column. (Removed the unused locals
## nx/ny of the previous version.)
Darcy <- function(h, dx, boundary, K) {
  ## nx+1 x-components of \vec{U}; apply() collects results column-wise,
  ## so Ux is transposed back to row layout below
  Ux <- -K*apply(h, 1, DarcyVec, bound=boundary, delta=dx)
  ## ny+1 y-components of \vec{U}
  Uy <- -K*apply(h, 2, DarcyVec, bound=boundary, delta=dx)
  list(Ux=t(Ux), Uy=Uy)
}
## Sum of each pair of adjacent entries: reassembles interface values
## (length k) into cell-centre values (length k - 1).
RollSums <- function(vec) {
  head(vec, -1) + tail(vec, -1)
}
##' @title Compute Darcy velocities assuming steady flow
##' @param N number of cells per side in 2D square grid
##' @param L domain size (m)
##' @param inj_h fixed hydraulic charge at domain center
##' @param bound_h fixed hydraulic charge at all boundaries
##' @param K permeability
##' @return list with the steady-state charge matrix (`Charge`), the raw
##'   ode.2D output (`outc`), the interface Darcy components (`U`) and a
##'   per-cell data frame `xy` with coordinates, velocity components
##'   (`ux`, `uy`) and their norm.
##' @author
GenerateVelocities <- function(N, L, inj_h=20, bound_h=1, K=1E-4) {
  require(ReacTran)
  require(deSolve)
  ## Construction of the 2D grid
  ## NOTE(review): grid2D is built but never used below.
  x.grid <- setup.grid.1D(x.up = 0, L = L, N = N)
  y.grid <- setup.grid.1D(x.up = 0, L = L, N = N)
  grid2D <- setup.grid.2D(x.grid, y.grid)
  dx <- L/N
  ## rough "center" of square domain
  N2 <- ceiling(N/2)
  ## initial condition: "boundary" everywhere...
  y <- matrix(nrow = N, ncol = N, data = bound_h)
  ## except in the central point!
  y[N2, N2] <- inj_h
  pars <- c(Kp=K, boundp = bound_h, Np=N, N2p=N2, injp=inj_h, dxp=dx)
  cat("\n:: calling ode.2D...")
  ## Integrate to t = 1e7 to approximate steady state; `lrw` is the solver
  ## workspace size, presumably hand-tuned for N = 201 — TODO confirm it is
  ## large enough for other N.
  outc <- deSolve::ode.2D(y = y, func = HydraulicCharge,
                          times = c(0, 1E7), parms = pars,
                          dim = c(N, N), lrw = 71485484)
  cat(" [ DONE ]\n")
  ## second output row = solution at t = 1e7; first column is the time
  charge <- matrix(outc[2,-1], nrow = N, ncol = N)
  cat(":: Computing charge gradient...")
  U <- Darcy(charge, dx=dx, boundary=bound_h, K=K)
  cat(" [ DONE ]\n")
  ## Reassemble the total velocities per cell centre
  fx <- -t(apply(U$Ux, 1, RollSums))
  fy <- -apply(U$Uy, 2, RollSums)
  ## cell-centre coordinates
  x <- y <- seq(dx/2, L - dx/2, length=N)
  xx <- rep(x, N)
  yy <- rep(y, each=N)
  norms <- sqrt(fx**2+fy**2)
  tab <- data.frame(x=xx, y=yy, norm=as.numeric(norms), ux=as.numeric(fx), uy=as.numeric(fy))
  list(Charge=charge, outc=outc, U=U, xy=tab)
}
##' Quiver-style plot of the velocity field from GenerateVelocities()$xy.
##'
##' @param tab data frame with columns x, y, ux, uy (cell centres + velocity)
##' @param skip draw only every skip-th arrow (1 = all; previously this
##'   parameter was accepted but ignored)
##' @param scale rescale ux/uy via scale(center = FALSE) for readability
##' @param arr.len arrow head length, passed to arrows()
##' @param arr.lwd arrow line width
##' @param ... further arguments passed to plot()
##' @return invisible NULL; called for its plotting side effect
PlotFlow <- function(tab, skip=1, scale=TRUE, arr.len=0.05,
                     arr.lwd = 1, ...) {
  if (scale) {
    tab$ux <- scale(tab$ux, center=FALSE)
    tab$uy <- scale(tab$uy, center=FALSE)
  }
  ## domain geometry from the full table (before thinning): assumes a square
  ## grid whose first cell centre sits at dx/2 — TODO confirm for other input
  ngrid <- sqrt(nrow(tab))
  dx <- 2*tab$x[1]
  fieldlen <- dx*ngrid
  ## honour `skip`: with skip = 1 every arrow is drawn, as before
  if (skip > 1) {
    tab <- tab[seq(1, nrow(tab), by = skip), , drop = FALSE]
  }
  plot(0, 0, "n", xlim = c(0,fieldlen), ylim = c(0,fieldlen),
       asp = 1, xlab="EASTING", ylab="NORTHING", las=1, ...)
  arrows(x0 = tab$x, y0 = tab$y,
         x1 = tab$x - tab$uy, y1 = tab$y - tab$ux,
         length=arr.len, lwd=arr.lwd)
  invisible()
}
## square domain of 100x100 m discretized in 201x201 cells
aa <- GenerateVelocities(N=201, L=100, inj_h=20, bound_h=1, K=1E-2)
## plot hydraulic charge (via deSolve's image method for ode.2D output)
image(aa$outc, ask = FALSE, main ="Hydraulic charge",
      legend = TRUE, add.contour = FALSE, subset = time == 1E7,
      xlab="", ylab="", axes=FALSE, asp=1)
## velocity field as arrows, in a separate window
x11()
PlotFlow(aa$xy, skip=1, scale=TRUE, arr.lwd = 0.5)
## per-cell velocity magnitude on a log10 scale
image(matrix(log10(aa$xy$norm), ncol=201), asp=1)