|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import List
|
|
import os
|
|
import uuid
|
|
import aiohttp
|
|
import logging
|
|
import openai
|
|
from pathlib import Path
|
|
import subprocess
|
|
import shutil
|
|
import ssl
|
|
import json
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydub import AudioSegment
|
|
from PIL import Image
|
|
|
|
|
|
# Root logging is configured once at import time; every helper in this module
# reports through this module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Runtime configuration, read from the environment once at import time.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Origin prepended to the "/storage/..." paths returned to clients.
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")

openai.api_key = OPENAI_API_KEY
if OPENAI_BASE_URL:
    # The module-level client behind `openai.audio.speech.create` (openai>=1.0)
    # is configured through `openai.base_url`. `api_base` is the pre-1.0
    # attribute name; it is still assigned so older SDK versions keep working.
    openai.base_url = OPENAI_BASE_URL
    openai.api_base = OPENAI_BASE_URL
|
|
|
|
app = FastAPI()

# StaticFiles refuses to start when its directory does not exist, so make sure
# the storage directory is present before mounting it (previously it was only
# created lazily by the first successful upload).
os.makedirs("storage", exist_ok=True)
app.mount("/storage", StaticFiles(directory="storage"), name="storage")

# NOTE(review): wildcard origins together with allow_credentials=True leaves
# the API open to credentialed requests from any site. Kept as-is so existing
# front-ends keep working; restrict allow_origins before exposing this service.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
class ComicData(BaseModel):
    """Request body of the video generation endpoint.

    The lists are consumed positionally by the pipeline in this module: the
    entries at index ``i`` are treated as belonging to panel ``i``.
    """

    # Narration text per panel; turned into audio and "Caption"-style subtitles.
    captions: List[str]
    # Dialogue text per panel; turned into audio and "Speech"-style subtitles.
    speeches: List[str]
    # One image URL per panel; each one is downloaded before rendering.
    panels: List[str]
|
|
|
|
|
|
async def download_image(url, output_path):
    """Download ``url`` to ``output_path`` and measure the saved image.

    Returns:
        ``(output_path, width_in_pixels)`` when the server answers 200 and the
        payload can be opened as an image, otherwise ``(None, 0)``. Failures
        are logged, never raised.
    """
    try:
        # NOTE(review): certificate and hostname verification are switched off
        # here, so the download is open to man-in-the-middle tampering.
        # Presumably done to tolerate hosts with broken certificates - confirm.
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE
        connector = aiohttp.TCPConnector(ssl=insecure_context)

        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error(f"Failed to download image: {response.status}")
                    return None, 0

                with open(output_path, 'wb') as image_file:
                    payload = await response.read()
                    image_file.write(payload)

                # Re-open the saved file to read its pixel dimensions.
                with Image.open(output_path) as picture:
                    width, _ = picture.size
                return output_path, width
    except Exception as e:
        logger.error(f"Error downloading image: {e}")
        return None, 0
|
|
|
|
|
|
async def generate_speech(text, voice="alloy", output_path=None):
    """Synthesize ``text`` to an MP3 file through the OpenAI speech endpoint.

    Args:
        text: Text to speak.
        voice: Voice identifier forwarded to the API.
        output_path: Destination file; a random ``<uuid>.mp3`` name in the
            current directory is used when omitted.

    Returns:
        The path that was written, or ``None`` when synthesis or writing
        failed (the error is logged, not raised).
    """
    import asyncio
    import functools

    try:
        if not output_path:
            output_path = f"{uuid.uuid4()}.mp3"

        # The SDK call is synchronous; run it on the default executor so the
        # event loop keeps serving other requests while the audio is rendered.
        request = functools.partial(
            openai.audio.speech.create,
            model="tts-1",
            voice=voice,
            input=text,
        )
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, request)

        with open(output_path, "wb") as f:
            f.write(response.content)

        return output_path
    except Exception as e:
        logger.exception("Error generating speech: %s", e)
        return None
|
|
|
|
|
|
def get_audio_duration(audio_path):
    """Return the playing time of ``audio_path`` in seconds.

    The file is decoded with pydub, whose segment length is expressed in
    milliseconds. If it cannot be read the error is logged and a fallback of
    5.0 seconds is returned.
    """
    try:
        millis = len(AudioSegment.from_file(audio_path))
        return millis / 1000.0
    except Exception as e:
        logger.error(f"Error getting audio duration: {e}")
        return 5.0
|
|
|
|
|
|
# Header shared by every generated .ass subtitle file: script metadata for a
# 1920x1080 canvas, two styles and the column layout of the [Events] section.
# "Caption" uses alignment 2 and "Speech" uses alignment 8, so the two tracks
# are anchored to different screen edges. Lines starting with ";" inside the
# literal are comments within the subtitle file itself (they are runtime text
# and therefore left untouched). Stray "|" separator lines that had crept into
# the literal were removed, since they would be written into every file.
ASS_STYLE_HEADER = """
[Script Info]
WrapStyle: 0
ScaledBorderAndShadow: yes
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
; Caption样式:高亮青蓝,深蓝描边
Style: Caption,Noto Sans CJK SC,46,&H00FFFF44,&H0000FFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,3,2,10,10,39,0
; Speech样式:鲜亮黄,黑色描边(最醒目)
Style: Speech,Noto Sans CJK SC,42,&H00FF77FF,&H00FFFFFF,&H003800BF,&H00000000,0,0,0,0,100,100,0,0,1,2,3,8,10,10,39,0

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
|
|
def smart_wrap(text, image_width, font_size=48):
    r"""Break ``text`` into lines that fit the image and join them with ``\N``.

    Width is estimated per character: code points above 255 (e.g. CJK) count
    as one full ``font_size``, everything else as half of it. A line is closed
    once adding the next character would exceed ``image_width - 100``.

    Line breaks already present in ``text`` are honoured and converted to the
    ASS hard line break, because a raw newline would cut the Dialogue event
    line in two. A character that is wider than the limit on its own still
    gets a line to itself, so no empty leading line is produced.

    Args:
        text: Text to wrap.
        image_width: Available width in pixels.
        font_size: Font size used for the width estimate.

    Returns:
        The wrapped text with ``\N`` between lines ("" for empty input).
    """
    max_line_width = image_width - 100
    half_width = font_size // 2

    wrapped = []
    # `or [""]` keeps the empty-string case returning "" like before.
    for paragraph in text.splitlines() or [""]:
        line_chars = []
        line_width = 0
        for char in paragraph:
            char_width = font_size if ord(char) > 255 else half_width
            # Only break when the line already holds something; otherwise a
            # too-narrow limit would emit an empty line before every character.
            if line_chars and line_width + char_width > max_line_width:
                wrapped.append(''.join(line_chars))
                line_chars = []
                line_width = 0
            line_chars.append(char)
            line_width += char_width
        wrapped.append(''.join(line_chars))

    return r'\N'.join(wrapped)
|
|
|
|
|
|
def _write_subtitle_file(subtitle_file, style, texts, panel_start_times, panel_durations, image_widths):
    """Write an ASS file with one ``style`` Dialogue event per panel."""
    with open(subtitle_file, "w", encoding="utf-8") as f:
        f.write(ASS_STYLE_HEADER)
        # zip() stops at the shortest list, so surplus entries are ignored.
        for text, start, duration, width in zip(texts, panel_start_times, panel_durations, image_widths):
            wrapped_text = smart_wrap(text, width)
            f.write(
                f"Dialogue: 0,{format_time(start)},{format_time(start + duration)},"
                f"{style},,0,0,0,,{wrapped_text}\n"
            )
    return subtitle_file


def create_caption_subtitle_file(project_dir, captions, panel_start_times, panel_durations, image_widths):
    """Write ``captions.ass`` with one "Caption"-styled event per panel.

    Each event is shown from the panel's start time for the panel's whole
    duration; the text is wrapped to the matching image width.

    Returns:
        Path of the written file, or ``None`` on failure (logged).
    """
    try:
        return _write_subtitle_file(
            os.path.join(project_dir, "captions.ass"), "Caption",
            captions, panel_start_times, panel_durations, image_widths,
        )
    except Exception as e:
        logger.exception("Error creating caption subtitle file: %s", e)
        return None


def create_speech_subtitle_file(project_dir, speeches, panel_start_times, panel_durations, image_widths):
    """Write ``speeches.ass`` with one "Speech"-styled event per panel.

    Each event is shown from the panel's start time for the panel's whole
    duration; the text is wrapped to the matching image width.

    Returns:
        Path of the written file, or ``None`` on failure (logged).
    """
    try:
        return _write_subtitle_file(
            os.path.join(project_dir, "speeches.ass"), "Speech",
            speeches, panel_start_times, panel_durations, image_widths,
        )
    except Exception as e:
        logger.exception("Error creating speech subtitle file: %s", e)
        return None
|
|
|
|
|
|
def format_time(seconds):
    """Format a duration in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to the nearest centisecond first and then split into
    fields with integer arithmetic. Truncating the float remainder instead
    (as was done before) loses a centisecond on values such as 1.15, which is
    stored as 1.1499999.... Negative input is clamped to zero.
    """
    total_centis = max(0, int(round(seconds * 100)))
    hours, remainder = divmod(total_centis, 360000)
    minutes, remainder = divmod(remainder, 6000)
    secs, centisecs = divmod(remainder, 100)
    return f"{hours}:{minutes:02}:{secs:02}.{centisecs:02}"
|
|
|
|
|
|
async def create_audio_file(project_dir, captions, speeches):
    """Synthesize the narration for every panel and join it into one MP3.

    For each ``(caption, speech)`` pair the caption is spoken first, then the
    speech. A panel for which no audio could be produced is given 5 seconds
    of silence, so the audio track stays aligned with the panel timeline
    (previously such a panel got a 5 second slot but no audio, which shifted
    the sound of every later panel).

    Side effects: writes ``caption_<i>.mp3`` / ``speech_<i>.mp3``,
    ``combined_audio.mp3``, ``audio_durations.json`` and ``panel_times.json``
    into ``project_dir``.

    Args:
        project_dir: Working directory for all generated files.
        captions: Narration text per panel (empty entries are skipped).
        speeches: Dialogue text per panel (empty entries are skipped).

    Returns:
        ``(combined_audio_path, audio_durations, panel_start_times,
        panel_durations)``; ``(None, {}, [], [])`` when nothing could be
        synthesized or an error occurred (logged).
    """
    caption_voice = "f2ed19ca0ea246bf9cbc6382be00e4fc"
    speech_voice = "3b55b3d84d2f453a98d8ca9bb24182d6"
    fallback_duration = 5.0

    try:
        audio_durations = {}
        panel_start_times = []
        panel_durations = []
        final_combined = AudioSegment.empty()
        any_audio = False
        current_time = 0

        for i, (caption, speech) in enumerate(zip(captions, speeches)):
            panel_audio = AudioSegment.empty()
            panel_duration = 0

            parts = (("caption", caption, caption_voice), ("speech", speech, speech_voice))
            for kind, text, voice in parts:
                if not text:
                    continue
                part_path = os.path.join(project_dir, f"{kind}_{i}.mp3")
                if not await generate_speech(text, voice, part_path):
                    continue
                # Decode once and reuse the segment for both the duration
                # bookkeeping and the concatenation.
                segment = AudioSegment.from_file(part_path)
                duration = len(segment) / 1000.0
                audio_durations[f"{kind}_{i}"] = duration
                panel_audio += segment
                panel_duration += duration
                any_audio = True

            if panel_duration == 0:
                # Keep the timeline and the audio track the same length.
                panel_duration = fallback_duration
                panel_audio = AudioSegment.silent(duration=int(fallback_duration * 1000))

            panel_start_times.append(current_time)
            panel_durations.append(panel_duration)
            final_combined += panel_audio
            current_time += panel_duration

        if not any_audio:
            logger.error("No audio parts generated")
            return None, {}, [], []

        # Panels are joined in memory and encoded once, avoiding the quality
        # loss and padding of an intermediate per-panel MP3 round trip.
        combined_audio = os.path.join(project_dir, "combined_audio.mp3")
        final_combined.export(combined_audio, format="mp3")

        durations_file = os.path.join(project_dir, "audio_durations.json")
        with open(durations_file, "w") as f:
            json.dump(audio_durations, f)

        panel_times_file = os.path.join(project_dir, "panel_times.json")
        with open(panel_times_file, "w") as f:
            json.dump({"start_times": panel_start_times, "durations": panel_durations}, f)

        return combined_audio, audio_durations, panel_start_times, panel_durations
    except Exception as e:
        logger.exception("Error creating audio file: %s", e)
        return None, {}, [], []
|
|
|
|
def process_sub_path(path):
    """Escape a subtitle file path for use inside an FFmpeg filter argument.

    The path is converted to forward-slash form, the characters that are
    special in filter syntax (``:``, ``'``, ``,``, ``[``, ``]`` and space) are
    backslash-escaped, and the result is wrapped in single quotes.
    """
    escapes = {
        ':': r'\:',
        "'": r"\\\'",
        ',': r'\\,',
        '[': r'\\[',
        ']': r'\\]',
        ' ': r'\ ',
    }
    posix_path = Path(path).as_posix()
    escaped = ''.join(escapes.get(ch, ch) for ch in posix_path)
    return "'" + escaped + "'"
|
|
|
|
|
|
def _build_slideshow_filter(durations):
    """Return the filter graph that fades each still in and out and concatenates them."""
    chains = []
    labels = []
    for i, duration in enumerate(durations):
        # Fade length is about 1/6 of the panel, at least 0.2 s, and the fade
        # in never takes more than half of the panel.
        anim_duration = max(duration * 0.166, 0.2)
        fade_in = min(anim_duration, duration * 0.5)
        fade_out = min(anim_duration, duration - fade_in)
        chains.append(
            f"[{i}:v]loop=loop=-1:size=1,trim=duration={duration}[base{i}];"
            f"[base{i}]format=yuva420p,"
            f"fade=in:st=0:d={fade_in}:alpha=1,"
            f"fade=out:st={duration-fade_out}:d={fade_out}:alpha=1[anim{i}];"
        )
        labels.append(f"[anim{i}]")
    return ''.join(chains) + f"{''.join(labels)}concat=n={len(labels)}:v=1:a=0[outv]"


def create_video(project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
                 audio_file, output_video, audio_durations, panel_start_times, panel_durations):
    """Render the final video with two FFmpeg passes.

    Pass 1 turns every image into a clip of its panel duration with fades,
    concatenates the clips and muxes in ``audio_file``. Pass 2 burns both
    subtitle files into the picture and writes ``output_video``.

    Images and durations are paired positionally; if the lists differ in
    length only the common prefix is rendered, so the number of FFmpeg inputs
    always matches the ``concat`` filter. FFmpeg's stderr is captured so that
    a failure can actually be logged. The intermediate ``temp_video.mp4`` is
    removed whether or not rendering succeeds.

    ``audio_durations`` and ``panel_start_times`` are accepted for interface
    compatibility and are not used.

    Returns:
        ``output_video`` on success, ``None`` on failure (logged).
    """
    temp_video = os.path.join(project_dir, "temp_video.mp4")
    # Capture stderr as text; undecodable bytes must not mask the real error.
    run_options = {
        "check": True,
        "stderr": subprocess.PIPE,
        "encoding": "utf-8",
        "errors": "replace",
    }
    try:
        panels = list(zip(image_paths, panel_durations))
        if not panels:
            logger.error("No panels to render")
            return None
        images = [img for img, _ in panels]
        durations = [duration for _, duration in panels]

        cmd1 = ["ffmpeg", "-y"]
        for img in images:
            cmd1.extend(["-i", img])
        cmd1.extend([
            "-i", audio_file,
            "-filter_complex", _build_slideshow_filter(durations),
            "-map", "[outv]",
            # The audio file is the input that follows all images.
            "-map", f"{len(images)}:a",
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-c:a", "aac", "-strict", "experimental",
            "-vsync", "vfr",
            "-async", "1",
            "-movflags", "+faststart",
            temp_video,
        ])
        subprocess.run(cmd1, **run_options)

        filter_chain = ",".join((
            f"subtitles={process_sub_path(caption_subtitle_file)}",
            f"subtitles={process_sub_path(speech_subtitle_file)}",
        ))
        cmd_combined = [
            "ffmpeg", "-y",
            "-i", temp_video,
            "-vf", filter_chain,
            "-c:a", "copy",
            "-c:v", "libx264",
            "-preset", "fast",
            "-movflags", "+faststart",
            output_video,
        ]
        subprocess.run(cmd_combined, **run_options)

        return output_video
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg failed with cmd: {' '.join(e.cmd)}")
        logger.error(f"FFmpeg stderr: {e.stderr}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return None
    finally:
        if os.path.exists(temp_video):
            os.remove(temp_video)
|
|
|
|
|
|
def upload_to_local_storage(local_path, relative_path):
    """Copy ``local_path`` into the local ``storage`` directory.

    The file is placed at ``storage/<relative_path>``; missing directories
    are created on the way.

    Returns:
        The URL ``<BASE_URL>/storage/<relative_path>`` (with forward slashes),
        or ``None`` if the copy failed (the error and traceback are logged).
    """
    try:
        storage_root = os.path.abspath("storage")
        os.makedirs(storage_root, exist_ok=True)

        destination = os.path.join(storage_root, relative_path)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.copy2(local_path, destination)

        # URLs always use forward slashes, whatever the OS separator is.
        url_path = relative_path.replace(os.sep, '/')
        return f"{BASE_URL}/storage/{url_path}"
    except Exception as e:
        logger.error(f"Error copying to local storage: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
|
|
|
|
@app.post("/api/generate-video")
async def generate_video(comic_data: ComicData):
    """Build a narrated video from comic panels and return its URL.

    Steps: download the panel images, synthesize the audio track, write the
    caption and speech subtitle files, render the video with FFmpeg and copy
    it into local storage. All intermediate files live in ``temp/<project_id>``,
    which is removed when the request ends, successful or not.

    A panel whose image cannot be downloaded is dropped together with its
    caption and speech, so the remaining images, audio and subtitles stay
    aligned.

    Returns:
        ``{"videoUrl": ..., "projectId": ...}``.

    Raises:
        HTTPException: status 500 with a short reason when any step fails.
    """
    import asyncio
    import functools

    project_id = str(uuid.uuid4())
    project_dir = os.path.abspath(os.path.join("temp", project_id))
    os.makedirs(project_dir, exist_ok=True)
    logger.info(f"Created project directory: {project_dir}")

    try:
        image_paths = []
        image_widths = []
        failed_panels = set()
        for i, panel_url in enumerate(comic_data.panels):
            output_path = os.path.join(project_dir, f"panel_{i}.jpg")
            path_result, img_width = await download_image(panel_url, output_path)
            if path_result:
                image_paths.append(path_result)
                image_widths.append(img_width)
            else:
                failed_panels.add(i)
                logger.warning("Skipping panel %d: image download failed", i)

        if not image_paths:
            raise HTTPException(status_code=500, detail="Failed to download images")

        logger.info(f"Downloaded {len(image_paths)} images")

        # Drop the text of panels without an image to keep everything aligned.
        captions = [text for i, text in enumerate(comic_data.captions) if i not in failed_panels]
        speeches = [text for i, text in enumerate(comic_data.speeches) if i not in failed_panels]

        audio_file, audio_durations, panel_start_times, panel_durations = await create_audio_file(
            project_dir, captions, speeches
        )
        if not audio_file:
            raise HTTPException(status_code=500, detail="Failed to create audio file")

        logger.info(f"Created audio file: {audio_file}")

        caption_subtitle_file = create_caption_subtitle_file(
            project_dir, captions, panel_start_times, panel_durations, image_widths
        )
        if not caption_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create caption subtitle file")

        speech_subtitle_file = create_speech_subtitle_file(
            project_dir, speeches, panel_start_times, panel_durations, image_widths
        )
        if not speech_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create speech subtitle file")

        logger.info(f"Created subtitle files: {caption_subtitle_file}, {speech_subtitle_file}")

        output_video = os.path.join(project_dir, "output.mp4")
        # FFmpeg runs for a long time and blocks; keep it off the event loop.
        render = functools.partial(
            create_video,
            project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
            audio_file, output_video, audio_durations, panel_start_times, panel_durations,
        )
        result = await asyncio.get_running_loop().run_in_executor(None, render)
        if not result:
            raise HTTPException(status_code=500, detail="Failed to create video")

        logger.info(f"Created video: {output_video}")

        video_url = upload_to_local_storage(output_video, f"{project_id}/video.mp4")
        if not video_url:
            raise HTTPException(status_code=500, detail="Failed to store video")

        return {
            "videoUrl": video_url,
            "projectId": project_id,
        }
    except HTTPException as e:
        # Already carries the right status and detail; re-wrapping it would
        # turn the detail into "500: <detail>".
        logger.error("Error generating video: %s", e.detail)
        raise
    except Exception as e:
        logger.exception("Error generating video: %s", e)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        shutil.rmtree(project_dir, ignore_errors=True)
|
|
|
|
|
|
@app.get("/")
async def health_check():
    """Liveness probe: always answers with a fixed OK payload."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: serve the app with uvicorn on every interface,
    # port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)