File size: 9,850 Bytes
a389e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import os
import torch
import argparse
import gradio as gr
from mailersend import emails
from dotenv import load_dotenv
import base64
import psycopg2
from urllib.parse import urlparse, parse_qs
import shutil
import boto3
from botocore.exceptions import NoCredentialsError
import json
from elevenlabs.client import ElevenLabs
from elevenlabs import play, save

# Load environment variables
load_dotenv()

# Argument parsing
parser = argparse.ArgumentParser()
parser.add_argument("--share", action='store_true', default=False, help="make link public")
args = parser.parse_args()

device = 'cuda' if torch.cuda.is_available() else 'cpu'
output_dir = 'outputs'
samples_dir = 'samples'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(samples_dir, exist_ok=True)

supported_languages = ['zh', 'en']

MAILERSEND_API_KEY = os.getenv("MAILERSEND_API_KEY")
MAILERSEND_DOMAIN = os.getenv("MAILERSEND_DOMAIN")
MAILERSEND_SENDER_EMAIL = f"noreply@{MAILERSEND_DOMAIN}"
MAILERSEND_SENDER_NAME = "Voice Clone App"

ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")
client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION_NAME = os.getenv('AWS_REGION_NAME')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')

# List of blocked words
BLOCKED_WORDS = ['Kill','hurt','shoot','gun','rifle','AR','semi automatic','knife','blade','sword','punch harm','disrupt','blackmail','steal','bitch','cunt','fuck','freaking','nigger','nigga','niggas','cracker','jew','oriental','fag','faggot','account','money','transfer','urgent','help','scared','policy','frightened','accident','fear','scam','address','social security number','assault','injure','maim','destroy','damage','threaten','intimidate','bully','menace','blackmail','extort','exploit','defame','steal','rob','embezzle','defraud Harass','jerk','idiot','stupid','moron','asshole','con','trick','swindle','defraud','payment','credit card','bank account','urgent','immediate','afraid','phone number','email','password']

def get_blocked_words(text):
    # Split the text into words for accurate matching
    words_in_text = text.lower().split()
    # Find all blocked words present in the text
    blocked_found = [word for word in BLOCKED_WORDS if word.lower() in words_in_text]
    return blocked_found

# Function to check for blocked words
def contains_blocked_words(text):
    return any(word.lower() in text.lower() for word in BLOCKED_WORDS)

# Function to send email with downloadable file using MailerSend
def send_email_with_file(recipient_email, file_path, subject, body):
    try:
        mailer = emails.NewEmail(MAILERSEND_API_KEY)

        mail_body = {}
        mail_from = {
            "name": MAILERSEND_SENDER_NAME,
            "email": MAILERSEND_SENDER_EMAIL,
        }
        recipients = [
            {
                "name": "Recipient",
                "email": recipient_email,
            }
        ]

        mailer.set_mail_from(mail_from, mail_body)
        mailer.set_mail_to(recipients, mail_body)
        mailer.set_subject(subject, mail_body)
        mailer.set_html_content(f"<p>{body}</p>", mail_body)
        mailer.set_plaintext_content(body, mail_body)

        with open(file_path, "rb") as file:
            attachment_content = base64.b64encode(file.read()).decode('utf-8')
        
        attachments = [
            {
                "filename": os.path.basename(file_path),
                "content": attachment_content,
                "disposition": "attachment"
            }
        ]
        mailer.set_attachments(attachments, mail_body)

        response = mailer.send(mail_body)

        if response[0] == 202:
            return True
        else:
            return False
    except Exception as e:
        return False

# S3 upload functions
def upload_to_s3(local_file, bucket, s3_file):
    s3 = boto3.client('s3', 
                      aws_access_key_id=AWS_ACCESS_KEY_ID,
                      aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                      region_name=AWS_REGION_NAME)

    try:
        s3.upload_file(local_file, bucket, s3_file, ExtraArgs={'ACL': 'public-read'})
        return True
    except FileNotFoundError:
        return False
    except NoCredentialsError:
        return False

def upload_voice_sample_and_metadata(sample_path, metadata, bucket):
    # Upload the voice sample
    sample_filename = os.path.basename(sample_path)
    s3_sample_path = f'voice_samples/{sample_filename}'
    if not upload_to_s3(sample_path, bucket, s3_sample_path):
        return False

    # Create and upload metadata file
    metadata['sample_s3_path'] = s3_sample_path
    metadata_filename = f"{os.path.splitext(sample_filename)[0]}_metadata.json"
    s3_metadata_path = f'voice_metadata/{metadata_filename}'
    
    # Save metadata to a temporary file
    temp_metadata_path = '/tmp/temp_metadata.json'
    with open(temp_metadata_path, 'w') as f:
        json.dump(metadata, f)
    
    # Upload metadata file
    if not upload_to_s3(temp_metadata_path, bucket, s3_metadata_path):
        return False
    
    # Clean up temporary file
    os.remove(temp_metadata_path)

    return True

def predict(prompt, style, audio_file_pth, voice_name, customer_email, order_name):
    text_hint = 'Your file will only be saved for 24 hours.\n'
    if len(prompt) < 2:
        text_hint += "[ERROR] Please provide a longer prompt text.\n"
        return text_hint, None, None
    if len(prompt) > 200:
        text_hint += "[ERROR] Text length limited to 200 characters. Please try shorter text.\n"
        return text_hint, None, None

    blocked_words = get_blocked_words(prompt)
    if blocked_words:
        text_hint += f"[ERROR] Your text contains blocked words: {', '.join(blocked_words)}. Please remove them and try again.\n"
        return text_hint, None, None

    # Check if audio file was uploaded
    if audio_file_pth is None:
        text_hint += "[ERROR] No audio file was uploaded. Please upload a reference audio file.\n"
        return text_hint, None, None

    # Check if audio file was uploaded
    if audio_file_pth is None:
        text_hint += "[ERROR] No audio file was uploaded. Please upload a reference audio file.\n"
        return text_hint, None, None

    # Copy the sample audio to the samples directory
    try:
        sample_filename = f"{voice_name}_{customer_email}_sample.mp3"
        sample_path = os.path.join(samples_dir, sample_filename)
        shutil.copy2(audio_file_pth, sample_path)
    except Exception as e:
        text_hint += f"[ERROR] Failed to copy audio file: {str(e)}\n"
        return text_hint, None, None

    # Prepare metadata
    metadata = {
        'name': voice_name,
        'email': customer_email,
        'order_name': order_name
    }

    # Use ElevenLabs API to clone the voice and generate audio
    try:
        full_voice_name = f"{voice_name}_{customer_email}"
        voice = client.clone(
            name=full_voice_name,
            description="A trial voice model for testing",
            files=[sample_path],
        )
        audio = client.generate(text=prompt, voice=voice)
        output_audio_path = os.path.join(output_dir, f"{full_voice_name}_output.mp3")
        save(audio, output_audio_path)
        text_hint += "Audio generated successfully using ElevenLabs.\n"
    except Exception as e:
        text_hint += f"[ERROR] ElevenLabs API error: {e}\n"
        return text_hint, None, None

    # Send email with the generated audio file
    email_subject = "Your Voice Clone Audio is Ready"
    email_body = f"Hi {voice_name},\n\nYour voice clone audio file is ready. Please find the attached file.\n\nBest regards,\nVoice Clone App"
    return text_hint, output_audio_path, sample_path
with gr.Blocks(gr.themes.Glass()) as demo:
    with gr.Row():
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Create This",
                info="One or two sentences at a time is better. Up to 200 text characters.",
                value="He hoped there would be stew for dinner, turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick, peppered, flour-fattened sauce.",
            )
            style_gr = gr.Dropdown(
                label="Style",
                choices=['default', 'whispering', 'cheerful', 'terrified', 'angry', 'sad', 'friendly'],
                info="Please upload a reference audio file that is at least 1 minute long. For best results, ensure the audio is clear.",
                max_choices=1,
                value="default",
            )
            ref_gr = gr.Audio(
                label="Original Audio",
                type="filepath",
                sources=["upload"],
            )
            voice_name_gr = gr.Textbox(
                label="Your name",
                value="Sam"
            )
            order_gr = gr.Textbox(
                label="Your order",
                value="Sample Order",
            )
            customer_email_gr = gr.Textbox(
                label="Your Email",
                info="We'll send you a downloadable file to this email address."
            )
            tts_button = gr.Button("Start", elem_id="send-btn", visible=True)

        with gr.Column():
            out_text_gr = gr.Text(label="Info")
            audio_gr = gr.Audio(label="Generated Audio", autoplay=True)
            ref_audio_gr = gr.Audio(label="Original Audio Used")

            tts_button.click(predict, [input_text_gr, style_gr, ref_gr, voice_name_gr, customer_email_gr, order_gr], outputs=[out_text_gr, audio_gr, ref_audio_gr])

    demo.queue()
    demo.launch(debug=True, show_api=False, share=args.share)

css = """
footer {visibility: hidden}
audio .btn-container {display: none}
"""

demo.add_css(css)