|
|
|
import os |
|
import warnings |
|
from argparse import ArgumentParser |
|
|
|
import cv2 |
|
import mmcv as mmcv |
|
|
|
from mmpose.apis import (collect_multi_frames, inference_top_down_pose_model, |
|
init_pose_model, vis_pose_tracking_result) |
|
from mmpose.core import Smoother |
|
from mmpose.datasets import DatasetInfo |
|
|
|
# mmtrack is an optional dependency at import time: record whether it is
# available so that main() can fail with a clear message instead of the
# module failing on import.
try:
    from mmtrack.apis import inference_mot
    from mmtrack.apis import init_model as init_tracking_model
except ImportError:
    # ModuleNotFoundError is a subclass of ImportError, so catching
    # ImportError alone covers both cases.
    has_mmtrack = False
else:
    has_mmtrack = True
|
|
|
|
|
def process_mmtracking_results(mmtracking_results):
    """Convert mmtracking output into a list of per-person tracking dicts.

    The tracks are read from the first entry of ``'track_bboxes'`` when
    that key is present, otherwise from the first entry of
    ``'track_results'``.

    :param mmtracking_results: mapping holding the tracker output; it must
        contain ``'track_bboxes'`` or ``'track_results'``. Each track is an
        indexable row whose first element is the track id.
    :return: a list of tracked bounding boxes, one dict per track with the
        keys ``'track_id'`` (the first element as ``int``) and ``'bbox'``
        (the remaining elements of the row).
    :raises KeyError: if neither of the two supported keys is present.
    """
    # Both key names are accepted; 'track_bboxes' takes precedence.
    for key in ('track_bboxes', 'track_results'):
        if key in mmtracking_results:
            tracking_results = mmtracking_results[key][0]
            break
    else:
        # Without this guard the code below would fail with an opaque
        # UnboundLocalError on `tracking_results`.
        raise KeyError(
            "mmtracking results contain neither 'track_bboxes' nor "
            "'track_results'")

    return [{
        'track_id': int(track[0]),
        'bbox': track[1:]
    } for track in tracking_results]
|
|
|
|
|
def _build_arg_parser():
    """Create the command-line parser for the pose tracking demo."""
    parser = ArgumentParser()
    parser.add_argument('tracking_config', help='Config file for tracking')
    parser.add_argument('pose_config', help='Config file for pose')
    parser.add_argument('pose_checkpoint', help='Checkpoint file for pose')
    parser.add_argument('--video-path', type=str, help='Video path')
    parser.add_argument(
        '--show',
        action='store_true',
        default=False,
        help='whether to show visualizations.')
    parser.add_argument(
        '--out-video-root',
        default='',
        help='Root of the output video file. '
        'Default not saving the visualization video.')
    parser.add_argument(
        '--device', default='cuda:0', help='Device used for inference')
    parser.add_argument(
        '--bbox-thr',
        type=float,
        default=0.3,
        help='Bounding box score threshold')
    parser.add_argument(
        '--kpt-thr', type=float, default=0.3, help='Keypoint score threshold')
    parser.add_argument(
        '--radius',
        type=int,
        default=4,
        help='Keypoint radius for visualization')
    parser.add_argument(
        '--thickness',
        type=int,
        default=1,
        help='Link thickness for visualization')
    parser.add_argument(
        '--smooth',
        action='store_true',
        help='Apply a temporal filter to smooth the pose estimation results. '
        'See also --smooth-filter-cfg.')
    parser.add_argument(
        '--smooth-filter-cfg',
        type=str,
        default='configs/_base_/filters/one_euro.py',
        help='Config file of the filter to smooth the pose estimation '
        'results. See also --smooth.')
    # The adjacent string literals below end with a space so the rendered
    # help text does not run words together.
    parser.add_argument(
        '--use-multi-frames',
        action='store_true',
        default=False,
        help='whether to use multi frames for inference in the pose '
        'estimation stage. Default: False.')
    parser.add_argument(
        '--online',
        action='store_true',
        default=False,
        help='inference mode. If set to True, can not use future frame '
        'information when using multi frames for inference in the pose '
        'estimation stage. Default: False.')
    return parser


def main():
    """Run the pose tracking demo on a video.

    For every frame, an mmtracking model produces tracked person boxes, a
    top-down pose model estimates keypoints inside those boxes, and the
    result is drawn on the frame. The rendered frames are shown in a
    window (``--show``) and/or written to
    ``<out-video-root>/vis_<video file name>`` (``--out-video-root``).

    :raises ImportError: if mmtrack could not be imported.
    :raises OSError: if the input video cannot be opened.
    :raises KeyError: if ``--use-multi-frames`` is set but the pose config
        has no ``frame_indices_test`` entry in its test ``data_cfg``.
    """
    parser = _build_arg_parser()

    # Explicit raises instead of asserts: asserts vanish under ``python -O``.
    if not has_mmtrack:
        raise ImportError('Please install mmtrack to run the demo.')

    args = parser.parse_args()

    if not args.show and args.out_video_root == '':
        parser.error('nothing to do: pass --show and/or --out-video-root')
    if args.video_path is None:
        parser.error('--video-path is required')

    print('Initializing model...')
    device = args.device.lower()
    # The tracker is built from its config alone; None is passed in the
    # checkpoint position.
    tracking_model = init_tracking_model(
        args.tracking_config, None, device=device)
    pose_model = init_pose_model(
        args.pose_config, args.pose_checkpoint, device=device)

    dataset = pose_model.cfg.data['test']['type']
    dataset_info = pose_model.cfg.data['test'].get('dataset_info', None)
    if dataset_info is None:
        warnings.warn(
            'Please set `dataset_info` in the config. '
            'Check https://github.com/open-mmlab/mmpose/pull/663 for details.',
            DeprecationWarning)
    else:
        dataset_info = DatasetInfo(dataset_info)

    # Read the input video.
    video = mmcv.VideoReader(args.video_path)
    if not video.opened:
        raise OSError(f'Failed to load video file {args.video_path}')

    # Frame offsets used to gather neighbouring frames for multi-frame
    # pose inference.
    indices = None
    if args.use_multi_frames:
        data_cfg = pose_model.cfg.data.test.data_cfg
        if 'frame_indices_test' not in data_cfg:
            raise KeyError(
                '--use-multi-frames requires `frame_indices_test` in the '
                'test data_cfg of the pose config')
        indices = data_cfg['frame_indices_test']

    # Optional temporal smoothing of the estimated keypoints.
    if args.smooth:
        smoother = Smoother(filter_cfg=args.smooth_filter_cfg, keypoint_dim=2)
    else:
        smoother = None

    # Heatmaps and intermediate layer outputs are not requested.
    return_heatmap = False
    output_layer_names = None

    # Created last so that nothing can fail between opening the writer and
    # entering the try/finally that releases it.
    video_writer = None
    if args.out_video_root != '':
        os.makedirs(args.out_video_root, exist_ok=True)
        out_path = os.path.join(args.out_video_root,
                                f'vis_{os.path.basename(args.video_path)}')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video_writer = cv2.VideoWriter(out_path, fourcc, video.fps,
                                       (video.width, video.height))

    print('Running inference...')
    try:
        for frame_id, cur_frame in enumerate(
                mmcv.track_iter_progress(video)):
            if args.use_multi_frames:
                pose_input = collect_multi_frames(video, frame_id, indices,
                                                  args.online)
            else:
                pose_input = cur_frame

            # Tracking always runs on the current frame only.
            mmtracking_results = inference_mot(
                tracking_model, cur_frame, frame_id=frame_id)

            # Keep the tracked person boxes together with their ids.
            person_results = process_mmtracking_results(mmtracking_results)

            # The second return value (layer outputs) is not used here.
            pose_results, _ = inference_top_down_pose_model(
                pose_model,
                pose_input,
                person_results,
                bbox_thr=args.bbox_thr,
                format='xyxy',
                dataset=dataset,
                dataset_info=dataset_info,
                return_heatmap=return_heatmap,
                outputs=output_layer_names)

            if smoother:
                pose_results = smoother.smooth(pose_results)

            # show=False: display is handled below through cv2.imshow.
            vis_frame = vis_pose_tracking_result(
                pose_model,
                cur_frame,
                pose_results,
                radius=args.radius,
                thickness=args.thickness,
                dataset=dataset,
                dataset_info=dataset_info,
                kpt_score_thr=args.kpt_thr,
                show=False)

            if args.show:
                cv2.imshow('Frame', vis_frame)

            if video_writer is not None:
                video_writer.write(vis_frame)

            # Pressing 'q' in the preview window stops processing early.
            if args.show and (cv2.waitKey(1) & 0xFF) == ord('q'):
                break
    finally:
        # Release the writer and close windows even if inference failed
        # part-way through the video.
        if video_writer is not None:
            video_writer.release()
        if args.show:
            cv2.destroyAllWindows()
|
|
|
|
|
# Script entry point: run the demo only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
|
|