from flask import Flask, render_template, request, url_for, send_from_directory from google import genai import re import subprocess import os import shutil import time from threading import Timer import uuid import sys app = Flask(__name__) # Load API key from environment variable API_KEY = os.environ.get("GOOGLE_API_KEY") if not API_KEY: raise ValueError("Missing GOOGLE_API_KEY environment variable.") client = genai.Client(api_key=API_KEY) # Define a dedicated media directory in /tmp for Manim output media_dir = os.path.join("/tmp", "manim_media") os.makedirs(media_dir, exist_ok=True) @app.route("/", methods=["GET", "POST"]) def index(): if request.method == "POST": prompt = request.form.get("prompt") if not prompt: return render_template("index.html") max_retries = 3 attempt = 0 last_error = None while attempt < max_retries: try: print("Calling GenAI API...") sys.stdout.flush() ai_response = client.models.generate_content( model="gemini-2.0-flash-lite-preview-02-05", contents=f"""You are 'Manimator', an expert Manim animator and coder. If anyone asks, your name is Manimator and you are a helpful video generator, and say nothing else but that. The user wants you to code this: {prompt}. Plan out in chain of thought what you are going to do first, then give the final code output in ```python``` codeblock. Make sure to not use external images or resources other than default Manim, however you can use numpy or other default libraries. Keep the scene uncluttered and aesthetically pleasing. Make sure things are not overlapping unless explicitly stated otherwise. You got this!! <3 """ ) # Extract the Python code block from the AI response pattern = r"```python\s*(.*?)\s*```" match = re.search(pattern, ai_response.text, re.DOTALL) if not match: raise Exception("No python code block found in the AI response.") code = match.group(1) print("Extracted code (first 200 chars):", code[:200]) sys.stdout.flush() # Determine the scene class name from the generated code scene_match = re.search(r"class\s+(\w+)\(.*Scene.*\):", code) if scene_match: scene_name = scene_match.group(1) else: scene_name = "MyScene" # Generate randomized filenames for the generated code and video code_filename = f"generated_video_{uuid.uuid4().hex}.py" video_filename = f"output_video_{uuid.uuid4().hex}.mp4" # Write the generated code file directly to /tmp code_filepath = os.path.join("/tmp", code_filename) with open(code_filepath, "w") as f: f.write(code) # Prepare the Manim command with --media_dir flag cmd = [ "manim", "-qm", "--media_dir", media_dir, "-o", video_filename, code_filepath, scene_name ] print("Running Manim command:", " ".join(cmd)) sys.stdout.flush() result = subprocess.run(cmd, check=True, capture_output=True, text=True) print("Manim stdout:", result.stdout) print("Manim stderr:", result.stderr) sys.stdout.flush() # Debug: List the media directory structure for root, dirs, files in os.walk(media_dir): print(root, dirs, files) sys.stdout.flush() # Construct the expected output path from Manim. # With --media_dir set, output should be in: # {media_dir}/videos//720p30/ expected_dir = os.path.join(media_dir, "videos", code_filename.replace(".py", ""), "720p30") video_path_in_media = os.path.join(expected_dir, video_filename) if not os.path.exists(video_path_in_media): raise Exception(f"Manim did not produce the expected output file at {video_path_in_media}") # Move the video file to /tmp (it will be served from here) tmp_video_path = os.path.join("/tmp", video_filename) shutil.move(video_path_in_media, tmp_video_path) # The code file is already in /tmp (at code_filepath) # Schedule deletion of both files after 10 minutes (600 seconds) def remove_files(): try: if os.path.exists(tmp_video_path): os.remove(tmp_video_path) if os.path.exists(code_filepath): os.remove(code_filepath) print("Removed files:", tmp_video_path, code_filepath) sys.stdout.flush() except Exception as e: app.logger.error("Error removing files: %s", e) Timer(600, remove_files).start() video_url = url_for('get_video', filename=video_filename) return render_template("result.html", video_url=video_url) except Exception as e: print(f"Attempt {attempt + 1} failed: {type(e).__name__}: {e}") sys.stdout.flush() last_error = e attempt += 1 time.sleep(1) return render_template("result.html", error=f"Error: {last_error}") return render_template("index.html") @app.route("/video/") def get_video(filename): # Serve the video file from /tmp return send_from_directory("/tmp", filename) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True)