|
import asyncio
import gc
import json
import logging
import mimetypes
import os
import re
import secrets
import subprocess
import sys
import tempfile
import time
import urllib.parse
from collections import defaultdict
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Optional, Union

import cloudscraper
import ffmpeg
import httpx
import requests
import schedule
import uvicorn
import yt_dlp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, BackgroundTasks, Request, HTTPException, Security
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse
from fastapi.security import APIKeyHeader
from fastapi.staticfiles import StaticFiles
from PIL import Image
from pydantic import BaseModel
from starlette.responses import JSONResponse
|
|
|
# Base URL that is prefixed to "/file/<name>" when download links are built.
BASE_URL = "https://chrunos-zam.hf.space"

# Path of the system temporary directory, resolved once at import time.
tmp_dir = tempfile.gettempdir()
|
|
|
|
|
def env_to_cookies(env_content: str, output_file: str) -> None:
    """Write the cookie text held in a ``NAME="value"`` string to a file.

    Everything after the first ``="`` marker is treated as the value.
    Double quotes at either end of the value are stripped and every escaped
    newline (a backslash followed by ``n``) is turned into a real newline
    before the text is written to ``output_file``.

    Args:
        env_content: Text in ``NAME="value"`` form.
        output_file: Path of the cookie file to (over)write.

    Raises:
        ValueError: If the ``="`` marker is absent or the file cannot be
            written; every failure is re-raised as ``ValueError``.
    """
    try:
        _, marker, quoted_value = env_content.partition('="')
        if not marker:
            raise ValueError("Invalid env content format")

        raw_value = quoted_value.strip('"')
        cookie_text = raw_value.replace('\\n', '\n')

        with open(output_file, 'w') as cookie_handle:
            cookie_handle.write(cookie_text)
    except Exception as e:
        raise ValueError(f"Error converting to cookie file: {str(e)}")
|
|
|
def save_to_env_file(env_content: str, env_file: str = '.env') -> None:
    """Overwrite ``env_file`` with ``env_content``.

    Args:
        env_content: Text to store.
        env_file: Destination path; defaults to ``.env`` in the working
            directory.

    Raises:
        ValueError: If the file cannot be opened or written; the underlying
            error message is embedded in the text.
    """
    try:
        with open(env_file, mode='w') as env_handle:
            env_handle.write(env_content)
    except Exception as e:
        raise ValueError(f"Error saving to env file: {str(e)}")
|
|
|
def env_to_cookies_from_env(output_file: str) -> None:
    """Materialise the ``FIREFOX_COOKIES`` environment value as a cookie file.

    Loads ``.env`` via ``load_dotenv``, reads ``FIREFOX_COOKIES`` and hands it
    to ``env_to_cookies`` wrapped in ``FIREFOX_COOKIES="..."`` form.

    Args:
        output_file: Path of the cookie file to write.

    Raises:
        ValueError: If the variable is missing/empty or the conversion fails.
    """
    try:
        load_dotenv()
        stored_cookies = os.getenv('FIREFOX_COOKIES')

        if stored_cookies:
            env_to_cookies(f'FIREFOX_COOKIES="{stored_cookies}"', output_file)
        else:
            raise ValueError("FIREFOX_COOKIES not found in .env file")
    except Exception as e:
        raise ValueError(f"Error converting to cookie file: {str(e)}")
|
|
|
def get_cookies():
    """Return the raw ``FIREFOX_COOKIES`` value after loading ``.env``.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    load_dotenv()
    stored_value = os.getenv('FIREFOX_COOKIES')

    if stored_value:
        return stored_value
    raise ValueError("FIREFOX_COOKIES environment variable not set")
|
|
|
def create_temp_cookie_file():
    """Write the ``FIREFOX_COOKIES`` value to a new temporary ``.txt`` file.

    Escaped newlines (a backslash followed by ``n``) in the stored value are
    expanded to real newlines before writing.

    Returns:
        Path: Location of the temporary cookie file. It is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        ValueError: Propagated from ``get_cookies`` when the variable is not
            set.
    """
    temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt')
    cookie_path = Path(temp_cookie.name)
    try:
        with temp_cookie:
            cookie_content = get_cookies().replace('\\n', '\n')
            # The content must actually be passed to write(); calling it with
            # no argument raises TypeError and never produces a cookie file.
            temp_cookie.write(cookie_content)
    except Exception:
        # Do not leave an empty/partial file behind when nothing usable was
        # written (the file was created with delete=False).
        if cookie_path.exists():
            cookie_path.unlink()
        raise
    return cookie_path
|
|
|
# Populate os.environ from a local .env file before any setting is read.
load_dotenv()

# The ASGI application object that all routes below are registered on.
app = FastAPI()
|
|
|
|
|
|
|
@app.get('/')
def main():
    """Root endpoint: return a fixed status/contact message."""
    status_message = "API Is Running. If you want to use this API, contact Cody from chrunos.com"
    return status_message
|
|
|
@app.get("/get_video_url")
async def get_video_url(youtube_url: str):
    """Return yt-dlp's metadata for ``youtube_url`` without downloading it.

    The Firefox cookie file is regenerated from the environment and handed to
    yt-dlp through the ``cookiefile`` option.

    Args:
        youtube_url: URL passed as a query parameter.

    Returns:
        The info structure produced by ``YoutubeDL.extract_info``.

    Raises:
        HTTPException: 500 with the underlying error text on any failure.
    """
    cookiefile = "firefox-cookies.txt"

    try:
        env_to_cookies_from_env(cookiefile)

        # The options dict has to be created here; nothing else in the module
        # defines one for this endpoint.
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'cookiefile': cookiefile,
        }

        def _extract():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(youtube_url, download=False)

        # extract_info performs blocking network I/O; run it in the thread
        # pool so the event loop stays responsive.
        return await run_in_threadpool(_extract)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
# Fresh temporary directory, created at import time, that receives every
# downloaded file and is later served through the "/file" static mount.
global_download_dir = tempfile.mkdtemp()
|
|
|
|
|
|
|
class RateLimiter:
    """In-memory sliding-window request limiter keyed by client IP.

    Only accepted requests are recorded. A client that is already over the
    limit therefore neither extends its own lock-out by retrying nor grows
    the stored timestamp list.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        """Store the limit and window.

        Args:
            max_requests: Number of requests allowed per window.
            time_window: Length of the sliding window.
        """
        self.max_requests = max_requests
        self.time_window = time_window
        # Maps an IP string to the time.time() stamps of its accepted requests.
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        current_time = time.time()
        window_seconds = self.time_window.total_seconds()
        # .get() avoids creating a defaultdict entry for IPs never seen.
        recent = [
            timestamp for timestamp in self.requests.get(user_ip, ())
            if current_time - timestamp < window_seconds
        ]
        if recent:
            self.requests[user_ip] = recent
        else:
            # Drop empty entries so the mapping does not accumulate one key
            # per IP address forever.
            self.requests.pop(user_ip, None)

    def is_rate_limited(self, user_ip: str) -> bool:
        """Check if the user has exceeded their rate limit.

        Records the current request when it is allowed.

        Returns:
            True when the request must be rejected, False when it was
            accepted and counted.
        """
        self._cleanup_old_requests(user_ip)

        if len(self.requests.get(user_ip, ())) >= self.max_requests:
            return True

        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests.get(user_ip, ()))
|
|
|
|
|
|
|
# Shared limiter instance: 20 requests per client IP per day.
rate_limiter = RateLimiter(max_requests=20, time_window=timedelta(days=1))
|
|
|
def get_user_ip(request: Request) -> str:
    """Return the client address used as the rate-limit key.

    The first entry of the ``X-Forwarded-For`` header wins when the header is
    present; otherwise the address of the direct peer is used.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The address string, or ``"unknown"`` when neither source provides one.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Entries are comma separated and may carry surrounding spaces;
        # strip so the same client always maps to the same key.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

    # request.client can be None (no peer information available).
    client = request.client
    if client is None:
        return "unknown"
    return client.host
|
|
|
|
|
class ApiRotator:
    """Keeps a list of API endpoints and remembers which one worked last."""

    def __init__(self, apis):
        """Store the endpoint list; no endpoint is preferred initially."""
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the endpoints with the last successful one moved to the front.

        When no success has been recorded the stored list itself is returned;
        otherwise a new list is built and the stored list is left untouched.
        """
        preferred_index = self.last_successful_index
        if preferred_index is None:
            return self.apis

        preferred = self.apis[preferred_index]
        remaining = self.apis[:preferred_index] + self.apis[preferred_index + 1:]
        return [preferred, *remaining]

    def update_last_successful(self, index):
        """Record ``index`` (a position in ``self.apis``) as the last success."""
        self.last_successful_index = index
|
|
|
|
|
# Shared rotator over the candidate download API endpoints, tried in this
# order until one of them has succeeded.
api_rotator = ApiRotator(
    [
        "https://dwnld.nichind.dev",
        "https://cobalt-api.kwiatekmiki.com",
        "https://yt.edd1e.xyz/",
        "https://cobalt-api.ayo.tf",
        "https://cblt.fariz.dev",
    ]
)
|
|
|
|
|
|
|
async def get_track_download_url(video_url: str) -> Union[str, Dict[str, str]]:
    """Ask each configured download API in turn for a direct URL.

    The APIs are tried in the order given by ``api_rotator``. The first JSON
    response containing a ``url`` key wins and that endpoint is remembered as
    the preferred one. An ``error.api.content.video.unavailable`` error code
    stops the search early.

    Args:
        video_url: URL of the video to resolve.

    Returns:
        The download URL string on success, otherwise the dict
        ``{"error": "Download URL not found"}``.
    """
    apis = api_rotator.get_prioritized_apis()
    session = cloudscraper.create_scraper()
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    payload = {"url": video_url, "videoQuality": "720", "filenameStyle": "pretty"}

    for api_url in apis:
        try:
            logger.info(f"Attempting to get download URL from: {api_url}")
            # session.post blocks; run it in the thread pool so the event
            # loop is not stalled for up to the 20 s timeout.
            response = await run_in_threadpool(
                session.post,
                api_url,
                timeout=20,
                json=payload,
                headers=headers,
            )
            logger.info(f"Response status: {response.status_code}")
            logger.info(f"Response content: {response.content}")

            if response.headers.get('content-type', '').startswith('application/json'):
                json_response = response.json()
                # "error" is only inspected when it is an object; any other
                # shape is treated as "no error code".
                error_info = json_response.get("error")
                error_code = error_info.get("code", "") if isinstance(error_info, dict) else ""

                if error_code == "error.api.content.video.unavailable":
                    logger.warning(f"Video unavailable error from {api_url}")
                    break

                if "url" in json_response:
                    # `apis` may be a re-ordered copy, so the loop position is
                    # not a valid index into api_rotator.apis; look up the
                    # endpoint's real position before recording it.
                    api_rotator.update_last_successful(api_rotator.apis.index(api_url))
                    return json_response["url"]

        except Exception as e:
            logger.error(f"Failed with {api_url}: {str(e)}")
            continue

    logger.error(f"No download URL found")
    return {"error": "Download URL not found"}
|
|
|
|
|
def jio_search(query: str, quality: str) -> str:
    """Search saavn.dev for ``query`` and return a matching download URL.

    The search results are scanned in order and the first ``downloadUrl``
    entry whose ``quality`` equals the requested one is returned.

    Args:
        query: Free-text song search.
        quality: Quality label to match against each download entry.

    Returns:
        The ``url`` of the first matching download entry.

    Raises:
        HTTPException: 404 when no data, no results or no matching quality is
            found; 503 on a Cloudflare challenge failure; 500 on any other
            error.
    """
    try:
        session = cloudscraper.create_scraper()

        # Pass the search text via `params` so that spaces, '&', '#', etc.
        # are percent-encoded instead of corrupting the query string. The
        # timeout keeps a stalled upstream from hanging the request.
        response = session.get(
            "https://saavn.dev/api/search/songs",
            params={"query": query},
            timeout=20,
        )
        response.raise_for_status()

        data = response.json().get("data")
        if not data:
            logger.error("No data found in the response.")
            raise HTTPException(status_code=404, detail="No data found for the given query.")

        song_results = data.get("results")
        if not song_results:
            logger.error("No song results found in the response.")
            raise HTTPException(status_code=404, detail="No song results found for the given query.")

        for song in song_results:
            for download_url in song.get("downloadUrl") or []:
                if download_url.get("quality") == quality:
                    return download_url.get("url")

        logger.error(f"No download URL found for quality {quality} in the search results for query {query}.")
        raise HTTPException(status_code=404, detail=f"No download URL found for quality {quality} in the search results for query {query}.")
    except cloudscraper.exceptions.CloudflareChallengeError as cf_err:
        logger.error(f"Cloudflare challenge error while searching for {query}: {cf_err}")
        raise HTTPException(status_code=503, detail="Cloudflare challenge failed")
    except HTTPException:
        # Already the response we want to send; do not wrap it in a 500.
        raise
    except Exception as e:
        logger.error(f"Error while searching for {query}: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred while searching: {str(e)}")
|
|
|
|
|
def jio_fetch(url: str, quality: str) -> str:
    """Resolve a song link through saavn.dev and return a download URL.

    Only the first song in the response is considered.

    Args:
        url: Song link forwarded to the API as the ``link`` parameter.
        quality: Quality label to match against each download entry.

    Returns:
        The ``url`` of the first download entry with the requested quality.

    Raises:
        HTTPException: 404 when no song data, no download entries or no
            matching quality is found; 503 on a Cloudflare challenge failure;
            500 on any other error.
    """
    try:
        session = cloudscraper.create_scraper()

        # The link is itself a URL and may contain '?', '&' or '#'; sending
        # it via `params` percent-encodes it so it arrives intact.
        response = session.get(
            "https://saavn.dev/api/songs",
            params={"link": url},
            timeout=20,
        )
        response.raise_for_status()

        song_data = response.json().get("data")
        if not song_data:
            logger.error("No data found in the response.")
            raise HTTPException(status_code=404, detail="No data found for the given URL.")

        download_urls = song_data[0].get("downloadUrl")
        if not download_urls:
            logger.error("No download URLs found in the response.")
            raise HTTPException(status_code=404, detail="No download URLs found for the given song.")

        for download_url in download_urls:
            if download_url.get("quality") == quality:
                return download_url.get("url")

        logger.error(f"No download URL found for quality {quality}.")
        raise HTTPException(status_code=404, detail=f"No download URL found for quality {quality}.")
    except cloudscraper.exceptions.CloudflareChallengeError as cf_err:
        logger.error(f"Cloudflare challenge error while fetching {url}: {cf_err}")
        raise HTTPException(status_code=503, detail="Cloudflare challenge failed")
    except HTTPException:
        # Already the response we want to send; do not wrap it in a 500.
        raise
    except Exception as e:
        logger.error(f"Error while fetching {url}: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred while fetching: {str(e)}")
|
|
|
|
|
class JioDownloadRequest(BaseModel):
    """Request body shared by the ``/jio_dl`` and ``/jio_dls`` endpoints.

    All fields are optional. The handlers use ``url`` together with
    ``quality`` when both are given and fall back to ``query`` otherwise.
    """

    # Declared Optional so that both an omitted field and an explicit JSON
    # null validate; a bare `str = None` does not state that intent.
    url: Optional[str] = None  # song link to resolve
    query: Optional[str] = None  # free-text search used when no url is given
    quality: Optional[str] = None  # quality label matched against download entries
|
|
|
@app.post("/jio_dl")
async def jio_download(request: JioDownloadRequest):
    """Resolve a download URL by link (``url`` + ``quality``) or by search.

    Args:
        request: Parsed request body.

    Returns:
        ``{"download_url": <url>}``.

    Raises:
        HTTPException: 400 when neither ``url`` + ``quality`` nor ``query`` is
            supplied; errors raised by ``jio_fetch`` / ``jio_search`` are
            passed through; anything else becomes a 500.
    """
    try:
        url, query, quality = request.url, request.query, request.quality

        if url and quality:
            logger.info(f'input url: {url}')
            # jio_fetch performs blocking HTTP; keep it off the event loop.
            download_url = await run_in_threadpool(jio_fetch, url, quality)
            return {"download_url": download_url}

        if query:
            logger.info(f'input query: {query}')
            # jio_search performs blocking HTTP; keep it off the event loop.
            download_url = await run_in_threadpool(jio_search, query, quality)
            return {"download_url": download_url}

        logger.error("Missing 'url' and 'quality' or 'query' in request data.")
        raise HTTPException(status_code=400, detail="Missing 'url' and 'quality' or 'query' in request data")
    except HTTPException:
        # Keep the status code chosen above or by the helper.
        raise
    except Exception as e:
        logger.error(f"Error in jio_download: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred during the operation: {str(e)}")
|
|
|
|
|
# NOTE(review): this function reuses the name of the "/jio_dl" handler and
# therefore replaces it at module level. Both routes stay registered because
# the decorator captures each function when it is applied, but a distinct
# name would be clearer — confirm nothing imports `jio_download` before
# renaming.
@app.post("/jio_dls")
async def jio_download(request: JioDownloadRequest):
    """Like ``/jio_dl`` but refuses the ``320kbps`` quality.

    Args:
        request: Parsed request body.

    Returns:
        ``{"download_url": <url>}`` on success, or an ``error``/``premium``
        dict when ``320kbps`` is requested.

    Raises:
        HTTPException: 400 when neither ``url`` + ``quality`` nor ``query`` is
            supplied; errors raised by ``jio_fetch`` / ``jio_search`` are
            passed through; anything else becomes a 500.
    """
    try:
        url, query, quality = request.url, request.query, request.quality

        if quality == '320kbps':
            return {
                "error": "Quality 320kbps is for Premium users only",
                "premium": "https://chrunos.com/premium-shortcuts/"
            }

        if url and quality:
            logger.info(f'input url: {url}')
            # jio_fetch performs blocking HTTP; keep it off the event loop.
            download_url = await run_in_threadpool(jio_fetch, url, quality)
            return {"download_url": download_url}

        if query:
            logger.info(f'input query: {query}')
            # jio_search performs blocking HTTP; keep it off the event loop.
            download_url = await run_in_threadpool(jio_search, query, quality)
            return {"download_url": download_url}

        logger.error("Missing 'url' and 'quality' or 'query' in request data.")
        raise HTTPException(status_code=400, detail="Missing 'url' and 'quality' or 'query' in request data")
    except HTTPException:
        # Keep the status code chosen above or by the helper.
        raise
    except Exception as e:
        logger.error(f"Error in jio_download: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred during the operation: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Base URLs of the two format-extraction services, taken from the
# environment; each is None when its variable is not set.
EXTRACT_API = os.environ.get("EXTRACT_API")
ALT_API = os.environ.get("ALT_API")
|
|
|
|
|
def extract_video_info(video_url: str) -> Union[list, dict]:
    """Fetch the available formats for ``video_url`` from the extraction APIs.

    ``ALT_API`` is tried first, then ``EXTRACT_API``. The first HTTP 200
    response decides the result.

    Args:
        video_url: URL of the video to inspect.

    Returns:
        A list of ``{"url", "format_id", "cookies"}`` dicts on success. For
        URLs containing ``pornhub.com`` only formats whose id contains
        ``hls`` are kept. For other URLs a single-format result gets one
        extra placeholder entry appended. A dict with an ``error`` key is
        returned when the response has no ``formats`` or when no API answered
        successfully.
    """
    # Encode the target once: it is a URL itself and would otherwise leak its
    # own '?', '&' and '#' into the API's query string.
    query_string = urllib.parse.urlencode({"url": video_url})
    session = cloudscraper.create_scraper()

    for api_base in (ALT_API, EXTRACT_API):
        if not api_base:
            # Unset environment variable: there is no endpoint to call.
            logger.warning("Extraction API base URL is not configured; skipping")
            continue

        api_url = f"{api_base}?{query_string}"
        logger.info(api_url)
        try:
            response = session.get(api_url, timeout=20)

            if response.status_code != 200:
                logger.warning(f"Request failed with status code {response.status_code}, API: {api_url}")
                continue

            json_response = response.json()
            if 'formats' not in json_response:
                return {"error": "No formats available"}

            # Keep only entries that carry both an id and a URL.
            result = [
                {
                    "url": format_item.get('url'),
                    "format_id": format_item.get('format_id'),
                    "cookies": format_item.get('cookies'),
                }
                for format_item in json_response['formats']
                if format_item.get('format_id') and format_item.get('url')
            ]
            logger.info(json_response.get('title'))

            if "pornhub.com" in video_url:
                return [item for item in result if 'hls' in item['format_id']]

            if len(result) == 1:
                # A lone format is padded with a placeholder entry.
                # NOTE(review): presumably so clients always show a choice
                # list — confirm with the consumer of this API.
                result.append({
                    "format_id": "This is Fake, Don't Choose This One",
                    "url": "none"
                })
            return result
        except Exception as e:
            logger.error(f"An error occurred: {e}")

    return {"error": "Both APIs failed to provide valid results."}
|
|
|
|
|
@app.post("/test")
async def test_download(request: Request):
    """Return the format list that ``extract_video_info`` finds for a URL.

    Expects a JSON body with a ``url`` key.

    Raises:
        HTTPException: 400 when ``url`` is missing or empty.
    """
    data = await request.json()
    video_url = data.get('url')
    if not video_url:
        raise HTTPException(status_code=400, detail="Missing 'url' in request data")

    # extract_video_info performs blocking HTTP requests; run it in the
    # thread pool so the event loop stays free.
    return await run_in_threadpool(extract_video_info, video_url)
|
|
|
|
|
@app.post("/hls")
async def download_hls_video(request: Request):
    """Download the stream given in the JSON body's ``url`` as an MP4.

    The file is stored in ``global_download_dir`` under a name ending in a
    second-resolution timestamp, and a link built from ``BASE_URL`` is
    returned.

    Returns:
        ``{"url": <link>}`` on success, or ``{"error": <message>}`` when the
        download raises or no ``.mp4`` with this request's timestamp exists.
    """
    payload = await request.json()
    hls_url = payload.get('url')

    stamp = datetime.now().strftime('%Y%m%d%H%M%S')
    download_root = Path(global_download_dir)

    ydl_opts = dict(
        format='best',
        outtmpl=str(download_root / f'%(title)s_{stamp}.%(ext)s'),
        quiet=True,
        no_warnings=True,
        noprogress=True,
        merge_output_format='mp4',
    )

    def _run_download():
        return yt_dlp.YoutubeDL(ydl_opts).download([hls_url])

    try:
        # Blocking download runs in the thread pool.
        await run_in_threadpool(_run_download)
    except Exception as e:
        return {"error": f"Download failed: {str(e)}"}

    # The timestamp suffix identifies the file produced by this request.
    matches = list(download_root.glob(f"*_{stamp}.mp4"))
    if matches:
        quoted_name = urllib.parse.quote(matches[0].name)
        link = f"{BASE_URL}/file/{quoted_name}"
        gc.collect()
        return {"url": link}
    return {"error": "Download failed"}
|
|
|
|
|
async def get_audio_download_url(track_id: str, quality: str) -> str:
    """Build the chrunos-shadl download link for a video.

    Args:
        track_id: The video URL (or id) placed in the ``url`` parameter.
        quality: ``'mp3'`` selects an audio download at quality 128; any
            other value selects a video download at that quality.

    Returns:
        The download link with all parameters percent-encoded.
    """
    if quality == 'mp3':
        media_type, quality_value = 'audio', 128
    else:
        media_type, quality_value = 'video', quality

    # urlencode escapes '&', '?', '=' and '#' inside track_id, which would
    # otherwise be read as separate parameters of this link.
    query_string = urllib.parse.urlencode(
        {"url": track_id, "type": media_type, "quality": quality_value}
    )
    return f'https://chrunos-shadl.hf.space/yt/dl?{query_string}'
|
|
|
|
|
@app.post("/maxs")
async def download_high_quality_video(request: Request):
    """Rate-limited video download (up to 720p) for the URL in the JSON body.

    Body keys: ``url`` (required) and ``quality`` (optional, default
    ``'720'``). URLs matching youtube.com / youtu.be are answered with a link
    from ``get_audio_download_url``; everything else is downloaded with
    yt-dlp into ``global_download_dir`` and served from there.

    Returns:
        ``{"url", "requests_remaining"}`` on success, or a dict with an
        ``error`` key (and sometimes ``url``) describing why nothing was
        downloaded.

    Raises:
        HTTPException: 429 when the caller is over the daily limit; 400 when
            ``url`` is missing or ``quality`` is not numeric.
    """
    user_ip = get_user_ip(request)

    if rate_limiter.is_rate_limited(user_ip):
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    data = await request.json()
    restricted_domain = "chrunos.com"
    video_url = data.get('url')
    # JSON clients may send the quality as a number; normalise it to text so
    # the height lookup and the YouTube hand-off see one representation.
    quality = str(data.get('quality', '720'))
    logger.info(f'input url: {video_url}, {quality}')

    # Validate before any string operation touches the URL.
    if not video_url or not isinstance(video_url, str):
        raise HTTPException(status_code=400, detail="Missing 'url' in request data")

    if restricted_domain in video_url:
        return {"error": "What is wrong with you?", "url": "https://t.me/chrunoss"}

    try:
        requested_height = int(quality)
    except ValueError:
        raise HTTPException(status_code=400, detail="'quality' must be a number such as 360, 480 or 720")

    if requested_height > 720:
        error_message = "Quality above 720p is for Premium Members Only. Please check the URL for more information."
        help_url = "https://chrunos.com/premium-shortcuts/"
        return {"error": error_message, "url": help_url}

    # The cookie file is refreshed on every request. It is not passed to
    # yt-dlp below. NOTE(review): confirm whether 'cookiefile' should be part
    # of ydl_opts here, as it is in the /search endpoint.
    env_to_cookies_from_env("firefox-cookies.txt")

    is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
    if is_youtube_url:
        dl_url = await get_audio_download_url(video_url, quality)
        if dl_url and "http" in dl_url:
            return {"url": dl_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
        return {
            "error": "Failed to Fetch the video."
        }

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    # Titles are truncated to 70 characters in the output file name.
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    height_map = {
        '240': 240,
        '360': 360,
        '480': 480,
        '720': 720,
        '1080': 1080
    }
    # Unknown quality labels fall back to 720.
    max_height = height_map.get(quality, 720)

    format_str = f'bestvideo[height<={max_height}][vcodec^=avc]+bestaudio/best'

    ydl_opts = {
        'format': format_str,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4'
    }

    try:
        # Blocking download runs in the thread pool.
        await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
    except Exception as e:
        # Report download problems in the endpoint's usual error shape
        # instead of letting them surface as an unhandled 500.
        logger.error(f"Download failed for {video_url}: {e}")
        return {"error": f"Download failed: {str(e)}"}

    # The timestamp suffix identifies the file produced by this request.
    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
    if not downloaded_files:
        return {"error": "Download failed"}

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{BASE_URL}/file/{encoded_filename}"

    gc.collect()

    return {"url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Expected API key, read from the environment; None when it is not set.
API_KEY = os.environ.get("API_KEY")

# Security scheme that reads the caller's key from the "X-API-Key" header.
api_key_header = APIKeyHeader(name="X-API-Key")
|
|
|
|
|
async def verify_api_key(api_key: str = Security(api_key_header)):
    """Dependency that accepts a request only when its API key matches.

    Args:
        api_key: Value of the ``X-API-Key`` header.

    Returns:
        The supplied key when it equals ``API_KEY``.

    Raises:
        HTTPException: 403 when the key is wrong or when no key is configured
            on the server.
    """
    # Fail closed when the server has no key configured, and compare in
    # constant time so response timing does not reveal how much of a guessed
    # key is correct. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    key_matches = bool(api_key) and bool(API_KEY) and secrets.compare_digest(
        api_key.encode("utf-8"), API_KEY.encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(
            status_code=403,
            detail="Invalid API key"
        )
    return api_key
|
|
|
@app.post("/audio")
async def download_audio(
    request: Request
):
    """Rate-limited MP3 download for the URL in the JSON body.

    URLs matching youtube.com / youtu.be are answered with a link from
    ``get_audio_download_url``; everything else is downloaded with yt-dlp,
    converted to MP3 and served from ``global_download_dir``.

    Returns:
        ``{"url", "requests_remaining"}`` on success, or ``{"error": ...}``
        when nothing could be fetched.

    Raises:
        HTTPException: 429 when the caller is over the daily limit; 400 when
            ``url`` is missing.
    """
    user_ip = get_user_ip(request)

    if rate_limiter.is_rate_limited(user_ip):
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    data = await request.json()
    video_url = data.get('url')

    # Validate before the regular expression below touches the URL.
    if not video_url or not isinstance(video_url, str):
        raise HTTPException(status_code=400, detail="Missing 'url' in request data")

    is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
    if is_youtube_url:
        dl_url = await get_audio_download_url(video_url, 'mp3')
        if dl_url and "http" in dl_url:
            return {"url": dl_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
        return {
            "error": "Failed to Fetch the video."
        }

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    # Titles are truncated to 70 characters in the output file name.
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192'
        }]
    }

    try:
        # Blocking download runs in the thread pool.
        await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
    except Exception as e:
        # Report download problems in the endpoint's usual error shape
        # instead of letting them surface as an unhandled 500.
        logger.error(f"Download failed for {video_url}: {e}")
        return {"error": f"Download failed: {str(e)}"}

    # The timestamp suffix identifies the file produced by this request.
    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.*"))
    if not downloaded_files:
        return {"error": "Download failed"}

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{BASE_URL}/file/{encoded_filename}"
    gc.collect()
    return {"url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
|
|
|
|
|
# Configure the root logger at INFO level and create this module's logger.
# The functions defined earlier in the file look `logger` up only when they
# are called, so defining it here is sufficient.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
@app.post("/search")
async def search_and_download_song(request: Request,
                                   api_key: str = Security(verify_api_key)
                                   ):
    """Search YouTube for a song and return a link to it as an MP3.

    Requires a valid API key. Body keys: ``songname`` (required) and
    ``artist`` (optional). The search runs through yt-dlp's ``ytsearch:``
    prefix using the Firefox cookie file, and the audio is converted to MP3.

    Returns:
        JSONResponse: ``{"url": <link>}`` with status 200, or
        ``{"error": <message>}`` with status 400 (missing ``songname``) or
        500 (download failure).
    """
    data = await request.json()
    song_name = data.get('songname')
    artist_name = data.get('artist')

    if not song_name:
        # Without a title the query would literally search for "None".
        return JSONResponse(content={"error": "Missing 'songname' in request data"}, status_code=400)

    if artist_name:
        search_query = f"ytsearch:{song_name} {artist_name}"
    else:
        search_query = f"ytsearch:{song_name}"

    logger.info(f"Search query: {search_query}")
    cookiefile = "firefox-cookies.txt"
    env_to_cookies_from_env(cookiefile)

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    # Titles are truncated to 70 characters in the output file name.
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192'
        }],
        'cookiefile': cookiefile
    }

    try:
        logger.info("Starting yt-dlp search and download...")
        # Blocking download runs in the thread pool.
        await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query]))
        logger.info("yt-dlp search and download completed")
    except yt_dlp.utils.DownloadError as e:
        error_message = str(e)
        logger.error(f"yt-dlp error: {error_message}")
        return JSONResponse(content={"error": error_message}, status_code=500)
    except Exception as e:
        error_message = str(e)
        logger.error(f"General error: {error_message}")
        return JSONResponse(content={"error": error_message}, status_code=500)

    # The timestamp suffix identifies the file produced by this request.
    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
    if not downloaded_files:
        logger.error("Download failed: No MP3 files found")
        return JSONResponse(content={"error": "Download failed"}, status_code=500)

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{BASE_URL}/file/{encoded_filename}"
    logger.info(f"Download URL: {download_url}")

    logger.info("Preparing to send response back to the client")
    gc.collect()
    return JSONResponse(content={"url": download_url}, status_code=200)
|
|
|
|
|
# Serve everything inside the download directory under the "/file" path.
app.mount(
    "/file",
    StaticFiles(directory=global_download_dir),
    name="downloads",
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.middleware("http")
async def set_mime_type_middleware(request: Request, call_next):
    """Force ``Content-Type: video/mp4`` on responses for ``.mp4`` paths.

    The request is processed normally first; the header is overwritten only
    when the request path ends with ``.mp4``.
    """
    response = await call_next(request)

    wants_mp4 = request.url.path.endswith(".mp4")
    if wants_mp4:
        response.headers["Content-Type"] = "video/mp4"

    return response
|
|
|
|