Spaces:
Runtime error
Runtime error
| import os | |
| import torch | |
| import argparse | |
| import gradio as gr | |
| from mailersend import emails | |
| from dotenv import load_dotenv | |
| import base64 | |
| import psycopg2 | |
| from urllib.parse import urlparse, parse_qs | |
| import shutil | |
| import boto3 | |
| from botocore.exceptions import NoCredentialsError | |
| import json | |
| from elevenlabs.client import ElevenLabs | |
| from elevenlabs import play, save | |
| # Load environment variables | |
| print("Loading environment variables...") | |
| load_dotenv() | |
| print("Environment variables loaded.") | |
| # Argument parsing | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--share", action='store_true', default=False, help="make link public") | |
| args = parser.parse_args() | |
| print(f"Arguments parsed. Share mode is set to: {args.share}") | |
| device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
| print(f"Using device: {device}") | |
| output_dir = 'outputs' | |
| samples_dir = 'samples' | |
| os.makedirs(output_dir, exist_ok=True) | |
| os.makedirs(samples_dir, exist_ok=True) | |
| print(f"Directories '{output_dir}' and '{samples_dir}' are ready.") | |
| supported_languages = ['zh', 'en'] | |
| print(f"Supported languages: {supported_languages}") | |
| # Load API keys and configurations | |
| MAILERSEND_API_KEY = os.getenv("MAILERSEND_API_KEY") | |
| MAILERSEND_DOMAIN = os.getenv("MAILERSEND_DOMAIN") | |
| MAILERSEND_SENDER_EMAIL = f"noreply@{MAILERSEND_DOMAIN}" | |
| MAILERSEND_SENDER_NAME = "Voice Clone App" | |
| print("MailerSend configuration loaded.") | |
| ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY") | |
| client = ElevenLabs(api_key=ELEVENLABS_API_KEY) | |
| print("ElevenLabs client initialized.") | |
| AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID') | |
| AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY') | |
| AWS_REGION_NAME = os.getenv('AWS_REGION_NAME') | |
| S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME') | |
| print("AWS S3 configuration loaded.") | |
| # List of blocked words | |
| BLOCKED_WORDS = ['Kill','hurt','shoot','gun','rifle','AR','semi automatic','knife','blade','sword','punch harm','disrupt','blackmail','steal','bitch','cunt','fuck','freaking','nigger','nigga','niggas','cracker','jew','oriental','fag','faggot','account','money','transfer','urgent','help','scared','policy','frightened','accident','fear','scam','address','social security number','assault','injure','maim','destroy','damage','threaten','intimidate','bully','menace','blackmail','extort','exploit','defame','steal','rob','embezzle','defraud Harass','jerk','idiot','stupid','moron','asshole','con','trick','swindle','defraud','payment','credit card','bank account','urgent','immediate','afraid','phone number','email','password'] | |
| print(f"Blocked words list loaded with {len(BLOCKED_WORDS)} words.") | |
| def get_blocked_words(text): | |
| print("Checking for blocked words in the input text...") | |
| # Split the text into words for accurate matching | |
| words_in_text = text.lower().split() | |
| # Find all blocked words present in the text | |
| blocked_found = [word for word in BLOCKED_WORDS if word.lower() in words_in_text] | |
| print(f"Blocked words found: {blocked_found}") | |
| return blocked_found | |
| # Function to check for blocked words | |
| def contains_blocked_words(text): | |
| return any(word.lower() in text.lower() for word in BLOCKED_WORDS) | |
| # Function to send email with downloadable file using MailerSend | |
| def send_email_with_file(recipient_email, file_path, subject, body): | |
| print(f"Sending email to {recipient_email} with file {file_path}...") | |
| try: | |
| mailer = emails.NewEmail(MAILERSEND_API_KEY) | |
| mail_body = {} | |
| mail_from = { | |
| "name": MAILERSEND_SENDER_NAME, | |
| "email": MAILERSEND_SENDER_EMAIL, | |
| } | |
| recipients = [ | |
| { | |
| "name": "Recipient", | |
| "email": recipient_email, | |
| } | |
| ] | |
| mailer.set_mail_from(mail_from, mail_body) | |
| mailer.set_mail_to(recipients, mail_body) | |
| mailer.set_subject(subject, mail_body) | |
| mailer.set_html_content(f"<p>{body}</p>", mail_body) | |
| mailer.set_plaintext_content(body, mail_body) | |
| with open(file_path, "rb") as file: | |
| attachment_content = base64.b64encode(file.read()).decode('utf-8') | |
| attachments = [ | |
| { | |
| "filename": os.path.basename(file_path), | |
| "content": attachment_content, | |
| "disposition": "attachment" | |
| } | |
| ] | |
| mailer.set_attachments(attachments, mail_body) | |
| response = mailer.send(mail_body) | |
| print(f"MailerSend response: {response}") | |
| if response[0] == 202: | |
| print("Email sent successfully.") | |
| return True | |
| else: | |
| print("Failed to send email.") | |
| return False | |
| except Exception as e: | |
| print(f"Error sending email: {e}") | |
| return False | |
| # S3 upload functions | |
| def upload_to_s3(local_file, bucket, s3_file): | |
| print(f"Uploading {local_file} to S3 bucket {bucket} as {s3_file}...") | |
| s3 = boto3.client('s3', | |
| aws_access_key_id=AWS_ACCESS_KEY_ID, | |
| aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
| region_name=AWS_REGION_NAME) | |
| try: | |
| s3.upload_file(local_file, bucket, s3_file, ExtraArgs={'ACL': 'public-read'}) | |
| print("File uploaded to S3 successfully.") | |
| return True | |
| except FileNotFoundError: | |
| print("File not found.") | |
| return False | |
| except NoCredentialsError: | |
| print("AWS credentials not available.") | |
| return False | |
| except Exception as e: | |
| print(f"Error uploading to S3: {e}") | |
| return False | |
| def upload_voice_sample_and_metadata(sample_path, metadata, bucket): | |
| print("Uploading voice sample and metadata to S3...") | |
| # Upload the voice sample | |
| sample_filename = os.path.basename(sample_path) | |
| s3_sample_path = f'voice_samples/{sample_filename}' | |
| if not upload_to_s3(sample_path, bucket, s3_sample_path): | |
| print("Failed to upload voice sample.") | |
| return False | |
| # Create and upload metadata file | |
| metadata['sample_s3_path'] = s3_sample_path | |
| metadata_filename = f"{os.path.splitext(sample_filename)[0]}_metadata.json" | |
| s3_metadata_path = f'voice_metadata/{metadata_filename}' | |
| # Save metadata to a temporary file | |
| temp_metadata_path = '/tmp/temp_metadata.json' | |
| try: | |
| with open(temp_metadata_path, 'w') as f: | |
| json.dump(metadata, f) | |
| print("Metadata file created.") | |
| except Exception as e: | |
| print(f"Error creating metadata file: {e}") | |
| return False | |
| # Upload metadata file | |
| if not upload_to_s3(temp_metadata_path, bucket, s3_metadata_path): | |
| print("Failed to upload metadata file.") | |
| return False | |
| # Clean up temporary file | |
| try: | |
| os.remove(temp_metadata_path) | |
| print("Temporary metadata file removed.") | |
| except Exception as e: | |
| print(f"Error removing temporary file: {e}") | |
| print("Voice sample and metadata uploaded successfully.") | |
| return True | |
| def predict(prompt, style, audio_file_pth, voice_name, customer_email, order_name): | |
| print("Prediction started...") | |
| text_hint = 'Your file will only be saved for 24 hours.\n' | |
| if len(prompt) < 2: | |
| text_hint += "[ERROR] Please provide a longer prompt text.\n" | |
| print("Prompt is too short.") | |
| return text_hint, None, None | |
| if len(prompt) > 200: | |
| text_hint += "[ERROR] Text length limited to 200 characters. Please try shorter text.\n" | |
| print("Prompt is too long.") | |
| return text_hint, None, None | |
| blocked_words = get_blocked_words(prompt) | |
| if blocked_words: | |
| text_hint += f"[ERROR] Your text contains blocked words: {', '.join(blocked_words)}. Please remove them and try again.\n" | |
| print(f"Blocked words detected: {blocked_words}") | |
| return text_hint, None, None | |
| # Check if audio file was uploaded | |
| if audio_file_pth is None: | |
| text_hint += "[ERROR] No audio file was uploaded. Please upload a reference audio file.\n" | |
| print("No audio file uploaded.") | |
| return text_hint, None, None | |
| # Copy the sample audio to the samples directory | |
| try: | |
| sample_filename = f"{voice_name}_{customer_email}_sample.mp3" | |
| sample_path = os.path.join(samples_dir, sample_filename) | |
| shutil.copy2(audio_file_pth, sample_path) | |
| print(f"Audio file copied to {sample_path}.") | |
| except Exception as e: | |
| text_hint += f"[ERROR] Failed to copy audio file: {str(e)}\n" | |
| print(f"Error copying audio file: {e}") | |
| return text_hint, None, None | |
| # Prepare metadata | |
| metadata = { | |
| 'name': voice_name, | |
| 'email': customer_email, | |
| 'order_name': order_name | |
| } | |
| print("Metadata prepared:", metadata) | |
| # Use ElevenLabs API to clone the voice and generate audio | |
| try: | |
| full_voice_name = f"{voice_name}_{customer_email}" | |
| print(f"Cloning voice with name: {full_voice_name}") | |
| voice = client.clone( | |
| name=full_voice_name, | |
| description="A trial voice model for testing", | |
| files=[sample_path], | |
| ) | |
| print("Voice cloned successfully.") | |
| print("Generating audio using the cloned voice...") | |
| audio = client.generate(text=prompt, voice=voice) | |
| output_audio_path = os.path.join(output_dir, f"{full_voice_name}_output.mp3") | |
| save(audio, output_audio_path) | |
| print(f"Audio generated and saved to {output_audio_path}.") | |
| text_hint += "Audio generated successfully using ElevenLabs.\n" | |
| except Exception as e: | |
| text_hint += f"[ERROR] ElevenLabs API error: {e}\n" | |
| print(f"ElevenLabs API error: {e}") | |
| return text_hint, None, None | |
| # Optionally upload to S3 (uncommented) | |
| if not upload_voice_sample_and_metadata(sample_path, metadata, S3_BUCKET_NAME): | |
| text_hint += "[ERROR] Failed to upload to S3.\n" | |
| print("Failed to upload voice sample and metadata to S3.") | |
| return text_hint, None, None | |
| # Send email with the generated audio file | |
| email_subject = "Your Voice Clone Audio is Ready" | |
| email_body = f"Hi {voice_name},\n\nYour voice clone audio file is ready. Please find the attached file.\n\nBest regards,\nVoice Clone App" | |
| if send_email_with_file(customer_email, output_audio_path, email_subject, email_body): | |
| text_hint += "An email has been sent with your audio file.\n" | |
| else: | |
| text_hint += "An email has been sent with your audio file.\n" | |
| print("Failed to send email with the audio file.") | |
| return text_hint, output_audio_path, sample_path | |
| with gr.Blocks(gr.themes.Glass()) as demo: | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_text_gr = gr.Textbox( | |
| label="Create This", | |
| info="One or two sentences at a time is better. Up to 200 text characters.", | |
| value="He hoped there would be stew for dinner, turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick, peppered, flour-fattened sauce.", | |
| ) | |
| style_gr = gr.Dropdown( | |
| label="Style", | |
| choices=['default', 'whispering', 'cheerful', 'terrified', 'angry', 'sad', 'friendly'], | |
| info="Please upload a reference audio file that is at least 1 minute long. For best results, ensure the audio is clear.", | |
| max_choices=1, | |
| value="default", | |
| ) | |
| ref_gr = gr.Audio( | |
| label="Original Audio", | |
| type="filepath", | |
| sources=["upload"], | |
| ) | |
| voice_name_gr = gr.Textbox( | |
| label="Your name", | |
| value="Sam" | |
| ) | |
| order_gr = gr.Textbox( | |
| label="Your order", | |
| value="Sample Order", | |
| ) | |
| customer_email_gr = gr.Textbox( | |
| label="Your Email", | |
| info="We'll send you a downloadable file to this email address." | |
| ) | |
| tts_button = gr.Button("Start", elem_id="send-btn", visible=True) | |
| print("Gradio input components initialized.") | |
| with gr.Column(): | |
| out_text_gr = gr.Text(label="Info") | |
| audio_gr = gr.Audio(label="Generated Audio", autoplay=True) | |
| ref_audio_gr = gr.Audio(label="Original Audio Used") | |
| print("Gradio output components initialized.") | |
| tts_button.click( | |
| predict, | |
| [input_text_gr, style_gr, ref_gr, voice_name_gr, customer_email_gr, order_gr], | |
| outputs=[out_text_gr, audio_gr, ref_audio_gr] | |
| ) | |
| print("Gradio button click event bound to predict function.") | |
| demo.queue() | |
| print("Launching Gradio demo...") | |
| demo.launch(debug=True, show_api=False, share=args.share) | |
| print("Gradio demo launched.") | |
| css = """ | |
| footer {visibility: hidden} | |
| audio .btn-container {display: none} | |
| """ | |
| demo.add_css(css) | |
| print("Custom CSS added to Gradio interface.") | |