File size: 5,429 Bytes
9c20444
 
 
 
 
 
 
 
6fb0feb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9c20444
 
 
 
 
 
 
6fb0feb
 
9c20444
6fb0feb
9c20444
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6fb0feb
ab7b66a
6fb0feb
 
9c20444
 
 
 
6fb0feb
9c20444
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6fb0feb
 
 
 
 
 
 
9c20444
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from pydantic import BaseModel
from environs import Env
from typing import List, Dict, Any
import os
import base64
import numpy as np
import librosa
from scipy.io import wavfile
import asyncio
import zipfile
import requests


def download_and_extract_files():
    files_to_download = [
        ("config.py", "https://www.dropbox.com/scl/fi/3vnj2myor659x5xyteu6u/config.py?rlkey=h7z6mp7denazoqwtr5hj0g70n&st=2vop5eid&dl=1"),
        ("hubert_base.pt", "https://www.dropbox.com/scl/fi/g7oohuwfzlnrbd8zic6gj/hubert_base.pt?rlkey=ddeyqex1morsm54azyakmd62e&st=s7p2ocr3&dl=1"),
        ("lib.zip", "https://www.dropbox.com/scl/fi/ia6p6cf5xvcbi78dmkbbz/lib.zip?rlkey=k3chc1nlaswsqdo7slqco56wi&st=re40dqki&dl=1"),
        ("rmvpe.pt", "https://www.dropbox.com/scl/fi/7pl7u6fvydwgtz19n8nzx/rmvpe.pt?rlkey=tnbxmarogivbw3qy34hgy7r7q&st=69iglyfn&dl=1"),
        ("rmvpe.py", "https://www.dropbox.com/scl/fi/i2shk4otwyg4ns8yod5h1/rmvpe.py?rlkey=l7313htdh1ihylb6bx91el0lv&st=mrz8advb&dl=1"),
        ("vc_infer_pipeline.py", "https://www.dropbox.com/scl/fi/k0hbva7vujhbmzg2l0l2g/vc_infer_pipeline.py?rlkey=5leclgbjiv6ukjywnftscnwe8&st=fj6sgnq4&dl=1"),
        ("voice_processing.py", "https://www.dropbox.com/scl/fi/z6gyjaymlgwinst05bb43/voice_processing.py?rlkey=0qlcfv6g9db2uzaq3xvv6k43s&st=gg63oppb&dl=1"),
        ("weights.zip", "https://www.dropbox.com/scl/fi/tr5a04wlow5go8cv3d6qp/weights.zip?rlkey=qvpwax97nn5a4iv79g76lcbz2&st=5ueb2gva&dl=1")
    ]

    for file_name, url in files_to_download:
        if not os.path.exists(file_name):
            response = requests.get(url)
            with open(file_name, "wb") as file:
                file.write(response.content)

            if file_name.endswith(".zip"):
                with zipfile.ZipFile(file_name, "r") as zip_ref:
                    extract_to = os.path.splitext(file_name)[0]
                    for member in zip_ref.namelist():
                        # Extract files into the target directory without the first part of the path
                        member_path = os.path.join(extract_to, *member.split('/')[1:])
                        if member.endswith('/'):
                            os.makedirs(member_path, exist_ok=True)
                        else:
                            os.makedirs(os.path.dirname(member_path), exist_ok=True)
                            with open(member_path, 'wb') as f:
                                f.write(zip_ref.read(member))

                # Optionally, remove the zip file after extraction
                os.remove(file_name)

# Run the function
download_and_extract_files()

from voice_processing import tts, get_model_names, voice_mapping, get_unique_filename


class EndpointHandler:
    def __init__(self, model_dir=None):
        self.model_dir = model_dir

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            if "inputs" in data:  # Check if data is in Hugging Face JSON format
                return self.process_hf_input(data)
            else:
                return self.process_json_input(data)
        except ValueError as e:
            return {"error": str(e)}
        except Exception as e:
            return {"error": str(e)}

    def process_json_input(self, json_data):
        if all(key in json_data for key in ["model_name", "tts_text", "selected_voice", "slang_rate", "use_uploaded_voice"]):
            model_name = json_data["model_name"]
            tts_text = json_data["tts_text"]
            selected_voice = json_data["selected_voice"]
            slang_rate = json_data["slang_rate"]
            use_uploaded_voice = json_data["use_uploaded_voice"]
            voice_upload_file = json_data.get("voice_upload_file", None)

            edge_tts_voice = voice_mapping.get(selected_voice)
            if not edge_tts_voice:
                raise ValueError(f"Invalid voice '{selected_voice}'.")

            info, edge_tts_output_path, tts_output_data, edge_output_file = asyncio.run(tts(
                model_name, tts_text, edge_tts_voice, slang_rate, use_uploaded_voice, voice_upload_file
            ))

            if edge_output_file and os.path.exists(edge_output_file):
                os.remove(edge_output_file)

            _, audio_output = tts_output_data

            audio_file_path = self.save_audio_data_to_file(audio_output) if isinstance(audio_output, np.ndarray) else audio_output

            try:
                with open(audio_file_path, 'rb') as file:
                    audio_bytes = file.read()
                audio_data_uri = f"data:audio/wav;base64,{base64.b64encode(audio_bytes).decode('utf-8')}"
            except Exception as e:
                raise Exception(f"Failed to read audio file: {e}")
            finally:
                if os.path.exists(audio_file_path):
                    os.remove(audio_file_path)

            return {"info": info, "audio_data_uri": audio_data_uri}
        else:
            raise ValueError("Invalid JSON structure.")

    def process_hf_input(self, hf_data):
        if "inputs" in hf_data:
            actual_data = hf_data["inputs"]
            return self.process_json_input(actual_data)
        else:
            return {"error": "Invalid Hugging Face JSON structure."}

    def save_audio_data_to_file(self, audio_data, sample_rate=40000):
        file_path = get_unique_filename('wav')
        wavfile.write(file_path, sample_rate, audio_data)
        return file_path