import json
from datetime import date, datetime
from pickle import load

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


class EnergyPredictionPipeline:
    """Accumulate incoming sensor messages and turn them into scaled windows.

    Each message is appended as one row to ``self.df``.  Once enough rows
    have been collected, the most recent window is resampled to hourly
    means, enriched with sine encodings of the timestamp, and passed
    through the "north" scaler.
    """

    # Fitted scaler objects loaded from disk in __init__; None means that no
    # path was supplied for that scaler.
    scalerNorth = None
    scalerSouth = None

    # Number of most-recent rows that make up one window (673).
    # NOTE(review): presumably four weeks of hourly readings plus one extra
    # row -- confirm the "+1" is intentional.
    _WINDOW_ROWS = 4 * 7 * 24 + 1

    def __init__(self, scaler1_path=None, scaler2_path=None):
        """Load the optional scalers and set up the message buffer.

        Args:
            scaler1_path: Path of the file holding the scaler stored as
                ``scalerNorth``; skipped when falsy.
            scaler2_path: Path of the file holding the scaler stored as
                ``scalerSouth``; skipped when falsy.
        """
        if scaler1_path:
            self.scalerNorth = self.get_scaler(scaler1_path)
        if scaler2_path:
            self.scalerSouth = self.get_scaler(scaler2_path)

        # The original code read ``self.input_col_names`` before anything had
        # defined it, which raised AttributeError.  Fall back to an empty
        # base list while still honouring a value provided by a subclass.
        base_columns = list(getattr(self, "input_col_names", []))
        self.input_col_names = base_columns + ["date", "hvac_N"]

        # Buffer of received rows; extract_data_from_message() appends to it,
        # so it has to exist before the first message arrives.
        if getattr(self, "df", None) is None:
            self.df = pd.DataFrame(columns=self.input_col_names)

    def get_scaler(self, scaler_path):
        """Load and return the object stored at ``scaler_path`` via joblib."""
        # SECURITY: joblib.load unpickles the file and can therefore execute
        # arbitrary code -- only load scaler files from trusted sources.
        return joblib.load(scaler_path)

    def transform_windows(self, df):
        """Return ``df`` transformed by ``scalerNorth``.

        ``scalerNorth`` must have been loaded (``scaler1_path`` given to the
        constructor); otherwise the attribute access on ``None`` fails.
        """
        return self.scalerNorth.transform(df)

    @staticmethod
    def date_encoder(df):
        """Add calendar features derived from the datetime index of ``df``.

        Adds ``day_of_week``, ``hour_of_day`` and ``month`` plus a sine
        encoding of each (periods 7, 24 and 12).  ``df`` is modified in
        place and also returned.

        Declared as a static method because it takes no ``self``; the
        original plain function failed when invoked as
        ``self.date_encoder(df)``.
        """
        df["day_of_week"] = df.index.dayofweek
        df["hour_of_day"] = df.index.hour
        df["month"] = df.index.month
        df["day_encoding"] = np.sin(2 * np.pi * df["day_of_week"] / 7)
        df["hour_encoding"] = np.sin(2 * np.pi * df["hour_of_day"] / 24)
        df["month_encoding"] = np.sin(2 * np.pi * df["month"] / 12)
        return df

    def prepare_input(self, df_new):
        """Resample a window to hourly means and append calendar features.

        Args:
            df_new: Frame with a ``date`` column parseable by
                ``pd.to_datetime``; it is copied, not modified.

        Returns:
            A new frame with a fresh integer index, one row per hour.
        """
        df = df_new.copy()
        df["date"] = pd.to_datetime(df["date"])
        df.set_index("date", inplace=True)
        # "h" is the hourly alias; the upper-case "H" spelling is deprecated
        # in recent pandas releases.
        df = df.resample("h").mean()
        df = self.date_encoder(df)
        df.reset_index(inplace=True, drop=True)
        return df

    def extract_data_from_message(self, message):
        """Append the fields of one message to ``self.df`` and return it.

        Args:
            message: Object whose ``payload`` attribute holds bytes of a
                JSON object containing every name in
                ``self.input_col_names``.

        Raises:
            KeyError: If the payload lacks one of the expected columns.
        """
        payload = json.loads(message.payload.decode())
        row = {col: payload[col] for col in self.input_col_names}
        # Label the new row with the current length, i.e. the next position.
        self.df.loc[len(self.df)] = row
        return self.df

    def get_window(self, df):
        """Return the latest window of ``df``, or None if too few rows exist.

        Every column except ``date`` is cast to float32.  Casting ``date``
        as well (as the original code did) raises for date strings and
        destroys the precision of numeric timestamps before
        ``prepare_input`` gets to parse them.
        """
        len_df = len(df)
        # NOTE(review): debugging output kept from the original code.
        print(len_df)
        if len_df < self._WINDOW_ROWS:
            return None

        window = df.iloc[len_df - self._WINDOW_ROWS:len_df]
        numeric_dtypes = {
            col: "float32" for col in window.columns if col != "date"
        }
        return window.astype(numeric_dtypes)

    def fit(self, message):
        """Process one message end to end.

        Returns:
            The scaled window produced by ``transform_windows`` once enough
            rows have been buffered, otherwise None.
        """
        buffered = self.extract_data_from_message(message)
        window = self.get_window(buffered)
        if window is None:
            return None
        prepared = self.prepare_input(window)
        return self.transform_windows(prepared)