BenCzechMark / compare_significance.py
idolezal's picture
P艡id谩ny kategorie do `tasks_metadata.json`
c93b288
raw
history blame
9.72 kB
import argparse
import json
from collections import defaultdict
from typing import Sequence
import numpy as np
from numba import njit, prange
from scipy.stats import ttest_rel
from sklearn.metrics import roc_curve, auc
from tqdm import tqdm
SUPPORTED_METRICS = [
"avg_mcauroc", # for classification tasks
"exact_match", # for QA tasks
"acc", # for multichoice tasks
"rouge_raw_r2_mid_f", # for summarization tasks
"word_perplexity", # for language modeling tasks
]
def _get_CMs(i, probabilities, references, thresholds):
confusion_matrices = []
for threshold in thresholds[i]:
TP = 0
FP = 0
TN = 0
FN = 0
for j in range(len(probabilities)):
if probabilities[j][i] >= threshold:
if references[j] == i:
TP += 1
else:
FP += 1
else:
if references[j] == i:
FN += 1
else:
TN += 1
cm = {"TP": TP, "FP": FP, "TN": TN, "FN": FN, "threshold": threshold, "class": i}
confusion_matrices.append(cm)
return confusion_matrices
def compute_significance_ttest(scores_A, scores_B):
delta = np.mean(scores_A) - np.mean(scores_B)
if delta <= 0:
return 1.0, delta
t, p = ttest_rel(scores_A, scores_B)
# correct for one-tailed test
p_value = p / 2
return p_value, delta
@njit(parallel=True)
def compute_significance_bootstrap(scores_A, scores_B):
n = len(scores_A)
R = 1_000
delta_orig = np.mean(scores_A) - np.mean(scores_B)
if delta_orig <= 0:
return 1.0, delta_orig
r = 0
for _ in prange(R):
samples = np.random.choice(n, n, replace=True)
temp_A = scores_A[samples]
temp_B = scores_B[samples]
delta = np.mean(temp_A) - np.mean(temp_B)
if delta > 2 * delta_orig:
r += 1
pval = r / R
return pval, delta_orig
def compute_significance_avg_mcauroc(probsA: Sequence[Sequence[float]], referencesA: Sequence[int],
probsB: Sequence[Sequence[float]], referencesB: Sequence[int]):
# compute MC-AUC for model A
model_A_scores = get_mc_auc_samples(probsA, referencesA, Nsamples=100)
model_B_scores = get_mc_auc_samples(probsB, referencesB, Nsamples=100)
delta = np.mean(model_A_scores) - np.mean(model_B_scores)
# one-tailed test
p_value = ((model_A_scores[:, np.newaxis] <= model_B_scores[np.newaxis, :]).sum()
/ (len(model_A_scores) * len(model_B_scores)))
return p_value, delta
# Helper function to convert confusion matrices to numba-compatible arrays
def convert_confusion_matrices(confusion_matrices):
num_thresholds = len(confusion_matrices)
tp = np.empty(num_thresholds)
fn = np.empty(num_thresholds)
for k in range(num_thresholds):
tp[k] = confusion_matrices[k]["TP"]
fn[k] = confusion_matrices[k]["FN"]
return tp, fn
@njit(parallel=True)
def compute_tpr_variates(tp, fn, 位, Nsamples, num_thresholds):
tpr_variates_for_each_fpr = np.empty((num_thresholds, Nsamples))
for k in prange(num_thresholds):
tpr_variates_for_each_fpr[k, :] = np.random.beta(tp[k] + 位, fn[k] + 位, Nsamples)
return tpr_variates_for_each_fpr
def get_mc_auc_samples(probs, references, Nsamples=1_000_000):
n_classes = list(range(len(probs[0])))
fpr = dict()
thresholds = dict()
# compute AUC for every class
auc_scores_per_class = []
for i in range(len(n_classes)):
# for i-th class vs all others
fpr[i], _, thresholds[i] = roc_curve(y_true=[1 if x == n_classes[i] else 0 for x in references],
y_score=[prob[i] for prob in probs])
confusion_matrices = _get_CMs(i, probs, references, thresholds)
tp, fn = convert_confusion_matrices(confusion_matrices)
位 = 1.0 # <- Flat prior
# 位 = 0.5 # <- Jeffrey's prior
# sample variates for every threshold
# tpr_variates_for_each_fpr = []
# for k in range(len(thresholds[i])):
# tpr_variates_for_each_fpr.append(
# numpy.random.beta(confusion_matrices[k]["TP"] + 位, confusion_matrices[k]["FN"] + 位, Nsamples))
tpr_variates_for_each_fpr = compute_tpr_variates(tp, fn, 位, Nsamples, len(thresholds[i]))
# fprs x tpr_variates
# tpr_variates_for_each_fpr = np.array(tpr_variates_for_each_fpr)
# now pick 1 variate for each fpr, and compute AUC
auc_scores = []
for tpr_variates in tpr_variates_for_each_fpr.T:
auc_score = auc(fpr[i], tpr_variates)
# if numpy.isnan(auc_score):
# auc_score = 0
auc_scores.append(auc_score)
auc_scores_per_class.append(auc_scores)
auc_scores_per_class = np.array(auc_scores_per_class)
mcauc_scores = np.mean(auc_scores_per_class, axis=0)
return mcauc_scores
def read_json(file_path):
data = defaultdict(list)
with open(file_path, "r") as f:
fc = json.load(f)
for task, results in fc["predictions"].items():
# determine the metric
metric = None
for key in SUPPORTED_METRICS:
if key in results[0]:
metric = key
break
if metric is None:
raise ValueError(f"Unsupported metric in {file_path}")
if metric == "avg_mcauroc":
local_data = [line[metric] for line in fc["predictions"][task]]
unzipped_list = list(zip(*local_data))
golds = unzipped_list[0]
probs = unzipped_list[1]
data[task] = (golds, probs), metric
else:
scores = [line[metric] for line in fc["predictions"][task]]
data[task] = scores, metric
# make sure all tasks are submitted
METADATA_FILE = "tasks_metadata.json"
with open(METADATA_FILE, "r") as f:
metadata = json.load(f)
all_tasks = list(metadata.keys())
all_missing_tasks = []
for task in all_tasks:
if task not in data:
all_missing_tasks.append(task)
if len(all_missing_tasks) > 0:
EOLN = "\n"
raise ValueError(f"Missing tasks in {file_path}: {EOLN.join(all_missing_tasks)}")
return data
def process_task(task, dataA, dataB, significance_level):
metricA = dataA[task][1]
metricB = dataB[task][1]
assert metricA == metricB
assert len(dataA[task]) == len(dataB[task])
if metricA == "avg_mcauroc":
p_value, delta = compute_significance_avg_mcauroc(probsA=dataA[task][0][1], referencesA=dataA[task][0][0],
probsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA in ["acc", "exact_match"]:
p_value, delta = compute_significance_ttest(scores_A=dataA[task][0], scores_B=dataB[task][0])
elif metricA in ["rouge_raw_r2_mid_f", "word_perplexity"]:
p_value, delta = compute_significance_bootstrap(scores_A=np.array(dataA[task][0]),
scores_B=np.array(dataB[task][0]))
else:
raise ValueError(f"Unsupported metric {metricA}")
if delta <= 0:
p_value = 1.0
return task, {
"significant": not (p_value > significance_level),
"p_value": p_value,
"delta": delta,
}
def check_significance(fileA, fileB, significance_level=0.05):
dataA = read_json(fileA)
dataB = read_json(fileB)
decisions = dict()
_iter = tqdm(list(dataA.keys()))
for task in _iter:
_iter.set_description(f"Processing task: {task}")
metricA = dataA[task][1]
metricB = dataB[task][1]
assert metricA == metricB
assert len(dataA[task]) == len(dataB[task])
if metricA == "avg_mcauroc":
p_value, delta = compute_significance_avg_mcauroc(probsA=dataA[task][0][1], referencesA=dataA[task][0][0],
probsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA in ["acc", "exact_match"]:
p_value, delta = compute_significance_ttest(scores_A=dataA[task][0], scores_B=dataB[task][0])
elif metricA in ["rouge_raw_r2_mid_f", "word_perplexity"]:
p_value, delta = compute_significance_bootstrap(scores_A=np.array(dataA[task][0]),
scores_B=np.array(dataB[task][0]))
else:
raise ValueError(f"Unsupported metric {metricA}")
if delta <= 0:
p_value = 1.0
decisions[task] = {
"significant": not (p_value > significance_level),
"p_value": p_value,
"delta": delta,
}
return decisions
def main():
parser = argparse.ArgumentParser(description="One-tailed test if model A improves over model B.")
parser.add_argument("--modelA", help="ModelA JSON file from lm harness.")
parser.add_argument("--modelB", help="ModelB JSON file from lm harness.")
parser.add_argument("--significance_level", type=float, default=0.05, help="Significance level (e.g., 0.05)")
args = parser.parse_args()
result = check_significance(args.modelA, args.modelB, args.significance_level)
print(json.dumps(result, indent=2))
# harness already returns stderr estimate for sampling distribution
# see https://github.com/EleutherAI/lm-evaluation-harness/blob/6433bd3fe3033d302b22cdcd53af237e9039ef29/lm_eval/api/metrics.py#L213
if __name__ == "__main__":
check_significance("../csmpt.json", "../llama3_instruct.json", 0.05)
main()