# mimictalk / utils / audio / pitch / crepe_utils.py
# (provenance: author mrbear1024, commit "init project", 8eb4303)
import os
import librosa
import numpy as np
import torch
from scipy.interpolate import interp1d
from scipy.ndimage import binary_erosion
from scipy.signal import medfilt
from utils.audio.pitch.extractor_utils import get_med_curve, clean_short_v_frag
def crepe_predict(audio, sr, model_capacity='full', center=True, step_size=10, verbose=1):
    """Run the (Keras) CREPE model on ``audio`` and decode pitch two ways.

    Returns:
        (time, frequency_v, frequency, confidence, activation) where
        ``frequency_v`` is the Viterbi-smoothed f0 track, ``frequency`` the
        local-average-decoded track (both in Hz, NaN frames mapped to 0),
        ``confidence`` the per-frame max activation, ``time`` in seconds.
    """
    from crepe.core import to_viterbi_cents, to_local_average_cents
    from crepe import get_activation

    # CREPE's decoding emits divide/invalid warnings on silent frames.
    np.seterr(divide='ignore', invalid='ignore')
    activation = get_activation(
        audio, sr,
        model_capacity=model_capacity,
        center=center,
        step_size=step_size,
        verbose=verbose,
    )
    confidence = activation.max(axis=1)

    def _cents_to_hz(cents):
        # CREPE cents are relative to a 10 Hz reference; NaN (undecodable) -> 0 Hz.
        hz = 10 * 2 ** (cents / 1200)
        hz[np.isnan(hz)] = 0
        return hz

    frequency_v = _cents_to_hz(to_viterbi_cents(activation))
    frequency = _cents_to_hz(to_local_average_cents(activation))
    time = np.arange(confidence.shape[0]) * step_size / 1000.0
    return time, frequency_v, frequency, confidence, activation
def load_model(device, capacity='full'):
    """Build a torchcrepe CREPE model with its bundled pretrained weights.

    Loads ``assets/{capacity}.pth`` from the installed torchcrepe package,
    moves the model to ``device`` and switches it to eval mode.
    """
    import torchcrepe

    model = torchcrepe.Crepe(capacity)
    weights_path = os.path.join(
        os.path.dirname(torchcrepe.__file__), 'assets', f'{capacity}.pth')
    model.load_state_dict(torch.load(weights_path, map_location='cpu'))
    model = model.to(torch.device(device))
    model.eval()
    return model
def crepe_predict_torch(audio, sr, hop_length=None, model_capacity='full',
                        batch_size=None, device='cpu', pad=True):
    """Torch-based CREPE inference (torchcrepe model, crepe decoding).

    Returns:
        (frequency_v, frequency, confidence, activation): Viterbi-decoded f0,
        local-average f0 (Hz, NaN -> 0), per-frame confidence, and the raw
        activation matrix of shape (time, PITCH_BINS) as a numpy array.
    """
    from torchcrepe import preprocess, PITCH_BINS
    import warnings
    from crepe.core import to_viterbi_cents, to_local_average_cents

    warnings.filterwarnings('ignore', message=r'Named tensors and all their associated APIs.*')
    # Postprocessing breaks gradients, so just don't compute them.
    with torch.no_grad():
        frames = next(preprocess(audio, sr, hop_length, batch_size, device, pad))
        model = load_model(device, model_capacity).to(frames.device)
        activation = model(frames)
        # Free GPU memory before decoding on CPU.
        del model
        del frames
        # Flatten batches into a single (time, PITCH_BINS) matrix.
        activation = activation.reshape(-1, PITCH_BINS).cpu().numpy()
        torch.cuda.empty_cache()

        confidence = activation.max(axis=1)
        frequency_v = 10 * 2 ** (to_viterbi_cents(activation) / 1200)
        frequency_v[np.isnan(frequency_v)] = 0
        frequency = 10 * 2 ** (to_local_average_cents(activation) / 1200)
        frequency[np.isnan(frequency)] = 0
    return frequency_v, frequency, confidence, activation
def cents_to_bins(cents, cents_per_bin=20):
    """Convert pitch in cents (re 10 Hz) to CREPE pitch-bin indices.

    CREPE quantizes pitch into bins of 20 cents starting at
    1997.3794084376191 cents (~32.7 Hz, C1); bin 0 sits on that origin.

    Args:
        cents: scalar or ndarray of pitch values in cents.
        cents_per_bin: bin width in cents (generalized; default 20 matches
            CREPE and preserves the original behavior).

    Returns:
        Rounded integer bin indices, same shape as ``cents``.
    """
    bins = (cents - 1997.3794084376191) / cents_per_bin
    return np.round(bins).astype(int)
def cents_to_frequency(cents):
    """Convert pitch in cents (relative to a 10 Hz reference) to Hz."""
    exponent = cents / 1200
    return 10 * 2 ** exponent
def frequency_to_bins(frequency):
    """Convert frequency in Hz to CREPE pitch-bin indices."""
    cents = frequency_to_cents(frequency)
    return cents_to_bins(cents)
def frequency_to_cents(frequency):
    """Convert frequency in Hz to cents relative to a 10 Hz reference.

    A 1e-8 epsilon keeps log2 finite when the frequency is 0 (unvoiced).
    """
    ratio = frequency / 10. + 1e-8
    return 1200 * np.log2(ratio)
def find_nearest_f0_in_piptrack(f0, pitches):
    """For each frame, pick the piptrack candidate closest to the reference f0.

    Args:
        f0: (T,) reference pitch per frame, in Hz.
        pitches: (T, n_candidates) candidate pitches per frame.

    Returns:
        (T,) array holding, per frame, the candidate nearest to ``f0[t]``.
    """
    nearest_idx = np.abs(f0[:, None] - pitches).argmin(axis=-1)
    frame_idx = np.arange(len(f0))
    return pitches[frame_idx, nearest_idx]
def f0_energy_corrector(wav_data_16k, f0_func, f0_min, f0_max, fix_octave_error=True):
    """Refine an f0 curve using the STFT energy around its harmonics.

    Snaps f0 to the nearest ``librosa.piptrack`` candidate per frame,
    optionally fixes octave-style errors (f0 erroneously 2x/3x/5x too high),
    and derives an unvoiced (uv) mask by comparing harmonic energy against
    between-harmonic ("half-harmonic") energy and a low-frequency noise floor.

    Args:
        wav_data_16k: mono waveform, assumed 16 kHz (sr is hard-coded below).
        f0_func: interpolator mapping normalized time in [0, 1] -> f0 in Hz.
        f0_min, f0_max: plausible pitch range; frames outside it become uv.
        fix_octave_error: whether to attempt the divide-by-2/3/5 correction.

    Returns:
        (func_uv, func_f0, states): nearest-neighbour interpolators over
        normalized time for the uv mask and the corrected f0, plus a dict of
        intermediate arrays kept for debugging/visualization.
    """
    hop_size = 256
    win_size = hop_size * 6
    sr = 16000
    # Magnitude spectrogram, time-major: shape (T, win_size // 2 + 1).
    spec = np.abs(librosa.stft(wav_data_16k, n_fft=win_size, hop_length=hop_size,
                               win_length=win_size, pad_mode="constant").T)
    T = spec.shape[0]
    # Normalized time axis for this hop size; force the last sample to 1.
    x_h256 = np.arange(0, 1, 1 / T)[:T]
    x_h256[-1] = 1
    f0 = f0_func(x_h256)
    freqs = librosa.fft_frequencies(sr=sr, n_fft=win_size)
    x_idx = np.arange(T)

    def find_nearest_stft_bin(f0_):
        # Per-frame index of the STFT bin closest to each frequency in f0_.
        return np.abs(freqs[None, :] - f0_[:, None]).argmin(-1)

    def get_energy_mask(f0_lambda, hars=None, win_size=3):
        # Mean spectral magnitude around the bins of f0_lambda(f0, m) for each
        # multiple m in hars, averaged over a small window of neighboring bins;
        # also returns the boolean bin mask (padded to 10000 bins) for plotting.
        if hars is None:
            hars = [1]
        mask = np.zeros([T, 10000]).astype(bool)
        mask_bins = []
        for multiple in hars:
            f0_bin_idx = find_nearest_stft_bin(f0_lambda(f0, multiple))
            # NOTE(review): -win_size // 2 floors, so the window is asymmetric
            # for odd win_size (e.g. win_size=3 -> deltas -2..1) — presumably
            # intentional, kept as-is.
            for delta in range(-win_size // 2, 1 + win_size // 2):
                y_idx = f0_bin_idx + delta
                if np.max(y_idx) < spec.shape[1]:
                    mask_bins.append(spec[x_idx, y_idx])
                    mask[x_idx, y_idx] = 1
        mask_bins = np.stack(mask_bins, 1)
        energy_ = np.mean(mask_bins, 1)
        return energy_, mask

    # Noise floor: mean energy below 70 Hz (below any plausible speech f0 here).
    bottom_idx = find_nearest_stft_bin(np.array([70]))[0]
    bottom_energy = spec[:, :bottom_idx].mean()
    pitches, _ = librosa.piptrack(
        wav_data_16k, sr,
        n_fft=win_size, win_length=win_size, hop_length=hop_size,
        fmin=50, fmax=3000, ref=bottom_energy)
    pitches = pitches.T[:T]
    # Snap f0 to the nearest piptrack candidate in every frame.
    f0_piptrack = find_nearest_f0_in_piptrack(f0, pitches)
    f0_raw = f0
    f0 = f0_piptrack
    # find uv first (for obtaining mean_energy_mharfhar)
    energy_har, mask_har = get_energy_mask(lambda f0, m: f0 * m, [1, 2], 3)
    energy_mhalfhar, mask_mhalfhar = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [1], 5)
    # Voiced frames should have much more energy on harmonics than between them.
    r_energy = energy_har / np.clip(energy_mhalfhar, 1e-8, None)
    uv = np.zeros_like(f0).astype(bool)
    uv |= r_energy < 10
    uv |= (f0 > f0_max) | (f0 < f0_min)
    uv |= energy_har < bottom_energy
    mean_energy_mharfhar = np.clip(energy_mhalfhar[~uv].mean(), 1e-8, None)
    # BUGFIX: was `if len(uv) > 0:`, which is always true (uv has one entry per
    # frame). The intent is to subtract the mean unvoiced spectrum (a noise-floor
    # estimate) only when unvoiced frames exist; otherwise spec[uv].mean(0) is
    # NaN and poisons the whole spectrogram.
    if uv.any():
        spec = np.clip(spec - spec[uv].mean(0)[None, :], 1e-8, None)
    # fix octave error: a halved/third/fifth f0 whose harmonics carry real
    # energy indicates the raw estimate sat on an upper harmonic.
    r_energy_div_dict = {}
    if fix_octave_error:
        for div, mul, thres in [
            (2, (1,), 20),
            (3, (1, 2), 20),
            (5, (1, 2, 3), 20),
        ]:
            # `div` is read immediately inside get_energy_mask, so the
            # late-binding lambda capture is safe here.
            energy_div_har, mask_div_har = get_energy_mask(lambda f0, m: f0 / div * m, mul, 3)
            r_energy_div = energy_div_har / mean_energy_mharfhar
            r_energy_div = medfilt(r_energy_div, 5)
            r_energy_div_dict[div] = r_energy_div
            div_mask = (r_energy_div > thres) & (f0 / div > f0_min)
            f0[div_mask] /= div
            # Re-snap frames near (but outside) each corrected run to the
            # piptrack candidate closest to the corrected pitch.
            div_mask_erosion = binary_erosion(div_mask, iterations=2)
            div_pos = sorted(np.where(div_mask_erosion)[0])
            for pos in div_pos:
                for s in range(10):
                    if pos - s not in div_pos and pos - s >= 0:
                        f0[pos - s] = pitches[pos - s, np.abs(f0[pos] - pitches[pos - s]).argmin()]
                    if pos + s not in div_pos and pos + s < T:
                        f0[pos + s] = pitches[pos + s, np.abs(f0[pos] - pitches[pos + s]).argmin()]
    # find uv second (on the octave-corrected f0, with extra 2nd-harmonic checks)
    energy_har, mask_har = get_energy_mask(lambda f0, m: f0 * m, [1, 2], 3)
    energy_mhalfhar, mask_mhalfhar = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [1], 5)
    energy_har_2, _ = get_energy_mask(lambda f0, m: f0 * m, [2], 3)
    energy_mhalfhar_2, _ = get_energy_mask(lambda f0, m: f0 * (m - 0.5), [2, 3], 3)
    r_energy = energy_har / np.clip(energy_mhalfhar, 1e-8, None)
    r_energy = medfilt(r_energy, 3)
    r_energy_2 = energy_har_2 / np.clip(energy_mhalfhar_2, 1e-8, None)
    r_energy_2 = medfilt(r_energy_2, 3)
    r_energy_2_mask = r_energy_2 < 3
    r_energy_2_mask = binary_erosion(r_energy_2_mask, iterations=3)
    uv = np.zeros_like(f0).astype(bool)
    uv |= r_energy < 8
    uv |= r_energy_2_mask
    uv |= (f0 > f0_max) | (f0 < f0_min)
    uv |= energy_har < bottom_energy
    func_uv = interp1d(x_h256, uv, 'nearest')
    func_f0_div = interp1d(x_h256, f0, 'nearest')
    spec_log = np.log10(spec + 1e-8)
    return func_uv, func_f0_div, {
        'spec': spec_log,
        'energy_har': energy_har, 'energy_halfhar': energy_mhalfhar,
        'r_energy': r_energy, 'r_energy_2': r_energy_2,
        'mask_har': mask_har, 'mask_halfhar': mask_mhalfhar,
        'bottom_energy': bottom_energy,
        'r_energy_div_dict': r_energy_div_dict,
        'f0_piptrack': f0_piptrack,
        'f0_raw': f0_raw
    }
def crepe_with_corrector(wav_data, hop_size, audio_sample_rate, f0_min, f0_max, return_states=False, *args, **kwargs):
    """Extract f0 with CREPE, then correct it with spectral-energy heuristics.

    Pipeline: resample to 16 kHz -> CREPE ('small') at 10 ms hops ->
    energy-based octave/uv correction -> per-voiced-chunk Viterbi re-decoding
    (avoids Viterbi smearing across unvoiced borders) -> median-curve outlier
    removal -> second energy-based uv pass -> resample f0 to mel-frame rate.

    Args:
        wav_data: mono waveform at ``audio_sample_rate``.
        hop_size: mel hop size (in samples of ``wav_data``) for the output grid.
        audio_sample_rate: sample rate of ``wav_data``.
        f0_min, f0_max: plausible pitch range passed to the corrector.
        return_states: if True, also return the dict of intermediate arrays.

    Returns:
        f0 array of length ``len(wav_data) // hop_size`` (0 where unvoiced),
        optionally followed by the states dict.
    """
    # BUGFIX: to_viterbi_cents was only imported inside crepe_predict's scope,
    # so its use below raised NameError at runtime.
    from crepe.core import to_viterbi_cents

    wav_data = wav_data.astype(np.double)
    wav_data_16k = librosa.resample(wav_data, audio_sample_rate, 16000)
    time, f0_10ms, f0_nov, confi, activation = crepe_predict(
        wav_data_16k, 16000, step_size=10, model_capacity='small', center=True, verbose=0)
    T_10ms = len(f0_10ms)
    # Normalized-time axes for the 10 ms CREPE grid and the mel-frame grid.
    x_10ms = np.arange(0, 1, 1 / T_10ms)[:T_10ms]
    x_10ms[-1] = 1.0
    func_f0 = interp1d(x_10ms, f0_10ms, 'nearest')
    n_mel_frames = int(len(wav_data) // hop_size)
    x_new = np.arange(0, 1, 1 / n_mel_frames)[:n_mel_frames]
    x_new[-1] = 1.0
    # correct f0 using energy spec (first round)
    func_uv, func_f0, states = f0_energy_corrector(wav_data_16k, func_f0, f0_min, f0_max, fix_octave_error=True)
    f0_10ms = func_f0(x_10ms)
    # Only trust the energy-based uv verdict where CREPE itself is unsure.
    uv_10ms = (func_uv(x_10ms) > 1e-4) & (confi < 0.9)
    uv_10ms = medfilt(uv_10ms.astype(float), 3) > 1e-4
    states['activation'] = activation
    states['confidence'] = confi
    # viterbi by voiced chunk, to fix incorrect viterbi smoothing in UV border.
    f0_10ms[uv_10ms] = 0
    f0_10ms_new = np.zeros_like(f0_10ms).astype(float)
    v_begin = -1
    for i in range(T_10ms):
        # `i < T_10ms - 1` forces the final frame into the elif so the last
        # voiced segment is always closed and decoded.
        if not uv_10ms[i] and i < T_10ms - 1:
            if v_begin == -1:
                v_begin = i
        elif v_begin != -1:
            v_end = i - 1 if uv_10ms[i] else i
            # Only re-decode segments longer than 3 frames.
            if v_end - v_begin > 3:
                f0_bins = frequency_to_bins(f0_10ms[v_begin:v_end + 1])
                for j, k in zip(range(v_begin, v_end + 1), f0_bins):
                    if f0_10ms[j] > 1e-4:
                        # Damp activations well above the corrected pitch to
                        # discourage octave-up jumps during Viterbi decoding.
                        activation[j, k + 10:] /= 5
                cents_v = to_viterbi_cents(activation[v_begin:v_end + 1])
                f0__ = 10 * 2 ** (cents_v / 1200)
                f0__[np.isnan(f0__)] = 0
                f0_10ms_new[v_begin:v_end + 1] = f0__
            v_begin = -1
    f0_10ms = f0_10ms_new
    # remove pitch deviated from median
    f0_10ms[confi < 0.1] = 0
    try:
        x_med_curve, y_med_curve = get_med_curve(f0_10ms)
        f0_med_curve = interp1d(np.array(x_med_curve), np.array(y_med_curve), 'nearest')(np.arange(len(f0_10ms)))
        # Zero out frames more than 100 cents-ish (Hz here) from the median curve.
        f0_10ms[(f0_10ms < f0_med_curve - 100) | (f0_10ms > f0_med_curve + 100)] = 0
        states['f0_med_curve'] = interp1d(x_10ms, f0_med_curve)(x_new)
    except Exception:
        # Median-curve cleanup is best-effort; BUGFIX: narrowed the bare
        # `except:` so KeyboardInterrupt/SystemExit are no longer swallowed.
        pass
        # print("| WARN: catch an Error in get_med_curve.")
        # traceback.print_exc()
    # correct f0 using energy spec (second round), for better UV
    func_f0 = interp1d(x_10ms, f0_10ms, 'nearest')
    func_uv, func_f0, states_ = f0_energy_corrector(wav_data_16k, func_f0, f0_min, f0_max, fix_octave_error=False)
    del states_['r_energy_div_dict']
    states.update(states_)
    # interpolate f0
    confi_new = interp1d(x_10ms, confi)(x_new)
    f0 = func_f0(x_new)
    uv = (clean_short_v_frag(f0) | (func_uv(x_new) > 1e-4)) & (confi_new < 0.9)
    uv = medfilt(uv.astype(float), 3) > 1e-4
    f0 = medfilt(f0, 3)
    f0[uv] = 0
    if return_states:
        return f0, states
    else:
        return f0