# Page banner captured along with this file (not program code): "Spaces: Runtime error".
import gradio as gr | |
import subprocess | |
import os | |
import tempfile | |
import shutil | |
import re | |
import logging | |
from pathlib import Path | |
from PIL import Image, UnidentifiedImageError # For checking image validity | |
# --- Configuration & Logging ---
# Configure the root logger BEFORE anything logs. The module-level helpers
# (logging.warning, logging.error, ...) implicitly call basicConfig() with
# defaults when the root logger has no handlers, and a later basicConfig()
# call is then ignored. Doing this first keeps the INFO level and the
# timestamped format in effect even when the mutagen warning below fires.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Mutagen is an optional dependency: it is only needed to embed cover art
# into the generated audio file. MUTAGEN_AVAILABLE records whether the
# import succeeded so the rest of the module can skip embedding gracefully.
try:
    import mutagen
    from mutagen.mp3 import MP3, EasyMP3
    from mutagen.oggvorbis import OggVorbis
    from mutagen.flac import FLAC
    from mutagen.mp4 import MP4, MP4Cover
    from mutagen.id3 import ID3, APIC, PictureType, error as ID3Error
    MUTAGEN_AVAILABLE = True
except ImportError:
    MUTAGEN_AVAILABLE = False
    logging.warning("Mutagen library not found. Cover art embedding will be disabled.")
    logging.warning("Install it using: pip install mutagen")
# --- Helper Functions --- | |
def check_command(command):
    """Check whether ``command`` resolves to an executable on the system PATH.

    The lookup uses ``shutil.which`` instead of spawning ``where`` /
    ``command -v``. ``command`` is a shell builtin and is not available as a
    standalone executable on every system, so running it through
    ``subprocess`` without a shell can raise ``FileNotFoundError`` and make
    every tool look missing. ``shutil.which`` performs the PATH search in
    process, on both Windows and Unix-like systems.

    Args:
        command: Name (or path) of the executable to look for.

    Returns:
        True if the executable was found, False otherwise. This function
        never raises; unexpected errors are logged and reported as False.
    """
    try:
        resolved = shutil.which(command)
    except Exception as e:
        # e.g. a non-string argument; keep the "never raises" contract.
        logging.error(f"Unexpected error checking for command '{command}': {e}")
        return False

    if resolved is None:
        logging.warning(f"Command '{command}' not found in PATH.")
        return False

    logging.info(f"Command '{command}' found.")
    return True
# Primary pattern for one line of the voice table. Group 1 is the language
# code, group 2 the language/voice name. Compiled once at import time rather
# than on every call. Lines that do not match are handled by a plain
# whitespace split in _parse_espeak_voice_lines.
_ESPEAK_VOICE_LINE_RE = re.compile(
    r"^\s*\d+\s+[yn\-]\s+([\w\-]+)\s+[MF\-]?\s+([\w\s\(\)\-]+?)\s+([\w\/\s\-]+?)(?:\s+\(.*\))?\s*$"
)


def _parse_espeak_voice_lines(lines):
    """Parse voice-table lines (header excluded) into ``{display name: code}``.

    Each line is first matched against ``_ESPEAK_VOICE_LINE_RE``. If that
    fails, the line is split on whitespace and accepted when it has at least
    four fields and starts with a number (field 1 is taken as the code,
    field 3 as the name). Lines matching neither form are logged and skipped.
    The first entry seen for a given display name wins.
    """
    voices = {}
    for line in lines:
        match = _ESPEAK_VOICE_LINE_RE.match(line.strip())
        if match:
            code = match.group(1).strip()
            # Keep only the first word of the captured name.
            lang_name = match.group(2).strip().split(" ")[0]
            display_name = f"{lang_name.replace('-', ' ').title()} ({code})"
        else:
            parts = line.split()
            if len(parts) < 4 or not parts[0].isdigit():
                logging.warning(f"Could not parse voice line: {line}")
                continue
            code = parts[1]
            display_name = f"{parts[3].strip().title()} ({code})"
        # Do not overwrite an earlier entry with the same display name.
        voices.setdefault(display_name, code)
    return voices


def get_espeak_voices():
    """Return the available espeak-ng voices as ``{display name: voice code}``.

    Runs ``espeak-ng --voices`` (15 second timeout), skips the header line,
    parses the remaining lines and returns the result sorted by display name.

    Returns:
        dict mapping a human-readable name such as ``"Afrikaans (af)"`` to the
        code passed to ``espeak-ng -v``. If the command is missing, fails,
        times out, or yields nothing parseable, a small built-in fallback
        mapping (English, Spanish, French, German) is returned instead. This
        function does not raise.
    """
    cmd = ['espeak-ng', '--voices']
    try:
        logging.info(f"Getting voices with command: {' '.join(cmd)}")
        # Timeout guards against espeak-ng hanging.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            encoding='utf-8',
            errors='ignore',
            timeout=15,
        )
        lines = result.stdout.splitlines()
        if len(lines) < 2:  # nothing beyond the header line
            logging.warning("No voice lines found in 'espeak-ng --voices' output.")
            raise ValueError("No voice data returned.")

        voices = _parse_espeak_voice_lines(lines[1:])
        if not voices:
            logging.warning("Could not parse any voices from 'espeak-ng --voices'. Using fallback list.")
            raise ValueError("Parsing failed.")

        sorted_voices = dict(sorted(voices.items()))
        logging.info(f"Found {len(sorted_voices)} espeak-ng voices.")
        return sorted_voices
    except Exception as e:
        # Covers a missing binary, non-zero exit, timeout and the ValueErrors
        # raised above: in every case fall back to a basic voice list.
        logging.error(f"Error getting espeak-ng voices: {e}")
        return {"English (en)": "en", "Spanish (es)": "es", "French (fr)": "fr", "German (de)": "de"}
# --- Main Conversion Logic --- | |
def convert_ebook_to_audio(ebook_file, language_display, output_format, embed_cover, progress=gr.Progress(track_tqdm=True)): | |
""" | |
Converts an ebook file to an audiobook using Calibre and espeak-ng. | |
""" | |
if not ebook_file: | |
return None, None, "**Error:** No ebook file provided." | |
# Check required commands *before* creating temp dir | |
calibre_convert_ok = check_command("ebook-convert") | |
calibre_meta_ok = check_command("ebook-meta") # Check always, needed logic follows | |
espeak_ok = check_command("espeak-ng") | |
lame_ok = check_command("lame") | |
oggenc_ok = check_command("oggenc") # From vorbis-tools | |
missing = [] | |
if not calibre_convert_ok: missing.append("Calibre ('ebook-convert')") | |
if not calibre_meta_ok: missing.append("Calibre ('ebook-meta' - for cover art)") | |
if not espeak_ok: missing.append("espeak-ng") | |
if not lame_ok and output_format == 'mp3': missing.append("LAME (for MP3)") | |
if not oggenc_ok and output_format == 'ogg': missing.append("oggenc (for OGG)") | |
if missing: | |
error_msg = f"**Error:** Missing required system command(s):\n- {', '.join(missing)}\n\nPlease ensure they are installed in the environment (check packages.txt)." | |
logging.error(error_msg.replace("**Error:** ","").replace("\n- "," ").replace("\n"," ")) # Log plain text | |
return None, None, error_msg | |
temp_dir = tempfile.mkdtemp(prefix="ebook_audio_") | |
logging.info(f"Created temporary directory: {temp_dir}") | |
status_updates = ["βΆοΈ Conversion process started..."] | |
cover_image_path_final = None # Track final usable cover path for display/embedding | |
audio_output_path_final = None # Keep track of the final audio path for return | |
try: | |
input_ebook_path = ebook_file.name # Gradio provides a temp path for the upload | |
# Sanitize filename slightly for output files | |
base_filename = re.sub(r'[^\w\-]+', '_', Path(input_ebook_path).stem) | |
txt_output_path = os.path.join(temp_dir, f"{base_filename}.txt") | |
# Use a generic name first, then check format | |
cover_output_path_temp = os.path.join(temp_dir, "cover_temp") | |
audio_output_path = os.path.join(temp_dir, f"{base_filename}.{output_format}") | |
# --- Step 1: Extract Cover Art (Optional) --- | |
cover_extracted = False | |
if embed_cover and calibre_meta_ok: | |
progress(0.1, desc="πΌοΈ Extracting cover art (optional)...") | |
status_updates.append(" Attempting to extract cover art...") | |
try: | |
cmd_meta = ['ebook-meta', input_ebook_path, '--get-cover', cover_output_path_temp] | |
logging.info(f"Running cover extraction: {' '.join(cmd_meta)}") | |
# Use timeout for ebook-meta as well | |
result_meta = subprocess.run(cmd_meta, check=True, capture_output=True, text=True, errors='ignore', timeout=30) | |
if os.path.exists(cover_output_path_temp) and os.path.getsize(cover_output_path_temp) > 0: | |
# Validate image and get format | |
try: | |
img = Image.open(cover_output_path_temp) | |
img.verify() # Basic check | |
img.close() # Need to close after verify | |
# Reopen to check format properly and prepare final path | |
img = Image.open(cover_output_path_temp) | |
img_format = img.format.lower() if img.format else 'jpeg' # Default guess | |
img.close() | |
# Define final path with correct extension | |
valid_ext = f".{img_format}" if img_format in ['jpeg', 'png', 'gif'] else ".jpg" # Default to jpg | |
cover_image_path_final = os.path.join(temp_dir, f"cover_final{valid_ext}") | |
shutil.move(cover_output_path_temp, cover_image_path_final) # Rename with correct extension | |
cover_extracted = True | |
status_updates.append(f" β Cover art extracted successfully ({img_format.upper()}).") | |
logging.info(f"Cover art extracted to {cover_image_path_final}") | |
except (IOError, SyntaxError, UnidentifiedImageError) as img_err: | |
logging.warning(f"Extracted file at {cover_output_path_temp} is not a valid image: {img_err}") | |
status_updates.append(" β οΈ Extracted 'cover' file is not a valid image. Will skip embedding.") | |
if os.path.exists(cover_output_path_temp): os.remove(cover_output_path_temp) # Clean up invalid file | |
if cover_image_path_final and os.path.exists(cover_image_path_final): os.remove(cover_image_path_final) | |
cover_image_path_final = None # Ensure it's None | |
else: | |
status_updates.append(" βΉοΈ No cover art found in the ebook metadata.") | |
logging.info("ebook-meta ran but did not produce a cover file or it was empty.") | |
if os.path.exists(cover_output_path_temp): os.remove(cover_output_path_temp) # Clean up empty file | |
except subprocess.TimeoutExpired: | |
status_updates.append(f" β οΈ Timeout trying to extract cover art.") | |
logging.warning(f"ebook-meta timed out.") | |
except subprocess.CalledProcessError as e: | |
stderr_decoded = e.stderr.decode(errors='ignore').strip() if e.stderr else "No stderr" | |
status_updates.append(f" β οΈ Failed to extract cover art. Error: {stderr_decoded[:200]}{'...' if len(stderr_decoded)>200 else ''}") # Keep it short | |
logging.warning(f"ebook-meta failed: {stderr_decoded}") | |
except Exception as e: | |
status_updates.append(f" β οΈ An unexpected error occurred during cover extraction: {e}") | |
logging.error(f"Unexpected error during cover extraction: {e}", exc_info=True) | |
# Ensure temp file is removed if final path wasn't set | |
if not cover_image_path_final and os.path.exists(cover_output_path_temp): | |
os.remove(cover_output_path_temp) | |
elif embed_cover and not calibre_meta_ok: | |
status_updates.append(" βΉοΈ Cover art embedding requested, but 'ebook-meta' command not found.") | |
elif embed_cover and not MUTAGEN_AVAILABLE: | |
status_updates.append(" βΉοΈ Cover art embedding requested, but 'mutagen' Python library not installed.") | |
# --- Step 2: Convert Ebook to TXT --- | |
progress(0.3, desc="π Converting ebook to TXT...") | |
status_updates.append("π Converting ebook to plain text...") | |
try: | |
# Add options known to help with TXT output quality | |
# --input-encoding=utf8 is often needed for non-ASCII content | |
cmd_convert = [ | |
'ebook-convert', input_ebook_path, txt_output_path, | |
'--enable-heuristics', | |
'--output-profile=generic_eink', # Profiles can influence text formatting | |
'--input-encoding=utf8', # Try specifying UTF-8 | |
'--pretty-print' # Can sometimes help structure | |
] | |
logging.info(f"Running ebook conversion: {' '.join(cmd_convert)}") | |
# Increased timeout for potentially large books | |
result_convert = subprocess.run(cmd_convert, check=True, capture_output=True, encoding='utf-8', errors='ignore', timeout=300) # 5 mins | |
# Log stdout/stderr even on success for warnings | |
if result_convert.stdout: logging.info(f"ebook-convert stdout: {result_convert.stdout.strip()}") | |
if result_convert.stderr: logging.warning(f"ebook-convert stderr: {result_convert.stderr.strip()}") | |
status_updates.append(" β Ebook converted to TXT.") | |
logging.info("Ebook successfully converted to TXT.") | |
except subprocess.TimeoutExpired: | |
error_msg = "**Error:** Calibre conversion timed out (may be a very large or complex book)." | |
status_updates.append(f" β Calibre conversion timed out.") | |
logging.error("Error during Calibre conversion: Timeout") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
except subprocess.CalledProcessError as e: | |
stderr_decoded = e.stderr.decode(errors='ignore').strip() if e.stderr else "No stderr" | |
stdout_decoded = e.stdout.decode(errors='ignore').strip() if e.stdout else "No stdout" | |
error_details = f"Stderr:\n```\n{stderr_decoded}\n```\nStdout:\n```\n{stdout_decoded}\n```" if stderr_decoded or stdout_decoded else str(e) | |
error_msg = f"**Error:** Calibre conversion failed (Exit Code {e.returncode}).\n{error_details}" | |
status_updates.append(f" β Calibre conversion failed.") | |
logging.error(f"Error during Calibre conversion: Exit Code {e.returncode}\nStderr: {stderr_decoded}\nStdout: {stdout_decoded}") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
except Exception as e: | |
error_msg = f"**Error:** An unexpected error occurred during ebook conversion:\n{e}" | |
status_updates.append(f" β Unexpected conversion error.") | |
logging.error(f"Unexpected error during ebook conversion: {e}", exc_info=True) | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
# Check if TXT file was actually created and is not empty | |
if not os.path.exists(txt_output_path) or os.path.getsize(txt_output_path) == 0: | |
error_msg = "**Error:** Calibre finished, but the output TXT file is missing or empty.\nThis can happen with image-based ebooks (like scanned PDFs, comics, CBZ/CBR) or DRM-protected files.\nCalibre cannot process these types into text." | |
status_updates.append(f" β TXT output empty/missing.") | |
logging.error("Calibre finished, but the output TXT file is missing or empty.") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
# --- Step 3: Convert TXT to Audio --- | |
progress(0.6, desc="π£οΈ Converting TXT to Audio...") | |
status_updates.append("π£οΈ Converting text to speech...") | |
voice_code = available_voices.get(language_display, 'en') # Get code from display name | |
# Base espeak-ng command: specify voice, read from file | |
cmd_speak = ['espeak-ng', '-v', voice_code, '-f', txt_output_path] | |
# Optionally add speed or other espeak parameters here: | |
# cmd_speak.extend(['-s', '160']) # Example: Set speed (default 175) | |
try: | |
logging.info(f"Preparing audio command for format: {output_format}") | |
# Define timeout for TTS process (can be long for large books) | |
tts_timeout = 1800 # 30 minutes | |
if output_format == 'wav': | |
cmd_speak.extend(['-w', audio_output_path]) | |
logging.info(f"Running espeak-ng (WAV): {' '.join(cmd_speak)}") | |
result_speak = subprocess.run(cmd_speak, check=True, capture_output=True, timeout=tts_timeout) | |
if result_speak.stderr: logging.warning(f"espeak-ng stderr (WAV): {result_speak.stderr.decode(errors='ignore').strip()}") | |
elif output_format == 'mp3': | |
if not lame_ok: raise FileNotFoundError("LAME command not found") | |
cmd_speak.append('--stdout') # espeak outputs WAV to stdout | |
cmd_lame = ['lame', '-', audio_output_path] # LAME reads WAV from stdin, outputs MP3 | |
logging.info(f"Running pipe: {' '.join(cmd_speak)} | {' '.join(cmd_lame)}") | |
# Start espeak-ng process | |
ps_speak = subprocess.Popen(cmd_speak, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
# Start LAME process, piping espeak's stdout to LAME's stdin | |
ps_lame = subprocess.Popen(cmd_lame, stdin=ps_speak.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
# *** Crucial: Allow ps_speak stdout to be closed by ps_lame if it finishes reading *** | |
if ps_speak.stdout: | |
ps_speak.stdout.close() | |
# Capture stderr from both processes, wait for LAME first (end of pipeline) | |
try: | |
lame_stdout_bytes, lame_stderr_bytes = ps_lame.communicate(timeout=tts_timeout + 60) # Allow extra time for encoding | |
except subprocess.TimeoutExpired: | |
logging.error("LAME process timed out.") | |
ps_speak.kill() # Kill upstream process too | |
ps_lame.kill() | |
raise subprocess.TimeoutExpired(cmd_lame, tts_timeout + 60) | |
# Now wait for espeak and capture its stderr | |
speak_stderr_bytes = ps_speak.stderr.read() if ps_speak.stderr else b"" | |
ps_speak.wait() # Wait for espeak to fully terminate | |
if ps_speak.stderr: ps_speak.stderr.close() | |
# Decode stderr for logging/errors | |
lame_stderr_str = lame_stderr_bytes.decode(errors='ignore').strip() | |
speak_stderr_str = speak_stderr_bytes.decode(errors='ignore').strip() | |
# Check return codes AFTER both processes finished | |
if ps_lame.returncode != 0: | |
logging.error(f"LAME failed with exit code {ps_lame.returncode}. LAME stderr: {lame_stderr_str}") | |
raise subprocess.CalledProcessError(ps_lame.returncode, cmd_lame, stderr=lame_stderr_bytes) | |
if ps_speak.returncode != 0: | |
logging.error(f"espeak-ng failed with exit code {ps_speak.returncode}. espeak-ng stderr: {speak_stderr_str}") | |
raise subprocess.CalledProcessError(ps_speak.returncode, cmd_speak, stderr=speak_stderr_bytes) | |
# Log any non-fatal warnings from stderr | |
if lame_stderr_str: logging.warning(f"LAME stderr: {lame_stderr_str}") | |
if speak_stderr_str: logging.warning(f"espeak-ng stderr: {speak_stderr_str}") | |
elif output_format == 'ogg': | |
if not oggenc_ok: raise FileNotFoundError("oggenc command not found") | |
cmd_speak.append('--stdout') # espeak outputs WAV to stdout | |
# oggenc reads WAV from stdin ('-') and writes to output file ('-o') | |
cmd_ogg = ['oggenc', '-o', audio_output_path, '-'] | |
logging.info(f"Running pipe: {' '.join(cmd_speak)} | {' '.join(cmd_ogg)}") | |
ps_speak = subprocess.Popen(cmd_speak, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
ps_ogg = subprocess.Popen(cmd_ogg, stdin=ps_speak.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
if ps_speak.stdout: | |
ps_speak.stdout.close() | |
try: | |
ogg_stdout_bytes, ogg_stderr_bytes = ps_ogg.communicate(timeout=tts_timeout + 60) | |
except subprocess.TimeoutExpired: | |
logging.error("oggenc process timed out.") | |
ps_speak.kill() | |
ps_ogg.kill() | |
raise subprocess.TimeoutExpired(cmd_ogg, tts_timeout + 60) | |
speak_stderr_bytes = ps_speak.stderr.read() if ps_speak.stderr else b"" | |
ps_speak.wait() | |
if ps_speak.stderr: ps_speak.stderr.close() | |
ogg_stderr_str = ogg_stderr_bytes.decode(errors='ignore').strip() | |
speak_stderr_str = speak_stderr_bytes.decode(errors='ignore').strip() | |
if ps_ogg.returncode != 0: | |
logging.error(f"oggenc failed with exit code {ps_ogg.returncode}. oggenc stderr: {ogg_stderr_str}") | |
raise subprocess.CalledProcessError(ps_ogg.returncode, cmd_ogg, stderr=ogg_stderr_bytes) | |
if ps_speak.returncode != 0: | |
logging.error(f"espeak-ng failed with exit code {ps_speak.returncode}. espeak-ng stderr: {speak_stderr_str}") | |
raise subprocess.CalledProcessError(ps_speak.returncode, cmd_speak, stderr=speak_stderr_bytes) | |
if ogg_stderr_str: logging.warning(f"oggenc stderr: {ogg_stderr_str}") | |
if speak_stderr_str: logging.warning(f"espeak-ng stderr: {speak_stderr_str}") | |
else: | |
raise ValueError(f"Unsupported output format selected: {output_format}") | |
status_updates.append(" β Text converted to audio.") | |
logging.info(f"Text successfully converted to {output_format.upper()}.") | |
except subprocess.CalledProcessError as e: | |
command_name = Path(e.cmd[0]).name if isinstance(e.cmd, list) else e.cmd | |
stderr_str = e.stderr.decode(errors='ignore').strip() if isinstance(e.stderr, bytes) else (e.stderr or "") | |
stdout_str = e.stdout.decode(errors='ignore').strip() if isinstance(e.stdout, bytes) else (e.stdout or "") | |
error_details = stderr_str or stdout_str or "No output/error captured." | |
exit_status_str = f"exit status {e.returncode}" if e.returncode is not None else "unknown exit status" | |
cmd_str = ' '.join(e.cmd) if isinstance(e.cmd, list) else e.cmd | |
error_msg = (f"**Error:** Audio generation failed.\n\n" | |
f"**Process:** `{command_name}`\n" | |
f"**Command:**\n```\n{cmd_str}\n```\n" | |
f"**Exit Status:** {exit_status_str}\n\n" | |
f"**Output/Error:**\n```\n{error_details}\n```") | |
status_updates.append(f" β Audio generation failed ({command_name}).") | |
logging.error(f"Audio generation failed. Command: `{cmd_str}` Exit: {exit_status_str} Details: {error_details}") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
except subprocess.TimeoutExpired as e: | |
command_name = Path(e.cmd[0]).name if isinstance(e.cmd, list) else e.cmd | |
error_msg = f"**Error:** Audio generation timed out (over {e.timeout}s) during `{command_name}` processing.\nThe ebook might be too long for the current timeout limit." | |
status_updates.append(f" β Audio generation timed out.") | |
logging.error(f"Audio generation timed out for command: {' '.join(e.cmd)}") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
except FileNotFoundError as e: | |
# This should ideally be caught by initial checks, but handle defensively | |
missing_cmd = e.filename or "Unknown command" | |
error_msg = f"**Error:** Command `{missing_cmd}` not found during audio generation for {output_format.upper()} output.\nPlease check `packages.txt`." | |
status_updates.append(f" β Command '{missing_cmd}' not found.") | |
logging.error(f"Error: Command '{missing_cmd}' not found during execution.") | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
except Exception as e: | |
error_msg = f"**Error:** An unexpected error occurred during audio generation:\n```\n{e}\n```" | |
status_updates.append(f" β Unexpected audio error.") | |
logging.error(f"An unexpected error occurred during audio generation: {e}", exc_info=True) | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
# --- Step 3b: Verify Audio Output --- | |
if not os.path.exists(audio_output_path) or os.path.getsize(audio_output_path) < 256: # Check if file exists and has *some* data | |
error_msg = f"**Error:** Audio generation command finished, but the output file '{Path(audio_output_path).name}' is missing or empty/too small.\nCheck logs for potential errors during the TTS or encoding process." | |
status_updates.append(f" β Audio output missing or invalid.") | |
logging.error(f"Audio output file missing or too small after generation: {audio_output_path}") | |
# Try to provide more context if stderr was captured earlier | |
# last_stderr = speak_stderr_str or lame_stderr_str or ogg_stderr_str # From pipe section | |
# if last_stderr: error_msg += f"\nLast captured error output:\n```\n{last_stderr}\n```" | |
return None, cover_image_path_final, "\n".join(status_updates) + f"\n\n{error_msg}" | |
# --- Step 4: Embed Cover Art (Optional) --- | |
if embed_cover and cover_extracted and MUTAGEN_AVAILABLE and cover_image_path_final and os.path.exists(cover_image_path_final): | |
progress(0.9, desc="πΌοΈ Embedding cover art...") | |
status_updates.append("πΌοΈ Embedding cover art into audio file...") | |
try: | |
with open(cover_image_path_final, 'rb') as img_f: | |
cover_data = img_f.read() | |
# Determine mimetype robustly using Pillow | |
mime_type = 'image/jpeg' # Default | |
img_width, img_height, img_depth = 0, 0, 24 # Defaults for FLAC/OGG | |
try: | |
img = Image.open(cover_image_path_final) | |
mime_type = Image.MIME.get(img.format) | |
img_width, img_height = img.width, img.height | |
img_depth = {'RGB': 24, 'RGBA': 32, 'L': 8, 'P': 8}.get(img.mode, 24) # Palette 'P' often 8-bit | |
img.close() | |
if not mime_type: | |
ext = Path(cover_image_path_final).suffix.lower() | |
if ext == ".jpg" or ext == ".jpeg": mime_type = 'image/jpeg' | |
elif ext == ".png": mime_type = 'image/png' | |
else: raise ValueError("Unsupported image format for MIME detection") # Force fallback | |
logging.info(f"Using cover mime type: {mime_type}, Dimensions: {img_width}x{img_height}, Depth: {img_depth}") | |
except Exception as pil_err: | |
logging.warning(f"Could not determine MIME type/dimensions using PIL: {pil_err}. Falling back to image/jpeg.") | |
mime_type = 'image/jpeg' # Fallback | |
logging.info(f"Attempting to embed cover art ({mime_type}) into {audio_output_path}") | |
audio = mutagen.File(audio_output_path, easy=True) # Use easy=True for simple tags, fallback to non-easy for picture | |
if audio is None: | |
# Try loading without easy=True if easy fails | |
audio = mutagen.File(audio_output_path, easy=False) | |
if audio is None: | |
raise ValueError("Mutagen could not load the audio file. Format might be unsupported or file corrupted.") | |
# --- Add Title/Artist using Easy Interface if possible --- | |
try: | |
if isinstance(audio, mutagen.easy.EasyMutagen): # Check if Easy interface loaded | |
if not audio.get('title'): audio['title'] = Path(base_filename).name.replace('_', ' ') # Use sanitized filename base | |
if not audio.get('artist'): audio['artist'] = "Ebook Speaker" | |
audio.save() # Save easy tags | |
# Reload without easy=True for picture embedding if needed by format | |
audio = mutagen.File(audio_output_path, easy=False) | |
elif audio is not None: # Easy interface failed, try basic tags with normal interface | |
if not audio.tags.get('TIT2'): audio.tags.add(mutagen.id3.TIT2(encoding=3, text=Path(base_filename).name.replace('_', ' '))) | |
if not audio.tags.get('TPE1'): audio.tags.add(mutagen.id3.TPE1(encoding=3, text="Ebook Speaker")) | |
audio.save() | |
audio = mutagen.File(audio_output_path, easy=False) # Reload after save | |
except Exception as tag_err: | |
logging.warning(f"Could not set basic title/artist tags: {tag_err}") | |
# --- Embed Picture (using non-easy interface often required) --- | |
if audio is None: # Check again after potential reload | |
raise ValueError("Audio object became None after tag saving.") | |
# Clear existing art first (important!) | |
try: | |
audio.tags.delall('APIC') # ID3v2 (MP3) | |
audio.tags.delall('covr') # MP4 | |
if hasattr(audio, 'clear_pictures'): audio.clear_pictures() # FLAC | |
if "metadata_block_picture" in audio: del audio["metadata_block_picture"] # OggVorbis | |
audio.save() | |
# Reload again after deleting to ensure clean slate | |
audio = mutagen.File(audio_output_path, easy=False) | |
if audio is None: raise ValueError("Audio object None after clearing art.") | |
except (AttributeError, KeyError, TypeError, Exception) as clear_err: | |
logging.warning(f"Could not definitively clear existing artwork: {clear_err}. Proceeding anyway.") | |
# Add the new cover | |
save_needed = False | |
if isinstance(audio, (MP3, EasyMP3)): # Handles MP3 | |
if audio.tags is None: audio.add_tags() | |
audio.tags.add( | |
APIC( | |
encoding=3, # 3 is for UTF-8 | |
mime=mime_type, | |
type=PictureType.COVER_FRONT, # Use standard enum | |
desc='Cover', | |
data=cover_data | |
) | |
) | |
save_needed = True | |
elif isinstance(audio, FLAC): | |
pic = mutagen.flac.Picture() | |
pic.data = cover_data | |
pic.type = PictureType.COVER_FRONT | |
pic.mime = mime_type | |
pic.width = img_width | |
pic.height = img_height | |
pic.depth = img_depth | |
audio.add_picture(pic) | |
save_needed = True | |
elif isinstance(audio, OggVorbis): | |
# Ogg Vorbis uses base64 encoded FLAC Picture block | |
import base64 | |
pic = mutagen.flac.Picture() | |
pic.data = cover_data | |
pic.type = PictureType.COVER_FRONT | |
pic.mime = mime_type | |
pic.width = img_width | |
pic.height = img_height | |
pic.depth = img_depth | |
audio["METADATA_BLOCK_PICTURE"] = [base64.b64encode(pic.write()).decode("ascii")] | |
save_needed = True | |
elif isinstance(audio, MP4): # Handles M4A/M4B | |
if mime_type == 'image/jpeg': pic_format = MP4Cover.FORMAT_JPEG | |
elif mime_type == 'image/png': pic_format = MP4Cover.FORMAT_PNG | |
else: pic_format = MP4Cover.FORMAT_UNDEFINED | |
if pic_format != MP4Cover.FORMAT_UNDEFINED: | |
audio['covr'] = [MP4Cover(cover_data, imageformat=pic_format)] | |
save_needed = True | |
else: logging.warning(f"Unsupported cover image format ({mime_type}) for MP4 embedding.") | |
else: | |
logging.warning(f"Cover embedding not implemented for this audio type: {type(audio)}") | |
if save_needed: | |
audio.save() | |
status_updates.append(" β Cover art embedded successfully.") | |
logging.info("Cover art embedded successfully.") | |
elif embed_cover: # Only report skip if embedding was attempted but failed type match | |
status_updates.append(" β οΈ Cover embedding skipped (unsupported audio format for mutagen?).") | |
logging.warning(f"Could not embed cover: audio format {type(audio)} not explicitly handled.") | |
except (mutagen.MutagenError, ValueError, IOError, TypeError, KeyError, AttributeError) as e: | |
status_updates.append(f" β οΈ Could not embed cover art. Error: {str(e)[:100]}...") | |
logging.warning(f"Failed to embed cover art: {e}", exc_info=True) | |
except Exception as e: | |
status_updates.append(f" β οΈ An unexpected error occurred during cover art embedding.") | |
logging.error(f"Unexpected error during cover embedding: {e}", exc_info=True) | |
elif embed_cover and not cover_extracted: | |
status_updates.append(" βΉοΈ Cover art embedding skipped (no cover extracted or invalid).") | |
elif embed_cover and not MUTAGEN_AVAILABLE: | |
# This was logged earlier, but confirm skip in status | |
status_updates.append(" β οΈ Cover art embedding skipped (Mutagen library not installed).") | |
# --- Step 5: Prepare final output --- | |
progress(1.0, desc="β Complete!") | |
status_updates.append("π Conversion complete!") | |
audio_output_path_final = audio_output_path # Mark the path as final | |
# Return paths for Gradio components | |
final_status = "\n".join(status_updates) | |
logging.info(f"Returning audio: {audio_output_path_final}, cover: {cover_image_path_final}, Status: Success.") | |
# Return audio path for Audio component, cover path for Image, status for Textbox | |
return audio_output_path_final, cover_image_path_final, final_status | |
except Exception as e: | |
# Catch-all for unexpected errors in the main try block | |
error_msg = f"An unexpected critical error occurred in the main process: {e}" | |
status_updates.append(f" β CRITICAL ERROR: {error_msg}") | |
logging.error(error_msg, exc_info=True) | |
# Return None for audio, cover path (if extracted), and the error status | |
final_status = "\n".join(status_updates) | |
return None, cover_image_path_final, f"{final_status}\n\n**Error:** An unexpected critical error occurred.\nCheck application logs for details.\n{e}" | |
finally: | |
# --- Cleanup --- | |
# We leave the final audio and cover files in temp_dir for Gradio to serve. | |
# Clean up intermediate files ONLY. | |
try: | |
if 'txt_output_path' in locals() and os.path.exists(txt_output_path): | |
os.remove(txt_output_path) | |
logging.info(f"Removed intermediate file: {txt_output_path}") | |
# Remove temporary cover if it's different from final or if final doesn't exist | |
if 'cover_output_path_temp' in locals() and os.path.exists(cover_output_path_temp): | |
if not cover_image_path_final or cover_output_path_temp != cover_image_path_final: | |
os.remove(cover_output_path_temp) | |
logging.info(f"Removed intermediate file: {cover_output_path_temp}") | |
# Note: Gradio typically copies temp files, but leaving the dir might be safer | |
# If space becomes an issue, add shutil.rmtree(temp_dir) here, | |
# but ensure Gradio doesn't need the original files after the function returns. | |
logging.info(f"Temporary directory '{temp_dir}' contains final output files and will be cleaned up by Gradio/system later.") | |
except OSError as e: | |
logging.warning(f"Error during cleanup of intermediate files: {e}") | |
# --- Gradio Interface Definition --- | |
# Discover the espeak-ng voices once at import time; the UI dropdown below is
# populated from `voice_choices` and pre-selects `default_voice`.
print("Initializing Gradio Interface...")
print("Fetching available eSpeak-NG voices...")
available_voices = get_espeak_voices()
voice_choices = list(available_voices)
print(f"Found {len(voice_choices)} voices.")

# Candidate defaults in priority order (US English first, then British, then
# generic English). Several spellings are listed because the display names
# vary in how the region is written.
possible_defaults = [
    "English (United States) (en-us)",
    "English (Us) (en-us)",
    "English (en-us)",
    "English (Great Britain) (en-gb)",
    "English (Gb) (en-gb)",
    "English (en-gb)",
    "English (en)",
]

# First candidate that is actually available; otherwise the basic fallback name.
default_voice = next(
    (candidate for candidate in possible_defaults if candidate in voice_choices),
    "English (en)",
)

if not voice_choices:
    logging.error("FATAL: No espeak voices found or parsed. Language selection will fail.")
    # Install a single placeholder entry so the dropdown can still be built,
    # even though conversion cannot work without a real voice.
    voice_choices = ["Error: No Voices Found"]
    default_voice = voice_choices[0]
    available_voices = {default_voice: "error"}
# Probe for the external tools and optional libraries at startup and build a
# Markdown-formatted warning listing whatever could not be found.
print("Checking required external commands...")

# (command name, human-readable description) pairs, probed in this order.
required_commands = (
    ("ebook-convert", "Calibre ('ebook-convert')"),
    ("ebook-meta", "Calibre ('ebook-meta' - needed for cover art)"),
    ("espeak-ng", "espeak-ng (core TTS engine)"),
    ("lame", "LAME (needed for MP3 output)"),
    ("oggenc", "oggenc (needed for OGG output, from 'vorbis-tools')"),
)
startup_warnings = [
    description
    for command_name, description in required_commands
    if not check_command(command_name)
]
if not MUTAGEN_AVAILABLE:
    startup_warnings.append("Python 'mutagen' library (needed for embedding cover art - install via requirements.txt)")

if not startup_warnings:
    # Empty string means "nothing to report"; the UI checks its truthiness.
    startup_message = ""
else:
    warning_list = "\n- ".join(startup_warnings)
    startup_message = (
        "**β οΈ Startup Warning: The following components might be missing or not found:**\n\n"
        f"- {warning_list}\n\n"
        "Please ensure system packages are listed in `packages.txt` and Python libraries in `requirements.txt`. "
        "Functionality relying on missing components will fail. Check container build logs for installation errors."
    )
    # Mirror the warning on the console so it also shows up in the server logs.
    print("--- STARTUP WARNING ---")
    print(f"Missing components: {', '.join(startup_warnings)}")
    print("-----------------------")
# Define UI Elements
#
# Layout: a header, an optional startup warning, then one row with two columns:
#   left  - inputs (ebook upload, voice, output format, cover-art toggle, submit)
#   right - outputs (status log, extracted cover image, generated audio)
# The submit button is wired to `convert_ebook_to_audio`, whose three return
# values map onto (audio player, cover image, status textbox) in that order.
print("Building Gradio UI...")
with gr.Blocks(theme=gr.themes.Soft(), title="Ebook to Audiobook") as demo:
    gr.Markdown(
        """
        # Ebook to Audiobook Converter π§π
        **Convert your ebooks (EPUB, MOBI, AZW3, FB2, PDF*, etc.) into audiobooks!**
        Upload your ebook, select the desired language/voice and audio format, and click Convert.
        Optionally, embed the cover art into the audio file metadata.
        *(*) Note: PDF conversion works best for text-based PDFs. Scanned images or complex layouts may result in poor text extraction.*
        """
    )

    if startup_message:
        # gr.Warning() is an alert meant to be raised from inside an event
        # handler; calling it while the layout is being built shows nothing on
        # the page. The message is already Markdown-formatted (bold heading and
        # a bullet list), so render it as a regular Markdown component.
        gr.Markdown(startup_message)

    with gr.Row():
        # --- Left column: inputs ---
        with gr.Column(scale=1, min_width=300):
            # NOTE(review): Gradio 4 only accepts "filepath" or "binary" for
            # `type`. Left unchanged because the handler's way of reading this
            # argument is outside this section - confirm the installed Gradio
            # version and the handler before switching to "filepath".
            ebook_input = gr.File(label="1. Upload Ebook File", file_count="single", type="file")

            lang_dropdown = gr.Dropdown(
                label="2. Select Language / Voice",
                choices=voice_choices,
                # Guard against a default that is not among the offered choices.
                value=default_voice if default_voice in voice_choices else (voice_choices[0] if voice_choices else None),
                info="Uses voices available from espeak-ng.",
                interactive=True,
            )

            format_dropdown = gr.Dropdown(
                label="3. Select Output Audio Format",
                choices=["mp3", "ogg", "wav"],
                value="mp3",
                info="MP3 offers good compatibility and compression. OGG is open source. WAV is uncompressed.",
                interactive=True,
            )

            cover_checkbox = gr.Checkbox(
                label="Embed Cover Art (if available)",
                # Pre-tick only when the embedding library could be imported.
                value=MUTAGEN_AVAILABLE,
                info="Requires 'mutagen' library and 'ebook-meta' command.",
                # Always interactive: the backend skips embedding on its own
                # when the dependencies are missing.
                interactive=True,
            )

            # `icon=` expects a path or URL to an image file, not a glyph, so
            # the play symbol is part of the button text instead.
            submit_button = gr.Button("▶️ Convert to Audiobook", variant="primary")

        # --- Right column: outputs ---
        with gr.Column(scale=2, min_width=400):
            status_textbox = gr.Textbox(
                label="π Conversion Status & Log",
                lines=10,
                max_lines=20,
                interactive=False,
                show_copy_button=True,
                placeholder="Conversion progress will appear here...",
            )

            with gr.Row():
                # Both components receive a file path from the handler.
                cover_image = gr.Image(
                    label="πΌοΈ Extracted Cover Art",
                    type="filepath",
                    interactive=False,
                    height=250,
                    show_download_button=True,
                )
                audio_output_player = gr.Audio(
                    label="π§ Generated Audiobook",
                    type="filepath",
                    interactive=False,  # Playback only; the user never sets the value
                )
                # No separate download button: gr.Audio and gr.Image already
                # offer download controls.

    # Connect components. Output order must match the handler's return order.
    submit_button.click(
        fn=convert_ebook_to_audio,
        inputs=[ebook_input, lang_dropdown, format_dropdown, cover_checkbox],
        outputs=[audio_output_player, cover_image, status_textbox],
    )

    gr.Markdown("--- \n *Powered by Calibre, eSpeak-NG, LAME, OggEnc, Mutagen, and Gradio.*")
# --- Launch the App ---
if __name__ == "__main__":
    print("Starting Gradio App Server...")
    if not voice_choices or voice_choices[0].startswith("Error"):
        print("\nWARNING: Could not retrieve voices from espeak-ng. Language selection may be broken!\n")

    # A public share link is opt-in (GRADIO_SHARE=1/true/yes). Share links are
    # not supported on Hugging Face Spaces, and when run locally they expose
    # this app - which runs external converters on uploaded files - to the
    # public internet, so it should be a deliberate choice.
    share_requested = os.environ.get("GRADIO_SHARE", "").strip().lower() in {"1", "true", "yes"}

    # server_name="0.0.0.0" allows access within Docker/network.
    # debug=True can be helpful locally but is left off for production.
    demo.launch(share=share_requested, server_name="0.0.0.0")

    # launch() blocks while the server is running when executed as a script,
    # so this line is only reached once the server has shut down.
    print("Gradio App Server stopped.")