File size: 4,808 Bytes
540a60c
 
 
 
 
 
 
 
4bf8454
540a60c
 
 
 
 
 
 
 
 
 
 
1ad8a92
540a60c
b61107f
 
 
 
 
1ad8a92
 
 
 
 
b61107f
4bf8454
 
 
 
 
 
 
540a60c
 
 
 
 
 
 
 
 
 
 
 
 
 
4bf8454
 
 
 
 
 
 
 
 
 
 
540a60c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from pydantic import BaseModel
from environs import Env
from typing import List, Dict, Any
import os
import base64
import numpy as np
import librosa
from scipy.io import wavfile
import shutil

class EndpointHandler:
    def __init__(self, model_dir=None):
        self.model_dir = model_dir

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Clone the repository
            repo_url = "https://huggingface.co/mazalaai/TTS_Mongolian.git"
            os.system(f"git clone {repo_url}")

            # Rename the "lib" directory to "libb" and the "weights" directory to "weights2"
            repo_dir = "TTS_Mongolian"
            lib_dir = os.path.join(repo_dir, "lib")
            libb_dir = os.path.join(repo_dir, "libb")
            if os.path.exists(lib_dir):
                os.rename(lib_dir, libb_dir)

            weights_dir = os.path.join(repo_dir, "weights")
            weights2_dir = os.path.join(repo_dir, "weights2")
            if os.path.exists(weights_dir):
                os.rename(weights_dir, weights2_dir)

            # Copy all files from the cloned repository to the /repository directory
            dest_dir = "/repository"
            for item in os.listdir(repo_dir):
                item_path = os.path.join(repo_dir, item)
                if os.path.isfile(item_path):
                    shutil.copy(item_path, dest_dir)
                elif os.path.isdir(item_path):
                    shutil.copytree(item_path, os.path.join(dest_dir, item))

            # Import the voice_processing module and functions
            from voice_processing import tts, get_model_names, voice_mapping, get_unique_filename

            if "inputs" in data:
                # Check if data is in Hugging Face JSON format
                return self.process_hf_input(data)
            else:
                return self.process_json_input(data)

        except ValueError as e:
            return {"error": str(e)}
        except Exception as e:
            return {"error": str(e)}
        finally:
            # Clean up the cloned repository and copied files/directories
            if os.path.exists(repo_dir):
                shutil.rmtree(repo_dir)
            for item in os.listdir(dest_dir):
                if item.startswith("TTS_Mongolian"):
                    item_path = os.path.join(dest_dir, item)
                    if os.path.isfile(item_path):
                        os.remove(item_path)
                    elif os.path.isdir(item_path):
                        shutil.rmtree(item_path)

    def process_json_input(self, json_data):
        if all(key in json_data for key in ["model_name", "tts_text", "selected_voice", "slang_rate", "use_uploaded_voice"]):
            model_name = json_data["model_name"]
            tts_text = json_data["tts_text"]
            selected_voice = json_data["selected_voice"]
            slang_rate = json_data["slang_rate"]
            use_uploaded_voice = json_data["use_uploaded_voice"]
            voice_upload_file = json_data.get("voice_upload_file", None)
    
            edge_tts_voice = voice_mapping.get(selected_voice)
            if not edge_tts_voice:
                raise ValueError(f"Invalid voice '{selected_voice}'.")
    
            info, edge_tts_output_path, tts_output_data, edge_output_file = tts(
                model_name, tts_text, edge_tts_voice, slang_rate, use_uploaded_voice, voice_upload_file
            )
    
            if edge_output_file and os.path.exists(edge_output_file):
                os.remove(edge_output_file)
    
            _, audio_output = tts_output_data
            audio_file_path = self.save_audio_data_to_file(audio_output) if isinstance(audio_output, np.ndarray) else audio_output
    
            try:
                with open(audio_file_path, 'rb') as file:
                    audio_bytes = file.read()
                audio_data_uri = f"data:audio/wav;base64,{base64.b64encode(audio_bytes).decode('utf-8')}"
            except Exception as e:
                raise Exception(f"Failed to read audio file: {e}")
            finally:
                if os.path.exists(audio_file_path):
                    os.remove(audio_file_path)
    
            return {"info": info, "audio_data_uri": audio_data_uri}
        else:
            raise ValueError("Invalid JSON structure.")

    def process_hf_input(self, hf_data):
        if "inputs" in hf_data:
            actual_data = hf_data["inputs"]
            return self.process_json_input(actual_data)
        else:
            return {"error": "Invalid Hugging Face JSON structure."}

    def save_audio_data_to_file(self, audio_data, sample_rate=40000):
        file_path = get_unique_filename('wav')
        wavfile.write(file_path, sample_rate, audio_data)
        return file_path