|
import os |
|
from dataclasses import dataclass |
|
from datetime import datetime, timedelta |
|
from enum import Enum |
|
from zoneinfo import ZoneInfo |
|
|
|
import bittensor as bt |
|
import gradio as gr |
|
from packaging import version |
|
from substrateinterface import Keypair |
|
from wandb.apis.importers import wandb |
|
from wandb.apis.public import Run |
|
|
|
WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"] |
|
|
|
REFRESH_RATE = 60 * 5 |
|
NET_UID = 39 |
|
AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600 |
|
ETA_WARNING_THRESHOLD = 43200 |
|
|
|
subtensor = bt.subtensor() |
|
metagraph = bt.metagraph(netuid=NET_UID) |
|
bt.logging.disable_logging() |
|
|
|
wandb_api = wandb.Api() |
|
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True) |
|
|
|
|
|
class Status(Enum): |
|
IN_PROGRESS = ("In Progress", "orange") |
|
DONE = ("Done", "springgreen") |
|
BROKEN = ("Broken", "red") |
|
STOPPED = ("Stopped", "red") |
|
CRASHED = ("Crashed", "red") |
|
|
|
def get_alt_time_text(self) -> str: |
|
return "β" if self.is_broken() else "N/A" |
|
|
|
def name(self): |
|
return self.value[0] |
|
|
|
def color(self): |
|
return self.value[1] |
|
|
|
def is_broken(self): |
|
return self == Status.BROKEN or self == Status.STOPPED or self == Status.CRASHED |
|
|
|
|
|
@dataclass |
|
class State: |
|
status: Status |
|
hotkey: str |
|
name: str |
|
version: str |
|
winner: int | None |
|
submissions: int |
|
benchmarks: int |
|
invalid: int |
|
average_benchmark_time: float |
|
vtrust: float |
|
updated: int |
|
|
|
|
|
data: dict[int, State] = {} |
|
last_refresh: datetime = datetime.fromtimestamp(0, tz=ZoneInfo("America/New_York")) |
|
|
|
|
|
def is_valid_run(run: Run): |
|
required_config_keys = ["hotkey", "uid", "contest", "signature"] |
|
|
|
for key in required_config_keys: |
|
if key not in run.config: |
|
return False |
|
|
|
uid = run.config["uid"] |
|
validator_hotkey = run.config["hotkey"] |
|
contest_name = run.config["contest"] |
|
|
|
signing_message = f"{uid}:{validator_hotkey}:{contest_name}" |
|
|
|
try: |
|
return Keypair(validator_hotkey).verify(signing_message, run.config["signature"]) |
|
except Exception: |
|
return False |
|
|
|
|
|
def get_identity(uid: int) -> str | None: |
|
identity = subtensor.substrate.query('SubtensorModule', 'Identities', [metagraph.coldkeys[uid]]) |
|
return identity.value["name"] if identity != None else None |
|
|
|
|
|
def fetch_wandb_data() -> dict[int, State]: |
|
data: dict[int, State] = {} |
|
|
|
now = datetime.now(tz=ZoneInfo("America/New_York")) |
|
noon = now.replace(hour=12, minute=0, second=0, microsecond=0) |
|
if now.hour < 12: |
|
noon -= timedelta(days=1) |
|
|
|
wandb_runs = wandb_api.runs( |
|
WANDB_RUN_PATH, |
|
filters={"config.type": "validator", "created_at": {'$gt': str(noon)}}, |
|
order="-created_at", |
|
) |
|
|
|
for run in wandb_runs: |
|
if not is_valid_run(run): |
|
continue |
|
|
|
uid = run.config["uid"] |
|
if not metagraph.validator_permit[uid]: |
|
continue |
|
|
|
winner = None |
|
submissions: set[int] = set() |
|
benchmarks: set[int] = set() |
|
invalid: list[int] = [] |
|
completed = False |
|
average_benchmark_time = 0.0 |
|
|
|
for key, value in run.summary.items(): |
|
if key == "average_benchmark_time": |
|
average_benchmark_time = float(value) |
|
|
|
elif key == "invalid": |
|
invalid = value |
|
|
|
elif key == "submissions": |
|
for submission_key, submission_value in value.items(): |
|
submissions.add(int(submission_key)) |
|
|
|
elif key == "benchmarks": |
|
for benchmark_uid, benchmark in value.items(): |
|
if "winner" in benchmark: |
|
winner = benchmark_uid |
|
completed = True |
|
break |
|
|
|
for benchmark_key, benchmark_value in value.items(): |
|
benchmarks.add(int(benchmark_key)) |
|
|
|
status = Status.IN_PROGRESS |
|
run_state = run.state |
|
if run_state == "finished": |
|
status = Status.STOPPED |
|
elif run_state == "crashed": |
|
status = Status.CRASHED |
|
elif completed: |
|
status = Status.DONE |
|
elif not submissions or (not average_benchmark_time and benchmarks): |
|
status = Status.BROKEN |
|
|
|
block = subtensor.get_current_block() |
|
data[uid] = State( |
|
status=status, |
|
hotkey=run.config["hotkey"], |
|
name=get_identity(uid) or run.config["hotkey"], |
|
version=run.tags[1][8:], |
|
winner=winner, |
|
submissions=len(submissions), |
|
benchmarks=len(benchmarks), |
|
invalid=len(invalid), |
|
average_benchmark_time=average_benchmark_time, |
|
vtrust=metagraph.validator_trust[uid], |
|
updated=block - metagraph.last_update[uid], |
|
) |
|
|
|
return dict(sorted(data.items())) |
|
|
|
|
|
def get_latest_version(data: dict[int, State]) -> str: |
|
latest_version = version.parse("0.0.0") |
|
for source_validator_uid, state in data.items(): |
|
current_version = version.parse(state.version) |
|
if current_version > latest_version: |
|
latest_version = current_version |
|
return str(latest_version) |
|
|
|
|
|
def get_data() -> gr.Dataframe: |
|
global data |
|
global last_refresh |
|
now = datetime.now(tz=ZoneInfo("America/New_York")) |
|
|
|
if (now - last_refresh).total_seconds() > REFRESH_RATE: |
|
metagraph.sync(subtensor=subtensor) |
|
data = fetch_wandb_data() |
|
last_refresh = now |
|
print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}") |
|
|
|
elements: list[tuple] = [] |
|
latest_version = get_latest_version(data) |
|
|
|
for uid, state in data.items(): |
|
eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid))) |
|
time_left = timedelta(seconds=eta) |
|
eta_date = now + time_left |
|
eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 and state.status == Status.IN_PROGRESS else state.status.get_alt_time_text() |
|
|
|
average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else state.status.get_alt_time_text(), |
|
|
|
elements.append(( |
|
uid, |
|
state.name, |
|
f"<span style='color: {'springgreen' if state.version == latest_version else 'red'}'>{state.version}</span>", |
|
f"<span style='color: {state.status.color()}'>{state.status.name()}</span>", |
|
f"<span style='color: {'springgreen' if state.winner else 'orange'}'>{state.winner if state.winner else 'N/A'}</span>", |
|
state.benchmarks + state.invalid, |
|
state.submissions, |
|
state.invalid, |
|
f"<span style='color: {'orange' if state.average_benchmark_time > AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD else 'springgreen'}'>{average_time_text[0]}</span>", |
|
f"<span style='color: {'orange' if eta > ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}</span>", |
|
f"<span style='color: {'orange' if eta > ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 and state.status == Status.IN_PROGRESS else state.status.get_alt_time_text()}</span>", |
|
f"<span style='color: {'springgreen' if state.vtrust > 0.75 else 'red'}'>{state.vtrust:.4f}</span>", |
|
f"<span style='color: {'springgreen' if state.updated < 1000 else 'red'}'>{state.updated}</span>", |
|
)) |
|
|
|
return gr.Dataframe( |
|
elements, |
|
headers=["UID", "Name", "Version", "Status", "Winner UID", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "VTrust", "Updated"], |
|
datatype=["number", "markdown", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown", "markdown"], |
|
label=f"SN{NET_UID} Validator States (Last updated: {last_refresh.strftime('%Y-%m-%d %I:%M:%S %p')} EST)", |
|
interactive=False, |
|
) |
|
|
|
|
|
def main(): |
|
with demo: |
|
table = get_data() |
|
table.attach_load_event(lambda _: get_data(), REFRESH_RATE, [table]) |
|
demo.queue().launch() |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |
|
|