from typing import Any
import torch
from diffusers import (
    StableDiffusionXLImg2ImgPipeline,
    AutoencoderKL,
    DDIMScheduler,
    StableDiffusionControlNetInpaintPipeline,
    ControlNetModel,
)
from PIL import Image
import base64
from io import BytesIO
from diffusers.image_processor import VaeImageProcessor
import numpy as np


# Select the device; a CUDA GPU is required for these pipelines.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

if device.type != 'cuda':
    raise ValueError("This handler must run on a GPU (no CUDA device found)")

class EndpointHandler:
    def __init__(self, path=""):

        # SDXL refiner used by the "rasterize" method (img2img smoothing).
        self.smooth_pipe = StableDiffusionXLImg2ImgPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16
        )
        # Model CPU offload manages device placement itself, so the pipeline
        # is not moved to CUDA explicitly (mixing .to("cuda") with offload is
        # redundant and can error on recent diffusers versions).
        self.smooth_pipe.enable_model_cpu_offload()
        self.smooth_pipe.enable_xformers_memory_efficient_attention()

        # Standalone SDXL VAE, used to encode input images into latents.
        self.vae = AutoencoderKL.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0",
            subfolder="vae",
            use_safetensors=True,
        ).to("cuda")

        # ControlNet inpainting pipeline used by the default method.
        self.controlnet = ControlNetModel.from_pretrained(
            "lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16
        )
        self.pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            controlnet=self.controlnet,
            torch_dtype=torch.float16,
        )
        self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()


    def __call__(self, data: Any) -> Any:
        """
        :param data: A dictionary containing a `method` field ("rasterize" or
                     inpainting by default) plus that method's inputs
                     (`image`, `prompt`, etc.), with images base64-encoded.
        :return: The generated image(s) as PIL images.
        """

        method = data.pop("method", "rasterize")

        if method == "rasterize":
            encoded_image = data.pop("image", None)
            prompt = data.pop("prompt", "")
            num_inference_steps = data.pop("num_inference_steps", 50)

            if encoded_image is None:
                raise ValueError("The 'rasterize' method requires an `image` field")

            image = self.decode_base64_image(encoded_image).convert('RGB')

            # Convert the PIL image to a normalized tensor and encode it
            # into VAE latent space.
            image_processor = VaeImageProcessor()
            pixel_values = image_processor.preprocess(image).to(device="cuda")

            with torch.no_grad():
                latents = (
                    self.vae.encode(pixel_values).latent_dist.sample()
                    * self.vae.config.scaling_factor
                )

            # Cast to match the fp16 refiner pipeline.
            latents = latents.to(dtype=torch.float16)

            out = self.smooth_pipe(
                prompt, image=latents, num_inference_steps=num_inference_steps
            ).images

            return out
        else:
            encoded_image = data.pop("image", None)
            encoded_mask_image = data.pop("mask_image", None)

            prompt = data.pop("prompt", "")
            negative_prompt = data.pop("negative_prompt", "")

            strength = data.pop("strength", 0.2)
            guidance_scale = data.pop("guidance_scale", 8.0)
            num_inference_steps = data.pop("num_inference_steps", 20)

            # Both the image and its mask are required for inpainting.
            if encoded_image is None or encoded_mask_image is None:
                raise ValueError("Inpainting requires both `image` and `mask_image` fields")

            image = self.decode_base64_image(encoded_image).convert("RGB")
            mask_image = self.decode_base64_image(encoded_mask_image).convert("RGB")

            control_image = self.make_inpaint_condition(image, mask_image)

            # generate image
            image = self.pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                num_inference_steps=num_inference_steps,
                eta=1.0,
                image=image,
                mask_image=mask_image,
                control_image=control_image,
                guidance_scale=guidance_scale,
                strength=strength
            ).images[0]
    
            return image
    
    # helper to decode input image
    def decode_base64_image(self, image_string):
        base64_image = base64.b64decode(image_string)
        buffer = BytesIO(base64_image)
        image = Image.open(buffer)
        return image
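
    # A minimal counterpart sketch to decode_base64_image, assuming callers
    # want generated images returned as base64 strings (this helper is an
    # illustrative addition and is not called anywhere above).
    def encode_base64_image(self, image, format="PNG"):
        buffer = BytesIO()
        image.save(buffer, format=format)  # serialize the PIL image
        return base64.b64encode(buffer.getvalue()).decode("utf-8")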
        
    def make_inpaint_condition(self, image, image_mask):
        # Normalize the image and the mask to [0, 1].
        image = np.array(image.convert("RGB")).astype(np.float32) / 255.0
        image_mask = np.array(image_mask.convert("L")).astype(np.float32) / 255.0

        assert image.shape[:2] == image_mask.shape[:2], "image and image_mask must have the same height and width"
        image[image_mask > 0.5] = -1.0  # mark masked pixels the way the inpaint ControlNet expects
        image = np.expand_dims(image, 0).transpose(0, 3, 1, 2)  # HWC -> NCHW
        image = torch.from_numpy(image)
        return image
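

# A minimal local usage sketch, assuming a CUDA machine and a hypothetical
# "input.png" test file; in an inference-endpoint deployment the serving
# runtime constructs EndpointHandler and passes request payloads instead.
if __name__ == "__main__":
    with open("input.png", "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")

    handler = EndpointHandler()
    images = handler({
        "method": "rasterize",
        "image": encoded,
        "prompt": "a clean, smooth render of the input",
        "num_inference_steps": 30,
    })
    images[0].save("output.png")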