from flask import Flask, request, jsonify, Response
import requests
import uuid
import json
import time
import os
import re
import logging
from itertools import cycle
# Configure logging (switch the level to DEBUG for more verbose output)
logging.basicConfig(level=logging.INFO)
_COOKIES = os.getenv("COOKIES", "")
API_KEY = os.getenv("API_KEY", "linux.do")
app = Flask(__name__)
COOKIES = _COOKIES.split(',')
iterator = cycle(COOKIES)
cookie_index = 0
def get_cookie():
    return next(iterator)
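# Illustrative (assumed) environment setup: COOKIES holds one or more Akash
# session cookie strings separated by commas, rotated round-robin per request:
#   export COOKIES='session_token=abc123; other=xyz,session_token=def456'
#   export API_KEY='linux.do'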
@app.before_request
def check_api_key():
    key = request.headers.get("Authorization")
    if key != "Bearer " + API_KEY:
        logging.warning("Unauthorized access attempt with key: %s", key)
        return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403
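# Example client call (illustrative; host and port as configured in app.run below):
#   curl http://localhost:5200/v1/models -H "Authorization: Bearer $API_KEY"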
@app.route('/v1/models', methods=['GET'])
def get_models():
logging.info("Received /v1/models request")
_cookie = get_cookie()
logging.info(_cookie[:50])
headers = {"Content-Type": "application/json", "Cookie": _cookie}
response = requests.get('https://chat.akash.network/api/models', headers=headers)
models_data = response.json()
print(models_data)
current_timestamp = int(time.time())
converted_data = {
"object": "list",
"data": [
{
"id": model["id"],
"object": "model",
"created": current_timestamp,
"owned_by": "openai" if "Meta" in model["id"] else "third_party",
"permissions": [],
"root": model["id"],
"parent": None,
"capabilities": {
"temperature": model.get("temperature"),
"top_p": model.get("top_p")
},
"name": model.get("name"),
"description": model.get("description"),
"available": model.get("available")
}
for model in models_data
]
}
logging.info("Response for /v1/models: %s", json.dumps(converted_data, ensure_ascii=False))
return jsonify(converted_data)
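# The upstream /api/models response is assumed to be a JSON list of model objects,
# e.g. [{"id": "DeepSeek-R1", "name": "...", "temperature": 0.6, "top_p": 0.95,
#        "description": "...", "available": true}, ...]; any field missing from an
# entry simply comes through as null in the converted payload above.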
def generate_stream(akash_response, chat_id, model):
"""
解析 Akash 接口的流式响应数据,并生成符合 OpenAI API 流返回格式的 chunk 数据。
"""
for line in akash_response.iter_lines():
if not line:
continue
try:
line_str = line.decode('utf-8').strip()
msg_type, msg_data = line_str.split(':', 1)
if msg_type == '0':
token = msg_data.strip()
# 去掉两边的引号并处理转义字符
if token.startswith('"') and token.endswith('"'):
token = token[1:-1].replace('\\"', '"')
token = token.replace("\\n", "\n")
chunk = {
"id": f"chatcmpl-{chat_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"delta": {"content": token},
"index": 0,
"finish_reason": None
}]
}
logging.debug("Streaming chunk: %s", json.dumps(chunk, ensure_ascii=False))
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
elif msg_type in ['e', 'd']:
chunk = {
"id": f"chatcmpl-{chat_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"delta": {},
"index": 0,
"finish_reason": "stop"
}]
}
logging.debug("Streaming finish chunk: %s", json.dumps(chunk, ensure_ascii=False))
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
break
except Exception as ex:
logging.error("Error processing stream line: %s", ex)
continue
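# The upstream stream is assumed to follow a Vercel-AI-SDK-style line protocol:
# each line is '<prefix>:<payload>', where prefix '0' carries a JSON-quoted text
# token and prefixes 'e'/'d' mark the end of the message, for example:
#   0:"Hello"
#   0:" world"
#   e:{"finishReason":"stop"}
# Only the prefix before the first ':' is inspected above.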
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    try:
        data = request.get_json()
        logging.info("Received /v1/chat/completions request: %s", json.dumps(data, ensure_ascii=False))
        chat_id = str(uuid.uuid4()).replace('-', '')[:16]
        model = data.get('model', "DeepSeek-R1")
        akash_data = {
            "id": chat_id,
            "messages": data.get('messages', []),
            "model": model,
            "system": data.get('system_message', "You are a helpful assistant."),
            "temperature": data.get('temperature', 0.6),
            "topP": data.get('top_p', 0.95)
        }
        _cookie = get_cookie()
        logging.info("Using cookie (prefix): %s", _cookie[:50])
        headers = {"Content-Type": "application/json", "Cookie": _cookie}
        # Streaming follows the request's "stream" flag, except for the AkashGen
        # image model, which is always handled as a non-streaming request
        if model == "AkashGen":
            stream_flag = False
        else:
            stream_flag = data.get('stream', False)
        logging.info("stream_flag: %s", stream_flag)
        akash_response = requests.post(
            'https://chat.akash.network/api/chat',
            json=akash_data,
            headers=headers,
            stream=stream_flag
        )
        logging.info("Akash API response status: %s", akash_response.status_code)
        if stream_flag:
            return Response(
                generate_stream(akash_response, chat_id, model),
                mimetype='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive'
                }
            )
        else:
            if model != "AkashGen":
                # Non-stream text completion: join the 0:"..." token fragments
                text_matches = re.findall(r'0:"(.*?)"', akash_response.text)
                parsed_text = "".join(text_matches)
                response_payload = {
                    "id": f"chatcmpl-{chat_id}",
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [{
                        "index": 0,
                        "message": {"role": "assistant", "content": parsed_text},
                        "finish_reason": "stop"
                    }]
                }
                logging.info("Non-stream response payload: %s", json.dumps(response_payload, ensure_ascii=False))
                return Response(
                    json.dumps(response_payload, ensure_ascii=False),
                    status=akash_response.status_code,
                    mimetype='application/json'
                )
            else:
                # AkashGen image generation: extract the jobId, then poll until it completes
                match = re.search(r"jobId='([^']+)'", akash_response.text)
                if match:
                    job_id = match.group(1)
                    logging.info("AkashGen jobId: %s", job_id)
                    while True:
                        try:
                            img_response = requests.get(
                                f'https://chat.akash.network/api/image-status?ids={job_id}',
                                headers=headers
                            )
                            img_data = img_response.json()
                            if img_data[0]["status"] == "completed":
                                response_payload = {
                                    "id": f"chatcmpl-{chat_id}",
                                    "object": "chat.completion",
                                    "created": int(time.time()),
                                    "model": model,
                                    "choices": [{
                                        "index": 0,
                                        "message": {
                                            "role": "assistant",
                                            "content": f"{img_data[0]['result']}"
                                        },
                                        "finish_reason": "stop"
                                    }]
                                }
                                logging.info("AkashGen completed response payload: %s", json.dumps(response_payload, ensure_ascii=False))
                                return Response(
                                    json.dumps(response_payload, ensure_ascii=False),
                                    status=akash_response.status_code,
                                    mimetype='application/json'
                                )
                            else:
                                logging.info("Image still generating, jobId: %s", job_id)
                        except Exception as e:
                            logging.error("Error while polling image status: %s", e)
                        time.sleep(5)
                else:
                    logging.error("Failed to parse jobId from the AkashGen response")
                    return jsonify({"error": "Upstream Akash service returned an unexpected response"}), 500
    except Exception as e:
        logging.exception("chat_completions error:")
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5200)
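# Example non-stream request against a local run (illustrative):
#   curl -X POST http://localhost:5200/v1/chat/completions \
#     -H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" \
#     -d '{"model": "DeepSeek-R1", "messages": [{"role": "user", "content": "Hi"}], "stream": false}'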