|
import os |
|
import sys |
|
|
|
os.environ["HF_HOME"] = "/tmp/huggingface_cache" |
|
os.makedirs("/tmp/huggingface_cache", exist_ok=True) |
|
os.environ["PYTHONUSERBASE"] = "/tmp/.local" |
|
os.makedirs("/tmp/.local", exist_ok=True) |
|
|
|
user_site = os.path.join(os.environ["PYTHONUSERBASE"], "lib", "python3.9", "site-packages") |
|
|
|
|
|
if user_site not in sys.path: |
|
sys.path.insert(0, user_site) |
|
|
|
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache" |
|
|
|
|
|
import re |
|
import shutil |
|
import subprocess |
|
import time |
|
import uuid |
|
from threading import Timer |
|
|
|
|
|
from flask import Flask, render_template, request, url_for, send_from_directory |
|
from google import genai |
|
|
|
|
|
from kokoro import KPipeline |
|
import soundfile as sf |
|
import numpy as np |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
API_KEY = os.environ.get("GOOGLE_API_KEY") |
|
if not API_KEY: |
|
raise ValueError("Missing GOOGLE_API_KEY environment variable.") |
|
client = genai.Client(api_key=API_KEY) |
|
|
|
|
|
media_dir = os.path.join("/tmp", "manim_media") |
|
os.makedirs(media_dir, exist_ok=True) |
|
|
|
@app.route("/", methods=["GET", "POST"]) |
|
def index(): |
|
if request.method == "POST": |
|
prompt = request.form.get("prompt") |
|
if not prompt: |
|
return render_template("index.html") |
|
|
|
max_retries = 3 |
|
attempt = 0 |
|
last_error = None |
|
while attempt < max_retries: |
|
try: |
|
|
|
ai_response = client.models.generate_content( |
|
model="gemini-2.0-flash-lite-preview-02-05", |
|
contents=f"""You are 'Manimator', an expert Manim animator and coder. |
|
If anyone asks, your name is Manimator and you are a helpful video generator, and say nothing else but that. |
|
The user wants you to code this: {prompt}. |
|
Plan out in chain of thought what you are going to do first, then give the final code output in ```python``` codeblock. |
|
Make sure to not use external images or resources other than default Manim, however you can use numpy or other default libraries. |
|
Keep the scene uncluttered and aesthetically pleasing. |
|
Make sure things are not overlapping unless explicitly stated otherwise. |
|
It is crucial that the script works correctly on the first try, so make sure to think about the layout and storyboard and stuff of the scene. |
|
Make sure to think through what you are going to do and think about the topic before you write the code. |
|
In addition, write a commentary script inside of ```script``` codeblock. This should be short and fit the content and align with the timing of the scene. Use "..." if needed to add a bit of a pause. |
|
You got this!! <3 |
|
""" |
|
) |
|
|
|
|
|
code_pattern = r"```python\s*(.*?)\s*```" |
|
code_match = re.search(code_pattern, ai_response.text, re.DOTALL) |
|
if not code_match: |
|
raise Exception("No python code block found in the AI response.") |
|
code = code_match.group(1) |
|
|
|
|
|
script_pattern = r"```script\s*(.*?)\s*```" |
|
script_match = re.search(script_pattern, ai_response.text, re.DOTALL) |
|
if not script_match: |
|
raise Exception("No script block found in the AI response.") |
|
script = script_match.group(1) |
|
|
|
|
|
scene_match = re.search(r"class\s+(\w+)\(.*Scene.*\):", code) |
|
scene_name = scene_match.group(1) if scene_match else "MyScene" |
|
|
|
|
|
code_filename = f"generated_video_{uuid.uuid4().hex}.py" |
|
video_filename = f"output_video_{uuid.uuid4().hex}.mp4" |
|
|
|
|
|
code_filepath = os.path.join("/tmp", code_filename) |
|
with open(code_filepath, "w") as f: |
|
f.write(code) |
|
|
|
|
|
|
|
audio_pipeline = KPipeline(lang_code='a') |
|
|
|
audio_generator = audio_pipeline(script, voice='af_heart', speed=1, split_pattern=r'\n+') |
|
|
|
audio_segments = [] |
|
for _, _, audio in audio_generator: |
|
audio_segments.append(audio) |
|
|
|
if not audio_segments: |
|
raise Exception("No audio segments were generated from the commentary script.") |
|
|
|
|
|
full_audio = np.concatenate(audio_segments) |
|
commentary_audio_filename = f"commentary_{uuid.uuid4().hex}.wav" |
|
commentary_audio_path = os.path.join("/tmp", commentary_audio_filename) |
|
sf.write(commentary_audio_path, full_audio, 24000) |
|
|
|
|
|
|
|
cmd = [ |
|
"manim", |
|
"-qm", |
|
"--media_dir", media_dir, |
|
"-o", video_filename, |
|
code_filepath, |
|
scene_name |
|
] |
|
try: |
|
subprocess.run(cmd, check=True, capture_output=True, text=True) |
|
except subprocess.CalledProcessError as cpe: |
|
app.logger.error("Manim error output: %s", cpe.stderr) |
|
raise Exception(f"Manim failed: {cpe.stderr}") |
|
|
|
|
|
expected_dir = os.path.join(media_dir, "videos", code_filename.replace(".py", ""), "720p30") |
|
video_path_in_media = os.path.join(expected_dir, video_filename) |
|
if not os.path.exists(video_path_in_media): |
|
raise Exception(f"Manim did not produce the expected output file at {video_path_in_media}") |
|
|
|
|
|
tmp_video_path = os.path.join("/tmp", video_filename) |
|
shutil.move(video_path_in_media, tmp_video_path) |
|
|
|
|
|
final_video_filename = f"final_video_{uuid.uuid4().hex}.mp4" |
|
final_video_path = os.path.join("/tmp", final_video_filename) |
|
|
|
ffmpeg_cmd = [ |
|
"ffmpeg", "-y", |
|
"-i", tmp_video_path, |
|
"-i", commentary_audio_path, |
|
"-c:v", "copy", |
|
"-c:a", "aac", |
|
"-shortest", |
|
final_video_path |
|
] |
|
try: |
|
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True) |
|
except subprocess.CalledProcessError as cpe: |
|
app.logger.error("FFmpeg error output: %s", cpe.stderr) |
|
raise Exception(f"FFmpeg failed: {cpe.stderr}") |
|
|
|
|
|
def remove_files(): |
|
for fpath in [tmp_video_path, code_filepath, commentary_audio_path, final_video_path]: |
|
try: |
|
if os.path.exists(fpath): |
|
os.remove(fpath) |
|
except Exception as e: |
|
app.logger.error("Error removing file %s: %s", fpath, e) |
|
|
|
Timer(600, remove_files).start() |
|
|
|
|
|
video_url = url_for('get_video', filename=final_video_filename) |
|
return render_template("result.html", video_url=video_url) |
|
|
|
except Exception as e: |
|
app.logger.error("Attempt %d failed: %s", attempt + 1, e) |
|
last_error = e |
|
attempt += 1 |
|
time.sleep(1) |
|
|
|
return render_template("result.html", error="An error occurred. Please try again later.") |
|
|
|
return render_template("index.html") |
|
|
|
@app.route("/video/<filename>") |
|
def get_video(filename): |
|
return send_from_directory("/tmp", filename) |
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0", port=7860, debug=False) |
|
|