import gradio as gr
import io
import base64
import json
import threading

import requests
import torch
import pandas as pd
from PIL import Image
from flask import Flask, render_template, render_template_string, request, send_file, jsonify
from diffusers import (
    AutoencoderKL,
    DiffusionPipeline,
    DDPMScheduler,
    DDIMScheduler,
    PNDMScheduler,
    LMSDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    EulerDiscreteScheduler,
    DPMSolverMultistepScheduler,
)

from config import *

# get
# - the initial configuration,
# - a list of available devices from the config file
# - a list of available models from the config file
# - a list of available schedulers from the config file
# - a dict that contains the code snippets for reproduction
initial_config, devices, models, schedulers, code = get_inital_config()

device = initial_config["device"]
model = initial_config["model"]
scheduler = initial_config["scheduler"]
variant = initial_config["variant"]
allow_tensorfloat32 = initial_config["allow_tensorfloat32"]
use_safetensors = initial_config["use_safetensors"]
data_type = initial_config["data_type"]
safety_checker = initial_config["safety_checker"]
requires_safety_checker = initial_config["requires_safety_checker"]
manual_seed = initial_config["manual_seed"]
inference_steps = initial_config["inference_steps"]
guidance_scale = initial_config["guidance_scale"]
prompt = initial_config["prompt"]
negative_prompt = initial_config["negative_prompt"]

config_history = []
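# For reference, a hypothetical sketch of the contract this module assumes
# config.py to provide via the wildcard import above. The names themselves are
# real (they are used throughout this file); the example values and bodies are
# illustrative assumptions inferred from usage, not the actual implementation.
# Kept as comments so it cannot shadow the real imported names:
#
#     # keys addressing the individual snippets in the `code` dict
#     code_pos_device = 'device'
#     code_pos_data_type = 'data_type'
#     code_pos_tf32 = 'tf32'
#     code_pos_variant = 'variant'
#     code_pos_init_pipeline = 'init_pipeline'
#     code_pos_safety_checker = 'safety_checker'
#     code_pos_requires_safety_checker = 'requires_safety_checker'
#     code_pos_scheduler = 'scheduler'
#
#     # per-model defaults, keyed by Hugging Face model id (example entry)
#     model_configs = {
#         "stabilityai/stable-diffusion-xl-base-1.0": {
#             "use_safetensors": True,
#             "scheduler": "EulerDiscreteScheduler",
#         },
#     }
#
#     # human-readable description per scheduler name
#     scheduler_configs = {"DDIMScheduler": "...", }
#
#     def get_sorted_code():
#         # joins the snippets in `code` into one reproduction script
#         return '\n'.join(code[key] for key in sorted(code))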
def device_change(device):
    code[code_pos_device] = f'''device = "{device}"'''
    return get_sorted_code()

def models_change(model, scheduler):
    use_safetensors = False

    # no model selected (because this is the UI init run)
    if not isinstance(model, list) and model is not None:
        use_safetensors = str(model_configs[model]['use_safetensors'])

        # if no scheduler is selected, choose the default one for this model
        if scheduler is None:
            scheduler = model_configs[model]['scheduler']

    code[code_pos_init_pipeline] = f'''pipeline = DiffusionPipeline.from_pretrained(
    "{model}",
    use_safetensors=use_safetensors,
    torch_dtype=data_type,
    variant=variant).to(device)'''

    safety_checker_change(safety_checker)
    requires_safety_checker_change(requires_safety_checker)

    return get_sorted_code(), use_safetensors, scheduler

def data_type_change(selected_data_type):
    get_data_type(selected_data_type)
    return get_sorted_code()

def get_data_type(selected_data_type):
    if selected_data_type == "bfloat16":
        code[code_pos_data_type] = 'data_type = torch.bfloat16'
        data_type = torch.bfloat16  # BFloat16 is not supported on MPS as of 01/2024
    else:
        code[code_pos_data_type] = 'data_type = torch.float16'
        data_type = torch.float16  # half-precision weights save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16
    return data_type

def tensorfloat32_change(allow_tensorfloat32):
    get_tensorfloat32(allow_tensorfloat32)
    return get_sorted_code()

def get_tensorfloat32(allow_tensorfloat32):
    code[code_pos_tf32] = f'torch.backends.cuda.matmul.allow_tf32 = {allow_tensorfloat32}'
    return str(allow_tensorfloat32).lower() == 'true'

def variant_change(variant):
    if str(variant) == 'None':
        code[code_pos_variant] = f'variant = {variant}'
    else:
        code[code_pos_variant] = f'variant = "{variant}"'
    return get_sorted_code()

def safety_checker_change(safety_checker):
    if not safety_checker or str(safety_checker).lower() == 'false':
        code[code_pos_safety_checker] = 'pipeline.safety_checker = None'
    else:
        code[code_pos_safety_checker] = ''
    return get_sorted_code()

def requires_safety_checker_change(requires_safety_checker):
    code[code_pos_requires_safety_checker] = f'pipeline.requires_safety_checker = {requires_safety_checker}'
    return get_sorted_code()

def schedulers_change(scheduler):
    if not isinstance(scheduler, list) and scheduler is not None:
        code[code_pos_scheduler] = f'pipeline.scheduler = {scheduler}.from_config(pipeline.scheduler.config)'
        return get_sorted_code(), scheduler_configs[scheduler]
    else:
        return get_sorted_code(), ''

def get_scheduler(scheduler, config):
    if scheduler == "DDPMScheduler":
        return DDPMScheduler.from_config(config)
    elif scheduler == "DDIMScheduler":
        return DDIMScheduler.from_config(config)
    elif scheduler == "PNDMScheduler":
        return PNDMScheduler.from_config(config)
    elif scheduler == "LMSDiscreteScheduler":
        return LMSDiscreteScheduler.from_config(config)
    elif scheduler == "EulerAncestralDiscreteScheduler":
        return EulerAncestralDiscreteScheduler.from_config(config)
    elif scheduler == "EulerDiscreteScheduler":
        return EulerDiscreteScheduler.from_config(config)
    else:
        # default, also covers "DPMSolverMultistepScheduler"
        return DPMSolverMultistepScheduler.from_config(config)

# pipeline
def run_inference(model,
                  device,
                  use_safetensors,
                  data_type,
                  variant,
                  safety_checker,
                  requires_safety_checker,
                  scheduler,
                  prompt,
                  negative_prompt,
                  inference_steps,
                  manual_seed,
                  guidance_scale,
                  progress=gr.Progress(track_tqdm=True)):

    if model is not None and scheduler is not None:

        progress((1, 3), desc="Preparing pipeline initialization...")

        # use TensorFloat-32: faster, but slightly less accurate computations,
        # see https://huggingface.co/docs/diffusers/main/en/optimization/fp16
        # (reads the module-level `allow_tensorfloat32` setting)
        torch.backends.cuda.matmul.allow_tf32 = get_tensorfloat32(allow_tensorfloat32)

        bool_use_safetensors = str(use_safetensors).lower() == 'true'

        progress((2, 3), desc="Initializing pipeline...")

        pipeline = DiffusionPipeline.from_pretrained(
            model,
            use_safetensors=bool_use_safetensors,
            torch_dtype=get_data_type(data_type),
            variant=variant).to(device)

        if safety_checker is None or str(safety_checker).lower() == 'false':
            pipeline.safety_checker = None
        pipeline.requires_safety_checker = bool(requires_safety_checker)

        pipeline.scheduler = get_scheduler(scheduler, pipeline.scheduler.config)

        # an unseeded generator for random results, otherwise a generator
        # seeded deterministically with the user-provided value
        if manual_seed is None or manual_seed == '' or manual_seed < 0:
            generator = torch.Generator(device)
        else:
            generator = torch.manual_seed(int(manual_seed))

        progress((3, 3), desc="Creating the result...")

        image = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            generator=generator,
            num_inference_steps=int(inference_steps),
            guidance_scale=float(guidance_scale)).images[0]

        return "Done.", image
    else:
        return "Please select a model AND a scheduler.", None

def dict_list_to_markdown_table(config_history):
    if not config_history:
        return ""

    headers = list(config_history[0].keys())
    markdown_table = "| share | " + " | ".join(headers) + " |\n"
    markdown_table += "| --- | " + " | ".join(["---"] * len(headers)) + " |\n"

    for index, config in enumerate(config_history):
        encoded_config = base64.b64encode(str(config).encode()).decode()
        share_link = f'📎'
        markdown_table += f"| {share_link} | " + " | ".join(str(config.get(key, "")) for key in headers) + " |\n"

    markdown_table = '