import os
import traceback

import gradio as gr
import numpy as np
import faiss
from pytube import YouTube
from pydub import AudioSegment
from sklearn.cluster import MiniBatchKMeans


def calculate_audio_duration(file_path):
    """Return the duration of an audio file in seconds."""
    duration_seconds = len(AudioSegment.from_file(file_path)) / 1000.0
    return duration_seconds


def youtube_to_wav(url, dataset_folder):
    """Download the audio track of a YouTube video and convert it to 44.1 kHz WAV."""
    try:
        downloaded_path = YouTube(url).streams.get_audio_only().download(output_path=dataset_folder)
        mp4_path = os.path.join(dataset_folder, 'audio.mp4')
        wav_path = os.path.join(dataset_folder, 'audio.wav')
        os.rename(downloaded_path, mp4_path)
        os.system(f'ffmpeg -i "{mp4_path}" -acodec pcm_s16le -ar 44100 "{wav_path}"')
        os.remove(mp4_path)
        return f'Audio downloaded and converted to WAV: {wav_path}'
    except Exception as e:
        return f"Error: {e}"


def create_training_files(model_name, dataset_folder, youtube_link):
    """Optionally fetch audio from YouTube, then preprocess the dataset."""
    if youtube_link:
        youtube_to_wav(youtube_link, dataset_folder)
    if not os.path.isdir(dataset_folder) or not os.listdir(dataset_folder):
        return "Your dataset folder is empty."
    os.makedirs(f'./logs/{model_name}', exist_ok=True)
    os.system(
        f'python infer/modules/train/preprocess.py {dataset_folder} 32000 2 '
        f'./logs/{model_name} False 3.0 > /dev/null 2>&1'
    )
    with open(f'./logs/{model_name}/preprocess.log', 'r') as f:
        if 'end preprocess' in f.read():
            return "Preprocessing Success"
        else:
            return "Error preprocessing data... Make sure your dataset folder is correct."


def extract_features(model_name, f0method):
    """Extract F0 (pitch) and speech features for the preprocessed dataset."""
    if f0method == "rmvpe_gpu":
        os.system(f'python infer/modules/train/extract/extract_f0_rmvpe.py 1 0 0 ./logs/{model_name} True')
    else:
        os.system(f'python infer/modules/train/extract/extract_f0_print.py ./logs/{model_name} 2 {f0method}')
    os.system(f'python infer/modules/train/extract_feature_print.py cuda:0 1 0 ./logs/{model_name} v2 True')
    with open(f'./logs/{model_name}/extract_f0_feature.log', 'r') as f:
        if 'all-feature-done' in f.read():
            return "Feature Extraction Success"
        else:
            return "Error in feature extraction... Make sure your data was preprocessed."


def train_index(exp_dir1, version19):
    """Build a FAISS IVF index from the extracted features."""
    exp_dir = f"logs/{exp_dir1}"
    os.makedirs(exp_dir, exist_ok=True)
    feature_dir = f"{exp_dir}/3_feature256" if version19 == "v1" else f"{exp_dir}/3_feature768"
    if not os.path.exists(feature_dir):
        return "Please perform feature extraction first!"
    listdir_res = list(os.listdir(feature_dir))
    if len(listdir_res) == 0:
        return "Please perform feature extraction first!"

    infos = []
    npys = []
    for name in sorted(listdir_res):
        phone = np.load(f"{feature_dir}/{name}")
        npys.append(phone)
    big_npy = np.concatenate(npys, 0)
    big_npy_idx = np.arange(big_npy.shape[0])
    np.random.shuffle(big_npy_idx)
    big_npy = big_npy[big_npy_idx]

    # With more than 200k feature vectors, compress them to 10k k-means centers first.
    if big_npy.shape[0] > 2e5:
        infos.append(f"Trying k-means with {big_npy.shape[0]} to 10k centers.")
        try:
            big_npy = MiniBatchKMeans(
                n_clusters=10000,
                verbose=True,
                batch_size=256,
                compute_labels=False,
                init="random",
            ).fit(big_npy).cluster_centers_
        except Exception:
            info = traceback.format_exc()
            infos.append(info)
            return "\n".join(infos)

    np.save(f"{exp_dir}/total_fea.npy", big_npy)
    n_ivf = min(int(16 * np.sqrt(big_npy.shape[0])), big_npy.shape[0] // 39)
    infos.append(f"{big_npy.shape},{n_ivf}")
    index = faiss.index_factory(256 if version19 == "v1" else 768, f"IVF{n_ivf},Flat")
    infos.append("Training index")
    index_ivf = faiss.extract_index_ivf(index)
    index_ivf.nprobe = 1
    index.train(big_npy)
    faiss.write_index(
        index,
        f"{exp_dir}/trained_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index",
    )
    infos.append("Adding to index")
    batch_size_add = 8192
    for i in range(0, big_npy.shape[0], batch_size_add):
        index.add(big_npy[i: i + batch_size_add])
    faiss.write_index(
        index,
        f"{exp_dir}/added_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index",
    )
    infos.append(
        f"Successfully built index: added_IVF{n_ivf}_Flat_nprobe_{index_ivf.nprobe}_{exp_dir1}_{version19}.index"
    )
    return "\n".join(infos)


with gr.Blocks() as demo:
    with gr.Tab("CREATE TRAINING FILES - This will process the data, extract the features and create your index file for you!"):
        with gr.Row():
            model_name = gr.Textbox(label="Model Name", value="My-Voice")
            dataset_folder = gr.Textbox(label="Dataset Folder", value="/content/dataset")
            youtube_link = gr.Textbox(label="YouTube Link (optional)")
        with gr.Row():
            start_button = gr.Button("Create Training Files")
            f0method = gr.Dropdown(["pm", "harvest", "rmvpe", "rmvpe_gpu"], label="F0 Method", value="rmvpe_gpu")
            extract_button = gr.Button("Extract Features")
            train_button = gr.Button("Train Index")
        output = gr.Textbox(label="Output")
        start_button.click(create_training_files, inputs=[model_name, dataset_folder, youtube_link], outputs=output)
        extract_button.click(extract_features, inputs=[model_name, f0method], outputs=output)
        # Gradio event inputs must be components, so pin the model version to "v2"
        # in a wrapper instead of passing the literal string as an input.
        train_button.click(lambda name: train_index(name, "v2"), inputs=[model_name], outputs=output)

demo.launch()  # beta state ......
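
# --- Illustrative usage sketch (comments only, not executed) ---
# A minimal sketch of driving the same pipeline headlessly with the functions
# above, assuming the RVC repo layout they shell out to
# (infer/modules/train/...) is present; the model name and dataset path here
# are just the UI defaults:
#
#   print(create_training_files("My-Voice", "/content/dataset", ""))
#   print(extract_features("My-Voice", "rmvpe_gpu"))
#   print(train_index("My-Voice", "v2"))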