adapt notebook for new dataset

Hannes Signer 2025-03-26 13:35:17 +01:00
parent c7b89505c2
commit 52940efdb9
2 changed files with 233 additions and 458 deletions

File diff suppressed because one or more lines are too long


@@ -21,7 +21,7 @@ from importlib import reload
set_config(transform_output="pandas")
-def model_definition(architecture):
+def model_definition(architecture, n_input, n_output):
"""Definition of the respective AI model. Three models are currently being analysed, which are labelled small, large or paper.
Args:
@@ -34,66 +34,71 @@ def model_definition(architecture):
if architecture == "small":
model = keras.Sequential(
[
-                keras.Input(shape=(8,), dtype=dtype),
+                keras.Input(shape=(n_input,), dtype=dtype),
keras.layers.Dense(units=128, dtype=dtype),
LeakyReLU(negative_slope=0.01),
# Dropout(0.2),
keras.layers.Dense(units=128, dtype=dtype),
LeakyReLU(negative_slope=0.01),
-                keras.layers.Dense(units=8, dtype=dtype),
+                keras.layers.Dense(units=n_output, dtype=dtype),
]
)
elif architecture == "large":
model = keras.Sequential(
[
-                keras.layers.Input(shape=(8,), dtype=dtype),
+                keras.layers.Input(shape=(n_input,), dtype=dtype),
keras.layers.Dense(512, dtype=dtype),
LeakyReLU(negative_slope=0.01),
keras.layers.Dense(1024, dtype=dtype),
LeakyReLU(negative_slope=0.01),
keras.layers.Dense(512, dtype=dtype),
LeakyReLU(negative_slope=0.01),
-                keras.layers.Dense(8, dtype=dtype),
+                keras.layers.Dense(n_output, dtype=dtype),
]
)
elif architecture == "large_batch_normalization":
model = keras.Sequential([
-            keras.layers.Input(shape=(8,), dtype=dtype),
-            BatchNormalization(),
+            keras.layers.Input(shape=(n_input,), dtype=dtype),
+            BatchNormalization(),
-            Dense(512, dtype=dtype),
-            LeakyReLU(negative_slope=0.01),
-            # BatchNormalization(),
+            Dense(512, dtype=dtype),
+            LeakyReLU(negative_slope=0.01),
+            Dropout(0.05),
+            BatchNormalization(),
-            Dense(1024, dtype=dtype),
-            LeakyReLU(negative_slope=0.01),
-            # BatchNormalization(),
+            Dense(1024, dtype=dtype),
+            LeakyReLU(negative_slope=0.01),
+            Dropout(0.05),
+            BatchNormalization(),
-            Dense(512, dtype=dtype),
-            LeakyReLU(negative_slope=0.01),
+            Dense(512, dtype=dtype),
+            Dropout(0.05),
+            LeakyReLU(negative_slope=0.01),
-            Dense(8, dtype=dtype),
+            Dense(n_output, dtype=dtype),
])
elif architecture == "large_self_normalization":
model = keras.Sequential([
-            keras.layers.Input(shape=(8,), dtype=dtype),
-            Dense(512, activation='selu', kernel_initializer='lecun_normal', dtype=dtype),
-            AlphaDropout(0.05),
-            Dense(1024, activation='selu', kernel_initializer='lecun_normal',dtype=dtype),
-            AlphaDropout(0.05),
-            Dense(512, activation='selu', kernel_initializer='lecun_normal',dtype=dtype),
-            AlphaDropout(0.05),
-            Dense(8, dtype=dtype),
+            keras.layers.Input(shape=(n_input,), dtype=dtype),
+            Dense(512, activation='selu',
+                  kernel_initializer='lecun_normal', dtype=dtype),
+            AlphaDropout(0.05),
+            Dense(1024, activation='selu',
+                  kernel_initializer='lecun_normal', dtype=dtype),
+            AlphaDropout(0.05),
+            Dense(512, activation='selu',
+                  kernel_initializer='lecun_normal', dtype=dtype),
+            AlphaDropout(0.05),
+            Dense(n_output, dtype=dtype),
])
elif architecture == "paper":
model = keras.Sequential(
[
-                keras.layers.Input(shape=(8,), dtype=dtype),
+                keras.layers.Input(shape=(n_input,), dtype=dtype),
keras.layers.Dense(128, dtype=dtype),
LeakyReLU(negative_slope=0.01),
keras.layers.Dense(256, dtype=dtype),
@@ -102,14 +107,13 @@ def model_definition(architecture):
LeakyReLU(negative_slope=0.01),
keras.layers.Dense(256, dtype=dtype),
LeakyReLU(negative_slope=0.01),
-                keras.layers.Dense(8, dtype=dtype),
+                keras.layers.Dense(n_output, dtype=dtype),
]
)
else:
        raise Exception(
            "No valid architecture found. "
            + "Choose between 'small', 'large', 'large_batch_normalization', "
            + "'large_self_normalization' or 'paper'."
        )
return model
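For context, a minimal standalone sketch of how the parametrized signature can be used, with input/output widths taken from the data instead of the previous hard-coded 8 columns (variable names and feature counts here are illustrative, not from the notebook):

import numpy as np
import keras

def small_model(n_input, n_output, dtype="float32"):
    # same layer pattern as the "small" branch above, with data-driven widths
    return keras.Sequential([
        keras.Input(shape=(n_input,), dtype=dtype),
        keras.layers.Dense(128, dtype=dtype),
        keras.layers.LeakyReLU(negative_slope=0.01),
        keras.layers.Dense(128, dtype=dtype),
        keras.layers.LeakyReLU(negative_slope=0.01),
        keras.layers.Dense(n_output, dtype=dtype),
    ])

X = np.random.rand(32, 12).astype("float32")  # e.g. 12 input species
model = small_model(n_input=X.shape[1], n_output=5)
print(model.output_shape)  # (None, 5)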
@@ -139,7 +143,7 @@ def custom_loss(
h1: hyperparameter for the importance of the huber loss
h2: hyperparameter for the importance of the Barium mass balance term
h3: hyperparameter for the importance of the Strontium mass balance term
-        scaler_type: Normalization approach. Choose between "standard" and "minmax". Defaults to "minmax".
+        scaler_type: Normalization approach. Choose between "standard", "minmax" and "none". Defaults to "minmax".
        loss_variant: Loss function approach. Choose between "huber" and "huber_mass_balance". Defaults to "huber".
delta: Hyperparameter for the Huber function threshold. Defaults to 1.0.
@@ -152,37 +156,31 @@ def custom_loss(
if preprocess.scaler_type != scaler_type:
raise Exception(
"Data normalized with scaler different than specified for the training. Compare the scaling approach on preprocessing and training.")
"Data normalized with scaler different than specified for the training. Compare the scaling approach on preprocessing and training. Scaler type on preprocessing was {0} and on training {1}.".format(
preprocess.scaler_type, scaler_type))
-    try:
-        if scaler_type == "minmax":
-            scale_X = tf.convert_to_tensor(
-                preprocess.scaler_X.data_range_, dtype=tf.float32
-            )
-            min_X = tf.convert_to_tensor(
-                preprocess.scaler_X.data_min_, dtype=tf.float32
-            )
-            scale_y = tf.convert_to_tensor(
-                preprocess.scaler_y.data_range_, dtype=tf.float32
-            )
-            min_y = tf.convert_to_tensor(
-                preprocess.scaler_y.data_min_, dtype=tf.float32
-            )
-        elif scaler_type == "standard":
-            scale_X = tf.convert_to_tensor(
-                preprocess.scaler_X.scale_, dtype=tf.float32)
-            mean_X = tf.convert_to_tensor(
-                preprocess.scaler_X.mean_, dtype=tf.float32)
-            scale_y = tf.convert_to_tensor(
-                preprocess.scaler_y.scale_, dtype=tf.float32)
-            mean_y = tf.convert_to_tensor(
-                preprocess.scaler_y.mean_, dtype=tf.float32)
-    except AttributeError:
-        raise Exception(
-            "Data normalized with scaler different than specified for the training. Compare the scaling approach on preprocessing and training."
-        )
+    if scaler_type == "minmax":
+        scale_X = tf.convert_to_tensor(
+            preprocess.scaler_X.data_range_, dtype=tf.float32
+        )
+        min_X = tf.convert_to_tensor(
+            preprocess.scaler_X.data_min_, dtype=tf.float32
+        )
+        scale_y = tf.convert_to_tensor(
+            preprocess.scaler_y.data_range_, dtype=tf.float32
+        )
+        min_y = tf.convert_to_tensor(
+            preprocess.scaler_y.data_min_, dtype=tf.float32
+        )
+    elif scaler_type == "standard":
+        scale_X = tf.convert_to_tensor(
+            preprocess.scaler_X.scale_, dtype=tf.float32)
+        mean_X = tf.convert_to_tensor(
+            preprocess.scaler_X.mean_, dtype=tf.float32)
+        scale_y = tf.convert_to_tensor(
+            preprocess.scaler_y.scale_, dtype=tf.float32)
+        mean_y = tf.convert_to_tensor(
+            preprocess.scaler_y.mean_, dtype=tf.float32)
def loss(results, predicted):
# inverse min/max scaling
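The tensors prepared above let the loss undo the normalization before physical quantities such as the Barium and Strontium mass balances are computed. A small sketch of the inverse transforms, assuming scikit-learn's MinMaxScaler/StandardScaler conventions (the function names are illustrative, not from the repository):

import tensorflow as tf

def inverse_minmax(y_scaled, scale_y, min_y):
    # MinMaxScaler: y_scaled = (y - data_min_) / data_range_
    return y_scaled * scale_y + min_y

def inverse_standard(y_scaled, scale_y, mean_y):
    # StandardScaler: y_scaled = (y - mean_) / scale_
    return y_scaled * scale_y + mean_y

scale_y = tf.constant([2.0, 10.0])
min_y = tf.constant([1.0, -5.0])
print(inverse_minmax(tf.constant([[0.5, 0.5]]), scale_y, min_y))  # [[2. 0.]]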
@@ -358,11 +356,11 @@ def mass_balance_evaluation(model, X, preprocess):
prediction = pd.DataFrame(model.predict(X[columns]), columns=columns)
# backtransform min/max or standard scaler
    if preprocess.scaler_X is not None:
X = pd.DataFrame(
-            preprocess.scaler_X.inverse_transform(X.iloc[:, X.columns != "Class"]),
-            columns=columns,
+            preprocess.scaler_X.inverse_transform(
+                X.iloc[:, X.columns != "Class"]),
+            columns=columns,
)
prediction = pd.DataFrame(
preprocess.scaler_y.inverse_transform(prediction), columns=columns
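The exclude-the-label pattern used here can be reproduced in isolation; a sketch with illustrative column names, assuming scikit-learn scalers:

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

df = pd.DataFrame({"Ba": [0.1, 0.9], "Sr": [0.2, 0.8], "Class": [0, 1]})
features = df.columns[df.columns != "Class"]   # keep the cluster label out
scaler = MinMaxScaler().fit(df[features])
scaled = scaler.transform(df[features])
restored = pd.DataFrame(scaler.inverse_transform(scaled), columns=features)
print(restored)  # recovers the original Ba/Sr values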
@@ -429,6 +427,7 @@ class preprocessing:
        self.func_dict_out = func_dict_out
self.state = {"cluster": False, "log": False,
"balance": False, "scale": False}
self.scaler_type = "none"
def funcTranform(self, *args):
"""Apply the transformation function to the data columnwise.
@@ -456,7 +455,7 @@ class preprocessing:
self.state["log"] = False
return args
-    def cluster(self, X, y, species="Barite", n_clusters=2, x_length=50, y_length=50):
+    def cluster_kmeans(self, X, y, species="Barite", n_clusters=2, x_length=50, y_length=50):
"""Apply k-means clustering to the data to differentiate betweeen reactive and non-reactive cells.
Args:
@@ -470,10 +469,10 @@ class preprocessing:
Returns:
X, y dataframes with an additional column "Class" containing the cluster labels.
"""
class_labels = np.array([])
grid_length = x_length * y_length
iterations = int(len(X) / grid_length)
# calculate the cluster for each chemical iteration step
for i in range(0, iterations):
field = np.array(
@@ -483,7 +482,6 @@ class preprocessing:
field.reshape(-1, 1)
)
class_labels = np.append(class_labels.astype(int), kmeans.labels_)
if "Class" in X.columns and "Class" in y.columns:
print("Class column already exists")
else:
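A standalone sketch of the per-snapshot loop shown in this hunk: every chemical iteration step is one flattened x_length * y_length grid, clustered independently into reactive and non-reactive cells (the values are random stand-ins for the concentration field):

import numpy as np
from sklearn.cluster import KMeans

x_length, y_length, n_clusters = 4, 4, 2
grid_length = x_length * y_length
values = np.random.rand(3 * grid_length)          # three stacked snapshots
class_labels = np.array([], dtype=int)
for i in range(len(values) // grid_length):
    field = values[i * grid_length:(i + 1) * grid_length]
    kmeans = KMeans(n_clusters=n_clusters, n_init=10).fit(field.reshape(-1, 1))
    class_labels = np.append(class_labels, kmeans.labels_)
print(class_labels.shape)  # (48,)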
@@ -494,6 +492,22 @@ class preprocessing:
return X, y
+    def cluster_manual(self, X, y, species="Cl", threshold=1E-10):
+        """Label cells manually: class 1 where the species concentration exceeds the threshold, else 0."""
+        if "Class" in X.columns or "Class" in y.columns:
+            raise Exception("Class column already exists")
+        label = np.zeros(len(X))
+        label[X[species] > threshold] = 1
+        X["Class"] = label
+        y["Class"] = label
+        return X, y
def balancer(self, X, y, strategy, sample_fraction=0.5):
"""Apply sampling strategies to balance the dataset.
@@ -570,7 +584,7 @@ class preprocessing:
self.state["balance"] = True
return design_resampled, target_resampled
def scale_fit(self, X, y, scaling, type="Standard"):
def scale_fit(self, X, y, scaling, type="standard"):
self.scaler_type = type
"""Fit a scaler for data preprocessing.
@@ -697,7 +711,8 @@ class preprocessing:
Returns:
Elements with selected class label.
"""
+        result = []
        for i in args:
-            i = i[i["Class"] == class_label]
+            result.append(i[i["Class"] == class_label])
-        return args
+        return result
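This hunk fixes a silent bug: rebinding the loop variable never changed args, so the old version returned the inputs unfiltered. A compact standalone equivalent of the corrected selector (function and column names as in the diff, data illustrative):

import pandas as pd

def select_class(class_label, *args):
    # return filtered copies instead of the untouched inputs
    return [df[df["Class"] == class_label] for df in args]

X = pd.DataFrame({"v": [1, 2], "Class": [0, 1]})
y = pd.DataFrame({"w": [3, 4], "Class": [0, 1]})
X1, y1 = select_class(1, X, y)
print(len(X1), len(y1))  # 1 1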