Yann
push backend
86694c3
import datetime
import json
import logging
import os
from pickle import load
from typing import Callable, List
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from keras.callbacks import CSVLogger
from kapre.time_frequency import Spectrogram
from models.importer_audio import audio_importer
import dawdreamer as daw
from scipy.io import wavfile
import librosa
from generators.parameters import ParameterSet, ParamValue
from models.common.data_generator import SoundDataGenerator
# Backend variable, initialised to 0, that CustomLoss reads as the blend factor
# between plain binary cross-entropy and the spectrally weighted variant.
# train_model hands this same variable to the Weight_trans callback.
weight_var = K.variable(0.)
class Weight_trans(keras.callbacks.Callback):
    """Callback that reschedules a backend weight variable at the end of an epoch.

    Nothing happens until ``epoch`` exceeds ``START_EPOCH``. After that the
    variable is overwritten after each epoch with a value computed from the
    epoch index, using the formula selected by ``transition``.

    Args:
        weight_var: Backend variable to update (stored as ``self.alpha``).
        transition: Schedule name; one of ``"linear"``, ``"linear2"``,
            ``"log"``, ``"log2"``, ``"log3"``, ``"square"`` or ``"quad"``.
            Any other value leaves the variable untouched.
        epochs: Total number of training epochs (only used by ``"linear"``).
    """

    # Epoch index that must be exceeded before the schedule starts.
    START_EPOCH = 680

    def __init__(self, weight_var, transition, epochs):
        # Let the Keras base class initialise its own bookkeeping attributes.
        super().__init__()
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    def on_epoch_end(self, epoch, logs=None):
        """Update ``self.alpha`` according to the configured schedule.

        Args:
            epoch: Zero-based index of the epoch that just finished.
            logs: Metric dict supplied by Keras; unused here.
        """
        if epoch <= self.START_EPOCH:
            return

        mode = self.transition
        if mode == "linear":
            K.set_value(self.alpha, ((epoch) / (self.epochs) - 0.617) * 0.00001)
            # Report the variable this callback owns, not a module global.
            tf.print(f"new weight {self.alpha.numpy()}")
        elif mode == "linear2":
            K.set_value(self.alpha, (1.5625 * epoch - 1.0625) * 0.00001)
            tf.print(f"new weight {self.alpha.numpy()}")
        elif mode == "log":
            K.set_value(
                self.alpha,
                (1 - (tf.math.log(epoch * 0.001 - 0.67285) / tf.math.log(0.0005)) - 0.35) * 0.00001,
            )
            tf.print("log")
        elif mode == "log2":
            K.set_value(
                self.alpha,
                (1 - (tf.math.log(epoch * 0.001 - 0.6575) / tf.math.log(0.0005)) - 0.5) * 0.00001,
            )
            tf.print("log")
        elif mode == "log3":
            K.set_value(
                self.alpha,
                (1 - (tf.math.log(epoch * 0.001 - 0.67978) / tf.math.log(0.00000005)) - 0.5) * 0.00001,
            )
            tf.print("log")
        elif mode == "square":
            K.set_value(self.alpha, 4.1 * tf.pow(epoch * 0.001 - 0.65, 2) + 0.002)
            print("exp")
        elif mode == "quad":
            K.set_value(self.alpha, 33 * tf.pow(epoch * 0.001 - 0.65, 4) + 0.002)
            print("quad")
def train_val_split(
    x_train: np.ndarray, y_train: np.ndarray, split: float = 0.2,
) -> tuple:
    """Split off the trailing fraction of the data as a validation set.

    Args:
        x_train: Input samples, indexed along axis 0.
        y_train: Labels aligned with ``x_train`` along axis 0.
        split: Fraction of ``x_train.shape[0]`` to move to validation.

    Returns:
        ``(x_val, y_val, x_train, y_train)`` where the validation arrays are
        the last ``int(n * split)`` rows and the training arrays are the rest.
    """
    n_val: int = int(x_train.shape[0] * split)
    if n_val == 0:
        # ``a[-0:]`` is the whole array and ``a[:-0]`` is empty, which would
        # hand everything to validation; return an empty validation set.
        return (x_train[:0], y_train[:0], x_train, y_train)
    x_val: np.ndarray = x_train[-n_val:]
    y_val: np.ndarray = y_train[-n_val:]
    x_train = x_train[:-n_val]
    y_train = y_train[:-n_val]
    return (x_val, y_val, x_train, y_train)
"""Model Utils"""
def mean_percentile_rank(y_true, y_pred, k=5):
    """
    Placeholder for the Mean Percentile Rank (MPR) metric.

    @paper
    The first evaluation measure is the Mean Percentile Rank
    (MPR) which is computed per synthesizer parameter.

    Not implemented: the body holds only this docstring, so a call returns
    None and ignores ``y_true``, ``y_pred`` and ``k``.
    """
    # TODO
def top_k_mean_accuracy(y_true, y_pred, k=5):
    """
    Mean top-k accuracy over all examples.

    @paper
    The top-k mean accuracy is obtained by computing the top-k
    accuracy for each test example and then taking the mean across
    all examples. In the same manner as done in the MPR analysis,
    we compute the top-k mean accuracy per synthesizer
    parameter for k = 1, ..., 5.

    Both tensors are flattened to 2-D over their last axis. The argmax of
    each ``y_true`` row is the target class; a row counts as correct when
    that class is among the ``k`` largest entries of the matching ``y_pred``
    row. Returns the mean of those hits as a float32 scalar.
    """
    # TODO: per parameter?
    leading_dims = tf.shape(y_true)[:-1]
    flat_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    flat_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))
    target_class = tf.cast(tf.argmax(flat_true, axis=-1), "int32")
    hits = K.in_top_k(flat_pred, target_class, k)
    hits = tf.reshape(hits, leading_dims)
    return tf.reduce_mean(tf.cast(hits, tf.float32))
@tf.function
def CustomLoss(y_true, y_pred):
    """Binary cross-entropy scaled by a blend of 1 and the spectral loss.

    The sample weight is ``(1 - a) + a * s``, where ``a`` is the current value
    of the module-level ``weight_var`` and ``s`` is the result of
    ``custom_spectral_loss(y_true, y_pred)``.

    Both values are read with ``.numpy()``, so this only works when functions
    run eagerly (``train_model`` enables that before fitting).
    """
    spectral = custom_spectral_loss(y_true, y_pred)
    alpha = weight_var.numpy()
    blended_weight = (1 - alpha) + (alpha * spectral.numpy())
    # tf.print(f"New weight is {blended_weight}")
    cross_entropy = tf.keras.losses.BinaryCrossentropy()
    return cross_entropy(y_true, y_pred, sample_weight=blended_weight)
def _spectral_loss_spectrogram(audio):
    """Return the spectrogram of ``audio`` in the form chosen by SPECTRO_TYPE."""
    if SPECTRO_TYPE == 'spectro':
        return tf.math.abs(tf.signal.stft(audio, frame_length=1024, frame_step=512))
    if SPECTRO_TYPE == 'qtrans':
        return librosa.amplitude_to_db(
            librosa.cqt(audio, sr=SAMPLE_RATE, hop_length=128), ref=np.max
        )
    if SPECTRO_TYPE == 'mel':
        # Pass the signal by keyword: newer librosa releases reject a
        # positional audio argument, and older ones accept the keyword too.
        mel_spect = librosa.feature.melspectrogram(
            y=audio, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        return librosa.power_to_db(mel_spect, ref=np.max)
    raise ValueError(f"Unsupported SPECTRO_TYPE: {SPECTRO_TYPE!r}")


@tf.function
def custom_spectral_loss(y_true, y_pred):
    """Spectral distance between audio rendered from true and predicted labels.

    Only the first row of each (flattened) batch is used. Each row is decoded
    into synth parameter values, rendered through ``generate_audio``, turned
    into a spectrogram (see ``SPECTRO_TYPE``) and compared using the distance
    chosen by ``LOSS_TYPE`` (``'L1'``, ``'L2'`` or ``'COSINE'``).

    Args:
        y_true: Ground-truth encoded parameters; last axis is the encoding.
        y_pred: Predicted encoded parameters, same layout as ``y_true``.

    Returns:
        Scalar tensor with the spectral loss.

    Raises:
        ValueError: If ``SPECTRO_TYPE`` or ``LOSS_TYPE`` is not supported.
    """
    # tf.print("After compiling model :",tf.executing_eagerly())
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    # The parameter set is needed to decode encodings into synth values.
    # NOTE: this is a pickle file; only load it from a trusted location.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)
    predlist_true: List[ParamValue] = parameters.decode(y_true[0])
    predlist_pred: List[ParamValue] = parameters.decode(y_pred[0])

    # Render audio for both parameter lists; the penalty of the second
    # render is the one applied below, as before.
    audio_true, penalty = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)

    spectrogram_true = _spectral_loss_spectrogram(audio_true)
    spectrogram_pred = _spectral_loss_spectrogram(audio_pred)

    if LOSS_TYPE == 'L1':
        return penalty * tf.reduce_mean(tf.abs(spectrogram_true - spectrogram_pred))
    if LOSS_TYPE == 'L2':
        return penalty * tf.reduce_mean((spectrogram_true - spectrogram_pred) ** 2)
    if LOSS_TYPE == 'COSINE':
        # TF2 has no ``tf.losses.cosine_distance``; the function lives under
        # the v1 compatibility namespace.
        return tf.compat.v1.losses.cosine_distance(
            spectrogram_true, spectrogram_pred, weights=1.0, axis=-1
        )
    raise ValueError(f"Unsupported LOSS_TYPE: {LOSS_TYPE!r}")
def summarize_compile(model: keras.Model):
    """Print a summary of ``model`` and compile it in place.

    The model is compiled with an Adam optimizer, ``CustomLoss`` as the loss,
    and three monitored metrics: top-k mean accuracy, the spectral loss and
    mean absolute error. Nothing is returned.
    """
    model.summary(
        line_length=80,
        positions=[0.33, 0.65, 0.8, 1.0],
        show_trainable=True,
        expand_nested=True,
    )

    # @paper: Adam [14] optimizer
    optimizer = keras.optimizers.Adam()

    # @paper: the authors converged on sigmoid activations with binary cross
    # entropy loss; CustomLoss is used here instead of the plain
    # keras.losses.BinaryCrossentropy().
    monitored_metrics = [
        # @paper: 1) Mean Percentile Rank (mean_percentile_rank) is not enabled
        # @paper: 2) Top-k mean accuracy based evaluation
        top_k_mean_accuracy,
        # Extra: spectral loss reported as a metric
        custom_spectral_loss,
        # @paper: 5) Mean Absolute Error based evaluation
        keras.metrics.MeanAbsoluteError(),
    ]

    model.compile(
        optimizer=optimizer,
        loss=CustomLoss,
        metrics=monitored_metrics,
    )
def fit(
    model: keras.Model,
    x_train: np.ndarray,
    y_train: np.ndarray,
    x_val: np.ndarray,
    y_val: np.ndarray,
    batch_size: int = 16,
    epochs: int = 200,
) -> keras.Model:
    """Fit ``model`` on in-memory arrays and return it.

    Args:
        model: Compiled Keras model to train.
        x_train: Training inputs.
        y_train: Training targets.
        x_val: Validation inputs, evaluated at the end of each epoch.
        y_val: Validation targets.
        batch_size: Minibatch size (the paper uses 16).
        epochs: Number of epochs to train for.

    Returns:
        The same ``model`` instance after training.
    """
    # @paper:
    # with a minibatch size of 16 for
    # 100 epochs. The best weights for each model were set by
    # employing an early stopping procedure.
    # NOTE: no early-stopping callback is passed here; validation data is
    # only used for monitoring.
    logging.info("# Fit model on training data")
    history = model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_val, y_val),
        verbose=0,
    )
    # The returned "history" object holds a record of the loss values and
    # metric values during training. It is passed as a lazy %-argument: the
    # previous call had no placeholder, so logging reported a formatting
    # error instead of the history.
    logging.info("history dict: %s", history.history)
    return model
def compare(target, prediction, params, precision=1, print_output=False):
    """Compare one predicted encoding against its target, parameter by parameter.

    Both vectors are decoded with ``params.decode``; each decoded parameter is
    reduced to the argmax index of its encoding.

    Args:
        target: Ground-truth encoded vector.
        prediction: Predicted encoded vector.
        params: Object whose ``decode`` yields items with ``name``, ``value``
            and ``encoding`` attributes.
        precision: Largest index distance still counted as "close".
        print_output: When true, print a table of the comparison.

    Returns:
        ``(exact_ratio, close_ratio)``: the fraction of parameters whose index
        matches exactly, and the fraction within ``precision`` of the target.
    """
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)

    decoded_pred = params.decode(prediction)
    decoded_act = params.decode(target)
    pred_index = [np.array(item.encoding).argmax() for item in decoded_pred]
    act_index = [np.array(item.encoding).argmax() for item in decoded_act]

    # Report rows are always built (as before), even when nothing is printed.
    width = 8
    names = "Parameter: " + "".join(item.name.rjust(width)[:width] for item in decoded_act)
    act_s = "Actual: " + "".join(f"{item.value:>8.2f}" for item in decoded_act)
    pred_s = "Predicted: " + "".join(f"{item.value:>8.2f}" for item in decoded_pred)
    pred_i = "Pred. Indx:" + "".join(f"{idx:>8}" for idx in pred_index)
    act_i = "Act. Index:" + "".join(f"{idx:>8}" for idx in act_index)
    diff_i = "Index Diff:" + "".join(
        f"{pred_index[pos] - act_index[pos]:>8}" for pos in range(len(act_index))
    )

    n_params = len(pred_index)
    exact = sum(
        (1.0 for pos in range(n_params) if pred_index[pos] == act_index[pos]), 0.0
    )
    close = sum(
        (
            1.0
            for pos in range(n_params)
            if abs(pred_index[pos] - act_index[pos]) <= precision
        ),
        0.0,
    )
    exact_ratio = exact / n_params
    close_ratio = close / n_params

    if print_output:
        for row in (names, act_s, pred_s, act_i, pred_i, diff_i):
            print(row)
        print("-" * 30)
    return exact_ratio, close_ratio
def evaluate(
    prediction: np.ndarray, x: np.ndarray, y: np.ndarray, params: ParameterSet,
):
    """Print accuracy statistics for a batch of predictions.

    Each of the ``x.shape[0]`` examples is scored with ``compare``; the first
    five are printed in detail. A summary of the parameter set and the
    aggregated results is then printed. Nothing is returned.

    Args:
        prediction: Predicted encodings, one row per example.
        x: Model inputs; only the size of axis 0 is used.
        y: Target encodings aligned with ``prediction``.
        params: Parameter set used for decoding and for ``explain()``.
    """
    print("Prediction Shape: {}".format(prediction.shape))
    total: int = x.shape[0]
    perfect: int = 0
    exact_sum: float = 0.0
    close_sum: float = 0.0

    for idx in range(total):
        exact, close = compare(
            target=y[idx],
            prediction=prediction[idx],
            params=params,
            print_output=idx < 5,
        )
        # An example is "perfect" only when every parameter matched exactly.
        if exact == 1.0:
            perfect += 1
        exact_sum += exact
        close_sum += close

    summary = params.explain()
    header = "{} Parameters with {} levels (fixed: {})".format(
        summary["n_variable"], summary["levels"], summary["n_fixed"]
    )
    print(header)
    report = "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format(
        perfect,
        total,
        perfect / total * 100,
        exact_sum / total * 100,
        close_sum / total * 100,
    )
    print(report)
def data_format_audio(audio: np.ndarray, data_format: str) -> np.ndarray:
    """Add a batch axis and a channel axis to ``audio``.

    For ``"channels_last"`` the new axes go at positions 0 and 2; for any
    other value they go at positions 0 and 1.
    """
    # `(None, n_freq, n_time, n_channel)` if `'channels_last'`,
    if data_format == "channels_last":
        return audio[None, :, None]
    # `(None, n_channel, n_freq, n_time)` if `'channels_first'`,
    return audio[None, None, :]
"""
Wrap up the whole training process in a standard function. Gets a callback
to actually make the model, to keep it as flexible as possible.
# Params:
# - dataset_name (dataset name)
# - model_name: (C1..C6,e2e)
# - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model
# - epochs: int
# - dataset_dir: place to find input data
# - output_dir: place to put outputs
# - parameters_file (override parameters filename)
# - dataset_file (override dataset filename)
# - data_format (channels_first or channels_last)
# - run_name: to save this run as
"""
# Distance used by custom_spectral_loss; it handles 'L1', 'L2' and 'COSINE'.
LOSS_TYPE = 'L1'
# Spectrogram variant used by custom_spectral_loss: 'spectro', 'qtrans' or 'mel'.
SPECTRO_TYPE = 'spectro'
# NOTE(review): PRINT is not referenced anywhere in the visible source.
PRINT = 1
# DawDreamer export settings
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = "libTAL-NoiseMaker.so"
# The render engine and synth plugin are created once at import time and
# shared by train_model and generate_audio.
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# Argument order as noted elsewhere in this file:
# (MIDI note, velocity, start, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)
# Read the plugin configuration (path is relative to the working directory).
with open('plugin_config/TAL-NoiseMaker-config.json') as f:
    data = json.load(f)
dico=[]
# Collect the 'id' of every entry under the JSON "parameters" key, in file order
key_id = data['parameters']
for param in key_id:
    dico.append(param['id'])
# DICO[i] is the plugin parameter id passed to SYNTH.set_parameter for the
# i-th decoded parameter value.
DICO=dico
def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,  # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    """Train (or resume) a model, evaluate it, then run one interactive inference.

    Steps, in order:
      1. Force eager execution and derive dataset/parameter/output paths.
      2. Build training and validation ``SoundDataGenerator`` instances.
      3. Load a checkpointed model (``resume``) or build a new one through
         ``model_callback`` and compile it with ``summarize_compile``.
      4. Fit with checkpoint, TensorBoard, CSV-log and ``Weight_trans``
         callbacks; an exception raised by ``model.fit`` is printed and
         training continues to the save step.
      5. Save the model, plot the history, and print evaluation statistics
         for one validation batch.
      6. Prompt on stdin for a ``.wav`` path, predict its parameters, write
         them as CSV, render them through the module-level synth, and write
         the rendered audio as a ``.wav`` file.

    Args:
        dataset_name: Dataset name; used to derive default file names.
        model_name: Model identifier forwarded to ``model_callback``.
        epochs: Number of epochs to train for.
        model_callback: Called with ``model_name``, ``inputs``, ``outputs``
            and ``data_format`` keyword arguments; returns a Keras model.
        dataset_dir: Directory (relative to the CWD) holding the input data.
        output_dir: Directory that receives models, checkpoints, logs.
        dataset_file: Overrides the derived ``*_data.hdf5`` path.
        parameters_file: Overrides the derived ``*_params.pckl`` path.
        run_name: Name for this run; defaults to ``dataset_name_model_name``.
        data_format: ``"channels_first"`` or ``"channels_last"``.
        save_best: Add a best-only ``ModelCheckpoint`` callback.
        resume: Try to resume from a checkpoint instead of starting fresh.
        checkpoint: Add an every-epoch ``ModelCheckpoint`` callback.
        model_type: Only referenced by commented-out code in this function.

    Returns:
        None.

    NOTE(review): block nesting in this function was reconstructed from the
    statement order; confirm it against the original file.
    """
    # CustomLoss / custom_spectral_loss call ``.numpy()`` and NumPy-based
    # libraries, so tf.function bodies must run eagerly.
    tf.config.run_functions_eagerly(True)
    # tf.data.experimental.enable_debug_mode()
    # One timestamp shared by every artefact name created for this run.
    time_generated = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    if not dataset_file:
        dataset_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_data.hdf5"
        )
    if not parameters_file:
        parameters_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_params.pckl"
        )
    if not run_name:
        run_name = dataset_name + "_" + model_name
    # Both "file" names below are created as directories.
    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    if not os.path.exists(model_file):
        os.makedirs(model_file)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    if not os.path.exists(best_model_file):
        os.makedirs(best_model_file)
    if resume:
        # checkpoint_model_file = f"{output_dir}/{run_name}_checkpoint_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
        # history_file = f"{output_dir}/{run_name}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
        # NOTE(review): these resume paths are hard-coded to one specific run,
        # and the checkpoint points into "history" while the history points
        # into "checkpoints" -- the reverse of the else branch. Confirm.
        checkpoint_model_file = f"{output_dir}/history/InverSynth_C6XL_checkpoint_20231201-103344"
        history_file = f"{output_dir}/checkpoints/InverSynth_C6XL_20231201-103344"
    else:
        os.makedirs(f"{output_dir}/history", exist_ok=True)
        os.makedirs(f"{output_dir}/checkpoints", exist_ok=True)
        history_file = f"{output_dir}/history/{run_name}_{time_generated}"
        checkpoint_model_file = f"{output_dir}/checkpoints/{run_name}_checkpoint_{time_generated}"
    history_graph_file = f"{output_dir}/{run_name}.pdf"
    print(tf.config.list_physical_devices('GPU'))
    # Both values are the number of visible GPUs (an int, not a bool), and
    # they come from the same query.
    gpu_avail = len(tf.config.list_physical_devices('GPU'))
    cuda_gpu_avail = len(tf.config.list_physical_devices('GPU'))
    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)
    os.makedirs(output_dir, exist_ok=True)
    # Get training and validation generators
    # NOTE: the generator batch size is fixed at 64 here.
    params = {"data_file": dataset_file, "batch_size": 64, "shuffle": True}
    training_generator = SoundDataGenerator(first=0.8, **params)
    validation_generator = SoundDataGenerator(last=0.2, **params)
    n_samples = training_generator.get_audio_length()
    print(f"get_audio_length: {n_samples}")
    n_outputs = training_generator.get_label_size()
    # set keras image_data_format
    # NOTE: on CPU only `channels_last` is supported
    # NOTE(review): physical_devices is assigned but not used afterwards.
    physical_devices = tf.config.list_physical_devices('GPU')
    keras.backend.set_image_data_format(data_format)
    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - its zero indexed in the file, but 1 indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        # epochs:int = initial_epoch
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        # Custom layers, losses and metrics must be registered so Keras can
        # deserialize the saved model.
        model = keras.models.load_model(
            checkpoint_model_file
            ,
            custom_objects={"top_k_mean_accuracy": top_k_mean_accuracy, "Spectrogram" : Spectrogram,
                "custom_spectral_loss": custom_spectral_loss, "CustomLoss": CustomLoss
            },
        )
    else:
        model = model_callback(
            model_name=model_name,
            inputs=n_samples,
            outputs=n_outputs,
            data_format=data_format,
        )
        # keras.utils.plot_model(model, to_file='model.png', show_shapes=True, show_layer_activations=True)
        # Summarize and compile the model
        summarize_compile(model)
        initial_epoch = 0
        # Create/truncate the history file so CSVLogger appends to an empty file.
        open(history_file, "w").close()
    callbacks = []
    best_callback = keras.callbacks.ModelCheckpoint(
        filepath=best_model_file,
        save_weights_only=False,
        save_best_only=True,
        verbose=1,
    )
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_model_file,
        save_weights_only=False,
        save_best_only=False,
        verbose=1,
    )
    os.makedirs(f"{output_dir}/logs", exist_ok=True)
    log_dir = f"{output_dir}/logs/" + time_generated
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True, profile_batch = '500,520')
    if save_best:
        callbacks.append(best_callback)
    if checkpoint:
        callbacks.append(checkpoint_callback)
    callbacks.append(tensorboard_callback)
    callbacks.append(CSVLogger(history_file, append=True))
    # The loss-weight schedule is fixed to "log3" and drives the module-level
    # weight_var that CustomLoss reads.
    callbacks.append(Weight_trans(weight_var, "log3" ,epochs))
    # Fit the model
    history = None
    try:
        history = model.fit(
            x=training_generator,
            validation_data=validation_generator,
            epochs=epochs,
            callbacks=callbacks,
            initial_epoch=initial_epoch,
            verbose=1,  # https://github.com/tensorflow/tensorflow/issues/38064
        )
    except Exception as e:
        # Best effort: report the failure and still save whatever was trained.
        print(f"Something went wrong during `model.fit`: {e}")
    # Save model
    model.save(model_file)
    # Save history
    # NOTE(review): only a plot is produced here; hist_df itself is not
    # written out (the per-epoch CSV comes from the CSVLogger callback).
    if history and not resume:
        try:
            hist_df = pd.DataFrame(history.history)
            try:
                fig = hist_df.plot(subplots=True, figsize=(8, 25))
                fig[0].get_figure().savefig(history_graph_file)
            except Exception as e:
                print("Couldn't create history graph")
                print(e)
        except Exception as e:
            tf.print("Couldn't save history")
            print(e)
    # evaluate prediction on random sample from validation set
    # Parameter data - needed for decoding!
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)
    # Shuffle data
    validation_generator.on_epoch_end()
    X, y = validation_generator.__getitem__(0)
    # NOTE(review): reshape returns a new array and the result is discarded,
    # so X is unchanged by this line -- confirm whether that is intended.
    X.reshape((X.__len__(), 1, 16384))
    # if model_type == "STFT":
    # # stft expects shape (channel, sample_rate)
    # X = np.moveaxis(X, 1, -1)
    prediction: np.ndarray = model.predict(X)
    evaluate(prediction, X, y, parameters)
    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)
    # Keep prompting on stdin until an existing file path is entered.
    Valid=False
    while Valid==False:
        file = namefile = input("Enter .wav test file path: ")
        if os.path.exists(file):
            Valid=True
        else:
            print("File Path invalid, try again ")
    newpred = model.predict(audio_importer(str(f'{namefile}')))
    predlist: List[ParamValue] = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(['encoding'], axis=1)
    # saving the dataframe
    if not os.path.exists(str(f'output/wav_inferred')):
        os.makedirs(str(f'output/wav_inferred'))
    head, tail = os.path.split(str(f'{namefile}'))
    print("Outputting CSV config in " + str(f'output/wav_inferred'))
    df.to_csv(str(f'output/wav_inferred/{tail}.csv'))
    #export(prediction, X, y, parameters)
    # Apply each predicted value to the synth; the i-th value is written to
    # the parameter id stored at DICO[i].
    i = 0
    for values in df['value'].values:
        # Set parameters using DataFrame values
        SYNTH.set_parameter(DICO[i],values)
        i += 1
    #Setting volume to 0.9
    SYNTH.set_parameter(1, 0.9)
    # Set up the processing graph
    graph = [
        # synth takes no inputs, so we give an empty list.
        (SYNTH, []),
    ]
    ENGINE.load_graph(graph)
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except:
        # NOTE(review): this bare except also catches KeyboardInterrupt, and
        # the fallback replaces the rendered audio with the NaN-filled
        # *parameter* DataFrame before converting it to mono -- confirm.
        tf.print("ERROR" * 100)
        df = df.fillna(0)
        data = df.to_numpy()
        data = librosa.to_mono(data).transpose()
        tf.print("crashed, nan in generation")
        synth_params = dict(SYNTH.get_patch())
        print(synth_params)
    # NOTE(review): df is reassigned here but not used afterwards.
    df = pd.DataFrame(data)
    # penalty=1000000
    # df = pd.DataFrame(data)
    # df = df.fillna(0)
    # data = df.to_numpy()
    # Write the rendered audio next to the CSV, named after the input file.
    wavfile.write(str(f'output/wav_inferred/gen_{tail}.wav'), SAMPLE_RATE, data)
def generate_audio(df_params):
    """Render audio through the module-level synth for the given parameter values.

    Args:
        df_params: Iterable of decoded parameters; the ``value`` of the i-th
            item is written to the synth parameter id stored at ``DICO[i]``.

    Returns:
        ``(audio, penalty)``: the rendered signal mixed down to mono as a
        NumPy array, and a penalty factor (currently always 1).

    Raises:
        RuntimeError: If the rendered audio cannot be converted or fails
            librosa's validity check (for example when it contains NaN).
    """
    penalty = 1
    # Set parameters on the shared synth instance.
    for index, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[index], param.value)

    # Set up the processing graph; the synth takes no inputs, so its input
    # list is empty.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()

    if np.isnan(data).any():
        # Dump the patch that produced the NaNs to help debugging.
        tf.print("crashed, nan in generation")
        synth_params = dict(SYNTH.get_patch())
        print(synth_params)

    try:
        data = librosa.to_mono(data).transpose()
        is_valid = librosa.util.valid_audio(data)
    except Exception as err:
        tf.print("crashed, nan in generation")
        # The previous ``raise("...")`` raised a TypeError, because a string
        # is not an exception; raise a real one and keep the cause.
        raise RuntimeError("Nan in generation, crashed") from err

    if not is_valid:
        raise RuntimeError("Nan in generation, crashed")
    return np.array(data), penalty