Spaces:
Runtime error
Runtime error
File size: 8,434 Bytes
51fe9d2 0489db2 0e17089 51fe9d2 07b1b19 4f5c619 07b1b19 4f5c619 0489db2 4f5c619 0489db2 4f5c619 0489db2 7a7c4d5 0489db2 51fe9d2 0489db2 4f5c619 7a7c4d5 51fe9d2 0489db2 7a7c4d5 51fe9d2 4f5c619 0489db2 51fe9d2 0489db2 07b1b19 0489db2 0e17089 4f5c619 0e17089 0489db2 7a7c4d5 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 0e17089 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 d5bd88b 4f5c619 07b1b19 4f5c619 51fe9d2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
import streamlit as st
from openai.error import OpenAIError
from .utils import *
from typing import Text, Union
multiple_files = True
def query_pipeline(index:VectorStore, query:Text, stream_answer:bool=False, n_sources:int=5)->Text:
    """Runs the question-answering pipeline against a given vector-store index.

    Args:
        index (VectorStore): vector store holding the embedded document chunks.
        query (Text): the user's question.
        stream_answer (bool): whether the answer should be produced in streaming mode.
        n_sources (int): how many of the most relevant chunks to retrieve.

    Returns:
        Text: the generated answer text.
    """
    # retrieve the n_sources chunks most relevant to the query from the knowledge base
    relevant_chunks = search_docs(index, query=query, k=n_sources)
    # generate the answer in one shot and return only its textual part
    result = get_answer(relevant_chunks, query=query, stream_answer=stream_answer)
    return result["output_text"]
def toggle_process_document():
    """Flips the greenlight flag gating the document-processing step of the pipeline."""
    # lazily initialize the flag to True on first use, then invert it
    st.session_state.setdefault("processing_document_greenlight", True)
    current = st.session_state["processing_document_greenlight"]
    st.session_state["processing_document_greenlight"] = not current
def register_new_file_name(file_name):
    """
    Registers a new file name in the internal session state.
    """
    # create the registry list on first use, then append the new name
    st.session_state.setdefault("uploaded_file_names", []).append(file_name)
def clear_index():
    """
    Clears the index from the internal session state.
    This is a non reversible operation.

    Bug fix: the original deleted ``globals()["index"]`` — a module-level
    name that does not exist, raising ``KeyError`` whenever the guard was
    true — instead of the session-state entry it had just checked for.
    """
    if "index" in st.session_state:
        # remove the session-state entry, not a module global
        del st.session_state["index"]
def clear_session_state():
    """
    Clears the session state iterating over keys.
    This is a non reversible operation.

    Bug fix: the original deleted entries while iterating the live
    ``keys()`` view, which raises "dictionary changed size during
    iteration"; a snapshot of the keys is taken first.
    """
    for key in list(st.session_state.keys()):
        del st.session_state[key]
def register_new_file(new_file):
    """
    Registers a new file (or iterable of files) in the internal session state.
    """
    # create the upload list on first use, then extend it with the new file(s)
    st.session_state.setdefault("uploaded_files", []).extend(new_file)
def clear_all_files():
    """Removes every uploaded file from the internal session state."""
    # reset (rather than delete) the key so later reads still find a list
    st.session_state["uploaded_files"] = []
def append_uploaded_files(file):
    """Appends the uploaded files to the internal session state.

    Bug fix: when "uploaded_files" was missing, the original extended the
    throwaway default list returned by ``.get("uploaded_files", [])``,
    silently discarding the files; ``setdefault`` stores the list first so
    the extension is persisted.
    """
    st.session_state.setdefault("uploaded_files", []).extend(file)
def set_openai_api_key(api_key:Text)->bool:
    """Validates the given OpenAI API key and stores it in the session state.

    Args:
        api_key (Text): OpenAI API key

    Raises:
        ValueError: if the key does not pass validation.

    Returns:
        bool: True once the key has been stored.
    """
    # reject the key outright when validation fails
    is_valid = check_openai_api_key(api_key=api_key)
    if not is_valid:
        raise ValueError("Invalid OpenAI API key! Please provide a valid key.")
    # persist the key and flag the configuration as complete
    st.session_state["OPENAI_API_KEY"] = api_key
    st.session_state["api_key_configured"] = True
    return True
def parse_file(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None:
    """Converts a file to a document using the specialized parser for its extension.

    Args:
        file: uploaded file object exposing a ``name`` attribute.

    Returns:
        The parsed document, or None (after showing a Streamlit error) when
        the file type is unsupported.

    Bug fix: the original dispatched text-like files via
    ``file.name.split["."][1]`` — subscripting the bound ``split`` method
    itself, which raises ``TypeError`` — and even ``split(".")[1]`` would
    yield dot-less tokens that could never match the dotted suffixes it was
    compared against. ``str.endswith`` with a tuple of suffixes handles all
    text-like extensions correctly (including names containing extra dots).
    """
    # plain-text-like formats all go through the txt parser
    text_suffixes = (".txt", ".py", ".json", ".html", ".css", ".md")
    if file.name.endswith(".pdf"):
        doc = parse_pdf(file)
    elif file.name.endswith(".docx"):
        doc = parse_docx(file)
    elif file.name.endswith(text_suffixes):
        doc = parse_txt(file)
    else:
        st.error("File type not yet supported! Supported files: [.pdf, .docx, .txt, .py, .json, .html, .css, .md]")
        doc = None
    return doc
# this function can be used to define a single doc processing pipeline
# def document_embedding_pipeline(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None:
def qa_main():
    """Renders the full QA chat page: API-key setup, file upload, indexing, and the chat loop.

    Side effects: draws the Streamlit UI and reads/writes several
    st.session_state keys ("OPENAI_API_KEY", "api_key_configured",
    "upload_document_greenlight", "uploaded_file",
    "multiselect_documents_choices", "messages").
    """
    st.title("Chat with a file 💬📖")
    st.write("Just upload something using and start chatting with a version of GPT4 that has read the file!")
    # OpenAI API Key - TODO: consider adding a key valid for everyone
    # st.header("Configure OpenAI API Key")
    # st.warning('Please enter your OpenAI API Key!', icon='⚠️')
    # uncomment the following lines to add a user-specific key
    # user_secret = st.text_input(
    #     "Insert your OpenAI API key here ([get your API key](https://platform.openai.com/account/api-keys)).",
    #     type="password",
    #     placeholder="Paste your OpenAI API key here (sk-...)",
    #     help="You can get your API key from https://platform.openai.com/account/api-keys.",
    #     value=st.session_state.get("OPENAI_API_KEY", ""),
    # )
    # the key is read from the app's Streamlit secrets rather than user input
    user_secret = st.secrets["OPENAI_API_KEY"]
    if user_secret:
        if set_openai_api_key(user_secret):
            # removing this when the OpenAI API key is hardcoded
            # st.success('OpenAI API key successfully accessed!', icon='✅')
            # greenlight for next step, i.e. uploading the document to chat with
            st.session_state["upload_document_greenlight"] = True
    if st.session_state.get("upload_document_greenlight"):
        # File that needs to be queried
        st.header("Upload a file")
        # the widget stores its result under st.session_state["uploaded_file"]
        st.file_uploader(
            "Upload a pdf, docx, or txt file (scanned documents not supported)",
            type=["pdf", "docx", "txt", "py", "json", "html", "css", "md"],
            help="Scanned documents are not supported yet 🥲",
            accept_multiple_files=multiple_files,
            #on_change=toggle_process_document,
            key="uploaded_file"
        )
        documents = {}  # file name -> parsed document
        indexes = {}    # file name -> vector-store index for that document
        for file in st.session_state["uploaded_file"]:
            parsed_file = parse_file(file)
            # converts the files into a list of documents
            document = text_to_docs(pages=tuple(parsed_file), file_name=file.name)
            documents[file.name] = document
            with st.spinner(f"Indexing {file.name} (might take some time)"):
                try:
                    # indexing the document uploaded
                    indexes[file.name] = embed_docs(file_name=file.name, _docs=tuple(document))
                except OpenAIError as e:
                    # NOTE(review): st.error expects a single message; the second
                    # positional argument is interpreted as the icon — confirm intent
                    st.error("OpenAI error encountered: ", e._message)
        if len(documents)>1:
            # several uploads: let the user pick which documents to query
            st.multiselect(
                label="Select the documents to be indexed",
                options=list(documents.keys()),
                key="multiselect_documents_choices",
            )
        elif len(documents)==1:
            # single upload: select it implicitly, no widget needed
            st.session_state["multiselect_documents_choices"] = [list(documents.keys())[0]]
        # this is the code that actually performs the chat process
        if "messages" not in st.session_state: # checking if there is any cache history
            st.session_state["messages"] = []
        # replay the cached conversation so it survives Streamlit reruns
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"], unsafe_allow_html=True)
        if prompt:=st.chat_input("Ask the document something..."):
            # shortcut: typing "1" expands to a canned question
            if prompt=="1":
                prompt="What is this document about?"
            st.session_state.messages.append({"role": "user", "content": prompt})
            with st.chat_message("user"):
                st.markdown(prompt)
            with st.chat_message("assistant"):
                # full_response will store every question asked to all the document(s) considered
                full_response = ""
                message_placeholder = st.empty()
                # asking the same question to all of the documents considered
                for chat_document in st.session_state["multiselect_documents_choices"]:
                    # keeping track of what is asked to what document
                    full_response += \
                        f"<i>Asking</i> <b>{chat_document}</b> <i>question</i> <b>{prompt}</b></i><br>"
                    message_placeholder.markdown(full_response, unsafe_allow_html=True)
                    with st.spinner("Querying the document..."):
                        # retrieving the vector store associated to the chat document considered
                        chat_index = indexes[chat_document]
                        # producing the answer considered, live: each answer_bit is
                        # appended and re-rendered with a "▌" typing cursor
                        for answer_bit in query_pipeline(chat_index, prompt, stream_answer=True, n_sources=20):
                            full_response += answer_bit
                            message_placeholder.markdown(full_response + "▌", unsafe_allow_html=True)
                    # appending a final entering
                    full_response += "<br>"
                    message_placeholder.markdown(full_response, unsafe_allow_html=True)
                # appending the final response obtained after having asked all the documents
                st.session_state.messages.append({"role": "assistant", "content": full_response})
|