# NOTE(review): the three lines below this file's original head ("Spaces:",
# "Build error", "Build error") were non-code paste residue from a build/status
# page; converted to this comment so the module parses.
import json
import logging
import os
import re
import time
from threading import Event, Thread

from api import InstancesAPI
from indexer import indexer
from tvdb import fetch_and_cache_json
from utils import convert_to_gb

# Directory for cached media files; None when the CACHE_DIR env var is unset.
CACHE_DIR = os.getenv("CACHE_DIR")

# Module-level tracker for in-progress downloads, shared across requests.
download_progress = {}
class LoadBalancer:
    """Coordinates a pool of media-serving instances.

    Responsibilities:
      * keep a registry of instance URLs and their reported cache usage,
      * poll every instance for its film/TV inventory and aggregate the
        results into ``FILM_STORE`` / ``TV_STORE`` URL maps,
      * watch the index file and re-index/reload when it changes,
      * route download requests to the instance with the most free space,
      * expose lookup helpers over the indexed cache file structure.
    """

    def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Initialise state, build the index, and start background threads.

        Args:
            cache_dir (str): Directory used for cached media files (created
                if missing).
            index_file (str): Path to the JSON file describing the cache
                layout produced by ``indexer()``.
            token (str): Access token, stored for later use.
            repo (str): Repository identifier, stored for later use.
            polling_interval (int): Seconds between instance report polls.
            max_retries (int): Retry budget, stored for later use.
            initial_delay (int): Initial backoff delay, stored for later use.
        """
        self.version = "0.0.2.10 V Beta"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        # InstancesAPI holds a reference to self.instances, so later
        # register/remove calls are visible to it without re-wiring.
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}
        self.TV_STORE = {}
        self.file_structure = None
        self.index_file_last_modified = None

        # Ensure CACHE_DIR exists; exist_ok avoids the check-then-create race
        # of the previous os.path.exists() guard.
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        # Index the file structure initially, then load the resulting JSON.
        indexer()
        self.load_file_structure()

        # Daemon threads so neither polling nor index-watching blocks
        # interpreter shutdown.
        polling_thread = Thread(target=self.start_polling)
        polling_thread.daemon = True
        polling_thread.start()

        file_checking_thread = Thread(target=self.check_file_updates)
        file_checking_thread.daemon = True
        file_checking_thread.start()

    def load_file_structure(self):
        """Load the index JSON into ``self.file_structure``.

        Raises:
            FileNotFoundError: If ``self.INDEX_FILE`` does not exist.
        """
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)
        logging.info("File structure loaded successfully.")

    def check_file_updates(self):
        """Background loop: re-index and reload whenever the index file's
        mtime changes (checked every 2 minutes until ``stop_event`` is set).

        The first pass always triggers (last-modified starts as None), which
        doubles as the initial prefetch kick-off.
        """
        while not self.stop_event.is_set():
            if self.index_file_last_modified != os.path.getmtime(self.INDEX_FILE):
                logging.info(f"{self.INDEX_FILE} has been updated. Re-indexing...")
                indexer()  # Re-run the indexer
                self.load_file_structure()  # Reload the file structure
                self.index_file_last_modified = os.path.getmtime(self.INDEX_FILE)

                # Restart the metadata prefetching thread; join() waits for a
                # still-running pass to finish before starting a fresh one.
                if hasattr(self, 'prefetch_thread') and self.prefetch_thread.is_alive():
                    self.prefetch_thread.join()
                self.prefetch_thread = Thread(target=self.start_prefetching)
                self.prefetch_thread.daemon = True
                self.prefetch_thread.start()
            time.sleep(120)  # Check every 2 minutes

    def register_instance(self, instance_url):
        """Add ``instance_url`` to the pool; no-op if already registered."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Remove ``instance_url`` and its health record; no-op if unknown."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def get_reports(self):
        """Poll all instances and rebuild the film/TV stores from scratch.

        Instances that fail to report are removed from the pool. The stores
        are rebuilt into temporaries and swapped in atomically at the end so
        readers never see a half-built store.
        """
        reports = self.instances_api.fetch_reports()

        temp_film_store = {}
        temp_tv_store = {}

        # Iterate over a copy: process_report/remove_instance may mutate
        # self.instances while we loop.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        """Merge one instance's report into the temporary stores.

        Args:
            instance_url (str): Base URL of the reporting instance.
            report (dict): Expected keys ``film_store`` (title -> path),
                ``tv_store`` (title -> season -> episode -> path) and
                ``cache_size``.
            temp_film_store (dict): Accumulator mapping title -> stream URL.
            temp_tv_store (dict): Accumulator mapping title/season/episode
                -> stream URL.
        """
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')
        logging.info(f"Processing report from {instance_url}")

        # Film entries: the stored path is ignored; we only publish the URL.
        for title, path in film_store.items():
            url = f"{instance_url}/api/film/{title.replace(' ', '%20')}"
            temp_film_store[title] = url

        # TV entries: build nested title -> season -> episode URL maps.
        for title, seasons in tv_store.items():
            if title not in temp_tv_store:
                temp_tv_store[title] = {}
            for season, episodes in seasons.items():
                if season not in temp_tv_store[title]:
                    temp_tv_store[title][season] = {}
                for episode, path in episodes.items():
                    url = f"{instance_url}/api/tv/{title.replace(' ', '%20')}/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}"
                    temp_tv_store[title][season][episode] = url

        logging.info("Film and TV Stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Blocking loop: fetch reports every ``polling_interval`` seconds
        until ``stop_event`` is set. Intended to run on a daemon thread."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal both background loops (polling and file-watch) to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def start_prefetching(self):
        """Start the metadata prefetching in a separate thread."""
        self.prefetch_metadata()

    #################################################################

    def update_instances_health(self, instance, cache_size):
        """Record cache usage for ``instance``.

        NOTE(review): ``cache_size`` is expected to be a mapping containing a
        ``"cache_size"`` key (e.g. ``{"cache_size": "3.3 GB"}``); the total is
        hard-coded to "50 GB" — confirm against the instance report format.
        """
        self.instances_health[instance] = {"used": cache_size["cache_size"],
                                           "total": "50 GB"}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def download_film_to_best_instance(self, title):
        """
        Downloads a film to the registered instance with the most free space.

        ``instances_health`` entries look like:
            {
                "https://unicone-studio-instance1.hf.space": {
                    "total": "50 GB",
                    "used": "3.33 GB"
                }
            }

        Args:
            title (str): The title of the film.

        Returns:
            dict: ``{"film_id", "status", "progress_url"}`` on success, or
            ``{"error": ...}`` when no instance is available.
        """
        best_instance = None
        max_free_space = -1

        # Pick the instance with the largest (total - used) in GB.
        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info['total'])
            used_space = convert_to_gb(space_info['used'])
            free_space = total_space - used_space
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url

        if best_instance:
            result = self.instances_api.download_film(best_instance, title)
            film_id = result["film_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/progress/{film_id}'
            return {
                "film_id": film_id,
                "status": status,
                "progress_url": progress_url
            }
        else:
            logging.error("No suitable instance found for downloading the film.")
            return {"error": "No suitable instance found for downloading the film."}

    def download_episode_to_best_instance(self, title, season, episode):
        """
        Downloads an episode to the registered instance with the most free space.

        ``instances_health`` entries look like:
            {
                "https://unicone-studio-instance1.hf.space": {
                    "total": "50 GB",
                    "used": "3.33 GB"
                }
            }

        Args:
            title (str): The title of the TV show.
            season (str): The season of the TV show.
            episode (str): The episode of the TV show.

        Returns:
            dict: ``{"episode_id", "status", "progress_url"}`` on success, or
            ``{"error": ...}`` when no instance is available.
        """
        best_instance = None
        max_free_space = -1

        # Pick the instance with the largest (total - used) in GB.
        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info['total'])
            used_space = convert_to_gb(space_info['used'])
            free_space = total_space - used_space
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url

        if best_instance:
            result = self.instances_api.download_episode(best_instance, title, season, episode)
            episode_id = result["episode_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/progress/{episode_id}'
            return {
                "episode_id": episode_id,
                "status": status,
                "progress_url": progress_url
            }
        else:
            # Fixed copy-paste bug: the original message said "film" here.
            logging.error("No suitable instance found for downloading the episode.")
            return {"error": "No suitable instance found for downloading the episode."}

    #################################################################

    def find_movie_path(self, title):
        """Find the path of the movie in the JSON data based on the title.

        Matching is a case-insensitive substring test against file paths
        under the top-level ``films`` directory. Returns None if not found.
        """
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        for item in sub_directory['contents']:
                            if item['type'] == 'file' and title.lower() in item['path'].lower():
                                return item['path']
        return None

    def find_tv_path(self, title):
        """Find the directory path of a TV show by case-insensitive substring
        match under the top-level ``tv`` directory. Returns None if not found."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory['path']
        return None

    def get_tv_structure(self, title):
        """Like :meth:`find_tv_path`, but returns the show's whole directory
        node (with its ``contents``) instead of just the path. None if absent."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title (lowercase, spaces -> '_')."""
        return title.replace(" ", "_").lower()

    def prefetch_metadata(self):
        """Prefetch metadata for all items in the file structure.

        For each top-level entry's children, derive (title, year) from the
        directory name and hand off to ``fetch_and_cache_json``. Entries under
        a path starting with 'tv' are treated as series, everything else as
        movies.
        """
        for item in self.file_structure:
            if 'contents' in item:
                for sub_item in item['contents']:
                    original_title = sub_item['path'].split('/')[-1]
                    media_type = 'series' if item['path'].startswith('tv') else 'movie'
                    title = original_title
                    year = None

                    # Prefer a parenthesised year, e.g. "Title (2010)".
                    match = re.search(r'\((\d{4})\)', original_title)
                    if match:
                        year_str = match.group(1)
                        if year_str.isdigit() and len(year_str) == 4:
                            title = original_title[:match.start()].strip()
                            year = int(year_str)
                    else:
                        # Fall back to a bare trailing year, e.g. "Title 2010".
                        parts = original_title.rsplit(' ', 1)
                        if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
                            title = parts[0].strip()
                            year = int(parts[-1])

                    fetch_and_cache_json(original_title, title, media_type, year)

    def get_all_tv_shows(self):
        """Get all TV shows from the indexed cache structure JSON file.

        Returns:
            dict: show title -> list of ``{"season", "episode", "path"}``.
        """
        tv_shows = {}
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        show_title = sub_directory['path'].split('/')[-1]
                        tv_shows[show_title] = []
                        for season_directory in sub_directory['contents']:
                            if season_directory['type'] == 'directory':
                                season = season_directory['path'].split('/')[-1]
                                for episode in season_directory['contents']:
                                    if episode['type'] == 'file':
                                        tv_shows[show_title].append({
                                            "season": season,
                                            "episode": episode['path'].split('/')[-1],
                                            "path": episode['path']
                                        })
        return tv_shows

    def get_all_films(self):
        """Get all film directory paths from the indexed cache structure."""
        films = []
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        films.append(sub_directory['path'])
        return films