Bug fixes - Functional State

This commit is contained in:
nsauerbrei 2023-06-21 23:36:00 +02:00
parent 9014948805
commit d6b09a12d7
3 changed files with 165 additions and 17 deletions

View File

@ -22,6 +22,7 @@
#define DAOS_ERROR -1
#define DAOS_MPI_ERROR -2
#define DAOS_READ_MISS -3
#define DHT_STATISTICS 1
/**
* Internal struct to store statistics about read and write accesses and also
@ -39,6 +40,8 @@ typedef struct
int old_writes;
/** How many read misses occur? */
int read_misses;
/** How many read hits occur? */
int read_hits;
/** How many buckets where evicted? */
int evictions;
/** How many calls of DHT_write() did this process? */
@ -78,7 +81,7 @@ typedef struct
#ifdef DHT_STATISTICS
/** Detailed statistics of the usage of the DHT. */
DAOS_stats *stats;
DAOSKV_stats *stats;
#endif
} DAOSKV;
#ifdef __cplusplus
@ -91,6 +94,7 @@ extern "C"
extern int DAOSKV_write(DAOSKV *object, void *key, int key_size, void *send_data, int send_size);
extern int DAOSKV_read(DAOSKV *object, void *key, int key_size, void *recv_data, int recv_size);
extern int DAOSKV_remove(DAOSKV *object, void *key, int key_size);
extern int DAOSKV_print_statistics(DAOSKV *object);
#ifdef __cplusplus
}
#endif

View File

@ -102,19 +102,23 @@ auto DHT_Wrapper::checkDHT(int length, double dt,
key_vector = fuzzForDHT(this->key_count, key, dt);
int res = DAOSKV_read(this->daosKV_object,
key_vector.data(),key_vector.size(),
data.data(), data.size());
key_vector.data(),key_size,
data.data(), data_size);
switch (res) {
case DAOS_SUCCESS:
dht_results.needPhreeqc[i] = false;
this->dht_hits++;
break;
case DAOS_READ_MISS:
dht_results.needPhreeqc[i] = true;
new_mapping.push_back(curr_mapping[i]);
this->dht_miss++;
break;
default:
printf("Error");
}
}
@ -124,16 +128,19 @@ auto DHT_Wrapper::checkDHT(int length, double dt,
}
void DHT_Wrapper::fillDHT(int length, const std::vector<double> &work_package) {
// loop over every grid cell contained in work package
for (int i = 0; i < length; i++) {
// If true grid cell was simulated, needs to be inserted into dht
if (dht_results.needPhreeqc[i]) {
auto &key = dht_results.keys[i];
void *data = (void *)&(work_package[i * this->data_count]);
// fuzz data (round, logarithm etc.)
int res = DAOSKV_read(this->daosKV_object,
(void *) key.data(), key.size(),
int res = DAOSKV_write(this->daosKV_object,
(void *) key.data(), key_size,
data, data_size);
}
@ -169,9 +176,8 @@ int DHT_Wrapper::fileToTable(const char *filename) {
}
void DHT_Wrapper::printStatistics() {
int res = 1;
//res = DHT_print_statistics(daosKV_object);
int res = DAOSKV_print_statistics(daosKV_object);
if (res != DAOS_SUCCESS) {
// MPI ERROR ... WHAT TO DO NOW?

View File

@ -108,25 +108,28 @@ DAOSKV *DAOSKV_create(MPI_Comm comm)
object->read_misses = 0;
object->evictions = 0;
MPI_Comm_rank(comm, &object->rank);
MPI_Comm_size(comm, &object->comm_size);
// if set, initialize dht_stats
#ifdef DHT_STATISTICS
DHT_stats *stats;
DAOSKV_stats *stats;
stats = (DHT_stats *)malloc(sizeof(DHT_stats));
stats = (DAOSKV_stats *)malloc(sizeof(DAOSKV_stats));
if (stats == NULL)
return NULL;
object->stats = stats;
object->stats->writes_local = (int *)calloc(comm_size, sizeof(int));
object->stats->writes_local = (int *)calloc(object->comm_size, sizeof(int));
object->stats->old_writes = 0;
object->stats->read_misses = 0;
object->stats->read_hits = 0;
object->stats->evictions = 0;
object->stats->w_access = 0;
object->stats->r_access = 0;
#endif
MPI_Comm_rank(comm, &object->rank);
MPI_Comm_size(comm, &object->comm_size);
/** initialize the local DAOS stack */
if (daos_init() != 0)
@ -219,6 +222,11 @@ int DAOSKV_write(DAOSKV *object, void *key, int key_size, void *send_data, int s
{
int rc;
#ifdef DHT_STATISTICS
object->stats->w_access++;
#endif
d_iov_t dkey;
d_sg_list_t sgl;
d_iov_t sg_iov;
@ -249,6 +257,10 @@ int DAOSKV_write(DAOSKV *object, void *key, int key_size, void *send_data, int s
if (rc != 0)
return DAOS_ERROR;
#ifdef DHT_STATISTICS
object->stats->writes_local[object->rank]++;
#endif
return DAOS_SUCCESS;
}
@ -256,6 +268,10 @@ int DAOSKV_read(DAOSKV *object, void *key, int key_size, void *recv_data, int re
{
int rc;
#ifdef DHT_STATISTICS
object->stats->r_access++;
#endif
d_iov_t dkey;
d_sg_list_t sgl;
d_iov_t sg_iov;
@ -283,12 +299,19 @@ int DAOSKV_read(DAOSKV *object, void *key, int key_size, void *recv_data, int re
/** fetch a dkey */
rc = daos_obj_fetch(object->oh, DAOS_TX_NONE, 0, &dkey, 1, &iod, &sgl,
NULL, NULL);
if (rc != 0)
if (rc != 0){
return DAOS_ERROR;
}
if (iod.iod_size == 0)
if (iod.iod_size == 0){
#ifdef DHT_STATISTICS
object->stats->read_misses += 1;
#endif
return DAOS_READ_MISS;
}
#ifdef DHT_STATISTICS
object->stats->read_hits += 1;
#endif
return DAOS_SUCCESS;
}
@ -303,3 +326,118 @@ int DAOSKV_remove(DAOSKV *object, void *key, int key_size)
return DAOS_SUCCESS;
}
int DAOSKV_print_statistics(DAOSKV *object){
#ifdef DHT_STATISTICS
int *written_buckets;
int *read_misses, sum_read_misses;
int *read_hits, sum_read_hits;
int *evictions, sum_evictions;
int sum_w_access, sum_r_access, *w_access, *r_access;
int rank;
MPI_Comm_rank(object->communicator, &rank);
// disable possible warning of unitialized variable, which is not the case
// since we're using MPI_Gather to obtain all values only on rank 0
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// obtaining all values from all processes in the communicator
if (rank == 0) read_misses = (int *)malloc(object->comm_size * sizeof(int));
if (MPI_Gather(&object->stats->read_misses, 1, MPI_INT, read_misses, 1,
MPI_INT, 0, object->communicator) != 0)
return DAOS_MPI_ERROR;
if (MPI_Reduce(&object->stats->read_misses, &sum_read_misses, 1, MPI_INT,
MPI_SUM, 0, object->communicator) != 0)
return DAOS_MPI_ERROR;
object->stats->read_misses = 0;
if (rank == 0) read_hits = (int *)malloc(object->comm_size * sizeof(int));
if (MPI_Gather(&object->stats->read_hits, 1, MPI_INT, read_hits, 1,
MPI_INT, 0, object->communicator) != 0)
return DAOS_MPI_ERROR;
if (MPI_Reduce(&object->stats->read_hits, &sum_read_hits, 1, MPI_INT,
MPI_SUM, 0, object->communicator) != 0)
return DAOS_MPI_ERROR;
object->stats->read_hits = 0;
if (rank == 0) evictions = (int *)malloc(object->comm_size * sizeof(int));
if (MPI_Gather(&object->stats->evictions, 1, MPI_INT, evictions, 1, MPI_INT, 0,
object->communicator) != 0)
return DAOS_MPI_ERROR;
if (MPI_Reduce(&object->stats->evictions, &sum_evictions, 1, MPI_INT, MPI_SUM,
0, object->communicator) != 0)
return DAOS_MPI_ERROR;
object->stats->evictions = 0;
if (rank == 0) w_access = (int *)malloc(object->comm_size * sizeof(int));
if (MPI_Gather(&object->stats->w_access, 1, MPI_INT, w_access, 1, MPI_INT, 0,
object->communicator) != 0)
return DAOS_MPI_ERROR;
if (MPI_Reduce(&object->stats->w_access, &sum_w_access, 1, MPI_INT, MPI_SUM, 0,
object->communicator) != 0)
return DAOS_MPI_ERROR;
object->stats->w_access = 0;
if (rank == 0) r_access = (int *)malloc(object->comm_size * sizeof(int));
if (MPI_Gather(&object->stats->r_access, 1, MPI_INT, r_access, 1, MPI_INT, 0,
object->communicator) != 0)
return DAOS_MPI_ERROR;
if (MPI_Reduce(&object->stats->r_access, &sum_r_access, 1, MPI_INT, MPI_SUM, 0,
object->communicator) != 0)
return DAOS_MPI_ERROR;
object->stats->r_access = 0;
if (rank == 0) written_buckets = (int *)calloc(object->comm_size, sizeof(int));
if (MPI_Reduce(object->stats->writes_local, written_buckets, object->comm_size,
MPI_INT, MPI_SUM, 0, object->communicator) != 0)
return DAOS_MPI_ERROR;
if (rank == 0) { // only process with rank 0 will print out results as a
// object
int sum_written_buckets = 0;
for (int i = 0; i < object->comm_size; i++) {
sum_written_buckets += written_buckets[i];
}
int members = 7;
int padsize = (members * 12) + 1;
char pad[padsize + 1];
memset(pad, '-', padsize * sizeof(char));
pad[padsize] = '\0';
printf("\n");
printf("%-35s||resets with every call of this function\n", " ");
printf("%-11s|%-11s|%-11s||%-11s|%-11s|%-11s|%-11s|%-11s\n", "rank", "occupied",
"free", "w_access", "r_access", "read misses","read hits", "evictions");
printf("%s\n", pad);
for (int i = 0; i < object->comm_size; i++) {
printf("%-11d|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d|%-11d\n", i,
written_buckets[i], 0,
w_access[i], r_access[i], read_misses[i],read_hits[i], evictions[i]);
}
printf("%s\n", pad);
printf("%-11s|%-11d|%-11d||%-11d|%-11d|%-11d|%-11d|%-11d\n", "sum",
sum_written_buckets,
0,
sum_w_access, sum_r_access, sum_read_misses,sum_read_hits, sum_evictions);
printf("%s\n", pad);
printf("%s %d\n",
"new entries:", sum_written_buckets - object->stats->old_writes);
printf("\n");
fflush(stdout);
object->stats->old_writes = sum_written_buckets;
}
// enable warning again
#pragma GCC diagnostic pop
MPI_Barrier(object->communicator);
return DAOS_SUCCESS;
#endif
}