import gradio as gr
import torch
import json
from diffusers import DiffusionPipeline
from diffusers import (
    DDPMScheduler,
    DDIMScheduler,
    PNDMScheduler,
    LMSDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    EulerDiscreteScheduler,
    DPMSolverMultistepScheduler,
)


def load_app_config():
    """Load appConfig.json into the global appConfig dict."""
    global appConfig
    appConfig = {}  # fall back to an empty config so later .get() calls don't fail
    try:
        with open('appConfig.json', 'r') as f:
            appConfig = json.load(f)
    except FileNotFoundError:
        print("App config file not found.")
    except json.JSONDecodeError:
        print("Error decoding JSON in app config file.")
    except Exception as e:
        print("An error occurred while loading app config:", str(e))


load_app_config()
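# For reference, a minimal sketch of the appConfig.json shape this file expects.
# The concrete model id below is an assumption for illustration; what matters is
# the structure: each model entry provides "use_safetensors" and a default
# "scheduler", and each scheduler entry maps to the description string shown in
# the UI.
#
# {
#     "models": {
#         "stabilityai/stable-diffusion-xl-base-1.0": {
#             "use_safetensors": true,
#             "scheduler": "EulerDiscreteScheduler"
#         }
#     },
#     "schedulers": {
#         "EulerDiscreteScheduler": "Fast, good quality in 20-30 steps.",
#         "DPMSolverMultistepScheduler": "Good quality in as few as 20 steps."
#     }
# }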
# code output order: keys sort lexicographically, so the numeric prefixes
# define the line order of the generated snippet
code = {}
code_pos_device = '001_code'
code_pos_data_type = '002_data_type'
code_pos_tf32 = '003_tf32'
code_pos_variant = '004_variant'
code_pos_init_pipeline = '050_init_pipe'
code_pos_requires_safety_checker = '054_requires_safety_checker'
code_pos_safety_checker = '055_safety_checker'
code_pos_scheduler = '060_scheduler'

# model config
model_configs = appConfig.get("models", {})
models = list(model_configs.keys())
scheduler_configs = appConfig.get("schedulers", {})
schedulers = list(scheduler_configs.keys())

device = None
variant = None
allow_tensorfloat32 = False
use_safetensors = False
data_type = 'float16'
safety_checker = False
requires_safety_checker = False
manual_seed = 42
inference_steps = 10
guidance_scale = 0.5
prompt = 'A white rabbit'
negative_prompt = 'lowres, cropped, worst quality, low quality, chat bubble, chat bubbles, ugly'

# init device parameters
if torch.cuda.is_available():
    device = "cuda"
    data_type = 'bfloat16'
    allow_tensorfloat32 = True
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"


def get_sorted_code():
    return '\r\n'.join(value for _, value in sorted(code.items()))


# change methods
def device_change(device):
    code[code_pos_device] = f'device = "{device}"'
    return get_sorted_code()


def models_change(model, scheduler):
    use_safetensors = False
    # model is a list only during the UI init run, i.e. no model selected yet
    if not isinstance(model, list):
        use_safetensors = str(model_configs[model]['use_safetensors'])
        # if no scheduler is selected, choose the default one for this model
        if scheduler is None:
            scheduler = model_configs[model]['scheduler']
    code[code_pos_init_pipeline] = f'''pipeline = DiffusionPipeline.from_pretrained(
    "{model}",
    use_safetensors={use_safetensors},
    torch_dtype=data_type,
    variant=variant).to(device)'''
    safety_checker_change(safety_checker)
    requires_safety_checker_change(requires_safety_checker)
    return get_sorted_code(), use_safetensors, scheduler


def data_type_change(selected_data_type):
    get_data_type(selected_data_type)
    return get_sorted_code()


def get_data_type(selected_data_type):
    if selected_data_type == "bfloat16":
        code[code_pos_data_type] = 'data_type = torch.bfloat16'
        data_type = torch.bfloat16  # BFloat16 is not supported on MPS as of 01/2024
    else:
        code[code_pos_data_type] = 'data_type = torch.float16'
        data_type = torch.float16  # half-precision weights save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16
    return data_type


def tensorfloat32_change(allow_tensorfloat32):
    get_tensorfloat32(allow_tensorfloat32)
    return get_sorted_code()


def get_tensorfloat32(allow_tensorfloat32):
    code[code_pos_tf32] = f'torch.backends.cuda.matmul.allow_tf32 = {allow_tensorfloat32}'
    return str(allow_tensorfloat32).lower() == 'true'


def variant_change(variant):
    if str(variant) == 'None':
        code[code_pos_variant] = f'variant = {variant}'
    else:
        code[code_pos_variant] = f'variant = "{variant}"'
    return get_sorted_code()


def safety_checker_change(safety_checker):
    if not safety_checker or str(safety_checker).lower() == 'false':
        code[code_pos_safety_checker] = 'pipeline.safety_checker = None'
    else:
        code[code_pos_safety_checker] = ''
    return get_sorted_code()


def requires_safety_checker_change(requires_safety_checker):
    code[code_pos_requires_safety_checker] = f'pipeline.requires_safety_checker = {requires_safety_checker}'
    return get_sorted_code()


def schedulers_change(scheduler):
    # scheduler is a list only during the UI init run, i.e. nothing selected yet
    if not isinstance(scheduler, list):
        code[code_pos_scheduler] = f'pipeline.scheduler = {scheduler}.from_config(pipeline.scheduler.config)'
        return get_sorted_code(), scheduler_configs[scheduler]
    else:
        return get_sorted_code(), ''


def get_scheduler(scheduler, config):
    scheduler_classes = {
        "DDPMScheduler": DDPMScheduler,
        "DDIMScheduler": DDIMScheduler,
        "PNDMScheduler": PNDMScheduler,
        "LMSDiscreteScheduler": LMSDiscreteScheduler,
        "EulerAncestralDiscreteScheduler": EulerAncestralDiscreteScheduler,
        "EulerDiscreteScheduler": EulerDiscreteScheduler,
        "DPMSolverMultistepScheduler": DPMSolverMultistepScheduler,
    }
    # fall back to DPMSolverMultistepScheduler for unknown names
    return scheduler_classes.get(scheduler, DPMSolverMultistepScheduler).from_config(config)


# pipeline
def start_pipeline(model,
                   device,
                   use_safetensors,
                   data_type,
                   variant,
                   safety_checker,
                   requires_safety_checker,
                   scheduler,
                   prompt,
                   negative_prompt,
                   inference_steps,
                   manual_seed,
                   guidance_scale,
                   progress=gr.Progress(track_tqdm=True)):
    if model is not None and scheduler is not None:
        progress((1, 3), desc="Preparing pipeline initialization...")

        # TensorFloat-32 is faster, but slightly less accurate, see
        # https://huggingface.co/docs/diffusers/main/en/optimization/fp16
        # NOTE: allow_tensorfloat32 is read from the module-level global here;
        # the TF32 radio is not among this handler's inputs.
        torch.backends.cuda.matmul.allow_tf32 = get_tensorfloat32(allow_tensorfloat32)

        bool_use_safetensors = str(use_safetensors).lower() == 'true'

        progress((2, 3), desc="Initializing pipeline...")
        pipeline = DiffusionPipeline.from_pretrained(
            model,
            use_safetensors=bool_use_safetensors,
            torch_dtype=get_data_type(data_type),
            variant=variant).to(device)

        if safety_checker is None or str(safety_checker).lower() == 'false':
            pipeline.safety_checker = None
        pipeline.requires_safety_checker = bool(requires_safety_checker)

        pipeline.scheduler = get_scheduler(scheduler, pipeline.scheduler.config)
        generator = torch.Generator(device)

        progress((3, 3), desc="Creating the result...")
        image = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            generator=generator.manual_seed(int(manual_seed)),
            num_inference_steps=int(inference_steps),
            guidance_scale=float(guidance_scale)).images[0]

        return "Done.", image
    else:
        return "Please select a model AND a scheduler.", None
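# For illustration: with the defaults set below, get_sorted_code() initially
# renders a snippet along these lines (the device-dependent values here assume
# a CUDA machine):
#
#   device = "cuda"
#   data_type = torch.bfloat16
#   torch.backends.cuda.matmul.allow_tf32 = True
#   variant = None
#   sys.exit("No model selected!")
#   pipeline.requires_safety_checker = False
#   pipeline.safety_checker = None
#   sys.exit("No scheduler selected!")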
# initial content of the generated-code box
code[code_pos_device] = f'device = "{device}"'
code[code_pos_variant] = f'variant = {variant}'
code[code_pos_tf32] = f'torch.backends.cuda.matmul.allow_tf32 = {allow_tensorfloat32}'
code[code_pos_data_type] = 'data_type = torch.bfloat16'
code[code_pos_init_pipeline] = 'sys.exit("No model selected!")'
code[code_pos_safety_checker] = 'pipeline.safety_checker = None'
code[code_pos_requires_safety_checker] = f'pipeline.requires_safety_checker = {requires_safety_checker}'
code[code_pos_scheduler] = 'sys.exit("No scheduler selected!")'

# interface
with gr.Blocks() as demo:
    gr.Markdown("## Image Generation")

    gr.Markdown("### Device specific settings")
    with gr.Row():
        rg_device = gr.Radio(label="Device:", value=device, choices=["cpu", "mps", "cuda"])
        rg_data_type = gr.Radio(label="Data Type:", value=data_type, choices=["bfloat16", "float16"],
                                info="`bfloat16` is not supported on MPS devices right now; half-precision weights save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")
        rg_allow_tensorfloat32 = gr.Radio(label="Allow TensorFloat32:", value=allow_tensorfloat32, choices=[True, False],
                                          info="Not supported on MPS devices right now; TensorFloat-32 is faster, but results in slightly less accurate computations, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")
        rg_variant = gr.Radio(label="Variant:", value=variant, choices=["fp16", None],
                              info="Half-precision weights save GPU memory, but not all models support them, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")

    gr.Markdown("### Model specific settings")
    with gr.Row():
        dd_models = gr.Dropdown(choices=models, label="Model")
    with gr.Row():
        with gr.Column(scale=1):
            rg_use_safetensors = gr.Radio(label="Use safe tensors:", choices=["True", "False"], interactive=False)
        with gr.Column(scale=1):
            rg_safety_checker = gr.Radio(label="Enable safety checker:", value=safety_checker, choices=[True, False])
            rg_requires_safety_checker = gr.Radio(label="Requires safety checker:", value=requires_safety_checker, choices=[True, False])

    gr.Markdown("### Scheduler")
    with gr.Row():
        dd_schedulers = gr.Dropdown(choices=schedulers, label="Scheduler",
                                    info="see https://huggingface.co/docs/diffusers/using-diffusers/loading#schedulers")
        txt_scheduler = gr.Textbox(value="", label="Description")

    gr.Markdown("### Adapters")
    with gr.Row():
        gr.Markdown('Choose an adapter.')

    gr.Markdown("### Inference settings")
    with gr.Row():
        el_prompt = gr.TextArea(label="Prompt", value=prompt)
        el_negative_prompt = gr.TextArea(label="Negative prompt", value=negative_prompt)
    with gr.Row():
        el_inference_steps = gr.Textbox(label="Inference steps", value=inference_steps)
        el_manual_seed = gr.Textbox(label="Manual seed", value=manual_seed)
        el_guidance_scale = gr.Textbox(label="Guidance Scale", value=guidance_scale,
                                       info="A low guidance scale leads to faster inference, with the drawback that negative prompts have no effect on the denoising process.")

    gr.Markdown("### Output")
    with gr.Row():
        btn_start_pipeline = gr.Button(value="Run inference")
    with gr.Row():
        tb_result = gr.Textbox(label="Status", value="")
        el_result = gr.Image()
        txt_code = gr.Code(get_sorted_code(), label="Code")
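    # Event wiring: each handler's return values are mapped positionally onto
    # its outputs, so every settings change re-renders the code box, and
    # models_change additionally updates the safetensors radio and the
    # scheduler dropdown.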
    rg_device.change(device_change, inputs=[rg_device], outputs=[txt_code])
    rg_data_type.change(data_type_change, inputs=[rg_data_type], outputs=[txt_code])
    rg_allow_tensorfloat32.change(tensorfloat32_change, inputs=[rg_allow_tensorfloat32], outputs=[txt_code])
    rg_variant.change(variant_change, inputs=[rg_variant], outputs=[txt_code])
    dd_models.change(models_change, inputs=[dd_models, dd_schedulers], outputs=[txt_code, rg_use_safetensors, dd_schedulers])
    rg_safety_checker.change(safety_checker_change, inputs=[rg_safety_checker], outputs=[txt_code])
    rg_requires_safety_checker.change(requires_safety_checker_change, inputs=[rg_requires_safety_checker], outputs=[txt_code])
    dd_schedulers.change(schedulers_change, inputs=[dd_schedulers], outputs=[txt_code, txt_scheduler])

    btn_start_pipeline.click(start_pipeline,
                             inputs=[dd_models,
                                     rg_device,
                                     rg_use_safetensors,
                                     rg_data_type,
                                     rg_variant,
                                     rg_safety_checker,
                                     rg_requires_safety_checker,
                                     dd_schedulers,
                                     el_prompt,
                                     el_negative_prompt,
                                     el_inference_steps,
                                     el_manual_seed,
                                     el_guidance_scale],
                             outputs=[tb_result, el_result])

demo.launch()
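# Usage note (assuming this file is saved as app.py next to appConfig.json):
#
#   python app.py
#
# Gradio serves the UI on http://127.0.0.1:7860 by default; pass server_name,
# server_port, or share=True to demo.launch() to expose it differently.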