File size: 3,939 Bytes
e49f5ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
from typing import List, Dict, Any, Tuple
import os
import logging
import requests
from time import sleep
from uuid import uuid4
import streamlit as st
API_ENDPOINT = "http://localhost:8000"
STATUS = "initialized"
HS_VERSION = "hs_version"
DOC_REQUEST = "query"
DOC_FEEDBACK = "feedback"
DOC_UPLOAD = "file-upload"
headers ={'api_key': 'mywonderfulapikey'}
def haystack_is_ready():
"""
Used to show the "Haystack is loading..." message
"""
url = f"{API_ENDPOINT}/{STATUS}"
try:
if requests.get(url, headers=headers).status_code < 400:
return True
except Exception as e:
logging.exception(e)
sleep(1) # To avoid spamming a non-existing endpoint at startup
return False
@st.cache
def haystack_version():
"""
Get the Haystack version from the REST API
"""
url = f"{API_ENDPOINT}/{HS_VERSION}"
return requests.get(url, timeout=0.1).json()["hs_version"]
def query(query, filters={}, top_k_reader=5, top_k_retriever=5) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
"""
Send a query to the REST API and parse the answer.
Returns both a ready-to-use representation of the results and the raw JSON.
"""
url = f"{API_ENDPOINT}/{DOC_REQUEST}"
params = {"filters": filters, "Retriever": {"top_k": top_k_retriever}, "Reader": {"top_k": top_k_reader}}
req = {"query": query, "params": params}
response_raw = requests.post(url, json=req, headers=headers)
if response_raw.status_code >= 400 and response_raw.status_code != 503:
raise Exception(f"{vars(response_raw)}")
response = response_raw.json()
if "errors" in response:
raise Exception(", ".join(response["errors"]))
# Format response
results = []
answers = response["answers"]
for answer in answers:
if answer.get("answer", None):
results.append(
{
"context": "..." + answer["context"] + "...",
"answer": answer.get("answer", None),
"source": answer["meta"]["name"],
"relevance": round(answer["score"] * 100, 2),
"document": [doc for doc in response["documents"] if doc["id"] == answer["document_id"]][0],
"offset_start_in_doc": answer["offsets_in_document"][0]["start"],
"_raw": answer
}
)
else:
results.append(
{
"context": None,
"answer": None,
"document": None,
"relevance": round(answer["score"] * 100, 2),
"_raw": answer,
}
)
return results, response
def send_feedback(query, answer_obj, is_correct_answer, is_correct_document, document) -> None:
"""
Send a feedback (label) to the REST API
"""
url = f"{API_ENDPOINT}/{DOC_FEEDBACK}"
req = {
"id": str(uuid4()),
"query": query,
"document": document,
"is_correct_answer": is_correct_answer,
"is_correct_document": is_correct_document,
"origin": "user-feedback",
"answer": answer_obj
}
response_raw = requests.post(url, json=req)
if response_raw.status_code >= 400:
raise ValueError(f"An error was returned [code {response_raw.status_code}]: {response_raw.json()}")
def upload_doc(file):
url = f"{API_ENDPOINT}/{DOC_UPLOAD}"
files = [("files", file)]
response = requests.post(url, files=files).json()
return response
def get_backlink(result) -> Tuple[str, str]:
if result.get("document", None):
doc = result["document"]
if isinstance(doc, dict):
if doc.get("meta", None):
if isinstance(doc["meta"], dict):
if doc["meta"].get("url", None) :
return doc["meta"]["url"]
return None |