import os from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from zoneinfo import ZoneInfo America/New_York import bittensor as bt import gradio as gr from packaging import version from substrateinterface import Keypair from wandb.apis.importers import wandb from wandb.apis.public import Run WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"] REFRESH_RATE = 30 NET_UID = 39 AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600 # 10 minutes ETA_WARNING_THRESHOLD = 43200 # 12 hours subtensor = bt.subtensor() metagraph = bt.metagraph(netuid=NET_UID) bt.logging.disable_logging() demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True) class Status(Enum): BENCHMARKING = ("Benchmarking", "orange") DONE = ("Done", "springgreen") BROKEN = ("Broken", "red") STOPPED = ("Stopped", "red") CRASHED = ("Crashed", "red") INITIALIZING = ("Initializing", "orange") def get_alt_time_text(self) -> str: return "∞" if self.is_broken() else "N/A" def name(self): return self.value[0] def color(self): return self.value[1] def is_broken(self): return self == Status.BROKEN or self == Status.STOPPED or self == Status.CRASHED or self == Status.INITIALIZING @dataclass class State: status: Status hotkey: str name: str version: str winner: int | None submissions: int benchmarks: int invalid: int average_benchmark_time: float vtrust: float updated: int data: dict[int, State] = {} last_refresh: datetime = datetime.fromtimestamp(0, tz=ZoneInfo("US/Pacific")) def is_valid_run(run: Run): required_config_keys = ["hotkey", "uid", "contest", "signature"] for key in required_config_keys: if key not in run.config: return False uid = run.config["uid"] validator_hotkey = run.config["hotkey"] contest_name = run.config["contest"] signing_message = f"{uid}:{validator_hotkey}:{contest_name}" try: return Keypair(validator_hotkey).verify(signing_message, run.config["signature"]) except Exception: return False def get_identity(uid: int) -> str | None: identity = subtensor.substrate.query('SubtensorModule', 'Identities', [metagraph.coldkeys[uid]]) return identity.value["name"] if identity != None else None def fetch_wandb_data(): wandb_api = wandb.Api() global data data.clear() now = datetime.now(tz=ZoneInfo("US/Pacific")) noon = now.replace(hour=12, minute=0, second=0, microsecond=0) if now.hour < 12: noon -= timedelta(days=1) wandb_runs = wandb_api.runs( WANDB_RUN_PATH, filters={"config.type": "validator", "created_at": {'$gt': str(noon)}}, order="-created_at", ) for run in wandb_runs: if not is_valid_run(run): continue uid = run.config["uid"] if not metagraph.validator_permit[uid]: continue winner = None submissions: set[int] = set() benchmarks: set[int] = set() invalid: list[int] = [] completed = False average_benchmark_time = 0.0 for key, value in run.summary.items(): if key == "average_benchmark_time": average_benchmark_time = float(value) elif key == "invalid": invalid = value elif key == "submissions": for submission_key, submission_value in value.items(): submissions.add(int(submission_key)) elif key == "benchmarks": for benchmark_uid, benchmark in value.items(): if "winner" in benchmark: winner = benchmark_uid completed = True break for benchmark_key, benchmark_value in value.items(): benchmarks.add(int(benchmark_key)) status = Status.BENCHMARKING run_state = run.state if run_state == "finished": status = Status.STOPPED elif run_state == "crashed": status = Status.CRASHED elif completed: status = Status.DONE elif not submissions or (not average_benchmark_time and benchmarks): if 12 <= now.hour < 13: status = Status.INITIALIZING else: status = Status.BROKEN block = subtensor.get_current_block() data[uid] = State( status=status, hotkey=run.config["hotkey"], name=get_identity(uid) or run.config["hotkey"], version=run.tags[1][8:], winner=winner, submissions=len(submissions), benchmarks=len(benchmarks), invalid=len(invalid), average_benchmark_time=average_benchmark_time, vtrust=metagraph.validator_trust[uid], updated=block - metagraph.last_update[uid], ) data = dict(sorted(data.items())) def get_latest_version(data: dict[int, State]) -> str: latest_version = version.parse("0.0.0") for source_validator_uid, state in data.items(): current_version = version.parse(state.version) if current_version > latest_version: latest_version = current_version return str(latest_version) def try_refresh(): global last_refresh now = datetime.now(tz=ZoneInfo("US/Pacific")) if (now - last_refresh).total_seconds() > REFRESH_RATE: print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}") metagraph.sync(subtensor=subtensor) fetch_wandb_data() last_refresh = now def get_data() -> gr.Dataframe: try_refresh() elements: list[tuple] = [] latest_version = get_latest_version(data) now = datetime.now(tz=ZoneInfo("US/Pacific")) for uid, state in data.items(): eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid))) time_left = timedelta(seconds=eta) eta_date = now + time_left eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text() average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else state.status.get_alt_time_text(), elements.append(( uid, state.name, f"{state.version}", f"{state.status.name()}", f"{state.winner if state.winner else 'N/A'}", state.benchmarks + state.invalid, state.submissions, state.invalid, f" AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD else 'springgreen'}'>{average_time_text[0]}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text()}", f" 0.75 else 'red'}'>{state.vtrust:.4f}", f"{state.updated}", )) return gr.Dataframe( elements, headers=["UID", "Name", "Version", "Status", "Winner UID", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "VTrust", "Updated"], datatype=["number", "markdown", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown", "markdown"], label=f"SN{NET_UID} Validator States (Last updated: {last_refresh.strftime('%Y-%m-%d %I:%M:%S %p')} EST)", interactive=False, ) def main(): with demo: table = gr.Dataframe() table.attach_load_event(lambda: get_data(), None) demo.launch() if __name__ == '__main__': main()