import math
from typing import List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from fairseq.models.wav2vec.wav2vec2 import TransformerSentenceEncoderLayer
from fairseq.modules import LayerNorm, SamePad
from fairseq.modules.transformer_sentence_encoder import init_bert_params
from fairseq.utils import index_put

from .configuration import AVHubertConfig


class TransformerEncoder(nn.Module):
    """Stack of transformer layers preceded by a convolutional positional term.

    The input is summed with the output of a grouped, weight-normalised 1-D
    convolution (followed by ``SamePad`` and GELU), passed through dropout,
    and then fed through ``config.encoder_layers`` instances of
    ``TransformerSentenceEncoderLayer``. Whole layers may be skipped at
    training time according to ``config.encoder_layerdrop``.
    """

    def __init__(self, config: AVHubertConfig) -> None:
        """Build the positional convolution, the layer stack and the layer norm.

        Args:
            config: Model configuration. The fields read here are ``dropout``,
                ``encoder_embed_dim``, ``conv_pos``, ``conv_pos_groups``,
                ``encoder_ffn_embed_dim``, ``encoder_attention_heads``,
                ``attention_dropout``, ``activation_dropout``,
                ``activation_fn``, ``layer_norm_first``, ``encoder_layers``
                and ``encoder_layerdrop``.
        """
        super().__init__()
        self.dropout = config.dropout
        self.embedding_dim = config.encoder_embed_dim

        kernel = config.conv_pos
        conv = nn.Conv1d(
            self.embedding_dim,
            self.embedding_dim,
            kernel_size=kernel,
            padding=kernel // 2,
            groups=config.conv_pos_groups,
        )
        # Weights are drawn with a std that shrinks with kernel size and
        # channel count; the bias starts at zero.
        init_std = math.sqrt(4.0 / (kernel * self.embedding_dim))
        nn.init.normal_(conv.weight, mean=0, std=init_std)
        nn.init.constant_(conv.bias, 0)
        # Weight norm is attached only after the raw weight has been initialised.
        conv = nn.utils.weight_norm(conv, name="weight", dim=2)
        self.pos_conv = nn.Sequential(conv, SamePad(kernel), nn.GELU())

        # Every layer is built from the same hyper-parameters.
        layer_kwargs = dict(
            embedding_dim=self.embedding_dim,
            ffn_embedding_dim=config.encoder_ffn_embed_dim,
            num_attention_heads=config.encoder_attention_heads,
            dropout=self.dropout,
            attention_dropout=config.attention_dropout,
            activation_dropout=config.activation_dropout,
            activation_fn=config.activation_fn,
            layer_norm_first=config.layer_norm_first,
        )
        self.layers = nn.ModuleList(
            [
                TransformerSentenceEncoderLayer(**layer_kwargs)
                for _ in range(config.encoder_layers)
            ]
        )

        self.layer_norm_first = config.layer_norm_first
        self.layer_norm = LayerNorm(self.embedding_dim)
        self.layerdrop = config.encoder_layerdrop

        self.apply(init_bert_params)

    def forward(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Encode ``x`` and apply the closing layer norm when appropriate.

        Args:
            x: Input features, laid out as B x T x C.
            padding_mask: Optional mask; masked positions of ``x`` are set to
                zero and the mask is forwarded to every layer.
            layer: Optional index of the layer to stop at. ``None`` runs the
                full stack.

        Returns:
            A tuple ``(features, layer_results)`` as produced by
            ``extract_features``. ``features`` is additionally layer-normed
            when ``layer_norm_first`` is set and no ``layer`` was requested.
        """
        features, layer_results = self.extract_features(x, padding_mask, layer)

        if layer is None and self.layer_norm_first:
            features = self.layer_norm(features)

        return features, layer_results

    def extract_features(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        tgt_layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Run the positional convolution and the transformer layers.

        Args:
            x: Input features, laid out as B x T x C.
            padding_mask: Optional mask; masked positions of ``x`` are set to
                zero before the convolution, and the mask is passed to each
                layer as ``self_attn_padding_mask``.
            tgt_layer: Optional index of the last layer to run. When given,
                the per-layer outputs are also collected.

        Returns:
            A tuple ``(features, layer_results)``. ``features`` is B x T x C.
            ``layer_results`` holds one ``(x, z)`` pair per executed layer
            (``x`` in T x B x C layout) and is only filled when ``tgt_layer``
            is not ``None``; ``z`` is the second value returned by the layer.
        """
        if padding_mask is not None:
            x = index_put(x, padding_mask, 0)

        # The convolution runs over the time axis, so channels move to dim 1
        # for the call and back afterwards.
        positional = self.pos_conv(x.transpose(1, 2)).transpose(1, 2)
        x = x + positional

        if not self.layer_norm_first:
            x = self.layer_norm(x)

        x = F.dropout(x, p=self.dropout, training=self.training)

        # B x T x C -> T x B x C
        x = x.transpose(0, 1)

        collect = tgt_layer is not None
        layer_results = []
        for idx, block in enumerate(self.layers):
            # One random draw per layer, taken in eval mode as well.
            draw = np.random.random()
            if (not self.training) or draw > self.layerdrop:
                x, z = block(
                    x, self_attn_padding_mask=padding_mask, need_weights=False
                )
                if collect:
                    layer_results.append((x, z))
            # NOTE(review): this check sits at loop level (as in fairseq's
            # wav2vec2 encoder), so it also fires when the target layer was
            # skipped by LayerDrop — confirm the intended nesting.
            if idx == tgt_layer:
                break

        # T x B x C -> B x T x C
        return x.transpose(0, 1), layer_results