Spaces:
Sleeping
Sleeping
from pathlib import Path | |
import itertools | |
from typing import Any | |
import math | |
import random | |
from collections import namedtuple, defaultdict | |
import warnings | |
from tqdm import tqdm | |
import numpy as np | |
import pandas as pd | |
from scipy.spatial.transform import Rotation as R | |
from sklearn.cluster import MeanShift | |
import skimage.feature as feature | |
from skimage import color | |
from skimage.feature import SIFT, match_descriptors, plot_matches | |
from skimage.filters import ( | |
threshold_otsu, | |
threshold_triangle, | |
threshold_triangle, | |
threshold_isodata, | |
threshold_li, | |
threshold_yen, | |
threshold_mean, | |
threshold_minimum, | |
threshold_triangle, | |
threshold_yen, | |
) | |
import cv2 | |
from PIL import Image | |
import matplotlib.pyplot as plt | |
import scripts.tap_const as tc | |
def _update_axis(
    axis,
    image,
    title=None,
    fontsize=18,
    remove_axis=True,
    title_loc="center",
):
    """Show ``image`` on ``axis`` and optionally set a title / hide the frame.

    Args:
        axis: Object exposing ``imshow``, ``set_title`` and ``set_axis_off``.
        image: Image passed to ``axis.imshow`` with ``origin="upper"``.
        title: Title text; nothing is set when it is ``None``.
        fontsize: Font size forwarded to ``set_title``.
        remove_axis: The axis is switched off only when this is exactly ``True``.
        title_loc: Forwarded to ``set_title`` as ``loc``.
    """
    axis.imshow(image, origin="upper")

    wants_title = title is not None
    if wants_title:
        axis.set_title(title, fontsize=fontsize, loc=title_loc)

    hide_frame = remove_axis is True
    if hide_frame:
        axis.set_axis_off()
# Lightweight record with fields x, y and r.
Circle = namedtuple("Circle", field_names="x,y,r")
# OpenCV font constant kept at module level.
font = cv2.FONT_HERSHEY_SIMPLEX
# Channel names for each supported color space, listed in channel order;
# get_threshold uses the position of a name in its list to select a channel.
color_spaces = {
    "rgb": ["red", "green", "blue"],
    "hsv": ["h", "s", "v"],
    "yiq": ["y", "i", "q"],
    "lab": ["l", "a", "b"],
}
# Method names handled by get_threshold.
# NOTE(review): the identifier is misspelled ("threholds"); it is left as is
# because other modules may import it under this name.
available_threholds = ["otsu", "triangle", "isodata", "li", "mean", "minimum", "yen"]
def bgr_to_rgb(clr: tuple) -> tuple:
    """Reorder a BGR color into RGB.

    Arguments:
        clr {tuple} -- Source color; only the first three items are read

    Returns:
        tuple -- ``(clr[2], clr[1], clr[0])``
    """
    blue, green, red = clr[0], clr[1], clr[2]
    return (red, green, blue)
def rgb_to_bgr(clr: tuple) -> tuple:
    """Converts from RGB to BGR

    The previous implementation returned the components in their original
    order, i.e. it performed no conversion at all.

    Arguments:
        clr {tuple} -- Source color; only the first three items are read

    Returns:
        tuple -- Converted color ``(clr[2], clr[1], clr[0])``
    """
    return (clr[2], clr[1], clr[0])
def load_image(file_name, file_path=None, rgb: bool = True, image_size: int = None):
    """Load an image with OpenCV, optionally converting to RGB and resizing.

    Args:
        file_name: File name or ``Path``. A ``Path`` pointing to an existing
            file is used as is.
        file_path: Folder joined with ``file_name`` otherwise. When ``None``
            (the default) ``file_name`` itself is used as the path instead of
            failing on ``None.joinpath``.
        rgb: Convert the BGR image returned by ``cv2.imread`` to RGB.
        image_size: When given, the image is resized to a square of that side
            using Lanczos interpolation.

    Returns:
        The loaded image, or ``None`` when loading failed (the failure is
        printed, matching the previous best-effort behavior).
    """
    if file_path is None or (isinstance(file_name, Path) and file_name.is_file()):
        path = Path(file_name)
    else:
        path = Path(file_path).joinpath(file_name)

    try:
        image = cv2.imread(str(path))
        if image is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError(f"Unable to read image {path}")
        if rgb is True:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if image_size is not None:
            image = cv2.resize(
                image, dsize=(image_size, image_size), interpolation=cv2.INTER_LANCZOS4
            )
        return image
    except Exception as e:
        # Deliberate best effort: report and let the caller deal with None
        print(file_name)
        print(f"Failed load imge: {str(e)}")
        return None
def to_pil(image, size: tuple = None):
    """Wrap an array in a PIL image, optionally resized with Lanczos resampling.

    Args:
        image: Array accepted by ``Image.fromarray``; ``None`` is passed through.
        size: Target size forwarded to ``Image.resize``; no resize when ``None``.

    Returns:
        The PIL image, or ``None`` when ``image`` is ``None``.
    """
    if image is None:
        return None
    pil_image = Image.fromarray(image)
    return (
        pil_image
        if size is None
        else pil_image.resize(size=size, resample=Image.Resampling.LANCZOS)
    )
def to_cv(image, size: tuple = None):
    """Convert an image to a numpy array, optionally resizing it first.

    Args:
        image: Object accepted by ``np.array``; it must expose ``resize`` when
            ``size`` is given.
        size: Target size forwarded to ``image.resize`` with Lanczos resampling.

    Returns:
        ``np.array`` of the (possibly resized) image.
    """
    source = (
        image
        if size is None
        else image.resize(size=size, resample=Image.Resampling.LANCZOS)
    )
    return np.array(source)
def rgb2X(rgb_color, target_color_space):
    """Convert an RGB value/array to another color space using skimage.

    Args:
        rgb_color: RGB data; converted to ``np.ndarray`` when it is not one.
        target_color_space: ``"hsv"``, ``"yiq"`` or ``"lab"``; any other value
            returns the (array) input unchanged.

    Returns:
        The converted array.
    """
    pixels = rgb_color if isinstance(rgb_color, np.ndarray) else np.array(rgb_color)
    if target_color_space == "hsv":
        return color.rgb2hsv(pixels)
    if target_color_space == "yiq":
        return color.rgb2yiq(pixels)
    if target_color_space == "lab":
        return color.rgb2lab(pixels)
    return pixels
def get_channels(image, color_space):
    """Split an image into the channels of the requested color space.

    NOTE(review): the image is described as RGB, yet the "hsv" and "lab"
    branches use the ``cv2.COLOR_BGR2*`` conversion codes -- confirm which
    channel order callers really provide before changing anything.

    Args:
        image (np.ndarray): Source image
        color_space (str): one of "rgb", "hsv", "yiq", "lab"

    Raises:
        NotImplementedError: Unknown color space

    Returns:
        Channels as returned by ``cv2.split``; for "yiq" a list of channels
        min-max rescaled to 0-255 and cast to uint8.
    """
    if color_space == "rgb":
        return cv2.split(image)
    if color_space == "hsv":
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        return cv2.split(hsv)
    if color_space == "yiq":
        yiq = to_cv(color.rgb2yiq(to_pil(image)))
        rescaled = []
        for c in cv2.split(yiq):
            low = np.min(c)
            high = np.max(c)
            rescaled.append(((c - low) / (high - low) * 255).astype(np.uint8))
        return rescaled
    if color_space == "lab":
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        return cv2.split(lab)
    raise NotImplementedError(f"Unknowncolorspace {color_space}")
def get_channel(image, color_space, channel):
    """Return a single channel of ``image`` in ``color_space``.

    The channel name alone decides the index (first, second or third
    channel); it is not checked against ``color_space``.

    Raises:
        NotImplementedError: ``channel`` is not a known channel name.
    """
    channels = get_channels(image=image, color_space=color_space)
    names_by_index = (
        ["red", "h", "y", "l"],
        ["green", "s", "i", "a"],
        ["blue", "v", "q", "b"],
    )
    for index, names in enumerate(names_by_index):
        if channel in names:
            return channels[index]
    raise NotImplementedError(
        f"Unknown combination color space {color_space}, channel {channel}"
    )
def multi_and(image_list: tuple):
    """Bitwise-AND every non-``None`` image of ``image_list`` together.

    :param image_list: images to combine; ``None`` entries are ignored
    :return: ``None`` when nothing is left, the image itself when only one is
        left, otherwise the AND of all of them
    """
    images = [img for img in image_list if img is not None]
    if not images:
        return None
    combined = images[0]
    for img in images[1:]:
        combined = cv2.bitwise_and(combined, img)
    return combined
def multi_or(image_list: tuple):
    """Bitwise-OR every non-``None`` image of ``image_list`` together.

    :param image_list: images to combine; ``None`` entries are ignored
    :return: ``None`` when nothing is left, the image itself when only one is
        left, otherwise the OR of all of them
    """
    images = [img for img in image_list if img is not None]
    if not images:
        return None
    combined = images[0]
    for img in images[1:]:
        combined = cv2.bitwise_or(combined, img)
    return combined
def get_threshold(image, colorspace, channel, method, is_over: bool):
    """Compute an automatic threshold on one channel and apply it.

    Args:
        image: Source image handed to ``get_channels``.
        colorspace: Key of ``color_spaces``.
        channel: Channel name listed under ``color_spaces[colorspace]``.
        method: "otsu", "triangle", "isodata", "li", "mean", "minimum" or "yen".
        is_over: When exactly ``True`` keep pixels above the threshold,
            otherwise pixels below it.

    Raises:
        NotImplementedError: Unknown ``method``.

    Returns:
        tuple: ``(threshold, boolean array)``
    """
    planes = get_channels(image=image, color_space=colorspace)
    plane = planes[color_spaces[colorspace].index(channel)]

    finders = {
        "otsu": threshold_otsu,
        "triangle": threshold_triangle,
        "isodata": threshold_isodata,
        "li": threshold_li,
        "mean": threshold_mean,
        "minimum": threshold_minimum,
        "yen": threshold_yen,
    }
    if method not in finders:
        raise NotImplementedError(f"Unknown method {method}")
    threshold = finders[method](plane)

    if is_over is True:
        selected = plane > threshold
    else:
        selected = plane < threshold
    return threshold, selected
class Rectangle:
    """Axis-aligned rectangle stored as integer top-left corner and size.

    Derived values (``right``, ``bottom``, ``cx``, ``x1`` ..., ``area``) are
    read-only properties: the rest of the class and its callers read them as
    attributes (``self.x1``, ``b.right``, ``bbox.start_row``), which only
    works with ``@property``. ``from_row`` / ``from_bbox`` are alternate
    constructors called on the class, hence ``@classmethod``.
    """

    def __init__(self, x, y, w, h, score=None, user="dummy") -> None:
        self.x = int(x)
        self.y = int(y)
        self.w = int(w)
        self.h = int(h)
        self.score = score
        self.user = user

    def __repr__(self) -> str:
        return f"[{self.x},{self.y}:{self.w},{self.h}]"

    def __str__(self) -> str:
        return f"[{self.x},{self.y}:{self.w},{self.h}]"

    def draw(self, image, color=tc.C_WHITE, thickness=2):
        """Draw the rectangle outline on ``image`` with ``cv2.rectangle``."""
        return cv2.rectangle(
            image,
            pt1=(self.x1, self.y1),
            pt2=(self.x2, self.y2),
            color=color,
            thickness=thickness,
        )

    def draw_as_bbox(self, image, color=tc.C_WHITE, thickness=2):
        """Draw the outline plus a centre dot using the reversed color tuple."""
        return cv2.circle(
            self.draw(image=image, color=color, thickness=thickness),
            center=(self.cx, self.cy),
            radius=5,
            color=tuple(reversed(color)),
            thickness=thickness,
        )

    def to_dict(self, extended: bool = False):
        """Return the stored attributes, plus derived values when ``extended``.

        A copy is returned so callers cannot alter the instance through it.
        """
        out = dict(self.__dict__)
        if extended is False:
            return out
        return out | {
            "cx": self.cx,
            "cy": self.cy,
            "x1": self.x1,
            "x2": self.x2,
            "y1": self.y1,
            "y2": self.y2,
        }

    def assign(self, src):
        """Copy position, size, user and score from ``src``."""
        self.x = src.x
        self.y = src.y
        self.w = src.w
        self.h = src.h
        self.user = src.user
        self.score = src.score

    def union(self, b):
        """Smallest rectangle containing both ``self`` and ``b`` (keeps ``self.user``)."""
        posX = min(self.x, b.x)
        posY = min(self.y, b.y)
        return Rectangle(
            x=posX,
            y=posY,
            w=max(self.right, b.right) - posX,
            h=max(self.bottom, b.bottom) - posY,
            user=self.user,
        )

    def intersection(self, b):
        """Overlap of ``self`` and ``b``; an all-zero rectangle when disjoint."""
        posX = max(self.x, b.x)
        posY = max(self.y, b.y)
        candidate = Rectangle(
            x=posX,
            y=posY,
            w=min(self.right, b.right) - posX,
            h=min(self.bottom, b.bottom) - posY,
        )
        if candidate.w > 0 and candidate.h > 0:
            return candidate
        return Rectangle(0, 0, 0, 0)

    def ratio(self, b):
        """Intersection area divided by union area; 0.0 when the union is empty."""
        union_area = self.union(b).area
        if union_area == 0:
            return 0.0
        return self.intersection(b).area / union_area

    def contains(self, b, ratio):
        """True when ``self.ratio(b)`` is strictly above ``ratio``."""
        return self.ratio(b) > ratio

    def to_bbox(self):
        """Return ``[x, y, right, bottom]``.

        NOTE(review): ``from_bbox`` unpacks ``x, y, w, h``, so the two are not
        inverse operations -- confirm which convention callers expect.
        """
        return [self.x, self.y, self.right, self.bottom]

    @classmethod
    def from_row(cls, row):
        """Build a rectangle from an object exposing ``x``, ``y``, ``w``, ``h``."""
        return cls(x=row.x, y=row.y, w=row.w, h=row.h)

    @classmethod
    def from_bbox(cls, bbox):
        """Build a rectangle from a 4-item ``(x, y, w, h)`` sequence."""
        x, y, w, h = bbox
        return cls(x=x, y=y, w=w, h=h)

    @property
    def bottom(self):
        return self.y + self.h

    @property
    def right(self):
        return self.x + self.w

    @property
    def start_row(self):
        return self.y

    @property
    def end_row(self):
        return self.y + self.h

    @property
    def start_col(self):
        return self.x

    @property
    def end_col(self):
        return self.x + self.w

    @property
    def cx(self):
        return self.x + self.w // 2

    @property
    def cy(self):
        return self.y + self.h // 2

    @property
    def x1(self):
        return self.x

    @property
    def x2(self):
        return self.x + self.w

    @property
    def y1(self):
        return self.y

    @property
    def y2(self):
        return self.y + self.h

    @property
    def area(self):
        return self.w * self.h
def get_brightness(image, bbox: Rectangle = None):
    """Mean perceived brightness of ``image`` or of the ``bbox`` region.

    The three channels returned by ``cv2.split`` are read as r, g, b and
    combined as ``sqrt(0.241 r^2 + 0.691 g^2 + 0.068 b^2)``; the mean of that
    map is returned.

    Args:
        image: Three-channel image.
        bbox: Optional region exposing ``start_row``, ``end_row``,
            ``start_col`` and ``end_col``; the whole image when ``None``.
    """
    region = (
        image
        if bbox is None
        else image[bbox.start_row : bbox.end_row, bbox.start_col : bbox.end_col]
    )
    r, g, b = cv2.split(region)
    weighted_squares = (
        0.241 * np.power(r.astype(float), 2)
        + 0.691 * np.power(g.astype(float), 2)
        + 0.068 * np.power(b.astype(float), 2)
    )
    return np.sqrt(weighted_squares).mean()
def get_sharpness(image):
    """Return the variance of the Laplacian of the grayscale image.

    The image is converted with ``cv2.COLOR_BGR2GRAY`` and filtered with
    ``cv2.Laplacian`` at ``cv2.CV_64F`` depth.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    return laplacian.var()
def masked_image(image, mask, background_alpha: float = 0.8):
    """Show ``image`` inside ``mask`` over a dimmed grayscale background.

    Args:
        image: Image converted to gray with ``cv2.COLOR_RGB2GRAY`` for the
            background.
        mask: Mask selecting the pixels kept in color; ``255 - mask`` selects
            the background.
        background_alpha: Factor applied to the gray background (values above
            255 are clipped).
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) * background_alpha
    gray[gray > 255] = 255
    backdrop = cv2.merge([gray, gray, gray]).astype(np.uint8)
    outside = cv2.bitwise_and(backdrop, backdrop, mask=255 - mask)
    inside = cv2.bitwise_and(image, image, mask=mask)
    return cv2.bitwise_or(outside, inside)
def remove_empty_boxes(boxes):
    """Drop the rows of ``boxes`` missing any of the x, y, w, h values."""
    geometry_columns = ["x", "y", "w", "h"]
    return boxes.dropna(subset=geometry_columns)
def draw_boxes(image, boxes, thickness=2):
    """Draw every complete box of ``boxes`` on ``image`` in fuchsia.

    Rows lacking x, y, w or h are ignored. All rectangles are built before
    the first one is drawn.
    """
    valid_boxes = remove_empty_boxes(boxes=boxes)
    if len(valid_boxes) == 0:
        return image
    rectangles = [Rectangle.from_row(row) for _, row in valid_boxes.iterrows()]
    for rectangle in rectangles:
        image = rectangle.draw(image=image, color=tc.C_FUCHSIA, thickness=thickness)
    return image
def rotate_image(image, angle):
    """Rotate ``image`` by ``angle`` around its centre, keeping its size.

    The rotation matrix comes from ``cv2.getRotationMatrix2D`` with scale 1.0
    and is applied with ``cv2.warpAffine``.
    """
    height, width = image.shape[:2]
    centre = (width // 2, height // 2)
    matrix = cv2.getRotationMatrix2D(centre, angle, 1.0)
    return cv2.warpAffine(image, matrix, (width, height))
def fix_brightness(
    image,
    brightness_target=70,
    bbox: Rectangle = Rectangle(10, 10, 100, 100),
    pass_through: bool = False,
):
    """Move the perceived brightness of ``image`` towards ``brightness_target``.

    Args:
        image: Source image.
        brightness_target: Brightness aimed for.
        bbox: Region used by ``get_brightness`` to measure the brightness.
            The default instance is shared between calls; it is only read here.
        pass_through: When ``True`` the image is returned untouched. This is
            now checked before measuring, so no work is done in that case.

    Returns:
        The corrected image: a gamma look-up table is applied when the image
        is brighter than the target, a linear gain/offset otherwise.
    """
    if pass_through is True:
        return image

    avg_bright = get_brightness(image=image, bbox=bbox)
    if avg_bright > brightness_target:
        gamma = brightness_target / avg_bright
        if gamma == 1:
            return image
        inv_gamma = 1.0 / gamma
        table = np.array(
            [((i / 255.0) ** inv_gamma) * 255 for i in np.arange(0, 256)]
        ).astype("uint8")
        return cv2.LUT(src=image, lut=table)

    # A fully black measuring region has no defined gain (division by zero);
    # fall back to the offset alone in that case.
    if avg_bright > 0:
        alpha = (brightness_target + avg_bright) / (2 * avg_bright)
    else:
        alpha = 1.0
    return cv2.convertScaleAbs(
        src=image,
        alpha=alpha,
        beta=(brightness_target - avg_bright) / 2,
    )
def update_data(data, new_data):
    """Append each value of ``new_data`` to the list stored under its key.

    Missing keys are created with an empty list first. ``data`` is modified
    in place and also returned.
    """
    for key, value in new_data.items():
        data.setdefault(key, []).append(value)
    return data
def add_perceived_bightness_data(image, data):
    """Append perceived-brightness statistics of ``image`` to ``data``.

    The channels from ``cv2.split`` are read as b, g, r and combined as
    ``sqrt(0.241 r^2 + 0.691 g^2 + 0.068 b^2)``.
    NOTE(review): ``get_brightness`` in this module reads the same split as
    r, g, b -- confirm which channel order the callers provide.

    Returns:
        ``data`` with the mean, std, min and max appended under the
        ``cl_bright_*`` keys.
    """
    b, g, r = cv2.split(image)
    weighted_squares = (
        0.241 * np.power(r.astype(float), 2)
        + 0.691 * np.power(g.astype(float), 2)
        + 0.068 * np.power(b.astype(float), 2)
    )
    brightness = np.sqrt(weighted_squares)
    stats = {
        "cl_bright_mean": brightness.mean(),
        "cl_bright_std": np.std(brightness),
        "cl_bright_min": brightness.min(),
        "cl_bright_max": brightness.max(),
    }
    return update_data(data=data, new_data=stats)
def add_hsv_data(image, data):
    """Append hue and saturation statistics of ``image`` to ``data``.

    The image is converted with ``cv2.COLOR_BGR2HSV``; mean, std, min and max
    of the first two channels are stored under ``cl_hue_*`` and ``cl_sat_*``.
    """
    hue, saturation, *_ = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2HSV))
    stats = {}
    for plane, (k_mean, k_std, k_min, k_max) in (
        (hue, ("cl_hue_mean", "cl_hue_std", "cl_hue_min", "cl_hue_max")),
        (saturation, ("cl_sat_mean", "cl_sat_std", "cl_sat_min", "cl_sat_max")),
    ):
        stats[k_mean] = plane.mean()
        stats[k_std] = np.std(plane)
        stats[k_min] = plane.min()
        stats[k_max] = plane.max()
    return update_data(data=data, new_data=stats)
def add_lab_data(image, data):
    """Append statistics of the a and b channels of ``image`` to ``data``.

    The image is converted with ``cv2.COLOR_BGR2LAB``; mean, std, min and max
    of the second and third channels are stored under ``cl_a_*`` and ``cl_b_*``.
    """
    _, chan_a, chan_b = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2LAB))
    stats = {}
    for plane, (k_mean, k_std, k_min, k_max) in (
        (chan_a, ("cl_a_mean", "cl_a_std", "cl_a_min", "cl_a_max")),
        (chan_b, ("cl_b_mean", "cl_b_std", "cl_b_min", "cl_b_max")),
    ):
        stats[k_mean] = plane.mean()
        stats[k_std] = np.std(plane)
        stats[k_min] = plane.min()
        stats[k_max] = plane.max()
    return update_data(data=data, new_data=stats)
def add_yiq_data(image, data):
    """Append statistics of the i and q channels of ``image`` to ``data``.

    Channels come from ``get_channels(..., color_space="yiq")``; mean, std,
    min and max of the second and third ones are stored under ``cl_i_*`` and
    ``cl_q_*``.
    """
    _, chan_i, chan_q = get_channels(image=image, color_space="yiq")
    stats = {}
    for plane, (k_mean, k_std, k_min, k_max) in (
        (chan_i, ("cl_i_mean", "cl_i_std", "cl_i_min", "cl_i_max")),
        (chan_q, ("cl_q_mean", "cl_q_std", "cl_q_min", "cl_q_max")),
    ):
        stats[k_mean] = plane.mean()
        stats[k_std] = np.std(plane)
        stats[k_min] = plane.min()
        stats[k_max] = plane.max()
    return update_data(data=data, new_data=stats)
def add_grey_comatrix_data(image, data, distances, angles) -> dict:
    """Append gray-level co-occurrence matrix properties to ``data``.

    Args:
        image: Image converted to gray with ``cv2.COLOR_BGR2GRAY``.
        data: Accumulator updated through ``update_data``.
        distances: Entries indexed as ``[label, index, value]``; the values
            are the distances given to ``graycomatrix``.
        angles: Entries indexed the same way, for the angles.

    Returns:
        ``data`` with one ``gp_<property>_<distance label>_<angle label>``
        entry per property / distance / angle combination.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    glcm = feature.graycomatrix(
        gray,
        [entry[2] for entry in distances],
        [entry[2] for entry in angles],
        levels=256,
    )
    # output key fragment -> property name understood by graycoprops
    properties = (
        ("contrast", "contrast"),
        ("dissimilarity", "dissimilarity"),
        ("homogeneity", "homogeneity"),
        ("energy", "energy"),
        ("correlation", "correlation"),
        ("asm", "ASM"),
    )
    new_data = {}
    for key, prop_name in properties:
        values = np.array(feature.graycoprops(glcm, prop_name))
        for dist_entry in distances:
            for angle_entry in angles:
                new_data[f"gp_{key}_{dist_entry[0]}_{angle_entry[0]}"] = values[
                    dist_entry[1]
                ][angle_entry[1]]
    return update_data(data=data, new_data=new_data)
def get_sharpness(image):
    """Return the variance of the Laplacian of the grayscale image.

    NOTE: this module defines ``get_sharpness`` twice with the same body; as
    the later definition, this is the one bound to the name after import.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    return laplacian.var()
def add_image_data(
    df,
    file_path_column,
    path_to_images=None,
    distances=None,
    angles=None,
    image_size: int = 0,
):
    """Compute per-image color, texture and sharpness features and merge them into ``df``.

    Args:
        df: DataFrame holding one image reference per row.
        file_path_column: Column of ``df`` with the image file names; also the
            merge key.
        path_to_images: Folder forwarded to ``load_image``.
        distances: ``[label, index, value]`` entries for the co-occurrence
            matrix; defaults to ``[["d1", 0, 1], ["d10", 1, 10]]``.
        angles: Same layout for the angles; defaults to
            ``[["a0", 0, 0], ["api4", 1, np.pi / 4]]``.
        image_size: When above 0 every image is resized to that square size.

    Returns:
        ``df`` left-merged with the computed features. Images that could not
        be loaded are skipped with a warning (previously a crash), so their
        rows carry NaN features.
    """
    if distances is None:
        distances = [["d1", 0, 1], ["d10", 1, 10]]
    if angles is None:
        angles = [["a0", 0, 0], ["api4", 1, np.pi / 4]]

    patch_data = defaultdict(list)
    # Make sure the merge key exists even when no image could be processed
    patch_data[file_path_column] = []
    for file_path in tqdm(df[file_path_column], desc="Embedding image data"):
        image = load_image(file_name=file_path, file_path=path_to_images)
        if image is None:
            warnings.warn(f"Skipping unreadable image: {file_path}")
            continue
        if image_size > 0:
            image = cv2.resize(image, dsize=(image_size, image_size))
        patch_data[file_path_column].append(file_path)
        patch_data = add_perceived_bightness_data(image=image, data=patch_data)
        patch_data = add_hsv_data(image=image, data=patch_data)
        patch_data = add_lab_data(image=image, data=patch_data)
        patch_data = add_yiq_data(image=image, data=patch_data)
        patch_data = add_grey_comatrix_data(
            image=image,
            data=patch_data,
            distances=distances,
            angles=angles,
        )
        patch_data["sharpness"].append(get_sharpness(image))
    return pd.merge(
        left=df, right=pd.DataFrame(data=patch_data), on=file_path_column, how="left"
    )
def filter_contours(mask, min_contour_size):
    """Keep only the parts of ``mask`` covered by sufficiently large contours.

    Contours are found with ``cv2.RETR_LIST``; those whose area is strictly
    above ``min_contour_size`` (and whose point array has a non-zero entry)
    are filled on an empty canvas which is then AND-ed with ``mask``.

    A second ``return`` statement that followed the first one could never
    execute and has been removed.
    """
    # [-2] is the contour list in both the 2-value and 3-value return layouts
    contours = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_TC89_L1)[-2]
    kept = [c for c in contours if cv2.contourArea(c) > min_contour_size and c.any()]
    canvas = cv2.drawContours(np.zeros_like(mask), kept, -1, 255, -1)
    return cv2.bitwise_and(canvas, mask)
def get_raw_mask(
    image,
    thresholds: list,
    min_contour_size: int = 0,
    bitwise_op: str = "and",
    delete_over: int = -1,
):
    """Build a raw mask by combining the masks of several thresholds.

    Args:
        image (np.array): Source image
        thresholds (list): objects exposing ``threshold(image)``; their masks
            are combined in order
        min_contour_size (int): when above 0 the mask goes through
            ``filter_contours`` with this size
        bitwise_op (str): "and" or "or", used to combine the masks
        delete_over (int): when above 0 only the part of the mask inside a
            centred disc of this radius is kept

    Raises:
        NotImplementedError: ``bitwise_op`` is unknown and more than one
            threshold is given.
    """
    mask = thresholds[0].threshold(image)
    for extra in thresholds[1:]:
        if bitwise_op == "and":
            combine = cv2.bitwise_and
        elif bitwise_op == "or":
            combine = cv2.bitwise_or
        else:
            raise NotImplementedError
        mask = combine(mask, extra.threshold(image))

    if min_contour_size > 0:
        mask = filter_contours(mask, min_contour_size=min_contour_size)

    if delete_over > 0:
        h, w = mask.shape
        disc = cv2.circle(
            np.zeros_like(mask),
            center=(w // 2, h // 2),
            color=255,
            radius=delete_over,
            thickness=-1,
        )
        mask = cv2.bitwise_and(mask, disc)

    return mask
def build_distance_map(
    mask, threshold: float = 0.8, demo: bool = False, label: int = 1
):
    """Binarize the distance transform of ``mask`` at a fraction of its maximum.

    Args:
        mask: Mask given to ``cv2.distanceTransform``.
        threshold: Fraction of the largest (uint8-cast) distance; pixels at or
            above it become 255, the others 0.
        demo: When ``True`` return a 3-channel image with the map in the
            second channel (``label == 1``) or the first one (``label == 0``).
        label: Channel selector used only in demo mode.

    Raises:
        NotImplementedError: demo mode with a label other than 0 or 1.

    Changes: two unused intermediate images were dropped (one divided by the
    maximum distance, i.e. by zero for an empty mask), and an empty mask now
    yields an empty map instead of an all-255 one.
    """
    dist_transform = cv2.distanceTransform(
        mask, cv2.DIST_L2, cv2.DIST_MASK_PRECISE
    ).astype(np.uint8)
    peak = np.max(dist_transform)
    if peak == 0:
        # No foreground at all: every pixel would otherwise satisfy ">= 0"
        ret = np.zeros_like(dist_transform)
    else:
        ret = np.where(dist_transform >= peak * threshold, 255, 0).astype(np.uint8)

    if demo is not True:
        return ret
    if label == 1:
        return cv2.merge([np.zeros_like(mask), ret, np.zeros_like(mask)])
    if label == 0:
        return cv2.merge([ret, np.zeros_like(mask), np.zeros_like(mask)])
    raise NotImplementedError
def get_contours(mask, retrieve_mode, method):
    """Return the contours of ``mask`` whose oriented area is negative.

    Contours are searched on a copy of ``mask`` with the given OpenCV
    retrieval mode and approximation method.
    """
    # [-2] is the contour list in both the 2-value and 3-value return layouts
    found = cv2.findContours(mask.copy(), retrieve_mode, method)[-2]
    selected = []
    for contour in found:
        if cv2.contourArea(contour, True) < 0:
            selected.append(contour)
    return selected
def get_seeds(
    mask, modes: dict = None, label: int = 1, max_seeds: int = 0
) -> dict:
    """Pick seed points inside ``mask``.

    Args:
        mask: Mask whose non-zero pixels are seed candidates.
        modes: Sampling modes; defaults to ``{"rnd": 10}``.
            ``"rnd": k`` samples one candidate out of ``k`` uniformly.
            ``"dist": k`` draws candidates weighted by the equalized distance
            transform.
        label: Label attached to every returned point.
        max_seeds: Upper bound on the number of seeds; 0 (the default) means
            no limit. Previously the default 0 sampled the result down to
            zero points, so the function always returned nothing.

    Returns:
        dict with ``input_points`` and ``input_labels`` arrays.
    """
    if modes is None:
        modes = {"rnd": 10}

    # explicit dtype avoids the pandas warning about dtype-less empty Series
    input_points = pd.Series(dtype=object)
    if "rnd" in modes:
        nz = np.count_nonzero(mask)
        if nz > 0:
            input_points = pd.concat(
                [
                    input_points,
                    pd.Series([list(p[0]) for p in cv2.findNonZero(mask)]).sample(
                        n=nz // modes["rnd"]
                    ),
                ]
            )
    if "dist" in modes:
        dt = cv2.equalizeHist(
            cv2.distanceTransform(mask, cv2.DIST_L2, cv2.DIST_MASK_PRECISE).astype(
                np.uint8
            )
        )
        nz = np.count_nonzero(dt)
        if nz > 0:
            candidates = cv2.findNonZero(dt)
            # NOTE(review): candidates.size counts coordinates, not points
            # (twice the point count) -- confirm this is the intended k.
            input_points = pd.concat(
                [
                    input_points,
                    pd.Series(
                        [
                            p[0]
                            for p in random.choices(
                                population=candidates,
                                weights=list(dt[np.nonzero(dt)] / 255),
                                k=candidates.size // int(modes["dist"]),
                            )
                        ]
                    ),
                ]
            )
    if 0 < max_seeds < len(input_points):
        input_points = input_points.sample(n=max_seeds)
    input_points = np.array(input_points.to_list())
    return {
        "input_points": input_points,
        "input_labels": np.array([label for _ in range(len(input_points))]),
    }
def get_grid(mask, dots_per_rc):
    """Build a dotted grid with the shape of ``mask``.

    Dots (value 1) are placed every ``step`` pixels on rows and columns, plus
    a second set shifted by half a step, where ``step`` is the largest image
    dimension divided by ``dots_per_rc``.

    The step is clamped to at least 1: asking for more dots than pixels used
    to produce a zero slice step and a ``ValueError``.
    """
    grid = np.zeros_like(mask)
    step = max(1, int(np.max(grid.shape[:2])) // dots_per_rc)
    grid[::step, ::step] = 1
    grid[step // 2 :: step, step // 2 :: step] = 1
    return grid.astype(np.uint8)
def get_grid_seeds(
    mask_keep, mask_discard, keep_dots_per_row: int, discard_dots_per_row: int
):
    """Build labelled seeds from regular grids laid over two masks.

    Points of the grid falling inside ``mask_keep`` get label 1, those inside
    ``mask_discard`` get label 0; both sets are merged with ``merge_seeds``.

    ``cv2.findNonZero`` returns ``None`` when no grid dot falls inside a mask;
    that case now yields an empty point set instead of a ``TypeError``.
    """

    def _grid_points(mask, dots_per_row):
        # Coordinates of the grid dots located inside ``mask``
        hits = cv2.findNonZero(
            cv2.bitwise_and(mask, get_grid(mask=mask, dots_per_rc=dots_per_row))
        )
        if hits is None:
            return np.array([])
        return np.array([p[0] for p in hits])

    keep_points = _grid_points(mask_keep, keep_dots_per_row)
    discard_points = _grid_points(mask_discard, discard_dots_per_row)
    return merge_seeds(
        [
            {
                "input_points": keep_points,
                "input_labels": np.array([1 for _ in range(len(keep_points))]),
            },
            {
                "input_points": discard_points,
                "input_labels": np.array([0 for _ in range(len(discard_points))]),
            },
        ]
    )
def merge_seeds(seeds_list: list):
    """Concatenate the points and labels of several seed dictionaries.

    Empty ``input_points`` / ``input_labels`` arrays are left out. If the
    concatenation fails for any reason (for instance nothing is left to
    concatenate) a pair of empty arrays is returned instead.
    """
    try:
        point_sets = []
        label_sets = []
        for seeds in seeds_list:
            if seeds["input_points"].size > 0:
                point_sets.append(seeds["input_points"])
        for seeds in seeds_list:
            if seeds["input_labels"].size > 0:
                label_sets.append(seeds["input_labels"])
        merged_points = np.concatenate(point_sets)
        merged_labels = np.concatenate(label_sets)
        return {"input_points": merged_points, "input_labels": merged_labels}
    except Exception:
        return {
            "input_points": np.array([]),
            "input_labels": np.array([]),
        }
def enlarge_contours(mask, increase_by):
    """Dilate ``mask`` once with an elliptical kernel of size ``increase_by``.

    The work is delegated to ``Morphologer``; as implemented in this module
    it leaves the mask untouched when the kernel size is 2 or less.

    Returns:
        The processed mask. The former ``-> list`` annotation was incorrect
        and has been dropped.
    """
    dilate = Morphologer(
        op="dilate", kernel_size=increase_by, proc_count=1, kernel=cv2.MORPH_ELLIPSE
    )
    return dilate(mask)
class Morphologer:
    """Configurable morphology operation applied to masks.

    Instances are callable: ``morphologer(mask)`` is ``process(mask)``.
    """

    allowed_ops = ["none", "erode", "dilate", "open", "close"]

    def __init__(
        self, op=None, kernel_size=3, proc_count=1, kernel=cv2.MORPH_RECT
    ) -> None:
        self.op = op
        self.kernel_size = kernel_size
        self.proc_count = proc_count
        self.kernel = kernel

    def __call__(self, mask) -> Any:
        return self.process(mask=mask)

    def __repr__(self) -> str:
        return f"{self.op}|{self.kernel_size}|{self.proc_count}|{self.kernel}"

    def to_json(self):
        """Return the settings as a dict (a copy, so edits do not leak back)."""
        return dict(self.__dict__)

    def from_json(self, d: dict):
        """Set every key of ``d`` as an attribute of this instance."""
        for k, v in d.items():
            setattr(self, k, v)

    @classmethod
    def create_from_json(cls, d: dict):
        """Build a new instance from a settings dict.

        Declared as a classmethod: it takes ``cls`` and instantiates it, which
        did not work as a plain instance method.
        """
        out = cls()
        out.from_json(d)
        return out

    def update(self, op, kernel_size, proc_count, kernel):
        """Replace all four settings."""
        self.op = op
        self.kernel_size = kernel_size
        self.proc_count = proc_count
        self.kernel = kernel

    def process(self, mask):
        """Apply the configured operation ``proc_count`` times.

        Kernel sizes of 2 or less, and the "none" operation, return the mask
        unchanged.

        Raises:
            NotImplementedError: unknown operation with a kernel size above 2.
        """
        if self.kernel_size <= 2 or self.op == "none":
            return mask

        ops = {
            "erode": cv2.MORPH_ERODE,
            "dilate": cv2.MORPH_DILATE,
            "open": cv2.MORPH_OPEN,
            "close": cv2.MORPH_CLOSE,
        }
        if self.op not in ops:
            raise NotImplementedError
        op = ops[self.op]

        # The structuring element does not change between passes: build it once
        element = cv2.getStructuringElement(
            shape=self.kernel, ksize=(self.kernel_size, self.kernel_size)
        )
        for _ in range(self.proc_count):
            mask = cv2.morphologyEx(mask, op=op, kernel=element)
        return mask
class Thresholder:
    """Range threshold on one channel of one color space.

    Instances are callable: ``thresholder(image)`` is ``threshold(image)``.
    """

    def __init__(
        self,
        color_space: str = "hsv",
        channel: str = "h",
        min=0,
        max=255,
    ) -> None:
        self.color_space = color_space
        self.channel = channel
        self.min = min
        self.max = max

    def __repr__(self) -> str:
        return f"{self.color_space}|{self.channel}|{self.min}|{self.max}"

    def __str__(self) -> str:
        return f"{self.color_space}, {self.channel}: {self.min}->{self.max}"

    def update(self, min_max):
        """Set ``min`` and ``max`` from the first two items of ``min_max``."""
        self.min = min_max[0]
        self.max = min_max[1]

    def to_json(self):
        """Return the settings as a dict (a copy, so edits do not leak back)."""
        return dict(self.__dict__)

    def from_json(self, d: dict):
        """Set every key of ``d`` as an attribute of this instance."""
        for k, v in d.items():
            setattr(self, k, v)

    @classmethod
    def create_from_json(cls, d: dict):
        """Build a new instance from a settings dict.

        Declared as a classmethod: it takes ``cls`` and instantiates it, which
        did not work as a plain instance method.
        """
        out = cls()
        out.from_json(d)
        return out

    def threshold(self, image):
        """Return the ``cv2.inRange`` mask of the configured channel."""
        channel = get_channel(
            image=image, color_space=self.color_space, channel=self.channel
        )
        return cv2.inRange(channel, self.min, self.max)

    def __call__(self, image) -> Any:
        return self.threshold(image=image)

    def display_name(self) -> str:
        return f"{self.color_space}: {self.channel}"

    def as_tuple(self) -> tuple:
        return self.min, self.max
def check_hue(
    image, mask, cnt, ok_hue_thresholder: Thresholder, ok_hue_pxl_ratio: float = 0.2
):
    """Tell whether enough pixels of a contour have an acceptable hue.

    The image is masked with the filled contour, cropped to the contour's
    bounding box and converted with ``cv2.COLOR_RGB2HSV``. The number of hue
    values strictly between ``ok_hue_thresholder.min`` and ``.max`` is
    compared with ``ok_hue_pxl_ratio`` times the number of non-zero hue
    values.

    Returns:
        True when the in-range count is strictly larger.
    """
    x, y, w, h = [int(i) for i in cv2.boundingRect(cnt)]
    contour_mask = cv2.drawContours(np.zeros_like(mask), [cnt], -1, 255, -1)
    isolated = cv2.bitwise_and(image, image, mask=contour_mask)
    crop_hsv = cv2.cvtColor(isolated[y : y + h, x : x + w], cv2.COLOR_RGB2HSV)
    box_hue = cv2.split(crop_hsv)[0]
    in_range = (box_hue > ok_hue_thresholder.min) & (box_hue < ok_hue_thresholder.max)
    required = np.count_nonzero(box_hue) * ok_hue_pxl_ratio
    return box_hue[in_range].size > required
def get_distance_data(hull, origin, max_dist):
    """
    Measure how far the barycenter of a contour is from ``origin`` and
    return it together with area data.

    :param hull: contour handed to ``cv2.moments`` / ``cv2.contourArea``
    :param origin: object exposing ``x`` and ``y``
    :param max_dist: distance used to scale ``dist`` into ``dist_scaled_inverted``
    :return: dict with keys dist, cx, cy, dist_scaled_inverted, area and
        scaled_area; every value is 0 when the contour's ``m00`` moment is 0
    """
    m = cv2.moments(hull)
    if m["m00"] == 0:
        return {
            "dist": 0,
            "cx": 0,
            "cy": 0,
            "dist_scaled_inverted": 0,
            "area": 0,
            "scaled_area": 0,
        }

    center_x = int(m["m10"] / m["m00"])
    center_y = int(m["m01"] / m["m00"])
    distance = math.sqrt(
        math.pow(center_x - origin.x, 2) + math.pow(center_y - origin.y, 2)
    )
    closeness = 1 - distance / max_dist
    area = cv2.contourArea(hull)
    return {
        "dist": distance,
        "cx": center_x,
        "cy": center_y,
        "dist_scaled_inverted": closeness,
        "area": area,
        "scaled_area": area * math.pow(closeness, 2),
    }
def check_size(cnt, tolerance_area):
    """Tell whether a contour is large enough.

    Returns False when ``tolerance_area`` is ``None``, a true value when it is
    negative (no area is computed), otherwise whether the contour area is at
    least ``tolerance_area``.
    """
    if tolerance_area is None:
        return False
    no_limit = tolerance_area < 0
    if no_limit:
        return no_limit
    return cv2.contourArea(cnt) >= tolerance_area
def is_contour_overlap(mask, cmp_contour, master_contour) -> bool:
    """Tell whether two filled contours share at least one pixel.

    Each contour is drawn filled on its own empty canvas shaped like ``mask``
    and the two canvases are AND-ed.
    """
    canvases = [
        cv2.drawContours(np.zeros_like(mask), [contour], -1, 255, -1)
        for contour in (cmp_contour, master_contour)
    ]
    shared = cv2.bitwise_and(canvases[0], canvases[1])
    return shared.any() == True
def is_distance_ok(mask, cmp_contour, master_contour, tolerance_distance=None) -> bool:
    """Tell whether two contours touch once both are enlarged.

    Each contour is drawn filled on an empty canvas shaped like ``mask`` and
    enlarged with ``enlarge_contours(..., increase_by=tolerance_distance)``;
    the contours are considered close enough when the two results overlap.

    Args:
        mask: Only used for the shape/dtype of the canvases.
        cmp_contour: Contour being tested.
        master_contour: Reference contour.
        tolerance_distance: Kernel size used for the enlargement. ``None``
            (the default) now means "no tolerance defined" and returns False,
            as ``check_dist`` does, instead of failing inside the morphology.

    Returns:
        A plain ``bool`` (a numpy boolean was returned before, which breaks
        identity tests such as ``result is False``).
    """
    if tolerance_distance is None:
        return False

    cmp_image = enlarge_contours(
        cv2.drawContours(np.zeros_like(mask), [cmp_contour], -1, 255, -1),
        increase_by=tolerance_distance,
    )
    master_image = enlarge_contours(
        cv2.drawContours(np.zeros_like(mask), [master_contour], -1, 255, -1),
        increase_by=tolerance_distance,
    )
    return bool(cv2.bitwise_and(cmp_image, master_image).any())
def fast_check_contour(
    mask, cmp_contour, master_contour, tolerance_area=None, tolerance_distance=None
):
    """Classify ``cmp_contour`` relative to ``master_contour``.

    Returns:
        ``tc.KLC_OVERLAPS`` when the filled contours overlap; otherwise one of
        ``tc.KLC_OK_TOLERANCE`` (size and distance ok), ``tc.KLC_SMALL_FAR``
        (neither ok), ``tc.KLC_SMALL`` (only distance ok) or ``tc.KLC_FAR``
        (only size ok).
    """
    # Overlapping contours need no further test
    if is_contour_overlap(
        mask=mask, cmp_contour=cmp_contour, master_contour=master_contour
    ):
        return tc.KLC_OVERLAPS

    # Distance is evaluated before size, as before
    ok_dist = is_distance_ok(mask, cmp_contour, master_contour, tolerance_distance)
    ok_size = check_size(cnt=cmp_contour, tolerance_area=tolerance_area)

    if ok_size:
        return tc.KLC_OK_TOLERANCE if ok_dist else tc.KLC_FAR
    return tc.KLC_SMALL if ok_dist else tc.KLC_SMALL_FAR
# MARK: fast keep linked contours
def fast_keep_linked_contours(
    src_mask,
    tolerance_distance: int,
    tolerance_area: int,
    morph_op: Morphologer,
    min_contour_size: int = 0,
    epsilon: float = 0.001,
    skip_linked_contours: bool = False,
    mean_channel_data: dict = None,
    source_image=None,
) -> dict:
    """Keep the contours of a mask that are linked to a root contour.

    Steps:
        1. Optionally remove small contours and apply ``morph_op``.
        2. Pick a root contour: either the largest contour among those whose
           mean channel value is closest to ``mean_channel_data["mean"]``, or
           the contour with the largest area weighted by closeness to the
           image centre.
        3. Classify every other contour against the root with
           ``fast_check_contour``, then repeatedly try to attach the remaining
           ones to any already accepted contour until nothing changes.
        4. Rebuild the mask from the source contours touching accepted ones.

    Args:
        src_mask: Single-channel source mask (``src_mask.shape`` is unpacked
            as ``h, w``).
        tolerance_distance: Distance tolerance given to ``fast_check_contour``.
        tolerance_area: Area tolerance given to ``fast_check_contour``.
        morph_op: Callable applied to the mask, or ``None``.
        min_contour_size: Contours are filtered by size when above 0.
        epsilon: Polygon approximation factor used for the demo image of the
            single-contour / skip case.
        skip_linked_contours: When ``True`` stop after the cleaning steps.
        mean_channel_data: Optional dict with keys "color_space", "channel"
            and "mean" used to select the root contour.
        source_image: Image the channel is read from; needed together with
            ``mean_channel_data``.

    Returns:
        dict with "im_clean_mask" (final mask) and "im_clean_mask_demo"
        (3-channel visualisation).

    Changes: the contour collection is copied into a list before ``pop`` is
    used (OpenCV may hand back a tuple), the unused iteration counter and the
    commented-out debug code were removed, and the demo coloring tests
    truthiness instead of ``is False`` (which never matches numpy booleans).
    """
    mask = src_mask.copy()

    # Delete all small contours
    if min_contour_size > 0:
        mask = filter_contours(mask=mask, min_contour_size=min_contour_size)

    # Apply morphology operation to remove noise
    if morph_op is not None:
        mask = morph_op(mask)

    # A list is required below because contours are popped from it
    contours = list(get_external_contours(mask))

    # Skip if not enough contours
    if len(contours) == 0:
        return {
            "im_clean_mask": np.zeros_like(src_mask),
            "im_clean_mask_demo": cv2.merge(
                [
                    np.zeros_like(src_mask),
                    np.zeros_like(src_mask),
                    np.zeros_like(src_mask),
                ]
            ),
        }
    elif len(contours) == 1 or skip_linked_contours is True:
        clean_mask = cv2.bitwise_and(
            src_mask,
            cv2.drawContours(
                image=np.zeros_like(mask),
                contours=contours,
                contourIdx=-1,
                color=255,
                thickness=-1,
            ),
        )
        return {
            "im_clean_mask": clean_mask,
            "im_clean_mask_demo": cv2.merge(
                [
                    src_mask,
                    clean_mask,
                    cv2.drawContours(
                        image=np.zeros_like(mask),
                        contours=[
                            cv2.approxPolyDP(
                                cnt, epsilon * cv2.arcLength(cnt, True), True
                            )
                            for cnt in contours
                        ],
                        contourIdx=-1,
                        color=255,
                        thickness=-1,
                    ),
                ]
            ),
        }

    # Find root contour
    if mean_channel_data is not None and source_image is not None:
        # Use contour with matching mean color
        channel = get_channel(
            image=source_image,
            color_space=mean_channel_data["color_space"],
            channel=mean_channel_data["channel"],
        )
        df = pd.DataFrame(
            data={
                "area": [cv2.contourArea(c) for c in contours],
                "mean_dist": [
                    abs(
                        cv2.mean(
                            src=channel.flatten(),
                            mask=cv2.drawContours(
                                np.zeros_like(mask), [c], -1, 255, -1
                            ).flatten(),
                        )[0]
                        - mean_channel_data["mean"]
                    )
                    for c in contours
                ],
            }
        )
        if abs(df.mean_dist.min() - df.mean_dist.max()) < 4:
            # All contours look alike: take the largest one
            root_index = df[df.area == df.area.max()].index[0]
        else:
            # Cluster the distances and take the largest contour of the
            # cluster whose mean distance is the smallest
            X = np.reshape(df.mean_dist.to_list(), (-1, 1))
            ms = MeanShift()
            ms.fit(X)
            df["level"] = ms.predict(X)
            df["level_dist_min"] = (
                df.groupby("level").transform(lambda x: x.mean()).mean_dist
            )
            df = df[df["level_dist_min"] == df["level_dist_min"].min()]
            root_index = df[df.area == df.area.max()].index[0]
        root_contour = contours[root_index]
        good_contours = [contours.pop(root_index)]
        unknown_contours = []
    else:
        # Use the biggest contour, weighted by closeness to the centre
        root_contour = contours[0]
        big_idx = 0
        h, w = src_mask.shape
        roi_root = Circle(w // 2, h // 2, max(h, w) // 2)
        dist_max = roi_root.r
        max_area = 0
        for i, contour in enumerate(contours):
            morph_dict = get_distance_data(contour, roi_root, dist_max)
            if morph_dict["scaled_area"] > max_area:
                max_area = morph_dict["scaled_area"]
                root_contour = contour
                big_idx = i
        good_contours = [contours.pop(big_idx)]
        unknown_contours = []

    # Classify every remaining contour against the root contour
    while len(contours) > 0:
        contour = contours.pop()
        res = fast_check_contour(
            mask=src_mask,
            cmp_contour=contour,
            master_contour=root_contour,
            tolerance_area=tolerance_area,
            tolerance_distance=tolerance_distance,
        )
        if res == tc.KLC_FULLY_INSIDE:
            pass
        elif res in [tc.KLC_OVERLAPS, tc.KLC_OK_TOLERANCE]:
            good_contours.append(contour)
        else:
            unknown_contours.append(contour)

    # Try to aggregate unknown contours to good contours until stable
    stable = False
    while not stable:
        stable = True
        i = 0
        while i < len(unknown_contours):
            contour = unknown_contours[i]
            res = tc.KLC_SMALL_FAR
            for good_contour in good_contours:
                res = fast_check_contour(
                    mask=src_mask,
                    cmp_contour=contour,
                    master_contour=good_contour,
                    tolerance_area=tolerance_area,
                    tolerance_distance=tolerance_distance,
                )
                if res == tc.KLC_FULLY_INSIDE:
                    del unknown_contours[i]
                    stable = False
                    break
                elif res in [tc.KLC_OVERLAPS, tc.KLC_OK_TOLERANCE]:
                    good_contours.append(unknown_contours.pop(i))
                    stable = False
                    break
            # Only move on when the contour stayed in the unknown list;
            # otherwise index i already points at the next candidate
            if res in [
                tc.KLC_SMALL_FAR,
                tc.KLC_SMALL,
                tc.KLC_FAR,
            ]:
                i += 1

    # At this point we have the zone where the contours are allowed to be
    contour_template = cv2.bitwise_and(
        cv2.drawContours(np.zeros_like(mask), good_contours, -1, 255, -1),
        mask,
    )
    out_mask = np.zeros_like(mask)
    for cnt in get_contours(src_mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE):
        if (
            cv2.contourArea(cnt, oriented=True) < 0
            and cv2.bitwise_and(
                contour_template,
                cv2.drawContours(np.zeros_like(mask), [cnt], 0, 255, -1),
            ).any()
        ):
            out_mask = cv2.drawContours(out_mask, [cnt], 0, 255, -1)
    im_clean_mask = cv2.bitwise_and(out_mask, src_mask)

    # Demo image: accepted contours in white, root in green, rejected ones
    # colored by rejection reason
    im_clean_mask_demo = cv2.merge(
        [np.zeros_like(src_mask), np.zeros_like(src_mask), np.zeros_like(src_mask)]
    )
    for contour in good_contours:
        im_clean_mask_demo = cv2.drawContours(
            im_clean_mask_demo, [contour], 0, tc.C_WHITE, -1
        )
    im_clean_mask_demo = cv2.drawContours(
        im_clean_mask_demo, [root_contour], 0, tc.C_GREEN, -1
    )
    for contour in unknown_contours:
        ok_size = check_size(cnt=contour, tolerance_area=tolerance_area)
        ok_distance = is_distance_ok(
            mask=im_clean_mask,
            cmp_contour=contour,
            master_contour=good_contours[0],
            tolerance_distance=tolerance_distance,
        )
        if not ok_distance and not ok_size:
            contour_color = bgr_to_rgb(tc.C_FUCHSIA)
        elif not ok_size:
            contour_color = bgr_to_rgb(tc.C_RED)
        else:
            contour_color = bgr_to_rgb(tc.C_BLUE)
        im_clean_mask_demo = cv2.drawContours(
            im_clean_mask_demo, [contour], 0, contour_color, -1
        )
    im_clean_mask_demo = cv2.bitwise_and(
        im_clean_mask_demo, im_clean_mask_demo, mask=src_mask
    )

    return {
        "im_clean_mask": im_clean_mask,
        "im_clean_mask_demo": im_clean_mask_demo,
    }
def get_cnt_center(cnt):
    """Returns the centroid of a contour as integer pixel coordinates

    Arguments:
        cnt -- Contour (point array accepted by cv2.moments)

    Returns:
        tuple -- (x, y) centroid, truncated to int
    """
    mmnt = cv2.moments(cnt)
    if mmnt["m00"] == 0:
        # Zero-area contours (single point, straight segment) have no moment
        # based centroid: use the mean of the vertices instead of raising
        # ZeroDivisionError
        mean_x, mean_y = np.asarray(cnt).reshape(-1, 2).mean(axis=0)
        return int(mean_x), int(mean_y)
    return int(mmnt["m10"] / mmnt["m00"]), int(mmnt["m01"] / mmnt["m00"])
def dbg_min_distance(mask, cnt1, cnt2):
    """Finds the shortest distance between two contours and where it occurs

    Arguments:
        mask -- Array used only as a template for the drawing canvases
        cnt1 -- First contour
        cnt2 -- Second contour

    Returns:
        tuple -- (0, center of cnt1, center of cnt2) if the filled contours
        intersect, else (distance, closest point of cnt1, closest point of cnt2)
    """
    # Each contour is rasterised on its own blank canvas. Drawing both on
    # `mask` would compare the canvas with itself (always "overlapping") and
    # would also modify the caller's array.
    if cv2.bitwise_and(
        cv2.drawContours(np.zeros_like(mask), [cnt1], -1, 255, -1),
        cv2.drawContours(np.zeros_like(mask), [cnt2], -1, 255, -1),
    ).any():
        return 0, get_cnt_center(cnt1), get_cnt_center(cnt2)
    # min() keeps the first pair reaching the smallest distance
    min_dist, pt1, pt2 = min(
        (
            (abs(cv2.norm(cur_pt1, cur_pt2)), cur_pt1, cur_pt2)
            for cur_pt1, cur_pt2 in itertools.product(cnt1, cnt2)
        ),
        key=lambda candidate: candidate[0],
    )
    return min_dist, pt1[0], pt2[0]
def min_distance(mask, cnt1, cnt2):
    """Returns the shortest point to point distance between two contours

    Arguments:
        mask -- Array used only as a template for the drawing canvases
        cnt1 -- First contour
        cnt2 -- Second contour

    Returns:
        number -- 0 if the filled contours intersect, else the smallest
        cv2.norm between a point of cnt1 and a point of cnt2
    """
    if cv2.bitwise_and(
        cv2.drawContours(np.zeros_like(mask), [cnt1], -1, 255, -1),
        cv2.drawContours(np.zeros_like(mask), [cnt2], -1, 255, -1),
    ).any():
        return 0
    # min() receives the iterable itself: unpacking it with * fails with a
    # TypeError as soon as there is only one pair of points to compare
    return min(cv2.norm(pt1, pt2) for pt1, pt2 in itertools.product(cnt1, cnt2))
def check_dist(distance, tolerance_distance):
    """Tells whether a distance is within tolerance

    Arguments:
        distance -- Measured distance
        tolerance_distance -- Maximum accepted distance, a negative value
            accepts any distance, None rejects every distance

    Returns:
        bool -- True if the distance is accepted
    """
    if tolerance_distance is None:
        return False
    if tolerance_distance < 0:
        return True
    return distance <= tolerance_distance
def check_hull(
    mask, cmp_hull, master_hull, tolerance_area=None, tolerance_distance=None
):
    """Classifies a hull relatively to a master hull

    Arguments:
        mask -- Array used as a template for the drawing canvases
        cmp_hull -- Hull to classify
        master_hull -- Reference hull

    Keyword Arguments:
        tolerance_area -- Forwarded to check_size (default: {None})
        tolerance_distance -- Forwarded to check_dist (default: {None})

    Returns:
        One of tc.KLC_OVERLAPS, tc.KLC_OK_TOLERANCE, tc.KLC_SMALL_FAR,
        tc.KLC_SMALL or tc.KLC_FAR
    """
    # Filled hulls sharing at least one pixel overlap
    cmp_layer = cv2.drawContours(np.zeros_like(mask), [cmp_hull], -1, 255, -1)
    master_layer = cv2.drawContours(np.zeros_like(mask), [master_hull], -1, 255, -1)
    if cv2.bitwise_and(cmp_layer, master_layer).any():
        return tc.KLC_OVERLAPS

    # Point to point distance
    gap = min_distance(mask=mask, cnt1=cmp_hull, cnt2=master_hull)
    if gap <= 0:
        return tc.KLC_OVERLAPS

    size_ok = check_size(cnt=cmp_hull, tolerance_area=tolerance_area)
    dist_ok = check_dist(distance=gap, tolerance_distance=tolerance_distance)
    if size_ok:
        return tc.KLC_OK_TOLERANCE if dist_ok else tc.KLC_FAR
    return tc.KLC_SMALL if dist_ok else tc.KLC_SMALL_FAR
# MARK: Keep linked contours
def keep_linked_contours(
    src_mask,
    tolerance_distance: int,
    tolerance_area: int,
    morph_op: Morphologer,
    min_contour_size: int = 0,
    epsilon: float = 0.001,
    skip_linked_contours: bool = False,
    mean_channel_data: dict = None,
    source_image=None,
) -> dict:
    """Keeps the regions of a mask that are linked to its root contour

    The external contours of the (optionally filtered and morphed) mask are
    approximated with cv2.approxPolyDP, a root hull is selected and the other
    hulls are accepted when check_hull reports that they overlap, or are
    within tolerance of, an already accepted hull. Acceptance is repeated
    until no unknown hull changes status.

    Arguments:
        src_mask -- Source mask, unpacked as (height, width) when the root is
            selected by scaled area
        tolerance_distance {int} -- Distance tolerance forwarded to check_hull
            and check_dist
        tolerance_area {int} -- Area tolerance forwarded to check_hull and
            check_size
        morph_op {Morphologer} -- Callable applied to the working mask,
            skipped if None

    Keyword Arguments:
        min_contour_size {int} -- If > 0, filter_contours is applied to the
            working mask first (default: {0})
        epsilon {float} -- Fraction of each contour perimeter used as
            approxPolyDP accuracy (default: {0.001})
        skip_linked_contours {bool} -- If True every external contour is kept
            and no linking is attempted (default: {False})
        mean_channel_data {dict} -- Read through its "color_space", "channel"
            and "mean" keys; when given together with source_image the root is
            chosen among the contours whose channel mean is closest to "mean"
            (default: {None})
        source_image -- Image handed to get_channel (default: {None})

    Returns:
        dict -- "im_clean_mask": the cleaned mask, "im_clean_mask_demo": a
        3 channel image illustrating the decisions taken
    """
    mask = src_mask.copy()
    # Delete all small contours
    if min_contour_size > 0:
        mask = filter_contours(mask=mask, min_contour_size=min_contour_size)
    # Apply morphology operation to remove noise
    if morph_op is not None:
        mask = morph_op(mask)

    contours = get_external_contours(mask)
    if len(contours) == 0:
        return {
            "im_clean_mask": np.zeros_like(src_mask),
            "im_clean_mask_demo": cv2.merge(
                [
                    np.zeros_like(src_mask),
                    np.zeros_like(src_mask),
                    np.zeros_like(src_mask),
                ]
            ),
        }

    # Transform all contours into approximations
    hulls = [
        cv2.approxPolyDP(cnt, epsilon * cv2.arcLength(cnt, True), True)
        for cnt in contours
    ]

    if len(contours) == 1 or skip_linked_contours is True:
        clean_mask = cv2.bitwise_and(
            src_mask,
            cv2.drawContours(
                image=np.zeros_like(mask),
                contours=contours,
                contourIdx=-1,
                color=255,
                thickness=-1,
            ),
        )
        return {
            "im_clean_mask": clean_mask,
            "im_clean_mask_demo": cv2.merge(
                [
                    src_mask,
                    clean_mask,
                    cv2.drawContours(
                        image=np.zeros_like(mask),
                        contours=hulls,
                        contourIdx=-1,
                        color=255,
                        thickness=-1,
                    ),
                ]
            ),
        }

    # Select the root hull
    if mean_channel_data is not None and source_image is not None:
        channel = get_channel(
            image=source_image,
            color_space=mean_channel_data["color_space"],
            channel=mean_channel_data["channel"],
        )
        flat_channel = channel.flatten()
        df = pd.DataFrame(
            data={
                "area": [cv2.contourArea(c) for c in contours],
                "mean_dist": [
                    abs(
                        cv2.mean(
                            src=flat_channel,
                            mask=cv2.drawContours(
                                np.zeros_like(mask), [c], -1, 255, -1
                            ).flatten(),
                        )[0]
                        - mean_channel_data["mean"]
                    )
                    for c in contours
                ],
            }
        )
        if abs(df.mean_dist.min() - df.mean_dist.max()) >= 4:
            # Channel means are spread out: cluster them and only keep the
            # cluster whose average distance to the target mean is smallest
            X = np.reshape(df.mean_dist.to_list(), (-1, 1))
            ms = MeanShift()
            ms.fit(X)
            df["level"] = ms.predict(X)
            df["level_dist_min"] = (
                df.groupby("level").transform(lambda x: x.mean()).mean_dist
            )
            df = df[df["level_dist_min"] == df["level_dist_min"].min()]
        # Filtering keeps the original index, so it still addresses `hulls`
        root_index = df[df.area == df.area.max()].index[0]
    else:
        # Find the hull with the largest scaled area
        root_index = 0
        h, w = src_mask.shape
        roi_root = Circle(w // 2, h // 2, max(h, w) // 2)
        dist_max = roi_root.r
        max_area = 0
        for i, hull in enumerate(hulls):
            morph_dict = get_distance_data(hull, roi_root, dist_max)
            if morph_dict["scaled_area"] > max_area:
                max_area = morph_dict["scaled_area"]
                root_index = i

    root_hull = hulls.pop(root_index)
    good_hulls = [root_hull]
    unknown_hulls = []
    linked_results = [tc.KLC_OVERLAPS, tc.KLC_OK_TOLERANCE]

    # Parse all hulls and sort them relatively to the root
    while len(hulls) > 0:
        hull = hulls.pop()
        res = check_hull(
            mask=src_mask,
            cmp_hull=hull,
            master_hull=root_hull,
            tolerance_area=tolerance_area,
            tolerance_distance=tolerance_distance,
        )
        if res == tc.KLC_FULLY_INSIDE:
            pass
        elif res in linked_results:
            good_hulls.append(hull)
        else:
            unknown_hulls.append(hull)

    # Try to aggregate unknown hulls to good hulls until nothing changes
    stable = False
    while not stable:
        stable = True
        i = 0
        while i < len(unknown_hulls):
            hull = unknown_hulls[i]
            for good_hull in good_hulls:
                res = check_hull(
                    mask=src_mask,
                    cmp_hull=hull,
                    master_hull=good_hull,
                    tolerance_area=tolerance_area,
                    tolerance_distance=tolerance_distance,
                )
                if res == tc.KLC_FULLY_INSIDE:
                    del unknown_hulls[i]
                    stable = False
                    break
                elif res in linked_results:
                    good_hulls.append(unknown_hulls.pop(i))
                    stable = False
                    break
            else:
                # No good hull claimed this one, it stays unknown for now
                i += 1

    # At this point we have the zone where the contours are allowed to be
    hull_template = cv2.bitwise_and(
        cv2.drawContours(np.zeros_like(mask), good_hulls, -1, 255, -1),
        mask,
    )
    out_mask = np.zeros_like(mask)
    for cnt in get_contours(src_mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE):
        if (
            cv2.contourArea(cnt, oriented=True) < 0
            and cv2.bitwise_and(
                hull_template,
                cv2.drawContours(np.zeros_like(mask), [cnt], 0, 255, -1),
            ).any()
        ):
            out_mask = cv2.drawContours(out_mask, [cnt], 0, 255, -1)
    im_clean_mask = cv2.bitwise_and(out_mask, src_mask)

    # Build the demo image: good hulls white, root green, rejected coloured
    im_clean_mask_demo = cv2.merge(
        [np.zeros_like(src_mask), np.zeros_like(src_mask), np.zeros_like(src_mask)]
    )
    for hull in good_hulls:
        im_clean_mask_demo = cv2.drawContours(
            im_clean_mask_demo, [hull], 0, tc.C_WHITE, -1
        )
    im_clean_mask_demo = cv2.drawContours(
        im_clean_mask_demo, [root_hull], 0, tc.C_GREEN, -1
    )
    for hull in unknown_hulls:
        ok_size = check_size(cnt=hull, tolerance_area=tolerance_area)
        min_dist = min(
            min_distance(src_mask, hull, good_hull) for good_hull in good_hulls
        )
        # The distance must be compared with the distance tolerance, this
        # call used to receive the area tolerance
        ok_distance = check_dist(
            distance=min_dist, tolerance_distance=tolerance_distance
        )
        if ok_distance is False and ok_size is False:
            contour_color = bgr_to_rgb(tc.C_FUCHSIA)
        elif ok_size is False:
            contour_color = bgr_to_rgb(tc.C_RED)
        else:
            contour_color = bgr_to_rgb(tc.C_BLUE)
        im_clean_mask_demo = cv2.drawContours(
            im_clean_mask_demo, [hull], 0, contour_color, -1
        )
    im_clean_mask_demo = cv2.bitwise_and(
        im_clean_mask_demo, im_clean_mask_demo, mask=src_mask
    )
    return {
        "im_clean_mask": im_clean_mask,
        "im_clean_mask_demo": im_clean_mask_demo,
    }
def get_external_contours(mask):
    """Returns the contours of a mask that have no parent

    Arguments:
        mask -- Mask handed to cv2.findContours (RETR_TREE, CHAIN_APPROX_NONE)

    Returns:
        list -- Contours whose last hierarchy entry is -1, sorted by
        increasing area, empty if there are none
    """
    found, hierarchy = cv2.findContours(
        mask, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE
    )
    if len(found) < 1:
        return []
    top_level = [
        contour for contour, node in zip(found, hierarchy[0]) if node[-1] == -1
    ]
    return sorted(top_level, key=lambda c: cv2.contourArea(c, oriented=False))
def get_internal_contours(mask):
    """Returns the contours of a mask that have a parent

    Arguments:
        mask -- Mask handed to cv2.findContours (RETR_TREE, CHAIN_APPROX_SIMPLE)

    Returns:
        list -- Contours whose last hierarchy entry is not -1, sorted by
        increasing area, empty if there are none
    """
    found, hierarchy = cv2.findContours(
        mask, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_SIMPLE
    )
    if len(found) < 1:
        return []
    nested = [
        contour for contour, node in zip(found, hierarchy[0]) if node[-1] != -1
    ]
    return sorted(nested, key=lambda c: cv2.contourArea(c, oriented=False))
def get_filled_contours(mask):
    """Returns the contours whose filled shape covers part of the mask

    Arguments:
        mask -- Mask handed to cv2.findContours (RETR_TREE, CHAIN_APPROX_SIMPLE)

    Returns:
        list -- Contours that, once drawn filled, share at least one non zero
        pixel with the mask
    """
    kept = []
    for candidate in cv2.findContours(
        mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
    )[0]:
        footprint = cv2.drawContours(np.zeros_like(mask), [candidate], -1, 255, -1)
        if cv2.bitwise_and(mask, footprint).any():
            kept.append(candidate)
    return kept
def get_main_contour(mask):
    """Returns the external contour with the largest scaled area

    Arguments:
        mask -- 2D mask, its shape is unpacked as (height, width)

    Returns:
        The only external contour if there is just one, the one with the
        largest "scaled_area" reported by get_distance_data if there are
        several, None if there are none
    """
    candidates = get_external_contours(mask)
    if len(candidates) == 0:
        return None
    if len(candidates) == 1:
        return candidates[0]

    height, width = mask.shape
    roi = Circle(width // 2, height // 2, max(height, width) // 2)
    scaled_areas = [
        get_distance_data(candidate, roi, roi.r)["scaled_area"]
        for candidate in candidates
    ]
    # First strictly larger value wins, index 0 if nothing is above 0
    best_idx, best_area = 0, 0
    for idx, area in enumerate(scaled_areas):
        if area > best_area:
            best_idx, best_area = idx, area
    return candidates[best_idx]
def get_suspect_contours(mask):
    """Returns every external contour except the main one

    Arguments:
        mask -- 2D mask, its shape is unpacked as (height, width)

    Returns:
        list -- External contours minus the one with the largest
        "scaled_area" reported by get_distance_data, empty if the mask has no
        external contour
    """
    candidates = get_external_contours(mask)
    if len(candidates) == 0:
        return []

    height, width = mask.shape
    roi = Circle(width // 2, height // 2, max(height, width) // 2)
    scaled_areas = [
        get_distance_data(candidate, roi, roi.r)["scaled_area"]
        for candidate in candidates
    ]
    # First strictly larger value wins, index 0 if nothing is above 0
    main_idx, main_area = 0, 0
    for idx, area in enumerate(scaled_areas):
        if area > main_area:
            main_idx, main_area = idx, area
    return candidates[:main_idx] + candidates[main_idx + 1 :]
def get_contours_dict(mask):
    """Groups the contours of a mask by role

    Arguments:
        mask -- Mask to analyse

    Returns:
        dict -- "main": result of get_main_contour if not None, "internal":
        result of get_internal_contours if not empty, "suspect": result of
        get_suspect_contours if not empty
    """
    main_contour = get_main_contour(mask)
    holes = get_internal_contours(mask)
    leftovers = get_suspect_contours(mask)

    found = {} if main_contour is None else {"main": main_contour}
    for key, group in (("internal", holes), ("suspect", leftovers)):
        if group:
            found[key] = group
    return found
def find_matches(
    previous_image,
    previous_mask,
    current_image,
    current_mask,
    safe_ratio: float = 0.5,
    plot_debug: dict = None,
):
    """Matches SIFT keypoints between two masked images

    Both images are masked, converted to grayscale and histogram equalized
    before SIFT extraction. Matched pairs are kept only if the distance
    between their two keypoints lies strictly between median * safe_ratio and
    median / safe_ratio.

    Arguments:
        previous_image -- First image, converted with cv2.COLOR_RGB2GRAY
        previous_mask -- Mask applied to the first image
        current_image -- Second image, converted with cv2.COLOR_RGB2GRAY
        current_mask -- Mask applied to the second image

    Keyword Arguments:
        safe_ratio {float} -- Ratio defining the accepted band around the
            median match distance (default: {0.5})
        plot_debug {dict} -- If not empty the matches are plotted; reads
            "previous" and "current" (images to display) and optionally "ax",
            "only_matches" and "title" (default: {None})

    Returns:
        tuple -- (kept keypoints of the previous image, kept keypoints of the
        current image)
    """
    desc_extractor = SIFT()

    def _describe(image, mask):
        # Mask, convert to equalized grayscale, then extract SIFT features
        gray = cv2.equalizeHist(
            cv2.cvtColor(
                cv2.bitwise_and(image, image, mask=mask),
                cv2.COLOR_RGB2GRAY,
            )
        )
        desc_extractor.detect_and_extract(gray)
        return desc_extractor.keypoints, desc_extractor.descriptors

    kp_previous, desc_previous = _describe(previous_image, previous_mask)
    kp_current, desc_current = _describe(current_image, current_mask)

    # Find matches
    matches = match_descriptors(
        desc_previous, desc_current, max_ratio=0.8, cross_check=True
    )
    matches_previous = kp_previous[matches[:, 0]]
    matches_current = kp_current[matches[:, 1]]

    # Drop pairs whose displacement is far from the median one; the filter
    # is computed once and applied to both sides
    distances = np.linalg.norm(matches_previous - matches_current, axis=1)
    median_distance = np.median(distances)
    keep = (distances < median_distance / safe_ratio) & (
        distances > median_distance * safe_ratio
    )
    matches_previous = matches_previous[keep]
    matches_current = matches_current[keep]

    if plot_debug:
        # The plot shows every match, including the ones filtered out above
        if "ax" in plot_debug:
            ax = plot_debug["ax"]
        else:
            _, ax = plt.subplots(nrows=1, ncols=1, figsize=(20, 10))
        plot_matches(
            ax=ax,
            image1=plot_debug["previous"],
            image2=plot_debug["current"],
            keypoints1=kp_previous,
            keypoints2=kp_current,
            matches=matches,
            only_matches=plot_debug.get("only_matches", True),
        )
        ax.axis("off")
        if "title" in plot_debug:
            ax.set_title(plot_debug["title"])
        if "ax" not in plot_debug:
            plt.show()
    return matches_previous, matches_current
def find_rotation_anlge(
    previous_image,
    previous_mask,
    current_image,
    current_mask,
    plot_debug: dict = None,
):
    """Estimates the rotation between two masked images from matched keypoints

    Arguments:
        previous_image -- First image, forwarded to find_matches
        previous_mask -- Mask of the first image
        current_image -- Second image, forwarded to find_matches
        current_mask -- Mask of the second image

    Keyword Arguments:
        plot_debug {dict} -- Forwarded to find_matches (default: {None})

    Returns:
        float -- First angle, in degrees, of the "zyx" Euler decomposition of
        the rotation aligning the centered keypoints
    """
    matches_previous, matches_current = find_matches(
        previous_image=previous_image,
        previous_mask=previous_mask,
        current_image=current_image,
        current_mask=current_mask,
        plot_debug=plot_debug or {},
    )

    def _centered_3d(points):
        # Center the points and pad them to 3 columns for align_vectors.
        # NOTE(review): pad_width=[0, 1] applies to every axis, so a trailing
        # all-zero row is appended too — confirm whether that is intended.
        return np.pad(
            points - points.mean(axis=0),
            pad_width=[0, 1],
            mode="constant",
        )

    rot, *_ = R.align_vectors(
        _centered_3d(matches_previous),
        _centered_3d(matches_current),
        return_sensitivity=True,
    )
    return rot.as_euler("zyx", degrees=True)[0]
def match_previous_rotation(
    previous_image,
    previous_mask,
    current_image,
    current_mask,
    plot_debug: dict = None,
):
    """Rotates the current image by the angle estimated against the previous one

    Arguments:
        previous_image -- Reference image
        previous_mask -- Mask of the reference image
        current_image -- Image to rotate
        current_mask -- Mask of the image to rotate

    Keyword Arguments:
        plot_debug {dict} -- If not empty a debug figure is displayed; reads
            the optional "fig_size" and "title" keys and is forwarded, with
            "previous", "current" and "ax" added, to find_rotation_anlge
            (default: {None})

    Returns:
        Result of rotate_image applied to current_image
    """
    show_debug = bool(plot_debug)
    if show_debug:
        fig = plt.figure(figsize=plot_debug.get("fig_size", (8, 8)))
        grid_spec = fig.add_gridspec(nrows=2, ncols=3)
        plt_descriptors = fig.add_subplot(grid_spec[0, :])
        # Work on a copy so that the caller's dict is left untouched
        plot_debug = plot_debug | {
            "previous": cv2.bitwise_and(
                previous_image, previous_image, mask=previous_mask
            ),
            "current": cv2.bitwise_and(current_image, current_image, mask=current_mask),
            "ax": plt_descriptors,
        }
    angle = find_rotation_anlge(
        previous_image=previous_image,
        previous_mask=previous_mask,
        current_image=current_image,
        current_mask=current_mask,
        plot_debug=plot_debug or {},
    )
    ret = rotate_image(current_image, angle=angle)
    if show_debug:
        # "title" is optional, as it is in find_matches
        prefix = f"{plot_debug['title']}, " if "title" in plot_debug else ""
        plt_descriptors.set_title(f"{prefix}angle={angle:.2f} ")
        _update_axis(
            axis=fig.add_subplot(grid_spec[1, 0]),
            image=previous_image,
            title="previous image",
        )
        _update_axis(
            axis=fig.add_subplot(grid_spec[1, 1]), image=ret, title="rotated image"
        )
        _update_axis(
            axis=fig.add_subplot(grid_spec[1, 2]),
            image=current_image,
            title="current image",
        )
        plt.show()
    return ret
def sift_contours(
    clean_mask, target_mask, morph_op: Morphologer = Morphologer(op="none")
):
    """Rebuilds a mask keeping only the suspect contours confirmed by a clean mask

    Arguments:
        clean_mask -- Reference mask, morph_op is applied to it before use
        target_mask -- Mask to sift, its contours are read from
            morph_op(target_mask)

    Keyword Arguments:
        morph_op {Morphologer} -- Callable applied to both masks
            (default: {Morphologer(op="none")})

    Returns:
        target_mask itself if get_contours_dict reports no "suspect" contour,
        else a new mask with the main contour and the suspects intersecting
        the reference filled in and the internal contours blanked out
    """
    groups = get_contours_dict(morph_op(target_mask))
    if "suspect" not in groups:
        return target_mask

    suspects = groups["suspect"]
    reference = morph_op(clean_mask)
    # A suspect is confirmed when its filled shape touches the reference
    confirmed = [
        suspect
        for idx, suspect in enumerate(suspects)
        if cv2.bitwise_and(
            cv2.drawContours(np.zeros_like(reference), suspects, idx, 255, -1),
            reference,
        ).any()
    ]

    # Finalize
    rebuilt = cv2.drawContours(
        np.zeros_like(reference),
        contours=[groups["main"]] + confirmed,
        contourIdx=-1,
        color=255,
        thickness=-1,
    )
    return cv2.drawContours(
        rebuilt,
        contours=groups.get("internal", []),
        contourIdx=-1,
        color=0,
        thickness=-1,
    )
def draw_mask(
    image,
    mask,
    background_type: str = "bw",
    draw_contours: bool = True,
    mask_properties: list = None,
    contours_thickness: int = 4,
):
    """Renders a mask over an image

    Arguments:
        image -- Source image, converted with cv2.COLOR_RGB2GRAY for the "bw"
            background
        mask -- Mask selecting the foreground, inverted with 255 - mask

    Keyword Arguments:
        background_type -- "bw": dimmed grayscale version of the image,
            "source": the image itself, tuple: solid color (default: {"bw"})
        draw_contours -- True: outline each contour of the mask cycling
            through a color palette, tuple: outline them all with that color,
            anything else: no outline (default: {True})
        mask_properties {list} -- If it contains tc.MP_CENTROID the mask
            centroid is circled (default: {None})
        contours_thickness {int} -- Outline thickness (default: {4})

    Raises:
        NotImplementedError: If background_type is not supported

    Returns:
        Image with the foreground kept and the background replaced
    """
    foreground = cv2.bitwise_and(image, image, mask=mask)
    if background_type == "bw":
        gray = cv2.cvtColor(
            cv2.bitwise_and(image, image, mask=255 - mask), cv2.COLOR_RGB2GRAY
        )
        # Dimming by 0.4 keeps every value within the uint8 range
        dimmed = (gray * 0.4).astype(np.uint8)
        background = cv2.merge([dimmed, dimmed, dimmed])
    elif background_type == "source":
        background = image.copy()
    elif isinstance(background_type, tuple):
        # Keep the solid color outside of the mask only, otherwise the OR
        # below would blend it into the foreground pixels
        solid = np.full_like(image, background_type)
        background = cv2.bitwise_and(solid, solid, mask=255 - mask)
    else:
        raise NotImplementedError
    out = cv2.bitwise_or(foreground, background)

    if draw_contours is True or isinstance(draw_contours, tuple):
        palette = (
            [draw_contours]
            if isinstance(draw_contours, tuple)
            else [
                tc.C_BLUE,
                tc.C_BLUE_VIOLET,
                tc.C_CABIN_BLUE,
                tc.C_CYAN,
                tc.C_FUCHSIA,
                tc.C_LIGHT_STEEL_BLUE,
                tc.C_MAROON,
                tc.C_ORANGE,
                tc.C_PURPLE,
                tc.C_RED,
                tc.C_TEAL,
                tc.C_YELLOW,
            ]
        )
        found = cv2.findContours(
            mask, mode=cv2.RETR_LIST, method=cv2.CHAIN_APPROX_NONE
        )[0]
        for cnt, cnt_color in zip(found, itertools.cycle(palette)):
            out = cv2.drawContours(
                out,
                [cnt],
                contourIdx=0,
                color=cnt_color,
                thickness=contours_thickness,
            )

    if mask_properties and tc.MP_CENTROID in mask_properties:
        moments = cv2.moments(mask, binaryImage=True)
        # An empty mask has no centroid (m00 is 0), nothing to draw then
        if moments["m00"] != 0:
            cmx, cmy = (
                moments["m10"] / moments["m00"],
                moments["m01"] / moments["m00"],
            )
            out = cv2.circle(out, (int(cmx), int(cmy)), 10, tc.C_BLUE, 4)
    return out
def draw_marker(
    image,
    pos: list,
    marker_size: int,
    thickness: int,
    colors: tuple,
    marker_type=cv2.MARKER_CROSS,
):
    """Draws a two color marker: a large one with a half sized one on top

    Arguments:
        image -- Image handed to cv2.drawMarker
        pos {list} -- Marker position
        marker_size {int} -- Size of the outer marker, the inner one uses half
        thickness {int} -- Thickness of the outer marker, the inner one uses
            half, with a minimum of 1
        colors {tuple} -- (outer color, inner color)

    Keyword Arguments:
        marker_type -- OpenCV marker type (default: {cv2.MARKER_CROSS})

    Returns:
        Result of the second cv2.drawMarker call
    """
    outlined = cv2.drawMarker(
        image,
        pos,
        color=colors[0],
        markerType=marker_type,
        markerSize=marker_size,
        thickness=thickness,
    )
    return cv2.drawMarker(
        outlined,
        pos,
        color=colors[1],
        markerType=marker_type,
        markerSize=marker_size // 2,
        # OpenCV rejects a thickness of 0, which thickness=1 would produce
        thickness=max(1, thickness // 2),
    )
def draw_seeds(image, seeds, selected_label=None, marker_size=None, marker_thickness=4):
    """Draws a marker on an image for each seed point

    Arguments:
        image -- Image to draw on
        seeds -- Mapping read through its "input_points" and "input_labels"
            keys, iterated in parallel

    Keyword Arguments:
        selected_label -- If not None only seeds with this label are drawn
            (default: {None})
        marker_size -- Marker size, 1/50th of the largest image dimension if
            None (default: {None})
        marker_thickness -- Marker thickness (default: {4})

    Returns:
        Image returned by the last draw_marker call, or the input image if no
        seed was drawn
    """
    if marker_size is None:
        marker_size = max(image.shape) // 50
    labelled_points = zip(seeds["input_points"], seeds["input_labels"])
    for point, point_label in labelled_points:
        if selected_label is not None and point_label != selected_label:
            continue
        # Label 0 gets a blue tilted cross, any other label a lime square
        if point_label == 0:
            inner_color, shape = tc.C_BLUE, cv2.MARKER_TILTED_CROSS
        else:
            inner_color, shape = tc.C_LIME, cv2.MARKER_SQUARE
        image = draw_marker(
            image=image,
            pos=point,
            marker_size=marker_size,
            thickness=marker_thickness,
            colors=(tc.C_WHITE, inner_color),
            marker_type=shape,
        )
    return image
def get_concat_h_multi_resize(im_list, resample=Image.Resampling.BICUBIC):
    """Concatenates images side by side after scaling them to a common height

    Arguments:
        im_list -- Images exposing width, height and resize

    Keyword Arguments:
        resample -- Resampling filter handed to resize
            (default: {Image.Resampling.BICUBIC})

    Returns:
        New "RGB" image as tall as the shortest input and as wide as the sum
        of the scaled widths
    """
    target_height = min(im.height for im in im_list)
    scaled = []
    for im in im_list:
        new_width = int(im.width * target_height / im.height)
        scaled.append(im.resize((new_width, target_height), resample=resample))

    canvas = Image.new("RGB", (sum(im.width for im in scaled), target_height))
    # Each image starts where the previous one ends
    offsets = itertools.accumulate((im.width for im in scaled), initial=0)
    for im, left in zip(scaled, offsets):
        canvas.paste(im, (left, 0))
    return canvas
def get_concat_v_multi_resize(im_list, resample=Image.Resampling.BICUBIC):
    """Stacks images vertically after scaling them to a common width

    Arguments:
        im_list -- Images exposing width, height and resize

    Keyword Arguments:
        resample -- Resampling filter handed to resize
            (default: {Image.Resampling.BICUBIC})

    Returns:
        New "RGB" image as wide as the narrowest input and as tall as the sum
        of the scaled heights
    """
    target_width = min(im.width for im in im_list)
    scaled = []
    for im in im_list:
        new_height = int(im.height * target_width / im.width)
        scaled.append(im.resize((target_width, new_height), resample=resample))

    canvas = Image.new("RGB", (target_width, sum(im.height for im in scaled)))
    # Each image starts where the previous one ends
    offsets = itertools.accumulate((im.height for im in scaled), initial=0)
    for im, top in zip(scaled, offsets):
        canvas.paste(im, (0, top))
    return canvas
def get_concat_tile_resize(im_list_2d, resample=Image.Resampling.BICUBIC):
    """Builds a tiled image from rows of images

    Arguments:
        im_list_2d -- Iterable of rows, each row being an iterable of images

    Keyword Arguments:
        resample -- Resampling filter forwarded to both concatenation helpers
            (default: {Image.Resampling.BICUBIC})

    Returns:
        Rows built with get_concat_h_multi_resize, stacked with
        get_concat_v_multi_resize
    """
    rows = []
    for row_images in im_list_2d:
        rows.append(get_concat_h_multi_resize(row_images, resample=resample))
    return get_concat_v_multi_resize(rows, resample=resample)
def get_tiles(img_list, row_count, resample=Image.Resampling.BICUBIC):
    """Arranges a flat list of images into a tiled image

    Arguments:
        img_list -- Images to tile, any sequence or a numpy array
        row_count -- Forwarded to np.split, which requires an equal division

    Keyword Arguments:
        resample -- Resampling filter forwarded to get_concat_tile_resize
            (default: {Image.Resampling.BICUBIC})

    Returns:
        Result of get_concat_tile_resize applied to the split images
    """
    if not isinstance(img_list, np.ndarray):
        # Fill the object array item by item: np.asarray would unpack
        # same-sized images into an array of pixels instead of keeping one
        # image per cell
        images = list(img_list)
        img_list = np.empty(len(images), dtype="object")
        for idx, im in enumerate(images):
            img_list[idx] = im
    return get_concat_tile_resize(np.split(img_list, row_count), resample)