import os
from typing import Optional
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import StreamingResponse
import torch
from diffusers import (
    StableDiffusionPipeline,
    StableDiffusionXLPipeline,
    StableDiffusionInpaintPipeline,
    EulerAncestralDiscreteScheduler,
    DPMSolverSinglestepScheduler,
)
from huggingface_hub import hf_hub_download
import numpy as np
import random
from PIL import Image
import io

app = FastAPI()

MAX_SEED = np.iinfo(np.int32).max
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load HF token from environment variable
HF_TOKEN = os.getenv("HF_TOKEN")

# Dictionary to store loaded pipelines
loaded_pipelines = {}

# Function to load pipeline dynamically
def load_pipeline(model_name: str):
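    # Reuse a cached pipeline rather than reloading weights on every request.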
    if model_name in loaded_pipelines:
        return loaded_pipelines[model_name]

    if model_name == "Fluently XL Final":
        pipe = StableDiffusionXLPipeline.from_single_file(
            hf_hub_download(repo_id="fluently/Fluently-XL-Final", filename="FluentlyXL-Final.safetensors", token=HF_TOKEN),
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
    elif model_name == "Fluently Anime":
        pipe = StableDiffusionPipeline.from_pretrained(
            "fluently/Fluently-anime",
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
    elif model_name == "Fluently Epic":
        pipe = StableDiffusionPipeline.from_pretrained(
            "fluently/Fluently-epic",
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
    elif model_name == "Fluently XL v4":
        pipe = StableDiffusionXLPipeline.from_pretrained(
            "fluently/Fluently-XL-v4",
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
    elif model_name == "Fluently XL v3 Lightning":
        pipe = StableDiffusionXLPipeline.from_pretrained(
            "fluently/Fluently-XL-v3-lightning",
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
        pipe.scheduler = DPMSolverSinglestepScheduler.from_config(
            pipe.scheduler.config,
            use_karras_sigmas=False,
            timestep_spacing="trailing",
            lower_order_final=True,
        )
    elif model_name == "Fluently v4 inpaint":
        pipe = StableDiffusionInpaintPipeline.from_pretrained(
            "fluently/Fluently-v4-inpainting",
            torch_dtype=torch.float16,
            use_safetensors=True,
        )
    else:
        raise ValueError(f"Unknown model: {model_name}")
    
    pipe.to(device)
    loaded_pipelines[model_name] = pipe
    return pipe

def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed

@app.post("/generate")
async def generate(
    model: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: str = Form(""),
    use_negative_prompt: bool = Form(False),
    seed: int = Form(0),
    width: int = Form(1024),
    height: int = Form(1024),
    guidance_scale: float = Form(3),
    randomize_seed: bool = Form(False),
    inpaint_image: Optional[UploadFile] = File(None),
    mask_image: Optional[UploadFile] = File(None),
    blur_factor: float = Form(1.0),
    strength: float = Form(0.75)
):
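    """Generate an image with the selected model and stream it back as a PNG.

    The inpainting model additionally requires inpaint_image and mask_image.
    """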
    seed = int(randomize_seed_fn(seed, randomize_seed))

    if not use_negative_prompt:
        negative_prompt = ""

    inpaint_image_pil = Image.open(io.BytesIO(await inpaint_image.read())) if inpaint_image else None
    mask_image_pil = Image.open(io.BytesIO(await mask_image.read())) if mask_image else None

    pipe = load_pipeline(model)

    if model == "Fluently v4 inpaint":
        if inpaint_image_pil is None or mask_image_pil is None:
            raise HTTPException(status_code=400, detail="Inpainting requires both inpaint_image and mask_image.")
        # Feather the mask edges so the inpainted region blends with the source image.
        blurred_mask = pipe.mask_processor.blur(mask_image_pil, blur_factor=blur_factor)
        images = pipe(
            prompt=prompt,
            image=inpaint_image_pil,
            mask_image=blurred_mask,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=30,
            strength=strength,
            num_images_per_prompt=1,
            output_type="pil",
        ).images
    else:
        images = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=25 if model == "Fluently XL Final" else 30,
            num_images_per_prompt=1,
            output_type="pil",
        ).images

    img = images[0]
    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format="PNG")
    img_byte_arr.seek(0)

    return StreamingResponse(img_byte_arr, media_type="image/png")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
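
# Example client call (a minimal sketch, not part of the service itself; it
# assumes the server is running locally on port 7860 and that the `requests`
# package is installed):
#
#   import requests
#
#   resp = requests.post(
#       "http://localhost:7860/generate",
#       data={
#           "model": "Fluently XL Final",
#           "prompt": "a watercolor fox in an autumn forest",
#           "randomize_seed": "true",
#       },
#   )
#   resp.raise_for_status()
#   with open("out.png", "wb") as f:
#       f.write(resp.content)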