|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import numpy as np |
|
from torch.utils.data import DataLoader |
|
from .NormalDataset import NormalDataset |
|
|
|
|
|
import pytorch_lightning as pl |
|
|
|
|
|
class NormalModule(pl.LightningDataModule):
    """Lightning data module that serves ``NormalDataset`` splits.

    The module reads the following attributes from ``cfg``:

    * ``overfit`` -- when truthy, the batch size is forced to 1, training
      data is not shuffled, and validation iterates over the training split.
    * ``batch_size`` -- batch size used for train/val when not overfitting.
    * ``num_threads`` -- number of DataLoader worker processes.

    ``cfg`` is also forwarded unchanged to every ``NormalDataset``.
    """

    def __init__(self, cfg):
        """Store the configuration and derive the effective batch size.

        Args:
            cfg: Configuration object exposing ``overfit``, ``batch_size``
                and ``num_threads``.
        """
        super().__init__()
        self.cfg = cfg
        self.overfit = self.cfg.overfit

        # Overfit mode always uses single-sample batches.
        self.batch_size = 1 if self.overfit else self.cfg.batch_size

        # Filled in by ``setup`` with the train/val dataset lengths.
        self.data_size = {}

    def prepare_data(self):
        """Do nothing; this module has no download/preparation step."""

    @staticmethod
    def worker_init_fn(worker_id):
        """Re-seed NumPy's global RNG inside a DataLoader worker.

        The new seed is the first word of the current NumPy RNG state plus
        ``worker_id``, so each worker gets a different seed.

        Args:
            worker_id: Integer worker index supplied by the DataLoader.
        """
        # Convert to a Python int first so the addition cannot overflow a
        # fixed-width NumPy integer, then wrap into the range accepted by
        # ``np.random.seed`` (0 .. 2**32 - 1).
        base_seed = int(np.random.get_state()[1][0])
        np.random.seed((base_seed + worker_id) % 2**32)

    def setup(self, stage=None):
        """Instantiate the datasets needed for ``stage``.

        Args:
            stage: ``'fit'`` builds the train and val datasets and records
                their sizes in ``self.data_size``; ``'test'`` builds the
                test dataset; ``None`` builds all of them.
        """
        if stage == 'fit' or stage is None:
            self.train_dataset = NormalDataset(cfg=self.cfg, split="train")
            self.val_dataset = NormalDataset(cfg=self.cfg, split="val")
            self.data_size = {
                'train': len(self.train_dataset),
                'val': len(self.val_dataset),
            }

        if stage == 'test' or stage is None:
            self.test_dataset = NormalDataset(cfg=self.cfg, split="test")

    def train_dataloader(self):
        """Return the training DataLoader.

        Shuffling is disabled in overfit mode. Workers are re-seeded through
        ``worker_init_fn``.
        """
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=not self.overfit,
            num_workers=self.cfg.num_threads,
            pin_memory=True,
            worker_init_fn=self.worker_init_fn,
        )

    def val_dataloader(self):
        """Return the validation DataLoader.

        In overfit mode the training dataset is used instead of the
        validation dataset. The loader never shuffles.
        """
        current_dataset = self.train_dataset if self.overfit else self.val_dataset

        return DataLoader(
            current_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.cfg.num_threads,
            pin_memory=True,
        )

    def test_dataloader(self):
        """Return the test DataLoader (batch size 1, no shuffling)."""
        return DataLoader(
            self.test_dataset,
            batch_size=1,
            shuffle=False,
            num_workers=self.cfg.num_threads,
            pin_memory=True,
        )
|
|