import torch import torch.nn as nn import torch.nn.functional as F from timm.models.layers import Mlp, DropPath, to_2tuple, trunc_normal_ import math import numpy as np class ResidualBlock(nn.Module): def __init__(self, in_planes, planes, norm_fn='group', stride=1): super(ResidualBlock, self).__init__() self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, padding=1, stride=stride) self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, padding=1) self.relu = nn.ReLU(inplace=True) num_groups = planes // 8 if norm_fn == 'group': self.norm1 = nn.GroupNorm(num_groups=num_groups, num_channels=planes) self.norm2 = nn.GroupNorm(num_groups=num_groups, num_channels=planes) if not stride == 1: self.norm3 = nn.GroupNorm(num_groups=num_groups, num_channels=planes) elif norm_fn == 'batch': self.norm1 = nn.BatchNorm2d(planes) self.norm2 = nn.BatchNorm2d(planes) if not stride == 1: self.norm3 = nn.BatchNorm2d(planes) elif norm_fn == 'instance': self.norm1 = nn.InstanceNorm2d(planes) self.norm2 = nn.InstanceNorm2d(planes) if not stride == 1: self.norm3 = nn.InstanceNorm2d(planes) elif norm_fn == 'none': self.norm1 = nn.Sequential() self.norm2 = nn.Sequential() if not stride == 1: self.norm3 = nn.Sequential() if stride == 1: self.downsample = None else: self.downsample = nn.Sequential( nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride), self.norm3) def forward(self, x): y = x y = self.relu(self.norm1(self.conv1(y))) y = self.relu(self.norm2(self.conv2(y))) if self.downsample is not None: x = self.downsample(x) return self.relu(x+y) class BottleneckBlock(nn.Module): def __init__(self, in_planes, planes, norm_fn='group', stride=1): super(BottleneckBlock, self).__init__() self.conv1 = nn.Conv2d(in_planes, planes//4, kernel_size=1, padding=0) self.conv2 = nn.Conv2d(planes//4, planes//4, kernel_size=3, padding=1, stride=stride) self.conv3 = nn.Conv2d(planes//4, planes, kernel_size=1, padding=0) self.relu = nn.ReLU(inplace=True) num_groups = planes // 8 if norm_fn == 'group': self.norm1 = nn.GroupNorm(num_groups=num_groups, num_channels=planes//4) self.norm2 = nn.GroupNorm(num_groups=num_groups, num_channels=planes//4) self.norm3 = nn.GroupNorm(num_groups=num_groups, num_channels=planes) if not stride == 1: self.norm4 = nn.GroupNorm(num_groups=num_groups, num_channels=planes) elif norm_fn == 'batch': self.norm1 = nn.BatchNorm2d(planes//4) self.norm2 = nn.BatchNorm2d(planes//4) self.norm3 = nn.BatchNorm2d(planes) if not stride == 1: self.norm4 = nn.BatchNorm2d(planes) elif norm_fn == 'instance': self.norm1 = nn.InstanceNorm2d(planes//4) self.norm2 = nn.InstanceNorm2d(planes//4) self.norm3 = nn.InstanceNorm2d(planes) if not stride == 1: self.norm4 = nn.InstanceNorm2d(planes) elif norm_fn == 'none': self.norm1 = nn.Sequential() self.norm2 = nn.Sequential() self.norm3 = nn.Sequential() if not stride == 1: self.norm4 = nn.Sequential() if stride == 1: self.downsample = None else: self.downsample = nn.Sequential( nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride), self.norm4) def forward(self, x): y = x y = self.relu(self.norm1(self.conv1(y))) y = self.relu(self.norm2(self.conv2(y))) y = self.relu(self.norm3(self.conv3(y))) if self.downsample is not None: x = self.downsample(x) return self.relu(x+y) class BasicEncoder(nn.Module): def __init__(self, input_dim=3, output_dim=128, norm_fn='batch', dropout=0.0): super(BasicEncoder, self).__init__() self.norm_fn = norm_fn mul = input_dim // 3 if self.norm_fn == 'group': self.norm1 = nn.GroupNorm(num_groups=8, num_channels=64 * mul) elif self.norm_fn == 'batch': self.norm1 = nn.BatchNorm2d(64 * mul) elif self.norm_fn == 'instance': self.norm1 = nn.InstanceNorm2d(64 * mul) elif self.norm_fn == 'none': self.norm1 = nn.Sequential() self.conv1 = nn.Conv2d(input_dim, 64 * mul, kernel_size=7, stride=2, padding=3) self.relu1 = nn.ReLU(inplace=True) self.in_planes = 64 * mul self.layer1 = self._make_layer(64 * mul, stride=1) self.layer2 = self._make_layer(96 * mul, stride=2) self.layer3 = self._make_layer(128 * mul, stride=2) # output convolution self.conv2 = nn.Conv2d(128 * mul, output_dim, kernel_size=1) self.dropout = None if dropout > 0: self.dropout = nn.Dropout2d(p=dropout) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.InstanceNorm2d, nn.GroupNorm)): if m.weight is not None: nn.init.constant_(m.weight, 1) if m.bias is not None: nn.init.constant_(m.bias, 0) def _make_layer(self, dim, stride=1): layer1 = ResidualBlock(self.in_planes, dim, self.norm_fn, stride=stride) layer2 = ResidualBlock(dim, dim, self.norm_fn, stride=1) layers = (layer1, layer2) self.in_planes = dim return nn.Sequential(*layers) def compute_params(self): num = 0 for param in self.parameters(): num += np.prod(param.size()) return num def forward(self, x): # if input is list, combine batch dimension is_list = isinstance(x, tuple) or isinstance(x, list) if is_list: batch_dim = x[0].shape[0] x = torch.cat(x, dim=0) x = self.conv1(x) x = self.norm1(x) x = self.relu1(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.conv2(x) if self.training and self.dropout is not None: x = self.dropout(x) if is_list: x = torch.split(x, [batch_dim, batch_dim], dim=0) return x class SmallEncoder(nn.Module): def __init__(self, output_dim=128, norm_fn='batch', dropout=0.0): super(SmallEncoder, self).__init__() self.norm_fn = norm_fn if self.norm_fn == 'group': self.norm1 = nn.GroupNorm(num_groups=8, num_channels=32) elif self.norm_fn == 'batch': self.norm1 = nn.BatchNorm2d(32) elif self.norm_fn == 'instance': self.norm1 = nn.InstanceNorm2d(32) elif self.norm_fn == 'none': self.norm1 = nn.Sequential() self.conv1 = nn.Conv2d(3, 32, kernel_size=7, stride=2, padding=3) self.relu1 = nn.ReLU(inplace=True) self.in_planes = 32 self.layer1 = self._make_layer(32, stride=1) self.layer2 = self._make_layer(64, stride=2) self.layer3 = self._make_layer(96, stride=2) self.dropout = None if dropout > 0: self.dropout = nn.Dropout2d(p=dropout) self.conv2 = nn.Conv2d(96, output_dim, kernel_size=1) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.InstanceNorm2d, nn.GroupNorm)): if m.weight is not None: nn.init.constant_(m.weight, 1) if m.bias is not None: nn.init.constant_(m.bias, 0) def _make_layer(self, dim, stride=1): layer1 = BottleneckBlock(self.in_planes, dim, self.norm_fn, stride=stride) layer2 = BottleneckBlock(dim, dim, self.norm_fn, stride=1) layers = (layer1, layer2) self.in_planes = dim return nn.Sequential(*layers) def forward(self, x): # if input is list, combine batch dimension is_list = isinstance(x, tuple) or isinstance(x, list) if is_list: batch_dim = x[0].shape[0] x = torch.cat(x, dim=0) x = self.conv1(x) x = self.norm1(x) x = self.relu1(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.conv2(x) if self.training and self.dropout is not None: x = self.dropout(x) if is_list: x = torch.split(x, [batch_dim, batch_dim], dim=0) return x class ConvNets(nn.Module): def __init__(self, in_dim, out_dim, inter_dim, depth, stride=1): super(ConvNets, self).__init__() self.conv_first = nn.Conv2d(in_dim, inter_dim, kernel_size=3, padding=1, stride=stride) self.conv_last = nn.Conv2d(inter_dim, out_dim, kernel_size=3, padding=1, stride=stride) self.relu = nn.ReLU(inplace=True) self.inter_convs = nn.ModuleList( [ResidualBlock(inter_dim, inter_dim, norm_fn='none', stride=1) for i in range(depth)]) def forward(self, x): x = self.relu(self.conv_first(x)) for inter_conv in self.inter_convs: x = inter_conv(x) x = self.conv_last(x) return x class FlowHead(nn.Module): def __init__(self, input_dim=128, hidden_dim=256): super(FlowHead, self).__init__() self.conv1 = nn.Conv2d(input_dim, hidden_dim, 3, padding=1) self.conv2 = nn.Conv2d(hidden_dim, 2, 3, padding=1) self.relu = nn.ReLU(inplace=True) def forward(self, x): return self.conv2(self.relu(self.conv1(x))) class ConvGRU(nn.Module): def __init__(self, hidden_dim=128, input_dim=192+128): super(ConvGRU, self).__init__() self.convz = nn.Conv2d(hidden_dim+input_dim, hidden_dim, 3, padding=1) self.convr = nn.Conv2d(hidden_dim+input_dim, hidden_dim, 3, padding=1) self.convq = nn.Conv2d(hidden_dim+input_dim, hidden_dim, 3, padding=1) def forward(self, h, x): hx = torch.cat([h, x], dim=1) z = torch.sigmoid(self.convz(hx)) r = torch.sigmoid(self.convr(hx)) q = torch.tanh(self.convq(torch.cat([r*h, x], dim=1))) h = (1-z) * h + z * q return h class SepConvGRU(nn.Module): def __init__(self, hidden_dim=128, input_dim=192+128): super(SepConvGRU, self).__init__() self.convz1 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (1,5), padding=(0,2)) self.convr1 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (1,5), padding=(0,2)) self.convq1 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (1,5), padding=(0,2)) self.convz2 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (5,1), padding=(2,0)) self.convr2 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (5,1), padding=(2,0)) self.convq2 = nn.Conv2d(hidden_dim+input_dim, hidden_dim, (5,1), padding=(2,0)) def forward(self, h, x): # horizontal hx = torch.cat([h, x], dim=1) z = torch.sigmoid(self.convz1(hx)) r = torch.sigmoid(self.convr1(hx)) q = torch.tanh(self.convq1(torch.cat([r*h, x], dim=1))) h = (1-z) * h + z * q # vertical hx = torch.cat([h, x], dim=1) z = torch.sigmoid(self.convz2(hx)) r = torch.sigmoid(self.convr2(hx)) q = torch.tanh(self.convq2(torch.cat([r*h, x], dim=1))) h = (1-z) * h + z * q return h class BasicMotionEncoder(nn.Module): def __init__(self, args): super(BasicMotionEncoder, self).__init__() cor_planes = args.motion_feature_dim self.convc1 = nn.Conv2d(cor_planes, 256, 1, padding=0) self.convc2 = nn.Conv2d(256, 192, 3, padding=1) self.convf1 = nn.Conv2d(2, 128, 7, padding=3) self.convf2 = nn.Conv2d(128, 64, 3, padding=1) self.conv = nn.Conv2d(64+192, 128-2, 3, padding=1) def forward(self, flow, corr): cor = F.relu(self.convc1(corr)) cor = F.relu(self.convc2(cor)) flo = F.relu(self.convf1(flow)) flo = F.relu(self.convf2(flo)) cor_flo = torch.cat([cor, flo], dim=1) out = F.relu(self.conv(cor_flo)) return torch.cat([out, flow], dim=1) class BasicFuseMotion(nn.Module): def __init__(self, args): super(BasicFuseMotion, self).__init__() cor_planes = args.motion_feature_dim out_planes = args.query_latent_dim self.normf1 = nn.InstanceNorm2d(128) self.normf2 = nn.InstanceNorm2d(128) self.convf1 = nn.Conv2d(2, 128, 3, padding=1) self.convf2 = nn.Conv2d(128, 128, 3, padding=1) self.convf3 = nn.Conv2d(128, 64, 3, padding=1) s = 1 self.normc1 = nn.InstanceNorm2d(256*s) self.normc2 = nn.InstanceNorm2d(256*s) self.normc3 = nn.InstanceNorm2d(256*s) self.convc1 = nn.Conv2d(cor_planes+128, 256*s, 1, padding=0) self.convc2 = nn.Conv2d(256*s, 256*s, 3, padding=1) self.convc3 = nn.Conv2d(256*s, 256*s, 3, padding=1) self.convc4 = nn.Conv2d(256*s, 256*s, 3, padding=1) self.conv = nn.Conv2d(256*s + 64, out_planes, 1, padding=0) def forward(self, flow, feat, context1=None): flo = F.relu(self.normf1(self.convf1(flow))) flo = F.relu(self.normf2(self.convf2(flo))) flo = self.convf3(flo) feat = torch.cat([feat, context1], dim=1) feat = F.relu(self.normc1(self.convc1(feat))) feat = F.relu(self.normc2(self.convc2(feat))) feat = F.relu(self.normc3(self.convc3(feat))) feat = self.convc4(feat) feat = torch.cat([flo, feat], dim=1) feat = F.relu(self.conv(feat)) return feat class BasicUpdateBlock(nn.Module): def __init__(self, args, hidden_dim=128, input_dim=128): super(BasicUpdateBlock, self).__init__() self.args = args self.encoder = BasicMotionEncoder(args) self.gru = SepConvGRU(hidden_dim=hidden_dim, input_dim=128+hidden_dim) self.flow_head = FlowHead(hidden_dim, hidden_dim=256) self.mask = nn.Sequential( nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 64*9, 1, padding=0)) def forward(self, net, inp, corr, flow, upsample=True): motion_features = self.encoder(flow, corr) inp = torch.cat([inp, motion_features], dim=1) net = self.gru(net, inp) delta_flow = self.flow_head(net) # scale mask to balence gradients mask = .25 * self.mask(net) return net, mask, delta_flow class DirectMeanMaskPredictor(nn.Module): def __init__(self, args): super(DirectMeanMaskPredictor, self).__init__() self.flow_head = FlowHead(args.predictor_dim, hidden_dim=256) self.mask = nn.Sequential( nn.Conv2d(args.predictor_dim, 256, 3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 64*9, 1, padding=0)) def forward(self, motion_features): delta_flow = self.flow_head(motion_features) mask = .25 * self.mask(motion_features) return mask, delta_flow class BaiscMeanPredictor(nn.Module): def __init__(self, args, hidden_dim=128): super(BaiscMeanPredictor, self).__init__() self.args = args self.encoder = BasicMotionEncoder(args) self.flow_head = FlowHead(hidden_dim, hidden_dim=256) self.mask = nn.Sequential( nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 64*9, 1, padding=0)) def forward(self, latent, flow): motion_features = self.encoder(flow, latent) delta_flow = self.flow_head(motion_features) mask = .25 * self.mask(motion_features) return mask, delta_flow class BasicRPEEncoder(nn.Module): def __init__(self, args): super(BasicRPEEncoder, self).__init__() self.args = args dim = args.query_latent_dim self.encoder = nn.Sequential( nn.Linear(2, dim // 2), nn.ReLU(inplace=True), nn.Linear(dim // 2, dim), nn.ReLU(inplace=True), nn.Linear(dim, dim) ) def forward(self, rpe_tokens): return self.encoder(rpe_tokens) from .twins import Block, CrossBlock class TwinsSelfAttentionLayer(nn.Module): def __init__(self, args): super(TwinsSelfAttentionLayer, self).__init__() self.args = args embed_dim = 256 num_heads = 8 mlp_ratio = 4 ws = 7 sr_ratio = 4 dpr = 0. drop_rate = 0. attn_drop_rate=0. self.local_block = Block(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr, sr_ratio=sr_ratio, ws=ws, with_rpe=True) self.global_block = Block(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr, sr_ratio=sr_ratio, ws=1, with_rpe=True) self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): trunc_normal_(m.weight, std=.02) if isinstance(m, nn.Linear) and m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): nn.init.constant_(m.bias, 0) nn.init.constant_(m.weight, 1.0) elif isinstance(m, nn.Conv2d): fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels fan_out //= m.groups m.weight.data.normal_(0, math.sqrt(2.0 / fan_out)) if m.bias is not None: m.bias.data.zero_() elif isinstance(m, nn.BatchNorm2d): m.weight.data.fill_(1.0) m.bias.data.zero_() def forward(self, x, tgt, size): x = self.local_block(x, size) x = self.global_block(x, size) tgt = self.local_block(tgt, size) tgt = self.global_block(tgt, size) return x, tgt class TwinsCrossAttentionLayer(nn.Module): def __init__(self, args): super(TwinsCrossAttentionLayer, self).__init__() self.args = args embed_dim = 256 num_heads = 8 mlp_ratio = 4 ws = 7 sr_ratio = 4 dpr = 0. drop_rate = 0. attn_drop_rate=0. self.local_block = Block(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr, sr_ratio=sr_ratio, ws=ws, with_rpe=True) self.global_block = CrossBlock(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr, sr_ratio=sr_ratio, ws=1, with_rpe=True) self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): trunc_normal_(m.weight, std=.02) if isinstance(m, nn.Linear) and m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): nn.init.constant_(m.bias, 0) nn.init.constant_(m.weight, 1.0) elif isinstance(m, nn.Conv2d): fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels fan_out //= m.groups m.weight.data.normal_(0, math.sqrt(2.0 / fan_out)) if m.bias is not None: m.bias.data.zero_() elif isinstance(m, nn.BatchNorm2d): m.weight.data.fill_(1.0) m.bias.data.zero_() def forward(self, x, tgt, size): x = self.local_block(x, size) tgt = self.local_block(tgt, size) x, tgt = self.global_block(x, tgt, size) return x, tgt