import shutil
import requests
import os
import logging
import json
import uuid
import time
import gc

from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from deezspot.deezloader import DeeLogin
from deezspot.spotloader import SpoLogin
from dotenv import load_dotenv
from pydantic import BaseModel
from urllib.parse import quote
from pathlib import Path
from collections import defaultdict
from datetime import timedelta
from typing import Optional, Dict


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Deezer API")

load_dotenv()

os.makedirs("downloads", exist_ok=True)
app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads")

DEEZER_API_URL = "https://api.deezer.com"
BASE_URL = "https://chrunos-depot.hf.space"
ARL_TOKEN = os.getenv('ARL')


class DownloadRequest(BaseModel):
    url: str
    quality: str
    # Optional: the endpoint falls back to the server-side ARL token when omitted.
    arl: Optional[str] = None
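# Illustrative request body for POST /download/track (placeholder values only;
# "arl" may be omitted to use the server-side ARL token):
#   {"url": "https://www.deezer.com/track/<track_id>", "quality": "MP3_320"}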
|

def convert_deezer_short_link_async(short_link: str) -> str:
    """Resolve a Deezer short link by following its redirects to the full track URL."""
    try:
        response = requests.get(short_link, allow_redirects=True, timeout=10)
        return response.url
    except requests.RequestException as e:
        logger.error(f"Error resolving Deezer short link {short_link}: {e}")
        return ""


@app.get("/") |
|
def read_root(): |
|
return {"message": "running"} |
|
|
|
|
|
|
|
def get_track_info(track_id: str):
    try:
        response = requests.get(f"{DEEZER_API_URL}/track/{track_id}", timeout=10)
        if response.status_code != 200:
            raise HTTPException(status_code=404, detail="Track not found")
        return response.json()
    except HTTPException:
        # Re-raise HTTP errors as-is so the 404 above is not turned into a 500.
        raise
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error fetching track metadata: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    except Exception as e:
        logger.error(f"Error fetching track metadata: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/track/{track_id}") |
|
def get_track(track_id: str): |
|
return get_track_info(track_id) |
|
|
|
|
|
|
|
class RateLimiter:
    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        current_time = time.time()
        self.requests[user_ip] = [
            timestamp for timestamp in self.requests[user_ip]
            if current_time - timestamp < self.time_window.total_seconds()
        ]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Check if the user has exceeded their rate limit.

        Only allowed requests are recorded, so rejected attempts do not
        count against the limit.
        """
        self._cleanup_old_requests(user_ip)
        if len(self.requests[user_ip]) >= self.max_requests:
            return True
        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])


rate_limiter = RateLimiter(
    max_requests=3,
    time_window=timedelta(days=1)
)
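# Note: the limiter above keeps its counters in process memory (a defaultdict),
# so counts reset on restart and are not shared across workers or replicas.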
|

def get_user_ip(request: Request) -> str:
    """Helper function to get user's IP address."""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0]
    return request.client.host


@app.post("/download/track") |
|
def download_track(request: Request, download_request: DownloadRequest): |
|
try: |
|
user_ip = get_user_ip(request) |
|
|
|
if rate_limiter.is_rate_limited(user_ip): |
|
current_count = rate_limiter.get_current_count(user_ip) |
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"detail": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
|
"help": "https://t.me/chrunoss" |
|
} |
|
) |
|
if download_request.arl is None or download_request.arl.strip() == "": |
|
ARL = ARL_TOKEN |
|
else: |
|
ARL = download_request.arl |
|
logger.info(f'arl: {ARL}') |
|
url = download_request.url |
|
if 'dzr.page' in url or 'deezer.page' in url: |
|
url = convert_deezer_short_link_async(url) |
|
quality = download_request.quality |
|
dl = DeeLogin(arl=ARL) |
|
|
|
if quality not in ["MP3_320", "MP3_128", "FLAC"]: |
|
raise HTTPException(status_code=400, detail="Invalid quality specified") |
|
|
|
|
|
track_id = url.split("/")[-1] |
|
|
|
|
|
track_info = get_track_info(track_id) |
|
track_link = track_info.get("link") |
|
if not track_link: |
|
raise HTTPException(status_code=404, detail="Track link not found") |
|
|
|
|
|
track_title = track_info.get("title", "track") |
|
artist_name = track_info.get("artist", {}).get("name", "unknown") |
|
file_extension = "flac" if quality == "FLAC" else "mp3" |
|
expected_filename = f"{artist_name} - {track_title}.{file_extension}".replace("/", "_") |
|
|
|
|
|
for root, dirs, files in os.walk("downloads"): |
|
for file in files: |
|
os.remove(os.path.join(root, file)) |
|
for dir in dirs: |
|
shutil.rmtree(os.path.join(root, dir)) |
|
|
|
|
|
try: |
|
|
|
dl.download_trackdee( |
|
link_track=track_link, |
|
output_dir="downloads", |
|
quality_download=quality, |
|
recursive_quality=False, |
|
recursive_download=False |
|
) |
|
except Exception as e: |
|
logger.error(f"Error downloading file: {e}") |
|
raise HTTPException(status_code=500, detail="File download failed") |
|
|
|
|
|
filepath = None |
|
for root, dirs, files in os.walk("downloads"): |
|
for file in files: |
|
if file.endswith(f'.{file_extension}'): |
|
filepath = os.path.join(root, file) |
|
break |
|
if filepath: |
|
break |
|
|
|
if not filepath: |
|
raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download") |
|
if filepath: |
|
file_size = os.path.getsize(filepath) |
|
logger.info(f"Downloaded file size: {file_size} bytes") |
|
|
|
|
|
relative_path = quote(str(os.path.relpath(filepath, "downloads"))) |
|
|
|
download_url = f"{BASE_URL}/downloads/{relative_path}" |
|
logger.info(f"Download successful: {download_url}") |
|
gc.collect() |
|
return {"download_url": download_url, "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)} |
|
except Exception as e: |
|
logger.error(f"Error downloading track: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
class AlbumRequest(BaseModel):
    id: str


@app.post("/z_album") |
|
def fetch_album(request: AlbumRequest): |
|
album_id = request.id |
|
try: |
|
response = requests.get(f"{DEEZER_API_URL}/album/{album_id}") |
|
response.raise_for_status() |
|
album_data = response.json() |
|
tracks = album_data.get("tracks", {}).get("data", []) |
|
result = [] |
|
for track in tracks: |
|
title = track.get("title") |
|
link = track.get("link") |
|
if title and link: |
|
result.append({ |
|
"title": title, |
|
"link": link |
|
}) |
|
return result |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Network error fetching album: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
except Exception as e: |
|
logger.error(f"Error fetching album: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
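# Illustrative response shape (example values only):
#   [{"title": "Some track", "link": "https://www.deezer.com/track/<id>"}, ...]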
|

class PlaylistRequest(BaseModel):
    id: str


@app.post("/z_playlist") |
|
def fetch_playlist(request: PlaylistRequest): |
|
playlist_id = request.id |
|
try: |
|
response = requests.get(f"{DEEZER_API_URL}/playlist/{playlist_id}") |
|
response.raise_for_status() |
|
playlist_data = response.json() |
|
tracks = playlist_data.get("tracks", {}).get("data", []) |
|
result = [] |
|
for track in tracks: |
|
title = track.get("title") |
|
link = track.get("link") |
|
if title and link: |
|
result.append({ |
|
"title": title, |
|
"link": link |
|
}) |
|
return result |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Network error fetching album: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
except Exception as e: |
|
logger.error(f"Error fetching album: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
@app.get("/z_search") |
|
def search_tracks(query: str, limit: Optional[int] = 10): |
|
try: |
|
response = requests.get(f"{DEEZER_API_URL}/search", params={"q": query, "limit": limit}) |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Network error searching tracks: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
except Exception as e: |
|
logger.error(f"Error searching tracks: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
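# Example call (illustrative): GET /z_search?query=daft+punk&limit=5
# The raw Deezer search payload is returned unchanged.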
|

'''SPOTIFY_USERNAME = os.getenv("SPOTIFY_USERNAME")
SPOTIFY_CREDENTIALS = os.getenv("SPOTIFY_CREDENTIALS")

if not SPOTIFY_USERNAME or not SPOTIFY_CREDENTIALS:
    raise RuntimeError("Spotify credentials not found in environment variables")

# Create a temporary credentials.json file
CREDENTIALS_PATH = "/tmp/credentials.json"
with open(CREDENTIALS_PATH, "w") as f:
    json.dump({
        "username": SPOTIFY_USERNAME,
        "credentials": SPOTIFY_CREDENTIALS,
        "type": "AUTHENTICATION_STORED_SPOTIFY_CREDENTIALS"
    }, f)

# Initialize Spotify client
spo = SpoLogin(credentials_path=CREDENTIALS_PATH)


# Define the request body model
class DownloadRequest(BaseModel):
    url: str
    quality: str


def cleanup_dir(dir_path: Path):
    """Background task to clean up directory"""
    try:
        shutil.rmtree(dir_path)
        logger.info(f"Cleaned up directory: {dir_path}")
    except Exception as e:
        logger.error(f"Error cleaning up {dir_path}: {e}")


# Download a Spotify track and return a download URL
@app.post("/spot-track/{track_id}")
async def download_spotify_track(
    track_id: str,
    background_tasks: BackgroundTasks
):
    try:
        downloads_dir = Path("downloads")
        # Create unique directory for this download
        download_id = uuid.uuid4().hex
        download_dir = downloads_dir / download_id
        download_dir.mkdir(parents=True, exist_ok=True)

        # Download to unique directory
        logger.info(f"Downloading to {download_dir}")
        spo.download_track(
            link_track=f"https://open.spotify.com/track/{track_id}",
            output_dir=str(download_dir),
            quality_download="VERY_HIGH",
            recursive_quality=False,
            recursive_download=False,
            not_interface=False,
            method_save=1
        )

        # Find downloaded file
        filepath = next(download_dir.glob("**/*.ogg"), None)
        if not filepath:
            raise HTTPException(status_code=500, detail="File not found after download")

        # Schedule cleanup after response is sent
        #background_tasks.add_task(cleanup_dir, download_dir)

        # Return path with unique directory
        relative_path = quote(str(filepath.relative_to(downloads_dir)))
        return {"download_url": f"{BASE_URL}/downloads/{relative_path}"}

    except Exception as e:
        logger.error(f"Error downloading track: {e}")
        raise HTTPException(status_code=500, detail=str(e))
'''