import os

import cv2
import numpy as np
import yaml
from paddle.inference import Config, create_predictor, PrecisionType
from PIL import Image

from .download import get_model_path
from .preprocess import preprocess, Resize, NormalizeImage, Permute, PadStride, decode_image
from .visualize import draw_det


class Detector(object):
    """Object detector backed by a Paddle Inference predictor.

    The predictor is configured from ``configs/<model_name>.yml``. Keys read
    from that file:

    * ``model_path`` / ``param_path`` -- resolved through ``get_model_path``.
    * ``device`` (default ``'CPU'``), ``mode`` (default ``'paddle'``),
      ``cpu_threads`` (default ``1``), ``batch_size`` (default ``1``).
    * ``min_subgraph_size`` / ``use_dynamic_shape`` -- only read when ``mode``
      is one of ``trt_int8``, ``trt_fp32`` or ``trt_fp16``.
    * ``Preprocess`` -- list of op descriptions, each with a ``type`` key.
    * ``label_list`` and ``draw_threshold`` (default ``0.5``).
    """

    def __init__(self, model_name):
        """Load the YAML config for ``model_name`` and build the predictor.

        Args:
            model_name: Base name (without ``.yml``) of a file in the
                ``configs`` directory.

        Raises:
            OSError: If the config file cannot be opened.
            KeyError: If a required config key is missing.
        """
        # Two '..' components: the first strips this file's name, the second
        # moves one directory up; 'configs' is looked up from there.
        parent_path = os.path.abspath(os.path.join(__file__, *(['..'] * 2)))
        yml_file = os.path.join(parent_path,
                                'configs/{}.yml'.format(model_name))
        with open(yml_file, 'r') as f:
            yml_conf = yaml.safe_load(f)

        infer_model = get_model_path(yml_conf['model_path'])
        infer_params = get_model_path(yml_conf['param_path'])
        config = Config(infer_model, infer_params)

        device = yml_conf.get('device', 'CPU')
        run_mode = yml_conf.get('mode', 'paddle')
        cpu_threads = yml_conf.get('cpu_threads', 1)
        # ``batch_size`` was previously referenced without being defined, so
        # every TensorRT mode raised NameError. ``__call__`` always feeds a
        # single image, hence the default of 1.
        batch_size = yml_conf.get('batch_size', 1)

        # Any device value other than 'CPU' / 'GPU' leaves the device
        # settings of ``config`` untouched.
        if device == 'CPU':
            config.disable_gpu()
            config.set_cpu_math_library_num_threads(cpu_threads)
        elif device == 'GPU':
            # initial GPU memory(M), device ID
            config.enable_use_gpu(200, 0)
            # optimize graph and fuse op
            config.switch_ir_optim(True)

        precision_map = {
            'trt_int8': Config.Precision.Int8,
            'trt_fp32': Config.Precision.Float32,
            'trt_fp16': Config.Precision.Half
        }
        if run_mode in precision_map:
            config.enable_tensorrt_engine(
                workspace_size=(1 << 25) * batch_size,
                max_batch_size=batch_size,
                min_subgraph_size=yml_conf['min_subgraph_size'],
                precision_mode=precision_map[run_mode],
                use_static=True,
                use_calib_mode=False)

            if yml_conf['use_dynamic_shape']:
                min_input_shape = {
                    'image': [batch_size, 3, 640, 640],
                    'scale_factor': [batch_size, 2]
                }
                max_input_shape = {
                    'image': [batch_size, 3, 1280, 1280],
                    'scale_factor': [batch_size, 2]
                }
                opt_input_shape = {
                    'image': [batch_size, 3, 1024, 1024],
                    'scale_factor': [batch_size, 2]
                }
                config.set_trt_dynamic_shape_info(
                    min_input_shape, max_input_shape, opt_input_shape)

        # disable print log when predict
        config.disable_glog_info()
        # enable shared memory
        config.enable_memory_optim()
        # disable feed, fetch OP, needed by zero_copy_run
        config.switch_use_feed_fetch_ops(False)

        self.predictor = create_predictor(config)
        self.yml_conf = yml_conf
        self.preprocess_ops = self.create_preprocess_ops(yml_conf)
        self.input_names = self.predictor.get_input_names()
        self.output_names = self.predictor.get_output_names()
        self.draw_threshold = yml_conf.get('draw_threshold', 0.5)
        self.class_names = yml_conf['label_list']

    def create_preprocess_ops(self, yml_conf):
        """Instantiate the preprocessing ops listed under ``Preprocess``.

        Each entry is a mapping whose ``type`` names the op; the remaining
        items are passed to it as keyword arguments.

        Args:
            yml_conf: Parsed config containing a ``Preprocess`` list.

        Returns:
            list: The constructed op instances, in config order.
        """
        preprocess_ops = []
        for op_info in yml_conf['Preprocess']:
            # Copy so that popping 'type' does not mutate the stored config.
            new_op_info = op_info.copy()
            op_type = new_op_info.pop('type')
            # NOTE(security): eval() executes whatever expression the config
            # puts in 'type'. Only load config files from a trusted source.
            preprocess_ops.append(eval(op_type)(**new_op_info))
        return preprocess_ops

    def create_inputs(self, image_files):
        """Preprocess images and assemble the batched input arrays.

        Args:
            image_files: Sequence of inputs accepted by ``preprocess``; must
                be non-empty because the results are stacked.

        Returns:
            dict: float32 arrays under the keys ``im_shape``,
            ``scale_factor`` and ``image``, each stacked along a new
            leading axis.
        """
        im_list, im_info_list = [], []
        for im_path in image_files:
            im, im_info = preprocess(im_path, self.preprocess_ops)
            im_list.append(im)
            im_info_list.append(im_info)

        inputs = {}
        for key in ('im_shape', 'scale_factor'):
            inputs[key] = np.stack(
                [info[key] for info in im_info_list],
                axis=0).astype('float32')
        inputs['image'] = np.stack(im_list, axis=0).astype('float32')
        return inputs

    def __call__(self, image_file):
        """Run detection on one image and draw the boxes that were kept.

        Args:
            image_file: Either a path (``str``) or an ``np.ndarray`` image.

        Returns:
            tuple: ``(image, result)`` where ``image`` is the value returned
            by ``draw_det`` and ``result`` is ``{'bboxes': [...]}`` holding
            the kept rows as nested lists.

        Raises:
            TypeError: If ``image_file`` is neither ``str`` nor
                ``np.ndarray``.
        """
        # Reject unsupported inputs before running inference; previously
        # this surfaced afterwards as an unbound-local error on ``image``.
        if not isinstance(image_file, (str, np.ndarray)):
            raise TypeError(
                'image_file must be a file path (str) or a numpy.ndarray, '
                'got {}'.format(type(image_file).__name__))

        inputs = self.create_inputs([image_file])
        for name in self.input_names:
            input_tensor = self.predictor.get_input_handle(name)
            input_tensor.copy_from_cpu(inputs[name])

        self.predictor.run()

        boxes_tensor = self.predictor.get_output_handle(self.output_names[0])
        np_boxes = boxes_tensor.copy_to_cpu()
        boxes_num = self.predictor.get_output_handle(self.output_names[1])
        np_boxes_num = boxes_num.copy_to_cpu()
        if np_boxes_num.sum() <= 0:
            # Nothing detected: use an empty (0, 6) array so the filtering
            # below still works.
            np_boxes = np.zeros([0, 6])

        if isinstance(image_file, str):
            image = Image.open(image_file).convert('RGB')
        else:
            # ndarray input is handed to draw_det unchanged.
            # NOTE(review): confirm draw_det accepts ndarray images.
            image = image_file

        # Keep rows whose column 1 exceeds the draw threshold and whose
        # column 0 is greater than -1 (presumably score and class id --
        # TODO confirm against the model's output layout).
        expect_boxes = ((np_boxes[:, 1] > self.draw_threshold) &
                        (np_boxes[:, 0] > -1))
        np_boxes = np_boxes[expect_boxes, :]

        image = draw_det(image, np_boxes, self.class_names)
        return image, {'bboxes': np_boxes.tolist()}