import cv2
from numpy import random
from collections import deque
import numpy as np
import math
import torch
import torch.backends.cudnn as cudnn
from utils.google_utils import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import (
check_img_size, non_max_suppression, apply_classifier, scale_coords, xyxy2xywh, strip_optimizer)
from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized
from models.models import *
from utils.datasets import *
from utils.general import *
from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort
from byte_track.bytetracker import ByteTrack
from yolov7.yolov7_detector import YOLOv7Detector
def load_classes(path):
# Loads *.names file at 'path'
with open(path, 'r') as f:
names = f.read().split('\n')
return list(filter(None, names)) # filter removes empty strings (such as last line)
global names
names = load_classes('data/coco.names')
colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
data_deque = {}
speed_four_line_queue = {}
object_counter = {}
# line1 = [(250,450), (1000, 450)]
line2 = [(200,500), (1050, 500)]
def xyxy_to_xywh(*xyxy):
"""" Calculates the relative bounding box from absolute pixel values. """
bbox_left = min([xyxy[0].item(), xyxy[2].item()])
bbox_top = min([xyxy[1].item(), xyxy[3].item()])
bbox_w = abs(xyxy[0].item() - xyxy[2].item())
bbox_h = abs(xyxy[1].item() - xyxy[3].item())
x_c = (bbox_left + bbox_w / 2)
y_c = (bbox_top + bbox_h / 2)
w = bbox_w
h = bbox_h
return x_c, y_c, w, h
def xyxy_to_tlwh(bbox_xyxy):
tlwh_bboxs = []
for i, box in enumerate(bbox_xyxy):
x1, y1, x2, y2 = [int(i) for i in box]
top = x1
left = y1
w = int(x2 - x1)
h = int(y2 - y1)
tlwh_obj = [top, left, w, h]
tlwh_bboxs.append(tlwh_obj)
return tlwh_bboxs
def compute_color_for_labels(label):
"""
Simple function that adds fixed color depending on the class
"""
if label == 0: #person #BGR
color = (85,45,255)
elif label == 2: # Car
color = (222,82,175)
elif label == 3: # Motobike
color = (0, 204, 255)
elif label == 5: # Bus
color = (0, 149, 255)
else:
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
return tuple(color)
def draw_border(img, pt1, pt2, color, thickness, r, d):
x1,y1 = pt1
x2,y2 = pt2
# Top left
cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)
# Top right
cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)
# Bottom left
cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)
# Bottom right
cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)
cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA)
cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA)
cv2.circle(img, (x1 +r, y1+r), 2, color, 12)
cv2.circle(img, (x2 -r, y1+r), 2, color, 12)
cv2.circle(img, (x1 +r, y2-r), 2, color, 12)
cv2.circle(img, (x2 -r, y2-r), 2, color, 12)
return img
def UI_box(x, img, color=None, label=None, line_thickness=None):
# Plots one bounding box on image img
tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
color = color or [random.randint(0, 255) for _ in range(3)]
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
# cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
if label:
tf = max(tl - 1, 1) # font thickness
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
# c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
img = draw_border(img, (c1[0], c1[1] - t_size[1] -3), (c1[0] + t_size[0], c1[1]+3), color, 1, 8, 2)
# cv2.line(img, c1, c2, color, 30)
# cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) # filled
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
def estimateSpeed(location1, location2):
d_pixels = math.sqrt(math.pow(location2[0] - location1[0], 2) + math.pow(location2[1] - location1[1], 2))
ppm = 8 #Pixels per Meter
d_meters = d_pixels / ppm
time_constant = 15 * 3.6
speed = d_meters * time_constant
return speed
# Return true if line segments AB and CD intersect
def intersect(A,B,C,D):
return ccw(A,C,D) != ccw(B,C,D) and ccw(A,B,C) != ccw(A,B,D)
def ccw(A,B,C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
def draw_boxes(img, bbox, object_id, identities=None, offset=(0, 0)):
# cv2.line(img, line2[0], line2[1], (0,200,0), 3)
height, width, _ = img.shape
# remove tracked point from buffer if object is lost
for key in list(data_deque):
if key not in identities:
data_deque.pop(key)
for i, box in enumerate(bbox):
x1, y1 = int(box[0]), int(box[1])
x2, y2 = x1 + int(box[2]), y1 + int(box[3])
box=[x1, y1, x2, y2]
# x1, y1, x2, y2 = [int(i) for i in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# box_area = (x2-x1) * (y2-y1)
box_height = (y2-y1)
# code to find center of bottom edge
center = (int((x2+x1)/ 2), int((y2+y2)/2))
# get ID of object
id = int(identities[i]) if identities is not None else 0
# create new buffer for new object
if id not in data_deque:
data_deque[id] = deque(maxlen= 64)
speed_four_line_queue[id] = []
color = compute_color_for_labels(int(object_id[i]))
obj_name = names[int(object_id[i])]
label = '%s' % (obj_name)
# add center to buffer
data_deque[id].appendleft(center)
# print("id ", id)
# print("data_deque[id] ", data_deque[id])
if len(data_deque[id]) >= 2:
# print("data_deque[id][i-1]", data_deque[id][1], data_deque[id][0])
if intersect(data_deque[id][0], data_deque[id][1], line2[0], line2[1]):# or intersect(data_deque[id][0], data_deque[id][1], line1[0], line1[1]) or intersect(data_deque[id][0], data_deque[id][1], line3[0], line3[1]) or intersect(data_deque[id][0], data_deque[id][1], line4[0], line4[1]) :
# cv2.line(img, line2[0], line2[1], (0,100,0), 3)
obj_speed = estimateSpeed(data_deque[id][1], data_deque[id][0])
speed_four_line_queue[id].append(obj_speed)
if obj_name not in object_counter:
object_counter[obj_name] = 1
else:
object_counter[obj_name] += 1
try:
label = label + " " + str(sum(speed_four_line_queue[id])//len(speed_four_line_queue[id]))
except :
pass
UI_box(box, img, label=label, color=color, line_thickness=2)
# draw trail
for i in range(1, len(data_deque[id])):
# check if on buffer value is none
if data_deque[id][i - 1] is None or data_deque[id][i] is None:
continue
# generate dynamic thickness of trails
thickness = int(np.sqrt(64 / float(i + i)) * 1.5)
# draw trails
cv2.line(img, data_deque[id][i - 1], data_deque[id][i], color, thickness)
count = 0
for idx, (key, value) in enumerate(object_counter.items()):
# print(idx, key, value)
cnt_str = str(key) + ": " + str(value)
cv2.line(img, (width - 150 ,25+ (idx*40)), (width,25 + (idx*40)), [85,45,255], 30)
cv2.putText(img, cnt_str, (width - 150, 35 + (idx*40)), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
count += value
return img, count
def load_yolov7_and_process_each_frame(model, vid_name, enable_GPU, save_video, confidence, assigned_class_id, kpi1_text, kpi2_text, kpi3_text, stframe):
data_deque.clear()
speed_four_line_queue.clear()
object_counter.clear()
if model == 'yolov7':
weights = 'yolov7/weights/yolov7.onnx'
elif model == 'yolov7-tiny':
weights = 'yolov7/weights/yolov7-tiny.onnx'
else:
print('Model Not Found!')
exit()
detector = YOLOv7Detector(weights=weights, use_cuda=enable_GPU, use_onnx=True)
tracker = ByteTrack(detector)
# dataset = LoadImages(vid_name, img_size=1280, auto_size=64)
vdo = cv2.VideoCapture(vid_name)
results = []
start = time.time()
count = 0
frame_id = 0
prevTime = 0
fourcc = 'mp4v' # output video codec
fps = vdo.get(cv2.CAP_PROP_FPS)
w = int(vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
vid_writer = cv2.VideoWriter('inference/output/results.mp4', cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
while vdo.isOpened():
# for path, img, im0s, vid_cap in dataset:
curr_time = time.time()
frame_id +=1
_, img = vdo.read()
if _ == False:
break
# img = cv2.cvtColor(ori_im, cv2.COLOR_BGR2RGB)
# img, count, obj_ids = tracker.inference(img, conf_thresh=confidence, classes=assigned_class_id)
bboxes, ids, scores, obj_ids = tracker.inference(img, conf_thresh=confidence, classes=assigned_class_id)
# print(bboxes[0].shape if len(bboxes)>0 else None)
img, count = draw_boxes(img, bboxes, obj_ids, identities=ids)
currTime = time.time()
fps = 1 / (currTime - prevTime)
prevTime = currTime
# Save results (image with detections)
cv2.line(img, (20,25), (127,25), [85,45,255], 30)
cv2.putText(img, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
if save_video:
vid_writer.write(img)
kpi1_text.write(f"
{fps:.1f}
", unsafe_allow_html=True)
kpi2_text.write(f"{len(data_deque)}
", unsafe_allow_html=True)
kpi3_text.write(f"{count}
", unsafe_allow_html=True)
# if frame_id%3==0:
# stframe.image(img, channels = 'BGR',use_column_width=True)
stframe.image(img, channels = 'BGR',use_column_width=True)
end = time.time()
print('Done. (%.3fs)' % (end - start))
cv2.destroyAllWindows()
vdo.release()
vid_writer.release()
def load_yolor_and_process_each_frame(vid_name, enable_GPU, confidence, assigned_class_id, kpi1_text, kpi2_text, kpi3_text, stframe):
data_deque.clear()
speed_four_line_queue.clear()
object_counter.clear()
out, source, weights, save_txt, imgsz, cfg = \
'inference/output', vid_name, 'yolor_p6.pt', False, 1280, 'cfg/yolor_p6.cfg'
#webcam = source == '0' or source.startswith('rtsp') or source.startswith('http') or source.endswith('.txt')
webcam = source == 0 or source.startswith('rtsp') or source.startswith('http') or source.endswith('.txt')
# initialize deepsort
cfg_deep = get_config()
cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
# attempt_download("deep_sort_pytorch/deep_sort/deep/checkpoint/ckpt.t7", repo='mikel-brostrom/Yolov5_DeepSort_Pytorch')
deepsort = DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
use_cuda=True)
# Initialize GPU
if enable_GPU:
device = select_device('gpu')
else:
device = select_device('cpu')
if os.path.exists(out):
shutil.rmtree(out) # delete output folder
os.makedirs(out) # make new output folder
half = device.type != 'cpu' # half precision only supported on CUDA
# Load model
model = Darknet(cfg, imgsz)#.cuda()
model.load_state_dict(torch.load(weights, map_location=device)['model'])
model.to(device).eval()
if half:
model.half() # to FP16
# Second-stage classifier
classify = False
if classify:
modelc = load_classifier(name='resnet101', n=2) # initialize
modelc.load_state_dict(torch.load('weights/resnet101.pt', map_location=device)['model']) # load weights
modelc.to(device).eval()
# Set Dataloader
vid_path, vid_writer = None, None
if webcam:
save_img = True
print("HEREHERER")
# cudnn.benchmark = True # set True to speed up constant image size inference
# dataset = LoadStreams(source, img_size=imgsz)
else:
save_img = True
dataset = LoadImages(source, img_size=imgsz, auto_size=64)
# Run inference
t0 = time.time()
img = torch.zeros((1, 3, imgsz, imgsz), device=device) # init img
_ = model(img.half() if half else img) if device.type != 'cpu' else None # run once
prevTime = 0
count = 0
if webcam: # code for only webcam
vid = cv2.VideoCapture(0)
while vid.isOpened():
ret, img = vid.read()
if not ret:
continue
im0s = img.copy()
print(im0s.shape)
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to bsx3x416x416
print(img.shape)
img = torch.from_numpy(img.copy()).to(device)
img = img.half() if half else img.float() # uint8 to fp16/32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
print(img.shape)
# Inference
t1 = time_synchronized()
pred = model(img)[0]
# Apply NMS
pred = non_max_suppression(pred, confidence, 0.5, classes=assigned_class_id, agnostic=False)
t2 = time_synchronized()
# Apply Classifier
if classify:
pred = apply_classifier(pred, modelc, img, im0s)
print("HERE")
# Process detections
for i, det in enumerate(pred): # detections per image
p, s, im0 = "webcam_out.mp4", '', im0s
# save_path = str(Path(out) / Path(p).name)
# txt_path = str(Path(out) / Path(p).stem) + ('_%g' % dataset.frame if dataset.mode == 'video' else '')
s += '%gx%g ' % img.shape[2:] # print string
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
if det is not None and len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
s += '%g %ss, ' % (n, names[int(c)]) # add to string
xywh_bboxs = []
confs = []
oids = []
# Write results
for *xyxy, conf, cls in det:
# to deep sort format
x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
xywh_obj = [x_c, y_c, bbox_w, bbox_h]
xywh_bboxs.append(xywh_obj)
confs.append([conf.item()])
oids.append(int(cls))
# if save_txt: # Write to file
# xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist() # normalized xywh
# with open(txt_path + '.txt', 'a') as f:
# f.write(('%g ' * 5 + '\n') % (cls, *xywh)) # label format
xywhs = torch.Tensor(xywh_bboxs)
confss = torch.Tensor(confs)
outputs = deepsort.update(xywhs, confss, oids, im0)
if len(outputs) > 0:
bbox_xyxy = outputs[:, :4]
identities = outputs[:, -2]
object_id = outputs[:, -1]
im0, count = draw_boxes(im0, bbox_xyxy, object_id,identities)
# Print time (inference + NMS)
print('%sDone. (%.3fs)' % (s, t2 - t1))
currTime = time.time()
fps = 1 / (currTime - prevTime)
prevTime = currTime
cv2.line(im0, (20,25), (127,25), [85,45,255], 30)
cv2.putText(im0, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
kpi1_text.write(f"{'{:.1f}'.format(fps)}
", unsafe_allow_html=True)
# # Save results (image with detections)
# if save_img:
# if dataset.mode == 'images':
# cv2.imwrite(save_path, im0)
# else:
# if vid_path != save_path: # new video
# vid_path = save_path
# if isinstance(vid_writer, cv2.VideoWriter):
# vid_writer.release() # release previous video writer
# fourcc = 'mp4v' # output video codec
# fps = vid_cap.get(cv2.CAP_PROP_FPS)
# w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
# vid_writer.write(im0)
# data_deque assign inside yolor.py
kpi2_text.write(f"{len(data_deque)}
", unsafe_allow_html=True)
kpi3_text.write(f"{count}
", unsafe_allow_html=True)
stframe.image(im0,channels = 'BGR',use_column_width=True)
else: # without webcam
for path, img, im0s, vid_cap in dataset:
# print(path)
# print(img.shape)
# print(im0s.shape)
# print(vid_cap)
img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float() # uint8 to fp16/32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# Inference
t1 = time_synchronized()
print(img.shape)
pred = model(img)[0]
# Apply NMS
pred = non_max_suppression(pred, confidence, 0.5, classes=assigned_class_id, agnostic=False)
t2 = time_synchronized()
# Apply Classifier
if classify:
pred = apply_classifier(pred, modelc, img, im0s)
# Process detections
for i, det in enumerate(pred): # detections per image
if webcam: # batch_size >= 1
p, s, im0 = path[i], '%g: ' % i, im0s[i].copy()
else:
p, s, im0 = path, '', im0s
save_path = str(Path(out) / Path(p).name)
txt_path = str(Path(out) / Path(p).stem) + ('_%g' % dataset.frame if dataset.mode == 'video' else '')
s += '%gx%g ' % img.shape[2:] # print string
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
if det is not None and len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
s += '%g %ss, ' % (n, names[int(c)]) # add to string
xywh_bboxs = []
confs = []
oids = []
# Write results
for *xyxy, conf, cls in det:
# to deep sort format
x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
xywh_obj = [x_c, y_c, bbox_w, bbox_h]
xywh_bboxs.append(xywh_obj)
confs.append([conf.item()])
oids.append(int(cls))
if save_txt: # Write to file
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist() # normalized xywh
with open(txt_path + '.txt', 'a') as f:
f.write(('%g ' * 5 + '\n') % (cls, *xywh)) # label format
xywhs = torch.Tensor(xywh_bboxs)
confss = torch.Tensor(confs)
outputs = deepsort.update(xywhs, confss, oids, im0)
if len(outputs) > 0:
bbox_xyxy = outputs[:, :4]
identities = outputs[:, -2]
object_id = outputs[:, -1]
im0, count = draw_boxes(im0, bbox_xyxy, object_id,identities)
# Print time (inference + NMS)
print('%sDone. (%.3fs)' % (s, t2 - t1))
currTime = time.time()
fps = 1 / (currTime - prevTime)
prevTime = currTime
cv2.line(im0, (20,25), (127,25), [85,45,255], 30)
cv2.putText(im0, f'FPS: {int(fps)}', (11, 35), 0, 1, [225, 255, 255], thickness=2, lineType=cv2.LINE_AA)
kpi1_text.write(f"{'{:.1f}'.format(fps)}
", unsafe_allow_html=True)
# Save results (image with detections)
if save_img:
if dataset.mode == 'images':
cv2.imwrite(save_path, im0)
else:
if vid_path != save_path: # new video
vid_path = save_path
if isinstance(vid_writer, cv2.VideoWriter):
vid_writer.release() # release previous video writer
fourcc = 'mp4v' # output video codec
fps = vid_cap.get(cv2.CAP_PROP_FPS)
w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
vid_writer.write(im0)
# data_deque assign inside yolor.py
kpi2_text.write(f"{len(data_deque)}
", unsafe_allow_html=True)
kpi3_text.write(f"{count}
", unsafe_allow_html=True)
stframe.image(im0,channels = 'BGR',use_column_width=True)
if save_txt or save_img:
print('Results saved to %s' % Path(out))
if platform == 'darwin': # MacOS
os.system('open ' + save_path)
print('Done. (%.3fs)' % (time.time() - t0))
cv2.destroyAllWindows()
vid.release()