shethjenil commited on
Commit
c2ae55d
·
verified ·
1 Parent(s): 9ea3ca5

Upload folder using huggingface_hub

Browse files
musc/dtw/__init__.py ADDED
File without changes
musc/dtw/__pycache__/__init__.cpython-310.pyc ADDED
Binary file (169 Bytes). View file
 
musc/dtw/__pycache__/__init__.cpython-39.pyc ADDED
Binary file (181 Bytes). View file
 
musc/dtw/__pycache__/anchor.cpython-310.pyc ADDED
Binary file (4.23 kB). View file
 
musc/dtw/__pycache__/anchor.cpython-39.pyc ADDED
Binary file (4.21 kB). View file
 
musc/dtw/__pycache__/core.__C_to_DE-6.py310.1.nbc ADDED
Binary file (113 kB). View file
 
musc/dtw/__pycache__/core.__C_to_DE-6.py310.2.nbc ADDED
Binary file (111 kB). View file
 
musc/dtw/__pycache__/core.__C_to_DE-6.py310.3.nbc ADDED
Binary file (111 kB). View file
 
musc/dtw/__pycache__/core.__C_to_DE-6.py310.nbi ADDED
Binary file (3.27 kB). View file
 
musc/dtw/__pycache__/core.__E_to_warping_path-82.py310.1.nbc ADDED
Binary file (68 kB). View file
 
musc/dtw/__pycache__/core.__E_to_warping_path-82.py310.2.nbc ADDED
Binary file (68 kB). View file
 
musc/dtw/__pycache__/core.__E_to_warping_path-82.py310.3.nbc ADDED
Binary file (68 kB). View file
 
musc/dtw/__pycache__/core.__E_to_warping_path-82.py310.nbi ADDED
Binary file (3.35 kB). View file
 
musc/dtw/__pycache__/core.cpython-310.pyc ADDED
Binary file (5.45 kB). View file
 
musc/dtw/__pycache__/core.cpython-39.pyc ADDED
Binary file (5.41 kB). View file
 
musc/dtw/__pycache__/cost.cpython-310.pyc ADDED
Binary file (2.94 kB). View file
 
musc/dtw/__pycache__/cost.cpython-39.pyc ADDED
Binary file (2.92 kB). View file
 
musc/dtw/__pycache__/mrmsdtw.cpython-310.pyc ADDED
Binary file (16.3 kB). View file
 
musc/dtw/__pycache__/mrmsdtw.cpython-39.pyc ADDED
Binary file (16.1 kB). View file
 
musc/dtw/__pycache__/utils.cpython-310.pyc ADDED
Binary file (12.3 kB). View file
 
musc/dtw/__pycache__/utils.cpython-39.pyc ADDED
Binary file (12.3 kB). View file
 
musc/dtw/__pycache__/visualization.cpython-310.pyc ADDED
Binary file (5.64 kB). View file
 
musc/dtw/__pycache__/visualization.cpython-39.pyc ADDED
Binary file (5.58 kB). View file
 
musc/dtw/anchor.py ADDED
@@ -0,0 +1,147 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from numba import jit
2
+ import numpy as np
3
+ from typing import Tuple
4
+
5
+
6
+ def project_alignment_on_a_new_feature_rate(alignment: np.ndarray,
7
+ feature_rate_old: int,
8
+ feature_rate_new: int,
9
+ cost_matrix_size_old: tuple = (),
10
+ cost_matrix_size_new: tuple = ()) -> np.ndarray:
11
+ """Projects an alignment computed for a cost matrix on a certain
12
+ feature resolution on a cost matrix having a different feature
13
+ resolution.
14
+
15
+ Parameters
16
+ ----------
17
+ alignment : np.ndarray [shape=(2, N)]
18
+ Alignment matrix
19
+
20
+ feature_rate_old : int
21
+ Feature rate of the old cost matrix
22
+
23
+ feature_rate_new : int
24
+ Feature rate of the new cost matrix
25
+
26
+ cost_matrix_size_old : tuple
27
+ Size of the old cost matrix. Possibly needed to deal with border cases
28
+
29
+ cost_matrix_size_new : tuple
30
+ Size of the new cost matrix. Possibly needed to deal with border cases
31
+
32
+ Returns
33
+ -------
34
+ np.ndarray [shape=(2, N)]
35
+ Anchor sequence for the new cost matrix
36
+ """
37
+ # Project the alignment on the new feature rate
38
+ fac = feature_rate_new / feature_rate_old
39
+ anchors = np.round(alignment * fac) + 1
40
+
41
+ # In case the sizes of the cost matrices are given explicitly and the
42
+ # alignment specifies to align the first and last elements, handle this case
43
+ # separately since this might cause problems in the general projection
44
+ # procedure.
45
+ if cost_matrix_size_old is not None and cost_matrix_size_new is not None:
46
+ if np.array_equal(alignment[:, 0], np.array([0, 0])):
47
+ anchors[:, 0] = np.array([1, 1])
48
+
49
+ if np.array_equal(alignment[:, -1], np.array(cost_matrix_size_old) - 1):
50
+ anchors[:, -1] = np.array(cost_matrix_size_new)
51
+
52
+ return anchors - 1
53
+
54
+
55
+ def derive_anchors_from_projected_alignment(projected_alignment: np.ndarray,
56
+ threshold: int) -> np.ndarray:
57
+ """Derive anchors from a projected alignment such that the area of the rectangle
58
+ defined by two subsequent anchors a1 and a2 is below a given threshold.
59
+
60
+ Parameters
61
+ ----------
62
+ projected_alignment : np.ndarray [shape=(2, N)]
63
+ Projected alignment array
64
+
65
+ threshold : int
66
+ Maximum area of the constraint rectangle
67
+
68
+ Returns
69
+ -------
70
+ anchors_res : np.ndarray [shape=(2, M)]
71
+ Resulting anchor sequence
72
+ """
73
+ L = projected_alignment.shape[1]
74
+
75
+ a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1)
76
+ a2 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1)
77
+
78
+ if __compute_area(a1, a2) <= threshold:
79
+ anchors_res = np.concatenate([a1, a2], axis=1)
80
+ elif L > 2:
81
+ center = int(np.floor(L/2 + 1))
82
+
83
+ a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1)
84
+ a2 = np.array(projected_alignment[:, center - 1], copy=True).reshape(-1, 1)
85
+ a3 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1)
86
+
87
+ if __compute_area(a1, a2) > threshold:
88
+ anchors_1 = derive_anchors_from_projected_alignment(projected_alignment[:, 0:center], threshold)
89
+ else:
90
+ anchors_1 = np.concatenate([a1, a2], axis=1)
91
+
92
+ if __compute_area(a2, a3) > threshold:
93
+ anchors_2 = derive_anchors_from_projected_alignment(projected_alignment[:, center - 1:], threshold)
94
+ else:
95
+ anchors_2 = np.concatenate([a2, a3], axis=1)
96
+
97
+ anchors_res = np.concatenate([anchors_1, anchors_2[:, 1:]], axis=1)
98
+ else:
99
+ if __compute_area(a1, a2) > threshold:
100
+ print('Only two anchor points are given which do not fulfill the constraint.')
101
+ anchors_res = np.concatenate([a1, a2], axis=1)
102
+
103
+ return anchors_res
104
+
105
+
106
+ def derive_neighboring_anchors(warping_path: np.ndarray,
107
+ anchor_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
108
+ """Compute anchor points in the neighborhood of previous anchor points.
109
+
110
+ Parameters
111
+ ----------
112
+ warping_path : np.ndarray [shape=(2, N)]
113
+ Warping path
114
+
115
+ anchor_indices : np.ndarray
116
+ Indices corresponding to the anchor points in the ``warping_path``
117
+
118
+ Returns
119
+ -------
120
+ neighboring_anchors : np.ndarray [shape=(2, N-1)]
121
+ Sequence of neighboring anchors
122
+
123
+ neighboring_anchor_indices : np.ndarray
124
+ Indices into ``warping path`` corresponding to ``neighboring_anchors``
125
+ """
126
+ L = anchor_indices.shape[0]
127
+ neighboring_anchor_indices = np.zeros(L-1, dtype=int)
128
+ neighboring_anchors = np.zeros((2, L-1), dtype=int)
129
+
130
+ for k in range(1, L):
131
+ i1 = anchor_indices[k-1]
132
+ i2 = anchor_indices[k]
133
+
134
+ neighboring_anchor_indices[k-1] = i1 + np.floor((i2 - i1) / 2)
135
+ neighboring_anchors[:, k-1] = warping_path[:, neighboring_anchor_indices[k - 1]]
136
+
137
+ return neighboring_anchors, neighboring_anchor_indices
138
+
139
+
140
+ @jit(nopython=True)
141
+ def __compute_area(a: tuple,
142
+ b: tuple):
143
+ """Computes the area between two points, given as tuples"""
144
+ return (b[0] - a[0] + 1) * (b[1] - a[1] + 1)
145
+
146
+
147
+
musc/dtw/core.py ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import librosa
2
+ from numba import jit
3
+ import numpy as np
4
+
5
+
6
+ @jit(nopython=True, cache=True)
7
+ def __C_to_DE(C: np.ndarray = None,
8
+ dn: np.ndarray = np.array([1, 1, 0], np.int64),
9
+ dm: np.ndarray = np.array([1, 0, 1], np.int64),
10
+ dw: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
11
+ sub_sequence: bool = False) -> (np.ndarray, np.ndarray):
12
+ """This function computes the accumulated cost matrix D and the step index
13
+ matrix E.
14
+
15
+ Parameters
16
+ ----------
17
+ C : np.ndarray (np.float32 / np.float64) [shape=(N, M)]
18
+ Cost matrix
19
+
20
+ dn : np.ndarray (np.int64) [shape=(1, S)]
21
+ Integer array defining valid steps (N direction of C), default: [1, 1, 0]
22
+
23
+ dm : np.ndarray (np.int64) [shape=(1, S)]
24
+ Integer array defining valid steps (M direction of C), default: [1, 0, 1]
25
+
26
+ dw : np.ndarray (np.float64) [shape=(1, S)]
27
+ Double array defining the weight of the each step, default: [1.0, 1.0, 1.0]
28
+
29
+ sub_sequence : bool
30
+ Set `True` for SubSequence DTW, default: False
31
+
32
+ Returns
33
+ -------
34
+ D : np.ndarray (np.float64) [shape=(N, M)]
35
+ Accumulated cost matrix of type double
36
+
37
+ E : np.ndarray (np.int64) [shape=(N, M)]
38
+ Step index matrix.
39
+ E[n, m] holds the index of the step take to determine the value of D[n, m].
40
+ If E[n, m] is zero, no valid step was possible.
41
+ NaNs in the cost matrix are preserved, invalid fields in the cost matrix are NaNs.
42
+ """
43
+ if C is None:
44
+ raise ValueError('C must be a 2D numpy array.')
45
+
46
+ N, M = C.shape
47
+ S = dn.size
48
+
49
+ if S != dm.size or S != dw.size:
50
+ raise ValueError('The parameters dn,dm, and dw must be of equal length.')
51
+
52
+ # calc bounding box size of steps
53
+ sbbn = np.max(dn)
54
+ sbbm = np.max(dm)
55
+
56
+ # initialize E
57
+ E = np.zeros((N, M), np.int64) - 1
58
+
59
+ # initialize extended D matrix
60
+ D = np.ones((sbbn + N, sbbm + M), np.float64) * np.inf
61
+
62
+ if sub_sequence:
63
+ for m in range(M):
64
+ D[sbbn, sbbm + m] = C[0, m]
65
+ else:
66
+ D[sbbn, sbbm] = C[0, 0]
67
+
68
+ # accumulate
69
+ for m in range(sbbm, M + sbbm):
70
+ for n in range(sbbn, N + sbbn):
71
+ for s in range(S):
72
+ cost = D[n - dn[s], m - dm[s]] + C[n - sbbn, m - sbbm] * dw[s]
73
+ if cost < D[n, m]:
74
+ D[n, m] = cost
75
+ E[n - sbbn, m - sbbm] = s
76
+
77
+ D = D[sbbn: N + sbbn, sbbm: M + sbbm]
78
+
79
+ return D, E
80
+
81
+
82
+ @jit(nopython=True, cache=True)
83
+ def __E_to_warping_path(E: np.ndarray,
84
+ dn: np.ndarray = np.array([1, 1, 0], np.int64),
85
+ dm: np.ndarray = np.array([1, 0, 1], np.int64),
86
+ sub_sequence: bool = False,
87
+ end_index: int = -1) -> np.ndarray:
88
+ """This function computes a warping path based on the provided matrix E
89
+ and the allowed steps.
90
+
91
+ Parameters
92
+ ----------
93
+ E : np.ndarray (np.int64) [shape=(N, M)]
94
+ Step index matrix
95
+
96
+ dn : np.ndarray (np.int64) [shape=(1, S)]
97
+ Integer array defining valid steps (N direction of C), default: [1, 1, 0]
98
+
99
+ dm : np.ndarray (np.int64) [shape=(1, S)]
100
+ Integer array defining valid steps (M direction of C), default: [1, 0, 1]
101
+
102
+ sub_sequence : bool
103
+ Set `True` for SubSequence DTW, default: False
104
+
105
+ end_index : int
106
+ In case of SubSequence DTW
107
+
108
+ Returns
109
+ -------
110
+ warping_path : np.ndarray (np.int64) [shape=(2, M)]
111
+ Resulting optimal warping path
112
+ """
113
+ N, M = E.shape
114
+
115
+ if not sub_sequence and end_index == -1:
116
+ end_index = M - 1
117
+
118
+ m = end_index
119
+ n = N - 1
120
+
121
+ warping_path = np.zeros((2, n + m + 1))
122
+
123
+ index = 0
124
+
125
+ def _loop(m, n, index):
126
+ warping_path[:, index] = np.array([n, m])
127
+ step_index = E[n, m]
128
+ m -= dm[step_index]
129
+ n -= dn[step_index]
130
+ index += 1
131
+ return m, n, index
132
+
133
+ if sub_sequence:
134
+ while n > 0:
135
+ m, n, index = _loop(m, n, index)
136
+ else:
137
+ while m > 0 or n > 0:
138
+ m, n, index = _loop(m, n, index)
139
+
140
+ warping_path[:, index] = np.array([n, m])
141
+ warping_path = warping_path[:, index::-1]
142
+
143
+ return warping_path
144
+
145
+
146
+ def compute_warping_path(C: np.ndarray,
147
+ step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int64),
148
+ step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
149
+ implementation: str = 'synctoolbox'):
150
+ """Applies DTW on cost matrix C.
151
+
152
+ Parameters
153
+ ----------
154
+ C : np.ndarray (np.float32 / np.float64) [shape=(N, M)]
155
+ Cost matrix
156
+
157
+ step_sizes : np.ndarray (np.int64) [shape=(2, S)]
158
+ Array of step sizes
159
+
160
+ step_weights : np.ndarray (np.float64) [shape=(2, S)]
161
+ Array of step weights
162
+
163
+ implementation: str
164
+ Choose among ``synctoolbox`` and ``librosa``. (default: ``synctoolbox``)
165
+
166
+ Returns
167
+ -------
168
+ D : np.ndarray (np.float64) [shape=(N, M)]
169
+ Accumulated cost matrix
170
+
171
+ E : np.ndarray (np.int64) [shape=(N, M)]
172
+ Step index matrix
173
+
174
+ wp : np.ndarray (np.int64) [shape=(2, M)]
175
+ Warping path
176
+ """
177
+ if implementation == 'librosa':
178
+ D, wp, E = librosa.sequence.dtw(C=C,
179
+ step_sizes_sigma=step_sizes,
180
+ weights_add=np.array([0, 0, 0]),
181
+ weights_mul=step_weights,
182
+ return_steps=True,
183
+ subseq=False)
184
+ wp = wp[::-1].T
185
+
186
+ elif implementation == 'synctoolbox':
187
+ dn = step_sizes[:, 0]
188
+ dm = step_sizes[:, 1]
189
+
190
+ D, E = __C_to_DE(C,
191
+ dn=dn,
192
+ dm=dm,
193
+ dw=step_weights,
194
+ sub_sequence=False)
195
+
196
+ wp = __E_to_warping_path(E=E,
197
+ dn=dn,
198
+ dm=dm,
199
+ sub_sequence=False)
200
+
201
+ else:
202
+ raise NotImplementedError(f'No implementation found called {implementation}')
203
+
204
+ return D, E, wp
205
+
musc/dtw/cost.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from numba import jit
2
+ import numpy as np
3
+ from sklearn.metrics.pairwise import euclidean_distances
4
+
5
+ #@jit(nopython=True)
6
+ def cosine_distance(f1, f2, cos_meas_max=2.0, cos_meas_min=1.0):
7
+ """For all pairs of vectors f1' and f2' in f1 and f2, computes 1 - (f1.f2),
8
+ where '.' is the dot product, and rescales the results to lie in the
9
+ range [cos_meas_min, cos_meas_max].
10
+ Corresponds to regular cosine distance if f1' and f2' are normalized and
11
+ cos_meas_min==0.0 and cos_meas_max==1.0."""
12
+ return (1 - f1.T @ f2) * (cos_meas_max - cos_meas_min) + cos_meas_min
13
+
14
+
15
+ #@jit(nopython=True)
16
+ def euclidean_distance(f1, f2, l2_meas_max=1.0, l2_meas_min=0.0):
17
+ """Computes euclidean distances between the vectors in f1 and f2, and
18
+ rescales the results to lie in the range [cos_meas_min, cos_meas_max]."""
19
+
20
+ #S1 = np.zeros((f1.shape[1], f2.shape[1]))
21
+ #for n in range(f2.shape[1]):
22
+ # S1[:, n] = np.sqrt(np.sum((f1.T - f2[:, n]) ** 2, axis=1))
23
+ S1 = euclidean_distances(f1.T, f2.T)
24
+
25
+ return S1 * (l2_meas_max - l2_meas_min) + l2_meas_min
26
+
27
+
28
+ def compute_high_res_cost_matrix(f_chroma1: np.ndarray,
29
+ f_chroma2: np.ndarray,
30
+ f_onset1: np.ndarray,
31
+ f_onset2: np.ndarray,
32
+ weights: np.ndarray = np.array([1.0, 1.0]),
33
+ cos_meas_min: float = 1.0,
34
+ cos_meas_max: float = 2.0,
35
+ l2_meas_min: float = 0.0,
36
+ l2_meas_max: float = 1.0):
37
+ """Computes cost matrix of two sequences using two feature matrices
38
+ for each sequence. Cosine distance is used for the chroma sequences and
39
+ euclidean distance is used for the DLNCO sequences.
40
+
41
+ Parameters
42
+ ----------
43
+ f_chroma1 : np.ndarray [shape=(12, N)]
44
+ Chroma feature matrix of the first sequence (assumed to be normalized).
45
+
46
+ f_chroma2 : np.ndarray [shape=(12, M)]
47
+ Chroma feature matrix of the second sequence (assumed to be normalized).
48
+
49
+ f_onset1 : np.ndarray [shape=(12, N)]
50
+ DLNCO feature matrix of the first sequence
51
+
52
+ f_onset2 : np.ndarray [shape=(12, M)]
53
+ DLNCO feature matrix of the second sequence
54
+
55
+ weights : np.ndarray [shape=[2,]]
56
+ Weights array for the high-resolution cost computation.
57
+ weights[0] * cosine_distance + weights[1] * euclidean_distance
58
+
59
+ cos_meas_min : float
60
+ Cosine distances are shifted to be at least ``cos_meas_min``
61
+
62
+ cos_meas_max : float
63
+ Cosine distances are scaled to be at most ``cos_meas_max``
64
+
65
+ l2_meas_min : float
66
+ Euclidean distances are shifted to be at least ``l2_meas_min``
67
+
68
+ l2_meas_max : float
69
+ Euclidean distances are scaled to be at most ``l2_meas_max``
70
+
71
+ Returns
72
+ -------
73
+ C: np.ndarray [shape=(N, M)]
74
+ Cost matrix
75
+ """
76
+ cos_dis = cosine_distance(f_chroma1, f_chroma2, cos_meas_min=cos_meas_min, cos_meas_max=cos_meas_max)
77
+ euc_dis = euclidean_distance(f_onset1, f_onset2, l2_meas_min=l2_meas_min, l2_meas_max=l2_meas_max)
78
+
79
+ return weights[0] * cos_dis + weights[1] * euc_dis
80
+
musc/dtw/mrmsdtw.py ADDED
@@ -0,0 +1,616 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from numba import jit
2
+ import numpy as np
3
+ import time
4
+ from typing import List, Tuple, Optional
5
+
6
+ from .anchor import derive_anchors_from_projected_alignment, derive_neighboring_anchors, \
7
+ project_alignment_on_a_new_feature_rate
8
+ from .utils import build_path_from_warping_paths, compute_cost_matrices_between_anchors, smooth_downsample_feature, normalize_feature, compute_warping_paths_from_cost_matrices, find_anchor_indices_in_warping_path
9
+ from .visualization import sync_visualize_step1, sync_visualize_step2
10
+
11
+
12
+
13
+ def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray,
14
+ f_chroma2: np.ndarray,
15
+ f_onset1: np.ndarray = None,
16
+ f_onset2: np.ndarray = None,
17
+ input_feature_rate: float = 50,
18
+ step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
19
+ step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
20
+ threshold_rec: int = 10000,
21
+ win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
22
+ downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
23
+ verbose: bool = False,
24
+ dtw_implementation: str = 'synctoolbox',
25
+ normalize_chroma: bool = True,
26
+ chroma_norm_ord: int = 2,
27
+ chroma_norm_threshold: float = 0.001,
28
+ visualization_title: str = "MrMsDTW result",
29
+ anchor_pairs: List[Tuple] = None,
30
+ linear_inp_idx: List[int] = [],
31
+ alpha=0.5) -> np.ndarray:
32
+ """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.
33
+ MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
34
+ regions defined by the alignment found on the previous, coarser level.
35
+ If onset features are provided, these are used on the finest level in addition to chroma
36
+ to provide higher synchronization accuracy.
37
+
38
+ Parameters
39
+ ----------
40
+ f_chroma1 : np.ndarray [shape=(12, N)]
41
+ Chroma feature matrix of the first sequence
42
+
43
+ f_chroma2 : np.ndarray [shape=(12, M)]
44
+ Chroma feature matrix of the second sequence
45
+
46
+ f_onset1 : np.ndarray [shape=(L, N)]
47
+ Onset feature matrix of the first sequence (optional, default: None)
48
+
49
+ f_onset2 : np.ndarray [shape=(L, M)]
50
+ Onset feature matrix of the second sequence (optional, default: None)
51
+
52
+ input_feature_rate: int
53
+ Input feature rate of the chroma features (default: 50)
54
+
55
+ step_sizes: np.ndarray
56
+ DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
57
+
58
+ step_weights: np.ndarray
59
+ DTW step weights (np.array([1.0, 1.0, 1.0]))
60
+
61
+ threshold_rec: int
62
+ Defines the maximum area that is spanned by the rectangle of two
63
+ consecutive elements in the alignment (default: 10000)
64
+
65
+ win_len_smooth : np.ndarray
66
+ Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1]))
67
+
68
+ downsamp_smooth : np.ndarray
69
+ Downsampling factors (default: np.array([50, 25, 5, 1]))
70
+
71
+ verbose : bool
72
+ Set `True` for visualization (default: False)
73
+
74
+ dtw_implementation : str
75
+ DTW implementation, librosa or synctoolbox (default: synctoolbox)
76
+
77
+ normalize_chroma : bool
78
+ Set `True` to normalize input chroma features after each downsampling
79
+ and smoothing operation.
80
+
81
+ chroma_norm_ord: int
82
+ Order of chroma normalization, relevant if ``normalize_chroma`` is True.
83
+ (default: 2)
84
+
85
+ chroma_norm_threshold: float
86
+ If the norm falls below threshold for a feature vector, then the
87
+ normalized feature vector is set to be the unit vector. Relevant, if
88
+ ``normalize_chroma`` is True (default: 0.001)
89
+
90
+ visualization_title : str
91
+ Title for the visualization plots. Only relevant if 'verbose' is True
92
+ (default: "MrMsDTW result")
93
+
94
+ anchor_pairs: List[Tuple]
95
+ Anchor pairs given in seconds. Note that
96
+ * (0, 0) and (<audio-len1>, <audio-len2>) are not allowed.
97
+ * Anchors must be monotonously increasing.
98
+
99
+ linear_inp_idx: List[int]
100
+ List of the indices of intervals created by anchor pairs, for which
101
+ MrMsDTW shouldn't be run, e.g., if the interval only involves silence.
102
+
103
+ 0 ap1 ap2 ap3
104
+ | | | |
105
+ | idx0 | idx1 | idx2 | idx3 OR idx-1
106
+ | | | |
107
+
108
+ Note that index -1 corresponds to the last interval, which begins with
109
+ the last anchor pair until the end of the audio files.
110
+
111
+ alpha: float
112
+ Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm.
113
+ C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5)
114
+
115
+ Returns
116
+ -------
117
+ wp : np.ndarray [shape=(2, T)]
118
+ Resulting warping path which indicates synchronized indices.
119
+ """
120
+ if anchor_pairs is None:
121
+ wp = sync_via_mrmsdtw(f_chroma1=f_chroma1,
122
+ f_chroma2=f_chroma2,
123
+ f_onset1=f_onset1,
124
+ f_onset2=f_onset2,
125
+ input_feature_rate=input_feature_rate,
126
+ step_sizes=step_sizes,
127
+ step_weights=step_weights,
128
+ threshold_rec=threshold_rec,
129
+ win_len_smooth=win_len_smooth,
130
+ downsamp_smooth=downsamp_smooth,
131
+ verbose=verbose,
132
+ dtw_implementation=dtw_implementation,
133
+ normalize_chroma=normalize_chroma,
134
+ chroma_norm_ord=chroma_norm_ord,
135
+ chroma_norm_threshold=chroma_norm_threshold,
136
+ visualization_title=visualization_title,
137
+ alpha=alpha)
138
+ else:
139
+ # constant_intervals = [((0, x1), (0, y1), False),
140
+ # ((x1, x2), (y1, y2), True),
141
+ # ((x2, -1), (y2, -1), False)]
142
+ wp = None
143
+
144
+ if verbose:
145
+ print('Anchor points are given!')
146
+
147
+ __check_anchor_pairs(anchor_pairs, f_chroma1.shape[1], f_chroma2.shape[1], input_feature_rate)
148
+
149
+ # Add ending as the anchor point
150
+ anchor_pairs.append((-1, -1))
151
+
152
+ prev_a1 = 0
153
+ prev_a2 = 0
154
+
155
+ for idx, anchor_pair in enumerate(anchor_pairs):
156
+ cur_a1, cur_a2 = anchor_pair
157
+
158
+ # Split the features
159
+ f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1,
160
+ f_onset1,
161
+ f_chroma2,
162
+ f_onset2,
163
+ cur_a1,
164
+ cur_a2,
165
+ prev_a1,
166
+ prev_a2,
167
+ input_feature_rate)
168
+
169
+ if idx in linear_inp_idx or idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx:
170
+ # Generate a diagonal warping path, if the algorithm is not supposed to executed.
171
+ # A typical scenario is the silence breaks which are enclosed by two anchor points.
172
+ if verbose:
173
+ print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f'
174
+ '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
175
+ wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split)
176
+
177
+ else:
178
+ if verbose:
179
+ if cur_a1 != -1 and cur_a2 != -1:
180
+ print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f'
181
+ '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
182
+ else:
183
+ print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end'
184
+ '\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2))
185
+ wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split,
186
+ f_chroma2=f_chroma2_split,
187
+ f_onset1=f_onset1_split,
188
+ f_onset2=f_onset2_split,
189
+ input_feature_rate=input_feature_rate,
190
+ step_sizes=step_sizes,
191
+ step_weights=step_weights,
192
+ threshold_rec=threshold_rec,
193
+ win_len_smooth=win_len_smooth,
194
+ downsamp_smooth=downsamp_smooth,
195
+ verbose=verbose,
196
+ dtw_implementation=dtw_implementation,
197
+ normalize_chroma=normalize_chroma,
198
+ chroma_norm_ord=chroma_norm_ord,
199
+ chroma_norm_threshold=chroma_norm_threshold,
200
+ alpha=alpha)
201
+
202
+ if wp is None:
203
+ wp = np.array(wp_cur, copy=True)
204
+
205
+ # Concatenate warping paths
206
+ else:
207
+ wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1)
208
+
209
+ prev_a1 = cur_a1
210
+ prev_a2 = cur_a2
211
+
212
+ anchor_pairs.pop()
213
+
214
+ return wp
215
+
216
+
217
+ def sync_via_mrmsdtw(f_chroma1: np.ndarray,
218
+ f_chroma2: np.ndarray,
219
+ f_onset1: np.ndarray = None,
220
+ f_onset2: np.ndarray = None,
221
+ input_feature_rate: float = 50,
222
+ step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
223
+ step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
224
+ threshold_rec: int = 10000,
225
+ win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
226
+ downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
227
+ verbose: bool = False,
228
+ dtw_implementation: str = 'synctoolbox',
229
+ normalize_chroma: bool = True,
230
+ chroma_norm_ord: int = 2,
231
+ chroma_norm_threshold: float = 0.001,
232
+ visualization_title: str = "MrMsDTW result",
233
+ alpha=0.5) -> np.ndarray:
234
+ """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.
235
+ MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
236
+ regions defined by the alignment found on the previous, coarser level.
237
+ If onset features are provided, these are used on the finest level in addition to chroma
238
+ to provide higher synchronization accuracy.
239
+
240
+ Parameters
241
+ ----------
242
+ f_chroma1 : np.ndarray [shape=(12, N)]
243
+ Chroma feature matrix of the first sequence
244
+
245
+ f_chroma2 : np.ndarray [shape=(12, M)]
246
+ Chroma feature matrix of the second sequence
247
+
248
+ f_onset1 : np.ndarray [shape=(L, N)]
249
+ Onset feature matrix of the first sequence (optional, default: None)
250
+
251
+ f_onset2 : np.ndarray [shape=(L, M)]
252
+ Onset feature matrix of the second sequence (optional, default: None)
253
+
254
+ input_feature_rate: int
255
+ Input feature rate of the chroma features (default: 50)
256
+
257
+ step_sizes: np.ndarray
258
+ DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
259
+
260
+ step_weights: np.ndarray
261
+ DTW step weights (np.array([1.0, 1.0, 1.0]))
262
+
263
+ threshold_rec: int
264
+ Defines the maximum area that is spanned by the rectangle of two
265
+ consecutive elements in the alignment (default: 10000)
266
+
267
+ win_len_smooth : np.ndarray
268
+ Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1]))
269
+
270
+ downsamp_smooth : np.ndarray
271
+ Downsampling factors (default: np.array([50, 25, 5, 1]))
272
+
273
+ verbose : bool
274
+ Set `True` for visualization (default: False)
275
+
276
+ dtw_implementation : str
277
+ DTW implementation, librosa or synctoolbox (default: synctoolbox)
278
+
279
+ normalize_chroma : bool
280
+ Set `True` to normalize input chroma features after each downsampling
281
+ and smoothing operation.
282
+
283
+ chroma_norm_ord: int
284
+ Order of chroma normalization, relevant if ``normalize_chroma`` is True.
285
+ (default: 2)
286
+
287
+ chroma_norm_threshold: float
288
+ If the norm falls below threshold for a feature vector, then the
289
+ normalized feature vector is set to be the unit vector. Relevant, if
290
+ ``normalize_chroma`` is True (default: 0.001)
291
+
292
+ visualization_title : str
293
+ Title for the visualization plots. Only relevant if 'verbose' is True
294
+ (default: "MrMsDTW result")
295
+
296
+ alpha: float
297
+ Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm.
298
+ C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5)
299
+
300
+ Returns
301
+ -------
302
+ alignment: np.ndarray [shape=(2, T)]
303
+ Resulting warping path which indicates synchronized indices.
304
+ """
305
+ # If onset features are given as input, high resolution MrMsDTW is activated.
306
+ high_res = False
307
+ if f_onset1 is not None and f_onset2 is not None:
308
+ high_res = True
309
+
310
+ if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]):
311
+ raise ValueError('Chroma and onset features must be of the same length.')
312
+
313
+ if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1:
314
+ raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e.'
315
+ 'at the last iteration, it is computed at the input feature rate!')
316
+
317
+ num_iterations = win_len_smooth.shape[0]
318
+ cost_matrix_size_old = tuple()
319
+ feature_rate_old = input_feature_rate / downsamp_smooth[0]
320
+ alignment = None
321
+ total_computation_time = 0.0
322
+
323
+ # If the area is less than the threshold_rec, don't apply the multiscale DTW.
324
+ it = (num_iterations - 1) if __compute_area(f_chroma1, f_chroma2) < threshold_rec else 0
325
+
326
+ while it < num_iterations:
327
+ tic1 = time.perf_counter()
328
+
329
+ # Smooth and downsample given raw features
330
+ f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1,
331
+ input_feature_rate=input_feature_rate,
332
+ win_len_smooth=win_len_smooth[it],
333
+ downsamp_smooth=downsamp_smooth[it])
334
+
335
+ f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2,
336
+ input_feature_rate=input_feature_rate,
337
+ win_len_smooth=win_len_smooth[it],
338
+ downsamp_smooth=downsamp_smooth[it])
339
+
340
+ if normalize_chroma:
341
+ f_chroma1_cur = normalize_feature(f_chroma1_cur,
342
+ norm_ord=chroma_norm_ord,
343
+ threshold=chroma_norm_threshold)
344
+
345
+ f_chroma2_cur = normalize_feature(f_chroma2_cur,
346
+ norm_ord=chroma_norm_ord,
347
+ threshold=chroma_norm_threshold)
348
+
349
+ # Project path onto new resolution
350
+ cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1])
351
+
352
+ if alignment is None:
353
+ # Initialize the alignment with the start and end frames of the feature sequence
354
+ anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]])
355
+
356
+ else:
357
+ projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment,
358
+ feature_rate_old=feature_rate_old,
359
+ feature_rate_new=feature_rate_new,
360
+ cost_matrix_size_old=cost_matrix_size_old,
361
+ cost_matrix_size_new=cost_matrix_size_new)
362
+
363
+ anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment,
364
+ threshold=threshold_rec)
365
+
366
+ # Cost matrix and warping path computation
367
+ if high_res and it == num_iterations - 1:
368
+ # Compute cost considering chroma and pitch onset features and alignment only in the last iteration,
369
+ # where the features are at the finest level.
370
+ cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
371
+ f_chroma2=f_chroma2_cur,
372
+ f_onset1=f_onset1,
373
+ f_onset2=f_onset2,
374
+ anchors=anchors,
375
+ alpha=alpha)
376
+
377
+ else:
378
+ cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
379
+ f_chroma2=f_chroma2_cur,
380
+ anchors=anchors,
381
+ alpha=alpha)
382
+
383
+ wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1,
384
+ step_sizes=step_sizes,
385
+ step_weights=step_weights,
386
+ implementation=dtw_implementation)
387
+
388
+ # Concatenate warping paths
389
+ wp = build_path_from_warping_paths(warping_paths=wp_list,
390
+ anchors=anchors)
391
+
392
+ anchors_step1 = None
393
+ wp_step1 = None
394
+ num_rows_step1 = 0
395
+ num_cols_step1 = 0
396
+ ax = None
397
+
398
+ toc1 = time.perf_counter()
399
+ if verbose and cost_matrices_step1 is not None:
400
+ anchors_step1 = np.array(anchors, copy=True)
401
+ wp_step1 = np.array(wp, copy=True)
402
+ num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int),
403
+ axis=0)
404
+ fig, ax = sync_visualize_step1(cost_matrices_step1,
405
+ num_rows_step1,
406
+ num_cols_step1,
407
+ anchors,
408
+ wp)
409
+ tic2 = time.perf_counter()
410
+
411
+ # Compute neighboring anchors and refine alignment using local path between neighboring anchors
412
+ anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors)
413
+
414
+ # Compute neighboring anchors for refinement
415
+ neighboring_anchors, neighboring_anchor_indices = \
416
+ derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path)
417
+
418
+ if neighboring_anchor_indices.shape[0] > 1 \
419
+ and it == num_iterations - 1 and high_res:
420
+ cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
421
+ f_chroma2=f_chroma2_cur,
422
+ f_onset1=f_onset1,
423
+ f_onset2=f_onset2,
424
+ anchors=neighboring_anchors,
425
+ alpha=alpha)
426
+
427
+ else:
428
+ cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
429
+ f_chroma2=f_chroma2_cur,
430
+ anchors=neighboring_anchors,
431
+ alpha=alpha)
432
+
433
+ wp_list_refine = compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2,
434
+ step_sizes=step_sizes,
435
+ step_weights=step_weights,
436
+ implementation=dtw_implementation)
437
+
438
+ wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices)
439
+
440
+ toc2 = time.perf_counter()
441
+ computation_time_it = toc2 - tic2 + toc1 - tic1
442
+ total_computation_time += computation_time_it
443
+
444
+ alignment = wp
445
+ feature_rate_old = feature_rate_new
446
+ cost_matrix_size_old = cost_matrix_size_new
447
+
448
+ if verbose and cost_matrices_step2 is not None:
449
+ sync_visualize_step2(ax,
450
+ cost_matrices_step2,
451
+ wp,
452
+ wp_step1,
453
+ num_rows_step1,
454
+ num_cols_step1,
455
+ anchors_step1,
456
+ neighboring_anchors,
457
+ plot_title=f"{visualization_title} - Level {it + 1}")
458
+ print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it))
459
+
460
+ it += 1
461
+
462
+ if verbose:
463
+ print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time))
464
+
465
+ return alignment
466
+
467
+
468
+ def __diagonal_warping_path(f1: np.ndarray,
469
+ f2: np.ndarray) -> np.ndarray:
470
+ """Generates a diagonal warping path given two feature sequences.
471
+
472
+ Parameters
473
+ ----------
474
+ f1: np.ndarray [shape=(_, N)]
475
+ First feature sequence
476
+
477
+ f2: np.ndarray [shape=(_, M)]
478
+ Second feature sequence
479
+
480
+ Returns
481
+ -------
482
+ np.ndarray: Diagonal warping path [shape=(2, T)]
483
+ """
484
+ max_size = np.maximum(f1.shape[1], f2.shape[1])
485
+ min_size = np.minimum(f1.shape[1], f2.shape[1])
486
+
487
+ if min_size == 1:
488
+ return np.array([max_size - 1, 0]).reshape(-1, 1)
489
+
490
+ elif max_size == f1.shape[1]:
491
+ return np.array([np.round(np.linspace(0, max_size - 1, min_size)), np.linspace(0, min_size - 1, min_size)])
492
+
493
+ else:
494
+ return np.array([np.linspace(0, min_size-1, min_size), np.round(np.linspace(0, max_size - 1, min_size))])
495
+
496
+
497
+ @jit(nopython=True)
498
+ def __compute_area(f1, f2):
499
+ """Computes the area of the cost matrix given two feature sequences
500
+
501
+ Parameters
502
+ ----------
503
+ f1: np.ndarray
504
+ First feature sequence
505
+
506
+ f2: np.ndarray
507
+ Second feature sequence
508
+
509
+ Returns
510
+ -------
511
+ int: Area of the cost matrix
512
+ """
513
+ return f1.shape[1] * f2.shape[1]
514
+
515
+
516
+ def __split_features(f_chroma1: np.ndarray,
517
+ f_onset1: np.ndarray,
518
+ f_chroma2: np.ndarray,
519
+ f_onset2: np.ndarray,
520
+ cur_a1: float,
521
+ cur_a2: float,
522
+ prev_a1: float,
523
+ prev_a2: float,
524
+ feature_rate: int) -> Tuple[np.ndarray, Optional[np.ndarray], np.ndarray, Optional[np.ndarray]]:
525
+
526
+ if cur_a1 == -1:
527
+ f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):]
528
+ if f_onset1 is not None:
529
+ f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):]
530
+ else:
531
+ f_onset1_split = None
532
+
533
+ else:
534
+ # Split the features
535
+ f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)]
536
+ if f_onset1 is not None:
537
+ f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)]
538
+ else:
539
+ f_onset1_split = None
540
+
541
+ if cur_a2 == -1:
542
+ f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):]
543
+ if f_onset2 is not None:
544
+ f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):]
545
+ else:
546
+ f_onset2_split = None
547
+
548
+ else:
549
+ f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)]
550
+ if f_onset2 is not None:
551
+ f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)]
552
+ else:
553
+ f_onset2_split = None
554
+
555
+ return f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split
556
+
557
+
558
+ def __refine_wp(wp: np.ndarray,
559
+ anchors: np.ndarray,
560
+ wp_list_refine: List,
561
+ neighboring_anchors: np.ndarray,
562
+ neighboring_anchor_indices: np.ndarray) -> np.ndarray:
563
+ wp_length = wp[:, neighboring_anchor_indices[-1]:].shape[1]
564
+ last_list = wp[:, neighboring_anchor_indices[-1]:] - np.tile(
565
+ wp[:, neighboring_anchor_indices[-1]].reshape(-1, 1), wp_length)
566
+ wp_list_tmp = [wp[:, :neighboring_anchor_indices[0] + 1]] + wp_list_refine + [last_list]
567
+ A_tmp = np.concatenate([anchors[:, 0].reshape(-1, 1), neighboring_anchors, anchors[:, -1].reshape(-1, 1)],
568
+ axis=1)
569
+ wp_res = build_path_from_warping_paths(warping_paths=wp_list_tmp,
570
+ anchors=A_tmp)
571
+
572
+ return wp_res
573
+
574
+
575
+ def __check_anchor_pairs(anchor_pairs: List,
576
+ f_len1: int,
577
+ f_len2: int,
578
+ feature_rate: int):
579
+ """Ensures that the anchors satisfy the conditions
580
+
581
+ Parameters
582
+ ----------
583
+ anchor_pairs: List[Tuple]
584
+ List of anchor pairs
585
+
586
+ f_len1: int
587
+ Length of the first feature sequence
588
+
589
+ f_len2: int
590
+ Length of the second feature sequence
591
+
592
+ feature_rate: int
593
+ Input feature rate of the features
594
+ """
595
+ prev_a1 = 0
596
+ prev_a2 = 0
597
+ for anchor_pair in anchor_pairs:
598
+ a1, a2 = anchor_pair
599
+
600
+ if a1 <= 0 or a2 <= 0:
601
+ raise ValueError('Starting point must be a positive number!')
602
+
603
+ if a1 > f_len1 / feature_rate or a2 > f_len2 / feature_rate:
604
+ raise ValueError('Anchor points cannot be greater than the length of the input audio files!')
605
+
606
+ if a1 == f_len1 and a2 == f_len2:
607
+ raise ValueError('Both anchor points cannot be equal to the length of the audio files.')
608
+
609
+ if a1 == prev_a1 and a2 == prev_a2:
610
+ raise ValueError('Duplicate anchor pairs are not allowed!')
611
+
612
+ if a1 < prev_a1 or a2 < prev_a2:
613
+ raise ValueError('Anchor points must be monotonously increasing.')
614
+
615
+ prev_a1 = a1
616
+ prev_a2 = a2
musc/dtw/utils.py ADDED
@@ -0,0 +1,426 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ from typing import List
3
+ from numba import jit
4
+ import numpy as np
5
+ from scipy import signal
6
+ from typing import Tuple
7
+
8
+
9
+ from .core import compute_warping_path
10
+ from .cost import *
11
+
12
+
13
+
14
+ def compute_optimal_chroma_shift(f_chroma1: np.ndarray,
15
+ f_chroma2: np.ndarray,
16
+ chroma_transpositions: np.ndarray = np.arange(0, 12),
17
+ step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], int),
18
+ step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64)) -> int:
19
+ """Computes the optimal chroma shift which minimizes the DTW cost.
20
+
21
+ Parameters
22
+ ----------
23
+ f_chroma1 : np.ndarray [shape=(d_chroma, N_chroma)]
24
+ First chroma vector
25
+
26
+ f_chroma2 : np.ndarray [shape=(d_chroma, N_chroma)]
27
+ Second chroma vector
28
+
29
+ step_sizes : np.ndarray
30
+ DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
31
+
32
+ step_weights : np.ndarray
33
+ DTW step weights (default: np.array([1.0, 1.0, 1.0]))
34
+
35
+ chroma_transpositions : np.ndarray
36
+ Array of chroma shifts (default: np.arange(0, 11))
37
+
38
+ Returns
39
+ -------
40
+ opt_chroma_shift : int
41
+ Optimal chroma shift which minimizes the DTW cost.
42
+ """
43
+ if f_chroma2.shape[1] >= 9000 or f_chroma1.shape[1] >= 9000:
44
+ print("Warning: You are attempting to find the optimal chroma shift on sequences of length >= 9000. "
45
+ "This involves full DTW computation. You'll probably want to smooth and downsample your sequences to a"
46
+ " lower feature resolution before doing this.")
47
+ opt_chroma_shift = 0
48
+ dtw_cost = np.inf
49
+ for chroma_shift in chroma_transpositions:
50
+ cost_matrix_tmp = cosine_distance(f_chroma1, shift_chroma_vectors(f_chroma2, chroma_shift))
51
+ D, _, _ = compute_warping_path(cost_matrix_tmp, step_sizes=step_sizes, step_weights=step_weights)
52
+ if D[-1, -1] < dtw_cost:
53
+ dtw_cost = D[-1, -1]
54
+ opt_chroma_shift = chroma_shift
55
+
56
+ return opt_chroma_shift
57
+
58
+
59
+ def compute_warping_paths_from_cost_matrices(cost_matrices: List,
60
+ step_sizes: np.array = np.array([[1, 0], [0, 1], [1, 1]], int),
61
+ step_weights: np.array = np.array([1.0, 1.0, 1.0], np.float64),
62
+ implementation: str = 'synctoolbox') -> List:
63
+ """Computes a path via DTW on each matrix in cost_matrices
64
+
65
+ Parameters
66
+ ----------
67
+ cost_matrices : list
68
+ List of cost matrices
69
+
70
+ step_sizes : np.ndarray
71
+ DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
72
+
73
+ step_weights : np.ndarray
74
+ DTW step weights (default: np.array([1.0, 1.0, 1.0]))
75
+
76
+ implementation : str
77
+ Choose among 'synctoolbox' and 'librosa' (default: 'synctoolbox')
78
+
79
+ Returns
80
+ -------
81
+ wp_list : list
82
+ List of warping paths
83
+ """
84
+ return [compute_warping_path(C=C,
85
+ step_sizes=step_sizes,
86
+ step_weights=step_weights,
87
+ implementation=implementation)[2] for C in cost_matrices]
88
+
89
+
90
+ def compute_cost_matrices_between_anchors(f_chroma1: np.ndarray,
91
+ f_chroma2: np.ndarray,
92
+ anchors: np.ndarray,
93
+ f_onset1: np.ndarray = None,
94
+ f_onset2: np.ndarray = None,
95
+ alpha: float = 0.5) -> List:
96
+ """Computes cost matrices for the given features between subsequent
97
+ pairs of anchors points.
98
+
99
+ Parameters
100
+ ----------
101
+ f_chroma1 : np.ndarray [shape=(12, N)]
102
+ Chroma feature matrix of the first sequence
103
+
104
+ f_chroma2 : np.ndarray [shape=(12, M)]
105
+ Chroma feature matrix of the second sequence
106
+
107
+ anchors : np.ndarray [shape=(2, R)]
108
+ Anchor sequence
109
+
110
+ f_onset1 : np.ndarray [shape=(L, N)]
111
+ Onset feature matrix of the first sequence
112
+
113
+ f_onset2 : np.ndarray [shape=(L, M)]
114
+ Onset feature matrix of the second sequence
115
+
116
+ alpha: float
117
+ Alpha parameter to weight the cost functions.
118
+
119
+ Returns
120
+ -------
121
+ cost_matrices: list
122
+ List containing cost matrices
123
+ """
124
+ high_res = False
125
+ if f_onset1 is not None and f_onset2 is not None:
126
+ high_res = True
127
+
128
+ cost_matrices = list()
129
+ for k in range(anchors.shape[1] - 1):
130
+ a1 = np.array(anchors[:, k].astype(int), copy=True)
131
+ a2 = np.array(anchors[:, k + 1].astype(int), copy=True)
132
+
133
+ if high_res:
134
+ cost_matrices.append(compute_high_res_cost_matrix(f_chroma1[:, a1[0]: a2[0] + 1],
135
+ f_chroma2[:, a1[1]: a2[1] + 1],
136
+ f_onset1[:, a1[0]: a2[0] + 1],
137
+ f_onset2[:, a1[1]: a2[1] + 1],
138
+ weights=np.array([alpha, 1-alpha])))
139
+ else:
140
+ cost_matrices.append(cosine_distance(f_chroma1[:, a1[0]: a2[0] + 1],
141
+ f_chroma2[:, a1[1]: a2[1] + 1]))
142
+ return cost_matrices
143
+
144
+
145
+ def build_path_from_warping_paths(warping_paths: List,
146
+ anchors: np.ndarray = None) -> np.ndarray:
147
+ """The function builds a path from a given list of warping paths
148
+ and the anchors used to obtain these paths. The indices of the original
149
+ warping paths are adapted such that they cross the anchors.
150
+
151
+ Parameters
152
+ ----------
153
+ warping_paths : list
154
+ List of warping paths
155
+
156
+ anchors : np.ndarray [shape=(2, N)]
157
+ Anchor sequence
158
+
159
+ Returns
160
+ -------
161
+ path : np.ndarray [shape=(2, M)]
162
+ Merged path
163
+ """
164
+
165
+ if anchors is None:
166
+ # When no anchor points are given, we can construct them from the
167
+ # subpaths in the wp_list
168
+
169
+ # To do this, we assume that the first path's element is the starting
170
+ # anchor
171
+ anchors = warping_paths[0][:, 0]
172
+
173
+ # Retrieve the last element of each path
174
+ anchors_tmp = np.zeros(len(warping_paths), np.float32)
175
+ for idx, x in enumerate(warping_paths):
176
+ anchors_tmp[idx] = x[:, -1]
177
+
178
+ # Correct indices, such that the indices of the anchors are given on a
179
+ # common path. Each anchor a_l = [Nnew_[l+1];Mnew_[l+1]]
180
+ # Nnew_[l+1] = N_l + N_[l+1] -1
181
+ # Mnew_[l+1] = M_l + M_[l+1] -1
182
+
183
+ anchors_tmp = np.cumsum(anchors_tmp, axis=1)
184
+ anchors_tmp[:, 1:] = anchors_tmp[:, 1:] - [np.arange(1, anchors_tmp.shape[1]),
185
+ np.arange(1, anchors_tmp.shape[1])]
186
+
187
+ anchors = np.concatenate([anchors, anchors_tmp], axis=1)
188
+
189
+ L = len(warping_paths) + 1
190
+ path = None
191
+ wp = None
192
+
193
+ for anchor_idx in range(1, L):
194
+ anchor1 = anchors[:, anchor_idx - 1]
195
+ anchor2 = anchors[:, anchor_idx]
196
+
197
+ wp = np.array(warping_paths[anchor_idx - 1], copy=True)
198
+
199
+ # correct indices in warpingPath
200
+ wp += np.repeat(anchor1.reshape(-1, 1), wp.shape[1], axis=1).astype(wp.dtype)
201
+
202
+ # consistency checks
203
+ assert np.array_equal(wp[:, 0], anchor1), 'First entry of warping path does not coincide with anchor point'
204
+ assert np.array_equal(wp[:, -1], anchor2), 'Last entry of warping path does not coincide with anchor point'
205
+
206
+ if path is None:
207
+ path = np.array(wp[:, :-1], copy=True)
208
+ else:
209
+ path = np.concatenate([path, wp[:, :-1]], axis=1)
210
+
211
+ # append last index of warping path
212
+ path = np.concatenate([path, wp[:, -1].reshape(-1, 1)], axis=1)
213
+
214
+ return path
215
+
216
+
217
+ def find_anchor_indices_in_warping_path(warping_path: np.ndarray,
218
+ anchors: np.ndarray) -> np.ndarray:
219
+ """Compute the indices in the warping path that corresponds
220
+ to the elements in 'anchors'
221
+
222
+ Parameters
223
+ ----------
224
+ warping_path : np.ndarray [shape=(2, N)]
225
+ Warping path
226
+
227
+ anchors : np.ndarray [shape=(2, M)]
228
+ Anchor sequence
229
+
230
+ Returns
231
+ -------
232
+ indices : np.ndarray [shape=(2, M)]
233
+ Anchor indices in the ``warping_path``
234
+ """
235
+ indices = np.zeros(anchors.shape[1])
236
+
237
+ for k in range(anchors.shape[1]):
238
+ a = anchors[:, k]
239
+ indices[k] = np.where((a[0] == warping_path[0, :]) & (a[1] == warping_path[1, :]))[0]
240
+
241
+ return indices
242
+
243
+
244
+ def make_path_strictly_monotonic(P: np.ndarray) -> np.ndarray:
245
+ """Compute strict alignment path from a warping path
246
+
247
+ Wrapper around "compute_strict_alignment_path_mask" from libfmp.
248
+
249
+ Parameters
250
+ ----------
251
+ P: np.ndarray [shape=(2, N)]
252
+ Warping path
253
+
254
+ Returns
255
+ -------
256
+ P_mod: np.ndarray [shape=(2, M)]
257
+ Strict alignment path, M <= N
258
+ """
259
+ P_mod = compute_strict_alignment_path_mask(P.T)
260
+
261
+ return P_mod.T
262
+
263
+ def compute_strict_alignment_path_mask(P):
264
+ """Compute strict alignment path from a warping path
265
+
266
+ Notebook: C3/C3S3_MusicAppTempoCurve.ipynb
267
+
268
+ Args:
269
+ P (list or np.ndarray): Wapring path
270
+
271
+ Returns:
272
+ P_mod (list or np.ndarray): Strict alignment path
273
+ """
274
+ P = np.array(P, copy=True)
275
+ N, M = P[-1]
276
+ # Get indices for strict monotonicity
277
+ keep_mask = (P[1:, 0] > P[:-1, 0]) & (P[1:, 1] > P[:-1, 1])
278
+ # Add first index to enforce start boundary condition
279
+ keep_mask = np.concatenate(([True], keep_mask))
280
+ # Remove all indices for of last row or column
281
+ keep_mask[(P[:, 0] == N) | (P[:, 1] == M)] = False
282
+ # Add last index to enforce end boundary condition
283
+ keep_mask[-1] = True
284
+ P_mod = P[keep_mask, :]
285
+
286
+ return P_mod
287
+
288
+
289
+ def evaluate_synchronized_positions(ground_truth_positions: np.ndarray,
290
+ synchronized_positions: np.ndarray,
291
+ tolerances: List = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 250]):
292
+ """Compute standard evaluation measures for evaluating the quality of synchronized (musical) positions.
293
+
294
+ When synchronizing two versions of a piece of music, one can evaluate the quality of the resulting alignment
295
+ by comparing errors at musical positions (e.g. beats or measures) that appear in both versions.
296
+ This function implements two measures: mean absolute error at positions and the percentage of correctly transferred
297
+ measures given a threshold.
298
+
299
+ Parameters
300
+ ----------
301
+ ground_truth_positions: np.ndarray [shape=N]
302
+ Positions (e.g. beat or measure positions) annotated in the target version of a piece of music, in milliseconds.
303
+
304
+ synchronized_positions: np.ndarray [shape=N]
305
+ The same musical positions as in 'ground_truth_positions' obtained by transfer using music synchronization,
306
+ in milliseconds.
307
+
308
+ tolerances: list of integers
309
+ Tolerances (in miliseconds) used for comparing annotated and synchronized positions.
310
+
311
+ Returns
312
+ -------
313
+ mean_absolute_error: float
314
+ Mean absolute error for synchronized positions, in miliseconds.
315
+
316
+ accuracy_at_tolerances: list of floats
317
+ Percentages of correctly transferred measures, for each entry in 'tolerances'.
318
+
319
+ """
320
+ absolute_errors_at_positions = np.abs(synchronized_positions - ground_truth_positions)
321
+
322
+ print('Measure transfer from recording 1 to 2 yielded:')
323
+ mean_absolute_error = np.mean(absolute_errors_at_positions)
324
+ print('\nMean absolute error (MAE): %.2fms (standard deviation: %.2fms)' % (mean_absolute_error,
325
+ np.std(absolute_errors_at_positions)))
326
+ print('\nAccuracy of transferred positions at different tolerances:')
327
+ print('\t\t\tAccuracy')
328
+ print('################################')
329
+ accuracy_at_tolerances = []
330
+ for tolerance in tolerances:
331
+ accuracy = np.mean((absolute_errors_at_positions < tolerance)) * 100.0
332
+ accuracy_at_tolerances.append(accuracy)
333
+ print('Tolerance: {} ms \t{:.2f} %'.format(tolerance, accuracy))
334
+
335
+ return mean_absolute_error, accuracy_at_tolerances
336
+
337
+
338
+ def smooth_downsample_feature(f_feature: np.ndarray,
339
+ input_feature_rate: float,
340
+ win_len_smooth: int = 0,
341
+ downsamp_smooth: int = 1) -> Tuple[np.ndarray, float]:
342
+ """Temporal smoothing and downsampling of a feature sequence
343
+
344
+ Parameters
345
+ ----------
346
+ f_feature : np.ndarray
347
+ Input feature sequence, size dxN
348
+
349
+ input_feature_rate : float
350
+ Input feature rate in Hz
351
+
352
+ win_len_smooth : int
353
+ Smoothing window length. For 0, no smoothing is applied.
354
+
355
+ downsamp_smooth : int
356
+ Downsampling factor. For 1, no downsampling is applied.
357
+
358
+ Returns
359
+ -------
360
+ f_feature_stat : np.ndarray
361
+ Downsampled & smoothed feature.
362
+
363
+ new_feature_rate : float
364
+ New feature rate after downsampling
365
+ """
366
+ if win_len_smooth != 0 or downsamp_smooth != 1:
367
+ # hack to get the same results as on MATLAB
368
+ stat_window = np.hanning(win_len_smooth+2)[1:-1]
369
+ stat_window /= np.sum(stat_window)
370
+
371
+ # upfirdn filters and downsamples each column of f_stat_help
372
+ f_feature_stat = signal.upfirdn(h=stat_window, x=f_feature, up=1, down=downsamp_smooth)
373
+ seg_num = f_feature.shape[1]
374
+ stat_num = int(np.ceil(seg_num / downsamp_smooth))
375
+ cut = int(np.floor((win_len_smooth - 1) / (2 * downsamp_smooth)))
376
+ f_feature_stat = f_feature_stat[:, cut: stat_num + cut]
377
+ else:
378
+ f_feature_stat = f_feature
379
+
380
+ new_feature_rate = input_feature_rate / downsamp_smooth
381
+
382
+ return f_feature_stat, new_feature_rate
383
+
384
+
385
+ @jit(nopython=True)
386
+ def normalize_feature(feature: np.ndarray,
387
+ norm_ord: int,
388
+ threshold: float) -> np.ndarray:
389
+ """Normalizes a feature sequence according to the l^norm_ord norm.
390
+
391
+ Parameters
392
+ ----------
393
+ feature : np.ndarray
394
+ Input feature sequence of size d x N
395
+ d: dimensionality of feature vectors
396
+ N: number of feature vectors (time in frames)
397
+
398
+ norm_ord : int
399
+ Norm degree
400
+
401
+ threshold : float
402
+ If the norm falls below threshold for a feature vector, then the
403
+ normalized feature vector is set to be the normalized unit vector.
404
+
405
+ Returns
406
+ -------
407
+ f_normalized : np.ndarray
408
+ Normalized feature sequence
409
+ """
410
+ # TODO rewrite in vectorized fashion
411
+ d, N = feature.shape
412
+ f_normalized = np.zeros((d, N))
413
+
414
+ # normalize the vectors according to the l^norm_ord norm
415
+ unit_vec = np.ones(d)
416
+ unit_vec = unit_vec / np.linalg.norm(unit_vec, norm_ord)
417
+
418
+ for k in range(N):
419
+ cur_norm = np.linalg.norm(feature[:, k], norm_ord)
420
+
421
+ if cur_norm < threshold:
422
+ f_normalized[:, k] = unit_vec
423
+ else:
424
+ f_normalized[:, k] = feature[:, k] / cur_norm
425
+
426
+ return f_normalized
musc/dtw/visualization.py ADDED
@@ -0,0 +1,216 @@
+ import matplotlib
+ import matplotlib.cm
+ import matplotlib.patches
+ import matplotlib.pyplot as plt
+ import numpy as np
+ from typing import Tuple, List
+
+
+ def sync_visualize_step1(cost_matrices: List,
+                          num_rows: int,
+                          num_cols: int,
+                          anchors: np.ndarray,
+                          wp: np.ndarray) -> Tuple[plt.Figure, plt.Axes]:
+
+     fig, ax = plt.subplots(1, 1, dpi=72)
+     ax = __visualize_cost_matrices(ax, cost_matrices)
+     __visualize_constraint_rectangles(anchors[[1, 0], :],
+                                       edgecolor='firebrick')
+
+     __visualize_path_in_matrix(ax=ax,
+                                wp=wp,
+                                axisX=np.arange(0, num_rows),
+                                axisY=np.arange(0, num_cols),
+                                path_color='firebrick')
+
+     return fig, ax
+
+
+ def sync_visualize_step2(ax: plt.Axes,
+                          cost_matrices: list,
+                          wp_step2: np.ndarray,
+                          wp_step1: np.ndarray,
+                          num_rows_step1: int,
+                          num_cols_step1: int,
+                          anchors_step1: np.ndarray,
+                          neighboring_anchors: np.ndarray,
+                          plot_title: str = ""):
+
+     offset_x = neighboring_anchors[0, 0] - 1
+     offset_y = neighboring_anchors[1, 0] - 1
+     ax = __visualize_cost_matrices(ax=ax,
+                                    cost_matrices=cost_matrices,
+                                    offset_x=offset_x,
+                                    offset_y=offset_y)
+
+     __visualize_constraint_rectangles(anchors_step1[[1, 0], :],
+                                       edgecolor='firebrick')
+
+     __visualize_path_in_matrix(ax=ax,
+                                wp=wp_step1,
+                                axisX=np.arange(0, num_rows_step1),
+                                axisY=np.arange(0, num_cols_step1),
+                                path_color='firebrick')
+
+     __visualize_constraint_rectangles(neighboring_anchors[[1, 0], :] - 1,
+                                       edgecolor='orangered',
+                                       linestyle='--')
+
+     __visualize_path_in_matrix(ax=ax,
+                                wp=wp_step2,
+                                axisX=np.arange(0, num_rows_step1),
+                                axisY=np.arange(0, num_cols_step1),
+                                path_color='orangered')
+
+     ax.set_title(plot_title)
+     ax.set_ylabel("Version 1 (frames)")
+     ax.set_xlabel("Version 2 (frames)")
+
+     ax = plt.gca()  # get the current axes
+     # attach a colorbar to the first mappable child of the axes, if any
+     pcm = next((child for child in ax.get_children()
+                 if isinstance(child, matplotlib.cm.ScalarMappable)), None)
+     if pcm is not None:
+         plt.colorbar(pcm, ax=ax)
+     plt.tight_layout()
+     plt.show()
+
+
+ def __size_dtw_matrices(dtw_matrices: List) -> Tuple[List[np.ndarray], List[np.ndarray]]:
+     """Gives information about the dimensionality of a DTW matrix
+     given in the form of a list of sub-matrices.
+
+     Parameters
+     ----------
+     dtw_matrices : list
+         The DTW matrix (cost matrix or accumulated cost matrix) given as a list.
+
+     Returns
+     -------
+     axis_x_list : list
+         A list containing a horizontal axis for each sub-matrix,
+         specifying the horizontal position of the respective sub-matrix
+         in the overall cost matrix.
+
+     axis_y_list : list
+         A list containing a vertical axis for each sub-matrix,
+         specifying the vertical position of the respective sub-matrix
+         in the overall cost matrix.
+     """
+     num_matrices = len(dtw_matrices)
+     size_list = [dtw_mat.shape for dtw_mat in dtw_matrices]
+
+     axis_x_list = list()
+     axis_y_list = list()
+
+     x_acc = 0
+     y_acc = 0
+
+     for i in range(num_matrices):
+         curr_size_list = size_list[i]
+         axis_x_list.append(np.arange(x_acc, x_acc + curr_size_list[0]))
+         axis_y_list.append(np.arange(y_acc, y_acc + curr_size_list[1]))
+         x_acc += curr_size_list[0] - 1
+         y_acc += curr_size_list[1] - 1
+
+     return axis_x_list, axis_y_list
+
+
+ def __visualize_cost_matrices(ax: plt.Axes,
+                               cost_matrices: list = None,
+                               offset_x: float = 0.0,
+                               offset_y: float = 0.0) -> plt.Axes:
+     """Visualizes cost matrices
+
+     Parameters
+     ----------
+     ax : axes
+         The Axes instance to plot on
+
+     cost_matrices : list
+         List of DTW cost matrices.
+
+     offset_x : float
+         Offset on the x axis.
+
+     offset_y : float
+         Offset on the y axis.
+
+     Returns
+     -------
+     ax : axes
+         The Axes instance to plot on
+     """
+     x_ax, y_ax = __size_dtw_matrices(dtw_matrices=cost_matrices)
+
+     for i, cur_cost in enumerate(cost_matrices):
+         curr_x_ax = x_ax[i] + offset_x
+         curr_y_ax = y_ax[i] + offset_y
+         ax.imshow(cur_cost, cmap='gray_r', aspect='auto', origin='lower',
+                   extent=[curr_y_ax[0], curr_y_ax[-1], curr_x_ax[0], curr_x_ax[-1]])
+
+     return ax
+
+
+ def __visualize_path_in_matrix(ax,
+                                wp: np.ndarray = None,
+                                axisX: np.ndarray = None,
+                                axisY: np.ndarray = None,
+                                path_color: str = 'r'):
+     """Plots a warping path on top of a given matrix. The matrix is
+     usually an accumulated cost matrix.
+
+     Parameters
+     ----------
+     ax : axes
+         The Axes instance to plot on
+
+     wp : np.ndarray
+         Warping path
+
+     axisX : np.ndarray
+         Array of X axis positions
+
+     axisY : np.ndarray
+         Array of Y axis positions
+
+     path_color : str
+         Color of the warping path to be plotted. (default: r)
+     """
+     assert axisX is not None and isinstance(axisX, np.ndarray), 'axisX must be a numpy array!'
+     assert axisY is not None and isinstance(axisY, np.ndarray), 'axisY must be a numpy array!'
+
+     wp = wp.astype(int)
+
+     ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], '-k', linewidth=5)
+     ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], color=path_color, linewidth=3)
+
+
+ def __visualize_constraint_rectangles(anchors: np.ndarray,
+                                       linestyle: str = '-',
+                                       edgecolor: str = 'royalblue',
+                                       linewidth: float = 1.0):
+
+     for k in range(anchors.shape[1] - 1):
+         a1 = anchors[:, k]
+         a2 = anchors[:, k + 1]
+
+         # a rectangle is defined by [x, y, width, height]
+         x = a1[0]
+         y = a1[1]
+         w = a2[0] - a1[0] + np.finfo(float).eps
+         h = a2[1] - a1[1] + np.finfo(float).eps
+
+         rect = matplotlib.patches.Rectangle((x, y), w, h,
+                                             linewidth=linewidth,
+                                             edgecolor=edgecolor,
+                                             linestyle=linestyle,
+                                             facecolor='none')
+
+         plt.gca().add_patch(rect)
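A minimal sketch of how the two public helpers above might be driven; the cost sub-matrices, anchors, and warping path below are illustrative stand-ins rather than output of the actual MrMsDTW pipeline.

import numpy as np
from musc.dtw.visualization import sync_visualize_step1

# two dummy cost sub-matrices placed along the diagonal
cost_matrices = [np.random.rand(40, 50), np.random.rand(30, 35)]
num_rows, num_cols = 40 + 30 - 1, 50 + 35 - 1  # adjacent sub-matrices share one corner frame

# anchor points and a crude diagonal warping path, both of shape (2, N)
anchors = np.array([[0, 39, 68], [0, 49, 83]])
wp = np.stack([np.linspace(0, num_rows - 1, 60),
               np.linspace(0, num_cols - 1, 60)]).astype(int)

fig, ax = sync_visualize_step1(cost_matrices, num_rows, num_cols, anchors, wp)
fig.savefig('sync_step1.png')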
musc/model.py ADDED
@@ -0,0 +1,275 @@
+ from musc.pathway import TinyPathway
+ from musc.synchronizer import Synchronizer
+ from musc.representations import PerformanceLabel
+ from torchaudio.models.conformer import ConformerLayer
+ import torch
+ from torch import nn
+ import numpy as np
+ import os
+ import json
+ import gdown
+
+
+ class FourHeads(Synchronizer):
+     def __init__(
+             self,
+             pathway_multiscale: int = 32,
+             num_pathway_layers: int = 2,
+             chunk_size: int = 256,
+             hop_length: int = 256,
+             encoder_dim: int = 256,
+             sr: int = 44100,
+             num_heads: int = 4,
+             ffn_dim: int = 128,
+             num_separator_layers: int = 16,
+             num_representation_layers: int = 4,
+             depthwise_conv_kernel_size: int = 31,
+             dropout: float = 0.25,
+             use_group_norm: bool = False,
+             convolution_first: bool = False,
+             labeling=PerformanceLabel(),
+             wiring='tiktok'
+     ):
+         super().__init__(labeling, sr=sr, hop_length=hop_length)
+         self.main = TinyPathway(dilation=1, hop=hop_length, localize=True,
+                                 n_layers=num_pathway_layers, chunk_size=chunk_size)
+         self.attendant = TinyPathway(dilation=pathway_multiscale, hop=hop_length, localize=False,
+                                      n_layers=num_pathway_layers, chunk_size=chunk_size)
+         assert self.main.hop == self.attendant.hop  # both pathways must output at the same frame rate
+         print('hop in samples:', self.main.hop)
+         self.input_window = self.attendant.input_window
+
+         self.encoder_dim = encoder_dim
+         self.dropout = nn.Dropout(dropout)
+
+         # merge the two streams into a conformer input
+         self.stream_merger = nn.Sequential(self.dropout,
+                                            nn.Linear(self.main.out_dim + self.attendant.out_dim, self.encoder_dim))
+
+         print('main stream window:', self.main.input_window,
+               ', attendant stream window:', self.attendant.input_window,
+               ', conformer input dim:', self.encoder_dim)
+
+         center = (chunk_size - 1) * self.main.hop  # region labeled with the pitch track
+         main_overlap = self.main.input_window - center
+         main_overlap = [int(np.floor(main_overlap / 2)), int(np.ceil(main_overlap / 2))]
+         attendant_overlap = self.attendant.input_window - center
+         attendant_overlap = [int(np.floor(attendant_overlap / 2)), int(np.ceil(attendant_overlap / 2))]
+         print('main frame overlap:', main_overlap, ', attendant frame overlap:', attendant_overlap)
+         main_crop_relative = [attendant_overlap[0] - main_overlap[0], main_overlap[1] - attendant_overlap[1]]
+         print('crop for main pathway:', main_crop_relative)
+         print('Total sequence duration is', self.attendant.input_window, 'samples')
+         print('Main stream receptive field for one frame is', self.main.input_window - center, 'samples')
+         print('Attendant stream receptive field for one frame is', self.attendant.input_window - center, 'samples')
+         self.frame_overlap = attendant_overlap
+
+         self.main_stream_crop = main_crop_relative
+         self.max_window_size = self.attendant.input_window
+         self.chunk_size = chunk_size
+
+         self.separator_stream = nn.ModuleList(  # source separation, reinvented
+             [
+                 ConformerLayer(
+                     input_dim=self.encoder_dim,
+                     ffn_dim=ffn_dim,
+                     num_attention_heads=num_heads,
+                     depthwise_conv_kernel_size=depthwise_conv_kernel_size,
+                     dropout=dropout,
+                     use_group_norm=use_group_norm,
+                     convolution_first=convolution_first,
+                 )
+                 for _ in range(num_separator_layers)
+             ]
+         )
+
+         self.f0_stream = nn.ModuleList(
+             [
+                 ConformerLayer(
+                     input_dim=self.encoder_dim,
+                     ffn_dim=ffn_dim,
+                     num_attention_heads=num_heads,
+                     depthwise_conv_kernel_size=depthwise_conv_kernel_size,
+                     dropout=dropout,
+                     use_group_norm=use_group_norm,
+                     convolution_first=convolution_first,
+                 )
+                 for _ in range(num_representation_layers)
+             ]
+         )
+         self.f0_head = nn.Linear(self.encoder_dim, len(self.labeling.f0_centers_c))
+
+         self.note_stream = nn.ModuleList(
+             [
+                 ConformerLayer(
+                     input_dim=self.encoder_dim,
+                     ffn_dim=ffn_dim,
+                     num_attention_heads=num_heads,
+                     depthwise_conv_kernel_size=depthwise_conv_kernel_size,
+                     dropout=dropout,
+                     use_group_norm=use_group_norm,
+                     convolution_first=convolution_first,
+                 )
+                 for _ in range(num_representation_layers)
+             ]
+         )
+         self.note_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
+
+         self.onset_stream = nn.ModuleList(
+             [
+                 ConformerLayer(
+                     input_dim=self.encoder_dim,
+                     ffn_dim=ffn_dim,
+                     num_attention_heads=num_heads,
+                     depthwise_conv_kernel_size=depthwise_conv_kernel_size,
+                     dropout=dropout,
+                     use_group_norm=use_group_norm,
+                     convolution_first=convolution_first,
+                 )
+                 for _ in range(num_representation_layers)
+             ]
+         )
+         self.onset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
+
+         self.offset_stream = nn.ModuleList(
+             [
+                 ConformerLayer(
+                     input_dim=self.encoder_dim,
+                     ffn_dim=ffn_dim,
+                     num_attention_heads=num_heads,
+                     depthwise_conv_kernel_size=depthwise_conv_kernel_size,
+                     dropout=dropout,
+                     use_group_norm=use_group_norm,
+                     convolution_first=convolution_first,
+                 )
+                 for _ in range(num_representation_layers)
+             ]
+         )
+         self.offset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
+
+         self.labeling = labeling
+         self.double_merger = nn.Sequential(self.dropout, nn.Linear(2 * self.encoder_dim, self.encoder_dim))
+         self.triple_merger = nn.Sequential(self.dropout, nn.Linear(3 * self.encoder_dim, self.encoder_dim))
+         self.wiring = wiring
+
+         print('Total parameter count:', self.count_parameters())
+
+     def count_parameters(self) -> int:
+         """Count all parameters of the model."""
+         return sum(p.numel() for p in self.parameters())
+
+     def stream(self, x, representation, key_padding_mask=None):
+         for layer in self.__getattr__('{}_stream'.format(representation)):
+             x = layer(x, key_padding_mask)
+         return x
+
+     def head(self, x, representation):
+         return self.__getattr__('{}_head'.format(representation))(x)
+
+     def forward(self, x, key_padding_mask=None):
+
+         # two auditory streams followed by the separator stream to ensure timbre awareness
+         x_attendant = self.attendant(x)
+         x_main = self.main(x[:, self.main_stream_crop[0]:self.main_stream_crop[1]])
+         x = self.stream_merger(torch.cat((x_attendant, x_main), -1).squeeze(1))
+         x = self.stream(x, 'separator', key_padding_mask)
+
+         f0 = self.stream(x, 'f0', key_padding_mask)  # they say this is a low-level feature :)
+
+         if self.wiring == 'parallel':
+             note = self.stream(x, 'note', key_padding_mask)
+             onset = self.stream(x, 'onset', key_padding_mask)
+             offset = self.stream(x, 'offset', key_padding_mask)
+
+         elif self.wiring == 'tiktok':
+             onset = self.stream(x, 'onset', key_padding_mask)
+             offset = self.stream(x, 'offset', key_padding_mask)
+             # f0 is disconnected; note relies on the separator, onset, and offset streams
+             note = self.stream(self.triple_merger(torch.cat((x, onset, offset), -1)), 'note', key_padding_mask)
+
+         elif self.wiring == 'tiktok2':
+             onset = self.stream(x, 'onset', key_padding_mask)
+             offset = self.stream(x, 'offset', key_padding_mask)
+             # note is connected to the f0, onset, and offset streams
+             note = self.stream(self.triple_merger(torch.cat((f0, onset, offset), -1)), 'note', key_padding_mask)
+
+         elif self.wiring == 'spotify':
+             # note is connected to f0 only
+             note = self.stream(f0, 'note', key_padding_mask)
+             # here onsets and offsets are higher-level features informed by the separator and note streams
+             onset = self.stream(self.double_merger(torch.cat((x, note), -1)), 'onset', key_padding_mask)
+             offset = self.stream(self.double_merger(torch.cat((x, note), -1)), 'offset', key_padding_mask)
+
+         else:
+             # onset and offset are connected to the f0 and separator streams
+             onset = self.stream(self.double_merger(torch.cat((x, f0), -1)), 'onset', key_padding_mask)
+             offset = self.stream(self.double_merger(torch.cat((x, f0), -1)), 'offset', key_padding_mask)
+             # note is connected to the f0, onset, and offset streams
+             note = self.stream(self.triple_merger(torch.cat((f0, onset, offset), -1)), 'note', key_padding_mask)
+
+         return {'f0': self.head(f0, 'f0'),
+                 'note': self.head(note, 'note'),
+                 'onset': self.head(onset, 'onset'),
+                 'offset': self.head(offset, 'offset')}
+
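A hedged smoke-test sketch for FourHeads: the exact tensor shapes depend on TinyPathway and PerformanceLabel, so the input length is read from model.input_window rather than hard-coded, and the forward pass is assumed to take a batch of raw mono audio, as the slicing in forward() implies.

import torch
from musc.model import FourHeads

model = FourHeads(chunk_size=256, hop_length=256, wiring='tiktok')
model.eval()

x = torch.randn(1, model.input_window)  # one chunk of raw mono audio
with torch.no_grad():
    out = model(x)

# one logit matrix per head: f0, note, onset, offset
for name, logits in out.items():
    print(name, logits.shape)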
+
+ class PretrainedModel(FourHeads):
+     def __init__(self, instrument='violin'):
+         assert instrument in ['violin', 'Violin', 'vln', 'vl'], 'As of now, the only supported instrument is the violin'
+         instrument = 'violin'
+         package_dir = os.path.dirname(os.path.realpath(__file__))
+         with open(os.path.join(package_dir, instrument + ".json"), "r") as f:
+             args = json.load(f)
+         labeling = PerformanceLabel(note_min=args['note_low'], note_max=args['note_high'],
+                                     f0_bins_per_semitone=args['f0_bins_per_semitone'],
+                                     f0_tolerance_c=200,
+                                     f0_smooth_std_c=args['f0_smooth_std_c'], onset_smooth_std=args['onset_smooth_std'])
+
+         super().__init__(pathway_multiscale=args['pathway_multiscale'],
+                          num_pathway_layers=args['num_pathway_layers'], wiring=args['wiring'],
+                          hop_length=args['hop_length'], chunk_size=args['chunk_size'],
+                          labeling=labeling, sr=args['sampling_rate'])
+         self.model_url = args['model_file']
+         self.load_weight(instrument)
+         self.eval()
+
+     def load_weight(self, instrument):
+         self.download_weights(instrument)
+         package_dir = os.path.dirname(os.path.realpath(__file__))
+         filename = "{}_model.pt".format(instrument)
+         self.load_state_dict(torch.load(os.path.join(package_dir, filename)))
+
+     def download_weights(self, instrument):
+         weight_file = "{}_model.pt".format(instrument)
+         package_dir = os.path.dirname(os.path.realpath(__file__))
+         weight_path = os.path.join(package_dir, weight_file)
+         if not os.path.isfile(weight_path):
+             gdown.download(f"https://drive.google.com/uc?export=download&confirm=pbef&id={self.model_url}", weight_path)
+
+     @staticmethod
+     def download_youtube(url, audio_codec='wav'):
+         from yt_dlp import YoutubeDL
+         ydl_opts = {'noplaylist': True, 'quiet': True, 'format': 'bestaudio/best',
+                     'outtmpl': '%(id)s.%(ext)s', 'postprocessors': [{
+                         'key': 'FFmpegExtractAudio',
+                         'preferredcodec': audio_codec,
+                         'preferredquality': '192', }], }
+         with YoutubeDL(ydl_opts) as ydl:
+             info_dict = ydl.extract_info(url, download=False)
+             video_id = info_dict.get('id', None)
+             title = info_dict.get('title', None)
+             ydl.download([url])
+         return video_id + '.' + audio_codec, video_id, title
+
+     def transcribe_youtube(self, url, audio_codec='wav', batch_size=64,
+                            postprocessing='spotify', include_pitch_bends=True):
+         file_path, video_id, title = self.download_youtube(url, audio_codec=audio_codec)
+         midi = self.transcribe(file_path, batch_size=batch_size,
+                                postprocessing=postprocessing, include_pitch_bends=include_pitch_bends)
+         return midi, video_id, title
+
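Finally, a usage sketch for the pretrained model. The file path and URL are placeholders, and the returned MIDI object is assumed to expose a pretty_midi-style write(), matching the transcribe API that transcribe_youtube relies on.

from musc.model import PretrainedModel

model = PretrainedModel(instrument='violin')

# transcribe a local recording (path is illustrative)
midi = model.transcribe('my_violin_take.wav', batch_size=64,
                        postprocessing='spotify', include_pitch_bends=True)
midi.write('my_violin_take.mid')

# or transcribe straight from YouTube
midi, video_id, title = model.transcribe_youtube('https://www.youtube.com/watch?v=...',
                                                 batch_size=32)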