Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import tempfile | |
from typing import List | |
from unified_document_processor import UnifiedDocumentProcessor, CustomEmbeddingFunction | |
import chromadb | |
from chromadb.config import Settings | |
from groq import Groq | |
def initialize_session_state():
    """Seed ``st.session_state`` with the keys the rest of the app reads.

    Keys are only created when missing, so values set on earlier reruns of
    the Streamlit script are preserved:

    * ``CHROMADB_DIR``    - ``<cwd>/chromadb_data``; the directory is created
      on disk at the moment the key is first set.
    * ``processed_files`` - dict with empty ``pdf`` and ``xml`` lists.
    * ``processor``       - ``None`` placeholder; the real object is built
      later by ``StreamlitDocProcessor.__init__``.
    """
    if 'CHROMADB_DIR' not in st.session_state:
        st.session_state.CHROMADB_DIR = os.path.join(os.getcwd(), 'chromadb_data')
        os.makedirs(st.session_state.CHROMADB_DIR, exist_ok=True)

    if 'processed_files' not in st.session_state:
        st.session_state.processed_files = {'pdf': [], 'xml': []}

    if 'processor' not in st.session_state:
        # Plain placeholder assignment: nothing here can fail, so the former
        # try/except (and its "Error initializing processor" message) was
        # unreachable and has been removed.
        st.session_state.processor = None
class StreamlitDocProcessor:
    """Streamlit UI for uploading, indexing and querying PDF/XML documents.

    The document processor is stored in ``st.session_state.processor`` so the
    same instance is reused across Streamlit reruns. The list of already
    indexed files is kept in ``st.session_state.processed_files`` as a dict
    mapping a lower-case extension (``"pdf"``, ``"xml"``) to file names.
    """

    def __init__(self):
        """Build the shared processor on first use and load the known files.

        Reads the ``GROQ_API_KEY`` entry from ``st.secrets``. Any failure is
        reported with ``st.error`` and leaves ``st.session_state.processor``
        as ``None``.
        """
        if st.session_state.processor is None:
            try:
                groq_api_key = st.secrets["GROQ_API_KEY"]
                # Initialize processor with persistent ChromaDB
                st.session_state.processor = self.initialize_processor(groq_api_key)
                # Update processed files after initializing processor
                st.session_state.processed_files = self.get_processed_files()
            except Exception as e:
                st.error(f"Error initializing processor: {str(e)}")
                return

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalized_extension(filename):
        """Return the extension of *filename* lower-cased, without the dot.

        Lower-casing keeps ``Report.PDF`` and ``report.pdf`` under the same
        ``processed_files`` key; a name without extension yields ``""``.
        """
        return os.path.splitext(filename)[1][1:].lower()

    @staticmethod
    def _strip_icon(label):
        """Drop the leading icon token (everything up to the first space)."""
        return label.partition(' ')[2]

    @staticmethod
    def _render_result_item(item, show_hierarchy=False):
        """Render one search hit inside an expander.

        Args:
            item: Result dict with ``metadata`` (``source_file``,
                ``content_type``), ``relevance_score`` and ``content``; XML
                hits also carry ``source_info['path']``.
            show_hierarchy: When true, XML hits additionally show the
                optional ``parent_info`` and ``children_info`` entries.
        """
        metadata = item['metadata']
        content_type = metadata['content_type']
        with st.expander(f"Source: {metadata['source_file']} ({content_type.upper()})"):
            st.write(f"Relevance Score: {item['relevance_score']:.2f}")
            if content_type == 'xml':
                st.write(f"XML Path: {item['source_info']['path']}")
                if show_hierarchy:
                    if 'parent_info' in item:
                        st.write("Parent Element:", item['parent_info']['content'])
                    if 'children_info' in item:
                        st.write("Related Elements:")
                        for child in item['children_info']:
                            st.write(f"- {child['content']}")
            st.write("Content:", item['content'])

    # ------------------------------------------------------------------
    # Processor management
    # ------------------------------------------------------------------
    def initialize_processor(self, groq_api_key):
        """Initialize the processor with persistent ChromaDB.

        Args:
            groq_api_key: API key handed to the ``Groq`` client.

        Returns:
            A ``UnifiedDocumentProcessor`` subclass instance whose Chroma
            collection lives under ``st.session_state.CHROMADB_DIR``.
        """
        class PersistentUnifiedDocumentProcessor(UnifiedDocumentProcessor):
            def __init__(self, api_key, collection_name="unified_content", persist_dir=None):
                # NOTE(review): the parent __init__ is not called; this
                # constructor sets the attributes itself, presumably to swap
                # in a persistent Chroma client - confirm against the parent.
                self.groq_client = Groq(api_key=api_key)
                self.max_elements_per_chunk = 50
                self.pdf_chunk_size = 500
                self.pdf_overlap = 50
                self._initialize_nltk()

                # Initialize persistent ChromaDB
                self.chroma_client = chromadb.PersistentClient(
                    path=persist_dir,
                    settings=Settings(
                        allow_reset=True,
                        is_persistent=True,
                    ),
                )

                # One call instead of get_collection + bare ``except`` +
                # create_collection: the old fallback treated *every* failure
                # as "collection missing" and hid the real error.
                self.collection = self.chroma_client.get_or_create_collection(
                    name=collection_name,
                    embedding_function=CustomEmbeddingFunction(),
                )

        return PersistentUnifiedDocumentProcessor(
            groq_api_key,
            persist_dir=st.session_state.CHROMADB_DIR,
        )

    def get_processed_files(self) -> dict:
        """Get list of processed files from ChromaDB.

        Returns:
            Whatever ``processor.get_available_files()`` returns, or a dict
            with empty ``pdf``/``xml`` lists when no processor exists or the
            lookup raises (the error is shown with ``st.error``).
        """
        try:
            if st.session_state.processor:
                return st.session_state.processor.get_available_files()
            return {'pdf': [], 'xml': []}
        except Exception as e:
            st.error(f"Error getting processed files: {str(e)}")
            return {'pdf': [], 'xml': []}

    # ------------------------------------------------------------------
    # Pages
    # ------------------------------------------------------------------
    def run(self):
        """Draw the title and sidebar, then dispatch to the selected page."""
        st.title("Document Assistant")

        # Create sidebar for navigation
        st.sidebar.title("Navigation")
        page = st.sidebar.selectbox(
            "Choose a page",
            ["Upload & Process", "Query"]
        )

        # Add sidebar information
        with st.sidebar.expander("About"):
            st.write("""
            This application allows you to:
            - Upload PDF and XML documents
            - Process them for semantic search
            - Query the documents with different levels of detail
            """)

        # Without a processor every upload would fail with an opaque
        # AttributeError on None, so stop here with a clear message instead.
        if st.session_state.processor is None:
            st.warning(
                "The document processor could not be initialized, "
                "so uploading and querying are unavailable."
            )
            return

        if page == "Upload & Process":
            self.upload_and_process_page()
        else:
            self.qa_page()

    def upload_and_process_page(self):
        """Show the uploader, process each new file and list indexed files."""
        st.header("Upload and Process Documents")

        # Add instructions
        with st.expander("Instructions", expanded=True):
            st.write("""
            1. Click 'Browse files' to select documents
            2. You can select multiple files at once
            3. Supported formats: PDF and XML
            4. Wait for processing to complete
            5. Processed files will be listed below
            """)

        # File uploader
        uploaded_files = st.file_uploader(
            "Upload PDF or XML files",
            type=['pdf', 'xml'],
            accept_multiple_files=True,
            help="Select one or more PDF or XML files to upload"
        )

        for uploaded_file in uploaded_files or []:
            self._process_upload(uploaded_file)

        self._show_processed_files()

    def _process_upload(self, uploaded_file):
        """Process a single uploaded file and report progress next to it."""
        # Create progress bar and status container
        col1, col2 = st.columns([3, 1])
        with col1:
            progress_bar = st.progress(0)
        with col2:
            status_text = st.empty()

        # Check if file is already processed
        file_ext = self._normalized_extension(uploaded_file.name)
        if uploaded_file.name in st.session_state.processed_files.get(file_ext, []):
            status_text.info("Already processed")
            progress_bar.progress(100)
            return

        # Stays None until the temp file exists, so cleanup never touches an
        # unbound name when creating the file itself fails.
        temp_path = None
        try:
            # NOTE(review): the processor only sees the random temp name, not
            # uploaded_file.name - confirm it does not store that path as the
            # document's source name.
            suffix = os.path.splitext(uploaded_file.name)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                temp_path = tmp_file.name
                tmp_file.write(uploaded_file.getbuffer())

            # Process the file (after the handle is closed)
            status_text.info('Processing...')
            progress_bar.progress(25)
            result = st.session_state.processor.process_file(temp_path)
            progress_bar.progress(75)

            if result['success']:
                st.session_state.processed_files.setdefault(file_ext, []).append(
                    uploaded_file.name
                )
                progress_bar.progress(100)
                status_text.success("β Success")
            else:
                progress_bar.progress(100)
                status_text.error("β Failed")
                st.error(f"Failed to process {uploaded_file.name}: {result['error']}")
        except Exception as e:
            status_text.error("β Error")
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")
        finally:
            # Clean up temporary file (best effort)
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

    def _show_processed_files(self):
        """List the indexed XML and PDF files in two columns."""
        if not any(st.session_state.processed_files.values()):
            return

        st.subheader("Processed Files")
        col1, col2 = st.columns(2)
        with col1:
            if st.session_state.processed_files.get('xml'):
                st.write("π± XML Files:")
                for file in sorted(st.session_state.processed_files['xml']):
                    st.text(f" β’ {file}")
        with col2:
            if st.session_state.processed_files.get('pdf'):
                st.write("π PDF Files:")
                for file in sorted(st.session_state.processed_files['pdf']):
                    st.text(f" β’ {file}")

    def qa_page(self):
        """Let the user pick files, enter a question and run one of 3 queries."""
        st.header("Query Documents")
        try:
            # Refresh available files
            st.session_state.processed_files = self.get_processed_files()
            processed = st.session_state.processed_files

            # Create combined list of files with icons
            all_files = [f"π± {file}" for file in processed.get('xml', [])]
            all_files += [f"π {file}" for file in processed.get('pdf', [])]
            if not all_files:
                st.warning("No processed files available. Please upload and process some files first.")
                return

            # Add query instructions
            with st.expander("Query Instructions", expanded=True):
                st.write("""
                Choose your query type:
                - **Quick Answer**: Basic response with essential information
                - **Detailed Answer**: Shows sources and relevance with expandable details
                - **Complete Analysis**: Provides summary and full breakdown with XML hierarchies
                """)

            # File selection
            selected_labels = st.multiselect(
                "Select files to search through",
                sorted(all_files),
                default=all_files,
                help="Choose which files to include in your search"
            )
            # Remove icons from selected files
            selected_files = [self._strip_icon(label) for label in selected_labels]
            if not selected_files:
                st.warning("Please select at least one file to search through.")
                return

            # Question input
            question = st.text_input(
                "Enter your question:",
                help="Type your question here and choose a query type below"
            )
            if not question:
                return

            processor = st.session_state.processor
            col1, col2, col3 = st.columns(3)

            with col1:
                if st.button("Quick Answer", help="Get a concise answer quickly"):
                    try:
                        with st.spinner("Getting quick answer..."):
                            answer = processor.ask_question_selective(
                                question,
                                selected_files
                            )
                        st.write("Answer:", answer)
                    except Exception as e:
                        st.error(f"Error getting answer: {str(e)}")

            with col2:
                if st.button("Detailed Answer", help="Get answer with sources and relevance scores"):
                    try:
                        with st.spinner("Getting detailed answer..."):
                            result = processor.get_detailed_context(
                                question,
                                selected_files
                            )
                        if result['success']:
                            st.write("### Relevant Information")
                            for item in result['results']:
                                self._render_result_item(item)
                        else:
                            st.error(result['error'])
                    except Exception as e:
                        st.error(f"Error getting detailed answer: {str(e)}")

            with col3:
                if st.button("Complete Analysis", help="Get comprehensive analysis with XML hierarchy"):
                    try:
                        with st.spinner("Performing complete analysis..."):
                            result = processor.get_summary_and_details(
                                question,
                                selected_files
                            )
                        if result['success']:
                            st.write("### Summary")
                            st.write(result['summary'])
                            st.write("### Detailed Information")
                            for item in result['details']:
                                self._render_result_item(item, show_hierarchy=True)
                        else:
                            st.error(result['error'])
                    except Exception as e:
                        st.error(f"Error getting complete analysis: {str(e)}")
        except Exception as e:
            # Top-level UI boundary: surface anything unexpected to the user.
            st.error(f"Error in Q&A interface: {str(e)}")
def main():
    """Configure the Streamlit page, seed session state and launch the app."""
    # Page-level settings; must be applied before any other Streamlit call.
    page_settings = {
        "page_title": "Document Assistant",
        "page_icon": "π",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
    st.set_page_config(**page_settings)

    # Session keys have to exist before the app object reads them.
    initialize_session_state()

    # Build the UI wrapper and hand control to it.
    StreamlitDocProcessor().run()


# Launch only when executed as a script, not on import.
if __name__ == "__main__":
    main()