|
import hashlib |
|
import time |
|
import re |
|
import logging |
|
import requests |
|
import subprocess |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.responses import FileResponse |
|
from fastapi.staticfiles import StaticFiles |
|
import tempfile |
|
import os |
|
import uuid |
|
import gc |
|
import json |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
global_download_dir = tempfile.mkdtemp() |
|
logging.basicConfig(level=logging.DEBUG) |
|
|
|
def md5(string): |
|
"""Generate MD5 hash of a string.""" |
|
return hashlib.md5(string.encode('utf-8')).hexdigest() |
|
|
|
BASE_URL = 'https://tecuts-request.hf.space' |
|
BASE = 'https://www.qobuz.com/api.json/0.2/' |
|
TOKEN = os.getenv('TOKEN') |
|
|
|
|
|
APP_ID = '579939560' |
|
|
|
|
|
HEADERS = { |
|
'X-App-Id': APP_ID, |
|
'X-User-Auth-Token': TOKEN, |
|
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.21) Gecko/20100312 Firefox/3.6' |
|
} |
|
|
|
@app.get('/') |
|
def main(): |
|
return {"appStatus": "running"} |
|
|
|
@app.post('/track') |
|
async def fetch_data_for_url(request: Request): |
|
try: |
|
data = await request.json() |
|
except Exception as e: |
|
logging.error(f"Error parsing JSON: {e}") |
|
raise HTTPException(status_code=400, detail="Invalid JSON") |
|
|
|
url = data.get('url') |
|
if not url: |
|
raise HTTPException(status_code=400, detail="URL is required") |
|
|
|
logging.info(f'Fetching data for: {url}') |
|
|
|
track_id_match = re.search(r'\d+$', url) |
|
if not track_id_match: |
|
logging.error('Track ID not found in your input.') |
|
raise HTTPException(status_code=400, detail="Track ID not found in URL") |
|
|
|
track_id = track_id_match.group(0) |
|
timestamp = int(time.time()) |
|
rSigRaw = f'trackgetFileUrlformat_id27intentstreamtrack_id{track_id}{timestamp}fa31fc13e7a28e7d70bb61e91aa9e178' |
|
rSig = md5(rSigRaw) |
|
|
|
download_url = f'{BASE}track/getFileUrl?format_id=27&intent=stream&track_id={track_id}&request_ts={timestamp}&request_sig={rSig}' |
|
|
|
response = requests.get(download_url, headers=HEADERS) |
|
if response.status_code != 200: |
|
logging.error(f"Failed to fetch the track file URL: {response.status_code}") |
|
raise HTTPException(status_code=500, detail="Failed to fetch the track file URL") |
|
|
|
file_url = response.json().get('url') |
|
if not file_url: |
|
logging.error("No file URL returned from Qobuz") |
|
raise HTTPException(status_code=500, detail="No file URL returned") |
|
|
|
|
|
flac_file_path = os.path.join(global_download_dir, f'{uuid.uuid4()}.flac') |
|
with requests.get(file_url, stream=True) as r: |
|
r.raise_for_status() |
|
with open(flac_file_path, 'wb') as f: |
|
for chunk in r.iter_content(chunk_size=8192): |
|
f.write(chunk) |
|
logging.info(f"FLAC file downloaded at {flac_file_path}") |
|
|
|
|
|
alac_file_path = os.path.join(global_download_dir, f'{uuid.uuid4()}.m4a') |
|
with subprocess.Popen(['ffmpeg', '-i', flac_file_path, '-c:a', 'alac', alac_file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process: |
|
process.communicate() |
|
logging.info(f"Converted to ALAC: {alac_file_path}") |
|
|
|
|
|
os.remove(flac_file_path) |
|
logging.info(f"Deleted FLAC file: {flac_file_path}") |
|
|
|
|
|
alac_file_url = f'{BASE_URL}/file/{os.path.basename(alac_file_path)}' |
|
logging.info(f"Returning ALAC file URL: {alac_file_url}") |
|
|
|
|
|
gc.collect() |
|
|
|
|
|
return {"url": alac_file_url} |
|
|
|
|
|
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads") |
|
|
|
|
|
def extract_video_id(url): |
|
|
|
pattern = r'(?:(?:youtube\.com\/(?:watch\?v=|embed\/|shorts\/)|youtu\.be\/)([a-zA-Z0-9_-]{11}))' |
|
|
|
|
|
match = re.search(pattern, url) |
|
|
|
if match: |
|
return match.group(1) |
|
else: |
|
return None |
|
|
|
|
|
@app.get("/yt-audio") |
|
async def yt_audio(url: str): |
|
try: |
|
|
|
videoId = extract_video_id(url) |
|
if videoId is None: |
|
raise ValueError("Video ID extraction failed") |
|
|
|
|
|
headers = { |
|
"X-Goog-Authuser": "0", |
|
"X-Origin": "https://www.youtube.com", |
|
"X-goog-Visitor-Id": "CgtMWlVXOTFCcE5Edyj9p4i9BjIKCgJVUxIEGgAgZA%3D%3D" |
|
} |
|
|
|
|
|
body = { |
|
"context": { |
|
"client": { |
|
"clientName": "IOS", |
|
"clientVersion": "19.14.3", |
|
"deviceModel": "iPhone15,4" |
|
} |
|
}, |
|
"videoId": videoId |
|
} |
|
Player_Url = "https://www.youtube.com/youtubei/v1/player?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false" |
|
|
|
|
|
logging.info(f"Sending request to URL: {url} with video ID: {videoId}") |
|
response = requests.post( |
|
Player_Url, |
|
timeout=20, |
|
json=body, |
|
headers=headers |
|
) |
|
response.raise_for_status() |
|
|
|
logging.info(f"Received response from URL: {url}, status code: {response.status_code}") |
|
return response.json() |
|
|
|
except requests.RequestException as e: |
|
logging.error(f"Request error: {str(e)}") |
|
return {"error": f"Request error: {str(e)}"} |
|
except json.JSONDecodeError as e: |
|
logging.error(f"JSON decoding error: {str(e)}") |
|
return {"error": f"JSON decoding error: {str(e)}"} |
|
except ValueError as e: |
|
logging.error(f"Value error: {str(e)}") |
|
return {"error": f"Value error: {str(e)}"} |
|
except Exception as e: |
|
logging.error(f"Unexpected error: {str(e)}") |
|
return {"error": f"Unexpected error: {str(e)}"} |
|
|
|
|
|
@app.middleware("http") |
|
async def set_mime_type_middleware(request: Request, call_next): |
|
response = await call_next(request) |
|
if request.url.path.endswith(".m4a"): |
|
response.headers["Content-Type"] = "audio/m4a" |
|
return response |
|
|