Spaces:
Build error
Build error
import os | |
import librosa | |
import numpy as np | |
import torch | |
from scipy.interpolate import interp1d | |
from scipy.ndimage import binary_erosion | |
from scipy.signal import medfilt | |
from utils.audio.pitch.extractor_utils import get_med_curve, clean_short_v_frag | |
def crepe_predict(audio, sr, model_capacity='full', center=True, step_size=10, verbose=1): | |
from crepe.core import to_viterbi_cents, to_local_average_cents | |
from crepe import get_activation | |
np.seterr(divide='ignore', invalid='ignore') | |
activation = get_activation(audio, sr, model_capacity=model_capacity, | |
center=center, step_size=step_size, | |
verbose=verbose) | |
confidence = activation.max(axis=1) | |
cents_v = to_viterbi_cents(activation) | |
frequency_v = 10 * 2 ** (cents_v / 1200) | |
frequency_v[np.isnan(frequency_v)] = 0 | |
cents = to_local_average_cents(activation) | |
frequency = 10 * 2 ** (cents / 1200) | |
frequency[np.isnan(frequency)] = 0 | |
time = np.arange(confidence.shape[0]) * step_size / 1000.0 | |
return time, frequency_v, frequency, confidence, activation | |
def load_model(device, capacity='full'): | |
import torchcrepe | |
# Bind model and capacity | |
capacity = capacity | |
model = torchcrepe.Crepe(capacity) | |
# Load weights | |
file = os.path.join(os.path.dirname(torchcrepe.__file__), 'assets', f'{capacity}.pth') | |
model.load_state_dict(torch.load(file, map_location='cpu')) | |
# Place on device | |
model = model.to(torch.device(device)) | |
# Eval mode | |
model.eval() | |
return model | |
def crepe_predict_torch(audio, sr, hop_length=None, model_capacity='full', | |
batch_size=None, device='cpu', pad=True): | |
from torchcrepe import preprocess, PITCH_BINS | |
import warnings | |
from crepe.core import to_viterbi_cents, to_local_average_cents | |
warnings.filterwarnings('ignore', message=r'Named tensors and all their associated APIs.*') | |
# Postprocessing breaks gradients, so just don't compute them | |
with torch.no_grad(): | |
# Preprocess audio | |
generator = preprocess(audio, | |
sr, | |
hop_length, | |
batch_size, | |
device, | |
pad) | |
frames = next(generator) | |
# Infer independent probabilities for each pitch bin | |
model = load_model(device, model_capacity) | |
model = model.to(frames.device) | |
activation = model(frames) | |
del model | |
del frames | |
# shape=(batch, 360, time / hop_length) | |
activation = activation.reshape(-1, PITCH_BINS).cpu().numpy() | |
torch.cuda.empty_cache() | |
confidence = activation.max(axis=1) | |
cents_v = to_viterbi_cents(activation) | |
frequency_v = 10 * 2 ** (cents_v / 1200) | |
frequency_v[np.isnan(frequency_v)] = 0 | |
cents = to_local_average_cents(activation) | |
frequency = 10 * 2 ** (cents / 1200) | |
frequency[np.isnan(frequency)] = 0 | |
return frequency_v, frequency, confidence, activation | |
def cents_to_bins(cents): | |
"""Converts cents to pitch bins""" | |
CENTS_PER_BIN = 20 # cents | |
bins = (cents - 1997.3794084376191) / CENTS_PER_BIN | |
return np.round(bins).astype(int) | |
def cents_to_frequency(cents): | |
"""Converts cents to frequency in Hz""" | |
return 10 * 2 ** (cents / 1200) | |
def frequency_to_bins(frequency): | |
"""Convert frequency in Hz to pitch bins""" | |
return cents_to_bins(frequency_to_cents(frequency)) | |
def frequency_to_cents(frequency): | |
"""Convert frequency in Hz to cents""" | |
return 1200 * np.log2(frequency / 10. + 1e-8) | |
def find_nearest_f0_in_piptrack(f0, pitches): | |
i_frame = np.arange(len(f0)) | |
return pitches[i_frame, np.abs(f0[:, None] - pitches).argmin(-1)] | |
def f0_energy_corrector(wav_data_16k, f0_func, f0_min, f0_max, fix_octave_error=True): | |
hop_size = 256 | |
win_size = hop_size * 6 | |
sr = 16000 | |
spec = np.abs(librosa.stft(wav_data_16k, n_fft=win_size, hop_length=hop_size, | |
win_length=win_size, pad_mode="constant").T) | |
T = spec.shape[0] | |
x_h256 = np.arange(0, 1, 1 / T)[:T] | |
x_h256[-1] = 1 | |
f0 = f0_func(x_h256) | |
freqs = librosa.fft_frequencies(sr=sr, n_fft=win_size) | |
x_idx = np.arange(T) | |
def find_nearest_stft_bin(f0_): | |
return np.abs(freqs[None, :] - f0_[:, None]).argmin(-1) | |
def get_energy_mask(f0_lambda, hars=None, win_size=3): | |
if hars is None: | |
hars = [1] | |
mask = np.zeros([T, 10000]).astype(bool) | |
mask_bins = [] | |
for multiple in hars: | |
f0_bin_idx = find_nearest_stft_bin(f0_lambda(f0, multiple)) | |
for delta in range(-win_size // 2, 1 + win_size // 2): | |
y_idx = f0_bin_idx + delta | |
if np.max(y_idx) < spec.shape[1]: | |
mask_bins.append(spec[x_idx, y_idx]) | |
mask[x_idx, y_idx] = 1 | |
mask_bins = np.stack(mask_bins, 1) | |
energy_ = np.mean(mask_bins, 1) | |
return energy_, mask | |
bottom_idx = find_nearest_stft_bin(np.array([70]))[0] | |
bottom_energy = spec[:, :bottom_idx].mean() | |
pitches, _ = librosa.piptrack( | |
wav_data_16k, sr, | |
n_fft=win_size, win_length=win_size, hop_length=hop_size, | |
fmin=50, fmax=3000, ref=bottom_energy) | |
pitches = pitches.T[:T] | |
f0_piptrack = find_nearest_f0_in_piptrack(f0, pitches) | |
f0_raw = f0 | |
f0 = f0_piptrack | |
# find uv first (for obtaining mean_energy_mharfhar) | |
energy_har, mask_har = get_energy_mask(lambda f0, m: f0 * m, [1, 2], 3) | |
energy_mhalfhar, mask_mhalfhar = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [1], 5) | |
r_energy = energy_har / np.clip(energy_mhalfhar, 1e-8, None) | |
uv = np.zeros_like(f0).astype(bool) | |
uv |= r_energy < 10 | |
uv |= (f0 > f0_max) | (f0 < f0_min) | |
uv |= energy_har < bottom_energy | |
mean_energy_mharfhar = np.clip(energy_mhalfhar[~uv].mean(), 1e-8, None) | |
if len(uv) > 0: | |
spec = np.clip(spec - spec[uv].mean(0)[None, :], 1e-8, None) | |
# fix octave error | |
r_energy_div_dict = {} | |
if fix_octave_error: | |
for div, mul, thres in [ | |
(2, (1,), 20), | |
(3, (1, 2), 20), | |
(5, (1, 2, 3), 20), | |
]: | |
energy_div_har, mask_div_har = get_energy_mask(lambda f0, m: f0 / div * m, mul, 3) | |
r_energy_div = energy_div_har / mean_energy_mharfhar | |
r_energy_div = medfilt(r_energy_div, 5) | |
r_energy_div_dict[div] = r_energy_div | |
div_mask = (r_energy_div > thres) & (f0 / div > f0_min) | |
f0[div_mask] /= div | |
div_mask_erosion = binary_erosion(div_mask, iterations=2) | |
div_pos = sorted(np.where(div_mask_erosion)[0]) | |
for pos in div_pos: | |
for s in range(10): | |
if pos - s not in div_pos and pos - s >= 0: | |
f0[pos - s] = pitches[pos - s, np.abs(f0[pos] - pitches[pos - s]).argmin()] | |
if pos + s not in div_pos and pos + s < T: | |
f0[pos + s] = pitches[pos + s, np.abs(f0[pos] - pitches[pos + s]).argmin()] | |
# find uv second | |
energy_har, mask_har = get_energy_mask(lambda f0, m: f0 * m, [1, 2], 3) | |
energy_mhalfhar, mask_mhalfhar = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [1], 5) | |
energy_har_2, _ = get_energy_mask(lambda f0, m: f0 * m, [2], 3) | |
energy_mhalfhar_2, _ = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [2, 3], 3) | |
r_energy = energy_har / np.clip(energy_mhalfhar, 1e-8, None) | |
r_energy = medfilt(r_energy, 3) | |
r_energy_2 = energy_har_2 / np.clip(energy_mhalfhar_2, 1e-8, None) | |
r_energy_2 = medfilt(r_energy_2, 3) | |
r_energy_2_mask = r_energy_2 < 3 | |
r_energy_2_mask = binary_erosion(r_energy_2_mask, iterations=3) | |
uv = np.zeros_like(f0).astype(bool) | |
uv |= r_energy < 8 | |
uv |= r_energy_2_mask | |
uv |= (f0 > f0_max) | (f0 < f0_min) | |
uv |= energy_har < bottom_energy | |
func_uv = interp1d(x_h256, uv, 'nearest') | |
func_f0_div = interp1d(x_h256, f0, 'nearest') | |
spec_log = np.log10(spec + 1e-8) | |
return func_uv, func_f0_div, { | |
'spec': spec_log, | |
'energy_har': energy_har, 'energy_halfhar': energy_mhalfhar, | |
'r_energy': r_energy, 'r_energy_2': r_energy_2, | |
'mask_har': mask_har, 'mask_halfhar': mask_mhalfhar, | |
'bottom_energy': bottom_energy, | |
'r_energy_div_dict': r_energy_div_dict, | |
'f0_piptrack': f0_piptrack, | |
'f0_raw': f0_raw | |
} | |
def crepe_with_corrector(wav_data, hop_size, audio_sample_rate, f0_min, f0_max, return_states=False, *args, **kwargs): | |
wav_data = wav_data.astype(np.double) | |
wav_data_16k = librosa.resample(wav_data, audio_sample_rate, 16000) | |
time, f0_10ms, f0_nov, confi, activation = crepe_predict( | |
wav_data_16k, 16000, step_size=10, model_capacity='small', center=True, verbose=0) | |
T_10ms = len(f0_10ms) | |
x_10ms = np.arange(0, 1, 1 / T_10ms)[:T_10ms] | |
x_10ms[-1] = 1.0 | |
func_f0 = interp1d(x_10ms, f0_10ms, 'nearest') | |
n_mel_frames = int(len(wav_data) // hop_size) | |
x_new = np.arange(0, 1, 1 / n_mel_frames)[:n_mel_frames] | |
x_new[-1] = 1.0 | |
# correct f0 using energy spec (first round) | |
func_uv, func_f0, states = f0_energy_corrector(wav_data_16k, func_f0, f0_min, f0_max, fix_octave_error=True) | |
f0_10ms = func_f0(x_10ms) | |
uv_10ms = (func_uv(x_10ms) > 1e-4) & (confi < 0.9) | |
uv_10ms = medfilt(uv_10ms.astype(float), 3) > 1e-4 | |
states['activation'] = activation | |
states['confidence'] = confi | |
# viterbi by voiced chunk, to fix incorrect viterbi smoothing in UV border. | |
f0_10ms[uv_10ms] = 0 | |
f0_10ms_new = np.zeros_like(f0_10ms).astype(float) | |
v_begin = -1 | |
for i in range(T_10ms): | |
if not uv_10ms[i] and i < T_10ms - 1: | |
if v_begin == -1: | |
v_begin = i | |
elif v_begin != -1: | |
v_end = i - 1 if uv_10ms[i] else i | |
if v_end - v_begin > 3: | |
f0_bins = frequency_to_bins(f0_10ms[v_begin:v_end + 1]) | |
for j, k in zip(range(v_begin, v_end + 1), f0_bins): | |
if f0_10ms[j] > 1e-4: | |
activation[j, k + 10:] /= 5 | |
cents_v = to_viterbi_cents(activation[v_begin:v_end + 1]) | |
f0__ = 10 * 2 ** (cents_v / 1200) | |
f0__[np.isnan(f0__)] = 0 | |
f0_10ms_new[v_begin:v_end + 1] = f0__ | |
v_begin = -1 | |
f0_10ms = f0_10ms_new | |
# remove pitch deviated from median | |
f0_10ms[confi < 0.1] = 0 | |
try: | |
x_med_curve, y_med_curve = get_med_curve(f0_10ms) | |
f0_med_curve = interp1d(np.array(x_med_curve), np.array(y_med_curve), 'nearest')(np.arange(len(f0_10ms))) | |
f0_10ms[(f0_10ms < f0_med_curve - 100) | (f0_10ms > f0_med_curve + 100)] = 0 | |
states['f0_med_curve'] = interp1d(x_10ms, f0_med_curve)(x_new) | |
except: | |
pass | |
# print("| WARN: catch an Error in get_med_curve.") | |
# traceback.print_exc() | |
# correct f0 using energy spec (second round), for better UV | |
func_f0 = interp1d(x_10ms, f0_10ms, 'nearest') | |
func_uv, func_f0, states_ = f0_energy_corrector(wav_data_16k, func_f0, f0_min, f0_max, fix_octave_error=False) | |
del states_['r_energy_div_dict'] | |
states.update(states_) | |
# interpolate f0 | |
confi_new = interp1d(x_10ms, confi)(x_new) | |
f0 = func_f0(x_new) | |
uv = (clean_short_v_frag(f0) | (func_uv(x_new) > 1e-4)) & (confi_new < 0.9) | |
uv = medfilt(uv.astype(float), 3) > 1e-4 | |
f0 = medfilt(f0, 3) | |
f0[uv] = 0 | |
if return_states: | |
return f0, states | |
else: | |
return f0 | |