|
from pathlib import Path |
|
import math |
|
|
|
from rich.console import Console |
|
from rich.table import Table |
|
from rich.pretty import Pretty |
|
|
|
import numpy as np |
|
|
|
import pandas as pd |
|
|
|
import cv2 |
|
|
|
from sklearn.cluster import MeanShift |
|
|
|
from skimage.transform import hough_circle, hough_circle_peaks |
|
|
|
|
|
import torch |
|
from torch.utils.data import Dataset, DataLoader |
|
from torchvision import transforms |
|
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor |
|
|
|
from torchvision.models.detection import ( |
|
fasterrcnn_resnet50_fpn_v2, |
|
FasterRCNN_ResNet50_FPN_V2_Weights, |
|
) |
|
|
|
import pytorch_lightning as pl |
|
from pytorch_lightning.callbacks import RichProgressBar |
|
from pytorch_lightning import Trainer |
|
|
|
import albumentations as A |
|
from albumentations.pytorch.transforms import ToTensorV2 |
|
|
|
import matplotlib.pyplot as plt |
|
|
|
import com_const as cc |
|
import com_image as ci |
|
|
|
# Pick the best available torch device once at import time.
# Fix: is_built() only reports compile-time support (e.g. it is True on any
# arm64 macOS build even when MPS cannot be used at runtime); is_available()
# also checks the runtime, so prefer it to avoid selecting a dead backend.
g_device = (
    "mps"
    if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available() else "cpu"
)
|
|
|
|
|
def load_tray_image(image_name, image_folder=None):
    """Load a tray/plate image as RGB.

    Args:
        image_name: file name (or Path) of the plate image.
        image_folder: optional folder override; defaults to the project's
            plate folder ``cc.path_to_plates``. Added because existing call
            sites (e.g. ``get_hough_leaf_disk_patch``) already pass
            ``image_folder=`` and previously raised TypeError.

    Returns:
        The loaded RGB image array.
    """
    return ci.load_image(
        file_name=image_name,
        path_to_images=cc.path_to_plates if image_folder is None else image_folder,
        rgb=True,
    )
|
|
|
|
|
def build_albumentations(
    image_size: int = 10,
    gamma=(60, 180),
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
):
    """Build the named albumentations pipelines used across the module.

    Args:
        image_size: size factor; the resize target is
            ``image_size * 32 * 2`` high by ``image_size * 32 * 3`` wide.
        gamma: gamma_limit range for RandomGamma.
        mean: per-channel normalization means (ImageNet defaults).
        std: per-channel normalization stds (ImageNet defaults).

    Returns:
        dict mapping a pipeline name ("resize", "train", "to_tensor",
        "un_normalize") to a list of albumentations transforms.
    """
    resize_steps = [
        A.Resize(height=image_size * 32 * 2, width=image_size * 32 * 3, p=1)
    ]
    train_steps = [
        A.HorizontalFlip(p=0.3),
        A.RandomBrightnessContrast(
            brightness_limit=0.25, contrast_limit=0.25, p=0.5
        ),
        A.RandomGamma(gamma_limit=gamma, p=0.5),
    ]
    tensor_steps = [A.Normalize(mean=mean, std=std, p=1), ToTensorV2()]
    # Inverse of Normalize: x * std + mean, expressed as another Normalize.
    undo_steps = [
        A.Normalize(
            mean=[-m / s for m, s in zip(mean, std)],
            std=[1.0 / s for s in std],
            always_apply=True,
            max_pixel_value=1.0,
        ),
    ]
    return {
        "resize": resize_steps,
        "train": train_steps,
        "to_tensor": tensor_steps,
        "un_normalize": undo_steps,
    }
|
|
|
|
|
def get_augmentations(
    image_size: int = 10,
    gamma=(60, 180),
    kinds: list = None,
    mean=(0.485, 0.456, 0.406),
    std=(0.229, 0.224, 0.225),
    inferrence: bool = False,
):
    """Compose the selected albumentations pipelines into one transform.

    Args:
        image_size: size factor forwarded to ``build_albumentations``.
        gamma: gamma range forwarded to ``build_albumentations``.
        kinds: which pipelines to chain, in order; defaults to
            ``["resize", "to_tensor"]``.
        mean: normalization means.
        std: normalization stds.
        inferrence: when True, build the compose without bbox handling
            (no boxes exist at inference time). (Spelling kept: callers
            already use this keyword.)

    Returns:
        An ``A.Compose`` over the selected transforms.
    """
    # Fix: the default was a mutable list literal shared across calls.
    if kinds is None:
        kinds = ["resize", "to_tensor"]

    pipelines = build_albumentations(
        image_size=image_size,
        gamma=gamma,
        mean=mean,
        std=std,
    )
    augs = []
    for k in kinds:
        augs += pipelines[k]
    if inferrence is True:
        return A.Compose(augs)
    return A.Compose(
        augs,
        bbox_params={"format": "pascal_voc", "label_fields": ["labels"]},
    )
|
|
|
|
|
def safe_row_col(row, col):
    """Normalize a (row, col) pair so the string label ends up in ``row``.

    The plate grid uses letter rows and numeric columns, but callers
    sometimes pass them in the opposite order; when both values are
    present and ``col`` is the string, the pair is swapped.

    Args:
        row (int or str): row output must be string.
        col (int or str): col output must be int.

    Returns:
        tuple: (row, col) in the canonical order.
    """
    needs_swap = row is not None and col is not None and isinstance(col, str)
    return (col, row) if needs_swap else (row, col)
|
|
|
|
|
def _update_axis(axis, image, title=None, fontsize=10, remove_axis=True): |
|
axis.imshow(image, origin="upper") |
|
if title is not None: |
|
axis.set_title(title, fontsize=fontsize) |
|
|
|
|
|
def make_patches_grid(images, row_count, col_count=None, figsize=(20, 20)):
    """Show a row_count x col_count matplotlib grid of images.

    Args:
        images: iterable of images or (image, title) tuples.
        row_count: number of grid rows.
        col_count: number of grid columns; defaults to ``row_count``.
        figsize: matplotlib figure size.
    """
    col_count = row_count if col_count is None else col_count
    _, axii = plt.subplots(row_count, col_count, figsize=figsize)
    for ax, image in zip(axii.reshape(-1), images):
        if isinstance(image, tuple):
            image, title = image[0], image[1]
        else:
            title = None
        try:
            _update_axis(axis=ax, image=image, remove_axis=True, title=title)
        except Exception:
            # Best effort: one bad patch must not abort the whole grid.
            # Fix: the bare except also swallowed KeyboardInterrupt/SystemExit.
            pass
        ax.set_axis_off()

    plt.tight_layout()
    plt.show()
|
|
|
|
|
def print_boxes(
    image_name,
    boxes,
    highlight=(None, None),
    draw_first_line: bool = False,
    return_plot: bool = True,
):
    """Draw labelled leaf-disc bounding boxes onto a tray image.

    Args:
        image_name: plate image to load and annotate.
        boxes: dataframe with x1, y1, x2, y2, cx, cy, row, col columns.
        highlight: (row, col) pair to emphasize; order-insensitive
            (normalized through ``safe_row_col``).
        draw_first_line: also draw the detected left-most vertical line,
            extended well past both endpoints.
        return_plot: if True show the image with matplotlib (returns None),
            otherwise return the annotated image array.
    """
    r, c = safe_row_col(*highlight)
    image = load_tray_image(image_name=image_name)

    fnt = cv2.FONT_HERSHEY_SIMPLEX
    fnt_scale = 3
    fnt_thickness = 8

    # Per-column outline colors; unknown columns fall back to near-white.
    column_colors = {
        1: (255, 0, 0),
        2: (0, 0, 255),
        3: (255, 255, 0),
        4: (0, 255, 255),
    }

    # box layout: [x1, y1, x2, y2, cx, cy, row, col]
    for box in boxes[["x1", "y1", "x2", "y2", "cx", "cy", "row", "col"]].values:
        # Highlighted cell gets magenta and a thicker outline.
        color = (
            (255, 0, 255)
            if c == box[7] and r == box[6]
            else column_colors.get(box[7], (255, 255, 244))
        )
        thickness = 20 if c == box[7] and r == box[6] else 10
        image = cv2.rectangle(
            image,
            (int(box[0]), int(box[1])),
            (int(box[2]), int(box[3])),
            color,
            thickness,
        )
        # Label like "A1": upper-cased row letter + column number.
        label = str(box[6]).upper() + str(int(box[7]))
        (w, h), _ = cv2.getTextSize(label, fnt, fnt_scale, fnt_thickness)
        x, y = (int(box[0]), int(box[1]) - fnt_thickness)
        # Filled background behind the label, in the box color.
        image = cv2.rectangle(
            image,
            (x - fnt_thickness, y - h - fnt_thickness),
            (x + fnt_thickness + w, y + fnt_thickness),
            color,
            -1,
        )
        image = cv2.putText(
            image,
            label,
            (x + fnt_thickness, y),
            fnt,
            fnt_scale,
            (0, 0, 0),
            fnt_thickness,
        )

    if draw_first_line is True:
        line = get_first_vert_line(image_name=image_name)
        if line is not None:
            x1, y1, x2, y2 = line
            # Extend the segment 10x beyond each endpoint so it crosses
            # the whole plate.
            cv2.line(
                image,
                [
                    int(i)
                    for i in (np.array([x2, y2]) - np.array([x1, y1])) * 10
                    + np.array([x1, y1])
                ],
                [
                    int(i)
                    for i in (np.array([x1, y1]) - np.array([x2, y2])) * 10
                    + np.array([x2, y2])
                ],
                (255, 0, 255),
                20,
                lineType=8,
            )

    if return_plot is True:
        plt.figure(figsize=(10, 10))
        plt.imshow(image)
        plt.tight_layout()
        plt.axis("off")
        plt.show()
    else:
        return image
|
|
|
|
|
def crop_to_vert(image):
    """Return the top-left region used for vertical-line detection.

    NOTE(review): rows are sliced with ``shape[1]`` (width) and columns
    with ``shape[0]`` (height); this looks transposed but is preserved
    exactly — confirm intent against the plate image aspect before changing.
    """
    height, width = image.shape[0], image.shape[1]
    return image[: width // 2, : height // 3]
|
|
|
|
|
def get_first_vert_line(image_name, min_angle=80, max_angle=100):
    """Find the left-most near-vertical line segment in a tray image.

    The red channel of the cropped image is normalized, Canny edge-detected
    and morphologically closed, then probabilistic Hough lines are scanned;
    among segments whose absolute angle lies within
    [min_angle, max_angle] degrees, the one with the smallest x is kept.

    Args:
        image_name: plate image to analyse.
        min_angle: lower angle bound in degrees (order-insensitive).
        max_angle: upper angle bound in degrees (order-insensitive).

    Returns:
        (x1, y1, x2, y2) of the selected segment, or None.
    """
    r, *_ = cv2.split(load_tray_image(image_name))

    red_crop = cv2.normalize(
        crop_to_vert(r),
        None,
        alpha=0,
        beta=200,
        norm_type=cv2.NORM_MINMAX,
    )

    lines = cv2.HoughLinesP(
        image=ci.close(
            cv2.Canny(red_crop, 50, 200, None, 3),
            kernel_size=5,
            proc_times=5,
        ),
        rho=1,
        theta=np.pi / 180,
        threshold=50,
        minLineLength=red_crop.shape[0] // 5,
        maxLineGap=20,
    )
    if lines is None:
        return None

    # Normalize the bounds once (previously re-done every loop iteration).
    min_angle, max_angle = min(min_angle, max_angle), max(min_angle, max_angle)

    min_x = red_crop.shape[0]
    sel_line = None
    for line in lines:
        x1, y1, x2, y2 = line[0]
        # Negated because image y grows downwards.
        line_angle = math.atan2(y2 - y1, x2 - x1) * 180 / math.pi * -1
        if min_angle <= abs(line_angle) <= max_angle and min(x1, x2) < min_x:
            min_x = min(x1, x2)
            sel_line = (x1, y1, x2, y2)

    return sel_line
|
|
|
|
|
def draw_first_line(image_name, dot_size=10, crop_canvas: bool = False):
    """Draw the detected left-most vertical line onto the tray image.

    Bug fix: the original returned ``canvas`` before it was assigned when
    no line was found (NameError); the unannotated image is now returned
    in that case.

    Args:
        image_name: plate image to annotate.
        dot_size: radius of the endpoint markers.
        crop_canvas: draw on the line-detection crop instead of the full
            image.

    Returns:
        The (possibly cropped) image, annotated when a line was found.
    """
    canvas = load_tray_image(image_name)
    if crop_canvas is True:
        canvas = crop_to_vert(canvas)
    line = get_first_vert_line(image_name=image_name)
    if line is None:
        return canvas
    x1, y1, x2, y2 = line
    # Red dot = first endpoint, green dot = second, blue segment between.
    cv2.circle(canvas, (x1, y1), dot_size, (255, 0, 0))
    cv2.circle(canvas, (x2, y2), dot_size, (0, 255, 0))
    cv2.line(canvas, (x1, y1), (x2, y2), (0, 0, 255), 10)
    return canvas
|
|
|
|
|
def get_bbox(image_name, bboxes, row, col):
    """Return the single bounding-box record for (image, row, col).

    Args:
        image_name: plate file name or Path.
        bboxes: dataframe of boxes, or an already-selected ``pd.Series``
            which is returned unchanged.
        row: row label (order-insensitive, see ``safe_row_col``).
        col: column index.

    Returns:
        pd.Series: the first matching box record.
    """
    if isinstance(bboxes, pd.Series):
        return bboxes
    row, col = safe_row_col(row, col)
    file_name = image_name.name if isinstance(image_name, Path) else image_name
    mask = (
        (bboxes.file_name == file_name)
        & (bboxes.row == row)
        & (bboxes.col == col)
    )
    return bboxes[mask].iloc[0]
|
|
|
|
|
def get_hough_leaf_disc_circle(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    allow_move: bool = False,
):
    """Detect the leaf disc as a circle inside its padded bounding box.

    The blue channel of the padded crop is normalized and edge-detected,
    then a circular Hough transform searches radii near half the box's
    larger side.

    Args:
        image_name: plate image name.
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        padding: extra pixels around the bounding box.
        allow_move: when True, shift the detected center so the whole
            circle fits inside the crop.

    Returns:
        dict with scalar ``cx``, ``cy`` and ``r``. Fixes: the original
        returned the whole ``radii`` array under "r" even though the
        scalar had been extracted, and the ``allow_move`` adjustment used
        ``abs(r - cx)`` on both sides, which does not clamp the far edge.
    """
    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    *_, b = cv2.split(padded_leaf_disk)

    # Edge map of the normalized blue channel feeds the Hough transform.
    min_t, max_t = 100, 200
    rb = cv2.Canny(
        cv2.normalize(
            b,
            None,
            alpha=0,
            beta=200,
            norm_type=cv2.NORM_MINMAX,
        ),
        min_t,
        max_t,
        None,
        3,
    )

    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    # Radii candidates around half of the box's larger side.
    hough_radii = np.arange(bbox.max_size // 2 - 10, bbox.max_size // 2 + 10, 10)
    hough_res = hough_circle(rb, hough_radii)

    _, cx, cy, radii = hough_circle_peaks(
        hough_res,
        hough_radii,
        min_xdistance=10,
        min_ydistance=10,
        total_num_peaks=1,
    )

    cx = int(cx[0])
    cy = int(cy[0])
    r = int(radii[0])

    if allow_move is True:
        # Clamp the center so the circle lies fully inside the patch.
        h, w, _ = padded_leaf_disk.shape
        cx = min(max(cx, r), w - r)
        cy = min(max(cy, r), h - r)

    return dict(cx=cx, cy=cy, r=r)
|
|
|
|
|
def get_hough_leaf_disk_patch(
    image_name,
    bboxes,
    patch_size=-1,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
    image_folder=None,
):
    """Extract a square patch from inside a leaf disc.

    Two modes:
      * ``patch_size > 0``: crop a fixed-size square centered on the
        box center taken straight from ``bboxes``.
      * otherwise: detect the disc circle with Hough (unless ``disc`` is
        supplied) and crop the square inscribed in that circle.

    Args:
        image_name: plate image name.
        bboxes: boxes dataframe (or pre-selected Series).
        patch_size: fixed square side in pixels; <= 0 selects Hough mode.
        row: grid row of the disc.
        col: grid column of the disc.
        padding: padding forwarded to the Hough helpers.
        radius_crop: pixels to shave off the detected radius.
        disc: optional pre-computed dict with cx/cy/r.
        allow_move: forwarded to the circle detector.
        image_folder: forwarded to the image loader.
            NOTE(review): ``load_tray_image`` as defined above takes no
            ``image_folder`` parameter — confirm this call path works.

    Returns:
        The cropped patch, or None when the box lookup fails in
        fixed-size mode.
    """
    if patch_size > 0:
        try:
            bbox = get_bbox(image_name, bboxes, row, col)
            cx = int(bbox.cx)
            cy = int(bbox.cy)
        except:
            return None
        # Half side, so the crop is centered on (cx, cy).
        patch_size = patch_size // 2

        return A.crop(
            load_tray_image(image_name, image_folder=image_folder),
            cx - patch_size,
            cy - patch_size,
            cx + patch_size,
            cy + patch_size,
        )
    else:
        if disc is None:
            disc = get_hough_leaf_disc_circle(
                image_name=image_name,
                bboxes=bboxes,
                row=row,
                col=col,
                padding=padding,
                allow_move=allow_move,
            )

        # Half side of the square inscribed in the (shrunken) circle.
        r = int((disc["r"] - radius_crop) / math.sqrt(2))
        cx = int(disc["cx"])
        cy = int(disc["cy"])

        left = cx - r
        top = cy - r
        right = cx + r
        bottom = cy + r

        # NOTE(review): ``get_leaf_disk_wbb`` as defined below takes no
        # ``padding`` parameter — confirm this call path works.
        return get_leaf_disk_wbb(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
        )[top:bottom, left:right]
|
|
|
|
|
def get_hough_segment_disk(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
):
    """Mask the padded leaf-disc crop to its Hough circle and crop tight.

    Args:
        image_name: plate image name.
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        padding: padding forwarded to the helpers.
        radius_crop: pixels to shave off the detected radius.
        disc: optional pre-computed dict with cx/cy/r.
        allow_move: forwarded to the circle detector.

    Returns:
        Square crop of the circle-masked disc (black outside the circle).
    """
    if disc is None:
        disc = get_hough_leaf_disc_circle(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
            allow_move=allow_move,
        )

    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    # Fix: the original also computed an unused inscribed-square radius.
    r = int(disc["r"] - radius_crop)
    cx = int(disc["cx"])
    cy = int(disc["cy"])
    left = cx - r
    top = cy - r
    right = cx + r
    bottom = cy + r

    # Keep only pixels inside the circle, then crop its bounding square.
    return cv2.bitwise_and(
        padded_leaf_disk,
        padded_leaf_disk,
        mask=cv2.circle(np.zeros_like(padded_leaf_disk[:, :, 0]), (cx, cy), r, 255, -1),
    )[top:bottom, left:right]
|
|
|
|
|
def draw_hough_bb_to_patch_process(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    padding: int = 10,
    radius_crop=0,
    disc=None,
    allow_move: bool = False,
):
    """Visualize the Hough box-to-patch pipeline on the padded disc crop.

    Draws, on the padded leaf-disc image: the inscribed patch square
    (green), the circle's bounding square, its center dot and the circle
    itself (magenta).

    Returns:
        The annotated padded crop.
    """
    if disc is None:
        disc = get_hough_leaf_disc_circle(
            image_name=image_name,
            bboxes=bboxes,
            row=row,
            col=col,
            padding=padding,
            allow_move=allow_move,
        )

    padded_leaf_disk = get_leaf_disk_wbb(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        padding=padding,
    )
    r = int(disc["r"] - radius_crop)
    rc = int((disc["r"] - radius_crop) / math.sqrt(2))
    cx = int(disc["cx"])
    cy = int(disc["cy"])
    left = cx - r
    top = cy - r
    right = cx + r
    bottom = cy + r

    # cv2 drawing mutates the array in place and returns it, so this
    # sequence is equivalent to the original's nested calls.
    canvas = cv2.rectangle(
        padded_leaf_disk,
        (cx - rc, cy - rc),
        (cx + rc, cy + rc),
        (0, 255, 0),
        5,
    )
    canvas = cv2.rectangle(canvas, (left, top), (right, bottom), (255, 0, 155), 5)
    canvas = cv2.circle(canvas, (cx, cy), 10, (255, 0, 155), -1)
    canvas = cv2.circle(canvas, (cx, cy), r, (255, 0, 155), 5)
    return canvas
|
|
|
|
|
def get_leaf_disk_wbb(
    image_name, bboxes, row=-1, col=-1, image_path: Path = None, padding: int = 0
):
    """Crop a leaf-disc bounding box out of its tray image.

    Args:
        image_name: plate image name (also the box lookup key).
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        image_path: optional path handed to the loader instead of
            ``image_name``.
        padding: extra pixels added on every side of the box, clamped to
            the image. Added because several callers already pass
            ``padding=`` (which previously raised TypeError); the default
            of 0 preserves the old crop exactly.

    Returns:
        The cropped image, or None when the lookup or crop fails.
    """
    try:
        bbox = get_bbox(image_name, bboxes, row, col)
        image = load_tray_image(image_name if image_path is None else image_path)
        y1 = max(int(bbox.y1) - padding, 0)
        x1 = max(int(bbox.x1) - padding, 0)
        y2 = min(int(bbox.y2) + padding, image.shape[0])
        x2 = min(int(bbox.x2) + padding, image.shape[1])
        return image[y1:y2, x1:x2]
    except Exception:
        # Mirrors the original best-effort contract: any failure -> None.
        return None
|
|
|
|
|
def get_fast_leaf_disc_circle(
    image_name, bboxes, row=-1, col=-1, percent_radius: float = 1.0
):
    """Approximate the leaf-disc circle directly from its bounding box.

    Args:
        image_name: plate image name (box lookup key).
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        percent_radius: scale factor applied to the half of max_size.

    Returns:
        tuple: (cx, cy, radius) as ints.
    """
    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    radius = (bbox.max_size / 2) * percent_radius
    return int(bbox.cx), int(bbox.cy), int(radius)
|
|
|
|
|
def get_fast_segment_disk(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
):
    """Mask the tray image to the fast circle and crop its bounding square.

    Args:
        image_name: plate image name (box lookup key).
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        percent_radius: radius scale forwarded to the circle helper.
        image_path: optional path handed to the loader instead of
            ``image_name``.

    Returns:
        Square crop of the circle-masked disc (black outside the circle).
    """
    cx, cy, r = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    src_image = load_tray_image(image_name if image_path is None else image_path)

    # Filled white circle on black = keep-mask for bitwise_and.
    disc_mask = cv2.circle(np.zeros_like(src_image[:, :, 0]), (cx, cy), r, 255, -1)
    masked = cv2.bitwise_and(src_image, src_image, mask=disc_mask)
    return masked[cy - r : cy + r, cx - r : cx + r]
|
|
|
|
|
def get_fast_leaf_disk_patch(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
):
    """Crop the square inscribed in the fast leaf-disc circle.

    Args:
        image_name: plate image name (box lookup key).
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        percent_radius: radius scale forwarded to the circle helper.
        image_path: optional path handed to the loader instead of
            ``image_name``.

    Returns:
        The inscribed-square crop of the tray image.
    """
    cx, cy, r = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    # Half side of the square inscribed in the circle of radius r.
    half_side = int(r / math.sqrt(2))
    image = load_tray_image(image_name if image_path is None else image_path)
    return image[cy - half_side : cy + half_side, cx - half_side : cx + half_side]
|
|
|
|
|
def draw_fast_bb_to_patch_process(
    image_name,
    bboxes,
    row=-1,
    col=-1,
    percent_radius: float = 1.0,
    image_path: Path = None,
    add_center: bool = True,
):
    """Visualize the fast box-to-patch pipeline, cropped to the bbox.

    Draws the fast circle (magenta), optionally its center dot, and the
    inscribed patch square (green) on the full tray image, then crops to
    the disc's bounding box.

    Args:
        image_name: plate image name (box lookup key).
        bboxes: boxes dataframe (or pre-selected Series).
        row: grid row of the disc.
        col: grid column of the disc.
        percent_radius: radius scale forwarded to the circle helper.
        image_path: optional path handed to the loader instead of
            ``image_name``.
        add_center: also draw a filled dot at the circle center.

    Returns:
        The annotated bbox crop.
    """
    cx, cy, r = get_fast_leaf_disc_circle(
        image_name=image_name,
        bboxes=bboxes,
        row=row,
        col=col,
        percent_radius=percent_radius,
    )
    bbox = get_bbox(image_name=image_name, bboxes=bboxes, row=row, col=col)
    image = load_tray_image(image_name if image_path is None else image_path)
    # Half side of the inscribed square.
    rc = int(r / math.sqrt(2))

    cv2.circle(image, (cx, cy), r, color=(255, 0, 155), thickness=5)
    if add_center is True:
        cv2.circle(image, (cx, cy), 10, color=(255, 0, 155), thickness=-1)
    cv2.rectangle(image, (cx - rc, cy - rc), (cx + rc, cy + rc), (0, 255, 0), 5)

    return image[int(bbox.y1) : int(bbox.y2), int(bbox.x1) : int(bbox.x2)]
|
|
|
|
|
class LeafDiskDetectorDataset(Dataset):
    """Detection dataset: one sample per plate image with its leaf-disc boxes.

    Each item is (image_tensor, target_dict) — plus the image id when
    ``return_id`` is True. Targets follow the torchvision detection
    convention (boxes, labels, image_id, area, iscrowd, img_size,
    img_scale).
    """

    def __init__(
        self,
        csv,
        transform=None,
        yxyx: bool = False,
        return_id: bool = False,
        bboxes: bool = True,
    ):
        """
        Args:
            csv: dataframe of boxes with at least a ``plate_name`` column.
            transform: albumentations Compose; its first transform is
                expected to be the resize (width/height are read off it).
            yxyx: emit boxes in (y1, x1, y2, x2) order instead of
                (x1, y1, x2, y2).
            return_id: also return the image id tensor from __getitem__.
            bboxes: when True the target key is "bboxes", else "boxes".
        """
        self.boxes = csv.copy()
        # One sample per unique plate image.
        self.images = list(self.boxes.plate_name.unique())
        self.transforms = transform
        if transform is not None:
            # Resize target taken from the first transform in the compose.
            self.width, self.height = transform[0].width, transform[0].height
        else:
            self.width, self.height = 0, 0
        self.yxyx = yxyx
        self.return_id = return_id
        self.bboxes = bboxes

    def __len__(self):
        """Number of plate images (not number of boxes)."""
        return len(self.images)

    def load_boxes(self, idx):
        """Return (count, x1y1x2y2 array) of boxes for image ``idx``.

        NOTE(review): gates on a column literally named "x" although the
        coordinates used are x1..y2 — confirm the annotated CSVs really
        carry an "x" column as the marker for labelled data.
        """
        if "x" in self.boxes.columns:
            boxes = self.boxes[self.boxes.plate_name == self.images[idx]].dropna()
            size = boxes.shape[0]
            return (
                (size, boxes[["x1", "y1", "x2", "y2"]].values) if size > 0 else (0, [])
            )
        return 0, []

    def load_tray_image(self, idx):
        """Load the idx-th plate image via the module-level loader."""
        return load_tray_image(self.images[idx])

    def get_by_sample_name(self, plate_name):
        """Return the full sample (image, target, ...) for a plate name."""
        return self[self.images.index(plate_name)]

    def get_image_by_name(self, plate_name):
        """Load just the raw image for a plate name."""
        return load_tray_image(plate_name)

    def draw_image_with_boxes(self, plate_name):
        """Return the (transformed) image with its boxes drawn in red."""
        image, labels, *_ = self[self.images.index(plate_name)]
        boxes = labels[self.get_boxes_key()]
        for box in boxes:
            # Reorder indices when boxes are stored as yxyx.
            box_indexes = [1, 0, 3, 2] if self.yxyx is True else [0, 1, 2, 3]
            image = cv2.rectangle(
                image,

                (int(box[box_indexes[0]]), int(box[box_indexes[1]])),
                (int(box[box_indexes[2]]), int(box[box_indexes[3]])),
                (255, 0, 0),
                2,
            )
        return image

    def get_boxes_key(self):
        """Target-dict key for boxes, depending on the ``bboxes`` flag."""
        return "bboxes" if self.bboxes is True else "boxes"

    def __getitem__(self, index):
        """Build (image, target[, image_id]) for one plate image."""
        num_box, boxes = self.load_boxes(
            index
        )
        img = self.load_tray_image(index)

        if num_box > 0:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
        else:
            # Empty (0, 4) tensor keeps downstream shape logic working.
            boxes = torch.zeros((0, 4), dtype=torch.float32)

        image_id = torch.tensor([index])
        # Single foreground class: every box is labelled 1.
        labels = torch.ones((num_box,), dtype=torch.int64)
        target = {
            self.get_boxes_key(): boxes,
            "labels": labels,
            "image_id": image_id,
            # Area computed on the pre-transform boxes: (y2-y1) * (x2-x1).
            "area": torch.as_tensor(
                (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
                dtype=torch.float32,
            ),
            "iscrowd": torch.zeros((num_box,), dtype=torch.int64),
            "img_size": torch.tensor([self.height, self.width]),
            "img_scale": torch.tensor([1.0]),
        }

        if self.transforms is not None:
            # Albumentations transforms image and boxes together.
            sample = {
                "image": img,
                "bboxes": target[self.get_boxes_key()],
                "labels": labels,
            }
            sample = self.transforms(**sample)
            img = sample["image"]
            if num_box > 0:

                boxes = np.array(sample["bboxes"])

                if self.yxyx is True:
                    # Swap to (y1, x1, y2, x2) ordering.
                    boxes[:, [0, 1, 2, 3]] = boxes[:, [1, 0, 3, 2]]

                target[self.get_boxes_key()] = torch.as_tensor(
                    boxes, dtype=torch.float32
                )
            else:
                target[self.get_boxes_key()] = torch.zeros((0, 4), dtype=torch.float32)
        else:
            # No augmentation pipeline: plain tensor conversion.
            img = transforms.ToTensor()(img)
        if self.return_id is True:
            return img, target, image_id
        else:
            return img, target
|
|
|
|
|
def collate_fn(batch):
    """Collate (image, target) pairs into a stacked float image batch.

    Targets stay as a tuple of per-sample dicts: detection targets are
    variable-length and cannot be stacked. Fix: the original also built
    unused ``boxes``/``labels`` lists, which additionally assumed a
    "boxes" key that the dataset may emit as "bboxes"; that dead code is
    removed.

    Args:
        batch: iterable of (image_tensor, target_dict) pairs. All images
            must share one shape (the dataset resizes them).

    Returns:
        tuple: (float32 image batch tensor, tuple of target dicts).
    """
    images, targets = tuple(zip(*batch))
    images = torch.stack(images).float()
    return images, targets
|
|
|
|
|
def find_best_lr(model, default_root_dir=cc.path_to_chk_detector):
    """Run the Lightning learning-rate finder and return the tuned LR.

    Args:
        model: a LightningModule exposing a ``learning_rate`` attribute.
        default_root_dir: where the tuner writes its checkpoints/logs.

    Returns:
        The learning rate selected by ``trainer.tune``.
    """
    tuning_trainer = Trainer(
        default_root_dir=default_root_dir,
        auto_lr_find=True,
        accelerator="gpu",
        callbacks=[RichProgressBar()],
    )
    tuning_trainer.tune(model)
    return model.learning_rate
|
|
|
|
|
class LeafDiskDetector(pl.LightningModule):
    """Faster R-CNN based leaf-disc detector plus plate-indexing helpers.

    Fixes vs. the previous revision:
    - val/test dataloaders now use ``val_data``/``test_data`` (both were
      built from ``train_data``);
    - ``test_step`` no longer passes an unsupported ``batch_size`` kwarg
      to ``step_`` (TypeError);
    - ``validation_step`` no longer re-logs the validation loss as
      ``train_loss``;
    - pretrained weights are passed as ``...Weights.DEFAULT`` (the enum
      class itself is not a valid ``weights`` value);
    - the cyclic scheduler reads its mode from ``scheduler_params``
      (``self.scheduler_mode`` was never defined → AttributeError);
    - mutable default arguments replaced by None sentinels;
    - ``steplr`` no longer temporarily mutates ``scheduler_params``.
    """

    def __init__(
        self,
        batch_size: int,
        learning_rate: float,
        max_epochs: int,
        image_factor: int,
        train_data: pd.DataFrame,
        val_data: pd.DataFrame,
        test_data: pd.DataFrame,
        augmentations_kinds: list = None,
        augmentations_params: dict = None,
        num_workers: int = 0,
        accumulate_grad_batches: int = 3,
        selected_device: str = g_device,
        optimizer: str = "adam",
        scheduler: str = None,
        scheduler_params: dict = None,
    ):
        """
        Args:
            batch_size: dataloader batch size.
            learning_rate: base LR for the optimizer.
            max_epochs: stored for callers/schedulers.
            image_factor: size factor for the augmentation resize.
            train_data / val_data / test_data: box dataframes per split.
            augmentations_kinds: pipelines for training augmentation
                (default: resize + train + to_tensor).
            augmentations_params: extra kwargs for get_augmentations
                (default: {"gamma": (60, 180)}).
            num_workers: dataloader workers.
            accumulate_grad_batches: stored for the Trainer.
            selected_device: device string, defaults to the module pick.
            optimizer: "adam" or "sgd".
            scheduler: None, "cycliclr", "steplr" or "plateau".
            scheduler_params: kwargs for the chosen scheduler.
        """
        super().__init__()

        # None sentinels avoid shared mutable defaults across instances.
        if augmentations_kinds is None:
            augmentations_kinds = ["resize", "train", "to_tensor"]
        if augmentations_params is None:
            augmentations_params = {"gamma": (60, 180)}
        if scheduler_params is None:
            scheduler_params = {}

        self.model_name = "ldd"

        # Training setup
        self.batch_size = batch_size
        self.selected_device = selected_device
        self.learning_rate = learning_rate
        self.num_workers = num_workers
        self.max_epochs = max_epochs
        self.accumulate_grad_batches = accumulate_grad_batches

        # Data splits
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data

        # Optimization
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.scheduler_params = scheduler_params

        # Augmentations
        self.image_factor = image_factor
        self.augmentations_kinds = augmentations_kinds
        self.augmentations_params = augmentations_params

        self.train_augmentations = get_augmentations(
            image_size=self.image_factor,
            kinds=self.augmentations_kinds,
            **self.augmentations_params,
        )
        # Validation/test use deterministic transforms only.
        self.val_augmentations = get_augmentations(
            image_size=self.image_factor,
            kinds=["resize", "to_tensor"],
            **self.augmentations_params,
        )

        # Pretrained backbone with a fresh 2-class (background + disc) head.
        self.encoder = fasterrcnn_resnet50_fpn_v2(
            weights=FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
        )
        num_classes = 2
        in_features = self.encoder.roi_heads.box_predictor.cls_score.in_features
        self.encoder.roi_heads.box_predictor = FastRCNNPredictor(
            in_features, num_classes
        )

        self.save_hyperparameters()

    def hr_desc(self):
        """Print a rich table of the model's hyper-parameters and data."""
        table = Table(title=f"{self.model_name} params & values")
        table.add_column("Param", justify="right", style="bold", no_wrap=True)
        table.add_column("Value")

        def add_pairs(table_, attributes: list) -> None:
            # Best effort: attributes missing on self are silently skipped.
            for a in attributes:
                try:
                    table_.add_row(a, Pretty(getattr(self, a)))
                except Exception:
                    pass

        add_pairs(
            table,
            ["model_name", "batch_size", "num_workers", "accumulate_grad_batches"],
        )
        table.add_row("image_width", Pretty(self.train_augmentations[0].width))
        table.add_row("image_height", Pretty(self.train_augmentations[0].height))
        add_pairs(
            table,
            ["image_factor", "augmentations_kinds", "augmentations_params"],
        )

        add_pairs(
            table,
            ["learning_rate", "optimizer", "scheduler", "scheduler_params"],
        )

        for name, df in zip(
            ["train", "val", "test"],
            [self.train_data, self.val_data, self.test_data],
        ):
            table.add_row(
                name,
                Pretty(
                    f"shape: {str(df.shape)}, images: {len(df.plate_name.unique())}"
                ),
            )

        Console().print(table)

    def configure_optimizers(self):
        """Build the optimizer (adam/sgd) and optional LR scheduler."""
        if self.optimizer == "adam":
            optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        elif self.optimizer == "sgd":
            optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate)
        else:
            optimizer = None

        if self.scheduler == "cycliclr":
            scheduler = torch.optim.lr_scheduler.CyclicLR(
                optimizer,
                base_lr=self.learning_rate,
                max_lr=0.01,
                step_size_up=100,
                # Fix: self.scheduler_mode was never defined; the mode now
                # comes from scheduler_params with CyclicLR's default.
                mode=self.scheduler_params.get("mode", "triangular"),
            )
        elif self.scheduler == "steplr":
            # Fix: pass the optimizer directly instead of temporarily
            # inserting/popping it from self.scheduler_params.
            scheduler = torch.optim.lr_scheduler.StepLR(
                optimizer, **self.scheduler_params
            )
        elif self.scheduler == "plateau":
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
                optimizer,
                mode="min",
                factor=0.2,
                patience=10,
                min_lr=1e-6,
            )
            # ReduceLROnPlateau needs a monitored metric in Lightning.
            scheduler = {"scheduler": scheduler, "monitor": "val_loss"}
        else:
            scheduler = None

        if scheduler is None:
            return optimizer
        return [optimizer], [scheduler]

    def _make_dataloader(self, data, augmentations, shuffle: bool = False):
        """Shared DataLoader construction for the three splits."""
        return DataLoader(
            LeafDiskDetectorDataset(
                csv=data,
                transform=augmentations,
                bboxes=False,
            ),
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            collate_fn=collate_fn,
            pin_memory=True,
        )

    def train_dataloader(self):
        """Training loader: augmented, shuffled."""
        return self._make_dataloader(
            self.train_data, self.train_augmentations, shuffle=True
        )

    def val_dataloader(self):
        """Validation loader. Fix: previously built from train_data."""
        return self._make_dataloader(self.val_data, self.val_augmentations)

    def test_dataloader(self):
        """Test loader. Fix: previously built from train_data."""
        return self._make_dataloader(self.test_data, self.val_augmentations)

    def forward(self, x):
        return self.encoder(x)

    def step_(self, batch, batch_index):
        """Compute the summed Faster R-CNN loss for one batch."""
        x, y = batch
        # Faster R-CNN only returns the loss dict in train mode.
        self.train()
        loss_dict = self.encoder(x, y)
        return sum(loss for loss in loss_dict.values())

    def training_step(self, batch, batch_idx):
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log(
            "train_loss", loss, on_step=True, prog_bar=True, batch_size=self.batch_size
        )
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log(
            "val_loss",
            loss,
            on_epoch=True,
            on_step=False,
            prog_bar=True,
            batch_size=self.batch_size,
        )
        # Fix: the validation loss was also (mis)logged as "train_loss".
        return loss

    def test_step(self, batch, batch_idx):
        # Fix: step_ takes no batch_size kwarg (old call raised TypeError).
        loss = self.step_(batch=batch, batch_index=batch_idx)
        self.log("test_loss", loss, batch_size=self.batch_size)
        return loss

    def prepare_bboxes(
        self,
        image_name,
        score_threshold=0.90,
        ar_threshold=1.5,
        size_threshold=0.30,
    ):
        """Run inference on one plate and return filtered, rescaled boxes.

        Keeps detections above ``score_threshold``, maps them back to the
        original image size, then drops boxes that are too elongated
        (aspect ratio beyond ``ar_threshold``) or too small relative to
        the largest box (``size_threshold``).

        Returns:
            DataFrame with x1..y2, width/height, cx/cy, area, ar,
            file_name and max_size columns.
        """
        from siuba import _, filter, mutate  # local: optional dependency

        augs = get_augmentations(
            image_size=self.image_factor,
            kinds=["resize", "to_tensor"],
            inferrence=True,
            **self.augmentations_params,
        )
        image = load_tray_image(image_name=image_name)

        self.to(g_device)
        self.eval()
        predictions = self(augs(image=image)["image"].to(g_device).unsqueeze(0))

        boxes = predictions[0]["boxes"].detach().to("cpu").numpy()
        scores = predictions[0]["scores"].detach().to("cpu").numpy()

        # Keep confident detections as plain [x1, y1, x2, y2] lists.
        filtered_predictions = [
            [box[i] for i in range(4)]
            for box, score in zip(boxes, scores)
            if score > score_threshold
        ]

        # Map boxes from the resized space back to the source image size.
        restore_size = A.Compose(
            [A.Resize(width=image.shape[1], height=image.shape[0])],
            bbox_params={"format": "pascal_voc", "label_fields": ["labels"]},
        )
        sample = restore_size(
            image=image,
            bboxes=filtered_predictions,
            labels=[1 for _ in range(len(filtered_predictions))],
        )
        resized_predictions = sample["bboxes"]

        boxes = (
            pd.DataFrame(data=resized_predictions, columns=["x1", "y1", "x2", "y2"])
            >> mutate(
                x1=_.x1 * image.shape[1] / augs[0].width,
                y1=_.y1 * image.shape[0] / augs[0].height,
                x2=_.x2 * image.shape[1] / augs[0].width,
                y2=_.y2 * image.shape[0] / augs[0].height,
            )
            >> mutate(width=_.x2 - _.x1, height=_.y2 - _.y1)
            >> mutate(cx=(_.x1 + _.x2) / 2, cy=(_.y1 + _.y2) / 2)
            >> mutate(area=_.width * _.height)
            >> mutate(ar=_.width / _.height)
        )
        boxes.insert(
            0,
            "file_name",
            image_name.name if isinstance(image_name, Path) else image_name,
        )
        boxes["max_size"] = boxes[["width", "height"]].max(axis=1)

        # Reject overly elongated boxes in either direction.
        ar_boxes = (
            boxes
            >> filter(_.width / _.height < ar_threshold)
            >> filter(_.height / _.width < ar_threshold)
        )

        # Reject boxes much smaller than the largest remaining box.
        return ar_boxes[ar_boxes.area > ar_boxes.area.max() * size_threshold]

    @staticmethod
    def init_cols(bboxes):
        """Cluster box centers into grid columns and rows with MeanShift.

        Columns are renumbered 0..n left-to-right; rows are relabelled
        "a", "b", "c" top-to-bottom.
        """
        bboxes = bboxes.copy()

        # Columns: cluster on center x.
        X = np.reshape(bboxes.cx.to_list(), (-1, 1))
        ms = MeanShift(bandwidth=100, bin_seeding=True)
        ms.fit(X)
        bboxes["col"] = ms.predict(X)

        # Renumber column clusters left-to-right by their mean cx.
        bboxes = bboxes.sort_values("cx")
        bboxes["mean_cx"] = (
            bboxes.groupby("col").transform("mean", numeric_only=True).cx
        )
        bboxes = bboxes.sort_values("mean_cx")
        for i, val in enumerate(bboxes.mean_cx.unique()):
            bboxes.loc[bboxes["mean_cx"] == val, "col"] = i

        # Rows: cluster on center y.
        bboxes = bboxes.sort_values("cy")
        X = np.reshape(bboxes.cy.to_list(), (-1, 1))
        ms = MeanShift(bandwidth=100, bin_seeding=True)
        ms.fit(X)
        bboxes["row"] = ms.predict(X)

        # Relabel row clusters top-to-bottom as "a", "b", "c".
        bboxes = bboxes.sort_values("cy")
        bboxes["mean_cy"] = (
            bboxes.groupby("row").transform("mean", numeric_only=True).cy
        )
        bboxes = bboxes.sort_values("mean_cy")
        for i, val in zip(["a", "b", "c"], bboxes.mean_cy.unique()):
            bboxes.loc[bboxes["mean_cy"] == val, "row"] = i

        return bboxes.sort_values("cx")

    @staticmethod
    def finalize_indexing(bboxes, image_name):
        """Shift column indices to account for missing leaf discs.

        When fewer than 4 columns are detected, horizontal gaps wider
        than ~1.1 box widths are interpreted as empty columns and the
        labels to their right are incremented accordingly. Columns are
        1-based in the final output.
        """
        bboxes = bboxes.copy()
        bboxes = bboxes.sort_values("cx")
        labels_unique = bboxes.col.unique()
        labels = bboxes.col.to_numpy()
        if len(labels_unique) < 4:
            # [cluster index, how many empty columns precede it]
            inc_labels = [[i, 0] for i in range(len(labels_unique))]
            max_width = bboxes.max_size.max()

            # Estimate the plate's left edge to detect a missing first column.
            left_most_line = get_first_vert_line(image_name=image_name)
            if left_most_line is not None:
                # NOTE(review): indices 0 and 1 are (x1, y1); mixing an x
                # with a y looks suspicious — possibly (x1, x2), i.e.
                # indices 0 and 2, were intended. Preserved as-is.
                left_most_point = bboxes.x1.min() - min(
                    left_most_line[0], left_most_line[1]
                )
            else:
                left_most_point = bboxes.x1.min() - (max_width / 2)
            i = 1
            while left_most_point > i * 1.1 * max_width:
                inc_labels[0][1] += 1
                i += 1

            prev_min_min = bboxes[bboxes.col == 0].x2.max()

            for label in labels_unique[1:]:
                current_label_contours = bboxes[bboxes.col == label]
                max_width = current_label_contours.max_size.max()
                min_left = current_label_contours.x1.min()
                i = 1
                while min_left - prev_min_min > i * 1.1 * max_width:
                    inc_labels[label][1] += 1
                    i += 1
                prev_min_min = min_left + max_width

            # Apply accumulated shifts right-to-left so earlier shifts do
            # not disturb later comparisons.
            for pos, inc in reversed(inc_labels):
                labels[labels >= pos] += inc

            bboxes["col"] = labels

        bboxes["col"] += 1

        return bboxes.sort_values(["row", "col"])

    def index_plate(
        self,
        image_name,
        score_threshold=0.90,
        ar_threshold=1.5,
        size_threshold=0.50,
    ):
        """Detect, filter and grid-index all leaf discs on one plate.

        Returns:
            DataFrame of indexed boxes (possibly empty when nothing was
            detected above the thresholds).
        """
        bboxes = self.prepare_bboxes(
            image_name=image_name,
            score_threshold=score_threshold,
            ar_threshold=ar_threshold,
            size_threshold=size_threshold,
        )
        if bboxes.shape[0] == 0:
            return bboxes

        bboxes = self.init_cols(bboxes=bboxes)
        bboxes = self.finalize_indexing(bboxes=bboxes, image_name=image_name)

        return bboxes
|
|
|
|
|
def test_augmentations(
    df,
    image_size,
    kinds: list = None,
    row_count=2,
    col_count=4,
    **aug_params,
):
    """Display one source plate next to several augmented versions.

    Args:
        df: boxes dataframe with a ``plate_name`` column.
        image_size: size factor forwarded to get_augmentations.
        kinds: augmentation pipelines to preview (default: resize + train).
        row_count: grid rows; the first cell shows the un-augmented image.
        col_count: grid columns.
        **aug_params: forwarded to get_augmentations (e.g. gamma).
    """
    # Fix: the default was a mutable list literal shared across calls.
    if kinds is None:
        kinds = ["resize", "train"]

    src_dataset = LeafDiskDetectorDataset(
        csv=df,
        transform=get_augmentations(
            image_size=image_size, kinds=["resize"], **aug_params
        ),
    )

    test_dataset = LeafDiskDetectorDataset(
        csv=df,
        transform=get_augmentations(image_size=image_size, kinds=kinds, **aug_params),
    )

    image_name = df.sample(n=1).iloc[0].plate_name

    images = [(src_dataset.draw_image_with_boxes(plate_name=image_name), "Source")] + [
        (test_dataset.draw_image_with_boxes(plate_name=image_name), "Augmented")
        for _ in range(row_count * col_count - 1)
    ]

    make_patches_grid(
        images=images,
        row_count=row_count,
        col_count=col_count,
        figsize=(col_count * 4, row_count * 3),
    )
|
|
|
|
|
def get_file_path_from_row(row, path_to_patches: Path):
    """Build the full path of the image referenced by ``row.file_name``."""
    return path_to_patches / row.file_name
|
|
|
|
|
def get_fast_images(
    row, path_to_patches, percent_radius: float = 1.0, add_process_image: bool = False
):
    """Best-effort extraction of the fast leaf-disc images for one box row.

    Each extraction is attempted independently; failures leave the
    corresponding key out of the returned dict (downstream code checks
    presence). Fix: the bare ``except:`` clauses also caught
    KeyboardInterrupt/SystemExit; they now catch ``Exception`` only.

    Args:
        row: one box record (needs file_name and box geometry columns).
        path_to_patches: folder holding the source plate images.
        percent_radius: radius scale forwarded to the fast helpers.
        add_process_image: also render the annotated process image.

    Returns:
        dict with any of: leaf_disc_box, segmented_leaf_disc,
        leaf_disc_patch, process_image.
    """
    image_path = get_file_path_from_row(row, path_to_patches)
    d = {}
    try:
        d["leaf_disc_box"] = get_leaf_disk_wbb(
            row.file_name, row, image_path=image_path
        )
    except Exception:
        pass
    try:
        d["segmented_leaf_disc"] = get_fast_segment_disk(
            image_name=row.file_name,
            bboxes=row,
            percent_radius=percent_radius,
            image_path=image_path,
        )
    except Exception:
        pass
    try:
        d["leaf_disc_patch"] = get_fast_leaf_disk_patch(
            image_name=row.file_name,
            bboxes=row,
            percent_radius=percent_radius,
            image_path=image_path,
        )
    except Exception:
        pass
    if add_process_image is True:
        try:
            d["process_image"] = draw_fast_bb_to_patch_process(
                image_name=row.file_name,
                bboxes=row,
                percent_radius=percent_radius,
                image_path=image_path,
            )
        except Exception:
            pass

    return d
|
|
|
|
|
def save_images(row: pd.Series, images_data: dict, errors: dict, paths: dict):
    """Write each extracted image to its kind's folder, skipping existing files.

    Args:
        row: box record; file_name, row and col build the output name
            ``<stem>_<row>_<col>.png``.
        images_data: mapping kind -> image array (None marks a failure).
        errors: optional mapping kind -> list collecting file names whose
            image was missing.
        paths: mapping kind -> destination folder; kinds without a path
            are skipped.
    """
    fn = f"{Path(row.file_name).stem}_{row.row}_{int(row.col)}.png"
    for k, image in images_data.items():
        if k not in paths:
            continue
        path_to_image = paths[k].joinpath(fn)
        if image is not None:
            # Never overwrite an existing output file; images are RGB in
            # memory, so convert to BGR for cv2.imwrite.
            if path_to_image.is_file() is False:
                cv2.imwrite(str(path_to_image), cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
        elif errors is not None:
            # Extraction failed upstream; record the plate for reporting.
            errors[k].append(row.file_name)
        else:
            pass
|
|
|
|
|
def handle_bbox(
    row: pd.Series,
    paths: dict,
    errors: dict = None,
    percent_radius: float = 1.0,
    add_process_image: bool = False,
):
    """Extract all fast images for one box record and persist them.

    Args:
        row: one box record.
        paths: output folders per image kind; must contain "plates" for
            the source images.
        errors: optional error accumulator forwarded to save_images.
        percent_radius: radius scale forwarded to the extractors.
        add_process_image: also produce the annotated process image.
    """
    images_data = get_fast_images(
        row=row,
        percent_radius=percent_radius,
        add_process_image=add_process_image,
        path_to_patches=paths["plates"],
    )
    save_images(row=row, images_data=images_data, errors=errors, paths=paths)
|
|