IL-TUR-Leaderboard / eval_utils.py
shounakpaul95's picture
Update eval_utils.py
331f11f verified
raw
history blame
18.5 kB
import json
import re
from collections import defaultdict
import evaluate
import nltk
import numpy as np
from nervaluate import Evaluator
# from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU, CHRF
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import AutoTokenizer
import rouge
import bert_score
import string
def load_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
def get_micro_at_k(gold, pred, k):
gold_set = set(gold)
pred_set = set(pred[:k])
return len(gold_set & pred_set), len(gold_set), len(pred_set)
def evaluate_bail(gold_data, pred_data):
gold_labels = []
pred_labels = []
for id, label in gold_data.items():
gold_labels.append(label)
pred_labels.append(pred_data.get(id, 0))
f1 = f1_score(gold_labels, pred_labels, average="macro")
print("Macro-F1 on HLDC-all-districts test set:", f1)
return {"mF1": f1}
def get_BLEU_score(ref_text_all, machine_text_all):
sc_all = []
for i in range(len(ref_text_all)):
ref_text = ref_text_all[i]
machine_text = machine_text_all[i]
tok_ref_text = nltk.word_tokenize(ref_text)
tok_machine_text = nltk.word_tokenize(machine_text)
sc = nltk.translate.bleu_score.sentence_bleu([tok_ref_text], tok_machine_text, weights = (0.5,0.5))
sc_all.append(sc)
return sum(sc_all)/len(sc_all)
def evaluate_cjpe(gold_data, pred_data):
# Evaluate prediction
gold_labels = []
pred_labels = []
for id, label in gold_data["prediction"].items():
gold_labels.append(label)
pred_labels.append(pred_data["prediction"].get(id, 0))
f1 = f1_score(gold_labels, pred_labels, average="macro")
prediction_result = {"cjpe-eval": f1}
print("Macro-F1 on ILDC test:", prediction_result)
R = []
B = []
rl_evaluator = rouge.Rouge(metrics=['rouge-l'], max_n=2, limit_length=False, apply_avg=True)
for x in range(1, 6):
gold_explanations = []
pred_explanations = []
for k,v in gold_data['explanation'].items():
gold_explanations.append(v[f'expert_{x}'])
pred_explanations.append(pred_data['explanation'][k])
print("Metrics for expert", x, "...", end=' ')
rougex = rl_evaluator.get_scores(pred_explanations, gold_explanations)['rouge-l']['f']
bleux = get_BLEU_score(gold_explanations, pred_explanations)
R.append(rougex)
B.append(bleux)
print("Done.")
rouge_score = sum(R)/len(R)
bleu_score = sum(B)/len(B)
explanation_result = {
"cjpe-exp-eval": {
"rouge": rouge_score,
"bleu": bleu_score,
}
}
print("Explanability for ILDC Expert:", explanation_result)
#return {**prediction_result, **explanation_result}
return {"mF1": f1, "ROUGE-L": rouge_score, "BLEU": bleu_score}
def span2bio(txt, roles):
roles = sorted(roles, key = lambda x:x['start'])
roles_left = [r['start'] for r in roles]
ttxt = re.findall(r'[{}]|\w+'.format(string.punctuation), txt)
c = 0
cr = -1
prev = 'O'
troles = []
for tok in ttxt:
if c >= len(txt):
break
while txt[c] == ' ':
c += 1
else:
if c in roles_left: # Start of a new role
ind = roles_left.index(c)
cr = roles[ind]['end']
prev = 'I-' + roles[ind]['label']
troles.append('B-' + roles[ind]['label'])
else:
if c < cr: # Assign previous role
troles.append(prev)
else: # Assign 'O'
troles.append('O')
c += len(tok)
if len(ttxt) != len(troles):
troles += ['O'] * (len(ttxt) - len(troles))
assert len(ttxt) == len(troles)
return ttxt, troles
def evaluate_lner(gold_data, pred_data, text_data):
with open("ner_labels.txt") as f:
labels = f.read().strip().split("\n")
results_per_fold = {}
for fold in range(1, len(gold_data) + 1):
gold = gold_data[f"fold_{fold}"]
pred = pred_data[f"fold_{fold}"]
text = text_data[f"fold_{fold}"]
texts, gold_labels, pred_labels = [], [], []
for id, gold_label in tqdm(gold.items()):
txt = text[id]
pred_label = pred.get(id, [])
txt_seg, gold_bio = span2bio(txt, gold_label)
_, pred_bio = span2bio(txt, pred_label)
texts.append(txt_seg)
gold_labels.append(gold_bio)
pred_labels.append(pred_bio)
evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list")
results, results_per_tag, _, _ = evaluator.evaluate()
f1_scores = [results_per_tag[l]["strict"]["f1"] for l in results_per_tag]
avg_f1 = sum(f1_scores) / len(f1_scores)
print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
results_per_fold[f"fold_{fold}"] = avg_f1
print("Strict macro-F1 on L-NER Dataset:", results_per_fold)
return {"strict mF1": sum(results_per_fold.values())/len(results_per_fold)}
def evaluate_rr(gold_data, pred_data):
all_gold_labels = []
all_pred_labels = []
with open("rr_label_vocab.json") as f:
label_vocab = json.load(f)
for id, gold_labels in gold_data.items():
pred_labels = pred_data.get(id, ["None"] * len(gold_labels))
for i in range(len(gold_labels)):
g = gold_labels[i]
p = pred_labels[i]
if g not in label_vocab: continue
for pp in p.split():
if pp in label_vocab:
p = pp
break
if p not in label_vocab: continue
all_gold_labels.append([label_vocab[g]])
all_pred_labels.append([label_vocab[p]])
f1 = f1_score(all_gold_labels, all_pred_labels, average="macro")
print(f"Macro-F1 on combined test set:", f1)
return {"mF1": f1}
def evaluate_lsi(gold_data, pred_data):
with open("lsi_label_vocab.json") as f:
label_vocab = json.load(f)
gold_matrix = np.zeros((len(gold_data), len(label_vocab)))
pred_matrix = np.zeros((len(gold_data), len(label_vocab)))
for i, (id, gold_labels) in enumerate(gold_data.items()):
pred_labels = pred_data.get(id, [])
for label in gold_labels:
if label in label_vocab:
gold_matrix[i, label_vocab[label]] = 1
for label in pred_labels:
if label in label_vocab:
pred_matrix[i, label_vocab[label]] = 1
f1 = f1_score(gold_matrix, pred_matrix, average="macro")
print("Macro-F1 on ILSI test set:", f1)
return {"mF1": f1}
def evaluate_pcr(gold_data, pred_data):
f1_scores = []
for k in range(1, 21):
correct, gold_total, pred_total = 0, 0, 0
for id, gold_candidates in tqdm(gold_data.items(), desc="pcr"):
pred_candidates = pred_data.get(id, [])
gold_candidates = [c for c in gold_candidates if c != id]
pred_candidates = [c for c in pred_candidates if c != id]
c, g, p = get_micro_at_k(gold_candidates, pred_candidates, k)
correct += c
gold_total += g
pred_total += p
precision = correct / pred_total if pred_total > 0 else 0
recall = correct / gold_total if gold_total > 0 else 0
f1 = (
2 * precision * recall / (precision + recall)
if precision + recall > 0
else 0
)
f1_scores.append(f1)
print(f"Micro-F1@{k} on IL-PCR test set:", f1)
max_f1 = max(f1_scores)
index_max = f1_scores.index(max_f1) + 1
return {"muF1@K": f"{max_f1:.2f}@{index_max}"}
def evaluate_summ(gold_data, pred_data):
gold_summaries = []
pred_summaries = []
for id, gold_summary in gold_data.items():
if id in pred_data:
gold_summary = re.sub(r"\s+", " ", gold_summary.replace("\n", " ")).strip()
pred_summary = re.sub(r"\s+", " ", pred_data[id].replace("\n", " ")).strip()
gold_summaries.append(gold_summary)
pred_summaries.append(pred_summary)
rl_evaluator = rouge.Rouge(metrics=['rouge-n','rouge-l'], max_n=2, limit_length=False, apply_avg=True)
rl_scores = rl_evaluator.get_scores(pred_summaries, gold_summaries)
print("Rouge:", {k:v['f'] for k,v in rl_scores.items()}, flush=True)
_, _, bs = bert_score.score(pred_summaries, gold_summaries, lang="en", verbose=True, device='cuda')
print("BERTSCORE:", bs.mean().item())
return {'ROUGE-L': rl_scores['rouge-l']['f'], 'BERTSCORE': bs.mean().item()}
def evaluate_lmt(gold_data, pred_data):
tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert", use_fast=False)
bleu = BLEU()
chrfp = CHRF(word_order=2)
gleu = evaluate.load("google_bleu")
G = defaultdict(lambda: defaultdict(list))
P = defaultdict(lambda: defaultdict(list))
for dataset in gold_data:
for id, gold_text in gold_data[dataset].items():
lang = id.split("/")[1].strip()
gold_tokens = " ".join(tokenizer.tokenize(gold_text))
pred_tokens = " ".join(tokenizer.tokenize(pred_data[dataset][id]))
G[dataset][lang].append(gold_tokens)
P[dataset][lang].append(pred_tokens)
bleu_scores, chrfpp_scores, gleu_scores = [], [], []
for dataset in G:
print("Dataset", dataset)
dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], []
for lang in G[dataset]:
gold = G[dataset][lang]
pred = P[dataset][lang]
bleu_score = bleu.corpus_score(pred, [gold]).score
chrfpp_score = chrfp.corpus_score(pred, [gold]).score
gleu_score = gleu.compute(predictions=pred, references=gold)["google_bleu"]
dataset_bleu.append(bleu_score)
dataset_chrfpp.append(chrfpp_score)
dataset_gleu.append(gleu_score)
bleu_scores.append(sum(dataset_bleu) / len(dataset_bleu))
chrfpp_scores.append(sum(dataset_chrfpp) / len(dataset_chrfpp))
gleu_scores.append(sum(dataset_gleu) / len(dataset_gleu))
return {
"BLEU": sum(bleu_scores) / len(bleu_scores),
"GLEU": sum(gleu_scores) / len(gleu_scores),
"chrF++": sum(chrfpp_scores) / len(chrfpp_scores),
}
def create_output_json(evaluation_results):
output = {
"Method": "Dummy Ideal",
"Submitted By": "IL-TUR",
"Github Link": "dummy submission",
"L-NER": {"strict mF1": evaluation_results["lner"]["strict mF1"]},
"RR": {"mF1": evaluation_results["rr"]["mF1"]},
"CJPE": {
"mF1": evaluation_results["cjpe"]["mF1"],
"ROUGE-L": evaluation_results["cjpe"]["ROUGE-L"],
"BLEU": evaluation_results["cjpe"]["BLEU"],
},
"BAIL": {"mF1": evaluation_results["bail"]["mF1"]},
"LSI": {"mF1": evaluation_results["lsi"]["mF1"]},
"PCR": {"muF1@K": evaluation_results["pcr"]["muF1@K"]},
"SUMM": {
"ROUGE-L": evaluation_results["summ"]["ROUGE-L"],
"BERTSCORE": evaluation_results["summ"]["BERTSCORE"] #"-", # Placeholder BERTSCORE
},
"L-MT": {
"BLEU": evaluation_results["lmt"]["BLEU"],
"GLEU": evaluation_results["lmt"]["GLEU"],
"chrF++": evaluation_results["lmt"]["chrF++"],
},
}
return [output] # Wrap in a list to match the desired format
def main():
# gold_data = load_json("IL_TUR_eval_gold.json")
# pred_data = load_json("IL_TUR_eval_submission2.json")
gold_data = load_json("submissions/baseline/IL_TUR_eval_gold.json")
pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_dummy.json")
pred_data = gold_data
evaluation_results = {}
for task in pred_data.keys():
print(f"Task: {task}")
if task == "bail":
evaluation_results[task] = evaluate_bail(gold_data[task], pred_data[task])
elif task == "cjpe":
nltk.download('punkt')
evaluation_results.update(evaluate_cjpe(gold_data[task], pred_data[task]))
elif task == "lner":
text_data = load_json("lner-text.json")
evaluation_results[task] = evaluate_lner(
gold_data[task], pred_data[task], text_data
)
elif task == "rr":
evaluation_results[task] = evaluate_rr(gold_data[task], pred_data[task])
elif task == "lsi":
evaluation_results[task] = evaluate_lsi(gold_data[task], pred_data[task])
elif task == "pcr":
evaluation_results[task] = evaluate_pcr(gold_data[task], pred_data[task])
elif task == "summ":
nltk.download('punkt')
evaluation_results[task] = evaluate_summ(gold_data[task], pred_data[task])
elif task == "lmt":
evaluation_results[task] = evaluate_lmt(gold_data[task], pred_data[task])
# convert the evaluation results to the required format
for task, result in evaluation_results.items():
if isinstance(result, dict):
for subtask, subresult in result.items():
if isinstance(subresult, dict):
for subsubtask, subsubresult in subresult.items():
evaluation_results[task][subtask][
subsubtask
] = f"{subsubresult:.2f}"
else:
if isinstance(subresult, str):
evaluation_results[task][subtask] = subresult
else:
evaluation_results[task][subtask] = f"{subresult:.2f}"
else:
if isinstance(result, str):
evaluation_results[task] = result
else:
evaluation_results[task] = f"{result:.2f}"
blank_scores = {
"lner": {"strict mF1": "-"},
"rr": {"mF1": "-"},
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
"bail": {"mF1": "-"},
"lsi": {"mF1": "-"},
"pcr": {"muF1@K": "-"},
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}
print("--------------------------Evaluation Summary--------------------------")
for task, result in evaluation_results.items():
print(f"{task}: {result}")
print("---------------------------------------------------------------------")
# for tasks that were not present in the submission, add blank scores
for task in gold_data.keys():
if task not in pred_data:
evaluation_results[task] = blank_scores[task]
# Generate the output JSON
output_json = create_output_json(evaluation_results)
with open("evaluation_results.json", "w") as f:
json.dump(output_json, f, indent=2)
print("Evaluation results saved to evaluation_results.json")
def get_evaluation_scores(gold_data, submission_data):
evaluation_results = {}
for task in submission_data.keys():
print(f"Task: {task}")
if task == "bail":
evaluation_results[task] = evaluate_bail(
gold_data[task], submission_data[task]
)
elif task == "cjpe":
nltk.download('punkt')
evaluation_results.update(
evaluate_cjpe(gold_data[task], submission_data[task])
)
elif task == "lner":
text_data = load_json("lner-text.json")
evaluation_results[task] = evaluate_lner(
gold_data[task], submission_data[task], text_data
)
elif task == "rr":
evaluation_results[task] = evaluate_rr(
gold_data[task], submission_data[task]
)
elif task == "lsi":
evaluation_results[task] = evaluate_lsi(
gold_data[task], submission_data[task]
)
elif task == "pcr":
evaluation_results[task] = evaluate_pcr(
gold_data[task], submission_data[task]
)
elif task == "summ":
nltk.download('punkt')
evaluation_results[task] = evaluate_summ(
gold_data[task], submission_data[task]
)
elif task == "lmt":
evaluation_results[task] = evaluate_lmt(
gold_data[task], submission_data[task]
)
# convert the evaluation results to the required format
for task, result in evaluation_results.items():
if isinstance(result, dict):
for subtask, subresult in result.items():
if isinstance(subresult, dict):
for subsubtask, subsubresult in subresult.items():
evaluation_results[task][subtask][
subsubtask
] = f"{subsubresult:.2f}"
else:
if isinstance(subresult, str):
evaluation_results[task][subtask] = subresult
else:
evaluation_results[task][subtask] = f"{subresult:.2f}"
else:
if isinstance(result, str):
evaluation_results[task] = result
else:
evaluation_results[task] = f"{result:.2f}"
blank_scores = {
"lner": {"strict mF1": "-"},
"rr": {"mF1": "-"},
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
"bail": {"mF1": "-"},
"lsi": {"mF1": "-"},
"pcr": {"muF1@K": "-"},
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}
# for tasks that were not present in the submission, add blank scores
for task in gold_data.keys():
if task not in submission_data:
evaluation_results[task] = blank_scores[task]
print("--------------------------Evaluation Summary--------------------------")
for task, result in evaluation_results.items():
print(f"{task}: {result}")
print("---------------------------------------------------------------------")
output_json = create_output_json(evaluation_results)
return output_json
if __name__ == "__main__":
main()