# (removed: non-Python page-scrape residue — Spaces status text, blob size,
#  commit-hash gutter, and line-number column from the hosting page)
import asyncio
import json
import logging
import os
import re
import subprocess
import tempfile
import time
import uuid
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, AsyncGenerator

from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
import openai  # For your custom API
import google.generativeai as genai  # For Gemini API
from google.generativeai.types import GenerationConfig
# --- Logging Configuration ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# --- Configuration ---
CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash-latest"
GEMINI_REQUEST_TIMEOUT_SECONDS = 300
MAX_TRANSCRIPT_CHARS = 750000
COOKIES_FILE_PATH = "private.txt"
# --- In-Memory Task Storage ---
tasks_db: Dict[str, Dict[str, Any]] = {}
# --- Pydantic Models ---
class ChatPayload(BaseModel):
message: str
temperature: float = Field(0.6, ge=0.0, le=1.0)
class SummarizeRequest(BaseModel):
url: str
prompt: str = "Please summarize the following video transcript."
temperature: float = Field(0.6, ge=0.0, le=1.0)
class GeminiTaskRequest(BaseModel):
url: str
message: str
gemini_model: str
api_key: str = Field(..., description="User's Gemini API Key")
class TaskSubmissionResponse(BaseModel):
task_id: str
status: str
task_detail_url: str
class TaskStatusResponse(BaseModel):
task_id: str
status: str
submitted_at: datetime
last_updated_at: datetime
result: Optional[str] = None
error: Optional[str] = None
# --- FastAPI App Initialization ---
app = FastAPI(
title="YouTube Summarizer Backend",
description="A dual-endpoint API for video summarization. Provides a 'Fast Summary' using a custom model and an advanced 'Gemini Summary' with user-provided keys.",
version="4.0.0"
)
# --- Helper Functions ---
def is_youtube_url(url: Optional[str]) -> bool:
if not url:
return False
youtube_regex = (
r'(https?://)?(www\.)?'
r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
r'(watch\?v=|embed/|v/|shorts/|.+\?v=)?([^&=%\?]{11})'
)
return re.match(youtube_regex, url) is not None
def extract_video_id(url: str) -> Optional[str]:
if not is_youtube_url(url):
return None
patterns = [
r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
r'(?:embed\/|v\/|shorts\/)([0-9A-Za-z_-]{11}).*',
r'youtu\.be\/([0-9A-Za-z_-]{11}).*'
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def parse_vtt_content(vtt_content: str) -> str:
"""
Parses VTT subtitle content to extract text, removing timestamps, metadata, and duplicate lines.
"""
lines = vtt_content.split('\n')
text_lines = []
seen_lines = set()
for line in lines:
line = line.strip()
if (not line or line.startswith('WEBVTT') or line.startswith('NOTE') or '-->' in line or line.isdigit()):
continue
clean_line = re.sub(r'<[^>]+>', '', line)
clean_line = re.sub(r'\s+', ' ', clean_line).strip()
if not clean_line or clean_line in seen_lines:
continue
text_lines.append(clean_line)
seen_lines.add(clean_line)
return ' '.join(text_lines)
async def get_transcript_with_yt_dlp_cookies(video_id: str, task_id: str) -> Optional[str]:
"""
Fetches transcript using yt-dlp, downloading the default available language.
"""
logger.info(f"[{task_id}] Attempting transcript fetch for video ID: {video_id}.")
if not os.path.exists(COOKIES_FILE_PATH):
logger.error(f"[{task_id}] Cookies file not found at {COOKIES_FILE_PATH}.")
return None
try:
video_url = f"https://www.youtube.com/watch?v={video_id}"
with tempfile.TemporaryDirectory() as temp_dir:
cmd = [
"yt-dlp",
"--skip-download",
"--write-auto-subs",
"--write-subs",
"--sub-format", "vtt",
"--cookies", COOKIES_FILE_PATH,
"-o", os.path.join(temp_dir, "%(id)s.%(ext)s"),
video_url
]
result = await asyncio.to_thread(subprocess.run, cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
logger.error(f"[{task_id}] yt-dlp failed. Stderr: {result.stderr}")
return None
downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.vtt')]
if not downloaded_files:
logger.warning(f"[{task_id}] No subtitle files found for video ID: {video_id}")
return None
subtitle_file_path = os.path.join(temp_dir, downloaded_files[0])
with open(subtitle_file_path, 'r', encoding='utf-8') as f:
subtitle_content = f.read()
transcript_text = parse_vtt_content(subtitle_content)
if not transcript_text:
return None
logger.info(f"[{task_id}] Transcript fetched. Length: {len(transcript_text)}")
return transcript_text[:MAX_TRANSCRIPT_CHARS]
except Exception as e:
logger.error(f"[{task_id}] Error fetching transcript: {e}", exc_info=True)
return None
# --- Internal Business Logic ---
async def stream_custom_api_response(message: str, temperature: float) -> AsyncGenerator[str, None]:
"""
Reusable async generator to stream responses from the custom API.
"""
custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)
if not custom_api_key_secret:
logger.error("'CUSTOM_API_SECRET_KEY' is not configured.")
yield '{"error": "Backend API key is not configured."}'
return
try:
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=custom_api_key_secret, base_url=custom_api_base_url, timeout=60.0)
stream = await client.chat.completions.create(
model=custom_api_model,
temperature=temperature,
messages=[{"role": "user", "content": message}],
stream=True
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
logger.error(f"Error streaming from Custom API: {e}", exc_info=True)
yield f'{{"error": "Error processing with the summary model: {str(e)}"}}'
async def process_gemini_request_background(task_id: str, request_data: GeminiTaskRequest):
"""
Background process for the Gemini endpoint. Fetches transcript if possible,
otherwise uses the URL, and calls the Gemini API with the user's key.
"""
tasks_db[task_id]["status"] = "PROCESSING"
tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
logger.info(f"[{task_id}] Starting background Gemini processing for URL: {request_data.url}")
video_id = extract_video_id(request_data.url)
transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id) if video_id else None
try:
genai.configure(api_key=request_data.api_key)
model_instance = genai.GenerativeModel(model_name=request_data.gemini_model)
content_parts = [{"text": request_data.message}]
if transcript_text:
logger.info(f"[{task_id}] Sending transcript to Gemini.")
content_parts.append({"text": f"\n\nVideo Transcript:\n{transcript_text}"})
else:
logger.info(f"[{task_id}] No transcript found. Sending video URL to Gemini for multimodal processing.")
content_parts.append({"file_data": {"mime_type": "video/youtube", "file_uri": request_data.url}})
response = await model_instance.generate_content_async(
[{"parts": content_parts}],
stream=False,
request_options={"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}
)
full_response_text = getattr(response, 'text', '')
tasks_db[task_id]["status"] = "COMPLETED"
tasks_db[task_id]["result"] = full_response_text
logger.info(f"[{task_id}] Gemini processing successful.")
except Exception as e:
logger.error(f"[{task_id}] Error during Gemini processing: {e}", exc_info=True)
tasks_db[task_id]["status"] = "FAILED"
tasks_db[task_id]["error"] = str(e)
finally:
tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
# --- API Endpoints ---
@app.post("/summarize", response_class=StreamingResponse)
async def fast_summary(request_data: SummarizeRequest):
"""
Endpoint for a fast summary using the built-in custom model.
Requires a transcript to be available for the video.
"""
task_id = f"summary-{uuid.uuid4()}" # A temporary ID for logging
logger.info(f"[{task_id}] Received request for Fast Summary: {request_data.url}")
if not is_youtube_url(request_data.url):
return JSONResponse(status_code=400, content={"detail": "Invalid YouTube URL provided."})
video_id = extract_video_id(request_data.url)
if not video_id:
return JSONResponse(status_code=400, content={"detail": "Could not extract video ID from URL."})
transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id)
if not transcript_text:
logger.warning(f"[{task_id}] No transcript found for Fast Summary.")
error_content = {
"detail": "A transcript could not be found for this video. The 'Fast Summary' feature requires available captions. Please try the 'Advanced Summary' with a Gemini model."
}
return JSONResponse(status_code=404, content=error_content)
logger.info(f"[{task_id}] Transcript found, starting summary stream.")
full_prompt = f"{request_data.prompt}\n\nTranscript:\n{transcript_text}"
return StreamingResponse(
stream_custom_api_response(full_prompt, request_data.temperature),
media_type="text/plain; charset=utf-8"
)
@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(
request_data: GeminiTaskRequest,
background_tasks: BackgroundTasks,
http_request: Request
):
"""
Endpoint for 'Bring-Your-Own-Key' Gemini summarization.
Accepts a user's API key and model choice, and works whether a transcript is available or not.
"""
task_id = str(uuid.uuid4())
logger.info(f"[{task_id}] Received request for Gemini Summary. Model: {request_data.gemini_model}")
if not is_youtube_url(request_data.url):
raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.")
tasks_db[task_id] = {
"status": "PENDING", "result": None, "error": None,
"submitted_at": datetime.now(timezone.utc),
"last_updated_at": datetime.now(timezone.utc),
"request_params": request_data.model_dump(exclude={'api_key'}) # Exclude key from DB
}
background_tasks.add_task(process_gemini_request_background, task_id, request_data)
return TaskSubmissionResponse(
task_id=task_id,
status="PENDING",
task_detail_url=str(http_request.url_for('get_gemini_task_status', task_id=task_id))
)
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
task = tasks_db.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task ID not found.")
return TaskStatusResponse(**task, task_id=task_id)
@app.post("/chat", response_class=StreamingResponse)
async def direct_chat(payload: ChatPayload):
"""
A general-purpose chat endpoint that streams responses from the custom model.
"""
logger.info(f"Direct chat request: '{payload.message[:50]}...'")
return StreamingResponse(
stream_custom_api_response(payload.message, payload.temperature),
media_type="text/plain; charset=utf-8"
)
@app.get("/")
async def read_root():
return {"message": "Server Error 504."} |