import os from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from zoneinfo import ZoneInfo import bittensor as bt import gradio as gr from packaging import version from substrateinterface import Keypair from wandb.apis.importers import wandb from wandb.apis.public import Run WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"] REFRESH_RATE = 30 NET_UID = 39 AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 300 # 5 minutes ETA_WARNING_THRESHOLD = 43200 # 12 hours UPDATED_WARNING_THRESHOLD = 1000 VTRUST_WARNING_THRESHOLD = 0.75 subtensor = bt.subtensor() metagraph = bt.metagraph(netuid=NET_UID) class BenchmarkState(Enum): NOT_STARTED = ("Not Started", "orange") IN_PROGRESS = ("In Progress", "orange") FINISHED = ("Finished", "springgreen") OUT_OF_DATE = ("Out of Date", "red") STOPPED = ("Stopped", "red") CRASHED = ("Crashed", "red") FAILED = ("Failed", "red") def get_alt_time_text(self) -> str: return "∞" if self.is_broken() else "N/A" def name(self): return self.value[0] def color(self): return self.value[1] def is_broken(self): return self == BenchmarkState.NOT_STARTED or self == BenchmarkState.STOPPED or self == BenchmarkState.CRASHED or self == BenchmarkState.FAILED @dataclass class State: benchmarking_state: BenchmarkState hotkey: str name: str version: str submissions: int benchmarks: int invalid: int average_benchmark_time: float vtrust: float updated: int data: dict[int, State] = {} last_refresh: datetime = datetime.fromtimestamp(0, tz=ZoneInfo("US/Pacific")) def is_valid_run(run: Run): required_config_keys = ["hotkey", "uid", "contest", "signature"] for key in required_config_keys: if key not in run.config: return False validator_hotkey = run.config["hotkey"] contest_name = run.config["contest"] signing_message = f"{run.name}:{validator_hotkey}:{contest_name}" try: return Keypair(validator_hotkey).verify(signing_message, run.config["signature"]) except Exception: return False def get_identity(uid: int) -> str | None: identity = subtensor.substrate.query('SubtensorModule', 'Identities', [metagraph.coldkeys[uid]]) return identity.value["name"] if identity != None else None def fetch_wandb_data(): wandb_api = wandb.Api() global data data.clear() now = datetime.now(tz=ZoneInfo("US/Pacific")) noon = now.replace(hour=12, minute=0, second=0, microsecond=0) if now.hour < 12: noon -= timedelta(days=1) wandb_runs = wandb_api.runs( WANDB_RUN_PATH, filters={"config.type": "validator", "created_at": {'$gt': str(noon)}}, order="-created_at", ) for run in wandb_runs: if not is_valid_run(run): continue uid = run.config["uid"] if uid in data: continue if not metagraph.validator_permit[uid]: continue submissions: set[int] = set() benchmarks: set[int] = set() invalid: dict[int, str] = {} average_benchmark_time = 0.0 for key, value in run.summary.items(): if key == "average_benchmark_time": average_benchmark_time = float(value) elif key == "invalid": invalid = dict(value) elif key == "submissions": submissions = {int(uid) for uid in value.keys()} elif key == "benchmarks": benchmarks = {int(uid) for uid in value.keys()} benchmarking_state: BenchmarkState run_state = run.state if run_state == "finished": benchmarking_state = BenchmarkState.STOPPED elif run_state == "crashed": benchmarking_state = BenchmarkState.CRASHED elif run_state == "failed": benchmarking_state = BenchmarkState.FAILED elif "benchmarking_state" in run.summary: benchmarking_state = BenchmarkState[run.summary.get("benchmarking_state")] else: benchmarking_state = BenchmarkState.OUT_OF_DATE for invalid_uid, reason in invalid.items(): if not reason.startswith("Duplicate of UID"): benchmarks.add(invalid_uid) block = subtensor.get_current_block() data[uid] = State( benchmarking_state=benchmarking_state, hotkey=run.config["hotkey"], name=get_identity(uid) or run.config["hotkey"], version=run.tags[1][8:], submissions=len(submissions), benchmarks=len(benchmarks), invalid=len(invalid), average_benchmark_time=average_benchmark_time, vtrust=metagraph.validator_trust[uid], updated=block - metagraph.last_update[uid], ) data = dict(sorted(data.items())) def get_latest_version(data: dict[int, State]) -> str: latest_version = version.parse("0.0.0") for source_validator_uid, state in data.items(): current_version = version.parse(state.version) if current_version > latest_version: latest_version = current_version return str(latest_version) def try_refresh(): global last_refresh now = datetime.now(tz=ZoneInfo("US/Pacific")) if (now - last_refresh).total_seconds() > REFRESH_RATE: print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}") metagraph.sync(subtensor=subtensor) fetch_wandb_data() last_refresh = now def get_data() -> gr.Dataframe: try_refresh() elements: list[tuple] = [] latest_version = get_latest_version(data) now = datetime.now(tz=ZoneInfo("US/Pacific")) for uid, state in data.items(): eta = int(state.average_benchmark_time * (state.submissions - state.benchmarks)) time_left = timedelta(seconds=eta) eta_date = now + time_left eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 and state.benchmarking_state == BenchmarkState.IN_PROGRESS else state.benchmarking_state.get_alt_time_text() average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else state.benchmarking_state.get_alt_time_text(), elements.append(( uid, state.name, f"{state.version}", f"{state.benchmarking_state.name()}", state.benchmarks, state.submissions, state.invalid, f" AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD else 'springgreen'}'>{average_time_text[0]}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 and state.benchmarking_state == BenchmarkState.IN_PROGRESS else state.benchmarking_state.get_alt_time_text()}", f" VTRUST_WARNING_THRESHOLD else 'red'}'>{state.vtrust:.4f}", f"{state.updated}", )) return gr.Dataframe( elements, headers=["UID", "Name", "Version", "Status", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Pacific Time)", "ETA Remaining", "VTrust", "Updated"], datatype=["number", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown", "markdown"], label=f"SN{NET_UID} Validator States (Last updated: {last_refresh.strftime('%Y-%m-%d %I:%M:%S %p')} PST)", interactive=False, ) def main(): with gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True) as demo: table = gr.Dataframe() table.attach_load_event(lambda: get_data(), None) demo.launch() if __name__ == '__main__': main()