AVCER / app /model_architectures.py
ElenaRyumina's picture
Summary
47aeb66
"""
File: model_architectures.py
Author: Elena Ryumina and Dmitry Ryumin
Description: This module provides model architectures.
License: MIT License
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
from transformers.models.wav2vec2.modeling_wav2vec2 import (
Wav2Vec2Model,
Wav2Vec2PreTrainedModel,
)
from typing import Optional
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a skip path.

    The last 1x1 convolution widens the feature map to
    ``out_channels * expansion`` channels. The block's stride is applied in
    the first 1x1 convolution; the 3x3 convolution uses ``padding='same'``.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Width of the first two convolutions.
        i_downsample: Optional module applied to the skip path before the
            addition (needed whenever the main path changes the shape).
        stride: Stride of the first 1x1 convolution.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()
        expanded_channels = out_channels * self.expansion
        # NOTE(review): eps=0.001 / momentum=0.99 look like Keras defaults. In
        # PyTorch `momentum` is the weight given to the *new* batch statistic,
        # so 0.99 makes the running stats follow almost only the latest batch
        # while training. Inference with loaded weights is unaffected --
        # confirm this is intended before fine-tuning.
        norm_args = dict(eps=0.001, momentum=0.99)

        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=stride,
            padding=0,
            bias=False,
        )
        self.batch_norm1 = nn.BatchNorm2d(out_channels, **norm_args)

        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding="same", bias=False
        )
        self.batch_norm2 = nn.BatchNorm2d(out_channels, **norm_args)

        self.conv3 = nn.Conv2d(
            out_channels,
            expanded_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False,
        )
        self.batch_norm3 = nn.BatchNorm2d(expanded_channels, **norm_args)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Run the block and return ``relu(main_path(x) + skip_path(x))``."""
        # Work on a copy so the skip path never aliases the caller's tensor.
        shortcut = x.clone()

        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.batch_norm2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.batch_norm3(out)

        # Bring the skip path to the main path's shape when a projection is given.
        if self.i_downsample is not None:
            shortcut = self.i_downsample(shortcut)

        out += shortcut
        return self.relu(out)
class Conv2dSame(torch.nn.Conv2d):
    """``Conv2d`` that pads its input at call time from the input's size.

    The padding is computed per call so that, with the constructor's
    ``padding`` left at its default of 0, each spatial output size equals
    ``ceil(input_size / stride)``. When the total padding is odd, the extra
    row/column goes to the bottom/right.
    """

    def calc_same_pad(self, i: int, k: int, s: int, d: int) -> int:
        """Return the total padding needed along one axis.

        Args:
            i: Input size along the axis.
            k: Kernel size along the axis.
            s: Stride along the axis.
            d: Dilation along the axis.

        Returns:
            Non-negative number of padding elements to add along the axis.
        """
        target_out = math.ceil(i / s)
        required_in = (target_out - 1) * s + (k - 1) * d + 1
        return max(required_in - i, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pad the last two dimensions of ``x`` as needed, then convolve."""
        pad_h, pad_w = (
            self.calc_same_pad(i=size, k=kernel, s=stride, d=dilation)
            for size, kernel, stride, dilation in zip(
                x.shape[-2:], self.kernel_size, self.stride, self.dilation
            )
        )

        if pad_h > 0 or pad_w > 0:
            left, top = pad_w // 2, pad_h // 2
            # F.pad takes the last dimension first: (left, right, top, bottom).
            x = F.pad(x, [left, pad_w - left, top, pad_h - top])

        return F.conv2d(
            x,
            self.weight,
            self.bias,
            stride=self.stride,
            padding=self.padding,
            dilation=self.dilation,
            groups=self.groups,
        )
class ResNet(nn.Module):
    """ResNet-style classifier assembled from a configurable residual block.

    Pipeline: strided 7x7 ``Conv2dSame`` stem -> batch norm -> ReLU ->
    3x3/stride-2 max-pool -> four residual stages -> global average pool ->
    ``fc1`` (512 features) -> ReLU -> ``fc2`` (``num_classes`` outputs).

    Args:
        ResBlock: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_channels, planes, i_downsample=...,
            stride=...)``.
        layer_list: Number of blocks in each of the four stages.
        num_classes: Number of outputs of the final linear layer.
        num_channels: Number of channels of the input image.
    """

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super().__init__()
        # Running channel count; updated by `_make_layer` after every stage.
        self.in_channels = 64

        # Stem.
        self.conv_layer_s2_same = Conv2dSame(
            num_channels, 64, 7, stride=2, groups=1, bias=False
        )
        # NOTE(review): eps/momentum look like Keras defaults; PyTorch's
        # `momentum` weights the new batch statistic -- confirm before training.
        self.batch_norm1 = nn.BatchNorm2d(64, eps=0.001, momentum=0.99)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2)

        # Residual stages.
        first, second, third, fourth = (layer_list[idx] for idx in range(4))
        self.layer1 = self._make_layer(ResBlock, first, planes=64, stride=1)
        self.layer2 = self._make_layer(ResBlock, second, planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, third, planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, fourth, planes=512, stride=2)

        # Head.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(512 * ResBlock.expansion, 512)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(512, num_classes)

    def extract_features(self, x):
        """Return the ``fc1`` output (512 features per sample, before ReLU)."""
        stem = self.conv_layer_s2_same(x)
        stem = self.batch_norm1(stem)
        stem = self.relu(stem)
        feats = self.max_pool(stem)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feats = stage(feats)

        pooled = self.avgpool(feats)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc1(flat)

    def forward(self, x):
        """Return the ``num_classes`` outputs of ``fc2`` for input ``x``."""
        features = self.extract_features(x)
        return self.fc2(self.relu1(features))

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one residual stage of ``blocks`` blocks as ``nn.Sequential``.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1 conv + batch norm projection for the skip
        path. ``self.in_channels`` is updated to the stage's output width.
        """
        stage_width = planes * ResBlock.expansion

        projection = None
        shape_preserved = stride == 1 and self.in_channels == stage_width
        if not shape_preserved:
            projection = nn.Sequential(
                nn.Conv2d(
                    self.in_channels,
                    stage_width,
                    kernel_size=1,
                    stride=stride,
                    bias=False,
                    padding=0,
                ),
                nn.BatchNorm2d(stage_width, eps=0.001, momentum=0.99),
            )

        stage = [
            ResBlock(self.in_channels, planes, i_downsample=projection, stride=stride)
        ]
        self.in_channels = stage_width
        stage.extend(ResBlock(self.in_channels, planes) for _ in range(blocks - 1))
        return nn.Sequential(*stage)
def ResNet50(num_classes, channels=3):
    """Build a ``ResNet`` of ``Bottleneck`` blocks with stage depths 3, 4, 6, 3.

    Args:
        num_classes: Number of outputs of the final linear layer.
        channels: Number of channels of the input image.

    Returns:
        The constructed ``ResNet`` module.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(
        ResBlock=Bottleneck,
        layer_list=stage_depths,
        num_classes=num_classes,
        num_channels=channels,
    )
class LSTMPyTorch(nn.Module):
    """Two stacked unidirectional LSTMs followed by a 7-way linear layer.

    Expects batch-first sequences of 512-dimensional feature vectors and
    returns one 7-dimensional output per sequence, computed from the last
    time step. No softmax is applied.
    """

    def __init__(self):
        super().__init__()
        self.lstm1 = nn.LSTM(
            input_size=512, hidden_size=512, batch_first=True, bidirectional=False
        )
        self.lstm2 = nn.LSTM(
            input_size=512, hidden_size=256, batch_first=True, bidirectional=False
        )
        self.fc = nn.Linear(256, 7)

    def forward(self, x):
        """Map ``x`` of shape [batch, seq_len, 512] to outputs of shape [batch, 7]."""
        first_pass, _ = self.lstm1(x)
        second_pass, _ = self.lstm2(first_pass)
        # Only the final time step feeds the output layer.
        last_step = second_pass[:, -1]
        return self.fc(last_step)
class ExprModelV3(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 backbone with two transformer layers and an 8-output head.

    The backbone output goes through two ``TransformerLayer`` blocks, is
    collapsed along time by a small 1-D convolutional stack ending in an
    adaptive average pool of size 1, and is projected to 8 values by a linear
    layer. On construction every backbone parameter is frozen except those of
    the last four encoder layers.

    Args:
        config: Configuration passed to ``Wav2Vec2PreTrainedModel`` and
            ``Wav2Vec2Model``.
    """

    def __init__(self, config) -> None:
        super().__init__(config)
        self.config = config
        # Feature width used by every layer after the backbone.
        # NOTE(review): assumes the backbone's hidden size is 1024 -- confirm
        # against the config in use.
        self.f_size = 1024

        self.wav2vec2 = Wav2Vec2Model(config)

        self.tl1 = TransformerLayer(
            input_dim=1024, num_heads=32, dropout=0.1, positional_encoding=True
        )
        self.tl2 = TransformerLayer(
            input_dim=1024, num_heads=16, dropout=0.1, positional_encoding=True
        )

        # Reduces [batch, f_size, time] to [batch, f_size, 1].
        self.time_downsample = nn.Sequential(
            nn.Conv1d(self.f_size, self.f_size, kernel_size=5, stride=3, dilation=2),
            nn.BatchNorm1d(self.f_size),
            nn.MaxPool1d(5),
            nn.ReLU(),
            nn.Conv1d(self.f_size, self.f_size, kernel_size=3),
            nn.BatchNorm1d(self.f_size),
            nn.AdaptiveAvgPool1d(1),
            nn.ReLU(),
        )
        self.feature_downsample = nn.Linear(self.f_size, 8)

        self.init_weights()
        self.unfreeze_last_n_blocks(4)

    def freeze_conv_only(self):
        """Disable gradients for the backbone's convolutional feature extractor."""
        conv_stack = self.wav2vec2.feature_extractor.conv_layers
        for weight in conv_stack.parameters():
            weight.requires_grad = False

    def unfreeze_last_n_blocks(self, num_blocks: int) -> None:
        """Freeze the whole backbone, then unfreeze its last encoder layers.

        Args:
            num_blocks: How many encoder layers, counted from the end, stay
                trainable.
        """
        # Start from a fully frozen backbone.
        for weight in self.wav2vec2.parameters():
            weight.requires_grad = False

        # Re-enable gradients walking backwards from the final encoder layer.
        encoder_layers = self.wav2vec2.encoder.layers
        for offset in range(1, num_blocks + 1):
            for weight in encoder_layers[-offset].parameters():
                weight.requires_grad = True

    def forward(self, x):
        """Return the 8 output values for input ``x``.

        ``x`` is passed unchanged to the Wav2Vec2 backbone.
        """
        # Element 0 of the backbone output is used as the frame features;
        # presumably [batch, time, f_size] -- TODO confirm.
        hidden = self.wav2vec2(x)[0]
        hidden = self.tl1(query=hidden, key=hidden, value=hidden)
        hidden = self.tl2(query=hidden, key=hidden, value=hidden)

        # Conv1d expects channels before time.
        channels_first = hidden.permute(0, 2, 1)
        pooled = self.time_downsample(channels_first)

        # NOTE(review): squeeze() without a dim removes every size-1 axis, so
        # a batch of one also loses its batch axis and the output is [8]
        # instead of [1, 8]. Kept as-is because callers may rely on it.
        pooled = pooled.squeeze()
        return self.feature_downsample(pooled)
class ScaledDotProductAttention_MultiHead(nn.Module):
    """Scaled dot-product attention over per-head tensors.

    Computes ``softmax(Q @ K^T / sqrt(dim)) @ V`` where ``dim`` is the size
    of the last axis of ``key``. Masking is not implemented.
    """

    def __init__(self):
        super().__init__()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, query, key, value, mask=None):
        """Apply attention.

        Args:
            query: Tensor of shape [batch_size, num_heads, len_query, dim].
            key: Tensor of shape [batch_size, num_heads, len_key, dim].
            value: Tensor of shape [batch_size, num_heads, len_key, dim].
            mask: Must be ``None``; masking is not supported.

        Returns:
            Tuple ``(attended, attention_weights)`` where ``attended`` has
            shape [batch_size, num_heads, len_query, dim] and
            ``attention_weights`` has shape
            [batch_size, num_heads, len_query, len_key].

        Raises:
            ValueError: If ``mask`` is not ``None``.
        """
        # Reject masks up front; a single check is enough (the original
        # repeated it after the score computation, where it was unreachable).
        if mask is not None:
            raise ValueError("Mask is not supported yet")

        emb_dim = key.shape[-1]
        # Similarity of every query with every key, scaled by sqrt(dim).
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(emb_dim)
        # Normalise over the key axis.
        attention_weights = self.softmax(scores)
        # Weighted sum of the values.
        value = torch.matmul(attention_weights, value)
        return value, attention_weights
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Computes ``layer_2(relu(dropout(layer_1(x))))``.

    Args:
        input_dim: Size of the input and output feature dimension.
        hidden_dim: Size of the hidden layer.
        dropout: Dropout probability applied after the first linear layer, or
            ``None`` to disable dropout. ``EncoderLayer`` and
            ``TransformerLayer`` forward their ``Optional[float]`` dropout
            argument here, so ``None`` must be accepted.
    """

    def __init__(self, input_dim, hidden_dim, dropout: Optional[float] = 0.1):
        super().__init__()
        self.layer_1 = nn.Linear(input_dim, hidden_dim)
        self.layer_2 = nn.Linear(hidden_dim, input_dim)
        # Not used in forward(); kept so the module's parameters (and thus
        # state_dict keys) stay unchanged for existing checkpoints.
        self.layer_norm = nn.LayerNorm(input_dim)
        # nn.Dropout(None) raises TypeError, so fall back to a no-op module.
        self.dropout = nn.Dropout(dropout) if dropout is not None else nn.Identity()

    def forward(self, x):
        """Apply the feed-forward network; the output has the shape of ``x``."""
        x = self.layer_1(x)
        x = self.dropout(x)
        x = F.relu(x)
        x = self.layer_2(x)
        return x
class Add_and_Norm(nn.Module):
    """Residual connection followed by layer normalisation.

    Computes ``layer_norm(dropout(x1) + residual)``; the dropout step is
    skipped entirely when the module was built with ``dropout=None``.

    Args:
        input_dim: Size of the normalised (last) dimension.
        dropout: Dropout probability for ``x1``, or ``None`` for no dropout.
    """

    def __init__(self, input_dim, dropout: Optional[float] = 0.1):
        super().__init__()
        self.layer_norm = nn.LayerNorm(input_dim)
        # The attribute only exists when dropout was requested.
        if dropout is not None:
            self.dropout = nn.Dropout(dropout)

    def forward(self, x1, residual):
        """Return the normalised sum of ``x1`` (after optional dropout) and ``residual``."""
        # Apply dropout if needed.
        dropout = getattr(self, "dropout", None)
        branch = x1 if dropout is None else dropout(x1)
        # Add, then normalise.
        return self.layer_norm(branch + residual)
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with bias-free projections.

    Queries, keys and values are linearly projected, split into ``num_heads``
    heads of size ``input_dim // num_heads``, attended per head, concatenated
    and passed through a final linear layer.

    Args:
        input_dim: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``input_dim``.
        dropout: Dropout probability or ``None``. A dropout module is created
            when a value is given, but forward() does not apply it.

    Raises:
        ValueError: If ``input_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, input_dim, num_heads, dropout: Optional[float] = 0.1):
        super().__init__()
        self.input_dim = input_dim
        self.num_heads = num_heads
        if input_dim % num_heads != 0:
            raise ValueError("input_dim must be divisible by num_heads")
        self.head_dim = input_dim // num_heads
        self.dropout = dropout
        # Projection weights.
        self.query_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False)
        self.keys_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False)
        self.values_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False)
        self.ff_layer_after_concat = nn.Linear(
            self.num_heads * self.head_dim, input_dim, bias=False
        )
        self.attention = ScaledDotProductAttention_MultiHead()
        # NOTE(review): this dropout module is never used in forward() --
        # confirm whether that is intended.
        if self.dropout is not None:
            self.dropout = nn.Dropout(dropout)

    def _split_heads(self, projected, batch_size, seq_len):
        """Reshape [batch, seq_len, heads * dim] to [batch, heads, seq_len, dim]."""
        return projected.view(
            batch_size, seq_len, self.num_heads, self.head_dim
        ).transpose(1, 2)

    def forward(self, queries, keys, values, mask=None):
        """Attend ``queries`` over ``keys``/``values``.

        Args:
            queries: Tensor of shape [batch_size, len_query, input_dim].
            keys: Tensor of shape [batch_size, len_keys, input_dim].
            values: Tensor of shape [batch_size, len_keys, input_dim].
            mask: Forwarded to the attention module.

        Returns:
            Tensor of shape [batch_size, len_query, input_dim].
        """
        batch_size = queries.size(0)
        len_query, len_keys, len_values = queries.size(1), keys.size(1), values.size(1)

        # Linear projections, then split into heads.
        queries = self._split_heads(self.query_w(queries), batch_size, len_query)
        keys = self._split_heads(self.keys_w(keys), batch_size, len_keys)
        values = self._split_heads(self.values_w(values), batch_size, len_values)

        # Attention itself; the result has one row per *query* position:
        # [batch_size, num_heads, len_query, head_dim].
        values, attention_weights = self.attention(queries, keys, values, mask=mask)

        # Concatenate the heads. The sequence axis here is the query length;
        # reshaping with len_values (as before) fails whenever the query and
        # key/value sequences differ in length.
        out = (
            values.transpose(1, 2)
            .contiguous()
            .view(batch_size, len_query, self.num_heads * self.head_dim)
        )  # [batch_size, len_query, num_heads * head_dim = input_dim]

        # Final linear layer.
        out = self.ff_layer_after_concat(out)
        return out
class EncoderLayer(nn.Module):
    """Self-attention encoder block with post-norm residual connections.

    Applies, in order: optional positional encoding, multi-head
    self-attention, add & norm, position-wise feed-forward, add & norm.

    Args:
        input_dim: Feature size of the input and output.
        num_heads: Number of attention heads.
        dropout: Dropout probability forwarded to the sub-layers.
        positional_encoding: Whether to add a positional encoding to the
            input before attention.
    """

    def __init__(
        self,
        input_dim,
        num_heads,
        dropout: Optional[float] = 0.1,
        positional_encoding: bool = True,
    ):
        super().__init__()
        self.input_dim = input_dim
        self.num_heads = num_heads
        self.head_dim = input_dim // num_heads
        self.dropout = dropout

        # Sub-layers.
        self.self_attention = MultiHeadAttention(input_dim, num_heads, dropout=dropout)
        self.feed_forward = PositionWiseFeedForward(
            input_dim, input_dim, dropout=dropout
        )
        self.add_norm_after_attention = Add_and_Norm(input_dim, dropout=dropout)
        self.add_norm_after_ff = Add_and_Norm(input_dim, dropout=dropout)

        # Holds the encoding module when enabled, otherwise the falsy flag.
        self.positional_encoding = (
            PositionalEncoding(input_dim)
            if positional_encoding
            else positional_encoding
        )

    def forward(self, x):
        """Encode ``x`` of shape [batch_size, seq_len, input_dim]; shape is preserved."""
        if self.positional_encoding:
            x = self.positional_encoding(x)

        # Multi-head self-attention with residual connection.
        attended = self.self_attention(x, x, x)
        attended = self.add_norm_after_attention(attended, x)

        # Feed-forward with residual connection.
        transformed = self.feed_forward(attended)
        return self.add_norm_after_ff(transformed, attended)
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to batch-first sequences.

    Even feature indices hold ``sin(position * div_term)`` and odd indices
    hold ``cos(position * div_term)``. The table is precomputed for
    ``max_len`` positions and stored as the buffer ``pe`` of shape
    [1, max_len, d_model]. Dropout is applied to the sum.

    Args:
        d_model: Feature size of the inputs (may be odd).
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions to precompute; inputs must not be longer.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(1, max_len, d_model)  # [batch_size=1, seq_len, embedding_dim]
        pe[0, :, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd index than even index, so
        # drop the surplus cosine column instead of failing on a shape mismatch.
        pe[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer("pe", pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor, shape [batch_size, seq_len, embedding_dim]

        Returns:
            Tensor of the same shape: ``dropout(x + pe[:, :seq_len])``.
        """
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)
class TransformerLayer(nn.Module):
    """Attention block with separate key, value and query inputs.

    Applies, in order: optional positional encoding of each input, multi-head
    attention, add & norm (residual taken from the query), position-wise
    feed-forward, add & norm.

    Args:
        input_dim: Feature size of the inputs and of the output.
        num_heads: Number of attention heads.
        dropout: Dropout probability forwarded to the sub-layers.
        positional_encoding: Whether to add a positional encoding to key,
            value and query before attention.
    """

    def __init__(
        self,
        input_dim,
        num_heads,
        dropout: Optional[float] = 0.1,
        positional_encoding: bool = True,
    ):
        super().__init__()
        self.input_dim = input_dim
        self.num_heads = num_heads
        self.head_dim = input_dim // num_heads
        self.dropout = dropout

        # Sub-layers.
        self.self_attention = MultiHeadAttention(input_dim, num_heads, dropout=dropout)
        self.feed_forward = PositionWiseFeedForward(
            input_dim, input_dim, dropout=dropout
        )
        self.add_norm_after_attention = Add_and_Norm(input_dim, dropout=dropout)
        self.add_norm_after_ff = Add_and_Norm(input_dim, dropout=dropout)

        # Holds the encoding module when enabled, otherwise the falsy flag.
        self.positional_encoding = (
            PositionalEncoding(input_dim)
            if positional_encoding
            else positional_encoding
        )

    def forward(self, key, value, query, mask=None):
        """Run the block.

        Note the positional parameter order: ``key``, ``value``, ``query``.

        Args:
            key: Tensor of shape [batch_size, seq_len, input_dim].
            value: Tensor of shape [batch_size, seq_len, input_dim].
            query: Tensor of shape [batch_size, seq_len, input_dim].
            mask: Forwarded to the attention module.

        Returns:
            Tensor with the same shape as ``query``.
        """
        if self.positional_encoding:
            # Encoded one at a time, in this order, so each input gets its
            # own pass through the encoding's dropout.
            key, value, query = [
                self.positional_encoding(tensor) for tensor in (key, value, query)
            ]

        # Multi-head attention with residual connection from the query.
        attended = self.self_attention(
            queries=query, keys=key, values=value, mask=mask
        )
        attended = self.add_norm_after_attention(attended, query)

        # Feed-forward with residual connection.
        transformed = self.feed_forward(attended)
        return self.add_norm_after_ff(transformed, attended)