Spaces:
Runtime error
Runtime error
import os | |
import shutil | |
import subprocess | |
import re | |
from pydub import AudioSegment | |
import tempfile | |
from tqdm import tqdm | |
import gradio as gr | |
import nltk | |
import ebooklib | |
import bs4 | |
from ebooklib import epub | |
from bs4 import BeautifulSoup | |
from nltk.tokenize import sent_tokenize | |
import csv | |
import argparse | |
import threading | |
import logging | |
from datetime import datetime | |
import time | |
import json | |
# Setup logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler("audiobook_converter.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger("audiobook_converter") | |
# Download NLTK resources if not already present | |
try: | |
nltk.data.find('tokenizers/punkt') | |
except LookupError: | |
logger.info("Downloading NLTK punkt tokenizer...") | |
nltk.download('punkt', quiet=True) | |
# Utility functions for directory management | |
def ensure_directory(directory_path): | |
"""Create directory if it doesn't exist.""" | |
if not os.path.exists(directory_path): | |
os.makedirs(directory_path) | |
logger.info(f"Created directory: {directory_path}") | |
return directory_path | |
def remove_directory(folder_path): | |
"""Remove directory and all its contents.""" | |
if os.path.exists(folder_path): | |
try: | |
shutil.rmtree(folder_path) | |
logger.info(f"Removed directory: {folder_path}") | |
except Exception as e: | |
logger.error(f"Error removing directory {folder_path}: {e}") | |
def wipe_directory(folder_path): | |
"""Remove all contents of a directory without deleting the directory itself.""" | |
if not os.path.exists(folder_path): | |
logger.warning(f"Directory does not exist: {folder_path}") | |
return | |
for item in os.listdir(folder_path): | |
item_path = os.path.join(folder_path, item) | |
try: | |
if os.path.isfile(item_path): | |
os.remove(item_path) | |
elif os.path.isdir(item_path): | |
shutil.rmtree(item_path) | |
except Exception as e: | |
logger.error(f"Error removing {item_path}: {e}") | |
logger.info(f"Wiped contents of directory: {folder_path}") | |
# Text processing functions | |
def clean_text(text): | |
"""Clean up text by removing unnecessary whitespace and fixing common issues.""" | |
# Replace multiple newlines with a single one | |
text = re.sub(r'\n\s*\n', '\n\n', text) | |
# Replace multiple spaces with a single space | |
text = re.sub(r' +', ' ', text) | |
# Fix broken sentences (e.g., "word . Next" -> "word. Next") | |
text = re.sub(r'(\w) \. (\w)', r'\1. \2', text) | |
return text.strip() | |
def split_into_natural_sentences(text): | |
"""Split text into natural sentences using NLTK with additional rules.""" | |
# Initial sentence splitting | |
sentences = sent_tokenize(text) | |
# Post-process sentences to handle special cases | |
processed_sentences = [] | |
buffer = "" | |
for sent in sentences: | |
# Handle quotes that span multiple sentences but should be treated as one | |
if buffer: | |
current = buffer + " " + sent | |
buffer = "" | |
else: | |
current = sent | |
# Check for unbalanced quotes, which might indicate a continuing sentence | |
if current.count('"') % 2 != 0 or current.count("'") % 2 != 0: | |
buffer = current | |
continue | |
# Check if sentence ends with abbreviation or is too short (might be a continuation) | |
if len(current) < 20 and not re.search(r'[.!?]\s*$', current): | |
buffer = current | |
continue | |
processed_sentences.append(current) | |
# Add any remaining buffer | |
if buffer: | |
processed_sentences.append(buffer) | |
return processed_sentences | |
# eBook processing functions | |
def extract_metadata_and_cover(ebook_path): | |
"""Extract metadata and cover image from an ebook.""" | |
cover_path = ebook_path.rsplit('.', 1)[0] + '.jpg' | |
try: | |
subprocess.run(['ebook-meta', ebook_path, '--get-cover', cover_path], | |
check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) | |
# Check if cover was extracted | |
if os.path.exists(cover_path) and os.path.getsize(cover_path) > 0: | |
logger.info(f"Cover extracted to: {cover_path}") | |
return cover_path | |
else: | |
logger.warning("Cover extraction failed or resulted in empty file") | |
return None | |
except Exception as e: | |
logger.error(f"Error extracting eBook metadata: {e}") | |
return None | |
def convert_to_epub(input_path, output_path): | |
"""Convert any ebook format to EPUB using Calibre.""" | |
try: | |
logger.info(f"Converting {input_path} to EPUB format...") | |
result = subprocess.run( | |
['ebook-convert', input_path, output_path, '--enable-heuristics'], | |
check=True, | |
stderr=subprocess.PIPE, | |
stdout=subprocess.PIPE | |
) | |
logger.info(f"Successfully converted to EPUB: {output_path}") | |
return True | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Error converting to EPUB: {e}") | |
logger.error(f"STDERR: {e.stderr.decode('utf-8', errors='replace')}") | |
return False | |
def convert_to_text(input_path, output_path): | |
"""Convert any ebook format directly to TXT using Calibre.""" | |
try: | |
logger.info(f"Converting {input_path} to TXT format...") | |
result = subprocess.run( | |
['ebook-convert', input_path, output_path, | |
'--enable-heuristics', | |
'--chapter-mark=pagebreak', | |
'--paragraph-type=unformatted'], | |
check=True, | |
stderr=subprocess.PIPE, | |
stdout=subprocess.PIPE | |
) | |
logger.info(f"Successfully converted to TXT: {output_path}") | |
return True | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Error converting to TXT: {e}") | |
logger.error(f"STDERR: {e.stderr.decode('utf-8', errors='replace')}") | |
return False | |
def detect_chapters_from_text(text_path): | |
"""Detect chapters in a text file based on common patterns.""" | |
with open(text_path, 'r', encoding='utf-8', errors='replace') as f: | |
content = f.read() | |
# Different chapter detection patterns | |
chapter_patterns = [ | |
r'(?:^|\n)(?:\s*)(?:Chapter|CHAPTER)\s+[0-9IVXLCDM]+(?:\s*:|\.\s|\s)(.+?)(?=\n)', | |
r'(?:^|\n)(?:\s*)(?:Chapter|CHAPTER)\s+[0-9IVXLCDM]+(?:\s*:|\.\s|\s)', | |
r'(?:^|\n)(?:\s*)(?:[0-9]+|[IVXLCDM]+)\.?\s+(.+?)(?=\n)', | |
r'(?:^|\n)(?:\s*)\* \* \*(?:\s*\n)', | |
r'(?:^|\n)(?:\s*)[-—]\s*(\d+\s*[-—]|\w+)(?:\s*\n)' | |
] | |
chapters = [] | |
for pattern in chapter_patterns: | |
matches = re.finditer(pattern, content, re.MULTILINE) | |
positions = [(m.start(), m.group()) for m in matches] | |
if positions: | |
# If we found chapters with this pattern, add to our list | |
for i, (pos, title) in enumerate(positions): | |
end_pos = positions[i+1][0] if i < len(positions)-1 else len(content) | |
chapter_text = content[pos:end_pos].strip() | |
chapters.append((i+1, clean_text(chapter_text))) | |
# If we found chapters with this pattern, stop looking | |
if len(chapters) > 3: # Require at least 3 chapters for a valid detection | |
break | |
# If no chapters detected, create artificial chapters based on length | |
if not chapters: | |
logger.info("No clear chapter markers found, creating artificial chapters") | |
chunk_size = min(10000, max(5000, len(content) // 20)) # Aim for ~20 chapters | |
# Split content into chunks | |
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] | |
chapters = [(i+1, clean_text(chunk)) for i, chunk in enumerate(chunks)] | |
return chapters | |
def save_chapters_as_files(chapters, output_dir): | |
"""Save detected chapters as individual text files.""" | |
ensure_directory(output_dir) | |
for chapter_num, chapter_text in chapters: | |
filename = f"chapter_{chapter_num:03d}.txt" | |
filepath = os.path.join(output_dir, filename) | |
with open(filepath, 'w', encoding='utf-8') as f: | |
f.write(chapter_text) | |
logger.info(f"Saved chapter {chapter_num} to {filename}") | |
return len(chapters) | |
def process_ebook_to_chapters(ebook_path, chapters_dir): | |
"""Process ebook into chapter text files.""" | |
ensure_directory(chapters_dir) | |
# Create temp directory for intermediate files | |
temp_dir = os.path.join(os.path.dirname(chapters_dir), "temp") | |
ensure_directory(temp_dir) | |
# Determine file paths | |
temp_epub = os.path.join(temp_dir, "converted.epub") | |
temp_txt = os.path.join(temp_dir, "converted.txt") | |
# First try direct conversion to text | |
if convert_to_text(ebook_path, temp_txt): | |
chapters = detect_chapters_from_text(temp_txt) | |
num_chapters = save_chapters_as_files(chapters, chapters_dir) | |
logger.info(f"Processed {num_chapters} chapters from text conversion") | |
return num_chapters | |
# If that fails, try EPUB conversion first | |
logger.info("Direct text conversion failed, trying via EPUB...") | |
if convert_to_epub(ebook_path, temp_epub) and convert_to_text(temp_epub, temp_txt): | |
chapters = detect_chapters_from_text(temp_txt) | |
num_chapters = save_chapters_as_files(chapters, chapters_dir) | |
logger.info(f"Processed {num_chapters} chapters via EPUB conversion") | |
return num_chapters | |
# If both methods fail, return 0 chapters | |
logger.error("Failed to process ebook into chapters") | |
return 0 | |
# Audio processing functions | |
def sanitize_for_espeak(text): | |
"""Sanitize text for espeak compatibility.""" | |
# Replace problematic characters | |
text = re.sub(r'[–—]', '-', text) # Em/en dashes to hyphens | |
text = re.sub(r'["""]', '"', text) # Smart quotes to regular quotes | |
text = re.sub(r'[''`]', "'", text) # Smart apostrophes to regular apostrophes | |
text = re.sub(r'[…]', '...', text) # Ellipsis character to three dots | |
# Remove or replace other problematic characters | |
text = re.sub(r'[<>|]', ' ', text) | |
text = re.sub(r'[\x00-\x1F\x7F]', '', text) # Control characters | |
return text | |
def convert_text_to_speech(text, output_path, voice="en", speed=170, pitch=50, gap=5): | |
"""Convert text to speech using espeak-ng.""" | |
try: | |
# Create a temporary file for the text | |
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', suffix='.txt', delete=False) as temp_file: | |
temp_file.write(sanitize_for_espeak(text)) | |
temp_file_path = temp_file.name | |
# Call espeak-ng with the text file | |
subprocess.run([ | |
"espeak-ng", | |
"-v", voice, | |
"-f", temp_file_path, | |
"-w", output_path, | |
f"-s{speed}", | |
f"-p{pitch}", | |
f"-g{gap}" | |
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) | |
# Remove the temporary file | |
os.unlink(temp_file_path) | |
# Verify the output file exists and has content | |
if os.path.exists(output_path) and os.path.getsize(output_path) > 0: | |
return True | |
else: | |
logger.error(f"Speech synthesis produced empty file: {output_path}") | |
return False | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Error in speech synthesis: {e}") | |
logger.error(f"STDERR: {e.stderr.decode('utf-8', errors='replace')}") | |
return False | |
except Exception as e: | |
logger.error(f"Unexpected error in speech synthesis: {e}") | |
return False | |
def convert_chapters_to_audio(chapters_dir, output_audio_dir, voice="en", speed=170, pitch=50, gap=5, progress_callback=None): | |
"""Convert all chapter text files to audio files.""" | |
ensure_directory(output_audio_dir) | |
# Get all chapter files | |
chapter_files = [f for f in os.listdir(chapters_dir) if f.startswith('chapter_') and f.endswith('.txt')] | |
chapter_files.sort(key=lambda f: int(re.search(r'chapter_(\d+)', f).group(1))) | |
total_chapters = len(chapter_files) | |
processed_chapters = 0 | |
failed_chapters = 0 | |
for chapter_file in chapter_files: | |
chapter_path = os.path.join(chapters_dir, chapter_file) | |
chapter_num = int(re.search(r'chapter_(\d+)', chapter_file).group(1)) | |
output_file = os.path.join(output_audio_dir, f"audio_chapter_{chapter_num:03d}.wav") | |
logger.info(f"Converting chapter {chapter_num} to audio...") | |
# Read the chapter text | |
with open(chapter_path, 'r', encoding='utf-8', errors='replace') as f: | |
chapter_text = f.read() | |
# Split into sentences for better processing | |
sentences = split_into_natural_sentences(chapter_text) | |
combined_audio = AudioSegment.empty() | |
# Process each sentence | |
for i, sentence in enumerate(sentences): | |
if not sentence.strip(): | |
continue | |
temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) | |
temp_wav.close() | |
try: | |
# Convert sentence to speech | |
if convert_text_to_speech(sentence, temp_wav.name, voice, speed, pitch, gap): | |
sentence_audio = AudioSegment.from_wav(temp_wav.name) | |
combined_audio += sentence_audio | |
# Add a small pause between sentences | |
combined_audio += AudioSegment.silent(duration=50) | |
else: | |
logger.warning(f"Failed to convert sentence in chapter {chapter_num}: {sentence[:50]}...") | |
except Exception as e: | |
logger.error(f"Error processing sentence: {e}") | |
finally: | |
# Clean up temporary file | |
if os.path.exists(temp_wav.name): | |
os.unlink(temp_wav.name) | |
# Export the combined audio for this chapter | |
if len(combined_audio) > 0: | |
combined_audio.export(output_file, format='wav') | |
logger.info(f"Saved audio for chapter {chapter_num}") | |
processed_chapters += 1 | |
else: | |
logger.error(f"No audio generated for chapter {chapter_num}") | |
failed_chapters += 1 | |
# Update progress | |
if progress_callback: | |
progress_callback((processed_chapters + failed_chapters) / total_chapters, | |
f"Processed {processed_chapters}/{total_chapters} chapters") | |
return processed_chapters, failed_chapters | |
def create_m4b_audiobook(input_audio_dir, ebook_path, output_dir, title=None, author=None, progress_callback=None): | |
"""Create M4B audiobook from chapter audio files.""" | |
ensure_directory(output_dir) | |
# Extract base name from ebook path | |
base_name = os.path.splitext(os.path.basename(ebook_path))[0] | |
output_m4b = os.path.join(output_dir, f"{base_name}.m4b") | |
# Get chapter files | |
chapter_files = [f for f in os.listdir(input_audio_dir) if f.startswith('audio_chapter_') and f.endswith('.wav')] | |
chapter_files.sort(key=lambda f: int(re.search(r'audio_chapter_(\d+)', f).group(1))) | |
if not chapter_files: | |
logger.error("No audio chapter files found") | |
return None | |
# Create temporary directory | |
temp_dir = tempfile.mkdtemp() | |
try: | |
# Combine audio files | |
combined_wav = os.path.join(temp_dir, "combined.wav") | |
combined_audio = AudioSegment.empty() | |
chapter_positions = [] | |
current_position = 0 | |
for i, chapter_file in enumerate(chapter_files): | |
chapter_path = os.path.join(input_audio_dir, chapter_file) | |
logger.info(f"Adding chapter {i+1}/{len(chapter_files)} to audiobook") | |
try: | |
audio = AudioSegment.from_wav(chapter_path) | |
chapter_positions.append((current_position, len(audio), f"Chapter {i+1}")) | |
combined_audio += audio | |
current_position += len(audio) | |
# Add silence between chapters | |
if i < len(chapter_files) - 1: | |
silence = AudioSegment.silent(duration=1000) # 1 second | |
combined_audio += silence | |
current_position += 1000 | |
except Exception as e: | |
logger.error(f"Error processing audio file {chapter_file}: {e}") | |
# Export combined audio | |
combined_audio.export(combined_wav, format="wav") | |
# Extract cover | |
cover_image = extract_metadata_and_cover(ebook_path) | |
# Create metadata file | |
metadata_file = os.path.join(temp_dir, "metadata.txt") | |
with open(metadata_file, 'w') as f: | |
f.write(';FFMETADATA1\n') | |
if title: | |
f.write(f"title={title}\n") | |
if author: | |
f.write(f"artist={author}\n") | |
# Add chapters | |
for i, (start, duration, title) in enumerate(chapter_positions): | |
f.write(f'[CHAPTER]\nTIMEBASE=1/1000\nSTART={start}\n') | |
f.write(f'END={start + duration}\ntitle={title}\n') | |
# Create M4B file with ffmpeg | |
ffmpeg_cmd = ['ffmpeg', '-i', combined_wav, '-i', metadata_file] | |
if cover_image and os.path.exists(cover_image): | |
ffmpeg_cmd += ['-i', cover_image, '-map', '0:a', '-map', '2:v'] | |
ffmpeg_cmd += ['-c:v', 'png', '-disposition:v', 'attached_pic'] | |
else: | |
ffmpeg_cmd += ['-map', '0:a'] | |
ffmpeg_cmd += ['-map_metadata', '1', '-c:a', 'aac', '-b:a', '192k', output_m4b] | |
# Execute ffmpeg command | |
try: | |
subprocess.run(ffmpeg_cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) | |
logger.info(f"M4B file created successfully: {output_m4b}") | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Error creating M4B file: {e}") | |
logger.error(f"STDERR: {e.stderr.decode('utf-8', errors='replace')}") | |
# Try simplified approach | |
logger.info("Trying simplified M4B creation...") | |
simple_cmd = ['ffmpeg', '-i', combined_wav, '-c:a', 'aac', '-b:a', '192k', output_m4b] | |
try: | |
subprocess.run(simple_cmd, check=True) | |
logger.info(f"M4B file created with simplified method: {output_m4b}") | |
except subprocess.CalledProcessError as e: | |
logger.error(f"Simplified M4B creation also failed: {e}") | |
return None | |
finally: | |
# Clean up temporary directory | |
shutil.rmtree(temp_dir) | |
if os.path.exists(output_m4b) and os.path.getsize(output_m4b) > 0: | |
return output_m4b | |
else: | |
logger.error("M4B file was not created or is empty") | |
return None | |
# Main conversion function | |
def convert_ebook_to_audiobook(ebook_file, speed, pitch, voice, gap, progress=None): | |
"""Main function to convert ebook to audiobook.""" | |
start_time = time.time() | |
# Initialize directories | |
base_dir = os.path.abspath(os.path.dirname(__file__)) | |
working_dir = os.path.join(base_dir, "Working_files") | |
chapters_dir = os.path.join(working_dir, "chapters") | |
audio_dir = os.path.join(base_dir, "Chapter_wav_files") | |
output_dir = os.path.join(base_dir, "Audiobooks") | |
# Ensure output directory exists | |
ensure_directory(output_dir) | |
# Clean up previous files | |
remove_directory(working_dir) | |
remove_directory(audio_dir) | |
# Create necessary directories | |
ensure_directory(working_dir) | |
ensure_directory(chapters_dir) | |
ensure_directory(audio_dir) | |
ebook_path = ebook_file.name | |
ebook_name = os.path.basename(ebook_path) | |
try: | |
# Extract basic metadata if possible | |
try: | |
meta_result = subprocess.run(['ebook-meta', ebook_path], | |
stdout=subprocess.PIPE, text=True, check=False) | |
title_match = re.search(r'Title\s+:\s+(.*)', meta_result.stdout) | |
author_match = re.search(r'Author\(s\)\s+:\s+(.*)', meta_result.stdout) | |
title = title_match.group(1) if title_match else None | |
author = author_match.group(1) if author_match else None | |
except Exception as e: | |
logger.warning(f"Could not extract metadata: {e}") | |
title = author = None | |
# Process ebook to chapters | |
if progress: | |
progress(0.1, desc="Extracting chapters from ebook") | |
num_chapters = process_ebook_to_chapters(ebook_path, chapters_dir) | |
if num_chapters == 0: | |
return f"Failed to extract chapters from {ebook_name}", None | |
# Convert chapters to audio | |
if progress: | |
progress(0.3, desc="Converting text to speech") | |
processed, failed = convert_chapters_to_audio( | |
chapters_dir, | |
audio_dir, | |
voice.split()[0], | |
int(speed), | |
int(pitch), | |
int(gap), | |
lambda prog, desc: progress(0.3 + prog * 0.6, desc=desc) if progress else None | |
) | |
if processed == 0: | |
return f"Failed to convert any chapters to audio for {ebook_name}", None | |
# Create M4B audiobook | |
if progress: | |
progress(0.9, desc="Creating M4B audiobook") | |
m4b_path = create_m4b_audiobook(audio_dir, ebook_path, output_dir, title, author) | |
if not m4b_path: | |
return f"Failed to create M4B file for {ebook_name}", None | |
# Conversion complete | |
elapsed_time = time.time() - start_time | |
if progress: | |
progress(1.0, desc="Conversion complete") | |
return f"Audiobook created: {os.path.basename(m4b_path)} (in {elapsed_time:.1f} seconds)", m4b_path | |
except Exception as e: | |
logger.error(f"Error converting ebook: {e}", exc_info=True) | |
return f"Error: {str(e)}", None | |
# Utility functions for Gradio interface | |
def get_available_voices(): | |
"""Get list of available espeak-ng voices.""" | |
try: | |
result = subprocess.run(['espeak-ng', '--voices'], | |
stdout=subprocess.PIPE, text=True, check=True) | |
lines = result.stdout.splitlines()[1:] # Skip header | |
voices = [] | |
for line in lines: | |
parts = line.split() | |
if len(parts) > 3: | |
voice_id = parts[3] # Language code | |
description = ' '.join(parts[3:]) # Description | |
voices.append(f"{voice_id} ({description})") | |
return sorted(voices) | |
except Exception as e: | |
logger.error(f"Error getting voices: {e}") | |
# Return some default voices as fallback | |
return ["en (English)", "en-us (American English)", "en-gb (British English)"] | |
def list_audiobooks(): | |
"""List all audiobooks in the output directory.""" | |
output_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "Audiobooks") | |
ensure_directory(output_dir) | |
files = [] | |
for filename in os.listdir(output_dir): | |
if filename.endswith('.m4b'): | |
filepath = os.path.join(output_dir, filename) | |
files.append(filepath) | |
return sorted(files) | |
# Gradio interface (continued) | |
def create_gradio_interface(port=7860): | |
"""Create and launch Gradio interface.""" | |
# Create theme | |
theme = gr.themes.Soft( | |
primary_hue="blue", | |
secondary_hue="green", | |
neutral_hue="slate", | |
text_size=gr.themes.sizes.text_md, | |
) | |
# Create interface | |
with gr.Blocks(theme=theme, title="eBook to Audiobook Converter") as demo: | |
gr.Markdown( | |
""" | |
# 📚 eBook to Audiobook Converter | |
Convert any eBook format (EPUB, MOBI, PDF, etc.) to an M4B audiobook using eSpeak-NG. | |
## Features | |
- Automatic chapter detection | |
- Natural sentence splitting | |
- Multiple voice and language options | |
- Customizable speech settings | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
ebook_file = gr.File(label="eBook File", file_types=[".epub", ".mobi", ".azw", ".azw3", ".fb2", ".txt", ".pdf"]) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
speed = gr.Slider(minimum=80, maximum=450, value=170, step=1, | |
label="Speech Speed", info="Higher values = faster speech") | |
with gr.Column(scale=1): | |
pitch = gr.Slider(minimum=0, maximum=99, value=50, step=1, | |
label="Voice Pitch", info="Higher values = higher pitch") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gap = gr.Slider(minimum=0, maximum=20, value=5, step=1, | |
label="Pause Length", info="Pause between words (ms)") | |
with gr.Column(scale=1): | |
voice_dropdown = gr.Dropdown( | |
choices=get_available_voices(), | |
label="Voice", | |
value="en (English)", | |
info="Select language and voice variant" | |
) | |
convert_btn = gr.Button("Convert to Audiobook", variant="primary") | |
cancel_btn = gr.Button("Cancel Conversion", variant="stop") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
conversion_status = gr.Textbox(label="Conversion Status", interactive=False) | |
with gr.Column(scale=1): | |
audio_player = gr.Audio(label="Preview", type="filepath", interactive=False) | |
gr.Markdown("## Download Audiobooks") | |
with gr.Row(): | |
refresh_btn = gr.Button("Refresh List") | |
download_btn = gr.Button("Download Selected File") | |
audiobook_files = gr.Dropdown( | |
choices=list_audiobooks(), | |
label="Available Audiobooks", | |
value=None, | |
interactive=True | |
) | |
# Define conversion task state | |
conversion_task = {"running": False, "thread": None} | |
# Handle events | |
def start_conversion(ebook_file, speed, pitch, voice, gap, progress=gr.Progress()): | |
# Check if already running | |
if conversion_task["running"]: | |
return "A conversion is already in progress. Please wait or cancel it.", None | |
if not ebook_file: | |
return "Please select an eBook file first.", None | |
conversion_task["running"] = True | |
result, output_path = convert_ebook_to_audiobook(ebook_file, speed, pitch, voice, gap, progress) | |
conversion_task["running"] = False | |
return result, output_path | |
def cancel_current_conversion(): | |
if conversion_task["running"]: | |
conversion_task["running"] = False | |
return "Conversion cancelled." | |
else: | |
return "No conversion is currently running." | |
def refresh_audiobook_list(): | |
return gr.Dropdown.update(choices=list_audiobooks()) | |
# Connect events | |
convert_btn.click( | |
start_conversion, | |
inputs=[ebook_file, speed, pitch, voice_dropdown, gap], | |
outputs=[conversion_status, audio_player] | |
) | |
cancel_btn.click( | |
cancel_current_conversion, | |
outputs=[conversion_status] | |
) | |
refresh_btn.click( | |
refresh_audiobook_list, | |
outputs=[audiobook_files] | |
) | |
download_btn.click( | |
lambda x: x, | |
inputs=[audiobook_files], | |
outputs=[audiobook_files] | |
) | |
ebook_file.upload( | |
lambda: "eBook uploaded successfully", | |
outputs=[conversion_status] | |
) | |
# Launch the interface | |
demo.launch(server_port=port, share=True) | |
return demo | |
# Command-line interface | |
def main(): | |
"""Command-line entry point.""" | |
parser = argparse.ArgumentParser(description='Convert eBooks to Audiobooks') | |
parser.add_argument('--gui', action='store_true', help='Launch graphical interface') | |
parser.add_argument('--port', type=int, default=7860, help='Port for web interface') | |
parser.add_argument('--ebook', type=str, help='Path to eBook file') | |
parser.add_argument('--voice', default='en', help='eSpeak voice to use') | |
parser.add_argument('--speed', type=int, default=170, help='Speech speed') | |
parser.add_argument('--pitch', type=int, default=50, help='Voice pitch') | |
parser.add_argument('--gap', type=int, default=5, help='Word gap') | |
args = parser.parse_args() | |
if args.gui: | |
create_gradio_interface(port=args.port) | |
elif args.ebook: | |
# Create a temporary file-like object for the ebook path | |
class FilePath: | |
def __init__(self, path): | |
self.name = path | |
print(f"Converting {args.ebook} to audiobook...") | |
result, output_path = convert_ebook_to_audiobook( | |
FilePath(args.ebook), | |
args.speed, | |
args.pitch, | |
args.voice, | |
args.gap | |
) | |
print(result) | |
else: | |
# Default to GUI if no arguments | |
create_gradio_interface() | |
if __name__ == "__main__": | |
main() |