|
import os |
|
|
|
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True' |
|
os.environ['OMP_NUM_THREADS'] = '1' |
|
os.environ['OPENBLAS_NUM_THREADS'] = '1' |
|
os.environ['MKL_NUM_THREADS'] = '1' |
|
os.environ['VECLIB_MAXIMUM_THREADS'] = '1' |
|
os.environ['NUMEXPR_NUM_THREADS'] = '1' |
|
|
|
import streamlit as st |
|
import torch |
|
import numpy as np |
|
import pandas as pd |
|
import matplotlib.pyplot as plt |
|
import shap |
|
from sklearn.preprocessing import MinMaxScaler |
|
import plotly.graph_objects as go |
|
import io |
|
from matplotlib.figure import Figure |
|
|
|
|
|
st.set_page_config( |
|
page_title="Waste Properties Predictor", |
|
page_icon="π", |
|
layout="wide" |
|
) |
|
|
|
|
|
st.markdown(""" |
|
<style> |
|
.stApp { |
|
max-width: 1200px; |
|
margin: 0 auto; |
|
} |
|
.main { |
|
padding: 2rem; |
|
} |
|
.stButton>button { |
|
width: 100%; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
class Net(torch.nn.Module): |
|
def __init__(self, input_size): |
|
super(Net, self).__init__() |
|
self.fc1 = torch.nn.Linear(input_size, 64) |
|
self.fc2 = torch.nn.Linear(64, 1000) |
|
self.fc3 = torch.nn.Linear(1000, 200) |
|
self.fc4 = torch.nn.Linear(200, 8) |
|
self.fc5 = torch.nn.Linear(8, 1) |
|
self.dropout = torch.nn.Dropout(0.2) |
|
|
|
|
|
self.apply(self._init_weights) |
|
|
|
def _init_weights(self, module): |
|
if isinstance(module, torch.nn.Linear): |
|
torch.nn.init.xavier_uniform_(module.weight) |
|
if module.bias is not None: |
|
module.bias.data.zero_() |
|
|
|
def forward(self, x): |
|
x = torch.nn.functional.relu(self.fc1(x)) |
|
x = self.dropout(x) |
|
x = torch.nn.functional.relu(self.fc2(x)) |
|
x = self.dropout(x) |
|
x = torch.nn.functional.relu(self.fc3(x)) |
|
x = self.dropout(x) |
|
x = torch.nn.functional.relu(self.fc4(x)) |
|
x = self.dropout(x) |
|
x = self.fc5(x) |
|
return x |
|
|
|
@st.cache_resource |
|
def load_model_and_data(): |
|
|
|
np.random.seed(32) |
|
torch.manual_seed(42) |
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
|
|
data = pd.read_excel("Data_syw.xlsx") |
|
X = data.iloc[:, list(range(1, 17)) + list(range(21, 23))] |
|
|
|
|
|
y_friction = data.iloc[:, 28].values |
|
correlation_with_friction = abs(X.corrwith(pd.Series(y_friction))) |
|
selected_features_friction = correlation_with_friction[correlation_with_friction > 0.1].index |
|
X_friction = X[selected_features_friction] |
|
|
|
|
|
y_cohesion = data.iloc[:, 25].values |
|
correlation_with_cohesion = abs(X.corrwith(pd.Series(y_cohesion))) |
|
selected_features_cohesion = correlation_with_cohesion[correlation_with_cohesion > 0.1].index |
|
X_cohesion = X[selected_features_cohesion] |
|
|
|
|
|
scaler_X_friction = MinMaxScaler() |
|
scaler_y_friction = MinMaxScaler() |
|
scaler_X_friction.fit(X_friction) |
|
scaler_y_friction.fit(y_friction.reshape(-1, 1)) |
|
|
|
|
|
scaler_X_cohesion = MinMaxScaler() |
|
scaler_y_cohesion = MinMaxScaler() |
|
scaler_X_cohesion.fit(X_cohesion) |
|
scaler_y_cohesion.fit(y_cohesion.reshape(-1, 1)) |
|
|
|
|
|
friction_model = Net(input_size=len(selected_features_friction)).to(device) |
|
friction_model.load_state_dict(torch.load('friction_model.pt')) |
|
friction_model.eval() |
|
|
|
cohesion_model = Net(input_size=len(selected_features_cohesion)).to(device) |
|
cohesion_model.load_state_dict(torch.load('cohesion_model.pt')) |
|
cohesion_model.eval() |
|
|
|
return (friction_model, X_friction.columns, scaler_X_friction, scaler_y_friction, |
|
cohesion_model, X_cohesion.columns, scaler_X_cohesion, scaler_y_cohesion, |
|
device, X_friction, X_cohesion) |
|
|
|
def predict_friction(input_values, model, scaler_X, scaler_y, device): |
|
|
|
input_scaled = scaler_X.transform(input_values) |
|
input_tensor = torch.FloatTensor(input_scaled).to(device) |
|
|
|
|
|
with torch.no_grad(): |
|
prediction_scaled = model(input_tensor) |
|
prediction = scaler_y.inverse_transform(prediction_scaled.cpu().numpy().reshape(-1, 1)) |
|
|
|
return prediction[0][0] |
|
|
|
def predict_cohesion(input_values, model, scaler_X, scaler_y, device): |
|
|
|
input_scaled = scaler_X.transform(input_values) |
|
input_tensor = torch.FloatTensor(input_scaled).to(device) |
|
|
|
|
|
with torch.no_grad(): |
|
prediction_scaled = model(input_tensor) |
|
prediction = scaler_y.inverse_transform(prediction_scaled.cpu().numpy().reshape(-1, 1)) |
|
|
|
return prediction[0][0] |
|
|
|
def calculate_shap_values(input_values, model, X, scaler_X, scaler_y, device): |
|
def model_predict(X): |
|
X_scaled = scaler_X.transform(X) |
|
X_tensor = torch.FloatTensor(X_scaled).to(device) |
|
with torch.no_grad(): |
|
scaled_pred = model(X_tensor).cpu().numpy() |
|
return scaler_y.inverse_transform(scaled_pred.reshape(-1, 1)).flatten() |
|
|
|
try: |
|
|
|
np.random.seed(42) |
|
|
|
|
|
|
|
n_samples = min(50, len(X)) |
|
background_indices = np.random.choice(len(X), size=n_samples, replace=False) |
|
background = X.iloc[background_indices].values |
|
|
|
|
|
explainer = shap.KernelExplainer(model_predict, background) |
|
shap_values = explainer.shap_values(input_values.values, nsamples=200) |
|
|
|
if isinstance(shap_values, list): |
|
shap_values = np.array(shap_values[0]) |
|
|
|
return shap_values[0], explainer.expected_value |
|
except Exception as e: |
|
st.error(f"Error calculating SHAP values: {str(e)}") |
|
return np.zeros(len(input_values.columns)), 0.0 |
|
|
|
@st.cache_resource |
|
def create_background_data(X, n_samples=50): |
|
"""Create and cache background data for SHAP calculations""" |
|
np.random.seed(42) |
|
|
|
n_samples = min(n_samples, len(X)) |
|
background_indices = np.random.choice(len(X), size=n_samples, replace=False) |
|
return X.iloc[background_indices].values |
|
|
|
def create_waterfall_plot(shap_values, feature_names, base_value, input_data, title): |
|
|
|
explanation = shap.Explanation( |
|
values=shap_values, |
|
base_values=base_value, |
|
data=input_data, |
|
feature_names=list(feature_names) |
|
) |
|
|
|
|
|
fig = plt.figure(figsize=(12, 8)) |
|
shap.plots.waterfall(explanation, show=False) |
|
plt.title(f'{title} - Local SHAP Value Contributions') |
|
plt.tight_layout() |
|
|
|
|
|
buf = io.BytesIO() |
|
plt.savefig(buf, format='png', bbox_inches='tight', dpi=300) |
|
plt.close(fig) |
|
buf.seek(0) |
|
return buf |
|
|
|
def main(): |
|
st.title("π Waste Properties Predictor") |
|
st.write("This app predicts both friction angle and cohesion based on waste composition and characteristics.") |
|
|
|
try: |
|
|
|
(friction_model, friction_features, scaler_X_friction, scaler_y_friction, |
|
cohesion_model, cohesion_features, scaler_X_cohesion, scaler_y_cohesion, |
|
device, X_friction, X_cohesion) = load_model_and_data() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
all_features = sorted(list(set(friction_features) | set(cohesion_features))) |
|
|
|
st.header("Input Parameters") |
|
|
|
|
|
uploaded_file = st.file_uploader("Upload Excel file with input values", type=['xlsx', 'xls']) |
|
|
|
|
|
input_values = {} |
|
|
|
|
|
default_data = pd.read_excel("Data_syw.xlsx") |
|
if len(default_data) > 0: |
|
for feature in all_features: |
|
if feature in default_data.columns: |
|
input_values[feature] = float(default_data[feature].iloc[0]) |
|
|
|
|
|
if uploaded_file is not None: |
|
try: |
|
|
|
df = pd.read_excel(uploaded_file) |
|
if len(df) > 0: |
|
|
|
for feature in all_features: |
|
if feature in df.columns: |
|
input_values[feature] = float(df[feature].iloc[0]) |
|
except Exception as e: |
|
st.error(f"Error reading file: {str(e)}") |
|
|
|
st.write("Enter the waste composition and characteristics below to predict both friction angle and cohesion.") |
|
|
|
|
|
col1, col2 = st.columns(2) |
|
|
|
|
|
for i, feature in enumerate(all_features): |
|
with col1 if i < len(all_features)//2 else col2: |
|
|
|
if feature in X_friction.columns and feature in X_cohesion.columns: |
|
min_val = min(float(X_friction[feature].min()), float(X_cohesion[feature].min())) |
|
max_val = max(float(X_friction[feature].max()), float(X_cohesion[feature].max())) |
|
elif feature in X_friction.columns: |
|
min_val = float(X_friction[feature].min()) |
|
max_val = float(X_friction[feature].max()) |
|
else: |
|
min_val = float(X_cohesion[feature].min()) |
|
max_val = float(X_cohesion[feature].max()) |
|
|
|
|
|
default_value = input_values.get(feature, 0.0) |
|
|
|
input_values[feature] = st.number_input( |
|
f"{feature}", |
|
min_value=min_val, |
|
max_value=max_val, |
|
value=default_value, |
|
help=f"Range: {min_val:.2f} to {max_val:.2f}" |
|
) |
|
|
|
|
|
friction_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in friction_features]], |
|
columns=friction_features) |
|
cohesion_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in cohesion_features]], |
|
columns=cohesion_features) |
|
|
|
if st.button("Predict Properties"): |
|
with st.spinner("Calculating predictions and SHAP values..."): |
|
|
|
friction_prediction = predict_friction(friction_input_df, friction_model, scaler_X_friction, scaler_y_friction, device) |
|
cohesion_prediction = predict_cohesion(cohesion_input_df, cohesion_model, scaler_X_cohesion, scaler_y_cohesion, device) |
|
|
|
|
|
np.random.seed(42) |
|
torch.manual_seed(42) |
|
if torch.cuda.is_available(): |
|
torch.cuda.manual_seed(42) |
|
|
|
|
|
friction_shap_values, friction_base_value = calculate_shap_values(friction_input_df, friction_model, X_friction, scaler_X_friction, scaler_y_friction, device) |
|
cohesion_shap_values, cohesion_base_value = calculate_shap_values(cohesion_input_df, cohesion_model, X_cohesion, scaler_X_cohesion, scaler_y_cohesion, device) |
|
|
|
|
|
st.header("Prediction Results") |
|
col1, col2 = st.columns(2) |
|
|
|
with col1: |
|
st.metric("Friction Angle", f"{friction_prediction:.2f}Β°") |
|
|
|
with col2: |
|
st.metric("Cohesion", f"{cohesion_prediction:.2f} kPa") |
|
|
|
|
|
col1, col2 = st.columns(2) |
|
|
|
with col1: |
|
st.subheader("Friction Angle SHAP Analysis") |
|
friction_waterfall_plot = create_waterfall_plot( |
|
shap_values=friction_shap_values, |
|
feature_names=friction_features, |
|
base_value=friction_base_value, |
|
input_data=friction_input_df.values[0], |
|
title="Friction Angle" |
|
) |
|
st.image(friction_waterfall_plot) |
|
|
|
with col2: |
|
st.subheader("Cohesion SHAP Analysis") |
|
cohesion_waterfall_plot = create_waterfall_plot( |
|
shap_values=cohesion_shap_values, |
|
feature_names=cohesion_features, |
|
base_value=cohesion_base_value, |
|
input_data=cohesion_input_df.values[0], |
|
title="Cohesion" |
|
) |
|
st.image(cohesion_waterfall_plot) |
|
|
|
except Exception as e: |
|
st.error(f"An error occurred: {str(e)}") |
|
st.info("Please try refreshing the page. If the error persists, contact support.") |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|