# import streamlit as st | |
# from pdf_utils import extract_text_from_file, split_text | |
# from chroma_db_utils import create_chroma_db, load_chroma_collection | |
# from query_handler import handle_query | |
# import os | |
# import re | |
# import tempfile | |
# def generate_collection_name(file_path=None): | |
# """Generate a valid collection name from a file path.""" | |
# base_name = os.path.basename(file_path) if file_path else "collection" | |
# # Remove file extension | |
# base_name = re.sub(r'\..*$', '', base_name) | |
# # Replace invalid characters and ensure it starts with a letter | |
# base_name = re.sub(r'\W+', '_', base_name) | |
# base_name = re.sub(r'^[^a-zA-Z]+', '', base_name) | |
# return base_name | |
# def process_uploaded_file(uploaded_file, chroma_db_path): | |
# """Process the uploaded file and create/load ChromaDB collection.""" | |
# # Create a temporary file to store the uploaded content | |
# with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
# tmp_file.write(uploaded_file.getvalue()) | |
# file_path = tmp_file.name | |
# try: | |
# # Generate collection name from original filename | |
# collection_name = generate_collection_name(uploaded_file.name) | |
# # Extract and process text | |
# file_text = extract_text_from_file(file_path) | |
# if file_text is None: | |
# return None, "Failed to extract text from the file." | |
# chunked_text = split_text(file_text) | |
# # Try to load existing collection or create new one | |
# try: | |
# db = load_chroma_collection(collection_name, chroma_db_path) | |
# st.success("Loaded existing ChromaDB collection.") | |
# except Exception: | |
# db = create_chroma_db(chunked_text, collection_name, chroma_db_path) | |
# st.success("Created new ChromaDB collection.") | |
# return db, None | |
# except Exception as e: | |
# return None, f"Error processing file: {str(e)}" | |
# finally: | |
# # Clean up temporary file | |
# os.unlink(file_path) | |
# def main(): | |
# st.title("File Question Answering System") | |
# # Sidebar for configuration | |
# st.sidebar.header("Configuration") | |
# chroma_db_path = st.sidebar.text_input( | |
# "ChromaDB Path", | |
# value="./chroma_db", | |
# help="Directory where ChromaDB collections will be stored" | |
# ) | |
# # Main content | |
# st.write("Upload a file and ask questions about its content!") | |
# # File uploader | |
# uploaded_file = st.file_uploader("Upload a file", type=["pdf", "docx", "txt"]) | |
# # Session state initialization | |
# if 'db' not in st.session_state: | |
# st.session_state.db = None | |
# if uploaded_file is not None: | |
# # Process file if not already processed | |
# if st.session_state.db is None: | |
# with st.spinner("Processing PDF file..."): | |
# db, error = process_uploaded_file(uploaded_file, chroma_db_path) | |
# if error: | |
# st.error(error) | |
# else: | |
# st.session_state.db = db | |
# st.success("File processed successfully!") | |
# # Question answering interface | |
# st.subheader("Ask a Question") | |
# question = st.text_input("Enter your question:") | |
# if question: | |
# if st.session_state.db is not None: | |
# with st.spinner("Finding answer..."): | |
# answer = handle_query(question, st.session_state.db) | |
# st.subheader("Answer:") | |
# st.write(answer) | |
# else: | |
# st.error("Please wait for the file to be processed or try uploading again.") | |
# # Clear database button | |
# if st.button("Clear Database"): | |
# st.session_state.db = None | |
# st.success("Database cleared. You can upload a new file.") | |
# if __name__ == "__main__": | |
# main() | |
import streamlit as st | |
import os | |
from typing import List | |
import time | |
from pdf_utils import extract_text_from_file, split_text | |
from chroma_db_utils import create_chroma_db | |
from query_handler import handle_query | |
def initialize_session_state():
    """Seed ``st.session_state`` with the keys this app reads.

    Sets ``messages`` (empty list), ``db`` (``None``) and ``chunks``
    (empty list). Keys that already exist are left untouched.
    """
    # Factories rather than shared instances, so every missing key gets its
    # own fresh list object.
    default_factories = {
        'messages': list,
        'db': lambda: None,
        'chunks': list,
    }
    for key, make_default in default_factories.items():
        if key not in st.session_state:
            st.session_state[key] = make_default()
def process_uploaded_file(uploaded_file) -> List[str]:
    """Extract the text of an uploaded file and split it into chunks.

    The upload is written to a uniquely named temporary file rather than to
    ``uploaded_file.name`` in the working directory. Writing to the
    client-supplied name would overwrite (and then delete) any local file
    that happens to share it, and would let two sessions uploading the same
    filename race on one path.

    Args:
        uploaded_file: Upload object exposing ``name`` and ``getbuffer()``
            (the value ``st.file_uploader`` hands to ``main``).

    Returns:
        The chunks produced by ``split_text``, or an empty list (after
        reporting an error in the UI) when no text could be extracted.
    """
    import tempfile  # function-scope import keeps this block self-contained

    # Only the extension of the client-supplied name is reused.
    # NOTE(review): presumably extract_text_from_file picks its parser from
    # the file extension - confirm; that is why the suffix is preserved.
    suffix = os.path.splitext(os.path.basename(uploaded_file.name))[1]

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Record the path first so cleanup still runs if the write fails.
            tmp_path = tmp_file.name
            tmp_file.write(uploaded_file.getbuffer())

        # The file is closed here, so the extractor can reopen it by path on
        # every platform.
        extracted_text = extract_text_from_file(tmp_path)
        if not extracted_text:
            st.error("No text could be extracted from the file.")
            return []

        return split_text(extracted_text)
    finally:
        # Always remove the temporary copy, whether extraction succeeded or not.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
def _rerun_app() -> None:
    """Restart the current script run using whichever rerun API exists."""
    # Newer Streamlit releases expose ``st.rerun``; older ones only have
    # ``st.experimental_rerun``. Prefer the current API, fall back otherwise.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _ensure_document_indexed(uploaded_file) -> bool:
    """Index ``uploaded_file`` unless this exact upload is already indexed.

    Streamlit re-executes the script on every interaction, so without this
    guard the document would be re-extracted and the database rebuilt for
    every chat message.

    Returns:
        True when chunks for this upload are available in the session,
        False when processing produced no chunks.
    """
    # ``file_id`` / ``size`` are read defensively in case the installed
    # Streamlit version does not provide them on the upload object.
    file_key = (
        uploaded_file.name,
        getattr(uploaded_file, "size", None),
        getattr(uploaded_file, "file_id", None),
    )
    already_indexed = (
        st.session_state.get("processed_file_key") == file_key
        and st.session_state.db is not None
    )
    if already_indexed:
        return True

    with st.spinner("Processing document..."):
        chunks = process_uploaded_file(uploaded_file)
        if not chunks:
            return False

        # Create/update the database for the new document.
        st.session_state.chunks = chunks
        st.session_state.db = create_chroma_db(chunks)
        st.session_state.processed_file_key = file_key
        st.success(f"Document processed! Created {len(chunks)} chunks.")
    return True


def _answer_prompt(prompt: str) -> None:
    """Record and display the user's prompt, then the assistant's answer."""
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.write(prompt)

    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            try:
                response = handle_query(prompt, st.session_state.db)
                st.write(response)
                # Only successful answers are stored in the chat history.
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": response
                })
            except Exception as e:
                # UI boundary: surface the failure instead of crashing the app.
                st.error(f"Error generating response: {str(e)}")


def main():
    """Render the document Q&A page.

    The sidebar holds the file uploader and the "Clear Chat" button; the
    main area shows the chat history and the chat input. Uploaded documents
    are chunked and indexed once per upload, and questions are answered via
    ``handle_query`` against the database stored in the session.
    """
    # NOTE(review): the icon was mojibake ('π') in the source; restored as a
    # document emoji - confirm the intended glyph.
    st.title("📄 Document Q&A System")

    initialize_session_state()

    # Sidebar for file upload
    with st.sidebar:
        st.header("Document Upload")
        uploaded_file = st.file_uploader(
            "Upload your document",
            type=['pdf', 'docx', 'txt'],
            help="Supported formats: PDF, DOCX, TXT"
        )

        if uploaded_file and _ensure_document_indexed(uploaded_file):
            # Greet the user once per (empty) conversation.
            if not st.session_state.messages:
                st.session_state.messages.append({
                    "role": "system",
                    "content": "I've processed your document. You can now ask questions about it!"
                })

    # Main chat interface
    st.header("💬 Chat")

    # Replay the stored conversation; each rerun redraws the page from scratch.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    if prompt := st.chat_input("Ask a question about your document"):
        if st.session_state.db is None:
            # No early return here, so the "Clear Chat" button below is
            # still rendered on this run.
            st.error("Please upload a document first!")
        else:
            _answer_prompt(prompt)

    # Clear chat button (lives in the sidebar)
    if st.sidebar.button("Clear Chat"):
        st.session_state.messages = []
        _rerun_app()


if __name__ == "__main__":
    main()
# import streamlit as st | |
# from chromadb.config import Settings | |
# import os | |
# import chromadb | |
# from typing import List | |
# import time | |
# import google | |
# import datetime | |
# # from chroma_db_utils import create_chroma_db, get_relevant_passage | |
# from query_handler import generate_answer, handle_query | |
# from pdf_utils import extract_text_from_file, split_text | |
# import logging | |
# # Configure logging | |
# logging.basicConfig(level=logging.INFO) | |
# logger = logging.getLogger(__name__) | |
# def create_chroma_db(chunks: List[str]): | |
# """Create and return an ephemeral ChromaDB collection.""" | |
# try: | |
# # Initialize ChromaDB with ephemeral storage | |
# client = chromadb.EphemeralClient() | |
# # Create collection | |
# collection_name = f"temp_collection_{int(time.time())}" | |
# collection = client.create_collection(name=collection_name) | |
# # Add documents | |
# collection.add( | |
# documents=chunks, | |
# ids=[f"doc_{i}" for i in range(len(chunks))] | |
# ) | |
# # Verify the data was added | |
# verify_count = collection.count() | |
# print(f"Verified: Added {verify_count} documents to collection {collection_name}") | |
# # Store both client and collection in session state | |
# st.session_state.chroma_client = client | |
# return collection | |
# except Exception as e: | |
# print(f"Error creating ChromaDB: {str(e)}") | |
# return None | |
# def get_relevant_passage(query: str, collection): | |
# """Get relevant passages from the collection.""" | |
# try: | |
# # Use the collection directly since it's ephemeral | |
# results = collection.query( | |
# query_texts=[query], | |
# n_results=2 | |
# ) | |
# if results and 'documents' in results: | |
# print(f"Found {len(results['documents'])} relevant passages") | |
# return results['documents'] | |
# return None | |
# except Exception as e: | |
# print(f"Error in get_relevant_passage: {str(e)}") | |
# return None | |
# def initialize_session_state(): | |
# """Initialize Streamlit session state variables.""" | |
# if "chat_history" not in st.session_state: | |
# st.session_state.chat_history = [] | |
# if "chroma_collection" not in st.session_state: | |
# st.session_state.chroma_collection = None | |
# if "chroma_client" not in st.session_state: | |
# st.session_state.chroma_client = None | |
# def process_uploaded_file(uploaded_file) -> List[str]: | |
# """Process the uploaded file and return text chunks.""" | |
# temp_file_path = f"/tmp/{uploaded_file.name}" | |
# try: | |
# with open(temp_file_path, "wb") as f: | |
# f.write(uploaded_file.getbuffer()) | |
# # Extract text from the file | |
# extracted_text = extract_text_from_file(temp_file_path) | |
# if extracted_text: | |
# # Split text into chunks | |
# chunks = split_text(extracted_text) | |
# return chunks | |
# else: | |
# st.error("No text could be extracted from the file.") | |
# return [] | |
# finally: | |
# if os.path.exists(temp_file_path): | |
# os.remove(temp_file_path) | |
# def chat_interface(): | |
# st.title("Chat with Your Documents ππ¬") | |
# # Debug: Print current state | |
# print(f"Current chroma_collection state: {st.session_state.chroma_collection}") | |
# uploaded_files = st.file_uploader( | |
# "Upload your files (TXT, PDF)", | |
# accept_multiple_files=True, | |
# type=['txt', 'pdf'] | |
# ) | |
# if uploaded_files and st.button("Process Files"): | |
# with st.spinner("Processing files..."): | |
# all_chunks = [] | |
# for uploaded_file in uploaded_files: | |
# chunks = process_uploaded_file(uploaded_file) | |
# print(f"Processed {len(chunks)} chunks from {uploaded_file.name}") | |
# if chunks: | |
# all_chunks.extend(chunks) | |
# if all_chunks: | |
# print(f"Creating ChromaDB with {len(all_chunks)} total chunks") | |
# # Create ChromaDB collection with all documents | |
# db = create_chroma_db(all_chunks) | |
# if db: | |
# # Verify the collection immediately after creation | |
# try: | |
# verify_count = db.count() | |
# print(f"Verification - Collection size: {verify_count}") | |
# # Try a test query | |
# test_query = db.query( | |
# query_texts=["test verification query"], | |
# n_results=1 | |
# ) | |
# print("Verification - Query test successful") | |
# st.session_state.chroma_collection = db | |
# st.success(f"Files processed successfully! {verify_count} chunks loaded.") | |
# except Exception as e: | |
# print(f"Verification failed: {str(e)}") | |
# st.error("Database verification failed") | |
# else: | |
# st.error("Failed to create database") | |
# # Query interface | |
# if st.session_state.chroma_collection is not None: | |
# print("ChromaDB collection found in session state") | |
# query = st.text_input("Ask a question about your documents:") | |
# if st.button("Send") and query: | |
# print(f"Processing query: {query}") | |
# with st.spinner("Generating response..."): | |
# try: | |
# # Verify both client and collection exist | |
# if st.session_state.chroma_client is None or st.session_state.chroma_collection is None: | |
# st.error("Please upload documents first") | |
# return | |
# collection = st.session_state.chroma_collection | |
# print(f"Collection name: {collection.name}") | |
# print(f"Collection size: {collection.count()}") | |
# relevant_passages = get_relevant_passage(query, collection) | |
# if relevant_passages: | |
# response = handle_query(query, relevant_passages) | |
# st.session_state.chat_history.append((query, response)) | |
# else: | |
# st.warning("No relevant information found in the documents.") | |
# except Exception as e: | |
# print(f"Full error during query processing: {str(e)}") | |
# logger.exception("Detailed error trace:") # This will log the full stack trace | |
# st.error("Failed to process your question. Please try again.") | |
# else: | |
# print("No ChromaDB collection in session state") | |
# if __name__ == "__main__": | |
# initialize_session_state() | |
# chat_interface() |