Spaces:
Running
on
A10G
Running
on
A10G
''' Towards An End-to-End Framework for Video Inpainting | |
''' | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import torchvision | |
from einops import rearrange | |
from model.modules.base_module import BaseNetwork | |
from model.modules.sparse_transformer import TemporalSparseTransformerBlock, SoftSplit, SoftComp | |
from model.modules.spectral_norm import spectral_norm as _spectral_norm | |
from model.modules.flow_loss_utils import flow_warp | |
from model.modules.deformconv import ModulatedDeformConv2d | |
from .misc import constant_init | |
def length_sq(x): | |
return torch.sum(torch.square(x), dim=1, keepdim=True) | |
def fbConsistencyCheck(flow_fw, flow_bw, alpha1=0.01, alpha2=0.5): | |
flow_bw_warped = flow_warp(flow_bw, flow_fw.permute(0, 2, 3, 1)) # wb(wf(x)) | |
flow_diff_fw = flow_fw + flow_bw_warped # wf + wb(wf(x)) | |
mag_sq_fw = length_sq(flow_fw) + length_sq(flow_bw_warped) # |wf| + |wb(wf(x))| | |
occ_thresh_fw = alpha1 * mag_sq_fw + alpha2 | |
# fb_valid_fw = (length_sq(flow_diff_fw) < occ_thresh_fw).float() | |
fb_valid_fw = (length_sq(flow_diff_fw) < occ_thresh_fw).to(flow_fw) | |
return fb_valid_fw | |
class DeformableAlignment(ModulatedDeformConv2d): | |
"""Second-order deformable alignment module.""" | |
def __init__(self, *args, **kwargs): | |
# self.max_residue_magnitude = kwargs.pop('max_residue_magnitude', 10) | |
self.max_residue_magnitude = kwargs.pop('max_residue_magnitude', 3) | |
super(DeformableAlignment, self).__init__(*args, **kwargs) | |
self.conv_offset = nn.Sequential( | |
nn.Conv2d(2*self.out_channels + 2 + 1 + 2, self.out_channels, 3, 1, 1), | |
nn.LeakyReLU(negative_slope=0.1, inplace=True), | |
nn.Conv2d(self.out_channels, self.out_channels, 3, 1, 1), | |
nn.LeakyReLU(negative_slope=0.1, inplace=True), | |
nn.Conv2d(self.out_channels, self.out_channels, 3, 1, 1), | |
nn.LeakyReLU(negative_slope=0.1, inplace=True), | |
nn.Conv2d(self.out_channels, 27 * self.deform_groups, 3, 1, 1), | |
) | |
self.init_offset() | |
def init_offset(self): | |
constant_init(self.conv_offset[-1], val=0, bias=0) | |
def forward(self, x, cond_feat, flow): | |
out = self.conv_offset(cond_feat) | |
o1, o2, mask = torch.chunk(out, 3, dim=1) | |
# offset | |
offset = self.max_residue_magnitude * torch.tanh(torch.cat((o1, o2), dim=1)) | |
offset = offset + flow.flip(1).repeat(1, offset.size(1) // 2, 1, 1) | |
# mask | |
mask = torch.sigmoid(mask) | |
return torchvision.ops.deform_conv2d(x, offset, self.weight, self.bias, | |
self.stride, self.padding, | |
self.dilation, mask) | |
class BidirectionalPropagation(nn.Module): | |
def __init__(self, channel, learnable=True): | |
super(BidirectionalPropagation, self).__init__() | |
self.deform_align = nn.ModuleDict() | |
self.backbone = nn.ModuleDict() | |
self.channel = channel | |
self.prop_list = ['backward_1', 'forward_1'] | |
self.learnable = learnable | |
if self.learnable: | |
for i, module in enumerate(self.prop_list): | |
self.deform_align[module] = DeformableAlignment( | |
channel, channel, 3, padding=1, deform_groups=16) | |
self.backbone[module] = nn.Sequential( | |
nn.Conv2d(2*channel+2, channel, 3, 1, 1), | |
nn.LeakyReLU(negative_slope=0.2, inplace=True), | |
nn.Conv2d(channel, channel, 3, 1, 1), | |
) | |
self.fuse = nn.Sequential( | |
nn.Conv2d(2*channel+2, channel, 3, 1, 1), | |
nn.LeakyReLU(negative_slope=0.2, inplace=True), | |
nn.Conv2d(channel, channel, 3, 1, 1), | |
) | |
def binary_mask(self, mask, th=0.1): | |
mask[mask>th] = 1 | |
mask[mask<=th] = 0 | |
# return mask.float() | |
return mask.to(mask) | |
def forward(self, x, flows_forward, flows_backward, mask, interpolation='bilinear'): | |
""" | |
x shape : [b, t, c, h, w] | |
return [b, t, c, h, w] | |
""" | |
# For backward warping | |
# pred_flows_forward for backward feature propagation | |
# pred_flows_backward for forward feature propagation | |
b, t, c, h, w = x.shape | |
feats, masks = {}, {} | |
feats['input'] = [x[:, i, :, :, :] for i in range(0, t)] | |
masks['input'] = [mask[:, i, :, :, :] for i in range(0, t)] | |
prop_list = ['backward_1', 'forward_1'] | |
cache_list = ['input'] + prop_list | |
for p_i, module_name in enumerate(prop_list): | |
feats[module_name] = [] | |
masks[module_name] = [] | |
if 'backward' in module_name: | |
frame_idx = range(0, t) | |
frame_idx = frame_idx[::-1] | |
flow_idx = frame_idx | |
flows_for_prop = flows_forward | |
flows_for_check = flows_backward | |
else: | |
frame_idx = range(0, t) | |
flow_idx = range(-1, t - 1) | |
flows_for_prop = flows_backward | |
flows_for_check = flows_forward | |
for i, idx in enumerate(frame_idx): | |
feat_current = feats[cache_list[p_i]][idx] | |
mask_current = masks[cache_list[p_i]][idx] | |
if i == 0: | |
feat_prop = feat_current | |
mask_prop = mask_current | |
else: | |
flow_prop = flows_for_prop[:, flow_idx[i], :, :, :] | |
flow_check = flows_for_check[:, flow_idx[i], :, :, :] | |
flow_vaild_mask = fbConsistencyCheck(flow_prop, flow_check) | |
feat_warped = flow_warp(feat_prop, flow_prop.permute(0, 2, 3, 1), interpolation) | |
if self.learnable: | |
cond = torch.cat([feat_current, feat_warped, flow_prop, flow_vaild_mask, mask_current], dim=1) | |
feat_prop = self.deform_align[module_name](feat_prop, cond, flow_prop) | |
mask_prop = mask_current | |
else: | |
mask_prop_valid = flow_warp(mask_prop, flow_prop.permute(0, 2, 3, 1)) | |
mask_prop_valid = self.binary_mask(mask_prop_valid) | |
union_vaild_mask = self.binary_mask(mask_current*flow_vaild_mask*(1-mask_prop_valid)) | |
feat_prop = union_vaild_mask * feat_warped + (1-union_vaild_mask) * feat_current | |
# update mask | |
mask_prop = self.binary_mask(mask_current*(1-(flow_vaild_mask*(1-mask_prop_valid)))) | |
# refine | |
if self.learnable: | |
feat = torch.cat([feat_current, feat_prop, mask_current], dim=1) | |
feat_prop = feat_prop + self.backbone[module_name](feat) | |
# feat_prop = self.backbone[module_name](feat_prop) | |
feats[module_name].append(feat_prop) | |
masks[module_name].append(mask_prop) | |
# end for | |
if 'backward' in module_name: | |
feats[module_name] = feats[module_name][::-1] | |
masks[module_name] = masks[module_name][::-1] | |
outputs_b = torch.stack(feats['backward_1'], dim=1).view(-1, c, h, w) | |
outputs_f = torch.stack(feats['forward_1'], dim=1).view(-1, c, h, w) | |
if self.learnable: | |
mask_in = mask.view(-1, 2, h, w) | |
masks_b, masks_f = None, None | |
outputs = self.fuse(torch.cat([outputs_b, outputs_f, mask_in], dim=1)) + x.view(-1, c, h, w) | |
else: | |
masks_b = torch.stack(masks['backward_1'], dim=1) | |
masks_f = torch.stack(masks['forward_1'], dim=1) | |
outputs = outputs_f | |
return outputs_b.view(b, -1, c, h, w), outputs_f.view(b, -1, c, h, w), \ | |
outputs.view(b, -1, c, h, w), masks_f | |
class Encoder(nn.Module): | |
def __init__(self): | |
super(Encoder, self).__init__() | |
self.group = [1, 2, 4, 8, 1] | |
self.layers = nn.ModuleList([ | |
nn.Conv2d(5, 64, kernel_size=3, stride=2, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1, groups=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(640, 512, kernel_size=3, stride=1, padding=1, groups=2), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(768, 384, kernel_size=3, stride=1, padding=1, groups=4), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(640, 256, kernel_size=3, stride=1, padding=1, groups=8), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(512, 128, kernel_size=3, stride=1, padding=1, groups=1), | |
nn.LeakyReLU(0.2, inplace=True) | |
]) | |
def forward(self, x): | |
bt, c, _, _ = x.size() | |
# h, w = h//4, w//4 | |
out = x | |
for i, layer in enumerate(self.layers): | |
if i == 8: | |
x0 = out | |
_, _, h, w = x0.size() | |
if i > 8 and i % 2 == 0: | |
g = self.group[(i - 8) // 2] | |
x = x0.view(bt, g, -1, h, w) | |
o = out.view(bt, g, -1, h, w) | |
out = torch.cat([x, o], 2).view(bt, -1, h, w) | |
out = layer(out) | |
return out | |
class deconv(nn.Module): | |
def __init__(self, | |
input_channel, | |
output_channel, | |
kernel_size=3, | |
padding=0): | |
super().__init__() | |
self.conv = nn.Conv2d(input_channel, | |
output_channel, | |
kernel_size=kernel_size, | |
stride=1, | |
padding=padding) | |
def forward(self, x): | |
x = F.interpolate(x, | |
scale_factor=2, | |
mode='bilinear', | |
align_corners=True) | |
return self.conv(x) | |
class InpaintGenerator(BaseNetwork): | |
def __init__(self, init_weights=True, model_path=None): | |
super(InpaintGenerator, self).__init__() | |
channel = 128 | |
hidden = 512 | |
# encoder | |
self.encoder = Encoder() | |
# decoder | |
self.decoder = nn.Sequential( | |
deconv(channel, 128, kernel_size=3, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
deconv(64, 64, kernel_size=3, padding=1), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)) | |
# soft split and soft composition | |
kernel_size = (7, 7) | |
padding = (3, 3) | |
stride = (3, 3) | |
t2t_params = { | |
'kernel_size': kernel_size, | |
'stride': stride, | |
'padding': padding | |
} | |
self.ss = SoftSplit(channel, hidden, kernel_size, stride, padding) | |
self.sc = SoftComp(channel, hidden, kernel_size, stride, padding) | |
self.max_pool = nn.MaxPool2d(kernel_size, stride, padding) | |
# feature propagation module | |
self.img_prop_module = BidirectionalPropagation(3, learnable=False) | |
self.feat_prop_module = BidirectionalPropagation(128, learnable=True) | |
depths = 8 | |
num_heads = 4 | |
window_size = (5, 9) | |
pool_size = (4, 4) | |
self.transformers = TemporalSparseTransformerBlock(dim=hidden, | |
n_head=num_heads, | |
window_size=window_size, | |
pool_size=pool_size, | |
depths=depths, | |
t2t_params=t2t_params) | |
if init_weights: | |
self.init_weights() | |
if model_path is not None: | |
print('Pretrained ProPainter has loaded...') | |
ckpt = torch.load(model_path, map_location='cpu') | |
self.load_state_dict(ckpt, strict=True) | |
# print network parameter number | |
self.print_network() | |
def img_propagation(self, masked_frames, completed_flows, masks, interpolation='nearest'): | |
_, _, prop_frames, updated_masks = self.img_prop_module(masked_frames, completed_flows[0], completed_flows[1], masks, interpolation) | |
return prop_frames, updated_masks | |
def forward(self, masked_frames, completed_flows, masks_in, masks_updated, num_local_frames, interpolation='bilinear', t_dilation=2): | |
""" | |
Args: | |
masks_in: original mask | |
masks_updated: updated mask after image propagation | |
""" | |
l_t = num_local_frames | |
b, t, _, ori_h, ori_w = masked_frames.size() | |
# extracting features | |
enc_feat = self.encoder(torch.cat([masked_frames.view(b * t, 3, ori_h, ori_w), | |
masks_in.view(b * t, 1, ori_h, ori_w), | |
masks_updated.view(b * t, 1, ori_h, ori_w)], dim=1)) | |
_, c, h, w = enc_feat.size() | |
local_feat = enc_feat.view(b, t, c, h, w)[:, :l_t, ...] | |
ref_feat = enc_feat.view(b, t, c, h, w)[:, l_t:, ...] | |
fold_feat_size = (h, w) | |
ds_flows_f = F.interpolate(completed_flows[0].view(-1, 2, ori_h, ori_w), scale_factor=1/4, mode='bilinear', align_corners=False).view(b, l_t-1, 2, h, w)/4.0 | |
ds_flows_b = F.interpolate(completed_flows[1].view(-1, 2, ori_h, ori_w), scale_factor=1/4, mode='bilinear', align_corners=False).view(b, l_t-1, 2, h, w)/4.0 | |
ds_mask_in = F.interpolate(masks_in.reshape(-1, 1, ori_h, ori_w), scale_factor=1/4, mode='nearest').view(b, t, 1, h, w) | |
ds_mask_in_local = ds_mask_in[:, :l_t] | |
ds_mask_updated_local = F.interpolate(masks_updated[:,:l_t].reshape(-1, 1, ori_h, ori_w), scale_factor=1/4, mode='nearest').view(b, l_t, 1, h, w) | |
if self.training: | |
mask_pool_l = self.max_pool(ds_mask_in.view(-1, 1, h, w)) | |
mask_pool_l = mask_pool_l.view(b, t, 1, mask_pool_l.size(-2), mask_pool_l.size(-1)) | |
else: | |
mask_pool_l = self.max_pool(ds_mask_in_local.view(-1, 1, h, w)) | |
mask_pool_l = mask_pool_l.view(b, l_t, 1, mask_pool_l.size(-2), mask_pool_l.size(-1)) | |
prop_mask_in = torch.cat([ds_mask_in_local, ds_mask_updated_local], dim=2) | |
_, _, local_feat, _ = self.feat_prop_module(local_feat, ds_flows_f, ds_flows_b, prop_mask_in, interpolation) | |
enc_feat = torch.cat((local_feat, ref_feat), dim=1) | |
trans_feat = self.ss(enc_feat.view(-1, c, h, w), b, fold_feat_size) | |
mask_pool_l = rearrange(mask_pool_l, 'b t c h w -> b t h w c').contiguous() | |
trans_feat = self.transformers(trans_feat, fold_feat_size, mask_pool_l, t_dilation=t_dilation) | |
trans_feat = self.sc(trans_feat, t, fold_feat_size) | |
trans_feat = trans_feat.view(b, t, -1, h, w) | |
enc_feat = enc_feat + trans_feat | |
if self.training: | |
output = self.decoder(enc_feat.view(-1, c, h, w)) | |
output = torch.tanh(output).view(b, t, 3, ori_h, ori_w) | |
else: | |
output = self.decoder(enc_feat[:, :l_t].view(-1, c, h, w)) | |
output = torch.tanh(output).view(b, l_t, 3, ori_h, ori_w) | |
return output | |
# ###################################################################### | |
# Discriminator for Temporal Patch GAN | |
# ###################################################################### | |
class Discriminator(BaseNetwork): | |
def __init__(self, | |
in_channels=3, | |
use_sigmoid=False, | |
use_spectral_norm=True, | |
init_weights=True): | |
super(Discriminator, self).__init__() | |
self.use_sigmoid = use_sigmoid | |
nf = 32 | |
self.conv = nn.Sequential( | |
spectral_norm( | |
nn.Conv3d(in_channels=in_channels, | |
out_channels=nf * 1, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=1, | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(64, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 1, | |
nf * 2, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=(1, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(128, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 2, | |
nf * 4, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=(1, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=(1, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=(1, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(3, 5, 5), | |
stride=(1, 2, 2), | |
padding=(1, 2, 2))) | |
if init_weights: | |
self.init_weights() | |
def forward(self, xs): | |
# T, C, H, W = xs.shape (old) | |
# B, T, C, H, W (new) | |
xs_t = torch.transpose(xs, 1, 2) | |
feat = self.conv(xs_t) | |
if self.use_sigmoid: | |
feat = torch.sigmoid(feat) | |
out = torch.transpose(feat, 1, 2) # B, T, C, H, W | |
return out | |
class Discriminator_2D(BaseNetwork): | |
def __init__(self, | |
in_channels=3, | |
use_sigmoid=False, | |
use_spectral_norm=True, | |
init_weights=True): | |
super(Discriminator_2D, self).__init__() | |
self.use_sigmoid = use_sigmoid | |
nf = 32 | |
self.conv = nn.Sequential( | |
spectral_norm( | |
nn.Conv3d(in_channels=in_channels, | |
out_channels=nf * 1, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(64, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 1, | |
nf * 2, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(128, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 2, | |
nf * 4, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
spectral_norm( | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2), | |
bias=not use_spectral_norm), use_spectral_norm), | |
# nn.InstanceNorm2d(256, track_running_stats=False), | |
nn.LeakyReLU(0.2, inplace=True), | |
nn.Conv3d(nf * 4, | |
nf * 4, | |
kernel_size=(1, 5, 5), | |
stride=(1, 2, 2), | |
padding=(0, 2, 2))) | |
if init_weights: | |
self.init_weights() | |
def forward(self, xs): | |
# T, C, H, W = xs.shape (old) | |
# B, T, C, H, W (new) | |
xs_t = torch.transpose(xs, 1, 2) | |
feat = self.conv(xs_t) | |
if self.use_sigmoid: | |
feat = torch.sigmoid(feat) | |
out = torch.transpose(feat, 1, 2) # B, T, C, H, W | |
return out | |
def spectral_norm(module, mode=True): | |
if mode: | |
return _spectral_norm(module) | |
return module | |