import bs4
import spacy
import requests
from collections import Counter
from string import punctuation
from typing import List, Optional, Tuple


def is_tag_visible(element: bs4.element.PageElement) -> bool:
    """Determines if an HTML element is visible.

    Args:
        element: A BeautifulSoup element to check the visibility of.
    Returns:
        Whether the element is visible.
    """
    if element.parent.name in [
        "style",
        "script",
        "head",
        "title",
        "meta",
        "[document]",
    ] or isinstance(element, bs4.element.Comment):
        return False
    return True


def scrape_url(url: str, timeout: float = 3) -> Tuple[Optional[str], str]:
    """Scrapes a URL for all of its visible text.

    Args:
        url: URL of the webpage to scrape.
        timeout: Timeout of the requests call, in seconds.
    Returns:
        web_text: The visible text of the scraped URL, or None if scraping failed.
        url: The input URL.
    """
    # Scrape the URL
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        print("URL request error.")
        return None, url

    # Extract all text from the tags.
    try:
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        texts = soup.find_all(string=True)
        # Filter out invisible text from the page.
        visible_text = filter(is_tag_visible, texts)
    except Exception:
        print("BS4 error.")
        return None, url

    # Concatenate all of the visible text into a single string.
    web_text = " ".join(t.strip() for t in visible_text).strip()
    # Collapse repeated whitespace.
    web_text = " ".join(web_text.split())
    return web_text, url


def get_hotwords(text: str, top_k: int = 10) -> List[str]:
    """Extracts keywords from a text and returns the top_k most frequent ones."""
    nlp = spacy.load("en_core_web_sm")
    pos_tags = ["PROPN", "ADJ", "NOUN"]
    doc = nlp(text.lower())

    result = []
    for token in doc:
        # Skip stop words and punctuation; keep proper nouns, adjectives, and nouns.
        if token.text in nlp.Defaults.stop_words or token.text in punctuation:
            continue
        if token.pos_ in pos_tags:
            result.append(token.text)

    most_common_list = Counter(result).most_common(top_k)
    keywords = [item[0] for item in most_common_list]
    return keywords


def select_doc_by_keyword_coverage(claim: str, docs: List[str],
                                   top_k_keywords: int = 10, top_k_docs: int = 5) -> List[int]:
    """Counts how many of the claim's keywords appear in each document, sorts the
    documents by that count (the degree to which each doc covers the claim), and
    returns the indices of the top-k documents."""
    # Get the keywords of the claim.
    keywords = get_hotwords(claim, top_k_keywords)

    # Count how many keywords are contained in each doc.
    counts = []
    for doc in docs:
        doc = doc.lower()  # All keywords are lowercase.
        counts.append(sum(1 for word in keywords if word in doc))

    # Keep the docs that contain the most keywords, since the goal is to cut off
    # the many irrelevant docs.
    max_count = max(counts)
    selected_docs_index = [i for i in range(len(docs)) if counts[i] == max_count]
    if len(selected_docs_index) < top_k_docs:
        # If fewer than top_k_docs share the maximum count, fall back to sorting all
        # docs by coverage and keeping the top-k.
        docs_index_sorted_coverage = sorted(range(len(counts)), key=lambda k: counts[k], reverse=True)
        selected_docs_index = docs_index_sorted_coverage[:top_k_docs]
    
    print("There are {} web pages selected.".format(len(selected_docs_index)))
    return selected_docs_index


def chunk_text(text: str, sentences_per_passage: int,
               filter_sentence_len: int, sliding_distance: Optional[int] = None) -> List[str]:
    """Chunks text into passages using a sliding window.

    Args:
        text: Text to chunk into passages.
        sentences_per_passage: Number of sentences for each passage.
        filter_sentence_len: Maximum number of chars of each sentence before being filtered.
        sliding_distance: Sliding distance over the text. Allows the passages to have
            overlap. The sliding distance cannot be greater than the window size.
    Returns:
        passages: Chunked passages from the text.
    """
    TOKENIZER = spacy.load("en_core_web_sm", disable=["ner", "tagger", "lemmatizer"])
    if not sliding_distance or sliding_distance > sentences_per_passage:
        sliding_distance = sentences_per_passage
    assert sentences_per_passage > 0 and sliding_distance > 0

    passages = []
    try:
        doc = TOKENIZER(text[:500000])  # Take 500k chars to not break tokenization.
        sents = [
            s.text
            for s in doc.sents
            if len(s.text) <= filter_sentence_len  # Long sents are usually metadata.
        ]
        for idx in range(0, len(sents), sliding_distance):
            passages.append(" ".join(sents[idx : idx + sentences_per_passage]))
    except UnicodeEncodeError as _:  # Sometimes run into Unicode error when tokenizing.
        print("Unicode error when using Spacy. Skipping text.")

    return passages


def select_passages_by_semantic_similarity(claim: str, selected_docs: List[str],
                                           max_sentences_per_passage: int = 3, filter_sentence_len: int = 250,
                                           sliding_distance: int = 3, top_k_passage: int = 5) -> Tuple[list, list]:
    """Chunks the selected documents into passages, scores each passage against the
    claim with spaCy similarity, and returns the top-k passages together with the
    indices of the documents each passage appears in."""
    passages: List[str] = []
    for doc in selected_docs:
        # RARR's default chunking setting is (5, 250, 1).
        snippets = chunk_text(doc, max_sentences_per_passage, filter_sentence_len, sliding_distance)
        passages.extend(snippets)
    passages = list(set(passages))  # Remove duplicate passages.
    print("{} snippets of text were split.".format(len(passages)))
    
    # Score each snippet of text against the claim.
    nlp = spacy.load("en_core_web_sm")
    claim_doc = nlp(claim)
    sim = []
    for p in passages:
        sim.append(claim_doc.similarity(nlp(p)))
    
    # sort by similarity score and keep topk
    index_sorted_sim = sorted(range(len(sim)), key=lambda k: sim[k], reverse=True)
    topk_passages = [passages[i] for i in index_sorted_sim[:top_k_passage]]

    # Find the source docs of the top-k passages: one passage may occur in multiple docs.
    passage_doc_id: List[list] = []
    for p in topk_passages:
        temp = []
        for doc_id, doc in enumerate(selected_docs):
            if p in doc:
                temp.append(doc_id)
        # If no doc matches this passage, keep an empty list; this leads to []
        # entries in the evidence list for this snippet of text.
        if len(temp) == 0:
            print("Error in matching selected passage to its docs!")
        passage_doc_id.append(temp)

    return topk_passages, passage_doc_id
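

# Minimal usage sketch (illustrative, not part of the original pipeline): wires
# the helpers above together for a hypothetical claim. The claim and URLs below
# are placeholders; running this requires network access and the spaCy model
# `en_core_web_sm`.
if __name__ == "__main__":
    example_claim = "The Eiffel Tower is located in Paris, France."  # Hypothetical claim.
    example_urls = [  # Placeholder URLs; in practice these could come from a search step.
        "https://en.wikipedia.org/wiki/Eiffel_Tower",
        "https://en.wikipedia.org/wiki/Paris",
    ]

    # 1. Scrape each URL and drop pages that failed to download or parse.
    scraped = [scrape_url(u) for u in example_urls]
    docs = [text for text, _ in scraped if text]

    if docs:
        # 2. Keep the documents whose text best covers the claim's keywords.
        doc_ids = select_doc_by_keyword_coverage(example_claim, docs)
        selected_docs = [docs[i] for i in doc_ids]

        # 3. Chunk the selected docs and rank passages by similarity to the claim.
        top_passages, passage_doc_ids = select_passages_by_semantic_similarity(
            example_claim, selected_docs
        )
        for passage, ids in zip(top_passages, passage_doc_ids):
            print(ids, passage[:120])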