import argparse
import datetime
import functools
import json
import math
import os
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import bittensor as bt
import numpy as np
import pandas as pd
import wandb
from bittensor.extrinsics.serving import get_metadata
from dotenv import load_dotenv
from wandb.apis.public.history import HistoryScan, SampledHistoryScan

from competitions import COMP_NAME_TO_ID

NETUID = 37
DELAY_SECS = 3
RETRIES = 3

load_dotenv()

WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
SUBTENSOR_ENDPOINT = os.environ.get("SUBTENSOR_ENDPOINT", None)
VALIDATOR_WANDB_PROJECT = "rusticluftig/finetuning"
BENCHMARK_WANDB_PROJECT = "rusticluftig/test-benchmarks"

@dataclass
class ModelData:
    uid: int
    hotkey: str
    competition_id: int
    namespace: str
    name: str
    commit: str
    # Hash of (hash(model) + hotkey)
    secure_hash: str
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation."""
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )

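# Sketch of the compressed on-chain commitment format. The values below are
# hypothetical, made up purely for illustration:
#
#   cs = "my-namespace:my-model:abcdef12:deadbeef:2"
#   ModelData.from_compressed_str(
#       uid=5, hotkey="5F...", cs=cs, block=100, incentive=0.01, emission=1.5
#   )
#
# parses to namespace="my-namespace", name="my-model", commit="abcdef12",
# secure_hash="deadbeef", competition_id=2.
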
def run_with_retries(func, *args, **kwargs):
    """Runs a provided function with retries in the event of a failure."""
    for i in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if i == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    raise RuntimeError("Should never happen")

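# Usage sketch: bind arguments up front with functools.partial so the retry
# helper can re-invoke the call. The subtensor object here is assumed to exist.
#
#   block = run_with_retries(functools.partial(subtensor.get_current_block))
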
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Returns a subtensor and metagraph for the finetuning subnet."""

    def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
        if SUBTENSOR_ENDPOINT:
            parser = argparse.ArgumentParser()
            bt.subtensor.add_args(parser)
            subtensor = bt.subtensor(
                config=bt.config(
                    parser=parser,
                    args=["--subtensor.chain_endpoint", SUBTENSOR_ENDPOINT],
                )
            )
        else:
            subtensor = bt.subtensor("finney")

        metagraph = subtensor.metagraph(NETUID, lite=False)
        return subtensor, metagraph

    return run_with_retries(_internal)

def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Returns the on-chain model metadata for every UID in the metagraph."""
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")

        if not metadata:
            continue

        commitment = metadata["info"]["fields"][0]
        # Strip the leading "0x" from the hex-encoded commitment before decoding.
        hex_data = commitment[list(commitment.keys())[0]][2:]
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]

        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert to daily TAO

        model_data = None
        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # Skip commitments that don't parse as model metadata.
            continue

        result.append(model_data)
    return result

def get_wandb_runs(
    project: str, filters: Dict[str, Any], order: str = "-created_at"
) -> List:
    """Get the latest runs from Wandb, retrying infinitely until we get them.

    Args:
        project (str): The Wandb project to get runs from.
        filters (Dict[str, Any]): Filters to apply to the runs.
        order (str): Order to sort the runs by. Defaults to "-created_at" (newest first).

    Returns:
        List: List of runs matching the provided filters.
    """
    while True:
        api = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
        runs = list(
            api.runs(
                project,
                filters=filters,
                order=order,
            )
        )
        if len(runs) > 0:
            return runs
        # The Wandb API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)

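# Usage sketch (the filter is illustrative): fetch the newest validator runs.
# The Wandb API accepts MongoDB-style operators like "$and"/"$or" in filters,
# as used in load_state_vars() below.
#
#   runs = get_wandb_runs(
#       project=VALIDATOR_WANDB_PROJECT,
#       filters={"config.type": "validator"},
#   )
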
def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Returns the most recent scores for the provided UIDs.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from Wandb. Requires that the
            runs are provided in descending order by creation time.
    """
    result = {}
    previous_timestamp = None
    seen_competitions = set()

    # Iterate through the runs until we've processed all the uids.
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]

        # Make sure runs are indeed in descending time order.
        assert (
            previous_timestamp is None or timestamp < previous_timestamp
        ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
        previous_timestamp = timestamp

        comp_id = data.get("competition_id", None)
        for uid in uids:
            if uid in result:
                continue
            if str(uid) in all_uid_data:
                uid_data = all_uid_data[str(uid)]
                # Only the most recent run per competition is fresh.
                is_fresh = comp_id not in seen_competitions
                result[uid] = {
                    "avg_loss": uid_data.get("average_loss", None),
                    "win_rate": uid_data.get("win_rate", None),
                    "win_total": uid_data.get("win_total", None),
                    "weight": uid_data.get("weight", None),
                    "competition_id": uid_data.get("competition_id", None),
                    "fresh": is_fresh,
                }

        seen_competitions.add(comp_id)
        if len(result) == len(uids):
            break
    return result

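# The returned mapping has one entry per scored UID, e.g. (values illustrative):
#
#   {42: {"avg_loss": 2.1034, "win_rate": 0.75, "win_total": 12,
#         "weight": 0.05, "competition_id": 2, "fresh": True}}
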
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight})."""
    ret = {}
    for uid in metagraph.uids.tolist():
        vtrust = metagraph.validator_trust[uid].item()
        stake = metagraph.stake[uid].item()
        # Only consider active validators with meaningful stake.
        if vtrust > 0 and stake > 10_000:
            ret[uid] = (vtrust, stake, {})
            for ouid in metagraph.uids.tolist():
                if ouid == uid:
                    continue
                weight = round(metagraph.weights[uid][ouid].item(), 4)
                if weight > 0:
                    ret[uid][-1][ouid] = weight
    return ret

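# Shape sketch of the returned mapping (values illustrative):
# validator UID -> (vtrust, stake, {miner_uid: weight}).
#
#   {28: (0.98, 120_000.0, {5: 0.6, 9: 0.4})}
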
def get_losses_over_time(wandb_runs: List, competition_id: int) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time."""
    timestamps = []
    losses = []

    for run in wandb_runs:
        # For each run, check the 10 most recent steps.
        best_loss = math.inf
        should_add_datapoint = False
        min_step = max(0, run.lastHistoryStep - 10)
        history_scan = SampledHistoryScan(
            run.client,
            run,
            ["original_format_json"],
            min_step,
            run.lastHistoryStep,
            page_size=10,
        )
        max_timestamp = None
        for step in history_scan:
            data = json.loads(step["original_format_json"])
            all_uid_data = data["uid_data"]
            timestamp = datetime.datetime.fromtimestamp(data["timestamp"])
            if max_timestamp is None:
                max_timestamp = timestamp
            max_timestamp = max(max_timestamp, timestamp)

            for _, uid_data in all_uid_data.items():
                loss = uid_data.get("average_loss", math.inf)
                c_id = uid_data.get("competition_id", None)
                if c_id is None or c_id != competition_id:
                    continue

                # Filter out anomalous losses caused by Wandb unavailability.
                if loss < 0.99 and loss < best_loss:
                    best_loss = loss
                    should_add_datapoint = True

        # Now that we've processed the run's most recent steps, check if we
        # should add a datapoint.
        if should_add_datapoint:
            timestamps.append(max_timestamp)
            losses.append(best_loss)

    return pd.DataFrame({"timestamp": timestamps, "losses": losses})

def is_floatable(x) -> bool:
    return (
        isinstance(x, float) and not math.isnan(x) and not math.isinf(x)
    ) or isinstance(x, int)


def format_score(uid: int, scores, key) -> Optional[float]:
    if uid in scores:
        if key in scores[uid]:
            point = scores[uid][key]
            if is_floatable(point):
                return round(point, 4)
    return None

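# Example behavior (illustrative): non-floatable values fall through to None.
#
#   format_score(5, {5: {"avg_loss": 2.123456}}, "avg_loss")  -> 2.1235
#   format_score(5, {5: {"avg_loss": math.nan}}, "avg_loss")  -> None
#   format_score(6, {5: {"avg_loss": 2.1}}, "avg_loss")       -> None
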
def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    competition_id: int,
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores."""
    return [
        [
            f"[{c.namespace}/{c.name} ({c.commit[0:8]})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})",
            format_score(c.uid, scores, "win_rate"),
            format_score(c.uid, scores, "avg_loss"),
            format_score(c.uid, scores, "weight"),
            c.uid,
            c.block,
        ]
        for c in leaderboard
        if c.competition_id == competition_id
        and ((c.uid in scores and scores[c.uid]["fresh"]) or show_stale)
    ]

def get_benchmarks() -> Tuple[pd.DataFrame, Dict[str, Dict[str, float]]]:
    """Returns a dataframe of the latest benchmark results and the baseline scores of a few reference models."""
    if not BENCHMARK_WANDB_PROJECT:
        print("No benchmark project set.")
        return None, None

    runs = get_wandb_runs(
        project=BENCHMARK_WANDB_PROJECT, filters=None, order="+created_at"
    )

    timestamps, uids, models, comp_ids, mmlu, mmlu_pro = [], [], [], [], [], []
    for run in runs:
        uid = run.config.get("uid", None)
        model = run.config.get("model", None)
        # Any run without a competition_id was for competition 2.
        comp_name = run.config.get("competition_id", "B7_MULTI_CHOICE")
        comp_id = COMP_NAME_TO_ID.get(comp_name, 2)
        if not uid or not model:
            continue

        samples = list(
            HistoryScan(
                run.client,
                run,
                0,
                1,
            )
        )
        if not samples:
            continue

        sample = samples[0]
        # Make sure we have all the required keys.
        if any(
            required_key not in sample
            for required_key in ["mmlu.acc,none", "mmlu_pro", "_timestamp"]
        ):
            continue

        comp_ids.append(comp_id)
        timestamps.append(datetime.datetime.fromtimestamp(sample["_timestamp"]))
        mmlu.append(sample["mmlu.acc,none"])
        mmlu_pro.append(sample["mmlu_pro"])
        uids.append(uid)
        models.append(model)

    return (
        pd.DataFrame(
            {
                "timestamp": timestamps,
                "uid": uids,
                "model": models,
                "competition_id": comp_ids,
                "mmlu": mmlu,
                "mmlu_pro": mmlu_pro,
            }
        ),
        {
            "mmlu": {
                "Llama-3.1-8B-Instruct": 0.681,
                "Mistral-7B-Instruct-v0.3": 0.597,
                "gemma-2-9b-it": 0.719,
            },
            "mmlu_pro": {
                "Llama-3.1-8B-Instruct": 30.68,
                "Mistral-7B-Instruct-v0.3": 23.06,
                "gemma-2-9b-it": 31.95,
            },
        },
    )

def make_validator_dataframe(
    validator_df: Dict[int, Tuple[float, int, Dict[int, float]]],
    model_data: List[ModelData],
) -> pd.DataFrame:
    """Returns a dataframe of validators (sorted by stake, descending) and the weight each assigns to every incentivized model."""
    values = [
        [uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
        + [validator_df[uid][-1].get(c.uid) for c in model_data if c.incentive]
        for uid, _ in sorted(
            zip(
                validator_df.keys(),
                [validator_df[x][1] for x in validator_df.keys()],
            ),
            key=lambda x: x[1],
            reverse=True,
        )
    ]
    dtypes = {"UID": int, "Stake (τ)": float, "V-Trust": float}
    dtypes.update(
        {
            f"{c.namespace}/{c.name} ({c.commit[0:8]})": float
            for c in model_data
            if c.incentive
        }
    )
    return pd.DataFrame(values, columns=dtypes.keys()).astype(dtypes)

def make_metagraph_dataframe(metagraph: bt.metagraph, weights=False) -> pd.DataFrame:
    cols = [
        "stake",
        "emission",
        "trust",
        "validator_trust",
        "dividends",
        "incentive",
        "R",
        "consensus",
        "validator_permit",
    ]

    frame = pd.DataFrame({k: getattr(metagraph, k) for k in cols})
    frame["block"] = metagraph.block.item()
    frame["netuid"] = NETUID
    frame["uid"] = range(len(frame))
    frame["hotkey"] = [axon.hotkey for axon in metagraph.axons]
    frame["coldkey"] = [axon.coldkey for axon in metagraph.axons]
    if weights and metagraph.W is not None:
        # Convert the NxN weight tensor to a list of lists so it fits into the dataframe.
        frame["weights"] = [w.tolist() for w in metagraph.W]
    return frame

def load_state_vars() -> Dict[str, Any]:
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()
            print(f"Loaded subtensor and metagraph: {metagraph}")

            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)
            print(f"Loaded {len(model_data)} models")

            vali_runs = get_wandb_runs(
                project=VALIDATOR_WANDB_PROJECT,
                filters={
                    "$and": [{"config.type": "validator"}],
                    "$or": [{"config.uid": 28}, {"config.uid": 16}],
                },
            )
            print(f"Loaded {len(vali_runs)} validator runs")

            scores = get_scores([x.uid for x in model_data], vali_runs)
            print(f"Loaded {len(scores)} scores")

            validator_df = get_validator_weights(metagraph)
            weight_keys = set()
            for uid, stats in validator_df.items():
                weight_keys.update(stats[-1].keys())
            print("Loaded validator weights")

            # Compute loss over time for all competitions.
            # losses_2 = get_losses_over_time(vali_runs, 2)
            # print("Loaded losses over time for comp 2")

            benchmarks_df, benchmarks_targets = get_benchmarks()
            print("Loaded benchmarks")
            break
        except KeyboardInterrupt:
            print("Exiting...")
            break
        except Exception:
            print(f"Failed to get data: {traceback.format_exc()}")
            time.sleep(30)

    return {
        "metagraph": metagraph,
        "model_data": model_data,
        "vali_runs": vali_runs,
        "scores": scores,
        "validator_df": validator_df,
        "benchmarks_df": benchmarks_df,
        "benchmarks_targets": benchmarks_targets,
    }

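
# A minimal usage sketch: loading everything once requires a reachable subtensor
# endpoint and, for score/benchmark data, a valid WANDB_API_KEY in the environment.
if __name__ == "__main__":
    state = load_state_vars()
    print(f"Models loaded: {len(state['model_data'])}")
    print(f"Validators tracked: {len(state['validator_df'])}")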