|
import os |
|
import re |
|
import pdfplumber |
|
import fitz |
|
from tika import parser |
|
from typing import List, Optional |
|
from docx import Document |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
import tempfile |
|
import logging |
|
import warnings |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Configure root logging at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)
|
|
|
def clean_text(text: str) -> str:
    """Normalize extracted text while keeping its line and paragraph breaks.

    Non-printable characters are dropped, runs of horizontal whitespace
    (spaces, tabs, form feeds, ...) collapse to one space, spaces touching a
    line break are trimmed, and runs of blank lines shrink to exactly one
    blank line.

    Args:
        text: Raw text produced by an extractor. An empty string or ``None``
            yields ``""``.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    if not text:
        return ""

    # Treat Windows and bare carriage-return line endings as plain newlines.
    text = text.replace('\r\n', '\n').replace('\r', '\n')

    # Drop non-printable characters first, but keep every whitespace character
    # so that removing a control character cannot glue two words together or
    # leave a double space behind.
    text = ''.join(ch for ch in text if ch.isprintable() or ch.isspace())

    # Collapse horizontal whitespace only. Collapsing r'\s+' here would also
    # swallow newlines and make the paragraph handling below unreachable.
    text = re.sub(r'[^\S\n]+', ' ', text)

    # Trim the single space that may remain on either side of a line break.
    text = re.sub(r' ?\n ?', '\n', text)

    # Any run of blank lines becomes exactly one blank line.
    text = re.sub(r'\n{3,}', '\n\n', text)

    return text.strip()
|
|
|
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """Extract the text of every page of a PDF using PyMuPDF.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The cleaned text of all pages joined by newlines, or ``None`` when no
        text could be extracted or the document could not be processed (the
        error is logged).
    """
    try:
        # The context manager closes the document even when a page fails to
        # parse, so the underlying file handle is never leaked.
        with fitz.open(file_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)

        if not text:
            return None

        # Pages holding only whitespace clean down to "", which is reported
        # as "no text" (None) rather than as an empty string.
        return clean_text(text) or None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using PyMuPDF: {e}")
        return None
|
|
|
def extract_text_from_docx(file_path: str) -> Optional[str]:
    """Return the cleaned text of the non-blank paragraphs of a DOCX file.

    Paragraphs whose text is empty or whitespace-only are skipped; the rest
    are joined with newlines and passed through ``clean_text``. Returns
    ``None`` when no paragraph text is found or when processing raises (the
    error is logged).
    """
    try:
        document = Document(file_path)
        kept_paragraphs = [
            paragraph.text
            for paragraph in document.paragraphs
            if paragraph.text.strip()
        ]
        joined = '\n'.join(kept_paragraphs)
        if not joined:
            return None
        return clean_text(joined)
    except Exception as exc:
        logger.error(f"Failed to process DOCX {file_path}: {exc}")
        return None
|
|
|
def extract_text_from_other_files(file_path: str) -> Optional[str]:
    """Extract text from an arbitrary file format using Apache Tika.

    Args:
        file_path: Path of the file to hand to Tika.

    Returns:
        The cleaned text, or ``None`` when Tika returns no content or the
        call fails (the error is logged).
    """
    try:
        parsed = parser.from_file(file_path)

        # The "content" key can be present with a value of None when nothing
        # was extracted, so a .get() default alone is not enough; `or ""`
        # keeps that case from raising AttributeError on .strip().
        text = (parsed.get("content") or "").strip()

        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using Tika: {e}")
        return None
|
|
|
def extract_text_from_file(uploaded_file) -> Optional[str]:
    """Extract cleaned text from a file path or an uploaded file-like object.

    The extractor is chosen from the file extension: ``.pdf``, ``.docx`` and
    ``.txt`` have dedicated handling, everything else is passed to Tika.

    Args:
        uploaded_file: Either a filesystem path (``str`` or ``os.PathLike``)
            or an object with a ``read()`` method whose result can be written
            to a binary temporary file. When such an object has a string
            ``name`` attribute, that name's extension is used to pick the
            extractor.

    Returns:
        The extracted text, or ``None`` when the file does not exist, no text
        could be extracted, or extraction failed (details are logged).
    """
    temp_path = None
    try:
        if isinstance(uploaded_file, (str, os.PathLike)):
            file_path = os.fspath(uploaded_file)
        else:
            # Carry the upload's extension over to the temporary file;
            # without it the extension dispatch could never select the
            # PDF/DOCX/TXT extractors for uploaded content.
            upload_name = getattr(uploaded_file, "name", None)
            suffix = (
                os.path.splitext(upload_name)[1]
                if isinstance(upload_name, str)
                else ""
            )
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                temp_path = temp_file.name
                temp_file.write(uploaded_file.read())
            file_path = temp_path

        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            return None

        try:
            text = _extract_by_extension(file_path)
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            return None

        if not text:
            logger.warning(f"No text content extracted from {file_path}")
            return None

        return text
    finally:
        # The temporary copy is created with delete=False, so it has to be
        # removed explicitly once extraction is finished.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file {temp_path}: {e}")


def _extract_by_extension(file_path: str) -> Optional[str]:
    """Run the extractor that matches the extension of *file_path*."""
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()

    if file_extension == ".pdf":
        return extract_text_from_pdf(file_path)

    if file_extension == ".docx":
        return extract_text_from_docx(file_path)

    if file_extension == ".txt":
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return clean_text(file.read())
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this read cannot
            # fail on decoding.
            with open(file_path, "r", encoding="latin-1") as file:
                return clean_text(file.read())

    return extract_text_from_other_files(file_path)
|
|
|
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Break *text* into chunks with ``RecursiveCharacterTextSplitter``.

    Args:
        text: The text to split.
        chunk_size: Passed to the splitter as ``chunk_size``; lengths are
            measured with ``len``.
        chunk_overlap: Passed to the splitter as ``chunk_overlap``.

    Returns:
        The list of chunks. An empty list is returned when *text* is empty
        (a warning is logged) or when the splitter raises (the error is
        logged).
    """
    if not text:
        logger.warning("Empty text provided for splitting")
        return []

    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        "is_separator_regex": False,
    }

    try:
        chunks = RecursiveCharacterTextSplitter(**splitter_options).split_text(text)
        logger.info(f"Split text into {len(chunks)} chunks")
        return chunks
    except Exception as exc:
        logger.error(f"Error splitting text: {exc}")
        return []
|
|
|
|