|
import gradio as gr |
|
import io |
|
import base64 |
|
from flask import Flask, render_template, request, send_file, jsonify |
|
import torch |
|
import json |
|
from PIL import Image |
|
from diffusers import DiffusionPipeline |
|
from diffusers import ( |
|
DDPMScheduler, |
|
DDIMScheduler, |
|
PNDMScheduler, |
|
LMSDiscreteScheduler, |
|
EulerAncestralDiscreteScheduler, |
|
EulerDiscreteScheduler, |
|
DPMSolverMultistepScheduler, |
|
) |
|
import threading |
|
import requests |
|
from flask import Flask, render_template_string |
|
from gradio import Interface |
|
from diffusers import AutoencoderKL |
|
import pandas as pd |
|
import base64 |
|
from config import * |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
initial_config, devices, models, schedulers, code = get_inital_config() |
|
|
|
device = initial_config["device"] |
|
model = initial_config["model"] |
|
scheduler = initial_config["scheduler"] |
|
variant = initial_config["variant"] |
|
allow_tensorfloat32 = initial_config["allow_tensorfloat32"] |
|
use_safetensors = initial_config["use_safetensors"] |
|
data_type = initial_config["data_type"] |
|
safety_checker = initial_config["safety_checker"] |
|
requires_safety_checker = initial_config["requires_safety_checker"] |
|
manual_seed = initial_config["manual_seed"] |
|
inference_steps = initial_config["inference_steps"] |
|
guidance_scale = initial_config["guidance_scale"] |
|
prompt = initial_config["prompt"] |
|
negative_prompt = initial_config["negative_prompt"] |
|
|
|
config_history = [] |
|
|
|
def device_change(device): |
|
|
|
code[code_pos_device] = f'''device = "{device}"''' |
|
|
|
return get_sorted_code() |
|
|
|
def models_change(model, scheduler): |
|
|
|
use_safetensors = False |
|
|
|
|
|
if type(model) != list and model is not None: |
|
|
|
use_safetensors = str(model_configs[model]['use_safetensors']) |
|
|
|
|
|
if scheduler == None: |
|
|
|
scheduler = model_configs[model]['scheduler'] |
|
|
|
code[code_pos_init_pipeline] = f'''pipeline = DiffusionPipeline.from_pretrained( |
|
"{model}", |
|
use_safetensors=use_safetensors, |
|
torch_dtype=data_type, |
|
variant=variant).to(device)''' |
|
|
|
safety_checker_change(safety_checker) |
|
requires_safety_checker_change(requires_safety_checker) |
|
|
|
return get_sorted_code(), use_safetensors, scheduler |
|
|
|
def data_type_change(selected_data_type): |
|
|
|
get_data_type(selected_data_type) |
|
return get_sorted_code() |
|
|
|
def get_data_type(selected_data_type): |
|
|
|
if selected_data_type == "bfloat16": |
|
code[code_pos_data_type] = 'data_type = torch.bfloat16' |
|
data_type = torch.bfloat16 |
|
else: |
|
code[code_pos_data_type] = 'data_type = torch.float16' |
|
data_type = torch.float16 |
|
|
|
return data_type |
|
|
|
def tensorfloat32_change(allow_tensorfloat32): |
|
|
|
get_tensorfloat32(allow_tensorfloat32) |
|
|
|
return get_sorted_code() |
|
|
|
def get_tensorfloat32(allow_tensorfloat32): |
|
|
|
code[code_pos_tf32] = f'torch.backends.cuda.matmul.allow_tf32 = {allow_tensorfloat32}' |
|
|
|
return True if str(allow_tensorfloat32).lower() == 'true' else False |
|
|
|
def variant_change(variant): |
|
|
|
if str(variant) == 'None': |
|
code[code_pos_variant] = f'variant = {variant}' |
|
else: |
|
code[code_pos_variant] = f'variant = "{variant}"' |
|
|
|
return get_sorted_code() |
|
|
|
def safety_checker_change(safety_checker): |
|
|
|
if not safety_checker or str(safety_checker).lower == 'false': |
|
code[code_pos_safety_checker] = f'pipeline.safety_checker = None' |
|
else: |
|
code[code_pos_safety_checker] = '' |
|
|
|
return get_sorted_code() |
|
|
|
def requires_safety_checker_change(requires_safety_checker): |
|
|
|
code[code_pos_requires_safety_checker] = f'pipeline.requires_safety_checker = {requires_safety_checker}' |
|
|
|
return get_sorted_code() |
|
|
|
def schedulers_change(scheduler): |
|
|
|
if type(scheduler) != list and scheduler is not None: |
|
|
|
code[code_pos_scheduler] = f'pipeline.scheduler = {scheduler}.from_config(pipeline.scheduler.config)' |
|
|
|
return get_sorted_code(), scheduler_configs[scheduler] |
|
|
|
else: |
|
|
|
return get_sorted_code(), '' |
|
|
|
def get_scheduler(scheduler, config): |
|
|
|
if scheduler == "DDPMScheduler": |
|
return DDPMScheduler.from_config(config) |
|
elif scheduler == "DDIMScheduler": |
|
return DDIMScheduler.from_config(config) |
|
elif scheduler == "PNDMScheduler": |
|
return PNDMScheduler.from_config(config) |
|
elif scheduler == "LMSDiscreteScheduler": |
|
return LMSDiscreteScheduler.from_config(config) |
|
elif scheduler == "EulerAncestralDiscreteScheduler": |
|
return EulerAncestralDiscreteScheduler.from_config(config) |
|
elif scheduler == "EulerDiscreteScheduler": |
|
return EulerDiscreteScheduler.from_config(config) |
|
elif scheduler == "DPMSolverMultistepScheduler": |
|
return DPMSolverMultistepScheduler.from_config(config) |
|
else: |
|
return DPMSolverMultistepScheduler.from_config(config) |
|
|
|
|
|
def run_inference(model, |
|
device, |
|
use_safetensors, |
|
data_type, |
|
variant, |
|
safety_checker, |
|
requires_safety_checker, |
|
scheduler, |
|
prompt, |
|
negative_prompt, |
|
inference_steps, |
|
manual_seed, |
|
guidance_scale, |
|
progress=gr.Progress(track_tqdm=True)): |
|
|
|
if model != None and scheduler != None: |
|
|
|
progress((1,3), desc="Preparing pipeline initialization...") |
|
|
|
torch.backends.cuda.matmul.allow_tf32 = get_tensorfloat32(allow_tensorfloat32) |
|
|
|
bool_use_safetensors = True if use_safetensors.lower() == 'true' else False |
|
|
|
progress((2,3), desc="Initializing pipeline...") |
|
|
|
pipeline = DiffusionPipeline.from_pretrained( |
|
model, |
|
use_safetensors=bool_use_safetensors, |
|
torch_dtype=get_data_type(data_type), |
|
variant=variant).to(device) |
|
|
|
if safety_checker is None or str(safety_checker).lower == 'false': |
|
pipeline.safety_checker = None |
|
|
|
pipeline.requires_safety_checker = bool(requires_safety_checker) |
|
|
|
pipeline.scheduler = get_scheduler(scheduler, pipeline.scheduler.config) |
|
|
|
|
|
if manual_seed < 0 or manual_seed is None or manual_seed == '': |
|
generator = torch.Generator(device) |
|
else: |
|
generator = torch.manual_seed(42) |
|
|
|
progress((3,3), desc="Creating the result...") |
|
|
|
image = pipeline( |
|
prompt=prompt, |
|
negative_prompt=negative_prompt, |
|
generator=generator, |
|
num_inference_steps=int(inference_steps), |
|
guidance_scale=float(guidance_scale)).images[0] |
|
|
|
|
|
return "Done.", image |
|
|
|
else: |
|
|
|
return "Please select a model AND a scheduler.", None |
|
|
|
def dict_list_to_markdown_table(config_history): |
|
|
|
if not config_history: |
|
return "" |
|
|
|
headers = list(config_history[0].keys()) |
|
markdown_table = "| share | " + " | ".join(headers) + " |\n" |
|
markdown_table += "| --- | " + " | ".join(["---"] * len(headers)) + " |\n" |
|
|
|
for index, config in enumerate(config_history): |
|
encoded_config = base64.b64encode(str(config).encode()).decode() |
|
share_link = f'<a target="_blank" href="?config={encoded_config}">📎</a>' |
|
markdown_table += f"| {share_link} | " + " | ".join(str(config.get(key, "")) for key in headers) + " |\n" |
|
|
|
markdown_table = '<div style="overflow-x: auto;">\n\n' + markdown_table + '</div>' |
|
|
|
return markdown_table |
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
|
in_import_config = gr.Text() |
|
|
|
gr.Markdown('''## Text-2-Image Playground |
|
<small>by <a target="_blank" href="https://www.linkedin.com/in/nickyreinert/">Nicky Reinert</a> | |
|
home base: https://huggingface.co/spaces/n42/pictero |
|
</small>''') |
|
gr.Markdown("### Device specific settings") |
|
with gr.Row(): |
|
in_devices = gr.Dropdown(label="Device:", value=device, choices=devices, filterable=True, multiselect=False, allow_custom_value=True) |
|
in_data_type = gr.Radio(label="Data Type:", value=data_type, choices=["bfloat16", "float16"], info="`blfoat16` is not supported on MPS devices right now; Half-precision weights, will save GPU memory, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16") |
|
in_allow_tensorfloat32 = gr.Radio(label="Allow TensorFloat32:", value=allow_tensorfloat32, choices=[True, False], info="is not supported on MPS devices right now; use TensorFloat-32 is faster, but results in slightly less accurate computations, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16 ") |
|
in_variant = gr.Radio(label="Variant:", value=variant, choices=["fp16", None], info="Use half-precision weights will save GPU memory, not all models support that, see https://huggingface.co/docs/diffusers/main/en/optimization/fp16 ") |
|
|
|
gr.Markdown("### Model specific settings") |
|
with gr.Row(): |
|
in_models = gr.Dropdown(choices=models, label="Model") |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
in_use_safetensors = gr.Radio(label="Use safe tensors:", choices=["True", "False"], interactive=False) |
|
with gr.Column(scale=1): |
|
in_safety_checker = gr.Radio(label="Enable safety checker:", value=safety_checker, choices=[True, False]) |
|
in_requires_safety_checker = gr.Radio(label="Requires safety checker:", value=requires_safety_checker, choices=[True, False]) |
|
|
|
gr.Markdown("### Scheduler") |
|
with gr.Row(): |
|
in_schedulers = gr.Dropdown(choices=schedulers, label="Scheduler", info="see https://huggingface.co/docs/diffusers/using-diffusers/loading#schedulers" ) |
|
out_scheduler_description = gr.Textbox(value="", label="Description") |
|
|
|
gr.Markdown("### Adapters") |
|
with gr.Row(): |
|
gr.Markdown('Choose an adapter.') |
|
|
|
gr.Markdown("### Inference settings") |
|
with gr.Row(): |
|
in_prompt = gr.TextArea(label="Prompt", value=prompt) |
|
in_negative_prompt = gr.TextArea(label="Negative prompt", value=negative_prompt) |
|
with gr.Row(): |
|
in_inference_steps = gr.Textbox(label="Inference steps", value=inference_steps) |
|
in_manual_seed = gr.Textbox(label="Manual seed", value=manual_seed, info="Set this to -1 or leave it empty to randomly generate an image. A fixed value will result in a similar image for every run") |
|
in_guidance_scale = gr.Textbox(label="Guidance Scale", value=guidance_scale, info="A low guidance scale leads to a faster inference time, with the drawback that negative prompts don’t have any effect on the denoising process.") |
|
|
|
gr.Markdown("### Output") |
|
with gr.Row(): |
|
btn_start_pipeline = gr.Button(value="Run inferencing") |
|
with gr.Row(): |
|
|
|
out_image = gr.Image() |
|
out_code = gr.Code(get_sorted_code(), label="Code") |
|
with gr.Row(): |
|
out_current_config = gr.Code(value=str(initial_config), label="Current config") |
|
with gr.Row(): |
|
out_config_history = gr.Markdown(dict_list_to_markdown_table(config_history)) |
|
|
|
in_devices.change(device_change, inputs=[in_devices], outputs=[out_code]) |
|
in_data_type.change(data_type_change, inputs=[in_data_type], outputs=[out_code]) |
|
in_allow_tensorfloat32.change(tensorfloat32_change, inputs=[in_allow_tensorfloat32], outputs=[out_code]) |
|
in_variant.change(variant_change, inputs=[in_variant], outputs=[out_code]) |
|
in_models.change(models_change, inputs=[in_models, in_schedulers], outputs=[out_code, in_use_safetensors, in_schedulers]) |
|
in_safety_checker.change(safety_checker_change, inputs=[in_safety_checker], outputs=[out_code]) |
|
in_requires_safety_checker.change(requires_safety_checker_change, inputs=[in_requires_safety_checker], outputs=[out_code]) |
|
in_schedulers.change(schedulers_change, inputs=[in_schedulers], outputs=[out_code, out_scheduler_description]) |
|
btn_start_pipeline.click(run_inference, inputs=[ |
|
in_models, |
|
in_devices, |
|
in_use_safetensors, |
|
in_data_type, |
|
in_variant, |
|
in_safety_checker, |
|
in_requires_safety_checker, |
|
in_schedulers, |
|
in_prompt, |
|
in_negative_prompt, |
|
in_inference_steps, |
|
in_manual_seed, |
|
in_guidance_scale |
|
], outputs=[out_image]) |
|
|
|
demo.load(fn=init_config, inputs=out_current_config, |
|
outputs=[ |
|
in_models, |
|
in_devices, |
|
in_use_safetensors, |
|
in_data_type, |
|
in_variant, |
|
in_safety_checker, |
|
in_requires_safety_checker, |
|
in_schedulers, |
|
in_prompt, |
|
in_negative_prompt, |
|
in_inference_steps, |
|
in_manual_seed, |
|
in_guidance_scale |
|
]) |
|
|
|
demo.launch() |