Spaces:
Running
Running
import os | |
import re | |
import logging | |
import uuid | |
import time | |
from datetime import datetime, timezone, timedelta | |
from collections import defaultdict | |
from typing import Optional, Dict, Any, List, AsyncGenerator | |
import asyncio | |
import subprocess | |
import tempfile | |
from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request | |
from fastapi.responses import StreamingResponse, JSONResponse | |
from pydantic import BaseModel, Field | |
import openai # For your custom API | |
import google.generativeai as genai # For Gemini API | |
from google.generativeai.types import GenerationConfig | |
# --- Logging Configuration --- | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
datefmt='%Y-%m-%d %H:%M:%S' | |
) | |
logger = logging.getLogger(__name__) | |
# --- Configuration --- | |
CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1" | |
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b" | |
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash-latest" | |
GEMINI_REQUEST_TIMEOUT_SECONDS = 300 | |
MAX_TRANSCRIPT_CHARS = 750000 | |
COOKIES_FILE_PATH = "private.txt" | |
# --- In-Memory Task Storage --- | |
tasks_db: Dict[str, Dict[str, Any]] = {} | |
# --- Pydantic Models --- | |
class ChatPayload(BaseModel): | |
message: str | |
temperature: float = Field(0.6, ge=0.0, le=1.0) | |
class SummarizeRequest(BaseModel): | |
url: str | |
prompt: str = "Please summarize the following video transcript." | |
temperature: float = Field(0.6, ge=0.0, le=1.0) | |
class GeminiTaskRequest(BaseModel): | |
url: str | |
message: str | |
gemini_model: str | |
api_key: str = Field(..., description="User's Gemini API Key") | |
class TaskSubmissionResponse(BaseModel): | |
task_id: str | |
status: str | |
task_detail_url: str | |
class TaskStatusResponse(BaseModel): | |
task_id: str | |
status: str | |
submitted_at: datetime | |
last_updated_at: datetime | |
result: Optional[str] = None | |
error: Optional[str] = None | |
# --- FastAPI App Initialization --- | |
app = FastAPI( | |
title="YouTube Summarizer Backend", | |
description="A dual-endpoint API for video summarization. Provides a 'Fast Summary' using a custom model and an advanced 'Gemini Summary' with user-provided keys.", | |
version="4.0.0" | |
) | |
# --- Helper Functions --- | |
def is_youtube_url(url: Optional[str]) -> bool: | |
if not url: | |
return False | |
youtube_regex = ( | |
r'(https?://)?(www\.)?' | |
r'(youtube|youtu|youtube-nocookie)\.(com|be)/' | |
r'(watch\?v=|embed/|v/|shorts/|.+\?v=)?([^&=%\?]{11})' | |
) | |
return re.match(youtube_regex, url) is not None | |
def extract_video_id(url: str) -> Optional[str]: | |
if not is_youtube_url(url): | |
return None | |
patterns = [ | |
r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', | |
r'(?:embed\/|v\/|shorts\/)([0-9A-Za-z_-]{11}).*', | |
r'youtu\.be\/([0-9A-Za-z_-]{11}).*' | |
] | |
for pattern in patterns: | |
match = re.search(pattern, url) | |
if match: | |
return match.group(1) | |
return None | |
def parse_vtt_content(vtt_content: str) -> str: | |
""" | |
Parses VTT subtitle content to extract text, removing timestamps, metadata, and duplicate lines. | |
""" | |
lines = vtt_content.split('\n') | |
text_lines = [] | |
seen_lines = set() | |
for line in lines: | |
line = line.strip() | |
if (not line or line.startswith('WEBVTT') or line.startswith('NOTE') or '-->' in line or line.isdigit()): | |
continue | |
clean_line = re.sub(r'<[^>]+>', '', line) | |
clean_line = re.sub(r'\s+', ' ', clean_line).strip() | |
if not clean_line or clean_line in seen_lines: | |
continue | |
text_lines.append(clean_line) | |
seen_lines.add(clean_line) | |
return ' '.join(text_lines) | |
async def get_transcript_with_yt_dlp_cookies(video_id: str, task_id: str) -> Optional[str]: | |
""" | |
Fetches transcript using yt-dlp, downloading the default available language. | |
""" | |
logger.info(f"[{task_id}] Attempting transcript fetch for video ID: {video_id}.") | |
if not os.path.exists(COOKIES_FILE_PATH): | |
logger.error(f"[{task_id}] Cookies file not found at {COOKIES_FILE_PATH}.") | |
return None | |
try: | |
video_url = f"https://www.youtube.com/watch?v={video_id}" | |
with tempfile.TemporaryDirectory() as temp_dir: | |
cmd = [ | |
"yt-dlp", | |
"--skip-download", | |
"--write-auto-subs", | |
"--write-subs", | |
"--sub-format", "vtt", | |
"--cookies", COOKIES_FILE_PATH, | |
"-o", os.path.join(temp_dir, "%(id)s.%(ext)s"), | |
video_url | |
] | |
result = await asyncio.to_thread(subprocess.run, cmd, capture_output=True, text=True, timeout=60) | |
if result.returncode != 0: | |
logger.error(f"[{task_id}] yt-dlp failed. Stderr: {result.stderr}") | |
return None | |
downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.vtt')] | |
if not downloaded_files: | |
logger.warning(f"[{task_id}] No subtitle files found for video ID: {video_id}") | |
return None | |
subtitle_file_path = os.path.join(temp_dir, downloaded_files[0]) | |
with open(subtitle_file_path, 'r', encoding='utf-8') as f: | |
subtitle_content = f.read() | |
transcript_text = parse_vtt_content(subtitle_content) | |
if not transcript_text: | |
return None | |
logger.info(f"[{task_id}] Transcript fetched. Length: {len(transcript_text)}") | |
return transcript_text[:MAX_TRANSCRIPT_CHARS] | |
except Exception as e: | |
logger.error(f"[{task_id}] Error fetching transcript: {e}", exc_info=True) | |
return None | |
# --- Internal Business Logic --- | |
async def stream_custom_api_response(message: str, temperature: float) -> AsyncGenerator[str, None]: | |
""" | |
Reusable async generator to stream responses from the custom API. | |
""" | |
custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY") | |
custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT) | |
custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT) | |
if not custom_api_key_secret: | |
logger.error("'CUSTOM_API_SECRET_KEY' is not configured.") | |
yield '{"error": "Backend API key is not configured."}' | |
return | |
try: | |
from openai import AsyncOpenAI | |
client = AsyncOpenAI(api_key=custom_api_key_secret, base_url=custom_api_base_url, timeout=60.0) | |
stream = await client.chat.completions.create( | |
model=custom_api_model, | |
temperature=temperature, | |
messages=[{"role": "user", "content": message}], | |
stream=True | |
) | |
async for chunk in stream: | |
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: | |
yield chunk.choices[0].delta.content | |
except Exception as e: | |
logger.error(f"Error streaming from Custom API: {e}", exc_info=True) | |
yield f'{{"error": "Error processing with the summary model: {str(e)}"}}' | |
async def process_gemini_request_background(task_id: str, request_data: GeminiTaskRequest): | |
""" | |
Background process for the Gemini endpoint. Fetches transcript if possible, | |
otherwise uses the URL, and calls the Gemini API with the user's key. | |
""" | |
tasks_db[task_id]["status"] = "PROCESSING" | |
tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc) | |
logger.info(f"[{task_id}] Starting background Gemini processing for URL: {request_data.url}") | |
video_id = extract_video_id(request_data.url) | |
transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id) if video_id else None | |
try: | |
genai.configure(api_key=request_data.api_key) | |
model_instance = genai.GenerativeModel(model_name=request_data.gemini_model) | |
content_parts = [{"text": request_data.message}] | |
if transcript_text: | |
logger.info(f"[{task_id}] Sending transcript to Gemini.") | |
content_parts.append({"text": f"\n\nVideo Transcript:\n{transcript_text}"}) | |
else: | |
logger.info(f"[{task_id}] No transcript found. Sending video URL to Gemini for multimodal processing.") | |
content_parts.append({"file_data": {"mime_type": "video/youtube", "file_uri": request_data.url}}) | |
response = await model_instance.generate_content_async( | |
[{"parts": content_parts}], | |
stream=False, | |
request_options={"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS} | |
) | |
full_response_text = getattr(response, 'text', '') | |
tasks_db[task_id]["status"] = "COMPLETED" | |
tasks_db[task_id]["result"] = full_response_text | |
logger.info(f"[{task_id}] Gemini processing successful.") | |
except Exception as e: | |
logger.error(f"[{task_id}] Error during Gemini processing: {e}", exc_info=True) | |
tasks_db[task_id]["status"] = "FAILED" | |
tasks_db[task_id]["error"] = str(e) | |
finally: | |
tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc) | |
# --- API Endpoints --- | |
async def fast_summary(request_data: SummarizeRequest): | |
""" | |
Endpoint for a fast summary using the built-in custom model. | |
Requires a transcript to be available for the video. | |
""" | |
task_id = f"summary-{uuid.uuid4()}" # A temporary ID for logging | |
logger.info(f"[{task_id}] Received request for Fast Summary: {request_data.url}") | |
if not is_youtube_url(request_data.url): | |
return JSONResponse(status_code=400, content={"detail": "Invalid YouTube URL provided."}) | |
video_id = extract_video_id(request_data.url) | |
if not video_id: | |
return JSONResponse(status_code=400, content={"detail": "Could not extract video ID from URL."}) | |
transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id) | |
if not transcript_text: | |
logger.warning(f"[{task_id}] No transcript found for Fast Summary.") | |
error_content = { | |
"detail": "A transcript could not be found for this video. The 'Fast Summary' feature requires available captions. Please try the 'Advanced Summary' with a Gemini model." | |
} | |
return JSONResponse(status_code=404, content=error_content) | |
logger.info(f"[{task_id}] Transcript found, starting summary stream.") | |
full_prompt = f"{request_data.prompt}\n\nTranscript:\n{transcript_text}" | |
return StreamingResponse( | |
stream_custom_api_response(full_prompt, request_data.temperature), | |
media_type="text/plain; charset=utf-8" | |
) | |
async def submit_gemini_task( | |
request_data: GeminiTaskRequest, | |
background_tasks: BackgroundTasks, | |
http_request: Request | |
): | |
""" | |
Endpoint for 'Bring-Your-Own-Key' Gemini summarization. | |
Accepts a user's API key and model choice, and works whether a transcript is available or not. | |
""" | |
task_id = str(uuid.uuid4()) | |
logger.info(f"[{task_id}] Received request for Gemini Summary. Model: {request_data.gemini_model}") | |
if not is_youtube_url(request_data.url): | |
raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.") | |
tasks_db[task_id] = { | |
"status": "PENDING", "result": None, "error": None, | |
"submitted_at": datetime.now(timezone.utc), | |
"last_updated_at": datetime.now(timezone.utc), | |
"request_params": request_data.model_dump(exclude={'api_key'}) # Exclude key from DB | |
} | |
background_tasks.add_task(process_gemini_request_background, task_id, request_data) | |
return TaskSubmissionResponse( | |
task_id=task_id, | |
status="PENDING", | |
task_detail_url=str(http_request.url_for('get_gemini_task_status', task_id=task_id)) | |
) | |
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")): | |
task = tasks_db.get(task_id) | |
if not task: | |
raise HTTPException(status_code=404, detail="Task ID not found.") | |
return TaskStatusResponse(**task, task_id=task_id) | |
async def direct_chat(payload: ChatPayload): | |
""" | |
A general-purpose chat endpoint that streams responses from the custom model. | |
""" | |
logger.info(f"Direct chat request: '{payload.message[:50]}...'") | |
return StreamingResponse( | |
stream_custom_api_response(payload.message, payload.temperature), | |
media_type="text/plain; charset=utf-8" | |
) | |
async def read_root(): | |
return {"message": "Server Error 504."} |