Camil Ziane
init space
74b17e0
import argparse
import torch
import os
import json
import random
import numpy as np
from tqdm import tqdm
import shortuuid
from tinyllava.utils import *
from tinyllava.data import *
from tinyllava.model import *
from PIL import Image
import math
def split_list(lst, n):
"""Split a list into n (roughly) equal-sized chunks"""
chunk_size = math.ceil(len(lst) / n) # integer division
return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]
def get_chunk(lst, n, k):
chunks = split_list(lst, n)
return chunks[k]
def parse_multi_choice_response(response, all_choices, index2ans):
"""
Parse the prediction from the generated response.
Return the predicted index e.g., A, B, C, D.
"""
for char in [",", ".", "!", "?", ";", ":", "'"]:
response = response.strip(char)
response = " " + response + " " # add space to avoid partial match
index_ans = True
ans_with_brack = False
candidates = []
for choice in all_choices: # e.g., (A) (B) (C) (D)
if f"({choice})" in response:
candidates.append(choice)
ans_with_brack = True
if len(candidates) == 0:
for choice in all_choices: # e.g., A B C D
if f" {choice} " in response:
candidates.append(choice)
# if all above doesn't get candidates, check if the content is larger than 5 tokens and try to parse the example
if len(candidates) == 0 and len(response.split()) > 5:
for index, ans in index2ans.items():
if ans.lower() in response.lower():
candidates.append(index)
index_ans = False # it's content ans.
if len(candidates) == 0: # still not get answer, randomly choose one.
pred_index = random.choice(all_choices)
elif len(candidates) > 1:
start_indexes = []
if index_ans:
if ans_with_brack:
for can in candidates:
index = response.rfind(f"({can})")
start_indexes.append(index) # -1 will be ignored anyway
# start_indexes = [generated_response.index(f'({can})') for can in candidates]
else:
for can in candidates:
index = response.rfind(f" {can} ")
start_indexes.append(index)
else:
for can in candidates:
index = response.lower().rfind(index2ans[can].lower())
start_indexes.append(index)
# get the last one
pred_index = candidates[np.argmax(start_indexes)]
else: # if only one candidate, use it.
pred_index = candidates[0]
return pred_index
def eval_model(args):
# Model
disable_torch_init()
model_path = os.path.expanduser(args.model_path)
model, tokenizer, image_processor, context_len = load_pretrained_model(model_path)
text_processor = TextPreprocess(tokenizer, args.conv_mode)
data_args = model.config
image_processor = ImagePreprocess(image_processor, data_args)
questions = json.load(open(os.path.expanduser(args.question_file), "r"))
questions = get_chunk(questions, args.num_chunks, args.chunk_idx)
answers_file = os.path.expanduser(args.answers_file)
os.makedirs(os.path.dirname(answers_file), exist_ok=True)
ans_file = open(answers_file, "w")
model.to(device="cuda")
for i, line in enumerate(tqdm(questions)):
idx = line["id"]
question = line["prompt"]
if "image" in line:
image_file = line["image"]
# image = Image.open(image_file).convert("RGB")
image = Image.open(os.path.join(args.image_folder, image_file)).convert("RGB")
image_sizes = [image.size]
image = image_processor(image)
images = image.unsqueeze(0).half().cuda()
question = "<image>" + "\n" + question
else:
images = None
image_sizes = None
msg = Message()
msg.add_message(question)
# print(msg.messages)
result = text_processor(msg.messages, mode='eval')
# print(result["prompt"])
input_ids = result['input_ids']
input_ids = input_ids.unsqueeze(0).cuda()
with torch.inference_mode():
if images is not None:
output_ids = model.generate(
input_ids,
images=images,
image_sizes=image_sizes,
do_sample=True if args.temperature > 0 else False,
temperature=args.temperature,
max_new_tokens=1024,
use_cache=True,
pad_token_id=tokenizer.pad_token_id,
)
outputs = tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0]
else:
if line["question_type"] == "multiple-choice":
all_choices = line["all_choices"]
outputs = random.choice(all_choices)
else:
outputs = "INVALID GENERATION FOR MULTIPLE IMAGE INPUTS"
if line["question_type"] == "multiple-choice":
pred_ans = parse_multi_choice_response(
outputs, line["all_choices"], line["index2ans"]
)
else: # open question
pred_ans = outputs
# print(outputs, pred_ans)
ans_id = shortuuid.uuid()
ans_file.write(json.dumps({"question_id": idx,
"prompt": questions,
"text": pred_ans,
"answer_id": ans_id,
"model_id": args.model_path.split("/")[-1],
"metadata": {}}) + "\n")
ans_file.flush()
ans_file.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--model-path", type=str, default="facebook/opt-350m")
parser.add_argument("--model-base", type=str, default=None)
parser.add_argument("--image-folder", type=str, default="")
parser.add_argument("--question-file", type=str, default="tables/question.json")
parser.add_argument("--answers-file", type=str, default="answer.jsonl")
parser.add_argument("--conv-mode", type=str, default="llama")
parser.add_argument("--num-chunks", type=int, default=1)
parser.add_argument("--chunk-idx", type=int, default=0)
parser.add_argument("--temperature", type=float, default=0.2)
parser.add_argument("--answer-prompter", action="store_true")
parser.add_argument("--image_aspect_ratio", type=str, default="pad")
args = parser.parse_args()
eval_model(args)