"""
| Description: libf0 pYIN implementation
| Contributors: Sebastian Rosenzweig, Simon Schwär, Edgar Suárez, Meinard Müller
| License: The MIT license, https://opensource.org/licenses/MIT
| This file is part of libf0.
"""
import numpy as np
from scipy.special import beta, comb  # SciPy: beta function and binomial coefficient (beta-binomial threshold prior)
from scipy.stats import triang  # SciPy: triangular distribution (pitch-transition probabilities)
from .yin import cumulative_mean_normalized_difference_function, parabolic_interpolation
from numba import njit
# pYIN estimate computation
def pyin(x, Fs=22050, N=2048, H=256, F_min=55.0, F_max=1760.0, R=10, thresholds=np.arange(0.01, 1, 0.01),
beta_params=[1, 18], absolute_min_prob=0.01, voicing_prob=0.5):
"""
Implementation of the pYIN F0-estimation algorithm.
.. [#] Matthias Mauch and Simon Dixon.
"PYIN: A fundamental frequency estimator using probabilistic threshold distributions".
IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP) (2014): 659-663.
Parameters
----------
x : ndarray
Audio signal
Fs : int
Sampling rate
N : int
Window size
H : int
Hop size
F_min : float or int
Minimal frequency
F_max : float or int
Maximal frequency
R : int
Frequency resolution given in cents
thresholds : ndarray
Range of thresholds
beta_params : tuple or list
        Parameters of the beta distribution over thresholds in the form [alpha, beta]
    absolute_min_prob : float
        Probability of choosing the absolute minimum when no local minimum lies below a threshold
    voicing_prob : float
        Prior probability of a frame being voiced
Returns
-------
f0 : ndarray
Estimated F0-trajectory
t : ndarray
Time axis
conf : ndarray
Confidence
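    Examples
    --------
    A minimal usage sketch; the one-second 440 Hz test tone below is an illustrative
    assumption, not part of libf0:
    >>> import numpy as np
    >>> Fs = 22050
    >>> x = 0.5 * np.sin(2 * np.pi * 440.0 * np.arange(Fs) / Fs)
    >>> f0, t, conf = pyin(x, Fs=Fs)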
"""
if F_min > F_max:
raise Exception("F_min must be smaller than F_max!")
if F_min < Fs/N:
raise Exception(f"The condition (F_min >= Fs/N) was not met. With Fs = {Fs}, N = {N} and F_min = {F_min} you have the following options: \n1) Set F_min >= {np.ceil(Fs/N)} Hz. \n2) Set N >= {np.ceil(Fs/F_min).astype(int)}. \n3) Set Fs <= {np.floor(F_min * N)} Hz.")
x_pad = np.concatenate((np.zeros(N // 2), x, np.zeros(N // 2))) # Add zeros for centered estimates
# Compute Beta distribution
thr_idxs = np.arange(len(thresholds))
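    # The following expression is the beta-binomial probability mass function over threshold
    # indices: P(i) = comb(n, i) * B(i + a, n - i + b) / B(a, b), with n = len(thresholds) and
    # [a, b] = beta_params. With the default beta_params = [1, 18], most of the probability
    # mass lies on small thresholds (mean threshold roughly 0.05).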
beta_distr = comb(len(thresholds), thr_idxs) * beta(thr_idxs+beta_params[0],
len(thresholds)-thr_idxs+beta_params[1]) / beta(beta_params[0],
beta_params[1])
# YIN with multiple thresholds, yielding observation matrix
B = int(np.log2(F_max / F_min) * (1200 / R))
F_axis = F_min * np.power(2, np.arange(B) * R / 1200) # for quantizing the estimated F0s
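    # F_axis is a logarithmically spaced grid of B candidate frequencies covering
    # [F_min, F_max) with a spacing of R cents between neighboring bins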
O, rms, p_orig, val_orig = yin_multi_thr(x_pad, Fs=Fs, N=N, H=H, F_min=F_min, F_max=F_max, thresholds=thresholds,
beta_distr=beta_distr, absolute_min_prob=absolute_min_prob, F_axis=F_axis,
voicing_prob=voicing_prob)
    # Transition matrix, using a triangular distribution for the pitch-transition probabilities
max_step_cents = 50 # Pitch jump can be at most 50 cents from frame to frame
max_step = int(max_step_cents / R)
triang_distr = triang.pdf(np.arange(-max_step, max_step+1), 0.5, scale=2*max_step, loc=-max_step)
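    # triang_distr is a symmetric triangular window over pitch-bin offsets in
    # [-max_step, +max_step], peaking at an offset of zero (i.e., no pitch change)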
A = compute_transition_matrix(B, triang_distr)
# HMM smoothing
C = np.ones((2*B, 1)) / (2*B) # uniform initialization
f0_idxs = viterbi_log_likelihood(A, C.flatten(), O) # libfmp Viterbi implementation
# Obtain F0-trajectory
F_axis_extended = np.concatenate((F_axis, np.zeros(len(F_axis))))
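    # states 0..B-1 are the voiced pitch bins; states B..2B-1 are their unvoiced
    # counterparts, which are mapped to an F0 of 0 Hz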
f0 = F_axis_extended[f0_idxs]
# Suppress low power estimates
    f0[0] = 0  # for algorithmic reasons, the first frame is always set to unvoiced
f0[rms < 0.01] = 0
# confidence
    O_norm = O / np.max(O, axis=0)
conf = O_norm[f0_idxs, np.arange(O.shape[1])]
# Refine estimates by choosing the closest original YIN estimate
refine_estimates = True
if refine_estimates:
f0 = refine_estimates_yin(f0, p_orig, val_orig, Fs, R)
t = np.arange(O.shape[1]) * H / Fs # Time axis
return f0, t, conf
@njit
def refine_estimates_yin(f0, p_orig, val_orig, Fs, tol):
"""
Refine estimates using YIN CMNDF information.
Parameters
----------
f0 : ndarray
F0 in Hz
p_orig : ndarray
Original lag as computed by YIN
val_orig : ndarray
Original CMNDF values as computed by YIN
Fs : float
Sampling frequency
tol : float
Tolerance for refinements in cents
Returns
-------
f0_refined : ndarray
Refined F0-trajectory
"""
f0_refined = np.zeros_like(f0)
voiced_idxs = np.where(f0 > 0)[0]
f_orig = Fs / p_orig
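    # f_orig[i, m] is the frequency in Hz of the YIN lag found with threshold i in frame m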
    # find the closest original YIN estimate; the maximally allowed absolute deviation is tol cents (the quantization step R)
for m in voiced_idxs:
diff_cents = np.abs(1200 * np.log2(f_orig[:, m] / f0[m]))
candidate_idxs = np.where(diff_cents < tol)[0]
if not candidate_idxs.size:
f0_refined[m] = f0[m]
else:
f0_refined[m] = f_orig[candidate_idxs[np.argmin(val_orig[candidate_idxs, m])], m]
return f0_refined
@njit
def probabilistic_thresholding(cmndf, thresholds, p_min, p_max, absolute_min_prob, F_axis, Fs, beta_distr,
parabolic_interp=True):
"""
Probabilistic thresholding of the YIN CMNDF.
Parameters
----------
cmndf : ndarray
Cumulative Mean Normalized Difference Function
thresholds : ndarray
Array of thresholds for CMNDF
p_min : float
Period corresponding to the lower frequency bound
p_max : float
Period corresponding to the upper frequency bound
absolute_min_prob : float
        Probability of choosing the absolute minimum
F_axis : ndarray
Frequency axis
Fs : float
Sampling rate
beta_distr : ndarray
Beta distribution that defines mapping between thresholds and probabilities
parabolic_interp : bool
Switch to activate/deactivate parabolic interpolation
Returns
-------
O_m : ndarray
Observations for given frame
lag_thr : ndarray
Computed lags for every threshold
val_thr : ndarray
CMNDF values for computed lag
"""
# restrict search range to interval [p_min:p_max]
cmndf[:p_min] = np.inf
cmndf[p_max:] = np.inf
    # find strict local minima (assuming cmndf takes finite values in [p_min:p_max], a minimum
    # always exists, at least at p_min or p_max)
min_idxs = (np.argwhere((cmndf[1:-1] < cmndf[0:-2]) & (cmndf[1:-1] < cmndf[2:]))).flatten().astype(np.int64) + 1
O_m = np.zeros(2 * len(F_axis))
    # return early if no local minima are found, e.g., when the frame is silent
if min_idxs.size == 0:
return O_m, np.ones_like(thresholds)*p_min, np.ones_like(thresholds)
# Optional: Parabolic Interpolation of local minima
if parabolic_interp:
        # do not interpolate at the borders; delete_numba is a Numba-compatible workaround for
        # np.delete() and removes elements by value, so pass the border lags directly
        min_idxs_interp = delete_numba(min_idxs, p_min)
        min_idxs_interp = delete_numba(min_idxs_interp, p_max - 1)
p_corr, cmndf[min_idxs_interp] = parabolic_interpolation(cmndf[min_idxs_interp - 1],
cmndf[min_idxs_interp],
cmndf[min_idxs_interp + 1])
else:
p_corr = np.zeros_like(min_idxs).astype(np.float64)
    # set p_corr=0 at the borders (no correction applied there later)
if min_idxs[0] == p_min:
p_corr = np.concatenate((np.array([0.0]), p_corr))
if min_idxs[-1] == p_max - 1:
p_corr = np.concatenate((p_corr, np.array([0.0])))
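    # when parabolic_interp is True, p_corr is now aligned entry-by-entry with min_idxs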
lag_thr = np.zeros_like(thresholds)
val_thr = np.zeros_like(thresholds)
# loop over all thresholds
for i, threshold in enumerate(thresholds):
# minima below absolute threshold
min_idxs_thr = min_idxs[cmndf[min_idxs] < threshold]
# find first local minimum
if not min_idxs_thr.size:
lag = np.argmin(cmndf) # choose absolute minimum when no local minimum is found
am_prob = absolute_min_prob
val = np.min(cmndf)
else:
am_prob = 1
lag = np.min(min_idxs_thr) # choose first local minimum
val = cmndf[lag]
            # correct the chosen lag by its parabolic interpolation offset
            # (p_corr is aligned with min_idxs, so index it at the position of lag within min_idxs)
            if parabolic_interp:
                lag += p_corr[np.argwhere(min_idxs == lag).flatten()[0]]
# ensure that lag is in [p_min:p_max]
if lag < p_min:
lag = p_min
elif lag >= p_max:
lag = p_max - 1
lag_thr[i] = lag
val_thr[i] = val
idx = np.argmin(np.abs(1200 * np.log2(F_axis / (Fs / lag)))) # quantize estimated period
O_m[idx] += am_prob * beta_distr[i] # pYIN-Paper, Formula 4/5
return O_m, lag_thr, val_thr
@njit
def yin_multi_thr(x, Fs, N, H, F_min, F_max, thresholds, beta_distr, absolute_min_prob, F_axis, voicing_prob,
parabolic_interp=True):
"""
Applies YIN multiple times on input audio signals using different thresholds for CMNDF.
Parameters
----------
x : ndarray
Input audio signal
Fs : int
Sampling rate
N : int
Window size
H : int
Hop size
F_min : float
Lower frequency bound
F_max : float
Upper frequency bound
thresholds : ndarray
Array of thresholds
beta_distr : ndarray
Beta distribution that defines mapping between thresholds and probabilities
    absolute_min_prob : float
        Probability of choosing the absolute minimum
F_axis : ndarray
Frequency axis
voicing_prob : float
Probability of a frame being voiced
parabolic_interp : bool
Switch to activate/deactivate parabolic interpolation
Returns
-------
O : ndarray
Observations based on YIN output
rms : ndarray
Root mean square power
p_orig : ndarray
Original YIN period estimates
val_orig : ndarray
CMNDF values corresponding to original YIN period estimates
"""
M = int(np.floor((len(x) - N) / H)) + 1 # Compute number of estimates that will be generated
B = len(F_axis)
    p_min = max(int(np.ceil(Fs / F_max)), 1)  # period of the maximal frequency in samples
    p_max = int(np.ceil(Fs / F_min))  # period of the minimal frequency in samples
if p_max > N:
raise Exception("The condition (Fmin >= Fs/N) was not met.")
rms = np.zeros(M) # RMS Power
    O = np.zeros((2 * B, M))  # every voiced state has a corresponding unvoiced state (needed for the later HMM smoothing)
p_orig = np.zeros((len(thresholds), M))
val_orig = np.zeros((len(thresholds), M))
for m in range(M):
# Take a frame from input signal
frame = x[m * H:m * H + N]
# Cumulative Mean Normalized Difference Function
cmndf = cumulative_mean_normalized_difference_function(frame, p_max)
# compute RMS power
rms[m] = np.sqrt(np.mean(frame ** 2))
# Probabilistic Thresholding with different thresholds
O_m, p_est_thr, val_thr = probabilistic_thresholding(cmndf, thresholds, p_min, p_max, absolute_min_prob, F_axis,
Fs, beta_distr, parabolic_interp=parabolic_interp)
O[:, m] = O_m
p_orig[:, m] = p_est_thr # store original YIN estimates for later refinement
val_orig[:, m] = val_thr # store original cmndf value of minimum corresponding to p_est
# normalization (pYIN-Paper, Formula 6)
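    # The voiced observations are scaled by the voicing prior; the complementary mass
    # (1 - voicing_prob) * (1 - column sum of the scaled voiced observations) is spread
    # uniformly over the B unvoiced states of each frame.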
O[0:B, :] *= voicing_prob
O[B:2 * B, :] = (1 - voicing_prob) * (1 - np.sum(O[0:B, :], axis=0)) / B
return O, rms, p_orig, val_orig
@njit
def compute_transition_matrix(M, triang_distr):
"""
    Compute the transition matrix for the pYIN Viterbi decoding.
Parameters
----------
M : int
Matrix dimension
triang_distr : ndarray
(Triangular) distribution, defining tolerance for jumps deviating from the main diagonal
Returns
-------
A : ndarray
Transition matrix
"""
prob_self = 0.99
A = np.zeros((2*M, 2*M))
max_step = len(triang_distr) // 2
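    # A is a 2M x 2M block matrix: the upper-left and lower-right M x M blocks hold the
    # voiced-to-voiced and unvoiced-to-unvoiced pitch transitions (triang_distr weights,
    # scaled by prob_self); the entries A[i, i+M] and A[i+M, i] switch between the voiced
    # and unvoiced copies of the same pitch bin with probability 1 - prob_self.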
for i in range(M):
if i < max_step:
A[i, 0:i+max_step] = prob_self * triang_distr[max_step - i:-1] / np.sum(triang_distr[max_step - i:-1])
A[i+M, M:i+M+max_step] = prob_self * triang_distr[max_step - i:-1] / np.sum(triang_distr[max_step - i:-1])
if i >= max_step and i < M-max_step:
A[i, i-max_step:i+max_step+1] = prob_self * triang_distr
A[i+M, (i+M)-max_step:(i+M)+max_step+1] = prob_self * triang_distr
if i >= M-max_step:
A[i, i-max_step:M] = prob_self * triang_distr[0:max_step - (i-M)] / np.sum(triang_distr[0:max_step - (i-M)])
A[i+M, i+M-max_step:2*M] = prob_self * triang_distr[0:max_step - (i - M)] / \
np.sum(triang_distr[0:max_step - (i - M)])
A[i, i+M] = 1 - prob_self
A[i+M, i] = 1 - prob_self
return A
@njit
def viterbi_pyin(A, C, O):
"""Viterbi algorithm (pYIN variant)
Args:
A : ndarray
State transition probability matrix of dimension I x I
C : ndarray
Initial state distribution of dimension I X 1
O : ndarray
Likelihood matrix of dimension I x N
Returns:
idxs : ndarray
Optimal state sequence of length N
"""
B = O.shape[0] // 2
M = O.shape[1]
D = np.zeros((B * 2, M))
E = np.zeros((B * 2, M - 1))
idxs = np.zeros(M)
for i in range(B * 2):
        D[i, 0] = C[i, 0] * O[i, 0]  # initial state setting for the D matrix
D[:, 0] = D[:, 0] / np.sum(D[:, 0]) # Normalization (using pYIN source code as a basis)
for n in range(1, M):
for i in range(B * 2):
abyd = np.multiply(A[:, i], D[:, n-1])
D[i, n] = np.max(abyd) * O[i, n]
E[i, n-1] = np.argmax(abyd)
D[:, n] = D[:, n] / np.sum(D[:, n]) # Row normalization to avoid underflow (pYIN source code sparseHMM)
idxs[M - 1] = np.argmax(D[:, M - 1])
    for n in range(M - 2, -1, -1):
        bkd = int(idxs[n + 1])  # intermediate variable for Numba compatibility
        idxs[n] = E[bkd, n]
return idxs.astype(np.int32)
@njit
def viterbi_log_likelihood(A, C, B_O):
"""Viterbi algorithm (log variant) for solving the uncovering problem
Notebook: C5/C5S3_Viterbi.ipynb
Args:
A : ndarray
State transition probability matrix of dimension I x I
C : ndarray
Initial state distribution of dimension I
B_O : ndarray
Likelihood matrix of dimension I x N
Returns:
S_opt : ndarray
Optimal state sequence of length N
"""
I = A.shape[0] # Number of states
N = B_O.shape[1] # Length of observation sequence
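    # adding the smallest positive float before taking logs avoids log(0) = -inf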
tiny = np.finfo(0.).tiny
A_log = np.log(A + tiny)
C_log = np.log(C + tiny)
B_O_log = np.log(B_O + tiny)
# Initialize D and E matrices
D_log = np.zeros((I, N))
E = np.zeros((I, N-1)).astype(np.int32)
D_log[:, 0] = C_log + B_O_log[:, 0]
# Compute D and E in a nested loop
for n in range(1, N):
for i in range(I):
temp_sum = A_log[:, i] + D_log[:, n-1]
D_log[i, n] = np.max(temp_sum) + B_O_log[i, n]
E[i, n-1] = np.argmax(temp_sum)
# Backtracking
S_opt = np.zeros(N).astype(np.int32)
S_opt[-1] = np.argmax(D_log[:, -1])
for n in range(N-2, -1, -1):
S_opt[n] = E[int(S_opt[n+1]), n]
return S_opt
@njit
def delete_numba(arr, num):
"""Delete number from array, Numba compatible. Inspired by:
https://stackoverflow.com/questions/53602663/delete-a-row-in-numpy-array-in-numba
"""
mask = np.zeros(len(arr), dtype=np.int64) == 0
mask[np.where(arr == num)[0]] = False
return arr[mask]
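if __name__ == "__main__":
    # Minimal demo sketch (not part of the libf0 API). The test signal below, a 440 Hz tone
    # followed by low-level noise, is an illustrative assumption. Because of the relative
    # import at the top of this file, run the demo through the package
    # (python -m <package>.<module>) rather than as a standalone script.
    Fs_demo = 22050
    tone = 0.5 * np.sin(2 * np.pi * 440.0 * np.arange(Fs_demo) / Fs_demo)  # 1 s of 440 Hz
    noise = 0.001 * np.random.default_rng(0).standard_normal(Fs_demo // 2)  # 0.5 s near-silence
    f0_demo, t_demo, conf_demo = pyin(np.concatenate((tone, noise)), Fs=Fs_demo)
    print("median voiced F0 (Hz):", np.median(f0_demo[f0_demo > 0]))
    print("unvoiced frames:", int(np.sum(f0_demo == 0)), "of", len(f0_demo))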