akshayballal's picture
Refactor RTU pipeline for improved scalability and maintainability
767e14d
import json
from rtu.RTUAnomalizer1 import RTUAnomalizer1
from rtu.RTUAnomalizer2 import RTUAnomalizer2
from rtu.RTUPipeline import RTUPipeline
from vav.VAVPipeline import VAVPipeline
from vav.VAVAnomalizer import VAVAnomalizer
from energy_prediction.EnergyPredictionNorth import EnergyPredictionNorth
import paho.mqtt.client as mqtt
def main():
    """Load the RTU and VAV models and process MQTT sensor messages forever.

    Builds the RTU data pipeline with its two anomalizers (RTU 1-2 and
    RTU 3-4) plus the VAV pipeline and anomalizer for RTU 1, connects to the
    MQTT broker at ``localhost:1883`` and feeds every message received on the
    ``sensor_data`` topic through both pipelines.

    This function does not return: it blocks in ``client.loop_forever()``.
    """
    rtu_data_pipeline = RTUPipeline(
        scaler1_path="src/rtu/models/scaler_rtu_1_2.pkl",
        scaler2_path="src/rtu/models/scaler_rtu_3_4.pkl",
    )

    # RTU - 1, 2
    rtu_anomalizer1 = RTUAnomalizer1(
        prediction_model_path="src/rtu/models/lstm_2rtu_smooth_04.keras",
        clustering_model_paths=[
            "src/rtu/models/kmeans_rtu_1.pkl",
            "src/rtu/models/kmeans_rtu_2.pkl",
        ],
        pca_model_paths=[
            "src/rtu/models/pca_rtu_1.pkl",
            "src/rtu/models/pca_rtu_2.pkl",
        ],
        num_inputs=rtu_data_pipeline.num_inputs,
        num_outputs=rtu_data_pipeline.num_outputs,
    )
    print(rtu_anomalizer1.kmeans_models)

    # RTU - 3, 4
    rtu_anomalizer2 = RTUAnomalizer2(
        prediction_model_path="src/rtu/models/lstm_2rtu_smooth_03.keras",
        clustering_model_paths=[
            "src/rtu/models/kmeans_rtu_3.pkl",
            "src/rtu/models/kmeans_rtu_4.pkl",
        ],
        pca_model_paths=[
            "src/rtu/models/pca_rtu_3.pkl",
            "src/rtu/models/pca_rtu_4.pkl",
        ],
        num_inputs=rtu_data_pipeline.num_inputs,
        num_outputs=rtu_data_pipeline.num_outputs,
    )

    vav_pipeline = VAVPipeline(
        rtu_id=1, scaler_path="src/vav/models/scaler_vav_1.pkl"
    )
    vav_anomalizer = VAVAnomalizer(
        rtu_id=1,
        prediction_model_path="src/vav/models/lstm_vav_01.keras",
        clustering_model_path="src/vav/models/kmeans_vav_1.pkl",
        num_inputs=vav_pipeline.num_inputs,
        num_outputs=vav_pipeline.num_outputs,
    )

    broker_address = "localhost"
    broker_port = 1883
    topic = "sensor_data"

    def on_connect(client, userdata, flags, rc):
        """Subscribe to the sensor topic each time the broker accepts us."""
        # Subscribing from on_connect (instead of once after connect())
        # renews the subscription whenever loop_forever() reconnects.
        if rc == 0:
            client.subscribe(topic)
        else:
            print(f"Connection to broker failed with result code {rc}")

    def on_message(client, userdata, message):
        """Run one incoming MQTT message through the VAV and RTU pipelines."""
        df_new_vav, df_trans_vav = vav_pipeline.fit(message)
        # Re-sync the anomalizer's dimensions with the pipeline after each
        # fit, before the anomalizer is invoked.
        vav_anomalizer.num_inputs = vav_pipeline.num_inputs
        vav_anomalizer.num_outputs = vav_pipeline.num_outputs
        # fit() may hand back None frames (presumably until enough samples
        # have been buffered - TODO confirm); skip the anomalizer then.
        if df_new_vav is not None and df_trans_vav is not None:
            # NOTE: the result is computed but not consumed anywhere yet.
            out_vav = vav_anomalizer.pipeline(
                df_new_vav, df_trans_vav, vav_pipeline.scaler
            )

        df_new1, df_trans1, df_new2, df_trans2 = rtu_data_pipeline.fit(message)
        # Both RTU anomalizers run only when all four frames are available.
        if (
            df_new1 is not None
            and df_trans1 is not None
            and df_new2 is not None
            and df_trans2 is not None
        ):
            # NOTE: the eight outputs are computed but not consumed yet.
            out1, out2, out3, out4 = rtu_anomalizer1.pipeline(
                df_new1, df_trans1, rtu_data_pipeline.scaler1
            )
            out5, out6, out7, out8 = rtu_anomalizer2.pipeline(
                df_new2, df_trans2, rtu_data_pipeline.scaler2
            )

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    print("Connecting to broker")
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(broker_address, broker_port)
    client.loop_forever()
# Start the service only when this file is executed as a script; importing
# the module must not connect to the broker.
if __name__ == "__main__":
    main()