import concurrent.futures
import random

# import spaces

from .model_config import model_config
from .model_worker import BaseModelWorker


class ModelManager:
    def __init__(self):
        self.models_config = model_config
        # Maps model name -> worker instance (a dict, not a list as the
        # original annotation claimed).
        self.models_worker: dict[str, BaseModelWorker] = {}
        self.build_model_workers()

    def build_model_workers(self):
        for cfg in self.models_config.values():
            worker = BaseModelWorker(
                cfg.model_name, cfg.i2s_model, cfg.online_model, cfg.model_path
            )
            self.models_worker[cfg.model_name] = worker

    def get_all_models(self):
        return list(self.models_config.keys())

    def get_t2s_models(self):
        return [cfg.model_name for cfg in self.models_config.values() if not cfg.i2s_model]

    def get_i2s_models(self):
        return [cfg.model_name for cfg in self.models_config.values() if cfg.i2s_model]

    def get_online_models(self):
        return [cfg.model_name for cfg in self.models_config.values() if cfg.online_model]

    def get_models(self, i2s_model: bool, online_model: bool):
        return [
            cfg.model_name
            for cfg in self.models_config.values()
            if cfg.i2s_model == i2s_model and cfg.online_model == online_model
        ]

    def check_online(self, name):
        # Placeholder: bails out early for offline models; the actual
        # liveness check is not implemented yet.
        worker = self.models_worker[name]
        if not worker.online_model:
            return

    # @spaces.GPU(duration=120)
    def inference(self, prompt, model_name):
        worker = self.models_worker[model_name]
        return worker.inference(prompt=prompt)

    def render(self, prompt, model_name):
        worker = self.models_worker[model_name]
        return worker.render(prompt=prompt)

    def inference_parallel(self, prompt, model_A, model_B):
        # Submit both jobs, then collect results in submission order so the
        # first return value always belongs to model_A. (The original
        # iterated with as_completed, which yields results in completion
        # order and could swap the two models' outputs.)
        model_names = [model_A, model_B]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.inference, prompt, model) for model in model_names]
            results = [future.result() for future in futures]
        return results[0], results[1]

    def inference_parallel_anony(self, prompt, model_A, model_B, i2s_model):
        # Anonymous battle mode: if no models were chosen, sample two
        # distinct online models of the requested type.
        if model_A == model_B == "":
            model_A, model_B = random.sample(
                self.get_models(i2s_model=i2s_model, online_model=True), 2
            )
        model_names = [model_A, model_B]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.inference, prompt, model) for model in model_names]
            results = [future.result() for future in futures]
        return results[0], results[1]

    def render_parallel(self, prompt, model_A, model_B):
        model_names = [model_A, model_B]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.render, prompt, model) for model in model_names]
            results = [future.result() for future in futures]
        return results[0], results[1]

    # def i2s_inference_parallel(self, image, model_A, model_B):
    #     model_names = [model_A, model_B]
    #     with concurrent.futures.ThreadPoolExecutor() as executor:
    #         futures = [executor.submit(self.inference, image, model) for model in model_names]
    #         results = [future.result() for future in futures]
    #     return results[0], results[1]
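

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). It
# assumes model_config defines at least two online text-to-shape models and
# that BaseModelWorker.inference accepts a text prompt; the prompt string
# below is a hypothetical example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = ModelManager()
    print("all models:", manager.get_all_models())
    print("online t2s models:", manager.get_models(i2s_model=False, online_model=True))

    # Empty names trigger anonymous sampling of two distinct online models.
    result_A, result_B = manager.inference_parallel_anony(
        "a small wooden chair", model_A="", model_B="", i2s_model=False
    )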