# Streamlit_LSTM / app.py
# CJRobert's picture
# Update app.py
# 3bd93ea verified
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
# Fix TensorFlow's global random seed (value 7) for reproducibility.
# NOTE(review): only TensorFlow is seeded here; no NumPy or Python `random`
# seed is set in this file — confirm whether that matters for the
# reproducibility this is meant to provide.
tf.random.set_seed(7)
def create_dataset(dataset, look_back=1):
    """Build sliding-window inputs and next-step targets from a series.

    Column 0 of ``dataset`` (a 2-D array) is cut into overlapping windows of
    ``look_back`` consecutive values; the value that immediately follows each
    window is its target.

    Exactly ``len(dataset) - look_back - 1`` windows are produced, i.e. the
    final window that would still have a target is not emitted. When that
    count is zero or negative both returned arrays are empty.

    Args:
        dataset: 2-D array indexed as ``dataset[row, 0]``.
        look_back: Number of consecutive values in each input window.

    Returns:
        Tuple ``(windows, targets)`` of NumPy arrays.
    """
    window_starts = range(len(dataset) - look_back - 1)
    windows = [dataset[start:start + look_back, 0] for start in window_starts]
    targets = [dataset[start + look_back, 0] for start in window_starts]
    return np.array(windows), np.array(targets)
def lstm_prediction(file, epochs, *, look_back=1, encoding="big5"):
    """Train a small LSTM on one CSV column, plot the fit, and return RMSEs.

    The second column of the CSV is min-max scaled to [0, 1], split
    chronologically 80/20 into train and test segments, and converted to
    sliding windows with ``create_dataset``. A 4-unit LSTM followed by a
    single dense output is trained on the train windows while a Streamlit
    progress bar and loss line are updated after every epoch. Predictions are
    mapped back to the original scale, plotted against the original series in
    the Streamlit page, and scored.

    Args:
        file: Path or file-like object accepted by ``pandas.read_csv``.
        epochs: Number of training epochs (positive integer).
        look_back: Number of past values fed to the network per sample.
        encoding: Text encoding used to read the CSV.

    Returns:
        Tuple ``(trainScore, testScore)``: root mean squared error on the
        train and test segments, in the original units of the data.

    Raises:
        ValueError: If ``look_back`` is smaller than 1, or the series is too
            short to produce at least one train and one test window.
    """
    if look_back < 1:
        raise ValueError("look_back must be at least 1")

    # Load the dataset (second column only)
    dataframe = pd.read_csv(file, usecols=[1], engine='python', encoding=encoding)
    dataset = dataframe.values.astype('float32')

    # Normalize the dataset
    # NOTE(review): the scaler is fitted on the full series, so the test
    # segment's range influences the scaling seen during training. Kept as in
    # the original so reported scores do not change.
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(dataset)

    # Split into train and test sets (chronological 80/20)
    train_size = int(len(dataset) * 0.8)
    train, test = dataset[:train_size, :], dataset[train_size:, :]

    # Create the dataset matrix
    trainX, trainY = create_dataset(train, look_back)
    testX, testY = create_dataset(test, look_back)
    if len(trainX) == 0 or len(testX) == 0:
        # Without this guard the reshape below fails with an opaque IndexError.
        raise ValueError(
            f"Not enough rows ({len(dataset)}) to build train and test "
            f"windows with look_back={look_back}"
        )

    # Reshape input to be [samples, time steps, features]
    trainX = np.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
    testX = np.reshape(testX, (testX.shape[0], 1, testX.shape[1]))

    # Create the LSTM network
    model = Sequential()
    model.add(LSTM(4, input_shape=(1, look_back)))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')

    # Callback that mirrors training progress into the Streamlit page
    class StreamlitCallback(tf.keras.callbacks.Callback):
        def __init__(self):
            super().__init__()
            self.epoch_bar = st.progress(0)
            self.loss_placeholder = st.empty()

        def on_epoch_end(self, epoch, logs=None):
            # logs may be None; fall back to NaN instead of raising.
            loss = (logs or {}).get("loss", float("nan"))
            self.epoch_bar.progress((epoch + 1) / epochs)
            self.loss_placeholder.text(f'Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}')

    # Fit the model
    model.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=0,
              callbacks=[StreamlitCallback()])

    # Make predictions and invert them back to the original scale
    trainPredict = scaler.inverse_transform(model.predict(trainX, verbose=0))
    testPredict = scaler.inverse_transform(model.predict(testX, verbose=0))

    # Invert the targets as a proper single-feature column, matching the
    # shape the scaler was fitted on.
    trainY = scaler.inverse_transform(trainY.reshape(-1, 1))[:, 0]
    testY = scaler.inverse_transform(testY.reshape(-1, 1))[:, 0]

    # Calculate root mean squared error
    trainScore = np.sqrt(mean_squared_error(trainY, trainPredict[:, 0]))
    testScore = np.sqrt(mean_squared_error(testY, testPredict[:, 0]))

    # Prepare the plot: place each prediction at the index of the value it
    # predicts. The first train target is at index look_back; the first test
    # target is look_back rows into the test segment.
    trainPredictPlot = np.full_like(dataset, np.nan)
    trainPredictPlot[look_back:look_back + len(trainPredict), :] = trainPredict

    testPredictPlot = np.full_like(dataset, np.nan)
    test_start = train_size + look_back
    testPredictPlot[test_start:test_start + len(testPredict), :] = testPredict

    # Use an explicit figure instead of pyplot's global state so it can be
    # handed to Streamlit and released afterwards.
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.plot(scaler.inverse_transform(dataset), label='Original Data', color='blue')
    ax.plot(trainPredictPlot, label='Training Predictions', linestyle='--', color='green')
    ax.plot(testPredictPlot, label='Test Predictions', linestyle='--', color='red')
    ax.set_xlabel('Time')
    # All plotted series are in original units (inverse-transformed).
    ax.set_ylabel('Value')
    ax.set_title('Original Data and Predictions')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.7)

    # Show the plot, then close it so figures do not accumulate across reruns
    st.pyplot(fig)
    plt.close(fig)

    return trainScore, testScore
# Page header
st.title("LSTM Time Series Prediction")
st.write("Upload a CSV file with time series data for prediction.")

# File uploader
uploaded_file = st.file_uploader("Choose a CSV file", type="csv")

# Number input for epochs
epochs = st.number_input("Enter number of epochs", min_value=1, max_value=1000, value=50, step=1)

if uploaded_file is not None:
    try:
        train_score, test_score = lstm_prediction(uploaded_file, int(epochs))
    except ValueError as exc:
        # Decode failures, CSV parse errors, a missing second column and
        # non-numeric values are all ValueError subclasses; report them to the
        # user instead of showing a raw traceback.
        st.error(f"Could not process the uploaded file: {exc}")
    else:
        st.write(f'Train Score: {train_score:.2f} RMSE')
        st.write(f'Test Score: {test_score:.2f} RMSE')