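"""Chat wrappers for OmniLMM-12B and the MiniCPM-V model family.

Each wrapper exposes a `chat(input)` method that takes a dict with a base64-encoded
image under 'image' and a JSON-encoded message list under 'question'. `MiniCPMVChat`
dispatches to the matching wrapper based on the model path.
"""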
import base64
import io
import json
import os

import torch
from PIL import Image
from accelerate import load_checkpoint_and_dispatch, init_empty_weights
from transformers import AutoTokenizer, AutoModel

from omnilmm.utils import disable_torch_init
from omnilmm.model.omnilmm import OmniLMMForCausalLM
from omnilmm.model.utils import build_transform
from omnilmm.train.train_utils import omni_preprocess

DEFAULT_IMAGE_TOKEN = "<image>"
DEFAULT_IMAGE_PATCH_TOKEN = "<im_patch>"
DEFAULT_IM_START_TOKEN = "<im_start>"
DEFAULT_IM_END_TOKEN = "<im_end>"


def init_omni_lmm(model_path):
    torch.backends.cuda.matmul.allow_tf32 = True
    disable_torch_init()
    model_name = os.path.expanduser(model_path)
    print(f'Load omni_lmm model and tokenizer from {model_name}')
    tokenizer = AutoTokenizer.from_pretrained(
        model_name, model_max_length=2048)

    if False:
        # Disabled branch: load with empty weights and dispatch the checkpoint
        # across available GPUs instead of placing the full model on one device.
        with init_empty_weights():
            model = OmniLMMForCausalLM.from_pretrained(
                model_name, tune_clip=True, torch_dtype=torch.bfloat16)
        model = load_checkpoint_and_dispatch(
            model, model_name, dtype=torch.bfloat16, device_map="auto",
            no_split_module_classes=['Eva', 'MistralDecoderLayer', 'ModuleList', 'Resampler'])
    else:
        model = OmniLMMForCausalLM.from_pretrained(
            model_name, tune_clip=True, torch_dtype=torch.bfloat16
        ).to(device='cuda', dtype=torch.bfloat16)

    image_processor = build_transform(
        is_train=False, input_size=model.model.config.image_size, std_mode='OPENAI_CLIP')

    mm_use_im_start_end = getattr(model.config, "mm_use_im_start_end", False)
    assert mm_use_im_start_end

    # Register the image placeholder tokens and record their ids in the vision
    # config so visual features can be spliced into the token sequence.
    tokenizer.add_tokens([DEFAULT_IMAGE_PATCH_TOKEN, DEFAULT_IM_START_TOKEN,
                          DEFAULT_IM_END_TOKEN], special_tokens=True)

    vision_config = model.model.vision_config
    vision_config.im_patch_token = tokenizer.convert_tokens_to_ids(
        [DEFAULT_IMAGE_PATCH_TOKEN])[0]
    vision_config.use_im_start_end = mm_use_im_start_end
    vision_config.im_start_token, vision_config.im_end_token = tokenizer.convert_tokens_to_ids(
        [DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN])
    image_token_len = model.model.config.num_query

    return model, image_processor, image_token_len, tokenizer


def expand_question_into_multimodal(question_text, image_token_len, im_st_token, im_ed_token, im_patch_token):
    # Replace the <image> placeholder with a span of image patch tokens; if the
    # first message has no placeholder, prepend the span instead.
    if '<image>' in question_text[0]['content']:
        question_text[0]['content'] = question_text[0]['content'].replace(
            '<image>', im_st_token + im_patch_token * image_token_len + im_ed_token)
    else:
        question_text[0]['content'] = im_st_token + im_patch_token * \
            image_token_len + im_ed_token + '\n' + question_text[0]['content']
    return question_text


def wrap_question_for_omni_lmm(question, image_token_len, tokenizer):
    question = expand_question_into_multimodal(
        question, image_token_len, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN, DEFAULT_IMAGE_PATCH_TOKEN)

    conversation = question
    data_dict = omni_preprocess(sources=[conversation],
                                tokenizer=tokenizer,
                                generation=True)

    data_dict = dict(input_ids=data_dict["input_ids"][0],
                     labels=data_dict["labels"][0])
    return data_dict
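

# Illustration only (not executed): with image_token_len == N, the helpers above
# expand a user message such as "What is in the <image>?" into roughly
#   "What is in the <im_start><im_patch>...<im_patch><im_end>?"   (N <im_patch> tokens)
# and omni_preprocess then tokenizes the expanded conversation into input_ids.
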
class OmniLMM12B:
    def __init__(self, model_path) -> None:
        model, img_processor, image_token_len, tokenizer = init_omni_lmm(model_path)
        self.model = model
        self.image_token_len = image_token_len
        self.image_transform = img_processor
        self.tokenizer = tokenizer
        self.model.eval()

    def decode(self, image, input_ids):
        with torch.inference_mode():
            output = self.model.generate_vllm(
                input_ids=input_ids.unsqueeze(0).cuda(),
                images=image.unsqueeze(0).half().cuda(),
                temperature=0.6,
                max_new_tokens=1024,
                do_sample=True,
                output_scores=True,
                return_dict_in_generate=True,
                repetition_penalty=1.1,
                top_k=30,
                top_p=0.9,
            )

            response = self.tokenizer.decode(
                output.sequences[0], skip_special_tokens=True)
            response = response.strip()
            return response

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception as e:
            return "Image decode error"

        msgs = json.loads(input['question'])
        input_ids = wrap_question_for_omni_lmm(
            msgs, self.image_token_len, self.tokenizer)['input_ids']
        input_ids = torch.as_tensor(input_ids)

        image = self.image_transform(image)

        out = self.decode(image, input_ids)

        return out


def img2base64(file_name):
    # Read an image file and return its base64-encoded bytes.
    with open(file_name, 'rb') as f:
        encoded_string = base64.b64encode(f.read())
        return encoded_string


class MiniCPMV:
    def __init__(self, model_path) -> None:
        self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True).to(dtype=torch.bfloat16)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        self.model.eval().cuda()

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception as e:
            return "Image decode error"

        msgs = json.loads(input['question'])

        answer, context, _ = self.model.chat(
            image=image,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            sampling=True,
            temperature=0.7
        )
        return answer


class MiniCPMV2_5:
    def __init__(self, model_path) -> None:
        self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True).to(dtype=torch.float16)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        self.model.eval().cuda()

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception as e:
            return "Image decode error"

        msgs = json.loads(input['question'])

        answer = self.model.chat(
            image=image,
            msgs=msgs,
            tokenizer=self.tokenizer,
            sampling=True,
            temperature=0.7
        )
        return answer


class MiniCPMV2_6:
    def __init__(self, model_path, multi_gpus=False) -> None:

        print('torch_version:', torch.__version__)
        if multi_gpus:
            # Split the model across two GPUs: instantiate on the meta device first,
            # then dispatch the checkpoint according to a manually adjusted device map.
            from accelerate import load_checkpoint_and_dispatch, init_empty_weights, infer_auto_device_map
            with init_empty_weights():
                model = AutoModel.from_pretrained(model_path, trust_remote_code=True,
                                                  attn_implementation='sdpa', torch_dtype=torch.bfloat16)

            device_map = infer_auto_device_map(model, max_memory={0: "10GB", 1: "10GB"},
                                               no_split_module_classes=['SiglipVisionTransformer', 'Qwen2DecoderLayer'])
            # Pin lm_head, the vision tower (vpm) and the resampler to the same device
            # as the input embeddings.
            device_id = device_map["llm.model.embed_tokens"]
            device_map["llm.lm_head"] = device_id
            device_map["vpm"] = device_id
            device_map["resampler"] = device_id
            # Move decoder layers 8-16 to the device that holds layer 26 to rebalance memory.
            device_id2 = device_map["llm.model.layers.26"]
            for layer_idx in range(8, 17):
                device_map[f"llm.model.layers.{layer_idx}"] = device_id2
            print(device_map)

            self.model = load_checkpoint_and_dispatch(model, model_path, dtype=torch.bfloat16, device_map=device_map)
            self.model.eval()
        else:
            self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True,
                                                   attn_implementation='sdpa', torch_dtype=torch.bfloat16)
            self.model.eval().cuda()

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def chat(self, input):
        image = None
        if "image" in input and len(input["image"]) > 10:
            try:
                image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
            except Exception as e:
                return "Image decode error"

        msgs = json.loads(input["question"])

        # Normalize message contents: plain strings stay as text, while dict entries of
        # the form {'type': 'text'|'image', 'pairs': ...} become text or PIL images.
        for msg in msgs:
            contents = msg.pop('content')
            if isinstance(contents, str):
                contents = [contents]

            new_cnts = []
            for c in contents:
                if isinstance(c, dict):
                    if c['type'] == 'text':
                        c = c['pairs']
                    elif c['type'] == 'image':
                        c = Image.open(io.BytesIO(base64.b64decode(c["pairs"]))).convert('RGB')
                    else:
                        raise ValueError("content type only supports text and image.")
                new_cnts.append(c)
            msg['content'] = new_cnts
        print(f'msgs: {str(msgs)}')

        answer = self.model.chat(
            image=image,
            msgs=msgs,
            tokenizer=self.tokenizer,
        )
        return answer


class MiniCPMVChat:
    def __init__(self, model_path, multi_gpus=False) -> None:
        # Pick the wrapper class that matches the checkpoint name.
        if '12B' in model_path:
            self.model = OmniLMM12B(model_path)
        elif 'MiniCPM-Llama3-V' in model_path:
            self.model = MiniCPMV2_5(model_path)
        elif 'MiniCPM-V-2_6' in model_path:
            self.model = MiniCPMV2_6(model_path, multi_gpus)
        else:
            self.model = MiniCPMV(model_path)

    def chat(self, input):
        return self.model.chat(input)


if __name__ == '__main__':

    model_path = 'openbmb/OmniLMM-12B'
    chat_model = MiniCPMVChat(model_path)

    im_64 = img2base64('./assets/worldmap_ck.jpg')

    # First round of chat.
    msgs = [{"role": "user", "content": "What is interesting about this image?"}]
    input = {"image": im_64, "question": json.dumps(msgs, ensure_ascii=True)}
    answer = chat_model.chat(input)
    print(msgs[-1]["content"] + '\n', answer)

    # Second round: continue the conversation with the model's previous answer.
    msgs.append({"role": "assistant", "content": answer})
    msgs.append({"role": "user", "content": "Where is China in the image"})
    input = {"image": im_64, "question": json.dumps(msgs, ensure_ascii=True)}
    answer = chat_model.chat(input)
    print(msgs[-1]["content"] + '\n', answer)
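
    # Illustrative sketch (not executed): MiniCPMV2_6.chat also accepts structured
    # message contents, where each item is either a plain string or a dict of the
    # form {'type': 'text', 'pairs': '<text>'} / {'type': 'image', 'pairs': '<base64 image>'}.
    # With a MiniCPM-V-2_6 checkpoint the call would look roughly like this:
    #
    #   msgs = [{"role": "user", "content": [
    #       {"type": "image", "pairs": im_64.decode()},
    #       {"type": "text", "pairs": "Describe this image."},
    #   ]}]
    #   input = {"image": im_64, "question": json.dumps(msgs, ensure_ascii=True)}
    #   answer = chat_model.chat(input)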