OpenSLU/model/encoder/non_pretrained_encoder.py
'''
Author: Qiguang Chen
Date: 2023-01-11 10:39:26
LastEditors: Qiguang Chen
LastEditTime: 2023-02-17 21:08:19
Description: non-pretrained encoder model
'''
import math

import einops
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

from common.utils import HiddenData, InputData
from model.encoder.base_encoder import BaseEncoder


class NonPretrainedEncoder(BaseEncoder):
"""
Encoder structure based on bidirectional LSTM and self-attention.
"""
def __init__(self, **config):
""" init non-pretrained encoder
Args:
config (dict):
                embedding (dict):
                    embedding_dim (int): embedding dimension.
                    dropout_rate (float): dropout rate.
                    load_embedding_name (str): None if no pretrained embedding is used, otherwise an embedding name such as "glove.6B.300d.txt".
                    embedding_matrix (Tensor, Optional): pretrained embedding matrix. Required if load_embedding_name is not None.
                    vocab_size (int, Optional): vocabulary size. Required if load_embedding_name is None.
                lstm (dict):
                    output_dim (int): LSTM output dimension.
                    bidirectional (bool): whether to use a bidirectional LSTM (BiLSTM) instead of a unidirectional one.
                    layer_num (int): number of LSTM layers.
                    dropout_rate (float): dropout rate.
                attention (dict, Optional):
                    dropout_rate (float): dropout rate.
                    hidden_dim (int): attention hidden dimension.
                    output_dim (int): attention output dimension.
                unflat_attention (dict, Optional): only used when attention is enabled.
                    dropout_rate (float): dropout rate.
                return_with_input (bool, Optional): whether to attach the raw inputs to the returned HiddenData.
                return_sentence_level_hidden (bool, Optional): whether to also compute a sentence-level hidden state.
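
        Example:
            A minimal illustrative config (key names follow the code above; the values
            are placeholders, not recommended defaults)::

                config = {
                    "embedding": {"embedding_dim": 256, "dropout_rate": 0.4, "vocab_size": 10000},
                    "lstm": {"output_dim": 256, "bidirectional": True, "layer_num": 1, "dropout_rate": 0.4},
                    "attention": {"hidden_dim": 1024, "output_dim": 128, "dropout_rate": 0.4},
                    "unflat_attention": {"dropout_rate": 0.4},
                    "return_with_input": True,
                    "return_sentence_level_hidden": True,
                }
                encoder = NonPretrainedEncoder(**config)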
"""
super(NonPretrainedEncoder, self).__init__()
self.config = config
# Embedding Initialization
embed_config = config["embedding"]
self.__embedding_dim = embed_config["embedding_dim"]
        if embed_config.get("load_embedding_name") and embed_config.get("embedding_matrix") is not None:
self.__embedding_layer = nn.Embedding.from_pretrained(embed_config["embedding_matrix"], padding_idx=0)
else:
self.__embedding_layer = nn.Embedding(
embed_config["vocab_size"], self.__embedding_dim
)
self.__embedding_dropout_layer = nn.Dropout(embed_config["dropout_rate"])
# LSTM Initialization
lstm_config = config["lstm"]
self.__hidden_size = lstm_config["output_dim"]
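        # The per-direction hidden size is output_dim // 2 so that, with
        # bidirectional=True, the concatenated forward/backward outputs match output_dim.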
self.__lstm_layer = nn.LSTM(
input_size=self.__embedding_dim,
hidden_size=lstm_config["output_dim"] // 2,
batch_first=True,
bidirectional=lstm_config["bidirectional"],
dropout=lstm_config["dropout_rate"],
num_layers=lstm_config["layer_num"]
)
if self.config.get("attention"):
# Attention Initialization
att_config = config["attention"]
self.__attention_dropout_layer = nn.Dropout(att_config["dropout_rate"])
self.__attention_layer = QKVAttention(
self.__embedding_dim, self.__embedding_dim, self.__embedding_dim,
att_config["hidden_dim"], att_config["output_dim"], att_config["dropout_rate"]
)
if self.config.get("unflat_attention"):
unflat_att_config = config["unflat_attention"]
self.__sentattention = UnflatSelfAttention(
lstm_config["output_dim"] + att_config["output_dim"],
unflat_att_config["dropout_rate"]
                )

    def forward(self, inputs: InputData):
        """ Forward process for Non-Pretrained Encoder.

        Args:
            inputs (InputData): padded input_ids and the corresponding attention_mask.

        Returns:
            HiddenData: encoded hidden vectors (token-level, plus sentence-level if configured).
        """
# LSTM Encoder
# Padded_text should be instance of LongTensor.
embedded_text = self.__embedding_layer(inputs.input_ids)
dropout_text = self.__embedding_dropout_layer(embedded_text)
seq_lens = inputs.attention_mask.sum(-1).detach().cpu()
# Pack and Pad process for input of variable length.
packed_text = pack_padded_sequence(dropout_text, seq_lens, batch_first=True, enforce_sorted=False)
lstm_hiddens, (h_last, c_last) = self.__lstm_layer(packed_text)
padded_hiddens, _ = pad_packed_sequence(lstm_hiddens, batch_first=True)
if self.config.get("attention"):
# Attention Encoder
dropout_text = self.__attention_dropout_layer(embedded_text)
attention_hiddens = self.__attention_layer(
dropout_text, dropout_text, dropout_text, mask=inputs.attention_mask
)
# Attention + LSTM
hiddens = torch.cat([attention_hiddens, padded_hiddens], dim=-1)
hidden = HiddenData(None, hiddens)
if self.config.get("return_with_input"):
hidden.add_input(inputs)
if self.config.get("return_sentence_level_hidden"):
if self.config.get("unflat_attention"):
sentence = self.__sentattention(hiddens, seq_lens)
else:
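                    # Without unflat attention, use the first token's hidden state as
                    # the sentence-level representation.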
sentence = hiddens[:, 0, :]
hidden.update_intent_hidden_state(sentence)
else:
sentence_hidden = None
if self.config.get("return_sentence_level_hidden"):
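                # Build the sentence-level state from the final hidden and cell states
                # (both directions of the last layer for a BiLSTM).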
                sentence_hidden = torch.cat((h_last[-1], h_last[-2], c_last[-1], c_last[-2]), dim=-1)
hidden = HiddenData(sentence_hidden, padded_hiddens)
if self.config.get("return_with_input"):
hidden.add_input(inputs)
        return hidden


class QKVAttention(nn.Module):
    """
    Attention mechanism based on the Query-Key-Value architecture, i.e.
    softmax(Q K^T / sqrt(hidden_dim)) V with a linear projection on each input.
    In particular, when query == key == value, it reduces to self-attention.
    """
def __init__(self, query_dim, key_dim, value_dim, hidden_dim, output_dim, dropout_rate):
super(QKVAttention, self).__init__()
# Record hyper-parameters.
self.__query_dim = query_dim
self.__key_dim = key_dim
self.__value_dim = value_dim
self.__hidden_dim = hidden_dim
self.__output_dim = output_dim
self.__dropout_rate = dropout_rate
# Declare network structures.
self.__query_layer = nn.Linear(self.__query_dim, self.__hidden_dim)
self.__key_layer = nn.Linear(self.__key_dim, self.__hidden_dim)
self.__value_layer = nn.Linear(self.__value_dim, self.__output_dim)
        self.__dropout_layer = nn.Dropout(p=self.__dropout_rate)

    def forward(self, input_query, input_key, input_value, mask=None):
        """ The forward propagation of attention.

        The sequence lengths of the key and value inputs must be equal.

        Args:
            input_query: query tensor, (b, n, d_q)
            input_key: key tensor, (b, m, d_k)
            input_value: value tensor, (b, m, d_v)
            mask: optional padding mask, (b, m); positions equal to 0 are masked out.

        Returns:
            attention-weighted tensor, (b, n, output_dim)
        """
# Linear transform to fine-tune dimension.
linear_query = self.__query_layer(input_query)
linear_key = self.__key_layer(input_key)
linear_value = self.__value_layer(input_value)
score_tensor = torch.matmul(
linear_query,
linear_key.transpose(-2, -1)
) / math.sqrt(self.__hidden_dim)
if mask is not None:
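            # Apply the padding mask to the attention scores before the softmax.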
            attn_mask = einops.repeat((mask == 0), "b l -> b n l", n=score_tensor.shape[-2])
score_tensor = score_tensor.masked_fill_(attn_mask, -float(1e20))
score_tensor = F.softmax(score_tensor, dim=-1)
forced_tensor = torch.matmul(score_tensor, linear_value)
forced_tensor = self.__dropout_layer(forced_tensor)
        return forced_tensor


class UnflatSelfAttention(nn.Module):
    """
    Scores each element of the sequence with a linear layer and uses the normalized
    scores to compute a context vector over the sequence.
    """
def __init__(self, d_hid, dropout=0.):
super().__init__()
self.scorer = nn.Linear(d_hid, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, inp, lens):
        """Pool the sequence into a single context vector via length-masked self-attention.

        Args:
            inp: input tensor, (batch_size, seq_len, d_feat)
            lens: sequence lengths used to mask padded positions.

        Returns:
            context vector, (batch_size, d_feat)
        """
batch_size, seq_len, d_feat = inp.size()
inp = self.dropout(inp)
scores = self.scorer(inp.contiguous().view(-1, d_feat)).view(batch_size, seq_len)
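        # Set scores at padded positions to -inf so they get zero weight after softmax.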
max_len = max(lens)
for i, l in enumerate(lens):
if l < max_len:
scores.data[i, l:] = -np.inf
scores = F.softmax(scores, dim=1)
context = scores.unsqueeze(2).expand_as(inp).mul(inp).sum(1)
return context