samtrack / aot /dataloaders /video_transforms.py
aikenml's picture
Upload folder using huggingface_hub
c985ba4
import random
import cv2
import numpy as np
from PIL import Image
import torch
import torchvision.transforms as TF
import dataloaders.image_transforms as IT
# Turn off OpenCV's internal multi-threading (0 = run OpenCV routines
# sequentially in the calling thread).
# NOTE(review): presumably done so these transforms do not oversubscribe or
# deadlock when run inside multi-process data-loader workers — confirm.
cv2.setNumThreads(0)
class Resize(object):
    """Bring every array of a sample to one fixed spatial size.

    Args:
        output_size (tuple or int): Target size as ``(height, width)``. An
            int ``s`` is expanded to the square ``(s, s)``.
        use_padding (bool): If True, calling the transform pads the sample
            with constant borders (see ``padding``); otherwise it resizes
            the sample (see ``rescale``).
    """

    def __init__(self, output_size, use_padding=False):
        assert isinstance(output_size, (int, tuple))
        self.output_size = ((output_size, output_size) if isinstance(
            output_size, int) else output_size)
        self.use_padding = use_padding

    def __call__(self, sample):
        """Apply ``padding`` or ``rescale`` depending on ``use_padding``."""
        if self.use_padding:
            return self.padding(sample)
        return self.rescale(sample)

    def rescale(self, sample):
        """Resize every non-meta entry of ``sample`` to ``output_size``.

        The current size is read from ``sample['prev_img']``; when it
        already equals ``output_size`` the sample is returned untouched.
        Image entries use bicubic interpolation, all other entries use
        nearest-neighbour. ``curr_img`` and ``curr_label`` are treated as
        sequences and resized item by item. The sample dict is modified in
        place and returned.
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample

        target_h, target_w = self.output_size
        dsize = (target_w, target_h)  # cv2 expects (width, height)
        for key in sample.keys():
            # Keys containing 'meta' hold non-array bookkeeping data.
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('prev_img', 'curr_img', 'ref_img'):
                interp = cv2.INTER_CUBIC
            else:
                interp = cv2.INTER_NEAREST
            if key in ('curr_img', 'curr_label'):
                sample[key] = [
                    cv2.resize(frame, dsize=dsize, interpolation=interp)
                    for frame in value
                ]
            else:
                sample[key] = cv2.resize(value,
                                         dsize=dsize,
                                         interpolation=interp)
        return sample

    def padding(self, sample):
        """Pad every non-meta entry of ``sample`` up to ``output_size``.

        The missing rows/columns are split at random between the two
        opposite sides; the split is drawn once and shared by all entries.
        Image entries are filled with ``(124, 116, 104)``, all other
        entries with 0. The sample dict is modified in place and returned;
        it is returned untouched when ``sample['prev_img']`` already has
        the target size.
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample

        target_h, target_w = self.output_size

        def split_randomly(total):
            # Random partition of `total` into a leading and trailing part.
            leading = np.random.randint(0, total + 1)
            return leading, total - leading

        top_pad, bottom_pad = split_randomly(target_h - height)
        left_pad, right_pad = split_randomly(target_w - width)

        def pad(array, fill):
            return cv2.copyMakeBorder(array,
                                      top_pad,
                                      bottom_pad,
                                      left_pad,
                                      right_pad,
                                      cv2.BORDER_CONSTANT,
                                      value=fill)

        for key in sample.keys():
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('prev_img', 'curr_img', 'ref_img'):
                # Equals the ToTensor mean (0.485, 0.456, 0.406) * 255,
                # rounded.
                fill = (124, 116, 104)
            else:
                fill = 0
            if key in ('curr_img', 'curr_label'):
                sample[key] = [pad(frame, fill) for frame in value]
            else:
                sample[key] = pad(value, fill)
        return sample
class BalancedRandomCrop(object):
    """Randomly crop a sample, retrying until the crop keeps an object.

    Args:
        output_size (tuple or int): Desired crop size as
            ``(height, width)``; an int gives a square crop. Each dimension
            is clipped to the image size.
        max_step (int): Maximum number of random crop positions to try.
        max_obj_num (int): Maximum number of objects kept in the crop; a
            random subset is chosen when more are present.
        min_obj_pixel_num (int): An object is kept only if it covers more
            than this many pixels of the cropped reference label.
    """

    def __init__(self,
                 output_size,
                 max_step=5,
                 max_obj_num=5,
                 min_obj_pixel_num=100):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, tuple):
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = (output_size, output_size)
        self.max_step = max_step
        self.max_obj_num = max_obj_num
        self.min_obj_pixel_num = min_obj_pixel_num

    def __call__(self, sample):
        """Crop ``sample`` in place and relabel the kept objects.

        Objects are selected from the cropped ``ref_label`` only. Kept
        object ids are remapped to ``1..obj_num`` (everything else becomes
        0) in ``ref_label``, ``prev_label`` and every ``curr_label``, all
        returned as uint8. The number of kept objects is stored in
        ``sample['meta']['obj_num']``. If no attempt finds an object, the
        last crop position is used and all labels become background.
        """
        img_h, img_w = sample['prev_img'].shape[:2]
        crop_h, crop_w = self.output_size
        # Never ask for a crop larger than the image.
        if crop_h >= img_h:
            crop_h = img_h
        if crop_w >= img_w:
            crop_w = img_w

        label_stack = [sample["ref_label"], sample["prev_label"]
                       ] + sample["curr_label"]

        found_object = False
        attempts = 0
        while attempts < self.max_step and not found_object:
            attempts += 1
            top = np.random.randint(0, img_h - crop_h + 1)
            left = np.random.randint(0, img_w - crop_w + 1)
            rows = slice(top, top + crop_h)
            cols = slice(left, left + crop_w)
            after_crop = [label[rows, cols] for label in label_stack]

            # Object ids visible in the cropped reference label, ascending.
            all_obj = list(np.sort(np.unique(after_crop[0])))
            if all_obj[-1] == 0:
                # Only background in this crop: try another position.
                continue
            # remove background
            if all_obj[0] == 0:
                all_obj = all_obj[1:]

            # remove small obj
            kept_obj = [
                obj_id for obj_id in all_obj
                if np.sum(after_crop[0] == obj_id) > self.min_obj_pixel_num
            ]
            found_object = len(kept_obj) != 0

            if len(kept_obj) > self.max_obj_num:
                random.shuffle(kept_obj)
                kept_obj = kept_obj[:self.max_obj_num]

            all_obj = [0] + kept_obj

        # Remap the surviving ids to their position in `all_obj`.
        relabeled = []
        for cropped in after_crop:
            remapped = cropped * 0
            for new_id, obj_id in enumerate(all_obj):
                if obj_id == 0:
                    continue
                remapped += ((cropped == obj_id) * new_id).astype(np.uint8)
            relabeled.append(remapped.astype(np.uint8))

        sample["ref_label"] = relabeled[0]
        sample["prev_label"] = relabeled[1]
        frame_count = len(sample["curr_img"])
        sample["curr_label"] = [relabeled[i + 2] for i in range(frame_count)]

        # Labels are already cropped; apply the same window to the images.
        for key in sample.keys():
            if 'meta' in key or 'label' in key:
                continue
            if key == 'curr_img':
                sample[key] = [frame[rows, cols] for frame in sample[key]]
            else:
                sample[key] = sample[key][rows, cols]

        sample['meta']['obj_num'] = len(all_obj) - 1
        return sample
class RandomScale(object):
    """Resize a sample by one random, aspect-preserving scale factor.

    Args:
        min_scale (float): Lower bound of the uniformly drawn factor.
        max_scale (float): Upper bound of the uniformly drawn factor.
        short_edge (int or None): If given, the factor is additionally
            multiplied by ``short_edge / min(height, width)`` of
            ``sample['prev_img']``.
    """

    def __init__(self, min_scale=1., max_scale=1.3, short_edge=None):
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge

    def __call__(self, sample):
        """Scale every non-meta entry of ``sample`` in place and return it.

        Image entries use bicubic interpolation, all other entries use
        nearest-neighbour. ``curr_img`` and ``curr_label`` are treated as
        sequences and resized item by item.
        """
        factor = np.random.uniform(self.min_scale, self.max_scale)

        # Align short edge
        if self.short_edge is not None:
            height, width = sample['prev_img'].shape[:2]
            shorter = width if height > width else height
            factor *= float(self.short_edge) / shorter

        def scaled(array, interp):
            return cv2.resize(array,
                              None,
                              fx=factor,
                              fy=factor,
                              interpolation=interp)

        for key in sample.keys():
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('prev_img', 'curr_img', 'ref_img'):
                interp = cv2.INTER_CUBIC
            else:
                interp = cv2.INTER_NEAREST
            if key in ('curr_img', 'curr_label'):
                sample[key] = [scaled(frame, interp) for frame in value]
            else:
                sample[key] = scaled(value, interp)
        return sample
class RandomScaleV2(object):
    """Resize a sample by random, independent horizontal/vertical factors.

    A target size is drawn by ``get_params`` from a random area fraction
    and a random aspect ratio; the sample is then stretched by
    ``target_w / w`` horizontally and ``target_h / h`` vertically.

    Args:
        min_scale (float): Lower bound of the edge scale; the area
            fraction is drawn from ``[min_scale**2, max_scale**2]``.
        max_scale (float): Upper bound of the edge scale.
        short_edge (int or None): If given, both factors are additionally
            multiplied by ``short_edge / min(height, width)`` of
            ``sample['prev_img']``.
        ratio (list): ``[low, high]`` bounds of the width/height aspect
            ratio, sampled log-uniformly.
    """

    def __init__(self,
                 min_scale=0.36,
                 max_scale=1.0,
                 short_edge=None,
                 ratio=[3. / 4., 4. / 3.]):
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge
        self.ratio = ratio

    def __call__(self, sample):
        """Scale every non-meta entry of ``sample`` in place and return it.

        Image entries use bicubic interpolation, all other entries use
        nearest-neighbour. ``curr_img`` and ``curr_label`` are treated as
        sequences and resized item by item.
        """
        height, width = sample['prev_img'].shape[:2]
        target_h, target_w = self.get_params(height, width)
        factor_x = float(target_w) / width
        factor_y = float(target_h) / height

        # Align short edge
        if self.short_edge is not None:
            shorter = width if height > width else height
            factor_x *= float(self.short_edge) / shorter
            factor_y *= float(self.short_edge) / shorter

        def scaled(array, interp):
            return cv2.resize(array,
                              None,
                              fx=factor_x,
                              fy=factor_y,
                              interpolation=interp)

        for key in sample.keys():
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('prev_img', 'curr_img', 'ref_img'):
                interp = cv2.INTER_CUBIC
            else:
                interp = cv2.INTER_NEAREST
            if key in ('curr_img', 'curr_label'):
                sample[key] = [scaled(frame, interp) for frame in value]
            else:
                sample[key] = scaled(value, interp)
        return sample

    def get_params(self, height, width):
        """Draw a target ``(h, w)`` that fits inside ``height`` x ``width``.

        Up to 10 random candidates are tried. If none fits, the result is
        the largest size within the allowed aspect-ratio range (or the
        whole image when its own ratio is already within range).
        """
        full_area = height * width
        log_bounds = list(map(np.log, self.ratio))

        for _attempt in range(10):
            area_fraction = np.random.uniform(self.min_scale**2,
                                              self.max_scale**2)
            target_area = full_area * area_fraction
            aspect_ratio = np.exp(
                np.random.uniform(log_bounds[0], log_bounds[1]))

            cand_w = int(round(np.sqrt(target_area * aspect_ratio)))
            cand_h = int(round(np.sqrt(target_area / aspect_ratio)))
            if 0 < cand_w <= width and 0 < cand_h <= height:
                return cand_h, cand_w

        # Fallback to central crop
        in_ratio = float(width) / float(height)
        if in_ratio < min(self.ratio):
            return int(round(width / min(self.ratio))), width
        if in_ratio > max(self.ratio):
            return height, int(round(height * max(self.ratio)))
        # whole image
        return height, width
class RestrictSize(object):
    """Resize a sample according to a short-edge or a long-edge limit.

    At most one of the two limits may be active (the other must be None).
    The resulting height and width are always reduced to the form
    ``4 * k + 1``.

    Args:
        max_short_edge (number or None): Short-edge reference length. When
            set, the sample is scaled only if its short edge is *smaller*
            than this value, so that the short edge reaches it.
        max_long_edge (number or None): Long-edge limit. When set (and
            ``max_short_edge`` is None), the sample is scaled down only if
            its long edge exceeds this value.
    """

    def __init__(self, max_short_edge=None, max_long_edge=800 * 1.3):
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        assert ((max_short_edge is None)) or ((max_long_edge is None))

    def __call__(self, sample):
        """Resize every non-meta entry of ``sample`` in place and return it.

        The current size is read from ``sample['ref_img']``. Entries whose
        key contains ``'label'`` use nearest-neighbour interpolation, all
        others bicubic. List-valued entries are resized item by item. The
        sample is returned untouched when the target size equals the
        current size.
        """
        h, w = sample['ref_img'].shape[:2]

        sc = None
        if self.max_short_edge is not None:
            # Align short edge
            short_edge = w if h > w else h
            # NOTE(review): this scales *up* when the short edge is below
            # the limit, whereas MultiRestrictSize scales down when it is
            # above — confirm which direction is intended.
            if short_edge < self.max_short_edge:
                sc = float(self.max_short_edge) / short_edge
        elif self.max_long_edge is not None:
            # The explicit None check matters: the constructor accepts both
            # limits being None, and comparing an int with None raises.
            long_edge = h if h > w else w
            if long_edge > self.max_long_edge:
                sc = float(self.max_long_edge) / long_edge

        if sc is None:
            new_h, new_w = h, w
        else:
            new_h, new_w = int(sc * h), int(sc * w)

        # Shrink each dimension to the nearest value of the form 4 * k + 1.
        new_h = new_h - (new_h - 1) % 4
        new_w = new_w - (new_w - 1) % 4

        if new_h == h and new_w == w:
            return sample

        dsize = (new_w, new_h)  # cv2 expects (width, height)
        for elem in sample.keys():
            if 'meta' in elem:
                continue
            if 'label' in elem:
                flagval = cv2.INTER_NEAREST
            else:
                flagval = cv2.INTER_CUBIC
            value = sample[elem]
            if isinstance(value, list):
                # Sibling transforms store 'curr_img' / 'curr_label' as
                # lists of frames; resize them frame by frame instead of
                # handing the list itself to cv2.resize.
                sample[elem] = [
                    cv2.resize(item, dsize=dsize, interpolation=flagval)
                    for item in value
                ]
            else:
                sample[elem] = cv2.resize(value,
                                          dsize=dsize,
                                          interpolation=flagval)
        return sample
class RandomHorizontalFlip(object):
    """Mirror the whole sample left-right with probability ``prob``.

    Args:
        prob (float): Probability of flipping; one draw decides for all
            entries of the sample.
    """

    def __init__(self, prob):
        self.p = prob

    def __call__(self, sample):
        """Flip every non-meta entry in place (or none) and return it.

        ``curr_img`` and ``curr_label`` are treated as sequences and
        flipped item by item.
        """
        if not (random.random() < self.p):
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            if key in ('curr_img', 'curr_label'):
                sample[key] = [
                    cv2.flip(frame, flipCode=1) for frame in sample[key]
                ]
            else:
                sample[key] = cv2.flip(sample[key], flipCode=1)
        return sample
class RandomVerticalFlip(object):
    """Mirror the whole sample top-bottom with probability ``prob``.

    Args:
        prob (float): Probability of flipping; one draw decides for all
            entries of the sample.
    """

    def __init__(self, prob=0.3):
        self.p = prob

    def __call__(self, sample):
        """Flip every non-meta entry in place (or none) and return it.

        ``curr_img`` and ``curr_label`` are treated as sequences and
        flipped item by item.
        """
        if not (random.random() < self.p):
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            if key in ('curr_img', 'curr_label'):
                sample[key] = [
                    cv2.flip(frame, flipCode=0) for frame in sample[key]
                ]
            else:
                sample[key] = cv2.flip(sample[key], flipCode=0)
        return sample
class RandomGaussianBlur(object):
    """Randomly apply ``IT.GaussianBlur`` to the image entries of a sample.

    The augmentation object is stored in ``self.aug``; subclasses reuse
    ``__call__`` / ``apply_augmentation`` with a different ``self.aug``.

    Args:
        prob (float): Probability passed to ``TF.RandomApply``.
        sigma (list): Passed unchanged to ``IT.GaussianBlur``.
    """

    def __init__(self, prob=0.3, sigma=[0.1, 2.]):
        blur = IT.GaussianBlur(sigma)
        self.aug = TF.RandomApply([blur], p=prob)

    def __call__(self, sample):
        """Augment every entry that is neither meta nor label, in place.

        ``curr_img`` is treated as a sequence; ``self.aug`` is invoked
        separately for each of its items and for each other entry.
        """
        for key in sample.keys():
            if 'meta' in key or 'label' in key:
                continue
            if key == 'curr_img':
                sample[key] = [
                    self.apply_augmentation(frame) for frame in sample[key]
                ]
            else:
                sample[key] = self.apply_augmentation(sample[key])
        return sample

    def apply_augmentation(self, x):
        """Run ``self.aug`` on array ``x`` via a PIL round trip.

        ``x`` is cast to uint8 for the PIL conversion and the result is
        returned as a float32 array.
        """
        pil_image = Image.fromarray(np.uint8(x))
        augmented = self.aug(pil_image)
        return np.array(augmented, dtype=np.float32)
class RandomGrayScale(RandomGaussianBlur):
    """Variant of ``RandomGaussianBlur`` whose augmentation is grayscale.

    Only ``self.aug`` differs; calling behaviour is inherited.

    Args:
        prob (float): Probability passed to ``TF.RandomGrayscale``.
    """

    def __init__(self, prob=0.2):
        # The parent constructor is intentionally not invoked: it would
        # only assign a different `self.aug`.
        grayscale = TF.RandomGrayscale(p=prob)
        self.aug = grayscale
class RandomColorJitter(RandomGaussianBlur):
    """Variant of ``RandomGaussianBlur`` whose augmentation is color jitter.

    Only ``self.aug`` differs; calling behaviour is inherited.

    Args:
        prob (float): Probability passed to ``TF.RandomApply``.
        brightness, contrast, saturation, hue: Passed positionally, in
            this order, to ``TF.ColorJitter``.
    """

    def __init__(self,
                 prob=0.8,
                 brightness=0.4,
                 contrast=0.4,
                 saturation=0.2,
                 hue=0.1):
        # The parent constructor is intentionally not invoked: it would
        # only assign a different `self.aug`.
        jitter = TF.ColorJitter(brightness, contrast, saturation, hue)
        self.aug = TF.RandomApply([jitter], p=prob)
class SubtractMeanImage(object):
    """Subtract a fixed mean from every entry whose key contains 'image'.

    Args:
        mean: Value(s) converted to a float32 array and subtracted
            (broadcast against each image entry).
        change_channels (bool): If True, the last axis of each image entry
            is reordered ``[2, 1, 0]`` before the subtraction.
    """

    def __init__(self, mean, change_channels=False):
        self.mean = mean
        self.change_channels = change_channels

    def __call__(self, sample):
        """Process matching entries of ``sample`` in place and return it."""
        for key in sample.keys():
            if 'image' not in key:
                continue
            if self.change_channels:
                # Reverse the channel order along the last axis.
                sample[key] = sample[key][:, :, [2, 1, 0]]
            mean_array = np.array(self.mean, dtype=np.float32)
            sample[key] = np.subtract(sample[key], mean_array)
        return sample

    def __str__(self):
        return 'SubtractMeanImage' + str(self.mean)
class ToTensor(object):
    """Turn the ndarrays of a sample into channel-first torch tensors.

    2-D arrays become ``(1, H, W)`` int tensors. Other arrays are divided
    by 255, normalised with the per-channel mean ``(0.485, 0.456, 0.406)``
    and std ``(0.229, 0.224, 0.225)`` and transposed from ``(H, W, C)`` to
    ``(C, H, W)``.
    """

    @staticmethod
    def _convert(array):
        # Convert one ndarray following the rules in the class docstring.
        if array.ndim == 2:
            with_channel = array[:, :, np.newaxis]
            return torch.from_numpy(with_channel.transpose((2, 0, 1))).int()
        # The division allocates a new array, so the in-place steps below
        # never touch the caller's data and keep the division's dtype.
        normalised = array / 255.
        normalised -= (0.485, 0.456, 0.406)
        normalised /= (0.229, 0.224, 0.225)
        return torch.from_numpy(normalised.transpose((2, 0, 1)))

    def __call__(self, sample):
        """Convert every non-meta entry of ``sample`` in place.

        ``curr_img`` and ``curr_label`` are treated as sequences and
        converted item by item into lists of tensors.
        """
        for key in sample.keys():
            if 'meta' in key:
                continue
            value = sample[key]
            if key in ('curr_img', 'curr_label'):
                sample[key] = [self._convert(item) for item in value]
            else:
                sample[key] = self._convert(value)
        return sample
class MultiRestrictSize(object):
    """Build a list of size-restricted (and optionally flipped) samples.

    For every factor in ``multi_scale`` one sample is produced; with
    ``flip=True`` each is followed by its left-right mirrored copy.

    Args:
        max_short_edge (number or None): If set, the image is first scaled
            down so that its short edge does not exceed this value.
        max_long_edge (number or None): If set, the image is then scaled
            down so that its long edge does not exceed this value.
        flip (bool): Also emit a horizontally flipped copy per scale.
        multi_scale (list): Scale factors applied after the restrictions.
        align_corners (bool): If True, sizes are rounded to
            ``k * max_stride + 1``; otherwise to ``k * max_stride``.
        max_stride (int): Stride the output size is aligned to.
    """

    def __init__(self,
                 max_short_edge=None,
                 max_long_edge=800,
                 flip=False,
                 multi_scale=[1.3],
                 align_corners=True,
                 max_stride=16):
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        self.multi_scale = multi_scale
        self.flip = flip
        self.align_corners = align_corners
        self.max_stride = max_stride

    def _align(self, length):
        # Round `length` to the nearest stride-aligned size.
        offset = 1 if self.align_corners else 0
        if (length - offset) % self.max_stride != 0:
            length = int(
                np.around((length - offset) / self.max_stride) *
                self.max_stride + offset)
        return length

    def _target_size(self, h, w, scale):
        # Restricted, scaled and stride-aligned (height, width) for `scale`.
        # restrict short edge
        sc = 1.
        if self.max_short_edge is not None:
            short_edge = w if h > w else h
            if short_edge > self.max_short_edge:
                sc *= float(self.max_short_edge) / short_edge
        new_h, new_w = sc * h, sc * w

        # restrict long edge
        sc = 1.
        if self.max_long_edge is not None:
            long_edge = new_h if new_h > new_w else new_w
            if long_edge > self.max_long_edge:
                sc *= float(self.max_long_edge) / long_edge
        new_h, new_w = sc * new_h, sc * new_w

        new_h = int(new_h * scale)
        new_w = int(new_w * scale)
        return self._align(new_h), self._align(new_w)

    def __call__(self, sample):
        """Return the list of transformed copies of ``sample``.

        The reference size is read from ``sample['current_img']``. When a
        target size equals the original size, ``sample`` itself is
        appended. Otherwise a new dict is built in which meta and label
        entries are passed through unchanged (same objects) and all other
        entries are resized with bicubic interpolation. Flipped copies get
        a shallow copy of each meta entry with ``'flip'`` set to True.
        Entries whose value is None are passed through as None.
        """
        samples = []
        h, w = sample['current_img'].shape[:2]

        for scale in self.multi_scale:
            new_h, new_w = self._target_size(h, w, scale)

            if new_h == h and new_w == w:
                samples.append(sample)
            else:
                new_sample = {}
                for elem in sample.keys():
                    value = sample[elem]
                    # Meta, labels and missing (None) entries are kept
                    # as they are; only image data is resized.
                    if 'meta' in elem or 'label' in elem or value is None:
                        new_sample[elem] = value
                        continue
                    new_sample[elem] = cv2.resize(
                        value,
                        dsize=(new_w, new_h),
                        interpolation=cv2.INTER_CUBIC)
                samples.append(new_sample)

            if self.flip:
                now_sample = samples[-1]
                new_sample = {}
                for elem in now_sample.keys():
                    value = now_sample[elem]
                    if 'meta' in elem:
                        # Copy so the un-flipped sample's meta is not
                        # marked as flipped.
                        new_sample[elem] = value.copy()
                        new_sample[elem]['flip'] = True
                    elif value is None:
                        # MultiToTensor tolerates None entries, so do not
                        # crash on them here either.
                        new_sample[elem] = None
                    else:
                        new_sample[elem] = value[:, ::-1].copy()
                samples.append(new_sample)

        return samples
class MultiToTensor(object):
    """Turn the ndarrays of a list of samples into channel-first tensors.

    2-D arrays become ``(1, H, W)`` int tensors. Other arrays are divided
    by 255, normalised with the per-channel mean ``(0.485, 0.456, 0.406)``
    and std ``(0.229, 0.224, 0.225)`` and transposed from ``(H, W, C)`` to
    ``(C, H, W)``. Meta entries and None values are left untouched.
    """

    def __call__(self, samples):
        """Convert every sample of ``samples`` in place and return the list."""
        for sample in samples:
            for key in sample.keys():
                if 'meta' in key:
                    continue
                array = sample[key]
                if array is None:
                    continue

                if array.ndim == 2:
                    with_channel = array[:, :, np.newaxis]
                    tensor = torch.from_numpy(
                        with_channel.transpose((2, 0, 1))).int()
                else:
                    # The division allocates a new array, so the in-place
                    # steps never modify the input data.
                    normalised = array / 255.
                    normalised -= (0.485, 0.456, 0.406)
                    normalised /= (0.229, 0.224, 0.225)
                    tensor = torch.from_numpy(
                        normalised.transpose((2, 0, 1)))
                sample[key] = tensor
        return samples