import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from tensorflow.keras.models import load_model
import joblib


class VAVAnomalizer:
    """Residual-based anomaly scoring for a VAV (Variable Air Volume) system.

    For each call to :meth:`pipeline` the class predicts the output features,
    subtracts the observed values to get a residual, projects that residual
    with a PCA model and measures its distance to the first KMeans cluster
    centre. It also keeps fixed-length histories of the actual values,
    predictions, residuals and PCA-projected residuals.
    """

    # Row of ``df_trans`` that every method reads as the observation being
    # scored.
    # NOTE(review): presumably the row that follows a 30-step lookback
    # window -- confirm against the caller that builds ``df_trans``.
    TARGET_ROW = 30

    def __init__(
        self,
        rtu_id: int,
        prediction_model_path: str,
        clustering_model_path: str,
        pca_model_path: str,
        num_inputs: int,
        num_outputs: int,
    ):
        """
        Initializes a VAVAnomalizer object.

        Loads the three models from disk and creates the zero-filled
        history lists.

        Args:
            rtu_id (int): The ID of the RTU (Roof Top Unit) associated with
                the VAV (Variable Air Volume) system.
            prediction_model_path (str): The file path to the prediction model.
            clustering_model_path (str): The file path to the clustering model.
            pca_model_path (str): The file path to the PCA model.
            num_inputs (int): The number of input features for the prediction
                model. Stored on the instance; not otherwise used by this
                class.
            num_outputs (int): The number of output features for the
                prediction model. The first ``num_outputs`` columns of
                ``df_trans`` are treated as the observed outputs.
        """
        self.rtu_id = rtu_id
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        self.load_models(prediction_model_path, clustering_model_path, pca_model_path)
        (
            self.actual_list,
            self.pred_list,
            self.resid_list,
            self.resid_pca_list,
        ) = self.initialize_lists()

    def load_models(
        self,
        prediction_model_path: str,
        clustering_model_path: str,
        pca_model_path: str,
    ) -> None:
        """
        Loads the prediction, PCA and clustering models onto the instance.

        Sets ``self.model``, ``self.pca_model`` and ``self.kmeans_model``.

        Args:
            prediction_model_path (str): The file path to the prediction
                model (read with Keras ``load_model``).
            clustering_model_path (str): The file path to the clustering
                model (read with ``joblib.load``).
            pca_model_path (str): The file path to the PCA model (read with
                ``joblib.load``).
        """
        self.model = load_model(prediction_model_path)
        # SECURITY: joblib.load unpickles the file and can therefore execute
        # arbitrary code. Only point these paths at trusted artefacts.
        self.pca_model: PCA = joblib.load(pca_model_path)
        self.kmeans_model: KMeans = joblib.load(clustering_model_path)

    def initialize_lists(self, size=30):
        """
        Builds the zero-filled histories for actual, predicted, residual and
        PCA-residual values.

        Every row is a distinct list object, so changing one row in place
        never affects another row or another history.

        Args:
            size (int): Number of rows in each history.

        Returns:
            tuple: Four lists of ``size`` rows each. The first three have
            ``num_outputs`` zeros per row; the fourth has 2 zeros per row.
        """

        def zero_rows(width):
            # A comprehension (not ``[[0] * width] * size``) so that the rows
            # are independent objects rather than aliases of one list.
            return [[0] * width for _ in range(size)]

        # NOTE(review): the width 2 assumes the PCA model projects onto two
        # components -- confirm against the fitted model.
        # NOTE(review): rows appended later by ``pipeline`` for the actual
        # and predicted histories hold every column of ``df_trans``, so they
        # may be wider than these placeholders.
        return (
            zero_rows(self.num_outputs),
            zero_rows(self.num_outputs),
            zero_rows(self.num_outputs),
            zero_rows(2),
        )

    def predict(self, df_new):
        """
        Makes predictions using the prediction model.

        Args:
            df_new (numpy.ndarray): The new data, passed unchanged to
                ``self.model.predict``.

        Returns:
            numpy.ndarray: The predicted values.
        """
        return self.model.predict(df_new)

    def calculate_residuals(self, df_trans, pred):
        """
        Calculates the residuals between the predicted and actual values.

        Args:
            df_trans (numpy.ndarray): 2-D transformed data; row
                ``TARGET_ROW`` is the observation being scored.
            pred (numpy.ndarray): The predicted values.

        Returns:
            tuple: ``(actual, resid)`` where ``actual`` is the first
            ``num_outputs`` columns of the target row and ``resid`` is
            ``pred - actual``.
        """
        actual = df_trans[self.TARGET_ROW, : self.num_outputs]
        resid = pred - actual
        return actual, resid

    def calculate_distances(self, resid):
        """
        Calculates the distance from the PCA-projected residual to the first
        cluster center.

        Only ``cluster_centers_[0]`` is used.
        NOTE(review): if the KMeans model has more than one cluster the other
        centers are ignored -- confirm that this is intended.

        Args:
            resid (numpy.ndarray): Residual values.

        Returns:
            numpy.ndarray: One-element array holding the distance.
        """
        projected = self.residual_pca(resid)
        first_center = self.kmeans_model.cluster_centers_[0]
        return np.array([np.linalg.norm(projected - first_center)])

    def residual_pca(self, resid):
        """
        Projects the residuals with the PCA model.

        Args:
            resid (numpy.ndarray): Residual values; flattened to a single
                row before the projection.

        Returns:
            numpy.ndarray: Transformed residuals.
        """
        return self.pca_model.transform(resid.reshape(1, -1))

    def resize_prediction(self, pred, df_trans):
        """
        Widens the prediction so that it has as many columns as ``df_trans``.

        The columns of ``df_trans`` beyond ``num_outputs`` (taken from row
        ``TARGET_ROW``) are appended to every row of ``pred``. The result
        keeps ``pred``'s dtype, and ``pred`` itself is not modified.

        Args:
            pred (numpy.ndarray): 2-D predicted values.
            df_trans (numpy.ndarray): 2-D transformed input data.

        Returns:
            numpy.ndarray: Predictions followed by the extra columns.
        """
        extra_cols = df_trans[self.TARGET_ROW, self.num_outputs :]
        n_rows, n_pred_cols = pred.shape[0], pred.shape[1]

        # Allocate and fill explicitly instead of using np.resize: np.resize
        # refills from the flattened data, which shifts values across rows
        # whenever pred has more than one row.
        widened = np.empty((n_rows, n_pred_cols + len(extra_cols)), dtype=pred.dtype)
        widened[:, :n_pred_cols] = pred
        # Broadcasts over rows; a no-op when there are no extra columns.
        widened[:, n_pred_cols:] = extra_cols
        return widened

    def inverse_transform(self, scaler, pred, df_trans):
        """
        Inverse transforms the predicted and actual values.

        Args:
            scaler (object): Object exposing ``inverse_transform``.
            pred (numpy.ndarray): Predicted values, already widened to the
                column count the scaler expects.
            df_trans (numpy.ndarray): 2-D transformed input data; the whole
                of row ``TARGET_ROW`` is used as the actual observation.

        Returns:
            tuple: ``(actual, pred)`` after inverse transformation.
        """
        pred = scaler.inverse_transform(np.array(pred))
        target_row = np.array([df_trans[self.TARGET_ROW, :]])
        actual = scaler.inverse_transform(target_row)
        return actual, pred

    def update_lists(self, actual, pred, resid, resid_pca):
        """
        Rolls each history forward by one step.

        The oldest entry of every history is dropped and the new values are
        appended as flat Python lists, so the histories keep their length.

        Args:
            actual (numpy.ndarray): Actual values.
            pred (numpy.ndarray): Predicted values.
            resid (numpy.ndarray): Residual values.
            resid_pca (numpy.ndarray): PCA-transformed residual values.

        Returns:
            tuple: The instance's own ``actual_list``, ``pred_list``,
            ``resid_list`` and ``resid_pca_list`` (not copies).
        """
        histories = (
            (self.actual_list, actual),
            (self.pred_list, pred),
            (self.resid_list, resid),
            (self.resid_pca_list, resid_pca),
        )
        # Drop all oldest entries first, then append, as separate passes.
        for history, _ in histories:
            history.pop(0)
        for history, values in histories:
            history.append(values.flatten().tolist())
        return self.actual_list, self.pred_list, self.resid_list, self.resid_pca_list

    def pipeline(self, df_new, df_trans, scaler):
        """
        Performs the anomaly detection pipeline for one observation.

        Residuals are computed on the transformed (scaled) values, while the
        actual and predicted values stored in the histories are
        inverse-transformed first.

        Args:
            df_new (numpy.ndarray): Input data for prediction.
            df_trans (numpy.ndarray): 2-D transformed input data.
            scaler (object): Object exposing ``inverse_transform``.

        Returns:
            tuple: ``(actual_list, pred_list, resid_list, resid_pca_list,
            dist)`` -- the four updated histories and the one-element
            distance array.
        """
        pred = self.predict(df_new)
        # The scaled ``actual`` returned here is superseded below by the
        # inverse-transformed one; only ``resid`` is carried forward.
        _, resid = self.calculate_residuals(df_trans, pred)
        pred = self.resize_prediction(pred, df_trans)
        actual, pred = self.inverse_transform(scaler, pred, df_trans)
        resid_pca = self.residual_pca(resid)
        actual_list, pred_list, resid_list, resid_pca_list = self.update_lists(
            actual, pred, resid, resid_pca
        )
        dist = self.calculate_distances(resid)
        return actual_list, pred_list, resid_list, resid_pca_list, dist