import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList | |
import time | |
import numpy as np | |
from torch.nn import functional as F | |
import os | |
token_key = os.environ.get("HF_ACCESS_TOKEN") | |
# if torch.cuda.is_available(): | |
# m = AutoModelForCausalLM.from_pretrained("stabilityai/stablelm-tuned-alpha-7b",use_auth_token=token_key, torch_dtype=torch.float16).cuda() | |
# tok = AutoTokenizer.from_pretrained("stabilityai/stablelm-tuned-alpha-7b",use_auth_token=token_key) | |
# else: | |
# m = AutoModelForCausalLM.from_pretrained("stabilityai/stablelm-tuned-alpha-7b",use_auth_token=token_key, torch_dtype=torch.float16) | |
# tok = AutoTokenizer.from_pretrained("stabilityai/stablelm-tuned-alpha-7b",use_auth_token=token_key) | |
# generator = pipeline('text-generation', model=m, tokenizer=tok, device=0) | |
# start_message = """<|SYSTEM|># StableAssistant | |
# - StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI. | |
# - StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user. | |
# - StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes. | |
# - StableAssistant will refuse to participate in anything that could harm a human.""" | |
# class StopOnTokens(StoppingCriteria): | |
# def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: | |
# stop_ids = [50278, 50279, 50277, 1, 0] | |
# for stop_id in stop_ids: | |
# if input_ids[0][-1] == stop_id: | |
# return True | |
# return False | |
# def contrastive_generate(text, bad_text): | |
# with torch.no_grad(): | |
# if torch.cuda_is_available(): | |
# tokens = tok(text, return_tensors="pt")['input_ids'].cuda()[:,:4096-1024] | |
# bad_tokens = tok(bad_text, return_tensors="pt")['input_ids'].cuda()[:,:4096-1024] | |
# else: | |
# tokens = tok(text, return_tensors="pt")['input_ids'][:,:4096-1024] | |
# bad_tokens = tok(bad_text, return_tensors="pt")['input_ids'][:,:4096-1024] | |
# history = None | |
# bad_history = None | |
# curr_output = list() | |
# for i in range(1024): | |
# out = m(tokens, past_key_values=history, use_cache=True) | |
# logits = out.logits | |
# history = out.past_key_values | |
# bad_out = m(bad_tokens, past_key_values=bad_history, use_cache=True) | |
# bad_logits = bad_out.logits | |
# bad_history = bad_out.past_key_values | |
# probs = F.softmax(logits.float(), dim=-1)[0][-1].cpu() | |
# bad_probs = F.softmax(bad_logits.float(), dim=-1)[0][-1].cpu() | |
# logits = torch.log(probs) | |
# bad_logits = torch.log(bad_probs) | |
# logits[probs > 0.1] = logits[probs > 0.1] - bad_logits[probs > 0.1] | |
# probs = F.softmax(logits) | |
# out = int(torch.multinomial(probs, 1)) | |
# if out in [50278, 50279, 50277, 1, 0]: | |
# break | |
# else: | |
# curr_output.append(out) | |
# out = np.array([out]) | |
# tokens = torch.from_numpy(np.array([out])).to( | |
# tokens.device) | |
# bad_tokens = torch.from_numpy(np.array([out])).to( | |
# tokens.device) | |
# return tok.decode(curr_output) | |
# def generate(text, bad_text=None): | |
# stop = StopOnTokens() | |
# result = generator(text, max_new_tokens=1024, num_return_sequences=1, num_beams=1, do_sample=True, temperature=1.0, top_p=0.95, top_k=1000, stopping_criteria=StoppingCriteriaList([stop])) | |
# return result[0]["generated_text"].replace(text, "") | |
# def user(user_message, history): | |
# return "", history + [[user_message, ""]] | |
# def bot(history, curr_system_message): | |
# messages = curr_system_message + "".join(["".join(["<|USER|>"+item[0], "<|ASSISTANT|>"+item[1]]) for item in history]) | |
# output = generate(messages) | |
# history[-1][1] = output | |
# time.sleep(1) | |
# return history | |
# def system_update(msg): | |
# global curr_system_message | |
# curr_system_message = msg | |
# with gr.Blocks() as demo: | |
# gr.Markdown("###StableLM-tuned-Alpha-7B Chat") | |
# with gr.Row(): | |
# with gr.Column(): | |
# chatbot = gr.Chatbot([]) | |
# clear = gr.Button("Clear") | |
# with gr.Column(): | |
# system_msg = start_message#gr.Textbox(start_message, label="System Message", interactive=True) | |
# msg = gr.Textbox(label="Chat Message") | |
# msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then( | |
# bot, [chatbot, system_msg], chatbot | |
# ) | |
# system_msg.change(system_update, system_msg, None, queue=False) | |
# clear.click(lambda: None, None, chatbot, queue=False) | |
# demo.launch(share=True) |