Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
import math | |
from typing import Sequence | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from mmcv.cnn import build_conv_layer, build_norm_layer | |
from mmengine.model import BaseModule, ModuleList | |
from mmengine.utils import digit_version, to_2tuple | |
from mmpose.models.builder import TRANSFORMER | |
from easydict import EasyDict | |
from einops import rearrange, repeat | |
from mmpose.core import force_fp32 | |
from mmcv.cnn.bricks.transformer import (BaseTransformerLayer, | |
TransformerLayerSequence, | |
build_transformer_layer_sequence) | |
# from mmcv.cnn.bricks.registry import (TRANSFORMER_LAYER, | |
# TRANSFORMER_LAYER_SEQUENCE) | |
import torch.distributions as distributions | |
from mmcv.ops.multi_scale_deform_attn import MultiScaleDeformableAttention | |
from torch.nn.init import normal_ | |
import copy | |
import warnings | |
from mmcv.cnn import build_activation_layer, build_norm_layer | |
from mmengine.model import xavier_init | |
from utils.human_models import smpl_x | |
from config import cfg | |
from mmengine import Registry | |
# Registries created locally in this module (the corresponding import from
# mmcv.cnn.bricks.registry above is commented out).
TRANSFORMER_LAYER = Registry('transformerLayer')
TRANSFORMER_LAYER_SEQUENCE = Registry('transformer-layers sequence')
def point_sample(input, point_coords, **kwargs):
    """Sample a feature map at point locations given in [0, 1] coordinates.

    Thin wrapper over :func:`torch.nn.functional.grid_sample`. Unlike
    ``grid_sample`` it accepts 3D ``point_coords`` and expects the
    coordinates to lie in the [0, 1] x [0, 1] square; they are mapped to
    ``grid_sample``'s [-1, 1] range before the call.

    Args:
        input (Tensor): Feature map of shape (N, C, H, W).
        point_coords (Tensor): Normalized point coordinates of shape
            (N, P, 2) or (N, Hgrid, Wgrid, 2).
        **kwargs: Forwarded unchanged to ``grid_sample``.

    Returns:
        Tensor: Sampled features of shape (N, C, P) for 3D coordinates or
        (N, C, Hgrid, Wgrid) for 4D coordinates.
    """
    flat_points = point_coords.dim() == 3
    if flat_points:
        # grid_sample needs a 4D grid: insert a singleton axis at position 2.
        point_coords = point_coords.unsqueeze(2)
    # Rescale [0, 1] coordinates to [-1, 1].
    grid = 2.0 * point_coords - 1.0
    sampled = F.grid_sample(input, grid, **kwargs)
    # Drop the singleton axis again so the output is (N, C, P).
    return sampled.squeeze(3) if flat_points else sampled
def nlc_to_nchw(x, hw_shape):
    """Reshape a token sequence [N, L, C] into a feature map [N, C, H, W].

    Args:
        x (Tensor): Input of shape [N, L, C].
        hw_shape (Sequence[int]): Target (height, width); their product
            must equal L.

    Returns:
        Tensor: Contiguous tensor of shape [N, C, H, W].
    """
    height, width = hw_shape
    assert len(x.shape) == 3
    batch, seq_len, channels = x.shape
    assert seq_len == height * width, 'The seq_len does not match H, W'
    # Move channels in front of the sequence axis, then unfold L into H x W.
    channels_first = x.transpose(1, 2)
    return channels_first.reshape(batch, channels, height, width).contiguous()
def nchw_to_nlc(x):
    """Flatten a feature map [N, C, H, W] into a token sequence [N, L, C].

    Args:
        x (Tensor): Input of shape [N, C, H, W].

    Returns:
        Tensor: Contiguous tensor of shape [N, L, C] with L = H * W.
    """
    assert len(x.shape) == 4
    # Collapse H and W into one axis, then put it before the channel axis.
    tokens = x.flatten(2)
    return tokens.transpose(1, 2).contiguous()
class AdaptivePadding(nn.Module):
    """Zero-pad the input (when needed) so the filter fully covers it.

    Two modes are supported. "same" splits the padding between both sides of
    each spatial axis (like TensorFlow's "SAME" mode); "corner" puts all of
    the padding at the bottom and right.

    Args:
        kernel_size (int | tuple): Size of the kernel. Default: 1.
        stride (int | tuple): Stride of the filter. Default: 1.
        dilation (int | tuple): Spacing between kernel elements.
            Default: 1.
        padding (str): "same" or "corner". Default: "corner".

    Example:
        >>> kernel_size = 16
        >>> stride = 16
        >>> dilation = 1
        >>> input = torch.rand(1, 1, 15, 17)
        >>> adap_pad = AdaptivePadding(
        >>>     kernel_size=kernel_size,
        >>>     stride=stride,
        >>>     dilation=dilation,
        >>>     padding="corner")
        >>> out = adap_pad(input)
        >>> assert (out.shape[2], out.shape[3]) == (16, 32)
        >>> input = torch.rand(1, 1, 16, 17)
        >>> out = adap_pad(input)
        >>> assert (out.shape[2], out.shape[3]) == (16, 32)
    """

    def __init__(self, kernel_size=1, stride=1, dilation=1, padding='corner'):
        super(AdaptivePadding, self).__init__()
        assert padding in ('same', 'corner')

        self.kernel_size = to_2tuple(kernel_size)
        self.stride = to_2tuple(stride)
        # NOTE(review): `padding` is a mode string and `forward` compares
        # `self.padding` against 'corner'/'same', so `to_2tuple` is presumably
        # expected to hand a string back unchanged -- confirm.
        self.padding = to_2tuple(padding)
        self.dilation = to_2tuple(dilation)

    def get_pad_shape(self, input_shape):
        """Return the total (pad_h, pad_w) needed for an (H, W) input."""
        in_h, in_w = input_shape
        k_h, k_w = self.kernel_size
        s_h, s_w = self.stride
        # Number of filter positions required to cover the whole input.
        out_h = math.ceil(in_h / s_h)
        out_w = math.ceil(in_w / s_w)
        # Extent spanned by those positions, including dilation.
        span_h = (out_h - 1) * s_h + (k_h - 1) * self.dilation[0] + 1
        span_w = (out_w - 1) * s_w + (k_w - 1) * self.dilation[1] + 1
        return max(span_h - in_h, 0), max(span_w - in_w, 0)

    def forward(self, x):
        """Pad the last two dimensions of ``x`` according to the mode."""
        pad_h, pad_w = self.get_pad_shape(x.size()[-2:])
        if pad_h <= 0 and pad_w <= 0:
            return x
        if self.padding == 'corner':
            # F.pad order for the last two dims: left, right, top, bottom.
            return F.pad(x, [0, pad_w, 0, pad_h])
        if self.padding == 'same':
            left = pad_w // 2
            top = pad_h // 2
            return F.pad(x, [left, pad_w - left, top, pad_h - top])
        return x
class PatchEmbed(BaseModule):
    """Image to Patch Embedding, implemented with a conv layer.

    Args:
        in_channels (int): The num of input channels. Default: 3
        embed_dims (int): The dimensions of embedding. Default: 768
        conv_type (str): The type of the embedding conv layer.
            Default: "Conv2d".
        kernel_size (int): The kernel_size of embedding conv. Default: 16.
        stride (int): The slide stride of embedding conv. Default: 16.
            When ``None`` is passed, `kernel_size` is used.
        padding (int | tuple | string): The padding length of
            embedding conv. When it is a string, it means the mode
            of adaptive padding, support "same" and "corner" now.
            Default: "corner".
        dilation (int): The dilation rate of embedding conv. Default: 1.
        bias (bool): Bias of embed conv. Default: True.
        norm_cfg (dict, optional): Config dict for normalization layer.
            Default: None.
        input_size (int | tuple | None): The size of input, used to
            pre-compute `init_out_size`. Default: None.
        init_cfg (`mmcv.ConfigDict`, optional): The Config for initialization.
            Default: None.
    """

    def __init__(
        self,
        in_channels=3,
        embed_dims=768,
        conv_type='Conv2d',
        kernel_size=16,
        stride=16,
        padding='corner',
        dilation=1,
        bias=True,
        norm_cfg=None,
        input_size=None,
        init_cfg=None,
    ):
        super(PatchEmbed, self).__init__(init_cfg=init_cfg)

        self.embed_dims = embed_dims
        kernel_size = to_2tuple(kernel_size)
        stride = kernel_size if stride is None else to_2tuple(stride)
        dilation = to_2tuple(dilation)

        self.adap_padding = None
        if isinstance(padding, str):
            # A string selects adaptive padding; the conv itself then pads 0.
            self.adap_padding = AdaptivePadding(
                kernel_size=kernel_size,
                stride=stride,
                dilation=dilation,
                padding=padding)
            padding = 0
        padding = to_2tuple(padding)

        self.projection = build_conv_layer(
            dict(type=conv_type),
            in_channels=in_channels,
            out_channels=embed_dims,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            bias=bias)

        self.norm = None
        if norm_cfg is not None:
            self.norm = build_norm_layer(norm_cfg, embed_dims)[1]

        if not input_size:
            self.init_input_size = None
            self.init_out_size = None
            return

        input_size = to_2tuple(input_size)
        # `init_input_size` keeps the un-padded size; `init_out_size` below is
        # the conv output size, available to callers for num_patches.
        self.init_input_size = input_size
        if self.adap_padding:
            pad_h, pad_w = self.adap_padding.get_pad_shape(input_size)
            input_h, input_w = input_size
            input_size = (input_h + pad_h, input_w + pad_w)

        def conv_out(axis):
            # Conv2d output-size formula, see
            # https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
            span = dilation[axis] * (kernel_size[axis] - 1)
            padded = input_size[axis] + 2 * padding[axis]
            return (padded - span - 1) // stride[axis] + 1

        self.init_out_size = (conv_out(0), conv_out(1))

    def forward(self, x):
        """Embed an image batch into a patch-token sequence.

        Args:
            x (Tensor): Has shape (B, C, H, W). In most case, C is 3.

        Returns:
            tuple: Contains merged results and its spatial shape.

                - x (Tensor): Has shape (B, out_h * out_w, embed_dims)
                - out_size (tuple[int]): Spatial shape of x, arrange as
                    (out_h, out_w).
        """
        if self.adap_padding is not None:
            x = self.adap_padding(x)

        feat = self.projection(x)
        out_size = (feat.shape[2], feat.shape[3])
        # (B, C, H, W) -> (B, H*W, C)
        tokens = feat.flatten(2).transpose(1, 2)
        if self.norm is not None:
            tokens = self.norm(tokens)
        return tokens, out_size
class PatchMerging(BaseModule):
    """Merge patch feature map.

    This layer groups the feature map by kernel_size with `nn.Unfold`, then
    applies an optional norm and a linear reduction to the grouped features.
    The original comment states the `nn.Unfold` variant is about 25% faster
    than the original implementation, at the cost of needing pretrained
    models to be modified for compatibility.

    Args:
        in_channels (int): The num of input channels.
        out_channels (int): The num of output channels.
        kernel_size (int | tuple, optional): the kernel size in the unfold
            layer. Defaults to 2.
        stride (int | tuple, optional): the stride of the sliding blocks in the
            unfold layer. Default: None. (Would be set as `kernel_size`)
        padding (int | tuple | string): The padding length of the unfold
            layer. When it is a string, it means the mode of adaptive
            padding, support "same" and "corner" now. Default: "corner".
        dilation (int | tuple, optional): dilation parameter in the unfold
            layer. Default: 1.
        bias (bool, optional): Whether to add bias in linear layer or not.
            Defaults: False.
        norm_cfg (dict, optional): Config dict for normalization layer.
            Default: dict(type='LN').
        init_cfg (dict, optional): The extra config for initialization.
            Default: None.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=2,
                 stride=None,
                 padding='corner',
                 dilation=1,
                 bias=False,
                 norm_cfg=dict(type='LN'),
                 init_cfg=None):
        super().__init__(init_cfg=init_cfg)
        self.in_channels = in_channels
        self.out_channels = out_channels

        # A falsy stride (None / 0) falls back to the kernel size.
        stride = stride or kernel_size
        kernel_size = to_2tuple(kernel_size)
        stride = to_2tuple(stride)
        dilation = to_2tuple(dilation)

        self.adap_padding = None
        if isinstance(padding, str):
            # A string selects adaptive padding; the unfold then pads 0.
            self.adap_padding = AdaptivePadding(
                kernel_size=kernel_size,
                stride=stride,
                dilation=dilation,
                padding=padding)
            padding = 0
        padding = to_2tuple(padding)

        self.sampler = nn.Unfold(
            kernel_size=kernel_size,
            dilation=dilation,
            padding=padding,
            stride=stride)

        # Each unfolded column holds kernel_h * kernel_w * C_in values.
        sample_dim = kernel_size[0] * kernel_size[1] * in_channels

        self.norm = None
        if norm_cfg is not None:
            self.norm = build_norm_layer(norm_cfg, sample_dim)[1]

        self.reduction = nn.Linear(sample_dim, out_channels, bias=bias)

    def forward(self, x, input_size):
        """Merge neighbouring patches of a token sequence.

        Args:
            x (Tensor): Has shape (B, H*W, C_in).
            input_size (tuple[int]): The spatial shape of x, arrange as (H, W).

        Returns:
            tuple: Contains merged results and its spatial shape.

                - x (Tensor): Has shape (B, Merged_H * Merged_W, C_out)
                - out_size (tuple[int]): Spatial shape of x, arrange as
                    (Merged_H, Merged_W).
        """
        batch, seq_len, channels = x.shape
        assert isinstance(input_size, Sequence), \
            f'Expect input_size is `Sequence` but get {input_size}'

        height, width = input_size
        assert seq_len == height * width, 'input feature has wrong size'

        # (B, H*W, C) -> (B, C, H, W)
        feat = x.view(batch, height, width, channels).permute([0, 3, 1, 2])

        if self.adap_padding:
            feat = self.adap_padding(feat)
            height, width = feat.shape[-2:]

        # if kernel_size=2 and stride=2, the result has shape (B, 4*C, H/2*W/2)
        columns = self.sampler(feat)

        unfold = self.sampler

        def merged_len(size, axis):
            # Same output-size formula as a convolution, per axis.
            span = unfold.dilation[axis] * (unfold.kernel_size[axis] - 1)
            padded = size + 2 * unfold.padding[axis]
            return (padded - span - 1) // unfold.stride[axis] + 1

        output_size = (merged_len(height, 0), merged_len(width, 1))

        merged = columns.transpose(1, 2)  # B, H/2*W/2, 4*C
        if self.norm:
            merged = self.norm(merged)
        merged = self.reduction(merged)
        return merged, output_size
def inverse_sigmoid(x, eps=1e-5):
    """Compute the inverse of the sigmoid (the logit) of ``x``.

    ``x`` is first clamped to [0, 1]; numerator and denominator are then each
    clamped from below by ``eps`` so the logarithm stays finite.

    Args:
        x (Tensor): The tensor to invert.
        eps (float): Lower bound applied to ``x`` and ``1 - x``.
            Defaults to 1e-5.

    Returns:
        Tensor: ``log(x / (1 - x))`` with the clamping above, same shape
        as the input.
    """
    prob = x.clamp(min=0, max=1)
    numerator = prob.clamp(min=eps)
    denominator = (1 - prob).clamp(min=eps)
    return torch.log(numerator / denominator)
class DetrTransformerEncoder_zero_layer():
    """Encoder stand-in with zero layers.

    It accepts the same call arguments as an encoder layer sequence but runs
    no attention: calling it only adds the positional encoding to the query.
    All constructor arguments are accepted and ignored.
    """

    def __init__(self, *args, post_norm_cfg=dict(type='LN'), **kwargs):
        pass

    def __call__(self,
                 query,
                 key,
                 value,
                 query_pos=None,
                 key_pos=None,
                 attn_masks=None,
                 query_key_padding_mask=None,
                 key_padding_mask=None,
                 **kwargs):
        """Return ``query + query_pos``.

        Args:
            query (Tensor): The input query.
            query_pos (Tensor | None): Positional encoding added to
                ``query``. When ``None`` (the default) the query is
                returned unchanged.

        All other arguments are ignored.

        Returns:
            Tensor: The query with the positional encoding added.
        """
        # `None` is the declared default, so it must not reach the addition.
        if query_pos is None:
            return query
        return query + query_pos
class DetrTransformerDecoderLayer_grouped(BaseTransformerLayer):
    """Decoder layer whose self-attention runs within groups of queries.

    The query sequence is treated as ``g`` consecutive groups of
    ``num_joints`` queries. For self-attention the groups are folded into
    the batch axis (``(g k) b c -> k (g b) c``), so each group only attends
    to its own ``num_joints`` queries. Cross-attention, norms and FFNs act
    on the full query sequence.

    Args:
        attn_cfgs, feedforward_channels, ffn_dropout, operation_order,
        act_cfg, norm_cfg, ffn_num_fcs, **kwargs: Forwarded unchanged to
            ``BaseTransformerLayer``.
        num_joints (int): Number of queries per group. Default: 17.
    """

    def __init__(self,
                 attn_cfgs,
                 feedforward_channels,
                 ffn_dropout=0.0,
                 operation_order=None,
                 act_cfg=dict(type='ReLU', inplace=True),
                 norm_cfg=dict(type='LN'),
                 ffn_num_fcs=2,
                 num_joints=17,
                 **kwargs):
        super(DetrTransformerDecoderLayer_grouped, self).__init__(
            attn_cfgs=attn_cfgs,
            feedforward_channels=feedforward_channels,
            ffn_dropout=ffn_dropout,
            operation_order=operation_order,
            act_cfg=act_cfg,
            norm_cfg=norm_cfg,
            ffn_num_fcs=ffn_num_fcs,
            **kwargs)
        self.num_joints = num_joints

    def forward(self,
                query,
                key=None,
                value=None,
                query_pos=None,
                key_pos=None,
                attn_masks=None,
                query_key_padding_mask=None,
                key_padding_mask=None,
                **kwargs):
        """Run the operations listed in ``self.operation_order``.

        Args:
            query (Tensor): Queries; the rearrange patterns used below treat
                it as (num_query, bs, embed_dims), with num_query divisible
                by ``num_joints``.
            key (Tensor | None): Keys for cross-attention.
            value (Tensor | None): Values for cross-attention.
            query_pos (Tensor | None): Positional encoding for ``query``,
                same layout as ``query``.
            key_pos (Tensor | None): Positional encoding for ``key``.
            attn_masks (list[Tensor] | Tensor | None): One mask per
                attention op; a single tensor is copied for every op.
            query_key_padding_mask (Tensor | None): Passed as
                ``key_padding_mask`` to self-attention.
            key_padding_mask (Tensor | None): Passed to cross-attention.

        Returns:
            Tensor: The updated query, same layout as the input.
        """
        norm_index = 0
        attn_index = 0
        ffn_index = 0
        identity = query

        if attn_masks is None:
            attn_masks = [None for _ in range(self.num_attn)]
        elif isinstance(attn_masks, torch.Tensor):
            attn_masks = [
                copy.deepcopy(attn_masks) for _ in range(self.num_attn)
            ]
            warnings.warn(f'Use same attn_mask in all attentions in '
                          f'{self.__class__.__name__} ')
        else:
            assert len(attn_masks) == self.num_attn, f'The length of ' \
                f'attn_masks {len(attn_masks)} must be equal ' \
                f'to the number of attention in ' \
                f'operation_order {self.num_attn}'

        for layer in self.operation_order:
            if layer == 'self_attn':
                assert query.size(0) % self.num_joints == 0, f'query.shape: {query.shape}, num_joints: {self.num_joints}'
                num_group = query.size(0) // self.num_joints
                bs = query.size(1)

                def to_groups(tensor):
                    # Fold the group axis into the batch axis so attention
                    # only mixes the `num_joints` queries of one group.
                    return rearrange(tensor, '(g k) b c -> k (g b) c',
                                     g=num_group, k=self.num_joints)

                temp_query = to_groups(query)
                temp_identity = to_groups(identity)
                # `query_pos` defaults to None; only regroup a real tensor.
                temp_query_pos = (None if query_pos is None
                                  else to_groups(query_pos))
                temp_key = temp_value = temp_query
                # NOTE(review): the masks are passed through as given while
                # the batch axis is now g * bs -- confirm callers either pass
                # None or masks that already match the regrouped layout.
                query = self.attentions[attn_index](
                    temp_query,
                    temp_key,
                    temp_value,
                    temp_identity if self.pre_norm else None,
                    query_pos=temp_query_pos,
                    key_pos=temp_query_pos,
                    attn_mask=attn_masks[attn_index],
                    key_padding_mask=query_key_padding_mask,
                    **kwargs)
                # Restore the original (num_query, bs, c) layout.
                query = rearrange(query, 'k (g b) c -> (g k) b c',
                                  g=num_group, b=bs)
                attn_index += 1
                identity = query

            elif layer == 'norm':
                query = self.norms[norm_index](query)
                norm_index += 1

            elif layer == 'cross_attn':
                query = self.attentions[attn_index](
                    query,
                    key,
                    value,
                    identity if self.pre_norm else None,
                    query_pos=query_pos,
                    key_pos=key_pos,
                    attn_mask=attn_masks[attn_index],
                    key_padding_mask=key_padding_mask,
                    **kwargs)
                attn_index += 1
                identity = query

            elif layer == 'ffn':
                query = self.ffns[ffn_index](
                    query, identity if self.pre_norm else None)
                ffn_index += 1

        # Without cross-attention `value` is otherwise unused; add a
        # zero-valued term that depends on it (presumably to keep it in the
        # autograd graph -- confirm). Skipped when no value was given, since
        # `None` is the declared default.
        if 'cross_attn' not in self.operation_order and value is not None:
            query = query + value.sum() * 0

        return query
class DeformableDetrTransformerDecoder(TransformerLayerSequence):
    """Decoder layer sequence driven by fixed reference points.

    Every layer receives the same reference points, scaled by
    ``valid_ratios``; the reference points are not refined between layers.

    Args:
        return_intermediate (bool): Whether to return the output of every
            layer instead of only the last one. Default: False.
    """

    def __init__(self, *args, return_intermediate=False, **kwargs):
        super(DeformableDetrTransformerDecoder, self).__init__(*args, **kwargs)
        self.return_intermediate = return_intermediate

    def forward(self,
                query,
                *args,
                reference_points=None,
                valid_ratios=None,
                reg_branches=None,
                fc_coord=None,
                **kwargs):
        """Run all decoder layers.

        Args:
            query (Tensor): Input query, passed to the first layer.
            reference_points (Tensor): Reference points whose last dimension
                is 4 or 3. Required despite the ``None`` default.
            valid_ratios (Tensor): Ratios multiplied onto the reference
                points before they are handed to each layer.
            reg_branches: Accepted for interface compatibility; unused.
            fc_coord: Accepted for interface compatibility; unused.
            *args, **kwargs: Forwarded to every layer.

        Returns:
            tuple[Tensor, Tensor]: With ``return_intermediate`` the stacked
            per-layer outputs and the stacked (identical) reference points;
            otherwise the last layer's output and ``reference_points``.
        """
        output = query
        intermediate = []
        intermediate_reference_points = []
        for layer in self.layers:
            if reference_points.shape[-1] == 4:
                reference_points_input = reference_points[:, :, None] * \
                    torch.cat([valid_ratios, valid_ratios], -1)[:, None]
            else:
                assert reference_points.shape[-1] == 3
                # Only the first two coordinates are scaled by the
                # per-level valid ratios and used as sampling locations.
                reference_points_input = \
                    reference_points[:, :, None, :2] * valid_ratios[:, None]
            output = layer(
                output,
                *args,
                reference_points=reference_points_input,
                **kwargs)

            if self.return_intermediate:
                intermediate.append(output)
                intermediate_reference_points.append(reference_points)

        if self.return_intermediate:
            return torch.stack(intermediate), torch.stack(
                intermediate_reference_points)

        return output, reference_points
class Linear_with_norm(nn.Module):
    """Linear layer whose output can be divided by the input's L2 norm.

    Args:
        in_channel (int): Input feature size.
        out_channel (int): Output feature size.
        bias (bool): Whether the linear layer has a bias, added after the
            optional normalisation. Default: True.
        norm (bool): Whether to divide the projection by the norm of the
            input taken along dim 1. Default: True.
    """

    def __init__(self, in_channel, out_channel, bias=True, norm=True):
        super(Linear_with_norm, self).__init__()
        self.bias = bias
        self.norm = norm
        self.linear = nn.Linear(in_channel, out_channel, bias)
        # Start from small weights.
        nn.init.xavier_uniform_(self.linear.weight, gain=0.01)

    def forward(self, x):
        """Project ``x`` with the weight, optionally normalise, add bias."""
        projected = x.matmul(self.linear.weight.t())
        if self.norm:
            # Norm is taken over dim 1 of the input and broadcast.
            projected = projected / torch.norm(x, dim=1, keepdim=True)
        if not self.bias:
            return projected
        return projected + self.linear.bias
class Transformer(BaseModule):
    """Implements the DETR transformer.

    Following the official DETR implementation, this module copy-paste
    from torch.nn.Transformer with modifications:

        * positional encodings are passed in MultiheadAttention
        * extra LN at the end of encoder is removed
        * decoder returns a stack of activations from all decoding layers

    See `paper: End-to-End Object Detection with Transformers
    <https://arxiv.org/pdf/2005.12872>`_ for details.

    Args:
        encoder (`mmcv.ConfigDict` | Dict): Config of
            TransformerEncoder. Defaults to None.
        decoder ((`mmcv.ConfigDict` | Dict)): Config of
            TransformerDecoder. Defaults to None
        init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization.
            Defaults to None.
    """

    def __init__(self, encoder=None, decoder=None, init_cfg=None):
        super(Transformer, self).__init__(init_cfg=init_cfg)
        self.encoder = build_transformer_layer_sequence(encoder)
        self.decoder = build_transformer_layer_sequence(decoder)

    def init_weights(self):
        """Xavier-uniform init for every sub-module with a >1-D weight."""
        # follow the official DETR to init parameters
        for module in self.modules():
            if hasattr(module, 'weight') and module.weight.dim() > 1:
                xavier_init(module, distribution='uniform')
        self._is_init = True

    def forward(self, x, mask, query_embed, pos_embed):
        """Forward function for `Transformer`.

        Args:
            x (Tensor): Input query with shape [bs, c, h, w] where
                c = embed_dims.
            mask (Tensor): The key_padding_mask used for encoder and decoder,
                with shape [bs, h, w].
            query_embed (Tensor): The query embedding for decoder, with shape
                [num_query, c].
            pos_embed (Tensor): The positional encoding for encoder and
                decoder, with the same shape as `x`.

        Returns:
            tuple[Tensor]: results of decoder containing the following tensor.

                - out_dec: Decoder output with its dims 1 and 2 swapped.
                - memory: Output results from encoder, with shape \
                      [bs, embed_dims, h, w].
        """
        bs, c, h, w = x.shape
        # use `view` instead of `flatten` for dynamically exporting to ONNX
        tokens = x.view(bs, c, -1).permute(2, 0, 1)  # -> [h*w, bs, c]
        pos_tokens = pos_embed.view(bs, c, -1).permute(2, 0, 1)
        # [num_query, dim] -> [num_query, bs, dim]
        queries = query_embed.unsqueeze(1).repeat(1, bs, 1)
        flat_mask = mask.view(bs, -1)  # [bs, h, w] -> [bs, h*w]

        memory = self.encoder(
            query=tokens,
            key=None,
            value=None,
            query_pos=pos_tokens,
            query_key_padding_mask=flat_mask)

        # The decoder starts from all-zero targets; the query embedding is
        # supplied as the query positional encoding.
        out_dec = self.decoder(
            query=torch.zeros_like(queries),
            key=memory,
            value=memory,
            key_pos=pos_tokens,
            query_pos=queries,
            key_padding_mask=flat_mask)

        return (out_dec.transpose(1, 2),
                memory.permute(1, 2, 0).reshape(bs, c, h, w))
class PoseurTransformer_v3(Transformer): | |
""" add noise training """ | |
def __init__(self,
             as_two_stage=False,
             num_feature_levels=4,
             two_stage_num_proposals=300,
             num_joints=17,
             use_soft_argmax=False,
             use_soft_argmax_def=False,
             proposal_feature='backbone_s',  # or encoder_memory
             image_size=[192, 256],
             init_q_sigmoid=False,
             soft_arg_stride=4,
             add_feat_2_query=False,
             query_pose_emb=True,
             num_noise_sample=3,
             num_noise_point=4,
             noise_sigma=0.2,
             embed_dims=256,
             **kwargs):
    """Store the configuration and build the extra layers.

    Args:
        as_two_stage (bool): Selects which layers `init_layers` creates.
        num_feature_levels (int): Number of rows of `level_embeds`.
        two_stage_num_proposals (int): Stored on the instance.
        num_joints (int): Number of joints; sizes the coordinate/sigma
            heads and the joint embedding.
        use_soft_argmax (bool): Use the 1D heatmap head for coordinates.
        use_soft_argmax_def (bool): Use the 2D heatmap head for
            coordinates. Mutually exclusive with `use_soft_argmax`.
        proposal_feature (str): Stored on the instance.
        image_size (list[int]): Passed to the heatmap heads.
        init_q_sigmoid (bool): Whether `get_proposal_pos_embed` applies a
            sigmoid to the proposals first.
        soft_arg_stride (int): Stride passed to the heatmap heads.
        add_feat_2_query (bool): Stored on the instance.
        query_pose_emb (bool): Must be True.
        num_noise_sample (int): Stored on the instance.
        num_noise_point (int): Stored on the instance.
        noise_sigma (float): Scales the identity covariance of `prior`.
        embed_dims (int): Fallback embedding size, used only when the
            encoder does not expose `embed_dims`.
        **kwargs: Forwarded to the parent constructor.
    """
    super(PoseurTransformer_v3, self).__init__(**kwargs)
    assert query_pose_emb == True
    self.num_noise_sample = num_noise_sample
    self.num_noise_point = num_noise_point
    self.noise_sigma = noise_sigma
    self.add_feat_2_query = add_feat_2_query
    self.as_two_stage = as_two_stage
    self.num_feature_levels = num_feature_levels
    self.two_stage_num_proposals = two_stage_num_proposals
    # Prefer the encoder's own embedding size. Only a missing attribute
    # triggers the fallback; any other error is allowed to propagate
    # instead of being silently swallowed.
    try:
        self.embed_dims = self.encoder.embed_dims
    except AttributeError:
        self.embed_dims = embed_dims
    self.num_joints = num_joints
    self.use_soft_argmax = use_soft_argmax
    self.use_soft_argmax_def = use_soft_argmax_def
    assert not (self.use_soft_argmax & self.use_soft_argmax_def)
    self.init_q_sigmoid = init_q_sigmoid
    self.image_size = image_size
    self.soft_arg_stride = soft_arg_stride
    self.proposal_feature = proposal_feature
    self.query_pose_emb = query_pose_emb
    # Zero-mean 2D Gaussian with covariance `noise_sigma * I`.
    self.prior = distributions.MultivariateNormal(
        torch.zeros(2), torch.eye(2) * self.noise_sigma)
    self.init_layers()
def init_layers(self):
    """Create the level embeddings and the stage-specific heads."""
    dims = self.embed_dims
    # Allocated uninitialised here; `init_weights` fills it via `normal_`.
    self.level_embeds = nn.Parameter(
        torch.Tensor(self.num_feature_levels, dims))
    self.fp16_enabled = False

    if not self.as_two_stage:
        self.reference_points = nn.Linear(dims, 2)
        return

    self.avg_pool = nn.AdaptiveAvgPool2d(1)
    # Both heads emit 3 values per joint.
    self.fc_sigma = Linear_with_norm(dims, self.num_joints * 3, norm=False)

    if self.use_soft_argmax:
        self.soft_argmax_coord = Heatmap1DHead(
            in_channels=dims, expand_ratio=2, hidden_dims=(512,),
            image_size=self.image_size, stride=self.soft_arg_stride)
        self.fc_layers = [self.fc_sigma]
    elif self.use_soft_argmax_def:
        self.soft_argmax_coord = Heatmap2DHead(
            in_channels=dims,
            image_size=self.image_size, stride=self.soft_arg_stride)
        self.fc_layers = [self.fc_sigma]
    else:
        self.fc_coord = Linear_with_norm(dims, self.num_joints * 3)
        self.fc_layers = [self.fc_coord, self.fc_sigma]

    if self.query_pose_emb:
        self.pos_trans = nn.Linear(dims * 2, dims)
        self.pos_trans_norm = nn.LayerNorm(dims)
        # One learned embedding per joint.
        self.pos_embed = nn.Embedding(self.num_joints, dims)
    else:
        self.pos_trans = nn.Linear(dims * 2, dims * 2)
        self.pos_trans_norm = nn.LayerNorm(dims * 2)
def init_weights(self):
    """Initialize the transformer weights.

    All parameters with more than one dimension get Xavier-uniform init,
    deformable-attention modules then run their own `init_weights`, the
    level embeddings are drawn from a normal distribution, and finally the
    two-stage coordinate/sigma heads are re-initialised with a small gain.
    """
    for p in self.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    for m in self.modules():
        if isinstance(m, MultiScaleDeformableAttention):
            m.init_weights()
    if not self.as_two_stage:
        xavier_init(self.reference_points, distribution='uniform', bias=0.)
    normal_(self.level_embeds)
    if self.use_soft_argmax:
        self.soft_argmax_coord.init_weights()

    if self.as_two_stage:
        for m in self.fc_layers:
            # `fc_layers` holds wrapper modules that keep their nn.Linear in
            # a `.linear` attribute, so a direct isinstance(m, nn.Linear)
            # check never matches and the small-gain init (overwritten by
            # the blanket Xavier pass above) would never be restored.
            linear = getattr(m, 'linear', m)
            if isinstance(linear, nn.Linear):
                nn.init.xavier_uniform_(linear.weight, gain=0.01)
def gen_encoder_output_proposals(self, memory, memory_padding_mask,
                                 spatial_shapes):
    """Generate point proposals from encoded memory.

    Args:
        memory (Tensor): The output of encoder,
            has shape (bs, num_key, embed_dim). num_key is
            equal the number of points on feature map from
            all level.
        memory_padding_mask (Tensor): Padding mask for memory,
            has shape (bs, num_key).
        spatial_shapes (Tensor): The shape of all feature maps,
            has shape (num_level, 2).

    Returns:
        tuple: A tuple of feature map and point proposals.

            - output_memory (Tensor): The input of decoder, \
                has shape (bs, num_key, embed_dim).
            - output_proposals (Tensor): The normalized (x, y) grid \
                proposals after an inverse sigmoid, has shape \
                (bs, num_keys, 2). Padded or near-border positions \
                are set to ``inf``.
    """
    # NOTE(review): `self.enc_output` and `self.enc_output_norm` are used
    # at the end but are not created by `init_layers` -- confirm they are
    # defined elsewhere before relying on this method.
    N = memory.shape[0]
    proposals = []
    _cur = 0
    for H, W in spatial_shapes:
        mask_flatten_ = memory_padding_mask[:, _cur:(_cur + H * W)].view(
            N, H, W, 1)
        # Count un-padded rows / columns via the first column / row.
        valid_H = torch.sum(~mask_flatten_[:, :, 0, 0], 1)
        valid_W = torch.sum(~mask_flatten_[:, 0, :, 0], 1)

        grid_y, grid_x = torch.meshgrid(
            torch.linspace(
                0, H - 1, H, dtype=torch.float32, device=memory.device),
            torch.linspace(
                0, W - 1, W, dtype=torch.float32, device=memory.device))
        grid = torch.cat([grid_x.unsqueeze(-1), grid_y.unsqueeze(-1)], -1)

        scale = torch.cat([valid_W.unsqueeze(-1),
                           valid_H.unsqueeze(-1)], 1).view(N, 1, 1, 2)
        # Cell centres, normalised by the valid extent of each sample.
        grid = (grid.unsqueeze(0).expand(N, -1, -1, -1) + 0.5) / scale
        proposals.append(grid.view(N, -1, 2))
        _cur += (H * W)
    output_proposals = torch.cat(proposals, 1)
    # Keep only points strictly inside the (0.01, 0.99) range.
    output_proposals_valid = ((output_proposals > 0.01) &
                              (output_proposals < 0.99)).all(
                                  -1, keepdim=True)
    # Inverse sigmoid.
    output_proposals = torch.log(output_proposals / (1 - output_proposals))
    output_proposals = output_proposals.masked_fill(
        memory_padding_mask.unsqueeze(-1), float('inf'))
    output_proposals = output_proposals.masked_fill(
        ~output_proposals_valid, float('inf'))

    # Zero the memory at padded and invalid positions before projecting.
    output_memory = memory
    output_memory = output_memory.masked_fill(
        memory_padding_mask.unsqueeze(-1), float(0))
    output_memory = output_memory.masked_fill(~output_proposals_valid,
                                              float(0))
    output_memory = self.enc_output_norm(self.enc_output(output_memory))
    return output_memory, output_proposals
@staticmethod
def get_reference_points(spatial_shapes, valid_ratios, device):
    """Get the reference points used in decoder.

    Declared as a static method: the signature has no ``self``, so without
    the decorator the method could not be called through an instance.

    Args:
        spatial_shapes (Tensor): The shape of all
            feature maps, has shape (num_level, 2).
        valid_ratios (Tensor): The ratios of valid
            points on the feature map, has shape
            (bs, num_levels, 2), ordered (w, h).
        device (obj:`device`): The device where
            reference_points should be.

    Returns:
        Tensor: reference points used in decoder, has \
            shape (bs, num_keys, num_levels, 2).
    """
    reference_points_list = []
    for lvl, (H, W) in enumerate(spatial_shapes):
        # Cell centres: 0.5, 1.5, ..., size - 0.5 along each axis.
        # TODO check this 0.5
        ref_y, ref_x = torch.meshgrid(
            torch.linspace(
                0.5, H - 0.5, H, dtype=torch.float32, device=device),
            torch.linspace(
                0.5, W - 0.5, W, dtype=torch.float32, device=device))
        # Normalise by the valid extent of this level for every sample.
        ref_y = ref_y.reshape(-1)[None] / (
            valid_ratios[:, None, lvl, 1] * H)
        ref_x = ref_x.reshape(-1)[None] / (
            valid_ratios[:, None, lvl, 0] * W)
        reference_points_list.append(torch.stack((ref_x, ref_y), -1))
    reference_points = torch.cat(reference_points_list, 1)
    # Broadcast every point against the valid ratios of all levels.
    reference_points = reference_points[:, :, None] * valid_ratios[:, None]
    return reference_points
def get_valid_ratio(self, mask):
    """Compute the valid (un-masked) width/height fraction of a mask.

    Args:
        mask (Tensor): Boolean mask of shape (bs, H, W); entries that are
            ``False`` are counted as valid.

    Returns:
        Tensor: Shape (bs, 2), ordered ``(valid_w / W, valid_h / H)``.
        The valid height is counted along the first column and the valid
        width along the first row.
    """
    _, height, width = mask.shape
    unmasked = ~mask
    rows_valid = unmasked[:, :, 0].sum(1)
    cols_valid = unmasked[:, 0, :].sum(1)
    ratio_w = cols_valid.float() / width
    ratio_h = rows_valid.float() / height
    return torch.stack([ratio_w, ratio_h], -1)
def get_proposal_pos_embed(self,
                           proposals,
                           num_pos_feats=128,
                           temperature=10000):
    """Encode proposal coordinates as sine/cosine position features.

    Args:
        proposals (Tensor): Coordinates indexed as (N, L, C); each of the
            C coordinates is embedded separately.
        num_pos_feats (int): Ignored. The value is always replaced by
            ``self.embed_dims // 3 + 1``; the parameter is kept only for
            interface compatibility.
        temperature (int): Base of the frequency schedule.

    Returns:
        Tensor: Embedding of shape (N, L, D) truncated to at most
        ``self.embed_dims`` channels.
    """
    # The caller-supplied value is deliberately overridden (see docstring).
    num_pos_feats = self.embed_dims // 3 + 1
    freq_index = torch.arange(
        num_pos_feats, dtype=torch.float32, device=proposals.device)
    exponent = 2 * (freq_index // 2) / num_pos_feats
    dim_t = temperature ** exponent

    two_pi = 2 * math.pi
    coords = proposals.sigmoid() if self.init_q_sigmoid else proposals
    coords = coords * two_pi

    # (N, L, C, num_pos_feats)
    angles = coords[:, :, :, None] / dim_t
    sin_part = angles[:, :, :, 0::2].sin()
    cos_part = angles[:, :, :, 1::2].cos()
    # Interleave sin/cos per frequency, then merge all coordinate channels.
    embedding = torch.stack((sin_part, cos_part), dim=4).flatten(2)
    return embedding[:, :, :self.embed_dims]
def forward(self,
            mlvl_feats,
            mlvl_masks,
            query_embed,
            mlvl_pos_embeds,
            reg_branches=None,
            fc_coord=None,
            cls_branches=None,
            coord_init=None,
            query_init=None,
            **kwargs):
    """Encode multi-level features, initialise joint queries and decode.

    Args:
        mlvl_feats (list[Tensor]): Per-level feature maps, each
            (bs, c, h, w).
        mlvl_masks (list[Tensor]): Per-level boolean masks, each
            (bs, h, w); ``False`` entries are treated as valid.
        query_embed: Only checked by the assertion below (must not be
            ``None`` unless ``self.as_two_stage``); otherwise unused here.
        mlvl_pos_embeds (list[Tensor]): Per-level positional encodings,
            laid out like the feature maps.
        reg_branches: Forwarded unchanged to the decoder.
        fc_coord: Forwarded unchanged to the decoder.
        cls_branches: Unused in this method.
        coord_init (Tensor, optional): If given, used as the initial joint
            coordinates instead of the regression head; ``enc_outputs``
            is then ``None``.
        query_init (Tensor, optional): If given, used directly as the
            batch-first decoder query instead of the positional-embedding
            based initialisation.
        **kwargs: Forwarded to both the encoder and the decoder.

    Returns:
        tuple: ``(memory, spatial_shapes, level_start_index, inter_states,
        init_reference_out, inter_references_out, enc_outputs)`` where
        ``memory`` is batch-first encoder output, ``init_reference_out``
        is the initial reference clipped to [0, 1], ``inter_states`` and
        ``inter_references_out`` are whatever the decoder returns, and
        ``enc_outputs`` is an ``EasyDict`` with ``pred_jts``, ``sigma``
        and ``maxvals`` (or ``None`` when ``coord_init`` is supplied).

    Raises:
        NotImplementedError: If ``self.proposal_feature`` is not one of
            the supported modes.
    """
    assert self.as_two_stage or query_embed is not None

    # ---- flatten every level to (bs, h*w, c) and concatenate ----------
    feat_flatten = []
    mask_flatten = []
    lvl_pos_embed_flatten = []
    spatial_shapes = []
    for lvl, (feat, mask, pos_embed) in enumerate(
            zip(mlvl_feats, mlvl_masks, mlvl_pos_embeds)):
        _, _, h, w = feat.shape
        spatial_shapes.append((h, w))
        pos_embed = pos_embed.flatten(2).transpose(1, 2)
        # Add the learned per-level embedding to the positional encoding.
        lvl_pos_embed_flatten.append(
            pos_embed + self.level_embeds[lvl].view(1, 1, -1))
        feat_flatten.append(feat.flatten(2).transpose(1, 2))
        mask_flatten.append(mask.flatten(1))
    feat_flatten = torch.cat(feat_flatten, 1)
    mask_flatten = torch.cat(mask_flatten, 1)
    lvl_pos_embed_flatten = torch.cat(lvl_pos_embed_flatten, 1)
    spatial_shapes = torch.as_tensor(
        spatial_shapes, dtype=torch.long, device=feat_flatten.device)
    # Offset of each level's first key inside the flattened sequence.
    level_start_index = torch.cat((spatial_shapes.new_zeros(
        (1,)), spatial_shapes.prod(1).cumsum(0)[:-1]))
    valid_ratios = torch.stack(
        [self.get_valid_ratio(m) for m in mlvl_masks], 1)

    # ---- encoder ------------------------------------------------------
    # [bs, H*W, num_lvls, 2]
    encoder_reference_points = self.get_reference_points(
        spatial_shapes, valid_ratios, device=feat_flatten.device)
    feat_flatten = feat_flatten.permute(1, 0, 2)  # (H*W, bs, embed_dims)
    lvl_pos_embed_flatten = lvl_pos_embed_flatten.permute(
        1, 0, 2)  # (H*W, bs, embed_dims)
    memory = self.encoder(
        query=feat_flatten,
        key=None,
        value=None,
        query_pos=lvl_pos_embed_flatten,
        query_key_padding_mask=mask_flatten,
        spatial_shapes=spatial_shapes,
        reference_points=encoder_reference_points,
        level_start_index=level_start_index,
        valid_ratios=valid_ratios,
        **kwargs)
    memory = memory.permute(1, 0, 2)  # back to batch-first
    bs = memory.shape[0]

    # ---- pick the feature map used for the initial joint proposal -----
    if self.proposal_feature == 'backbone_l':
        x = mlvl_feats[0]
    elif self.proposal_feature == 'backbone_s':
        x = mlvl_feats[-1]
    elif self.proposal_feature == 'encoder_memory_l':
        x = memory.permute(0, 2, 1)[
            :, :, :int(level_start_index[1])].view_as(mlvl_feats[0])
    elif self.proposal_feature == 'encoder_memory_s':
        x = memory.permute(0, 2, 1)[
            :, :, int(level_start_index[-1]):].view_as(mlvl_feats[-1])
    else:
        raise NotImplementedError(
            f'Unsupported proposal_feature: {self.proposal_feature!r}')
    # Keep the un-pooled map for point sampling. Binding it for every mode
    # avoids an unbound local when ``add_feat_2_query`` is enabled together
    # with 'backbone_l' or 'encoder_memory_s'.
    point_sample_feat = x

    BATCH_SIZE = x.shape[0]

    # ---- initial joint coordinates ------------------------------------
    if coord_init is not None:
        pred_jts = coord_init
        enc_outputs = None
    else:
        if self.use_soft_argmax or self.use_soft_argmax_def:
            out_coord = self.soft_argmax_coord(x)  # bs, 17, 2
            assert out_coord.shape[2] == 2
            # NOTE(review): 2 coordinates are asserted here but the result
            # is reshaped to 3 per joint below — confirm this path.
            pooled = self.avg_pool(x).reshape(BATCH_SIZE, -1)
        else:
            pooled = self.avg_pool(x).reshape(BATCH_SIZE, -1)
            out_coord = self.fc_coord(pooled).reshape(
                BATCH_SIZE, self.num_joints, 3)
            assert out_coord.shape[2] == 3
        out_sigma = self.fc_sigma(pooled).reshape(
            BATCH_SIZE, self.num_joints, -1)

        # (B, N, 3)
        pred_jts = out_coord.reshape(BATCH_SIZE, self.num_joints, 3)
        sigma = out_sigma.reshape(
            BATCH_SIZE, self.num_joints, -1).sigmoid()
        # Confidence is the complement of the predicted uncertainty,
        # averaged over the coordinate axis.
        scores = torch.mean(1 - sigma, dim=2, keepdim=True)
        enc_outputs = EasyDict(
            pred_jts=pred_jts,
            sigma=sigma,
            maxvals=scores.float(),
        )

    # The decoder refines from the detached prediction; the clipped copy
    # is what gets reported (and sampled) as the initial reference.
    reference_points = pred_jts.detach()
    reference_points_cliped = reference_points.clip(0, 1)
    init_reference_out = reference_points_cliped

    # ---- decoder query initialisation ---------------------------------
    if query_init is not None:
        query = query_init
    else:
        pred_jts_pos_embed = self.get_proposal_pos_embed(reference_points)
        reference_points_pos_embed = self.get_proposal_pos_embed(
            reference_points_cliped)  # query init here
        if self.add_feat_2_query:
            # NOTE(review): point_sample documents (N, P, 2) coordinates;
            # confirm the reference layout used with this option.
            query_feat = point_sample(
                point_sample_feat, init_reference_out,
                align_corners=False).permute(0, 2, 1)
            reference_points_pos_embed = \
                reference_points_pos_embed + query_feat
        query_pos_emb = torch.cat(
            [pred_jts_pos_embed, reference_points_pos_embed], dim=2)
        query = self.pos_trans_norm(self.pos_trans(query_pos_emb))

    query_pos = self.pos_embed.weight.clone().repeat(bs, 1, 1).contiguous()

    # ---- decoder (sequence-first layout) ------------------------------
    query = query.permute(1, 0, 2)
    memory = memory.permute(1, 0, 2)
    query_pos = query_pos.permute(1, 0, 2)
    inter_states, inter_references = self.decoder(
        query=query,
        key=None,
        value=memory,
        query_pos=query_pos,
        key_padding_mask=mask_flatten,
        reference_points=reference_points,
        spatial_shapes=spatial_shapes,
        level_start_index=level_start_index,
        valid_ratios=valid_ratios,
        reg_branches=reg_branches,
        fc_coord=fc_coord,
        **kwargs)
    inter_references_out = inter_references
    return memory.permute(1, 0, 2), spatial_shapes, level_start_index, \
        inter_states, init_reference_out, inter_references_out, enc_outputs