# Copyright (c) Facebook, Inc. and its affiliates. # pyre-unsafe import numpy as np from typing import Dict, List, Optional import fvcore.nn.weight_init as weight_init import torch import torch.nn as nn from torch.nn import functional as F from detectron2.layers import Conv2d, ShapeSpec, get_norm from detectron2.modeling import ROI_HEADS_REGISTRY, StandardROIHeads from detectron2.modeling.poolers import ROIPooler from detectron2.modeling.roi_heads import select_foreground_proposals from detectron2.structures import ImageList, Instances from .. import ( build_densepose_data_filter, build_densepose_embedder, build_densepose_head, build_densepose_losses, build_densepose_predictor, densepose_inference, ) class Decoder(nn.Module): """ A semantic segmentation head described in detail in the Panoptic Feature Pyramid Networks paper (https://arxiv.org/abs/1901.02446). It takes FPN features as input and merges information from all levels of the FPN into single output. """ def __init__(self, cfg, input_shape: Dict[str, ShapeSpec], in_features): super(Decoder, self).__init__() # fmt: off self.in_features = in_features feature_strides = {k: v.stride for k, v in input_shape.items()} feature_channels = {k: v.channels for k, v in input_shape.items()} num_classes = cfg.MODEL.ROI_DENSEPOSE_HEAD.DECODER_NUM_CLASSES conv_dims = cfg.MODEL.ROI_DENSEPOSE_HEAD.DECODER_CONV_DIMS self.common_stride = cfg.MODEL.ROI_DENSEPOSE_HEAD.DECODER_COMMON_STRIDE norm = cfg.MODEL.ROI_DENSEPOSE_HEAD.DECODER_NORM # fmt: on self.scale_heads = [] for in_feature in self.in_features: head_ops = [] head_length = max( 1, int(np.log2(feature_strides[in_feature]) - np.log2(self.common_stride)) ) for k in range(head_length): conv = Conv2d( feature_channels[in_feature] if k == 0 else conv_dims, conv_dims, kernel_size=3, stride=1, padding=1, bias=not norm, norm=get_norm(norm, conv_dims), activation=F.relu, ) weight_init.c2_msra_fill(conv) head_ops.append(conv) if feature_strides[in_feature] != self.common_stride: head_ops.append( nn.Upsample(scale_factor=2, mode="bilinear", align_corners=False) ) self.scale_heads.append(nn.Sequential(*head_ops)) self.add_module(in_feature, self.scale_heads[-1]) self.predictor = Conv2d(conv_dims, num_classes, kernel_size=1, stride=1, padding=0) weight_init.c2_msra_fill(self.predictor) def forward(self, features: List[torch.Tensor]): for i, _ in enumerate(self.in_features): if i == 0: x = self.scale_heads[i](features[i]) else: x = x + self.scale_heads[i](features[i]) x = self.predictor(x) return x @ROI_HEADS_REGISTRY.register() class DensePoseROIHeads(StandardROIHeads): """ A Standard ROIHeads which contains an addition of DensePose head. """ def __init__(self, cfg, input_shape): super().__init__(cfg, input_shape) self._init_densepose_head(cfg, input_shape) def _init_densepose_head(self, cfg, input_shape): # fmt: off self.densepose_on = cfg.MODEL.DENSEPOSE_ON if not self.densepose_on: return self.densepose_data_filter = build_densepose_data_filter(cfg) dp_pooler_resolution = cfg.MODEL.ROI_DENSEPOSE_HEAD.POOLER_RESOLUTION dp_pooler_sampling_ratio = cfg.MODEL.ROI_DENSEPOSE_HEAD.POOLER_SAMPLING_RATIO dp_pooler_type = cfg.MODEL.ROI_DENSEPOSE_HEAD.POOLER_TYPE self.use_decoder = cfg.MODEL.ROI_DENSEPOSE_HEAD.DECODER_ON # fmt: on if self.use_decoder: dp_pooler_scales = (1.0 / input_shape[self.in_features[0]].stride,) else: dp_pooler_scales = tuple(1.0 / input_shape[k].stride for k in self.in_features) in_channels = [input_shape[f].channels for f in self.in_features][0] if self.use_decoder: self.decoder = Decoder(cfg, input_shape, self.in_features) self.densepose_pooler = ROIPooler( output_size=dp_pooler_resolution, scales=dp_pooler_scales, sampling_ratio=dp_pooler_sampling_ratio, pooler_type=dp_pooler_type, ) self.densepose_head = build_densepose_head(cfg, in_channels) self.densepose_predictor = build_densepose_predictor( cfg, self.densepose_head.n_out_channels ) self.densepose_losses = build_densepose_losses(cfg) self.embedder = build_densepose_embedder(cfg) def _forward_densepose(self, features: Dict[str, torch.Tensor], instances: List[Instances]): """ Forward logic of the densepose prediction branch. Args: features (dict[str, Tensor]): input data as a mapping from feature map name to tensor. Axis 0 represents the number of images `N` in the input data; axes 1-3 are channels, height, and width, which may vary between feature maps (e.g., if a feature pyramid is used). instances (list[Instances]): length `N` list of `Instances`. The i-th `Instances` contains instances for the i-th input image, In training, they can be the proposals. In inference, they can be the predicted boxes. Returns: In training, a dict of losses. In inference, update `instances` with new fields "densepose" and return it. """ if not self.densepose_on: return {} if self.training else instances features_list = [features[f] for f in self.in_features] if self.training: proposals, _ = select_foreground_proposals(instances, self.num_classes) features_list, proposals = self.densepose_data_filter(features_list, proposals) if len(proposals) > 0: proposal_boxes = [x.proposal_boxes for x in proposals] if self.use_decoder: features_list = [self.decoder(features_list)] features_dp = self.densepose_pooler(features_list, proposal_boxes) densepose_head_outputs = self.densepose_head(features_dp) densepose_predictor_outputs = self.densepose_predictor(densepose_head_outputs) densepose_loss_dict = self.densepose_losses( proposals, densepose_predictor_outputs, embedder=self.embedder ) return densepose_loss_dict else: pred_boxes = [x.pred_boxes for x in instances] if self.use_decoder: features_list = [self.decoder(features_list)] features_dp = self.densepose_pooler(features_list, pred_boxes) if len(features_dp) > 0: densepose_head_outputs = self.densepose_head(features_dp) densepose_predictor_outputs = self.densepose_predictor(densepose_head_outputs) else: densepose_predictor_outputs = None densepose_inference(densepose_predictor_outputs, instances) return instances def forward( self, images: ImageList, features: Dict[str, torch.Tensor], proposals: List[Instances], targets: Optional[List[Instances]] = None, ): instances, losses = super().forward(images, features, proposals, targets) del targets, images if self.training: losses.update(self._forward_densepose(features, instances)) return instances, losses def forward_with_given_boxes( self, features: Dict[str, torch.Tensor], instances: List[Instances] ): """ Use the given boxes in `instances` to produce other (non-box) per-ROI outputs. This is useful for downstream tasks where a box is known, but need to obtain other attributes (outputs of other heads). Test-time augmentation also uses this. Args: features: same as in `forward()` instances (list[Instances]): instances to predict other outputs. Expect the keys "pred_boxes" and "pred_classes" to exist. Returns: instances (list[Instances]): the same `Instances` objects, with extra fields such as `pred_masks` or `pred_keypoints`. """ instances = super().forward_with_given_boxes(features, instances) instances = self._forward_densepose(features, instances) return instances