# Chrunos's picture
# Update app.py
# 9595346 verified
import asyncio
import html
import json
import logging
import os
import re
import subprocess
import tempfile
import time
import uuid
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, AsyncGenerator

import google.generativeai as genai # For Gemini API
import openai # For your custom API
from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
from fastapi.responses import StreamingResponse, JSONResponse
from google.generativeai.types import GenerationConfig
from pydantic import BaseModel, Field
# --- Logging Configuration ---
# Configure the root logger once at import time: INFO level, with each record
# rendered as "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
# Module-level logger used by the helpers and endpoints in this file.
logger = logging.getLogger(__name__)
# --- Configuration ---
# Fallbacks for the custom (OpenAI-compatible) API; the CUSTOM_API_BASE_URL and
# CUSTOM_API_MODEL environment variables take precedence when set.
CUSTOM_API_BASE_URL_DEFAULT: str = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
CUSTOM_API_MODEL_DEFAULT: str = "gemma3:27b"
# NOTE(review): not referenced anywhere in the visible part of this file.
DEFAULT_GEMINI_MODEL: str = "gemini-1.5-flash-latest"
# Timeout, in seconds, handed to the Gemini generate call.
GEMINI_REQUEST_TIMEOUT_SECONDS: int = 300
# Transcripts are cut to this many characters before being sent to a model.
MAX_TRANSCRIPT_CHARS: int = 750_000
# Cookies file passed to yt-dlp via --cookies (path relative to the working directory).
COOKIES_FILE_PATH: str = "private.txt"
# --- In-Memory Task Storage ---
# task_id -> task record (status, result, error, timestamps, request params).
# A plain dict: contents live only as long as this process.
tasks_db: Dict[str, Dict[str, Any]] = {}
# --- Pydantic Models ---
class ChatPayload(BaseModel):
    # Request body for the /chat endpoint.

    # Text forwarded as the single user message to the custom model.
    message: str
    # Sampling temperature; validated to lie within [0.0, 1.0].
    temperature: float = Field(default=0.6, ge=0.0, le=1.0)
class SummarizeRequest(BaseModel):
    # Request body for the /summarize endpoint.

    # Video URL; checked with is_youtube_url by the endpoint, not by the model.
    url: str
    # Instruction placed ahead of the transcript in the prompt sent to the model.
    prompt: str = "Please summarize the following video transcript."
    # Sampling temperature; validated to lie within [0.0, 1.0].
    temperature: float = Field(default=0.6, ge=0.0, le=1.0)
class GeminiTaskRequest(BaseModel):
    # Request body for the /gemini/submit_task endpoint.

    # Video URL; checked with is_youtube_url by the endpoint, not by the model.
    url: str
    # Instruction text sent to Gemini as the first content part.
    message: str
    # Name of the Gemini model to instantiate for this task.
    gemini_model: str
    # Required; excluded from the record stored in tasks_db at submission.
    api_key: str = Field(..., description="User's Gemini API Key")
class TaskSubmissionResponse(BaseModel):
    # Response returned right after a Gemini task has been queued.

    # Identifier to use when polling for the task.
    task_id: str
    # Task state at submission time.
    status: str
    # Absolute URL of the status endpoint for this task.
    task_detail_url: str
class TaskStatusResponse(BaseModel):
    # Snapshot of a task record as returned by the status endpoint.

    task_id: str
    # One of the states written by this file: PENDING, PROCESSING, COMPLETED, FAILED.
    status: str
    # Timestamps are written as timezone-aware UTC datetimes.
    submitted_at: datetime
    last_updated_at: datetime
    # Model output once the task has completed; None otherwise.
    result: Optional[str] = None
    # Error text once the task has failed; None otherwise.
    error: Optional[str] = None
# --- FastAPI App Initialization ---
# Single application object; every endpoint below registers itself on it.
app = FastAPI(
    title="YouTube Summarizer Backend",
    version="4.0.0",
    description=(
        "A dual-endpoint API for video summarization. "
        "Provides a 'Fast Summary' using a custom model and an advanced "
        "'Gemini Summary' with user-provided keys."
    ),
)
# --- Helper Functions ---
# Optional scheme, optional www./m./music. prefix, a YouTube host, then either a
# known path/query prefix or a bare 11-character token.
_YOUTUBE_URL_RE = re.compile(
    r'(https?://)?(www\.|m\.|music\.)?'
    r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
    r'(watch\?v=|embed/|v/|shorts/|.+[?&]v=)?([^&=%\?]{11})'
)
# Scheme and host at the start of a URL; removed before looking for the ID.
_URL_ORIGIN_RE = re.compile(r'^(?:https?://)?[^/]+')
# An 11-character ID introduced by a "v" query parameter or by a path slash.
# The lookahead rejects longer tokens instead of truncating them to 11 chars.
_VIDEO_ID_RE = re.compile(r'(?:[?&]v=|/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])')


def is_youtube_url(url: Optional[str]) -> bool:
    """
    Return True when *url* starts like a YouTube video URL.

    Accepts youtube.com, youtu.be and youtube-nocookie.com hosts, with or
    without a scheme and with an optional ``www.``, ``m.`` or ``music.``
    prefix. The ``v`` parameter may appear anywhere in the query string.
    None and the empty string are rejected.
    """
    if not url:
        return False
    return _YOUTUBE_URL_RE.match(url) is not None


def extract_video_id(url: str) -> Optional[str]:
    """
    Return the 11-character video ID found in *url*, or None.

    None is returned when *url* is not accepted by is_youtube_url or when no
    ID-shaped token is present in its path or query string.
    """
    if not is_youtube_url(url):
        return None
    # Search only the path and query: a hostname such as "youtube-nocookie.com"
    # begins with 11 ID-legal characters and would otherwise be returned as the ID.
    path_and_query = _URL_ORIGIN_RE.sub('', url, count=1)
    match = _VIDEO_ID_RE.search(path_and_query)
    return match.group(1) if match else None
_VTT_TAG_RE = re.compile(r'<[^>]+>')
_VTT_WHITESPACE_RE = re.compile(r'\s+')
# First word of a block that carries no caption text (file header, comments,
# styling and region definitions); such blocks run until the next blank line.
_VTT_NON_CUE_BLOCKS = frozenset({'WEBVTT', 'NOTE', 'STYLE', 'REGION'})


def parse_vtt_content(vtt_content: str) -> str:
    """
    Extract the caption text from WebVTT subtitle content.

    Removed from the output:
      * the whole file header block (``WEBVTT`` plus metadata lines such as
        ``Kind:`` / ``Language:``) and whole NOTE / STYLE / REGION blocks;
      * cue timing lines and numeric cue identifiers;
      * inline tags such as ``<c>`` or ``<00:00:01.000>``;
      * any line whose cleaned text was already emitted earlier.

    Character references (``&amp;``, ``&gt;``, ``&nbsp;`` ...) are decoded and
    runs of whitespace are collapsed. The remaining lines are joined with
    single spaces; an empty string is returned when nothing is left.
    """
    text_lines = []
    seen_lines = set()
    at_block_start = True   # True for the first non-blank line after a blank line
    skipping_block = False  # True while inside a header/NOTE/STYLE/REGION block
    # A leading BOM would otherwise hide the "WEBVTT" signature.
    for raw_line in vtt_content.lstrip('\ufeff').splitlines():
        line = raw_line.strip()
        if not line:
            # Blank line: the current block is over.
            at_block_start, skipping_block = True, False
            continue
        if skipping_block:
            continue
        first_in_block, at_block_start = at_block_start, False
        if first_in_block:
            if line.split(None, 1)[0] in _VTT_NON_CUE_BLOCKS:
                skipping_block = True
                continue
            if line.isdigit():
                # Numeric cue identifier; digits inside a cue are caption text.
                continue
        if '-->' in line:
            continue
        # Strip tags before decoding so an escaped "&lt;b&gt;" stays as text.
        clean_line = html.unescape(_VTT_TAG_RE.sub('', line))
        clean_line = _VTT_WHITESPACE_RE.sub(' ', clean_line).strip()
        if not clean_line or clean_line in seen_lines:
            continue
        text_lines.append(clean_line)
        seen_lines.add(clean_line)
    return ' '.join(text_lines)
async def get_transcript_with_yt_dlp_cookies(video_id: str, task_id: str) -> Optional[str]:
    """
    Download a video's subtitles with yt-dlp and return them as plain text.

    yt-dlp runs in a worker thread with the cookies file at COOKIES_FILE_PATH
    and writes VTT subtitles (uploaded or auto-generated) into a temporary
    directory. The first ``.vtt`` file listed there is parsed with
    parse_vtt_content and the text is capped at MAX_TRANSCRIPT_CHARS.

    Returns None when the cookies file is missing, yt-dlp exits non-zero, no
    subtitle file was written, the parsed text is empty, or any exception is
    raised on the way; failures are logged and never propagated.
    *task_id* is only used to tag log lines.
    """
    logger.info(f"[{task_id}] Attempting transcript fetch for video ID: {video_id}.")
    if not os.path.exists(COOKIES_FILE_PATH):
        logger.error(f"[{task_id}] Cookies file not found at {COOKIES_FILE_PATH}.")
        return None

    try:
        watch_url = f"https://www.youtube.com/watch?v={video_id}"
        with tempfile.TemporaryDirectory() as work_dir:
            output_template = os.path.join(work_dir, "%(id)s.%(ext)s")
            yt_dlp_args = ["yt-dlp", "--skip-download", "--write-auto-subs", "--write-subs"]
            yt_dlp_args += ["--sub-format", "vtt"]
            yt_dlp_args += ["--cookies", COOKIES_FILE_PATH]
            yt_dlp_args += ["-o", output_template, watch_url]

            # subprocess.run blocks, so it is pushed to a thread to keep the event loop free.
            completed = await asyncio.to_thread(
                subprocess.run, yt_dlp_args, capture_output=True, text=True, timeout=60
            )
            if completed.returncode != 0:
                logger.error(f"[{task_id}] yt-dlp failed. Stderr: {completed.stderr}")
                return None

            vtt_name = next(
                (entry for entry in os.listdir(work_dir) if entry.endswith('.vtt')),
                None,
            )
            if vtt_name is None:
                logger.warning(f"[{task_id}] No subtitle files found for video ID: {video_id}")
                return None

            with open(os.path.join(work_dir, vtt_name), 'r', encoding='utf-8') as vtt_file:
                raw_vtt = vtt_file.read()

            transcript = parse_vtt_content(raw_vtt)
            if not transcript:
                return None
            # The logged length is the full transcript, before truncation.
            logger.info(f"[{task_id}] Transcript fetched. Length: {len(transcript)}")
            return transcript[:MAX_TRANSCRIPT_CHARS]
    except Exception as e:
        logger.error(f"[{task_id}] Error fetching transcript: {e}", exc_info=True)
        return None
# --- Internal Business Logic ---
async def stream_custom_api_response(message: str, temperature: float) -> AsyncGenerator[str, None]:
    """
    Stream the custom (OpenAI-compatible) model's reply to *message*.

    Configuration is read from the environment on every call:
    CUSTOM_API_SECRET_KEY (required), CUSTOM_API_BASE_URL and CUSTOM_API_MODEL
    (both fall back to the module defaults).

    Yields the text fragments of the reply as they arrive. Errors are not
    raised to the consumer; instead a single JSON object of the form
    ``{"error": "..."}`` is yielded, either up front when the API key is
    missing or after whatever text was already streamed when the call fails.
    """
    custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
    custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
    custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)
    if not custom_api_key_secret:
        logger.error("'CUSTOM_API_SECRET_KEY' is not configured.")
        yield '{"error": "Backend API key is not configured."}'
        return
    try:
        from openai import AsyncOpenAI
        # The context manager closes the client's HTTP connections when the
        # stream ends, fails, or is abandoned by the consumer.
        async with AsyncOpenAI(
            api_key=custom_api_key_secret,
            base_url=custom_api_base_url,
            timeout=60.0,
        ) as client:
            stream = await client.chat.completions.create(
                model=custom_api_model,
                temperature=temperature,
                messages=[{"role": "user", "content": message}],
                stream=True,
            )
            async for chunk in stream:
                # Some chunks carry no choices or an empty delta; skip those.
                delta = chunk.choices[0].delta if chunk.choices else None
                if delta and delta.content:
                    yield delta.content
    except Exception as e:
        logger.error(f"Error streaming from Custom API: {e}", exc_info=True)
        # Serialize with json.dumps so quotes, backslashes or newlines in the
        # exception text cannot produce an invalid JSON payload.
        yield json.dumps(
            {"error": f"Error processing with the summary model: {e}"},
            ensure_ascii=False,
        )
async def process_gemini_request_background(task_id: str, request_data: GeminiTaskRequest):
    """
    Run one queued Gemini task and record its outcome in tasks_db.

    The task is marked PROCESSING, then a transcript is fetched when a video
    ID can be extracted from the URL. Gemini receives the user's message plus
    either that transcript or, when none is available, the video URL itself
    as a file reference. The task ends as COMPLETED (with ``result``) or
    FAILED (with ``error``); ``last_updated_at`` is refreshed either way.
    """
    task_record = tasks_db[task_id]
    task_record["status"] = "PROCESSING"
    task_record["last_updated_at"] = datetime.now(timezone.utc)
    logger.info(f"[{task_id}] Starting background Gemini processing for URL: {request_data.url}")

    video_id = extract_video_id(request_data.url)
    if video_id:
        transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id)
    else:
        transcript_text = None

    try:
        # NOTE(review): genai.configure appears to set the key module-wide, not
        # per model instance - confirm that overlapping tasks submitted with
        # different keys cannot pick up each other's key.
        genai.configure(api_key=request_data.api_key)
        model_instance = genai.GenerativeModel(model_name=request_data.gemini_model)

        if transcript_text:
            logger.info(f"[{task_id}] Sending transcript to Gemini.")
            source_part = {"text": f"\n\nVideo Transcript:\n{transcript_text}"}
        else:
            logger.info(f"[{task_id}] No transcript found. Sending video URL to Gemini for multimodal processing.")
            source_part = {"file_data": {"mime_type": "video/youtube", "file_uri": request_data.url}}
        content_parts = [{"text": request_data.message}, source_part]

        response = await model_instance.generate_content_async(
            [{"parts": content_parts}],
            stream=False,
            request_options={"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS},
        )
        # Falls back to '' only when the response object has no `text` attribute.
        full_response_text = getattr(response, 'text', '')
        task_record["status"] = "COMPLETED"
        task_record["result"] = full_response_text
        logger.info(f"[{task_id}] Gemini processing successful.")
    except Exception as e:
        logger.error(f"[{task_id}] Error during Gemini processing: {e}", exc_info=True)
        task_record["status"] = "FAILED"
        task_record["error"] = str(e)
    finally:
        task_record["last_updated_at"] = datetime.now(timezone.utc)
# --- API Endpoints ---
@app.post("/summarize", response_class=StreamingResponse)
async def fast_summary(request_data: SummarizeRequest):
    """
    Endpoint for a fast summary using the built-in custom model.
    Requires a transcript to be available for the video.
    """
    # Responses: 400 for a URL that is not accepted or yields no video ID,
    # 404 when no transcript can be fetched, otherwise a plain-text stream.
    video_url = request_data.url
    task_id = f"summary-{uuid.uuid4()}" # A temporary ID for logging
    logger.info(f"[{task_id}] Received request for Fast Summary: {video_url}")

    if not is_youtube_url(video_url):
        return JSONResponse(status_code=400, content={"detail": "Invalid YouTube URL provided."})

    video_id = extract_video_id(video_url)
    if not video_id:
        return JSONResponse(status_code=400, content={"detail": "Could not extract video ID from URL."})

    transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id)
    if transcript_text:
        logger.info(f"[{task_id}] Transcript found, starting summary stream.")
        full_prompt = f"{request_data.prompt}\n\nTranscript:\n{transcript_text}"
        summary_stream = stream_custom_api_response(full_prompt, request_data.temperature)
        return StreamingResponse(summary_stream, media_type="text/plain; charset=utf-8")

    logger.warning(f"[{task_id}] No transcript found for Fast Summary.")
    return JSONResponse(
        status_code=404,
        content={
            "detail": (
                "A transcript could not be found for this video. "
                "The 'Fast Summary' feature requires available captions. "
                "Please try the 'Advanced Summary' with a Gemini model."
            )
        },
    )
@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(
    request_data: GeminiTaskRequest,
    background_tasks: BackgroundTasks,
    http_request: Request
):
    """
    Endpoint for 'Bring-Your-Own-Key' Gemini summarization.
    Accepts a user's API key and model choice, and works whether a transcript is available or not.
    """
    # Creates a PENDING record in tasks_db, queues the background worker and
    # returns immediately with the URL to poll. Raises HTTP 400 for a URL that
    # is_youtube_url rejects.
    task_id = str(uuid.uuid4())
    logger.info(f"[{task_id}] Received request for Gemini Summary. Model: {request_data.gemini_model}")
    if not is_youtube_url(request_data.url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.")

    new_record = dict(status="PENDING", result=None, error=None)
    new_record["submitted_at"] = datetime.now(timezone.utc)
    new_record["last_updated_at"] = datetime.now(timezone.utc)
    # Exclude key from DB; the worker gets it through request_data instead.
    new_record["request_params"] = request_data.model_dump(exclude={'api_key'})
    tasks_db[task_id] = new_record

    background_tasks.add_task(process_gemini_request_background, task_id, request_data)

    status_url = http_request.url_for('get_gemini_task_status', task_id=task_id)
    return TaskSubmissionResponse(
        task_id=task_id,
        status="PENDING",
        task_detail_url=str(status_url),
    )
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
    # Look the task up in tasks_db and return its current record;
    # an unknown ID is answered with HTTP 404.
    task = tasks_db.get(task_id)
    if task:
        return TaskStatusResponse(task_id=task_id, **task)
    raise HTTPException(status_code=404, detail="Task ID not found.")
@app.post("/chat", response_class=StreamingResponse)
async def direct_chat(payload: ChatPayload):
    """
    A general-purpose chat endpoint that streams responses from the custom model.
    """
    # Only the first 50 characters of the message are written to the log.
    preview = payload.message[:50]
    logger.info(f"Direct chat request: '{preview}...'")
    reply_stream = stream_custom_api_response(payload.message, payload.temperature)
    return StreamingResponse(reply_stream, media_type="text/plain; charset=utf-8")
@app.get("/")
async def read_root():
    # Root path: answers with a fixed JSON body. No status code is set here,
    # so the "504" is only text inside the message.
    body = {"message": "Server Error 504."}
    return body