Persian-TTS-sherpa / app_utils.py
karim23657's picture
Update app_utils.py
4e7459e verified
import os
import json
import shutil
import subprocess
import requests
import tarfile
from pathlib import Path
import soundfile as sf
import sherpa_onnx
import numpy as np
models = [
['mms fa','https://huggingface.co/willwade/mms-tts-multilingual-models-onnx/resolve/main/fas',"🌠 راد",'https://huggingface.co/facebook/mms-tts-fas'],
['coqui-vits-female1-karim23657','https://huggingface.co/karim23657/persian-tts-vits/tree/main/persian-tts-female1-vits-coqui',"🌺 نگار",'https://huggingface.co/Kamtera/persian-tts-female1-vits'],
['coqui-vits-male1-karim23657','https://huggingface.co/karim23657/persian-tts-vits/tree/main/persian-tts-male1-vits-coqui',"🌟 آرش",'https://huggingface.co/Kamtera/persian-tts-male1-vits'],
['coqui-vits-male-karim23657','https://huggingface.co/karim23657/persian-tts-vits/tree/main/male-male-coqui-vits',"🦁 کیان",'https://huggingface.co/Kamtera/persian-tts-male-vits'],
['coqui-vits-female-karim23657','https://huggingface.co/karim23657/persian-tts-vits/tree/main/female-female-coqui-vits',"🌷 مهتاب",'https://huggingface.co/Kamtera/persian-tts-female-vits'],
['coqui-vits-female-GPTInformal-karim23657','https://huggingface.co/karim23657/persian-tts-vits/tree/main/female-GPTInformal-coqui-vits',"🌼 شیوا",'https://huggingface.co/karim23657/persian-tts-female-GPTInformal-Persian-vits'],
['coqui-vits-male-SmartGitiCorp','https://huggingface.co/karim23657/persian-tts-vits/tree/main/male-SmartGitiCorp-coqui-vits',"🚀 بهمن",'https://huggingface.co/SmartGitiCorp/persian_tts_vits'],
['vits-piper-fa-ganji','https://huggingface.co/karim23657/persian-tts-vits/tree/main/vits-piper-fa-ganji',"🚀 برنا",'https://huggingface.co/SadeghK/persian-text-to-speech'],
['vits-piper-fa-ganji-adabi','https://huggingface.co/karim23657/persian-tts-vits/tree/main/vits-piper-fa-ganji-adabi',"🚀 برنا-1",'https://huggingface.co/SadeghK/persian-text-to-speech'],
['vits-piper-fa-gyro-medium','https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_IR-gyro-medium.tar.bz2',"💧 نیما",'https://huggingface.co/gyroing/Persian-Piper-Model-gyro'],
['piper-fa-amir-medium','https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_IR-amir-medium.tar.bz2',"⚡️ آریا",'https://huggingface.co/SadeghK/persian-text-to-speech'],
['vits-mimic3-fa-haaniye_low','https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-mimic3-fa-haaniye_low.tar.bz2',"🌹 ریما",'https://github.com/MycroftAI/mimic3'],
['vits-piper-fa_en-rezahedayatfar-ibrahimwalk-medium','https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/vits-piper-fa_en-rezahedayatfar-ibrahimwalk-medium.tar.bz2',"🌠 پیام",'https://huggingface.co/mah92/persian-english-piper-tts-model'],
]
def download_and_extract_model(url, destination):
"""Download and extract the model files."""
print(f"Downloading from URL: {url}")
print(f"Destination: {destination}")
# Convert Hugging Face URL format if needed
if "huggingface.co" in url:
# Replace /tree/main/ with /resolve/main/ for direct file download
base_url = url.replace("/tree/main/", "/resolve/main/")
model_id = base_url.split("/")[-1]
# Check if this is an MMS model
is_mms_model = True
if is_mms_model:
# MMS models have both model.onnx and tokens.txt
model_url = f"{base_url}/model.onnx"
tokens_url = f"{base_url}/tokens.txt"
# Download model.onnx
print("Downloading model.onnx...")
model_path = os.path.join(destination, "model.onnx")
response = requests.get(model_url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download model from {model_url}. Status code: {response.status_code}")
total_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
print(f"Total size: {total_size / (1024*1024):.1f} MB")
with open(model_path, "wb") as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = int((downloaded / total_size) * 100)
if percent % 10 == 0:
print(f" {percent}%", end="", flush=True)
print("\nModel download complete")
# Download tokens.txt
print("Downloading tokens.txt...")
tokens_path = os.path.join(destination, "tokens.txt")
response = requests.get(tokens_url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download tokens from {tokens_url}. Status code: {response.status_code}")
with open(tokens_path, "wb") as f:
f.write(response.content)
print("Tokens download complete")
return
else:
# Other models are stored as tar.bz2 files
url = f"{base_url}.tar.bz2"
# Try the URL
response = requests.get(url, stream=True)
if response.status_code != 200:
raise Exception(f"Failed to download model from {url}. Status code: {response.status_code}")
# Check if this is a Git LFS file pointer
content_start = response.content[:100].decode('utf-8', errors='ignore')
if content_start.startswith('version https://git-lfs.github.com/spec/v1'):
raise Exception(f"Received Git LFS pointer instead of file content from {url}")
# Create model directory if it doesn't exist
os.makedirs(destination, exist_ok=True)
# For non-MMS models, handle tar.bz2 files
tar_path = os.path.join(destination, "model.tar.bz2")
# Download the file
print("Downloading model archive...")
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
print(f"Total size: {total_size / (1024*1024):.1f} MB")
with open(tar_path, "wb") as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = int((downloaded / total_size) * 100)
if percent % 10 == 0:
print(f" {percent}%", end="", flush=True)
print("\nDownload complete")
# Extract the tar.bz2 file
print(f"Extracting {tar_path} to {destination}")
try:
with tarfile.open(tar_path, "r:bz2") as tar:
tar.extractall(path=destination)
os.remove(tar_path)
print("Extraction complete")
except Exception as e:
print(f"Error during extraction: {str(e)}")
raise
print("Contents of destination directory:")
for root, dirs, files in os.walk(destination):
print(f"\nDirectory: {root}")
if dirs:
print(" Subdirectories:", dirs)
if files:
print(" Files:", files)
def dl_espeak_data():
# Download the file
tar_path='espeak-ng-data.tar.bz2'
print("Downloading model archive...")
response = requests.get('https://github.com/k2-fsa/sherpa-onnx/releases/download/tts-models/espeak-ng-data.tar.bz2', stream=True)
total_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
print(f"Total size: {total_size / (1024*1024):.1f} MB")
with open(tar_path, "wb") as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = int((downloaded / total_size) * 100)
if percent % 10 == 0:
print(f" {percent}%", end="", flush=True)
print("\nDownload complete")
# Extract the tar.bz2 file
destination=os.path.dirname(os.path.abspath(__file__))
print(f"Extracting {tar_path} to {destination}")
try:
with tarfile.open(tar_path, "r:bz2") as tar:
tar.extractall(path=destination)
os.remove(tar_path)
print("Extraction complete")
except Exception as e:
print(f"Error during extraction: {str(e)}")
raise
print("Contents of destination directory:")
for root, dirs, files in os.walk(destination):
print(f"\nDirectory: {root}")
if dirs:
print(" Subdirectories:", dirs)
if files:
print(" Files:", files)
dl_espeak_data()
def find_model_files(model_dir):
"""Find model files in the given directory and its subdirectories."""
model_files = {}
# Check if this is an MMS model
is_mms = True
for root, _, files in os.walk(model_dir):
for file in files:
file_path = os.path.join(root, file)
# Model file
if file.endswith('.onnx'):
model_files['model'] = file_path
# Tokens file
elif file == 'tokens.txt':
model_files['tokens'] = file_path
# Lexicon file (only for non-MMS models)
elif file == 'lexicon.txt' and not is_mms:
model_files['lexicon'] = file_path
# Create empty lexicon file if needed (only for non-MMS models)
if not is_mms and 'model' in model_files and 'lexicon' not in model_files:
model_dir = os.path.dirname(model_files['model'])
lexicon_path = os.path.join(model_dir, 'lexicon.txt')
with open(lexicon_path, 'w', encoding='utf-8') as f:
pass # Create empty file
model_files['lexicon'] = lexicon_path
return model_files if 'model' in model_files else {}
def generate_audio(text, model_info):
"""Generate audio from text using the specified model."""
try:
model_dir = os.path.join("./models", model_info)
print(f"\nLooking for model in: {model_dir}")
# Download model if it doesn't exist
if not os.path.exists(model_dir):
print(f"Model directory doesn't exist, downloading {model_info}...")
os.makedirs(model_dir, exist_ok=True)
for i in models:
if model_info == i[2]:
model_url=i[1]
download_and_extract_model(model_url, model_dir)
print(f"Contents of {model_dir}:")
for item in os.listdir(model_dir):
item_path = os.path.join(model_dir, item)
if os.path.isdir(item_path):
print(f" Directory: {item}")
print(f" Contents: {os.listdir(item_path)}")
else:
print(f" File: {item}")
# Find and validate model files
model_files = find_model_files(model_dir)
if not model_files or 'model' not in model_files:
raise ValueError(f"Could not find required model files in {model_dir}")
print("\nFound model files:")
print(f"Model: {model_files['model']}")
print(f"Tokens: {model_files.get('tokens', 'Not found')}")
print(f"Lexicon: {model_files.get('lexicon', 'Not required for MMS')}\n")
# Check if this is an MMS model
is_mms = 'mms' in os.path.basename(model_dir).lower()
# Create configuration based on model type
if is_mms:
if 'tokens' not in model_files or not os.path.exists(model_files['tokens']):
raise ValueError("tokens.txt is required for MMS models")
# MMS models use tokens.txt and no lexicon
vits_config = sherpa_onnx.OfflineTtsVitsModelConfig(
model_files['model'], # model
'', # lexicon
model_files['tokens'], # tokens
'', # data_dir
'', # dict_dir
0.667, # noise_scale
0.8, # noise_scale_w
1.0 # length_scale
)
else:
# Non-MMS models use lexicon.txt
if 'tokens' not in model_files or not os.path.exists(model_files['tokens']):
raise ValueError("tokens.txt is required for VITS models")
# Set data dir if it exists
espeak_data = os.path.join(os.path.dirname(model_files['model']), 'espeak-ng-data')
data_dir = espeak_data if os.path.exists(espeak_data) else 'espeak-ng-data'
# Get lexicon path if it exists
lexicon = model_files.get('lexicon', '') if os.path.exists(model_files.get('lexicon', '')) else ''
# Create VITS model config
vits_config = sherpa_onnx.OfflineTtsVitsModelConfig(
model_files['model'], # model
lexicon, # lexicon
model_files['tokens'], # tokens
data_dir, # data_dir
'', # dict_dir
0.667, # noise_scale
0.8, # noise_scale_w
1.0 # length_scale
)
# Create the model config with VITS
model_config = sherpa_onnx.OfflineTtsModelConfig()
model_config.vits = vits_config
# Create TTS configuration
config = sherpa_onnx.OfflineTtsConfig(
model=model_config,
max_num_sentences=2
)
# Initialize TTS engine
tts = sherpa_onnx.OfflineTts(config)
# Generate audio
audio_data = tts.generate(text)
# Ensure we have valid audio data
if audio_data is None or len(audio_data.samples) == 0:
raise ValueError("Failed to generate audio - no data generated")
# Convert samples list to numpy array and normalize
audio_array = np.array(audio_data.samples, dtype=np.float32)
if np.any(audio_array): # Check if array is not all zeros
audio_array = audio_array / np.abs(audio_array).max()
else:
raise ValueError("Generated audio is empty")
# Return in Gradio's expected format (numpy array, sample rate)
return (audio_array, audio_data.sample_rate)
except Exception as e:
error_msg = str(e)
# Check for OOV or token conversion errors
if "out of vocabulary" in error_msg.lower() or "token" in error_msg.lower():
error_msg = f"Text contains unsupported characters: {error_msg}"
print(f"Error generating audio: {error_msg}")
print(f"Error in TTS generation: {error_msg}")
raise
def tts_interface(selected_model, text, status_output):
try:
if not text.strip():
return None, "Please enter some text"
model_id = selected_model
# Store original text for status message
original_text = text
try:
# Update status with language info
voice_name = model_id
status = f"Generating speech using {voice_name} ..."
# Generate audio
audio_data, sample_rate = generate_audio(text, model_id)
# Include translation info in final status if text was actually translated
final_status = f"Generated speech using {voice_name}"
final_status += f"\nText: '{text}'"
return (sample_rate, audio_data), final_status
except ValueError as e:
# Handle known errors with user-friendly messages
error_msg = str(e)
if "cannot process some words" in error_msg.lower():
return None, error_msg
return None, f"Error: {error_msg}"
except Exception as e:
print(f"Error in TTS generation: {str(e)}")
error_msg = str(e)
return None, f"Error: {error_msg}"