import math
import os
import warnings
from glob import glob
from typing import Union
from functools import partial
from torch.utils.data import DataLoader
from prefetch_generator import BackgroundGenerator
import random
import itertools
import yaml
import argparse
import cv2
import numpy as np
import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.nn.init import _calculate_fan_in_and_fan_out, _no_grad_normal_
from torchvision.ops.boxes import batched_nms
from pathlib import Path
from .sync_batchnorm import SynchronizedBatchNorm2d
class Params:
    def __init__(self, project_file):
        self.params = yaml.safe_load(open(project_file).read())

    def __getattr__(self, item):
        return self.params.get(item, None)
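# Illustrative usage sketch (not part of the original module): Params exposes the keys of a
# project YAML file as attributes and returns None for keys that are absent. The YAML content
# and key names below are made-up placeholders, not a real project config from this repo.
def _example_params_usage():
    import tempfile

    with tempfile.NamedTemporaryFile('w', suffix='.yml', delete=False) as f:
        f.write('project_name: demo\nnum_gpus: 1\n')
        path = f.name
    params = Params(path)
    print(params.project_name)  # 'demo'
    print(params.obj_list)      # None (key not present in the YAML)
    os.remove(path)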
def save_checkpoint(ckpt, saved_path, name):
    if isinstance(ckpt, dict):
        if isinstance(ckpt['model'], CustomDataParallel):
            ckpt['model'] = ckpt['model'].module.model.state_dict()
            torch.save(ckpt, os.path.join(saved_path, name))
        else:
            ckpt['model'] = ckpt['model'].model.state_dict()
            torch.save(ckpt, os.path.join(saved_path, name))
    else:
        if isinstance(ckpt, CustomDataParallel):
            torch.save(ckpt.module.model.state_dict(), os.path.join(saved_path, name))
        else:
            torch.save(ckpt.model.state_dict(), os.path.join(saved_path, name))
def fitness(x):
    # Model fitness as a weighted combination of metrics
    w = [0.0, 0.0, 0.1, 0.9, 0.0, 0.0, 0.0]  # weights for [P, R, mAP@0.5, mAP@0.5:0.95, iou score, f1_score, loss]
    return (x[:, :] * w).sum(1)
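# Illustrative sketch (not from the original repo): `fitness` expects one row per checkpoint with
# the seven metrics listed above; with the default weights only mAP@0.5 (0.1) and mAP@0.5:0.95 (0.9)
# contribute. The metric values here are placeholders.
def _example_fitness_usage():
    metrics = np.array([[0.70, 0.60, 0.55, 0.35, 0.5, 0.64, 1.2],
                        [0.72, 0.58, 0.60, 0.33, 0.5, 0.64, 1.1]])
    print(fitness(metrics))  # [0.37  0.357] -> the first checkpoint scores higher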
def invert_affine(metas: Union[float, list, tuple], preds):
    for i in range(len(preds)):
        if len(preds[i]['rois']) == 0:
            continue
        else:
            if isinstance(metas, float):
                preds[i]['rois'][:, [0, 2]] = preds[i]['rois'][:, [0, 2]] / metas
                preds[i]['rois'][:, [1, 3]] = preds[i]['rois'][:, [1, 3]] / metas
            else:
                new_w, new_h, old_w, old_h, padding_w, padding_h = metas[i]
                preds[i]['rois'][:, [0, 2]] = preds[i]['rois'][:, [0, 2]] / (new_w / old_w)
                preds[i]['rois'][:, [1, 3]] = preds[i]['rois'][:, [1, 3]] / (new_h / old_h)
    return preds
def aspectaware_resize_padding_edited(image, width, height, interpolation=None, means=None):
    # Simplified variant: resizes straight to 640x384 with INTER_AREA (no aspect-ratio-preserving
    # padding), so the interpolation/means arguments are effectively ignored.
    old_h, old_w, c = image.shape
    new_h = height
    new_w = width
    padding_h = 0
    padding_w = 0
    image = cv2.resize(image, (640, 384), interpolation=cv2.INTER_AREA)
    return image, new_w, new_h, old_w, old_h, padding_w, padding_h
def aspectaware_resize_padding(image, width, height, interpolation=None, means=None):
    old_h, old_w, c = image.shape
    if old_w > old_h:
        new_w = width
        new_h = int(width / old_w * old_h)
    else:
        new_w = int(height / old_h * old_w)
        new_h = height

    canvas = np.zeros((height, width, c), np.float32)
    if means is not None:
        canvas[...] = means

    if new_w != old_w or new_h != old_h:
        if interpolation is None:
            image = cv2.resize(image, (new_w, new_h))
        else:
            image = cv2.resize(image, (new_w, new_h), interpolation=interpolation)

    padding_h = height - new_h
    padding_w = width - new_w

    if c > 1:
        canvas[:new_h, :new_w] = image
    else:
        if len(image.shape) == 2:
            canvas[:new_h, :new_w, 0] = image
        else:
            canvas[:new_h, :new_w] = image

    return canvas, new_w, new_h, old_w, old_h, padding_w, padding_h
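# Illustrative sketch (not part of the original module): resize a dummy 720x1280 frame into a
# 640x384 padded canvas and recover the metadata that `invert_affine` consumes later. The input
# resolution is an arbitrary placeholder.
def _example_aspectaware_resize():
    frame = np.random.randint(0, 255, (720, 1280, 3), dtype=np.uint8).astype(np.float32)
    canvas, new_w, new_h, old_w, old_h, pad_w, pad_h = aspectaware_resize_padding(frame, 640, 384)
    print(canvas.shape, (new_w, new_h), (pad_w, pad_h))  # (384, 640, 3) (640, 360) (0, 24)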
def preprocess(image_path, max_size=512, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    ori_imgs = [cv2.imread(str(img_path)) for img_path in image_path]
    normalized_imgs = [(img[..., ::-1] / 255 - mean) / std for img in ori_imgs]
    imgs_meta = [aspectaware_resize_padding_edited(img, 640, 384,
                                                   means=None, interpolation=cv2.INTER_AREA) for img in normalized_imgs]
    # imgs_meta = [aspectaware_resize_padding(img, max_size, max_size,
    #                                         means=None) for img in normalized_imgs]
    framed_imgs = [img_meta[0] for img_meta in imgs_meta]
    framed_metas = [img_meta[1:] for img_meta in imgs_meta]

    return ori_imgs, framed_imgs, framed_metas
def preprocess_video(*frame_from_video, max_size=512, mean=(0.406, 0.456, 0.485), std=(0.225, 0.224, 0.229)):
    ori_imgs = frame_from_video
    normalized_imgs = [(img[..., ::-1] / 255 - mean) / std for img in ori_imgs]
    imgs_meta = [aspectaware_resize_padding(img, 640, 384,
                                            means=None) for img in normalized_imgs]
    framed_imgs = [img_meta[0] for img_meta in imgs_meta]
    framed_metas = [img_meta[1:] for img_meta in imgs_meta]

    return ori_imgs, framed_imgs, framed_metas
def postprocess(x, anchors, regression, classification, regressBoxes, clipBoxes, threshold, iou_threshold):
    transformed_anchors = regressBoxes(anchors, regression)
    transformed_anchors = clipBoxes(transformed_anchors, x)
    scores = torch.max(classification, dim=2, keepdim=True)[0]
    scores_over_thresh = (scores > threshold)[:, :, 0]
    out = []
    for i in range(x.shape[0]):
        if scores_over_thresh[i].sum() == 0:
            out.append({
                'rois': np.array(()),
                'class_ids': np.array(()),
                'scores': np.array(()),
            })
            continue

        classification_per = classification[i, scores_over_thresh[i, :], ...].permute(1, 0)
        transformed_anchors_per = transformed_anchors[i, scores_over_thresh[i, :], ...]
        scores_per = scores[i, scores_over_thresh[i, :], ...]
        scores_, classes_ = classification_per.max(dim=0)
        anchors_nms_idx = batched_nms(transformed_anchors_per, scores_per[:, 0], classes_, iou_threshold=iou_threshold)

        if anchors_nms_idx.shape[0] != 0:
            classes_ = classes_[anchors_nms_idx]
            scores_ = scores_[anchors_nms_idx]
            boxes_ = transformed_anchors_per[anchors_nms_idx, :]

            out.append({
                'rois': boxes_.cpu().numpy(),
                'class_ids': classes_.cpu().numpy(),
                'scores': scores_.cpu().numpy(),
            })
        else:
            out.append({
                'rois': np.array(()),
                'class_ids': np.array(()),
                'scores': np.array(()),
            })

    return out
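# Illustrative sketch (not part of the original module): drive `postprocess` with random tensors
# of the shapes a detector head would produce. The anchor/regression/classification shapes and
# the thresholds are placeholders; in the real pipeline they come from the model and `Anchors`.
# `BBoxTransform` and `ClipBoxes` are defined further down in this file.
def _example_postprocess_usage():
    torch.manual_seed(0)
    num_anchors, num_classes = 100, 3
    x = torch.zeros(1, 3, 384, 640)                # input batch, only used for clipping
    anchors = torch.rand(1, num_anchors, 4) * 100  # (y1, x1, y2, x2)
    anchors[..., 2:] += anchors[..., :2]           # ensure y2 >= y1 and x2 >= x1
    regression = torch.zeros(1, num_anchors, 4)    # zero offsets keep the anchors unchanged
    classification = torch.rand(1, num_anchors, num_classes)
    preds = postprocess(x, anchors, regression, classification,
                        BBoxTransform(), ClipBoxes(),
                        threshold=0.5, iou_threshold=0.5)
    print(preds[0]['rois'].shape, preds[0]['class_ids'][:5])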
def replace_w_sync_bn(m):
    for var_name in dir(m):
        target_attr = getattr(m, var_name)
        if type(target_attr) == torch.nn.BatchNorm2d:
            num_features = target_attr.num_features
            eps = target_attr.eps
            momentum = target_attr.momentum
            affine = target_attr.affine

            # get parameters
            running_mean = target_attr.running_mean
            running_var = target_attr.running_var
            if affine:
                weight = target_attr.weight
                bias = target_attr.bias

            setattr(m, var_name,
                    SynchronizedBatchNorm2d(num_features, eps, momentum, affine))

            target_attr = getattr(m, var_name)
            # set parameters
            target_attr.running_mean = running_mean
            target_attr.running_var = running_var
            if affine:
                target_attr.weight = weight
                target_attr.bias = bias

    for var_name, children in m.named_children():
        replace_w_sync_bn(children)
class CustomDataParallel(nn.DataParallel):
    """
    force splitting data to all gpus instead of sending all data to cuda:0 and then moving around.
    """

    def __init__(self, module, num_gpus):
        super().__init__(module)
        self.num_gpus = num_gpus

    def scatter(self, inputs, kwargs, device_ids):
        # More like scatter and data prep at the same time. The point is we prep the data in such a way
        # that no scatter is necessary, and there's no need to shuffle stuff around different GPUs.
        devices = ['cuda:' + str(x) for x in range(self.num_gpus)]
        splits = inputs[0].shape[0] // self.num_gpus

        if splits == 0:
            raise Exception('Batchsize must be greater than num_gpus.')

        return [(inputs[0][splits * device_idx: splits * (device_idx + 1)].to(f'cuda:{device_idx}', non_blocking=True),
                 inputs[1][splits * device_idx: splits * (device_idx + 1)].to(f'cuda:{device_idx}', non_blocking=True),
                 inputs[2][splits * device_idx: splits * (device_idx + 1)].to(f'cuda:{device_idx}', non_blocking=True))
                for device_idx in range(len(devices))], \
               [kwargs] * len(devices)
def get_last_weights(weights_path):
    weights_path = glob(weights_path + '/*.pth')
    weights_path = sorted(weights_path,
                          key=lambda x: int(x.rsplit('_')[-1].rsplit('.')[0]),
                          reverse=True)[0]
    print(f'using weights {weights_path}')
    return weights_path
def init_weights(model):
    for name, module in model.named_modules():
        is_conv_layer = isinstance(module, nn.Conv2d)

        if is_conv_layer:
            if "conv_list" in name or "header" in name:
                variance_scaling_(module.weight.data)
            else:
                nn.init.kaiming_uniform_(module.weight.data)

            if module.bias is not None:
                if "classifier.header" in name:
                    bias_value = -np.log((1 - 0.01) / 0.01)
                    torch.nn.init.constant_(module.bias, bias_value)
                else:
                    module.bias.data.zero_()
def variance_scaling_(tensor, gain=1.):
    # type: (Tensor, float) -> Tensor
    r"""
    initializer for SeparableConv in Regressor/Classifier
    reference: https://keras.io/zh/initializers/  VarianceScaling
    """
    fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
    std = math.sqrt(gain / float(fan_in))

    return _no_grad_normal_(tensor, 0., std)
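# Illustrative sketch (not from the original repo): variance_scaling_ draws weights from
# N(0, sqrt(gain / fan_in)), i.e. Keras' VarianceScaling in fan_in mode. The layer below is a
# placeholder, not a layer from this model.
def _example_variance_scaling():
    conv = nn.Conv2d(64, 64, kernel_size=3, padding=1)
    variance_scaling_(conv.weight.data)
    # fan_in = 64 * 3 * 3 = 576, so the sample std should be close to sqrt(1 / 576) ~= 0.042
    print(conv.weight.std().item())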
def boolean_string(s):
    if s not in {'False', 'True'}:
        raise ValueError('Not a valid boolean string')
    return s == 'True'


def restricted_float(x):
    try:
        x = float(x)
    except ValueError:
        raise argparse.ArgumentTypeError("%r not a floating-point literal" % (x,))

    if x < 0.0 or x > 1.0:
        raise argparse.ArgumentTypeError("%r not in range [0.0, 1.0]" % (x,))
    return x
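# Illustrative sketch (not part of the original module): both helpers are meant to be used as
# argparse `type=` callbacks. The flag names and values below are placeholders.
def _example_argparse_usage():
    parser = argparse.ArgumentParser()
    parser.add_argument('--use_cuda', type=boolean_string, default=True)
    parser.add_argument('--conf_thres', type=restricted_float, default=0.25)
    args = parser.parse_args(['--use_cuda', 'False', '--conf_thres', '0.4'])
    print(args.use_cuda, args.conf_thres)  # False 0.4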
# --------------------------EVAL UTILS---------------------------
def process_batch(detections, labels, iou_thresholds):
    """
    Return correct predictions matrix. Both sets of boxes are in (x1, y1, x2, y2) format.
    Arguments:
        detections (Array[N, 6]), x1, y1, x2, y2, conf, class
        labels (Array[M, 5]), class, x1, y1, x2, y2
        iou_thresholds: list of iou thresholds from 0.5 -> 0.95
    Returns:
        correct (Array[N, 10]), for 10 IoU levels
    """
    labels = labels.to(detections.device)
    correct = torch.zeros(detections.shape[0], iou_thresholds.shape[0], dtype=torch.bool, device=iou_thresholds.device)
    iou = box_iou(labels[:, :4], detections[:, :4])
    x = torch.where((iou >= iou_thresholds[0]) & (labels[:, 4:5] == detections[:, 5]))
    if x[0].shape[0]:
        # [label, detection, iou]
        matches = torch.cat((torch.stack(x, 1), iou[x[0], x[1]][:, None]), 1).cpu().numpy()
        if x[0].shape[0] > 1:
            matches = matches[matches[:, 2].argsort()[::-1]]
            matches = matches[np.unique(matches[:, 1], return_index=True)[1]]
            matches = matches[np.unique(matches[:, 0], return_index=True)[1]]
        matches = torch.Tensor(matches).to(iou_thresholds.device)
        correct[matches[:, 1].long()] = matches[:, 2:3] >= iou_thresholds
    return correct
def box_iou(box1, box2):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Arguments:
        box1 (Tensor[N, 4])
        box2 (Tensor[M, 4])
    Returns:
        iou (Tensor[N, M]): the NxM matrix containing the pairwise
            IoU values for every element in boxes1 and boxes2
    """

    def box_area(box):
        # box = 4xn
        return (box[2] - box[0]) * (box[3] - box[1])

    box1 = box1.cuda()  # note: box1 is moved to GPU, so this helper assumes a CUDA device is available
    area1 = box_area(box1.T)
    area2 = box_area(box2.T)

    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    inter = (torch.min(box1[:, None, 2:], box2[:, 2:]) - torch.max(box1[:, None, :2], box2[:, :2])).clamp(0).prod(2)
    return inter / (area1[:, None] + area2 - inter)  # iou = inter / (area1 + area2 - inter)
def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
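# Illustrative sketch (not from the original repo): a 20x10 box centred at (50, 40) becomes its
# corner representation.
def _example_xywh2xyxy():
    boxes = np.array([[50.0, 40.0, 20.0, 10.0]])  # cx, cy, w, h
    print(xywh2xyxy(boxes))  # [[40. 35. 60. 45.]]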
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
    if len(coords) == 0:
        return []
    # Rescale coords (xyxy) from img1_shape to img0_shape
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    coords[:, [0, 2]] -= pad[0]  # x padding
    coords[:, [1, 3]] -= pad[1]  # y padding
    coords[:, :4] /= gain
    clip_coords(coords, img0_shape)
    return coords


def clip_coords(boxes, shape):
    # Clip xyxy bounding boxes to image shape (height, width)
    if isinstance(boxes, torch.Tensor):  # faster individually
        boxes[:, 0].clamp_(0, shape[1])  # x1
        boxes[:, 1].clamp_(0, shape[0])  # y1
        boxes[:, 2].clamp_(0, shape[1])  # x2
        boxes[:, 3].clamp_(0, shape[0])  # y2
    else:  # np.array (faster grouped)
        boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2
        boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2
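# Illustrative sketch (not part of the original module): map a box predicted on a letterboxed
# 384x640 input back to the original 720x1280 frame. The coordinates are made up.
def _example_scale_coords():
    coords = torch.tensor([[100.0, 50.0, 300.0, 200.0]])  # xyxy on the 384x640 network input
    restored = scale_coords((384, 640), coords.clone(), (720, 1280))
    # gain = 0.5 and a 12 px vertical pad, so the box maps to [[200., 76., 600., 376.]]
    print(restored)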
def ap_per_class(tp, conf, pred_cls, target_cls, plot=False, save_dir='precision-recall_curve.png', names=[]):
    """ Compute the average precision, given the recall and precision curves.
    Source: https://github.com/rafaelpadilla/Object-Detection-Metrics.
    # Arguments
        tp: True positives (nparray, nx1 or nx10).
        conf: Objectness value from 0-1 (nparray).
        pred_cls: Predicted object classes (nparray).
        target_cls: True object classes (nparray).
        plot: Plot precision-recall curve at mAP@0.5
        save_dir: Plot save directory
    # Returns
        The average precision as computed in py-faster-rcnn.
    """

    # Sort by objectness
    i = np.argsort(-conf)
    tp, conf, pred_cls = tp[i], conf[i], pred_cls[i]

    # Find unique classes
    unique_classes = np.unique(target_cls)

    # Create Precision-Recall curve and compute AP for each class
    px, py = np.linspace(0, 1, 1000), []  # for plotting
    pr_score = 0.1  # score to evaluate P and R https://github.com/ultralytics/yolov3/issues/898
    s = [unique_classes.shape[0], tp.shape[1]]  # number class, number iou thresholds (i.e. 10 for mAP0.5...0.95)
    ap, p, r = np.zeros(s), np.zeros((unique_classes.shape[0], 1000)), np.zeros((unique_classes.shape[0], 1000))
    for ci, c in enumerate(unique_classes):
        i = pred_cls == c
        n_l = (target_cls == c).sum()  # number of labels
        n_p = i.sum()  # number of predictions

        if n_p == 0 or n_l == 0:
            continue
        else:
            # Accumulate FPs and TPs
            fpc = (1 - tp[i]).cumsum(0)
            tpc = tp[i].cumsum(0)

            # Recall
            recall = tpc / (n_l + 1e-16)  # recall curve
            r[ci] = np.interp(-px, -conf[i], recall[:, 0], left=0)  # negative x, xp because xp decreases

            # Precision
            precision = tpc / (tpc + fpc)  # precision curve
            p[ci] = np.interp(-px, -conf[i], precision[:, 0], left=1)  # p at pr_score

            # AP from recall-precision curve
            for j in range(tp.shape[1]):
                ap[ci, j], mpre, mrec = compute_ap(recall[:, j], precision[:, j])
                if plot and (j == 0):
                    py.append(np.interp(px, mrec, mpre))  # precision at mAP@0.5

    # Compute F1 score (harmonic mean of precision and recall)
    f1 = 2 * p * r / (p + r + 1e-16)
    i = r.mean(0).argmax()

    if plot:
        plot_pr_curve(px, py, ap, save_dir, names)

    return p[:, i], r[:, i], f1[:, i], ap, unique_classes.astype('int32')
def compute_ap(recall, precision):
    """ Compute the average precision, given the recall and precision curves
    # Arguments
        recall:    The recall curve (list)
        precision: The precision curve (list)
    # Returns
        Average precision, precision curve, recall curve
    """

    # Append sentinel values to beginning and end
    mrec = np.concatenate(([0.0], recall, [1.0]))
    mpre = np.concatenate(([1.0], precision, [0.0]))

    # Compute the precision envelope
    mpre = np.flip(np.maximum.accumulate(np.flip(mpre)))

    # Integrate area under curve
    method = 'interp'  # methods: 'continuous', 'interp'
    if method == 'interp':
        x = np.linspace(0, 1, 101)  # 101-point interp (COCO)
        ap = np.trapz(np.interp(x, mrec, mpre), x)  # integrate
    else:  # 'continuous'
        i = np.where(mrec[1:] != mrec[:-1])[0]  # points where x axis (recall) changes
        ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])  # area under curve

    return ap, mpre, mrec
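# Illustrative sketch (not from the original repo): compute AP for a tiny synthetic
# precision/recall curve. With the 101-point COCO interpolation used above, a detector that holds
# precision 1.0 up to recall 0.5 and then drops to 0.5 scores roughly 0.8 here.
def _example_compute_ap():
    recall = np.array([0.1, 0.3, 0.5, 0.7, 1.0])
    precision = np.array([1.0, 1.0, 1.0, 0.5, 0.5])
    ap, mpre, mrec = compute_ap(recall, precision)
    print(round(float(ap), 3))  # ~0.80 for this synthetic curve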
def plot_pr_curve(px, py, ap, save_dir='pr_curve.png', names=()):
    # Precision-recall curve
    fig, ax = plt.subplots(1, 1, figsize=(9, 6), tight_layout=True)
    py = np.stack(py, axis=1)

    if 0 < len(names) < 21:  # display per-class legend if < 21 classes
        for i, y in enumerate(py.T):
            ax.plot(px, y, linewidth=1, label=f'{names[i]} {ap[i, 0]:.3f}')  # plot(recall, precision)
    else:
        ax.plot(px, py, linewidth=1, color='grey')  # plot(recall, precision)

    ax.plot(px, py.mean(1), linewidth=3, color='blue', label='all classes %.3f mAP@0.5' % ap[:, 0].mean())
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left")
    fig.savefig(Path(save_dir), dpi=250)
    plt.close()
def plot_mc_curve(px, py, save_dir='mc_curve.png', names=(), xlabel='Confidence', ylabel='Metric'):
    # Metric-confidence curve
    fig, ax = plt.subplots(1, 1, figsize=(9, 6), tight_layout=True)

    if 0 < len(names) < 21:  # display per-class legend if < 21 classes
        for i, y in enumerate(py):
            ax.plot(px, y, linewidth=1, label=f'{names[i]}')  # plot(confidence, metric)
    else:
        ax.plot(px, py.T, linewidth=1, color='grey')  # plot(confidence, metric)

    y = py.mean(0)
    ax.plot(px, y, linewidth=3, color='blue', label=f'all classes {y.max():.2f} at {px[y.argmax()]:.3f}')
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left")
    fig.savefig(Path(save_dir), dpi=250)
    plt.close()
def cal_weighted_ap(ap50):
    return 0.2 * ap50[1] + 0.3 * ap50[0] + 0.5 * ap50[2]
class ConfusionMatrix:
    # Updated version of https://github.com/kaanakan/object_detection_confusion_matrix
    def __init__(self, nc, conf=0.25, iou_thres=0.45):
        self.matrix = np.zeros((nc + 1, nc + 1))
        self.nc = nc  # number of classes
        self.conf = conf
        self.iou_thres = iou_thres

    def process_batch(self, detections, labels):
        """
        Return intersection-over-union (Jaccard index) of boxes.
        Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
        Arguments:
            detections (Array[N, 6]), x1, y1, x2, y2, conf, class
            labels (Array[M, 5]), class, x1, y1, x2, y2
        Returns:
            None, updates confusion matrix accordingly
        """
        detections = detections[detections[:, 4] > self.conf]
        gt_classes = labels[:, 4].int()
        detection_classes = detections[:, 5].int()
        iou = box_iou(labels[:, :4], detections[:, :4])

        x = torch.where(iou > self.iou_thres)
        if x[0].shape[0]:
            matches = torch.cat((torch.stack(x, 1), iou[x[0], x[1]][:, None]), 1).cpu().numpy()
            if x[0].shape[0] > 1:
                matches = matches[matches[:, 2].argsort()[::-1]]
                matches = matches[np.unique(matches[:, 1], return_index=True)[1]]
                matches = matches[matches[:, 2].argsort()[::-1]]
                matches = matches[np.unique(matches[:, 0], return_index=True)[1]]
        else:
            matches = np.zeros((0, 3))

        n = matches.shape[0] > 0
        m0, m1, _ = matches.transpose().astype(np.int16)
        for i, gc in enumerate(gt_classes):
            j = m0 == i
            if n and sum(j) == 1:
                self.matrix[detection_classes[m1[j]], gc] += 1  # correct
            else:
                self.matrix[self.nc, gc] += 1  # background FP

        if n:
            for i, dc in enumerate(detection_classes):
                if not any(m1 == i):
                    self.matrix[dc, self.nc] += 1  # background FN

    def matrix(self):
        return self.matrix

    def tp_fp(self):
        tp = self.matrix.diagonal()  # true positives
        fp = self.matrix.sum(1) - tp  # false positives
        fn = self.matrix.sum(0) - tp  # false negatives (missed detections)
        return tp[:-1], fp[:-1], fn[:-1]  # remove background class

    def plot(self, normalize=True, save_dir='', names=()):
        try:
            import seaborn as sn

            array = self.matrix / ((self.matrix.sum(0).reshape(1, -1) + 1E-6) if normalize else 1)  # normalize columns
            array[array < 0.005] = np.nan  # don't annotate (would appear as 0.00)

            fig = plt.figure(figsize=(12, 9), tight_layout=True)
            sn.set(font_scale=1.0 if self.nc < 50 else 0.8)  # for label size
            labels = (0 < len(names) < 99) and len(names) == self.nc  # apply names to ticklabels
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')  # suppress empty matrix RuntimeWarning: All-NaN slice encountered
                sn.heatmap(array, annot=self.nc < 30, annot_kws={"size": 8}, cmap='Blues', fmt='.2f', square=True,
                           xticklabels=names + ['background FP'] if labels else "auto",
                           yticklabels=names + ['background FN'] if labels else "auto").set_facecolor((1, 1, 1))
            fig.axes[0].set_xlabel('True')
            fig.axes[0].set_ylabel('Predicted')
            fig.savefig(Path(save_dir) / 'confusion_matrix.png', dpi=250)
            plt.close()
        except Exception as e:
            print(f'WARNING: ConfusionMatrix plot failure: {e}')

    def print(self):
        for i in range(self.nc + 1):
            print(' '.join(map(str, self.matrix[i])))
class BBoxTransform(nn.Module):
    def forward(self, anchors, regression):
        y_centers_a = (anchors[..., 0] + anchors[..., 2]) / 2
        x_centers_a = (anchors[..., 1] + anchors[..., 3]) / 2
        ha = anchors[..., 2] - anchors[..., 0]
        wa = anchors[..., 3] - anchors[..., 1]

        w = regression[..., 3].exp() * wa
        h = regression[..., 2].exp() * ha

        y_centers = regression[..., 0] * ha + y_centers_a
        x_centers = regression[..., 1] * wa + x_centers_a

        ymin = y_centers - h / 2.
        xmin = x_centers - w / 2.
        ymax = y_centers + h / 2.
        xmax = x_centers + w / 2.

        return torch.stack([xmin, ymin, xmax, ymax], dim=2)
class ClipBoxes(nn.Module):
    def __init__(self):
        super(ClipBoxes, self).__init__()

    def forward(self, boxes, img):
        batch_size, num_channels, height, width = img.shape

        boxes[:, :, 0] = torch.clamp(boxes[:, :, 0], min=0)
        boxes[:, :, 1] = torch.clamp(boxes[:, :, 1], min=0)
        boxes[:, :, 2] = torch.clamp(boxes[:, :, 2], max=width - 1)
        boxes[:, :, 3] = torch.clamp(boxes[:, :, 3], max=height - 1)

        return boxes
class Anchors(nn.Module):
    def __init__(self, anchor_scale=4., pyramid_levels=None, **kwargs):
        super().__init__()
        self.anchor_scale = anchor_scale

        if pyramid_levels is None:
            self.pyramid_levels = [3, 4, 5, 6, 7]
        else:
            self.pyramid_levels = pyramid_levels

        self.strides = kwargs.get('strides', [2 ** x for x in self.pyramid_levels])
        self.scales = np.array(kwargs.get('scales', [2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)]))
        self.ratios = kwargs.get('ratios', [(1.0, 1.0), (1.4, 0.7), (0.7, 1.4)])

        self.last_anchors = {}
        self.last_shape = None

    def forward(self, image, dtype=torch.float32):
        """Generates multiscale anchor boxes.
        Args:
            image_size: integer number of input image size. The input image has the
                same dimension for width and height. The image_size should be divided by
                the largest feature stride 2^max_level.
            anchor_scale: float number representing the scale of size of the base
                anchor to the feature stride 2^level.
            anchor_configs: a dictionary with keys as the levels of anchors and
                values as a list of anchor configuration.
        Returns:
            anchor_boxes: a numpy array with shape [N, 4], which stacks anchors on all
                feature levels.
        Raises:
            ValueError: input size must be the multiple of largest feature stride.
        """
        image_shape = image.shape[2:]

        if image_shape == self.last_shape and image.device in self.last_anchors:
            return self.last_anchors[image.device]

        if self.last_shape is None or self.last_shape != image_shape:
            self.last_shape = image_shape

        if dtype == torch.float16:
            dtype = np.float16
        else:
            dtype = np.float32

        boxes_all = []
        for stride in self.strides:
            boxes_level = []
            for scale, ratio in itertools.product(self.scales, self.ratios):
                if image_shape[1] % stride != 0:
                    raise ValueError('input size must be divisible by the stride.')
                base_anchor_size = self.anchor_scale * stride * scale
                anchor_size_x_2 = base_anchor_size * ratio[0] / 2.0
                anchor_size_y_2 = base_anchor_size * ratio[1] / 2.0

                x = np.arange(stride / 2, image_shape[1], stride)
                y = np.arange(stride / 2, image_shape[0], stride)
                xv, yv = np.meshgrid(x, y)
                xv = xv.reshape(-1)
                yv = yv.reshape(-1)

                # y1,x1,y2,x2
                boxes = np.vstack((yv - anchor_size_y_2, xv - anchor_size_x_2,
                                   yv + anchor_size_y_2, xv + anchor_size_x_2))
                boxes = np.swapaxes(boxes, 0, 1)
                boxes_level.append(np.expand_dims(boxes, axis=1))
            # concat anchors on the same level to the reshape NxAx4
            boxes_level = np.concatenate(boxes_level, axis=1)
            boxes_all.append(boxes_level.reshape([-1, 4]))

        anchor_boxes = np.vstack(boxes_all)

        anchor_boxes = torch.from_numpy(anchor_boxes.astype(dtype)).to(image.device)
        anchor_boxes = anchor_boxes.unsqueeze(0)

        # save it for later use to reduce overhead
        self.last_anchors[image.device] = anchor_boxes
        return anchor_boxes
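# Illustrative sketch (not part of the original module): generate anchors for a dummy 384x640
# input. Both dimensions are divisible by the largest stride (128), as the forward pass requires;
# the anchor_scale and pyramid levels are the defaults defined above.
def _example_anchors_usage():
    anchor_gen = Anchors(anchor_scale=4.)
    image = torch.zeros(1, 3, 384, 640)
    anchors = anchor_gen(image)
    print(anchors.shape)  # torch.Size([1, 46035, 4]) -> 9 anchors per location over P3..P7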
class DataLoaderX(DataLoader):
    """prefetch dataloader"""

    def __iter__(self):
        return BackgroundGenerator(super().__iter__())
def augment_hsv(img, hgain=0.5, sgain=0.5, vgain=0.5):
    """change color hue, saturation, value"""
    r = np.random.uniform(-1, 1, 3) * [hgain, sgain, vgain] + 1  # random gains
    hue, sat, val = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2HSV))
    dtype = img.dtype  # uint8

    x = np.arange(0, 256, dtype=np.int16)
    lut_hue = ((x * r[0]) % 180).astype(dtype)
    lut_sat = np.clip(x * r[1], 0, 255).astype(dtype)
    lut_val = np.clip(x * r[2], 0, 255).astype(dtype)

    img_hsv = cv2.merge((cv2.LUT(hue, lut_hue), cv2.LUT(sat, lut_sat), cv2.LUT(val, lut_val))).astype(dtype)
    cv2.cvtColor(img_hsv, cv2.COLOR_HSV2BGR, dst=img)  # no return needed

    # Histogram equalization
    # if random.random() < 0.2:
    #     for i in range(3):
    #         img[:, :, i] = cv2.equalizeHist(img[:, :, i])
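# Illustrative sketch (not from the original repo): augment_hsv modifies the BGR image in place,
# so a copy taken beforehand shows the change. The gain values are placeholders, not tuned
# hyperparameters from this project.
def _example_augment_hsv():
    img = np.random.randint(0, 255, (384, 640, 3), dtype=np.uint8)
    before = img.copy()
    augment_hsv(img, hgain=0.015, sgain=0.7, vgain=0.4)
    print(np.abs(img.astype(np.int16) - before).mean())  # non-zero -> modified in place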
def random_perspective(combination, targets=(), degrees=10, translate=.1, scale=.1, shear=10, perspective=0.0,
                       border=(0, 0)):
    """combination of img transform"""
    # torchvision.transforms.RandomAffine(degrees=(-10, 10), translate=(.1, .1), scale=(.9, 1.1), shear=(-10, 10))
    # targets = [cls, xyxy]
    img, gray, line = combination
    height = img.shape[0] + border[0] * 2  # shape(h,w,c)
    width = img.shape[1] + border[1] * 2

    # Center
    C = np.eye(3)
    C[0, 2] = -img.shape[1] / 2  # x translation (pixels)
    C[1, 2] = -img.shape[0] / 2  # y translation (pixels)

    # Perspective
    P = np.eye(3)
    P[2, 0] = random.uniform(-perspective, perspective)  # x perspective (about y)
    P[2, 1] = random.uniform(-perspective, perspective)  # y perspective (about x)

    # Rotation and Scale
    R = np.eye(3)
    a = random.uniform(-degrees, degrees)
    # a += random.choice([-180, -90, 0, 90])  # add 90deg rotations to small rotations
    s = random.uniform(1 - scale, 1 + scale)
    # s = 2 ** random.uniform(-scale, scale)
    R[:2] = cv2.getRotationMatrix2D(angle=a, center=(0, 0), scale=s)

    # Shear
    S = np.eye(3)
    S[0, 1] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # x shear (deg)
    S[1, 0] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # y shear (deg)

    # Translation
    T = np.eye(3)
    T[0, 2] = random.uniform(0.5 - translate, 0.5 + translate) * width  # x translation (pixels)
    T[1, 2] = random.uniform(0.5 - translate, 0.5 + translate) * height  # y translation (pixels)

    # Combined rotation matrix
    M = T @ S @ R @ P @ C  # order of operations (right to left) is IMPORTANT
    if (border[0] != 0) or (border[1] != 0) or (M != np.eye(3)).any():  # image changed
        if perspective:
            img = cv2.warpPerspective(img, M, dsize=(width, height), borderValue=(114, 114, 114))
            gray = cv2.warpPerspective(gray, M, dsize=(width, height), borderValue=0)
            line = cv2.warpPerspective(line, M, dsize=(width, height), borderValue=0)
        else:  # affine
            img = cv2.warpAffine(img, M[:2], dsize=(width, height), borderValue=(114, 114, 114))
            gray = cv2.warpAffine(gray, M[:2], dsize=(width, height), borderValue=0)
            line = cv2.warpAffine(line, M[:2], dsize=(width, height), borderValue=0)

    # Visualize
    # import matplotlib.pyplot as plt
    # ax = plt.subplots(1, 2, figsize=(12, 6))[1].ravel()
    # ax[0].imshow(img[:, :, ::-1])  # base
    # ax[1].imshow(img2[:, :, ::-1])  # warped

    # Transform label coordinates
    n = len(targets)
    if n:
        # warp points
        xy = np.ones((n * 4, 3))
        xy[:, :2] = targets[:, [1, 2, 3, 4, 1, 4, 3, 2]].reshape(n * 4, 2)  # x1y1, x2y2, x1y2, x2y1
        xy = xy @ M.T  # transform
        if perspective:
            xy = (xy[:, :2] / xy[:, 2:3]).reshape(n, 8)  # rescale
        else:  # affine
            xy = xy[:, :2].reshape(n, 8)

        # create new boxes
        x = xy[:, [0, 2, 4, 6]]
        y = xy[:, [1, 3, 5, 7]]
        xy = np.concatenate((x.min(1), y.min(1), x.max(1), y.max(1))).reshape(4, n).T

        # # apply angle-based reduction of bounding boxes
        # radians = a * math.pi / 180
        # reduction = max(abs(math.sin(radians)), abs(math.cos(radians))) ** 0.5
        # x = (xy[:, 2] + xy[:, 0]) / 2
        # y = (xy[:, 3] + xy[:, 1]) / 2
        # w = (xy[:, 2] - xy[:, 0]) * reduction
        # h = (xy[:, 3] - xy[:, 1]) * reduction
        # xy = np.concatenate((x - w / 2, y - h / 2, x + w / 2, y + h / 2)).reshape(4, n).T

        # clip boxes
        xy[:, [0, 2]] = xy[:, [0, 2]].clip(0, width)
        xy[:, [1, 3]] = xy[:, [1, 3]].clip(0, height)

        # filter candidates
        i = _box_candidates(box1=targets[:, 1:5].T * s, box2=xy.T)
        targets = targets[i]
        targets[:, 1:5] = xy[i]

    combination = (img, gray, line)
    return combination, targets
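# Illustrative sketch (not part of the original module): apply the random warp to a dummy
# (image, drivable-area mask, lane-line mask) triple with a single box label. The shapes and the
# label values are placeholders for what the dataloader normally provides.
def _example_random_perspective():
    random.seed(0)
    img = np.full((384, 640, 3), 114, dtype=np.uint8)
    gray = np.zeros((384, 640), dtype=np.uint8)
    line = np.zeros((384, 640), dtype=np.uint8)
    targets = np.array([[0, 100.0, 80.0, 300.0, 200.0]])  # cls, x1, y1, x2, y2
    (img, gray, line), targets = random_perspective((img, gray, line), targets)
    print(img.shape, targets.shape)  # (384, 640, 3); at most (1, 5) boxes survive the filter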
def cutout(combination, labels):
    # Applies image cutout augmentation https://arxiv.org/abs/1708.04552
    image, gray = combination
    h, w = image.shape[:2]

    def bbox_ioa(box1, box2):
        # Returns the intersection over box2 area given box1, box2. box1 is 4, box2 is nx4. boxes are x1y1x2y2
        box2 = box2.transpose()

        # Get the coordinates of bounding boxes
        b1_x1, b1_y1, b1_x2, b1_y2 = box1[0], box1[1], box1[2], box1[3]
        b2_x1, b2_y1, b2_x2, b2_y2 = box2[0], box2[1], box2[2], box2[3]

        # Intersection area
        inter_area = (np.minimum(b1_x2, b2_x2) - np.maximum(b1_x1, b2_x1)).clip(0) * \
                     (np.minimum(b1_y2, b2_y2) - np.maximum(b1_y1, b2_y1)).clip(0)

        # box2 area
        box2_area = (b2_x2 - b2_x1) * (b2_y2 - b2_y1) + 1e-16

        # Intersection over box2 area
        return inter_area / box2_area

    # create random masks
    scales = [0.5] * 1 + [0.25] * 2 + [0.125] * 4 + [0.0625] * 8 + [0.03125] * 16  # image size fraction
    for s in scales:
        mask_h = random.randint(1, int(h * s))
        mask_w = random.randint(1, int(w * s))

        # box
        xmin = max(0, random.randint(0, w) - mask_w // 2)
        ymin = max(0, random.randint(0, h) - mask_h // 2)
        xmax = min(w, xmin + mask_w)
        ymax = min(h, ymin + mask_h)
        # print('xmin:{},ymin:{},xmax:{},ymax:{}'.format(xmin,ymin,xmax,ymax))

        # apply random color mask
        image[ymin:ymax, xmin:xmax] = [random.randint(64, 191) for _ in range(3)]
        gray[ymin:ymax, xmin:xmax] = -1

        # return unobscured labels
        if len(labels) and s > 0.03:
            box = np.array([xmin, ymin, xmax, ymax], dtype=np.float32)
            ioa = bbox_ioa(box, labels[:, 1:5])  # intersection over area
            labels = labels[ioa < 0.60]  # remove >60% obscured labels

    return image, gray, labels
def letterbox(combination, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True):
    """Resize the image and add gray padding at the top and bottom; see https://zhuanlan.zhihu.com/p/172121380 for details."""
    # Resize image to a 32-pixel-multiple rectangle https://github.com/ultralytics/yolov3/issues/232
    img, gray, line = combination
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, 32), np.mod(dh, 32)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        gray = cv2.resize(gray, new_unpad, interpolation=cv2.INTER_LINEAR)
        line = cv2.resize(line, new_unpad, interpolation=cv2.INTER_LINEAR)

    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))

    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    gray = cv2.copyMakeBorder(gray, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)  # add border
    line = cv2.copyMakeBorder(line, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)  # add border

    combination = (img, gray, line)
    return combination, ratio, (dw, dh)
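# Illustrative sketch (not from the original repo): letterbox a dummy 720x1280 triple down to
# 384x640 with auto=False so the output is exactly the requested shape. The input size is a
# placeholder.
def _example_letterbox():
    img = np.zeros((720, 1280, 3), dtype=np.uint8)
    gray = np.zeros((720, 1280), dtype=np.uint8)
    line = np.zeros((720, 1280), dtype=np.uint8)
    (img, gray, line), ratio, (dw, dh) = letterbox((img, gray, line), new_shape=(384, 640), auto=False)
    print(img.shape, ratio, (dw, dh))  # (384, 640, 3) (0.5, 0.5) (0.0, 12.0)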
def _box_candidates(box1, box2, wh_thr=2, ar_thr=20, area_thr=0.1):  # box1(4,n), box2(4,n)
    # Compute candidate boxes: box1 before augment, box2 after augment, wh_thr (pixels), aspect_ratio_thr, area_ratio
    w1, h1 = box1[2] - box1[0], box1[3] - box1[1]
    w2, h2 = box2[2] - box2[0], box2[3] - box2[1]
    ar = np.maximum(w2 / (h2 + 1e-16), h2 / (w2 + 1e-16))  # aspect ratio
    return (w2 > wh_thr) & (h2 > wh_thr) & (w2 * h2 / (w1 * h1 + 1e-16) > area_thr) & (ar < ar_thr)  # candidates