# Smart_AAS_v2.0 / app.py
# Source: Hugging Face Space (TahaRasouli), commit 412b7cf, ~14.9 kB.
# NOTE(review): the lines above were non-Python web-UI residue from the
# "raw/history/blame" page chrome; converted to comments so the file parses.
import streamlit as st
import os
import tempfile
from typing import List
from unified_document_processor import UnifiedDocumentProcessor, CustomEmbeddingFunction
import chromadb
from chromadb.config import Settings
from groq import Groq
def initialize_session_state():
    """Initialize all session state variables.

    Idempotent: Streamlit reruns the script on every interaction, so each
    key is created only when missing.  Keys:

    - ``CHROMADB_DIR``: on-disk directory for the persistent ChromaDB store
      (created if absent).
    - ``processed_files``: mapping ``{'pdf': [...], 'xml': [...]}`` of file
      names already ingested.
    - ``processor``: placeholder for the document processor; the real
      instance is built lazily by ``StreamlitDocProcessor.__init__``.
    """
    if 'CHROMADB_DIR' not in st.session_state:
        st.session_state.CHROMADB_DIR = os.path.join(os.getcwd(), 'chromadb_data')
        os.makedirs(st.session_state.CHROMADB_DIR, exist_ok=True)
    if 'processed_files' not in st.session_state:
        st.session_state.processed_files = dict(pdf=[], xml=[])
    if 'processor' not in st.session_state:
        # The original wrapped this plain assignment in try/except, but
        # assigning None cannot raise -- the dead handler was removed.
        st.session_state.processor = None  # Will be initialized in StreamlitDocProcessor
class StreamlitDocProcessor:
    """Streamlit front-end for a persistent document Q&A assistant.

    Wires a ``UnifiedDocumentProcessor`` (stored once per session in
    ``st.session_state.processor``) to a ChromaDB collection persisted on
    disk, and renders two pages: "Upload & Process" and "Query".
    Assumes ``initialize_session_state()`` ran first so the session keys
    exist.
    """

    def __init__(self):
        # Build the shared processor only once per Streamlit session;
        # reruns reuse the instance cached in session state.
        if st.session_state.processor is None:
            try:
                groq_api_key = st.secrets["GROQ_API_KEY"]
                # Initialize processor with persistent ChromaDB
                st.session_state.processor = self.initialize_processor(groq_api_key)
                # Update processed files after initializing processor
                st.session_state.processed_files = self.get_processed_files()
            except Exception as e:
                st.error(f"Error initializing processor: {str(e)}")
                return

    def initialize_processor(self, groq_api_key):
        """Initialize the processor with persistent ChromaDB.

        Args:
            groq_api_key: API key used to construct the Groq client.

        Returns:
            A ``UnifiedDocumentProcessor`` subclass whose ChromaDB data is
            persisted under ``st.session_state.CHROMADB_DIR``.
        """

        class PersistentUnifiedDocumentProcessor(UnifiedDocumentProcessor):
            def __init__(self, api_key, collection_name="unified_content", persist_dir=None):
                # NOTE(review): deliberately does NOT call super().__init__()
                # -- the base initializer is replaced wholesale so the
                # ChromaDB client can be persistent; the attribute values
                # (chunk sizes, overlap) mirror what this override sets.
                self.groq_client = Groq(api_key=api_key)
                self.max_elements_per_chunk = 50
                self.pdf_chunk_size = 500
                self.pdf_overlap = 50
                self._initialize_nltk()
                # Initialize persistent ChromaDB rooted at persist_dir.
                self.chroma_client = chromadb.PersistentClient(
                    path=persist_dir,
                    settings=Settings(
                        allow_reset=True,
                        is_persistent=True
                    )
                )
                # Reuse the collection if it already exists on disk,
                # otherwise create it.  (Was a bare `except:`; narrowed so
                # SystemExit/KeyboardInterrupt are not swallowed.)
                try:
                    self.collection = self.chroma_client.get_collection(
                        name=collection_name,
                        embedding_function=CustomEmbeddingFunction()
                    )
                except Exception:
                    self.collection = self.chroma_client.create_collection(
                        name=collection_name,
                        embedding_function=CustomEmbeddingFunction()
                    )

        return PersistentUnifiedDocumentProcessor(
            groq_api_key,
            persist_dir=st.session_state.CHROMADB_DIR
        )

    def get_processed_files(self) -> dict:
        """Get list of processed files from ChromaDB.

        Returns:
            ``{'pdf': [...], 'xml': [...]}`` of file names; empty lists on
            error or before the processor has been built.
        """
        try:
            if st.session_state.processor:
                return st.session_state.processor.get_available_files()
            return dict(pdf=[], xml=[])
        except Exception as e:
            st.error(f"Error getting processed files: {str(e)}")
            return dict(pdf=[], xml=[])

    def run(self):
        """Render the app: title, sidebar navigation, and the chosen page."""
        st.title("Document Assistant")
        # Create sidebar for navigation
        st.sidebar.title("Navigation")
        page = st.sidebar.selectbox(
            "Choose a page",
            ["Upload & Process", "Query"]
        )
        # Add sidebar information
        with st.sidebar.expander("About"):
            st.write("""
            This application allows you to:
            - Upload PDF and XML documents
            - Process them for semantic search
            - Query the documents with different levels of detail
            """)
        if page == "Upload & Process":
            self.upload_and_process_page()
        else:
            self.qa_page()

    def upload_and_process_page(self):
        """Upload page: accept PDF/XML files, ingest new ones, list results."""
        st.header("Upload and Process Documents")
        # Add instructions
        with st.expander("Instructions", expanded=True):
            st.write("""
            1. Click 'Browse files' to select documents
            2. You can select multiple files at once
            3. Supported formats: PDF and XML
            4. Wait for processing to complete
            5. Processed files will be listed below
            """)
        # File uploader
        uploaded_files = st.file_uploader(
            "Upload PDF or XML files",
            type=['pdf', 'xml'],
            accept_multiple_files=True,
            help="Select one or more PDF or XML files to upload"
        )
        if uploaded_files:
            for uploaded_file in uploaded_files:
                # Per-file progress bar and status cell, side by side.
                col1, col2 = st.columns([3, 1])
                with col1:
                    progress_bar = st.progress(0)
                with col2:
                    status_text = st.empty()
                # Extension without the dot, lower-cased so "FILE.PDF"
                # matches the 'pdf' bucket (the original was case-sensitive
                # and would re-process upper-case extensions every rerun).
                file_ext = os.path.splitext(uploaded_file.name)[1][1:].lower()
                if uploaded_file.name not in st.session_state.processed_files.get(file_ext, []):
                    # Pre-bind so the finally-clause cannot raise NameError
                    # if NamedTemporaryFile itself fails.
                    temp_path = None
                    try:
                        # Spool the upload to a real file on disk, since the
                        # processor expects a filesystem path.
                        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
                            tmp_file.write(uploaded_file.getbuffer())
                            temp_path = tmp_file.name
                        # Process the file
                        status_text.info('Processing...')
                        progress_bar.progress(25)
                        result = st.session_state.processor.process_file(temp_path)
                        progress_bar.progress(75)
                        if result['success']:
                            if file_ext not in st.session_state.processed_files:
                                st.session_state.processed_files[file_ext] = []
                            st.session_state.processed_files[file_ext].append(uploaded_file.name)
                            progress_bar.progress(100)
                            status_text.success("✓ Success")
                        else:
                            progress_bar.progress(100)
                            status_text.error("✗ Failed")
                            st.error(f"Failed to process {uploaded_file.name}: {result['error']}")
                    except Exception as e:
                        status_text.error("✗ Error")
                        st.error(f"Error processing {uploaded_file.name}: {str(e)}")
                    finally:
                        # Clean up temporary file; ignore filesystem errors
                        # only (was a bare `except:`).
                        if temp_path is not None:
                            try:
                                os.unlink(temp_path)
                            except OSError:
                                pass
                else:
                    status_text.info("Already processed")
                    progress_bar.progress(100)
        # Display processed files
        if any(st.session_state.processed_files.values()):
            st.subheader("Processed Files")
            col1, col2 = st.columns(2)
            with col1:
                if st.session_state.processed_files.get('xml'):
                    st.write("📱 XML Files:")
                    for file in sorted(st.session_state.processed_files['xml']):
                        st.text(f"  • {file}")
            with col2:
                if st.session_state.processed_files.get('pdf'):
                    st.write("📄 PDF Files:")
                    for file in sorted(st.session_state.processed_files['pdf']):
                        st.text(f"  • {file}")

    def qa_page(self):
        """Query page: select files, enter a question, pick an answer depth."""
        st.header("Query Documents")
        try:
            # Refresh available files from the store on every rerun.
            st.session_state.processed_files = self.get_processed_files()
            if not any(st.session_state.processed_files.values()):
                st.warning("No processed files available. Please upload and process some files first.")
                return
            # Combined list of files, prefixed with a type icon.
            all_files = []
            for file in st.session_state.processed_files.get('xml', []):
                all_files.append(f"📱 {file}")
            for file in st.session_state.processed_files.get('pdf', []):
                all_files.append(f"📄 {file}")
            if not all_files:
                st.warning("No processed files available. Please upload and process some files first.")
                return
            # Add query instructions
            with st.expander("Query Instructions", expanded=True):
                st.write("""
                Choose your query type:
                - **Quick Answer**: Basic response with essential information
                - **Detailed Answer**: Shows sources and relevance with expandable details
                - **Complete Analysis**: Provides summary and full breakdown with XML hierarchies
                """)
            # File selection
            selected_files = st.multiselect(
                "Select files to search through",
                sorted(all_files),
                default=all_files,
                help="Choose which files to include in your search"
            )
            # Strip the "<icon> " prefix to recover the raw file names.
            selected_files = [f.split(' ', 1)[1] for f in selected_files]
            if not selected_files:
                st.warning("Please select at least one file to search through.")
                return
            # Question input
            question = st.text_input(
                "Enter your question:",
                help="Type your question here and choose a query type below"
            )
            if question:
                col1, col2, col3 = st.columns(3)
                with col1:
                    if st.button("Quick Answer", help="Get a concise answer quickly"):
                        try:
                            with st.spinner("Getting quick answer..."):
                                answer = st.session_state.processor.ask_question_selective(
                                    question,
                                    selected_files
                                )
                                st.write("Answer:", answer)
                        except Exception as e:
                            st.error(f"Error getting answer: {str(e)}")
                with col2:
                    if st.button("Detailed Answer", help="Get answer with sources and relevance scores"):
                        try:
                            with st.spinner("Getting detailed answer..."):
                                result = st.session_state.processor.get_detailed_context(
                                    question,
                                    selected_files
                                )
                                if result['success']:
                                    st.write("### Relevant Information")
                                    for item in result['results']:
                                        with st.expander(f"Source: {item['metadata']['source_file']} ({item['metadata']['content_type'].upper()})"):
                                            st.write(f"Relevance Score: {item['relevance_score']:.2f}")
                                            if item['metadata']['content_type'] == 'xml':
                                                st.write(f"XML Path: {item['source_info']['path']}")
                                            st.write("Content:", item['content'])
                                else:
                                    st.error(result['error'])
                        except Exception as e:
                            st.error(f"Error getting detailed answer: {str(e)}")
                with col3:
                    if st.button("Complete Analysis", help="Get comprehensive analysis with XML hierarchy"):
                        try:
                            with st.spinner("Performing complete analysis..."):
                                result = st.session_state.processor.get_summary_and_details(
                                    question,
                                    selected_files
                                )
                                if result['success']:
                                    st.write("### Summary")
                                    st.write(result['summary'])
                                    st.write("### Detailed Information")
                                    for item in result['details']:
                                        with st.expander(f"Source: {item['metadata']['source_file']} ({item['metadata']['content_type'].upper()})"):
                                            st.write(f"Relevance Score: {item['relevance_score']:.2f}")
                                            if item['metadata']['content_type'] == 'xml':
                                                st.write(f"XML Path: {item['source_info']['path']}")
                                                # Hierarchy context is optional per item.
                                                if 'parent_info' in item:
                                                    st.write("Parent Element:", item['parent_info']['content'])
                                                if 'children_info' in item:
                                                    st.write("Related Elements:")
                                                    for child in item['children_info']:
                                                        st.write(f"- {child['content']}")
                                            st.write("Content:", item['content'])
                                else:
                                    st.error(result['error'])
                        except Exception as e:
                            st.error(f"Error getting complete analysis: {str(e)}")
        except Exception as e:
            st.error(f"Error in Q&A interface: {str(e)}")
def main():
    """Entry point: configure the page, seed session state, run the app."""
    # Set page config -- must be the first Streamlit call in the script run.
    st.set_page_config(
        page_title="Document Assistant",
        page_icon="📚",
        layout="wide",
        initial_sidebar_state="expanded"
    )
    # Initialize session state
    initialize_session_state()
    # Create and run app
    app = StreamlitDocProcessor()
    app.run()


if __name__ == "__main__":
    main()