# Hugging Face Spaces page status when this file was captured: Sleeping
"""
AutoEIS Hugging Face Space Application
Optimized for limited resources with workflow integration
"""
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import base64 | |
import json | |
import requests | |
import io | |
import os | |
import psutil | |
import gc | |
from typing import Dict, Any, Optional, Tuple | |
from datetime import datetime | |
import traceback | |
import asyncio | |
import aiohttp | |
from urllib.parse import parse_qs, urlparse | |
# Import AutoEIS with error handling | |
try: | |
import autoeis as ae | |
except ImportError as e: | |
print(f"Warning: AutoEIS import issue: {e}") | |
ae = None | |
# Memory monitoring
def get_memory_usage():
    """Return the resident set size (RSS) of the current process in MB."""
    current_process = psutil.Process(os.getpid())
    rss_bytes = current_process.memory_info().rss
    # bytes -> KiB -> MiB
    return rss_bytes / 1024 / 1024
def check_memory_available(min_free_mb: float = 500) -> bool:
    """Report whether the system has enough free memory to start an analysis.

    Args:
        min_free_mb: Minimum amount of available system memory, in MB, that
            must be exceeded. Defaults to 500, the threshold previously
            hard-coded here.

    Returns:
        True when the available system memory reported by
        ``psutil.virtual_memory()`` is strictly greater than ``min_free_mb``.
    """
    # Only system-wide availability matters for this check; the process RSS
    # that used to be fetched here was never used.
    available_mb = psutil.virtual_memory().available / 1024 / 1024
    return available_mb > min_free_mb
# Global variables for workflow integration.
# Every field starts as None and is filled in from the decoded request
# parameters when the page loads.
# NOTE(review): this dict is module-level state, so it appears to be shared by
# every browser session served by this process — confirm that is intended,
# since it carries an auth token.
workflow_context = dict.fromkeys(
    ("workflow_id", "node_id", "callback_url", "auth_token")
)
# Optimized parameters for HF Spaces.
# These are forwarded as keyword arguments to the AutoEIS circuit search when
# the "HF-optimized" option is enabled.
HF_OPTIMIZED_PARAMS = dict(
    iters=30,            # Increased for better circuit detection
    complexity=10,       # Increased for better circuit detection
    generations=20,      # Increased for better circuit detection
    population_size=50,  # Keep moderate for memory
    tol=1e-3,            # Tighter tolerance for better fits
    parallel=True,       # Enable parallel processing
    terminals="RLP",     # R: resistor, L: inductor, P: constant-phase element
    seed=42,             # Random seed for reproducibility
)
def parse_workflow_params(request: gr.Request) -> Dict[str, Any]:
    """Parse workflow parameters from the request's URL query string.

    The ``params`` query parameter is expected to hold a base64-encoded JSON
    object. Both the standard and the URL-safe base64 alphabets are accepted,
    and missing ``=`` padding is tolerated, because query strings frequently
    carry the URL-safe, unpadded form.

    Args:
        request: The incoming request. Anything falsy or lacking a
            ``query_params`` attribute yields an empty result.

    Returns:
        The decoded parameters as a dict, or ``{}`` when the parameter is
        absent, cannot be decoded, or does not decode to a JSON object.
    """
    if not request or not hasattr(request, 'query_params'):
        return {}

    query_params = dict(request.query_params)
    if 'params' not in query_params:
        return {}

    try:
        encoded_params = query_params['params']
        # Restore padding that URL transports often strip.
        padded = encoded_params + '=' * (-len(encoded_params) % 4)
        # urlsafe_b64decode also accepts the standard '+' and '/' characters.
        decoded = base64.urlsafe_b64decode(padded)
        params = json.loads(decoded)
    except (ValueError, TypeError) as e:
        # binascii.Error, JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses; TypeError covers a non-string parameter.
        print(f"Error parsing URL params: {e}")
        return {}

    if not isinstance(params, dict):
        # Callers use .get() on the result, so reject lists/strings/numbers.
        print(f"Error parsing URL params: expected a JSON object, got {type(params).__name__}")
        return {}
    return params
def decode_csv_data(encoded_data: str) -> Optional[pd.DataFrame]:
    """Decode base64-encoded CSV text into a DataFrame.

    Args:
        encoded_data: Base64 string whose decoded bytes are UTF-8 CSV text.

    Returns:
        The parsed DataFrame, or ``None`` when the input cannot be decoded or
        parsed (the error is printed, not raised).
    """
    try:
        csv_bytes = base64.b64decode(encoded_data)
        # 'utf-8-sig' behaves like 'utf-8' but drops a leading byte-order
        # mark so it cannot end up inside the first column name.
        csv_string = csv_bytes.decode('utf-8-sig')
        return pd.read_csv(io.StringIO(csv_string))
    except Exception as e:
        # Deliberately best-effort: the payload comes from an external
        # workflow and any failure is reported as "no data".
        print(f"Error decoding CSV: {e}")
        return None
async def send_callback(results: Dict[str, Any]) -> bool:
    """Send analysis results back to the workflow system.

    POSTs a JSON payload to ``workflow_context["callback_url"]`` with a
    30-second total timeout.

    Args:
        results: The analysis results to embed in the payload.

    Returns:
        True when the callback endpoint answers with any 2xx status; False
        when no callback URL is configured, the endpoint answers with another
        status, or the request fails (the error is printed, not raised).
    """
    # Local import: the file only imports `datetime` from this module.
    from datetime import timezone

    callback_url = workflow_context["callback_url"]
    if not callback_url:
        return False

    try:
        headers = {"Content-Type": "application/json"}
        auth_token = workflow_context["auth_token"]
        if auth_token:
            # Skip the header entirely rather than sending "Bearer None".
            headers["Authorization"] = f"Bearer {auth_token}"

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the emitted text keeps the same "...Z" form.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        payload = {
            "workflow_id": workflow_context["workflow_id"],
            "node_id": workflow_context["node_id"],
            "status": "completed",
            "results": results,
            "analysis_timestamp": timestamp
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                callback_url,
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                # Endpoints commonly acknowledge with 201/202/204 as well.
                return 200 <= response.status < 300
    except Exception as e:
        print(f"Callback error: {e}")
        return False
def create_sample_data() -> pd.DataFrame:
    """Build a synthetic EIS spectrum for demonstration purposes.

    The impedance is that of a resistor in series with a parallel
    resistor-capacitor pair, sampled at 50 log-spaced frequencies.

    Returns:
        DataFrame with columns ``frequency``, ``z_real`` and ``z_imag``, where
        ``z_imag`` holds the negated imaginary part of the impedance.
    """
    # Component values of the simulated circuit
    series_resistance = 100    # Ohms
    parallel_resistance = 500  # Ohms
    capacitance = 1e-6         # Farads

    freq_hz = np.logspace(5, -2, 50)  # 100kHz to 0.01Hz
    angular_freq = 2 * np.pi * freq_hz

    # Parallel RC branch, then add the series resistor
    rc_branch = parallel_resistance / (
        1 + 1j * angular_freq * parallel_resistance * capacitance
    )
    impedance = series_resistance + rc_branch

    return pd.DataFrame({
        'frequency': freq_hz,
        'z_real': impedance.real,
        'z_imag': -impedance.imag
    })
# Accepted spellings for each EIS column role (compared after normalisation).
_COLUMN_CANDIDATES = {
    'frequency': ('frequency', 'freq', 'f', 'Hz', 'Frequency'),
    'z_real': ('z_real', 'real', 'zreal', 'z\'', 'realImpedance',
               'real_impedance', 'Re(Z)', 'Re_Z'),
    'z_imag': ('z_imag', 'imag', 'zimag', 'z\'\'', 'imagImpedance',
               'imag_impedance', 'Im(Z)', 'Im_Z', '-z\'\''),
}

# Order used for the substring pass. The imaginary role goes before the real
# one because the real spelling z' is a prefix of the imaginary spelling z''.
_SUBSTRING_PASS_ORDER = ('frequency', 'z_imag', 'z_real')


def _normalize_column_name(name) -> str:
    """Lower-case a column label and drop brackets, spaces and underscores."""
    return ''.join(ch for ch in str(name).lower() if ch not in '()[] _')


def detect_column_names(df: pd.DataFrame) -> Dict[str, str]:
    """Auto-detect which columns hold frequency, real and imaginary impedance.

    Matching is done on normalised names (lower-cased, with brackets, spaces
    and underscores removed) in two passes:

    1. exact match against a known spelling, so ``Re(Z)``, ``f`` or ``Z''``
       are recognised as whole names;
    2. substring match for spellings of two or more characters, so labels
       with units such as ``Freq (Hz)`` or ``Z' (Ohm)`` are recognised.

    A column is assigned to at most one role, and the single-letter spelling
    ``f`` only matches as a whole name (as a substring it would match almost
    any label).

    Args:
        df: DataFrame whose column labels are inspected.

    Returns:
        Dict mapping the role keys ``frequency``, ``z_real`` and ``z_imag`` to
        the matching column label. Roles with no match are omitted.
    """
    labelled = [(col, _normalize_column_name(col)) for col in df.columns.tolist()]
    spellings = {
        role: tuple(dict.fromkeys(_normalize_column_name(c) for c in names))
        for role, names in _COLUMN_CANDIDATES.items()
    }

    found = {}
    claimed = set()  # positions of columns already assigned to a role

    # Pass 1: whole-name matches, for every role.
    for role, names in spellings.items():
        for position, (col, normalized) in enumerate(labelled):
            if position not in claimed and normalized in names:
                found[role] = col
                claimed.add(position)
                break

    # Pass 2: substring matches for the roles that are still unresolved.
    for role in _SUBSTRING_PASS_ORDER:
        if role in found:
            continue
        fragments = [name for name in spellings[role] if len(name) >= 2]
        for position, (col, normalized) in enumerate(labelled):
            if position in claimed:
                continue
            if any(fragment in normalized for fragment in fragments):
                found[role] = col
                claimed.add(position)
                break

    # Emit roles in a stable order regardless of the pass that found them.
    return {role: found[role] for role in _COLUMN_CANDIDATES if role in found}
def _build_nyquist_figure(Z, Z_fit=None):
    """Draw measured (and optionally fitted) impedance as a Nyquist plot."""
    # Figure is created directly instead of through pyplot so it is not
    # registered in pyplot's global figure list and can be garbage-collected.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 6))
    ax = fig.subplots()
    # The y axis is labelled -Z'', so the imaginary part is negated here.
    ax.plot(Z.real, -Z.imag, 'bo', label='Data', markersize=6)
    if Z_fit is not None:
        ax.plot(Z_fit.real, -Z_fit.imag, 'r-', label='Fit', linewidth=2)
    ax.set_xlabel('Z\' (Ω)')
    ax.set_ylabel('-Z\'\' (Ω)')
    ax.set_title('Nyquist Plot')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.set_aspect('equal')
    return fig


def _build_bode_figure(freq, Z, Z_fit=None):
    """Draw magnitude and phase versus frequency as a two-panel Bode plot."""
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 8))
    ax_mag, ax_phase = fig.subplots(2, 1)

    ax_mag.loglog(freq, np.abs(Z), 'bo', label='Data', markersize=6)
    ax_phase.semilogx(freq, np.angle(Z, deg=True), 'bo', label='Data', markersize=6)
    if Z_fit is not None:
        # Fit curves are added before legend() so both legends list them.
        ax_mag.loglog(freq, np.abs(Z_fit), 'r-', label='Fit', linewidth=2)
        ax_phase.semilogx(freq, np.angle(Z_fit, deg=True), 'r-', label='Fit', linewidth=2)

    ax_mag.set_ylabel('|Z| (Ω)')
    ax_mag.set_title('Bode Plot - Magnitude')
    ax_mag.grid(True, which="both", alpha=0.3)
    ax_mag.legend()

    ax_phase.set_xlabel('Frequency (Hz)')
    ax_phase.set_ylabel('Phase (°)')
    ax_phase.set_title('Bode Plot - Phase')
    ax_phase.grid(True, alpha=0.3)
    ax_phase.legend()

    fig.tight_layout()
    return fig


def analyze_eis_optimized(
    df: pd.DataFrame,
    circuit_model: str = "auto",
    algorithm: str = "lm",
    use_hf_params: bool = True,
    progress_callback=None
) -> Tuple[Dict[str, Any], Any, Any]:
    """Analyze EIS data with HF optimization.

    Steps: detect the data columns, build the complex impedance, pick a
    circuit (automatic search or the one given), fit its parameters, draw the
    Nyquist and Bode figures and compute a mean-squared-residual fit metric.

    Args:
        df: Measurement table; its columns are resolved with
            ``detect_column_names``.
        circuit_model: ``"auto"`` to search for a circuit, otherwise a circuit
            string used as-is.
        algorithm: Accepted for interface compatibility; it is currently not
            forwarded to the fitting call.
        use_hf_params: When True the search uses ``HF_OPTIMIZED_PARAMS``;
            otherwise the fallback values written at the call site are used.
        progress_callback: Optional callable invoked as
            ``progress_callback(fraction, message)``.

    Returns:
        ``(results_dict, nyquist_figure, bode_figure)``. On any failure the
        dict holds a single ``"error"`` entry and both figures are ``None``.
    """
    def report(fraction, message):
        if progress_callback:
            progress_callback(fraction, message)

    if ae is None:
        return {"error": "AutoEIS not available"}, None, None

    # Check memory before starting
    if not check_memory_available():
        gc.collect()  # Try garbage collection
        if not check_memory_available():
            return {"error": "Insufficient memory available"}, None, None

    try:
        # Auto-detect column names
        column_mapping = detect_column_names(df)
        if 'frequency' not in column_mapping:
            return {"error": "Could not find frequency column. Expected names: frequency, freq, f, Hz"}, None, None
        if 'z_real' not in column_mapping:
            return {"error": "Could not find real impedance column. Expected names: z_real, real, realImpedance, Re(Z)"}, None, None
        if 'z_imag' not in column_mapping:
            return {"error": "Could not find imaginary impedance column. Expected names: z_imag, imag, imagImpedance, Im(Z)"}, None, None

        # Prepare impedance data using detected column names
        freq = df[column_mapping['frequency']].values
        z_real = df[column_mapping['z_real']].values
        z_imag = df[column_mapping['z_imag']].values
        if len(freq) == 0:
            return {"error": "Input data contains no rows"}, None, None

        # Handle imaginary part sign convention
        # EIS convention: Z = Z' - jZ'' (imaginary part should be negative for typical circuits)
        # If most imaginary values are positive, the column presumably stores -Z''
        if np.mean(z_imag) > 0:
            z_imag = -z_imag  # Flip sign to follow EIS convention
            report(0.15, "Adjusted imaginary impedance sign...")
        Z = z_real + 1j * z_imag

        # Use optimized parameters for HF
        params = HF_OPTIMIZED_PARAMS.copy() if use_hf_params else {}
        report(0.2, "Initializing AutoEIS...")

        # Circuit detection with limited complexity
        if circuit_model == "auto":
            report(0.4, "Detecting circuit model...")
            circuits_df = ae.core.generate_equivalent_circuits(
                freq,
                Z,
                iters=params.get("iters", 20),
                complexity=params.get("complexity", 8),
                generations=params.get("generations", 15),
                population_size=params.get("population_size", 50),
                tol=params.get("tol", 1e-2),
                parallel=params.get("parallel", True),
                terminals=params.get("terminals", "RLP"),
                seed=params.get("seed", 42)
            )
            if circuits_df is not None and len(circuits_df) > 0:
                # First row is taken as the best candidate
                circuit_str = circuits_df.iloc[0]['circuitstring']
            else:
                circuit_str = "R0-[R1,C1]"  # Fallback simple circuit
        else:
            circuit_str = circuit_model

        report(0.6, "Fitting circuit parameters...")
        # NOTE(review): confirm that `get_parameterized_circuit` and
        # `fit_parameters` exist in `ae.core` for the installed AutoEIS
        # version, and that the fitted result is a mapping usable as **kwargs.
        circuit = ae.core.get_parameterized_circuit(circuit_str)
        fitted_params = ae.core.fit_parameters(circuit, freq, Z)

        report(0.8, "Generating plots...")
        # Evaluate the fitted model once and reuse it for plots and metrics.
        Z_fit = circuit(freq, **fitted_params) if fitted_params else None
        fig_nyquist = _build_nyquist_figure(Z, Z_fit)
        fig_bode = _build_bode_figure(freq, Z, Z_fit)

        # Calculate fit quality
        if Z_fit is not None:
            residuals = Z - Z_fit
            chi_squared = float(np.sum(np.abs(residuals) ** 2) / len(Z))
            fit_error = float(np.sqrt(chi_squared))
        else:
            chi_squared = None
            fit_error = None

        # Prepare results. The metrics are tested against None by the branch
        # above, so a perfect fit (0.0) is reported as 0.0 and not dropped.
        results = {
            "circuit_model": circuit_str,
            "fit_parameters": fitted_params if fitted_params else {},
            "fit_error": fit_error,
            "chi_squared": chi_squared,
            "memory_usage_mb": get_memory_usage(),
            "column_mapping": column_mapping,
            "data_points": len(freq)
        }
        report(1.0, "Analysis complete!")
        return results, fig_nyquist, fig_bode
    except Exception as e:
        error_msg = f"Analysis error: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return {"error": error_msg}, None, None
    finally:
        # Always try to free memory
        gc.collect()
def process_analysis(
    data_file,
    circuit_model,
    algorithm,
    use_optimization,
    progress=gr.Progress()
):
    """Main analysis function for the Gradio interface.

    Loads the uploaded CSV (or the built-in sample data when nothing was
    uploaded) and runs ``analyze_eis_optimized`` on it.

    Args:
        data_file: Value of the file-upload component: ``None``, a filesystem
            path, or an object exposing the path as ``.name``.
        circuit_model: Circuit string or ``"auto"``.
        algorithm: Fitting algorithm name, passed through unchanged.
        use_optimization: Whether to use the HF-optimized search parameters.
        progress: Gradio progress tracker, called as ``progress(fraction, text)``.

    Returns:
        ``(results_dict, nyquist_figure, bode_figure)``; on a CSV read failure
        the dict holds an ``"error"`` entry and both figures are ``None``.
    """
    progress(0.1, "Starting analysis...")

    # Load data
    if data_file is None:
        progress(0.2, "Using sample data...")
        df = create_sample_data()
    else:
        # Depending on the Gradio version/configuration the upload arrives as
        # a plain path string or as a file wrapper with a `.name` attribute.
        csv_path = getattr(data_file, "name", data_file)
        try:
            df = pd.read_csv(csv_path)
        except Exception as e:
            return {"error": f"Failed to read CSV: {e}"}, None, None

    # Run analysis
    return analyze_eis_optimized(
        df,
        circuit_model=circuit_model,
        algorithm=algorithm,
        use_hf_params=use_optimization,
        progress_callback=progress
    )
async def send_to_workflow(results):
    """Forward results to the workflow callback and return a status message."""
    # Guard clause: nothing to do without a configured callback target.
    if not workflow_context["callback_url"]:
        return "No workflow callback URL configured"

    delivered = await send_callback(results)
    if delivered:
        return "✅ Results sent to workflow!"
    return "❌ Failed to send results"
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks application and wire up its event handlers.

    Layout: a header, a status row (memory usage and workflow context), three
    tabs (data input, parameters, results) and the action buttons.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(title="AutoEIS Analyzer", theme=gr.themes.Soft()) as app:
        # Header
        gr.Markdown("""
        # 🔬 AutoEIS Analysis Tool
        ### Automated Electrochemical Impedance Spectroscopy Analysis
        Optimized for Hugging Face Spaces with workflow integration support.
        """)

        # Memory monitor
        with gr.Row():
            memory_display = gr.Textbox(
                label="Memory Usage",
                value=f"{get_memory_usage():.1f} MB",
                interactive=False,
                scale=1
            )
            workflow_info = gr.Textbox(
                label="Workflow Context",
                value="No workflow connected",
                interactive=False,
                scale=3
            )

        with gr.Tabs():
            # Data Input Tab
            with gr.Tab("📊 Data Input"):
                with gr.Row():
                    data_file = gr.File(
                        label="Upload EIS Data (CSV)",
                        file_types=[".csv"],
                        value=None
                    )
                with gr.Row():
                    gr.Markdown("""
                    **Supported CSV Formats (auto-detected):**
                    **Frequency column**: `frequency`, `freq`, `f`, `Hz`
                    **Real impedance**: `z_real`, `real`, `realImpedance`, `Re(Z)`
                    **Imaginary impedance**: `z_imag`, `imag`, `imagImpedance`, `Im(Z)`
                    ✅ **Your file format is automatically detected!**
                    Leave empty to use sample data.
                    """)
                data_preview = gr.DataFrame(
                    label="Data Preview (first 10 rows)",
                    interactive=False
                )

            # Parameters Tab
            with gr.Tab("⚙️ Parameters"):
                with gr.Row():
                    circuit_model = gr.Dropdown(
                        choices=["auto", "R0-[R1,C1]", "R0-[R1,P1]", "R0-[R1,C1]-[R2,C2]"],
                        value="auto",
                        label="Circuit Model",
                        info="Select 'auto' for automatic detection"
                    )
                    algorithm = gr.Radio(
                        choices=["lm", "trf", "dogbox"],
                        value="lm",
                        label="Fitting Algorithm",
                        info="Levenberg-Marquardt (lm) is usually best"
                    )
                with gr.Row():
                    use_optimization = gr.Checkbox(
                        value=True,
                        label="Use HF-optimized parameters",
                        info="Recommended for Hugging Face Spaces (faster, less memory)"
                    )
                with gr.Row():
                    gr.Markdown("""
                    **HF-Optimized Settings (when enabled):**
                    - Circuit iterations: 30 (balanced performance/accuracy)
                    - Complexity limit: 10 (prevents overfitting)
                    - Population size: 50 (memory efficient)
                    - Tolerance: 1e-3 (good fit quality)
                    - Components: R (resistors), L (inductors), P (CPE)
                    """)

            # Results Tab
            with gr.Tab("📈 Results"):
                with gr.Row():
                    results_json = gr.JSON(
                        label="Analysis Results",
                        value=None
                    )
                with gr.Row():
                    nyquist_plot = gr.Plot(
                        label="Nyquist Plot",
                        show_label=True
                    )
                    bode_plot = gr.Plot(
                        label="Bode Plot",
                        show_label=True
                    )
                # Workflow controls start hidden; on_load reveals them when
                # the page was opened with workflow parameters.
                with gr.Row():
                    workflow_btn = gr.Button(
                        "📤 Send to Workflow",
                        variant="secondary",
                        visible=False
                    )
                    workflow_status = gr.Textbox(
                        label="Workflow Status",
                        interactive=False,
                        visible=False
                    )

        # Action buttons
        with gr.Row():
            analyze_btn = gr.Button(
                "🚀 Run Analysis",
                variant="primary",
                size="lg"
            )
            clear_btn = gr.Button(
                "🔄 Clear",
                variant="secondary"
            )

        # Event handlers
        def update_preview(file):
            """Return (first 10 rows, status text) for the selected file."""
            if file is None:
                df = create_sample_data()
                return df.head(10), f"Memory: {get_memory_usage():.1f} MB"
            try:
                # The upload may be a path string or an object with `.name`.
                df = pd.read_csv(getattr(file, "name", file))
                # Detect column mapping
                mapping = detect_column_names(df)
                missing = []
                if 'frequency' not in mapping:
                    missing.append("frequency")
                if 'z_real' not in mapping:
                    missing.append("real impedance")
                if 'z_imag' not in mapping:
                    missing.append("imaginary impedance")
                if missing:
                    status = f"Memory: {get_memory_usage():.1f} MB | ⚠️ Missing: {', '.join(missing)}"
                else:
                    status = f"Memory: {get_memory_usage():.1f} MB | ✅ All columns detected"
                return df.head(10), status
            except Exception as e:
                return None, f"Memory: {get_memory_usage():.1f} MB | ❌ Error: {str(e)}"

        def clear_all():
            """Reset every input and output component to its initial value."""
            gc.collect()
            return (
                None,  # data_file
                None,  # data_preview
                "auto",  # circuit_model
                "lm",  # algorithm
                True,  # use_optimization
                None,  # results_json
                None,  # nyquist_plot
                None,  # bode_plot
                f"Memory: {get_memory_usage():.1f} MB"  # memory_display
            )

        # Wire up events
        data_file.change(
            fn=update_preview,
            inputs=[data_file],
            outputs=[data_preview, memory_display]
        )
        analyze_btn.click(
            fn=process_analysis,
            inputs=[
                data_file,
                circuit_model,
                algorithm,
                use_optimization
            ],
            outputs=[
                results_json,
                nyquist_plot,
                bode_plot
            ]
        )
        clear_btn.click(
            fn=clear_all,
            outputs=[
                data_file,
                data_preview,
                circuit_model,
                algorithm,
                use_optimization,
                results_json,
                nyquist_plot,
                bode_plot,
                memory_display
            ]
        )
        workflow_btn.click(
            fn=lambda r: asyncio.run(send_to_workflow(r)),
            inputs=[results_json],
            outputs=[workflow_status]
        )

        # Load workflow params on startup
        def on_load(request: gr.Request):
            """Read workflow parameters from the URL and reveal the workflow UI.

            Returns the workflow label plus visibility updates for the
            "Send to Workflow" button and the workflow status box.
            """
            params = parse_workflow_params(request)
            if not isinstance(params, dict) or not params:
                return (
                    "No workflow connected",
                    gr.update(visible=False),
                    gr.update(visible=False),
                )

            # NOTE(review): workflow_context is module-level, so these values
            # (including the auth token) appear to be shared by all sessions
            # of this process — confirm this is acceptable.
            workflow_context.update({
                "workflow_id": params.get("workflow_id"),
                "node_id": params.get("node_id"),
                "callback_url": params.get("callback_url"),
                "auth_token": params.get("auth_token")
            })

            input_data = params.get("input_data")
            csv_payload = input_data.get("csv_data") if isinstance(input_data, dict) else None
            # The CSV is decoded only to check that it is readable; the
            # resulting frame is not loaded into the UI here.
            has_data = bool(csv_payload) and decode_csv_data(csv_payload) is not None

            # A bare True/False would be written into the components as their
            # value; gr.update(visible=...) is what toggles visibility.
            return (
                f"Workflow: {params.get('workflow_id', 'Unknown')}",
                gr.update(visible=True),
                gr.update(visible=has_data),
            )

        app.load(
            fn=on_load,
            outputs=[workflow_info, workflow_btn, workflow_status]
        )
    return app
# Launch the app
if __name__ == "__main__":
    # Server settings: listen on every interface at port 7860, no public
    # share link, and surface handler errors in the UI.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
    }
    app = create_interface()
    app.launch(**launch_options)