import pandas as pd
import numpy as np
from rouge_score import rouge_scorer
from joblib import Parallel, delayed
#from transformers import AutoTokenizer, DebertaForSequenceClassification
#import torch
from tqdm import tqdm
import logging
from .plots import bcolors
import random

logger = logging.getLogger(__name__)

# Local only for now
#DEVICE = "mps" if torch.backends.mps.is_available() else "cpu"
DEVICE = 'cpu'

def call_counter(func):
    """Decorator that counts how many times the wrapped function is called (exposed as `wrapped.calls`)."""
    def helper(*args, **kwargs):
        helper.calls += 1
        return func(*args, **kwargs)
    helper.calls = 0
    return helper
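
# Illustrative sketch of the decorator above; `greet` is a hypothetical function, not part of this module.
#   @call_counter
#   def greet(name):
#       return f"hello {name}"
#   greet("a"); greet("b")
#   greet.calls  # -> 2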

# @call_counter
# def entailment(tokenizer: AutoTokenizer, model: DebertaForSequenceClassification, a: str, b: str, c: str, df: pd.DataFrame) -> float:
#     """
#     uses model c to evaluate a vs. b
#     Entailment based on natural language inference - binary outcomes version.
#     """
#     def __helper(x, h):
#         premise = x[c]
#         hypothesis = x[h]
#         formatted_text = f"{premise}{tokenizer.sep_token}{hypothesis}"
#         inputs = tokenizer(formatted_text, return_tensors="pt", padding=True, truncation=True).to(DEVICE)
#         # Fetch class probabilities
#         with torch.no_grad():
#             predid = model(**inputs).logits.argmax(-1)
#         out = model.config.id2label[predid.item()]
#         if out == 'ENTAILMENT':
#             return 1
#         else:
#             return 0
#     a_ent = df.apply(__helper, args=(a,), axis=1)
#     b_ent = df.apply(__helper, args=(b,), axis=1)
#     if sum(a_ent) == sum(b_ent):
#         logger.info(f"Judge: {c}, {bcolors.PURPLE}{bcolors.BOLD}Model {a}: {sum(a_ent)}, Model {b}: {sum(b_ent)} {bcolors.ENDC} (of {len(df)}).")
#         return 0.5  # tied - in aggregate
#     elif sum(a_ent) > sum(b_ent):
#         logger.info(f"Judge: {c}, {bcolors.RED}{bcolors.BOLD}Model {a}: {sum(a_ent)}{bcolors.ENDC}, Model {b}: {sum(b_ent)} (of {len(df)}).")
#         return 1  # a wins - in aggregate
#     else:
#         logger.info(f"Judge: {c}, Model {a}: {sum(a_ent)}, {bcolors.RED}{bcolors.BOLD}Model {b}: {sum(b_ent)}{bcolors.ENDC} (of {len(df)}).")
#         return 0  # b wins

# @call_counter
# def entailment_p(tokenizer: AutoTokenizer, model: DebertaForSequenceClassification, a: str, b: str, c: str, df: pd.DataFrame) -> int:
#     """
#     uses model c to evaluate a vs. b
#     Entailment based on natural language inference - PROBABILITY version.
#     """
#     def chunks(lst, batch_size):
#         for i in range(0, len(lst), batch_size):
#             yield lst[i:i + batch_size]
#     def inference(ft):
#         inputs = tokenizer(ft, return_tensors="pt", padding=True, truncation=True).to(DEVICE)
#         idx = model.config.label2id['ENTAILMENT']
#         # Fetch entailment probabilities
#         with torch.no_grad():
#             logits = model(**inputs).logits
#         p = torch.nn.functional.softmax(logits, dim=1).to("cpu").numpy()[:, idx]
#         return p.tolist()
#     # prepare inputs
#     premise = df[c]
#     formatted_text = (premise + tokenizer.sep_token + df[a]).to_list() + \
#                      (premise + tokenizer.sep_token + df[b]).to_list()
#     p = []
#     for i in chunks(formatted_text, 4):
#         p += inference(i)
#     # Compare entailment probs between model 'a' and 'b'
#     ent_a = p[:len(p)//2]
#     ent_b = p[len(p)//2:]
#     values = [1 if i >= j else 0 for i, j in zip(ent_a, ent_b)]  # 1 -> "a" wins
#     # Win percentage
#     if sum(values) >= (0.5 * len(values)):
#         return 1  # a wins
#     else:
#         return 0  # b wins
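
# Illustrative sketch of how the disabled entailment judges above would be wired up if re-enabled.
# The checkpoint name "microsoft/deberta-large-mnli" and the column names are assumptions, not part
# of this module; any NLI model whose labels include 'ENTAILMENT' should slot in the same way.
#   tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-large-mnli")
#   model = DebertaForSequenceClassification.from_pretrained("microsoft/deberta-large-mnli").to(DEVICE)
#   entailment_p(tokenizer, model, "model_a", "model_b", "reference", df)  # -> 1 if 'model_a' wins, else 0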

def equality(a: str, b: str, c: str, df: pd.DataFrame) -> int:
    """
    Use model c to evaluate a vs. b.
    Simple heuristic: the answers are multiple choice, so compare each answer for exact
    equality with the judge's answer, ignoring instances where a and b agree.
    """
    ties = df[a] == df[b]
    a_wins = sum((df[a] == df[c]) & ~(ties))
    b_wins = sum((df[b] == df[c]) & ~(ties))
    if a_wins >= b_wins:
        return 1
    else:
        return 0
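
# Illustrative sketch of equality() on a toy frame; the column names 'model_a', 'model_b'
# and 'reference' are hypothetical, not fixed by this module.
#   df = pd.DataFrame({
#       "model_a": ["B", "C", "A"],
#       "model_b": ["B", "D", "C"],
#       "reference": ["B", "C", "A"],
#   })
#   equality("model_a", "model_b", "reference", df)  # -> 1: 'model_a' matches the judge on 2 non-tied rows, 'model_b' on 0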

def noisy_equality(a: str, b: str, c: str, df: pd.DataFrame, p: float) -> int:
    """
    Use model c to evaluate a vs. b.
    Noisy version of equality, where each per-instance verdict is flipped independently
    with probability p (p=1 always flips, p=0 never flips).
    """
    random.seed(42)
    perturb = lambda x: not x if (random.random() <= p) else x
    ties = (df[a] == df[b])
    a_w = (df[a] == df[c]).apply(perturb)
    b_w = (df[b] == df[c]).apply(perturb)
    a_wins = sum(a_w & ~(ties))
    b_wins = sum(b_w & ~(ties))
    if a_wins >= b_wins:
        return 1
    else:
        return 0
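
# Illustrative sketch of noisy_equality() at the two deterministic extremes, reusing the
# hypothetical toy frame from the equality() example above.
#   noisy_equality("model_a", "model_b", "reference", df, p=0.0)  # no flips: behaves like equality() -> 1
#   noisy_equality("model_a", "model_b", "reference", df, p=1.0)  # every verdict flipped -> 0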

def rouge(a: str, b: str, c: str, df: pd.DataFrame) -> float:
    """
    Summarization metric ROUGE-2 - discrete version: score each instance as a win for a or b,
    then aggregate the per-instance wins into an overall outcome (1 = a wins, 0.5 = tie, 0 = b wins).
    """
    scorer = rouge_scorer.RougeScorer(["rouge2"], use_stemmer=True)
    def __helper(x) -> int:
        score_a = scorer.score(x[c], x[a])['rouge2'].fmeasure
        score_b = scorer.score(x[c], x[b])['rouge2'].fmeasure
        #logger.info(f"{score_a}, {score_b}")
        if score_a >= score_b:
            return 1  # a wins this instance
        else:
            return 0  # b wins
    outcomes = df.apply(__helper, axis=1)
    a_wins = sum(outcomes)
    b_wins = sum(outcomes == 0)
    if a_wins == b_wins:
        logger.debug(f"Judge: {c}, {bcolors.PURPLE}{bcolors.BOLD}Model {a}: {a_wins}, Model {b}: {b_wins} {bcolors.ENDC} (of {len(df)}).")
        return 0.5  # tied overall
    elif a_wins > b_wins:
        logger.debug(f"Judge: {c}, {bcolors.RED}{bcolors.BOLD}Model {a}: {a_wins}{bcolors.ENDC}, Model {b}: {b_wins} (of {len(df)}).")
        return 1  # a wins overall
    else:
        logger.debug(f"Judge: {c}, Model {a}: {a_wins}, {bcolors.RED}{bcolors.BOLD}Model {b}: {b_wins}{bcolors.ENDC} (of {len(df)}).")
        return 0  # b wins
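
# Illustrative sketch of rouge() on a one-row frame with hypothetical column names; 'model_a'
# copies the reference verbatim, so its ROUGE-2 F1 is 1.0 versus 0.0 for 'model_b'.
#   df = pd.DataFrame({
#       "model_a": ["the cat sat on the mat"],
#       "model_b": ["a dog barked loudly"],
#       "reference": ["the cat sat on the mat"],
#   })
#   rouge("model_a", "model_b", "reference", df)  # -> 1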

def rouge_avg(a: str, b: str, c: str, df: pd.DataFrame) -> float:
    """
    Summarization metric ROUGE-2 - based on averages.
    Following HELM, the ROUGE-2 F-measure is used:
    https://github.com/stanford-crfm/helm/blob/9be35a339347a9f2ad5644d7b72aede57486e3d4/src/helm/benchmark/metrics/basic_metrics.py#L256
    """
    def __true_rouge(x, m, scorer):
        try:
            scores = scorer.score(x[c], x[m])
            value = scores["rouge2"].fmeasure
            return value
        except AttributeError:
            #print(x[c], x[m])
            return 0.0
    if a == b:
        return 0.5  # it's a tie
    if a == c:
        return 1.  # a wins (it is also the judge)
    if b == c:
        return 0.  # b wins (it is also the judge)
    scorer = rouge_scorer.RougeScorer(["rouge2"], use_stemmer=True)
    values = {}
    for m in [a, b]:
        values[m] = Parallel(n_jobs=-1, batch_size=128)(
            delayed(__true_rouge)(i, m, scorer) for _, i in df.iterrows()
        )
    # Compare average ROUGE score over the entire benchmark
    if np.mean(values[a]) >= np.mean(values[b]):
        return 1.  # a wins
    else:
        return 0.  # b wins
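
# Illustrative sketch of rouge_avg() reusing the hypothetical one-row frame from the rouge()
# example; note the degenerate cases are resolved on column names alone, before any scoring.
#   rouge_avg("model_a", "model_b", "reference", df)    # -> 1.0 (higher mean ROUGE-2 F1)
#   rouge_avg("model_a", "model_a", "reference", df)    # -> 0.5 (a and b are the same column: tie)
#   rouge_avg("reference", "model_b", "reference", df)  # -> 1.0 (a is the judge column, so a wins)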