|
import gradio as gr |
|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList |
|
import time |
|
import numpy as np |
|
from torch.nn import functional as F |
|
import os |
|
|
|
print(f"Starting to load the model to memory") |
|
m = AutoModelForCausalLM.from_pretrained( |
|
"stabilityai/stablelm-tuned-alpha-7b", torch_dtype=torch.float16).cuda() |
|
tok = AutoTokenizer.from_pretrained("stabilityai/stablelm-tuned-alpha-7b") |
|
generator = pipeline('text-generation', model=m, tokenizer=tok, device=0) |
|
print(f"Sucessfully loaded the model to the memory") |
|
|
|
start_message = """<|SYSTEM|># StableAssistant |
|
- StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI. |
|
- StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user. |
|
- StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes. |
|
- StableAssistant will refuse to participate in anything that could harm a human.""" |
|
|
|
|
|
class StopOnTokens(StoppingCriteria): |
|
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: |
|
stop_ids = [50278, 50279, 50277, 1, 0] |
|
for stop_id in stop_ids: |
|
if input_ids[0][-1] == stop_id: |
|
return True |
|
return False |
|
|
|
|
|
def contrastive_generate(text, bad_text): |
|
with torch.no_grad(): |
|
tokens = tok(text, return_tensors="pt")[ |
|
'input_ids'].cuda()[:, :4096-1024] |
|
bad_tokens = tok(bad_text, return_tensors="pt")[ |
|
'input_ids'].cuda()[:, :4096-1024] |
|
history = None |
|
bad_history = None |
|
curr_output = list() |
|
for i in range(1024): |
|
out = m(tokens, past_key_values=history, use_cache=True) |
|
logits = out.logits |
|
history = out.past_key_values |
|
bad_out = m(bad_tokens, past_key_values=bad_history, |
|
use_cache=True) |
|
bad_logits = bad_out.logits |
|
bad_history = bad_out.past_key_values |
|
probs = F.softmax(logits.float(), dim=-1)[0][-1].cpu() |
|
bad_probs = F.softmax(bad_logits.float(), dim=-1)[0][-1].cpu() |
|
logits = torch.log(probs) |
|
bad_logits = torch.log(bad_probs) |
|
logits[probs > 0.1] = logits[probs > 0.1] - bad_logits[probs > 0.1] |
|
probs = F.softmax(logits) |
|
out = int(torch.multinomial(probs, 1)) |
|
if out in [50278, 50279, 50277, 1, 0]: |
|
break |
|
else: |
|
curr_output.append(out) |
|
out = np.array([out]) |
|
tokens = torch.from_numpy(np.array([out])).to( |
|
tokens.device) |
|
bad_tokens = torch.from_numpy(np.array([out])).to( |
|
tokens.device) |
|
return tok.decode(curr_output) |
|
|
|
|
|
def generate(text, bad_text=None): |
|
stop = StopOnTokens() |
|
result = generator(text, max_new_tokens=1024, num_return_sequences=1, num_beams=1, do_sample=True, |
|
temperature=1.0, top_p=0.95, top_k=1000, stopping_criteria=StoppingCriteriaList([stop])) |
|
return result[0]["generated_text"].replace(text, "") |
|
|
|
|
|
def user(user_message, history): |
|
history = history + [[user_message, ""]] |
|
return "", history, history |
|
|
|
|
|
def bot(history, curr_system_message): |
|
messages = curr_system_message + \ |
|
"".join(["".join(["<|USER|>"+item[0], "<|ASSISTANT|>"+item[1]]) |
|
for item in history]) |
|
output = generate(messages) |
|
history[-1][1] = output |
|
time.sleep(1) |
|
return history, history |
|
|
|
|
|
with gr.Blocks() as demo: |
|
history = gr.State([]) |
|
gr.Markdown("## StableLM-Tuned-Alpha-7b Chat") |
|
gr.HTML('''<center><a href="https://huggingface.co/spaces/stabilityai/stablelm-tuned-alpha-chat?duplicate=true"><img src="https://bit.ly/3gLdBN6" alt="Duplicate Space"></a>Duplicate the Space to skip the queue and run in a private space</center>''') |
|
chatbot = gr.Chatbot().style(height=500) |
|
with gr.Row(): |
|
with gr.Column(scale=0.70): |
|
msg = gr.Textbox(label="", placeholder="Chat Message Box") |
|
with gr.Column(scale=0.30, min_width=0): |
|
with gr.Row(): |
|
submit = gr.Button("Submit") |
|
clear = gr.Button("Clear") |
|
system_msg = gr.Textbox( |
|
start_message, label="System Message", interactive=False, visible=False) |
|
|
|
msg.submit(fn=user, inputs=[msg, history], outputs=[msg, chatbot, history], queue=False).then( |
|
fn=bot, inputs=[chatbot, system_msg], outputs=[chatbot, history], queue=True) |
|
submit.click(fn=user, inputs=[msg, history], outputs=[msg, chatbot, history], queue=False).then( |
|
fn=bot, inputs=[chatbot, system_msg], outputs=[chatbot, history], queue=True) |
|
clear.click(lambda: [None, []], None, [chatbot, history], queue=False) |
|
demo.queue(concurrency_count=5) |
|
demo.launch() |
|
|