|
import inspect |
|
from typing import List |
|
|
|
import os |
|
os.environ["KERAS_BACKEND"] = "torch" |
|
import keras

from keras import Model
|
from keras import optimizers |
|
from keras import ops as K |
|
from keras import config as KK |
|
|
|
from keras import layers |
|
from keras.layers import Input, Layer, Conv1D, Dense, BatchNormalization, LayerNormalization, Activation, SpatialDropout1D, Lambda |
|
|
|
|
|
def is_power_of_two(num: int): |
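    """Return True if `num` is a power of two, using the n & (n - 1) bit trick."""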
|
return num != 0 and ((num & (num - 1)) == 0) |
|
|
|
|
|
def adjust_dilations(dilations: list): |
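    """Return the dilations unchanged if they are all powers of two; otherwise treat each
    value i as an exponent and map it to 2 ** i."""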
|
if all([is_power_of_two(i) for i in dilations]): |
|
return dilations |
|
else: |
|
new_dilations = [2 ** i for i in dilations] |
|
return new_dilations |
|
|
|
|
|
class ResidualBlock(Layer): |
|
|
|
def __init__(self, |
|
dilation_rate: int, |
|
nb_filters: int, |
|
kernel_size: int, |
|
padding: str, |
|
activation: str = 'relu', |
|
dropout_rate: float = 0, |
|
kernel_initializer: str = 'he_normal', |
|
use_batch_norm: bool = False, |
|
use_layer_norm: bool = False, |
|
use_weight_norm: bool = False, |
|
**kwargs): |
|
"""Defines the residual block for the WaveNet TCN |
|
        Args:
|
dilation_rate: The dilation power of 2 we are using for this residual block |
|
nb_filters: The number of convolutional filters to use in this block |
|
kernel_size: The size of the convolutional kernel |
|
padding: The padding used in the convolutional layers, 'same' or 'causal'. |
|
activation: The final activation used in o = Activation(x + F(x)) |
|
dropout_rate: Float between 0 and 1. Fraction of the input units to drop. |
|
kernel_initializer: Initializer for the kernel weights matrix (Conv1D). |
|
use_batch_norm: Whether to use batch normalization in the residual layers or not. |
|
use_layer_norm: Whether to use layer normalization in the residual layers or not. |
|
use_weight_norm: Whether to use weight normalization in the residual layers or not. |
|
kwargs: Any initializers for Layer class. |
|
""" |
|
|
|
self.dilation_rate = dilation_rate |
|
self.nb_filters = nb_filters |
|
self.kernel_size = kernel_size |
|
self.padding = padding |
|
self.activation = activation |
|
self.dropout_rate = dropout_rate |
|
self.use_batch_norm = use_batch_norm |
|
self.use_layer_norm = use_layer_norm |
|
self.use_weight_norm = use_weight_norm |
|
self.kernel_initializer = kernel_initializer |
|
self.layers = [] |
|
self.shape_match_conv = None |
|
self.res_output_shape = None |
|
self.final_activation = None |
|
|
|
super(ResidualBlock, self).__init__(**kwargs) |
|
|
|
def _build_layer(self, layer): |
|
"""Helper function for building layer |
|
Args: |
|
            layer: Appends the layer to the internal layer list and builds it based on the current output
                shape of the ResidualBlock. Updates the current output shape.
|
""" |
|
self.layers.append(layer) |
|
self.layers[-1].build(self.res_output_shape) |
|
self.res_output_shape = self.layers[-1].compute_output_shape(self.res_output_shape) |
|
|
|
def build(self, input_shape): |
|
|
|
|
|
self.layers = [] |
|
self.res_output_shape = input_shape |
|
|
|
for k in range(2): |
|
name = 'conv1D_{}'.format(k) |
|
|
|
conv = Conv1D( |
|
filters=self.nb_filters, |
|
kernel_size=self.kernel_size, |
|
dilation_rate=self.dilation_rate, |
|
padding=self.padding, |
|
name=name, |
|
kernel_initializer=self.kernel_initializer |
|
) |
|
if self.use_weight_norm: |
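                    # Weight normalization relies on the external tensorflow_addons package,
                    # which may not be available (or compatible) with the Keras 3 / torch
                    # backend configured at the top of this module.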
|
                    from tensorflow_addons.layers import WeightNormalization
                    conv = WeightNormalization(conv)
|
self._build_layer(conv) |
|
|
|
|
|
if self.use_batch_norm: |
|
self._build_layer(BatchNormalization()) |
|
elif self.use_layer_norm: |
|
self._build_layer(LayerNormalization()) |
|
            elif self.use_weight_norm:
                pass  # WeightNormalization (applied above) already normalizes; nothing to add here.
|
|
|
|
|
self._build_layer(Activation(self.activation, name='Act_Conv1D_{}'.format(k))) |
|
self._build_layer(SpatialDropout1D(rate=self.dropout_rate, name='SDropout_{}'.format(k))) |
|
|
|
if self.nb_filters != input_shape[-1]: |
|
|
|
name = 'matching_conv1D' |
|
|
|
|
|
|
|
self.shape_match_conv = Conv1D( |
|
filters=self.nb_filters, |
|
kernel_size=1, |
|
padding='same', |
|
name=name, |
|
kernel_initializer=self.kernel_initializer |
|
) |
|
else: |
|
name = 'matching_identity' |
|
self.shape_match_conv = Lambda(lambda x: x, name=name) |
|
|
|
|
|
self.shape_match_conv.build(input_shape) |
|
self.res_output_shape = self.shape_match_conv.compute_output_shape(input_shape) |
|
|
|
self._build_layer(Activation(self.activation, name='Act_Conv_Blocks')) |
|
self.final_activation = Activation(self.activation, name='Act_Res_Block') |
|
self.final_activation.build(self.res_output_shape) |
|
|
|
|
|
for layer in self.layers: |
|
self.__setattr__(layer.name, layer) |
|
self.__setattr__(self.shape_match_conv.name, self.shape_match_conv) |
|
self.__setattr__(self.final_activation.name, self.final_activation) |
|
|
|
super(ResidualBlock, self).build(input_shape) |
|
|
|
def call(self, inputs, training=None, **kwargs): |
|
""" |
|
        Returns: A list where the first element is the residual output tensor, and the second
            is the skip connection tensor.
        """
        x1 = inputs
|
for layer in self.layers: |
|
training_flag = 'training' in dict(inspect.signature(layer.call).parameters) |
|
x1 = layer(x1, training=training) if training_flag else layer(x1) |
|
x2 = self.shape_match_conv(inputs) |
|
x1_x2 = self.final_activation(layers.add([x2, x1], name='Add_Res')) |
|
return [x1_x2, x1] |
|
|
|
def compute_output_shape(self, input_shape): |
|
return [self.res_output_shape, self.res_output_shape] |
|
|
|
|
|
class TCN(Layer): |
|
"""Creates a TCN layer. |
|
Input shape: |
|
A tensor of shape (batch_size, timesteps, input_dim). |
|
Args: |
|
nb_filters: The number of filters to use in the convolutional layers. Can be a list. |
|
kernel_size: The size of the kernel to use in each convolutional layer. |
|
dilations: The list of the dilations. Example is: [1, 2, 4, 8, 16, 32, 64]. |
|
nb_stacks : The number of stacks of residual blocks to use. |
|
padding: The padding to use in the convolutional layers, 'causal' or 'same'. |
|
        use_skip_connections: Boolean. If we want to add skip connections from input to each residual block.
|
return_sequences: Boolean. Whether to return the last output in the output sequence, or the full sequence. |
|
activation: The activation used in the residual blocks o = Activation(x + F(x)). |
|
dropout_rate: Float between 0 and 1. Fraction of the input units to drop. |
|
kernel_initializer: Initializer for the kernel weights matrix (Conv1D). |
|
use_batch_norm: Whether to use batch normalization in the residual layers or not. |
|
use_layer_norm: Whether to use layer normalization in the residual layers or not. |
|
use_weight_norm: Whether to use weight normalization in the residual layers or not. |
|
        kwargs: Any other arguments for configuring parent class Layer. For example "name=str", the name of the model.
            Use unique names when using multiple TCN layers.
|
Returns: |
|
A TCN layer. |
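
    Example:
        A minimal usage sketch; the shapes and hyper-parameters below are illustrative only:

            inputs = Input(shape=(None, 4))  # (batch_size, timesteps, input_dim)
            x = TCN(nb_filters=64, kernel_size=3, dilations=[1, 2, 4, 8])(inputs)
            outputs = Dense(1)(x)
            model = Model(inputs, outputs)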
|
""" |
|
|
|
def __init__(self, |
|
nb_filters=256, |
|
kernel_size=5, |
|
nb_stacks=1, |
|
dilations=(1, 2, 4, 8, 16, 32), |
|
padding='causal', |
|
use_skip_connections=True, |
|
dropout_rate=0.0, |
|
return_sequences=False, |
|
activation='relu', |
|
kernel_initializer='he_normal', |
|
use_batch_norm=False, |
|
use_layer_norm=False, |
|
use_weight_norm=False, |
|
**kwargs): |
|
|
self.return_sequences = return_sequences |
|
self.dropout_rate = dropout_rate |
|
self.use_skip_connections = use_skip_connections |
|
self.dilations = dilations |
|
self.nb_stacks = nb_stacks |
|
self.kernel_size = kernel_size |
|
self.nb_filters = nb_filters |
|
self.activation_name = activation |
|
self.padding = padding |
|
self.kernel_initializer = kernel_initializer |
|
self.use_batch_norm = use_batch_norm |
|
self.use_layer_norm = use_layer_norm |
|
self.use_weight_norm = use_weight_norm |
|
self.skip_connections = [] |
|
self.residual_blocks = [] |
|
self.layers_outputs = [] |
|
self.build_output_shape = None |
|
self.slicer_layer = None |
|
self.output_slice_index = None |
|
self.padding_same_and_time_dim_unknown = False |
|
|
|
if self.use_batch_norm + self.use_layer_norm + self.use_weight_norm > 1: |
|
raise ValueError('Only one normalization can be specified at once.') |
|
|
|
if isinstance(self.nb_filters, list): |
|
assert len(self.nb_filters) == len(self.dilations) |
|
if len(set(self.nb_filters)) > 1 and self.use_skip_connections: |
|
raise ValueError('Skip connections are not compatible ' |
|
'with a list of filters, unless they are all equal.') |
|
|
|
        if padding not in ('causal', 'same'):
            raise ValueError("Only 'causal' or 'same' padding are compatible for this layer.")
|
|
|
|
|
super(TCN, self).__init__(**kwargs) |
|
|
|
@property |
|
def receptive_field(self): |
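        """Number of input timesteps visible to the last output timestep.

        Each residual block applies two dilated convolutions, and a convolution with
        dilation d extends the field by (kernel_size - 1) * d, giving
        1 + 2 * (kernel_size - 1) * nb_stacks * sum(dilations).
        """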
|
return 1 + 2 * (self.kernel_size - 1) * self.nb_stacks * sum(self.dilations) |
|
|
|
def build(self, input_shape): |
|
|
|
|
|
self.build_output_shape = input_shape |
|
|
|
|
|
self.residual_blocks = [] |
|
total_num_blocks = self.nb_stacks * len(self.dilations) |
|
if not self.use_skip_connections: |
|
total_num_blocks += 1 |
|
|
|
for s in range(self.nb_stacks): |
|
for i, d in enumerate(self.dilations): |
|
res_block_filters = self.nb_filters[i] if isinstance(self.nb_filters, list) else self.nb_filters |
|
self.residual_blocks.append(ResidualBlock(dilation_rate=d, |
|
nb_filters=res_block_filters, |
|
kernel_size=self.kernel_size, |
|
padding=self.padding, |
|
activation=self.activation_name, |
|
dropout_rate=self.dropout_rate, |
|
use_batch_norm=self.use_batch_norm, |
|
use_layer_norm=self.use_layer_norm, |
|
use_weight_norm=self.use_weight_norm, |
|
kernel_initializer=self.kernel_initializer, |
|
name='residual_block_{}'.format(len(self.residual_blocks)))) |
|
|
|
self.residual_blocks[-1].build(self.build_output_shape) |
|
self.build_output_shape = self.residual_blocks[-1].res_output_shape |
|
|
|
|
|
for layer in self.residual_blocks: |
|
self.__setattr__(layer.name, layer) |
|
|
|
self.output_slice_index = None |
|
        if self.padding == 'same':
            # The stored shape may be a plain tuple (Keras 3) or a TensorShape exposing .as_list().
            time_dim = list(self.build_output_shape)[1] if isinstance(self.build_output_shape, tuple) \
                else self.build_output_shape.as_list()[1]
            if time_dim is not None:
                self.output_slice_index = int(time_dim / 2)
|
else: |
|
|
|
self.padding_same_and_time_dim_unknown = True |
|
|
|
else: |
|
self.output_slice_index = -1 |
|
self.slicer_layer = Lambda(lambda tt: tt[:, self.output_slice_index, :], name='Slice_Output') |
|
|
|
        if isinstance(self.build_output_shape, tuple):
            static = list(self.build_output_shape)
        else:
            static = self.build_output_shape.as_list()
|
self.slicer_layer.build(static) |
|
|
|
def compute_output_shape(self, input_shape): |
|
""" |
|
        Overridden in case Keras calls it before the layer has been built (e.g. for shape inference).
|
""" |
|
if not self.built: |
|
self.build(input_shape) |
|
if not self.return_sequences: |
|
batch_size = self.build_output_shape[0] |
|
batch_size = batch_size.value if hasattr(batch_size, 'value') else batch_size |
|
nb_filters = self.build_output_shape[-1] |
|
return [batch_size, nb_filters] |
|
else: |
|
|
|
return [v.value if hasattr(v, 'value') else v for v in self.build_output_shape] |
|
|
|
def call(self, inputs, training=None, **kwargs): |
|
x = inputs |
|
self.layers_outputs = [x] |
|
self.skip_connections = [] |
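        # Each residual block returns (residual_output, skip_branch); skip branches are
        # collected here and summed after the loop when use_skip_connections is enabled.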
|
        for res_block in self.residual_blocks:
            x, skip_out = res_block(x, training=training)
|
|
|
self.skip_connections.append(skip_out) |
|
self.layers_outputs.append(x) |
|
|
|
if self.use_skip_connections: |
|
x = layers.add(self.skip_connections, name='Add_Skip_Connections') |
|
self.layers_outputs.append(x) |
|
|
|
if not self.return_sequences: |
|
|
|
if self.padding_same_and_time_dim_unknown: |
|
self.output_slice_index = K.shape(self.layers_outputs[-1])[1] // 2 |
|
x = self.slicer_layer(x) |
|
self.layers_outputs.append(x) |
|
return x |
|
|
|
def get_config(self): |
|
""" |
|
        Returns the config of the layer. This is used for saving and loading from a model.
        :return: python dictionary with specs to rebuild the layer
|
""" |
|
config = super(TCN, self).get_config() |
|
config['nb_filters'] = self.nb_filters |
|
config['kernel_size'] = self.kernel_size |
|
config['nb_stacks'] = self.nb_stacks |
|
config['dilations'] = self.dilations |
|
config['padding'] = self.padding |
|
config['use_skip_connections'] = self.use_skip_connections |
|
config['dropout_rate'] = self.dropout_rate |
|
config['return_sequences'] = self.return_sequences |
|
config['activation'] = self.activation_name |
|
config['use_batch_norm'] = self.use_batch_norm |
|
config['use_layer_norm'] = self.use_layer_norm |
|
config['use_weight_norm'] = self.use_weight_norm |
|
config['kernel_initializer'] = self.kernel_initializer |
|
return config |
|
|
|
|
|
def compiled_tcn(num_feat, |
|
num_classes, |
|
nb_filters, |
|
kernel_size, |
|
dilations, |
|
nb_stacks, |
|
max_len, |
|
output_len=1, |
|
padding='causal', |
|
use_skip_connections=False, |
|
return_sequences=True, |
|
regression=False, |
|
dropout_rate=0.05, |
|
name='tcn', |
|
kernel_initializer='he_normal', |
|
activation='relu', |
|
opt='adam', |
|
lr=0.002, |
|
use_batch_norm=False, |
|
use_layer_norm=False, |
|
use_weight_norm=False): |
|
|
|
"""Creates a compiled TCN model for a given task (i.e. regression or classification). |
|
Classification uses a sparse categorical loss. Please input class ids and not one-hot encodings. |
|
Args: |
|
num_feat: The number of features of your input, i.e. the last dimension of: (batch_size, timesteps, input_dim). |
|
num_classes: The size of the final dense layer, how many classes we are predicting. |
|
nb_filters: The number of filters to use in the convolutional layers. |
|
kernel_size: The size of the kernel to use in each convolutional layer. |
|
dilations: The list of the dilations. Example is: [1, 2, 4, 8, 16, 32, 64]. |
|
nb_stacks : The number of stacks of residual blocks to use. |
|
max_len: The maximum sequence length, use None if the sequence length is dynamic. |
|
padding: The padding to use in the convolutional layers. |
|
        use_skip_connections: Boolean. If we want to add skip connections from input to each residual block.
|
return_sequences: Boolean. Whether to return the last output in the output sequence, or the full sequence. |
|
regression: Whether the output should be continuous or discrete. |
|
dropout_rate: Float between 0 and 1. Fraction of the input units to drop. |
|
activation: The activation used in the residual blocks o = Activation(x + F(x)). |
|
name: Name of the model. Useful when having multiple TCN. |
|
kernel_initializer: Initializer for the kernel weights matrix (Conv1D). |
|
opt: Optimizer name. |
|
lr: Learning rate. |
|
use_batch_norm: Whether to use batch normalization in the residual layers or not. |
|
use_layer_norm: Whether to use layer normalization in the residual layers or not. |
|
use_weight_norm: Whether to use weight normalization in the residual layers or not. |
|
Returns: |
|
A compiled keras TCN. |
|
""" |
|
|
|
dilations = adjust_dilations(dilations) |
|
|
|
input_layer = Input(shape=(max_len, num_feat)) |
|
|
|
x = TCN(nb_filters, kernel_size, nb_stacks, dilations, padding, |
|
use_skip_connections, dropout_rate, return_sequences, |
|
activation, kernel_initializer, use_batch_norm, use_layer_norm, |
|
use_weight_norm, name=name)(input_layer) |
|
|
|
print('x.shape=', x.shape) |
|
|
|
def get_opt(): |
|
        if opt == 'adam':
            # Keras 3 optimizers take `learning_rate` rather than the legacy `lr` argument.
            return optimizers.Adam(learning_rate=lr, clipnorm=1.)
        elif opt == 'rmsprop':
            return optimizers.RMSprop(learning_rate=lr, clipnorm=1.)
|
else: |
|
raise Exception('Only Adam and RMSProp are available here') |
|
|
|
if not regression: |
|
|
|
|
x = Dense(num_classes)(x) |
|
x = Activation('softmax')(x) |
|
output_layer = x |
|
        model = Model(input_layer, output_layer)

        def accuracy(y_true, y_pred):
|
|
|
if K.ndim(y_true) == K.ndim(y_pred): |
|
y_true = K.squeeze(y_true, -1) |
|
|
|
y_pred_labels = K.argmax(y_pred, axis=-1) |
|
y_pred_labels = K.cast(y_pred_labels, KK.floatx()) |
|
return K.cast(K.equal(y_true, y_pred_labels), KK.floatx()) |
|
|
|
model.compile(get_opt(), loss='sparse_categorical_crossentropy', metrics=[accuracy]) |
|
else: |
|
|
|
x = Dense(output_len)(x) |
|
x = Activation('linear')(x) |
|
output_layer = x |
|
model = Model(input_layer, output_layer) |
|
model.compile(get_opt(), loss='mean_squared_error') |
|
print('model.x = {}'.format(input_layer.shape)) |
|
print('model.y = {}'.format(output_layer.shape)) |
|
return model |
|
|
|
|
|
def tcn_full_summary(model: Model, expand_residual_blocks=True): |
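    """Prints a summary that expands the layers nested inside TCN and ResidualBlock layers.

    Note: this relies on the private `model._layers` attribute, which is not part of the
    public Keras API and may be absent in recent Keras/TensorFlow versions.
    """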
|
|
|
layers = model._layers.copy() |
|
model._layers.clear() |
|
|
|
for i in range(len(layers)): |
|
if isinstance(layers[i], TCN): |
|
for layer in layers[i]._layers: |
|
if not isinstance(layer, ResidualBlock): |
|
if not hasattr(layer, '__iter__'): |
|
model._layers.append(layer) |
|
else: |
|
if expand_residual_blocks: |
|
for lyr in layer._layers: |
|
if not hasattr(lyr, '__iter__'): |
|
model._layers.append(lyr) |
|
else: |
|
model._layers.append(layer) |
|
else: |
|
model._layers.append(layers[i]) |
|
|
|
model.summary() |
|
|
|
|
|
    # Restore the original (unexpanded) layer list.
    model._layers.clear()
    model._layers.extend(layers)
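

# A minimal, illustrative smoke test of `compiled_tcn` on random data. The shapes,
# hyper-parameters, and the use of numpy here are assumptions for demonstration only;
# they are not part of the library API.
if __name__ == '__main__':
    import numpy as np

    batch_size, timesteps, input_dim = 8, 20, 4
    x_train = np.random.rand(batch_size, timesteps, input_dim).astype('float32')
    y_train = np.random.rand(batch_size, 1).astype('float32')

    # Regression setup: one continuous output per sequence.
    tcn_model = compiled_tcn(num_feat=input_dim,
                             num_classes=0,
                             nb_filters=24,
                             kernel_size=3,
                             dilations=[1, 2, 4],
                             nb_stacks=1,
                             max_len=timesteps,
                             output_len=1,
                             regression=True,
                             return_sequences=False,
                             dropout_rate=0.0)
    tcn_model.fit(x_train, y_train, epochs=1, verbose=0)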