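"""Gradio app for the Pictero Text-2-Image Playground
(https://huggingface.co/spaces/n42/pictero): configure a diffusers pipeline in
the browser, run inference, and export the matching reproduction code."""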
import base64

import gradio as gr
import torch
from diffusers import (
    DiffusionPipeline,
    DDPMScheduler,
    DDIMScheduler,
    PNDMScheduler,
    LMSDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    EulerDiscreteScheduler,
    DPMSolverMultistepScheduler,
)

from config import *
# pull the initial state from the config module:
# - the initial configuration,
# - the list of available devices,
# - the list of available models,
# - the list of available schedulers,
# - a dict that contains the code snippets for reproduction
initial_config, devices, models, schedulers, code = get_inital_config()
device = initial_config["device"]
model = initial_config["model"]
scheduler = initial_config["scheduler"]
variant = initial_config["variant"]
allow_tensorfloat32 = initial_config["allow_tensorfloat32"]
use_safetensors = initial_config["use_safetensors"]
data_type = initial_config["data_type"]
safety_checker = initial_config["safety_checker"]
requires_safety_checker = initial_config["requires_safety_checker"]
manual_seed = initial_config["manual_seed"]
inference_steps = initial_config["inference_steps"]
guidance_scale = initial_config["guidance_scale"]
prompt = initial_config["prompt"]
negative_prompt = initial_config["negative_prompt"]
config_history = []
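# UI change handlers: each *_change callback below rewrites one entry of the
# `code` dict (the reproduction snippet shown in the UI) and returns the
# re-rendered snippet via get_sorted_code()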
def device_change(device):
code[code_pos_device] = f'''device = "{device}"'''
return get_sorted_code()
def models_change(model, scheduler):
    use_safetensors = False
    # during UI init the model may be unset or arrive as a list; only react to a real selection
    if not isinstance(model, list) and model is not None:
        use_safetensors = str(model_configs[model]['use_safetensors'])
        # if no scheduler is selected yet, fall back to this model's default
        if scheduler is None:
            scheduler = model_configs[model]['scheduler']
        code[code_pos_init_pipeline] = f'''pipeline = DiffusionPipeline.from_pretrained(
    "{model}",
    use_safetensors=use_safetensors,
    torch_dtype=data_type,
    variant=variant).to(device)'''
    safety_checker_change(safety_checker)
    requires_safety_checker_change(requires_safety_checker)
    return get_sorted_code(), use_safetensors, scheduler
def data_type_change(selected_data_type):
get_data_type(selected_data_type)
return get_sorted_code()
def get_data_type(selected_data_type):
    if selected_data_type == "bfloat16":
        code[code_pos_data_type] = 'data_type = torch.bfloat16'
        data_type = torch.bfloat16  # bfloat16 is not supported on MPS as of 01/2024
    else:
        code[code_pos_data_type] = 'data_type = torch.float16'
        data_type = torch.float16  # half-precision weights save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16
    return data_type
def tensorfloat32_change(allow_tensorfloat32):
get_tensorfloat32(allow_tensorfloat32)
return get_sorted_code()
def get_tensorfloat32(allow_tensorfloat32):
code[code_pos_tf32] = f'torch.backends.cuda.matmul.allow_tf32 = {allow_tensorfloat32}'
    return str(allow_tensorfloat32).lower() == 'true'
def variant_change(variant):
if str(variant) == 'None':
code[code_pos_variant] = f'variant = {variant}'
else:
code[code_pos_variant] = f'variant = "{variant}"'
return get_sorted_code()
def safety_checker_change(safety_checker):
    if not safety_checker or str(safety_checker).lower() == 'false':
        code[code_pos_safety_checker] = 'pipeline.safety_checker = None'
    else:
        code[code_pos_safety_checker] = ''
    return get_sorted_code()
def requires_safety_checker_change(requires_safety_checker):
code[code_pos_requires_safety_checker] = f'pipeline.requires_safety_checker = {requires_safety_checker}'
return get_sorted_code()
def schedulers_change(scheduler):
    if not isinstance(scheduler, list) and scheduler is not None:
code[code_pos_scheduler] = f'pipeline.scheduler = {scheduler}.from_config(pipeline.scheduler.config)'
return get_sorted_code(), scheduler_configs[scheduler]
else:
return get_sorted_code(), ''
def get_scheduler(scheduler, config):
    # map the scheduler name to its class; default to DPMSolverMultistepScheduler
    scheduler_classes = {
        "DDPMScheduler": DDPMScheduler,
        "DDIMScheduler": DDIMScheduler,
        "PNDMScheduler": PNDMScheduler,
        "LMSDiscreteScheduler": LMSDiscreteScheduler,
        "EulerAncestralDiscreteScheduler": EulerAncestralDiscreteScheduler,
        "EulerDiscreteScheduler": EulerDiscreteScheduler,
        "DPMSolverMultistepScheduler": DPMSolverMultistepScheduler,
    }
    return scheduler_classes.get(scheduler, DPMSolverMultistepScheduler).from_config(config)
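# Illustrative usage (not called like this in the app): swap the active
# scheduler on an existing pipeline while keeping its current config:
#   pipeline.scheduler = get_scheduler("EulerDiscreteScheduler", pipeline.scheduler.config)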
# pipeline: build a DiffusionPipeline from the current UI settings and run inference
def run_inference(model,
device,
use_safetensors,
data_type,
variant,
safety_checker,
requires_safety_checker,
scheduler,
prompt,
negative_prompt,
inference_steps,
manual_seed,
guidance_scale,
progress=gr.Progress(track_tqdm=True)):
    if model is not None and scheduler is not None:
        progress((1, 3), desc="Preparing pipeline initialization...")

        # TensorFloat-32 trades slightly less accurate computations for speed,
        # see https://huggingface.co/docs/diffusers/main/en/optimization/fp16
        torch.backends.cuda.matmul.allow_tf32 = get_tensorfloat32(allow_tensorfloat32)

        bool_use_safetensors = str(use_safetensors).lower() == 'true'

        progress((2, 3), desc="Initializing pipeline...")

        pipeline = DiffusionPipeline.from_pretrained(
            model,
            use_safetensors=bool_use_safetensors,
            torch_dtype=get_data_type(data_type),
            variant=variant).to(device)

        if safety_checker is None or str(safety_checker).lower() == 'false':
            pipeline.safety_checker = None

        pipeline.requires_safety_checker = bool(requires_safety_checker)

        pipeline.scheduler = get_scheduler(scheduler, pipeline.scheduler.config)

        # an empty or negative seed means "random"; otherwise seed the generator
        if manual_seed is None or manual_seed == '' or int(manual_seed) < 0:
            generator = torch.Generator(device)
        else:
            generator = torch.manual_seed(int(manual_seed))

        progress((3, 3), desc="Creating the result...")

        image = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            generator=generator,
            num_inference_steps=int(inference_steps),
            guidance_scale=float(guidance_scale)).images[0]

        return "Done.", image
    else:
        return "Please select a model AND a scheduler.", None
def dict_list_to_markdown_table(config_history):
if not config_history:
return ""
headers = list(config_history[0].keys())
markdown_table = "| share | " + " | ".join(headers) + " |\n"
markdown_table += "| --- | " + " | ".join(["---"] * len(headers)) + " |\n"
for index, config in enumerate(config_history):
encoded_config = base64.b64encode(str(config).encode()).decode()
share_link = f'<a target="_blank" href="?config={encoded_config}">📎</a>'
markdown_table += f"| {share_link} | " + " | ".join(str(config.get(key, "")) for key in headers) + " |\n"
markdown_table = '<div style="overflow-x: auto;">\n\n' + markdown_table + '</div>'
return markdown_table
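# Illustrative usage (hypothetical entries): each history row gets a share link
# that encodes the config as base64 in the URL:
#   dict_list_to_markdown_table([{"prompt": "a cat", "manual_seed": 42}])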
# interface: assemble the Gradio Blocks UI
with gr.Blocks() as demo:
in_import_config = gr.Text()
gr.Markdown('''## Text-2-Image Playground
<small>by <a target="_blank" href="https://www.linkedin.com/in/nickyreinert/">Nicky Reinert</a> |
home base: https://huggingface.co/spaces/n42/pictero
</small>''')
gr.Markdown("### Device specific settings")
with gr.Row():
in_devices = gr.Dropdown(label="Device:", value=device, choices=devices, filterable=True, multiselect=False, allow_custom_value=True)
        in_data_type = gr.Radio(label="Data Type:", value=data_type, choices=["bfloat16", "float16"], info="`bfloat16` is not supported on MPS devices right now; half-precision weights save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")
        in_allow_tensorfloat32 = gr.Radio(label="Allow TensorFloat32:", value=allow_tensorfloat32, choices=[True, False], info="not supported on MPS devices right now; TensorFloat-32 is faster, but results in slightly less accurate computations, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")
        in_variant = gr.Radio(label="Variant:", value=variant, choices=["fp16", None], info="half-precision weights save GPU memory, but not all models support them, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16")
gr.Markdown("### Model specific settings")
with gr.Row():
in_models = gr.Dropdown(choices=models, label="Model")
with gr.Row():
with gr.Column(scale=1):
in_use_safetensors = gr.Radio(label="Use safe tensors:", choices=["True", "False"], interactive=False)
with gr.Column(scale=1):
in_safety_checker = gr.Radio(label="Enable safety checker:", value=safety_checker, choices=[True, False])
in_requires_safety_checker = gr.Radio(label="Requires safety checker:", value=requires_safety_checker, choices=[True, False])
gr.Markdown("### Scheduler")
with gr.Row():
in_schedulers = gr.Dropdown(choices=schedulers, label="Scheduler", info="see https://huggingface.co/docs/diffusers/using-diffusers/loading#schedulers" )
out_scheduler_description = gr.Textbox(value="", label="Description")
gr.Markdown("### Adapters")
with gr.Row():
gr.Markdown('Choose an adapter.')
gr.Markdown("### Inference settings")
with gr.Row():
in_prompt = gr.TextArea(label="Prompt", value=prompt)
in_negative_prompt = gr.TextArea(label="Negative prompt", value=negative_prompt)
with gr.Row():
in_inference_steps = gr.Textbox(label="Inference steps", value=inference_steps)
        in_manual_seed = gr.Textbox(label="Manual seed", value=manual_seed, info="Set this to -1 or leave it empty to generate a random image. A fixed seed reproduces the same image on every run.")
in_guidance_scale = gr.Textbox(label="Guidance Scale", value=guidance_scale, info="A low guidance scale leads to a faster inference time, with the drawback that negative prompts don’t have any effect on the denoising process.")
gr.Markdown("### Output")
with gr.Row():
        btn_start_pipeline = gr.Button(value="Run inference")
with gr.Row():
        out_result = gr.Textbox(label="Status", value="")
out_image = gr.Image()
out_code = gr.Code(get_sorted_code(), label="Code")
with gr.Row():
out_current_config = gr.Code(value=str(initial_config), label="Current config")
with gr.Row():
out_config_history = gr.Markdown(dict_list_to_markdown_table(config_history))
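    # event wiring: every control refreshes the generated reproduction code;
    # a model change additionally updates the safetensors flag and the default scheduler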
in_devices.change(device_change, inputs=[in_devices], outputs=[out_code])
in_data_type.change(data_type_change, inputs=[in_data_type], outputs=[out_code])
in_allow_tensorfloat32.change(tensorfloat32_change, inputs=[in_allow_tensorfloat32], outputs=[out_code])
in_variant.change(variant_change, inputs=[in_variant], outputs=[out_code])
in_models.change(models_change, inputs=[in_models, in_schedulers], outputs=[out_code, in_use_safetensors, in_schedulers])
in_safety_checker.change(safety_checker_change, inputs=[in_safety_checker], outputs=[out_code])
in_requires_safety_checker.change(requires_safety_checker_change, inputs=[in_requires_safety_checker], outputs=[out_code])
in_schedulers.change(schedulers_change, inputs=[in_schedulers], outputs=[out_code, out_scheduler_description])
btn_start_pipeline.click(run_inference, inputs=[
in_models,
in_devices,
in_use_safetensors,
in_data_type,
in_variant,
in_safety_checker,
in_requires_safety_checker,
in_schedulers,
in_prompt,
in_negative_prompt,
in_inference_steps,
in_manual_seed,
in_guidance_scale
    ], outputs=[out_result, out_image])
demo.load(fn=init_config, inputs=out_current_config,
outputs=[
in_models,
in_devices,
in_use_safetensors,
in_data_type,
in_variant,
in_safety_checker,
in_requires_safety_checker,
in_schedulers,
in_prompt,
in_negative_prompt,
in_inference_steps,
in_manual_seed,
in_guidance_scale
])
demo.launch()