Switti-1024 / models /pipeline.py
realantonvoronov
update for 1024
1dc27f0
raw
history blame
9.74 kB
import torch
from torchvision.transforms import ToPILImage
from PIL.Image import Image as PILImage
from models.vqvae import VQVAEHF
from models.clip import FrozenCLIPEmbedder
from models.switti import SwittiHF, get_crop_condition
from models.helpers import sample_with_top_k_top_p_, gumbel_softmax_with_rng
TRAIN_IMAGE_SIZE = (512, 512)
class SwittiPipeline:
vae_path = "yresearch/VQVAE-Switti"
text_encoder_path = "openai/clip-vit-large-patch14"
text_encoder_2_path = "laion/CLIP-ViT-bigG-14-laion2B-39B-b160k"
def __init__(self, switti, vae, text_encoder, text_encoder_2,
device, dtype=torch.float32,
):
self.switti = switti.to(dtype)
self.vae = vae.to(dtype)
self.text_encoder = text_encoder.to(dtype)
self.text_encoder_2 = text_encoder_2.to(dtype)
self.switti.eval()
self.vae.eval()
self.device = device
@classmethod
def from_pretrained(cls,
pretrained_model_name_or_path,
torch_dtype=torch.bfloat16,
device="cuda",
reso=1024,
):
switti = SwittiHF.from_pretrained(pretrained_model_name_or_path).to(device)
vae = VQVAEHF.from_pretrained(cls.vae_path, reso=reso).to(device)
text_encoder = FrozenCLIPEmbedder(cls.text_encoder_path, device=device)
text_encoder_2 = FrozenCLIPEmbedder(cls.text_encoder_2_path, device=device)
return cls(switti, vae, text_encoder, text_encoder_2, device, torch_dtype)
@staticmethod
def to_image(tensor):
return [ToPILImage()(
(255 * img.cpu().detach()).to(torch.uint8))
for img in tensor]
def _encode_prompt(self, prompt: str | list[str]):
prompt = [prompt] if isinstance(prompt, str) else prompt
encodings = [
self.text_encoder.encode(prompt),
self.text_encoder_2.encode(prompt),
]
prompt_embeds = torch.concat(
[encoding.last_hidden_state for encoding in encodings], dim=-1
)
pooled_prompt_embeds = encodings[-1].pooler_output
attn_bias = encodings[-1].attn_bias
return prompt_embeds, pooled_prompt_embeds, attn_bias
def encode_prompt(
self,
prompt: str | list[str],
null_prompt: str = "",
encode_null: bool = True,
):
prompt_embeds, pooled_prompt_embeds, attn_bias = self._encode_prompt(prompt)
if encode_null:
B, L, hidden_dim = prompt_embeds.shape
pooled_dim = pooled_prompt_embeds.shape[1]
null_embeds, null_pooled_embeds, null_attn_bias = self._encode_prompt(null_prompt)
null_embeds = null_embeds[:, :L].expand(B, L, hidden_dim).to(prompt_embeds.device)
null_pooled_embeds = null_pooled_embeds.expand(B, pooled_dim).to(pooled_prompt_embeds.device)
null_attn_bias = null_attn_bias[:, :L].expand(B, L).to(attn_bias.device)
prompt_embeds = torch.cat([prompt_embeds, null_embeds], dim=0)
pooled_prompt_embeds = torch.cat([pooled_prompt_embeds, null_pooled_embeds], dim=0)
attn_bias = torch.cat([attn_bias, null_attn_bias], dim=0)
return prompt_embeds, pooled_prompt_embeds, attn_bias
@torch.inference_mode()
def __call__(
self,
prompt: str | list[str],
null_prompt: str = "",
seed: int | None = None,
cfg: float = 6.,
top_k: int = 400,
top_p: float = 0.95,
more_smooth: bool = False,
return_pil: bool = True,
smooth_start_si: int = 0,
turn_off_cfg_start_si: int = 10,
turn_on_cfg_start_si: int = 0,
last_scale_temp: None | float = None,
) -> torch.Tensor | list[PILImage]:
"""
only used for inference, on autoregressive mode
:param prompt: text prompt to generate an image
:param null_prompt: negative prompt for CFG
:param seed: random seed
:param cfg: classifier-free guidance ratio
:param top_k: top-k sampling
:param top_p: top-p sampling
:param more_smooth: sampling using gumbel softmax; only used in visualization, not used in FID/IS benchmarking
:return: if return_pil: list of PIL Images, else: torch.tensor (B, 3, H, W) in [0, 1]
"""
assert not self.switti.training
switti = self.switti
vae = self.vae
vae_quant = self.vae.quantize
if seed is None:
rng = None
else:
rng = torch.Generator(self.device).manual_seed(seed)
context, cond_vector, context_attn_bias = self.encode_prompt(prompt, null_prompt)
B = context.shape[0] // 2
cond_vector = switti.text_pooler(cond_vector)
if switti.use_crop_cond:
crop_coords = get_crop_condition(2 * B * [TRAIN_IMAGE_SIZE[0]],
2 * B * [TRAIN_IMAGE_SIZE[1]],
).to(cond_vector.device)
crop_embed = switti.crop_embed(crop_coords.view(-1)).reshape(2 * B, switti.D)
crop_cond = switti.crop_proj(crop_embed)
else:
crop_cond = None
sos = cond_BD = cond_vector
lvl_pos = switti.lvl_embed(switti.lvl_1L)
if not switti.rope:
lvl_pos += switti.pos_1LC
next_token_map = (
sos.unsqueeze(1)
+ switti.pos_start.expand(2 * B, switti.first_l, -1)
+ lvl_pos[:, : switti.first_l]
)
cur_L = 0
f_hat = sos.new_zeros(B, switti.Cvae, switti.patch_nums[-1], switti.patch_nums[-1])
for b in switti.blocks:
b.attn.kv_caching(switti.use_ar) # Use KV caching if switti is in the AR mode
b.cross_attn.kv_caching(True)
for si, pn in enumerate(switti.patch_nums): # si: i-th segment
ratio = si / switti.num_stages_minus_1
x_BLC = next_token_map
if switti.rope:
freqs_cis = switti.freqs_cis[:, cur_L : cur_L + pn * pn]
else:
freqs_cis = switti.freqs_cis
if si >= turn_off_cfg_start_si:
apply_smooth = False
x_BLC = x_BLC[:B]
context = context[:B]
context_attn_bias = context_attn_bias[:B]
freqs_cis = freqs_cis[:B]
cond_BD = cond_BD[:B]
if crop_cond is not None:
crop_cond = crop_cond[:B]
for b in switti.blocks:
if b.attn.caching and b.attn.cached_k is not None:
b.attn.cached_k = b.attn.cached_k[:B]
b.attn.cached_v = b.attn.cached_v[:B]
if b.cross_attn.caching and b.cross_attn.cached_k is not None:
b.cross_attn.cached_k = b.cross_attn.cached_k[:B]
b.cross_attn.cached_v = b.cross_attn.cached_v[:B]
else:
apply_smooth = more_smooth
for block in switti.blocks:
x_BLC = block(
x=x_BLC,
cond_BD=cond_BD,
attn_bias=None,
context=context,
context_attn_bias=context_attn_bias,
freqs_cis=freqs_cis,
crop_cond=crop_cond,
)
cur_L += pn * pn
logits_BlV = switti.get_logits(x_BLC, cond_BD)
# Guidance
if si < turn_on_cfg_start_si:
# t = 0, i. e. no guidance
logits_BlV = logits_BlV[:B]
elif si >= turn_on_cfg_start_si and si < turn_off_cfg_start_si:
# default const cfg
t = cfg
logits_BlV = (1 + t) * logits_BlV[:B] - t * logits_BlV[B:]
elif last_scale_temp is not None:
logits_BlV = logits_BlV / last_scale_temp
if apply_smooth and si >= smooth_start_si:
# not used when evaluating FID/IS/Precision/Recall
gum_t = max(0.27 * (1 - ratio * 0.95), 0.005) # refer to mask-git
idx_Bl = gumbel_softmax_with_rng(
logits_BlV.mul(1 + ratio), tau=gum_t, hard=False, dim=-1, rng=rng,
)
h_BChw = idx_Bl @ vae_quant.embedding.weight.unsqueeze(0)
else:
# default nucleus sampling
idx_Bl = sample_with_top_k_top_p_(
logits_BlV, rng=rng, top_k=top_k, top_p=top_p, num_samples=1,
)[:, :, 0]
h_BChw = vae_quant.embedding(idx_Bl)
h_BChw = h_BChw.transpose_(1, 2).reshape(B, switti.Cvae, pn, pn)
f_hat, next_token_map = vae_quant.get_next_autoregressive_input(
si, len(switti.patch_nums), f_hat, h_BChw,
)
if si != switti.num_stages_minus_1: # prepare for next stage
next_token_map = next_token_map.view(B, switti.Cvae, -1).transpose(1, 2)
next_token_map = (
switti.word_embed(next_token_map)
+ lvl_pos[:, cur_L : cur_L + switti.patch_nums[si + 1] ** 2]
)
# double the batch sizes due to CFG
next_token_map = next_token_map.repeat(2, 1, 1)
for b in switti.blocks:
b.attn.kv_caching(False)
b.cross_attn.kv_caching(False)
# de-normalize, from [-1, 1] to [0, 1]
img = vae.fhat_to_img(f_hat).add(1).mul(0.5)
if return_pil:
img = self.to_image(img)
return img