import os
import traceback

import faiss
import gradio as gr
import numpy as np
from pydub import AudioSegment
from pytube import YouTube
from sklearn.cluster import MiniBatchKMeans


def calculate_audio_duration(file_path):
    """Return the duration of an audio file in seconds."""
    return len(AudioSegment.from_file(file_path)) / 1000.0


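# Download a YouTube video's audio track with pytube, then convert the
# resulting MP4 to 16-bit PCM WAV at 44.1 kHz. Assumes ffmpeg is on PATH.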
def youtube_to_wav(url, dataset_folder):
    try:
        # download() returns the path of the file it wrote.
        downloaded_path = YouTube(url).streams.get_audio_only().download(output_path=dataset_folder)
        mp4_path = os.path.join(dataset_folder, 'audio.mp4')
        wav_path = os.path.join(dataset_folder, 'audio.wav')
        os.rename(downloaded_path, mp4_path)
        # Quote the paths so folders with spaces don't break the command.
        os.system(f'ffmpeg -y -i "{mp4_path}" -acodec pcm_s16le -ar 44100 "{wav_path}"')
        os.remove(mp4_path)
        return f'Audio downloaded and converted to WAV: {wav_path}'
    except Exception as e:
        return f"Error: {e}"


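# Step 1: slice and resample the dataset with RVC's preprocessing script,
# then check its log for the success marker.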
def create_training_files(model_name, dataset_folder, youtube_link):
    if youtube_link:
        youtube_to_wav(youtube_link, dataset_folder)

    # Guard against a missing folder as well; os.listdir would raise on one.
    if not os.path.isdir(dataset_folder) or not os.listdir(dataset_folder):
        return "Your dataset folder is empty."

    os.makedirs(f'./logs/{model_name}', exist_ok=True)

    # Arguments: dataset path, target sample rate, process count,
    # experiment dir, no-parallel flag, slice length in seconds.
    os.system(f'python infer/modules/train/preprocess.py {dataset_folder} 32000 2 ./logs/{model_name} False 3.0 > /dev/null 2>&1')

    with open(f'./logs/{model_name}/preprocess.log', 'r') as f:
        if 'end preprocess' in f.read():
            return "Preprocessing Success"
        else:
            return "Error preprocessing data... Make sure your dataset folder is correct."


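# Step 2: extract pitch (f0) and HuBERT features. "rmvpe_gpu" runs the
# GPU-accelerated RMVPE extractor; the other methods use the CPU script.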
def extract_features(model_name, f0method):
    if f0method == "rmvpe_gpu":
        os.system(f'python infer/modules/train/extract/extract_f0_rmvpe.py 1 0 0 ./logs/{model_name} True')
    else:
        os.system(f'python infer/modules/train/extract/extract_f0_print.py ./logs/{model_name} 2 {f0method}')
    os.system(f'python infer/modules/train/extract_feature_print.py cuda:0 1 0 ./logs/{model_name} v2 True')

    with open(f'./logs/{model_name}/extract_f0_feature.log', 'r') as f:
        if 'all-feature-done' in f.read():
            return "Feature Extraction Success"
        else:
            return "Error in feature extraction... Make sure your data was preprocessed."


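# Step 3: build a FAISS IVF index over the extracted features so inference
# can retrieve nearest-neighbor features quickly. Feature dimensionality is
# 256 for v1 models and 768 for v2.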
def train_index(exp_dir1, version19):
    exp_dir = f"logs/{exp_dir1}"
    os.makedirs(exp_dir, exist_ok=True)
    feature_dir = f"{exp_dir}/3_feature256" if version19 == "v1" else f"{exp_dir}/3_feature768"
    if not os.path.exists(feature_dir):
        return "Please perform feature extraction first!"

    listdir_res = list(os.listdir(feature_dir))
    if len(listdir_res) == 0:
        return "Please perform feature extraction first!"

    infos = []
    npys = []
    for name in sorted(listdir_res):
        phone = np.load(f"{feature_dir}/{name}")
        npys.append(phone)
    big_npy = np.concatenate(npys, 0)
    # Shuffle rows so k-means and index training see an unbiased sample.
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]
    if big_npy.shape[0] > 2e5:
        infos.append(f"Trying k-means to reduce {big_npy.shape[0]} vectors to 10k centers.")
        try:
            big_npy = MiniBatchKMeans(
                n_clusters=10000,
                verbose=True,
                batch_size=256,
                compute_labels=False,
                init="random",
            ).fit(big_npy).cluster_centers_
        except Exception:
            info = traceback.format_exc()
            infos.append(info)
            return "\n".join(infos)

    np.save(f"{exp_dir}/total_fea.npy", big_npy)
    # IVF cell count heuristic: about 16*sqrt(n), capped so each cell keeps
    # at least ~39 training vectors (FAISS warns below that).
    n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    infos.append(f"{big_npy.shape},{n_ivf}")

    index = faiss.index_factory(256 if version19 == "v1" else 768, f"IVF{n_ivf},Flat")
    infos.append("Training index")
    index_ivf = faiss.extract_index_ivf(index)
    index_ivf.nprobe = 1
    index.train(big_npy)
    faiss.write_index(index, f"{exp_dir}/trained_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index")

    infos.append("Adding to index")
    # Add in batches to keep memory use bounded.
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i: i + batch_size_add])
    faiss.write_index(index, f"{exp_dir}/added_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index")

    infos.append(f"Successfully built index: added_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index")
    return "\n".join(infos)


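# Minimal Gradio UI: one tab that walks through the three steps above.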
with gr.Blocks() as demo:
    with gr.Tab("CREATE TRAINING FILES - This will process the data, extract the features and create your index file for you!"):
        with gr.Row():
            model_name = gr.Textbox(label="Model Name", value="My-Voice")
            dataset_folder = gr.Textbox(label="Dataset Folder", value="/content/dataset")
            youtube_link = gr.Textbox(label="YouTube Link (optional)")
        with gr.Row():
            start_button = gr.Button("Create Training Files")
            f0method = gr.Dropdown(["pm", "harvest", "rmvpe", "rmvpe_gpu"], label="F0 Method", value="rmvpe_gpu")
            extract_button = gr.Button("Extract Features")
            train_button = gr.Button("Train Index")

        output = gr.Textbox(label="Output")
        # Gradio event inputs must be components, not literals, so hold the
        # model version in a State component instead of passing "v2" directly.
        version = gr.State("v2")

        start_button.click(create_training_files, inputs=[model_name, dataset_folder, youtube_link], outputs=output)
        extract_button.click(extract_features, inputs=[model_name, f0method], outputs=output)
        train_button.click(train_index, inputs=[model_name, version], outputs=output)

demo.launch()