# Smart_AAS_v2.0 / app.py
# Author: TahaRasouli — last change: "Update app.py" (commit 984b32f, verified)
import streamlit as st
import os
import tempfile
from typing import List
from unified_document_processor import UnifiedDocumentProcessor, CustomEmbeddingFunction
import chromadb
from chromadb.config import Settings
from groq import Groq
def initialize_session_state():
    """Initialize all Streamlit session-state variables used across reruns.

    Ensures the ChromaDB persistence directory exists on disk, seeds the
    processed-file registry (one list per supported extension), and reserves
    a slot for the document processor, which is built lazily in
    ``StreamlitDocProcessor.__init__``.
    """
    if 'CHROMADB_DIR' not in st.session_state:
        st.session_state.CHROMADB_DIR = os.path.join(os.getcwd(), 'chromadb_data')
        os.makedirs(st.session_state.CHROMADB_DIR, exist_ok=True)
    if 'processed_files' not in st.session_state:
        st.session_state.processed_files = dict(pdf=[], xml=[])
    if 'processor' not in st.session_state:
        # Placeholder only — the real processor is constructed once secrets
        # are available. (The original wrapped this assignment in try/except,
        # but a plain attribute assignment cannot raise; removed.)
        st.session_state.processor = None
class StreamlitDocProcessor:
    """Streamlit front-end around a persistent ``UnifiedDocumentProcessor``.

    On first construction per session it builds the processor (stored in
    ``st.session_state.processor``) backed by an on-disk ChromaDB store, then
    exposes two pages: "Upload & Process" and "Query".
    """

    def __init__(self):
        # Build the shared processor only once per Streamlit session.
        if st.session_state.processor is None:
            try:
                groq_api_key = st.secrets["GROQ_API_KEY"]
                # Initialize processor with persistent ChromaDB
                st.session_state.processor = self.initialize_processor(groq_api_key)
                # Seed the processed-file registry from the persisted collection
                st.session_state.processed_files = self.get_processed_files()
            except Exception as e:
                st.error(f"Error initializing processor: {str(e)}")
                return

    def initialize_processor(self, groq_api_key):
        """Create a ``UnifiedDocumentProcessor`` backed by a persistent ChromaDB.

        Args:
            groq_api_key: API key forwarded to the Groq client.

        Returns:
            A processor subclass whose ChromaDB data lives under
            ``st.session_state.CHROMADB_DIR``.
        """
        class PersistentUnifiedDocumentProcessor(UnifiedDocumentProcessor):
            def __init__(self, api_key, collection_name="unified_content", persist_dir=None):
                # NOTE(review): deliberately does NOT call super().__init__();
                # it re-implements setup so ChromaDB is persistent rather than
                # whatever the parent uses. Keep these attributes in sync with
                # the parent class.
                self.groq_client = Groq(api_key=api_key)
                self.max_elements_per_chunk = 50
                self.pdf_chunk_size = 500
                self.pdf_overlap = 50
                self._initialize_nltk()
                # Initialize persistent ChromaDB
                self.chroma_client = chromadb.PersistentClient(
                    path=persist_dir,
                    settings=Settings(
                        allow_reset=True,
                        is_persistent=True
                    )
                )
                # get_or_create_collection replaces the original
                # get-then-bare-except-then-create dance; a bare `except:`
                # could mask unrelated failures (e.g. KeyboardInterrupt).
                self.collection = self.chroma_client.get_or_create_collection(
                    name=collection_name,
                    embedding_function=CustomEmbeddingFunction()
                )

        return PersistentUnifiedDocumentProcessor(
            groq_api_key,
            persist_dir=st.session_state.CHROMADB_DIR
        )

    def get_processed_files(self) -> dict:
        """Return the processed-file registry from ChromaDB.

        Returns:
            Dict mapping extension ('pdf'/'xml') to list of file names;
            empty lists when the processor is unavailable or errors out.
        """
        try:
            if st.session_state.processor:
                return st.session_state.processor.get_available_files()
            return dict(pdf=[], xml=[])
        except Exception as e:
            st.error(f"Error getting processed files: {str(e)}")
            return dict(pdf=[], xml=[])

    def run(self):
        """Render the app shell (title + sidebar) and dispatch to a page."""
        st.title("Document Assistant")
        # Create sidebar for navigation
        st.sidebar.title("Navigation")
        page = st.sidebar.selectbox(
            "Choose a page",
            ["Upload & Process", "Query"]
        )
        # Add sidebar information
        with st.sidebar.expander("About"):
            st.write("""
            This application allows you to:
            - Upload PDF and XML documents
            - Process them for semantic search
            - Query the documents with different levels of detail
            """)
        if page == "Upload & Process":
            self.upload_and_process_page()
        else:
            self.qa_page()

    def upload_and_process_page(self):
        """Render the upload page and process any not-yet-processed uploads.

        Each file is copied to a temp file, handed to the processor, and the
        temp file is removed afterwards (best effort). Progress is reported
        per chunk for XML files; PDFs report only start/finish.
        """
        st.header("Upload and Process Documents")
        uploaded_files = st.file_uploader(
            "Upload PDF or XML files",
            type=['pdf', 'xml'],
            accept_multiple_files=True
        )
        if uploaded_files:
            for uploaded_file in uploaded_files:
                # Create progress containers
                progress_bar = st.progress(0)
                status_container = st.empty()
                # Lowercase so "FILE.PDF" matches the 'pdf' bucket
                # (fixes case-sensitive extension lookup in the original).
                file_ext = os.path.splitext(uploaded_file.name)[1][1:].lower()
                if uploaded_file.name in st.session_state.processed_files.get(file_ext, []):
                    status_container.info(f"{uploaded_file.name} has already been processed")
                    progress_bar.progress(100)
                    continue
                # Sentinel so the finally-block never references an unbound
                # name if NamedTemporaryFile itself fails.
                temp_path = None
                try:
                    with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
                        tmp_file.write(uploaded_file.getbuffer())
                        temp_path = tmp_file.name
                    status_container.text(f'Processing {uploaded_file.name}...')
                    if file_ext == 'xml':
                        # Add processing status updates
                        status_container.text('Parsing XML...')
                        progress_bar.progress(10)
                        result = st.session_state.processor.process_file(temp_path)
                        if result['success']:
                            # Guard against a zero-chunk result (division by zero).
                            total_chunks = max(result['total_chunks'], 1)
                            for i, chunk_result in enumerate(result['results']):
                                # Update progress for each batch
                                progress = min(95, int(10 + (85 * (i / total_chunks))))
                                progress_bar.progress(progress)
                                status_container.text(f'Processing chunk {i+1}/{total_chunks}...')
                    else:
                        # Regular PDF processing (no per-chunk progress available)
                        result = st.session_state.processor.process_file(temp_path)
                    if result['success']:
                        st.session_state.processed_files.setdefault(file_ext, []).append(uploaded_file.name)
                        progress_bar.progress(100)
                        status_container.success(f"Successfully processed {uploaded_file.name}")
                    else:
                        progress_bar.progress(100)
                        status_container.error(f"Failed to process {uploaded_file.name}: {result['error']}")
                except Exception as e:
                    status_container.error(f"Error processing {uploaded_file.name}: {str(e)}")
                finally:
                    # Best-effort cleanup; narrowed from a bare `except:`.
                    if temp_path is not None:
                        try:
                            os.unlink(temp_path)
                        except OSError:
                            pass

    def qa_page(self):
        """Render the query page: file selection, question input, three answer modes.

        Modes map to processor calls:
        - Quick Answer      -> ask_question_selective
        - Detailed Answer   -> get_detailed_context
        - Complete Analysis -> get_summary_and_details
        """
        st.header("Query Documents")
        try:
            # Refresh available files
            st.session_state.processed_files = self.get_processed_files()
            if not any(st.session_state.processed_files.values()):
                st.warning("No processed files available. Please upload and process some files first.")
                return
            # Create combined list of files with icons
            all_files = []
            for file in st.session_state.processed_files.get('xml', []):
                all_files.append(f"📱 {file}")
            for file in st.session_state.processed_files.get('pdf', []):
                all_files.append(f"📄 {file}")
            if not all_files:
                st.warning("No processed files available. Please upload and process some files first.")
                return
            # File selection
            selected_files = st.multiselect(
                "Select files to search through",
                sorted(all_files),
                default=all_files
            )
            # Remove icons from selected files (split off the leading emoji)
            selected_files = [f.split(' ', 1)[1] for f in selected_files]
            if not selected_files:
                st.warning("Please select at least one file to search through.")
                return
            # Question input
            question = st.text_input("Enter your question:")
            if question:
                col1, col2, col3 = st.columns(3)
                with col1:
                    if st.button("Quick Answer"):
                        try:
                            with st.spinner("Getting quick answer..."):
                                answer = st.session_state.processor.ask_question_selective(
                                    question,
                                    selected_files
                                )
                                st.write("Answer:", answer)
                        except Exception as e:
                            st.error(f"Error getting answer: {str(e)}")
                with col2:
                    if st.button("Detailed Answer"):
                        try:
                            with st.spinner("Getting detailed answer..."):
                                result = st.session_state.processor.get_detailed_context(
                                    question,
                                    selected_files
                                )
                                if result['success']:
                                    st.write("### Relevant Information")
                                    for item in result['results']:
                                        with st.expander(f"Source: {item['metadata']['source_file']} ({item['metadata']['content_type'].upper()})"):
                                            # Use similarity_score instead of relevance_score
                                            st.write(f"Similarity Score: {item['similarity_score']}%")
                                            if item['metadata']['content_type'] == 'xml':
                                                st.write(f"XML Path: {item['source_info']['path']}")
                                            st.write("Content:", item['content'])
                                else:
                                    st.error(result['error'])
                        except Exception as e:
                            st.error(f"Error getting detailed answer: {str(e)}")
                with col3:
                    if st.button("Complete Analysis"):
                        try:
                            with st.spinner("Performing complete analysis..."):
                                result = st.session_state.processor.get_summary_and_details(
                                    question,
                                    selected_files
                                )
                                if result['success']:
                                    st.write("### Summary")
                                    st.write(result['summary'])
                                    st.write("### Detailed Information")
                                    for item in result['details']:
                                        with st.expander(f"Source: {item['metadata']['source_file']} ({item['metadata']['content_type'].upper()})"):
                                            # Use similarity_score instead of relevance_score
                                            st.write(f"Similarity Score: {item.get('similarity_score', 'N/A')}%")
                                            if item['metadata']['content_type'] == 'xml':
                                                st.write(f"XML Path: {item['source_info']['path']}")
                                                if 'parent_info' in item:
                                                    st.write("Parent Element:", item['parent_info']['content'])
                                                if 'children_info' in item:
                                                    st.write("Related Elements:")
                                                    for child in item['children_info']:
                                                        st.write(f"- {child['content']}")
                                            st.write("Content:", item['content'])
                                else:
                                    st.error(result['error'])
                        except Exception as e:
                            st.error(f"Error getting complete analysis: {str(e)}")
        except Exception as e:
            st.error(f"Error in Q&A interface: {str(e)}")
def main():
    """Configure the Streamlit page, set up session state, and launch the app."""
    st.set_page_config(
        page_title="Document Assistant",
        page_icon="📚",
        layout="wide",
        initial_sidebar_state="expanded"
    )
    # Session state must exist before the processor wrapper reads it.
    initialize_session_state()
    # Build the app wrapper and hand control to its page router.
    StreamlitDocProcessor().run()


if __name__ == "__main__":
    main()