import asyncio
import random
from functools import cached_property
from typing import Literal

import httpx
import yaml
from pydantic import BaseModel
from text_generation import AsyncClient

from spitfight.log import get_logger

logger = get_logger(__name__)


class Worker(BaseModel):
    """A worker that serves a model."""

    # Worker's container name, since we're using Overlay networks.
    hostname: str
    # For TGI, this would always be 80.
    port: int
    # User-friendly model name, e.g. "Llama2-7B".
    model_name: str
    # Hugging Face model ID, e.g. "meta-llama/Llama-2-13b-chat-hf".
    model_id: str
    # Whether the model worker container is good.
    status: Literal["up", "down"]

    class Config:
        keep_untouched = (cached_property,)

    @cached_property
    def url(self) -> str:
        return f"http://{self.hostname}:{self.port}"

    def get_client(self) -> AsyncClient:
        return AsyncClient(base_url=self.url)

    def audit(self) -> None:
        """Make sure the worker is running and its information is as expected.

        Assumed to be called on app startup when workers are initialized.
        This method raises `ValueError` if the audit fails, in order to
        prevent the controller from starting if anything is wrong.
        """
        try:
            response = httpx.get(self.url + "/info")
        except (httpx.ConnectError, httpx.TimeoutException) as e:
            raise ValueError(f"Could not connect to {self!r}: {e!r}") from e
        if response.status_code != 200:
            raise ValueError(f"Could not get /info from {self!r}.")
        info = response.json()
        if info["model_id"] != self.model_id:
            raise ValueError(
                f"Model ID mismatch: {info['model_id']} != {self.model_id}"
            )
        self.status = "up"
        logger.info("%s is up.", repr(self))

    async def check_status(self) -> None:
        """Check worker status and update `self.status` accordingly."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(self.url + "/info")
            except (httpx.ConnectError, httpx.TimeoutException) as e:
                self.status = "down"
                logger.warning("%s is down: %s", repr(self), repr(e))
                return
            if response.status_code != 200:
                self.status = "down"
                logger.warning("GET /info from %s returned %s.", repr(self), response.json())
                return
            info = response.json()
            if info["model_id"] != self.model_id:
                self.status = "down"
                logger.warning(
                    "Model ID mismatch for %s: %s != %s",
                    repr(self),
                    info["model_id"],
                    self.model_id,
                )
                return
            logger.info("%s is up.", repr(self))
            self.status = "up"
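

# The compose files parsed by `WorkerService.__init__` below are expected to
# yield, after `yaml.safe_load`, a mapping shaped roughly like the following.
# This is only an illustrative sketch; the model name and container name here
# are hypothetical, not taken from the actual deployment:
#
#   {
#       "services": {
#           "llama2-13b-chat": {
#               "container_name": "worker-llama2-13b-chat",
#               "command": ["--model-id", "meta-llama/Llama-2-13b-chat-hf"],
#           },
#       },
#   }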


class WorkerService:
    """A service that manages model serving workers.

    Worker objects are created only once and shared across the entire
    application. In particular, changing the status of a worker immediately
    affects the result of `choose_two`.

    Attributes:
        workers (list[Worker]): The list of workers.
    """

    def __init__(self, compose_files: list[str]) -> None:
        """Initialize the worker service."""
        self.workers: list[Worker] = []
        worker_model_names = set()
        for compose_file in compose_files:
            with open(compose_file) as f:
                spec = yaml.safe_load(f)
            for model_name, service_spec in spec["services"].items():
                command = service_spec["command"]
                for i, cmd in enumerate(command):
                    if cmd == "--model-id":
                        model_id = command[i + 1]
                        break
                else:
                    raise ValueError(f"Could not find model ID in {command!r}")
                worker_model_names.add(model_name)
                worker = Worker(
                    hostname=service_spec["container_name"],
                    port=80,
                    model_name=model_name,
                    model_id=model_id,
                    status="down",
                )
                worker.audit()
                self.workers.append(worker)
        if len(worker_model_names) != len(self.workers):
            raise ValueError("Model names must be unique.")

    def get_worker(self, model_name: str) -> Worker:
        """Get a worker by model name."""
        for worker in self.workers:
            if worker.model_name == model_name:
                if worker.status == "down":
                    # This is an unfortunate case where the worker was up when the
                    # two models were chosen, but went down before the request
                    # completed. We just raise a 500 internal error and have the
                    # user try again. This won't be common.
                    raise RuntimeError(f"The worker with model name {model_name} is down.")
                return worker
        raise ValueError(f"Worker with model name {model_name} does not exist.")

    def choose_two(self) -> tuple[Worker, Worker]:
        """Choose two different workers.

        This is a good place to apply the Strategy pattern when we want to
        implement different strategies for choosing workers.
        """
        live_workers = [worker for worker in self.workers if worker.status == "up"]
        if len(live_workers) < 2:
            raise ValueError("Not enough live workers to choose from.")
        worker_a, worker_b = random.sample(live_workers, 2)
        return worker_a, worker_b

    def choose_based_on_preference(self, preference: str) -> tuple[Worker, Worker]:
        """Choose two different workers based on user preference.

        Specifically, if `preference` is `"Random"`, this is equivalent to
        choosing two models at random. Otherwise, if `preference` is a model
        name, this is equivalent to choosing that model and another model at
        random. In that case, the order of the two models is also randomized.
        """
        if preference == "Random":
            return self.choose_two()
        else:
            worker_a = self.get_worker(preference)
            worker_b = random.choice([worker for worker in self.workers if worker != worker_a])
            worker_a, worker_b = random.sample([worker_a, worker_b], 2)
            return worker_a, worker_b

    async def check_workers(self) -> None:
        """Check the status of all workers."""
        await asyncio.gather(*[worker.check_status() for worker in self.workers])
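

# A minimal usage sketch, guarded so it never runs on import. The compose file
# path below is hypothetical, and real TGI workers must be reachable for
# `Worker.audit` and `check_status` to succeed.
if __name__ == "__main__":
    async def _demo() -> None:
        # Parse workers from a (hypothetical) compose file and audit them.
        service = WorkerService(compose_files=["compose-workers.yaml"])
        # Refresh each worker's status concurrently, then pick two live ones.
        await service.check_workers()
        worker_a, worker_b = service.choose_two()
        logger.info("Chosen workers: %s and %s", repr(worker_a), repr(worker_b))

    asyncio.run(_demo())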