import os

import av
import gradio as gr
import numpy as np
import spaces
import torch
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
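
# `spaces` is the Hugging Face Spaces helper package; the @spaces.GPU decorator used
# below requests a ZeroGPU allocation for each decorated call. Outside of Spaces the
# decorator can simply be removed.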

# Load the model in 4-bit so the 7B checkpoint fits on a single modest GPU.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)
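# Optionally, BitsAndBytesConfig also accepts bnb_4bit_quant_type="nf4" and
# bnb_4bit_use_double_quant=True, which typically trim memory a bit further
# (not enabled here; left as a hint).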

model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'

processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map='auto',
)


# Frame decoding is CPU-only, so this helper does not need a @spaces.GPU allocation.
def read_video_pyav(container, indices):
    '''
    Decode the video with the PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])
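
# Illustrative usage (assuming a local "sample.mp4"; not part of the app flow):
#   with av.open("sample.mp4") as container:
#       n = container.streams.video[0].frames
#       clip = read_video_pyav(container, np.arange(0, n, n / 8).astype(int))
#   clip.shape  # -> (8, height, width, 3)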


@spaces.GPU
def process_video(video_file, question):
    # Sample 8 frames spread uniformly across the video.
    with av.open(video_file.name) as container:
        total_frames = container.streams.video[0].frames
        indices = np.arange(0, total_frames, total_frames / 8).astype(int)
        video_clip = read_video_pyav(container, indices)

    # Build the multimodal chat prompt expected by LLaVA-NeXT-Video.
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)

    inputs = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device)

    # Greedy decoding; sampling parameters such as top_p only apply when do_sample=True.
    generate_kwargs = {"max_new_tokens": 500, "do_sample": False}
    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]

    # The decoded text still contains the prompt; keep only the assistant's reply.
    return generated_text.split("ASSISTANT: ", 1)[-1].strip()


@spaces.GPU
def process_videos(video_files, question):
    """Processes multiple videos and answers a single question for each."""
    answers = []
    for video_file in video_files:
        video_name = os.path.basename(video_file.name)
        answer = process_video(video_file, question)
        answers.append(f"**Video: {video_name}**\n{answer}\n")
    return "\n---\n".join(answers)


def gradio_interface(videos, question):
    return process_videos(videos, question)


iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.File(label="Upload Videos", file_count="multiple"),
        gr.Textbox(label="Enter Your Question"),
    ],
    outputs=gr.Textbox(label="Generated Answers"),
    title="Video Question Answering",
    description="Upload multiple videos and ask a single question to receive answers tailored to each video.",
)
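
# Optional (untested sketch): on Spaces, request queuing helps when several users submit
# videos at once; gr.Interface supports it via `iface.queue()` before launching.
# iface = iface.queue()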


if __name__ == "__main__":
    iface.launch(debug=True)