from .data_augment import TrainTransform, ValTransform
from .datasets.coco import COCODataset
from .datasets.mm_coco import MM_COCODataset
from .datasets.mosaicdetection import MosaicDetection
from utils.common.others import HiddenPrints
import os
import json
from utils.common.log import logger
from .norm_categories_index import ensure_index_start_from_1_and_successive


def get_default_yolox_coco_dataset(data_dir, json_file_path, img_size=416, train=True):
    logger.info(f'[get yolox dataset] "{json_file_path}"')
    if train:
        with HiddenPrints():
            dataset = COCODataset(
                data_dir=data_dir,
                json_file=json_file_path,
                name='',
                img_size=(img_size, img_size),
                preproc=TrainTransform(
                    max_labels=50,
                    flip_prob=0.5,
                    hsv_prob=1.0),
                cache=False,
            )
        # wrap with YOLOX-style mosaic/mixup augmentation for training
        dataset = MosaicDetection(
            dataset,
            mosaic=True,
            img_size=(img_size, img_size),
            preproc=TrainTransform(
                max_labels=120,
                flip_prob=0.5,
                hsv_prob=1.0),
            degrees=10.0,
            translate=0.1,
            mosaic_scale=(0.1, 2),
            mixup_scale=(0.5, 1.5),
            shear=2.0,
            enable_mixup=True,
            mosaic_prob=1.0,
            mixup_prob=1.0,
            only_return_xy=True
        )
    else:
        with HiddenPrints():
            dataset = COCODataset(
                data_dir=data_dir,
                json_file=json_file_path,
                name='',
                img_size=(img_size, img_size),
                preproc=ValTransform(legacy=False),
            )
    return dataset
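
# Usage sketch (hypothetical paths, assuming a standard COCO layout):
#   train_set = get_default_yolox_coco_dataset(
#       'datasets/coco', 'datasets/coco/annotations/instances_train2017.json',
#       img_size=416, train=True)
#   val_set = get_default_yolox_coco_dataset(
#       'datasets/coco', 'datasets/coco/annotations/instances_val2017.json',
#       img_size=416, train=False)
#   img, target = train_set[0][:2]  # with only_return_xy=True, each item starts with (img, target)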


def get_yolox_coco_dataset_with_caption(data_dir, json_file_path, img_size=416, transform=None, train=True, classes=None):
    logger.info(f'[get yolox dataset] "{json_file_path}"')
    if train:
        with HiddenPrints():
            dataset = COCODataset(
                data_dir=data_dir,
                json_file=json_file_path,
                name='',
                img_size=(img_size, img_size),
                preproc=TrainTransform(
                    max_labels=50,
                    flip_prob=0.5,
                    hsv_prob=1.0),
                cache=False,
            )
        # keep a handle on the underlying pycocotools COCO object before wrapping
        coco = dataset.coco
        dataset = MosaicDetection(
            dataset,
            mosaic=True,
            img_size=(img_size, img_size),
            preproc=TrainTransform(
                max_labels=120,
                flip_prob=0.5,
                hsv_prob=1.0),
            degrees=10.0,
            translate=0.1,
            mosaic_scale=(0.1, 2),
            mixup_scale=(0.5, 1.5),
            shear=2.0,
            enable_mixup=True,
            mosaic_prob=1.0,
            mixup_prob=1.0,
            only_return_xy=True
        )
        dataset = MM_COCODataset(dataset, transform=transform, split='train', coco=coco, classes=classes)
    else:
        with HiddenPrints():
            dataset = COCODataset(
                data_dir=data_dir,
                json_file=json_file_path,
                name='',
                img_size=(img_size, img_size),
                preproc=ValTransform(legacy=False),
            )
        dataset = MM_COCODataset(dataset, transform=transform, split='val', coco=dataset.coco, classes=classes)
    return dataset
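
# Usage sketch (hypothetical arguments): identical to the function above, except the
# result is wrapped by MM_COCODataset, which receives `transform` and `classes`:
#   mm_train = get_yolox_coco_dataset_with_caption(
#       'datasets/coco', 'datasets/coco/annotations/instances_train2017.json',
#       img_size=416, transform=None, train=True, classes=['person', 'car'])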


import hashlib


def _hash(o):
    # canonicalize common containers so the digest is insensitive to element order
    if isinstance(o, list):
        o = sorted(o)
    elif isinstance(o, dict):
        o = {k: o[k] for k in sorted(o)}
    elif isinstance(o, set):
        o = sorted(list(o))
    obj = hashlib.md5()
    obj.update(str(o).encode('utf-8'))
    return obj.hexdigest()
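
# Sanity sketch: because containers are canonicalized first,
#   _hash([2, 1]) == _hash([1, 2])
#   _hash({'b': 1, 'a': 2}) == _hash({'a': 2, 'b': 1})
# Unlike the built-in hash(), the md5 hexdigest is stable across Python processes,
# which is what makes it safe for deriving cache-file names below.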


DEBUG = True  # when True, cached remapped/split annotation files are always regenerated


def remap_dataset(json_file_path, ignore_classes, category_idx_map):
    # keys and values of category_idx_map are 0-based positional indices of categories,
    # not the COCO 'id' field of each category
    ignore_classes = sorted(list(ignore_classes))
    if len(ignore_classes) == 0 and category_idx_map is None:
        return json_file_path

    # derive a stable cache-file suffix from the remapping arguments; _hash (md5) is used
    # instead of the built-in hash(), whose string hashing is salted per process
    hash_str = _hash(f'yolox_dataset_cache_{_hash(ignore_classes)}_{_hash(category_idx_map)}')
    cached_json_file_path = f'{json_file_path}.{hash_str}'
    if os.path.exists(cached_json_file_path):
        if DEBUG:
            os.remove(cached_json_file_path)
        else:
            logger.info(f'get cached dataset in {cached_json_file_path}')
            return cached_json_file_path

    with open(json_file_path, 'r') as f:
        raw_ann = json.load(f)

    # drop ignored categories, their annotations, and images left without any annotation
    id_to_idx_map = {c['id']: i for i, c in enumerate(raw_ann['categories'])}
    ignore_classes_id = [c['id'] for c in raw_ann['categories'] if c['name'] in ignore_classes]
    raw_ann['categories'] = [c for c in raw_ann['categories'] if c['id'] not in ignore_classes_id]
    raw_ann['annotations'] = [ann for ann in raw_ann['annotations'] if ann['category_id'] not in ignore_classes_id]
    kept_image_ids = {ann['image_id'] for ann in raw_ann['annotations']}
    raw_ann['images'] = [img for img in raw_ann['images'] if img['id'] in kept_image_ids]

    # rebuild "categories" at the target size (read from the _ZQL_NUMC env var), placing each
    # kept category at its remapped index; untouched slots remain dummy placeholders
    new_categories = [{"id": i, "name": f"dummy-{i}"} for i in range(int(os.environ['_ZQL_NUMC']))]
    for c in raw_ann['categories']:
        new_idx = category_idx_map[id_to_idx_map[c['id']]]
        new_categories[new_idx] = c
        c['id'] = new_idx
    raw_ann['categories'] = new_categories
    for ann in raw_ann['annotations']:
        ann['category_id'] = category_idx_map[id_to_idx_map[ann['category_id']]]
        if 'segmentation' in ann:
            del ann['segmentation']  # masks are not needed for detection; keeps the file small

    with open(cached_json_file_path, 'w') as f:
        json.dump(raw_ann, f)
    return cached_json_file_path
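
# Worked example (hypothetical values): with original categories
# [person(id=1), dog(id=3), cat(id=7)], ignore_classes={'dog'} and
# category_idx_map={0: 2, 2: 0}, the cached file keeps person and cat only:
# cat (positional index 2) is written with id 0, person (index 0) with id 2,
# and every annotation's category_id is rewritten accordingly.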


def coco_split(ann_json_file_path, ratio=0.8):
    split1_p = ann_json_file_path + f'.{ratio}.split1'
    split2_p = ann_json_file_path + f'.{ratio}.split2'
    if os.path.exists(split1_p) and not DEBUG:
        return split1_p, split2_p

    with open(ann_json_file_path, 'r') as f:
        raw_ann = json.load(f)

    import copy
    import random
    import torch
    res_ann1, res_ann2 = copy.deepcopy(raw_ann), copy.deepcopy(raw_ann)
    images = raw_ann['images']

    # shuffle the image order and cache it; flip the toggle to reuse a previously
    # cached order (e.g. to reproduce an earlier split) instead of reshuffling
    cache_images_path = ann_json_file_path + '.tmp-cached-shuffled-images'
    reuse_cached_shuffle = False
    if not reuse_cached_shuffle:
        random.shuffle(images)
        torch.save(images, cache_images_path)
    else:
        images = torch.load(cache_images_path)

    # split the images by ratio and keep only the annotations belonging to each side
    images1, images2 = images[0: int(len(images) * ratio)], images[int(len(images) * ratio):]
    images1_id = {i['id'] for i in images1}
    images2_id = {i['id'] for i in images2}
    res_ann1['images'] = images1
    res_ann1['annotations'] = [ann for ann in raw_ann['annotations'] if ann['image_id'] in images1_id]
    res_ann2['images'] = images2
    res_ann2['annotations'] = [ann for ann in raw_ann['annotations'] if ann['image_id'] in images2_id]

    from utils.common.data_record import write_json
    write_json(split1_p, res_ann1, indent=0, backup=False)
    write_json(split2_p, res_ann2, indent=0, backup=False)
    return split1_p, split2_p
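
# Usage sketch (hypothetical path): with the default ratio=0.8,
#   train_p, test_p = coco_split('annotations/instances_train.json')
# writes 'instances_train.json.0.8.split1' (80% of the images) and '....split2'
# (the rest), each being a self-contained COCO annotation file.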


def coco_train_val_test_split(ann_json_file_path, split):
    train_ann_p, test_ann_p = coco_split(ann_json_file_path)
    if split == 'test':
        return test_ann_p
    train_ann_p, val_ann_p = coco_split(train_ann_p)
    return train_ann_p if split == 'train' else val_ann_p


def coco_train_val_split(train_ann_p, split):
    train_ann_p, val_ann_p = coco_split(train_ann_p)
    return train_ann_p if split == 'train' else val_ann_p
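
# Note: coco_train_val_test_split applies coco_split twice with the default ratio,
# so the effective proportions are train 64% / val 16% / test 20% of the images.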


def visualize_coco_dataset(dataset, num_images, res_save_p, cxcy):
    import random

    import matplotlib.pyplot as plt
    import numpy as np
    from PIL import Image, ImageDraw
    from torchvision.transforms import ToTensor
    from torchvision.utils import make_grid

    def draw_bbox(img, bbox, label):
        img = Image.fromarray(img)
        draw = ImageDraw.Draw(img)
        draw.rectangle(bbox, outline=(255, 0, 0), width=6)
        draw.text((bbox[0], bbox[1]), label)
        return np.array(img)

    # unwrap to the underlying COCODataset to access class ids and names
    d = dataset.dataset
    if d.__class__.__name__ == 'MosaicDetection':
        d = d._dataset
    class_ids = d.class_ids  # label index -> COCO category_id

    def get_cname(label):
        return d.coco.loadCats(class_ids[int(label)])[0]['name']

    def cxcywh2xyxy(bbox):
        cx, cy, w, h = bbox
        x1, y1 = cx - w / 2, cy - h / 2
        x2, y2 = cx + w / 2, cy + h / 2
        return x1, y1, x2, y2

    xs = []
    for image_i in range(num_images):
        x, y = dataset[random.randint(0, len(dataset) - 1)][:2]
        x = np.uint8(x.transpose(1, 2, 0))  # CHW -> HWC so PIL can draw on it
        for label_i, label_info in enumerate(y):
            if sum(label_info[1:]) == 0:  # all-zero rows are padding labels
                break
            label, bbox = label_info[0], label_info[1:]
            if cxcy:
                bbox = cxcywh2xyxy(bbox)
            x = draw_bbox(x, bbox, str(label) + '-' + get_cname(label))
        xs.append(x)

    # assemble the annotated images into a grid and save it
    xs = [ToTensor()(x) for x in xs]
    grid = make_grid(xs, normalize=True, nrow=2)
    plt.axis('off')
    plt.imshow(grid.permute(1, 2, 0).numpy())
    plt.savefig(res_save_p, dpi=300)
    plt.clf()
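
# Usage sketch (hypothetical arguments), assuming `dataset` is the MM_COCODataset
# wrapper built above (this helper unwraps `.dataset` and then `._dataset`):
#   ds = get_yolox_coco_dataset_with_caption(
#       'datasets/coco', 'datasets/coco/annotations/instances_train2017.json', train=True)
#   visualize_coco_dataset(ds, num_images=4, res_save_p='vis.png', cxcy=True)
# cxcy=True reads targets as (label, cx, cy, w, h), matching the YOLOX-style
# TrainTransform used above; pass cxcy=False for (x1, y1, x2, y2) boxes.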