import os
import re
import fitz  # PyMuPDF
from tika import parser
from typing import List, Optional
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tempfile
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_text(text: str) -> str:
    """Clean extracted text by collapsing extra whitespace and removing invalid characters."""
    text = re.sub(r'[ \t]+', ' ', text)  # Collapse runs of spaces/tabs without destroying newlines
    text = ''.join(char for char in text if char.isprintable() or char == '\n')  # Remove non-printable characters
    text = re.sub(r'\n\s*\n+', '\n\n', text)  # Collapse multiple blank lines into a single paragraph break
    return text.strip()

def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """
    Extract text from PDF using PyMuPDF (faster than pdfplumber).
    """
    try:
        with fitz.open(file_path) as doc:  # Context manager ensures the document is closed
            text = "\n".join(page.get_text("text") for page in doc)
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using PyMuPDF: {e}")
        return None

def extract_text_from_docx(file_path: str) -> Optional[str]:
    """
    Extract text from DOCX with enhanced error handling.
    """
    try:
        doc = Document(file_path)
        text = '\n'.join(para.text for para in doc.paragraphs if para.text.strip())
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Failed to process DOCX {file_path}: {e}")
        return None

def extract_text_from_other_files(file_path: str) -> Optional[str]:
    """
    Extract text using Apache Tika for other file formats.
    """
    try:
        parsed = parser.from_file(file_path)
        text = (parsed.get("content") or "").strip()  # Tika may return None for "content"
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using Tika: {e}")
        return None

def extract_text_from_file(uploaded_file) -> Optional[str]:
    """
    Extract text from various file types.
    """
    if isinstance(uploaded_file, str):  # Handle direct file paths
        file_path = uploaded_file
    else:  # Handle file-like objects (e.g., uploaded files)
        # Preserve the original extension (when the object exposes a name) so the
        # format detection below still works on the temporary copy.
        suffix = os.path.splitext(getattr(uploaded_file, "name", ""))[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_file.write(uploaded_file.read())  # Save file contents temporarily
            file_path = temp_file.name  # Temporary file path

    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        return None

    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()

    try:
        if file_extension == ".pdf":
            text = extract_text_from_pdf(file_path)  # Use PyMuPDF
        elif file_extension == ".docx":
            text = extract_text_from_docx(file_path)
        elif file_extension == ".txt":
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    text = clean_text(file.read())
            except UnicodeDecodeError:
                with open(file_path, "r", encoding="latin-1") as file:
                    text = clean_text(file.read())
        else:
            text = extract_text_from_other_files(file_path)  # Use Apache Tika

        if not text:
            logger.warning(f"No text content extracted from {file_path}")
            return None

        return text

    except Exception as e:
        logger.error(f"Error extracting text from {file_path}: {e}")
        return None

def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """
    Split text into chunks with improved handling and validation.
    """
    if not text:
        logger.warning("Empty text provided for splitting")
        return []

    try:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False
        )
        
        splits = text_splitter.split_text(text)
        
        logger.info(f"Split text into {len(splits)} chunks")
        
        return splits

    except Exception as e:
        logger.error(f"Error splitting text: {e}")
        return []
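

# Minimal usage sketch (not part of the module's API): runs the extraction and
# chunking pipeline end to end on a local file. "sample.pdf" is a hypothetical
# path used only for illustration; replace it with a real document.
if __name__ == "__main__":
    sample_path = "sample.pdf"  # Hypothetical example file
    extracted = extract_text_from_file(sample_path)
    if extracted:
        chunks = split_text(extracted, chunk_size=1000, chunk_overlap=200)
        logger.info(f"Extracted {len(extracted)} characters into {len(chunks)} chunks")
    else:
        logger.warning(f"No text could be extracted from {sample_path}")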