import os
import shutil
import zipfile

import numpy
import torch
import torchaudio

from hubert.pre_kmeans_hubert import CustomHubert

# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def prepare(path):
    """
    Put all the training data in one folder
    :param path: The path to the training data, with 2 subdirectories with zips, "semantic" and "wav", with equal pairs in both directories
    """
    path = os.path.abspath(path)
    raw_data_paths = {
        'semantic': os.path.join(path, 'semantic'),
        'wav': os.path.join(path, 'wav')
    }
    prepared_path = os.path.join(path, 'prepared')

    if not os.path.isdir(prepared_path):
        os.mkdir(prepared_path)

    offset = 0

    # Every zip in the "semantic" directory is expected to have a same-named counterpart in the "wav" directory.
    for zip_file in os.listdir(raw_data_paths['semantic']):
        print(f'Extracting {os.path.basename(zip_file)}')
        offset = extract_files({
            'semantic': os.path.join(raw_data_paths['semantic'], zip_file),
            'wav': os.path.join(raw_data_paths['wav'], zip_file)
        }, prepared_path, offset)


def extract_files(zip_files: dict[str, str], out: str, start_offset: int = 0) -> int:
    """Extract matching semantic/wav pairs from the two zip archives into `out`, numbering them from `start_offset`."""
    new_offset = start_offset
    with zipfile.ZipFile(zip_files['semantic'], 'r') as semantic_zip:
        with zipfile.ZipFile(zip_files['wav'], 'r') as wav_zip:
            for file in semantic_zip.infolist():
                for file2 in wav_zip.infolist():
                    # Pair entries whose names match, ignoring extension and case.
                    if ''.join(file.filename.split('.')[:-1]).lower() == ''.join(file2.filename.split('.')[:-1]).lower():
                        # Extract both files and rename them to a shared numeric prefix.
                        semantic_zip.extract(file, out)
                        shutil.move(os.path.join(out, file.filename), os.path.join(out, f'{new_offset}_semantic.npy'))
                        wav_zip.extract(file2, out)
                        shutil.move(os.path.join(out, file2.filename), os.path.join(out, f'{new_offset}_wav.wav'))
                        new_offset += 1

    return new_offset


def prepare2(path, model):
    """
    Run every prepared wav file through HuBERT and collect the results in a "ready" folder,
    copying the existing semantic .npy files alongside the extracted features.
    :param path: The same root path that was passed to prepare()
    :param model: Path to the HuBERT checkpoint to load
    """
    prepared = os.path.join(path, 'prepared')
    ready = os.path.join(path, 'ready')
    hubert_model = CustomHubert(checkpoint_path=model, device=device)
    if not os.path.isdir(ready):
        os.mkdir(ready)

    wav_string = '_wav.wav'
    sem_string = '_semantic.npy'

    for input_file in os.listdir(prepared):
        input_path = os.path.join(prepared, input_file)
        if input_file.endswith(wav_string):
            file_num = int(input_file[:-len(wav_string)])
            fname = f'{file_num}_semantic_features.npy'
            print('Processing', input_file)
            # Skip wav files whose features have already been written to the "ready" directory.
            if os.path.isfile(os.path.join(ready, fname)):
                continue
            wav, sr = torchaudio.load(input_path)
            wav = wav.to(device)

            if wav.shape[0] == 2:  # Stereo to mono if needed
                wav = wav.mean(0, keepdim=True)

            output = hubert_model.forward(wav, input_sample_hz=sr)
            out_array = output.cpu().numpy()
            numpy.save(os.path.join(ready, fname), out_array)
        elif input_file.endswith(sem_string):
            fname = os.path.join(ready, input_file)
            if os.path.isfile(fname):
                continue
            shutil.copy(input_path, fname)
    print('All set! We\'re ready to train!')
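

# Minimal usage sketch (not part of the original script): the argument names and the
# way the checkpoint path is passed below are assumptions; adjust them to your setup.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Prepare paired semantic/wav training data.')
    parser.add_argument('--path', required=True,
                        help='Root data directory containing the "semantic" and "wav" zip folders')
    parser.add_argument('--hubert', required=True,
                        help='Path to the HuBERT checkpoint used for feature extraction')
    args = parser.parse_args()

    prepare(args.path)                # unpack and pair the zipped files into "prepared"
    prepare2(args.path, args.hubert)  # extract HuBERT features into "ready"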