Spaces:
Sleeping
Sleeping
# Streamlit App to perform the conversational retrieval using ConversationalResponse class | |
# 1. Main Title of App | |
# 2. PDF File Loader | |
# 3. Streaming Chat Window to ask questions and get answers from ConversationalResponse | |
# 4. Callback Handler to stream the output of the ConversationalResponse | |
# 5. Handle the chat interaction with the ConversationalResponse | |
import streamlit as st | |
from streamlit_chat import message | |
from langchain.callbacks.base import BaseCallbackHandler | |
from src.main import ConversationalResponse | |
import os | |
# Constants | |
ROLE_USER = "user" | |
ROLE_ASSISTANT = "assistant" | |
st.set_page_config(page_title="Chat with Documents", page_icon="π¦") | |
st.title("Chat with PDF Documents π€π") | |
st.markdown("by [Rohan Kataria](https://www.linkedin.com/in/imrohan/) view more at [VEW.AI](https://vew.ai/)") | |
#streamlit message block | |
st.markdown("This app allows you to chat with documents. You can upload a PDF file and ask questions about it. In the backround uses the ConversationalRetrival chain from langchain and Streamlit for UI.") | |
class StreamHandler(BaseCallbackHandler): | |
""" | |
StreamHandler is a callback handler that streams the output of the ConversationalResponse. | |
""" | |
def __init__(self, container: st.delta_generator.DeltaGenerator, initial_text: str = ""): | |
self.container = container | |
self.text = initial_text | |
def on_llm_new_token(self, token: str, **kwargs) -> None: | |
self.text += token | |
self.container.markdown(self.text) | |
def load_agent(file_path, api_key): | |
""" | |
Load the ConversationalResponse agent from the given file path. | |
""" | |
with st.spinner('Loading the file...'): | |
agent = ConversationalResponse(file_path, api_key) | |
st.success("File Loaded Successfully") | |
return agent | |
def handle_chat(agent): | |
""" | |
Handle the chat interaction with the user. | |
""" | |
if "messages" not in st.session_state or st.sidebar.button("Clear message history"): | |
st.session_state["messages"] = [{"role": ROLE_ASSISTANT, "content": "How can I help you?"}] | |
for msg in st.session_state.messages: | |
st.chat_message(msg["role"]).write(msg["content"]) | |
user_query = st.chat_input(placeholder="Ask me anything!") | |
if user_query: | |
st.session_state.messages.append({"role": ROLE_USER, "content": user_query}) | |
st.chat_message(ROLE_USER).write(user_query) | |
# Generate the response | |
with st.spinner("Generating response"): | |
response = agent(user_query) | |
# Display the response immediately | |
st.chat_message(ROLE_ASSISTANT).write(response) | |
# Add the response to the message history | |
st.session_state.messages.append({"role": ROLE_ASSISTANT, "content": response}) | |
def main(): | |
""" | |
Main function to handle file upload and chat interaction. | |
""" | |
# API Key Loader | |
api_key = st.sidebar.text_input("Enter your OpenAI API Key", type="password") | |
if api_key: | |
os.environ["OPENAI_API_KEY"] = api_key | |
else: | |
st.sidebar.error("Please enter your OpenAI API Key.") | |
return | |
# PDF File Loader to upload the file in the sidebar in session state | |
uploaded_file = st.sidebar.file_uploader("Choose a PDF file", type="pdf") | |
if uploaded_file is None: | |
st.error("Please upload a file.") | |
return | |
file_details = {"FileName":uploaded_file.name,"FileType":uploaded_file.type,"FileSize":uploaded_file.size} | |
st.write(file_details) | |
# Create a temp folder | |
if not os.path.exists("temp"): | |
os.mkdir("temp") | |
# Save the file in temp folder | |
file_path = os.path.join("temp",uploaded_file.name) | |
with open(file_path,"wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
agent = load_agent(file_path, api_key) | |
handle_chat(agent) | |
# Delete the file from temp folder | |
os.remove(file_path) | |
if __name__ == "__main__": | |
main() | |