smart-buildings / src /rtu /RTUAnomalizer1.py
jerin
update dashbord with fault color
8e50b17
raw
history blame
8.26 kB
import numpy as np
from tensorflow.keras.models import load_model
import joblib
class RTUAnomalizer1:
"""
Class for performing anomaly detection on RTU (Roof Top Unit) data.
"""
def __init__(
self,
prediction_model_path=None,
clustering_model_paths=None,
pca_model_paths=None,
num_inputs=None,
num_outputs=None,
):
"""
Initialize the RTUAnomalizer object.
Args:
prediction_model_path (str): Path to the prediction model file.
clustering_model_paths (list): List of paths to the clustering model files.
num_inputs (int): Number of input features.
num_outputs (int): Number of output features.
"""
self.model = None
self.kmeans_models = []
self.pca_models = []
self.num_inputs = num_inputs
self.num_outputs = num_outputs
if (
prediction_model_path is not None
and clustering_model_paths is not None
and pca_model_paths is not None
):
self.load_models(
prediction_model_path, clustering_model_paths, pca_model_paths
)
self.actual_list, self.pred_list, self.resid_list, self.resid_pca_list, self.distance_list = (
self.initialize_lists()
)
self.fault_1 = 0
self.fault_2 = 0
def initialize_lists(self, size=30):
"""
Initialize lists for storing actual, predicted, and residual values.
Args:
size (int): Size of the lists.
Returns:
tuple: A tuple containing three lists initialized with zeros.
"""
initial_values = [[0] * self.num_outputs] * size
initial_values1 = [[0] * 4] * size
initial_values2 = [[0] * 2] * 60
return (
initial_values.copy(),
initial_values.copy(),
initial_values.copy(),
initial_values1.copy(),
initial_values2.copy(),
)
def load_models(
self, prediction_model_path, clustering_model_paths, pca_model_paths
):
"""
Load the prediction and clustering models.
Args:
prediction_model_path (str): Path to the prediction model file.
clustering_model_paths (list): List of paths to the clustering model files.
"""
self.model = load_model(prediction_model_path)
for path in clustering_model_paths:
self.kmeans_models.append(joblib.load(path))
for path in pca_model_paths:
self.pca_models.append(joblib.load(path))
def predict(self, df_new):
"""
Make predictions using the prediction model.
Args:
df_new (DataFrame): Input data for prediction.
Returns:
array: Predicted values.
"""
return self.model.predict(df_new, verbose=0)
def calculate_residuals(self, df_trans, pred):
"""
Calculate the residuals between actual and predicted values.
Args:
df_trans (DataFrame): Transformed input data.
pred (array): Predicted values.
Returns:
tuple: A tuple containing the actual values and residuals.
"""
actual = df_trans[30, : self.num_outputs]
resid = actual - pred
return actual, resid
def resize_prediction(self, pred, df_trans):
"""
Resize the predicted values to match the shape of the transformed input data.
Args:
pred (array): Predicted values.
df_trans (DataFrame): Transformed input data.
Returns:
array: Resized predicted values.
"""
pred = np.resize(
pred, (pred.shape[0], pred.shape[1] + len(df_trans[30, self.num_outputs :]))
)
pred[:, -len(df_trans[30, self.num_outputs :]) :] = df_trans[
30, self.num_outputs :
]
return pred
def inverse_transform(self, scaler, pred, df_trans):
"""
Inverse transform the predicted and actual values.
Args:
scaler (object): Scaler object for inverse transformation.
pred (array): Predicted values.
df_trans (DataFrame): Transformed input data.
Returns:
tuple: A tuple containing the actual and predicted values after inverse transformation.
"""
pred = scaler.inverse_transform(np.array(pred))
actual = scaler.inverse_transform(np.array([df_trans[30, :]]))
return actual, pred
def update_lists(self, actual, pred, resid):
"""
Update the lists of actual, predicted, and residual values.
Args:
actual_list (list): List of actual values.
pred_list (list): List of predicted values.
resid_list (list): List of residual values.
actual (array): Actual values.
pred (array): Predicted values.
resid (array): Residual values.
Returns:
tuple: A tuple containing the updated lists of actual, predicted, and residual values.
"""
self.actual_list.pop(0)
self.pred_list.pop(0)
self.resid_list.pop(0)
self.actual_list.append(actual.flatten().tolist())
self.pred_list.append(pred.flatten().tolist())
self.resid_list.append(resid.flatten().tolist())
return self.actual_list, self.pred_list, self.resid_list
def calculate_distances(self, resid):
"""
Calculate the distances between residuals and cluster centers.
Args:
resid (array): Residual values.
Returns:
array: Array of distances.
"""
dist = []
resid_pcas = []
for i, model in enumerate(self.kmeans_models):
resid_pca = self.pca_models[i].transform(
resid[:, (i * 7) + 1 : (i * 7) + 8]
)
resid_pcas = resid_pcas + resid_pca.tolist()
dist.append(
np.linalg.norm(
resid_pca - model.cluster_centers_[0],
ord=2,
axis=1,
)
)
self.distance_list.pop(0)
self.distance_list.append(np.concatenate(dist).tolist())
resid_pcas = np.array(resid_pcas).flatten().tolist()
self.resid_pca_list.pop(0)
self.resid_pca_list.append(resid_pcas)
return np.array(dist)
def fault_windowing(self):
rtu_1_dist = np.array(self.distance_list).T[0]>1 #rtu_1_threshold
rtu_1_dist = [int(x) for x in rtu_1_dist]
if sum(rtu_1_dist)>0.8*60: # 80% of the 60 min window
self.fault_1 = 1
else:
self.fault_1 = 0
rtu_2_dist = np.array(self.distance_list).T[1]>0.5 #rtu_2_threshold
rtu_2_dist = [int(x) for x in rtu_2_dist]
if sum(rtu_2_dist)>0.05*60: # 80% of the 60 min window
self.fault_2 = 1
else:
self.fault_2 = 0
def pipeline(self, df_new, df_trans, scaler):
"""
Perform the anomaly detection pipeline.
Args:
df_new (DataFrame): Input data for prediction.
df_trans (DataFrame): Transformed input data.
scaler (object): Scaler object for inverse transformation.
Returns:
tuple: A tuple containing the lists of actual, predicted, and residual values, and the distances.
"""
pred = self.predict(df_new)
actual, resid = self.calculate_residuals(df_trans, pred)
pred = self.resize_prediction(pred, df_trans)
actual, pred = self.inverse_transform(scaler, pred, df_trans)
actual_list, pred_list, resid_list = self.update_lists(actual, pred, resid)
dist = self.calculate_distances(resid)
self.fault_windowing()
return (
actual_list,
pred_list,
resid_list,
self.resid_pca_list,
dist,
np.array(self.distance_list[30:]).T[0]>1, #rtu_1_threshold
np.array(self.distance_list[30:]).T[1]>0.5, #rtu_2_threshold
self.fault_1,
self.fault_2
)