# planRun / degpt.py
# author: sanbo
# last update: 2025-01-11 21:22:14 (commit 3a64abf)
# (repository listing metadata removed: raw / history blame / 21.7 kB)
"""
update time: 2025.01.09
verson: 0.1.125
"""
import json
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import aiohttp
import requests
debug = False
# 全局变量
last_request_time = 0 # 上次请求的时间戳
cache_duration = 14400 # 缓存有效期,单位:秒 (4小时)
'''用于存储缓存的模型数据'''
cached_models = {
"object": "list",
"data": [],
"version": "0.1.125",
"provider": "DeGPT",
"name": "DeGPT",
"default_locale": "en-US",
"status": True,
"time": 0
}
'''基础请求地址'''
base_addrs = [
# "America"
"https://usa-chat.degpt.ai/api",
# "Singapore"
"https://singapore-chat.degpt.ai/api",
# "Korea"
"https://korea-chat.degpt.ai/api"
]
'''基础域名'''
base_url = 'https://singapore-chat.degpt.ai/api'
'''基础模型'''
base_model = "Pixtral-124B"
# 全局变量:存储所有模型的统计信息
# 格式:{model_name: {"calls": 调用次数, "fails": 失败次数, "last_fail": 最后失败时间}}
MODEL_STATS: Dict[str, Dict] = {}
def record_call(model_name: str, success: bool = True) -> None:
"""
记录模型调用情况
Args:
model_name: 模型名称
success: 调用是否成功
"""
global MODEL_STATS
if model_name not in MODEL_STATS:
MODEL_STATS[model_name] = {"calls": 0, "fails": 0, "last_fail": None}
stats = MODEL_STATS[model_name]
stats["calls"] += 1
if not success:
stats["fails"] += 1
stats["last_fail"] = datetime.now()
def get_auto_model(cooldown_seconds: int = 300) -> str:
"""异步获取最优模型"""
try:
if not MODEL_STATS:
get_models()
best_model = None
best_rate = -1.0
now = datetime.now()
for name, stats in MODEL_STATS.items():
if stats.get("last_fail") and (now - stats["last_fail"]) < timedelta(seconds=cooldown_seconds):
continue
total_calls = stats["calls"]
if total_calls > 0:
success_rate = (total_calls - stats["fails"]) / total_calls
if success_rate > best_rate:
best_rate = success_rate
best_model = name
default_model = best_model or base_model
if debug:
print(f"选择模型: {default_model}")
return default_model
except Exception as e:
if debug:
print(f"模型选择错误: {e}")
return base_model
def reload_check():
"""检查并更新系统状态
1. 如果模型数据为空,更新模型数据
2. 测试当前base_url是否可用,不可用则切换
"""
global base_url, cached_models
try:
# 检查模型数据
if not cached_models["data"]:
if debug:
print("模型数据为空,开始更新...")
get_models()
# 测试用例 - 平衡效率和功能验证
test_payload = {
"model": base_model,
"messages": [{
"role": "user",
"content": [{"type": "text", "text": "test"}]
}],
"temperature": 0.7,
"max_tokens": 50,
"top_p": 1.0,
"frequency_penalty": 0.0,
"project": "DecentralGPT",
"stream": True
}
headers = {
'Accept': '*/*',
'Content-Type': 'application/json'
}
with aiohttp.ClientSession() as session:
# 测试当前URL
try:
with session.post(
f"{base_url}/v0/chat/completion/proxy",
headers=headers,
json=test_payload,
timeout=5 # 较短的超时时间提高效率
) as response:
if response.status == 200:
# 验证响应格式
if response.read():
if debug:
print(f"当前URL可用: {base_url}")
return
except Exception as e:
if debug:
print(f"当前URL不可用: {e}")
# 测试其他URL
for url in base_addrs:
if url == base_url:
continue
try:
with session.post(
f"{url}/v0/chat/completion/proxy",
headers=headers,
json=test_payload,
timeout=5
) as response:
if response.status == 200 and response.read():
base_url = url
if debug:
print(f"切换到新URL: {base_url}")
return
except Exception as e:
if debug:
print(f"URL {url} 测试失败: {e}")
continue
if debug:
print("所有URL不可用,保持当前URL")
except Exception as e:
if debug:
print(f"系统检查失败: {e}")
def _fetch_and_update_models():
"""Thread-safe model fetching and cache updating"""
global cached_models
try:
get_from_js()
except Exception as e:
print(f"{e}")
try:
get_alive_models()
except Exception as e:
print(f"{e}")
def get_models():
"""model data retrieval with thread safety"""
global cached_models, last_request_time
current_time = time.time()
if (current_time - last_request_time) > cache_duration:
try:
# Update timestamp before awaiting to prevent concurrent updates
last_request_time = current_time
_fetch_and_update_models()
except Exception as e:
print(f"{e}")
return json.dumps(cached_models)
def get_alive_models():
"""
获取活的模型版本,并更新全局缓存
"""
global cached_models, last_request_time
# 发送 GET 请求
url = 'https://www.degpt.ai/api/config'
headers = {'Content-Type': 'application/json'}
response = requests.get(url, headers=headers)
# 检查响应是否成功
if response.status_code == 200:
try:
data = response.json() # 解析响应 JSON 数据
default_models = data.get("default_models", "").split(",") # 获取默认模型并分割成列表
# 获取当前时间戳(以秒为单位)
timestamp_in_seconds = time.time()
# 转换为毫秒(乘以 1000)
timestamp_in_milliseconds = int(timestamp_in_seconds * 1000)
## config
cached_models['version']=data['version']
cached_models['provider']=data['provider']
cached_models['name']=data['provider']
cached_models['time']=timestamp_in_milliseconds
if default_models:
# print("\n提取的模型列表:")
existing_ids = {m.get('id') for m in cached_models["data"]}
for model_id in default_models:
record_call(model_id)
if model_id and model_id not in existing_ids:
model_data = {
"id": model_id,
"object": "model",
"model": model_id,
"created": timestamp_in_milliseconds,
"owned_by": model_id.split("-")[0] if "-" in model_id else "unknown",
"name": model_id,
"description": '',
"support": '',
"tip": ''
}
cached_models["data"].append(model_data)
# 更新全局缓存
last_request_time = timestamp_in_seconds # 更新缓存时间戳
# print("获取新的模型数据:", models)
except json.JSONDecodeError as e:
print("JSON 解码错误:", e)
else:
print(f"请求失败,状态码: {response.status_code}")
def parse_models_from_js(js_content: str) -> List[Dict]:
"""解析JS内容中的模型信息"""
try:
pattern = r'models\s*:\s*\[([^\]]+)\]'
match = re.search(pattern, js_content)
if match:
models_data = match.group(1)
models_data = re.sub(r'(\w+):', r'"\1":', models_data)
models_data = models_data.replace("'", '"')
models_data = f"[{models_data}]"
try:
models = json.loads(models_data)
return models
except json.JSONDecodeError as e:
return []
return []
except Exception as e:
return []
# def get_model_names_from_js(url="https://www.degpt.ai/", timeout: int = 60):
# global cached_models
# try:
# with async_playwright() as p:
# browser = p.chromium.launch(
# headless=True,
# args=['--no-sandbox']
# )
# context = browser.new_context(
# viewport={'width': 1920, 'height': 1080},
# user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/119.0.0.0 Safari/537.36'
# )
# page = context.new_page()
#
# def handle_response(response):
# try:
# if response.request.resource_type == "script":
# content_type = response.headers.get('content-type', '').lower()
# if 'javascript' in content_type:
# js_content = response.text()
# if 'models' in js_content:
# # print(f"找到包含模型信息的JS文件: {response.url}")
# models = parse_models_from_js(js_content)
# if models:
# # print("\n提取的模型列表:")
# existing_ids = {m.get('id') for m in cached_models["data"]}
# for model in models:
# model_id = model.get('model', '').strip()
# # print(f"- 名称: {model.get('name', '')}")
# # print(f" 模型: {model.get('model', '')}")
# # print(f" 描述: {model.get('desc', '')}")
# record_call(model_id)
# if model_id and model_id not in existing_ids:
# model_data = {
# "id": model_id,
# "object": "model",
# "model": model_id,
# "created": int(time.time()),
# "owned_by": model_id.split("-")[0] if "-" in model_id else "unknown",
# "name": model.get('name', ''),
# "description": model.get('desc', ''),
# "support": model.get('support', 'text'),
# "tip": model.get('tip', '')
# }
# cached_models["data"].append(model_data)
# # print(f"添加新模型: {model_id}")
# except Exception as e:
# print(f"处理响应时发生错误: {str(e)}")
# logging.error(f"Response处理异常: {e}", exc_info=True)
#
# page.on("response", handle_response)
#
# try:
# page.goto(url, timeout=timeout * 1000, wait_until='networkidle')
# page.wait_for_timeout(5000)
# except Exception as e:
# print(f"页面加载错误: {str(e)}")
# logging.error(f"页面加载异常: {e}", exc_info=True)
# finally:
# browser.close()
# except Exception as e:
# print(f"提取过程发生错误: {str(e)}")
# get_from_js()
# raise
# def parse_models_and_urls_from_js(js_content: str) -> Dict:
# """从JS内容中解析模型和URL信息"""
# result = {"models": [], "urls": []}
#
# try:
# # 提取模型信息
# model_pattern = r'models\s*:\s*\[([^\]]+)\]'
# model_match = re.search(model_pattern, js_content)
#
# if model_match:
# models_data = model_match.group(1)
# models_data = re.sub(r'(\w+):', r'"\1":', models_data)
# models_data = models_data.replace("'", '"')
# models_data = f"[{models_data}]"
#
# try:
# models = json.loads(models_data)
# result["models"] = models
# except json.JSONDecodeError as e:
# print(f"Error decoding models JSON: {e}")
#
# # 提取URL信息
# url_pattern = r'\{name\s*:\s*"([^"]+)"\s*,\s*url\s*:\s*"([^"]+)"'
# url_matches = re.findall(url_pattern, js_content)
#
# if url_matches:
# urls = [{"name": name, "url": url} for name, url in url_matches]
# result["urls"] = urls
#
# return result
#
# except Exception as e:
# print(f"Error parsing JS content: {e}")
# return result
def get_from_js():
global cached_models
# 获取 JavaScript 文件内容
# url = "https://www.degpt.ai/_app/immutable/chunks/index.83d92b06.js"
# url = "https://www.degpt.ai/_app/immutable/chunks/index.4aecf75a.js"
url = "https://www.degpt.ai/_app/immutable/chunks/index.e0d19999.js"
response = requests.get(url)
if response.status_code == 200:
js_content = response.text
models = parse_models_from_js(js_content)
# xx = parse_models_and_urls_from_js(js_content)
if models:
if debug:
print("get_from_js提取的模型列表:")
existing_ids = {m.get('id') for m in cached_models["data"]}
for model in models:
model_id = model.get('model', '').strip()
if debug:
print(f"get_from_js 名称: {model.get('name', '')}")
print(f"get_from_js 模型: {model.get('model', '')}")
print(f"get_from_js 描述: {model.get('desc', '')}")
record_call(model_id)
if model_id and model_id not in existing_ids:
model_data = {
"id": model_id,
"object": "model",
"model": model_id,
"created": int(time.time()),
"owned_by": model_id.split("-")[0] if "-" in model_id else "unknown",
"name": model.get('name', ''),
"description": model.get('desc', ''),
"support": model.get('support', 'text'),
"tip": model.get('tip', '')
}
cached_models["data"].append(model_data)
if debug:
print(f"get_from_js添加新模型: {model_id}")
def is_model_available(model_id: str, cooldown_seconds: int = 300) -> bool:
"""
判断模型是否在模型列表中且非最近失败的模型
Args:
model_id: 模型ID,需要检查的模型标识符
cooldown_seconds: 失败冷却时间(秒),默认300秒
Returns:
bool: 如果模型可用返回True,否则返回False
Note:
- 当MODEL_STATS为空时会自动调用get_models()更新数据
- 检查模型是否在冷却期内,如果在冷却期则返回False
"""
global MODEL_STATS
# 如果MODEL_STATS为空,加载模型数据
if not MODEL_STATS:
get_models()
# 检查模型是否在统计信息中
if model_id not in MODEL_STATS:
return False
# 检查是否在冷却期内
stats = MODEL_STATS[model_id]
if stats["last_fail"]:
time_since_failure = datetime.now() - stats["last_fail"]
if time_since_failure < timedelta(seconds=cooldown_seconds):
return False
return True
def get_model_by_autoupdate(model_id: Optional[str] = None, cooldown_seconds: int = 300) -> Optional[str]:
"""
检查提供的model_id是否可用,如果不可用则返回成功率最高的模型
Args:
model_id: 指定的模型ID,可选参数
cooldown_seconds: 失败冷却时间(秒),默认300秒
Returns:
str | None: 返回可用的模型ID,如果没有可用模型则返回None
Note:
- 当MODEL_STATS为空时会自动调用get_models()更新数据
- 如果指定的model_id可用,则直接返回
- 如果指定的model_id不可用,则返回成功率最高的模型
"""
global MODEL_STATS
# 如果MODEL_STATS为空,加载模型数据
if not MODEL_STATS:
get_models()
# 如果提供了model_id且可用,直接返回
if model_id and is_model_available(model_id, cooldown_seconds):
return model_id
# 否则返回成功率最高的可用模型
return get_auto_model(cooldown_seconds=cooldown_seconds)
def is_chatgpt_format(data):
"""Check if the data is in the expected ChatGPT format"""
try:
# If the data is a string, try to parse it as JSON
if isinstance(data, str):
try:
data = json.loads(data)
except json.JSONDecodeError:
return False # If the string can't be parsed, it's not in the expected format
# Now check if data is a dictionary and contains the necessary structure
if isinstance(data, dict):
# Ensure 'choices' is a list and the first item has a 'message' field
if "choices" in data and isinstance(data["choices"], list) and len(data["choices"]) > 0:
if "message" in data["choices"][0]:
return True
except Exception as e:
print(f"Error checking ChatGPT format: {e}")
return False
def chat_completion_message(
user_prompt,
user_id: str = None,
session_id: str = None,
system_prompt="You are a helpful assistant.",
model=base_model,
project="DecentralGPT", stream=False,
temperature=0.3, max_tokens=1024, top_p=0.5,
frequency_penalty=0, presence_penalty=0):
"""未来会增加回话隔离: 单人对话,单次会话"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
return chat_completion_messages(messages, user_id, session_id, model, project, stream, temperature, max_tokens,
top_p, frequency_penalty,
presence_penalty)
def chat_completion_messages(
messages,
model=base_model,
user_id: str = None,
session_id: str = None,
project="DecentralGPT", stream=False, temperature=0.3, max_tokens=1024, top_p=0.5,
frequency_penalty=0, presence_penalty=0):
# 确保model有效
if not model or model == "auto":
model = get_auto_model()
else:
model = get_model_by_autoupdate(model)
if debug:
print(f"校准后的model: {model}")
headers = {
'sec-ch-ua-platform': '"macOS"',
'Referer': 'https://www.degpt.ai/',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'sec-ch-ua': 'Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'DNT': '1',
'Content-Type': 'application/json',
'sec-ch-ua-mobile': '?0'
}
payload = {
# make sure ok
"model": model,
"messages": messages,
"project": project,
"stream": stream,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"frequency_penalty": frequency_penalty,
"presence_penalty": presence_penalty
}
# print(json.dumps(headers, indent=4))
# print(json.dumps(payload, indent=4))
return chat_completion(headers, payload)
def chat_completion(headers, payload):
"""处理用户请求并保留上下文"""
try:
url = f'{base_url}/v0/chat/completion/proxy'
response = requests.post(url, headers=headers, json=payload)
response.encoding = 'utf-8'
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return "请求失败,请检查网络或参数配置。"
except (KeyError, IndexError) as e:
print(f"解析响应时出错: {e}")
return "解析响应内容失败。"
return {}
# if __name__ == '__main__':
# print("get_models: ",get_models())
# print("cached_models:",cached_models)
# print("base_url: ",base_url)
# print("MODEL_STATS:",MODEL_STATS)