# smart-buildings/src/vav/VAVPipeline.py
import json
import joblib
import pandas as pd
from sklearn.preprocessing import StandardScaler
import numpy as np
class VAVPipeline:
"""
    A data-preparation pipeline for the Variable Air Volume (VAV) zones served by a single Roof Top Unit (RTU).
Attributes:
rtu_id (int): The ID of the RTU (Roof Top Unit).
scaler_path (str): The path to the scaler file.
window_size (int): The size of the sliding window.
Methods:
get_scaler(scaler_path): Loads the scaler from the given path.
get_window(df): Returns the sliding window of the given dataframe.
transform_window(df_window): Transforms the values of the dataframe using the scaler.
prepare_input(df_trans): Prepares the input for the model.
get_input_output(df): Extracts the input and output column names from the dataframe.
        extract_data_from_message(df): Buffers incoming rows and returns the rolling window dataframe once it is full.
        fit(df): Runs buffering, scaling, and reshaping on the dataframe and returns the model input and scaled window.
"""
def __init__(self, rtu_id, scaler_path=None, window_size=30):
"""
Initializes a VAVPipeline object.
Args:
rtu_id (int): The ID of the RTU (Roof Top Unit).
scaler_path (str, optional): The path to the scaler file. Defaults to None.
window_size (int, optional): The size of the sliding window. Defaults to 30.
"""
self.get_cols = True
self.window_size = window_size
self.rtu_id = rtu_id
        if rtu_id == 1:
            self.zones = [69, 68, 67, 66, 65, 64, 42, 41, 40, 39, 38, 37, 36]
        elif rtu_id == 2:
            self.zones = [
                72, 71, 63, 62, 60, 59, 58, 57, 50, 49,
                44, 43, 35, 34, 33, 32, 31, 30, 29, 28,
            ]
        elif rtu_id == 3:
            self.zones = [61, 56, 55, 48, 45, 26, 25, 18]
        elif rtu_id == 4:
            self.zones = [16, 17, 21, 23, 24, 46, 47, 51, 52, 53, 54]
        else:
            raise ValueError(f"Unsupported rtu_id: {rtu_id}")
self.output_col_names = []
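        # Base model inputs: the RTU's supply-air flow/temperature plus outdoor weather signals.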
self.input_col_names = [
f"rtu_00{rtu_id}_fltrd_sa_flow_tn",
f"rtu_00{rtu_id}_sa_temp",
"air_temp_set_1",
"air_temp_set_2",
"dew_point_temperature_set_1d",
"relative_humidity_set_1",
"solar_radiation_set_1",
]
self.column_names = self.output_col_names + self.input_col_names
self.num_inputs = len(self.input_col_names)
self.num_outputs = len(self.output_col_names)
if scaler_path:
self.scaler = self.get_scaler(scaler_path)
def get_scaler(self, scaler_path):
"""
Loads the scaler from the given path.
Args:
scaler_path (str): The path to the scaler file.
Returns:
StandardScaler: The loaded scaler object.
"""
return joblib.load(scaler_path)
def get_window(self, df):
"""
Returns the sliding window of the given dataframe.
Args:
df (pd.DataFrame): The dataframe.
Returns:
pd.DataFrame: The sliding window dataframe.
"""
        len_df = len(df)
        if len_df > self.window_size:
            # Return the most recent window_size + 1 rows, cast to float32.
            return df.iloc[-(self.window_size + 1) :].astype("float32")
        else:
            return None
def transform_window(self, df_window):
"""
Transforms the values of the dataframe using the scaler.
Args:
df_window (pd.DataFrame): The dataframe.
Returns:
np.ndarray: The transformed values.
"""
return self.scaler.transform(df_window.values)
def prepare_input(self, df_trans):
"""
Prepares the input for the model.
Args:
df_trans (np.ndarray): The transformed values.
Returns:
np.ndarray: The prepared input.
"""
return df_trans[: self.window_size, :].reshape(
(1, self.window_size, len(self.column_names))
)
def get_input_output(self, df: pd.DataFrame):
"""
Extracts the input and output column names from the dataframe.
Args:
df (pd.DataFrame): The dataframe.
"""
for zone in self.zones:
for column in df.columns:
if (
f"zone_0{zone}" in column
and "co2" not in column
and "hw_valve" not in column
and "cooling_sp" not in column
and "heating_sp" not in column
):
self.output_col_names.append(column)
self.input_col_names = [
f"rtu_00{self.rtu_id}_fltrd_sa_flow_tn",
f"rtu_00{self.rtu_id}_sa_temp",
"air_temp_set_1",
"air_temp_set_2",
"dew_point_temperature_set_1d",
"relative_humidity_set_1",
"solar_radiation_set_1",
]
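        # Zone cooling/heating setpoint columns are added as extra model inputs.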
for zone in self.zones:
for column in df.columns:
if f"zone_0{zone}" in column:
if "cooling_sp" in column or "heating_sp" in column:
self.input_col_names.append(column)
self.column_names = self.output_col_names + self.input_col_names
self.num_inputs = len(self.input_col_names)
self.num_outputs = len(self.output_col_names)
self.df = pd.DataFrame(columns=self.column_names)
def extract_data_from_message(self, df: pd.DataFrame):
"""
        Buffers the incoming dataframe and returns the rolling window once enough rows have accumulated.
        Args:
            df (pd.DataFrame): The dataframe of sensor readings.
        Returns:
            pd.DataFrame: The buffered window dataframe, or None if the buffer is not yet full.
"""
        if self.get_cols:
self.get_input_output(df)
self.get_cols = False
        df = df[self.column_names]
        len_df = len(self.df)
        if len_df != 0:
            self.df = pd.concat([self.df, df], axis=0)
        else:
            self.df = df
        # Once more than window_size + 1 rows have accumulated, keep a
        # window_size + 1 row slice of the buffer, append a row of column
        # means, and return it; otherwise keep buffering and return None.
        if len_df > self.window_size + 1:
            self.df = self.df.iloc[len_df - (self.window_size + 1) : len_df]
            self.df.loc[len_df] = self.df.mean()
            return self.df
        else:
            return None
def fit(self, df: pd.DataFrame):
"""
        Runs the full pipeline on the incoming dataframe: buffering, scaling, and reshaping into model input.
        Args:
            df (pd.DataFrame): The dataframe of sensor readings.
        Returns:
            tuple: A tuple (prepared input, scaled window); both elements are None if the buffer is not yet full.
"""
df_window = self.extract_data_from_message(df)
if df_window is not None:
df_trans = self.transform_window(df_window)
df_new = self.prepare_input(df_trans)
else:
df_new = None
df_trans = None
return df_new, df_trans
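

# Minimal usage sketch (illustrative, not part of the original module): the scaler
# path and CSV file below are hypothetical placeholders, and a StandardScaler fitted
# on the same columns (saved with joblib) plus a dataframe containing the RTU,
# weather, and zone columns are assumed. It shows the intended call pattern: feed
# rows in arrival order and use the prepared window once the buffer is full.
if __name__ == "__main__":
    pipeline = VAVPipeline(rtu_id=1, scaler_path="scaler_vav_1.pkl", window_size=30)
    readings = pd.read_csv("sample_readings.csv")
    for i in range(len(readings)):
        model_input, scaled_window = pipeline.fit(readings.iloc[[i]])
        if model_input is not None:
            # Prepared input has shape (1, window_size, num_outputs + num_inputs).
            print(model_input.shape)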