import json
from pickle import load

import numpy as np
import pandas as pd

# joblib / scikit-learn are only needed to deserialize a persisted scaler, so
# their absence is tolerated until get_scaler() is actually handed a file path.
# When the packages are installed the module namespace is unchanged.
try:
    import joblib
except ImportError:
    joblib = None
try:
    from sklearn.preprocessing import StandardScaler
except ImportError:
    StandardScaler = None


class RTUPipeline:
    """Buffer JSON telemetry rows and build scaled fixed-length windows.

    Each incoming message is appended as one row of ``self.df``. Once more
    than ``WINDOW_SIZE`` rows are buffered, the most recent
    ``WINDOW_SIZE + 1`` rows are scaled with ``self.scaler`` and the first
    ``WINDOW_SIZE`` of them are reshaped to
    ``(1, WINDOW_SIZE, len(column_names))``.
    """

    # Number of consecutive rows that make up one reshaped input sequence.
    WINDOW_SIZE = 30

    # Columns listed first in ``column_names``.
    # NOTE: the ``rtu_00X_sat_sp_tn`` columns were commented out of the
    # original list and are therefore not part of the buffered frame.
    _OUTPUT_COLUMNS = (
        "hp_hws_temp",
        "rtu_003_sa_temp",
        "rtu_003_oadmpr_pct",
        "rtu_003_ra_temp",
        "rtu_003_oa_temp",
        "rtu_003_ma_temp",
        "rtu_003_sf_vfd_spd_fbk_tn",
        "rtu_003_rf_vfd_spd_fbk_tn",
        "rtu_004_sa_temp",
        "rtu_004_oadmpr_pct",
        "rtu_004_ra_temp",
        "rtu_004_oa_temp",
        "rtu_004_ma_temp",
        "rtu_004_sf_vfd_spd_fbk_tn",
        "rtu_004_rf_vfd_spd_fbk_tn",
        "rtu_001_sa_temp",
        "rtu_001_oadmpr_pct",
        "rtu_001_ra_temp",
        "rtu_001_oa_temp",
        "rtu_001_ma_temp",
        "rtu_001_sf_vfd_spd_fbk_tn",
        "rtu_001_rf_vfd_spd_fbk_tn",
        "rtu_002_sa_temp",
        "rtu_002_oadmpr_pct",
        "rtu_002_ra_temp",
        "rtu_002_oa_temp",
        "rtu_002_ma_temp",
        "rtu_002_sf_vfd_spd_fbk_tn",
        "rtu_002_rf_vfd_spd_fbk_tn",
    )

    # Columns appended after the output columns in ``column_names``.
    _INPUT_COLUMNS = (
        "air_temp_set_1",
        "air_temp_set_2",
        "dew_point_temperature_set_1d",
        "relative_humidity_set_1",
        "solar_radiation_set_1",
    )

    scaler = None

    def __init__(self, scaler_path=None):
        """Create an empty row buffer and optionally attach a scaler.

        Args:
            scaler_path: Either an object exposing ``transform`` or a path to
                a persisted scaler file. Falsy values leave ``self.scaler``
                at the class default (``None``).
        """
        # Fresh lists per instance so callers may mutate them safely.
        self.output_col_names = list(self._OUTPUT_COLUMNS)
        self.input_col_names = list(self._INPUT_COLUMNS)
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)
        self.column_names = self.output_col_names + self.input_col_names
        if scaler_path:
            self.scaler = self.get_scaler(scaler_path)
        self.df = pd.DataFrame(columns=self.column_names)

    def get_scaler(self, scaler_path):
        """Return a usable scaler for ``scaler_path``.

        Args:
            scaler_path: An object that already has a ``transform`` method
                (returned unchanged), or a file path/handle to load.

        Returns:
            The scaler object.

        Raises:
            ImportError: If a file has to be loaded but joblib is missing.
        """
        if hasattr(scaler_path, "transform"):
            # Already a scaler object: keep the original pass-through.
            return scaler_path
        if joblib is None:
            raise ImportError(
                "joblib is required to load a scaler from a file path"
            )
        # SECURITY: joblib.load unpickles arbitrary objects; only point this
        # at files from a trusted source.
        return joblib.load(scaler_path)

    def get_window(self, df):
        """Return the last ``WINDOW_SIZE + 1`` rows of ``df`` as float32.

        Args:
            df: DataFrame of buffered rows.

        Returns:
            The trailing rows cast to ``float32``, or ``None`` when ``df``
            does not hold more than ``WINDOW_SIZE`` rows.
        """
        rows_needed = self.WINDOW_SIZE + 1
        if len(df) < rows_needed:
            return None
        return df.iloc[-rows_needed:].astype("float32")

    def transform_window(self, df_window):
        """Scale ``df_window`` with the configured scaler.

        Raises:
            RuntimeError: If no scaler has been configured.
        """
        if self.scaler is None:
            raise RuntimeError(
                "no scaler configured; pass scaler_path to RTUPipeline()"
            )
        return self.scaler.transform(df_window)

    def prepare_input(self, df_trans):
        """Reshape the first ``WINDOW_SIZE`` rows into a single-item batch.

        Args:
            df_trans: 2-D array-like with ``len(self.column_names)`` columns
                and at least ``WINDOW_SIZE`` rows.

        Returns:
            Array of shape ``(1, WINDOW_SIZE, len(self.column_names))``.
        """
        # np.asarray keeps this working if the scaler returns a DataFrame.
        values = np.asarray(df_trans)
        return values[: self.WINDOW_SIZE, :].reshape(
            (1, self.WINDOW_SIZE, len(self.column_names))
        )

    def extract_data_from_message(self, message):
        """Append the message's values as a new row of ``self.df``.

        Args:
            message: Object whose ``payload`` attribute is bytes holding a
                JSON object with one entry per name in ``self.column_names``.

        Returns:
            ``self.df`` (the same object, including the new row).

        Raises:
            KeyError: If the payload lacks any expected column.
        """
        payload = json.loads(message.payload.decode())
        missing = [col for col in self.column_names if col not in payload]
        if missing:
            raise KeyError(f"payload is missing expected columns: {missing}")
        # Assign the whole row in one .loc call: writing cell by cell through
        # ``self.df.loc[i][col] = ...`` targets a temporary Series and is not
        # guaranteed to reach the frame.
        # NOTE(review): the buffer grows without bound although only the last
        # WINDOW_SIZE + 1 rows are used here - confirm whether callers need
        # the full history before trimming it.
        self.df.loc[len(self.df)] = {
            col: payload[col] for col in self.column_names
        }
        return self.df

    def fit(self, message):
        """Ingest one message and build the scaled window when enough rows exist.

        Despite the name, nothing is fitted: the message is appended to the
        buffer, the trailing window is scaled and then reshaped.

        Args:
            message: See ``extract_data_from_message``.

        Returns:
            ``(window, scaled)`` where ``window`` has shape
            ``(1, WINDOW_SIZE, len(self.column_names))`` and ``scaled`` is the
            scaler output for the last ``WINDOW_SIZE + 1`` rows, or ``None``
            while the buffer is still too short.
        """
        df = self.extract_data_from_message(message)
        df_window = self.get_window(df)
        if df_window is None:
            return None
        df_trans = self.transform_window(df_window)
        df_new = self.prepare_input(df_trans)
        return df_new, df_trans