import os from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from zoneinfo import ZoneInfo import bittensor as bt import gradio as gr from packaging import version from substrateinterface import Keypair from wandb.apis.importers import wandb from wandb.apis.public import Run WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"] REFRESH_RATE = 30 NET_UID = 39 AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600 # 10 minutes ETA_WARNING_THRESHOLD = 43200 # 12 hours subtensor = bt.subtensor() metagraph = bt.metagraph(netuid=NET_UID) bt.logging.disable_logging() demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True) class Status(Enum): BENCHMARKING = ("Benchmarking", "orange") DONE = ("Done", "springgreen") BROKEN = ("Broken", "red") STOPPED = ("Stopped", "red") CRASHED = ("Crashed", "red") INITIALIZING = ("Initializing", "orange") def get_alt_time_text(self) -> str: return "∞" if self.is_broken() else "N/A" def name(self): return self.value[0] def color(self): return self.value[1] def is_broken(self): return self == Status.BROKEN or self == Status.STOPPED or self == Status.CRASHED or self == Status.INITIALIZING @dataclass class State: status: Status hotkey: str name: str version: str winner: int | None submissions: int benchmarks: int invalid: int average_benchmark_time: float vtrust: float updated: int data: dict[int, State] = {} last_refresh: datetime = datetime.fromtimestamp(0, tz=ZoneInfo("US/Pacific")) def is_valid_run(run: Run): required_config_keys = ["hotkey", "uid", "contest", "signature"] for key in required_config_keys: if key not in run.config: return False uid = run.config["uid"] validator_hotkey = run.config["hotkey"] contest_name = run.config["contest"] signing_message = f"{uid}:{validator_hotkey}:{contest_name}" try: return Keypair(validator_hotkey).verify(signing_message, run.config["signature"]) except Exception: return False def get_identity(uid: int) -> str | None: identity = subtensor.substrate.query('SubtensorModule', 'Identities', [metagraph.coldkeys[uid]]) return identity.value["name"] if identity != None else None def fetch_wandb_data(): wandb_api = wandb.Api() global data data.clear() now = datetime.now(tz=ZoneInfo("America/New_York")) noon = now.replace(hour=12, minute=0, second=0, microsecond=0) if now.hour < 12: noon -= timedelta(days=1) wandb_runs = wandb_api.runs( WANDB_RUN_PATH, filters={"config.type": "validator", "created_at": {'$gt': str(noon)}}, order="-created_at", ) for run in wandb_runs: if not is_valid_run(run): continue uid = run.config["uid"] if not metagraph.validator_permit[uid]: continue winner = None submissions: set[int] = set() benchmarks: set[int] = set() invalid: list[int] = [] completed = False average_benchmark_time = 0.0 for key, value in run.summary.items(): if key == "average_benchmark_time": average_benchmark_time = float(value) elif key == "invalid": invalid = value elif key == "submissions": for submission_key, submission_value in value.items(): submissions.add(int(submission_key)) elif key == "benchmarks": for benchmark_uid, benchmark in value.items(): if "winner" in benchmark: winner = benchmark_uid completed = True break for benchmark_key, benchmark_value in value.items(): benchmarks.add(int(benchmark_key)) status = Status.BENCHMARKING run_state = run.state if run_state == "finished": status = Status.STOPPED elif run_state == "crashed": status = Status.CRASHED elif completed: status = Status.DONE elif not submissions or (not average_benchmark_time and benchmarks): if 12 <= now.hour < 13: status = Status.INITIALIZING else: status = Status.BROKEN block = subtensor.get_current_block() data[uid] = State( status=status, hotkey=run.config["hotkey"], name=get_identity(uid) or run.config["hotkey"], version=run.tags[1][8:], winner=winner, submissions=len(submissions), benchmarks=len(benchmarks), invalid=len(invalid), average_benchmark_time=average_benchmark_time, vtrust=metagraph.validator_trust[uid], updated=block - metagraph.last_update[uid], ) data = dict(sorted(data.items())) def get_latest_version(data: dict[int, State]) -> str: latest_version = version.parse("0.0.0") for source_validator_uid, state in data.items(): current_version = version.parse(state.version) if current_version > latest_version: latest_version = current_version return str(latest_version) def try_refresh(): global last_refresh now = datetime.now(tz=ZoneInfo("America/New_York")) if (now - last_refresh).total_seconds() > REFRESH_RATE: print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}") metagraph.sync(subtensor=subtensor) fetch_wandb_data() last_refresh = now def get_data() -> gr.Dataframe: try_refresh() elements: list[tuple] = [] latest_version = get_latest_version(data) now = datetime.now(tz=ZoneInfo("America/New_York")) for uid, state in data.items(): eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid))) time_left = timedelta(seconds=eta) eta_date = now + time_left eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text() average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else state.status.get_alt_time_text(), elements.append(( uid, state.name, f"{state.version}", f"{state.status.name()}", f"{state.winner if state.winner else 'N/A'}", state.benchmarks + state.invalid, state.submissions, state.invalid, f" AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD else 'springgreen'}'>{average_time_text[0]}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text()}", f" 0.75 else 'red'}'>{state.vtrust:.4f}", f"{state.updated}", )) return gr.Dataframe( elements, headers=["UID", "Name", "Version", "Status", "Winner UID", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "VTrust", "Updated"], datatype=["number", "markdown", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown", "markdown"], label=f"SN{NET_UID} Validator States (Last updated: {last_refresh.strftime('%Y-%m-%d %I:%M:%S %p')} EST)", interactive=False, ) def main(): with demo: table = gr.Dataframe() table.attach_load_event(lambda: get_data(), None) demo.launch() if __name__ == '__main__': main()