|
import os |
|
from dataclasses import dataclass |
|
from datetime import datetime, timedelta |
|
from enum import Enum |
|
from time import sleep |
|
from zoneinfo import ZoneInfo |
|
|
|
import gradio as gr |
|
from substrateinterface import Keypair |
|
from wandb.apis.importers import wandb |
|
from wandb.apis.public import Run |
|
|
|
# W&B path whose runs are queried. It must be present in the environment:
# a missing variable raises KeyError as soon as this module is imported.
WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]

# Seconds to sleep between dashboard refreshes (five minutes).
REFRESH_RATE = 5 * 60
|
|
|
|
|
@dataclass
class State:
    """Snapshot of one validator's benchmarking progress.

    One instance is built per validator run and rendered as a single row of
    the dashboard table.
    """

    # Version text sliced out of the run's tags.
    version: str
    # Count of distinct submission keys found in the run summary.
    submissions: int
    # Count of distinct benchmark keys found in the run summary.
    benchmarks: int
    # Length of the run summary's "invalid" entry.
    invalid: int
    # The run summary's "average_benchmark_time" as a float (0.0 when absent);
    # the dashboard treats it as a number of seconds.
    average_benchmark_time: float
|
|
|
|
|
|
|
class Status(Enum):
    """Progress state of a validator, shown in the dashboard's Status column."""

    IN_PROGRESS = "In Progress"
    DONE = "Done"
    BROKEN = "Broken"

    def get_alt_time_text(self):
        """Return the placeholder shown when no time value can be displayed.

        Returns:
            "β" for ``BROKEN``; "N/A" for every other status.
        """
        # NOTE(review): "β" may be a mis-encoded symbol from an earlier copy of
        # this file - confirm the intended character before changing it.
        if self is Status.BROKEN:
            return "β"
        return "N/A"
|
|
|
|
|
# Shared W&B API client used for every run query in this module.
wandb_api = wandb.Api()

# Root Gradio container; refresh() clears it and rebuilds the table inside it.
demo = gr.Blocks(
    css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}",
)

# Most recently fetched State per validator UID; rebound by fetch_wandb_data().
data: dict[int, State] = {}
|
|
|
|
|
def is_valid_run(run: Run):
    """Check whether a W&B run carries a verifiable validator signature.

    The run's config must contain "hotkey", "uid", "contest" and "signature".
    The signature is verified against the message ``"<uid>:<hotkey>:<contest>"``
    using a ``Keypair`` built from the hotkey.

    Args:
        run: The W&B run whose ``config`` is inspected.

    Returns:
        The result of ``Keypair.verify``; ``False`` when a required key is
        missing or when building the keypair / verifying raises.
    """
    config = run.config

    if any(key not in config for key in ("hotkey", "uid", "contest", "signature")):
        return False

    hotkey = config["hotkey"]
    message = f"{config['uid']}:{hotkey}:{config['contest']}"

    try:
        return Keypair(hotkey).verify(message, config["signature"])
    except Exception:
        # Best effort: any failure while verifying counts as "not valid".
        return False
|
|
|
|
|
def fetch_wandb_data():
    """Refresh the module-level ``data`` mapping from recent validator runs.

    Queries W&B for runs under ``WANDB_RUN_PATH`` whose ``config.type`` is
    "validator" and that were created within the last day, newest first.
    For each validator UID the newest run that passes ``is_valid_run`` is
    turned into a ``State`` and stored in ``data``. Entries already in
    ``data`` for UIDs without a qualifying run are left untouched. Finally
    ``data`` is rebound to a copy sorted by UID.
    """
    global data

    now = datetime.now(tz=ZoneInfo("America/New_York"))
    wandb_runs = wandb_api.runs(
        WANDB_RUN_PATH,
        filters={"config.type": "validator", "created_at": {'$gt': str(now - timedelta(days=1))}},
        order="-created_at",
    )

    seen_uids = set()

    for run in wandb_runs:
        # Skip an invalid run instead of returning: one bad run must not
        # discard every remaining run nor bypass the final sort.
        if not is_valid_run(run):
            continue

        source_validator_uid = run.config["uid"]

        # Runs arrive newest first, so the first run seen for a UID is the
        # current one; older runs must not overwrite it.
        if source_validator_uid in seen_uids:
            continue
        seen_uids.add(source_validator_uid)

        submissions: set[int] = set()
        benchmarks: set[int] = set()
        invalid: list[int] = []
        average_benchmark_time = 0.0

        for key, value in run.summary.items():
            if key == "average_benchmark_time":
                average_benchmark_time = float(value)
            elif key == "invalid":
                invalid = value
            elif key == "submissions":
                submissions.update(int(submission_key) for submission_key, _ in value.items())
            elif key == "benchmarks":
                benchmarks.update(int(benchmark_key) for benchmark_key, _ in value.items())

        data[source_validator_uid] = State(
            # NOTE(review): assumes the second tag holds the version after an
            # 8-character prefix - confirm against how validators tag runs.
            version=run.tags[1][8:],
            submissions=len(submissions),
            benchmarks=len(benchmarks),
            invalid=len(invalid),
            average_benchmark_time=average_benchmark_time,
        )

    data = dict(sorted(data.items()))
|
|
|
|
|
def refresh():
    """Re-fetch validator data and rebuild the dashboard table inside ``demo``.

    Calls ``fetch_wandb_data()``, clears ``demo`` and renders one accordion
    containing a read-only table with one row per entry in ``data``. Each row
    shows progress counts, the average benchmark time, an estimated finish
    time and the derived status.
    """
    fetch_wandb_data()
    demo.clear()
    now = datetime.now(tz=ZoneInfo("America/New_York"))

    with demo:
        # %Z prints EST or EDT as appropriate; a hard-coded "EST" is wrong
        # while daylight saving time is in effect.
        with gr.Accordion(f"Validator States (Last updated: {now.strftime('%Y-%m-%d %I:%M:%S %p %Z')})"):
            elements: list[tuple] = []

            for source_validator_uid, state in data.items():
                tested = state.benchmarks + state.invalid

                status = Status.IN_PROGRESS
                if tested == state.submissions:
                    status = Status.DONE
                elif not state.submissions or (not state.average_benchmark_time and state.benchmarks):
                    status = Status.BROKEN

                # Text shown wherever a time value cannot be computed.
                placeholder = status.get_alt_time_text()

                # Remaining work in seconds: average time per benchmark times
                # the number of submissions not yet tested.
                eta = int(state.average_benchmark_time * (state.submissions - tested))
                if eta > 0:
                    time_left = timedelta(seconds=eta)
                    eta_time = (now + time_left).strftime("%Y-%m-%d %I:%M:%S %p")
                    # Stringify so the cell holds text, not a timedelta object.
                    eta_remaining = str(time_left)
                else:
                    eta_time = placeholder
                    eta_remaining = placeholder

                if state.average_benchmark_time:
                    average_time = str(timedelta(seconds=int(state.average_benchmark_time)))
                else:
                    average_time = placeholder

                elements.append((
                    source_validator_uid,
                    state.version,
                    tested,
                    state.submissions,
                    state.invalid,
                    average_time,
                    eta_time,
                    eta_remaining,
                    status.value,
                ))

            gr.components.Dataframe(
                elements,
                headers=["UID", "Version", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "Status"],
                # "Avg. Benchmark Time" always holds text (a formatted duration
                # or a placeholder), so it is declared as markdown, not number.
                datatype=["number", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown"],
                elem_id="state-table",
                interactive=False,
                visible=True,
            )
|
|
|
|
|
def main():
    """Launch the dashboard, then rebuild it every ``REFRESH_RATE`` seconds.

    This function never returns: after the initial render and launch it loops
    forever, sleeping, logging a timestamp and calling ``refresh()``.
    """
    refresh()
    # prevent_thread_lock=True keeps launch() from blocking, so the refresh
    # loop below can run in this thread.
    demo.launch(prevent_thread_lock=True)

    eastern = ZoneInfo("America/New_York")
    while True:
        sleep(REFRESH_RATE)

        timestamp = datetime.now(tz=eastern).strftime('%Y-%m-%d %H:%M:%S')
        print(f"Refreshing States at {timestamp}")

        refresh()


if __name__ == "__main__":
    main()
|
|