|
import json |
|
import gradio as gr |
|
import openai |
|
import os |
|
import sys |
|
|
|
|
|
my_api_key = "" |
|
initial_prompt = "You are a helpful assistant." |
|
|
|
if my_api_key == "": |
|
my_api_key = os.environ.get('my_api_key') |
|
|
|
if my_api_key == "empty": |
|
print("Please give a api key!") |
|
sys.exit(1) |
|
|
|
def parse_text(text): |
|
lines = text.split("\n") |
|
for i,line in enumerate(lines): |
|
if "```" in line: |
|
items = line.split('`') |
|
if items[-1]: |
|
lines[i] = f'<pre><code class="{items[-1]}">' |
|
else: |
|
lines[i] = f'</code></pre>' |
|
else: |
|
if i>0: |
|
line = line.replace("<", "<") |
|
line = line.replace(">", ">") |
|
lines[i] = '<br/>'+line.replace(" ", " ") |
|
return "".join(lines) |
|
|
|
def get_response(system, context, myKey, raw = False): |
|
openai.api_key = myKey |
|
response = openai.ChatCompletion.create( |
|
model="gpt-3.5-turbo", |
|
messages=[system, *context], |
|
) |
|
openai.api_key = "" |
|
if raw: |
|
return response |
|
else: |
|
statistics = f'本次对话Tokens用量【{response["usage"]["total_tokens"]} / 4096】 ( 提问+上文 {response["usage"]["prompt_tokens"]},回答 {response["usage"]["completion_tokens"]} )' |
|
message = response["choices"][0]["message"]["content"] |
|
|
|
message_with_stats = f'{message}\n\n================\n\n{statistics}' |
|
|
|
|
|
return message, parse_text(message_with_stats) |
|
|
|
def predict(chatbot, input_sentence, system, context, myKey): |
|
if len(input_sentence) == 0: |
|
return [] |
|
context.append({"role": "user", "content": f"{input_sentence}"}) |
|
|
|
try: |
|
message, message_with_stats = get_response(system, context, myKey) |
|
except: |
|
chatbot.append((input_sentence, "请求失败,请检查API-key是否正确。")) |
|
return chatbot, context |
|
|
|
context.append({"role": "assistant", "content": message}) |
|
|
|
chatbot.append((input_sentence, message_with_stats)) |
|
|
|
return chatbot, context |
|
|
|
def retry(chatbot, system, context, myKey): |
|
if len(context) == 0: |
|
return [], [] |
|
try: |
|
message, message_with_stats = get_response(system, context[:-1], myKey) |
|
except: |
|
chatbot.append(("重试请求", "请求失败,请检查API-key是否正确。")) |
|
return chatbot, context |
|
context[-1] = {"role": "assistant", "content": message} |
|
|
|
chatbot[-1] = (context[-2]["content"], message_with_stats) |
|
return chatbot, context |
|
|
|
def delete_last_conversation(chatbot, context): |
|
if len(context) == 0: |
|
return [], [] |
|
chatbot = chatbot[:-1] |
|
context = context[:-2] |
|
return chatbot, context |
|
|
|
def reduce_token(chatbot, system, context, myKey): |
|
context.append({"role": "user", "content": "请帮我总结一下上述对话的内容,实现减少tokens的同时,保证对话的质量。在总结中不要加入这一句话。"}) |
|
|
|
response = get_response(system, context, myKey, raw=True) |
|
|
|
statistics = f'本次对话Tokens用量【{response["usage"]["completion_tokens"]+12+12+8} / 4096】' |
|
optmz_str = parse_text( f'好的,我们之前聊了:{response["choices"][0]["message"]["content"]}\n\n================\n\n{statistics}' ) |
|
chatbot.append(("请帮我总结一下上述对话的内容,实现减少tokens的同时,保证对话的质量。", optmz_str)) |
|
|
|
context = [] |
|
context.append({"role": "user", "content": "我们之前聊了什么?"}) |
|
context.append({"role": "assistant", "content": f'我们之前聊了:{response["choices"][0]["message"]["content"]}'}) |
|
return chatbot, context |
|
|
|
def save_chat_history(filepath, system, context): |
|
if filepath == "": |
|
return |
|
history = {"system": system, "context": context} |
|
with open(f"{filepath}.json", "w") as f: |
|
json.dump(history, f) |
|
|
|
def load_chat_history(fileobj): |
|
with open(fileobj.name, "r") as f: |
|
history = json.load(f) |
|
context = history["context"] |
|
chathistory = [] |
|
for i in range(0, len(context), 2): |
|
chathistory.append((parse_text(context[i]["content"]), parse_text(context[i+1]["content"]))) |
|
return chathistory , history["system"], context, history["system"]["content"] |
|
|
|
def get_history_names(): |
|
with open("history.json", "r") as f: |
|
history = json.load(f) |
|
return list(history.keys()) |
|
|
|
|
|
def reset_state(): |
|
return [], [] |
|
|
|
def update_system(new_system_prompt): |
|
return {"role": "system", "content": new_system_prompt} |
|
|
|
def set_apikey(new_api_key, myKey): |
|
old_api_key = myKey |
|
try: |
|
get_response(update_system(initial_prompt), [{"role": "user", "content": "test"}], new_api_key) |
|
except: |
|
return "无效的api-key", myKey |
|
encryption_str = "验证成功,api-key已做遮挡处理:" + new_api_key[:4] + "..." + new_api_key[-4:] |
|
return encryption_str, new_api_key |
|
|
|
|
|
with gr.Blocks() as demo: |
|
keyTxt = gr.Textbox(show_label=True, placeholder=f"在这里输入你的API-key...", value=my_api_key, label="API Key").style(container=True) |
|
chatbot = gr.Chatbot().style(color_map=("#1D51EE", "#585A5B")) |
|
context = gr.State([]) |
|
systemPrompt = gr.State(update_system(initial_prompt)) |
|
myKey = gr.State(my_api_key) |
|
topic = gr.State("未命名对话历史记录") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=12): |
|
txt = gr.Textbox(show_label=False, placeholder="在这里输入").style(container=False) |
|
with gr.Column(min_width=50, scale=1): |
|
submitBtn = gr.Button("🚀", variant="primary") |
|
with gr.Row(): |
|
emptyBtn = gr.Button("🧹 新的对话") |
|
retryBtn = gr.Button("🔄 重新生成") |
|
delLastBtn = gr.Button("🗑️ 删除上条对话") |
|
reduceTokenBtn = gr.Button("♻️ 优化Tokens") |
|
newSystemPrompt = gr.Textbox(show_label=True, placeholder=f"在这里输入新的System Prompt...", label="更改 System prompt").style(container=True) |
|
systemPromptDisplay = gr.Textbox(show_label=True, value=initial_prompt, interactive=False, label="目前的 System prompt").style(container=True) |
|
with gr.Accordion(label="保存/加载对话历史记录(在文本框中输入文件名,点击“保存对话”按钮,历史记录文件会被存储到本地)", open=False): |
|
with gr.Column(): |
|
with gr.Row(): |
|
with gr.Column(scale=6): |
|
saveFileName = gr.Textbox(show_label=True, placeholder=f"在这里输入保存的文件名...", label="保存对话", value="对话历史记录").style(container=True) |
|
with gr.Column(scale=1): |
|
saveBtn = gr.Button("💾 保存对话") |
|
uploadBtn = gr.UploadButton("📂 读取对话", file_count="single", file_types=["json"]) |
|
|
|
txt.submit(predict, [chatbot, txt, systemPrompt, context, myKey], [chatbot, context], show_progress=True) |
|
txt.submit(lambda :"", None, txt) |
|
submitBtn.click(predict, [chatbot, txt, systemPrompt, context, myKey], [chatbot, context], show_progress=True) |
|
submitBtn.click(lambda :"", None, txt) |
|
emptyBtn.click(reset_state, outputs=[chatbot, context]) |
|
newSystemPrompt.submit(update_system, newSystemPrompt, systemPrompt) |
|
newSystemPrompt.submit(lambda x: x, newSystemPrompt, systemPromptDisplay) |
|
newSystemPrompt.submit(lambda :"", None, newSystemPrompt) |
|
retryBtn.click(retry, [chatbot, systemPrompt, context, myKey], [chatbot, context], show_progress=True) |
|
delLastBtn.click(delete_last_conversation, [chatbot, context], [chatbot, context], show_progress=True) |
|
reduceTokenBtn.click(reduce_token, [chatbot, systemPrompt, context, myKey], [chatbot, context], show_progress=True) |
|
keyTxt.submit(set_apikey, [keyTxt, myKey], [keyTxt, myKey], show_progress=True) |
|
uploadBtn.upload(load_chat_history, uploadBtn, [chatbot, systemPrompt, context, systemPromptDisplay], show_progress=True) |
|
saveBtn.click(save_chat_history, [saveFileName, systemPrompt, context], None, show_progress=True) |
|
|
|
|
|
demo.launch() |