import torch
import numpy as np

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class PGDAttacker():
    def __init__(self, radius, steps, step_size, random_start, norm_type, ascending=True):
        self.radius = radius            # attack radius (perturbation budget)
        self.steps = steps              # number of PGD steps
        self.step_size = step_size      # step size of each PGD update
        self.random_start = random_start
        self.norm_type = norm_type      # which norm bounds the noise ('l-infty', 'l2' or 'l1')
        self.ascending = ascending      # perform gradient ascent, i.e. maximize the loss

    def output(self, x, model, tokens_lens, text_token):
        x = x + model.positional_embedding.type(model.dtype)
        x = x.permute(1, 0, 2)  # NLD -> LND
        x, weight = model.transformer(x)
        x = x.permute(1, 0, 2)  # LND -> NLD
        x = model.ln_final(x).type(model.dtype)
        x = x[torch.arange(x.shape[0]), text_token.argmax(dim=-1)] @ model.text_projection

        # Collect, per sample, the last-layer attention row at the EOT position,
        # cropped to the valid token span (dropping the SOT and EOT columns).
        attention_weights_all = []
        for i in range(len(tokens_lens)):
            attention_weights = weight[-1][i][min(76, tokens_lens[i])][:1 + min(75, max(tokens_lens))][1:][:-1]
            attention_weights_all.append(attention_weights)
        attention_weights = torch.stack(attention_weights_all, dim=0)
        return x, attention_weights

    def perturb(self, device, m_tokens_len, bs, criterion, x, y, a_indices, encoder,
                tokens_lens=None, model=None, text_token=None):
        if self.steps == 0 or self.radius == 0:
            return x.clone()

        adv_x = x.clone()
        if self.random_start:
            if self.norm_type == 'l-infty':
                adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius
            else:
                adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius / self.steps
            self._clip_(adv_x, x)

        ''' temporarily disable autograd of the model to improve PGD efficiency '''
        # adv_x, attention_weights = self.output(adv_x, model, tokens_lens, text_token)
        # model.eval()
        encoder.eval()
        for pp in encoder.parameters():
            pp.requires_grad = False

        for step in range(self.steps):
            adv_x_o = adv_x.clone()
            adv_x.requires_grad_()
            _y = encoder(a_indices, adv_x)
            loss = criterion(y.to(device), _y, m_tokens_len, bs)
            grad = torch.autograd.grad(loss, [adv_x])[0]

            with torch.no_grad():
                if not self.ascending:
                    grad.mul_(-1)

                if self.norm_type == 'l-infty':
                    adv_x.add_(torch.sign(grad), alpha=self.step_size)
                else:
                    if self.norm_type == 'l2':
                        grad_norm = (grad.reshape(grad.shape[0], -1) ** 2).sum(dim=1).sqrt()
                    elif self.norm_type == 'l1':
                        grad_norm = grad.reshape(grad.shape[0], -1).abs().sum(dim=1)
                    grad_norm = grad_norm.reshape(-1, *([1] * (len(x.shape) - 1)))
                    scaled_grad = grad / (grad_norm + 1e-10)
                    adv_x.add_(scaled_grad, alpha=self.step_size)

                self._clip_(adv_x, adv_x_o)

        ''' re-enable autograd of the model after PGD '''
        # decoder.train()
        for pp in encoder.parameters():
            pp.requires_grad = True

        return adv_x  # , attention_weights

    def perturb_random(self, criterion, x, data, decoder, y, target_model, encoder=None):
        if self.steps == 0 or self.radius == 0:
            return x.clone()

        adv_x = x.clone()
        if self.norm_type == 'l-infty':
            adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius
        else:
            adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius / self.steps
        self._clip_(adv_x, x)
        return adv_x.data

    def perturb_iat(self, criterion, x, data, decoder, y, target_model, encoder=None):
        if self.steps == 0 or self.radius == 0:
            return x.clone()

        device = x.device  # keep attack tensors on the same device as the input
        B, L, H = x.shape
        nb_num = 8  # number of random neighbour positions sampled per token
        alpha = torch.rand(B, L, nb_num, 1).to(device)
        A_1 = x.unsqueeze(2).expand(B, L, nb_num, H)
        A_2 = x.unsqueeze(1).expand(B, L, L, H)

        # For every position, sample nb_num distinct neighbour positions.
        rand_idx = []
        for i in range(L):
            rand_idx.append(np.random.choice(L, nb_num, replace=False))
        rand_idx = np.array(rand_idx)
        # A_2 = A_2[:, np.arange(0, L), rand_idx, :]
        # Gather the sampled neighbours along the token dimension so that
        # A_2[b, i, k, :] == x[b, rand_idx[i, k], :] (the earlier reshape-based
        # gather mixed the token and feature axes).
        rand_idx = torch.tensor(rand_idx).long().reshape(1, L, nb_num, 1).expand(B, L, nb_num, H).to(device)
        A_2 = torch.gather(A_2, 2, rand_idx)  # (B, L, nb_num, H)
        A_e = A_1 - A_2  # difference vectors to the sampled neighbours
        # adv_x = (A_e * alpha).sum(dim=-1) + x.clone()

        adv_x = x.clone()
        if self.random_start:
            if self.norm_type == 'l-infty':
                adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius
            else:
                adv_x += 2 * (torch.rand_like(x) - 0.5) * self.radius / self.steps
            self._clip_(adv_x, x)
        # assert adv_x.shape[0] == 8

        ''' temporarily disable autograd of the model to improve PGD efficiency '''
        # model.eval()
        decoder.eval()
        for pp in decoder.parameters():
            pp.requires_grad = False

        adv_x = x.clone()
        for step in range(self.steps):
            alpha.requires_grad_()
            dot_Ae_alpha = (A_e * alpha).sum(dim=-2)
            # print("dot_Ae_alpha:", dot_Ae_alpha.shape)
            adv_x.add_(torch.sign(dot_Ae_alpha), alpha=self.step_size)
            self._clip_(adv_x, x)
            if encoder is None:
                adv_x_input = adv_x.squeeze(-1)
            else:
                adv_x_input = adv_x
            _y = target_model(adv_x_input, data, decoder, encoder)
            loss = criterion(y.to(device), _y)
            grad = torch.autograd.grad(loss, [alpha], retain_graph=True)[0]

            with torch.no_grad():
                if not self.ascending:
                    grad.mul_(-1)
                assert self.norm_type == 'l-infty'
                alpha = alpha.detach() + grad * 0.01

        ''' re-enable autograd of the model after PGD '''
        # decoder.train()
        for pp in decoder.parameters():
            pp.requires_grad = True

        return adv_x.data

    def _clip_(self, adv_x, x):
        adv_x -= x
        if self.norm_type == 'l-infty':
            adv_x.clamp_(-self.radius, self.radius)
        else:
            if self.norm_type == 'l2':
                norm = (adv_x.reshape(adv_x.shape[0], -1) ** 2).sum(dim=1).sqrt()
            elif self.norm_type == 'l1':
                norm = adv_x.reshape(adv_x.shape[0], -1).abs().sum(dim=1)
            norm = norm.reshape(-1, *([1] * (len(x.shape) - 1)))
            adv_x /= (norm + 1e-10)
            adv_x *= norm.clamp(max=self.radius)
        adv_x += x
        adv_x.clamp_(0, 1)
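

# ----------------------------------------------------------------------------
# Minimal smoke test (illustrative only). DummyEncoder and dummy_criterion are
# placeholders assumed for demonstration; the real project passes its own
# encoder and loss to `perturb`. This sketch only shows the expected call
# signature and prints the magnitude of the resulting perturbation.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    class DummyEncoder(torch.nn.Module):
        def __init__(self, dim=16):
            super().__init__()
            self.proj = torch.nn.Linear(dim, dim)

        def forward(self, a_indices, x):
            # the real encoder consumes `a_indices`; the dummy ignores it
            return self.proj(x)

    def dummy_criterion(y, y_hat, m_tokens_len, bs):
        # stand-in for the project's loss; the extra arguments are ignored here
        return ((y - y_hat) ** 2).mean()

    bs, seq_len, dim = 4, 8, 16
    clean_x = torch.rand(bs, seq_len, dim, device=run_device)
    target_y = torch.rand(bs, seq_len, dim)
    dummy_encoder = DummyEncoder(dim).to(run_device)

    attacker = PGDAttacker(radius=0.1, steps=5, step_size=0.02,
                           random_start=True, norm_type='l-infty', ascending=True)
    adv_x = attacker.perturb(run_device, m_tokens_len=seq_len, bs=bs,
                             criterion=dummy_criterion, x=clean_x, y=target_y,
                             a_indices=None, encoder=dummy_encoder)
    print('max |adv_x - x|:', (adv_x - clean_x).abs().max().item())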