"""Gradio dashboard showing the state of validators, as reported through W&B runs."""

import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from zoneinfo import ZoneInfo

import bittensor as bt
import gradio as gr
from packaging import version
from substrateinterface import Keypair
from wandb.apis.importers import wandb
from wandb.apis.public import Run

WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]

REFRESH_RATE = 60 * 5  # 5 minutes
NET_UID = 39
AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600  # 10 minutes
ETA_WARNING_THRESHOLD = 43200  # 12 hours

# Every timestamp shown or compared by the dashboard uses this zone.
_EASTERN = ZoneInfo("America/New_York")

subtensor = bt.subtensor()
metagraph = bt.metagraph(netuid=NET_UID)
bt.logging.disable_logging()

wandb_api = wandb.Api()
demo = gr.Blocks(
    css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}",
    fill_height=True,
    fill_width=True,
)


class Status(Enum):
    """Display state of a validator run; each value is ``(label, color)``."""

    IN_PROGRESS = ("In Progress", "orange")
    DONE = ("Done", "springgreen")
    BROKEN = ("Broken", "red")
    STOPPED = ("Stopped", "red")
    CRASHED = ("Crashed", "red")

    def get_alt_time_text(self) -> str:
        """Return the placeholder shown when no time value can be displayed."""
        return "∞" if self.is_broken() else "N/A"

    # NOTE: this deliberately replaces the standard ``Enum.name`` property with
    # a method returning the human-readable label; callers use ``status.name()``.
    def name(self) -> str:
        """Return the human-readable label."""
        return self.value[0]

    def color(self) -> str:
        """Return the color name associated with this status."""
        return self.value[1]

    def is_broken(self) -> bool:
        """Return True for the BROKEN, STOPPED and CRASHED states."""
        return self in (Status.BROKEN, Status.STOPPED, Status.CRASHED)


@dataclass
class State:
    """Snapshot of one validator, built from its W&B run and the metagraph."""

    status: Status
    hotkey: str  # run.config["hotkey"]
    name: str  # on-chain identity name, falling back to the hotkey
    version: str  # taken from the run's tags
    winner: int | None  # uid of the benchmark entry that carries a "winner" key
    submissions: int  # number of distinct submission uids in the run summary
    benchmarks: int  # number of distinct benchmark uids in the run summary
    invalid: int  # length of the run summary's "invalid" entry
    average_benchmark_time: float  # treated as seconds by the ETA computation
    vtrust: float  # metagraph.validator_trust[uid]
    updated: int  # current block minus metagraph.last_update[uid]


data: dict[int, State] = {}
last_refresh: datetime = datetime.fromtimestamp(0, tz=_EASTERN)


def is_valid_run(run: Run) -> bool:
    """Return True when the run carries the required config keys and a valid signature.

    The signed message is ``"{uid}:{hotkey}:{contest}"`` and it is verified
    against the key pair built from the run's own ``hotkey`` config value.
    """
    config = run.config
    if any(key not in config for key in ("hotkey", "uid", "contest", "signature")):
        return False

    validator_hotkey = config["hotkey"]
    signing_message = f"{config['uid']}:{validator_hotkey}:{config['contest']}"

    try:
        return Keypair(validator_hotkey).verify(signing_message, config["signature"])
    except Exception:
        # Best effort: a malformed hotkey or signature simply means "not valid".
        return False


def get_identity(uid: int) -> str | None:
    """Return the identity name registered for the coldkey of ``uid``, if any."""
    identity = subtensor.substrate.query(
        "SubtensorModule",
        "Identities",
        [metagraph.coldkeys[uid]],
    )
    # The original test was ``identity != None``. Query results may compare by
    # their decoded value rather than by identity (NOTE(review): confirm for the
    # installed substrate-interface version), so check the payload explicitly;
    # this also avoids subscripting a ``None`` payload.
    payload = getattr(identity, "value", None)
    if payload is None:
        return None
    return payload["name"]


def _derive_status(
    run_state: str,
    completed: bool,
    has_submissions: bool,
    has_benchmarks: bool,
    average_benchmark_time: float,
) -> Status:
    """Map a run's W&B state and parsed summary onto a ``Status``."""
    if run_state == "finished":
        return Status.STOPPED
    if run_state == "crashed":
        return Status.CRASHED
    if completed:
        return Status.DONE
    if not has_submissions or (not average_benchmark_time and has_benchmarks):
        return Status.BROKEN
    return Status.IN_PROGRESS


def fetch_wandb_data() -> dict[int, State]:
    """Build a ``State`` per validator uid from the validator runs on W&B.

    Only runs created after the most recent 12:00 (America/New_York) are
    queried. Runs that fail ``is_valid_run`` or whose uid has no validator
    permit in the metagraph are skipped.

    Returns:
        The states keyed by uid, in ascending uid order.
    """
    now = datetime.now(tz=_EASTERN)
    noon = now.replace(hour=12, minute=0, second=0, microsecond=0)
    if now.hour < 12:
        noon -= timedelta(days=1)

    wandb_runs = wandb_api.runs(
        WANDB_RUN_PATH,
        filters={"config.type": "validator", "created_at": {"$gt": str(noon)}},
        order="-created_at",
    )

    # The current block does not depend on the run; query it once.
    block = subtensor.get_current_block()

    states: dict[int, State] = {}
    for run in wandb_runs:
        if not is_valid_run(run):
            continue

        uid = run.config["uid"]
        if uid in states:
            # Runs are requested newest first, so the first accepted run for a
            # uid is the most recent one; do not let an older run replace it.
            continue
        if not metagraph.validator_permit[uid]:
            continue

        winner: int | None = None
        submissions: set[int] = set()
        benchmarks: set[int] = set()
        invalid: list[int] = []
        average_benchmark_time = 0.0

        for key, value in run.summary.items():
            if key == "average_benchmark_time":
                average_benchmark_time = float(value)
            elif key == "invalid":
                invalid = value
            elif key == "submissions":
                submissions.update(int(submission_uid) for submission_uid in value)
            elif key == "benchmarks":
                winner_uid = next(
                    (
                        benchmark_uid
                        for benchmark_uid, benchmark in value.items()
                        if "winner" in benchmark
                    ),
                    None,
                )
                if winner_uid is not None:
                    winner = int(winner_uid)
                benchmarks.update(int(benchmark_uid) for benchmark_uid in value)

        hotkey = run.config["hotkey"]
        states[uid] = State(
            status=_derive_status(
                run.state,
                winner is not None,
                bool(submissions),
                bool(benchmarks),
                average_benchmark_time,
            ),
            hotkey=hotkey,
            name=get_identity(uid) or hotkey,
            # Assumes the second tag is the version behind an 8-character
            # prefix - TODO confirm against the code that tags the runs.
            version=run.tags[1][8:],
            winner=winner,
            submissions=len(submissions),
            benchmarks=len(benchmarks),
            invalid=len(invalid),
            average_benchmark_time=average_benchmark_time,
            vtrust=float(metagraph.validator_trust[uid]),
            updated=int(block - metagraph.last_update[uid]),
        )

    return dict(sorted(states.items()))


def get_latest_version(data: dict[int, State]) -> str:
    """Return the highest version among ``data``, never lower than ``"0.0.0"``."""
    candidates = [version.parse("0.0.0")]
    candidates.extend(version.parse(state.version) for state in data.values())
    return str(max(candidates))


def _span(color: str, text: object) -> str:
    """Wrap ``text`` in an inline-colored span for a ``markdown`` table cell."""
    return f"<span style='color: {color}'>{text}</span>"


def get_data() -> list[tuple]:
    """Return one table row per validator, refreshing the cache when it is stale.

    The module-level ``data`` is re-fetched (after syncing the metagraph) when
    more than ``REFRESH_RATE`` seconds have passed since ``last_refresh``.
    """
    global data
    global last_refresh

    now = datetime.now(tz=_EASTERN)
    if (now - last_refresh).total_seconds() > REFRESH_RATE:
        metagraph.sync(subtensor=subtensor)
        data = fetch_wandb_data()
        last_refresh = now
        print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}")

    # NOTE(review): ``latest_version`` and ``Status.color()`` are not used by
    # any cell below. The version/status/winner/updated cells may have lost
    # their coloring markup along with the cells repaired further down; they
    # are kept as plain text until the intended markup is confirmed.
    latest_version = get_latest_version(data)

    elements: list[tuple] = []
    for uid, state in data.items():
        remaining = state.submissions - (state.benchmarks + state.invalid)
        eta = int(state.average_benchmark_time * remaining)
        time_left = timedelta(seconds=eta)
        alt_text = state.status.get_alt_time_text()

        show_eta = eta > 0 and state.status == Status.IN_PROGRESS
        eta_time = (now + time_left).strftime("%Y-%m-%d %I:%M:%S %p") if show_eta else alt_text
        average_time_text = (
            str(timedelta(seconds=int(state.average_benchmark_time)))
            if state.average_benchmark_time
            else alt_text
        )

        # NOTE(review): the opening markup of the four colored cells below was
        # missing from the file (only "... THRESHOLD else 'springgreen'}'>" and
        # "0.75 else 'red'}'>" survived). The comparisons follow from those
        # fragments; the 'orange' warning color and the 'springgreen' vtrust
        # color are assumptions - confirm against the intended design.
        slow = state.average_benchmark_time > AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD
        average_color = "orange" if slow else "springgreen"
        eta_color = "orange" if eta > ETA_WARNING_THRESHOLD else "springgreen"
        vtrust_color = "springgreen" if state.vtrust > 0.75 else "red"

        elements.append((
            uid,
            state.name,
            f"{state.version}",
            f"{state.status.name()}",
            # ``is not None`` so that a winning uid of 0 is still displayed.
            f"{state.winner if state.winner is not None else 'N/A'}",
            state.benchmarks + state.invalid,
            state.submissions,
            state.invalid,
            _span(average_color, average_time_text),
            _span(eta_color, eta_time),
            _span(eta_color, time_left if show_eta else alt_text),
            _span(vtrust_color, f"{state.vtrust:.4f}"),
            f"{state.updated}",
        ))

    return elements


def main():
    """Build the validator-state table and launch the Gradio app."""
    with demo:
        with gr.Accordion("Validator States"):
            gr.components.Dataframe(
                get_data,
                every=REFRESH_RATE,
                headers=[
                    "UID",
                    "Name",
                    "Version",
                    "Status",
                    "Winner UID",
                    "Tested",
                    "Submissions",
                    "Invalid",
                    "Avg. Benchmark Time",
                    "ETA (Eastern Time)",
                    "ETA Remaining",
                    "VTrust",
                    "Updated",
                ],
                datatype=[
                    "number",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                    "number",
                    "number",
                    "number",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                ],
                elem_id="state-table",
            )

    demo.queue().launch()


if __name__ == '__main__':
    main()