"""Streamlit "chat with a file" app: upload documents, index them, and query them via OpenAI."""
import streamlit as st | |
from openai.error import OpenAIError | |
from .utils import * | |
from typing import Text, Union | |
multiple_files = True | |
def query_pipeline(index:VectorStore, query:Text, stream_answer:bool=False, n_sources:int=5)->Text: | |
"""This function reproduces the querying pipeline considering a given input index.""" | |
# retrieving the most relevant pieces of information within the knowledge base | |
sources = search_docs(index, query=query, k=n_sources) | |
# getting the answer, all at once | |
answer = get_answer(sources, query=query, stream_answer=stream_answer)["output_text"] | |
return answer | |
def toggle_process_document(): | |
"""Toggles the greenlight for the next step in the pipeline, i.e. processing the document.""" | |
if "processing_document_greenlight" not in st.session_state: | |
st.session_state["processing_document_greenlight"] = True | |
st.session_state["processing_document_greenlight"] = not st.session_state["processing_document_greenlight"] | |
def register_new_file_name(file_name): | |
""" | |
Registers a new file name in the internal session state. | |
""" | |
if "uploaded_file_names" not in st.session_state: | |
st.session_state["uploaded_file_names"] = [] | |
st.session_state["uploaded_file_names"].append(file_name) | |
def clear_index(): | |
""" | |
Clears the index from the internal session state. | |
This is a non reversible operation. | |
""" | |
if "index" in st.session_state: | |
del globals()["index"] | |
def clear_session_state(): | |
""" | |
Clears the session state iterating over keys. | |
This is a non reversible operation. | |
""" | |
for k in st.session_state.keys(): | |
del st.session_state[k] | |
def register_new_file(new_file): | |
""" | |
Registers a new file in the internal session state. | |
""" | |
if "uploaded_files" not in st.session_state: | |
st.session_state["uploaded_files"] = [] | |
st.session_state["uploaded_files"].extend(new_file) | |
def clear_all_files(): | |
"""Removes all uploaded files from the interal session state.""" | |
st.session_state["uploaded_files"] = [] | |
def append_uploaded_files(file): | |
"""Appends the uploaded files to the internal session state.""" | |
st.session_state.get("uploaded_files", []).extend(file) | |
def set_openai_api_key(api_key:Text)->bool: | |
"""Sets the internal OpenAI API key to the given value. | |
Args: | |
api_key (Text): OpenAI API key | |
""" | |
if not check_openai_api_key(api_key=api_key): | |
raise ValueError("Invalid OpenAI API key! Please provide a valid key.") | |
st.session_state["OPENAI_API_KEY"] = api_key | |
st.session_state["api_key_configured"] = True | |
return True | |
def parse_file(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None: | |
"""Converts a file to a document using specialized parsers.""" | |
if file.name.endswith(".pdf"): | |
doc = parse_pdf(file) | |
elif file.name.endswith(".docx"): | |
doc = parse_docx(file) | |
elif file.name.split["."][1] in [".txt", ".py", ".json", ".html", ".css", ".md" ]: | |
doc = parse_txt(file) | |
else: | |
st.error("File type not yet supported! Supported files: [.pdf, .docx, .txt, .py, .json, .html, .css, .md]") | |
doc = None | |
return doc | |
# this function can be used to define a single doc processing pipeline | |
# def document_embedding_pipeline(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None: | |
def qa_main(): | |
"""Main function for the QA app.""" | |
st.title("Chat with a file 💬📖") | |
st.write("Just upload something using and start chatting with a version of GPT4 that has read the file!") | |
# OpenAI API Key - TODO: consider adding a key valid for everyone | |
# st.header("Configure OpenAI API Key") | |
# st.warning('Please enter your OpenAI API Key!', icon='⚠️') | |
# uncomment the following lines to add a user-specific key | |
# user_secret = st.text_input( | |
# "Insert your OpenAI API key here ([get your API key](https://platform.openai.com/account/api-keys)).", | |
# type="password", | |
# placeholder="Paste your OpenAI API key here (sk-...)", | |
# help="You can get your API key from https://platform.openai.com/account/api-keys.", | |
# value=st.session_state.get("OPENAI_API_KEY", ""), | |
# ) | |
user_secret = st.secrets["OPENAI_API_KEY"] | |
if user_secret: | |
if set_openai_api_key(user_secret): | |
# removing this when the OpenAI API key is hardcoded | |
# st.success('OpenAI API key successfully accessed!', icon='✅') | |
# greenlight for next step, i.e. uploading the document to chat with | |
st.session_state["upload_document_greenlight"] = True | |
if st.session_state.get("upload_document_greenlight"): | |
# File that needs to be queried | |
st.header("Upload a file") | |
st.file_uploader( | |
"Upload a pdf, docx, or txt file (scanned documents not supported)", | |
type=["pdf", "docx", "txt", "py", "json", "html", "css", "md"], | |
help="Scanned documents are not supported yet 🥲", | |
accept_multiple_files=multiple_files, | |
#on_change=toggle_process_document, | |
key="uploaded_file" | |
) | |
documents = {} | |
indexes = {} | |
for file in st.session_state["uploaded_file"]: | |
parsed_file = parse_file(file) | |
# converts the files into a list of documents | |
document = text_to_docs(pages=tuple(parsed_file), file_name=file.name) | |
documents[file.name] = document | |
with st.spinner(f"Indexing {file.name} (might take some time)"): | |
try: | |
# indexing the document uploaded | |
indexes[file.name] = embed_docs(file_name=file.name, _docs=tuple(document)) | |
except OpenAIError as e: | |
st.error("OpenAI error encountered: ", e._message) | |
if len(documents)>1: | |
# documents to be indexed when providing the query | |
st.multiselect( | |
label="Select the documents to be indexed", | |
options=list(documents.keys()), | |
key="multiselect_documents_choices", | |
) | |
elif len(documents)==1: | |
st.session_state["multiselect_documents_choices"] = [list(documents.keys())[0]] | |
# this is the code that actually performs the chat process | |
if "messages" not in st.session_state: # checking if there is any cache history | |
st.session_state["messages"] = [] | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"], unsafe_allow_html=True) | |
if prompt:=st.chat_input("Ask the document something..."): | |
if prompt=="1": | |
prompt="What is this document about?" | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
with st.chat_message("assistant"): | |
# full_response will store every question asked to all the document(s) considered | |
full_response = "" | |
message_placeholder = st.empty() | |
# asking the same question to all of the documents considered | |
for chat_document in st.session_state["multiselect_documents_choices"]: | |
# keeping track of what is asked to what document | |
full_response += \ | |
f"<i>Asking</i> <b>{chat_document}</b> <i>question</i> <b>{prompt}</b></i><br>" | |
message_placeholder.markdown(full_response, unsafe_allow_html=True) | |
with st.spinner("Querying the document..."): | |
# retrieving the vector store associated to the chat document considered | |
chat_index = indexes[chat_document] | |
# producing the answer considered, live | |
for answer_bit in query_pipeline(chat_index, prompt, stream_answer=True, n_sources=20): | |
full_response += answer_bit | |
message_placeholder.markdown(full_response + "▌", unsafe_allow_html=True) | |
# appending a final entering | |
full_response += "<br>" | |
message_placeholder.markdown(full_response, unsafe_allow_html=True) | |
# appending the final response obtained after having asked all the documents | |
st.session_state.messages.append({"role": "assistant", "content": full_response}) | |