import sys import time from fastapi import FastAPI, BackgroundTasks, Request, HTTPException from fastapi.responses import FileResponse from fastapi.concurrency import run_in_threadpool import yt_dlp import ffmpeg import urllib.parse import os from datetime import datetime import schedule import requests import uvicorn import subprocess import json from dotenv import load_dotenv import mimetypes import tempfile from mutagen.mp4 import MP4, MP4Cover, MP4FreeForm from mutagen.mp3 import MP3 from mutagen.id3 import ID3, USLT, SYLT, Encoding, APIC, TIT2, TPE1, TALB, TPE2, TDRC, TCON, TRCK, COMM from mutagen.oggvorbis import OggVorbis from mutagen.oggopus import OggOpus from mutagen.flac import FLAC, Picture from PIL import Image from io import BytesIO from pathlib import Path from fastapi.staticfiles import StaticFiles tmp_dir = tempfile.gettempdir() BASE_URL = "https://chrunos-multi.hf.space" MP4_TAGS_MAP = { "album": "\xa9alb", "album_artist": "aART", "artist": "\xa9ART", "composer": "\xa9wrt", "copyright": "cprt", "lyrics": "\xa9lyr", "comment": "desc", "media_type": "stik", "producer": "\xa9prd", "rating": "rtng", "release_date": "\xa9day", "title": "\xa9nam", "url": "\xa9url", } def env_to_cookies(env_content: str, output_file: str) -> None: """Convert environment variable content back to cookie file""" try: # Extract content from env format if '="' not in env_content: raise ValueError("Invalid env content format") content = env_content.split('="', 1)[1].strip('"') # Replace escaped newlines with actual newlines cookie_content = content.replace('\\n', '\n') # Write to cookie file with open(output_file, 'w') as f: f.write(cookie_content) except Exception as e: raise ValueError(f"Error converting to cookie file: {str(e)}") def save_to_env_file(env_content: str, env_file: str = '.env') -> None: """Save environment variable content to .env file""" try: with open(env_file, 'w') as f: f.write(env_content) #print(f"Successfully saved to {env_file}") except Exception as e: raise ValueError(f"Error saving to env file: {str(e)}") def env_to_cookies_from_env(output_file: str) -> None: """Convert environment variable from .env file to cookie file""" try: load_dotenv() # Load from .env file env_content = os.getenv('FIREFOX_COOKIES') #print(f"Printing env content: \n{env_content}") if not env_content: raise ValueError("FIREFOX_COOKIES not found in .env file") env_to_cookies(f'FIREFOX_COOKIES="{env_content}"', output_file) except Exception as e: raise ValueError(f"Error converting to cookie file: {str(e)}") def get_cookies(): """Get cookies from environment variable""" load_dotenv() cookie_content = os.getenv('FIREFOX_COOKIES') #print(cookie_content) if not cookie_content: raise ValueError("FIREFOX_COOKIES environment variable not set") return cookie_content def create_temp_cookie_file(): """Create temporary cookie file from environment variable""" temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') try: cookie_content = get_cookies() # Replace escaped newlines with actual newlines cookie_content = cookie_content.replace('\\n', '\n') temp_cookie.write() temp_cookie.flush() return Path(temp_cookie.name) finally: temp_cookie.close() load_dotenv() app = FastAPI() ydl_opts = { 'format': 'best', 'quiet': True, #'outtmpl': f'{VIDEO_DIR}/%(id)s.%(ext)s', 'max_filesize': 50 * 1024 * 1024 } @app.get('/') def main(): return "Chrunos Downloader API Is Running." @app.get("/get_video_url") async def get_video_url(youtube_url: str): try: cookiefile = "firefox-cookies.txt" env_to_cookies_from_env("firefox-cookies.txt") # Add cookies ydl_opts["cookiefile"] = "firefox-cookies.txt" #create_temp_cookie_file() with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(youtube_url, download=False) return info except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Define a global temporary download directory global_download_dir = tempfile.mkdtemp() @app.post("/high") async def download_high_quality_video(request: Request): data = await request.json() video_url = data.get('url') cookiefile = "firefox-cookies.txt" env_to_cookies_from_env("firefox-cookies.txt") # Generate a unique output filename using timestamp timestamp = datetime.now().strftime('%Y%m%d%H%M%S') output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') ydl_opts = { 'format': 'bestvideo[height<=720]+bestaudio/best', 'outtmpl': output_template, 'quiet': True, 'no_warnings': True, 'noprogress': True, 'merge_output_format': 'mp4' } ydl_opts["cookiefile"] = "firefox-cookies.txt" #create_temp_cookie_file() await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) # Find the downloaded file (with the title as filename) downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) if not downloaded_files: return {"error": "Download failed"} # Assume there is only one matching file downloaded_file = downloaded_files[0] encoded_filename = urllib.parse.quote(downloaded_file.name) download_url = f"{BASE_URL}/file/{encoded_filename}" return {"url": download_url} @app.post("/max") async def download_high_quality_video(request: Request): data = await request.json() video_url = data.get('url') quality = data.get('quality', '1080') # Default to 1080p if not specified # Check if the requested quality is above 1080p if int(quality) > 1080: error_message = "Quality above 1080p is for premium users. Please check the URL for more information." help_url = "https://chrunos.com/premium-shortcuts/" # Replace with your actual URL return {"error": error_message, "url": help_url} cookiefile = "firefox-cookies.txt" env_to_cookies_from_env("firefox-cookies.txt") timestamp = datetime.now().strftime('%Y%m%d%H%M%S') output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') # Convert quality string to height height_map = { '480': 480, '720': 720, '1080': 1080 } max_height = height_map.get(quality, 1080) # Use the quality variable correctly # Determine format string based on quality format_str = f'bestvideo[height<={max_height}][vcodec^=avc]+bestaudio/best' ydl_opts = { 'format': format_str, 'outtmpl': output_template, 'quiet': True, 'no_warnings': True, 'noprogress': True, 'merge_output_format': 'mp4', 'cookiefile': cookiefile } await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) if not downloaded_files: return {"error": "Download failed"} downloaded_file = downloaded_files[0] encoded_filename = urllib.parse.quote(downloaded_file.name) download_url = f"{BASE_URL}/file/{encoded_filename}" return {"url": download_url} @app.post("/audio") async def download_audio(request: Request): data = await request.json() video_url = data.get('url') cookiefile = "firefox-cookies.txt" env_to_cookies_from_env("firefox-cookies.txt") timestamp = datetime.now().strftime('%Y%m%d%H%M%S') output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': output_template, 'quiet': True, 'no_warnings': True, 'noprogress': True, 'cookiefile': cookiefile } await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.*")) if not downloaded_files: return {"error": "Download failed"} downloaded_file = downloaded_files[0] encoded_filename = urllib.parse.quote(downloaded_file.name) download_url = f"{BASE_URL}/file/{encoded_filename}" return {"url": download_url} # Mount the static files directory app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads") @app.middleware("http") async def set_mime_type_middleware(request: Request, call_next): response = await call_next(request) if request.url.path.endswith(".mp4"): response.headers["Content-Type"] = "video/mp4" return response