File size: 8,076 Bytes
8360ec7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
"""Utils for searching a query and returning top passages from search results."""
import concurrent.futures
import itertools
import os
import random
from typing import Any, Dict, List, Tuple
import bs4
import requests
import spacy
import torch
from sentence_transformers import CrossEncoder
PASSAGE_RANKER = CrossEncoder(
"cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length=512,
device="cpu",
)
SEARCH_URL = "https://api.bing.microsoft.com/v7.0/search/"
SUBSCRIPTION_KEY = os.getenv("AZURE_SEARCH_KEY")
TOKENIZER = spacy.load("en_core_web_sm", disable=["ner", "tagger", "lemmatizer"])
def chunk_text(
text: str,
sentences_per_passage: int,
filter_sentence_len: int,
sliding_distance: int = None,
) -> List[str]:
"""Chunks text into passages using a sliding window.
Args:
text: Text to chunk into passages.
sentences_per_passage: Number of sentences for each passage.
filter_sentence_len: Maximum number of chars of each sentence before being filtered.
sliding_distance: Sliding distance over the text. Allows the passages to have
overlap. The sliding distance cannot be greater than the window size.
Returns:
passages: Chunked passages from the text.
"""
if not sliding_distance or sliding_distance > sentences_per_passage:
sliding_distance = sentences_per_passage
assert sentences_per_passage > 0 and sliding_distance > 0
passages = []
try:
doc = TOKENIZER(text[:500000]) # Take 500k chars to not break tokenization.
sents = [
s.text
for s in doc.sents
if len(s.text) <= filter_sentence_len # Long sents are usually metadata.
]
for idx in range(0, len(sents), sliding_distance):
passages.append(" ".join(sents[idx : idx + sentences_per_passage]))
except UnicodeEncodeError as _: # Sometimes run into Unicode error when tokenizing.
print("Unicode error when using Spacy. Skipping text.")
return passages
def is_tag_visible(element: bs4.element) -> bool:
"""Determines if an HTML element is visible.
Args:
element: A BeautifulSoup element to check the visiblity of.
returns:
Whether the element is visible.
"""
if element.parent.name in [
"style",
"script",
"head",
"title",
"meta",
"[document]",
] or isinstance(element, bs4.element.Comment):
return False
return True
def scrape_url(url: str, timeout: float = 3) -> Tuple[str, str]:
"""Scrapes a URL for all text information.
Args:
url: URL of webpage to scrape.
timeout: Timeout of the requests call.
Returns:
web_text: The visible text of the scraped URL.
url: URL input.
"""
# Scrape the URL
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.RequestException as _:
return None, url
# Extract out all text from the tags
try:
soup = bs4.BeautifulSoup(response.text, "html.parser")
texts = soup.findAll(text=True)
# Filter out invisible text from the page.
visible_text = filter(is_tag_visible, texts)
except Exception as _:
return None, url
# Returns all the text concatenated as a string.
web_text = " ".join(t.strip() for t in visible_text).strip()
# Clean up spacing.
web_text = " ".join(web_text.split())
return web_text, url
def search_bing(query: str, timeout: float = 3) -> List[str]:
"""Searches the query using Bing.
Args:
query: Search query.
timeout: Timeout of the requests call.
Returns:
search_results: A list of the top URLs relevant to the query.
"""
headers = {"Ocp-Apim-Subscription-Key": os.getenv("AZURE_SEARCH_KEY")}
params = {"q": query, "textDecorations": True, "textFormat": "HTML"}
response = requests.get(SEARCH_URL, headers=headers, params=params, timeout=timeout)
response.raise_for_status()
response = response.json()
search_results = [r["url"] for r in response["webPages"]["value"]]
return search_results
def run_search(
query: str,
cached_search_results: List[str] = None,
max_search_results_per_query: int = 3,
max_sentences_per_passage: int = 5,
sliding_distance: int = 1,
max_passages_per_search_result_to_return: int = 1,
timeout: float = 3,
randomize_num_sentences: bool = False,
filter_sentence_len: int = 250,
max_passages_per_search_result_to_score: int = 30,
) -> List[Dict[str, Any]]:
"""Searches the query on a search engine and returns the most relevant information.
Args:
query: Search query.
max_search_results_per_query: Maximum number of search results to get return.
max_sentences_per_passage: Maximum number of sentences for each passage.
filter_sentence_len: Maximum length of a sentence before being filtered.
sliding_distance: Sliding distance over the sentences of each search result.
Used to extract passages.
max_passages_per_search_result_to_score: Maxinum number of passages to score for
each search result.
max_passages_per_search_result_to_return: Maximum number of passages to return
for each search result.
Returns:
retrieved_passages: Top retrieved passages for the search query.
"""
if cached_search_results is not None:
search_results = cached_search_results
else:
search_results = search_bing(query, timeout=timeout)
# Scrape search results in parallel
with concurrent.futures.ThreadPoolExecutor() as e:
scraped_results = e.map(scrape_url, search_results, itertools.repeat(timeout))
# Remove URLs if we weren't able to scrape anything or if they are a PDF.
scraped_results = [r for r in scraped_results if r[0] and ".pdf" not in r[1]]
# Iterate through the scraped results and extract out the most useful passages.
retrieved_passages = []
for webtext, url in scraped_results[:max_search_results_per_query]:
if randomize_num_sentences:
sents_per_passage = random.randint(1, max_sentences_per_passage)
else:
sents_per_passage = max_sentences_per_passage
# Chunk the extracted text into passages.
passages = chunk_text(
text=webtext,
sentences_per_passage=sents_per_passage,
filter_sentence_len=filter_sentence_len,
sliding_distance=sliding_distance,
)
passages = passages[:max_passages_per_search_result_to_score]
if not passages:
continue
# Score the passages by relevance to the query using a cross-encoder.
scores = PASSAGE_RANKER.predict([(query, p) for p in passages]).tolist()
passage_scores = list(zip(passages, scores))
# Take the top passages_per_search passages for the current search result.
passage_scores.sort(key=lambda x: x[1], reverse=True)
for passage, score in passage_scores[:max_passages_per_search_result_to_return]:
retrieved_passages.append(
{
"text": passage,
"url": url,
"query": query,
"sents_per_passage": sents_per_passage,
"retrieval_score": score, # Cross-encoder score as retr score
}
)
if retrieved_passages:
# Sort all retrieved passages by the retrieval score.
retrieved_passages = sorted(
retrieved_passages, key=lambda d: d["retrieval_score"], reverse=True
)
# Normalize the retreival scores into probabilities
scores = [r["retrieval_score"] for r in retrieved_passages]
probs = torch.nn.functional.softmax(torch.Tensor(scores), dim=-1).tolist()
for prob, passage in zip(probs, retrieved_passages):
passage["score"] = prob
return retrieved_passages
|