"""Gradio dashboard that shows validator benchmarking progress read from Weights & Biases.

Validator runs created during the last day are fetched from the project named
by the ``WANDB_RUN_PATH`` environment variable, reduced to one ``State`` per
validator UID, and rendered as a table that is rebuilt every ``REFRESH_RATE``
seconds.
"""
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from time import sleep
from zoneinfo import ZoneInfo

import gradio as gr
from packaging import version
from substrateinterface import Keypair
# NOTE(review): this import path looks like an IDE auto-import accident; the
# documented way to reach ``wandb.Api`` is a plain ``import wandb``. Kept as-is
# so the module's dependency lines are unchanged - confirm and simplify.
from wandb.apis.importers import wandb
from wandb.apis.public import Run

# Display names for validator hotkeys; unknown hotkeys are shown verbatim.
HOTKEYS_TO_KNOWN_VALIDATORS: dict[str, str] = {
    "5GKH9FPPnWSUoeeTJp19wVtd84XqFW4pyK2ijV2GsFbhTrP1": "Taostats & Corcel",
    "5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3": "Openτensor Foundaτion",
    "5FFApaS75bv5pJHfAp2FVLBj9ZaXuFDjEypsaBNc1wCfe52v": "RoundTable21",
    "5HEo565WAy4Dbq3Sv271SAi7syBSofyfhhwRNjFNSM2gP9M2": "Foundry",
    "5HK5tp6t2S59DywmHRWPBVJeJ86T61KjurYqeooqj8sREpeN": "Bittensor Guru",
    "5GP7c3fFazW9GXK8Up3qgu2DJBk8inu4aK9TZy3RuoSWVCMi": "Datura",
    "5EhvL1FVkQPpMjZX4MAADcW42i3xPSF1KiCpuaxTYVr28sux": "TAO-Validator.com",
    "5E4z3h9yVhmQyCFWNbY9BPpwhx4xFiPwq3eeqmBgVF6KULde": "Tensorplex Labs",
    "5Hb63SvXBXqZ8zw6mwW1A39fHdqUrJvohXgepyhp2jgWedSB": "Miner's Union Validator",
    "5CXRfP2ekFhe62r7q3vppRajJmGhTi7vwvb2yr79jveZ282w": "Rizzo",
    "5CVS9d1NcQyWKUyadLevwGxg6LgBcF9Lik6NSnbe5q59jwhE": "Ary van der Touw",
    "5Fq5v71D4LX8Db1xsmRSy6udQThcZ8sFDqxQFwnUZ1BuqY5A": "NorthTensor",
    "5CsvRJXuR955WojnGMdok1hbhffZyB4N5ocrv82f3p5A2zVp": "Owl Ventures",
    "5HNQURvmjjYhTSksi8Wfsw676b4owGwfLR2BFAQzG7H3HhYf": "Neural Internet",
    "5DvTpiniW9s3APmHRYn8FroUWyfnLtrsid5Mtn5EwMXHN2ed": "FirstTensor.com",
}

WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]

REFRESH_RATE = 60 * 5  # 5 minutes
AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600  # 10 minutes
ETA_WARNING_THRESHOLD = 43200  # 12 hours

# Shown in the version column when a run does not carry a usable version tag.
UNKNOWN_VERSION = "unknown"


class Status(Enum):
    """Validator progress state; each value is a ``(label, CSS colour)`` pair."""

    IN_PROGRESS = ("In Progress", "orange")
    DONE = ("Done", "springgreen")
    BROKEN = ("Broken", "red")

    def get_alt_time_text(self) -> str:
        """Return the placeholder shown when no time value can be displayed."""
        return "∞" if self == Status.BROKEN else "N/A"

    # NOTE: ``name`` deliberately stays a method (it replaces Enum's ``name``
    # attribute) because callers invoke it as ``status.name()``.
    def name(self):
        """Return the human-readable label of this status."""
        return self.value[0]

    def color(self):
        """Return the CSS colour associated with this status."""
        return self.value[1]


@dataclass
class State:
    """Summary of one validator run as displayed in the table."""

    status: Status
    hotkey: str
    version: str
    submissions: int  # number of distinct submission keys in the run summary
    benchmarks: int  # number of distinct benchmark keys in the run summary
    invalid: int  # length of the run summary's "invalid" entry
    average_benchmark_time: float  # run summary value; 0.0 when absent


wandb_api = wandb.Api()
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True)
data: dict[int, State] = {}


def is_valid_run(run: Run):
    """Return whether ``run`` carries a config signed by its validator hotkey.

    The run config must contain ``hotkey``, ``uid``, ``contest`` and
    ``signature``; the signature is verified against the message
    ``"{uid}:{hotkey}:{contest}"``. Any verification error counts as invalid.
    """
    required_config_keys = ["hotkey", "uid", "contest", "signature"]
    if not all(key in run.config for key in required_config_keys):
        return False

    uid = run.config["uid"]
    validator_hotkey = run.config["hotkey"]
    contest_name = run.config["contest"]
    signing_message = f"{uid}:{validator_hotkey}:{contest_name}"

    try:
        return Keypair(validator_hotkey).verify(signing_message, run.config["signature"])
    except Exception:
        # Best effort by design: a malformed hotkey or signature simply means
        # the run is not trusted.
        return False


def _version_from_tags(tags) -> str:
    """Extract the version string from a run's tags.

    The version is read from the second tag with its first eight characters
    dropped. Runs with fewer than two tags yield ``UNKNOWN_VERSION`` instead
    of raising ``IndexError`` and aborting the whole refresh.
    """
    # NOTE(review): presumably the second tag looks like an 8-character prefix
    # followed by the version number - confirm against the validator code.
    if len(tags) < 2:
        return UNKNOWN_VERSION
    return tags[1][8:]


def fetch_wandb_data():
    """Fetch validator runs from the last day and update the global ``data``.

    Runs are requested newest first and each valid run overwrites the entry
    for its validator UID. Entries from earlier fetches are kept, and the
    mapping is finally re-sorted by UID.
    """
    now = datetime.now(tz=ZoneInfo("America/New_York"))

    global data
    wandb_runs = wandb_api.runs(
        WANDB_RUN_PATH,
        filters={"config.type": "validator", "created_at": {'$gt': str(now - timedelta(days=1))}},
        order="-created_at",
    )

    for run in wandb_runs:
        if not is_valid_run(run):
            continue

        submissions: set[int] = set()
        benchmarks: set[int] = set()
        invalid: list[int] = []
        completed = False
        average_benchmark_time = 0.0

        for key, value in run.summary.items():
            if key == "average_benchmark_time":
                average_benchmark_time = float(value)
            elif key == "invalid":
                invalid = value
            elif key == "submissions":
                submissions.update(int(submission_key) for submission_key, _ in value.items())
            elif key == "benchmarks":
                # A benchmark entry containing "winner" marks the run as finished.
                if any("winner" in benchmark for benchmark in dict(value).values()):
                    completed = True
                benchmarks.update(int(benchmark_key) for benchmark_key, _ in value.items())

        source_validator_uid = run.config["uid"]

        status = Status.IN_PROGRESS
        if completed:
            status = Status.DONE
        elif not submissions or (not average_benchmark_time and benchmarks):
            status = Status.BROKEN

        data[source_validator_uid] = State(
            status=status,
            hotkey=run.config["hotkey"],
            version=_version_from_tags(run.tags),
            submissions=len(submissions),
            benchmarks=len(benchmarks),
            invalid=len(invalid),
            average_benchmark_time=average_benchmark_time,
        )

    data = dict(sorted(data.items()))


def get_latest_version() -> str:
    """Return the highest version among all known validators as a string.

    Returns ``"0.0.0"`` when no validator reports a parseable version.
    """
    latest_version = version.parse("0.0.0")
    for state in data.values():
        try:
            current_version = version.parse(state.version)
        except version.InvalidVersion:
            # One malformed version tag must not take down the dashboard.
            continue
        if current_version > latest_version:
            latest_version = current_version

    return str(latest_version)


def _is_latest_version(candidate: str, latest: str) -> bool:
    """Return whether ``candidate`` denotes the same version as ``latest``."""
    try:
        return version.parse(candidate) == version.parse(latest)
    except version.InvalidVersion:
        return False


def _colored(text, color: str) -> str:
    """Wrap ``text`` in a ``<span>`` with the given CSS colour for markdown cells."""
    return f"<span style='color: {color}'>{text}</span>"


def refresh():
    """Re-fetch the validator data and rebuild the table inside ``demo``."""
    fetch_wandb_data()

    demo.clear()

    now = datetime.now(tz=ZoneInfo("America/New_York"))
    # %Z yields EST or EDT as appropriate; the label used to be a fixed "EST",
    # which is wrong while daylight saving time is in effect.
    last_updated = now.strftime('%Y-%m-%d %I:%M:%S %p %Z')

    with demo:
        with gr.Accordion(f"Validator States (Last updated: {last_updated})"):
            elements: list[tuple] = []
            latest_version = get_latest_version()

            for source_validator_uid, state in data.items():
                alt_time_text = state.status.get_alt_time_text()

                # Remaining work = submissions not yet benchmarked or rejected.
                eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid)))
                time_left = timedelta(seconds=eta)
                has_eta = eta > 0 and state.status == Status.IN_PROGRESS
                eta_time = (now + time_left).strftime("%Y-%m-%d %I:%M:%S %p") if has_eta else alt_time_text

                if state.average_benchmark_time:
                    average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}"
                else:
                    average_time_text = alt_time_text

                # NOTE(review): the <span> markup of the coloured cells was lost
                # in the previous revision (the strings were truncated up to the
                # first '>'). The 'springgreen' branch and both thresholds
                # survived; 'orange' as the warning colour, 'red' for outdated
                # versions and the status colouring are reconstructed - confirm.
                version_color = "springgreen" if _is_latest_version(state.version, latest_version) else "red"
                average_time_color = (
                    "orange"
                    if state.average_benchmark_time > AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD
                    else "springgreen"
                )
                eta_color = "orange" if eta > ETA_WARNING_THRESHOLD else "springgreen"

                elements.append((
                    source_validator_uid,
                    HOTKEYS_TO_KNOWN_VALIDATORS.get(state.hotkey, state.hotkey),
                    _colored(state.version, version_color),
                    _colored(state.status.name(), state.status.color()),
                    state.benchmarks + state.invalid,
                    state.submissions,
                    state.invalid,
                    _colored(average_time_text, average_time_color),
                    _colored(eta_time, eta_color),
                    _colored(time_left if has_eta else alt_time_text, eta_color),
                ))

            gr.components.Dataframe(
                elements,
                headers=["UID", "Name", "Version", "Status", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining"],
                datatype=["number", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown"],
                elem_id="state-table",
            )


def main():
    """Launch the dashboard and refresh it every ``REFRESH_RATE`` seconds, forever."""
    refresh()
    demo.launch(prevent_thread_lock=True)

    while True:
        sleep(REFRESH_RATE)

        now = datetime.now(tz=ZoneInfo("America/New_York"))
        print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}")

        try:
            refresh()
        except Exception as error:
            # Top-level boundary: a transient failure (e.g. the W&B API being
            # unreachable) must not end the polling loop; try again next cycle.
            print(f"Refresh failed, retrying in {REFRESH_RATE} seconds: {error!r}")


if __name__ == '__main__':
    main()