File size: 5,828 Bytes
68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf 68515e1 a487faf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import os
import time
import multiprocessing
from typing import Dict, Any, List
from fastapi import FastAPI, Request, HTTPException
import uvicorn
import tiktoken
from json.decoder import JSONDecodeError
import random
import string
# Module-level FastAPI application; routes are attached by APIServer below.
app = FastAPI(
title="ones",
description="High-performance API service",
version="1.0.0|2025.1.6"
)
# Global debug flag; when True, request/response details are printed to stdout.
debug = False
class APIServer:
    """High-performance API server implementation.

    Wraps a FastAPI application: registers chat-completion routes (paths
    configurable via the REPLACE_CHAT / PREFIX_CHAT / APPEND_CHAT environment
    variables), serves canned OpenAI-style completion responses, and builds a
    uvicorn launch configuration.
    """

    def __init__(self, app: FastAPI):
        """Bind the FastAPI app, load the tokenizer, and register routes.

        Args:
            app: FastAPI application instance to attach routes to.
        """
        self.app = app
        # cl100k_base is the tokenizer used by the gpt-3.5/gpt-4 model family.
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self._setup_routes()

    def _setup_routes(self) -> None:
        """Register all configured chat routes plus a root health check."""
        for path in self._get_routes():
            self._register_route(path)

        # Simple liveness probe at the root path.
        @self.app.get("/")
        async def health_check() -> str:
            return "你好"

    def _get_routes(self) -> List[str]:
        """Resolve chat-completion route paths from the environment.

        Precedence: REPLACE_CHAT (full replacement) > PREFIX_CHAT (prefixed
        copies of the default path) > APPEND_CHAT (default path plus extras)
        > the bare default path. All variables are comma-separated lists.

        Returns:
            A non-empty list of route paths to register.
        """
        default_path = "/v1/chat/completions"
        replace_chat = os.getenv("REPLACE_CHAT", "")
        prefix_chat = os.getenv("PREFIX_CHAT", "")
        append_chat = os.getenv("APPEND_CHAT", "")

        if replace_chat:
            return [path.strip() for path in replace_chat.split(",") if path.strip()]

        if prefix_chat:
            # Strip whitespace and drop empty entries, consistent with the
            # REPLACE_CHAT / APPEND_CHAT branches (previously raw, possibly
            # blank prefixes leaked through, e.g. from a trailing comma).
            routes = [
                f"{prefix.strip().rstrip('/')}{default_path}"
                for prefix in prefix_chat.split(",")
                if prefix.strip()
            ]
            # Never return an empty route list if every prefix was blank.
            return routes or [default_path]

        if append_chat:
            append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
            return [default_path] + append_paths

        return [default_path]

    def _register_route(self, path: str) -> None:
        """Register a single POST chat-completion endpoint at *path*."""
        async def chat_endpoint(request: Request) -> Dict[str, Any]:
            try:
                headers = dict(request.headers)
                data = await request.json()
                if debug:
                    print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
                return await self._generate_response(headers, data)
            except JSONDecodeError as e:
                if debug:
                    print(f"JSON decode error: {e}")
                raise HTTPException(status_code=400, detail="Invalid JSON format") from e
            except HTTPException:
                # Let deliberate HTTP errors (e.g. the 500 raised inside
                # _generate_response) propagate unchanged instead of being
                # re-wrapped by the generic handler below, which discarded
                # their detail message.
                raise
            except Exception as e:
                if debug:
                    print(f"Request processing error: {e}")
                raise HTTPException(status_code=500, detail="Internal server error") from e

        self.app.post(path)(chat_endpoint)

    def _calculate_tokens(self, text: str) -> int:
        """Return the cl100k_base token count for *text*."""
        return len(self.encoding.encode(text))

    def _generate_id(self, letters: int = 4, numbers: int = 6) -> str:
        """Generate a pseudo-random chat completion ID.

        Args:
            letters: Number of random lowercase letters.
            numbers: Number of random digits.

        Returns:
            An ID of the form ``chatcmpl-<letters><digits>``.
        """
        letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
        numbers_str = ''.join(random.choices(string.digits, k=numbers))
        return f"chatcmpl-{letters_str}{numbers_str}"

    async def _generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
        """Build a canned OpenAI-style chat-completion response.

        Args:
            headers: Request headers (currently unused; kept for interface
                compatibility with callers).
            data: Parsed JSON request body; ``model`` is echoed back if present.

        Returns:
            A chat.completion-shaped dict with token usage computed via tiktoken.

        Raises:
            HTTPException: 500 with the underlying error message on failure.
        """
        try:
            result = "This is a test result."
            # Token accounting mirrors the OpenAI usage block: prompt tokens
            # are counted over the stringified request body.
            prompt_tokens = self._calculate_tokens(str(data))
            completion_tokens = self._calculate_tokens(result)
            total_tokens = prompt_tokens + completion_tokens
            return {
                "id": self._generate_id(),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": data.get("model", "gpt-3.5-turbo"),
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": total_tokens
                },
                "choices": [{
                    "message": {
                        "role": "assistant",
                        "content": result
                    },
                    "finish_reason": "stop",
                    "index": 0
                }]
            }
        except Exception as e:
            if debug:
                print(f"Response generation error: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    def _get_workers_count(self) -> int:
        """Calculate the uvicorn worker count, clamped to [4, 8].

        Uses the common ``(2 * cores) + 1`` heuristic; falls back to 4 when
        the core count cannot be determined.
        """
        try:
            cpu_cores = multiprocessing.cpu_count()
            recommended_workers = (2 * cpu_cores) + 1
            return min(max(4, recommended_workers), 8)
        except Exception as e:
            if debug:
                print(f"Worker count calculation failed: {e}, using default 4")
            return 4

    def get_server_config(self, host: str = "0.0.0.0", port: int = 7860) -> uvicorn.Config:
        """Build the uvicorn configuration for this app.

        NOTE(review): uvicorn ignores ``workers`` when given an app *object*
        rather than an import string ("module:app"), so this likely runs
        single-process — confirm and pass an import string if multi-worker
        operation is actually required. Also assumes the ``uvloop`` and
        ``httptools`` packages are installed.
        """
        workers = self._get_workers_count()
        if debug:
            print(f"Configuring server with {workers} workers")
        return uvicorn.Config(
            app=self.app,
            host=host,
            port=port,
            workers=workers,
            loop="uvloop",
            limit_concurrency=1000,
            timeout_keep_alive=30,
            access_log=True,
            log_level="info",
            http="httptools"
        )

    def run(self, host: str = "0.0.0.0", port: int = 7860) -> None:
        """Run the API server (blocking)."""
        config = self.get_server_config(host, port)
        server = uvicorn.Server(config)
        server.run()
def create_server() -> APIServer:
    """Build and return an APIServer bound to the module-level FastAPI app."""
    server_instance = APIServer(app)
    return server_instance
if __name__ == "__main__":
port = int(os.getenv("PORT", "7860"))
server = create_server()
server.run(port=port) |