"""Custom inference-endpoint handler for the ``mazalaai/TTS_Mongolian`` model.

On every request the handler clones the model repository, copies its content
next to the handler (``/repository``), runs the repository's
``voice_processing.tts`` function and returns the synthesized audio as a
base64 ``data:`` URI.
"""
import base64
import os
import shutil
import subprocess
from typing import List, Dict, Any

import librosa
import numpy as np
from environs import Env
from pydantic import BaseModel
from scipy.io import wavfile


class EndpointHandler:
    """Callable request handler: ``handler(payload) -> response dict``."""

    # Remote repository holding the TTS code and weights.
    REPO_URL = "https://huggingface.co/mazalaai/TTS_Mongolian.git"
    # Directory (relative to the working directory) the repository is cloned into.
    REPO_DIR = "TTS_Mongolian"
    # Directory the repository content is copied into before it is imported.
    DEST_DIR = "/repository"
    # Directories renamed inside the clone before copying (old name, new name).
    _RENAMES = (("lib", "libb"), ("weights", "weights2"))
    # Keys that must be present in a request payload.
    _REQUIRED_KEYS = (
        "model_name",
        "tts_text",
        "selected_voice",
        "slang_rate",
        "use_uploaded_voice",
    )

    def __init__(self, model_dir=None):
        """Remember ``model_dir``; it is stored but not used by this class."""
        self.model_dir = model_dir

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request.

        Args:
            data: Either ``{"inputs": {...}}`` (Hugging Face style) or the
                bare payload dict described in :meth:`process_json_input`.

        Returns:
            ``{"info": ..., "audio_data_uri": ...}`` on success, or
            ``{"error": "<message>"}`` if anything fails. This method never
            raises for ordinary errors.
        """
        try:
            self._prepare_repository()
            if "inputs" in data:  # Hugging Face JSON envelope
                return self.process_hf_input(data)
            return self.process_json_input(data)
        except Exception as e:
            # Top-level boundary: report every failure to the client as JSON.
            return {"error": str(e)}
        finally:
            self._cleanup()

    def _prepare_repository(self):
        """Clone the model repository and copy its content into ``DEST_DIR``."""
        # An argument list (no shell) plus check=True turns a failed clone
        # into an exception instead of a confusing follow-up error.
        subprocess.run(
            ["git", "clone", self.REPO_URL, self.REPO_DIR],
            check=True,
        )

        for old_name, new_name in self._RENAMES:
            old_path = os.path.join(self.REPO_DIR, old_name)
            if os.path.exists(old_path):
                os.rename(old_path, os.path.join(self.REPO_DIR, new_name))

        for item in os.listdir(self.REPO_DIR):
            if item == ".git":
                # VCS metadata is not needed at runtime, and its read-only
                # object files cannot be overwritten on a repeated copy.
                continue
            item_path = os.path.join(self.REPO_DIR, item)
            if os.path.isfile(item_path):
                shutil.copy(item_path, self.DEST_DIR)
            elif os.path.isdir(item_path):
                # dirs_exist_ok: the directories copied by a previous request
                # are still present, so a plain copytree would fail.
                shutil.copytree(
                    item_path,
                    os.path.join(self.DEST_DIR, item),
                    dirs_exist_ok=True,
                )

    def _cleanup(self):
        """Remove the clone and any ``TTS_Mongolian*`` entries in ``DEST_DIR``.

        Cleanup is best-effort: it runs in a ``finally`` clause, so an
        exception raised here would replace the response of the request.
        """
        shutil.rmtree(self.REPO_DIR, ignore_errors=True)
        if not os.path.isdir(self.DEST_DIR):
            return
        for item in os.listdir(self.DEST_DIR):
            if not item.startswith("TTS_Mongolian"):
                continue
            item_path = os.path.join(self.DEST_DIR, item)
            if os.path.isdir(item_path):
                shutil.rmtree(item_path, ignore_errors=True)
            elif os.path.isfile(item_path):
                try:
                    os.remove(item_path)
                except OSError:
                    # Deliberately ignored, see the docstring.
                    pass

    @staticmethod
    def _voice_processing():
        """Import and return the ``voice_processing`` module.

        The module is shipped by the cloned repository, so it can only be
        imported after :meth:`_prepare_repository` has run. Importing it here
        (instead of inside ``__call__``) makes its functions reachable from
        every method that needs them.
        """
        import voice_processing

        return voice_processing

    def process_json_input(self, json_data):
        """Synthesize speech for one payload and return it as a data URI.

        Args:
            json_data: Dict with the keys ``model_name``, ``tts_text``,
                ``selected_voice``, ``slang_rate`` and ``use_uploaded_voice``;
                ``voice_upload_file`` is optional.

        Returns:
            ``{"info": <message from tts>, "audio_data_uri": <data URI>}``.

        Raises:
            ValueError: If the payload is not a dict with all required keys,
                or ``selected_voice`` is not in ``voice_mapping``.
            RuntimeError: If ``tts`` returns no audio or the audio file
                cannot be read.
        """
        if not isinstance(json_data, dict) or not all(
            key in json_data for key in self._REQUIRED_KEYS
        ):
            raise ValueError("Invalid JSON structure.")

        vp = self._voice_processing()
        selected_voice = json_data["selected_voice"]
        edge_tts_voice = vp.voice_mapping.get(selected_voice)
        if not edge_tts_voice:
            raise ValueError(f"Invalid voice '{selected_voice}'.")

        info, _edge_tts_output_path, tts_output_data, edge_output_file = vp.tts(
            json_data["model_name"],
            json_data["tts_text"],
            edge_tts_voice,
            json_data["slang_rate"],
            json_data["use_uploaded_voice"],
            json_data.get("voice_upload_file"),
        )

        # The intermediate file reported by tts() is not part of the response.
        if edge_output_file and os.path.exists(edge_output_file):
            os.remove(edge_output_file)

        # Surface the message from tts() instead of failing with a cryptic
        # unpacking/TypeError when no audio came back.
        if tts_output_data is None:
            raise RuntimeError(f"TTS produced no audio: {info}")
        # NOTE(review): the first element is presumably the sample rate of
        # the audio; it is ignored and save_audio_data_to_file() falls back
        # to its default rate - confirm against voice_processing.tts.
        _, audio_output = tts_output_data
        if audio_output is None:
            raise RuntimeError(f"TTS produced no audio: {info}")

        # tts() yields either raw samples (written to a temporary WAV file
        # here) or the path of an existing audio file.
        if isinstance(audio_output, np.ndarray):
            audio_file_path = self.save_audio_data_to_file(audio_output)
        else:
            audio_file_path = audio_output

        try:
            with open(audio_file_path, "rb") as file:
                audio_bytes = file.read()
        except OSError as e:
            raise RuntimeError(f"Failed to read audio file: {e}") from e
        finally:
            if os.path.exists(audio_file_path):
                os.remove(audio_file_path)

        encoded = base64.b64encode(audio_bytes).decode("utf-8")
        return {"info": info, "audio_data_uri": f"data:audio/wav;base64,{encoded}"}

    def process_hf_input(self, hf_data):
        """Unwrap a ``{"inputs": {...}}`` envelope and process its payload.

        Returns an error dict (rather than raising) if ``"inputs"`` is missing.
        """
        if "inputs" not in hf_data:
            return {"error": "Invalid Hugging Face JSON structure."}
        return self.process_json_input(hf_data["inputs"])

    def save_audio_data_to_file(self, audio_data, sample_rate=40000):
        """Write ``audio_data`` to a new WAV file and return the file's path.

        The file name comes from ``voice_processing.get_unique_filename``.
        """
        file_path = self._voice_processing().get_unique_filename('wav')
        wavfile.write(file_path, sample_rate, audio_data)
        return file_path