import os
import asyncio
import time
import multiprocessing
import random
import string
from json.decoder import JSONDecodeError
from typing import Dict, Any, List, Optional

from fastapi import FastAPI, Request, HTTPException
import uvicorn
import tiktoken

# Initialize FastAPI application
app = FastAPI(
    title="ones",
    description="High-performance API service",
    version="1.0.0|2025.1.6",
)

# Global debug flag: when True, diagnostic information is printed to stdout.
debug = False


class ServerConfig:
    """Server configuration handler with dynamic worker calculation."""

    @staticmethod
    def get_workers_count() -> int:
        """Calculate the optimal number of worker processes.

        Default: 4, Maximum: 8.
        Formula: min(max(4, (2 * CPU cores) + 1), 8)

        Returns:
            Worker count clamped to [4, 8]; 4 if CPU detection fails.
        """
        try:
            cpu_cores = multiprocessing.cpu_count()
            recommended_workers = (2 * cpu_cores) + 1
            return min(max(4, recommended_workers), 8)
        except Exception as e:
            if debug:
                print(f"Worker count calculation failed: {e}, using default 4")
            return 4

    @classmethod
    def get_uvicorn_config(
        cls,
        app: FastAPI,
        host: str = "0.0.0.0",
        port: int = 7860,
    ) -> uvicorn.Config:
        """Build an optimized Uvicorn configuration.

        NOTE(review): uvicorn ignores ``workers`` when ``app`` is passed as an
        application object rather than an import string ("module:app") —
        confirm whether multi-worker mode is actually intended here.
        ``loop="uvloop"`` and ``http="httptools"`` require those optional
        packages to be installed; uvicorn raises at startup otherwise.
        """
        workers = cls.get_workers_count()
        if debug:
            print(f"Configuring server with {workers} workers")
        return uvicorn.Config(
            app=app,
            host=host,
            port=port,
            workers=workers,
            loop="uvloop",
            limit_concurrency=1000,
            timeout_keep_alive=30,
            access_log=True,
            log_level="info",
            http="httptools",
        )


class TokenProcessor:
    """Token processing handler backed by tiktoken's cl100k_base encoding."""

    def __init__(self):
        self.encoding = tiktoken.get_encoding("cl100k_base")

    async def calculate_tokens(self, text: str) -> int:
        """Return the number of cl100k_base tokens in *text*."""
        return len(self.encoding.encode(text))


class ChatGPTSimulator:
    """ChatGPT response simulator producing OpenAI-shaped completion payloads."""

    def __init__(self):
        self.token_processor = TokenProcessor()

    @staticmethod
    def generate_id(letters: int = 4, numbers: int = 6) -> str:
        """Generate a random chat completion ID.

        Format: ``chatcmpl-`` followed by *letters* random lowercase ASCII
        letters and *numbers* random digits.
        """
        letters_str = ''.join(random.choices(string.ascii_lowercase, k=letters))
        numbers_str = ''.join(random.choices(string.digits, k=numbers))
        return f"chatcmpl-{letters_str}{numbers_str}"

    async def generate_response(self, headers: Dict[str, str],
                                data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a simulated ChatGPT chat-completion response.

        Args:
            headers: Incoming request headers (currently unused, kept for
                interface compatibility).
            data: Parsed request body; only ``model`` is read, defaulting to
                ``gpt-3.5-turbo``.

        Returns:
            A dict shaped like an OpenAI ``chat.completion`` object with a
            fixed test message and real token counts.

        Raises:
            HTTPException: 500 with the underlying error text on any failure.
        """
        try:
            result = "This is a test result."
            # Token usage is computed from the stringified request body and
            # the canned reply so the usage block looks realistic.
            prompt_tokens = await self.token_processor.calculate_tokens(str(data))
            completion_tokens = await self.token_processor.calculate_tokens(result)
            total_tokens = prompt_tokens + completion_tokens

            response_data = {
                "id": self.generate_id(),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": data.get("model", "gpt-3.5-turbo"),
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": total_tokens,
                },
                "choices": [
                    {
                        "message": {
                            "role": "assistant",
                            "content": result,
                        },
                        "finish_reason": "stop",
                        "index": 0,
                    }
                ],
            }
            return response_data
        except Exception as e:
            if debug:
                print(f"Response generation error: {e}")
            raise HTTPException(status_code=500, detail=str(e))


class RouteManager:
    """Route management handler registering chat-completion endpoints."""

    def __init__(self, app: FastAPI):
        self.app = app
        self.simulator = ChatGPTSimulator()

    def setup_routes(self):
        """Register all configured chat-completion routes on the app."""
        routes = self._get_dynamic_routes()
        if debug:
            print(f"Registering routes: {routes}")
        for path in routes:
            self._register_route(path)

    def _get_dynamic_routes(self) -> List[str]:
        """Resolve chat route paths from environment variables.

        Precedence: REPLACE_CHAT (full replacement) > PREFIX_CHAT (prefixed
        copies of the default path) > APPEND_CHAT (default path plus extras).
        Always returns at least one path.
        """
        default_path = "/v1/chat/completions"
        replace_chat = os.getenv("REPLACE_CHAT", "")
        prefix_chat = os.getenv("PREFIX_CHAT", "")
        append_chat = os.getenv("APPEND_CHAT", "")

        if replace_chat:
            # FIX: fall back to the default path when the variable contains
            # only separators/whitespace (previously this could return [],
            # leaving the service with no chat route at all).
            paths = [p.strip() for p in replace_chat.split(",") if p.strip()]
            return paths or [default_path]

        if prefix_chat:
            # FIX: strip whitespace and skip empty segments (previously
            # padded input like " /api, " produced broken paths such as
            # " /api/v1/chat/completions" plus a stray duplicate default).
            routes = [
                f"{prefix.strip().rstrip('/')}{default_path}"
                for prefix in prefix_chat.split(",")
                if prefix.strip()
            ]
            return routes or [default_path]

        if append_chat:
            append_paths = [p.strip() for p in append_chat.split(",") if p.strip()]
            return [default_path] + append_paths

        return [default_path]

    def _register_route(self, path: str):
        """Register a single POST chat-completion route at *path*."""
        @self.app.post(path)
        async def chat_endpoint(request: Request) -> Dict[str, Any]:
            try:
                headers = dict(request.headers)
                data = await request.json()
                if debug:
                    print(f"Request received...\r\n\tHeaders: {headers},\r\n\tData: {data}")
                return await self.simulator.generate_response(headers, data)
            except JSONDecodeError as e:
                if debug:
                    print(f"JSON decode error: {e}")
                raise HTTPException(status_code=400, detail="Invalid JSON format")
            except Exception as e:
                if debug:
                    print(f"Request processing error: {e}")
                raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/")
async def health_check() -> str:
    """Health check endpoint"""
    return "Service is running..."


def run_server(host: str = "0.0.0.0", port: int = 7860):
    """Run the server with the optimized configuration.

    Blocks until the server exits.
    """
    config = ServerConfig.get_uvicorn_config(
        app=app,
        host=host,
        port=port,
    )
    server = uvicorn.Server(config)
    server.run()


if __name__ == "__main__":
    route_manager = RouteManager(app)
    route_manager.setup_routes()
    port = int(os.getenv("PORT", "7860"))
    run_server(port=port)