|
from flask import Flask, render_template, jsonify, request |
|
from ytmusicapi import YTMusic |
|
import os |
|
import logging |
|
import requests |
|
from datetime import datetime, timedelta |
|
import time |
|
import asyncio |
|
import cloudscraper |
|
from pydantic import BaseModel |
|
from urllib.parse import urlparse, parse_qs |
|
from collections import defaultdict |
|
import threading |
|
import aiohttp |
|
|
|
|
|
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)

# Shared YouTube Music client, constructed without any arguments.
ytmusic = YTMusic()

# Root logging at INFO level, plus this module's own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
@app.route('/')
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/search', methods=['POST'])
def search():
    """Search YouTube Music for songs matching the posted query.

    Reads the ``query`` field of the JSON request body (empty string when the
    field is absent) and returns the ``ytmusic.search`` results, restricted
    to the "songs" filter, as a JSON response.
    """
    payload = request.json
    term = payload.get('query', '')
    songs = ytmusic.search(term, filter="songs")
    return jsonify(songs)
|
|
|
@app.route('/searcht', methods=['POST'])
def searcht():
    """Return the first song search hit that carries a usable ``videoId``.

    Reads the ``query`` field of the JSON request body (empty string when the
    field is absent), logs it, and searches with the "songs" filter. The
    response is the first result whose ``videoId`` is present and truthy, or
    an empty JSON object when there is no such result.
    """
    term = request.json.get('query', '')
    logger.info(f"serch query: {term}")
    results = ytmusic.search(term, filter="songs")

    chosen = {}
    if results:
        for candidate in results:
            if 'videoId' in candidate and candidate['videoId']:
                chosen = candidate
                break
    return jsonify(chosen)
|
|
|
|
|
|
|
def extract_amazon_track_id(url: str):
    """Extract the Amazon Music track ID (ASIN) from a URL.

    Two URL shapes are recognised, checked in this order:

    1. A ``trackAsin`` query parameter, e.g.
       ``https://music.amazon.com/albums/B01N48U32A?trackAsin=B01NAE38YO``.
    2. A direct track link, e.g. ``https://music.amazon.com/tracks/B0DNTPYT5S``.

    Args:
        url: The URL to inspect.

    Returns:
        The track ID string, or ``None`` when ``url`` is empty, is not a
        ``music.amazon.com`` URL, or contains no track ID.
    """
    if not url or "music.amazon.com" not in url:
        return None

    parsed = urlparse(url)

    asins = parse_qs(parsed.query).get("trackAsin")
    if asins:
        return asins[0]

    # Work on the parsed path rather than the raw URL so that the query
    # string, the fragment, and any trailing slash never end up in the ID.
    if "/tracks/" in parsed.path:
        track_id = parsed.path.split("/tracks/")[-1].split("/")[0]
        return track_id or None

    return None
|
|
|
|
|
|
|
def get_song_link_info(url: str):
    """Look up a track through the song.link ``links`` API.

    Amazon Music URLs from which a track ID can be extracted are queried by
    platform and ID; every other URL is passed to the API through its ``url``
    parameter. ``userCountry`` is always ``US``.

    Args:
        url: Streaming-service URL of the track.

    Returns:
        The decoded JSON response when the API answers with HTTP 200,
        otherwise ``None`` (non-200 status, network failure or timeout, or a
        body that is not valid JSON).
    """
    api_endpoint = "https://api.song.link/v1-alpha.1/links"

    track_id = extract_amazon_track_id(url) if "music.amazon.com" in url else None
    if track_id:
        params = {"type": "song", "platform": "amazonMusic", "id": track_id}
    else:
        params = {"url": url}
    params["userCountry"] = "US"

    try:
        # ``params`` makes requests percent-encode each value, so a track URL
        # that itself contains '?' or '&' can no longer corrupt the query
        # string. The timeout keeps a stalled upstream from blocking forever.
        response = requests.get(api_endpoint, params=params, timeout=15)
    except requests.RequestException as exc:
        logger.error("song.link request failed: %s", exc)
        return None

    if response.status_code != 200:
        return None

    try:
        return response.json()
    except ValueError:
        logger.error("song.link returned a body that is not valid JSON")
        return None
|
|
|
|
|
def extract_url(links_by_platform: dict, platform: str):
    """Return the ``url`` stored under ``platform`` in ``links_by_platform``.

    Returns ``None`` when ``platform`` is not a key of the mapping.
    """
    if platform not in links_by_platform:
        return None
    platform_entry = links_by_platform[platform]
    return platform_entry["url"]
|
|
|
|
|
|
|
def extract_track_info(entities_by_unique_id: dict, platform: str):
    """Find the title and artist of the first entity provided by ``platform``.

    Walks the values of ``entities_by_unique_id`` in order and returns
    ``(title, artistName)`` for the first one whose ``apiProvider`` equals
    ``platform``; returns ``(None, None)`` when none matches.
    """
    matches = (
        (record["title"], record["artistName"])
        for record in entities_by_unique_id.values()
        if record["apiProvider"] == platform
    )
    return next(matches, (None, None))
|
|
|
|
|
|
|
@app.route('/match', methods=['POST'])
async def match():
    """Resolve a streaming-service track URL to a YouTube video.

    Expects a JSON body with a ``url`` field. The track is looked up through
    ``get_song_link_info``; when that result has a YouTube link it is
    returned directly, otherwise YouTube Music is searched for
    ``"<title>+<artist>"`` and the first hit with a ``videoId`` is used.

    Returns:
        JSON ``{"url", "filename", "track_id"}`` on success, or
        ``{"error": ...}`` with status 400 (no URL supplied) or 404 (lookup
        failed, or no matching video found).

    NOTE(review): this view is a coroutine, which Flask only supports with
    its async extra installed — confirm the deployment includes it.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, malformed, or non-object JSON is treated as "no fields".
        data = {}

    track_url = data.get('url')
    if not track_url:
        return jsonify({"error": "No URL provided"}), 400

    track_info = get_song_link_info(track_url)
    if not track_info:
        return jsonify({"error": "Could not fetch track info"}), 404

    youtube_url = extract_url(track_info.get("linksByPlatform", {}), "youtube")
    entity_unique_id = track_info.get("entityUniqueId")
    logger.info(f"songlink info: {entity_unique_id}")
    entity = track_info.get("entitiesByUniqueId", {}).get(entity_unique_id, {})
    title = entity.get("title")
    artist = entity.get("artistName")

    if youtube_url:
        # Read the "v" query parameter instead of splitting on "v=", so extra
        # parameters such as "&list=..." do not end up in the video ID.
        v_values = parse_qs(urlparse(youtube_url).query).get("v")
        video_id = v_values[0] if v_values else None
        if title and artist:
            filename = f"{title} - {artist}"
        else:
            filename = "Unknown Track - Unknown Artist"
        return jsonify({"url": youtube_url, "filename": filename, "track_id": video_id})

    if not title and not artist:
        # Nothing to search for: no direct link and no metadata either.
        return jsonify({"error": "No matching URL found"}), 404

    search_query = f'{title}+{artist}'
    search_results = ytmusic.search(search_query, filter="songs")
    first_song = next(
        (song for song in search_results or [] if song.get('videoId')),
        None,
    )
    if first_song is None:
        return jsonify({"error": "Video ID not found"}), 404

    video_id = first_song["videoId"]
    ym_url = f'https://www.youtube.com/watch?v={video_id}'
    return jsonify({"filename": search_query, "url": ym_url, "track_id": video_id})
|
|
|
|
|
|
|
class ApiRotator:
    """Orders a list of API endpoints so the last one that worked comes first."""

    def __init__(self, apis):
        """Store ``apis`` and start with no endpoint marked as successful."""
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the endpoints in the order they should be tried.

        With no recorded success this is ``self.apis`` itself. Otherwise it is
        a new list holding the endpoint at ``last_successful_index`` first,
        followed by the remaining endpoints in their original order.
        """
        preferred = self.last_successful_index
        if preferred is None:
            return self.apis

        head = [self.apis[preferred]]
        before = self.apis[:preferred]
        after = self.apis[preferred + 1:]
        return head + before + after

    def update_last_successful(self, index):
        """Record ``index`` (a position in ``self.apis``) as the last success."""
        self.last_successful_index = index
|
|
|
|
|
# Shared rotator over the download API base URLs, listed in default try-order.
api_rotator = ApiRotator(
    [
        "https://cobalt-api.ayo.tf",
        "https://dwnld.nichind.dev",
        "https://cobalt-api.kwiatekmiki.com",
        "http://34.107.254.11",
    ]
)
|
|
|
|
|
|
|
async def get_track_download_url(track_id: str, quality: str) -> "str | dict":
    """Ask the rotating download APIs for an MP3 download URL.

    Each endpoint from ``api_rotator`` is tried in priority order with a POST
    describing the ``https://youtu.be/<track_id>`` video. The first JSON
    response containing a ``url`` wins, and that endpoint is recorded as the
    last successful one. A "video unavailable" error stops the loop early,
    and any exception from one endpoint moves on to the next.

    Args:
        track_id: YouTube video ID.
        quality: Value sent as ``audioBitrate``.

    Returns:
        The download URL string on success, otherwise the dict
        ``{"error": "Download URL not found"}``.
    """
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    }
    y_url = f"https://youtu.be/{track_id}"

    session = cloudscraper.create_scraper()
    try:
        for api_url in api_rotator.get_prioritized_apis():
            try:
                logger.info(f"Attempting to get download URL from: {api_url}")
                response = session.post(
                    api_url,
                    timeout=20,
                    json={
                        "url": y_url,
                        "audioFormat": "mp3",
                        "downloadMode": "audio",
                        "audioBitrate": quality,
                    },
                    headers=headers,
                )
                logger.info(f"Response status: {response.status_code}")
                logger.info(f"Response content: {response.content}")

                if not response.headers.get('content-type', '').startswith('application/json'):
                    continue

                json_response = response.json()
                error_code = json_response.get("error", {}).get("code", "")

                if error_code == "error.api.content.video.unavailable":
                    logger.warning(f"Video unavailable error from {api_url}")
                    break

                if "url" in json_response:
                    # The loop runs over the *prioritized* ordering, so a
                    # position in it is not a position in ``api_rotator.apis``.
                    # Look up the real index so the right endpoint is promoted.
                    api_rotator.update_last_successful(api_rotator.apis.index(api_url))
                    return json_response["url"]

            except Exception as e:
                # Best effort: any failure from one endpoint moves on to the next.
                logger.error(f"Failed with {api_url}: {str(e)}")
                continue
    finally:
        # Release the scraper's connections on every exit path.
        session.close()

    logger.error("No download URL found")
    return {"error": "Download URL not found"}
|
|
|
|
|
async def get_audio_download_url(track_id: str, quality: str) -> str:
    """Build the chrunos-shadl audio download link for a YouTube video.

    ``quality`` is accepted but not used. The YouTube watch URL is embedded
    in the link as-is, without percent-encoding.
    """
    watch_url = f'https://www.youtube.com/watch?v={track_id}'
    return f'https://chrunos-shadl.hf.space/yt/dl?url={watch_url}&type=audio'
|
|
|
|
|
async def get_download_url(track_id, quality):
    """Request a download URL for a YouTube video from the chrunos-ytdl2 service.

    POSTs ``{"url": "https://www.youtube.com/watch?v=<track_id>"}`` as JSON
    and reads ``download_url`` from the JSON reply.

    Args:
        track_id: YouTube video ID.
        quality: Accepted for signature compatibility; not used.

    Returns:
        The ``download_url`` value from the response, or ``None`` when the
        service answers with a non-200 status, the request fails with a
        client error, or it times out.
    """
    url = "https://chrunos-ytdl2.hf.space/download"
    youtube_url = f'https://www.youtube.com/watch?v={track_id}'
    payload = {"url": youtube_url}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload) as response:
                if response.status != 200:
                    # Message text means "request failed, status code".
                    logger.error(f"请求失败,状态码: {response.status}")
                    return None
                result = await response.json()
                return result.get('download_url')
    except aiohttp.ClientError as e:
        # Message text means "a client error occurred".
        logger.error(f"发生客户端错误: {e}")
        return None
    except asyncio.TimeoutError:
        # aiohttp's timeouts raise asyncio.TimeoutError, which is not a
        # ClientError; report it as a failed fetch like the other errors.
        logger.error("Timed out waiting for a response from %s", url)
        return None
|
|
|
|
|
@app.route('/track_dl', methods=['POST'])
async def track_dl():
    """Return a download URL for a track at a permitted quality.

    Expects a JSON body with ``track_id`` and an optional ``quality``
    (default ``'128'``), which may be an integer bitrate or ``FLAC``.
    Bitrates above 128 and FLAC are rejected as premium-only.

    Returns:
        JSON ``{"url", "premium"}`` on success, or ``{"error", "premium"}``
        with status 400 when the quality is invalid or premium-only, the
        track ID is missing, or no download URL could be fetched.
    """
    premium_url = "https://chrunos.com/premium-shortcuts/"

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, malformed, or non-object JSON is treated as "no fields".
        data = {}

    track_id = data.get('track_id')
    # Clients may send the bitrate as a JSON number or as a string; normalise
    # to text so both forms take the same validation path.
    quality = str(data.get('quality', '128')).strip()

    # FLAC must be recognised before the integer parse, which would reject it.
    is_flac = quality.upper() == 'FLAC'
    quality_num = None
    if not is_flac:
        try:
            quality_num = int(quality)
        except ValueError:
            return jsonify({
                "error": "Invalid quality value provided. It should be a valid integer or FLAC.",
                "premium": premium_url,
            }), 400

    if is_flac or quality_num > 128:
        return jsonify({
            "error": "Quality above 128 or FLAC is for Premium users Only.",
            "premium": premium_url,
        }), 400

    if not track_id:
        return jsonify({
            "error": "No track_id provided.",
            "premium": premium_url,
        }), 400

    dl_url = await get_download_url(track_id, quality)
    logger.info(dl_url)

    if isinstance(dl_url, str) and "http" in dl_url:
        return jsonify({"url": dl_url, "premium": premium_url})

    return jsonify({
        "error": "Failed to Fetch the Track.",
        "premium": premium_url,
    }), 400
|
|
|
|
|
|
|
|
|
@app.route('/get_artist', methods=['GET'])
def get_artist():
    """Return ``ytmusic.get_artist`` data for the ``id`` query parameter as JSON."""
    requested_id = request.args.get('id')
    return jsonify(ytmusic.get_artist(requested_id))
|
|
|
@app.route('/get_album', methods=['GET'])
def get_album():
    """Return ``ytmusic.get_album`` data for the ``id`` query parameter as JSON."""
    requested_id = request.args.get('id')
    return jsonify(ytmusic.get_album(requested_id))
|
|
|
@app.route('/get_song', methods=['GET'])
def get_song():
    """Return ``ytmusic.get_song`` data for the ``id`` query parameter as JSON."""
    requested_id = request.args.get('id')
    return jsonify(ytmusic.get_song(requested_id))
|
|
|
if __name__ == '__main__':
    # Direct-execution entry point: bind to all interfaces on port 7860.
    app.run(port=7860, host='0.0.0.0')
|
|