encapsulate functionality

This commit is contained in:
Hannes Signer 2025-01-22 17:36:06 +01:00
parent a4c419c618
commit 78a57654a3
2 changed files with 381 additions and 1181 deletions

File diff suppressed because one or more lines are too long

View File

@ -13,6 +13,7 @@ from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from collections import Counter
import os
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# preprocessing pipeline
#
@ -46,18 +47,19 @@ class FuncTransform():
self.func_transform = func_transform
self.func_inverse = func_inverse
def fit(self, X):
def fit(self, X, y=None):
return self
def transform(self, X):
def transform(self, X, y=None):
X = X.copy()
for key in X.keys():
if "Class" not in key:
X[key] = X[key].apply(self.func_transform[key])
return X
def fit_transform(self, X):
return self.fit(X).transform(X)
def fit_transform(self, X, y=None):
self.fit(X)
return self.transform(X, y)
def inverse_transform(self, X_log):
X_log = X_log.copy()
@ -66,38 +68,112 @@ class FuncTransform():
X_log[key] = X_log[key].apply(self.func_inverse[key])
return X_log
class DataSetSampling():
def __init__(self, X, y, sampling_strategy):
self.X = X
self.y = y
self.sampling_strategy = sampling_strategy
def clustering(X, n_clusters=2, random_state=42, x_length=50, y_length=50):
'''
Function to cluster data with KMeans.
'''
class_labels = np.array([])
grid_length = x_length * y_length
iterations = int(len(X) / grid_length)
for i in range(0, iterations):
field = np.array(X['Barite'][(i*grid_length):(i*grid_length+grid_length)]
).reshape(x_length, y_length)
kmeans = KMeans(n_clusters=n_clusters, random_state=random_state).fit(
field.reshape(-1, 1))
class_labels = np.append(class_labels.astype(int), kmeans.labels_)
def fit(self, X):
pass
if("Class" in X.columns and "Class" in X.columns):
print("Class column already exists")
else:
class_labels_df = pd.DataFrame(class_labels, columns=['Class'])
X_clustered = pd.concat([X, class_labels_df], axis=1)
def transform(self):
pass
class Scaling():
def __init__(self, X, scaling_strategy):
self.X = X
self.scaler = scaling_strategy
return X_clustered
def balancer(design, target, strategy, sample_fraction=0.5):
def fit(self, X):
pass
def transform(self):
pass
def fit_transform(self, X):
pass
def inverse_transform(self, X):
pass
number_features = (design.columns != "Class").sum()
if("Class" not in design.columns):
if("Class" in target.columns):
classes = target['Class']
else:
raise Exception("No class column found")
else:
classes = design['Class']
counter = classes.value_counts()
print("Amount class 0 before:", counter[0] / (counter[0] + counter[1]) )
print("Amount class 1 before:", counter[1] / (counter[0] + counter[1]) )
df = pd.concat([design.loc[:,design.columns != "Class"], target.loc[:, target.columns != "Class"], classes], axis=1)
if strategy == 'smote':
print("Using SMOTE strategy")
smote = SMOTE(sampling_strategy=sample_fraction)
df_resampled, classes_resampled = smote.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
elif strategy == 'over':
print("Using Oversampling")
over = RandomOverSampler()
df_resampled, classes_resampled = over.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
elif strategy == 'under':
print("Using Undersampling")
under = RandomUnderSampler()
df_resampled, classes_resampled = under.fit_resample(df.loc[:, df.columns != "Class"], df.loc[:, df.columns == "Class"])
else:
classes_resampled = classes
counter = classes_resampled["Class"].value_counts()
print("Amount class 0 after:", counter[0] / (counter[0] + counter[1]) )
print("Amount class 1 after:", counter[1] / (counter[0] + counter[1]) )
design_resampled = pd.concat([df_resampled.iloc[:,0:number_features], classes_resampled], axis=1)
target_resampled = pd.concat([df_resampled.iloc[:,number_features:], classes_resampled], axis=1)
return design_resampled, target_resampled
def plot_simulation(X, timestep, component='Barite', x_length=50, y_length=50):
grid_length = x_length * y_length
max_iter = int(len(X) / grid_length)
if(timestep >= max_iter):
raise Exception("timestep is not in the simulation range")
plt.imshow(np.array(X[component][(timestep*grid_length):(timestep*grid_length+grid_length)]).reshape(x_length,y_length), interpolation='bicubic', origin='lower')
if("Class" in X.columns):
plt.contour(np.array(X['Class'][(timestep*grid_length):(timestep*grid_length+grid_length)]).reshape(x_length,y_length), levels=[0.1], colors='red', origin='lower')
plt.show()
def preprocessing(df_design, df_targets, func_dict_in, func_dict_out, sampling, test_size):
df_design = clustering(df_design)
df_design_log = FuncTransform(func_dict_in, func_dict_out).fit_transform(df_design)
df_results_log = FuncTransform(func_dict_in, func_dict_out).fit_transform(df_targets)
X_train, X_test, y_train, y_test = sk.train_test_split(df_design_log, df_results_log, test_size = test_size, random_state=42)
X_train, y_train = balancer(X_train, y_train, sampling)
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
X_train = scaler_X.fit_transform(X_train)
X_test = scaler_X.transform(X_test)
y_train = scaler_y.fit_transform(y_train)
y_test = scaler_y.transform(y_test)
X_train, X_val, y_train, y_val = sk.train_test_split(X_train, y_train, test_size = 0.1)
return X_train, X_val, X_test, y_train, y_val, y_test