import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import faiss
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from PyPDF2 import PdfReader

from config.config import settings

logger = logging.getLogger(__name__)

class PDFService:
    """Chunks PDF text, embeds it, and serves semantic search over a FAISS index."""

    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        )
        self.index = None       # FAISS index, built by index_pdfs()
        self.chunks = []        # chunk dicts, aligned row-for-row with the index
        self.last_update = None
        self.pdf_metadata = {}  # document info per file, keyed by filename

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Extract, chunk, and annotate the text of a single PDF (synchronous)."""
        try:
            reader = PdfReader(str(pdf_path))
            chunks = []

            # reader.metadata is None for PDFs without a document info dictionary
            doc_info = reader.metadata or {}
            metadata = {
                'title': doc_info.get('/Title', ''),
                'author': doc_info.get('/Author', ''),
                'creation_date': doc_info.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    continue

                page_chunks = self.text_splitter.split_text(text)
                for i, chunk in enumerate(page_chunks):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            logger.debug("Extracted %d chunks from %s", len(chunks), pdf_path.name)
            return chunks

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Path = settings.PDF_FOLDER) -> None:
        """Index all PDFs in the specified folder."""
        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            # Parse PDFs off the event loop, one executor task per file, so the
            # files are processed concurrently rather than one after another.
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                chunk_lists = await asyncio.gather(*(
                    loop.run_in_executor(executor, self.process_pdf, pdf_file)
                    for pdf_file in pdf_files
                ))

            self.chunks = []
            for chunk_list in chunk_lists:
                self.chunks.extend(chunk_list)

            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            # Embedding is compute-bound too, so it also runs in an executor.
            texts = [chunk['text'] for chunk in self.chunks]
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )

            # FAISS expects contiguous float32 input.
            embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)

            self.last_update = datetime.now()
            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")

        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Search indexed PDFs, building the index on first use."""
        logger.debug("Search query: %s", query)
        if self.index is None or not self.chunks:
            await self.index_pdfs()
        if self.index is None:
            logger.warning("No index available; returning no results")
            return []

        try:
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = np.ascontiguousarray(
                query_embedding.cpu().detach().numpy(), dtype=np.float32
            )
            logger.debug("Query embedding shape: %s", query_embedding_np.shape)

            distances, indices = self.index.search(query_embedding_np, top_k)
            logger.debug("Distances: %s, indices: %s", distances, indices)

            results = []
            for i, idx in enumerate(indices[0]):
                # FAISS pads results with -1 when fewer than top_k vectors exist.
                if idx < 0 or idx >= len(self.chunks):
                    continue

                # Heuristic similarity: 1 minus the squared L2 distance returned
                # by IndexFlatL2. This only approximates a [0, 1] score when the
                # embeddings are normalized and distances stay small.
                score = 1 - distances[0][i]
                if score < min_score:
                    logger.debug("Skipping chunk %s (score %.3f < %.3f)", idx, score, min_score)
                    continue

                chunk = self.chunks[idx].copy()
                chunk['score'] = score
                results.append(chunk)

            results.sort(key=lambda x: x['score'], reverse=True)
            return results[:top_k]

        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
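
# A minimal usage sketch. It assumes a model-service object that exposes a
# sentence-transformers model as `.embedder` (which is all PDFService.__init__
# requires); the `_StubModelService` class and the "all-MiniLM-L6-v2" model name
# below are illustrative stand-ins, not part of the real application wiring.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo() -> None:
        service = PDFService(_StubModelService())
        await service.index_pdfs()  # reads PDFs from settings.PDF_FOLDER
        hits = await service.search("What does the introduction cover?", top_k=3)
        for hit in hits:
            print(hit["source"], hit["page"], f"{hit['score']:.3f}")

    asyncio.run(_demo())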