# Hugging Face Space: LLaVA-NeXT-Video question answering (running on ZeroGPU)
import os

import av
import gradio as gr
import numpy as np
import torch
import spaces
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
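# Load the weights in 4-bit via bitsandbytes so the 7B checkpoint fits in a
# fraction of its fp16 footprint, with float16 as the compute dtype for the
# dequantized matmuls. (Assumes the bitsandbytes and accelerate packages are
# installed alongside transformers.)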
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map='auto'
)
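# Note: with device_map='auto', accelerate places the layers automatically. On a
# ZeroGPU Space the CUDA device is only attached while a @spaces.GPU-decorated
# function is running (see process_videos below).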
def read_video_pyav(container, indices):
    '''
    Decode the video with the PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
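# Worked example of the uniform sampling used below: for a 240-frame clip,
# np.arange(0, 240, 240 / 8).astype(int) gives [0, 30, 60, 90, 120, 150, 180, 210],
# i.e. 8 evenly spaced frame indices. For lengths not divisible by 8 the
# fractional step is simply truncated by astype(int).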
def process_video(video_file, question):
    # Open the video and sample 8 evenly spaced frames. gr.File may hand us a
    # tempfile wrapper (with a .name attribute) or a plain path string depending
    # on the Gradio version, so accept either.
    video_path = video_file.name if hasattr(video_file, "name") else video_file
    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        indices = np.arange(0, total_frames, total_frames / 8).astype(int)
        video_clip = read_video_pyav(container, indices)

    # Prepare the conversation in the chat format expected by the processor
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)

    # Prepare inputs for the model ("inputs" rather than "input", to avoid
    # shadowing the builtin)
    inputs = processor(text=prompt, videos=video_clip, padding=True, return_tensors="pt").to(model.device)

    # Generate output. do_sample=False means greedy decoding, so sampling
    # parameters such as top_p would be ignored and are omitted here.
    generate_kwargs = {"max_new_tokens": 500, "do_sample": False}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]

    # The decoded text contains the full prompt; keep only the assistant's reply
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()
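# Quick local check, assuming a file "clip.mp4" next to this script (the wrapper
# class is a hypothetical stand-in for the object Gradio passes in):
#
#     class _FakeUpload:
#         name = "clip.mp4"
#     print(process_video(_FakeUpload(), "Is the subject indoors or outdoors?"))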
@spaces.GPU  # request a ZeroGPU slot for the duration of the batch of inferences
def process_videos(video_files, question):
    """Processes multiple videos and answers a single question for each."""
    answers = []
    for video_file in video_files:
        video_name = os.path.basename(getattr(video_file, "name", video_file))
        answer = process_video(video_file, question)
        answers.append(f"**Video: {video_name}**\n{answer}\n")
    return "\n---\n".join(answers)
# Define the Gradio interface for multiple videos
def gradio_interface(videos, indoors_outdoors, standing_sitting, hands_free, interacting_screen):
    # Build one combined question from the checked boxes (the trailing spaces
    # keep the sub-questions from running together)
    question = "Is the subject in the video "
    if indoors_outdoors:
        question += "indoors or outdoors? "
    if standing_sitting:
        question += "standing or sitting? "
    if hands_free:
        question += "hands free or not? "
    if interacting_screen:
        question += "interacting with any screen in the background?"
    answers = process_videos(videos, question)
    return answers
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.File(label="Upload Videos", file_count="multiple"),
        gr.Checkbox(label="Indoors or Outdoors", value=False),
        gr.Checkbox(label="Standing or Sitting", value=False),
        gr.Checkbox(label="Hands Free or Not", value=False),
        gr.Checkbox(label="Interacting with Screen", value=False),
    ],
    outputs=gr.Textbox(label="Generated Answers"),
    title="Video Question Answering",
    description="Upload multiple videos and select questions to get answers."
)

if __name__ == "__main__":
    iface.launch(debug=True)
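# Running outside of Spaces would need roughly these packages (an assumption;
# the Space itself pins nothing here):
#   pip install transformers accelerate bitsandbytes torch av numpy gradio spaces
# then: python app.py
# Off ZeroGPU hardware the spaces package turns @spaces.GPU into a no-op, so the
# script still runs wherever a CUDA device is available.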