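"""Streamlit app for predicting soil resistivity from soil properties.

Loads a trained tabular transformer (with feature- and sample-attention) plus
PowerTransformer scalers, makes predictions from user inputs, and explains each
prediction with a SHAP waterfall plot.
"""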
import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import PowerTransformer
import matplotlib.pyplot as plt
import shap
import os
import json
import pickle
import warnings

# Suppress OpenMP warnings
warnings.filterwarnings("ignore", message=".*OpenMP.*")
# Suppress PowerTransformer feature names warning
warnings.filterwarnings("ignore", message=".*has feature names.*")

# Get the absolute path of the current file
current_dir = os.path.dirname(os.path.abspath(__file__))

# Create a temp directory for plots if it doesn't exist
os.makedirs(os.path.join(current_dir, 'temp'), exist_ok=True)
# Define the model classes from 2wayembed.py
class FeatureEmbedding(nn.Module):
    def __init__(self, input_dim=1, embedding_dim=32):
        super().__init__()
        self.embedding = nn.Sequential(
            nn.Linear(input_dim, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, embedding_dim)
        )

    def forward(self, x):
        return self.embedding(x)
class TabularTransformerWithEmbedding(nn.Module):
    def __init__(self, num_features=6, embedding_dim=32, output_dim=1, num_attention_heads=4):
        super().__init__()
        self.num_features = num_features
        self.embedding_dim = embedding_dim

        # Create a separate embedding for each feature
        self.feature_embeddings = nn.ModuleList([
            FeatureEmbedding(input_dim=1, embedding_dim=embedding_dim)
            for _ in range(num_features)
        ])

        # 1D feature attention (attention across features)
        self.feature_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads)
        self.feature_norm = nn.LayerNorm(embedding_dim)

        # 1D sample attention (attention across samples/rows in the batch)
        self.sample_attention = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_attention_heads)
        self.sample_norm = nn.LayerNorm(embedding_dim)

        # Combine layer
        self.combine_layer = nn.Linear(embedding_dim * 2, embedding_dim)
        self.combine_activation = nn.ReLU()

        # Output layers
        self.output_layers = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, output_dim)
        )
    def forward(self, x):
        # x shape: (batch_size, num_features)
        batch_size = x.shape[0]

        # Project each feature into the embedding space
        embedded_features = []
        for i in range(self.num_features):
            # Extract a single feature and project it to the embedding dimension
            feature = x[:, i:i+1]                            # (batch_size, 1)
            projected = self.feature_embeddings[i](feature)  # (batch_size, embedding_dim)
            embedded_features.append(projected)

        # Stack features for attention
        # Shape: (num_features, batch_size, embedding_dim)
        embeddings = torch.stack(embedded_features)

        # 1. Feature attention: each feature attends to all other features,
        #    applied in multiple layers with residual connections
        feature_attended = embeddings
        for _ in range(4):
            attended_layer, _ = self.feature_attention(feature_attended, feature_attended, feature_attended)
            feature_attended = attended_layer + feature_attended    # residual connection
            feature_attended = self.feature_norm(feature_attended)  # layer normalization

        # 2. Sample attention (attending to samples)
        # Permute so the batch dimension is first, then back to
        # (num_features, batch_size, embedding_dim) for attention
        sample_input = embeddings.permute(1, 0, 2)
        sample_input = sample_input.permute(1, 0, 2)

        # Apply sample attention in multiple layers
        sample_attended = sample_input
        for _ in range(4):
            attended_layer, _ = self.sample_attention(sample_attended, sample_attended, sample_attended)
            sample_attended = attended_layer + sample_attended   # residual connection
            sample_attended = self.sample_norm(sample_attended)  # layer normalization

        # Combine both attention mechanisms
        # First, make the batch dimension first for both
        # Shape: (batch_size, num_features, embedding_dim)
        feature_attended = feature_attended.permute(1, 0, 2)
        sample_attended = sample_attended.permute(1, 0, 2)

        # Mean across features to get a single vector per sample
        # Shape: (batch_size, embedding_dim)
        feature_pooled = feature_attended.mean(dim=1)
        sample_pooled = sample_attended.mean(dim=1)

        # Concatenate the two attention results
        # Shape: (batch_size, embedding_dim * 2)
        combined = torch.cat([feature_pooled, sample_pooled], dim=1)

        # Project back to embedding_dim
        combined = self.combine_layer(combined)
        combined = self.combine_activation(combined)

        # Final output layers
        output = self.output_layers(combined)  # (batch_size, output_dim)
        return output
class ShapModel:
    def __init__(self, model):
        self.model = model

    def __call__(self, X):
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X.values if isinstance(X, pd.DataFrame) else X)
            output = self.model(X_tensor)
            return output.numpy()
def load_model_and_scalers():
    """Load the model, scalers, and data."""
    # Set paths relative to the current file
    model_path = os.path.join(current_dir, "best_val_r2_model.pth")
    data_path = os.path.join(current_dir, "data.xlsx")
    scaler_x_path = os.path.join(current_dir, "scaler_X.pkl")
    scaler_y_path = os.path.join(current_dir, "scaler_y.pkl")

    # Load data
    df = pd.read_excel(data_path)
    X = df.iloc[:, 0:6]  # First 6 columns are the features
    y = df.iloc[:, 6]    # 7th column is the target
    feature_names = X.columns.tolist()

    # Initialize model
    model = TabularTransformerWithEmbedding(num_features=6, embedding_dim=32, output_dim=1, num_attention_heads=4)

    # Load the model state dict (on CPU, so the app also runs without a GPU)
    state_dict = torch.load(model_path, map_location='cpu')

    # Remove feature_weights if present in the state dict but not in the model
    if 'feature_weights' in state_dict and not hasattr(model, 'feature_weights'):
        del state_dict['feature_weights']

    # Load the state dict with strict=False to allow missing keys
    model.load_state_dict(state_dict, strict=False)
    model.eval()

    # Load the saved scalers with error handling
    try:
        with open(scaler_x_path, 'rb') as f:
            scaler_X = pickle.load(f)
        with open(scaler_y_path, 'rb') as f:
            scaler_y = pickle.load(f)
    except (FileNotFoundError, pickle.UnpicklingError) as e:
        # If the saved scalers are missing or unreadable, fit and save new ones
        st.warning(f"Issue with saved scalers: {str(e)}. Creating new scalers.")
        scaler_X = PowerTransformer(method='yeo-johnson')
        scaler_y = PowerTransformer(method='yeo-johnson')

        # Fit the scalers
        scaler_X.fit(X)
        scaler_y.fit(y.values.reshape(-1, 1))

        # Save the new scalers
        with open(scaler_x_path, 'wb') as f:
            pickle.dump(scaler_X, f)
        with open(scaler_y_path, 'wb') as f:
            pickle.dump(scaler_y, f)

    # Save the feature names for later use
    with open(os.path.join(current_dir, 'feature_names.json'), 'w') as f:
        json.dump(feature_names, f)

    return model, scaler_X, scaler_y, feature_names, X
def explain_prediction(model, input_df, X_background, scaler_X, scaler_y, feature_names):
    """Generate a SHAP explanation for a prediction."""
    try:
        # Create a prediction function for SHAP
        def predict_fn(X):
            # Convert to a numpy array if it's a DataFrame to avoid the feature names warning
            X_array = X.values if isinstance(X, pd.DataFrame) else np.asarray(X)
            try:
                X_tensor = torch.FloatTensor(scaler_X.transform(X_array))
                with torch.no_grad():
                    scaled_pred = model(X_tensor).numpy()
                return scaler_y.inverse_transform(scaled_pred)
            except Exception as e:
                st.error(f"Error in prediction function: {str(e)}")
                # Return zeros as a fallback
                return np.zeros((X_array.shape[0], 1))

        # Create a ShapModel instance
        shap_model = ShapModel(model)

        # Summarize the background data and build the explainer
        background = shap.kmeans(X_background.values, 10)
        explainer = shap.KernelExplainer(predict_fn, background)

        # Get SHAP values for the input
        # (convert to a numpy array to avoid the feature names warning)
        input_array = input_df.values
        shap_values = explainer.shap_values(input_array)

        # Handle different SHAP value formats
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])

        # Ensure the correct shape for the waterfall plot
        if len(shap_values.shape) > 1:
            if shap_values.shape[0] == len(feature_names):
                shap_values = shap_values.T
            shap_values = shap_values.flatten()

        # Create the waterfall plot
        plt.figure(figsize=(10, 6))
        shap.plots.waterfall(
            shap.Explanation(
                values=shap_values,
                base_values=explainer.expected_value if np.isscalar(explainer.expected_value)
                else explainer.expected_value[0],
                data=input_df.iloc[0].values,
                feature_names=feature_names
            ),
            show=False
        )
        plt.title('Feature Contributions to Prediction')
        plt.tight_layout()

        # Save the plot to a temporary file
        temp_dir = os.path.join(current_dir, 'temp')
        os.makedirs(temp_dir, exist_ok=True)
        temp_file = os.path.join(temp_dir, 'shap_explanation.png')
        plt.savefig(temp_file, dpi=300, bbox_inches='tight')
        plt.close()

        return explainer.expected_value, shap_values, temp_file
    except Exception as e:
        st.error(f"Error generating explanation: {str(e)}")
        return 0, np.zeros(len(feature_names)), None
def model_predict(model, input_df, scaler_X, scaler_y):
    """Make a prediction using the model."""
    try:
        # Scale the input data
        # (convert the DataFrame to a numpy array first to avoid the feature names warning)
        X_scaled = scaler_X.transform(input_df.values)
        X_tensor = torch.FloatTensor(X_scaled)

        # Make the prediction
        with torch.no_grad():
            scaled_pred = model(X_tensor).numpy()

        # Inverse transform to get the prediction on the original scale
        prediction = scaler_y.inverse_transform(scaled_pred)
        return prediction.flatten()
    except Exception as e:
        st.error(f"Error making prediction: {str(e)}")
        # Return a default value in case of error
        return np.array([0.0])
# Set the page title and description
st.set_page_config(
    page_title="Soil Resistivity Predictor",
    page_icon="🧪",
    layout="wide"
)

st.title("Soil Resistivity Prediction Tool")
st.markdown("""
This application predicts soil resistivity based on various soil properties using a deep learning model.
Enter the soil properties below and click the 'Predict Resistivity' button to get a prediction.
""")

# Ensure the temp directory exists
temp_dir = os.path.join(current_dir, 'temp')
os.makedirs(temp_dir, exist_ok=True)

# Use session state to track whether this is the first run
if 'first_run' not in st.session_state:
    st.session_state.first_run = True
    # Clear any existing temp plot files on the first run
    for file in os.listdir(temp_dir):
        if file.endswith('.png'):
            try:
                os.remove(os.path.join(temp_dir, file))
            except OSError:
                pass
# Load the model and scalers
try:
    model, scaler_X, scaler_y, feature_names, X = load_model_and_scalers()

    # Create input fields for the features
    st.subheader("Input Features")

    # Create two columns for the input fields
    col1, col2 = st.columns(2)

    # Dictionary to store the input values
    input_values = {}

    # Create input fields split between the two columns
    for i, feature in enumerate(feature_names):
        # Get the min and max values for each feature
        min_val = float(X[feature].min())
        max_val = float(X[feature].max())

        # Add the input field to alternating columns
        with col1 if i < len(feature_names) // 2 else col2:
            # Use session state to maintain values between reruns
            if f'input_{feature}' not in st.session_state:
                st.session_state[f'input_{feature}'] = float(X[feature].mean())

            input_values[feature] = st.number_input(
                f"{feature}",
                min_value=float(min_val * 0.9),  # Allow slightly below the observed minimum
                max_value=float(max_val * 1.1),  # Allow slightly above the observed maximum
                value=st.session_state[f'input_{feature}'],
                key=f'input_widget_{feature}',
                help=f"Range: {min_val:.2f} to {max_val:.2f}"
            )

            # Update session state with the current value
            st.session_state[f'input_{feature}'] = input_values[feature]

    # Add the predict button
    if st.button("Predict Resistivity", type="primary"):
        try:
            # Create the input DataFrame
            input_df = pd.DataFrame([input_values])

            # Make the prediction
            with st.spinner("Calculating prediction..."):
                prediction = model_predict(model, input_df, scaler_X, scaler_y)

            # Display the prediction
            st.subheader("Prediction Result")
            st.markdown(f"### Predicted Resistivity: {prediction[0]:.2f} Ω·m")

            # Calculate and display the SHAP values
            with st.spinner("Generating explanation..."):
                st.subheader("Feature Importance Explanation")

                # Get SHAP values using the training data as background
                expected_value, shap_values, temp_file = explain_prediction(
                    model, input_df, X, scaler_X, scaler_y, feature_names
                )

                # Display the waterfall plot
                if temp_file and os.path.exists(temp_file):
                    try:
                        st.image(temp_file)
                    except Exception as img_error:
                        st.error(f"Error displaying SHAP explanation image: {str(img_error)}")
                else:
                    st.warning("Could not generate SHAP explanation plot.")
        except Exception as pred_error:
            st.error(f"Error during prediction process: {str(pred_error)}")
            st.exception(pred_error)

except Exception as e:
    st.error(f"""
    Error loading the model and data. Please make sure:

    1. The model file 'best_val_r2_model.pth' exists in the application directory
    2. The data file 'data.xlsx' exists in the application directory
    3. The scaler files 'scaler_X.pkl' and 'scaler_y.pkl' exist in the application directory
    4. All required packages are installed

    Error details: {str(e)}
    """)
    # Show detailed error information
    st.exception(e)