import datetime
import json
import os
from pickle import load
from typing import Callable, List

import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from kapre.time_frequency import Spectrogram
from models.convert_to_preset import convert_csv_to_preset
from models.importer_audio import audio_importer
import dawdreamer as daw
from scipy.io import wavfile
from generators.parameters import ParameterSet, ParamValue

# Backend variable holding the current spectral-loss weight. It is read by
# CustomLoss and is the variable normally handed to Weight_trans.
weight_var = K.variable(0.0)


class Weight_trans(keras.callbacks.Callback):
    """Keras callback that updates a loss-weight variable after each epoch.

    Nothing happens until ``epoch > 680``. After that, the value written to
    ``weight_var`` depends on ``transition``: one of ``"linear"``,
    ``"linear2"``, ``"log"``, ``"log2"``, ``"log3"``, ``"square"`` or
    ``"quad"``. Any other value leaves the variable untouched.

    Args:
        weight_var: backend variable to update (stored as ``self.alpha``).
        transition: name of the schedule to apply.
        epochs: total number of epochs; only used by the ``"linear"`` schedule.
    """

    def __init__(self, weight_var, transition, epochs):
        # Let the Keras base class set up its own state (model, params, ...).
        super().__init__()
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    def on_epoch_end(self, epoch, logs=None):
        """Write the scheduled weight into ``self.alpha`` once epoch > 680."""
        if epoch <= 680:
            return

        if self.transition == "linear":
            K.set_value(self.alpha, ((epoch) / (self.epochs) - 0.617) * 0.00001)
            # Report the variable this callback actually updates.
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif self.transition == "linear2":
            K.set_value(self.alpha, (1.5625 * epoch - 1.0625) * 0.00001)
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif self.transition == "log":
            K.set_value(
                self.alpha,
                (
                    1
                    - (tf.math.log(epoch * 0.001 - 0.67285) / tf.math.log(0.0005))
                    - 0.35
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "log2":
            K.set_value(
                self.alpha,
                (
                    1
                    - (tf.math.log(epoch * 0.001 - 0.6575) / tf.math.log(0.0005))
                    - 0.5
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "log3":
            K.set_value(
                self.alpha,
                (
                    1
                    - (
                        tf.math.log(epoch * 0.001 - 0.67978)
                        / tf.math.log(0.00000005)
                    )
                    - 0.5
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "square":
            K.set_value(self.alpha, 4.1 * tf.pow(epoch * 0.001 - 0.65, 2) + 0.002)
            print("exp")
        elif self.transition == "quad":
            K.set_value(self.alpha, 33 * tf.pow(epoch * 0.001 - 0.65, 4) + 0.002)
            print("quad")


# Model utils


def mean_percentile_rank(y_true, y_pred, k=5):
    """Placeholder for the Mean Percentile Rank (MPR) metric.

    @paper
    The first evaluation measure is the Mean Percentile Rank (MPR) which is
    computed per synthesizer parameter.

    Not implemented yet: the function has no body besides this docstring and
    therefore returns ``None``.
    """
    # TODO


def top_k_mean_accuracy(y_true, y_pred, k=5):
    """Return the mean top-k accuracy over all rows of the inputs.

    @paper
    The top-k mean accuracy is obtained by computing the top-k accuracy for
    each test example and then taking the mean across all examples. In the
    same manner as done in the MPR analysis, we compute the top-k mean
    accuracy per synthesizer parameter for k = 1, ..., 5.

    Both inputs are flattened to 2-D, keeping the last axis. The target class
    of each row is the argmax of ``y_true`` along that axis.
    """
    # TODO: per parameter?
    original_shape = tf.shape(y_true)
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))
    top_k = K.in_top_k(y_pred, tf.cast(tf.argmax(y_true, axis=-1), "int32"), k)
    correct_pred = tf.reshape(top_k, original_shape[:-1])
    return tf.reduce_mean(tf.cast(correct_pred, tf.float32))


@tf.function
def CustomLoss(y_true, y_pred):
    """Binary cross-entropy weighted by a blend of 1 and the spectral loss.

    The sample weight is ``(1 - w) + w * spectral_loss`` where ``w`` is the
    current value of the module-level ``weight_var``.

    NOTE: the ``.numpy()`` calls only work when functions run eagerly;
    ``train_model`` enables that via ``tf.config.run_functions_eagerly(True)``.
    """
    bce = tf.keras.losses.BinaryCrossentropy()
    weights = custom_spectral_loss(y_true, y_pred)
    weight_shift = (1 - weight_var.numpy()) + (weight_var.numpy() * weights.numpy())
    # tf.print(f"New weight is {weight_shift}")
    loss = bce(y_true, y_pred, sample_weight=weight_shift)
    return loss


@tf.function
def custom_spectral_loss(y_true, y_pred):
    """Compare the audio rendered from true and predicted parameters.

    Only the first row of each (flattened) input is decoded and rendered.
    The spectral representation is selected by ``SPECTRO_TYPE`` and the
    distance by ``LOSS_TYPE`` (both module-level constants).

    Raises:
        ValueError: if ``SPECTRO_TYPE`` or ``LOSS_TYPE`` is not a supported
            value.
    """
    # tf.print("After compiling model :",tf.executing_eagerly())
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    # Assuming y_true and y_pred contain parameters for audio synthesis.
    # NOTE(security): pickle must only be used on trusted files.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)

    predlist_true: List[ParamValue] = parameters.decode(y_true[0])
    predlist_pred: List[ParamValue] = parameters.decode(y_pred[0])

    # Generate audio from parameters
    audio_true, penalty = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)

    # Compute spectrogram
    if SPECTRO_TYPE == "spectro":
        spectrogram_true = tf.math.abs(
            tf.signal.stft(audio_true, frame_length=1024, frame_step=512)
        )
        spectrogram_pred = tf.math.abs(
            tf.signal.stft(audio_pred, frame_length=1024, frame_step=512)
        )
    elif SPECTRO_TYPE == "qtrans":
        # Audio is passed as ``y=``: recent librosa releases reject it as a
        # positional argument.
        spectrogram_true = librosa.amplitude_to_db(
            librosa.cqt(y=audio_true, sr=SAMPLE_RATE, hop_length=128), ref=np.max
        )
        spectrogram_pred = librosa.amplitude_to_db(
            librosa.cqt(y=audio_pred, sr=SAMPLE_RATE, hop_length=128), ref=np.max
        )
    elif SPECTRO_TYPE == "mel":
        mel_spect = librosa.feature.melspectrogram(
            y=audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_true = librosa.power_to_db(mel_spect, ref=np.max)
        mel_spect = librosa.feature.melspectrogram(
            y=audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_pred = librosa.power_to_db(mel_spect, ref=np.max)
    else:
        raise ValueError(f"Unsupported SPECTRO_TYPE: {SPECTRO_TYPE!r}")

    if LOSS_TYPE == "L1":
        spectral_loss = penalty * tf.reduce_mean(
            tf.abs(spectrogram_true - spectrogram_pred)
        )
    elif LOSS_TYPE == "L2":
        spectral_loss = penalty * tf.reduce_mean(
            (spectrogram_true - spectrogram_pred) ** 2
        )
    elif LOSS_TYPE == "COSINE":
        # ``tf.losses`` is the Keras losses module in TF2 and has no
        # ``cosine_distance``; the original op lives under ``tf.compat.v1``.
        spectral_loss = tf.compat.v1.losses.cosine_distance(
            spectrogram_true, spectrogram_pred, weights=1.0, axis=-1
        )
    else:
        raise ValueError(f"Unsupported LOSS_TYPE: {LOSS_TYPE!r}")
    return spectral_loss


def compare(target, prediction, params, precision=1, print_output=False):
    """Compare one predicted encoding against its target.

    Both vectors are decoded with ``params.decode`` and each parameter is
    reduced to the argmax index of its encoding.

    Args:
        target: encoded ground-truth vector.
        prediction: encoded predicted vector.
        params: object providing ``decode``.
        precision: maximum index distance still counted as "close".
        print_output: when true, print a side-by-side table.

    Returns:
        ``(exact_ratio, close_ratio)``: the fraction of parameters whose
        predicted index equals the target index, and the fraction within
        ``precision`` of it. Both are 0.0 when there are no parameters.
    """
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)

    pred: List[ParamValue] = params.decode(prediction)
    act: List[ParamValue] = params.decode(target)
    pred_index: List[int] = [np.array(p.encoding).argmax() for p in pred]
    act_index: List[int] = [np.array(p.encoding).argmax() for p in act]

    width = 8
    # All row labels share one width so the 8-character columns line up.
    label_width = 11
    names = "Parameter: ".ljust(label_width) + "".join(
        p.name.rjust(width)[:width] for p in act
    )
    act_s = "Actual: ".ljust(label_width) + "".join(f"{p.value:>8.2f}" for p in act)
    pred_s = "Predicted: ".ljust(label_width) + "".join(
        f"{p.value:>8.2f}" for p in pred
    )
    pred_i = "Pred. Indx:".ljust(label_width) + "".join(f"{p:>8}" for p in pred_index)
    act_i = "Act. Index:".ljust(label_width) + "".join(f"{p:>8}" for p in act_index)
    diff_i = "Index Diff:".ljust(label_width) + "".join(
        f"{p - a:>8}" for p, a in zip(pred_index, act_index)
    )

    n_params = len(pred_index)
    exact = sum(1.0 for p, a in zip(pred_index, act_index) if p == a)
    close = sum(1.0 for p, a in zip(pred_index, act_index) if abs(p - a) <= precision)
    # Guard against an empty parameter list instead of dividing by zero.
    exact_ratio = exact / n_params if n_params else 0.0
    close_ratio = close / n_params if n_params else 0.0

    if print_output:
        print(names)
        print(act_s)
        print(pred_s)
        print(act_i)
        print(pred_i)
        print(diff_i)
        print("-" * 30)
    return exact_ratio, close_ratio


def evaluate(
    prediction: np.ndarray,
    x: np.ndarray,
    y: np.ndarray,
    params: ParameterSet,
):
    """Print accuracy statistics for a batch of predictions.

    ``x`` is only used for its first dimension (the number of examples).
    The first five examples are printed in detail by ``compare``.
    """
    print("Prediction Shape: {}".format(prediction.shape))
    num: int = x.shape[0]
    correct: int = 0
    correct_r: float = 0.0
    close_r: float = 0.0
    for i in range(num):
        exact, close = compare(
            target=y[i],
            prediction=prediction[i],
            params=params,
            print_output=i < 5,
        )
        if exact == 1.0:
            correct += 1
        correct_r += exact
        close_r += close

    summary = params.explain()
    print(
        "{} Parameters with {} levels (fixed: {})".format(
            summary["n_variable"], summary["levels"], summary["n_fixed"]
        )
    )
    if num == 0:
        # Nothing to average over; avoid a ZeroDivisionError below.
        print("No examples to evaluate")
        return
    print(
        "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format(
            correct,
            num,
            correct / num * 100,
            correct_r / num * 100,
            close_r / num * 100,
        )
    )


# LOSS TYPE FOR CUSTOM LOSS FUNCTION
LOSS_TYPE = "L1"
SPECTRO_TYPE = "spectro"
PRINT = 1

# DAWDREAMER EXPORT SETTINGS
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = 'libTAL-NoiseMaker.so'
# SYNTH_PLUGIN = "TAL-NoiseMaker.vst3"
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# (MIDI note, velocity, start, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)

with open("plugin_config/TAL-NoiseMaker-config.json") as f:
    data = json.load(f)

# Extract the parameter ids from the JSON data, in file order.
key_id = data["parameters"]
dico = [param["id"] for param in key_id]
DICO = dico


def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,
    # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    """Prepare output directories and optionally reload a checkpointed model.

    Originally described as: wrap up the whole training process in a standard
    function, with a callback to actually make the model. The code visible
    here resolves paths, creates output directories, prints a summary, and,
    when ``resume`` is true and the checkpoint path exists, loads the saved
    model. It does not call ``model_callback`` or run any training.

    Args:
        dataset_name: dataset name, used to derive default file names.
        model_name: model identifier (C1..C6, e2e); used for naming/logging.
        epochs: number of epochs; only logged here.
        model_callback: function taking name, inputs, outputs, data_format and
            returning a Keras model. Unused by the visible code.
        dataset_dir: place to find input data.
        output_dir: place to put outputs.
        dataset_file: override for the dataset filename.
        parameters_file: override for the parameters filename.
        run_name: name to save this run as; defaults to
            ``"<dataset_name>_<model_name>"``.
        data_format: channels_first or channels_last. Unused here.
        save_best, checkpoint, model_type: unused by the visible code.
        resume: when true, try to load the model from
            ``<output_dir>/checkpoints/model``.

    Returns:
        ``(model, parameters_file)``. ``model`` is ``None`` unless a
        checkpoint was loaded.
    """
    tf.config.run_functions_eagerly(True)
    # tf.data.experimental.enable_debug_mode()
    time_generated = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    if not dataset_file:
        dataset_file = os.path.join(
            os.getcwd(), dataset_dir, dataset_name + "_data.hdf5"
        )
    if not parameters_file:
        parameters_file = os.path.join(
            os.getcwd(), dataset_dir, dataset_name + "_params.pckl"
        )
    if not run_name:
        run_name = dataset_name + "_" + model_name

    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    os.makedirs(model_file, exist_ok=True)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    os.makedirs(best_model_file, exist_ok=True)

    # Defined unconditionally: both paths are referenced below even when
    # ``resume`` is false, which previously raised NameError.
    checkpoint_model_file = f"{output_dir}/checkpoints/model"
    history_file = f"{output_dir}/history/model"

    gpu_devices = tf.config.list_physical_devices("GPU")
    print(gpu_devices)
    gpu_avail = len(gpu_devices)
    cuda_gpu_avail = len(gpu_devices)

    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)

    os.makedirs(output_dir, exist_ok=True)

    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - its zero indexed in the file, but 1 indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        model = keras.models.load_model(
            checkpoint_model_file,
            custom_objects={
                "top_k_mean_accuracy": top_k_mean_accuracy,
                "Spectrogram": Spectrogram,
                "custom_spectral_loss": custom_spectral_loss,
                "CustomLoss": CustomLoss,
            },
        )

    return model, parameters_file


def _render_synth_mono():
    """Render the current SYNTH patch and return the audio mixed to mono.

    If the mono conversion fails, NaN samples in the rendered audio are
    replaced with 0 and the conversion is retried once.
    """
    # synth takes no inputs, so we give an empty list.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    audio = ENGINE.get_audio()
    try:
        return librosa.to_mono(audio).transpose()
    except Exception:
        print("ERROR" * 100)
        print("crashed, nan in generation")
        audio = np.where(np.isnan(audio), 0.0, audio)
        return librosa.to_mono(audio).transpose()


def inference(model: keras.Model, parameters_file: str, file_path: str, file_id: str):
    """Predict synth parameters for an audio file and render the result.

    Writes ``temp/<file_id>_config.csv``, a preset converted from it, and
    ``temp/<file_id>_generated.wav``.

    Args:
        model: trained Keras model used for prediction.
        parameters_file: path to the pickled parameter set.
        file_path: path of the input audio file.
        file_id: identifier used in the output file names.

    Returns:
        ``(file_path, xml_path, output_file_path)``.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
        RuntimeError: if importing the audio or predicting fails.
    """
    # NOTE(security): pickle must only be used on trusted files.
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)

    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)

    if not os.path.exists(file_path):
        raise FileNotFoundError("File Path invalid, try again ")

    try:
        newpred = model.predict(audio_importer(str(file_path)))
    except Exception as err:
        raise RuntimeError("Crashed") from err

    predlist: List[ParamValue] = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(["encoding"], axis=1)

    # saving the dataframe
    print("Outputting CSV config in " + "temp/")
    os.makedirs("temp", exist_ok=True)
    csv_path = f"temp/{file_id}_config.csv"
    preset_target = f"temp/{file_id}_config.noisemakerpreset"
    df.to_csv(csv_path)
    xml_path = convert_csv_to_preset(csv_path, preset_target)

    # Push every predicted value to the synth parameter with the matching id.
    for index, value in enumerate(df["value"].values):
        SYNTH.set_parameter(DICO[index], value)
    # Setting volume to 0.9
    SYNTH.set_parameter(1, 0.9)

    audio = _render_synth_mono()

    synth_params = dict(SYNTH.get_patch())
    print(synth_params)

    output_file_path = f"temp/{file_id}_generated.wav"
    wavfile.write(output_file_path, SAMPLE_RATE, audio)
    return file_path, xml_path, output_file_path


def generate_audio(df_params):
    """Render audio for a list of decoded parameter values.

    Args:
        df_params: iterable of objects with a ``value`` attribute, in the
            same order as ``DICO``.

    Returns:
        ``(audio, penalty)`` where ``audio`` is a NumPy array of the mono
        render and ``penalty`` is always 1.
    """
    penalty = 1
    for index, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[index], param.value)

    result = np.array(_render_synth_mono())
    return result, penalty