Build libraries for each directory

This commit is contained in:
Max Lübke 2021-01-25 17:40:32 +01:00
parent 5b44d94730
commit 76a32c12c0
No known key found for this signature in database
GPG Key ID: D3201E51647D1199
27 changed files with 509 additions and 504 deletions

25
CMake/FindR.cmake Normal file
View File

@ -0,0 +1,25 @@
# prepare R environment (Rcpp + RInside)
find_program(R_EXE "R")
# search for R executable, R header file and library path
if(R_EXE)
execute_process(COMMAND ${R_EXE} RHOME
OUTPUT_VARIABLE R_ROOT_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
find_path(R_INCLUDE_DIR R.h
HINTS ${R_ROOT_DIR}
PATHS /usr/inlcude /usr/local/include /usr/share
PATH_SUFFIXES include/R R/include
)
find_library(R_LIBRARY R
HINTS ${R_ROOT_DIR}/lib
)
else()
message(FATAL_ERROR "No R runtime found!")
endif()
set(R_LIBRARIES ${R_LIBRARY})
set(R_INCLUDE_DIRS ${R_INCLUDE_DIR})

23
CMake/FindRInside.cmake Normal file
View File

@ -0,0 +1,23 @@
# find RInside libraries and include path
execute_process(COMMAND echo "cat(find.package('RInside'))"
COMMAND ${R_EXE} --vanilla --slave
RESULT_VARIABLE RINSIDE_NOT_FOUND
ERROR_QUIET
OUTPUT_VARIABLE RINSIDE_PATH
OUTPUT_STRIP_TRAILING_WHITESPACE
)
if(RInside_NOT_FOUND)
message(FATAL_ERROR "RInside not found!")
endif()
find_library(R_RInside_LIBRARY libRInside.so
HINTS ${RINSIDE_PATH}/lib)
list(APPEND R_LIBRARIES ${R_RInside_LIBRARY})
find_path(R_RInside_INCLUDE_DIR RInside.h
HINTS ${RINSIDE_PATH}
PATH_SUFFIXES include)
list(APPEND R_INCLUDE_DIRS ${R_RInside_INCLUDE_DIR})

23
CMake/FindRcpp.cmake Normal file
View File

@ -0,0 +1,23 @@
# find Rcpp include directory
execute_process(COMMAND echo "cat(find.package('Rcpp'))"
COMMAND ${R_EXE} --vanilla --slave
RESULT_VARIABLE RCPP_NOT_FOUND
ERROR_QUIET
OUTPUT_VARIABLE RCPP_PATH
OUTPUT_STRIP_TRAILING_WHITESPACE
)
if(RCPP_NOT_FOUND)
message(FATAL_ERROR "Rcpp not found!")
endif()
# find_library(R_Rcpp_LIBRARY Rcpp.so
# HINTS ${RCPP_PATH}/libs)
# list(APPEND R_LIBRARIES ${R_Rcpp_LIBRARY})
find_path(R_Rcpp_INCLUDE_DIR Rcpp.h
HINTS ${RCPP_PATH}
PATH_SUFFIXES include)
list(APPEND R_INCLUDE_DIRS ${R_Rcpp_INCLUDE_DIR})

21
CMakeLists.txt Normal file
View File

@ -0,0 +1,21 @@
# Version 3.9+ offers new MPI package variables
cmake_minimum_required(VERSION 3.9)
project(POET VERSION 0.1 LANGUAGES CXX C)
# specify the C++ standard
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
list(APPEND CMAKE_MODULE_PATH "${POET_SOURCE_DIR}/CMake")
set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS")
add_definitions(${GCC_CXX_FLAGS})
find_package(MPI REQUIRED)
find_package(R REQUIRED)
find_package(Rcpp REQUIRED)
find_package(RInside REQUIRED)
add_subdirectory(src)

View File

@ -1,103 +1,6 @@
# Version 3.9+ offers new MPI package variables
cmake_minimum_required(VERSION 3.9)
project(POET VERSION 0.1)
# Not needed until now
# specify the C++ standard
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
find_package(MPI REQUIRED)
set(GCC_CXX_FLAGS "-D STRICT_R_HEADERS")
add_definitions(${GCC_CXX_FLAGS})
# prepare R environment (Rcpp + RInside)
find_program(R_EXE "R")
# search for R executable, R header file and library path
if(R_EXE)
execute_process(COMMAND ${R_EXE} RHOME
OUTPUT_VARIABLE R_ROOT_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
find_path(R_INCLUDE_DIR R.h
HINTS ${R_ROOT_DIR}
PATHS /usr/inlcude /usr/local/include /usr/share
PATH_SUFFIXES include/R R/include
)
find_library(R_LIBRARY R
HINTS ${R_ROOT_DIR}/lib
)
else()
message(FATAL_ERROR "No R runtime found!")
endif()
set(R_LIBRARIES ${R_LIBRARY})
set(R_INCLUDE_DIRS ${R_INCLUDE_DIR})
# find Rcpp include directory
execute_process(COMMAND echo "cat(find.package('Rcpp'))"
COMMAND ${R_EXE} --vanilla --slave
RESULT_VARIABLE RCPP_NOT_FOUND
ERROR_QUIET
OUTPUT_VARIABLE RCPP_PATH
OUTPUT_STRIP_TRAILING_WHITESPACE
)
if(RCPP_NOT_FOUND)
message(FATAL_ERROR "Rcpp not found!")
endif()
find_path(R_Rcpp_INCLUDE_DIR Rcpp.h
HINTS ${RCPP_PATH}
PATH_SUFFIXES include)
list(APPEND R_INCLUDE_DIRS ${R_Rcpp_INCLUDE_DIR})
# find RInside libraries and include path
execute_process(COMMAND echo "cat(find.package('RInside'))"
COMMAND ${R_EXE} --vanilla --slave
RESULT_VARIABLE RINSIDE_NOT_FOUND
ERROR_QUIET
OUTPUT_VARIABLE RINSIDE_PATH
OUTPUT_STRIP_TRAILING_WHITESPACE
)
if(RInside_NOT_FOUND)
message(FATAL_ERROR "RInside not found!")
endif()
find_library(R_RInside_LIBRARY libRInside.so
HINTS ${RINSIDE_PATH}/lib)
list(APPEND R_LIBRARIES ${R_RInside_LIBRARY})
find_path(R_RInside_INCLUDE_DIR RInside.h
HINTS ${RINSIDE_PATH}
PATH_SUFFIXES include)
list(APPEND R_INCLUDE_DIRS ${R_RInside_INCLUDE_DIR})
#include found directories for the whole scope (will be changed with modularization)
include_directories(${MPI_CXX_INCLUDE_DIRS})
#define program libraries
add_library(POET_Libs STATIC util/RRuntime.cpp model/Grid.cpp model/ChemSim.cpp model/ChemMaster.cpp model/ChemWorker.cpp DHT/DHT_Wrapper.cpp DHT/DHT.cpp util/Parser.cpp model/TransportSim.cpp util/Profiler.cpp)
target_include_directories(POET_Libs PUBLIC ${R_INCLUDE_DIRS})
target_link_libraries(POET_Libs ${R_LIBRARIES} MPI::MPI_CXX crypto)
#add_library(DHT OBJECT DHT.cpp dht_wrapper.cpp)
#target_link_libraries(DHT crypto R_Wrapper)
#add_library(Worker OBJECT worker.cpp)
#target_link_libraries(Worker ${R_LIBRARIES} MPI::MPI_CXX R_Wrapper)
add_executable(poet kin.cpp)
target_link_libraries(poet PUBLIC MPI::MPI_CXX POET_Libs)
target_link_libraries(poet PRIVATE POET_Model POET_Util)
add_subdirectory(DHT)
add_subdirectory(model)
add_subdirectory(util)

7
src/DHT/CMakeLists.txt Normal file
View File

@ -0,0 +1,7 @@
add_library(DHT SHARED DHT.c DHT.h)
target_include_directories(DHT PRIVATE ${MPI_C_INCLUDE_DIRS})
target_link_libraries(DHT MPI::MPI_C)
add_library(DHT_Wrapper STATIC DHT_Wrapper.cpp DHT_Wrapper.h)
target_include_directories(DHT_Wrapper PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(DHT_Wrapper DHT m crypto POET_Util)

View File

@ -5,22 +5,6 @@
#include <stdlib.h>
#include <string.h>
/**
* @brief Determine destination rank and index.
*
* This is done by looping over all possbile indices. First of all, set a
* temporary index to zero and copy count of bytes for each index into the
* memory area of the temporary index. After that the current index is
* calculated by the temporary index modulo the table size. The destination rank
* of the process is simply determined by hash modulo the communicator size.
*
* @param hash Calculated 64 bit hash.
* @param comm_size Communicator size.
* @param table_size Count of buckets per process.
* @param dest_rank Reference to the destination rank variable.
* @param index Pointer to the array index.
* @param index_count Count of possible indeces.
*/
static void determine_dest(uint64_t hash, int comm_size,
unsigned int table_size, unsigned int *dest_rank,
unsigned int *index, unsigned int index_count) {
@ -36,27 +20,11 @@ static void determine_dest(uint64_t hash, int comm_size,
*dest_rank = (unsigned int)(hash % comm_size);
}
/**
* @brief Set the occupied flag.
*
* This will set the first bit of a bucket to 1.
*
* @param flag_byte First byte of a bucket.
*/
static void set_flag(char *flag_byte) {
*flag_byte = 0;
*flag_byte |= (1 << 0);
}
/**
* @brief Get the occupied flag.
*
* This function determines whether the occupied flag of a bucket was set or
* not.
*
* @param flag_byte First byte of a bucket.
* @return int Returns 1 for true or 0 for false.
*/
static int read_flag(char flag_byte) {
if ((flag_byte & 0x01) == 0x01) {
return 1;
@ -228,7 +196,7 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
#endif
// unlock window and return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
return DHT_READ_MISS;
}
// ... or key doesn't match passed by key and last index reached.
@ -241,7 +209,7 @@ int DHT_read(DHT *table, void *send_key, void *destination) {
#endif
// unlock window an return
if (MPI_Win_unlock(dest_rank, table->window) != 0) return DHT_MPI_ERROR;
return DHT_READ_ERROR;
return DHT_READ_MISS;
}
} else
break;
@ -344,7 +312,7 @@ int DHT_from_file(DHT *table, const char *filename) {
// seek behind header of DHT file
if (MPI_File_seek(file, DHT_FILEHEADER_SIZE, MPI_SEEK_SET) != 0)
return DHT_FILE_IO_ERROR;
// current position is rank * bucket_size + OFFSET
cur_pos = DHT_FILEHEADER_SIZE + (rank * bucket_size);

View File

@ -20,7 +20,7 @@
/** Returned if some error in MPI routine occurs. */
#define DHT_MPI_ERROR -1
/** Returned by a call of DHT_read if no bucket with given key was found. */
#define DHT_READ_ERROR -2
#define DHT_READ_MISS -2
/** Returned by DHT_write if a bucket was evicted. */
#define DHT_WRITE_SUCCESS_WITH_COLLISION -3
/** Returned when no errors occured. */
@ -202,7 +202,7 @@ extern int DHT_to_file(DHT* table, const char* filename);
* @param table Pointer to the \a DHT-object.
* @param filename Name of the file to read from.
* @return int Returns DHT_SUCCESS on succes, DHT_FILE_IO_ERROR if file can't be
* opened/closed, DHT_READ_ERROR if file is not readable or DHT_WRONG_FILE if
* opened/closed, DHT_READ_MISS if file is not readable or DHT_WRONG_FILE if
* file doesn't match expectation. This is possible if the data size or key size
* is different.
*/
@ -235,7 +235,7 @@ extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter);
* -# free buckets (in respect to the memory of this process)
* -# calls of DHT_write (w_access)
* -# calls of DHT_read (r_access)
* -# read misses (see DHT_READ_ERROR)
* -# read misses (see DHT_READ_MISS)
* -# collisions (see DHT_WRITE_SUCCESS_WITH_COLLISION)
* 3-6 will reset with every call of this function finally the amount of new
* written entries is printed out (since the last call of this funtion).
@ -250,9 +250,48 @@ extern int DHT_free(DHT* table, int* eviction_counter, int* readerror_counter);
* @return int Returns DHT_SUCCESS on success or DHT_MPI_ERROR on internal MPI
* error.
*/
#ifdef DHT_STATISTICS
extern int DHT_print_statistics(DHT* table);
#endif
/**
* @brief Determine destination rank and index.
*
* This is done by looping over all possbile indices. First of all, set a
* temporary index to zero and copy count of bytes for each index into the
* memory area of the temporary index. After that the current index is
* calculated by the temporary index modulo the table size. The destination rank
* of the process is simply determined by hash modulo the communicator size.
*
* @param hash Calculated 64 bit hash.
* @param comm_size Communicator size.
* @param table_size Count of buckets per process.
* @param dest_rank Reference to the destination rank variable.
* @param index Pointer to the array index.
* @param index_count Count of possible indeces.
*/
static void determine_dest(uint64_t hash, int comm_size,
unsigned int table_size, unsigned int* dest_rank,
unsigned int* index, unsigned int index_count);
/**
* @brief Set the occupied flag.
*
* This will set the first bit of a bucket to 1.
*
* @param flag_byte First byte of a bucket.
*/
static void set_flag(char* flag_byte);
/**
* @brief Get the occupied flag.
*
* This function determines whether the occupied flag of a bucket was set or
* not.
*
* @param flag_byte First byte of a bucket.
* @return int Returns 1 for true or 0 for false.
*/
static int read_flag(char flag_byte);
#endif /* DHT_H */

View File

@ -1,11 +1,10 @@
#include "DHT_Wrapper.h"
#include <math.h>
#include <openssl/md5.h>
#include <iostream>
#include "DHT.h"
using namespace poet;
using namespace std;
@ -60,7 +59,7 @@ void DHT_Wrapper::checkDHT(int length, std::vector<bool> &out_result_index,
// flag that this line is replaced by DHT-value, do not simulate!!
out_result_index[i] = false;
dht_hits++;
} else if (res == DHT_READ_ERROR) {
} else if (res == DHT_READ_MISS) {
// this line is untouched, simulation is needed
out_result_index[i] = true;
dht_miss++;

View File

@ -1,13 +1,16 @@
#ifndef DHT_WRAPPER_H
#define DHT_WRAPPER_H
#include <mpi.h>
#include <SimParams.h>
#include <string>
#include <vector>
#include "../util/SimParams.h"
#include "DHT.h"
extern "C" {
#include <DHT.h>
}
#include <mpi.h>
#define ROUND(value, signif) \
(((int)(pow(10.0, (double)signif) * value)) * pow(10.0, (double)-signif))

View File

@ -1,5 +1,4 @@
#include <Rcpp.h>
#include <mpi.h> // mpi header file
#include <cstring>
#include <iostream>
@ -7,16 +6,14 @@
#include <vector>
// #include "DHT.h" // MPI-DHT Implementation
#include "argh.h" // Argument handler https://github.com/adishavit/argh BSD-licenced
// #include "dht_wrapper.h"
// #include "global_buffer.h"
#include "model/ChemSim.h"
#include "model/Grid.h"
#include "model/TransportSim.h"
#include "util/Parser.h"
#include "util/RRuntime.h"
#include "util/SimParams.h"
#include "util/Profiler.h"
// #include "argh.h" // Argument handler https://github.com/adishavit/argh
// BSD-licenced #include "dht_wrapper.h" #include "global_buffer.h"
#include <ChemSim.h>
#include <Grid.h>
#include <Parser.h>
#include <RRuntime.h>
#include <SimParams.h>
#include <TransportSim.h>
// #include "worker.h"
//#define DHT_SIZE_PER_PROCESS 1073741824
@ -277,7 +274,7 @@ int main(int argc, char *argv[]) {
R.parseEval(init_chemistry_code);
Grid grid(R);
params.grid = &grid;
// params.grid = &grid;
grid.init();
/* Retrieve state_C from R context for MPI buffer generation */
// Rcpp::DataFrame state_C = R.parseEval("mysetup$state_C");
@ -306,7 +303,7 @@ int main(int argc, char *argv[]) {
// R["dht_enabled"] = params.dht_enabled;
// R["dht_log"] = params.dht_log;
params.R = &R;
// params.R = &R;
if (params.dht_enabled) {
// cout << "\nCreating DHT\n";
@ -417,7 +414,7 @@ int main(int argc, char *argv[]) {
cout << "CPP: Calling Advection" << endl;
trans.runIteration();
trans.run();
// sim_b_transport = MPI_Wtime();
// R.parseEvalQ("mysetup <- master_advection(setup=mysetup)");
// sim_a_transport = MPI_Wtime();
@ -428,175 +425,193 @@ int main(int argc, char *argv[]) {
/*Fallback for sequential execution*/
if (params.world_size == 1) {
master.runSeq();
} else { /*send work to workers*/
master.runPar();
master.ChemSim::run();
} else {
master.run();
}
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
// MDL master_iteration_end just writes on disk state_T and
// state_C after every iteration if the cmdline option
// --ignore-results is not given (and thus the R variable
// store_result is TRUE)
R.parseEvalQ("mysetup <- master_iteration_end(setup=mysetup)");
// cummul_transport += trans.getTransportTime();
// cummul_chemistry += master.getChemistryTime();
// cummul_transport += trans.getTransportTime();
// cummul_chemistry += master.getChemistryTime();
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
cout << endl
<< "CPP: End of *coupling* iteration " << iter << "/" << maxiter
<< endl
<< endl;
// master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
// master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));
// master_send.push_back(master.getSendTime(), "it_" + to_string(iter));
// master_recv.push_back(master.getRecvTime(), "it_" + to_string(iter));
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
}
for (int i = 1; i < params.world_size; i++) {
MPI_Send(NULL, 0, MPI_DOUBLE, i, TAG_DHT_ITER, MPI_COMM_WORLD);
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
} // END SIMULATION LOOP
} // END SIMULATION LOOP
cout << "CPP: finished simulation loop" << endl;
cout << "CPP: finished simulation loop" << endl;
sim_end = MPI_Wtime();
sim_end = MPI_Wtime();
Profiler::startProfiling(params, master, trans, R, sim_end - sim_start);
cout << "CPP: start timing profiling" << endl;
// Rcpp::NumericVector phreeqc_time;
// Rcpp::NumericVector dht_get_time;
// Rcpp::NumericVector dht_fill_time;
// Rcpp::IntegerVector phreeqc_counts;
// Rcpp::NumericVector idle_worker;
R.parseEvalQ("profiling <- list()");
// int phreeqc_tmp;
R["simtime"] = sim_end - sim_start;
R.parseEvalQ("profiling$simtime <- simtime");
trans.end();
// timings = (double *)calloc(3, sizeof(double));
// int dht_hits = 0;
// int dht_miss = 0;
// int dht_collision = 0;
// if (params.dht_enabled) {
// dht_hits = 0;
// dht_miss = 0;
// dht_collision = 0;
// dht_perfs = (uint64_t *)calloc(3, sizeof(uint64_t));
// }
// double idle_worker_tmp;
// for (int p = 0; p < params.world_size - 1; p++) {
// /* ATTENTION Worker p has rank p+1 */
// /* Send termination message to worker */
// MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
// MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
// MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
// MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
// if (params.dht_enabled) {
// dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
// dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
// MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p + 1, TAG_DHT_PERF,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// dht_hits += dht_perfs[0];
// dht_miss += dht_perfs[1];
// dht_collision += dht_perfs[2];
// }
// }
// R.parseEvalQ("profiling <- list()");
// R["simtime"] = sim_end - sim_start;
// R.parseEvalQ("profiling$simtime <- simtime");
// R["simtime_transport"] = cummul_transport;
// R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["simtime_chemistry"] = cummul_chemistry;
// R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
// R["simtime_workers"] = master.getWorkerTime();
// R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
// R["simtime_chemistry_master"] = master.getChemMasterTime();
// R.parseEvalQ(
// "profiling$simtime_chemistry_master <- simtime_chemistry_master");
// R["seq_master"] = cummul_master_seq;
// R.parseEvalQ("profiling$seq_master <- seq_master");
// // R["master_send"] = master_send;
// // R.parseEvalQ("profiling$master_send <- master_send");
// // R["master_recv"] = master_recv;
// // R.parseEvalQ("profiling$master_recv <- master_recv");
// R["idle_master"] = master.getIdleTime();
// R.parseEvalQ("profiling$idle_master <- idle_master");
// R["idle_worker"] = idle_worker;
// R.parseEvalQ("profiling$idle_worker <- idle_worker");
// R["phreeqc_time"] = phreeqc_time;
// R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
// R["phreeqc_count"] = phreeqc_counts;
// R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
// if (params.dht_enabled) {
// R["dht_hits"] = dht_hits;
// R.parseEvalQ("profiling$dht_hits <- dht_hits");
// R["dht_miss"] = dht_miss;
// R.parseEvalQ("profiling$dht_miss <- dht_miss");
// R["dht_collision"] = dht_collision;
// R.parseEvalQ("profiling$dht_collisions <- dht_collision");
// R["dht_get_time"] = dht_get_time;
// R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
// R["dht_fill_time"] = dht_fill_time;
// R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
// }
// free(timings);
// if (params.dht_enabled) free(dht_perfs);
cout << "CPP: Done! Results are stored as R objects into <"
<< params.out_dir << "/timings.rds>" << endl;
/*exporting results and profiling data*/
// std::string r_vis_code;
// r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
// R.parseEval(r_vis_code);
} else { /*This is executed by the workers*/
ChemWorker worker(&params, R, grid, dht_comm);
// worker.prepareSimulation(dht_comm);
worker.loop();
if (params.world_size == 1) {
master.ChemSim::end();
} else {
master.end();
}
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
// Rcpp::NumericVector phreeqc_time;
// Rcpp::NumericVector dht_get_time;
// Rcpp::NumericVector dht_fill_time;
// Rcpp::IntegerVector phreeqc_counts;
// Rcpp::NumericVector idle_worker;
// int phreeqc_tmp;
// timings = (double *)calloc(3, sizeof(double));
// int dht_hits = 0;
// int dht_miss = 0;
// int dht_collision = 0;
// if (params.dht_enabled) {
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// DHT_free(dht_object, NULL, NULL);
// }
// } else {
// DHT_free(dht_object, NULL, NULL);
// dht_hits = 0;
// dht_miss = 0;
// dht_collision = 0;
// dht_perfs = (uint64_t *)calloc(3, sizeof(uint64_t));
// }
// double idle_worker_tmp;
// for (int p = 0; p < params.world_size - 1; p++) {
// /* ATTENTION Worker p has rank p+1 */
// /* Send termination message to worker */
// MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
// MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
// MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
// MPI_STATUS_IGNORE);
// phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
// MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
// if (params.dht_enabled) {
// dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
// dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
// MPI_Recv(dht_perfs, 3, MPI_UNSIGNED_LONG_LONG, p + 1, TAG_DHT_PERF,
// MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// dht_hits += dht_perfs[0];
// dht_miss += dht_perfs[1];
// dht_collision += dht_perfs[2];
// }
// }
// free(mpi_buffer);
MPI_Finalize();
// R.parseEvalQ("profiling <- list()");
if (params.world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
// R["simtime"] = sim_end - sim_start;
// R.parseEvalQ("profiling$simtime <- simtime");
// R["simtime_transport"] = cummul_transport;
// R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
// R["simtime_chemistry"] = cummul_chemistry;
// R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
// R["simtime_workers"] = master.getWorkerTime();
// R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
// R["simtime_chemistry_master"] = master.getChemMasterTime();
// R.parseEvalQ(
// "profiling$simtime_chemistry_master <- simtime_chemistry_master");
exit(0);
// R["seq_master"] = cummul_master_seq;
// R.parseEvalQ("profiling$seq_master <- seq_master");
// // R["master_send"] = master_send;
// // R.parseEvalQ("profiling$master_send <- master_send");
// // R["master_recv"] = master_recv;
// // R.parseEvalQ("profiling$master_recv <- master_recv");
// R["idle_master"] = master.getIdleTime();
// R.parseEvalQ("profiling$idle_master <- idle_master");
// R["idle_worker"] = idle_worker;
// R.parseEvalQ("profiling$idle_worker <- idle_worker");
// R["phreeqc_time"] = phreeqc_time;
// R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
// R["phreeqc_count"] = phreeqc_counts;
// R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
// if (params.dht_enabled) {
// R["dht_hits"] = dht_hits;
// R.parseEvalQ("profiling$dht_hits <- dht_hits");
// R["dht_miss"] = dht_miss;
// R.parseEvalQ("profiling$dht_miss <- dht_miss");
// R["dht_collision"] = dht_collision;
// R.parseEvalQ("profiling$dht_collisions <- dht_collision");
// R["dht_get_time"] = dht_get_time;
// R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
// R["dht_fill_time"] = dht_fill_time;
// R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
// }
// free(timings);
// if (params.dht_enabled) free(dht_perfs);
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
cout << "CPP: Done! Results are stored as R objects into <" << params.out_dir
<< "/timings.rds>" << endl;
/*exporting results and profiling data*/
// std::string r_vis_code;
// r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
// R.parseEval(r_vis_code);
}
else { /*This is executed by the workers*/
ChemWorker worker(&params, R, grid, dht_comm);
// worker.prepareSimulation(dht_comm);
worker.loop();
}
cout << "CPP: finished, cleanup of process " << params.world_rank << endl;
// if (params.dht_enabled) {
// if (params.dht_strategy == 0) {
// if (params.world_rank != 0) {
// DHT_free(dht_object, NULL, NULL);
// }
// } else {
// DHT_free(dht_object, NULL, NULL);
// }
// }
// free(mpi_buffer);
MPI_Finalize();
if (params.world_rank == 0) {
cout << "CPP: done, bye!" << endl;
}
exit(0);
}

3
src/model/CMakeLists.txt Normal file
View File

@ -0,0 +1,3 @@
add_library(POET_Model STATIC ChemSim.cpp ChemSim.h ChemMaster.cpp ChemWorker.cpp TransportSim.cpp Grid.cpp)
target_include_directories(POET_Model PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(POET_Model POET_Util DHT_Wrapper)

View File

@ -1,5 +1,4 @@
#include <Rcpp.h>
#include <mpi.h>
#include <iostream>
@ -15,6 +14,7 @@ ChemMaster::ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_)
: ChemSim(params, R_, grid_) {
this->wp_size = params->wp_size;
this->out_dir = params->out_dir;
this->dht_enabled = params->dht_enabled;
workerlist = (worker_struct *)calloc(world_size - 1, sizeof(worker_struct));
send_buffer = (double *)calloc((wp_size * (grid.getCols())) + BUFFER_OFFSET,
@ -39,7 +39,7 @@ ChemMaster::~ChemMaster() {
free(workerlist);
}
void ChemMaster::runPar() {
void ChemMaster::run() {
double chem_a, chem_b;
double seq_a, seq_b, seq_c, seq_d;
double worker_chemistry_a, worker_chemistry_b;
@ -208,6 +208,110 @@ void ChemMaster::printProgressbar(int count_pkgs, int n_wp, int barWidth) {
/* end visual progress */
}
void ChemMaster::end() {
ChemSim::end();
double *timings;
int *dht_perfs;
Rcpp::NumericVector phreeqc_time;
Rcpp::NumericVector dht_get_time;
Rcpp::NumericVector dht_fill_time;
Rcpp::IntegerVector phreeqc_counts;
Rcpp::NumericVector idle_worker;
int phreeqc_tmp;
timings = (double *)calloc(3, sizeof(double));
int dht_hits = 0;
int dht_miss = 0;
int dht_collision = 0;
if (dht_enabled) {
dht_hits = 0;
dht_miss = 0;
dht_collision = 0;
dht_perfs = (int *)calloc(3, sizeof(int));
}
double idle_worker_tmp;
for (int p = 0; p < world_size - 1; p++) {
/* ATTENTION Worker p has rank p+1 */
/* Send termination message to worker */
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
if (dht_enabled) {
dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
MPI_Recv(dht_perfs, 3, MPI_INT, p + 1, TAG_DHT_PERF, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
dht_hits += dht_perfs[0];
dht_miss += dht_perfs[1];
cout << "profiler miss = " << dht_miss << endl;
dht_collision += dht_perfs[2];
}
}
R["simtime_chemistry"] = chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["simtime_workers"] = worker_t;
R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
R["simtime_chemistry_master"] = chem_master;
R.parseEvalQ(
"profiling$simtime_chemistry_master <- simtime_chemistry_master");
R["seq_master"] = seq_t;
R.parseEvalQ("profiling$seq_master <- seq_master");
// R["master_send"] = master_send;
// R.parseEvalQ("profiling$master_send <- master_send");
// R["master_recv"] = master_recv;
// R.parseEvalQ("profiling$master_recv <- master_recv");
R["idle_master"] = master_idle;
R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = idle_worker;
R.parseEvalQ("profiling$idle_worker <- idle_worker");
R["phreeqc_time"] = phreeqc_time;
R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
R["phreeqc_count"] = phreeqc_counts;
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
if (dht_enabled) {
R["dht_hits"] = dht_hits;
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = dht_miss;
R.parseEvalQ("profiling$dht_miss <- dht_miss");
R["dht_collision"] = dht_collision;
R.parseEvalQ("profiling$dht_collisions <- dht_collision");
R["dht_get_time"] = dht_get_time;
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = dht_fill_time;
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
free(timings);
if (dht_enabled) free(dht_perfs);
}
double ChemMaster::getSendTime() { return this->send_t; }
double ChemMaster::getRecvTime() { return this->recv_t; }

View File

@ -1,13 +1,9 @@
#include "ChemSim.h"
#include <Rcpp.h>
#include <mpi.h>
#include <iostream>
#include "../util/RRuntime.h"
#include "Grid.h"
using namespace Rcpp;
using namespace poet;
@ -19,7 +15,7 @@ ChemSim::ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_)
this->out_dir = params->out_dir;
}
void ChemSim::runSeq() {
void ChemSim::run() {
double chem_a, chem_b;
chem_a = MPI_Wtime();
@ -32,4 +28,9 @@ void ChemSim::runSeq() {
chem_t += chem_b - chem_a;
}
void ChemSim::end() {
R["simtime_chemistry"] = chem_t;
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
}
double ChemSim::getChemistryTime() { return this->chem_t; }

View File

@ -1,15 +1,17 @@
#ifndef CHEMSIM_H
#define CHEMSIM_H
#include "../DHT/DHT_Wrapper.h"
#include "../util/RRuntime.h"
#include "../util/SimParams.h"
#include "Grid.h"
#include <DHT_Wrapper.h>
#include <RRuntime.h>
#include <SimParams.h>
#include <mpi.h>
#include <vector>
#include "Grid.h"
#define BUFFER_OFFSET 5
#define TAG_WORK 42
#define TAG_FINISH 43
#define TAG_TIMING 44
@ -20,13 +22,15 @@
namespace poet {
class ChemSim {
public:
public:
ChemSim(t_simparams *params, RRuntime &R_, Grid &grid_);
void runSeq();
virtual void run();
virtual void end();
double getChemistryTime();
protected:
protected:
double current_sim_time = 0;
int iteration = 0;
int dt = 0;
@ -54,12 +58,12 @@ protected:
};
class ChemMaster : public ChemSim {
public:
public:
ChemMaster(t_simparams *params, RRuntime &R_, Grid &grid_);
~ChemMaster();
void runPar();
void profile();
void run() override;
void end() override;
double getSendTime();
double getRecvTime();
@ -68,11 +72,12 @@ public:
double getChemMasterTime();
double getSeqTime();
private:
private:
void printProgressbar(int count_pkgs, int n_wp, int barWidth = 70);
void sendPkgs(int &pkg_to_send, int &count_pkgs, int &free_workers);
void recvPkgs(int &pkg_to_recv, bool to_send, int &free_workers);
bool dht_enabled;
unsigned int wp_size;
double *work_pointer;
@ -85,13 +90,14 @@ private:
};
class ChemWorker : public ChemSim {
public:
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_, MPI_Comm dht_comm);
public:
ChemWorker(t_simparams *params_, RRuntime &R_, Grid &grid_,
MPI_Comm dht_comm);
~ChemWorker();
void loop();
private:
private:
void doWork(MPI_Status &probe_status);
void postIter();
void finishWork();
@ -114,5 +120,5 @@ private:
double idle_t = 0.f;
int phreeqc_count = 0;
};
} // namespace poet
#endif // CHEMSIM_H
} // namespace poet
#endif // CHEMSIM_H

View File

@ -1,5 +1,4 @@
#include "Grid.h"
#include "Rcpp.h"
using namespace poet;
using namespace Rcpp;

View File

@ -1,32 +1,31 @@
#ifndef GRID_H
#define GRID_H
#include "../util/RRuntime.h"
#include <RRuntime.h>
#include <Rcpp.h>
namespace poet {
class Grid {
public:
public:
Grid(RRuntime &R) : R(R){};
void init();
unsigned int getCols();
unsigned int getRows();
void shuffleAndExport(double *buffer);
void importAndUnshuffle(double *buffer);
void importWP(double *buffer, unsigned int p_size);
void exportWP(double *buffer);
private:
private:
RRuntime R;
unsigned int ncol;
unsigned int nrow;
Rcpp::DataFrame getSkeletonDataFrame(unsigned int rows);
};
} // namespace poet
#endif // GRID_H
} // namespace poet
#endif // GRID_H

View File

@ -6,7 +6,7 @@ using namespace poet;
TransportSim::TransportSim(RRuntime &R_) : R(R_) {}
void TransportSim::runIteration() {
void TransportSim::run() {
double sim_a_transport, sim_b_transport;
sim_b_transport = MPI_Wtime();
@ -16,4 +16,9 @@ void TransportSim::runIteration() {
transport_t += sim_a_transport - sim_b_transport;
}
void TransportSim::end() {
R["simtime_transport"] = transport_t;
R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
}
double TransportSim::getTransportTime() { return this->transport_t; }

View File

@ -1,14 +1,15 @@
#ifndef TRANSPORT_SIM_H
#define TRANSPORT_SIM_H
#include "../util/RRuntime.h"
#include <RRuntime.h>
namespace poet {
class TransportSim {
public:
TransportSim(RRuntime &R);
void runIteration();
void run();
void end();
double getTransportTime();

3
src/util/CMakeLists.txt Normal file
View File

@ -0,0 +1,3 @@
add_library(POET_Util STATIC RRuntime.cpp Parser.cpp)
target_include_directories(POET_Util PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${R_INCLUDE_DIRS})
target_link_libraries(POET_Util ${R_LIBRARIES})

View File

@ -1,11 +1,12 @@
#ifndef PARSER_H
#define PARSER_H
#include <RRuntime.h>
#include <string>
#include "../argh.h"
#include "RRuntime.h"
#include "SimParams.h"
#include "argh.h"
#define PARSER_OK 0
#define PARSER_ERROR 1
@ -27,8 +28,8 @@ class Parser {
std::list<std::string> checkOptions(argh::parser cmdl);
std::set<std::string> flaglist{"ignore-result", "dht", "dht-nolog"};
std::set<std::string> paramlist{"work-package-size", "dht-signif",
"dht-strategy", "dht-size",
"dht-snaps", "dht-file"};
"dht-strategy", "dht-size",
"dht-snaps", "dht-file"};
argh::parser cmdl;
t_simparams simparams;

View File

@ -1,126 +0,0 @@
#include "Profiler.h"
#include <Rcpp.h>
#include <mpi.h>
#include <string>
#include <iostream>
using namespace Rcpp;
using namespace std;
int poet::Profiler::startProfiling(t_simparams &params, ChemMaster &chem,
TransportSim &trans, RRuntime &R,
double simtime) {
double *timings;
int *dht_perfs;
Rcpp::NumericVector phreeqc_time;
Rcpp::NumericVector dht_get_time;
Rcpp::NumericVector dht_fill_time;
Rcpp::IntegerVector phreeqc_counts;
Rcpp::NumericVector idle_worker;
int phreeqc_tmp;
timings = (double *)calloc(3, sizeof(double));
int dht_hits = 0;
int dht_miss = 0;
int dht_collision = 0;
if (params.dht_enabled) {
dht_hits = 0;
dht_miss = 0;
dht_collision = 0;
dht_perfs = (int *)calloc(3, sizeof(int));
}
double idle_worker_tmp;
for (int p = 0; p < params.world_size - 1; p++) {
/* ATTENTION Worker p has rank p+1 */
/* Send termination message to worker */
MPI_Send(NULL, 0, MPI_DOUBLE, p + 1, TAG_FINISH, MPI_COMM_WORLD);
MPI_Recv(timings, 3, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_time.push_back(timings[0], "w" + to_string(p + 1));
MPI_Recv(&phreeqc_tmp, 1, MPI_INT, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
phreeqc_counts.push_back(phreeqc_tmp, "w" + to_string(p + 1));
MPI_Recv(&idle_worker_tmp, 1, MPI_DOUBLE, p + 1, TAG_TIMING, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
idle_worker.push_back(idle_worker_tmp, "w" + to_string(p + 1));
if (params.dht_enabled) {
dht_get_time.push_back(timings[1], "w" + to_string(p + 1));
dht_fill_time.push_back(timings[2], "w" + to_string(p + 1));
MPI_Recv(dht_perfs, 3, MPI_INT, p + 1, TAG_DHT_PERF,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
dht_hits += dht_perfs[0];
dht_miss += dht_perfs[1];
cout << "profiler miss = " << dht_miss << endl;
dht_collision += dht_perfs[2];
}
}
R.parseEvalQ("profiling <- list()");
R["simtime"] = simtime;
R.parseEvalQ("profiling$simtime <- simtime");
R["simtime_transport"] = trans.getTransportTime();
R.parseEvalQ("profiling$simtime_transport <- simtime_transport");
R["simtime_chemistry"] = chem.getChemistryTime();
R.parseEvalQ("profiling$simtime_chemistry <- simtime_chemistry");
R["simtime_workers"] = chem.getWorkerTime();
R.parseEvalQ("profiling$simtime_workers <- simtime_workers");
R["simtime_chemistry_master"] = chem.getChemMasterTime();
R.parseEvalQ(
"profiling$simtime_chemistry_master <- simtime_chemistry_master");
R["seq_master"] = chem.getSeqTime();
R.parseEvalQ("profiling$seq_master <- seq_master");
// R["master_send"] = master_send;
// R.parseEvalQ("profiling$master_send <- master_send");
// R["master_recv"] = master_recv;
// R.parseEvalQ("profiling$master_recv <- master_recv");
R["idle_master"] = chem.getIdleTime();
R.parseEvalQ("profiling$idle_master <- idle_master");
R["idle_worker"] = idle_worker;
R.parseEvalQ("profiling$idle_worker <- idle_worker");
R["phreeqc_time"] = phreeqc_time;
R.parseEvalQ("profiling$phreeqc <- phreeqc_time");
R["phreeqc_count"] = phreeqc_counts;
R.parseEvalQ("profiling$phreeqc_count <- phreeqc_count");
if (params.dht_enabled) {
R["dht_hits"] = dht_hits;
R.parseEvalQ("profiling$dht_hits <- dht_hits");
R["dht_miss"] = dht_miss;
R.parseEvalQ("profiling$dht_miss <- dht_miss");
R["dht_collision"] = dht_collision;
R.parseEvalQ("profiling$dht_collisions <- dht_collision");
R["dht_get_time"] = dht_get_time;
R.parseEvalQ("profiling$dht_get_time <- dht_get_time");
R["dht_fill_time"] = dht_fill_time;
R.parseEvalQ("profiling$dht_fill_time <- dht_fill_time");
}
free(timings);
if (params.dht_enabled) free(dht_perfs);
string r_vis_code;
r_vis_code = "saveRDS(profiling, file=paste0(fileout,'/timings.rds'));";
R.parseEval(r_vis_code);
return 0;
}

View File

@ -1,18 +0,0 @@
#ifndef PROFILER_H
#define PROFILER_H
#include "../model/ChemSim.h"
#include "../model/TransportSim.h"
#include "RRuntime.h"
#include "SimParams.h"
namespace poet {
class Profiler {
public:
static int startProfiling(t_simparams &params, ChemMaster &chem,
TransportSim &trans, RRuntime &R, double simtime);
};
} // namespace poet
#endif // PROFILER_H

View File

@ -1,6 +1,8 @@
#include "RRuntime.h"
#include <RInside.h>
#include <Rcpp.h>
#include <string>
using namespace poet;

View File

@ -3,6 +3,7 @@
#include <RInside.h>
#include <Rcpp.h>
#include <string>
namespace poet {
@ -14,8 +15,7 @@ namespace poet {
* If an instance of RRuntime is created a R runtime will also be spawned.
*/
class RRuntime : public RInside {
public:
public:
/**
* Constructor of class RRuntime calling constructor of RInside.
*/
@ -29,8 +29,8 @@ public:
size_t getBufferNCol();
size_t getBufferNRow();
private:
private:
Rcpp::DataFrame dfbuff;
};
} // namespace poet
#endif // RRUNTIME_H
} // namespace poet
#endif // RRUNTIME_H

View File

@ -3,7 +3,6 @@
#include <string>
#include <vector>
#include "RRuntime.h"
typedef struct {
int world_size;
@ -26,8 +25,8 @@ typedef struct {
bool store_result;
void* R;
void* grid;
// void* R;
// void* grid;
} t_simparams;
#endif // SIMPARAMS_H
#endif // SIMPARAMS_H