"""Text-to-speech endpoint handler.

On import this module downloads the model files and helper sources it needs
(skipping anything already present), then exposes ``EndpointHandler``, which
runs ``voice_processing.tts`` and returns the audio as a base64 data URI.
"""

# from pydantic import BaseModel
from environs import Env
from typing import List, Dict, Any
from collections.abc import Mapping
import os
import base64
import numpy as np
import librosa
from scipy.io import wavfile
import asyncio
import shutil
import zipfile
import requests

# (connect, read) timeouts in seconds; the read timeout applies per chunk,
# not to the whole transfer, so large model files are still fine.
_DOWNLOAD_TIMEOUT = (10, 60)
_DOWNLOAD_CHUNK_SIZE = 1 << 20  # 1 MiB


def _download_file(url, destination):
    """Stream ``url`` into ``destination``, raising on HTTP errors.

    The payload is written to ``<destination>.part`` first and renamed only
    after the transfer completes, so an interrupted download never leaves a
    truncated file that a later run would mistake for a finished one.

    Raises:
        requests.RequestException: on connection problems, timeouts, or a
            non-success HTTP status.
    """
    partial_path = f"{destination}.part"
    try:
        with requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT) as response:
            # Without this an HTML error page would be saved as the model file.
            response.raise_for_status()
            with open(partial_path, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                    out_file.write(chunk)
        os.replace(partial_path, destination)
    finally:
        if os.path.exists(partial_path):
            os.remove(partial_path)


def _extract_zip_without_root(zip_path, extract_to):
    """Extract ``zip_path`` into ``extract_to``, dropping the first path component.

    ``top/a/b.txt`` in the archive becomes ``<extract_to>/a/b.txt``. A file
    stored at the archive root (no leading folder) is placed directly in
    ``extract_to`` under its own name.

    Raises:
        ValueError: if an entry would be written outside ``extract_to``.
    """
    root = os.path.realpath(extract_to)
    with zipfile.ZipFile(zip_path, "r") as archive:
        for info in archive.infolist():
            parts = [part for part in info.filename.split("/") if part]
            if len(parts) > 1:
                parts = parts[1:]
            elif info.is_dir():
                # The top-level folder itself maps onto the extraction root.
                parts = []

            target = os.path.realpath(os.path.join(root, *parts))
            # Guard against "zip slip": entries using ".." to escape the root.
            if os.path.commonpath([root, target]) != root:
                raise ValueError(
                    f"Unsafe path in archive {zip_path!r}: {info.filename!r}"
                )

            if info.is_dir():
                os.makedirs(target, exist_ok=True)
                continue

            os.makedirs(os.path.dirname(target), exist_ok=True)
            # Stream the member instead of loading it fully into memory.
            with archive.open(info) as source, open(target, "wb") as out_file:
                shutil.copyfileobj(source, out_file)


def download_and_extract_files():
    """Download the required files into the working directory.

    Plain files are skipped when they already exist. Zip archives are
    extracted into a folder named after the archive (minus ``.zip``) with the
    archive's first path component stripped, then deleted. Because the archive
    is deleted, a sibling ``<archive>.extracted`` marker file records a
    completed extraction so later runs do not download it again.
    """
    files_to_download = [
        ("config.py", "https://www.dropbox.com/scl/fi/ls7vmjk75uou8ayfn6kj4/config.py?rlkey=4qluzl5l07zq1j9mkl9n6j66u&st=0yit9dzx&dl=1"),
        ("hubert_base.pt", "https://www.dropbox.com/scl/fi/g7oohuwfzlnrbd8zic6gj/hubert_base.pt?rlkey=ddeyqex1morsm54azyakmd62e&st=rsrvf964&dl=1"),
        ("lib.zip", "https://www.dropbox.com/scl/fi/ia6p6cf5xvcbi78dmkbbz/lib.zip?rlkey=k3chc1nlaswsqdo7slqco56wi&st=19n9syfd&dl=1"),
        ("rmvpe.pt", "https://www.dropbox.com/scl/fi/7pl7u6fvydwgtz19n8nzx/rmvpe.pt?rlkey=tnbxmarogivbw3qy34hgy7r7q&st=um8d4230&dl=1"),
        ("rmvpe.py", "https://www.dropbox.com/scl/fi/i2shk4otwyg4ns8yod5h1/rmvpe.py?rlkey=l7313htdh1ihylb6bx91el0lv&st=xhkfog8j&dl=1"),
        ("vc_infer_pipeline.py", "https://www.dropbox.com/scl/fi/bvz7s2wf2y67twpg583lg/vc_infer_pipeline.py?rlkey=q4w7oww5e7e2qdfh3herofk4o&st=4sck87ny&dl=1"),
        ("voice_processing.py", "https://www.dropbox.com/scl/fi/emrmjsuz0mod4r2x9e43f/voice_processing.py?rlkey=6baomwowns9y3yq1pl6syer0t&st=d9u51gba&dl=1"),
        ("weights.zip", "https://www.dropbox.com/scl/fi/tr5a04wlow5go8cv3d6qp/weights.zip?rlkey=qvpwax97nn5a4iv79g76lcbz2&st=5ueb2gva&dl=1"),
    ]

    for file_name, url in files_to_download:
        is_archive = file_name.endswith(".zip")
        extracted_marker = f"{file_name}.extracted"

        # The archive is removed after extraction, so its absence alone does
        # not mean work is needed; the marker records a finished extraction.
        if is_archive and os.path.exists(extracted_marker):
            continue

        if not os.path.exists(file_name):
            _download_file(url, file_name)

        if is_archive:
            extract_to = os.path.splitext(file_name)[0]
            _extract_zip_without_root(file_name, extract_to)
            # Record success before deleting, so a failed delete cannot
            # trigger another download.
            with open(extracted_marker, "w"):
                pass
            # Remove the zip file after extraction.
            os.remove(file_name)


# Must run before the import below: voice_processing.py is one of the
# downloaded files.
download_and_extract_files()

from voice_processing import tts, get_model_names, voice_mapping, get_unique_filename


class EndpointHandler:
    """Callable handler that converts a TTS request dict into a response dict."""

    REQUIRED_KEYS = (
        "model_name",
        "tts_text",
        "selected_voice",
        "slang_rate",
        "use_uploaded_voice",
    )

    def __init__(self, model_dir=None):
        self.model_dir = model_dir

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request.

        Accepts either the flat request dict or the same dict wrapped under
        an ``"inputs"`` key (Hugging Face JSON format). Never raises: any
        failure is reported as ``{"error": "<message>"}``.
        """
        try:
            if "inputs" in data:  # Hugging Face JSON format
                return self.process_hf_input(data)
            return self.process_json_input(data)
        except Exception as e:
            return {"error": str(e)}

    def process_json_input(self, json_data):
        """Run TTS for a flat request dict and return the encoded audio.

        Args:
            json_data: mapping holding every key in ``REQUIRED_KEYS`` and,
                optionally, ``"voice_upload_file"``.

        Returns:
            ``{"info": ..., "audio_data_uri": "data:audio/wav;base64,..."}``.

        Raises:
            ValueError: if the structure is invalid or the voice is unknown.
            RuntimeError: if ``tts`` returned no audio.
            Exception: if the produced audio file cannot be read.
        """
        if not isinstance(json_data, Mapping) or not all(
            key in json_data for key in self.REQUIRED_KEYS
        ):
            raise ValueError("Invalid JSON structure.")

        model_name = json_data["model_name"]
        tts_text = json_data["tts_text"]
        selected_voice = json_data["selected_voice"]
        slang_rate = json_data["slang_rate"]
        use_uploaded_voice = json_data["use_uploaded_voice"]
        voice_upload_file = json_data.get("voice_upload_file", None)

        edge_tts_voice = voice_mapping.get(selected_voice)
        if not edge_tts_voice:
            raise ValueError(f"Invalid voice '{selected_voice}'.")

        info, edge_tts_output_path, tts_output_data, edge_output_file = asyncio.run(
            tts(
                model_name,
                tts_text,
                edge_tts_voice,
                slang_rate,
                use_uploaded_voice,
                voice_upload_file,
            )
        )

        # The intermediate file is no longer needed once tts has returned.
        if edge_output_file and os.path.exists(edge_output_file):
            os.remove(edge_output_file)

        if tts_output_data is None:
            raise RuntimeError(f"TTS produced no audio: {info}")

        # NOTE(review): presumably (sample_rate, samples) - confirm against
        # voice_processing.tts. The first element is only used as the sample
        # rate when it is a positive integer; otherwise the default applies.
        sample_rate, audio_output = tts_output_data
        if audio_output is None:
            raise RuntimeError(f"TTS produced no audio: {info}")

        if isinstance(audio_output, np.ndarray):
            rate_is_usable = (
                isinstance(sample_rate, (int, np.integer))
                and not isinstance(sample_rate, bool)
                and sample_rate > 0
            )
            if rate_is_usable:
                audio_file_path = self.save_audio_data_to_file(
                    audio_output, int(sample_rate)
                )
            else:
                audio_file_path = self.save_audio_data_to_file(audio_output)
        else:
            # Otherwise tts is taken to have returned a path to an audio file.
            audio_file_path = audio_output

        try:
            with open(audio_file_path, "rb") as file:
                audio_bytes = file.read()
            encoded = base64.b64encode(audio_bytes).decode("utf-8")
            audio_data_uri = f"data:audio/wav;base64,{encoded}"
        except Exception as e:
            raise Exception(f"Failed to read audio file: {e}") from e
        finally:
            # Best-effort cleanup: a failed delete must not mask the result
            # (or the original error) of the read above.
            try:
                if os.path.exists(audio_file_path):
                    os.remove(audio_file_path)
            except (OSError, TypeError):
                pass

        return {"info": info, "audio_data_uri": audio_data_uri}

    def process_hf_input(self, hf_data):
        """Unwrap a Hugging Face style ``{"inputs": {...}}`` request and process it.

        Returns an error dict (rather than raising) when ``"inputs"`` is
        missing.
        """
        if "inputs" not in hf_data:
            return {"error": "Invalid Hugging Face JSON structure."}
        return self.process_json_input(hf_data["inputs"])

    def save_audio_data_to_file(self, audio_data, sample_rate=40000):
        """Write ``audio_data`` to a uniquely named WAV file and return its path."""
        file_path = get_unique_filename('wav')
        wavfile.write(file_path, sample_rate, audio_data)
        return file_path