File size: 4,375 Bytes
8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc 8a3f76a 8953dfc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
import re
import pdfplumber
import fitz # PyMuPDF
from tika import parser
from typing import List, Optional
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tempfile
import logging
import warnings
# Configure logging
# NOTE: basicConfig runs at import time and sets up the root logger at INFO
# level; per the stdlib docs it does nothing if the root logger already has
# handlers configured.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def clean_text(text: str) -> str:
    """Clean extracted text while keeping its line and paragraph structure.

    Runs of horizontal whitespace (spaces, tabs, non-breaking spaces, ...)
    are collapsed to a single space, non-printable characters are removed,
    spaces around line breaks are trimmed, and runs of blank lines are
    reduced to a single blank line.

    Args:
        text: Raw text as produced by one of the extractors.

    Returns:
        The cleaned text with leading and trailing whitespace stripped.
    """
    # Normalize line endings first so "\r" is not treated as ordinary whitespace.
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    # Collapse whitespace *except* newlines; collapsing "\s+" here would erase
    # every line break and make the newline handling below unreachable.
    text = re.sub(r'[^\S\n]+', ' ', text)
    # Drop non-printable characters (newlines are kept deliberately).
    text = ''.join(char for char in text if char.isprintable() or char == '\n')
    # Removing characters can leave adjacent spaces behind; collapse them again.
    text = re.sub(r' {2,}', ' ', text)
    # Trim spaces hugging a line break so whitespace-only lines become empty.
    text = re.sub(r' *\n *', '\n', text)
    # Reduce three or more consecutive newlines to one paragraph break.
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """
    Extract text from PDF using PyMuPDF (faster than pdfplumber).

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The cleaned text of all pages joined by newlines, or None when the
        document yields no text or cannot be processed (the error is logged).
    """
    try:
        # Open the document as a context manager so its file handle is
        # released even if reading a page raises.
        with fitz.open(file_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using PyMuPDF: {e}")
        return None
def extract_text_from_docx(file_path: str) -> Optional[str]:
    """
    Read a DOCX file and return the cleaned text of its paragraphs.

    Paragraphs whose text is empty or whitespace-only are skipped; the rest
    are joined with newlines and passed through clean_text. Returns None when
    nothing is left or when processing fails (the failure is logged).
    """
    try:
        document = Document(file_path)
        kept_paragraphs = []
        for paragraph in document.paragraphs:
            paragraph_text = paragraph.text
            if paragraph_text.strip():
                kept_paragraphs.append(paragraph_text)
        joined = '\n'.join(kept_paragraphs)
        if not joined:
            return None
        return clean_text(joined)
    except Exception as e:
        logger.error(f"Failed to process DOCX {file_path}: {e}")
        return None
def extract_text_from_other_files(file_path: str) -> Optional[str]:
    """
    Extract text using Apache Tika for other file formats.

    Args:
        file_path: Path of the file to hand to Tika.

    Returns:
        The cleaned text, or None when Tika returns no content or the call
        fails (the failure is logged).
    """
    try:
        parsed = parser.from_file(file_path)
        # The "content" key can be present with a None value (no extractable
        # text); dict.get's default only covers a *missing* key, so coalesce
        # explicitly instead of calling .strip() on None.
        text = (parsed.get("content") or "").strip()
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using Tika: {e}")
        return None
def _extract_text_by_extension(file_path: str) -> Optional[str]:
    """Pick an extractor from the file extension and return the cleaned text."""
    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        return None
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()
    try:
        if file_extension == ".pdf":
            text = extract_text_from_pdf(file_path)  # Use PyMuPDF
        elif file_extension == ".docx":
            text = extract_text_from_docx(file_path)
        elif file_extension == ".txt":
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    text = clean_text(file.read())
            except UnicodeDecodeError:
                # latin-1 can decode any byte sequence, so this fallback
                # always succeeds for files that are not valid UTF-8.
                with open(file_path, "r", encoding="latin-1") as file:
                    text = clean_text(file.read())
        else:
            text = extract_text_from_other_files(file_path)  # Use Apache Tika
        if not text:
            logger.warning(f"No text content extracted from {file_path}")
            return None
        return text
    except Exception as e:
        logger.error(f"Error extracting text from {file_path}: {e}")
        return None


def extract_text_from_file(uploaded_file) -> Optional[str]:
    """
    Extract text from various file types.

    Args:
        uploaded_file: Either a filesystem path (str or os.PathLike) or a
            binary file-like object exposing read(). For file-like objects
            the contents are copied to a temporary file first; if the object
            has a string ``name`` attribute its extension is kept so that
            the right extractor is selected.

    Returns:
        The cleaned text, or None when the file does not exist, no text
        could be extracted, or extraction failed (details are logged).
    """
    temp_path = None
    try:
        if isinstance(uploaded_file, (str, os.PathLike)):  # Handle direct file paths
            file_path = os.fspath(uploaded_file)
        else:  # Handle file-like objects (e.g., uploaded files)
            # Carry the original extension over to the temp file; without it
            # every upload would look extension-less and fall through to Tika.
            original_name = getattr(uploaded_file, "name", None)
            suffix = os.path.splitext(original_name)[1] if isinstance(original_name, str) else ""
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                # Record the path before writing so a failed write is still cleaned up.
                temp_path = temp_file.name
                temp_file.write(uploaded_file.read())  # Save file contents temporarily
            file_path = temp_path
        return _extract_text_by_extension(file_path)
    finally:
        # The temp file is created with delete=False (it must be reopened by
        # path), so it has to be removed explicitly once extraction is done.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file {temp_path}: {e}")
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """
    Break text into chunks using RecursiveCharacterTextSplitter.

    chunk_size and chunk_overlap are forwarded to the splitter, which measures
    length with len and treats separators as literal strings. Returns the list
    of chunks, or an empty list when text is empty or splitting fails (both
    cases are logged).
    """
    if text:
        try:
            splitter_options = {
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
                "length_function": len,
                "is_separator_regex": False,
            }
            chunks = RecursiveCharacterTextSplitter(**splitter_options).split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")
        except Exception as e:
            logger.error(f"Error splitting text: {e}")
            chunks = []
        return chunks

    logger.warning("Empty text provided for splitting")
    return []
|