Spaces:
Sleeping
Sleeping
import numpy as np | |
from sklearn.cluster import KMeans | |
from sklearn.decomposition import PCA | |
from tensorflow.keras.models import load_model | |
import joblib | |
class VAVAnomalizer:
    """Residual-based anomaly scoring for the VAV data of one RTU.

    The class wraps three pre-trained models: a Keras prediction model, a PCA
    model and a KMeans model. For every new sample it predicts the output
    features, forms the residual against the observed row of ``df_trans``,
    projects the residual with PCA and measures its distance to the first
    KMeans cluster centre. Rolling histories of the actual, predicted,
    residual and PCA-projected residual values are kept on the instance.
    """

    # Row of ``df_trans`` holding the observation that the prediction is
    # compared against (used by the residual, resize and inverse-transform
    # steps).
    _TARGET_ROW = 30

    def __init__(
        self,
        rtu_id,
        prediction_model_path,
        clustering_model_path,
        pca_model_path,
        num_inputs,
        num_outputs,
    ):
        """
        Initializes a VAVAnomalizer object.

        Args:
            rtu_id (int): The ID of the RTU (Roof Top Unit) associated with the
                VAV (Variable Air Volume) system.
            prediction_model_path (str): The file path to the prediction model.
            clustering_model_path (str): The file path to the clustering model.
            pca_model_path (str): The file path to the PCA model.
            num_inputs (int): The number of input features for the prediction model.
            num_outputs (int): The number of output features for the prediction model.
        """
        self.rtu_id = rtu_id
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        # Models are loaded first so the history buffers can be sized from
        # the PCA model.
        self.load_models(prediction_model_path, clustering_model_path, pca_model_path)
        (
            self.actual_list,
            self.pred_list,
            self.resid_list,
            self.resid_pca_list,
        ) = self.initialize_lists()

    def load_models(self, prediction_model_path, clustering_model_path, pca_model_path):
        """
        Loads the prediction, PCA and clustering models onto the instance.

        Args:
            prediction_model_path (str): The file path to the prediction model.
            clustering_model_path (str): The file path to the clustering model.
            pca_model_path (str): The file path to the PCA model.
        """
        self.model = load_model(prediction_model_path)
        # NOTE: joblib.load unpickles the file, which can execute arbitrary
        # code; only load model files that come from a trusted source.
        self.pca_model: PCA = joblib.load(pca_model_path)
        self.kmeans_model: KMeans = joblib.load(clustering_model_path)

    def initialize_lists(self, size=30):
        """
        Build the zero-filled history buffers.

        Every row is an independent list, so editing one placeholder in place
        never leaks into another row or another buffer.

        Args:
            size (int): Number of rows in each buffer.

        Returns:
            tuple: Four lists ``(actual, pred, resid, resid_pca)``. The first
            three hold ``size`` rows of ``num_outputs`` zeros. The last holds
            ``size`` rows whose width is the loaded PCA model's
            ``n_components_`` (2 when no PCA model is available).
        """
        pca_width = getattr(getattr(self, "pca_model", None), "n_components_", 2)

        def zero_rows(width):
            return [[0] * width for _ in range(size)]

        return (
            zero_rows(self.num_outputs),
            zero_rows(self.num_outputs),
            zero_rows(self.num_outputs),
            zero_rows(pca_width),
        )

    def predict(self, df_new):
        """
        Makes predictions using the prediction model.

        Args:
            df_new (numpy.ndarray): The new data for prediction.

        Returns:
            numpy.ndarray: The predicted values.
        """
        return self.model.predict(df_new)

    def calculate_residuals(self, df_trans, pred):
        """
        Calculates the residuals between the predicted and the actual values.

        Args:
            df_trans (numpy.ndarray): 2-D transformed data; the actual values
                are the first ``num_outputs`` columns of row 30.
            pred (numpy.ndarray): The predicted values.

        Returns:
            tuple: ``(actual, resid)`` where ``resid`` is ``pred - actual``.
        """
        actual = df_trans[self._TARGET_ROW, : self.num_outputs]
        return actual, pred - actual

    def calculate_distances(self, resid):
        """
        Distance between the PCA-projected residual and the first cluster centre.

        Only ``cluster_centers_[0]`` of the KMeans model is used.

        Args:
            resid (numpy.ndarray): Residual values of a single sample.

        Returns:
            numpy.ndarray: One-element array holding the distance.
        """
        offset = self.residual_pca(resid) - self.kmeans_model.cluster_centers_[0]
        return np.array([np.linalg.norm(offset)])

    def residual_pca(self, resid):
        """
        Project the residuals with the loaded PCA model.

        Args:
            resid (numpy.ndarray): Residual values of a single sample.

        Returns:
            numpy.ndarray: The residual reshaped to one row and transformed.
        """
        return self.pca_model.transform(resid.reshape(1, -1))

    def resize_prediction(self, pred, df_trans):
        """
        Pad the prediction with the non-predicted columns of ``df_trans``.

        The columns of row 30 of ``df_trans`` beyond ``num_outputs`` are
        appended to every row of ``pred``. Each prediction row keeps its own
        values, and the call also works when there are no extra columns.

        Args:
            pred (numpy.ndarray): 2-D predicted values.
            df_trans (numpy.ndarray): 2-D transformed input data.

        Returns:
            numpy.ndarray: New array of shape
            ``(pred.shape[0], pred.shape[1] + extra)`` with the dtype of ``pred``.
        """
        pred = np.asarray(pred)
        # Cast to pred's dtype so the result has the same dtype as the input
        # prediction.
        extra = np.asarray(
            df_trans[self._TARGET_ROW, self.num_outputs :], dtype=pred.dtype
        )
        tiled = np.broadcast_to(extra, (pred.shape[0], extra.shape[0]))
        return np.concatenate([pred, tiled], axis=1)

    def inverse_transform(self, scaler, pred, df_trans):
        """
        Inverse transform the predicted and actual values.

        Args:
            scaler (object): Object exposing ``inverse_transform``.
            pred (numpy.ndarray): Predicted values, already padded to the
                scaler's width.
            df_trans (numpy.ndarray): 2-D transformed input data; row 30 is
                inverse transformed in full as the actual values.

        Returns:
            tuple: ``(actual, pred)`` after inverse transformation.
        """
        pred = scaler.inverse_transform(np.array(pred))
        actual = scaler.inverse_transform(np.array([df_trans[self._TARGET_ROW, :]]))
        return actual, pred

    def update_lists(self, actual, pred, resid, resid_pca):
        """
        Slide the history buffers forward by one sample.

        The oldest row of each buffer is dropped and the flattened new values
        are appended, so the buffer lengths stay constant.

        Args:
            actual (numpy.ndarray): Actual values.
            pred (numpy.ndarray): Predicted values.
            resid (numpy.ndarray): Residual values.
            resid_pca (numpy.ndarray): PCA-transformed residual values.

        Returns:
            tuple: The updated ``(actual_list, pred_list, resid_list,
            resid_pca_list)``.
        """
        # Flatten everything before touching the buffers so a bad input
        # cannot leave them partially updated.
        new_rows = [
            np.asarray(values).flatten().tolist()
            for values in (actual, pred, resid, resid_pca)
        ]
        histories = (
            self.actual_list,
            self.pred_list,
            self.resid_list,
            self.resid_pca_list,
        )
        for history, row in zip(histories, new_rows):
            history.pop(0)
            history.append(row)
        return self.actual_list, self.pred_list, self.resid_list, self.resid_pca_list

    def pipeline(self, df_new, df_trans, scaler):
        """
        Perform the anomaly detection pipeline.

        The residual is computed from the raw model output and ``df_trans``
        before the inverse transformation is applied.

        Args:
            df_new (numpy.ndarray): Input data for prediction.
            df_trans (numpy.ndarray): 2-D transformed input data.
            scaler (object): Scaler object for inverse transformation.

        Returns:
            tuple: ``(actual_list, pred_list, resid_list, resid_pca_list, dist)``.
        """
        pred = self.predict(df_new)
        actual, resid = self.calculate_residuals(df_trans, pred)
        pred = self.resize_prediction(pred, df_trans)
        # NOTE(review): after this step ``actual`` and ``pred`` span every
        # column of ``df_trans``, while the zero placeholders in the history
        # buffers are only ``num_outputs`` wide - confirm callers expect this.
        actual, pred = self.inverse_transform(scaler, pred, df_trans)
        resid_pca = self.residual_pca(resid)
        actual_list, pred_list, resid_list, resid_pca_list = self.update_lists(
            actual, pred, resid, resid_pca
        )
        dist = self.calculate_distances(resid)
        return actual_list, pred_list, resid_list, resid_pca_list, dist