import keras
from keras.layers import Dense, Dropout, Input, BatchNormalization, LeakyReLU
import tensorflow as tf
import h5py
import numpy as np
import pandas as pd
import time
import sklearn.model_selection as sk
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter
import os
from preprocessing import *
from sklearn import set_config
from importlib import reload

# keep scikit-learn transformer outputs as pandas DataFrames
set_config(transform_output="pandas")


def model_definition(architecture):
    """Definition of the respective AI model.

    Three models are currently being analysed, which are labelled
    'small', 'large' or 'paper'.

    Args:
        architecture (String): Choose between 'small', 'large' or 'paper'.

    Returns:
        keras model: Returns the respective model.
    """
    dtype = "float32"
    if architecture == "small":
        model = keras.Sequential(
            [
                keras.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(units=128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                # Dropout(0.2),
                keras.layers.Dense(units=128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(units=8, dtype=dtype),
            ]
        )
    elif architecture == "large":
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(1024, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(8, dtype=dtype),
            ]
        )
    elif architecture == "paper":
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(256, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(256, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(8, dtype=dtype),
            ]
        )
    else:
        raise Exception(
            "No valid architecture found. "
            + "Choose between 'small', 'large' or 'paper'."
        )
    return model
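
# Illustrative usage (an assumption about how this helper is meant to be called,
# not part of the training pipeline itself): build one of the three architectures
# and inspect its layer stack.
#
#   model = model_definition("small")
#   model.summary()
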
@keras.saving.register_keras_serializable()
def custom_loss(
    preprocess,
    column_dict,
    h1,
    h2,
    h3,
    scaler_type="minmax",
    loss_variant="huber",
    delta=1.0,
):
    """Custom tensorflow loss function to combine Huber loss with mass balance.

    This is inspired by PINNs (Physics Informed Neural Networks), where the loss
    function is a combination of a physics-based loss and a data-driven loss.
    The mass balance is a physics-based loss that ensures the conservation of
    mass in the system.

    A tensorflow loss function accepts only the two arguments y_true and y_pred.
    Therefore, a nested function is used to pass the additional arguments.

    Args:
        preprocess: preprocessing object
        column_dict: dictionary with the column names as keys and the corresponding
            index as values (i.e. {'H': 0, 'O': 1, 'Ba': 2, 'Cl': 3, 'S': 4,
            'Sr': 5, 'Barite': 6, 'Celestite': 7})
        h1: hyperparameter for the importance of the Huber loss
        h2: hyperparameter for the importance of the Barium mass balance term
        h3: hyperparameter for the importance of the Strontium mass balance term
        scaler_type: Normalization approach. Choose between "standard" and "minmax".
            Defaults to "minmax".
        loss_variant: Loss function approach. Choose between "huber" and
            "huber_mass_balance". Defaults to "huber".
        delta: Hyperparameter for the Huber function threshold. Defaults to 1.0.

    Returns:
        loss function
    """
    # as far as I know tensorflow does not directly support the use of scaler objects;
    # therefore, the backtransformation is done manually
    try:
        if scaler_type == "minmax":
            scale_X = tf.convert_to_tensor(
                preprocess.scaler_X.data_range_, dtype=tf.float32
            )
            min_X = tf.convert_to_tensor(
                preprocess.scaler_X.data_min_, dtype=tf.float32
            )
            scale_y = tf.convert_to_tensor(
                preprocess.scaler_y.data_range_, dtype=tf.float32
            )
            min_y = tf.convert_to_tensor(
                preprocess.scaler_y.data_min_, dtype=tf.float32
            )
        elif scaler_type == "standard":
            scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
            mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
            scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
            mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)
        else:
            raise Exception(
                "No valid scaler type found. Choose between 'standard' and 'minmax'."
            )
    except AttributeError:
        raise Exception(
            "Data was normalized with a different scaler than specified for training. "
            "Compare the scaling approach used in preprocessing and training."
        )

    def loss(results, predicted):
        # inverse min/max scaling
        if scaler_type == "minmax":
            predicted_inverse = predicted * scale_y + min_y
            results_inverse = results * scale_X + min_X
        # inverse standard scaling
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X

        # apply expm1 on the columns of predicted_inverse and results_inverse
        # if a log transformation was used
        if preprocess.func_dict_out is not None:
            predicted_inverse = tf.math.expm1(predicted_inverse)
            results_inverse = tf.math.expm1(results_inverse)

        # mass balance:
        # in total no Barium and Strontium should be lost in one simulation step
        dBa = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Ba"]]
                + predicted_inverse[:, column_dict["Barite"]]
            )
            - (
                results_inverse[:, column_dict["Ba"]]
                + results_inverse[:, column_dict["Barite"]]
            )
        )
        dSr = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Sr"]]
                + predicted_inverse[:, column_dict["Celestite"]]
            )
            - (
                results_inverse[:, column_dict["Sr"]]
                + results_inverse[:, column_dict["Celestite"]]
            )
        )

        # huber loss
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)

        # total loss
        if loss_variant == "huber":
            total_loss = huber_loss
        elif loss_variant == "huber_mass_balance":
            total_loss = h1 * huber_loss + h2 * dBa + h3 * dSr
        else:
            raise Exception(
                "No valid loss variant found. "
                "Choose between 'huber' and 'huber_mass_balance'."
            )
        return total_loss

    return loss
def mass_balance_metric(preprocess, column_dict, scaler_type="minmax"):
    """Auxiliary function to calculate the mass balance during training.

    Args:
        preprocess: preprocessing object
        column_dict: dictionary with the column names as keys and the corresponding
            index as values
        scaler_type: Normalization approach. Choose between "standard" and "minmax".
            Defaults to "minmax".

    Returns:
        mean of both mass balance terms
    """
    if scaler_type == "minmax":
        scale_X = tf.convert_to_tensor(
            preprocess.scaler_X.data_range_, dtype=tf.float32
        )
        min_X = tf.convert_to_tensor(preprocess.scaler_X.data_min_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(
            preprocess.scaler_y.data_range_, dtype=tf.float32
        )
        min_y = tf.convert_to_tensor(preprocess.scaler_y.data_min_, dtype=tf.float32)
    elif scaler_type == "standard":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)
    else:
        raise Exception(
            "No valid scaler type found. Choose between 'standard' and 'minmax'."
        )

    def mass_balance(results, predicted):
        # inverse min/max scaling
        if scaler_type == "minmax":
            predicted_inverse = predicted * scale_y + min_y
            results_inverse = results * scale_X + min_X
        # inverse standard scaling
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X

        # undo the log transformation if one was applied during preprocessing
        if preprocess.func_dict_out is not None:
            predicted_inverse = tf.math.expm1(predicted_inverse)
            results_inverse = tf.math.expm1(results_inverse)

        # mass balance
        dBa = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Ba"]]
                + predicted_inverse[:, column_dict["Barite"]]
            )
            - (
                results_inverse[:, column_dict["Ba"]]
                + results_inverse[:, column_dict["Barite"]]
            )
        )
        dSr = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Sr"]]
                + predicted_inverse[:, column_dict["Celestite"]]
            )
            - (
                results_inverse[:, column_dict["Sr"]]
                + results_inverse[:, column_dict["Celestite"]]
            )
        )
        return tf.reduce_mean(dBa + dSr)

    return mass_balance


def huber_metric(delta=1.0):
    """Auxiliary function to calculate the Huber loss during training.

    Args:
        delta (float, optional): Hyperparameter for the Huber function threshold.
            Defaults to 1.0.

    Returns:
        Huber metric function
    """

    def huber(results, predicted):
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)
        return huber_loss

    return huber
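
# Illustrative sketch of how the loss and metric factories above could be wired
# into training. The optimizer, learning rate, hyperparameter values h1/h2/h3 and
# the column order in `column_dict` are assumptions for demonstration only.
#
#   column_dict = {"H": 0, "O": 1, "Ba": 2, "Cl": 3, "S": 4, "Sr": 5,
#                  "Barite": 6, "Celestite": 7}
#   model = model_definition("paper")
#   model.compile(
#       optimizer=keras.optimizers.Adam(learning_rate=1e-4),
#       loss=custom_loss(preprocess, column_dict, h1=1.0, h2=0.1, h3=0.1,
#                        scaler_type="minmax", loss_variant="huber_mass_balance"),
#       metrics=[mass_balance_metric(preprocess, column_dict),
#                huber_metric(delta=1.0)],
#   )
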
def mass_balance_evaluation(model, X, preprocess):
    """Calculates the mass balance difference for each cell.

    Args:
        model: trained model
        X: data where the mass balance should be calculated
        preprocess: preprocessing object

    Returns:
        vector with the mass balance difference for each cell
    """
    # predict the chemistry
    columns = X.iloc[:, X.columns != "Class"].columns
    classes = X["Class"]
    classes.reset_index(drop=True, inplace=True)
    prediction = pd.DataFrame(model.predict(X[columns]), columns=columns)

    # backtransform min/max or standard scaler
    X = pd.DataFrame(
        preprocess.scaler_X.inverse_transform(X.iloc[:, X.columns != "Class"]),
        columns=columns,
    )
    prediction = pd.DataFrame(
        preprocess.scaler_y.inverse_transform(prediction), columns=columns
    )

    # apply backtransformation if log transformation was applied
    if preprocess.func_dict_out is not None:
        X = preprocess.funcInverse(X)[0]
        prediction = preprocess.funcInverse(prediction)[0]

    # calculate mass balance
    dBa = np.abs((prediction["Ba"] + prediction["Barite"]) - (X["Ba"] + X["Barite"]))
    dSr = np.abs(
        (prediction["Sr"] + prediction["Celestite"]) - (X["Sr"] + X["Celestite"])
    )

    mass_balance_result = pd.DataFrame(
        {"dBa": dBa, "dSr": dSr, "mass_balance": dBa + dSr, "Class": classes}
    )
    return mass_balance_result


def mass_balance_ratio(results, threshold=1e-5):
    """Calculates the proportion of cells whose mass balance error stays below a threshold.

    Args:
        results: dataframe returned by mass_balance_evaluation
        threshold (float, optional): maximum tolerated mass balance difference.
            Defaults to 1e-5.

    Returns:
        dictionary with the proportion overall and per class
    """
    proportion = {}
    mass_balance_threshold = results[results["mass_balance"] <= threshold]
    overall = len(mass_balance_threshold)
    class_0_amount = len(mass_balance_threshold[mass_balance_threshold["Class"] == 0])
    class_1_amount = len(mass_balance_threshold[mass_balance_threshold["Class"] == 1])
    proportion["overall"] = overall / len(results)
    proportion["class_0"] = class_0_amount / len(results[results["Class"] == 0])
    proportion["class_1"] = class_1_amount / len(results[results["Class"] == 1])
    return proportion
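
# Illustrative check of the physics constraint after training. The variable names
# `trained_model` and `X_test_scaled` are placeholders assumed for this sketch,
# not objects defined in this module.
#
#   results = mass_balance_evaluation(trained_model, X_test_scaled, preprocess)
#   proportions = mass_balance_ratio(results, threshold=1e-5)
#   print(proportions)  # {'overall': ..., 'class_0': ..., 'class_1': ...}
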
class preprocessing:
    """A class used to preprocess data for model training.

    Attributes:
        random_state: seed for reproducibility
        scaler_X: fitted scaler for the design data
        scaler_y: fitted scaler for the target data
        func_dict_in: function used for the columnwise transformation
        func_dict_out: function used for the columnwise backtransformation
        state: dictionary tracking which preprocessing steps have been applied
    """

    def __init__(self, func_dict_in=None, func_dict_out=None, random_state=42):
        """Initialization of the preprocessing object.

        Args:
            func_dict_in: function for transformation. Defaults to None.
            func_dict_out: function for backtransformation. Defaults to None.
            random_state (int, optional): Seed for reproducibility. Defaults to 42.
        """
        self.random_state = random_state
        self.scaler_X = None
        self.scaler_y = None
        self.func_dict_in = func_dict_in
        self.func_dict_out = func_dict_out
        self.state = {"cluster": False, "log": False, "balance": False, "scale": False}

    def funcTranform(self, *args):
        """Apply the transformation function to the data columnwise.

        Returns:
            pandas data frames: transformed data
        """
        for i in args:
            for key in i.keys():
                if "Class" not in key:
                    i[key] = i[key].apply(self.func_dict_in)
        self.state["log"] = True
        return args

    def funcInverse(self, *args):
        """Apply the backtransformation function to the data columnwise.

        Returns:
            pandas data frames: backtransformed data
        """
        for i in args:
            for key in i.keys():
                if "Class" not in key:
                    i[key] = i[key].apply(self.func_dict_out)
        self.state["log"] = False
        return args

    def cluster(self, X, y, species="Barite", n_clusters=2, x_length=50, y_length=50):
        """Apply k-means clustering to the data to differentiate between reactive
        and non-reactive cells.

        Args:
            X: design data set
            y: target data set
            species (str, optional): Chemical species to which clustering is applied.
                Defaults to "Barite".
            n_clusters (int, optional): Number of clusters. Defaults to 2.
            x_length: x dimension of the grid. Defaults to 50.
            y_length: y dimension of the grid. Defaults to 50.

        Returns:
            X, y dataframes with an additional column "Class" containing the
            cluster labels.
        """
        class_labels = np.array([])
        grid_length = x_length * y_length
        iterations = int(len(X) / grid_length)

        # calculate the cluster for each chemical iteration step
        for i in range(0, iterations):
            field = np.array(
                X[species][(i * grid_length): (i * grid_length + grid_length)]
            ).reshape(x_length, y_length)
            kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state).fit(
                field.reshape(-1, 1)
            )
            class_labels = np.append(class_labels.astype(int), kmeans.labels_)

        if "Class" in X.columns and "Class" in y.columns:
            print("Class column already exists")
        else:
            class_labels_df = pd.DataFrame(class_labels, columns=["Class"])
            X = pd.concat([X, class_labels_df], axis=1)
            y = pd.concat([y, class_labels_df], axis=1)
        self.state["cluster"] = True
        return X, y

    def balancer(self, X, y, strategy, sample_fraction=0.5):
        """Apply sampling strategies to balance the dataset.

        Args:
            X: design dataset (before the simulation)
            y: target dataset (after the simulation)
            strategy: Sampling strategy. Choose between "smote" (Synthetic Minority
                Oversampling Technique), "over" (Oversampling) and "under"
                (Undersampling).
            sample_fraction (float, optional): Define balancer target. Specifies the
                target fraction of the minority class after the balancing step.
                Defaults to 0.5.

        Returns:
            X, y: resampled datasets
        """
        number_features = (X.columns != "Class").sum()
        if "Class" not in X.columns:
            if "Class" in y.columns:
                classes = y["Class"]
            else:
                raise Exception("No class column found")
        else:
            classes = X["Class"]

        counter = classes.value_counts()
        print("Amount class 0 before:", counter[0] / (counter[0] + counter[1]))
        print("Amount class 1 before:", counter[1] / (counter[0] + counter[1]))

        df = pd.concat(
            [
                X.loc[:, X.columns != "Class"],
                y.loc[:, y.columns != "Class"],
                classes,
            ],
            axis=1,
        )

        if strategy == "smote":
            print("Using SMOTE strategy")
            smote = SMOTE(sampling_strategy=sample_fraction)
            df_resampled, classes_resampled = smote.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        elif strategy == "over":
            print("Using Oversampling")
            over = RandomOverSampler()
            df_resampled, classes_resampled = over.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        elif strategy == "under":
            print("Using Undersampling")
            under = RandomUnderSampler()
            df_resampled, classes_resampled = under.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        else:
            print("No sampling selected. Output equals input.")
            return X, y

        counter = classes_resampled["Class"].value_counts()
        print("Amount class 0 after:", counter[0] / (counter[0] + counter[1]))
        print("Amount class 1 after:", counter[1] / (counter[0] + counter[1]))

        design_resampled = pd.concat(
            [df_resampled.iloc[:, 0:number_features], classes_resampled], axis=1
        )
        target_resampled = pd.concat(
            [df_resampled.iloc[:, number_features:], classes_resampled], axis=1
        )
        self.state["balance"] = True
        return design_resampled, target_resampled
""" if type == "minmax": self.scaler_X = MinMaxScaler() self.scaler_y = MinMaxScaler() elif type == "standard": self.scaler_X = StandardScaler() self.scaler_y = StandardScaler() else: raise Exception("No valid scaler type found") if scaling == "individual": self.scaler_X.fit(X.iloc[:, X.columns != "Class"]) self.scaler_y.fit(y.iloc[:, y.columns != "Class"]) elif scaling == "global": self.scaler_X.fit( pd.concat( [X.iloc[:, X.columns != "Class"], y.iloc[:, y.columns != "Class"]], axis=0, ) ) self.scaler_y = self.scaler_X self.state["scale"] = True def scale_transform(self, X_train, X_test, y_train, y_test): """Apply learned scaler on datasets. Args: X_train: design training data X_test: test training data y_train: target training data y_test: test target data Returns: transformed dataframes """ X_train = pd.concat( [ self.scaler_X.transform( X_train.loc[:, X_train.columns != "Class"]), X_train.loc[:, "Class"], ], axis=1, ) X_test = pd.concat( [ self.scaler_X.transform( X_test.loc[:, X_test.columns != "Class"]), X_test.loc[:, "Class"], ], axis=1, ) y_train = pd.concat( [ self.scaler_y.transform( y_train.loc[:, y_train.columns != "Class"]), y_train.loc[:, "Class"], ], axis=1, ) y_test = pd.concat( [ self.scaler_y.transform( y_test.loc[:, y_test.columns != "Class"]), y_test.loc[:, "Class"], ], axis=1, ) return X_train, X_test, y_train, y_test def scale_inverse(self, *args): """Backtransform the dataset Returns: Backtransformed data frames """ result = [] for i in args: if "Class" in i.columns: inversed = pd.DataFrame( self.scaler_X.inverse_transform( i.loc[:, i.columns != "Class"]), columns=i.columns[:-1], ) class_column = i.loc[:, "Class"].reset_index(drop=True) i = pd.concat([inversed, class_column], axis=1) else: i = pd.DataFrame( self.scaler_X.inverse_transform(i), columns=i.columns) result.append(i) return result def split(self, X, y, ratio=0.8): X_train, y_train, X_test, y_test = sk.train_test_split( X, y, test_size=ratio, random_state=self.random_state ) return X_train, y_train, X_test, y_test def class_selection(self, *args, class_label=0): """Select only rows with specific class label Args: Dataframes where rows with specific label should be selected. Defaults to 0. Returns: Elements with selected class label. """ for i in args: i = i[i["Class"] == class_label] return args