Spaces:
No application file
No application file
File size: 3,263 Bytes
b5dba8a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
import os
import shutil
import zipfile
import numpy
import torchaudio
from hubert.pre_kmeans_hubert import CustomHubert
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def prepare(path):
"""
Put all the training data in one folder
:param path: The path to the training data, with 2 subdirectories with zips, "semantic" and "wav", with equal pairs in both directories
"""
path = os.path.abspath(path)
raw_data_paths = {
'semantic': os.path.join(path, 'semantic'),
'wav': os.path.join(path, 'wav')
}
prepared_path = os.path.join(path, 'prepared')
if not os.path.isdir(prepared_path):
os.mkdir(prepared_path)
offset = 0
for zip_file in os.listdir(raw_data_paths['semantic']):
print(f'Extracting {os.path.basename(zip_file)}')
offset = extract_files({
'semantic': os.path.join(raw_data_paths['semantic'], zip_file),
'wav': os.path.join(raw_data_paths['wav'], zip_file)
}, prepared_path, offset)
def extract_files(zip_files: dict[str, str], out: str, start_offset: int = 0) -> int:
new_offset = start_offset
with zipfile.ZipFile(zip_files['semantic'], 'r') as semantic_zip:
with zipfile.ZipFile(zip_files['wav'], 'r') as wav_zip:
for file in semantic_zip.infolist():
for file2 in wav_zip.infolist():
if ''.join(file.filename.split('.')[:-1]).lower() == ''.join(file2.filename.split('.')[:-1]):
semantic_zip.extract(file, out)
shutil.move(os.path.join(out, file.filename), os.path.join(out, f'{new_offset}_semantic.npy'))
wav_zip.extract(file2, out)
shutil.move(os.path.join(out, file2.filename), os.path.join(out, f'{new_offset}_wav.wav'))
new_offset += 1
wav_zip.close()
semantic_zip.close()
return new_offset
def prepare2(path, model):
prepared = os.path.join(path, 'prepared')
ready = os.path.join(path, 'ready')
hubert_model = CustomHubert(checkpoint_path=model, device=device)
if not os.path.isdir(ready):
os.mkdir(ready)
wav_string = '_wav.wav'
sem_string = '_semantic.npy'
for input_file in os.listdir(prepared):
input_path = os.path.join(prepared, input_file)
if input_file.endswith(wav_string):
file_num = int(input_file[:-len(wav_string)])
fname = f'{file_num}_semantic_features.npy'
print('Processing', input_file)
if os.path.isfile(fname):
continue
wav, sr = torchaudio.load(input_path)
wav = wav.to(device)
if wav.shape[0] == 2: # Stereo to mono if needed
wav = wav.mean(0, keepdim=True)
output = hubert_model.forward(wav, input_sample_hz=sr)
out_array = output.cpu().numpy()
numpy.save(os.path.join(ready, fname), out_array)
elif input_file.endswith(sem_string):
fname = os.path.join(ready, input_file)
if os.path.isfile(fname):
continue
shutil.copy(input_path, fname)
print('All set! We\'re ready to train!')
|