from flask import Flask, render_template, jsonify, request |
from ytmusicapi import YTMusic |
import os |
import logging |
import requests |
from datetime import datetime, timedelta |
import time |
import asyncio |
import cloudscraper |
from pydantic import BaseModel |
from urllib.parse import urlparse, parse_qs |
from collections import defaultdict |
import threading |
import aiohttp |
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Shared YouTube Music client used by the search and lookup routes.
ytmusic = YTMusic()

# Emit INFO-and-above records and expose a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.route('/')
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/search', methods=['POST'])
def search():
    """Search YouTube Music songs for the ``query`` field of the JSON body.

    A missing ``query`` key is treated as an empty string. The list returned
    by ``ytmusic.search`` is sent back unchanged as JSON.
    """
    payload = request.json
    term = payload.get('query', '')
    return jsonify(ytmusic.search(term, filter="songs"))
@app.route('/searcht', methods=['POST'])
def searcht():
    """Return the first song hit that carries a non-empty ``videoId``.

    Reads ``query`` from the JSON body (defaulting to an empty string),
    logs it, and searches YouTube Music songs. Responds with the first
    matching result as JSON, or an empty JSON object when nothing matches.
    """
    query = request.json.get('query', '')
    logger.info(f"serch query: {query}")
    hits = ytmusic.search(query, filter="songs")
    if not hits:
        return jsonify({})
    for song in hits:
        # Skip entries without a usable video id.
        if 'videoId' in song and song['videoId']:
            return jsonify(song)
    return jsonify({})
def extract_amazon_track_id(url: str):
    """Pull the track identifier out of an Amazon Music URL.

    Two URL shapes are recognised, checked in this order:

    1. a ``trackAsin`` query parameter (its first value is returned);
    2. a ``/tracks/<id>`` segment (the text after the last ``/tracks/``,
       cut at the first ``?``).

    Returns ``None`` for URLs that do not mention ``music.amazon.com`` or
    that match neither shape.
    """
    if "music.amazon.com" not in url:
        return None

    asin_values = parse_qs(urlparse(url).query).get("trackAsin")
    if asin_values is not None:
        return asin_values[0]

    _, marker, remainder = url.rpartition("/tracks/")
    if marker:
        return remainder.partition("?")[0]

    return None
def get_song_link_info(url: str, timeout: float = 15.0):
    """Look up a track through the song.link ``/links`` endpoint.

    Amazon Music URLs from which a track id can be extracted are queried by
    platform and id; every other URL is sent as the ``url`` parameter.

    Args:
        url: Share URL of the track to resolve.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None`` (non-200 status, network failure, timeout, or a body that
        is not valid JSON).
    """
    track_id = extract_amazon_track_id(url) if "music.amazon.com" in url else None
    if track_id:
        params = {"type": "song", "platform": "amazonMusic", "id": track_id}
    else:
        params = {"url": url}
    params["userCountry"] = "US"

    try:
        # Passing `params=` lets requests percent-encode the values, so a
        # track URL containing `&` or `?` no longer corrupts the query string.
        response = requests.get(
            "https://api.song.link/v1-alpha.1/links",
            params=params,
            timeout=timeout,
        )
    except requests.RequestException as exc:
        logger.error(f"song.link request failed: {exc}")
        return None

    if response.status_code != 200:
        return None

    try:
        return response.json()
    except ValueError:
        # A 200 reply whose body is not JSON is treated like a failed lookup.
        return None
def extract_url(links_by_platform: dict, platform: str):
    """Return the ``url`` stored for ``platform``, or ``None`` if it is absent.

    ``links_by_platform`` maps a platform name to a mapping that holds a
    ``"url"`` entry.
    """
    if platform not in links_by_platform:
        return None
    entry = links_by_platform[platform]
    return entry["url"]
def extract_track_info(entities_by_unique_id: dict, platform: str):
    """Find the first entity provided by ``platform`` and return its names.

    Returns a ``(title, artistName)`` tuple taken from the first entity whose
    ``apiProvider`` equals ``platform``, or ``(None, None)`` when no entity
    matches.
    """
    found = next(
        (
            candidate
            for candidate in entities_by_unique_id.values()
            if candidate["apiProvider"] == platform
        ),
        None,
    )
    if found is None:
        return None, None
    return found["title"], found["artistName"]
@app.route('/match', methods=['POST'])
async def match():
    """Map a track URL from another service to a YouTube video.

    Expects a JSON body with a ``url`` field. The URL is resolved through
    ``get_song_link_info``. If the result links to YouTube, that link is
    returned directly; otherwise the track's title and artist are searched
    on YouTube Music and the first hit carrying a video id is used.

    Returns:
        JSON ``{"url", "filename", "track_id"}`` on success, or
        ``{"error": <message>}`` with status 400 (no URL supplied) or 404
        (lookup failed or nothing matched).
    """
    data = request.get_json(silent=True)
    track_url = data.get('url') if isinstance(data, dict) else None
    if not track_url:
        # Errors are returned as JSON responses: the HTTPException previously
        # raised here is not available in this module, so raising it only
        # produced a NameError and a 500.
        return jsonify({"error": "No URL provided"}), 400

    track_info = get_song_link_info(track_url)
    if not track_info:
        return jsonify({"error": "Could not fetch track info"}), 404

    try:
        youtube_url = extract_url(track_info["linksByPlatform"], "youtube")
        entityUniqueId = track_info["entityUniqueId"]
        entity = track_info["entitiesByUniqueId"][entityUniqueId]
    except (KeyError, TypeError):
        # The lookup answered, but not in the shape this route relies on.
        return jsonify({"error": "Could not fetch track info"}), 404

    logger.info(f"songlink info: {entityUniqueId}")
    title = entity.get("title")
    artist = entity.get("artistName")

    if youtube_url:
        # Read the `v` parameter properly so trailing parameters such as
        # `&list=...` do not end up inside the video id.
        video_id = parse_qs(urlparse(youtube_url).query).get("v", [None])[0]
        if title and artist:
            filename = f"{title} - {artist}"
        else:
            filename = "Unknown Track - Unknown Artist"
        return jsonify({"url": youtube_url, "filename": filename, "track_id": video_id})

    # No direct YouTube link: fall back to a YouTube Music search.
    search_query = "+".join(part for part in (title, artist) if part)
    if not search_query:
        return jsonify({"error": "No matching URL found"}), 404

    search_results = ytmusic.search(search_query, filter="songs")
    first_song = next(
        (song for song in search_results or [] if 'videoId' in song and song['videoId']),
        None,
    )
    if first_song is None:
        return jsonify({"error": "Video ID not found"}), 404

    videoId = first_song["videoId"]
    ym_url = f'https://www.youtube.com/watch?v={videoId}'
    return jsonify({"filename": search_query, "url": ym_url, "track_id": videoId})
class ApiRotator:
    """Keep a list of API endpoints and remember which one last worked.

    ``last_successful_index`` is an index into ``apis`` (or ``None`` until a
    success has been recorded).
    """

    def __init__(self, apis):
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the endpoints with the last successful one moved to the front.

        When no success has been recorded the stored list itself is returned;
        otherwise a new list is built and the stored list is left untouched.
        """
        winner = self.last_successful_index
        if winner is None:
            return self.apis
        before = self.apis[:winner]
        after = self.apis[winner + 1:]
        return [self.apis[winner]] + before + after

    def update_last_successful(self, index):
        """Record ``index`` (a position in ``apis``) as the last endpoint that succeeded."""
        self.last_successful_index = index
# Module-wide rotator over the download API endpoints, listed in their
# default order of preference.
# NOTE(review): the last entry is an empty string, which is not a usable
# endpoint — confirm whether a URL is missing here.
api_rotator = ApiRotator(
    [
        "https://cobalt-api.ayo.tf",
        "https://dwnld.nichind.dev",
        "https://cobalt-api.kwiatekmiki.com",
        "",
    ]
)
async def get_track_download_url(track_id: str, quality: str) -> "str | dict":
    """Ask each configured download API for an MP3 link to a YouTube video.

    The endpoints come from ``api_rotator`` (last successful one first). Each
    is POSTed a JSON request for ``https://youtu.be/<track_id>``; the first
    JSON reply containing a ``url`` wins and that endpoint is remembered as
    the preferred one. A reply reporting the video as unavailable stops the
    search, since that error concerns the video rather than the endpoint.

    Args:
        track_id: YouTube video id.
        quality: Value sent as ``audioBitrate``.

    Returns:
        The download URL string on success, otherwise the dict
        ``{"error": "Download URL not found"}``.
    """
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    }
    payload = {
        "url": f"https://youtu.be/{track_id}",
        "audioFormat": "mp3",
        "downloadMode": "audio",
        "audioBitrate": quality,
    }

    session = cloudscraper.create_scraper()
    try:
        for api_url in api_rotator.get_prioritized_apis():
            if not api_url:
                # An empty endpoint can never be contacted; skip it quietly.
                continue
            try:
                logger.info(f"Attempting to get download URL from: {api_url}")
                response = session.post(api_url, timeout=20, json=payload, headers=headers)
                logger.info(f"Response status: {response.status_code}")
                logger.info(f"Response content: {response.content}")
                if not response.headers.get('content-type', '').startswith('application/json'):
                    continue
                json_response = response.json()
            except Exception as e:
                # Best effort: any failure with one endpoint moves on to the next.
                logger.error(f"Failed with {api_url}: {str(e)}")
                continue

            if not isinstance(json_response, dict):
                continue

            error = json_response.get("error")
            error_code = error.get("code", "") if isinstance(error, dict) else ""
            if error_code == "error.api.content.video.unavailable":
                logger.warning(f"Video unavailable error from {api_url}")
                break

            if "url" in json_response:
                # Record the position in the rotator's own list. The loop
                # runs over the re-ordered list, whose positions differ.
                api_rotator.update_last_successful(api_rotator.apis.index(api_url))
                return json_response["url"]
    finally:
        # Release the scraper's pooled connections on every exit path.
        session.close()

    logger.error("No download URL found")
    return {"error": "Download URL not found"}
async def get_audio_download_url(track_id: str, quality: str) -> str:
    """Build the audio download link for a YouTube video id.

    The watch URL for ``track_id`` is embedded as the ``url`` parameter of
    the ``/yt/dl`` endpoint with ``type=audio``. ``quality`` is accepted but
    not used.
    """
    watch_url = 'https://www.youtube.com/watch?v={}'.format(track_id)
    return 'https://chrunos-shadl.hf.space/yt/dl?url={}&type=audio'.format(watch_url)
async def get_download_url(track_id, quality):
    """Request a download link for a YouTube video from the download service.

    Args:
        track_id: YouTube video id; expanded to a full watch URL.
        quality: Accepted for call compatibility; not sent to the service.

    Returns:
        The ``download_url`` field of the JSON reply on HTTP 200 (``None``
        when the field is absent), or ``None`` on a non-200 status, client
        error, timeout, or a reply that cannot be parsed as a JSON object.
    """
    url = "https://chrunos-ytdl2.hf.space/download"
    data = {"url": f'https://www.youtube.com/watch?v={track_id}'}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data) as response:
                if response.status != 200:
                    # Message text: "request failed, status code".
                    logger.error(f"请求失败,状态码: {response.status}")
                    return None
                result = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        # Timeouts and undecodable bodies are not ClientError subclasses, so
        # they are listed explicitly to keep the "return None" contract.
        # Message text: "a client error occurred".
        logger.error(f"发生客户端错误: {e}")
        return None

    if not isinstance(result, dict):
        return None
    return result.get('download_url')
@app.route('/track_dl', methods=['POST'])
async def track_dl():
    """Return a download link for a track at a permitted quality.

    Reads ``track_id`` and ``quality`` (default ``'128'``) from the JSON
    body. ``FLAC`` or any bitrate above 128 is rejected with a premium
    notice; a non-integer quality is rejected as invalid. Otherwise the link
    obtained from ``get_download_url`` is returned.

    Returns:
        JSON ``{"url", "premium"}`` on success, or ``{"error", "premium"}``
        with status 400 on any failure.
    """
    premium_url = "https://chrunos.com/premium-shortcuts/"

    def fail(message):
        # Every error reply shares the same shape and status code.
        return jsonify({"error": message, "premium": premium_url}), 400

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    track_id = data.get('track_id')
    # Normalise to text: clients may send the bitrate as a JSON number, and
    # calling .upper() on an int previously crashed the request.
    quality = str(data.get('quality', '128')).strip()

    # Check FLAC before int(): int('FLAC') raises ValueError, which used to
    # make the premium branch unreachable for FLAC requests.
    if quality.upper() == 'FLAC':
        return fail("Quality above 128 or FLAC is for Premium users Only.")
    try:
        quality_num = int(quality)
    except ValueError:
        return fail("Invalid quality value provided. It should be a valid integer or FLAC.")
    if quality_num > 128:
        return fail("Quality above 128 or FLAC is for Premium users Only.")

    if not track_id:
        return fail("No track_id provided.")

    dl_url = await get_download_url(track_id, quality)
    logger.info(dl_url)
    if isinstance(dl_url, str) and "http" in dl_url:
        return jsonify({"url": dl_url, "premium": premium_url})
    return fail("Failed to Fetch the Track.")
@app.route('/get_artist', methods=['GET'])
def get_artist():
    """Return YouTube Music artist details for the ``id`` query parameter as JSON."""
    return jsonify(ytmusic.get_artist(request.args.get('id')))
@app.route('/get_album', methods=['GET'])
def get_album():
    """Return YouTube Music album details for the ``id`` query parameter as JSON."""
    return jsonify(ytmusic.get_album(request.args.get('id')))
@app.route('/get_song', methods=['GET'])
def get_song():
    """Return YouTube Music song details for the ``id`` query parameter as JSON."""
    return jsonify(ytmusic.get_song(request.args.get('id')))
if __name__ == '__main__':
    # Start Flask's built-in server on port 7860 when run as a script.
    # NOTE(review): host is an empty string — confirm the intended bind address.
    app.run(port=7860, host='')