# Copyright (c) OpenMMLab. All rights reserved.
import cv2
import numpy as np

from mmpose.core.post_processing import (get_affine_transform, get_warp_matrix,
                                         warp_affine_joints)
from mmpose.datasets.builder import PIPELINES
from .shared_transform import Compose


def _ceil_to_multiples_of(x, base=64):
    """Round x up to the nearest integral multiple of the base."""
    return int(np.ceil(x / base)) * base
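

# Illustrative values (not part of the original module) showing how the
# helper rounds a dimension up to the next multiple of the base:
#
#     _ceil_to_multiples_of(500)       # -> 512 (ceil(500 / 64) = 8; 8 * 64)
#     _ceil_to_multiples_of(512)       # -> 512 (already a multiple of 64)
#     _ceil_to_multiples_of(100, 32)   # -> 128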


def _get_multi_scale_size(image,
                          input_size,
                          current_scale,
                          min_scale,
                          use_udp=False):
    """Get the size for multi-scale training.

    Args:
        image: Input image.
        input_size (np.ndarray[2]): Size (w, h) of the image input.
        current_scale (float): Scale factor.
        min_scale (float): Minimal scale.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).

    Returns:
        tuple: A tuple containing multi-scale sizes.

        - (w_resized, h_resized) (tuple(int)): resized width/height
        - center (np.ndarray): image center
        - scale (np.ndarray): scales wrt width/height
    """
    assert len(input_size) == 2
    h, w, _ = image.shape

    # calculate the size for min_scale
    min_input_w = _ceil_to_multiples_of(min_scale * input_size[0], 64)
    min_input_h = _ceil_to_multiples_of(min_scale * input_size[1], 64)
    if w < h:
        w_resized = int(min_input_w * current_scale / min_scale)
        h_resized = int(
            _ceil_to_multiples_of(min_input_w / w * h, 64) * current_scale /
            min_scale)
        if use_udp:
            scale_w = w - 1.0
            scale_h = (h_resized - 1.0) / (w_resized - 1.0) * (w - 1.0)
        else:
            scale_w = w / 200.0
            scale_h = h_resized / w_resized * w / 200.0
    else:
        h_resized = int(min_input_h * current_scale / min_scale)
        w_resized = int(
            _ceil_to_multiples_of(min_input_h / h * w, 64) * current_scale /
            min_scale)
        if use_udp:
            scale_h = h - 1.0
            scale_w = (w_resized - 1.0) / (h_resized - 1.0) * (h - 1.0)
        else:
            scale_h = h / 200.0
            scale_w = w_resized / h_resized * h / 200.0
    if use_udp:
        center = (scale_w / 2.0, scale_h / 2.0)
    else:
        center = np.array([round(w / 2.0), round(h / 2.0)])
    return (w_resized, h_resized), center, np.array([scale_w, scale_h])
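

# Worked example (illustrative numbers, not from the original source): for a
# portrait image with (h, w) = (640, 480), input_size = (512, 512) and
# min_scale = current_scale = 1:
#
#     size, center, scale = _get_multi_scale_size(img, (512, 512), 1, 1)
#     # size   == (512, 704)   height 704 = _ceil_to_multiples_of(
#     #                        512 / 480 * 640, 64)
#     # center == [240, 320]   image center (w / 2, h / 2)
#     # scale  == [2.4, 3.3]   pixel sizes under the 200-pixel convention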


def _resize_align_multi_scale(image, input_size, current_scale, min_scale):
    """Resize the images for multi-scale training.

    Args:
        image: Input image.
        input_size (np.ndarray[2]): Size (w, h) of the image input.
        current_scale (float): Current scale.
        min_scale (float): Minimal scale.

    Returns:
        tuple: A tuple containing image info.

        - image_resized (np.ndarray): resized image.
        - center (np.ndarray): center of image.
        - scale (np.ndarray): scale.
    """
    assert len(input_size) == 2
    size_resized, center, scale = _get_multi_scale_size(
        image, input_size, current_scale, min_scale)

    trans = get_affine_transform(center, scale, 0, size_resized)
    image_resized = cv2.warpAffine(image, trans, size_resized)

    return image_resized, center, scale


def _resize_align_multi_scale_udp(image, input_size, current_scale, min_scale):
    """Resize the images for multi-scale training with unbiased data
    processing (UDP).

    Args:
        image: Input image.
        input_size (np.ndarray[2]): Size (w, h) of the image input.
        current_scale (float): Current scale.
        min_scale (float): Minimal scale.

    Returns:
        tuple: A tuple containing image info.

        - image_resized (np.ndarray): resized image.
        - center (np.ndarray): center of image.
        - scale (np.ndarray): scale.
    """
    assert len(input_size) == 2
    # resized size at the current test scale
    size_resized, _, _ = _get_multi_scale_size(image, input_size,
                                               current_scale, min_scale, True)
    # center and scale are always defined at the minimal scale
    _, center, scale = _get_multi_scale_size(image, input_size, min_scale,
                                             min_scale, True)

    trans = get_warp_matrix(
        theta=0,
        size_input=np.array(scale, dtype=np.float32),
        size_dst=np.array(size_resized, dtype=np.float32) - 1.0,
        size_target=np.array(scale, dtype=np.float32))
    image_resized = cv2.warpAffine(
        image.copy(), trans, size_resized, flags=cv2.INTER_LINEAR)

    return image_resized, center, scale
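

# Note on the two variants above: the plain version builds its warp with
# get_affine_transform from the 200-pixel-convention center/scale, while the
# UDP version measures sizes in (pixels - 1) and warps with get_warp_matrix,
# which avoids the half-pixel bias analyzed in the UDP paper.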


class HeatmapGenerator:
    """Generate heatmaps for bottom-up models.

    Args:
        output_size (np.ndarray): Size (w, h) of the feature map.
        num_joints (int): Number of keypoints.
        sigma (int): Sigma of the heatmaps. If negative, it is derived
            from the output size.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, output_size, num_joints, sigma=-1, use_udp=False):
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.num_joints = num_joints
        if sigma < 0:
            sigma = self.output_size.prod()**0.5 / 64
        self.sigma = sigma
        size = 6 * sigma + 3
        self.use_udp = use_udp
        if use_udp:
            # with UDP the Gaussian center depends on the sub-pixel offset,
            # so only the coordinate grids are precomputed
            self.x = np.arange(0, size, 1, np.float32)
            self.y = self.x[:, None]
        else:
            x = np.arange(0, size, 1, np.float32)
            y = x[:, None]
            x0, y0 = 3 * sigma + 1, 3 * sigma + 1
            self.g = np.exp(-((x - x0)**2 + (y - y0)**2) / (2 * sigma**2))

    def __call__(self, joints):
        """Generate heatmaps."""
        hms = np.zeros(
            (self.num_joints, self.output_size[1], self.output_size[0]),
            dtype=np.float32)

        sigma = self.sigma
        for p in joints:
            for idx, pt in enumerate(p):
                if pt[2] > 0:
                    x, y = int(pt[0]), int(pt[1])
                    if x < 0 or y < 0 or \
                       x >= self.output_size[0] or y >= self.output_size[1]:
                        continue

                    if self.use_udp:
                        x0 = 3 * sigma + 1 + pt[0] - x
                        y0 = 3 * sigma + 1 + pt[1] - y
                        g = np.exp(-((self.x - x0)**2 + (self.y - y0)**2) /
                                   (2 * sigma**2))
                    else:
                        g = self.g

                    # clip the Gaussian patch to the heatmap boundary
                    ul = int(np.round(x - 3 * sigma - 1)), \
                        int(np.round(y - 3 * sigma - 1))
                    br = int(np.round(x + 3 * sigma + 2)), \
                        int(np.round(y + 3 * sigma + 2))

                    c, d = max(0, -ul[0]), \
                        min(br[0], self.output_size[0]) - ul[0]
                    a, b = max(0, -ul[1]), \
                        min(br[1], self.output_size[1]) - ul[1]

                    cc, dd = max(0, ul[0]), min(br[0], self.output_size[0])
                    aa, bb = max(0, ul[1]), min(br[1], self.output_size[1])
                    hms[idx, aa:bb, cc:dd] = np.maximum(
                        hms[idx, aa:bb, cc:dd], g[a:b, c:d])
        return hms
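

# Usage sketch (illustrative, with hypothetical shapes): one person whose
# first keypoint is visible at (32, 48) on a 128x128 feature map.
#
#     generator = HeatmapGenerator(output_size=128, num_joints=17, sigma=2)
#     joints = np.zeros((1, 17, 3), dtype=np.float32)
#     joints[0, 0] = (32.0, 48.0, 1.0)      # (x, y, visibility)
#     heatmaps = generator(joints)          # shape (17, 128, 128)
#     # heatmaps[0] peaks near (y=48, x=32); invisible joints stay all-zero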


class JointsEncoder:
    """Encode the visible joints into (coordinate, score) pairs.

    The flattened coordinate of a joint and its score are of `int` type:
    ``(idx * w * h + y * w + x, 1)`` for a visible joint and ``(0, 0)``
    otherwise, where ``(w, h)`` is the output size.

    Args:
        max_num_people (int): Max number of people in an image.
        num_joints (int): Number of keypoints.
        output_size (np.ndarray): Size (w, h) of the feature map.
        tag_per_joint (bool): Option to use one tag map per joint.
    """

    def __init__(self, max_num_people, num_joints, output_size, tag_per_joint):
        self.max_num_people = max_num_people
        self.num_joints = num_joints
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.tag_per_joint = tag_per_joint

    def __call__(self, joints):
        """
        Note:
            - number of people in image: N
            - number of keypoints: K
            - max number of people in an image: M

        Args:
            joints (np.ndarray[N,K,3])

        Returns:
            visible_kpts (np.ndarray[M,K,2]).
        """
        visible_kpts = np.zeros((self.max_num_people, self.num_joints, 2),
                                dtype=np.float32)
        for i in range(len(joints)):
            tot = 0
            for idx, pt in enumerate(joints[i]):
                x, y = int(pt[0]), int(pt[1])
                if (pt[2] > 0 and 0 <= y < self.output_size[1]
                        and 0 <= x < self.output_size[0]):
                    if self.tag_per_joint:
                        visible_kpts[i][tot] = \
                            (idx * self.output_size.prod()
                             + y * self.output_size[0] + x, 1)
                    else:
                        visible_kpts[i][tot] = (y * self.output_size[0] + x, 1)
                    tot += 1
        return visible_kpts
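

# Usage sketch (illustrative): with tag_per_joint=True, a visible joint idx
# at pixel (x, y) is encoded as its flattened index into the stacked
# (K, h, w) tag volume.
#
#     encoder = JointsEncoder(max_num_people=30, num_joints=17,
#                             output_size=128, tag_per_joint=True)
#     joints = np.zeros((1, 17, 3), dtype=np.float32)
#     joints[0, 2] = (10.0, 20.0, 1.0)
#     encoded = encoder(joints)             # shape (30, 17, 2)
#     # encoded[0, 0] == (2 * 128 * 128 + 20 * 128 + 10, 1); rest are (0, 0)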


class PAFGenerator:
    """Generate part affinity fields.

    Args:
        output_size (np.ndarray): Size (w, h) of the feature map.
        limb_width (int): Limb width of part affinity fields.
        skeleton (list[list]): connections of joints.
    """

    def __init__(self, output_size, limb_width, skeleton):
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.limb_width = limb_width
        self.skeleton = skeleton

    def _accumulate_paf_map_(self, pafs, src, dst, count):
        """Accumulate part affinity fields between two given joints.

        Args:
            pafs (np.ndarray[2,H,W]): paf maps (2 dimensions: x axis and
                y axis) for a certain limb connection. This argument will
                be modified inplace.
            src (np.ndarray[2,]): coordinates of the source joint.
            dst (np.ndarray[2,]): coordinates of the destination joint.
            count (np.ndarray[H,W]): count map that preserves the number
                of non-zero vectors at each point. This argument will be
                modified inplace.
        """
        limb_vec = dst - src
        norm = np.linalg.norm(limb_vec)
        if norm == 0:
            unit_limb_vec = np.zeros(2)
        else:
            unit_limb_vec = limb_vec / norm

        min_x = max(np.floor(min(src[0], dst[0]) - self.limb_width), 0)
        max_x = min(
            np.ceil(max(src[0], dst[0]) + self.limb_width),
            self.output_size[0] - 1)
        min_y = max(np.floor(min(src[1], dst[1]) - self.limb_width), 0)
        max_y = min(
            np.ceil(max(src[1], dst[1]) + self.limb_width),
            self.output_size[1] - 1)

        range_x = list(range(int(min_x), int(max_x + 1), 1))
        range_y = list(range(int(min_y), int(max_y + 1), 1))

        mask = np.zeros_like(count, dtype=bool)
        if len(range_x) > 0 and len(range_y) > 0:
            xx, yy = np.meshgrid(range_x, range_y)
            delta_x = xx - src[0]
            delta_y = yy - src[1]
            # perpendicular distance from each pixel to the limb axis
            dist = np.abs(delta_x * unit_limb_vec[1] -
                          delta_y * unit_limb_vec[0])
            mask_local = (dist < self.limb_width)
            mask[yy, xx] = mask_local

        pafs[0, mask] += unit_limb_vec[0]
        pafs[1, mask] += unit_limb_vec[1]
        count += mask

        return pafs, count

    def __call__(self, joints):
        """Generate the target part affinity fields."""
        pafs = np.zeros(
            (len(self.skeleton) * 2, self.output_size[1], self.output_size[0]),
            dtype=np.float32)

        for idx, sk in enumerate(self.skeleton):
            count = np.zeros((self.output_size[1], self.output_size[0]),
                             dtype=np.float32)

            for p in joints:
                src = p[sk[0]]
                dst = p[sk[1]]
                if src[2] > 0 and dst[2] > 0:
                    self._accumulate_paf_map_(pafs[2 * idx:2 * idx + 2],
                                              src[:2], dst[:2], count)

            pafs[2 * idx:2 * idx + 2] /= np.maximum(count, 1)

        return pafs
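

# Usage sketch (illustrative skeleton and shapes): each limb contributes two
# channels (unit-vector x and y components), averaged where people overlap.
#
#     skeleton = [[0, 1], [1, 2]]           # hypothetical 2-limb skeleton
#     paf_gen = PAFGenerator(output_size=128, limb_width=1, skeleton=skeleton)
#     joints = np.zeros((1, 3, 3), dtype=np.float32)
#     joints[0, 0] = (10, 10, 1)
#     joints[0, 1] = (40, 10, 1)            # horizontal limb 0 -> 1
#     pafs = paf_gen(joints)                # shape (4, 128, 128)
#     # pafs[0] ~= 1 within limb_width of the segment; pafs[1] ~= 0 there;
#     # limb 1 is skipped because joint 2 is not visible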


@PIPELINES.register_module()
class BottomUpRandomFlip:
    """Data augmentation with random image flip for bottom-up.

    Args:
        flip_prob (float): Probability of flip.
    """

    def __init__(self, flip_prob=0.5):
        self.flip_prob = flip_prob

    def __call__(self, results):
        """Perform data augmentation with random image flip."""
        image, mask, joints = results['img'], results['mask'], results[
            'joints']
        self.flip_index = results['ann_info']['flip_index']
        self.output_size = results['ann_info']['heatmap_size']

        assert isinstance(mask, list)
        assert isinstance(joints, list)
        assert len(mask) == len(joints)
        assert len(mask) == len(self.output_size)

        if np.random.random() < self.flip_prob:
            image = image[:, ::-1].copy()
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = np.array([_output_size, _output_size],
                                            dtype=int)
                mask[i] = mask[i][:, ::-1].copy()
                joints[i] = joints[i][:, self.flip_index]
                joints[i][:, :, 0] = _output_size[0] - joints[i][:, :, 0] - 1
        results['img'], results['mask'], results[
            'joints'] = image, mask, joints
        return results
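

# Minimal sketch of the expected `results` layout (keys taken from the code
# above; the concrete shapes and values are hypothetical):
#
#     results = dict(
#         img=np.zeros((256, 256, 3), dtype=np.uint8),
#         mask=[np.ones((64, 64)), np.ones((128, 128))],
#         joints=[np.zeros((1, 3, 3)), np.zeros((1, 3, 3))],
#         ann_info=dict(flip_index=[0, 2, 1],   # swap keypoints 1 and 2
#                       heatmap_size=[64, 128]))
#     results = BottomUpRandomFlip(flip_prob=1.0)(results)
#     # img/mask are mirrored and joint x-coordinates become (w - x - 1)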


@PIPELINES.register_module()
class BottomUpRandomAffine:
    """Data augmentation with random scaling & rotating.

    Args:
        rot_factor (int): Rotating to [-rot_factor, rot_factor] degrees.
        scale_factor (tuple[float]): Scaling range [min_scale, max_scale];
            the augmentation scale is sampled uniformly from this interval.
        scale_type (str): wrt ``long`` or ``short`` length of the image.
        trans_factor (float): Translation factor.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self,
                 rot_factor,
                 scale_factor,
                 scale_type,
                 trans_factor,
                 use_udp=False):
        self.max_rotation = rot_factor
        self.min_scale = scale_factor[0]
        self.max_scale = scale_factor[1]
        self.scale_type = scale_type
        self.trans_factor = trans_factor
        self.use_udp = use_udp

    def _get_scale(self, image_size, resized_size):
        w, h = image_size
        w_resized, h_resized = resized_size
        if w / w_resized < h / h_resized:
            if self.scale_type == 'long':
                w_pad = h / h_resized * w_resized
                h_pad = h
            elif self.scale_type == 'short':
                w_pad = w
                h_pad = w / w_resized * h_resized
            else:
                raise ValueError(f'Unknown scale type: {self.scale_type}')
        else:
            if self.scale_type == 'long':
                w_pad = w
                h_pad = w / w_resized * h_resized
            elif self.scale_type == 'short':
                w_pad = h / h_resized * w_resized
                h_pad = h
            else:
                raise ValueError(f'Unknown scale type: {self.scale_type}')

        scale = np.array([w_pad, h_pad], dtype=np.float32)

        return scale

    def __call__(self, results):
        """Perform data augmentation with random scaling & rotating."""
        image, mask, joints = results['img'], results['mask'], results[
            'joints']

        self.input_size = results['ann_info']['image_size']
        if not isinstance(self.input_size, np.ndarray):
            self.input_size = np.array(self.input_size)
        if self.input_size.size > 1:
            assert len(self.input_size) == 2
        else:
            self.input_size = [self.input_size, self.input_size]
        self.output_size = results['ann_info']['heatmap_size']

        assert isinstance(mask, list)
        assert isinstance(joints, list)
        assert len(mask) == len(joints)
        assert len(mask) == len(self.output_size), (len(mask),
                                                    len(self.output_size),
                                                    self.output_size)

        height, width = image.shape[:2]
        if self.use_udp:
            center = np.array(((width - 1.0) / 2, (height - 1.0) / 2))
        else:
            center = np.array((width / 2, height / 2))

        img_scale = np.array([width, height], dtype=np.float32)
        aug_scale = np.random.random() * (self.max_scale - self.min_scale) \
            + self.min_scale
        img_scale *= aug_scale
        aug_rot = (np.random.random() * 2 - 1) * self.max_rotation

        if self.trans_factor > 0:
            dx = np.random.randint(-self.trans_factor * img_scale[0] / 200.0,
                                   self.trans_factor * img_scale[0] / 200.0)
            dy = np.random.randint(-self.trans_factor * img_scale[1] / 200.0,
                                   self.trans_factor * img_scale[1] / 200.0)

            center[0] += dx
            center[1] += dy
        if self.use_udp:
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = [_output_size, _output_size]

                scale = self._get_scale(img_scale, _output_size)

                trans = get_warp_matrix(
                    theta=aug_rot,
                    size_input=center * 2.0,
                    size_dst=np.array(
                        (_output_size[0], _output_size[1]),
                        dtype=np.float32) - 1.0,
                    size_target=scale)
                mask[i] = cv2.warpAffine(
                    (mask[i] * 255).astype(np.uint8),
                    trans, (int(_output_size[0]), int(_output_size[1])),
                    flags=cv2.INTER_LINEAR) / 255
                mask[i] = (mask[i] > 0.5).astype(np.float32)
                joints[i][:, :, 0:2] = \
                    warp_affine_joints(joints[i][:, :, 0:2].copy(), trans)
                if results['ann_info']['scale_aware_sigma']:
                    joints[i][:, :, 3] = joints[i][:, :, 3] / aug_scale

            scale = self._get_scale(img_scale, self.input_size)
            mat_input = get_warp_matrix(
                theta=aug_rot,
                size_input=center * 2.0,
                size_dst=np.array((self.input_size[0], self.input_size[1]),
                                  dtype=np.float32) - 1.0,
                size_target=scale)
            image = cv2.warpAffine(
                image,
                mat_input, (int(self.input_size[0]), int(self.input_size[1])),
                flags=cv2.INTER_LINEAR)
        else:
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = [_output_size, _output_size]

                scale = self._get_scale(img_scale, _output_size)
                mat_output = get_affine_transform(
                    center=center,
                    scale=scale / 200.0,
                    rot=aug_rot,
                    output_size=_output_size)
                mask[i] = cv2.warpAffine(
                    (mask[i] * 255).astype(np.uint8), mat_output,
                    (int(_output_size[0]), int(_output_size[1]))) / 255
                mask[i] = (mask[i] > 0.5).astype(np.float32)

                joints[i][:, :, 0:2] = \
                    warp_affine_joints(joints[i][:, :, 0:2], mat_output)
                if results['ann_info']['scale_aware_sigma']:
                    joints[i][:, :, 3] = joints[i][:, :, 3] / aug_scale

            scale = self._get_scale(img_scale, self.input_size)
            mat_input = get_affine_transform(
                center=center,
                scale=scale / 200.0,
                rot=aug_rot,
                output_size=self.input_size)
            image = cv2.warpAffine(image, mat_input, (int(
                self.input_size[0]), int(self.input_size[1])))

        results['img'], results['mask'], results[
            'joints'] = image, mask, joints

        return results
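

# Typical pipeline configuration (values illustrative, in the style of the
# MMPose bottom-up configs):
#
#     dict(
#         type='BottomUpRandomAffine',
#         rot_factor=30,
#         scale_factor=[0.75, 1.5],
#         scale_type='short',
#         trans_factor=40)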


@PIPELINES.register_module()
class BottomUpGenerateHeatmapTarget:
    """Generate multi-scale heatmap target for bottom-up.

    Args:
        sigma (int): Sigma of heatmap Gaussian.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, sigma, use_udp=False):
        self.sigma = sigma
        self.use_udp = use_udp

    def _generate(self, num_joints, heatmap_size):
        """Get heatmap generator."""
        heatmap_generator = [
            HeatmapGenerator(output_size, num_joints, self.sigma, self.use_udp)
            for output_size in heatmap_size
        ]
        return heatmap_generator

    def __call__(self, results):
        """Generate multi-scale heatmap target for bottom-up."""
        heatmap_generator = \
            self._generate(results['ann_info']['num_joints'],
                           results['ann_info']['heatmap_size'])
        target_list = list()
        joints_list = results['joints']

        for scale_id in range(results['ann_info']['num_scales']):
            heatmaps = heatmap_generator[scale_id](joints_list[scale_id])
            target_list.append(heatmaps.astype(np.float32))
        results['target'] = target_list

        return results
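

# Typical pipeline configuration (illustrative); one HeatmapGenerator is
# built per entry of `ann_info['heatmap_size']`:
#
#     dict(type='BottomUpGenerateHeatmapTarget', sigma=2)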


@PIPELINES.register_module()
class BottomUpGenerateTarget:
    """Generate multi-scale heatmap target for associative embedding.

    Args:
        sigma (int): Sigma of heatmap Gaussian.
        max_num_people (int): Maximum number of people in an image.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, sigma, max_num_people, use_udp=False):
        self.sigma = sigma
        self.max_num_people = max_num_people
        self.use_udp = use_udp

    def _generate(self, num_joints, heatmap_size):
        """Get heatmap generator and joint encoder."""
        heatmap_generator = [
            HeatmapGenerator(output_size, num_joints, self.sigma, self.use_udp)
            for output_size in heatmap_size
        ]
        joints_encoder = [
            JointsEncoder(self.max_num_people, num_joints, output_size, True)
            for output_size in heatmap_size
        ]
        return heatmap_generator, joints_encoder

    def __call__(self, results):
        """Generate multi-scale heatmap target for bottom-up."""
        heatmap_generator, joints_encoder = \
            self._generate(results['ann_info']['num_joints'],
                           results['ann_info']['heatmap_size'])
        target_list = list()
        mask_list, joints_list = results['mask'], results['joints']

        for scale_id in range(results['ann_info']['num_scales']):
            target_t = heatmap_generator[scale_id](joints_list[scale_id])
            joints_t = joints_encoder[scale_id](joints_list[scale_id])

            target_list.append(target_t.astype(np.float32))
            mask_list[scale_id] = mask_list[scale_id].astype(np.float32)
            joints_list[scale_id] = joints_t.astype(np.int32)

        results['masks'], results['joints'] = mask_list, joints_list
        results['targets'] = target_list

        return results
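

# Typical pipeline configuration (illustrative); besides the heatmaps in
# `results['targets']`, this step rewrites `results['joints']` into the
# encoded (flattened index, score) form consumed by the associative
# embedding loss:
#
#     dict(type='BottomUpGenerateTarget', sigma=2, max_num_people=30)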


@PIPELINES.register_module()
class BottomUpGeneratePAFTarget:
    """Generate multi-scale part affinity field (PAF) targets for bottom-up.
    Paper ref: Cao et al. Realtime Multi-Person 2D Human Pose Estimation
    using Part Affinity Fields (CVPR 2017).

    Args:
        limb_width (int): Limb width of part affinity fields.
        skeleton (list[list], optional): Connections of joints. If None,
            the skeleton in ``results['ann_info']`` is used.
    """

    def __init__(self, limb_width, skeleton=None):
        self.limb_width = limb_width
        self.skeleton = skeleton

    def _generate(self, heatmap_size, skeleton):
        """Get PAF generator."""
        paf_generator = [
            PAFGenerator(output_size, self.limb_width, skeleton)
            for output_size in heatmap_size
        ]
        return paf_generator

    def __call__(self, results):
        """Generate multi-scale part affinity fields for bottom-up."""
        if self.skeleton is None:
            assert results['ann_info']['skeleton'] is not None
            self.skeleton = results['ann_info']['skeleton']

        paf_generator = \
            self._generate(results['ann_info']['heatmap_size'],
                           self.skeleton)
        target_list = list()
        joints_list = results['joints']

        for scale_id in range(results['ann_info']['num_scales']):
            pafs = paf_generator[scale_id](joints_list[scale_id])
            target_list.append(pafs.astype(np.float32))

        results['target'] = target_list

        return results
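

# Illustrative configuration; when `skeleton` is omitted it is read from
# `results['ann_info']['skeleton']` at call time:
#
#     dict(type='BottomUpGeneratePAFTarget', limb_width=1)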


@PIPELINES.register_module()
class BottomUpGetImgSize:
    """Get multi-scale image sizes for bottom-up, including base_size and
    test_scale_factor. The aspect ratio is preserved and the image is resized
    to ``results['ann_info']['image_size'] * current_scale``.

    Args:
        test_scale_factor (list[float]): Multi scale factors.
        current_scale (int): Current scale, default 1.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, test_scale_factor, current_scale=1, use_udp=False):
        self.test_scale_factor = test_scale_factor
        self.min_scale = min(test_scale_factor)
        self.current_scale = current_scale
        self.use_udp = use_udp

    def __call__(self, results):
        """Get multi-scale image sizes for bottom-up."""
        input_size = results['ann_info']['image_size']
        if not isinstance(input_size, np.ndarray):
            input_size = np.array(input_size)
        if input_size.size > 1:
            assert len(input_size) == 2
        else:
            input_size = np.array([input_size, input_size], dtype=int)
        img = results['img']

        # the base size, center and scale for the current scale come from the
        # shared helper instead of duplicating its branching logic here
        base_size, center, scale = _get_multi_scale_size(
            img, input_size, self.current_scale, self.min_scale, self.use_udp)

        results['ann_info']['test_scale_factor'] = self.test_scale_factor
        results['ann_info']['base_size'] = base_size
        results['ann_info']['center'] = center
        results['ann_info']['scale'] = scale

        return results
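

# Illustrative test-time configuration; the computed base size, center and
# scale are stored into `results['ann_info']` for the steps that follow:
#
#     dict(type='BottomUpGetImgSize', test_scale_factor=[0.5, 1.0, 2.0])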


@PIPELINES.register_module()
class BottomUpResizeAlign:
    """Resize multi-scale size and align transform for bottom-up.

    Args:
        transforms (list): ToTensor & Normalize.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, transforms, use_udp=False):
        self.transforms = Compose(transforms)
        if use_udp:
            self._resize_align_multi_scale = _resize_align_multi_scale_udp
        else:
            self._resize_align_multi_scale = _resize_align_multi_scale

    def __call__(self, results):
        """Resize multi-scale size and align transform for bottom-up."""
        input_size = results['ann_info']['image_size']
        if not isinstance(input_size, np.ndarray):
            input_size = np.array(input_size)
        if input_size.size > 1:
            assert len(input_size) == 2
        else:
            input_size = np.array([input_size, input_size], dtype=int)
        test_scale_factor = results['ann_info']['test_scale_factor']
        aug_data = []

        for s in sorted(test_scale_factor, reverse=True):
            _results = results.copy()
            image_resized, _, _ = self._resize_align_multi_scale(
                _results['img'], input_size, s, min(test_scale_factor))
            _results['img'] = image_resized
            _results = self.transforms(_results)
            transformed_img = _results['img'].unsqueeze(0)
            aug_data.append(transformed_img)

        results['ann_info']['aug_data'] = aug_data

        return results
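

# Illustrative test pipeline fragment combining the two test-time steps
# above (transform names follow the MMPose pipeline registry; normalization
# values are hypothetical):
#
#     test_pipeline = [
#         dict(type='BottomUpGetImgSize', test_scale_factor=[1]),
#         dict(
#             type='BottomUpResizeAlign',
#             transforms=[
#                 dict(type='ToTensor'),
#                 dict(
#                     type='NormalizeTensor',
#                     mean=[0.485, 0.456, 0.406],
#                     std=[0.229, 0.224, 0.225]),
#             ]),
#     ]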