SissiFeng's picture
Upload app.py
c131d7d verified
"""
AutoEIS Hugging Face Space Application
Optimized for limited resources with workflow integration
"""
import gradio as gr
import pandas as pd
import numpy as np
import base64
import json
import requests
import io
import os
import psutil
import gc
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
import traceback
import asyncio
import aiohttp
from urllib.parse import parse_qs, urlparse
# Import AutoEIS with error handling
try:
import autoeis as ae
except ImportError as e:
print(f"Warning: AutoEIS import issue: {e}")
ae = None
# Memory monitoring
def get_memory_usage():
"""Get current memory usage in MB"""
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024
def check_memory_available():
"""Check if enough memory is available"""
memory_mb = get_memory_usage()
available_mb = psutil.virtual_memory().available / 1024 / 1024
return available_mb > 500 # Need at least 500MB free
# Global variables for workflow integration
workflow_context = {
"workflow_id": None,
"node_id": None,
"callback_url": None,
"auth_token": None
}
# Optimized parameters for HF Spaces
HF_OPTIMIZED_PARAMS = {
"iters": 30, # Increased for better circuit detection
"complexity": 10, # Increased for better circuit detection
"generations": 20, # Increased for better circuit detection
"population_size": 50, # Keep moderate for memory
"tol": 1e-3, # Tighter tolerance for better fits
"parallel": True, # Enable parallel processing
"terminals": "RLP", # R: resistor, L: inductor, P: constant-phase element
"seed": 42 # Random seed for reproducibility
}
def parse_workflow_params(request: gr.Request) -> Dict[str, Any]:
"""Parse workflow parameters from URL or headers"""
params = {}
# Try to get params from URL query string
if request and hasattr(request, 'query_params'):
query_params = dict(request.query_params)
if 'params' in query_params:
try:
encoded_params = query_params['params']
decoded = base64.b64decode(encoded_params)
params = json.loads(decoded)
except Exception as e:
print(f"Error parsing URL params: {e}")
return params
def decode_csv_data(encoded_data: str) -> pd.DataFrame:
"""Decode base64 CSV data to DataFrame"""
try:
csv_bytes = base64.b64decode(encoded_data)
csv_string = csv_bytes.decode('utf-8')
df = pd.read_csv(io.StringIO(csv_string))
return df
except Exception as e:
print(f"Error decoding CSV: {e}")
return None
async def send_callback(results: Dict[str, Any]) -> bool:
"""Send results back to workflow system"""
if not workflow_context["callback_url"]:
return False
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {workflow_context['auth_token']}"
}
payload = {
"workflow_id": workflow_context["workflow_id"],
"node_id": workflow_context["node_id"],
"status": "completed",
"results": results,
"analysis_timestamp": datetime.utcnow().isoformat() + "Z"
}
async with aiohttp.ClientSession() as session:
async with session.post(
workflow_context["callback_url"],
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
return response.status == 200
except Exception as e:
print(f"Callback error: {e}")
return False
def create_sample_data() -> pd.DataFrame:
"""Create sample EIS data for demonstration"""
frequencies = np.logspace(5, -2, 50) # 100kHz to 0.01Hz
# Simple RC circuit simulation
R0 = 100 # Ohms
R1 = 500 # Ohms
C1 = 1e-6 # Farads
omega = 2 * np.pi * frequencies
Z_R0 = R0
Z_RC = R1 / (1 + 1j * omega * R1 * C1)
Z_total = Z_R0 + Z_RC
df = pd.DataFrame({
'frequency': frequencies,
'z_real': Z_total.real,
'z_imag': -Z_total.imag
})
return df
def detect_column_names(df: pd.DataFrame) -> Dict[str, str]:
"""Auto-detect column names for EIS data"""
columns = df.columns.tolist()
mapping = {}
# Frequency column detection
freq_candidates = ['frequency', 'freq', 'f', 'Hz', 'Frequency']
for col in columns:
if any(candidate.lower() in col.lower() for candidate in freq_candidates):
mapping['frequency'] = col
break
# Real impedance column detection
real_candidates = ['z_real', 'real', 'zreal', 'z\'', 'realImpedance', 'real_impedance', 'Re(Z)', 'Re_Z']
for col in columns:
if any(candidate.lower() in col.lower().replace('(', '').replace(')', '') for candidate in real_candidates):
mapping['z_real'] = col
break
# Imaginary impedance column detection
imag_candidates = ['z_imag', 'imag', 'zimag', 'z\'\'', 'imagImpedance', 'imag_impedance', 'Im(Z)', 'Im_Z', '-z\'\'']
for col in columns:
if any(candidate.lower() in col.lower().replace('(', '').replace(')', '') for candidate in imag_candidates):
mapping['z_imag'] = col
break
return mapping
def analyze_eis_optimized(
df: pd.DataFrame,
circuit_model: str = "auto",
algorithm: str = "lm",
use_hf_params: bool = True,
progress_callback=None
) -> Tuple[Dict[str, Any], str, str]:
"""
Analyze EIS data with HF optimization
Returns: (results_dict, nyquist_plot, bode_plot)
"""
if ae is None:
return {"error": "AutoEIS not available"}, None, None
# Check memory before starting
if not check_memory_available():
gc.collect() # Try garbage collection
if not check_memory_available():
return {"error": "Insufficient memory available"}, None, None
try:
# Auto-detect column names
column_mapping = detect_column_names(df)
if 'frequency' not in column_mapping:
return {"error": "Could not find frequency column. Expected names: frequency, freq, f, Hz"}, None, None
if 'z_real' not in column_mapping:
return {"error": "Could not find real impedance column. Expected names: z_real, real, realImpedance, Re(Z)"}, None, None
if 'z_imag' not in column_mapping:
return {"error": "Could not find imaginary impedance column. Expected names: z_imag, imag, imagImpedance, Im(Z)"}, None, None
# Prepare impedance data using detected column names
freq = df[column_mapping['frequency']].values
z_real = df[column_mapping['z_real']].values
z_imag = df[column_mapping['z_imag']].values
# Handle imaginary part sign convention
# EIS convention: Z = Z' - jZ'' (imaginary part should be negative for typical circuits)
# If most imaginary values are positive, we might need to flip the sign
if np.mean(z_imag) > 0:
z_imag = -z_imag # Flip sign to follow EIS convention
if progress_callback:
progress_callback(0.15, "Adjusted imaginary impedance sign...")
Z = z_real + 1j * z_imag
# Use optimized parameters for HF
params = HF_OPTIMIZED_PARAMS.copy() if use_hf_params else {}
if progress_callback:
progress_callback(0.2, "Initializing AutoEIS...")
# Circuit detection with limited complexity
if circuit_model == "auto":
if progress_callback:
progress_callback(0.4, "Detecting circuit model...")
# Use simpler approach for HF - corrected parameter order and names
circuits_df = ae.core.generate_equivalent_circuits(
freq, # Frequency array (correct order)
Z, # Impedance array (correct order)
iters=params.get("iters", 20),
complexity=params.get("complexity", 8),
generations=params.get("generations", 15),
population_size=params.get("population_size", 50),
tol=params.get("tol", 1e-2),
parallel=params.get("parallel", True),
terminals=params.get("terminals", "RLP"),
seed=params.get("seed", 42)
)
if circuits_df is not None and len(circuits_df) > 0:
# Extract the best circuit string from the DataFrame
circuit_str = circuits_df.iloc[0]['circuitstring'] # Take the best circuit
else:
circuit_str = "R0-[R1,C1]" # Fallback simple circuit
else:
circuit_str = circuit_model
if progress_callback:
progress_callback(0.6, "Fitting circuit parameters...")
# Fit the circuit
circuit = ae.core.get_parameterized_circuit(circuit_str)
fitted_params = ae.core.fit_parameters(circuit, freq, Z)
if progress_callback:
progress_callback(0.8, "Generating plots...")
# Generate plots
import matplotlib
matplotlib.use('Agg') # Non-interactive backend
import matplotlib.pyplot as plt
# Nyquist plot
fig_nyquist, ax_nyquist = plt.subplots(figsize=(8, 6))
ax_nyquist.plot(Z.real, Z.imag, 'bo', label='Data', markersize=6)
# Add fitted curve if available
if fitted_params:
Z_fit = circuit(freq, **fitted_params)
ax_nyquist.plot(Z_fit.real, Z_fit.imag, 'r-', label='Fit', linewidth=2)
ax_nyquist.set_xlabel('Z\' (Ω)')
ax_nyquist.set_ylabel('-Z\'\' (Ω)')
ax_nyquist.set_title('Nyquist Plot')
ax_nyquist.legend()
ax_nyquist.grid(True, alpha=0.3)
ax_nyquist.set_aspect('equal')
# Bode plot
fig_bode, (ax_mag, ax_phase) = plt.subplots(2, 1, figsize=(8, 8))
Z_mag = np.abs(Z)
Z_phase = np.angle(Z, deg=True)
ax_mag.loglog(freq, Z_mag, 'bo', label='Data', markersize=6)
ax_mag.set_ylabel('|Z| (Ω)')
ax_mag.set_title('Bode Plot - Magnitude')
ax_mag.grid(True, which="both", alpha=0.3)
ax_mag.legend()
ax_phase.semilogx(freq, Z_phase, 'bo', label='Data', markersize=6)
if fitted_params:
Z_fit = circuit(freq, **fitted_params)
ax_mag.loglog(freq, np.abs(Z_fit), 'r-', label='Fit', linewidth=2)
ax_phase.semilogx(freq, np.angle(Z_fit, deg=True), 'r-', label='Fit', linewidth=2)
ax_phase.set_xlabel('Frequency (Hz)')
ax_phase.set_ylabel('Phase (°)')
ax_phase.set_title('Bode Plot - Phase')
ax_phase.grid(True, alpha=0.3)
ax_phase.legend()
plt.tight_layout()
# Calculate fit quality
if fitted_params:
Z_fit = circuit(freq, **fitted_params)
residuals = Z - Z_fit
chi_squared = np.sum(np.abs(residuals)**2) / len(Z)
fit_error = np.sqrt(chi_squared)
else:
chi_squared = None
fit_error = None
# Prepare results
results = {
"circuit_model": circuit_str,
"fit_parameters": fitted_params if fitted_params else {},
"fit_error": float(fit_error) if fit_error else None,
"chi_squared": float(chi_squared) if chi_squared else None,
"memory_usage_mb": get_memory_usage(),
"column_mapping": column_mapping,
"data_points": len(freq)
}
if progress_callback:
progress_callback(1.0, "Analysis complete!")
# Clean up memory
gc.collect()
return results, fig_nyquist, fig_bode
except Exception as e:
error_msg = f"Analysis error: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return {"error": error_msg}, None, None
finally:
# Always try to free memory
gc.collect()
def process_analysis(
data_file,
circuit_model,
algorithm,
use_optimization,
progress=gr.Progress()
):
"""Main analysis function for Gradio interface"""
progress(0.1, "Starting analysis...")
# Load data
if data_file is None:
progress(0.2, "Using sample data...")
df = create_sample_data()
else:
try:
df = pd.read_csv(data_file.name)
except Exception as e:
return {"error": f"Failed to read CSV: {e}"}, None, None
# Run analysis
results, nyquist_plot, bode_plot = analyze_eis_optimized(
df,
circuit_model=circuit_model,
algorithm=algorithm,
use_hf_params=use_optimization,
progress_callback=progress
)
return results, nyquist_plot, bode_plot
async def send_to_workflow(results):
"""Send results back to workflow"""
if workflow_context["callback_url"]:
success = await send_callback(results)
return "✅ Results sent to workflow!" if success else "❌ Failed to send results"
return "No workflow callback URL configured"
# Create Gradio interface
def create_interface():
with gr.Blocks(title="AutoEIS Analyzer", theme=gr.themes.Soft()) as app:
# Header
gr.Markdown("""
# 🔬 AutoEIS Analysis Tool
### Automated Electrochemical Impedance Spectroscopy Analysis
Optimized for Hugging Face Spaces with workflow integration support.
""")
# Memory monitor
with gr.Row():
memory_display = gr.Textbox(
label="Memory Usage",
value=f"{get_memory_usage():.1f} MB",
interactive=False,
scale=1
)
workflow_info = gr.Textbox(
label="Workflow Context",
value="No workflow connected",
interactive=False,
scale=3
)
with gr.Tabs():
# Data Input Tab
with gr.Tab("📊 Data Input"):
with gr.Row():
data_file = gr.File(
label="Upload EIS Data (CSV)",
file_types=[".csv"],
value=None
)
with gr.Row():
gr.Markdown("""
**Supported CSV Formats (auto-detected):**
**Frequency column**: `frequency`, `freq`, `f`, `Hz`
**Real impedance**: `z_real`, `real`, `realImpedance`, `Re(Z)`
**Imaginary impedance**: `z_imag`, `imag`, `imagImpedance`, `Im(Z)`
✅ **Your file format is automatically detected!**
Leave empty to use sample data.
""")
data_preview = gr.DataFrame(
label="Data Preview (first 10 rows)",
interactive=False
)
# Parameters Tab
with gr.Tab("⚙️ Parameters"):
with gr.Row():
circuit_model = gr.Dropdown(
choices=["auto", "R0-[R1,C1]", "R0-[R1,P1]", "R0-[R1,C1]-[R2,C2]"],
value="auto",
label="Circuit Model",
info="Select 'auto' for automatic detection"
)
algorithm = gr.Radio(
choices=["lm", "trf", "dogbox"],
value="lm",
label="Fitting Algorithm",
info="Levenberg-Marquardt (lm) is usually best"
)
with gr.Row():
use_optimization = gr.Checkbox(
value=True,
label="Use HF-optimized parameters",
info="Recommended for Hugging Face Spaces (faster, less memory)"
)
with gr.Row():
gr.Markdown("""
**HF-Optimized Settings (when enabled):**
- Circuit iterations: 30 (balanced performance/accuracy)
- Complexity limit: 10 (prevents overfitting)
- Population size: 50 (memory efficient)
- Tolerance: 1e-3 (good fit quality)
- Components: R (resistors), L (inductors), P (CPE)
""")
# Results Tab
with gr.Tab("📈 Results"):
with gr.Row():
results_json = gr.JSON(
label="Analysis Results",
value=None
)
with gr.Row():
nyquist_plot = gr.Plot(
label="Nyquist Plot",
show_label=True
)
bode_plot = gr.Plot(
label="Bode Plot",
show_label=True
)
with gr.Row():
workflow_btn = gr.Button(
"📤 Send to Workflow",
variant="secondary",
visible=False
)
workflow_status = gr.Textbox(
label="Workflow Status",
interactive=False,
visible=False
)
# Action buttons
with gr.Row():
analyze_btn = gr.Button(
"🚀 Run Analysis",
variant="primary",
size="lg"
)
clear_btn = gr.Button(
"🔄 Clear",
variant="secondary"
)
# Event handlers
def update_preview(file):
if file is None:
df = create_sample_data()
return df.head(10), f"Memory: {get_memory_usage():.1f} MB"
try:
df = pd.read_csv(file.name)
# Detect column mapping
mapping = detect_column_names(df)
missing = []
if 'frequency' not in mapping:
missing.append("frequency")
if 'z_real' not in mapping:
missing.append("real impedance")
if 'z_imag' not in mapping:
missing.append("imaginary impedance")
if missing:
status = f"Memory: {get_memory_usage():.1f} MB | ⚠️ Missing: {', '.join(missing)}"
else:
status = f"Memory: {get_memory_usage():.1f} MB | ✅ All columns detected"
return df.head(10), status
except Exception as e:
return None, f"Memory: {get_memory_usage():.1f} MB | ❌ Error: {str(e)}"
def clear_all():
gc.collect()
return (
None, # data_file
None, # data_preview
"auto", # circuit_model
"lm", # algorithm
True, # use_optimization
None, # results_json
None, # nyquist_plot
None, # bode_plot
f"Memory: {get_memory_usage():.1f} MB" # memory_display
)
# Wire up events
data_file.change(
fn=update_preview,
inputs=[data_file],
outputs=[data_preview, memory_display]
)
analyze_btn.click(
fn=process_analysis,
inputs=[
data_file,
circuit_model,
algorithm,
use_optimization
],
outputs=[
results_json,
nyquist_plot,
bode_plot
]
)
clear_btn.click(
fn=clear_all,
outputs=[
data_file,
data_preview,
circuit_model,
algorithm,
use_optimization,
results_json,
nyquist_plot,
bode_plot,
memory_display
]
)
workflow_btn.click(
fn=lambda r: asyncio.run(send_to_workflow(r)),
inputs=[results_json],
outputs=[workflow_status]
)
# Load workflow params on startup
def on_load(request: gr.Request):
params = parse_workflow_params(request)
if params:
workflow_context.update({
"workflow_id": params.get("workflow_id"),
"node_id": params.get("node_id"),
"callback_url": params.get("callback_url"),
"auth_token": params.get("auth_token")
})
if params.get("input_data", {}).get("csv_data"):
# Decode and process CSV data
df = decode_csv_data(params["input_data"]["csv_data"])
if df is not None:
return f"Workflow: {params.get('workflow_id', 'Unknown')}", True, True
return f"Workflow: {params.get('workflow_id', 'Unknown')}", True, False
return "No workflow connected", False, False
app.load(
fn=on_load,
outputs=[workflow_info, workflow_btn, workflow_status]
)
return app
# Launch the app
if __name__ == "__main__":
app = create_interface()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)