### Copyright (C) 2018-2021 Marco De Lucia (GFZ Potsdam) ### ### POET is free software; you can redistribute it and/or modify it under the ### terms of the GNU General Public License as published by the Free Software ### Foundation; either version 2 of the License, or (at your option) any later ### version. ### ### POET is distributed in the hope that it will be useful, but WITHOUT ANY ### WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR ### A PARTICULAR PURPOSE. See the GNU General Public License for more details. ### ### You should have received a copy of the GNU General Public License along with ### this program; if not, write to the Free Software Foundation, Inc., 51 ### Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. ## Simple function to check file extension. It is needed to check if ## the GridFile is SUM (MUFITS format) or rds/RData FileExt <- function (x) { pos <- regexpr("\\.([[:alnum:]]+)$", x) ifelse(pos > -1L, substring(x, pos + 1L), "") } ### This function is only called by the master process. It sets up ### the data structures for the coupled simulations and performs the ### first timestep or "iteration zero", possibly involving the phreeqc ### calculation of the initial state (if it is not already provided as ### matrix) and the first Advection iteration master_init <- function(setup) { msgm("Process with rank 0 reading GRID and MUFITS_sims") ## MDL TODO: actually grid and all the MUFITS snapshots should be ## read and stored only by the master process!! ## Setup the directory where we will store the results verb <- FALSE if (local_rank==0) { verb <- TRUE ## verbosity loading MUFITS results if (!dir.exists(fileout)) { dir.create(fileout) msgm("created directory ", fileout) } else { msgm("dir ", fileout," already exists, I will overwrite!") } if (!exists("store_result")) msgm("store_result doesn't exist!") else msgm("store_result is ", store_result) } else { } ## to enhance flexibility, gridfile can be now given in SUM or, ## already processed, in rds format. We check for extension first. gridext <- FileExt(setup$gridfile) if (gridext=="SUM") { msgm("Generating grid from MUFITS SUM file...") mufits_grid <- ReadGrid(setup$gridfile, verbose=verb) } else { msgm("Reading grid from rds file...") mufits_grid <- readRDS(setup$gridfile) } ## load all the snapshots at once. The setup argument "snapshots" ## now can be either a *directory* where the .SUM files are stored ## or a rds file. if (dir.exists(setup$snapshots)) { msgm(" points to a directory; reading from SUM files in there...") mufits_sims <- LoadMufitsSumRes(dir=setup$snapshots, verbose=verb) } else { msgm(" points to a file. Reading as rds...") mufits_sims <- readRDS(setup$snapshots) } ## Since this function is evaluated by the R process called from ## the C++ program, we need to make available all these variables ## to the R parent frame! assign("mufits_grid", mufits_grid, pos=parent.frame()) assign("mufits_sims", mufits_sims, pos=parent.frame()) ## cat(local_rank, "assignement complete\n") ## we calculate the *coupling* iterations nstep <- length(mufits_sims) msgm("using", nstep,"flow snapshots") ## cat(local_rank, "nstep:", nstep, "\n") ## cat(local_rank, "names:", paste(names(mufits_sims), collate=","), "\n") ## we have these snapshots; we output the results of the coupling ## after each timesteps timesteps <- diff(sapply(mufits_sims, function(x) {return(x$info)})) ## cat(local_rank, "timesteps:", paste0(timesteps, collate=" "), "\n") maxiter <- length(timesteps) ## steady case not treated in this version!!!! steady <- FALSE setup$iter <- 1 setup$maxiter <- maxiter setup$timesteps <- timesteps setup$simulation_time <- 0 setup$steady <- steady if (nrow(setup$bound)==1) { boundmatAct <- t(RedModRphree::pH2Act(setup$bound)) msg("formed correct matrix from setup$bound:") print(boundmatAct) } else { boundmatAct <- RedModRphree::pH2Act(setup$bound) } setup$boundmatAct <- boundmatAct return(setup) } ## This function is called by all processes. ## Since the worker processes need state_C in worker.cpp -> worker_function() ## even the worker need to run this code. ## TODO: worker.cpp -> worker_function() --> state_C is only needed to obtain headers ## and size of dataframe ... if this will be adjusted, this function will be ## unnecessary for workers init_chemistry <- function(setup) { ## setup the chemistry if (!is.matrix(setup$initsim)) { msgm("initial state defined through PHREEQC simulation, assuming homogeneous medium!") tmpfirstrun <- RunPQC_Warnings(setup$initsim, second=TRUE) ## if (local_rank==0){ ## print(tmpfirstrun) ## } remcolnames <- colnames(tmpfirstrun) if (nrow(tmpfirstrun) > 1) { ## msgm("Iter 0 selecting second row") firstrun <- matrix(tmpfirstrun[2,], nrow=1, byrow=TRUE) colnames(firstrun) <- remcolnames } else { firstrun <- tmpfirstrun } state_C <- matrix(rep(firstrun,setup$n), byrow=TRUE, ncol=length(setup$prop)) colnames(state_C) <- colnames(firstrun) ## if (local_rank==0) ## saveRDS(state_C, "initstate.rds") } else { msgm("given initial state") state_C <- setup$initsim } ## save state_T and state_C setup$state_C <- state_C return(setup) } ## relic code, may be useful in future finit <- function(setup) { ## saved <- setup$saved ## state_C <- setup$state_C ## timesteps <- setup$timesteps ## out_save <- setup$out_save ## to_save <- setup$to_save ## if (out_save) { ## msgm("saving ") ## attr(saved,"savedtimesteps") <- timesteps[save] ## attr(saved,"timesteps") <- timesteps ## return(saved) ## } else { ## attr(saved,"timesteps") <- timesteps ## return(state_C) ## } } ## This function, called only by the master, computes the *inner* ## timesteps for transport given the requested simulation timestep and ## the current snapshot (through the Courant Fredrich Levy condition) ## NB: we always iterate: 1)T 2)C master_iteration_setup <- function(setup) { ## in this version, "grid" and "mufits_sims" are variables already ## living in the environment of the R process with local_rank 0 ## if (local_rank == 0) { ## cat("Process ", local_rank, "iteration_start\n") ## } else { ## cat("Process ", local_rank, "SHOULD NOT call iteration_start!\n") ## } ## in C++, "iter" starts from 1 iter <- setup$iter next_dt <- setup$timesteps[iter] ## setting the current snapshot snap <- mufits_sims[[iter]] msgm("Current simulation time:", round(setup$simulation_time),"[s] or ", round(setup$simulation_time/3600/24,2), "[day]") msgm("Time step for current iteration:", round(next_dt),"[s] or ", round(next_dt/3600/24,2), "[day]") setup$simulation_time <- setup$simulation_time+next_dt msgm("Target simulation time:", round(setup$simulation_time),"[s] or ", round((setup$simulation_time)/3600/24,2), "[day]") ## since the phase and density may change in MUFITS ## simulations/snapshots, we MUST tell their name at setup. Here ## we match the right column for both variables nphase <- match(setup$phase, colnames(snap$conn)) ndensity <- match(setup$density, colnames(snap$cell)) ## the "new cfl" flux <- snap$conn[, nphase] cellporvol <- mufits_grid$cell$PORO * mufits_grid$cell$CELLVOL ## in MUFITS, positive flux means from CELLID1 to CELLID2 pos_flux <- ifelse(flux>0, snap$conn$CONNID1, snap$conn$CONNID2) poro <- mufits_grid$cell$PORO[pos_flux] vol <- mufits_grid$cell$CELLVOL[snap$conn$CONNID1] ## extract the right density for the right fluxes density <- snap$cell[pos_flux, ndensity] ## transform flux from Mufits ton/day to m3/s. This is a VOLUMETRIC FLUX fluxv <- flux/3600/24*1000/density ## now we want the darcy velocity excl0 <- which(abs(fluxv)<.Machine$double.eps) ## msgm("excl0"); print(excl0) ## msgm("length(excl0)"); print(length(excl0)) ## The CFL condition is expressed in terms of total flux and total ## pore volume cfl <- as.numeric(min(abs(vol*poro/fluxv)[-excl0])) if (!is.finite(cfl)) stop(msgm("CFL is ", cfl,"; quitting")) allowed_dt <- setup$Cf*cfl requested_dt <- next_dt ## target_time - setup$simulation_time msgm("CFL allows dt of <", round(cfl, 2)," [s]; multiplicator is ", setup$Cf, "; requested_dt is: ", round(requested_dt, 2)) if (requested_dt > allowed_dt) { inniter <- floor(requested_dt/allowed_dt) inner_timesteps <- c(rep(allowed_dt, inniter), requested_dt - allowed_dt * inniter) msgm("Performing ", inniter, " inner iterations") } else { inner_timesteps <- requested_dt inniter <- 1 } setup$inner_timesteps <- inner_timesteps setup$requested_dt <- requested_dt setup$allowed_dt <- allowed_dt setup$inniter <- inniter setup$iter <- iter setup$fluxv <- fluxv setup$no_transport_cells <- excl0 setup$cellporvol <- cellporvol return(setup) } ## This function, called only by master, stores on disk the last ## calculated time step if store_result is TRUE and increments the ## iteration counter master_iteration_end <- function(setup) { iter <- setup$iter ## MDL Write on disk state_T and state_C after every iteration if (store_result) { nameout <- paste0(fileout, '/iter_', sprintf('%03d', iter), '.rds') saveRDS(list(T=setup$state_T, C=setup$state_C), file=nameout) msgm("results stored in <", nameout, ">") } msgm("done iteration", iter, "/", setup$maxiter) setup$iter <- setup$iter + 1 return(setup) } ## master: compute advection master_advection <- function(setup) { msgm("requested dt=", setup$requested_dt) concmat <- RedModRphree::pH2Act(setup$state_C) inner_timesteps <- setup$inner_timesteps Cf <- setup$Cf iter <- setup$iter excl <- setup$no_transport_cells ## msgm("excl"); print(excl) immobile <- setup$immobile boundmat <- setup$boundmatAct snap <- mufits_sims[[setup$iter]] ## conc is a matrix with colnames if (is.matrix(concmat)) { if (length(immobile)>0) val <- concmat[,-immobile] else val <- concmat ## sort the columns of the matrix matching the names of boundary sptr <- colnames(val) inflow <- boundmat[, colnames(boundmat)!="cbound", drop=FALSE] spin <- colnames(inflow) ind <- match(spin,sptr) if (TRUE %in% is.na(ind)) { msgm("Missing boundary conditions for some species") val <- cbind(val,matrix(0,ncol=sum(is.na(ind)),nrow=nrow(val) )) colnames(val) <- c(sptr,spin[is.na(ind)]) } sptr <- colnames(val) ind <- match(sptr,spin) ## msgm("ind"); print(ind) cnew <- val msgm("Computing transport of ", ncol(val), " species") ## if (local_rank==0) { ## saveRDS(list(conc=newconc, times=times, activeCells=grid$cell$CELLID[-exclude_cell], fluxv=fluxv), ## file=paste0("TranspUpwind_", local_rank, ".RData")) ## } ## cnew[ boundmat[, 1] ] <- boundmat[, 2] ## cells where we won't update the concentrations: union of ## boundary and no-flow cells if (length(excl) == 0) exclude_cell <- sort(c(boundmat[, 1])) else exclude_cell <- sort(c(boundmat[, 1], excl)) ## msgm("mufits_grid$cell$CELLID[-excludecell]:") ## print(mufits_grid$cell$CELLID[-exclude_cell]) for (i in seq(1, ncol(val))) { ## form a 2-column matrix with cell id and boundary ## concentration for those elements newboundmat <- boundmat[,c(1, ind[i]+1)] ## vector with the old concentrations concv <- val[,i] ## apply boundary conditions to the concentration vector ## (they should stay constant but better safe than sorry) concv[newboundmat[, 1]] <- newboundmat[, 2] ## call the function cnew[,i] <- CppTransportUpwindIrr(concv = concv, times = inner_timesteps, activeCells = mufits_grid$cell$CELLID[-exclude_cell], fluxv = setup$fluxv, listconn = mufits_grid$listconn, porVol = setup$cellporvol) } colnames(cnew) <- colnames(val) if ( length(immobile) > 0) { res <- cbind(cnew, concmat[,immobile]) } else { res <- cnew } ## check for negative values. This SHOULD NOT OCCUR and may be ## the result of numerical dispersion (or bug in my transport ## code!) if (any(res <0 )) { a <- length(res[res < 0 ]) res[res < 0 ] <- 0 msgm("-> ", a, "concentrations were negative") } ## msgm("state_T after iteration", setup$iter, ":") ## print(head(res)) } else { msgm("state_C at iteration", setup$iter, " is not a Matrix, doing nothing!") } setup$state_T <- res return(setup) } ## function for the workers to compute chemistry through PHREEQC slave_chemistry <- function(setup, data) { base <- setup$base first <- setup$first prop <- setup$prop immobile <- setup$immobile kin <- setup$kin ann <- setup$ann if (local_rank == 0) { iter <- setup$iter timesteps <- setup$timesteps dt <- timesteps[iter] } state_T <- data ## not the global field, but the work-package ## treat special H+/pH, e-/pe cases state_T <- RedModRphree::Act2pH(state_T) ## reduction of the problem if(setup$reduce) reduced <- ReduceStateOmit(state_T, omit=setup$ann) else reduced <- state_T ## if (local_rank==1) { ## msg("worker", local_rank,"; iter=", iter, "dt=", dt) ## msg("reduce is", setup$reduce) ## msg("data:") ## print(reduced) ## msg("base:") ## print(base) ## msg("first:") ## print(first) ## msg("ann:") ## print(ann) ## msg("prop:") ## print(prop) ## msg("immobile:") ## print(immobile) ## msg("kin:") ## print(kin) ## } ## form the PHREEQC input script for the current work package inplist <- splitMultiKin(data=reduced, procs=1, base=base, first=first, ann=ann, prop=prop, minerals=immobile, kin=kin, dt=dt) ## if (local_rank==1 & iter==1) ## RPhreeWriteInp("FirstInp", inplist) tmpC <- RunPQC_Warnings(inplist, procs=1, second=TRUE) ## recompose after the reduction if (setup$reduce) state_C <- RecomposeState(tmpC, reduced) else { state_C <- tmpC } ## the next line is needed since we don't need all columns of ## PHREEQC output return(state_C[, prop]) } ## This function, called by master master_chemistry <- function(setup, data) { state_T <- setup$state_T msgm(" chemistry iteration", setup$iter) ## treat special H+/pH, e-/pe cases state_T <- RedModRphree::Act2pH(state_T) ## reduction of the problem if(setup$reduce) reduced <- ReduceStateOmit(state_T, omit=setup$ann) else reduced <- state_T ### inject data from workers res_C <- data rownames(res_C) <- NULL ## print(res_C) if (nrow(res_C) > nrow(reduced)) { res_C <- res_C[seq(2,nrow(res_C), by=2),] } ## recompose after the reduction if (setup$reduce) state_C <- RecomposeState(res_C, reduced) else { state_C <- res_C } setup$state_C <- state_C setup$reduced <- reduced return(setup) } ## Adapted version for "reduction" ReduceStateOmit <- function (data, omit=NULL, sign=6) { require(mgcv) rem <- colnames(data) if (is.list(omit)) { indomi <- match(names(omit), colnames(data)) datao <- data[, -indomi] } else datao <- data datao <- signif(datao, sign) red <- mgcv::uniquecombs(datao) inds <- attr(red, "index") now <- ncol(red) ## reattach the omitted column(s) ## FIXME: control if more than one ann is present if (is.list(omit)) { red <- cbind(red, rep( data[ 1, indomi], nrow(red))) colnames(red)[now+1] <- names(omit) ret <- red[, colnames(data)] } else { ret <- red } rownames(ret) <- NULL attr(ret, "index") <- inds return(ret) } ## Attach the name of the calling function to the message displayed on ## R's stdout msgm <- function (...) { if (local_rank==0) { fname <- as.list(sys.call(-1))[[1]] prefix <- paste0("R: ", fname, " ::") cat(paste(prefix, ..., "\n")) } invisible() } RunPQC_Warnings <- function (input, procs = 1, second = TRUE) { .runPQC <- function(input, onlysecond) { require(phreeqc) phreeqc::phrSetErrorStringsOn(TRUE) error_count <- phreeqc::phrRunString(input) warnings <- phreeqc::phrGetWarningStrings() checked_warn <- CheckWarnings(warnings) if (is.null(error_count)) { out <- phreeqc::phrGetSelectedOutput()[[1]] nso <- colnames(out) nso <- sub("..mol.kgw.", "", nso, fixed = TRUE) nso <- sub(".mol.kgw.", "", nso, fixed = TRUE) nso <- sub("^k_", "", nso) nso <- sub(".g.", "(g)", nso, fixed = TRUE) if (second) out <- out[seq(2, nrow(out), by = 2), ] colnames(out) <- nso return(data.matrix(out)) } else { msgm("Error in simmulation!") cat(phrGetErrorStrings(), sep="\n") } } if (!is.list(input)) { if (is.character(input)) { res <- .runPQC(input, onlysecond = second) } else { stopmsg("something wrong with the input, dying!") } } else { res <- foreach(i = seq_along(input), .combine = rbind) %dopar% .runPQC(input[[i]], onlysecond = second) } return(res) } CheckWarnings <- function(warnings) { ## get rid of "Negative moles in solution" warnings recover <- grep("Negative moles", warnings, fixed=TRUE) warnings <- warnings[-recover] lw <- length(warnings) if (lw>0) { ## nfile <- paste0("warnings_", local_rank, ".txt") nfile <- "Warnings.txt" cat(warnings, file=nfile, sep="\n", append=TRUE) } return(lw) }