import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from time import sleep
from zoneinfo import ZoneInfo

import bittensor as bt
import gradio as gr
from packaging import version
from substrateinterface import Keypair
from wandb.apis.importers import wandb
from wandb.apis.public import Run

WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]
REFRESH_RATE = 60 * 5  # 5 minutes
NET_UID = 39
AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600  # 10 minutes
ETA_WARNING_THRESHOLD = 43200  # 12 hours

# All wall-clock times shown on the dashboard use this zone.
_EASTERN_TZ = ZoneInfo("America/New_York")

subtensor = bt.subtensor()
metagraph = bt.metagraph(netuid=NET_UID)
bt.logging.disable_logging()

wandb_api = wandb.Api()
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True)


class Status(Enum):
    """Display state of a validator run; each value is ``(label, css_color)``."""

    IN_PROGRESS = ("In Progress", "orange")
    DONE = ("Done", "springgreen")
    BROKEN = ("Broken", "red")
    STOPPED = ("Stopped", "red")
    CRASHED = ("Crashed", "red")

    def get_alt_time_text(self) -> str:
        """Return the placeholder shown when no time estimate is available."""
        return "∞" if self.is_broken() else "N/A"

    # NOTE: this deliberately shadows Enum's built-in ``name`` attribute;
    # it stays a method because callers invoke ``status.name()``.
    def name(self):
        """Return the human-readable label."""
        return self.value[0]

    def color(self):
        """Return the CSS colour associated with this status."""
        return self.value[1]

    def is_broken(self):
        """Return True for the states in which the run will not make progress."""
        return self in (Status.BROKEN, Status.STOPPED, Status.CRASHED)


@dataclass
class State:
    """Snapshot of one validator's run, as extracted from its wandb summary."""

    status: Status
    hotkey: str
    version: str
    winner: int | None
    submissions: int
    benchmarks: int
    invalid: int
    average_benchmark_time: float


# Module-level caches, all keyed by validator uid and rebuilt by refresh().
data: dict[int, State] = {}
validator_identities: dict[int, str] = {}
validator_vtrust: dict[int, float] = {}
validator_updated: dict[int, int] = {}


def is_valid_run(run: Run):
    """Return True when the run carries the required config keys and a valid signature.

    The signed message is ``"{uid}:{hotkey}:{contest}"`` and is verified
    against the hotkey stored in the run config. Any verification error is
    treated as an invalid run.
    """
    required_config_keys = ["hotkey", "uid", "contest", "signature"]
    if any(key not in run.config for key in required_config_keys):
        return False

    uid = run.config["uid"]
    validator_hotkey = run.config["hotkey"]
    contest_name = run.config["contest"]
    signing_message = f"{uid}:{validator_hotkey}:{contest_name}"

    try:
        return Keypair(validator_hotkey).verify(signing_message, run.config["signature"])
    except Exception:
        # Malformed hotkeys/signatures are simply not valid runs.
        return False


def _resolve_status(
    run_state: str,
    completed: bool,
    submissions: set[int],
    benchmarks: set[int],
    average_benchmark_time: float,
) -> Status:
    """Map the wandb run state and parsed summary onto a dashboard Status."""
    if run_state == "finished":
        return Status.STOPPED
    if run_state == "crashed":
        return Status.CRASHED
    if completed:
        return Status.DONE
    if not submissions or (not average_benchmark_time and benchmarks):
        return Status.BROKEN
    return Status.IN_PROGRESS


def fetch_wandb_data():
    """Refresh ``data`` from the validator runs created since the last noon (Eastern).

    Runs are requested newest-first; only the newest valid run of each
    validator is used. Entries for validators without a matching run are left
    untouched. ``data`` is re-sorted by uid at the end.
    """
    global data

    now = datetime.now(tz=_EASTERN_TZ)
    noon = now.replace(hour=12, minute=0, second=0, microsecond=0)
    if now.hour < 12:
        noon -= timedelta(days=1)

    wandb_runs = wandb_api.runs(
        WANDB_RUN_PATH,
        filters={"config.type": "validator", "created_at": {"$gt": str(noon)}},
        order="-created_at",
    )

    seen_uids: set[int] = set()
    for run in wandb_runs:
        if not is_valid_run(run):
            continue

        uid = run.config["uid"]
        # The results are ordered newest-first, so a uid that has already
        # been handled must not be overwritten by one of its older runs.
        if uid in seen_uids:
            continue
        # The signature only proves that the claimed hotkey signed the claimed
        # uid; make sure that uid exists and really belongs to that hotkey.
        if not isinstance(uid, int) or not 0 <= uid < len(metagraph.hotkeys):
            continue
        if metagraph.hotkeys[uid] != run.config["hotkey"]:
            continue
        if not metagraph.validator_permit[uid]:
            continue
        seen_uids.add(uid)

        winner: int | None = None
        submissions: set[int] = set()
        benchmarks: set[int] = set()
        invalid: list[int] = []
        completed = False
        average_benchmark_time = 0.0

        for key, value in run.summary.items():
            if key == "average_benchmark_time":
                average_benchmark_time = float(value)
            elif key == "invalid":
                invalid = value
            elif key == "submissions":
                submissions.update(int(submission_key) for submission_key in value)
            elif key == "benchmarks":
                for benchmark_uid, benchmark in value.items():
                    benchmarks.add(int(benchmark_uid))
                    # The first benchmark flagged as winner marks the run done.
                    if not completed and "winner" in benchmark:
                        winner = int(benchmark_uid)
                        completed = True

        data[uid] = State(
            status=_resolve_status(run.state, completed, submissions, benchmarks, average_benchmark_time),
            hotkey=run.config["hotkey"],
            # presumably the second tag looks like "version-X.Y.Z" (8-char prefix) — TODO confirm
            version=run.tags[1][8:],
            winner=winner,
            submissions=len(submissions),
            benchmarks=len(benchmarks),
            invalid=len(invalid),
            average_benchmark_time=average_benchmark_time,
        )

    data = dict(sorted(data.items()))


def fetch_identities():
    """Rebuild ``validator_identities`` with the on-chain identity name of each known uid."""
    validator_identities.clear()
    for uid in data:
        identity = subtensor.substrate.query("SubtensorModule", "Identities", [metagraph.coldkeys[uid]])
        # The query result wraps the decoded value; presumably an unset
        # identity decodes to None, so check the wrapped value explicitly.
        info = getattr(identity, "value", None)
        if info is not None:
            validator_identities[uid] = info["name"]


def fetch_metagraph_data():
    """Rebuild the vtrust and blocks-since-last-update caches for each known uid."""
    validator_vtrust.clear()
    validator_updated.clear()

    block = subtensor.get_current_block()
    for uid in data:
        validator_vtrust[uid] = metagraph.validator_trust[uid]
        validator_updated[uid] = block - metagraph.last_update[uid]


def get_validator_name(validator_uid: int) -> str:
    """Return the identity name for the uid, falling back to its hotkey."""
    if validator_uid in validator_identities:
        return validator_identities[validator_uid]
    return metagraph.hotkeys[validator_uid]


def get_latest_version() -> str:
    """Return the highest version reported by any validator ("0.0.0" if none)."""
    latest_version = version.parse("0.0.0")
    for state in data.values():
        # max() keeps the first argument on ties, i.e. the earlier version.
        latest_version = max(latest_version, version.parse(state.version))
    return str(latest_version)


def _colored(text, color: str) -> str:
    """Wrap ``text`` in an HTML span with the given CSS colour (for markdown cells)."""
    return f"<span style='color: {color}'>{text}</span>"


def refresh():
    """Re-sync all chain and wandb data and rebuild the dashboard table."""
    metagraph.sync(subtensor=subtensor)
    fetch_wandb_data()
    fetch_identities()
    fetch_metagraph_data()

    demo.clear()

    now = datetime.now(tz=_EASTERN_TZ)
    with demo:
        # %Z yields EST or EDT as appropriate instead of a hard-coded "EST".
        with gr.Accordion(f"Validator States (Last updated: {now.strftime('%Y-%m-%d %I:%M:%S %p %Z')})"):
            elements: list[tuple] = []
            latest_version = version.parse(get_latest_version())

            for uid, state in data.items():
                remaining = state.submissions - (state.benchmarks + state.invalid)
                eta = int(state.average_benchmark_time * remaining)
                time_left = timedelta(seconds=eta)
                alt_text = state.status.get_alt_time_text()

                # An estimate is only meaningful while the run is progressing.
                has_estimate = eta > 0 and state.status == Status.IN_PROGRESS
                eta_time = (now + time_left).strftime("%Y-%m-%d %I:%M:%S %p") if has_estimate else alt_text
                remaining_text = time_left if has_estimate else alt_text

                if state.average_benchmark_time:
                    average_time_text = str(timedelta(seconds=int(state.average_benchmark_time)))
                else:
                    average_time_text = alt_text

                version_color = "springgreen" if version.parse(state.version) == latest_version else "red"
                average_color = (
                    "orange"
                    if state.average_benchmark_time > AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD
                    else "springgreen"
                )
                eta_color = "orange" if eta > ETA_WARNING_THRESHOLD else "springgreen"
                vtrust_color = "springgreen" if validator_vtrust[uid] > 0.75 else "red"

                elements.append((
                    uid,
                    get_validator_name(uid),
                    _colored(state.version, version_color),
                    _colored(state.status.name(), state.status.color()),
                    # Explicit None check so that a winner with uid 0 is still shown.
                    f"{state.winner if state.winner is not None else 'N/A'}",
                    state.benchmarks + state.invalid,
                    state.submissions,
                    state.invalid,
                    _colored(average_time_text, average_color),
                    _colored(eta_time, eta_color),
                    _colored(remaining_text, eta_color),
                    _colored(f"{validator_vtrust[uid]:.4f}", vtrust_color),
                    f"{validator_updated[uid]}",
                ))

            gr.components.Dataframe(
                elements,
                headers=[
                    "UID",
                    "Name",
                    "Version",
                    "Status",
                    "Winner UID",
                    "Tested",
                    "Submissions",
                    "Invalid",
                    "Avg. Benchmark Time",
                    "ETA (Eastern Time)",
                    "ETA Remaining",
                    "VTrust",
                    "Updated",
                ],
                datatype=[
                    "number",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                    "number",
                    "number",
                    "number",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                    "markdown",
                ],
                elem_id="state-table",
            )


def main():
    """Launch the dashboard and rebuild it every ``REFRESH_RATE`` seconds."""
    refresh()
    # prevent_thread_lock keeps launch() from blocking so the loop below runs.
    demo.launch(prevent_thread_lock=True)

    while True:
        sleep(REFRESH_RATE)

        now = datetime.now(tz=_EASTERN_TZ)
        print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}")

        refresh()


if __name__ == '__main__':
    main()