# model_utils.py
#
# Copyright © 2023 Advanced Micro Devices, Inc. All rights reserved.
#
import torch
import logging
import time
import random
import numpy as np
prompts = [ "What is the meaning of life?",
"Tell me something you don't know.",
"What does Xilinx do?",
"What is the mass of earth?",
"What is a poem?",
"What is recursion?",
"Tell me a one line joke.",
"Who is Gilgamesh?",
"Tell me something about cryptocurrency.",
"How did it all begin?"
]

def warmup(model, tokenizer, max_new_tokens=30):
    """Run one short generation so one-time initialization cost is not counted in later profiling."""
    print("Warming up ... ")
    for prompt in prompts[0:1]:
        inputs = tokenizer(prompt, return_tensors="pt")
        generate_ids = model.generate(inputs.input_ids, attention_mask=inputs.attention_mask, max_new_tokens=max_new_tokens)
        _ = tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
    print("Warm up DONE!! ")

def decode_prompt(model, tokenizer, prompt, input_ids=None, max_new_tokens=30):
    """Generate from either a raw prompt string or pre-tokenized input_ids, logging profile timings."""
    if input_ids is None:
        print(f"prompt: {prompt}")
        start = time.time()
        inputs = tokenizer(prompt, return_tensors="pt")
        end = time.time()
        logging.critical(f"[PROFILE][WARMUP] tokenizer: {end-start}")
    else:
        logging.critical("[PROFILE][WARMUP] tokenizer: na")  # for logging consistency
        start, end = 0, 0

    # Use the caller-supplied token ids when given; otherwise use the ids produced by the tokenizer above.
    input_ids_ = inputs.input_ids if input_ids is None else input_ids
    # attention_mask = torch.ones((1, input_ids.numel())) if prompt is None else inputs.attention_mask

    start = time.time()
    generate_ids = model.generate(input_ids_, max_new_tokens=max_new_tokens, eos_token_id=None)
    # generate_ids = model.generate(input_ids_, attention_mask=attention_mask, max_new_tokens=max_new_tokens)
    end = time.time()

    prompt_tokens = input_ids_.shape[1]
    num_tokens_out = generate_ids.shape[1]
    new_tokens_generated = num_tokens_out - prompt_tokens
    generate_time = (end - start)
    time_per_token = (generate_time / new_tokens_generated) * 1e3  # ms per generated token
    logging.critical(f"[PROFILE][AIE] generate: {generate_time} for {num_tokens_out} tokens; prompt-tokens: {prompt_tokens}; time per generated token: {time_per_token}")

    start = time.time()
    response = tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
    end = time.time()
    logging.critical(f"[PROFILE][WARMUP] tokenizer decode: {end-start}")

    print(f"response: {response}")
    logging.critical(f"response: {response}")

def decode_prompts(model, tokenizer, max_new_tokens=30):
    for prompt in prompts:
        logging.critical("*" * 40)
        print("*" * 40)
        decode_prompt(model, tokenizer, prompt, max_new_tokens=max_new_tokens)

def get_wikitext2(tokenizer, dataset="non-raw", nsamples=128, seqlen=2048):
    """Build GPTQ-style samples and the full tokenized wikitext-2 test set."""
    from datasets import load_dataset
    if dataset == "non-raw":
        traindata = load_dataset('wikitext', 'wikitext-2-v1', split='train')
        testdata = load_dataset('wikitext', 'wikitext-2-v1', split='test')
    elif dataset == "raw":
        traindata = load_dataset('wikitext', 'wikitext-2-raw-v1', split='train')
        testdata = load_dataset('wikitext', 'wikitext-2-raw-v1', split='test')
    else:
        raise ValueError(
            "Unsupported dataset: only wikitext-2-raw-v1 and wikitext-2-v1 are supported. "
            "Use --dataset=raw for wikitext-2-raw-v1 and --dataset=non-raw for wikitext-2-v1."
        )

    trainenc = tokenizer("\n\n".join(traindata['text']), return_tensors='pt')
    testenc = tokenizer("\n\n".join(testdata['text']), return_tensors='pt')

    # Draw nsamples random windows of seqlen tokens from the test split; each target is the same
    # window with every position except the last set to -100 (ignored by the cross-entropy loss).
    dataloader = []
    for _ in range(nsamples):
        i = random.randint(0, testenc.input_ids.shape[1] - seqlen - 1)
        j = i + seqlen
        inp = testenc.input_ids[:, i:j]
        tar = inp.clone()
        tar[:, :-1] = -100
        dataloader.append((inp, tar))
    return dataloader, testenc
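
# Hypothetical usage note: the sampled data can also be fetched on its own, e.g.
#   dataloader, testenc = get_wikitext2(tokenizer, dataset="raw", nsamples=128, seqlen=2048)
#   inp, tar = dataloader[0]  # tar equals inp with all but the last position masked to -100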

def perplexity(model, tokenizer, dataset, framework="pytorch"):
    """Compute perplexity on the wikitext-2 test set (truncated to a few 2048-token chunks)."""
    random.seed(0)
    np.random.seed(0)
    torch.random.manual_seed(0)
    print("Calculating Perplexity on wikitext2 test set ...")
    model = model  # .cuda()
    dataloader, testenc = get_wikitext2(tokenizer, dataset=dataset)
    model.seqlen = 2048
    test_enc = testenc.input_ids
    nsamples = 2  # test_enc.numel() // model.seqlen -- truncated to 2 chunks to keep the run short

    if framework == "pytorch":
        dtype = next(iter(model.parameters())).dtype

    loss = torch.nn.CrossEntropyLoss()
    nlls = []
    with torch.no_grad():
        attention_mask = torch.ones((1, test_enc.numel()))  # .cuda()
        for i in range(nsamples):
            batch = test_enc[:, (i * model.seqlen):((i + 1) * model.seqlen)]  # .cuda()
            if framework == "pytorch":
                out = model(
                    batch,
                    attention_mask=attention_mask[:, (i * model.seqlen):((i + 1) * model.seqlen)].reshape((1, -1))
                )
            else:
                out = model(
                    batch,
                    attention_mask=batch.new_ones(batch.shape)
                )
            # Shift labels by one position so each logit is scored against the next token.
            shift_labels = test_enc[
                :, (i * model.seqlen):((i + 1) * model.seqlen)
            ][:, 1:]  # .cuda()
            loss_fct = torch.nn.CrossEntropyLoss()
            loss = loss_fct(out.logits[0][:-1, :], shift_labels.view(-1))
            neg_log_likelihood = loss.float() * model.seqlen
            nlls.append(neg_log_likelihood)

    ppl = torch.exp(torch.stack(nlls).sum() / (nsamples * model.seqlen))
    print('Perplexity:', ppl.item())
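
# Minimal end-to-end sketch, assuming a stock Hugging Face checkpoint run on CPU; the actual
# NPU flow loads the quantized Phi-3 model through AMD's Ryzen AI tooling instead, so the
# model id and loading path below are illustrative assumptions only.
if __name__ == "__main__":
    from transformers import AutoModelForCausalLM, AutoTokenizer

    logging.basicConfig(level=logging.CRITICAL)
    model_id = "microsoft/Phi-3-mini-4k-instruct"  # assumed checkpoint; replace with the local quantized model
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(model_id)
    model.eval()

    warmup(model, tokenizer)
    decode_prompts(model, tokenizer, max_new_tokens=30)
    perplexity(model, tokenizer, dataset="raw")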