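# IL-TUR-Leaderboard / eval_utils.py
# Last commit: "add prediction submission" (e1043c6) by abhinav-joshi.
# The remaining file-viewer residue ("raw", "history blame", "15.4 kB") was
# page chrome rather than source and is kept here only as a comment.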
import json
import re
from collections import defaultdict
import evaluate
import nltk
import numpy as np
from nervaluate import Evaluator
from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU, CHRF
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import AutoTokenizer
from ner_helpers import span2bio
def load_json(file_path):
    """Read a JSON document from ``file_path`` and return the parsed object.

    The file is decoded explicitly as UTF-8 instead of relying on the
    platform default encoding, so non-ASCII text loads identically on every
    operating system.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file (dict, list, ...).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)
def get_micro_at_k(gold, pred, k):
    """Return the raw counts needed for micro-averaged precision/recall at k.

    Both inputs are de-duplicated by converting them to sets; only the first
    ``k`` entries of ``pred`` are considered.

    Args:
        gold: Sequence of relevant candidates.
        pred: Ranked sequence of predicted candidates.
        k: Cut-off applied to ``pred`` before de-duplication.

    Returns:
        A 3-tuple ``(n_correct, n_gold, n_pred)`` of set sizes.
    """
    relevant = set(gold)
    retrieved = set(pred[:k])
    hits = relevant.intersection(retrieved)
    return len(hits), len(relevant), len(retrieved)
def evaluate_bail(gold_data, pred_data):
    """Score bail predictions with macro-F1.

    Args:
        gold_data: Mapping of document id -> gold label.
        pred_data: Mapping of document id -> predicted label. Documents
            missing from this mapping are scored as label ``0``.

    Returns:
        The macro-F1 formatted as a string with two decimals.
    """
    doc_ids = list(gold_data)
    y_true = [gold_data[doc_id] for doc_id in doc_ids]
    # A missing prediction falls back to label 0 rather than being skipped.
    y_pred = [pred_data.get(doc_id, 0) for doc_id in doc_ids]

    macro_f1 = f1_score(y_true, y_pred, average="macro")
    print("Macro-F1 on HLDC-all-districts test set:", macro_f1)
    return f"{macro_f1:.2f}"
def evaluate_cjpe(gold_data, pred_data):
    """Evaluate the CJPE task: judgment prediction plus explanation quality.

    Prediction is scored with macro-F1; explanations are scored with the
    ``rouge`` and ``bleu`` metrics loaded through ``evaluate``.

    Gold and predicted explanations are paired by document id. Pairing the
    two ``.values()`` sequences by position is wrong as soon as a submission
    omits a document or lists documents in a different order.

    Args:
        gold_data: Dict with keys ``"prediction"`` (id -> label) and
            ``"explanation"`` (id -> dict holding an ``"expert_1"`` text).
        pred_data: Submission in the same layout. Missing predictions are
            scored as label ``0`` and missing explanations as ``""``.

    Returns:
        ``{"cjpe-eval": <macro-F1>, "cjpe-exp-eval": {"rouge": [<rouge
        result>], "bleu": [<bleu result>]}}``.
    """
    # Evaluate prediction
    submitted_predictions = pred_data.get("prediction", {})
    gold_labels = []
    pred_labels = []
    for doc_id, label in gold_data["prediction"].items():
        gold_labels.append(label)
        pred_labels.append(submitted_predictions.get(doc_id, 0))
    f1 = f1_score(gold_labels, pred_labels, average="macro")
    prediction_result = {"cjpe-eval": f1}

    # Evaluate explanation
    rouge = evaluate.load("rouge")
    bleu = evaluate.load("bleu")
    submitted_explanations = pred_data.get("explanation", {})
    gold_explanations = []
    pred_explanations = []
    for doc_id, gold_explanation in gold_data["explanation"].items():
        gold_explanations.append(gold_explanation["expert_1"])
        # Look the prediction up by id so every reference is compared with
        # the explanation written for the same document.
        submitted = submitted_explanations.get(doc_id) or {}
        pred_explanations.append(submitted.get("expert_1", ""))

    rouge_scores = rouge.compute(
        predictions=pred_explanations, references=gold_explanations
    )
    bleu_score = bleu.compute(
        predictions=pred_explanations, references=gold_explanations
    )
    explanation_result = {
        "cjpe-exp-eval": {
            "rouge": [rouge_scores],
            "bleu": [bleu_score],
        }
    }
    return {**prediction_result, **explanation_result}
def evaluate_lner(gold_data, pred_data, text_data):
    """Evaluate legal NER with strict macro-F1, averaged over three folds.

    For each fold the gold and predicted spans are converted to tag
    sequences with ``span2bio`` and scored with ``nervaluate.Evaluator``.
    The fold score is the unweighted mean of the per-tag strict F1 values.

    Args:
        gold_data: Dict ``"fold_<n>"`` -> {document id -> gold spans}.
        pred_data: Dict ``"fold_<n>"`` -> {document id -> predicted spans}.
            A missing fold or document is scored as having no spans.
        text_data: Dict ``"fold_<n>"`` -> {document id -> raw text}.

    Returns:
        ``{"strict mF1": "<mean over folds, two decimals>"}``.

    Raises:
        OSError: If ``labels.txt`` cannot be read.
    """
    with open("labels.txt") as f:
        labels = f.read().strip().split("\n")

    results_per_fold = {}
    for fold in range(1, 4):
        fold_key = f"fold_{fold}"
        gold = gold_data[fold_key]
        pred = pred_data.get(fold_key, {})
        text = text_data[fold_key]

        gold_labels, pred_labels = [], []
        for doc_id, gold_label in tqdm(gold.items()):
            txt = text[doc_id]
            pred_label = pred.get(doc_id, [])
            # span2bio returns a pair; only its second element (presumably
            # the BIO tag sequence -- confirm in ner_helpers) is used here.
            _, gold_bio = span2bio(txt, gold_label)
            _, pred_bio = span2bio(txt, pred_label)
            gold_labels.append(gold_bio)
            pred_labels.append(pred_bio)

        evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list")
        results, results_per_tag, _, _ = evaluator.evaluate()
        f1_scores = [
            results_per_tag[tag]["strict"]["f1"] for tag in results_per_tag
        ]
        avg_f1 = sum(f1_scores) / len(f1_scores) if f1_scores else 0.0
        print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
        results_per_fold[fold_key] = avg_f1

    # The format spec must sit inside the braces; outside them it is emitted
    # as the literal text ":.2f" after an unrounded number.
    mean_f1 = np.mean(list(results_per_fold.values()))
    return {"strict mF1": f"{mean_f1:.2f}"}
def evaluate_rr(gold_data, pred_data):
    """Evaluate rhetorical-role labelling with macro-F1 over all sentences.

    Labels of every document are concatenated into one flat sequence before
    scoring. Each document's predicted labels are forced to the length of
    its gold labels (truncated, or padded with ``"None"``) so that a single
    malformed document cannot shift every following label or make
    ``f1_score`` reject the inputs for having different lengths.

    Args:
        gold_data: Mapping of document id -> list of gold labels.
        pred_data: Mapping of document id -> list of predicted labels.
            Missing documents are scored as all ``"None"``.

    Returns:
        ``{"mF1": "<macro-F1, two decimals>"}``.
    """
    all_gold_labels = []
    all_pred_labels = []
    for doc_id, gold_labels in gold_data.items():
        expected = len(gold_labels)
        pred_labels = list(pred_data.get(doc_id, []))[:expected]
        pred_labels.extend(["None"] * (expected - len(pred_labels)))
        all_gold_labels.extend(gold_labels)
        all_pred_labels.extend(pred_labels)

    mf1 = f1_score(all_gold_labels, all_pred_labels, average="macro")
    print("Macro-F1 on combined test set:", mf1)
    return {"mF1": f"{mf1:.2f}"}
def evaluate_lsi(gold_data, pred_data):
    """Evaluate statute identification with macro-F1 over indicator matrices.

    One row is built per gold document and one column per entry of the
    vocabulary read from ``lsi_label_vocab.json``; a cell is set to 1 when
    the document carries that label. Labels absent from the vocabulary are
    ignored, and documents missing from ``pred_data`` get an all-zero row.

    Args:
        gold_data: Mapping of document id -> iterable of gold labels.
        pred_data: Mapping of document id -> iterable of predicted labels.

    Returns:
        The macro-F1 value returned by ``f1_score``.
    """
    with open("lsi_label_vocab.json") as vocab_file:
        label_vocab = json.load(vocab_file)

    shape = (len(gold_data), len(label_vocab))
    gold_matrix = np.zeros(shape)
    pred_matrix = np.zeros(shape)

    for row, (doc_id, gold_labels) in enumerate(gold_data.items()):
        per_matrix = (
            (gold_matrix, gold_labels),
            (pred_matrix, pred_data.get(doc_id, [])),
        )
        for matrix, doc_labels in per_matrix:
            for label in doc_labels:
                # The vocabulary value is used directly as the column index.
                if label in label_vocab:
                    matrix[row, label_vocab[label]] = 1

    f1 = f1_score(gold_matrix, pred_matrix, average="macro")
    print("Macro-F1 on ILSI test set:", f1)
    return f1
def evaluate_pcr(gold_data, pred_data):
    """Evaluate prior-case retrieval as the mean micro-F1@k for k = 1..20.

    For every cut-off ``k`` the counts returned by ``get_micro_at_k`` are
    summed over all queries and turned into one micro-averaged F1; the
    function returns the mean of those twenty values.

    Args:
        gold_data: Mapping of query id -> list of relevant candidate ids.
        pred_data: Mapping of query id -> ranked list of predicted candidate
            ids. Missing queries are scored as an empty ranking.

    Returns:
        ``np.mean`` of the twenty micro-F1@k values.
    """
    # A query is never allowed to retrieve itself. The filtering does not
    # depend on k, so it is done once up front.
    cleaned_pairs = []
    for query_id, relevant in gold_data.items():
        ranked = pred_data.get(query_id, [])
        cleaned_pairs.append(
            (
                [cand for cand in relevant if cand != query_id],
                [cand for cand in ranked if cand != query_id],
            )
        )

    f1_per_k = []
    for k in range(1, 21):
        correct = gold_total = pred_total = 0
        for relevant, ranked in cleaned_pairs:
            hits, n_gold, n_pred = get_micro_at_k(relevant, ranked, k)
            correct += hits
            gold_total += n_gold
            pred_total += n_pred

        precision = correct / pred_total if pred_total > 0 else 0
        recall = correct / gold_total if gold_total > 0 else 0
        if precision + recall > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0
        f1_per_k.append(f1)
        print(f"Micro-F1@{k} on IL-PCR test set:", f1)

    return np.mean(f1_per_k)
def evaluate_summ(gold_data, pred_data):
    """Evaluate summarisation with the ``rouge`` metric from ``evaluate``.

    Only documents present in both mappings are scored. Every summary has
    its whitespace collapsed to single spaces and is stripped before scoring.

    Args:
        gold_data: Mapping of document id -> reference summary text.
        pred_data: Mapping of document id -> predicted summary text.

    Returns:
        ``{"ROUGE-L": <result of rouge.compute>, "BERTSCORE": "-"}``.
    """

    def _collapse_whitespace(text):
        return re.sub(r"\s+", " ", text.replace("\n", " ")).strip()

    scored_pairs = [
        (_collapse_whitespace(reference), _collapse_whitespace(pred_data[doc_id]))
        for doc_id, reference in gold_data.items()
        if doc_id in pred_data
    ]
    gold_summaries = [reference for reference, _ in scored_pairs]
    pred_summaries = [candidate for _, candidate in scored_pairs]

    rouge = evaluate.load("rouge")
    rouge_scores = rouge.compute(predictions=pred_summaries, references=gold_summaries)
    print("Rouge-L:", rouge_scores)
    # NOTE(review): the value stored under "ROUGE-L" is the complete object
    # returned by rouge.compute, not a single number -- confirm that the
    # consumers of this result expect that.
    return {"ROUGE-L": rouge_scores, "BERTSCORE": "-"}
def evaluate_lmt(gold_data, pred_data):
    """Evaluate legal machine translation with BLEU, GLEU and chrF++.

    Texts are tokenised with the ``ai4bharat/indic-bert`` tokenizer and
    re-joined with spaces. Scores are computed per (dataset, language),
    averaged over languages within a dataset, then averaged over datasets.

    A dataset or sample missing from the submission is scored as an empty
    translation, so an incomplete submission is penalised rather than
    aborting the whole evaluation with ``KeyError``.

    Args:
        gold_data: Dict dataset -> {sample id -> reference text}. The
            language is taken from the second ``/``-separated field of the
            sample id.
        pred_data: Dict dataset -> {sample id -> translated text}.

    Returns:
        ``{"BLEU": float, "GLEU": float, "chrF++": float}``; all three are
        ``0.0`` when ``gold_data`` holds no samples.
    """

    def _mean(values):
        # Guard against empty input instead of dividing by zero.
        return sum(values) / len(values) if values else 0.0

    tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert")
    bleu = BLEU()
    chrfp = CHRF(word_order=2)
    gleu = evaluate.load("google_bleu")

    gold_by_lang = defaultdict(lambda: defaultdict(list))
    pred_by_lang = defaultdict(lambda: defaultdict(list))
    for dataset, gold_texts in gold_data.items():
        dataset_preds = pred_data.get(dataset, {})
        for sample_id, gold_text in gold_texts.items():
            lang = sample_id.split("/")[1].strip()
            pred_text = dataset_preds.get(sample_id, "")
            gold_by_lang[dataset][lang].append(
                " ".join(tokenizer.tokenize(gold_text))
            )
            pred_by_lang[dataset][lang].append(
                " ".join(tokenizer.tokenize(pred_text))
            )

    bleu_scores, chrfpp_scores, gleu_scores = [], [], []
    for dataset in gold_by_lang:
        print("Dataset", dataset)
        dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], []
        for lang in gold_by_lang[dataset]:
            gold = gold_by_lang[dataset][lang]
            pred = pred_by_lang[dataset][lang]
            dataset_bleu.append(bleu.corpus_score(pred, [gold]).score)
            dataset_chrfpp.append(chrfp.corpus_score(pred, [gold]).score)
            dataset_gleu.append(
                gleu.compute(predictions=pred, references=gold)["google_bleu"]
            )
        bleu_scores.append(_mean(dataset_bleu))
        chrfpp_scores.append(_mean(dataset_chrfpp))
        gleu_scores.append(_mean(dataset_gleu))

    return {
        "BLEU": _mean(bleu_scores),
        "GLEU": _mean(gleu_scores),
        "chrF++": _mean(chrfpp_scores),
    }
def _task_metric(evaluation_results, task, metric):
    """Return one metric of one task, or ``"-"`` when it is unavailable.

    A task entry may be either a dict of metrics or a bare score; a bare
    score is returned as-is whatever ``metric`` is requested.
    """
    result = evaluation_results.get(task, "-")
    if isinstance(result, dict):
        return result.get(metric, "-")
    return result


def create_output_json(evaluation_results):
    """Arrange per-task scores into the leaderboard submission record.

    Each task entry in ``evaluation_results`` may be a dict of metrics or a
    bare score. Both forms yield the same output, so a placeholder such as
    ``{"mF1": "-"}`` for BAIL/LSI/PCR is no longer wrapped a second time
    into ``{"mF1": {"mF1": "-"}}``. Tasks or metrics that are absent are
    reported as ``"-"`` instead of raising ``KeyError``.

    Args:
        evaluation_results: Dict keyed by task name (``"lner"``, ``"rr"``,
            ``"cjpe"``, ``"bail"``, ``"lsi"``, ``"pcr"``, ``"summ"``,
            ``"lmt"``).

    Returns:
        A one-element list holding the output record.
    """

    def metric(task, name):
        return _task_metric(evaluation_results, task, name)

    output = {
        "Method": "GPT-5 (2-shot)",
        "Submitted By": "IL-TUR",
        "Github Link": "dummy submission",
        "L-NER": {"strict mF1": metric("lner", "strict mF1")},
        "RR": {"mF1": metric("rr", "mF1")},
        "CJPE": {
            "mF1": metric("cjpe", "mF1"),
            "ROUGE-L": metric("cjpe", "ROUGE-L"),
            "BLEU": metric("cjpe", "BLEU"),
        },
        "BAIL": {"mF1": metric("bail", "mF1")},
        "LSI": {"mF1": metric("lsi", "mF1")},
        "PCR": {"muF1@K": metric("pcr", "muF1@K")},
        "SUMM": {
            "ROUGE-L": metric("summ", "ROUGE-L"),
            "BERTSCORE": "-",  # Placeholder BERTSCORE
        },
        "L-MT": {
            "BLEU": metric("lmt", "BLEU"),
            "GLEU": metric("lmt", "GLEU"),
            "chrF++": metric("lmt", "chrF++"),
        },
    }
    return [output]  # Wrap in a list to match the desired format
# Placeholder scores for tasks that were not evaluated. BAIL, LSI and PCR
# are bare values because create_output_json wraps them in their own
# single-metric dict; storing a dict here would produce a doubly nested entry.
_BLANK_SCORES = {
    "lner": {"strict mF1": "-"},
    "rr": {"mF1": "-"},
    "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
    "bail": "-",
    "lsi": "-",
    "pcr": "-",
    "summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
    "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}


def _flatten_cjpe_result(raw):
    """Map evaluate_cjpe's nested result onto the mF1 / ROUGE-L / BLEU keys.

    evaluate_cjpe returns ``{"cjpe-eval": f1, "cjpe-exp-eval": {"rouge":
    [...], "bleu": [...]}}`` whereas create_output_json reads
    ``evaluation_results["cjpe"]["mF1" | "ROUGE-L" | "BLEU"]``. A result
    that does not carry the ``"cjpe-eval"`` key is returned untouched.
    """
    if "cjpe-eval" not in raw:
        return raw
    explanation = raw.get("cjpe-exp-eval", {})
    rouge = (explanation.get("rouge") or [{}])[0]
    bleu = (explanation.get("bleu") or [{}])[0]
    return {
        "mF1": raw["cjpe-eval"],
        "ROUGE-L": rouge.get("rougeL", "-"),
        "BLEU": bleu.get("bleu", "-"),
    }


def _collapse_summ_rouge(result):
    """Reduce a dict stored under SUMM's ``"ROUGE-L"`` to its ``rougeL`` entry."""
    rouge = result.get("ROUGE-L") if isinstance(result, dict) else None
    if isinstance(rouge, dict):
        return {**result, "ROUGE-L": rouge.get("rougeL", "-")}
    return result


def _evaluate_task(task, gold, pred):
    """Run the evaluator registered for ``task`` and return its raw result."""
    if task == "lner":
        return evaluate_lner(gold, pred, load_json("lner-text.json"))
    if task == "cjpe":
        return _flatten_cjpe_result(evaluate_cjpe(gold, pred))
    if task == "summ":
        return _collapse_summ_rouge(evaluate_summ(gold, pred))
    evaluators = {
        "bail": evaluate_bail,
        "rr": evaluate_rr,
        "lsi": evaluate_lsi,
        "pcr": evaluate_pcr,
        "lmt": evaluate_lmt,
    }
    return evaluators[task](gold, pred)


def _format_scores(value):
    """Render scores as strings: str unchanged, dict recursively, else ``.2f``."""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        return {key: _format_scores(item) for key, item in value.items()}
    return f"{value:.2f}"


def main():
    """Score the bundled baseline submission and write evaluation_results.json.

    The submission file is evaluated as loaded. Reassigning the predictions
    to the gold data, a debugging leftover, would score gold against itself
    and discard the submission entirely.
    """
    # Full-size alternatives: IL_TUR_eval_gold.json / IL_TUR_eval_submission2.json
    gold_data = load_json("submissions/baseline/IL_TUR_eval_gold_small.json")
    pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_small.json")

    # Generate the output JSON
    output_json = get_evaluation_scores(gold_data, pred_data)
    with open("evaluation_results.json", "w") as f:
        json.dump(output_json, f, indent=2)
    print("Evaluation results saved to evaluation_results.json")


def get_evaluation_scores(gold_data, submission_data):
    """Evaluate every task of a submission and build the leaderboard record.

    Args:
        gold_data: Dict task name -> gold annotations for that task.
        submission_data: Dict task name -> submitted predictions. Entries
            for unknown tasks, or for tasks without gold data, are skipped.

    Returns:
        The list produced by ``create_output_json``. Tasks that were not
        evaluated are reported with ``"-"`` placeholders.
    """
    evaluation_results = {}
    for task in submission_data:
        print(f"Task: {task}")
        if task not in _BLANK_SCORES or task not in gold_data:
            print(f"Skipping '{task}': unknown task or no gold data available")
            continue
        raw_result = _evaluate_task(task, gold_data[task], submission_data[task])
        # convert the evaluation results to the required format
        evaluation_results[task] = _format_scores(raw_result)

    # for tasks that were not evaluated, add blank scores
    for task, blank in _BLANK_SCORES.items():
        evaluation_results.setdefault(task, blank)

    print("--------------------------Evaluation Summary--------------------------")
    for task, result in evaluation_results.items():
        print(f"{task}: {result}")
    print("---------------------------------------------------------------------")

    return create_output_json(evaluation_results)


if __name__ == "__main__":
    main()