from flask import Flask, jsonify, request, send_from_directory from flask_cors import CORS import os import json import threading import urllib.parse from hf_scrapper import download_film, download_episode, get_system_proxies, get_download_progress from indexer import indexer from tvdb import fetch_and_cache_json import re app = Flask(__name__) CORS(app) # Constants and Configuration CACHE_DIR = os.getenv("CACHE_DIR") INDEX_FILE = os.getenv("INDEX_FILE") TOKEN = os.getenv("TOKEN") FILM_STORE_JSON_PATH = os.path.join(CACHE_DIR, "film_store.json") TV_STORE_JSON_PATH = os.path.join(CACHE_DIR, "tv_store.json") REPO = os.getenv("REPO") download_threads = {} # Ensure CACHE_DIR exists if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR) for path in [FILM_STORE_JSON_PATH, TV_STORE_JSON_PATH]: if not os.path.exists(path): with open(path, 'w') as json_file: json.dump({}, json_file) # Index the file structure indexer() # Load the file structure JSON if not os.path.exists(INDEX_FILE): raise FileNotFoundError(f"{INDEX_FILE} not found. Please make sure the file exists.") with open(INDEX_FILE, 'r') as f: file_structure = json.load(f) # Function Definitions def load_json(file_path): """Load JSON data from a file.""" with open(file_path, 'r') as file: return json.load(file) def find_movie_path(json_data, title): """Find the path of the movie in the JSON data based on the title.""" for directory in json_data: if directory['type'] == 'directory' and directory['path'] == 'films': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': for item in sub_directory['contents']: if item['type'] == 'file' and title.lower() in item['path'].lower(): return item['path'] return None def find_tv_path(json_data, title): """Find the path of the TV show in the JSON data based on the title.""" for directory in json_data: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): return sub_directory['path'] return None def get_tv_structure(json_data,title): """Find the path of the TV show in the JSON data based on the title.""" for directory in json_data: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): return sub_directory return None def get_film_id(title): """Generate a film ID based on the title.""" return title.replace(" ", "_").lower() def prefetch_metadata(): """Prefetch metadata for all items in the file structure.""" for item in file_structure: if 'contents' in item: for sub_item in item['contents']: original_title = sub_item['path'].split('/')[-1] media_type = 'series' if item['path'].startswith('tv') else 'movie' title = original_title year = None # Extract year from the title if available match = re.search(r'\((\d{4})\)', original_title) if match: year_str = match.group(1) if year_str.isdigit() and len(year_str) == 4: title = original_title[:match.start()].strip() year = int(year_str) else: parts = original_title.rsplit(' ', 1) if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4: title = parts[0].strip() year = int(parts[-1]) fetch_and_cache_json(original_title, title, media_type, year) def bytes_to_human_readable(num, suffix="B"): for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: if abs(num) < 1024.0: return f"{num:3.1f} {unit}{suffix}" num /= 1024.0 return f"{num:.1f} Y{suffix}" def encode_episodeid(title,season,episode): return f"{title}_{season}_{episode}" def get_all_tv_shows(indexed_cache): """Get all TV shows from the indexed cache structure JSON file.""" tv_shows = {} for directory in indexed_cache: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': show_title = sub_directory['path'].split('/')[-1] tv_shows[show_title] = [] for season_directory in sub_directory['contents']: if season_directory['type'] == 'directory': season = season_directory['path'].split('/')[-1] for episode in season_directory['contents']: if episode['type'] == 'file': tv_shows[show_title].append({ "season": season, "episode": episode['path'].split('/')[-1], "path": episode['path'] }) return tv_shows def get_all_films(indexed_cache): """Get all films from the indexed cache structure JSON file.""" films = [] for directory in indexed_cache: if directory['type'] == 'directory' and directory['path'] == 'films': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': films.append(sub_directory['path']) return films def start_prefetching(): """Start the metadata prefetching in a separate thread.""" prefetch_metadata() # Start prefetching metadata thread = threading.Thread(target=start_prefetching) thread.daemon = True thread.start() # API Endpoints @app.route('/api/film', methods=['GET']) def get_movie_api(): """Endpoint to get the movie by title.""" title = request.args.get('title') if not title: return jsonify({"error": "Title parameter is required"}), 400 # Load the film store JSON with open(FILM_STORE_JSON_PATH, 'r') as json_file: film_store_data = json.load(json_file) # Check if the film is already cached if title in film_store_data: cache_path = film_store_data[title] if os.path.exists(cache_path): return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path)) movie_path = find_movie_path(file_structure, title) if not movie_path: return jsonify({"error": "Movie not found"}), 404 cache_path = os.path.join(CACHE_DIR, movie_path) file_url = f"https://huggingface.co/{REPO}/resolve/main/{movie_path}" proxies = get_system_proxies() film_id = get_film_id(title) # Start the download in a separate thread if not already downloading if film_id not in download_threads or not download_threads[film_id].is_alive(): thread = threading.Thread(target=download_film, args=(file_url, TOKEN, cache_path, proxies, film_id, title)) download_threads[film_id] = thread thread.start() return jsonify({"status": "Download started", "film_id": film_id}) @app.route('/api/tv', methods=['GET']) def get_tv_show_api(): """Endpoint to get the TV show by title, season, and episode.""" title = request.args.get('title') season = request.args.get('season') episode = request.args.get('episode') if not title or not season or not episode: return jsonify({"error": "Title, season, and episode parameters are required"}), 400 # Load the TV store JSON with open(TV_STORE_JSON_PATH, 'r') as json_file: tv_store_data = json.load(json_file) # Check if the episode is already cached if title in tv_store_data and season in tv_store_data[title]: for ep in tv_store_data[title][season]: if episode in ep: cache_path = tv_store_data[title][season][ep] if os.path.exists(cache_path): return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path)) tv_path = find_tv_path(file_structure, title) if not tv_path: return jsonify({"error": "TV show not found"}), 404 episode_path = None for directory in file_structure: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): for season_dir in sub_directory['contents']: if season_dir['type'] == 'directory' and season in season_dir['path']: for episode_file in season_dir['contents']: if episode_file['type'] == 'file' and episode in episode_file['path']: episode_path = episode_file['path'] break if not episode_path: return jsonify({"error": "Episode not found"}), 404 cache_path = os.path.join(CACHE_DIR, episode_path) file_url = f"https://huggingface.co/{REPO}/resolve/main/{episode_path}" proxies = get_system_proxies() episode_id = encode_episodeid(title,season,episode) # Start the download in a separate thread if not already downloading if episode_id not in download_threads or not download_threads[episode_id].is_alive(): thread = threading.Thread(target=download_episode, args=(file_url, TOKEN, cache_path, proxies, episode_id, title)) download_threads[episode_id] = thread thread.start() return jsonify({"status": "Download started", "episode_id": episode_id}) @app.route('/api/progress/', methods=['GET']) def get_progress_api(id): """Endpoint to get the download progress of a movie or TV show episode.""" progress = get_download_progress(id) return jsonify({"id": id, "progress": progress}) @app.route('/api/filmid', methods=['GET']) def get_film_id_by_title_api(): """Endpoint to get the film ID by providing the movie title.""" title = request.args.get('title') if not title: return jsonify({"error": "Title parameter is required"}), 400 film_id = get_film_id(title) return jsonify({"film_id": film_id}) @app.route('/api/episodeid', methods=['GET']) def get_episode_id_api(): """Endpoint to get the episode ID by providing the TV show title, season, and episode.""" title = request.args.get('title') season = request.args.get('season') episode = request.args.get('episode') if not title or not season or not episode: return jsonify({"error": "Title, season, and episode parameters are required"}), 400 episode_id = encode_episodeid(title,season,episode) return jsonify({"episode_id": episode_id}) @app.route('/api/cache/size', methods=['GET']) def get_cache_size_api(): total_size = 0 for dirpath, dirnames, filenames in os.walk(CACHE_DIR): for f in filenames: fp = os.path.join(dirpath, f) total_size += os.path.getsize(fp) readable_size = bytes_to_human_readable(total_size) return jsonify({"cache_size": readable_size}) @app.route('/api/cache/clear', methods=['POST']) def clear_cache_api(): for dirpath, dirnames, filenames in os.walk(CACHE_DIR): for f in filenames: fp = os.path.join(dirpath, f) os.remove(fp) return jsonify({"status": "Cache cleared"}) @app.route('/api/tv/store', methods=['GET']) def get_tv_store_api(): """Endpoint to get the TV store JSON.""" if os.path.exists(TV_STORE_JSON_PATH): with open(TV_STORE_JSON_PATH, 'r') as json_file: tv_store_data = json.load(json_file) return jsonify(tv_store_data) return jsonify({}), 404 @app.route('/api/film/store', methods=['GET']) def get_film_store_api(): """Endpoint to get the TV store JSON.""" if os.path.exists(FILM_STORE_JSON_PATH): with open(FILM_STORE_JSON_PATH, 'r') as json_file: tv_store_data = json.load(json_file) return jsonify(tv_store_data) return jsonify({}), 404 @app.route('/api/film/metadata', methods=['GET']) def get_film_metadata_api(): """Endpoint to get the film metadata by title.""" title = request.args.get('title') if not title: return jsonify({'error': 'No title provided'}), 400 json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json") if os.path.exists(json_cache_path): with open(json_cache_path, 'r') as f: data = json.load(f) return jsonify(data) return jsonify({'error': 'Metadata not found'}), 404 @app.route('/api/tv/metadata', methods=['GET']) def get_tv_metadata_api(): """Endpoint to get the TV show metadata by title.""" title = request.args.get('title') if not title: return jsonify({'error': 'No title provided'}), 400 json_cache_path = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json") if os.path.exists(json_cache_path): with open(json_cache_path, 'r') as f: data = json.load(f) # Add the file structure to the metadata tv_structure_data = get_tv_structure(file_structure, title) if tv_structure_data: data['file_structure'] = tv_structure_data return jsonify(data) return jsonify({'error': 'Metadata not found'}), 404 @app.route("/api/film/all") def get_all_films_api(): return get_all_films(file_structure) @app.route("/api/tv/all") def get_all_tvshows_api(): return get_all_tv_shows(file_structure) # Routes @app.route('/') def index(): return "Server Running" # Main entry point if __name__ == "__main__": app.run(debug=True, host="0.0.0.0", port=7860)