import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, optimizers


class BoolformerLayer(layers.Layer):
    """Thresholds activations into a boolean-style mask, then mixes them densely."""

    def __init__(self, threshold=0.5, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold

    def build(self, input_shape):
        self.dense_layer = layers.Dense(input_shape[-1], activation='relu')

    def call(self, inputs):
        # tf.math.logical_and expects boolean operands, and a boolean tensor
        # cannot feed a Dense layer; cast the threshold mask back to the input
        # dtype and gate the activations with it instead.
        mask = tf.cast(inputs > self.threshold, inputs.dtype)
        gated = inputs * mask
        return self.dense_layer(gated)
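
# Quick eager check of the layer above (a minimal sketch; the batch and
# feature sizes here are illustrative assumptions, not part of the model):
_bool_demo = BoolformerLayer(threshold=0.5)
assert _bool_demo(tf.random.uniform([2, 4, 8])).shape == (2, 4, 8)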


def positional_encoding(seq_length, d_model):
    """Sinusoidal positional encoding of shape (1, seq_length, d_model)."""
    position = tf.range(seq_length, dtype=tf.float32)[:, tf.newaxis]
    div_term = tf.exp(tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model))
    angles = position * div_term  # (seq_length, d_model // 2)
    # Concatenate sin and cos of the full angle matrix; re-slicing it with
    # [:, 0::2] and [:, 1::2] would halve the width again and return only
    # d_model // 2 features instead of d_model.
    pos_encoding = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)
    return pos_encoding[tf.newaxis, ...]
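
# Sanity check: the encoding spans the full model width and keeps a leading
# broadcast axis (sizes are illustrative assumptions):
assert positional_encoding(16, 32).shape == (1, 16, 32)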


def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout_rate=0.1):
    # Self-attention sublayer with residual connection and layer norm.
    attention_output = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout_rate)(inputs, inputs)
    attention_output = layers.Dropout(dropout_rate)(attention_output)
    attention_output = layers.LayerNormalization(epsilon=1e-6)(inputs + attention_output)

    # Position-wise feed-forward sublayer, projected back to the input width.
    ffn_output = layers.Dense(ff_dim, activation="relu")(attention_output)
    ffn_output = layers.Dense(inputs.shape[-1])(ffn_output)
    ffn_output = layers.Dropout(dropout_rate)(ffn_output)
    return layers.LayerNormalization(epsilon=1e-6)(attention_output + ffn_output)
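
# The encoder block also runs eagerly on a dummy tensor and preserves the
# input shape (head/width settings are illustrative assumptions):
assert transformer_encoder(
    tf.random.uniform([2, 8, 16]), head_size=4, num_heads=2, ff_dim=32
).shape == (2, 8, 16)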


class QLearningLayer(layers.Layer):
    """Holds a Q-table and projects continuous state features onto Q-values."""

    def __init__(self, action_space_size, learning_rate=0.01, gamma=0.95, **kwargs):
        super().__init__(**kwargs)
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.gamma = gamma

    def build(self, input_shape):
        self.q_table = tf.Variable(
            initial_value=tf.random.uniform([input_shape[-1], self.action_space_size], 0, 1),
            trainable=True, name='q_table')

    def call(self, state):
        # Inside the model `state` is a continuous feature tensor, so it
        # cannot index the table directly; project it onto the action
        # dimension instead and let the downstream heads consume the Q-values.
        return tf.matmul(state, self.q_table)

    def update_q(self, state, action, reward, next_state):
        # Tabular Bellman update for discrete state/action indices, kept
        # out of call() so the model's forward pass has no side effects.
        q_update = reward + self.gamma * tf.reduce_max(self.q_table[next_state])
        self.q_table[state, action].assign(
            (1 - self.learning_rate) * self.q_table[state, action]
            + self.learning_rate * q_update)
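
# A minimal sketch of the tabular update path with discrete indices (the
# sizes, indices, and reward below are illustrative assumptions; inside the
# compiled model only call() is exercised):
_q_demo = QLearningLayer(action_space_size=3)
_q_demo.build([None, 4])  # pretend 4 discrete states, 3 actions
_q_demo.update_q(state=0, action=1, reward=1.0, next_state=2)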


def create_neural_network_model(seq_length, d_model, action_space_size):
    input_layer = keras.Input(shape=(seq_length, d_model))

    # The positional encoding is a constant; it broadcasts across the batch.
    pos_encoded = input_layer + positional_encoding(seq_length, d_model)
    transformer_output = transformer_encoder(pos_encoded, head_size=32, num_heads=2, ff_dim=64)

    x_bool = BoolformerLayer()(transformer_output)
    rl_layer = QLearningLayer(action_space_size=action_space_size)(x_bool)

    # Two heads: a softmax policy over actions and a scalar reward estimate.
    output_layer = layers.Dense(action_space_size, activation='softmax', name='Output')(rl_layer)
    reward_layer = layers.Dense(1, name='Reward')(rl_layer)

    model = keras.Model(inputs=input_layer, outputs=[output_layer, reward_layer])
    opt = optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=opt,
                  loss={'Output': 'categorical_crossentropy', 'Reward': 'mean_squared_error'},
                  metrics={'Output': 'accuracy'})
    return model


seq_length = 128
d_model = 512
action_space_size = 10

model = create_neural_network_model(seq_length, d_model, action_space_size)
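
# Hedged smoke test: push one random batch through the compiled model and
# confirm the two heads' shapes (the batch size is an illustrative assumption).
dummy_batch = tf.random.uniform([2, seq_length, d_model])
policy, reward_estimate = model(dummy_batch)
print(policy.shape, reward_estimate.shape)  # (2, 128, 10) (2, 128, 1)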