import gradio as gr from transformers import LlavaOnevisionProcessor, LlavaOnevisionForConditionalGeneration, TextIteratorStreamer from threading import Thread import re import time from PIL import Image import torch import cv2 import spaces model_id = "llava-hf/llava-onevision-qwen2-0.5b-ov-hf" processor = LlavaOnevisionProcessor.from_pretrained(model_id) model = LlavaOnevisionForConditionalGeneration.from_pretrained(model_id, torch_dtype=torch.float16) model.to("cuda") def sample_frames(video_file, num_frames): video = cv2.VideoCapture(video_file) total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) interval = total_frames // num_frames frames = [] for i in range(total_frames): ret, frame = video.read() pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) if not ret: continue if i % interval == 0: frames.append(pil_img) video.release() return frames def bot_streaming(message, history): txt = message["text"] ext_buffer = f"USER: {txt} ASSISTANT: " if message["files"]: if len(message["files"]) == 1: image = [message.files[0].path] # interleaved images or video elif len(message["files"]) > 1: image = [msg["path"] for msg in message["files"]] else: def has_file_data(lst): return any(isinstance(item, FileData) for sublist in lst if isinstance(sublist, tuple) for item in sublist) def extract_paths(lst): return [item["path"] for sublist in lst if isinstance(sublist, tuple) for item in sublist if isinstance(item, FileData)] latest_text_only_index = -1 for i, item in enumerate(history): if all(isinstance(sub_item, str) for sub_item in item): latest_text_only_index = i image = [path for i, item in enumerate(history) if i < latest_text_only_index and has_file_data(item) for path in extract_paths(item)] if message["files"] is None: gr.Error("You need to upload an image or video for LLaVA to work.") video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg") image_extensions = Image.registered_extensions() image_extensions = tuple([ex for ex, f in image_extensions.items()]) image_list = [] video_list = [] print("media", image) if len(image) == 1: if image[0].endswith(video_extensions): video_list = sample_frames(image[0], 12) prompt = f"USER: