Spaces:
Sleeping
Sleeping
import datetime | |
import json | |
import os | |
from pickle import load | |
from typing import Callable, List | |
import librosa | |
import numpy as np | |
import pandas as pd | |
import tensorflow as tf | |
from tensorflow import keras | |
from keras import backend as K | |
from kapre.time_frequency import Spectrogram | |
from models.convert_to_preset import convert_csv_to_preset | |
from models.importer_audio import audio_importer | |
import dawdreamer as daw | |
from scipy.io import wavfile | |
import librosa | |
from generators.parameters import ParameterSet, ParamValue | |
# Backend variable holding the loss-blending weight. CustomLoss reads it to mix
# the spectral term into the per-sample weights; Weight_trans prints its value.
weight_var = K.variable(0.0)
class Weight_trans(keras.callbacks.Callback):
    """Keras callback that re-schedules a blending weight at the end of each epoch.

    Nothing happens until ``epoch`` exceeds 680. After that the backend
    variable given at construction is overwritten with a value computed from
    the epoch number, using the curve selected by ``transition``.

    Args:
        weight_var: Backend variable to update (stored as ``self.alpha``).
        transition: Curve name; one of ``"linear"``, ``"linear2"``, ``"log"``,
            ``"log2"``, ``"log3"``, ``"square"`` or ``"quad"``. Any other
            value leaves the variable untouched.
        epochs: Total number of epochs; only the ``"linear"`` curve uses it.
    """

    def __init__(self, weight_var, transition, epochs):
        # Let the Keras base class initialise its own bookkeeping first.
        super().__init__()
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    def on_epoch_end(self, epoch, logs=None):
        """Update ``self.alpha`` according to the configured transition curve."""
        if epoch <= 680:
            return

        if self.transition == "linear":
            K.set_value(self.alpha, ((epoch) / (self.epochs) - 0.617) * 0.00001)
            # Report the variable this callback owns, not a module global.
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif self.transition == "linear2":
            K.set_value(self.alpha, (1.5625 * epoch - 1.0625) * 0.00001)
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif self.transition == "log":
            K.set_value(
                self.alpha,
                (
                    1
                    - (tf.math.log(epoch * 0.001 - 0.67285) / tf.math.log(0.0005))
                    - 0.35
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "log2":
            K.set_value(
                self.alpha,
                (
                    1
                    - (tf.math.log(epoch * 0.001 - 0.6575) / tf.math.log(0.0005))
                    - 0.5
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "log3":
            K.set_value(
                self.alpha,
                (
                    1
                    - (
                        tf.math.log(epoch * 0.001 - 0.67978)
                        / tf.math.log(0.00000005)
                    )
                    - 0.5
                )
                * 0.00001,
            )
            tf.print("log")
        elif self.transition == "square":
            # NOTE(review): unlike the curves above, this one is not scaled by
            # 1e-5 and its log label says "exp" — confirm both are intended.
            K.set_value(self.alpha, 4.1 * tf.pow(epoch * 0.001 - 0.65, 2) + 0.002)
            print("exp")
        elif self.transition == "quad":
            K.set_value(self.alpha, 33 * tf.pow(epoch * 0.001 - 0.65, 4) + 0.002)
            print("quad")
"""Model Utils""" | |
def mean_percentile_rank(y_true, y_pred, k=5):
    """
    @paper
    The first evaluation measure is the Mean Percentile Rank
    (MPR) which is computed per synthesizer parameter.

    Not implemented yet: the body consists of this docstring only, so the
    arguments are ignored and the call returns None.
    """
    # TODO
def top_k_mean_accuracy(y_true, y_pred, k=5):
    """Mean top-k accuracy over all examples.

    @paper
    The top-k accuracy is computed for every test example and then averaged
    across examples. As in the MPR analysis, the paper reports it per
    synthesizer parameter for k = 1, ..., 5.

    Both inputs are flattened to ``(-1, last_dim)``. An example is a hit when
    the arg-max index of its ``y_true`` row is among the ``k`` highest
    entries of the matching ``y_pred`` row.
    """
    # TODO: per parameter?
    leading_dims = tf.shape(y_true)[:-1]
    flat_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    flat_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    target_index = tf.cast(tf.argmax(flat_true, axis=-1), "int32")
    hits = K.in_top_k(flat_pred, target_index, k)

    # Restore the leading dimensions before averaging the boolean hits.
    hits = tf.reshape(hits, leading_dims)
    return tf.reduce_mean(tf.cast(hits, tf.float32))
def CustomLoss(y_true, y_pred):
    """Binary cross-entropy weighted by a blend of 1 and the spectral loss.

    The sample weight is ``(1 - a) + a * s``, where ``a`` is the current value
    of the module-level ``weight_var`` and ``s`` is the value returned by
    ``custom_spectral_loss``. Both are read with ``.numpy()``, so this needs
    eager execution.
    """
    cross_entropy = tf.keras.losses.BinaryCrossentropy()
    spectral = custom_spectral_loss(y_true, y_pred)

    alpha = weight_var.numpy()
    blended_weight = (1 - alpha) + (alpha * spectral.numpy())
    # tf.print(f"New weight is {blended_weight}")
    return cross_entropy(y_true, y_pred, sample_weight=blended_weight)
def custom_spectral_loss(y_true, y_pred):
    """Spectral distance between audio rendered from true and predicted parameters.

    Only the first row of each (flattened) batch is decoded and rendered. The
    spectrogram flavour is chosen by the module-level ``SPECTRO_TYPE``
    ("spectro", "qtrans" or "mel") and the distance by ``LOSS_TYPE``
    ("L1", "L2" or "COSINE").

    Returns:
        A scalar loss. For "L1" and "L2" it is multiplied by the penalty
        reported by ``generate_audio`` for the predicted parameters.

    Raises:
        ValueError: if ``SPECTRO_TYPE`` or ``LOSS_TYPE`` holds an unsupported
            value (previously this ended in an unbound-local error).
    """
    # tf.print("After compiling model :",tf.executing_eagerly())
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    # NOTE(security): pickle.load executes arbitrary code; this file must be
    # a trusted, locally produced artifact.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)

    # Decode the first example of each batch into synthesizer parameter values.
    predlist_true = parameters.decode(y_true[0])
    predlist_pred = parameters.decode(y_pred[0])

    # Render audio for both parameter sets; only the penalty of the predicted
    # render is used below (same as before, where it overwrote the first one).
    audio_true, _ = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)

    # Compute spectrograms
    if SPECTRO_TYPE == "spectro":
        spectrogram_true = tf.math.abs(
            tf.signal.stft(audio_true, frame_length=1024, frame_step=512)
        )
        spectrogram_pred = tf.math.abs(
            tf.signal.stft(audio_pred, frame_length=1024, frame_step=512)
        )
    elif SPECTRO_TYPE == "qtrans":
        # librosa.cqt returns complex values; take the magnitude explicitly
        # rather than relying on amplitude_to_db discarding the phase.
        spectrogram_true = librosa.amplitude_to_db(
            np.abs(librosa.cqt(audio_true, sr=SAMPLE_RATE, hop_length=128)),
            ref=np.max,
        )
        spectrogram_pred = librosa.amplitude_to_db(
            np.abs(librosa.cqt(audio_pred, sr=SAMPLE_RATE, hop_length=128)),
            ref=np.max,
        )
    elif SPECTRO_TYPE == "mel":
        # The audio must be passed as ``y=``: it is keyword-only in current
        # librosa releases.
        mel_true = librosa.feature.melspectrogram(
            y=audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_true = librosa.power_to_db(mel_true, ref=np.max)
        mel_pred = librosa.feature.melspectrogram(
            y=audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_pred = librosa.power_to_db(mel_pred, ref=np.max)
    else:
        raise ValueError(f"Unsupported SPECTRO_TYPE: {SPECTRO_TYPE!r}")

    if LOSS_TYPE == "L1":
        spectral_loss = penalty * tf.reduce_mean(
            tf.abs(spectrogram_true - spectrogram_pred)
        )
    elif LOSS_TYPE == "L2":
        spectral_loss = penalty * tf.reduce_mean(
            (spectrogram_true - spectrogram_pred) ** 2
        )
    elif LOSS_TYPE == "COSINE":
        # cosine_distance lives under tf.compat.v1 in TensorFlow 2;
        # tf.losses there is the Keras losses module and has no such function.
        spectral_loss = tf.compat.v1.losses.cosine_distance(
            spectrogram_true, spectrogram_pred, weights=1.0, axis=-1
        )
    else:
        raise ValueError(f"Unsupported LOSS_TYPE: {LOSS_TYPE!r}")
    return spectral_loss
def compare(target, prediction, params, precision=1, print_output=False):
    """Compare one predicted parameter vector against its target.

    Both vectors are decoded with ``params.decode``. Each decoded parameter is
    reduced to the arg-max index of its ``encoding``, and the indices are
    compared position by position.

    Args:
        target: Encoded ground-truth vector.
        prediction: Encoded model output for the same example.
        params: Object exposing ``decode(vector)``, which returns items with
            ``name``, ``value`` and ``encoding`` attributes.
        precision: Largest absolute index difference still counted as "close".
        print_output: When true, print a table of the comparison.

    Returns:
        ``(exact_ratio, close_ratio)``: the fraction of parameters whose index
        matches exactly, and the fraction within ``precision``. Both are 0.0
        when nothing is decoded (previously a ZeroDivisionError).
    """
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)

    pred = params.decode(prediction)
    act = params.decode(target)
    pred_index = [np.array(p.encoding).argmax() for p in pred]
    act_index = [np.array(p.encoding).argmax() for p in act]
    diffs = [p_idx - a_idx for p_idx, a_idx in zip(pred_index, act_index)]

    n_params = len(pred_index)
    if n_params:
        exact_ratio = float(sum(1 for d in diffs if d == 0)) / n_params
        close_ratio = float(sum(1 for d in diffs if abs(d) <= precision)) / n_params
    else:
        # Nothing to compare: report zero agreement instead of dividing by zero.
        exact_ratio = close_ratio = 0.0

    if print_output:
        width = 8
        print("Parameter: " + "".join(p.name.rjust(width)[:width] for p in act))
        print("Actual: " + "".join(f"{p.value:>8.2f}" for p in act))
        print("Predicted: " + "".join(f"{p.value:>8.2f}" for p in pred))
        print("Act. Index:" + "".join(f"{idx:>8}" for idx in act_index))
        print("Pred. Indx:" + "".join(f"{idx:>8}" for idx in pred_index))
        print("Index Diff:" + "".join(f"{d:>8}" for d in diffs))
        print("-" * 30)
    return exact_ratio, close_ratio
def evaluate(
    prediction: np.ndarray,
    x: np.ndarray,
    y: np.ndarray,
    params: ParameterSet,
):
    """Print accuracy statistics for a batch of predictions.

    Each of the ``x.shape[0]`` examples is passed to ``compare``; the first
    five are also printed in detail. Two summary lines follow: the parameter
    set description from ``params.explain()`` and the aggregate scores.

    Args:
        prediction: Model outputs, indexed by example.
        x: Model inputs; only the number of rows is used.
        y: Encoded targets, indexed by example.
        params: Parameter set used to decode and describe the vectors.
    """
    print("Prediction Shape: {}".format(prediction.shape))
    num: int = x.shape[0]
    correct: int = 0
    correct_r: float = 0.0
    close_r: float = 0.0
    for i in range(num):
        exact, close = compare(
            target=y[i],
            prediction=prediction[i],
            params=params,
            print_output=i < 5,  # only show the first few examples
        )
        if exact == 1.0:
            correct += 1
        correct_r += exact
        close_r += close

    summary = params.explain()
    print(
        "{} Parameters with {} levels (fixed: {})".format(
            summary["n_variable"], summary["levels"], summary["n_fixed"]
        )
    )
    # An empty batch reports 0% everywhere instead of dividing by zero.
    denominator = num if num else 1
    print(
        "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format(
            correct,
            num,
            correct / denominator * 100,
            correct_r / denominator * 100,
            close_r / denominator * 100,
        )
    )
""" | |
Wrap up the whole training process in a standard function. Gets a callback | |
to actually make the model, to keep it as flexible as possible. | |
# Params: | |
# - dataset_name (dataset name) | |
# - model_name: (C1..C6,e2e) | |
# - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model | |
# - epochs: int | |
# - dataset_dir: place to find input data | |
# - output_dir: place to put outputs | |
# - parameters_file (override parameters filename) | |
# - dataset_file (override dataset filename) | |
# - data_format (channels_first or channels_last) | |
# - run_name: to save this run as | |
""" | |
# LOSS TYPE FOR CUSTOM LOSS FUNCTION
# custom_spectral_loss checks LOSS_TYPE against "L1", "L2" and "COSINE".
LOSS_TYPE = "L1"
# custom_spectral_loss checks SPECTRO_TYPE against "spectro", "qtrans" and "mel".
SPECTRO_TYPE = "spectro"
# Not referenced anywhere in the visible part of this file.
PRINT = 1
# DAWDREAMER EXPORT SETTINGS
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = 'libTAL-NoiseMaker.so'
# SYNTH_PLUGIN = "TAL-NoiseMaker.vst3"
# The render engine and synth plugin are created once at import time and are
# shared by inference() and generate_audio().
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# (MIDI note, velocity, start, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)
with open("plugin_config/TAL-NoiseMaker-config.json") as f:
    data = json.load(f)
dico = []
# Extract the key ID from the JSON data
key_id = data["parameters"]
for param in key_id:
    dico.append(param["id"])
# Plugin parameter ids in config-file order; DICO[i] is the id passed to
# SYNTH.set_parameter for the i-th decoded parameter.
DICO = dico
def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,  # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    """Prepare the run directories and, when resuming, load the checkpointed model.

    As visible here the function does not train. It resolves file names,
    creates the output directories, prints a run banner, and loads
    ``{output_dir}/checkpoints/model`` if ``resume`` is true and that path
    exists.

    Args:
        dataset_name: Base name used to derive the default data/parameter files.
        model_name: Model identifier; used for the default run name and banner.
        epochs: Number of epochs; only printed here.
        model_callback: Model factory; not called in the visible code.
        dataset_dir: Directory (relative to the working directory) that holds
            the default dataset and parameter files.
        output_dir: Root directory for models, checkpoints and history.
        dataset_file: Overrides the derived dataset path.
        parameters_file: Overrides the derived parameter pickle path.
        run_name: Overrides the ``{dataset_name}_{model_name}`` run name.
        data_format, save_best, checkpoint, model_type: accepted but not used
            in the visible code.
        resume: Load the checkpointed model when it exists.

    Returns:
        ``(model, parameters_file)``. ``model`` is ``None`` when ``resume`` is
        false or no checkpoint exists.
    """
    tf.config.run_functions_eagerly(True)
    # tf.data.experimental.enable_debug_mode()
    time_generated = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    if not dataset_file:
        dataset_file = f"{os.getcwd()}/{dataset_dir}/{dataset_name}_data.hdf5"
    if not parameters_file:
        parameters_file = f"{os.getcwd()}/{dataset_dir}/{dataset_name}_params.pckl"
    if not run_name:
        run_name = f"{dataset_name}_{model_name}"

    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    os.makedirs(model_file, exist_ok=True)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    os.makedirs(best_model_file, exist_ok=True)

    # Bind these unconditionally: the banner below prints history_file even
    # when not resuming, which used to fail with an unbound local.
    checkpoint_model_file = f"{output_dir}/checkpoints/model"
    history_file = f"{output_dir}/history/model"

    gpus = tf.config.list_physical_devices("GPU")
    print(gpus)
    gpu_avail = len(gpus)  # number of visible GPUs
    cuda_gpu_avail = gpu_avail

    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)
    os.makedirs(output_dir, exist_ok=True)

    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - its zero indexed in the file, but 1 indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        model = keras.models.load_model(
            checkpoint_model_file,
            custom_objects={
                "top_k_mean_accuracy": top_k_mean_accuracy,
                "Spectrogram": Spectrogram,
                "custom_spectral_loss": custom_spectral_loss,
                "CustomLoss": CustomLoss,
            },
        )
    return model, parameters_file
def inference(model: keras.Model, parameters_file: str, file_path: str, file_id: str):
    """Predict synth parameters for an audio file and render the resulting patch.

    Steps: load the pickled parameter set, run the model on the imported audio,
    decode the first prediction, write it to ``temp/{file_id}_config.csv`` and
    convert that CSV to a NoiseMaker preset. The values are then applied to the
    shared ``SYNTH``, one second is rendered, and the audio is written to
    ``temp/{file_id}_generated.wav``.

    Args:
        model: Trained Keras model.
        parameters_file: Path to the pickled parameter set.
        file_path: Path of the input audio file.
        file_id: Identifier used in the names of the generated files.

    Returns:
        ``(file_path, xml_path, output_file_path)``: the input path, the value
        returned by ``convert_csv_to_preset``, and the rendered WAV path.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
        RuntimeError: if importing the audio or predicting fails; the original
            exception is chained as the cause.
    """
    # NOTE(security): pickle.load executes arbitrary code; parameters_file must
    # come from a trusted source.
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)

    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)

    # The old code used ``raise("...")``, which itself fails with a TypeError.
    if not os.path.exists(file_path):
        raise FileNotFoundError("File Path invalid, try again ")

    try:
        newpred = model.predict(audio_importer(str(file_path)))
    except Exception as err:
        raise RuntimeError("Crashed") from err

    predlist = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(["encoding"], axis=1)

    # Make sure the output directory exists before writing into it.
    os.makedirs("temp", exist_ok=True)
    print("Outputting CSV config in " + "temp/")
    csv_path = f"temp/{file_id}_config.csv"
    preset_path = f"temp/{file_id}_config.noisemakerpreset"
    df.to_csv(csv_path)
    xml_path = convert_csv_to_preset(csv_path, preset_path)

    # Apply each predicted value to the plugin parameter at the same position.
    for i, value in enumerate(df["value"].values):
        SYNTH.set_parameter(DICO[i], value)
    # Setting volume to 0.9
    SYNTH.set_parameter(1, 0.9)

    # Set up the processing graph; the synth takes no inputs.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        tf.print("ERROR" * 100)
        # Sanitise the rendered audio itself. The old fallback cleaned the
        # parameter table and then tried to write that out as audio.
        data = np.nan_to_num(data, nan=0.0, posinf=0.0, neginf=0.0)
        data = librosa.to_mono(data).transpose()
        tf.print("crashed, nan in generation")

    synth_params = dict(SYNTH.get_patch())
    print(synth_params)

    output_file_path = f"temp/{file_id}_generated.wav"
    wavfile.write(output_file_path, SAMPLE_RATE, data)
    return file_path, xml_path, output_file_path
def generate_audio(df_params):
    """Render one second of mono audio from a list of decoded parameters.

    The i-th item's ``value`` is applied to the plugin parameter ``DICO[i]`` on
    the shared ``SYNTH``, then the engine renders one second.

    Args:
        df_params: Iterable of decoded parameters exposing a ``value`` attribute.

    Returns:
        ``(audio, penalty)``: ``audio`` is the mono render as a NumPy array and
        ``penalty`` is always 1 in the current implementation.
    """
    penalty = 1
    for i, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[i], param.value)

    # Set up the processing graph; the synth takes no inputs.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        print("ERROR" * 100)
        # Replace NaN and infinite samples with silence and retry; the old
        # fallback only cleared NaN.
        data = np.nan_to_num(data, nan=0.0, posinf=0.0, neginf=0.0)
        data = librosa.to_mono(data).transpose()
    result = np.array(data)
    return result, penalty