import inspect
from typing import List

import os

os.environ["KERAS_BACKEND"] = "torch"

import keras
from keras import Model
from keras import optimizers
from keras import ops as K
from keras import config as KK
from keras import layers
from keras.layers import Input, Layer, Conv1D, Dense, BatchNormalization, LayerNormalization, Activation, \
    SpatialDropout1D, Lambda


def is_power_of_two(num: int):
    return num != 0 and ((num & (num - 1)) == 0)


def adjust_dilations(dilations: list):
    if all([is_power_of_two(i) for i in dilations]):
        return dilations
    else:
        new_dilations = [2 ** i for i in dilations]
        return new_dilations

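# Quick illustration of the helpers above (not used by the library itself): a dilation list
# that already consists of powers of two is returned unchanged; otherwise each entry is
# treated as an exponent.
#
#   adjust_dilations([1, 2, 4, 8])  # -> [1, 2, 4, 8]
#   adjust_dilations([1, 2, 3])     # -> [2, 4, 8]
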
class ResidualBlock(Layer):

    def __init__(self,
                 dilation_rate: int,
                 nb_filters: int,
                 kernel_size: int,
                 padding: str,
                 activation: str = 'relu',
                 dropout_rate: float = 0,
                 kernel_initializer: str = 'he_normal',
                 use_batch_norm: bool = False,
                 use_layer_norm: bool = False,
                 use_weight_norm: bool = False,
                 **kwargs):
        """Defines the residual block for the WaveNet TCN.

        Args:
            dilation_rate: The dilation power of 2 we are using for this residual block.
            nb_filters: The number of convolutional filters to use in this block.
            kernel_size: The size of the convolutional kernel.
            padding: The padding used in the convolutional layers, 'same' or 'causal'.
            activation: The final activation used in o = Activation(x + F(x)).
            dropout_rate: Float between 0 and 1. Fraction of the input units to drop.
            kernel_initializer: Initializer for the kernel weights matrix (Conv1D).
            use_batch_norm: Whether to use batch normalization in the residual layers or not.
            use_layer_norm: Whether to use layer normalization in the residual layers or not.
            use_weight_norm: Whether to use weight normalization in the residual layers or not.
            kwargs: Any initializers for the Layer class.
        """
        self.dilation_rate = dilation_rate
        self.nb_filters = nb_filters
        self.kernel_size = kernel_size
        self.padding = padding
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm
        self.use_weight_norm = use_weight_norm
        self.kernel_initializer = kernel_initializer
        self.layers = []
        self.shape_match_conv = None
        self.res_output_shape = None
        self.final_activation = None

        super(ResidualBlock, self).__init__(**kwargs)

    def _build_layer(self, layer):
        """Helper function for building a layer.

        Args:
            layer: Appends layer to the internal layer list and builds it based on the current
                   output shape of the ResidualBlock. Updates the current output shape.
        """
        self.layers.append(layer)
        self.layers[-1].build(self.res_output_shape)
        self.res_output_shape = self.layers[-1].compute_output_shape(self.res_output_shape)

    def build(self, input_shape):

        # with K.name_scope(self.name):  # name scope used to make sure weights get unique names
        self.layers = []
        self.res_output_shape = input_shape

        for k in range(2):  # dilated conv block.
            name = 'conv1D_{}'.format(k)
            # with K.name_scope(name):  # name scope used to make sure weights get unique names
            conv = Conv1D(
                filters=self.nb_filters,
                kernel_size=self.kernel_size,
                dilation_rate=self.dilation_rate,
                padding=self.padding,
                name=name,
                kernel_initializer=self.kernel_initializer
            )
            if self.use_weight_norm:
                from tensorflow_addons.layers import WeightNormalization
                # Wrap it. The WeightNormalization API is different from BatchNormalization
                # and LayerNormalization.
                # with K.name_scope('norm_{}'.format(k)):
                conv = WeightNormalization(conv)
            self._build_layer(conv)

            # with K.name_scope('norm_{}'.format(k)):
            if self.use_batch_norm:
                self._build_layer(BatchNormalization())
            elif self.use_layer_norm:
                self._build_layer(LayerNormalization())
            elif self.use_weight_norm:
                pass  # done above.

            # with K.name_scope('act_and_dropout_{}'.format(k)):
            self._build_layer(Activation(self.activation, name='Act_Conv1D_{}'.format(k)))
            self._build_layer(SpatialDropout1D(rate=self.dropout_rate, name='SDropout_{}'.format(k)))

        if self.nb_filters != input_shape[-1]:
            # 1x1 conv to match the shapes (channel dimension).
            name = 'matching_conv1D'
            # with K.name_scope(name):
            # make and build this layer separately because it directly uses input_shape.
            # 1x1 conv.
            self.shape_match_conv = Conv1D(
                filters=self.nb_filters,
                kernel_size=1,
                padding='same',
                name=name,
                kernel_initializer=self.kernel_initializer
            )
        else:
            name = 'matching_identity'
            self.shape_match_conv = Lambda(lambda x: x, name=name)

        # with K.name_scope(name):
        self.shape_match_conv.build(input_shape)
        self.res_output_shape = self.shape_match_conv.compute_output_shape(input_shape)

        self._build_layer(Activation(self.activation, name='Act_Conv_Blocks'))
        self.final_activation = Activation(self.activation, name='Act_Res_Block')
        self.final_activation.build(self.res_output_shape)  # probably isn't necessary

        # this is done to force Keras to add the layers in the list to self._layers
        for layer in self.layers:
            self.__setattr__(layer.name, layer)
        self.__setattr__(self.shape_match_conv.name, self.shape_match_conv)
        self.__setattr__(self.final_activation.name, self.final_activation)

        super(ResidualBlock, self).build(input_shape)  # done to make sure self.built is set True

    def call(self, inputs, training=None, **kwargs):
        """
        Returns: A tuple where the first element is the residual model tensor, and the second
                 is the skip connection tensor.
        """
        # https://arxiv.org/pdf/1803.01271.pdf  page 4, Figure 1 (b).
        # x1: Dilated Conv -> Norm -> Dropout (x2).
        # x2: Residual (1x1 matching conv - optional).
        # Output: x1 + x2.
        # x1 -> connected to skip connections.
        # x1 + x2 -> connected to the next block.
        #
        #          input
        #       x1        x2
        #     conv1D    1x1 Conv1D (optional)
        #      ...
        #     conv1D
        #      ...
        #         x1 + x2
        x1 = inputs
        for layer in self.layers:
            training_flag = 'training' in dict(inspect.signature(layer.call).parameters)
            x1 = layer(x1, training=training) if training_flag else layer(x1)
        x2 = self.shape_match_conv(inputs)
        x1_x2 = self.final_activation(layers.add([x2, x1], name='Add_Res'))
        return [x1_x2, x1]

    def compute_output_shape(self, input_shape):
        return [self.res_output_shape, self.res_output_shape]

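# A minimal sketch of how a single ResidualBlock behaves (illustration only, not part of the
# library API; the shapes and hyper-parameters below are arbitrary). Calling the block on a
# (batch, time, features) tensor returns [residual_output, skip_connection], both of shape
# (batch, time, nb_filters):
#
#   block = ResidualBlock(dilation_rate=2, nb_filters=16, kernel_size=3, padding='causal')
#   residual, skip = block(keras.random.normal((4, 100, 8)))
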
class TCN(Layer):
    """Creates a TCN layer.

    Input shape:
        A tensor of shape (batch_size, timesteps, input_dim).

    Args:
        nb_filters: The number of filters to use in the convolutional layers. Can be a list.
        kernel_size: The size of the kernel to use in each convolutional layer.
        dilations: The list of the dilations. Example is: [1, 2, 4, 8, 16, 32, 64].
        nb_stacks: The number of stacks of residual blocks to use.
        padding: The padding to use in the convolutional layers, 'causal' or 'same'.
        use_skip_connections: Boolean. If we want to add skip connections from input to each residual block.
        return_sequences: Boolean. Whether to return the last output in the output sequence, or the full sequence.
        activation: The activation used in the residual blocks o = Activation(x + F(x)).
        dropout_rate: Float between 0 and 1. Fraction of the input units to drop.
        kernel_initializer: Initializer for the kernel weights matrix (Conv1D).
        use_batch_norm: Whether to use batch normalization in the residual layers or not.
        use_layer_norm: Whether to use layer normalization in the residual layers or not.
        use_weight_norm: Whether to use weight normalization in the residual layers or not.
        kwargs: Any other arguments for configuring the parent class Layer. For example "name=str",
                the name of the model. Use unique names when using multiple TCN layers.

    Returns:
        A TCN layer.
    """

    def __init__(self,
                 nb_filters=256,
                 kernel_size=5,
                 nb_stacks=1,
                 dilations=(1, 2, 4, 8, 16, 32),
                 padding='causal',
                 use_skip_connections=True,
                 dropout_rate=0.0,
                 return_sequences=False,
                 activation='relu',
                 kernel_initializer='he_normal',
                 use_batch_norm=False,
                 use_layer_norm=False,
                 use_weight_norm=False,
                 **kwargs):
        self.return_sequences = return_sequences
        self.dropout_rate = dropout_rate
        self.use_skip_connections = use_skip_connections
        self.dilations = dilations
        self.nb_stacks = nb_stacks
        self.kernel_size = kernel_size
        self.nb_filters = nb_filters
        self.activation_name = activation
        self.padding = padding
        self.kernel_initializer = kernel_initializer
        self.use_batch_norm = use_batch_norm
        self.use_layer_norm = use_layer_norm
        self.use_weight_norm = use_weight_norm
        self.skip_connections = []
        self.residual_blocks = []
        self.layers_outputs = []
        self.build_output_shape = None
        self.slicer_layer = None  # in case return_sequences=False
        self.output_slice_index = None  # in case return_sequences=False
        self.padding_same_and_time_dim_unknown = False  # edge case if padding='same' and time_dim = None

        if self.use_batch_norm + self.use_layer_norm + self.use_weight_norm > 1:
            raise ValueError('Only one normalization can be specified at once.')

        if isinstance(self.nb_filters, list):
            assert len(self.nb_filters) == len(self.dilations)
            if len(set(self.nb_filters)) > 1 and self.use_skip_connections:
                raise ValueError('Skip connections are not compatible '
                                 'with a list of filters, unless they are all equal.')

        if padding != 'causal' and padding != 'same':
            raise ValueError("Only 'causal' or 'same' padding are compatible for this layer.")

        # initialize parent class
        super(TCN, self).__init__(**kwargs)

    @property
    def receptive_field(self):
        return 1 + 2 * (self.kernel_size - 1) * self.nb_stacks * sum(self.dilations)

    def build(self, input_shape):

        # member to hold current output shape of the layer for building purposes
        self.build_output_shape = input_shape

        # list to hold all the member ResidualBlocks
        self.residual_blocks = []
        total_num_blocks = self.nb_stacks * len(self.dilations)
        if not self.use_skip_connections:
            total_num_blocks += 1  # cheap way to do a false case for below

        for s in range(self.nb_stacks):
            for i, d in enumerate(self.dilations):
                res_block_filters = self.nb_filters[i] if isinstance(self.nb_filters, list) else self.nb_filters
                self.residual_blocks.append(ResidualBlock(dilation_rate=d,
                                                          nb_filters=res_block_filters,
                                                          kernel_size=self.kernel_size,
                                                          padding=self.padding,
                                                          activation=self.activation_name,
                                                          dropout_rate=self.dropout_rate,
                                                          use_batch_norm=self.use_batch_norm,
                                                          use_layer_norm=self.use_layer_norm,
                                                          use_weight_norm=self.use_weight_norm,
                                                          kernel_initializer=self.kernel_initializer,
                                                          name='residual_block_{}'.format(len(self.residual_blocks))))
                # build newest residual block
                self.residual_blocks[-1].build(self.build_output_shape)
                self.build_output_shape = self.residual_blocks[-1].res_output_shape

        # this is done to force Keras to add the layers in the list to self._layers
        for layer in self.residual_blocks:
            self.__setattr__(layer.name, layer)

        self.output_slice_index = None
        if self.padding == 'same':
            # Keras 3 passes shapes as plain tuples; keep the .as_list() path for backward compatibility.
            if isinstance(self.build_output_shape, tuple):
                time = self.build_output_shape[1]
            else:
                time = self.build_output_shape.as_list()[1]
            if time is not None:  # if the time dimension is defined, e.g. shape = (bs, 500, input_dim).
                self.output_slice_index = int(time / 2)
            else:
                # It will be known at call time. c.f. self.call.
                self.padding_same_and_time_dim_unknown = True
        else:
            self.output_slice_index = -1  # causal case.

        self.slicer_layer = Lambda(lambda tt: tt[:, self.output_slice_index, :], name='Slice_Output')

        if type(self.build_output_shape) == tuple:
            static = list(self.build_output_shape)
        else:
            static = self.build_output_shape.as_list()
        self.slicer_layer.build(static)

    def compute_output_shape(self, input_shape):
        """
        Overridden in case Keras uses it somewhere... no idea. Just trying to avoid future errors.
        """
        if not self.built:
            self.build(input_shape)
        if not self.return_sequences:
            batch_size = self.build_output_shape[0]
            batch_size = batch_size.value if hasattr(batch_size, 'value') else batch_size
            nb_filters = self.build_output_shape[-1]
            return [batch_size, nb_filters]
        else:
            # Compatibility tensorflow 1.x
            return [v.value if hasattr(v, 'value') else v for v in self.build_output_shape]

    def call(self, inputs, training=None, **kwargs):
        x = inputs
        self.layers_outputs = [x]
        self.skip_connections = []
        for res_block in self.residual_blocks:
            # try:
            #     x, skip_out = res_block(x, training=training)
            # except TypeError:  # compatibility with tensorflow 1.x
            #     x, skip_out = res_block(K.cast(x, 'float32'), training=training)
            x, skip_out = res_block(x, training=training)
            self.skip_connections.append(skip_out)
            self.layers_outputs.append(x)

        if self.use_skip_connections:
            x = layers.add(self.skip_connections, name='Add_Skip_Connections')
            self.layers_outputs.append(x)

        if not self.return_sequences:
            # case: time dimension is unknown. e.g. (bs, None, input_dim).
            if self.padding_same_and_time_dim_unknown:
                self.output_slice_index = K.shape(self.layers_outputs[-1])[1] // 2
            x = self.slicer_layer(x)
            self.layers_outputs.append(x)
        return x

    def get_config(self):
        """
        Returns the config of the layer. This is used for saving and loading from a model.
        :return: python dictionary with specs to rebuild the layer
        """
        config = super(TCN, self).get_config()
        config['nb_filters'] = self.nb_filters
        config['kernel_size'] = self.kernel_size
        config['nb_stacks'] = self.nb_stacks
        config['dilations'] = self.dilations
        config['padding'] = self.padding
        config['use_skip_connections'] = self.use_skip_connections
        config['dropout_rate'] = self.dropout_rate
        config['return_sequences'] = self.return_sequences
        config['activation'] = self.activation_name
        config['use_batch_norm'] = self.use_batch_norm
        config['use_layer_norm'] = self.use_layer_norm
        config['use_weight_norm'] = self.use_weight_norm
        config['kernel_initializer'] = self.kernel_initializer
        return config

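# A minimal usage sketch for the TCN layer defined above (illustrative only; shapes and
# hyper-parameters are arbitrary). With kernel_size=3, nb_stacks=1 and dilations [1, 2, 4, 8],
# the receptive field is 1 + 2 * (3 - 1) * 1 * (1 + 2 + 4 + 8) = 61 time steps:
#
#   inputs = Input(shape=(None, 20))  # (batch, time, features)
#   x = TCN(nb_filters=64, kernel_size=3, dilations=[1, 2, 4, 8])(inputs)
#   outputs = Dense(1, activation='sigmoid')(x)
#   model = Model(inputs, outputs)
#   model.compile(optimizer='adam', loss='binary_crossentropy')
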
def compiled_tcn(num_feat,  # type: int
                 num_classes,  # type: int
                 nb_filters,  # type: int
                 kernel_size,  # type: int
                 dilations,  # type: List[int]
                 nb_stacks,  # type: int
                 max_len,  # type: int
                 output_len=1,  # type: int
                 padding='causal',  # type: str
                 use_skip_connections=False,  # type: bool
                 return_sequences=True,
                 regression=False,  # type: bool
                 dropout_rate=0.05,  # type: float
                 name='tcn',  # type: str
                 kernel_initializer='he_normal',  # type: str
                 activation='relu',  # type: str
                 opt='adam',
                 lr=0.002,
                 use_batch_norm=False,
                 use_layer_norm=False,
                 use_weight_norm=False):
    # type: (...) -> Model
    """Creates a compiled TCN model for a given task (i.e. regression or classification).
    Classification uses a sparse categorical loss. Please input class ids and not one-hot encodings.

    Args:
        num_feat: The number of features of your input, i.e. the last dimension of: (batch_size, timesteps, input_dim).
        num_classes: The size of the final dense layer, how many classes we are predicting.
        nb_filters: The number of filters to use in the convolutional layers.
        kernel_size: The size of the kernel to use in each convolutional layer.
        dilations: The list of the dilations. Example is: [1, 2, 4, 8, 16, 32, 64].
        nb_stacks: The number of stacks of residual blocks to use.
        max_len: The maximum sequence length, use None if the sequence length is dynamic.
        padding: The padding to use in the convolutional layers.
        use_skip_connections: Boolean. If we want to add skip connections from input to each residual block.
        return_sequences: Boolean. Whether to return the last output in the output sequence, or the full sequence.
        regression: Whether the output should be continuous or discrete.
        dropout_rate: Float between 0 and 1. Fraction of the input units to drop.
        activation: The activation used in the residual blocks o = Activation(x + F(x)).
        name: Name of the model. Useful when having multiple TCN layers.
        kernel_initializer: Initializer for the kernel weights matrix (Conv1D).
        opt: Optimizer name.
        lr: Learning rate.
        use_batch_norm: Whether to use batch normalization in the residual layers or not.
        use_layer_norm: Whether to use layer normalization in the residual layers or not.
        use_weight_norm: Whether to use weight normalization in the residual layers or not.

    Returns:
        A compiled keras TCN.
    """
    dilations = adjust_dilations(dilations)

    input_layer = Input(shape=(max_len, num_feat))

    x = TCN(nb_filters, kernel_size, nb_stacks, dilations, padding,
            use_skip_connections, dropout_rate, return_sequences,
            activation, kernel_initializer, use_batch_norm, use_layer_norm,
            use_weight_norm, name=name)(input_layer)

    print('x.shape=', x.shape)

    def get_opt():
        # Keras 3 optimizers take `learning_rate` (the old `lr` alias was removed).
        if opt == 'adam':
            return optimizers.Adam(learning_rate=lr, clipnorm=1.)
        elif opt == 'rmsprop':
            return optimizers.RMSprop(learning_rate=lr, clipnorm=1.)
        else:
            raise Exception('Only Adam and RMSProp are available here')

    if not regression:
        # classification
        x = Dense(num_classes)(x)
        x = Activation('softmax')(x)
        output_layer = x
        model = Model(input_layer, output_layer)

        # https://github.com/keras-team/keras/pull/11373
        # It's now in Keras@master but still not available with pip.
        # TODO remove later.
        def accuracy(y_true, y_pred):
            # reshape in case it's in shape (num_samples, 1) instead of (num_samples,)
            if K.ndim(y_true) == K.ndim(y_pred):
                y_true = K.squeeze(y_true, -1)
            # convert dense predictions to labels
            y_pred_labels = K.argmax(y_pred, axis=-1)
            y_pred_labels = K.cast(y_pred_labels, KK.floatx())
            return K.cast(K.equal(y_true, y_pred_labels), KK.floatx())

        model.compile(get_opt(), loss='sparse_categorical_crossentropy', metrics=[accuracy])
    else:
        # regression
        x = Dense(output_len)(x)
        x = Activation('linear')(x)
        output_layer = x
        model = Model(input_layer, output_layer)
        model.compile(get_opt(), loss='mean_squared_error')
    print('model.x = {}'.format(input_layer.shape))
    print('model.y = {}'.format(output_layer.shape))
    return model


def tcn_full_summary(model: Model, expand_residual_blocks=True):
    # Note: this relies on the private `model._layers` attribute, which older tf.keras
    # versions exposed; newer Keras versions may not provide it.
    layers = model._layers.copy()  # store existing layers
    model._layers.clear()  # clear layers

    for i in range(len(layers)):
        if isinstance(layers[i], TCN):
            for layer in layers[i]._layers:
                if not isinstance(layer, ResidualBlock):
                    if not hasattr(layer, '__iter__'):
                        model._layers.append(layer)
                else:
                    if expand_residual_blocks:
                        for lyr in layer._layers:
                            if not hasattr(lyr, '__iter__'):
                                model._layers.append(lyr)
                    else:
                        model._layers.append(layer)
        else:
            model._layers.append(layers[i])

    model.summary()  # print summary

    # restore original layers
    model._layers.clear()
    [model._layers.append(lyr) for lyr in layers]
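

if __name__ == '__main__':
    # Smoke-test sketch for compiled_tcn (assumes numpy is installed; the data is random,
    # so the fit only demonstrates that the model builds and trains, nothing more).
    import numpy as np

    batch, timesteps, features, n_classes = 32, 64, 4, 3
    x_train = np.random.randn(batch, timesteps, features).astype('float32')
    y_train = np.random.randint(0, n_classes, size=(batch, 1)).astype('float32')

    tcn_model = compiled_tcn(num_feat=features, num_classes=n_classes, nb_filters=24,
                             kernel_size=3, dilations=[1, 2, 4, 8], nb_stacks=1,
                             max_len=timesteps, use_skip_connections=True,
                             return_sequences=False)
    tcn_model.summary()
    tcn_model.fit(x_train, y_train, epochs=1, batch_size=16)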