import os
import re
import json
import logging
import uuid
import time
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from typing import Optional, Dict, Any, List, AsyncGenerator
import asyncio
import subprocess
import tempfile

from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
import openai  # For your custom API
import google.generativeai as genai  # For Gemini API
from google.generativeai.types import GenerationConfig

# --- Logging Configuration ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# --- Configuration ---
CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash-latest"
GEMINI_REQUEST_TIMEOUT_SECONDS = 300
# Transcripts longer than this are truncated before being sent to a model.
MAX_TRANSCRIPT_CHARS = 750000
# Cookie jar handed to yt-dlp; resolved relative to the working directory.
COOKIES_FILE_PATH = "private.txt"

# --- In-Memory Task Storage ---
# Maps task_id -> task record (status, result, error, timestamps, request
# params). NOTE(review): nothing in the code visible here evicts entries, so
# the dict grows for the lifetime of the process and is lost on restart.
tasks_db: Dict[str, Dict[str, Any]] = {}


# --- Pydantic Models ---
# Models are documented with comments rather than docstrings so the generated
# OpenAPI schema is not altered.

# Body of POST /chat.
class ChatPayload(BaseModel):
    message: str
    temperature: float = Field(0.6, ge=0.0, le=1.0)


# Body of POST /summarize.
class SummarizeRequest(BaseModel):
    url: str
    prompt: str = "Please summarize the following video transcript."
    temperature: float = Field(0.6, ge=0.0, le=1.0)


# Body of POST /gemini/submit_task; the caller supplies their own API key.
class GeminiTaskRequest(BaseModel):
    url: str
    message: str
    gemini_model: str
    api_key: str = Field(..., description="User's Gemini API Key")


# Response of POST /gemini/submit_task.
class TaskSubmissionResponse(BaseModel):
    task_id: str
    status: str
    task_detail_url: str


# Response of GET /gemini/task/{task_id}.
class TaskStatusResponse(BaseModel):
    task_id: str
    status: str
    submitted_at: datetime
    last_updated_at: datetime
    result: Optional[str] = None
    error: Optional[str] = None


# --- FastAPI App Initialization ---
app = FastAPI(
    title="YouTube Summarizer Backend",
    description=(
        "A dual-endpoint API for video summarization. "
        "Provides a 'Fast Summary' using a custom model and an advanced "
        "'Gemini Summary' with user-provided keys."
    ),
    version="4.0.0"
)


# --- Helper Functions ---
# Patterns are compiled once at import time instead of on every call.
_YOUTUBE_URL_RE = re.compile(
    r'(https?://)?(www\.)?'
    r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
    r'(watch\?v=|embed/|v/|shorts/|.+\?v=)?([^&=%\?]{11})'
)
# Tried in order; the first pattern that matches supplies the video ID.
_VIDEO_ID_PATTERNS = (
    re.compile(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*'),
    re.compile(r'(?:embed\/|v\/|shorts\/)([0-9A-Za-z_-]{11}).*'),
    re.compile(r'youtu\.be\/([0-9A-Za-z_-]{11}).*'),
)
_VTT_TAG_RE = re.compile(r'<[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')


def is_youtube_url(url: Optional[str]) -> bool:
    """Return True if *url* looks like a YouTube video URL.

    The check is anchored at the start of the string only; ``None`` and the
    empty string are rejected.
    """
    if not url:
        return False
    return _YOUTUBE_URL_RE.match(url) is not None


def extract_video_id(url: str) -> Optional[str]:
    """Return the 11-character video ID found in *url*.

    Returns ``None`` when *url* is not accepted by ``is_youtube_url`` or when
    none of the ID patterns match.
    """
    if not is_youtube_url(url):
        return None
    for pattern in _VIDEO_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def parse_vtt_content(vtt_content: str) -> str:
    """Extract plain caption text from WebVTT subtitle content.

    Dropped from the output:
      * a leading byte-order mark;
      * the header block, i.e. the ``WEBVTT`` signature and every line that
        follows it up to the first blank line or cue timing line (this is
        where metadata such as ``Kind: captions`` / ``Language: en`` lives);
      * any other line starting with ``WEBVTT`` or ``NOTE``;
      * cue timing lines (anything containing ``-->``) and purely numeric
        lines (cue counters);
      * inline markup of the form ``<...>``;
      * lines whose cleaned text has already been emitted.

    Returns the remaining lines joined with single spaces; the result is an
    empty string when nothing is left.
    """
    text_lines: List[str] = []
    seen_lines = set()
    in_header = False
    signature_checked = False

    for raw_line in vtt_content.lstrip('\ufeff').split('\n'):
        line = raw_line.strip()
        if not line:
            # A blank line terminates the header block.
            in_header = False
            continue

        if not signature_checked:
            # Only the first non-blank line of the file can open the header.
            signature_checked = True
            in_header = line.startswith('WEBVTT')

        if '-->' in line:
            # A timing line also ends the header, so a file that lacks the
            # separating blank line does not lose its first cue.
            in_header = False
            continue

        if (in_header
                or line.startswith('WEBVTT')
                or line.startswith('NOTE')
                or line.isdigit()):
            continue

        clean_line = _VTT_TAG_RE.sub('', line)
        clean_line = _WHITESPACE_RE.sub(' ', clean_line).strip()
        if not clean_line or clean_line in seen_lines:
            continue
        text_lines.append(clean_line)
        seen_lines.add(clean_line)

    return ' '.join(text_lines)


async def get_transcript_with_yt_dlp_cookies(video_id: str, task_id: str) -> Optional[str]:
    """Fetch a video's transcript by running ``yt-dlp`` with the cookies file.

    No subtitle language is requested explicitly, so whichever subtitle
    file(s) yt-dlp writes by default are used.

    Args:
        video_id: The YouTube video ID.
        task_id: Identifier used only to tag log lines.

    Returns:
        The parsed transcript, truncated to ``MAX_TRANSCRIPT_CHARS``
        characters, or ``None`` when the cookies file is missing, yt-dlp
        fails or times out, no ``.vtt`` file is produced, the parsed text is
        empty, or any other error occurs. Errors are logged, never raised.
    """
    logger.info(f"[{task_id}] Attempting transcript fetch for video ID: {video_id}.")

    if not os.path.exists(COOKIES_FILE_PATH):
        logger.error(f"[{task_id}] Cookies file not found at {COOKIES_FILE_PATH}.")
        return None

    try:
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        with tempfile.TemporaryDirectory() as temp_dir:
            cmd = [
                "yt-dlp",
                "--skip-download",
                "--write-auto-subs",
                "--write-subs",
                "--sub-format", "vtt",
                "--cookies", COOKIES_FILE_PATH,
                "-o", os.path.join(temp_dir, "%(id)s.%(ext)s"),
                video_url
            ]
            # subprocess.run blocks, so it runs in a worker thread to keep
            # the event loop responsive.
            result = await asyncio.to_thread(
                subprocess.run, cmd, capture_output=True, text=True, timeout=60
            )
            if result.returncode != 0:
                logger.error(f"[{task_id}] yt-dlp failed. Stderr: {result.stderr}")
                return None

            # os.listdir order is arbitrary; sort so the chosen file is
            # deterministic when several subtitle files were written.
            downloaded_files = sorted(
                f for f in os.listdir(temp_dir) if f.endswith('.vtt')
            )
            if not downloaded_files:
                logger.warning(f"[{task_id}] No subtitle files found for video ID: {video_id}")
                return None

            subtitle_file_path = os.path.join(temp_dir, downloaded_files[0])
            with open(subtitle_file_path, 'r', encoding='utf-8') as f:
                subtitle_content = f.read()

        transcript_text = parse_vtt_content(subtitle_content)
        if not transcript_text:
            return None

        logger.info(f"[{task_id}] Transcript fetched. Length: {len(transcript_text)}")
        return transcript_text[:MAX_TRANSCRIPT_CHARS]
    except Exception as e:
        # Deliberately broad: callers treat "no transcript" as a normal
        # outcome (including a missing yt-dlp binary or a timeout).
        logger.error(f"[{task_id}] Error fetching transcript: {e}", exc_info=True)
        return None


# --- Internal Business Logic ---
async def stream_custom_api_response(message: str, temperature: float) -> AsyncGenerator[str, None]:
    """Stream the custom model's reply to *message* as text chunks.

    Connection settings are read from the environment on every call:
    ``CUSTOM_API_SECRET_KEY`` (required), ``CUSTOM_API_BASE_URL`` and
    ``CUSTOM_API_MODEL`` (both fall back to the module defaults).

    Failures are reported in-band: instead of raising, the generator yields a
    single JSON object of the form ``{"error": "..."}`` and stops.

    Args:
        message: The full user prompt.
        temperature: Sampling temperature forwarded to the API.

    Yields:
        Non-empty content deltas as they arrive, or one JSON error string.
    """
    custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
    custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
    custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)

    if not custom_api_key_secret:
        logger.error("'CUSTOM_API_SECRET_KEY' is not configured.")
        yield '{"error": "Backend API key is not configured."}'
        return

    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI(
            api_key=custom_api_key_secret,
            base_url=custom_api_base_url,
            timeout=60.0
        )
        stream = await client.chat.completions.create(
            model=custom_api_model,
            temperature=temperature,
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        logger.error(f"Error streaming from Custom API: {e}", exc_info=True)
        # json.dumps escapes quotes, backslashes and newlines in the
        # exception text, so the payload is always valid JSON.
        yield json.dumps(
            {"error": f"Error processing with the summary model: {str(e)}"}
        )


async def process_gemini_request_background(task_id: str, request_data: GeminiTaskRequest):
    """Run one Gemini summarization task and record the outcome in ``tasks_db``.

    The transcript is sent as text when it can be fetched; otherwise the
    video URL itself is passed to Gemini as ``file_data``. The task record is
    moved to ``PROCESSING`` on entry and to ``COMPLETED`` (with ``result``) or
    ``FAILED`` (with ``error``) on exit; ``last_updated_at`` is refreshed in
    both cases. Exceptions from the Gemini call are logged and stored, not
    raised.

    Args:
        task_id: Key of an existing record in ``tasks_db``.
        request_data: The original request, including the user's API key.
    """
    tasks_db[task_id]["status"] = "PROCESSING"
    tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
    logger.info(f"[{task_id}] Starting background Gemini processing for URL: {request_data.url}")

    video_id = extract_video_id(request_data.url)
    transcript_text = (
        await get_transcript_with_yt_dlp_cookies(video_id, task_id) if video_id else None
    )

    try:
        # NOTE(review): genai.configure sets process-wide state, so tasks
        # carrying different user keys share it. There is no await between
        # this call and generate_content_async below; presumably the library
        # binds its client before first suspending - TODO confirm.
        genai.configure(api_key=request_data.api_key)
        model_instance = genai.GenerativeModel(model_name=request_data.gemini_model)

        content_parts = [{"text": request_data.message}]
        if transcript_text:
            logger.info(f"[{task_id}] Sending transcript to Gemini.")
            content_parts.append({"text": f"\n\nVideo Transcript:\n{transcript_text}"})
        else:
            logger.info(f"[{task_id}] No transcript found. Sending video URL to Gemini for multimodal processing.")
            content_parts.append(
                {"file_data": {"mime_type": "video/youtube", "file_uri": request_data.url}}
            )

        response = await model_instance.generate_content_async(
            [{"parts": content_parts}],
            stream=False,
            request_options={"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}
        )
        full_response_text = getattr(response, 'text', '')

        tasks_db[task_id]["status"] = "COMPLETED"
        tasks_db[task_id]["result"] = full_response_text
        logger.info(f"[{task_id}] Gemini processing successful.")
    except Exception as e:
        logger.error(f"[{task_id}] Error during Gemini processing: {e}", exc_info=True)
        tasks_db[task_id]["status"] = "FAILED"
        tasks_db[task_id]["error"] = str(e)
    finally:
        tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)


# --- API Endpoints ---
@app.post("/summarize", response_class=StreamingResponse)
async def fast_summary(request_data: SummarizeRequest):
    """
    Endpoint for a fast summary using the built-in custom model.
    Requires a transcript to be available for the video.
    """
    task_id = f"summary-{uuid.uuid4()}"  # A temporary ID for logging
    logger.info(f"[{task_id}] Received request for Fast Summary: {request_data.url}")

    if not is_youtube_url(request_data.url):
        return JSONResponse(status_code=400, content={"detail": "Invalid YouTube URL provided."})

    video_id = extract_video_id(request_data.url)
    if not video_id:
        return JSONResponse(status_code=400, content={"detail": "Could not extract video ID from URL."})

    transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id)
    if not transcript_text:
        logger.warning(f"[{task_id}] No transcript found for Fast Summary.")
        error_content = {
            "detail": "A transcript could not be found for this video. The 'Fast Summary' feature requires available captions. Please try the 'Advanced Summary' with a Gemini model."
        }
        return JSONResponse(status_code=404, content=error_content)

    logger.info(f"[{task_id}] Transcript found, starting summary stream.")
    full_prompt = f"{request_data.prompt}\n\nTranscript:\n{transcript_text}"
    return StreamingResponse(
        stream_custom_api_response(full_prompt, request_data.temperature),
        media_type="text/plain; charset=utf-8"
    )


@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(
    request_data: GeminiTaskRequest,
    background_tasks: BackgroundTasks,
    http_request: Request
):
    """
    Endpoint for 'Bring-Your-Own-Key' Gemini summarization.
    Accepts a user's API key and model choice, and works whether a transcript is available or not.
    """
    task_id = str(uuid.uuid4())
    logger.info(f"[{task_id}] Received request for Gemini Summary. Model: {request_data.gemini_model}")

    if not is_youtube_url(request_data.url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.")

    tasks_db[task_id] = {
        "status": "PENDING",
        "result": None,
        "error": None,
        "submitted_at": datetime.now(timezone.utc),
        "last_updated_at": datetime.now(timezone.utc),
        "request_params": request_data.model_dump(exclude={'api_key'})  # Exclude key from DB
    }

    # The full request (including the API key) is handed to the worker only;
    # it is not stored in tasks_db.
    background_tasks.add_task(process_gemini_request_background, task_id, request_data)

    return TaskSubmissionResponse(
        task_id=task_id,
        status="PENDING",
        task_detail_url=str(http_request.url_for('get_gemini_task_status', task_id=task_id))
    )


# Returns the stored record for a task, or 404 if the ID is unknown. The
# record's extra "request_params" key is not a TaskStatusResponse field.
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
    task = tasks_db.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task ID not found.")
    return TaskStatusResponse(**task, task_id=task_id)


@app.post("/chat", response_class=StreamingResponse)
async def direct_chat(payload: ChatPayload):
    """
    A general-purpose chat endpoint that streams responses from the custom model.
    """
    logger.info(f"Direct chat request: '{payload.message[:50]}...'")
    return StreamingResponse(
        stream_custom_api_response(payload.message, payload.temperature),
        media_type="text/plain; charset=utf-8"
    )


# Root route; the payload text is kept exactly as in the original.
@app.get("/")
async def read_root():
    return {"message": "Server Error 504."}