# instance2 / app.backup.py
# (Hugging Face file-viewer residue from the original paste: author
# ChandimaPrabath, commit "major update v0.2" 20b65a4, 14.3 kB —
# kept as a comment so the module remains valid Python.)
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
import os
import json
import threading
import urllib.parse
from hf_scrapper import download_film, download_episode, get_system_proxies, get_download_progress
from indexer import indexer
from tvdb import fetch_and_cache_json
import re
# Flask application with CORS enabled for all origins/routes.
app = Flask(__name__)
CORS(app)

# Constants and Configuration
# NOTE(review): os.getenv returns None when a variable is unset, so a
# missing CACHE_DIR makes the os.path.join calls below raise — these
# environment variables are effectively required.
CACHE_DIR = os.getenv("CACHE_DIR")    # local directory for downloaded media + metadata
INDEX_FILE = os.getenv("INDEX_FILE")  # JSON index produced by indexer()
TOKEN = os.getenv("TOKEN")            # Hugging Face access token used for downloads
FILM_STORE_JSON_PATH = os.path.join(CACHE_DIR, "film_store.json")  # title -> cached file path
TV_STORE_JSON_PATH = os.path.join(CACHE_DIR, "tv_store.json")      # title -> season -> episode -> path
REPO = os.getenv("REPO")              # Hugging Face repo id media is fetched from

# Registry of in-flight download threads, keyed by film/episode id.
download_threads = {}

# Ensure CACHE_DIR exists
if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)

# Seed empty store files on first run so later json.load calls never fail.
for path in [FILM_STORE_JSON_PATH, TV_STORE_JSON_PATH]:
    if not os.path.exists(path):
        with open(path, 'w') as json_file:
            json.dump({}, json_file)

# Index the file structure
indexer()

# Load the file structure JSON
if not os.path.exists(INDEX_FILE):
    raise FileNotFoundError(f"{INDEX_FILE} not found. Please make sure the file exists.")
with open(INDEX_FILE, 'r') as f:
    file_structure = json.load(f)
# Function Definitions
def load_json(file_path):
    """Read *file_path* and return its parsed JSON content."""
    with open(file_path, 'r') as handle:
        return json.load(handle)
def find_movie_path(json_data, title):
    """Return the repo-relative path of the first film file whose path
    contains *title* (case-insensitive), or None when nothing matches."""
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'films':
            continue
        for film_dir in top['contents']:
            if film_dir['type'] != 'directory':
                continue
            for node in film_dir['contents']:
                if node['type'] == 'file' and needle in node['path'].lower():
                    return node['path']
    return None
def find_tv_path(json_data, title):
    """Return the path of the first TV-show directory whose path contains
    *title* (case-insensitive), or None when nothing matches."""
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'tv':
            continue
        for show_dir in top['contents']:
            if show_dir['type'] == 'directory' and needle in show_dir['path'].lower():
                return show_dir['path']
    return None
def get_tv_structure(json_data, title):
    """Return the indexed directory *entry* (not just the path) of the first
    TV show whose path contains *title*, case-insensitively; None otherwise."""
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'tv':
            continue
        for show_dir in top['contents']:
            if show_dir['type'] == 'directory' and needle in show_dir['path'].lower():
                return show_dir
    return None
def get_film_id(title):
    """Derive the download-registry id for a film: lowercase title with
    spaces turned into underscores."""
    return "_".join(title.split(" ")).lower()
def prefetch_metadata():
    """Warm the metadata cache for every item in the module-level
    ``file_structure`` index via fetch_and_cache_json."""
    # Four-digit year in parentheses, e.g. "Title (2010)".
    year_in_parens = re.compile(r'\((\d{4})\)')
    for top_level in file_structure:
        if 'contents' not in top_level:
            continue
        for entry in top_level['contents']:
            original_title = entry['path'].split('/')[-1]
            media_type = 'series' if top_level['path'].startswith('tv') else 'movie'
            title, year = original_title, None
            found = year_in_parens.search(original_title)
            if found:
                # "(YYYY)" form: strip it off and keep the numeric year.
                title = original_title[:found.start()].strip()
                year = int(found.group(1))
            else:
                # Fallback: a bare trailing 4-digit year, e.g. "Title 2010".
                head, sep, tail = original_title.rpartition(' ')
                if sep and tail.isdigit() and len(tail) == 4:
                    title, year = head.strip(), int(tail)
            fetch_and_cache_json(original_title, title, media_type, year)
def bytes_to_human_readable(num, suffix="B"):
    """Format a byte count with binary (1024-based) unit prefixes,
    e.g. 1536 -> '1.5 KB'."""
    magnitude = num
    for prefix in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if abs(magnitude) < 1024.0:
            return f"{magnitude:3.1f} {prefix}{suffix}"
        magnitude /= 1024.0
    # Anything that survives all divisions is in the yotta range.
    return f"{magnitude:.1f} Y{suffix}"
def encode_episodeid(title, season, episode):
    """Build the canonical download-registry id for a TV episode."""
    return "%s_%s_%s" % (title, season, episode)
def get_all_tv_shows(indexed_cache):
    """Map every show title in the indexed cache to its episode records
    ({"season", "episode", "path"} dicts)."""
    shows = {}
    for top in indexed_cache:
        if top['type'] != 'directory' or top['path'] != 'tv':
            continue
        for show_dir in top['contents']:
            if show_dir['type'] != 'directory':
                continue
            show_title = show_dir['path'].split('/')[-1]
            episodes = shows[show_title] = []
            for season_dir in show_dir['contents']:
                if season_dir['type'] != 'directory':
                    continue
                season_name = season_dir['path'].split('/')[-1]
                episodes.extend(
                    {
                        "season": season_name,
                        "episode": node['path'].split('/')[-1],
                        "path": node['path'],
                    }
                    for node in season_dir['contents']
                    if node['type'] == 'file'
                )
    return shows
def get_all_films(indexed_cache):
    """Return the paths of every film directory in the indexed cache."""
    return [
        sub['path']
        for top in indexed_cache
        if top['type'] == 'directory' and top['path'] == 'films'
        for sub in top['contents']
        if sub['type'] == 'directory'
    ]
def start_prefetching():
    """Start the metadata prefetching in a separate thread."""
    # Runs synchronously here; the "separate thread" part comes from the
    # caller at module bottom, which wraps this function in a daemon Thread.
    prefetch_metadata()
# Start prefetching metadata
# Daemon thread so a still-running prefetch never blocks interpreter
# shutdown when the Flask process exits.
thread = threading.Thread(target=start_prefetching)
thread.daemon = True
thread.start()
# API Endpoints
@app.route('/api/film', methods=['GET'])
def get_movie_api():
    """Endpoint to get the movie by title.

    Query params:
        title: movie title to look up (required; matched as a substring
               against indexed film file paths).

    Serves the cached video file when available; otherwise starts a
    background download and returns its film_id so the client can poll
    /api/progress/<id>.
    """
    title = request.args.get('title')
    if not title:
        return jsonify({"error": "Title parameter is required"}), 400

    # Load the film store JSON (title -> absolute cached file path).
    with open(FILM_STORE_JSON_PATH, 'r') as json_file:
        film_store_data = json.load(json_file)

    # Check if the film is already cached
    if title in film_store_data:
        cache_path = film_store_data[title]
        if os.path.exists(cache_path):
            return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path))

    # Not cached: resolve the repo-relative path from the in-memory index.
    movie_path = find_movie_path(file_structure, title)
    if not movie_path:
        return jsonify({"error": "Movie not found"}), 404

    cache_path = os.path.join(CACHE_DIR, movie_path)
    file_url = f"https://huggingface.co/{REPO}/resolve/main/{movie_path}"
    proxies = get_system_proxies()
    film_id = get_film_id(title)

    # Start the download in a separate thread if not already downloading
    # (download_threads is the module-level registry of in-flight downloads).
    if film_id not in download_threads or not download_threads[film_id].is_alive():
        thread = threading.Thread(target=download_film, args=(file_url, TOKEN, cache_path, proxies, film_id, title))
        download_threads[film_id] = thread
        thread.start()

    return jsonify({"status": "Download started", "film_id": film_id})
@app.route('/api/tv', methods=['GET'])
def get_tv_show_api():
    """Endpoint to get the TV show by title, season, and episode.

    Query params:
        title, season, episode: all required; each is matched as a
        substring against the indexed paths / store keys.

    Serves the cached episode file when available; otherwise starts a
    background download and returns its episode_id for progress polling.
    """
    title = request.args.get('title')
    season = request.args.get('season')
    episode = request.args.get('episode')

    if not title or not season or not episode:
        return jsonify({"error": "Title, season, and episode parameters are required"}), 400

    # Load the TV store JSON (title -> season -> episode-name -> cached path).
    with open(TV_STORE_JSON_PATH, 'r') as json_file:
        tv_store_data = json.load(json_file)

    # Check if the episode is already cached
    # NOTE(review): matching is substring-based ("episode in ep"), so a
    # query like "E1" can also hit "E10" — confirm episode ids are unambiguous.
    if title in tv_store_data and season in tv_store_data[title]:
        for ep in tv_store_data[title][season]:
            if episode in ep:
                cache_path = tv_store_data[title][season][ep]
                if os.path.exists(cache_path):
                    return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path))

    tv_path = find_tv_path(file_structure, title)
    if not tv_path:
        return jsonify({"error": "TV show not found"}), 404

    # Walk the in-memory index to resolve the repo-relative episode path.
    episode_path = None
    for directory in file_structure:
        if directory['type'] == 'directory' and directory['path'] == 'tv':
            for sub_directory in directory['contents']:
                if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                    for season_dir in sub_directory['contents']:
                        if season_dir['type'] == 'directory' and season in season_dir['path']:
                            for episode_file in season_dir['contents']:
                                if episode_file['type'] == 'file' and episode in episode_file['path']:
                                    episode_path = episode_file['path']
                                    # NOTE(review): this break only exits the innermost
                                    # loop; the outer loops keep scanning, so a later
                                    # substring match could overwrite episode_path.
                                    break

    if not episode_path:
        return jsonify({"error": "Episode not found"}), 404

    cache_path = os.path.join(CACHE_DIR, episode_path)
    file_url = f"https://huggingface.co/{REPO}/resolve/main/{episode_path}"
    proxies = get_system_proxies()
    episode_id = encode_episodeid(title,season,episode)

    # Start the download in a separate thread if not already downloading
    if episode_id not in download_threads or not download_threads[episode_id].is_alive():
        thread = threading.Thread(target=download_episode, args=(file_url, TOKEN, cache_path, proxies, episode_id, title))
        download_threads[episode_id] = thread
        thread.start()

    return jsonify({"status": "Download started", "episode_id": episode_id})
@app.route('/api/progress/<video_id>', methods=['GET'])
def get_progress_api(video_id):
    """Endpoint to get the download progress of a movie or TV show episode.

    Args:
        video_id: the film_id or episode_id returned by /api/film or /api/tv.

    Returns:
        JSON {"id": ..., "progress": ...} with progress as reported by
        hf_scrapper.get_download_progress.
    """
    # Renamed the URL variable from ``id`` to ``video_id`` to stop shadowing
    # the ``id`` builtin; the URL shape and the JSON response key are unchanged.
    progress = get_download_progress(video_id)
    return jsonify({"id": video_id, "progress": progress})
@app.route('/api/filmid', methods=['GET'])
def get_film_id_by_title_api():
    """Endpoint to get the film ID by providing the movie title."""
    title = request.args.get('title')
    if title:
        return jsonify({"film_id": get_film_id(title)})
    return jsonify({"error": "Title parameter is required"}), 400
@app.route('/api/episodeid', methods=['GET'])
def get_episode_id_api():
    """Endpoint to get the episode ID by providing the TV show title, season, and episode."""
    params = (request.args.get('title'),
              request.args.get('season'),
              request.args.get('episode'))
    if not all(params):
        return jsonify({"error": "Title, season, and episode parameters are required"}), 400
    title, season, episode = params
    return jsonify({"episode_id": encode_episodeid(title, season, episode)})
@app.route('/api/cache/size', methods=['GET'])
def get_cache_size_api():
    """Report the combined size of every file under CACHE_DIR."""
    total_size = sum(
        os.path.getsize(os.path.join(dirpath, name))
        for dirpath, _dirnames, filenames in os.walk(CACHE_DIR)
        for name in filenames
    )
    return jsonify({"cache_size": bytes_to_human_readable(total_size)})
@app.route('/api/cache/clear', methods=['POST'])
def clear_cache_api():
    """Delete every file under CACHE_DIR (directories are left in place)."""
    for dirpath, _dirnames, filenames in os.walk(CACHE_DIR):
        for name in filenames:
            os.remove(os.path.join(dirpath, name))
    return jsonify({"status": "Cache cleared"})
@app.route('/api/tv/store', methods=['GET'])
def get_tv_store_api():
    """Endpoint to get the TV store JSON."""
    if not os.path.exists(TV_STORE_JSON_PATH):
        return jsonify({}), 404
    with open(TV_STORE_JSON_PATH, 'r') as json_file:
        return jsonify(json.load(json_file))
@app.route('/api/film/store', methods=['GET'])
def get_film_store_api():
    """Endpoint to get the film store JSON.

    Returns:
        The title -> cached-path mapping, or an empty object with 404 when
        the store file does not exist yet.
    """
    if os.path.exists(FILM_STORE_JSON_PATH):
        with open(FILM_STORE_JSON_PATH, 'r') as json_file:
            # Fixed copy-paste local name: this is the *film* store, not the TV store.
            film_store_data = json.load(json_file)
        return jsonify(film_store_data)
    return jsonify({}), 404
@app.route('/api/film/metadata', methods=['GET'])
def get_film_metadata_api():
    """Endpoint to get the film metadata by title."""
    title = request.args.get('title')
    if not title:
        return jsonify({'error': 'No title provided'}), 400
    # Metadata is cached under the URL-quoted title, one JSON file per title.
    metadata_file = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(metadata_file):
        return jsonify({'error': 'Metadata not found'}), 404
    with open(metadata_file, 'r') as handle:
        return jsonify(json.load(handle))
@app.route('/api/tv/metadata', methods=['GET'])
def get_tv_metadata_api():
    """Endpoint to get the TV show metadata by title."""
    title = request.args.get('title')
    if not title:
        return jsonify({'error': 'No title provided'}), 400
    # Metadata is cached under the URL-quoted title, one JSON file per title.
    metadata_file = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(metadata_file):
        return jsonify({'error': 'Metadata not found'}), 404
    with open(metadata_file, 'r') as handle:
        data = json.load(handle)
    # Attach the indexed directory layout for this show, when we have one.
    structure = get_tv_structure(file_structure, title)
    if structure:
        data['file_structure'] = structure
    return jsonify(data)
@app.route("/api/film/all")
def get_all_films_api():
    """List every indexed film directory path."""
    # NOTE(review): returns a bare list rather than jsonify(...) — relies on
    # Flask's list-to-JSON response handling (Flask >= 2.2); confirm the
    # deployed Flask version supports it.
    return get_all_films(file_structure)
@app.route("/api/tv/all")
def get_all_tvshows_api():
    """Map every indexed TV show title to its episode records."""
    # Dict return is serialized to JSON by Flask automatically.
    return get_all_tv_shows(file_structure)
# Routes
@app.route('/')
def index():
    """Health-check root route."""
    banner = "Server Running ..."
    return banner
# Main entry point
if __name__ == "__main__":
    # NOTE(review): debug=True enables the Werkzeug debugger/reloader and
    # should be off in production; port 7860 is the Hugging Face Spaces default.
    app.run(debug=True, host="0.0.0.0", port=7860)