diff --git a/src/preprocessing.py b/src/preprocessing.py index b0b7ef5..61ba0bc 100644 --- a/src/preprocessing.py +++ b/src/preprocessing.py @@ -1,5 +1,5 @@ import keras -from keras.layers import Dense, Dropout, Input,BatchNormalization, LeakyReLU +from keras.layers import Dense, Dropout, Input, BatchNormalization, LeakyReLU import tensorflow as tf import h5py import numpy as np @@ -18,46 +18,33 @@ import os from preprocessing import * from sklearn import set_config from importlib import reload -set_config(transform_output = "pandas") -# preprocessing pipeline -# +set_config(transform_output="pandas") -def Safelog(val): - # get range of vector - if val > 0: - return np.log10(val) - elif val < 0: - return -np.log10(-val) - else: - return 0 -def Safeexp(val): - if val > 0: - return -10 ** -val - elif val < 0: - return 10 ** val - else: - return 0 - - def model_definition(architecture): + """Definition of the respective AI model. Three models are currently being analysed, which are labelled ‘small’, ‘large’ or ‘paper’. + + Args: + architecture (String): Choose between 'small', 'large' or 'paper'. + Returns: + keras model: Returns the respective model. + """ dtype = "float32" if architecture == "small": model = keras.Sequential( [ - keras.Input(shape=(8,), dtype="float32"), - keras.layers.Dense(units=128, dtype="float32"), + keras.Input(shape=(8,), dtype=dtype), + keras.layers.Dense(units=128, dtype=dtype), LeakyReLU(negative_slope=0.01), # Dropout(0.2), - keras.layers.Dense(units=128, dtype="float32"), + keras.layers.Dense(units=128, dtype=dtype), LeakyReLU(negative_slope=0.01), - keras.layers.Dense(units=8, dtype="float32") + keras.layers.Dense(units=8, dtype=dtype), ] ) - - + elif architecture == "large": model = keras.Sequential( [ @@ -68,312 +55,569 @@ def model_definition(architecture): LeakyReLU(negative_slope=0.01), keras.layers.Dense(512, dtype=dtype), LeakyReLU(negative_slope=0.01), - keras.layers.Dense(8, dtype=dtype) + keras.layers.Dense(8, dtype=dtype), ] ) - + elif architecture == "paper": model = keras.Sequential( - [keras.layers.Input(shape=(8,), dtype=dtype), - keras.layers.Dense(128, dtype=dtype), - LeakyReLU(negative_slope=0.01), - keras.layers.Dense(256, dtype=dtype), - LeakyReLU(negative_slope=0.01), - keras.layers.Dense(512, dtype=dtype), - LeakyReLU(negative_slope=0.01), - keras.layers.Dense(256, dtype=dtype), - LeakyReLU(negative_slope=0.01), - keras.layers.Dense(8, dtype=dtype) - ]) - + [ + keras.layers.Input(shape=(8,), dtype=dtype), + keras.layers.Dense(128, dtype=dtype), + LeakyReLU(negative_slope=0.01), + keras.layers.Dense(256, dtype=dtype), + LeakyReLU(negative_slope=0.01), + keras.layers.Dense(512, dtype=dtype), + LeakyReLU(negative_slope=0.01), + keras.layers.Dense(256, dtype=dtype), + LeakyReLU(negative_slope=0.01), + keras.layers.Dense(8, dtype=dtype), + ] + ) + + else: + raise Exception( + "No valid architecture found." + + "Choose between 'small', 'large' or 'paper'." + ) + return model - + + @keras.saving.register_keras_serializable() -def custom_loss(preprocess, column_dict, h1, h2, h3, scaler_type="minmax", loss_variant="huber", delta=1.0): - # extract the scaling parameters - +def custom_loss( + preprocess, + column_dict, + h1, + h2, + h3, + scaler_type="minmax", + loss_variant="huber", + delta=1.0, +): + """Custom tensorflow loss function to combine Huber Loss with mass balance. + This is inspired by PINN (Physics Informed Neural Networks) where the loss function is a combination of the physics-based loss and the data-driven loss. 
+ The mass balance is a physics-based loss that ensures the conservation of mass in the system. + A tensorflow loss function accepts only the two arguments y_true and y_pred. Therefore, a nested function is used to pass the additional arguments. + + Args: + preprocess: preprocessing object + column_dict: dictionary with the column names as keys and the corresponding index as values. (i.e {'H': 0, 'O': 1, 'Ba': 2, 'Cl': 3, 'S': 4, 'Sr': 5, 'Barite': 6, 'Celestite': 7}) + h1: hyperparameter for the importance of the huber loss + h2: hyperparameter for the importance of the Barium mass balance term + h3: hyperparameter for the importance of the Strontium mass balance term + scaler_type: Normalization approach. Choose between "standard" and "minmax". Defaults to "minmax". + loss_variant: Loss function approach. Choose between "huber and "huber_mass_balance". Defaults to "huber". + delta: Hyperparameter for the Huber function threshold. Defaults to 1.0. + + Returns: + loss function + """ + + # as far as I know tensorflow does not directly support the use of scaler objects + # therefore, the backtransformation is done manually if scaler_type == "minmax": - scale_X = tf.convert_to_tensor(preprocess.scaler_X.data_range_, dtype=tf.float32) - min_X = tf.convert_to_tensor(preprocess.scaler_X.data_min_, dtype=tf.float32) - scale_y = tf.convert_to_tensor(preprocess.scaler_y.data_range_, dtype=tf.float32) - min_y = tf.convert_to_tensor(preprocess.scaler_y.data_min_, dtype=tf.float32) - + scale_X = tf.convert_to_tensor( + preprocess.scaler_X.data_range_, dtype=tf.float32 + ) + min_X = tf.convert_to_tensor( + preprocess.scaler_X.data_min_, dtype=tf.float32) + scale_y = tf.convert_to_tensor( + preprocess.scaler_y.data_range_, dtype=tf.float32 + ) + min_y = tf.convert_to_tensor( + preprocess.scaler_y.data_min_, dtype=tf.float32) + elif scaler_type == "standard": - scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32) - mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32) - scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32) - mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32) + scale_X = tf.convert_to_tensor( + preprocess.scaler_X.scale_, dtype=tf.float32) + mean_X = tf.convert_to_tensor( + preprocess.scaler_X.mean_, dtype=tf.float32) + scale_y = tf.convert_to_tensor( + preprocess.scaler_y.scale_, dtype=tf.float32) + mean_y = tf.convert_to_tensor( + preprocess.scaler_y.mean_, dtype=tf.float32) + + else: + raise Exception("No valid scaler type found. 
Choose between 'standard' and 'minmax'.") def loss(results, predicted): - # inverse min/max scaling if scaler_type == "minmax": predicted_inverse = predicted * scale_y + min_y results_inverse = results * scale_X + min_X - + # inverse standard scaling elif scaler_type == "standard": predicted_inverse = predicted * scale_y + mean_y results_inverse = results * scale_X + mean_X - - - # apply exp1m on the columns of predicted_inverse and results_inverse if log transformation was applied + + # apply exp1m on the columns of predicted_inverse and results_inverse if log transformation was used if preprocess.func_dict_out is not None: predicted_inverse = tf.math.expm1(predicted_inverse) results_inverse = tf.math.expm1(results_inverse) - + # mass balance + # in total no Barium and Strontium should be lost in one simulation step dBa = tf.keras.backend.abs( - (predicted_inverse[:, column_dict["Ba"]] + predicted_inverse[:, column_dict["Barite"]]) - - (results_inverse[:, column_dict["Ba"]] + results_inverse[:, column_dict["Barite"]]) + ( + predicted_inverse[:, column_dict["Ba"]] + + predicted_inverse[:, column_dict["Barite"]] + ) + - ( + results_inverse[:, column_dict["Ba"]] + + results_inverse[:, column_dict["Barite"]] + ) ) dSr = tf.keras.backend.abs( - (predicted_inverse[:, column_dict["Sr"]] + predicted_inverse[:, column_dict["Celestite"]]) - - (results_inverse[:, column_dict["Sr"]] + results_inverse[:, column_dict["Celestite"]]) + ( + predicted_inverse[:, column_dict["Sr"]] + + predicted_inverse[:, column_dict["Celestite"]] + ) + - ( + results_inverse[:, column_dict["Sr"]] + + results_inverse[:, column_dict["Celestite"]] + ) ) # huber loss huber_loss = tf.keras.losses.Huber(delta)(results, predicted) - + # total loss if loss_variant == "huber": total_loss = huber_loss elif loss_variant == "huber_mass_balance": total_loss = h1 * huber_loss + h2 * dBa + h3 * dSr + else: + raise Exception( + "No valid loss variant found. Choose between 'huber' and 'huber_mass_balance'.") return total_loss return loss + def mass_balance_metric(preprocess, column_dict, scaler_type="minmax"): + """Auxilary function to calculate the mass balance during training. + + Args: + preprocess: preprocessing object + column_dict: dictionary with the column names as keys and the corresponding index as values + scaler_type: Normalization approach. Choose between "standard" and "minmax". Defaults to "minmax". 
+ + Returns: + mean of both mass balance terms + """ + if scaler_type == "minmax": - scale_X = tf.convert_to_tensor(preprocess.scaler_X.data_range_, dtype=tf.float32) - min_X = tf.convert_to_tensor(preprocess.scaler_X.data_min_, dtype=tf.float32) - scale_y = tf.convert_to_tensor(preprocess.scaler_y.data_range_, dtype=tf.float32) - min_y = tf.convert_to_tensor(preprocess.scaler_y.data_min_, dtype=tf.float32) + scale_X = tf.convert_to_tensor( + preprocess.scaler_X.data_range_, dtype=tf.float32 + ) + min_X = tf.convert_to_tensor( + preprocess.scaler_X.data_min_, dtype=tf.float32) + scale_y = tf.convert_to_tensor( + preprocess.scaler_y.data_range_, dtype=tf.float32 + ) + min_y = tf.convert_to_tensor( + preprocess.scaler_y.data_min_, dtype=tf.float32) elif scaler_type == "standard": - scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32) - mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32) - scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32) - mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32) - - + scale_X = tf.convert_to_tensor( + preprocess.scaler_X.scale_, dtype=tf.float32) + mean_X = tf.convert_to_tensor( + preprocess.scaler_X.mean_, dtype=tf.float32) + scale_y = tf.convert_to_tensor( + preprocess.scaler_y.scale_, dtype=tf.float32) + mean_y = tf.convert_to_tensor( + preprocess.scaler_y.mean_, dtype=tf.float32) + def mass_balance(results, predicted): # inverse min/max scaling if scaler_type == "minmax": predicted_inverse = predicted * scale_y + min_y results_inverse = results * scale_X + min_X - + elif scaler_type == "standard": predicted_inverse = predicted * scale_y + mean_y results_inverse = results * scale_X + mean_X - + if preprocess.func_dict_out is not None: predicted_inverse = tf.math.expm1(predicted_inverse) results_inverse = tf.math.expm1(results_inverse) # mass balance dBa = tf.keras.backend.abs( - (predicted_inverse[:, column_dict["Ba"]] + predicted_inverse[:, column_dict["Barite"]]) - - (results_inverse[:, column_dict["Ba"]] + results_inverse[:, column_dict["Barite"]]) + ( + predicted_inverse[:, column_dict["Ba"]] + + predicted_inverse[:, column_dict["Barite"]] + ) + - ( + results_inverse[:, column_dict["Ba"]] + + results_inverse[:, column_dict["Barite"]] + ) ) dSr = tf.keras.backend.abs( - (predicted_inverse[:, column_dict["Sr"]] + predicted_inverse[:, column_dict["Celestite"]]) - - (results_inverse[:, column_dict["Sr"]] + results_inverse[:, column_dict["Celestite"]]) + ( + predicted_inverse[:, column_dict["Sr"]] + + predicted_inverse[:, column_dict["Celestite"]] + ) + - ( + results_inverse[:, column_dict["Sr"]] + + results_inverse[:, column_dict["Celestite"]] + ) ) return tf.reduce_mean(dBa + dSr) - + return mass_balance -def huber_metric(preprocess, scaler_type="minmax", delta=1.0): +def huber_metric(delta=1.0): + """Auxilary function to calculate the Huber loss during training. + + Args: + preprocess (_type_): _description_ + scaler_type (str, optional): _description_. Defaults to "minmax". + delta (float, optional): _description_. Defaults to 1.0. + """ - def huber(results, predicted): + def huber(results, predicted): huber_loss = tf.keras.losses.Huber(delta)(results, predicted) return huber_loss - + return huber + def mass_balance_evaluation(model, X, preprocess): + """Calculates the mass balance difference for each cell. 
+ + Args: + model: trained model + X: data where the mass balance should be calculated + preprocess: preprocessing object + + Returns: + vector with the mass balance difference for each cell + """ # predict the chemistry columns = X.iloc[:, X.columns != "Class"].columns prediction = pd.DataFrame(model.predict(X[columns]), columns=columns) - + # backtransform min/max or standard scaler - X = pd.DataFrame(preprocess.scaler_X.inverse_transform(X.iloc[:, X.columns != "Class"]), columns=columns) - prediction = pd.DataFrame(preprocess.scaler_y.inverse_transform(prediction), columns=columns) - + X = pd.DataFrame( + preprocess.scaler_X.inverse_transform(X.iloc[:, X.columns != "Class"]), + columns=columns, + ) + prediction = pd.DataFrame( + preprocess.scaler_y.inverse_transform(prediction), columns=columns + ) + # apply exp1m on the columns of predicted_inverse and results_inverse if log transformation was applied if preprocess.func_dict_out is not None: X = preprocess.funcInverse(X) - - # calculate mass balance - dBa = np.abs((prediction["Ba"] + prediction["Barite"]) - (X["Ba"] + X["Barite"])) - print(dBa.min()) - dSr = np.abs((prediction["Sr"] + prediction["Celestite"]) - (X["Sr"] + X["Celestite"])) - print(dSr.min()) - return dBa+dSr - - -class preprocessing: + # calculate mass balance + dBa = np.abs( + (prediction["Ba"] + prediction["Barite"]) - (X["Ba"] + X["Barite"])) + print(dBa.min()) + dSr = np.abs( + (prediction["Sr"] + prediction["Celestite"]) - + (X["Sr"] + X["Celestite"]) + ) + + return dBa + dSr + + +class preprocessing: + """ + A class used to preprocess data for model training. + Attributes + """ + def __init__(self, func_dict_in=None, func_dict_out=None, random_state=42): + """Initialization of the preprocessing object. + + Args: + func_dict_in: function for transformation. Defaults to None. + func_dict_out: function for backtransformation. Defaults to None. + random_state (int, optional): Seed for reproducability. Defaults to 42. + """ self.random_state = random_state self.scaler_X = None self.scaler_y = None self.func_dict_in = func_dict_in if func_dict_in is not None else None self.func_dict_out = func_dict_out if func_dict_out is not None else None - self.state = {"cluster": False, "log": False, "balance": False, "scale": False} - + self.state = {"cluster": False, "log": False, + "balance": False, "scale": False} + def funcTranform(self, *args): - + """Apply the transformation function to the data columnwise. + + Returns: + pandas data frame: transformed data + """ for i in args: - for key in i.keys(): + for key in i.keys(): if "Class" not in key: i[key] = i[key].apply(self.func_dict_in) self.state["log"] = True return args - + def funcInverse(self, *args): - + """Apply the backtransformation function to the data columnwise. + + Returns: + pandas data frame: backtransformed data + """ for i in args: for key in i.keys(): if "Class" not in key: i[key] = i[key].apply(self.func_dict_out) self.state["log"] = False return args - - def cluster(self, X, y, species='Barite', n_clusters=2, x_length=50, y_length=50): - + + def cluster(self, X, y, species="Barite", n_clusters=2, x_length=50, y_length=50): + """Apply k-means clustering to the data to differentiate betweeen reactive and non-reactive cells. + + Args: + X: design data set + y: target data set + species (str, optional): Chemical species to which clustering is be applied. Defaults to "Barite". + n_clusters (int, optional): Number of clusters. Defaults to 2. + x_length: x dimension of the grid. Defaults to 50. 
+ y_length: y dimension of the grid. Defaults to 50. + + Returns: + X, y dataframes with an additional column "Class" containing the cluster labels. + """ class_labels = np.array([]) grid_length = x_length * y_length iterations = int(len(X) / grid_length) + # calculate the cluster for each chemical iteration step for i in range(0, iterations): - field = np.array(X[species][(i*grid_length):(i*grid_length+grid_length)] - ).reshape(x_length, y_length) - kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state).fit(field.reshape(-1, 1)) + field = np.array( + X[species][(i * grid_length): (i * grid_length + grid_length)] + ).reshape(x_length, y_length) + kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state).fit( + field.reshape(-1, 1) + ) class_labels = np.append(class_labels.astype(int), kmeans.labels_) - if ("Class" in X.columns and "Class" in y.columns): + if "Class" in X.columns and "Class" in y.columns: print("Class column already exists") else: - class_labels_df = pd.DataFrame(class_labels, columns=['Class']) + class_labels_df = pd.DataFrame(class_labels, columns=["Class"]) X = pd.concat([X, class_labels_df], axis=1) y = pd.concat([y, class_labels_df], axis=1) self.state["cluster"] = True - + return X, y - - + def balancer(self, X, y, strategy, sample_fraction=0.5): - + """Apply sampling strategies to balance the dataset. + + Args: + X: design dataset (before the simulation) + y: target dataset (after the simulation) + strategy: Sampling strategy. Choose between "smote" (Synthetic Minority Oversampling Technique), "over" (Oversampling) and "under" (Undersampling). + sample_fraction (float, optional): Define balancer target. Specifies the target fraction of the minority class after the balancing step. Defaults to 0.5. + + Returns: + X, y: resampled datasets + """ number_features = (X.columns != "Class").sum() - if("Class" not in X.columns): - if("Class" in y.columns): - classes = y['Class'] + if "Class" not in X.columns: + if "Class" in y.columns: + classes = y["Class"] else: raise Exception("No class column found") else: - classes = X['Class'] + classes = X["Class"] counter = classes.value_counts() - print("Amount class 0 before:", counter[0] / (counter[0] + counter[1]) ) - print("Amount class 1 before:", counter[1] / (counter[0] + counter[1]) ) - df = pd.concat([X.loc[:,X.columns != "Class"], y.loc[:, y.columns != "Class"], classes], axis=1) + print("Amount class 0 before:", + counter[0] / (counter[0] + counter[1])) + print("Amount class 1 before:", + counter[1] / (counter[0] + counter[1])) + df = pd.concat( + [ + X.loc[:, X.columns != "Class"], + y.loc[:, y.columns != "Class"], + classes, + ], + axis=1, + ) - if strategy == 'smote': + if strategy == "smote": print("Using SMOTE strategy") smote = SMOTE(sampling_strategy=sample_fraction) - df_resampled, classes_resampled = smote.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df. columns == "Class"]) + df_resampled, classes_resampled = smote.fit_resample( + df.loc[:, df.columns != "Class"], df.loc[:, + df.columns == "Class"] + ) - elif strategy == 'over': + elif strategy == "over": print("Using Oversampling") over = RandomOverSampler() - df_resampled, classes_resampled = over.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df. 
columns == "Class"]) + df_resampled, classes_resampled = over.fit_resample( + df.loc[:, df.columns != "Class"], df.loc[:, + df.columns == "Class"] + ) - elif strategy == 'under': + elif strategy == "under": print("Using Undersampling") under = RandomUnderSampler() - df_resampled, classes_resampled = under.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df. columns == "Class"]) + df_resampled, classes_resampled = under.fit_resample( + df.loc[:, df.columns != "Class"], df.loc[:, + df.columns == "Class"] + ) else: + print("No sampling selected. Output equals input.") return X, y counter = classes_resampled["Class"].value_counts() - print("Amount class 0 after:", counter[0] / (counter[0] + counter[1]) ) - print("Amount class 1 after:", counter[1] / (counter[0] + counter[1]) ) + print("Amount class 0 after:", counter[0] / (counter[0] + counter[1])) + print("Amount class 1 after:", counter[1] / (counter[0] + counter[1])) - design_resampled = pd.concat([df_resampled.iloc[:,0:number_features], classes_resampled], axis=1) - target_resampled = pd.concat([df_resampled.iloc[:,number_features:], classes_resampled], axis=1) + design_resampled = pd.concat( + [df_resampled.iloc[:, 0:number_features], classes_resampled], axis=1 + ) + target_resampled = pd.concat( + [df_resampled.iloc[:, number_features:], classes_resampled], axis=1 + ) - self.state['balance'] = True + self.state["balance"] = True return design_resampled, target_resampled - - - def scale_fit(self, X, y, scaling, type='Standard'): + + def scale_fit(self, X, y, scaling, type="Standard"): + """Fit a scaler for data preprocessing. + + Args: + X: design dataset + y: target dataset + scaling: learn individual scaler for X and y when "individual" is selected or one global scaler on all data in X and y if "global" is selected (scaler_X and scaler_y are equal) + type (str, optional): Using MinMax Scaling or Standarization. Defaults to "Standard". + """ - if type == 'minmax': + if type == "minmax": self.scaler_X = MinMaxScaler() self.scaler_y = MinMaxScaler() - elif type == 'standard': + elif type == "standard": self.scaler_X = StandardScaler() self.scaler_y = StandardScaler() - + else: raise Exception("No valid scaler type found") - - if scaling == 'individual': + + if scaling == "individual": self.scaler_X.fit(X.iloc[:, X.columns != "Class"]) self.scaler_y.fit(y.iloc[:, y.columns != "Class"]) - - elif scaling == 'global': - self.scaler_X.fit(pd.concat([X.iloc[:, X.columns != "Class"], y.iloc[:, y.columns != "Class"]], axis=0)) - self.scaler_y = self.scaler_X - - self.state['scale'] = True - - def scale_transform(self, X_train, X_test, y_train, y_test): - X_train = pd.concat([self.scaler_X.transform(X_train.loc[:, X_train.columns != "Class"]), X_train.loc[:, "Class"]], axis=1) - - X_test = pd.concat([self.scaler_X.transform(X_test.loc[:, X_test.columns != "Class"]), X_test.loc[:, "Class"]], axis=1) - y_train = pd.concat([self.scaler_y.transform(y_train.loc[:, y_train.columns != "Class"]), y_train.loc[:, "Class"]], axis=1) - - y_test = pd.concat([self.scaler_y.transform(y_test.loc[:, y_test.columns != "Class"]), y_test.loc[:, "Class"]], axis=1) + elif scaling == "global": + self.scaler_X.fit( + pd.concat( + [X.iloc[:, X.columns != "Class"], + y.iloc[:, y.columns != "Class"]], + axis=0, + ) + ) + self.scaler_y = self.scaler_X + + self.state["scale"] = True + + def scale_transform(self, X_train, X_test, y_train, y_test): + """ Apply learned scaler on datasets. 
+ + Args: + X_train: design training data + X_test: test training data + y_train: target training data + y_test: test target data + + Returns: + transformed dataframes + """ + X_train = pd.concat( + [ + self.scaler_X.transform( + X_train.loc[:, X_train.columns != "Class"]), + X_train.loc[:, "Class"], + ], + axis=1, + ) + + X_test = pd.concat( + [ + self.scaler_X.transform( + X_test.loc[:, X_test.columns != "Class"]), + X_test.loc[:, "Class"], + ], + axis=1, + ) + + y_train = pd.concat( + [ + self.scaler_y.transform( + y_train.loc[:, y_train.columns != "Class"]), + y_train.loc[:, "Class"], + ], + axis=1, + ) + + y_test = pd.concat( + [ + self.scaler_y.transform( + y_test.loc[:, y_test.columns != "Class"]), + y_test.loc[:, "Class"], + ], + axis=1, + ) + return X_train, X_test, y_train, y_test - + def scale_inverse(self, *args): + """Backtransform the dataset + + Returns: + Backtransformed data frames + """ + result = [] for i in args: if "Class" in i.columns: - inversed = pd.DataFrame(self.scaler_X.inverse_transform(i.loc[:, i.columns != "Class"]), columns=i.columns[:-1]) + inversed = pd.DataFrame( + self.scaler_X.inverse_transform( + i.loc[:, i.columns != "Class"]), + columns=i.columns[:-1], + ) class_column = i.loc[:, "Class"].reset_index(drop=True) i = pd.concat([inversed, class_column], axis=1) else: - i = pd.DataFrame(self.scaler_X.inverse_transform(i), columns=i.columns) + i = pd.DataFrame( + self.scaler_X.inverse_transform(i), columns=i.columns) result.append(i) return result - + def split(self, X, y, ratio=0.8): - X_train, y_train, X_test, y_test = sk.train_test_split(X, y, test_size = ratio, random_state=self.random_state) - + X_train, y_train, X_test, y_test = sk.train_test_split( + X, y, test_size=ratio, random_state=self.random_state + ) + return X_train, y_train, X_test, y_test - + def class_selection(self, *args, class_label=0): - + """Select only rows with specific class label + + Args: + Dataframes where rows with specific label should be selected. Defaults to 0. + + Returns: + Elements with selected class label. + """ for i in args: - i = i[i['Class'] == class_label] - + i = i[i["Class"] == class_label] + return args - - - - - - - - \ No newline at end of file
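As a usage sketch for the refactored preprocessing class above: the method names and signatures are taken from this diff, while the toy data frames, the log1p/expm1 transform pair and every parameter value are illustrative assumptions rather than settings used in the project.

import numpy as np
import pandas as pd

cols = ["H", "O", "Ba", "Cl", "S", "Sr", "Barite", "Celestite"]
# toy stand-in for a single 50 x 50 chemistry grid (2500 cells); real data would
# come from the reactive-transport simulation
X_raw = pd.DataFrame(np.random.rand(2500, len(cols)), columns=cols)
y_raw = pd.DataFrame(np.random.rand(2500, len(cols)), columns=cols)

# the log1p / expm1 pair is an assumption, chosen to match tf.math.expm1 in custom_loss
prep = preprocessing(func_dict_in=np.log1p, func_dict_out=np.expm1)

X_log, y_log = prep.funcTranform(X_raw, y_raw)              # column-wise log transform
X_lab, y_lab = prep.cluster(X_log, y_log, species="Barite",  # k-means labels per grid snapshot
                            n_clusters=2, x_length=50, y_length=50)
X_bal, y_bal = prep.balancer(X_lab, y_lab, strategy="under")
X_train, y_train, X_test, y_test = prep.split(X_bal, y_bal, ratio=0.8)
prep.scale_fit(X_train, y_train, scaling="individual", type="minmax")
X_train, X_test, y_train, y_test = prep.scale_transform(X_train, X_test, y_train, y_test)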
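Continuing that sketch, one plausible way to wire model_definition, custom_loss and the two metric factories into a Keras training run; the optimizer, the learning rate, the loss weights h1, h2, h3 and the fit() settings are placeholders, not values taken from the repository.

import keras

column_dict = {"H": 0, "O": 1, "Ba": 2, "Cl": 3, "S": 4,
               "Sr": 5, "Barite": 6, "Celestite": 7}

model = model_definition("paper")
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),        # optimizer settings are placeholders
    loss=custom_loss(prep, column_dict, h1=1.0, h2=0.1, h3=0.1,  # example weights only
                     scaler_type="minmax",
                     loss_variant="huber_mass_balance", delta=1.0),
    metrics=[mass_balance_metric(prep, column_dict, scaler_type="minmax"),
             huber_metric(delta=1.0)],
)
model.fit(X_train.loc[:, X_train.columns != "Class"],            # drop the cluster label column
          y_train.loc[:, y_train.columns != "Class"],
          epochs=50, batch_size=256, validation_split=0.1)       # training settings are placeholders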
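Finally, an isolated illustration of the back-transformation that the nested loss() performs by hand (since, as noted in the code, fitted sklearn scaler objects cannot be called inside a TensorFlow graph); the scaler parameters and batch values below are made up.

import numpy as np
import tensorflow as tf

idx = {"Ba": 2, "Barite": 6}                               # column positions as in column_dict above
min_y = tf.constant(np.zeros(8, dtype=np.float32))         # stand-in for scaler_y.data_min_
scale_y = tf.constant(np.full(8, 2.0, dtype=np.float32))   # stand-in for scaler_y.data_range_

y_true_scaled = tf.random.uniform((4, 8))                  # a batch in scaled, log-transformed space
y_pred_scaled = tf.random.uniform((4, 8))

# undo min/max scaling, then undo the log1p transform, mirroring the nested loss()
y_true = tf.math.expm1(y_true_scaled * scale_y + min_y)
y_pred = tf.math.expm1(y_pred_scaled * scale_y + min_y)

# total barium (dissolved Ba plus Barite) should be conserved across one simulation
# step, so the predicted and reference sums are compared per cell
dBa = tf.abs((y_pred[:, idx["Ba"]] + y_pred[:, idx["Barite"]])
             - (y_true[:, idx["Ba"]] + y_true[:, idx["Barite"]]))
print(tf.reduce_mean(dBa).numpy())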