"""Gradio dashboard showing the benchmarking state of validators.

Validator runs are read from Weights & Biases, each run's signature is
verified against the hotkey it reports, and a per-validator progress table
is rendered and rebuilt every ``REFRESH_RATE`` seconds.
"""
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from time import sleep
from zoneinfo import ZoneInfo

import gradio as gr
from substrateinterface import Keypair
from wandb.apis.importers import wandb
from wandb.apis.public import Run

WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]
REFRESH_RATE = 60 * 5  # seconds between two dashboard refreshes

# Timezone used for every timestamp shown on the dashboard.
_DISPLAY_TZ = ZoneInfo("America/New_York")

# Config entries a run must carry before its signature can be checked.
_REQUIRED_CONFIG_KEYS = ("hotkey", "uid", "contest", "signature")


@dataclass
class State:
    """Counters extracted from the summary of one validator run."""

    submissions: int  # number of distinct keys under "submissions*" summary entries
    benchmarks: int  # number of distinct keys under "benchmarks*" summary entries
    invalid: int  # length of the "invalid*" summary entry
    average_benchmark_time: float  # multiplied by the remaining count to get the ETA in seconds


class Status(Enum):
    """Progress status shown in the last column of the table."""

    IN_PROGRESS = "In Progress"
    DONE = "Done"
    BROKEN = "Broken"

    def get_alt_time_text(self):
        """Return the placeholder shown when no time value can be displayed."""
        return "∞" if self == Status.BROKEN else "N/A"


wandb_api = wandb.Api()
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
# Latest known state per validator uid, kept sorted by uid after each fetch.
data: dict[int, State] = {}


def is_valid_run(run: Run):
    """Return whether ``run`` carries a valid validator signature.

    The run config must contain ``hotkey``, ``uid``, ``contest`` and
    ``signature``; the signature must verify for the message
    ``"{uid}:{hotkey}:{contest}"`` under the key pair built from the hotkey.
    Any failure while verifying is treated as "not valid".
    """
    config = run.config
    if any(key not in config for key in _REQUIRED_CONFIG_KEYS):
        return False

    validator_hotkey = config["hotkey"]
    signing_message = f"{config['uid']}:{validator_hotkey}:{config['contest']}"

    try:
        return Keypair(validator_hotkey).verify(signing_message, config["signature"])
    except Exception:
        # Deliberately broad: a malformed hotkey or signature simply means the
        # run cannot be trusted, it must not take the dashboard down.
        return False


def _state_from_summary(summary) -> State:
    """Build a ``State`` from the key/value pairs of a run summary."""
    submissions: set[int] = set()
    benchmarks: set[int] = set()
    invalid = []
    average_benchmark_time = 0.0

    for key, value in summary.items():
        if key.startswith("_"):
            # Underscore-prefixed summary entries are skipped.
            continue

        if key.startswith("average_benchmark_time"):
            average_benchmark_time = float(value)
        elif key.startswith("invalid"):
            invalid = value
        elif key.startswith("submissions"):
            submissions.update(int(entry_key) for entry_key, _ in value.items())
        elif key.startswith("benchmarks"):
            benchmarks.update(int(entry_key) for entry_key, _ in value.items())

    return State(
        submissions=len(submissions),
        benchmarks=len(benchmarks),
        invalid=len(invalid),
        average_benchmark_time=average_benchmark_time,
    )


def fetch_wandb_data():
    """Refresh the module-level ``data`` from validator runs of the last day.

    Runs that fail ``is_valid_run`` are skipped; every other run replaces the
    entry stored under its ``uid``. Entries of validators without a recent
    run are left untouched. ``data`` is re-sorted by uid afterwards.
    """
    global data

    now = datetime.now(tz=_DISPLAY_TZ)
    wandb_runs = wandb_api.runs(
        WANDB_RUN_PATH,
        filters={"config.type": "validator", "created_at": {'$gt': str(now - timedelta(days=1))}},
        order="-created_at",
    )

    # NOTE(review): runs are requested newest first, so when one validator has
    # several runs in the window the oldest one is written last and wins.
    # Confirm whether that is intended.
    for run in wandb_runs:
        if not is_valid_run(run):
            # Skip only this run; returning here would discard every
            # remaining run and leave ``data`` unsorted.
            continue

        data[run.config["uid"]] = _state_from_summary(run.summary)

    data = dict(sorted(data.items()))


def _status_for(state: State) -> Status:
    """Derive the displayed status from a validator's counters."""
    # NOTE(review): a validator with zero submissions and zero results matches
    # the first test and is reported as DONE, not BROKEN - confirm intent.
    if state.benchmarks + state.invalid == state.submissions:
        return Status.DONE
    if not state.submissions or (not state.average_benchmark_time and state.benchmarks):
        return Status.BROKEN
    return Status.IN_PROGRESS


def _build_row(source_validator_uid: int, state: State, now: datetime) -> tuple:
    """Build one table row, in the column order of the Dataframe headers."""
    status = _status_for(state)
    placeholder = status.get_alt_time_text()
    tested = state.benchmarks + state.invalid

    # Estimated seconds left: average time per benchmark times what remains.
    eta = int(state.average_benchmark_time * (state.submissions - tested))
    if eta > 0:
        time_left = timedelta(seconds=eta)
        eta_time = (now + time_left).strftime("%Y-%m-%d %I:%M:%S %p")
        eta_remaining = time_left
    else:
        eta_time = placeholder
        eta_remaining = placeholder

    if state.average_benchmark_time:
        average_text = f"{timedelta(seconds=int(state.average_benchmark_time))}s"
    else:
        average_text = placeholder

    return (
        source_validator_uid,
        tested,
        state.submissions,
        state.invalid,
        average_text,
        eta_time,
        eta_remaining,
        status.value,
    )


def refresh():
    """Fetch fresh data and rebuild the whole dashboard layout."""
    fetch_wandb_data()

    demo.clear()

    now = datetime.now(tz=_DISPLAY_TZ)

    with demo:
        # %Z yields EST or EDT as appropriate instead of a hard-coded "EST".
        with gr.Accordion(f"Validator States (Last update: {now.strftime('%Y-%m-%d %I:%M:%S %p %Z')})"):
            elements: list[tuple] = [
                _build_row(source_validator_uid, state, now)
                for source_validator_uid, state in data.items()
            ]

            # NOTE(review): "Avg. Benchmark Time" is declared as "number" but
            # the rows carry strings for it - confirm the intended datatype.
            gr.components.Dataframe(
                elements,
                headers=["UID", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "Status"],
                datatype=["number", "number", "number", "number", "number", "markdown", "markdown", "markdown"],
                elem_id="state-table",
                interactive=False,
                visible=True,
            )


def main():
    """Launch the dashboard and refresh it forever every ``REFRESH_RATE`` seconds."""
    refresh()
    # Non-blocking launch so that this thread can drive the refresh loop.
    demo.launch(prevent_thread_lock=True)

    while True:
        sleep(REFRESH_RATE)

        now = datetime.now(tz=_DISPLAY_TZ)
        print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}")

        refresh()


if __name__ == '__main__':
    main()