import os
import re
import time
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse
from fastapi import FastAPI, HTTPException, Query, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
import uvicorn
app = FastAPI(
title="Threads Media Extractor API",
description="Extract media URLs from Threads posts - Optimized version",
version="2.1.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global driver pool for reuse; guarded by a lock because the pool is
# accessed from executor worker threads
driver_pool = []
pool_lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=2)
class MediaItem(BaseModel):
url: str
class ThreadsResponse(BaseModel):
post_url: str
url: Optional[str] = None
picker: Optional[List[MediaItem]] = None
media_count: int
post_text: Optional[str] = None
author: Optional[str] = None
success: bool
processing_time: Optional[float] = None
    # None-valued fields are dropped at serialization time via
    # model_dump(exclude_none=True) in the /extract endpoint;
    # "exclude_none" is not a recognized Pydantic Config option.
class ErrorResponse(BaseModel):
error: str
success: bool = False
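# Illustrative response shapes (hypothetical media URLs, for documentation only):
#   single item:    {"post_url": "...", "url": "https://.../video.mp4", "media_count": 1, "success": true, ...}
#   multiple items: {"post_url": "...", "picker": [{"url": "https://.../img1.jpg"}, ...], "media_count": 2, ...}
#   no media:       neither "url" nor "picker" appears, since None fields are excluded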
def create_optimized_driver():
"""Create and configure optimized Chrome WebDriver"""
options = Options()
options.add_argument('--headless=new') # Use new headless mode
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--disable-extensions')
options.add_argument('--disable-plugins')
options.add_argument('--disable-default-apps')
options.add_argument('--disable-background-timer-throttling')
options.add_argument('--disable-backgrounding-occluded-windows')
options.add_argument('--disable-renderer-backgrounding')
options.add_argument('--disable-features=TranslateUI')
options.add_argument('--disable-ipc-flooding-protection')
# Performance optimizations
options.add_argument('--memory-pressure-off')
    options.add_argument('--js-flags=--max-old-space-size=4096')  # V8 heap flag; must be passed via --js-flags, not as a bare Chrome switch
options.add_argument('--window-size=1280,720') # Smaller window
# Network optimizations
options.add_argument('--aggressive-cache-discard')
options.add_argument('--disable-background-networking')
# Disable unnecessary features
options.add_experimental_option('useAutomationExtension', False)
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_argument('--disable-blink-features=AutomationControlled')
# User agent
options.add_argument('--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
try:
driver = webdriver.Chrome(options=options)
driver.implicitly_wait(5) # Reduced wait time
driver.set_page_load_timeout(15) # Reduced timeout
# Optimize browser settings
driver.execute_cdp_cmd('Network.setUserAgentOverride', {
"userAgent": 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
})
return driver
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create browser driver: {str(e)}")
def get_driver():
    """Get a driver from the pool or create a new one"""
    with pool_lock:
        if driver_pool:
            return driver_pool.pop()
    return create_optimized_driver()
def return_driver(driver):
    """Return a driver to the pool for reuse"""
    with pool_lock:
        if len(driver_pool) < 2:  # Keep at most 2 drivers in the pool
            driver_pool.append(driver)
            return
    try:
        driver.quit()
    except Exception:
        pass
def extract_post_id_from_url(url: str) -> Optional[str]:
"""Extract post ID from Threads URL"""
patterns = [
r'threads\.net/@[^/]+/post/([A-Za-z0-9_-]+)',
r'threads\.net/t/([A-Za-z0-9_-]+)',
r'threads\.com/@[^/]+/post/([A-Za-z0-9_-]+)',
r'threads\.com/t/([A-Za-z0-9_-]+)',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
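# Example (hypothetical post ID, for illustration only):
#   extract_post_id_from_url("https://www.threads.net/@someuser/post/C8abc123XYZ")
#   -> "C8abc123XYZ"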
def is_valid_threads_url(url: str) -> bool:
"""Validate if URL is a valid Threads URL"""
try:
parsed = urlparse(url)
return (
parsed.netloc in ['threads.net', 'www.threads.net', 'threads.com', 'www.threads.com'] and
(('/post/' in parsed.path) or ('/t/' in parsed.path))
)
    except Exception:
        return False
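# Examples (illustrative URLs):
#   is_valid_threads_url("https://www.threads.net/@someuser/post/C8abc123XYZ")  # True
#   is_valid_threads_url("https://example.com/@someuser/post/C8abc123XYZ")      # False (wrong host)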
def fast_extract_media(driver: webdriver.Chrome, url: str) -> Dict[str, Any]:
"""Optimized media extraction with faster loading"""
media_urls = []
post_text = None
author = None
try:
start_time = time.time()
# Navigate to the URL
driver.get(url)
# Wait for essential elements only
try:
WebDriverWait(driver, 8).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
except TimeoutException:
pass # Continue even if timeout
# Quick wait for dynamic content
time.sleep(1.5) # Reduced from 3 seconds
# Extract videos first (most important)
video_elements = driver.find_elements(By.TAG_NAME, 'video')
for video in video_elements:
src = video.get_attribute('src')
if src and src.startswith('http'):
media_urls.append(src)
# Check source elements
sources = video.find_elements(By.TAG_NAME, 'source')
for source in sources:
src = source.get_attribute('src')
if src and src.startswith('http'):
media_urls.append(src)
# If no videos found, look for images quickly
if not media_urls:
img_elements = driver.find_elements(By.TAG_NAME, 'img')[:10] # Limit to first 10 images
for img in img_elements:
src = img.get_attribute('src')
if src and src.startswith('http') and any(ext in src.lower() for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
if not any(exclude in src.lower() for exclude in ['profile', 'avatar', 'icon', 'logo']):
media_urls.append(src)
# Quick text extraction (optional, skip if taking too long)
try:
text_elements = driver.find_elements(By.CSS_SELECTOR, 'div[role="article"] span, article span')[:5]
for element in text_elements:
text = element.text.strip()
if text and len(text) > 10 and not post_text:
post_text = text
break
        except Exception:
            pass
# Remove duplicates
seen = set()
unique_media_urls = []
        for media_url in media_urls:  # avoid shadowing the url parameter
            if media_url not in seen:
                seen.add(media_url)
                unique_media_urls.append(media_url)
processing_time = time.time() - start_time
return {
"media_urls": unique_media_urls,
"post_text": post_text,
"author": author,
"processing_time": processing_time
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error extracting media: {str(e)}")
def extract_media_sync(url: str) -> Dict[str, Any]:
"""Synchronous wrapper for thread execution"""
driver = None
try:
driver = get_driver()
result = fast_extract_media(driver, url)
return result
finally:
if driver:
return_driver(driver)
@app.get("/")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "extractor",
"version": "2.1.0",
"driver_pool_size": len(driver_pool)
}
@app.get("/extract")
async def extract_media(url: str = Query(..., description="Threads post URL")):
"""
Extract media URLs from a Threads post - Optimized version
Args:
url: The Threads post URL to extract media from
Returns:
ThreadsResponse with media URLs and metadata
"""
# Validate URL
if not url:
raise HTTPException(status_code=400, detail="URL parameter is required")
if not is_valid_threads_url(url):
raise HTTPException(status_code=400, detail="Invalid Threads URL format")
# Extract post ID
post_id = extract_post_id_from_url(url)
if not post_id:
raise HTTPException(status_code=400, detail="Could not extract post ID from URL")
try:
        # Run the blocking Selenium extraction in a worker thread so the
        # event loop stays responsive
        loop = asyncio.get_running_loop()
extracted_data = await loop.run_in_executor(executor, extract_media_sync, url)
media_urls = extracted_data["media_urls"]
media_count = len(media_urls)
# Base response data
response_data = {
"post_url": url,
"media_count": media_count,
"post_text": extracted_data["post_text"],
"author": extracted_data["author"],
"success": True,
"processing_time": extracted_data.get("processing_time")
}
# Conditionally add url or picker based on media count
if media_count == 1:
response_data["url"] = media_urls[0]
# Don't include picker field at all
elif media_count > 1:
response_data["picker"] = [{"url": url} for url in media_urls]
# Don't include url field at all
# If media_count is 0, neither url nor picker will be included
# Create response and return as JSON with excluded None values
response = ThreadsResponse(**response_data)
return JSONResponse(content=response.model_dump(exclude_none=True))
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up resources on shutdown"""
executor.shutdown(wait=False)
for driver in driver_pool:
try:
driver.quit()
        except Exception:
            pass
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Custom HTTP exception handler"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"success": False,
"status_code": exc.status_code
}
)
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)
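# Example usage (assuming the server is running locally on the default port;
# the post URL below is a hypothetical placeholder):
#   curl "http://localhost:7860/extract?url=https://www.threads.net/@someuser/post/C8abc123XYZ"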