Update pdf_utils.py
Browse files- pdf_utils.py +28 -41
pdf_utils.py
CHANGED
@@ -1,10 +1,12 @@
|
|
|
|
1 |
import re
|
2 |
import pdfplumber
|
|
|
|
|
3 |
from typing import List, Optional
|
4 |
-
import textract
|
5 |
from docx import Document
|
6 |
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
7 |
-
import
|
8 |
import logging
|
9 |
import warnings
|
10 |
|
@@ -19,53 +21,47 @@ def clean_text(text: str) -> str:
|
|
19 |
text = re.sub(r'\n\s*\n', '\n\n', text) # Remove multiple newlines
|
20 |
return text.strip()
|
21 |
|
22 |
-
def extract_text_from_pdf(
|
23 |
"""
|
24 |
-
Extract text from PDF using pdfplumber.
|
25 |
"""
|
26 |
-
extracted_text = []
|
27 |
try:
|
28 |
-
|
29 |
-
|
30 |
-
|
31 |
-
page_text = page.extract_text()
|
32 |
-
if page_text:
|
33 |
-
extracted_text.append(page_text)
|
34 |
-
else:
|
35 |
-
logger.warning(f"No text extracted from page {page_num}")
|
36 |
-
except Exception as e:
|
37 |
-
logger.error(f"Error extracting text from page {page_num}: {e}")
|
38 |
-
continue
|
39 |
-
|
40 |
-
if not extracted_text:
|
41 |
-
logger.warning("No text was extracted from any page of the PDF")
|
42 |
-
return None
|
43 |
-
|
44 |
-
return clean_text('\n'.join(extracted_text))
|
45 |
except Exception as e:
|
46 |
-
logger.error(f"
|
47 |
return None
|
48 |
|
49 |
-
def extract_text_from_docx(
|
50 |
"""
|
51 |
Extract text from DOCX with enhanced error handling.
|
52 |
"""
|
53 |
try:
|
54 |
-
doc = Document(
|
55 |
text = '\n'.join(para.text for para in doc.paragraphs if para.text.strip())
|
56 |
return clean_text(text) if text else None
|
57 |
except Exception as e:
|
58 |
-
logger.error(f"Failed to process DOCX {
|
59 |
return None
|
60 |
|
61 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
62 |
|
63 |
def extract_text_from_file(uploaded_file) -> Optional[str]:
|
64 |
"""
|
65 |
-
Extract text from various file types
|
66 |
-
If file is uploaded as file-like object, save it temporarily.
|
67 |
"""
|
68 |
-
if isinstance(uploaded_file, str): #
|
69 |
file_path = uploaded_file
|
70 |
else: # Handle file-like objects (e.g., uploaded files)
|
71 |
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
|
@@ -81,7 +77,7 @@ def extract_text_from_file(uploaded_file) -> Optional[str]:
|
|
81 |
|
82 |
try:
|
83 |
if file_extension == ".pdf":
|
84 |
-
text = extract_text_from_pdf(file_path)
|
85 |
elif file_extension == ".docx":
|
86 |
text = extract_text_from_docx(file_path)
|
87 |
elif file_extension == ".txt":
|
@@ -92,7 +88,7 @@ def extract_text_from_file(uploaded_file) -> Optional[str]:
|
|
92 |
with open(file_path, "r", encoding="latin-1") as file:
|
93 |
text = clean_text(file.read())
|
94 |
else:
|
95 |
-
text =
|
96 |
|
97 |
if not text:
|
98 |
logger.warning(f"No text content extracted from {file_path}")
|
@@ -104,7 +100,6 @@ def extract_text_from_file(uploaded_file) -> Optional[str]:
|
|
104 |
logger.error(f"Error extracting text from {file_path}: {e}")
|
105 |
return None
|
106 |
|
107 |
-
|
108 |
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
|
109 |
"""
|
110 |
Split text into chunks with improved handling and validation.
|
@@ -131,11 +126,3 @@ def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> L
|
|
131 |
logger.error(f"Error splitting text: {e}")
|
132 |
return []
|
133 |
|
134 |
-
# Example usage
|
135 |
-
if __name__ == "__main__":
|
136 |
-
sample_file = "/Users/jessicawin/Downloads/github-recovery-codes.txt"
|
137 |
-
if os.path.exists(sample_file):
|
138 |
-
file_text = extract_text_from_file(sample_file)
|
139 |
-
if file_text:
|
140 |
-
chunks = split_text(file_text)
|
141 |
-
print(f"Successfully processed file into {len(chunks)} chunks")
|
|
|
1 |
+
import os
|
2 |
import re
|
3 |
import pdfplumber
|
4 |
+
import fitz # PyMuPDF
|
5 |
+
from tika import parser
|
6 |
from typing import List, Optional
|
|
|
7 |
from docx import Document
|
8 |
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
9 |
+
import tempfile
|
10 |
import logging
|
11 |
import warnings
|
12 |
|
|
|
21 |
text = re.sub(r'\n\s*\n', '\n\n', text) # Remove multiple newlines
|
22 |
return text.strip()
|
23 |
|
24 |
+
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """
    Extract text from PDF using PyMuPDF (faster than pdfplumber).

    Args:
        file_path: Path to the PDF file on disk.

    Returns:
        Cleaned text of all pages joined by newlines, or None when the
        document yields no text or cannot be opened.
    """
    try:
        # Use the document as a context manager so the underlying file
        # handle is closed even if page extraction raises; the original
        # never called doc.close() and leaked the handle on every call.
        with fitz.open(file_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using PyMuPDF: {e}")
        return None
|
35 |
|
36 |
+
def extract_text_from_docx(file_path: str) -> Optional[str]:
    """
    Extract text from DOCX with enhanced error handling.

    Args:
        file_path: Path to the .docx file.

    Returns:
        Cleaned paragraph text joined by newlines, or None when the file
        has no non-blank paragraphs or cannot be parsed.
    """
    try:
        document = Document(file_path)
        # Keep only paragraphs that contain something other than whitespace.
        non_blank = [paragraph.text for paragraph in document.paragraphs
                     if paragraph.text.strip()]
        text = '\n'.join(non_blank)
        if not text:
            return None
        return clean_text(text)
    except Exception as e:
        logger.error(f"Failed to process DOCX {file_path}: {e}")
        return None
|
47 |
|
48 |
+
def extract_text_from_other_files(file_path: str) -> Optional[str]:
    """
    Extract text using Apache Tika for other file formats.

    Args:
        file_path: Path to the file to parse.

    Returns:
        Cleaned document text, or None when Tika extracts nothing or the
        parse fails.
    """
    try:
        parsed = parser.from_file(file_path)
        # Tika reports empty/unparseable documents as {"content": None};
        # `.get("content", "")` only covers a *missing* key, so the original
        # could call .strip() on None and mis-log an AttributeError as a
        # Tika failure. `or ""` normalizes None to an empty string.
        text = (parsed.get("content") or "").strip()
        return clean_text(text) if text else None
    except Exception as e:
        logger.error(f"Error extracting text from {file_path} using Tika: {e}")
        return None
|
59 |
|
60 |
def extract_text_from_file(uploaded_file) -> Optional[str]:
|
61 |
"""
|
62 |
+
Extract text from various file types.
|
|
|
63 |
"""
|
64 |
+
if isinstance(uploaded_file, str): # Handle direct file paths
|
65 |
file_path = uploaded_file
|
66 |
else: # Handle file-like objects (e.g., uploaded files)
|
67 |
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
|
|
|
77 |
|
78 |
try:
|
79 |
if file_extension == ".pdf":
|
80 |
+
text = extract_text_from_pdf(file_path) # Use PyMuPDF
|
81 |
elif file_extension == ".docx":
|
82 |
text = extract_text_from_docx(file_path)
|
83 |
elif file_extension == ".txt":
|
|
|
88 |
with open(file_path, "r", encoding="latin-1") as file:
|
89 |
text = clean_text(file.read())
|
90 |
else:
|
91 |
+
text = extract_text_from_other_files(file_path) # Use Apache Tika
|
92 |
|
93 |
if not text:
|
94 |
logger.warning(f"No text content extracted from {file_path}")
|
|
|
100 |
logger.error(f"Error extracting text from {file_path}: {e}")
|
101 |
return None
|
102 |
|
|
|
103 |
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
|
104 |
"""
|
105 |
Split text into chunks with improved handling and validation.
|
|
|
126 |
logger.error(f"Error splitting text: {e}")
|
127 |
return []
|
128 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|