import logging from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple import gradio as gr import pandas as pd import requests from dataclasses import dataclass from tenacity import retry, stop_after_attempt, wait_fixed, wait_exponential import plotly.express as px import plotly.graph_objects as go import os import json import urllib.parse # IMPORTANT: For joining URLs # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("pesticide_app.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class SubstanceDetails: name: str substance_id: int status: Optional[str] = None approval_date: Optional[str] = None expiry_date: Optional[str] = None cas_number: Optional[str] = None ec_number: Optional[str] = None class PesticideDataFetcher: BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides" HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'application/json', 'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7', # Keeping this, but it might not be needed for /active_substances "Content-Type": "application/json", "Cache-Control": "no-cache" } CACHE_DIR = "cache" SUBSTANCE_CACHE_FILE = os.path.join(CACHE_DIR, "substance_cache.json") PRODUCT_CACHE_FILE = os.path.join(CACHE_DIR, "product_cache.json") MRL_CACHE_FILE = os.path.join(CACHE_DIR, "mrl_cache.json") def __init__(self, use_cache: bool = True): self.session = requests.Session() self.session.headers.update(self.HEADERS) self._substance_cache: Dict[int, SubstanceDetails] = {} self._product_cache: Dict[int, Dict[str, Any]] = {} self._mrl_cache: Dict[int, List[Dict[str, Any]]] = {} self.use_cache = use_cache if not os.path.exists(self.CACHE_DIR): os.makedirs(self.CACHE_DIR) if use_cache: self._load_caches() # Preload substances only if cache is empty if not self._substance_cache: self.preload_substance_names() def _load_caches(self): """Loads cached data from files.""" try: if os.path.exists(self.SUBSTANCE_CACHE_FILE): with open(self.SUBSTANCE_CACHE_FILE, 'r', encoding='utf-8') as f: substance_data = json.load(f) self._substance_cache = {int(k): SubstanceDetails(**v) for k, v in substance_data.items()} logger.info(f"Loaded substance cache: {len(self._substance_cache)} substances") if os.path.exists(self.PRODUCT_CACHE_FILE): with open(self.PRODUCT_CACHE_FILE, 'r', encoding='utf-8') as f: self._product_cache = {int(k): v for k, v in json.load(f).items()} logger.info(f"Loaded product cache: {len(self._product_cache)} products") if os.path.exists(self.MRL_CACHE_FILE): with open(self.MRL_CACHE_FILE, 'r', encoding='utf-8') as f: self._mrl_cache = {int(k): v for k, v in json.load(f).items()} logger.info(f"Loaded MRL cache: {len(self._mrl_cache)} entries") except Exception as e: logger.error(f"Error loading caches: {e}") self._substance_cache = {} self._product_cache = {} self._mrl_cache = {} def _save_caches(self): """Saves cached data to files.""" try: with open(self.SUBSTANCE_CACHE_FILE, 'w', encoding='utf-8') as f: substance_data = {str(k): v.__dict__ for k, v in self._substance_cache.items()} json.dump(substance_data, f, ensure_ascii=False, indent=2) with open(self.PRODUCT_CACHE_FILE, 'w', encoding='utf-8') as f: json.dump({str(k): v for k, v in self._product_cache.items()}, f, ensure_ascii=False, indent=2) with open(self.MRL_CACHE_FILE, 'w', encoding='utf-8') as f: json.dump({str(k): v for k, v in self._mrl_cache.items()}, f, ensure_ascii=False, indent=2) logger.info("All caches saved successfully") except Exception as e: logger.error(f"Error saving caches: {e}") @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def fetch_data(self, url: str, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: """Fetches data from the API with retry mechanism and optional parameters.""" try: response = self.session.get(url, params=params, timeout=15) response.raise_for_status() data = response.json() logger.debug(f"API response for {url}: {str(data)[:200]}...") return data except requests.RequestException as e: logger.error(f"Request failed for {url}: {e}") return None # Return None on failure def preload_substance_names(self) -> None: """Preloads substance names from the API.""" logger.info("Preloading substances...") url = f"{self.BASE_URL}/active_substances" # Correct endpoint params = {"format": "json", "api-version": "v2.0"} # Correct parameters, NO LANGUAGE substances_loaded = 0 data = self.fetch_data(url, params=params) if not data or "value" not in data: logger.warning("Initial fetch returned no data or unexpected format. Skipping preloading.") return while url: if "error" in data: logger.error(f"Error preloading substances: {data.get('error', 'Unknown error')}") break for substance in data.get("value", []): substance_id = substance.get("substanceId") # Corrected Key if substance_id and substance_id not in self._substance_cache: self._substance_cache[substance_id] = SubstanceDetails( name=substance.get("substanceName", "Unknown"), # Corrected Key substance_id=substance_id, status=substance.get("status"), approval_date=substance.get("approvalDate"), expiry_date=substance.get("expiryDate"), cas_number=substance.get("casNumber"), ec_number=substance.get("ecNumber") ) substances_loaded += 1 # Removed _load_substance_details next_link = data.get("nextLink") if next_link: # Correctly join the BASE_URL with the relative nextLink url = urllib.parse.urljoin(self.BASE_URL, next_link) params = None # IMPORTANT: Don't send params again! data = self.fetch_data(url) # Fetch the next page of data. NO PARAMS. if not data: break # Exit loop if data fetch fail else: url = None logger.info(f"Substances loaded so far: {substances_loaded}") if self.use_cache: self._save_caches() logger.info(f"Preloading complete. Total substances: {len(self._substance_cache)}") def get_substance_name(self, substance_id: int) -> str: """Retrieves the name of a substance by its ID.""" # First, check the cache if substance_id in self._substance_cache: return self._substance_cache[substance_id].name # If not in cache, fetch from API and use v2.0 url = f"{self.BASE_URL}/active_substances/{substance_id}" # Correct endpoint params = {"format": "json", "api-version": "v2.0"} # Use v2.0 data = self.fetch_data(url, params=params) if not data: return f"Unknown Substance ({substance_id})" # Use correct keys and expect a single object, not a list substance_name = data.get("substanceName", f"Substance {substance_id}") # Update the cache. Since we request details, get ALL details self._substance_cache[substance_id] = SubstanceDetails( name=substance_name, substance_id=substance_id, status=data.get("status"), approval_date=data.get("approvalDate"), expiry_date=data.get("expiryDate"), cas_number=data.get("casNumber"), ec_number=data.get("ecNumber") ) if self.use_cache: self._save_caches() return substance_name def get_product_list(self) -> List[Dict[str, Any]]: """Retrieves the list of all products.""" if self._product_cache: # Check the cache first return list(self._product_cache.values()) logger.info("Retrieving product list...") url = f"{self.BASE_URL}/pesticide_residues_products" # Correct endpoint params = {"format": "json", "language": "FR", "api-version": "v2.0"} # Correct params products_loaded = 0 all_products = [] while url: data = self.fetch_data(url, params=params) if not data: logger.warning("No data returned for product list.") break if "error" in data: logger.error(f"Error retrieving products: {data.get('error', 'No info')}") break # Handle both list and dict responses if isinstance(data, list): products = data elif isinstance(data, dict) and "value" in data: products = data["value"] else: logger.error(f"Unexpected API response format: {data}") break for product in products: product_id = product.get("productId") if product_id: self._product_cache[product_id] = product products_loaded += 1 all_products.append(product) next_link = data.get("nextLink") if isinstance(data, dict) else None #Get nextLink #Corrected nextlink verification if next_link: # Correctly join the BASE_URL with the relative nextLink url = urllib.parse.urljoin(self.BASE_URL, next_link) params = None # IMPORTANT: Don't send params again! The nextLink ALREADY contains them. else: url = None logger.info(f"Products retrieved so far: {products_loaded}") if self.use_cache: self._save_caches() logger.info(f"Total products retrieved: {len(self._product_cache)}") return all_products def get_mrls(self, product_id: int) -> List[Dict[str, Any]]: """Retrieves MRLs for a specific product.""" if product_id in self._mrl_cache: return self._mrl_cache[product_id] logger.info(f"Retrieving MRLs for product {product_id}...") url = f"{self.BASE_URL}/pesticide_residues_products/{product_id}/mrls" #Correct endpoint params = {"format": "json", "language": "FR", "api-version": "v2.0"} # Use v2.0 and language mrls = [] while url: data = self.fetch_data(url, params=params) if not data: break if "error" in data: logger.error(f"Error retrieving MRLs: {data.get('error', 'No info')}") break mrls.extend(data.get("value", [])) next_link = data.get("nextLink") if next_link: url = urllib.parse.urljoin(self.BASE_URL, next_link) params = None else: url = None self._mrl_cache[product_id] = mrls if self.use_cache: self._save_caches() logger.info(f"MRLs retrieved for product {product_id}: {len(mrls)}") return mrls def search_substances(self, query: str) -> List[SubstanceDetails]: """Searches for substances by name.""" query = query.lower() return sorted([ substance for substance in self._substance_cache.values() if query in substance.name.lower() ], key=lambda x: x.name) def get_substance_mrls(self, substance_id: int) -> List[Dict[str, Any]]: """Retrieves all products with MRLs for a given substance.""" logger.info(f"Retrieving MRLs for substance {substance_id}...") url = f"{self.BASE_URL}/pesticide_residues_mrls" # Correct endpoint params = {"format": "json", "api-version": "v2.0", "pesticide_residue_id": substance_id} # Correct params all_mrls = [] while url: data = self.fetch_data(url, params=params) if not data: break if "error" in data: logger.error(f"Error retrieving MRLs: {data.get('error', 'No info')}") break all_mrls.extend(data.get("value", [])) next_link = data.get("nextLink") if next_link: url = urllib.parse.urljoin(self.BASE_URL, next_link) params = None else: url = None logger.info(f"MRLs retrieved for substance {substance_id}: {len(all_mrls)}") return all_mrls def format_date(self, date_str: Optional[str]) -> str: """Formats an ISO date string into a readable format.""" if not date_str: return "N/C" try: dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) return dt.strftime("%d/%m/%Y") except (ValueError, TypeError): return date_str class PesticideApp: def __init__(self, use_cache: bool = True): logger.info("Initializing application...") self.fetcher = PesticideDataFetcher(use_cache=use_cache) logger.info("Retrieving product list...") products = self.fetcher.get_product_list() # Handle empty product list gracefully if products: self.product_list = { p.get('productName', 'Unnamed'): p.get('productId', 0) for p in products } else: self.product_list = {'No products found': 0} # Provide a default logger.warning("No products found. Product dropdown will be empty.") self.substances = sorted([ sd.name for sd in self.fetcher._substance_cache.values() ]) logger.info(f"Application initialized with {len(self.product_list)} products and {len(self.substances)} substances") def format_date(self, date_str: Optional[str]) -> str: """Formats an ISO date string.""" return self.fetcher.format_date(date_str) def get_product_details(self, product_name: str) -> pd.DataFrame: """Retrieves details for a product and returns a DataFrame.""" logger.info(f"Retrieving details for product: {product_name}") product_id = self.product_list.get(product_name) if not product_id: return pd.DataFrame([{"Error": "Product not found"}]) mrls = self.fetcher.get_mrls(product_id) if not mrls: return pd.DataFrame([{"Error": "No MRL data found"}]) data = [] for mrl in mrls: substance_id = mrl.get("pesticideResidueId", 0) substance_name = self.fetcher.get_substance_name(substance_id) substance = self.fetcher._substance_cache.get(substance_id) data.append({ "Substance": substance_name, "Valeur LMR": mrl.get("mrlValue", "N/C"), "Unité": mrl.get("mrlUnit", "mg/kg"), "Date d'effet": self.format_date(mrl.get("entryIntoForceDate")), "Statut": getattr(substance, "status", "N/C"), "CAS": getattr(substance, "cas_number", "N/C"), "EC": getattr(substance, "ec_number", "N/C"), "Date d'approbation": self.format_date(getattr(substance, "approval_date", None)), "Date d'expiration": self.format_date(getattr(substance, "expiry_date", None)) }) df = pd.DataFrame(data) logger.info(f"Details retrieved for {product_name}: {len(df)} entries") return df def search_substance(self, substance_query: str) -> pd.DataFrame: """Searches for substances by name.""" logger.info(f"Searching for substances: {substance_query}") if not substance_query or len(substance_query) < 3: return pd.DataFrame([{"Message": "Please enter at least 3 characters for the search"}]) results = self.fetcher.search_substances(substance_query) if not results: return pd.DataFrame([{"Message": "No substances found"}]) data = [{ "ID": substance.substance_id, "Nom": substance.name, "Statut": substance.status or "N/C", "N° CAS": substance.cas_number or "N/C", "N° EC": substance.ec_number or "N/C", "Date d'approbation": self.format_date(substance.approval_date), "Date d'expiration": self.format_date(substance.expiry_date) } for substance in results] df = pd.DataFrame(data) logger.info(f"Search results for '{substance_query}': {len(df)} substances found") return df def get_substance_mrls(self, substance_id: int) -> pd.DataFrame: """Retrieves all products with MRLs for a given substance and returns a DataFrame.""" logger.info(f"Retrieving MRLs for substance ID: {substance_id}") if not substance_id: return pd.DataFrame([{"Error": "Invalid substance ID"}]) substance = self.fetcher._substance_cache.get(substance_id) if not substance: return pd.DataFrame([{"Error": "Substance not found"}]) all_mrls = self.fetcher.get_substance_mrls(substance_id) if not all_mrls: return pd.DataFrame([{"Message": f"No MRLs found for {substance.name}"}]) data = [] for mrl in all_mrls: product_id = mrl.get("productId") product_name = "Unknown" if product_id in self.fetcher._product_cache: product_name = self.fetcher._product_cache[product_id].get("productName", "Unknown") data.append({ "Produit": product_name, "Valeur LMR": mrl.get("mrlValue", "N/C"), "Unité": mrl.get("mrlUnit", "mg/kg"), "Date d'effet": self.format_date(mrl.get("entryIntoForceDate")), "Notes": mrl.get("footnotes", "") }) df = pd.DataFrame(data) logger.info(f"MRLs retrieved for {substance.name}: {len(df)} entries") return df def create_histogram(self, df: pd.DataFrame) -> go.Figure: """Creates a histogram of MRL values.""" if "Valeur LMR" not in df.columns or df.empty: return go.Figure(data=[], layout=go.Layout(title="No data available for histogram")) numeric_values = [] for val in df["Valeur LMR"]: try: if isinstance(val, str): if val not in ("*", "LOQ", "N/C"): numeric_values.append(float(val.replace(",", "."))) else: numeric_values.append(float(val)) except (ValueError, TypeError): continue if not numeric_values: return go.Figure(data=[], layout=go.Layout(title="No numeric values available for histogram")) fig = go.Figure(data=[go.Histogram(x=numeric_values, nbinsx=20)]) fig.update_layout( title="Distribution of MRL Values", xaxis_title="MRL Value (mg/kg)", yaxis_title="Frequency", bargap=0.1 ) return fig def create_pie_chart(self, df: pd.DataFrame, column: str) -> go.Figure: """Creates a pie chart for a given column.""" if column not in df.columns or df.empty: return go.Figure(data=[], layout=go.Layout(title=f"No data available for {column}")) value_counts = df[column].value_counts() fig = go.Figure(data=[go.Pie(labels=value_counts.index, values=value_counts.values)]) fig.update_layout(title=f"Distribution by {column}") return fig def create_ui(self) -> gr.Blocks: """Creates the user interface with Gradio.""" with gr.Blocks(theme=gr.themes.Soft()) as ui: gr.HTML("""
View Maximum Residue Limits (MRLs) of pesticides in food products.
This application allows you to view the Maximum Residue Limits (MRLs) of pesticides allowed in food products in the European Union.
Data is sourced from the official European Commission API: