# zam / app.py
import sys
import time
from fastapi import FastAPI, BackgroundTasks, Request, HTTPException, Security
from fastapi.security import APIKeyHeader
from fastapi.responses import FileResponse
from fastapi.concurrency import run_in_threadpool
import yt_dlp
import ffmpeg
import urllib.parse
import os
from datetime import datetime, timedelta
import schedule
import requests
import uvicorn
import subprocess
import json
from dotenv import load_dotenv
import mimetypes
import tempfile
from PIL import Image
from io import BytesIO
from pathlib import Path
from fastapi.staticfiles import StaticFiles
from collections import defaultdict
from starlette.responses import JSONResponse
import logging
import gc
from typing import Dict, Any
import re
import asyncio
import cloudscraper
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel
tmp_dir = tempfile.gettempdir()
BASE_URL = "https://chrunos-zam.hf.space"
def env_to_cookies(env_content: str, output_file: str) -> None:
"""Convert environment variable content back to cookie file"""
try:
# Extract content from env format
if '="' not in env_content:
raise ValueError("Invalid env content format")
content = env_content.split('="', 1)[1].strip('"')
# Replace escaped newlines with actual newlines
cookie_content = content.replace('\\n', '\n')
# Write to cookie file
with open(output_file, 'w') as f:
f.write(cookie_content)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def save_to_env_file(env_content: str, env_file: str = '.env') -> None:
"""Save environment variable content to .env file"""
try:
with open(env_file, 'w') as f:
f.write(env_content)
#print(f"Successfully saved to {env_file}")
except Exception as e:
raise ValueError(f"Error saving to env file: {str(e)}")
def env_to_cookies_from_env(output_file: str) -> None:
"""Convert environment variable from .env file to cookie file"""
try:
load_dotenv() # Load from .env file
env_content = os.getenv('FIREFOX_COOKIES')
#print(f"Printing env content: \n{env_content}")
if not env_content:
raise ValueError("FIREFOX_COOKIES not found in .env file")
env_to_cookies(f'FIREFOX_COOKIES="{env_content}"', output_file)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def get_cookies():
"""Get cookies from environment variable"""
load_dotenv()
cookie_content = os.getenv('FIREFOX_COOKIES')
#print(cookie_content)
if not cookie_content:
raise ValueError("FIREFOX_COOKIES environment variable not set")
return cookie_content
def create_temp_cookie_file():
"""Create temporary cookie file from environment variable"""
temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt')
try:
cookie_content = get_cookies()
# Replace escaped newlines with actual newlines
cookie_content = cookie_content.replace('\\n', '\n')
        temp_cookie.write(cookie_content)
temp_cookie.flush()
return Path(temp_cookie.name)
finally:
temp_cookie.close()
load_dotenv()
app = FastAPI()
@app.get('/')
def main():
return "API Is Running. If you want to use this API, contact Cody from chrunos.com"
@app.get("/get_video_url")
async def get_video_url(youtube_url: str):
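    """Extract video metadata for a YouTube URL with yt-dlp, using the stored cookies."""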
try:
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
        # Build yt-dlp options, reusing the cookie file written above
        ydl_opts = {"cookiefile": cookiefile}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=False)
return info
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Define a global temporary download directory
global_download_dir = tempfile.mkdtemp()
class RateLimiter:
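    """Simple sliding-window rate limiter keyed by client IP.

    Keeps a list of request timestamps per IP and drops entries older than
    the configured time window before counting.
    """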
def __init__(self, max_requests: int, time_window: timedelta):
self.max_requests = max_requests
self.time_window = time_window
self.requests: Dict[str, list] = defaultdict(list)
def _cleanup_old_requests(self, user_ip: str) -> None:
"""Remove requests that are outside the time window."""
current_time = time.time()
self.requests[user_ip] = [
timestamp for timestamp in self.requests[user_ip]
if current_time - timestamp < self.time_window.total_seconds()
]
def is_rate_limited(self, user_ip: str) -> bool:
"""Check if the user has exceeded their rate limit."""
self._cleanup_old_requests(user_ip)
# Get current count after cleanup
current_count = len(self.requests[user_ip])
# Add current request timestamp (incrementing the count)
current_time = time.time()
self.requests[user_ip].append(current_time)
# Check if user has exceeded the maximum requests
return (current_count + 1) > self.max_requests
def get_current_count(self, user_ip: str) -> int:
"""Get the current request count for an IP."""
self._cleanup_old_requests(user_ip)
return len(self.requests[user_ip])
# Initialize rate limiter: 20 requests per IP per day
rate_limiter = RateLimiter(
max_requests=20,
time_window=timedelta(days=1)
)
def get_user_ip(request: Request) -> str:
"""Helper function to get user's IP address."""
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0]
return request.client.host
class ApiRotator:
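    """Reorders a list of API endpoints so the last successful one is tried first."""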
def __init__(self, apis):
self.apis = apis
self.last_successful_index = None
def get_prioritized_apis(self):
if self.last_successful_index is not None:
# Move the last successful API to the front
rotated_apis = (
[self.apis[self.last_successful_index]] +
self.apis[:self.last_successful_index] +
self.apis[self.last_successful_index+1:]
)
return rotated_apis
return self.apis
def update_last_successful(self, index):
self.last_successful_index = index
# Cobalt-compatible API endpoints, tried in priority order
api_rotator = ApiRotator([
"https://dwnld.nichind.dev",
"https://cobalt-api.kwiatekmiki.com",
"https://yt.edd1e.xyz/",
"https://cobalt-api.ayo.tf",
"https://cblt.fariz.dev"
])
async def get_track_download_url(video_url: str) -> str:
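    """Ask the cobalt-style APIs, in priority order, for a direct download URL."""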
apis = api_rotator.get_prioritized_apis()
session = cloudscraper.create_scraper() # Requires cloudscraper package
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
for i, api_url in enumerate(apis):
try:
logger.info(f"Attempting to get download URL from: {api_url}")
y_url = video_url
response = session.post(
api_url,
timeout=20,
json={"url": y_url, "videoQuality": "720", "filenameStyle": "pretty"},
headers=headers
)
logger.info(f"Response status: {response.status_code}")
logger.info(f"Response content: {response.content}")
if response.headers.get('content-type', '').startswith('application/json'):
json_response = response.json()
error_code = json_response.get("error", {}).get("code", "")
if error_code == "error.api.content.video.unavailable":
logger.warning(f"Video unavailable error from {api_url}")
break # Only break for specific error
if "url" in json_response:
api_rotator.update_last_successful(i)
return json_response["url"]
except Exception as e:
logger.error(f"Failed with {api_url}: {str(e)}")
continue
logger.error(f"No download URL found")
return {"error": "Download URL not found"}
def jio_search(query: str, quality: str) -> str:
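    """Search saavn.dev for a song and return the first download URL matching the requested quality."""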
try:
# Construct the API URL
api_url = f"https://saavn.dev/api/search/songs?query={query}"
session = cloudscraper.create_scraper()
# Make the API request
response = session.get(api_url)
# Check if the request was successful
response.raise_for_status()
# Get the data from the response
data = response.json().get("data")
if not data:
logger.error("No data found in the response.")
raise HTTPException(status_code=404, detail="No data found for the given query.")
# Get the song results
song_results = data.get("results")
if not song_results or len(song_results) == 0:
logger.error("No song results found in the response.")
raise HTTPException(status_code=404, detail="No song results found for the given query.")
# Iterate through each song result
for song in song_results:
download_urls = song.get("downloadUrl")
if download_urls:
for download_url in download_urls:
if download_url.get("quality") == quality:
return download_url.get("url")
logger.error(f"No download URL found for quality {quality} in the search results for query {query}.")
raise HTTPException(status_code=404, detail=f"No download URL found for quality {quality} in the search results for query {query}.")
except cloudscraper.exceptions.CloudflareChallengeError as cf_err:
logger.error(f"Cloudflare challenge error while searching for {query}: {cf_err}")
raise HTTPException(status_code=503, detail="Cloudflare challenge failed")
except HTTPException:
        # Re-raise HTTPExceptions unchanged
raise
except Exception as e:
logger.error(f"Error while searching for {query}: {e}")
raise HTTPException(status_code=500, detail=f"An error occurred while searching: {str(e)}")
def jio_fetch(url: str, quality: str) -> str:
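    """Resolve a JioSaavn song link via saavn.dev and return the download URL for the requested quality."""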
try:
# Construct the API URL
api_url = f"https://saavn.dev/api/songs?link={url}"
session = cloudscraper.create_scraper()
# Make the API request
response = session.get(api_url)
# Check if the request was successful
response.raise_for_status()
data = response.json()
song_data = data.get("data")
if not song_data or len(song_data) == 0:
logger.error("No data found in the response.")
raise HTTPException(status_code=404, detail="No data found for the given URL.")
download_urls = song_data[0].get("downloadUrl")
if not download_urls:
logger.error("No download URLs found in the response.")
raise HTTPException(status_code=404, detail="No download URLs found for the given song.")
for download_url in download_urls:
if download_url.get("quality") == quality:
return download_url.get("url")
logger.error(f"No download URL found for quality {quality}.")
raise HTTPException(status_code=404, detail=f"No download URL found for quality {quality}.")
except cloudscraper.exceptions.CloudflareChallengeError as cf_err:
logger.error(f"Cloudflare challenge error while fetching {url}: {cf_err}")
raise HTTPException(status_code=503, detail="Cloudflare challenge failed")
except HTTPException:
        # Re-raise HTTPExceptions unchanged
raise
except Exception as e:
logger.error(f"Error while fetching {url}: {e}")
raise HTTPException(status_code=500, detail=f"An error occurred while fetching: {str(e)}")
# Define the request model
class JioDownloadRequest(BaseModel):
url: str = None
query: str = None
quality: str = None
@app.post("/jio_dl")
async def jio_download(request: JioDownloadRequest):
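    """Resolve a JioSaavn download URL from either a song link plus quality, or a search query."""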
try:
url = request.url
query = request.query
quality = request.quality
if url and quality:
logger.info(f'input url: {url}')
download_url = jio_fetch(url, quality)
return {"download_url": download_url}
elif query:
logger.info(f'input query: {query}')
download_url = jio_search(query, quality)
return {"download_url": download_url}
else:
logger.error("Missing 'url' and 'quality' or 'query' in request data.")
raise HTTPException(status_code=400, detail="Missing 'url' and 'quality' or 'query' in request data")
except HTTPException:
        # Re-raise HTTPExceptions unchanged
raise
except Exception as e:
logger.error(f"Error in jio_download: {e}")
raise HTTPException(status_code=500, detail=f"An error occurred during the operation: {str(e)}")
@app.post("/jio_dls")
async def jio_download_shortcut(request: JioDownloadRequest):
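    """Same as /jio_dl, but blocks 320kbps requests for non-premium users."""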
try:
url = request.url
quality = request.quality
query = request.query
if quality == '320kbps':
return {
"error": "Quality 320kbps is for Premium users only",
"premium": "https://chrunos.com/premium-shortcuts/"
}
if url and quality:
logger.info(f'input url: {url}')
download_url = jio_fetch(url, quality)
return {"download_url": download_url}
elif query:
logger.info(f'input query: {query}')
download_url = jio_search(query, quality)
return {"download_url": download_url}
else:
logger.error("Missing 'url' and 'quality' or 'query' in request data.")
raise HTTPException(status_code=400, detail="Missing 'url' and 'quality' or 'query' in request data")
except HTTPException:
        # Re-raise HTTPExceptions unchanged
raise
except Exception as e:
logger.error(f"Error in jio_download: {e}")
raise HTTPException(status_code=500, detail=f"An error occurred during the operation: {str(e)}")
EXTRACT_API = os.getenv("EXTRACT_API")
ALT_API = os.getenv("ALT_API")
def extract_video_info(video_url: str) -> str:
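    """Query the configured extractor APIs (ALT_API, then EXTRACT_API) for the available formats of a video URL."""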
api_urls = [f'{ALT_API}?url={video_url}', f'{EXTRACT_API}?url={video_url}']
for api_url in api_urls:
logger.info(api_url)
session = cloudscraper.create_scraper()
try:
response = session.get(api_url, timeout=20)
if response.status_code == 200:
json_response = response.json()
result = []
if 'formats' in json_response:
for format_item in json_response['formats']:
format_url = format_item.get('url')
format_id = format_item.get('format_id')
p_cookies = format_item.get('cookies')
if format_id and format_url:
result.append({
"url": format_url,
"format_id": format_id,
"cookies": p_cookies
})
title = json_response.get('title')
logger.info(title)
if "pornhub.com" in video_url:
p_result = [item for item in result if 'hls' in item['format_id']]
return p_result
else:
if len(result) == 1:
new_item = {
"format_id": "This is Fake, Don't Choose This One",
"url": "none"
}
result.append(new_item)
return result
else:
return {"error": "No formats available"}
else:
logger.warning(f"Request failed with status code {response.status_code}, API: {api_url}")
except Exception as e:
logger.error(f"An error occurred: {e}")
return {"error": "Both APIs failed to provide valid results."}
@app.post("/test")
async def test_download(request: Request):
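    """Return the raw format list for a URL; thin wrapper around extract_video_info."""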
data = await request.json()
video_url = data.get('url')
response = extract_video_info(video_url)
return response
@app.post("/hls")
async def download_hls_video(request: Request):
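    """Download an HLS stream with yt-dlp and return a link to the merged MP4."""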
data = await request.json()
hls_url = data.get('url')
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
try:
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([hls_url]))
except Exception as e:
return {"error": f"Download failed: {str(e)}"}
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
async def get_audio_download_url(track_id: str, quality: str) -> str:
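    """Build a download URL on the external chrunos-shadl service; despite the name it handles both audio ('mp3') and video qualities."""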
    if quality == 'mp3':
        media_type = 'audio'
        quality = 128
    else:
        media_type = 'video'
    download_url = f'https://chrunos-shadl.hf.space/yt/dl?url={track_id}&type={media_type}&quality={quality}'
    return download_url
@app.post("/maxs")
async def download_high_quality_video(request: Request):
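    """Rate-limited video download endpoint: YouTube URLs are delegated to the external service, everything else is downloaded locally with yt-dlp (capped at 720p)."""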
user_ip = get_user_ip(request)
if rate_limiter.is_rate_limited(user_ip):
current_count = rate_limiter.get_current_count(user_ip)
raise HTTPException(
status_code=429,
detail={
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
"url": "https://t.me/chrunoss"
}
)
data = await request.json()
restricted_domain = "chrunos.com"
video_url = data.get('url')
    quality = data.get('quality', '720')  # Default to 720p if not specified
logger.info(f'input url: {video_url}, {quality}')
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
if video_url and restricted_domain in video_url:
return {"error": "What is wrong with you?", "url": "https://t.me/chrunoss"}
    # Check if the requested quality is above 720p
if int(quality) > 720:
error_message = "Quality above 720p is for Premium Members Only. Please check the URL for more information."
help_url = "https://chrunos.com/premium-shortcuts/" # Replace with your actual URL
return {"error": error_message, "url": help_url}
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')
# Convert quality string to height
height_map = {
'240': 240,
'360': 360,
'480': 480,
'720': 720,
'1080': 1080
}
    max_height = height_map.get(quality, 720)  # Fall back to 720p for unknown quality values
# Determine format string based on quality
format_str = f'bestvideo[height<={max_height}][vcodec^=avc]+bestaudio/best'
    ydl_opts = {
        'format': format_str,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4',
        # Assumption: the cookie file prepared above is intended for yt-dlp here
        'cookiefile': cookiefile
    }
if is_youtube_url:
dl_url = await get_audio_download_url(video_url, quality)
if dl_url and "http" in dl_url:
return {"url": dl_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
else:
return {
"error": "Failed to Fetch the video."
}
else:
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
api_key_header = APIKeyHeader(name="X-API-Key")
# Store this securely in your environment variables
API_KEY = os.getenv("API_KEY")
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=403,
detail="Invalid API key"
)
return api_key
@app.post("/audio")
async def download_audio(
request: Request
#api_key: str = Security(verify_api_key)
):
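    """Rate-limited audio download endpoint: YouTube URLs are delegated to the external service, other URLs are downloaded and converted to MP3 via yt-dlp."""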
user_ip = get_user_ip(request)
if rate_limiter.is_rate_limited(user_ip):
current_count = rate_limiter.get_current_count(user_ip)
raise HTTPException(
status_code=429,
detail={
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
"url": "https://t.me/chrunoss"
}
)
data = await request.json()
video_url = data.get('url')
#cookiefile = "firefox-cookies.txt"
#env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
#'cookiefile': cookiefile,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}]
}
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
if is_youtube_url:
dl_url = await get_audio_download_url(video_url, 'mp3')
if dl_url and "http" in dl_url:
return {"url": dl_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
else:
return {
"error": "Failed to Fetch the video."
}
else:
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.*"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)}
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.post("/search")
async def search_and_download_song(request: Request,
api_key: str = Security(verify_api_key)
):
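    """Search YouTube for a song (optionally with an artist name) and return an MP3 download link."""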
data = await request.json()
song_name = data.get('songname')
artist_name = data.get('artist')
if artist_name:
search_query = f"ytsearch:{song_name} {artist_name}"
else:
search_query = f"ytsearch:{song_name}"
logging.info(f"Search query: {search_query}")
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}],
'cookiefile': cookiefile
}
try:
logging.info("Starting yt-dlp search and download...")
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query]))
logging.info("yt-dlp search and download completed")
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
logging.error(f"yt-dlp error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
except Exception as e:
error_message = str(e)
logging.error(f"General error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
if not downloaded_files:
logging.error("Download failed: No MP3 files found")
return JSONResponse(content={"error": "Download failed"}, status_code=500)
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logging.info(f"Download URL: {download_url}")
# Log just before returning the response
logging.info("Preparing to send response back to the client")
gc.collect()
return JSONResponse(content={"url": download_url}, status_code=200)
# Mount the static files directory
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
@app.middleware("http")
async def set_mime_type_middleware(request: Request, call_next):
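    """Ensure .mp4 responses served from /file carry the video/mp4 content type."""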
response = await call_next(request)
if request.url.path.endswith(".mp4"):
response.headers["Content-Type"] = "video/mp4"
return response
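
# uvicorn is imported above but never started in this file; a local-run entry
# point would look like the sketch below. Port 7860 is an assumption (the
# Hugging Face Spaces default), not something this file specifies.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)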