# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Dict, Optional, Tuple

import torch
import torch.nn.functional as F
from fairseq import utils
from fairseq.incremental_decoding_utils import with_incremental_state
from fairseq.modules.fairseq_dropout import FairseqDropout
from torch import Tensor, nn


try:
    from fairseq.model_parallel.megatron.mpu import (
        get_cuda_rng_tracker,
        get_model_parallel_world_size,
        ColumnParallelLinear,
        RowParallelLinear,
    )

    has_megatron_submodule = True
except (ImportError, ModuleNotFoundError):
    has_megatron_submodule = False


@with_incremental_state
class ModelParallelMultiheadAttention(nn.Module):
    """Model parallel Multi-headed attention.
    This performs the Multi-headed attention over multiple gpus.

    See "Megatron-LM: https://arxiv.org/pdf/1909.08053.pdf" for more details.
    """

    def __init__(
        self,
        embed_dim,
        num_heads,
        kdim=None,
        vdim=None,
        dropout=0.0,
        bias=True,
        self_attention=False,
        encoder_decoder_attention=False,
    ):
        """Build the parallel projections for this rank's share of the heads.

        Args:
            embed_dim: total model dimension; must be divisible by
                ``num_heads``.
            num_heads: total number of attention heads; must be divisible by
                the model parallel world size.
            kdim: input dimension of the key projection (default:
                ``embed_dim``).
            vdim: input dimension of the value projection (default:
                ``embed_dim``).
            dropout: probability handed to ``FairseqDropout`` for the
                attention weights.
            bias: whether the four projections carry a bias term.
            self_attention: if True, keys and values are projected from
                ``query`` in :meth:`forward`.
            encoder_decoder_attention: if True, keys and values are both
                projected from ``key`` in :meth:`forward`.

        Raises:
            ImportError: if the megatron submodule could not be imported.
        """
        super().__init__()
        if not has_megatron_submodule:
            raise ImportError(
                "\n\nPlease install the megatron submodule:"
                "\n\n git submodule update --init "
                "fairseq/model_parallel/megatron"
            )
        self.embed_dim = embed_dim
        self.kdim = kdim if kdim is not None else embed_dim
        self.vdim = vdim if vdim is not None else embed_dim
        self.qkv_same_dim = self.kdim == embed_dim and self.vdim == embed_dim

        self.model_parallel_size = get_model_parallel_world_size()

        # Number of heads handled by this model parallel rank.
        self.num_heads_partition = num_heads // self.model_parallel_size
        assert (
            self.num_heads_partition * self.model_parallel_size == num_heads
        ), "Number of heads must be divisible by model parallel size"

        self.dropout_module = FairseqDropout(
            dropout, module_name=self.__class__.__name__
        )
        self.head_dim = embed_dim // num_heads
        assert (
            self.head_dim * num_heads == self.embed_dim
        ), "embed_dim must be divisible by num_heads"
        self.scaling = self.head_dim ** -0.5

        self.self_attention = self_attention
        self.encoder_decoder_attention = encoder_decoder_attention

        assert (
            not self.self_attention or self.qkv_same_dim
        ), "Self-attention requires query, key and value to be of the same size"

        self.k_proj = ColumnParallelLinear(
            self.kdim, embed_dim, bias=bias, gather_output=False
        )
        self.v_proj = ColumnParallelLinear(
            self.vdim, embed_dim, bias=bias, gather_output=False
        )
        self.q_proj = ColumnParallelLinear(
            embed_dim, embed_dim, bias=bias, gather_output=False
        )
        self.out_proj = RowParallelLinear(
            embed_dim, embed_dim, bias=bias, input_is_parallel=True
        )

    def forward(
        self,
        query,
        key: Optional[Tensor],
        value: Optional[Tensor],
        key_padding_mask: Optional[Tensor] = None,
        incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]] = None,
        static_kv: bool = False,
        attn_mask: Optional[Tensor] = None,
        **unused_kwargs,
    ) -> Tuple[Tensor, Optional[Tensor]]:
        """Input shape: Time x Batch x Channel

        Args:
            key_padding_mask (ByteTensor, optional): mask to exclude
                keys that are pads, of shape `(batch, src_len)`, where
                padding elements are indicated by 1s.
            incremental_state (dict, optional): if given, cached keys, values
                and key padding mask are read from it, extended with the
                current step and written back.
            static_kv (bool, optional): if True and keys are already cached,
                the cached keys/values are reused as-is instead of being
                recomputed (only valid for encoder-decoder attention).
            attn_mask (ByteTensor, optional): typically used to
                implement causal attention, where the mask prevents the
                attention from looking forward in time (default: None).
                It is unsqueezed at dim 0 and added to the attention weights.

        Returns:
            A tuple ``(attn, None)``: the result of ``out_proj`` and a
            ``None`` placeholder kept so the return type matches the single
            gpu multihead attention.
        """
        tgt_len, bsz, embed_dim = query.size()
        assert embed_dim == self.embed_dim
        assert list(query.size()) == [tgt_len, bsz, embed_dim]

        is_tpu = query.device.type == "xla"

        if incremental_state is not None:
            saved_state = self._get_input_buffer(incremental_state)
            if saved_state is not None and "prev_key" in saved_state:
                # previous time steps are cached - no need to recompute
                # key and value if they are static
                if static_kv:
                    assert self.encoder_decoder_attention and not self.self_attention
                    key = value = None
        else:
            saved_state = None

        if self.self_attention:
            q = self.q_proj(query)
            k = self.k_proj(query)
            v = self.v_proj(query)
        elif self.encoder_decoder_attention:
            # encoder-decoder attention
            q = self.q_proj(query)
            if key is None:
                assert value is None
                k = v = None
            else:
                k = self.k_proj(key)
                v = self.v_proj(key)
        else:
            assert key is not None and value is not None
            q = self.q_proj(query)
            k = self.k_proj(key)
            v = self.v_proj(value)
        q *= self.scaling

        # Fold this rank's heads into the batch dimension:
        # (len, bsz, ...) -> (bsz * num_heads_partition, len, head_dim)
        q = (
            q.contiguous()
            .view(tgt_len, bsz * self.num_heads_partition, self.head_dim)
            .transpose(0, 1)
        )
        if k is not None:
            k = (
                k.contiguous()
                .view(-1, bsz * self.num_heads_partition, self.head_dim)
                .transpose(0, 1)
            )
        if v is not None:
            v = (
                v.contiguous()
                .view(-1, bsz * self.num_heads_partition, self.head_dim)
                .transpose(0, 1)
            )

        if saved_state is not None:
            # saved states are stored with shape (bsz, num_heads_partition, seq_len, head_dim)
            if "prev_key" in saved_state:
                _prev_key = saved_state["prev_key"]
                assert _prev_key is not None
                prev_key = _prev_key.view(
                    bsz * self.num_heads_partition, -1, self.head_dim
                )
                if static_kv:
                    k = prev_key
                else:
                    assert k is not None
                    k = torch.cat([prev_key, k], dim=1)
            if "prev_value" in saved_state:
                _prev_value = saved_state["prev_value"]
                assert _prev_value is not None
                prev_value = _prev_value.view(
                    bsz * self.num_heads_partition, -1, self.head_dim
                )
                if static_kv:
                    v = prev_value
                else:
                    assert v is not None
                    v = torch.cat([prev_value, v], dim=1)
            prev_key_padding_mask: Optional[Tensor] = None
            if "prev_key_padding_mask" in saved_state:
                prev_key_padding_mask = saved_state["prev_key_padding_mask"]
            assert k is not None and v is not None
            key_padding_mask = (
                ModelParallelMultiheadAttention._append_prev_key_padding_mask(
                    key_padding_mask=key_padding_mask,
                    prev_key_padding_mask=prev_key_padding_mask,
                    batch_size=bsz,
                    src_len=k.size(1),
                    static_kv=static_kv,
                )
            )

            saved_state["prev_key"] = k.view(
                bsz, self.num_heads_partition, -1, self.head_dim
            )
            saved_state["prev_value"] = v.view(
                bsz, self.num_heads_partition, -1, self.head_dim
            )
            saved_state["prev_key_padding_mask"] = key_padding_mask
            # In this branch incremental_state is never None
            assert incremental_state is not None
            incremental_state = self._set_input_buffer(incremental_state, saved_state)
        assert k is not None
        src_len = k.size(1)

        # This is part of a workaround to get around fork/join parallelism
        # not supporting Optional types.
        if key_padding_mask is not None and key_padding_mask.dim() == 0:
            key_padding_mask = None

        if key_padding_mask is not None:
            assert key_padding_mask.size(0) == bsz
            assert key_padding_mask.size(1) == src_len

        attn_weights = torch.bmm(q, k.transpose(1, 2))

        assert list(attn_weights.size()) == [
            bsz * self.num_heads_partition,
            tgt_len,
            src_len,
        ]

        if attn_mask is not None:
            attn_mask = attn_mask.unsqueeze(0)
            attn_weights += attn_mask

        if key_padding_mask is not None:
            # don't attend to padding symbols
            attn_weights = attn_weights.view(
                bsz, self.num_heads_partition, tgt_len, src_len
            )
            # masked_fill needs a boolean mask; the mask coming out of the
            # incremental-state path is float, so convert in both branches.
            if not is_tpu:
                attn_weights = attn_weights.masked_fill(
                    key_padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool),
                    float("-inf"),
                )
            else:
                attn_weights = attn_weights.transpose(0, 2)
                attn_weights = attn_weights.masked_fill(
                    key_padding_mask.to(torch.bool), float("-inf")
                )
                attn_weights = attn_weights.transpose(0, 2)
            attn_weights = attn_weights.view(
                bsz * self.num_heads_partition, tgt_len, src_len
            )

        attn_weights_float = utils.softmax(attn_weights, dim=-1)
        attn_weights = attn_weights_float.type_as(attn_weights)

        # Dropout is applied inside the forked CUDA RNG tracker context.
        with get_cuda_rng_tracker().fork():
            attn_probs = self.dropout_module(attn_weights)

        assert v is not None
        attn = torch.bmm(attn_probs, v)
        assert list(attn.size()) == [
            bsz * self.num_heads_partition,
            tgt_len,
            self.head_dim,
        ]
        embed_dim_partition = embed_dim // self.model_parallel_size
        attn = attn.transpose(0, 1).contiguous().view(tgt_len, bsz, embed_dim_partition)
        attn = self.out_proj(attn)
        # return attn_weights None to keep the return type same as single gpu multihead attention
        # This will be deprecated.
        attn_weights: Optional[Tensor] = None

        return attn, attn_weights

    @staticmethod
    def _append_prev_key_padding_mask(
        key_padding_mask: Optional[Tensor],
        prev_key_padding_mask: Optional[Tensor],
        batch_size: int,
        src_len: int,
        static_kv: bool,
    ) -> Optional[Tensor]:
        """Combine the cached key padding mask with the current one.

        Args:
            key_padding_mask: padding mask for the current keys, or None.
            prev_key_padding_mask: padding mask cached from earlier steps,
                or None.
            batch_size: number of rows of the zero filler created when only
                one of the two masks is present.
            src_len: total key length the combined mask must cover.
            static_kv: if True and a cached mask exists, it is returned
                unchanged.

        Returns:
            The cached mask as-is when ``static_kv`` is set, a float mask
            padded with zeros up to ``src_len`` columns when only one mask is
            given, the float concatenation of both when both are given, or
            None when neither mask is available.
        """
        # saved key padding masks have shape (bsz, seq_len)
        if prev_key_padding_mask is not None and static_kv:
            new_key_padding_mask = prev_key_padding_mask
        elif prev_key_padding_mask is not None and key_padding_mask is not None:
            new_key_padding_mask = torch.cat(
                [prev_key_padding_mask.float(), key_padding_mask.float()], dim=1
            )
        # During incremental decoding, as the padding token enters and
        # leaves the frame, there will be a time when prev or current
        # is None
        elif prev_key_padding_mask is not None:
            # Allocate the filler directly on the mask's device so the
            # concatenation also works on non-default GPUs and XLA devices.
            filler = torch.zeros(
                (batch_size, src_len - prev_key_padding_mask.size(1)),
                device=prev_key_padding_mask.device,
            )
            new_key_padding_mask = torch.cat(
                [prev_key_padding_mask.float(), filler.float()], dim=1
            )
        elif key_padding_mask is not None:
            filler = torch.zeros(
                (batch_size, src_len - key_padding_mask.size(1)),
                device=key_padding_mask.device,
            )
            new_key_padding_mask = torch.cat(
                [filler.float(), key_padding_mask.float()], dim=1
            )
        else:
            new_key_padding_mask = prev_key_padding_mask
        return new_key_padding_mask

    def reorder_incremental_state(
        self, incremental_state: Dict[str, Dict[str, Optional[Tensor]]], new_order
    ):
        """Reorder buffered internal state (for incremental generation).

        Every non-None cached tensor is re-indexed along dim 0 with
        ``new_order`` and the buffer is stored back into
        ``incremental_state``, which is returned.
        """
        input_buffer = self._get_input_buffer(incremental_state)
        if input_buffer is not None:
            for k in input_buffer.keys():
                if input_buffer[k] is not None:
                    input_buffer[k] = input_buffer[k].index_select(0, new_order)
            incremental_state = self._set_input_buffer(incremental_state, input_buffer)
        return incremental_state

    def _get_input_buffer(
        self, incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]]
    ) -> Dict[str, Optional[Tensor]]:
        """Return the cached "attn_state" buffer, or an empty dict if unset."""
        result = self.get_incremental_state(incremental_state, "attn_state")
        if result is not None:
            return result
        else:
            empty_result: Dict[str, Optional[Tensor]] = {}
            return empty_result

    def _set_input_buffer(
        self,
        incremental_state: Dict[str, Dict[str, Optional[Tensor]]],
        buffer: Dict[str, Optional[Tensor]],
    ):
        """Store ``buffer`` under the "attn_state" key of ``incremental_state``."""
        return self.set_incremental_state(incremental_state, "attn_state", buffer)