import statistics

import datasets
import evaluate

import re
import html as _html
import itertools as _itertools
import random as _random

from collections import namedtuple as _namedtuple

import spacy as _spacy
from os import system as _system
# _CITATION is referenced in Fragments._info below but was not defined in the
# adapted code; an empty placeholder keeps MetricInfo(citation=...) working.
_CITATION = """\
"""

_DESCRIPTION = """\
Fragments computes the extractiveness between source articles and summaries. The metric computes
two scores: coverage and density. The code is adapted from the newsroom package
(https://github.com/lil-lab/newsroom/blob/master/newsroom/analyze/fragments.py) and all credit goes to the authors of
said code."""
_KWARGS_DESCRIPTION = """
Computes coverage and density scores of source articles and their corresponding summaries.
Args:
    articles (list of str): source articles of the summaries.
    predictions (list of str): the summaries to score, one per source article.
    language (str): which language to use; currently only 'english' and 'german' are supported. Defaults to 'german'.
Returns:
    'coverage': list of coverage scores, one per pair. Coverage is the percentage of words in a summary that are from the source article.
    'mean_coverage': mean of the coverage scores.
    'density': list of density scores, one per pair. Density is the average length of the text spans copied from the document that are contained in the summary.
    'mean_density': mean of the density scores.
Examples:

    >>> articles = ["This is article 1", "This is article 2"]
    >>> summaries = ["Summary of article 1", "Summary of article 2"]
    >>> fragments = evaluate.load("fragments")
    >>> results = fragments.compute(articles=articles, predictions=summaries)
    >>> print(results["coverage"])
"""


@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class Fragments(evaluate.Metric):
    def _info(self):
        return evaluate.MetricInfo(
            module_type="metric",
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            features=datasets.Features(
                {
                    "articles": datasets.Value("string", id="sequence"),
                    "predictions": datasets.Value("string", id="sequence"),
                }
            ),
            codebase_urls=["https://github.com/lil-lab/newsroom/blob/master/newsroom/analyze/fragments.py"],
        )

    def _compute(self, articles, predictions, language="german"):
        coverages = []
        densities = []
        for article, summary in zip(articles, predictions):
            fragments = FragmentsOriginal(article, summary, language=language)
            coverages.append(fragments.coverage())
            densities.append(fragments.density())

        return {
            'coverage': coverages,
            'mean_coverage': statistics.mean(coverages),
            'density': densities,
            'mean_density': statistics.mean(densities),
        }


class FragmentsOriginal(object):
    Match = _namedtuple("Match", ("summary", "text", "length"))

    @classmethod
    def _load_model(cls, language):

        if language == 'english':
            if not hasattr(cls, "_en"):
                try:
                    cls._en = _spacy.load("en_core_web_sm")
                except OSError:
                    # The model is not installed yet: download it once, then load it.
                    _system("python -m spacy download en_core_web_sm")
                    cls._en = _spacy.load("en_core_web_sm")

        if language == 'german':
            if not hasattr(cls, "_de"):
                try:
                    cls._de = _spacy.load("de_core_news_sm")
                except OSError:
                    _system("python -m spacy download de_core_news_sm")
                    cls._de = _spacy.load("de_core_news_sm")

    def __init__(self, text, summary, language="german", tokenize=True, case=False):

        self._load_model(language)

        self._tokens = tokenize

        self.summary = self._tokenize(summary, language) if tokenize else summary.split()
        self.text = self._tokenize(text, language) if tokenize else text.split()

        self._norm_summary = self._normalize(self.summary, case)
        self._norm_text = self._normalize(self.text, case)

        self._match(self._norm_summary, self._norm_text)

    def _tokenize(self, text, language):
        """
        Tokenizes input using the fastest possible SpaCy configuration.
        This is optional, can be disabled in constructor.
        """

        if language == "english":
            return self._en(text, disable=["tagger", "parser", "ner", "textcat"])
        elif language == "german":
            return self._de(text, disable=["tagger", "parser", "ner", "textcat"])
        else:
            raise NotImplementedError("Only 'english' and 'german' are supported.")

    def _normalize(self, tokens, case=False):
        """
        Lowercases and turns tokens into distinct words.
        """

        return [
            str(t).lower()
            if not case
            else str(t)
            for t in tokens
        ]

    def overlaps(self):
        """
        Return a list of FragmentsOriginal.Match objects between summary and text.
        This is a list of named tuples of the form (summary, text, length):
        - summary (int): the start index of the match in the summary
        - text (int): the start index of the match in the reference
        - length (int): the length of the extractive fragment
        """

        return self._matches

    def strings(self, min_length=0, raw=None, summary_base=True):
        """
        Return a list of explicit match strings between the summary and reference.
        Note that this will be in the same format as the strings are input. This is
        important to remember if tokenization is done manually. If tokenization is
        specified automatically on the raw strings, raw strings will automatically
        be returned rather than SpaCy tokenized sequences.
        Arguments:
        - min_length (int): filter out overlaps shorter than this (default = 0)
        - raw (bool): return raw input rather than stringified
          (default = False if automatic tokenization, True otherwise)
        - summary_base (bool): strings are based on the summary text (default = True)
        Returns:
        - list of overlaps, where overlaps are strings or token sequences
        """

        base = self.summary if summary_base else self.text

        # Collect the token spans of all sufficiently long overlaps.
        strings = [
            base[i: i + length]
            for i, j, length
            in self.overlaps()
            if length > min_length
        ]

        # If SpaCy tokenization was used and raw strings are requested,
        # convert the token spans back into plain strings.
        if self._tokens and raw:
            for i, s in enumerate(strings):
                strings[i] = str(s)

        return strings

    def coverage(self, summary_base=True):
        """
        Return the COVERAGE score of the summary and text.
        Arguments:
        - summary_base (bool): use the summary length as the denominator (default = True)
        Returns:
        - decimal COVERAGE score within [0, 1]
        """
        numerator = sum(o.length for o in self.overlaps())

        if summary_base:
            denominator = len(self.summary)
        else:
            denominator = len(self.text)

        if denominator == 0:
            return 0
        else:
            return numerator / denominator

    def density(self, summary_base=True):
        """
        Return the DENSITY score of summary and text.
        Arguments:
        - summary_base (bool): use the summary length as the denominator (default = True)
        Returns:
        - decimal DENSITY score within [0, ...]
        """
        numerator = sum(o.length ** 2 for o in self.overlaps())

        if summary_base:
            denominator = len(self.summary)
        else:
            denominator = len(self.text)

        if denominator == 0:
            return 0
        else:
            return numerator / denominator

    def compression(self, text_to_summary=True):
        """
        Return compression ratio between summary and text.
        Arguments:
        - text_to_summary (bool): compute text/summary ratio (default = True)
        Returns:
        - decimal compression score within [0, ...]
        """

        ratio = [len(self.text), len(self.summary)]

        try:
            if text_to_summary:
                return ratio[0] / ratio[1]
            else:
                return ratio[1] / ratio[0]
        except ZeroDivisionError:
            return 0

    def _match(self, a, b):
        """
        Raw procedure for matching summary in text, described in paper.
        """

        self._matches = []

        a_start = b_start = 0
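
        # Greedy matching as described in the Newsroom paper: for each position in
        # the normalized summary, find the longest token span that also occurs in
        # the normalized text, record it, and continue after that span.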
        while a_start < len(a):

            best_match = None
            best_match_length = 0

            while b_start < len(b):

                if a[a_start] == b[b_start]:

                    a_end = a_start
                    b_end = b_start

                    while a_end < len(a) and b_end < len(b) \
                            and b[b_end] == a[a_end]:
                        b_end += 1
                        a_end += 1

                    length = a_end - a_start

                    if length > best_match_length:
                        best_match = FragmentsOriginal.Match(a_start, b_start, length)
                        best_match_length = length

                    b_start = b_end

                else:

                    b_start += 1

            b_start = 0

            if best_match:

                if best_match_length > 0:
                    self._matches.append(best_match)

                a_start += best_match_length

            else:

                a_start += 1

    def _htmltokens(self, tokens):
        """
        Carefully process tokens to handle whitespace and HTML characters.
        """

        return [
            [
                _html.escape(t.text).replace("\n", "<br/>"),
                _html.escape(t.whitespace_).replace("\n", "<br/>"),
            ]
            for t in tokens
        ]

    def annotate(self, min_length=0, text_truncation=None, novel_italics=False):
        """
        Used to annotate fragments for website visualization.
        Arguments:
        - min_length (int): minimum length overlap to count (default = 0)
        - text_truncation (int): truncated text length (default = None)
        - novel_italics (bool): italicize novel words (default = False)
        Returns:
        - a tuple of strings: (summary HTML, text HTML)
        """

        start = """
            <u
            style="color: {color}; border-color: {color};"
            data-ref="{ref}" title="Length: {length}"
            >
        """.strip()

        end = """
            </u>
        """.strip()

        summary = self._htmltokens(self.summary)
        text = self._htmltokens(self.text)

        if novel_italics:

            novel = set(self._norm_summary) - set(self._norm_text)

            for word_whitespace in summary:

                if word_whitespace[0].lower() in novel:
                    word_whitespace[0] = "<em>" + word_whitespace[0] + "</em>"

        if text_truncation is not None:
            text = text[:text_truncation]

        colors = self._itercolors()

        for overlap in self.overlaps():

            # Skip overlaps that are too short to highlight.
            if overlap.length < min_length:
                continue

            # A random reference id ties the summary span to the matching text span.
            ref = _random.randint(0, 10 ** 10)
            color = next(colors)

            summary[overlap.summary][0] = start.format(
                color=color,
                ref=ref,
                length=overlap.length,
            ) + summary[overlap.summary][0]

            text[overlap.text][0] = start.format(
                color=color,
                ref=ref,
                length=overlap.length,
            ) + text[overlap.text][0]

            summary[overlap.summary + overlap.length - 1][0] += end

            text[overlap.text + overlap.length - 1][0] += end

        summary = " ".join("".join("".join(tw) for tw in summary).split())
        text = " ".join("".join("".join(tw) for tw in text).split())

        return summary, text

    def _itercolors(self):

        return _itertools.cycle((
            "#393b79",
            "#5254a3",
            "#6b6ecf",
            "#9c9ede",
            "#637939",
            "#8ca252",
            "#b5cf6b",
            "#cedb9c",
            "#8c6d31",
            "#bd9e39",
            "#e7ba52",
            "#e7cb94",
            "#843c39",
            "#ad494a",
            "#d6616b",
            "#e7969c",
            "#7b4173",
            "#a55194",
            "#ce6dbd",
            "#de9ed6",
        ))