File size: 4,808 Bytes
540a60c 4bf8454 540a60c 1ad8a92 540a60c b61107f 1ad8a92 b61107f 4bf8454 540a60c 4bf8454 540a60c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import base64
import os
import shutil
import subprocess
from typing import List, Dict, Any

import librosa
import numpy as np
from environs import Env
from pydantic import BaseModel
from scipy.io import wavfile
class EndpointHandler:
    """Callable request handler that runs text-to-speech through ``voice_processing``.

    Each call clones the TTS_Mongolian repository, copies its contents into
    ``DEST_DIR``, imports ``voice_processing``, serves the request and then
    removes the temporary clone again.

    NOTE(review): cloning on every request is expensive; if the repository
    content is static, doing this once per process would be cheaper. Left
    per-call to keep the existing behaviour.
    """

    REPO_URL = "https://huggingface.co/mazalaai/TTS_Mongolian.git"
    REPO_DIR = "TTS_Mongolian"
    DEST_DIR = "/repository"
    # Directories renamed inside the clone before it is copied; presumably to
    # avoid clashing with same-named directories already in DEST_DIR.
    RENAMED_DIRS = {"lib": "libb", "weights": "weights2"}
    # Keys every request payload must provide.
    REQUIRED_KEYS = (
        "model_name",
        "tts_text",
        "selected_voice",
        "slang_rate",
        "use_uploaded_voice",
    )

    # Cached ``voice_processing`` module. Kept as a class-level default so the
    # attribute exists even when ``__init__`` is bypassed.
    _voice_api = None

    def __init__(self, model_dir=None):
        """Store ``model_dir``; it is not used by any other method in this class."""
        self.model_dir = model_dir

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Serve one request.

        Args:
            data: Either ``{"inputs": {...}}`` or the bare payload dict
                accepted by ``process_json_input``.

        Returns:
            ``{"info": ..., "audio_data_uri": ...}`` on success, otherwise
            ``{"error": <message>}``. Failures are reported in the returned
            dict rather than raised.
        """
        try:
            self._install_repository()
            # Import before dispatching so a broken installation is reported
            # even when the payload itself is malformed.
            self._load_voice_processing()
            if "inputs" in data:
                return self.process_hf_input(data)
            return self.process_json_input(data)
        except Exception as e:
            # Top-level boundary: every failure becomes an error response.
            return {"error": str(e)}
        finally:
            self._cleanup()

    def _install_repository(self):
        """Clone the repository and copy its contents into ``DEST_DIR``."""
        # A clone left behind by an interrupted call would make git refuse to
        # clone into the same directory.
        if os.path.exists(self.REPO_DIR):
            shutil.rmtree(self.REPO_DIR)
        # Argument list (no shell) and check=True so a failed clone is
        # reported instead of being silently ignored.
        subprocess.run(["git", "clone", self.REPO_URL, self.REPO_DIR], check=True)

        for old_name, new_name in self.RENAMED_DIRS.items():
            old_path = os.path.join(self.REPO_DIR, old_name)
            if os.path.exists(old_path):
                os.rename(old_path, os.path.join(self.REPO_DIR, new_name))

        # Without this, shutil.copy would create a *file* named DEST_DIR when
        # the directory does not exist yet.
        os.makedirs(self.DEST_DIR, exist_ok=True)
        for item in os.listdir(self.REPO_DIR):
            if item == ".git":
                # Repository metadata is not needed to serve requests, and
                # git's read-only object files can make re-copying over an
                # earlier copy fail.
                continue
            item_path = os.path.join(self.REPO_DIR, item)
            if os.path.isfile(item_path):
                shutil.copy(item_path, self.DEST_DIR)
            elif os.path.isdir(item_path):
                # The copies persist between calls, so the target usually
                # exists from the second request on; merge instead of failing.
                shutil.copytree(
                    item_path,
                    os.path.join(self.DEST_DIR, item),
                    dirs_exist_ok=True,
                )

    def _load_voice_processing(self):
        """Import ``voice_processing`` on first use and cache the module."""
        if self._voice_api is None:
            # Imported lazily; presumably the module is provided by the files
            # copied in _install_repository() - TODO confirm DEST_DIR is on
            # sys.path in the deployment environment.
            import voice_processing

            self._voice_api = voice_processing
        return self._voice_api

    def _cleanup(self):
        """Remove the temporary clone and any ``DEST_DIR`` entry named after it."""
        if os.path.exists(self.REPO_DIR):
            shutil.rmtree(self.REPO_DIR)
        # Guarded so a missing DEST_DIR cannot raise from the caller's
        # ``finally`` and replace the response.
        if not os.path.isdir(self.DEST_DIR):
            return
        # NOTE(review): copied items keep their own names, so this only
        # removes entries whose name starts with the repository name.
        for item in os.listdir(self.DEST_DIR):
            if not item.startswith(self.REPO_DIR):
                continue
            item_path = os.path.join(self.DEST_DIR, item)
            if os.path.isfile(item_path):
                os.remove(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)

    def process_json_input(self, json_data):
        """Run TTS for one payload and return the audio as a base64 data URI.

        Args:
            json_data: Dict containing every key in ``REQUIRED_KEYS`` and
                optionally ``"voice_upload_file"``.

        Returns:
            ``{"info": ..., "audio_data_uri": "data:audio/wav;base64,..."}``.

        Raises:
            ValueError: If the payload is not a dict, lacks a required key,
                names an unknown voice, or the TTS call returned no audio.
            Exception: If the generated audio file cannot be read.
        """
        if not isinstance(json_data, dict) or any(
            key not in json_data for key in self.REQUIRED_KEYS
        ):
            raise ValueError("Invalid JSON structure.")

        voice_api = self._load_voice_processing()

        selected_voice = json_data["selected_voice"]
        edge_tts_voice = voice_api.voice_mapping.get(selected_voice)
        if not edge_tts_voice:
            raise ValueError(f"Invalid voice '{selected_voice}'.")

        info, _edge_tts_output_path, tts_output_data, edge_output_file = voice_api.tts(
            json_data["model_name"],
            json_data["tts_text"],
            edge_tts_voice,
            json_data["slang_rate"],
            json_data["use_uploaded_voice"],
            json_data.get("voice_upload_file"),
        )

        # The intermediate file is not part of the response; delete it.
        if edge_output_file and os.path.exists(edge_output_file):
            os.remove(edge_output_file)

        if tts_output_data is None:
            raise ValueError(f"TTS produced no audio: {info}")
        # NOTE(review): the first element is discarded; if it is the sample
        # rate it should probably be passed to save_audio_data_to_file - confirm.
        _, audio_output = tts_output_data
        if audio_output is None:
            raise ValueError(f"TTS produced no audio: {info}")

        # Raw samples are written to a WAV file first; anything else is
        # treated as a path to an existing audio file.
        if isinstance(audio_output, np.ndarray):
            audio_file_path = self.save_audio_data_to_file(audio_output)
        else:
            audio_file_path = audio_output

        try:
            with open(audio_file_path, "rb") as file:
                audio_bytes = file.read()
        except Exception as e:
            raise Exception(f"Failed to read audio file: {e}") from e
        finally:
            if os.path.exists(audio_file_path):
                os.remove(audio_file_path)

        encoded_audio = base64.b64encode(audio_bytes).decode("utf-8")
        return {"info": info, "audio_data_uri": f"data:audio/wav;base64,{encoded_audio}"}

    def process_hf_input(self, hf_data):
        """Unwrap an ``{"inputs": {...}}`` envelope and process the inner payload.

        Returns an error dict when the ``"inputs"`` key is missing; errors from
        the inner payload propagate as exceptions from ``process_json_input``.
        """
        if "inputs" in hf_data:
            actual_data = hf_data["inputs"]
            return self.process_json_input(actual_data)
        else:
            return {"error": "Invalid Hugging Face JSON structure."}

    def save_audio_data_to_file(self, audio_data, sample_rate=40000):
        """Write ``audio_data`` to a new WAV file and return the file's path.

        The file name comes from ``voice_processing.get_unique_filename``.
        """
        file_path = self._load_voice_processing().get_unique_filename('wav')
        wavfile.write(file_path, sample_rate, audio_data)
        return file_path