|
import itertools |
|
import json |
|
import logging |
|
import os |
|
import time |
|
from collections import OrderedDict |
|
|
|
import numpy as np |
|
from .detools.box import DetBox |
|
from pycocotools.cocoeval import COCOeval |
|
from pycocotools.coco import COCO |
|
import pycocotools.mask as mask_util |
|
import torch |
|
import torch.nn as nn |
|
import torch.distributed as dist |
|
from torch.nn import functional as F |
|
from pathlib import Path |
|
|
|
import cv2, math |
|
from tqdm import tqdm |
|
from multiprocessing import Queue, Process |
|
from scipy.optimize import linear_sum_assignment |
|
from core.data.datasets.images import peddet_dataset_v2 as peddet_dataset |
|
import torch.distributed as dist |
|
|
|
from .seg_tester_dev import DatasetEvaluator |
|
|
|
|
|
|
|
PERSON_CLASSES = ['background', 'person'] |
|
|
|
class PedDetEvaluator(DatasetEvaluator): |
|
""" |
|
Evaluate Pedestrain Detection metrics |
|
""" |
|
|
|
def __init__( |
|
self, |
|
dataset_name, |
|
config, |
|
distributed=True, |
|
output_dir=None, |
|
): |
|
""" |
|
Args: |
|
dataset_name (str): name of the dataset to be evaluated. |
|
distributed (bool): if True, will collect results from all ranks for evaluation. |
|
Otherwise, will evaluate the results in the current process. |
|
output_dir (str): an output directory to dump results. |
|
num_classes, ignore_label: deprecated argument |
|
""" |
|
self._logger = logging.getLogger(__name__) |
|
|
|
self._dataset_name = dataset_name |
|
self._distributed = distributed |
|
self._output_dir = output_dir |
|
|
|
self._cpu_device = torch.device("cpu") |
|
self._thr = config.tester.kwargs.pos_thr |
|
self._gt_path = config.tester.kwargs.gt_path if config.tester.kwargs.gt_path.startswith('/mnt') else str((Path(peddet_dataset.__file__).parent / 'resources' / config.tester.kwargs.gt_path).resolve()) |
|
|
|
def reset(self): |
|
self._predictions = [] |
|
|
|
def process(self, inputs, outputs): |
|
""" |
|
Args: |
|
inputs: the inputs to a model |
|
It is a list of dicts. Each dict corresponds to an image and |
|
contains keys like "height", "width", "file_name". |
|
outputs: the outputs of a model. It is list of dicts with key "logits" and "bbox" |
|
""" |
|
orig_target_sizes = inputs["orig_size"] |
|
filtered_outputs = [self.deplicate(r, self._thr) for r in outputs] |
|
|
|
results = [{k:v.cpu().numpy() for k, v in r.items()} for r in filtered_outputs] |
|
dtboxes = [np.hstack([r['boxes'], r['scores'][:, np.newaxis]]) for r in results] |
|
dtboxes = [self.boxes_dump(db) for db in dtboxes] |
|
filenames = inputs['filename'] |
|
res = [{'ID':name, 'dtboxes':db} for name, db in zip(filenames, dtboxes)] |
|
assert len(res) == len(outputs) |
|
self._predictions.extend(res) |
|
|
|
def deplicate(self, record, thr): |
|
|
|
assert 'scores' in record |
|
names = [k for (k, v) in record.items()] |
|
flag = record['scores'] >= thr |
|
for name in names: |
|
record[name] = record[name][flag] |
|
return record |
|
|
|
@staticmethod |
|
def all_gather(data): |
|
""" |
|
Run all_gather on arbitrary picklable data (not necessarily tensors) |
|
Args: |
|
data: any picklable object |
|
Returns: |
|
list[data]: list of data gathered from each rank |
|
""" |
|
world_size = dist.get_world_size() |
|
if world_size == 1: |
|
return [data] |
|
|
|
|
|
buffer = pickle.dumps(data) |
|
storage = torch.ByteStorage.from_buffer(buffer) |
|
tensor = torch.ByteTensor(storage).to("cuda") |
|
|
|
|
|
local_size = torch.tensor([tensor.numel()], device="cuda") |
|
size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)] |
|
dist.all_gather(size_list, local_size) |
|
size_list = [int(size.item()) for size in size_list] |
|
max_size = max(size_list) |
|
|
|
|
|
|
|
|
|
tensor_list = [] |
|
for _ in size_list: |
|
tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda")) |
|
if local_size != max_size: |
|
padding = torch.empty(size=(max_size - local_size,), dtype=torch.uint8, device="cuda") |
|
tensor = torch.cat((tensor, padding), dim=0) |
|
dist.all_gather(tensor_list, tensor) |
|
|
|
data_list = [] |
|
for size, tensor in zip(size_list, tensor_list): |
|
buffer = tensor.cpu().numpy().tobytes()[:size] |
|
data_list.append(pickle.loads(buffer)) |
|
|
|
return data_list |
|
|
|
def evaluate(self): |
|
""" |
|
: return: "AP", "MR", "JI", "Recall" |
|
""" |
|
if self._distributed: |
|
torch.cuda.synchronize() |
|
predictions_list = self.all_gather(self._predictions) |
|
if dist.get_rank() != 0: |
|
return |
|
|
|
proxy_number = time.time() |
|
file_path = os.path.join('tmp', 'CrowdHuman', f'Iter-{proxy_number}.human') |
|
self.save_results(self._predictions, file_path) |
|
eval_results = self._evaluate_predictions_on_crowdhuman(self._gt_path, file_path) |
|
|
|
res = {} |
|
metric_names = ["AP", "MR", "JI", "Recall"] |
|
for k, v in zip(metric_names, eval_results): |
|
print(f"{k}: {v}") |
|
res[k] = v |
|
results = OrderedDict({"pedestrain_detection": res}) |
|
self._logger.info(results) |
|
os.remove(file_path) |
|
print(f"{file_path} deleted") |
|
|
|
return results |
|
|
|
def save_results(self, content, fpath): |
|
os.makedirs(os.path.dirname(fpath), exist_ok=True) |
|
print("save results to {}".format(fpath)) |
|
with open(fpath,'w') as fid: |
|
for db in content: |
|
line = json.dumps(db)+'\n' |
|
fid.write(line) |
|
|
|
def boxes_dump(self, dtboxes): |
|
|
|
n, boxes = dtboxes.shape[0], [] |
|
for i in range(n): |
|
db = np.float64(dtboxes[i,:]) |
|
dbox = DetBox(db[0], db[1], db[2]-db[0], |
|
db[3]-db[1], tag = 1, score = db[4]) |
|
boxes.append(dbox.dumpOdf()) |
|
return boxes |
|
|
|
def _evaluate_predictions_on_crowdhuman(self, gt_path, dt_path, target_key="box", mode=0): |
|
""" |
|
Evaluate the coco results using COCOEval API. |
|
""" |
|
database = Database(gt_path, dt_path, target_key, None, mode) |
|
database.compare() |
|
AP, recall, data = database.eval_AP() |
|
mMR, _ = database.eval_MR(fppiX=data[-2], fppiY=data[-1]) |
|
return AP, mMR, computeJaccard(gt_path, dt_path), recall |
|
|
|
class PedDetMAEEvaluator(PedDetEvaluator): |
|
def process(self, inputs, outputs): |
|
""" |
|
Args: |
|
inputs: the inputs to a model |
|
It is a list of dicts. Each dict corresponds to an image and |
|
contains keys like "height", "width", "file_name". |
|
outputs: the outputs of a model. It is list of dicts with key "logits" and "bbox" |
|
""" |
|
orig_target_sizes = inputs["orig_size"] |
|
import pdb; |
|
|
|
filtered_outputs = [self.deplicate(r, self._thr) for r in outputs['pred']] |
|
|
|
results = [{k:v.cpu().numpy() for k, v in r.items()} for r in filtered_outputs] |
|
dtboxes = [np.hstack([r['boxes'], r['scores'][:, np.newaxis]]) for r in results] |
|
dtboxes = [self.boxes_dump(db) for db in dtboxes] |
|
filenames = inputs['filename'] |
|
res = [{'ID':name, 'dtboxes':db} for name, db in zip(filenames, dtboxes)] |
|
assert len(res) == len(outputs['pred']) |
|
self._predictions.extend(res) |
|
|
|
|
|
|
|
def evaluate(self): |
|
""" |
|
: return: "AP", "MR", "JI", "Recall" |
|
""" |
|
if self._distributed: |
|
torch.cuda.synchronize() |
|
predictions_list = self.all_gather(self._predictions) |
|
if dist.get_rank() != 0: |
|
return |
|
|
|
proxy_number = time.time() |
|
file_path = os.path.join('tmp', 'CrowdHuman', f'Iter-{proxy_number}.human') |
|
self.save_results(self._predictions, file_path) |
|
eval_results = self._evaluate_predictions_on_crowdhuman(self._gt_path, file_path) |
|
|
|
res = {} |
|
metric_names = ["AP", "MR", "JI", "Recall"] |
|
for k, v in zip(metric_names, eval_results): |
|
print(f"{k}: {v}") |
|
res[k] = v |
|
results = OrderedDict({"pedestrian_detection": res}) |
|
self._logger.info(results) |
|
os.remove(file_path) |
|
print(f"{file_path} deleted") |
|
|
|
return results |
|
|
|
def _evaluate_predictions_on_crowdhuman(self, gt_path, dt_path, target_key="box", mode=0): |
|
""" |
|
Evaluate the coco results using COCOEval API. |
|
""" |
|
database = Database(gt_path, dt_path, target_key, None, mode) |
|
database.compare() |
|
AP, recall, data = database.eval_AP() |
|
mMR, _ = database.eval_MR(fppiX=data[-2], fppiY=data[-1]) |
|
return AP, mMR, 0, recall |
|
|
|
class Image(object): |
|
def __init__(self, mode): |
|
self.ID = None |
|
self._width = None |
|
self._height = None |
|
self.dtboxes = None |
|
self.gtboxes = None |
|
self.eval_mode = mode |
|
|
|
self._ignNum = None |
|
self._gtNum = None |
|
self._dtNum = None |
|
|
|
def load(self, record, body_key, head_key, class_names, gtflag): |
|
""" |
|
:meth: read the object from a dict |
|
""" |
|
if "ID" in record and self.ID is None: |
|
self.ID = record['ID'] |
|
if "width" in record and self._width is None: |
|
self._width = record["width"] |
|
if "height" in record and self._height is None: |
|
self._height = record["height"] |
|
if gtflag: |
|
self._gtNum = len(record["gtboxes"]) |
|
body_bbox, head_bbox = self.load_gt_boxes(record, 'gtboxes', class_names) |
|
if self.eval_mode == 0: |
|
self.gtboxes = body_bbox |
|
self._ignNum = (body_bbox[:, -1] == -1).sum() |
|
elif self.eval_mode == 1: |
|
self.gtboxes = head_bbox |
|
self._ignNum = (head_bbox[:, -1] == -1).sum() |
|
elif self.eval_mode == 2: |
|
gt_tag = np.array( |
|
[body_bbox[i, -1] != -1 and head_bbox[i, -1] != -1 |
|
for i in range(len(body_bbox))] |
|
) |
|
self._ignNum = (gt_tag == 0).sum() |
|
self.gtboxes = np.hstack( |
|
(body_bbox[:, :-1], head_bbox[:, :-1], gt_tag.reshape(-1, 1)) |
|
) |
|
else: |
|
raise Exception('Unknown evaluation mode!') |
|
if not gtflag: |
|
self._dtNum = len(record["dtboxes"]) |
|
if self.eval_mode == 0: |
|
self.dtboxes = self.load_det_boxes(record, 'dtboxes', body_key, 'score') |
|
elif self.eval_mode == 1: |
|
self.dtboxes = self.load_det_boxes(record, 'dtboxes', head_key, 'score') |
|
elif self.eval_mode == 2: |
|
body_dtboxes = self.load_det_boxes(record, 'dtboxes', body_key) |
|
head_dtboxes = self.load_det_boxes(record, 'dtboxes', head_key, 'score') |
|
self.dtboxes = np.hstack((body_dtboxes, head_dtboxes)) |
|
else: |
|
raise Exception('Unknown evaluation mode!') |
|
|
|
def compare_caltech(self, thres): |
|
""" |
|
:meth: match the detection results with the groundtruth by Caltech matching strategy |
|
:param thres: iou threshold |
|
:type thres: float |
|
:return: a list of tuples (dtbox, imageID), in the descending sort of dtbox.score |
|
""" |
|
if self.dtboxes is None or self.gtboxes is None: |
|
return list() |
|
|
|
dtboxes = self.dtboxes if self.dtboxes is not None else list() |
|
gtboxes = self.gtboxes if self.gtboxes is not None else list() |
|
|
|
dtboxes = np.array(sorted(dtboxes, key=lambda x: x[-1], reverse=True)) |
|
gtboxes = np.array(sorted(gtboxes, key=lambda x: x[-1], reverse=True)) |
|
if len(dtboxes): |
|
overlap_iou = self.box_overlap_opr(dtboxes, gtboxes[gtboxes[:, -1] > 0], True) |
|
overlap_ioa = self.box_overlap_opr(dtboxes, gtboxes[gtboxes[:, -1] <= 0], False) |
|
ign = np.any(overlap_ioa > thres, 1) |
|
pos = np.any(overlap_iou > thres, 1) |
|
else: |
|
return list() |
|
|
|
scorelist = list() |
|
for i, dt in enumerate(dtboxes): |
|
maxpos = np.argmax(overlap_iou[i]) |
|
if overlap_iou[i, maxpos] > thres: |
|
overlap_iou[:, maxpos] = 0 |
|
scorelist.append((dt, 1, self.ID, pos[i])) |
|
elif not ign[i]: |
|
scorelist.append((dt, 0, self.ID, pos[i])) |
|
return scorelist |
|
|
|
def compare_caltech_union(self, thres): |
|
""" |
|
:meth: match the detection results with the groundtruth by Caltech matching strategy |
|
:param thres: iou threshold |
|
:type thres: float |
|
:return: a list of tuples (dtbox, imageID), in the descending sort of dtbox.score |
|
""" |
|
dtboxes = self.dtboxes if self.dtboxes is not None else list() |
|
gtboxes = self.gtboxes if self.gtboxes is not None else list() |
|
if len(dtboxes) == 0: |
|
return list() |
|
dt_matched = np.zeros(dtboxes.shape[0]) |
|
gt_matched = np.zeros(gtboxes.shape[0]) |
|
|
|
dtboxes = np.array(sorted(dtboxes, key=lambda x: x[-1], reverse=True)) |
|
gtboxes = np.array(sorted(gtboxes, key=lambda x: x[-1], reverse=True)) |
|
dt_body_boxes = np.hstack((dtboxes[:, :4], dtboxes[:, -1][:, None])) |
|
dt_head_boxes = dtboxes[:, 4:8] |
|
gt_body_boxes = np.hstack((gtboxes[:, :4], gtboxes[:, -1][:, None])) |
|
gt_head_boxes = gtboxes[:, 4:8] |
|
overlap_iou = self.box_overlap_opr(dt_body_boxes, gt_body_boxes, True) |
|
overlap_head = self.box_overlap_opr(dt_head_boxes, gt_head_boxes, True) |
|
overlap_ioa = self.box_overlap_opr(dt_body_boxes, gt_body_boxes, False) |
|
|
|
scorelist = list() |
|
for i, dt in enumerate(dtboxes): |
|
maxpos = -1 |
|
maxiou = thres |
|
for j, gt in enumerate(gtboxes): |
|
if gt_matched[j] == 1: |
|
continue |
|
if gt[-1] > 0: |
|
o_body = overlap_iou[i][j] |
|
o_head = overlap_head[i][j] |
|
if o_body > maxiou and o_head > maxiou: |
|
maxiou = o_body |
|
maxpos = j |
|
else: |
|
if maxpos >= 0: |
|
break |
|
else: |
|
o_body = overlap_ioa[i][j] |
|
if o_body > thres: |
|
maxiou = o_body |
|
maxpos = j |
|
if maxpos >= 0: |
|
if gtboxes[maxpos, -1] > 0: |
|
gt_matched[maxpos] = 1 |
|
dt_matched[i] = 1 |
|
scorelist.append((dt, 1, self.ID)) |
|
else: |
|
dt_matched[i] = -1 |
|
else: |
|
dt_matched[i] = 0 |
|
scorelist.append((dt, 0, self.ID)) |
|
return scorelist |
|
|
|
def box_overlap_opr(self, dboxes: np.ndarray, gboxes: np.ndarray, if_iou) -> np.ndarray: |
|
eps = 1e-6 |
|
assert dboxes.shape[-1] >= 4 and gboxes.shape[-1] >= 4 |
|
N, K = dboxes.shape[0], gboxes.shape[0] |
|
dtboxes = np.tile(np.expand_dims(dboxes, axis=1), (1, K, 1)) |
|
gtboxes = np.tile(np.expand_dims(gboxes, axis=0), (N, 1, 1)) |
|
|
|
iw = (np.minimum(dtboxes[:, :, 2], gtboxes[:, :, 2]) |
|
- np.maximum(dtboxes[:, :, 0], gtboxes[:, :, 0])) |
|
ih = (np.minimum(dtboxes[:, :, 3], gtboxes[:, :, 3]) |
|
- np.maximum(dtboxes[:, :, 1], gtboxes[:, :, 1])) |
|
inter = np.maximum(0, iw) * np.maximum(0, ih) |
|
|
|
dtarea = (dtboxes[:, :, 2] - dtboxes[:, :, 0]) * (dtboxes[:, :, 3] - dtboxes[:, :, 1]) |
|
if if_iou: |
|
gtarea = (gtboxes[:, :, 2] - gtboxes[:, :, 0]) * (gtboxes[:, :, 3] - gtboxes[:, :, 1]) |
|
ious = inter / (dtarea + gtarea - inter + eps) |
|
else: |
|
ious = inter / (dtarea + eps) |
|
return ious |
|
|
|
def clip_all_boader(self): |
|
|
|
def _clip_boundary(boxes, height, width): |
|
assert boxes.shape[-1] >= 4 |
|
boxes[:, 0] = np.minimum(np.maximum(boxes[:, 0], 0), width - 1) |
|
boxes[:, 1] = np.minimum(np.maximum(boxes[:, 1], 0), height - 1) |
|
boxes[:, 2] = np.maximum(np.minimum(boxes[:, 2], width), 0) |
|
boxes[:, 3] = np.maximum(np.minimum(boxes[:, 3], height), 0) |
|
return boxes |
|
|
|
assert self.dtboxes.shape[-1] >= 4 |
|
assert self.gtboxes.shape[-1] >= 4 |
|
assert self._width is not None and self._height is not None |
|
if self.eval_mode == 2: |
|
self.dtboxes[:, :4] = _clip_boundary(self.dtboxes[:, :4], self._height, self._width) |
|
self.gtboxes[:, :4] = _clip_boundary(self.gtboxes[:, :4], self._height, self._width) |
|
self.dtboxes[:, 4:8] = _clip_boundary(self.dtboxes[:, 4:8], self._height, self._width) |
|
self.gtboxes[:, 4:8] = _clip_boundary(self.gtboxes[:, 4:8], self._height, self._width) |
|
else: |
|
self.dtboxes = _clip_boundary(self.dtboxes, self._height, self._width) |
|
self.gtboxes = _clip_boundary(self.gtboxes, self._height, self._width) |
|
|
|
def load_gt_boxes(self, dict_input, key_name, class_names): |
|
assert key_name in dict_input |
|
if len(dict_input[key_name]) < 1: |
|
return np.empty([0, 5]) |
|
head_bbox = [] |
|
body_bbox = [] |
|
for rb in dict_input[key_name]: |
|
if rb['tag'] in class_names: |
|
body_tag = class_names.index(rb['tag']) |
|
head_tag = 1 |
|
else: |
|
body_tag = -1 |
|
head_tag = -1 |
|
if 'extra' in rb: |
|
if 'ignore' in rb['extra']: |
|
if rb['extra']['ignore'] != 0: |
|
body_tag = -1 |
|
head_tag = -1 |
|
if 'head_attr' in rb: |
|
if 'ignore' in rb['head_attr']: |
|
if rb['head_attr']['ignore'] != 0: |
|
head_tag = -1 |
|
|
|
body_bbox.append((*rb['fbox'], body_tag)) |
|
|
|
|
|
body_bbox = np.array(body_bbox) |
|
body_bbox[:, 2:4] += body_bbox[:, :2] |
|
return body_bbox, head_bbox |
|
|
|
def load_det_boxes(self, dict_input, key_name, key_box, key_score=None, key_tag=None): |
|
assert key_name in dict_input |
|
if len(dict_input[key_name]) < 1: |
|
return np.empty([0, 5]) |
|
else: |
|
assert key_box in dict_input[key_name][0] |
|
if key_score: |
|
assert key_score in dict_input[key_name][0] |
|
if key_tag: |
|
assert key_tag in dict_input[key_name][0] |
|
if key_score: |
|
if key_tag: |
|
bboxes = np.vstack( |
|
[ |
|
np.hstack( |
|
(rb[key_box], rb[key_score], rb[key_tag]) |
|
) for rb in dict_input[key_name] |
|
] |
|
) |
|
else: |
|
bboxes = np.array([(*rb[key_box], rb[key_score]) for rb in dict_input[key_name]]) |
|
else: |
|
if key_tag: |
|
bboxes = np.vstack( |
|
[np.hstack((rb[key_box], rb[key_tag])) for rb in dict_input[key_name]] |
|
) |
|
else: |
|
bboxes = np.vstack([rb[key_box] for rb in dict_input[key_name]]) |
|
bboxes[:, 2:4] += bboxes[:, :2] |
|
return bboxes |
|
|
|
def compare_voc(self, thres): |
|
""" |
|
:meth: match the detection results with the groundtruth by VOC matching strategy |
|
:param thres: iou threshold |
|
:type thres: float |
|
:return: a list of tuples (dtbox, imageID), in the descending sort of dtbox.score |
|
""" |
|
if self.dtboxes is None: |
|
return list() |
|
dtboxes = self.dtboxes |
|
gtboxes = self.gtboxes if self.gtboxes is not None else list() |
|
dtboxes.sort(key=lambda x: x.score, reverse=True) |
|
gtboxes.sort(key=lambda x: x.ign) |
|
|
|
scorelist = list() |
|
for i, dt in enumerate(dtboxes): |
|
maxpos = -1 |
|
maxiou = thres |
|
|
|
for j, gt in enumerate(gtboxes): |
|
overlap = dt.iou(gt) |
|
if overlap > maxiou: |
|
maxiou = overlap |
|
maxpos = j |
|
|
|
if maxpos >= 0: |
|
if gtboxes[maxpos].ign == 0: |
|
gtboxes[maxpos].matched = 1 |
|
dtboxes[i].matched = 1 |
|
scorelist.append((dt, self.ID)) |
|
else: |
|
dtboxes[i].matched = -1 |
|
else: |
|
dtboxes[i].matched = 0 |
|
scorelist.append((dt, self.ID)) |
|
return scorelist |
|
|
|
class Database(object): |
|
def __init__(self, gtpath=None, dtpath=None, body_key=None, head_key=None, mode=0): |
|
""" |
|
mode=0: only body; mode=1: only head |
|
""" |
|
self.images = dict() |
|
self.eval_mode = mode |
|
self.loadData(gtpath, body_key, head_key, if_gt=True) |
|
self.loadData(dtpath, body_key, head_key, if_gt=False) |
|
|
|
self._ignNum = sum([self.images[i]._ignNum for i in self.images]) |
|
self._gtNum = sum([self.images[i]._gtNum for i in self.images]) |
|
self._imageNum = len(self.images) |
|
self.scorelist = None |
|
|
|
def loadData(self, fpath, body_key=None, head_key=None, if_gt=True): |
|
assert os.path.isfile(fpath), fpath + " does not exist!" |
|
with open(fpath, "r") as f: |
|
lines = f.readlines() |
|
records = [json.loads(line.strip('\n')) for line in lines] |
|
if if_gt: |
|
records = records[0] |
|
for record in records: |
|
self.images[record["ID"]] = Image(self.eval_mode) |
|
self.images[record["ID"]].load(record, body_key, head_key, PERSON_CLASSES, True) |
|
else: |
|
for record in records: |
|
self.images[record["ID"]].load(record, body_key, head_key, PERSON_CLASSES, False) |
|
self.images[record["ID"]].clip_all_boader() |
|
|
|
def compare(self, thres=0.5, matching=None): |
|
""" |
|
match the detection results with the groundtruth in the whole database |
|
""" |
|
assert matching is None or matching == "VOC", matching |
|
scorelist = list() |
|
for ID in self.images: |
|
if matching == "VOC": |
|
result = self.images[ID].compare_voc(thres) |
|
else: |
|
result = self.images[ID].compare_caltech(thres) |
|
scorelist.extend(result) |
|
|
|
scorelist.sort(key=lambda x: x[0][-1], reverse=True) |
|
self.scorelist = scorelist |
|
|
|
def eval_MR(self, ref="CALTECH_-2", fppiX=None, fppiY=None): |
|
""" |
|
evaluate by Caltech-style log-average miss rate |
|
ref: str - "CALTECH_-2"/"CALTECH_-4" |
|
""" |
|
|
|
def _find_gt(lst, target): |
|
for idx, item in enumerate(lst): |
|
if item >= target: |
|
return idx |
|
return len(lst) - 1 |
|
|
|
assert ref == "CALTECH_-2" or ref == "CALTECH_-4", ref |
|
if ref == "CALTECH_-2": |
|
|
|
ref = [0.0100, 0.0178, 0.03160, 0.0562, 0.1000, 0.1778, 0.3162, 0.5623, 1.000] |
|
else: |
|
|
|
ref = [0.0001, 0.0003, 0.00100, 0.0032, 0.0100, 0.0316, 0.1000, 0.3162, 1.000] |
|
|
|
if self.scorelist is None: |
|
self.compare() |
|
|
|
tp, fp = 0.0, 0.0 |
|
if fppiX is None or fppiY is None: |
|
fppiX, fppiY = list(), list() |
|
for i, item in enumerate(self.scorelist): |
|
if item[1] == 1: |
|
tp += 1.0 |
|
elif item[1] == 0: |
|
fp += 1.0 |
|
|
|
fn = (self._gtNum - self._ignNum) - tp |
|
recall = tp / (tp + fn) |
|
missrate = 1.0 - recall |
|
fppi = fp / self._imageNum |
|
fppiX.append(fppi) |
|
fppiY.append(missrate) |
|
|
|
score = list() |
|
for pos in ref: |
|
argmin = _find_gt(fppiX, pos) |
|
if argmin >= 0: |
|
score.append(fppiY[argmin]) |
|
score = np.array(score) |
|
MR = np.exp(np.log(score).mean()) |
|
return MR, (fppiX, fppiY) |
|
|
|
def eval_AP(self): |
|
""" |
|
:meth: evaluate by average precision |
|
""" |
|
|
|
def _calculate_map(recall, precision): |
|
assert len(recall) == len(precision) |
|
area = 0 |
|
for i in range(1, len(recall)): |
|
delta_h = (precision[i - 1] + precision[i]) / 2 |
|
delta_w = recall[i] - recall[i - 1] |
|
area += delta_w * delta_h |
|
return area |
|
|
|
tp, fp, dp = 0.0, 0.0, 0.0 |
|
rpX, rpY = list(), list() |
|
total_gt = self._gtNum - self._ignNum |
|
total_images = self._imageNum |
|
|
|
fpn = [] |
|
dpn = [] |
|
recalln = [] |
|
thr = [] |
|
fppi = [] |
|
mr = [] |
|
for i, item in enumerate(self.scorelist): |
|
if item[1] == 1: |
|
tp += 1.0 |
|
elif item[1] == 0: |
|
fp += 1.0 |
|
dp += item[-1] |
|
fn = total_gt - tp |
|
recall = tp / (tp + fn) |
|
precision = tp / (tp + fp) |
|
rpX.append(recall) |
|
rpY.append(precision) |
|
fpn.append(fp) |
|
dpn.append(dp) |
|
recalln.append(tp) |
|
thr.append(item[0][-1]) |
|
fppi.append(fp / total_images) |
|
mr.append(1 - recall) |
|
|
|
AP = _calculate_map(rpX, rpY) |
|
return AP, recall, (rpX, rpY, thr, fpn, dpn, recalln, fppi, mr) |
|
|
|
def computeJaccard(gt_path, dt_path): |
|
dt = load_func(dt_path) |
|
gt = load_func(gt_path) |
|
ji = 0. |
|
for i in range(1, 10): |
|
results = common_process(worker, dt, 1, gt, i * 0.1, 0.5) |
|
ji = max(ji, np.sum([rb['ratio'] for rb in results]) / 4370) |
|
return ji |
|
|
|
def load_func(fpath): |
|
assert os.path.exists(fpath) |
|
|
|
with open(fpath,'r') as fid: |
|
lines = fid.readlines() |
|
records = [json.loads(line.strip('\n')) for line in lines] |
|
if len(records) == 1: records = records[0] |
|
return records |
|
|
|
def worker(result_queue, records, gt, score_thr, bm_thr): |
|
|
|
total, eps = len(records), 1e-6 |
|
for i in range(total): |
|
record = records[i] |
|
ID = record['ID'] |
|
|
|
|
|
if len(record['dtboxes']) < 1: |
|
result_queue.put_nowait(None) |
|
continue |
|
|
|
GT = list(filter(lambda rb:rb['ID'] == ID, gt)) |
|
if len(GT) < 1: |
|
result_queue.put_nowait(None) |
|
continue |
|
|
|
GT = GT[0] |
|
if 'height' in record and 'width' in record: |
|
height, width = record['height'], record['width'] |
|
else: |
|
height, width = GT['height'], GT['width'] |
|
flags = np.array([is_ignore(rb) for rb in GT['gtboxes']]) |
|
rows = np.where(~flags)[0] |
|
ignores = np.where(flags)[0] |
|
|
|
gtboxes = np.vstack([GT['gtboxes'][j]['fbox'] for j in rows]) |
|
gtboxes = recover_func(gtboxes) |
|
gtboxes = clip_boundary(gtboxes, height, width) |
|
|
|
if ignores.size: |
|
ignores = np.vstack([GT['gtboxes'][j]['fbox'] for j in ignores]) |
|
ignores = recover_func(ignores) |
|
ignores = clip_boundary(ignores, height, width) |
|
|
|
dtboxes = np.vstack([np.hstack([rb['box'], rb['score']]) for rb in record['dtboxes']]) |
|
dtboxes = recover_func(dtboxes) |
|
dtboxes = clip_boundary(dtboxes, height, width) |
|
rows = np.where(dtboxes[:,-1]> score_thr)[0] |
|
dtboxes = dtboxes[rows,...] |
|
|
|
matches = compute_JC(dtboxes, gtboxes, bm_thr) |
|
dt_ign, gt_ign = 0, 0 |
|
|
|
if ignores.size: |
|
indices = np.array([j for (j,_) in matches]) |
|
dt_ign = get_ignores(indices, dtboxes, ignores, bm_thr) |
|
indices = np.array([j for (_,j) in matches]) |
|
gt_ign = get_ignores(indices, gtboxes, ignores, bm_thr) |
|
|
|
k = len(matches) |
|
m = gtboxes.shape[0] - gt_ign |
|
n = dtboxes.shape[0] - dt_ign |
|
|
|
ratio = k / (m + n -k + eps) |
|
recall = k / (m + eps) |
|
cover = k / (n + eps) |
|
noise = 1 - cover |
|
|
|
result_dict = dict(ID = ID, ratio = ratio, recall = recall , noise = noise , |
|
cover = cover, k= k ,n = n, m = m) |
|
result_queue.put_nowait(result_dict) |
|
|
|
def common_process(func, data, nr_procs, *args): |
|
|
|
total = len(data) |
|
stride = math.ceil(total / nr_procs) |
|
result_queue = Queue(10000) |
|
results, procs = [], [] |
|
tqdm.monitor_interval = 0 |
|
pbar = tqdm(total = total, leave = False, ascii = True) |
|
for i in range(nr_procs): |
|
start = i*stride |
|
end = np.min([start+stride,total]) |
|
sample_data = data[start:end] |
|
|
|
|
|
p = Process(target= func,args=(result_queue, sample_data, *args)) |
|
p.start() |
|
procs.append(p) |
|
|
|
for i in range(total): |
|
|
|
t = result_queue.get() |
|
if t is None: |
|
pbar.update(1) |
|
continue |
|
results.append(t) |
|
pbar.update() |
|
for p in procs: |
|
p.join() |
|
return results |
|
|
|
def recover_func(bboxes): |
|
|
|
assert bboxes.shape[1]>=4 |
|
bboxes[:, 2:4] += bboxes[:,:2] |
|
return bboxes |
|
|
|
def clip_boundary(dtboxes,height,width): |
|
|
|
assert dtboxes.shape[-1]>=4 |
|
dtboxes[:,0] = np.minimum(np.maximum(dtboxes[:,0],0), width - 1) |
|
dtboxes[:,1] = np.minimum(np.maximum(dtboxes[:,1],0), height - 1) |
|
dtboxes[:,2] = np.maximum(np.minimum(dtboxes[:,2],width), 0) |
|
dtboxes[:,3] = np.maximum(np.minimum(dtboxes[:,3],height), 0) |
|
return dtboxes |
|
|
|
def get_ignores(indices, boxes, ignores, ioa_thr): |
|
|
|
indices = list(set(np.arange(boxes.shape[0])) - set(indices)) |
|
rboxes = boxes[indices, :] |
|
ioas = compute_ioa_matrix(rboxes, ignores) |
|
ioas = np.max(ioas, axis = 1) |
|
rows = np.where(ioas > ioa_thr)[0] |
|
return rows.size |
|
|
|
def compute_ioa_matrix(dboxes: np.ndarray, gboxes: np.ndarray): |
|
|
|
assert dboxes.shape[-1] >= 4 and gboxes.shape[-1] >= 4 |
|
N, K = dboxes.shape[0], gboxes.shape[0] |
|
eps = 1e-6 |
|
dtboxes = np.tile(np.expand_dims(dboxes, axis = 1), (1, K, 1)) |
|
gtboxes = np.tile(np.expand_dims(gboxes, axis = 0), (N, 1, 1)) |
|
|
|
iw = np.minimum(dtboxes[:,:,2], gtboxes[:,:,2]) - np.maximum(dtboxes[:,:,0], gtboxes[:,:,0]) |
|
ih = np.minimum(dtboxes[:,:,3], gtboxes[:,:,3]) - np.maximum(dtboxes[:,:,1], gtboxes[:,:,1]) |
|
inter = np.maximum(0, iw) * np.maximum(0, ih) |
|
|
|
dtarea = np.maximum(dtboxes[:,:,2] - dtboxes[:,:,0], 0) * np.maximum(dtboxes[:,:,3] - dtboxes[:,:,1], 0) |
|
ioas = inter / (dtarea + eps) |
|
return ioas |
|
|
|
def is_ignore(record): |
|
|
|
flag = False |
|
if 'extra' in record: |
|
if 'ignore' in record['extra']: |
|
flag = True if record['extra']['ignore'] else False |
|
return flag |
|
|
|
def compute_iou_matrix(dboxes:np.ndarray, gboxes:np.ndarray): |
|
|
|
assert dboxes.shape[-1] >= 4 and gboxes.shape[-1] >= 4 |
|
eps = 1e-6 |
|
N, K = dboxes.shape[0], gboxes.shape[0] |
|
dtboxes = np.tile(np.expand_dims(dboxes, axis = 1), (1, K, 1)) |
|
gtboxes = np.tile(np.expand_dims(gboxes, axis = 0), (N, 1, 1)) |
|
|
|
iw = np.minimum(dtboxes[:,:,2], gtboxes[:,:,2]) - np.maximum(dtboxes[:,:,0], gtboxes[:,:,0]) |
|
ih = np.minimum(dtboxes[:,:,3], gtboxes[:,:,3]) - np.maximum(dtboxes[:,:,1], gtboxes[:,:,1]) |
|
inter = np.maximum(0, iw) * np.maximum(0, ih) |
|
|
|
dtarea = (dtboxes[:,:,2] - dtboxes[:,:,0]) * (dtboxes[:,:,3] - dtboxes[:,:,1]) |
|
gtarea = (gtboxes[:,:,2] - gtboxes[:,:,0]) * (gtboxes[:,:,3] - gtboxes[:,:,1]) |
|
ious = inter / (dtarea + gtarea - inter + eps) |
|
return ious |
|
|
|
def compute_lap(dtboxes, gtboxes, thr): |
|
|
|
eps = 1e-7 |
|
n, k = dtboxes.shape[0], gtboxes.shape[0] |
|
if k + n < 2: |
|
m, n = np.array([]), np.array([]) |
|
return m, n |
|
|
|
overlaps = compute_iou_matrix(dtboxes, gtboxes) |
|
|
|
if n < 2: |
|
cols = np.argmax(overlaps, axis = 1) |
|
rows = np.array([0]) |
|
m, n = (rows, cols) if thr - overlaps[rows, cols] < eps else (np.array([]), np.array([])) |
|
return m, n |
|
|
|
if k < 2: |
|
|
|
rows = np.argmax(overlaps, axis = 0) |
|
cols = np.array([0]) |
|
m,n = (rows, cols) if thr - overlaps[rows, cols] < eps else (np.array([]), np.array([])) |
|
return m, n |
|
|
|
ious = overlaps * (overlaps >= thr) |
|
|
|
matches = minimumWeightMatching(-ious) |
|
m, n = np.array([i for i, _ in matches]).astype(np.int32), np.array([i for _, i in matches]).astype(np.int32) |
|
indice = np.where(overlaps[m, n] < thr)[0] |
|
|
|
if indice.size >= m.size: |
|
m, n = np.array([]), np.array([]) |
|
else: |
|
index = np.array(list(set(np.arange(m.size)) - set(indice))).astype(np.int) |
|
m, n = m[index], n[index] |
|
|
|
return m, n |
|
|
|
def minimumWeightMatching(costSet : np.ndarray) -> list: |
|
''' |
|
Computes a minimum-weight matching in a bipartite graph |
|
(A union B, E). |
|
costSet: |
|
An (m x n)-matrix of real values, where costSet[i, j] |
|
is the cost of matching the i:th vertex in A to the j:th |
|
vertex of B. A value of numpy.inf is allowed, and is |
|
interpreted as missing the (i, j)-edge. |
|
returns: |
|
A minimum-weight matching given as a list of pairs (i, j), |
|
denoting that the i:th vertex of A be paired with the j:th |
|
vertex of B. |
|
''' |
|
|
|
m, n = costSet.shape |
|
nMax = max(m, n) |
|
|
|
|
|
|
|
|
|
|
|
costSet_ = np.full((nMax, nMax), np.inf) |
|
|
|
mask = costSet < 0 |
|
costSet_[:m, :n][mask] = costSet[mask] |
|
assert costSet_.shape[0] == costSet_.shape[1] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
practicalInfinity = 2 * costSet[costSet < np.inf].max() + 10 |
|
except ValueError: |
|
|
|
|
|
practicalInfinity = 1 |
|
|
|
|
|
costSet_[costSet_ == np.inf] = practicalInfinity |
|
|
|
|
|
iSet, jSet = linear_sum_assignment(costSet_) |
|
assert len(iSet) == len(jSet) |
|
|
|
|
|
indices = [(iSet[k], jSet[k]) |
|
for k in range(len(iSet)) |
|
if costSet_[iSet[k], jSet[k]] != practicalInfinity] |
|
|
|
return indices |
|
|
|
|
|
def compute_JC(detection:np.ndarray, gt:np.ndarray, iou_thresh:np.ndarray): |
|
|
|
rows, cols = compute_lap(detection, gt, iou_thresh) |
|
return [(i, j) for i, j in zip(rows, cols)] |
|
|