|
import os |
|
import re |
|
import time |
|
import asyncio |
|
from concurrent.futures import ThreadPoolExecutor |
|
from typing import List, Optional, Dict, Any |
|
from urllib.parse import urlparse |
|
from fastapi import FastAPI, HTTPException, Query, Request, BackgroundTasks |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
from pydantic import BaseModel |
|
from selenium import webdriver |
|
from selenium.webdriver.common.by import By |
|
from selenium.webdriver.support.ui import WebDriverWait |
|
from selenium.webdriver.support import expected_conditions as EC |
|
from selenium.webdriver.chrome.options import Options |
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException |
|
import uvicorn |
|
|
|
# Application instance; the metadata below feeds the auto-generated
# OpenAPI docs served at /docs and /redoc.
app = FastAPI(
    title="Threads Media Extractor API",
    description="Extract media URLs from Threads posts - Optimized version",
    version="2.1.0"
)
|
|
|
|
|
# Fully permissive CORS: any origin, method, and header is allowed.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# very open — confirm this is intended for a public, unauthenticated API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# Pool of reusable Chrome WebDriver instances (capped at 2 in return_driver).
# Accessed from executor worker threads; list append/pop are single bytecode
# ops under CPython's GIL, so no explicit lock is used here.
driver_pool = []
# Dedicated thread pool for the blocking Selenium work (2 concurrent scrapes).
executor = ThreadPoolExecutor(max_workers=2)
|
|
|
class MediaItem(BaseModel):
    """One downloadable media entry, as used in ThreadsResponse.picker."""
    url: str  # direct http(s) URL of the media asset
|
|
|
class ThreadsResponse(BaseModel):
    """Successful extraction result returned by /extract.

    Exactly one of ``url`` (single media item) or ``picker`` (multiple items)
    is populated; ``None`` fields are dropped at serialization time because
    the endpoint calls ``model_dump(exclude_none=True)`` explicitly.
    """

    post_url: str                              # the original Threads post URL
    url: Optional[str] = None                  # set when exactly one media item was found
    picker: Optional[List[MediaItem]] = None   # set when multiple items were found
    media_count: int                           # number of unique media URLs
    post_text: Optional[str] = None            # best-effort caption text
    author: Optional[str] = None               # never populated by the current extractor
    success: bool
    processing_time: Optional[float] = None    # scrape duration in seconds

    # The previous inner `class Config: exclude_none = True` was removed:
    # `exclude_none` is not a valid Pydantic config key (it is a dump-time
    # option), so it had no effect — the behavior comes from the explicit
    # model_dump(exclude_none=True) call in the endpoint.
|
|
|
class ErrorResponse(BaseModel):
    """Error payload shape.

    NOTE(review): not referenced by the visible code — the HTTPException
    handler builds its dict inline. Kept for documentation/OpenAPI use.
    """
    error: str           # human-readable error detail
    success: bool = False
|
|
|
def create_optimized_driver():
    """Start a headless Chrome WebDriver tuned for fast, low-footprint scraping.

    Applies resource-saving flags, hides common automation fingerprints
    (banner switch, blink flag, UA override) and sets short implicit /
    page-load timeouts so a stuck page fails fast.

    Raises:
        HTTPException: 500 if the Chrome driver cannot be started.
    """
    user_agent = ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')

    # Flags grouped roughly by intent: headless/sandbox basics, feature
    # trimming, memory tuning, network trimming, automation masking.
    chrome_flags = [
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-extensions',
        '--disable-plugins',
        '--disable-default-apps',
        '--disable-background-timer-throttling',
        '--disable-backgrounding-occluded-windows',
        '--disable-renderer-backgrounding',
        '--disable-features=TranslateUI',
        '--disable-ipc-flooding-protection',
        '--memory-pressure-off',
        '--max_old_space_size=4096',
        '--window-size=1280,720',
        '--aggressive-cache-discard',
        '--disable-background-networking',
        '--disable-blink-features=AutomationControlled',
        f'--user-agent={user_agent}',
    ]

    options = Options()
    for flag in chrome_flags:
        options.add_argument(flag)
    # Drop the "controlled by automated software" banner and the automation
    # helper extension.
    options.add_experimental_option('useAutomationExtension', False)
    options.add_experimental_option('excludeSwitches', ['enable-automation'])

    try:
        driver = webdriver.Chrome(options=options)
        driver.implicitly_wait(5)
        driver.set_page_load_timeout(15)

        # Also override the UA at the CDP level, matching the flag above.
        driver.execute_cdp_cmd('Network.setUserAgentOverride', {
            "userAgent": user_agent
        })

        return driver
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create browser driver: {str(e)}")
|
|
|
def get_driver():
    """Hand out a pooled WebDriver, creating a fresh one when the pool is empty."""
    return driver_pool.pop() if driver_pool else create_optimized_driver()
|
|
|
def return_driver(driver):
    """Put *driver* back in the pool (capacity 2) or quit it if the pool is full.

    Quitting is best-effort: a driver that already died should not take the
    caller down with it.
    """
    if len(driver_pool) < 2:
        driver_pool.append(driver)
        return
    try:
        driver.quit()
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; narrowed to Exception.
        pass
|
|
|
def extract_post_id_from_url(url: str) -> Optional[str]:
    """Pull the post ID out of a Threads permalink, or None if no form matches.

    Recognizes both the long ``/@user/post/<id>`` and short ``/t/<id>`` path
    forms on the threads.net and threads.com domains.
    """
    id_patterns = (
        r'threads\.net/@[^/]+/post/([A-Za-z0-9_-]+)',
        r'threads\.net/t/([A-Za-z0-9_-]+)',
        r'threads\.com/@[^/]+/post/([A-Za-z0-9_-]+)',
        r'threads\.com/t/([A-Za-z0-9_-]+)',
    )
    hits = (re.search(pattern, url) for pattern in id_patterns)
    return next((hit.group(1) for hit in hits if hit), None)
|
|
|
def is_valid_threads_url(url: str) -> bool:
    """Return True when *url* points at a Threads post on threads.net/.com.

    Accepts both the ``/@user/post/<id>`` and the short ``/t/<id>`` path
    forms; any unparseable input yields False rather than raising.
    """
    allowed_hosts = {'threads.net', 'www.threads.net', 'threads.com', 'www.threads.com'}
    try:
        parsed = urlparse(url)
    except (ValueError, AttributeError, TypeError):
        # urlparse raises ValueError on malformed URLs (e.g. bad ports) and
        # Attribute/TypeError on non-string input. Was a bare `except:`,
        # which also swallowed KeyboardInterrupt/SystemExit.
        return False
    return parsed.netloc in allowed_hosts and ('/post/' in parsed.path or '/t/' in parsed.path)
|
|
|
def fast_extract_media(driver: webdriver.Chrome, url: str) -> Dict[str, Any]:
    """Scrape media URLs (and best-effort caption text) from a Threads post.

    Args:
        driver: an already-configured Chrome WebDriver.
        url: the Threads post URL to load.

    Returns:
        Dict with keys ``media_urls`` (deduplicated, first-seen order),
        ``post_text`` (first plausible caption span, or None), ``author``
        (always None — not extracted yet), ``processing_time`` (seconds).

    Raises:
        HTTPException: 500 wrapping any scraping failure.
    """
    media_urls: List[str] = []
    post_text: Optional[str] = None
    author: Optional[str] = None  # never populated here; kept for response shape

    try:
        start_time = time.time()

        driver.get(url)

        # Wait (max 8s) for document.readyState to settle; a timeout is not
        # fatal — we scrape whatever has rendered so far.
        try:
            WebDriverWait(driver, 8).until(
                lambda d: d.execute_script("return document.readyState") == "complete"
            )
        except TimeoutException:
            pass

        # Give client-side rendering a moment to attach media elements.
        time.sleep(1.5)

        # Videos first: direct src attributes plus nested <source> children.
        for video in driver.find_elements(By.TAG_NAME, 'video'):
            src = video.get_attribute('src')
            if src and src.startswith('http'):
                media_urls.append(src)
            for source in video.find_elements(By.TAG_NAME, 'source'):
                src = source.get_attribute('src')
                if src and src.startswith('http'):
                    media_urls.append(src)

        # Fall back to images only when no video was found, skipping obvious
        # non-content images (avatars, icons, logos). Only the first 10 <img>
        # tags are inspected to keep the scrape fast.
        if not media_urls:
            image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
            excluded_words = ('profile', 'avatar', 'icon', 'logo')
            for img in driver.find_elements(By.TAG_NAME, 'img')[:10]:
                src = img.get_attribute('src')
                if not (src and src.startswith('http')):
                    continue
                lowered = src.lower()
                if (any(ext in lowered for ext in image_exts)
                        and not any(word in lowered for word in excluded_words)):
                    media_urls.append(src)

        # Best-effort caption: first non-trivial text span inside the article.
        try:
            for element in driver.find_elements(
                    By.CSS_SELECTOR, 'div[role="article"] span, article span')[:5]:
                text = element.text.strip()
                if text and len(text) > 10:
                    post_text = text
                    break
        except Exception:
            # Caption extraction is optional; was a bare `except:`.
            pass

        # Deduplicate while preserving first-seen order. (The previous manual
        # seen-set loop also shadowed the `url` parameter with its loop
        # variable; dict.fromkeys avoids both problems.)
        unique_media_urls = list(dict.fromkeys(media_urls))

        processing_time = time.time() - start_time

        return {
            "media_urls": unique_media_urls,
            "post_text": post_text,
            "author": author,
            "processing_time": processing_time,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting media: {str(e)}")
|
|
|
def extract_media_sync(url: str) -> Dict[str, Any]:
    """Run the full extraction on a pooled driver (called from the thread pool).

    On success the driver goes back to the pool for reuse. On failure it is
    quit and discarded instead: the previous `finally: return_driver(driver)`
    recycled a driver that had just crashed or timed out, poisoning the pool
    for subsequent requests.

    Raises:
        HTTPException: propagated from driver creation or extraction.
    """
    driver = get_driver()
    try:
        result = fast_extract_media(driver, url)
    except Exception:
        try:
            driver.quit()
        except Exception:
            pass  # best-effort cleanup of an already-broken driver
        raise
    return_driver(driver)
    return result
|
|
|
|
|
|
|
@app.get("/") |
|
async def health_check(): |
|
"""Health check endpoint""" |
|
return { |
|
"status": "healthy", |
|
"service": "extractor", |
|
"version": "2.1.0", |
|
"driver_pool_size": len(driver_pool) |
|
} |
|
|
|
|
|
|
|
@app.get("/extract") |
|
async def extract_media(url: str = Query(..., description="Threads post URL")): |
|
""" |
|
Extract media URLs from a Threads post - Optimized version |
|
|
|
Args: |
|
url: The Threads post URL to extract media from |
|
|
|
Returns: |
|
ThreadsResponse with media URLs and metadata |
|
""" |
|
|
|
|
|
if not url: |
|
raise HTTPException(status_code=400, detail="URL parameter is required") |
|
|
|
if not is_valid_threads_url(url): |
|
raise HTTPException(status_code=400, detail="Invalid Threads URL format") |
|
|
|
|
|
post_id = extract_post_id_from_url(url) |
|
if not post_id: |
|
raise HTTPException(status_code=400, detail="Could not extract post ID from URL") |
|
|
|
try: |
|
|
|
loop = asyncio.get_event_loop() |
|
extracted_data = await loop.run_in_executor(executor, extract_media_sync, url) |
|
|
|
media_urls = extracted_data["media_urls"] |
|
media_count = len(media_urls) |
|
|
|
|
|
response_data = { |
|
"post_url": url, |
|
"media_count": media_count, |
|
"post_text": extracted_data["post_text"], |
|
"author": extracted_data["author"], |
|
"success": True, |
|
"processing_time": extracted_data.get("processing_time") |
|
} |
|
|
|
|
|
if media_count == 1: |
|
response_data["url"] = media_urls[0] |
|
|
|
elif media_count > 1: |
|
response_data["picker"] = [{"url": url} for url in media_urls] |
|
|
|
|
|
|
|
|
|
response = ThreadsResponse(**response_data) |
|
return JSONResponse(content=response.model_dump(exclude_none=True)) |
|
|
|
except HTTPException: |
|
raise |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") |
|
|
|
|
|
@app.on_event("shutdown") |
|
async def shutdown_event(): |
|
"""Clean up resources on shutdown""" |
|
executor.shutdown(wait=False) |
|
for driver in driver_pool: |
|
try: |
|
driver.quit() |
|
except: |
|
pass |
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render every HTTPException as the service's uniform error payload."""
    payload = {
        "error": exc.detail,
        "success": False,
        "status_code": exc.status_code,
    }
    return JSONResponse(status_code=exc.status_code, content=payload)
|
|
|
if __name__ == "__main__": |
|
port = int(os.environ.get("PORT", 7860)) |
|
uvicorn.run(app, host="0.0.0.0", port=port) |