import os
import base64
import io
import sqlite3
import torch
import gradio as gr
import pandas as pd
from PIL import Image
import requests
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, AutoModelForCausalLM, AutoTokenizer
from huggingface_hub import hf_hub_download
from datasets import load_dataset
import traceback
from tqdm import tqdm
import zipfile
# Hugging Face repository id and pinned revision used for the moondream2 model.
MOON_DREAM_MODEL_ID = "vikhyatk/moondream2"
MOON_DREAM_REVISION = "2024-08-26"

# Model ids that this file routes through its Qwen2-VL loading/generation path
# (membership in this list is what selects that path).
# NOTE(review): 'OpenGVLab/InternVL2-1B' is not a Qwen repository -- confirm it
# is meant to be handled by the Qwen2-VL code path.
QWEN2_VL_MODELS = [
    "Qwen/Qwen2-VL-7B-Instruct",
    "Qwen/Qwen2-VL-2B-Instruct-GPTQ-Int4",
    "OpenGVLab/InternVL2-1B",
    "Qwen/Qwen2-VL-72B",
]

# Every model id offered for selection: the Qwen2-VL group first, then the rest.
# The order matters because the UI picks its default entry by index.
available_models = QWEN2_VL_MODELS + [
    "microsoft/Phi-3-vision-128k-instruct",
    "vikhyatk/moondream2",
]

# Hugging Face dataset ids offered for selection.
dataset_options = [
    "gokaygokay/panorama_hdr_dataset",
    "OpenGVLab/CRPE",
]

# Text prompts offered for selection; each one is sent to the models verbatim.
text_prompts = [
    "Provide a detailed description of the image contents, including all visible objects, people, activities, and extract any text present within the image using Optical Character Recognition (OCR). Organize the extracted text in a structured table format with columns for original text, its translation into English, and the language it is written in.",
    "Offer a thorough description of all elements within the image, from objects to individuals and their activities. Ensure any legible text seen in the image is extracted using Optical Character Recognition (OCR). Provide an accurate narrative that encapsulates the full content of the image.",
    "Create a four-sentence caption for the image. Start by specifying the style and type, such as painting, photograph, or digital art. In the next sentences, detail the contents and the composition clearly and concisely. Use language suited for prompting a text-to-image model, separating descriptive terms with commas instead of 'or'. Keep the description direct, avoiding interpretive phrases or abstract expressions",
]
# SQLite setup
# def init_db():
# conn = sqlite3.connect('image_outputs.db')
# cursor = conn.cursor()
# cursor.execute('''
# CREATE TABLE IF NOT EXISTS image_outputs (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# image BLOB,
# prompt TEXT,
# output TEXT,
# model_name TEXT
# )
# ''')
# conn.commit()
# conn.close()
def image_to_binary(image_path):
    """Return the full contents of the file at ``image_path`` as bytes.

    Args:
        image_path: Path of the file to read; it is opened in binary mode.

    Returns:
        The complete file contents as a ``bytes`` object.
    """
    with open(image_path, mode='rb') as handle:
        payload = handle.read()
    return payload
# def store_in_db(image_path, prompt, output, model_name):
# conn = sqlite3.connect('image_outputs.db')
# cursor = conn.cursor()
# image_blob = image_to_binary(image_path)
# cursor.execute('''
# INSERT INTO image_outputs (image, prompt, output, model_name)
# VALUES (?, ?, ?, ?)
# ''', (image_blob, prompt, output, model_name))
# conn.commit()
# conn.close()
def encode_image(image):
    """Encode an image as an inline HTML ``<img>`` tag.

    The image is serialized to PNG in memory and embedded as a base64
    ``data:`` URI, so the returned markup can be placed directly into an HTML
    page or table cell without a separate image file.

    Args:
        image: Object exposing ``save(fp, format=...)``, such as a PIL image.

    Returns:
        A string holding an ``<img>`` tag whose ``src`` is the base64-encoded
        PNG data URI.
    """
    img_buffer = io.BytesIO()
    image.save(img_buffer, format="PNG")
    img_str = base64.b64encode(img_buffer.getvalue()).decode("utf-8")
    # Embed the payload; returning an empty string here would silently drop
    # the image from every rendered result.
    return f'<img src="data:image/png;base64,{img_str}"/>'
def load_dataset_images(dataset_name, num_images):
    """Render the first rows of a Hugging Face dataset as an HTML fragment.

    Loads the ``train`` split of ``dataset_name`` and, for each of the first
    ``num_images`` rows that has an ``image`` field, emits a block with a
    heading, the inline image and its width/height (plus the ``hdr`` field
    when the row has one).

    Args:
        dataset_name: Dataset identifier passed to ``load_dataset``.
        num_images: Maximum number of rows to inspect.

    Returns:
        The concatenated HTML blocks, a fixed explanatory message when no row
        yielded an image, or ``None`` when loading or rendering raised (the
        error and traceback are printed in that case).
    """
    try:
        dataset = load_dataset(dataset_name, split='train')
        # Index row by row: slicing a Dataset returns a dict of columns, so
        # iterating over a slice would walk column names instead of rows.
        row_count = min(num_images, len(dataset))
        images = []
        for i in range(row_count):
            item = dataset[i]
            if 'image' not in item:
                continue
            img = item['image']
            encoded_img = encode_image(img)
            metadata = f"Width: {img.width}, Height: {img.height}"
            if 'hdr' in item:
                metadata += f", HDR: {item['hdr']}"
            images.append(
                f"<div><h3>Image {i+1}</h3>{encoded_img}<p>{metadata}</p></div>"
            )
        if not images:
            return "No images could be loaded from this dataset. Please check the dataset structure."
        return "".join(images)
    except Exception as e:
        print(f"Error loading dataset: {e}")
        traceback.print_exc()
        return None
# Holds the moondream2 tokenizer after its first load so it is not re-created
# for every image/prompt pair.
_MOONDREAM_TOKENIZER_CACHE = {}


def _get_moondream_tokenizer():
    """Return the moondream2 tokenizer, loading it only on first use."""
    if "tokenizer" not in _MOONDREAM_TOKENIZER_CACHE:
        _MOONDREAM_TOKENIZER_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(
            MOON_DREAM_MODEL_ID, revision=MOON_DREAM_REVISION
        )
    return _MOONDREAM_TOKENIZER_CACHE["tokenizer"]


def generate_output(model, processor, prompt, image, model_name, device):
    """Generate a text response for one image/prompt pair with one model.

    The call protocol depends on ``model_name``: entries of
    ``QWEN2_VL_MODELS`` use the chat-template + processor path,
    ``microsoft/Phi-3-vision-128k-instruct`` uses its ``<|image_1|>`` prompt
    format, and ``vikhyatk/moondream2`` uses the model's own
    ``encode_image`` / ``answer_question`` methods.

    Args:
        model: Loaded model object matching ``model_name``.
        processor: Processor for the model; unused for moondream2.
        prompt: Instruction text to send with the image.
        image: Image to describe (an object with ``save(fp, format=...)``).
        model_name: Identifier selecting the generation path.
        device: Device the processor outputs are moved to before generation.

    Returns:
        The decoded response text. Failures never raise: any exception, and
        an unrecognized ``model_name``, produce a string starting with
        ``"Error during generation with model"``.
    """
    try:
        if model_name in QWEN2_VL_MODELS:
            # Only this path needs the PNG bytes: the image is serialized and
            # re-opened before being handed to the processor.
            image_bytes = io.BytesIO()
            image.save(image_bytes, format="PNG")
            image_bytes = image_bytes.getvalue()
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image_bytes},
                        {"type": "text", "text": prompt},
                    ]
                }
            ]
            text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            inputs = processor(
                text=[text],
                images=[Image.open(io.BytesIO(image_bytes))],
                padding=True,
                return_tensors="pt",
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            generated_ids = model.generate(**inputs, max_new_tokens=1024)
            # Drop the prompt tokens so only newly generated text is decoded.
            generated_ids_trimmed = [
                out_ids[len(in_ids):]
                for in_ids, out_ids in zip(inputs['input_ids'], generated_ids)
            ]
            return processor.batch_decode(
                generated_ids_trimmed,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=False,
            )[0]
        if model_name == 'microsoft/Phi-3-vision-128k-instruct':
            messages = [{"role": "user", "content": f"<|image_1|>\n{prompt}"}]
            prompt = processor.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            inputs = processor(prompt, [image], return_tensors="pt")
            inputs = {k: v.to(device) for k, v in inputs.items()}
            generate_ids = model.generate(**inputs, eos_token_id=processor.tokenizer.eos_token_id, max_new_tokens=1024)
            # Keep only the tokens produced after the prompt.
            generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
            return processor.batch_decode(generate_ids, skip_special_tokens=True)[0]
        if model_name == 'vikhyatk/moondream2':
            tokenizer = _get_moondream_tokenizer()
            enc_image = model.encode_image(image)
            return model.answer_question(enc_image, prompt, tokenizer)
        # Report unknown models explicitly instead of silently returning None.
        return f"Error during generation with model {model_name}: unsupported model"
    except Exception as e:
        return f"Error during generation with model {model_name}: {e}"
def list_images(directory_path):
    """List the PNG/JPEG files of a directory together with their encoding.

    Args:
        directory_path: Directory whose direct entries are scanned. Only names
            ending in ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive) are
            considered.

    Returns:
        A list of ``{"filename": ..., "image": ...}`` dicts in sorted filename
        order, where ``image`` is the value returned by ``encode_image`` for
        the opened file.
    """
    images = []
    # Sort so the result does not depend on the platform's listing order.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue
        image_path = os.path.join(directory_path, filename)
        # encode_image calls .save() on its argument, so it needs an opened
        # image object rather than the path string.
        with Image.open(image_path) as img:
            encoded_img = encode_image(img)
        images.append({
            "filename": filename,
            "image": encoded_img
        })
    return images
def extract_images_from_zip(zip_file):
    """Load every supported image stored in a ZIP archive.

    Args:
        zip_file: Path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        A list of ``{"filename": ..., "image": ...}`` dicts, one per archive
        member whose name ends in ``.png``, ``.jpg``, ``.jpeg`` or ``.webp``
        (case-insensitive) and that could be opened. ``image`` is the image
        converted to RGB. Members that fail to open are reported on stdout
        and skipped.
    """
    # Matches the formats the UI advertises (png, jpg, jpeg and webp).
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    images = []
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        for file_info in zip_ref.infolist():
            if not file_info.filename.lower().endswith(image_suffixes):
                continue
            try:
                with zip_ref.open(file_info) as file:
                    # convert() returns a new in-memory image, so the result
                    # stays usable after the archive member is closed.
                    img = Image.open(file).convert("RGB")
            except Exception as e:
                print(f"Error opening image {file_info.filename}: {e}")
                continue
            images.append({
                "filename": file_info.filename,
                "image": img
            })
    return images
def _resolve_device(device_map):
    """Translate the UI device choice into a concrete device string."""
    if device_map == "auto":
        return "cuda" if torch.cuda.is_available() else "cpu"
    # An explicit choice ("cpu", "cuda", ...) is honored as given.
    return device_map


def _load_model_and_processor(model_name, device, dtype, trust_remote_code, use_flash_attn):
    """Load one supported model in eval mode and return ``(model, processor)``."""
    if model_name in QWEN2_VL_MODELS:
        model = Qwen2VLForConditionalGeneration.from_pretrained(
            model_name,
            torch_dtype=dtype,
            device_map=device
        ).eval()
        return model, AutoProcessor.from_pretrained(model_name)
    if model_name == 'microsoft/Phi-3-vision-128k-instruct':
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map=device,
            torch_dtype=dtype,
            trust_remote_code=trust_remote_code,
            # The Phi-3-vision model card selects the attention backend via
            # `_attn_implementation`, with 'eager' disabling flash attention.
            _attn_implementation="flash_attention_2" if use_flash_attn else "eager"
        ).eval()
        processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=trust_remote_code)
        return model, processor
    if model_name == 'vikhyatk/moondream2':
        model = AutoModelForCausalLM.from_pretrained(
            MOON_DREAM_MODEL_ID,
            trust_remote_code=True,
            revision=MOON_DREAM_REVISION
        ).eval()
        return model, None  # No processor needed for this model
    raise ValueError(f"Unsupported model: {model_name}")


def _collect_responses(image, prompts, model_names, model_processors, device):
    """Query every loaded model with every prompt and return the row cells."""
    import html

    cells = {}
    for model_name in model_names:
        if model_name not in model_processors:
            continue  # the model failed to load earlier
        model, processor = model_processors[model_name]
        for prompt in prompts:
            try:
                response_text = generate_output(model, processor, prompt, image, model_name, device)
            except Exception as e:
                response_text = f"Error during generation with model {model_name}: {e}"
                traceback.print_exc()
            # The table is rendered with escape=False so the image tag works;
            # model text must therefore be escaped here.
            cells[f"{model_name}_Response_{prompt}"] = html.escape(str(response_text))
    return cells


def run_inference(model_names, dataset_input, num_images_input, prompts, device_map, torch_dtype, trust_remote_code, use_flash_attn, use_zip, zip_file):
    """Run the selected models and prompts over a batch of images.

    Images come from ``zip_file`` when ``use_zip`` is true, otherwise from
    the first ``num_images_input`` rows of the ``train`` split of
    ``dataset_input``. Each image becomes one table row holding the inline
    image and one ``"<model>_Response_<prompt>"`` column per model/prompt
    pair.

    Args:
        model_names: Model ids to load and query (a single id may be given
            as a bare string).
        dataset_input: Hugging Face dataset id, used when ``use_zip`` is false.
        num_images_input: Number of dataset rows to process; clamped to the
            dataset length.
        prompts: Prompt texts (a single prompt may be given as a bare string).
        device_map: ``"auto"`` (CUDA when available, else CPU) or an explicit
            device string.
        torch_dtype: ``"torch.float16"`` selects float16; anything else
            selects float32.
        trust_remote_code: Forwarded when loading Phi-3-vision.
        use_flash_attn: Whether Phi-3-vision should use flash attention.
        use_zip: Read images from ``zip_file`` instead of the dataset.
        zip_file: Uploaded ZIP archive, used when ``use_zip`` is true.

    Returns:
        An HTML table (string) with one row per processed image. Loading and
        generation errors are printed or recorded in the table; they are not
        raised.
    """
    # A single selection may arrive as a bare string; iterating it directly
    # would walk its characters.
    model_names = model_names or []
    if isinstance(model_names, str):
        model_names = [model_names]
    prompts = prompts or []
    if isinstance(prompts, str):
        prompts = [prompts]

    data = []
    torch_dtype_value = torch.float16 if torch_dtype == "torch.float16" else torch.float32
    device_map_value = _resolve_device(device_map)

    model_processors = {}
    for model_name in model_names:
        try:
            model_processors[model_name] = _load_model_and_processor(
                model_name, device_map_value, torch_dtype_value, trust_remote_code, use_flash_attn
            )
        except Exception as e:
            print(f"Error loading model {model_name}: {e}")

    try:
        # Load images from the ZIP file if use_zip is True
        if use_zip:
            images = extract_images_from_zip(zip_file)
            print ("Number of images in zip:" , len(images))
            for img in tqdm(images):
                try:
                    image = img['image']
                    # Reuse the shared encoder unless the entry is already markup.
                    img_data = image if isinstance(image, str) else encode_image(image)
                    row_data = {"Image": img_data}
                    row_data.update(
                        _collect_responses(image, prompts, model_names, model_processors, device_map_value)
                    )
                    data.append(row_data)
                except Exception as e:
                    print(f"Error processing image {img['filename']}: {e}")
                    traceback.print_exc()
        # Load the dataset if use_zip is False
        else:
            dataset = load_dataset(dataset_input, split='train')
            # Only the panorama dataset stores its picture under 'png_image'.
            if dataset_input == "gokaygokay/panorama_hdr_dataset":
                image_column = 'png_image'
            else:
                image_column = 'image'
            # Clamp so a short dataset does not abort the run with IndexError.
            row_count = min(num_images_input, len(dataset))
            for i in tqdm(range(row_count)):
                image = dataset[i][image_column]
                row_data = {"Image": encode_image(image)}
                row_data.update(
                    _collect_responses(image, prompts, model_names, model_processors, device_map_value)
                )
                data.append(row_data)
    except Exception as e:
        print(f"Error loading dataset: {e}")
        traceback.print_exc()
    return pd.DataFrame(data).to_html(escape=False)
# Gradio UI setup
def create_gradio_interface():
    """Build the Gradio Blocks UI, wire its events, and launch the app.

    The UI has a selection tab (image source, models, prompts, run button and
    the HTML results area) and a device-settings tab. Clicking the run button
    passes all widget values to ``run_inference`` and shows the returned HTML.
    The function ends by calling ``demo.launch(debug=True, share=False)``.

    NOTE(review): the nesting of the layout containers below was reconstructed
    from statement order -- confirm it matches the intended layout.
    """
    # Custom CSS passed to gr.Blocks.
    css = """
#output {
height: 500px;
overflow: auto;
}
"""
    with gr.Blocks(css=css) as demo:
        # Title
        gr.Markdown("# VLM-Image-Analysis: A Vision-and-Language Modeling Framework.")
        gr.Markdown("""
- Handle a batch of images from a ZIP file OR
- Processes images from an HF DB
- Compatible with png, jpg, jpeg, and webp formats
- Compatibility with various AI models: Qwen2-VL-7B-Instruct, Qwen2-VL-2B-Instruct-GPTQ-Int4, InternVL2-1B, Qwen2-VL-72B, /Phi-3-vision-128k-instruct and moondream2""")
        # image_path = os.path.abspath("static/image.jpg")
        # gr.Image(value=image_path, label="HF Image", width=300, height=300)
        with gr.Tab("VLM model and Dataset selection"):
            gr.Markdown("### Dataset Selection: HF or from a ZIP file.")
            with gr.Accordion("Advanced Settings", open=True):
                with gr.Row():
                    # with gr.Column():
                    use_zip_input = gr.Checkbox(label="Use ZIP File", value=False)
                    # Default dataset is dataset_options[1] ("OpenGVLab/CRPE").
                    dataset_input = gr.Dropdown(choices=dataset_options, label="Select Dataset", value=dataset_options[1], visible=True)
                    num_images_input = gr.Radio(choices=[1, 5, 20], label="Number of Images", value=5)
                    zip_file_input = gr.File(label="Upload ZIP File of Images", file_types=[".zip"])
            gr.Markdown("### VLM Model Selection")
            with gr.Row():
                with gr.Column():
                    # Index 4 of available_models is 'microsoft/Phi-3-vision-128k-instruct'.
                    models_input = gr.CheckboxGroup(choices=available_models, label="Select Models", value=available_models[4])
                    # Index 2 of text_prompts is the four-sentence caption prompt.
                    prompts_input = gr.CheckboxGroup(choices=text_prompts, label="Select Prompts", value=text_prompts[2])
            submit_btn = gr.Button("Run Inference")
            with gr.Row():
                output_display = gr.HTML(label="Results")
        with gr.Tab("GPU Device Settings"):
            device_map_input = gr.Radio(choices=["auto", "cpu", "cuda"], label="Device Map", value="auto")
            torch_dtype_input = gr.Radio(choices=["torch.float16", "torch.float32"], label="Torch Dtype", value="torch.float16")
            trust_remote_code_input = gr.Checkbox(label="Trust Remote Code", value=True)
            use_flash_attn = gr.Checkbox(label="Use flash-attn 2 (Ampere GPUs or newer.)", value=False)
        # Thin pass-through to run_inference; arguments keep the order of the
        # `inputs` list given to submit_btn.click below.
        def run_inference_wrapper(model_names, dataset_input, num_images_input, prompts, device_map, torch_dtype, trust_remote_code,use_flash_attn, use_zip, zip_file):
            return run_inference(model_names, dataset_input, num_images_input, prompts, device_map, torch_dtype, trust_remote_code,use_flash_attn, use_zip, zip_file)
        # Hide the dataset dropdown while the ZIP option is checked.
        def toggle_dataset_visibility(use_zip):
            return gr.update(visible=not use_zip)
        submit_btn.click(
            fn=run_inference_wrapper,
            inputs=[models_input, dataset_input, num_images_input, prompts_input, device_map_input, torch_dtype_input, trust_remote_code_input,use_flash_attn, use_zip_input, zip_file_input],
            outputs=output_display
        )
        use_zip_input.change(
            fn=toggle_dataset_visibility,
            inputs=use_zip_input,
            outputs=dataset_input
        )
    demo.launch(debug=True, share=False)
# Build and launch the UI only when this file is executed as a script.
if __name__ == "__main__":
    create_gradio_interface()