File size: 12,931 Bytes
a389e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e1a9cb
a389e8d
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
 
a389e8d
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
1e1a9cb
a389e8d
 
 
 
1e1a9cb
a389e8d
 
 
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
a389e8d
 
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
 
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
1e1a9cb
 
 
 
a389e8d
 
 
1e1a9cb
a389e8d
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
 
 
 
 
 
 
 
a389e8d
 
1e1a9cb
a389e8d
1e1a9cb
a389e8d
1e1a9cb
 
 
 
 
a389e8d
1e1a9cb
a389e8d
 
 
1e1a9cb
a389e8d
 
 
1e1a9cb
a389e8d
 
 
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
a389e8d
 
 
 
 
 
 
1e1a9cb
a389e8d
 
1e1a9cb
a389e8d
 
 
 
 
 
 
 
1e1a9cb
a389e8d
 
 
 
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
 
 
a389e8d
 
 
1e1a9cb
 
a389e8d
 
 
1e1a9cb
 
 
 
 
 
 
a389e8d
 
 
 
 
1e1a9cb
 
 
aa25384
1e1a9cb
 
a389e8d
1e1a9cb
a389e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e1a9cb
a389e8d
 
 
 
 
1e1a9cb
a389e8d
1e1a9cb
 
 
 
 
 
a389e8d
 
1e1a9cb
a389e8d
1e1a9cb
a389e8d
 
 
 
 
 
1e1a9cb
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import os
import torch
import argparse
import gradio as gr
from mailersend import emails
from dotenv import load_dotenv
import base64
import psycopg2
from urllib.parse import urlparse, parse_qs
import shutil
import boto3
from botocore.exceptions import NoCredentialsError
import json
from elevenlabs.client import ElevenLabs
from elevenlabs import play, save

# --- Environment -----------------------------------------------------------
# load_dotenv() runs before any os.environ lookup below so that values it
# provides are visible to them.
print("Loading environment variables...")
load_dotenv()
print("Environment variables loaded.")

# --- Command-line options --------------------------------------------------
# Only one flag is defined: --share, forwarded to demo.launch(share=...).
parser = argparse.ArgumentParser()
parser.add_argument(
    "--share",
    action='store_true',
    default=False,
    help="make link public",
)
args = parser.parse_args()
print(f"Arguments parsed. Share mode is set to: {args.share}")

# --- Compute device --------------------------------------------------------
# 'cuda' when torch reports an available CUDA device, otherwise 'cpu'.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

# --- Working directories ---------------------------------------------------
# Generated audio goes to output_dir, uploaded reference samples to
# samples_dir; both are created up front if they do not exist yet.
output_dir = 'outputs'
samples_dir = 'samples'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(samples_dir, exist_ok=True)
print(f"Directories '{output_dir}' and '{samples_dir}' are ready.")

supported_languages = ['zh', 'en']
print(f"Supported languages: {supported_languages}")

# --- MailerSend ------------------------------------------------------------
# The sender address is derived from the configured domain.
# NOTE(review): if MAILERSEND_DOMAIN is unset this becomes "noreply@None".
MAILERSEND_API_KEY = os.environ.get("MAILERSEND_API_KEY")
MAILERSEND_DOMAIN = os.environ.get("MAILERSEND_DOMAIN")
MAILERSEND_SENDER_EMAIL = "noreply@{}".format(MAILERSEND_DOMAIN)
MAILERSEND_SENDER_NAME = "Voice Clone App"
print("MailerSend configuration loaded.")

# --- ElevenLabs ------------------------------------------------------------
# One shared client, used by predict() for voice cloning and generation.
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
print("ElevenLabs client initialized.")

# --- AWS S3 ----------------------------------------------------------------
# Credentials, region and bucket consumed by the S3 upload helpers.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_REGION_NAME = os.environ.get('AWS_REGION_NAME')
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME')
print("AWS S3 configuration loaded.")

# List of blocked words and phrases. Matching is case-insensitive and works on
# whole words, so multi-word entries such as 'credit card' are matched as
# phrases. 'punch'/'harm' and 'defraud'/'Harass' are separate entries; they
# used to be fused into 'punch harm' and 'defraud Harass' by missing commas.
# Duplicate entries are tolerated: they are reported only once.
BLOCKED_WORDS = [
    'Kill', 'hurt', 'shoot', 'gun', 'rifle', 'AR', 'semi automatic', 'knife',
    'blade', 'sword', 'punch', 'harm', 'disrupt', 'blackmail', 'steal',
    'bitch', 'cunt', 'fuck', 'freaking', 'nigger', 'nigga', 'niggas',
    'cracker', 'jew', 'oriental', 'fag', 'faggot', 'account', 'money',
    'transfer', 'urgent', 'help', 'scared', 'policy', 'frightened',
    'accident', 'fear', 'scam', 'address', 'social security number',
    'assault', 'injure', 'maim', 'destroy', 'damage', 'threaten',
    'intimidate', 'bully', 'menace', 'blackmail', 'extort', 'exploit',
    'defame', 'steal', 'rob', 'embezzle', 'defraud', 'Harass', 'jerk',
    'idiot', 'stupid', 'moron', 'asshole', 'con', 'trick', 'swindle',
    'defraud', 'payment', 'credit card', 'bank account', 'urgent',
    'immediate', 'afraid', 'phone number', 'email', 'password',
]
print(f"Blocked words list loaded with {len(BLOCKED_WORDS)} words.")


def _normalize_for_matching(text):
    """Lower-case *text* and turn every non-alphanumeric run into one space.

    This strips punctuation ("kill!" -> "kill") and collapses whitespace so
    that words and phrases can be compared on word boundaries.
    """
    spaced = ''.join(ch if ch.isalnum() else ' ' for ch in text.lower())
    return ' '.join(spaced.split())


def _find_blocked_words(text):
    """Return the BLOCKED_WORDS entries that occur in *text* as whole words.

    Entries are returned in list order, spelled as in BLOCKED_WORDS, each at
    most once. Empty or ``None`` input yields an empty list.
    """
    if not text:
        return []
    # Pad with spaces so that " needle " only matches on word boundaries.
    haystack = f" {_normalize_for_matching(text)} "
    found = []
    seen = set()
    for entry in BLOCKED_WORDS:
        needle = _normalize_for_matching(entry)
        if not needle or needle in seen:
            continue
        seen.add(needle)
        if f" {needle} " in haystack:
            found.append(entry)
    return found


def get_blocked_words(text):
    """Return the list of blocked words/phrases present in *text*.

    Matching is case-insensitive, ignores punctuation, and only accepts
    whole-word hits (so "con" does not match "second"). The result is logged
    to stdout and is empty when nothing is blocked.
    """
    print("Checking for blocked words in the input text...")
    blocked_found = _find_blocked_words(text)
    print(f"Blocked words found: {blocked_found}")
    return blocked_found


# Function to check for blocked words
def contains_blocked_words(text):
    """Return True if *text* contains at least one blocked word or phrase.

    Uses the same whole-word matching as get_blocked_words() so the two
    functions always agree.
    """
    return bool(_find_blocked_words(text))

def _mailersend_status_code(response):
    """Extract the HTTP status code from the value returned by ``mailer.send``.

    Accepts an int, a sequence whose first item is the status code, or a
    string whose first line is the status code.
    NOTE(review): some mailersend SDK releases appear to return the string
    "<status>\\n<body>" rather than a tuple - confirm against the installed
    version. Returns ``None`` when no status code can be determined.
    """
    if isinstance(response, int):
        return response
    if isinstance(response, str):
        first_line = response.strip().split("\n", 1)[0].strip()
        return int(first_line) if first_line.isdigit() else None
    try:
        return int(response[0])
    except (TypeError, ValueError, IndexError, KeyError):
        return None


# Function to send email with downloadable file using MailerSend
def send_email_with_file(recipient_email, file_path, subject, body):
    """Email *file_path* as an attachment to *recipient_email* via MailerSend.

    Args:
        recipient_email: Address the message is sent to.
        file_path: Path of the file to attach; it is read fully into memory
            and base64-encoded.
        subject: Subject line.
        body: Plain-text message. It is sent as-is for the plaintext part and
            HTML-escaped (newlines become ``<br>``) for the HTML part, because
            it may contain user-supplied text.

    Returns:
        True if MailerSend answered with status 202, False otherwise. Any
        exception raised while building or sending the message is logged and
        reported as False rather than propagated.
    """
    import html  # local import: only needed for escaping the HTML part

    print(f"Sending email to {recipient_email} with file {file_path}...")
    try:
        mailer = emails.NewEmail(MAILERSEND_API_KEY)

        mail_body = {}
        mail_from = {
            "name": MAILERSEND_SENDER_NAME,
            "email": MAILERSEND_SENDER_EMAIL,
        }
        recipients = [
            {
                "name": "Recipient",
                "email": recipient_email,
            }
        ]

        # Escape the body so user-provided text cannot inject markup, and
        # keep its line breaks visible in HTML mail clients.
        html_body = html.escape(body).replace("\n", "<br>")

        mailer.set_mail_from(mail_from, mail_body)
        mailer.set_mail_to(recipients, mail_body)
        mailer.set_subject(subject, mail_body)
        mailer.set_html_content(f"<p>{html_body}</p>", mail_body)
        mailer.set_plaintext_content(body, mail_body)

        with open(file_path, "rb") as file:
            attachment_content = base64.b64encode(file.read()).decode('utf-8')

        attachments = [
            {
                "filename": os.path.basename(file_path),
                "content": attachment_content,
                "disposition": "attachment"
            }
        ]
        mailer.set_attachments(attachments, mail_body)

        response = mailer.send(mail_body)
        print(f"MailerSend response: {response}")

        # Comparing response[0] to 202 directly fails when the SDK returns a
        # string ("202\n..."), so normalise the response first.
        if _mailersend_status_code(response) == 202:
            print("Email sent successfully.")
            return True
        else:
            print("Failed to send email.")
            return False
    except Exception as e:
        # Top-level boundary for the email step: report, never crash the UI.
        print(f"Error sending email: {e}")
        return False

# S3 upload functions
def upload_to_s3(local_file, bucket, s3_file):
    """Upload *local_file* to *bucket* under the key *s3_file*.

    A boto3 S3 client is built from the module-level AWS settings on every
    call. The object is uploaded with a ``public-read`` ACL.

    Returns:
        True when the upload succeeded, False when the local file is missing,
        credentials are unavailable, or any other error occurred (each case
        is logged to stdout).
    """
    print(f"Uploading {local_file} to S3 bucket {bucket} as {s3_file}...")
    client_options = {
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'region_name': AWS_REGION_NAME,
    }
    s3_client = boto3.client('s3', **client_options)

    succeeded = False
    try:
        s3_client.upload_file(
            local_file,
            bucket,
            s3_file,
            ExtraArgs={'ACL': 'public-read'},
        )
    except FileNotFoundError:
        print("File not found.")
    except NoCredentialsError:
        print("AWS credentials not available.")
    except Exception as e:
        print(f"Error uploading to S3: {e}")
    else:
        print("File uploaded to S3 successfully.")
        succeeded = True
    return succeeded

def upload_voice_sample_and_metadata(sample_path, metadata, bucket):
    """Upload a voice sample and a JSON metadata file describing it to S3.

    The sample is stored under ``voice_samples/<basename>`` and the metadata
    under ``voice_metadata/<basename without extension>_metadata.json``.

    Args:
        sample_path: Local path of the audio sample.
        metadata: Dict serialised to JSON. It is modified in place: the key
            ``'sample_s3_path'`` is set to the sample's S3 key.
        bucket: Name of the destination S3 bucket.

    Returns:
        True if both uploads succeeded, False otherwise (the failing step is
        logged to stdout).
    """
    import tempfile  # local import: only this function needs it

    print("Uploading voice sample and metadata to S3...")
    # Upload the voice sample
    sample_filename = os.path.basename(sample_path)
    s3_sample_path = f'voice_samples/{sample_filename}'
    if not upload_to_s3(sample_path, bucket, s3_sample_path):
        print("Failed to upload voice sample.")
        return False

    # Create and upload metadata file
    metadata['sample_s3_path'] = s3_sample_path
    metadata_filename = f"{os.path.splitext(sample_filename)[0]}_metadata.json"
    s3_metadata_path = f'voice_metadata/{metadata_filename}'

    # Use a unique temporary file per call: a fixed path would be shared by
    # concurrent requests and is not portable across platforms.
    temp_metadata_path = None
    try:
        try:
            with tempfile.NamedTemporaryFile(
                'w', suffix='_metadata.json', delete=False
            ) as f:
                temp_metadata_path = f.name
                json.dump(metadata, f)
            print("Metadata file created.")
        except Exception as e:
            print(f"Error creating metadata file: {e}")
            return False

        # Upload metadata file
        if not upload_to_s3(temp_metadata_path, bucket, s3_metadata_path):
            print("Failed to upload metadata file.")
            return False
    finally:
        # Clean up the temporary file on every path, including failures.
        if temp_metadata_path is not None:
            try:
                os.remove(temp_metadata_path)
                print("Temporary metadata file removed.")
            except OSError as e:
                print(f"Error removing temporary file: {e}")

    print("Voice sample and metadata uploaded successfully.")
    return True

def _safe_filename(name):
    """Return *name* with characters unsafe in a file name replaced by ``_``.

    Letters, digits, spaces and ``@ . _ - +`` are kept; everything else
    (notably path separators) is replaced, so the result is always a single
    path component.
    """
    return ''.join(
        ch if (ch.isalnum() or ch in ' @._-+') else '_' for ch in name
    )


def predict(prompt, style, audio_file_pth, voice_name, customer_email, order_name):
    """Clone a voice from the uploaded sample, speak *prompt*, and deliver it.

    Steps: validate the prompt, copy the reference audio into ``samples_dir``,
    clone the voice and generate speech through the ElevenLabs client, upload
    the sample plus metadata to S3, then email the generated file.

    Args:
        prompt: Text to synthesise; must contain at least 2 non-blank
            characters and at most 200 characters, and no blocked words.
        style: Selected style from the UI. Accepted for interface
            compatibility; it is not used by this function.
        audio_file_pth: Path of the uploaded reference audio, or None.
        voice_name: Name supplied by the user.
        customer_email: Address the generated file is emailed to.
        order_name: Order label stored in the uploaded metadata.

    Returns:
        ``(text_hint, output_audio_path, sample_path)`` on success, or
        ``(text_hint, None, None)`` when a step fails; ``text_hint`` carries
        the user-facing status or error messages.
    """
    print("Prediction started...")
    text_hint = 'Your file will only be saved for 24 hours.\n'
    # Treat a missing prompt like an empty one and ignore surrounding blanks
    # when checking the minimum length.
    prompt = prompt or ""
    if len(prompt.strip()) < 2:
        text_hint += "[ERROR] Please provide a longer prompt text.\n"
        print("Prompt is too short.")
        return text_hint, None, None
    if len(prompt) > 200:
        text_hint += "[ERROR] Text length limited to 200 characters. Please try shorter text.\n"
        print("Prompt is too long.")
        return text_hint, None, None

    blocked_words = get_blocked_words(prompt)
    if blocked_words:
        text_hint += f"[ERROR] Your text contains blocked words: {', '.join(blocked_words)}. Please remove them and try again.\n"
        print(f"Blocked words detected: {blocked_words}")
        return text_hint, None, None

    # Check if audio file was uploaded
    if audio_file_pth is None:
        text_hint += "[ERROR] No audio file was uploaded. Please upload a reference audio file.\n"
        print("No audio file uploaded.")
        return text_hint, None, None

    # Copy the sample audio to the samples directory. The file name is built
    # from user input, so sanitise it to keep the copy inside samples_dir.
    try:
        sample_filename = _safe_filename(f"{voice_name}_{customer_email}_sample.mp3")
        sample_path = os.path.join(samples_dir, sample_filename)
        shutil.copy2(audio_file_pth, sample_path)
        print(f"Audio file copied to {sample_path}.")
    except Exception as e:
        text_hint += f"[ERROR] Failed to copy audio file: {str(e)}\n"
        print(f"Error copying audio file: {e}")
        return text_hint, None, None

    # Prepare metadata
    metadata = {
        'name': voice_name,
        'email': customer_email,
        'order_name': order_name
    }
    print("Metadata prepared:", metadata)

    # Use ElevenLabs API to clone the voice and generate audio
    try:
        full_voice_name = f"{voice_name}_{customer_email}"
        print(f"Cloning voice with name: {full_voice_name}")
        voice = client.clone(
            name=full_voice_name,
            description="A trial voice model for testing",
            files=[sample_path],
        )
        print("Voice cloned successfully.")

        print("Generating audio using the cloned voice...")
        audio = client.generate(text=prompt, voice=voice)
        # Same sanitisation as for the sample: the name comes from user input.
        output_audio_path = os.path.join(
            output_dir, _safe_filename(f"{full_voice_name}_output.mp3")
        )
        save(audio, output_audio_path)
        print(f"Audio generated and saved to {output_audio_path}.")

        text_hint += "Audio generated successfully using ElevenLabs.\n"
    except Exception as e:
        text_hint += f"[ERROR] ElevenLabs API error: {e}\n"
        print(f"ElevenLabs API error: {e}")
        return text_hint, None, None

    # Upload the sample and its metadata to S3; a failure aborts the request.
    if not upload_voice_sample_and_metadata(sample_path, metadata, S3_BUCKET_NAME):
        text_hint += "[ERROR] Failed to upload to S3.\n"
        print("Failed to upload voice sample and metadata to S3.")
        return text_hint, None, None

    # Send email with the generated audio file
    email_subject = "Your Voice Clone Audio is Ready"
    email_body = f"Hi {voice_name},\n\nYour voice clone audio file is ready. Please find the attached file.\n\nBest regards,\nVoice Clone App"
    if send_email_with_file(customer_email, output_audio_path, email_subject, email_body):
        text_hint += "An email has been sent with your audio file.\n"
    else:
        # Do not claim success when the send step reported a failure; the
        # audio is still returned to the page, so the user is not blocked.
        text_hint += "[WARNING] We could not confirm that the email with your audio file was sent. You can still play and download it from this page.\n"
        print("Failed to send email with the audio file.")

    return text_hint, output_audio_path, sample_path

# Custom CSS for the interface. It is passed to gr.Blocks at construction
# time: the css= argument is the documented way to style a Blocks app, and
# anything placed after launch(debug=True) would only run once the server
# has stopped.
css = """
footer {visibility: hidden}
audio .btn-container {display: none}
"""

# Build the UI: inputs in the left column, results in the right column.
with gr.Blocks(theme=gr.themes.Glass(), css=css) as demo:
    with gr.Row():
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Create This",
                info="One or two sentences at a time is better. Up to 200 text characters.",
                value="He hoped there would be stew for dinner, turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick, peppered, flour-fattened sauce.",
            )
            style_gr = gr.Dropdown(
                label="Style",
                choices=['default', 'whispering', 'cheerful', 'terrified', 'angry', 'sad', 'friendly'],
                info="Please upload a reference audio file that is at least 1 minute long. For best results, ensure the audio is clear.",
                max_choices=1,
                value="default",
            )
            ref_gr = gr.Audio(
                label="Original Audio",
                type="filepath",
                sources=["upload"],
            )
            voice_name_gr = gr.Textbox(
                label="Your name",
                value="Sam"
            )
            order_gr = gr.Textbox(
                label="Your order",
                value="Sample Order",
            )
            customer_email_gr = gr.Textbox(
                label="Your Email",
                info="We'll send you a downloadable file to this email address."
            )
            tts_button = gr.Button("Start", elem_id="send-btn", visible=True)
            print("Gradio input components initialized.")

        with gr.Column():
            out_text_gr = gr.Text(label="Info")
            audio_gr = gr.Audio(label="Generated Audio", autoplay=True)
            ref_audio_gr = gr.Audio(label="Original Audio Used")
            print("Gradio output components initialized.")

    # Input order must match predict()'s parameters:
    # (prompt, style, audio_file_pth, voice_name, customer_email, order_name).
    tts_button.click(
        predict,
        [input_text_gr, style_gr, ref_gr, voice_name_gr, customer_email_gr, order_gr],
        outputs=[out_text_gr, audio_gr, ref_audio_gr]
    )
    print("Gradio button click event bound to predict function.")

print("Custom CSS added to Gradio interface.")

# Queue and launch once the layout is fully defined. With debug=True the
# launch call blocks the main thread, so the final message is printed only
# after the server stops.
demo.queue()
print("Launching Gradio demo...")
demo.launch(debug=True, show_api=False, share=args.share)
print("Gradio demo launched.")