"""Chainlit app that forwards user messages to a FastGPT chat-completions
endpoint and streams the answer back to the UI."""
import hashlib
import json
import os
from typing import Optional, Dict

import aiohttp
import chainlit as cl
import chainlit.data as cl_data
import requests

from fastgpt_data import FastgptDataLayer, now, share_id, app_name, welcome_text

# Environment values that switch a flag off; anything else counts as "on".
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean.

    ``os.getenv`` returns a string, so a value such as ``"false"`` would be
    truthy if used directly. Unset variables yield *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_STRINGS


fastgpt_base_url = os.getenv("FASTGPT_BASE_URL")
fastgpt_api_key = os.getenv("FASTGPT_API_KEY")
fastgpt_api_detail = _env_flag("FASTGPT_API_DETAIL")

cl_data._data_layer = FastgptDataLayer()
cl.config.ui.name = app_name


def download_logo():
    """Download ``{fastgpt_base_url}/icon/logo.svg`` to ``./public/favicon.svg``.

    Creates the target directory if needed.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    local_filename = "./public/favicon.svg"
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)
    # Stream the body so it is written chunk by chunk; the timeout keeps
    # start-up from hanging forever on an unresponsive server.
    with requests.get(
        f"{fastgpt_base_url}/icon/logo.svg", stream=True, timeout=30
    ) as r:
        r.raise_for_status()  # Only touch the local file after a successful response.
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)


download_logo()


@cl.on_chat_start
async def chat_start():
    """Send the configured welcome text, if any, when a chat starts."""
    if welcome_text:
        await cl.Message(content=welcome_text).send()


@cl.on_message
async def handle_message(message: cl.Message):
    """Forward the user's message to FastGPT and stream the answer back.

    Args:
        message: the incoming Chainlit message; its content, id and
            thread_id are sent to the completions endpoint.
    """
    msg = cl.Message(content="")
    url = f"{fastgpt_base_url}/api/v1/chat/completions"
    print('message.thread_id', message.thread_id)
    headers = {
        "Authorization": f"Bearer {fastgpt_api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "messages": [{"role": "user", "content": message.content}],
        "variables": {"cTime": now},
        # NOTE(review): the original read `message.uiltin.id`, which is not an
        # attribute of cl.Message; `message.id` is assumed to be the intent.
        "responseChatItemId": message.id,
        "shareId": share_id,
        "chatId": message.thread_id,
        "appType": "advanced",
        "outLinkUid": cl.context.session.user.identifier,
        "detail": fastgpt_api_detail,
        "stream": True
    }

    async for event in fetch_sse(url, headers=headers, data=json.dumps(payload),
                                 detail=fastgpt_api_detail):
        # Tolerate chunks with no choices, an empty delta, or a delta that
        # carries no content instead of raising mid-stream.
        choices = event.get('choices') or []
        delta = choices[0].get('delta') if choices else None
        token = delta.get('content') if delta else None
        if token:
            await msg.stream_token(token)
    await msg.send()


@cl.header_auth_callback
def header_auth_callback(headers: Dict) -> Optional[cl.User]:
    """Build a visitor user whose identifier is the MD5 of the User-Agent.

    Args:
        headers: request headers; only ``user-agent`` is read.

    Returns:
        A ``cl.User`` with display name "visitor". A missing User-Agent is
        hashed as the empty string.
    """
    print(headers)
    user_agent = headers.get('user-agent') or ''
    # MD5 is used only to derive a stable identifier, not for security.
    md5_hex_digest = hashlib.md5(user_agent.encode('utf-8')).hexdigest()
    print("MD5加密后的结果:", md5_hex_digest)
    return cl.User(identifier=md5_hex_digest, display_name="visitor")


@cl.on_chat_resume
async def on_chat_resume():
    """Registered as the chat-resume hook; intentionally does nothing."""
    pass


async def fetch_sse(url, headers, data, detail):
    """POST *data* to *url* and yield the decoded JSON of each answer chunk.

    Args:
        url: endpoint to POST to.
        headers: HTTP headers for the request.
        data: request body (already serialized).
        detail: when truthy, lines are interpreted as ``event:``/``data:``
            pairs: ``flowNodeStatus`` data renames the current Chainlit step
            and only ``answer`` data is yielded. When falsy, every ``data:``
            line is yielded.

    Yields:
        The parsed JSON object of each ``data:`` payload other than
        ``[DONE]``.
    """
    # Initialised so a `data:` line that arrives before any `event:` line
    # is ignored in detail mode instead of raising UnboundLocalError.
    event_type = None
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=data) as response:
            async for raw_line in response.content:
                line = raw_line.decode('utf-8').rstrip('\n\r')

                if detail and line.startswith('event:'):
                    # Remember the event type for the data line that follows.
                    event_type = line.split(':', 1)[1].strip()
                    continue
                if not line.startswith('data:'):
                    # Blank separators and any other line kinds are skipped.
                    continue

                body = line.split(':', 1)[1].strip()
                if detail and event_type == 'flowNodeStatus':
                    node_status = json.loads(body)
                    current_step = cl.context.current_step
                    # NOTE(review): assumes current_step may be None when no
                    # step is active - confirm for the Chainlit version in use.
                    if current_step is not None:
                        current_step.name = node_status['name']
                elif not detail or event_type == 'answer':
                    if body != '[DONE]':
                        yield json.loads(body)