Spaces:
Sleeping
Sleeping
import json | |
import joblib | |
import pandas as pd | |
from sklearn.preprocessing import StandardScaler | |
import numpy as np | |
class VAVPipeline:
    """
    Streaming pre-processing pipeline for the VAV (Variable Air Volume) zones of one RTU.

    Incoming dataframes are reduced to the relevant columns, accumulated in a
    rolling buffer and, once enough rows are available, scaled and reshaped
    into a model input.

    Attributes:
        rtu_id (int): The ID of the RTU (Roof Top Unit).
        window_size (int): The size of the sliding window.
        zones (list): Zone numbers associated with ``rtu_id``.
        output_col_names (list): Zone columns that are neither CO2, hot-water
            valve nor set-point columns (resolved by ``get_input_output``).
        input_col_names (list): RTU/weather columns plus zone set-point columns.
        column_names (list): ``output_col_names + input_col_names``.
        num_inputs (int): ``len(input_col_names)``.
        num_outputs (int): ``len(output_col_names)``.
        scaler: Object loaded from ``scaler_path`` (presumably a fitted
            scikit-learn scaler), or None when no path was given.
        df (pd.DataFrame): Rolling buffer holding at most ``window_size + 1``
            of the most recent rows.
        get_cols (bool): True until the column names have been resolved from
            the first dataframe.

    Methods:
        get_scaler(scaler_path): Loads the scaler from the given path.
        get_window(df): Returns the sliding window of the given dataframe.
        transform_window(df_window): Transforms the values of the dataframe using the scaler.
        prepare_input(df_trans): Prepares the input for the model.
        get_input_output(df): Extracts the input and output column names from the dataframe.
        extract_data_from_message(df): Buffers the dataframe and returns the current window.
        fit(df): Buffers the dataframe and returns the prepared input and transformed data.
    """

    # Zone numbers attached to each supported RTU.
    _ZONES_BY_RTU = {
        1: [69, 68, 67, 66, 65, 64, 42, 41, 40, 39, 38, 37, 36],
        2: [
            72, 71, 63, 62, 60, 59, 58, 57, 50, 49,
            44, 43, 35, 34, 33, 32, 31, 30, 29, 28,
        ],
    }

    def __init__(self, rtu_id, scaler_path=None, window_size=30):
        """
        Initializes a VAVPipeline object.

        Args:
            rtu_id (int): The ID of the RTU (Roof Top Unit). Must be 1 or 2.
            scaler_path (str, optional): The path to the scaler file. Defaults to None.
            window_size (int, optional): The size of the sliding window. Defaults to 30.

        Raises:
            ValueError: If no zone list is defined for ``rtu_id``.
        """
        # Fail early: without a zone list the column discovery cannot work.
        if rtu_id not in self._ZONES_BY_RTU:
            raise ValueError(
                f"Unsupported rtu_id {rtu_id!r}; expected one of "
                f"{sorted(self._ZONES_BY_RTU)}"
            )

        self.get_cols = True
        self.window_size = window_size
        self.rtu_id = rtu_id
        self.zones = list(self._ZONES_BY_RTU[rtu_id])

        self.output_col_names = []
        self.input_col_names = self._base_input_columns()
        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)

        # Always define the buffer and the scaler so later attribute access
        # never depends on which methods happened to run first.
        self.df = pd.DataFrame(columns=self.column_names)
        self.scaler = self.get_scaler(scaler_path) if scaler_path else None

    def _base_input_columns(self):
        """Returns the RTU and weather input columns that do not depend on the zones."""
        return [
            f"rtu_00{self.rtu_id}_fltrd_sa_flow_tn",
            f"rtu_00{self.rtu_id}_sa_temp",
            "air_temp_set_1",
            "air_temp_set_2",
            "dew_point_temperature_set_1d",
            "relative_humidity_set_1",
            "solar_radiation_set_1",
        ]

    def get_scaler(self, scaler_path):
        """
        Loads the scaler from the given path.

        Args:
            scaler_path (str): The path to the scaler file.

        Returns:
            The object stored in the file (presumably a fitted scaler).
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only load scaler files from trusted sources.
        return joblib.load(scaler_path)

    def get_window(self, df):
        """
        Returns the sliding window of the given dataframe.

        Args:
            df (pd.DataFrame): The dataframe.

        Returns:
            pd.DataFrame: The last ``window_size + 1`` rows cast to float32, or
            None when the dataframe has ``window_size`` rows or fewer.
        """
        if len(df) <= self.window_size:
            return None
        return df.iloc[-(self.window_size + 1):].astype("float32")

    def transform_window(self, df_window):
        """
        Transforms the values of the dataframe using the scaler.

        Args:
            df_window (pd.DataFrame): The dataframe.

        Returns:
            np.ndarray: The transformed values.

        Raises:
            RuntimeError: If no scaler has been loaded or assigned.
        """
        scaler = getattr(self, "scaler", None)
        if scaler is None:
            raise RuntimeError(
                "No scaler available: pass scaler_path to VAVPipeline or "
                "assign the 'scaler' attribute before transforming."
            )
        return scaler.transform(df_window.values)

    def prepare_input(self, df_trans):
        """
        Prepares the input for the model.

        Args:
            df_trans (np.ndarray): The transformed values.

        Returns:
            np.ndarray: The first ``window_size`` rows reshaped to
            ``(1, window_size, len(column_names))``.
        """
        head = df_trans[: self.window_size, :]
        return head.reshape((1, self.window_size, len(self.column_names)))

    def get_input_output(self, df: pd.DataFrame):
        """
        Extracts the input and output column names from the dataframe.

        For every zone of this RTU, columns containing ``zone_0<zone>`` are
        classified: set-point columns (``cooling_sp`` / ``heating_sp``) become
        inputs, ``co2`` and ``hw_valve`` columns are ignored, and everything
        else becomes an output. The column lists are rebuilt from scratch on
        every call and the rolling buffer is reset to an empty dataframe.

        Args:
            df (pd.DataFrame): The dataframe whose columns are inspected.
        """
        outputs = []
        setpoint_inputs = []
        for zone in self.zones:
            tag = f"zone_0{zone}"
            for column in df.columns:
                if tag not in column:
                    continue
                if "cooling_sp" in column or "heating_sp" in column:
                    setpoint_inputs.append(column)
                elif "co2" not in column and "hw_valve" not in column:
                    outputs.append(column)

        # Rebuild instead of appending so repeated calls do not duplicate columns.
        self.output_col_names = outputs
        self.input_col_names = self._base_input_columns() + setpoint_inputs
        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)
        self.df = pd.DataFrame(columns=self.column_names)

    def extract_data_from_message(self, df: pd.DataFrame):
        """
        Adds the rows of ``df`` to the rolling buffer and returns the current window.

        On the first call the input/output columns are resolved from ``df``.
        The buffer keeps the most recent ``window_size + 1`` rows, including
        the rows that were just received.

        Args:
            df (pd.DataFrame): New observation row(s); must contain every
                column listed in ``column_names``.

        Returns:
            pd.DataFrame: ``window_size + 2`` rows with a fresh RangeIndex: the
            ``window_size + 1`` most recent observations (oldest first)
            followed by one row holding their column-wise mean. None while
            fewer than ``window_size + 1`` observations have been buffered.
        """
        if self.get_cols:
            self.get_input_output(df)
            self.get_cols = False

        incoming = df[self.column_names]
        rows_needed = self.window_size + 1

        if self.df is None or self.df.empty:
            buffered = incoming
        else:
            buffered = pd.concat([self.df, incoming], axis=0)

        # Trim AFTER appending so the newest rows are always part of the window.
        self.df = buffered.iloc[-rows_needed:].copy()

        if len(self.df) < rows_needed:
            return None

        # The trailing mean row is kept for backward compatibility with callers
        # that expect it; it is added to the returned copy only, never to the
        # buffer, so synthetic values cannot leak into later windows.
        mean_row = self.df.mean().to_frame().T
        return pd.concat([self.df, mean_row], ignore_index=True)

    def fit(self, df: pd.DataFrame):
        """
        Buffers ``df`` and, once a full window is available, scales it and builds the model input.

        Args:
            df (pd.DataFrame): New observation row(s).

        Returns:
            tuple: ``(prepared_input, transformed_window)``, or ``(None, None)``
            while the buffer is still warming up.
        """
        df_window = self.extract_data_from_message(df)
        if df_window is None:
            return None, None

        df_trans = self.transform_window(df_window)
        return self.prepare_input(df_trans), df_trans