import os from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from time import sleep from zoneinfo import ZoneInfo import gradio as gr from packaging import version from substrateinterface import Keypair from wandb.apis.importers import wandb from wandb.apis.public import Run HOTKEYS_TO_KNOWN_VALIDATORS: dict[str, str] = { "5GKH9FPPnWSUoeeTJp19wVtd84XqFW4pyK2ijV2GsFbhTrP1": "Taostats & Corcel", "5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3": "Openτensor Foundaτion", "5FFApaS75bv5pJHfAp2FVLBj9ZaXuFDjEypsaBNc1wCfe52v": "RoundTable21", "5HEo565WAy4Dbq3Sv271SAi7syBSofyfhhwRNjFNSM2gP9M2": "Foundry", "5HK5tp6t2S59DywmHRWPBVJeJ86T61KjurYqeooqj8sREpeN": "Bittensor Guru", "5GP7c3fFazW9GXK8Up3qgu2DJBk8inu4aK9TZy3RuoSWVCMi": "Datura", "5EhvL1FVkQPpMjZX4MAADcW42i3xPSF1KiCpuaxTYVr28sux": "TAO-Validator.com", "5E4z3h9yVhmQyCFWNbY9BPpwhx4xFiPwq3eeqmBgVF6KULde": "Tensorplex Labs", "5Hb63SvXBXqZ8zw6mwW1A39fHdqUrJvohXgepyhp2jgWedSB": "Miner's Union Validator", "5CXRfP2ekFhe62r7q3vppRajJmGhTi7vwvb2yr79jveZ282w": "Rizzo", "5CVS9d1NcQyWKUyadLevwGxg6LgBcF9Lik6NSnbe5q59jwhE": "Ary van der Touw", "5Fq5v71D4LX8Db1xsmRSy6udQThcZ8sFDqxQFwnUZ1BuqY5A": "NorthTensor", "5CsvRJXuR955WojnGMdok1hbhffZyB4N5ocrv82f3p5A2zVp": "Owl Ventures", "5HNQURvmjjYhTSksi8Wfsw676b4owGwfLR2BFAQzG7H3HhYf": "Neural Internet", "5DvTpiniW9s3APmHRYn8FroUWyfnLtrsid5Mtn5EwMXHN2ed": "FirstTensor.com", } WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"] REFRESH_RATE = 60 * 5 ETA_WARNING_THRESHOLD = 43200 # 12 hours @dataclass class State: hotkey: str version: str submissions: int benchmarks: int invalid: int average_benchmark_time: float # enum class Status(Enum): IN_PROGRESS = ("In Progress", "orange") DONE = ("Done", "springgreen") BROKEN = ("Broken", "red") def get_alt_time_text(self) -> str: return "∞" if self == Status.BROKEN else "N/A" def name(self): return self.value[0] def color(self): return self.value[1] wandb_api = wandb.Api() demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}") data: dict[int, State] = {} def is_valid_run(run: Run): required_config_keys = ["hotkey", "uid", "contest", "signature"] for key in required_config_keys: if key not in run.config: return False uid = run.config["uid"] validator_hotkey = run.config["hotkey"] contest_name = run.config["contest"] signing_message = f"{uid}:{validator_hotkey}:{contest_name}" try: return Keypair(validator_hotkey).verify(signing_message, run.config["signature"]) except Exception: return False def fetch_wandb_data(): now = datetime.now(tz=ZoneInfo("America/New_York")) global data wandb_runs = wandb_api.runs( WANDB_RUN_PATH, filters={"config.type": "validator", "created_at": {'$gt': str(now - timedelta(days=1))}}, order="-created_at", ) for run in wandb_runs: if not is_valid_run(run): continue submissions: set[int] = set() benchmarks: set[int] = set() invalid: list[int] = [] average_benchmark_time = 0.0 for key, value in run.summary.items(): if key == "average_benchmark_time": average_benchmark_time = float(value) elif key == "invalid": invalid = value elif key == "submissions": for submission_key, submission_value in value.items(): submissions.add(int(submission_key)) elif key == "benchmarks": for benchmark_key, benchmark_value in value.items(): benchmarks.add(int(benchmark_key)) source_validator_uid = run.config["uid"] data[source_validator_uid] = State( hotkey=run.config["hotkey"], version=run.tags[1][8:], submissions=len(submissions), benchmarks=len(benchmarks), invalid=len(invalid), average_benchmark_time=average_benchmark_time ) data = dict(sorted(data.items())) def get_latest_version() -> str: latest_version = version.parse("0.0.0") for source_validator_uid, state in data.items(): current_version = version.parse(state.version) if current_version > latest_version: latest_version = current_version return str(latest_version) def refresh(): fetch_wandb_data() demo.clear() now = datetime.now(tz=ZoneInfo("America/New_York")) with demo: with gr.Accordion(f"Validator States (Last updated: {now.strftime('%Y-%m-%d %I:%M:%S %p')} EST)"): elements: list[tuple] = [] latest_version = get_latest_version() for source_validator_uid, state in data.items(): status = Status.IN_PROGRESS if state.benchmarks + state.invalid == state.submissions: status = Status.DONE elif not state.submissions or (not state.average_benchmark_time and state.benchmarks): status = Status.BROKEN eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid))) time_left = timedelta(seconds=eta) eta_date = now + time_left eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 else status.get_alt_time_text() average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else status.get_alt_time_text(), elements.append(( source_validator_uid, HOTKEYS_TO_KNOWN_VALIDATORS.get(state.hotkey, state.hotkey), f"{state.version}", f"{status.name()}", state.benchmarks + state.invalid, state.submissions, state.invalid, f" 600 else 'springgreen'}'>{average_time_text[0]}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}", f" ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 else status.get_alt_time_text()}" )) gr.components.Dataframe( elements, headers=["UID", "Name", "Version", "Status", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining"], datatype=["number", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown"], elem_id="state-table", ) def main(): refresh() demo.launch(prevent_thread_lock=True) while True: sleep(REFRESH_RATE) now = datetime.now(tz=ZoneInfo("America/New_York")) print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}") refresh() if __name__ == '__main__': main()