Python ONNXRuntime inference example

#3
by happyme531 - opened

I wrote this myself, in case anyone wants to run this model with Python.
Some model parameters are hardcoded and may need to be changed if you want to run the 7B version.

import numpy as np
import onnxruntime as ort
from tokenizers import Tokenizer
import cv2
import tqdm

# --- Run configuration ---
# Directory that holds tokenizer.json and the "onnx" sub-directory.
model_path = "."
onnx_model_path = model_path + "/onnx"
tokenizer = Tokenizer.from_file(model_path + "/tokenizer.json")
# Optional fixed NumPy seed (disabled):
# np.random.seed(0)

# Default task: "t2i" generates an image from text. The opposite mode,
# "it2t", generates text from an image plus text.
mode = "t2i"
prompt = "A stunning princess from kabul in red, white traditional clothing, blue eyes, brown hair"
image = None

# Example settings for the image + text -> text mode:
# image = "./test.png"
# prompt = "仔细描述这张图片."  # i.e. "Describe this image in detail."
# mode = "it2t"

# Sampling temperature. The misspelled name is kept on purpose because the
# generation code further down refers to it under this name.
tempature = 1

# 1. Load the models


def _open_session(file_name):
    """Return an ONNX Runtime session for `file_name` inside `onnx_model_path`."""
    return ort.InferenceSession(f"{onnx_model_path}/{file_name}")


# Language-model embedding combined with the vision encoder used for image
# understanding.
#   in : input_ids        int64[batch_size, sequence_length]
#   in : pixel_values     float32[batch_size, num_images, 3, 384, 384]
#   in : images_seq_mask  boolean[batch_size, sequence_length]
#   in : images_emb_mask  boolean[batch_size, num_images, num_image_tokens]
#                         (576 tokens per image)
#   out: inputs_embeds    float32[batch_size, sequence_length, 2048]
prepare_inputs = _open_session("prepare_inputs_embeds.onnx")

# Language model.
#   in : inputs_embeds                   float32[batch_size, sequence_length, 2048]
#   in : attention_mask                  int64[batch_size, total_sequence_length]
#   in : position_ids                    int64[batch_size, sequence_length]
#   in : past_key_values.<0..23>.key     float32[batch_size, 16, past_sequence_length, 128]
#   in : past_key_values.<0..23>.value   float32[batch_size, 16, past_sequence_length, 128]
#   out: hidden_states                   float32[batch_size, sequence_length, 2048]
#   out: present.<0..23>.key             float32[batch_size, 16, total_sequence_length, 128]
#   out: present.<0..23>.value           float32[batch_size, 16, total_sequence_length, 128]
language_model = _open_session("language_model_q4.onnx")

# Text (LM) head.
#   in : hidden_states  float32[batch_size, sequence_length, 2048]
#   out: logits         float32[batch_size, sequence_length, 102400]
lm_head = _open_session("lm_head.onnx")

# Image-generation head.
#   in : hidden_states  float32[batch_size, sequence_length, 2048]
#   out: logits         float32[batch_size, sequence_length, 16384]
gen_head = _open_session("gen_head.onnx")

# Embedding for generated image tokens.
#   in : image_ids      int64[batch_size, sequence_length]
#   out: inputs_embeds  float32[batch_size, sequence_length, 2048]
gen_img_embeds = _open_session("gen_img_embeds.onnx")

# Embedding for text tokens.
#   in : input_ids      int64[batch_size, sequence_length]
#   out: inputs_embeds  float32[batch_size, sequence_length, 2048]
text_embeds = _open_session("embed_tokens.onnx")

# VQVAE decoder (turns 576 tokens into one 384x384 image).
#   in : generated_tokens  int64[batch_size, sequence_length]
#   out: decoded_image     float32[batch_size, 3, 384, 384]
image_decode = _open_session("image_decode.onnx")

# 2. Build the prompt text
# The tokenizer prepends <|begin▁of▁sentence|> on its own, so it must NOT be
# written into the prompt here.
if mode != "t2i":
    # Image/text -> text: system preamble, one image placeholder, then the question.
    input_str = f"""You are a helpful language and vision assistant. You are able to understand the visual content that the user provides, and assist the user with a variety of tasks using natural language.

<|User|>: <image_placeholder>
{prompt}

<|Assistant|>:"""
else:
    # Text -> image: the assistant turn is opened with the begin-of-image marker.
    input_str = f"""<|User|>: {prompt}

<|Assistant|>:<begin_of_image>"""

# 3. Build the input embeddings

# Expand the single <image_placeholder> into 576 copies (one per image token).
input_str = input_str.replace("<image_placeholder>", "<image_placeholder>" * 576)
# Named `encoding` (not `input`) so the builtin `input` is not shadowed.
encoding = tokenizer.encode(input_str)
input_ids = np.array([encoding.ids], dtype=np.int64)
input_len = len(encoding.ids)
attention_mask = np.array([encoding.attention_mask], dtype=np.int64)
# True at every position holding the image placeholder token (id 100581).
# NOTE(review): the original author asked why <image_placeholder> appears to
# have two ids — confirm 100581 is the one the tokenizer actually emits.
images_seq_mask = input_ids == 100581
# The dtype is explicit because np.arange's default integer type is platform
# dependent (int32 on Windows before NumPy 2.0), while the language model
# declares position_ids as int64.
position_ids = np.expand_dims(np.arange(input_len, dtype=np.int64), axis=0)

# Image preprocessing
if image:
    img = cv2.imread(image)
    if img is None:
        raise ValueError(f"无法读取图片: {image}")
    # OpenCV loads BGR; convert to RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Resize to the target size of 384x384.
    target_size = 384
    img = cv2.resize(img, (target_size, target_size), interpolation=cv2.INTER_LINEAR)
    # Convert to float32 and rescale pixel values to [0, 1] (factor = 1/255).
    img = img.astype(np.float32) * 0.00392156862745098
    # Normalise as (img - image_mean) / image_std with
    # image_mean = image_std = [0.5, 0.5, 0.5] (per the original author, taken
    # from the model's config file).
    img = (img - np.array([0.5, 0.5, 0.5], dtype=np.float32)) / np.array([0.5, 0.5, 0.5], dtype=np.float32)
    # Non-square images could instead be padded with a background colour; that
    # step is skipped here because the image is resized straight to a square.
    # HWC -> CHW, giving [3, 384, 384].
    img = img.transpose(2, 0, 1)
    # Add batch and num_images axes: [1, 1, 3, 384, 384].
    pixel_values = np.expand_dims(np.expand_dims(img, axis=0), axis=1)
    images_emb_mask = np.ones((1, 1, 576), dtype=np.bool_)
else:
    # No image: feed empty tensors.
    # NOTE(review): pixel_values uses batch size 0 while images_emb_mask uses
    # batch size 1 — kept as in the original; confirm the exported graph
    # expects this.
    pixel_values = np.zeros((0, 0, 3, 384, 384), dtype=np.float32)
    images_emb_mask = np.zeros((1, 0, 576), dtype=np.bool_)

# Run the embedding model; its first output is inputs_embeds.
inputs_embeds = prepare_inputs.run(None, {
    "input_ids": input_ids,
    # "attention_mask": attention_mask,
    "images_seq_mask": images_seq_mask,
    "images_emb_mask": images_emb_mask,
    "pixel_values": pixel_values
})[0]

# 4. Autoregressive generation with the language model
num_layers = 24  # the language model is assumed to have 24 layers
# Initialise the KV cache: every layer starts with past key/value tensors of
# sequence length 0.
past_kv = {}
for layer in range(num_layers):
    past_kv[f"past_key_values.{layer}.key"] = np.zeros((1, 16, 0, 128), dtype=np.float32)
    past_kv[f"past_key_values.{layer}.value"] = np.zeros((1, 16, 0, 128), dtype=np.float32)
# Collects the generated tokens (only the conditional branch is generated here).
generated_tokens = []

# Check once, by name, whether the exported graph declares a position_ids
# input. Done outside the loop because it never changes, and by name rather
# than by index so the order of the model's inputs does not matter.
model_have_position_ids = any(
    model_input.name == "position_ids" for model_input in language_model.get_inputs()
)

# Generate up to 576 tokens (the number of tokens that make up one image).
current_pos = position_ids[0, -1]
for _ in tqdm.tqdm(range(576)):
    # Assemble the language-model feed: embeddings, attention mask, optional
    # position ids, and the KV cache.
    lm_inputs = {
        "inputs_embeds": inputs_embeds,
        "attention_mask": attention_mask,
    }
    if model_have_position_ids:
        # Only add the key when the model has this input, instead of feeding a
        # None placeholder to ONNX Runtime.
        lm_inputs["position_ids"] = position_ids
    lm_inputs.update(past_kv)
    # One inference step. After the first step only the newest token's
    # embedding is fed, because earlier tokens are covered by the KV cache.
    lm_outs = language_model.run(None, lm_inputs)
    hidden_states = lm_outs[0]  # first output: hidden_states [1, current_seq_len, 2048]
    # Keep only the hidden state of the last position: [1, 1, 2048].
    hs = hidden_states[:, -1:, :]
    # Outputs after hidden_states are the per-layer present key/value tensors
    # (key, value, key, value, ...); they become the cache for the next step.
    past_kv = {}
    for layer in range(num_layers):
        past_kv[f"past_key_values.{layer}.key"] = lm_outs[1 + 2 * layer]
        past_kv[f"past_key_values.{layer}.value"] = lm_outs[2 + 2 * layer]
    # Project to logits with the image head (t2i) or the text head (it2t).
    logits = (gen_head if mode == "t2i" else lm_head).run(None, {"hidden_states": hs})[0]
    logits = logits[:, -1, :]  # [1, vocab]
    # Temperature sampling (the original author notes greedy decoding must
    # not be used).
    logits = logits / tempature
    # Numerically stable softmax over the single batch row.
    exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))[0]
    probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
    # Renormalise in float64 before drawing one sample from the distribution.
    probs = probs.astype(np.float64)
    probs /= probs.sum()
    next_token = int(np.random.multinomial(1, probs).argmax())
    generated_tokens.append(next_token)
    if next_token == 100001:  # eos
        break
    # Embed the sampled token so it can be fed back in on the next step.
    if mode == "t2i":
        inputs_embeds = gen_img_embeds.run(None, {"image_ids": np.array([[next_token]], dtype=np.int64)})[0]  # [1, 1, 2048]
    else:
        inputs_embeds = text_embeds.run(None, {"input_ids": np.array([[next_token]], dtype=np.int64)})[0]  # [1, 1, 2048]
    # Extend the attention mask with a 1 for the newly generated token.
    new_mask = np.ones((1, 1), dtype=attention_mask.dtype)
    attention_mask = np.concatenate([attention_mask, new_mask], axis=1)
    # Advance the position id by one.
    current_pos += 1
    position_ids = np.array([[current_pos]], dtype=position_ids.dtype)

# 5. Decode the generated tokens into an image or into text
if mode == "t2i":
    # Stack the generated image tokens into a [1, num_tokens] array and run
    # the VQVAE decoder; the first output is the image, [1, 3, 384, 384].
    generated_tokens_array = np.array([generated_tokens], dtype=np.int64)
    decoded_image = image_decode.run(None, {"generated_tokens": generated_tokens_array})[0]
    # Map from [-1, 1] to [0, 255] and clamp.
    decoded_image = np.clip((decoded_image + 1) / 2 * 255, 0, 255)
    # Drop the batch axis and go from CHW to HWC: [384, 384, 3].
    decoded_image = np.squeeze(decoded_image, axis=0)
    decoded_image = np.transpose(decoded_image, (1, 2, 0))
    # Convert to 8-bit explicitly rather than relying on OpenCV converting a
    # float image implicitly when writing the PNG. Rounding (not truncation)
    # keeps pixel values the same as the implicit conversion.
    decoded_image = np.ascontiguousarray(np.rint(decoded_image).astype(np.uint8))
    # cv2.imwrite signals failure through its return value instead of raising,
    # so check it before reporting success.
    if not cv2.imwrite("generated.png", cv2.cvtColor(decoded_image, cv2.COLOR_RGB2BGR)):
        raise OSError("Failed to write generated.png")
    print("(generated.png)")
else:
    # Text mode: turn the generated token ids back into a string.
    decoded_text = tokenizer.decode(generated_tokens)
    print(f"{decoded_text}")

Sign up or log in to comment