MsRandom's picture
Update timezone
4011c6a verified
raw
history blame
8.41 kB
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from zoneinfo import ZoneInfo
America/New_York
import bittensor as bt
import gradio as gr
from packaging import version
from substrateinterface import Keypair
from wandb.apis.importers import wandb
from wandb.apis.public import Run
WANDB_RUN_PATH = os.environ["WANDB_RUN_PATH"]
REFRESH_RATE = 30
NET_UID = 39
AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD = 600 # 10 minutes
ETA_WARNING_THRESHOLD = 43200 # 12 hours
subtensor = bt.subtensor()
metagraph = bt.metagraph(netuid=NET_UID)
bt.logging.disable_logging()
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}", fill_height=True, fill_width=True)
class Status(Enum):
BENCHMARKING = ("Benchmarking", "orange")
DONE = ("Done", "springgreen")
BROKEN = ("Broken", "red")
STOPPED = ("Stopped", "red")
CRASHED = ("Crashed", "red")
INITIALIZING = ("Initializing", "orange")
def get_alt_time_text(self) -> str:
return "∞" if self.is_broken() else "N/A"
def name(self):
return self.value[0]
def color(self):
return self.value[1]
def is_broken(self):
return self == Status.BROKEN or self == Status.STOPPED or self == Status.CRASHED or self == Status.INITIALIZING
@dataclass
class State:
status: Status
hotkey: str
name: str
version: str
winner: int | None
submissions: int
benchmarks: int
invalid: int
average_benchmark_time: float
vtrust: float
updated: int
data: dict[int, State] = {}
last_refresh: datetime = datetime.fromtimestamp(0, tz=ZoneInfo("US/Pacific"))
def is_valid_run(run: Run):
required_config_keys = ["hotkey", "uid", "contest", "signature"]
for key in required_config_keys:
if key not in run.config:
return False
uid = run.config["uid"]
validator_hotkey = run.config["hotkey"]
contest_name = run.config["contest"]
signing_message = f"{uid}:{validator_hotkey}:{contest_name}"
try:
return Keypair(validator_hotkey).verify(signing_message, run.config["signature"])
except Exception:
return False
def get_identity(uid: int) -> str | None:
identity = subtensor.substrate.query('SubtensorModule', 'Identities', [metagraph.coldkeys[uid]])
return identity.value["name"] if identity != None else None
def fetch_wandb_data():
wandb_api = wandb.Api()
global data
data.clear()
now = datetime.now(tz=ZoneInfo("US/Pacific"))
noon = now.replace(hour=12, minute=0, second=0, microsecond=0)
if now.hour < 12:
noon -= timedelta(days=1)
wandb_runs = wandb_api.runs(
WANDB_RUN_PATH,
filters={"config.type": "validator", "created_at": {'$gt': str(noon)}},
order="-created_at",
)
for run in wandb_runs:
if not is_valid_run(run):
continue
uid = run.config["uid"]
if not metagraph.validator_permit[uid]:
continue
winner = None
submissions: set[int] = set()
benchmarks: set[int] = set()
invalid: list[int] = []
completed = False
average_benchmark_time = 0.0
for key, value in run.summary.items():
if key == "average_benchmark_time":
average_benchmark_time = float(value)
elif key == "invalid":
invalid = value
elif key == "submissions":
for submission_key, submission_value in value.items():
submissions.add(int(submission_key))
elif key == "benchmarks":
for benchmark_uid, benchmark in value.items():
if "winner" in benchmark:
winner = benchmark_uid
completed = True
break
for benchmark_key, benchmark_value in value.items():
benchmarks.add(int(benchmark_key))
status = Status.BENCHMARKING
run_state = run.state
if run_state == "finished":
status = Status.STOPPED
elif run_state == "crashed":
status = Status.CRASHED
elif completed:
status = Status.DONE
elif not submissions or (not average_benchmark_time and benchmarks):
if 12 <= now.hour < 13:
status = Status.INITIALIZING
else:
status = Status.BROKEN
block = subtensor.get_current_block()
data[uid] = State(
status=status,
hotkey=run.config["hotkey"],
name=get_identity(uid) or run.config["hotkey"],
version=run.tags[1][8:],
winner=winner,
submissions=len(submissions),
benchmarks=len(benchmarks),
invalid=len(invalid),
average_benchmark_time=average_benchmark_time,
vtrust=metagraph.validator_trust[uid],
updated=block - metagraph.last_update[uid],
)
data = dict(sorted(data.items()))
def get_latest_version(data: dict[int, State]) -> str:
latest_version = version.parse("0.0.0")
for source_validator_uid, state in data.items():
current_version = version.parse(state.version)
if current_version > latest_version:
latest_version = current_version
return str(latest_version)
def try_refresh():
global last_refresh
now = datetime.now(tz=ZoneInfo("US/Pacific"))
if (now - last_refresh).total_seconds() > REFRESH_RATE:
print(f"Refreshing States at {now.strftime('%Y-%m-%d %H:%M:%S')}")
metagraph.sync(subtensor=subtensor)
fetch_wandb_data()
last_refresh = now
def get_data() -> gr.Dataframe:
try_refresh()
elements: list[tuple] = []
latest_version = get_latest_version(data)
now = datetime.now(tz=ZoneInfo("US/Pacific"))
for uid, state in data.items():
eta = int(state.average_benchmark_time * (state.submissions - (state.benchmarks + state.invalid)))
time_left = timedelta(seconds=eta)
eta_date = now + time_left
eta_time = eta_date.strftime("%Y-%m-%d %I:%M:%S %p") if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text()
average_time_text = f"{timedelta(seconds=int(state.average_benchmark_time))}" if state.average_benchmark_time else state.status.get_alt_time_text(),
elements.append((
uid,
state.name,
f"<span style='color: {'springgreen' if state.version == latest_version else 'red'}'>{state.version}</span>",
f"<span style='color: {state.status.color()}'>{state.status.name()}</span>",
f"<span style='color: {'springgreen' if state.winner else 'orange'}'>{state.winner if state.winner else 'N/A'}</span>",
state.benchmarks + state.invalid,
state.submissions,
state.invalid,
f"<span style='color: {'orange' if state.average_benchmark_time > AVERAGE_BENCHMARK_TIME_WARNING_THRESHOLD else 'springgreen'}'>{average_time_text[0]}</span>",
f"<span style='color: {'orange' if eta > ETA_WARNING_THRESHOLD else 'springgreen'}'>{eta_time}</span>",
f"<span style='color: {'orange' if eta > ETA_WARNING_THRESHOLD else 'springgreen'}'>{time_left if eta > 0 and state.status == Status.BENCHMARKING else state.status.get_alt_time_text()}</span>",
f"<span style='color: {'springgreen' if state.vtrust > 0.75 else 'red'}'>{state.vtrust:.4f}</span>",
f"<span style='color: {'springgreen' if state.updated < 1000 else 'red'}'>{state.updated}</span>",
))
return gr.Dataframe(
elements,
headers=["UID", "Name", "Version", "Status", "Winner UID", "Tested", "Submissions", "Invalid", "Avg. Benchmark Time", "ETA (Eastern Time)", "ETA Remaining", "VTrust", "Updated"],
datatype=["number", "markdown", "markdown", "markdown", "markdown", "number", "number", "number", "markdown", "markdown", "markdown", "markdown", "markdown"],
label=f"SN{NET_UID} Validator States (Last updated: {last_refresh.strftime('%Y-%m-%d %I:%M:%S %p')} EST)",
interactive=False,
)
def main():
with demo:
table = gr.Dataframe()
table.attach_load_event(lambda: get_data(), None)
demo.launch()
if __name__ == '__main__':
main()