import json

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


class VAVPipeline:
    """
    Streaming pre-processing pipeline for the VAV zones served by one RTU.

    Incoming dataframes are reduced to the model's columns, buffered, and
    turned into a scaled, fixed-size window suitable for a sequence model.

    Attributes:
        rtu_id (int): The ID of the RTU (Roof Top Unit).
        window_size (int): Number of time steps fed to the model.
        zones (list): Zone numbers served by the RTU.
        output_col_names (list): Zone columns treated as model outputs.
        input_col_names (list): RTU, weather and zone set-point columns.
        column_names (list): ``output_col_names + input_col_names``; this is
            the column order used for every buffered/returned frame.
        scaler: Object with a ``transform`` method, or None if no
            ``scaler_path`` was supplied.
        df (pd.DataFrame): Rolling buffer of the most recent raw rows.

    Methods:
        get_scaler(scaler_path): Loads the scaler from the given path.
        get_window(df): Returns the trailing window of the given dataframe.
        transform_window(df_window): Scales a window with the loaded scaler.
        prepare_input(df_trans): Reshapes scaled values into a model batch.
        get_input_output(df): Derives input/output column names from a frame.
        extract_data_from_message(df): Buffers new rows and returns a window.
        fit(df): Runs the whole pipeline on a new dataframe.
    """

    # Zone numbers served by each roof-top unit, keyed by RTU id.
    _ZONES_BY_RTU = {
        1: (69, 68, 67, 66, 65, 64, 42, 41, 40, 39, 38, 37, 36),
        2: (
            72, 71, 63, 62, 60, 59, 58, 57, 50, 49,
            44, 43, 35, 34, 33, 32, 31, 30, 29, 28,
        ),
        3: (61, 56, 55, 48, 45, 26, 25, 18),
        4: (16, 17, 21, 23, 24, 46, 47, 51, 52, 53, 54),
    }

    # Weather columns that are inputs for every RTU.
    _WEATHER_COLUMNS = (
        "air_temp_set_1",
        "air_temp_set_2",
        "dew_point_temperature_set_1d",
        "relative_humidity_set_1",
        "solar_radiation_set_1",
    )

    def __init__(self, rtu_id, scaler_path=None, window_size=30):
        """
        Initializes a VAVPipeline object.

        Args:
            rtu_id (int): The ID of the RTU (Roof Top Unit); one of 1-4.
            scaler_path (str, optional): The path to the scaler file.
                Defaults to None, in which case no scaler is loaded.
            window_size (int, optional): The size of the sliding window.
                Defaults to 30.

        Raises:
            ValueError: If ``rtu_id`` has no known zone list.
        """
        if rtu_id not in self._ZONES_BY_RTU:
            # Fail here rather than with an AttributeError on first use.
            raise ValueError(
                f"Unknown rtu_id {rtu_id!r}; expected one of "
                f"{sorted(self._ZONES_BY_RTU)}"
            )

        self.get_cols = True  # column names are derived from the first frame
        self.window_size = window_size
        self.rtu_id = rtu_id
        self.zones = list(self._ZONES_BY_RTU[rtu_id])

        self.output_col_names = []
        self.input_col_names = self._base_input_columns()
        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)

        # Empty buffer so the attribute always exists.
        self.df = pd.DataFrame(columns=self.column_names)

        # Always define the attribute so a missing scaler is detectable.
        self.scaler = self.get_scaler(scaler_path) if scaler_path else None

    def _base_input_columns(self):
        """Return the RTU and weather input columns (no zone set-points)."""
        return [
            f"rtu_00{self.rtu_id}_fltrd_sa_flow_tn",
            f"rtu_00{self.rtu_id}_sa_temp",
            *self._WEATHER_COLUMNS,
        ]

    def get_scaler(self, scaler_path):
        """
        Loads the scaler from the given path.

        Args:
            scaler_path (str): The path to the scaler file.

        Returns:
            The object stored in the file (expected to expose ``transform``).
        """
        # NOTE: joblib.load unpickles the file and can execute arbitrary
        # code; only load scaler files from a trusted source.
        return joblib.load(scaler_path)

    def get_window(self, df):
        """
        Returns the trailing window of the given dataframe.

        Args:
            df (pd.DataFrame): The dataframe.

        Returns:
            pd.DataFrame: The last ``window_size + 1`` rows cast to float32,
            or None if ``df`` has ``window_size`` rows or fewer.
        """
        len_df = len(df)
        if len_df <= self.window_size:
            return None
        start = len_df - (self.window_size + 1)
        return df.iloc[start:len_df].astype("float32")

    def transform_window(self, df_window):
        """
        Transforms the values of the dataframe using the scaler.

        Args:
            df_window (pd.DataFrame): The dataframe.

        Returns:
            np.ndarray: The transformed values.

        Raises:
            AttributeError: If no scaler has been loaded.
        """
        if self.scaler is None:
            # Same exception type as before (missing attribute), but with a
            # message that says what is actually wrong.
            raise AttributeError(
                "VAVPipeline has no scaler; pass scaler_path to the "
                "constructor or assign the 'scaler' attribute"
            )
        return self.scaler.transform(df_window.values)

    def prepare_input(self, df_trans):
        """
        Prepares the input for the model.

        Args:
            df_trans (np.ndarray): The transformed values, one row per time
                step and one column per entry of ``column_names``.

        Returns:
            np.ndarray: The first ``window_size`` rows reshaped to
            ``(1, window_size, len(column_names))``.
        """
        batch_shape = (1, self.window_size, len(self.column_names))
        return df_trans[: self.window_size, :].reshape(batch_shape)

    def get_input_output(self, df: pd.DataFrame):
        """
        Derives the input and output column names from the dataframe.

        Output columns are the zone columns of this RTU that are not CO2,
        hot-water-valve or set-point columns; the zones' cooling/heating
        set-point columns are appended to the RTU and weather inputs. Columns
        are collected zone by zone, so the order follows ``self.zones``.

        Calling this also resets the internal buffer ``self.df``.

        Args:
            df (pd.DataFrame): The dataframe whose columns are inspected.
        """
        not_an_output = ("co2", "hw_valve", "cooling_sp", "heating_sp")
        set_points = ("cooling_sp", "heating_sp")

        # Start from scratch on every call so a repeated call does not
        # duplicate the previously discovered columns.
        outputs = []
        inputs = self._base_input_columns()
        for zone in self.zones:
            tag = f"zone_0{zone}"
            for column in df.columns:
                if tag not in column:
                    continue
                if not any(marker in column for marker in not_an_output):
                    outputs.append(column)
        for zone in self.zones:
            tag = f"zone_0{zone}"
            for column in df.columns:
                if tag in column and any(sp in column for sp in set_points):
                    inputs.append(column)

        self.output_col_names = outputs
        self.input_col_names = inputs
        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)
        self.df = pd.DataFrame(columns=self.column_names)

    def extract_data_from_message(self, df: pd.DataFrame):
        """
        Buffers the new rows and returns the current window, if complete.

        On the first call the input/output columns are derived from ``df``.
        The rows of ``df`` (restricted to ``column_names``) are appended to
        the buffer, which is then trimmed to the most recent
        ``window_size + 1`` rows.

        Args:
            df (pd.DataFrame): New observations; must contain every column in
                ``column_names``.

        Returns:
            pd.DataFrame: The ``window_size + 1`` most recent rows followed
            by one extra row holding the column means, with a fresh
            0..n-1 index; or None while fewer than ``window_size + 1`` rows
            have been buffered.
        """
        if self.get_cols:
            self.get_input_output(df)
            self.get_cols = False

        new_rows = df[self.column_names]
        if len(self.df) != 0:
            self.df = pd.concat([self.df, new_rows], axis=0)
        else:
            # Avoid concatenating with the empty placeholder frame.
            self.df = new_rows

        # Measure the buffer AFTER appending: slicing with the pre-append
        # length dropped every newly received row.
        rows_to_keep = self.window_size + 1
        self.df = self.df.iloc[-rows_to_keep:]
        if len(self.df) < rows_to_keep:
            return None

        # Build the result on a copy with a clean index so the synthetic
        # mean row can neither overwrite a real row (label collision) nor
        # leak into the buffer and be treated as an observation later.
        window = self.df.reset_index(drop=True)
        window.loc[len(window)] = window.mean()
        return window

    def fit(self, df: pd.DataFrame):
        """
        Runs the pipeline on new data and returns the model input.

        Despite its name this method does not train anything: it buffers the
        new rows, scales the current window and reshapes it for the model.

        Args:
            df (pd.DataFrame): New observations.

        Returns:
            tuple: ``(model_input, scaled_window)`` as numpy arrays, or
            ``(None, None)`` while the window is still filling up.
        """
        df_window = self.extract_data_from_message(df)
        if df_window is None:
            return None, None
        df_trans = self.transform_window(df_window)
        df_new = self.prepare_input(df_trans)
        return df_new, df_trans