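"""OpenAI-compatible proxy in front of the Akash Chat API.

Exposes /v1/models and /v1/chat/completions, forwards each request to
chat.akash.network with the COOKIES environment variable as the Cookie
header, and converts the responses to the OpenAI API format. Every
request must carry an "Authorization: Bearer <API_KEY>" header
(API_KEY defaults to "linux.do").
"""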
from flask import Flask, request, jsonify, Response
import requests
import uuid
import json
import time
import os
import re

_COOKIES = os.environ.get("COOKIES", "")
API_KEY = os.getenv("API_KEY", "linux.do")

app = Flask(__name__)


@app.before_request
def check_api_key():
    # Reject any request that does not carry the expected bearer token.
    key = request.headers.get("Authorization")
    if key != "Bearer " + API_KEY:
        return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403


@app.route('/v1/models', methods=['GET'])
def get_models():
    # Build request headers
    headers = {
        "Content-Type": "application/json",
        "Cookie": _COOKIES
    }

    # Fetch the model list from the Akash API
    response = requests.get(
        'https://chat.akash.network/api/models',
        headers=headers
    )
    models_data = response.json()

    # Convert the Akash model list to the OpenAI /v1/models format
    current_timestamp = int(time.time())
    converted_data = {
        "object": "list",
        "data": [
            {
                "id": model["id"],
                "object": "model",
                "created": current_timestamp,
                "owned_by": model["name"].lower().replace(" ", "_")
            }
            for model in models_data["models"]
        ]
    }
    return converted_data
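# Example of the /v1/models response produced by get_models() above
# (a sketch; the model id, name, and timestamp shown are illustrative):
#
#   {
#     "object": "list",
#     "data": [
#       {
#         "id": "DeepSeek-R1",
#         "object": "model",
#         "created": 1700000000,
#         "owned_by": "deepseek_r1"
#       }
#     ]
#   }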
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    try:
        # Read the OpenAI-format request body
        data = request.json

        # Generate a unique chat ID
        chat_id = str(uuid.uuid4()).replace('-', '')[:16]

        # Build the Akash-format request payload
        akash_data = {
            "id": chat_id,
            "messages": data.get('messages', []),
            "model": data.get('model', "DeepSeek-R1"),
            "system": data.get('system_message', "You are a helpful assistant."),
            "temperature": data.get('temperature', 0.6),
            "topP": data.get('top_p', 0.95)
        }

        # Build request headers
        headers = {
            "Content-Type": "application/json",
            "Cookie": _COOKIES
        }

        _stream = data.get('stream', True)

        # Send the request to the Akash API
        response = requests.post(
            'https://chat.akash.network/api/chat',
            json=akash_data,
            headers=headers,
            stream=_stream
        )

        def generate():
            # Translate the Akash line-delimited stream into OpenAI SSE chunks.
            content_buffer = ""
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    # Each line has the form "type:json_data"
                    line_str = line.decode('utf-8')
                    msg_type, msg_data = line_str.split(':', 1)

                    # Type "0" carries a content chunk
                    if msg_type == '0':
                        # Strip only the surrounding double quotes, then unescape
                        if msg_data.startswith('"') and msg_data.endswith('"'):
                            msg_data = msg_data.replace('\\"', '"')
                            msg_data = msg_data[1:-1]
                        msg_data = msg_data.replace("\\n", "\n")
                        content_buffer += msg_data

                        # Build an OpenAI-format streaming chunk
                        chunk = {
                            "id": f"chatcmpl-{chat_id}",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": data.get('model', "DeepSeek-R1"),
                            "choices": [{
                                "delta": {"content": msg_data},
                                "index": 0,
                                "finish_reason": None
                            }]
                        }
                        yield f"data: {json.dumps(chunk)}\n\n"

                    # Types "e" and "d" mark the end of the stream
                    elif msg_type in ['e', 'd']:
                        chunk = {
                            "id": f"chatcmpl-{chat_id}",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": data.get('model', "DeepSeek-R1"),
                            "choices": [{
                                "delta": {},
                                "index": 0,
                                "finish_reason": "stop"
                            }]
                        }
                        yield f"data: {json.dumps(chunk)}\n\n"
                        yield "data: [DONE]\n\n"
                        break
                except Exception as e:
                    print(f"Error processing line: {e}")
                    continue
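        # Illustrative shape of the raw Akash stream handled above (a sketch
        # reconstructed from the parsing logic in generate(); the "e"/"d"
        # payloads are not inspected here):
        #   0:"Hello"
        #   0:" world"
        #   e:{...}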
        if _stream:
            return Response(
                generate(),
                mimetype='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive',
                    'Content-Type': 'text/event-stream'
                }
            )
        else:
            # Non-streaming: collect every content chunk (lines of the form 0:"...")
            # from the full response body and return a single completion.
            text_matches = re.findall(r'0:"(.*?)"', response.text)
            parsed_text = "".join(text_matches).replace("\\n", "\n").replace('\\"', '"')
            return Response(
                json.dumps({
                    "id": f"chatcmpl-{chat_id}",
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": data.get('model', "DeepSeek-R1"),
                    "choices": [
                        {
                            "index": 0,
                            "message": {
                                "role": "assistant",
                                "content": parsed_text
                            },
                            "finish_reason": "stop"
                        }
                    ]
                }, ensure_ascii=False),
                status=response.status_code,
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive',
                    'Content-Type': 'application/json'
                }
            )
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5200)
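# Example client call (a minimal sketch; assumes the proxy is running locally on
# its default port 5200 with the default API_KEY of "linux.do"):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:5200/v1/chat/completions",
#       headers={"Authorization": "Bearer linux.do"},
#       json={
#           "model": "DeepSeek-R1",
#           "messages": [{"role": "user", "content": "Hello"}],
#           "stream": False,
#       },
#   )
#   print(resp.json()["choices"][0]["message"]["content"])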