import numpy as np import scipy.stats as st import tensorflow tensorflow.compat.v1.disable_v2_behavior() tf = tensorflow.compat.v1 def layer(op): def layer_decorated(self, *args, **kwargs): # Automatically set a name if not provided. name = kwargs.setdefault('name', self.get_unique_name(op.__name__)) # Figure out the layer inputs. if len(self.terminals) == 0: raise RuntimeError('No input variables found for layer %s.' % name) elif len(self.terminals) == 1: layer_input = self.terminals[0] else: layer_input = list(self.terminals) # Perform the operation and get the output. layer_output = op(self, layer_input, *args, **kwargs) # Add to layer LUT. self.layers[name] = layer_output # This output is now the input for the next layer. self.feed(layer_output) # Return self for chained calls. return self return layer_decorated class Smoother(object): def __init__(self, inputs, filter_size, sigma): self.inputs = inputs self.terminals = [] self.layers = dict(inputs) self.filter_size = filter_size self.sigma = sigma self.setup() def setup(self): (self.feed('data') .conv(name = 'smoothing')) def get_unique_name(self, prefix): ident = sum(t.startswith(prefix) for t, _ in self.layers.items()) + 1 return '%s_%d' % (prefix, ident) def feed(self, *args): assert len(args) != 0 self.terminals = [] for fed_layer in args: if isinstance(fed_layer, str): try: fed_layer = self.layers[fed_layer] except KeyError: raise KeyError('Unknown layer name fed: %s' % fed_layer) self.terminals.append(fed_layer) return self def gauss_kernel(self, kernlen=21, nsig=3, channels=1): interval = (2*nsig+1.)/(kernlen) x = np.linspace(-nsig-interval/2., nsig+interval/2., kernlen+1) kern1d = np.diff(st.norm.cdf(x)) kernel_raw = np.sqrt(np.outer(kern1d, kern1d)) kernel = kernel_raw/kernel_raw.sum() out_filter = np.array(kernel, dtype = np.float32) out_filter = out_filter.reshape((int(kernlen), int(kernlen), 1, 1)) out_filter = np.repeat(out_filter, channels, axis = 2) return out_filter def make_gauss_var(self, name, size, sigma, c_i): kernel = self.gauss_kernel(size, sigma, c_i) var = tf.Variable(tf.convert_to_tensor(kernel), name=name) return var def get_output(self): '''Returns the smoother output.''' return self.terminals[-1] @layer def conv(self, input, name, padding='SAME'): # Get the number of channels in the input c_i = input.get_shape().as_list()[3] # Convolution for a given input and kernel convolve = lambda i, k: tf.nn.depthwise_conv2d(i, k, [1, 1, 1, 1], padding=padding) with tf.variable_scope(name) as scope: kernel = self.make_gauss_var('gauss_weight', self.filter_size, self.sigma, c_i) output = convolve(input, kernel) return output