Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from sklearn.preprocessing import MinMaxScaler | |
| from sklearn.metrics import mean_squared_error | |
| import tensorflow as tf | |
| from tensorflow.keras.models import Sequential | |
| from tensorflow.keras.layers import Dense, LSTM | |
# Fix the TensorFlow random seed so weight init / training are reproducible
# across reruns of the app.
tf.random.set_seed(7)
def create_dataset(dataset, look_back=1):
    """Turn a 2-D series array into supervised (X, y) pairs.

    Each X row is a window of `look_back` consecutive values from column 0;
    the matching y is the value immediately after that window.
    """
    windows, targets = [], []
    n_samples = len(dataset) - look_back - 1
    for start in range(n_samples):
        windows.append(dataset[start:start + look_back, 0])
        targets.append(dataset[start + look_back, 0])
    return np.array(windows), np.array(targets)
def lstm_prediction(file, epochs, look_back=1):
    """Train a small LSTM on a univariate CSV series and plot predictions.

    Args:
        file: CSV path or file-like object; column index 1 holds the series
            values (read with Big5 encoding, as in the original data source).
        epochs: number of training epochs.
        look_back: window of past steps used to predict the next value
            (default 1, matching the original behavior).

    Returns:
        Tuple ``(train_rmse, test_rmse)`` in the original data scale.

    Side effects:
        Renders a progress bar, per-epoch loss text, and a prediction plot
        into the active Streamlit app.
    """
    # Load the target column and coerce to float32 for Keras.
    dataframe = pd.read_csv(file, usecols=[1], engine='python', encoding="big5")
    dataset = dataframe.values.astype('float32')

    # Scale to [0, 1]; the same scaler inverts predictions later.
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(dataset)

    # 80/20 chronological split (no shuffling — this is a time series).
    train_size = int(len(dataset) * 0.8)
    train, test = dataset[:train_size, :], dataset[train_size:, :]

    # Window the series into supervised (X, y) pairs.
    trainX, trainY = create_dataset(train, look_back)
    testX, testY = create_dataset(test, look_back)

    # LSTM expects [samples, time steps, features]; here 1 time step with
    # `look_back` features, as in the original formulation.
    trainX = np.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
    testX = np.reshape(testX, (testX.shape[0], 1, testX.shape[1]))

    # Small single-layer LSTM regressor.
    model = Sequential()
    model.add(LSTM(4, input_shape=(1, look_back)))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')

    # Callback that mirrors training progress into the Streamlit UI.
    class StreamlitCallback(tf.keras.callbacks.Callback):
        def __init__(self):
            super().__init__()
            self.epoch_bar = st.progress(0)
            self.loss_placeholder = st.empty()

        def on_epoch_end(self, epoch, logs=None):
            self.epoch_bar.progress((epoch + 1) / epochs)
            self.loss_placeholder.text(f'Epoch {epoch + 1}/{epochs}, Loss: {logs["loss"]:.4f}')

    model.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=0,
              callbacks=[StreamlitCallback()])

    # Predict and map back to the original scale.
    trainPredict = scaler.inverse_transform(model.predict(trainX))
    trainY = scaler.inverse_transform([trainY])
    testPredict = scaler.inverse_transform(model.predict(testX))
    testY = scaler.inverse_transform([testY])

    # Root mean squared error in original units.
    trainScore = np.sqrt(mean_squared_error(trainY[0], trainPredict[:, 0]))
    testScore = np.sqrt(mean_squared_error(testY[0], testPredict[:, 0]))

    # Align each prediction segment with its position in the full series.
    trainPredictPlot = np.full_like(dataset, np.nan)
    trainPredictPlot[look_back:len(trainPredict) + look_back, :] = trainPredict
    testPredictPlot = np.full_like(dataset, np.nan)
    testPredictPlot[len(trainPredict) + (look_back * 2) + 1:len(dataset) - 1, :] = testPredict

    # FIX: draw on an explicit Figure instead of the global pyplot state —
    # st.pyplot(plt) (passing the module) is deprecated in Streamlit and the
    # global figure leaks across reruns.
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.plot(scaler.inverse_transform(dataset), label='Original Data', color='blue')
    ax.plot(trainPredictPlot, label='Training Predictions', linestyle='--', color='green')
    ax.plot(testPredictPlot, label='Test Predictions', linestyle='--', color='red')
    ax.set_xlabel('Time')
    # FIX: the plotted values were inverse-transformed above, so the axis
    # shows original units, not scaled ones.
    ax.set_ylabel('Values')
    ax.set_title('Original Data and Predictions')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.7)
    st.pyplot(fig)
    plt.close(fig)  # release the figure so repeated reruns don't accumulate

    return trainScore, testScore
# --- Streamlit page layout ---
st.title("LSTM Time Series Prediction")
st.write("Upload a CSV file with time series data for prediction.")

# User inputs: the CSV to analyse and how long to train.
uploaded_file = st.file_uploader("Choose a CSV file", type="csv")
epochs = st.number_input("Enter number of epochs", min_value=1, max_value=1000, value=50, step=1)

# Once a file is provided, train the model and report both RMSE scores.
if uploaded_file is not None:
    train_rmse, test_rmse = lstm_prediction(uploaded_file, epochs)
    st.write(f'Train Score: {train_rmse:.2f} RMSE')
    st.write(f'Test Score: {test_rmse:.2f} RMSE')