"""Score an IL-TUR submission against gold data and build the results JSON."""

import json
import re
import string
from collections import defaultdict

import bert_score
import evaluate
import nltk
import numpy as np
import rouge
from nervaluate import Evaluator
from sacrebleu.metrics import BLEU, CHRF
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import AutoTokenizer

# Tokenizer used by span2bio: a single punctuation character or a run of
# word characters. Compiled once because span2bio is called per document.
_TOKEN_RE = re.compile(r'[{}]|\w+'.format(string.punctuation))

# Placeholder scores reported for tasks that are absent from a submission.
_BLANK_SCORES = {
    "lner": {"strict mF1": "-"},
    "rr": {"mF1": "-"},
    "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
    "bail": {"mF1": "-"},
    "lsi": {"mF1": "-"},
    "pcr": {"muF1@K": "-"},
    "summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
    "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
}


def load_json(file_path):
    """Read and parse the JSON file at *file_path*.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's default locale encoding.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_micro_at_k(gold, pred, k):
    """Return overlap counts between *gold* and the first *k* items of *pred*.

    Returns:
        A tuple ``(n_correct, n_gold, n_pred)`` computed on de-duplicated
        sets: the size of the intersection, the number of distinct gold
        items, and the number of distinct items among ``pred[:k]``.
    """
    gold_set = set(gold)
    pred_set = set(pred[:k])
    return len(gold_set & pred_set), len(gold_set), len(pred_set)


def evaluate_bail(gold_data, pred_data):
    """Compute macro-F1 for the bail task.

    Args:
        gold_data: Mapping of example id to gold label.
        pred_data: Mapping of example id to predicted label. Ids missing
            from this mapping are scored as label ``0``.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.
    """
    gold_labels = []
    pred_labels = []
    for doc_id, label in gold_data.items():
        gold_labels.append(label)
        pred_labels.append(pred_data.get(doc_id, 0))

    f1 = f1_score(gold_labels, pred_labels, average="macro")
    print("Macro-F1 on HLDC-all-districts test set:", f1)
    return {"mF1": f1 * 100}


def get_BLEU_score(ref_text_all, machine_text_all):
    """Return the mean sentence-level BLEU over positionally paired texts.

    Each reference/candidate pair is tokenized with ``nltk.word_tokenize``
    and scored with unigram and bigram weights of 0.5 each.

    Raises:
        IndexError: If *machine_text_all* is shorter than *ref_text_all*.
        ZeroDivisionError: If *ref_text_all* is empty.
    """
    scores = []
    for i, ref_text in enumerate(ref_text_all):
        machine_text = machine_text_all[i]
        tok_ref_text = nltk.word_tokenize(ref_text)
        tok_machine_text = nltk.word_tokenize(machine_text)
        scores.append(
            nltk.translate.bleu_score.sentence_bleu(
                [tok_ref_text], tok_machine_text, weights=(0.5, 0.5)
            )
        )
    return sum(scores) / len(scores)


def evaluate_cjpe(gold_data, pred_data):
    """Score judgment prediction (macro-F1) and explanations (ROUGE-L, BLEU).

    Args:
        gold_data: Dict with ``"prediction"`` (id -> label) and
            ``"explanation"`` (id -> dict holding ``expert_1`` ..
            ``expert_5`` reference texts).
        pred_data: Dict with ``"prediction"`` (id -> label; missing ids are
            scored as ``0``) and ``"explanation"`` (id -> text).

    Returns:
        ``{"mF1", "ROUGE-L", "BLEU"}``, each scaled to 0-100. The explanation
        metrics are averaged over the five expert references.

    Raises:
        KeyError: If a gold explanation id is missing from
            ``pred_data["explanation"]``.
    """
    gold_labels = []
    pred_labels = []
    for doc_id, label in gold_data["prediction"].items():
        gold_labels.append(label)
        pred_labels.append(pred_data["prediction"].get(doc_id, 0))

    f1 = f1_score(gold_labels, pred_labels, average="macro")
    prediction_result = {"cjpe-eval": f1}
    print("Macro-F1 on ILDC test:", prediction_result)

    rouge_per_expert = []
    bleu_per_expert = []
    rl_evaluator = rouge.Rouge(
        metrics=['rouge-l'], max_n=2, limit_length=False, apply_avg=True
    )
    for expert in range(1, 6):
        gold_explanations = []
        pred_explanations = []
        for doc_id, expert_texts in gold_data['explanation'].items():
            gold_explanations.append(expert_texts[f'expert_{expert}'])
            pred_explanations.append(pred_data['explanation'][doc_id])

        print("Metrics for expert", expert, "...", end=' ')
        rouge_per_expert.append(
            rl_evaluator.get_scores(pred_explanations, gold_explanations)[
                'rouge-l'
            ]['f']
        )
        bleu_per_expert.append(
            get_BLEU_score(gold_explanations, pred_explanations)
        )
        print("Done.")

    rouge_score = sum(rouge_per_expert) / len(rouge_per_expert)
    bleu_score = sum(bleu_per_expert) / len(bleu_per_expert)
    explanation_result = {
        "cjpe-exp-eval": {
            "rouge": rouge_score,
            "bleu": bleu_score,
        }
    }
    print("Explanability for ILDC Expert:", explanation_result)
    return {
        "mF1": f1 * 100,
        "ROUGE-L": rouge_score * 100,
        "BLEU": bleu_score * 100,
    }


def span2bio(txt, roles):
    """Convert character-offset spans into token-level BIO tags.

    Args:
        txt: The source text.
        roles: Iterable of dicts with ``"start"``, ``"end"`` and ``"label"``
            keys, where start/end are character offsets into *txt*.

    Returns:
        ``(tokens, tags)``: the tokens produced by the punctuation/word
        regex and one tag per token. A token starting exactly at a span's
        ``start`` gets ``B-<label>``, later tokens starting before that
        span's ``end`` get ``I-<label>``, everything else gets ``O``.
    """
    roles = sorted(roles, key=lambda role: role['start'])
    # When several spans share a start offset, the first one in sorted order
    # wins (same result as list.index on the sorted start offsets).
    role_by_start = {}
    for role in roles:
        role_by_start.setdefault(role['start'], role)

    tokens = _TOKEN_RE.findall(txt)
    cursor = 0        # estimated character offset of the current token
    span_end = -1     # end offset of the most recently opened span
    inside_tag = 'O'  # tag given to tokens that continue the open span
    tags = []
    for tok in tokens:
        if cursor >= len(txt):
            break
        # NOTE(review): only literal spaces are skipped here. Characters the
        # regex does not match (tabs, newlines, a lone backslash, ...) never
        # advance the cursor, so offsets after them lag behind the true
        # token positions. Confirm the inputs are space-separated.
        while txt[cursor] == ' ':
            cursor += 1

        role = role_by_start.get(cursor)
        if role is not None:
            span_end = role['end']
            inside_tag = 'I-' + role['label']
            tags.append('B-' + role['label'])
        elif cursor < span_end:
            tags.append(inside_tag)
        else:
            tags.append('O')
        cursor += len(tok)

    # Tokens left unlabelled by the early break above are outside any span.
    tags += ['O'] * (len(tokens) - len(tags))
    return tokens, tags


def evaluate_lner(gold_data, pred_data, text_data):
    """Compute strict macro-F1 for the NER task, averaged over folds.

    Args:
        gold_data: Mapping ``"fold_<i>"`` -> {id -> list of gold spans}, for
            ``i`` in ``1..len(gold_data)``.
        pred_data: Same layout with predicted spans; ids missing from a fold
            are scored as having no spans.
        text_data: Mapping ``"fold_<i>"`` -> {id -> source text}.

    Returns:
        ``{"strict mF1": <mean over folds of per-tag strict F1, 0-100>}``.

    Tag names are read from ``ner_labels.txt`` (one per line) in the current
    working directory.
    """
    with open("ner_labels.txt", encoding="utf-8") as f:
        labels = f.read().strip().split("\n")

    results_per_fold = {}
    for fold in range(1, len(gold_data) + 1):
        fold_key = f"fold_{fold}"
        gold = gold_data[fold_key]
        pred = pred_data[fold_key]
        text = text_data[fold_key]

        gold_labels, pred_labels = [], []
        for doc_id, gold_spans in tqdm(gold.items()):
            txt = text[doc_id]
            _, gold_bio = span2bio(txt, gold_spans)
            _, pred_bio = span2bio(txt, pred.get(doc_id, []))
            gold_labels.append(gold_bio)
            pred_labels.append(pred_bio)

        evaluator = Evaluator(
            gold_labels, pred_labels, tags=labels, loader="list"
        )
        results, results_per_tag, _, _ = evaluator.evaluate()
        f1_scores = [
            tag_result["strict"]["f1"] for tag_result in results_per_tag.values()
        ]
        avg_f1 = sum(f1_scores) / len(f1_scores)
        print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
        results_per_fold[fold_key] = avg_f1

    print("Strict macro-F1 on L-NER Dataset:", results_per_fold)
    return {
        "strict mF1": sum(results_per_fold.values()) / len(results_per_fold) * 100
    }


def evaluate_rr(gold_data, pred_data):
    """Compute macro-F1 for the per-sentence labelling task.

    Args:
        gold_data: Mapping of document id to a list of gold labels.
        pred_data: Mapping of document id to a list of predicted label
            strings. A missing document is treated as ``"None"`` for every
            position.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.

    The label vocabulary is read from ``rr_label_vocab.json`` in the current
    working directory. A prediction is normalised to the first
    whitespace-separated word found in the vocabulary. Positions whose gold
    label, or whose normalised prediction, is not in the vocabulary are left
    out of the score.
    """
    all_gold_labels = []
    all_pred_labels = []
    with open("rr_label_vocab.json", encoding="utf-8") as f:
        label_vocab = json.load(f)

    for doc_id, gold_labels in gold_data.items():
        pred_labels = pred_data.get(doc_id, ["None"] * len(gold_labels))
        for i, gold_label in enumerate(gold_labels):
            pred_label = pred_labels[i]
            if gold_label not in label_vocab:
                continue
            for word in pred_label.split():
                if word in label_vocab:
                    pred_label = word
                    break
            if pred_label not in label_vocab:
                continue
            all_gold_labels.append([label_vocab[gold_label]])
            all_pred_labels.append([label_vocab[pred_label]])

    f1 = f1_score(all_gold_labels, all_pred_labels, average="macro")
    print("Macro-F1 on combined test set:", f1)
    return {"mF1": f1 * 100}


def evaluate_lsi(gold_data, pred_data):
    """Compute macro-F1 for the multi-label task.

    Args:
        gold_data: Mapping of example id to an iterable of gold labels.
        pred_data: Mapping of example id to an iterable of predicted labels;
            missing ids are scored as predicting nothing.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.

    Labels are mapped to column indices via ``lsi_label_vocab.json`` in the
    current working directory; labels not in that vocabulary are ignored.
    """
    with open("lsi_label_vocab.json", encoding="utf-8") as f:
        label_vocab = json.load(f)

    shape = (len(gold_data), len(label_vocab))
    gold_matrix = np.zeros(shape)
    pred_matrix = np.zeros(shape)

    for row, (doc_id, gold_labels) in enumerate(gold_data.items()):
        for label in gold_labels:
            if label in label_vocab:
                gold_matrix[row, label_vocab[label]] = 1
        for label in pred_data.get(doc_id, []):
            if label in label_vocab:
                pred_matrix[row, label_vocab[label]] = 1

    f1 = f1_score(gold_matrix, pred_matrix, average="macro")
    print("Macro-F1 on ILSI test set:", f1)
    return {"mF1": f1 * 100}


def evaluate_pcr(gold_data, pred_data):
    """Compute micro-F1@k for k = 1..20 and report the best one.

    Args:
        gold_data: Mapping of query id to a list of relevant candidate ids.
        pred_data: Mapping of query id to a ranked list of candidate ids;
            missing queries are scored as having no predictions.

    Returns:
        ``{"muF1@K": "<best F1, 0-100, two decimals>@<k>"}``; on ties the
        smallest ``k`` is reported.

    Candidates equal to the query's own id are removed from both lists
    before scoring.
    """
    # The self-reference filter does not depend on k, so apply it once.
    query_pairs = []
    for query_id, gold_candidates in gold_data.items():
        pred_candidates = pred_data.get(query_id, [])
        query_pairs.append(
            (
                [c for c in gold_candidates if c != query_id],
                [c for c in pred_candidates if c != query_id],
            )
        )

    f1_scores = []
    for k in range(1, 21):
        correct, gold_total, pred_total = 0, 0, 0
        for gold_candidates, pred_candidates in tqdm(query_pairs, desc="pcr"):
            c, g, p = get_micro_at_k(gold_candidates, pred_candidates, k)
            correct += c
            gold_total += g
            pred_total += p

        precision = correct / pred_total if pred_total > 0 else 0
        recall = correct / gold_total if gold_total > 0 else 0
        f1 = (
            2 * precision * recall / (precision + recall)
            if precision + recall > 0
            else 0
        )
        f1_scores.append(f1)
        print(f"Micro-F1@{k} on IL-PCR test set:", f1)

    max_f1 = max(f1_scores)
    index_max = f1_scores.index(max_f1) + 1
    return {"muF1@K": f"{max_f1*100:.2f}@{index_max}"}


def evaluate_summ(gold_data, pred_data):
    """Compute BERTScore F1 for the summarization task.

    Args:
        gold_data: Mapping of example id to gold summary text.
        pred_data: Mapping of example id to predicted summary text. Gold ids
            missing from this mapping are left out of the score.

    Returns:
        ``{"ROUGE-L": "-", "BERTSCORE": <mean F1 scaled to 0-100>}``.
        ROUGE-L is not computed here; ``"-"`` is a placeholder.
    """
    gold_summaries = []
    pred_summaries = []
    for doc_id, gold_summary in gold_data.items():
        if doc_id in pred_data:
            # Collapse every whitespace run (newlines included) to one space.
            gold_summaries.append(re.sub(r"\s+", " ", gold_summary).strip())
            pred_summaries.append(re.sub(r"\s+", " ", pred_data[doc_id]).strip())

    _, _, bs = bert_score.score(
        pred_summaries, gold_summaries, lang="en", verbose=True
    )
    print("BERTSCORE:", bs.mean().item())
    return {'ROUGE-L': '-', 'BERTSCORE': bs.mean().item() * 100}


def evaluate_lmt(gold_data, pred_data):
    """Compute BLEU, GLEU and chrF++ for the translation task.

    Args:
        gold_data: Mapping of dataset name to {id -> reference text}. The
            language is taken from the second ``/``-separated part of the id.
        pred_data: Same layout with the system translations.

    Returns:
        ``{"BLEU", "GLEU", "chrF++"}``. Each metric is computed per
        (dataset, language), averaged over languages within a dataset, then
        averaged over datasets. GLEU is multiplied by 100; the other two are
        returned as reported by sacrebleu.

    Raises:
        KeyError: If a gold id is missing from *pred_data*.
    """
    tokenizer = AutoTokenizer.from_pretrained(
        "ai4bharat/indic-bert", use_fast=False
    )
    bleu = BLEU()
    chrfp = CHRF(word_order=2)
    gleu = evaluate.load("google_bleu")

    gold_by_lang = defaultdict(lambda: defaultdict(list))
    pred_by_lang = defaultdict(lambda: defaultdict(list))
    for dataset in gold_data:
        for doc_id, gold_text in gold_data[dataset].items():
            lang = doc_id.split("/")[1].strip()
            gold_tokens = " ".join(tokenizer.tokenize(gold_text))
            pred_tokens = " ".join(tokenizer.tokenize(pred_data[dataset][doc_id]))
            gold_by_lang[dataset][lang].append(gold_tokens)
            pred_by_lang[dataset][lang].append(pred_tokens)

    bleu_scores, chrfpp_scores, gleu_scores = [], [], []
    for dataset in gold_by_lang:
        print("Dataset", dataset)
        dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], []
        for lang in gold_by_lang[dataset]:
            gold = gold_by_lang[dataset][lang]
            pred = pred_by_lang[dataset][lang]
            dataset_bleu.append(bleu.corpus_score(pred, [gold]).score)
            dataset_chrfpp.append(chrfp.corpus_score(pred, [gold]).score)
            dataset_gleu.append(
                gleu.compute(predictions=pred, references=gold)["google_bleu"]
            )

        bleu_scores.append(sum(dataset_bleu) / len(dataset_bleu))
        chrfpp_scores.append(sum(dataset_chrfpp) / len(dataset_chrfpp))
        gleu_scores.append(sum(dataset_gleu) / len(dataset_gleu))

    return {
        "BLEU": sum(bleu_scores) / len(bleu_scores),
        "GLEU": sum(gleu_scores) / len(gleu_scores) * 100,
        "chrF++": sum(chrfpp_scores) / len(chrfpp_scores),
    }


def create_output_json(evaluation_results):
    """Arrange per-task scores into the results record.

    Args:
        evaluation_results: Mapping of task key (``"lner"``, ``"rr"``,
            ``"cjpe"``, ``"bail"``, ``"lsi"``, ``"pcr"``, ``"summ"``,
            ``"lmt"``) to a dict of metric name -> value.

    Returns:
        A one-element list holding the record. A task or metric missing from
        *evaluation_results* is reported as ``"-"``.
    """

    def score(task, metric):
        # Fall back to the blank placeholder instead of raising KeyError.
        return evaluation_results.get(task, _BLANK_SCORES[task]).get(metric, "-")

    output = {
        "Method": "Dummy Summarization",
        "Submitted By": "IL-TUR",
        "Github Link": "dummy submission",
        "L-NER": {"strict mF1": score("lner", "strict mF1")},
        "RR": {"mF1": score("rr", "mF1")},
        "CJPE": {
            "mF1": score("cjpe", "mF1"),
            "ROUGE-L": score("cjpe", "ROUGE-L"),
            "BLEU": score("cjpe", "BLEU"),
        },
        "BAIL": {"mF1": score("bail", "mF1")},
        "LSI": {"mF1": score("lsi", "mF1")},
        "PCR": {"muF1@K": score("pcr", "muF1@K")},
        "SUMM": {
            "ROUGE-L": score("summ", "ROUGE-L"),
            "BERTSCORE": score("summ", "BERTSCORE"),
        },
        "L-MT": {
            "BLEU": score("lmt", "BLEU"),
            "GLEU": score("lmt", "GLEU"),
            "chrF++": score("lmt", "chrF++"),
        },
    }
    return [output]  # Wrap in a list to match the desired format


def _run_task(task, gold, pred):
    """Run the evaluator for *task*; return ``None`` for unknown task names."""
    if task == "bail":
        return evaluate_bail(gold, pred)
    if task == "cjpe":
        nltk.download('punkt')
        return evaluate_cjpe(gold, pred)
    if task == "lner":
        return evaluate_lner(gold, pred, load_json("lner-text.json"))
    if task == "rr":
        return evaluate_rr(gold, pred)
    if task == "lsi":
        return evaluate_lsi(gold, pred)
    if task == "pcr":
        return evaluate_pcr(gold, pred)
    if task == "summ":
        nltk.download('punkt')
        return evaluate_summ(gold, pred)
    if task == "lmt":
        return evaluate_lmt(gold, pred)
    return None


def _format_scores(value):
    """Render numbers as two-decimal strings, recursing into dicts."""
    if isinstance(value, dict):
        return {key: _format_scores(item) for key, item in value.items()}
    if isinstance(value, str):
        return value
    return f"{value:.2f}"


def main():
    """Score the baseline files and write ``evaluation_results.json``."""
    gold_data = load_json("submissions/baseline/IL_TUR_eval_gold.json")
    pred_data = load_json(
        "submissions/baseline/IL_TUR_eval_submission_dummy.json"
    )
    # NOTE(review): this discards the submission loaded above and scores the
    # gold data against itself. It looks like a debugging leftover; confirm
    # before removing.
    pred_data = gold_data

    output_json = get_evaluation_scores(gold_data, pred_data)
    with open("evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(output_json, f, indent=2)
    print("Evaluation results saved to evaluation_results.json")


def get_evaluation_scores(gold_data, submission_data):
    """Evaluate every task in *submission_data* and build the results record.

    Args:
        gold_data: Mapping of task key to that task's gold data.
        submission_data: Mapping of task key to that task's predictions.
            Task keys that have no evaluator are ignored.

    Returns:
        The one-element list produced by ``create_output_json``. Numeric
        scores are formatted as two-decimal strings; tasks present in
        *gold_data* but absent from *submission_data* get ``"-"`` scores.
    """
    evaluation_results = {}
    for task in submission_data:
        print(f"Task: {task}")
        if task not in _BLANK_SCORES:
            continue
        # Every evaluator returns a {metric: value} dict that belongs under
        # its own task key. cjpe used to be merged into the top level with
        # dict.update, which broke the "cjpe" lookup in create_output_json.
        evaluation_results[task] = _run_task(
            task, gold_data[task], submission_data[task]
        )

    # convert the evaluation results to the required format
    evaluation_results = {
        task: _format_scores(result)
        for task, result in evaluation_results.items()
    }

    # for tasks that were not present in the submission, add blank scores
    for task in gold_data:
        if task not in submission_data:
            evaluation_results[task] = dict(_BLANK_SCORES[task])

    print("--------------------------Evaluation Summary--------------------------")
    for task, result in evaluation_results.items():
        print(f"{task}: {result}")
    print("---------------------------------------------------------------------")

    return create_output_json(evaluation_results)


if __name__ == "__main__":
    main()