from flask import Flask, jsonify, render_template, redirect, request, Response
import os
import json
import re
import requests
import urllib.parse
from datetime import datetime, timedelta
from threading import Thread
from hf_scrapper import get_system_proxies, ffmpeg_stream
from indexer import indexer
from dotenv import load_dotenv
import subprocess

load_dotenv()
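
# Environment-driven configuration: the index file location, Hugging Face token/repo,
# TheTVDB credentials, and the local metadata cache directory all come from .env.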
INDEX_FILE = os.getenv("INDEX_FILE")
TOKEN = os.getenv("TOKEN")
REPO = os.getenv("REPO")
THETVDB_API_KEY = os.getenv("THETVDB_API_KEY")
THETVDB_API_URL = os.getenv("THETVDB_API_URL")
CACHE_DIR = os.getenv("CACHE_DIR")

TOKEN_EXPIRY = None
THETVDB_TOKEN = None

proxies = get_system_proxies()

if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)

indexer()

# Check if INDEX_FILE exists
if not os.path.exists(INDEX_FILE):
    raise FileNotFoundError(f"{INDEX_FILE} not found. Please make sure the file exists.")

with open(INDEX_FILE, 'r') as f:
    file_structure = json.load(f)
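
# Authenticate against TheTVDB and cache the bearer token plus its expiry time
# in module-level globals so later requests can reuse it.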
def authenticate_thetvdb():
    global THETVDB_TOKEN, TOKEN_EXPIRY
    auth_url = f"{THETVDB_API_URL}/login"
    auth_data = {
        "apikey": THETVDB_API_KEY
    }
    try:
        response = requests.post(auth_url, json=auth_data, proxies=proxies)
        response.raise_for_status()
        response_data = response.json()
        THETVDB_TOKEN = response_data['data']['token']
        TOKEN_EXPIRY = datetime.now() + timedelta(days=30)  # token is valid for 1 month
    except requests.RequestException as e:
        print(f"Authentication failed: {e}")
        THETVDB_TOKEN = None
        TOKEN_EXPIRY = None
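
# Return a valid TheTVDB token, re-authenticating when none is cached or the
# cached one has expired.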
def get_thetvdb_token():
    global THETVDB_TOKEN, TOKEN_EXPIRY
    if not THETVDB_TOKEN or datetime.now() >= TOKEN_EXPIRY:
        authenticate_thetvdb()
    return THETVDB_TOKEN
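
# Query TheTVDB's search endpoint for a title (optionally filtered by year) and,
# if anything comes back, write the raw JSON response to CACHE_DIR keyed by the
# URL-quoted original title.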
def fetch_and_cache_json(original_title, title, media_type, year=None):
    if year:
        search_url = f"{THETVDB_API_URL}/search?query={urllib.parse.quote(title)}&type={media_type}&year={year}"
    else:
        search_url = f"{THETVDB_API_URL}/search?query={urllib.parse.quote(title)}&type={media_type}"

    token = get_thetvdb_token()
    if not token:
        print("Authentication failed")
        return

    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
    }

    try:
        response = requests.get(search_url, headers=headers, proxies=proxies)
        response.raise_for_status()
        data = response.json()
        if 'data' in data and data['data']:
            # Use original_title to save JSON response to cache
            json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(original_title)}.json")
            with open(json_cache_path, 'w') as f:
                json.dump(data, f)
    except requests.RequestException as e:
        print(f"Error fetching data: {e}")
def prefetch_metadata():
    for item in file_structure:
        if 'contents' in item:
            for sub_item in item['contents']:
                original_title = sub_item['path'].split('/')[-1]
                media_type = 'series' if item['path'].startswith('tv') else 'movie'
                title = original_title
                year = None

                # Check if the title contains a year in parentheses
                match = re.search(r'\((\d{4})\)', original_title)
                if match:
                    year_str = match.group(1)
                    if year_str.isdigit() and len(year_str) == 4:
                        title = original_title[:match.start()].strip()
                        year = int(year_str)
                else:
                    # Check if the title contains a year at the end without parentheses
                    parts = original_title.rsplit(' ', 1)
                    if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
                        title = parts[0].strip()
                        year = int(parts[-1])

                fetch_and_cache_json(original_title, title, media_type, year)
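
# Resolve the repo-relative path of a film's media file by comparing the
# year-stripped title against entries under the indexed 'films' directory.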
def get_film_file_path(title):
    # URL-decode the title
    decoded_title = urllib.parse.unquote(title)

    # Normalize the title for comparison
    normalized_title = decoded_title.split(' (')[0]
    normalized_title = normalized_title.strip()

    for item in file_structure:
        if item['path'].startswith('films'):
            for sub_item in item['contents']:
                sub_item_title = sub_item['path'].split('/')[-1]

                # Normalize sub_item title
                normalized_sub_item_title = sub_item_title.split(' (')[0]
                normalized_sub_item_title = normalized_sub_item_title.strip()

                if normalized_title == normalized_sub_item_title:
                    for file in sub_item['contents']:
                        if file['type'] == 'file':
                            return file['path']
    return None
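
# Collect the season/episode layout for a TV show from the indexed 'tv'
# directory, returning each season with its episode titles and paths.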
def get_tv_show_seasons(title):
    seasons = []
    for item in file_structure:
        if item['path'].startswith('tv'):
            for sub_item in item['contents']:
                if sub_item['type'] == 'directory' and title in sub_item['path']:
                    for season in sub_item['contents']:
                        if season['type'] == 'directory':
                            episodes = []
                            for episode in season['contents']:
                                if episode['type'] == 'file':
                                    episodes.append({
                                        "title": episode['path'].split('/')[-1],
                                        "path": episode['path']
                                    })
                            seasons.append({
                                "season": season['path'].split('/')[-1],
                                "episodes": episodes
                            })
                    return seasons
    return []

# Run prefetch_metadata in a background thread
def start_prefetching():
    prefetch_metadata()
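
# Build the response body for the streaming route: the Hugging Face token is read
# from the environment and handed to ffmpeg_stream so it can fetch the file from the repo.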
def generate(file_url):
    token = os.getenv("TOKEN")
    # ffmpeg_stream is expected to yield the transcoded bytes (see hf_scrapper);
    # return them directly so Flask's Response can iterate over the stream.
    return ffmpeg_stream(file_url, token)

# Start prefetching before running the Flask app
thread = Thread(target=start_prefetching)
thread.daemon = True
thread.start()

app = Flask(__name__)
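
# Flask routes. The URL paths in the decorators below are assumptions; adjust them
# to match whatever the front-end templates actually request.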
@app.route('/')
def home():
    return render_template('index.html')

@app.route('/player')
def player():
    return render_template('player.html')

@app.route('/films')
def list_films():
    films = [item for item in file_structure if item['path'].startswith('films')]
    return jsonify([sub_item for film in films for sub_item in film['contents']])

@app.route('/tv')
def list_tv():
    tv_shows = [item for item in file_structure if item['path'].startswith('tv')]
    return jsonify([sub_item for show in tv_shows for sub_item in show['contents']])
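
# Detail endpoint for a single film: resolves the media file path and returns it
# together with the cached TheTVDB metadata.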
@app.route('/film/<title>')
def film_page(title):
    title = urllib.parse.unquote(title)  # Decode the URL parameter
    film_file_path = get_film_file_path(title)
    if not film_file_path:
        return jsonify({'error': 'Film not found'}), 404

    # Fetch cached metadata
    json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if os.path.exists(json_cache_path):
        with open(json_cache_path, 'r') as f:
            metadata = json.load(f)
    else:
        return jsonify({'error': 'Metadata not found'}), 404

    return jsonify({
        'metadata': metadata,
        'file_path': film_file_path
    })
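
# Detail endpoint for a TV show: returns the cached metadata plus the indexed
# season/episode listing.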
@app.route('/tv/<show_title>')
def tv_page(show_title):
    show_title = urllib.parse.unquote(show_title)  # Decode the URL parameter
    seasons = get_tv_show_seasons(show_title)
    if not seasons:
        return jsonify({'error': 'TV show not found'}), 404

    # Fetch cached metadata
    json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(show_title)}.json")
    if os.path.exists(json_cache_path):
        with open(json_cache_path, 'r') as f:
            metadata = json.load(f)
    else:
        return jsonify({'error': 'Metadata not found'}), 404

    return jsonify({
        'metadata': metadata,
        'seasons': seasons
    })
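
# Serve previously cached metadata for a title passed as a query parameter,
# without touching the upstream API.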
@app.route('/get_metadata')
def get_metadata():
    title = request.args.get('title')
    if not title:
        return jsonify({'error': 'No title provided'}), 400

    json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if os.path.exists(json_cache_path):
        with open(json_cache_path, 'r') as f:
            data = json.load(f)
        return jsonify(data)

    # If metadata is not found in cache, return an error
    return jsonify({'error': 'Metadata not found'}), 404

@app.route('/stream')
def stream_video():
    # This route currently only streams the file from Hugging Face using ffmpy;
    # in-browser playback is not implemented yet.
    file_path = request.args.get('path')
    if not file_path:
        return "File path not provided", 400
    file_url = f"https://huggingface.co/{REPO}/resolve/main/{file_path}"
    return Response(generate(file_url), content_type='video/mp4')

if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=7860)