import concurrent.futures import random import gradio as gr import requests import io, base64, json import spaces import torch from PIL import Image from openai import OpenAI from .models import IMAGE_GENERATION_MODELS, IMAGE_EDITION_MODELS, load_pipeline from .fetch_museum_results import draw_from_imagen_museum, draw2_from_imagen_museum from serve.upload import get_random_mscoco_prompt class ModelManager: def __init__(self): self.model_ig_list = IMAGE_GENERATION_MODELS self.model_ie_list = IMAGE_EDITION_MODELS self.loaded_models = {} def load_model_pipe(self, model_name): if not model_name in self.loaded_models: pipe = load_pipeline(model_name) self.loaded_models[model_name] = pipe else: pipe = self.loaded_models[model_name] return pipe @spaces.GPU(duration=120) def generate_image_ig(self, prompt, model_name): pipe = self.load_model_pipe(model_name) if 'Stable-cascade' not in model_name: result = pipe(prompt=prompt).images[0] else: prior, decoder = pipe prior.enable_model_cpu_offload() prior_output = prior( prompt=prompt, height=512, width=512, negative_prompt='', guidance_scale=4.0, num_images_per_prompt=1, num_inference_steps=20 ) decoder.enable_model_cpu_offload() result = decoder( image_embeddings=prior_output.image_embeddings.to(torch.float16), prompt=prompt, negative_prompt='', guidance_scale=0.0, output_type="pil", num_inference_steps=10 ).images[0] return result def generate_image_ig_api(self, prompt, model_name): pipe = self.load_model_pipe(model_name) result = pipe(prompt=prompt) return result def generate_image_ig_museum(self, model_name): model_name = model_name.split('_')[1] result_list = draw_from_imagen_museum("t2i", model_name) image_link = result_list[0] prompt = result_list[1] return image_link, prompt def generate_image_ig_parallel_anony(self, prompt, model_A, model_B, model_C, model_D): if model_A == "" and model_B == "" and model_C == "" and model_D == "": # not_run = [11, 12, 13, 14, 15, 16, 17, 18, 19] # filtered_models = [model for i, model in enumerate(self.model_ig_list) if i not in not_run] # model_names = random.sample([model for model in filtered_models], 4) # model_names = random.sample([model for model in self.model_ig_list], 4) from .matchmaker import matchmaker model_ids = matchmaker(num_players=len(self.model_ig_list)) print(model_ids) model_names = [self.model_ig_list[i] for i in model_ids] print(model_names) else: model_names = [model_A, model_B, model_C, model_D] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(self.generate_image_ig, prompt, model) if model.startswith("huggingface") else executor.submit(self.generate_image_ig_api, prompt, model) for model in model_names] results = [future.result() for future in futures] return results[0], results[1], results[2], results[3], \ model_names[0], model_names[1], model_names[2], model_names[3] def generate_image_ig_museum_parallel_anony(self, model_A, model_B, model_C, model_D): if model_A == "" and model_B == "" and model_C == "" and model_D == "": # model_names = random.sample([model for model in self.model_ig_list], 4) from .matchmaker import matchmaker model_ids = matchmaker(num_players=len(self.model_ig_list)) print(model_ids) model_names = [self.model_ig_list[i] for i in model_ids] print(model_names) else: model_names = [model_A, model_B, model_C, model_D] prompt = get_random_mscoco_prompt() print(prompt) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(self.generate_image_ig, prompt, model) if model.startswith("huggingface") else executor.submit(self.generate_image_ig_api, prompt, model) for model in model_names] results = [future.result() for future in futures] return results[0], results[1], results[2], results[3], \ model_names[0], model_names[1], model_names[2], model_names[3], prompt def generate_image_ig_parallel(self, prompt, model_A, model_B): model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(self.generate_image_ig, prompt, model) if model.startswith("imagenhub") else executor.submit(self.generate_image_ig_api, prompt, model) for model in model_names] results = [future.result() for future in futures] return results[0], results[1] def generate_image_ig_museum_parallel(self, model_A, model_B): with concurrent.futures.ThreadPoolExecutor() as executor: model_1 = model_A.split('_')[1] model_2 = model_B.split('_')[1] result_list = draw2_from_imagen_museum("t2i", model_1, model_2) image_links = result_list[0] prompt_list = result_list[1] return image_links[0], image_links[1], prompt_list[0] @spaces.GPU(duration=200) def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name): pipe = self.load_model_pipe(model_name) result = pipe(src_image = source_image, src_prompt = textbox_source, target_prompt = textbox_target, instruct_prompt = textbox_instruct) return result def generate_image_ie_museum(self, model_name): model_name = model_name.split('_')[1] result_list = draw_from_imagen_museum("tie", model_name) image_links = result_list[0] prompt_list = result_list[1] # image_links = [src, model] # prompt_list = [source_caption, target_caption, instruction] return image_links[0], image_links[1], prompt_list[0], prompt_list[1], prompt_list[2] def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B): model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image, model) for model in model_names] results = [future.result() for future in futures] return results[0], results[1] def generate_image_ie_museum_parallel(self, model_A, model_B): model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: model_1 = model_names[0].split('_')[1] model_2 = model_names[1].split('_')[1] result_list = draw2_from_imagen_museum("tie", model_1, model_2) image_links = result_list[0] prompt_list = result_list[1] # image_links = [src, model_A, model_B] # prompt_list = [source_caption, target_caption, instruction] return image_links[0], image_links[1], image_links[2], prompt_list[0], prompt_list[1], prompt_list[2] def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B): if model_A == "" and model_B == "": model_names = random.sample([model for model in self.model_ie_list], 2) else: model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image, model) for model in model_names] results = [future.result() for future in futures] return results[0], results[1], model_names[0], model_names[1] def generate_image_ie_museum_parallel_anony(self, model_A, model_B): if model_A == "" and model_B == "": model_names = random.sample([model for model in self.model_ie_list], 2) else: model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: model_1 = model_names[0].split('_')[1] model_2 = model_names[1].split('_')[1] result_list = draw2_from_imagen_museum("tie", model_1, model_2) image_links = result_list[0] prompt_list = result_list[1] # image_links = [src, model_A, model_B] # prompt_list = [source_caption, target_caption, instruction] return image_links[0], image_links[1], image_links[2], prompt_list[0], prompt_list[1], prompt_list[2], model_names[0], model_names[1] raise NotImplementedError