ert / app.py
savage1221's picture
Update app.py
19a6bcf verified
raw
history blame
14.1 kB
import os
from transformers import AutoTokenizer
# from optimum.intel import OVModelForCausalLM, OVWeightQuantizationConfig
# from optimum.intel.openvino import OVModelForCausalLM
from transformers import AutoConfig, AutoTokenizer
import gradio as gr
import time
from threading import Thread
from transformers import (
TextIteratorStreamer,
StoppingCriteria,
StoppingCriteriaList,
GenerationConfig,
)
# model_name = "openai-community/gpt2-large"
# model_dir = "F:\\phi3\\openvinomodel\\phi3\\int4"
# model_name = "savage1221/lora-fine"
# save_name = model_name.split("/")[-1] + "_openvino"
# precision = "f32"
# quantization_config = OVWeightQuantizationConfig(
# bits=4,
# sym=False,
# group_size=128,
# ratio=0.6,
# trust_remote_code=True,
# )
# ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}
# device = "gpu"
# load_kwargs = {
# "device": device,
# "ov_config": {
# "PERFORMANCE_HINT": "LATENCY",
# # "INFERENCE_PRECISION_HINT": precision,
# "CACHE_DIR": os.path.join(save_name, "model_cache"), # OpenVINO will use this directory as cache
# },
# "compile": False,
# "quantization_config": quantization_config,
# "trust_remote_code": True,
# # ov_config = ov_config
# }
# # Check whether the model was already exported
# saved = os.path.exists(save_name)
# model = OVModelForCausalLM.from_pretrained(
# # model_name
# model_name if not saved else save_name,
# export=not saved,
# **load_kwargs,
# )
# model = OVModelForCausalLM.from_pretrained(
# model_name,
# device='GPU.0',
# ov_config=ov_config,
# config=AutoConfig.from_pretrained(model_name, trust_remote_code=True),
# trust_remote_code=True,
# )
# # Load tokenizer to be used with the model
# tokenizer = AutoTokenizer.from_pretrained(model_name if not saved else save_name)
# tokenizer = AutoTokenizer.from_pretrained(model_name )
# # Save the exported model locally
# if not saved:
# model.save_pretrained(save_name)
# tokenizer.save_pretrained(save_name)
# # TODO Optional: export to huggingface/hub
# model_size = os.stat(os.path.join(save_name, "openvino_model.bin")).st_size / 1024 ** 3
# print(f'Model size in FP32: ~5.4GB, current model size in 4bit: {model_size:.2f}GB')
#####################################################################
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("savage1221/lora-fine", trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained("savage1221/lora-fine", trust_remote_code=True)
# Copied and modified from https://github.com/bigcode-project/bigcode-evaluation-harness/blob/main/bigcode_eval/generation.py#L13
class SuffixCriteria(StoppingCriteria):
def __init__(self, start_length, eof_strings, tokenizer, check_fn=None):
self.start_length = start_length
self.eof_strings = eof_strings
self.tokenizer = tokenizer
if check_fn is None:
check_fn = lambda decoded_generation: any(
[decoded_generation.endswith(stop_string) for stop_string in self.eof_strings]
)
self.check_fn = check_fn
def __call__(self, input_ids, scores, **kwargs):
"""Returns True if generated sequence ends with any of the stop strings"""
decoded_generations = self.tokenizer.batch_decode(input_ids[:, self.start_length :])
return all([self.check_fn(decoded_generation) for decoded_generation in decoded_generations])
def is_partial_stop(output, stop_str):
"""Check whether the output contains a partial stop str."""
for i in range(0, min(len(output), len(stop_str))):
if stop_str.startswith(output[-i:]):
return True
return False
# Set the chat template to the tokenizer. The chat template implements the simple template of
# User: content
# Assistant: content
# ...
# Read more about chat templates here https://huggingface.co/docs/transformers/main/en/chat_templating
tokenizer.chat_template = "{% for message in messages %}{{message['role'] + ': ' + message['content'] + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ 'Assistant:' }}{% endif %}"
# def prepare_history_for_model(history):
# """
# Converts the history to a tokenized prompt in the format expected by the model.
# Params:
# history: dialogue history
# Returns:
# Tokenized prompt
# """
# messages = []
# for idx, (user_msg, model_msg) in enumerate(history):
# # skip the last assistant message if its empty, the tokenizer will do the formating
# if idx == len(history) - 1 and not model_msg:
# messages.append({"role": "User", "content": user_msg})
# break
# if user_msg:
# messages.append({"role": "User", "content": user_msg})
# if model_msg:
# messages.append({"role": "Assistant", "content": model_msg})
# input_token = tokenizer.apply_chat_template(
# messages,
# add_generation_prompt=True,
# tokenize=True,
# return_tensors="pt",
# return_dict=True
# )
# return input_token
def prepare_history_for_model(history):
"""
Converts the history to a tokenized prompt in the format expected by the model.
Params:
history: dialogue history
Returns:
Tokenized prompt
"""
messages = []
# Add instruction
instruction = "Generate quotes for AWS RDS services"
messages.append({"role": "Instruction", "content": instruction})
for idx, (user_msg, model_msg) in enumerate(history):
# Assuming the user message contains the product information
if user_msg:
messages.append({"role": "Input", "content": user_msg})
# Skip the last assistant message if it's empty
if idx == len(history) - 1 and not model_msg:
break
if model_msg:
messages.append({"role": "Output", "content": model_msg})
input_token = tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
return_dict=True
)
return input_token
def generate(history, temperature, max_new_tokens, top_p, repetition_penalty, assisted):
"""
Generates the assistant's reponse given the chatbot history and generation parameters
Params:
history: conversation history formated in pairs of user and assistant messages `[user_message, assistant_message]`
temperature: parameter for control the level of creativity in AI-generated text.
By adjusting the `temperature`, you can influence the AI model's probability distribution, making the text more focused or diverse.
max_new_tokens: The maximum number of tokens we allow the model to generate as a response.
top_p: parameter for control the range of tokens considered by the AI model based on their cumulative probability.
repetition_penalty: parameter for penalizing tokens based on how frequently they occur in the text.
assisted: boolean parameter to enable/disable assisted generation with speculative decoding.
Yields:
Updated history and generation status.
"""
start = time.perf_counter()
# Construct the input message string for the model by concatenating the current system message and conversation history
# Tokenize the messages string
inputs = prepare_history_for_model(history)
input_length = inputs['input_ids'].shape[1]
# truncate input in case it is too long.
# TODO improve this
if input_length > 2000:
history = [history[-1]]
inputs = prepare_history_for_model(history)
input_length = inputs['input_ids'].shape[1]
prompt_char = "β–Œ"
history[-1][1] = prompt_char
yield history, "Status: Generating...", *([gr.update(interactive=False)] * 4)
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
# Create a stopping criteria to prevent the model from playing the role of the user aswell.
stop_str = ["\nUser:", "\nAssistant:", "\nRules:", "\nQuestion:"]
stopping_criteria = StoppingCriteriaList([SuffixCriteria(input_length, stop_str, tokenizer)])
# Prepare input for generate
generation_config = GenerationConfig(
max_new_tokens=max_new_tokens,
do_sample=temperature > 0.0,
temperature=temperature if temperature > 0.0 else 1.0,
repetition_penalty=repetition_penalty,
top_p=top_p,
eos_token_id=[tokenizer.eos_token_id],
pad_token_id=tokenizer.eos_token_id,
)
generate_kwargs = dict(
streamer=streamer,
generation_config=generation_config,
stopping_criteria=stopping_criteria,
) | inputs
if assisted:
target_generate = stateless_model.generate
generate_kwargs["assistant_model"] = asst_model
else:
target_generate = model.generate
t1 = Thread(target=target_generate, kwargs=generate_kwargs)
t1.start()
# Initialize an empty string to store the generated text.
partial_text = ""
for new_text in streamer:
partial_text += new_text
history[-1][1] = partial_text + prompt_char
for s in stop_str:
if (pos := partial_text.rfind(s)) != -1:
break
if pos != -1:
partial_text = partial_text[:pos]
break
elif any([is_partial_stop(partial_text, s) for s in stop_str]):
continue
yield history, "Status: Generating...", *([gr.update(interactive=False)] * 4)
history[-1][1] = partial_text
generation_time = time.perf_counter() - start
yield history, f'Generation time: {generation_time:.2f} sec', *([gr.update(interactive=True)] * 4)
#############################################################
# model.compile()
try:
demo.close()
except:
pass
EXAMPLES = [
["What is OpenVINO?"],
["Can you explain to me briefly what is Python programming language?"],
["Explain the plot of Cinderella in a sentence."],
["Write a Python function to perform binary search over a sorted list. Use markdown to write code"],
["Lily has a rubber ball that she drops from the top of a wall. The wall is 2 meters tall. How long will it take for the ball to reach the ground?"],
]
def add_user_text(message, history):
"""
Add user's message to chatbot history
Params:
message: current user message
history: conversation history
Returns:
Updated history, clears user message and status
"""
# Append current user message to history with a blank assistant message which will be generated by the model
history.append([message, None])
return ('', history)
def prepare_for_regenerate(history):
"""
Delete last assistant message to prepare for regeneration
Params:
history: conversation history
Returns:
updated history
"""
history[-1][1] = None
return history
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown('<h1 style="text-align: center;">Chat with Phi-3 on Meteor Lake iGPU</h1>')
chatbot = gr.Chatbot()
with gr.Row():
assisted = gr.Checkbox(value=False, label="Assisted Generation", scale=10)
msg = gr.Textbox(placeholder="Enter message here...", show_label=False, autofocus=True, scale=75)
status = gr.Textbox("Status: Idle", show_label=False, max_lines=1, scale=15)
with gr.Row():
submit = gr.Button("Submit", variant='primary')
regenerate = gr.Button("Regenerate")
clear = gr.Button("Clear")
with gr.Accordion("Advanced Options:", open=False):
with gr.Row():
with gr.Column():
temperature = gr.Slider(
label="Temperature",
value=0.0,
minimum=0.0,
maximum=1.0,
step=0.05,
interactive=True,
)
max_new_tokens = gr.Slider(
label="Max new tokens",
value=512,
minimum=0,
maximum=1024,
step=32,
interactive=True,
)
with gr.Column():
top_p = gr.Slider(
label="Top-p (nucleus sampling)",
value=1.0,
minimum=0.0,
maximum=1.0,
step=0.05,
interactive=True,
)
repetition_penalty = gr.Slider(
label="Repetition penalty",
value=1.0,
minimum=1.0,
maximum=2.0,
step=0.1,
interactive=True,
)
gr.Examples(
EXAMPLES, inputs=msg, label="Click on any example and press the 'Submit' button"
)
# Sets generate function to be triggered when the user submit a new message
gr.on(
triggers=[submit.click, msg.submit],
fn=add_user_text,
inputs=[msg, chatbot],
outputs=[msg, chatbot],
queue=False,
).then(
fn=generate,
inputs=[chatbot, temperature, max_new_tokens, top_p, repetition_penalty, assisted],
outputs=[chatbot, status, msg, submit, regenerate, clear],
concurrency_limit=1,
queue=True
)
regenerate.click(
fn=prepare_for_regenerate,
inputs=chatbot,
outputs=chatbot,
queue=True,
concurrency_limit=1
).then(
fn=generate,
inputs=[chatbot, temperature, max_new_tokens, top_p, repetition_penalty, assisted],
outputs=[chatbot, status, msg, submit, regenerate, clear],
concurrency_limit=1,
queue=True
)
clear.click(fn=lambda: (None, "Status: Idle"), inputs=None, outputs=[chatbot, status], queue=False)
demo.launch()