|
|
|
|
|
|
|
import math |
|
import copy |
|
import torch |
|
import torch.nn as nn |
|
import numpy as np |
|
|
|
|
|
|
|
def gelu(x):
    """Apply the exact (erf-based) Gaussian Error Linear Unit elementwise.

    Computes ``x * 0.5 * (1 + erf(x / sqrt(2)))``.

    Note that OpenAI GPT uses a tanh approximation that gives slightly
    different results:
        0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))

    Reference: https://arxiv.org/abs/1606.08415

    Args:
        x: input tensor.

    Returns:
        Tensor with the same shape as ``x``.
    """
    # erf term is the standard normal CDF rescaled to the range [0, 2].
    cdf_scaled = 1.0 + torch.erf(x / math.sqrt(2.0))
    half_x = x * 0.5
    return half_x * cdf_scaled
|
|
|
|
|
|
|
# Prefer apex's fused layer norm when it is installed; otherwise fall back to
# a pure-PyTorch implementation exposed under the same name.
try:
    from apex.normalization.fused_layer_norm import FusedLayerNorm as BertLayerNorm
except ImportError:

    class BertLayerNorm(nn.Module):
        """Layer normalization over the last dimension, TF style.

        "TF style" means the epsilon is added inside the square root.
        """

        def __init__(self, hidden_size, eps=1e-12):
            """Create the learnable scale (ones) and shift (zeros).

            Args:
                hidden_size: size of the last (normalized) dimension.
                eps: value added to the variance before the square root.
            """
            super().__init__()
            self.weight = nn.Parameter(torch.ones(hidden_size))
            self.bias = nn.Parameter(torch.zeros(hidden_size))
            self.variance_epsilon = eps

        def forward(self, x):
            """Normalize ``x`` along its last axis, then scale and shift."""
            mean = x.mean(-1, keepdim=True)
            # Biased (population) variance of the last axis.
            variance = (x - mean).pow(2).mean(-1, keepdim=True)
            normalized = (x - mean) / (variance + self.variance_epsilon).sqrt()
            return self.weight * normalized + self.bias
|
|
|
|
|
class BertConfig:
    """Plain container for the hyper-parameters read by the modules in this file.

    Every constructor argument is stored unchanged as an attribute of the
    same name; no validation is performed here.
    """

    def __init__(
        self,
        vocab_size,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=12,
        intermediate_size=3072,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        max_position_embeddings=512,
        attention_probs_dropout_prob=0.1,
        type_vocab_size=2,
    ):
        """Record the configuration values.

        Args:
            vocab_size: vocabulary size (required).
            hidden_size: width of the hidden representation.
            num_hidden_layers: number of stacked encoder layers.
            num_attention_heads: number of attention heads per layer.
            intermediate_size: width of the feed-forward expansion.
            hidden_act: name of the feed-forward activation.
            hidden_dropout_prob: dropout probability for hidden states.
            max_position_embeddings: number of positions in the position table.
            attention_probs_dropout_prob: dropout probability for attention weights.
            type_vocab_size: number of token types.
        """
        # Model dimensions.
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads

        # Feed-forward sub-layer.
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size

        # Regularisation and input limits.
        self.hidden_dropout_prob = hidden_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.type_vocab_size = type_vocab_size
|
|
|
|
|
class BertSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are separate linear projections of the same
    input. The hidden dimension is split evenly across the heads.
    """

    def __init__(self, config):
        """Build the three projections and the attention dropout.

        Args:
            config: object providing ``hidden_size``, ``num_attention_heads``
                and ``attention_probs_dropout_prob``.

        Raises:
            ValueError: if ``hidden_size`` is not divisible by
                ``num_attention_heads``.
        """
        super().__init__()
        if config.hidden_size % config.num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (config.hidden_size, config.num_attention_heads))

        head_count = config.num_attention_heads
        self.num_attention_heads = head_count
        self.attention_head_size = int(config.hidden_size / head_count)
        self.all_head_size = head_count * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        """Split the last axis into (heads, head_size) and swap axes 1 and 2.

        The ``permute(0, 2, 1, 3)`` call requires the reshaped tensor to be
        4-D, i.e. ``x`` must be 3-D.
        """
        split = x.view(*x.shape[:-1], self.num_attention_heads, self.attention_head_size)
        return split.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask):
        """Attend over ``hidden_states`` and return the merged per-head context.

        Args:
            hidden_states: 3-D input tensor whose last axis is ``hidden_size``.
            attention_mask: tensor added to the raw attention scores before
                the softmax, or ``None`` to skip masking.

        Returns:
            Tensor shaped like ``hidden_states`` but with last axis
            ``all_head_size``.
        """
        queries = self.transpose_for_scores(self.query(hidden_states))
        keys = self.transpose_for_scores(self.key(hidden_states))
        values = self.transpose_for_scores(self.value(hidden_states))

        # Raw scores, scaled by sqrt(head_size).
        scores = torch.matmul(queries, keys.transpose(-1, -2))
        scores = scores / math.sqrt(self.attention_head_size)
        if attention_mask is not None:
            scores = scores + attention_mask

        # Normalise over the key axis; dropout is then applied to the
        # attention weights themselves.
        weights = self.dropout(torch.softmax(scores, dim=-1))

        # Weighted sum of values, then move heads back next to head_size and
        # merge the two trailing axes.
        context = torch.matmul(weights, values).permute(0, 2, 1, 3).contiguous()
        return context.view(*context.shape[:-2], self.all_head_size)
|
|
|
|
|
class BertSelfOutput(nn.Module):
    """Output projection of the attention sub-layer with residual + layer norm."""

    def __init__(self, config):
        """Build the square projection, layer norm and dropout.

        Args:
            config: object providing ``hidden_size`` and ``hidden_dropout_prob``.
        """
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.LayerNorm = BertLayerNorm(width, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        """Project ``hidden_states``, apply dropout, add ``input_tensor`` and normalize.

        Args:
            hidden_states: tensor to project.
            input_tensor: residual tensor added before the layer norm.

        Returns:
            The layer-normalized sum.
        """
        projected = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(projected + input_tensor)
|
|
|
|
|
class BertAttention(nn.Module):
    """Self-attention followed by its residual output block."""

    def __init__(self, config):
        """Create the ``self`` (attention) and ``output`` (projection) sub-modules.

        Args:
            config: configuration object forwarded to both sub-modules.
        """
        super().__init__()
        self.self = BertSelfAttention(config)
        self.output = BertSelfOutput(config)

    def forward(self, input_tensor, attention_mask):
        """Run attention on ``input_tensor`` and merge the result back into it.

        Args:
            input_tensor: input to attend over; also used as the residual.
            attention_mask: mask forwarded to the attention module.

        Returns:
            Output of the ``output`` sub-module.
        """
        attended = self.self(input_tensor, attention_mask)
        return self.output(attended, input_tensor)
|
|
|
|
|
class BertIntermediate(nn.Module):
    """Feed-forward expansion: linear projection followed by GELU.

    The activation is always ``gelu``; ``config.hidden_act`` is not consulted
    by this module.
    """

    def __init__(self, config):
        """Build the expansion layer.

        Args:
            config: object providing ``hidden_size`` and ``intermediate_size``.
        """
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        self.intermediate_act_fn = gelu

    def forward(self, hidden_states):
        """Return ``activation(dense(hidden_states))``."""
        expanded = self.dense(hidden_states)
        return self.intermediate_act_fn(expanded)
|
|
|
|
|
class BertOutput(nn.Module):
    """Feed-forward contraction with residual connection and layer norm."""

    def __init__(self, config):
        """Build the contraction layer, layer norm and dropout.

        Args:
            config: object providing ``intermediate_size``, ``hidden_size``
                and ``hidden_dropout_prob``.
        """
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = BertLayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        """Project back to ``hidden_size``, apply dropout, add the residual, normalize.

        Args:
            hidden_states: tensor whose last axis is ``intermediate_size``.
            input_tensor: residual tensor added before the layer norm.

        Returns:
            The layer-normalized sum.
        """
        contracted = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(contracted + input_tensor)
|
|
|
|
|
class BertLayer(nn.Module):
    """One encoder layer: attention block followed by the feed-forward block."""

    def __init__(self, config):
        """Create the attention, intermediate and output sub-modules.

        Args:
            config: configuration object forwarded to every sub-module.
        """
        super().__init__()
        self.attention = BertAttention(config)
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(self, hidden_states, attention_mask):
        """Run the layer.

        Args:
            hidden_states: layer input.
            attention_mask: mask forwarded to the attention block.

        Returns:
            The layer output.
        """
        attended = self.attention(hidden_states, attention_mask)
        # The attention output is both the feed-forward input and its residual.
        expanded = self.intermediate(attended)
        return self.output(expanded, attended)
|
|
|
|
|
class BertEncoder(nn.Module):
    """Stack of ``config.num_hidden_layers`` encoder layers applied in sequence."""

    def __init__(self, config):
        """Build one ``BertLayer`` and deep-copy it for every position in the stack.

        Because each entry is a deep copy of the same prototype, all layers
        start from identical parameter values.

        Args:
            config: configuration object; ``num_hidden_layers`` sets the depth.
        """
        super().__init__()
        prototype = BertLayer(config)
        self.layer = nn.ModuleList(
            [copy.deepcopy(prototype) for _ in range(config.num_hidden_layers)])

    def forward(self, hidden_states, attention_mask=None, output_all_encoded_layers=True):
        """Feed ``hidden_states`` through every layer.

        Args:
            hidden_states: input to the first layer.
            attention_mask: mask passed unchanged to every layer.
            output_all_encoded_layers: when truthy, collect the output of
                every layer; otherwise keep only the final hidden state.

        Returns:
            A list of tensors: one per layer, or a single-element list holding
            the final hidden state (the unmodified input if there are no
            layers).
        """
        per_layer = []
        for block in self.layer:
            hidden_states = block(hidden_states, attention_mask)
            if output_all_encoded_layers:
                per_layer.append(hidden_states)
        return per_layer if output_all_encoded_layers else [hidden_states]
|
|
|
|
|
class BertEmbeddings(nn.Module):
    """Add learned position embeddings to the input, then layer norm and dropout.

    Only a position-embedding table is built here; the input passed to
    ``forward`` is added to the position embeddings directly.
    """

    def __init__(self, config):
        """Build the position table, layer norm and dropout.

        Args:
            config: object providing ``max_position_embeddings``,
                ``hidden_size`` and ``hidden_dropout_prob``.
        """
        super().__init__()
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size)
        self.LayerNorm = BertLayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids, token_type_ids=None):
        """Return the normalized sum of the input and its position embeddings.

        Args:
            input_ids: 3-D tensor (it is indexed as ``[:, :, 0]``); axis 1 is
                treated as the sequence axis and the tensor is added
                elementwise to the position embeddings.
            token_type_ids: accepted but not used.

        Returns:
            Tensor after layer norm and dropout.
        """
        # Positions 0..seq_len-1, created on the input's device.
        positions = torch.arange(
            input_ids.size(1), dtype=torch.long, device=input_ids.device)
        # Repeat across the batch by matching the shape of one feature slice.
        positions = positions.unsqueeze(0).expand_as(input_ids[:, :, 0])

        combined = input_ids + self.position_embeddings(positions)
        return self.dropout(self.LayerNorm(combined))
|
|
|
|
|
class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal position encoding to a sequence of vectors.

    The table is a plain tensor attribute (``position_enc``), not a parameter
    or registered buffer, so it is not part of ``state_dict`` and is moved to
    the input's device on every forward call.
    """

    def __init__(self, config):
        """Pre-compute the encoding table.

        Args:
            config: object providing ``hidden_size`` (encoding width) and
                ``max_position_embeddings`` (number of table rows).
        """
        super().__init__()
        emb_dim = config.hidden_size
        max_len = config.max_position_embeddings
        self.position_enc = self.position_encoding_init(max_len, emb_dim)

    @staticmethod
    def position_encoding_init(n_position, emb_dim):
        """Init the sinusoid position encoding table.

        Entry ``(pos, j)`` starts as ``pos / 10000 ** (2 * (j // 2) / emb_dim)``.
        For ``pos >= 1`` even columns are replaced by their sine and odd
        columns by their cosine; row 0 is left as all zeros.

        Args:
            n_position: number of positions (rows).
            emb_dim: encoding width (columns).

        Returns:
            Float32 tensor of shape ``(n_position, emb_dim)``.
        """
        positions = np.arange(n_position, dtype=np.float64).reshape(-1, 1)
        exponents = 2 * (np.arange(emb_dim) // 2) / emb_dim
        # Row 0 is 0 / divisor == 0 and is deliberately excluded from sin/cos.
        position_enc = positions / np.power(10000.0, exponents)

        position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2])
        position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2])
        return torch.from_numpy(position_enc).type(torch.FloatTensor)

    def forward(self, word_seq):
        """Return ``word_seq`` plus the position encoding of each sequence index.

        Args:
            word_seq: 3-D tensor whose axis 1 is the sequence axis and whose
                last axis equals the encoding width. The sequence length may
                be anything up to the number of rows in the table.

        Returns:
            Tensor with the same shape as ``word_seq``.
        """
        # Use only the rows this sequence needs, so inputs shorter than the
        # table are accepted, and move the small table before expanding it.
        table = self.position_enc[:word_seq.size(1)].to(word_seq.device)
        position_encoding = table.unsqueeze(0).expand_as(word_seq)
        return word_seq + position_encoding
|
|
|
|
|
class BertPooler(nn.Module):
    """Summarize a sequence by transforming the hidden state at index 0."""

    def __init__(self, config):
        """Build the square projection and tanh activation.

        Args:
            config: object providing ``hidden_size``.
        """
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        """Return ``tanh(dense(hidden_states[:, 0]))``.

        Args:
            hidden_states: tensor with at least two axes; index 0 along
                axis 1 is selected.

        Returns:
            The pooled tensor (axis 1 removed).
        """
        leading = hidden_states[:, 0]
        return self.activation(self.dense(leading))