xyzxyzxyz / app.py
CuddleBuddys's picture
Update app.py
aa25384 verified
import os
import torch
import argparse
import gradio as gr
from mailersend import emails
from dotenv import load_dotenv
import base64
import psycopg2
from urllib.parse import urlparse, parse_qs
import shutil
import boto3
from botocore.exceptions import NoCredentialsError
import json
from elevenlabs.client import ElevenLabs
from elevenlabs import play, save
# Load environment variables
print("Loading environment variables...")
load_dotenv()
print("Environment variables loaded.")
# Argument parsing
parser = argparse.ArgumentParser()
parser.add_argument("--share", action='store_true', default=False, help="make link public")
args = parser.parse_args()
print(f"Arguments parsed. Share mode is set to: {args.share}")
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")
output_dir = 'outputs'
samples_dir = 'samples'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(samples_dir, exist_ok=True)
print(f"Directories '{output_dir}' and '{samples_dir}' are ready.")
supported_languages = ['zh', 'en']
print(f"Supported languages: {supported_languages}")
# Load API keys and configurations
MAILERSEND_API_KEY = os.getenv("MAILERSEND_API_KEY")
MAILERSEND_DOMAIN = os.getenv("MAILERSEND_DOMAIN")
MAILERSEND_SENDER_EMAIL = f"noreply@{MAILERSEND_DOMAIN}"
MAILERSEND_SENDER_NAME = "Voice Clone App"
print("MailerSend configuration loaded.")
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")
client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
print("ElevenLabs client initialized.")
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION_NAME = os.getenv('AWS_REGION_NAME')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
print("AWS S3 configuration loaded.")
# List of blocked words
BLOCKED_WORDS = ['Kill','hurt','shoot','gun','rifle','AR','semi automatic','knife','blade','sword','punch harm','disrupt','blackmail','steal','bitch','cunt','fuck','freaking','nigger','nigga','niggas','cracker','jew','oriental','fag','faggot','account','money','transfer','urgent','help','scared','policy','frightened','accident','fear','scam','address','social security number','assault','injure','maim','destroy','damage','threaten','intimidate','bully','menace','blackmail','extort','exploit','defame','steal','rob','embezzle','defraud Harass','jerk','idiot','stupid','moron','asshole','con','trick','swindle','defraud','payment','credit card','bank account','urgent','immediate','afraid','phone number','email','password']
print(f"Blocked words list loaded with {len(BLOCKED_WORDS)} words.")
def get_blocked_words(text):
print("Checking for blocked words in the input text...")
# Split the text into words for accurate matching
words_in_text = text.lower().split()
# Find all blocked words present in the text
blocked_found = [word for word in BLOCKED_WORDS if word.lower() in words_in_text]
print(f"Blocked words found: {blocked_found}")
return blocked_found
# Function to check for blocked words
def contains_blocked_words(text):
return any(word.lower() in text.lower() for word in BLOCKED_WORDS)
# Function to send email with downloadable file using MailerSend
def send_email_with_file(recipient_email, file_path, subject, body):
print(f"Sending email to {recipient_email} with file {file_path}...")
try:
mailer = emails.NewEmail(MAILERSEND_API_KEY)
mail_body = {}
mail_from = {
"name": MAILERSEND_SENDER_NAME,
"email": MAILERSEND_SENDER_EMAIL,
}
recipients = [
{
"name": "Recipient",
"email": recipient_email,
}
]
mailer.set_mail_from(mail_from, mail_body)
mailer.set_mail_to(recipients, mail_body)
mailer.set_subject(subject, mail_body)
mailer.set_html_content(f"<p>{body}</p>", mail_body)
mailer.set_plaintext_content(body, mail_body)
with open(file_path, "rb") as file:
attachment_content = base64.b64encode(file.read()).decode('utf-8')
attachments = [
{
"filename": os.path.basename(file_path),
"content": attachment_content,
"disposition": "attachment"
}
]
mailer.set_attachments(attachments, mail_body)
response = mailer.send(mail_body)
print(f"MailerSend response: {response}")
if response[0] == 202:
print("Email sent successfully.")
return True
else:
print("Failed to send email.")
return False
except Exception as e:
print(f"Error sending email: {e}")
return False
# S3 upload functions
def upload_to_s3(local_file, bucket, s3_file):
print(f"Uploading {local_file} to S3 bucket {bucket} as {s3_file}...")
s3 = boto3.client('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION_NAME)
try:
s3.upload_file(local_file, bucket, s3_file, ExtraArgs={'ACL': 'public-read'})
print("File uploaded to S3 successfully.")
return True
except FileNotFoundError:
print("File not found.")
return False
except NoCredentialsError:
print("AWS credentials not available.")
return False
except Exception as e:
print(f"Error uploading to S3: {e}")
return False
def upload_voice_sample_and_metadata(sample_path, metadata, bucket):
print("Uploading voice sample and metadata to S3...")
# Upload the voice sample
sample_filename = os.path.basename(sample_path)
s3_sample_path = f'voice_samples/{sample_filename}'
if not upload_to_s3(sample_path, bucket, s3_sample_path):
print("Failed to upload voice sample.")
return False
# Create and upload metadata file
metadata['sample_s3_path'] = s3_sample_path
metadata_filename = f"{os.path.splitext(sample_filename)[0]}_metadata.json"
s3_metadata_path = f'voice_metadata/{metadata_filename}'
# Save metadata to a temporary file
temp_metadata_path = '/tmp/temp_metadata.json'
try:
with open(temp_metadata_path, 'w') as f:
json.dump(metadata, f)
print("Metadata file created.")
except Exception as e:
print(f"Error creating metadata file: {e}")
return False
# Upload metadata file
if not upload_to_s3(temp_metadata_path, bucket, s3_metadata_path):
print("Failed to upload metadata file.")
return False
# Clean up temporary file
try:
os.remove(temp_metadata_path)
print("Temporary metadata file removed.")
except Exception as e:
print(f"Error removing temporary file: {e}")
print("Voice sample and metadata uploaded successfully.")
return True
def predict(prompt, style, audio_file_pth, voice_name, customer_email, order_name):
print("Prediction started...")
text_hint = 'Your file will only be saved for 24 hours.\n'
if len(prompt) < 2:
text_hint += "[ERROR] Please provide a longer prompt text.\n"
print("Prompt is too short.")
return text_hint, None, None
if len(prompt) > 200:
text_hint += "[ERROR] Text length limited to 200 characters. Please try shorter text.\n"
print("Prompt is too long.")
return text_hint, None, None
blocked_words = get_blocked_words(prompt)
if blocked_words:
text_hint += f"[ERROR] Your text contains blocked words: {', '.join(blocked_words)}. Please remove them and try again.\n"
print(f"Blocked words detected: {blocked_words}")
return text_hint, None, None
# Check if audio file was uploaded
if audio_file_pth is None:
text_hint += "[ERROR] No audio file was uploaded. Please upload a reference audio file.\n"
print("No audio file uploaded.")
return text_hint, None, None
# Copy the sample audio to the samples directory
try:
sample_filename = f"{voice_name}_{customer_email}_sample.mp3"
sample_path = os.path.join(samples_dir, sample_filename)
shutil.copy2(audio_file_pth, sample_path)
print(f"Audio file copied to {sample_path}.")
except Exception as e:
text_hint += f"[ERROR] Failed to copy audio file: {str(e)}\n"
print(f"Error copying audio file: {e}")
return text_hint, None, None
# Prepare metadata
metadata = {
'name': voice_name,
'email': customer_email,
'order_name': order_name
}
print("Metadata prepared:", metadata)
# Use ElevenLabs API to clone the voice and generate audio
try:
full_voice_name = f"{voice_name}_{customer_email}"
print(f"Cloning voice with name: {full_voice_name}")
voice = client.clone(
name=full_voice_name,
description="A trial voice model for testing",
files=[sample_path],
)
print("Voice cloned successfully.")
print("Generating audio using the cloned voice...")
audio = client.generate(text=prompt, voice=voice)
output_audio_path = os.path.join(output_dir, f"{full_voice_name}_output.mp3")
save(audio, output_audio_path)
print(f"Audio generated and saved to {output_audio_path}.")
text_hint += "Audio generated successfully using ElevenLabs.\n"
except Exception as e:
text_hint += f"[ERROR] ElevenLabs API error: {e}\n"
print(f"ElevenLabs API error: {e}")
return text_hint, None, None
# Optionally upload to S3 (uncommented)
if not upload_voice_sample_and_metadata(sample_path, metadata, S3_BUCKET_NAME):
text_hint += "[ERROR] Failed to upload to S3.\n"
print("Failed to upload voice sample and metadata to S3.")
return text_hint, None, None
# Send email with the generated audio file
email_subject = "Your Voice Clone Audio is Ready"
email_body = f"Hi {voice_name},\n\nYour voice clone audio file is ready. Please find the attached file.\n\nBest regards,\nVoice Clone App"
if send_email_with_file(customer_email, output_audio_path, email_subject, email_body):
text_hint += "An email has been sent with your audio file.\n"
else:
text_hint += "An email has been sent with your audio file.\n"
print("Failed to send email with the audio file.")
return text_hint, output_audio_path, sample_path
with gr.Blocks(gr.themes.Glass()) as demo:
with gr.Row():
with gr.Column():
input_text_gr = gr.Textbox(
label="Create This",
info="One or two sentences at a time is better. Up to 200 text characters.",
value="He hoped there would be stew for dinner, turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick, peppered, flour-fattened sauce.",
)
style_gr = gr.Dropdown(
label="Style",
choices=['default', 'whispering', 'cheerful', 'terrified', 'angry', 'sad', 'friendly'],
info="Please upload a reference audio file that is at least 1 minute long. For best results, ensure the audio is clear.",
max_choices=1,
value="default",
)
ref_gr = gr.Audio(
label="Original Audio",
type="filepath",
sources=["upload"],
)
voice_name_gr = gr.Textbox(
label="Your name",
value="Sam"
)
order_gr = gr.Textbox(
label="Your order",
value="Sample Order",
)
customer_email_gr = gr.Textbox(
label="Your Email",
info="We'll send you a downloadable file to this email address."
)
tts_button = gr.Button("Start", elem_id="send-btn", visible=True)
print("Gradio input components initialized.")
with gr.Column():
out_text_gr = gr.Text(label="Info")
audio_gr = gr.Audio(label="Generated Audio", autoplay=True)
ref_audio_gr = gr.Audio(label="Original Audio Used")
print("Gradio output components initialized.")
tts_button.click(
predict,
[input_text_gr, style_gr, ref_gr, voice_name_gr, customer_email_gr, order_gr],
outputs=[out_text_gr, audio_gr, ref_audio_gr]
)
print("Gradio button click event bound to predict function.")
demo.queue()
print("Launching Gradio demo...")
demo.launch(debug=True, show_api=False, share=args.share)
print("Gradio demo launched.")
css = """
footer {visibility: hidden}
audio .btn-container {display: none}
"""
demo.add_css(css)
print("Custom CSS added to Gradio interface.")