import os

import numpy as np
import torch

from .ab_dataset import ABDataset


class _SplitDataset(torch.utils.data.Dataset):
    """Wraps a dataset and exposes only the subset of indices in `keys`. Used by split_dataset."""

    def __init__(self, underlying_dataset, keys):
        super(_SplitDataset, self).__init__()
        self.underlying_dataset = underlying_dataset
        self.keys = keys

    def __getitem__(self, key):
        # Map the requested index through the stored keys into the underlying dataset.
        return self.underlying_dataset[self.keys[key]]

    def __len__(self):
        return len(self.keys)


def split_dataset(dataset, n, seed=0, transform=None):
    """
    Return a pair of datasets corresponding to a random split of the given
    dataset, with n datapoints in the first dataset and the rest in the second,
    using the given random seed.
    """
    # Object detection datasets are split at the COCO annotation level instead.
    if isinstance(dataset, ABDataset):
        if dataset.task_type == 'Object Detection':
            return split_dataset_det(dataset, n, seed)
        if dataset.task_type == 'MM Object Detection':
            return split_dataset_det_mm(dataset, n, seed, transform=transform)

    assert n <= len(dataset), f'{n}_{len(dataset)}'

    # Cache the shuffled index split in the home directory so that repeated calls
    # with the same (n, seed, dataset size) reuse exactly the same split.
    cache_p = f'{n}_{seed}_{len(dataset)}'
    cache_p = os.path.join(os.path.expanduser(
        '~'), '.domain_benchmark_split_dataset_cache_' + str(cache_p))
    if os.path.exists(cache_p):
        keys_1, keys_2 = torch.load(cache_p)
    else:
        keys = list(range(len(dataset)))
        np.random.RandomState(seed).shuffle(keys)
        keys_1 = keys[:n]
        keys_2 = keys[n:]
        torch.save((keys_1, keys_2), cache_p)

    return _SplitDataset(dataset, keys_1), _SplitDataset(dataset, keys_2)
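
# Example usage of split_dataset (illustrative sketch only; `my_dataset` stands
# for any map-style torch.utils.data.Dataset and is not defined in this module):
#
#   first_part, rest_part = split_dataset(my_dataset, n=1000, seed=0)
#   assert len(first_part) == 1000
#   assert len(rest_part) == len(my_dataset) - 1000
#   # Re-running with the same n, seed and dataset size reuses the cached index split.
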


def train_val_split(dataset, split):
    assert split in ['train', 'val']
    # 80% of the data goes to the train split, the remaining 20% to the val split.
    if split == 'train':
        return split_dataset(dataset, int(len(dataset) * 0.8))[0]
    else:
        return split_dataset(dataset, int(len(dataset) * 0.8))[1]


def train_val_test_split(dataset, split):
    assert split in ['train', 'val', 'test']

    # Hold out 20% as the test set, then split the remainder 80/20 into train/val.
    train_set, test_set = split_dataset(dataset, int(len(dataset) * 0.8))
    train_set, val_set = split_dataset(train_set, int(len(train_set) * 0.8))

    return {'train': train_set, 'val': val_set, 'test': test_set}[split]
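
# Example usage (illustrative sketch; `my_dataset` is a hypothetical dataset,
# not defined in this module):
#
#   train_set = train_val_test_split(my_dataset, 'train')   # ~64% of the data
#   val_set = train_val_test_split(my_dataset, 'val')       # ~16% of the data
#   test_set = train_val_test_split(my_dataset, 'test')     # ~20% of the data
#   # Because split_dataset uses a fixed seed and caches its index split, the
#   # three calls partition the dataset consistently.
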


def split_dataset_det(dataset: ABDataset, n, seed=0):
    # Split an object detection dataset by splitting its COCO annotation file.
    # Note: `seed` is currently unused here, and the second element of the
    # returned pair is None rather than the complementary subset.
    coco_ann_json_path = dataset.ann_json_file_path_for_split
    from .object_detection.yolox_data_util.api import coco_split, get_default_yolox_coco_dataset
    split_coco_ann_json_path = coco_split(coco_ann_json_path, ratio=n / len(dataset))[0]

    return get_default_yolox_coco_dataset(dataset.root_dir, split_coco_ann_json_path, train=dataset.split == 'train'), None


def split_dataset_det_mm(dataset: ABDataset, n, seed=0, transform=None):
    # Multi-modal variant of split_dataset_det; passes the transform and class
    # names through. As above, `seed` is currently unused and the second
    # returned element is None.
    coco_ann_json_path = dataset.ann_json_file_path_for_split
    from .object_detection.yolox_data_util.api import coco_split, get_yolox_coco_dataset_with_caption
    split_coco_ann_json_path = coco_split(coco_ann_json_path, ratio=n / len(dataset))[0]

    return get_yolox_coco_dataset_with_caption(dataset.root_dir, split_coco_ann_json_path, transform=transform, train=dataset.split == 'train', classes=dataset.classes), None
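
# Example usage for detection datasets (illustrative sketch; `det_dataset` is a
# hypothetical ABDataset whose task_type is 'Object Detection'):
#
#   subset, _ = split_dataset(det_dataset, n=500)
#
# split_dataset dispatches to split_dataset_det / split_dataset_det_mm in this
# case; the second element of the returned pair is None rather than the
# complementary subset.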