|
import streamlit as st |
|
import os |
|
import numpy as np |
|
import pandas as pd |
|
from sklearn.preprocessing import MinMaxScaler |
|
import torch |
|
import time |
|
|
|
from utils.transform import compute_gradient |
|
from model.lstm import LSTMModel |
|
from model.tcn import TCNModel |
|
from model.tcn import move_custom_layers_to_device |
|
from utils.metrics import calculate_metrics |
|
|
|
each_feature_name = ["q_1","q_2","q_3","p_1","p_2","p_3"] |
|
|
|
def uniform_sampling(data, n_sample): |
|
k = len(data) // n_sample |
|
return data[::k] |
|
|
|
st.set_page_config(page_title="Prediction", page_icon=":chart_with_upwards_trend:", layout="wide", initial_sidebar_state="auto") |
|
|
|
|
|
|
|
with st.sidebar: |
|
slider_predict_step = st.slider('Predicted Step', 0, 20, 20) |
|
|
|
number_input_sample_id = st.number_input("Select Sample ID 1~10", value=1, placeholder="Type a number...", min_value=1, max_value=10, step=1) |
|
|
|
squences_start_idx = st.slider('Squences Start Index', 0, 700 - slider_predict_step, 0) |
|
|
|
st.subheader("Model Configuration") |
|
st.write("LSTM Window Size: ", 200) |
|
st.write("TCN Window Size: ", 300) |
|
st.write("Predicted Step: ", slider_predict_step) |
|
st.write("Feature Augmentation: ", "Second-order derivative") |
|
|
|
file_path = os.path.join("data", "file"+str(number_input_sample_id)+".dat.npz") |
|
data = pd.DataFrame(np.load(file_path)['data']) |
|
scaler = MinMaxScaler() |
|
uniform_data = uniform_sampling(data, n_sample=1000).sort_index().values[:, 1:8] |
|
normal_uniform_data = scaler.fit_transform(uniform_data) |
|
data_sequences = torch.tensor(np.stack(normal_uniform_data)).float() |
|
original_data_sequences = torch.tensor(np.stack(uniform_data)).float() |
|
selected_data = data_sequences[squences_start_idx:squences_start_idx+300+slider_predict_step] |
|
original_selected_data = original_data_sequences[squences_start_idx:squences_start_idx+300+slider_predict_step] |
|
input_data = torch.stack([compute_gradient(i, degree=2) for i in selected_data]).unsqueeze(0) |
|
|
|
with st.sidebar: |
|
st.subheader("Data Configuration") |
|
st.write("Sample ID: ", number_input_sample_id) |
|
|
|
st.write("Squences Start Index: ", squences_start_idx) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
lstm_cpu_ckpt_file = os.path.join("model", "lstm.ckpt") |
|
lstm_cpu_model = LSTMModel.load_from_checkpoint(lstm_cpu_ckpt_file) |
|
lstm_cpu_model.to("cpu") |
|
lstm_cpu_model.eval() |
|
lstm_cpu_start_time = time.time() |
|
with torch.no_grad(): |
|
lstm_cpu_preds = lstm_cpu_model(input_data[:, 100:300, :]) |
|
lstm_cpu_end_time = time.time() |
|
lstm_innv_preds = scaler.inverse_transform(lstm_cpu_preds.squeeze().cpu().numpy()) |
|
lstm_normal_preds = lstm_cpu_preds.squeeze().cpu().numpy() |
|
|
|
del lstm_cpu_model |
|
|
|
|
|
|
|
|
|
input_data_cpu = input_data.to("cpu") |
|
tcn_cpu_ckpt_file = os.path.join("model", "tcn.ckpt") |
|
tcn_cpu_model = TCNModel.load_from_checkpoint(tcn_cpu_ckpt_file) |
|
move_custom_layers_to_device(tcn_cpu_model, "cpu") |
|
tcn_cpu_model.eval() |
|
tcn_cpu_start_time = time.time() |
|
with torch.no_grad(): |
|
y_hat = None |
|
for i in range(slider_predict_step): |
|
if i == 0: |
|
y_hat = tcn_cpu_model(input_data_cpu[:,:300,:]) |
|
else: |
|
gd_y_hat = compute_gradient(y_hat[:, :i, :], degree=2).to('cpu') |
|
output = tcn_cpu_model(torch.concatenate([input_data_cpu[:, i:300, :], gd_y_hat], dim=1).to('cpu')) |
|
y_hat = torch.concatenate([y_hat, output], dim=1) |
|
tcn_cpu_preds = y_hat |
|
tcn_cpu_end_time = time.time() |
|
tcn_innv_preds = scaler.inverse_transform(tcn_cpu_preds.squeeze().cpu().numpy()) |
|
tcn_normal_preds = tcn_cpu_preds.squeeze().cpu().numpy() |
|
|
|
del tcn_cpu_model |
|
|
|
st.subheader("Normalized Prediction") |
|
|
|
i = 1 |
|
for each_col in st.columns(6): |
|
with each_col: |
|
raw_data = selected_data[:, i] |
|
lstm_data = [np.nan] * 300 + lstm_normal_preds[:slider_predict_step, :][:, i].tolist() |
|
tcn_data = [np.nan] * 300 + tcn_normal_preds[:, i].tolist() |
|
st.markdown(f"<div style='text-align: center'>{each_feature_name[i-1]}</div>", unsafe_allow_html=True) |
|
|
|
st.line_chart(pd.DataFrame({"Original": raw_data, "LSTM": lstm_data, "TCN": tcn_data}), |
|
color=["#EE4035", "#0077BB", "#7BC043"]) |
|
i += 1 |
|
|
|
|
|
|
|
|
|
st.subheader("Inverse Normalized Prediction") |
|
|
|
i = 1 |
|
for each_col in st.columns(6): |
|
with each_col: |
|
raw_data = original_selected_data[:, i] |
|
lstm_data = [np.nan] * 300 + lstm_innv_preds[:slider_predict_step, :][:, i].tolist() |
|
tcn_data = [np.nan] * 300 + tcn_innv_preds[:, i].tolist() |
|
st.markdown(f"<div style='text-align: center'>{each_feature_name[i - 1]}</div>", unsafe_allow_html=True) |
|
st.line_chart(pd.DataFrame({"Original": raw_data, "LSTM": lstm_data, "TCN": tcn_data}), |
|
color=["#EE4035", "#0077BB", "#7BC043"]) |
|
i += 1 |
|
|
|
LSTM_SMAPE, LSTM_MSE, LSTM_RMSE, LSTM_MAE, LSTM_R2, LSTM_PSD = calculate_metrics(selected_data[300:300+slider_predict_step, :].cpu().numpy(), lstm_normal_preds[:slider_predict_step, :]) |
|
TCN_SMAPE, TCN_MSE, TCN_RMSE, TCN_MAE, TCN_R2, TCN_PSD = calculate_metrics(selected_data[300:300+slider_predict_step, :].cpu().numpy(), tcn_normal_preds) |
|
|
|
results_df = pd.DataFrame({ |
|
"Model": ["LSTM", "TCN"], |
|
"SMAPE": [LSTM_SMAPE, TCN_SMAPE], |
|
"MSE": [LSTM_MSE, TCN_MSE], |
|
"RMSE": [LSTM_RMSE, TCN_RMSE], |
|
"MAE": [LSTM_MAE, TCN_MAE], |
|
"R2": [LSTM_R2, TCN_R2], |
|
"PSD": [LSTM_PSD, TCN_PSD] |
|
}) |
|
|
|
time_df = pd.DataFrame({ |
|
"Model": ["LSTM-CPU", "TCN-CPU"], |
|
"Time(ms)": [(lstm_cpu_end_time - lstm_cpu_start_time)*1000, |
|
(tcn_cpu_end_time - tcn_cpu_start_time)*1000] |
|
}) |
|
|
|
col1, col2 = st.columns(2) |
|
with col1: |
|
st.subheader("Evaluation Metrics") |
|
st.write(results_df) |
|
|
|
with col2: |
|
st.subheader("Prediction Time") |
|
st.write(time_df) |