import datetime import json import logging import os from pickle import load from typing import Callable, List import librosa import numpy as np import pandas as pd import tensorflow as tf from tensorflow import keras from keras import backend as K from keras.callbacks import CSVLogger from kapre.time_frequency import Spectrogram from models.importer_audio import audio_importer import dawdreamer as daw from scipy.io import wavfile import librosa from generators.parameters import ParameterSet, ParamValue from models.common.data_generator import SoundDataGenerator weight_var = K.variable(0.) class Weight_trans(keras.callbacks.Callback): def __init__(self, weight_var, transition, epochs): self.alpha = weight_var self.transition = transition self.epochs = epochs def on_epoch_end(self, epoch, logs={}): if epoch > 680: if self.transition == "linear": K.set_value(self.alpha, ((epoch)/(self.epochs) - 0.617)*0.00001) tf.print(f"new weight {weight_var.numpy()}") if self.transition == "linear2": K.set_value(self.alpha, (1.5625*epoch - 1.0625)*0.00001) tf.print(f"new weight {weight_var.numpy()}") if self.transition == "log": K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.67285)/tf.math.log(0.0005)) - 0.35)*0.00001) tf.print("log") if self.transition == "log2": K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.6575)/tf.math.log(0.0005)) - 0.5)*0.00001) tf.print("log") if self.transition == "log3": K.set_value(self.alpha, (1- (tf.math.log(epoch*0.001 - 0.67978)/tf.math.log(0.00000005)) - 0.5)*0.00001) tf.print("log") if self.transition == "square": K.set_value(self.alpha, 4.1*tf.pow(epoch*0.001 - 0.65, 2) + 0.002) print("exp") if self.transition == "quad": K.set_value(self.alpha, 33*tf.pow(epoch*0.001 - 0.65, 4) + 0.002) print("quad") def train_val_split( x_train: np.ndarray, y_train: np.ndarray, split: float = 0.2, ) -> tuple: slice: int = int(x_train.shape[0] * split) x_val: np.ndarray = x_train[-slice:] y_val: np.ndarray = y_train[-slice:] x_train = x_train[:-slice] y_train = y_train[:-slice] return (x_val, y_val, x_train, y_train) """Model Utils""" def mean_percentile_rank(y_true, y_pred, k=5): """ @paper The first evaluation measure is the Mean Percentile Rank (MPR) which is computed per synthesizer parameter. """ # TODO def top_k_mean_accuracy(y_true, y_pred, k=5): """ @ paper The top-k mean accuracy is obtained by computing the top-k accuracy for each test example and then taking the mean across all examples. In the same manner as done in the MPR analysis, we compute the top-k mean accuracy per synthesizer parameter for 𝑘 = 1, ... ,5. """ # TODO: per parameter? original_shape = tf.shape(y_true) y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1])) y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1])) top_k = K.in_top_k(y_pred, tf.cast(tf.argmax(y_true, axis=-1), "int32"), k) correct_pred = tf.reshape(top_k, original_shape[:-1]) return tf.reduce_mean(tf.cast(correct_pred, tf.float32)) @tf.function def CustomLoss(y_true, y_pred): bce = tf.keras.losses.BinaryCrossentropy() weights = custom_spectral_loss(y_true, y_pred) weight_shift = (1-weight_var.numpy())+(weight_var.numpy()*weights.numpy()) # tf.print(f"New weight is {weight_shift}") loss = bce(y_true, y_pred, sample_weight=weight_shift) return loss @tf.function def custom_spectral_loss(y_true, y_pred): # tf.print("After compiling model :",tf.executing_eagerly()) y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1])) y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1])) # Assuming y_true and y_pred contain parameters for audio synthesis # Extract parameters from y_true and y_pred with open("test_datasets/InverSynth_params.pckl", "rb") as f: parameters: ParameterSet = load(f) predlist_true: List[ParamValue] = parameters.decode(y_true[0]) predlist_pred: List[ParamValue] = parameters.decode(y_pred[0]) # Convert parameter lists to DataFrames # Generate audio from parameters audio_true, penalty = generate_audio(predlist_true) audio_pred, penalty = generate_audio(predlist_pred) # Compute spectrogram if SPECTRO_TYPE == 'spectro': spectrogram_true = tf.math.abs(tf.signal.stft(audio_true, frame_length=1024, frame_step=512)) spectrogram_pred = tf.math.abs(tf.signal.stft(audio_pred, frame_length=1024, frame_step=512)) elif SPECTRO_TYPE == 'qtrans': spectrogram_true = librosa.amplitude_to_db(librosa.cqt(audio_true, sr=SAMPLE_RATE, hop_length=128), ref=np.max) spectrogram_pred = librosa.amplitude_to_db(librosa.cqt(audio_pred, sr=SAMPLE_RATE, hop_length=128), ref=np.max) elif SPECTRO_TYPE == 'mel': mel_spect = librosa.feature.melspectrogram(audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024) spectrogram_true = librosa.power_to_db(mel_spect, ref=np.max) mel_spect = librosa.feature.melspectrogram(audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024) spectrogram_pred = librosa.power_to_db(mel_spect, ref=np.max) #L1 LOSS if LOSS_TYPE == 'L1': spectral_loss = penalty*tf.reduce_mean(tf.abs(spectrogram_true-spectrogram_pred)) #L2 LOSS elif LOSS_TYPE =='L2': spectral_loss = penalty*tf.reduce_mean((spectrogram_true - spectrogram_pred)**2) #COSINE LOSS elif LOSS_TYPE == 'COSINE': spectral_loss = tf.losses.cosine_distance(spectrogram_true, spectrogram_pred, weights=1.0, axis=-1) return spectral_loss def summarize_compile(model: keras.Model): model.summary(line_length=80, positions=[0.33, 0.65, 0.8, 1.0], show_trainable=True, expand_nested=True) # Specify the training configuration (optimizer, loss, metrics) model.compile( optimizer=keras.optimizers.Adam(), # Optimizer- Adam [14] optimizer # Loss function to minimize # @paper: Therefore, we converged on using sigmoid activations with binary cross entropy loss. # loss=keras.losses.BinaryCrossentropy(), loss=CustomLoss, # List of metrics to monitor metrics=[ # @paper: 1) Mean Percentile Rank? # mean_percentile_rank, # @paper: 2) Top-k mean accuracy based evaluation top_k_mean_accuracy, custom_spectral_loss, # Extra Adding 3) spectroloss accuracy # Extra Adding 4) combined # @paper: 5) Mean Absolute Error based evaluation keras.metrics.MeanAbsoluteError(), ], ) def fit( model: keras.Model, x_train: np.ndarray, y_train: np.ndarray, x_val: np.ndarray, y_val: np.ndarray, batch_size: int = 16, epochs: int = 200, ) -> keras.Model: # @paper: # with a minibatch size of 16 for # 100 epochs. The best weights for each model were set by # employing an early stopping procedure. logging.info("# Fit model on training data") history = model.fit( x_train, y_train, batch_size=batch_size, epochs=epochs, # @paper: # Early stopping procedure: # We pass some validation for # monitoring validation loss and metrics # at the end of each epoch validation_data=(x_val, y_val), verbose=0, ) # The returned "history" object holds a record # of the loss values and metric values during training logging.info("\nhistory dict:", history.history) return model def compare(target, prediction, params, precision=1, print_output=False): if print_output and len(prediction) < 10: print(prediction) print("Pred: {}".format(np.round(prediction, decimals=2))) print("PRnd: {}".format(np.round(prediction))) print("Act : {}".format(target)) print("+" * 5) pred: List[ParamValue] = params.decode(prediction) act: List[ParamValue] = params.decode(target) pred_index: List[int] = [np.array(p.encoding).argmax() for p in pred] act_index: List[int] = [np.array(p.encoding).argmax() for p in act] width = 8 names = "Parameter: " act_s = "Actual: " pred_s = "Predicted: " pred_i = "Pred. Indx:" act_i = "Act. Index:" diff_i = "Index Diff:" for p in act: names += p.name.rjust(width)[:width] act_s += f"{p.value:>8.2f}" for p in pred: pred_s += f"{p.value:>8.2f}" for p in pred_index: pred_i += f"{p:>8}" for p in act_index: act_i += f"{p:>8}" for i in range(len(act_index)): diff = pred_index[i] - act_index[i] diff_i += f"{diff:>8}" exact = 0.0 close = 0.0 n_params = len(pred_index) for i in range(n_params): if pred_index[i] == act_index[i]: exact = exact + 1.0 if abs(pred_index[i] - act_index[i]) <= precision: close = close + 1.0 exact_ratio = exact / n_params close_ratio = close / n_params if print_output: print(names) print(act_s) print(pred_s) print(act_i) print(pred_i) print(diff_i) print("-" * 30) return exact_ratio, close_ratio def evaluate( prediction: np.ndarray, x: np.ndarray, y: np.ndarray, params: ParameterSet, ): print("Prediction Shape: {}".format(prediction.shape)) num: int = x.shape[0] correct: int = 0 correct_r: float = 0.0 close_r: float = 0.0 for i in range(num): should_print = i < 5 exact, close = compare( target=y[i], prediction=prediction[i], params=params, print_output=should_print, ) if exact == 1.0: correct = correct + 1 correct_r += exact close_r += close summary = params.explain() print( "{} Parameters with {} levels (fixed: {})".format( summary["n_variable"], summary["levels"], summary["n_fixed"] ) ) print( "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format( correct, num, correct / num * 100, correct_r / num * 100, close_r / num * 100, ) ) def data_format_audio(audio: np.ndarray, data_format: str) -> np.ndarray: # `(None, n_channel, n_freq, n_time)` if `'channels_first'`, # `(None, n_freq, n_time, n_channel)` if `'channels_last'`, if data_format == "channels_last": audio = audio[np.newaxis, :, np.newaxis] else: audio = audio[np.newaxis, np.newaxis, :] return audio """ Wrap up the whole training process in a standard function. Gets a callback to actually make the model, to keep it as flexible as possible. # Params: # - dataset_name (dataset name) # - model_name: (C1..C6,e2e) # - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model # - epochs: int # - dataset_dir: place to find input data # - output_dir: place to put outputs # - parameters_file (override parameters filename) # - dataset_file (override dataset filename) # - data_format (channels_first or channels_last) # - run_name: to save this run as """ #LOSS TYPE FOR CUSTOM LOSS FUNCTION LOSS_TYPE = 'L1' SPECTRO_TYPE = 'spectro' PRINT = 1 #DAWDREAMER EXPORT SETTINGS SAMPLE_RATE = 16384 BUFFER_SIZE = 1024 SYNTH_PLUGIN = "libTAL-NoiseMaker.so" ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE) SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN) SYNTH.add_midi_note(40, 127, 0, 0.8) with open('plugin_config/TAL-NoiseMaker-config.json') as f: data = json.load(f) dico=[] # Extract the key ID from the JSON data key_id = data['parameters'] for param in key_id: dico.append(param['id']) DICO=dico def train_model( # Main options dataset_name: str, model_name: str, epochs: int, model_callback: Callable[[str, int, int, str], keras.Model], dataset_dir: str, output_dir: str, # Directory names dataset_file: str = None, parameters_file: str = None, run_name: str = None, data_format: str = "channels_last", save_best: bool = True, resume: bool = False, checkpoint: bool = True, model_type: str = "STFT", ): tf.config.run_functions_eagerly(True) # tf.data.experimental.enable_debug_mode() time_generated = datetime.datetime.now().strftime('%Y%m%d-%H%M%S') if not dataset_file: dataset_file = ( os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_data.hdf5" ) if not parameters_file: parameters_file = ( os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_params.pckl" ) if not run_name: run_name = dataset_name + "_" + model_name model_file = f"{output_dir}/model/{run_name}_{time_generated}" if not os.path.exists(model_file): os.makedirs(model_file) best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}" if not os.path.exists(best_model_file): os.makedirs(best_model_file) if resume: # checkpoint_model_file = f"{output_dir}/{run_name}_checkpoint_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}" # history_file = f"{output_dir}/{run_name}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}" checkpoint_model_file = f"{output_dir}/history/InverSynth_C6XL_checkpoint_20231201-103344" history_file = f"{output_dir}/checkpoints/InverSynth_C6XL_20231201-103344" else: os.makedirs(f"{output_dir}/history", exist_ok=True) os.makedirs(f"{output_dir}/checkpoints", exist_ok=True) history_file = f"{output_dir}/history/{run_name}_{time_generated}" checkpoint_model_file = f"{output_dir}/checkpoints/{run_name}_checkpoint_{time_generated}" history_graph_file = f"{output_dir}/{run_name}.pdf" print(tf.config.list_physical_devices('GPU')) gpu_avail = len(tf.config.list_physical_devices('GPU')) # True/False cuda_gpu_avail = len(tf.config.list_physical_devices('GPU')) # True/False print("+" * 30) print(f"++ {run_name}") print( f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs" ) print(f"Saving model in {output_dir} as {model_file}") print(f"Saving history as {history_file}") print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}") print("+" * 30) os.makedirs(output_dir, exist_ok=True) # Get training and validation generators params = {"data_file": dataset_file, "batch_size": 64, "shuffle": True} training_generator = SoundDataGenerator(first=0.8, **params) validation_generator = SoundDataGenerator(last=0.2, **params) n_samples = training_generator.get_audio_length() print(f"get_audio_length: {n_samples}") n_outputs = training_generator.get_label_size() # set keras image_data_format # NOTE: on CPU only `channels_last` is supported physical_devices = tf.config.list_physical_devices('GPU') keras.backend.set_image_data_format(data_format) model: keras.Model = None if resume and os.path.exists(checkpoint_model_file): history = pd.read_csv(history_file) # Note - its zero indexed in the file, but 1 indexed in the display initial_epoch: int = max(history.iloc[:, 0]) + 1 # epochs:int = initial_epoch print( f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}" ) model = keras.models.load_model( checkpoint_model_file , custom_objects={"top_k_mean_accuracy": top_k_mean_accuracy, "Spectrogram" : Spectrogram, "custom_spectral_loss": custom_spectral_loss, "CustomLoss": CustomLoss }, ) else: model = model_callback( model_name=model_name, inputs=n_samples, outputs=n_outputs, data_format=data_format, ) # keras.utils.plot_model(model, to_file='model.png', show_shapes=True, show_layer_activations=True) # Summarize and compile the model summarize_compile(model) initial_epoch = 0 open(history_file, "w").close() callbacks = [] best_callback = keras.callbacks.ModelCheckpoint( filepath=best_model_file, save_weights_only=False, save_best_only=True, verbose=1, ) checkpoint_callback = keras.callbacks.ModelCheckpoint( filepath=checkpoint_model_file, save_weights_only=False, save_best_only=False, verbose=1, ) os.makedirs(f"{output_dir}/logs", exist_ok=True) log_dir = f"{output_dir}/logs/" + time_generated tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True, profile_batch = '500,520') if save_best: callbacks.append(best_callback) if checkpoint: callbacks.append(checkpoint_callback) callbacks.append(tensorboard_callback) callbacks.append(CSVLogger(history_file, append=True)) callbacks.append(Weight_trans(weight_var, "log3" ,epochs)) # Parameter data - needed for decoding! # Fit the model history = None try: history = model.fit( x=training_generator, validation_data=validation_generator, epochs=epochs, callbacks=callbacks, initial_epoch=initial_epoch, verbose=1, # https://github.com/tensorflow/tensorflow/issues/38064 ) except Exception as e: print(f"Something went wrong during `model.fit`: {e}") # Save model model.save(model_file) # Save history if history and not resume: try: hist_df = pd.DataFrame(history.history) try: fig = hist_df.plot(subplots=True, figsize=(8, 25)) fig[0].get_figure().savefig(history_graph_file) except Exception as e: print("Couldn't create history graph") print(e) except Exception as e: tf.print("Couldn't save history") print(e) # evaluate prediction on random sample from validation set # Parameter data - needed for decoding! with open(parameters_file, "rb") as f: parameters: ParameterSet = load(f) # Shuffle data validation_generator.on_epoch_end() X, y = validation_generator.__getitem__(0) X.reshape((X.__len__(), 1, 16384)) # if model_type == "STFT": # # stft expects shape (channel, sample_rate) # X = np.moveaxis(X, 1, -1) prediction: np.ndarray = model.predict(X) evaluate(prediction, X, y, parameters) print("++++" * 5) print("Pushing to trained model") print("++++" * 5) Valid=False while Valid==False: file = namefile = input("Enter .wav test file path: ") if os.path.exists(file): Valid=True else: print("File Path invalid, try again ") newpred = model.predict(audio_importer(str(f'{namefile}'))) predlist: List[ParamValue] = parameters.decode(newpred[0]) df = pd.DataFrame(predlist) print(df) df = df.drop(['encoding'], axis=1) # saving the dataframe if not os.path.exists(str(f'output/wav_inferred')): os.makedirs(str(f'output/wav_inferred')) head, tail = os.path.split(str(f'{namefile}')) print("Outputting CSV config in " + str(f'output/wav_inferred')) df.to_csv(str(f'output/wav_inferred/{tail}.csv')) #export(prediction, X, y, parameters) # Loop through the rows of the DataFrame i = 0 for values in df['value'].values: # Set parameters using DataFrame values SYNTH.set_parameter(DICO[i],values) # (MIDI note, velocity, start, duration) i += 1 #Setting volume to 0.9 SYNTH.set_parameter(1, 0.9) # Set up the processing graph graph = [ # synth takes no inputs, so we give an empty list. (SYNTH, []), ] ENGINE.load_graph(graph) ENGINE.render(1) data = ENGINE.get_audio() try: data = librosa.to_mono(data).transpose() except: tf.print("ERROR" * 100) df = df.fillna(0) data = df.to_numpy() data = librosa.to_mono(data).transpose() tf.print("crashed, nan in generation") synth_params = dict(SYNTH.get_patch()) print(synth_params) df = pd.DataFrame(data) # penalty=1000000 # df = pd.DataFrame(data) # df = df.fillna(0) # data = df.to_numpy() wavfile.write(str(f'output/wav_inferred/gen_{tail}.wav'), SAMPLE_RATE, data) def generate_audio(df_params): # Loop through the rows of the DataFrame i = 0 penalty=1 for param in df_params: # Set parameters using DataFrame values SYNTH.set_parameter(DICO[i], param.value) # (MIDI note, velocity, start, duration) i += 1 # Set up the processing graph graph = [ # synth takes no inputs, so we give an empty list. (SYNTH, []), ] ENGINE.load_graph(graph) ENGINE.render(1) data = ENGINE.get_audio() if np.isnan(data).any(): # df = pd.DataFrame(data) # df = df.fillna(0) # data = df.to_numpy() tf.print("crashed, nan in generation") synth_params = dict(SYNTH.get_patch()) print(synth_params) try: data = librosa.to_mono(data).transpose() if(librosa.util.valid_audio(data)): result = np.array(data) return result, penalty except: tf.print("crashed, nan in generation") raise("Nan in generation, crashed")