# services/pdf_service.py
from pathlib import Path
from typing import List, Dict, Any, Optional
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import faiss
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime
from config.config import settings

logger = logging.getLogger(__name__)

class PDFService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        self.index = None       # FAISS index, built lazily by index_pdfs()
        self.chunks = []        # chunk dicts aligned with FAISS row ids
        self.last_update = None
        self.pdf_metadata = {}

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Process a single PDF file (synchronous; intended to run on a worker thread)"""
        try:
            reader = PdfReader(str(pdf_path))
            chunks = []

            # Extract document-level metadata (reader.metadata may be None)
            info = reader.metadata or {}
            metadata = {
                'title': info.get('/Title', ''),
                'author': info.get('/Author', ''),
                'creation_date': info.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            # Split each page's text into overlapping chunks
            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    continue
                page_chunks = self.text_splitter.split_text(text)
                for i, chunk in enumerate(page_chunks):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            logger.debug("Extracted %d chunks from %s", len(chunks), pdf_path.name)
            return chunks
        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Path = settings.PDF_FOLDER) -> None:
        """Index all PDFs in the specified folder"""
        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            # Process PDFs concurrently on a thread pool (one task per file)
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                chunk_lists = await asyncio.gather(*[
                    loop.run_in_executor(executor, self.process_pdf, pdf_file)
                    for pdf_file in pdf_files
                ])

            # Combine all chunks
            self.chunks = []
            for chunk_list in chunk_lists:
                self.chunks.extend(chunk_list)

            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            # Embed every chunk and build a FAISS index over the vectors
            texts = [chunk['text'] for chunk in self.chunks]
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )

            # FAISS expects contiguous float32 arrays
            embeddings = np.ascontiguousarray(embeddings, dtype='float32')
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)

            self.last_update = datetime.now()
            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")
        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Search indexed PDFs with debug logs"""
        logger.debug("Search query: %s", query)
        if self.index is None or not self.chunks:
            await self.index_pdfs()
        try:
            # Embed the query with the same model used for indexing
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = np.ascontiguousarray(
                query_embedding.cpu().detach().numpy(), dtype='float32'
            )
            logger.debug("Query embedding shape: %s", query_embedding_np.shape)

            # Search in FAISS index
            distances, indices = self.index.search(query_embedding_np, top_k)
            logger.debug("Distances: %s, indices: %s", distances, indices)

            # Process results
            results = []
            for i, idx in enumerate(indices[0]):
                # FAISS pads with -1 (or out-of-range ids) when it has
                # fewer than top_k results; skip those slots
                if idx < 0 or idx >= len(self.chunks):
                    continue
                # Convert L2 distance to a rough similarity score
                # (a ranking heuristic; not bounded to [0, 1])
                score = 1 - distances[0][i]
                logger.debug("Chunk %s: distance=%s, score=%s", idx, distances[0][i], score)
                if score < min_score:
                    logger.debug("Skipping chunk %s: score below min_score", idx)
                    continue  # Skip low scores
                chunk = self.chunks[idx].copy()
                chunk['score'] = score
                results.append(chunk)

            # Sort by score and take top_k
            results.sort(key=lambda x: x['score'], reverse=True)
            logger.debug("Returning %d results", len(results))
            return results[:top_k]
        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
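

# --- Usage sketch (illustrative; not part of the original service) ---
# A minimal driver showing how the service is meant to be called. The
# _StubModelService below is a hypothetical stand-in for the app's real
# model_service: the only contract PDFService relies on is an `embedder`
# attribute exposing a SentenceTransformer-style encode() method, and the
# query string here is a placeholder.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo():
        service = PDFService(_StubModelService())
        await service.index_pdfs()  # reads PDFs from settings.PDF_FOLDER
        hits = await service.search("example query", top_k=3)
        for hit in hits:
            print(hit["source"], hit["page"], round(hit["score"], 3))

    asyncio.run(_demo())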