|
import os |
|
from typing import Optional, Tuple |
|
import pandas as pd |
|
from skimage import io |
|
|
|
import numpy as np |
|
|
|
import torch |
|
from pytorch_lightning import LightningDataModule |
|
from torch.utils.data import DataLoader, Dataset, random_split |
|
from torchvision.transforms import transforms |
|
|
|
|
|
class FocusDataSet(Dataset):
    """Dataset of z-stacked images of neglected tropical diseases.

    Every sample pairs one image with the focus value listed for it in the
    annotation csv file.
    """

    def __init__(self, csv_file, root_dir, transform=None, in_memory=True):
        """Initialize the focus stack dataset.

        Args:
            csv_file (string): Path to the csv file with annotations. The
                columns "image_path" and "focus_value" are looked up by name.
            root_dir (string): Directory with all the images; the values of
                "image_path" are joined onto it.
            transform (callable, optional): Optional transform to be applied
                on the image of a sample.
            in_memory (bool): If True, read every image listed in the csv
                file right away and keep them in a single numpy array. If
                False, read an image from disk each time it is requested.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.in_memory = in_memory

        self.metadata = pd.read_csv(csv_file)
        columns = self.metadata.columns
        # Positional column indices, used with ``iloc`` in ``__getitem__``.
        self.col_index_path = columns.get_loc("image_path")
        self.col_index_focus = columns.get_loc("focus_value")

        if self.in_memory:
            relative_paths = self.metadata["image_path"].tolist()
            self.images = np.array(
                [self._load_img(relative_path) for relative_path in relative_paths]
            )
        else:
            self.images = []

    def _load_img(self, img_path):
        """Read the image stored at ``img_path`` relative to ``root_dir``."""
        return io.imread(os.path.join(self.root_dir, img_path))

    def __len__(self) -> int:
        """Get the length of the dataset.

        Returns:
            int: the number of rows in the annotation csv file
        """
        return self.metadata.shape[0]

    def __getitem__(self, idx):
        """Get one item from the dataset.

        Args:
            idx (int): The index of the sample that is to be retrieved.
                A tensor index is converted with ``tolist()`` first.

        Returns:
            A dictionary with the keys "image" (the image, after the optional
            transform) and "focus_value" (a float tensor).
        """
        index = idx.tolist() if torch.is_tensor(idx) else idx

        if self.in_memory:
            picture = self.images[index]
        else:
            relative_path = self.metadata.iloc[index, self.col_index_path]
            picture = self._load_img(relative_path)

        if self.transform:
            picture = self.transform(picture)

        raw_focus = self.metadata.iloc[index, self.col_index_focus]
        focus_value = torch.from_numpy(np.asarray(raw_focus)).float()

        return {"image": picture, "focus_value": focus_value}
|
|
|
|
|
class FocusDataModule(LightningDataModule):
    """LightningDataModule for the FocusStack dataset.

    Builds one ``FocusDataSet`` from the annotation csv file, splits it once
    into train/validation/test subsets using a fixed seed, and provides a
    ``DataLoader`` for each subset.
    """

    def __init__(
        self,
        data_dir: str = "data/",
        csv_file: str = "data/metadata.csv",
        train_val_test_split_percentage: Tuple[float, float, float] = (
            0.75,
            0.15,
            0.15,
        ),
        batch_size: int = 64,
        num_workers: int = 0,
        pin_memory: bool = False,
        in_memory: bool = True,
    ):
        """Store the configuration; no data is read here.

        Args:
            data_dir: Directory with all the images.
            csv_file: Path to the csv file with annotations.
            train_val_test_split_percentage: Fractions of the dataset used
                for training, validation and testing. Only the first two
                entries are read: the test split always receives whatever
                remains after the train and validation splits, so the third
                entry is informational.
            batch_size: Batch size used by every dataloader.
            num_workers: Number of worker processes used by every dataloader.
            pin_memory: Forwarded to every dataloader.
            in_memory: Forwarded to ``FocusDataSet``.
        """
        super().__init__()

        # The init arguments are read back through ``self.hparams`` below.
        self.save_hyperparameters(logger=False)

        self.transforms = transforms.Compose(
            [transforms.ToTensor(), transforms.ConvertImageDtype(torch.float)]
        )

        self.data_train: Optional[Dataset] = None
        self.data_val: Optional[Dataset] = None
        self.data_test: Optional[Dataset] = None
        self.in_memory = in_memory

    def prepare_data(self):
        """This method is not implemented as of yet.

        Download data if needed. This method is called only from a single GPU.
        Do not use it to assign state (self.x = y).
        """
        pass

    def setup(self, stage: Optional[str] = None):
        """Load data. Set variables: `self.data_train`, `self.data_val`, `self.data_test`.

        This method is called by lightning twice for `trainer.fit()` and
        `trainer.test()`, so the split is only created on the first call.
        The `stage` can be used to differentiate whether it's called before
        `trainer.fit()` or `trainer.test()`; it is not used here.

        Raises:
            ValueError: If the train and validation fractions together ask
                for more samples than the dataset contains.
        """
        # Compare against None instead of relying on truthiness: an empty
        # split has length 0 and would otherwise look like "not set up yet".
        already_split = (
            self.data_train is not None
            or self.data_val is not None
            or self.data_test is not None
        )
        if already_split:
            return

        dataset = FocusDataSet(
            self.hparams.csv_file,
            self.hparams.data_dir,
            transform=self.transforms,
            in_memory=self.in_memory,
        )

        fractions = self.hparams.train_val_test_split_percentage
        total = len(dataset)
        train_length = int(total * fractions[0])
        val_length = int(total * fractions[1])
        # The test split takes the remainder so the three lengths always
        # add up to the dataset size.
        test_length = total - train_length - val_length
        if test_length < 0:
            raise ValueError(
                "train and validation fractions exceed the dataset size: "
                f"{fractions[0]} + {fractions[1]} > 1"
            )

        # The order of ``lengths`` must match the order of the targets
        # (train, val, test).
        self.data_train, self.data_val, self.data_test = random_split(
            dataset=dataset,
            lengths=(train_length, val_length, test_length),
            generator=torch.Generator().manual_seed(42),
        )

    def _make_dataloader(self, dataset, shuffle: bool) -> DataLoader:
        """Build a DataLoader for ``dataset`` with the configured settings."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Return the shuffled dataloader over the training split."""
        return self._make_dataloader(self.data_train, shuffle=True)

    def val_dataloader(self):
        """Return the unshuffled dataloader over the validation split."""
        return self._make_dataloader(self.data_val, shuffle=False)

    def test_dataloader(self):
        """Return the unshuffled dataloader over the test split."""
        return self._make_dataloader(self.data_test, shuffle=False)
|
|