import subprocess import os import sys import errno import shutil import yt_dlp from mega import Mega import datetime import unicodedata import torch import glob import gradio as gr import gdown import zipfile import traceback import json import requests import wget import ffmpeg import hashlib now_dir = os.getcwd() sys.path.append(now_dir) from unidecode import unidecode import re import time from lib.infer_pack.models_onnx import SynthesizerTrnMsNSFsidM from vc_infer_pipeline import VC from lib.infer_pack.models import ( SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono, SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono, ) from MDXNet import MDXNetDereverb from config import Config from infer_uvr5 import _audio_pre_, _audio_pre_new from huggingface_hub import HfApi, list_models from huggingface_hub import login from i18n import I18nAuto i18n = I18nAuto() from bs4 import BeautifulSoup from sklearn.cluster import MiniBatchKMeans config = Config() tmp = os.path.join(now_dir, "TEMP") shutil.rmtree(tmp, ignore_errors=True) os.environ["TEMP"] = tmp weight_root = "weights" weight_uvr5_root = "uvr5_weights" index_root = "./logs/" audio_root = "audios" names = [] for name in os.listdir(weight_root): if name.endswith(".pth"): names.append(name) index_paths = [] global indexes_list indexes_list = [] audio_paths = [] for root, dirs, files in os.walk(index_root, topdown=False): for name in files: if name.endswith(".index") and "trained" not in name: index_paths.append("%s\\%s" % (root, name)) for root, dirs, files in os.walk(audio_root, topdown=False): for name in files: audio_paths.append("%s/%s" % (root, name)) uvr5_names = [] for name in os.listdir(weight_uvr5_root): if name.endswith(".pth") or "onnx" in name: uvr5_names.append(name.replace(".pth", "")) def calculate_md5(file_path): hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def silentremove(filename): try: os.remove(filename) except OSError as e: # this would be "except OSError, e:" before Python 2.6 if e.errno != errno.ENOENT: # errno.ENOENT = no such file or directory raise # re-raise exception if a different error occurred def get_md5(temp_folder): for root, subfolders, files in os.walk(temp_folder): for file in files: if not file.startswith("G_") and not file.startswith("D_") and file.endswith(".pth") and not "_G_" in file and not "_D_" in file: md5_hash = calculate_md5(os.path.join(root, file)) return md5_hash return None def find_parent(search_dir, file_name): for dirpath, dirnames, filenames in os.walk(search_dir): if file_name in filenames: return os.path.abspath(dirpath) return None def find_folder_parent(search_dir, folder_name): for dirpath, dirnames, filenames in os.walk(search_dir): if folder_name in dirnames: return os.path.abspath(dirpath) return None def get_drive_folder_id(url): if "drive.google.com" in url: if "file/d/" in url: file_id = url.split("file/d/")[1].split("/")[0] elif "id=" in url: file_id = url.split("id=")[1].split("&")[0] else: return None def download_from_url(url): parent_path = find_folder_parent(".", "pretrained_v2") zips_path = os.path.join(parent_path, 'zips') if url != '': print(i18n("下载文件:") + f"{url}") if "drive.google.com" in url: if "file/d/" in url: file_id = url.split("file/d/")[1].split("/")[0] elif "id=" in url: file_id = url.split("id=")[1].split("&")[0] else: return None if file_id: os.chdir('./zips') result = subprocess.run(["gdown", f"https://drive.google.com/uc?id={file_id}", "--fuzzy"], capture_output=True, text=True, encoding='utf-8') if "Too many users have viewed or downloaded this file recently" in str(result.stderr): return "demasiado uso" if "Cannot retrieve the public link of the file." in str(result.stderr): return "link privado" print(result.stderr) elif "/blob/" in url: os.chdir('./zips') url = url.replace("blob", "resolve") # print("Resolved URL:", url) # Print the resolved URL wget.download(url) elif "mega.nz" in url: if "#!" in url: file_id = url.split("#!")[1].split("!")[0] elif "file/" in url: file_id = url.split("file/")[1].split("/")[0] else: return None if file_id: m = Mega() m.download_url(url, zips_path) elif "/tree/main" in url: response = requests.get(url) soup = BeautifulSoup(response.content, 'html.parser') temp_url = '' for link in soup.find_all('a', href=True): if link['href'].endswith('.zip'): temp_url = link['href'] break if temp_url: url = temp_url # print("Updated URL:", url) # Print the updated URL url = url.replace("blob", "resolve") # print("Resolved URL:", url) # Print the resolved URL if "huggingface.co" not in url: url = "https://huggingface.co" + url wget.download(url) else: print("No .zip file found on the page.") # Handle the case when no .zip file is found else: os.chdir('./zips') wget.download(url) os.chdir(parent_path) print(i18n("完整下载")) return "downloaded" else: return None class error_message(Exception): def __init__(self, mensaje): self.mensaje = mensaje super().__init__(mensaje) # 一个选项卡全局只能有一个音色 def get_vc(sid, to_return_protect0, to_return_protect1): global n_spk, tgt_sr, net_g, vc, cpt, version if sid == "" or sid == []: global hubert_model if hubert_model is not None: # 考虑到轮询, 需要加个判断看是否 sid 是由有模型切换到无模型的 print("clean_empty_cache") del net_g, n_spk, vc, hubert_model, tgt_sr # ,cpt hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None if torch.cuda.is_available(): torch.cuda.empty_cache() ###楼下不这么折腾清理不干净 if_f0 = cpt.get("f0", 1) version = cpt.get("version", "v1") if version == "v1": if if_f0 == 1: net_g = SynthesizerTrnMs256NSFsid( *cpt["config"], is_half=config.is_half ) else: net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) elif version == "v2": if if_f0 == 1: net_g = SynthesizerTrnMs768NSFsid( *cpt["config"], is_half=config.is_half ) else: net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) del net_g, cpt if torch.cuda.is_available(): torch.cuda.empty_cache() cpt = None return ( {"visible": False, "__type__": "update"}, {"visible": False, "__type__": "update"}, {"visible": False, "__type__": "update"}, ) person = "%s/%s" % (weight_root, sid) print("loading %s" % person) cpt = torch.load(person, map_location="cpu") tgt_sr = cpt["config"][-1] cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] # n_spk if_f0 = cpt.get("f0", 1) if if_f0 == 0: to_return_protect0 = to_return_protect1 = { "visible": False, "value": 0.5, "__type__": "update", } else: to_return_protect0 = { "visible": True, "value": to_return_protect0, "__type__": "update", } to_return_protect1 = { "visible": True, "value": to_return_protect1, "__type__": "update", } version = cpt.get("version", "v1") if version == "v1": if if_f0 == 1: net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) elif version == "v2": if if_f0 == 1: net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) else: net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) del net_g.enc_q print(net_g.load_state_dict(cpt["weight"], strict=False)) net_g.eval().to(config.device) if config.is_half: net_g = net_g.half() else: net_g = net_g.float() vc = VC(tgt_sr, config) n_spk = cpt["config"][-3] return ( {"visible": True, "maximum": n_spk, "__type__": "update"}, to_return_protect0, to_return_protect1, ) def load_downloaded_model(url): parent_path = find_folder_parent(".", "pretrained_v2") try: infos = [] logs_folders = ['0_gt_wavs','1_16k_wavs','2a_f0','2b-f0nsf','3_feature256','3_feature768'] zips_path = os.path.join(parent_path, 'zips') unzips_path = os.path.join(parent_path, 'unzips') weights_path = os.path.join(parent_path, 'weights') logs_dir = "" if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(unzips_path): shutil.rmtree(unzips_path) os.mkdir(zips_path) os.mkdir(unzips_path) download_file = download_from_url(url) if not download_file: print(i18n("无法下载模型。")) infos.append(i18n("无法下载模型。")) yield "\n".join(infos) elif download_file == "downloaded": print(i18n("模型下载成功。")) infos.append(i18n("模型下载成功。")) yield "\n".join(infos) elif download_file == "demasiado uso": raise Exception(i18n("最近查看或下载此文件的用户过多")) elif download_file == "link privado": raise Exception(i18n("无法从该私人链接获取文件")) # Descomprimir archivos descargados for filename in os.listdir(zips_path): if filename.endswith(".zip"): zipfile_path = os.path.join(zips_path,filename) print(i18n("继续提取...")) infos.append(i18n("继续提取...")) shutil.unpack_archive(zipfile_path, unzips_path, 'zip') model_name = os.path.basename(zipfile_path) logs_dir = os.path.join(parent_path,'logs', os.path.normpath(str(model_name).replace(".zip",""))) yield "\n".join(infos) else: print(i18n("解压缩出错。")) infos.append(i18n("解压缩出错。")) yield "\n".join(infos) index_file = False model_file = False D_file = False G_file = False # Copiar archivo pth for path, subdirs, files in os.walk(unzips_path): for item in files: item_path = os.path.join(path, item) if not 'G_' in item and not 'D_' in item and item.endswith('.pth'): model_file = True model_name = item.replace(".pth","") logs_dir = os.path.join(parent_path,'logs', model_name) if os.path.exists(logs_dir): shutil.rmtree(logs_dir) os.mkdir(logs_dir) if not os.path.exists(weights_path): os.mkdir(weights_path) if os.path.exists(os.path.join(weights_path, item)): os.remove(os.path.join(weights_path, item)) if os.path.exists(item_path): shutil.move(item_path, weights_path) if not model_file and not os.path.exists(logs_dir): os.mkdir(logs_dir) # Copiar index for path, subdirs, files in os.walk(unzips_path): for item in files: item_path = os.path.join(path, item) if item.startswith('added_') and item.endswith('.index'): index_file = True if os.path.exists(item_path): if os.path.exists(os.path.join(logs_dir, item)): os.remove(os.path.join(logs_dir, item)) shutil.move(item_path, logs_dir) if item.startswith('total_fea.npy') or item.startswith('events.'): if os.path.exists(item_path): if os.path.exists(os.path.join(logs_dir, item)): os.remove(os.path.join(logs_dir, item)) shutil.move(item_path, logs_dir) result = "" if model_file: if index_file: print(i18n("该模型可用于推理,并有 .index 文件。")) infos.append("\n" + i18n("该模型可用于推理,并有 .index 文件。")) yield "\n".join(infos) else: print(i18n("该模型可用于推理,但没有 .index 文件。")) infos.append("\n" + i18n("该模型可用于推理,但没有 .index 文件。")) yield "\n".join(infos) if not index_file and not model_file: print(i18n("未找到可上传的相关文件")) infos.append(i18n("未找到可上传的相关文件")) yield "\n".join(infos) if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(unzips_path): shutil.rmtree(unzips_path) os.chdir(parent_path) return result except Exception as e: os.chdir(parent_path) if "demasiado uso" in str(e): print(i18n("最近查看或下载此文件的用户过多")) yield i18n("最近查看或下载此文件的用户过多") elif "link privado" in str(e): print(i18n("无法从该私人链接获取文件")) yield i18n("无法从该私人链接获取文件") else: print(e) yield i18n("下载模型时发生错误。") finally: os.chdir(parent_path) def load_dowloaded_dataset(url): parent_path = find_folder_parent(".", "pretrained_v2") infos = [] try: zips_path = os.path.join(parent_path, 'zips') unzips_path = os.path.join(parent_path, 'unzips') datasets_path = os.path.join(parent_path, 'datasets') audio_extenions =["flac","wav"] if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(unzips_path): shutil.rmtree(unzips_path) if not os.path.exists(datasets_path): os.mkdir(datasets_path) os.mkdir(zips_path) os.mkdir(unzips_path) download_file = download_from_url(url) if not download_file: print(i18n("下载模型时发生错误。")) infos.append(i18n("下载模型时发生错误。")) yield "\n".join(infos) raise Exception(i18n("下载模型时发生错误。")) elif download_file == "downloaded": print(i18n("模型下载成功。")) infos.append(i18n("模型下载成功。")) yield "\n".join(infos) elif download_file == "demasiado uso": raise Exception(i18n("最近查看或下载此文件的用户过多")) elif download_file == "link privado": raise Exception(i18n("无法从该私人链接获取文件")) zip_path = os.listdir(zips_path) foldername = "" for file in zip_path: if file.endswith('.zip'): file_path = os.path.join(zips_path, file) print("....") foldername = file.replace(".zip","").replace(" ","").replace("-","_") dataset_path = os.path.join(datasets_path, foldername) print(i18n("继续提取...")) infos.append(i18n("继续提取...")) yield "\n".join(infos) shutil.unpack_archive(file_path, unzips_path, 'zip') if os.path.exists(dataset_path): shutil.rmtree(dataset_path) os.mkdir(dataset_path) for root, subfolders, songs in os.walk(unzips_path): for song in songs: song_path = os.path.join(root, song) if song.endswith(tuple(audio_extenions)): shutil.move(song_path, dataset_path) else: print(i18n("解压缩出错。")) infos.append(i18n("解压缩出错。")) yield "\n".join(infos) if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(unzips_path): shutil.rmtree(unzips_path) print(i18n("数据集加载成功。")) infos.append(i18n("数据集加载成功。")) yield "\n".join(infos) except Exception as e: os.chdir(parent_path) if "demasiado uso" in str(e): print(i18n("最近查看或下载此文件的用户过多")) yield i18n("最近查看或下载此文件的用户过多") elif "link privado" in str(e): print(i18n("无法从该私人链接获取文件")) yield i18n("无法从该私人链接获取文件") else: print(e) yield i18n("下载模型时发生错误。") finally: os.chdir(parent_path) def save_model(modelname, save_action): parent_path = find_folder_parent(".", "pretrained_v2") zips_path = os.path.join(parent_path, 'zips') dst = os.path.join(zips_path,modelname) logs_path = os.path.join(parent_path, 'logs', modelname) weights_path = os.path.join(parent_path, 'weights', f"{modelname}.pth") save_folder = parent_path infos = [] try: if not os.path.exists(logs_path): raise Exception("No model found.") if not 'content' in parent_path: save_folder = os.path.join(parent_path, 'RVC_Backup') else: save_folder = '/content/drive/MyDrive/RVC_Backup' infos.append(i18n("保存模型...")) yield "\n".join(infos) # Si no existe el folder RVC para guardar los modelos if not os.path.exists(save_folder): os.mkdir(save_folder) if not os.path.exists(os.path.join(save_folder, 'ManualTrainingBackup')): os.mkdir(os.path.join(save_folder, 'ManualTrainingBackup')) if not os.path.exists(os.path.join(save_folder, 'Finished')): os.mkdir(os.path.join(save_folder, 'Finished')) # Si ya existe el folders zips borro su contenido por si acaso if os.path.exists(zips_path): shutil.rmtree(zips_path) os.mkdir(zips_path) added_file = glob.glob(os.path.join(logs_path, "added_*.index")) d_file = glob.glob(os.path.join(logs_path, "D_*.pth")) g_file = glob.glob(os.path.join(logs_path, "G_*.pth")) if save_action == i18n("选择模型保存方法"): raise Exception("No method choosen.") if save_action == i18n("保存所有"): print(i18n("保存所有")) save_folder = os.path.join(save_folder, 'ManualTrainingBackup') shutil.copytree(logs_path, dst) else: # Si no existe el folder donde se va a comprimir el modelo if not os.path.exists(dst): os.mkdir(dst) if save_action == i18n("保存 D 和 G"): print(i18n("保存 D 和 G")) save_folder = os.path.join(save_folder, 'ManualTrainingBackup') if len(d_file) > 0: shutil.copy(d_file[0], dst) if len(g_file) > 0: shutil.copy(g_file[0], dst) if len(added_file) > 0: shutil.copy(added_file[0], dst) else: infos.append(i18n("保存时未编制索引...")) if save_action == i18n("保存声音"): print(i18n("保存声音")) save_folder = os.path.join(save_folder, 'Finished') if len(added_file) > 0: shutil.copy(added_file[0], dst) else: infos.append(i18n("保存时未编制索引...")) #raise gr.Error("¡No ha generado el archivo added_*.index!") yield "\n".join(infos) # Si no existe el archivo del modelo no copiarlo if not os.path.exists(weights_path): infos.append(i18n("无模型保存(PTH)")) #raise gr.Error("¡No ha generado el modelo pequeño!") else: shutil.copy(weights_path, dst) yield "\n".join(infos) infos.append("\n" + i18n("这可能需要几分钟时间,请稍候...")) yield "\n".join(infos) shutil.make_archive(os.path.join(zips_path,f"{modelname}"), 'zip', zips_path) shutil.move(os.path.join(zips_path,f"{modelname}.zip"), os.path.join(save_folder, f'{modelname}.zip')) shutil.rmtree(zips_path) #shutil.rmtree(zips_path) infos.append("\n" + i18n("正确存储模型")) yield "\n".join(infos) except Exception as e: print(e) if "No model found." in str(e): infos.append(i18n("您要保存的模型不存在,请确保输入的名称正确。")) else: infos.append(i18n("保存模型时发生错误")) yield "\n".join(infos) def load_downloaded_backup(url): parent_path = find_folder_parent(".", "pretrained_v2") try: infos = [] logs_folders = ['0_gt_wavs','1_16k_wavs','2a_f0','2b-f0nsf','3_feature256','3_feature768'] zips_path = os.path.join(parent_path, 'zips') unzips_path = os.path.join(parent_path, 'unzips') weights_path = os.path.join(parent_path, 'weights') logs_dir = os.path.join(parent_path, 'logs') if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(unzips_path): shutil.rmtree(unzips_path) os.mkdir(zips_path) os.mkdir(unzips_path) download_file = download_from_url(url) if not download_file: print(i18n("无法下载模型。")) infos.append(i18n("无法下载模型。")) yield "\n".join(infos) elif download_file == "downloaded": print(i18n("模型下载成功。")) infos.append(i18n("模型下载成功。")) yield "\n".join(infos) elif download_file == "demasiado uso": raise Exception(i18n("最近查看或下载此文件的用户过多")) elif download_file == "link privado": raise Exception(i18n("无法从该私人链接获取文件")) # Descomprimir archivos descargados for filename in os.listdir(zips_path): if filename.endswith(".zip"): zipfile_path = os.path.join(zips_path,filename) zip_dir_name = os.path.splitext(filename)[0] unzip_dir = unzips_path print(i18n("继续提取...")) infos.append(i18n("继续提取...")) shutil.unpack_archive(zipfile_path, unzip_dir, 'zip') if os.path.exists(os.path.join(unzip_dir, zip_dir_name)): # Move the inner directory with the same name shutil.move(os.path.join(unzip_dir, zip_dir_name), logs_dir) else: # Create a folder with the same name and move files new_folder_path = os.path.join(logs_dir, zip_dir_name) os.mkdir(new_folder_path) for item_name in os.listdir(unzip_dir): item_path = os.path.join(unzip_dir, item_name) if os.path.isfile(item_path): shutil.move(item_path, new_folder_path) elif os.path.isdir(item_path): shutil.move(item_path, new_folder_path) yield "\n".join(infos) else: print(i18n("解压缩出错。")) infos.append(i18n("解压缩出错。")) yield "\n".join(infos) result = "" for filename in os.listdir(unzips_path): if filename.endswith(".zip"): silentremove(filename) if os.path.exists(zips_path): shutil.rmtree(zips_path) if os.path.exists(os.path.join(parent_path, 'unzips')): shutil.rmtree(os.path.join(parent_path, 'unzips')) print(i18n("备份已成功上传。")) infos.append("\n" + i18n("备份已成功上传。")) yield "\n".join(infos) os.chdir(parent_path) return result except Exception as e: os.chdir(parent_path) if "demasiado uso" in str(e): print(i18n("最近查看或下载此文件的用户过多")) yield i18n("最近查看或下载此文件的用户过多") elif "link privado" in str(e): print(i18n("无法从该私人链接获取文件")) yield i18n("无法从该私人链接获取文件") else: print(e) yield i18n("下载模型时发生错误。") finally: os.chdir(parent_path) def save_to_wav(record_button): if record_button is None: pass else: path_to_file=record_button new_name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")+'.wav' new_path='./audios/'+new_name shutil.move(path_to_file,new_path) return new_name def save_to_wav2(dropbox): file_path = dropbox.name target_path = os.path.join('./audios', os.path.basename(file_path)) if os.path.exists(target_path): os.remove(target_path) # print('Replacing old dropdown file...') shutil.move(file_path, target_path) return target_path def change_choices2(): audio_paths=[] for filename in os.listdir("./audios"): if filename.endswith(('wav', 'mp3', 'flac', 'ogg', 'opus', 'm4a', 'mp4', 'aac', 'alac', 'wma', 'aiff', 'webm', 'ac3')): audio_paths.append(os.path.join('./audios',filename).replace('\\', '/')) return {"choices": sorted(audio_paths), "__type__": "update"}, {"__type__": "update"} def get_models_by_name(modelname): url = "https://script.google.com/macros/s/AKfycbzyrdLZzUww9qbjxnbnI08budD4yxbmRPHkWbp3UEJ9h3Id5cnNNVg0UtfFAnqqX5Rr/exec" response = requests.post(url, json={ 'type': 'search_by_filename', 'filename': unidecode(modelname.strip().lower()) }) response_json = response.json() models = response_json['ocurrences'] result = [] message = "Busqueda realizada" if len(models) == 0: message = "No se han encontrado resultados." else: message = f"Se han encontrado {len(models)} resultados para {modelname}" for i in range(20): if i < len(models): urls = models[i].get('url') url = eval(urls)[0] name = str(models[i].get('name')) filename = str(models[i].get('filename')) if not name or name.strip() == "" else name # Nombre result.append( { "visible": True, "value": str("### ") + filename, "__type__": "update", }) # Url result.append( { "visible": False, "value": url, "__type__": "update", }) # Boton result.append({ "visible": True, "__type__": "update", }) # Linea separadora if i == len(models) - 1: result.append({ "visible": False, "__type__": "update", }) else: result.append({ "visible": True, "__type__": "update", }) # Row result.append( { "visible": True, "__type__": "update", }) else: # Nombre result.append( { "visible": False, "__type__": "update", }) # Url result.append( { "visible": False, "value": False, "__type__": "update", }) # Boton result.append({ "visible": False, "__type__": "update", }) # Linea result.append({ "visible": False, "__type__": "update", }) # Row result.append( { "visible": False, "__type__": "update", }) # Result result.append( { "value": message, "__type__": "update", } ) return result def search_model(): gr.Markdown(value="# Buscar un modelo") with gr.Row(): model_name = gr.inputs.Textbox(lines=1, label="Término de búsqueda") search_model_button=gr.Button("Buscar modelo") models = [] results = gr.Textbox(label="Resultado", value="", max_lines=20) with gr.Row(visible=False) as row1: l1 = gr.Markdown(value="", visible=False) l1_url = gr.Textbox("Label 1", visible=False) b1 = gr.Button("Cargar modelo", visible=False) mk1 = gr.Markdown(value="---", visible=False) b1.click(fn=load_downloaded_model, inputs=l1_url, outputs=results) with gr.Row(visible=False) as row2: l2 = gr.Markdown(value="", visible=False) l2_url = gr.Textbox("Label 1", visible=False) b2 = gr.Button("Cargar modelo", visible=False) mk2 = gr.Markdown(value="---", visible=False) b2.click(fn=load_downloaded_model, inputs=l2_url, outputs=results) with gr.Row(visible=False) as row3: l3 = gr.Markdown(value="", visible=False) l3_url = gr.Textbox("Label 1", visible=False) b3 = gr.Button("Cargar modelo", visible=False) mk3 = gr.Markdown(value="---", visible=False) b3.click(fn=load_downloaded_model, inputs=l3_url, outputs=results) with gr.Row(visible=False) as row4: l4 = gr.Markdown(value="", visible=False) l4_url = gr.Textbox("Label 1", visible=False) b4 = gr.Button("Cargar modelo", visible=False) mk4 = gr.Markdown(value="---", visible=False) b4.click(fn=load_downloaded_model, inputs=l4_url, outputs=results) with gr.Row(visible=False) as row5: l5 = gr.Markdown(value="", visible=False) l5_url = gr.Textbox("Label 1", visible=False) b5 = gr.Button("Cargar modelo", visible=False) mk5 = gr.Markdown(value="---", visible=False) b5.click(fn=load_downloaded_model, inputs=l5_url, outputs=results) with gr.Row(visible=False) as row6: l6 = gr.Markdown(value="", visible=False) l6_url = gr.Textbox("Label 1", visible=False) b6 = gr.Button("Cargar modelo", visible=False) mk6 = gr.Markdown(value="---", visible=False) b6.click(fn=load_downloaded_model, inputs=l6_url, outputs=results) with gr.Row(visible=False) as row7: l7 = gr.Markdown(value="", visible=False) l7_url = gr.Textbox("Label 1", visible=False) b7 = gr.Button("Cargar modelo", visible=False) mk7 = gr.Markdown(value="---", visible=False) b7.click(fn=load_downloaded_model, inputs=l7_url, outputs=results) with gr.Row(visible=False) as row8: l8 = gr.Markdown(value="", visible=False) l8_url = gr.Textbox("Label 1", visible=False) b8 = gr.Button("Cargar modelo", visible=False) mk8 = gr.Markdown(value="---", visible=False) b8.click(fn=load_downloaded_model, inputs=l8_url, outputs=results) with gr.Row(visible=False) as row9: l9 = gr.Markdown(value="", visible=False) l9_url = gr.Textbox("Label 1", visible=False) b9 = gr.Button("Cargar modelo", visible=False) mk9 = gr.Markdown(value="---", visible=False) b9.click(fn=load_downloaded_model, inputs=l9_url, outputs=results) with gr.Row(visible=False) as row10: l10 = gr.Markdown(value="", visible=False) l10_url = gr.Textbox("Label 1", visible=False) b10 = gr.Button("Cargar modelo", visible=False) mk10 = gr.Markdown(value="---", visible=False) b10.click(fn=load_downloaded_model, inputs=l10_url, outputs=results) with gr.Row(visible=False) as row11: l11 = gr.Markdown(value="", visible=False) l11_url = gr.Textbox("Label 1", visible=False) b11 = gr.Button("Cargar modelo", visible=False) mk11 = gr.Markdown(value="---", visible=False) b11.click(fn=load_downloaded_model, inputs=l11_url, outputs=results) with gr.Row(visible=False) as row12: l12 = gr.Markdown(value="", visible=False) l12_url = gr.Textbox("Label 1", visible=False) b12 = gr.Button("Cargar modelo", visible=False) mk12 = gr.Markdown(value="---", visible=False) b12.click(fn=load_downloaded_model, inputs=l12_url, outputs=results) with gr.Row(visible=False) as row13: l13 = gr.Markdown(value="", visible=False) l13_url = gr.Textbox("Label 1", visible=False) b13 = gr.Button("Cargar modelo", visible=False) mk13 = gr.Markdown(value="---", visible=False) b13.click(fn=load_downloaded_model, inputs=l13_url, outputs=results) with gr.Row(visible=False) as row14: l14 = gr.Markdown(value="", visible=False) l14_url = gr.Textbox("Label 1", visible=False) b14 = gr.Button("Cargar modelo", visible=False) mk14 = gr.Markdown(value="---", visible=False) b14.click(fn=load_downloaded_model, inputs=l14_url, outputs=results) with gr.Row(visible=False) as row15: l15 = gr.Markdown(value="", visible=False) l15_url = gr.Textbox("Label 1", visible=False) b15 = gr.Button("Cargar modelo", visible=False) mk15 = gr.Markdown(value="---", visible=False) b15.click(fn=load_downloaded_model, inputs=l15_url, outputs=results) with gr.Row(visible=False) as row16: l16 = gr.Markdown(value="", visible=False) l16_url = gr.Textbox("Label 1", visible=False) b16 = gr.Button("Cargar modelo", visible=False) mk16 = gr.Markdown(value="---", visible=False) b16.click(fn=load_downloaded_model, inputs=l16_url, outputs=results) with gr.Row(visible=False) as row17: l17 = gr.Markdown(value="", visible=False) l17_url = gr.Textbox("Label 1", visible=False) b17 = gr.Button("Cargar modelo", visible=False) mk17 = gr.Markdown(value="---", visible=False) b17.click(fn=load_downloaded_model, inputs=l17_url, outputs=results) with gr.Row(visible=False) as row18: l18 = gr.Markdown(value="", visible=False) l18_url = gr.Textbox("Label 1", visible=False) b18 = gr.Button("Cargar modelo", visible=False) mk18 = gr.Markdown(value="---", visible=False) b18.click(fn=load_downloaded_model, inputs=l18_url, outputs=results) with gr.Row(visible=False) as row19: l19 = gr.Markdown(value="", visible=False) l19_url = gr.Textbox("Label 1", visible=False) b19 = gr.Button("Cargar modelo", visible=False) mk19 = gr.Markdown(value="---", visible=False) b19.click(fn=load_downloaded_model, inputs=l19_url, outputs=results) with gr.Row(visible=False) as row20: l20 = gr.Markdown(value="", visible=False) l20_url = gr.Textbox("Label 1", visible=False) b20 = gr.Button("Cargar modelo", visible=False) mk20 = gr.Markdown(value="---", visible=False) b20.click(fn=load_downloaded_model, inputs=l20_url, outputs=results) # to_return_protect1 = search_model_button.click(fn=get_models_by_name, inputs=model_name, outputs=[l1,l1_url, b1, mk1, row1, l2,l2_url, b2, mk2, row2, l3,l3_url, b3, mk3, row3, l4,l4_url, b4, mk4, row4, l5,l5_url, b5, mk5, row5, l6,l6_url, b6, mk6, row6, l7,l7_url, b7, mk7, row7, l8,l8_url, b8, mk8, row8, l9,l9_url, b9, mk9, row9, l10,l10_url, b10, mk10, row10, l11,l11_url, b11, mk11, row11, l12,l12_url, b12, mk12, row12, l13,l13_url, b13, mk13, row13, l14,l14_url, b14, mk14, row14, l15,l15_url, b15, mk15, row15, l16,l16_url, b16, mk16, row16, l17,l17_url, b17, mk17, row17, l18,l18_url, b18, mk18, row18, l19,l19_url, b19, mk19, row19, l20,l20_url, b20, mk20, row20, results ]) def descargar_desde_drive(url, name, output_file): print(f"Descargando {name} de drive") try: downloaded_file = gdown.download(url, output=output_file, fuzzy=True) return downloaded_file except: print("El intento de descargar con drive no funcionó") return None def descargar_desde_mega(url, name): response = False try: file_id = None if "#!" in url: file_id = url.split("#!")[1].split("!")[0] elif "file/" in url: file_id = url.split("file/")[1].split("/")[0] else: file_id = None if file_id: mega = Mega() m = mega.login() print(f"Descargando {name} de mega") downloaded_file = m.download_url(url) return downloaded_file else: return None except Exception as e: print("Error**") print(e) return None def descargar_desde_url_basica(url, name, output_file): try: print(f"Descargando {name} de URL BASICA") filename = wget.download(url=url, out=output_file) return filename except Exception as e: print(f"Error al descargar el archivo: {str(e)}") def is_valid_model(name): parent_path = find_folder_parent(".", "pretrained_v2") unzips_path = os.path.join(parent_path, 'unzips') response = [] file_path = os.path.join(unzips_path, name) has_model = False has_index = False for root, subfolders, files in os.walk(file_path): for file in files: current_file_path = os.path.join(root, file) if not file.startswith("G_") and not file.startswith("D_") and file.endswith(".pth") and not "_G_" in file and not "_D_" in file: has_model = True if file.startswith('added_') and file.endswith('.index'): has_index = True #if has_model and has_index: if has_index: response.append(".index") if has_model: response.append(".pth") return response def create_zip(new_name): parent_path = find_folder_parent(".", "pretrained_v2") temp_folder_path = os.path.join(parent_path, 'temp_models') unzips_path = os.path.join(parent_path, 'unzips') zips_path = os.path.join(parent_path, 'zips') file_path = os.path.join(unzips_path, new_name) file_name = os.path.join(temp_folder_path, new_name) if not os.path.exists(zips_path): os.mkdir(zips_path) if os.path.exists(file_name): shutil.rmtree(file_name) os.mkdir(file_name) while not os.path.exists(file_name): time.sleep(1) for root, subfolders, files in os.walk(file_path): for file in files: current_file_path = os.path.join(root, file) if not file.startswith("G_") and not file.startswith("D_") and file.endswith(".pth") and not "_G_" in file and not "_D_" in file: print(f'Copiando {current_file_path} a {os.path.join(temp_folder_path, new_name)}') shutil.copy(current_file_path, file_name) if file.startswith('added_') and file.endswith('.index'): print(f'Copiando {current_file_path} a {os.path.join(temp_folder_path, new_name)}') shutil.copy(current_file_path, file_name) print("Comprimiendo modelo") zip_path = os.path.join(zips_path, new_name) print(f"Comprimiendo {file_name} en {zip_path}") shutil.make_archive(zip_path, 'zip', file_name) def upload_to_huggingface(file_path, new_filename): api = HfApi() login(token="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") api.upload_file( path_or_fileobj=file_path, path_in_repo=new_filename, repo_id="juuxn/RVCModels", repo_type="model", ) return f"https://huggingface.co/juuxn/RVCModels/resolve/main/{new_filename}" def publish_model_clicked(model_name, model_url, model_version, model_creator): web_service_url = "https://script.google.com/macros/s/AKfycbzyrdLZzUww9qbjxnbnI08budD4yxbmRPHkWbp3UEJ9h3Id5cnNNVg0UtfFAnqqX5Rr/exec" name = unidecode(model_name) new_name = unidecode(name.strip().replace(" ","_").replace("'","")) downloaded_path = "" url = model_url version = model_version creator = model_creator parent_path = find_folder_parent(".", "pretrained_v2") output_folder = os.path.join(parent_path, 'archivos_descargados') output_file = os.path.join(output_folder, f'{new_name}.zip') unzips_path = os.path.join(parent_path, 'unzips') zips_path = os.path.join(parent_path, 'zips') temp_folder_path = os.path.join(parent_path, 'temp_models') if os.path.exists(output_folder): shutil.rmtree(output_folder) os.mkdir(output_folder) if os.path.exists(temp_folder_path): shutil.rmtree(temp_folder_path) os.mkdir(temp_folder_path) if url and 'drive.google.com' in url: # Descargar el elemento si la URL es de Google Drive downloaded_path = descargar_desde_drive(url, new_name, output_file) elif url and 'mega.nz' in url: downloaded_path = descargar_desde_mega(url, new_name, output_file) elif url and 'pixeldrain' in url: print("No se puede descargar de pixeldrain") else: downloaded_path = descargar_desde_url_basica(url, new_name, output_file) if not downloaded_path: print(f"No se pudo descargar: {name}") else: filename = name.strip().replace(" ","_") dst =f'{filename}.zip' shutil.unpack_archive(downloaded_path, os.path.join(unzips_path, filename)) md5_hash = get_md5(os.path.join(unzips_path, filename)) if not md5_hash: print("No tiene modelo pequeño") return md5_response_raw = requests.post(web_service_url, json={ 'type': 'check_md5', 'md5_hash': md5_hash }) md5_response = md5_response_raw.json() ok = md5_response["ok"] exists = md5_response["exists"] message = md5_response["message"] is_valid = is_valid_model(filename) if md5_hash and exists: print(f"El archivo ya se ha publicado en spreadsheet con md5: {md5_hash}") return f"El archivo ya se ha publicado con md5: {md5_hash}" if ".pth" in is_valid and not exists: create_zip(filename) huggingface_url = upload_to_huggingface(os.path.join(zips_path,dst), dst) response = requests.post(web_service_url, json={ 'type': 'save_model', 'elements': [{ 'name': name, 'filename': filename, 'url': [huggingface_url], 'version': version, 'creator': creator, 'md5_hash': md5_hash, 'content': is_valid }]}) response_data = response.json() ok = response_data["ok"] message = response_data["message"] print({ 'name': name, 'filename': filename, 'url': [huggingface_url], 'version': version, 'creator': creator, 'md5_hash': md5_hash, 'content': is_valid }) if ok: return f"El archivo se ha publicado con md5: {md5_hash}" else: print(message) return message # Eliminar folder donde se decarga el modelo zip if os.path.exists(output_folder): shutil.rmtree(output_folder) # Eliminar folder de zips, donde se descomprimio el modelo descargado if os.path.exists(unzips_path): shutil.rmtree(unzips_path) # Eliminar folder donde se copiaron los archivos indispensables del modelo if os.path.exists(temp_folder_path): shutil.rmtree(temp_folder_path) # Eliminar folder donde se comprimio el modelo para enviarse a huggingface if os.path.exists(zips_path): shutil.rmtree(zips_path) def uvr(input_url, output_path, model_name, inp_root, save_root_vocal, paths, save_root_ins, agg, format0): carpeta_a_eliminar = "yt_downloads" if os.path.exists(carpeta_a_eliminar) and os.path.isdir(carpeta_a_eliminar): # Eliminar todos los archivos en la carpeta for archivo in os.listdir(carpeta_a_eliminar): ruta_archivo = os.path.join(carpeta_a_eliminar, archivo) if os.path.isfile(ruta_archivo): os.remove(ruta_archivo) elif os.path.isdir(ruta_archivo): shutil.rmtree(ruta_archivo) # Eliminar subcarpetas recursivamente def format_title(title): # Eliminar caracteres no alfanuméricos excepto espacios y guiones bajos formatted_title = re.sub(r'[^\w\s-]', '', title) # Reemplazar espacios por guiones bajos formatted_title = formatted_title.replace(" ", "_") return formatted_title ydl_opts = { 'no-windows-filenames': True, 'restrict-filenames': True, 'extract_audio': True, 'format': 'bestaudio', } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(input_url, download=False) formatted_title = format_title(info_dict.get('title', 'default_title')) formatted_outtmpl = output_path + '/' + formatted_title + '.wav' ydl_opts['outtmpl'] = formatted_outtmpl ydl = yt_dlp.YoutubeDL(ydl_opts) ydl.download([input_url]) infos = [] pre_fun = None try: inp_root, save_root_vocal, save_root_ins = [x.strip(" ").strip('"').strip("\n").strip('"').strip(" ") if isinstance(x, str) else x for x in [inp_root, save_root_vocal, save_root_ins]] if model_name == "onnx_dereverb_By_FoxJoy": pre_fun = MDXNetDereverb(15) else: func = _audio_pre_ if "DeEcho" not in model_name else _audio_pre_new pre_fun = func( agg=10, model_path=os.path.join(weight_uvr5_root, model_name + ".pth"), device=config.device, is_half=config.is_half, ) if inp_root != "": paths = [os.path.join(inp_root, name) for name in os.listdir(inp_root)] else: paths = [path.name for path in paths] for path in paths: inp_path = os.path.join(inp_root, path) need_reformat = 1 done = 0 try: info = ffmpeg.probe(inp_path, cmd="ffprobe") if ( info["streams"][0]["channels"] == 2 and info["streams"][0]["sample_rate"] == "44100" ): need_reformat = 0 pre_fun._path_audio_( inp_path, save_root_ins, save_root_vocal, format0 ) done = 1 except: need_reformat = 1 traceback.print_exc() if need_reformat == 1: tmp_path = "%s/%s.reformatted.wav" % (tmp, os.path.basename(inp_path)) os.system( "ffmpeg -i %s -vn -acodec pcm_s16le -ac 2 -ar 44100 %s -y" % (inp_path, tmp_path) ) inp_path = tmp_path try: if done == 0: pre_fun._path_audio_( inp_path, save_root_ins, save_root_vocal, format0 ) infos.append("%s->Success" % (os.path.basename(inp_path))) yield "\n".join(infos) except: infos.append( "%s->%s" % (os.path.basename(inp_path), traceback.format_exc()) ) yield "\n".join(infos) except: infos.append(traceback.format_exc()) yield "\n".join(infos) finally: try: if pre_fun is not None: # Verificar si pre_fun existe antes de eliminarlo if model_name == "onnx_dereverb_By_FoxJoy": del pre_fun.pred.model del pre_fun.pred.model_ else: del pre_fun.model del pre_fun except: traceback.print_exc() print("clean_empty_cache") if torch.cuda.is_available(): torch.cuda.empty_cache() yield "\n".join(infos) def publish_models(): with gr.Column(): gr.Markdown("# Publicar un modelo en la comunidad") gr.Markdown("El modelo se va a verificar antes de publicarse. Importante que contenga el archivo **.pth** del modelo para que no sea rechazado.") model_name = gr.inputs.Textbox(lines=1, label="Nombre descriptivo del modelo Ej: (Ben 10 [Latino] - RVC V2 - 250 Epoch)") url = gr.inputs.Textbox(lines=1, label="Enlace del modelo") moder_version = gr.Radio( label="Versión", choices=["RVC v1", "RVC v2"], value="RVC v1", interactive=True, ) model_creator = gr.inputs.Textbox(lines=1, label="ID de discord del creador del modelo Ej: <@123455656>") publish_model_button=gr.Button("Publicar modelo") results = gr.Textbox(label="Resultado", value="", max_lines=20) publish_model_button.click(fn=publish_model_clicked, inputs=[model_name, url, moder_version, model_creator], outputs=results) def download_model(): gr.Markdown(value="# " + i18n("下载模型")) gr.Markdown(value=i18n("它用于下载您的推理模型。")) with gr.Row(): model_url=gr.Textbox(label=i18n("网址")) with gr.Row(): download_model_status_bar=gr.Textbox(label=i18n("地位")) with gr.Row(): download_button=gr.Button(i18n("下载")) download_button.click(fn=load_downloaded_model, inputs=[model_url], outputs=[download_model_status_bar]) def download_backup(): gr.Markdown(value="# " + i18n("下载备份")) gr.Markdown(value=i18n("它用于下载您的训练备份。")) with gr.Row(): model_url=gr.Textbox(label=i18n("网址")) with gr.Row(): download_model_status_bar=gr.Textbox(label=i18n("地位")) with gr.Row(): download_button=gr.Button(i18n("下载")) download_button.click(fn=load_downloaded_backup, inputs=[model_url], outputs=[download_model_status_bar]) def update_dataset_list(name): new_datasets = [] for foldername in os.listdir("./datasets"): if "." not in foldername: new_datasets.append(os.path.join(find_folder_parent(".","pretrained"),"datasets",foldername)) return gr.Dropdown.update(choices=new_datasets) def download_dataset(trainset_dir4): gr.Markdown(value="# " + i18n("下载数据集")) gr.Markdown(value=i18n("下载兼容格式(.wav/.flac)的音频数据集以训练模型。")) with gr.Row(): dataset_url=gr.Textbox(label=i18n("网址")) with gr.Row(): load_dataset_status_bar=gr.Textbox(label=i18n("地位")) with gr.Row(): load_dataset_button=gr.Button(i18n("下载")) load_dataset_button.click(fn=load_dowloaded_dataset, inputs=[dataset_url], outputs=[load_dataset_status_bar]) load_dataset_status_bar.change(update_dataset_list, dataset_url, trainset_dir4) def youtube_separator(): gr.Markdown(value="# " + i18n("单独的 YouTube 曲目")) gr.Markdown(value=i18n("下载 YouTube 视频的音频并自动分离声音和伴奏轨道")) with gr.Row(): input_url = gr.inputs.Textbox(label=i18n("粘贴 YouTube 链接")) output_path = gr.Textbox( label=i18n("输入待处理音频文件夹路径(去文件管理器地址栏拷就行了)"), value=os.path.abspath(os.getcwd()).replace('\\', '/') + "/yt_downloads", visible=False, ) save_root_ins = gr.Textbox( label=i18n("输入待处理音频文件夹路径"), value=((os.getcwd()).replace('\\', '/') + "/yt_downloads"), visible=False, ) model_choose = gr.Textbox( value=os.path.abspath(os.getcwd()).replace('\\', '/') + "/uvr5_weights/HP5_only_main_vocal", visible=False, ) save_root_vocal = gr.Textbox( label=i18n("指定输出主人声文件夹"), value="audios", visible=False, ) opt_ins_root = gr.Textbox( label=i18n("指定输出非主人声文件夹"), value="opt", visible=False, ) format0 = gr.Radio( label=i18n("导出文件格式"), choices=["wav", "flac", "mp3", "m4a"], value="wav", interactive=True, visible=False, ) with gr.Row(): vc_output4 = gr.Textbox(label=i18n("地位")) with gr.Row(): but2 = gr.Button(i18n("下载并分离")) but2.click( uvr, [ input_url, output_path, model_choose, save_root_ins, save_root_vocal, opt_ins_root, format0, ], [vc_output4], )