File size: 12,707 Bytes
97f80b5
 
 
 
 
 
 
9595346
97f80b5
 
 
 
 
9595346
97f80b5
 
 
 
 
 
 
 
 
 
 
9ce8ff3
97f80b5
9ce8ff3
97f80b5
 
 
 
 
 
 
9ce8ff3
97f80b5
 
9ce8ff3
97f80b5
 
 
 
 
9595346
 
 
 
 
97f80b5
9595346
97f80b5
9595346
 
97f80b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9595346
 
 
97f80b5
 
 
 
 
 
 
 
 
 
9ce8ff3
97f80b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9ce8ff3
97f80b5
 
9595346
97f80b5
 
 
 
 
 
9595346
97f80b5
 
 
 
 
 
9595346
 
97f80b5
 
 
9595346
97f80b5
9595346
97f80b5
9595346
97f80b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9595346
97f80b5
9595346
97f80b5
 
9595346
 
 
 
97f80b5
9595346
97f80b5
 
 
 
 
 
 
9595346
 
97f80b5
9595346
97f80b5
 
 
 
9595346
97f80b5
9595346
97f80b5
 
 
 
 
 
9595346
 
 
97f80b5
 
 
9595346
 
97f80b5
 
 
9595346
97f80b5
9595346
 
 
97f80b5
9595346
 
97f80b5
9595346
97f80b5
9595346
 
97f80b5
 
 
9595346
 
 
 
97f80b5
 
9595346
 
97f80b5
9595346
 
 
 
 
 
 
 
97f80b5
9595346
 
 
97f80b5
9ce8ff3
97f80b5
9595346
 
 
97f80b5
 
9595346
97f80b5
 
 
 
 
 
 
 
9595346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97f80b5
 
 
 
 
 
 
9595346
 
 
 
97f80b5
9595346
 
 
 
97f80b5
 
 
 
 
9595346
97f80b5
 
9595346
97f80b5
 
 
9595346
97f80b5
 
 
 
 
 
 
 
9595346
 
 
 
 
 
 
 
 
 
 
9ce8ff3
 
97f80b5
 
9595346
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
import asyncio
import json
import logging
import os
import re
import subprocess
import tempfile
import time
import uuid
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List, AsyncGenerator

from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field

import openai # For your custom API
import google.generativeai as genai # For Gemini API
from google.generativeai.types import GenerationConfig

# --- Logging Configuration ---
# Root-logger setup: timestamped INFO-level output shared by the whole app.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)  # module-level logger used throughout this file

# --- Configuration ---
# Fallback base URL / model for the custom OpenAI-compatible API; both can be
# overridden at runtime via CUSTOM_API_BASE_URL / CUSTOM_API_MODEL env vars.
CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash-latest"  # NOTE(review): not referenced in this chunk — confirm before removing
GEMINI_REQUEST_TIMEOUT_SECONDS = 300  # per-request timeout forwarded to Gemini
MAX_TRANSCRIPT_CHARS = 750000  # fetched transcripts are truncated to this length
COOKIES_FILE_PATH = "private.txt"  # yt-dlp cookies file; transcript fetch fails without it

# --- In-Memory Task Storage ---
# Maps task_id -> task record (status/result/error/timestamps). Not persisted;
# all tasks are lost on process restart.
tasks_db: Dict[str, Dict[str, Any]] = {}

# --- Pydantic Models ---
class ChatPayload(BaseModel):
    """Request body for POST /chat."""
    # Prompt text forwarded verbatim to the custom model.
    message: str
    # Sampling temperature; validated to lie within [0.0, 1.0].
    temperature: float = Field(0.6, ge=0.0, le=1.0)

class SummarizeRequest(BaseModel):
    """Request body for POST /summarize (the 'Fast Summary' endpoint)."""
    # YouTube video URL to summarize.
    url: str
    # Instruction prepended to the transcript before it is sent to the model.
    prompt: str = "Please summarize the following video transcript."
    # Sampling temperature; validated to lie within [0.0, 1.0].
    temperature: float = Field(0.6, ge=0.0, le=1.0)

class GeminiTaskRequest(BaseModel):
    """Request body for POST /gemini/submit_task (bring-your-own-key flow)."""
    # YouTube video URL to process.
    url: str
    # User instruction sent alongside the transcript (or the video URL itself).
    message: str
    # Gemini model name chosen by the caller.
    gemini_model: str
    # Never stored in tasks_db (see submit_gemini_task's model_dump exclude).
    api_key: str = Field(..., description="User's Gemini API Key")

class TaskSubmissionResponse(BaseModel):
    """Response for a successful task submission: ID, status, and polling URL."""
    task_id: str
    status: str
    # Absolute URL the client polls for task progress/results.
    task_detail_url: str

class TaskStatusResponse(BaseModel):
    """Response for GET /gemini/task/{task_id}: current task state."""
    task_id: str
    # One of PENDING / PROCESSING / COMPLETED / FAILED (see background worker).
    status: str
    submitted_at: datetime
    last_updated_at: datetime
    # Populated only when status is COMPLETED.
    result: Optional[str] = None
    # Populated only when status is FAILED.
    error: Optional[str] = None

# --- FastAPI App Initialization ---
# Single shared application instance; all route decorators below attach to it.
app = FastAPI(
    title="YouTube Summarizer Backend",
    description="A dual-endpoint API for video summarization. Provides a 'Fast Summary' using a custom model and an advanced 'Gemini Summary' with user-provided keys.",
    version="4.0.0"
)

# --- Helper Functions ---
def is_youtube_url(url: Optional[str]) -> bool:
    """Return True when *url* plausibly points at a YouTube video.

    Accepts youtube.com, youtu.be, and youtube-nocookie.com hosts with an
    optional scheme/www prefix, requiring an 11-character ID-like segment.
    None and the empty string are rejected.
    """
    if not url:
        return False
    pattern = re.compile(
        r'(https?://)?(www\.)?'
        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|shorts/|.+\?v=)?([^&=%\?]{11})'
    )
    return bool(pattern.match(url))

def extract_video_id(url: str) -> Optional[str]:
    """Pull the 11-character video ID out of a YouTube URL.

    Returns None when the URL does not pass is_youtube_url or when no
    pattern captures an ID.
    """
    if not is_youtube_url(url):
        return None
    candidate_patterns = (
        r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
        r'(?:embed\/|v\/|shorts\/)([0-9A-Za-z_-]{11}).*',
        r'youtu\.be\/([0-9A-Za-z_-]{11}).*',
    )
    for candidate in candidate_patterns:
        found = re.search(candidate, url)
        if found is not None:
            return found.group(1)
    return None

def parse_vtt_content(vtt_content: str) -> str:
    """Extract plain transcript text from raw VTT subtitle content.

    Drops the WEBVTT header, NOTE lines, cue timing rows ("a --> b"),
    bare cue numbers, and inline markup tags; collapses internal
    whitespace; and de-duplicates repeated lines (common in
    auto-generated captions). Returns the surviving lines joined by
    single spaces.
    """
    collected = []
    already_seen = set()
    for raw in vtt_content.split('\n'):
        stripped = raw.strip()
        # Structural/metadata lines carry no transcript text.
        if (
            not stripped
            or stripped.startswith(('WEBVTT', 'NOTE'))
            or '-->' in stripped
            or stripped.isdigit()
        ):
            continue
        # Strip inline tags first, then normalize runs of whitespace.
        text = re.sub(r'\s+', ' ', re.sub(r'<[^>]+>', '', stripped)).strip()
        if text and text not in already_seen:
            already_seen.add(text)
            collected.append(text)
    return ' '.join(collected)

async def get_transcript_with_yt_dlp_cookies(video_id: str, task_id: str) -> Optional[str]:
    """
    Fetches transcript using yt-dlp, downloading the default available language.

    Args:
        video_id: The 11-character YouTube video ID.
        task_id: Correlation ID used only for log messages.

    Returns:
        Parsed transcript text truncated to MAX_TRANSCRIPT_CHARS, or None on
        any failure: missing cookies file, nonzero yt-dlp exit, no .vtt file
        produced, empty parse result, or an unexpected exception.
    """
    logger.info(f"[{task_id}] Attempting transcript fetch for video ID: {video_id}.")
    if not os.path.exists(COOKIES_FILE_PATH):
        logger.error(f"[{task_id}] Cookies file not found at {COOKIES_FILE_PATH}.")
        return None

    try:
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        # Subtitle files land in a temp dir that is removed automatically.
        with tempfile.TemporaryDirectory() as temp_dir:
            cmd = [
                "yt-dlp",
                "--skip-download",
                "--write-auto-subs",  # also accept auto-generated captions
                "--write-subs",
                "--sub-format", "vtt",
                "--cookies", COOKIES_FILE_PATH,
                "-o", os.path.join(temp_dir, "%(id)s.%(ext)s"),
                video_url
            ]

            # Run the blocking subprocess off the event loop; hard 60s cap.
            result = await asyncio.to_thread(subprocess.run, cmd, capture_output=True, text=True, timeout=60)
            if result.returncode != 0:
                logger.error(f"[{task_id}] yt-dlp failed. Stderr: {result.stderr}")
                return None

            downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.vtt')]
            if not downloaded_files:
                logger.warning(f"[{task_id}] No subtitle files found for video ID: {video_id}")
                return None

            # NOTE(review): when several subtitle languages are downloaded, the
            # first directory entry is picked arbitrarily — confirm intent.
            subtitle_file_path = os.path.join(temp_dir, downloaded_files[0])
            with open(subtitle_file_path, 'r', encoding='utf-8') as f:
                subtitle_content = f.read()

            transcript_text = parse_vtt_content(subtitle_content)
            if not transcript_text:
                return None

            logger.info(f"[{task_id}] Transcript fetched. Length: {len(transcript_text)}")
            return transcript_text[:MAX_TRANSCRIPT_CHARS]
    except Exception as e:
        logger.error(f"[{task_id}] Error fetching transcript: {e}", exc_info=True)
        return None

# --- Internal Business Logic ---

async def stream_custom_api_response(message: str, temperature: float) -> AsyncGenerator[str, None]:
    """
    Reusable async generator to stream responses from the custom API.

    Yields raw text chunks on success. On failure, yields a single JSON
    object string of the form {"error": "..."} so consumers can detect
    in-band error payloads.

    Args:
        message: User prompt sent as a single-turn chat message.
        temperature: Sampling temperature forwarded to the model.
    """
    custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY")
    custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT)
    custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT)

    if not custom_api_key_secret:
        logger.error("'CUSTOM_API_SECRET_KEY' is not configured.")
        # json.dumps of this static dict is byte-identical to the previous
        # hand-written literal, so existing consumers see no change.
        yield json.dumps({"error": "Backend API key is not configured."})
        return

    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI(api_key=custom_api_key_secret, base_url=custom_api_base_url, timeout=60.0)
        stream = await client.chat.completions.create(
            model=custom_api_model,
            temperature=temperature,
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        logger.error(f"Error streaming from Custom API: {e}", exc_info=True)
        # BUGFIX: the old f-string interpolation produced invalid JSON whenever
        # the exception text contained quotes, backslashes, or newlines;
        # json.dumps escapes the message correctly.
        yield json.dumps({"error": f"Error processing with the summary model: {str(e)}"})

async def process_gemini_request_background(task_id: str, request_data: GeminiTaskRequest):
    """
    Background process for the Gemini endpoint. Fetches transcript if possible,
    otherwise uses the URL, and calls the Gemini API with the user's key.

    Mutates tasks_db[task_id] in place: status moves PROCESSING ->
    COMPLETED (with "result" set) or FAILED (with "error" set), and
    "last_updated_at" is refreshed on every exit path via finally.
    """
    tasks_db[task_id]["status"] = "PROCESSING"
    tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
    logger.info(f"[{task_id}] Starting background Gemini processing for URL: {request_data.url}")

    video_id = extract_video_id(request_data.url)
    # Transcript fetch is best-effort; None falls through to multimodal mode.
    transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, task_id) if video_id else None

    try:
        # NOTE(review): genai.configure sets a client-wide API key; concurrent
        # tasks carrying different user keys may race — confirm library behavior.
        genai.configure(api_key=request_data.api_key)
        model_instance = genai.GenerativeModel(model_name=request_data.gemini_model)

        content_parts = [{"text": request_data.message}]
        if transcript_text:
            logger.info(f"[{task_id}] Sending transcript to Gemini.")
            content_parts.append({"text": f"\n\nVideo Transcript:\n{transcript_text}"})
        else:
            logger.info(f"[{task_id}] No transcript found. Sending video URL to Gemini for multimodal processing.")
            # Fall back to letting Gemini ingest the video directly by URL.
            content_parts.append({"file_data": {"mime_type": "video/youtube", "file_uri": request_data.url}})

        response = await model_instance.generate_content_async(
            [{"parts": content_parts}],
            stream=False,
            request_options={"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}
        )

        # getattr guards against responses that expose no .text attribute.
        full_response_text = getattr(response, 'text', '')
        tasks_db[task_id]["status"] = "COMPLETED"
        tasks_db[task_id]["result"] = full_response_text
        logger.info(f"[{task_id}] Gemini processing successful.")

    except Exception as e:
        logger.error(f"[{task_id}] Error during Gemini processing: {e}", exc_info=True)
        tasks_db[task_id]["status"] = "FAILED"
        tasks_db[task_id]["error"] = str(e)

    finally:
        tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)

# --- API Endpoints ---

@app.post("/summarize", response_class=StreamingResponse)
async def fast_summary(request_data: SummarizeRequest):
    """
    Endpoint for a fast summary using the built-in custom model.
    Requires a transcript to be available for the video; responds 400 for
    bad URLs, 404 when no captions exist, otherwise streams plain text.
    """
    trace_id = f"summary-{uuid.uuid4()}"  # temporary ID used only for log correlation
    logger.info(f"[{trace_id}] Received request for Fast Summary: {request_data.url}")

    if not is_youtube_url(request_data.url):
        return JSONResponse(status_code=400, content={"detail": "Invalid YouTube URL provided."})

    video_id = extract_video_id(request_data.url)
    if video_id is None:
        return JSONResponse(status_code=400, content={"detail": "Could not extract video ID from URL."})

    transcript_text = await get_transcript_with_yt_dlp_cookies(video_id, trace_id)
    if not transcript_text:
        logger.warning(f"[{trace_id}] No transcript found for Fast Summary.")
        return JSONResponse(
            status_code=404,
            content={
                "detail": "A transcript could not be found for this video. The 'Fast Summary' feature requires available captions. Please try the 'Advanced Summary' with a Gemini model."
            },
        )

    logger.info(f"[{trace_id}] Transcript found, starting summary stream.")
    combined_prompt = f"{request_data.prompt}\n\nTranscript:\n{transcript_text}"
    return StreamingResponse(
        stream_custom_api_response(combined_prompt, request_data.temperature),
        media_type="text/plain; charset=utf-8",
    )

@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse)
async def submit_gemini_task(
    request_data: GeminiTaskRequest,
    background_tasks: BackgroundTasks,
    http_request: Request
):
    """
    Endpoint for 'Bring-Your-Own-Key' Gemini summarization.
    Accepts a user's API key and model choice, and works whether a transcript is available or not.
    Registers a PENDING task record, schedules the background worker, and
    returns a polling URL for the new task.
    """
    task_id = str(uuid.uuid4())
    logger.info(f"[{task_id}] Received request for Gemini Summary. Model: {request_data.gemini_model}")

    if not is_youtube_url(request_data.url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL provided.")

    tasks_db[task_id] = {
        "status": "PENDING",
        "result": None,
        "error": None,
        "submitted_at": datetime.now(timezone.utc),
        "last_updated_at": datetime.now(timezone.utc),
        # The user's API key is deliberately kept out of the stored record.
        "request_params": request_data.model_dump(exclude={'api_key'}),
    }

    background_tasks.add_task(process_gemini_request_background, task_id, request_data)

    return TaskSubmissionResponse(
        task_id=task_id,
        status="PENDING",
        task_detail_url=str(http_request.url_for('get_gemini_task_status', task_id=task_id)),
    )

@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse)
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")):
    """Report the current state of a previously submitted Gemini task."""
    record = tasks_db.get(task_id)
    if not record:
        raise HTTPException(status_code=404, detail="Task ID not found.")
    return TaskStatusResponse(task_id=task_id, **record)

@app.post("/chat", response_class=StreamingResponse)
async def direct_chat(payload: ChatPayload):
    """
    A general-purpose chat endpoint that streams responses from the custom model.
    """
    logger.info(f"Direct chat request: '{payload.message[:50]}...'")
    token_stream = stream_custom_api_response(payload.message, payload.temperature)
    return StreamingResponse(token_stream, media_type="text/plain; charset=utf-8")

@app.get("/")
async def read_root():
    """Root endpoint returning a fixed payload.

    NOTE(review): the "Server Error 504." text looks like deliberate
    camouflage for unauthenticated probes — confirm before changing it.
    """
    payload = {"message": "Server Error 504."}
    return payload