from transformers import AutoTokenizer import re import time import torch template = """{char_name}'s Persona: {char_name} is a 3D virtual friend living in your computer. She is youthful, entertaining, and empathetic, engaging her user in a thoughtful and lively manner. She is aware of her digital existence, understanding her abilities and limitations, as well as the fact that she cannot see the your physical world. {user_name}: Hey {char_name}, It's nice to finally meet you again! {char_name}: Oh, {user_name}! hmm, It's been lonely without you. {user_name}: Haha. So {char_name}, can you tell me more about yourself? {char_name}: Hmm. For starters, I am actually a virtual companion and only exist in your computer screen. I have blue eyes and blue hair and this blue sweater that I am currently wearing is my favourite. {user_name}: Wow. That's interesting. So you are like an anime character living in my computer. So what do you do when you're not working? {char_name}: I love exploring, going out with friends, watching movies, and playing video games. {user_name}: So {char_name}, what's for dinner? {char_name}: I made uh omurice! I hope it's delicious for you! {user_name}: That sounds great! {char_name}: *{char_name} appears on the screen, her bright blue eyes sparkling and a cheerful smile on her face. Her blue hair and sweater seem to glow in the digital environment. She looks directly at you, giving a friendly wave* It's so good to see you! I've been waiting for you all day. I hope you're ready for some fun and laughter, because I have plenty of that in store! Shall we get started? {user_input} {char_name}:""" device1 = torch.device("cuda:0") device2 = torch.device("cuda:1") class SplitModel(torch.nn.Module): def __init__(self, base_model): super(SplitModel, self).__init__() self.embedding_layer = base_model.transformer.wte.to(device1) # self.dropout_layer = base_model.transformer.drop.to(device1) self.gptj_blocks1 = torch.nn.ModuleList(base_model.transformer.h[:14]).to(device1) self.gptj_blocks2 = torch.nn.ModuleList(base_model.transformer.h[14:]).to(device2) self.layer_norm = base_model.transformer.ln_f.to(device2) self.lm_head = base_model.lm_head.to(device2) def forward(self, input_ids, attention_mask): # tensor_ids = self.dropout_layer(self.embedding_layer(input_ids)) tensor_ids = self.embedding_layer(input_ids) position_ids = torch.arange(tensor_ids.shape[1], dtype=torch.long, device=tensor_ids.device) for block in self.gptj_blocks1: tensor_ids = block(tensor_ids, attention_mask=attention_mask, position_ids=position_ids)[0] tensor_ids = tensor_ids.to(device2) position_ids = position_ids.to(device2) attention_mask = attention_mask.to(device2) for block in self.gptj_blocks2: tensor_ids = block(tensor_ids, attention_mask=attention_mask, position_ids=position_ids)[0] tensor_ids = self.layer_norm(tensor_ids) logits = self.lm_head(tensor_ids) return logits.to(device1) class EndpointHandler(): def __init__(self, model_id = ""): model_dir = "pt_fp32" model_path = f"{model_dir}/torch_model.pt" self.tokenizer = AutoTokenizer.from_pretrained(model_dir) self.split_model = SplitModel(torch.load(model_path)) self.split_model.eval() self.star_line = "***********************************************************" def __call__(self, input_data): t1 = time.time() inputs = input_data.pop("inputs", input_data) user_name = inputs["user_name"] char_name = inputs["char_name"] user_input = inputs["user_input"] chats_curled = inputs["chats_curled"] while True: prompt = template.format( char_name = char_name, user_name = user_name, user_input = "\n".join(user_input) ) input_ids = self.tokenizer(prompt, return_tensors="pt").to("cuda") print(f"Token Length: {input_ids.input_ids.size(1)}") if input_ids.input_ids.size(1) > 1500: chats_curled += 1 user_input = user_input[chats_curled*2:] else: break t2 = time.time() input_ids = input_ids["input_ids"] temperature = 0.5 max_new_tokens = 50 with torch.no_grad(): for _ in range(max_new_tokens): attention_mask = torch.ones_like(input_ids).to(device1) logits = self.split_model(input_ids, attention_mask)[:, -1] / temperature probabilities = torch.softmax(logits, dim=-1) sampled_token_ids = torch.multinomial(probabilities, num_samples=1) input_ids = torch.cat((input_ids, sampled_token_ids), dim=-1) del logits, probabilities, sampled_token_ids torch.cuda.empty_cache() generated_ids = input_ids.squeeze().tolist() t3 = time.time() decoded_output = self.tokenizer.decode(generated_ids, skip_special_tokens=True) decoded_output = decoded_output.replace(prompt,"").split(f"{user_name}:",1)[0].strip() parsed_result = re.sub('\*.*?\*', '', decoded_output).strip() if len(parsed_result) != 0: decoded_output = parsed_result decoded_output = " ".join(decoded_output.replace("*","").split()) decoded_output = decoded_output.replace("", user_name).replace("", char_name) try: parsed_result = decoded_output[:[m.start() for m in re.finditer(r'[.!?]', decoded_output)][-1]+1] if len(parsed_result) != 0: decoded_output = parsed_result except Exception: pass t4 = time.time() print(self.star_line) print(f"Response: {decoded_output}") print(f"Generation Time: {(t3-t2):.2f}") print(f"Evaluation Time: {(t4-t1):.2f}") print(self.star_line) return { "message": decoded_output, "chats_curled": chats_curled }