Spaces:
Running
Running
import argparse
import base64
import html
import json
import os
import re
import shutil
import tempfile
from urllib.parse import parse_qs, urlparse

import boto3
import gradio as gr
import psycopg2
import torch
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError
from dotenv import load_dotenv
from elevenlabs import ElevenLabs
from mailersend import emails
# Load environment variables
load_dotenv()

# Argument parsing: the only option is --share, which is forwarded to
# demo.launch() to request a public link.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--share",
    action="store_true",
    default=False,
    help="make link public",
)
args = parser.parse_args()

# Use CUDA when torch reports it as available, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Local working directories: generated audio goes to output_dir, copies of
# the uploaded reference audio go to samples_dir. Both are created up front.
output_dir = 'outputs'
samples_dir = 'samples'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(samples_dir, exist_ok=True)

supported_languages = ['zh', 'en']

# MailerSend settings. The sender address is derived from the domain, so it
# becomes "noreply@None" when MAILERSEND_DOMAIN is not set.
MAILERSEND_API_KEY = os.environ.get("MAILERSEND_API_KEY")
MAILERSEND_DOMAIN = os.environ.get("MAILERSEND_DOMAIN")
MAILERSEND_SENDER_EMAIL = f"noreply@{MAILERSEND_DOMAIN}"
MAILERSEND_SENDER_NAME = "Voice Clone App"

# ElevenLabs client shared by every request.
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)

# AWS credentials and the S3 bucket that receives samples and metadata.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_REGION_NAME = os.environ.get('AWS_REGION_NAME')
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME')
# List of blocked words and phrases (matched case-insensitively, as whole
# words). Multi-word entries are phrases whose words may be separated by
# whitespace or hyphens.
BLOCKED_WORDS = [
    'Kill', 'hurt', 'shoot', 'gun', 'rifle', 'AR', 'semi automatic', 'knife',
    'blade', 'sword', 'punch', 'harm', 'disrupt', 'blackmail', 'steal',
    'bitch', 'cunt', 'fuck', 'freaking', 'nigger', 'nigga', 'niggas',
    'cracker', 'jew', 'oriental', 'fag', 'faggot', 'account', 'money',
    'transfer', 'urgent', 'help', 'scared', 'policy', 'frightened',
    'accident', 'fear', 'scam', 'address', 'social security number',
    'assault', 'injure', 'maim', 'destroy', 'damage', 'threaten',
    'intimidate', 'bully', 'menace', 'extort', 'exploit', 'defame', 'rob',
    'embezzle', 'defraud', 'harass', 'jerk', 'idiot', 'stupid', 'moron',
    'asshole', 'con', 'trick', 'swindle', 'payment', 'credit card',
    'bank account', 'immediate', 'afraid', 'phone number', 'email',
    'password',
]


def _build_blocked_pattern(words):
    """Compile one case-insensitive regex that matches any entry of *words*.

    Entries match only as whole words (optionally followed by a simple
    inflection such as -s, -ed or -ing), so short entries like "AR" or "con"
    no longer trigger on innocent words such as "carrots" or "second".

    Returns None when *words* contains no usable entry.
    """
    # Longest first so that phrases win over their own prefixes.
    phrases = sorted(
        {word.lower() for word in words if word and word.strip()},
        key=lambda phrase: (-len(phrase), phrase),
    )
    if not phrases:
        return None
    alternatives = [
        r"[\s-]+".join(re.escape(part) for part in phrase.split())
        for phrase in phrases
    ]
    joined = "|".join(alternatives)
    return re.compile(rf"\b(?:{joined})(?:s|es|d|ed|ing)?\b", re.IGNORECASE)


# Built once at import time; rebuild it if BLOCKED_WORDS is changed at runtime.
_BLOCKED_WORDS_PATTERN = _build_blocked_pattern(BLOCKED_WORDS)


# Function to check for blocked words
def contains_blocked_words(text):
    """Return True if *text* contains any blocked word or phrase.

    Matching is case-insensitive and word-based. Empty or None text is
    never considered blocked.
    """
    if not text or _BLOCKED_WORDS_PATTERN is None:
        return False
    return _BLOCKED_WORDS_PATTERN.search(text) is not None
def _mailersend_status_code(response):
    """Extract the HTTP status code from the value returned by ``mailer.send``.

    Accepts a "<status>\\n<body>" string, a (status, body) sequence, a bare
    int, or an object with a ``status_code`` attribute. Returns None when no
    status code can be determined.

    NOTE(review): the mailersend 0.x SDK appears to return the string form;
    confirm against the installed version.
    """
    if isinstance(response, str):
        first_line = response.partition("\n")[0].strip()
        return int(first_line) if first_line.isdigit() else None
    if isinstance(response, int):
        return response
    if isinstance(response, (tuple, list)) and response:
        head = response[0]
        if isinstance(head, int):
            return head
        if isinstance(head, str) and head.strip().isdigit():
            return int(head.strip())
        return None
    status = getattr(response, "status_code", None)
    return status if isinstance(status, int) else None


# Function to send email with downloadable file using MailerSend
def send_email_with_file(recipient_email, file_path, subject, body):
    """Email *file_path* as an attachment to *recipient_email* via MailerSend.

    Args:
        recipient_email: Destination address.
        file_path: Path of the file to attach; it is read fully into memory
            and base64-encoded.
        subject: Subject line.
        body: Plain-text message. It is HTML-escaped (newlines become <br>)
            for the HTML part, since it may contain user-supplied text.

    Returns:
        True if MailerSend answered with status 202, False on any other
        status or on any error (errors are printed, never raised).
    """
    try:
        with open(file_path, "rb") as file:
            attachment_content = base64.b64encode(file.read()).decode('utf-8')

        mailer = emails.NewEmail(MAILERSEND_API_KEY)
        mail_body = {}
        mail_from = {
            "name": MAILERSEND_SENDER_NAME,
            "email": MAILERSEND_SENDER_EMAIL,
        }
        recipients = [
            {
                "name": "Recipient",
                "email": recipient_email,
            }
        ]
        attachments = [
            {
                "filename": os.path.basename(file_path),
                "content": attachment_content,
                "disposition": "attachment",
            }
        ]
        html_body = html.escape(body).replace("\n", "<br>")

        mailer.set_mail_from(mail_from, mail_body)
        mailer.set_mail_to(recipients, mail_body)
        mailer.set_subject(subject, mail_body)
        mailer.set_html_content(f"<p>{html_body}</p>", mail_body)
        mailer.set_plaintext_content(body, mail_body)
        mailer.set_attachments(attachments, mail_body)
        response = mailer.send(mail_body)
    except Exception as e:
        # Deliberately broad: sending mail is best-effort and callers only
        # look at the boolean result.
        print(f"An error occurred while sending email: {e}")
        return False

    status = _mailersend_status_code(response)
    if status == 202:
        print("Email sent successfully")
        return True
    print(f"Failed to send email. Status code: {status}")
    print(f"Response: {response}")
    return False
# Database connection details
# Used when the connection URL carries no sslmode parameter; "prefer" is the
# libpq default.
_DEFAULT_SSLMODE = "prefer"


def _parse_database_url(url):
    """Split a database URL into its parsed form and its sslmode.

    Args:
        url: Connection URL, or None/"" when not configured.

    Returns:
        A ``(parsed, sslmode)`` tuple, where *parsed* is the
        ``urllib.parse.urlparse`` result and *sslmode* is the value of the
        ``sslmode`` query parameter, or ``_DEFAULT_SSLMODE`` when absent.
    """
    parsed = urlparse(url or "")
    sslmode_values = parse_qs(parsed.query).get("sslmode")
    return parsed, (sslmode_values[0] if sslmode_values else _DEFAULT_SSLMODE)


connection_string = os.environ.get("DATABASE_URL")
# A missing DATABASE_URL or sslmode no longer raises at import time; the
# problem then surfaces when a connection is actually attempted.
result, sslmode = _parse_database_url(connection_string)
user = result.username
password = result.password
host = result.hostname
port = result.port
database = result.path[1:]  # drop the leading "/" of the URL path
# Function to add user information to the database
def add_user_info_to_db(email, order_name, sample_audio_path, name):
    """Insert one row (email, order_name, sample_audio, name) into ``main``.

    The operation is best-effort: connection and insertion errors raised by
    psycopg2 are printed and swallowed, so a database problem does not abort
    the caller.

    Args:
        email: Customer email address.
        order_name: Order identifier entered by the customer.
        sample_audio_path: Path of the stored reference audio.
        name: Customer / voice name.
    """
    insert_query = """
        INSERT INTO main (email, order_name, sample_audio, name)
        VALUES (%s, %s, %s, %s);
    """
    try:
        connection = psycopg2.connect(
            dbname=database,
            user=user,
            password=password,
            host=host,
            port=port,
            sslmode=sslmode,
        )
    except psycopg2.Error as error:
        print(f"Error occurred while connecting to the database: {error}")
        return

    try:
        # `with connection` commits on success and rolls back on an
        # exception; `with cursor` closes the cursor. Neither closes the
        # connection, hence the `finally`.
        with connection, connection.cursor() as cursor:
            cursor.execute(insert_query, (email, order_name, sample_audio_path, name))
        print("User information added to the database successfully")
    except psycopg2.Error as error:
        print(f"Error occurred during data insertion: {error}")
    finally:
        connection.close()
# S3 upload functions
def upload_to_s3(local_file, bucket, s3_file):
    """Upload *local_file* to ``s3://bucket/s3_file``.

    Args:
        local_file: Path of the file to upload.
        bucket: Destination bucket name.
        s3_file: Destination object key.

    Returns:
        True on success, False on any upload failure (the reason is printed).
    """
    s3 = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION_NAME,
    )
    try:
        # NOTE(review): 'public-read' makes every uploaded object world-
        # readable, including voice samples and metadata containing customer
        # email addresses. Confirm this is intended.
        s3.upload_file(local_file, bucket, s3_file, ExtraArgs={'ACL': 'public-read'})
    except FileNotFoundError:
        print(f"The file was not found: {local_file}")
        return False
    except NoCredentialsError:
        print("Credentials not available")
        return False
    except (BotoCoreError, ClientError, S3UploadFailedError) as error:
        # e.g. access denied, ACLs disabled on the bucket, network failure.
        # Callers rely on the boolean result, so report instead of raising.
        print(f"Upload failed for {s3_file}: {error}")
        return False
    print(f"Upload Successful: {s3_file}")
    return True
def upload_voice_sample_and_metadata(sample_path, metadata, bucket):
    """Upload a voice sample and a JSON metadata file describing it.

    The sample is stored under ``voice_samples/<file name>`` and the
    metadata under ``voice_metadata/<file stem>_metadata.json``.

    Args:
        sample_path: Local path of the voice sample.
        metadata: Dict to serialise as JSON. It is modified in place: the key
            ``sample_s3_path`` is set to the sample's S3 key.
        bucket: Destination bucket name.

    Returns:
        True if both uploads succeeded, False otherwise.
    """
    # Upload the voice sample
    sample_filename = os.path.basename(sample_path)
    s3_sample_path = f'voice_samples/{sample_filename}'
    if not upload_to_s3(sample_path, bucket, s3_sample_path):
        return False

    # Create and upload metadata file
    metadata['sample_s3_path'] = s3_sample_path
    metadata_filename = f"{os.path.splitext(sample_filename)[0]}_metadata.json"
    s3_metadata_path = f'voice_metadata/{metadata_filename}'

    # A unique temporary file per call, so concurrent requests cannot
    # overwrite each other's metadata. It is removed whether or not the
    # upload succeeds.
    temp_file = tempfile.NamedTemporaryFile('w', suffix='.json', delete=False)
    try:
        with temp_file:
            json.dump(metadata, temp_file)
        # The file is closed before uploading so it can be reopened by name.
        if not upload_to_s3(temp_file.name, bucket, s3_metadata_path):
            return False
        return True
    finally:
        os.remove(temp_file.name)
# Characters that are not allowed in a file-name component built from user
# input (anything except word characters, ".", "@", "+", "-" and space).
_UNSAFE_FILENAME_CHARS = re.compile(r"[^\w.@+\- ]")


def _safe_filename_part(value):
    """Return *value* made safe for embedding in a file name.

    Path separators and other unusual characters are replaced by "_", so
    user input cannot escape the target directory. Empty input yields
    "unknown".
    """
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", str(value or "")).strip()
    return cleaned or "unknown"


# Predict function with ElevenLabs API usage
def predict(prompt, style, audio_file_pth, voice_name, customer_email, order_name):
    """Handle one "Start" click: validate, store the sample, generate, email.

    Steps: validate the inputs; copy the reference audio into ``samples_dir``;
    upload it plus metadata to S3; record the request in the database; ask
    ElevenLabs for the audio; email the result to the customer.

    Args:
        prompt: Text to speak (2 to 200 characters, no blocked words).
        style: Style selected in the UI, forwarded to ElevenLabs.
        audio_file_pth: File path of the uploaded reference audio.
        voice_name: Customer / voice name.
        customer_email: Address that receives the generated file.
        order_name: Order identifier entered by the customer.

    Returns:
        ``(info_text, generated_audio_path, reference_audio_path)``. On a
        validation or generation error the two paths are None and
        *info_text* contains an ``[ERROR]`` line.
    """
    text_hint = 'Your file will only be saved for 24 hours.\n'

    # Cheap input validation first, before any file or network I/O.
    prompt = prompt or ""
    if len(prompt) < 2:
        text_hint += "[ERROR] Please provide a longer prompt text.\n"
        return text_hint, None, None
    if len(prompt) > 200:
        text_hint += "[ERROR] Text length limited to 200 characters. Please try shorter text.\n"
        return text_hint, None, None
    if not audio_file_pth:
        text_hint += "[ERROR] Please upload a reference audio file.\n"
        return text_hint, None, None
    customer_email = (customer_email or "").strip()
    if "@" not in customer_email:
        text_hint += "[ERROR] Please provide a valid email address.\n"
        return text_hint, None, None
    if contains_blocked_words(prompt):
        text_hint += "[ERROR] Your text contains blocked words. Please remove them and try again.\n"
        return text_hint, None, None

    # The name and email are user-controlled, so sanitise them before they
    # become part of a path on disk or an S3 key.
    safe_name = _safe_filename_part(voice_name)
    safe_email = _safe_filename_part(customer_email)

    # Copy the sample audio to the samples directory
    sample_filename = f"{safe_name}_{safe_email}_sample.mp3"
    sample_path = os.path.join(samples_dir, sample_filename)
    try:
        shutil.copy2(audio_file_pth, sample_path)
    except OSError as e:
        text_hint += f"[ERROR] Could not read the uploaded audio file: {e}\n"
        return text_hint, None, None

    # Prepare metadata
    metadata = {
        'name': voice_name,
        'email': customer_email,
        'order_name': order_name,
    }

    # Upload voice sample and metadata to S3 (failure is reported, not fatal)
    if upload_voice_sample_and_metadata(sample_path, metadata, S3_BUCKET_NAME):
        text_hint += "Voice sample and metadata uploaded to S3 successfully.\n"
    else:
        text_hint += "Failed to upload voice sample and metadata to S3.\n"

    # Add user information to the database
    add_user_info_to_db(customer_email, order_name, sample_path, voice_name)

    # Use ElevenLabs API to generate the cloned voice audio
    # NOTE(review): in the official elevenlabs SDK, `client.text_to_speech`
    # appears to be a sub-client (the call being
    # `text_to_speech.convert(voice_id=..., text=...)`, yielding byte chunks)
    # and the uploaded sample is never sent to ElevenLabs for cloning. Confirm
    # against the installed SDK version; as written this call may always fail
    # into the error branch below.
    try:
        response = elevenlabs_client.text_to_speech(prompt, voice_name=voice_name, style=style)
        output_audio_path = os.path.join(output_dir, f"{safe_name}_output.mp3")
        with open(output_audio_path, 'wb') as f:
            f.write(response['audio'])
        text_hint += "Audio generated successfully using ElevenLabs.\n"
    except Exception as e:
        # Broad on purpose: any SDK or I/O failure is shown to the user
        # instead of crashing the request.
        text_hint += f"[ERROR] ElevenLabs API error: {e}\n"
        return text_hint, None, None

    # Send email with the generated audio file
    email_subject = "Your Voice Clone Audio is Ready"
    email_body = f"Hi {voice_name},\n\nYour voice clone audio file is ready. Please find the attached file.\n\nBest regards,\nVoice Clone App"
    if send_email_with_file(customer_email, output_audio_path, email_subject, email_body):
        text_hint += "Email sent successfully with the generated audio file.\n"
    else:
        text_hint += "Failed to send email with the generated audio file.\n"

    return text_hint, output_audio_path, audio_file_pth
# Gradio interface setup
# Custom CSS: hides the page footer and the audio player's button container.
# It must be handed to gr.Blocks at construction time; applying it after
# launch() never takes effect because launch() blocks.
css = """
footer {visibility: hidden}
audio .btn-container {display: none}
"""

with gr.Blocks(theme=gr.themes.Glass(), css=css) as demo:
    with gr.Row():
        # Left column: inputs and the start button
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Create This",
                info="One or two sentences at a time is better. Up to 200 text characters.",
                value="He hoped there would be stew for dinner, turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick, peppered, flour-fattened sauce.",
            )
            # NOTE(review): this info text describes the reference audio, not
            # the style choice; it probably belongs next to the audio upload.
            style_gr = gr.Dropdown(
                label="Style",
                choices=['default', 'whispering', 'cheerful', 'terrified', 'angry', 'sad', 'friendly'],
                info="Please upload a reference audio file that is at least 1 minute long. For best results, ensure the audio is clear.",
                max_choices=1,
                value="default",
            )
            ref_gr = gr.Audio(
                label="Original Audio",
                type="filepath",
                sources=["upload"],
            )
            voice_name_gr = gr.Textbox(
                label="Your name",
                value="Sam",
            )
            order_gr = gr.Textbox(
                label="Your order",
                value="Sample Order",
            )
            customer_email_gr = gr.Textbox(
                label="Your Email",
                info="We'll send you a downloadable file to this email address.",
            )
            tts_button = gr.Button("Start", elem_id="send-btn", visible=True)

        # Right column: status text and audio players
        with gr.Column():
            out_text_gr = gr.Text(label="Info")
            audio_gr = gr.Audio(label="Generated Audio", autoplay=True)
            ref_audio_gr = gr.Audio(label="Original Audio Used")

    # The input order must match predict()'s parameter order:
    # (prompt, style, audio_file_pth, voice_name, customer_email, order_name).
    tts_button.click(
        predict,
        [input_text_gr, style_gr, ref_gr, voice_name_gr, customer_email_gr, order_gr],
        outputs=[out_text_gr, audio_gr, ref_audio_gr],
    )

demo.queue()
demo.launch(debug=True, show_api=False, share=args.share)