Spaces:
Runtime error
Runtime error
Create fastgpt_data.py
Browse files- fastgpt_data.py +143 -0
fastgpt_data.py
ADDED
@@ -0,0 +1,143 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import json
|
2 |
+
import os
|
3 |
+
import uuid
|
4 |
+
from typing import Optional, List, Dict
|
5 |
+
import requests
|
6 |
+
from chainlit import PersistedUser
|
7 |
+
from chainlit.data import BaseDataLayer
|
8 |
+
from chainlit.types import PageInfo, ThreadFilter, ThreadDict, Pagination, PaginatedResponse
|
9 |
+
from literalai.helper import utc_now
|
10 |
+
|
11 |
+
fastgpt_base_url = os.getenv("FASTGPT_BASE_URL")
|
12 |
+
share_id = os.getenv("FASTGPT_SHARE_ID")
|
13 |
+
now = utc_now()
|
14 |
+
user_cur_threads = []
|
15 |
+
thread_user_dict = {}
|
16 |
+
|
17 |
+
def change_type(user_type: str):
|
18 |
+
if user_type == 'AI':
|
19 |
+
return 'assistant_message'
|
20 |
+
if user_type == 'Human':
|
21 |
+
return 'user_message'
|
22 |
+
|
23 |
+
def get_app_info():
|
24 |
+
with requests.get(f"{fastgpt_base_url}/api/core/chat/outLink/init?chatId=&shareId={share_id}&outLinkUid=123456") as resp:
|
25 |
+
app = {}
|
26 |
+
if resp.status_code == 200:
|
27 |
+
res = json.loads(resp.content)
|
28 |
+
app = res.get('data').get('app')
|
29 |
+
appId = res.get('data').get('appId')
|
30 |
+
app['id'] = appId
|
31 |
+
return app
|
32 |
+
|
33 |
+
app_info = get_app_info()
|
34 |
+
app_id = app_info.get('id')
|
35 |
+
app_name = app_info.get('name')
|
36 |
+
welcome_text = app_info.get('chatConfig').get('welcomeText')
|
37 |
+
|
38 |
+
def getHistories(user_id):
|
39 |
+
histories = []
|
40 |
+
if user_id:
|
41 |
+
with requests.post(f"{fastgpt_base_url}/api/core/chat/getHistories", data={"shareId": share_id, "outLinkUid": user_id}) as resp:
|
42 |
+
if resp.status_code == 200:
|
43 |
+
res = json.loads(resp.content)
|
44 |
+
data = res["data"]
|
45 |
+
print(data)
|
46 |
+
histories = [{"id": item["chatId"], "name": item["title"], "createdAt": item["updateTime"], "userId": user_id, "userIdentifier": user_id} for item in data]
|
47 |
+
if user_cur_threads:
|
48 |
+
thread = next((t for t in user_cur_threads if t["userId"] == user_id), None)
|
49 |
+
if thread: # 确保 thread 不为 None
|
50 |
+
thread_id = thread.get("id")
|
51 |
+
if histories:
|
52 |
+
# 检查 thread 的 ID 是否已存在于 threads 中
|
53 |
+
if not any(t.get("id") == thread_id for t in histories):
|
54 |
+
histories.insert(0, thread)
|
55 |
+
else:
|
56 |
+
# 如果 threads 是空列表,则直接插入 thread
|
57 |
+
histories.insert(0, thread)
|
58 |
+
for item in histories:
|
59 |
+
thread_user_dict[item.get('id')] = item.get('userId')
|
60 |
+
return histories
|
61 |
+
|
62 |
+
class FastgptDataLayer(BaseDataLayer):
|
63 |
+
async def get_user(self, identifier: str):
|
64 |
+
print('get_user', identifier)
|
65 |
+
return PersistedUser(id=identifier, createdAt=now, identifier=identifier)
|
66 |
+
|
67 |
+
async def update_thread(self, thread_id: str, name: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict] = None, tags: Optional[List[str]] = None,):
|
68 |
+
print('---------update_thread----------', thread_id)
|
69 |
+
thread = next((t for t in user_cur_threads if t["userId"] == user_id), None)
|
70 |
+
if thread:
|
71 |
+
if thread_id:
|
72 |
+
thread["id"] = thread_id
|
73 |
+
if name:
|
74 |
+
thread["name"] = name
|
75 |
+
if user_id:
|
76 |
+
thread["userId"] = user_id
|
77 |
+
thread["userIdentifier"] = user_id
|
78 |
+
if metadata:
|
79 |
+
thread["metadata"] = metadata
|
80 |
+
if tags:
|
81 |
+
thread["tags"] = tags
|
82 |
+
thread["createdAt"] = utc_now()
|
83 |
+
else:
|
84 |
+
print('---------update_thread----------thread_id ', thread_id, name)
|
85 |
+
user_cur_threads.append({"id": thread_id, "name": name, "metadata": metadata, "tags": tags, "createdAt": utc_now(), "userId": user_id, "userIdentifier": user_id,})
|
86 |
+
|
87 |
+
async def get_thread_author(self, thread_id: str):
|
88 |
+
print('get_thread_author')
|
89 |
+
return thread_user_dict.get(thread_id, None)
|
90 |
+
|
91 |
+
async def list_threads(self, pagination: Pagination, filters: ThreadFilter) -> PaginatedResponse[ThreadDict]:
|
92 |
+
threads = []
|
93 |
+
if filters:
|
94 |
+
threads = getHistories(filters.userId)
|
95 |
+
search = filters.search if filters.search else ""
|
96 |
+
filtered_threads = [thread for thread in threads if search in thread.get('name', '')]
|
97 |
+
start = 0
|
98 |
+
if pagination.cursor:
|
99 |
+
for i, thread in enumerate(filtered_threads):
|
100 |
+
if thread["id"] == pagination.cursor: # Find the start index using pagination.cursor
|
101 |
+
start = i + 1
|
102 |
+
break
|
103 |
+
end = start + pagination.first
|
104 |
+
paginated_threads = filtered_threads[start:end] or []
|
105 |
+
has_next_page = len(paginated_threads) > end
|
106 |
+
start_cursor = paginated_threads[0]["id"] if paginated_threads else None
|
107 |
+
end_cursor = paginated_threads[-1]["id"] if paginated_threads else None
|
108 |
+
return PaginatedResponse(pageInfo=PageInfo(hasNextPage=has_next_page, startCursor=start_cursor, endCursor=end_cursor,), data=paginated_threads,)
|
109 |
+
|
110 |
+
async def get_thread(self, thread_id: str):
|
111 |
+
print('get_thread', thread_id)
|
112 |
+
user_id = thread_user_dict.get(thread_id, None)
|
113 |
+
thread = None
|
114 |
+
if user_id:
|
115 |
+
params = {'chatId': thread_id, 'shareId': share_id, 'outLinkUid': user_id,}
|
116 |
+
with requests.get(f"{fastgpt_base_url}/api/core/chat/outLink/init", params=params,) as resp:
|
117 |
+
if resp.status_code == 200:
|
118 |
+
res = json.loads(resp.content)
|
119 |
+
data = res["data"]
|
120 |
+
if data:
|
121 |
+
history = data['history']
|
122 |
+
files = []
|
123 |
+
texts = []
|
124 |
+
for item in history:
|
125 |
+
for entry in item['value']:
|
126 |
+
if entry.get('type') == 'text':
|
127 |
+
text = {"id": item["_id"], "threadId": thread_id, "name": item["obj"], "type": change_type(item["obj"]), "input": None, "createdAt": utc_now(), "output": entry.get('text').get('content'),}
|
128 |
+
texts.append(text)
|
129 |
+
if entry.get('type') == 'file':
|
130 |
+
file = {"id": str(uuid.UUID), "threadId": thread_id, "forId": item["_id"], "name": entry.get('file').get('name'), "type": entry.get('file').get('type'), "url": entry.get('file').get('url'), "display": "inline", "size": "medium"}
|
131 |
+
files.append(file)
|
132 |
+
thread = {"id": thread_id, "name": data.get("title", ''), "createdAt": utc_now(), "userId": "admin", "userIdentifier": "admin", "metadata": {"appId": data["appId"]}, "steps": texts, "elements": files,}
|
133 |
+
return thread
|
134 |
+
|
135 |
+
async def delete_thread(self, thread_id: str):
|
136 |
+
print('delete_thread')
|
137 |
+
thread = next((t for t in user_cur_threads if t["id"] == thread_id), None)
|
138 |
+
user_id = thread_user_dict.get(thread_id, None)
|
139 |
+
if thread:
|
140 |
+
user_cur_threads.remove(thread)
|
141 |
+
if user_id:
|
142 |
+
params = {'appId': app_id, 'chatId': thread_id, 'shareId': share_id, 'outLinkUid': user_id,}
|
143 |
+
requests.get(f"{fastgpt_base_url}/api/core/chat/delHistory", params=params)
|