# Spaces status banner captured together with this file: Runtime error
import hashlib
import json
import os
from typing import Optional, Dict

import aiohttp
import chainlit as cl
import chainlit.data as cl_data
import requests

from fastgpt_data import FastgptDataLayer, now, share_id, app_name, welcome_text
# FastGPT endpoint and credentials, read once from the environment at import.
fastgpt_base_url = os.getenv("FASTGPT_BASE_URL")
fastgpt_api_key = os.getenv("FASTGPT_API_KEY")

# os.getenv returns a string whenever the variable is set, so values such as
# "false" or "0" would be truthy and would end up in the request body as a
# string. Normalise to a real bool: unset or an explicit "off" spelling means
# False, any other value keeps meaning True as it did before.
fastgpt_api_detail = os.getenv("FASTGPT_API_DETAIL", "").strip().lower() not in {
    "",
    "0",
    "false",
    "no",
    "off",
}

# Install the FastGPT-backed data layer and take the UI name from app_name.
cl_data._data_layer = FastgptDataLayer()
cl.config.ui.name = app_name
def download_logo(timeout: float = 10.0) -> None:
    """Download the FastGPT logo and store it as ``./public/favicon.svg``.

    The image is requested from ``{fastgpt_base_url}/icon/logo.svg`` and
    streamed to disk in 8 KiB chunks. The target directory is created when
    it does not exist yet.

    Args:
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout an unresponsive server would block forever.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.RequestException: The request failed or timed out.
        OSError: The directory or file could not be written.
    """
    local_filename = "./public/favicon.svg"
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)

    logo_url = f"{fastgpt_base_url}/icon/logo.svg"
    # Streaming, so the body is written chunk by chunk instead of being
    # held in memory as a whole.
    with requests.get(logo_url, stream=True, timeout=timeout) as r:
        # Check the status before opening the file so a failed request
        # does not truncate an existing favicon.
        r.raise_for_status()
        with open(local_filename, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)


# Executed once when the module is imported.
download_logo()
# Registered through the Chainlit decorator; a plain, undecorated coroutine
# is never called by the framework.
@cl.on_chat_start
async def chat_start():
    """Send the configured welcome text when a chat starts.

    Nothing is sent when ``welcome_text`` is empty or unset.
    """
    if welcome_text:
        await cl.Message(content=welcome_text).send()
# Registered through the Chainlit decorator; a plain, undecorated coroutine
# is never called by the framework.
@cl.on_message
async def handle_message(message: cl.Message):
    """Forward a user message to FastGPT and stream the reply into the chat.

    The request is sent to ``{fastgpt_base_url}/api/v1/chat/completions`` with
    streaming enabled. Every content token received from ``fetch_sse`` is
    appended to a single reply message, which is sent once the stream ends.

    Args:
        message: The incoming user message; its ``content`` and ``thread_id``
            are forwarded to FastGPT.
    """
    msg = cl.Message(content="")
    url = f"{fastgpt_base_url}/api/v1/chat/completions"
    print('message.thread_id', message.thread_id)
    headers = {
        "Authorization": f"Bearer {fastgpt_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "messages": [{"role": "user", "content": message.content}],
        "variables": {"cTime": now},
        # NOTE(review): the original read ``message.uiltin.id``; ``uiltin``
        # is not an attribute of cl.Message, so the message id is used.
        # Confirm this is the id FastGPT should store for the reply.
        "responseChatItemId": message.id,
        "shareId": share_id,
        "chatId": message.thread_id,
        "appType": "advanced",
        "outLinkUid": cl.context.session.user.identifier,
        "detail": fastgpt_api_detail,
        "stream": True
    }
    # A distinct loop variable keeps the request payload from being shadowed
    # by the streamed events.
    async for event in fetch_sse(url, headers=headers, data=json.dumps(payload), detail=fastgpt_api_detail):
        choices = event.get('choices') if isinstance(event, dict) else None
        if not choices:
            continue
        delta = choices[0].get('delta') or {}
        # Some deltas carry no text; only forward actual content.
        token = delta.get('content')
        if token:
            await msg.stream_token(token)
    await msg.send()
# Registered through the Chainlit decorator; a plain, undecorated function
# is never called by the framework.
@cl.header_auth_callback
def header_auth_callback(headers: Dict) -> Optional[cl.User]:
    """Build a visitor user whose identifier is derived from the User-Agent.

    The identifier is the hexadecimal MD5 digest of the ``user-agent`` header
    value. MD5 serves only as a fingerprint here, not as a security measure.

    NOTE(review): clients sending the same User-Agent string receive the same
    identifier.

    Args:
        headers: Request headers; only ``user-agent`` is read.

    Returns:
        A ``cl.User`` with the digest as identifier and "visitor" as
        display name.
    """
    # The raw headers are deliberately not printed: they may contain cookies
    # or authorization tokens that do not belong in the logs.
    # A missing header is treated as an empty string instead of failing on
    # ``None.encode``.
    user_agent = headers.get('user-agent') or ''
    md5_hex_digest = hashlib.md5(user_agent.encode('utf-8')).hexdigest()
    print("MD5加密后的结果:", md5_hex_digest)
    return cl.User(identifier=md5_hex_digest, display_name="visitor")
# Registered through the Chainlit decorator; a plain, undecorated coroutine
# is never called by the framework.
@cl.on_chat_resume
async def on_chat_resume():
    """Chat-resume hook; currently performs no work."""
    pass
async def fetch_sse(url, headers, data, detail):
    """POST ``data`` to ``url`` and yield the JSON payloads of the SSE reply.

    The response body is read line by line. ``data:`` lines are parsed as
    JSON and yielded, except for the terminating ``[DONE]`` marker.

    When ``detail`` is truthy the stream is expected to label payloads with
    ``event:`` lines:

    * ``answer`` payloads are yielded,
    * ``flowNodeStatus`` payloads rename the current Chainlit step to the
      payload's ``name`` and are not yielded,
    * payloads of any other event type are ignored.

    Args:
        url: Endpoint to post to.
        headers: HTTP headers for the request.
        data: Request body, already serialised.
        detail: Whether the stream uses ``event:`` labels (see above).

    Yields:
        The decoded JSON object of each relevant ``data:`` line.

    Raises:
        aiohttp.ClientResponseError: The server answered with an error
            status; without this check such a reply would silently yield
            nothing.
        json.JSONDecodeError: A ``data:`` line did not contain valid JSON.
    """
    # Initialised up front so a ``data:`` line arriving before any
    # ``event:`` line is ignored instead of raising UnboundLocalError.
    event_type = None
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=data) as response:
            response.raise_for_status()
            async for raw_line in response.content:
                line = raw_line.decode('utf-8').rstrip('\n\r')

                if detail and line.startswith('event:'):
                    # Remember the event type for the data line that follows.
                    event_type = line.split(':', 1)[1].strip()
                    continue
                if not line.startswith('data:'):
                    # Blank separators and unrelated lines.
                    continue

                payload = line.split(':', 1)[1].strip()

                if detail:
                    if event_type == 'flowNodeStatus':
                        status = json.loads(payload)
                        current_step = cl.context.current_step
                        # There may be no active step to rename.
                        if current_step is not None and 'name' in status:
                            current_step.name = status['name']
                        continue
                    if event_type != 'answer':
                        continue

                if payload != '[DONE]':
                    yield json.loads(payload)