Spaces:
Sleeping
Sleeping
File size: 2,828 Bytes
ecb85c3 a6e0554 ecb85c3 a6e0554 ecb85c3 a6e0554 ecb85c3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch
import numpy as np
import av
import spaces
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16
)
model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'
processor = LlavaNextVideoProcessor.from_pretrained(model_name)
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
model_name,
quantization_config=quantization_config,
device_map='auto'
)
@spaces.GPU
def read_video_pyav(container, indices):
'''
Decode the video with PyAV decoder.
Args:
container (av.container.input.InputContainer): PyAV container.
indices (List[int]): List of frame indices to decode.
Returns:
np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
'''
frames = []
container.seek(0)
start_index = indices[0]
end_index = indices[-1]
for i, frame in enumerate(container.decode(video=0)):
if i > end_index:
break
if i >= start_index and i in indices:
frames.append(frame)
return np.stack([x.to_ndarray(format="rgb24") for x in frames])
@spaces.GPU
def process_video(video_file, question):
# Open video and sample frames
with av.open(video_file) as container:
total_frames = container.streams.video[0].frames
indices = np.arange(0, total_frames, total_frames / 8).astype(int)
video_clip = read_video_pyav(container, indices)
# Prepare conversation
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": f"{question}"},
{"type": "video"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
# Prepare inputs for the model
input = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device)
# Generate output
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}
output = model.generate(**input, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]
return generated_text.split("ASSISTANT: ", 1)[-1].strip()
# Define Gradio interface
def gradio_interface(video, question):
return process_video(video, question)
iface = gr.Interface(
fn=gradio_interface,
inputs=[
gr.Video(label="Upload Video"),
gr.Textbox(label="Enter Question")
],
outputs=gr.Textbox(label="Generated Answer"),
title="Video Question Answering",
description="Upload a video and enter a question to get a generated text response."
)
if __name__ == "__main__":
iface.launch(debug=True)
|