|
import requests |
|
import json |
|
|
|
|
|
class VectaraQuery(): |
|
def __init__(self, api_key: str, corpus_keys: list[str], prompt_name: str = None): |
|
self.corpus_keys = corpus_keys |
|
self.api_key = api_key |
|
self.prompt_name = prompt_name if prompt_name else "vectara-summary-ext-24-05-sml" |
|
self.conv_id = None |
|
|
|
|
|
def get_body(self, query_str: str, stream: False): |
|
corpora_list = [{ |
|
'corpus_key': corpus_key, 'lexical_interpolation': 0.005 |
|
} for corpus_key in self.corpus_keys |
|
] |
|
|
|
return { |
|
'query': query_str, |
|
'search': |
|
{ |
|
'corpora': corpora_list, |
|
'offset': 0, |
|
'limit': 50, |
|
'context_configuration': |
|
{ |
|
'sentences_before': 2, |
|
'sentences_after': 2, |
|
'start_tag': "%START_SNIPPET%", |
|
'end_tag': "%END_SNIPPET%", |
|
}, |
|
'reranker': |
|
{ |
|
'type': 'customer_reranker', |
|
'reranker_id': 'rnk_272725719' |
|
}, |
|
}, |
|
'generation': |
|
{ |
|
'prompt_name': self.prompt_name, |
|
'max_used_search_results': 10, |
|
'response_language': 'eng', |
|
'citations': |
|
{ |
|
'style': 'none' |
|
} |
|
}, |
|
'chat': |
|
{ |
|
'store': True |
|
}, |
|
'stream_response': stream |
|
} |
|
|
|
|
|
def get_headers(self): |
|
return { |
|
"Content-Type": "application/json", |
|
"Accept": "application/json", |
|
"x-api-key": self.api_key, |
|
"grpc-timeout": "60S" |
|
} |
|
|
|
def get_stream_headers(self): |
|
return { |
|
"Content-Type": "application/json", |
|
"Accept": "text/event-stream", |
|
"x-api-key": self.api_key, |
|
"grpc-timeout": "60S" |
|
} |
|
|
|
def submit_query(self, query_str: str): |
|
|
|
if self.conv_id: |
|
endpoint = f"https://api.vectara.io/v2/chats/{self.conv_id}/turns" |
|
else: |
|
endpoint = "https://api.vectara.io/v2/chats" |
|
|
|
body = self.get_body(query_str, stream=False) |
|
|
|
response = requests.post(endpoint, data=json.dumps(body), verify=True, headers=self.get_headers()) |
|
|
|
if response.status_code != 200: |
|
print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}") |
|
if response.status_code == 429: |
|
return "Sorry, Vectara chat turns exceeds plan limit." |
|
return "Sorry, something went wrong in my brain. Please try again later." |
|
|
|
res = response.json() |
|
|
|
if self.conv_id is None: |
|
self.conv_id = res['chat_id'] |
|
|
|
summary = res['answer'] |
|
|
|
return summary |
|
|
|
def submit_query_streaming(self, query_str: str): |
|
|
|
if self.conv_id: |
|
endpoint = f"https://api.vectara.io/v2/chats/{self.conv_id}/turns" |
|
else: |
|
endpoint = "https://api.vectara.io/v2/chats" |
|
|
|
body = self.get_body(query_str, stream=True) |
|
|
|
response = requests.post(endpoint, data=json.dumps(body), verify=True, headers=self.get_stream_headers(), stream=True) |
|
|
|
if response.status_code != 200: |
|
print(f"Query failed with code {response.status_code}, reason {response.reason}, text {response.text}") |
|
if response.status_code == 429: |
|
return "Sorry, Vectara chat turns exceeds plan limit." |
|
return "Sorry, something went wrong in my brain. Please try again later." |
|
|
|
chunks = [] |
|
for line in response.iter_lines(): |
|
line = line.decode('utf-8') |
|
if line: |
|
key, value = line.split(':', 1) |
|
if key == 'data': |
|
line = json.loads(value) |
|
if line['type'] == 'generation_chunk': |
|
chunk = line['generation_chunk'] |
|
chunks.append(chunk) |
|
yield chunk |
|
|
|
return ''.join(chunks) |