"""FastAPI service that locates 3GPP TSG documents and specifications.

Documents are looked up first in local JSON indexes and, when missing, by
crawling the directory listings served under https://www.3gpp.org/ftp.
"""

import io
import json
import os
import re
import subprocess
import tempfile
import time
import traceback
import uuid
import warnings
import zipfile
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Literal, Optional

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

load_dotenv()
warnings.filterwarnings("ignore")

app = FastAPI(title="3GPP Document Finder API",
              description="API to find 3GPP documents based on TSG document IDs")

app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Browser-like User-Agent sent when downloading specification archives.
_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'

# Substrings of a finder result that mean "nothing was found".
_FAILURE_MARKERS = ("not found", "Could not", "Unable")


def _convert_doc_to_lines(doc_bytes: bytes, ext: str) -> List[str]:
    """Convert a Word document to text with LibreOffice and return its lines.

    The document is written to a private temporary directory that is removed
    even when the conversion fails, so no files are left behind.

    Args:
        doc_bytes: Raw content of the .doc/.docx file.
        ext: File extension (without the dot) used for the temporary file.

    Returns:
        The stripped, non-empty lines of the converted text.

    Raises:
        subprocess.CalledProcessError: If LibreOffice exits with an error.
        OSError: If the converted file cannot be read.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        input_path = os.path.join(work_dir, f"document.{ext}")
        # LibreOffice names the output after the input file's base name.
        output_path = os.path.join(work_dir, "document.txt")
        print("Ecriture")
        with open(input_path, "wb") as f:
            f.write(doc_bytes)
        print("Convertissement")
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", work_dir,
            input_path
        ], check=True)
        print("Ecriture TXT")
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]


def _extract_text_from_zip(archive: zipfile.ZipFile) -> Optional[List[str]]:
    """Return the text of the first non-cover Word document in *archive*.

    Nested ZIP members are opened and searched in turn; each nested archive
    has its own handle, so the enclosing archive stays usable afterwards.

    Returns:
        The document's text lines, or None when no suitable document exists.
    """
    for member in archive.namelist():
        lowered = member.lower()
        if lowered.endswith("zip"):
            print("Another ZIP !")
            with zipfile.ZipFile(io.BytesIO(archive.read(member))) as nested:
                text = _extract_text_from_zip(nested)
            if text is not None:
                return text
        elif lowered.endswith(("doc", "docx")):
            if "cover" in lowered:
                # Cover sheets do not contain the specification body.
                print("COVER !")
                continue
            ext = member.split(".")[-1]
            return _convert_doc_to_lines(archive.read(member), ext)
    return None


def get_text(specification: str, version: str):
    """Download a specification archive and return its text as a list of lines.

    Args:
        specification: Specification number, e.g. "23.501".
        version: Version code as it appears in the archive file name.

    Returns:
        The stripped, non-empty lines of the first non-cover .doc/.docx file
        found in the archive (nested ZIP files are searched as well).

    Raises:
        Exception: If the download fails or the archive holds no document.
    """
    doc_id = specification
    series = doc_id.split(".")[0]
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": _USER_AGENT}
    )
    if response.status_code != 200:
        raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")

    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        text = _extract_text_from_zip(zf)
    if text is None:
        raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")
    return text


def get_scope(specification: str, version: str):
    """Extract the text between the "Scope" and "References" headings.

    Args:
        specification: Specification number, e.g. "23.501".
        version: Version code as it appears in the archive file name.

    Returns:
        The whitespace-normalised scope text, "Not found" when the extracted
        section has two or more lines, or "Not found (error)" when the
        specification text could not be retrieved or parsed.
    """
    try:
        spec_text = get_text(specification, version)
        scp_i = 0
        nxt_i = 0
        # No early exit: the last line ending in "scope" / "references" wins.
        for x, text in enumerate(spec_text):
            if re.search(r"scope$", text, flags=re.IGNORECASE):
                scp_i = x
                # Fallback window in case no "references" heading follows.
                nxt_i = scp_i + 10
            if re.search(r"references$", text, flags=re.IGNORECASE):
                nxt_i = x

        section = spec_text[scp_i + 1:nxt_i]
        # NOTE(review): the text is returned only when the section has fewer
        # than two lines; this looks inverted but is kept as-is — confirm the
        # intended condition.
        return re.sub(r"\s+", " ", " ".join(section)) if len(section) < 2 else "Not found"
    except Exception as e:
        traceback.print_exception(e)
        return "Not found (error)"


class DocRequest(BaseModel):
    """Request body of /find: a document or specification id and optional release."""
    doc_id: str
    release: Optional[int] = None


class DocResponse(BaseModel):
    """Response of /find: the resolved URL, optional scope text and timing."""
    doc_id: str
    url: str
    scope: Optional[str] = None
    search_time: float


class BatchDocRequest(BaseModel):
    """Request body of /batch: several ids resolved in one call."""
    doc_ids: List[str]
    release: Optional[int] = None


class BatchDocResponse(BaseModel):
    """Response of /batch: found URLs keyed by id, plus the ids that were missing."""
    results: Dict[str, str]
    missing: List[str]
    search_time: float


class KeywordRequest(BaseModel):
    """Request body of /search-spec: keywords plus optional filters."""
    keywords: str
    release: Optional[str] = None
    wg: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None
    mode: Optional[Literal["and", "or"]] = "and"


class KeywordResponse(BaseModel):
    """Response of /search-spec: the matching index entries and timing."""
    results: List[Dict[str, str]]
    search_time: float


class TsgDocFinder:
    """Locate TSG documents, using a JSON index backed by a live FTP crawl."""

    def __init__(self):
        self.main_ftp_url = "https://www.3gpp.org/ftp"
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()

    def load_indexer(self):
        """Load existing index if available.

        Returns:
            A ``(docs, last_indexed_date)`` tuple; ``({}, None)`` when the
            index file does not exist.
        """
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["docs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        """Save the updated index and refresh ``last_indexer_date``."""
        # The timestamp must be taken before it is formatted; reading it from
        # a local that is assigned later raises UnboundLocalError.
        self.last_indexer_date = datetime.today().strftime("%d/%m/%Y-%H:%M:%S")
        output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc):
        """Derive the TSG folder and workgroup from a document id.

        Args:
            doc: Document id; "C..." maps to tsg_ct and "S..." to tsg_sa.

        Returns:
            ``(main_tsg, workgroup, doc)``, or ``(None, None, None)`` when the
            id is empty or does not start with "C" or "S".
        """
        if not doc:
            return None, None, None
        main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
        if main_tsg is None:
            return None, None, None
        # Slicing keeps one-character ids from raising IndexError.
        second = doc[1:2]
        if second.isascii() and second.isdigit():
            workgroup = f"WG{int(second)}"
        else:
            workgroup = main_tsg.upper()
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup.

        Falls back to ``<ftp>/<main_tsg>/<workgroup>`` when no link in the
        TSG listing contains the workgroup name.
        """
        response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')

        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"

        return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"

    def get_docs_from_url(self, url):
        """Get list of documents/directories from a URL.

        Returns an empty list when the request or the parsing fails.
        """
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _search_docs_folder(self, docs_url, doc_id):
        """Look for *doc_id* in a docs folder, then in its "zip" subfolder.

        Matching is a case-insensitive substring test on the listed names.

        Returns:
            The URL of the first matching file, or None.
        """
        needle = doc_id.lower()
        files = self.get_docs_from_url(docs_url)
        for file in files:
            if needle in file.lower():
                return f"{docs_url}/{file}"

        # ZIP subfolder
        if any(name.lower() == "zip" for name in files):
            zip_url = f"{docs_url}/zip"
            for file in self.get_docs_from_url(zip_url):
                if needle in file.lower():
                    return f"{zip_url}/{file}"
        return None

    def _remember(self, doc_id, doc_url):
        """Record a freshly found URL in the index, persist it and return it."""
        self.indexer[doc_id] = doc_url
        self.save_indexer()
        return doc_url

    def search_document(self, doc_id: str, release=None):
        """Return the URL of a TSG document.

        The index is consulted first (exact id, then any indexed id starting
        with *doc_id*). Otherwise the workgroup meeting folders are crawled
        and, as a last resort, the /ftp/workshop tree. Every live hit is
        stored in the index and saved.

        Args:
            doc_id: Document id to look for.
            release: Unused here; accepted so both finders share a signature.

        Returns:
            The document URL, or ``"Document <doc_id> not found"``.
        """
        # 1. Local index.
        if doc_id in self.indexer:
            return self.indexer[doc_id]
        for indexed_id, indexed_url in self.indexer.items():
            if indexed_id.startswith(doc_id):
                return indexed_url

        # 2. Regular live search (TSG/CT).
        main_tsg, workgroup, _ = self.get_workgroup(doc_id)
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            if wg_url:
                for folder in self.get_docs_from_url(wg_url):
                    meeting_url = f"{wg_url}/{folder}"
                    lowered = [x.lower() for x in self.get_docs_from_url(meeting_url)]
                    key = "docs" if "docs" in lowered else "tdocs" if "tdocs" in lowered else None
                    if key is None:
                        continue
                    found = self._search_docs_folder(f"{meeting_url}/{key}", doc_id)
                    if found is not None:
                        return self._remember(doc_id, found)

        # 3. Last resort: try /ftp/workshop (live search).
        workshop_url = f"{self.main_ftp_url}/workshop"
        for meeting in self.get_docs_from_url(workshop_url):
            if meeting in ['./', '../']:
                continue
            meeting_url = f"{workshop_url}/{meeting}"
            for sub in self.get_docs_from_url(meeting_url):
                if sub.lower() in ['docs', 'tdocs']:
                    found = self._search_docs_folder(f"{meeting_url}/{sub}", doc_id)
                    if found is not None:
                        return self._remember(doc_id, found)

        return f"Document {doc_id} not found"


class SpecDocFinder:
    """Locate specification archives, using a JSON index and the FTP archive."""

    def __init__(self):
        # Alphabet used for single-character release codes in file names.
        self.chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        self.indexer_file = "indexed_specifications.json"
        self.indexer_specs, self.indexer_scopes, self.last_indexer_date = self.load_indexer()

    def load_indexer(self):
        """Load existing index if available.

        Returns:
            A ``(specs, scopes, last_indexed_date)`` tuple; ``({}, {}, None)``
            when the index file does not exist.
        """
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["specs"], x["scopes"], x["last_indexed_date"]
        return {}, {}, None

    def save_indexer(self):
        """Save the updated index and refresh ``last_indexer_date``."""
        # The timestamp must be taken before it is formatted; reading it from
        # a local that is assigned later raises UnboundLocalError.
        self.last_indexer_date = datetime.today().strftime("%d/%m/%Y-%H:%M:%S")
        output = {"specs": self.indexer_specs, "scopes": self.indexer_scopes, "last_indexed_date": self.last_indexer_date}
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=4, ensure_ascii=False)

    def search_document(self, doc_id, release=None):
        """Return the archive URL of a specification.

        Args:
            doc_id: Specification number, e.g. "23.501".
            release: Optional release number. When given, only rows whose
                link text contains ``<number without dots>-<release code>``
                are considered. When omitted, the last row of the archive
                listing is used.

        Returns:
            The ``href`` of the selected row, or a message containing
            "Unable to find" / "not found" when nothing matches.
        """
        for key in self.indexer_specs.keys():
            if str(doc_id) in key:
                return self.indexer_specs[key]['url']

        series = doc_id.split(".")[0].rjust(2, "0")

        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}"
        response = requests.get(url, verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        # The first table row is skipped, as in the listing's header row.
        links = [row.find("a") for row in soup.find_all("tr")[1:]]

        # The version code in the file name is deliberately not decoded: it
        # was never used, and decoding it as three characters crashed on the
        # six-digit codes used when a version component exceeds 35.
        if release is None:
            link = links[-1] if links else None
            href = link.get("href") if link is not None else None
            if href is None:
                return f"Unable to find specification {doc_id}"
            return href

        release_index = int(release)
        if not 0 <= release_index < len(self.chars):
            # No single-character code exists for this release.
            return f"Specification {doc_id} not found"

        prefix = f"{doc_id.replace('.', '')}-{self.chars[release_index]}"
        spec_url = None
        for link in links:
            if link is None or prefix not in link.get_text():
                continue
            href = link.get("href")
            if href is not None:
                # Later rows override earlier ones, so the last match wins.
                spec_url = href
        return spec_url if spec_url is not None else f"Specification {doc_id} not found"


finder_tsg = TsgDocFinder()
finder_spec = SpecDocFinder()


def _is_found(result) -> bool:
    """Tell whether a finder result is a URL rather than a failure message."""
    return isinstance(result, str) and not any(marker in result for marker in _FAILURE_MARKERS)


@app.get("/")
async def main_menu():
    """Serve the front-end page."""
    return FileResponse(os.path.join("templates", "index.html"))


@app.post("/search-spec", response_model=KeywordResponse)
def search_spec(request: KeywordRequest):
    """Search the specification index by keywords.

    Keywords are matched case-insensitively against the index keys, with
    "and" requiring all of them and "or" any of them. Entries can further be
    filtered by release, working group and specification type.

    Raises:
        HTTPException: 404 when no indexed specification matches.
    """
    start_time = time.time()
    # split() without argument ignores repeated spaces, which would otherwise
    # yield empty keywords that match every entry.
    kws = [kw.lower() for kw in request.keywords.split()]
    release = request.release
    working_group = request.wg
    spec_type = request.spec_type
    results = []

    for string, spec in finder_spec.indexer_specs.items():
        haystack = string.lower()
        # An empty keyword list applies no keyword filter in either mode.
        if kws:
            if request.mode == "and" and not all(kw in haystack for kw in kws):
                continue
            if request.mode == "or" and not any(kw in haystack for kw in kws):
                continue

        if spec.get('version', None) is None or (release is not None and spec["version"].split(".")[0] != str(release)):
            continue
        if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
            continue
        if spec_type is not None and spec.get("type") != spec_type:
            continue

        results.append(spec)

    if results:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    raise HTTPException(status_code=404, detail="Specifications not found")


# Legacy implementation of search_spec that scraped the 3GPP status report
# live; kept for reference (it documents both version-code formats).
#
# def search_spec(request: KeywordRequest):
#     chars = "0123456789abcdefghijklmnopqrstuvwxyz"
#     start_time = time.time()
#     response = requests.get(f'https://www.3gpp.org/dynareport?code=status-report.htm', headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}, verify=False)
#     dfs = pd.read_html(StringIO(response.text), storage_options={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}, encoding="utf-8")
#     for x in range(len(dfs)):
#         dfs[x] = dfs[x].replace({np.nan: None})
#     columns_needed = [0, 1, 2, 3, 4]
#     extracted_dfs: List[pd.DataFrame] = [df.iloc[:, columns_needed] for df in dfs]
#     columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
#     specifications = []
#     for df in extracted_dfs:
#         for index, row in df.iterrows():
#             doc = row.to_list()
#             doc_dict = dict(zip(columns, doc))
#             specifications.append(doc_dict)
#     kws = [_.lower() for _ in request.keywords.split(" ")]
#     results = []
#     for spec in specifications:
#         if request.mode == "and":
#             if not all(kw in spec["title"].lower() for kw in kws):
#                 continue
#         elif request.mode == "or":
#             if not any(kw in spec["title"].lower() for kw in kws):
#                 continue
#         release = request.release
#         working_group = request.wg
#         spec_type = request.spec_type
#         if spec.get('vers', None) is None or (release is not None and spec["vers"].split(".")[0] != str(release)):
#             continue
#         if spec.get('WG', None) is None or (working_group is not None and spec["WG"] != working_group):
#             continue
#         if spec_type is not None and spec["type"] != spec_type:
#             continue
#         doc_id = str(spec["spec_num"])
#         series = doc_id.split(".")[0]
#         a, b, c = str(spec["vers"]).split(".")
#         print(spec["vers"])
#         if not (int(a) > 35 or int(b) > 35 or int(c) > 35):
#             spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{chars[int(a)]}{chars[int(b)]}{chars[int(c)]}.zip"
#         else:
#             x,y,z = str(a), str(b), str(c)
#             while len(x) < 2:
#                 x = "0" + x
#             while len(y) < 2:
#                 y = "0" + y
#             while len(z) < 2:
#                 z = "0" + z
#             spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{x}{y}{z}.zip"
#         results.append({
#             "id": str(spec["spec_num"]),
#             "title": spec["title"],
#             "type": "Technical Specification" if spec["type"] == "TS" else "Technical Report",
#             "release": str(spec["vers"].split(".")[0]),
#             "version": str(spec["vers"]),
#             "working_group": spec["WG"],
#             "url": spec_url
#         })
#     if len(results) > 0:
#         return KeywordResponse(
#             results=results,
#             search_time=time.time() - start_time
#         )
#     else:
#         raise HTTPException(status_code=404, detail="Specification not found")


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    """Resolve a single document or specification id to its URL.

    Ids starting with a letter go to the TSG finder, all others to the
    specification finder. Specification responses also carry the scope text,
    taken from the index when present and extracted from the archive
    otherwise.

    Raises:
        HTTPException: 400 when ``doc_id`` is empty, 404 when nothing is found.
    """
    start_time = time.time()
    if not request.doc_id:
        # Indexing an empty id below would raise IndexError.
        raise HTTPException(status_code=400, detail="doc_id must not be empty")

    finder = finder_tsg if request.doc_id[0].isalpha() else finder_spec
    print(finder)
    result = finder.search_document(request.doc_id, request.release)

    if not _is_found(result):
        raise HTTPException(status_code=404, detail=str(result))

    elapsed = time.time() - start_time
    if isinstance(finder, TsgDocFinder):
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=elapsed
        )

    # The version code is the part of the archive name after the last dash.
    version = result.split("/")[-1].replace(".zip", "").split("-")[-1]
    if request.doc_id in finder.indexer_scopes:
        scope = finder.indexer_scopes[request.doc_id]
    else:
        scope = get_scope(request.doc_id, version)
    return DocResponse(
        doc_id=request.doc_id,
        url=result,
        search_time=elapsed,
        scope=scope
    )


@app.post("/batch", response_model=BatchDocResponse)
def find_documents_batch(request: BatchDocRequest):
    """Resolve several ids at once; ids that cannot be resolved go to ``missing``."""
    start_time = time.time()
    results = {}
    missing = []

    for doc_id in request.doc_ids:
        if not doc_id:
            # An empty id cannot be routed to a finder.
            missing.append(doc_id)
            continue
        finder = finder_tsg if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id)
        if _is_found(result):
            results[doc_id] = result
        else:
            missing.append(doc_id)

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )