Update app.py
app.py
CHANGED
@@ -1,53 +1,28 @@
-# app.py — AEGIS FactCheck (
 from __future__ import annotations
-import os,
 from typing import List, Dict, Optional
 from datetime import datetime, timezone

 import numpy as np
 import pandas as pd
 import requests
-
-import nltk
-from nltk.tokenize import sent_tokenize
-nltk.download('punkt', quiet=True)
-
-
-from rank_bm25 import BM25Okapi
-from sentence_transformers import SentenceTransformer, util
-from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
-
-from duckduckgo_search import DDGS
-import trafilatura
-
-# OCR / ASR
-import easyocr, cv2, whisper
-
 import gradio as gr

-#
-
-
-USE_WIKI_DEFAULT = True
-
-try:
-    from nltk.tokenize import sent_tokenize # optional, if nltk happens to be installed
-except Exception:
-    def sent_tokenize(txt: str):
-        import re
-        # simple rule-based splitter
-        return [s.strip() for s in re.split(r'(?<=[.!?])\s+|\n+', txt or '') if s.strip()]
-
-try:
-    import openai
-    _has_key = True if os.environ.get("OPENAI_API_KEY") else False
-except Exception:
-    _has_key = False

-def
-    return (

-def

 USER_AGENT = "DisinfoFactcheck/1.0 (contact: [email protected])"
 HEADERS = {"User-Agent": USER_AGENT}
@@ -61,19 +36,37 @@ DEFAULT_ALLOWLIST = [
     "nature.com","sciencemag.org","thelancet.com","nejm.org",
 ]

-#
-REPORTING_PREFIXES = re.compile(r'^(from a video:|another line says:|it also claims:|the video says:|the speaker claims:|someone said:)', re.I)
-STOP = {"the","a","an","from","it","also","claims","claim","says","said","line","video","across","cities","that","this","these","those","is","are","was","were","has","have","had","will","can","does","did"}

-
-

-
-
-

-
-
     chunks, cur = [], ""
     for s in sents:
         if len(cur) + 1 + len(s) > max_chars and cur:
@@ -83,8 +76,7 @@ def split_into_chunks(text: str, max_chars: int = 700):
     if cur: chunks.append(cur.strip())
     return [c for c in chunks if len(c) > 40]

-
-# ===================== Wikipedia =====================
 WIKI_API = "https://en.wikipedia.org/w/api.php"

 def wiki_search(query: str, n: int = 6) -> List[Dict]:
@@ -102,6 +94,9 @@ def wiki_page_content(pageid: int) -> Dict:
     return {"pageid": page.get("pageid"), "title": page.get("title"), "url": page.get("fullurl"),
             "last_modified": (page.get("revisions") or [{}])[0].get("timestamp"), "text": page.get("extract") or ""}

 def sanitize_claim_for_search(s: str) -> str:
     s = REPORTING_PREFIXES.sub('', (s or "").strip()).strip('"\'' )
     s = re.sub(r"[^A-Za-z0-9\s-]", " ", s)
@@ -112,11 +107,10 @@ def keywords_only(s: str, limit: int = 10) -> str:
     return " ".join(toks[:limit]) or s

 def heuristic_rewrites(s: str) -> List[str]:
-    rewrites = [s]
     rewrites.append(re.sub(r"5g[^\w]+.*covid[- ]?19", "5G COVID-19 conspiracy", s, flags=re.I))
     rewrites.append(re.sub(r"owns?\s+the\s+world\s+health\s+organization", "Bill Gates WHO relationship", s, flags=re.I))
     rewrites.append(re.sub(r"nasa[^\w]+.*darkness", "NASA hoax darkness", s, flags=re.I))
-    rewrites.append(s + " misinformation")
     return list(dict.fromkeys([sanitize_claim_for_search(x) for x in rewrites]))

 def build_wiki_corpus(claim: str, max_pages: int = 6, chunk_chars: int = 600) -> List[Dict]:
@@ -137,8 +131,11 @@ def build_wiki_corpus(claim: str, max_pages: int = 6, chunk_chars: int = 600) ->
         if len(corpus) >= max_pages * 2: break
     return list({d["id"]: d for d in corpus}.values())

-#
 def ddg_search(query: str, max_results: int = 10, allowlist: Optional[List[str]] = None) -> List[Dict]:
     allowlist = allowlist or DEFAULT_ALLOWLIST
     out = []
     with DDGS() as ddgs:
@@ -149,6 +146,14 @@ def ddg_search(query: str, max_results: int = 10, allowlist: Optional[List[str]]
     return out

 def fetch_clean_text(url: str) -> str:
     try:
         downloaded = trafilatura.fetch_url(url)
         if not downloaded: return ""
@@ -172,22 +177,11 @@ def build_web_corpus(claim: str, allowlist: Optional[List[str]] = None, per_quer
         for j, ch in enumerate(split_into_chunks(text, max_chars=chunk_chars)):
             corpus.append({"id": f"web-{hash(url)}-{j}", "source":"web", "title": h["title"] or domain_from_url(url),
                            "url": url, "published": now_iso(), "text": ch})
-        time.sleep(0.
         if len(corpus) >= per_query_results * 4: break
     return list({d["id"]: d for d in corpus}.values())

-#
-def filter_by_time(docs: List[Dict], t_max_iso: str) -> List[Dict]:
-    tmax = datetime.fromisoformat(t_max_iso.replace("Z","+00:00"))
-    kept = []
-    for d in docs:
-        try:
-            dt = datetime.fromisoformat(d["published"].replace("Z","+00:00"))
-            if dt <= tmax: kept.append(d)
-        except Exception:
-            kept.append(d)
-    return kept
-
 def tokenize_simple(text: str) -> List[str]:
     text = re.sub(r"[^a-z0-9\s]", " ", (text or "").lower())
     return [w for w in text.split() if w and w not in {"the","a","an","and","or","of","to","in","for","on","with"}]
@@ -199,36 +193,116 @@ def rrf_merge(orderings: List[List[str]], k: int = 60) -> List[str]:
             scores[doc_id] = scores.get(doc_id, 0.0) + 1.0/(k + r)
     return [doc for doc,_ in sorted(scores.items(), key=lambda x: -x[1])]

-
-

 def retrieve_hybrid(claim: str, docs: List[Dict], k: int = 8) -> List[Dict]:
     if not docs: return []
-
-
-
-
-
-
-
-
-
     top_ids = set(ordering[:max(k, 14)])
     id2doc = {d["id"]: d for d in docs}
     ranked_docs = [id2doc[i] for i in ordering if i in top_ids]
     return [{**doc, "score": float(1/(60+i))} for i, doc in enumerate(ranked_docs[:k])]

-
-
-
-

-
-

-
-{
-

 def format_evidence_block(evs: List[Dict]) -> str:
     lines = []
@@ -239,62 +313,6 @@ def format_evidence_block(evs: List[Dict]) -> str:
         lines.append(f"[{e['id']}] ({e.get('published','')}) {title} — {e.get('url','')}\n{snippet}")
     return "\n\n".join(lines)

-# Baseline NLI
-NLI_NAME = "roberta-large-mnli"
-nli_tok = AutoTokenizer.from_pretrained(NLI_NAME)
-nli_model = AutoModelForSequenceClassification.from_pretrained(NLI_NAME)
-nli = pipeline("text-classification", model=nli_model, tokenizer=nli_tok, return_all_scores=True, truncation=True, device=-1)
-
-def verify_with_nli(claim: str, evidence: List[Dict]) -> Dict:
-    best_ent_id, best_ent_p = None, 0.0
-    best_con_id, best_con_p = None, 0.0
-    for e in evidence or []:
-        prem = (e.get("text") or "").strip()
-        if not prem: continue
-        outputs = nli([{"text": prem, "text_pair": claim}])
-        probs = {d["label"].upper(): float(d["score"]) for d in outputs[0]}
-        ent, con = probs.get("ENTAILMENT", 0.0), probs.get("CONTRADICTION", 0.0)
-        if ent > best_ent_p: best_ent_id, best_ent_p = e.get("id"), ent
-        if con > best_con_p: best_con_id, best_con_p = e.get("id"), con
-    label, used = "NEI", []
-    conf = max(0.34, float(best_ent_p*0.5 + (1-best_con_p)*0.25))
-    rationale = "Insufficient or inconclusive evidence."
-    if best_ent_p >= 0.60 and (best_ent_p - best_con_p) >= 0.10:
-        label, used, conf, rationale = "SUPPORT", [best_ent_id] if best_ent_id else [], best_ent_p, "Top evidence entails the claim."
-    elif best_con_p >= 0.60 and (best_con_p - best_ent_p) >= 0.10:
-        label, used, conf, rationale = "REFUTE", [best_con_id] if best_con_id else [], best_con_p, "Top evidence contradicts the claim."
-    return {"label": label, "used_evidence_ids": used, "confidence": float(conf), "rationale": rationale}
-
-def verify_with_openai(claim: str, evidence: List[Dict]) -> Dict:
-    if not has_llm():
-        return verify_with_nli(claim, evidence)
-    try:
-        client = openai.OpenAI()
-        resp = client.chat.completions.create(
-            model=os.environ.get("OPENAI_MODEL","gpt-4o-mini"),
-            messages=[
-                {"role":"system","content":"You verify factual claims using only provided evidence and return strict JSON."},
-                {"role":"user","content": VERIFIER_PROMPT.format(claim=claim, evidence_block=format_evidence_block(evidence))}
-            ],
-            temperature=0.0,
-            response_format={"type": "json_object"}
-        )
-        j = json.loads(resp.choices[0].message.content)
-        return {"label": str(j.get("label","NEI")).upper(),
-                "used_evidence_ids": [str(x) for x in j.get("used_evidence_ids", [])],
-                "confidence": float(j.get("confidence", 0.5)),
-                "rationale": str(j.get("rationale","")).strip()[:300]}
-    except Exception as e:
-        alt = verify_with_nli(claim, evidence)
-        alt["rationale"] = f"NLI fallback due to LLM error: {e}"
-        return alt
-
-def enforce_json_schema(x: Dict) -> Dict:
-    return {"label": str(x.get("label","NEI")).upper(),
-            "used_evidence_ids": [str(i) for i in x.get("used_evidence_ids", []) if i],
-            "confidence": float(x.get("confidence", 0.5)),
-            "rationale": str(x.get("rationale","")).strip()[:300]}
-
 def verify_claim(claim_text: str, use_web: bool = True, use_wiki: bool = True,
                  allowlist: Optional[List[str]] = None, t_claim_iso: Optional[str] = None, k: int = 8) -> Dict:
     t_claim_iso = t_claim_iso or now_iso()
@@ -305,8 +323,8 @@ def verify_claim(claim_text: str, use_web: bool = True, use_wiki: bool = True,
     corpus_at_t = filter_by_time(docs, t_claim_iso)
     top_at_t = retrieve_hybrid(claim_text, corpus_at_t, k=k)
     top_now = retrieve_hybrid(claim_text, docs, k=k)
-    res_t = enforce_json_schema(
-    res_n = enforce_json_schema(
     return {"claim": claim_text, "t_claim": t_claim_iso, "label_at_t": res_t["label"], "label_now": res_n["label"],
             "used_evidence_ids_at_t": res_t["used_evidence_ids"], "used_evidence_ids_now": res_n["used_evidence_ids"],
             "confidence": float((res_t["confidence"] + res_n["confidence"]) / 2.0),
@@ -314,20 +332,28 @@
             "evidence_top_now": top_now}

 def run_on_claims(claims: List[str], use_web: bool, use_wiki: bool, allowlist: List[str], k: int = 8) -> List[Dict]:
-
-
-
-
 def extract_audio_ffmpeg(video_path: str, out_wav: str, sr: int = 16000) -> str:
     cmd = ["ffmpeg","-y","-i",video_path,"-vn","-acodec","pcm_s16le","-ar",str(sr),"-ac","1",out_wav]
-    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)

 def run_whisper_asr(audio_path: str, model_size: str = "base", language: Optional[str] = None) -> str:
-
     result = model.transcribe(audio_path, language=language) if language else model.transcribe(audio_path)
     return result.get("text","").strip()

 def download_video(url: str, out_dir: str = "videos") -> str:
     os.makedirs(out_dir, exist_ok=True)
     out_tpl = os.path.join(out_dir, "%(title)s.%(ext)s")
     subprocess.run(["yt-dlp","-o",out_tpl,url], check=True)
@@ -341,6 +367,7 @@ def sample_frames_ffmpeg(video_path: str, out_dir: str = "frames", fps: float =
     return sorted(glob.glob(os.path.join(out_dir, "frame_*.jpg")))

 def preprocess_for_ocr(img_path: str):
     img = cv2.imread(img_path, cv2.IMREAD_COLOR)
     if img is None: return None
     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
@@ -351,6 +378,8 @@ def preprocess_for_ocr(img_path: str):
     return th

 def run_easyocr_on_frames(frames: List[str], languages: List[str] = ["en"], gpu: Optional[bool] = None, max_images: Optional[int] = None) -> List[str]:
     if gpu is None:
         gpu = True if (os.environ.get("SPACE_ID") or shutil.which("nvidia-smi")) else False
     reader = easyocr.Reader(languages, gpu=gpu)
@@ -402,7 +431,7 @@ def process_video(video_file: Optional[str] = None, video_url: Optional[str] = N
     return {"workdir": workdir, "video_path": vp, "asr_text": asr_text, "ocr_lines": ocr_lines,
             "aggregated_text": agg, "suggested_claims": suggestions}

-#
 CLAIM_MIN_LEN = 12
 VERB_TRIGGERS = r"\b(" + "|".join([
     "is","are","was","were","has","have","had","will","can","does","did",
@@ -423,7 +452,7 @@ def suggest_claims(text: str, top_k: int = 10) -> List[str]:
     candidates = [s for _, s in sorted(scored, key=lambda x: -x[0])[:top_k]]
     return candidates[:top_k]

-#
 THEME_CSS = """
 <style>
 body, .gradio-container {
@@ -460,10 +489,7 @@ def ui_run_factcheck(claims_text: str, use_web: bool, use_wiki: bool, allowlist_
     allow = [d.strip() for d in (allowlist_str or ", ".join(DEFAULT_ALLOWLIST)).split(",") if d.strip()]
     res = run_on_claims(claims, use_web=use_web, use_wiki=use_wiki, allowlist=allow, k=8)

-
-    used = lambda v: "|".join(v.get("used_evidence_ids_now", []))
-    rows = []
-    cards = []
     for v in res:
         lines = ["─"*74, f"CLAIM: {v['claim']}", f"t_claim: {v['t_claim']}",
                  f"verdict@T: {v['label_at_t']} | verdict@Now: {v['label_now']} | confidence: {v['confidence']:.2f}",
@@ -480,7 +506,8 @@
             lines.append(f" {snippet}")
         cards.append("\n".join(lines))
         rows.append({"claim": v["claim"], "verdict_at_t": v["label_at_t"], "verdict_now": v["label_now"],
-                     "confidence": round(float(v["confidence"]), 3),

     df = pd.DataFrame(rows)
     return "\n\n".join(cards), df
@@ -488,9 +515,12 @@
 def ui_ingest_and_suggest(video_file, video_url, whisper_model, asr_language, ocr_langs, fps, max_ocr_images):
     try: vp = video_file.name if video_file else None
     except Exception: vp = None
-
-
-
     asr_preview = (out["asr_text"][:1200] + "...") if len(out["asr_text"]) > 1200 else out["asr_text"]
     ocr_preview = "\n".join(out["ocr_lines"][:50])
     agg_preview = (out["aggregated_text"][:2000] + "...") if len(out["aggregated_text"]) > 2000 else out["aggregated_text"]
@@ -505,8 +535,8 @@ with gr.Blocks(css=THEME_CSS, fill_height=True) as demo:
         with gr.Column(scale=1):
             claims_box = gr.Textbox(label="Claims (one per line)", lines=8, placeholder="e.g. NASA predicts three days of darkness", elem_classes=["glass"])
             with gr.Row():
-                use_web = gr.Checkbox(value=
-                use_wiki = gr.Checkbox(value=
             allowlist_box = gr.Textbox(label="Domain allowlist (comma-separated)", value=", ".join(DEFAULT_ALLOWLIST), lines=2)
             run_btn = gr.Button("Run Fact-Check", elem_classes=["neon-btn"])
         with gr.Column(scale=1):

+# app.py — AEGIS FactCheck (SLIM, guarded imports, no NLTK)
 from __future__ import annotations
+import os, re, json, time, glob, uuid, shutil, subprocess, urllib.parse
 from typing import List, Dict, Optional
 from datetime import datetime, timezone

 import numpy as np
 import pandas as pd
 import requests
 import gradio as gr

+# ---------- lightweight helpers ----------
+def now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat()

+def normalize_ws(s: str) -> str:
+    return re.sub(r"\s+", " ", s or "").strip()

+def sent_tokenize_fallback(txt: str) -> List[str]:
+    # NLTK-free sentence splitter
+    return [s.strip() for s in re.split(r'(?<=[.!?])\s+|\n+', txt or '') if s.strip()]
+
+def domain_from_url(url: str) -> str:
+    try: return urllib.parse.urlparse(url).netloc.lower()
+    except Exception: return ""

 USER_AGENT = "DisinfoFactcheck/1.0 (contact: [email protected])"
 HEADERS = {"User-Agent": USER_AGENT}
     "nature.com","sciencemag.org","thelancet.com","nejm.org",
 ]

+FORCE_BASELINE = True  # leave True; OpenAI LLM path is optional and guarded

+# ---------- optional imports (guarded) ----------
+def _try_import(name: str):
+    try:
+        return __import__(name)
+    except Exception:
+        return None
+
+duckduckgo_search = _try_import("duckduckgo_search")
+trafilatura = _try_import("trafilatura")
+rank_bm25 = _try_import("rank_bm25")
+sentence_transformers = _try_import("sentence_transformers")
+transformers = _try_import("transformers")
+torch = _try_import("torch")
+
+# Heavy CV/ASR guarded
+cv2 = _try_import("cv2")
+easyocr = _try_import("easyocr")
+whisper = _try_import("whisper")
+
+# OpenAI is optional
+_openai = _try_import("openai")
+_has_openai_key = bool(os.environ.get("OPENAI_API_KEY"))
+
+def has_llm() -> bool:
+    return (not FORCE_BASELINE) and _openai is not None and _has_openai_key

+# ---------- text splitting ----------
+def split_into_chunks(text: str, max_chars: int = 700) -> List[str]:
+    sents = [normalize_ws(s) for s in sent_tokenize_fallback(text or "")]
     chunks, cur = [], ""
     for s in sents:
         if len(cur) + 1 + len(s) > max_chars and cur:

     if cur: chunks.append(cur.strip())
     return [c for c in chunks if len(c) > 40]

+# ---------- Wikipedia ----------
 WIKI_API = "https://en.wikipedia.org/w/api.php"

 def wiki_search(query: str, n: int = 6) -> List[Dict]:

     return {"pageid": page.get("pageid"), "title": page.get("title"), "url": page.get("fullurl"),
             "last_modified": (page.get("revisions") or [{}])[0].get("timestamp"), "text": page.get("extract") or ""}

+REPORTING_PREFIXES = re.compile(r'^(from a video:|another line says:|it also claims:|the video says:|the speaker claims:|someone said:)', re.I)
+STOP = {"the","a","an","from","it","also","claims","claim","says","said","line","video","across","cities","that","this","these","those","is","are","was","were","has","have","had","will","can","does","did"}
+
 def sanitize_claim_for_search(s: str) -> str:
     s = REPORTING_PREFIXES.sub('', (s or "").strip()).strip('"\'' )
     s = re.sub(r"[^A-Za-z0-9\s-]", " ", s)

     return " ".join(toks[:limit]) or s

 def heuristic_rewrites(s: str) -> List[str]:
+    rewrites = [s, s + " misinformation"]
     rewrites.append(re.sub(r"5g[^\w]+.*covid[- ]?19", "5G COVID-19 conspiracy", s, flags=re.I))
     rewrites.append(re.sub(r"owns?\s+the\s+world\s+health\s+organization", "Bill Gates WHO relationship", s, flags=re.I))
     rewrites.append(re.sub(r"nasa[^\w]+.*darkness", "NASA hoax darkness", s, flags=re.I))
     return list(dict.fromkeys([sanitize_claim_for_search(x) for x in rewrites]))

 def build_wiki_corpus(claim: str, max_pages: int = 6, chunk_chars: int = 600) -> List[Dict]:

         if len(corpus) >= max_pages * 2: break
     return list({d["id"]: d for d in corpus}.values())

+# ---------- Web retrieval ----------
 def ddg_search(query: str, max_results: int = 10, allowlist: Optional[List[str]] = None) -> List[Dict]:
+    if duckduckgo_search is None:
+        return []
+    DDGS = duckduckgo_search.DDGS
     allowlist = allowlist or DEFAULT_ALLOWLIST
     out = []
     with DDGS() as ddgs:

     return out

 def fetch_clean_text(url: str) -> str:
+    if trafilatura is None:  # degrade
+        try:
+            # last-chance plain GET (messy but better than nothing)
+            r = requests.get(url, headers=HEADERS, timeout=12); r.raise_for_status()
+            txt = re.sub(r"<[^>]+>", " ", r.text)
+            return normalize_ws(txt)[:8000]
+        except Exception:
+            return ""
     try:
         downloaded = trafilatura.fetch_url(url)
         if not downloaded: return ""

         for j, ch in enumerate(split_into_chunks(text, max_chars=chunk_chars)):
             corpus.append({"id": f"web-{hash(url)}-{j}", "source":"web", "title": h["title"] or domain_from_url(url),
                            "url": url, "published": now_iso(), "text": ch})
+        time.sleep(0.6)  # polite
         if len(corpus) >= per_query_results * 4: break
     return list({d["id"]: d for d in corpus}.values())

+# ---------- retrieval scoring (BM25 + optional dense) ----------
 def tokenize_simple(text: str) -> List[str]:
     text = re.sub(r"[^a-z0-9\s]", " ", (text or "").lower())
     return [w for w in text.split() if w and w not in {"the","a","an","and","or","of","to","in","for","on","with"}]

             scores[doc_id] = scores.get(doc_id, 0.0) + 1.0/(k + r)
     return [doc for doc,_ in sorted(scores.items(), key=lambda x: -x[1])]

+# try to load BM25
+BM25Okapi = getattr(rank_bm25, "BM25Okapi", None) if rank_bm25 else None
+
+# try to prepare sentence-transformers
+_emb_model = None
+if sentence_transformers:
+    try:
+        _emb_model = sentence_transformers.SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")
+        from sentence_transformers import util as st_util
+    except Exception:
+        _emb_model = None
+        st_util = None
+else:
+    st_util = None

 def retrieve_hybrid(claim: str, docs: List[Dict], k: int = 8) -> List[Dict]:
     if not docs: return []
+    # BM25 ordering if rank_bm25 is available; otherwise fall back to simple keyword overlap
+    bm25_order = []
+    if BM25Okapi:
+        corpus_tokens = [tokenize_simple(d["text"]) for d in docs]
+        bm25 = BM25Okapi(corpus_tokens)
+        bm25_scores = bm25.get_scores(tokenize_simple(claim))
+        bm25_order = [docs[i]["id"] for i in list(np.argsort(-np.array(bm25_scores)))]
+    else:
+        # poor man's BM25: sort by token-overlap count
+        q_toks = set(tokenize_simple(claim))
+        overlaps = [(i, len(q_toks.intersection(set(tokenize_simple(d["text"]))))) for i, d in enumerate(docs)]
+        bm25_order = [docs[i]["id"] for i,_ in sorted(overlaps, key=lambda x: -x[1])]
+
+    # Dense ordering (optional)
+    dense_order = []
+    if _emb_model and st_util:
+        try:
+            q_emb = _emb_model.encode([claim], convert_to_tensor=True, show_progress_bar=False)
+            d_emb = _emb_model.encode([d["text"] for d in docs], convert_to_tensor=True, show_progress_bar=False)
+            sims = st_util.cos_sim(q_emb, d_emb).cpu().numpy().ravel()
+            dense_order = [docs[i]["id"] for i in list(np.argsort(-sims))]
+        except Exception:
+            dense_order = bm25_order
+
+    ordering = rrf_merge([bm25_order, dense_order or bm25_order], k=60)
     top_ids = set(ordering[:max(k, 14)])
     id2doc = {d["id"]: d for d in docs}
     ranked_docs = [id2doc[i] for i in ordering if i in top_ids]
     return [{**doc, "score": float(1/(60+i))} for i, doc in enumerate(ranked_docs[:k])]

+# ---------- verifier (transformers optional; heuristic fallback) ----------
+_nli_pipeline = None
+if transformers:
+    try:
+        AutoModelForSequenceClassification = transformers.AutoModelForSequenceClassification
+        AutoTokenizer = transformers.AutoTokenizer
+        _nli_tok = AutoTokenizer.from_pretrained("roberta-large-mnli")
+        _nli_model = AutoModelForSequenceClassification.from_pretrained("roberta-large-mnli")
+        _nli_pipeline = transformers.pipeline("text-classification", model=_nli_model, tokenizer=_nli_tok,
+                                              return_all_scores=True, truncation=True, device=-1)
+    except Exception:
+        _nli_pipeline = None

+def verify_with_nli(claim: str, evidence: List[Dict]) -> Dict:
+    # If NLI pipeline available
+    if _nli_pipeline:
+        best_ent_id, best_ent_p = None, 0.0
+        best_con_id, best_con_p = None, 0.0
+        for e in evidence or []:
+            prem = (e.get("text") or "").strip()
+            if not prem: continue
+            outputs = _nli_pipeline([{"text": prem, "text_pair": claim}])
+            probs = {d["label"].upper(): float(d["score"]) for d in outputs[0]}
+            ent, con = probs.get("ENTAILMENT", 0.0), probs.get("CONTRADICTION", 0.0)
+            if ent > best_ent_p: best_ent_id, best_ent_p = e.get("id"), ent
+            if con > best_con_p: best_con_id, best_con_p = e.get("id"), con
+        label, used = "NEI", []
+        conf = max(0.34, float(best_ent_p*0.5 + (1-best_con_p)*0.25))
+        rationale = "Insufficient or inconclusive evidence."
+        if best_ent_p >= 0.60 and (best_ent_p - best_con_p) >= 0.10:
+            label, used, conf, rationale = "SUPPORT", [best_ent_id] if best_ent_id else [], best_ent_p, "Top evidence entails the claim."
+        elif best_con_p >= 0.60 and (best_con_p - best_ent_p) >= 0.10:
+            label, used, conf, rationale = "REFUTE", [best_con_id] if best_con_id else [], best_con_p, "Top evidence contradicts the claim."
+        return {"label": label, "used_evidence_ids": used, "confidence": float(conf), "rationale": rationale}
+
+    # Heuristic fallback (no transformers)
+    text = " ".join((e.get("text") or "")[:400].lower() for e in evidence[:6])
+    k = sanitize_claim_for_search(claim).lower()
+    if any(x in text for x in ["false", "hoax", "debunked", "misinformation", "no evidence", "not true"]) and any(y in text for y in k.split()[:4]):
+        return {"label": "REFUTE", "used_evidence_ids": [evidence[0]["id"]] if evidence else [], "confidence": 0.6, "rationale": "Heuristic: evidence indicates refutation keywords."}
+    if any(x in text for x in ["confirmed", "approved", "verified", "evidence shows", "found that"]) and any(y in text for y in k.split()[:4]):
+        return {"label": "SUPPORT", "used_evidence_ids": [evidence[0]["id"]] if evidence else [], "confidence": 0.55, "rationale": "Heuristic: evidence indicates support keywords."}
+    return {"label": "NEI", "used_evidence_ids": [], "confidence": 0.4, "rationale": "Insufficient signal without NLI."}

+def enforce_json_schema(x: Dict) -> Dict:
+    return {"label": str(x.get("label","NEI")).upper(),
+            "used_evidence_ids": [str(i) for i in x.get("used_evidence_ids", []) if i],
+            "confidence": float(x.get("confidence", 0.5)),
+            "rationale": str(x.get("rationale","")).strip()[:300]}
+
+def filter_by_time(docs: List[Dict], t_max_iso: str) -> List[Dict]:
+    try:
+        tmax = datetime.fromisoformat(t_max_iso.replace("Z","+00:00"))
+    except Exception:
+        tmax = datetime.now(timezone.utc)
+    kept = []
+    for d in docs:
+        try:
+            dt = datetime.fromisoformat(d["published"].replace("Z","+00:00"))
+            if dt <= tmax: kept.append(d)
+        except Exception:
+            kept.append(d)
+    return kept

 def format_evidence_block(evs: List[Dict]) -> str:
     lines = []

         lines.append(f"[{e['id']}] ({e.get('published','')}) {title} — {e.get('url','')}\n{snippet}")
     return "\n\n".join(lines)

 def verify_claim(claim_text: str, use_web: bool = True, use_wiki: bool = True,
                  allowlist: Optional[List[str]] = None, t_claim_iso: Optional[str] = None, k: int = 8) -> Dict:
     t_claim_iso = t_claim_iso or now_iso()

     corpus_at_t = filter_by_time(docs, t_claim_iso)
     top_at_t = retrieve_hybrid(claim_text, corpus_at_t, k=k)
     top_now = retrieve_hybrid(claim_text, docs, k=k)
+    res_t = enforce_json_schema(verify_with_nli(claim_text, top_at_t))
+    res_n = enforce_json_schema(verify_with_nli(claim_text, top_now))
     return {"claim": claim_text, "t_claim": t_claim_iso, "label_at_t": res_t["label"], "label_now": res_n["label"],
             "used_evidence_ids_at_t": res_t["used_evidence_ids"], "used_evidence_ids_now": res_n["used_evidence_ids"],
             "confidence": float((res_t["confidence"] + res_n["confidence"]) / 2.0),

             "evidence_top_now": top_now}

 def run_on_claims(claims: List[str], use_web: bool, use_wiki: bool, allowlist: List[str], k: int = 8) -> List[Dict]:
+    outs = []
+    for c in claims:
+        c = (c or "").strip()
+        if not c: continue
+        outs.append(verify_claim(c, use_web=use_web, use_wiki=use_wiki, allowlist=allowlist, t_claim_iso=now_iso(), k=k))
+    return outs
+
+# ---------- ASR + OCR (guarded) ----------
 def extract_audio_ffmpeg(video_path: str, out_wav: str, sr: int = 16000) -> str:
     cmd = ["ffmpeg","-y","-i",video_path,"-vn","-acodec","pcm_s16le","-ar",str(sr),"-ac","1",out_wav]
+    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
+    return out_wav

 def run_whisper_asr(audio_path: str, model_size: str = "base", language: Optional[str] = None) -> str:
+    if whisper is None:
+        raise RuntimeError("Whisper not available. Ensure openai-whisper is installed and FFmpeg present.")
+    model = whisper.load_model(model_size)
     result = model.transcribe(audio_path, language=language) if language else model.transcribe(audio_path)
     return result.get("text","").strip()

 def download_video(url: str, out_dir: str = "videos") -> str:
+    # yt-dlp is installed via requirements; call binary
     os.makedirs(out_dir, exist_ok=True)
     out_tpl = os.path.join(out_dir, "%(title)s.%(ext)s")
     subprocess.run(["yt-dlp","-o",out_tpl,url], check=True)

     return sorted(glob.glob(os.path.join(out_dir, "frame_*.jpg")))

 def preprocess_for_ocr(img_path: str):
+    if cv2 is None: return None
     img = cv2.imread(img_path, cv2.IMREAD_COLOR)
     if img is None: return None
     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

     return th

 def run_easyocr_on_frames(frames: List[str], languages: List[str] = ["en"], gpu: Optional[bool] = None, max_images: Optional[int] = None) -> List[str]:
+    if easyocr is None:
+        raise RuntimeError("EasyOCR not available. Ensure easyocr + opencv-python-headless are installed.")
     if gpu is None:
         gpu = True if (os.environ.get("SPACE_ID") or shutil.which("nvidia-smi")) else False
     reader = easyocr.Reader(languages, gpu=gpu)

     return {"workdir": workdir, "video_path": vp, "asr_text": asr_text, "ocr_lines": ocr_lines,
             "aggregated_text": agg, "suggested_claims": suggestions}

+# ---------- claim suggestions ----------
 CLAIM_MIN_LEN = 12
 VERB_TRIGGERS = r"\b(" + "|".join([
     "is","are","was","were","has","have","had","will","can","does","did",

     candidates = [s for _, s in sorted(scored, key=lambda x: -x[0])[:top_k]]
     return candidates[:top_k]

+# ---------- Gradio theme & UI ----------
 THEME_CSS = """
 <style>
 body, .gradio-container {

     allow = [d.strip() for d in (allowlist_str or ", ".join(DEFAULT_ALLOWLIST)).split(",") if d.strip()]
     res = run_on_claims(claims, use_web=use_web, use_wiki=use_wiki, allowlist=allow, k=8)

+    rows, cards = [], []
     for v in res:
         lines = ["─"*74, f"CLAIM: {v['claim']}", f"t_claim: {v['t_claim']}",
                  f"verdict@T: {v['label_at_t']} | verdict@Now: {v['label_now']} | confidence: {v['confidence']:.2f}",

             lines.append(f" {snippet}")
         cards.append("\n".join(lines))
         rows.append({"claim": v["claim"], "verdict_at_t": v["label_at_t"], "verdict_now": v["label_now"],
+                     "confidence": round(float(v["confidence"]), 3),
+                     "used_ids": "|".join(v.get("used_evidence_ids_now", []))})

     df = pd.DataFrame(rows)
     return "\n\n".join(cards), df

 def ui_ingest_and_suggest(video_file, video_url, whisper_model, asr_language, ocr_langs, fps, max_ocr_images):
     try: vp = video_file.name if video_file else None
     except Exception: vp = None
+    try:
+        out = process_video(video_file=vp, video_url=video_url,
+                            whisper_model=whisper_model, asr_language=asr_language or None,
+                            ocr_langs=ocr_langs, fps=float(fps), max_ocr_images=int(max_ocr_images))
+    except Exception as e:
+        return f"Error during ingest: {e}", "", "", "", ""
     asr_preview = (out["asr_text"][:1200] + "...") if len(out["asr_text"]) > 1200 else out["asr_text"]
     ocr_preview = "\n".join(out["ocr_lines"][:50])
     agg_preview = (out["aggregated_text"][:2000] + "...") if len(out["aggregated_text"]) > 2000 else out["aggregated_text"]

         with gr.Column(scale=1):
             claims_box = gr.Textbox(label="Claims (one per line)", lines=8, placeholder="e.g. NASA predicts three days of darkness", elem_classes=["glass"])
             with gr.Row():
+                use_web = gr.Checkbox(value=True, label="Use Web retrieval")
+                use_wiki = gr.Checkbox(value=True, label="Use Wikipedia")
             allowlist_box = gr.Textbox(label="Domain allowlist (comma-separated)", value=", ".join(DEFAULT_ALLOWLIST), lines=2)
             run_btn = gr.Button("Run Fact-Check", elem_classes=["neon-btn"])
         with gr.Column(scale=1):