# model-training/src/preprocessing.py

import keras
from keras.layers import Dense, Dropout, Input, BatchNormalization, LeakyReLU
import tensorflow as tf
import h5py
import numpy as np
import pandas as pd
import time
import sklearn.model_selection as sk
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter
import os
from sklearn import set_config
from importlib import reload

# keep scikit-learn transformers returning DataFrames so column names survive
set_config(transform_output="pandas")

# preprocessing pipeline
def Safelog(val):
    # signed log10; assumes |val| < 1, so the sign of the result encodes the
    # sign of the input (negative for val > 0, positive for val < 0)
    if val > 0:
        return np.log10(val)
    elif val < 0:
        return -np.log10(-val)
    else:
        return 0


def Safeexp(val):
    # inverse of Safelog, under the same |val| < 1 assumption
    if val > 0:
        return -10 ** -val
    elif val < 0:
        return 10 ** val
    else:
        return 0
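
# Example (sketch): Safelog/Safeexp round-trip for a small concentration,
# relying on the |val| < 1 assumption noted above:
#
#     v = 3.2e-5
#     assert abs(Safeexp(Safelog(v)) - v) < 1e-12    # positive branch
#     assert abs(Safeexp(Safelog(-v)) + v) < 1e-12   # negative branch
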
def model_definition(architecture):
    dtype = "float32"
    if architecture == "small":
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(units=128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                # Dropout(0.2),
                keras.layers.Dense(units=128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(units=8, dtype=dtype),
            ]
        )
    elif architecture == "large":
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(1024, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(8, dtype=dtype),
            ]
        )
    elif architecture == "paper":
        model = keras.Sequential(
            [
                keras.layers.Input(shape=(8,), dtype=dtype),
                keras.layers.Dense(128, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(256, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(512, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(256, dtype=dtype),
                LeakyReLU(negative_slope=0.01),
                keras.layers.Dense(8, dtype=dtype),
            ]
        )
    else:
        raise ValueError(f"unknown architecture: {architecture!r}")
    return model
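
# Example (sketch): instantiating the small architecture; the optimizer and
# loss here are placeholders, the training code elsewhere supplies real ones.
#
#     model = model_definition("small")
#     model.compile(optimizer="adam", loss="mse")
#     model.summary()
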
@keras.saving.register_keras_serializable()
def custom_loss(preprocess, column_dict, h1, h2, h3, scaler_type="minmax", loss_variant="huber", delta=1.0):
    # extract the scaling parameters
    if scaler_type == "minmax":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        min_X = tf.convert_to_tensor(preprocess.scaler_X.min_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        min_y = tf.convert_to_tensor(preprocess.scaler_y.min_, dtype=tf.float32)
    elif scaler_type == "standard":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)
    else:
        raise ValueError(f"unknown scaler_type: {scaler_type!r}")

    def loss(results, predicted):
        # invert the scaling so the mass balance is evaluated in physical units
        if scaler_type == "minmax":
            # MinMaxScaler: scaled = x * scale_ + min_, hence x = (scaled - min_) / scale_
            predicted_inverse = (predicted - min_y) / scale_y
            results_inverse = (results - min_X) / scale_X
        elif scaler_type == "standard":
            # StandardScaler: scaled = (x - mean_) / scale_, hence x = scaled * scale_ + mean_
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X
        # mass balance: total Ba and Sr (aqueous + mineral) must be conserved
        dBa = tf.keras.backend.abs(
            (predicted_inverse[:, column_dict["Ba"]] + predicted_inverse[:, column_dict["Barite"]]) -
            (results_inverse[:, column_dict["Ba"]] + results_inverse[:, column_dict["Barite"]])
        )
        dSr = tf.keras.backend.abs(
            (predicted_inverse[:, column_dict["Sr"]] + predicted_inverse[:, column_dict["Celestite"]]) -
            (results_inverse[:, column_dict["Sr"]] + results_inverse[:, column_dict["Celestite"]])
        )
        # H/O ratio has to be 2
        # h2o_ratio = tf.keras.backend.abs(
        #     (predicted_inverse[:, column_dict["H"]] / predicted_inverse[:, column_dict["O"]]) - 2
        # )
        # huber loss on the scaled values
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)
        # total loss
        if loss_variant == "huber":
            total_loss = huber_loss
        elif loss_variant == "huber_mass_balance":
            total_loss = h1 * huber_loss + h2 * dBa + h3 * dSr
        else:
            raise ValueError(f"unknown loss_variant: {loss_variant!r}")
        return total_loss
    return loss
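
# Example (sketch): compiling with the mass-balance loss. `preprocess` is a
# fitted `preprocessing` instance and `column_dict` maps species names to
# output-column indices; both are assumptions about the calling code.
#
#     loss_fn = custom_loss(preprocess, column_dict, h1=1.0, h2=0.1, h3=0.1,
#                           scaler_type="minmax", loss_variant="huber_mass_balance")
#     model.compile(optimizer="adam", loss=loss_fn)
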
def mass_balance_metric(preprocess, column_dict, scaler_type="minmax"):
    if scaler_type == "minmax":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        min_X = tf.convert_to_tensor(preprocess.scaler_X.min_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        min_y = tf.convert_to_tensor(preprocess.scaler_y.min_, dtype=tf.float32)
    elif scaler_type == "standard":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)

    def mass_balance(results, predicted):
        # invert the scaling (see custom_loss for the formulas)
        if scaler_type == "minmax":
            predicted_inverse = (predicted - min_y) / scale_y
            results_inverse = (results - min_X) / scale_X
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X
        # mass balance
        dBa = tf.keras.backend.abs(
            (predicted_inverse[:, column_dict["Ba"]] + predicted_inverse[:, column_dict["Barite"]]) -
            (results_inverse[:, column_dict["Ba"]] + results_inverse[:, column_dict["Barite"]])
        )
        dSr = tf.keras.backend.abs(
            (predicted_inverse[:, column_dict["Sr"]] + predicted_inverse[:, column_dict["Celestite"]]) -
            (results_inverse[:, column_dict["Sr"]] + results_inverse[:, column_dict["Celestite"]])
        )
        return tf.reduce_mean(dBa + dSr)
    return mass_balance
def huber_metric(preprocess, scaler_type="minmax", delta=1.0):
    if scaler_type == "minmax":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        min_X = tf.convert_to_tensor(preprocess.scaler_X.min_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        min_y = tf.convert_to_tensor(preprocess.scaler_y.min_, dtype=tf.float32)
    elif scaler_type == "standard":
        scale_X = tf.convert_to_tensor(preprocess.scaler_X.scale_, dtype=tf.float32)
        mean_X = tf.convert_to_tensor(preprocess.scaler_X.mean_, dtype=tf.float32)
        scale_y = tf.convert_to_tensor(preprocess.scaler_y.scale_, dtype=tf.float32)
        mean_y = tf.convert_to_tensor(preprocess.scaler_y.mean_, dtype=tf.float32)

    def huber(results, predicted):
        # the inverse-scaled values are computed but currently unused:
        # the Huber metric is evaluated on the scaled data, mirroring the loss
        if scaler_type == "minmax":
            predicted_inverse = (predicted - min_y) / scale_y
            results_inverse = (results - min_X) / scale_X
        elif scaler_type == "standard":
            predicted_inverse = predicted * scale_y + mean_y
            results_inverse = results * scale_X + mean_X
        huber_loss = tf.keras.losses.Huber(delta)(results, predicted)
        return huber_loss
    return huber
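
# Example (sketch): attaching the diagnostics as Keras metrics alongside the
# training loss:
#
#     model.compile(
#         optimizer="adam",
#         loss=custom_loss(preprocess, column_dict, 1.0, 0.1, 0.1),
#         metrics=[mass_balance_metric(preprocess, column_dict),
#                  huber_metric(preprocess)],
#     )
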
def mass_balance_evaluation(model, X, preprocess):
    # predict the chemistry
    columns = X.columns[X.columns != "Class"]
    prediction = pd.DataFrame(model.predict(X[columns]), columns=columns)
    # backtransform min/max or standard scaler
    X = pd.DataFrame(preprocess.scaler_X.inverse_transform(X.loc[:, X.columns != "Class"]), columns=columns)
    prediction = pd.DataFrame(preprocess.scaler_y.inverse_transform(prediction), columns=columns)
    # calculate the per-sample mass balance error
    dBa = np.abs((prediction["Ba"] + prediction["Barite"]) - (X["Ba"] + X["Barite"]))
    print("min dBa:", dBa.min())
    dSr = np.abs((prediction["Sr"] + prediction["Celestite"]) - (X["Sr"] + X["Celestite"]))
    print("min dSr:", dSr.min())
    return dBa + dSr
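
# Example (sketch): evaluating the summed Ba + Sr balance error on a held-out,
# still-scaled test set:
#
#     errors = mass_balance_evaluation(model, X_test, preprocess)
#     print("mean mass-balance error:", errors.mean())
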
class preprocessing:
    def __init__(self, func_dict_in=None, func_dict_out=None, random_state=42):
        self.random_state = random_state
        self.scaler_X = None
        self.scaler_y = None
        self.func_dict_in = func_dict_in
        self.func_dict_out = func_dict_out
        # track which preprocessing steps have been applied
        self.state = {"cluster": False, "log": False, "balance": False, "scale": False}

    def funcTransform(self, X, y):
        # apply the per-column forward transform (e.g. Safelog) to X and y
        for key in X.columns:
            if "Class" not in key:
                X[key] = X[key].apply(self.func_dict_in[key])
                y[key] = y[key].apply(self.func_dict_in[key])
        self.state["log"] = True
        return X, y

    def funcInverse(self, X, y):
        # apply the per-column inverse transform (e.g. Safeexp) to X and y
        for key in X.columns:
            if "Class" not in key:
                X[key] = X[key].apply(self.func_dict_out[key])
                y[key] = y[key].apply(self.func_dict_out[key])
        self.state["log"] = False
        return X, y
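
    # Example (sketch): building the transform dictionaries from Safelog /
    # Safeexp; the 8-species column set here is an assumption:
    #
    #     species = ["H", "O", "Ba", "Sr", "Cl", "S", "Barite", "Celestite"]
    #     func_dict_in = {s: Safelog for s in species}
    #     func_dict_out = {s: Safeexp for s in species}
    #     preprocess = preprocessing(func_dict_in, func_dict_out)
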
    def cluster(self, X, y, species='Barite', n_clusters=2, x_length=50, y_length=50):
        # k-means label each simulation snapshot of the chosen species field
        class_labels = np.array([])
        grid_length = x_length * y_length
        iterations = len(X) // grid_length
        for i in range(iterations):
            field = np.array(
                X[species][(i * grid_length):(i * grid_length + grid_length)]
            ).reshape(x_length, y_length)
            kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state).fit(field.reshape(-1, 1))
            class_labels = np.append(class_labels.astype(int), kmeans.labels_)
        if "Class" in X.columns and "Class" in y.columns:
            print("Class column already exists")
        else:
            class_labels_df = pd.DataFrame(class_labels, columns=['Class'])
            X = pd.concat([X, class_labels_df], axis=1)
            y = pd.concat([y, class_labels_df], axis=1)
        self.state["cluster"] = True
        return X, y
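
    # Example (sketch): labelling stacked 50x50 snapshots; X and y are assumed
    # to hold one grid cell per row, snapshots concatenated in order:
    #
    #     X, y = preprocess.cluster(X, y, species="Barite", n_clusters=2)
    #     print(X["Class"].value_counts())
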
    def balancer(self, X, y, strategy, sample_fraction=0.5):
        number_features = (X.columns != "Class").sum()
        if "Class" not in X.columns:
            if "Class" in y.columns:
                classes = y['Class']
            else:
                raise Exception("No class column found")
        else:
            classes = X['Class']
        counter = classes.value_counts()
        print("Fraction of class 0 before:", counter[0] / (counter[0] + counter[1]))
        print("Fraction of class 1 before:", counter[1] / (counter[0] + counter[1]))
        # resample design and target jointly so X/y rows stay aligned
        df = pd.concat([X.loc[:, X.columns != "Class"], y.loc[:, y.columns != "Class"], classes], axis=1)
        if strategy == 'smote':
            print("Using SMOTE strategy")
            smote = SMOTE(sampling_strategy=sample_fraction, random_state=self.random_state)
            df_resampled, classes_resampled = smote.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
        elif strategy == 'over':
            print("Using Oversampling")
            over = RandomOverSampler(random_state=self.random_state)
            df_resampled, classes_resampled = over.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
        elif strategy == 'under':
            print("Using Undersampling")
            under = RandomUnderSampler(random_state=self.random_state)
            df_resampled, classes_resampled = under.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
        else:
            return X, y
        counter = classes_resampled["Class"].value_counts()
        print("Fraction of class 0 after:", counter[0] / (counter[0] + counter[1]))
        print("Fraction of class 1 after:", counter[1] / (counter[0] + counter[1]))
        # split the resampled frame back into design (X) and target (y) parts
        design_resampled = pd.concat([df_resampled.iloc[:, 0:number_features], classes_resampled], axis=1)
        target_resampled = pd.concat([df_resampled.iloc[:, number_features:], classes_resampled], axis=1)
        self.state['balance'] = True
        return design_resampled, target_resampled
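
    # Example (sketch): rebalancing with SMOTE so the minority class is
    # oversampled to half the majority class size (sampling_strategy=0.5):
    #
    #     X_bal, y_bal = preprocess.balancer(X, y, strategy="smote", sample_fraction=0.5)
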
    def scale_fit(self, X, y, scaling, type='standard'):
        if type == 'minmax':
            self.scaler_X = MinMaxScaler()
            self.scaler_y = MinMaxScaler()
        elif type == 'standard':
            self.scaler_X = StandardScaler()
            self.scaler_y = StandardScaler()
        else:
            raise Exception("No valid scaler type found")
        if scaling == 'individual':
            # separate scalers for design and target columns
            self.scaler_X.fit(X.loc[:, X.columns != "Class"])
            self.scaler_y.fit(y.loc[:, y.columns != "Class"])
        elif scaling == 'global':
            # one shared scaler fitted on design and target stacked together
            self.scaler_X.fit(pd.concat([X.loc[:, X.columns != "Class"], y.loc[:, y.columns != "Class"]], axis=0))
            self.scaler_y = self.scaler_X
        self.state['scale'] = True
    def scale_transform(self, X_train, X_test, y_train, y_test):
        # transform returns DataFrames (set_config above), so the scaled
        # features can be concatenated with the untouched Class column
        X_train = pd.concat([self.scaler_X.transform(X_train.loc[:, X_train.columns != "Class"]), X_train.loc[:, "Class"]], axis=1)
        X_test = pd.concat([self.scaler_X.transform(X_test.loc[:, X_test.columns != "Class"]), X_test.loc[:, "Class"]], axis=1)
        y_train = pd.concat([self.scaler_y.transform(y_train.loc[:, y_train.columns != "Class"]), y_train.loc[:, "Class"]], axis=1)
        y_test = pd.concat([self.scaler_y.transform(y_test.loc[:, y_test.columns != "Class"]), y_test.loc[:, "Class"]], axis=1)
        return X_train, X_test, y_train, y_test

    def scale_inverse(self, X):
        if "Class" in X.columns:
            X = pd.concat([self.scaler_X.inverse_transform(X.loc[:, X.columns != "Class"]), X.loc[:, "Class"]], axis=1)
        else:
            X = self.scaler_X.inverse_transform(X)
        return X
    def split(self, X, y, ratio=0.8):
        # train_test_split returns (X_train, X_test, y_train, y_test);
        # ratio is the training fraction
        X_train, X_test, y_train, y_test = sk.train_test_split(
            X, y, train_size=ratio, random_state=self.random_state
        )
        return X_train, y_train, X_test, y_test
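
# Example (sketch): a plausible end-to-end run; the step ordering and the
# input DataFrames X, y are assumptions about the calling code.
#
#     preprocess = preprocessing(func_dict_in, func_dict_out)
#     X, y = preprocess.funcTransform(X, y)
#     X, y = preprocess.cluster(X, y, species="Barite")
#     X_train, y_train, X_test, y_test = preprocess.split(X, y, ratio=0.8)
#     X_train, y_train = preprocess.balancer(X_train, y_train, strategy="over")
#     preprocess.scale_fit(X_train, y_train, scaling="individual", type="minmax")
#     X_train, X_test, y_train, y_test = preprocess.scale_transform(X_train, X_test, y_train, y_test)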