document functions

Hannes Signer 2025-02-26 12:57:39 +01:00
parent 03918222c4
commit cce2f696a0


@@ -1,5 +1,5 @@
import keras
from keras.layers import Dense, Dropout, Input, BatchNormalization, LeakyReLU
import tensorflow as tf
import h5py
import numpy as np
@@ -18,46 +18,33 @@ import os
from preprocessing import *
from sklearn import set_config
from importlib import reload

set_config(transform_output="pandas")

def model_definition(architecture):
    """Definition of the respective AI model. Three models are currently analysed, labelled 'small', 'large' or 'paper'.

    Args:
        architecture (String): Choose between 'small', 'large' or 'paper'.

    Returns:
        keras model: Returns the respective model.
    """
dtype = "float32" dtype = "float32"
if architecture == "small": if architecture == "small":
model = keras.Sequential( model = keras.Sequential(
[ [
keras.Input(shape=(8,), dtype="float32"), keras.Input(shape=(8,), dtype=dtype),
keras.layers.Dense(units=128, dtype="float32"), keras.layers.Dense(units=128, dtype=dtype),
LeakyReLU(negative_slope=0.01), LeakyReLU(negative_slope=0.01),
# Dropout(0.2), # Dropout(0.2),
keras.layers.Dense(units=128, dtype="float32"), keras.layers.Dense(units=128, dtype=dtype),
LeakyReLU(negative_slope=0.01), LeakyReLU(negative_slope=0.01),
keras.layers.Dense(units=8, dtype="float32") keras.layers.Dense(units=8, dtype=dtype),
] ]
) )
elif architecture == "large": elif architecture == "large":
model = keras.Sequential( model = keras.Sequential(
[ [
@ -68,312 +55,569 @@ def model_definition(architecture):
LeakyReLU(negative_slope=0.01), LeakyReLU(negative_slope=0.01),
keras.layers.Dense(512, dtype=dtype), keras.layers.Dense(512, dtype=dtype),
LeakyReLU(negative_slope=0.01), LeakyReLU(negative_slope=0.01),
keras.layers.Dense(8, dtype=dtype) keras.layers.Dense(8, dtype=dtype),
] ]
) )
elif architecture == "paper": elif architecture == "paper":
model = keras.Sequential( model = keras.Sequential(
[keras.layers.Input(shape=(8,), dtype=dtype), [
keras.layers.Dense(128, dtype=dtype), keras.layers.Input(shape=(8,), dtype=dtype),
LeakyReLU(negative_slope=0.01), keras.layers.Dense(128, dtype=dtype),
keras.layers.Dense(256, dtype=dtype), LeakyReLU(negative_slope=0.01),
LeakyReLU(negative_slope=0.01), keras.layers.Dense(256, dtype=dtype),
keras.layers.Dense(512, dtype=dtype), LeakyReLU(negative_slope=0.01),
LeakyReLU(negative_slope=0.01), keras.layers.Dense(512, dtype=dtype),
keras.layers.Dense(256, dtype=dtype), LeakyReLU(negative_slope=0.01),
LeakyReLU(negative_slope=0.01), keras.layers.Dense(256, dtype=dtype),
keras.layers.Dense(8, dtype=dtype) LeakyReLU(negative_slope=0.01),
]) keras.layers.Dense(8, dtype=dtype),
]
)
else:
raise Exception(
"No valid architecture found."
+ "Choose between 'small', 'large' or 'paper'."
)
return model return model
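
# Hypothetical usage sketch (not part of this commit): build and inspect one of the
# documented architectures before compiling it with the custom loss defined below.
# model = model_definition("paper")
# model.summary()
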

@keras.saving.register_keras_serializable()
def custom_loss(
    preprocess,
    column_dict,
    h1,
    h2,
    h3,
    scaler_type="minmax",
    loss_variant="huber",
    delta=1.0,
):
"""Custom tensorflow loss function to combine Huber Loss with mass balance.
This is inspired by PINN (Physics Informed Neural Networks) where the loss function is a combination of the physics-based loss and the data-driven loss.
The mass balance is a physics-based loss that ensures the conservation of mass in the system.
A tensorflow loss function accepts only the two arguments y_true and y_pred. Therefore, a nested function is used to pass the additional arguments.
Args:
preprocess: preprocessing object
column_dict: dictionary with the column names as keys and the corresponding index as values. (i.e {'H': 0, 'O': 1, 'Ba': 2, 'Cl': 3, 'S': 4, 'Sr': 5, 'Barite': 6, 'Celestite': 7})
h1: hyperparameter for the importance of the huber loss
h2: hyperparameter for the importance of the Barium mass balance term
h3: hyperparameter for the importance of the Strontium mass balance term
scaler_type: Normalization approach. Choose between "standard" and "minmax". Defaults to "minmax".
loss_variant: Loss function approach. Choose between "huber and "huber_mass_balance". Defaults to "huber".
delta: Hyperparameter for the Huber function threshold. Defaults to 1.0.
Returns:
loss function
"""
    # as far as I know, tensorflow does not directly support the use of scaler objects;
    # therefore, the backtransformation is done manually
    if scaler_type == "minmax":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.data_range_, dtype=tf.float32)
        min_X = tf.convert_to_tensor(preprocess.scaler_X.data_min_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.data_range_, dtype=tf.float32)
        min_y = tf.convert_to_tensor(preprocess.scaler_y.data_min_, dtype=tf.float32)
    elif scaler_type == "standard":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)
    else:
        raise Exception(
            "No valid scaler type found. Choose between 'standard' and 'minmax'."
        )

    def loss(results, predicted):
        # inverse min/max scaling
        if scaler_type == "minmax":
            predicted_inverse = predicted * scale_y + min_y
            results_inverse = results * scale_X + min_X
        # inverse standard scaling
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X
        # apply expm1 to the columns of predicted_inverse and results_inverse if a log transformation was applied
        if preprocess.func_dict_out is not None:
            predicted_inverse = tf.math.expm1(predicted_inverse)
            results_inverse = tf.math.expm1(results_inverse)
        # mass balance:
        # in total, no barium and no strontium should be lost within one simulation step
        dBa = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Ba"]]
                + predicted_inverse[:, column_dict["Barite"]]
            )
            - (
                results_inverse[:, column_dict["Ba"]]
                + results_inverse[:, column_dict["Barite"]]
            )
        )
        dSr = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Sr"]]
                + predicted_inverse[:, column_dict["Celestite"]]
            )
            - (
                results_inverse[:, column_dict["Sr"]]
                + results_inverse[:, column_dict["Celestite"]]
            )
        )
        # huber loss
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)
        # total loss
        if loss_variant == "huber":
            total_loss = huber_loss
        elif loss_variant == "huber_mass_balance":
            total_loss = h1 * huber_loss + h2 * dBa + h3 * dSr
        else:
            raise Exception(
                "No valid loss variant found. Choose between 'huber' and 'huber_mass_balance'."
            )
        return total_loss

    return loss
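
# Hedged usage sketch (not part of this commit; names such as `prep`, `column_dict`
# and the scaled training frames are assumptions): the nested loss returned by
# custom_loss can be passed directly to model.compile.
# column_dict = {"H": 0, "O": 1, "Ba": 2, "Cl": 3, "S": 4, "Sr": 5, "Barite": 6, "Celestite": 7}
# model = model_definition("small")
# model.compile(
#     optimizer=keras.optimizers.Adam(learning_rate=1e-3),
#     loss=custom_loss(prep, column_dict, h1=1.0, h2=0.1, h3=0.1,
#                      scaler_type="minmax", loss_variant="huber_mass_balance"),
# )
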

def mass_balance_metric(preprocess, column_dict, scaler_type="minmax"):
    """Auxiliary function to calculate the mass balance during training.

    Args:
        preprocess: preprocessing object
        column_dict: dictionary with the column names as keys and the corresponding index as values
        scaler_type: Normalization approach. Choose between "standard" and "minmax". Defaults to "minmax".

    Returns:
        mean of both mass balance terms
    """
if scaler_type == "minmax": if scaler_type == "minmax":
scale_X = tf.convert_to_tensor(preprocess.scaler_X.data_range_, dtype=tf.float32) scale_X = tf.convert_to_tensor(
min_X = tf.convert_to_tensor(preprocess.scaler_X.data_min_, dtype=tf.float32) preprocess.scaler_X.data_range_, dtype=tf.float32
scale_y = tf.convert_to_tensor(preprocess.scaler_y.data_range_, dtype=tf.float32) )
min_y = tf.convert_to_tensor(preprocess.scaler_y.data_min_, dtype=tf.float32) min_X = tf.convert_to_tensor(
preprocess.scaler_X.data_min_, dtype=tf.float32)
scale_y = tf.convert_to_tensor(
preprocess.scaler_y.data_range_, dtype=tf.float32
)
min_y = tf.convert_to_tensor(
preprocess.scaler_y.data_min_, dtype=tf.float32)
elif scaler_type == "standard": elif scaler_type == "standard":
scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32) scale_X = tf.convert_to_tensor(
mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32) preprocess.scaler_X.scale_, dtype=tf.float32)
scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32) mean_X = tf.convert_to_tensor(
mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32) preprocess.scaler_X.mean_, dtype=tf.float32)
scale_y = tf.convert_to_tensor(
preprocess.scaler_y.scale_, dtype=tf.float32)
mean_y = tf.convert_to_tensor(
preprocess.scaler_y.mean_, dtype=tf.float32)

    def mass_balance(results, predicted):
        # inverse min/max scaling
        if scaler_type == "minmax":
            predicted_inverse = predicted * scale_y + min_y
            results_inverse = results * scale_X + min_X
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X
        if preprocess.func_dict_out is not None:
            predicted_inverse = tf.math.expm1(predicted_inverse)
            results_inverse = tf.math.expm1(results_inverse)
        # mass balance
        dBa = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Ba"]]
                + predicted_inverse[:, column_dict["Barite"]]
            )
            - (
                results_inverse[:, column_dict["Ba"]]
                + results_inverse[:, column_dict["Barite"]]
            )
        )
        dSr = tf.keras.backend.abs(
            (
                predicted_inverse[:, column_dict["Sr"]]
                + predicted_inverse[:, column_dict["Celestite"]]
            )
            - (
                results_inverse[:, column_dict["Sr"]]
                + results_inverse[:, column_dict["Celestite"]]
            )
        )
        return tf.reduce_mean(dBa + dSr)

    return mass_balance

def huber_metric(delta=1.0):
    """Auxiliary function to calculate the Huber loss during training.

    Args:
        delta (float, optional): Hyperparameter for the Huber function threshold. Defaults to 1.0.
    """

    def huber(results, predicted):
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)
        return huber_loss

    return huber
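
# Hedged usage sketch (not part of this commit; `prep` and `column_dict` are assumed
# to exist as above): both factory functions return callables that Keras accepts as
# metrics, so the mass balance and the plain Huber term can be tracked during fitting.
# model.compile(
#     optimizer="adam",
#     loss=custom_loss(prep, column_dict, h1=1.0, h2=0.1, h3=0.1),
#     metrics=[mass_balance_metric(prep, column_dict), huber_metric(delta=1.0)],
# )
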

def mass_balance_evaluation(model, X, preprocess):
    """Calculates the mass balance difference for each cell.

    Args:
        model: trained model
        X: data for which the mass balance should be calculated
        preprocess: preprocessing object

    Returns:
        vector with the mass balance difference for each cell
    """
    # predict the chemistry
    columns = X.iloc[:, X.columns != "Class"].columns
    prediction = pd.DataFrame(model.predict(X[columns]), columns=columns)
    # backtransform min/max or standard scaler
    X = pd.DataFrame(
        preprocess.scaler_X.inverse_transform(X.iloc[:, X.columns != "Class"]),
        columns=columns,
    )
    prediction = pd.DataFrame(
        preprocess.scaler_y.inverse_transform(prediction), columns=columns
    )
    # apply expm1 to the columns of X if a log transformation was applied
    if preprocess.func_dict_out is not None:
        X = preprocess.funcInverse(X)[0]  # funcInverse returns a tuple of the passed frames
    # calculate mass balance
    dBa = np.abs((prediction["Ba"] + prediction["Barite"]) - (X["Ba"] + X["Barite"]))
    print(dBa.min())
    dSr = np.abs((prediction["Sr"] + prediction["Celestite"]) - (X["Sr"] + X["Celestite"]))
    print(dSr.min())
    return dBa + dSr
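
# Hedged usage sketch (not part of this commit; `model`, `X_test_scaled` and `prep`
# are assumed to come from the training pipeline): evaluate how much mass the trained
# surrogate loses or creates per cell.
# imbalance = mass_balance_evaluation(model, X_test_scaled, prep)
# print(imbalance.describe())
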

class preprocessing:
    """A class used to preprocess data for model training."""

    def __init__(self, func_dict_in=None, func_dict_out=None, random_state=42):
        """Initialization of the preprocessing object.

        Args:
            func_dict_in: function for the forward transformation. Defaults to None.
            func_dict_out: function for the backtransformation. Defaults to None.
            random_state (int, optional): Seed for reproducibility. Defaults to 42.
        """
        self.random_state = random_state
        self.scaler_X = None
        self.scaler_y = None
        self.func_dict_in = func_dict_in if func_dict_in is not None else None
        self.func_dict_out = func_dict_out if func_dict_out is not None else None
        self.state = {"cluster": False, "log": False, "balance": False, "scale": False}

    def funcTranform(self, *args):
        """Apply the transformation function to the data columnwise.

        Returns:
            pandas data frame: transformed data
        """
        for i in args:
            for key in i.keys():
                if "Class" not in key:
                    i[key] = i[key].apply(self.func_dict_in)
        self.state["log"] = True
        return args

    def funcInverse(self, *args):
        """Apply the backtransformation function to the data columnwise.

        Returns:
            pandas data frame: backtransformed data
        """
        for i in args:
            for key in i.keys():
                if "Class" not in key:
                    i[key] = i[key].apply(self.func_dict_out)
        self.state["log"] = False
        return args
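
    # Hypothetical configuration example (not part of this commit): the expm1
    # backtransformation used in the loss functions suggests log1p/expm1 as the
    # transformation pair, so a preprocessing object could be set up like this:
    # prep = preprocessing(func_dict_in=np.log1p, func_dict_out=np.expm1, random_state=42)
    # X_log, y_log = prep.funcTranform(X_raw, y_raw)
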

    def cluster(self, X, y, species="Barite", n_clusters=2, x_length=50, y_length=50):
        """Apply k-means clustering to the data to differentiate between reactive and non-reactive cells.

        Args:
            X: design data set
            y: target data set
            species (str, optional): Chemical species to which clustering is applied. Defaults to "Barite".
            n_clusters (int, optional): Number of clusters. Defaults to 2.
            x_length: x dimension of the grid. Defaults to 50.
            y_length: y dimension of the grid. Defaults to 50.

        Returns:
            X, y dataframes with an additional column "Class" containing the cluster labels.
        """
        class_labels = np.array([])
        grid_length = x_length * y_length
        iterations = int(len(X) / grid_length)
        # calculate the cluster labels for each chemical iteration step
        for i in range(0, iterations):
            field = np.array(
                X[species][(i * grid_length):(i * grid_length + grid_length)]
            ).reshape(x_length, y_length)
            kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state).fit(
                field.reshape(-1, 1)
            )
            class_labels = np.append(class_labels.astype(int), kmeans.labels_)
        if "Class" in X.columns and "Class" in y.columns:
            print("Class column already exists")
        else:
            class_labels_df = pd.DataFrame(class_labels, columns=["Class"])
            X = pd.concat([X, class_labels_df], axis=1)
            y = pd.concat([y, class_labels_df], axis=1)
        self.state["cluster"] = True
        return X, y
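
    # Hedged usage sketch (not part of this commit; `X_raw`/`y_raw` and the 50x50 grid
    # are assumptions): label each cell as reactive or non-reactive before balancing.
    # X_clustered, y_clustered = prep.cluster(X_raw, y_raw, species="Barite",
    #                                         n_clusters=2, x_length=50, y_length=50)
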

    def balancer(self, X, y, strategy, sample_fraction=0.5):
        """Apply sampling strategies to balance the dataset.

        Args:
            X: design dataset (before the simulation)
            y: target dataset (after the simulation)
            strategy: Sampling strategy. Choose between "smote" (Synthetic Minority Oversampling Technique), "over" (oversampling) and "under" (undersampling).
            sample_fraction (float, optional): Target fraction of the minority class after the balancing step. Defaults to 0.5.

        Returns:
            X, y: resampled datasets
        """
        number_features = (X.columns != "Class").sum()
        if "Class" not in X.columns:
            if "Class" in y.columns:
                classes = y["Class"]
            else:
                raise Exception("No class column found")
        else:
            classes = X["Class"]
        counter = classes.value_counts()
        print("Amount class 0 before:", counter[0] / (counter[0] + counter[1]))
        print("Amount class 1 before:", counter[1] / (counter[0] + counter[1]))
        df = pd.concat(
            [
                X.loc[:, X.columns != "Class"],
                y.loc[:, y.columns != "Class"],
                classes,
            ],
            axis=1,
        )
        if strategy == "smote":
            print("Using SMOTE strategy")
            smote = SMOTE(sampling_strategy=sample_fraction)
            df_resampled, classes_resampled = smote.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        elif strategy == "over":
            print("Using Oversampling")
            over = RandomOverSampler()
            df_resampled, classes_resampled = over.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        elif strategy == "under":
            print("Using Undersampling")
            under = RandomUnderSampler()
            df_resampled, classes_resampled = under.fit_resample(
                df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"]
            )
        else:
            print("No sampling selected. Output equals input.")
            return X, y
        counter = classes_resampled["Class"].value_counts()
        print("Amount class 0 after:", counter[0] / (counter[0] + counter[1]))
        print("Amount class 1 after:", counter[1] / (counter[0] + counter[1]))
        design_resampled = pd.concat(
            [df_resampled.iloc[:, 0:number_features], classes_resampled], axis=1
        )
        target_resampled = pd.concat(
            [df_resampled.iloc[:, number_features:], classes_resampled], axis=1
        )
        self.state["balance"] = True
        return design_resampled, target_resampled
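
    # Hedged usage sketch (not part of this commit): oversample the minority (reactive)
    # class with SMOTE so that it makes up roughly half of the training data.
    # X_bal, y_bal = prep.balancer(X_clustered, y_clustered, strategy="smote",
    #                              sample_fraction=0.5)
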

    def scale_fit(self, X, y, scaling, type="standard"):
        """Fit a scaler for data preprocessing.

        Args:
            X: design dataset
            y: target dataset
            scaling: learn individual scalers for X and y if "individual" is selected, or one global scaler on all data in X and y if "global" is selected (scaler_X and scaler_y are then identical)
            type (str, optional): Use min/max scaling ("minmax") or standardization ("standard"). Defaults to "standard".
        """
        if type == "minmax":
            self.scaler_X = MinMaxScaler()
            self.scaler_y = MinMaxScaler()
        elif type == "standard":
            self.scaler_X = StandardScaler()
            self.scaler_y = StandardScaler()
        else:
            raise Exception("No valid scaler type found")
        if scaling == "individual":
            self.scaler_X.fit(X.iloc[:, X.columns != "Class"])
            self.scaler_y.fit(y.iloc[:, y.columns != "Class"])
        elif scaling == "global":
            self.scaler_X.fit(
                pd.concat(
                    [X.iloc[:, X.columns != "Class"], y.iloc[:, y.columns != "Class"]],
                    axis=0,
                )
            )
            self.scaler_y = self.scaler_X
        self.state["scale"] = True

    def scale_transform(self, X_train, X_test, y_train, y_test):
        """Apply the learned scalers to the datasets.

        Args:
            X_train: design training data
            X_test: design test data
            y_train: target training data
            y_test: target test data

        Returns:
            transformed dataframes
        """
        X_train = pd.concat(
            [
                self.scaler_X.transform(X_train.loc[:, X_train.columns != "Class"]),
                X_train.loc[:, "Class"],
            ],
            axis=1,
        )
        X_test = pd.concat(
            [
                self.scaler_X.transform(X_test.loc[:, X_test.columns != "Class"]),
                X_test.loc[:, "Class"],
            ],
            axis=1,
        )
        y_train = pd.concat(
            [
                self.scaler_y.transform(y_train.loc[:, y_train.columns != "Class"]),
                y_train.loc[:, "Class"],
            ],
            axis=1,
        )
        y_test = pd.concat(
            [
                self.scaler_y.transform(y_test.loc[:, y_test.columns != "Class"]),
                y_test.loc[:, "Class"],
            ],
            axis=1,
        )
        return X_train, X_test, y_train, y_test
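
    # Hedged usage sketch (not part of this commit; assumes the "Class" column added by
    # cluster() is present in all four frames): fit the scalers on the training design
    # and target data, then transform all splits.
    # prep.scale_fit(X_train, y_train, scaling="individual", type="minmax")
    # X_train, X_test, y_train, y_test = prep.scale_transform(X_train, X_test, y_train, y_test)
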

    def scale_inverse(self, *args):
        """Backtransform the dataset.

        Returns:
            Backtransformed data frames
        """
        result = []
        for i in args:
            if "Class" in i.columns:
                inversed = pd.DataFrame(
                    self.scaler_X.inverse_transform(i.loc[:, i.columns != "Class"]),
                    columns=i.columns[:-1],
                )
                class_column = i.loc[:, "Class"].reset_index(drop=True)
                i = pd.concat([inversed, class_column], axis=1)
            else:
                i = pd.DataFrame(self.scaler_X.inverse_transform(i), columns=i.columns)
            result.append(i)
        return result

    def split(self, X, y, ratio=0.8):
        """Split X and y into training and test sets; `ratio` is the training fraction."""
        # sklearn returns (X_train, X_test, y_train, y_test)
        X_train, X_test, y_train, y_test = sk.train_test_split(
            X, y, test_size=1 - ratio, random_state=self.random_state
        )
        return X_train, y_train, X_test, y_test

    def class_selection(self, *args, class_label=0):
        """Select only the rows with a specific class label.

        Args:
            *args: dataframes to filter.
            class_label (int, optional): class label to keep. Defaults to 0.

        Returns:
            dataframes containing only the selected class label
        """
        result = []
        for i in args:
            result.append(i[i["Class"] == class_label])
        return result
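

# Hedged end-to-end sketch (not part of this commit; variable names and hyperparameters
# are assumptions): a typical pipeline using the preprocessing class and the model helpers.
# prep = preprocessing(func_dict_in=np.log1p, func_dict_out=np.expm1)
# X, y = prep.cluster(X_raw, y_raw)
# X, y = prep.balancer(X, y, strategy="smote")
# X_train, y_train, X_test, y_test = prep.split(X, y, ratio=0.8)
# prep.scale_fit(X_train, y_train, scaling="individual", type="minmax")
# X_train, X_test, y_train, y_test = prep.scale_transform(X_train, X_test, y_train, y_test)
# model = model_definition("small")
# model.compile(optimizer="adam",
#               loss=custom_loss(prep, column_dict, h1=1.0, h2=0.1, h3=0.1),
#               metrics=[mass_balance_metric(prep, column_dict), huber_metric()])
# model.fit(X_train.drop(columns="Class"), y_train.drop(columns="Class"),
#           epochs=50, batch_size=256)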