import html
import logging
import tempfile
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from urllib.parse import quote

import gradio as gr
import pandas as pd
import plotly.express as px
import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed


# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and severity in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger used by the classes and functions in this file.
logger = logging.getLogger(__name__)


@dataclass
class PesticideRecord:
    """Data structure for one pesticide record.

    The first five fields are required. The remaining fields default to
    ``None`` when the information is not available.
    """

    # Name of the substance the record refers to.
    substance_name: str
    # Maximum residue level (MRL) value.
    mrl_value: float
    # Entry-into-force date of the MRL, as a string.
    entry_into_force_date: str
    # Regulation identifier and the URL of its text.
    regulation_number: str
    regulation_url: str
    # Optional date of the last modification of the MRL.
    modification_date: Optional[str] = None
    # Optional approval information about the active substance.
    substance_status: Optional[str] = None
    approval_date: Optional[str] = None
    expiry_date: Optional[str] = None


class PesticideDataFetcher:
    """HTTP client for the EU pesticides data API.

    All requests go through one shared ``requests.Session``. The product list,
    the list of active substances and per-substance approval details are
    memoised on the instance so repeated look-ups do not hit the network.
    """

    BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides"
    HEADERS = {
        'Content-Type': 'application/json',
        'Cache-Control': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    def __init__(self):
        self.session = self._create_session()
        # substance name -> {"status", "approval_date", "expiry_date"}
        self._substance_cache = {}
        # Product records from the last successful listing (empty = not loaded).
        self._product_cache = []
        # Sorted names from the last full substance listing; None until loaded.
        # Kept apart from _substance_cache, which single look-ups also fill,
        # so a partially filled cache is never mistaken for the full list.
        self._all_substances = None

    def _create_session(self):
        """Create the HTTP session pre-loaded with the default headers."""
        session = requests.Session()
        session.headers.update(self.HEADERS)
        return session

    @staticmethod
    def _status_fields(record: Dict) -> Dict:
        """Extract the approval-related fields of an active-substance record."""
        return {
            "status": record.get("substance_status"),
            "approval_date": record.get("approval_date"),
            "expiry_date": record.get("expiry_date"),
        }

    # Only connection errors and timeouts are retried: they are the transient
    # failures. HTTP error statuses are reported immediately, so a missing
    # resource does not cost two extra round trips plus the waits.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_fixed(2),
        retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout)),
        reraise=True,
    )
    def _request_json(self, url: str) -> Dict:
        """GET ``url`` and return the decoded JSON body, raising on failure."""
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        return response.json()

    def fetch_data(self, url: str) -> Dict:
        """Fetch JSON from the API with retries and error handling.

        Args:
            url: Fully built request URL.

        Returns:
            The decoded JSON object, or ``{"error": <message>}`` when the
            request failed, the body was not valid JSON, or the payload was
            not a JSON object. Request and decoding failures are logged and
            never propagated to the caller.
        """
        try:
            data = self._request_json(url)
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding errors on older requests releases.
            logger.error(f"Failed to fetch data from {url}: {str(e)}")
            return {"error": str(e)}
        if not isinstance(data, dict):
            # Callers rely on dict methods such as .get().
            logger.error(f"Failed to fetch data from {url}: unexpected payload type")
            return {"error": "unexpected payload type"}
        logger.info(f"Fetched data from {url}: {str(data)[:200]}...")
        return data

    def get_active_substance_details(self, substance_name: str) -> Dict:
        """Return status, approval date and expiry date of an active substance.

        Results are cached per substance name. An empty dict is returned (and
        nothing is cached) when the API has no record for the name.
        """
        cached = self._substance_cache.get(substance_name)
        if cached is not None:
            return cached
        # Percent-encode the name so characters such as '&', '+' or '#'
        # cannot corrupt the query string.
        encoded_name = quote(str(substance_name), safe="")
        url = f"{self.BASE_URL}/active_substances?format=json&substance_name={encoded_name}&api-version=v2.0"
        response = self.fetch_data(url)
        records = response.get("value") if response else None
        if not records:
            return {}
        details = self._status_fields(records[0])
        self._substance_cache[substance_name] = details
        return details

    def get_products(self) -> List[Dict]:
        """Return the complete product list, following pagination links.

        The list is cached once it is non-empty. If a page fails, the pages
        collected so far are returned.
        """
        if self._product_cache:
            return self._product_cache
        all_products = []
        url = f"{self.BASE_URL}/pesticide_residues_products?format=json&language=FR&api-version=v2.0"
        while url:
            response = self.fetch_data(url)
            if not response or "value" not in response:
                break
            all_products.extend(response["value"] or [])
            # A missing next link ends the loop.
            url = response.get("@odata.nextLink")
        self._product_cache = all_products
        logger.info(f"Récupéré {len(all_products)} produits au total")
        return all_products

    def get_mrls(self, product_id: int) -> List[Dict]:
        """Return the MRL records of one product (empty list on failure)."""
        url = f"{self.BASE_URL}/pesticide_residues_mrls?format=json&product_id={product_id}&api-version=v2.0"
        response = self.fetch_data(url)
        return response.get("value", [])

    def get_substance_details(self, pesticide_residue_id: int) -> Dict:
        """Return name and approval details of a residue identified by its ID.

        Returns:
            A dict with ``substance_name`` and, when the name could be
            resolved, ``status``, ``approval_date`` and ``expiry_date``
            (``None`` when unknown). If the residue cannot be resolved the
            dict only holds a placeholder ``substance_name``.
        """
        placeholder = {"substance_name": f"Substance {pesticide_residue_id}"}
        url = f"{self.BASE_URL}/pesticide_residues/{pesticide_residue_id}?format=json&api-version=v2.0"
        response = self.fetch_data(url)
        if not response or "value" not in response or not response["value"]:
            logger.warning(f"Pas de détails trouvés pour la substance {pesticide_residue_id}")
            return placeholder
        substance_name = response["value"][0].get("substance_name")
        if not substance_name:
            logger.warning(f"Nom de substance non trouvé pour l'ID {pesticide_residue_id}")
            return placeholder
        details = {
            "substance_name": substance_name,
            "status": None,
            "approval_date": None,
            "expiry_date": None,
        }
        # Use the cached look-up instead of issuing the same request again.
        details.update(self.get_active_substance_details(substance_name))
        return details

    def get_substance_name_by_id(self, substance_id: int) -> str:
        """Return the name of an active substance, or a placeholder if unknown."""
        fallback = f"Substance {substance_id}"
        url = f"{self.BASE_URL}/active_substances/{substance_id}?format=json&api-version=v2.0"
        response = self.fetch_data(url)
        if response and "value" in response and response["value"]:
            # `or` also covers a record whose name is present but null.
            return response["value"][0].get("substance_name") or fallback
        return fallback

    def get_all_substances(self) -> List[str]:
        """Return the sorted names of all active substances.

        Pagination links are followed. Each record's approval details are
        stored in the per-substance cache as a side effect. The name list is
        cached only when it is non-empty, so a failed first attempt is retried
        on the next call.
        """
        if self._all_substances is not None:
            return list(self._all_substances)
        names = set()
        url = f"{self.BASE_URL}/active_substances?format=json&api-version=v2.0"
        while url:
            response = self.fetch_data(url)
            if not response or "value" not in response:
                break
            for item in response["value"] or []:
                name = item.get("substance_name")
                if name:
                    names.add(name)
                    self._substance_cache[name] = self._status_fields(item)
            url = response.get("@odata.nextLink")
        logger.info(f"Récupéré {len(names)} substances au total")
        ordered = sorted(names)
        if ordered:
            self._all_substances = ordered
        return list(ordered)


class PesticideInterface:
    """Gradio user interface for browsing MRLs and active-substance data."""

    # Span in days for each relative period offered in the UI: negative values
    # look back from today, positive values look ahead.
    _PERIOD_OFFSETS = {
        "Dernière semaine": -7,
        "Dernier mois": -30,
        "Prochains 6 mois": 180,
    }

    # Column order of the MRL result table.
    _MRL_COLUMNS = [
        "Substance", "Valeur LMR", "Date d'application", "Date de modification",
        "Règlement", "Statut", "Date d'approbation", "Date d'expiration"
    ]

    def __init__(self):
        self.fetcher = PesticideDataFetcher()
        self.products = self.fetcher.get_products()
        # Records without a name or an id cannot be offered in the dropdown;
        # skip them instead of failing at start-up.
        self.product_choices = {
            p["product_name"]: p["product_id"]
            for p in self.products
            if p.get("product_name") and p.get("product_id") is not None
        }
        self.substances = self.fetcher.get_all_substances()
        # cache key -> result DataFrame of get_product_details
        self._cache = {}
        logger.info(f"Initialized interface with {len(self.product_choices)} products and {len(self.substances)} substances.")

    def parse_date(self, date_str: str) -> Optional[str]:
        """Normalise a date string to ``YYYY-MM-DD``.

        Returns ``None`` for empty input and for values that match none of the
        supported formats (a warning is logged in the latter case).
        """
        if not date_str:
            return None
        for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"):
            try:
                return datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
            except (ValueError, TypeError):
                # TypeError: the value is not a string at all.
                continue
        logger.warning(f"Impossible de parser la date : {date_str}")
        return None

    @classmethod
    def _period_window(cls, period: str, today: datetime) -> Optional[tuple]:
        """Return the inclusive ``(start, end)`` window of a relative period.

        ``None`` is returned for labels that do not name a relative period.
        """
        offset = cls._PERIOD_OFFSETS.get(period)
        if offset is None:
            return None
        bound = today + timedelta(days=offset)
        return (bound, today) if offset < 0 else (today, bound)

    def filter_by_period(self, data: List[Dict], period: str) -> List[Dict]:
        """Keep the records whose date falls inside the selected period.

        The entry-into-force date is used, falling back to the modification
        date. Past periods span from the look-back date to today; "Prochains
        6 mois" spans from today to today + 180 days. Records without a
        parsable date are dropped. Kept records are returned as copies with an
        extra ``parsed_date`` key; the input dicts are not modified.
        """
        if period == "Toutes les dates":
            return data
        # Compare on whole days: parsed record dates carry no time of day.
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        window = self._period_window(period, today)
        if window is None:
            return data
        start, end = window
        filtered_data = []
        for item in data:
            date_str = item.get("entry_into_force_date") or item.get("modification_date")
            parsed_date = self.parse_date(date_str)
            if parsed_date is None:
                continue
            item_date = datetime.strptime(parsed_date, "%Y-%m-%d")
            if start <= item_date <= end:
                filtered_data.append({**item, "parsed_date": parsed_date})
        logger.info(f"Filtered {len(data)} items to {len(filtered_data)} items for period {period}.")
        return filtered_data

    def format_regulation_link(self, regulation_url: str, regulation_number: str) -> str:
        """Format a regulation reference as a clickable HTML link.

        Both values come from the remote API, so they are HTML-escaped before
        being placed in the markup. Without a URL the regulation number is
        returned unchanged.
        """
        if not regulation_url:
            return regulation_number
        href = html.escape(str(regulation_url), quote=True)
        label = html.escape(str(regulation_number))
        return f'<a href="{href}" target="_blank">{label}</a>'

    def _build_mrl_row(self, mrl: Dict, substance_details: Dict) -> Dict:
        """Build one display row from an MRL record and its substance details."""
        mrl_value = mrl.get("mrl_value", "")
        return {
            "Substance": substance_details.get("substance_name", ""),
            "Valeur LMR": "" if mrl_value is None else str(mrl_value),
            "Date d'application": self.parse_date(mrl.get("entry_into_force_date")),
            "Date de modification": self.parse_date(mrl.get("modification_date")),
            "Règlement": self.format_regulation_link(
                mrl.get("regulation_url", ""),
                mrl.get("regulation_number", "") or mrl.get("regulation_reference", "")
            ),
            "Statut": substance_details.get("status", ""),
            "Date d'approbation": self.parse_date(substance_details.get("approval_date")),
            "Date d'expiration": self.parse_date(substance_details.get("expiry_date"))
        }

    def get_product_details(self, product_name: str, period: str, show_only_changes: bool) -> pd.DataFrame:
        """Fetch and format the MRLs of a product.

        Args:
            product_name: Product label as shown in the dropdown.
            period: One of the period labels offered in the UI.
            show_only_changes: Keep only rows that have a modification date.

        Returns:
            A DataFrame with the MRL columns, sorted by application date
            (newest first), or a single-column ``Message`` frame describing
            why no data is shown. Errors are logged and reported through the
            ``Message`` frame rather than raised.
        """
        logger.info(f"Récupération des détails pour le produit: {product_name}")
        try:
            if not product_name:
                return pd.DataFrame({"Message": ["Veuillez sélectionner un produit"]})
            product_id = self.product_choices.get(product_name)
            if product_id is None:
                return pd.DataFrame({"Message": ["Produit non trouvé"]})
            # Relative periods depend on the current day, so the day is part
            # of the key; otherwise yesterday's window would be served.
            cache_key = f"{product_id}_{period}_{show_only_changes}_{datetime.now():%Y-%m-%d}"
            if cache_key in self._cache:
                return self._cache[cache_key]
            mrls = self.fetcher.get_mrls(product_id)
            if period != "Toutes les dates":
                mrls = self.filter_by_period(mrls, period)
            if not mrls:
                return pd.DataFrame({"Message": ["Aucune donnée trouvée pour la période sélectionnée"]})
            processed_mrls = []
            # Substance look-ups are independent HTTP calls; run them in parallel.
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = {
                    executor.submit(self.fetcher.get_substance_details, mrl["pesticide_residue_id"]): mrl
                    for mrl in mrls
                }
                # Iterating the dict keeps submission order, so the row order
                # before sorting is deterministic.
                for future, mrl in futures.items():
                    try:
                        substance_details = future.result()
                        logger.info(f"Détails de la substance récupérés: {substance_details}")
                        mrl_data = self._build_mrl_row(mrl, substance_details)
                        logger.info(f"Données MRL formatées: {mrl_data}")
                        processed_mrls.append(mrl_data)
                    except Exception as e:
                        # Best effort: one failing substance must not discard
                        # the remaining rows.
                        logger.error(f"Erreur lors du traitement de la substance: {e}")
            if not processed_mrls:
                # Every look-up failed; without this guard the sort below
                # would fail on the missing column.
                return pd.DataFrame({"Message": ["Impossible de récupérer les détails des substances"]})
            df = pd.DataFrame(processed_mrls, columns=self._MRL_COLUMNS)
            if show_only_changes:
                df = df[df["Date de modification"].notna()]
            df = df.sort_values("Date d'application", ascending=False)
            self._cache[cache_key] = df
            return df
        except Exception as e:
            logger.error(f"Erreur dans get_product_details: {str(e)}")
            return pd.DataFrame({"Message": [f"Erreur: {str(e)}"]})

    def create_graph(self, df: pd.DataFrame) -> Optional[object]:
        """Create an interactive scatter plot of the application dates.

        Returns ``None`` when the table is empty or lacks the plotted columns,
        for example when it holds a ``Message`` frame.
        """
        required = {"Substance", "Valeur LMR", "Date d'application"}
        if df is None or df.empty or not required.issubset(df.columns):
            return None
        return px.scatter(
            df,
            x="Date d'application",
            y="Valeur LMR",
            color="Substance",
            title="Dates d'application des LMR",
        )

    def export_data(self, df: pd.DataFrame) -> str:
        """Export the table to a CSV file and return the file path.

        Each call writes its own temporary file, so concurrent exports do not
        overwrite one another.
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", prefix="mrls_data_",
            delete=False, encoding="utf-8", newline="",
        ) as handle:
            df.to_csv(handle, index=False)
        return handle.name

    def get_substance_details_table(self, substance_name: str) -> pd.DataFrame:
        """Fetch and format the approval details of an active substance.

        Returns a one-row DataFrame, or a single-column ``Message`` frame when
        nothing is selected, the substance is unknown, or an error occurred.
        """
        logger.info(f"Récupération des détails pour la substance: {substance_name}")
        try:
            if not substance_name:
                return pd.DataFrame({"Message": ["Veuillez sélectionner une substance"]})
            substance_details = self.fetcher.get_active_substance_details(substance_name)
            if not substance_details:
                return pd.DataFrame({"Message": ["Substance non trouvée"]})
            df = pd.DataFrame([substance_details])
            df = df.rename(columns={
                "status": "Statut",
                "approval_date": "Date d'approbation",
                "expiry_date": "Date d'expiration"
            })
            df.insert(0, "Substance", substance_name)
            return df
        except Exception as e:
            logger.error(f"Erreur dans get_substance_details_table: {str(e)}")
            return pd.DataFrame({"Message": [f"Erreur: {str(e)}"]})

    def create_interface(self) -> "gr.Blocks":
        """Build the Gradio Blocks layout and wire up its event handlers."""
        # NOTE(review): the nesting of rows and columns below was reconstructed
        # from statement order; confirm it matches the intended layout.
        with gr.Blocks(theme=gr.themes.Soft(primary_hue="green", secondary_hue="blue")) as interface:
            gr.Markdown("""
            # 🌿 Base de données des pesticides de l'UE

            Consultez les Limites Maximales de Résidus (LMR) et les informations sur les substances actives.
            """)
            with gr.Row():
                with gr.Column(scale=2):
                    product_dropdown = gr.Dropdown(
                        choices=sorted(self.product_choices),
                        label="Produit",
                        info="Sélectionnez un produit agricole"
                    )
                with gr.Column(scale=1):
                    period_radio = gr.Radio(
                        choices=["Dernière semaine", "Dernier mois", "Prochains 6 mois", "Toutes les dates"],
                        value="Toutes les dates",
                        label="Période",
                        info="Filtrer par période"
                    )
                    show_changes = gr.Checkbox(
                        label="Afficher uniquement les modifications récentes",
                        info="Cochez pour voir uniquement les LMR qui ont été modifiées"
                    )
            with gr.Row():
                fetch_btn = gr.Button("📊 Analyser les données", variant="primary")
            with gr.Row():
                mrls_table = gr.Dataframe(
                    headers=["Substance", "Valeur LMR", "Date d'application",
                             "Date de modification", "Règlement", "Statut",
                             "Date d'approbation", "Date d'expiration"],
                    interactive=False
                )
                graph_output = gr.Plot(label="Graphique des Dates d'Application")
            with gr.Row():
                export_btn = gr.Button("Exporter les données", variant="secondary")
                export_output = gr.File(label="Fichier CSV Exporté")
            with gr.Row():
                substance_dropdown = gr.Dropdown(
                    choices=sorted(self.substances),
                    label="Substance",
                    info="Sélectionnez une substance active"
                )
                substance_table = gr.Dataframe(
                    headers=["Substance", "Statut", "Date d'approbation", "Date d'expiration"],
                    interactive=False
                )
            fetch_btn.click(
                fn=self.get_product_details,
                inputs=[product_dropdown, period_radio, show_changes],
                outputs=[mrls_table]
            )
            # Redraw the plot whenever the table content changes.
            mrls_table.change(
                fn=self.create_graph,
                inputs=mrls_table,
                outputs=graph_output
            )
            export_btn.click(
                fn=self.export_data,
                inputs=mrls_table,
                outputs=export_output
            )
            substance_dropdown.change(
                fn=self.get_substance_details_table,
                inputs=substance_dropdown,
                outputs=substance_table
            )
            gr.Markdown("""
            ### Légende

            - **Valeur LMR** : Limite Maximale de Résidus autorisée
            - **Date d'application** : Date d'entrée en vigueur de la LMR
            - **Date de modification** : Date de la dernière modification
            - **Règlement** : Référence du règlement européen (cliquez pour accéder au texte)
            - **Statut** : État d'approbation de la substance active
            - **Date d'approbation** : Date d'approbation de la substance active
            - **Date d'expiration** : Date d'expiration de l'approbation
            """)
        return interface


def main():
    """Build the Gradio application and start its web server.

    The server binds to all network interfaces on port 7860 and is launched
    with ``share=True``.
    """
    interface = PesticideInterface()
    app = interface.create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
    )


if __name__ == "__main__":
    main()