IRS-chat / query.py
github-actions
Sync updates from source repository
674c19e
raw
history blame
8.55 kB
import requests
import json
import re
from urllib.parse import quote
def extract_between_tags(text, start_tag, end_tag):
start_index = text.find(start_tag)
end_index = text.find(end_tag, start_index)
return text[start_index+len(start_tag):end_index-len(end_tag)]
class CitationNormalizer():
def __init__(self, responses, docs):
self.docs = docs
self.responses = responses
self.refs = []
def normalize_citations(self, summary):
start_tag = "%START_SNIPPET%"
end_tag = "%END_SNIPPET%"
# find all references in the summary
pattern = r'\[\d{1,2}\]'
matches = [match.span() for match in re.finditer(pattern, summary)]
# figure out unique list of references
for match in matches:
start, end = match
response_num = int(summary[start+1:end-1])
doc_num = self.responses[response_num-1]['documentIndex']
metadata = {item['name']: item['value'] for item in self.docs[doc_num]['metadata']}
text = extract_between_tags(self.responses[response_num-1]['text'], start_tag, end_tag)
if 'url' in metadata.keys():
url = f"{metadata['url']}#:~:text={quote(text)}"
if url not in self.refs:
self.refs.append(url)
# replace references with markdown links
refs_dict = {url:(inx+1) for inx,url in enumerate(self.refs)}
for match in reversed(matches):
start, end = match
response_num = int(summary[start+1:end-1])
doc_num = self.responses[response_num-1]['documentIndex']
metadata = {item['name']: item['value'] for item in self.docs[doc_num]['metadata']}
text = extract_between_tags(self.responses[response_num-1]['text'], start_tag, end_tag)
if 'url' in metadata.keys():
url = f"{metadata['url']}#:~:text={quote(text)}"
citation_inx = refs_dict[url]
summary = summary[:start] + f'[\[{citation_inx}\]]({url})' + summary[end:]
else:
summary = summary[:start] + summary[end:]
return summary
class VectaraQuery():
def __init__(self, api_key: str, customer_id: str, corpus_ids: list[str], prompt_name: str = None):
self.customer_id = customer_id
self.corpus_ids = corpus_ids
self.api_key = api_key
self.prompt_name = prompt_name if prompt_name else "vectara-experimental-summary-ext-2023-12-11-sml"
self.conv_id = None
def get_body(self, query_str: str):
corpora_key_list = [{
'customer_id': self.customer_id, 'corpus_id': corpus_id, 'lexical_interpolation_config': {'lambda': 0.025}
} for corpus_id in self.corpus_ids
]
return {
'query': [
{
'query': query_str,
'start': 0,
'numResults': 50,
'corpusKey': corpora_key_list,
'context_config': {
'sentences_before': 2,
'sentences_after': 2,
'start_tag': "%START_SNIPPET%",
'end_tag': "%END_SNIPPET%",
},
'rerankingConfig':
{
'rerankerId': 272725718,
'mmrConfig': {
'diversityBias': 0.3
}
},
'summary': [
{
'responseLang': 'eng',
'maxSummarizedResults': 5,
'summarizerPromptName': self.prompt_name,
'chat': {
'store': True,
'conversationId': self.conv_id
},
}
]
}
]
}
def get_headers(self):
return {
"Content-Type": "application/json",
"Accept": "application/json",
"customer-id": self.customer_id,
"x-api-key": self.api_key,
"grpc-timeout": "60S"
}
def submit_query(self, query_str: str):
endpoint = f"https://api.vectara.io/v1/query"
body = self.get_body(query_str)
response = requests.post(endpoint, data=json.dumps(body), verify=True, headers=self.get_headers())
if response.status_code != 200:
print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}")
return "Sorry, something went wrong in my brain. Please try again later."
res = response.json()
top_k = 10
summary = res['responseSet'][0]['summary'][0]['text']
responses = res['responseSet'][0]['response'][:top_k]
docs = res['responseSet'][0]['document']
chat = res['responseSet'][0]['summary'][0].get('chat', None)
if chat and chat['status'] is not None:
st_code = chat['status']
print(f"Chat query failed with code {st_code}")
if st_code == 'RESOURCE_EXHAUSTED':
self.conv_id = None
return 'Sorry, Vectara chat turns exceeds plan limit.'
return 'Sorry, something went wrong in my brain. Please try again later.'
self.conv_id = chat['conversationId'] if chat else None
summary = CitationNormalizer(responses, docs).normalize_citations(summary)
return summary
def submit_query_streaming(self, query_str: str):
endpoint = f"https://api.vectara.io/v1/stream-query"
body = self.get_body(query_str)
response = requests.post(endpoint, data=json.dumps(body), verify=True, headers=self.get_headers(), stream=True)
if response.status_code != 200:
print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}")
return "Sorry, something went wrong in my brain. Please try again later."
chunks = []
accumulated_text = "" # Initialize text accumulation
pattern_max_length = 50 # Example heuristic
for line in response.iter_lines():
if line: # filter out keep-alive new lines
data = json.loads(line.decode('utf-8'))
res = data['result']
response_set = res['responseSet']
if response_set is None:
# grab next chunk and yield it as output
summary = res.get('summary', None)
if summary is None or len(summary)==0:
continue
else:
chat = summary.get('chat', None)
if chat and chat.get('status', None):
st_code = chat['status']
print(f"Chat query failed with code {st_code}")
if st_code == 'RESOURCE_EXHAUSTED':
self.conv_id = None
return 'Sorry, Vectara chat turns exceeds plan limit.'
return 'Sorry, something went wrong in my brain. Please try again later.'
conv_id = chat.get('conversationId', None) if chat else None
if conv_id:
self.conv_id = conv_id
chunk = summary['text']
accumulated_text += chunk # Append current chunk to accumulation
if len(accumulated_text) > pattern_max_length:
accumulated_text = re.sub(r"\[\d+\]", "", accumulated_text)
accumulated_text = re.sub(r"\s+\.", ".", accumulated_text)
out_chunk = accumulated_text[:-pattern_max_length]
chunks.append(out_chunk)
yield out_chunk
accumulated_text = accumulated_text[-pattern_max_length:]
if summary['done']:
break
# yield the last piece
if len(accumulated_text) > 0:
accumulated_text = re.sub(r" \[\d+\]\.", ".", accumulated_text)
chunks.append(accumulated_text)
yield accumulated_text
return ''.join(chunks)