Substitute r_utils.*

Max Lübke 2020-12-15 14:20:25 +01:00
parent 89276a4e81
commit 3203bbe70e
9 changed files with 1252 additions and 973 deletions

.gitignore (new file, +140 lines)

@ -0,0 +1,140 @@
# Created by https://www.toptal.com/developers/gitignore/api/c,c++,r,cmake
# Edit at https://www.toptal.com/developers/gitignore?templates=c,c++,r,cmake
### C ###
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
### C++ ###
# Prerequisites
# Compiled Object files
*.slo
# Precompiled Headers
# Compiled Dynamic libraries
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
# Executables
### CMake ###
CMakeLists.txt.user
CMakeCache.txt
CMakeFiles
CMakeScripts
Testing
Makefile
cmake_install.cmake
install_manifest.txt
compile_commands.json
CTestTestfile.cmake
_deps
### CMake Patch ###
# External projects
*-prefix/
### R ###
# History files
.Rhistory
.Rapp.history
# Session Data files
.RData
# User-specific files
.Ruserdata
# Example code in package build process
*-Ex.R
# Output files from R CMD build
/*.tar.gz
# Output files from R CMD check
/*.Rcheck/
# RStudio files
.Rproj.user/
# produced vignettes
vignettes/*.html
vignettes/*.pdf
# OAuth2 token, see https://github.com/hadley/httr/releases/tag/v0.3
.httr-oauth
# knitr and R markdown default cache directories
*_cache/
/cache/
# Temporary files created by R markdown
*.utf8.md
*.knit.md
# R Environment Variables
.Renviron
### R.Bookdown Stack ###
# R package: bookdown caching files
/*_files/
# End of https://www.toptal.com/developers/gitignore/api/c,c++,r,cmake

CMakeLists.txt

@@ -3,12 +3,10 @@ cmake_minimum_required(VERSION 3.9)
 project(POET VERSION 0.1)
 
-add_executable(poet kin.cpp)
-
 # Not needed until now
 # specify the C++ standard
-#set(CMAKE_CXX_STANDARD 11)
-#set(CMAKE_CXX_STANDARD_REQUIRED True)
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED True)
 
 find_package(MPI REQUIRED)
@@ -87,15 +85,19 @@ find_path(R_RInside_INCLUDE_DIR RInside.h
 list(APPEND R_INCLUDE_DIRS ${R_RInside_INCLUDE_DIR})
 
 #include found directories for the whole scope (will be changed with modularization)
-include_directories(${R_INCLUDE_DIRS} ${MPI_CXX_INCLUDE_DIRS})
+include_directories(${MPI_CXX_INCLUDE_DIRS})
 
 #define program libraries
-add_library(Worker STATIC worker.cpp)
-target_link_libraries(Worker MPI::MPI_CXX)
-
-add_library(DHT STATIC DHT.cpp dht_wrapper.cpp)
-target_link_libraries(DHT crypto)
-
-add_library(R_UTILS STATIC r_utils.cpp)
-
-target_link_libraries(poet PUBLIC Worker DHT R_UTILS ${R_LIBRARIES} MPI::MPI_CXX)
+add_library(POET_Libs OBJECT util/RRuntime.cpp dht_wrapper.cpp worker.cpp DHT.cpp)
+target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
+target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
+
+#add_library(DHT OBJECT DHT.cpp dht_wrapper.cpp)
+#target_link_libraries(DHT crypto R_Wrapper)
+#add_library(Worker OBJECT worker.cpp)
+#target_link_libraries(Worker ${R_LIBRARIES} MPI::MPI_CXX R_Wrapper)
+
+add_executable(poet kin.cpp)
+target_link_libraries(poet PUBLIC MPI::MPI_CXX POET_Libs)

dht_wrapper.cpp

@@ -1,6 +1,8 @@
 #include "dht_wrapper.h"
 #include <openssl/md5.h>
 
+using namespace poet;
+
 /*init globals*/
 bool dht_enabled;
 int dht_snaps;
@@ -37,7 +39,7 @@ uint64_t get_md5(int key_size, void *key) {
   return retval;
 }
 
-double Round_off(RInside &R, double N, double n) {
+double Round_off(RRuntime R, double N, double n) {
   double result;
   R["roundsig"] = n;
   R["roundin"] = N;
@@ -50,7 +52,7 @@ double Round_off(RInside &R, double N, double n) {
 /*
  * Stores fuzzed version of key in fuzzing_buffer
  */
-void fuzz_for_dht(RInside &R, int var_count, void *key, double dt) {
+void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt) {
   unsigned int i = 0;
   //introduce fuzzing to allow more hits in DHT
   for (i = 0; i < (unsigned int)var_count; i++) {
@@ -82,7 +84,7 @@ void fuzz_for_dht(RInside &R, int var_count, void *key, double dt) {
   fuzzing_buffer[var_count] = dt;
 }
 
-void check_dht(RInside &R, int length, std::vector<bool> &out_result_index, double *work_package) {
+void check_dht(RRuntime R, int length, std::vector<bool> &out_result_index, double *work_package) {
   void *key;
   int res;
   int var_count = prop_type_vector.size();
@@ -114,7 +116,7 @@ void check_dht(RInside &R, int length, std::vector<bool> &out_result_index, doub
   }
 }
 
-void fill_dht(RInside &R, int length, std::vector<bool> &result_index, double *work_package, double *results) {
+void fill_dht(RRuntime R, int length, std::vector<bool> &result_index, double *work_package, double *results) {
   void *key;
   void *data;
   int res;
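For context: fuzz_for_dht rounds each component of the lookup key to a configurable number of significant digits before hashing, so chemically similar states collapse onto the same DHT key and produce more hits. The rounding itself is delegated to the embedded R session through Round_off; the exact R expression is not part of this hunk, so the following standalone C++ sketch only illustrates what rounding to n significant digits does, with made-up values:

#include <cmath>
#include <cstdio>

// Illustration only (not POET code): round 'value' to 'digits' significant
// digits, assumed to behave like R's signif(), which Round_off presumably
// evaluates inside the embedded R session.
double round_signif(double value, int digits) {
  if (value == 0.0) return 0.0;
  const int exponent = (int)std::floor(std::log10(std::fabs(value)));
  const double scale = std::pow(10.0, digits - 1 - exponent);
  return std::round(value * scale) / scale;
}

int main() {
  std::printf("%g\n", round_signif(0.0123456, 3)); // prints 0.0123
  std::printf("%g\n", round_signif(98765.4, 2));   // prints 99000
  return 0;
}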

dht_wrapper.h

@@ -1,18 +1,18 @@
 #pragma once
 
-#include <RInside.h>
+#include "util/RRuntime.h"
 #include <string>
 #include <vector>
 #include <math.h>
 #include "DHT.h"
 
 using namespace std;
-using namespace Rcpp;
+using namespace poet;
 
 /*Functions*/
 uint64_t get_md5(int key_size, void* key);
-void fuzz_for_dht(RInside &R, int var_count, void *key, double dt);
-void check_dht(RInside &R, int length, std::vector<bool> &out_result_index, double *work_package);
-void fill_dht(RInside &R, int length, std::vector<bool> &result_index, double *work_package, double *results);
+void fuzz_for_dht(RRuntime R, int var_count, void *key, double dt);
+void check_dht(RRuntime R, int length, std::vector<bool> &out_result_index, double *work_package);
+void fill_dht(RRuntime R, int length, std::vector<bool> &result_index, double *work_package, double *results);
 void print_statistics();
 int table_to_file(char* filename);
 int file_to_table(char* filename);

kin.cpp

@@ -1,24 +1,25 @@
-#include <cstring>
-#include <iostream>
 #include <string>
 #include <vector>
+#include <iostream>
+#include <cstring>
 
-#include <RInside.h> // for the embedded R via RInside
+#include <Rcpp.h>
 #include <mpi.h> // mpi header file
 
-#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
 #include "DHT.h" // MPI-DHT Implementation
-#include "worker.h"
-#include "r_utils.h"
+#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
 #include "dht_wrapper.h"
 #include "global_buffer.h"
+#include "worker.h"
+#include "util/RRuntime.h"
 
 using namespace std;
+using namespace poet;
 using namespace Rcpp;
 
-double* mpi_buffer;
-double* mpi_buffer_results;
+double *mpi_buffer;
+double *mpi_buffer_results;
 uint32_t work_package_size;
 #define WORK_PACKAGE_SIZE_DEFAULT 5
@ -27,9 +28,9 @@ bool store_result;
std::set<std::string> paramList() { std::set<std::string> paramList() {
std::set<std::string> options; std::set<std::string> options;
//global // global
options.insert("work-package-size"); options.insert("work-package-size");
//only DHT // only DHT
options.insert("dht-signif"); options.insert("dht-signif");
options.insert("dht-strategy"); options.insert("dht-strategy");
options.insert("dht-size"); options.insert("dht-size");
@ -41,9 +42,9 @@ std::set<std::string> paramList() {
std::set<std::string> flagList() { std::set<std::string> flagList() {
std::set<std::string> options; std::set<std::string> options;
//global // global
options.insert("ignore-result"); options.insert("ignore-result");
//only DHT // only DHT
options.insert("dht"); options.insert("dht");
options.insert("dht-log"); options.insert("dht-log");
@ -55,27 +56,27 @@ std::list<std::string> checkOptions(argh::parser cmdl) {
std::set<std::string> flist = flagList(); std::set<std::string> flist = flagList();
std::set<std::string> plist = paramList(); std::set<std::string> plist = paramList();
for (auto& flag: cmdl.flags()) { for (auto &flag : cmdl.flags()) {
if (!(flist.find(flag) != flist.end())) retList.push_back(flag); if (!(flist.find(flag) != flist.end()))
retList.push_back(flag);
} }
for (auto& param: cmdl.params()) { for (auto &param : cmdl.params()) {
if (!(plist.find(param.first) != plist.end())) retList.push_back(param.first); if (!(plist.find(param.first) != plist.end()))
retList.push_back(param.first);
} }
return retList; return retList;
} }
typedef struct typedef struct {
{
char has_work; char has_work;
double* send_addr; double *send_addr;
} worker_struct; } worker_struct;
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry, sim_a_chemistry, double sim_start, sim_b_transport, sim_a_transport, sim_b_chemistry,
sim_end; sim_a_chemistry, sim_end;
double cummul_transport = 0.f; double cummul_transport = 0.f;
double cummul_chemistry = 0.f; double cummul_chemistry = 0.f;
@ -101,7 +102,7 @@ int main(int argc, char *argv[]) {
// cout << "CPP: Start Init (MPI)" << endl; // cout << "CPP: Start Init (MPI)" << endl;
MPI_Init( &argc, &argv ); MPI_Init(&argc, &argv);
int world_size; int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_size(MPI_COMM_WORLD, &world_size);
@ -109,31 +110,32 @@ int main(int argc, char *argv[]) {
int world_rank; int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
/*Create custom Communicator with all processes except 0 (the master) for DHT storage*/ /*Create custom Communicator with all processes except 0 (the master) for DHT
//only needed if strategy == 0, but done anyway * storage*/
// only needed if strategy == 0, but done anyway
MPI_Group group_world; MPI_Group group_world;
MPI_Group dht_group; MPI_Group dht_group;
MPI_Comm dht_comm; MPI_Comm dht_comm;
int* process_ranks; int *process_ranks;
// make a list of processes in the new communicator // make a list of processes in the new communicator
process_ranks= (int*) malloc(world_size*sizeof(int)); process_ranks = (int *)malloc(world_size * sizeof(int));
for(int I = 1; I < world_size; I++) for (int I = 1; I < world_size; I++)
process_ranks[I-1] = I; process_ranks[I - 1] = I;
//get the group under MPI_COMM_WORLD // get the group under MPI_COMM_WORLD
MPI_Comm_group(MPI_COMM_WORLD, &group_world); MPI_Comm_group(MPI_COMM_WORLD, &group_world);
//create the new group // create the new group
MPI_Group_incl(group_world, world_size-1, process_ranks, &dht_group); MPI_Group_incl(group_world, world_size - 1, process_ranks, &dht_group);
// create the new communicator // create the new communicator
MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm); MPI_Comm_create(MPI_COMM_WORLD, dht_group, &dht_comm);
free (process_ranks); //cleanup free(process_ranks); // cleanup
// cout << "Done"; // cout << "Done";
if (cmdl[{"help", "h"}]) { if (cmdl[{"help", "h"}]) {
if (world_rank == 0) { if (world_rank == 0) {
cout << "Todo" << endl << cout << "Todo" << endl
"See README.md for further information." << endl; << "See README.md for further information." << endl;
} }
MPI_Finalize(); MPI_Finalize();
return EXIT_SUCCESS; return EXIT_SUCCESS;
@ -142,9 +144,10 @@ int main(int argc, char *argv[]) {
/*INIT is now done separately in an R file provided here as argument!*/ /*INIT is now done separately in an R file provided here as argument!*/
if (!cmdl(2)) { if (!cmdl(2)) {
if (world_rank == 0) { if (world_rank == 0) {
cerr << "ERROR. Kin needs 2 positional arguments: " << endl << cerr << "ERROR. Kin needs 2 positional arguments: " << endl
"1) the R script defining your simulation and" << endl << << "1) the R script defining your simulation and" << endl
"2) the directory prefix where to save results and profiling" << endl; << "2) the directory prefix where to save results and profiling"
<< endl;
} }
MPI_Finalize(); MPI_Finalize();
return EXIT_FAILURE; return EXIT_FAILURE;
@ -154,7 +157,7 @@ int main(int argc, char *argv[]) {
if (!optionsError.empty()) { if (!optionsError.empty()) {
if (world_rank == 0) { if (world_rank == 0) {
cerr << "Unrecognized option(s):\n" << endl; cerr << "Unrecognized option(s):\n" << endl;
for (auto option: optionsError) { for (auto option : optionsError) {
cerr << option << endl; cerr << option << endl;
} }
cerr << "\nMake sure to use available options. Exiting!" << endl; cerr << "\nMake sure to use available options. Exiting!" << endl;
@ -172,13 +175,16 @@ int main(int argc, char *argv[]) {
// cout << "CPP: DHT strategy is " << dht_strategy << endl; // cout << "CPP: DHT strategy is " << dht_strategy << endl;
cmdl("dht-signif", 5) >> dht_significant_digits; cmdl("dht-signif", 5) >> dht_significant_digits;
// cout << "CPP: DHT significant digits = " << dht_significant_digits << endl; // cout << "CPP: DHT significant digits = " << dht_significant_digits <<
// endl;
dht_logarithm = cmdl["dht-log"]; dht_logarithm = cmdl["dht-log"];
// cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON" : "OFF" ) << endl; // cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON"
// : "OFF" ) << endl;
cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> dht_size_per_process; cmdl("dht-size", DHT_SIZE_PER_PROCESS) >> dht_size_per_process;
// cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << endl; // cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process <<
// endl;
cmdl("dht-snaps", 0) >> dht_snaps; cmdl("dht-snaps", 0) >> dht_snaps;
@ -188,35 +194,37 @@ int main(int argc, char *argv[]) {
/*Parse work package size*/ /*Parse work package size*/
cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> work_package_size; cmdl("work-package-size", WORK_PACKAGE_SIZE_DEFAULT) >> work_package_size;
/*Parse output options*/ /*Parse output options*/
store_result = !cmdl["ignore-result"]; store_result = !cmdl["ignore-result"];
if (world_rank == 0) {
if (world_rank==0) { cout << "CPP: Complete results storage is " << (store_result ? "ON" : "OFF")
cout << "CPP: Complete results storage is " << ( store_result ? "ON" : "OFF" ) << endl; << endl;
cout << "CPP: Work Package Size: " << work_package_size << endl; cout << "CPP: Work Package Size: " << work_package_size << endl;
cout << "CPP: DHT is " << ( dht_enabled ? "ON" : "OFF" ) << '\n'; cout << "CPP: DHT is " << (dht_enabled ? "ON" : "OFF") << '\n';
if (dht_enabled) { if (dht_enabled) {
cout << "CPP: DHT strategy is " << dht_strategy << endl; cout << "CPP: DHT strategy is " << dht_strategy << endl;
cout << "CPP: DHT key default digits (ignored if 'signif_vector' is defined) = " << dht_significant_digits << endl; cout << "CPP: DHT key default digits (ignored if 'signif_vector' is "
cout << "CPP: DHT logarithm before rounding: " << ( dht_logarithm ? "ON" : "OFF" ) << endl; "defined) = "
cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process << endl; << dht_significant_digits << endl;
cout << "CPP: DHT logarithm before rounding: "
<< (dht_logarithm ? "ON" : "OFF") << endl;
cout << "CPP: DHT size per process (Byte) = " << dht_size_per_process
<< endl;
cout << "CPP: DHT save snapshots is " << dht_snaps << endl; cout << "CPP: DHT save snapshots is " << dht_snaps << endl;
cout << "CPP: DHT load file is " << dht_file << endl; cout << "CPP: DHT load file is " << dht_file << endl;
} }
} }
cout << "CPP: R Init (RInside) on process " << world_rank << endl; cout << "CPP: R Init (RInside) on process " << world_rank << endl;
RInside R(argc, argv); RRuntime R(argc, argv);
// if local_rank == 0 then master else worker // if local_rank == 0 then master else worker
R["local_rank"] = world_rank; R["local_rank"] = world_rank;
/*Loading Dependencies*/ /*Loading Dependencies*/
std::string r_load_dependencies = std::string r_load_dependencies = "suppressMessages(library(Rmufits));"
"suppressMessages(library(Rmufits));"
"suppressMessages(library(RedModRphree));" "suppressMessages(library(RedModRphree));"
"source('kin_r_library.R');" "source('kin_r_library.R');"
"source('parallel_r_library.R');"; "source('parallel_r_library.R');";
@ -228,7 +236,8 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns R.parseEvalQ("source(filesim)"); // eval the init string, ignoring any returns
std::string out_dir; std::string out_dir;
if (world_rank == 0) { // only rank 0 initializes goes through the whole initialization if (world_rank ==
0) { // only rank 0 initializes goes through the whole initialization
cmdl(2) >> out_dir; // <- second positional argument cmdl(2) >> out_dir; // <- second positional argument
R["fileout"] = wrap(out_dir); // assign a char* (string) to 'fileout' R["fileout"] = wrap(out_dir); // assign a char* (string) to 'fileout'
@ -238,7 +247,7 @@ int main(int argc, char *argv[]) {
// pass the boolean "store_result" to the R process // pass the boolean "store_result" to the R process
R["store_result"] = store_result; R["store_result"] = store_result;
//get timestep vector from grid_init function ... // get timestep vector from grid_init function ...
std::string master_init_code = "mysetup <- master_init(setup=setup)"; std::string master_init_code = "mysetup <- master_init(setup=setup)";
R.parseEval(master_init_code); R.parseEval(master_init_code);
@ -249,11 +258,11 @@ int main(int argc, char *argv[]) {
MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD); MPI_Bcast(&dt_differ, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
} }
if (world_rank==0) { if (world_rank == 0) {
cout << "CPP: R init done on process with rank " << world_rank << endl; cout << "CPP: R init done on process with rank " << world_rank << endl;
} }
//initialize chemistry on all processes // initialize chemistry on all processes
std::string init_chemistry_code = "mysetup <- init_chemistry(setup=mysetup)"; std::string init_chemistry_code = "mysetup <- init_chemistry(setup=mysetup)";
R.parseEval(init_chemistry_code); R.parseEval(init_chemistry_code);
@ -261,20 +270,21 @@ int main(int argc, char *argv[]) {
Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C"); Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
/* Init Parallel helper functions */ /* Init Parallel helper functions */
R["n_procs"] = world_size-1; /* worker count */ R["n_procs"] = world_size - 1; /* worker count */
R["work_package_size"] = work_package_size; R["work_package_size"] = work_package_size;
// Removed additional field for ID in previous versions // Removed additional field for ID in previous versions
if (world_rank == 0) if (world_rank == 0) {
{ mpi_buffer =
mpi_buffer = (double*) calloc(state_C.nrow() * (state_C.ncol()), sizeof(double)); (double *)calloc(state_C.nrow() * (state_C.ncol()), sizeof(double));
} else } else {
{ mpi_buffer = (double *)calloc(
mpi_buffer = (double*) calloc((work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double)); (work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double));
mpi_buffer_results = (double*) calloc(work_package_size * (state_C.ncol()), sizeof(double)); mpi_buffer_results =
(double *)calloc(work_package_size * (state_C.ncol()), sizeof(double));
} }
if (world_rank==0) { if (world_rank == 0) {
cout << "CPP: parallel init completed (buffers allocated)!" << endl; cout << "CPP: parallel init completed (buffers allocated)!" << endl;
} }
@ -283,52 +293,51 @@ int main(int argc, char *argv[]) {
R["dht_enabled"] = dht_enabled; R["dht_enabled"] = dht_enabled;
R["dht_log"] = dht_logarithm; R["dht_log"] = dht_logarithm;
if (dht_enabled) if (dht_enabled) {
{ // cout << "\nCreating DHT\n";
//cout << "\nCreating DHT\n"; // determine size of dht entries
//determine size of dht entries
int dht_data_size = state_C.ncol() * sizeof(double); int dht_data_size = state_C.ncol() * sizeof(double);
int dht_key_size = state_C.ncol() * sizeof(double) + (dt_differ * sizeof(double)); int dht_key_size =
state_C.ncol() * sizeof(double) + (dt_differ * sizeof(double));
//determine bucket count for preset memory usage // determine bucket count for preset memory usage
//bucket size is key + value + 1 byte for status // bucket size is key + value + 1 byte for status
int dht_buckets_per_process = dht_size_per_process / (1 + dht_data_size + dht_key_size); int dht_buckets_per_process =
dht_size_per_process / (1 + dht_data_size + dht_key_size);
// MDL : following code moved here from worker.cpp // MDL : following code moved here from worker.cpp
/*Load significance vector from R setup file (or set default)*/ /*Load significance vector from R setup file (or set default)*/
bool signif_vector_exists = R.parseEval("exists('signif_vector')"); bool signif_vector_exists = R.parseEval("exists('signif_vector')");
if (signif_vector_exists) if (signif_vector_exists) {
{
dht_significant_digits_vector = as<std::vector<int>>(R["signif_vector"]); dht_significant_digits_vector = as<std::vector<int>>(R["signif_vector"]);
} else } else {
{ dht_significant_digits_vector.assign(
dht_significant_digits_vector.assign(dht_object->key_size / sizeof(double), dht_significant_digits); dht_object->key_size / sizeof(double), dht_significant_digits);
} }
/*Load property type vector from R setup file (or set default)*/ /*Load property type vector from R setup file (or set default)*/
bool prop_type_vector_exists = R.parseEval("exists('prop_type')"); bool prop_type_vector_exists = R.parseEval("exists('prop_type')");
if (prop_type_vector_exists) if (prop_type_vector_exists) {
{
prop_type_vector = as<std::vector<string>>(R["prop_type"]); prop_type_vector = as<std::vector<string>>(R["prop_type"]);
} else } else {
{
prop_type_vector.assign(dht_object->key_size / sizeof(double), "act"); prop_type_vector.assign(dht_object->key_size / sizeof(double), "act");
} }
if(world_rank == 0) if (world_rank == 0) {
{ // print only on master, values are equal on all workes
//print only on master, values are equal on all workes
cout << "CPP: dht_data_size: " << dht_data_size << "\n"; cout << "CPP: dht_data_size: " << dht_data_size << "\n";
cout << "CPP: dht_key_size: " << dht_key_size << "\n"; cout << "CPP: dht_key_size: " << dht_key_size << "\n";
cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process << endl; cout << "CPP: dht_buckets_per_process: " << dht_buckets_per_process
<< endl;
// MDL: new output on signif_vector and prop_type // MDL: new output on signif_vector and prop_type
if (signif_vector_exists) { if (signif_vector_exists) {
cout << "CPP: using problem-specific rounding digits: " << endl; cout << "CPP: using problem-specific rounding digits: " << endl;
R.parseEval("print(data.frame(prop=prop, type=prop_type, digits=signif_vector))"); R.parseEval("print(data.frame(prop=prop, type=prop_type, "
} else "digits=signif_vector))");
{ } else {
cout << "CPP: using DHT default rounding digits = " << dht_significant_digits << endl; cout << "CPP: using DHT default rounding digits = "
<< dht_significant_digits << endl;
} }
// MDL: pass to R the DHT stuff. These variables exist // MDL: pass to R the DHT stuff. These variables exist
@ -337,58 +346,57 @@ int main(int argc, char *argv[]) {
R["dht_final_proptype"] = prop_type_vector; R["dht_final_proptype"] = prop_type_vector;
} }
if (dht_strategy == 0) if (dht_strategy == 0) {
{ if (world_rank != 0) {
if(world_rank != 0) { dht_object = DHT_create(dht_comm, dht_buckets_per_process,
dht_object = DHT_create(dht_comm, dht_buckets_per_process, dht_data_size, dht_key_size, get_md5); dht_data_size, dht_key_size, get_md5);
//storing for access from worker and callback functions // storing for access from worker and callback functions
fuzzing_buffer = (double *) malloc (dht_key_size); fuzzing_buffer = (double *)malloc(dht_key_size);
} }
} else { } else {
dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process, dht_data_size, dht_key_size, get_md5); dht_object = DHT_create(MPI_COMM_WORLD, dht_buckets_per_process,
dht_data_size, dht_key_size, get_md5);
} }
if (world_rank==0) { if (world_rank == 0) {
cout << "CPP: DHT successfully created!" << endl; cout << "CPP: DHT successfully created!" << endl;
} }
} }
// MDL: store all parameters // MDL: store all parameters
if (world_rank==0) { if (world_rank == 0) {
cout << "CPP: Calling R Function to store calling parameters" << endl; cout << "CPP: Calling R Function to store calling parameters" << endl;
R.parseEvalQ("StoreSetup(setup=mysetup)"); R.parseEvalQ("StoreSetup(setup=mysetup)");
} }
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
if (world_rank == 0) if (world_rank == 0) { /* This is executed by the master */
{ /* This is executed by the master */
Rcpp::NumericVector master_send; Rcpp::NumericVector master_send;
Rcpp::NumericVector master_recv; Rcpp::NumericVector master_recv;
sim_a_seq = MPI_Wtime(); sim_a_seq = MPI_Wtime();
worker_struct* workerlist = (worker_struct*) calloc(world_size-1, sizeof(worker_struct)); worker_struct *workerlist =
(worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
int need_to_receive; int need_to_receive;
MPI_Status probe_status; MPI_Status probe_status;
double* timings; double *timings;
uint64_t* dht_perfs = NULL; uint64_t *dht_perfs = NULL;
int local_work_package_size; int local_work_package_size;
// a temporary send buffer // a temporary send buffer
double* send_buffer; double *send_buffer;
send_buffer = (double*) calloc((work_package_size * (state_C.ncol() )) + BUFFER_OFFSET, sizeof(double)); send_buffer = (double *)calloc(
(work_package_size * (state_C.ncol())) + BUFFER_OFFSET, sizeof(double));
// helper variables // helper variables
int iteration; int iteration;
double dt, current_sim_time; double dt, current_sim_time;
int n_wp = 1; // holds the actual number of wp which is int n_wp = 1; // holds the actual number of wp which is
// computed later in R::distribute_work_packages() // computed later in R::distribute_work_packages()
std::vector<int> wp_sizes_vector; // vector with the sizes of std::vector<int> wp_sizes_vector; // vector with the sizes of
@ -396,7 +404,8 @@ int main(int argc, char *argv[]) {
sim_start = MPI_Wtime(); sim_start = MPI_Wtime();
//Iteration Count is dynamic, retrieving value from R (is only needed by master for the following loop) // Iteration Count is dynamic, retrieving value from R (is only needed by
// master for the following loop)
uint32_t maxiter = R.parseEval("mysetup$maxiter"); uint32_t maxiter = R.parseEval("mysetup$maxiter");
sim_b_seq = MPI_Wtime(); sim_b_seq = MPI_Wtime();
@ -404,20 +413,19 @@ int main(int argc, char *argv[]) {
cummul_master_seq_pre_loop += sim_b_seq - sim_a_seq; cummul_master_seq_pre_loop += sim_b_seq - sim_a_seq;
/*SIMULATION LOOP*/ /*SIMULATION LOOP*/
for(uint32_t iter = 1; iter < maxiter+1; iter++) for (uint32_t iter = 1; iter < maxiter + 1; iter++) {
{
sim_a_seq = MPI_Wtime(); sim_a_seq = MPI_Wtime();
cummul_master_send = 0.f; cummul_master_send = 0.f;
cummul_master_recv = 0.f; cummul_master_recv = 0.f;
cout << "CPP: Evaluating next time step" << endl; cout << "CPP: Evaluating next time step" << endl;
R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)"); R.parseEvalQ("mysetup <- master_iteration_setup(mysetup)");
/*displaying iteration number, with C++ and R iterator*/ /*displaying iteration number, with C++ and R iterator*/
cout << "CPP: Going through iteration " << iter << endl; cout << "CPP: Going through iteration " << iter << endl;
cout << "CPP: R's $iter: "<< ((uint32_t) (R.parseEval("mysetup$iter"))) <<". Iteration" << endl; cout << "CPP: R's $iter: " << ((uint32_t)(R.parseEval("mysetup$iter")))
<< ". Iteration" << endl;
cout << "CPP: Calling Advection" << endl; cout << "CPP: Calling Advection" << endl;
@ -425,65 +433,68 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("mysetup <- master_advection(setup=mysetup)"); R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
sim_a_transport = MPI_Wtime(); sim_a_transport = MPI_Wtime();
cout << "CPP: Chemistry" << endl; cout << "CPP: Chemistry" << endl;
/*Fallback for sequential execution*/ /*Fallback for sequential execution*/
sim_b_chemistry = MPI_Wtime(); sim_b_chemistry = MPI_Wtime();
if(world_size == 1) if (world_size == 1) {
{
// MDL : the transformation of values into pH and pe // MDL : the transformation of values into pH and pe
// takes now place in master_advection() so the // takes now place in master_advection() so the
// following line is unneeded // following line is unneeded
// R.parseEvalQ("mysetup$state_T <- RedModRphree::Act2pH(mysetup$state_T)"); // R.parseEvalQ("mysetup$state_T <-
R.parseEvalQ("result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)"); // RedModRphree::Act2pH(mysetup$state_T)");
R.parseEvalQ(
"result <- slave_chemistry(setup=mysetup, data=mysetup$state_T)");
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
} else { /*send work to workers*/ } else { /*send work to workers*/
// NEW: only in the first iteration we call // NEW: only in the first iteration we call
// R::distribute_work_packages()!! // R::distribute_work_packages()!!
if (iter==1) if (iter == 1) {
{ R.parseEvalQ(
R.parseEvalQ("wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), package_size=work_package_size)"); "wp_ids <- distribute_work_packages(len=nrow(mysetup$state_T), "
"package_size=work_package_size)");
// we only sort once the vector // we only sort once the vector
R.parseEvalQ("ordered_ids <- order(wp_ids)"); R.parseEvalQ("ordered_ids <- order(wp_ids)");
R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)"); R.parseEvalQ("wp_sizes_vector <- compute_wp_sizes(wp_ids)");
n_wp = (int) R.parseEval("length(wp_sizes_vector)"); n_wp = (int)R.parseEval("length(wp_sizes_vector)");
wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]); wp_sizes_vector = as<std::vector<int>>(R["wp_sizes_vector"]);
cout << "CPP: Total number of work packages: " << n_wp << endl; cout << "CPP: Total number of work packages: " << n_wp << endl;
R.parseEval("stat_wp_sizes(wp_sizes_vector)"); R.parseEval("stat_wp_sizes(wp_sizes_vector)");
} }
/* shuffle and extract data /* shuffle and extract data
MDL: we now apply :Act2pH directly in master_advection MDL: we now apply :Act2pH directly in master_advection
*/ */
// R.parseEval("tmp <- shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)"); // R.parseEval("tmp <-
// shuffle_field(RedModRphree::Act2pH(mysetup$state_T), ordered_ids)");
R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)"); R.parseEval("tmp <- shuffle_field(mysetup$state_T, ordered_ids)");
Rcpp::DataFrame chemistry_data = R.parseEval("tmp"); R.setBufferDataFrame("tmp");
R.to_C_domain(mpi_buffer);
//Rcpp::DataFrame chemistry_data = R.parseEval("tmp");
convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data); //convert_R_Dataframe_2_C_buffer(mpi_buffer, chemistry_data);
// cout << "CPP: shuffle_field() done" << endl; // cout << "CPP: shuffle_field() done" << endl;
/* send and receive work; this is done by counting /* send and receive work; this is done by counting
* the wp */ * the wp */
int pkg_to_send = n_wp; int pkg_to_send = n_wp;
int pkg_to_recv = n_wp; int pkg_to_recv = n_wp;
size_t colCount = chemistry_data.ncol(); size_t colCount = R.getBufferNCol();
int free_workers = world_size-1; int free_workers = world_size - 1;
double* work_pointer = mpi_buffer; double *work_pointer = mpi_buffer;
sim_c_chemistry = MPI_Wtime(); sim_c_chemistry = MPI_Wtime();
/* visual progress */ /* visual progress */
float progress = 0.0; float progress = 0.0;
int barWidth = 70; int barWidth = 70;
//retrieve data from R runtime // retrieve data from R runtime
iteration = (int) R.parseEval("mysetup$iter"); iteration = (int)R.parseEval("mysetup$iter");
dt = (double) R.parseEval("mysetup$requested_dt"); dt = (double)R.parseEval("mysetup$requested_dt");
current_sim_time = (double) R.parseEval("mysetup$simulation_time-mysetup$requested_dt"); current_sim_time =
(double)R.parseEval("mysetup$simulation_time-mysetup$requested_dt");
int count_pkgs = 0; int count_pkgs = 0;
@ -494,7 +505,7 @@ int main(int argc, char *argv[]) {
while (pkg_to_recv > 0) // start dispatching work packages while (pkg_to_recv > 0) // start dispatching work packages
{ {
/* visual progress */ /* visual progress */
progress = (float) (count_pkgs+1)/n_wp; progress = (float)(count_pkgs + 1) / n_wp;
cout << "["; cout << "[";
int pos = barWidth * progress; int pos = barWidth * progress;
@ -510,35 +521,40 @@ int main(int argc, char *argv[]) {
std::cout.flush(); std::cout.flush();
/* end visual progress */ /* end visual progress */
if (pkg_to_send > 0) { if (pkg_to_send > 0) {
master_send_a = MPI_Wtime(); master_send_a = MPI_Wtime();
/*search for free workers and send work*/ /*search for free workers and send work*/
for (int p = 0; p < world_size-1; p++) { for (int p = 0; p < world_size - 1; p++) {
if (workerlist[p].has_work == 0 && pkg_to_send > 0) /* worker is free */ { if (workerlist[p].has_work == 0 &&
pkg_to_send > 0) /* worker is free */ {
// to enable different work_package_size, set local copy of work_package_size to // to enable different work_package_size, set local copy of
// either global work_package size or remaining 'to_send' packages // work_package_size to either global work_package size or
// to_send >= work_package_size ? local_work_package_size = work_package_size : local_work_package_size = to_send; // remaining 'to_send' packages to_send >= work_package_size ?
// local_work_package_size = work_package_size :
// local_work_package_size = to_send;
local_work_package_size = (int) wp_sizes_vector[count_pkgs]; local_work_package_size = (int)wp_sizes_vector[count_pkgs];
count_pkgs++; count_pkgs++;
// cout << "CPP: sending pkg n. " << count_pkgs << " with size " << local_work_package_size << endl; // cout << "CPP: sending pkg n. " << count_pkgs << " with size "
// << local_work_package_size << endl;
/*push pointer forward to next work package, after taking the current one*/ /*push pointer forward to next work package, after taking the
* current one*/
workerlist[p].send_addr = work_pointer; workerlist[p].send_addr = work_pointer;
int end_of_wp = local_work_package_size * colCount; int end_of_wp = local_work_package_size * colCount;
work_pointer = &(work_pointer[end_of_wp]); work_pointer = &(work_pointer[end_of_wp]);
// fill send buffer starting with work_package ... // fill send buffer starting with work_package ...
std::memcpy(send_buffer, workerlist[p].send_addr, (end_of_wp) * sizeof(double)); std::memcpy(send_buffer, workerlist[p].send_addr,
(end_of_wp) * sizeof(double));
// followed by: work_package_size // followed by: work_package_size
send_buffer[end_of_wp] = (double) local_work_package_size; send_buffer[end_of_wp] = (double)local_work_package_size;
// current iteration of simulation // current iteration of simulation
send_buffer[end_of_wp + 1] = (double) iteration; send_buffer[end_of_wp + 1] = (double)iteration;
// size of timestep in seconds // size of timestep in seconds
send_buffer[end_of_wp + 2] = dt; send_buffer[end_of_wp + 2] = dt;
// current time of simulation (age) in seconds // current time of simulation (age) in seconds
@ -547,7 +563,8 @@ int main(int argc, char *argv[]) {
send_buffer[end_of_wp + 4] = 0.; send_buffer[end_of_wp + 4] = 0.;
/* ATTENTION Worker p has rank p+1 */ /* ATTENTION Worker p has rank p+1 */
MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE, p+1, TAG_WORK, MPI_COMM_WORLD); MPI_Send(send_buffer, end_of_wp + BUFFER_OFFSET, MPI_DOUBLE,
p + 1, TAG_WORK, MPI_COMM_WORLD);
workerlist[p].has_work = 1; workerlist[p].has_work = 1;
free_workers--; free_workers--;
@ -558,30 +575,29 @@ int main(int argc, char *argv[]) {
cummul_master_send += master_send_b - master_send_a; cummul_master_send += master_send_b - master_send_a;
} }
/*check if there are results to receive and receive them*/ /*check if there are results to receive and receive them*/
need_to_receive = 1; need_to_receive = 1;
master_recv_a = MPI_Wtime(); master_recv_a = MPI_Wtime();
while(need_to_receive && pkg_to_recv > 0) while (need_to_receive && pkg_to_recv > 0) {
{
if (pkg_to_send > 0 && free_workers > 0) if (pkg_to_send > 0 && free_workers > 0)
MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &need_to_receive, &probe_status); MPI_Iprobe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
&need_to_receive, &probe_status);
else { else {
idle_a = MPI_Wtime(); idle_a = MPI_Wtime();
MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD, &probe_status); MPI_Probe(MPI_ANY_SOURCE, TAG_WORK, MPI_COMM_WORLD,
&probe_status);
idle_b = MPI_Wtime(); idle_b = MPI_Wtime();
master_idle += idle_b - idle_a; master_idle += idle_b - idle_a;
} }
if(need_to_receive) if (need_to_receive) {
{
int p = probe_status.MPI_SOURCE; int p = probe_status.MPI_SOURCE;
int size; int size;
MPI_Get_count(&probe_status, MPI_DOUBLE, &size); MPI_Get_count(&probe_status, MPI_DOUBLE, &size);
MPI_Recv(workerlist[p-1].send_addr, size, MPI_DOUBLE, p, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(workerlist[p - 1].send_addr, size, MPI_DOUBLE, p,
workerlist[p-1].has_work = 0; TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
workerlist[p - 1].has_work = 0;
pkg_to_recv -= 1; pkg_to_recv -= 1;
free_workers++; free_workers++;
} }
@@ -598,9 +614,10 @@ int main(int argc, char *argv[]) {
       sim_d_chemistry = MPI_Wtime();
       cummul_workers += sim_d_chemistry - sim_c_chemistry;
 
-      convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
+      //convert_C_buffer_2_R_Dataframe(mpi_buffer, chemistry_data);
+      R.from_C_domain(mpi_buffer);
 
-      R["chemistry_data"] = chemistry_data;
+      R["chemistry_data"] = R.getBufferDataFrame();
 
       /* unshuffle results */
       R.parseEval("result <- unshuffle_field(chemistry_data, ordered_ids)");
@ -610,7 +627,6 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)"); R.parseEvalQ("mysetup <- master_chemistry(setup=mysetup, data=result)");
sim_f_chemistry = MPI_Wtime(); sim_f_chemistry = MPI_Wtime();
cummul_chemistry_master += sim_f_chemistry - sim_e_chemistry; cummul_chemistry_master += sim_f_chemistry - sim_e_chemistry;
} }
sim_a_chemistry = MPI_Wtime(); sim_a_chemistry = MPI_Wtime();
@ -623,10 +639,13 @@ int main(int argc, char *argv[]) {
cummul_transport += sim_a_transport - sim_b_transport; cummul_transport += sim_a_transport - sim_b_transport;
cummul_chemistry += sim_a_chemistry - sim_b_chemistry; cummul_chemistry += sim_a_chemistry - sim_b_chemistry;
cout << endl << "CPP: End of *coupling* iteration "<< iter <<"/" << maxiter << endl << endl; cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
if (dht_enabled) { if (dht_enabled) {
for (int i=1; i < world_size; i++) { for (int i = 1; i < world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD); MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_STATS, MPI_COMM_WORLD);
} }
@ -634,18 +653,21 @@ int main(int argc, char *argv[]) {
if (dht_snaps == 2) { if (dht_snaps == 2) {
std::stringstream outfile; std::stringstream outfile;
outfile << out_dir << "/iter_" << std::setfill('0') << std::setw(3) << iter << ".dht"; outfile << out_dir << "/iter_" << std::setfill('0') << std::setw(3)
for (int i=1; i < world_size; i++) { << iter << ".dht";
MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i, TAG_DHT_STORE, MPI_COMM_WORLD); for (int i = 1; i < world_size; i++) {
MPI_Send(outfile.str().c_str(), outfile.str().size(), MPI_CHAR, i,
TAG_DHT_STORE, MPI_COMM_WORLD);
} }
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
} }
} }
sim_d_seq = MPI_Wtime(); sim_d_seq = MPI_Wtime();
cummul_master_seq_loop += ((sim_b_seq - sim_a_seq) - (sim_a_transport - sim_b_transport)) + (sim_d_seq - sim_c_seq); cummul_master_seq_loop +=
((sim_b_seq - sim_a_seq) - (sim_a_transport - sim_b_transport)) +
(sim_d_seq - sim_c_seq);
master_send.push_back(cummul_master_send, "it_" + to_string(iter)); master_send.push_back(cummul_master_send, "it_" + to_string(iter));
master_recv.push_back(cummul_master_recv, "it_" + to_string(iter)); master_recv.push_back(cummul_master_recv, "it_" + to_string(iter));
@ -657,8 +679,9 @@ int main(int argc, char *argv[]) {
cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl; cout << "CPP: Master: Instruct workers to write DHT to file ..." << endl;
std::string outfile; std::string outfile;
outfile = out_dir + ".dht"; outfile = out_dir + ".dht";
for (int i=1; i < world_size; i++) { for (int i = 1; i < world_size; i++) {
MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE, MPI_COMM_WORLD); MPI_Send(outfile.c_str(), outfile.size(), MPI_CHAR, i, TAG_DHT_STORE,
MPI_COMM_WORLD);
} }
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
cout << "CPP: Master: ... done" << endl; cout << "CPP: Master: ... done" << endl;
@ -672,39 +695,40 @@ int main(int argc, char *argv[]) {
int phreeqc_tmp; int phreeqc_tmp;
timings = (double*) calloc(3, sizeof(double)); timings = (double *)calloc(3, sizeof(double));
if (dht_enabled) { if (dht_enabled) {
dht_hits = 0; dht_hits = 0;
dht_miss = 0; dht_miss = 0;
dht_collision = 0; dht_collision = 0;
dht_perfs = (uint64_t*) calloc(3, sizeof(uint64_t)); dht_perfs = (uint64_t *)calloc(3, sizeof(uint64_t));
} }
double idle_worker_tmp; double idle_worker_tmp;
for (int p = 0; p < world_size-1; p++) for (int p = 0; p < world_size - 1; p++) {
{
/* ATTENTION Worker p has rank p+1 */ /* ATTENTION Worker p has rank p+1 */
/* Send termination message to worker */ /* Send termination message to worker */
MPI_Send(NULL, 0, MPI_DOUBLE, p+1, TAG_FINISH, MPI_COMM_WORLD); MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
MPI_Recv(timings, 3, MPI_DOUBLE, p+1, TAG_TIMING, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
phreeqc_time.push_back(timings[0], "w" + to_string(p+1)); MPI_STATUS_IGNORE);
phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p+1, TAG_TIMING, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING,
phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p+1)); MPI_COMM_WORLD, MPI_STATUS_IGNORE);
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p+1, TAG_TIMING, MPI_COMM_WORLD, MPI_STATUS_IGNORE); if (dht_enabled) {
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p+1)); dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
if (dht_enabled) MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p + 1, TAG_DHT_PERF,
{ MPI_COMM_WORLD, MPI_STATUS_IGNORE);
dht_get_time.push_back(timings[1], "w" + to_string(p+1));
dht_fill_time.push_back(timings[2], "w" + to_string(p+1));
MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p+1, TAG_DHT_PERF, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
dht_hits += dht_perfs[0]; dht_hits += dht_perfs[0];
dht_miss += dht_perfs[1]; dht_miss += dht_perfs[1];
dht_collision += dht_perfs[2]; dht_collision += dht_perfs[2];
@ -722,7 +746,8 @@ int main(int argc, char *argv[]) {
R["simtime_workers"] = cummul_workers; R["simtime_workers"] = cummul_workers;
R.parseEvalQ("profiling$simtime_workers <- simtime_workers"); R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
R["simtime_chemistry_master"] = cummul_chemistry_master; R["simtime_chemistry_master"] = cummul_chemistry_master;
R.parseEvalQ("profiling$simtime_chemistry_master <- simtime_chemistry_master"); R.parseEvalQ(
"profiling$simtime_chemistry_master <- simtime_chemistry_master");
R["seq_master_prep"] = cummul_master_seq_pre_loop; R["seq_master_prep"] = cummul_master_seq_pre_loop;
R.parseEvalQ("profiling$seq_master_prep <- seq_master_prep"); R.parseEvalQ("profiling$seq_master_prep <- seq_master_prep");
@ -745,8 +770,7 @@ int main(int argc, char *argv[]) {
R["phreeqc_count"] = phreeqc_counts; R["phreeqc_count"] = phreeqc_counts;
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count"); R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
if (dht_enabled) if (dht_enabled) {
{
R["dht_hits"] = dht_hits; R["dht_hits"] = dht_hits;
R.parseEvalQ("profiling$dht_hits <- dht_hits"); R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = dht_miss; R["dht_miss"] = dht_miss;
@ -759,14 +783,14 @@ int main(int argc, char *argv[]) {
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time"); R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
} }
free(workerlist); free(workerlist);
free(timings); free(timings);
if (dht_enabled) if (dht_enabled)
free(dht_perfs); free(dht_perfs);
cout << "CPP: Done! Results are stored as R objects into <" << out_dir << "/timings.rds>" << endl; cout << "CPP: Done! Results are stored as R objects into <" << out_dir
<< "/timings.rds>" << endl;
/*exporting results and profiling data*/ /*exporting results and profiling data*/
std::string r_vis_code; std::string r_vis_code;
@ -774,16 +798,22 @@ int main(int argc, char *argv[]) {
R.parseEval(r_vis_code); R.parseEval(r_vis_code);
} else { /*This is executed by the workers*/ } else { /*This is executed by the workers*/
if (!dht_file.empty()) { if (!dht_file.empty()) {
int res = file_to_table((char *) dht_file.c_str()); int res = file_to_table((char *)dht_file.c_str());
if (res != DHT_SUCCESS) { if (res != DHT_SUCCESS) {
if (res == DHT_WRONG_FILE) { if (res == DHT_WRONG_FILE) {
if (world_rank == 2) cerr << "CPP: Worker: Wrong File" << endl; if (world_rank == 2)
cerr << "CPP: Worker: Wrong File" << endl;
} else { } else {
if (world_rank == 2) cerr << "CPP: Worker: Error in loading current state of DHT from file" << endl; if (world_rank == 2)
cerr << "CPP: Worker: Error in loading current state of DHT from "
"file"
<< endl;
} }
return EXIT_FAILURE; return EXIT_FAILURE;
} else { } else {
if (world_rank == 2) cout << "CPP: Worker: Successfully loaded state of DHT from file " << dht_file << endl; if (world_rank == 2)
cout << "CPP: Worker: Successfully loaded state of DHT from file "
<< dht_file << endl;
std::cout.flush(); std::cout.flush();
} }
} }
@ -793,12 +823,10 @@ int main(int argc, char *argv[]) {
cout << "CPP: finished, cleanup of process " << world_rank << endl; cout << "CPP: finished, cleanup of process " << world_rank << endl;
if (dht_enabled) if (dht_enabled) {
{
if (dht_strategy == 0) if (dht_strategy == 0) {
{ if (world_rank != 0) {
if(world_rank != 0) {
DHT_free(dht_object, NULL, NULL); DHT_free(dht_object, NULL, NULL);
} }
} else { } else {
@ -809,10 +837,9 @@ int main(int argc, char *argv[]) {
free(mpi_buffer); free(mpi_buffer);
MPI_Finalize(); MPI_Finalize();
if (world_rank==0) { if (world_rank == 0) {
cout << "CPP: done, bye!" << endl; cout << "CPP: done, bye!" << endl;
} }
exit(0); exit(0);
} }

src/util/RRuntime.cpp (new file, +56 lines)

@ -0,0 +1,56 @@
#include "RRuntime.h"
#include <RInside.h>
#include <Rcpp.h>
#include <string>
using namespace poet;
/**
 * Convert the buffered R data frame into a contiguous C memory area.
 *
 * @param buffer Pointer to the memory area that receives the flattened values.
 */
void RRuntime::to_C_domain(double *buffer) {
size_t rowCount = dfbuff.nrow();
size_t colCount = dfbuff.ncol();
for (size_t i = 0; i < rowCount; i++) {
for (size_t j = 0; j < colCount; j++) {
/* Access column vector j and extract value of line i */
Rcpp::DoubleVector col = dfbuff[j];
buffer[i * colCount + j] = col[i];
}
}
}
/**
 * Convert a contiguous C memory area back into the buffered R data frame.
 *
 * The target data frame (the skeleton defining the structure) must be selected
 * beforehand via setBufferDataFrame().
 *
 * @param buffer Pointer to the memory area whose values are written into the
 * data frame.
 */
void RRuntime::from_C_domain(double *buffer) {
size_t rowCount = dfbuff.nrow();
size_t colCount = dfbuff.ncol();
for (size_t i = 0; i < rowCount; i++) {
for (size_t j = 0; j < colCount; j++) {
/* Access column vector j and extract value of line i */
Rcpp::DoubleVector col = dfbuff[j];
col[i] = buffer[i * colCount + j];
}
}
}
void RRuntime::setBufferDataFrame(std::string dfname) {
this->dfbuff = parseEval(dfname);
}
Rcpp::DataFrame RRuntime::getBufferDataFrame() { return this->dfbuff; }
size_t RRuntime::getBufferNCol() { return (this->dfbuff).ncol(); }
size_t RRuntime::getBufferNRow() { return (this->dfbuff).nrow(); }
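Both conversion routines use the same row-major indexing, buffer[i * colCount + j], so row i of the buffered data frame occupies one contiguous block of the MPI buffers used in kin.cpp and worker.cpp. A minimal standalone sketch of that layout with made-up values (no R session needed):

#include <cstdio>
#include <vector>

int main() {
  // Hypothetical 3x2 "data frame": Rcpp stores it as one vector per column.
  std::vector<double> col_a = {1.0, 2.0, 3.0};
  std::vector<double> col_b = {10.0, 20.0, 30.0};
  const size_t rowCount = 3, colCount = 2;

  // Same indexing as RRuntime::to_C_domain: flatten row-major into one buffer.
  std::vector<double> buffer(rowCount * colCount);
  for (size_t i = 0; i < rowCount; i++) {
    buffer[i * colCount + 0] = col_a[i];
    buffer[i * colCount + 1] = col_b[i];
  }

  // Prints: 1 10 2 20 3 30 -- each row is contiguous in the flat buffer.
  for (double v : buffer) std::printf("%g ", v);
  std::printf("\n");
  return 0;
}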

src/util/RRuntime.h (new file, +36 lines)

@ -0,0 +1,36 @@
#ifndef RRUNTIME_H
#define RRUNTIME_H
#include <RInside.h>
#include <Rcpp.h>
#include <string>
namespace poet {
/**
 * RRuntime is a wrapper class around an RInside (R) runtime and provides
 * several simplified methods for using R commands inside POET.
 *
 * Creating an instance of RRuntime also spawns an embedded R runtime.
 */
class RRuntime : public RInside {
public:
/**
* Constructor of class RRuntime calling constructor of RInside.
*/
RRuntime(const int argc, const char *const argv[]) : RInside(argc, argv){};
void to_C_domain(double *buffer);
void from_C_domain(double *buffer);
void setBufferDataFrame(std::string dfname);
Rcpp::DataFrame getBufferDataFrame();
size_t getBufferNCol();
size_t getBufferNRow();
private:
Rcpp::DataFrame dfbuff;
};
} // namespace poet
#endif // RRUNTIME_H
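Pulling the pieces together, the intended usage pattern (as seen in the kin.cpp and worker.cpp hunks above) is: select a data frame as the exchange buffer, flatten it into a plain double array for MPI, and write the results back. A condensed sketch under those assumptions; it requires an installed R with RInside/Rcpp, and the variable names tmp and chemistry_data simply mirror the diff:

#include "util/RRuntime.h"
#include <cstdlib>
#include <vector>

int main(int argc, char *argv[]) {
  poet::RRuntime R(argc, argv); // also spawns the embedded R session

  // Build a small data frame in R and select it as the exchange buffer.
  R.parseEvalQ("tmp <- data.frame(a = c(1, 2, 3), b = c(4, 5, 6))");
  R.setBufferDataFrame("tmp");

  // R -> C: flatten into a contiguous array (e.g. an MPI send buffer).
  std::vector<double> buf(R.getBufferNRow() * R.getBufferNCol());
  R.to_C_domain(buf.data());

  // ... an MPI exchange / chemistry step would modify buf here ...

  // C -> R: write values back into the buffered frame and hand it to R.
  R.from_C_domain(buf.data());
  R["chemistry_data"] = R.getBufferDataFrame();
  R.parseEvalQ("print(chemistry_data)");
  return EXIT_SUCCESS;
}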

worker.cpp

@@ -1,12 +1,15 @@
 #include "worker.h"
 #include "dht_wrapper.h"
 #include "global_buffer.h"
-#include "r_utils.h"
+#include "util/RRuntime.h"
 
-#include <mpi.h>
 #include <iostream>
+#include <mpi.h>
+#include <Rcpp.h>
 
-void worker_function(RInside& R)
-{
+using namespace poet;
+using namespace Rcpp;
+
+void worker_function(RRuntime R) {
   int world_rank;
   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
   MPI_Status probe_status;
@ -19,42 +22,43 @@ void worker_function(RInside& R)
double idle_a, idle_b; double idle_a, idle_b;
double cummul_idle = 0.f; double cummul_idle = 0.f;
double dht_get_start=0, dht_get_end=0; double dht_get_start = 0, dht_get_end = 0;
double dht_fill_start=0, dht_fill_end=0; double dht_fill_start = 0, dht_fill_end = 0;
double phreeqc_time_start=0, phreeqc_time_end=0; double phreeqc_time_start = 0, phreeqc_time_end = 0;
int phreeqc_count = 0; int phreeqc_count = 0;
//timing[0] -> phreeqc // timing[0] -> phreeqc
//timing[1] -> dht_get // timing[1] -> dht_get
//timing[2] -> dht_fill // timing[2] -> dht_fill
double timing[3]; double timing[3];
timing[0] = 0.0; timing[0] = 0.0;
timing[1] = 0.0; timing[1] = 0.0;
timing[2] = 0.0; timing[2] = 0.0;
//dht_perf[0] -> hits // dht_perf[0] -> hits
//dht_perf[1] -> miss // dht_perf[1] -> miss
//dht_perf[2] -> collisions // dht_perf[2] -> collisions
uint64_t dht_perf[3]; uint64_t dht_perf[3];
if (dht_enabled) if (dht_enabled) {
{ dht_flags.resize(work_package_size, true); // set size
dht_flags.resize(work_package_size, true); //set size dht_flags.assign(work_package_size,
dht_flags.assign(work_package_size, true); //assign all elements to true (default) true); // assign all elements to true (default)
dht_hits = 0; dht_hits = 0;
dht_miss = 0; dht_miss = 0;
dht_collision = 0; dht_collision = 0;
// MDL: This code has now been moved to kin.cpp // MDL: This code has now been moved to kin.cpp
// /*Load significance vector from R setup file (or set default)*/ // /*Load significance vector from R setup file (or set default)*/
// bool signif_vector_exists = R.parseEval("exists('signif_vector')"); // bool signif_vector_exists = R.parseEval("exists('signif_vector')");
// if (signif_vector_exists) // if (signif_vector_exists)
// { // {
// dht_significant_digits_vector = as<std::vector<int>>(R["signif_vector"]); // dht_significant_digits_vector =
// as<std::vector<int>>(R["signif_vector"]);
// } else // } else
// { // {
// dht_significant_digits_vector.assign(dht_object->key_size / sizeof(double), dht_significant_digits); // dht_significant_digits_vector.assign(dht_object->key_size /
// sizeof(double), dht_significant_digits);
// } // }
// /*Load property type vector from R setup file (or set default)*/ // /*Load property type vector from R setup file (or set default)*/
@ -64,26 +68,25 @@ void worker_function(RInside& R)
// prop_type_vector = as<std::vector<string>>(R["prop_type"]); // prop_type_vector = as<std::vector<string>>(R["prop_type"]);
// } else // } else
// { // {
// prop_type_vector.assign(dht_object->key_size / sizeof(double), "normal"); // prop_type_vector.assign(dht_object->key_size / sizeof(double),
// "normal");
// } // }
} }
//initialization of helper variables // initialization of helper variables
iteration = 0; iteration = 0;
dt = 0; dt = 0;
current_sim_time = 0; current_sim_time = 0;
local_work_package_size = 0; local_work_package_size = 0;
/*worker loop*/ /*worker loop*/
while(1) while (1) {
{
/*Wait for Message*/ /*Wait for Message*/
idle_a = MPI_Wtime(); idle_a = MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status); MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_status);
idle_b = MPI_Wtime(); idle_b = MPI_Wtime();
if (probe_status.MPI_TAG == TAG_WORK) if (probe_status.MPI_TAG == TAG_WORK) { /* do work */
{ /* do work */
cummul_idle += idle_b - idle_a; cummul_idle += idle_b - idle_a;
@ -91,28 +94,32 @@ void worker_function(RInside& R)
MPI_Get_count(&probe_status, MPI_DOUBLE, &count); MPI_Get_count(&probe_status, MPI_DOUBLE, &count);
/* receive */ /* receive */
MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(mpi_buffer, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
//decrement count of work_package by BUFFER_OFFSET // decrement count of work_package by BUFFER_OFFSET
count -= BUFFER_OFFSET; count -= BUFFER_OFFSET;
//check for changes on all additional variables given by the 'header' of mpi_buffer // check for changes on all additional variables given by the 'header' of
if (mpi_buffer[count] != local_work_package_size) { //work_package_size // mpi_buffer
if (mpi_buffer[count] != local_work_package_size) { // work_package_size
local_work_package_size = mpi_buffer[count]; local_work_package_size = mpi_buffer[count];
R["work_package_size"] = local_work_package_size; R["work_package_size"] = local_work_package_size;
R.parseEvalQ("mysetup$work_package_size <- work_package_size"); R.parseEvalQ("mysetup$work_package_size <- work_package_size");
} }
if (mpi_buffer[count+1] != iteration) { //current iteration of simulation if (mpi_buffer[count + 1] !=
iteration = mpi_buffer[count+1]; iteration) { // current iteration of simulation
iteration = mpi_buffer[count + 1];
R["iter"] = iteration; R["iter"] = iteration;
R.parseEvalQ("mysetup$iter <- iter"); R.parseEvalQ("mysetup$iter <- iter");
} }
if (mpi_buffer[count+2] != dt) { //current timestep size if (mpi_buffer[count + 2] != dt) { // current timestep size
dt = mpi_buffer[count+2]; dt = mpi_buffer[count + 2];
R["dt"] = dt; R["dt"] = dt;
R.parseEvalQ("mysetup$dt <- dt"); R.parseEvalQ("mysetup$dt <- dt");
} }
if (mpi_buffer[count+3] != current_sim_time) { //current simulation time ('age' of simulation) if (mpi_buffer[count + 3] !=
current_sim_time = mpi_buffer[count+3]; current_sim_time) { // current simulation time ('age' of simulation)
current_sim_time = mpi_buffer[count + 3];
R["simulation_time"] = current_sim_time; R["simulation_time"] = current_sim_time;
R.parseEvalQ("mysetup$simulation_time <- simulation_time"); R.parseEvalQ("mysetup$simulation_time <- simulation_time");
} }
@ -123,87 +130,87 @@ void worker_function(RInside& R)
// } // }
       /* get df with right structure to fill in work package */
-      R.parseEvalQ("tmp2 <- head(mysetup$state_C, work_package_size)");
+      R.parseEvalQ("skeleton <- head(mysetup$state_C, work_package_size)");
       // R.parseEval("print(rownames(tmp2)[1:5])");
       // R.parseEval("print(head(tmp2, 2))");
       // R.parseEvalQ("tmp2$id <- as.double(rownames(tmp2))");
-      Rcpp::DataFrame buffer = R.parseEval("tmp2");
+      //Rcpp::DataFrame buffer = R.parseEval("tmp2");
+      R.setBufferDataFrame("skeleton");
if (dht_enabled) if (dht_enabled) {
{
// DEBUG // DEBUG
// cout << "RANK " << world_rank << " start checking DHT\n"; // cout << "RANK " << world_rank << " start checking DHT\n";
//resize helper vector dht_flags of work_package_size changes // resize helper vector dht_flags of work_package_size changes
if ((int) dht_flags.size() != local_work_package_size) { if ((int)dht_flags.size() != local_work_package_size) {
dht_flags.resize(local_work_package_size, true); //set size dht_flags.resize(local_work_package_size, true); // set size
dht_flags.assign(local_work_package_size, true); //assign all elements to true (default) dht_flags.assign(local_work_package_size,
true); // assign all elements to true (default)
} }
dht_get_start = MPI_Wtime(); dht_get_start = MPI_Wtime();
check_dht(R, local_work_package_size, dht_flags, mpi_buffer); check_dht(R, local_work_package_size, dht_flags, mpi_buffer);
dht_get_end = MPI_Wtime(); dht_get_end = MPI_Wtime();
//DEBUG // DEBUG
//cout << "RANK " << world_rank << " checking DHT complete \n"; // cout << "RANK " << world_rank << " checking DHT complete \n";
R["dht_flags"] = as<LogicalVector>(wrap(dht_flags)); R["dht_flags"] = as<LogicalVector>(wrap(dht_flags));
//R.parseEvalQ("print(head(dht_flags))"); // R.parseEvalQ("print(head(dht_flags))");
} }
/* work */ /* work */
convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer); R.from_C_domain(mpi_buffer);
R["work_package_full"] = buffer; //convert_C_buffer_2_R_Dataframe(mpi_buffer, buffer);
//R["work_package"] = buffer; R["work_package_full"] = R.getBufferDataFrame();
// R["work_package"] = buffer;
//DEBUG // DEBUG
//R.parseEvalQ("print(head(work_package_full))"); // R.parseEvalQ("print(head(work_package_full))");
//R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )"); // R.parseEvalQ("print( c(length(dht_flags), nrow(work_package_full)) )");
if (dht_enabled) if (dht_enabled) {
{
R.parseEvalQ("work_package <- work_package_full[dht_flags,]"); R.parseEvalQ("work_package <- work_package_full[dht_flags,]");
} else { } else {
R.parseEvalQ("work_package <- work_package_full"); R.parseEvalQ("work_package <- work_package_full");
} }
//DEBUG // DEBUG
// R.parseEvalQ("print(head(work_package),2)"); // R.parseEvalQ("print(head(work_package),2)");
// R.parseEvalQ("rownames(work_package) <- work_package$id"); // R.parseEvalQ("rownames(work_package) <- work_package$id");
// R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in% colnames(work_package)"); // R.parseEval("print(paste('id %in% colnames(work_package)', 'id' %in%
// R.parseEvalQ("id_store <- rownames(work_package)"); //"[, ncol(work_package)]"); // colnames(work_package)"); R.parseEvalQ("id_store <-
// rownames(work_package)"); //"[, ncol(work_package)]");
// R.parseEvalQ("work_package$id <- NULL"); // R.parseEvalQ("work_package$id <- NULL");
R.parseEvalQ("work_package <- as.matrix(work_package)"); R.parseEvalQ("work_package <- as.matrix(work_package)");
unsigned int nrows = R.parseEval("nrow(work_package)"); unsigned int nrows = R.parseEval("nrow(work_package)");
if (nrows > 0) if (nrows > 0) {
{
/*Single Line error Workaround*/ /*Single Line error Workaround*/
if (nrows <=1) if (nrows <= 1) {
{ // duplicate line to enable correct simmulation
//duplicate line to enable correct simmulation R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), "
R.parseEvalQ("work_package <- work_package[rep(1:nrow(work_package), times = 2), ]"); "times = 2), ]");
} }
phreeqc_count++; phreeqc_count++;
phreeqc_time_start = MPI_Wtime(); phreeqc_time_start = MPI_Wtime();
// MDL // MDL
// R.parseEvalQ("print('Work_package:\n'); print(head(work_package , 2)); cat('RCpp: worker_function:', local_rank, ' \n')"); // R.parseEvalQ("print('Work_package:\n'); print(head(work_package ,
R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, data = work_package))"); // 2)); cat('RCpp: worker_function:', local_rank, ' \n')");
R.parseEvalQ("result <- as.data.frame(slave_chemistry(setup=mysetup, "
"data = work_package))");
phreeqc_time_end = MPI_Wtime(); phreeqc_time_end = MPI_Wtime();
// R.parseEvalQ("result$id <- id_store"); // R.parseEvalQ("result$id <- id_store");
} else } else {
{ // cout << "Work-Package is empty, skipping phreeqc!" << endl;
//cout << "Work-Package is empty, skipping phreeqc!" << endl;
} }
if (dht_enabled) {
if (dht_enabled)
{
R.parseEvalQ("result_full <- work_package_full"); R.parseEvalQ("result_full <- work_package_full");
if (nrows > 0) if (nrows > 0)
R.parseEvalQ("result_full[dht_flags,] <- result"); R.parseEvalQ("result_full[dht_flags,] <- result");
@@ -211,17 +218,20 @@ void worker_function(RInside& R)
        R.parseEvalQ("result_full <- result");
      }
-     Rcpp::DataFrame result = R.parseEval("result_full");
-     convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
+     R.setBufferDataFrame("result_full");
+     //Rcpp::DataFrame result = R.parseEval("result_full");
+     //convert_R_Dataframe_2_C_buffer(mpi_buffer_results, result);
+     R.to_C_domain(mpi_buffer_results);
      /* send results to master */
      MPI_Request send_req;
-     MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD, &send_req);
+     MPI_Isend(mpi_buffer_results, count, MPI_DOUBLE, 0, TAG_WORK,
+               MPI_COMM_WORLD, &send_req);
-     if (dht_enabled)
-     {
+     if (dht_enabled) {
        dht_fill_start = MPI_Wtime();
-       fill_dht(R, local_work_package_size, dht_flags, mpi_buffer, mpi_buffer_results);
+       fill_dht(R, local_work_package_size, dht_flags, mpi_buffer,
+                mpi_buffer_results);
        dht_fill_end = MPI_Wtime();
        timing[1] += dht_get_end - dht_get_start;
@@ -230,45 +240,52 @@ void worker_function(RInside& R)
      timing[0] += phreeqc_time_end - phreeqc_time_start;
-     MPI_Wait(&send_req,MPI_STATUS_IGNORE);
-   } else if (probe_status.MPI_TAG == TAG_FINISH)
-   { /* recv and die */
+     MPI_Wait(&send_req, MPI_STATUS_IGNORE);
+   } else if (probe_status.MPI_TAG == TAG_FINISH) { /* recv and die */
      /* before death, submit profiling/timings to master*/
-     MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+     MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_FINISH, MPI_COMM_WORLD,
+              MPI_STATUS_IGNORE);
-     //timings
+     // timings
      MPI_Send(timing, 3, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
      MPI_Send(&phreeqc_count, 1, MPI_INT, 0, TAG_TIMING, MPI_COMM_WORLD);
      MPI_Send(&cummul_idle, 1, MPI_DOUBLE, 0, TAG_TIMING, MPI_COMM_WORLD);
-     if(dht_enabled)
-     {
-       //dht_perf
+     if (dht_enabled) {
+       // dht_perf
        dht_perf[0] = dht_hits;
        dht_perf[1] = dht_miss;
        dht_perf[2] = dht_collision;
-       MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF, MPI_COMM_WORLD);
+       MPI_Send(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, 0, TAG_DHT_PERF,
+                MPI_COMM_WORLD);
      }
      break;
    } else if ((probe_status.MPI_TAG == TAG_DHT_STATS)) {
-     MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_STATS, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+     MPI_Recv(NULL, 0, MPI_DOUBLE, 0, TAG_DHT_STATS, MPI_COMM_WORLD,
+              MPI_STATUS_IGNORE);
      print_statistics();
      MPI_Barrier(MPI_COMM_WORLD);
    } else if ((probe_status.MPI_TAG == TAG_DHT_STORE)) {
-     char* outdir;
+     char *outdir;
      MPI_Get_count(&probe_status, MPI_CHAR, &count);
-     outdir = (char *) calloc(count + 1, sizeof(char));
-     MPI_Recv(outdir, count, MPI_CHAR, 0, TAG_DHT_STORE, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-     int res = table_to_file((char *) outdir);
+     outdir = (char *)calloc(count + 1, sizeof(char));
+     MPI_Recv(outdir, count, MPI_CHAR, 0, TAG_DHT_STORE, MPI_COMM_WORLD,
+              MPI_STATUS_IGNORE);
+     int res = table_to_file((char *)outdir);
      if (res != DHT_SUCCESS) {
-       if (world_rank == 2) cerr << "CPP: Worker: Error in writing current state of DHT to file (TAG_DHT_STORE)" << endl;
+       if (world_rank == 2)
+         cerr << "CPP: Worker: Error in writing current state of DHT to file "
+                 "(TAG_DHT_STORE)"
+              << endl;
      } else {
-       if (world_rank == 2) cout << "CPP: Worker: Successfully written DHT to file " << outdir << endl;
+       if (world_rank == 2)
+         cout << "CPP: Worker: Successfully written DHT to file " << outdir
+              << endl;
      }
      free(outdir);
      MPI_Barrier(MPI_COMM_WORLD);
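The TAG_FINISH branch above pins down the worker's half of the shutdown protocol: an empty TAG_FINISH message is answered with three TAG_TIMING messages (the timing array, phreeqc_count, cummul_idle) and, with the DHT enabled, one TAG_DHT_PERF message. The master side is not shown in this commit; a hypothetical counterpart that drains these messages for a single worker might look as follows (only the tags and message shapes are taken from the worker code, all other names are assumptions):

    // Hypothetical sketch of the master-side collection after TAG_FINISH;
    // TAG_FINISH, TAG_TIMING and TAG_DHT_PERF are the project's tag macros.
    #include <mpi.h>

    void collect_worker_stats(int worker_rank, bool dht_enabled) {
      double timing[3], cummul_idle;
      int phreeqc_count;
      unsigned long long dht_perf[3];

      // tell the worker to shut down, then receive its profiling data in order
      MPI_Send(NULL, 0, MPI_DOUBLE, worker_rank, TAG_FINISH, MPI_COMM_WORLD);
      MPI_Recv(timing, 3, MPI_DOUBLE, worker_rank, TAG_TIMING, MPI_COMM_WORLD,
               MPI_STATUS_IGNORE);
      MPI_Recv(&phreeqc_count, 1, MPI_INT, worker_rank, TAG_TIMING,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      MPI_Recv(&cummul_idle, 1, MPI_DOUBLE, worker_rank, TAG_TIMING,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      if (dht_enabled)
        MPI_Recv(dht_perf, 3, MPI_UNSIGNED_LONG_LONG, worker_rank, TAG_DHT_PERF,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }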

View File

@@ -1,11 +1,10 @@
 #pragma once
-#include <RInside.h>
+#include "util/RRuntime.h"
 using namespace std;
-using namespace Rcpp;
+using namespace poet;
 /*Functions*/
-void worker_function(RInside &R);
+void worker_function(RRuntime R);
 /*Globals*/
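util/RRuntime.h itself is not included in this excerpt, so the shape of the new wrapper can only be inferred from its call sites: setBufferDataFrame, getBufferDataFrame, from_C_domain and to_C_domain, next to the RInside-style parseEval/parseEvalQ/operator[] calls that worker.cpp keeps using. A declaration-only sketch of such an interface, with everything beyond the observed method names being an assumption:

    // Sketch only: inferred from the call sites in the diff above, not from the
    // real util/RRuntime.h. The wrapper must also expose the usual RInside entry
    // points (parseEval, parseEvalQ, operator[]) that worker.cpp keeps calling;
    // whether it does so via inheritance, forwarding, or a held reference is not
    // visible in this excerpt and is left out here.
    #pragma once
    #include <Rcpp.h>
    #include <string>

    namespace poet {

    class RRuntime {
    public:
      // Remember which R data.frame currently backs the C <-> R transfers.
      void setBufferDataFrame(const std::string &name);
      // Hand that data.frame back to C++ (used as R["work_package_full"] = ...).
      Rcpp::DataFrame getBufferDataFrame();
      // Copy a flat MPI double buffer into the buffered data.frame
      // (role of the former convert_C_buffer_2_R_Dataframe).
      void from_C_domain(const double *buffer);
      // Copy the buffered data.frame back into a flat MPI double buffer
      // (role of the former convert_R_Dataframe_2_C_buffer).
      void to_C_domain(double *buffer);

    private:
      std::string buffer_df_name_; // assumed bookkeeping member
    };

    } // namespace poet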