# planRun / more_core_bksss.py
# (repository viewer residue preserved as a comment header:
#  author "sanbo", "update sth." at 2025-01-04 23:21:29,
#  commit a487faf, raw / history / blame, 6.66 kB)
import asyncio
import multiprocessing
import os
import random
import string
import time
from json.decoder import JSONDecodeError
from typing import Any, Dict, List, Optional

import tiktoken
import uvicorn
from fastapi import FastAPI, HTTPException, Request
# Initialize FastAPI application
# (routes are attached later by RouteManager; served by run_server())
app = FastAPI(
    title="ones",
    description="High-performance API service",
    version="1.0.0|2025.1.6"
)
# debug: global verbosity switch — when True, worker calculation, route
# registration, request payloads and internal errors are printed to stdout.
debug = False
class ServerConfig:
    """Server configuration handler with dynamic worker calculation."""

    @staticmethod
    def get_workers_count() -> int:
        """
        Calculate the optimal number of worker processes.

        Formula: min(max(4, (2 * CPU cores) + 1), 8) — i.e. a floor of 4
        and a ceiling of 8. Falls back to 4 when the CPU count cannot be
        determined.
        """
        try:
            suggested = 2 * multiprocessing.cpu_count() + 1
            return min(max(4, suggested), 8)
        except Exception as e:
            if debug:
                print(f"Worker count calculation failed: {e}, using default 4")
            return 4

    @classmethod
    def get_uvicorn_config(cls,
                           app: FastAPI,
                           host: str = "0.0.0.0",
                           port: int = 7860) -> uvicorn.Config:
        """Build a tuned Uvicorn configuration for the given application."""
        worker_total = cls.get_workers_count()
        if debug:
            print(f"Configuring server with {worker_total} workers")
        # NOTE(review): "uvloop" and "httptools" are optional uvicorn extras;
        # startup will fail if they are not installed — confirm deployment
        # image includes them. Also confirm the workers setting takes effect
        # when the app is passed as an object rather than an import string.
        return uvicorn.Config(
            app=app,
            host=host,
            port=port,
            workers=worker_total,
            loop="uvloop",
            limit_concurrency=1000,
            timeout_keep_alive=30,
            access_log=True,
            log_level="info",
            http="httptools"
        )
class TokenProcessor:
    """Token counting helper backed by tiktoken's cl100k_base encoding."""

    def __init__(self):
        # The encoder is loaded once here and reused for every request.
        self.encoding = tiktoken.get_encoding("cl100k_base")

    async def calculate_tokens(self, text: str) -> int:
        """Return the number of tokens the encoder produces for `text`."""
        token_ids = self.encoding.encode(text)
        return len(token_ids)
class ChatGPTSimulator:
    """ChatGPT response simulator.

    Produces OpenAI chat-completion-shaped payloads with a fixed canned
    answer, including token usage accounting via TokenProcessor.
    """

    def __init__(self):
        self.token_processor = TokenProcessor()

    @staticmethod
    def generate_id(letters: int = 4, numbers: int = 6) -> str:
        """Generate a random completion ID.

        Format: "chatcmpl-" + `letters` lowercase ASCII letters +
        `numbers` decimal digits.
        """
        # `random` and `string` are imported at module level (previously
        # re-imported on every call, which was needless per-call work).
        letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
        numbers_str = ''.join(random.choices(string.digits, k=numbers))
        return f"chatcmpl-{letters_str}{numbers_str}"

    async def generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
        """Build a simulated /v1/chat/completions response body.

        Args:
            headers: incoming request headers (currently unused).
            data: parsed JSON request body; only "model" is read, with a
                default of "gpt-3.5-turbo".

        Returns:
            A dict mimicking the OpenAI chat-completion response schema.

        Raises:
            HTTPException: 500 with the error text if assembly fails.
        """
        try:
            result = "This is a test result."
            # NOTE(review): prompt tokens are counted over str(data) — the
            # whole request dict, not just the messages — confirm intended.
            prompt_tokens = await self.token_processor.calculate_tokens(str(data))
            completion_tokens = await self.token_processor.calculate_tokens(result)
            total_tokens = prompt_tokens + completion_tokens
            response_data = {
                "id": self.generate_id(),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": data.get("model", "gpt-3.5-turbo"),
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": total_tokens
                },
                "choices": [
                    {
                        "message": {
                            "role": "assistant",
                            "content": result
                        },
                        "finish_reason": "stop",
                        "index": 0
                    }
                ]
            }
            return response_data
        except Exception as e:
            if debug:
                print(f"Response generation error: {e}")
            raise HTTPException(status_code=500, detail=str(e))
class RouteManager:
    """Route management handler.

    Registers one POST chat-completion endpoint per configured path, all
    backed by the same ChatGPTSimulator instance.
    """

    def __init__(self, app: FastAPI):
        self.app = app
        self.simulator = ChatGPTSimulator()

    def setup_routes(self):
        """Register every configured chat route on the application."""
        routes = self._get_dynamic_routes()
        if debug:
            print(f"Registering routes: {routes}")
        for path in routes:
            self._register_route(path)

    def _get_dynamic_routes(self) -> List[str]:
        """Resolve chat endpoint paths from environment variables.

        Precedence (first non-empty variable wins):
          REPLACE_CHAT — comma-separated paths replacing the default;
          PREFIX_CHAT  — comma-separated prefixes, each prepended to the
                         default path;
          APPEND_CHAT  — comma-separated extra paths after the default.
        Falls back to the default path when nothing valid is configured.
        """
        default_path = "/v1/chat/completions"
        replace_chat = os.getenv("REPLACE_CHAT", "")
        prefix_chat = os.getenv("PREFIX_CHAT", "")
        append_chat = os.getenv("APPEND_CHAT", "")
        if replace_chat:
            return [path.strip() for path in replace_chat.split(",") if path.strip()]
        if prefix_chat:
            # Bug fix: strip whitespace and drop empty entries, matching
            # the REPLACE_CHAT/APPEND_CHAT handling. Previously "a, b"
            # produced the broken path " b/v1/chat/completions".
            routes = [f"{prefix.strip().rstrip('/')}{default_path}"
                      for prefix in prefix_chat.split(",")
                      if prefix.strip()]
            return routes or [default_path]
        if append_chat:
            append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
            return [default_path] + append_paths
        return [default_path]

    def _register_route(self, path: str):
        """Register a single POST route serving simulated completions."""
        @self.app.post(path)
        async def chat_endpoint(request: Request) -> Dict[str, Any]:
            try:
                headers = dict(request.headers)
                data = await request.json()
                if debug:
                    print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
                return await self.simulator.generate_response(headers, data)
            except JSONDecodeError as e:
                # Malformed request body -> client error, not server error.
                if debug:
                    print(f"JSON decode error: {e}")
                raise HTTPException(status_code=400, detail="Invalid JSON format")
            except Exception as e:
                if debug:
                    print(f"Request processing error: {e}")
                raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/")
async def health_check() -> str:
    """Liveness probe: confirms the service process is up."""
    status_message = "Service is running..."
    return status_message
def run_server(host: str = "0.0.0.0", port: int = 7860):
    """Start the Uvicorn server with the tuned configuration (blocking)."""
    server_config = ServerConfig.get_uvicorn_config(app=app, host=host, port=port)
    uvicorn.Server(server_config).run()
if __name__ == "__main__":
    # Attach the configured chat routes, then serve on PORT (default 7860).
    RouteManager(app).setup_routes()
    run_server(port=int(os.getenv("PORT", "7860")))