""" |
update time: 2025.01.09 |
version: 0.1.125 |
""" |
import json |
import re |
import time |
from datetime import datetime, timedelta |
from typing import List, Dict, Optional |
import aiohttp |
import requests |
# When True, the helpers in this module print diagnostic messages.
debug = False

# time.time() value of the most recent model-list refresh (0 = never).
last_request_time = 0

# Minimum number of seconds between two model-list refreshes (4 hours).
cache_duration = 14400

# Cached model listing; get_models() returns it JSON-encoded.
cached_models = {
    "object": "list",
    "data": [],
    "version": "0.1.125",
    "provider": "DeGPT",
    "name": "DeGPT",
    "default_locale": "en-US",
    "status": True,
    "time": 0,
}

# Candidate API roots that reload_check() can fail over between.
base_addrs = [
    "https://usa-chat.degpt.ai/api",
    "https://singapore-chat.degpt.ai/api",
    "https://korea-chat.degpt.ai/api",
]

# API root currently used for chat requests.
base_url = 'https://singapore-chat.degpt.ai/api'

# Model used when no better candidate can be selected.
base_model = "Pixtral-124B"

# Per-model call statistics, keyed by model name. Each value is a dict with
# "calls", "fails" and "last_fail" entries maintained by record_call().
MODEL_STATS: Dict[str, Dict] = {}
def record_call(model_name: str, success: bool = True) -> None:
    """Update the call statistics kept for one model in ``MODEL_STATS``.

    A statistics entry is created on first use. Every call increments the
    call counter; a failed call additionally increments the failure counter
    and stores the current local time as the moment of the last failure.

    Args:
        model_name: Name of the model the call was made against.
        success: Whether the call succeeded.
    """
    entry = MODEL_STATS.setdefault(
        model_name, {"calls": 0, "fails": 0, "last_fail": None}
    )
    entry["calls"] += 1
    if success:
        return
    entry["fails"] += 1
    entry["last_fail"] = datetime.now()
def get_auto_model(cooldown_seconds: int = 300) -> str:
    """Pick the model with the best recorded success rate.

    Models whose last failure is more recent than ``cooldown_seconds`` are
    skipped, as are models without any recorded call. On a tie the model
    seen first in ``MODEL_STATS`` wins. If nothing qualifies, or anything
    goes wrong while choosing, ``base_model`` is returned.

    Args:
        cooldown_seconds: How long a model is ignored after a failure.

    Returns:
        The name of the selected model.
    """
    try:
        # Populate the statistics on first use.
        if not MODEL_STATS:
            get_models()

        checked_at = datetime.now()
        winner, winner_rate = None, -1.0
        for candidate, record in MODEL_STATS.items():
            failed_at = record.get("last_fail")
            if failed_at and (checked_at - failed_at) < timedelta(seconds=cooldown_seconds):
                continue  # still cooling down after a failure
            calls = record["calls"]
            if not calls > 0:
                continue
            rate = (calls - record["fails"]) / calls
            if rate > winner_rate:
                winner, winner_rate = candidate, rate

        default_model = winner or base_model
        if debug:
            print(f"选择模型: {default_model}")
        return default_model
    except Exception as e:
        if debug:
            print(f"模型选择错误: {e}")
        return base_model
def _endpoint_responds(api_root, headers, payload, timeout=5):
    """Return True if a test completion against ``api_root`` gets HTTP 200 and a non-empty body.

    Network errors are not handled here; ``requests`` exceptions propagate
    to the caller.
    """
    with requests.post(
        f"{api_root}/v0/chat/completion/proxy",
        headers=headers,
        json=payload,
        timeout=timeout,
        stream=True,
    ) as response:
        if response.status_code != 200:
            return False
        # The payload asks for a streamed reply, so reading up to the first
        # non-empty chunk is enough to know the endpoint is producing output.
        return any(response.iter_content(chunk_size=1024))


def reload_check():
    """Check module state and fail over to another API root if needed.

    1. If the cached model list is empty, refresh it via ``get_models()``.
    2. Probe the current ``base_url`` with a small test completion. If it
       does not answer, probe the other entries of ``base_addrs`` in order
       and switch ``base_url`` to the first one that answers.

    The probes use the synchronous ``requests`` client. The previous
    implementation drove ``aiohttp`` with plain ``with`` statements, which
    raises immediately, so no URL was ever actually tested.

    The function is best-effort: failures are only reported when ``debug``
    is set, and ``base_url`` is left untouched when no endpoint answers.
    """
    global base_url
    try:
        if not cached_models["data"]:
            if debug:
                print("模型数据为空,开始更新...")
            get_models()

        test_payload = {
            "model": base_model,
            "messages": [{
                "role": "user",
                "content": [{"type": "text", "text": "test"}]
            }],
            "temperature": 0.7,
            "max_tokens": 50,
            "top_p": 1.0,
            "frequency_penalty": 0.0,
            "project": "DecentralGPT",
            "stream": True
        }
        headers = {
            'Accept': '*/*',
            'Content-Type': 'application/json'
        }

        # First make sure the endpoint we are already using still works.
        try:
            if _endpoint_responds(base_url, headers, test_payload):
                if debug:
                    print(f"当前URL可用: {base_url}")
                return
        except requests.exceptions.RequestException as e:
            if debug:
                print(f"当前URL不可用: {e}")

        # Otherwise try the remaining candidates in their configured order.
        for url in base_addrs:
            if url == base_url:
                continue
            try:
                if _endpoint_responds(url, headers, test_payload):
                    base_url = url
                    if debug:
                        print(f"切换到新URL: {base_url}")
                    return
            except requests.exceptions.RequestException as e:
                if debug:
                    print(f"URL {url} 测试失败: {e}")
                continue

        if debug:
            print("所有URL不可用,保持当前URL")
    except Exception as e:
        # Deliberately broad: a failed health check must never break callers.
        if debug:
            print(f"系统检查失败: {e}")
def _fetch_and_update_models():
    """Refresh the model cache from both upstream sources.

    Runs ``get_from_js()`` and then ``get_alive_models()``. An exception
    raised by either step is printed and does not stop the other step.
    """
    for refresh_step in (get_from_js, get_alive_models):
        try:
            refresh_step()
        except Exception as err:
            print(f"{err}")
def get_models():
    """Return the cached model listing as a JSON string, refreshing it when stale.

    The cache counts as stale once more than ``cache_duration`` seconds have
    passed since ``last_request_time``. The timestamp is updated before the
    refresh starts, so a failed refresh is not retried until the cache
    expires again.

    Returns:
        ``cached_models`` serialized with ``json.dumps``.
    """
    global last_request_time
    now = time.time()
    is_stale = (now - last_request_time) > cache_duration
    if is_stale:
        try:
            last_request_time = now
            _fetch_and_update_models()
        except Exception as err:
            print(f"{err}")
    return json.dumps(cached_models)
def get_alive_models():
    """Fetch the live model list from the DeGPT config endpoint and merge it into the cache.

    On a successful response this updates the ``version``, ``provider``,
    ``name`` and ``time`` fields of ``cached_models``, registers every
    listed model with ``record_call`` and appends models that are not
    cached yet. ``last_request_time`` is then set to the current time.

    Blank entries in the comma-separated ``default_models`` value are
    ignored and IDs are stripped of surrounding whitespace, so an empty or
    padded value can no longer register a bogus model name.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    global last_request_time
    url = 'https://www.degpt.ai/api/config'
    headers = {'Content-Type': 'application/json'}
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}")
        return

    try:
        data = response.json()
    except ValueError as e:
        # json.JSONDecodeError (and requests' own variant) derive from ValueError.
        print("JSON 解码错误:", e)
        return

    timestamp_in_seconds = time.time()
    timestamp_in_milliseconds = int(timestamp_in_seconds * 1000)

    # Keep the previous metadata when the server omits a field instead of
    # aborting the whole update.
    provider = data.get('provider', cached_models['provider'])
    cached_models['version'] = data.get('version', cached_models['version'])
    cached_models['provider'] = provider
    cached_models['name'] = provider
    cached_models['time'] = timestamp_in_milliseconds

    raw_ids = data.get("default_models") or ""
    model_ids = [part.strip() for part in raw_ids.split(",") if part.strip()]

    existing_ids = {m.get('id') for m in cached_models["data"]}
    for model_id in model_ids:
        record_call(model_id)
        if model_id in existing_ids:
            continue
        # Track the ID so a duplicate in the same response is not appended twice.
        existing_ids.add(model_id)
        cached_models["data"].append({
            "id": model_id,
            "object": "model",
            "model": model_id,
            # NOTE(review): milliseconds here, while get_from_js() stores
            # seconds in "created" - confirm which unit consumers expect.
            "created": timestamp_in_milliseconds,
            "owned_by": model_id.split("-")[0] if "-" in model_id else "unknown",
            "name": model_id,
            "description": '',
            "support": '',
            "tip": ''
        })

    last_request_time = timestamp_in_seconds
# Start of a ``models: [`` array literal.
_MODELS_ARRAY_START = re.compile(r'models\s*:\s*\[')

# In priority order: a double-quoted string, a single-quoted string, or a
# bare object key (identifier-like token directly followed by a colon).
# Matching strings first keeps their contents from being treated as keys.
_JS_LITERAL_TOKEN = re.compile(
    r'"(?:\\.|[^"\\])*"'
    r"|'(?:\\.|[^'\\])*'"
    r"|(?<![\w$])(?P<key>[\w$]+)(?=\s*:)",
    re.S,
)


def _js_array_body(js_content: str, open_idx: int) -> Optional[str]:
    """Return the text between the ``[`` at ``open_idx`` and its matching ``]``.

    Brackets inside string literals are ignored and nested arrays are
    balanced. Returns None when the array is never closed.
    """
    depth = 0
    quote = None
    i = open_idx
    end = len(js_content)
    while i < end:
        ch = js_content[i]
        if quote is not None:
            if ch == "\\":
                i += 2  # skip the escaped character
                continue
            if ch == quote:
                quote = None
        elif ch in "\"'":
            quote = ch
        elif ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
            if depth == 0:
                return js_content[open_idx + 1:i]
        i += 1
    return None


def _js_token_to_json(match) -> str:
    """Convert one ``_JS_LITERAL_TOKEN`` match into its JSON spelling."""
    key = match.group("key")
    if key is not None:
        return f'"{key}"'

    def fix_escape(esc):
        escaped_char = esc.group(1)
        if escaped_char is None:
            return '\\"'  # bare double quote inside a single-quoted string
        if escaped_char == "'":
            return "'"  # \' is not a valid JSON escape
        return esc.group(0)

    inner = match.group(0)[1:-1]
    return '"' + re.sub(r'\\([\s\S])|"', fix_escape, inner) + '"'


def parse_models_from_js(js_content: str) -> List[Dict]:
    """Extract the ``models: [...]`` array literal from JavaScript source.

    The first non-empty ``models`` array is located, its bare keys are
    quoted and its string literals are rewritten as JSON strings, and the
    result is parsed with ``json.loads``. String contents are left intact,
    so values containing colons, brackets, apostrophes or double quotes
    parse correctly, as do nested arrays.

    Args:
        js_content: JavaScript source text.

    Returns:
        The parsed array, or an empty list when the input is not a string,
        no ``models`` array is found, or the array is not JSON-compatible
        after conversion.
    """
    if not isinstance(js_content, str):
        return []

    for start in _MODELS_ARRAY_START.finditer(js_content):
        models_data = _js_array_body(js_content, start.end() - 1)
        if models_data:
            break  # skip empty (``models: []``) and unterminated arrays
    else:
        return []

    models_data = _JS_LITERAL_TOKEN.sub(_js_token_to_json, models_data)
    try:
        return json.loads(f"[{models_data}]")
    except (ValueError, RecursionError):
        return []
def get_from_js(url: str = "https://www.degpt.ai/_app/immutable/chunks/index.e0d19999.js"):
    """Download a DeGPT JavaScript chunk and merge the models it declares into the cache.

    Each usable entry is registered with ``record_call`` and, when not
    cached yet, appended to ``cached_models["data"]``. Entries that are not
    dicts, or whose ``model`` value is missing, empty or not a string, are
    skipped rather than being recorded under an empty name.

    Args:
        url: Location of the JavaScript chunk. The default file name looks
            like it embeds a build hash, so it presumably changes when the
            site is redeployed - pass the current URL in that case.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, timeout=10)
    if response.status_code != 200:
        if debug:
            print(f"get_from_js 请求失败,状态码: {response.status_code}")
        return

    models = parse_models_from_js(response.text)
    if not models:
        return
    if debug:
        print("get_from_js提取的模型列表:")

    existing_ids = {m.get('id') for m in cached_models["data"]}
    for model in models:
        if not isinstance(model, dict):
            continue
        raw_id = model.get('model', '')
        model_id = raw_id.strip() if isinstance(raw_id, str) else ''
        if debug:
            print(f"get_from_js 名称: {model.get('name', '')}")
            print(f"get_from_js 模型: {model.get('model', '')}")
            print(f"get_from_js 描述: {model.get('desc', '')}")
        if not model_id:
            continue
        record_call(model_id)
        if model_id in existing_ids:
            continue
        # Track the ID so a duplicate in the same chunk is not appended twice.
        existing_ids.add(model_id)
        cached_models["data"].append({
            "id": model_id,
            "object": "model",
            "model": model_id,
            "created": int(time.time()),
            "owned_by": model_id.split("-")[0] if "-" in model_id else "unknown",
            "name": model.get('name', ''),
            "description": model.get('desc', ''),
            "support": model.get('support', 'text'),
            "tip": model.get('tip', '')
        })
        if debug:
            print(f"get_from_js添加新模型: {model_id}")
def is_model_available(model_id: str, cooldown_seconds: int = 300) -> bool:
    """Tell whether a model is known and not cooling down after a failure.

    Args:
        model_id: Identifier of the model to check.
        cooldown_seconds: How long a model stays unavailable after its last
            recorded failure. Defaults to 300 seconds.

    Returns:
        False when the model has no entry in ``MODEL_STATS`` or its last
        failure is more recent than the cooldown; True otherwise.

    Note:
        ``get_models()`` is called first when ``MODEL_STATS`` is empty.
    """
    if not MODEL_STATS:
        get_models()

    try:
        record = MODEL_STATS[model_id]
    except KeyError:
        return False

    failed_at = record["last_fail"]
    if not failed_at:
        return True
    elapsed = datetime.now() - failed_at
    return not (elapsed < timedelta(seconds=cooldown_seconds))
def get_model_by_autoupdate(model_id: Optional[str] = None, cooldown_seconds: int = 300) -> Optional[str]:
    """Return ``model_id`` when it is usable, otherwise an automatically chosen model.

    Args:
        model_id: Preferred model identifier; may be omitted.
        cooldown_seconds: Failure cooldown forwarded to the availability
            check and to the automatic selection. Defaults to 300 seconds.

    Returns:
        ``model_id`` if ``is_model_available`` accepts it, else the result
        of ``get_auto_model``.

    Note:
        ``get_models()`` is called first when ``MODEL_STATS`` is empty.
    """
    if not MODEL_STATS:
        get_models()

    requested_is_usable = bool(model_id) and is_model_available(model_id, cooldown_seconds)
    if requested_is_usable:
        return model_id
    return get_auto_model(cooldown_seconds=cooldown_seconds)
def is_chatgpt_format(data):
    """Check whether ``data`` looks like a ChatGPT-style completion response.

    The expected shape is a dict with a non-empty ``choices`` list whose
    first element is a dict containing a ``message`` key. A string argument
    is parsed as JSON first.

    The first choice must be a dict: a string or list first element no
    longer passes merely because it contains the text ``"message"``.

    Args:
        data: A dict, or a JSON string encoding one.

    Returns:
        True if the shape matches, False otherwise (including invalid JSON
        and unsupported input types).
    """
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except (ValueError, RecursionError):
            return False

    if not isinstance(data, dict):
        return False

    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        return False

    first_choice = choices[0]
    return isinstance(first_choice, dict) and "message" in first_choice
def chat_completion_message(
        user_prompt,
        user_id: str = None,
        session_id: str = None,
        system_prompt="You are a helpful assistant.",
        model=base_model,
        project="DecentralGPT", stream=False,
        temperature=0.3, max_tokens=1024, top_p=0.5,
        frequency_penalty=0, presence_penalty=0):
    """Send a single user prompt, preceded by a system prompt, as a chat completion.

    Builds a two-message conversation and delegates to
    ``chat_completion_messages``. Per-user / per-session isolation is
    planned but not implemented; ``user_id`` and ``session_id`` are only
    forwarded.

    Args:
        user_prompt: Content of the user message.
        user_id: Optional user identifier (forwarded as-is).
        session_id: Optional session identifier (forwarded as-is).
        system_prompt: Content of the system message.
        model: Requested model name.
        project: Value of the ``project`` field in the request payload.
        stream: Value of the ``stream`` field in the request payload.
        temperature: Sampling temperature.
        max_tokens: Maximum number of tokens to generate.
        top_p: Nucleus sampling parameter.
        frequency_penalty: Frequency penalty.
        presence_penalty: Presence penalty.

    Returns:
        Whatever ``chat_completion_messages`` returns.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    # Pass everything by keyword: chat_completion_messages takes ``model``
    # before ``user_id``/``session_id``, so positional forwarding put
    # ``user_id`` into ``model`` and discarded the requested model.
    return chat_completion_messages(
        messages,
        model=model,
        user_id=user_id,
        session_id=session_id,
        project=project,
        stream=stream,
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
    )
def chat_completion_messages(
        messages,
        model=base_model,
        user_id: str = None,
        session_id: str = None,
        project="DecentralGPT", stream=False, temperature=0.3, max_tokens=1024, top_p=0.5,
        frequency_penalty=0, presence_penalty=0):
    """Send a prepared message list as a chat completion request.

    The model is resolved first: an empty value or ``"auto"`` selects one
    via ``get_auto_model``, anything else goes through
    ``get_model_by_autoupdate``. Browser-like headers and the request
    payload are then built and handed to ``chat_completion``.

    ``user_id`` and ``session_id`` are accepted but not used here.

    Returns:
        Whatever ``chat_completion`` returns.
    """
    wants_auto = (not model) or model == "auto"
    model = get_auto_model() if wants_auto else get_model_by_autoupdate(model)
    if debug:
        print(f"校准后的model: {model}")

    # NOTE(review): the User-Agent has no Chrome version and the sec-ch-ua
    # value lacks its opening quote. Left untouched because they are sent
    # to the server verbatim - confirm before changing.
    request_headers = {
        'sec-ch-ua-platform': '"macOS"',
        'Referer': 'https://www.degpt.ai/',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36',
        'sec-ch-ua': 'Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        'DNT': '1',
        'Content-Type': 'application/json',
        'sec-ch-ua-mobile': '?0',
    }
    request_body = dict(
        model=model,
        messages=messages,
        project=project,
        stream=stream,
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
    )
    return chat_completion(request_headers, request_body)
def chat_completion(headers, payload, timeout=None):
    """POST a chat completion request to the current ``base_url`` and return the decoded reply.

    Transport and HTTP-status errors are reported separately from failures
    to decode the response body, so a non-JSON body is no longer reported
    as a network problem (or left uncaught, depending on the installed
    ``requests`` version).

    Args:
        headers: HTTP headers to send.
        payload: Request body; sent as JSON.
        timeout: Optional timeout passed to ``requests.post``. The default
            ``None`` waits indefinitely, as before.

    Returns:
        The parsed JSON response on success. On failure a human-readable
        error string is returned instead, so callers must check the type
        of the result.
    """
    url = f'{base_url}/v0/chat/completion/proxy'
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.encoding = 'utf-8'
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return "请求失败,请检查网络或参数配置。"

    try:
        return response.json()
    except ValueError as e:
        # Every JSON decode error raised by requests derives from ValueError.
        print(f"解析响应时出错: {e}")
        return "解析响应内容失败。"