"""Evaluation utilities for the IL-TUR leaderboard (eval_utils.py)."""

import json
import re
import string
from collections import defaultdict

import evaluate
import numpy as np
from nervaluate import Evaluator
from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU, CHRF
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import AutoTokenizer
class TF_Tokenizer:
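    """Callable wrapper around a Hugging Face ``AutoTokenizer``: returns the
    subword tokens of a text."""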
def __init__(self, model_str):
        self.tok = AutoTokenizer.from_pretrained(model_str)
def __call__(self, txt):
return self.tok.tokenize(txt)
class WS_Tokenizer:
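    """Simple regex tokenizer that splits a text into punctuation marks and
    word tokens (no pretrained model required)."""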
def __init__(self):
pass
def __call__(self, txt):
return re.findall(r"[{}]|\w+".format(string.punctuation), txt)
def convert_spans_to_bio(txt, roles, tokenizer_func):
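    """Convert character-offset role spans into token-level BIO tags.

    ``roles`` is a list of ``{"start", "end", "label"}`` dicts over ``txt``;
    the returned list has one BIO tag per token produced by ``tokenizer_func``.
    """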
roles = sorted(roles, key=lambda x: x["start"])
roles_left = [r["start"] for r in roles]
ttxt = tokenizer_func(txt)
c = 0
cr = -1
prev = "O"
troles = []
for tok in ttxt:
if c >= len(txt):
break
        # Skip spaces before the current token, guarding against running past
        # the end of the text.
        while c < len(txt) and txt[c] == " ":
            c += 1
        if c in roles_left:  # Start of a new role
            ind = roles_left.index(c)
            cr = roles[ind]["end"]
            prev = "I-" + roles[ind]["label"]
            troles.append("B-" + roles[ind]["label"])
        elif c < cr:  # Still inside the current role: assign previous role
            troles.append(prev)
        else:  # Outside any role: assign 'O'
            troles.append("O")
        c += len(tok)
if len(ttxt) != len(troles):
troles += ["O"] * (len(ttxt) - len(troles))
assert len(ttxt) == len(troles)
return troles
def convert_bio_to_spans(txt, troles, tokenizer_func):
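    """Invert ``convert_spans_to_bio``: map token-level BIO tags back to
    character-offset spans ``{"start", "end", "label"}``, accounting for
    WordPiece continuation tokens ("##...") and "[UNK]" tokens."""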
c = 0
c2 = 0
cr = -1
cs = -1
prev = "O"
roles = []
ttxt = tokenizer_func(txt)
if len(ttxt) != len(troles):
ttxt = ttxt[: len(troles)]
for j, tok in enumerate(ttxt):
if c >= len(txt):
break
while c < len(txt) and txt[c].isspace():
c += 1
if tok[:2] == "##" or tok == "[UNK]":
c += len(tok) - 2 if tok[:2] == "##" else 1
else:
if troles[j].startswith("B-"):
if cs >= cr:
cr = c
if cs >= 0:
roles.append({"start": cs, "end": c2, "label": prev})
cs = c
prev = troles[j][2:]
else:
if troles[j] == "O":
if cs >= cr:
cr = c
if cs >= 0:
roles.append({"start": cs, "end": c2, "label": prev})
c += len(tok)
c2 = c
if cs >= cr:
if cs >= 0:
roles.append({"start": cs, "end": c2, "label": prev})
return roles
def span2bio(txt, labels):
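    """Tokenize ``txt`` with the whitespace/punctuation regex and convert the
    span annotations in ``labels`` into BIO tags.

    Returns the token list and the parallel list of BIO tags.
    """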
roles = sorted(labels, key=lambda x: x["label"])
roles_left = [r["start"] for r in roles]
ttxt = re.findall(r"[{}]|\w+".format(string.punctuation), txt)
c = 0
cr = -1
prev = "O"
troles = []
for tok in ttxt:
if c >= len(txt):
break
        # Skip spaces before the current token, guarding against running past
        # the end of the text.
        while c < len(txt) and txt[c] == " ":
            c += 1
        if c in roles_left:  # Start of a new role
            ind = roles_left.index(c)
            cr = roles[ind]["end"]
            prev = "I-" + roles[ind]["label"]
            troles.append("B-" + roles[ind]["label"])
        elif c < cr:  # Still inside the current role: assign previous role
            troles.append(prev)
        else:  # Outside any role: assign 'O'
            troles.append("O")
        c += len(tok)
if len(ttxt) != len(troles):
troles += ["O"] * (len(ttxt) - len(troles))
assert len(ttxt) == len(troles)
return ttxt, troles
def load_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
def get_micro_at_k(gold, pred, k):
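    """Return the number of correct, gold, and predicted items when comparing
    the top-k predictions against the gold set (micro counts for F1@k)."""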
gold_set = set(gold)
pred_set = set(pred[:k])
return len(gold_set & pred_set), len(gold_set), len(pred_set)
def evaluate_bail(gold_data, pred_data):
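    """Bail prediction (HLDC): macro-F1 over the binary labels, with missing
    predictions defaulting to 0. Returns the score as a formatted string."""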
gold_labels = []
pred_labels = []
for id, label in gold_data.items():
gold_labels.append(label)
pred_labels.append(pred_data.get(id, 0))
f1 = f1_score(gold_labels, pred_labels, average="macro")
print("Macro-F1 on HLDC-all-districts test set:", f1)
return f"{f1:.2f}"
def evaluate_cjpe(gold_data, pred_data):
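    """Court Judgment Prediction and Explanation (CJPE): macro-F1 for the
    prediction sub-task plus ROUGE and BLEU between the predicted and gold
    explanations (the "expert_1" field). Returns a dict with the keys
    "cjpe-eval" and "cjpe-exp-eval"."""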
# Evaluate prediction
gold_labels = []
pred_labels = []
for id, label in gold_data["prediction"].items():
gold_labels.append(label)
pred_labels.append(pred_data["prediction"].get(id, 0))
f1 = f1_score(gold_labels, pred_labels, average="macro")
prediction_result = {"cjpe-eval": f1}
# Evaluate explanation
rouge = evaluate.load("rouge")
bleu = evaluate.load("bleu")
gold_explanations = [exp["expert_1"] for exp in gold_data["explanation"].values()]
pred_explanations = [exp["expert_1"] for exp in pred_data["explanation"].values()]
rouge_scores = rouge.compute(
predictions=pred_explanations, references=gold_explanations
)
bleu_score = bleu.compute(
predictions=pred_explanations, references=gold_explanations
)
explanation_result = {
"cjpe-exp-eval": {
"rouge": [rouge_scores],
"bleu": [bleu_score],
}
}
return {**prediction_result, **explanation_result}
def evaluate_lner(gold_data, pred_data, text_data):
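    """Legal NER: strict macro-F1 (via nervaluate) per fold over the 12 entity
    types, averaged across the three folds."""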
labels = [
"APP",
"RESP",
"A.COUNSEL",
"R.COUNSEL",
"JUDGE",
"WIT",
"AUTH",
"COURT",
"STAT",
"PREC",
"DATE",
"CASENO",
]
results_per_fold = {}
for fold in range(1, 4):
gold = gold_data[f"fold_{fold}"]
pred = pred_data[f"fold_{fold}"]
text = text_data[f"fold_{fold}"]
texts, gold_labels, pred_labels = [], [], []
for id, gold_label in tqdm(gold.items()):
txt = text[id]
pred_label = pred.get(id, [])
txt_seg, gold_bio = span2bio(txt, gold_label)
_, pred_bio = span2bio(txt, pred_label)
texts.append(txt_seg)
gold_labels.append(gold_bio)
pred_labels.append(pred_bio)
evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list")
results, results_per_tag, _, _ = evaluator.evaluate()
f1_scores = [results_per_tag[l]["strict"]["f1"] for l in results_per_tag]
avg_f1 = sum(f1_scores) / len(f1_scores)
print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
results_per_fold[f"fold_{fold}"] = avg_f1
return {"strict mF1": f"{np.mean(list(results_per_fold.values()))}:.2f"}
def evaluate_rr(gold_data, pred_data):
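    """Rhetorical Role labelling: macro-F1 over the concatenated label
    sequences of all documents, using "None" labels for documents missing
    from the predictions."""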
all_gold_labels = []
all_pred_labels = []
for id, gold_labels in gold_data.items():
pred_labels = pred_data.get(id, ["None"] * len(gold_labels))
all_gold_labels.extend(gold_labels)
all_pred_labels.extend(pred_labels)
mf1 = f1_score(all_gold_labels, all_pred_labels, average="macro")
print(f"Macro-F1 on combined test set:", mf1)
return {"mF1": f"{mf1:.2f}"}
def evaluate_lsi(gold_data, pred_data):
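    """Legal Statute Identification (ILSI): multi-label macro-F1 over the
    statute vocabulary loaded from lsi_label_vocab.json."""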
with open("lsi_label_vocab.json") as f:
label_vocab = json.load(f)
gold_matrix = np.zeros((len(gold_data), len(label_vocab)))
pred_matrix = np.zeros((len(gold_data), len(label_vocab)))
for i, (id, gold_labels) in enumerate(gold_data.items()):
pred_labels = pred_data.get(id, [])
for label in gold_labels:
if label in label_vocab:
gold_matrix[i, label_vocab[label]] = 1
for label in pred_labels:
if label in label_vocab:
pred_matrix[i, label_vocab[label]] = 1
f1 = f1_score(gold_matrix, pred_matrix, average="macro")
print("Macro-F1 on ILSI test set:", f1)
return f1
def evaluate_pcr(gold_data, pred_data):
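    """Prior Case Retrieval (IL-PCR): micro-F1@k for k = 1..20 (excluding
    self-citations), returned as the mean over all k."""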
f1_scores = []
for k in range(1, 21):
correct, gold_total, pred_total = 0, 0, 0
for id, gold_candidates in gold_data.items():
pred_candidates = pred_data.get(id, [])
gold_candidates = [c for c in gold_candidates if c != id]
pred_candidates = [c for c in pred_candidates if c != id]
c, g, p = get_micro_at_k(gold_candidates, pred_candidates, k)
correct += c
gold_total += g
pred_total += p
precision = correct / pred_total if pred_total > 0 else 0
recall = correct / gold_total if gold_total > 0 else 0
f1 = (
2 * precision * recall / (precision + recall)
if precision + recall > 0
else 0
)
f1_scores.append(f1)
print(f"Micro-F1@{k} on IL-PCR test set:", f1)
return np.mean(f1_scores)
def evaluate_summ(gold_data, pred_data):
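    """Judgment summarisation: whitespace-normalise gold and predicted
    summaries for the overlapping document ids, compute ROUGE scores, and
    return them alongside a "-" placeholder for BERTScore."""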
gold_summaries = []
pred_summaries = []
for id, gold_summary in gold_data.items():
if id in pred_data:
gold_summary = re.sub(r"\s+", " ", gold_summary.replace("\n", " ")).strip()
pred_summary = re.sub(r"\s+", " ", pred_data[id].replace("\n", " ")).strip()
gold_summaries.append(gold_summary)
pred_summaries.append(pred_summary)
rouge = evaluate.load("rouge")
rouge_scores = rouge.compute(predictions=pred_summaries, references=gold_summaries)
print("Rouge-L:", rouge_scores)
return {"ROUGE-L": rouge_scores, "BERTSCORE": "-"}
def evaluate_lmt(gold_data, pred_data):
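    """Legal machine translation: tokenise with ai4bharat/indic-bert, group
    texts by dataset and target language, and report BLEU, GLEU, and chrF++
    averaged first over languages and then over datasets."""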
tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert")
bleu = BLEU()
chrfp = CHRF(word_order=2)
gleu = evaluate.load("google_bleu")
G = defaultdict(lambda: defaultdict(list))
P = defaultdict(lambda: defaultdict(list))
for dataset in gold_data:
for id, gold_text in gold_data[dataset].items():
lang = id.split("/")[1].strip()
gold_tokens = " ".join(tokenizer.tokenize(gold_text))
pred_tokens = " ".join(tokenizer.tokenize(pred_data[dataset][id]))
G[dataset][lang].append(gold_tokens)
P[dataset][lang].append(pred_tokens)
bleu_scores, chrfpp_scores, gleu_scores = [], [], []
for dataset in G:
print("Dataset", dataset)
dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], []
for lang in G[dataset]:
gold = G[dataset][lang]
pred = P[dataset][lang]
bleu_score = bleu.corpus_score(pred, [gold]).score
chrfpp_score = chrfp.corpus_score(pred, [gold]).score
gleu_score = gleu.compute(predictions=pred, references=gold)["google_bleu"]
dataset_bleu.append(bleu_score)
dataset_chrfpp.append(chrfpp_score)
dataset_gleu.append(gleu_score)
bleu_scores.append(sum(dataset_bleu) / len(dataset_bleu))
chrfpp_scores.append(sum(dataset_chrfpp) / len(dataset_chrfpp))
gleu_scores.append(sum(dataset_gleu) / len(dataset_gleu))
return {
"BLEU": sum(bleu_scores) / len(bleu_scores),
"GLEU": sum(gleu_scores) / len(gleu_scores),
"chrF++": sum(chrfpp_scores) / len(chrfpp_scores),
}
def create_output_json(evaluation_results):
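    """Assemble the per-task scores into a single leaderboard row (a list
    containing one dict) matching the column structure the leaderboard
    expects."""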
output = {
"Method": "GPT-5 (2-shot)",
"Submitted By": "IL-TUR",
"Github Link": "dummy submission",
"L-NER": {"strict mF1": evaluation_results["lner"]["strict mF1"]},
"RR": {"mF1": evaluation_results["rr"]["mF1"]},
"CJPE": {
"mF1": evaluation_results["cjpe"]["mF1"],
"ROUGE-L": evaluation_results["cjpe"]["ROUGE-L"],
"BLEU": evaluation_results["cjpe"]["BLEU"],
},
"BAIL": {"mF1": evaluation_results["bail"]},
"LSI": {"mF1": evaluation_results["lsi"]},
"PCR": {"muF1@K": evaluation_results["pcr"]},
"SUMM": {
"ROUGE-L": evaluation_results["summ"]["ROUGE-L"],
"BERTSCORE": "-", # Placeholder BERTSCORE
},
"L-MT": {
"BLEU": evaluation_results["lmt"]["BLEU"],
"GLEU": evaluation_results["lmt"]["GLEU"],
"chrF++": evaluation_results["lmt"]["chrF++"],
},
}
return [output] # Wrap in a list to match the desired format
def main():
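    """Load the gold and submission JSON files, score every task present in
    the submission, fill missing tasks with "-", and write the result to
    evaluation_results.json."""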
# gold_data = load_json("IL_TUR_eval_gold.json")
# pred_data = load_json("IL_TUR_eval_submission2.json")
gold_data = load_json("submissions/baseline/IL_TUR_eval_gold_small.json")
pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_small.json")
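    # NOTE: the loaded submission is overridden with the gold data below,
    # presumably as a sanity check of the evaluation pipeline.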
pred_data = gold_data
evaluation_results = {}
for task in pred_data.keys():
print(f"Task: {task}")
if task == "bail":
evaluation_results[task] = evaluate_bail(gold_data[task], pred_data[task])
elif task == "cjpe":
evaluation_results.update(evaluate_cjpe(gold_data[task], pred_data[task]))
elif task == "lner":
text_data = load_json("lner-text.json")
evaluation_results[task] = evaluate_lner(
gold_data[task], pred_data[task], text_data
)
elif task == "rr":
evaluation_results[task] = evaluate_rr(gold_data[task], pred_data[task])
elif task == "lsi":
evaluation_results[task] = evaluate_lsi(gold_data[task], pred_data[task])
elif task == "pcr":
evaluation_results[task] = evaluate_pcr(gold_data[task], pred_data[task])
elif task == "summ":
evaluation_results[task] = evaluate_summ(gold_data[task], pred_data[task])
elif task == "lmt":
evaluation_results[task] = evaluate_lmt(gold_data[task], pred_data[task])
# convert the evaluation results to the required format
for task, result in evaluation_results.items():
if isinstance(result, dict):
for subtask, subresult in result.items():
if isinstance(subresult, dict):
for subsubtask, subsubresult in subresult.items():
evaluation_results[task][subtask][
subsubtask
] = f"{subsubresult:.2f}"
else:
if isinstance(subresult, str):
evaluation_results[task][subtask] = subresult
else:
evaluation_results[task][subtask] = f"{subresult:.2f}"
else:
if isinstance(result, str):
evaluation_results[task] = result
else:
evaluation_results[task] = f"{result:.2f}"
blank_scores = {
"lner": {"strict mF1": "-"},
"rr": {"mF1": "-"},
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
"bail": {"mF1": "-"},
"lsi": {"mF1": "-"},
"pcr": {"muF1@K": "-"},
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}
print("--------------------------Evaluation Summary--------------------------")
for task, result in evaluation_results.items():
print(f"{task}: {result}")
print("---------------------------------------------------------------------")
# for tasks that were not present in the submission, add blank scores
for task in gold_data.keys():
if task not in pred_data:
evaluation_results[task] = blank_scores[task]
# Generate the output JSON
output_json = create_output_json(evaluation_results)
with open("evaluation_results.json", "w") as f:
json.dump(output_json, f, indent=2)
print("Evaluation results saved to evaluation_results.json")
def get_evaluation_scores(gold_data, submission_data):
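    """Same pipeline as ``main`` but takes the gold and submission dicts as
    arguments and returns the leaderboard row instead of writing it to disk."""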
evaluation_results = {}
for task in submission_data.keys():
print(f"Task: {task}")
if task == "bail":
evaluation_results[task] = evaluate_bail(
gold_data[task], submission_data[task]
)
elif task == "cjpe":
evaluation_results.update(
evaluate_cjpe(gold_data[task], submission_data[task])
)
elif task == "lner":
text_data = load_json("lner-text.json")
evaluation_results[task] = evaluate_lner(
gold_data[task], submission_data[task], text_data
)
elif task == "rr":
evaluation_results[task] = evaluate_rr(
gold_data[task], submission_data[task]
)
elif task == "lsi":
evaluation_results[task] = evaluate_lsi(
gold_data[task], submission_data[task]
)
elif task == "pcr":
evaluation_results[task] = evaluate_pcr(
gold_data[task], submission_data[task]
)
elif task == "summ":
evaluation_results[task] = evaluate_summ(
gold_data[task], submission_data[task]
)
elif task == "lmt":
evaluation_results[task] = evaluate_lmt(
gold_data[task], submission_data[task]
)
# convert the evaluation results to the required format
for task, result in evaluation_results.items():
if isinstance(result, dict):
for subtask, subresult in result.items():
if isinstance(subresult, dict):
for subsubtask, subsubresult in subresult.items():
evaluation_results[task][subtask][
subsubtask
] = f"{subsubresult:.2f}"
else:
if isinstance(subresult, str):
evaluation_results[task][subtask] = subresult
else:
evaluation_results[task][subtask] = f"{subresult:.2f}"
else:
if isinstance(result, str):
evaluation_results[task] = result
else:
evaluation_results[task] = f"{result:.2f}"
blank_scores = {
"lner": {"strict mF1": "-"},
"rr": {"mF1": "-"},
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
"bail": {"mF1": "-"},
"lsi": {"mF1": "-"},
"pcr": {"muF1@K": "-"},
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}
# for tasks that were not present in the submission, add blank scores
for task in gold_data.keys():
if task not in submission_data:
evaluation_results[task] = blank_scores[task]
print("--------------------------Evaluation Summary--------------------------")
for task, result in evaluation_results.items():
print(f"{task}: {result}")
print("---------------------------------------------------------------------")
output_json = create_output_json(evaluation_results)
return output_json
if __name__ == "__main__":
main()