import joblib
import numpy as np
from tensorflow.keras.models import load_model


class RTUAnomalizer1:
    """
    Class for performing anomaly detection on RTU (Roof Top Unit) data.

    A prediction model estimates the output features of the newest sample;
    the residual between measurement and prediction is projected with one
    PCA model per unit and its distance to the first k-means cluster centre
    is tracked in a rolling window. A fault flag is raised for a unit when
    most of the window exceeds that unit's distance threshold.

    The numeric settings below were previously hard-coded literals; they are
    class attributes so that a subclass can adjust them in one place.
    """

    # Row of ``df_trans`` that holds the sample compared with the prediction.
    TARGET_ROW = 30
    # Number of entries kept in the rolling distance window
    # (the original comments call this "the 60 min window").
    DISTANCE_WINDOW = 60
    # ``pipeline`` reports threshold masks for ``distance_list[RECENT_START:]``.
    RECENT_START = 30
    # Each k-means/PCA pair ``i`` reads residual columns
    # ``[i * FEATURES_PER_UNIT + FEATURE_OFFSET, ... + FEATURES_PER_UNIT)``.
    FEATURES_PER_UNIT = 7
    FEATURE_OFFSET = 1
    # Width of the zero rows used to pre-fill the PCA-residual and distance
    # histories. Presumably 2 units x 2 PCA components and 2 units -- confirm
    # against the trained models.
    PCA_HISTORY_WIDTH = 4
    DISTANCE_WIDTH = 2
    # Distance thresholds per unit and the share of the window that must
    # exceed the threshold before a fault is flagged.
    RTU_1_THRESHOLD = 1
    RTU_2_THRESHOLD = 2.5
    FAULT_FRACTION = 0.8

    def __init__(
        self,
        prediction_model_path=None,
        clustering_model_paths=None,
        pca_model_paths=None,
        num_inputs=None,
        num_outputs=None,
    ):
        """
        Initialize the RTUAnomalizer object.

        Models are loaded only when all three path arguments are given;
        otherwise ``model`` stays ``None`` and the model lists stay empty
        until ``load_models`` is called.

        Args:
            prediction_model_path (str): Path to the prediction model file.
            clustering_model_paths (list): List of paths to the clustering model files.
            pca_model_paths (list): List of paths to the PCA model files,
                in the same order as ``clustering_model_paths``.
            num_inputs (int): Number of input features.
            num_outputs (int): Number of output features.
        """
        self.model = None
        self.kmeans_models = []
        self.pca_models = []
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs

        paths = (prediction_model_path, clustering_model_paths, pca_model_paths)
        if all(path is not None for path in paths):
            self.load_models(*paths)

        (
            self.actual_list,
            self.pred_list,
            self.resid_list,
            self.resid_pca_list,
            self.distance_list,
        ) = self.initialize_lists()

        self.fault_1 = 0
        self.fault_2 = 0

    def initialize_lists(self, size=30):
        """
        Build the zero-filled rolling histories.

        Args:
            size (int): Number of rows in the actual, predicted, residual and
                PCA-residual histories. The distance history always has
                ``DISTANCE_WINDOW`` rows.

        Returns:
            tuple: Five lists of zero rows, in this order: actual values,
            predicted values, residuals (``size`` rows of ``num_outputs``
            zeros each), PCA residuals (``size`` rows of
            ``PCA_HISTORY_WIDTH`` zeros) and distances (``DISTANCE_WINDOW``
            rows of ``DISTANCE_WIDTH`` zeros).
        """
        # ``num_outputs`` defaults to None; fall back to empty rows so the
        # all-default constructor no longer fails with
        # ``TypeError: can't multiply sequence by non-int``.
        width = self.num_outputs or 0

        def zero_rows(count, row_width):
            # One fresh list per row. ``[[0] * w] * n`` would alias a single
            # row object n times, so mutating one row would change them all.
            return [[0] * row_width for _ in range(count)]

        return (
            zero_rows(size, width),
            zero_rows(size, width),
            zero_rows(size, width),
            zero_rows(size, self.PCA_HISTORY_WIDTH),
            zero_rows(self.DISTANCE_WINDOW, self.DISTANCE_WIDTH),
        )

    def load_models(
        self, prediction_model_path, clustering_model_paths, pca_model_paths
    ):
        """
        Load the prediction, clustering and PCA models.

        The loaded clustering and PCA models are appended to
        ``kmeans_models`` / ``pca_models``; calling this method again adds
        to the lists rather than replacing them.

        Args:
            prediction_model_path (str): Path to the prediction model file.
            clustering_model_paths (list): List of paths to the clustering model files.
            pca_model_paths (list): List of paths to the PCA model files.
        """
        self.model = load_model(prediction_model_path)

        # SECURITY NOTE: joblib.load unpickles the file and can therefore
        # execute arbitrary code. Only pass paths to trusted model files.
        self.kmeans_models.extend(
            joblib.load(path) for path in clustering_model_paths
        )
        self.pca_models.extend(joblib.load(path) for path in pca_model_paths)

    def predict(self, df_new):
        """
        Make predictions using the prediction model.

        Args:
            df_new: Input data passed unchanged to ``self.model.predict``.

        Returns:
            array: Predicted values.
        """
        return self.model.predict(df_new, verbose=0)

    def calculate_residuals(self, df_trans, pred):
        """
        Calculate the residuals between actual and predicted values.

        Args:
            df_trans: Transformed input data, indexed as
                ``df_trans[row, columns]`` (presumably a 2-D NumPy array with
                at least ``TARGET_ROW + 1`` rows -- confirm with the caller).
            pred (array): Predicted values.

        Returns:
            tuple: ``(actual, resid)`` where ``actual`` is the first
            ``num_outputs`` columns of row ``TARGET_ROW`` and ``resid`` is
            ``actual - pred``.
        """
        actual = df_trans[self.TARGET_ROW, : self.num_outputs]
        return actual, actual - pred

    def resize_prediction(self, pred, df_trans):
        """
        Widen the prediction to the full feature width of ``df_trans``.

        The columns of row ``TARGET_ROW`` that follow the first
        ``num_outputs`` columns are appended to every prediction row, so the
        result can be passed to a scaler fitted on all features.

        Args:
            pred (array): Predicted values, shape ``(rows, outputs)``.
            df_trans: Transformed input data (see ``calculate_residuals``).

        Returns:
            array: New array of shape ``(rows, outputs + extra)`` with the
            dtype of ``pred``.
        """
        pred = np.asarray(pred)
        tail = df_trans[self.TARGET_ROW, self.num_outputs :]
        n_rows, n_pred = pred.shape

        # Build the result explicitly instead of np.resize + negative slice:
        #  * np.resize pads by repeating the flattened input, which scrambles
        #    the rows whenever there is more than one prediction row;
        #  * with no extra columns, ``pred[:, -0:]`` selects the *whole*
        #    array and the assignment fails to broadcast.
        widened = np.empty((n_rows, n_pred + len(tail)), dtype=pred.dtype)
        widened[:, :n_pred] = pred
        widened[:, n_pred:] = tail
        return widened

    def inverse_transform(self, scaler, pred, df_trans):
        """
        Inverse transform the predicted and actual values.

        Args:
            scaler (object): Object providing ``inverse_transform``.
            pred (array): Predicted values, already widened to the scaler's
                feature count (see ``resize_prediction``).
            df_trans: Transformed input data (see ``calculate_residuals``).

        Returns:
            tuple: ``(actual, pred)`` after inverse transformation; ``actual``
            is built from the full row ``TARGET_ROW`` of ``df_trans``.
        """
        pred = scaler.inverse_transform(np.array(pred))
        actual = scaler.inverse_transform(
            np.array([df_trans[self.TARGET_ROW, :]])
        )
        return actual, pred

    @staticmethod
    def _push(history, row):
        """Drop the oldest entry of ``history`` and append ``row``."""
        history.pop(0)
        history.append(row)

    def update_lists(self, actual, pred, resid):
        """
        Push the newest sample into the actual, predicted and residual histories.

        The oldest entry of each history is discarded, so the lengths stay
        constant.

        Args:
            actual (array): Actual values.
            pred (array): Predicted values.
            resid (array): Residual values.

        Returns:
            tuple: The updated ``(actual_list, pred_list, resid_list)``.
        """
        self._push(self.actual_list, actual.flatten().tolist())
        self._push(self.pred_list, pred.flatten().tolist())
        self._push(self.resid_list, resid.flatten().tolist())
        return self.actual_list, self.pred_list, self.resid_list

    def calculate_distances(self, resid):
        """
        Calculate the distances between residuals and cluster centers.

        For every k-means model ``i`` the matching residual column block is
        projected with ``pca_models[i]`` and the Euclidean distance to that
        model's first cluster centre is computed. The flattened projections
        and distances are pushed into ``resid_pca_list`` and
        ``distance_list``.

        Args:
            resid (array): Residual values, shape ``(rows, features)``.

        Returns:
            array: Distances, one row per k-means model.
        """
        distances = []
        projections = []
        for i, kmeans in enumerate(self.kmeans_models):
            start = i * self.FEATURES_PER_UNIT + self.FEATURE_OFFSET
            stop = start + self.FEATURES_PER_UNIT
            projected = self.pca_models[i].transform(resid[:, start:stop])
            projections.extend(projected.tolist())
            distances.append(
                np.linalg.norm(
                    projected - kmeans.cluster_centers_[0], ord=2, axis=1
                )
            )

        # Do every step that can raise *before* touching the histories, so a
        # failure cannot leave the rolling windows one entry short.
        newest_distances = np.concatenate(distances).tolist()
        newest_projection = np.array(projections).flatten().tolist()

        self._push(self.distance_list, newest_distances)
        self._push(self.resid_pca_list, newest_projection)
        return np.array(distances)

    def fault_windowing(self):
        """
        Update ``fault_1`` and ``fault_2`` from the distance window.

        A flag becomes 1 when more than ``FAULT_FRACTION`` of
        ``DISTANCE_WINDOW`` entries in ``distance_list`` exceed the unit's
        threshold, and 0 otherwise.
        """
        per_unit = np.array(self.distance_list).T
        min_exceedances = self.FAULT_FRACTION * self.DISTANCE_WINDOW

        exceed_1 = np.count_nonzero(per_unit[0] > self.RTU_1_THRESHOLD)
        exceed_2 = np.count_nonzero(per_unit[1] > self.RTU_2_THRESHOLD)
        self.fault_1 = int(exceed_1 > min_exceedances)
        self.fault_2 = int(exceed_2 > min_exceedances)

    def pipeline(self, df_new, df_trans, scaler):
        """
        Perform the anomaly detection pipeline.

        Args:
            df_new: Input data for prediction.
            df_trans: Transformed input data (see ``calculate_residuals``).
            scaler (object): Scaler object for inverse transformation.

        Returns:
            tuple: ``(actual_list, pred_list, resid_list, resid_pca_list,
            dist, rtu_1_exceeded, rtu_2_exceeded, fault_1, fault_2)``. The
            two ``*_exceeded`` entries are boolean arrays comparing
            ``distance_list[RECENT_START:]`` with each unit's threshold.
        """
        pred = self.predict(df_new)
        # Residuals are taken in the transformed space, before the
        # prediction is widened and inverse-transformed.
        actual, resid = self.calculate_residuals(df_trans, pred)
        pred = self.resize_prediction(pred, df_trans)
        actual, pred = self.inverse_transform(scaler, pred, df_trans)
        actual_list, pred_list, resid_list = self.update_lists(
            actual, pred, resid
        )
        dist = self.calculate_distances(resid)
        self.fault_windowing()

        recent = np.array(self.distance_list[self.RECENT_START :]).T
        return (
            actual_list,
            pred_list,
            resid_list,
            self.resid_pca_list,
            dist,
            recent[0] > self.RTU_1_THRESHOLD,
            recent[1] > self.RTU_2_THRESHOLD,
            self.fault_1,
            self.fault_2,
        )