import streamlit as st import os import numpy as np import pandas as pd from sklearn.preprocessing import MinMaxScaler import torch import time from utils.transform import compute_gradient from model.lstm import LSTMModel from model.tcn import TCNModel from model.tcn import move_custom_layers_to_device from utils.metrics import calculate_metrics each_feature_name = ["q_1","q_2","q_3","p_1","p_2","p_3"] def uniform_sampling(data, n_sample): k = len(data) // n_sample return data[::k] st.set_page_config(page_title="Prediction", page_icon=":chart_with_upwards_trend:", layout="wide", initial_sidebar_state="auto") #st.title("Prediction") with st.sidebar: slider_predict_step = st.slider('Predicted Step', 0, 20, 20) number_input_sample_id = st.number_input("Select Sample ID 1~10", value=1, placeholder="Type a number...", min_value=1, max_value=10, step=1) squences_start_idx = st.slider('Squences Start Index', 0, 700 - slider_predict_step, 0) st.subheader("Model Configuration") st.write("LSTM Window Size: ", 200) st.write("TCN Window Size: ", 300) st.write("Predicted Step: ", slider_predict_step) st.write("Feature Augmentation: ", "Second-order derivative") file_path = os.path.join("data", "file"+str(number_input_sample_id)+".dat.npz") data = pd.DataFrame(np.load(file_path)['data']) scaler = MinMaxScaler() uniform_data = uniform_sampling(data, n_sample=1000).sort_index().values[:, 1:8] normal_uniform_data = scaler.fit_transform(uniform_data) data_sequences = torch.tensor(np.stack(normal_uniform_data)).float() original_data_sequences = torch.tensor(np.stack(uniform_data)).float() selected_data = data_sequences[squences_start_idx:squences_start_idx+300+slider_predict_step] original_selected_data = original_data_sequences[squences_start_idx:squences_start_idx+300+slider_predict_step] input_data = torch.stack([compute_gradient(i, degree=2) for i in selected_data]).unsqueeze(0) with st.sidebar: st.subheader("Data Configuration") st.write("Sample ID: ", number_input_sample_id) #st.write("Origianl Shape: ", data_sequences.shape) st.write("Squences Start Index: ", squences_start_idx) #st.write("Selected Shape: ", selected_data.shape) #st.write("Input Shape: ", input_data.shape) # st.write(selected_data[0]) # st.write(input_data[0][0]) ################################################# ## LSTM CPU Inference ################################################# lstm_cpu_ckpt_file = os.path.join("model", "lstm.ckpt") lstm_cpu_model = LSTMModel.load_from_checkpoint(lstm_cpu_ckpt_file) lstm_cpu_model.to("cpu") lstm_cpu_model.eval() lstm_cpu_start_time = time.time() with torch.no_grad(): lstm_cpu_preds = lstm_cpu_model(input_data[:, 100:300, :]) lstm_cpu_end_time = time.time() lstm_innv_preds = scaler.inverse_transform(lstm_cpu_preds.squeeze().cpu().numpy()) lstm_normal_preds = lstm_cpu_preds.squeeze().cpu().numpy() del lstm_cpu_model ################################################# ## TCN CPU Inference ################################################# input_data_cpu = input_data.to("cpu") tcn_cpu_ckpt_file = os.path.join("model", "tcn.ckpt") tcn_cpu_model = TCNModel.load_from_checkpoint(tcn_cpu_ckpt_file) move_custom_layers_to_device(tcn_cpu_model, "cpu") tcn_cpu_model.eval() tcn_cpu_start_time = time.time() with torch.no_grad(): y_hat = None for i in range(slider_predict_step): if i == 0: y_hat = tcn_cpu_model(input_data_cpu[:,:300,:]) else: gd_y_hat = compute_gradient(y_hat[:, :i, :], degree=2).to('cpu') output = tcn_cpu_model(torch.concatenate([input_data_cpu[:, i:300, :], gd_y_hat], dim=1).to('cpu')) y_hat = torch.concatenate([y_hat, output], dim=1) tcn_cpu_preds = y_hat tcn_cpu_end_time = time.time() tcn_innv_preds = scaler.inverse_transform(tcn_cpu_preds.squeeze().cpu().numpy()) tcn_normal_preds = tcn_cpu_preds.squeeze().cpu().numpy() del tcn_cpu_model st.subheader("Normalized Prediction") i = 1 for each_col in st.columns(6): with each_col: raw_data = selected_data[:, i] lstm_data = [np.nan] * 300 + lstm_normal_preds[:slider_predict_step, :][:, i].tolist() tcn_data = [np.nan] * 300 + tcn_normal_preds[:, i].tolist() st.markdown(f"
{each_feature_name[i-1]}
", unsafe_allow_html=True) #st.write(np.array(raw_data).shape, np.array(lstm_data).shape, np.array(tcn_data).shape) st.line_chart(pd.DataFrame({"Original": raw_data, "LSTM": lstm_data, "TCN": tcn_data}), color=["#EE4035", "#0077BB", "#7BC043"]) i += 1 # with st.sidebar: # st.write("Predicted Shape: ", lstm_preds.shape) st.subheader("Inverse Normalized Prediction") i = 1 for each_col in st.columns(6): with each_col: raw_data = original_selected_data[:, i] lstm_data = [np.nan] * 300 + lstm_innv_preds[:slider_predict_step, :][:, i].tolist() tcn_data = [np.nan] * 300 + tcn_innv_preds[:, i].tolist() st.markdown(f"
{each_feature_name[i - 1]}
", unsafe_allow_html=True) st.line_chart(pd.DataFrame({"Original": raw_data, "LSTM": lstm_data, "TCN": tcn_data}), color=["#EE4035", "#0077BB", "#7BC043"]) i += 1 LSTM_SMAPE, LSTM_MSE, LSTM_RMSE, LSTM_MAE, LSTM_R2, LSTM_PSD = calculate_metrics(selected_data[300:300+slider_predict_step, :].cpu().numpy(), lstm_normal_preds[:slider_predict_step, :]) TCN_SMAPE, TCN_MSE, TCN_RMSE, TCN_MAE, TCN_R2, TCN_PSD = calculate_metrics(selected_data[300:300+slider_predict_step, :].cpu().numpy(), tcn_normal_preds) results_df = pd.DataFrame({ "Model": ["LSTM", "TCN"], "SMAPE": [LSTM_SMAPE, TCN_SMAPE], "MSE": [LSTM_MSE, TCN_MSE], "RMSE": [LSTM_RMSE, TCN_RMSE], "MAE": [LSTM_MAE, TCN_MAE], "R2": [LSTM_R2, TCN_R2], "PSD": [LSTM_PSD, TCN_PSD] }) time_df = pd.DataFrame({ "Model": ["LSTM-CPU", "TCN-CPU"], "Time(ms)": [(lstm_cpu_end_time - lstm_cpu_start_time)*1000, (tcn_cpu_end_time - tcn_cpu_start_time)*1000] }) col1, col2 = st.columns(2) with col1: st.subheader("Evaluation Metrics") st.write(results_df) with col2: st.subheader("Prediction Time") st.write(time_df)