import numpy as np
from tensorflow.keras.models import load_model
import joblib


class RTUAnomalizer:
    """Run a prediction model and score its residuals against k-means models.

    The class wraps one Keras model (used for prediction) and a list of
    fitted clustering models (used to measure how far residuals lie from
    each model's first cluster centre).

    Attributes:
        model: The loaded prediction model, or ``None`` until
            :meth:`load_models` has been called.
        kmeans_models: The loaded clustering models, in the order their
            paths were given. Each instance owns its own list.
        num_inputs: Stored as given; not read by any method in this class.
        num_outputs: Number of leading columns of ``df_trans`` that the
            prediction model outputs.
    """

    # Class-level fallbacks kept for backward compatibility with code that
    # reads these names on the class. Instances get their own values in
    # __init__, so the shared list below is never mutated by this class.
    model = None
    kmeans_models = []

    # Row of ``df_trans`` that every method treats as the current sample.
    # NOTE(review): presumably the row right after a 30-row input window,
    # matching the default list size in ``initialize_lists`` - confirm.
    _TARGET_ROW = 30

    def __init__(
        self,
        prediction_model_path=None,
        clustering_model_paths=None,
        num_inputs=None,
        num_outputs=None,
    ):
        """Store the configuration and optionally load the models.

        Models are loaded only when both ``prediction_model_path`` and
        ``clustering_model_paths`` are provided.
        """
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        # Per-instance state: a class-level list would be shared (and keep
        # growing) across every instance that loads models.
        self.model = None
        self.kmeans_models = []
        if prediction_model_path is not None and clustering_model_paths is not None:
            self.load_models(prediction_model_path, clustering_model_paths)

    def initialize_lists(self, size=30):
        """Return three independent lists of ``size`` zeros.

        Returns:
            A tuple ``(actual_list, pred_list, resid_list)``.
        """
        initial_values = [0] * size
        return initial_values.copy(), initial_values.copy(), initial_values.copy()

    def load_models(self, prediction_model_path, clustering_model_paths):
        """Load the prediction model and the clustering models from disk.

        Any previously loaded models on this instance are replaced, so the
        position of a clustering model always matches the position of its
        path in ``clustering_model_paths``.

        Args:
            prediction_model_path: Path passed to Keras ``load_model``.
            clustering_model_paths: Iterable of paths passed to
                ``joblib.load``.
        """
        self.model = load_model(prediction_model_path)
        # NOTE: joblib.load unpickles arbitrary objects; only load files
        # from trusted sources.
        self.kmeans_models = [joblib.load(path) for path in clustering_model_paths]

    def predict(self, df_new):
        """Return ``self.model.predict(df_new)``."""
        return self.model.predict(df_new)

    def calculate_residuals(self, df_trans, pred):
        """Compute the residual between the current sample and ``pred``.

        Args:
            df_trans: Array indexable as ``df_trans[row, col_slice]``; the
                first ``num_outputs`` columns of row 30 are the actual values.
            pred: Prediction to subtract from the actual values.

        Returns:
            A tuple ``(actual, resid)`` where ``resid = actual - pred``.
        """
        actual = df_trans[self._TARGET_ROW, : self.num_outputs]
        resid = actual - pred
        return actual, resid

    def resize_prediction(self, pred, df_trans):
        """Widen ``pred`` with the non-predicted columns of the current sample.

        The columns of row 30 of ``df_trans`` from ``num_outputs`` onwards are
        appended to every row of ``pred``, so the result has as many columns
        as ``df_trans``.

        Args:
            pred: 2-D array of predictions, one row per sample.
            df_trans: Array providing the trailing columns.

        Returns:
            A new array of shape ``(pred.shape[0], pred.shape[1] + k)`` with
            the dtype of ``pred``, where ``k`` is the number of appended
            columns. ``pred`` itself is not modified.
        """
        trailing = np.asarray(df_trans[self._TARGET_ROW, self.num_outputs :])
        n_rows, n_pred_cols = pred.shape[0], pred.shape[1]
        # Build the wider array explicitly. np.resize refills from the
        # flattened data, which misplaces values when pred has more than
        # one row, and the negative-length slice breaks when there are no
        # trailing columns.
        resized = np.empty((n_rows, n_pred_cols + trailing.shape[0]), dtype=pred.dtype)
        resized[:, :n_pred_cols] = pred
        resized[:, n_pred_cols:] = trailing
        return resized

    def inverse_transform(self, scaler, pred, df_trans):
        """Apply ``scaler.inverse_transform`` to the prediction and the sample.

        Args:
            scaler: Object exposing ``inverse_transform``.
            pred: Widened prediction (see :meth:`resize_prediction`).
            df_trans: Array whose row 30 is the current sample.

        Returns:
            A tuple ``(actual, pred)``; note that ``actual`` comes first.
        """
        pred = scaler.inverse_transform(np.array(pred))
        # Wrap the row in a list so the scaler receives a 2-D, single-row
        # array.
        actual = scaler.inverse_transform(np.array([df_trans[self._TARGET_ROW, :]]))
        return actual, pred

    def update_lists(self, actual_list, pred_list, resid_list, actual, pred, resid):
        """Slide the three history lists forward by one sample.

        The oldest entry of each list is dropped and the value at row 0,
        column 1 of the matching array is appended. The lists are modified
        in place and also returned.

        Raises:
            IndexError: If any of the lists is empty.
        """
        actual_list.pop(0)
        pred_list.pop(0)
        resid_list.pop(0)
        actual_list.append(actual[0, 1])
        pred_list.append(pred[0, 1])
        resid_list.append(resid[0, 1])
        return actual_list, pred_list, resid_list

    def calculate_distances(self, resid):
        """Measure how far the residuals lie from each clustering model.

        Model ``i`` is compared against columns ``7 * i + 1`` up to (but not
        including) ``7 * i + 8`` of ``resid``, and only against that model's
        first cluster centre.

        Args:
            resid: 2-D residual array, one row per sample.

        Returns:
            An array with one entry per clustering model, each holding the
            per-row Euclidean distances.
        """
        # NOTE(review): the block width of 7 and the offset of 1 are fixed
        # here; presumably they mirror the feature layout the clustering
        # models were fitted on - confirm.
        block_width = 7
        distances = [
            np.linalg.norm(
                resid[:, index * block_width + 1 : index * block_width + 1 + block_width]
                - kmeans.cluster_centers_[0],
                ord=2,
                axis=1,
            )
            for index, kmeans in enumerate(self.kmeans_models)
        ]
        return np.array(distances)

    def pipeline(self, df_new, df_trans, scaler):
        """Run prediction, residual calculation and distance scoring.

        The history lists are created fresh on every call, so the returned
        lists hold zeros followed by the single newest value. Residuals are
        computed before ``inverse_transform`` is applied, and the distances
        are computed from those residuals.

        Args:
            df_new: Input passed to the prediction model.
            df_trans: Array whose row 30 is the current sample.
            scaler: Object exposing ``inverse_transform``.

        Returns:
            A tuple ``(actual_list, pred_list, resid_list, dist)``.
        """
        actual_list, pred_list, resid_list = self.initialize_lists()
        pred = self.predict(df_new)
        actual, resid = self.calculate_residuals(df_trans, pred)
        pred = self.resize_prediction(pred, df_trans)
        actual, pred = self.inverse_transform(scaler, pred, df_trans)
        actual_list, pred_list, resid_list = self.update_lists(
            actual_list, pred_list, resid_list, actual, pred, resid
        )
        dist = self.calculate_distances(resid)
        return actual_list, pred_list, resid_list, dist