'''
Author: Qiguang Chen
Date: 2023-01-11 10:39:26
LastEditors: Qiguang Chen
LastEditTime: 2023-02-17 21:08:19
Description: non-pretrained encoder model
'''
import math

import einops
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

from common.utils import HiddenData, InputData
from model.encoder.base_encoder import BaseEncoder


class NonPretrainedEncoder(BaseEncoder):
    """
    Encoder structure based on bidirectional LSTM and self-attention.
    """

    def __init__(self, **config):
        """ init non-pretrained encoder

        Args:
            config (dict):
                embedding (dict):
                    embedding_dim (int): embedding dimension.
                    dropout_rate (float): dropout rate.
                    load_embedding_name (str): None if no pretrained embedding is used, otherwise an embedding name like "glove.6B.300d.txt".
                    embedding_matrix (Tensor, Optional): embedding matrix tensor. Enabled if load_embedding_name is not None.
                    vocab_size (int, Optional): vocabulary size. Enabled if load_embedding_name is None.
                lstm (dict):
                    output_dim (int): lstm output dim.
                    bidirectional (bool): if use BiLSTM or LSTM.
                    layer_num (int): number of layers.
                    dropout_rate (float): dropout rate.
                attention (dict, Optional):
                    dropout_rate (float): dropout rate.
                    hidden_dim (int): attention hidden dim.
                    output_dim (int): attention output dim.
                unflat_attention (dict, Optional): Enabled if attention is not None.
                    dropout_rate (float): dropout rate.

        A hypothetical example config is sketched in the comment block after this method.
        """
        super(NonPretrainedEncoder, self).__init__()
        self.config = config

        # Embedding Initialization
        embed_config = config["embedding"]
        self.__embedding_dim = embed_config["embedding_dim"]
        if embed_config.get("load_embedding_name") and embed_config.get("embedding_matrix"):
            self.__embedding_layer = nn.Embedding.from_pretrained(embed_config["embedding_matrix"], padding_idx=0)
        else:
            self.__embedding_layer = nn.Embedding(
                embed_config["vocab_size"], self.__embedding_dim
            )
        self.__embedding_dropout_layer = nn.Dropout(embed_config["dropout_rate"])

        # LSTM Initialization
        lstm_config = config["lstm"]
        self.__hidden_size = lstm_config["output_dim"]
        self.__lstm_layer = nn.LSTM(
            input_size=self.__embedding_dim,
            hidden_size=lstm_config["output_dim"] // 2,
            batch_first=True,
            bidirectional=lstm_config["bidirectional"],
            dropout=lstm_config["dropout_rate"],
            num_layers=lstm_config["layer_num"]
        )

        if self.config.get("attention"):
            # Attention Initialization
            att_config = config["attention"]
            self.__attention_dropout_layer = nn.Dropout(att_config["dropout_rate"])
            self.__attention_layer = QKVAttention(
                self.__embedding_dim, self.__embedding_dim, self.__embedding_dim,
                att_config["hidden_dim"], att_config["output_dim"], att_config["dropout_rate"]
            )
            if self.config.get("unflat_attention"):
                unflat_att_config = config["unflat_attention"]
                self.__sentattention = UnflatSelfAttention(
                    lstm_config["output_dim"] + att_config["output_dim"],
                    unflat_att_config["dropout_rate"]
                )
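
    # The dict below is only an illustrative sketch of the config structure
    # documented in __init__ above; every dimension and dropout rate is a
    # hypothetical value, not a default taken from any released configuration.
    #
    # example_config = {
    #     "embedding": {"embedding_dim": 256, "dropout_rate": 0.4, "vocab_size": 10000},
    #     "lstm": {"output_dim": 256, "bidirectional": True, "layer_num": 1, "dropout_rate": 0.4},
    #     "attention": {"hidden_dim": 1024, "output_dim": 128, "dropout_rate": 0.4},
    #     "unflat_attention": {"dropout_rate": 0.4},
    #     "return_with_input": True,
    #     "return_sentence_level_hidden": True,
    # }
    # encoder = NonPretrainedEncoder(**example_config)
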
    def forward(self, inputs: InputData):
        """ Forward process for Non-Pretrained Encoder.

        Args:
            inputs (InputData): padded input ids and attention masks.
        Returns:
            HiddenData: encoded hidden vectors.
        """
        # LSTM Encoder
        # Padded text should be an instance of LongTensor.
        embedded_text = self.__embedding_layer(inputs.input_ids)
        dropout_text = self.__embedding_dropout_layer(embedded_text)
        seq_lens = inputs.attention_mask.sum(-1).detach().cpu()

        # Pack and pad process for input of variable length.
        packed_text = pack_padded_sequence(dropout_text, seq_lens, batch_first=True, enforce_sorted=False)
        lstm_hiddens, (h_last, c_last) = self.__lstm_layer(packed_text)
        padded_hiddens, _ = pad_packed_sequence(lstm_hiddens, batch_first=True)

        if self.config.get("attention"):
            # Attention Encoder
            dropout_text = self.__attention_dropout_layer(embedded_text)
            attention_hiddens = self.__attention_layer(
                dropout_text, dropout_text, dropout_text, mask=inputs.attention_mask
            )
            # Attention + LSTM
            hiddens = torch.cat([attention_hiddens, padded_hiddens], dim=-1)
            hidden = HiddenData(None, hiddens)
            if self.config.get("return_with_input"):
                hidden.add_input(inputs)
            if self.config.get("return_sentence_level_hidden"):
                if self.config.get("unflat_attention"):
                    sentence = self.__sentattention(hiddens, seq_lens)
                else:
                    sentence = hiddens[:, 0, :]
                hidden.update_intent_hidden_state(sentence)
        else:
            sentence_hidden = None
            if self.config.get("return_sentence_level_hidden"):
                # Concatenate the final forward/backward hidden and cell states.
                sentence_hidden = torch.cat((h_last[-1], h_last[-2], c_last[-1], c_last[-2]), dim=-1)
            hidden = HiddenData(sentence_hidden, padded_hiddens)
            if self.config.get("return_with_input"):
                hidden.add_input(inputs)
        return hidden
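
# Usage sketch for the encoder above. Assumptions: the InputData batch exposes
# `input_ids` as a LongTensor of shape (batch, seq_len) and `attention_mask`
# with 1 for real tokens and 0 for padding, exactly as consumed in forward().
#
#     hidden_data = encoder(batch)   # HiddenData with per-token hidden states
#
# When "return_sentence_level_hidden" is set in the config, a sentence-level
# vector is additionally attached, via update_intent_hidden_state() on the
# attention path or through the HiddenData constructor on the LSTM-only path.
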

class QKVAttention(nn.Module):
    """
    Attention mechanism based on the Query-Key-Value architecture. In
    particular, when query == key == value, it is self-attention.
    """
    def __init__(self, query_dim, key_dim, value_dim, hidden_dim, output_dim, dropout_rate):
        super(QKVAttention, self).__init__()

        # Record hyper-parameters.
        self.__query_dim = query_dim
        self.__key_dim = key_dim
        self.__value_dim = value_dim
        self.__hidden_dim = hidden_dim
        self.__output_dim = output_dim
        self.__dropout_rate = dropout_rate

        # Declare network structures.
        self.__query_layer = nn.Linear(self.__query_dim, self.__hidden_dim)
        self.__key_layer = nn.Linear(self.__key_dim, self.__hidden_dim)
        self.__value_layer = nn.Linear(self.__value_dim, self.__output_dim)
        self.__dropout_layer = nn.Dropout(p=self.__dropout_rate)
    def forward(self, input_query, input_key, input_value, mask=None):
        """ The forward propagation of attention.

        Here we require the sequence lengths of the input key
        and value to be equal.

        Args:
            input_query: query tensor, (b, n, d_q)
            input_key: key tensor, (b, m, d_k)
            input_value: value tensor, (b, m, d_v)
            mask: optional padding mask over key positions, (b, m)
        Returns:
            attention-based tensor, (b, n, d_o) where d_o is output_dim
        """
        # Linear transform to fine-tune dimension.
        linear_query = self.__query_layer(input_query)
        linear_key = self.__key_layer(input_key)
        linear_value = self.__value_layer(input_value)

        score_tensor = torch.matmul(
            linear_query,
            linear_key.transpose(-2, -1)
        ) / math.sqrt(self.__hidden_dim)
        if mask is not None:
            # Mask out padded key positions so that real tokens assign no
            # attention weight to padding.
            attn_mask = einops.repeat((mask == 0), "b l -> b h l", h=score_tensor.shape[-2])
            score_tensor = score_tensor.masked_fill_(attn_mask, -float(1e20))
        score_tensor = F.softmax(score_tensor, dim=-1)
        forced_tensor = torch.matmul(score_tensor, linear_value)
        forced_tensor = self.__dropout_layer(forced_tensor)

        return forced_tensor
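
# Self-attention usage sketch for QKVAttention; the dimensions below are
# illustrative only.
#
#     attn = QKVAttention(256, 256, 256, hidden_dim=1024, output_dim=128, dropout_rate=0.4)
#     x = torch.randn(8, 20, 256)                   # (batch, seq_len, embedding_dim)
#     mask = torch.ones(8, 20, dtype=torch.long)    # 1 = real token, 0 = padding
#     out = attn(x, x, x, mask=mask)                # -> (8, 20, 128)
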

class UnflatSelfAttention(nn.Module):
    """
    Scores each element of the sequence with a linear layer and uses the
    normalized scores to compute a context vector over the sequence.
    """

    def __init__(self, d_hid, dropout=0.):
        super().__init__()
        self.scorer = nn.Linear(d_hid, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, inp, lens):
        batch_size, seq_len, d_feat = inp.size()
        inp = self.dropout(inp)
        # Score every position with a shared linear layer.
        scores = self.scorer(inp.contiguous().view(-1, d_feat)).view(batch_size, seq_len)
        # Exclude padded positions from the softmax.
        max_len = max(lens)
        for i, l in enumerate(lens):
            if l < max_len:
                scores.data[i, l:] = -np.inf
        scores = F.softmax(scores, dim=1)
        # Weighted sum over the sequence yields a sentence-level context vector.
        context = scores.unsqueeze(2).expand_as(inp).mul(inp).sum(1)
        return context
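

if __name__ == "__main__":
    # Minimal smoke-test sketch, to be run from the repository root so the
    # common/ and model/ imports resolve. All dimensions and the toy batch are
    # hypothetical. InputData's constructor is not assumed here; a
    # SimpleNamespace stand-in providing the two attributes read by
    # NonPretrainedEncoder.forward() (`input_ids`, `attention_mask`) is used.
    from types import SimpleNamespace

    torch.manual_seed(0)

    # Self-attention module on a random batch.
    attn = QKVAttention(32, 32, 32, hidden_dim=64, output_dim=16, dropout_rate=0.1)
    x = torch.randn(2, 5, 32)
    mask = torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 0, 0]])
    print(attn(x, x, x, mask=mask).shape)               # torch.Size([2, 5, 16])

    # Sentence-level pooling over variable-length sequences.
    pool = UnflatSelfAttention(16, dropout=0.1)
    print(pool(torch.randn(2, 5, 16), [5, 3]).shape)    # torch.Size([2, 16])

    # Encoder without the optional attention branch (LSTM-only path).
    encoder = NonPretrainedEncoder(
        embedding={"embedding_dim": 32, "dropout_rate": 0.1, "vocab_size": 100},
        lstm={"output_dim": 64, "bidirectional": True, "layer_num": 2, "dropout_rate": 0.1},
        return_sentence_level_hidden=True,
    )
    fake_batch = SimpleNamespace(
        input_ids=torch.randint(1, 100, (2, 5)),
        attention_mask=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 0, 0]]),
    )
    # hidden_data wraps (2, 5, 64) token states and a (2, 128) sentence vector.
    hidden_data = encoder(fake_batch)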