drewThomasson's picture
Update app.py
1859aa0 verified
raw
history blame
43.8 kB
import gradio as gr
import subprocess
import os
import tempfile
import shutil
import re
import logging
from pathlib import Path
from PIL import Image, UnidentifiedImageError # For checking image validity
try:
import mutagen
from mutagen.mp3 import MP3, EasyMP3
from mutagen.oggvorbis import OggVorbis
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4Cover
from mutagen.id3 import ID3, APIC, PictureType, error as ID3Error
MUTAGEN_AVAILABLE = True
except ImportError:
MUTAGEN_AVAILABLE = False
logging.warning("Mutagen library not found. Cover art embedding will be disabled.")
logging.warning("Install it using: pip install mutagen")
# --- Configuration & Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Helper Functions ---
def check_command(command):
"""Checks if a command exists in the system's PATH."""
try:
if os.name == 'nt':
# 'where' command on Windows
subprocess.run(['where', command], check=True, capture_output=True, timeout=5)
else:
# 'command -v' is generally preferred and more portable than 'which' on Unix-like systems
subprocess.run(['command', '-v', command], check=True, capture_output=True, timeout=5)
logging.info(f"Command '{command}' found.")
return True
except FileNotFoundError:
logging.error(f"Command '{command}' check tool ('where' or 'command') not found.")
return False
except subprocess.CalledProcessError:
logging.warning(f"Command '{command}' not found in PATH.")
return False
except subprocess.TimeoutExpired:
logging.error(f"Timeout checking for command '{command}'. Assuming not found.")
return False
except Exception as e:
logging.error(f"Unexpected error checking for command '{command}': {e}")
return False
def get_espeak_voices():
"""Gets available espeak-ng voices and their languages."""
voices = {}
try:
cmd = ['espeak-ng', '--voices']
logging.info(f"Getting voices with command: {' '.join(cmd)}")
# Use a timeout to prevent hanging if espeak-ng has issues
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8', errors='ignore', timeout=15)
# Example line format: P L V Language Code Age/Gender VoiceName File Other Langs
# 2 y en-US M american-english-us Mbrola/us1 (en 10)
# 1 af M afrikaans Afrikaans
# More robust pattern to handle variations
pattern = re.compile(r"^\s*\d+\s+[yn\-]\s+([\w\-]+)\s+[MF\-]?\s+([\w\s\(\)\-]+?)\s+([\w\/\s\-]+?)(?:\s+\(.*\))?\s*$")
lines = result.stdout.splitlines()
if not lines or len(lines) < 2: # Check if there's output beyond the header
logging.warning("No voice lines found in 'espeak-ng --voices' output.")
raise ValueError("No voice data returned.")
for line in lines[1:]: # Skip header
match = pattern.match(line.strip())
if match:
# Extract code (group 1) and language name (group 2)
code = match.group(1).strip()
lang_name = match.group(2).strip()
# Clean up language name (remove potential file paths sometimes included)
lang_name = lang_name.split(" ")[0]
# Prioritize names like "english-us" over just "english" if code reflects it
display_name = f"{lang_name.replace('-', ' ').title()} ({code})"
# Avoid duplicates, preferring more specific codes if names clash slightly
if display_name not in voices:
voices[display_name] = code
else:
# Simpler split as fallback for lines that don't match complex regex
parts = line.split()
if len(parts) >= 4 and parts[0].isdigit():
code = parts[1]
lang_name = parts[3]
display_name = f"{lang_name.strip().title()} ({code})"
if display_name not in voices:
voices[display_name] = code
else:
logging.warning(f"Could not parse voice line: {line}")
if not voices:
logging.warning("Could not parse any voices from 'espeak-ng --voices'. Using fallback list.")
raise ValueError("Parsing failed.")
# Sort voices alphabetically by display name
sorted_voices = dict(sorted(voices.items()))
logging.info(f"Found {len(sorted_voices)} espeak-ng voices.")
return sorted_voices
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError, Exception) as e:
logging.error(f"Error getting espeak-ng voices: {e}")
# Provide a basic fallback list if the command fails or parsing fails
return {"English (en)": "en", "Spanish (es)": "es", "French (fr)": "fr", "German (de)": "de"}
# --- Main Conversion Logic ---
def convert_ebook_to_audio(ebook_file, language_display, output_format, embed_cover, progress=gr.Progress(track_tqdm=True)):
"""
Converts an ebook file to an audiobook using Calibre and espeak-ng.
"""
if not ebook_file:
return None, None, "**Error:** No ebook file provided."
# Check required commands *before* creating temp dir
calibre_convert_ok = check_command("ebook-convert")
calibre_meta_ok = check_command("ebook-meta") # Check always, needed logic follows
espeak_ok = check_command("espeak-ng")
lame_ok = check_command("lame")
oggenc_ok = check_command("oggenc") # From vorbis-tools
missing = []
if not calibre_convert_ok: missing.append("Calibre ('ebook-convert')")
if not calibre_meta_ok: missing.append("Calibre ('ebook-meta' - for cover art)")
if not espeak_ok: missing.append("espeak-ng")
if not lame_ok and output_format == 'mp3': missing.append("LAME (for MP3)")
if not oggenc_ok and output_format == 'ogg': missing.append("oggenc (for OGG)")
if missing:
error_msg = f"**Error:** Missing required system command(s):\n- {', '.join(missing)}\n\nPlease ensure they are installed in the environment (check packages.txt)."
logging.error(error_msg.replace("**Error:** ","").replace("\n- "," ").replace("\n"," ")) # Log plain text
return None, None, error_msg
temp_dir = tempfile.mkdtemp(prefix="ebook_audio_")
logging.info(f"Created temporary directory: {temp_dir}")
status_updates = ["▢️ Conversion process started..."]
cover_image_path_final = None # Track final usable cover path for display/embedding
audio_output_path_final = None # Keep track of the final audio path for return
try:
input_ebook_path = ebook_file.name # Gradio provides a temp path for the upload
# Sanitize filename slightly for output files
base_filename = re.sub(r'[^\w\-]+', '_', Path(input_ebook_path).stem)
txt_output_path = os.path.join(temp_dir, f"{base_filename}.txt")
# Use a generic name first, then check format
cover_output_path_temp = os.path.join(temp_dir, "cover_temp")
audio_output_path = os.path.join(temp_dir, f"{base_filename}.{output_format}")
# --- Step 1: Extract Cover Art (Optional) ---
cover_extracted = False
if embed_cover and calibre_meta_ok:
progress(0.1, desc="πŸ–ΌοΈ Extracting cover art (optional)...")
status_updates.append(" Attempting to extract cover art...")
try:
cmd_meta = ['ebook-meta', input_ebook_path, '--get-cover', cover_output_path_temp]
logging.info(f"Running cover extraction: {' '.join(cmd_meta)}")
# Use timeout for ebook-meta as well
result_meta = subprocess.run(cmd_meta, check=True, capture_output=True, text=True, errors='ignore', timeout=30)
if os.path.exists(cover_output_path_temp) and os.path.getsize(cover_output_path_temp) > 0:
# Validate image and get format
try:
img = Image.open(cover_output_path_temp)
img.verify() # Basic check
img.close() # Need to close after verify
# Reopen to check format properly and prepare final path
img = Image.open(cover_output_path_temp)
img_format = img.format.lower() if img.format else 'jpeg' # Default guess
img.close()
# Define final path with correct extension
valid_ext = f".{img_format}" if img_format in ['jpeg', 'png', 'gif'] else ".jpg" # Default to jpg
cover_image_path_final = os.path.join(temp_dir, f"cover_final{valid_ext}")
shutil.move(cover_output_path_temp, cover_image_path_final) # Rename with correct extension
cover_extracted = True
status_updates.append(f" βœ… Cover art extracted successfully ({img_format.upper()}).")
logging.info(f"Cover art extracted to {cover_image_path_final}")
except (IOError, SyntaxError, UnidentifiedImageError) as img_err:
logging.warning(f"Extracted file at {cover_output_path_temp} is not a valid image: {img_err}")
status_updates.append(" ⚠️ Extracted 'cover' file is not a valid image. Will skip embedding.")
if os.path.exists(cover_output_path_temp): os.remove(cover_output_path_temp) # Clean up invalid file
if cover_image_path_final and os.path.exists(cover_image_path_final): os.remove(cover_image_path_final)
cover_image_path_final = None # Ensure it's None
else:
status_updates.append(" ℹ️ No cover art found in the ebook metadata.")
logging.info("ebook-meta ran but did not produce a cover file or it was empty.")
if os.path.exists(cover_output_path_temp): os.remove(cover_output_path_temp) # Clean up empty file
except subprocess.TimeoutExpired:
status_updates.append(f" ⚠️ Timeout trying to extract cover art.")
logging.warning(f"ebook-meta timed out.")
except subprocess.CalledProcessError as e:
stderr_decoded = e.stderr.decode(errors='ignore').strip() if e.stderr else "No stderr"
status_updates.append(f" ⚠️ Failed to extract cover art. Error: {stderr_decoded[:200]}{'...' if len(stderr_decoded)>200 else ''}") # Keep it short
logging.warning(f"ebook-meta failed: {stderr_decoded}")
except Exception as e:
status_updates.append(f" ⚠️ An unexpected error occurred during cover extraction: {e}")
logging.error(f"Unexpected error during cover extraction: {e}", exc_info=True)
# Ensure temp file is removed if final path wasn't set
if not cover_image_path_final and os.path.exists(cover_output_path_temp):
os.remove(cover_output_path_temp)
elif embed_cover and not calibre_meta_ok:
status_updates.append(" ℹ️ Cover art embedding requested, but 'ebook-meta' command not found.")
elif embed_cover and not MUTAGEN_AVAILABLE:
status_updates.append(" ℹ️ Cover art embedding requested, but 'mutagen' Python library not installed.")
# --- Step 2: Convert Ebook to TXT ---
progress(0.3, desc="πŸ“– Converting ebook to TXT...")
status_updates.append("πŸ“– Converting ebook to plain text...")
try:
# Add options known to help with TXT output quality
# --input-encoding=utf8 is often needed for non-ASCII content
cmd_convert = [
'ebook-convert', input_ebook_path, txt_output_path,
'--enable-heuristics',
'--output-profile=generic_eink', # Profiles can influence text formatting
'--input-encoding=utf8', # Try specifying UTF-8
'--pretty-print' # Can sometimes help structure
]
logging.info(f"Running ebook conversion: {' '.join(cmd_convert)}")
# Increased timeout for potentially large books
result_convert = subprocess.run(cmd_convert, check=True, capture_output=True, encoding='utf-8', errors='ignore', timeout=300) # 5 mins
# Log stdout/stderr even on success for warnings
if result_convert.stdout: logging.info(f"ebook-convert stdout: {result_convert.stdout.strip()}")
if result_convert.stderr: logging.warning(f"ebook-convert stderr: {result_convert.stderr.strip()}")
status_updates.append(" βœ… Ebook converted to TXT.")
logging.info("Ebook successfully converted to TXT.")
except subprocess.TimeoutExpired:
error_msg = "**Error:** Calibre conversion timed out (may be a very large or complex book)."
status_updates.append(f" ❌ Calibre conversion timed out.")
logging.error("Error during Calibre conversion: Timeout")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
except subprocess.CalledProcessError as e:
stderr_decoded = e.stderr.decode(errors='ignore').strip() if e.stderr else "No stderr"
stdout_decoded = e.stdout.decode(errors='ignore').strip() if e.stdout else "No stdout"
error_details = f"Stderr:\n```\n{stderr_decoded}\n```\nStdout:\n```\n{stdout_decoded}\n```" if stderr_decoded or stdout_decoded else str(e)
error_msg = f"**Error:** Calibre conversion failed (Exit Code {e.returncode}).\n{error_details}"
status_updates.append(f" ❌ Calibre conversion failed.")
logging.error(f"Error during Calibre conversion: Exit Code {e.returncode}\nStderr: {stderr_decoded}\nStdout: {stdout_decoded}")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
except Exception as e:
error_msg = f"**Error:** An unexpected error occurred during ebook conversion:\n{e}"
status_updates.append(f" ❌ Unexpected conversion error.")
logging.error(f"Unexpected error during ebook conversion: {e}", exc_info=True)
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
# Check if TXT file was actually created and is not empty
if not os.path.exists(txt_output_path) or os.path.getsize(txt_output_path) == 0:
error_msg = "**Error:** Calibre finished, but the output TXT file is missing or empty.\nThis can happen with image-based ebooks (like scanned PDFs, comics, CBZ/CBR) or DRM-protected files.\nCalibre cannot process these types into text."
status_updates.append(f" ❌ TXT output empty/missing.")
logging.error("Calibre finished, but the output TXT file is missing or empty.")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
# --- Step 3: Convert TXT to Audio ---
progress(0.6, desc="πŸ—£οΈ Converting TXT to Audio...")
status_updates.append("πŸ—£οΈ Converting text to speech...")
voice_code = available_voices.get(language_display, 'en') # Get code from display name
# Base espeak-ng command: specify voice, read from file
cmd_speak = ['espeak-ng', '-v', voice_code, '-f', txt_output_path]
# Optionally add speed or other espeak parameters here:
# cmd_speak.extend(['-s', '160']) # Example: Set speed (default 175)
try:
logging.info(f"Preparing audio command for format: {output_format}")
# Define timeout for TTS process (can be long for large books)
tts_timeout = 1800 # 30 minutes
if output_format == 'wav':
cmd_speak.extend(['-w', audio_output_path])
logging.info(f"Running espeak-ng (WAV): {' '.join(cmd_speak)}")
result_speak = subprocess.run(cmd_speak, check=True, capture_output=True, timeout=tts_timeout)
if result_speak.stderr: logging.warning(f"espeak-ng stderr (WAV): {result_speak.stderr.decode(errors='ignore').strip()}")
elif output_format == 'mp3':
if not lame_ok: raise FileNotFoundError("LAME command not found")
cmd_speak.append('--stdout') # espeak outputs WAV to stdout
cmd_lame = ['lame', '-', audio_output_path] # LAME reads WAV from stdin, outputs MP3
logging.info(f"Running pipe: {' '.join(cmd_speak)} | {' '.join(cmd_lame)}")
# Start espeak-ng process
ps_speak = subprocess.Popen(cmd_speak, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Start LAME process, piping espeak's stdout to LAME's stdin
ps_lame = subprocess.Popen(cmd_lame, stdin=ps_speak.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# *** Crucial: Allow ps_speak stdout to be closed by ps_lame if it finishes reading ***
if ps_speak.stdout:
ps_speak.stdout.close()
# Capture stderr from both processes, wait for LAME first (end of pipeline)
try:
lame_stdout_bytes, lame_stderr_bytes = ps_lame.communicate(timeout=tts_timeout + 60) # Allow extra time for encoding
except subprocess.TimeoutExpired:
logging.error("LAME process timed out.")
ps_speak.kill() # Kill upstream process too
ps_lame.kill()
raise subprocess.TimeoutExpired(cmd_lame, tts_timeout + 60)
# Now wait for espeak and capture its stderr
speak_stderr_bytes = ps_speak.stderr.read() if ps_speak.stderr else b""
ps_speak.wait() # Wait for espeak to fully terminate
if ps_speak.stderr: ps_speak.stderr.close()
# Decode stderr for logging/errors
lame_stderr_str = lame_stderr_bytes.decode(errors='ignore').strip()
speak_stderr_str = speak_stderr_bytes.decode(errors='ignore').strip()
# Check return codes AFTER both processes finished
if ps_lame.returncode != 0:
logging.error(f"LAME failed with exit code {ps_lame.returncode}. LAME stderr: {lame_stderr_str}")
raise subprocess.CalledProcessError(ps_lame.returncode, cmd_lame, stderr=lame_stderr_bytes)
if ps_speak.returncode != 0:
logging.error(f"espeak-ng failed with exit code {ps_speak.returncode}. espeak-ng stderr: {speak_stderr_str}")
raise subprocess.CalledProcessError(ps_speak.returncode, cmd_speak, stderr=speak_stderr_bytes)
# Log any non-fatal warnings from stderr
if lame_stderr_str: logging.warning(f"LAME stderr: {lame_stderr_str}")
if speak_stderr_str: logging.warning(f"espeak-ng stderr: {speak_stderr_str}")
elif output_format == 'ogg':
if not oggenc_ok: raise FileNotFoundError("oggenc command not found")
cmd_speak.append('--stdout') # espeak outputs WAV to stdout
# oggenc reads WAV from stdin ('-') and writes to output file ('-o')
cmd_ogg = ['oggenc', '-o', audio_output_path, '-']
logging.info(f"Running pipe: {' '.join(cmd_speak)} | {' '.join(cmd_ogg)}")
ps_speak = subprocess.Popen(cmd_speak, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ps_ogg = subprocess.Popen(cmd_ogg, stdin=ps_speak.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if ps_speak.stdout:
ps_speak.stdout.close()
try:
ogg_stdout_bytes, ogg_stderr_bytes = ps_ogg.communicate(timeout=tts_timeout + 60)
except subprocess.TimeoutExpired:
logging.error("oggenc process timed out.")
ps_speak.kill()
ps_ogg.kill()
raise subprocess.TimeoutExpired(cmd_ogg, tts_timeout + 60)
speak_stderr_bytes = ps_speak.stderr.read() if ps_speak.stderr else b""
ps_speak.wait()
if ps_speak.stderr: ps_speak.stderr.close()
ogg_stderr_str = ogg_stderr_bytes.decode(errors='ignore').strip()
speak_stderr_str = speak_stderr_bytes.decode(errors='ignore').strip()
if ps_ogg.returncode != 0:
logging.error(f"oggenc failed with exit code {ps_ogg.returncode}. oggenc stderr: {ogg_stderr_str}")
raise subprocess.CalledProcessError(ps_ogg.returncode, cmd_ogg, stderr=ogg_stderr_bytes)
if ps_speak.returncode != 0:
logging.error(f"espeak-ng failed with exit code {ps_speak.returncode}. espeak-ng stderr: {speak_stderr_str}")
raise subprocess.CalledProcessError(ps_speak.returncode, cmd_speak, stderr=speak_stderr_bytes)
if ogg_stderr_str: logging.warning(f"oggenc stderr: {ogg_stderr_str}")
if speak_stderr_str: logging.warning(f"espeak-ng stderr: {speak_stderr_str}")
else:
raise ValueError(f"Unsupported output format selected: {output_format}")
status_updates.append(" βœ… Text converted to audio.")
logging.info(f"Text successfully converted to {output_format.upper()}.")
except subprocess.CalledProcessError as e:
command_name = Path(e.cmd[0]).name if isinstance(e.cmd, list) else e.cmd
stderr_str = e.stderr.decode(errors='ignore').strip() if isinstance(e.stderr, bytes) else (e.stderr or "")
stdout_str = e.stdout.decode(errors='ignore').strip() if isinstance(e.stdout, bytes) else (e.stdout or "")
error_details = stderr_str or stdout_str or "No output/error captured."
exit_status_str = f"exit status {e.returncode}" if e.returncode is not None else "unknown exit status"
cmd_str = ' '.join(e.cmd) if isinstance(e.cmd, list) else e.cmd
error_msg = (f"**Error:** Audio generation failed.\n\n"
f"**Process:** `{command_name}`\n"
f"**Command:**\n```\n{cmd_str}\n```\n"
f"**Exit Status:** {exit_status_str}\n\n"
f"**Output/Error:**\n```\n{error_details}\n```")
status_updates.append(f" ❌ Audio generation failed ({command_name}).")
logging.error(f"Audio generation failed. Command: `{cmd_str}` Exit: {exit_status_str} Details: {error_details}")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
except subprocess.TimeoutExpired as e:
command_name = Path(e.cmd[0]).name if isinstance(e.cmd, list) else e.cmd
error_msg = f"**Error:** Audio generation timed out (over {e.timeout}s) during `{command_name}` processing.\nThe ebook might be too long for the current timeout limit."
status_updates.append(f" ❌ Audio generation timed out.")
logging.error(f"Audio generation timed out for command: {' '.join(e.cmd)}")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
except FileNotFoundError as e:
# This should ideally be caught by initial checks, but handle defensively
missing_cmd = e.filename or "Unknown command"
error_msg = f"**Error:** Command `{missing_cmd}` not found during audio generation for {output_format.upper()} output.\nPlease check `packages.txt`."
status_updates.append(f" ❌ Command '{missing_cmd}' not found.")
logging.error(f"Error: Command '{missing_cmd}' not found during execution.")
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
except Exception as e:
error_msg = f"**Error:** An unexpected error occurred during audio generation:\n```\n{e}\n```"
status_updates.append(f" ❌ Unexpected audio error.")
logging.error(f"An unexpected error occurred during audio generation: {e}", exc_info=True)
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
# --- Step 3b: Verify Audio Output ---
if not os.path.exists(audio_output_path) or os.path.getsize(audio_output_path) < 256: # Check if file exists and has *some* data
error_msg = f"**Error:** Audio generation command finished, but the output file '{Path(audio_output_path).name}' is missing or empty/too small.\nCheck logs for potential errors during the TTS or encoding process."
status_updates.append(f" ❌ Audio output missing or invalid.")
logging.error(f"Audio output file missing or too small after generation: {audio_output_path}")
# Try to provide more context if stderr was captured earlier
# last_stderr = speak_stderr_str or lame_stderr_str or ogg_stderr_str # From pipe section
# if last_stderr: error_msg += f"\nLast captured error output:\n```\n{last_stderr}\n```"
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}"
# --- Step 4: Embed Cover Art (Optional) ---
if embed_cover and cover_extracted and MUTAGEN_AVAILABLE and cover_image_path_final and os.path.exists(cover_image_path_final):
progress(0.9, desc="πŸ–ΌοΈ Embedding cover art...")
status_updates.append("πŸ–ΌοΈ Embedding cover art into audio file...")
try:
with open(cover_image_path_final, 'rb') as img_f:
cover_data = img_f.read()
# Determine mimetype robustly using Pillow
mime_type = 'image/jpeg' # Default
img_width, img_height, img_depth = 0, 0, 24 # Defaults for FLAC/OGG
try:
img = Image.open(cover_image_path_final)
mime_type = Image.MIME.get(img.format)
img_width, img_height = img.width, img.height
img_depth = {'RGB': 24, 'RGBA': 32, 'L': 8, 'P': 8}.get(img.mode, 24) # Palette 'P' often 8-bit
img.close()
if not mime_type:
ext = Path(cover_image_path_final).suffix.lower()
if ext == ".jpg" or ext == ".jpeg": mime_type = 'image/jpeg'
elif ext == ".png": mime_type = 'image/png'
else: raise ValueError("Unsupported image format for MIME detection") # Force fallback
logging.info(f"Using cover mime type: {mime_type}, Dimensions: {img_width}x{img_height}, Depth: {img_depth}")
except Exception as pil_err:
logging.warning(f"Could not determine MIME type/dimensions using PIL: {pil_err}. Falling back to image/jpeg.")
mime_type = 'image/jpeg' # Fallback
logging.info(f"Attempting to embed cover art ({mime_type}) into {audio_output_path}")
audio = mutagen.File(audio_output_path, easy=True) # Use easy=True for simple tags, fallback to non-easy for picture
if audio is None:
# Try loading without easy=True if easy fails
audio = mutagen.File(audio_output_path, easy=False)
if audio is None:
raise ValueError("Mutagen could not load the audio file. Format might be unsupported or file corrupted.")
# --- Add Title/Artist using Easy Interface if possible ---
try:
if isinstance(audio, mutagen.easy.EasyMutagen): # Check if Easy interface loaded
if not audio.get('title'): audio['title'] = Path(base_filename).name.replace('_', ' ') # Use sanitized filename base
if not audio.get('artist'): audio['artist'] = "Ebook Speaker"
audio.save() # Save easy tags
# Reload without easy=True for picture embedding if needed by format
audio = mutagen.File(audio_output_path, easy=False)
elif audio is not None: # Easy interface failed, try basic tags with normal interface
if not audio.tags.get('TIT2'): audio.tags.add(mutagen.id3.TIT2(encoding=3, text=Path(base_filename).name.replace('_', ' ')))
if not audio.tags.get('TPE1'): audio.tags.add(mutagen.id3.TPE1(encoding=3, text="Ebook Speaker"))
audio.save()
audio = mutagen.File(audio_output_path, easy=False) # Reload after save
except Exception as tag_err:
logging.warning(f"Could not set basic title/artist tags: {tag_err}")
# --- Embed Picture (using non-easy interface often required) ---
if audio is None: # Check again after potential reload
raise ValueError("Audio object became None after tag saving.")
# Clear existing art first (important!)
try:
audio.tags.delall('APIC') # ID3v2 (MP3)
audio.tags.delall('covr') # MP4
if hasattr(audio, 'clear_pictures'): audio.clear_pictures() # FLAC
if "metadata_block_picture" in audio: del audio["metadata_block_picture"] # OggVorbis
audio.save()
# Reload again after deleting to ensure clean slate
audio = mutagen.File(audio_output_path, easy=False)
if audio is None: raise ValueError("Audio object None after clearing art.")
except (AttributeError, KeyError, TypeError, Exception) as clear_err:
logging.warning(f"Could not definitively clear existing artwork: {clear_err}. Proceeding anyway.")
# Add the new cover
save_needed = False
if isinstance(audio, (MP3, EasyMP3)): # Handles MP3
if audio.tags is None: audio.add_tags()
audio.tags.add(
APIC(
encoding=3, # 3 is for UTF-8
mime=mime_type,
type=PictureType.COVER_FRONT, # Use standard enum
desc='Cover',
data=cover_data
)
)
save_needed = True
elif isinstance(audio, FLAC):
pic = mutagen.flac.Picture()
pic.data = cover_data
pic.type = PictureType.COVER_FRONT
pic.mime = mime_type
pic.width = img_width
pic.height = img_height
pic.depth = img_depth
audio.add_picture(pic)
save_needed = True
elif isinstance(audio, OggVorbis):
# Ogg Vorbis uses base64 encoded FLAC Picture block
import base64
pic = mutagen.flac.Picture()
pic.data = cover_data
pic.type = PictureType.COVER_FRONT
pic.mime = mime_type
pic.width = img_width
pic.height = img_height
pic.depth = img_depth
audio["METADATA_BLOCK_PICTURE"] = [base64.b64encode(pic.write()).decode("ascii")]
save_needed = True
elif isinstance(audio, MP4): # Handles M4A/M4B
if mime_type == 'image/jpeg': pic_format = MP4Cover.FORMAT_JPEG
elif mime_type == 'image/png': pic_format = MP4Cover.FORMAT_PNG
else: pic_format = MP4Cover.FORMAT_UNDEFINED
if pic_format != MP4Cover.FORMAT_UNDEFINED:
audio['covr'] = [MP4Cover(cover_data, imageformat=pic_format)]
save_needed = True
else: logging.warning(f"Unsupported cover image format ({mime_type}) for MP4 embedding.")
else:
logging.warning(f"Cover embedding not implemented for this audio type: {type(audio)}")
if save_needed:
audio.save()
status_updates.append(" βœ… Cover art embedded successfully.")
logging.info("Cover art embedded successfully.")
elif embed_cover: # Only report skip if embedding was attempted but failed type match
status_updates.append(" ⚠️ Cover embedding skipped (unsupported audio format for mutagen?).")
logging.warning(f"Could not embed cover: audio format {type(audio)} not explicitly handled.")
except (mutagen.MutagenError, ValueError, IOError, TypeError, KeyError, AttributeError) as e:
status_updates.append(f" ⚠️ Could not embed cover art. Error: {str(e)[:100]}...")
logging.warning(f"Failed to embed cover art: {e}", exc_info=True)
except Exception as e:
status_updates.append(f" ⚠️ An unexpected error occurred during cover art embedding.")
logging.error(f"Unexpected error during cover embedding: {e}", exc_info=True)
elif embed_cover and not cover_extracted:
status_updates.append(" ℹ️ Cover art embedding skipped (no cover extracted or invalid).")
elif embed_cover and not MUTAGEN_AVAILABLE:
# This was logged earlier, but confirm skip in status
status_updates.append(" ⚠️ Cover art embedding skipped (Mutagen library not installed).")
# --- Step 5: Prepare final output ---
progress(1.0, desc="βœ… Complete!")
status_updates.append("🏁 Conversion complete!")
audio_output_path_final = audio_output_path # Mark the path as final
# Return paths for Gradio components
final_status = "\n".join(status_updates)
logging.info(f"Returning audio: {audio_output_path_final}, cover: {cover_image_path_final}, Status: Success.")
# Return audio path for Audio component, cover path for Image, status for Textbox
return audio_output_path_final, cover_image_path_final, final_status
except Exception as e:
# Catch-all for unexpected errors in the main try block
error_msg = f"An unexpected critical error occurred in the main process: {e}"
status_updates.append(f" ❌ CRITICAL ERROR: {error_msg}")
logging.error(error_msg, exc_info=True)
# Return None for audio, cover path (if extracted), and the error status
final_status = "\n".join(status_updates)
return None, cover_image_path_final, f"{final_status}\n\n**Error:** An unexpected critical error occurred.\nCheck application logs for details.\n{e}"
finally:
# --- Cleanup ---
# We leave the final audio and cover files in temp_dir for Gradio to serve.
# Clean up intermediate files ONLY.
try:
if 'txt_output_path' in locals() and os.path.exists(txt_output_path):
os.remove(txt_output_path)
logging.info(f"Removed intermediate file: {txt_output_path}")
# Remove temporary cover if it's different from final or if final doesn't exist
if 'cover_output_path_temp' in locals() and os.path.exists(cover_output_path_temp):
if not cover_image_path_final or cover_output_path_temp != cover_image_path_final:
os.remove(cover_output_path_temp)
logging.info(f"Removed intermediate file: {cover_output_path_temp}")
# Note: Gradio typically copies temp files, but leaving the dir might be safer
# If space becomes an issue, add shutil.rmtree(temp_dir) here,
# but ensure Gradio doesn't need the original files after the function returns.
logging.info(f"Temporary directory '{temp_dir}' contains final output files and will be cleaned up by Gradio/system later.")
except OSError as e:
logging.warning(f"Error during cleanup of intermediate files: {e}")
# --- Gradio Interface Definition ---
print("Initializing Gradio Interface...")
print("Fetching available eSpeak-NG voices...")
available_voices = get_espeak_voices()
voice_choices = list(available_voices.keys())
print(f"Found {len(voice_choices)} voices.")
# Try to find a sensible default voice (e.g., US English)
default_voice = "English (en)" # Basic fallback
possible_defaults = [
"English (United States) (en-us)",
"English (Us) (en-us)", # Variations in naming
"English (en-us)",
"English (Great Britain) (en-gb)",
"English (Gb) (en-gb)",
"English (en-gb)",
"English (en)",
]
for V in possible_defaults:
if V in voice_choices:
default_voice = V
break
if not voice_choices:
logging.error("FATAL: No espeak voices found or parsed. Language selection will fail.")
# Add a dummy entry if empty to prevent Gradio crash, though unusable
voice_choices = ["Error: No Voices Found"]
default_voice = voice_choices[0]
available_voices = {default_voice: "error"}
# Check for external tools on startup and display warnings if needed
print("Checking required external commands...")
startup_warnings = []
if not check_command("ebook-convert"): startup_warnings.append("Calibre ('ebook-convert')")
if not check_command("ebook-meta"): startup_warnings.append("Calibre ('ebook-meta' - needed for cover art)")
if not check_command("espeak-ng"): startup_warnings.append("espeak-ng (core TTS engine)")
if not check_command("lame"): startup_warnings.append("LAME (needed for MP3 output)")
if not check_command("oggenc"): startup_warnings.append("oggenc (needed for OGG output, from 'vorbis-tools')")
if not MUTAGEN_AVAILABLE: startup_warnings.append("Python 'mutagen' library (needed for embedding cover art - install via requirements.txt)")
startup_message = ""
if startup_warnings:
warning_list = "\n- ".join(startup_warnings)
startup_message = (
"**⚠️ Startup Warning: The following components might be missing or not found:**\n\n"
f"- {warning_list}\n\n"
"Please ensure system packages are listed in `packages.txt` and Python libraries in `requirements.txt`. "
"Functionality relying on missing components will fail. Check container build logs for installation errors."
)
print("--- STARTUP WARNING ---")
print(f"Missing components: {', '.join(startup_warnings)}")
print("-----------------------")
# Define UI Elements
print("Building Gradio UI...")
with gr.Blocks(theme=gr.themes.Soft(), title="Ebook to Audiobook") as demo:
gr.Markdown(
"""
# Ebook to Audiobook Converter πŸŽ§πŸ“š
**Convert your ebooks (EPUB, MOBI, AZW3, FB2, PDF*, etc.) into audiobooks!**
Upload your ebook, select the desired language/voice and audio format, and click Convert.
Optionally, embed the cover art into the audio file metadata.
*(*) Note: PDF conversion works best for text-based PDFs. Scanned images or complex layouts may result in poor text extraction.*
"""
)
if startup_message:
gr.Warning(startup_message) # Use Gradio's warning component
with gr.Row():
with gr.Column(scale=1, min_width=300):
ebook_input = gr.File(label="1. Upload Ebook File", file_count="single", type="file") # Use type="file" for direct path access
lang_dropdown = gr.Dropdown(
label="2. Select Language / Voice",
choices=voice_choices,
value=default_voice if default_voice in voice_choices else (voice_choices[0] if voice_choices else None),
info="Uses voices available from espeak-ng.",
interactive=True
)
format_dropdown = gr.Dropdown(
label="3. Select Output Audio Format",
choices=["mp3", "ogg", "wav"],
value="mp3",
info="MP3 offers good compatibility and compression. OGG is open source. WAV is uncompressed.",
interactive=True
)
cover_checkbox = gr.Checkbox(
label="Embed Cover Art (if available)",
value=True if MUTAGEN_AVAILABLE else False, # Default based on library presence
info="Requires 'mutagen' library and 'ebook-meta' command.",
# interactive=True # Removed dynamic interactive setting to avoid potential Gradio bug
# Let the backend handle skipping if dependencies are missing.
interactive=True # Let's try keeping it interactive, the check is internal now.
)
submit_button = gr.Button("Convert to Audiobook", variant="primary", icon="▢️")
with gr.Column(scale=2, min_width=400):
status_textbox = gr.Textbox(
label="πŸ“Š Conversion Status & Log",
lines=10,
max_lines=20,
interactive=False,
show_copy_button=True,
placeholder="Conversion progress will appear here..."
)
with gr.Row():
# Output components: Image for cover, Audio for playback
cover_image = gr.Image(
label="πŸ–ΌοΈ Extracted Cover Art",
type="filepath", # Function returns a path
interactive=False,
height=250,
show_download_button=True
)
audio_output_player = gr.Audio(
label="🎧 Generated Audiobook",
type="filepath", # Function returns a path
interactive=False # Playback is interactive, but component value isn't set by user
)
# REMOVED separate download button - gr.Audio and gr.Image have download capabilities
# Connect components
submit_button.click(
fn=convert_ebook_to_audio,
inputs=[ebook_input, lang_dropdown, format_dropdown, cover_checkbox],
# Outputs map directly to the components defined above
outputs=[audio_output_player, cover_image, status_textbox]
)
gr.Markdown("--- \n *Powered by Calibre, eSpeak-NG, LAME, OggEnc, Mutagen, and Gradio.*")
# --- Launch the App ---
if __name__ == "__main__":
print("Starting Gradio App Server...")
if not voice_choices or voice_choices[0].startswith("Error"):
print("\nWARNING: Could not retrieve voices from espeak-ng. Language selection may be broken!\n")
# Set share=True for Hugging Face Spaces deployment.
# debug=True can be helpful locally but disable for production.
# server_name="0.0.0.0" allows access within Docker/network.
demo.launch(share=True, server_name="0.0.0.0")
print("Gradio App Launched.")