|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""event2note.py: |
|
|
|
Event to NoteEvent: |
|
• event2note_event |
|
|
|
NoteEvent to Note: |
|
• note_event2note |
|
• merge_zipped_note_events_and_ties_to_notes |
|
|
|
""" |
|
import warnings |
|
from collections import Counter |
|
from typing import List, Tuple, Optional, Dict, Counter |
|
|
|
from utils.note_event_dataclasses import Note, NoteEvent |
|
from utils.note_event_dataclasses import Event |
|
from utils.note2event import validate_notes, trim_overlapping_notes |
|
|
|
MINIMUM_OFFSET_SEC = 0.01 |
|
|
|
DECODING_ERR_TYPES = [ |
|
'decoding_time', 'Err/Missing prg in tie', 'Err/Missing tie', 'Err/Shift out of range', 'Err/Missing prg', |
|
'Err/Missing vel', 'Err/Multi-tie type 1', 'Err/Multi-tie type 2', 'Err/Unknown event', 'Err/onset not found', |
|
'Err/active ne incomplete', 'Err/merging segment tie', 'Err/long note > 10s' |
|
] |
|
|
|
|
|
def event2note_event(events: List[Event], |
|
start_time: float = 0.0, |
|
sort: bool = True, |
|
tps: int = 100) -> Tuple[List[NoteEvent], List[NoteEvent], List[Tuple[int]], Counter[str]]: |
|
"""Convert events to note events. |
|
|
|
Args: |
|
events: A list of events. |
|
start_time: The start time of the segment. |
|
sort: Whether to sort the note events. |
|
tps: Ticks per second. |
|
|
|
Returns: |
|
List[NoteEvent]: A list of note events. |
|
List[NoteEvent]: A list of tie note events. |
|
List[Tuple[int]]: A list of last activity of segment. [(program, pitch), ...]. This is useful |
|
for validating notes within a batch of segments extracted from a file. |
|
Counter[str]: A dictionary of error counters. |
|
""" |
|
assert (start_time >= 0.) |
|
|
|
|
|
tie_index = program_state = None |
|
tie_note_events = [] |
|
last_activity = [] |
|
error_counter = {} |
|
|
|
for i, e in enumerate(events): |
|
try: |
|
if e.type == 'tie': |
|
tie_index = i |
|
break |
|
if e.type == 'shift': |
|
break |
|
elif e.type == 'program': |
|
program_state = e.value |
|
elif e.type == 'pitch': |
|
if program_state is None: |
|
raise ValueError('Err/Missing prg in tie') |
|
tie_note_events.append( |
|
NoteEvent(is_drum=False, program=program_state, time=None, velocity=1, pitch=e.value)) |
|
last_activity.append((program_state, e.value)) |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
|
|
try: |
|
if tie_index is None: |
|
raise ValueError('Err/Missing tie') |
|
else: |
|
events = events[tie_index + 1:] |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
return [], [], [], error_counter |
|
|
|
|
|
note_events = [] |
|
velocity_state = None |
|
start_tick = round(start_time * tps) |
|
tick_state = start_tick |
|
|
|
|
|
for e in events: |
|
try: |
|
if e.type == 'shift': |
|
if e.value <= 0 or e.value > 1000: |
|
raise ValueError('Err/Shift out of range') |
|
|
|
tick_state = start_tick + e.value |
|
elif e.type == 'drum': |
|
note_events.append( |
|
NoteEvent(is_drum=True, program=128, time=tick_state / tps, velocity=1, pitch=e.value)) |
|
elif e.type == 'program': |
|
program_state = e.value |
|
elif e.type == 'velocity': |
|
velocity_state = e.value |
|
elif e.type == 'pitch': |
|
if program_state is None: |
|
raise ValueError('Err/Missing prg') |
|
elif velocity_state is None: |
|
raise ValueError('Err/Missing vel') |
|
|
|
if velocity_state > 0: |
|
last_activity.append((program_state, e.value)) |
|
elif velocity_state == 0 and (program_state, e.value) in last_activity: |
|
last_activity.remove((program_state, e.value)) |
|
else: |
|
|
|
raise ValueError('Err/Note off without note on') |
|
note_events.append( |
|
NoteEvent(is_drum=False, |
|
program=program_state, |
|
time=tick_state / tps, |
|
velocity=velocity_state, |
|
pitch=e.value)) |
|
elif e.type == 'EOS': |
|
break |
|
elif e.type == 'PAD': |
|
continue |
|
elif e.type == 'UNK': |
|
continue |
|
elif e.type == 'tie': |
|
if tick_state == start_tick: |
|
raise ValueError('Err/Multi-tie type 1') |
|
else: |
|
raise ValueError('Err/Multi-tie type 2') |
|
else: |
|
raise ValueError(f'Err/Unknown event') |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
|
|
if sort: |
|
note_events.sort(key=lambda n_ev: (n_ev.time, n_ev.is_drum, n_ev.program, n_ev.velocity, n_ev.pitch)) |
|
tie_note_events.sort(key=lambda n_ev: (n_ev.is_drum, n_ev.program, n_ev.pitch)) |
|
|
|
return note_events, tie_note_events, last_activity, error_counter |
|
|
|
|
|
def note_event2note( |
|
note_events: List[NoteEvent], |
|
tie_note_events: Optional[List[NoteEvent]] = None, |
|
sort: bool = True, |
|
fix_offset: bool = True, |
|
trim_overlap: bool = True, |
|
) -> Tuple[List[Note], Counter[str]]: |
|
"""Convert note events to notes. |
|
|
|
Returns: |
|
List[Note]: A list of merged note events. |
|
Counter[str]: A dictionary of error counters. |
|
""" |
|
|
|
notes = [] |
|
active_note_events = {} |
|
|
|
error_counter = {} |
|
|
|
if tie_note_events is not None: |
|
for ne in tie_note_events: |
|
active_note_events[(ne.pitch, ne.program)] = ne |
|
|
|
if sort: |
|
note_events.sort(key=lambda ne: (ne.time, ne.is_drum, ne.pitch, ne.velocity, ne.program)) |
|
|
|
for ne in note_events: |
|
try: |
|
if ne.time == None: |
|
continue |
|
elif ne.is_drum: |
|
if ne.velocity == 1: |
|
notes.append( |
|
Note(is_drum=True, |
|
program=128, |
|
onset=ne.time, |
|
offset=ne.time + MINIMUM_OFFSET_SEC, |
|
pitch=ne.pitch, |
|
velocity=1)) |
|
else: |
|
continue |
|
elif ne.velocity == 1: |
|
active_ne = active_note_events.get((ne.pitch, ne.program)) |
|
if active_ne is not None: |
|
active_note_events.pop((ne.pitch, ne.program)) |
|
notes.append( |
|
Note(False, active_ne.program, active_ne.time, ne.time, active_ne.pitch, active_ne.velocity)) |
|
active_note_events[(ne.pitch, ne.program)] = ne |
|
|
|
elif ne.velocity == 0: |
|
active_ne = active_note_events.pop((ne.pitch, ne.program), None) |
|
if active_ne is not None: |
|
notes.append( |
|
Note(False, active_ne.program, active_ne.time, ne.time, active_ne.pitch, active_ne.velocity)) |
|
else: |
|
raise ValueError('Err/onset not found') |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
|
|
for ne in active_note_events.values(): |
|
try: |
|
if ne.velocity == 1: |
|
if ne.program == None or ne.pitch == None: |
|
raise ValueError('Err/active ne incomplete') |
|
elif ne.time == None: |
|
continue |
|
else: |
|
notes.append( |
|
Note(is_drum=False, |
|
program=ne.program, |
|
onset=ne.time, |
|
offset=ne.time + MINIMUM_OFFSET_SEC, |
|
pitch=ne.pitch, |
|
velocity=1)) |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
|
|
if fix_offset: |
|
for n in list(notes): |
|
try: |
|
if n.offset - n.onset > 10: |
|
n.offset = n.onset + MINIMUM_OFFSET_SEC |
|
raise ValueError('Err/long note > 10s') |
|
except ValueError as ve: |
|
error_type = str(ve) |
|
error_counter[error_type] = error_counter.get(error_type, 0.) + 1 |
|
|
|
if sort: |
|
notes.sort(key=lambda note: (note.onset, note.is_drum, note.program, note.velocity, note.pitch)) |
|
|
|
if fix_offset: |
|
notes = validate_notes(notes, fix=True) |
|
|
|
if trim_overlap: |
|
notes = trim_overlapping_notes(notes, sort=True) |
|
|
|
return notes, error_counter |
|
|
|
|
|
def merge_zipped_note_events_and_ties_to_notes(zipped_note_events_and_ties, |
|
force_note_off_missing_tie=True, |
|
fix_offset=True) -> Tuple[List[Note], Counter[str]]: |
|
"""Merge zipped note events and ties. |
|
|
|
Args: |
|
zipped_note_events_and_ties: A list of tuples of (note events, tie note events, last_activity, start time). |
|
force_note_off_missing_tie: Whether to force note off for missing tie note events. |
|
fix_offset: Whether to fix the offset of notes. |
|
|
|
Returns: |
|
List[Note]: A list of merged note events. |
|
Counter[str]: A dictionary of error counters. |
|
""" |
|
merged_note_events = [] |
|
prev_last_activity = None |
|
seg_merge_err_cnt = Counter() |
|
for nes, tie_nes, last_activity, start_time in zipped_note_events_and_ties: |
|
if prev_last_activity is not None and force_note_off_missing_tie: |
|
|
|
prog_pitch_tie = set([(ne.program, ne.pitch) for ne in tie_nes]) |
|
for prog_pitch_pla in prev_last_activity: |
|
if prog_pitch_pla not in prog_pitch_tie: |
|
|
|
|
|
merged_note_events.append( |
|
NoteEvent(is_drum=False, |
|
program=prog_pitch_pla[0], |
|
time=start_time, |
|
velocity=0, |
|
pitch=prog_pitch_pla[1])) |
|
seg_merge_err_cnt['Err/merging segment tie'] += 1 |
|
else: |
|
pass |
|
merged_note_events += nes |
|
prev_last_activity = last_activity |
|
|
|
|
|
notes, err_cnt = note_event2note(merged_note_events, tie_note_events=None, fix_offset=fix_offset) |
|
|
|
|
|
err_cnt.update(seg_merge_err_cnt) |
|
return notes, err_cnt |
|
|