rag / app.py
jessica45's picture
initial commit
8953dfc verified
raw
history blame
15.2 kB
# import streamlit as st
# from pdf_utils import extract_text_from_file, split_text
# from chroma_db_utils import create_chroma_db, load_chroma_collection
# from query_handler import handle_query
# import os
# import re
# import tempfile
# def generate_collection_name(file_path=None):
# """Generate a valid collection name from a file path."""
# base_name = os.path.basename(file_path) if file_path else "collection"
# # Remove file extension
# base_name = re.sub(r'\..*$', '', base_name)
# # Replace invalid characters and ensure it starts with a letter
# base_name = re.sub(r'\W+', '_', base_name)
# base_name = re.sub(r'^[^a-zA-Z]+', '', base_name)
# return base_name
# def process_uploaded_file(uploaded_file, chroma_db_path):
# """Process the uploaded file and create/load ChromaDB collection."""
# # Create a temporary file to store the uploaded content
# with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
# tmp_file.write(uploaded_file.getvalue())
# file_path = tmp_file.name
# try:
# # Generate collection name from original filename
# collection_name = generate_collection_name(uploaded_file.name)
# # Extract and process text
# file_text = extract_text_from_file(file_path)
# if file_text is None:
# return None, "Failed to extract text from the file."
# chunked_text = split_text(file_text)
# # Try to load existing collection or create new one
# try:
# db = load_chroma_collection(collection_name, chroma_db_path)
# st.success("Loaded existing ChromaDB collection.")
# except Exception:
# db = create_chroma_db(chunked_text, collection_name, chroma_db_path)
# st.success("Created new ChromaDB collection.")
# return db, None
# except Exception as e:
# return None, f"Error processing file: {str(e)}"
# finally:
# # Clean up temporary file
# os.unlink(file_path)
# def main():
# st.title("File Question Answering System")
# # Sidebar for configuration
# st.sidebar.header("Configuration")
# chroma_db_path = st.sidebar.text_input(
# "ChromaDB Path",
# value="./chroma_db",
# help="Directory where ChromaDB collections will be stored"
# )
# # Main content
# st.write("Upload a file and ask questions about its content!")
# # File uploader
# uploaded_file = st.file_uploader("Upload a file", type=["pdf", "docx", "txt"])
# # Session state initialization
# if 'db' not in st.session_state:
# st.session_state.db = None
# if uploaded_file is not None:
# # Process file if not already processed
# if st.session_state.db is None:
# with st.spinner("Processing PDF file..."):
# db, error = process_uploaded_file(uploaded_file, chroma_db_path)
# if error:
# st.error(error)
# else:
# st.session_state.db = db
# st.success("File processed successfully!")
# # Question answering interface
# st.subheader("Ask a Question")
# question = st.text_input("Enter your question:")
# if question:
# if st.session_state.db is not None:
# with st.spinner("Finding answer..."):
# answer = handle_query(question, st.session_state.db)
# st.subheader("Answer:")
# st.write(answer)
# else:
# st.error("Please wait for the file to be processed or try uploading again.")
# # Clear database button
# if st.button("Clear Database"):
# st.session_state.db = None
# st.success("Database cleared. You can upload a new file.")
# if __name__ == "__main__":
# main()
import streamlit as st
import os
from typing import List
import time
from pdf_utils import extract_text_from_file, split_text
from chroma_db_utils import create_chroma_db
from query_handler import handle_query
def initialize_session_state():
    """Seed ``st.session_state`` with the keys the rest of the app reads.

    Keys that are already present are left untouched, so calling this at the
    top of every script run does not wipe the chat history, the database
    handle, or the stored chunks.
    """
    # Factories (not shared literals) so each session gets its own fresh list.
    defaults = (
        ("messages", list),       # chat history entries
        ("db", lambda: None),     # database handle, unset until an upload
        ("chunks", list),         # text chunks of the current document
    )
    state = st.session_state
    for key, make_default in defaults:
        if key not in state:
            state[key] = make_default()
def process_uploaded_file(uploaded_file) -> List[str]:
    """Extract the text of an uploaded document and split it into chunks.

    The upload is spooled to a private temporary file instead of being
    written to ``uploaded_file.name`` in the working directory. Writing to
    the user-supplied name would overwrite (and afterwards delete) any
    existing file with that name, e.g. an upload called ``app.py``, and two
    sessions uploading the same name would clobber each other.

    Args:
        uploaded_file: The upload object from ``st.file_uploader``. Only its
            ``name`` and ``getbuffer()`` are used.

    Returns:
        The chunks returned by ``split_text``, or an empty list when no text
        could be extracted (an error is shown in the UI in that case).
    """
    import tempfile

    # Keep only the extension of the client-supplied name.
    # NOTE(review): extract_text_from_file presumably selects its parser from
    # the file extension, so the suffix is preserved -- confirm in pdf_utils.
    suffix = os.path.splitext(os.path.basename(uploaded_file.name))[1]

    temp_path = None
    try:
        # delete=False so the file can be reopened by path after it is closed
        # (required on Windows); removal happens in the ``finally`` below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            temp_path = tmp.name
            tmp.write(uploaded_file.getbuffer())

        extracted_text = extract_text_from_file(temp_path)
        if not extracted_text:
            st.error("No text could be extracted from the file.")
            return []
        return split_text(extracted_text)
    finally:
        # Clean up even when writing or extraction raised.
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)
def _rerun_app():
    """Restart the script run with whichever rerun API this Streamlit offers."""
    # ``st.experimental_rerun`` was deprecated in favour of ``st.rerun`` and is
    # absent from current releases; fall back to it only on older installs.
    rerun = getattr(st, "rerun", None)
    if rerun is None:
        rerun = st.experimental_rerun
    rerun()


def _answer_prompt(prompt):
    """Record the user's prompt, query the document database, show the reply."""
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.write(prompt)

    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            # UI boundary: surface any failure to the user instead of crashing
            # the page.
            try:
                response = handle_query(prompt, st.session_state.db)
                st.write(response)
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": response
                })
            except Exception as e:
                st.error(f"Error generating response: {str(e)}")


def main():
    """Render the document Q&A page.

    The sidebar holds the file uploader and the "Clear Chat" button; the main
    area shows the chat history and the chat input.

    Streamlit re-executes this function on every interaction, so the uploaded
    document is processed only once per upload. The identity of the last
    successfully processed upload is kept in
    ``st.session_state["processed_upload"]``; later reruns reuse the stored
    chunks and database instead of re-extracting and re-indexing the file.
    """
    st.title("📚 Document Q&A System")
    initialize_session_state()

    with st.sidebar:
        st.header("Document Upload")
        uploaded_file = st.file_uploader(
            "Upload your document",
            type=['pdf', 'docx', 'txt'],
            help="Supported formats: PDF, DOCX, TXT"
        )
        if uploaded_file:
            # ``file_id``/``size`` are read defensively in case the running
            # Streamlit version does not expose them on the upload object.
            upload_key = (
                uploaded_file.name,
                getattr(uploaded_file, "size", None),
                getattr(uploaded_file, "file_id", None),
            )
            if st.session_state.get("processed_upload") != upload_key:
                with st.spinner("Processing document..."):
                    chunks = process_uploaded_file(uploaded_file)
                    if chunks:
                        st.session_state.chunks = chunks
                        st.session_state.db = create_chroma_db(chunks)
                        # Only remember the upload once a database exists, so
                        # a failed build is retried on the next rerun.
                        if st.session_state.db is not None:
                            st.session_state["processed_upload"] = upload_key

            if st.session_state.get("processed_upload") == upload_key:
                st.success(
                    f"Document processed! Created {len(st.session_state.chunks)} chunks."
                )
                # Greet the user whenever the history is empty (first upload
                # or right after "Clear Chat").
                if not st.session_state.messages:
                    st.session_state.messages.append({
                        "role": "system",
                        "content": "I've processed your document. You can now ask questions about it!"
                    })

    st.header("💬 Chat")
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    if prompt := st.chat_input("Ask a question about your document"):
        if st.session_state.db is None:
            # No early return here: the sidebar button below must still be
            # rendered on this run.
            st.error("Please upload a document first!")
        else:
            _answer_prompt(prompt)

    if st.sidebar.button("Clear Chat"):
        st.session_state.messages = []
        _rerun_app()
# Build the page only when this file is executed as a script (e.g. via
# ``streamlit run``), not when it is imported as a module.
if __name__ == "__main__":
    main()
# import streamlit as st
# from chromadb.config import Settings
# import os
# import chromadb
# from typing import List
# import time
# import google
# import datetime
# # from chroma_db_utils import create_chroma_db, get_relevant_passage
# from query_handler import generate_answer, handle_query
# from pdf_utils import extract_text_from_file, split_text
# import logging
# # Configure logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)
# def create_chroma_db(chunks: List[str]):
# """Create and return an ephemeral ChromaDB collection."""
# try:
# # Initialize ChromaDB with ephemeral storage
# client = chromadb.EphemeralClient()
# # Create collection
# collection_name = f"temp_collection_{int(time.time())}"
# collection = client.create_collection(name=collection_name)
# # Add documents
# collection.add(
# documents=chunks,
# ids=[f"doc_{i}" for i in range(len(chunks))]
# )
# # Verify the data was added
# verify_count = collection.count()
# print(f"Verified: Added {verify_count} documents to collection {collection_name}")
# # Store both client and collection in session state
# st.session_state.chroma_client = client
# return collection
# except Exception as e:
# print(f"Error creating ChromaDB: {str(e)}")
# return None
# def get_relevant_passage(query: str, collection):
# """Get relevant passages from the collection."""
# try:
# # Use the collection directly since it's ephemeral
# results = collection.query(
# query_texts=[query],
# n_results=2
# )
# if results and 'documents' in results:
# print(f"Found {len(results['documents'])} relevant passages")
# return results['documents']
# return None
# except Exception as e:
# print(f"Error in get_relevant_passage: {str(e)}")
# return None
# def initialize_session_state():
# """Initialize Streamlit session state variables."""
# if "chat_history" not in st.session_state:
# st.session_state.chat_history = []
# if "chroma_collection" not in st.session_state:
# st.session_state.chroma_collection = None
# if "chroma_client" not in st.session_state:
# st.session_state.chroma_client = None
# def process_uploaded_file(uploaded_file) -> List[str]:
# """Process the uploaded file and return text chunks."""
# temp_file_path = f"/tmp/{uploaded_file.name}"
# try:
# with open(temp_file_path, "wb") as f:
# f.write(uploaded_file.getbuffer())
# # Extract text from the file
# extracted_text = extract_text_from_file(temp_file_path)
# if extracted_text:
# # Split text into chunks
# chunks = split_text(extracted_text)
# return chunks
# else:
# st.error("No text could be extracted from the file.")
# return []
# finally:
# if os.path.exists(temp_file_path):
# os.remove(temp_file_path)
# def chat_interface():
# st.title("Chat with Your Documents 📄💬")
# # Debug: Print current state
# print(f"Current chroma_collection state: {st.session_state.chroma_collection}")
# uploaded_files = st.file_uploader(
# "Upload your files (TXT, PDF)",
# accept_multiple_files=True,
# type=['txt', 'pdf']
# )
# if uploaded_files and st.button("Process Files"):
# with st.spinner("Processing files..."):
# all_chunks = []
# for uploaded_file in uploaded_files:
# chunks = process_uploaded_file(uploaded_file)
# print(f"Processed {len(chunks)} chunks from {uploaded_file.name}")
# if chunks:
# all_chunks.extend(chunks)
# if all_chunks:
# print(f"Creating ChromaDB with {len(all_chunks)} total chunks")
# # Create ChromaDB collection with all documents
# db = create_chroma_db(all_chunks)
# if db:
# # Verify the collection immediately after creation
# try:
# verify_count = db.count()
# print(f"Verification - Collection size: {verify_count}")
# # Try a test query
# test_query = db.query(
# query_texts=["test verification query"],
# n_results=1
# )
# print("Verification - Query test successful")
# st.session_state.chroma_collection = db
# st.success(f"Files processed successfully! {verify_count} chunks loaded.")
# except Exception as e:
# print(f"Verification failed: {str(e)}")
# st.error("Database verification failed")
# else:
# st.error("Failed to create database")
# # Query interface
# if st.session_state.chroma_collection is not None:
# print("ChromaDB collection found in session state")
# query = st.text_input("Ask a question about your documents:")
# if st.button("Send") and query:
# print(f"Processing query: {query}")
# with st.spinner("Generating response..."):
# try:
# # Verify both client and collection exist
# if st.session_state.chroma_client is None or st.session_state.chroma_collection is None:
# st.error("Please upload documents first")
# return
# collection = st.session_state.chroma_collection
# print(f"Collection name: {collection.name}")
# print(f"Collection size: {collection.count()}")
# relevant_passages = get_relevant_passage(query, collection)
# if relevant_passages:
# response = handle_query(query, relevant_passages)
# st.session_state.chat_history.append((query, response))
# else:
# st.warning("No relevant information found in the documents.")
# except Exception as e:
# print(f"Full error during query processing: {str(e)}")
# logger.exception("Detailed error trace:") # This will log the full stack trace
# st.error("Failed to process your question. Please try again.")
# else:
# print("No ChromaDB collection in session state")
# if __name__ == "__main__":
# initialize_session_state()
# chat_interface()