Compare commits


84 Commits

Author SHA1 Message Date
Marco De Lucia
58175dd8e2 Merge branch 'mdl/defaultstorelease' into 'main'
DEFAULT_BUILD_TYPE set to Release

See merge request naaice/poet!60
2025-12-06 22:37:17 +01:00
Marco De Lucia
c8a779ca58 set DEFAULT_BUILD_TYPE to "Release"; defined properties so one cycles through when using ccmake 2025-11-14 19:19:07 +01:00
Marco De Lucia
14c08460ad Merge branch 'mdl/readme' into 'main'
Mdl/readme

Closes #22

See merge request naaice/poet!59
2025-11-14 15:31:46 +01:00
Marco De Lucia
0dc9352459 Updated README - domain change, litephreeqc 2025-11-14 15:28:25 +01:00
Max Lübke
9cd487cc15 Merge branch 'mdl/litephreeqc' into 'main'
Chore: New URL and name for litephreeqc in .gitmodules

See merge request naaice/poet!58
2025-11-13 16:45:53 +01:00
Max Lübke
04ac4b8b7b ci(gitlab): allow mirror push job to fail 2025-11-13 16:39:06 +01:00
Max Lübke
2c813b9e24 build: set path for submodule 2025-11-13 16:35:57 +01:00
Max Lübke
1b55f5e651 chore: use git mv to set new path for litephreeqc 2025-11-13 16:31:47 +01:00
Max Lübke
746ae15645 chore: rollback submodule path 2025-11-13 16:30:55 +01:00
Marco De Lucia
9066ae942b Chore: New URL and name for litephreeqc in .gitmodules 2025-11-13 16:22:21 +01:00
Marco De Lucia
22f5f6f21d chore: add CITATIONS.cff 2025-10-24 11:29:57 +02:00
Marco De Lucia
cbe0712766 Merge branch '19-cmake-requirements-missmatch-with-tug' into 'main'
Resolve "CMake requirements missmatch with TUG."

Closes #19

See merge request naaice/poet!57
2025-10-24 10:33:41 +02:00
Max Lübke
21c606f5b5 docs(license): add SPDX identifier to source files 2025-10-24 09:41:25 +02:00
Max Lübke
c8bedaac28 refactor(diffusion): integrate tug::Diffusion solver 2025-10-24 09:41:03 +02:00
Max Lübke
5f2b0cc942 build(deps): update cli11 to v2.5.0 2025-10-24 09:38:26 +02:00
Max Lübke
0d307c790f build(deps): update minimum CMake to 3.20 and tug submodule 2025-10-24 09:38:05 +02:00
Max Lübke
fb1f053055 Merge branch 'ml/mirror-to-github' into 'main'
ci(github): add job to push to GitHub remote

See merge request naaice/poet!56
2025-10-15 09:58:35 +02:00
Max Lübke
86a683f07c ci(github): add job to push to GitHub remote 2025-10-15 09:57:05 +02:00
Max Lübke
490a28903b Merge branch 'chore' into 'main'
chore: update iphreeqc submodule

See merge request naaice/poet!54
2025-09-23 12:21:14 +02:00
Max Lübke
0317ddbf8f chore: update iphreeqc submodule 2025-09-23 12:19:45 +02:00
Max Lübke
92d6044e97 Merge branch 'ml/add-build-type-to-readme' into 'main'
Add CMake Build Type to Readme

See merge request naaice/poet!53
2025-05-20 14:29:00 +02:00
Max Lübke
77ccd11220 Add CMake Build Type to Readme 2025-05-20 14:28:45 +02:00
Marco De Lucia
ccd54aaa09 Merge branch '17-new-barite-benchmarks-for-fgcs-journal' into 'main'
Resolve "New barite benchmarks for FGCS journal"

Closes #17

See merge request naaice/poet!44
2025-05-19 12:50:30 +02:00
Max Lübke
66d908b548 fix: Correct boolean conversion in DHT hook call 2025-03-12 10:35:23 +01:00
Hannes Martin Signer
a4bf2a13f9 Merge branch 'ml/add-tug-solver-option' into 'main'
Add option to toggle tug numerical solver approach

See merge request naaice/poet!50
2025-01-29 16:17:52 +01:00
Max Luebke
85e93955bc Add option to toggle tug numerical solver approach 2025-01-29 15:53:32 +01:00
Max Lübke
5a7779e8de refactor: clean up CMakeLists.txt by removing unnecessary target_sources indentation 2025-01-09 12:18:13 +01:00
Max Lübke
bcab85c331 feat: add index tracking and flushing functionality to DHT 2025-01-09 12:18:05 +01:00
Max Lübke
dffbec674c Merge branch '17-new-barite-benchmarks-for-fgcs-journal' of git.gfz-potsdam.de:naaice/poet into 17-new-barite-benchmarks-for-fgcs-journal 2024-12-20 10:12:51 +01:00
Max Lübke
2df3f9f34a [wip] add input validation for work_package in DHT_Wrapper and InterpolationModule 2024-12-20 10:12:19 +01:00
Max Lübke
cefea926cf fix: update grid dimensions and refine species definitions in barite_fgcs_2.R 2024-12-20 10:08:07 +01:00
Marco De Lucia
b93cdeed83 Fancier benchmark for fgcs paper 2024-12-20 10:08:07 +01:00
Max Lübke
8b7e3e983e Merge branch 'hotfix-qs-option' into 'main'
fix: remove duplicate flag for saving output as .qs file in parseInitValues

See merge request naaice/poet!49
2024-12-20 10:07:21 +01:00
Max Lübke
a14095622f fix: remove duplicate flag for saving output as .qs file in parseInitValues 2024-12-20 10:05:49 +01:00
Marco De Lucia
554d501ed0 Fancier benchmark for fgcs paper 2024-12-20 09:24:19 +01:00
Max Lübke
ab01dbc0a7 Merge branch 'ml/fix-interpolation' into 'main'
Fix Interpolation and add optional features to POET benchmark description

See merge request naaice/poet!48
2024-12-20 09:23:44 +01:00
Max Lübke
eb384b48c0 Merge branch 'main' into 'ml/fix-interpolation'
# Conflicts:
#   src/initializer.cpp
#   src/poet.cpp
#   src/poet.hpp.in
2024-12-20 09:22:55 +01:00
Max Lübke
e34c2ef464 fix: initialize default values for RuntimeParameters in poet.hpp.in 2024-12-20 09:21:18 +01:00
Max Lübke
d5a70a20bb fix: update dht_species values for barite and celestite in benchmark scripts 2024-12-20 09:21:18 +01:00
Max Lübke
a718c3efaf Revert "feat: make output of H(0) and O(0) optional"
This reverts commit bdb5ce944fd001cb36c8369d8c792f2f4a54c7be.
2024-12-20 09:21:18 +01:00
Max Lübke
5664153cf9 fix: handle zero distance case in inverseDistanceWeighting to prevent division by zero 2024-12-20 09:21:18 +01:00
Max Lübke
6ab404eefd feat: add with_h0_o0 and with_redox parameters to ChemistryModule and related classes 2024-12-20 09:21:18 +01:00
Max Luebke
e5588a2ee9 feat: add support for redox in PhreeqcMatrix initialization 2024-12-20 09:21:18 +01:00
Max Lübke
6fb226887e feat: enhance LookupKey with const isnan method and comparison operator 2024-12-20 09:21:18 +01:00
Max Lübke
521937a527 refactor: streamline WorkPackage constructor and improve DHT_Wrapper::fillDHT logic 2024-12-20 09:21:17 +01:00
Max Luebke
d661da13bd feat: add dht_snaps and dht_out_dir parameters to ChemistryModule and main function 2024-12-20 09:21:17 +01:00
Max Luebke
06728bc28b fix: initialize includeH0O0 to false in command line options 2024-12-20 09:21:17 +01:00
Max Luebke
c5b0ae201c feat: make output of H(0) and O(0) optional 2024-12-20 09:21:17 +01:00
Max Lübke
ac96f24a33 feat: add has_het_ids parameter to DHT initialization and related functions 2024-12-20 09:21:17 +01:00
Max Lübke
9b9fd898b7 fix: use const reference for to_calc in InterpolationModule to improve performance 2024-12-20 09:21:17 +01:00
Max Lübke
536ebdd351 fix: remove double initialization of dht_enabled in ChemistryModule 2024-12-20 09:21:17 +01:00
Max Lübke
f6b4ce017a feat: implement caching for interpolation calculations and add NaN handling 2024-12-20 09:21:16 +01:00
Max Luebke
43d2a846c7 [wip] fix: add base_totals to SurrogateSetup 2024-12-20 09:21:16 +01:00
Marco De Lucia
dd9cc5e59f reverting <format> since gcc < 13 does not support it 2024-12-20 09:21:16 +01:00
Marco De Lucia
2e3de84a90 Update references to .qs2 in README 2024-12-20 09:21:16 +01:00
Marco De Lucia
3b47b8ac1f Update README: qs2 as default output format, gfz.de everywhere 2024-12-20 09:21:16 +01:00
Marco De Lucia
0437670e05 Less and more informative stdout messages 2024-12-20 09:21:16 +01:00
Marco De Lucia
9f89edd492 Added qs2 as new default format 2024-12-20 09:21:15 +01:00
Max Lübke
347b60c3f3 Merge branch 'ml/gitlab-ci' into 'main'
Refactor CI pipeline by removing the build stage and adjusting test job dependencies

See merge request naaice/poet!47
2024-12-20 09:06:18 +01:00
Max Lübke
c36fe346b3 Refactor CI pipeline by removing the build stage and adjusting test job dependencies 2024-12-20 09:04:30 +01:00
Max Lübke
f4f4ea2be2 Merge branch 'ml/fix-license' into 'main'
Revert "No code changes made."

See merge request naaice/poet!46
2024-12-20 09:00:27 +01:00
Max Lübke
60c4a15c9f Update LICENSE to include updated copyright notice and additional guidelines for program distribution 2024-12-20 08:58:40 +01:00
Max Lübke
0c22f17df7 Revert "No code changes made."
This reverts commit f60b7cf7fcb33e9ff55e1c8050775217d9d37f1a.
2024-12-20 08:57:51 +01:00
Max Lübke
824442b315 Merge branch 'ml/fix-license' into 'main'
Fix license parsing error of GitLab?

See merge request naaice/poet!45
2024-12-20 08:55:43 +01:00
Max Lübke
f60b7cf7fc No code changes made. 2024-12-20 08:46:51 +01:00
Max Lübke
d121585894 Merge branch 'mdl/fgcsbench' into 'main'
Mdl/fgcsbench

See merge request naaice/poet!43
2024-12-13 12:19:57 +01:00
Marco De Lucia
03385d3caf reverting <format> since gcc < 13 does not support it 2024-12-13 12:14:55 +01:00
Marco De Lucia
2478518d73 Update references to .qs2 in README 2024-12-13 12:14:55 +01:00
Marco De Lucia
9a787b793b Update README: qs2 as default output format, gfz.de everywhere 2024-12-13 12:14:55 +01:00
Marco De Lucia
a519edd102 Less and more informative stdout messages 2024-12-13 12:14:55 +01:00
Marco De Lucia
61d3a8a227 Added qs2 as new default format 2024-12-13 12:14:55 +01:00
Hannes Martin Signer
b29be5fd6d Merge branch 'update-scheme' into 'main'
update poet scheme

See merge request naaice/poet!42
2024-12-10 15:03:54 +01:00
Hannes Martin Signer
b19cdef894 Update 2 files
- /docs/POET_scheme.svg
- /docs/POET.drawio
2024-12-10 15:02:45 +01:00
Max Lübke
b67c1fd36d Merge branch 'update-poet-scheme' into 'main'
update POET scheme

See merge request naaice/poet!41
2024-12-10 11:15:48 +01:00
Hannes Martin Signer
2dba14d3bd update POET scheme 2024-12-10 11:14:01 +01:00
Max Lübke
5c92a2156f Merge branch 'ml/iphreeqc-v0.3' into 'main'
Update IPhreeqc Submodule to v0.3

See merge request naaice/poet!40
2024-12-04 11:35:27 +01:00
Max Lübke
6c660557fd refactor: Replace PhreeqcEngine instances with PhreeqcRunner for improved chemistry processing 2024-12-04 10:34:13 +00:00
Max Lübke
db7a2ad2ce build: Enhance Dockerfile for improved environment setup and dependency management 2024-12-04 08:44:55 +00:00
Max Lübke
0d9162fb66 Merge branch 'ml/misc' into 'main'
Various changes to build files (CMake & devcontainer)

See merge request naaice/poet!38
2024-11-07 20:57:45 +01:00
Max Lübke
716454e115 build: Update Dockerfile and devcontainer.json for enhanced environment setup 2024-11-07 19:56:04 +00:00
Max Lübke
287eda880a build: Refactor R runtime detection in CMake to ensure required parameters are set 2024-11-07 19:56:04 +00:00
Max Lübke
66098aab40 Merge branch 'ml/new-iphreeqc-api' into 'main'
Update to IPhreeqc v3.8.2

See merge request naaice/poet!37
2024-11-07 14:23:58 +01:00
Max Lübke
cdbf344329 fix: Update getSolutionNames call to remove unnecessary argument 2024-11-07 14:22:58 +01:00
Max Lübke
45ea77ae0f Update subproject commit in ext/iphreeqc 2024-11-07 14:22:51 +01:00
57 changed files with 2063 additions and 2212 deletions

View File

@@ -1,20 +1,108 @@
-FROM mcr.microsoft.com/vscode/devcontainers/base:debian
-RUN sudo apt-get update && export DEBIAN_FRONTEND=noninteractive \
-    && sudo apt-get install -y \
-    cmake \
-    git \
-    libeigen3-dev \
-    libopenmpi-dev \
-    r-base-dev
-RUN git clone https://github.com/doctest/doctest.git /doctest \
-    && cd /doctest \
-    && mkdir build \
-    && cd build \
-    && cmake .. \
-    && make install \
-    && cd / \
-    && rm -rf /doctest
-RUN /usr/bin/R -q -e "install.packages(c('Rcpp', 'RInside'))"
+FROM gcc:11.2.0 AS builder
+ENV DEBIAN_FRONTEND=noninteractive
+RUN apt-get update \
+    && apt-get install -y \
+    sudo \
+    git \
+    ninja-build \
+    libmpfr-dev \
+    python3-dev && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+WORKDIR /tmp
+ARG OPENMPI_VERSION=4.1.1
+ADD https://download.open-mpi.org/release/open-mpi/v${OPENMPI_VERSION%.*}/openmpi-${OPENMPI_VERSION}.tar.gz /tmp/openmpi.tar.gz
+RUN mkdir openmpi && \
+    tar xf openmpi.tar.gz -C openmpi --strip-components 1 && \
+    cd openmpi && \
+    ./configure --prefix=/usr/local && \
+    make -j $(nproc) && \
+    make install && \
+    rm -rf /tmp/openmpi tmp/openmpi.tar.gz
+ARG CMAKE_VERSION=3.30.5
+ADD https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh /tmp/cmake.sh
+RUN bash ./cmake.sh --skip-license --prefix=/usr/local \
+    && rm cmake.sh
+ARG LAPACK_VERSION=3.12.0
+ADD https://github.com/Reference-LAPACK/lapack/archive/refs/tags/v${LAPACK_VERSION}.tar.gz /tmp/lapack.tar.gz
+RUN mkdir lapack && \
+    tar xf lapack.tar.gz -C lapack --strip-components 1 && \
+    cd lapack && \
+    mkdir build && \
+    cd build && \
+    cmake .. -G Ninja -DBUILD_SHARED_LIBS=ON && \
+    ninja install && \
+    rm -rf /tmp/lapack tmp/lapack.tar.gz
+ARG R_VERSION=4.4.2
+ADD https://cran.r-project.org/src/base/R-${R_VERSION%%.*}/R-${R_VERSION}.tar.gz /tmp/R.tar.gz
+RUN mkdir R && \
+    tar xf R.tar.gz -C R --strip-components 1 && \
+    cd R && \
+    ./configure --prefix=/usr/local --enable-R-shlib --with-blas --with-lapack && \
+    make -j $(nproc) && \
+    make install && \
+    rm -rf /tmp/R tmp/R.tar.gz
+RUN /usr/local/bin/R -q -e "install.packages(c('Rcpp', 'RInside', 'qs'), repos='https://cran.rstudio.com/')"
+ARG EIGEN_VERSION=3.4.0
+ADD https://gitlab.com/libeigen/eigen/-/archive/${EIGEN_VERSION}/eigen-${EIGEN_VERSION}.tar.bz2 /tmp/eigen.tar.bz2
+RUN mkdir eigen && \
+    tar xf eigen.tar.bz2 -C eigen --strip-components 1 && \
+    cd eigen && \
+    mkdir build && \
+    cd build && \
+    cmake .. -G Ninja && \
+    ninja install && \
+    rm -rf /tmp/eigen tmp/eigen.tar.bz2
+ARG GDB_VERSION=15.2
+ADD https://ftp.gnu.org/gnu/gdb/gdb-${GDB_VERSION}.tar.xz /tmp/gdb.tar.xz
+RUN mkdir gdb && \
+    tar xf gdb.tar.xz -C gdb --strip-components 1 && \
+    cd gdb && \
+    ./configure --prefix=/usr/local && \
+    make -j $(nproc) && \
+    make install && \
+    rm -rf /tmp/gdb tmp/gdb.tar.xz
+RUN useradd -m -s /bin/bash -G sudo vscode \
+    && echo "vscode ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
+USER vscode
+ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
+RUN sudo apt-get update && \
+    sudo apt-get install -y zsh && \
+    sudo apt-get clean && \
+    sudo rm -rf /var/lib/apt/lists/*
+RUN sh -c "$(wget -O- https://github.com/deluan/zsh-in-docker/releases/download/v1.2.1/zsh-in-docker.sh)" -- \
+    -t agnoster \
+    -p zsh-syntax-highlighting
+RUN zsh -c "git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting"
+RUN zsh -c "git clone --depth 1 https://github.com/junegunn/fzf.git ~/.fzf && ~/.fzf/install"
+RUN mkdir -p /home/vscode/.config/gdb \
+    && echo "set auto-load safe-path /" > /home/vscode/.config/gdb/gdbinit
+ENV CMAKE_GENERATOR=Ninja
+ENV CMAKE_EXPORT_COMPILE_COMMANDS=ON
+WORKDIR /home/vscode

View File

@@ -21,7 +21,8 @@
 },
 // in case you want to push/pull from remote repositories using ssh
 "mounts": [
-    "source=${localEnv:HOME}/.ssh,target=/home/vscode/.ssh,type=bind,consistency=cached"
+    "source=${localEnv:HOME}/.ssh,target=/home/vscode/.ssh,type=bind,consistency=cached",
+    "source=${localEnv:HOME}/.gitconfig,target=/home/vscode/.gitconfig,type=bind,consistency=cached"
 ]
 // Uncomment to connect as an existing user other than the container default. More info: https://aka.ms/dev-containers-non-root.
 // "remoteUser": "devcontainer"

View File

@@ -20,7 +20,6 @@ image: git.gfz-potsdam.de:5000/naaice/poet:ci
 stages: # List of stages for jobs, and their order of execution
   - release
-  - build
   - test
 variables:
@@ -28,26 +27,11 @@ variables:
   SOURCE_ARCHIVE_NAME: 'poet_${CI_COMMIT_TAG}_sources.tar.gz'
   CHANGELOG_FILE: 'commit_changelog.md'
-build-poet: # This job runs in the build stage, which runs first.
-  stage: build
+test: # This job runs in the build stage, which runs first.
+  stage: test
   script:
     - mkdir -p build && cd build
-    - cmake -DPOET_ENABLE_TESTING=ON -DPOET_PREPROCESS_BENCHS=OFF ..
-    - make -j$(nproc)
-  cache:
-    key: build-$CI_COMMIT_BRANCH
-    paths:
-      - build
-  artifacts:
-    paths:
-      - build
-test-poet:
-  stage: test
-  dependencies:
-    - build-poet
-  script:
-    - cd build
+    - cmake -DPOET_ENABLE_TESTING=ON -DPOET_PREPROCESS_BENCHS=OFF -DCMAKE_BUILD_TYPE=Release ..
     - make -j$(nproc) check
 pages:
@@ -65,6 +49,22 @@ pages:
   rules:
     - if: $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH || $CI_COMMIT_TAG
+push:
+  stage: release
+  variables:
+    GITHUB_REPOSITORY: 'git@github.com:POET-Simulator/POET.git'
+  before_script:
+    # I know that there is this file env variable in gitlab, but somehow it does not work for me (still complaining about white spaces ...)
+    # Therefore, the ssh key is stored as a base64 encoded string
+    - mkdir -p ~/.ssh && echo $GITHUB_SSH_PRIVATE_KEY | base64 -d > ~/.ssh/id_ed25519 && chmod 0600 ~/.ssh/id_ed25519
+    - ssh-keyscan github.com >> ~/.ssh/known_hosts
+    - echo $MIRROR_SCRIPT | base64 -d > mirror.sh && chmod +x mirror.sh
+  script:
+    - if [[ -d poet.git ]]; then rm -rf poet.git; fi
+    - git clone --mirror "https://git.gfz-potsdam.de/naaice/poet.git" "poet.git" && cd poet.git
+    - git push --mirror $GITHUB_REPOSITORY
+  allow_failure: true
 #archive-sources: # This job runs in the build stage, which runs first.
 #  image: python:3
 #  stage: release

.gitmodules (vendored, 6 changes)
View File

@@ -2,6 +2,6 @@
 path = ext/tug
 url = ../tug.git
-[submodule "ext/iphreeqc"]
-path = ext/iphreeqc
-url = ../iphreeqc.git
+[submodule "ext/litephreeqc"]
+path = ext/litephreeqc
+url = ../litephreeqc.git

CITATION.cff (new file, 51 lines)
View File

@@ -0,0 +1,51 @@
# This CITATION.cff file was generated with cffinit.
# Visit https://bit.ly/cffinit to generate yours today!
cff-version: 1.2.0
title: 'POET: POtsdam rEactive Transport'
message: >-
If you use this software, please cite it using these
metadata.
type: software
authors:
- given-names: Max
family-names: Lübke
email: mluebke@uni-potsdam.de
affiliation: University of Potsdam
orcid: 'https://orcid.org/0009-0008-9773-3038'
- given-names: Marco
family-names: De Lucia
email: delucia@gfz.de
affiliation: GFZ Helmholtz Centre for Geosciences
orcid: 'https://orcid.org/0000-0002-1186-4491'
- given-names: Alexander
family-names: Lindemann
- given-names: Hannes
family-names: Signer
email: signer@uni-potsdam.de
orcid: 'https://orcid.org/0009-0000-3058-8472'
- given-names: Bettina
family-names: Schnor
email: schnor@cs.uni-potsdam.de
affiliation: University of Potsdam
orcid: 'https://orcid.org/0000-0001-7369-8057'
- given-names: Hans
family-names: Straile
identifiers:
- type: doi
value: 10.5194/gmd-14-7391-2021
description: >-
POET (v0.1): speedup of many-core parallel reactive
transport simulations with fast DHT lookups
repository-code: 'https://git.gfz-potsdam.de/naaice/poet'
abstract: >-
Massively parallel reactive transport simulator exploring
acceleration strategies such as embedding of AI/ML and
cache of results in Distributed Hash Tables. Developed in
cooperation with computer scientists of University of
Potsdam.
keywords:
- Reactive Transport
- Geochemistry
- AI/ML Surrogate Modelling
license: GPL-2.0

View File

@@ -1,28 +1,27 @@
 # prepare R environment (Rcpp + RInside)
-find_program(R_EXE "R")
+find_program(R_EXE "R"
+  REQUIRED
+)
 # search for R executable, R header file and library path
-if(R_EXE)
-  execute_process(
-    COMMAND ${R_EXE} RHOME
-    OUTPUT_VARIABLE R_ROOT_DIR
-    OUTPUT_STRIP_TRAILING_WHITESPACE
-  )
-  find_path(
-    R_INCLUDE_DIR R.h
-    HINTS ${R_ROOT_DIR}
-    PATHS /usr/include /usr/local/include /usr/share
-    PATH_SUFFIXES include/R R/include
-  )
-  find_library(
-    R_LIBRARY libR.so
-    HINTS ${R_ROOT_DIR}/lib
-  )
-else()
-  message(FATAL_ERROR "No R runtime found!")
-endif()
+execute_process(
+  COMMAND ${R_EXE} RHOME
+  OUTPUT_VARIABLE R_ROOT_DIR
+  OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+find_path(
+  R_INCLUDE_DIR R.h
+  HINTS /usr/include /usr/local/include /usr/share ${R_ROOT_DIR}/include
+  PATH_SUFFIXES R/include R
+  REQUIRED
+)
+find_library(
+  R_LIBRARY libR.so
+  HINTS ${R_ROOT_DIR}/lib
+  REQUIRED
+)
 set(R_LIBRARIES ${R_LIBRARY})
 set(R_INCLUDE_DIRS ${R_INCLUDE_DIR})
@@ -71,11 +70,8 @@ list(APPEND R_INCLUDE_DIRS ${R_RInside_INCLUDE_DIR})
 # putting all together into interface library
 add_library(RRuntime INTERFACE IMPORTED)
-set_target_properties(
-  RRuntime PROPERTIES
-  INTERFACE_LINK_LIBRARIES "${R_LIBRARIES}"
-  INTERFACE_INCLUDE_DIRECTORIES "${R_INCLUDE_DIRS}"
-)
+target_link_libraries(RRuntime INTERFACE ${R_LIBRARIES})
+target_include_directories(RRuntime INTERFACE ${R_INCLUDE_DIRS})
 unset(R_LIBRARIES)
 unset(R_INCLUDE_DIRS)

View File

@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.14)
+cmake_minimum_required(VERSION 3.20)
 project(POET
   LANGUAGES CXX C
@@ -8,6 +8,16 @@ project(POET
 set(CMAKE_CXX_STANDARD 20)
 set(CMAKE_CXX_STANDARD_REQUIRED True)
+set(DEFAULT_BUILD_TYPE "Release")
+if(NOT CMAKE_BUILD_TYPE)
+  message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}'.")
+  set(CMAKE_BUILD_TYPE "${DEFAULT_BUILD_TYPE}" CACHE
+      STRING "Choose the type of build." FORCE)
+  # Set the possible values of build type for cmake-gui
+  set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS
+               "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
+endif()
 set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
 include("CMake/POET_Scripts.cmake")
@@ -16,6 +26,7 @@ list(APPEND CMAKE_MODULE_PATH "${POET_SOURCE_DIR}/CMake")
 get_poet_version()
 find_package(MPI REQUIRED)
+find_package(RRuntime REQUIRED)
 add_subdirectory(src)
@@ -30,7 +41,7 @@ endif()
 set(TUG_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
 add_subdirectory(ext/tug EXCLUDE_FROM_ALL)
-add_subdirectory(ext/iphreeqc EXCLUDE_FROM_ALL)
+add_subdirectory(ext/litephreeqc EXCLUDE_FROM_ALL)
 option(POET_ENABLE_TESTING "Build test suite for POET" OFF)

LICENSE (62 changes)
View File

@@ -1,8 +1,8 @@
 GNU GENERAL PUBLIC LICENSE
 Version 2, June 1991
-Copyright (C) 1989, 1991 Free Software Foundation, Inc., <http://fsf.org/>
-51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
+<https://fsf.org/>
 Everyone is permitted to copy and distribute verbatim copies
 of this license document, but changing it is not allowed.
@@ -278,3 +278,61 @@ PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
 POSSIBILITY OF SUCH DAMAGES.
 END OF TERMS AND CONDITIONS
How to Apply These Terms to Your New Programs
If you develop a new program, and you want it to be of the greatest
possible use to the public, the best way to achieve this is to make it
free software which everyone can redistribute and change under these terms.
To do so, attach the following notices to the program. It is safest
to attach them to the start of each source file to most effectively
convey the exclusion of warranty; and each file should have at least
the "copyright" line and a pointer to where the full notice is found.
<one line to give the program's name and a brief idea of what it does.>
Copyright (C) <year> <name of author>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, see <https://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail.
If the program is interactive, make it output a short notice like this
when it starts in an interactive mode:
Gnomovision version 69, Copyright (C) year name of author
Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it
under certain conditions; type `show c' for details.
The hypothetical commands `show w' and `show c' should show the appropriate
parts of the General Public License. Of course, the commands you use may
be called something other than `show w' and `show c'; they could even be
mouse-clicks or menu items--whatever suits your program.
You should also get your employer (if you work as a programmer) or your
school, if any, to sign a "copyright disclaimer" for the program, if
necessary. Here is a sample; alter the names:
Yoyodyne, Inc., hereby disclaims all copyright interest in the program
`Gnomovision' (which makes passes at compilers) written by James Hacker.
<signature of Moe Ghoul>, 1 April 1989
Moe Ghoul, President of Vice
This General Public License does not permit incorporating your program into
proprietary programs. If your program is a subroutine library, you may
consider it more useful to permit linking proprietary applications with the
library. If this is what you want to do, use the GNU Lesser General
Public License instead of this License.

README.md (190 changes)
View File

@@ -1,25 +1,30 @@
 # POET
+**NOTE: GFZ is migrating its domain from <gfz-potsdam.de> to <gfz.de>.
+This should be finalized by the end of 2025. We adopt the NEW domain
+in all the links given below. If you encounter 'unreachable address'
+try the OLD domain.**
 [POET](https://doi.org/10.5281/zenodo.4757913) is a coupled reactive
 transport simulator implementing a parallel architecture and a fast,
 original MPI-based Distributed Hash Table.
-![POET's Coupling Scheme](./docs/Scheme_POET_en.svg)
+![POET's Coupling Scheme](./docs/POET_scheme.svg)
 ## Parsed code documentiation
 A parsed version of POET's documentation can be found at [Gitlab
-pages](https://naaice.git-pages.gfz-potsdam.de/poet).
+pages](https://naaice.git-pages.gfz.de/poet).
 ## External Libraries
 The following external libraries are shipped with POET:
 - **CLI11** - <https://github.com/CLIUtils/CLI11>
-- **IPhreeqc** with patches from GFZ/UP -
-  <https://github.com/usgs-coupled/iphreeqc> -
-  <https://git.gfz-potsdam.de/naaice/iphreeqc>
-- **tug** - <https://git.gfz-potsdam.de/naaice/tug>
+- **litephreeqc**: IPhreeqc
+  (<https://github.com/usgs-coupled/iphreeqc>) with patches from
+  GFZ/UP: <https://git.gfz.de/naaice/litephreeqc>
+- **tug** - <https://git.gfz.de/naaice/tug>
 ## Installation
@@ -29,7 +34,7 @@ To compile POET you need following software to be installed:
 - C/C++ compiler (tested with GCC)
 - MPI-Implementation (tested with OpenMPI and MVAPICH)
-- CMake 3.9+
+- CMake 3.20+
 - Eigen3 3.4+ (required by `tug`)
 - *optional*: `doxygen` with `dot` bindings for documentation
 - R language and environment including headers or `-dev` packages
@@ -41,7 +46,7 @@ installed:
 - [Rcpp](https://cran.r-project.org/web/packages/Rcpp/index.html)
 - [RInside](https://cran.r-project.org/web/packages/RInside/index.html)
 - [qs](https://cran.r-project.org/web/packages/qs/index.html)
+- [qs2](https://cran.r-project.org/web/packages/qs2/index.html)
 This can be simply achieved by issuing the following commands:
 ```sh
@@ -49,7 +54,7 @@ This can be simply achieved by issuing the following commands:
 $ R
 # install R dependencies (case sensitive!)
-> install.packages(c("Rcpp", "RInside","qs"))
+> install.packages(c("Rcpp", "RInside","qs","qs2"))
 > q(save="no")
 ```
@@ -59,7 +64,7 @@ POET can be anonimously cloned from this repo over https. Make sure to
 also download the submodules:
 ```sh
-git clone --recurse-submodules https://git.gfz-potsdam.de/naaice/poet.git
+git clone --recurse-submodules https://git.gfz.de/naaice/poet.git
 ```
 The `--recurse-submodules` option is a shorthand for:
 ```sh
@@ -74,7 +79,7 @@ usual:
 ```sh
 mkdir build && cd build
-cmake ..
+cmake -DCMAKE_BUILD_TYPE=Release ..
 ```
 This will create the directory `build` and processes the CMake files
@@ -98,11 +103,6 @@ following available options:
   slowed down significantly. Defaults to _OFF_.
 - **POET_PREPROCESS_BENCHS**=*boolean* - enables the preprocessing of
   predefined models/benchmarks. Defaults to *ON*.
-- **USE_AI_SURROGATE**=*boolean* - includes the functions of the AI
-  surrogate model. When active, CMake relies on `find_package()` to find
-  an a implementation of `Threads` and a Python environment where Numpy
-  and Keras need to be installed. Defaults to _OFF_.
 ### Example: Build from scratch
@@ -115,7 +115,7 @@ follows:
 $ R
 # install R dependencies
-> install.packages(c("Rcpp", "RInside","qs"))
+> install.packages(c("Rcpp", "RInside","qs","qs2"))
 > q(save="no")
 # cd into POET project root
@@ -123,7 +123,7 @@ $ cd <POET_dir>
 # Build process
 $ mkdir build && cd build
-$ cmake -DCMAKE_INSTALL_PREFIX=/home/<user>/poet ..
+$ cmake -DCMAKE_INSTALL_PREFIX=/home/<user>/poet -DCMAKE_BUILD_TYPE=Release ..
 $ make -j<max_numprocs>
 $ make install
 ```
@@ -143,17 +143,17 @@ poet
 └── share
     └── poet
         ├── barite
-        │   ├── barite_200.rds
+        │   ├── barite_200.qs2
         │   ├── barite_200_rt.R
-        │   ├── barite_het.rds
+        │   ├── barite_het.qs2
         │   └── barite_het_rt.R
         ├── dolo
-        │   ├── dolo_inner_large.rds
+        │   ├── dolo_inner_large.qs2
        │   ├── dolo_inner_large_rt.R
-        │   ├── dolo_interp.rds
+        │   ├── dolo_interp.qs2
         │   └── dolo_interp_rt.R
         └── surfex
-            ├── PoetEGU_surfex_500.rds
+            ├── PoetEGU_surfex_500.qs2
             └── PoetEGU_surfex_500_rt.R
 ```
@@ -187,7 +187,8 @@ The following parameters can be set:
 | **-P, --progress** | | show progress bar |
 | **--ai-surrogate** | | activates the AI surrogate chemistry model (defaults to _OFF_) |
 | **--dht** | | enabling DHT usage (defaults to _OFF_) |
-| **--qs** | | store results using qs::qsave() (.qs extension) instead of default RDS (.rds) |
+| **--qs** | | store results using qs::qsave() (.qs extension) instead of default qs2 (.qs2) |
+| **--rds** | | store results using saveRDS() (.rds extension) instead of default qs2 (.qs2) |
 | **--dht-strategy=** | _0-1_ | change DHT strategy. **NOT IMPLEMENTED YET** (Defaults to _0_) |
 | **--dht-size=** | _1-n_ | size of DHT per process involved in megabyte (defaults to _1000 MByte_) |
 | **--dht-snaps=** | _0-2_ | disable or enable storage of DHT snapshots |
@@ -218,7 +219,7 @@ installed POET-dir `<POET_INSTALL_DIR>/bin` and run:
 ```sh
 cp ../share/poet/barite/barite_het* .
-mpirun -n 4 ./poet barite_het_rt.R barite_het.rds output
+mpirun -n 4 ./poet barite_het_rt.R barite_het.qs2 output
 ```
 After a finished simulation all data generated by POET will be found
@@ -231,86 +232,42 @@ DHT snapshot shall be produced. This is done by appending the
 `--dht-snaps=<value>` option. The resulting call would look like this:
 ```sh
-mpirun -n 4 ./poet --dht --dht-snaps=2 barite_het_rt.R barite_het.rds output
+mpirun -n 4 ./poet --dht --dht-snaps=2 barite_het_rt.R barite_het.qs2 output
 ```
 ### Example: Preparing Environment and Running with AI surrogate
-To run the AI surrogate, you need to have a Keras installed in your
-Python environment. The implementation in POET is agnostic to the exact
-Keras version, but the provided model file must match your Keras version.
-Using Keras 3 with `.keras` model files is recommended. The compilation
-process of POET remains mostly the same as shown above, but the CMake
-option `-DUSE_AI_SURROGATE=ON` must be set.
-To use the AI surrogate, you must declare several values in the R input
-script. This can be either done directly in the input script or in an
-additional file. This file can be provided by adding the file path as
-element `ai_surrogate_input_script` to the `chemistry_setup` list in the
-R input script.
-The following variables and functions must be declared:
-- `model_file_path` [*string*]: Path to the Keras model file with which
-the AI surrogate model is initialized.
-- `validate_predictions(predictors, prediction)` [*function*]: Must return a
-boolean vector of length `nrow(predictions)`. The output of this function
-defines which predictions are considered valid and which are rejected.
-the predictors and predictions are passed in their original original (not
-transformed) scale. Regular simulation will only be done for the rejected
-values. The input data of the rejected rows and the respective true results
-from simulation will be added to the training data buffer of the AI surrogate
-model. Can eg. be implemented as a mass balance threshold between the
-predictors and the prediction.
-The following variables and functions can be declared:
-- `batch_size` [*int*]: Batch size for the inference and training functions,
-defaults to 2560.
-- `training_epochs` [*int*]: Number of training epochs with each training data
-set, defaults to 20.
-- `training_data_size` [*int*]: Size of the training data buffer. After
-the buffer has been filled, the model starts training and removes this amount
-of data from the front of the buffer. Defaults to the size of the Field.
-- `use_Keras_predictions` [*bool*]: Decides if the Keras prediction function
-should be used instead of the custom C++ implementation. Keras might be faster
-for larger models, especially on GPU. The C++ inference function assumes that
-the Keras model is a standrad feed forward network with either 32 or 64 bit
-precision and ReLU activation. Any model that deviates from this architecture
-should activate the Keras prediction function to ensure correct calculation.
-Defaults to false.
-- `disable_training` [*bool*]: Deactivates the training functions. Defaults to
-false.
-- `train_only_invalid` [*bool*]: Use only the data from PHREEQC for training
-instead of the whole field (which might contain the models own predictions).
-Defaults to false.
-- `save_model_path` [*string*]: After each training step the current model
-is saved to this path as a .keras file.
-- `preprocess(df)` [*function*]:
-Returns the scaled/transformed data frame. The default implementation uses no
-scaling or transformations.
-- `postprocess(df)` [*function*]: Returns the rescaled/backtransformed data frame.
-The combination of preprocess() and postprocess() is expected to be idempotent.
-The default implementation uses no scaling or transformations.
-- `assign_clusters(df)` [*function*]: Must return a vector of length
-`nrow(predictions)` that contains cluster labels as 0/1. According to these
-labels, two separate models will be used for inference and training. Cluster
-assignments can e.g. be done for the reactive and non reactive parts of the
-field.
-- `model_reactive_file_path` [*string*]: Path to the Keras model file with
-which the AI surrogate model for the reactive cluster is initialized. If
-ommitted, the models for both clusters will be initialized from
-`model_file_path`
+To run the AI surrogate, you need to install the R package `keras3`. The
+compilation process of POET remains the same as shown above.
+In the following code block, the installation process on the Turing Cluster is
+shown. `miniconda` is used to create a virtual environment to install
+tensorflow/keras. Please adapt the installation process to your needs.
+<!-- Start an R interactive session and install the required packages: -->
+```sh
+# First, install the required R packages
+R -e "install.packages('keras3', repos='https://cloud.r-project.org/')"
+# manually create a virtual environment to install keras/python using conda,
+# as this is somehow broken on the Turing Cluster when using the `keras::install_keras()` function
+cd poet
+# create a virtual environment in the .ai directory with python 3.11
+conda create -p ./.ai python=3.11
+conda activate ./.ai
+# install tensorflow and keras
+pip install keras tensorflow[and-cuda]
+# add conda's python path to the R environment
+# make sure to have the conda environment activated
+echo -e "RETICULATE_PYTHON=$(which python)\n" >> ~/.Renviron
+```
+After setting up the R environment, recompile POET and you're ready to run the
+AI surrogate.
 ```sh
 cd <installation_dir>/bin
@@ -322,7 +279,7 @@ cp <project_root_dir>/bench/barite/{barite_50ai*,db_barite.dat,barite.pqi} .
 ./poet_init barite_50ai.R
 # run POET with AI surrogate and GPU utilization
-srun --gres=gpu -N 1 -n 12 ./poet --ai-surrogate barite_50ai_rt.R barite_50ai.rds output
+srun --gres=gpu -N 1 -n 12 ./poet --ai-surrogate barite_50ai_rt.R barite_50ai.qs2 output
 ```
 Keep in mind that the AI surrogate is currently not stable or might also not
@@ -333,7 +290,7 @@ produce any valid predictions.
 In order to provide a model to POET, you need to setup a R script
 which can then be used by `poet_init` to generate the simulation
 input. Which parameters are required can be found in the
-[Wiki](https://git.gfz-potsdam.de/naaice/poet/-/wikis/Initialization).
+[Wiki](https://git.gfz.de/naaice/poet/-/wikis/Initialization).
 We try to keep the document up-to-date. However, if you encounter
 missing information or need help, please get in touch with us via the
 issue tracker or E-Mail.
@@ -347,7 +304,7 @@ issue tracker or E-Mail.
 where:
 - **output** - name of the output file (defaults to the input file
-  name with the extension `.rds`)
+  name with the extension `.qs2`)
 - **setwd** - set the working directory to the directory of the input
   file (e.g. to allow relative paths in the input script). However,
   the output file will be stored in the directory from which
@@ -361,3 +318,44 @@ important information from the OpenMPI Man Page:
 For example, on platforms that support it, the clock_gettime()
 function will be used to obtain a monotonic clock value with whatever
 precision is supported on that platform (e.g., nanoseconds).
+## Additional functions for the AI surrogate
+The AI surrogate can be activated for any benchmark and is by default
+initiated as a sequential keras model with three hidden layers of depth
+48, 96, 24 with relu activation and adam optimizer. All functions in
+`ai_surrogate_model.R` can be overridden by adding custom definitions
+via an R file in the input script. This is done by adding the path to
+this file in the input script. Simply add the path as an element
+called `ai_surrogate_input_script` to the `chemistry_setup` list.
+Please use the global variable `ai_surrogate_base_path` as a base path
+when relative filepaths are used in custom functions.
+**There is currently no default implementation to determine the
+validity of predicted values.** This means that every input script
+must include an R source file with a custom function
+`validate_predictions(predictors, prediction)`. Examples for custom
+functions can be found for the barite_200 benchmark.
+The functions can be defined as follows:
+`validate_predictions(predictors, prediction)`: Returns a boolean
+index vector that signals for each row in the predictions if the
+values are considered valid. Can e.g. be implemented as a mass balance
+threshold between the predictors and the prediction.
+`initiate_model()`: Returns a keras model. Can be used to load
+pretrained models.
+`preprocess(df, backtransform = FALSE, outputs = FALSE)`: Returns the
+scaled/transformed/backtransformed dataframe. The `backtransform` flag
+signals if the current processing step is applied to data that's
+assumed to be scaled and expects backtransformed values. The `outputs`
+flag signals if the current processing step is applied to the output
+or target of the model. This can be used to e.g. skip these processing
+steps and only scale the model input.
+`training_step(model, predictor, target, validity)`: Trains the model
+after each iteration. `validity` is the bool index vector given by
+`validate_predictions` and can e.g. be used to only train on values
+that have not been valid predictions.
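As a minimal sketch of the mandatory `validate_predictions` hook, adapted from the barite_200 example file that appears later in this diff (the column names Ba, Barite, Sr, Celestite and the threshold are specific to that benchmark):
```r
## Hypothetical override file, referenced via chemistry_setup$ai_surrogate_input_script.
## Accept a prediction only when total Ba and Sr (solute + mineral) are conserved
## to within epsilon; rejected rows fall back to the regular PHREEQC simulation.
validate_predictions <- function(predictors, prediction) {
  epsilon <- 3e-5
  dBa <- abs(prediction$Ba + prediction$Barite -
             predictors$Ba - predictors$Barite)
  dSr <- abs(prediction$Sr + prediction$Celestite -
             predictors$Sr - predictors$Celestite)
  return((dBa + dSr) < epsilon)
}
```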

View File

@@ -0,0 +1,75 @@
## This file contains default function implementations for the ai surrogate.
## To load pretrained models, use pre-/postprocessing or change hyperparameters,
## it is recommended to override these functions with custom implementations via
## the input script. The path to the R-file containing the functions must be set
## in the variable "ai_surrogate_input_script". See the barite_200.R file as an
## example and the general README for more information.
require(keras3)
require(tensorflow)
initiate_model <- function() {
hidden_layers <- c(48, 96, 24)
activation <- "relu"
loss <- "mean_squared_error"
input_length <- length(ai_surrogate_species)
output_length <- length(ai_surrogate_species)
## Creates a new sequential model from scratch
model <- keras_model_sequential()
## Input layer defined by input data shape
model %>% layer_dense(units = input_length,
activation = activation,
input_shape = input_length,
dtype = "float32")
for (layer_size in hidden_layers) {
model %>% layer_dense(units = layer_size,
activation = activation,
dtype = "float32")
}
## Output data defined by output data shape
model %>% layer_dense(units = output_length,
activation = activation,
dtype = "float32")
model %>% compile(loss = loss,
optimizer = "adam")
return(model)
}
gpu_info <- function() {
msgm(tf_gpu_configured())
}
prediction_step <- function(model, predictors) {
prediction <- predict(model, as.matrix(predictors))
colnames(prediction) <- colnames(predictors)
return(as.data.frame(prediction))
}
preprocess <- function(df, backtransform = FALSE, outputs = FALSE) {
return(df)
}
postprocess <- function(df, backtransform = TRUE, outputs = TRUE) {
return(df)
}
set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
training_step <- function(model, predictor, target, validity) {
msgm("Training:")
x <- as.matrix(predictor)
y <- as.matrix(target[colnames(x)])
model %>% fit(x, y)
model %>% save_model_tf(paste0(out_dir, "/current_model.keras"))
}
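Wiring a custom override file into a simulation uses the `ai_surrogate_input_script` mechanism mentioned in the header comment above; a minimal sketch, with a hypothetical override file name (the `chemistry_setup` element mirrors the barite_200 input script shown further down in this diff):
```r
## In the simulation input script: point POET at the override file
## (path is hypothetical; cf. bench/barite/barite_200.R below).
chemistry_setup <- list(
  dht_species = dht_species,
  ai_surrogate_input_script = "./my_surrogate_overrides.R"
)

## In my_surrogate_overrides.R: redefine any default, e.g. load a
## pretrained model instead of building one from scratch.
initiate_model <- function() {
  keras3::load_model(normalizePath(paste0(ai_surrogate_base_path,
                                          "pretrained.keras")))
}
```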

View File

@ -1,30 +0,0 @@
## This file contains default function implementations for the ai surrogate.
## To use pre-/postprocessing it is recommended to override these functions
## with custom implementations via the input script. The path to the R-file
## See the barite_50.R file as an example and the general README for more
## information.
preprocess <- function(df) {
return(df)
}
postprocess <- function(df) {
return(df)
}
set_valid_predictions <- function(temp_field, prediction, validity) {
temp_field[validity == 1, ] <- prediction[validity == 1, ]
return(temp_field)
}
get_invalid_values <- function(df, validity) {
return(df[validity == 0, ])
}
set_field <- function(temp_field, columns, rows, column_name_limit,
byrow = FALSE) {
temp_field <- matrix(temp_field, nrow = rows, byrow = byrow)
temp_field <- setNames(data.frame(temp_field), columns)
temp_field <- temp_field[column_name_limit]
return(temp_field)
}

View File

@@ -13,6 +13,34 @@
 ### this program; if not, write to the Free Software Foundation, Inc., 51
 ### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+##' @param pqc_mat matrix, containing IDs and PHREEQC outputs
+##' @param grid matrix, zonation referring to pqc_mat$ID
+##' @return a data.frame
+# pqc_to_grid <- function(pqc_mat, grid) {
+#   # Convert the input DataFrame to a matrix
+#   pqc_mat <- as.matrix(pqc_mat)
+#   # Flatten the matrix into a vector
+#   id_vector <- as.integer(t(grid))
+#   # Find the matching rows in the matrix
+#   row_indices <- match(id_vector, pqc_mat[, "ID"])
+#   # Extract the matching rows from pqc_mat to size of grid matrix
+#   result_mat <- pqc_mat[row_indices, ]
+#   # Convert the result matrix to a data frame
+#   res_df <- as.data.frame(result_mat)
+#   # Remove all columns which only contain NaN
+#   res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)]
+#   # Remove row names
+#   rownames(res_df) <- NULL
+#   return(res_df)
+# }
 ##' @param pqc_mat matrix, containing IDs and PHREEQC outputs
 ##' @param grid matrix, zonation referring to pqc_mat$ID
 ##' @return a data.frame
@@ -33,7 +61,7 @@ pqc_to_grid <- function(pqc_mat, grid) {
   res_df <- as.data.frame(result_mat)
   # Remove all columns which only contain NaN
-  res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)]
+  # res_df <- res_df[, colSums(is.na(res_df)) != nrow(res_df)]
   # Remove row names
   rownames(res_df) <- NULL

View File

@@ -76,10 +76,9 @@ master_iteration_end <- function(setup, state_T, state_C) {
   state_C <- data.frame(state_C, check.names = FALSE)
   ai_surrogate_info <- list(
-    prediction_time = if (exists("ai_prediction_time")) ai_prediction_time else NULL,
-    predictions_validity = if (exists("validity_vector")) validity_vector else NULL,
-    cluster_labels = if (exists("cluster_labels")) cluster_labels else NULL,
-    predictions = if (exists("predictions")) predictions else NULL
+    prediction_time = if (exists("ai_prediction_time")) as.integer(ai_prediction_time) else NULL,
+    training_time = if (exists("ai_training_time")) as.integer(ai_training_time) else NULL,
+    valid_predictions = if (exists("validity_vector")) validity_vector else NULL
   )
   SaveRObj(x = list(
@@ -95,7 +94,7 @@ master_iteration_end <- function(setup, state_T, state_C) {
   ## Add last time step to simulation time
   setup$simulation_time <- setup$simulation_time + setup$timesteps[iter]
-  msgm("done iteration", iter, "/", length(setup$timesteps))
+  ## msgm("done iteration", iter, "/", length(setup$timesteps))
   setup$iter <- setup$iter + 1
   return(setup)
 }
@@ -110,69 +109,6 @@ msgm <- function(...) {
 }
-## Function called by master R process to store on disk all relevant
-## parameters for the simulation
-StoreSetup <- function(setup, filesim, out_dir) {
-  to_store <- vector(mode = "list", length = 4)
-  ## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT")
-  names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline")
-  ## read the setup R file, which is sourced in kin.cpp
-  tmpbuff <- file(filesim, "r")
-  setupfile <- readLines(tmpbuff)
-  close.connection(tmpbuff)
-  to_store$Sim <- setupfile
-  ## to_store$Flow <- list(
-  ##   snapshots = setup$snapshots,
-  ##   gridfile = setup$gridfile,
-  ##   phase = setup$phase,
-  ##   density = setup$density,
-  ##   dt_differ = setup$dt_differ,
-  ##   prolong = setup$prolong,
-  ##   maxiter = setup$maxiter,
-  ##   saved_iter = setup$iter_output,
-  ##   out_save = setup$out_save )
-  to_store$Transport <- setup$diffusion
-  ## to_store$Chemistry <- list(
-  ##   nprocs = n_procs,
-  ##   wp_size = work_package_size,
-  ##   base = setup$base,
-  ##   first = setup$first,
-  ##   init = setup$initsim,
-  ##   db = db,
-  ##   kin = setup$kin,
-  ##   ann = setup$ann)
-  if (dht_enabled) {
-    to_store$DHT <- list(
-      enabled = dht_enabled,
-      log = dht_log
-      ## signif = dht_final_signif,
-      ## proptype = dht_final_proptype
-    )
-  } else {
-    to_store$DHT <- FALSE
-  }
-  if (dht_enabled) {
-    to_store$DHT <- list(
-      enabled = dht_enabled,
-      log = dht_log
-      # signif = dht_final_signif,
-      # proptype = dht_final_proptype
-    )
-  } else {
-    to_store$DHT <- FALSE
-  }
-  saveRDS(to_store, file = paste0(fileout, "/setup.rds"))
-  msgm("initialization stored in ", paste0(fileout, "/setup.rds"))
-}
 GetWorkPackageSizesVector <- function(n_packages, package_size, len) {
   ids <- rep(1:n_packages, times = package_size, each = 1)[1:len]
   return(as.integer(table(ids)))
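A quick illustration of `GetWorkPackageSizesVector` (an editorial example, not part of the diff): because `rep(1:n_packages, times = package_size)` assigns package IDs round-robin, an uneven split spreads the remainder over the leading packages rather than leaving it all in the last one:
```r
## 10 grid cells, 3 packages of nominal size 4:
## ids = 1 2 3 1 2 3 1 2 3 1, so package 1 ends up with the extra cell
GetWorkPackageSizesVector(n_packages = 3, package_size = 4, len = 10)
## [1] 4 3 3
```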
@@ -180,7 +116,7 @@ GetWorkPackageSizesVector(n_packages, package_size, len) {
 ## Handler to read R objs from binary files using either builtin
-## readRDS() or qs::qread() based on file extension
+## readRDS(), qs::qread() or qs2::qs_read() based on file extension
 ReadRObj <- function(path) {
   ## code borrowed from tools::file_ext()
   pos <- regexpr("\\.([[:alnum:]]+)$", path)
@@ -188,20 +124,88 @@
   switch(extension,
     rds = readRDS(path),
-    qs = qs::qread(path)
+    qs = qs::qread(path),
+    qs2 = qs2::qs_read(path)
   )
 }
 ## Handler to store R objs to binary files using either builtin
 ## saveRDS() or qs::qsave() based on file extension
 SaveRObj <- function(x, path) {
-  msgm("Storing to", path)
+  ## msgm("Storing to", path)
   ## code borrowed from tools::file_ext()
   pos <- regexpr("\\.([[:alnum:]]+)$", path)
   extension <- ifelse(pos > -1L, substring(path, pos + 1L), "")
   switch(extension,
     rds = saveRDS(object = x, file = path),
-    qs = qs::qsave(x = x, file = path)
+    qs = qs::qsave(x = x, file = path),
+    qs2 = qs2::qs_save(object = x, file = path)
   )
 }
######## Old relic code
## ## Function called by master R process to store on disk all relevant
## ## parameters for the simulation
## StoreSetup <- function(setup, filesim, out_dir) {
## to_store <- vector(mode = "list", length = 4)
## ## names(to_store) <- c("Sim", "Flow", "Transport", "Chemistry", "DHT")
## names(to_store) <- c("Sim", "Transport", "DHT", "Cmdline")
## ## read the setup R file, which is sourced in kin.cpp
## tmpbuff <- file(filesim, "r")
## setupfile <- readLines(tmpbuff)
## close.connection(tmpbuff)
## to_store$Sim <- setupfile
## ## to_store$Flow <- list(
## ## snapshots = setup$snapshots,
## ## gridfile = setup$gridfile,
## ## phase = setup$phase,
## ## density = setup$density,
## ## dt_differ = setup$dt_differ,
## ## prolong = setup$prolong,
## ## maxiter = setup$maxiter,
## ## saved_iter = setup$iter_output,
## ## out_save = setup$out_save )
## to_store$Transport <- setup$diffusion
## ## to_store$Chemistry <- list(
## ## nprocs = n_procs,
## ## wp_size = work_package_size,
## ## base = setup$base,
## ## first = setup$first,
## ## init = setup$initsim,
## ## db = db,
## ## kin = setup$kin,
## ## ann = setup$ann)
## if (dht_enabled) {
## to_store$DHT <- list(
## enabled = dht_enabled,
## log = dht_log
## ## signif = dht_final_signif,
## ## proptype = dht_final_proptype
## )
## } else {
## to_store$DHT <- FALSE
## }
## if (dht_enabled) {
## to_store$DHT <- list(
## enabled = dht_enabled,
## log = dht_log
## # signif = dht_final_signif,
## # proptype = dht_final_proptype
## )
## } else {
## to_store$DHT <- FALSE
## }
## saveRDS(to_store, file = paste0(fileout, "/setup.rds"))
## msgm("initialization stored in ", paste0(fileout, "/setup.rds"))
## }
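The extension dispatch in `ReadRObj`/`SaveRObj` above lets a single call site support all three serializers; a small usage sketch (editorial, assuming the `qs2` package is installed):
```r
x <- list(iter = 1L, field = matrix(runif(9), nrow = 3))
SaveRObj(x, "state.qs2")    # suffix .qs2 -> qs2::qs_save()
y <- ReadRObj("state.qs2")  # suffix .qs2 -> qs2::qs_read()
stopifnot(identical(x, y))  # binary round trip preserves the object
SaveRObj(x, "state.rds")    # suffix .rds -> builtin saveRDS()
```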

View File

@@ -7,7 +7,7 @@ function(ADD_BENCH_TARGET TARGET POET_BENCH_LIST RT_FILES OUT_PATH)
 foreach(BENCH_FILE ${${POET_BENCH_LIST}})
   get_filename_component(BENCH_NAME ${BENCH_FILE} NAME_WE)
   set(OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/${BENCH_NAME})
-  set(OUT_FILE_EXT ${OUT_FILE}.qs)
+  set(OUT_FILE_EXT ${OUT_FILE}.qs2)
   add_custom_command(
     OUTPUT ${OUT_FILE_EXT}

bench/barite/barite_200.R (21 changes, Executable file → Normal file)
View File

@@ -35,19 +35,20 @@ diffusion_setup <- list(
 )
 dht_species <- c(
-  "H" = 7,
-  "O" = 7,
-  "Charge" = 4,
-  "Ba" = 7,
-  "Cl" = 7,
-  "S(6)" = 7,
-  "Sr" = 7,
-  "Barite" = 4,
-  "Celestite" = 4
+  "H" = 3,
+  "O" = 3,
+  "Charge" = 6,
+  "Ba" = 6,
+  "Cl" = 6,
+  "S" = 6,
+  "Sr" = 6,
+  "Barite" = 5,
+  "Celestite" = 5
 )
 chemistry_setup <- list(
-  dht_species = dht_species
+  dht_species = dht_species,
+  ai_surrogate_input_script = "./barite_200ai_surrogate_input_script.R"
 )
 # Define a setup list for simulation configuration

View File

@@ -0,0 +1,48 @@
## load a pretrained model from tensorflow file
## Use the global variable "ai_surrogate_base_path" when using file paths
## relative to the input script
initiate_model <- function() {
init_model <- normalizePath(paste0(ai_surrogate_base_path,
"model_min_max_float64.keras"))
return(load_model_tf(init_model))
}
scale_min_max <- function(x, min, max, backtransform) {
if (backtransform) {
return((x * (max - min)) + min)
} else {
return((x - min) / (max - min))
}
}
preprocess <- function(df, backtransform = FALSE, outputs = FALSE) {
minmax_file <- normalizePath(paste0(ai_surrogate_base_path,
"min_max_bounds.rds"))
global_minmax <- readRDS(minmax_file)
for (column in colnames(df)) {
df[column] <- lapply(df[column],
scale_min_max,
global_minmax$min[column],
global_minmax$max[column],
backtransform)
}
return(df)
}
mass_balance <- function(predictors, prediction) {
dBa <- abs(prediction$Ba + prediction$Barite -
predictors$Ba - predictors$Barite)
dSr <- abs(prediction$Sr + prediction$Celestite -
predictors$Sr - predictors$Celestite)
return(dBa + dSr)
}
validate_predictions <- function(predictors, prediction) {
epsilon <- 3e-5
mb <- mass_balance(predictors, prediction)
msgm("Mass balance mean:", mean(mb))
msgm("Mass balance variance:", var(mb))
msgm("Rows where mass balance meets threshold", epsilon, ":",
sum(mb < epsilon))
return(mb < epsilon)
}
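A note on the scaler defined above: postprocessing relies on scale_min_max() being exactly invertible, since predictions are mapped back with the same bounds used for the forward scaling. A minimal standalone sketch (illustrative only, not part of the commit; `bounds` stands in for the values read from min_max_bounds.rds):
## Illustrative round-trip check for scale_min_max() as defined above.
x <- c(0.1, 0.5, 0.9)
bounds <- list(min = 0, max = 2)  ## stand-in for min_max_bounds.rds
scaled <- scale_min_max(x, bounds$min, bounds$max, backtransform = FALSE)
back <- scale_min_max(scaled, bounds$min, bounds$max, backtransform = TRUE)
stopifnot(isTRUE(all.equal(x, back)))  ## forward then back reproduces x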

bench/barite/barite_50ai.R Executable file → Normal file

@@ -11,9 +11,9 @@ grid_def <- matrix(2, nrow = rows, ncol = cols)
 grid_setup <- list(
   pqc_in_file = "./barite.pqi",
   pqc_db_file = "./db_barite.dat", ## Path to the database file for Phreeqc
   grid_def = grid_def, ## Definition of the grid, containing IDs according to the Phreeqc input script
   grid_size = c(s_rows, s_cols), ## Size of the grid in meters
   constant_cells = c() ## IDs of cells with constant concentration
 )
 bound_length <- 2
@@ -36,15 +36,15 @@ diffusion_setup <- list(
 )
 dht_species <- c(
-  "H" = 4,
-  "O" = 10,
-  "Charge" = 4,
-  "Ba" = 7,
-  "Cl" = 4,
-  "S(6)" = 7,
-  "Sr" = 4,
-  "Barite" = 2,
-  "Celestite" = 2
+  "H" = 3,
+  "O" = 3,
+  "Charge" = 3,
+  "Ba" = 6,
+  "Cl" = 6,
+  "S" = 6,
+  "Sr" = 6,
+  "Barite" = 5,
+  "Celestite" = 5
 )
 chemistry_setup <- list(

bench/barite/barite_50ai_rt.R Executable file → Normal file

@@ -1,148 +1,69 @@
 ## Time-stamp: "Last modified 2024-05-30 13:27:06 delucia"
+## load a pretrained model from tensorflow file
 ## Use the global variable "ai_surrogate_base_path" when using file paths
 ## relative to the input script
-model_file_path <- normalizePath(paste0(ai_surrogate_base_path,
-                                        "barite_50ai_all.keras"))
-batch_size <- 1280
-training_epochs <- 20
-save_model_path <- "current_model.keras"
+initiate_model <- function() {
+  require(keras3)
+  require(tensorflow)
+  init_model <- normalizePath(paste0(ai_surrogate_base_path,
+                                     "barite_50ai_all.keras"))
+  Model <- keras3::load_model(init_model)
+  msgm("Loaded model:")
+  print(str(Model))
+  return(Model)
+}
 scale_min_max <- function(x, min, max, backtransform) {
   if (backtransform) {
     return((x * (max - min)) + min)
   } else {
     return((x - min) / (max - min))
   }
 }
-## Apply decimal logarithms handling negative values
-Safelog <- function (vec) {
-  rv <- range(vec)
-  if (max(abs(rv)) < 1) {
-    ret <- vec
-    ret[vec == 0] <- 0
-    ret[vec > 0] <- log10(vec[vec > 0])
-    ret[vec < 0] <- -log10(-vec[vec < 0])
-  } else {
-    ret <- log10(abs(vec))
-  }
-  ret
-}
-Safeexp <- function (vec) {
-  ret <- vec
-  ret[vec == 0] <- 0
-  ret[vec > 0] <- -10^(-vec[vec > 0])
-  ret[vec < 0] <- 10^(vec[vec < 0])
-  ret
-}
-##' @title Apply transformations to cols of a data.frame
-##' @param df named data.frame
-##' @param tlist list of function names
-##' @return data.frame with the columns specified in tlist and the
-##'   transformed numerical values
-##' @author MDL
-TransfList <- function (df, tlist) {
-  vars <- intersect(colnames(df), names(tlist))
-  lens <- sapply(tlist[vars], length)
-  n <- max(lens)
-  filledlist <- lapply(tlist[vars],
-                       function(x)
-                         if (length(x) < n)
-                           return(c(x, rep("I", n - length(x))))
-                         else
-                           return(x))
-  tmp <- df[, vars]
-  for (i in seq_len(n)) {
-    tmp <- as.data.frame(sapply(vars, function(x)
-      do.call(filledlist[[x]][i], list(tmp[, x]))))
-  }
-  return(tmp)
-}
-##' This function applies some specified string substitution such as
-##' 's/log/exp/' so that from a list of "forward" transformation
-##' functions it computes a "backward" one
-##' @title Apply back transformations to cols of a data.frame
-##' @param df named data.frame
-##' @param tlist list of function names
-##' @return data.frame with the columns specified in tlist and the
-##'   backtransformed numerical values
-##' @author MDL
-BackTransfList <- function (df, tlist) {
-  vars <- intersect(colnames(df), names(tlist))
-  lens <- sapply(tlist[vars], length)
-  n <- max(lens)
-  filledlist <- lapply(tlist[vars],
-                       function(x)
-                         if (length(x) < n)
-                           return(c(x, rep("I", n - length(x))))
-                         else
-                           return(x))
-  rlist <- lapply(filledlist, rev)
-  rlist <- lapply(rlist, sub, pattern = "log", replacement = "exp")
-  rlist <- lapply(rlist, sub, pattern = "1p", replacement = "m1")
-  rlist <- lapply(rlist, sub, pattern = "Mul", replacement = "Div")
-  tmp <- df[, vars]
-  for (i in seq_len(n)) {
-    tmp <- as.data.frame(sapply(vars, function(x)
-      do.call(rlist[[x]][i], list(tmp[, x]))))
-  }
-  return(tmp)
-}
-tlist <- list("H" = "log1p", "O" = "log1p", "Charge" = "Safelog",
-              "Ba" = "log1p", "Cl" = "log1p", "S(6)" = "log1p",
-              "Sr" = "log1p", "Barite" = "log1p", "Celestite" = "log1p")
-minmaxlog <- list(min = c(H = 4.71860987935512, O = 4.03435069501537,
-                          Charge = -14.5337693764617, Ba = 1.87312878574393e-141,
-                          Cl = 0, `S(6)` = 4.2422742065922e-07,
-                          Sr = 0.00049370806741832, Barite = 0.000999043199940672,
-                          Celestite = 0.218976382406766),
-                  max = c(H = 4.71860988013054, O = 4.03439461483133,
-                          Charge = 12.9230752782909, Ba = 0.086989273200771,
-                          Cl = 0.178729802869381, `S(6)` = 0.000620582151722819,
-                          Sr = 0.0543631841661421, Barite = 0.56348209786429,
-                          Celestite = 0.69352422027466))
+minmax <- list(min = c(H = 111.012433592824, O = 55.5062185549492, Charge = -3.1028354471876e-08,
+                       Ba = 1.87312878574393e-141, Cl = 0, `S(6)` = 4.24227510643685e-07,
+                       Sr = 0.00049382996130541, Barite = 0.000999542409828586, Celestite = 0.244801877115968),
+               max = c(H = 111.012433679682, O = 55.5087003521685, Charge = 5.27666636082035e-07,
+                       Ba = 0.0908849779513762, Cl = 0.195697626449355, `S(6)` = 0.000620774752665846,
+                       Sr = 0.0558680070692722, Barite = 0.756779139057097, Celestite = 1.00075422160624
+               ))
 preprocess <- function(df) {
   if (!is.data.frame(df))
     df <- as.data.frame(df, check.names = FALSE)
-  tlog <- TransfList(df, tlist)
-  as.data.frame(lapply(colnames(tlog),
-                       function(x) scale_min_max(x = tlog[x],
-                                                 min = minmaxlog$min[x],
-                                                 max = minmaxlog$max[x],
-                                                 backtransform = FALSE)),
+  as.data.frame(lapply(colnames(df),
+                       function(x) scale_min_max(x = df[x],
+                                                 min = minmax$min[x],
+                                                 max = minmax$max[x],
+                                                 backtransform = FALSE)),
                 check.names = FALSE)
 }
 postprocess <- function(df) {
   if (!is.data.frame(df))
     df <- as.data.frame(df, check.names = FALSE)
-  ret <- as.data.frame(lapply(colnames(df),
-                              function(x) scale_min_max(x = df[x],
-                                                        min = minmaxlog$min[x],
-                                                        max = minmaxlog$max[x],
-                                                        backtransform = TRUE)),
-                       check.names = FALSE)
-  BackTransfList(ret, tlist)
+  as.data.frame(lapply(colnames(df),
+                       function(x) scale_min_max(x = df[x],
+                                                 min = minmax$min[x],
+                                                 max = minmax$max[x],
+                                                 backtransform = TRUE)),
+                check.names = FALSE)
 }
-mass_balance <- function(x, y) {
-  dBa <- abs(y$Ba + y$Barite -
-             (x$Ba + x$Barite))
-  dSr <- abs(y$Sr + y$Celestite -
-             (x$Sr + x$Celestite))
+mass_balance <- function(predictors, prediction) {
+  dBa <- abs(prediction$Ba + prediction$Barite -
+             predictors$Ba - predictors$Barite)
+  dSr <- abs(prediction$Sr + prediction$Celestite -
+             predictors$Sr - predictors$Celestite)
   return(dBa + dSr)
 }
 validate_predictions <- function(predictors, prediction) {
-  epsilon <- 1E-5
+  epsilon <- 1E-7
   mb <- mass_balance(predictors, prediction)
   msgm("Mass balance mean:", mean(mb))
   msgm("Mass balance variance:", var(mb))
@@ -151,3 +72,19 @@ validate_predictions <- function(predictors, prediction) {
        sum(ret))
   return(ret)
 }
+training_step <- function(model, predictor, target, validity) {
+  msgm("Starting incremental training:")
+  ## x <- as.matrix(predictor)
+  ## y <- as.matrix(target[colnames(x)])
+  history <- model %>% keras3::fit(x = data.matrix(predictor),
+                                   y = data.matrix(target),
+                                   epochs = 10, verbose = 1)
+  keras3::save_model(model,
+                     filepath = paste0(out_dir, "/current_model.keras"),
+                     overwrite = TRUE)
+  return(model)
+}


@@ -41,36 +41,36 @@ diffusion_setup <- list(
 )
 check_sign_cal_dol_dht <- function(old, new) {
-  if ((old["Calcite"] == 0) != (new["Calcite"] == 0)) {
-    return(TRUE)
-  }
-  if ((old["Dolomite"] == 0) != (new["Dolomite"] == 0)) {
-    return(TRUE)
-  }
+  # if ((old["Calcite"] == 0) != (new["Calcite"] == 0)) {
+  #   return(TRUE)
+  # }
+  # if ((old["Dolomite"] == 0) != (new["Dolomite"] == 0)) {
+  #   return(TRUE)
+  # }
   return(FALSE)
 }
-fuzz_input_dht_keys <- function(input) {
-  dht_species <- c(
-    "H" = 3,
-    "O" = 3,
-    "Charge" = 3,
-    "C(4)" = 6,
-    "Ca" = 6,
-    "Cl" = 3,
-    "Mg" = 5,
-    "Calcite" = 4,
-    "Dolomite" = 4
-  )
-  return(input[names(dht_species)])
-}
+# fuzz_input_dht_keys <- function(input) {
+#   dht_species <- c(
+#     "H" = 3,
+#     "O" = 3,
+#     "Charge" = 3,
+#     "C" = 6,
+#     "Ca" = 6,
+#     "Cl" = 3,
+#     "Mg" = 5,
+#     "Calcite" = 4,
+#     "Dolomite" = 4
+#   )
+#   return(input[names(dht_species)])
+# }
 check_sign_cal_dol_interp <- function(to_interp, data_set) {
   dht_species <- c(
     "H" = 3,
     "O" = 3,
     "Charge" = 3,
-    "C(4)" = 6,
+    "C" = 6,
     "Ca" = 6,
     "Cl" = 3,
     "Mg" = 5,
@@ -95,7 +95,7 @@ check_neg_cal_dol <- function(result) {
 # significant digits)
 pht_species <- c(
-  "C(4)" = 3,
+  "C" = 3,
   "Ca" = 3,
   "Mg" = 2,
   "Calcite" = 2,
@@ -107,7 +107,7 @@ chemistry_setup <- list(
   "H" = 3,
   "O" = 3,
   "Charge" = 3,
-  "C(4)" = 6,
+  "C" = 6,
   "Ca" = 6,
   "Cl" = 3,
   "Mg" = 5,
@@ -117,7 +117,7 @@ chemistry_setup <- list(
   pht_species = pht_species,
   hooks = list(
     dht_fill = check_sign_cal_dol_dht,
-    dht_fuzz = fuzz_input_dht_keys,
+    # dht_fuzz = fuzz_input_dht_keys,
     interp_pre = check_sign_cal_dol_interp,
     interp_post = check_neg_cal_dol
   )
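The hook contract visible in this diff: dht_fill receives the old and new state of a cell as named vectors and returns a logical; with its body commented out, check_sign_cal_dol_dht now always returns FALSE. For illustration, a re-enabled variant equivalent to the removed lines (sketch only, not part of the commit):
## Illustrative only: flag cells where Calcite or Dolomite appears or
## disappears entirely between the old and the new state.
my_dht_fill <- function(old, new) {
  ((old["Calcite"] == 0) != (new["Calcite"] == 0)) ||
    ((old["Dolomite"] == 0) != (new["Dolomite"] == 0))
}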

Binary file not shown.


@@ -0,0 +1,102 @@
% Created 2024-12-11 mer 23:24
% Intended LaTeX compiler: pdflatex
\documentclass[a4paper, 9pt]{article}
\usepackage[utf8]{inputenc}
\usepackage[T1]{fontenc}
\usepackage{graphicx}
\usepackage{longtable}
\usepackage{wrapfig}
\usepackage{rotating}
\usepackage[normalem]{ulem}
\usepackage{amsmath}
\usepackage{amssymb}
\usepackage{capt-of}
\usepackage{hyperref}
\usepackage{fullpage}
\usepackage{amsmath}
\usepackage{graphicx}
\usepackage{charter}
\usepackage{listings}
\lstloadlanguages{R}
\author{MDL <delucia@gfz.de>}
\date{2024-12-11}
\title{A \texttt{barite}-based benchmark for FGCS interpolation paper}
\begin{document}
\maketitle
\section{Description}
\label{sec:org739879a}
\begin{itemize}
\item \texttt{barite\_fgcs\_2.R}: POET input script with circular
"crystals" on a 200x200 nodes grid
\item \(\alpha\): isotropic 10\textsuperscript{-5}
m\textsuperscript{2}/s outside of the crystals,
10\textsuperscript{-7} inside
\item 200 iterations, dt = 1000
\item \texttt{barite\_fgcs\_2.pqi}: PHREEQC input, 4 SOLUTIONS
(basically the same as in \texttt{barite} benchmark):
\begin{enumerate}
\item Equilibrium with Celestite, no mineral \(\Rightarrow\)
\item Equilibrium with Celestite, KINETICS Celestite (1 mol) and
Barite (0 mol)
\item Injection of 0.1 BaCl2 from NW corner
\item Injection of 0.2 BaCl2 from SE corner
\end{enumerate}
\item \texttt{db\_barite.dat}: PHREEQC database containing the kinetic
expressions for barite and celestite, stripped down from
\texttt{phreeqc.dat}
\end{itemize}
\begin{figure}[htbp]
\centering
\includegraphics[width=0.48\textwidth]{./fgcs_Celestite_init.pdf}
\includegraphics[width=0.48\textwidth]{./fgcs_Barite_200.pdf}
\caption{\textbf{Left:} Initial distribution of Celestite
"crystals". \textbf{Right:} precipitated Barite}
\end{figure}
\section{Interpolation}
\label{sec:org2a09431}
Using the following parametrization:
\begin{lstlisting}
dht_species <- c("H" = 7,
"O" = 7,
"Ba" = 7,
"Cl" = 7,
"S(6)" = 7,
"Sr" = 7,
"Barite" = 4,
"Celestite" = 4)
pht_species <- c("Ba" = 4,
"Cl" = 3,
"S(6)" = 3,
"Sr" = 3,
"Barite" = 2,
"Celestite" = 2 )
\end{lstlisting}
Runtime goes from 1800 to 600 s (21 CPUs) but there are "suspect"
errors especially in O and H, where "suspect" means some values appear
to be multiplied by 2:
\begin{figure}[htbp]
\centering
\includegraphics[width=0.9\textwidth]{./fgcs_interp_1.pdf}
\caption{Scatterplots reference vs interpolated after 1 coupling
iteration}
\end{figure}
\end{document}
%%% Local Variables:
%%% mode: LaTeX
%%% TeX-master: t
%%% End:
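The dht_species values in the parametrization above are per-species significant digits used when building DHT keys: coarser digits collapse more grid-cell states onto the same entry, trading accuracy for hit rate. An illustrative R sketch of such a keying scheme (whether POET rounds exactly like signif() is an assumption):
## Illustrative only: rounding a cell state to per-species digits.
dht_species <- c("Ba" = 7, "Barite" = 4)
state <- c("Ba" = 1.2345678e-3, "Barite" = 2.3456789e-2)
key <- signif(state[names(dht_species)], digits = dht_species)
print(key)  ## Ba keeps 7 significant digits, Barite only 4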

bench/fgcs/EvalFGCS.R Normal file

@@ -0,0 +1,90 @@
## Time-stamp: "Last modified 2024-12-11 23:21:25 delucia"
library(PoetUtils)
library(viridis)
res <- ReadPOETSims("./res_fgcs2_96/")
pp <- PlotField(res$iter_200$C$Barite, rows = 200, cols = 200, contour = FALSE,
nlevels=12, palette=terrain.colors)
cairo_pdf("fgcs_Celestite_init.pdf", family="serif")
par(mar=c(0,0,0,0))
pp <- PlotField((res$iter_000$Celestite), rows = 200, cols = 200,
contour = FALSE, breaks=c(-0.5,0.5,1.5),
palette = grey.colors, plot.axes = FALSE, scale = FALSE,
main="Initial Celestite crystals")
dev.off()
cairo_pdf("fgcs_Ba_init.pdf", family="serif")
par(mar=c(0,0,0,0))
pp <- PlotField(log10(res$iter_001$C$Cl), rows = 200, cols = 200,
contour = FALSE,
palette = terrain.colors, plot.axes = FALSE, scale = FALSE,
main="log10(Ba)")
dev.off()
pp <- PlotField(log10(res$iter_002$C$Ba), rows = 200, cols = 200,
contour = FALSE, palette = viridis, rev.palette = FALSE,
main = "log10(Ba) after 5 iterations")
pp <- PlotField(log10(res$iter_200$C$`S(6)`), rows = 200, cols = 200, contour = FALSE)
str(res$iter_00)
res$iter_178$C$Barite
pp <- res$iter_043$C$Barite
breaks <- pretty(pp, n = 5)
br <- c(0, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1)
pp <- PlotField(res$iter_200$C$Barite, rows = 200, cols = 200, contour = FALSE,
breaks = br, palette=terrain.colors)
cairo_pdf("fgcs_Barite_200.pdf", family="serif")
pp <- PlotField(log10(res$iter_200$C$Barite), rows = 200, cols = 200,
contour = FALSE, palette = terrain.colors, plot.axes = FALSE,
rev.palette = FALSE, main = "log10(Barite) after 200 iter")
dev.off()
ref <- ReadPOETSims("./res_fgcs_2_ref")
rei <- ReadPOETSims("./res_fgcs_2_interp1/")
timref <- ReadRObj("./res_fgcs_2_ref/timings.qs")
timint <- ReadRObj("./res_fgcs_2_interp1/timings.qs")
timref
timint
wch <- c("H","O", "Ba", "Sr","Cl", "S(6)")
rf <- data.matrix(ref$iter_001$C[, wch])
r1 <- data.matrix(rei$iter_001$C[, wch])
r1[is.nan(r1)] <- NA
rf[is.nan(rf)] <- NA
cairo_pdf("fgcs_interp_1.pdf", family="serif", width = 10, height = 7)
PlotScatter(rf, r1, which = wch, labs = c("ref", "interp"), cols = 3, log="", las = 1, pch=4)
dev.off()
head(r1)
head(rf)
rf[, "O"]
r1[, "O"]

bench/fgcs/README.org Normal file

@@ -0,0 +1,2 @@
* Refer to the LaTeX file (and pdf) for more information

bench/fgcs/barite_fgcs_2.R Normal file

@@ -0,0 +1,105 @@
## Time-stamp: "Last modified 2024-12-11 16:08:11 delucia"
cols <- 1000
rows <- 1000
dim_cols <- 50
dim_rows <- 50
ncirc <- 20 ## number of crystals
rmax <- cols / 10 ## max radius (in nodes)
set.seed(22933)
centers <- cbind(sample(seq_len(cols), ncirc), sample(seq_len(rows), ncirc))
radii <- sample(seq_len(rmax), ncirc, replace = TRUE)
mi <- matrix(rep(seq_len(cols), rows), byrow = TRUE, nrow = rows)
mj <- matrix(rep(seq_len(cols), each = rows), byrow = TRUE, nrow = rows)
tmpl <- lapply(seq_len(ncirc), function(x) which((mi - centers[x, 1])^2 + (mj - centers[x, 2])^2 < radii[x]^2, arr.ind = TRUE))
inds <- do.call(rbind, tmpl)
grid <- matrix(1, nrow = rows, ncol = cols)
grid[inds] <- 2
alpha <- matrix(1e-5, ncol = cols, nrow = rows)
alpha[inds] <- 1e-7
## image(grid, asp=1)
## Define grid configuration for POET model
grid_setup <- list(
pqc_in_file = "./barite_fgcs_2.pqi",
pqc_db_file = "../barite/db_barite.dat", ## database file
grid_def = grid, ## grid definition, IDs according to the Phreeqc input
grid_size = c(dim_cols, dim_rows), ## grid size in meters
constant_cells = c() ## IDs of cells with constant concentration
)
bound_length <- cols / 10
bound_N <- list(
"type" = rep("constant", bound_length),
"sol_id" = rep(3, bound_length),
"cell" = seq(1, bound_length)
)
bound_W <- list(
"type" = rep("constant", bound_length),
"sol_id" = rep(3, bound_length),
"cell" = seq(1, bound_length)
)
bound_E <- list(
"type" = rep("constant", bound_length),
"sol_id" = rep(4, bound_length),
"cell" = seq(rows - bound_length + 1, rows)
)
bound_S <- list(
"type" = rep("constant", bound_length),
"sol_id" = rep(4, bound_length),
"cell" = seq(cols - bound_length + 1, cols)
)
diffusion_setup <- list(
boundaries = list(
"W" = bound_W,
"N" = bound_N,
"E" = bound_E,
"S" = bound_S
),
alpha_x = alpha,
alpha_y = alpha
)
dht_species <- c(
"H" = 7,
"O" = 7,
"Ba" = 7,
"Cl" = 7,
"S" = 7,
"Sr" = 7,
"Barite" = 4,
"Celestite" = 4
)
pht_species <- c(
"Ba" = 4,
"Cl" = 3,
"S" = 3,
"Sr" = 3,
"Barite" = 0,
"Celestite" = 0
)
chemistry_setup <- list(
dht_species = dht_species,
pht_species = pht_species
)
## Define a setup list for simulation configuration
setup <- list(
Grid = grid_setup, ## Parameters related to the grid structure
Diffusion = diffusion_setup, ## Parameters related to the diffusion process
Chemistry = chemistry_setup
)


@@ -0,0 +1,49 @@
SOLUTION 1
units mol/kgw
water 1
temperature 25
pH 7.008
pe 10.798
S 6.205e-04
Sr 6.205e-04
END
SOLUTION 2
units mol/kgw
water 1
temperature 25
pH 7.008
pe 10.798
S 6.205e-04
Sr 6.205e-04
KINETICS 2
Barite
-m 0.00
-parms 50. # reactive surface area
-tol 1e-9
Celestite
-m 1
-parms 10.0 # reactive surface area
-tol 1e-9
END
SOLUTION 3
units mol/kgw
water 1
temperature 25
Ba 0.1
Cl 0.2
END
SOLUTION 4
units mol/kgw
water 1
temperature 25
Ba 0.2
Cl 0.4
END
RUN_CELLS
-cells 1 2 3 4
END


@@ -0,0 +1,7 @@
iterations <- 200
dt <- 1000
list(
timesteps = rep(dt, iterations),
store_result = TRUE
)

docs/POET.drawio Normal file

File diff suppressed because one or more lines are too long

docs/POET_scheme.svg Normal file

File diff suppressed because one or more lines are too long


@@ -1 +0,0 @@
-Subproject commit e6e5e0d5156c093241a53e6ce074ef346d64ae26

ext/litephreeqc Submodule

@@ -0,0 +1 @@
+Subproject commit 953c752431d2b2758268083f407f943843efc7ad

@@ -1 +1 @@
-Subproject commit 449647010ab9cdf9e405139f360424a2b21ab3ab
+Subproject commit 9c4aeee410c71d064f7567143d4f8d6451ade75a


@@ -1,7 +1,4 @@
-add_library(POETLib)
-target_sources(POETLib
-  PRIVATE
+add_library(POETLib
   Init/InitialList.cpp
   Init/GridInit.cpp
   Init/DiffusionInit.cpp
@@ -18,73 +15,30 @@ target_sources(POETLib
   Chemistry/SurrogateModels/ProximityHashTable.cpp
 )
-target_include_directories(
-  POETLib PUBLIC
-  "${CMAKE_CURRENT_SOURCE_DIR}"
-  "${CMAKE_CURRENT_BINARY_DIR}"
-)
-option(USE_AI_SURROGATE "Compiles the AI surrogate functions with Python.h integration" OFF)
-if(USE_AI_SURROGATE)
-  add_definitions(-DUSE_AI_SURROGATE)
-  message("Building with AI surrogate functions")
-  message(" -- Needs Python (with Numpy & Keras) and Threads libraries")
-  find_package(CUDA)
-  IF (${CUDA_FOUND})
-    message(" -- Setting TensorFlow CUDA path to: ${CUDA_TOOLKIT_ROOT_DIR}")
-  ELSE (${CUDA_FOUND})
-    message(" -- No CUDA installation found for TensorFlow ")
-  ENDIF (${CUDA_FOUND})
-  # make sure to use the python installation from the conda environment
-  if(DEFINED ENV{CONDA_PREFIX})
-    set(Python3_EXECUTABLE "$ENV{CONDA_PREFIX}/bin/python3")
-  endif()
-  find_package(Python3 COMPONENTS Interpreter Development NumPy REQUIRED)
-  find_package(Threads REQUIRED)
-  set_source_files_properties(
-    Chemistry/SurrogateModels/AI_functions.cpp
-    PROPERTIES COMPILE_FLAGS "-O3 -march=native -mtune=native"
-  )
-  target_sources(POETLib
-    PRIVATE
-    Chemistry/SurrogateModels/AI_functions.cpp
-  )
-  target_include_directories(POETLib PUBLIC
-    "${Python3_INCLUDE_DIRS}"
-    "${Python3_NumPy_INCLUDE_DIRS}"
-  )
-  target_link_libraries(POETLib
-    PUBLIC "${Python3_LIBRARIES}"
-    PUBLIC Threads::Threads pthread
-  )
-endif() # USE_AI_SURROGATE
+set(POET_TUG_APPROACH "Implicit" CACHE STRING "tug numerical approach to use")
+set_property(CACHE POET_TUG_APPROACH PROPERTY STRINGS "Implicit" "Explicit")
+if (POET_TUG_APPROACH STREQUAL "Implicit")
+  target_compile_definitions(POETLib PRIVATE POET_TUG_BTCS)
+elseif (POET_TUG_APPROACH STREQUAL "Explicit")
+  target_compile_definitions(POETLib PRIVATE POET_TUG_FTCS)
+endif()
+target_include_directories(POETLib PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}")
 target_link_libraries(
   POETLib
   PUBLIC RRuntime
   PUBLIC IPhreeqcPOET
   PUBLIC tug
   PUBLIC MPI::MPI_C
 )
-# Define Python API version
-target_compile_definitions(POETLib PRIVATE NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION)
 include(FetchContent)
 FetchContent_Declare(
   cli11
   QUIET
   GIT_REPOSITORY https://github.com/CLIUtils/CLI11.git
-  GIT_TAG v2.4.2
+  GIT_TAG v2.5.0
 )
 FetchContent_MakeAvailable(cli11)
@@ -132,11 +86,10 @@ target_compile_definitions(POETLib PUBLIC STRICT_R_HEADERS OMPI_SKIP_MPICXX)
 file(READ "${PROJECT_SOURCE_DIR}/R_lib/kin_r_library.R" R_KIN_LIB )
 file(READ "${PROJECT_SOURCE_DIR}/R_lib/init_r_lib.R" R_INIT_LIB)
-file(READ "${PROJECT_SOURCE_DIR}/R_lib/ai_surrogate_model_functions.R" R_AI_SURROGATE_LIB)
+file(READ "${PROJECT_SOURCE_DIR}/R_lib/ai_surrogate_model.R" R_AI_SURROGATE_LIB)
 configure_file(poet.hpp.in poet.hpp @ONLY)
 add_executable(poet poet.cpp)
 target_link_libraries(poet PRIVATE POETLib MPI::MPI_C RRuntime CLI11::CLI11)
 target_include_directories(poet PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")


@@ -14,10 +14,7 @@ struct WorkPackage {
   std::vector<std::vector<double>> output;
   std::vector<std::uint8_t> mapping;
-  WorkPackage(std::size_t _size) : size(_size) {
-    input.resize(size);
-    output.resize(size);
-    mapping.resize(size, CHEM_PQC);
-  }
+  WorkPackage(std::size_t _size)
+      : size(_size), input(size), output(size), mapping(size, CHEM_PQC) {}
 };
 } // namespace poet


@@ -1,11 +1,14 @@
 #include "ChemistryModule.hpp"
 #include "PhreeqcEngine.hpp"
+#include "PhreeqcMatrix.hpp"
+#include "PhreeqcRunner.hpp"
 #include "SurrogateModels/DHT_Wrapper.hpp"
 #include "SurrogateModels/Interpolation.hpp"
 #include <algorithm>
 #include <cassert>
+#include <cmath>
 #include <cstdint>
 #include <memory>
 #include <mpi.h>
@@ -63,7 +66,8 @@ inverseDistanceWeighting(const std::vector<std::int32_t> &to_calc,
       distance += std::pow(
           rescaled[key_comp_i][point_i] - rescaled[key_comp_i][data_set_n], 2);
     }
-    weights[point_i] = 1 / std::sqrt(distance);
+    weights[point_i] = distance != 0 ? 1 / std::sqrt(distance) : 0;
     assert(!std::isnan(weights[point_i]));
     inv_sum += weights[point_i];
   }
@@ -94,63 +98,9 @@ inverseDistanceWeighting(const std::vector<std::int32_t> &to_calc,
     key_delta /= inv_sum;
     results[output_comp_i] = from[output_comp_i] + key_delta;
-    assert(!std::isnan(results[output_comp_i]));
   }
-  // if (!has_h) {
-  //   double new_val = 0;
-  //   for (int j = 0; j < data_set_n; j++) {
-  //     new_val += weights[j] * output[j][0];
-  //   }
-  //   results[0] = new_val / inv_sum;
-  // }
-  // if (!has_h) {
-  //   double new_val = 0;
-  //   for (int j = 0; j < data_set_n; j++) {
-  //     new_val += weights[j] * output[j][1];
-  //   }
-  //   results[1] = new_val / inv_sum;
-  // }
-  // for (std::uint32_t i = 0; i < to_calc.size(); i++) {
-  //   const std::uint32_t interp_i = to_calc[i];
-  //   // rescale input between 0 and 1
-  //   for (int j = 0; j < input.size(); j++) {
-  //     buffer[j] = input[j].at(i);
-  //   }
-  //   buffer[buffer_size - 1] = from[interp_i];
-  //   const double min = *std::min_element(buffer, buffer + buffer_size);
-  //   const double max = *std::max_element(buffer, buffer + buffer_size);
-  //   for (int j = 0; j < input.size(); j++) {
-  //     buffer[j] = ((max - min) != 0 ? (buffer[j] - min) / (max - min) : 1);
-  //   }
-  //   from_rescaled =
-  //       ((max - min) != 0 ? (from[interp_i] - min) / (max - min) : 0);
-  //   double inv_sum = 0;
-  //   // calculate distances for each point
-  //   for (int i = 0; i < input.size(); i++) {
-  //     const double distance = std::pow(buffer[i] - from_rescaled, 2);
-  //     buffer[i] = distance > 0 ? (1 / std::sqrt(distance)) : 0;
-  //     inv_sum += buffer[i];
-  //   }
-  //   // calculate new values
-  //   double new_val = 0;
-  //   for (int i = 0; i < output.size(); i++) {
-  //     new_val += buffer[i] * output[i][interp_i];
-  //   }
-  //   results[interp_i] = new_val / inv_sum;
-  //   if (std::isnan(results[interp_i])) {
-  //     std::cout << "nan with new_val = " << output[0][i] << std::endl;
-  //   }
-  // }
   return results;
 }
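The guard introduced above avoids a division by zero when a query point coincides exactly with a stored key: a distance of 0 now yields weight 0 instead of Inf. A toy R version of the weighting, for illustration only (names are assumptions; the real code operates on rescaled key components):
## Toy inverse-distance weighting mirroring the guarded C++ loop above.
idw_weights <- function(points, query) {
  d2 <- colSums((t(points) - query)^2)   ## squared distance per stored key
  w <- ifelse(d2 != 0, 1 / sqrt(d2), 0)  ## zero-distance guard
  w / sum(w)                             ## normalization, as with inv_sum
}
pts <- rbind(c(0, 0), c(1, 1), c(0.2, 0.4))
idw_weights(pts, c(0.2, 0.4))            ## the exact match gets weight 0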
@@ -167,16 +117,12 @@ poet::ChemistryModule::ChemistryModule(
   this->n_cells = chem_params.total_grid_cells;
   if (!is_master) {
-    for (const auto &[pqc_id, pqc_config] : chem_params.pqc_config) {
-      this->phreeqc_instances[pqc_id] =
-          std::make_unique<PhreeqcEngine>(pqc_config);
-    }
-    // for (std::size_t i = 0; i < chem_params.pqc_ids.size(); i++) {
-    //   this->phreeqc_instances[chem_params.pqc_ids[i]] =
-    //       std::make_unique<PhreeqcWrapper>(
-    //           chem_params.database, chem_params.pqc_scripts[i],
-    //           chem_params.pqc_sol_order, chem_params.field_header, wp_size_);
-    // }
+    PhreeqcMatrix pqc_mat =
+        PhreeqcMatrix(chem_params.database, chem_params.pqc_script,
+                      chem_params.with_h0_o0, chem_params.with_redox);
+    this->pqc_runner =
+        std::make_unique<PhreeqcRunner>(pqc_mat.subset(chem_params.pqc_ids));
   }
 }
@@ -187,11 +133,10 @@ poet::ChemistryModule::~ChemistryModule() {
 }
 void poet::ChemistryModule::initializeDHT(
-    uint32_t size_mb, const NamedVector<std::uint32_t> &key_species) {
+    uint32_t size_mb, const NamedVector<std::uint32_t> &key_species,
+    bool has_het_ids) {
   constexpr uint32_t MB_FACTOR = 1E6;
-  this->dht_enabled = true;
   MPI_Comm dht_comm;
   if (this->is_master) {
@@ -221,7 +166,7 @@ void poet::ChemistryModule::initializeDHT(
     this->dht = new DHT_Wrapper(dht_comm, dht_size, map_copy, key_indices,
                                 this->prop_names, params.hooks,
-                                this->prop_count, interp_enabled);
+                                this->prop_count, interp_enabled, has_het_ids);
     this->dht->setBaseTotals(base_totals.at(0), base_totals.at(1));
   }
 }
@@ -312,9 +257,10 @@ void poet::ChemistryModule::initializeInterp(
   map_copy = this->dht->getKeySpecies();
   for (auto i = 0; i < map_copy.size(); i++) {
     const std::uint32_t signif =
-        static_cast<std::uint32_t>(map_copy[i]) - (map_copy[i] > InterpolationModule::COARSE_DIFF
-                                                       ? InterpolationModule::COARSE_DIFF
-                                                       : 0);
+        static_cast<std::uint32_t>(map_copy[i]) -
+        (map_copy[i] > InterpolationModule::COARSE_DIFF
+             ? InterpolationModule::COARSE_DIFF
+             : 0);
     map_copy[i] = signif;
   }
 }
@@ -372,6 +318,7 @@ void poet::ChemistryModule::unshuffleField(const std::vector<double> &in_buffer,
   }
 }
-void poet::ChemistryModule::set_ai_surrogate_validity_vector(std::vector<int> r_vector) {
+void poet::ChemistryModule::set_ai_surrogate_validity_vector(
+    std::vector<int> r_vector) {
   this->ai_surrogate_validity_vector = r_vector;
 }


@@ -8,10 +8,11 @@
 #include "ChemistryDefs.hpp"
 #include "Init/InitialList.hpp"
+#include "NameDouble.h"
 #include "SurrogateModels/DHT_Wrapper.hpp"
 #include "SurrogateModels/Interpolation.hpp"
-#include "PhreeqcEngine.hpp"
+#include "PhreeqcRunner.hpp"
 #include <array>
 #include <cstdint>
 #include <map>
@@ -75,9 +76,13 @@ public:
   struct SurrogateSetup {
     std::vector<std::string> prop_names;
+    std::array<double, 2> base_totals;
+    bool has_het_ids;
     bool dht_enabled;
     std::uint32_t dht_size_mb;
+    int dht_snaps;
+    std::string dht_out_dir;
     bool interp_enabled;
     std::uint32_t interp_bucket_size;
@@ -95,8 +100,15 @@ public:
     this->interp_enabled = setup.interp_enabled;
     this->ai_surrogate_enabled = setup.ai_surrogate_enabled;
+    this->base_totals = setup.base_totals;
     if (this->dht_enabled || this->interp_enabled) {
-      this->initializeDHT(setup.dht_size_mb, this->params.dht_species);
+      this->initializeDHT(setup.dht_size_mb, this->params.dht_species,
+                          setup.has_het_ids);
+      if (setup.dht_snaps != DHT_SNAPS_DISABLED) {
+        this->setDHTSnapshots(setup.dht_snaps, setup.dht_out_dir);
+      }
     }
     if (this->interp_enabled) {
@@ -222,8 +234,8 @@ public:
 };
 /**
  * **Master only** Set the ai surrogate validity vector from R
  */
 void set_ai_surrogate_validity_vector(std::vector<int> r_vector);
 std::vector<uint32_t> GetWorkerInterpolationCalls() const;
@@ -239,7 +251,8 @@ public:
 protected:
   void initializeDHT(uint32_t size_mb,
-                     const NamedVector<std::uint32_t> &key_species);
+                     const NamedVector<std::uint32_t> &key_species,
+                     bool has_het_ids);
   void setDHTSnapshots(int type, const std::string &out_dir);
   void setDHTReadFile(const std::string &input_file);
@@ -390,7 +403,7 @@ protected:
   const InitialList::ChemistryInit params;
-  std::map<int, std::unique_ptr<PhreeqcEngine>> phreeqc_instances;
+  std::unique_ptr<PhreeqcRunner> pqc_runner;
 };
 } // namespace poet


@ -1,43 +0,0 @@
import tensorflow as tf
import numpy as np
import os
os.environ["TF_XLA_FLAGS"] = "--tf_xla_cpu_global_jit"
os.environ["XLA_FLAGS"] = "--xla_gpu_cuda_data_dir=" + cuda_dir
def initiate_model(model_file_path):
print("AI: Model loaded from: " + model_file_path, flush=True)
model = tf.keras.models.load_model(model_file_path)
return model
def prediction_step(model, model_reactive, x, cluster_labels, batch_size):
# Catch input size mismatches
model_input_shape = model.input_shape[1:]
if model_input_shape != x.shape[1:]:
print(f"Input data size {x.shape[1:]} does not match model input size {model_input_shape}",
flush=True)
# Predict separately if clustering is used
if cluster_labels:
cluster_labels = np.asarray(cluster_labels, dtype=bool)
# Combine results
prediction = np.zeros_like(x)
prediction[cluster_labels] = model_reactive.predict(x[cluster_labels], batch_size)
prediction[~cluster_labels] = model.predict(x[~cluster_labels], batch_size)
else:
prediction = model.predict(x, batch_size)
return np.array(prediction, dtype=np.float64)
def get_weights(model):
weights = model.get_weights()
return weights
def training_step(model, x, y, batch_size, epochs,
output_file_path):
history = model.fit(x, y,
epochs=epochs,
batch_size=batch_size)
if output_file_path:
model.save(output_file_path)
return history


@ -1,757 +0,0 @@
#include <iostream>
#include <string>
#include <cstring>
#include <vector>
#include <Python.h>
#include <numpy/arrayobject.h>
#include <Eigen/Dense>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "AI_functions.hpp"
#include "poet.hpp"
using namespace std;
namespace poet {
/**
* @brief Loads the Python interpreter and functions
* @param functions_file_path Path to the Python file where the AI surrogate
* functions are defined
* @return 0 if function was successful
*/
int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir) {
// Initialize Python functions
Py_Initialize();
// Import numpy functions
_import_array();
PyRun_SimpleString(("cuda_dir = \"" + cuda_src_dir + "\"").c_str()) ;
FILE* fp = fopen(functions_file_path.c_str(), "r");
int py_functions_initialized = PyRun_SimpleFile(fp, functions_file_path.c_str());
fclose(fp);
if (py_functions_initialized != 0) {
PyErr_Print();
throw std::runtime_error(std::string("AI surrogate Python functions could not be loaded." ) +
"Are tensorflow and numpy installed?");
}
return py_functions_initialized;
}
/**
* @brief Loads the user-supplied Keras model
* @param model_file_path Path to a .keras file that the user must supply as
* a variable "model_file_path" in the R input script
* @return 0 if function was succesful
*/
int Python_Keras_load_model(std::string model, std::string model_reactive, bool use_clustering) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Initialize Keras default model
int py_model_loaded = PyRun_SimpleString(("model = initiate_model(\"" +
model + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print();
throw std::runtime_error("Keras model could not be loaded from: " + model);
}
if (use_clustering) {
// Initialize second Keras model that will be used for the "reaction" cluster
py_model_loaded = PyRun_SimpleString(("model_reactive = initiate_model(\"" +
model_reactive + "\")").c_str());
if (py_model_loaded != 0) {
PyErr_Print();
throw std::runtime_error("Keras model could not be loaded from: " + model_reactive);
}
}
// Release the Python GIL
PyGILState_Release(gstate);
return py_model_loaded;
}
/**
* @brief Converts the std::vector 2D matrix representation of a POET Field object to a numpy array
* for use in the Python AI surrogate functions
* @param field 2D-Matrix with the content of a Field object
* @return Numpy representation of the input vector
*/
PyObject* vector_to_numpy_array(const std::vector<std::vector<double>>& field) {
npy_intp dims[2] = {static_cast<npy_intp>(field[0].size()),
static_cast<npy_intp>(field.size())};
PyObject* np_array = PyArray_SimpleNew(2, dims, NPY_FLOAT64);
double* data = static_cast<double*>(PyArray_DATA((PyArrayObject*)np_array));
// write field data to numpy array
for (size_t i = 0; i < field.size(); ++i) {
for (size_t j = 0; j < field[i].size(); ++j) {
data[j * field.size() + i] = field[i][j];
}
}
return np_array;
}
/**
* @brief Converts a Pyton matrix object to a std::vector vector
* @param py_matrix Pyobject that must be a 2D matrix
* @result Vector that can be used similar to the return value of the Field object
* Field.AsVector() method.
*/
std::vector<double> numpy_array_to_vector(PyObject* py_array) {
std::vector<double> result;
if (!PyArray_Check(py_array)) {
std::cerr << "The model's output is not a numpy array." << std::endl;
return result;
}
// Cast generic PyObject to PyArrayObject
PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(py_array);
// Get shape
int numDims = PyArray_NDIM(np_array);
npy_intp* shape = PyArray_SHAPE(np_array);
if (numDims != 2) {
std::cerr << "The model's predictions are not a 2D matrix." << std::endl;
return result;
}
// Copy data into std::vector format
double* data = static_cast<double*>(PyArray_DATA(np_array));
npy_intp size = PyArray_SIZE(np_array);
result.resize(size);
std::copy(data, data + size, result.begin());
return result;
}
/**
* @brief Uses the Python Keras functions to calculate predictions from a neural network.
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
std::vector<int>& cluster_labels) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Prepare data for Python
PyObject* py_df_x = vector_to_numpy_array(x);
// Prepare cluster label vector for Python
PyObject* py_cluster_list = PyList_New(cluster_labels.size());
for (size_t i = 0; i < cluster_labels.size(); i++) {
PyObject* py_int = PyLong_FromLong(cluster_labels[i]);
PyList_SET_ITEM(py_cluster_list, i, py_int);
}
// Get the model and inference function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, "model");
PyObject* py_inference_function = PyDict_GetItemString(py_global_dict, "prediction_step");
// Get second model if clustering is used
PyObject* py_keras_model_reactive = Py_None;
if (cluster_labels.size() > 0) {
py_keras_model_reactive = PyDict_GetItemString(py_global_dict, "model_reactive");
}
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOOi)",
py_keras_model, py_keras_model_reactive, py_df_x, py_cluster_list, batch_size);
// Call the Python inference function
PyObject* py_predictions = PyObject_CallObject(py_inference_function, args);
// Check py_rv and return as 2D vector
std::vector<double> predictions = numpy_array_to_vector(py_predictions);
// Clean up
PyErr_Print();
Py_XDECREF(py_df_x);
Py_XDECREF(py_cluster_list);
Py_XDECREF(args);
Py_XDECREF(py_predictions);
// Release the Python GIL
PyGILState_Release(gstate);
return predictions;
}
/**
* @brief Uses Eigen for fast inference with the weights and biases of a neural network.
* This function assumes ReLU activation for each layer.
* @param input_batch Batch of input data that must fit the size of the neural networks input layer
* @param model Struct of aligned Eigen vectors that hold the neural networks weights and biases.
* Only supports simple fully connected feed forward networks.
* @return The batch of predictions made with the neural network weights and biases and the data
* in input_batch
*/
Eigen::MatrixXd eigen_inference_batched(const Eigen::Ref<const Eigen::MatrixXd>& input_batch, const EigenModel& model) {
Eigen::MatrixXd current_layer = input_batch;
// Process all hidden layers
for (size_t layer = 0; layer < model.weight_matrices.size() - 1; ++layer) {
current_layer = (model.weight_matrices[layer] * current_layer);
current_layer = current_layer.colwise() + model.biases[layer];
current_layer = current_layer.array().max(0.0);
}
// Process output layer (without ReLU)
size_t output_layer = model.weight_matrices.size() - 1;
return (model.weight_matrices[output_layer] * current_layer).colwise() + model.biases[output_layer];
}
/**
* @brief Uses the Eigen representation of the two different Keras model weights for fast inference
* @param model The model for the non reactive cluster of the field (label 0)
* @param model_reactive The model for the non reactive cluster of the field (label 1)
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @param Eigen_model_mutex Mutex that locks the model during inference and prevents updates from
* the training thread
* @param cluster_labels K-Means cluster label for each row in the field
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Eigen_predict_clustered(const EigenModel& model, const EigenModel& model_reactive,
std::vector<std::vector<double>>& x, int batch_size,
std::mutex* Eigen_model_mutex, std::vector<int>& cluster_labels) {
const int num_samples = x[0].size();
const int num_features = x.size();
if (num_features != model.weight_matrices[0].cols() ||
num_features != model_reactive.weight_matrices[0].cols()) {
throw std::runtime_error("Input data size " + std::to_string(num_features) +
" does not match model input layer sizes" + std::to_string(model.weight_matrices[0].cols()) +
" / " + std::to_string(model_reactive.weight_matrices[0].cols()));
}
// Convert input data to Eigen matrix
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
for (size_t i = 0; i < num_samples; ++i) {
for (size_t j = 0; j < num_features; ++j) {
full_input_matrix(j, i) = x[j][i];
}
}
// Create indices for each cluster
std::vector<int> cluster_0_indices, cluster_1_indices;
for (size_t i = 0; i < cluster_labels.size(); ++i) {
if (cluster_labels[i] == 0) {
cluster_0_indices.push_back(i);
} else {
cluster_1_indices.push_back(i);
}
}
// Prepare matrices for each cluster
Eigen::MatrixXd input_matrix(num_features, cluster_0_indices.size());
Eigen::MatrixXd input_matrix_reactive(num_features, cluster_1_indices.size());
// Split data according to cluster labels
for (size_t i = 0; i < cluster_0_indices.size(); ++i) {
input_matrix.col(i) = full_input_matrix.col(cluster_0_indices[i]);
}
for (size_t i = 0; i < cluster_1_indices.size(); ++i) {
input_matrix_reactive.col(i) = full_input_matrix.col(cluster_1_indices[i]);
}
// Process each cluster
std::vector<double> result(num_samples * model.weight_matrices.back().rows());
Eigen_model_mutex->lock();
if (!cluster_0_indices.empty()) {
int num_batches_0 = std::ceil(static_cast<double>(cluster_0_indices.size()) / batch_size);
for (int batch = 0; batch < num_batches_0; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, static_cast<int>(cluster_0_indices.size()));
int current_batch_size = end_idx - start_idx;
Eigen::MatrixXd batch_data = input_matrix.block(0, start_idx, num_features, current_batch_size);
Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model);
// Store results in their original positions
for (size_t i = 0; i < current_batch_size; ++i) {
int original_idx = cluster_0_indices[start_idx + i];
for (size_t j = 0; j < batch_result.rows(); ++j) {
result[original_idx * batch_result.rows() + j] = batch_result(j, i);
}
}
}
}
// Process cluster 1
if (!cluster_1_indices.empty()) {
int num_batches_1 = std::ceil(static_cast<double>(cluster_1_indices.size()) / batch_size);
for (int batch = 0; batch < num_batches_1; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, static_cast<int>(cluster_1_indices.size()));
int current_batch_size = end_idx - start_idx;
Eigen::MatrixXd batch_data = input_matrix_reactive.block(0, start_idx, num_features, current_batch_size);
Eigen::MatrixXd batch_result = eigen_inference_batched(batch_data, model_reactive);
// Store results in their original positions
for (size_t i = 0; i < current_batch_size; ++i) {
int original_idx = cluster_1_indices[start_idx + i];
for (size_t j = 0; j < batch_result.rows(); ++j) {
result[original_idx * batch_result.rows() + j] = batch_result(j, i);
}
}
}
}
Eigen_model_mutex->unlock();
return result;
}
/**
* @brief Uses the Eigen representation of the Keras model weights for fast inference
* @param model The model weights and biases
* @param x 2D-Matrix with the content of a Field object
* @param batch_size size for mini-batches that are used in the Keras model.predict() method
* @param Eigen_model_mutex Mutex that locks the model during inference and prevents updates from
* the training thread
* @return Predictions that the neural network made from the input values x. The predictions are
* represented as a vector similar to the representation from the Field.AsVector() method
*/
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
std::mutex* Eigen_model_mutex) {
// Convert input data to Eigen matrix
const int num_samples = x[0].size();
const int num_features = x.size();
Eigen::MatrixXd full_input_matrix(num_features, num_samples);
for (size_t i = 0; i < num_samples; ++i) {
for (size_t j = 0; j < num_features; ++j) {
full_input_matrix(j, i) = x[j][i];
}
}
std::vector<double> result;
result.reserve(num_samples * num_features);
if (num_features != model.weight_matrices[0].cols()) {
throw std::runtime_error("Input data size " + std::to_string(num_features) + \
" does not match model input layer of size " + std::to_string(model.weight_matrices[0].cols()));
}
int num_batches = std::ceil(static_cast<double>(num_samples) / batch_size);
Eigen_model_mutex->lock();
for (int batch = 0; batch < num_batches; ++batch) {
int start_idx = batch * batch_size;
int end_idx = std::min((batch + 1) * batch_size, num_samples);
int current_batch_size = end_idx - start_idx;
// Extract the current input data batch
Eigen::MatrixXd batch_data(num_features, current_batch_size);
batch_data = full_input_matrix.block(0, start_idx, num_features, current_batch_size);
// Predict
batch_data = eigen_inference_batched(batch_data, model);
result.insert(result.end(), batch_data.data(), batch_data.data() + batch_data.size());
}
Eigen_model_mutex->unlock();
return result;
}
/**
* @brief Appends data from one matrix (column major std::vector<std::vector<double>>) to another
* @param training_data_buffer Matrix that the values are appended to
* @param new_values Matrix that is appended
*/
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
std::vector<std::vector<double>>& new_values) {
// Initialize training data buffer if empty
if (training_data_buffer.size() == 0) {
training_data_buffer = new_values;
} else { // otherwise append
for (size_t col = 0; col < training_data_buffer.size(); col++) {
training_data_buffer[col].insert(training_data_buffer[col].end(),
new_values[col].begin(), new_values[col].end());
}
}
}
/**
* @brief Appends data from one int vector to another based on a mask vector
* @param labels Vector that the values are appended to
* @param new_labels Values that are appended
* @param validity Mask vector that defines how many and which values are appended
*/
void cluster_labels_append(std::vector<int>& labels_buffer, std::vector<int>& new_labels,
std::vector<int> validity) {
// Calculate new buffer size from number of valid elements in mask
int n_invalid = validity.size();
for (size_t i = 0; i < validity.size(); i++) {
n_invalid -= validity[i];
}
// Resize label vector to hold non valid elements
int end_index = labels_buffer.size();
int new_size = end_index + n_invalid;
labels_buffer.resize(new_size);
// Iterate over mask to transfer cluster labels
for (size_t i = 0; i < validity.size(); ++i) {
// Append only the labels of invalid rows
if (!validity[i]) {
labels_buffer[end_index] = new_labels[i];
end_index++;
}
}
}
/**
* @brief Uses the Python environment with Keras' default functions to train the model
* @param x Training data features
* @param y Training data targets
* @param params Global runtime parameters
*/
void Python_Keras_train(std::vector<std::vector<double>>& x, std::vector<std::vector<double>>& y,
int train_cluster, std::string model_name, const RuntimeParameters& params) {
// Prepare data for python
PyObject* py_df_x = vector_to_numpy_array(x);
PyObject* py_df_y = vector_to_numpy_array(y);
// Make sure that model output file name .keras file
std::string model_path = params.save_model_path;
if (!model_path.empty()) {
if (model_path.length() >= 6 && model_path.substr(model_path.length() - 6) != ".keras") {
model_path += ".keras";
}
}
// Choose the correct model to train if clustering is used
if (train_cluster == 1) {
if (!model_path.empty()) {
model_path.insert(model_path.length() - 6, "_reaction");
std::cout << "MODEL SAVED AS:" << model_path << std::endl;
}
}
// Get the model and training function from the global python interpreter
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, model_name.c_str());
PyObject* py_training_function = PyDict_GetItemString(py_global_dict, "training_step");
// Build the function arguments as four python objects and an integer
PyObject* args = Py_BuildValue("(OOOiis)",
py_keras_model, py_df_x, py_df_y, params.batch_size, params.training_epochs,
model_path.c_str());
// Call the Python training function
PyObject *py_rv = PyObject_CallObject(py_training_function, args);
// Clean up
PyErr_Print();
Py_DECREF(py_df_x);
Py_DECREF(py_df_y);
Py_DECREF(args);
}
/**
* @brief Function for threadsafe parallel training and weight updating.
* The function waits conditionally until the training data buffer is full.
* It then clears the buffer and starts training, after training it writes the new weights to
* the Eigen model.
* @param Eigen_model Pointer to the EigenModel struct that will be updated with new weights
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
* @param training_data_buffer Pointer to the Training data struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
* @param training_data_buffer_full Conditional waiting variable with which the main thread signals
* when a training run can start
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
* @param end_training Signals end of program to wind down thread gracefully
* @param params Global runtime parameters
* @return 0 if function was successful
*/
void parallel_training(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params) {
while (true) {
// Conditional waiting:
// - Sleeps until a signal arrives on training_data_buffer_full
// - Releases the lock on training_data_buffer_mutex while sleeping
// - Lambda function with start_training checks if it was a spurious wakeup
// - Reaquires the lock on training_data_buffer_mutex after waking up
// - If start_training has been set to true while the thread was active, it does NOT
// wait for a signal on training_data_buffer_full but starts the next round immediately.
std::unique_lock<std::mutex> lock(*training_data_buffer_mutex);
training_data_buffer_full->wait(lock, [start_training] { return *start_training;});
// Return if program is about to end
if (*end_training) {
return;
}
// Get the necessary training data
std::cout << "AI: Training thread: Getting training data" << std::endl;
// Initialize training data input and targets
std::vector<std::vector<double>> inputs(training_data_buffer->x.size(),
std::vector<double>(params.training_data_size));
std::vector<std::vector<double>> targets(training_data_buffer->x.size(),
std::vector<double>(params.training_data_size));
int buffer_size = training_data_buffer->x[0].size();
// If clustering is used, check the current cluster
int n_cluster_reactive = 0;
int train_cluster = -1; // Default value for non clustered training (all data is used)
if (params.use_clustering) {
for (size_t i = 0; i < buffer_size; i++) {
n_cluster_reactive += training_data_buffer->cluster_labels[i];
}
train_cluster = n_cluster_reactive >= params.training_data_size;
}
int buffer_row = 0;
int copied_row = 0;
while (copied_row < params.training_data_size) {
if ((train_cluster == -1) ||
(train_cluster == training_data_buffer->cluster_labels[buffer_row])) {
for (size_t col = 0; col < training_data_buffer->x.size(); col++) {
// Copy and remove from training data buffer
inputs[col][copied_row] = training_data_buffer->x[col][buffer_row];
targets[col][copied_row] = training_data_buffer->y[col][buffer_row];
training_data_buffer->x[col].erase(training_data_buffer->x[col].begin() +
buffer_row);
training_data_buffer->y[col].erase(training_data_buffer->y[col].begin() +
buffer_row);
}
// Remove from cluster label buffer
if (params.use_clustering) {
training_data_buffer->cluster_labels.erase(
training_data_buffer->cluster_labels.begin() + buffer_row);
}
copied_row++;
} else {
buffer_row++;
}
}
// Set the waiting predicate to immediately continue training if enough elements of any cluster remain
if (train_cluster == 1) {
*start_training = ((n_cluster_reactive - params.training_data_size) >= params.training_data_size) ||
((buffer_size - n_cluster_reactive) >= params.training_data_size);
} else {
*start_training = (buffer_size - n_cluster_reactive - params.training_data_size)
>= params.training_data_size;
}
//update number of training runs
training_data_buffer->n_training_runs += 1;
// Unlock the training_data_buffer_mutex
lock.unlock();
std::string model_name = "model";
if (train_cluster == 1) {
model_name = "model_reactive";
}
std::cout << "AI: Training thread: Start training " << model_name << std::endl;
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Start training
Python_Keras_train(inputs, targets, train_cluster, model_name, params);
if (!params.use_Keras_predictions) {
std::cout << "AI: Training thread: Update shared model weights" << std::endl;
std::vector<std::vector<std::vector<double>>> cpp_weights = Python_Keras_get_weights(model_name);
Eigen_model_mutex->lock();
if (train_cluster == 1) {
update_weights(Eigen_model_reactive, cpp_weights);
} else {
update_weights(Eigen_model, cpp_weights);
}
Eigen_model_mutex->unlock();
}
// Release the Python GIL
PyGILState_Release(gstate);
std::cout << "AI: Training thread: Finished training, waiting for new data" << std::endl;
}
}
std::thread python_train_thread;
/**
* @brief Starts a thread for parallel training and weight updating. This Wrapper function
* ensures that the main POET program can be built without pthread support if the AI
* surrogate functions are disabled during compilation.
* @param Eigen_model Pointer to the EigenModel struct that will be updated with new weights
* @param Eigen_model_mutex Mutex to ensure threadsafe access to the EigenModel struct
* @param training_data_buffer Pointer to the Training data struct with which the model is trained
* @param training_data_buffer_mutex Mutex to ensure threadsafe access to the training data struct
* @param training_data_buffer_full Conditional waiting variable with which the main thread signals
* when a training run can start
* @param start_training Conditional waiting predicate to mitigate against spurious wakeups
* @param end_training Signals end of program to wind down thread gracefully
* @param params Global runtime parameters
* @return 0 if function was successful
*/
int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params) {
PyThreadState *_save = PyEval_SaveThread();
python_train_thread = std::thread(parallel_training, Eigen_model, Eigen_model_reactive,
Eigen_model_mutex, training_data_buffer,
training_data_buffer_mutex, training_data_buffer_full,
start_training, end_training, params);
return 0;
}
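For orientation, here is a producer-side sketch of the handshake this wrapper sets up. The helper below is hypothetical (not part of POET); the pointer parameters are the same objects handed to Python_Keras_training_thread(), and `threshold` stands in for params.training_data_size.

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical producer side: append one batch of column-major training data
// and wake the training thread.
void submit_batch(poet::TrainingData *buf, std::mutex *buf_mutex,
                  std::condition_variable *buf_full, bool *start_training,
                  const std::vector<std::vector<double>> &x_batch,
                  const std::vector<std::vector<double>> &y_batch,
                  std::size_t threshold) {
  {
    std::lock_guard<std::mutex> lock(*buf_mutex);
    for (std::size_t col = 0; col < x_batch.size(); col++) {
      buf->x[col].insert(buf->x[col].end(), x_batch[col].begin(),
                         x_batch[col].end());
      buf->y[col].insert(buf->y[col].end(), y_batch[col].begin(),
                         y_batch[col].end());
    }
    // The predicate guards the condition variable against spurious wakeups.
    *start_training = !buf->x.empty() && buf->x[0].size() >= threshold;
  }
  buf_full->notify_one();
}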
/**
 * @brief Updates the EigenModel's weights and biases from the weight vector
 * @param model Pointer to an EigenModel struct
 * @param weights Vector of model weights from Keras as returned by Python_Keras_get_weights()
 */
void update_weights(EigenModel* model,
const std::vector<std::vector<std::vector<double>>>& weights) {
for (size_t i = 0; i < weights.size(); i += 2) {
// Fill current weight matrix
size_t rows = weights[i][0].size();
size_t cols = weights[i].size();
for (size_t j = 0; j < cols; ++j) {
for (size_t k = 0; k < rows; ++k) {
model->weight_matrices[i / 2](k, j) = weights[i][j][k];
}
}
// Fill bias vector
size_t bias_size = weights[i + 1][0].size();
for (size_t j = 0; j < bias_size; ++j) {
model->biases[i / 2](j) = weights[i + 1][0][j];
}
}
}
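The index arithmetic above encodes a convention worth spelling out: `weights` alternates kernels and biases (layer i's kernel at index 2i, its bias at 2i+1), and the Eigen matrix is filled transposed, so a Keras kernel of shape (in, out) becomes an (out x in) Eigen matrix suitable for y = W * x on column vectors. A small self-contained check of that mapping (the numbers are arbitrary):

// Standalone sketch of the kernel transpose performed by update_weights():
// a kernel of shape (in = 2, out = 3) ends up as a 3x2 Eigen matrix.
#include <Eigen/Dense>
#include <cassert>
#include <vector>

int main() {
  // Kernel as returned by Python_Keras_get_weights(): [in][out]
  std::vector<std::vector<double>> kernel = {{1, 2, 3}, {4, 5, 6}};
  Eigen::MatrixXd W(kernel[0].size(), kernel.size()); // (out x in)
  for (size_t j = 0; j < kernel.size(); ++j)
    for (size_t k = 0; k < kernel[0].size(); ++k)
      W(k, j) = kernel[j][k]; // same assignment as in update_weights()
  assert(W(0, 1) == 4.0); // kernel[in=1][out=0] lands at Eigen (0, 1)
  Eigen::Vector2d x(1.0, 1.0);
  Eigen::Vector3d y = W * x; // one dense layer, bias omitted
  assert(y(0) == 5.0);
  return 0;
}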
/**
 * @brief Converts the weights and biases of the Python Keras model to C++ vectors
 * @param model_name Name of the Keras model object in the Python __main__ namespace
 * @return A vector containing the model weights and biases
 */
std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string model_name) {
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
PyObject* py_main_module = PyImport_AddModule("__main__");
PyObject* py_global_dict = PyModule_GetDict(py_main_module);
PyObject* py_keras_model = PyDict_GetItemString(py_global_dict, model_name.c_str());
PyObject* py_get_weights_function = PyDict_GetItemString(py_global_dict, "get_weights");
PyObject* args = Py_BuildValue("(O)", py_keras_model);
// Call Python function
PyObject* py_weights_list = PyObject_CallObject(py_get_weights_function, args);
if (!py_weights_list) {
PyErr_Print();
throw std::runtime_error("Failed to get weights from Keras model");
}
// Container for the extracted weights
std::vector<std::vector<std::vector<double>>> cpp_weights;
// Iterate through the layers (weights and biases)
Py_ssize_t num_layers = PyList_Size(py_weights_list);
for (Py_ssize_t i = 0; i < num_layers; ++i) {
PyObject* py_weight_array = PyList_GetItem(py_weights_list, i);
if (!PyArray_Check(py_weight_array)) {
throw std::runtime_error("Weight is not a NumPy array.");
}
PyArrayObject* weight_np = reinterpret_cast<PyArrayObject*>(py_weight_array);
int dtype = PyArray_TYPE(weight_np);
// If array is 2D it's a weight matrix
if (PyArray_NDIM(weight_np) == 2) {
      const size_t num_rows = PyArray_DIM(weight_np, 0);
      const size_t num_cols = PyArray_DIM(weight_np, 1);
std::vector<std::vector<double>> weight_matrix(num_rows, std::vector<double>(num_cols));
// Handle different precision settings
if (dtype == NPY_FLOAT32) {
float* weight_data_float = static_cast<float*>(PyArray_DATA(weight_np));
for (size_t r = 0; r < num_rows; ++r) {
for (size_t c = 0; c < num_cols; ++c) {
weight_matrix[r][c] = static_cast<double>(weight_data_float[r * num_cols + c]);
}
}
} else if (dtype == NPY_DOUBLE) {
double* weight_data_double = static_cast<double*>(PyArray_DATA(weight_np));
for (size_t r = 0; r < num_rows; ++r) {
for (size_t c = 0; c < num_cols; ++c) {
weight_matrix[r][c] = weight_data_double[r * num_cols + c];
}
}
} else {
throw std::runtime_error("Unsupported data type for weights. Must be NPY_FLOAT32 or NPY_DOUBLE.");
}
cpp_weights.push_back(weight_matrix);
// If array is 1D it's a bias vector
} else if (PyArray_NDIM(weight_np) == 1) {
      const size_t num_elements = PyArray_DIM(weight_np, 0);
std::vector<std::vector<double>> bias_vector(1, std::vector<double>(num_elements));
// Handle different precision settings
if (dtype == NPY_FLOAT32) {
float* bias_data_float = static_cast<float*>(PyArray_DATA(weight_np));
for (size_t j = 0; j < num_elements; ++j) {
bias_vector[0][j] = static_cast<double>(bias_data_float[j]);
}
} else if (dtype == NPY_DOUBLE) {
double* bias_data_double = static_cast<double*>(PyArray_DATA(weight_np));
for (size_t j = 0; j < num_elements; ++j) {
bias_vector[0][j] = bias_data_double[j];
}
} else {
throw std::runtime_error("Unsupported data type for biases. Must be NPY_FLOAT32 or NPY_DOUBLE.");
}
cpp_weights.push_back(bias_vector);
}
}
// Clean up
Py_DECREF(py_weights_list);
Py_DECREF(args);
// Release Python GIL
PyGILState_Release(gstate);
return cpp_weights;
}
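As the loop above implies, the returned vector alternates kernels and biases: index 2i holds layer i's kernel as [rows][cols], index 2i+1 its bias as a single-row "matrix". A minimal consistency check a hypothetical caller could run before handing the result to update_weights():

#include <cassert>
#include <vector>

// Hypothetical caller: verify the kernel/bias alternation produced by
// Python_Keras_get_weights().
void check_weight_layout(
    const std::vector<std::vector<std::vector<double>>> &w) {
  assert(w.size() % 2 == 0); // kernel at 2i, bias at 2i + 1
  for (std::size_t i = 0; i + 1 < w.size(); i += 2) {
    assert(w[i + 1].size() == 1);                 // bias stored as one row
    assert(w[i + 1][0].size() == w[i][0].size()); // bias length == out dim
  }
}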
/**
 * @brief Joins the training thread and winds down the Python environment gracefully
 * @param Eigen_model_mutex Mutex to ensure thread-safe access to the EigenModel struct
 * @param training_data_buffer_mutex Mutex to ensure thread-safe access to the training data struct
 * @param training_data_buffer_full Condition variable with which the main thread signals
 * when a training run can start
 * @param start_training Condition-variable predicate to mitigate against spurious wakeups
 * @param end_training Signals end of program to wind down the thread gracefully */
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training) {
training_data_buffer_mutex->lock();
// Define training as over
*end_training = true;
// Wake up and join training thread
*start_training = true;
training_data_buffer_mutex->unlock();
training_data_buffer_full->notify_one();
if (python_train_thread.joinable()) {
python_train_thread.join();
}
// Acquire the Python GIL
PyGILState_STATE gstate = PyGILState_Ensure();
// Finalize Python
Py_FinalizeEx();
}
} //namespace poet

View File

@ -1,112 +0,0 @@
/**
 * @file AI_functions.hpp
 * @author Hans Straile (straile@uni-potsdam.de)
 * @brief API for the AI/machine learning based chemistry surrogate model, with functions to initialize a neural network and use it for training and inference via Keras for Python.
 * @version 0.1
 * @date 01 Nov 2024
 *
 * This file implements functions to train and predict with a neural network.
 * All functions are based on a user-supplied Keras model. Python.h is used for model
 * training with Keras and can be used for inference. The default inference function
 * is implemented with Eigen matrices in C++. All functions use two different models
 * to process data separately according to a K-means cluster assignment. This file
 * also contains the functions for the K-means algorithm.
 *
 */
#ifndef AI_FUNCTIONS_H
#define AI_FUNCTIONS_H
#include <string>
#include <vector>
#include "poet.hpp"
// PhreeqC definition of pi clashes with Eigen macros
// so we have to temporarily undef it
#pragma push_macro("pi")
#undef pi
#include <Eigen/Dense>
#pragma pop_macro("pi")
namespace poet {
struct EigenModel {
// Custom struct for the Keras weight matrices and
// bias vectors
std::vector<Eigen::MatrixXd> weight_matrices;
std::vector<Eigen::VectorXd> biases;
};
struct TrainingData {
std::vector<std::vector<double>> x;
std::vector<std::vector<double>> y;
std::vector<int> cluster_labels;
int n_training_runs = 0;
};
// Only declare the actual functions if the flag is set
#ifdef USE_AI_SURROGATE
int Python_Keras_setup(std::string functions_file_path, std::string cuda_src_dir);
void Python_finalize(std::mutex* Eigen_model_mutex, std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full, bool* start_training,
bool* end_training);
int Python_Keras_load_model(std::string model, std::string model_reactive,
bool use_clustering);
std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>& x, int batch_size,
std::vector<int>& cluster_labels);
void training_data_buffer_append(std::vector<std::vector<double>>& training_data_buffer,
std::vector<std::vector<double>>& new_values);
void cluster_labels_append(std::vector<int>& labels, std::vector<int>& new_labels,
std::vector<int> validity);
int Python_Keras_training_thread(EigenModel* Eigen_model, EigenModel* Eigen_model_reactive,
std::mutex* Eigen_model_mutex,
TrainingData* training_data_buffer,
std::mutex* training_data_buffer_mutex,
std::condition_variable* training_data_buffer_full,
bool* start_training, bool* end_training,
const RuntimeParameters& params);
void update_weights(EigenModel* model, const std::vector<std::vector<std::vector<double>>>& weights);
std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string model_name);
std::vector<double> Eigen_predict_clustered(const EigenModel& model, const EigenModel& model_reactive,
std::vector<std::vector<double>>& x,
int batch_size, std::mutex* Eigen_model_mutex,
std::vector<int>& cluster_labels);
std::vector<double> Eigen_predict(const EigenModel& model, std::vector<std::vector<double>> x, int batch_size,
std::mutex* Eigen_model_mutex);
// Otherwise, define the necessary stubs
#else
inline int Python_Keras_setup(std::string, std::string){return 0;}
inline void Python_finalize(std::mutex*, std::mutex*, std::condition_variable*, bool*, bool*){}
inline int Python_Keras_load_model(std::string, std::string, bool){return 0;}
inline std::vector<double> Python_Keras_predict(std::vector<std::vector<double>>&, int,
std::vector<int>&){return {};}
inline void training_data_buffer_append(std::vector<std::vector<double>>&,
std::vector<std::vector<double>>&){}
inline void cluster_labels_append(std::vector<int>&, std::vector<int>&, std::vector<int>){}
inline int Python_Keras_training_thread(EigenModel*, EigenModel*, std::mutex*,
TrainingData*, std::mutex*, std::condition_variable*,
bool*, bool*, const RuntimeParameters&){return {};}
inline void update_weights(EigenModel*, const std::vector<std::vector<std::vector<double>>>&){}
inline std::vector<std::vector<std::vector<double>>> Python_Keras_get_weights(std::string){return {};}
inline std::vector<double> Eigen_predict_clustered(const EigenModel&, const EigenModel&,
std::vector<std::vector<double>>&, int,
std::mutex*, std::vector<int>&){return {};}
inline std::vector<double> Eigen_predict(const EigenModel&, std::vector<std::vector<double>>, int,
std::mutex*){return {};}
#endif
} // namespace poet
#endif // AI_FUNCTIONS_H

View File

@ -105,6 +105,8 @@ DHT *DHT_create(MPI_Comm comm, uint64_t size, unsigned int data_size,
   object->index_count = 9 - (index_bytes / 8);
   object->index = (uint64_t *)malloc((object->index_count) * sizeof(uint64_t));
   object->mem_alloc = mem_alloc;
+  object->sum_idx = 0;
+  object->cnt_idx = 0;
 
   // if set, initialize dht_stats
 #ifdef DHT_STATISTICS
@ -189,6 +191,9 @@ int DHT_write_accumulate(DHT *table, const void *send_key, int data_size,
     }
   }
 
+  table->cnt_idx += 1;
+  table->sum_idx += (i + 1);
+
   if (result == DHT_WRITE_SUCCESS_WITH_EVICTION) {
     memset((char *)table->send_entry + 1 + table->key_size, '\0',
            table->data_size);
@ -279,6 +284,9 @@ int DHT_write(DHT *table, void *send_key, void *send_data, uint32_t *proc,
     }
   }
 
+  table->cnt_idx += 1;
+  table->sum_idx += (i + 1);
+
   // put data to DHT (with last selected index by value i)
   if (MPI_Put(table->send_entry, 1 + table->data_size + table->key_size,
               MPI_BYTE, dest_rank, table->index[i],
@ -550,6 +558,41 @@ int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter) {
   return DHT_SUCCESS;
 }
 
+float DHT_get_used_idx_factor(DHT *table, int with_reset) {
+  int rank;
+  MPI_Comm_rank(table->communicator, &rank);
+
+  float my_avg_idx = (float)table->sum_idx / (float)table->cnt_idx;
+  float max_mean_index;
+
+  MPI_Reduce(&my_avg_idx, &max_mean_index, 1, MPI_FLOAT, MPI_MAX, 0,
+             table->communicator);
+  MPI_Bcast(&max_mean_index, 1, MPI_FLOAT, 0, table->communicator);
+
+  if (!!with_reset) {
+    table->sum_idx = 0;
+    table->cnt_idx = 0;
+  }
+
+  return max_mean_index;
+}
+
+int DHT_flush(DHT *table) {
+  // make sure all processes are synchronized
+  MPI_Barrier(table->communicator);
+
+  // wipe local memory with zeros
+  memset(table->mem_alloc, '\0',
+         table->table_size * (1 + table->data_size + table->key_size));
+
+  table->sum_idx = 0;
+  table->cnt_idx = 0;
+
+  return DHT_SUCCESS;
+}
+
 int DHT_print_statistics(DHT *table) {
 #ifdef DHT_STATISTICS
   int *written_buckets;
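Taken together, the two new entry points implement a probe-length heuristic: every write records how many bucket probes it needed, DHT_get_used_idx_factor reduces each rank's mean probe index to the communicator-wide maximum, and DHT_flush resets the table when probing degrades. A minimal usage sketch under those assumptions (the threshold of 2 mirrors the one used in WorkerPostIter further down; both calls are collective, so every rank must reach this point):

// Sketch: flush the table once writes need, on the worst rank, an average
// of two probes per insertion.
void maybe_flush(DHT *table) {
  float max_mean_index = DHT_get_used_idx_factor(table, /*with_reset=*/1);
  if (max_mean_index >= 2.0f) {
    DHT_flush(table); // barrier, then zero the local bucket memory
  }
}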

View File

@ -117,6 +117,9 @@ typedef struct {
   unsigned int index_count;
   int (*accumulate_callback)(int, void *, int, void *);
 
+  size_t sum_idx;
+  size_t cnt_idx;
+
 #ifdef DHT_STATISTICS
   /** Detailed statistics of the usage of the DHT. */
   DHT_stats *stats;
@ -125,10 +128,11 @@ typedef struct {
 extern void DHT_set_accumulate_callback(DHT *table,
                                         int (*callback_func)(int, void *, int,
                                                              void *));
 extern int DHT_write_accumulate(DHT *table, const void *key, int send_size,
-                                void *data, uint32_t *proc, uint32_t *index, int *callback_ret);
+                                void *data, uint32_t *proc, uint32_t *index,
+                                int *callback_ret);
 
 /**
  * @brief Create a DHT.
@ -284,44 +288,8 @@ extern int DHT_free(DHT *table, int *eviction_counter, int *readerror_counter);
  */
 extern int DHT_print_statistics(DHT *table);
 
-/**
- * @brief Determine destination rank and index.
- *
- * This is done by looping over all possbile indices. First of all, set a
- * temporary index to zero and copy count of bytes for each index into the
- * memory area of the temporary index. After that the current index is
- * calculated by the temporary index modulo the table size. The destination rank
- * of the process is simply determined by hash modulo the communicator size.
- *
- * @param hash Calculated 64 bit hash.
- * @param comm_size Communicator size.
- * @param table_size Count of buckets per process.
- * @param dest_rank Reference to the destination rank variable.
- * @param index Pointer to the array index.
- * @param index_count Count of possible indeces.
- */
-static void determine_dest(uint64_t hash, int comm_size,
-                           unsigned int table_size, unsigned int *dest_rank,
-                           uint64_t *index, unsigned int index_count);
+extern float DHT_get_used_idx_factor(DHT *table, int with_reset);
 
-/**
- * @brief Set the occupied flag.
- *
- * This will set the first bit of a bucket to 1.
- *
- * @param flag_byte First byte of a bucket.
- */
-static void set_flag(char *flag_byte);
+extern int DHT_flush(DHT *table);
 
-/**
- * @brief Get the occupied flag.
- *
- * This function determines whether the occupied flag of a bucket was set or
- * not.
- *
- * @param flag_byte First byte of a bucket.
- * @return int Returns 1 for true or 0 for false.
- */
-static int read_flag(char flag_byte);
-
 #endif /* DHT_H */

View File

@ -43,11 +43,12 @@ DHT_Wrapper::DHT_Wrapper(MPI_Comm dht_comm, std::uint64_t dht_size,
                          const std::vector<std::int32_t> &key_indices,
                          const std::vector<std::string> &_output_names,
                          const InitialList::ChemistryHookFunctions &_hooks,
-                         uint32_t data_count, bool _with_interp)
+                         uint32_t data_count, bool _with_interp,
+                         bool _has_het_ids)
     : key_count(key_indices.size()), data_count(data_count),
       input_key_elements(key_indices), communicator(dht_comm),
       key_species(key_species), output_names(_output_names), hooks(_hooks),
-      with_interp(_with_interp) {
+      with_interp(_with_interp), has_het_ids(_has_het_ids) {
   // initialize DHT object
   // key size = count of key elements + timestep
   uint32_t key_size = (key_count + 1) * sizeof(Lookup_Keyelement);
@ -128,42 +129,47 @@ void DHT_Wrapper::fillDHT(const WorkPackage &work_package) {
   dht_results.filledDHT = std::vector<bool>(length, false);
 
   for (int i = 0; i < length; i++) {
     // If true grid cell was simulated, needs to be inserted into dht
-    if (work_package.mapping[i] == CHEM_PQC) {
-      // check if calcite or dolomite is absent and present, resp.n and vice
-      // versa in input/output. If this is the case -> Do not write to DHT!
-      // HACK: hardcoded, should be fixed!
-      if (hooks.dht_fill.isValid()) {
-        NamedVector<double> old_values(output_names, work_package.input[i]);
-        NamedVector<double> new_values(output_names, work_package.output[i]);
-        if (hooks.dht_fill(old_values, new_values)) {
-          continue;
-        }
-      }
-
-      uint32_t proc, index;
-      auto &key = dht_results.keys[i];
-      const auto data =
-          (with_interp ? outputToInputAndRates(work_package.input[i],
-                                               work_package.output[i])
-                       : work_package.output[i]);
-      // insert simulated data with fuzzed key into DHT
-      int res = DHT_write(this->dht_object, key.data(),
-                          const_cast<double *>(data.data()), &proc, &index);
-      dht_results.locations[i] = {proc, index};
-      // if data was successfully written ...
-      if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
-        dht_evictions++;
-      }
-      dht_results.filledDHT[i] = true;
+    if (work_package.mapping[i] != CHEM_PQC) {
+      continue;
     }
+
+    if (work_package.input[i][0] != 2) {
+      continue;
+    }
+
+    // check if calcite or dolomite is absent and present, resp., and vice
+    // versa in input/output. If this is the case -> Do not write to DHT!
+    // HACK: hardcoded, should be fixed!
+    if (hooks.dht_fill.isValid()) {
+      NamedVector<double> old_values(output_names, work_package.input[i]);
+      NamedVector<double> new_values(output_names, work_package.output[i]);
+      if (Rcpp::as<bool>(hooks.dht_fill(old_values, new_values))) {
+        continue;
+      }
+    }
+
+    uint32_t proc, index;
+    auto &key = dht_results.keys[i];
+    const auto data =
+        (with_interp ? outputToInputAndRates(work_package.input[i],
+                                             work_package.output[i])
                      : work_package.output[i]);
+    // insert simulated data with fuzzed key into DHT
+    int res = DHT_write(this->dht_object, key.data(),
+                        const_cast<double *>(data.data()), &proc, &index);
+    dht_results.locations[i] = {proc, index};
+    // if data was written with an eviction ...
+    if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
+      dht_evictions++;
+    }
+    dht_results.filledDHT[i] = true;
   }
 }
@ -270,7 +276,7 @@ LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
   const std::vector<double> eval_vec =
       Rcpp::as<std::vector<double>>(hooks.dht_fuzz(input_nv));
   assert(eval_vec.size() == this->key_count);
-  LookupKey vecFuzz(this->key_count + 1, {.0});
+  LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
 
   DHT_Rounder rounder;
@ -290,6 +296,9 @@ LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
   }
   // add timestep to the end of the key as double value
   vecFuzz[this->key_count].fp_element = dt;
+  if (has_het_ids) {
+    vecFuzz[this->key_count + 1].fp_element = cell[0];
+  }
 
   return vecFuzz;
 }
@ -297,7 +306,7 @@ LookupKey DHT_Wrapper::fuzzForDHT_R(const std::vector<double> &cell,
 LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt) {
   const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
-  LookupKey vecFuzz(this->key_count + 1, {.0});
+  LookupKey vecFuzz(this->key_count + 1 + has_het_ids, {.0});
 
   DHT_Rounder rounder;
   int totals_i = 0;
@ -323,6 +332,9 @@ LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt) {
   }
   // add timestep to the end of the key as double value
   vecFuzz[this->key_count].fp_element = dt;
+  if (has_het_ids) {
+    vecFuzz[this->key_count + 1].fp_element = cell[0];
+  }
 
   return vecFuzz;
 }
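The net effect of the new flag on the key layout: with has_het_ids set, the fuzzed key gains one trailing element, giving [rounded key elements..., timestep, heterogeneity id], where the id is taken from cell[0]. An illustrative fragment under those assumptions (POET headers included, key_count = 2, and the chosen cell indices are arbitrary):

// Illustrative only: layout of a fuzzed key with has_het_ids enabled.
void sketch_key_layout(const std::vector<double> &cell, double dt) {
  DHT_Rounder rounder;
  LookupKey key(2 + 1 + 1, {.0});            // 2 species, timestep, het id
  key[0] = rounder.round(cell[3], 3, false); // rounded species values
  key[1] = rounder.round(cell[4], 3, false); // (indices 3/4 are made up)
  key[2].fp_element = dt;                    // timestep at index key_count
  key[3].fp_element = cell[0];               // het id at index key_count + 1
}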

View File

@ -87,7 +87,7 @@ public:
               const std::vector<std::int32_t> &key_indices,
               const std::vector<std::string> &output_names,
               const InitialList::ChemistryHookFunctions &hooks,
-              uint32_t data_count, bool with_interp);
+              uint32_t data_count, bool with_interp, bool has_het_ids);
 
   /**
    * @brief Destroy the dht wrapper object
    *
@ -264,6 +264,7 @@ private:
   DHT_ResultObject dht_results;
 
   std::array<double, 2> base_totals{0};
+  bool has_het_ids{false};
 };
 
 } // namespace poet

View File

@ -166,6 +166,8 @@ public:
   enum result_status { RES_OK, INSUFFICIENT_DATA, NOT_NEEDED };
 
+  DHT *getDHTObject() { return this->pht->getDHTObject(); }
+
   struct InterpolationResult {
     std::vector<std::vector<double>> results;
     std::vector<result_status> status;
@ -261,6 +263,8 @@ private:
   const InitialList::ChemistryHookFunctions &hooks;
   const std::vector<std::string> &out_names;
   const std::vector<std::string> dht_names;
+
+  std::unordered_map<int, std::vector<std::int32_t>> to_calc_cache;
 };
 
 } // namespace poet

View File

@ -14,6 +14,7 @@
 #include <algorithm>
 #include <array>
 #include <cassert>
+#include <cmath>
 #include <cstddef>
 #include <cstdint>
 #include <functional>
@ -75,6 +76,11 @@ void InterpolationModule::tryInterpolation(WorkPackage &work_package) {
   const auto dht_results = this->dht_instance.getDHTResults();
 
   for (int wp_i = 0; wp_i < work_package.size; wp_i++) {
+    if (work_package.input[wp_i][0] != 2) {
+      interp_result.status[wp_i] = INSUFFICIENT_DATA;
+      continue;
+    }
+
     if (work_package.mapping[wp_i] != CHEM_PQC) {
       interp_result.status[wp_i] = NOT_NEEDED;
       continue;
@ -116,15 +122,30 @@ void InterpolationModule::tryInterpolation(WorkPackage &work_package) {
     this->pht->incrementReadCounter(roundKey(rounded_key));
 #endif
 
+    const int cell_id = static_cast<int>(work_package.input[wp_i][0]);
+    if (!to_calc_cache.contains(cell_id)) {
+      const std::vector<std::int32_t> &to_calc = dht_instance.getKeyElements();
+      std::vector<std::int32_t> keep_indices;
+      for (std::size_t i = 0; i < to_calc.size(); i++) {
+        if (!std::isnan(work_package.input[wp_i][to_calc[i]])) {
+          keep_indices.push_back(to_calc[i]);
+        }
+      }
+      to_calc_cache[cell_id] = keep_indices;
+    }
+
     double start_fc = MPI_Wtime();
     work_package.output[wp_i] =
-        f_interpolate(dht_instance.getKeyElements(), work_package.input[wp_i],
+        f_interpolate(to_calc_cache[cell_id], work_package.input[wp_i],
                       pht_result.in_values, pht_result.out_values);
 
     if (hooks.interp_post.isValid()) {
       NamedVector<double> nv_result(this->out_names, work_package.output[wp_i]);
-      if (hooks.interp_post(nv_result)) {
+      if (Rcpp::as<bool>(hooks.interp_post(nv_result))) {
         interp_result.status[wp_i] = INSUFFICIENT_DATA;
         continue;
       }
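Two details of the new cache are worth noting: std::unordered_map::contains is C++20 (on earlier standards the same lookup is spelled with find), and the cache rests on the assumption that a cell's NaN pattern is a function of its id (input[wp_i][0]) alone. A minimal, self-contained equivalent of the lookup-or-build step, written with find() so it also compiles before C++20:

#include <cmath>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Minimal equivalent of the to_calc_cache lookup-or-build above.
std::vector<std::int32_t> &
cached_key_elements(std::unordered_map<int, std::vector<std::int32_t>> &cache,
                    int cell_id, const std::vector<std::int32_t> &to_calc,
                    const std::vector<double> &input) {
  auto it = cache.find(cell_id);
  if (it == cache.end()) {
    std::vector<std::int32_t> keep;
    for (const auto idx : to_calc) {
      if (!std::isnan(input[idx])) { // drop species absent in this cell type
        keep.push_back(idx);
      }
    }
    it = cache.emplace(cell_id, std::move(keep)).first;
  }
  return it->second;
}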

View File

@ -10,9 +10,21 @@
 namespace poet {
 
+constexpr std::int8_t SC_NOTATION_EXPONENT_MASK = -128;
+constexpr std::int64_t SC_NOTATION_SIGNIFICANT_MASK = 0xFFFFFFFFFFFF;
+
 struct Lookup_SC_notation {
   std::int8_t exp : 8;
   std::int64_t significant : 56;
+
+  constexpr static Lookup_SC_notation nan() {
+    return {SC_NOTATION_EXPONENT_MASK, SC_NOTATION_SIGNIFICANT_MASK};
+  }
+
+  constexpr bool isnan() const {
+    return !!(exp == SC_NOTATION_EXPONENT_MASK &&
+              significant == SC_NOTATION_SIGNIFICANT_MASK);
+  }
 };
 
 union Lookup_Keyelement {
@ -23,6 +35,10 @@ union Lookup_Keyelement {
     return std::memcmp(this, &other, sizeof(Lookup_Keyelement)) == 0 ? true
                                                                      : false;
   }
+
+  template <typename T> bool operator>(const T &other) const {
+    return this->sc_notation.significant > other;
+  }
 };
 
 class LookupKey : public std::vector<Lookup_Keyelement> {

View File

@ -20,6 +20,11 @@ class DHT_Rounder {
 public:
   Lookup_Keyelement round(const double &value, std::uint32_t signif,
                           bool is_ho) {
+    if (std::isnan(value)) {
+      return {.sc_notation = Lookup_SC_notation::nan()};
+    }
+
     std::int8_t exp =
         static_cast<std::int8_t>(std::floor(std::log10(std::fabs(value))));
@ -60,6 +65,14 @@
                          std::uint32_t signif) {
     Lookup_Keyelement new_val = value;
 
+    if (value.sc_notation.isnan()) {
+      return {.sc_notation = Lookup_SC_notation::nan()};
+    }
+
+    if (signif == 0) {
+      return {.sc_notation = {0, value > 0}};
+    }
+
     std::uint32_t diff_signif =
         static_cast<std::uint32_t>(
             std::ceil(std::log10(std::abs(value.sc_notation.significant)))) -
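The two hunks above cooperate: round() takes log10 of its input, which is meaningless for NaN, so NaNs are intercepted first and mapped to the reserved sentinel (exponent -128, all-ones 56-bit significand) that isnan() can recognize in later rounding stages. A short round-trip sketch, assuming the POET headers above are included:

#include <cmath>

// Sketch: NaN inputs bypass the log10-based rounding and map to the
// reserved sentinel pattern, detectable afterwards via isnan().
void nan_sentinel_sketch() {
  DHT_Rounder rounder;
  Lookup_Keyelement e = rounder.round(std::nan(""), 3, /*is_ho=*/false);
  bool is_sentinel = e.sc_notation.isnan(); // true
  (void)is_sentinel;
}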

View File

@ -9,7 +9,6 @@
 #include <cstdint>
 #include <iomanip>
 #include <iostream>
-#include <map>
 #include <mpi.h>
 #include <stdexcept>
 #include <string>
@ -48,13 +47,15 @@ void poet::ChemistryModule::WorkerLoop() {
   case CHEM_FIELD_INIT: {
     ChemBCast(&this->prop_count, 1, MPI_UINT32_T);
     if (this->ai_surrogate_enabled) {
-      this->ai_surrogate_validity_vector.resize(this->n_cells); // resize instead of reserve?
+      this->ai_surrogate_validity_vector.resize(
+          this->n_cells); // resize instead of reserve?
     }
     break;
   }
   case CHEM_AI_BCAST_VALIDITY: {
     // Receive the index vector of valid ai surrogate predictions
-    MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells, MPI_INT, 0, this->group_comm);
+    MPI_Bcast(&this->ai_surrogate_validity_vector.front(), this->n_cells,
+              MPI_INT, 0, this->group_comm);
     break;
   }
   case CHEM_WORK_LOOP: {
@ -161,15 +162,6 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
                     mpi_buffer.begin() + this->prop_count * (wp_i + 1));
     }
 
-    if (this->ai_surrogate_enabled) {
-      // Map valid predictions from the ai surrogate in the workpackage
-      for (int i = 0; i < s_curr_wp.size; i++) {
-        if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
-          s_curr_wp.mapping[i] = CHEM_AISURR;
-        }
-      }
-    }
-
     // std::cout << this->comm_rank << ":" << counter++ << std::endl;
     if (dht_enabled || interp_enabled) {
       dht->prepareKeys(s_curr_wp.input, dt);
@ -187,6 +179,15 @@
       interp->tryInterpolation(s_curr_wp);
     }
 
+    if (this->ai_surrogate_enabled) {
+      // Map valid predictions from the ai surrogate in the workpackage
+      for (int i = 0; i < s_curr_wp.size; i++) {
+        if (this->ai_surrogate_validity_vector[wp_start_index + i] == 1) {
+          s_curr_wp.mapping[i] = CHEM_AISURR;
+        }
+      }
+    }
+
     phreeqc_time_start = MPI_Wtime();
     WorkerRunWorkPackage(s_curr_wp, current_sim_time, dt);
@ -245,6 +246,17 @@ void poet::ChemistryModule::WorkerPostIter(MPI_Status &prope_status,
           << std::setw(this->file_pad) << iteration << ".pht";
       interp->dumpPHTState(out.str());
     }
+
+    const auto max_mean_idx =
+        DHT_get_used_idx_factor(this->interp->getDHTObject(), 1);
+    if (max_mean_idx >= 2) {
+      DHT_flush(this->interp->getDHTObject());
+      DHT_flush(this->dht->getDHT());
+
+      if (this->comm_rank == 2) {
+        std::cout << "Flushed both DHT and PHT!\n\n";
+      }
+    }
   }
 
   RInsidePOET::getInstance().parseEvalQ("gc()");
@ -299,28 +311,19 @@ void poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
                                                  double dSimTime,
                                                  double dTimestep) {
+  std::vector<std::vector<double>> inout_chem = work_package.input;
+  std::vector<std::size_t> to_ignore;
   for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
     if (work_package.mapping[wp_id] != CHEM_PQC) {
-      continue;
+      to_ignore.push_back(wp_id);
     }
+  }
+
+  this->pqc_runner->run(inout_chem, dTimestep, to_ignore);
 
-    auto curr_input = work_package.input[wp_id];
-    const auto pqc_id = static_cast<int>(curr_input[0]);
-
-    auto &phreeqc_instance = this->phreeqc_instances[pqc_id];
-    work_package.output[wp_id] = work_package.input[wp_id];
-
-    curr_input.erase(std::remove_if(curr_input.begin(), curr_input.end(),
-                                    [](double d) { return std::isnan(d); }),
-                     curr_input.end());
-
-    phreeqc_instance->runCell(curr_input, dTimestep);
-
-    std::size_t output_index = 0;
-    for (std::size_t i = 0; i < work_package.output[wp_id].size(); i++) {
-      if (!std::isnan(work_package.output[wp_id][i])) {
-        work_package.output[wp_id][i] = curr_input[output_index++];
-      }
-    }
+  for (std::size_t wp_id = 0; wp_id < work_package.size; wp_id++) {
+    if (work_package.mapping[wp_id] == CHEM_PQC) {
+      work_package.output[wp_id] = inout_chem[wp_id];
+    }
   }
 }

View File

@ -2,16 +2,14 @@
 #include <Rcpp.h>
 
 #include <cstddef>
-#include <fstream>
-#include <string>
-#include <utility>
 #include <vector>
 
 namespace poet {
 
 void InitialList::initChemistry(const Rcpp::List &chem) {
-  this->pqc_solutions = std::vector<std::string>(
-      this->transport_names.begin() + 3, this->transport_names.end());
-  this->pqc_solution_primaries = this->phreeqc->getSolutionPrimaries();
-
   if (chem.containsElementNamed("dht_species")) {
     this->dht_species = Rcpp::as<NamedVector<uint32_t>>(chem["dht_species"]);
   }
@ -51,7 +49,7 @@ void InitialList::initChemistry(const Rcpp::List &chem) {
     // Get base path
     ai_surrogate_input_script_path = ai_surrogate_input_script_path.substr(0, ai_surrogate_input_script_path.find_last_of('/') + 1);
     // Add the filepath as a global variable in R to enable relative filepaths in the R script
-    fileContent.insert(0, "ai_surrogate_base_path <- \"" + ai_surrogate_input_script_path + "\"\n");
+    fileContent += "\nai_surrogate_base_path <- \"" + ai_surrogate_input_script_path + "\"";
     this->ai_surrogate_input_script = fileContent;
   }
@ -69,21 +67,16 @@ InitialList::ChemistryInit InitialList::getChemistryInit() const {
   // chem_init.field_header = this->field_header;
   chem_init.database = database;
+  chem_init.pqc_script = pqc_script;
+  chem_init.pqc_ids = pqc_ids;
+  chem_init.with_h0_o0 = with_h0_o0;
+  chem_init.with_redox = with_redox;
 
   // chem_init.pqc_scripts = pqc_scripts;
   // chem_init.pqc_ids = pqc_ids;
-  for (std::size_t i = 0; i < pqc_scripts.size(); i++) {
-    POETInitCell cell = {
-        pqc_solutions,
-        pqc_solution_primaries,
-        Rcpp::as<std::vector<std::string>>(pqc_exchanger[i]),
-        Rcpp::as<std::vector<std::string>>(pqc_kinetics[i]),
-        Rcpp::as<std::vector<std::string>>(pqc_equilibrium[i]),
-        Rcpp::as<std::vector<std::string>>(pqc_surface_comps[i]),
-        Rcpp::as<std::vector<std::string>>(pqc_surface_charges[i])};
-
-    chem_init.pqc_config[pqc_ids[i]] = {database, pqc_scripts[i], cell};
-  }
+  // for (std::size_t i = 0; i < pqc_ids.size(); ++i) {
+  //   chem_init.pqc_input[pqc_ids[i]] = pqc_scripts[i];
+  // }
 
   // chem_init.pqc_sol_order = pqc_solutions;

View File

@ -1,4 +1,3 @@
-#include <algorithm>
 #include <tug/Boundary.hpp>
 // leave above Rcpp includes, as eigen seem to have problems with a preceding
 // Rcpp include
@ -8,7 +7,7 @@
 #include "DataStructures/Field.hpp"
 #include "InitialList.hpp"
-#include "PhreeqcInit.hpp"
+#include "PhreeqcMatrix.hpp"
 
 #include <Rcpp/Function.h>
 #include <Rcpp/proxy/ProtectedProxy.h>
@ -53,98 +52,102 @@ static std::vector<TugType> colMajToRowMaj(const Rcpp::NumericVector &vec,
   }
 }
 
-static std::vector<std::string>
-extend_transport_names(std::unique_ptr<PhreeqcInit> &phreeqc,
-                       const Rcpp::List &boundaries_list,
-                       const Rcpp::List &inner_boundaries,
-                       const std::vector<std::string> &old_trans_names) {
-  std::vector<std::string> transport_names = old_trans_names;
-  std::set<int> constant_pqc_ids;
-  for (const auto &side : tug_side_mapping) {
-    if (!boundaries_list.containsElementNamed(side.second.c_str())) {
-      continue;
-    }
-    Rcpp::List mapping = boundaries_list[side.second];
-    const Rcpp::NumericVector cells = mapping["cell"];
-    const Rcpp::NumericVector values = mapping["sol_id"];
-    const Rcpp::CharacterVector type_str = mapping["type"];
-    if (cells.size() != values.size()) {
-      throw std::runtime_error("Boundary [" + side.second +
-                               "] cells and values are not the same "
-                               "length");
-    }
-    for (auto i = 0; i < cells.size(); i++) {
-      if (type_str[i] == "constant") {
-        constant_pqc_ids.insert(static_cast<std::size_t>(values[i]));
-      }
-    }
-  }
-  if (inner_boundaries.size() > 0) {
-    const Rcpp::NumericVector values = inner_boundaries["sol_id"];
-    for (auto i = 0; i < values.size(); i++) {
-      constant_pqc_ids.insert(static_cast<std::size_t>(values[i]));
-    }
-  }
-  if (!constant_pqc_ids.empty()) {
-    constexpr std::size_t keep_h_o_charge = 3;
-    for (const auto &pqc_id : constant_pqc_ids) {
-      const auto solution_names = phreeqc->getSolutionNames(pqc_id);
-      // add those strings which are not already in the transport_names
-      for (const auto &name : solution_names) {
-        if (std::find(transport_names.begin(), transport_names.end(), name) ==
-            transport_names.end()) {
-          auto position =
-              std::lower_bound(transport_names.begin() + keep_h_o_charge,
                                transport_names.end(), name);
-          transport_names.insert(position, name);
-        }
-      }
-    }
-  }
-  return transport_names;
-}
-
-static Rcpp::List
-extend_initial_grid(const Rcpp::List &initial_grid,
-                    const std::vector<std::string> &transport_names) {
-  // std::vector<std::string> names_to_add(transport_names.begin() + old_size,
-  //                                       transport_names.end());
-  Rcpp::Function extend_grid_R("add_missing_transport_species");
-  return extend_grid_R(initial_grid, Rcpp::wrap(transport_names));
-}
+// static std::vector<std::string>
+// extend_transport_names(std::unique_ptr<PhreeqcInit> &phreeqc,
+//                        const Rcpp::List &boundaries_list,
+//                        const Rcpp::List &inner_boundaries,
+//                        const std::vector<std::string> &old_trans_names) {
+//   std::vector<std::string> transport_names = old_trans_names;
+//   std::set<int> constant_pqc_ids;
+//   for (const auto &side : tug_side_mapping) {
+//     if (!boundaries_list.containsElementNamed(side.second.c_str())) {
+//       continue;
+//     }
+//     Rcpp::List mapping = boundaries_list[side.second];
+//     const Rcpp::NumericVector cells = mapping["cell"];
+//     const Rcpp::NumericVector values = mapping["sol_id"];
+//     const Rcpp::CharacterVector type_str = mapping["type"];
+//     if (cells.size() != values.size()) {
+//       throw std::runtime_error("Boundary [" + side.second +
+//                                "] cells and values are not the same "
+//                                "length");
+//     }
+//     for (auto i = 0; i < cells.size(); i++) {
+//       if (type_str[i] == "constant") {
+//         constant_pqc_ids.insert(static_cast<std::size_t>(values[i]));
+//       }
+//     }
+//   }
+//   if (inner_boundaries.size() > 0) {
+//     const Rcpp::NumericVector values = inner_boundaries["sol_id"];
+//     for (auto i = 0; i < values.size(); i++) {
+//       constant_pqc_ids.insert(static_cast<std::size_t>(values[i]));
+//     }
+//   }
+//   if (!constant_pqc_ids.empty()) {
+//     constexpr std::size_t keep_h_o_charge = 3;
+//     for (const auto &pqc_id : constant_pqc_ids) {
+//       const auto solution_names = phreeqc->getSolutionNames(pqc_id);
+//       // add those strings which are not already in the transport_names
+//       for (const auto &name : solution_names) {
+//         if (std::find(transport_names.begin(), transport_names.end(), name)
+//         ==
+//             transport_names.end()) {
+//           auto position =
+//               std::lower_bound(transport_names.begin() + keep_h_o_charge,
+//                                transport_names.end(), name);
+//           transport_names.insert(position, name);
+//         }
+//       }
+//     }
+//   }
+//   return transport_names;
+// }
+
+// static Rcpp::List
+// extend_initial_grid(const Rcpp::List &initial_grid,
+//                     const std::vector<std::string> &transport_names) {
+//   // std::vector<std::string> names_to_add(transport_names.begin() +
+//   // old_size,
+//   //                                       transport_names.end());
+//   Rcpp::Function extend_grid_R("add_missing_transport_species");
+//   return extend_grid_R(initial_grid, Rcpp::wrap(transport_names));
+// }
 
 std::pair<Rcpp::List, Rcpp::List>
 InitialList::resolveBoundaries(const Rcpp::List &boundaries_list,
-                               const Rcpp::List &inner_boundaries) {
+                               const Rcpp::List &inner_boundaries,
+                               const PhreeqcMatrix &phreeqc_mat) {
   Rcpp::List bound_list;
   Rcpp::List inner_bound;
   Rcpp::Function resolve_R("resolve_pqc_bound");
 
-  const std::size_t old_transport_size = this->transport_names.size();
-  this->transport_names = extend_transport_names(
-      this->phreeqc, boundaries_list, inner_boundaries, this->transport_names);
-  const std::size_t new_transport_size = this->transport_names.size();
-
-  if (old_transport_size != new_transport_size) {
-    this->initial_grid =
-        extend_initial_grid(this->initial_grid, this->transport_names);
-  }
+  // const std::size_t old_transport_size = this->transport_names.size();
+  // this->transport_names = extend_transport_names(
+  //     this->phreeqc, boundaries_list, inner_boundaries,
+  //     this->transport_names);
+  // const std::size_t new_transport_size = this->transport_names.size();
+  // if (old_transport_size != new_transport_size) {
+  //   this->initial_grid =
+  //       extend_initial_grid(this->initial_grid, this->transport_names);
+  // }
 
   for (const auto &species_name : this->transport_names) {
     Rcpp::List spec_list;
@ -179,8 +182,11 @@ InitialList::resolveBoundaries(const Rcpp::List &boundaries_list,
       c_type[c_id] = tug::BC_TYPE_CLOSED;
     } else if (type_str[i] == "constant") {
       c_type[c_id] = tug::BC_TYPE_CONSTANT;
-      c_value[c_id] = Rcpp::as<TugType>(
-          resolve_R(this->phreeqc_mat, Rcpp::wrap(species_name), values[i]));
+      c_value[c_id] =
+          static_cast<TugType>(phreeqc_mat(values[i], species_name));
+      // c_value[c_id] = Rcpp::as<TugType>(
+      //     resolve_R(this->phreeqc_mat, Rcpp::wrap(species_name),
+      //     values[i]));
     } else {
       throw std::runtime_error("Unknown boundary type");
     }
@ -211,8 +217,11 @@ InitialList::resolveBoundaries(const Rcpp::List &boundaries_list,
     for (std::size_t i = 0; i < inner_row_vec.size(); i++) {
       rows.push_back(inner_row_vec[i] - 1);
      cols.push_back(inner_col_vec[i] - 1);
-      c_value.push_back(Rcpp::as<TugType>(resolve_R(
-          this->phreeqc_mat, Rcpp::wrap(species_name), inner_pqc_id_vec[i])));
+      c_value.push_back(static_cast<TugType>(
+          phreeqc_mat(inner_pqc_id_vec[i], species_name)));
+      // c_value.push_back(Rcpp::as<TugType>(resolve_R(
+      //     this->phreeqc_mat, Rcpp::wrap(species_name),
+      //     inner_pqc_id_vec[i])));
     }
 
     inner_bound[species_name] =
@ -276,7 +285,8 @@ static Rcpp::List parseAlphas(const SEXP &input,
   return out_list;
 }
 
-void InitialList::initDiffusion(const Rcpp::List &diffusion_input) {
+void InitialList::initDiffusion(const Rcpp::List &diffusion_input,
+                                const PhreeqcMatrix &phreeqc) {
   Rcpp::List boundaries;
   Rcpp::List inner_boundaries;
@ -298,7 +308,7 @@
       diffusion_input[DIFFU_MEMBER_STR(DiffusionMembers::ALPHA_Y)];
 
   const auto resolved_boundaries =
-      resolveBoundaries(boundaries, inner_boundaries);
+      resolveBoundaries(boundaries, inner_boundaries, phreeqc);
   this->boundaries = resolved_boundaries.first;
   this->inner_boundaries = resolved_boundaries.second;

View File

@ -1,13 +1,19 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * GridInit.cpp - Implementation of grid initialization
+ * Copyright (C) 2024-2025 Max Luebke (University of Potsdam)
+ */
+
 #include "InitialList.hpp"
-#include "PhreeqcInit.hpp"
+#include "PhreeqcMatrix.hpp"
 
 #include <RInside.h>
 #include <Rcpp/Function.h>
-#include <Rcpp/vector/Matrix.h>
 #include <Rcpp/vector/instantiation.h>
-#include <cstdint>
+#include <fstream>
 #include <map>
-#include <memory>
 #include <regex>
 #include <sstream>
 #include <string>
@ -15,38 +21,6 @@
 namespace poet {
 
-static Rcpp::NumericMatrix
-pqcScriptToGrid(std::unique_ptr<PhreeqcInit> &phreeqc, RInside &R) {
-  PhreeqcInit::PhreeqcMat phreeqc_mat = phreeqc->getPhreeqcMat();
-
-  // add "id" to the front of the column names
-  const std::size_t ncols = phreeqc_mat.names.size();
-  const std::size_t nrows = phreeqc_mat.values.size();
-
-  phreeqc_mat.names.insert(phreeqc_mat.names.begin(), "ID");
-
-  Rcpp::NumericMatrix mat(nrows, ncols + 1);
-  for (std::size_t i = 0; i < nrows; i++) {
-    mat(i, 0) = phreeqc_mat.ids[i];
-    for (std::size_t j = 0; j < ncols; ++j) {
-      mat(i, j + 1) = phreeqc_mat.values[i][j];
-    }
-  }
-
-  Rcpp::colnames(mat) = Rcpp::wrap(phreeqc_mat.names);
-  return mat;
-}
-
-static inline Rcpp::List matToGrid(RInside &R, const Rcpp::NumericMatrix &mat,
-                                   const Rcpp::NumericMatrix &grid) {
-  Rcpp::Function pqc_to_grid_R("pqc_to_grid");
-  return pqc_to_grid_R(mat, grid);
-}
-
 static inline std::map<int, std::string>
 replaceRawKeywordIDs(std::map<int, std::string> raws) {
   for (auto &raw : raws) {
@ -60,25 +34,6 @@ replaceRawKeywordIDs(std::map<int, std::string> raws) {
   return raws;
 }
 
-static inline uint32_t getSolutionCount(std::unique_ptr<PhreeqcInit> &phreeqc,
-                                        const Rcpp::List &initial_grid) {
-  PhreeqcInit::ModulesArray mod_array;
-
-  Rcpp::Function unique_R("unique");
-  std::vector<int> row_ids =
-      Rcpp::as<std::vector<int>>(unique_R(initial_grid["ID"]));
-
-  // std::vector<std::uint32_t> sizes_vec(sizes.begin(), sizes.end());
-  // Rcpp::Function modify_sizes("modify_module_sizes");
-  // sizes_vec = Rcpp::as<std::vector<std::uint32_t>>(
-  //     modify_sizes(sizes_vec, phreeqc_mat, initial_grid));
-  // std::copy(sizes_vec.begin(), sizes_vec.end(), sizes.begin());
-
-  return phreeqc->getModuleSizes(row_ids)[POET_SOL];
-}
-
 static std::string readFile(const std::string &path) {
   std::string string_rpath(PATH_MAX, '\0');
@ -98,10 +53,26 @@ static std::string readFile(const std::string &path) {
   return buffer.str();
 }
 
-void InitialList::prepareGrid(const Rcpp::List &grid_input) {
-  // parse input values
-  Rcpp::Function unique_R("unique");
+static Rcpp::List expandGrid(const PhreeqcMatrix &pqc_mat,
+                             const std::vector<int> unique_ids,
+                             const Rcpp::IntegerMatrix &grid_def) {
+  PhreeqcMatrix subset_pqc_mat = pqc_mat.subset(unique_ids);
+  PhreeqcMatrix::STLExport phreeqc_mat =
+      subset_pqc_mat.get(PhreeqcMatrix::VectorExportType::COLUMN_MAJOR);
+
+  const std::size_t ncols = phreeqc_mat.names.size();
+  const std::size_t nrows = phreeqc_mat.values.size() / ncols;
+
+  Rcpp::NumericMatrix pqc_mat_R(nrows, ncols, phreeqc_mat.values.begin());
+  Rcpp::colnames(pqc_mat_R) = Rcpp::wrap(phreeqc_mat.names);
+
+  return Rcpp::Function("pqc_to_grid")(pqc_mat_R, grid_def);
+}
+
+PhreeqcMatrix InitialList::prepareGrid(const Rcpp::List &grid_input) {
+  // parse input values
 
   std::string script;
   std::string database;
@ -126,8 +97,9 @@ PhreeqcMatrix InitialList::prepareGrid(const Rcpp::List &grid_input) {
   }
 
   this->database = database;
+  this->pqc_script = script;
 
-  Rcpp::NumericMatrix grid_def =
+  Rcpp::IntegerMatrix grid_def =
       grid_input[GRID_MEMBER_STR(GridMembers::GRID_DEF)];
   Rcpp::NumericVector grid_size =
      grid_input[GRID_MEMBER_STR(GridMembers::GRID_SIZE)];
@ -156,35 +128,77 @@ PhreeqcMatrix InitialList::prepareGrid(const Rcpp::List &grid_input) {
     throw std::runtime_error("Grid size must be positive.");
   }
 
-  this->phreeqc = std::make_unique<PhreeqcInit>(database, script);
-
-  this->phreeqc_mat = pqcScriptToGrid(phreeqc, R);
-  this->initial_grid = matToGrid(R, this->phreeqc_mat, grid_def);
-
-  const uint32_t solution_count = getSolutionCount(phreeqc, this->initial_grid);
-
-  std::vector<std::string> colnames =
-      Rcpp::as<std::vector<std::string>>(this->initial_grid.names());
-  this->transport_names = std::vector<std::string>(
-      colnames.begin() + 1,
-      colnames.begin() + 1 + solution_count); // skip ID
-
-  std::map<int, std::string> pqc_raw_dumps;
-  pqc_raw_dumps = replaceRawKeywordIDs(phreeqc->raw_dumps());
-
-  this->pqc_ids =
-      Rcpp::as<std::vector<int>>(unique_R(this->initial_grid["ID"]));
-
-  for (const auto &id : this->pqc_ids) {
-    this->pqc_scripts.push_back(pqc_raw_dumps[id]);
-    this->pqc_exchanger.push_back(phreeqc->getExchanger(id));
-    this->pqc_kinetics.push_back(phreeqc->getKineticsNames(id));
-    this->pqc_equilibrium.push_back(phreeqc->getEquilibriumNames(id));
-    this->pqc_surface_comps.push_back(phreeqc->getSurfaceCompNames(id));
-    this->pqc_surface_charges.push_back(phreeqc->getSurfaceChargeNames(id));
-  }
+  bool with_redox =
+      grid_input.containsElementNamed(
+          GRID_MEMBER_STR(GridMembers::PQC_WITH_REDOX))
+          ? Rcpp::as<bool>(
+                grid_input[GRID_MEMBER_STR(GridMembers::PQC_WITH_REDOX)])
+          : false;
+
+  bool with_h0_o0 =
+      grid_input.containsElementNamed(
+          GRID_MEMBER_STR(GridMembers::PQC_WITH_H0_O0))
+          ? Rcpp::as<bool>(
+                grid_input[GRID_MEMBER_STR(GridMembers::PQC_WITH_H0_O0)])
+          : false;
+
+  if (with_h0_o0 && !with_redox) {
+    throw std::runtime_error(
+        "Output of H(0) and O(0) can only be used with redox.");
+  }
+
+  this->with_h0_o0 = with_h0_o0;
+  this->with_redox = with_redox;
+
+  PhreeqcMatrix pqc_mat =
+      PhreeqcMatrix(database, script, with_h0_o0, with_redox);
+
+  this->transport_names = pqc_mat.getMatrixTransported();
+
+  Rcpp::Function unique_R("unique");
+  Rcpp::Function sort_R("sort");
+
+  std::vector<int> unique_ids = Rcpp::as<std::vector<int>>(
+      unique_R(Rcpp::IntegerVector(grid_def.begin(), grid_def.end())));
+
+  this->initial_grid = expandGrid(pqc_mat, unique_ids, grid_def);
+
+  const auto pqc_raw_dumps = replaceRawKeywordIDs(pqc_mat.getDumpStringsPQI());
+
+  for (const auto &id : unique_ids) {
+    this->pqc_ids.push_back(id);
+  }
+
+  return pqc_mat;
+
+  // this->phreeqc_mat = pqcScriptToGrid(phreeqc, R);
+  // this->initial_grid = matToGrid(R, this->phreeqc_mat, grid_def);
+  // const uint32_t solution_count = getSolutionCount(phreeqc,
+  // this->initial_grid);
+  // std::vector<std::string> colnames =
+  //     Rcpp::as<std::vector<std::string>>(this->initial_grid.names());
+  // this->transport_names = std::vector<std::string>(
+  //     colnames.begin() + 1,
+  //     colnames.begin() + 1 + solution_count); // skip ID
+  // std::map<int, std::string> pqc_raw_dumps;
+  // pqc_raw_dumps = replaceRawKeywordIDs(phreeqc->raw_dumps());
+  // this->pqc_ids =
+  //     Rcpp::as<std::vector<int>>(unique_R(this->initial_grid["ID"]));
+  // for (const auto &id : this->pqc_ids) {
+  //   this->pqc_scripts.push_back(pqc_raw_dumps[id]);
+  //   // this->pqc_exchanger.push_back(phreeqc->getExchanger(id));
+  //   // this->pqc_kinetics.push_back(phreeqc->getKineticsNames(id));
+  //   // this->pqc_equilibrium.push_back(phreeqc->getEquilibriumNames(id));
+  //   // this->pqc_surface_comps.push_back(phreeqc->getSurfaceCompNames(id));
+  //   //
+  //   this->pqc_surface_charges.push_back(phreeqc->getSurfaceChargeNames(id));
  // }
 }
 
 } // namespace poet
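A note on expandGrid above: the Rcpp::NumericMatrix (nrow, ncol, iterator) constructor consumes its input column by column, matching R's column-major storage, which is presumably why the STL export is requested as COLUMN_MAJOR. A minimal illustration of that fill order (values arbitrary):

#include <Rcpp.h>
#include <vector>

// Minimal illustration of the column-major fill performed by the
// (nrow, ncol, iterator) constructor.
Rcpp::NumericMatrix column_major_demo() {
  std::vector<double> values = {1, 2, 3, 4, 5, 6};
  Rcpp::NumericMatrix m(3, 2, values.begin());
  // m(0,0)=1, m(1,0)=2, m(2,0)=3, m(0,1)=4, m(1,1)=5, m(2,1)=6
  return m;
}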

View File

@ -1,5 +1,6 @@
#include "InitialList.hpp" #include "InitialList.hpp"
#include "DataStructures/NamedVector.hpp" #include "DataStructures/NamedVector.hpp"
#include "PhreeqcMatrix.hpp"
#include <Rcpp/internal/wrap.h> #include <Rcpp/internal/wrap.h>
#include <Rcpp/iostream/Rstreambuf.h> #include <Rcpp/iostream/Rstreambuf.h>
#include <Rcpp/proxy/ProtectedProxy.h> #include <Rcpp/proxy/ProtectedProxy.h>
@ -10,8 +11,8 @@
namespace poet { namespace poet {
void InitialList::initializeFromList(const Rcpp::List &setup) { void InitialList::initializeFromList(const Rcpp::List &setup) {
prepareGrid(setup[grid_key]); PhreeqcMatrix phreeqc = prepareGrid(setup[grid_key]);
initDiffusion(setup[diffusion_key]); initDiffusion(setup[diffusion_key], phreeqc);
initChemistry(setup[chemistry_key]); initChemistry(setup[chemistry_key]);
} }
@ -53,26 +54,30 @@ void InitialList::importList(const Rcpp::List &setup, bool minimal) {
} }
this->database = this->database =
Rcpp::as<std::string>(setup[static_cast<int>(ExportList::CHEM_DATABASE)]); Rcpp::as<std::string>(setup[static_cast<int>(ExportList::CHEM_DATABASE)]);
this->pqc_script = Rcpp::as<std::string>(
setup[static_cast<int>(ExportList::CHEM_PQC_SCRIPT)]);
this->with_h0_o0 =
Rcpp::as<bool>(setup[static_cast<int>(ExportList::CHEM_PQC_WITH_H0_O0)]);
this->with_redox =
Rcpp::as<bool>(setup[static_cast<int>(ExportList::CHEM_PQC_WITH_REDOX)]);
this->field_header = Rcpp::as<std::vector<std::string>>( this->field_header = Rcpp::as<std::vector<std::string>>(
setup[static_cast<int>(ExportList::CHEM_FIELD_HEADER)]); setup[static_cast<int>(ExportList::CHEM_FIELD_HEADER)]);
this->pqc_scripts = Rcpp::as<std::vector<std::string>>(
setup[static_cast<int>(ExportList::CHEM_PQC_SCRIPTS)]);
this->pqc_ids = Rcpp::as<std::vector<int>>( this->pqc_ids = Rcpp::as<std::vector<int>>(
setup[static_cast<int>(ExportList::CHEM_PQC_IDS)]); setup[static_cast<int>(ExportList::CHEM_PQC_IDS)]);
this->pqc_solutions = Rcpp::as<std::vector<std::string>>( // this->pqc_solutions = Rcpp::as<std::vector<std::string>>(
setup[static_cast<int>(ExportList::CHEM_PQC_SOLUTIONS)]); // setup[static_cast<int>(ExportList::CHEM_PQC_SOLUTIONS)]);
this->pqc_solution_primaries = Rcpp::as<std::vector<std::string>>( // this->pqc_solution_primaries = Rcpp::as<std::vector<std::string>>(
setup[static_cast<int>(ExportList::CHEM_PQC_SOLUTION_PRIMARY)]); // setup[static_cast<int>(ExportList::CHEM_PQC_SOLUTION_PRIMARY)]);
this->pqc_exchanger = // this->pqc_exchanger =
Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_EXCHANGER)]); // Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_EXCHANGER)]);
this->pqc_kinetics = // this->pqc_kinetics =
Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_KINETICS)]); // Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_KINETICS)]);
this->pqc_equilibrium = // this->pqc_equilibrium =
Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_EQUILIBRIUM)]); // Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_EQUILIBRIUM)]);
this->pqc_surface_comps = // this->pqc_surface_comps =
Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_SURFACE_COMPS)]); // Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_SURFACE_COMPS)]);
this->pqc_surface_charges = // this->pqc_surface_charges =
Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_SURFACE_CHARGES)]); // Rcpp::List(setup[static_cast<int>(ExportList::CHEM_PQC_SURFACE_CHARGES)]);
this->dht_species = NamedVector<uint32_t>( this->dht_species = NamedVector<uint32_t>(
setup[static_cast<int>(ExportList::CHEM_DHT_SPECIES)]); setup[static_cast<int>(ExportList::CHEM_DHT_SPECIES)]);
@@ -108,25 +113,29 @@ Rcpp::List InitialList::exportList() {
   out[static_cast<int>(ExportList::DIFFU_ALPHA_Y)] = this->alpha_y;
   out[static_cast<int>(ExportList::CHEM_DATABASE)] = Rcpp::wrap(this->database);
+  out[static_cast<int>(ExportList::CHEM_PQC_SCRIPT)] =
+      Rcpp::wrap(this->pqc_script);
+  out[static_cast<int>(ExportList::CHEM_PQC_WITH_H0_O0)] =
+      Rcpp::wrap(this->with_h0_o0);
+  out[static_cast<int>(ExportList::CHEM_PQC_WITH_REDOX)] =
+      Rcpp::wrap(this->with_redox);
   out[static_cast<int>(ExportList::CHEM_FIELD_HEADER)] =
       Rcpp::wrap(this->field_header);
-  out[static_cast<int>(ExportList::CHEM_PQC_SCRIPTS)] =
-      Rcpp::wrap(this->pqc_scripts);
   out[static_cast<int>(ExportList::CHEM_PQC_IDS)] = Rcpp::wrap(this->pqc_ids);
-  out[static_cast<int>(ExportList::CHEM_PQC_SOLUTIONS)] =
-      Rcpp::wrap(this->pqc_solutions);
-  out[static_cast<int>(ExportList::CHEM_PQC_SOLUTION_PRIMARY)] =
-      Rcpp::wrap(this->pqc_solution_primaries);
-  out[static_cast<int>(ExportList::CHEM_PQC_EXCHANGER)] =
-      Rcpp::wrap(this->pqc_exchanger);
-  out[static_cast<int>(ExportList::CHEM_PQC_KINETICS)] =
-      Rcpp::wrap(this->pqc_kinetics);
-  out[static_cast<int>(ExportList::CHEM_PQC_EQUILIBRIUM)] =
-      Rcpp::wrap(this->pqc_equilibrium);
-  out[static_cast<int>(ExportList::CHEM_PQC_SURFACE_COMPS)] =
-      Rcpp::wrap(this->pqc_surface_comps);
-  out[static_cast<int>(ExportList::CHEM_PQC_SURFACE_CHARGES)] =
-      Rcpp::wrap(this->pqc_surface_charges);
+  // out[static_cast<int>(ExportList::CHEM_PQC_SOLUTIONS)] =
+  //     Rcpp::wrap(this->pqc_solutions);
+  // out[static_cast<int>(ExportList::CHEM_PQC_SOLUTION_PRIMARY)] =
+  //     Rcpp::wrap(this->pqc_solution_primaries);
+  // out[static_cast<int>(ExportList::CHEM_PQC_EXCHANGER)] =
+  //     Rcpp::wrap(this->pqc_exchanger);
+  // out[static_cast<int>(ExportList::CHEM_PQC_KINETICS)] =
+  //     Rcpp::wrap(this->pqc_kinetics);
+  // out[static_cast<int>(ExportList::CHEM_PQC_EQUILIBRIUM)] =
+  //     Rcpp::wrap(this->pqc_equilibrium);
+  // out[static_cast<int>(ExportList::CHEM_PQC_SURFACE_COMPS)] =
+  //     Rcpp::wrap(this->pqc_surface_comps);
+  // out[static_cast<int>(ExportList::CHEM_PQC_SURFACE_CHARGES)] =
+  //     Rcpp::wrap(this->pqc_surface_charges);
   out[static_cast<int>(ExportList::CHEM_DHT_SPECIES)] = this->dht_species;
   out[static_cast<int>(ExportList::CHEM_INTERP_SPECIES)] =
       Rcpp::wrap(this->interp_species);
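The export side above mirrors the importing constructor earlier in the file: a scoped enum supplies stable positions into an Rcpp::List, so the same ExportList index works for serialization and deserialization. A minimal, self-contained sketch of that pattern (the Slot names below are hypothetical, not POET's):

```cpp
#include <Rcpp.h>
#include <string>

// Stable positional slots; ENUM_SIZE doubles as the list length.
enum class Slot : int { DATABASE, SCRIPT, ENUM_SIZE };

// [[Rcpp::export]]
Rcpp::List exportSettings(std::string db, std::string script) {
  Rcpp::List out(static_cast<int>(Slot::ENUM_SIZE));
  out[static_cast<int>(Slot::DATABASE)] = Rcpp::wrap(db);
  out[static_cast<int>(Slot::SCRIPT)] = Rcpp::wrap(script);
  return out;
}

// [[Rcpp::export]]
std::string importDatabase(Rcpp::List in) {
  // The same enum indexes on the way back in, keeping both sides symmetric.
  return Rcpp::as<std::string>(in[static_cast<int>(Slot::DATABASE)]);
}
```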


@@ -2,7 +2,7 @@
 #include "Base/RInsidePOET.hpp"
 #include "DataStructures/NamedVector.hpp"
-#include "POETInit.hpp"
+#include "PhreeqcMatrix.hpp"
 #include <Rcpp/vector/instantiation.h>
 #include <set>
 #include <tug/Boundary.hpp>
@@ -13,9 +13,7 @@
 #include <cstddef>
 #include <cstdint>
-#include <PhreeqcInit.hpp>
 #include <map>
-#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -53,22 +51,17 @@ private:
     DIFFU_ALPHA_X,
     DIFFU_ALPHA_Y,
     CHEM_DATABASE,
-    CHEM_FIELD_HEADER,
-    CHEM_PQC_SCRIPTS,
+    CHEM_PQC_SCRIPT,
+    CHEM_PQC_WITH_H0_O0,
+    CHEM_PQC_WITH_REDOX,
     CHEM_PQC_IDS,
-    CHEM_PQC_SOLUTIONS,
-    CHEM_PQC_SOLUTION_PRIMARY,
-    CHEM_PQC_EXCHANGER,
-    CHEM_PQC_KINETICS,
-    CHEM_PQC_EQUILIBRIUM,
-    CHEM_PQC_SURFACE_COMPS,
-    CHEM_PQC_SURFACE_CHARGES,
+    CHEM_FIELD_HEADER,
     CHEM_DHT_SPECIES,
     CHEM_INTERP_SPECIES,
     CHEM_HOOKS,
     AI_SURROGATE_INPUT_SCRIPT,
     ENUM_SIZE // Hack: Last element of the enum to show enum size
   };
   // Grid members
   static constexpr const char *grid_key = "Grid";
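ENUM_SIZE is the usual last-element sentinel: it equals the number of enumerators, so arrays keyed by the enum can be sized at compile time. A sketch with hypothetical names of how the sentinel plus a constexpr scan can guard a string table (like GridMembersString below) against falling out of sync:

```cpp
#include <array>
#include <cstddef>

enum class Members { SCRIPT_STRING, SCRIPT_FILE, GRID_DEF, ENUM_SIZE };

static constexpr std::array<const char *,
                            static_cast<std::size_t>(Members::ENUM_SIZE)>
    MembersString = {"pqc_in_string", "pqc_in_file", "grid_def"};

// Too many initializers already fail to compile; too few would leave
// nullptr holes, which this constexpr scan catches at compile time too.
constexpr bool allNamed() {
  for (const char *name : MembersString)
    if (name == nullptr)
      return false;
  return true;
}
static_assert(allNamed(), "MembersString out of sync with Members");
```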
@@ -76,6 +69,8 @@ private:
   enum class GridMembers {
     PQC_SCRIPT_STRING,
     PQC_SCRIPT_FILE,
+    PQC_WITH_REDOX,
+    PQC_WITH_H0_O0,
     PQC_DB_STRING,
     PQC_DB_FILE,
     GRID_DEF,
@@ -89,17 +84,18 @@ private:
       static_cast<std::size_t>(InitialList::GridMembers::ENUM_SIZE);
   static constexpr std::array<const char *, size_GridMembers>
-      GridMembersString = {"pqc_in_string", "pqc_in_file", "pqc_db_string",
-                           "pqc_db_file", "grid_def", "grid_size",
-                           "constant_cells", "porosity"};
+      GridMembersString = {"pqc_in_string", "pqc_in_file", "pqc_with_redox",
+                           "pqc_wth_h0_o0", "pqc_db_string", "pqc_db_file",
+                           "grid_def", "grid_size", "constant_cells",
+                           "porosity"};
   constexpr const char *GRID_MEMBER_STR(GridMembers member) const {
     return GridMembersString[static_cast<std::size_t>(member)];
   }
-  std::unique_ptr<PhreeqcInit> phreeqc;
-  void prepareGrid(const Rcpp::List &grid_input);
+  // std::unique_ptr<PhreeqcMatrix> pqc_mat;
+  PhreeqcMatrix prepareGrid(const Rcpp::List &grid_input);
   std::uint8_t dim{0};
@@ -114,9 +110,6 @@ private:
   Rcpp::List initial_grid;
-  // No export
-  Rcpp::NumericMatrix phreeqc_mat;
 public:
   struct DiffusionInit {
     using BoundaryMap =
@@ -169,10 +162,12 @@ private:
     return DiffusionMembersString[static_cast<std::size_t>(member)];
   }
-  void initDiffusion(const Rcpp::List &diffusion_input);
+  void initDiffusion(const Rcpp::List &diffusion_input,
+                     const PhreeqcMatrix &phreeqc);
   std::pair<Rcpp::List, Rcpp::List>
   resolveBoundaries(const Rcpp::List &boundaries_list,
-                    const Rcpp::List &inner_boundaries);
+                    const Rcpp::List &inner_boundaries,
+                    const PhreeqcMatrix &phreeqc_mat);
   Rcpp::List boundaries;
   Rcpp::List inner_boundaries;
@@ -190,17 +185,12 @@ private:
   std::vector<std::string> field_header;
   std::string database;
-  std::vector<std::string> pqc_scripts;
+  std::string pqc_script;
+  bool with_h0_o0{false};
+  bool with_redox{false};
+  // std::vector<std::string> pqc_scripts;
   std::vector<int> pqc_ids;
-  std::vector<std::string> pqc_solutions;
-  std::vector<std::string> pqc_solution_primaries;
-  Rcpp::List pqc_exchanger;
-  Rcpp::List pqc_kinetics;
-  Rcpp::List pqc_equilibrium;
-  Rcpp::List pqc_surface_comps;
-  Rcpp::List pqc_surface_charges;
   NamedVector<std::uint32_t> dht_species;
   NamedVector<std::uint32_t> interp_species;
@@ -227,10 +217,12 @@ public:
   // std::vector<std::string> field_header;
   std::string database;
-  // std::vector<std::string> pqc_scripts;
-  // std::vector<int> pqc_ids;
-  std::map<int, POETConfig> pqc_config;
+  std::string pqc_script;
+  bool with_h0_o0;
+  bool with_redox;
+  std::vector<int> pqc_ids;
+  // std::map<int, std::string> pqc_input;
   // std::vector<std::string> pqc_sol_order;


@@ -1,97 +1,48 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
 /*
-** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
-** Potsdam)
-**
-** Copyright (C) 2018-2022 Marco De Lucia, Max Luebke (GFZ Potsdam)
-**
-** Copyright (C) 2023-2024 Marco De Lucia (GFZ Potsdam), Max Luebke (University
-** of Potsdam)
-**
-** POET is free software; you can redistribute it and/or modify it under the
-** terms of the GNU General Public License as published by the Free Software
-** Foundation; either version 2 of the License, or (at your option) any later
-** version.
-**
-** POET is distributed in the hope that it will be useful, but WITHOUT ANY
-** WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-** A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-**
-** You should have received a copy of the GNU General Public License along with
-** this program; if not, write to the Free Software Foundation, Inc., 51
-** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-*/
+ * DiffusionModule.cpp - Coupling module betweeen POET and tug
+ * Copyright (C) 2018-2025 Max Luebke (University of Potsdam)
+ */
 #include "DiffusionModule.hpp"
 #include "Base/Macros.hpp"
 #include "Init/InitialList.hpp"
-#include <Eigen/src/Core/Map.h>
-#include <Eigen/src/Core/Matrix.h>
-#include <Eigen/src/Core/util/Constants.h>
 #include <chrono>
-#include "tug/Boundary.hpp"
-#include "tug/Grid.hpp"
-#include "tug/Simulation.hpp"
 #include <string>
-#include <vector>
+#include <tug/Boundary.hpp>
+#include <tug/Diffusion.hpp>
 using namespace poet;
-static inline std::vector<TugType>
-MatrixToVec(const Eigen::MatrixX<TugType> &mat) {
-  std::vector<TugType> vec(mat.rows() * mat.cols());
-  for (std::uint32_t i = 0; i < mat.cols(); i++) {
-    for (std::uint32_t j = 0; j < mat.rows(); j++) {
-      vec[j * mat.cols() + i] = mat(j, i);
-    }
-  }
-  return vec;
-}
-static inline Eigen::MatrixX<TugType>
-VecToMatrix(const std::vector<TugType> &vec, std::uint32_t n_rows,
-            std::uint32_t n_cols) {
-  Eigen::MatrixX<TugType> mat(n_rows, n_cols);
-  for (std::uint32_t i = 0; i < n_cols; i++) {
-    for (std::uint32_t j = 0; j < n_rows; j++) {
-      mat(j, i) = vec[j * n_cols + i];
-    }
-  }
-  return mat;
-}
-// static constexpr double ZERO_MULTIPLICATOR = 10E-14;
 void DiffusionModule::simulate(double requested_dt) {
-  MSG("Starting diffusion ...");
+  // MSG("Starting diffusion ...");
   const auto start_diffusion_t = std::chrono::high_resolution_clock::now();
   const auto &n_rows = this->param_list.n_rows;
   const auto &n_cols = this->param_list.n_cols;
-  tug::Grid<TugType> grid(param_list.n_rows, param_list.n_cols);
-  tug::Boundary boundary(grid);
-  grid.setDomain(param_list.s_rows, param_list.s_cols);
-  tug::Simulation<TugType> sim(grid, boundary);
-  sim.setIterations(1);
   for (const auto &sol_name : this->param_list.transport_names) {
-    auto &species_conc = this->transport_field[sol_name];
-    Eigen::MatrixX<TugType> conc = VecToMatrix(species_conc, n_rows, n_cols);
-    Eigen::MatrixX<TugType> alpha_x =
-        VecToMatrix(this->param_list.alpha_x[sol_name], n_rows, n_cols);
-    Eigen::MatrixX<TugType> alpha_y =
-        VecToMatrix(this->param_list.alpha_y[sol_name], n_rows, n_cols);
+#if defined(POET_TUG_BTCS)
+    tug::Diffusion<TugType> diffusion_solver(
+        this->transport_field[sol_name].data(), n_rows, n_cols);
+#elif defined(POET_TUG_FTCS)
+    tug::Diffusion<TugType, tug::FTCS_APPROACH> diffusion_solver(
+        this->transport_field[sol_name].data(), n_rows, n_cols);
+#else
+#error "No valid approach defined"
+#endif
+    tug::RowMajMatMap<TugType> alpha_x_map(
+        this->param_list.alpha_x[sol_name].data(), n_rows, n_cols);
+    tug::RowMajMatMap<TugType> alpha_y_map(
+        this->param_list.alpha_y[sol_name].data(), n_rows, n_cols);
+    diffusion_solver.setAlphaX(alpha_x_map);
+    diffusion_solver.setAlphaY(alpha_y_map);
+    auto &boundary = diffusion_solver.getBoundaryConditions();
     boundary.deserialize(this->param_list.boundaries[sol_name]);
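POET_TUG_BTCS / POET_TUG_FTCS are the compile-time switches added by the "toggle tug numerical solver approach" change: the preprocessor picks which tug::Diffusion instantiation gets compiled, and leaving both unset is a hard error. A stand-alone sketch of the same dispatch pattern, with stand-in solver types that are purely illustrative (not tug's API); build it with, e.g., -DPOET_TUG_BTCS:

```cpp
#include <cstdio>

struct BTCSSolver { // implicit scheme, stand-in for tug::Diffusion<T>
  static constexpr const char *name = "BTCS";
};
struct FTCSSolver { // explicit scheme, stand-in for tug::Diffusion<T, FTCS>
  static constexpr const char *name = "FTCS";
};

// Exactly one approach must be chosen on the compiler command line.
#if defined(POET_TUG_BTCS)
using SolverT = BTCSSolver;
#elif defined(POET_TUG_FTCS)
using SolverT = FTCSSolver;
#else
#error "No valid approach defined"
#endif

int main() { std::printf("solver: %s\n", SolverT::name); }
```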
@@ -99,14 +50,8 @@ void DiffusionModule::simulate(double requested_dt) {
       boundary.setInnerBoundaries(this->param_list.inner_boundaries[sol_name]);
     }
-    grid.setAlpha(alpha_x, alpha_y);
-    grid.setConcentrations(conc);
-    sim.setTimestep(requested_dt);
-    sim.run();
-    species_conc = MatrixToVec(grid.getConcentrations());
+    diffusion_solver.setTimestep(requested_dt);
+    diffusion_solver.run();
   }
   const auto end_diffusion_t = std::chrono::high_resolution_clock::now();
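The net effect of this refactor: the old code copied each species field into an Eigen matrix (VecToMatrix), ran the solver, and copied back (MatrixToVec); the new code hands the solver raw pointers and maps the alpha fields, so everything is read and written in place. A sketch of the mapping idea, assuming tug::RowMajMatMap is an Eigen::Map over row-major storage (the alias below is an assumption, not tug's actual definition):

```cpp
#include <Eigen/Dense>
#include <iostream>
#include <vector>

template <typename T>
using RowMajMatMap =
    Eigen::Map<Eigen::Matrix<T, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>;

int main() {
  // A 2 x 3 field stored flat in row-major order, like the transport field.
  std::vector<double> field = {1, 2, 3, 4, 5, 6};
  RowMajMatMap<double> mat(field.data(), 2, 3);

  mat *= 0.5; // writes straight through to `field`: no copy-in/copy-out

  std::cout << field[0] << " " << field[5] << "\n"; // prints: 0.5 3
}
```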


@@ -34,9 +34,11 @@ int main(int argc, char **argv) {
                "input script")
       ->default_val(false);
-  bool asRDS;
-  app.add_flag("-r, --rds", asRDS, "Save output as .rds file instead of .qs")
-      ->default_val(false);
+  bool asRDS{false};
+  app.add_flag("-r, --rds", asRDS, "Save output as .rds")->default_val(false);
+  bool asQS{false};
+  app.add_flag("-q, --qs", asQS, "Save output as .qs")->default_val(false);
   CLI11_PARSE(app, argc, argv);
@@ -69,7 +71,13 @@ int main(int argc, char **argv) {
   }
   // append the correct file extension
-  output_file += asRDS ? ".rds" : ".qs";
+  if (asRDS) {
+    output_file += ".rds";
+  } else if (asQS) {
+    output_file += ".qs";
+  } else {
+    output_file += ".qs2";
+  }
   // set working directory to the directory of the input script
   if (setwd) {
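With three possible extensions (.rds, .qs, and the new .qs2 default) driven by two independent flags, passing both -r and -q silently prefers .rds in the if/else chain above. Not part of the diff, but CLI11's excludes() can reject that combination at parse time; a minimal sketch (the option names mirror the diff, the rest is illustrative):

```cpp
#include <CLI/CLI.hpp>
#include <iostream>
#include <string>

int main(int argc, char **argv) {
  CLI::App app{"output format demo"};

  bool asRDS{false};
  bool asQS{false};
  auto *rds_opt = app.add_flag("-r,--rds", asRDS, "Save output as .rds");
  auto *qs_opt = app.add_flag("-q,--qs", asQS, "Save output as .qs");
  rds_opt->excludes(qs_opt); // "-r -q" now fails to parse instead of guessing

  CLI11_PARSE(app, argc, argv);

  std::string output_file = "result";
  output_file += asRDS ? ".rds" : (asQS ? ".qs" : ".qs2"); // .qs2 by default
  std::cout << output_file << "\n";
}
```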


@@ -28,24 +28,27 @@
 #include "DataStructures/Field.hpp"
 #include "Init/InitialList.hpp"
 #include "Transport/DiffusionModule.hpp"
 #include <RInside.h>
 #include <Rcpp.h>
 #include <Rcpp/DataFrame.h>
 #include <Rcpp/Function.h>
 #include <Rcpp/vector/instantiation.h>
+#include <algorithm>
+#include <array>
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
 #include <mpi.h>
+#include <set>
 #include <string>
-#include <mutex>
-#include <condition_variable>
-#include "Chemistry/SurrogateModels/AI_functions.hpp"
 #include <CLI/CLI.hpp>
 #include <poet.hpp>
 #include <vector>
-using namespace std;
+// using namespace std;
 using namespace poet;
 using namespace Rcpp;
@@ -57,7 +60,7 @@ static std::unique_ptr<Rcpp::List> global_rt_setup;
 // before the R runtime is initialized
 static poet::DEFunc master_init_R;
 static poet::DEFunc master_iteration_end_R;
-static poet::DEFunc store_setup_R;
+// MDL: unused -> static poet::DEFunc store_setup_R;
 static poet::DEFunc ReadRObj_R;
 static poet::DEFunc SaveRObj_R;
 static poet::DEFunc source_R;
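DEFunc appears to be POET's handle type for R functions that are resolved once after the R runtime starts (hence the comment about initialization order above). The same idea with plain RInside/Rcpp, as a minimal sketch with an illustrative function name:

```cpp
#include <RInside.h>
#include <Rcpp.h>

int main(int argc, char *argv[]) {
  RInside R(argc, argv); // the embedded R session must exist first

  R.parseEvalQ("master_init <- function(x) x * 2");

  // Resolve the symbol once and keep the handle, instead of doing a
  // fresh lookup on every call.
  Rcpp::Function master_init_R("master_init");
  double y = Rcpp::as<double>(master_init_R(21));
  return y == 42.0 ? 0 : 1;
}
```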
@@ -66,13 +69,12 @@ static void init_global_functions(RInside &R) {
   R.parseEval(kin_r_library);
   master_init_R = DEFunc("master_init");
   master_iteration_end_R = DEFunc("master_iteration_end");
-  store_setup_R = DEFunc("StoreSetup");
+  // MDL: unused -> store_setup_R = DEFunc("StoreSetup");
   source_R = DEFunc("source");
   ReadRObj_R = DEFunc("ReadRObj");
   SaveRObj_R = DEFunc("SaveRObj");
 }
 // HACK: this is a step back as the order and also the count of fields is
 // predefined, but it will change in the future
 // static inline void writeFieldsToR(RInside &R, const Field &trans,
@@ -147,7 +149,10 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
                "Enable AI surrogate for chemistry module");
   app.add_flag("--rds", params.as_rds,
-               "Save output as .rds file instead of .qs");
+               "Save output as .rds file instead of default .qs2");
+  app.add_flag("--qs", params.as_qs,
+               "Save output as .qs file instead of default .qs2");
   std::string init_file;
   std::string runtime_file;
@@ -175,7 +180,11 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
   }
   // set the output extension
-  params.out_ext = params.as_rds ? "rds" : "qs";
+  params.out_ext = "qs2";
+  if (params.as_rds)
+    params.out_ext = "rds";
+  if (params.as_qs)
+    params.out_ext = "qs";
   if (MY_RANK == 0) {
     // MSG("Complete results storage is " + BOOL_PRINT(simparams.store_result));
@@ -183,12 +192,6 @@ int parseInitValues(int argc, char **argv, RuntimeParameters &params) {
     MSG("Work Package Size: " + std::to_string(params.work_package_size));
     MSG("DHT is " + BOOL_PRINT(params.use_dht));
     MSG("AI Surrogate is " + BOOL_PRINT(params.use_ai_surrogate));
-#ifndef USE_AI_SURROGATE
-    if (params.use_ai_surrogate) {
-      throw std::runtime_error("AI Surrogate functions can only be used if they are included during compile time.\n \
-      Please use the CMake flag -DUSE_AI_SURROGATE=ON.");
-    }
-#endif
     if (params.use_dht) {
       // MSG("DHT strategy is " + std::to_string(simparams.dht_strategy));
@@ -285,163 +288,100 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   if (params.print_progress) {
     chem.setProgressBarPrintout(true);
   }
-  /* For the weights and biases of the AI surrogate
-   * model to use in an inference function with Eigen */
-  std::mutex Eigen_model_mutex;
-  static EigenModel Eigen_model;
-  /* With K Means clustering, a second model will be used
-   * only for the reactive part of the field (and the first
-   * model only for the non-reactive part) */
-  static EigenModel Eigen_model_reactive;
-  /* For the training data */
-  std::mutex training_data_buffer_mutex;
-  std::condition_variable training_data_buffer_full;
-  bool start_training, end_training;
-  TrainingData training_data_buffer;
-  std::vector<int> cluster_labels;
-  if (params.use_ai_surrogate) {
-    MSG("AI: Initialize model");
-    // Initiate two models from one file
-    Python_Keras_load_model(R["model_file_path"], R["model_reactive_file_path"],
-                            params.use_clustering);
-    if (!params.disable_training) {
-      MSG("AI: Initialize training thread");
-      Python_Keras_training_thread(&Eigen_model, &Eigen_model_reactive,
-                                   &Eigen_model_mutex, &training_data_buffer,
-                                   &training_data_buffer_mutex,
-                                   &training_data_buffer_full, &start_training,
-                                   &end_training, params);
-    }
-    if (!params.use_Keras_predictions) {
-      // Initialize Eigen model for custom inference function
-      MSG("AI: Use custom C++ prediction function");
-      // Get Keras weights from Python
-      std::vector<std::vector<std::vector<double>>> cpp_weights = Python_Keras_get_weights("model");
-      // Set model size
-      size_t num_layers = cpp_weights.size() / 2;
-      Eigen_model.weight_matrices.resize(num_layers);
-      Eigen_model.biases.resize(num_layers);
-      for (size_t i = 0; i < cpp_weights.size(); i += 2) {
-        size_t rows = cpp_weights[i][0].size();
-        size_t cols = cpp_weights[i].size();
-        Eigen_model.weight_matrices[i / 2].resize(rows, cols);
-        size_t bias_size = cpp_weights[i + 1][0].size();
-        Eigen_model.biases[i / 2].resize(bias_size);
-      }
-      // Set initial model weights
-      update_weights(&Eigen_model, cpp_weights);
-      if (params.use_clustering) {
-        // Initialize Eigen model for reactive part of the field
-        cpp_weights = Python_Keras_get_weights("model_reactive");
-        num_layers = cpp_weights.size() / 2;
-        Eigen_model_reactive.weight_matrices.resize(num_layers);
-        Eigen_model_reactive.biases.resize(num_layers);
-        for (size_t i = 0; i < cpp_weights.size(); i += 2) {
-          size_t rows = cpp_weights[i][0].size();
-          size_t cols = cpp_weights[i].size();
-          Eigen_model_reactive.weight_matrices[i / 2].resize(rows, cols);
-          size_t bias_size = cpp_weights[i + 1][0].size();
-          Eigen_model_reactive.biases[i / 2].resize(bias_size);
-        }
-        update_weights(&Eigen_model_reactive, cpp_weights);
-      }
-    }
-    MSG("AI: Surrogate model initialized");
-  }
   R["TMP_PROPS"] = Rcpp::wrap(chem.getField().GetProps());
-  R["field_nrow"] = chem.getField().GetRequestedVecSize();
   /* SIMULATION LOOP */
   double dSimTime{0};
   for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
     double start_t = MPI_Wtime();
     const double &dt = params.timesteps[iter - 1];
-    // cout << "CPP: Next time step is " << dt << "[s]" << endl;
-    MSG("Next time step is " + std::to_string(dt) + " [s]");
+    std::cout << std::endl;
     /* displaying iteration number, with C++ and R iterator */
-    MSG("Going through iteration " + std::to_string(iter));
+    MSG("Going through iteration " + std::to_string(iter) + "/" +
+        std::to_string(maxiter));
+    MSG("Current time step is " + std::to_string(dt));
     /* run transport */
     diffusion.simulate(dt);
     chem.getField().update(diffusion.getField());
-    MSG("Chemistry step");
+    // MSG("Chemistry start");
     if (params.use_ai_surrogate) {
       double ai_start_t = MPI_Wtime();
-      // Save current values from the tug field as predictor for the ai step
+      // Get current values from the tug field for the ai predictions
       R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
-      R.parseEval(std::string("predictors <- ") +
-                  "set_field(TMP, TMP_PROPS, field_nrow, ai_surrogate_species)");
+      R.parseEval(
+          std::string("predictors <- setNames(data.frame(matrix(TMP, nrow=" +
                      std::to_string(chem.getField().GetRequestedVecSize()) +
                      ")), TMP_PROPS)"));
+      R.parseEval("predictors <- predictors[ai_surrogate_species]");
       // Apply preprocessing
       MSG("AI Preprocessing");
       R.parseEval("predictors_scaled <- preprocess(predictors)");
-      std::vector<std::vector<double>> predictors_scaled = R["predictors_scaled"];
-      // Get K-Means cluster assignements based on the preprocessed data
-      if (params.use_clustering) {
-        R.parseEval("cluster_labels <- assign_clusters(predictors_scaled)");
-        cluster_labels = Rcpp::as<std::vector<int>>(R["cluster_labels"]);
-      }
-      MSG("AI: Predict");
-      if (params.use_Keras_predictions) { // Predict with Keras default function
-        R["TMP"] = Python_Keras_predict(predictors_scaled, params.batch_size, cluster_labels);
-      } else { // Predict with custom Eigen function
-        if (params.use_clustering) {
-          R["TMP"] = Eigen_predict_clustered(Eigen_model, Eigen_model_reactive,
-                                             predictors_scaled, params.batch_size,
-                                             &Eigen_model_mutex, cluster_labels);
-        } else {
-          R["TMP"] = Eigen_predict(Eigen_model, predictors_scaled, params.batch_size,
-                                   &Eigen_model_mutex);
-        }
-      }
+      // Predict
+      MSG("AI Prediction");
+      R.parseEval(
+          "aipreds_scaled <- prediction_step(model, predictors_scaled)");
       // Apply postprocessing
-      MSG("AI: Postprocesing");
-      R.parseEval(std::string("predictions_scaled <- ") +
-                  "set_field(TMP, ai_surrogate_species, field_nrow, ai_surrogate_species, byrow = TRUE)");
-      R.parseEval("predictions <- postprocess(predictions_scaled)");
+      MSG("AI Postprocessing");
+      R.parseEval("aipreds <- postprocess(aipreds_scaled)");
       // Validate prediction and write valid predictions to chem field
-      MSG("AI: Validate");
-      R.parseEval("validity_vector <- validate_predictions(predictors, predictions)");
-      MSG("AI: Marking valid");
+      MSG("AI Validation");
+      R.parseEval(
+          "validity_vector <- validate_predictions(predictors, aipreds)");
+      MSG("AI Marking accepted");
       chem.set_ai_surrogate_validity_vector(R.parseEval("validity_vector"));
+      MSG("AI TempField");
       std::vector<std::vector<double>> RTempField =
-          R.parseEval("set_valid_predictions(predictors, predictions, validity_vector)");
+          R.parseEval("set_valid_predictions(predictors,\
+                                             aipreds,\
+                                             validity_vector)");
+      MSG("AI Set Field");
       Field predictions_field =
-          Field(R.parseEval("nrow(predictions)"), RTempField,
-                R.parseEval("colnames(predictions)"));
-      MSG("AI: Update field with AI predictions");
+          Field(R.parseEval("nrow(predictors)"), RTempField,
+                R.parseEval("colnames(predictors)"));
+      MSG("AI Update");
       chem.getField().update(predictions_field);
+      // store time for output file
       double ai_end_t = MPI_Wtime();
      R["ai_prediction_time"] = ai_end_t - ai_start_t;
     }
+    // Run simulation step
+    MSG("Simulate chemistry");
     chem.simulate(dt);
+    /* AI surrogate iterative training*/
+    if (params.use_ai_surrogate) {
+      double ai_start_t = MPI_Wtime();
+      R["TMP"] = Rcpp::wrap(chem.getField().AsVector());
+      R.parseEval(
+          std::string("targets <- setNames(data.frame(matrix(TMP, nrow=" +
                      std::to_string(chem.getField().GetRequestedVecSize()) +
                      ")), TMP_PROPS)"));
+      R.parseEval("targets <- targets[ai_surrogate_species]");
+      // TODO: Check how to get the correct columns
+      R.parseEval("target_scaled <- preprocess(targets)");
+      MSG("AI: incremental training");
+      R.parseEval("model <- training_step(model, predictors_scaled, "
+                  "target_scaled, validity_vector)");
+      double ai_end_t = MPI_Wtime();
+      R["ai_training_time"] = ai_end_t - ai_start_t;
+    }
     // MPI_Barrier(MPI_COMM_WORLD);
     double end_t = MPI_Wtime();
     dSimTime += end_t - start_t;
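The surrogate branch now moves the flat field into R with setNames(data.frame(matrix(TMP, nrow=...)), TMP_PROPS) instead of the old set_field helper. The round trip in isolation, as a minimal sketch assuming an RInside session (column names and values are illustrative, not POET's):

```cpp
#include <RInside.h>
#include <Rcpp.h>
#include <string>
#include <vector>

int main(int argc, char *argv[]) {
  RInside R(argc, argv);

  // A flat, column-major field: 3 cells x 2 species.
  std::vector<double> field = {1, 2, 3, 10, 20, 30};
  std::vector<std::string> props = {"Ca", "Mg"};
  const int nrow = 3;

  R["TMP"] = Rcpp::wrap(field);
  R["TMP_PROPS"] = Rcpp::wrap(props);
  R.parseEvalQ(std::string("df <- setNames(data.frame(matrix(TMP, nrow=") +
               std::to_string(nrow) + ")), TMP_PROPS)");

  // Read a column back into C++ via RInside's conversion proxy.
  std::vector<double> mg = R.parseEval("df$Mg");
  return mg[2] == 30.0 ? 0 : 1;
}
```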
@@ -453,53 +393,15 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
     // store_result is TRUE)
     call_master_iter_end(R, diffusion.getField(), chem.getField());
-    /* AI surrogate iterative training*/
-    if (params.use_ai_surrogate && !params.disable_training) {
-      // Add values to training data buffer
-      MSG("AI: Add to training data buffer");
-      if (!params.train_only_invalid) {
-        // Use all values if not specified otherwise
-        R.parseEval("validity_vector[] <- 0");
-      }
-      std::vector<std::vector<double>> invalid_x =
-          R.parseEval("get_invalid_values(predictors_scaled, validity_vector)");
-      R.parseEval("target_scaled <- preprocess(state_C[ai_surrogate_species])");
-      std::vector<std::vector<double>> invalid_y =
-          R.parseEval("get_invalid_values(target_scaled, validity_vector)");
-      training_data_buffer_mutex.lock();
-      training_data_buffer_append(training_data_buffer.x, invalid_x);
-      training_data_buffer_append(training_data_buffer.y, invalid_y);
-      // If clustering is used, Add cluster labels to buffer and
-      // count buffer size according to the cluster assignments
-      int n_cluster_reactive = 0;
-      size_t buffer_size = training_data_buffer.x[0].size();
-      if (params.use_clustering) {
-        cluster_labels_append(training_data_buffer.cluster_labels, cluster_labels,
-                              R["validity_vector"]);
-        for (size_t i = 0; i < buffer_size; i++) {
-          n_cluster_reactive += training_data_buffer.cluster_labels[i];
-        }
-      }
-      // Signal to the training thread if enough training data is present
-      if ((n_cluster_reactive >= params.training_data_size) ||
-          (buffer_size - n_cluster_reactive >= params.training_data_size)) {
-        start_training = true;
-        training_data_buffer_full.notify_one();
-      }
-      training_data_buffer_mutex.unlock();
-    }
     diffusion.getField().update(chem.getField());
     MSG("End of *coupling* iteration " + std::to_string(iter) + "/" +
         std::to_string(maxiter));
-    MSG();
+    // MSG();
   } // END SIMULATION LOOP
+  std::cout << std::endl;
   Rcpp::List chem_profiling;
   chem_profiling["simtime"] = chem.GetChemistryTime();
   chem_profiling["loop"] = chem.GetMasterLoopTime();
@@ -538,12 +440,6 @@ static Rcpp::List RunMasterLoop(RInsidePOET &R, const RuntimeParameters &params,
   profiling["chemistry"] = chem_profiling;
   profiling["diffusion"] = diffusion_profiling;
-  if (params.use_ai_surrogate) {
-    MSG("Finalize Python and wind down training thread");
-    Python_finalize(&Eigen_model_mutex, &training_data_buffer_mutex,
-                    &training_data_buffer_full, &start_training, &end_training);
-  }
   chem.MasterLoopBreak();
   return profiling;
@@ -592,14 +488,58 @@ std::vector<std::string> getSpeciesNames(const Field &&field, int root,
   return species_names_out;
 }
+std::array<double, 2> getBaseTotals(Field &&field, int root, MPI_Comm comm) {
+  std::array<double, 2> base_totals;
+  int rank;
+  MPI_Comm_rank(comm, &rank);
+  const bool is_master = root == rank;
+  if (is_master) {
+    const auto h_col = field["H"];
+    const auto o_col = field["O"];
+    base_totals[0] = *std::min_element(h_col.begin(), h_col.end());
+    base_totals[1] = *std::min_element(o_col.begin(), o_col.end());
+    MPI_Bcast(base_totals.data(), 2, MPI_DOUBLE, root, MPI_COMM_WORLD);
+    return base_totals;
+  }
+  MPI_Bcast(base_totals.data(), 2, MPI_DOUBLE, root, comm);
+  return base_totals;
+}
+bool getHasID(Field &&field, int root, MPI_Comm comm) {
+  bool has_id;
+  int rank;
+  MPI_Comm_rank(comm, &rank);
+  const bool is_master = root == rank;
+  if (is_master) {
+    const auto ID_field = field["ID"];
+    std::set<double> unique_IDs(ID_field.begin(), ID_field.end());
+    has_id = unique_IDs.size() > 1;
+    MPI_Bcast(&has_id, 1, MPI_C_BOOL, root, MPI_COMM_WORLD);
+    return has_id;
+  }
+  MPI_Bcast(&has_id, 1, MPI_C_BOOL, root, comm);
+  return has_id;
+}
 int main(int argc, char *argv[]) {
   int world_size;
-  // Threadsafe MPI is necessary for the AI surrogate
-  // training thread
-  int provided;
-  int required = MPI_THREAD_FUNNELED;
-  MPI_Init_thread(&argc, &argv, required, &provided);
+  MPI_Init(&argc, &argv);
   {
     MPI_Comm_size(MPI_COMM_WORLD, &world_size);
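The new getBaseTotals() and getHasID() helpers follow the root-computes-then-broadcasts idiom. Note that as written the master path broadcasts on MPI_COMM_WORLD while the other ranks pass `comm`; the two only agree when `comm` is MPI_COMM_WORLD, which is how main() calls them. The canonical form makes one unconditional MPI_Bcast on the same communicator from every rank; a sketch under those assumptions:

```cpp
#include <mpi.h>
#include <algorithm>
#include <array>
#include <vector>

// Every rank makes the same collective call on the same communicator;
// only the root fills the buffer beforehand.
std::array<double, 2> broadcastMinima(const std::vector<double> &h,
                                      const std::vector<double> &o, int root,
                                      MPI_Comm comm) {
  std::array<double, 2> totals{};
  int rank;
  MPI_Comm_rank(comm, &rank);
  if (rank == root) {
    totals[0] = *std::min_element(h.begin(), h.end());
    totals[1] = *std::min_element(o.begin(), o.end());
  }
  MPI_Bcast(totals.data(), 2, MPI_DOUBLE, root, comm); // collective on `comm`
  return totals;
}
```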
@@ -642,10 +582,13 @@ int main(int argc, char *argv[]) {
                                 init_list.getChemistryInit(), MPI_COMM_WORLD);
     const ChemistryModule::SurrogateSetup surr_setup = {
         getSpeciesNames(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
+        getBaseTotals(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
+        getHasID(init_list.getInitialGrid(), 0, MPI_COMM_WORLD),
         run_params.use_dht,
         run_params.dht_size,
+        run_params.dht_snaps,
+        run_params.out_dir,
         run_params.use_interp,
         run_params.interp_bucket_entries,
         run_params.interp_size,
@@ -670,15 +613,8 @@ int main(int argc, char *argv[]) {
     R["out_ext"] = run_params.out_ext;
     R["out_dir"] = run_params.out_dir;
-    // MPI_Barrier(MPI_COMM_WORLD);
-    DiffusionModule diffusion(init_list.getDiffusionInit(),
-                              init_list.getInitialGrid());
-    chemistry.masterSetField(init_list.getInitialGrid());
     if (run_params.use_ai_surrogate) {
-      // Load default function implementations
+      /* Incorporate ai surrogate from R */
       R.parseEvalQ(ai_surrogate_r_library);
       /* Use dht species for model input and output */
       R["ai_surrogate_species"] =
@@ -687,53 +623,23 @@ int main(int argc, char *argv[]) {
       const std::string ai_surrogate_input_script =
           init_list.getChemistryInit().ai_surrogate_input_script;
-      MSG("AI: Sourcing user-provided script");
+      MSG("AI: sourcing user-provided script");
       R.parseEvalQ(ai_surrogate_input_script);
-      if (!Rcpp::as<bool>(R.parseEval("exists(\"model_file_path\")"))) {
-        throw std::runtime_error("AI surrogate input script must contain a value for model_file_path");
-      }
-      /* AI surrogate training and inference parameters. (Can be set by declaring a
-         variable of the same name in one of the the R input scripts)*/
-      run_params.training_data_size = init_list.getDiffusionInit().n_rows *
-          init_list.getDiffusionInit().n_cols; // Default value is number of cells in field
-      if (Rcpp::as<bool>(R.parseEval("exists(\"batch_size\")"))) {
-        run_params.batch_size = R["batch_size"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"training_epochs\")"))) {
-        run_params.training_epochs = R["training_epochs"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"training_data_size\")"))) {
-        run_params.training_data_size = R["training_data_size"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"use_Keras_predictions\")"))) {
-        run_params.use_Keras_predictions = R["use_Keras_predictions"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"disable_training\")"))) {
-        run_params.disable_training = R["disable_training"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"train_only_invalid\")"))) {
-        run_params.train_only_invalid = R["train_only_invalid"];
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"save_model_path\")"))) {
-        run_params.save_model_path = Rcpp::as<std::string>(R["save_model_path"]);
-        MSG("AI: Model will be saved as \"" + run_params.save_model_path + "\"");
-      }
-      if (Rcpp::as<bool>(R.parseEval("exists(\"assign_clusters\")"))) {
-        run_params.use_clustering = true;
-        MSG("Clustering will be used for the AI surrogate")
-      }
-      if (!Rcpp::as<bool>(R.parseEval("exists(\"model_reactive_file_path\")"))) {
-        R.parseEval("model_reactive_file_path <- model_file_path");
-      }
-      MSG("AI: Initialize Python for the AI surrogate functions");
-      std::string python_keras_file = std::string(SRC_DIR) +
-          "/src/Chemistry/SurrogateModels/AI_Python_functions/keras_AI_surrogate.py";
-      Python_Keras_setup(python_keras_file, run_params.cuda_src_dir);
+      MSG("AI: initialize AI model");
+      R.parseEval("model <- initiate_model()");
+      R.parseEval("gpu_info()");
     }
     MSG("Init done on process with rank " + std::to_string(MY_RANK));
+    // MPI_Barrier(MPI_COMM_WORLD);
+    DiffusionModule diffusion(init_list.getDiffusionInit(),
+                              init_list.getInitialGrid());
+    chemistry.masterSetField(init_list.getInitialGrid());
     Rcpp::List profiling = RunMasterLoop(R, run_params, diffusion, chemistry);
     MSG("finished simulation loop");
@@ -742,7 +648,7 @@ int main(int argc, char *argv[]) {
     R["setup"] = *global_rt_setup;
     R["setup$out_ext"] = run_params.out_ext;
-    string r_vis_code;
+    std::string r_vis_code;
     r_vis_code = "SaveRObj(x = profiling, path = paste0(out_dir, "
                  "'/timings.', setup$out_ext));";
     R.parseEval(r_vis_code);


@@ -23,19 +23,16 @@
 #pragma once
 #include <cstdint>
-#include <set>
 #include <string>
 #include <vector>
 #include <Rcpp.h>
-#define SRC_DIR "@CMAKE_SOURCE_DIR@"
-#define CUDA_SRC_DIR "@CUDA_TOOLKIT_ROOT_DIR@"
 static const char *poet_version = "@POET_VERSION@";
 // using the Raw string literal to avoid escaping the quotes
 static const inline std::string kin_r_library = R"(@R_KIN_LIB@)";
 static const inline std::string init_r_library = R"(@R_INIT_LIB@)";
 static const inline std::string ai_surrogate_r_library =
     R"(@R_AI_SURROGATE_LIB@)";
@@ -47,37 +44,29 @@ struct RuntimeParameters {
   Rcpp::List init_params;
-  // MDL added to accomodate for qs::qsave/qread
   bool as_rds = false;
-  std::string out_ext; // MDL added to accomodate for qs::qsave/qread
+  bool as_qs = false;
+  std::string out_ext;
   bool print_progress = false;
   static constexpr std::uint32_t WORK_PACKAGE_SIZE_DEFAULT = 32;
-  std::uint32_t work_package_size;
+  std::uint32_t work_package_size = WORK_PACKAGE_SIZE_DEFAULT;
   bool use_dht = false;
   static constexpr std::uint32_t DHT_SIZE_DEFAULT = 1.5E3;
-  std::uint32_t dht_size;
+  std::uint32_t dht_size = DHT_SIZE_DEFAULT;
   static constexpr std::uint8_t DHT_SNAPS_DEFAULT = 0;
-  std::uint8_t dht_snaps;
+  std::uint8_t dht_snaps = DHT_SNAPS_DEFAULT;
   bool use_interp = false;
   static constexpr std::uint32_t INTERP_SIZE_DEFAULT = 100;
-  std::uint32_t interp_size;
+  std::uint32_t interp_size = INTERP_SIZE_DEFAULT;
   static constexpr std::uint32_t INTERP_MIN_ENTRIES_DEFAULT = 5;
-  std::uint32_t interp_min_entries;
+  std::uint32_t interp_min_entries = INTERP_MIN_ENTRIES_DEFAULT;
   static constexpr std::uint32_t INTERP_BUCKET_ENTRIES_DEFAULT = 20;
-  std::uint32_t interp_bucket_entries;
+  std::uint32_t interp_bucket_entries = INTERP_BUCKET_ENTRIES_DEFAULT;
-  /*AI surriogate configuration*/
-  bool use_ai_surrogate = false; // Can be set with command line flag ---ai-surrogate
-  bool disable_training = false; // Can be set in the R input script
-  bool use_clustering = false; // Can be set in the R input script
-  bool use_Keras_predictions = false; // Can be set in the R input script
-  bool train_only_invalid = false; // Can be set in the R input script
-  int batch_size = 2560; // default value determined in test on the UP Turing cluster
-  int training_epochs = 20;; // Can be set in the R input script
-  int training_data_size; // Can be set in the R input script
-  std::string save_model_path = ""; // Can be set in the R input script
-  std::string cuda_src_dir = CUDA_SRC_DIR; // From CMake
+  bool use_ai_surrogate = false;
 };
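The right-hand side replaces previously uninitialized members (which relied on being filled in at CLI parse time) with in-class default member initializers, so a default-constructed RuntimeParameters is always in a valid state even if parsing never touches a field. The idiom in isolation:

```cpp
#include <cstdint>
#include <iostream>

struct Params {
  static constexpr std::uint32_t DHT_SIZE_DEFAULT = 1500;
  // In-class default member initializer: every instance starts valid,
  // whether or not a CLI option overwrites the field later.
  std::uint32_t dht_size = DHT_SIZE_DEFAULT;
};

int main() {
  Params p;                        // no explicit setup needed
  std::cout << p.dht_size << "\n"; // prints: 1500
}
```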