# app.py - resilient ASR/OCR with Diagnostics (no NLTK)
from __future__ import annotations
import os, re, json, time, glob, uuid, shutil, subprocess, urllib.parse, io
from typing import List, Dict, Optional
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import requests
import gradio as gr
# ---------- small helpers ----------
def now_iso(): return datetime.now(timezone.utc).isoformat()
def normalize_ws(s: str) -> str: return re.sub(r"\s+", " ", s or "").strip()
def sent_tokenize(txt: str) -> List[str]:
    return [s.strip() for s in re.split(r'(?<=[.!?])\s+|\n+', txt or '') if s.strip()]
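# Example: sent_tokenize("Claim one. Claim two!") -> ["Claim one.", "Claim two!"]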
def domain_from_url(url: str) -> str:
    try: return urllib.parse.urlparse(url).netloc.lower()
    except Exception: return ""
USER_AGENT = "DisinfoFactcheck/1.0 (contact: [email protected])"
HEADERS = {"User-Agent": USER_AGENT}
DEFAULT_ALLOWLIST = [
    "who.int","cdc.gov","nih.gov","ema.europa.eu","ecdc.europa.eu",
    "reuters.com","apnews.com","associatedpress.com","bbc.com","bbc.co.uk",
    "nytimes.com","washingtonpost.com","theguardian.com",
    "factcheck.org","snopes.com","fullfact.org","politifact.com",
    "un.org","unesco.org","oecd.org","worldbank.org","imf.org",
    "nature.com","sciencemag.org","thelancet.com","nejm.org",
    "britannica.com","nationalgeographic.com","history.com","worldhistory.org",
    "smithsonianmag.com","metmuseum.org","egypt.travel"
]
# ---------- guarded imports ----------
def _try(name):
    try: return __import__(name)
    except Exception: return None
duckduckgo_search = _try("duckduckgo_search")
trafilatura = _try("trafilatura")
rank_bm25 = _try("rank_bm25")
sentence_transformers = _try("sentence_transformers")
transformers = _try("transformers")
# ASR backends
try:
    from faster_whisper import WhisperModel as FWWhisperModel
except Exception:
    FWWhisperModel = None
try:
    import whisper as OpenAIWhisper
except Exception:
    OpenAIWhisper = None
# OCR backends
try:
    import easyocr as _easyocr
except Exception:
    _easyocr = None
try:
    import pytesseract as _pyt
except Exception:
    _pyt = None
try:
    import cv2
except Exception:
    cv2 = None
# ---------- env probes ----------
def ffmpeg_available() -> bool:
    return bool(shutil.which("ffmpeg"))
def gpu_available() -> bool:
    return bool(shutil.which("nvidia-smi"))
def asr_backends():
    b = []
    if FWWhisperModel: b.append("faster-whisper")
    if OpenAIWhisper: b.append("openai-whisper")
    return b
def ocr_backends():
    b = []
    if _easyocr and cv2: b.append("easyocr")
    if _pyt and shutil.which("tesseract"): b.append("tesseract")
    return b
# ---------- text chunking ----------
def split_into_chunks(text: str, max_chars: int = 700) -> List[str]:
    sents = [normalize_ws(s) for s in sent_tokenize(text or "")]
    chunks, cur = [], ""
    for s in sents:
        if len(cur) + 1 + len(s) > max_chars and cur:
            chunks.append(cur.strip()); cur = s
        else:
            cur = (cur + " " + s).strip()
    if cur: chunks.append(cur.strip())
    return [c for c in chunks if len(c) > 40]
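# Greedy packing: sentences are appended to the running chunk until the next one would push it
# past max_chars; chunks shorter than ~40 characters are dropped as noise.
# Example: split_into_chunks("A" * 50 + ". " + "B" * 50 + ".", max_chars=60) -> two chunks.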
# ---------- Wikipedia ----------
WIKI_API = "https://en.wikipedia.org/w/api.php"
def wiki_search(query: str, n: int = 6) -> List[Dict]:
    r = requests.get(WIKI_API, params={"action":"query","list":"search","srsearch":query,"srlimit":n,"format":"json"},
                     headers=HEADERS, timeout=20)
    r.raise_for_status()
    return r.json().get("query",{}).get("search",[])
def wiki_page_content(pageid: int) -> Dict:
    r = requests.get(WIKI_API, params={"action":"query","prop":"extracts|info|revisions","pageids":pageid,"inprop":"url",
                                       "rvprop":"timestamp","explaintext":1,"format":"json"},
                     headers=HEADERS, timeout=20)
    r.raise_for_status()
    page = next(iter(r.json().get("query",{}).get("pages",{}).values()))
    return {"pageid": page.get("pageid"), "title": page.get("title"), "url": page.get("fullurl"),
            "last_modified": (page.get("revisions") or [{}])[0].get("timestamp"), "text": page.get("extract") or ""}
REPORTING_PREFIXES = re.compile(r'^(from a video:|another line says:|it also claims:|the video says:|the speaker claims:|someone said:)', re.I)
STOP = {"the","a","an","from","it","also","claims","claim","says","said","line","video","across","cities","that","this","these","those","is","are","was","were","has","have","had","will","can","does","did"}
def sanitize_claim_for_search(s: str) -> str:
    s = REPORTING_PREFIXES.sub('', (s or "").strip()).strip('"\'')
    s = re.sub(r"[^A-Za-z0-9\s-]", " ", s)
    return re.sub(r"\s+", " ", s).strip()
def keywords_only(s: str, limit: int = 10) -> str:
    toks = [w for w in s.lower().split() if w not in STOP]
    return " ".join(toks[:limit]) or s
def heuristic_rewrites(s: str) -> List[str]:
    rewrites = [s, s + " misinformation"]
    rewrites.append(re.sub(r"5g[^\w]+.*covid[- ]?19", "5G COVID-19 conspiracy", s, flags=re.I))
    rewrites.append(re.sub(r"owns?\s+the\s+world\s+health\s+organization", "Bill Gates WHO relationship", s, flags=re.I))
    rewrites.append(re.sub(r"nasa[^\w]+.*darkness", "NASA hoax darkness", s, flags=re.I))
    return list(dict.fromkeys([sanitize_claim_for_search(x) for x in rewrites]))
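# Example: heuristic_rewrites("5g towers caused covid-19") yields the sanitized claim,
# a "... misinformation" variant, and the canned "5G COVID-19 conspiracy" rewrite (deduplicated).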
def build_wiki_corpus(claim: str, max_pages: int = 6, chunk_chars: int = 600) -> List[Dict]:
    s1 = sanitize_claim_for_search(claim)
    variants = [claim, s1, keywords_only(s1, 10)] + heuristic_rewrites(s1)
    seen, corpus = set(), []
    for q in variants:
        qn = q.strip()
        if not qn or qn.lower() in seen: continue
        seen.add(qn.lower())
        for res in wiki_search(qn, n=max_pages):
            pg = wiki_page_content(res["pageid"])
            if not pg["text"]: continue
            for j, ch in enumerate(split_into_chunks(pg["text"], max_chars=chunk_chars)):
                corpus.append({"id": f"wiki-{pg['pageid']}-{j}", "source":"wikipedia", "pageid": pg["pageid"],
                               "title": pg["title"], "url": pg["url"], "published": pg["last_modified"] or now_iso(),
                               "text": ch})
        if len(corpus) >= max_pages * 2: break
    return list({d["id"]: d for d in corpus}.values())
# ---------- Web retrieval ----------
def ddg_search(query: str, max_results: int = 10, allowlist: Optional[List[str]] = None) -> List[Dict]:
    if duckduckgo_search is None: return []
    DDGS = duckduckgo_search.DDGS
    allowlist = allowlist or DEFAULT_ALLOWLIST
    out = []
    with DDGS() as ddgs:
        for r in ddgs.text(query, region="wt-wt", safesearch="moderate", timelimit=None, max_results=max_results):
            url = r.get("href") or r.get("url") or ""
            if url and any(domain_from_url(url).endswith(dom) for dom in allowlist):
                out.append({"title": r.get("title",""), "url": url, "snippet": r.get("body","")})
    return out
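# Only hits whose netloc ends with an allowlisted suffix are kept, so "www.reuters.com" passes
# via "reuters.com" while unlisted domains are silently dropped.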
def fetch_clean_text(url: str) -> str:
    if trafilatura is None:
        try:
            r = requests.get(url, headers=HEADERS, timeout=12); r.raise_for_status()
            txt = re.sub(r"<[^>]+>", " ", r.text)
            return normalize_ws(txt)[:8000]
        except Exception:
            return ""
    try:
        downloaded = trafilatura.fetch_url(url)
        if not downloaded: return ""
        txt = trafilatura.extract(downloaded, include_comments=False, include_images=False)
        return txt or ""
    except Exception:
        return ""
def build_web_corpus(claim: str, allowlist: Optional[List[str]] = None, per_query_results: int = 8, chunk_chars: int = 700) -> List[Dict]:
    allowlist = allowlist or DEFAULT_ALLOWLIST
    s1 = sanitize_claim_for_search(claim)
    variants = [claim, s1, keywords_only(s1, 10)] + heuristic_rewrites(s1)
    seen, corpus = set(), []
    for q in variants:
        qn = q.strip()
        if not qn or qn.lower() in seen: continue
        seen.add(qn.lower())
        for h in ddg_search(qn, max_results=per_query_results, allowlist=allowlist):
            url = h["url"]; text = fetch_clean_text(url)
            if not text: continue
            for j, ch in enumerate(split_into_chunks(text, max_chars=chunk_chars)):
                corpus.append({"id": f"web-{hash(url)}-{j}", "source":"web", "title": h["title"] or domain_from_url(url),
                               "url": url, "published": now_iso(), "text": ch})
            time.sleep(0.6)
        if len(corpus) >= per_query_results * 4: break
    return list({d["id"]: d for d in corpus}.values())
# ---------- retrieval ----------
def tokenize_simple(text: str) -> List[str]:
    text = re.sub(r"[^a-z0-9\s]", " ", (text or "").lower())
    return [w for w in text.split() if w and w not in {"the","a","an","and","or","of","to","in","for","on","with"}]
def rrf_merge(orderings: List[List[str]], k: int = 60) -> List[str]:
    scores = {}
    for ordering in orderings:
        for r, doc_id in enumerate(ordering):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0/(k + r)
    return [doc for doc,_ in sorted(scores.items(), key=lambda x: -x[1])]
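# Reciprocal Rank Fusion: a document's score is the sum of 1/(k + rank) over every ordering it
# appears in, e.g. a doc ranked 0th by BM25 and 2nd by the dense ranker gets 1/60 + 1/62 with k=60.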
BM25Okapi = getattr(rank_bm25, "BM25Okapi", None) if rank_bm25 else None
_emb_model, st_util = None, None
if sentence_transformers:
    try:
        _emb_model = sentence_transformers.SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")
        from sentence_transformers import util as st_util
    except Exception:
        _emb_model, st_util = None, None
def retrieve_hybrid(claim: str, docs: List[Dict], k: int = 8) -> List[Dict]:
    if not docs: return []
    # BM25 (or overlap fallback)
    if BM25Okapi:
        corpus_tokens = [tokenize_simple(d["text"]) for d in docs]
        bm25 = BM25Okapi(corpus_tokens)
        bm25_scores = bm25.get_scores(tokenize_simple(claim))
        bm25_order = [docs[i]["id"] for i in list(np.argsort(-np.array(bm25_scores)))]
    else:
        q_toks = set(tokenize_simple(claim))
        overlaps = [(i, len(q_toks.intersection(set(tokenize_simple(d["text"]))))) for i, d in enumerate(docs)]
        bm25_order = [docs[i]["id"] for i,_ in sorted(overlaps, key=lambda x: -x[1])]
    # Dense (optional)
    dense_order = []
    if _emb_model and st_util:
        try:
            q_emb = _emb_model.encode([claim], convert_to_tensor=True, show_progress_bar=False)
            d_emb = _emb_model.encode([d["text"] for d in docs], convert_to_tensor=True, show_progress_bar=False)
            sims = st_util.cos_sim(q_emb, d_emb).cpu().numpy().ravel()
            dense_order = [docs[i]["id"] for i in list(np.argsort(-sims))]
        except Exception:
            dense_order = bm25_order
    ordering = rrf_merge([bm25_order, dense_order or bm25_order], k=60)
    top_ids = set(ordering[:max(k, 14)])
    id2doc = {d["id"]: d for d in docs}
    ranked_docs = [id2doc[i] for i in ordering if i in top_ids]
    return [{**doc, "score": float(1/(60+i))} for i, doc in enumerate(ranked_docs[:k])]
# ---------- verifier (transformers optional; heuristic fallback) ----------
_nli = None
if transformers:
    try:
        AutoModelForSequenceClassification = transformers.AutoModelForSequenceClassification
        AutoTokenizer = transformers.AutoTokenizer
        _tok = AutoTokenizer.from_pretrained("roberta-large-mnli")
        _mdl = AutoModelForSequenceClassification.from_pretrained("roberta-large-mnli")
        _nli = transformers.pipeline("text-classification", model=_mdl, tokenizer=_tok,
                                     return_all_scores=True, truncation=True, device=-1)
    except Exception:
        _nli = None
def verify_with_nli(claim: str, evidence: List[Dict]) -> Dict:
    if _nli:
        best_ent_id, best_ent_p = None, 0.0
        best_con_id, best_con_p = None, 0.0
        for e in evidence or []:
            prem = (e.get("text") or "").strip()
            if not prem: continue
            outputs = _nli([{"text": prem, "text_pair": claim}])
            probs = {d["label"].upper(): float(d["score"]) for d in outputs[0]}
            ent, con = probs.get("ENTAILMENT", 0.0), probs.get("CONTRADICTION", 0.0)
            if ent > best_ent_p: best_ent_id, best_ent_p = e.get("id"), ent
            if con > best_con_p: best_con_id, best_con_p = e.get("id"), con
        label, used = "NEI", []
        conf = max(0.34, float(best_ent_p*0.5 + (1-best_con_p)*0.25))
        rationale = "Insufficient or inconclusive evidence."
        if best_ent_p >= 0.60 and (best_ent_p - best_con_p) >= 0.10:
            label, used, conf, rationale = "SUPPORT", [best_ent_id] if best_ent_id else [], best_ent_p, "Top evidence entails the claim."
        elif best_con_p >= 0.60 and (best_con_p - best_ent_p) >= 0.10:
            label, used, conf, rationale = "REFUTE", [best_con_id] if best_con_id else [], best_con_p, "Top evidence contradicts the claim."
        return {"label": label, "used_evidence_ids": used, "confidence": float(conf), "rationale": rationale}
    # heuristic fallback
text = " ".join((e.get("text") or "")[:400].lower() for e in evidence[:6]) | |
k = sanitize_claim_for_search(claim).lower() | |
if any(x in text for x in ["false","hoax","debunked","misinformation","no evidence","not true"]) and any(y in text for y in k.split()[:4]): | |
return {"label":"REFUTE","used_evidence_ids":[evidence[0]["id"]] if evidence else [],"confidence":0.6,"rationale":"Heuristic: refutation keywords."} | |
if any(x in text for x in ["confirmed","approved","verified","evidence shows","found that"]) and any(y in text for y in k.split()[:4]): | |
return {"label":"SUPPORT","used_evidence_ids":[evidence[0]["id"]] if evidence else [],"confidence":0.55,"rationale":"Heuristic: support keywords."} | |
return {"label":"NEI","used_evidence_ids":[],"confidence":0.4,"rationale":"Insufficient signal without NLI."} | |
def enforce_json_schema(x: Dict) -> Dict:
    return {"label": str(x.get("label","NEI")).upper(),
            "used_evidence_ids": [str(i) for i in x.get("used_evidence_ids", []) if i],
            "confidence": float(x.get("confidence", 0.5)),
            "rationale": str(x.get("rationale","")).strip()[:300]}
def filter_by_time(docs: List[Dict], t_max_iso: str) -> List[Dict]:
    try: tmax = datetime.fromisoformat(t_max_iso.replace("Z","+00:00"))
    except Exception: tmax = datetime.now(timezone.utc)
    kept = []
    for d in docs:
        try:
            dt = datetime.fromisoformat(d["published"].replace("Z","+00:00"))
            if dt <= tmax: kept.append(d)
        except Exception:
            kept.append(d)
    return kept
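# Documents whose "published" field cannot be parsed (or compared) are kept rather than dropped,
# so chunks with odd timestamps still reach the verifier.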
def verify_claim(claim_text: str, use_web: bool = True, use_wiki: bool = True,
                 allowlist: Optional[List[str]] = None, t_claim_iso: Optional[str] = None, k: int = 8) -> Dict:
    t_claim_iso = t_claim_iso or now_iso()
    allowlist = allowlist or DEFAULT_ALLOWLIST
    docs = []
    if use_wiki: docs += build_wiki_corpus(claim_text, max_pages=6, chunk_chars=600)
    if use_web: docs += build_web_corpus(claim_text, allowlist=allowlist, per_query_results=8, chunk_chars=700)
    corpus_at_t = filter_by_time(docs, t_claim_iso)
    top_at_t = retrieve_hybrid(claim_text, corpus_at_t, k=k)
    top_now = retrieve_hybrid(claim_text, docs, k=k)
    res_t = enforce_json_schema(verify_with_nli(claim_text, top_at_t))
    res_n = enforce_json_schema(verify_with_nli(claim_text, top_now))
    return {"claim": claim_text, "t_claim": t_claim_iso, "label_at_t": res_t["label"], "label_now": res_n["label"],
            "used_evidence_ids_at_t": res_t["used_evidence_ids"], "used_evidence_ids_now": res_n["used_evidence_ids"],
            "confidence": float((res_t["confidence"] + res_n["confidence"]) / 2.0),
            "rationale": res_n["rationale"] if res_n["rationale"] else res_t["rationale"],
            "evidence_top_now": top_now}
def run_on_claims(claims: List[str], use_web: bool, use_wiki: bool, allowlist: List[str], k: int = 8) -> List[Dict]:
    outs = []
    for c in claims:
        c = (c or "").strip()
        if not c: continue
        outs.append(verify_claim(c, use_web=use_web, use_wiki=use_wiki, allowlist=allowlist, t_claim_iso=now_iso(), k=k))
    return outs
# ---------- ASR ----------
def extract_audio_ffmpeg(video_path: str, out_wav: str, sr: int = 16000) -> str:
    cmd = ["ffmpeg","-y","-i",video_path,"-vn","-acodec","pcm_s16le","-ar",str(sr),"-ac","1",out_wav]
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    return out_wav
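# "-vn" drops the video stream; "-acodec pcm_s16le -ar 16000 -ac 1" writes 16 kHz mono 16-bit PCM,
# the input format Whisper models are trained on.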
def run_whisper_asr(audio_path: str, model_size: str = "base", language: Optional[str] = None) -> str:
    # Prefer faster-whisper
    if FWWhisperModel is not None:
        device = "cuda" if gpu_available() else "cpu"
        compute_type = "float16" if device == "cuda" else "int8"
        model = FWWhisperModel(model_size, device=device, compute_type=compute_type)
        segments, info = model.transcribe(audio_path, language=language, vad_filter=True, beam_size=5)
        return " ".join(seg.text for seg in segments).strip()
    # Fallback to OpenAI whisper
    if OpenAIWhisper is not None:
        model = OpenAIWhisper.load_model(model_size)
        result = model.transcribe(audio_path, language=language) if language else model.transcribe(audio_path)
        return (result.get("text") or "").strip()
    # No backend
    return ""
# ---------- OCR (EasyOCR → Tesseract) ----------
def _tess_langs(langs_csv: str) -> str:
    map_ = {"en":"eng","ar":"ara","fr":"fra","de":"deu","es":"spa","it":"ita","pt":"por","ru":"rus","zh":"chi_sim"}
    codes = [x.strip().lower() for x in (langs_csv or "en").split(",") if x.strip()]
    return "+".join(map_.get(c, c) for c in codes) or "eng"
def preprocess_for_ocr(img_path: str):
    if cv2 is None: return None
    img = cv2.imread(img_path, cv2.IMREAD_COLOR)
    if img is None: return None
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 50, 50)
    gray = cv2.equalizeHist(gray)
    th = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                               cv2.THRESH_BINARY, 31, 9)
    return th
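# Preprocessing pipeline: grayscale -> bilateral filter (denoise, keep edges) ->
# histogram equalization (boost contrast) -> adaptive Gaussian threshold (binarize overlays).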
def _ocr_with_easyocr(frames: List[str], langs_csv: str, max_images: Optional[int]) -> List[str]:
    if not (_easyocr and cv2): return []
    try:
        gpu = gpu_available()
        reader = _easyocr.Reader([c.strip() for c in langs_csv.split(",") if c.strip()], gpu=gpu)
        texts, count = [], 0
        for fp in frames:
            if max_images and count >= max_images: break
            img = preprocess_for_ocr(fp)
            if img is None:
                count += 1
                continue
            for (_bbox, txt, conf) in reader.readtext(img):
                txt = normalize_ws(txt)
                if txt and conf >= 0.35: texts.append(txt)
            count += 1
        uniq, seen = [], set()
        for t in texts:
            k = t.lower()
            if k not in seen: uniq.append(t); seen.add(k)
        return uniq
    except Exception:
        return []
def _ocr_with_tesseract(frames: List[str], langs_csv: str, max_images: Optional[int]) -> List[str]:
    if not (_pyt and shutil.which("tesseract") and cv2): return []
    lang = _tess_langs(langs_csv)
    texts, count = [], 0
    for fp in frames:
        if max_images and count >= max_images: break
        img = preprocess_for_ocr(fp)
        if img is None:
            count += 1
            continue
        try:
            raw = _pyt.image_to_string(img, lang=lang)
        except Exception:
            try:
                raw = _pyt.image_to_string(img, lang="eng")
            except Exception:
                raw = ""
        for line in (raw or "").splitlines():
            line = normalize_ws(line)
            if len(line) >= 3: texts.append(line)
        count += 1
    uniq, seen = [], set()
    for t in texts:
        k = t.lower()
        if k not in seen: uniq.append(t); seen.add(k)
    return uniq
def run_ocr_on_frames(frames: List[str], languages: str = "en", max_images: Optional[int] = None) -> List[str]:
    langs_csv = languages or "en"
    out = _ocr_with_easyocr(frames, langs_csv, max_images)
    if out: return out
    out = _ocr_with_tesseract(frames, langs_csv, max_images)
    return out
# ---------- video processing ----------
def download_video(url: str, out_dir: str = "videos") -> str:
    os.makedirs(out_dir, exist_ok=True)
    out_tpl = os.path.join(out_dir, "%(title)s.%(ext)s")
    subprocess.run(["yt-dlp","-o",out_tpl,url], check=True)
    files = sorted(glob.glob(os.path.join(out_dir, "*")), key=os.path.getmtime)
    return files[-1] if files else ""
def sample_frames_ffmpeg(video_path: str, out_dir: str = "frames", fps: float = 0.5) -> List[str]:
    os.makedirs(out_dir, exist_ok=True)
    try:
        subprocess.run(["ffmpeg","-y","-i",video_path,"-vf",f"fps={fps}", os.path.join(out_dir, "frame_%06d.jpg")],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except Exception:
        return []
    return sorted(glob.glob(os.path.join(out_dir, "frame_*.jpg")))
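# The fps filter controls sampling density: fps=0.5 keeps one frame every 2 seconds,
# fps=2.0 keeps two frames per second (more frames, slower OCR).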
def aggregate_text(asr_text: str, ocr_lines: List[str]) -> str:
    parts = []
    if asr_text: parts.append(asr_text)
    if ocr_lines: parts.append("\n".join(ocr_lines))
    uniq_lines, seen = [], set()
    for line in "\n".join(parts).split("\n"):
        line = normalize_ws(line)  # normalize per line so newlines survive for the dedup below
        k = line.lower()
        if k and k not in seen: uniq_lines.append(line); seen.add(k)
    return "\n".join(uniq_lines)
def suggest_claims(text: str, top_k: int = 10) -> List[str]:
    sents = [re.sub(r'^[\'"“”]+|[\'"“”]+$', '', x).strip() for x in re.split(r'[.!?\n]+', text or "") if x.strip()]
    candidates = [s for s in sents if len(s) >= 12 and re.search(r"\b(is|are|was|were|has|have|had|will|can|does|did|cause|causes|leads|led|prove|proves|confirm|confirms|predict|predicts|announce|announces|claim|claims|say|says|warn|warns|plan|plans|declare|declares|ban|bans|approve|approves)\b", s, re.I)]
    if not candidates:
        fallback = [s for s in sents if 8 <= len(s) <= 140]
        scored = []
        for s in fallback:
            score = (1 if re.search(r'\d', s) else 0) + sum(1 for w in s.split()[:6] if w[:1].isupper())
            scored.append((score, s))
        candidates = [s for _, s in sorted(scored, key=lambda x: -x[0])[:top_k]]
    return candidates[:top_k]
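# Heuristic only: sentences containing an assertive verb ("is", "causes", "claims", ...) are treated
# as candidate claims; if none match, short sentences with digits or capitalized tokens are ranked instead.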
def process_video(video_file: Optional[str] = None, video_url: Optional[str] = None,
                  whisper_model: str = "base", asr_language: Optional[str] = None,
                  ocr_langs: str = "en", fps: float = 0.5, max_ocr_images: int = 200) -> Dict:
    workdir = f"session_{uuid.uuid4().hex[:8]}"; os.makedirs(workdir, exist_ok=True)
    # pick source
    if video_url and video_url.strip():
        vp = download_video(video_url.strip(), out_dir=workdir)
    elif video_file and os.path.exists(video_file):
        vp = shutil.copy(video_file, os.path.join(workdir, os.path.basename(video_file)))
    else:
        raise ValueError("Provide either a local video file path or a URL.")
    # audio
    wav = os.path.join(workdir, "audio_16k.wav")
    if not ffmpeg_available():
        raise RuntimeError("ffmpeg binary not found. Ensure apt.txt includes 'ffmpeg'.")
    extract_audio_ffmpeg(vp, wav, sr=16000)
    # ASR (never hard-fail)
    asr_text = ""
    try:
        asr_text = run_whisper_asr(wav, model_size=whisper_model, language=asr_language)
        if not asr_text:
            asr_text = "[ASR skipped: no backend available]"
    except Exception as e:
        asr_text = f"[ASR skipped: {e}]"
    with open(os.path.join(workdir, "transcript_asr.txt"), "w", encoding="utf-8") as f:
        f.write(asr_text)
    # frames
    frames_dir = os.path.join(workdir, "frames")
    frames = sample_frames_ffmpeg(vp, out_dir=frames_dir, fps=fps)
    # OCR (never hard-fail)
    ocr_lines = []
    try:
        if frames:
            ocr_lines = run_ocr_on_frames(frames, languages=ocr_langs, max_images=int(max_ocr_images))
        else:
            ocr_lines = []
    except Exception as e:
        ocr_lines = [f"[OCR error: {e}]"]
    if not ocr_lines:
        ocr_lines = ["[OCR skipped: no backend available]"]
    with open(os.path.join(workdir, "transcript_ocr.txt"), "w", encoding="utf-8") as f:
        f.write("\n".join(ocr_lines))
    # aggregate + suggestions
    agg = aggregate_text(asr_text, ocr_lines)
    with open(os.path.join(workdir, "transcript_aggregated.txt"), "w", encoding="utf-8") as f:
        f.write(agg)
    suggestions = suggest_claims(agg, top_k=10)
    return {"workdir": workdir, "video_path": vp, "asr_text": asr_text, "ocr_lines": ocr_lines,
            "aggregated_text": agg, "suggested_claims": suggestions}
# ---------- Gradio UI ----------
# Raw CSS only (no <style> wrapper): gr.Blocks(css=...) injects this string into a stylesheet itself.
THEME_CSS = """
body, .gradio-container {
  background: radial-gradient(1200px 600px at 20% -10%, rgba(122,60,255,0.20), transparent 50%),
              radial-gradient(1000px 400px at 80% 10%, rgba(0,179,255,0.14), transparent 50%),
              linear-gradient(180deg, #0f1020, #0a0a12) !important;
  color: #fff;
}
.glass { background: rgba(255,255,255,0.06); backdrop-filter: blur(8px);
         border: 1px solid rgba(255,255,255,0.08); border-radius: 18px !important; }
.neon-btn { background: linear-gradient(90deg, rgba(122,60,255,0.9), rgba(0,179,255,0.9));
            border-radius: 12px; color: white; box-shadow: 0 0 24px rgba(122,60,255,0.35); }
.neon-title { background: linear-gradient(90deg, #b28cff, #7a3cff, #00b3ff);
              -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 900; }
"""
def ui_run_factcheck(claims_text: str, use_web: bool, use_wiki: bool, allowlist_str: str = ""):
    # allowlist_str defaults to "" because the allowlist textbox is currently disabled in the UI and
    # the click handler wires up only three inputs; "" falls back to DEFAULT_ALLOWLIST below.
    claims = [c.strip() for c in (claims_text or "").splitlines() if c.strip()]
    if not claims: return "Please enter one claim per line.", None
    allow = [d.strip() for d in (allowlist_str or ", ".join(DEFAULT_ALLOWLIST)).split(",") if d.strip()]
    res = run_on_claims(claims, use_web=use_web, use_wiki=use_wiki, allowlist=allow, k=8)
    rows, cards = [], []
    for v in res:
        lines = ["─"*74, f"CLAIM: {v['claim']}", f"t_claim: {v['t_claim']}",
                 f"verdict@T: {v['label_at_t']} | verdict@Now: {v['label_now']} | confidence: {v['confidence']:.2f}",
                 f"rationale: {v.get('rationale','')}"]
        evs = v.get("evidence_top_now", []) or []
        if not evs: lines.append("EVIDENCE: (none retrieved)")
        else:
            lines.append("EVIDENCE (top):")
            for e in evs[:6]:
                snippet = (e.get("text","") or "").replace("\n"," ")
                snippet = (snippet[:220] + "...") if len(snippet) > 220 else snippet
                title = e.get("title","") or e.get("source","")
                lines.append(f" • [{title}] {e.get('url','')}")
                lines.append(f" {snippet}")
        cards.append("\n".join(lines))
        rows.append({"claim": v["claim"], "verdict_at_t": v["label_at_t"], "verdict_now": v["label_now"],
                     "confidence": round(float(v["confidence"]), 3),
                     "used_ids": "|".join(v.get("used_evidence_ids_now", []))})
    df = pd.DataFrame(rows)
    return "\n\n".join(cards), df
def ui_ingest_and_suggest(video_file, video_url, whisper_model, asr_language, ocr_langs, fps, max_ocr_images):
    # gr.File may hand back a tempfile-like object (with .name) or a plain path string
    # depending on the Gradio version; handle both.
    try: vp = getattr(video_file, "name", video_file) if video_file else None
    except Exception: vp = None
    out = process_video(video_file=vp, video_url=video_url,
                        whisper_model=whisper_model, asr_language=asr_language or None,
                        ocr_langs=ocr_langs, fps=float(fps), max_ocr_images=int(max_ocr_images))
    asr_preview = (out["asr_text"][:1200] + "...") if len(out["asr_text"]) > 1200 else out["asr_text"]
    ocr_preview = "\n".join(out["ocr_lines"][:50])
    agg_preview = (out["aggregated_text"][:2000] + "...") if len(out["aggregated_text"]) > 2000 else out["aggregated_text"]
    sugg = "\n".join(out["suggested_claims"])
    return asr_preview, ocr_preview, agg_preview, sugg, sugg
def run_diagnostics():
    lines = []
    lines.append(f"FFmpeg: {'found' if ffmpeg_available() else 'NOT found'}")
    lines.append(f"GPU: {'available' if gpu_available() else 'CPU only'}")
    lines.append(f"ASR backends: {', '.join(asr_backends()) or 'none'}")
    lines.append(f"OCR backends: {', '.join(ocr_backends()) or 'none'}")
    # ffmpeg version
    try:
        v = subprocess.run(['ffmpeg','-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
        lines.append(v.stdout.splitlines()[0])
    except Exception as e:
        lines.append(f"ffmpeg version: {e}")
    # tesseract version (older builds print it to stderr)
    try:
        if shutil.which("tesseract"):
            tv = subprocess.run(['tesseract','-v'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
            lines.append("Tesseract: " + (tv.stdout or tv.stderr).splitlines()[0])
        else:
            lines.append("Tesseract: NOT found on PATH")
    except Exception as e:
        lines.append(f"Tesseract: {e}")
    # EasyOCR smoke (import only)
    lines.append(f"EasyOCR import: {'ok' if _easyocr else 'fail'}; OpenCV: {'ok' if cv2 is not None else 'fail'}")
    # Create a quick synthetic OCR test image and run it through the OCR stack
    try:
        from PIL import Image, ImageDraw
        img = Image.new("RGB", (480, 120), (255,255,255))
        d = ImageDraw.Draw(img); d.text((10,40), "AEGIS TEST 123", fill=(0,0,0))
        tmp = f"diag_{uuid.uuid4().hex[:6]}.png"; img.save(tmp)
        o = run_ocr_on_frames([tmp], languages="en", max_images=1)
        os.remove(tmp)
        lines.append("OCR synthetic test: " + ("OK: " + " | ".join(o) if o else "no text read"))
    except Exception as e:
        lines.append(f"OCR synthetic test error: {e}")
    return "\n".join(lines)
with gr.Blocks(css=THEME_CSS, fill_height=True) as demo:
    gr.HTML("<h1 class='neon-title' style='font-size:42px;margin:8px 0;'>Claim Checker</h1><p style='opacity:.75;margin:-6px 0 18px;'>Make every claim earn its proof.</p>")
    with gr.Tab("Manual Claims"):
        with gr.Row():
            with gr.Column(scale=1):
                claims_box = gr.Textbox(label="Claims (one per line)", lines=8, placeholder="e.g. 5G towers caused COVID-19", elem_classes=["glass"])
                with gr.Row():
                    use_web = gr.Checkbox(value=True, label="Use Web retrieval")
                    use_wiki = gr.Checkbox(value=True, label="Use Wikipedia")
                # allowlist_box = gr.Textbox(label="Domain allowlist (comma-separated)", value=", ".join(DEFAULT_ALLOWLIST), lines=2)
                run_btn = gr.Button("Run Fact-Check")
            with gr.Column(scale=1):
                out_text = gr.Textbox(label="Verdicts + Sources", lines=18, interactive=False, elem_classes=["glass"])
                out_df = gr.Dataframe(label="Structured Results", interactive=False)
        run_btn.click(ui_run_factcheck,
                      inputs=[claims_box, use_web, use_wiki],
                      outputs=[out_text, out_df])
with gr.Tab("Video Ingest (ASR + OCR)"): | |
gr.Markdown("Upload a video **OR** provide a URL. Whisper + EasyOCR/Tesseract run; text is aggregated and claims suggested.") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
video_upload = gr.File(label="Upload video (mp4/mov/mkv...)", file_types=["video"]) | |
video_url = gr.Textbox(label="Or paste video URL (YouTube/direct link)") | |
with gr.Row(): | |
whisper_model = gr.Dropdown(choices=["tiny","base","small","medium"], value="base", label="Whisper model") | |
asr_language = gr.Textbox(label="ASR language hint (optional, e.g., en, ar)") | |
with gr.Row(): | |
ocr_langs = gr.Textbox(value="en", label="OCR languages (comma-separated, e.g., en,ar)") | |
fps = gr.Slider(minimum=0.2, maximum=2.0, value=0.5, step=0.1, label="OCR frame sampling FPS") | |
max_ocr_images = gr.Slider(minimum=20, maximum=600, value=200, step=10, label="Max frames for OCR") | |
run_ingest = gr.Button("Ingest Video (ASR + OCR)", elem_classes=["neon-btn"]) | |
with gr.Column(scale=1): | |
asr_out = gr.Textbox(label="ASR Transcript (preview)", lines=10, elem_classes=["glass"]) | |
ocr_out = gr.Textbox(label="OCR Lines (preview)", lines=10, elem_classes=["glass"]) | |
agg_out = gr.Textbox(label="Aggregated Text (preview)", lines=12, elem_classes=["glass"]) | |
sugg_out = gr.Textbox(label="Suggested Claims", lines=10, elem_classes=["glass"]) | |
to_manual = gr.Textbox(label="Copy to Manual Claims", lines=8, elem_classes=["glass"]) | |
run_ingest.click(ui_ingest_and_suggest, | |
inputs=[video_upload, video_url, whisper_model, asr_language, ocr_langs, fps, max_ocr_images], | |
outputs=[asr_out, ocr_out, agg_out, sugg_out, to_manual]) | |
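    # Minimal wiring sketch for run_diagnostics(), which is otherwise defined but unused; the exact
    # tab name and placement are assumptions, but the callback matches the function defined above.
    with gr.Tab("Diagnostics"):
        diag_out = gr.Textbox(label="Environment report", lines=16, elem_classes=["glass"])
        diag_btn = gr.Button("Run Diagnostics", elem_classes=["neon-btn"])
        diag_btn.click(run_diagnostics, inputs=[], outputs=[diag_out])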
demo.launch()