# Spaces: Running  (hosting-page status banner captured with the file; not program code)
import json
from collections import defaultdict
from functools import partial
from pprint import pprint
from typing import Dict, List, Optional, Union

import evaluate
import pandas as pd
import torch
import torch.nn.functional as F
from fire import Fire
from ipdb import set_trace
from tqdm import tqdm
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    PreTrainedTokenizer,
    PreTrainedTokenizerFast,
)
class Harimplus_Scorer:
    """Compute HaRiM+ scores for summaries given their source articles.

    A single encoder-decoder model is run twice per batch: once conditioned on
    the article (``s2s``) and once conditioned on an empty source (``lm``).
    The forced-decoding token probabilities of the summary under both runs are
    combined into the HaRiM term, which is mixed with the length-normalized
    log-likelihood using ``mixing_factor`` (lambda).
    """

    def __init__(self,
                 pretrained_name: str = 'none',
                 # Quoted so the annotation is not evaluated at definition time.
                 tokenizer: "Optional[Union[PreTrainedTokenizer, PreTrainedTokenizerFast]]" = None,
                 mixing_factor: float = 7.,  # same as lambda in the paper
                 device: str = 'cuda',
                 src_maxlen=1024,
                 tgt_maxlen=110,
                 ):
        """Load the seq2seq model (and tokenizer, unless one is supplied).

        Args:
            pretrained_name: name/path passed to ``from_pretrained``.
            tokenizer: optional ready-made tokenizer; when ``None`` it is
                loaded from ``pretrained_name``.
            mixing_factor: weight of the HaRiM term in the final score.
            device: device to run on. A CUDA device is replaced by CPU when
                CUDA is not available.
            src_maxlen: truncation length (tokens) for source articles.
            tgt_maxlen: truncation length (tokens) for summaries.
        """
        self._pretrained_name = pretrained_name
        self._lambda = mixing_factor

        # Honor the caller's choice of device (it used to be ignored); keep
        # the old fallback to CPU when CUDA was requested but is unavailable.
        requested = device if isinstance(device, torch.device) else torch.device(device)
        if requested.type == 'cuda' and not torch.cuda.is_available():
            requested = torch.device('cpu')
        self._device = requested

        self._encdec_model = AutoModelForSeq2SeqLM.from_pretrained(self._pretrained_name)
        if tokenizer is None:
            self._tokenizer = AutoTokenizer.from_pretrained(self._pretrained_name)
        else:
            self._tokenizer = tokenizer
        self._encdec_model.to(self._device)
        self._encdec_model.eval()

        self._src_maxlen = src_maxlen
        self._tgt_maxlen = tgt_maxlen

    def _prep_input(self, src_tgt_txts, src_or_tgt='src'):
        """Tokenize a batch of texts into padded, truncated ``pt`` tensors.

        Source texts have newlines replaced by spaces and are truncated to
        ``src_maxlen``; anything else is truncated to ``tgt_maxlen``.
        """
        L = self._src_maxlen if src_or_tgt == 'src' else self._tgt_maxlen
        if isinstance(src_tgt_txts, pd.Series):
            src_tgt_txts = src_tgt_txts.tolist()
        if src_or_tgt == 'src':
            src_tgt_txts = [s.replace("\n", " ") for s in src_tgt_txts]
        return self._tokenizer(src_tgt_txts, padding=True, truncation=True,
                               max_length=L, return_tensors='pt')  # ModelInput dataclass

    # The helpers below do not read instance state; they live in the class
    # only for ease of use.

    def likelihoods(self, logits, force_decode_indices, tgt_mask):
        """Return the probability of each forced token, zeroed where masked.

        ``logits`` carries a trailing vocabulary axis; ``force_decode_indices``
        and ``tgt_mask`` share the shape of ``logits`` without that axis.
        """
        probs = F.softmax(logits, dim=-1)
        # squeeze(-1) removes only the gathered axis. A bare squeeze() would
        # also drop a size-1 batch/sequence axis and break the mask product.
        picked = probs.gather(-1, force_decode_indices.unsqueeze(-1)).squeeze(-1)
        probs_force_decode = picked * tgt_mask
        assert probs_force_decode.shape == force_decode_indices.shape
        return probs_force_decode

    def log_likelihoods(self, logits, force_decode_indices, tgt_mask):
        """Return the log-probability of each forced token, zeroed where masked."""
        ll = F.log_softmax(logits, dim=-1)
        # Same reasoning as in likelihoods(): only drop the gathered axis.
        picked = ll.gather(-1, force_decode_indices.unsqueeze(-1)).squeeze(-1)
        return picked * tgt_mask

    def harim(self, s2s_logits, lm_logits, force_decode_indices, tgt_mask):
        """Return the token-wise HaRiM term (-1 * hallucination risk).

        Masked (padding) positions are returned as 0 so that they do not
        contribute when the caller sums over the sequence axis.
        """
        p_s2s = self.likelihoods(s2s_logits, force_decode_indices, tgt_mask)
        p_lm = self.likelihoods(lm_logits, force_decode_indices, tgt_mask)

        delta = p_s2s - p_lm
        margin_linear = (1 - delta) / 2
        harim = -(1 - p_s2s) * margin_linear + 1
        # At masked positions p_s2s == p_lm == 0, which makes the expression
        # above evaluate to 0.5 rather than 0. Re-apply the mask so padding
        # does not inflate the per-sentence sum.
        return harim * tgt_mask  # this is -1 * hallucination risk

    def make_minibatches(self, exs: List[str], bsz: int = 32):
        """Split ``exs`` into consecutive chunks of at most ``bsz`` items.

        Raises:
            ValueError: if ``bsz`` is smaller than 1.
        """
        if bsz < 1:
            raise ValueError(f"bsz must be a positive integer, got {bsz}")
        return [exs[start:start + bsz] for start in range(0, len(exs), bsz)]

    def make_empty_minibatches(self, minibatches: List[List[str]]):
        """Return minibatches of the same sizes filled with empty strings."""
        return [['' for _ in mb] for mb in minibatches]

    def compute(self, predictions: List[str],
                references: List[str],
                bsz: int = 32,
                use_aggregator: bool = False,
                return_details: bool = False,
                tokenwise_score: bool = False,
                ):
        '''
        returns harim+ score (List[float]) for predictions (summaries) and references (articles)
        **Note**
            - here, predictions = generated summaries to be evaluated, references = article to be summarized (but to follow the convention of the evaluate, we named kwarg as "references")
            - log_ppl equals to bartscore (yuan et al., neurips 2021)

        if tokenwise_score:
            additionally collects, per minibatch, the token-wise harim+ scores
            ('tok_harim+') and the tokenized predictions ('tok_predictions')
        if use_aggregator:
            returning scores are aggregated (mean) over given test set
        if return_details:
            returns the whole score dict ('harim+', 'harim', 'log_ppl',
            'lambda', and the 'tok_*' entries when requested) instead of only
            the 'harim+' entry

        Raises:
            ValueError: if predictions and references differ in length, or
                bsz is smaller than 1.
        '''
        if len(predictions) != len(references):
            # zip() below would silently drop or misalign the surplus items.
            raise ValueError(
                f"predictions and references must have the same length, "
                f"got {len(predictions)} and {len(references)}"
            )

        # tokenize/prep src/tgts
        summaries = predictions
        articles = references
        b_srcs = self.make_minibatches(articles, bsz=bsz)
        b_tgts = self.make_minibatches(summaries, bsz=bsz)
        b_emps = self.make_empty_minibatches(b_srcs)

        scores = defaultdict(list)
        for mini_s, mini_e, mini_t in tqdm(zip(b_srcs, b_emps, b_tgts), total=len(b_tgts), desc=f"computing HaRiM+ {bsz=}, core={self._pretrained_name}"):
            src_in = self._prep_input(mini_s, src_or_tgt='src')
            emp_in = self._prep_input(mini_e, src_or_tgt='src')
            tgt_in = self._prep_input(mini_t, src_or_tgt='tgt')
            if emp_in.input_ids.shape[-1] == 0:  # emp_in.input_ids.shape == (32,0)
                # The tokenizer produced nothing for '': feed explicit
                # bos+eos text so the encoder still receives an input.
                boseos = f"{self._tokenizer.bos_token}{self._tokenizer.eos_token}"
                mini_e_ = [boseos for _ in range(len(mini_e))]
                emp_in = self._prep_input(mini_e_, src_or_tgt='src')

            tgt_mask = tgt_in.attention_mask

            src_in = src_in.to(self._device)
            emp_in = emp_in.to(self._device)
            tgt_in = tgt_in.to(self._device)
            tgt_mask = tgt_mask.to(self._device)

            # Padding positions are set to -100 in the labels.
            fill_ignore_mask = ~(tgt_mask.bool())
            labels = tgt_in.input_ids.masked_fill(fill_ignore_mask, -100)

            with torch.no_grad():
                # token_type_ids attribute causes error
                s2s_logits = self._encdec_model(
                    input_ids=src_in.input_ids,
                    attention_mask=src_in.attention_mask,
                    labels=labels,
                    return_dict=True).logits
                lm_logits = self._encdec_model(
                    input_ids=emp_in.input_ids,
                    attention_mask=emp_in.attention_mask,
                    labels=labels,
                    return_dict=True).logits

                # clamp keeps the division finite should a row have no tokens
                sent_lengths = tgt_mask.sum(-1).clamp(min=1)
                ll_tok = self.log_likelihoods(s2s_logits, tgt_in.input_ids, tgt_mask)
                ll = ll_tok.sum(-1) / sent_lengths

                harim_tok = self.harim(s2s_logits, lm_logits, tgt_in.input_ids, tgt_mask)
                harim = harim_tok.sum(-1) / sent_lengths

                harim_plus_normalized = ll + self._lambda * harim  # loglikelihood + lambda * negative_harim (negative harim=-1* risk)

            scores['harim+'].extend(harim_plus_normalized.tolist())
            scores['harim'].extend(harim.tolist())
            scores['log_ppl'].extend(ll.tolist())

            if tokenwise_score:
                scores['tok_harim+'].append(harim_tok * self._lambda + ll_tok)
                # The scored tokens are the target ids (the encoded inputs
                # expose no `labels` field).
                scores['tok_predictions'].append(
                    [self._tokenizer.convert_ids_to_tokens(ids)
                     for ids in tgt_in.input_ids.tolist()]
                )

        if use_aggregator:  # after
            for k in list(scores):
                if not k.startswith('tok_'):
                    v = scores[k]
                    scores[k] = sum(v) / len(v)  # aggregate (mean)
        scores['lambda'] = self._lambda
        if not return_details:
            scores = scores['harim+']
        return scores