import os import asyncio import numpy as np import gradio as gr from PIL import Image from io import BytesIO from moviepy import VideoFileClip import matplotlib.pyplot as plt import base64 from TStar.TStarFramework import run_tstar def img2base64(image_path): return base64.b64encode(open(image_path, "rb").read()).decode("utf-8") def create_timeline(frame_times, duration): """ Creates a timeline visualization for the sampled frames. """ fig, ax = plt.subplots(figsize=(10, 2)) ax.set_xlim(0, duration) ax.hlines(0.5, 0, duration, colors="gray", linestyles="dotted") ax.plot(frame_times, [0.5] * len(frame_times), 'ro') ax.set_xlabel("Time (s)") buf = BytesIO() fig.savefig(buf, format='png') buf.seek(0) plt.close(fig) return Image.open(buf) def analyze_and_sample_frames( video_file, question, openai_api_key, num_frames=8, batch=1, total_batches=3 ): """ 结合后端 run_tstar 函数,对视频进行关键帧搜索,并在前端生成可视化所需的结果: - metadata: 记录后端结果与问题、答案等信息 - frames: PIL 图像列表,用于在 Gradio Gallery 显示 - frame_times: 关键帧时间戳列表(秒) - timeline_image: 带有关键帧标注的时间线图像 """ if not os.path.exists(video_file): print("video_file does not exist:", video_file) return None, None, None, None if not question: question = "No question provided" options = "Freeform Question" # 你也可以根据 batch / total_batches 动态改变 search 的参数 # 例如 batch 越大,search_budget 越大;或者直接固定即可 # 这里只做演示,不做复杂逻辑 results = run_tstar( video_path=video_file, question=question, options=options, grounder="gpt-4o", heuristic="owl-vit", device="cuda:0", search_nframes=num_frames, grid_rows=4, grid_cols=4, confidence_threshold=0.6, search_budget=0.5, output_dir='./output', openai_api_key=openai_api_key ) # 从后端结果解析关键信息 frame_times = results.get("Frame Timestamps", []) answer = results.get("Answer", "No answer") grounding_objects = results.get("Grounding Objects", []) # 截取关键帧图像 frames = [] clip = VideoFileClip(video_file) video_duration = clip.duration for t in frame_times: # 确保时间戳不超过视频长度 if t > video_duration: t = video_duration frame_img = clip.get_frame(t) # 取对应秒的帧,返回 (H,W,3) numpy frame_pil = Image.fromarray(frame_img.astype(np.uint8)) frames.append(frame_pil) clip.close() # 生成时间线图像 timeline_image = create_timeline(frame_times, duration=video_duration) # 生成元数据(可根据需要增减字段) metadata = { "batch": batch, "total_batches": total_batches, "question": question, "answer": answer, "grounding_objects": grounding_objects, "frame_times": frame_times } return metadata, frames, frame_times, timeline_image def switch_batch(state_batches, selected_batch): """ Switches the display to the selected batch. """ if not selected_batch or selected_batch == "": return None, None, None, None batch_index = int(selected_batch.split()[-1]) - 1 timeline_image, frames, metadata = state_batches[batch_index] return ( gr.update(value=timeline_image, visible=True), gr.update(value=frames, visible=True), gr.update(value=metadata, visible=True), selected_batch, ) async def process_video_iteratively_with_state(video_file, question_input, openai_api_key_input, state_batches, current_display_batch, total_batches=1, num_frames=8): """ Processes the video and samples frames iteratively. """ if not video_file: yield None, None, None, "No video uploaded!", None, state_batches, current_display_batch return metadata = None for batch in range(1, total_batches + 1): metadata, frames, frame_times, timeline_image = analyze_and_sample_frames( video_file, question=question_input, openai_api_key=openai_api_key_input, num_frames=num_frames, batch=batch, total_batches=total_batches ) if metadata is None: continue state_batches.append((timeline_image, frames, metadata)) batch_choices = [f"Batch {i + 1}" for i in range(len(state_batches))] if current_display_batch is None or current_display_batch == f"Batch {batch - 1}": current_display_batch = f"Batch {batch}" yield ( gr.update(value=timeline_image, visible=True), gr.update(value=frames, visible=True), gr.update(value=metadata, visible=True), f"Processing Batch: {batch} / Total Batches: {total_batches}", gr.update(choices=batch_choices, value=f"Batch {batch}", visible=True), state_batches, current_display_batch, ) await asyncio.sleep(0.5) def generate_header(base64_logo, title="⭐ T - Efficient Long Video QA Tool"): """ Generates the header section for the app. """ return f""" <h1 style="text-align: center; font-size: 3em; color: #4CAF50; font-family: 'Open Sans', sans-serif; margin-bottom: 20px;">{title}</h1> <div style="display: flex; justify-content: center; align-items: center; height: 333px;"> <img src="data:image/png;base64,{base64_logo}" alt="Logo" style="width: auto; height: 300px; border-radius: 10px; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);"> </div> <div style="display: flex; justify-content: center; align-items: center; margin-top: 20px;"> <h2 style="text-align: center; font-size: 2em; color: #333; margin-bottom: 30px;">📖 How to Use?</h2> </div> """ def generate_instruction(step, title, description): """ Generates a single instruction card. """ return f""" <div style="background-color: #F9F9F9; padding: 20px; border-radius: 10px; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); height: 150px; display: flex; flex-direction: column; justify-content: flex-start;"> <h3 style="font-size: 1.5em; color: #4CAF50; font-family: 'Open Sans', sans-serif; margin-bottom: 10px;">Step {step}: {title}</h3> <p style="font-size: 1em; color: #666; line-height: 1.5; margin: 0;"> {description} </p> </div> """ def create_ui_components(default_video_path): """ Creates the UI components for the Gradio application. """ # Layout in two columns with gr.Row(equal_height=True, elem_id="video-container"): with gr.Column(scale=1, min_width=300): # Left Column: Video Upload gr.Markdown(""" <br> <h2 style="color: #333;">Upload Your Video</h3> <p style="color: #666; font-size: 0.9em;">You can upload a sample video or provide your own video for analysis.</p> """) video_input = gr.File( label="Select Video", type="filepath", value=default_video_path, interactive=True ) with gr.Column(scale=1.5, min_width=400): # Right Column: Video Preview gr.Markdown(""" <br> <h2 style="color: #333;">Video Preview</h3> <p style="color: #666; font-size: 0.9em;">View your video here before starting the analysis.</p> """) video_preview = gr.Video( label="Preview", value=default_video_path, visible=True, autoplay=True, loop=True, ) # add a textbox to input question openai_api_key_input = gr.Textbox( label="Provide your OpenAI API Key", placeholder="sk-...", value="sk-...", type="text", elem_id="openai-api-key-input", ) question_input = gr.Textbox( label="Ask a Question", placeholder="", value="Where's the microwave? A. Under the cabinet B. On top of the refrigerator C. Next to the stove D. Beside the sink E. In the pantry", type="text", elem_id="question-input", ) submit_button = gr.Button( "Analyze!", elem_id="analyze-button", ) # Add a new component for displaying the video preview state_batches = gr.State([]) # Stores all generated batch data current_display_batch = gr.State(None) # Tracks the currently displayed batch output_timeline = gr.Image(label="Video Timeline", type="pil", visible=False) output_frames = gr.Gallery(label="Sampled Frames", columns=8, visible=False, height=200) batch_status = gr.Text(label="Batch Status", value="No Batch Processed Yet", visible=True) batch_selector = gr.Dropdown(choices=[], label="Select Batch", visible=False) output_metadata = gr.JSON(label="Video Metadata", visible=False) return ( openai_api_key_input, video_input, question_input, submit_button, video_preview, # Add the video preview component state_batches, current_display_batch, output_timeline, output_frames, batch_status, batch_selector, output_metadata, ) def update_video_preview(video_file, default_video_path): return gr.update(value=(video_file.name if video_file else default_video_path), visible=True, autoplay=True, loop=True) if __name__ == "__main__": # Default sample video path sample_video_path = "data/sample.mp4" logo_path = "data/logo.png" base64_logo = img2base64(logo_path) with gr.Blocks() as demo: # Add header gr.Markdown(generate_header(base64_logo)) # Add instructions steps = [ ("Upload", "Sample video is provided. You can also upload your own!<br>Click <strong>Video Preview</strong> to preview it."), ("Analyze", "Ask a question and click <strong>'Analyze'</strong>.<br>The system will track keyframes to answer your question."), ("Visualize", "View keyframes with their sample distribution.<br>Explore keyframe tracking dynamics visually!"), ] with gr.Row(equal_height=True, elem_id="instructions-container"): for i, (title, description) in enumerate(steps, start=1): with gr.Column(scale=1, min_width=100): gr.Markdown(generate_instruction(i, title, description)) ( openai_api_key_input, video_input, question_input, submit_button, video_preview, # Video preview component state_batches, current_display_batch, output_timeline, output_frames, batch_status, batch_selector, output_metadata, ) = create_ui_components(sample_video_path) video_input.change( fn=update_video_preview, inputs=[video_input, gr.State(sample_video_path)], outputs=video_preview, ) submit_button.click( fn=process_video_iteratively_with_state, inputs=[video_input, question_input, openai_api_key_input, state_batches, current_display_batch], outputs=[ output_timeline, output_frames, output_metadata, batch_status, batch_selector, state_batches, current_display_batch, ], ) batch_selector.change( fn=switch_batch, inputs=[state_batches, batch_selector], outputs=[output_timeline, output_frames, output_metadata, current_display_batch], ) # Launch Gradio application demo.launch(share=True, server_name="0.0.0.0", server_port=8088)