import os
import json
import re
import time
import logging
import urllib.request
from threading import Thread, Event

from indexer import indexer
from tvdb import fetch_and_cache_json
from api import InstancesAPI

CACHE_DIR = os.getenv("CACHE_DIR")

download_progress = {}


class LoadBalancer:
    def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        self.version = "0.0.2.6 V Beta"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}
        self.TV_STORE = {}
        self.file_structure = None

        # Ensure CACHE_DIR exists
        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        # Index the file structure
        indexer()

        # Load the file structure JSON
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)

        prefetch_thread = Thread(target=self.start_prefetching)
        prefetch_thread.daemon = True
        prefetch_thread.start()

    def register_instance(self, instance_url):
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def get_reports(self):
        reports = self.instances_api.fetch_reports()

        # Build the stores in temporary holders so readers never see a half-updated state
        temp_film_store = {}
        temp_tv_store = {}

        for instance_url in self.instances[:]:  # Copy the list to avoid modification during iteration
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Map each film title to a direct URL on the reporting instance
        for title, path in film_store.items():
            url = f"{instance_url}/api/film/{title.replace(' ', '%20')}"
            temp_film_store[title] = url

        # Map each episode to a direct URL on the reporting instance
        for title, seasons in tv_store.items():
            if title not in temp_tv_store:
                temp_tv_store[title] = {}
            for season, episodes in seasons.items():
                if season not in temp_tv_store[title]:
                    temp_tv_store[title][season] = {}
                for episode, path in episodes.items():
                    url = f"{instance_url}/api/tv/{title.replace(' ', '%20')}/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}"
                    temp_tv_store[title][season][episode] = url

        logging.info("Film and TV stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        logging.info("Stopping polling.")
        self.stop_event.set()
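    # Usage sketch (the index filename is an assumed example; the instance URL is
    # taken from the docstring below). start_polling blocks, so run it on a daemon
    # thread, mirroring how start_prefetching is launched in __init__:
    #
    #     lb = LoadBalancer(CACHE_DIR, "file_structure.json", token, repo)
    #     lb.register_instance("https://unicone-studio-instance1.hf.space")
    #     Thread(target=lb.start_polling, daemon=True).start()
    #     ...
    #     lb.stop_polling()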
Removing instance.") self.remove_instance(instance_url) self.FILM_STORE = temp_film_store self.TV_STORE = temp_tv_store def process_report(self, instance_url, report, temp_film_store, temp_tv_store): film_store = report.get('film_store', {}) tv_store = report.get('tv_store', {}) cache_size = report.get('cache_size') logging.info(f"Processing report from {instance_url}") # Update temporary film store for title, path in film_store.items(): url = f"{instance_url}/api/film/{title.replace(' ', '%20')}" temp_film_store[title] = url # Update temporary TV store for title, seasons in tv_store.items(): if title not in temp_tv_store: temp_tv_store[title] = {} for season, episodes in seasons.items(): if season not in temp_tv_store[title]: temp_tv_store[title][season] = {} for episode, path in episodes.items(): url = f"{instance_url}/api/tv/{title.replace(' ', '%20')}/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}" temp_tv_store[title][season][episode] = url logging.info("Film and TV Stores processed successfully.") self.update_instances_health(instance=instance_url, cache_size=cache_size) def start_polling(self): logging.info("Starting polling.") while not self.stop_event.is_set(): self.get_reports() time.sleep(self.polling_interval) logging.info("Polling stopped.") def stop_polling(self): logging.info("Stopping polling.") self.stop_event.set() ###################################################################### @staticmethod def get_system_proxies(): """ Retrieves the system's HTTP and HTTPS proxies. Returns: dict: A dictionary containing the proxies. """ try: proxies = urllib.request.getproxies() print("System proxies:", proxies) return { "http": proxies.get("http"), "https": proxies.get("http") } except Exception as e: print(f"Error getting system proxies: {e}") return {} @staticmethod def is_valid_url(url): # Simple URL validation (could be more complex if needed) regex = re.compile( r'^(?:http|ftp)s?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) return re.match(regex, url) is not None ################################################################# def update_instances_health(self, instance, cache_size): self.instances_health[instance] = {"used":cache_size["cache_size"], "total": "50 GB"} logging.info(f"Updated instance {instance} with cache size {cache_size}") def download_film_to_best_instance(self, title): """ Downloads a film to the first instance that has more free space on the self.instance_health list variable. The instance_health looks like this: { "https://unicone-studio-instance1.hf.space": { "total": "50 GB", "used": "3.33 GB" } } Args: title (str): The title of the film. 
""" best_instance = None max_free_space = -1 # Calculate free space for each instance for instance_url, space_info in self.instances_health.items(): total_space = self._convert_to_gb(space_info['total']) used_space = self._convert_to_gb(space_info['used']) free_space = total_space - used_space if free_space > max_free_space: max_free_space = free_space best_instance = instance_url if best_instance: result = self.instances_api.download_film(best_instance, title) film_id = result["film_id"] status = result["status"] progress_url = f'{best_instance}/api/progress/{film_id}' response = { "film_id":film_id, "status":status, "progress_url":progress_url } return response else: logging.error("No suitable instance found for downloading the film.") return {"error": "No suitable instance found for downloading the film."} def download_episode_to_best_instance(self, title, season, episode): """ Downloads a episode to the first instance that has more free space on the self.instance_health list variable. The instance_health looks like this: { "https://unicone-studio-instance1.hf.space": { "total": "50 GB", "used": "3.33 GB" } } Args: title (str): The title of the Tv show. season (str): The season of the Tv show. episode (str): The title of the Tv show. """ best_instance = None max_free_space = -1 # Calculate free space for each instance for instance_url, space_info in self.instances_health.items(): total_space = self._convert_to_gb(space_info['total']) used_space = self._convert_to_gb(space_info['used']) free_space = total_space - used_space if free_space > max_free_space: max_free_space = free_space best_instance = instance_url if best_instance: result = self.instances_api.download_episode(best_instance, title, season, episode) episode_id = result["episode_id"] status = result["status"] progress_url = f'{best_instance}/api/progress/{episode_id}' response = { "episode_id":episode_id, "status":status, "progress_url":progress_url } return response else: logging.error("No suitable instance found for downloading the film.") return {"error": "No suitable instance found for downloading the film."} def _convert_to_gb(self, space_str): """ Converts a space string like '50 GB' or '3.33 GB' to a float representing the number of GB. 
""" return float(space_str.split()[0]) ################################################################# def find_movie_path(self, title): """Find the path of the movie in the JSON data based on the title.""" for directory in self.file_structure: if directory['type'] == 'directory' and directory['path'] == 'films': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': for item in sub_directory['contents']: if item['type'] == 'file' and title.lower() in item['path'].lower(): return item['path'] return None def find_tv_path(self, title): """Find the path of the TV show in the JSON data based on the title.""" for directory in self.file_structure: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): return sub_directory['path'] return None def get_tv_structure(self, title): """Find the path of the TV show in the JSON data based on the title.""" for directory in self.file_structure: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): return sub_directory return None def get_film_id(self, title): """Generate a film ID based on the title.""" return title.replace(" ", "_").lower() def prefetch_metadata(self): """Prefetch metadata for all items in the file structure.""" for item in self.file_structure: if 'contents' in item: for sub_item in item['contents']: original_title = sub_item['path'].split('/')[-1] media_type = 'series' if item['path'].startswith('tv') else 'movie' title = original_title year = None # Extract year from the title if available match = re.search(r'\((\d{4})\)', original_title) if match: year_str = match.group(1) if year_str.isdigit() and len(year_str) == 4: title = original_title[:match.start()].strip() year = int(year_str) else: parts = original_title.rsplit(' ', 1) if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4: title = parts[0].strip() year = int(parts[-1]) fetch_and_cache_json(original_title, title, media_type, year) def bytes_to_human_readable(self, num, suffix="B"): for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: if abs(num) < 1024.0: return f"{num:3.1f} {unit}{suffix}" num /= 1024.0 return f"{num:.1f} Y{suffix}" def encode_episodeid(self, title, season, episode): return f"{title}_{season}_{episode}" def get_all_tv_shows(self): """Get all TV shows from the indexed cache structure JSON file.""" tv_shows = {} for directory in self.file_structure: if directory['type'] == 'directory' and directory['path'] == 'tv': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': show_title = sub_directory['path'].split('/')[-1] tv_shows[show_title] = [] for season_directory in sub_directory['contents']: if season_directory['type'] == 'directory': season = season_directory['path'].split('/')[-1] for episode in season_directory['contents']: if episode['type'] == 'file': tv_shows[show_title].append({ "season": season, "episode": episode['path'].split('/')[-1], "path": episode['path'] }) return tv_shows def get_all_films(self): """Get all films from the indexed cache structure JSON file.""" films = [] for directory in self.file_structure: if directory['type'] == 'directory' and directory['path'] == 'films': for sub_directory in directory['contents']: if sub_directory['type'] == 'directory': 
    def get_all_films(self):
        """Get all films from the indexed cache structure JSON file."""
        films = []
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        films.append(sub_directory['path'])
        return films

    def start_prefetching(self):
        """Prefetch metadata; runs as the target of the daemon thread started in __init__."""
        self.prefetch_metadata()
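
if __name__ == "__main__":
    # Minimal usage sketch. The index filename, token/repo env vars and the
    # instance URL are assumptions for illustration, not values defined here.
    logging.basicConfig(level=logging.INFO)
    lb = LoadBalancer(
        cache_dir=CACHE_DIR or "cache",
        index_file="file_structure.json",  # assumed index filename
        token=os.getenv("TOKEN"),
        repo=os.getenv("REPO"),
    )
    lb.register_instance("https://unicone-studio-instance1.hf.space")
    try:
        lb.start_polling()  # blocks; polls every lb.polling_interval seconds
    except KeyboardInterrupt:
        lb.stop_polling()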