chatpregdrug / app.py
leonsimon23's picture
Create app.py
252a207 verified
import json
import os
from typing import Optional, Dict
import aiohttp
import chainlit as cl
import chainlit.data as cl_data
import requests
from fastgpt_data import FastgptDataLayer, now, share_id, app_name, welcome_text
fastgpt_base_url = os.getenv("FASTGPT_BASE_URL")
fastgpt_api_key = os.getenv("FASTGPT_API_KEY")
fastgpt_api_detail = os.getenv("FASTGPT_API_DETAIL", False)
cl_data._data_layer = FastgptDataLayer()
cl.config.ui.name = app_name
def download_logo():
local_filename = "./public/favicon.svg"
directory = os.path.dirname(local_filename)
os.makedirs(directory, exist_ok=True)
# Streaming, so we can iterate over the response.
with requests.get(f"{fastgpt_base_url}/icon/logo.svg", stream=True) as r:
r.raise_for_status() # Check if the request was successful
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
# If you have chunk encoded response uncomment if
# and set chunk_size parameter to None.
f.write(chunk)
download_logo()
@cl.on_chat_start
async def chat_start():
if welcome_text:
# elements = [cl.Text(content=welcomeText, display="inline")]
await cl.Message(content=welcome_text).send()
@cl.on_message
async def handle_message(message: cl.Message):
msg = cl.Message(content="")
url = f"{fastgpt_base_url}/api/v1/chat/completions"
print('message.thread_id', message.thread_id)
headers = {
"Authorization": f"Bearer {fastgpt_api_key}",
"Content-Type": "application/json"
}
data = {
"messages": [{"role": "user", "content": message.content}],
"variables": {"cTime": now},
"responseChatItemId": message.uiltin.id,
"shareId": share_id,
"chatId": message.thread_id,
"appType": "advanced",
"outLinkUid": cl.context.session.user.identifier,
"detail": fastgpt_api_detail,
"stream": True
}
async for data in fetch_sse(url, headers=headers, data=json.dumps(data), detail=fastgpt_api_detail):
delta = data['choices'][0]['delta']
if delta:
await msg.stream_token(delta['content'])
await msg.send()
@cl.header_auth_callback
def header_auth_callback(headers: Dict) -> Optional[cl.User]:
print(headers)
# 创建一个md5 hash对象
md5_hash = hashlib.md5()
user_agent_bytes = headers.get('user-agent').encode('utf-8')
# 更新这个hash对象的内容
md5_hash.update(user_agent_bytes)
# 获取md5哈希值的十六进制表示形式
md5_hex_digest = md5_hash.hexdigest()
out_link_uid = md5_hex_digest
print("MD5加密后的结果:", md5_hex_digest)
return cl.User(identifier=out_link_uid, display_name="visitor")
@cl.on_chat_resume
async def on_chat_resume():
pass
async def fetch_sse(url, headers, data, detail):
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=data) as response:
async for line in response.content:
if line: # 过滤掉空行
data = line.decode('utf-8').rstrip('\n\r')
# print(f"Received: {data}")
# 检查是否为数据行,并且是我们感兴趣的事件类型
if detail:
if data.startswith('event:'):
event_type = data.split(':', 1)[1].strip() # 提取事件类型
elif data.startswith('data:') and event_type == 'flowNodeStatus':
data = data.split(':', 1)[1].strip()
flowNodeStatus = json.loads(data)
current_step = cl.context.current_step
current_step.name = flowNodeStatus['name']
elif data.startswith('data:') and event_type == 'answer':
data = data.split(':', 1)[1].strip() # 提取数据内容
# 如果数据包含换行符,可能需要进一步处理(这取决于你的具体需求)
# 这里我们简单地打印出来
if data != '[DONE]':
yield json.loads(data)
else:
if data.startswith('data:'):
data = data.split(':', 1)[1].strip() # 提取数据内容
# 如果数据包含换行符,可能需要进一步处理(这取决于你的具体需求)
# 这里我们简单地打印出来
if data != '[DONE]':
yield json.loads(data)