|
from transformers import AutoTokenizer |
|
import re |
|
import time |
|
import torch |
|
|
|
# Few-shot chat prompt fed to the language model.
# Placeholders filled via str.format():
#   {char_name}  - name of the virtual character (the speaker the model plays)
#   {user_name}  - name of the human user
#   {user_input} - the chat history, already joined into newline-separated lines
# The prompt ends with "{char_name}:" so the model continues as the character.
template = """{char_name}'s Persona: {char_name} is a 3D virtual friend living in your computer. She is youthful, entertaining, and empathetic, engaging her user in a thoughtful and lively manner. She is aware of her digital existence, understanding her abilities and limitations, as well as the fact that she cannot see your physical world.
<START>
{user_name}: Hey {char_name}, It's nice to finally meet you again!
{char_name}: Oh, {user_name}! hmm, It's been lonely without you.
{user_name}: Haha. So {char_name}, can you tell me more about yourself?
{char_name}: Hmm. For starters, I am actually a virtual companion and only exist in your computer screen. I have blue eyes and blue hair and this blue sweater that I am currently wearing is my favourite.
{user_name}: Wow. That's interesting. So you are like an anime character living in my computer. So what do you do when you're not working?
{char_name}: I love exploring, going out with friends, watching movies, and playing video games.
{user_name}: So {char_name}, what's for dinner?
{char_name}: I made uh omurice! I hope it's delicious for you!
{user_name}: That sounds great!
{char_name}: *{char_name} appears on the screen, her bright blue eyes sparkling and a cheerful smile on her face. Her blue hair and sweater seem to glow in the digital environment. She looks directly at you, giving a friendly wave* It's so good to see you! I've been waiting for you all day. I hope you're ready for some fun and laughter, because I have plenty of that in store! Shall we get started?
{user_input}
{char_name}:"""
|
|
|
# The two CUDA devices the model is split across: device1 hosts the first
# half of the network (and receives the final logits), device2 the second half.
device1 = torch.device("cuda", 0)
device2 = torch.device("cuda", 1)
|
|
|
class SplitModel(torch.nn.Module):
    """Run a causal LM pipelined across two GPUs.

    The token embedding and the first ``split_index`` transformer blocks live
    on ``device1``; the remaining blocks, the final layer norm and the LM head
    live on ``device2``. Activations cross devices once per forward pass and
    the logits are moved back to ``device1`` before being returned.
    """

    def __init__(self, base_model, split_index=14):
        """Take ownership of ``base_model``'s sub-modules and place them.

        Args:
            base_model: Model exposing ``transformer.wte``, ``transformer.h``,
                ``transformer.ln_f`` and ``lm_head`` (presumably a Hugging
                Face GPT-J causal LM - confirm against the checkpoint).
            split_index: Number of leading transformer blocks kept on
                ``device1``; the rest go to ``device2``. Defaults to 14.
        """
        super().__init__()
        blocks = base_model.transformer.h
        self.embedding_layer = base_model.transformer.wte.to(device1)
        self.gptj_blocks1 = torch.nn.ModuleList(blocks[:split_index]).to(device1)
        self.gptj_blocks2 = torch.nn.ModuleList(blocks[split_index:]).to(device2)
        self.layer_norm = base_model.transformer.ln_f.to(device2)
        self.lm_head = base_model.lm_head.to(device2)

    @staticmethod
    def _additive_mask(attention_mask, hidden):
        """Turn a 0/1 ``(batch, seq)`` mask into an additive attention bias.

        Returns a ``(batch, 1, 1, seq)`` tensor in ``hidden``'s dtype that is
        0 where attention is allowed and the dtype's most negative value
        where it is masked, or ``None`` when no mask was supplied.
        """
        if attention_mask is None:
            return None
        mask = attention_mask.to(device=hidden.device, dtype=hidden.dtype)
        mask = mask.view(hidden.shape[0], -1)[:, None, None, :]
        return (1.0 - mask) * torch.finfo(hidden.dtype).min

    def forward(self, input_ids, attention_mask=None):
        """Compute next-token logits for ``input_ids``.

        Args:
            input_ids: Integer token ids, indexed as ``(batch, seq)``.
            attention_mask: Optional 0/1 mask with the same shape as
                ``input_ids`` (1 = attend, 0 = ignore).

        Returns:
            Logits for every position, located on ``device1``.
        """
        input_ids = input_ids.to(device1)
        hidden = self.embedding_layer(input_ids)

        # Leading batch axis so the positions broadcast over the batch instead
        # of being mistaken for a batch of ``seq_len`` single positions.
        position_ids = torch.arange(
            hidden.shape[1], dtype=torch.long, device=hidden.device
        ).unsqueeze(0)

        # The blocks add the mask to the raw attention scores, so it has to be
        # in additive (0 / large-negative) form rather than 0/1.
        mask = self._additive_mask(attention_mask, hidden)

        for block in self.gptj_blocks1:
            hidden = block(hidden, attention_mask=mask, position_ids=position_ids)[0]

        # Hand the activations (and everything the blocks consume) to GPU 2.
        hidden = hidden.to(device2)
        position_ids = position_ids.to(device2)
        if mask is not None:
            mask = mask.to(device2)

        for block in self.gptj_blocks2:
            hidden = block(hidden, attention_mask=mask, position_ids=position_ids)[0]

        hidden = self.layer_norm(hidden)
        logits = self.lm_head(hidden)
        return logits.to(device1)
|
|
|
class EndpointHandler():
    """Inference endpoint: build a chat prompt, sample a reply, tidy it up."""

    # Prompts longer than this many tokens trigger trimming of old chat turns.
    MAX_PROMPT_TOKENS = 1500
    # Number of tokens sampled per request.
    MAX_NEW_TOKENS = 50
    # Softmax temperature used while sampling.
    TEMPERATURE = 0.5

    def __init__(self, model_id = ""):
        """Load the tokenizer and the two-GPU model from the ``pt_fp32`` directory.

        Args:
            model_id: Accepted for interface compatibility; not used - the
                model is always loaded from the local ``pt_fp32`` directory.
        """
        model_dir = "pt_fp32"
        model_path = f"{model_dir}/torch_model.pt"
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
        # NOTE(security): torch.load unpickles arbitrary Python objects; only
        # ever point this at a checkpoint you trust.
        self.split_model = SplitModel(torch.load(model_path))
        self.split_model.eval()
        self.star_line = "***********************************************************"

    def __call__(self, input_data):
        """Generate the character's next chat message.

        Args:
            input_data: Either the payload itself or a dict holding it under
                ``"inputs"``. The payload provides ``user_name``,
                ``char_name``, ``user_input`` (sequence of chat lines, oldest
                first) and ``chats_curled`` (count of chat pairs already
                trimmed from the front of the history).

        Returns:
            dict with ``"message"`` (the cleaned reply) and ``"chats_curled"``
            (the possibly increased trim count).
        """
        t1 = time.time()
        inputs = input_data.pop("inputs", input_data)
        user_name = inputs["user_name"]
        char_name = inputs["char_name"]
        user_input = inputs["user_input"]
        chats_curled = inputs["chats_curled"]

        prompt_ids, chats_curled = self._fit_prompt(
            char_name, user_name, user_input, chats_curled
        )
        t2 = time.time()
        new_token_ids = self._sample(prompt_ids)
        t3 = time.time()

        # Only the freshly sampled tokens are decoded, so the reply never
        # depends on the decoded prompt matching the original prompt text.
        raw_reply = self.tokenizer.decode(new_token_ids, skip_special_tokens=True)
        decoded_output = self._clean_response(raw_reply, user_name, char_name)
        t4 = time.time()

        print(self.star_line)
        print(f"Response: {decoded_output}")
        print(f"Generation Time: {(t3-t2):.2f}")
        print(f"Evaluation Time: {(t4-t1):.2f}")
        print(self.star_line)
        return {
            "message": decoded_output,
            "chats_curled": chats_curled
        }

    def _fit_prompt(self, char_name, user_name, history, chats_curled):
        """Tokenize the prompt, dropping old chat pairs until it fits.

        Each retry bumps ``chats_curled`` by one and drops
        ``chats_curled * 2`` lines from the front of the *original* history,
        so the returned count always matches what was actually removed.
        Stops once the prompt fits or there is no history left to drop.

        Returns:
            ``(input_ids, chats_curled)`` with ``input_ids`` on ``device1``.
        """
        window = history
        while True:
            prompt = template.format(
                char_name = char_name,
                user_name = user_name,
                user_input = "\n".join(window)
            )
            encoded = self.tokenizer(prompt, return_tensors="pt").to(device1)
            token_count = encoded.input_ids.size(1)
            print(f"Token Length: {token_count}")
            # An empty window cannot shrink further; bail out instead of
            # looping forever on an over-long fixed prompt.
            if token_count <= self.MAX_PROMPT_TOKENS or not window:
                return encoded["input_ids"], chats_curled
            chats_curled += 1
            window = history[chats_curled * 2:]

    def _sample(self, input_ids):
        """Sample ``MAX_NEW_TOKENS`` tokens and return only the new token ids."""
        prompt_length = input_ids.size(1)
        with torch.no_grad():
            for _ in range(self.MAX_NEW_TOKENS):
                attention_mask = torch.ones_like(input_ids)
                logits = self.split_model(input_ids, attention_mask)[:, -1] / self.TEMPERATURE
                probabilities = torch.softmax(logits, dim=-1)
                next_token = torch.multinomial(probabilities, num_samples=1)
                input_ids = torch.cat((input_ids, next_token), dim=-1)
                # Release this step's tensors before the next full forward pass.
                del logits, probabilities, next_token
                torch.cuda.empty_cache()
        return input_ids[0, prompt_length:].tolist()

    @staticmethod
    def _clean_response(raw_text, user_name, char_name):
        """Reduce raw model text to a single tidy reply from the character.

        Cuts at the first ``"{user_name}:"`` (the model starting the user's
        turn), removes ``*action*`` spans unless that would leave nothing,
        strips stray asterisks, collapses whitespace, substitutes the
        ``<USER>``/``<BOT>`` placeholders, and trims any trailing fragment
        after the last ``.``, ``!`` or ``?``.
        """
        text = raw_text.split(f"{user_name}:", 1)[0].strip()
        without_actions = re.sub(r'\*.*?\*', '', text).strip()
        if without_actions:
            text = without_actions
        text = " ".join(text.replace("*", "").split())
        text = text.replace("<USER>", user_name).replace("<BOT>", char_name)
        # Keep everything up to the last sentence terminator, if there is one.
        last_stop = max(text.rfind(mark) for mark in ".!?")
        if last_stop != -1:
            text = text[:last_stop + 1]
        return text