"""
AutoEIS Hugging Face Space Application

Optimized for limited resources with workflow integration.
"""
import gradio as gr
import pandas as pd
import numpy as np
import base64
import json
import requests
import io
import os
import psutil
import gc
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
import traceback
import asyncio
import aiohttp
from urllib.parse import parse_qs, urlparse

# Import AutoEIS with error handling
try:
    import autoeis as ae
except ImportError as e:
    print(f"Warning: AutoEIS import issue: {e}")
    ae = None


# Memory monitoring
def get_memory_usage():
    """Get current memory usage in MB"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024


def check_memory_available():
    """Check if enough memory is available"""
    available_mb = psutil.virtual_memory().available / 1024 / 1024
    return available_mb > 500  # Need at least 500 MB free


# Global variables for workflow integration
workflow_context = {
    "workflow_id": None,
    "node_id": None,
    "callback_url": None,
    "auth_token": None,
}

# Optimized parameters for HF Spaces
HF_OPTIMIZED_PARAMS = {
    "iters": 30,            # Increased for better circuit detection
    "complexity": 10,       # Increased for better circuit detection
    "generations": 20,      # Increased for better circuit detection
    "population_size": 50,  # Keep moderate for memory
    "tol": 1e-3,            # Tighter tolerance for better fits
    "parallel": True,       # Enable parallel processing
    "terminals": "RLP",     # R: resistor, L: inductor, P: constant-phase element
    "seed": 42,             # Random seed for reproducibility
}


def parse_workflow_params(request: gr.Request) -> Dict[str, Any]:
    """Parse workflow parameters from the URL query string"""
    params = {}
    # Try to get params from URL query string
    if request and hasattr(request, 'query_params'):
        query_params = dict(request.query_params)
        if 'params' in query_params:
            try:
                encoded_params = query_params['params']
                decoded = base64.b64decode(encoded_params)
                params = json.loads(decoded)
            except Exception as e:
                print(f"Error parsing URL params: {e}")
    return params


def decode_csv_data(encoded_data: str) -> Optional[pd.DataFrame]:
    """Decode base64 CSV data to a DataFrame (None on failure)"""
    try:
        csv_bytes = base64.b64decode(encoded_data)
        csv_string = csv_bytes.decode('utf-8')
        df = pd.read_csv(io.StringIO(csv_string))
        return df
    except Exception as e:
        print(f"Error decoding CSV: {e}")
        return None


async def send_callback(results: Dict[str, Any]) -> bool:
    """Send results back to workflow system"""
    if not workflow_context["callback_url"]:
        return False
    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {workflow_context['auth_token']}",
        }
        payload = {
            "workflow_id": workflow_context["workflow_id"],
            "node_id": workflow_context["node_id"],
            "status": "completed",
            "results": results,
            "analysis_timestamp": datetime.utcnow().isoformat() + "Z",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                workflow_context["callback_url"],
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                return response.status == 200
    except Exception as e:
        print(f"Callback error: {e}")
        return False
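
# --- Illustrative sketch (not used by the app) -------------------------------
# parse_workflow_params() expects the launching workflow to pass a single
# `params` query parameter containing base64-encoded JSON. A caller could
# build such a launch URL roughly as follows; the payload keys mirror the
# ones consumed here and in on_load() below (workflow_id, node_id,
# callback_url, auth_token, input_data.csv_data).
def _example_build_launch_url(space_url: str, payload: Dict[str, Any]) -> str:
    """Encode `payload` into the ?params=<base64 JSON> query string this app parses."""
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return f"{space_url}?params={encoded}"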

def create_sample_data() -> pd.DataFrame:
    """Create sample EIS data for demonstration"""
    frequencies = np.logspace(5, -2, 50)  # 100 kHz to 0.01 Hz
    # Simple RC circuit simulation: R0 in series with a parallel R1 || C1 element
    R0 = 100    # Ohms
    R1 = 500    # Ohms
    C1 = 1e-6   # Farads
    omega = 2 * np.pi * frequencies
    Z_R0 = R0
    Z_RC = R1 / (1 + 1j * omega * R1 * C1)
    Z_total = Z_R0 + Z_RC
    df = pd.DataFrame({
        'frequency': frequencies,
        'z_real': Z_total.real,
        'z_imag': -Z_total.imag,
    })
    return df


def detect_column_names(df: pd.DataFrame) -> Dict[str, str]:
    """Auto-detect column names for EIS data"""
    columns = df.columns.tolist()
    mapping = {}

    def normalize(name: str) -> str:
        # Compare case-insensitively and ignore parentheses so that candidates
        # like 'Re(Z)' can also match headers such as 'ReZ' or 'Re(Z)/ohm'.
        return name.lower().replace('(', '').replace(')', '')

    # Frequency column detection
    freq_candidates = ['frequency', 'freq', 'f', 'Hz']
    for col in columns:
        if any(candidate.lower() in col.lower() for candidate in freq_candidates):
            mapping['frequency'] = col
            break

    # Real impedance column detection
    real_candidates = ['z_real', 'real', 'zreal', 'z\'', 'realImpedance',
                       'real_impedance', 'Re(Z)', 'Re_Z']
    for col in columns:
        if any(normalize(candidate) in normalize(col) for candidate in real_candidates):
            mapping['z_real'] = col
            break

    # Imaginary impedance column detection
    imag_candidates = ['z_imag', 'imag', 'zimag', 'z\'\'', 'imagImpedance',
                       'imag_impedance', 'Im(Z)', 'Im_Z', '-z\'\'']
    for col in columns:
        if any(normalize(candidate) in normalize(col) for candidate in imag_candidates):
            mapping['z_imag'] = col
            break

    return mapping
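
# Example (illustrative): for a CSV exported with the headers
#
#     Frequency (Hz), Re(Z)/ohm, Im(Z)/ohm
#
# detect_column_names() returns a mapping along the lines of
#
#     {"frequency": "Frequency (Hz)", "z_real": "Re(Z)/ohm", "z_imag": "Im(Z)/ohm"}
#
# so downstream code can address the data by canonical names regardless of the
# vendor's header spelling.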

def analyze_eis_optimized(
    df: pd.DataFrame,
    circuit_model: str = "auto",
    algorithm: str = "lm",
    use_hf_params: bool = True,
    progress_callback=None,
) -> Tuple[Dict[str, Any], Any, Any]:
    """
    Analyze EIS data with HF-optimized settings.

    Returns:
        (results_dict, nyquist_figure, bode_figure)
    """
    if ae is None:
        return {"error": "AutoEIS not available"}, None, None

    # Check memory before starting
    if not check_memory_available():
        gc.collect()  # Try garbage collection
        if not check_memory_available():
            return {"error": "Insufficient memory available"}, None, None

    try:
        # Auto-detect column names
        column_mapping = detect_column_names(df)
        if 'frequency' not in column_mapping:
            return {"error": "Could not find frequency column. Expected names: frequency, freq, f, Hz"}, None, None
        if 'z_real' not in column_mapping:
            return {"error": "Could not find real impedance column. Expected names: z_real, real, realImpedance, Re(Z)"}, None, None
        if 'z_imag' not in column_mapping:
            return {"error": "Could not find imaginary impedance column. Expected names: z_imag, imag, imagImpedance, Im(Z)"}, None, None

        # Prepare impedance data using detected column names
        freq = df[column_mapping['frequency']].values
        z_real = df[column_mapping['z_real']].values
        z_imag = df[column_mapping['z_imag']].values

        # Handle imaginary part sign convention.
        # EIS convention: Z = Z' - jZ'', so the stored imaginary part should be
        # negative for typical (capacitive) circuits. If most values are
        # positive, the file likely stores -Z'' and the sign is flipped here.
        if np.mean(z_imag) > 0:
            z_imag = -z_imag  # Flip sign to follow EIS convention
            if progress_callback:
                progress_callback(0.15, "Adjusted imaginary impedance sign...")
        Z = z_real + 1j * z_imag

        # Use optimized parameters for HF
        params = HF_OPTIMIZED_PARAMS.copy() if use_hf_params else {}

        if progress_callback:
            progress_callback(0.2, "Initializing AutoEIS...")

        # Circuit detection with limited complexity
        if circuit_model == "auto":
            if progress_callback:
                progress_callback(0.4, "Detecting circuit model...")
            # Use a simpler search for HF - corrected parameter order and names
            circuits_df = ae.core.generate_equivalent_circuits(
                freq,  # Frequency array
                Z,     # Impedance array
                iters=params.get("iters", 20),
                complexity=params.get("complexity", 8),
                generations=params.get("generations", 15),
                population_size=params.get("population_size", 50),
                tol=params.get("tol", 1e-2),
                parallel=params.get("parallel", True),
                terminals=params.get("terminals", "RLP"),
                seed=params.get("seed", 42),
            )
            if circuits_df is not None and len(circuits_df) > 0:
                # Take the best circuit string from the DataFrame
                circuit_str = circuits_df.iloc[0]['circuitstring']
            else:
                circuit_str = "R0-[R1,C1]"  # Fallback simple circuit
        else:
            circuit_str = circuit_model

        if progress_callback:
            progress_callback(0.6, "Fitting circuit parameters...")

        # Fit the circuit
        circuit = ae.core.get_parameterized_circuit(circuit_str)
        fitted_params = ae.core.fit_parameters(circuit, freq, Z)

        if progress_callback:
            progress_callback(0.8, "Generating plots...")

        # Generate plots
        import matplotlib
        matplotlib.use('Agg')  # Non-interactive backend
        import matplotlib.pyplot as plt

        # Evaluate the fitted model once so plots and fit metrics reuse it
        Z_fit = circuit(freq, **fitted_params) if fitted_params else None

        # Nyquist plot (plot -Z'' so the capacitive arc appears above the axis)
        fig_nyquist, ax_nyquist = plt.subplots(figsize=(8, 6))
        ax_nyquist.plot(Z.real, -Z.imag, 'bo', label='Data', markersize=6)
        if Z_fit is not None:
            ax_nyquist.plot(Z_fit.real, -Z_fit.imag, 'r-', label='Fit', linewidth=2)
        ax_nyquist.set_xlabel('Z\' (Ω)')
        ax_nyquist.set_ylabel('-Z\'\' (Ω)')
        ax_nyquist.set_title('Nyquist Plot')
        ax_nyquist.legend()
        ax_nyquist.grid(True, alpha=0.3)
        ax_nyquist.set_aspect('equal')

        # Bode plot
        fig_bode, (ax_mag, ax_phase) = plt.subplots(2, 1, figsize=(8, 8))
        ax_mag.loglog(freq, np.abs(Z), 'bo', label='Data', markersize=6)
        ax_phase.semilogx(freq, np.angle(Z, deg=True), 'bo', label='Data', markersize=6)
        if Z_fit is not None:
            # Plot the fit before adding legends so both panels list it
            ax_mag.loglog(freq, np.abs(Z_fit), 'r-', label='Fit', linewidth=2)
            ax_phase.semilogx(freq, np.angle(Z_fit, deg=True), 'r-', label='Fit', linewidth=2)
        ax_mag.set_ylabel('|Z| (Ω)')
        ax_mag.set_title('Bode Plot - Magnitude')
        ax_mag.grid(True, which="both", alpha=0.3)
        ax_mag.legend()
        ax_phase.set_xlabel('Frequency (Hz)')
        ax_phase.set_ylabel('Phase (°)')
        ax_phase.set_title('Bode Plot - Phase')
        ax_phase.grid(True, alpha=0.3)
        ax_phase.legend()
        plt.tight_layout()

        # Calculate fit quality
        if Z_fit is not None:
            residuals = Z - Z_fit
            chi_squared = np.sum(np.abs(residuals) ** 2) / len(Z)
            fit_error = np.sqrt(chi_squared)
        else:
            chi_squared = None
            fit_error = None

        # Prepare results
        results = {
            "circuit_model": circuit_str,
            "fit_parameters": fitted_params if fitted_params else {},
            "fit_error": float(fit_error) if fit_error is not None else None,
            "chi_squared": float(chi_squared) if chi_squared is not None else None,
            "memory_usage_mb": get_memory_usage(),
            "column_mapping": column_mapping,
            "data_points": len(freq),
        }

        if progress_callback:
            progress_callback(1.0, "Analysis complete!")

        return results, fig_nyquist, fig_bode

    except Exception as e:
        error_msg = f"Analysis error: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return {"error": error_msg}, None, None
    finally:
        # Always try to free memory
        gc.collect()
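
# --- Illustrative sketch (not wired into the UI) ------------------------------
# A quick local smoke test is to run the analysis on the synthetic R0-[R1,C1]
# data above; the expected output is a results dict plus two matplotlib figures
# (or a dict with an "error" key if AutoEIS is unavailable).
def _run_sample_analysis() -> Dict[str, Any]:
    """Run analyze_eis_optimized() on create_sample_data() and return the results dict."""
    sample_df = create_sample_data()
    results, _nyquist_fig, _bode_fig = analyze_eis_optimized(sample_df, circuit_model="auto")
    return results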

def process_analysis(
    data_file,
    circuit_model,
    algorithm,
    use_optimization,
    progress=gr.Progress(),
):
    """Main analysis function for Gradio interface"""
    progress(0.1, "Starting analysis...")

    # Load data
    if data_file is None:
        progress(0.2, "Using sample data...")
        df = create_sample_data()
    else:
        try:
            df = pd.read_csv(data_file.name)
        except Exception as e:
            return {"error": f"Failed to read CSV: {e}"}, None, None

    # Run analysis
    results, nyquist_plot, bode_plot = analyze_eis_optimized(
        df,
        circuit_model=circuit_model,
        algorithm=algorithm,
        use_hf_params=use_optimization,
        progress_callback=progress,
    )
    return results, nyquist_plot, bode_plot


async def send_to_workflow(results):
    """Send results back to workflow"""
    if workflow_context["callback_url"]:
        success = await send_callback(results)
        return "✅ Results sent to workflow!" if success else "❌ Failed to send results"
    return "No workflow callback URL configured"
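
# For reference, send_callback() POSTs a JSON body shaped like the following
# (field values here are purely illustrative):
#
#     {
#       "workflow_id": "wf-123",
#       "node_id": "node-7",
#       "status": "completed",
#       "results": {"circuit_model": "R0-[R1,C1]", "fit_error": 0.42, ...},
#       "analysis_timestamp": "2024-01-01T12:00:00Z"
#     }
#
# The receiving endpoint is expected to respond with HTTP 200 on success.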

# Create Gradio interface
def create_interface():
    with gr.Blocks(title="AutoEIS Analyzer", theme=gr.themes.Soft()) as app:
        # Header
        gr.Markdown("""
        # 🔬 AutoEIS Analysis Tool
        ### Automated Electrochemical Impedance Spectroscopy Analysis

        Optimized for Hugging Face Spaces with workflow integration support.
        """)

        # Memory monitor
        with gr.Row():
            memory_display = gr.Textbox(
                label="Memory Usage",
                value=f"{get_memory_usage():.1f} MB",
                interactive=False,
                scale=1,
            )
            workflow_info = gr.Textbox(
                label="Workflow Context",
                value="No workflow connected",
                interactive=False,
                scale=3,
            )

        with gr.Tabs():
            # Data Input Tab
            with gr.Tab("📊 Data Input"):
                with gr.Row():
                    data_file = gr.File(
                        label="Upload EIS Data (CSV)",
                        file_types=[".csv"],
                        value=None,
                    )
                with gr.Row():
                    gr.Markdown("""
                    **Supported CSV formats (auto-detected):**

                    - **Frequency column**: `frequency`, `freq`, `f`, `Hz`
                    - **Real impedance**: `z_real`, `real`, `realImpedance`, `Re(Z)`
                    - **Imaginary impedance**: `z_imag`, `imag`, `imagImpedance`, `Im(Z)`

                    ✅ **Your file format is automatically detected!**

                    Leave empty to use sample data.
                    """)
                data_preview = gr.DataFrame(
                    label="Data Preview (first 10 rows)",
                    interactive=False,
                )

            # Parameters Tab
            with gr.Tab("⚙️ Parameters"):
                with gr.Row():
                    circuit_model = gr.Dropdown(
                        choices=["auto", "R0-[R1,C1]", "R0-[R1,P1]", "R0-[R1,C1]-[R2,C2]"],
                        value="auto",
                        label="Circuit Model",
                        info="Select 'auto' for automatic detection",
                    )
                    algorithm = gr.Radio(
                        choices=["lm", "trf", "dogbox"],
                        value="lm",
                        label="Fitting Algorithm",
                        info="Levenberg-Marquardt (lm) is usually best",
                    )
                with gr.Row():
                    use_optimization = gr.Checkbox(
                        value=True,
                        label="Use HF-optimized parameters",
                        info="Recommended for Hugging Face Spaces (faster, less memory)",
                    )
                with gr.Row():
                    gr.Markdown("""
                    **HF-optimized settings (when enabled):**

                    - Circuit iterations: 30 (balanced performance/accuracy)
                    - Complexity limit: 10 (prevents overfitting)
                    - Population size: 50 (memory efficient)
                    - Tolerance: 1e-3 (good fit quality)
                    - Components: R (resistors), L (inductors), P (CPE)
                    """)

            # Results Tab
            with gr.Tab("📈 Results"):
                with gr.Row():
                    results_json = gr.JSON(
                        label="Analysis Results",
                        value=None,
                    )
                with gr.Row():
                    nyquist_plot = gr.Plot(
                        label="Nyquist Plot",
                        show_label=True,
                    )
                    bode_plot = gr.Plot(
                        label="Bode Plot",
                        show_label=True,
                    )
                with gr.Row():
                    workflow_btn = gr.Button(
                        "📤 Send to Workflow",
                        variant="secondary",
                        visible=False,
                    )
                    workflow_status = gr.Textbox(
                        label="Workflow Status",
                        interactive=False,
                        visible=False,
                    )

        # Action buttons
        with gr.Row():
            analyze_btn = gr.Button(
                "🚀 Run Analysis",
                variant="primary",
                size="lg",
            )
            clear_btn = gr.Button(
                "🔄 Clear",
                variant="secondary",
            )

        # Event handlers
        def update_preview(file):
            if file is None:
                df = create_sample_data()
                return df.head(10), f"Memory: {get_memory_usage():.1f} MB"
            try:
                df = pd.read_csv(file.name)
                # Detect column mapping
                mapping = detect_column_names(df)
                missing = []
                if 'frequency' not in mapping:
                    missing.append("frequency")
                if 'z_real' not in mapping:
                    missing.append("real impedance")
                if 'z_imag' not in mapping:
                    missing.append("imaginary impedance")
                if missing:
                    status = f"Memory: {get_memory_usage():.1f} MB | ⚠️ Missing: {', '.join(missing)}"
                else:
                    status = f"Memory: {get_memory_usage():.1f} MB | ✅ All columns detected"
                return df.head(10), status
            except Exception as e:
                return None, f"Memory: {get_memory_usage():.1f} MB | ❌ Error: {str(e)}"

        def clear_all():
            gc.collect()
            return (
                None,    # data_file
                None,    # data_preview
                "auto",  # circuit_model
                "lm",    # algorithm
                True,    # use_optimization
                None,    # results_json
                None,    # nyquist_plot
                None,    # bode_plot
                f"Memory: {get_memory_usage():.1f} MB",  # memory_display
            )

        # Wire up events
        data_file.change(
            fn=update_preview,
            inputs=[data_file],
            outputs=[data_preview, memory_display],
        )
        analyze_btn.click(
            fn=process_analysis,
            inputs=[data_file, circuit_model, algorithm, use_optimization],
            outputs=[results_json, nyquist_plot, bode_plot],
        )
        clear_btn.click(
            fn=clear_all,
            outputs=[
                data_file, data_preview, circuit_model, algorithm, use_optimization,
                results_json, nyquist_plot, bode_plot, memory_display,
            ],
        )
        workflow_btn.click(
            fn=lambda r: asyncio.run(send_to_workflow(r)),
            inputs=[results_json],
            outputs=[workflow_status],
        )

        # Load workflow params on startup
        def on_load(request: gr.Request):
            params = parse_workflow_params(request)
            if params:
                workflow_context.update({
                    "workflow_id": params.get("workflow_id"),
                    "node_id": params.get("node_id"),
                    "callback_url": params.get("callback_url"),
                    "auth_token": params.get("auth_token"),
                })
                if params.get("input_data", {}).get("csv_data"):
                    # Decode the CSV payload to confirm it is readable
                    df = decode_csv_data(params["input_data"]["csv_data"])
                    if df is not None:
                        # Use gr.update so component visibility (not value) is toggled
                        return (
                            f"Workflow: {params.get('workflow_id', 'Unknown')}",
                            gr.update(visible=True),
                            gr.update(visible=True),
                        )
                return (
                    f"Workflow: {params.get('workflow_id', 'Unknown')}",
                    gr.update(visible=True),
                    gr.update(visible=False),
                )
            return "No workflow connected", gr.update(visible=False), gr.update(visible=False)

        app.load(
            fn=on_load,
            outputs=[workflow_info, workflow_btn, workflow_status],
        )

    return app
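
# Note: on a resource-limited Space, long-running circuit searches can exceed
# the default HTTP request timeout. One option (not enabled here) is to route
# requests through Gradio's queue before launching, e.g.:
#
#     app = create_interface()
#     app.queue(max_size=4)
#     app.launch(...)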

# Launch the app
if __name__ == "__main__":
    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )