import math

import torch
import torch.nn as nn

from ldm.modules.diffusionmodules.openaimodel import (
    TimestepEmbedSequential,
    ResBlock,
    Downsample,
)
from ldm.modules.diffusionmodules.util import (
    conv_nd,
    linear,
    timestep_embedding,
    checkpoint,
    normalization,
    zero_module,
)

from srmodule.attn import get_attn_func

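# Attention kernel used by QKVAttentionLegacy.forward; left as None here and
# bound to the active backend by build_unetwt() via get_attn_func().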
attn_func = None


class QKVAttentionLegacy(nn.Module):
    """
    A module which performs QKV attention. Matches legacy QKVAttention + input/output heads shaping.
    """

    def __init__(self, n_heads):
        super().__init__()
        self.n_heads = n_heads

    def forward(self, qkv):
        """
        Apply QKV attention.
        :param qkv: an [N x (3 * H * C) x T] tensor of Qs, Ks, and Vs.
        :return: an [N x (H * C) x T] tensor after attention.
        """
        bs, width, length = qkv.shape
        assert width % (3 * self.n_heads) == 0
        ch = width // (3 * self.n_heads)
        q, k, v = qkv.reshape(bs * self.n_heads, ch * 3, length).split(ch, dim=1)
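
        # Move the channel axis to the end ([B*H, T, C]) before calling the
        # attention backend, and move it back afterwards.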
        q, k, v = map(
            lambda t: t.permute(0, 2, 1).contiguous(),
            (q, k, v),
        )
        global attn_func
        a = attn_func(q, k, v)
        a = a.permute(0, 2, 1).reshape(bs, -1, length)
        return a


class AttentionBlock(nn.Module):
    """
    An attention block that allows spatial positions to attend to each other.
    Originally ported from here, but adapted to the N-d case.
    https://github.com/hojonathanho/diffusion/blob/1e0dceb3b3495bbe19116a5e1b3596cd0706c543/diffusion_tf/models/unet.py#L66.
    """

    def __init__(
        self,
        channels,
        num_heads=1,
        num_head_channels=-1,
        use_checkpoint=False,
        use_new_attention_order=False,
    ):
        super().__init__()
        self.channels = channels
        if num_head_channels == -1:
            self.num_heads = num_heads
        else:
            assert (
                channels % num_head_channels == 0
            ), f"q,k,v channels {channels} is not divisible by num_head_channels {num_head_channels}"
            self.num_heads = channels // num_head_channels
        self.norm = normalization(channels)
        self.qkv = conv_nd(1, channels, channels * 3, 1)
        self.attention = QKVAttentionLegacy(self.num_heads)

        self.proj_out = zero_module(conv_nd(1, channels, channels, 1))

    def forward(self, x):
        return checkpoint(self._forward, (x,), self.parameters(), True)

    def _forward(self, x):
        # Flatten all spatial dims into one length axis for 1-D attention,
        # then restore the original shape for the residual output.
        b, c, *spatial = x.shape
        x = x.reshape(b, c, -1)
        qkv = self.qkv(self.norm(x))
        h = self.attention(qkv)
        h = self.proj_out(h)
        return (x + h).reshape(b, c, *spatial)


class EncoderUNetModelWT(nn.Module):
    """
    The half UNet model with attention and timestep embedding.
    For usage, see UNet.
    """

    def __init__(
        self,
        in_channels,
        model_channels,
        out_channels,
        num_res_blocks,
        attention_resolutions,
        dropout=0,
        channel_mult=(1, 2, 4, 8),
        conv_resample=True,
        dims=2,
        use_checkpoint=False,
        use_fp16=False,
        num_heads=4,
        num_head_channels=-1,
        num_heads_upsample=-1,
        use_scale_shift_norm=False,
        resblock_updown=False,
        use_new_attention_order=False,
    ):
        super().__init__()

        if num_heads_upsample == -1:
            num_heads_upsample = num_heads

        self.in_channels = in_channels
        self.model_channels = model_channels
        self.out_channels = out_channels
        self.num_res_blocks = num_res_blocks
        self.attention_resolutions = attention_resolutions
        self.dropout = dropout
        self.channel_mult = channel_mult
        self.conv_resample = conv_resample
        self.use_checkpoint = use_checkpoint
        self.dtype = torch.float16 if use_fp16 else torch.float32
        self.num_heads = num_heads
        self.num_head_channels = num_head_channels
        self.num_heads_upsample = num_heads_upsample

        time_embed_dim = model_channels * 4
        self.time_embed = nn.Sequential(
            linear(model_channels, time_embed_dim),
            nn.SiLU(),
            linear(time_embed_dim, time_embed_dim),
        )

        self.input_blocks = nn.ModuleList(
            [
                TimestepEmbedSequential(
                    conv_nd(dims, in_channels, model_channels, 3, padding=1)
                )
            ]
        )
        self._feature_size = model_channels
        input_block_chans = []
        ch = model_channels
        ds = 1
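        # Encoder trunk: ResBlocks (plus attention at the configured resolutions)
        # per level, with a downsampling stage between levels; the channel count
        # at each downsampling step is recorded in input_block_chans.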
        for level, mult in enumerate(channel_mult):
            for _ in range(num_res_blocks):
                layers = [
                    ResBlock(
                        ch,
                        time_embed_dim,
                        dropout,
                        out_channels=mult * model_channels,
                        dims=dims,
                        use_checkpoint=use_checkpoint,
                        use_scale_shift_norm=use_scale_shift_norm,
                    )
                ]
                ch = mult * model_channels
                if ds in attention_resolutions:
                    layers.append(
                        AttentionBlock(
                            ch,
                            use_checkpoint=use_checkpoint,
                            num_heads=num_heads,
                            num_head_channels=num_head_channels,
                            use_new_attention_order=use_new_attention_order,
                        )
                    )
                self.input_blocks.append(TimestepEmbedSequential(*layers))
                self._feature_size += ch
            if level != len(channel_mult) - 1:
                out_ch = ch
                self.input_blocks.append(
                    TimestepEmbedSequential(
                        ResBlock(
                            ch,
                            time_embed_dim,
                            dropout,
                            out_channels=out_ch,
                            dims=dims,
                            use_checkpoint=use_checkpoint,
                            use_scale_shift_norm=use_scale_shift_norm,
                            down=True,
                        )
                        if resblock_updown
                        else Downsample(
                            ch, conv_resample, dims=dims, out_channels=out_ch
                        )
                    )
                )
                ch = out_ch
                input_block_chans.append(ch)
                ds *= 2
                self._feature_size += ch

        self.middle_block = TimestepEmbedSequential(
            ResBlock(
                ch,
                time_embed_dim,
                dropout,
                dims=dims,
                use_checkpoint=use_checkpoint,
                use_scale_shift_norm=use_scale_shift_norm,
            ),
            AttentionBlock(
                ch,
                use_checkpoint=use_checkpoint,
                num_heads=num_heads,
                num_head_channels=num_head_channels,
                use_new_attention_order=use_new_attention_order,
            ),
            ResBlock(
                ch,
                time_embed_dim,
                dropout,
                dims=dims,
                use_checkpoint=use_checkpoint,
                use_scale_shift_norm=use_scale_shift_norm,
            ),
        )
        input_block_chans.append(ch)
        self._feature_size += ch
        self.input_block_chans = input_block_chans

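        # One projection ResBlock per collected feature scale, mapping each to
        # `out_channels` so every scale exposes the same channel width downstream.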
        self.fea_tran = nn.ModuleList([])

        for i in range(len(input_block_chans)):
            self.fea_tran.append(
                ResBlock(
                    input_block_chans[i],
                    time_embed_dim,
                    dropout,
                    out_channels=out_channels,
                    dims=dims,
                    use_checkpoint=use_checkpoint,
                    use_scale_shift_norm=use_scale_shift_norm,
                )
            )

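    # forward is wrapped in no_grad: this encoder is only used here as a frozen
    # feature extractor, so gradients are never needed.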
    @torch.no_grad()
    def forward(self, x, timesteps):
        """
        Apply the model to an input batch.
        :param x: an [N x C x ...] Tensor of inputs.
        :param timesteps: a 1-D batch of timesteps.
        :return: a dict mapping each collected feature map's spatial size
            (as a string) to an [N x out_channels x ...] Tensor.
        """
        emb = self.time_embed(timestep_embedding(timesteps, self.model_channels))

        result_list = []
        results = {}
        h = x.type(self.dtype)
        for module in self.input_blocks:
            last_h = h
            h = module(h, emb)
            # A change in spatial size marks a downsampling step; keep the
            # feature map from just before it.
            if h.size(-1) != last_h.size(-1):
                result_list.append(last_h)
        h = self.middle_block(h, emb)
        result_list.append(h)

        assert len(result_list) == len(self.fea_tran)

        # Project every collected feature to out_channels, keyed by spatial width.
        for i in range(len(result_list)):
            results[str(result_list[i].size(-1))] = self.fea_tran[i](result_list[i], emb)

        return results

    def load_from_dict(self, state_dict):
        """
        Load model weights from a (possibly larger) state dict, keeping only the
        entries under the "structcond_stage_model." prefix and stripping it.
        :param state_dict: a dict of parameters.
        """
        filtered_dict = {}
        for k, v in state_dict.items():
            if k.startswith("structcond_stage_model."):
                filtered_dict[k[len("structcond_stage_model.") :]] = v
        self.load_state_dict(filtered_dict)


def build_unetwt() -> EncoderUNetModelWT:
    """
    Build the LR encoder model with its default configuration and bind the
    module-level attention function.
    :return: an EncoderUNetModelWT instance.
    """
    model = EncoderUNetModelWT(
        in_channels=4,
        model_channels=256,
        out_channels=256,
        num_res_blocks=2,
        attention_resolutions=[4, 2, 1],
        dropout=0.0,
        channel_mult=[1, 1, 2, 2],
        conv_resample=True,
        dims=2,
        use_checkpoint=False,
        use_fp16=False,
        num_heads=4,
        num_head_channels=-1,
        num_heads_upsample=-1,
        use_scale_shift_norm=False,
        resblock_updown=False,
        use_new_attention_order=False,
    )
    global attn_func
    attn_func = get_attn_func()
    return model


if __name__ == "__main__":
    # Test the lr encoder model.
    path = '../models/stablesr_sd21.ckpt'
    state_dict = torch.load(path)
    for key in state_dict.keys():
        print(key)
    model = build_unetwt()
    model.load_from_dict(state_dict)
    model = model.cuda()
    test_latent = torch.randn(1, 4, 64, 64).half().cuda()
    test_timesteps = torch.tensor([0]).half().cuda()
    with torch.no_grad():
        test_result = model(test_latent, test_timesteps)
    print(test_result.keys())