# version-2 / app.py
# DrishtiSharma's picture
# Update app.py
# 60453ba verified
import sys
import os
import re
import shutil
import time
import fitz
import streamlit as st
import nltk
import tempfile
import subprocess
# Pin NLTK to version 3.9.1
REQUIRED_NLTK_VERSION = "3.9.1"
# Shelling out to pip is slow, so skip it when the NLTK that is already
# imported is the pinned release. `nltk` was imported above, so a fresh install
# only becomes visible to this process after a restart.
if getattr(nltk, "__version__", None) != REQUIRED_NLTK_VERSION:
    subprocess.run(
        [sys.executable, "-m", "pip", "install", f"nltk=={REQUIRED_NLTK_VERSION}"],
        check=False,  # best effort: a failed install must not abort startup
    )

# Set up temporary directory for NLTK resources
nltk_data_path = os.path.join(tempfile.gettempdir(), "nltk_data")
os.makedirs(nltk_data_path, exist_ok=True)
# `nltk.data.path` lives on the cached module, so guard against appending the
# same directory again each time this script is executed in the same process.
if nltk_data_path not in nltk.data.path:
    nltk.data.path.append(nltk_data_path)

# Download 'punkt_tab' for compatibility, but only when it is not installed yet
try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    print("Ensuring NLTK 'punkt_tab' resource is downloaded...")
    try:
        punkt_tab_ready = nltk.download("punkt_tab", download_dir=nltk_data_path)
    except Exception as e:
        print(f"Error downloading NLTK 'punkt_tab': {e}")
        raise
    # nltk.download() signals most failures by returning False, not by raising;
    # surface that instead of ignoring it (startup continues as before).
    if not punkt_tab_ready:
        print("Error downloading NLTK 'punkt_tab': downloader reported failure")

# Make modules in the working directory importable (needed by the local
# imports that follow) without growing sys.path on every re-execution.
project_root = os.path.abspath(".")
if project_root not in sys.path:
    sys.path.append(project_root)
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from langchain.document_loaders import UnstructuredPDFLoader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from patent_downloader import PatentDownloader
# Directory handed to the vector store as its persistence location; a new
# temporary directory is created each time this module body runs.
PERSISTED_DIRECTORY = tempfile.mkdtemp()

# Fetch API key securely from the environment
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Without a key the app cannot answer questions, so report and halt here.
    st.error(
        "Critical Error: OpenAI API key not found in the environment variables. Please configure it."
    )
    st.stop()
def check_poppler_installed():
    """
    Verify that Poppler's `pdfinfo` executable can be found on PATH.

    Raises:
        EnvironmentError: If `pdfinfo` cannot be located.
    """
    pdfinfo_location = shutil.which("pdfinfo")
    if pdfinfo_location:
        return
    raise EnvironmentError(
        "Poppler is not installed or not in PATH. Install 'poppler-utils' for PDF processing."
    )
# Run the Poppler check while the module loads, so a missing `pdfinfo` binary
# raises EnvironmentError before any PDF handling is attempted.
check_poppler_installed()
def extract_patent_number(url):
    """
    Pull the patent number out of a Google Patents URL.

    Returns the two uppercase letters and the digits that follow "/patent/"
    (for example "US8676427" from ".../patent/US8676427B1/en"), or None when
    the URL contains no such segment.
    """
    found = re.search(r"/patent/([A-Z]{2}\d+)", url)
    if found is None:
        return None
    return found.group(1)
def download_pdf(patent_number):
    """
    Download the PDF for a patent into the system temporary directory.

    Args:
        patent_number (str): Patent identifier handed to PatentDownloader.

    Returns:
        The first element of the value returned by PatentDownloader.download.
        If anything raises, the error is shown with st.error and st.stop()
        is called.
    """
    try:
        downloader = PatentDownloader(verbose=True)
        downloaded_paths = downloader.download(
            patents=patent_number,
            output_path=tempfile.gettempdir(),
        )
        return downloaded_paths[0]
    except Exception as e:
        st.error(f"Failed to download patent PDF: {e}")
        st.stop()
def clean_extracted_text(text):
    """
    Strip metadata, headers, and other boilerplate lines from extracted text.

    Every line is trimmed of surrounding whitespace and then discarded when it
    is shorter than 30 characters, mentions "Examiner" or "Attorney", is just
    a (possibly parenthesised) number, or starts with a known header keyword.
    The surviving lines are joined back together with newlines.
    """
    def is_boilerplate(candidate):
        # Very short lines (page numbers, stray labels) carry no content.
        if len(candidate) < 30:
            return True
        if "Examiner" in candidate or "Attorney" in candidate:
            return True
        # A bare number, optionally wrapped in parentheses.
        if re.match(r"^\(?\d+\)?$", candidate):
            return True
        # Lines that open with a typical patent front-matter keyword.
        return bool(
            re.match(
                r"^(U\.S\.|United States|Sheet|Figure|References|Patent No|Date of Patent)",
                candidate,
            )
        )

    trimmed_lines = (raw.strip() for raw in text.split("\n"))
    return "\n".join(entry for entry in trimmed_lines if not is_boilerplate(entry))
def load_docs(document_path):
    """
    Load and clean the PDF content, then split into chunks.

    Args:
        document_path (str): Path to the PDF file to read.

    Returns:
        list: Chunk documents produced by the text splitter (1000-character
        chunks with a 100-character overlap). If any step raises, the error
        is shown with st.error and st.stop() is called.
    """
    try:
        # Step 1: Extract plain text from PDF. The context manager closes the
        # document even when extracting or cleaning a page raises.
        extracted_text = []
        with fitz.open(document_path) as pdf:
            for page in pdf:
                clean_page_text = clean_extracted_text(page.get_text("text"))
                if clean_page_text:  # Keep only non-empty cleaned text
                    extracted_text.append(clean_page_text)

        # Combine all pages into one text
        full_text = "\n".join(extracted_text)
        st.write(f"Total Cleaned Text Length: {len(full_text)} characters")

        # Step 2: Chunk the cleaned text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            separators=["\n\n", "\n", " ", ""]
        )
        split_docs = text_splitter.create_documents([full_text])
        st.write(f"Total Chunks After Splitting: {len(split_docs)}")

        # Show the first 3 chunks only, as a quick sanity check in the UI
        for i, chunk in enumerate(split_docs[:3]):
            st.write(f"Chunk {i + 1}: {chunk.page_content[:300]}...")
        return split_docs
    except Exception as e:
        st.error(f"Failed to load and process PDF: {e}")
        st.stop()
def initialize_vector_store(documents, persist_dir):
    """
    Build a Chroma vector store from the given documents and persist it.

    Args:
        documents: Chunk documents to index.
        persist_dir (str): Directory passed to Chroma as persist_directory.

    Returns:
        The Chroma vector store. If anything raises, the error is shown with
        st.error and st.stop() is called.
    """
    try:
        # Embed with the default HuggingFace embedding model and index the chunks
        store = Chroma.from_documents(
            documents=documents,
            embedding=HuggingFaceEmbeddings(),
            persist_directory=persist_dir,
        )
        # Write the index to disk before handing it back
        store.persist()
        return store
    except Exception as e:
        st.error(f"Failed to initialize the vector store: {e}")
        st.stop()
def create_retriever(vectordb):
    """
    Build a retriever on top of the vector store, requesting k=3 per search.
    """
    search_options = {"k": 3}
    return vectordb.as_retriever(search_kwargs=search_options)
def create_retrieval_chain(vectordb, api_key):
    """
    Assemble a ConversationalRetrievalChain with conversation memory.

    Args:
        vectordb: Vector store the retriever is created from.
        api_key (str): OpenAI API key passed to the LLM.

    Returns:
        The chain built by ConversationalRetrievalChain.from_llm.
    """
    doc_retriever = create_retriever(vectordb)
    # The chat history is kept under "chat_history" as message objects
    chat_memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    language_model = OpenAI(temperature=0, openai_api_key=api_key)
    return ConversationalRetrievalChain.from_llm(
        llm=language_model,
        retriever=doc_retriever,
        memory=chat_memory,
    )
def setup_retrieval_pipeline(file_path, persist_dir, api_key):
    """
    Run the full pipeline: chunk the PDF, index it, and build the chat chain.

    Args:
        file_path (str): Path to the PDF to process.
        persist_dir (str): Directory for the vector store.
        api_key (str): OpenAI API key for the chain's LLM.

    Returns:
        The retrieval chain, or None when no document chunks were produced.
    """
    st.write(f"Processing file: {file_path}")

    chunks = load_docs(file_path)
    if chunks:
        # Index the chunks, then wrap the index in a conversational chain
        store = initialize_vector_store(chunks, persist_dir)
        return create_retrieval_chain(store, api_key)

    st.error("Failed to process documents. Please check the input file.")
    return None
def extract_patent_number(url):
    """
    Return the patent number embedded in a Google Patents URL, or None.

    The number is the two uppercase letters plus the digits directly after
    "/patent/"; any trailing kind code such as "B1" is not included.
    """
    # NOTE(review): an identical extract_patent_number is defined earlier in
    # this module; this later definition is the one in effect at call time.
    patent_segment = re.compile(r"/patent/([A-Z]{2}\d+)")
    hit = patent_segment.search(url)
    return hit[1] if hit else None
def preview_pdf(pdf_path, scale_factor=0.5):
    """
    Generate a resized preview image of the first page of the PDF.

    Args:
        pdf_path (str): Path to the PDF file.
        scale_factor (float): Factor to reduce the image size (default is 0.5).

    Returns:
        str | None: Path to the saved PNG preview, or None when the preview
        could not be generated (the error is reported with st.error).
    """
    try:
        temp_image_path = os.path.join(tempfile.gettempdir(), "pdf_preview.png")
        # The context manager closes the document even if rendering or saving
        # fails (for example when the PDF has no pages).
        with fitz.open(pdf_path) as doc:
            first_page = doc[0]
            # Apply scaling using a transformation matrix
            matrix = fitz.Matrix(scale_factor, scale_factor)
            pix = first_page.get_pixmap(matrix=matrix)
            # Save the preview image
            pix.save(temp_image_path)
        return temp_image_path
    except Exception as e:
        st.error(f"Error generating PDF preview: {e}")
        return None
if __name__ == "__main__":
    # Page chrome
    st.set_page_config(
        page_title="Patent Chat: Google Patents Chat Demo",
        page_icon="",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.header(" Patent Chat: Google Patents Chat Demo")

    # Input for Google Patent Link
    patent_link = st.text_area(
        "Enter Google Patent Link:",
        value="https://patents.google.com/patent/US8676427B1/en",
        height=90
    )

    # Initialize session state so later reads never hit a missing key
    for key in ["LOADED_PATENT", "pdf_preview", "loaded_pdf_path", "chain", "messages", "loading_complete"]:
        if key not in st.session_state:
            st.session_state[key] = None

    # Button to load and process patent
    if st.button("Load and Process Patent"):
        if not patent_link:
            st.warning("Please enter a valid Google patent link.")
            st.stop()

        # Extract patent number
        patent_number = extract_patent_number(patent_link)
        if not patent_number:
            st.error("Invalid patent link format.")
            st.stop()
        st.write(f"Patent number: **{patent_number}**")

        # File handling: reuse a previously downloaded PDF when one exists
        pdf_path = os.path.join(tempfile.gettempdir(), f"{patent_number}.pdf")
        if not os.path.isfile(pdf_path):
            with st.spinner(" Downloading patent file..."):
                try:
                    pdf_path = download_pdf(patent_number)
                    st.write(f"\u2705 File downloaded: {pdf_path}")
                except Exception as e:
                    st.error(f"Failed to download patent: {e}")
                    st.stop()
        else:
            st.write("\u2705 File already downloaded.")

        # Generate PDF preview only if not already displayed
        if not st.session_state.get("pdf_preview_displayed", False):
            with st.spinner("Generating PDF preview..."):
                preview_image_path = preview_pdf(pdf_path, scale_factor=0.5)
                if preview_image_path:
                    st.session_state.pdf_preview = preview_image_path
                    st.image(preview_image_path, caption="First Page Preview", use_container_width=False)
                    st.session_state["pdf_preview_displayed"] = True
                else:
                    st.warning("Failed to generate PDF preview.")
                    st.session_state.pdf_preview = None

        # Load the document into the system
        st.session_state["loading_complete"] = False
        chain = None
        with st.spinner("Loading document into the system..."):
            try:
                chain = setup_retrieval_pipeline(
                    pdf_path, PERSISTED_DIRECTORY, OPENAI_API_KEY
                )
            except Exception as e:
                st.error(f"Failed to load the document: {e}")
                st.session_state["loading_complete"] = False
                st.stop()

        # setup_retrieval_pipeline returns None (after reporting the error)
        # when no document chunks were produced. Do not report success or seed
        # the chat in that case.
        if chain is None:
            st.stop()

        # Commit the new patent to the session only once the chain exists
        st.session_state.chain = chain
        st.session_state.LOADED_PATENT = patent_number
        st.session_state.loaded_pdf_path = pdf_path
        st.session_state.messages = [{"role": "assistant", "content": "Hello! How can I assist you with this patent?"}]
        st.session_state["loading_complete"] = True

    # NOTE(review): placement outside the button handler is assumed; confirm
    # against the intended layout.
    if st.session_state["loading_complete"]:
        st.success("Document successfully loaded! You can now start asking questions.")

    # Display previous chat messages
    if st.session_state.messages:
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

    # User input for questions
    if st.session_state.chain:
        if user_input := st.chat_input("What is your question?"):
            # User message
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)

            # Assistant response
            with st.chat_message("assistant"):
                message_placeholder = st.empty()
                full_response = ""
                with st.spinner("Generating response..."):
                    try:
                        # Generate response using the chain
                        assistant_response = st.session_state.chain({"question": user_input})
                        full_response = assistant_response.get("answer", "I'm sorry, I couldn't process that question.")
                    except Exception as e:
                        # Show the failure in the chat rather than crashing the page
                        full_response = f"An error occurred: {e}"
                message_placeholder.markdown(full_response)
            st.session_state.messages.append({"role": "assistant", "content": full_response})
    else:
        st.info("Press the 'Load and Process Patent' button to start processing.")