# Ultralytics YOLO 🚀, AGPL-3.0 license import os import random from pathlib import Path import numpy as np import torch from PIL import Image from torch.utils.data import dataloader, distributed from ultralytics.data.dataset import GroundingDataset, YOLODataset, YOLOMultiModalDataset from ultralytics.data.loaders import ( LOADERS, LoadImagesAndVideos, LoadPilAndNumpy, LoadScreenshots, LoadStreams, LoadTensor, SourceTypes, autocast_list, ) from ultralytics.data.utils import IMG_FORMATS, PIN_MEMORY, VID_FORMATS from ultralytics.utils import RANK, colorstr from ultralytics.utils.checks import check_file class InfiniteDataLoader(dataloader.DataLoader): """ Dataloader that reuses workers. Uses same syntax as vanilla DataLoader. """ def __init__(self, *args, **kwargs): """Dataloader that infinitely recycles workers, inherits from DataLoader.""" super().__init__(*args, **kwargs) object.__setattr__(self, "batch_sampler", _RepeatSampler(self.batch_sampler)) self.iterator = super().__iter__() def __len__(self): """Returns the length of the batch sampler's sampler.""" return len(self.batch_sampler.sampler) def __iter__(self): """Creates a sampler that repeats indefinitely.""" for _ in range(len(self)): yield next(self.iterator) def reset(self): """ Reset iterator. This is useful when we want to modify settings of dataset while training. """ self.iterator = self._get_iterator() class _RepeatSampler: """ Sampler that repeats forever. Args: sampler (Dataset.sampler): The sampler to repeat. """ def __init__(self, sampler): """Initializes an object that repeats a given sampler indefinitely.""" self.sampler = sampler def __iter__(self): """Iterates over the 'sampler' and yields its contents.""" while True: yield from iter(self.sampler) def seed_worker(worker_id): # noqa """Set dataloader worker seed https://pytorch.org/docs/stable/notes/randomness.html#dataloader.""" worker_seed = torch.initial_seed() % 2**32 np.random.seed(worker_seed) random.seed(worker_seed) def build_yolo_dataset(cfg, img_path, batch, data, mode="train", rect=False, stride=32, multi_modal=False): """Build YOLO Dataset.""" dataset = YOLOMultiModalDataset if multi_modal else YOLODataset return dataset( img_path=img_path, imgsz=cfg.imgsz, batch_size=batch, augment=mode == "train", # augmentation hyp=cfg, # TODO: probably add a get_hyps_from_cfg function rect=cfg.rect or rect, # rectangular batches cache=cfg.cache or None, single_cls=cfg.single_cls or False, stride=int(stride), pad=0.0 if mode == "train" else 0.5, prefix=colorstr(f"{mode}: "), task=cfg.task, classes=cfg.classes, data=data, fraction=cfg.fraction if mode == "train" else 1.0, ) def build_grounding(cfg, img_path, json_file, batch, mode="train", rect=False, stride=32): """Build YOLO Dataset.""" return GroundingDataset( img_path=img_path, json_file=json_file, imgsz=cfg.imgsz, batch_size=batch, augment=mode == "train", # augmentation hyp=cfg, # TODO: probably add a get_hyps_from_cfg function rect=cfg.rect or rect, # rectangular batches cache=cfg.cache or None, single_cls=cfg.single_cls or False, stride=int(stride), pad=0.0 if mode == "train" else 0.5, prefix=colorstr(f"{mode}: "), task=cfg.task, classes=cfg.classes, fraction=cfg.fraction if mode == "train" else 1.0, ) def build_dataloader(dataset, batch, workers, shuffle=True, rank=-1): """Return an InfiniteDataLoader or DataLoader for training or validation set.""" batch = min(batch, len(dataset)) nd = torch.cuda.device_count() # number of CUDA devices nw = min(os.cpu_count() // max(nd, 1), workers) # number of workers sampler = None if rank == -1 else distributed.DistributedSampler(dataset, shuffle=shuffle) generator = torch.Generator() generator.manual_seed(6148914691236517205 + RANK) return InfiniteDataLoader( dataset=dataset, batch_size=batch, shuffle=shuffle and sampler is None, num_workers=nw, sampler=sampler, pin_memory=PIN_MEMORY, collate_fn=getattr(dataset, "collate_fn", None), worker_init_fn=seed_worker, generator=generator, ) def check_source(source): """Check source type and return corresponding flag values.""" webcam, screenshot, from_img, in_memory, tensor = False, False, False, False, False if isinstance(source, (str, int, Path)): # int for local usb camera source = str(source) is_file = Path(source).suffix[1:] in (IMG_FORMATS | VID_FORMATS) is_url = source.lower().startswith(("https://", "http://", "rtsp://", "rtmp://", "tcp://")) webcam = source.isnumeric() or source.endswith(".streams") or (is_url and not is_file) screenshot = source.lower() == "screen" if is_url and is_file: source = check_file(source) # download elif isinstance(source, LOADERS): in_memory = True elif isinstance(source, (list, tuple)): source = autocast_list(source) # convert all list elements to PIL or np arrays from_img = True elif isinstance(source, (Image.Image, np.ndarray)): from_img = True elif isinstance(source, torch.Tensor): tensor = True else: raise TypeError("Unsupported image type. For supported types see https://docs.ultralytics.com/modes/predict") return source, webcam, screenshot, from_img, in_memory, tensor def load_inference_source(source=None, batch=1, vid_stride=1, buffer=False): """ Loads an inference source for object detection and applies necessary transformations. Args: source (str, Path, Tensor, PIL.Image, np.ndarray): The input source for inference. batch (int, optional): Batch size for dataloaders. Default is 1. vid_stride (int, optional): The frame interval for video sources. Default is 1. buffer (bool, optional): Determined whether stream frames will be buffered. Default is False. Returns: dataset (Dataset): A dataset object for the specified input source. """ source, stream, screenshot, from_img, in_memory, tensor = check_source(source) source_type = source.source_type if in_memory else SourceTypes(stream, screenshot, from_img, tensor) # Dataloader if tensor: dataset = LoadTensor(source) elif in_memory: dataset = source elif stream: dataset = LoadStreams(source, vid_stride=vid_stride, buffer=buffer) elif screenshot: dataset = LoadScreenshots(source) elif from_img: dataset = LoadPilAndNumpy(source) else: dataset = LoadImagesAndVideos(source, batch=batch, vid_stride=vid_stride) # Attach source types to the dataset setattr(dataset, "source_type", source_type) return dataset