# infer-vst / back/models/launch.py
# yderre-aubay's picture
# Update back/models/launch.py
# d5f5022 verified
import datetime
import json
import os
from pickle import load
from typing import Callable, List
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from kapre.time_frequency import Spectrogram
from models.convert_to_preset import convert_csv_to_preset
from models.importer_audio import audio_importer
import dawdreamer as daw
from scipy.io import wavfile
import librosa
from generators.parameters import ParameterSet, ParamValue
# Module-level Keras backend variable, initialised to 0.0. CustomLoss reads it
# to blend the plain loss weight with the spectral weighting, and Weight_trans
# prints its value after rescheduling.
weight_var = K.variable(0.0)
class Weight_trans(keras.callbacks.Callback):
    """Keras callback that reschedules a loss-weight variable late in training.

    After every epoch whose index is greater than 680, the variable passed as
    ``weight_var`` is overwritten with a value computed from the epoch index
    according to the schedule named by ``transition``. Epochs up to and
    including 680 leave the variable untouched.

    Args:
        weight_var: Keras backend variable to update (stored as ``self.alpha``).
        transition: Schedule name; one of ``"linear"``, ``"linear2"``,
            ``"log"``, ``"log2"``, ``"log3"``, ``"square"`` or ``"quad"``.
            Any other value makes the callback a no-op.
        epochs: Total number of epochs; only used by the ``"linear"`` schedule.
    """

    def __init__(self, weight_var, transition, epochs):
        # Let the Keras base class initialise its own bookkeeping first.
        super().__init__()
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    @staticmethod
    def _log_schedule(epoch, offset, base, shift):
        """Return the logarithmic weight shared by the ``log*`` schedules."""
        return (
            1
            - (tf.math.log(epoch * 0.001 - offset) / tf.math.log(base))
            - shift
        ) * 0.00001

    def on_epoch_end(self, epoch, logs=None):
        """Update ``self.alpha`` for ``epoch`` when ``epoch`` is above 680.

        ``logs`` is accepted for Keras compatibility and is not read. It
        defaults to ``None`` rather than a shared mutable dict.
        """
        if epoch <= 680:
            return

        mode = self.transition
        if mode == "linear":
            K.set_value(self.alpha, ((epoch) / (self.epochs) - 0.617) * 0.00001)
            # Report the variable this callback actually updates, not a global.
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif mode == "linear2":
            K.set_value(self.alpha, (1.5625 * epoch - 1.0625) * 0.00001)
            tf.print(f"new weight {K.get_value(self.alpha)}")
        elif mode == "log":
            K.set_value(self.alpha, self._log_schedule(epoch, 0.67285, 0.0005, 0.35))
            tf.print("log")
        elif mode == "log2":
            K.set_value(self.alpha, self._log_schedule(epoch, 0.6575, 0.0005, 0.5))
            tf.print("log")
        elif mode == "log3":
            K.set_value(
                self.alpha, self._log_schedule(epoch, 0.67978, 0.00000005, 0.5)
            )
            tf.print("log")
        elif mode == "square":
            K.set_value(self.alpha, 4.1 * tf.pow(epoch * 0.001 - 0.65, 2) + 0.002)
            print("exp")
        elif mode == "quad":
            K.set_value(self.alpha, 33 * tf.pow(epoch * 0.001 - 0.65, 4) + 0.002)
            print("quad")
"""Model Utils"""
def mean_percentile_rank(y_true, y_pred, k=5):
    """Placeholder for the Mean Percentile Rank (MPR) metric.

    @paper
    The first evaluation measure is the Mean Percentile Rank
    (MPR) which is computed per synthesizer parameter.

    Not implemented yet: the arguments are ignored and ``None`` is returned.
    """
    # TODO
    return None
def top_k_mean_accuracy(y_true, y_pred, k=5):
    """Return the mean top-k accuracy of ``y_pred`` against ``y_true``.

    @paper
    The top-k mean accuracy is obtained by computing the top-k
    accuracy for each test example and then taking the mean across
    all examples. In the same manner as done in the MPR analysis,
    we compute the top-k mean accuracy per synthesizer
    parameter for k = 1, ..., 5.

    Both inputs are flattened to two dimensions along their last axis; the
    argmax of each ``y_true`` row is the target class, and a row counts as a
    hit when that class is among the ``k`` highest entries of the matching
    ``y_pred`` row.
    """
    # TODO: per parameter?
    leading_dims = tf.shape(y_true)[:-1]
    flat_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    flat_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    target_classes = tf.cast(tf.argmax(flat_true, axis=-1), "int32")
    hits = K.in_top_k(flat_pred, target_classes, k)

    # Restore the leading dimensions before averaging the boolean hits.
    hits = tf.reshape(hits, leading_dims)
    return tf.reduce_mean(tf.cast(hits, tf.float32))
@tf.function
def CustomLoss(y_true, y_pred):
    """Binary cross-entropy weighted by a blend of 1 and the spectral loss.

    The sample weight is ``(1 - w) + w * s`` where ``w`` is the module-level
    ``weight_var`` and ``s`` is the value of ``custom_spectral_loss``. With
    ``w == 0`` this is plain binary cross-entropy.

    Args:
        y_true: Ground-truth encoded parameters.
        y_pred: Predicted encoded parameters.

    Returns:
        The weighted binary cross-entropy loss tensor.
    """
    bce = tf.keras.losses.BinaryCrossentropy()
    spectral_weight = custom_spectral_loss(y_true, y_pred)

    # Stay in tensor arithmetic instead of calling .numpy(), which is only
    # available while functions are forced to run eagerly. stop_gradient keeps
    # the weight a constant with respect to the model, as the numpy round-trip
    # did; the cast guards against a float64 spectral value.
    spectral_weight = tf.stop_gradient(tf.cast(spectral_weight, weight_var.dtype))
    weight_shift = (1.0 - weight_var) + weight_var * spectral_weight

    # tf.print(f"New weight is {weight_shift}")
    return bce(y_true, y_pred, sample_weight=weight_shift)
@tf.function
def custom_spectral_loss(y_true, y_pred):
    """Spectral distance between audio rendered from true and predicted params.

    Only the first row of each (flattened) batch is decoded and rendered
    through ``generate_audio``. The spectrogram flavour is selected by the
    module constant ``SPECTRO_TYPE`` (``"spectro"``, ``"qtrans"`` or ``"mel"``)
    and the distance by ``LOSS_TYPE`` (``"L1"``, ``"L2"`` or ``"COSINE"``).

    NOTE: the body does file I/O and calls numpy/librosa/the synth engine, so
    it depends on running eagerly despite the ``tf.function`` decorator.

    Args:
        y_true: Ground-truth encoded parameters.
        y_pred: Predicted encoded parameters.

    Returns:
        A scalar spectral loss.

    Raises:
        ValueError: If ``SPECTRO_TYPE`` or ``LOSS_TYPE`` is not a supported
            value (previously this surfaced as an unbound-variable error).
    """
    # tf.print("After compiling model :",tf.executing_eagerly())
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))

    # The parameter set is re-read from disk on every call.
    # NOTE(review): pickle must only be loaded from a trusted file.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)
    predlist_true: List[ParamValue] = parameters.decode(y_true[0])
    predlist_pred: List[ParamValue] = parameters.decode(y_pred[0])

    # Render both parameter sets; the penalty from the second render is the
    # one that is used (generate_audio currently always returns 1).
    audio_true, penalty = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)

    # Compute spectrograms
    if SPECTRO_TYPE == "spectro":
        spectrogram_true = tf.math.abs(
            tf.signal.stft(audio_true, frame_length=1024, frame_step=512)
        )
        spectrogram_pred = tf.math.abs(
            tf.signal.stft(audio_pred, frame_length=1024, frame_step=512)
        )
    elif SPECTRO_TYPE == "qtrans":
        spectrogram_true = librosa.amplitude_to_db(
            librosa.cqt(audio_true, sr=SAMPLE_RATE, hop_length=128), ref=np.max
        )
        spectrogram_pred = librosa.amplitude_to_db(
            librosa.cqt(audio_pred, sr=SAMPLE_RATE, hop_length=128), ref=np.max
        )
    elif SPECTRO_TYPE == "mel":
        # The audio must be passed as ``y=``: it is keyword-only in recent
        # librosa releases and the keyword also works in older ones.
        mel_true = librosa.feature.melspectrogram(
            y=audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_true = librosa.power_to_db(mel_true, ref=np.max)
        mel_pred = librosa.feature.melspectrogram(
            y=audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_pred = librosa.power_to_db(mel_pred, ref=np.max)
    else:
        raise ValueError(f"Unsupported SPECTRO_TYPE: {SPECTRO_TYPE!r}")

    if LOSS_TYPE == "L1":
        spectral_loss = penalty * tf.reduce_mean(
            tf.abs(spectrogram_true - spectrogram_pred)
        )
    elif LOSS_TYPE == "L2":
        spectral_loss = penalty * tf.reduce_mean(
            (spectrogram_true - spectrogram_pred) ** 2
        )
    elif LOSS_TYPE == "COSINE":
        # cosine_distance lives under tf.compat.v1 in TensorFlow 2; the
        # TF1-era ``tf.losses.cosine_distance`` name no longer resolves.
        spectral_loss = tf.compat.v1.losses.cosine_distance(
            spectrogram_true, spectrogram_pred, weights=1.0, axis=-1
        )
    else:
        raise ValueError(f"Unsupported LOSS_TYPE: {LOSS_TYPE!r}")
    return spectral_loss
def compare(target, prediction, params, precision=1, print_output=False):
    """Score one predicted encoding against its target, parameter by parameter.

    Both vectors are decoded with ``params.decode``; for each decoded
    parameter the argmax of its ``encoding`` is taken as its level index.

    Args:
        target: Encoded ground-truth vector.
        prediction: Encoded predicted vector.
        params: Object exposing ``decode(vector)`` that returns items with
            ``name``, ``value`` and ``encoding`` attributes.
        precision: Maximum absolute index difference still counted as close.
        print_output: When true, print a table of the comparison (and the raw
            vectors when ``prediction`` has fewer than 10 entries).

    Returns:
        ``(exact_ratio, close_ratio)``: the fractions of parameters whose
        predicted index equals, or lies within ``precision`` of, the target.
    """
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)

    pred: List[ParamValue] = params.decode(prediction)
    act: List[ParamValue] = params.decode(target)
    pred_index: List[int] = [np.array(item.encoding).argmax() for item in pred]
    act_index: List[int] = [np.array(item.encoding).argmax() for item in act]

    # Build the table rows up front; they are only printed on request.
    width = 8
    names = "Parameter: " + "".join(item.name.rjust(width)[:width] for item in act)
    act_s = "Actual: " + "".join(f"{item.value:>8.2f}" for item in act)
    pred_s = "Predicted: " + "".join(f"{item.value:>8.2f}" for item in pred)
    pred_i = "Pred. Indx:" + "".join(f"{idx:>8}" for idx in pred_index)
    act_i = "Act. Index:" + "".join(f"{idx:>8}" for idx in act_index)

    # Indexing (rather than zip) keeps the IndexError on mismatched lengths.
    offsets = [pred_index[pos] - act_index[pos] for pos in range(len(act_index))]
    diff_i = "Index Diff:" + "".join(f"{gap:>8}" for gap in offsets)

    n_params = len(pred_index)
    exact = float(sum(1 for pos in range(n_params) if offsets[pos] == 0))
    close = float(
        sum(1 for pos in range(n_params) if abs(offsets[pos]) <= precision)
    )
    exact_ratio = exact / n_params
    close_ratio = close / n_params

    if print_output:
        for row in (names, act_s, pred_s, act_i, pred_i, diff_i, "-" * 30):
            print(row)
    return exact_ratio, close_ratio
def evaluate(
    prediction: np.ndarray,
    x: np.ndarray,
    y: np.ndarray,
    params: ParameterSet,
):
    """Print accuracy statistics for a batch of predictions.

    Each of the ``x.shape[0]`` examples is scored with ``compare`` (the first
    five with table output enabled). The summary reports how many examples
    matched on every parameter, plus the mean exact and close ratios.

    Args:
        prediction: Predicted encodings, indexed by example.
        x: Model inputs; only the size of the first axis is used.
        y: Target encodings, indexed by example.
        params: Parameter set used for decoding and for ``explain()``.
    """
    print(f"Prediction Shape: {prediction.shape}")
    num: int = x.shape[0]

    perfect: int = 0
    exact_total: float = 0.0
    close_total: float = 0.0
    for row in range(num):
        exact, close = compare(
            target=y[row],
            prediction=prediction[row],
            params=params,
            print_output=row < 5,
        )
        if exact == 1.0:
            perfect += 1
        exact_total += exact
        close_total += close

    summary = params.explain()
    print(
        f"{summary['n_variable']} Parameters with {summary['levels']} levels "
        f"(fixed: {summary['n_fixed']})"
    )
    print(
        f"Got {perfect} out of {num} ({perfect / num * 100:.1f}% perfect); "
        f"Exact params: {exact_total / num * 100:.1f}%, "
        f"Close params: {close_total / num * 100:.1f}%"
    )
"""
Wrap up the whole training process in a standard function. Gets a callback
to actually make the model, to keep it as flexible as possible.
# Params:
# - dataset_name (dataset name)
# - model_name: (C1..C6,e2e)
# - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model
# - epochs: int
# - dataset_dir: place to find input data
# - output_dir: place to put outputs
# - parameters_file (override parameters filename)
# - dataset_file (override dataset filename)
# - data_format (channels_first or channels_last)
# - run_name: to save this run as
"""
# Settings read by custom_spectral_loss: the distance measure and the
# spectrogram flavour.
LOSS_TYPE = "L1"
SPECTRO_TYPE = "spectro"
PRINT = 1

# DawDreamer export settings.
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = 'libTAL-NoiseMaker.so'
# SYNTH_PLUGIN = "TAL-NoiseMaker.vst3"

# One render engine and one synth processor are created at import time and
# shared by inference() and generate_audio().
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# (MIDI note, velocity, start, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)

# Collect the plugin parameter ids, in file order, from the JSON config.
with open("plugin_config/TAL-NoiseMaker-config.json") as f:
    data = json.load(f)
key_id = data["parameters"]
dico = [entry["id"] for entry in key_id]
DICO = dico
def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,  # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    """Resolve the run's file locations and optionally load a checkpoint.

    As written, this function does not train: it forces eager execution,
    derives the dataset/parameter/model paths, creates the timestamped output
    directories, prints a run banner and, when ``resume`` is true and the
    checkpoint exists, loads the saved model.

    Args:
        dataset_name: Base name used to derive the default data and parameter
            file names and the default run name.
        model_name: Model identifier; used in the banner and default run name.
        epochs: Epoch count; only printed in the banner.
        model_callback: Model factory; not used by this body.
        dataset_dir: Directory (relative to the working directory) holding
            the default dataset and parameter files.
        output_dir: Root directory for models, checkpoints and history.
        dataset_file: Overrides the derived dataset path.
        parameters_file: Overrides the derived parameters path.
        run_name: Overrides ``f"{dataset_name}_{model_name}"``.
        data_format, save_best, checkpoint, model_type: Accepted for
            compatibility; not used by this body.
        resume: When true, try to load ``{output_dir}/checkpoints/model``.

    Returns:
        ``(model, parameters_file)``; ``model`` is ``None`` when nothing was
        resumed.
    """
    tf.config.run_functions_eagerly(True)
    # tf.data.experimental.enable_debug_mode()
    time_generated = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    if not dataset_file:
        dataset_file = f"{os.getcwd()}/{dataset_dir}/{dataset_name}_data.hdf5"
    if not parameters_file:
        parameters_file = f"{os.getcwd()}/{dataset_dir}/{dataset_name}_params.pckl"
    if not run_name:
        run_name = f"{dataset_name}_{model_name}"

    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    os.makedirs(model_file, exist_ok=True)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    os.makedirs(best_model_file, exist_ok=True)

    # Defined unconditionally: the banner below prints history_file even when
    # resume is false, which previously raised NameError.
    checkpoint_model_file = f"{output_dir}/checkpoints/model"
    history_file = f"{output_dir}/history/model"

    gpus = tf.config.list_physical_devices("GPU")
    print(gpus)
    # Both banner fields report the same count of physical GPU devices.
    gpu_avail = len(gpus)
    cuda_gpu_avail = gpu_avail

    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)
    os.makedirs(output_dir, exist_ok=True)

    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - its zero indexed in the file, but 1 indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        model = keras.models.load_model(
            checkpoint_model_file,
            custom_objects={
                "top_k_mean_accuracy": top_k_mean_accuracy,
                "Spectrogram": Spectrogram,
                "custom_spectral_loss": custom_spectral_loss,
                "CustomLoss": CustomLoss,
            },
        )
    return model, parameters_file
def inference(model: keras.Model, parameters_file: str, file_path: str, file_id: str):
    """Predict synth parameters for an audio file and render the result.

    The audio at ``file_path`` is fed to ``model``; the first prediction is
    decoded with the pickled parameter set, written to
    ``temp/{file_id}_config.csv``, converted to a ``.noisemakerpreset`` file,
    applied to the shared ``SYNTH`` and rendered to
    ``temp/{file_id}_generated.wav``.

    Args:
        model: Trained Keras model.
        parameters_file: Path to the pickled parameter set.
        file_path: Path of the input audio file.
        file_id: Identifier used to name the files written under ``temp/``.

    Returns:
        ``(file_path, xml_path, output_file_path)`` where ``xml_path`` is the
        value returned by ``convert_csv_to_preset``.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        RuntimeError: If the model prediction fails (original error chained).
    """
    # NOTE(review): pickle executes arbitrary code on load; parameters_file
    # must come from a trusted source.
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)
    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)

    # Raising a plain string is itself a TypeError; use real exception types.
    if not os.path.exists(file_path):
        raise FileNotFoundError("File Path invalid, try again ")
    try:
        newpred = model.predict(audio_importer(str(file_path)))
    except Exception as err:
        raise RuntimeError("Crashed") from err

    predlist: List[ParamValue] = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(["encoding"], axis=1)

    # saving the dataframe
    print("Outputting CSV config in " + "temp/")
    csv_path = f"temp/{file_id}_config.csv"
    preset_path = f"temp/{file_id}_config.noisemakerpreset"
    df.to_csv(csv_path)
    xml_path = convert_csv_to_preset(csv_path, preset_path)

    # Apply each predicted value to the plugin parameter id at that position.
    for index, value in enumerate(df["value"].values):
        SYNTH.set_parameter(DICO[index], value)
    # Setting volume to 0.9
    SYNTH.set_parameter(1, 0.9)

    # synth takes no inputs, so we give an empty list.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        # Presumably NaN samples in the render - TODO confirm. Zero-fill the
        # rendered audio (not the parameter table) and retry once.
        tf.print("ERROR" * 100)
        tf.print("crashed, nan in generation")
        print(dict(SYNTH.get_patch()))
        data = np.where(np.isnan(data), 0.0, data)
        data = librosa.to_mono(data).transpose()

    output_file_path = f"temp/{file_id}_generated.wav"
    wavfile.write(output_file_path, SAMPLE_RATE, data)
    return file_path, xml_path, output_file_path
def generate_audio(df_params):
    """Render mono audio from decoded parameter values with the shared synth.

    Args:
        df_params: Iterable of decoded parameters; each item's ``value`` is
            applied to the plugin parameter id at the same position in
            ``DICO``.

    Returns:
        ``(audio, penalty)`` where ``audio`` is the rendered signal as a numpy
        array and ``penalty`` is always ``1``.
    """
    penalty = 1
    for index, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[index], param.value)

    # synth takes no inputs, so we give an empty list.
    ENGINE.load_graph([(SYNTH, [])])
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        # Presumably NaN samples in the render - TODO confirm. Zero-fill them
        # and retry once; a second failure propagates to the caller.
        print("ERROR" * 100)
        data = np.where(np.isnan(data), 0.0, data)
        data = librosa.to_mono(data).transpose()
    return np.array(data), penalty