import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F

try:
    import torch.distributed.nn
    from torch import distributed as dist

    has_distributed = True
except ImportError:
    has_distributed = False

try:
    import horovod.torch as hvd
except ImportError:
    hvd = None


def gather_features(
        image_features,
        text_features,
        local_loss=False,
        gather_with_grad=False,
        rank=0,
        world_size=1,
        use_horovod=False
):
    """Gather image and text features from every worker and concatenate them.

    Parameters
    ----------
    image_features, text_features:
        This worker's feature tensors; they are concatenated along dim 0.
    local_loss:
        When False and the gather itself is not differentiable, this worker's
        slot in the gathered list is replaced by the original tensors so that
        gradients still reach the local features.
    gather_with_grad:
        Use the differentiable all-gather (``hvd.allgather`` outside
        ``torch.no_grad`` or ``torch.distributed.nn.all_gather``).
    rank, world_size:
        Index of this worker and total number of workers.
    use_horovod:
        Gather through horovod instead of ``torch.distributed``.

    Returns
    -------
    tuple
        ``(all_image_features, all_text_features)``.

    Raises
    ------
    AssertionError
        If ``torch.distributed`` failed to import, or if ``use_horovod`` is
        set while horovod is not installed.
    """
    assert has_distributed, 'torch.distributed did not import correctly, please use a PyTorch version with support.'
    if use_horovod:
        assert hvd is not None, 'Please install horovod'
        if gather_with_grad:
            all_image_features = hvd.allgather(image_features)
            all_text_features = hvd.allgather(text_features)
        else:
            with torch.no_grad():
                all_image_features = hvd.allgather(image_features)
                all_text_features = hvd.allgather(text_features)
            if not local_loss:
                # ensure grads for local rank when all_* features don't have a gradient
                gathered_image_features = list(all_image_features.chunk(world_size, dim=0))
                gathered_text_features = list(all_text_features.chunk(world_size, dim=0))
                gathered_image_features[rank] = image_features
                gathered_text_features[rank] = text_features
                all_image_features = torch.cat(gathered_image_features, dim=0)
                all_text_features = torch.cat(gathered_text_features, dim=0)
    else:
        # We gather tensors from all gpus
        if gather_with_grad:
            all_image_features = torch.cat(torch.distributed.nn.all_gather(image_features), dim=0)
            all_text_features = torch.cat(torch.distributed.nn.all_gather(text_features), dim=0)
        else:
            gathered_image_features = [torch.zeros_like(image_features) for _ in range(world_size)]
            gathered_text_features = [torch.zeros_like(text_features) for _ in range(world_size)]
            dist.all_gather(gathered_image_features, image_features)
            dist.all_gather(gathered_text_features, text_features)
            if not local_loss:
                # ensure grads for local rank when all_* features don't have a gradient
                gathered_image_features[rank] = image_features
                gathered_text_features[rank] = text_features
            all_image_features = torch.cat(gathered_image_features, dim=0)
            all_text_features = torch.cat(gathered_text_features, dim=0)

    return all_image_features, all_text_features


class ClipLoss(nn.Module):
    """Symmetric image/text contrastive loss computed with KL divergence.

    The loss is the average of the image-to-text and text-to-image terms,
    each being ``F.kl_div`` between the log-softmax of the scaled similarity
    logits and a one-hot target, summed and divided by the number of rows.
    """

    def __init__(
            self,
            local_loss=False,
            gather_with_grad=False,
            cache_labels=False,
            rank=0,
            world_size=1,
            use_horovod=False,
    ):
        super().__init__()
        self.local_loss = local_loss
        self.gather_with_grad = gather_with_grad
        self.cache_labels = cache_labels
        self.rank = rank
        self.world_size = world_size
        self.use_horovod = use_horovod

        # cache state (stored but not read by forward)
        self.prev_num_logits = 0
        self.labels = {}

    def forward(self, image_features, text_features):
        """Return the scalar contrastive loss for a batch of paired features.

        Parameters
        ----------
        image_features, text_features:
            Feature matrices whose i-th rows form a matching pair.
            Assumes both are 2-D with the same number of rows -- TODO confirm
            against callers.
        """
        # NOTE(review): the scale is re-created as a fresh nn.Parameter on
        # every call, so it is never registered on the module nor updated by
        # an optimizer; it acts as the constant log(1 / 0.07).
        logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        device = image_features.device
        use_local = self.world_size > 1 and self.local_loss
        if self.world_size > 1:
            all_image_features, all_text_features = gather_features(
                image_features, text_features,
                self.local_loss, self.gather_with_grad, self.rank, self.world_size, self.use_horovod)

            if self.local_loss:
                logits_per_image = logit_scale * image_features @ all_text_features.T
                logits_per_text = logit_scale * text_features @ all_image_features.T
            else:
                logits_per_image = logit_scale * all_image_features @ all_text_features.T
                logits_per_text = logits_per_image.T
        else:
            logits_per_image = logit_scale * image_features @ text_features.T
            logits_per_text = logit_scale * text_features @ image_features.T

        # One-hot targets with the same shape as the logits. With a local
        # loss the logits are [local, global], so the matching column for
        # local row i sits at rank * local + i; a square identity would not
        # match that shape. For square logits this is exactly torch.eye.
        num_logits, num_candidates = logits_per_image.shape
        labels = torch.zeros(num_logits, num_candidates, device=device, dtype=torch.float)
        rows = torch.arange(num_logits, device=device)
        offset = num_logits * self.rank if use_local else 0
        labels[rows, rows + offset] = 1.0

        pred_1 = F.log_softmax(logits_per_image, dim=-1)
        pred_2 = F.log_softmax(logits_per_text, dim=-1)

        loss_a = F.kl_div(pred_1, labels, reduction='sum') / num_logits
        loss_b = F.kl_div(pred_2, labels, reduction='sum') / num_logits

        total_loss = (loss_a + loss_b) / 2
        return total_loss


def _one_sided_focal_weight(xs_pos, xs_neg, y, gamma_pos, gamma_neg, without_grad):
    """Return ``(1 - pt) ** gamma`` with per-element ``gamma`` chosen by ``y``.

    When ``without_grad`` is true the weight is computed with autograd
    disabled. The previous grad mode is restored on exit instead of being
    forced to enabled, so a caller running under ``torch.no_grad()`` is not
    switched back into grad mode.
    """
    with torch.set_grad_enabled(torch.is_grad_enabled() and not without_grad):
        pt0 = xs_pos * y
        pt1 = xs_neg * (1 - y)  # pt = p if t > 0 else 1-p
        pt = pt0 + pt1
        one_sided_gamma = gamma_pos * y + gamma_neg * (1 - y)
        return torch.pow(1 - pt, one_sided_gamma)


class AsymmetricLoss(nn.Module):
    """Asymmetric focal-style loss on sigmoid probabilities (multi-label)."""

    def __init__(self, gamma_neg=4, gamma_pos=1, clip=0.05, eps=1e-8, disable_torch_grad_focal_loss=True):
        super(AsymmetricLoss, self).__init__()

        self.gamma_neg = gamma_neg
        self.gamma_pos = gamma_pos
        self.clip = clip
        self.disable_torch_grad_focal_loss = disable_torch_grad_focal_loss
        self.eps = eps

    def forward(self, x, y, use_weight=False):
        """Compute the asymmetric loss.

        Parameters
        ----------
        x: input logits
        y: targets (multi-label binarized vector)
        use_weight: when true, return the element-wise (un-negated,
            un-reduced) weighted log-likelihood instead of the scalar loss.

        Returns
        -------
        The negated sum of the weighted log-likelihood, or the element-wise
        tensor when ``use_weight`` is true.
        """
        # Calculating Probabilities
        x_sigmoid = torch.sigmoid(x)
        xs_pos = x_sigmoid
        xs_neg = 1 - x_sigmoid

        # Asymmetric Clipping: shift the negative probability up, capped at 1
        if self.clip is not None and self.clip > 0:
            xs_neg = (xs_neg + self.clip).clamp(max=1)

        # Basic CE calculation (clamped so log never sees 0)
        los_pos = y * torch.log(xs_pos.clamp(min=self.eps))
        los_neg = (1 - y) * torch.log(xs_neg.clamp(min=self.eps))
        loss = los_pos + los_neg

        # Asymmetric Focusing
        if self.gamma_neg > 0 or self.gamma_pos > 0:
            one_sided_w = _one_sided_focal_weight(
                xs_pos, xs_neg, y, self.gamma_pos, self.gamma_neg,
                self.disable_torch_grad_focal_loss)
            loss = loss * one_sided_w

        if use_weight:
            return loss

        return -loss.sum()


class RalSingleLoss(nn.Module):
    '''
    This loss is intended for single-label classification problems
    '''

    def __init__(self, gamma_pos=0, gamma_neg=4, eps: float = 0.1, epsilon_pos_pow=-2.5, reduction='mean'):
        super(RalSingleLoss, self).__init__()

        # eps is used both as the clamp floor inside the logs and as the
        # label-smoothing factor in forward.
        self.eps = eps
        self.logsoftmax = nn.LogSoftmax(dim=-1)
        self.targets_classes = []
        self.gamma_pos = gamma_pos
        self.gamma_neg = gamma_neg
        self.reduction = reduction
        self.epsilon_pos = 1.0
        self.epsilon_neg = 0.0
        self.epsilon_pos_pow = epsilon_pos_pow
        self.lamb = 1.5

    def forward(self, inputs, target):
        '''
        "input" dimensions: - (batch_size,number_classes)
        "target" dimensions: - (batch_size)

        Returns the per-sample loss summed over classes, averaged over the
        batch when reduction == 'mean'. The (smoothed) one-hot targets are
        stored on self.targets_classes as a side effect.
        '''
        num_classes = inputs.size()[-1]
        log_preds = self.logsoftmax(inputs)
        self.targets_classes = torch.zeros_like(inputs).scatter_(1, target.long().unsqueeze(1), 1)

        # ASL weights
        targets = self.targets_classes
        anti_targets = 1 - targets
        xs_pos = torch.exp(log_preds)
        xs_neg = 1 - xs_pos
        pos_clamped = xs_pos.clamp(min=self.eps)
        xs_pos = torch.exp(log_preds) * (
            torch.log(pos_clamped)
            + self.epsilon_pos * (1 - pos_clamped)
            + self.epsilon_pos_pow * 0.5 * torch.pow(1 - pos_clamped, 2)
        ) * torch.log(xs_pos)
        # Uses the re-weighted xs_pos from the line above, as in the original.
        neg_clamped = xs_neg.clamp(min=self.eps)
        xs_neg = (1 - xs_pos) * (
            torch.log(neg_clamped) + self.epsilon_neg * neg_clamped
        ) * -(self.lamb - xs_neg) * xs_neg ** 2
        asymmetric_w = torch.pow(1 - xs_pos - xs_neg,
                                 self.gamma_pos * targets + self.gamma_neg * anti_targets)
        log_preds = log_preds * asymmetric_w

        if self.eps > 0:  # label smoothing
            self.targets_classes = self.targets_classes.mul(1 - self.eps).add(self.eps / num_classes)

        # loss calculation
        loss = - self.targets_classes.mul(log_preds)

        loss = loss.sum(dim=-1)
        if self.reduction == 'mean':
            loss = loss.mean()

        return loss


class Ralloss(nn.Module):
    """Multi-label loss built from polynomial terms on sigmoid probabilities
    combined with asymmetric focusing weights."""

    def __init__(self, gamma_neg=4, gamma_pos=0, clip=0.05, eps=1e-8, lamb=1.5, epsilon_neg=0.0, epsilon_pos=1.0,
                 epsilon_pos_pow=-2.5, disable_torch_grad_focal_loss=False):
        super(Ralloss, self).__init__()

        self.gamma_neg = gamma_neg
        self.gamma_pos = gamma_pos
        self.clip = clip
        self.disable_torch_grad_focal_loss = disable_torch_grad_focal_loss
        self.eps = eps

        # parameters of Taylor expansion polynomials
        self.epsilon_pos = epsilon_pos
        self.epsilon_neg = epsilon_neg
        self.epsilon_pos_pow = epsilon_pos_pow
        self.margin = 1.0
        self.lamb = lamb

    def forward(self, x, y, use_weight=False):
        """Compute the loss.

        x: input logits with size (batch_size, number of labels).
        y: binarized multi-label targets with size (batch_size, number of labels).
        use_weight: when true, return the element-wise (un-negated,
            un-reduced) weighted term instead of the scalar loss.

        Returns the negated sum over all elements, or the element-wise tensor
        when ``use_weight`` is true.
        """
        # Calculating Probabilities
        x_sigmoid = torch.sigmoid(x)
        xs_pos = x_sigmoid
        xs_neg = 1 - x_sigmoid

        # Asymmetric Clipping: shift the negative probability up, capped at 1
        if self.clip is not None and self.clip > 0:
            xs_neg = (xs_neg + self.clip).clamp(max=1)

        # Basic Taylor expansion polynomials
        pos_clamped = xs_pos.clamp(min=self.eps)
        neg_clamped = xs_neg.clamp(min=self.eps)
        los_pos = y * (
            torch.log(pos_clamped)
            + self.epsilon_pos * (1 - pos_clamped)
            + self.epsilon_pos_pow * 0.5 * torch.pow(1 - pos_clamped, 2)
        )
        los_neg = (1 - y) * (
            torch.log(neg_clamped) + self.epsilon_neg * neg_clamped
        ) * (self.lamb - x_sigmoid) * x_sigmoid ** 2 * (self.lamb - xs_neg)
        loss = los_pos + los_neg

        # Asymmetric Focusing
        if self.gamma_neg > 0 or self.gamma_pos > 0:
            one_sided_w = _one_sided_focal_weight(
                xs_pos, xs_neg, y, self.gamma_pos, self.gamma_neg,
                self.disable_torch_grad_focal_loss)
            loss = loss * one_sided_w

        if use_weight:
            return loss

        return -loss.sum()