|
import torch |
|
import argparse |
|
import random |
|
import re |
|
from typing import List, Optional, Union |
|
|
|
|
|
def prepare_scheduler_for_custom_training(noise_scheduler, device):
    """Cache the per-timestep signal-to-noise ratio on the scheduler.

    Computes ``(sqrt(alphas_cumprod) / sqrt(1 - alphas_cumprod)) ** 2`` from
    ``noise_scheduler.alphas_cumprod`` and stores the result, moved to
    ``device``, as ``noise_scheduler.all_snr``. Nothing happens when the
    scheduler already has an ``all_snr`` attribute.
    """
    if not hasattr(noise_scheduler, "all_snr"):
        cumulative_alphas = noise_scheduler.alphas_cumprod
        signal = torch.sqrt(cumulative_alphas)
        noise_level = torch.sqrt(1.0 - cumulative_alphas)
        snr_table = (signal / noise_level) ** 2
        noise_scheduler.all_snr = snr_table.to(device)
|
|
|
|
|
def fix_noise_scheduler_betas_for_zero_terminal_snr(noise_scheduler):
    """Rescale the scheduler's betas so the last timestep has zero SNR.

    Follows the rescaling referenced in the printed message
    (https://arxiv.org/abs/2305.08891) and overwrites ``betas``, ``alphas``
    and ``alphas_cumprod`` on ``noise_scheduler``.
    """
    print("fix noise scheduler betas: https://arxiv.org/abs/2305.08891")

    def rescale(original_betas):
        # betas -> square root of the cumulative alpha product
        sqrt_bar = (1 - original_betas).cumprod(0).sqrt()

        # Keep both ends of the schedule before it is modified.
        first = sqrt_bar[0].clone()
        last = sqrt_bar[-1].clone()

        # Shift so the last entry becomes zero, then scale so the first entry
        # gets its previous value back.
        sqrt_bar = sqrt_bar - last
        sqrt_bar = sqrt_bar * (first / (first - last))

        # square root of the cumulative product -> betas
        bar = sqrt_bar**2
        per_step_alphas = torch.cat([bar[0:1], bar[1:] / bar[:-1]])
        return 1 - per_step_alphas

    fixed_betas = rescale(noise_scheduler.betas)
    fixed_alphas = 1.0 - fixed_betas

    noise_scheduler.betas = fixed_betas
    noise_scheduler.alphas = fixed_alphas
    noise_scheduler.alphas_cumprod = torch.cumprod(fixed_alphas, dim=0)
|
|
|
|
|
def apply_snr_weight(loss, timesteps, noise_scheduler, gamma, v_prediction=False):
    """Scale ``loss`` by a capped-SNR weight for each timestep.

    The SNR read from ``noise_scheduler.all_snr`` is capped at ``gamma`` and
    divided by ``snr`` (or by ``snr + 1`` when ``v_prediction`` is true). The
    weight is converted to float, moved to ``loss.device`` and multiplied
    into ``loss``.
    """
    per_sample_snr = torch.stack([noise_scheduler.all_snr[step] for step in timesteps])
    capped_snr = torch.minimum(per_sample_snr, torch.full_like(per_sample_snr, gamma))
    denominator = per_sample_snr + 1 if v_prediction else per_sample_snr
    weight = (capped_snr / denominator).float().to(loss.device)
    return loss * weight
|
|
|
|
|
def scale_v_prediction_loss_like_noise_prediction(loss, timesteps, noise_scheduler):
    """Multiply ``loss`` by the factor ``get_snr_scale`` returns for ``timesteps``."""
    return loss * get_snr_scale(timesteps, noise_scheduler)
|
|
|
|
|
def get_snr_scale(timesteps, noise_scheduler):
    """Return ``snr / (snr + 1)`` per timestep, with the SNR capped at 1000.

    SNR values come from ``noise_scheduler.all_snr``. Because of the cap, an
    infinite entry yields ``1000 / 1001`` rather than NaN.
    """
    looked_up = [noise_scheduler.all_snr[step] for step in timesteps]
    capped = torch.stack(looked_up)
    capped = torch.minimum(capped, torch.full_like(capped, 1000))
    return capped / (capped + 1)
|
|
|
|
|
def add_v_prediction_like_loss(loss, timesteps, noise_scheduler, v_pred_like_loss):
    """Return ``loss`` plus ``loss / scale * v_pred_like_loss``.

    ``scale`` is the value ``get_snr_scale`` returns for ``timesteps``.
    """
    snr_scale = get_snr_scale(timesteps, noise_scheduler)
    extra_term = loss / snr_scale * v_pred_like_loss
    return loss + extra_term
|
|
|
def apply_debiased_estimation(loss, timesteps, noise_scheduler):
    """Weight ``loss`` by ``1 / sqrt(snr)``, with the per-timestep SNR capped at 1000.

    SNR values are read from ``noise_scheduler.all_snr``.
    """
    per_sample_snr = torch.stack([noise_scheduler.all_snr[step] for step in timesteps])
    upper_bound = torch.ones_like(per_sample_snr) * 1000
    capped_snr = torch.minimum(per_sample_snr, upper_bound)
    return (1 / torch.sqrt(capped_snr)) * loss
|
|
|
|
|
|
|
|
|
def add_custom_train_arguments(parser: argparse.ArgumentParser, support_weighted_captions: bool = True):
    """Register the custom-training command line options on ``parser``.

    Always adds ``--min_snr_gamma``, ``--scale_v_pred_loss_like_noise_pred``,
    ``--v_pred_like_loss`` and ``--debiased_estimation_loss``.
    ``--weighted_captions`` is added only when ``support_weighted_captions``
    is true.
    """
    add_option = parser.add_argument

    add_option(
        "--min_snr_gamma",
        type=float,
        default=None,
        help="gamma for reducing the weight of high loss timesteps. Lower numbers have stronger effect. 5 is recommended by paper. / 低いタイムステップでの高いlossに対して重みを減らすためのgamma値、低いほど効果が強く、論文では5が推奨",
    )
    add_option(
        "--scale_v_pred_loss_like_noise_pred",
        action="store_true",
        help="scale v-prediction loss like noise prediction loss / v-prediction lossをnoise prediction lossと同じようにスケーリングする",
    )
    add_option(
        "--v_pred_like_loss",
        type=float,
        default=None,
        help="add v-prediction like loss multiplied by this value / v-prediction lossをこの値をかけたものをlossに加算する",
    )
    add_option(
        "--debiased_estimation_loss",
        action="store_true",
        help="debiased estimation loss / debiased estimation loss",
    )

    if not support_weighted_captions:
        return

    add_option(
        "--weighted_captions",
        action="store_true",
        default=False,
        help="Enable weighted captions in the standard style (token:1.3). No commas inside parens, or shuffle/dropout may break the decoder. / 「[token]」、「(token)」「(token:1.3)」のような重み付きキャプションを有効にする。カンマを括弧内に入れるとシャッフルやdropoutで重みづけがおかしくなるので注意",
    )
|
|
|
|
|
# Tokenizer for the weighted-prompt syntax, compiled in verbose mode (re.X) so
# each alternative can sit on its own line. Alternatives are tried in order:
#   - a backslash followed by "(", ")", "[" or "]", a doubled backslash,
#     then a lone backslash
#   - an opening "(" or "["
#   - ":<number>)" where <number> is an optional sign followed by digits/dots;
#     the number is the pattern's only capture group (group 1)
#   - a closing ")" or "]"
#   - a run of characters that are none of backslash, "(", ")", "[", "]", ":"
#   - a lone ":"
re_attention = re.compile(
    r"""
\\\(|
\\\)|
\\\[|
\\]|
\\\\|
\\|
\(|
\[|
:([+-]?[.\d]+)\)|
\)|
]|
[^\\()\[\]:]+|
:
""",
    re.X,
)
|
|
|
|
|
def parse_prompt_attention(text):
    r"""
    Parse a string with attention tokens into a list of ``[text, weight]`` pairs.

    Accepted tokens are:
      (abc) - increases attention to abc by a multiplier of 1.1
      (abc:3.12) - increases attention to abc by a multiplier of 3.12
      [abc] - decreases attention to abc by a multiplier of 1.1
      \( - literal character '('
      \[ - literal character '['
      \) - literal character ')'
      \] - literal character ']'
      \\ - literal character '\'
      anything else - just text

    Brackets left open at the end of the string are applied as if they had
    been closed there. A ``:<weight>)`` token whose weight is not a valid
    number (for example ``1.2.3``) is kept as literal text instead of raising.
    Adjacent fragments that end up with the same weight are merged. An empty
    input yields ``[["", 1.0]]``.

    >>> parse_prompt_attention('normal text')
    [['normal text', 1.0]]
    >>> parse_prompt_attention('an (important) word')
    [['an ', 1.0], ['important', 1.1], [' word', 1.0]]
    >>> parse_prompt_attention('(unbalanced')
    [['unbalanced', 1.1]]
    >>> parse_prompt_attention(r'\(literal\]')
    [['(literal]', 1.0]]
    >>> parse_prompt_attention('(unnecessary)(parens)')
    [['unnecessaryparens', 1.1]]
    >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).')
    [['a ', 1.0],
     ['house', 1.5730000000000004],
     [' ', 1.1],
     ['on', 1.0],
     [' a ', 1.1],
     ['hill', 0.55],
     [', sun, ', 1.1],
     ['sky', 1.4641000000000006],
     ['.', 1.1]]
    """

    res = []
    # Stacks of positions in ``res`` at which each still-open bracket started.
    round_brackets = []
    square_brackets = []

    round_bracket_multiplier = 1.1
    square_bracket_multiplier = 1 / 1.1

    def multiply_range(start_position, multiplier):
        # Scale every fragment emitted since the bracket at start_position opened.
        for p in range(start_position, len(res)):
            res[p][1] *= multiplier

    for m in re_attention.finditer(text):
        # Distinct names so the ``text`` argument is not shadowed.
        token = m.group(0)
        raw_weight = m.group(1)

        explicit_weight = None
        if raw_weight is not None:
            try:
                explicit_weight = float(raw_weight)
            except ValueError:
                # The pattern accepts any mix of digits and dots ("1.2.3", ".");
                # leave the weight unset so the token is kept as literal text.
                explicit_weight = None

        if token.startswith("\\"):
            # Escaped character: drop the backslash, keep the rest verbatim.
            res.append([token[1:], 1.0])
        elif token == "(":
            round_brackets.append(len(res))
        elif token == "[":
            square_brackets.append(len(res))
        elif explicit_weight is not None and round_brackets:
            multiply_range(round_brackets.pop(), explicit_weight)
        elif token == ")" and round_brackets:
            multiply_range(round_brackets.pop(), round_bracket_multiplier)
        elif token == "]" and square_brackets:
            multiply_range(square_brackets.pop(), square_bracket_multiplier)
        else:
            res.append([token, 1.0])

    # Brackets never closed still apply from their opening position to the end.
    for pos in round_brackets:
        multiply_range(pos, round_bracket_multiplier)

    for pos in square_brackets:
        multiply_range(pos, square_bracket_multiplier)

    if not res:
        res = [["", 1.0]]

    # Merge runs of neighbouring fragments that share the same weight.
    i = 0
    while i + 1 < len(res):
        if res[i][1] == res[i + 1][1]:
            res[i][0] += res[i + 1][0]
            res.pop(i + 1)
        else:
            i += 1

    return res
|
|
|
|
|
def get_prompts_with_weights(tokenizer, prompt: List[str], max_length: int):
    r"""
    Tokenize each prompt and pair every token id with its attention weight.

    Each prompt is split by ``parse_prompt_attention``. Every fragment is then
    tokenized, the first and last ids of the tokenizer output are dropped, and
    the fragment weight is repeated once per remaining id. A prompt that
    produces more than ``max_length`` ids is cut to ``max_length`` and a notice
    is printed once at the end.

    Returns ``(tokens, weights)``, two lists with one entry per prompt.
    No padding, starting or ending token is included.
    """
    all_tokens = []
    all_weights = []
    any_truncated = False
    for caption in prompt:
        ids = []
        id_weights = []
        for fragment, fragment_weight in parse_prompt_attention(caption):
            # Drop the first and last ids (presumably the start/end markers).
            fragment_ids = tokenizer(fragment).input_ids[1:-1]
            ids.extend(fragment_ids)
            id_weights.extend([fragment_weight] * len(fragment_ids))
            if len(ids) > max_length:
                # Already over the limit; the remaining fragments would be cut anyway.
                break

        if len(ids) > max_length:
            any_truncated = True
            ids = ids[:max_length]
            id_weights = id_weights[:max_length]
        all_tokens.append(ids)
        all_weights.append(id_weights)

    if any_truncated:
        print("Prompt was truncated. Try to shorten the prompt or increase max_embeddings_multiples")
    return all_tokens, all_weights
|
|
|
|
|
def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, no_boseos_middle=True, chunk_length=77):
    r"""
    Pad token id lists and weight lists in place and return them.

    Every token list becomes ``bos`` + ids + ``eos`` repeated up to
    ``max_length`` entries. With ``no_boseos_middle`` the weights get a single
    leading 1.0 and trailing 1.0 padding up to ``max_length``. Otherwise the
    weights are laid out chunk by chunk: each chunk of ``chunk_length - 2``
    weights is wrapped in a leading and trailing 1.0, and the result is padded
    with 1.0 to ``chunk_count * chunk_length`` entries.
    """
    body_length = chunk_length - 2
    chunk_count = (max_length - 2) // body_length

    for idx, ids in enumerate(tokens):
        id_weights = weights[idx]
        tokens[idx] = [bos, *ids] + [eos] * (max_length - 1 - len(ids))

        if no_boseos_middle:
            weights[idx] = [1.0, *id_weights] + [1.0] * (max_length - 1 - len(id_weights))
            continue

        padded_length = chunk_count * chunk_length
        if not id_weights:
            weights[idx] = [1.0] * padded_length
            continue

        padded = []
        for chunk in range(chunk_count):
            padded.append(1.0)  # slot for this chunk's starting token
            padded.extend(id_weights[chunk * body_length : (chunk + 1) * body_length])
            padded.append(1.0)  # slot for this chunk's ending token
        padded.extend([1.0] * (padded_length - len(padded)))
        weights[idx] = padded

    return tokens, weights
|
|
|
|
|
def get_unweighted_text_embeddings(
    tokenizer,
    text_encoder,
    text_input: torch.Tensor,
    chunk_length: int,
    clip_skip: int,
    eos: int,
    pad: int,
    no_boseos_middle: Optional[bool] = True,
):
    """
    Encode token ids, splitting them into encoder-sized chunks when needed.

    When ``text_input`` holds more than one chunk of ``chunk_length - 2``
    content tokens, each chunk is copied, given a starting token (and an
    ending token, handled differently depending on whether ``pad == eos``),
    encoded on its own, and the per-chunk embeddings are concatenated along
    dimension 1. With ``no_boseos_middle`` the embeddings of the start/end
    positions between chunks are dropped before concatenation.

    ``clip_skip`` of ``None`` or 1 uses the encoder's first output; any other
    value takes ``hidden_states[-clip_skip]`` and applies the encoder's final
    layer norm. ``tokenizer`` is not used by this function.
    """

    def encode(token_ids):
        # Shared by the chunked and the single-pass path.
        if clip_skip is None or clip_skip == 1:
            return text_encoder(token_ids)[0]
        outputs = text_encoder(token_ids, output_hidden_states=True, return_dict=True)
        hidden = outputs["hidden_states"][-clip_skip]
        return text_encoder.text_model.final_layer_norm(hidden)

    body_length = chunk_length - 2
    chunk_count = (text_input.shape[1] - 2) // body_length
    if chunk_count <= 1:
        return encode(text_input)

    last_index = chunk_count - 1
    pieces = []
    for index in range(chunk_count):
        # Window of chunk_length ids starting at this chunk's content offset.
        offset = index * body_length
        chunk = text_input[:, offset : offset + chunk_length].clone()

        # Put the starting token at the head of every row.
        chunk[:, 0] = text_input[0, 0]
        if pad == eos:
            chunk[:, -1] = text_input[0, -1]
        else:
            for row in range(chunk.shape[0]):
                # Row ends in an ordinary token: close it with eos.
                if chunk[row, -1] != eos and chunk[row, -1] != pad:
                    chunk[row, -1] = eos
                # Row is padding right after the starting token: put eos there.
                if chunk[row, 1] == pad:
                    chunk[row, 1] = eos

        embedding = encode(chunk)

        if no_boseos_middle:
            # Keep the start position only for the first chunk and the end
            # position only for the last one.
            start = 0 if index == 0 else 1
            stop = -1 if index < last_index else None
            embedding = embedding[:, start:stop]

        pieces.append(embedding)

    return torch.cat(pieces, dim=1)
|
|
|
|
|
def get_weighted_text_embeddings(
    tokenizer,
    text_encoder,
    prompt: Union[str, List[str]],
    device,
    max_embeddings_multiples: Optional[int] = 3,
    no_boseos_middle: Optional[bool] = False,
    clip_skip=None,
):
    r"""
    Encode prompts whose fragments carry local weights given with brackets.

    For example, in 'A (very beautiful) masterpiece' the tokens of
    'very beautiful' have their embeddings multiplied by a constant, 1.1.
    After weighting, each prompt's embedding is rescaled so that its mean
    matches the mean it had before weighting.

    Args:
        tokenizer:
            Tokenizer; its `model_max_length`, `bos_token_id`, `eos_token_id`
            and `pad_token_id` attributes are read here.
        text_encoder:
            Encoder passed on to `get_unweighted_text_embeddings`.
        prompt (`str` or `List[str]`):
            The prompt or prompts to encode.
        device:
            Device on which the token and weight tensors are created.
        max_embeddings_multiples (`int`, *optional*, defaults to `3`):
            The max multiple length of prompt embeddings compared to the max
            output length of the text encoder. Lowered automatically when the
            longest prompt needs fewer chunks.
        no_boseos_middle (`bool`, *optional*, defaults to `False`):
            Forwarded to `pad_tokens_and_weights` and
            `get_unweighted_text_embeddings`; controls how start/end positions
            between chunks are handled.
        clip_skip (*optional*, defaults to `None`):
            Forwarded to `get_unweighted_text_embeddings`.

    Returns:
        The weighted text embeddings tensor.
    """
    encoder_capacity = tokenizer.model_max_length
    body_capacity = encoder_capacity - 2

    prompts = [prompt] if isinstance(prompt, str) else prompt

    # Token budget excludes the starting and ending token.
    token_budget = body_capacity * max_embeddings_multiples
    prompt_tokens, prompt_weights = get_prompts_with_weights(tokenizer, prompts, token_budget)

    # Use only as many chunks as the longest prompt needs (at least one).
    longest = max([len(ids) for ids in prompt_tokens])
    needed_multiples = (longest - 1) // body_capacity + 1
    max_embeddings_multiples = max(1, min(max_embeddings_multiples, needed_multiples))
    padded_length = body_capacity * max_embeddings_multiples + 2

    bos = tokenizer.bos_token_id
    eos = tokenizer.eos_token_id
    pad = tokenizer.pad_token_id
    prompt_tokens, prompt_weights = pad_tokens_and_weights(
        prompt_tokens,
        prompt_weights,
        padded_length,
        bos,
        eos,
        no_boseos_middle=no_boseos_middle,
        chunk_length=encoder_capacity,
    )
    token_tensor = torch.tensor(prompt_tokens, dtype=torch.long, device=device)

    embeddings = get_unweighted_text_embeddings(
        tokenizer,
        text_encoder,
        token_tensor,
        encoder_capacity,
        clip_skip,
        eos,
        pad,
        no_boseos_middle=no_boseos_middle,
    )
    weight_tensor = torch.tensor(prompt_weights, dtype=embeddings.dtype, device=device)

    # Apply the weights, then restore each prompt's original mean. Means are
    # taken in float32 and cast back to the embedding dtype.
    mean_before = embeddings.float().mean(dim=(-2, -1)).to(embeddings.dtype)
    embeddings = embeddings * weight_tensor.unsqueeze(-1)
    mean_after = embeddings.float().mean(dim=(-2, -1)).to(embeddings.dtype)
    correction = (mean_before / mean_after).unsqueeze(-1).unsqueeze(-1)
    return embeddings * correction
|
|
|
|
|
|
|
def pyramid_noise_like(noise, device, iterations=6, discount=0.4):
    """Return ``noise`` with multi-resolution random noise added, divided by its std.

    For up to ``iterations`` levels, Gaussian noise is drawn at a progressively
    smaller spatial size, bilinearly upsampled to the size of the last two
    dimensions of ``noise``, scaled by ``discount ** level`` and accumulated.
    The loop stops early once either spatial size reaches 1.

    Args:
        noise: 4-D tensor; its shape is unpacked into four values. It is not
            modified: the accumulation happens on a copy.
        device: Device the upsampler and the drawn noise are moved to.
        iterations: Maximum number of levels.
        discount: Per-level amplitude factor.

    Returns:
        A new tensor with the same shape as ``noise``.
    """
    b, c, w, h = noise.shape
    upsample = torch.nn.Upsample(size=(w, h), mode="bilinear").to(device)

    # Clone first so the in-place accumulation below does not alter the
    # caller's tensor; keeping `+=` preserves the input dtype.
    result = noise.clone()
    for i in range(iterations):
        # Random shrink factor in [2, 4) rather than a fixed one.
        r = random.random() * 2 + 2
        wn, hn = max(1, int(w / (r**i))), max(1, int(h / (r**i)))
        result += upsample(torch.randn(b, c, wn, hn).to(device)) * discount**i
        if wn == 1 or hn == 1:
            # Smallest possible resolution reached.
            break
    return result / result.std()
|
|
|
|
|
|
|
def apply_noise_offset(latents, noise, noise_offset, adaptive_noise_scale):
    """Add a random per-sample, per-channel offset to ``noise``.

    Returns ``noise`` unchanged when ``noise_offset`` is ``None``. Otherwise a
    ``(latents.shape[0], latents.shape[1], 1, 1)`` Gaussian sample, scaled by
    the offset, is added. When ``adaptive_noise_scale`` is given, the offset is
    first increased by ``adaptive_noise_scale`` times the absolute mean of
    ``latents`` over dimensions 2 and 3, then clamped to be non-negative.
    """
    if noise_offset is None:
        return noise

    offset = noise_offset
    if adaptive_noise_scale is not None:
        channel_mean = latents.mean(dim=(2, 3), keepdim=True).abs()
        # The clamp matters when adaptive_noise_scale is negative.
        offset = (offset + adaptive_noise_scale * channel_mean).clamp(min=0.0)

    batch, channels = latents.shape[0], latents.shape[1]
    per_channel_shift = torch.randn((batch, channels, 1, 1), device=latents.device)
    return noise + offset * per_channel_shift
|
|
|
|
|
""" |
|
########################################## |
|
# Perlin Noise |
|
def rand_perlin_2d(device, shape, res, fade=lambda t: 6 * t**5 - 15 * t**4 + 10 * t**3): |
|
delta = (res[0] / shape[0], res[1] / shape[1]) |
|
d = (shape[0] // res[0], shape[1] // res[1]) |
|
|
|
grid = ( |
|
torch.stack( |
|
torch.meshgrid(torch.arange(0, res[0], delta[0], device=device), torch.arange(0, res[1], delta[1], device=device)), |
|
dim=-1, |
|
) |
|
% 1 |
|
) |
|
angles = 2 * torch.pi * torch.rand(res[0] + 1, res[1] + 1, device=device) |
|
gradients = torch.stack((torch.cos(angles), torch.sin(angles)), dim=-1) |
|
|
|
tile_grads = ( |
|
lambda slice1, slice2: gradients[slice1[0] : slice1[1], slice2[0] : slice2[1]] |
|
.repeat_interleave(d[0], 0) |
|
.repeat_interleave(d[1], 1) |
|
) |
|
dot = lambda grad, shift: ( |
|
torch.stack((grid[: shape[0], : shape[1], 0] + shift[0], grid[: shape[0], : shape[1], 1] + shift[1]), dim=-1) |
|
* grad[: shape[0], : shape[1]] |
|
).sum(dim=-1) |
|
|
|
n00 = dot(tile_grads([0, -1], [0, -1]), [0, 0]) |
|
n10 = dot(tile_grads([1, None], [0, -1]), [-1, 0]) |
|
n01 = dot(tile_grads([0, -1], [1, None]), [0, -1]) |
|
n11 = dot(tile_grads([1, None], [1, None]), [-1, -1]) |
|
t = fade(grid[: shape[0], : shape[1]]) |
|
return 1.414 * torch.lerp(torch.lerp(n00, n10, t[..., 0]), torch.lerp(n01, n11, t[..., 0]), t[..., 1]) |
|
|
|
|
|
def rand_perlin_2d_octaves(device, shape, res, octaves=1, persistence=0.5): |
|
noise = torch.zeros(shape, device=device) |
|
frequency = 1 |
|
amplitude = 1 |
|
for _ in range(octaves): |
|
noise += amplitude * rand_perlin_2d(device, shape, (frequency * res[0], frequency * res[1])) |
|
frequency *= 2 |
|
amplitude *= persistence |
|
return noise |
|
|
|
|
|
def perlin_noise(noise, device, octaves): |
|
_, c, w, h = noise.shape |
|
perlin = lambda: rand_perlin_2d_octaves(device, (w, h), (4, 4), octaves) |
|
noise_perlin = [] |
|
for _ in range(c): |
|
noise_perlin.append(perlin()) |
|
noise_perlin = torch.stack(noise_perlin).unsqueeze(0) # (1, c, w, h) |
|
noise += noise_perlin # broadcast for each batch |
|
return noise / noise.std() # Scaled back to roughly unit variance |
|
""" |
|
|