File size: 5,171 Bytes
314064f 0821367 314064f 9d02d34 41ca444 314064f 1898dc9 314064f 157feb0 0821367 40e337e 0821367 314064f 0821367 9d02d34 0821367 9d02d34 0821367 9d02d34 0821367 314064f 0821367 314064f 0821367 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
from typing import Dict, List, Any
import torch
from diffusers import StableDiffusionXLImg2ImgPipeline, DiffusionPipeline, AutoencoderKL, DPMSolverMultistepScheduler, DDIMScheduler, StableDiffusionInpaintPipeline, AutoPipelineForInpainting, AutoPipelineForImage2Image, StableDiffusionControlNetInpaintPipeline, ControlNetModel
from PIL import Image
import base64
from io import BytesIO
from diffusers.image_processor import VaeImageProcessor
import numpy as np
# Pick the compute device up front and refuse to load on hosts without CUDA.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
if not device.type == 'cuda':
    raise ValueError("need to run on GPU")
class EndpointHandler():
    """Endpoint handler exposing two image operations.

    * ``method == "rasterize"`` (the default): encode the input image with the
      SDXL base VAE and run the SDXL refiner img2img pipeline on the latents.
    * any other ``method`` value: run the SD 1.5 ControlNet inpainting
      pipeline on the input image and mask.
    """

    def __init__(self, path=""):
        """Load the refiner pipeline, the standalone VAE and the inpaint pipeline.

        :param path: Not used by this implementation.
        """
        self.smooth_pipe = StableDiffusionXLImg2ImgPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16
        )
        # NOTE(review): the pipeline is moved to CUDA and then switched to
        # model CPU offload; the explicit move looks redundant - confirm
        # against the installed diffusers version before removing it.
        self.smooth_pipe.to("cuda")
        # Standalone VAE (loaded without a torch_dtype override) used to turn
        # the request image into latents for the refiner.
        self.vae = AutoencoderKL.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0",
            subfolder="vae",
            use_safetensors=True,
        ).to("cuda")
        self.smooth_pipe.enable_model_cpu_offload()
        self.smooth_pipe.enable_xformers_memory_efficient_attention()

        self.controlnet = ControlNetModel.from_pretrained(
            "lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16
        )
        self.pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            controlnet=self.controlnet,
            torch_dtype=torch.float16,
        )
        self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()

    def __call__(self, data: Any) -> Any:
        """Dispatch a request to the rasterize or inpaint pipeline.

        Recognised keys are popped from ``data`` (the mapping is mutated).

        :param data: Mapping with an optional ``method`` key (default
            ``"rasterize"``). For ``"rasterize"``: ``image`` (base64,
            required), ``prompt``, ``num_inference_steps`` (default 50).
            For any other method: ``image`` and ``mask_image`` (base64, both
            required), ``prompt``, ``negative_prompt``, ``strength``
            (default 0.2), ``guidance_scale`` (default 8.0),
            ``num_inference_steps`` (default 20).
        :return: For ``"rasterize"`` the refiner output's ``.images``
            sequence; otherwise the first element of the inpaint output's
            ``.images`` (presumably PIL images - confirm against the
            pipelines' output type).
        :raises ValueError: If a required image (or mask) is missing, or the
            image and mask sizes differ.
        """
        method = data.pop("method", "rasterize")
        if method == "rasterize":
            return self._rasterize(data)
        # Every non-"rasterize" method value is treated as an inpaint request.
        return self._inpaint(data)

    def _rasterize(self, data):
        """Refine the request image with the SDXL refiner img2img pipeline."""
        encoded_image = data.pop("image", None)
        prompt = data.pop("prompt", "")
        num_inference_steps = data.pop("num_inference_steps", 50)

        # Fail early with a clear message instead of hitting an undefined
        # local further down.
        if encoded_image is None:
            raise ValueError("`image` is required for the 'rasterize' method")

        image = self.decode_base64_image(encoded_image).convert('RGB')
        pixels = VaeImageProcessor().preprocess(image).to(device="cuda")
        with torch.no_grad():
            # Sample latents from the VAE posterior and apply the VAE's
            # configured scaling factor before handing them to the pipeline.
            latents = (
                self.vae.encode(pixels).latent_dist.sample()
                * self.vae.config.scaling_factor
            )
        # xformers attention is already enabled once in __init__, so it is
        # not re-enabled per request.
        return self.smooth_pipe(
            prompt,
            image=latents,
            num_inference_steps=num_inference_steps,
        ).images

    def _inpaint(self, data):
        """Inpaint the masked region with the ControlNet inpaint pipeline."""
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        prompt = data.pop("prompt", "")
        negative_prompt = data.pop("negative_prompt", "")
        strength = data.pop("strength", 0.2)
        guidance_scale = data.pop("guidance_scale", 8.0)
        num_inference_steps = data.pop("num_inference_steps", 20)

        # Both inputs are mandatory: the control image is derived from them.
        if encoded_image is None or encoded_mask_image is None:
            raise ValueError(
                "`image` and `mask_image` are both required for inpainting"
            )

        image = self.decode_base64_image(encoded_image).convert("RGB")
        mask_image = self.decode_base64_image(encoded_mask_image).convert("RGB")
        control_image = self.make_inpaint_condition(image, mask_image)

        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            eta=1.0,
            image=image,
            mask_image=mask_image,
            control_image=control_image,
            guidance_scale=guidance_scale,
            strength=strength,
        ).images[0]

    # helper to decode input image
    def decode_base64_image(self, image_string):
        """Decode a base64 string and open the bytes as a PIL image.

        :param image_string: Base64-encoded image file contents.
        :return: The object returned by ``PIL.Image.open``.
        """
        buffer = BytesIO(base64.b64decode(image_string))
        return Image.open(buffer)

    def make_inpaint_condition(self, image, image_mask):
        """Build the ControlNet inpaint conditioning tensor.

        :param image: Image object supporting ``convert("RGB")``.
        :param image_mask: Mask object supporting ``convert("L")``; pixels
            brighter than 50% mark the region to inpaint.
        :return: ``torch`` float32 tensor of shape ``(1, 3, H, W)`` with
            values scaled to ``[0, 1]`` and masked pixels set to ``-1.0``.
        :raises ValueError: If image and mask differ in height or width.
        """
        image = np.array(image.convert("RGB")).astype(np.float32) / 255.0
        image_mask = np.array(image_mask.convert("L")).astype(np.float32) / 255.0

        # Compare height AND width; an explicit raise also survives `python -O`.
        if image.shape[0:2] != image_mask.shape[0:2]:
            raise ValueError("image and image_mask must have the same image size")

        image[image_mask > 0.5] = -1.0  # set as masked pixel
        # HWC -> NCHW with a leading batch dimension of 1.
        image = np.expand_dims(image, 0).transpose(0, 3, 1, 2)
        return torch.from_numpy(image)
|