Spaces:
Sleeping
Sleeping
import datetime | |
import json | |
import logging | |
import os | |
from pickle import load | |
from typing import Callable, List | |
import librosa | |
import numpy as np | |
import pandas as pd | |
import tensorflow as tf | |
from tensorflow import keras | |
from keras import backend as K | |
from keras.callbacks import CSVLogger | |
from kapre.time_frequency import Spectrogram | |
from models.importer_audio import audio_importer | |
import dawdreamer as daw | |
from scipy.io import wavfile | |
import librosa | |
from generators.parameters import ParameterSet, ParamValue | |
from models.common.data_generator import SoundDataGenerator | |
# Scalar blend factor shared between the Weight_trans callback (which rewrites
# it after an epoch) and CustomLoss (which reads it for the sample weight).
weight_var = K.variable(0.0)
class Weight_trans(keras.callbacks.Callback):
    """Callback that rewrites a backend variable at the end of late epochs.

    Once the epoch index exceeds 680, the variable given as ``weight_var`` is
    overwritten with a value computed from the epoch index; the formula is
    picked by ``transition``. Recognised names are ``"linear"``, ``"linear2"``,
    ``"log"``, ``"log2"``, ``"log3"``, ``"square"`` and ``"quad"``; any other
    name leaves the variable untouched.

    Args:
        weight_var: backend variable to update (stored as ``self.alpha``).
        transition: name of the schedule formula to apply.
        epochs: total number of epochs; only used by the ``"linear"`` formula.
    """

    def __init__(self, weight_var, transition, epochs):
        # Let the Keras base class initialise its own attributes.
        super().__init__()
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    def on_epoch_end(self, epoch, logs=None):
        """Update ``self.alpha`` for ``epoch``; ``logs`` is not used."""
        if epoch <= 680:
            return

        mode = self.transition
        if mode == "linear":
            K.set_value(self.alpha, ((epoch)/(self.epochs) - 0.617)*0.00001)
            # Report the variable this callback owns, not a module global.
            tf.print(f"new weight {self.alpha.numpy()}")
        elif mode == "linear2":
            K.set_value(self.alpha, (1.5625*epoch - 1.0625)*0.00001)
            tf.print(f"new weight {self.alpha.numpy()}")
        elif mode == "log":
            K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.67285)/tf.math.log(0.0005)) - 0.35)*0.00001)
            tf.print("log")
        elif mode == "log2":
            K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.6575)/tf.math.log(0.0005)) - 0.5)*0.00001)
            tf.print("log")
        elif mode == "log3":
            K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.67978)/tf.math.log(0.00000005)) - 0.5)*0.00001)
            tf.print("log")
        elif mode == "square":
            K.set_value(self.alpha, 4.1*tf.pow(epoch*0.001 - 0.65, 2) + 0.002)
            print("exp")
        elif mode == "quad":
            K.set_value(self.alpha, 33*tf.pow(epoch*0.001 - 0.65, 4) + 0.002)
            print("quad")
def train_val_split(
    x_train: np.ndarray, y_train: np.ndarray, split: float = 0.2,
) -> tuple:
    """Split off the trailing ``split`` fraction of the data for validation.

    The number of validation rows is ``int(x_train.shape[0] * split)``; they
    are taken from the end of both arrays.

    Args:
        x_train: input samples, split along axis 0.
        y_train: labels, sliced with the same row count as ``x_train``.
        split: fraction of rows to hold out.

    Returns:
        ``(x_val, y_val, x_train, y_train)`` in that order.
    """
    n_val: int = int(x_train.shape[0] * split)
    if n_val == 0:
        # `a[-0:]` is the whole array and `a[:-0]` is empty, which would hand
        # every row to validation; return an empty validation set instead.
        return (x_train[:0], y_train[:0], x_train, y_train)

    x_val: np.ndarray = x_train[-n_val:]
    y_val: np.ndarray = y_train[-n_val:]
    return (x_val, y_val, x_train[:-n_val], y_train[:-n_val])
"""Model Utils""" | |
def mean_percentile_rank(y_true, y_pred, k=5):
    """Placeholder for the Mean Percentile Rank (MPR) metric.

    @paper
    MPR is the first evaluation measure and is computed per synthesizer
    parameter.

    Not implemented yet: the arguments are ignored and ``None`` is returned.
    """
    # TODO
    return None
def top_k_mean_accuracy(y_true, y_pred, k=5):
    """Mean top-k accuracy over all rows of the last axis.

    @paper
    The top-k mean accuracy is obtained by computing the top-k accuracy for
    each test example and then taking the mean across all examples. In the
    same manner as done in the MPR analysis, it is computed per synthesizer
    parameter for k = 1, ..., 5.

    Both tensors are flattened to 2-D over their last axis; a row is a hit
    when the argmax of ``y_true`` is among the ``k`` largest entries of
    ``y_pred``. Returns the mean hit rate as a float32 scalar tensor.
    """
    # TODO: per parameter?
    leading_dims = tf.shape(y_true)[:-1]
    flat_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    flat_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    target_index = tf.cast(tf.argmax(flat_true, axis=-1), "int32")
    hits = K.in_top_k(flat_pred, target_index, k)
    hits = tf.reshape(hits, leading_dims)
    return tf.reduce_mean(tf.cast(hits, tf.float32))
def CustomLoss(y_true, y_pred):
    """Binary cross-entropy whose sample weight blends in the spectral loss.

    The sample weight is ``(1 - w) + w * s`` where ``w`` is the current value
    of the module-level ``weight_var`` and ``s`` is the value returned by
    ``custom_spectral_loss``. Both are read with ``.numpy()``, so this must run
    eagerly.
    """
    spectral = custom_spectral_loss(y_true, y_pred).numpy()
    blend = weight_var.numpy()
    sample_weight = (1 - blend) + (blend * spectral)
    # tf.print(f"New weight is {sample_weight}")
    cross_entropy = tf.keras.losses.BinaryCrossentropy()
    return cross_entropy(y_true, y_pred, sample_weight=sample_weight)
def custom_spectral_loss(y_true, y_pred):
    """Distance between the spectrograms of the true and predicted patches.

    Both tensors are flattened to 2-D over their last axis, then only the
    first row of each is decoded into synth parameters, rendered with
    ``generate_audio`` and turned into a spectrogram. The representation is
    chosen by the module constant ``SPECTRO_TYPE`` (``'spectro'``, ``'qtrans'``
    or ``'mel'``) and the distance by ``LOSS_TYPE`` (``'L1'``, ``'L2'`` or
    ``'COSINE'``). The L1 and L2 results are multiplied by the penalty
    returned by ``generate_audio``.

    Raises:
        ValueError: if ``SPECTRO_TYPE`` or ``LOSS_TYPE`` is not a known name.
    """
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    # NOTE(review): the parameter set is unpickled from disk on every call;
    # consider loading it once if `decode` does not mutate it - confirm.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)

    predlist_true = parameters.decode(y_true[0])
    predlist_pred = parameters.decode(y_pred[0])

    # Generate audio from parameters
    audio_true, penalty = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)

    # Compute spectrogram
    if SPECTRO_TYPE == 'spectro':
        spectrogram_true = tf.math.abs(tf.signal.stft(audio_true, frame_length=1024, frame_step=512))
        spectrogram_pred = tf.math.abs(tf.signal.stft(audio_pred, frame_length=1024, frame_step=512))
    elif SPECTRO_TYPE == 'qtrans':
        spectrogram_true = librosa.amplitude_to_db(librosa.cqt(audio_true, sr=SAMPLE_RATE, hop_length=128), ref=np.max)
        spectrogram_pred = librosa.amplitude_to_db(librosa.cqt(audio_pred, sr=SAMPLE_RATE, hop_length=128), ref=np.max)
    elif SPECTRO_TYPE == 'mel':
        # The audio is passed as `y=`: recent librosa releases accept it by
        # keyword only, and older ones accept the keyword as well.
        mel_spect = librosa.feature.melspectrogram(y=audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024)
        spectrogram_true = librosa.power_to_db(mel_spect, ref=np.max)
        mel_spect = librosa.feature.melspectrogram(y=audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024)
        spectrogram_pred = librosa.power_to_db(mel_spect, ref=np.max)
    else:
        raise ValueError(f"Unknown SPECTRO_TYPE: {SPECTRO_TYPE!r}")

    if LOSS_TYPE == 'L1':
        spectral_loss = penalty*tf.reduce_mean(tf.abs(spectrogram_true-spectrogram_pred))
    elif LOSS_TYPE == 'L2':
        spectral_loss = penalty*tf.reduce_mean((spectrogram_true - spectrogram_pred)**2)
    elif LOSS_TYPE == 'COSINE':
        # `cosine_distance` lives in the TF1 losses module; under TF2 the
        # `tf.losses` namespace is the Keras one and does not provide it.
        spectral_loss = tf.compat.v1.losses.cosine_distance(spectrogram_true, spectrogram_pred, weights=1.0, axis=-1)
    else:
        raise ValueError(f"Unknown LOSS_TYPE: {LOSS_TYPE!r}")
    return spectral_loss
def summarize_compile(model: keras.Model):
    """Print a layer summary of ``model`` and compile it in place.

    The model is compiled with a default Adam optimizer, ``CustomLoss`` as the
    loss, and three monitored metrics: top-k mean accuracy, the spectral loss
    and mean absolute error.
    """
    model.summary(
        line_length=80,
        positions=[0.33, 0.65, 0.8, 1.0],
        show_trainable=True,
        expand_nested=True,
    )

    # @paper: the authors converged on sigmoid activations with binary cross
    # entropy loss; here that plain loss (keras.losses.BinaryCrossentropy) is
    # replaced by the spectrally weighted CustomLoss.
    monitored = [
        # @paper: 2) Top-k mean accuracy based evaluation
        top_k_mean_accuracy,
        # Extra: spectral loss reported as a metric
        custom_spectral_loss,
        # @paper: 5) Mean Absolute Error based evaluation
        keras.metrics.MeanAbsoluteError(),
    ]
    model.compile(
        optimizer=keras.optimizers.Adam(),  # @paper: Adam [14] optimizer
        loss=CustomLoss,
        metrics=monitored,
    )
def fit(
    model: keras.Model,
    x_train: np.ndarray,
    y_train: np.ndarray,
    x_val: np.ndarray,
    y_val: np.ndarray,
    batch_size: int = 16,
    epochs: int = 200,
) -> keras.Model:
    """Train ``model`` on in-memory arrays and return it.

    Args:
        model: compiled Keras model.
        x_train, y_train: training inputs and labels.
        x_val, y_val: validation data evaluated at the end of each epoch.
        batch_size: minibatch size (@paper: 16).
        epochs: number of epochs to run (@paper mentions 100; the default
            here is 200).

    Returns:
        The same ``model`` object after training.
    """
    logging.info("# Fit model on training data")
    history = model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        # @paper: early stopping procedure - validation data is passed so
        # validation loss and metrics are monitored after each epoch.
        validation_data=(x_val, y_val),
        verbose=0,
    )
    # `history.history` holds the per-epoch loss and metric values. It must be
    # passed through a %s placeholder, otherwise logging drops it.
    logging.info("\nhistory dict: %s", history.history)
    return model
def compare(target, prediction, params, precision=1, print_output=False):
    """Compare one decoded prediction against its decoded target.

    Both vectors are decoded with ``params.decode``; for every decoded
    parameter the argmax of its ``encoding`` is taken as its level index.

    Args:
        target: encoded ground-truth vector.
        prediction: encoded model output for the same sample.
        params: object exposing ``decode(vector)``, returning items with
            ``name``, ``value`` and ``encoding`` attributes.
        precision: largest absolute index difference still counted as close.
        print_output: when true, print the raw vectors (only if the
            prediction has fewer than 10 entries) and a per-parameter table.

    Returns:
        ``(exact_ratio, close_ratio)``: the fraction of parameters whose index
        matches exactly, and the fraction within ``precision``. Both are
        ``0.0`` when there are no parameters.

    Raises:
        IndexError: if target and prediction decode to different lengths.
    """
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)

    pred = params.decode(prediction)
    act = params.decode(target)
    pred_index = [np.array(p.encoding).argmax() for p in pred]
    act_index = [np.array(p.encoding).argmax() for p in act]
    if len(pred_index) != len(act_index):
        raise IndexError(
            "target and prediction decode to a different number of parameters"
        )

    diffs = [p - a for p, a in zip(pred_index, act_index)]
    n_params = len(diffs)
    if n_params:
        exact_ratio = float(sum(d == 0 for d in diffs)) / n_params
        close_ratio = float(sum(abs(d) <= precision for d in diffs)) / n_params
    else:
        # Nothing to compare: avoid dividing by zero.
        exact_ratio = close_ratio = 0.0

    if print_output:
        # The table rows are only assembled when they are actually printed.
        width = 8
        print("Parameter: " + "".join(p.name.rjust(width)[:width] for p in act))
        print("Actual: " + "".join(f"{p.value:>8.2f}" for p in act))
        print("Predicted: " + "".join(f"{p.value:>8.2f}" for p in pred))
        print("Act. Index:" + "".join(f"{i:>8}" for i in act_index))
        print("Pred. Indx:" + "".join(f"{i:>8}" for i in pred_index))
        print("Index Diff:" + "".join(f"{d:>8}" for d in diffs))
        print("-" * 30)

    return exact_ratio, close_ratio
def evaluate(
    prediction: np.ndarray, x: np.ndarray, y: np.ndarray, params: ParameterSet,
):
    """Print accuracy statistics for a batch of predictions.

    Every sample is scored with ``compare`` (details are printed for the
    first five). Two lines are printed at the end: the summary from
    ``params.explain()`` and the share of samples that were fully correct,
    plus the mean exact and close parameter ratios.

    Args:
        prediction: model outputs, indexed per sample.
        x: model inputs; only ``x.shape[0]`` is used, as the sample count.
        y: encoded targets, indexed per sample.
        params: parameter set used for decoding and for the summary.
    """
    print("Prediction Shape: {}".format(prediction.shape))

    num: int = x.shape[0]
    correct: int = 0
    correct_r: float = 0.0
    close_r: float = 0.0
    for i in range(num):
        exact, close = compare(
            target=y[i],
            prediction=prediction[i],
            params=params,
            print_output=i < 5,
        )
        if exact == 1.0:
            correct += 1
        correct_r += exact
        close_r += close

    def percent(total):
        # With zero samples every rate is reported as 0 rather than dividing
        # by zero.
        return total / num * 100 if num else 0.0

    summary = params.explain()
    print(
        "{} Parameters with {} levels (fixed: {})".format(
            summary["n_variable"], summary["levels"], summary["n_fixed"]
        )
    )
    print(
        "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format(
            correct,
            num,
            percent(correct),
            percent(correct_r),
            percent(close_r),
        )
    )
def data_format_audio(audio: np.ndarray, data_format: str) -> np.ndarray:
    """Add a leading axis and one extra unit axis to ``audio``.

    For ``"channels_last"`` the unit axis is inserted after the first audio
    axis (a 1-D input of length n becomes ``(1, n, 1)``); for any other value
    it is inserted before it (``(1, 1, n)``).
    """
    keep = slice(None)
    if data_format == "channels_last":
        index = (np.newaxis, keep, np.newaxis)
    else:
        index = (np.newaxis, np.newaxis, keep)
    return audio[index]
""" | |
Wrap up the whole training process in a standard function. Gets a callback | |
to actually make the model, to keep it as flexible as possible. | |
# Params: | |
# - dataset_name (dataset name) | |
# - model_name: (C1..C6,e2e) | |
# - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model | |
# - epochs: int | |
# - dataset_dir: place to find input data | |
# - output_dir: place to put outputs | |
# - parameters_file (override parameters filename) | |
# - dataset_file (override dataset filename) | |
# - data_format (channels_first or channels_last) | |
# - run_name: to save this run as | |
""" | |
# Settings read by custom_spectral_loss: which distance to use and which
# spectrogram representation to compare.
LOSS_TYPE = 'L1'
SPECTRO_TYPE = 'spectro'
PRINT = 1

# DawDreamer render settings and the shared synth instance.
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = "libTAL-NoiseMaker.so"
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# (MIDI note, velocity, start, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)

# Collect the `id` of every entry under "parameters" in the plugin config;
# position i of DICO is the plugin parameter id for the i-th decoded value.
with open('plugin_config/TAL-NoiseMaker-config.json') as f:
    data = json.load(f)
key_id = data['parameters']
dico = [entry['id'] for entry in key_id]
DICO = dico
def _prompt_for_existing_path(prompt: str) -> str:
    """Ask on stdin until the entered path exists, then return it."""
    while True:
        candidate = input(prompt)
        if os.path.exists(candidate):
            return candidate
        print("File Path invalid, try again ")


def _render_patch_to_wav(values, wav_path: str) -> None:
    """Load ``values`` into SYNTH, render it and write the audio to ``wav_path``."""
    for index, value in enumerate(values):
        SYNTH.set_parameter(DICO[index], value)
    # Setting volume to 0.9
    SYNTH.set_parameter(1, 0.9)

    # The synth takes no inputs, so its input list is empty.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    audio = ENGINE.get_audio()
    try:
        audio = librosa.to_mono(audio).transpose()
    except Exception:
        tf.print("ERROR" * 100)
        tf.print("crashed, nan in generation")
        print(dict(SYNTH.get_patch()))
        # Retry on the rendered audio with NaNs zeroed. The previous fallback
        # converted the parameter table instead of the audio.
        audio = librosa.to_mono(np.nan_to_num(audio)).transpose()
    wavfile.write(wav_path, SAMPLE_RATE, audio)


def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,  # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    """Train (or resume) a model, evaluate it, then infer a patch for a wav file.

    Steps: build or reload the model, fit it on ``SoundDataGenerator`` batches,
    save it, plot the history, print evaluation statistics for the first
    validation batch, and finally prompt on stdin for a ``.wav`` path whose
    predicted parameters are written as CSV and rendered to a wav file under
    ``output/wav_inferred``.

    Args:
        dataset_name: dataset name, used for the default file names.
        model_name: model identifier passed on to ``model_callback``.
        epochs: number of epochs to train for.
        model_callback: called with ``model_name``, ``inputs``, ``outputs``
            and ``data_format`` keywords; must return a Keras model.
        dataset_dir: directory (relative to the cwd) holding the input data.
        output_dir: directory receiving models, checkpoints, history and logs.
        dataset_file: overrides the default ``<dataset_name>_data.hdf5`` path.
        parameters_file: overrides the default ``<dataset_name>_params.pckl``.
        run_name: name for this run; defaults to ``<dataset_name>_<model_name>``.
        data_format: ``channels_first`` or ``channels_last``.
        save_best: register a checkpoint callback that keeps only the best model.
        resume: continue from the hard-coded checkpoint/history files if the
            checkpoint exists; otherwise a fresh model is built.
        checkpoint: register a checkpoint callback that saves every epoch.
        model_type: currently unused.
    """
    tf.config.run_functions_eagerly(True)
    time_generated = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

    if not dataset_file:
        dataset_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_data.hdf5"
        )
    if not parameters_file:
        parameters_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_params.pckl"
        )
    if not run_name:
        run_name = dataset_name + "_" + model_name

    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    os.makedirs(model_file, exist_ok=True)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    os.makedirs(best_model_file, exist_ok=True)

    # Needed on both paths: a resume that finds no checkpoint falls back to a
    # fresh run, which creates the history file.
    os.makedirs(f"{output_dir}/history", exist_ok=True)
    os.makedirs(f"{output_dir}/checkpoints", exist_ok=True)
    if resume:
        # NOTE(review): the run to resume is hard-coded. The directories now
        # match where a non-resumed run writes these files; they were swapped.
        history_file = f"{output_dir}/history/InverSynth_C6XL_20231201-103344"
        checkpoint_model_file = f"{output_dir}/checkpoints/InverSynth_C6XL_checkpoint_20231201-103344"
    else:
        history_file = f"{output_dir}/history/{run_name}_{time_generated}"
        checkpoint_model_file = f"{output_dir}/checkpoints/{run_name}_checkpoint_{time_generated}"
    history_graph_file = f"{output_dir}/{run_name}.pdf"

    gpus = tf.config.list_physical_devices('GPU')
    print(gpus)
    gpu_avail = len(gpus)
    cuda_gpu_avail = len(gpus)

    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)

    os.makedirs(output_dir, exist_ok=True)

    # Get training and validation generators
    params = {"data_file": dataset_file, "batch_size": 64, "shuffle": True}
    training_generator = SoundDataGenerator(first=0.8, **params)
    validation_generator = SoundDataGenerator(last=0.2, **params)
    n_samples = training_generator.get_audio_length()
    print(f"get_audio_length: {n_samples}")
    n_outputs = training_generator.get_label_size()

    # set keras image_data_format
    # NOTE: on CPU only `channels_last` is supported
    keras.backend.set_image_data_format(data_format)

    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - its zero indexed in the file, but 1 indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        model = keras.models.load_model(
            checkpoint_model_file,
            custom_objects={
                "top_k_mean_accuracy": top_k_mean_accuracy,
                "Spectrogram": Spectrogram,
                "custom_spectral_loss": custom_spectral_loss,
                "CustomLoss": CustomLoss,
            },
        )
    else:
        model = model_callback(
            model_name=model_name,
            inputs=n_samples,
            outputs=n_outputs,
            data_format=data_format,
        )
        # Summarize and compile the model
        summarize_compile(model)
        initial_epoch = 0
        # Start from an empty history file; CSVLogger appends to it.
        open(history_file, "w").close()

    callbacks = []
    if save_best:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(
                filepath=best_model_file,
                save_weights_only=False,
                save_best_only=True,
                verbose=1,
            )
        )
    if checkpoint:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(
                filepath=checkpoint_model_file,
                save_weights_only=False,
                save_best_only=False,
                verbose=1,
            )
        )
    os.makedirs(f"{output_dir}/logs", exist_ok=True)
    log_dir = f"{output_dir}/logs/" + time_generated
    callbacks.append(
        keras.callbacks.TensorBoard(
            log_dir=log_dir,
            histogram_freq=1,
            write_graph=True,
            write_images=True,
            profile_batch='500,520',
        )
    )
    callbacks.append(CSVLogger(history_file, append=True))
    callbacks.append(Weight_trans(weight_var, "log3", epochs))

    # Fit the model. A failure is reported but does not abort the run, so the
    # model is still saved and evaluated below.
    history = None
    try:
        history = model.fit(
            x=training_generator,
            validation_data=validation_generator,
            epochs=epochs,
            callbacks=callbacks,
            initial_epoch=initial_epoch,
            verbose=1,  # https://github.com/tensorflow/tensorflow/issues/38064
        )
    except Exception as e:
        print(f"Something went wrong during `model.fit`: {e}")

    # Save model
    model.save(model_file)

    # Plot history
    if history and not resume:
        try:
            hist_df = pd.DataFrame(history.history)
        except Exception as e:
            tf.print("Couldn't save history")
            print(e)
        else:
            try:
                axes = hist_df.plot(subplots=True, figsize=(8, 25))
                axes[0].get_figure().savefig(history_graph_file)
            except Exception as e:
                print("Couldn't create history graph")
                print(e)

    # Parameter data - needed for decoding!
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)

    # Shuffle data, then evaluate predictions on the first validation batch.
    validation_generator.on_epoch_end()
    X, y = validation_generator[0]
    # A previous `X.reshape(...)` here discarded its result, so the batch
    # always reached the model unchanged; the no-op call was removed.
    prediction: np.ndarray = model.predict(X)
    evaluate(prediction, X, y, parameters)

    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)

    namefile = _prompt_for_existing_path("Enter .wav test file path: ")
    newpred = model.predict(audio_importer(namefile))
    predlist: List[ParamValue] = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(['encoding'], axis=1)

    # saving the dataframe
    os.makedirs('output/wav_inferred', exist_ok=True)
    tail = os.path.basename(namefile)
    print("Outputting CSV config in output/wav_inferred")
    df.to_csv(f'output/wav_inferred/{tail}.csv')

    _render_patch_to_wav(df['value'].values, f'output/wav_inferred/gen_{tail}.wav')
def generate_audio(df_params):
    """Render the given parameter values with the shared synth.

    Each item's ``value`` is written to the plugin parameter whose id sits at
    the same position in ``DICO``; the graph is then rendered with ``ENGINE``
    and the result is converted to mono.

    Args:
        df_params: iterable of decoded parameters exposing ``value``.

    Returns:
        ``(audio, penalty)`` where ``audio`` is a NumPy array and ``penalty``
        is currently always ``1``.

    Raises:
        ValueError: if the rendered audio cannot be converted or does not
            pass ``librosa.util.valid_audio``.
    """
    penalty = 1
    for index, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[index], param.value)

    # The synth takes no inputs, so its input list is empty.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()

    if np.isnan(data).any():
        # Dump the offending patch to help reproduce the failure.
        tf.print("crashed, nan in generation")
        print(dict(SYNTH.get_patch()))

    try:
        data = librosa.to_mono(data).transpose()
        is_valid = librosa.util.valid_audio(data)
    except Exception as err:
        tf.print("crashed, nan in generation")
        # Raise a real exception; raising a bare string is itself a TypeError.
        raise ValueError("Nan in generation, crashed") from err
    if not is_valid:
        raise ValueError("Nan in generation, crashed")
    return np.array(data), penalty