feat: enable interpolation without DHT lookup

This commit is contained in:
Max Luebke 2023-08-10 14:15:26 +02:00
parent 20db291bae
commit 164406ff25
16 changed files with 393 additions and 514 deletions

View File

@ -224,6 +224,7 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = Rcpp::wrap(chem.GetWorkerDHTFillTimings());
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
if (params.getChemParams().use_interp) {
R["interp_w"] = Rcpp::wrap(chem.GetWorkerInterpolationWriteTimings());
R.parseEvalQ("profiling$interp_write <- interp_w");
@ -239,7 +240,6 @@ inline double RunMasterLoop(SimParams &params, RInside &R,
R["interp_cached"] = Rcpp::wrap(chem.GetWorkerPHTCacheHits());
R.parseEvalQ("profiling$interp_cached <- interp_cached");
}
}
chem.MasterLoopBreak();
diffusion.end();

@ -1 +1 @@
Subproject commit 89f713b273cd5340b2e8169523da04c2d7ad89c9
Subproject commit 6ed14c35322a245e3a9776ef262c0ac0eba3b301

View File

@ -1,15 +1,15 @@
// Time-stamp: "Last modified 2023-08-01 13:13:18 mluebke"
// Time-stamp: "Last modified 2023-08-08 13:15:49 mluebke"
#ifndef CHEMISTRYMODULE_H_
#define CHEMISTRYMODULE_H_
#include "DHT_Wrapper.hpp"
#include "DataStructures.hpp"
#include "Field.hpp"
#include "Interpolation.hpp"
#include "IrmResult.h"
#include "PhreeqcRM.h"
#include "SimParams.hpp"
#include "DataStructures.hpp"
#include <array>
#include <cstddef>
@ -322,9 +322,8 @@ protected:
void WorkerPerfToMaster(int type, const struct worker_s &timings);
void WorkerMetricsToMaster(int type);
IRM_RESULT WorkerRunWorkPackage(std::vector<double> &vecWP,
std::vector<std::uint32_t> &vecMapping,
double dSimTime, double dTimestep);
IRM_RESULT WorkerRunWorkPackage(WorkPackage &work_package, double dSimTime,
double dTimestep);
std::vector<uint32_t> CalculateWPSizesVector(uint32_t n_cells,
uint32_t wp_size) const;

View File

@ -1,8 +0,0 @@
#ifndef DHT_TYPES_H_
#define DHT_TYPES_H_
namespace poet {
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_TOTAL };
}
#endif // DHT_TYPES_H_

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-08-01 13:48:34 mluebke"
// Time-stamp: "Last modified 2023-08-08 18:01:12 mluebke"
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
@ -25,7 +25,7 @@
#include "DataStructures.hpp"
#include "LookupKey.hpp"
#include "poet/DHT_Types.hpp"
#include "enums.hpp"
#include "poet/HashFunctions.hpp"
#include "poet/LookupKey.hpp"
#include "poet/Rounding.hpp"
@ -57,12 +57,9 @@ using DHT_Location = std::pair<std::uint32_t, std::uint32_t>;
class DHT_Wrapper {
public:
using DHT_ResultObject = struct DHTResobj {
uint32_t length;
std::vector<LookupKey> keys;
std::vector<std::vector<double>> results;
std::vector<double> old_values;
std::vector<bool> needPhreeqc;
std::vector<DHT_Location> locations;
std::vector<bool> filledDHT;
};
static constexpr std::int32_t DHT_KEY_INPUT_CUSTOM =
@ -119,9 +116,7 @@ public:
* @param[in,out] work_package Pointer to current work package
* @param dt Current timestep of simulation
*/
auto checkDHT(int length, double dt, const std::vector<double> &work_package,
std::vector<std::uint32_t> &curr_mapping)
-> const DHT_ResultObject &;
auto checkDHT(WorkPackage &work_package) -> const DHT_ResultObject &;
/**
* @brief Write simulated values into DHT
@ -139,7 +134,7 @@ public:
* outputs of the PHREEQC simulation
* @param dt Current timestep of simulation
*/
void fillDHT(int length, const std::vector<double> &work_package);
void fillDHT(const WorkPackage &work_package);
void resultsToWP(std::vector<double> &work_package);
@ -214,6 +209,14 @@ public:
std::uint32_t getOutputCount() const { return this->data_count; }
inline void prepareKeys(const std::vector<std::vector<double>> &input_values,
double dt) {
dht_results.keys.resize(input_values.size());
for (std::size_t i = 0; i < input_values.size(); i++) {
dht_results.keys[i] = fuzzForDHT(input_values[i], dt);
}
}
private:
uint32_t key_count;
uint32_t data_count;
@ -221,7 +224,7 @@ private:
DHT *dht_object;
MPI_Comm communicator;
LookupKey fuzzForDHT(int var_count, void *key, double dt);
LookupKey fuzzForDHT(const std::vector<double> &cell, double dt);
std::vector<double>
outputToInputAndRates(const std::vector<double> &old_results,

View File

@ -1,13 +1,29 @@
#ifndef DATASTRUCTURES_H_
#define DATASTRUCTURES_H_
#include "enums.hpp"
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>
namespace poet {
struct WorkPackage {
std::size_t size;
std::vector<std::vector<double>> input;
std::vector<std::vector<double>> output;
std::vector<std::uint8_t> mapping;
WorkPackage(size_t _size) : size(_size) {
input.resize(size);
output.resize(size);
mapping.resize(size, CHEM_PQC);
}
};
template <typename value_type> class NamedVector {
public:
void insert(std::pair<std::string, value_type> to_insert) {

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-08-01 18:10:48 mluebke"
// Time-stamp: "Last modified 2023-08-09 12:52:37 mluebke"
#ifndef INTERPOLATION_H_
#define INTERPOLATION_H_
@ -65,10 +65,9 @@ public:
void writeLocationToPHT(LookupKey key, DHT_Location location);
const PHT_Result &query(const LookupKey &key,
const std::vector<std::uint32_t> &signif,
std::uint32_t min_entries_needed,
std::uint32_t input_count,
std::uint32_t output_count);
const std::uint32_t min_entries_needed,
const std::uint32_t input_count,
const std::uint32_t output_count);
std::uint64_t getLocations(const LookupKey &key);
@ -185,10 +184,9 @@ public:
auto getMinEntriesNeeded() { return this->min_entries_needed; }
void writePairs(const DHT_Wrapper::DHT_ResultObject &in);
void writePairs();
void tryInterpolation(DHT_Wrapper::DHT_ResultObject &dht_results,
std::vector<std::uint32_t> &curr_mapping);
void tryInterpolation(WorkPackage &work_package);
void resultsToWP(std::vector<double> &work_package) const;

10
include/poet/enums.hpp Normal file
View File

@ -0,0 +1,10 @@
#ifndef ENUMS_H_
#define ENUMS_H_
namespace poet {
enum DHT_PROP_TYPES { DHT_TYPE_DEFAULT, DHT_TYPE_CHARGE, DHT_TYPE_TOTAL };
enum CHEMISTRY_OUT_SOURCE { CHEM_PQC, CHEM_DHT, CHEM_INTERP };
} // namespace poet
#endif // ENUMS_H_

View File

@ -293,16 +293,23 @@ void poet::ChemistryModule::initializeField(const Field &trans_field) {
this->n_cells = trans_field.GetRequestedVecSize();
chem_field = Field(n_cells);
std::vector<double> phreeqc_init;
this->getDumpedField(phreeqc_init);
std::vector<std::vector<double>> phreeqc_dump(this->nxyz);
this->getDumpedField(phreeqc_dump);
if (is_sequential) {
std::vector<double> init_vec{phreeqc_init};
this->unshuffleField(phreeqc_init, n_cells, prop_count, 1, init_vec);
std::vector<double> init_vec;
for (std::size_t i = 0; i < n_cells; i++) {
init_vec.insert(init_vec.end(), phreeqc_dump[i].begin(),
phreeqc_dump[i].end());
}
const auto tmp_buffer{init_vec};
this->unshuffleField(tmp_buffer, n_cells, prop_count, 1, init_vec);
chem_field.InitFromVec(init_vec, prop_names);
return;
}
std::vector<double> &phreeqc_init = phreeqc_dump[0];
std::vector<std::vector<double>> initial_values;
for (const auto &vec : trans_field.As2DVector()) {

View File

@ -90,8 +90,8 @@ std::vector<uint32_t> poet::ChemistryModule::GetWorkerDHTEvictions() const {
MPI_Get_count(&probe, MPI_UINT32_T, &count);
std::vector<uint32_t> ret(count);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE, WORKER_DHT_EVICTIONS,
this->group_comm, NULL);
MPI_Recv(ret.data(), count, MPI_UINT32_T, probe.MPI_SOURCE,
WORKER_DHT_EVICTIONS, this->group_comm, NULL);
return ret;
}
@ -322,9 +322,24 @@ void poet::ChemistryModule::RunCells() {
void poet::ChemistryModule::MasterRunSequential() {
std::vector<double> shuffled_field =
shuffleField(chem_field.AsVector(), n_cells, prop_count, 1);
this->setDumpedField(shuffled_field);
std::vector<std::vector<double>> input;
for (std::size_t i = 0; i < n_cells; i++) {
input.push_back(
std::vector<double>(shuffled_field.begin() + (i * prop_count),
shuffled_field.begin() + ((i + 1) * prop_count)));
}
this->setDumpedField(input);
PhreeqcRM::RunCells();
this->getDumpedField(shuffled_field);
this->getDumpedField(input);
shuffled_field.clear();
for (std::size_t i = 0; i < n_cells; i++) {
shuffled_field.insert(shuffled_field.end(), input[i].begin(),
input[i].end());
}
std::vector<double> out_vec{shuffled_field};
unshuffleField(shuffled_field, n_cells, prop_count, 1, out_vec);
chem_field.SetFromVector(out_vec);

View File

@ -1,4 +1,4 @@
/// Time-stamp: "Last modified 2023-06-28 15:58:19 mluebke"
/// Time-stamp: "Last modified 2023-08-10 11:50:46 mluebke"
/*
** Copyright (C) 2017-2021 Max Luebke (University of Potsdam)
**
@ -35,8 +35,8 @@ static void determine_dest(uint64_t hash, int comm_size,
/** how many bytes do we need for one index? */
int index_size = sizeof(double) - (index_count - 1);
for (int i = 0; i < index_count; i++) {
tmp_index = 0;
memcpy(&tmp_index, (char *)&hash + i, index_size);
tmp_index = (hash >> (i * 8)) & ((1ULL << (index_size * 8)) - 1);
/* memcpy(&tmp_index, (char *)&hash + i, index_size); */
index[i] = (uint64_t)(tmp_index % table_size);
}
*dest_rank = (unsigned int)(hash % comm_size);

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-08-01 13:41:57 mluebke"
// Time-stamp: "Last modified 2023-08-09 14:05:01 mluebke"
/*
** Copyright (C) 2018-2021 Alexander Lindemann, Max Luebke (University of
@ -21,11 +21,11 @@
*/
#include "poet/DHT_Wrapper.hpp"
#include "poet/DHT_Types.hpp"
#include "poet/HashFunctions.hpp"
#include "poet/Interpolation.hpp"
#include "poet/LookupKey.hpp"
#include "poet/Rounding.hpp"
#include "poet/enums.hpp"
#include <algorithm>
#include <cmath>
@ -83,97 +83,68 @@ DHT_Wrapper::~DHT_Wrapper() {
// free DHT
DHT_free(dht_object, NULL, NULL);
}
auto DHT_Wrapper::checkDHT(int length, double dt,
const std::vector<double> &work_package,
std::vector<std::uint32_t> &curr_mapping)
auto DHT_Wrapper::checkDHT(WorkPackage &work_package)
-> const DHT_ResultObject & {
dht_results.length = length;
dht_results.keys.resize(length);
dht_results.results.resize(length);
dht_results.needPhreeqc.resize(length);
const auto length = work_package.size;
std::vector<double> bucket_writer(this->data_count +
input_key_elements.size());
std::vector<std::uint32_t> new_mapping;
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// point to current grid cell
void *key = (void *)&(work_package[i * this->data_count]);
auto &data = dht_results.results[i];
auto &key_vector = dht_results.keys[i];
// data.resize(this->data_count);
key_vector = fuzzForDHT(this->key_count, key, dt);
// overwrite input with data from DHT, IF value is found in DHT
int res =
DHT_read(this->dht_object, key_vector.data(), bucket_writer.data());
switch (res) {
case DHT_SUCCESS:
dht_results.results[i] = inputAndRatesToOutput(bucket_writer);
dht_results.needPhreeqc[i] = false;
work_package.output[i] = inputAndRatesToOutput(bucket_writer);
work_package.mapping[i] = CHEM_DHT;
this->dht_hits++;
break;
case DHT_READ_MISS:
dht_results.needPhreeqc[i] = true;
new_mapping.push_back(curr_mapping[i]);
dht_results.results[i] = std::vector<double>{
&work_package[i * this->data_count],
&work_package[i * this->data_count] + this->data_count};
// HACK: apply normalization to total H and O in results field of DHT
// dht_results.results[i][0] -= base_totals[0];
// dht_results.results[i][1] -= base_totals[1];
break;
}
}
curr_mapping = std::move(new_mapping);
dht_results.old_values = work_package;
return dht_results;
}
void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
void DHT_Wrapper::fillDHT(const WorkPackage &work_package) {
const auto length = work_package.size;
// loop over every grid cell contained in work package
dht_results.locations.resize(length);
dht_results.filledDHT = std::vector<bool>(length, false);
for (int i = 0; i < length; i++) {
// If true grid cell was simulated, needs to be inserted into dht
if (dht_results.needPhreeqc[i]) {
if (work_package.mapping[i] == CHEM_PQC) {
// check if calcite or dolomite is absent and present, resp.n and vice
// versa in input/output. If this is the case -> Do not write to DHT!
// HACK: hardcoded, should be fixed!
if ((dht_results.old_values[i * this->data_count + 7] == 0) !=
(work_package[i * this->data_count + 7] == 0)) {
dht_results.needPhreeqc[i] = false;
if ((work_package.input[i][7] == 0) != (work_package.output[i][7] == 0)) {
continue;
}
if ((dht_results.old_values[i * this->data_count + 9] == 0) !=
(work_package[i * this->data_count + 9] == 0)) {
dht_results.needPhreeqc[i] = false;
if ((work_package.input[i][9] == 0) != (work_package.output[i][9] == 0)) {
continue;
}
uint32_t proc, index;
const auto &key = dht_results.keys[i];
const auto curr_old_data = std::vector<double>(
dht_results.old_values.begin() + (i * this->data_count),
dht_results.old_values.begin() + ((i + 1) * this->data_count));
const auto curr_new_data = std::vector<double>(
work_package.begin() + (i * this->data_count),
work_package.begin() + ((i + 1) * this->data_count));
const auto data = outputToInputAndRates(curr_old_data, curr_new_data);
auto &key = dht_results.keys[i];
const auto data =
outputToInputAndRates(work_package.input[i], work_package.output[i]);
// void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
// insert simulated data with fuzzed key into DHT
int res = DHT_write(this->dht_object, (void *)(key.data()),
int res = DHT_write(this->dht_object, key.data(),
const_cast<double *>(data.data()), &proc, &index);
dht_results.locations[i] = {proc, index};
@ -182,6 +153,8 @@ void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
if ((res != DHT_SUCCESS) && (res == DHT_WRITE_SUCCESS_WITH_EVICTION)) {
dht_evictions++;
}
dht_results.filledDHT[i] = true;
}
}
}
@ -218,14 +191,14 @@ DHT_Wrapper::inputAndRatesToOutput(const std::vector<double> &dht_data) {
return output;
}
void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
for (int i = 0; i < dht_results.length; i++) {
if (!dht_results.needPhreeqc[i]) {
std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
work_package.begin() + (data_count * i));
}
}
}
// void DHT_Wrapper::resultsToWP(std::vector<double> &work_package) {
// for (int i = 0; i < dht_results.length; i++) {
// if (!dht_results.needPhreeqc[i]) {
// std::copy(dht_results.results[i].begin(), dht_results.results[i].end(),
// work_package.begin() + (data_count * i));
// }
// }
// }
int DHT_Wrapper::tableToFile(const char *filename) {
int res = DHT_to_file(dht_object, filename);
@ -255,11 +228,10 @@ void DHT_Wrapper::printStatistics() {
}
}
LookupKey DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
LookupKey DHT_Wrapper::fuzzForDHT(const std::vector<double> &cell, double dt) {
const auto c_zero_val = std::pow(10, AQUEOUS_EXP);
const Lookup_Keyelement dummy = {.0};
LookupKey vecFuzz(var_count + 1, dummy);
LookupKey vecFuzz(this->key_count + 1, {.0});
DHT_Rounder rounder;
int totals_i = 0;
@ -269,7 +241,7 @@ LookupKey DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
if (input_key_elements[i] == DHT_KEY_INPUT_CUSTOM) {
continue;
}
double curr_key = ((double *)key)[input_key_elements[i]];
double curr_key = cell[input_key_elements[i]];
if (curr_key != 0) {
if (curr_key < c_zero_val &&
this->dht_prop_type_vector[i] == DHT_TYPE_DEFAULT) {
@ -284,7 +256,7 @@ LookupKey DHT_Wrapper::fuzzForDHT(int var_count, void *key, double dt) {
}
}
// add timestep to the end of the key as double value
vecFuzz[var_count].fp_element = dt;
vecFuzz[this->key_count].fp_element = dt;
return vecFuzz;
}

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-08-01 23:18:45 mluebke"
// Time-stamp: "Last modified 2023-08-09 13:41:53 mluebke"
#include "poet/DHT_Wrapper.hpp"
#include "poet/HashFunctions.hpp"
@ -48,31 +48,32 @@ void InterpolationModule::initPHT(std::uint32_t key_count,
key_size, data_size, entries_per_bucket, size_per_process, communicator);
}
void InterpolationModule::writePairs(const DHT_Wrapper::DHT_ResultObject &in) {
for (int i = 0; i < in.length; i++) {
if (in.needPhreeqc[i]) {
void InterpolationModule::writePairs() {
const auto in = this->dht_instance.getDHTResults();
for (int i = 0; i < in.filledDHT.size(); i++) {
if (in.filledDHT[i]) {
const auto coarse_key = roundKey(in.keys[i]);
pht->writeLocationToPHT(coarse_key, in.locations[i]);
}
}
}
void InterpolationModule::tryInterpolation(
DHT_Wrapper::DHT_ResultObject &dht_results,
std::vector<std::uint32_t> &curr_mapping) {
interp_result.status.resize(dht_results.length, NOT_NEEDED);
interp_result.results.resize(dht_results.length, {});
void InterpolationModule::tryInterpolation(WorkPackage &work_package) {
interp_result.status.resize(work_package.size, NOT_NEEDED);
for (int i = 0; i < dht_results.length; i++) {
if (!dht_results.needPhreeqc[i]) {
const auto dht_results = this->dht_instance.getDHTResults();
for (int i = 0; i < work_package.size; i++) {
if (work_package.mapping[i] != CHEM_PQC) {
interp_result.status[i] = NOT_NEEDED;
continue;
}
const auto rounded_key = roundKey(dht_results.keys[i]);
auto pht_result =
pht->query(roundKey(dht_results.keys[i]), this->key_signifs.getValues(),
this->min_entries_needed, dht_instance.getInputCount(),
dht_instance.getOutputCount());
pht->query(rounded_key, this->min_entries_needed,
dht_instance.getInputCount(), dht_instance.getOutputCount());
int pht_i = 0;
@ -85,9 +86,9 @@ void InterpolationModule::tryInterpolation(
auto out_it = pht_result.out_values.begin() + pht_i;
bool same_sig_calcite = (pht_result.in_values[pht_i][7] == 0) ==
(dht_results.results[i][7] == 0);
(work_package.input[i][7] == 0);
bool same_sig_dolomite = (pht_result.in_values[pht_i][8] == 0) ==
(dht_results.results[i][9] == 0);
(work_package.input[i][9] == 0);
if (!same_sig_calcite || !same_sig_dolomite) {
pht_result.size -= 1;
pht_result.in_values.erase(in_it);
@ -104,7 +105,7 @@ void InterpolationModule::tryInterpolation(
}
#ifdef POET_PHT_ADD
this->pht->incrementReadCounter(roundKey(dht_results.keys[i]));
this->pht->incrementReadCounter(roundKey(roundedKey));
#endif
double start_fc = MPI_Wtime();
@ -115,11 +116,11 @@ void InterpolationModule::tryInterpolation(
// }
// mean_water /= pht_result.size;
interp_result.results[i] =
f_interpolate(dht_instance.getKeyElements(), dht_results.results[i],
work_package.output[i] =
f_interpolate(dht_instance.getKeyElements(), work_package.input[i],
pht_result.in_values, pht_result.out_values);
if (interp_result.results[i][7] < 0 || interp_result.results[i][9] < 0) {
if (work_package.output[i][7] < 0 || work_package.output[i][9] < 0) {
interp_result.status[i] = INSUFFICIENT_DATA;
continue;
}
@ -129,10 +130,7 @@ void InterpolationModule::tryInterpolation(
this->interpolations++;
curr_mapping.erase(std::remove(curr_mapping.begin(), curr_mapping.end(), i),
curr_mapping.end());
dht_results.needPhreeqc[i] = false;
work_package.mapping[i] = CHEM_INTERP;
interp_result.status[i] = RES_OK;
}
}

View File

@ -1,4 +1,4 @@
// Time-stamp: "Last modified 2023-08-01 17:11:42 mluebke"
// Time-stamp: "Last modified 2023-08-09 13:32:11 mluebke"
#include "poet/DHT_Wrapper.hpp"
#include "poet/HashFunctions.hpp"
@ -106,9 +106,8 @@ void ProximityHashTable::writeLocationToPHT(LookupKey key,
}
const ProximityHashTable::PHT_Result &ProximityHashTable::query(
const LookupKey &key, const std::vector<std::uint32_t> &signif,
std::uint32_t min_entries_needed, std::uint32_t input_count,
std::uint32_t output_count) {
const LookupKey &key, const std::uint32_t min_entries_needed,
const std::uint32_t input_count, const std::uint32_t output_count) {
double start_r = MPI_Wtime();
const auto cache_ret = localCache[key];
@ -158,27 +157,6 @@ const ProximityHashTable::PHT_Result &ProximityHashTable::query(
buffer += input_count;
lookup_results.out_values.push_back(
std::vector<double>(buffer, buffer + output_count));
// if (!similarityCheck(check_key, key, signif)) {
// // TODO: original stored location in PHT was overwritten in DHT.
// Need to
// // handle this!
// lookup_results.size--;
// if (lookup_results.size < min_entries_needed) {
// lookup_results.size = 0;
// break;
// }
// continue;
// }
// auto input = convertKeysFromDHT(buffer_start, dht_key_count);
// // remove timestep from the key
// input.pop_back();
// lookup_results.in_keys.push_back(input);
// auto *data = reinterpret_cast<double *>(buffer + dht_key_count);
// lookup_results.out_values.push_back(
// std::vector<double>(data, data + dht_data_count));
}
if (lookup_results.size != 0) {

View File

@ -1,10 +1,10 @@
// Time-stamp: "Last modified 2023-08-01 17:22:20 mluebke"
// Time-stamp: "Last modified 2023-08-10 12:14:24 mluebke"
#include "IrmResult.h"
#include "poet/ChemistryModule.hpp"
#include "poet/DHT_Wrapper.hpp"
#include "poet/Interpolation.hpp"
#include <IrmResult.h>
#include <algorithm>
#include <cassert>
#include <cmath>
@ -19,146 +19,6 @@
#include <vector>
namespace poet {
//std::vector<double>
//inverseDistanceWeighting(const std::vector<std::int32_t> &to_calc,
// const std::vector<double> &from,
// const std::vector<std::vector<double>> &input,
// const std::vector<std::vector<double>> &output) {
// std::vector<double> results = from;
//
// const std::uint32_t buffer_size = input.size() + 1;
// double buffer[buffer_size];
// double from_rescaled;
//
// const std::uint32_t data_set_n = input.size();
// double rescaled[to_calc.size()][data_set_n + 1];
// double weights[data_set_n];
//
// // rescaling over all key elements
// for (int key_comp_i = 0; key_comp_i < to_calc.size(); key_comp_i++) {
// const auto output_comp_i = to_calc[key_comp_i];
//
// // rescale input between 0 and 1
// for (int point_i = 0; point_i < data_set_n; point_i++) {
// rescaled[key_comp_i][point_i] = input[point_i][key_comp_i];
// }
//
// rescaled[key_comp_i][data_set_n] = from[output_comp_i];
//
// const double min = *std::min_element(rescaled[key_comp_i],
// rescaled[key_comp_i] + data_set_n + 1);
// const double max = *std::max_element(rescaled[key_comp_i],
// rescaled[key_comp_i] + data_set_n + 1);
//
// for (int point_i = 0; point_i < data_set_n; point_i++) {
// rescaled[key_comp_i][point_i] =
// ((max - min) != 0
// ? (rescaled[key_comp_i][point_i] - min) / (max - min)
// : 0);
// }
// rescaled[key_comp_i][data_set_n] =
// ((max - min) != 0 ? (from[output_comp_i] - min) / (max - min) : 0);
// }
//
// // calculate distances for each data set
// double inv_sum = 0;
// for (int point_i = 0; point_i < data_set_n; point_i++) {
// double distance = 0;
// for (int key_comp_i = 0; key_comp_i < to_calc.size(); key_comp_i++) {
// distance += std::pow(
// rescaled[key_comp_i][point_i] - rescaled[key_comp_i][data_set_n], 2);
// }
// weights[point_i] = 1 / std::sqrt(distance);
// assert(!std::isnan(weights[point_i]));
// inv_sum += weights[point_i];
// }
//
// assert(!std::isnan(inv_sum));
//
// // actual interpolation
// // bool has_h = false;
// // bool has_o = false;
//
// for (int key_comp_i = 0; key_comp_i < to_calc.size(); key_comp_i++) {
// const auto output_comp_i = to_calc[key_comp_i];
// double key_delta = 0;
//
// // if (interp_i == 0) {
// // has_h = true;
// // }
//
// // if (interp_i == 1) {
// // has_o = true;
// // }
//
// for (int j = 0; j < data_set_n; j++) {
// key_delta += weights[j] * input[j][key_comp_i];
// }
//
// key_delta /= inv_sum;
//
// results[output_comp_i] = from[output_comp_i] + key_delta;
// }
//
// // if (!has_h) {
// // double new_val = 0;
// // for (int j = 0; j < data_set_n; j++) {
// // new_val += weights[j] * output[j][0];
// // }
// // results[0] = new_val / inv_sum;
// // }
//
// // if (!has_h) {
// // double new_val = 0;
// // for (int j = 0; j < data_set_n; j++) {
// // new_val += weights[j] * output[j][1];
// // }
// // results[1] = new_val / inv_sum;
// // }
//
// // for (std::uint32_t i = 0; i < to_calc.size(); i++) {
// // const std::uint32_t interp_i = to_calc[i];
//
// // // rescale input between 0 and 1
// // for (int j = 0; j < input.size(); j++) {
// // buffer[j] = input[j].at(i);
// // }
//
// // buffer[buffer_size - 1] = from[interp_i];
//
// // const double min = *std::min_element(buffer, buffer + buffer_size);
// // const double max = *std::max_element(buffer, buffer + buffer_size);
//
// // for (int j = 0; j < input.size(); j++) {
// // buffer[j] = ((max - min) != 0 ? (buffer[j] - min) / (max - min) : 1);
// // }
// // from_rescaled =
// // ((max - min) != 0 ? (from[interp_i] - min) / (max - min) : 0);
//
// // double inv_sum = 0;
//
// // // calculate distances for each point
// // for (int i = 0; i < input.size(); i++) {
// // const double distance = std::pow(buffer[i] - from_rescaled, 2);
//
// // buffer[i] = distance > 0 ? (1 / std::sqrt(distance)) : 0;
// // inv_sum += buffer[i];
// // }
// // // calculate new values
// // double new_val = 0;
// // for (int i = 0; i < output.size(); i++) {
// // new_val += buffer[i] * output[i][interp_i];
// // }
// // results[interp_i] = new_val / inv_sum;
// // if (std::isnan(results[interp_i])) {
// // std::cout << "nan with new_val = " << output[0][i] << std::endl;
// // }
// // }
//
// return results;
//}
inline std::string get_string(int root, MPI_Comm communicator) {
int count;
MPI_Bcast(&count, 1, MPI_INT, root, communicator);
@ -256,8 +116,6 @@ void poet::ChemistryModule::WorkerProcessPkgs(struct worker_s &timings,
void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
int double_count,
struct worker_s &timings) {
int local_work_package_size = 0;
static int counter = 1;
double dht_get_start, dht_get_end;
@ -268,12 +126,11 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
double dt;
double current_sim_time;
const uint32_t n_cells_times_props = this->prop_count * this->wp_size;
std::vector<double> vecCurrWP(n_cells_times_props + BUFFER_OFFSET);
int count = double_count;
std::vector<double> mpi_buffer(count);
/* receive */
MPI_Recv(vecCurrWP.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
MPI_Recv(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, this->group_comm,
MPI_STATUS_IGNORE);
/* decrement count of work_package by BUFFER_OFFSET */
@ -283,72 +140,70 @@ void poet::ChemistryModule::WorkerDoWork(MPI_Status &probe_status,
* mpi_buffer */
// work_package_size
local_work_package_size = vecCurrWP[count];
poet::WorkPackage s_curr_wp(mpi_buffer[count]);
// current iteration of simulation
iteration = vecCurrWP[count + 1];
iteration = mpi_buffer[count + 1];
// current timestep size
dt = vecCurrWP[count + 2];
dt = mpi_buffer[count + 2];
// current simulation time ('age' of simulation)
current_sim_time = vecCurrWP[count + 3];
current_sim_time = mpi_buffer[count + 3];
/* 4th double value is currently a placeholder */
// placeholder = mpi_buffer[count+4];
vecCurrWP.resize(n_cells_times_props);
std::vector<std::uint32_t> vecMappingWP(local_work_package_size);
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
s_curr_wp.input[wp_i] =
std::vector<double>(mpi_buffer.begin() + this->prop_count * wp_i,
mpi_buffer.begin() + this->prop_count * (wp_i + 1));
}
{
std::uint32_t i = 0;
std::generate(vecMappingWP.begin(), vecMappingWP.end(),
[&] { return i++; });
// std::cout << this->comm_rank << ":" << counter++ << std::endl;
if (dht_enabled || interp_enabled) {
dht->prepareKeys(s_curr_wp.input, dt);
}
if (dht_enabled) {
/* check for values in DHT */
dht_get_start = MPI_Wtime();
dht->checkDHT(local_work_package_size, dt, vecCurrWP, vecMappingWP);
dht->checkDHT(s_curr_wp);
dht_get_end = MPI_Wtime();
timings.dht_get += dht_get_end - dht_get_start;
}
if (interp_enabled) {
interp->tryInterpolation(dht->getDHTResults(), vecMappingWP);
}
interp->tryInterpolation(s_curr_wp);
}
phreeqc_time_start = MPI_Wtime();
if (WorkerRunWorkPackage(vecCurrWP, vecMappingWP, current_sim_time, dt) !=
IRM_OK) {
throw std::runtime_error("Phreeqc threw an error!");
if (WorkerRunWorkPackage(s_curr_wp, current_sim_time, dt) != IRM_OK) {
std::cerr << "Phreeqc error" << std::endl;
};
phreeqc_time_end = MPI_Wtime();
if (dht_enabled) {
dht->resultsToWP(vecCurrWP);
if (interp_enabled) {
interp->resultsToWP(vecCurrWP);
}
for (std::size_t wp_i = 0; wp_i < s_curr_wp.size; wp_i++) {
std::copy(s_curr_wp.output[wp_i].begin(), s_curr_wp.output[wp_i].end(),
mpi_buffer.begin() + this->prop_count * wp_i);
}
/* send results to master */
MPI_Request send_req;
MPI_Isend(vecCurrWP.data(), count, MPI_DOUBLE, 0, LOOP_WORK, MPI_COMM_WORLD,
MPI_Isend(mpi_buffer.data(), count, MPI_DOUBLE, 0, LOOP_WORK, MPI_COMM_WORLD,
&send_req);
if (dht_enabled) {
if (dht_enabled || interp_enabled) {
/* write results to DHT */
dht_fill_start = MPI_Wtime();
dht->fillDHT(local_work_package_size, vecCurrWP);
dht->fillDHT(s_curr_wp);
dht_fill_end = MPI_Wtime();
if (interp_enabled) {
interp->writePairs(dht->getDHTResults());
interp->writePairs();
}
timings.dht_get += dht_get_end - dht_get_start;
timings.dht_fill += dht_fill_end - dht_fill_start;
}
@ -429,36 +284,39 @@ void poet::ChemistryModule::WorkerReadDHTDump(
}
IRM_RESULT
poet::ChemistryModule::WorkerRunWorkPackage(
std::vector<double> &vecWP, std::vector<std::uint32_t> &vecMapping,
poet::ChemistryModule::WorkerRunWorkPackage(WorkPackage &work_package,
double dSimTime, double dTimestep) {
if ((this->wp_size * this->prop_count) != vecWP.size()) {
return IRM_INVALIDARG;
}
// check if we actually need to start phreeqc
bool bRunPhreeqc = false;
for (const auto &aMappingNum : vecMapping) {
if (aMappingNum != -1) {
bRunPhreeqc = true;
break;
std::vector<std::uint32_t> pqc_mapping;
for (std::size_t i = 0; i < work_package.size; i++) {
if (work_package.mapping[i] == CHEM_PQC) {
pqc_mapping.push_back(i);
}
}
if (!bRunPhreeqc) {
if (pqc_mapping.empty()) {
return IRM_OK;
}
IRM_RESULT result;
this->PhreeqcRM::setPOETMapping(vecMapping);
this->setDumpedField(vecWP);
this->PhreeqcRM::setPOETMapping(pqc_mapping);
this->setDumpedField(work_package.input);
this->PhreeqcRM::SetTime(dSimTime);
this->PhreeqcRM::SetTimeStep(dTimestep);
result = this->PhreeqcRM::RunCells();
this->getDumpedField(vecWP);
std::vector<std::vector<double>> output_tmp(work_package.size);
this->getDumpedField(output_tmp);
for (std::size_t i = 0; i < work_package.size; i++) {
if (work_package.mapping[i] == CHEM_PQC) {
work_package.output[i] = output_tmp[i];
}
}
return result;
}

View File

@ -18,15 +18,17 @@
** Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "poet/enums.hpp"
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <poet/DHT_Types.hpp>
#include <poet/SimParams.hpp>
#include <RInside.h>
#include <Rcpp.h>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>
@ -134,11 +136,13 @@ int SimParams::parseFromCmdl(char *argv[], RInsidePOET &R) {
simparams.print_progressbar = cmdl[{"P", "progress"}];
simparams.print_progressbar = cmdl[{"P", "progress"}];
/*Parse DHT arguments*/
chem_params.use_dht = cmdl["dht"];
chem_params.use_interp = cmdl["interp"];
// cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n';
if (chem_params.use_dht) {
cmdl("dht-size", DHT_SIZE_PER_PROCESS_MB) >> chem_params.dht_size;
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
// endl;
@ -146,6 +150,32 @@ int SimParams::parseFromCmdl(char *argv[], RInsidePOET &R) {
cmdl("dht-snaps", 0) >> chem_params.dht_snaps;
cmdl("dht-file") >> chem_params.dht_file;
/*Parse work package size*/
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
cmdl("interp-size", 100) >> chem_params.pht_size;
cmdl("interp-min", 5) >> chem_params.interp_min_entries;
cmdl("interp-bucket-entries", 20) >> chem_params.pht_max_entries;
/*Parse output options*/
simparams.store_result = !cmdl["ignore-result"];
cout << "CPP: Complete results storage is "
<< (simparams.store_result ? "ON" : "OFF") << endl;
cout << "CPP: Work Package Size: " << simparams.wp_size << endl;
cout << "CPP: DHT is " << (chem_params.use_dht ? "ON" : "OFF") << '\n';
if (chem_params.use_dht || chem_params.use_interp) {
cout << "CPP: DHT strategy is " << simparams.dht_strategy << endl;
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
"defined) = "
<< simparams.dht_significant_digits << endl;
cout << "CPP: DHT logarithm before rounding: "
<< (simparams.dht_log ? "ON" : "OFF") << endl;
cout << "CPP: DHT size per process (Byte) = " << chem_params.dht_size
<< endl;
cout << "CPP: DHT save snapshots is " << chem_params.dht_snaps << endl;
cout << "CPP: DHT load file is " << chem_params.dht_file << endl;
}
/*Parse work package size*/
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> simparams.wp_size;
@ -171,7 +201,8 @@ int SimParams::parseFromCmdl(char *argv[], RInsidePOET &R) {
// << simparams.dht_significant_digits);
// MSG("DHT logarithm before rounding: "
// << (simparams.dht_log ? "ON" : "OFF"));
MSG("DHT size per process (Byte) = " + std::to_string(chem_params.dht_size));
MSG("DHT size per process (Byte) = " +
std::to_string(chem_params.dht_size));
MSG("DHT save snapshots is " + BOOL_PRINT(chem_params.dht_snaps));
MSG("DHT load file is " + chem_params.dht_file);
}
@ -179,8 +210,10 @@ int SimParams::parseFromCmdl(char *argv[], RInsidePOET &R) {
if (chem_params.use_interp) {
MSG("PHT interpolation enabled: " + BOOL_PRINT(chem_params.use_interp));
MSG("PHT interp-size = " + std::to_string(chem_params.pht_size));
MSG("PHT interp-min = " + std::to_string(chem_params.interp_min_entries));
MSG("PHT interp-bucket-entries = " + std::to_string(chem_params.pht_max_entries));
MSG("PHT interp-min = " +
std::to_string(chem_params.interp_min_entries));
MSG("PHT interp-bucket-entries = " +
std::to_string(chem_params.pht_max_entries));
}
}