import multiprocessing
import os
import random
import string
import time
from json.decoder import JSONDecodeError
from typing import Any, Dict, List

from fastapi import FastAPI, Request, HTTPException
import uvicorn
import tiktoken


app = FastAPI(
    title="ones",
    description="High-performance API service",
    version="1.0.0|2025.1.6"
)

debug = False


class ServerConfig:
    """Server configuration handler with dynamic worker calculation"""

    @staticmethod
    def get_workers_count() -> int:
        """
        Calculate the optimal number of workers.
        Default: 4, maximum: 8.
        Formula: min(max(4, (2 * CPU cores) + 1), 8)
        """
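        # Worked example: with 4 CPU cores, min(max(4, 2*4 + 1), 8) = 8 workers.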
        try:
            cpu_cores = multiprocessing.cpu_count()
            recommended_workers = (2 * cpu_cores) + 1
            return min(max(4, recommended_workers), 8)
        except Exception as e:
            if debug:
                print(f"Worker count calculation failed: {e}, using default 4")
            return 4

    @classmethod
    def get_uvicorn_config(cls,
                           app: FastAPI,
                           host: str = "0.0.0.0",
                           port: int = 7860) -> uvicorn.Config:
        """Get optimized Uvicorn configuration"""
        workers = cls.get_workers_count()
        if debug:
            print(f"Configuring server with {workers} workers")
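
        # Note: uvicorn.Server runs a single process; the `workers` setting only
        # takes effect when the app is passed as an import string (e.g. "module:app")
        # to uvicorn.run() or the uvicorn CLI.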
        return uvicorn.Config(
            app=app,
            host=host,
            port=port,
            workers=workers,
            loop="uvloop",           # requires the uvloop package
            limit_concurrency=1000,
            timeout_keep_alive=30,
            access_log=True,
            log_level="info",
            http="httptools"         # requires the httptools package
        )


class TokenProcessor:
    """Token processing handler"""

    def __init__(self):
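        # cl100k_base is the encoding used by gpt-3.5-turbo and gpt-4 models.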
        self.encoding = tiktoken.get_encoding("cl100k_base")

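    # Note: tiktoken's encode() is synchronous and CPU-bound; the async
    # signature below only keeps call sites uniform, it does not offload work.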
    async def calculate_tokens(self, text: str) -> int:
        """Calculate tokens for given text"""
        return len(self.encoding.encode(text))


class ChatGPTSimulator:
    """ChatGPT response simulator"""

    def __init__(self):
        self.token_processor = TokenProcessor()

    @staticmethod
    def generate_id(letters: int = 4, numbers: int = 6) -> str:
        """Generate random chat completion ID"""
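        # Example output (random): "chatcmpl-abcd012345"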
        letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
        numbers_str = ''.join(random.choices(string.digits, k=numbers))
        return f"chatcmpl-{letters_str}{numbers_str}"

    async def generate_response(self, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate simulated ChatGPT response"""
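        # The payload below mirrors the shape of an OpenAI chat.completion object.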
        try:
            result = "This is a test result."

            prompt_tokens = await self.token_processor.calculate_tokens(str(data))
            completion_tokens = await self.token_processor.calculate_tokens(result)
            total_tokens = prompt_tokens + completion_tokens

            response_data = {
                "id": self.generate_id(),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": data.get("model", "gpt-3.5-turbo"),
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": total_tokens
                },
                "choices": [
                    {
                        "message": {
                            "role": "assistant",
                            "content": result
                        },
                        "finish_reason": "stop",
                        "index": 0
                    }
                ]
            }
            return response_data

        except Exception as e:
            if debug:
                print(f"Response generation error: {e}")
            raise HTTPException(status_code=500, detail=str(e))


class RouteManager:
    """Route management handler"""

    def __init__(self, app: FastAPI):
        self.app = app
        self.simulator = ChatGPTSimulator()

    def setup_routes(self):
        """Set up API routes"""
        routes = self._get_dynamic_routes()
        if debug:
            print(f"Registering routes: {routes}")
        for path in routes:
            self._register_route(path)

    def _get_dynamic_routes(self) -> List[str]:
        """Get dynamic routes based on environment variables"""
        default_path = "/v1/chat/completions"
        replace_chat = os.getenv("REPLACE_CHAT", "")
        prefix_chat = os.getenv("PREFIX_CHAT", "")
        append_chat = os.getenv("APPEND_CHAT", "")

        if replace_chat:
            return [path.strip() for path in replace_chat.split(",") if path.strip()]

        routes = []
        if prefix_chat:
            # Strip whitespace and skip empty entries, as in the other branches.
            routes.extend(f"{prefix.strip().rstrip('/')}{default_path}"
                          for prefix in prefix_chat.split(",") if prefix.strip())
            return routes

        if append_chat:
            append_paths = [path.strip() for path in append_chat.split(",") if path.strip()]
            routes = [default_path] + append_paths

        return routes or [default_path]

    def _register_route(self, path: str):
        """Register a single route"""

        @self.app.post(path)
        async def chat_endpoint(request: Request) -> Dict[str, Any]:
            try:
                headers = dict(request.headers)
                data = await request.json()
                if debug:
                    print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
                return await self.simulator.generate_response(headers, data)
            except JSONDecodeError as e:
                if debug:
                    print(f"JSON decode error: {e}")
                raise HTTPException(status_code=400, detail="Invalid JSON format")
            except HTTPException:
                # Let HTTP errors raised by the simulator pass through unchanged
                # instead of being remapped to a generic 500 below.
                raise
            except Exception as e:
                if debug:
                    print(f"Request processing error: {e}")
                raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/")
async def health_check() -> str:
    """Health check endpoint"""
    return "Service is running..."


def run_server(host: str = "0.0.0.0", port: int = 7860):
    """Run server with optimized configuration"""
    config = ServerConfig.get_uvicorn_config(
        app=app,
        host=host,
        port=port
    )

    server = uvicorn.Server(config)
    server.run()


if __name__ == "__main__":
    route_manager = RouteManager(app)
    route_manager.setup_routes()
    port = int(os.getenv("PORT", "7860"))
    run_server(port=port)
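
    # Usage sketch (assuming this file is saved as app.py):
    #   PORT=8080 APPEND_CHAT="/chat" python app.py
    #   curl -X POST http://localhost:8080/v1/chat/completions \
    #        -H "Content-Type: application/json" \
    #        -d '{"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "hi"}]}'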