Commit 84c83c0 (parent: a748df6)

General improvement in quick image matching and merging

Files changed:
- app.py (+2, -2)
- tools/aws_textract.py (+40, -47)
- tools/custom_image_analyser_engine.py (+148, -132)
- tools/file_conversion.py (+1, -1)
- tools/file_redaction.py (+202, -77)
app.py
CHANGED

@@ -89,7 +89,7 @@ with app:
     with gr.Tab("PDFs/images"):
         with gr.Accordion("Redact document", open = True):
             in_doc_files = gr.File(label="Choose document/image files (PDF, JPG, PNG)", file_count= "multiple", file_types=['.pdf', '.jpg', '.png', '.json'])
-            in_redaction_method = gr.Radio(label="Choose document redaction method.
+            in_redaction_method = gr.Radio(label="Choose document redaction method. AWS Textract has a cost per page so please only use when needed.", value = "Simple text analysis - PDFs with selectable text", choices=["Simple text analysis - PDFs with selectable text", "Quick image analysis - typed text", "Complex image analysis - docs with handwriting/signatures (AWS Textract)"])
             gr.Markdown("""If you only want to redact certain pages, or certain entities (e.g. just email addresses), please go to the redaction settings tab.""")
             document_redact_btn = gr.Button("Redact document(s)", variant="primary")

@@ -150,7 +150,7 @@ with app:
             page_min = gr.Number(precision=0,minimum=0,maximum=9999, label="Lowest page to redact")
             page_max = gr.Number(precision=0,minimum=0,maximum=9999, label="Highest page to redact")
             with gr.Row():
-                handwrite_signature_checkbox = gr.CheckboxGroup(choices=["Redact all identified handwriting", "Redact all identified signatures"], value=["Redact all identified handwriting", "Redact all identified signatures"])
+                handwrite_signature_checkbox = gr.CheckboxGroup(label="AWS Textract settings", choices=["Redact all identified handwriting", "Redact all identified signatures"], value=["Redact all identified handwriting", "Redact all identified signatures"])
         with gr.Accordion("Settings for open text or xlsx/csv files", open = True):
             anon_strat = gr.Radio(choices=["replace with <REDACTED>", "replace with <ENTITY_NAME>", "redact", "hash", "mask", "encrypt", "fake_first_name"], label="Select an anonymisation method.", value = "replace with <REDACTED>")
tools/aws_textract.py
CHANGED

@@ -91,8 +91,9 @@ def json_to_ocrresult(json_data, page_width, page_height):
     handwriting_recogniser_results = []
     signatures = []
     handwriting = []
+    ocr_results_with_children = {}

+    i = 1

     for text_block in json_data:

@@ -100,17 +101,23 @@ def json_to_ocrresult(json_data, page_width, page_height):
         is_handwriting = False

         if (text_block['BlockType'] == 'LINE') | (text_block['BlockType'] == 'SIGNATURE'): # (text_block['BlockType'] == 'WORD') |

+            # Extract text and bounding box for the line
+            line_bbox = text_block["Geometry"]["BoundingBox"]
+            line_left = int(line_bbox["Left"] * page_width)
+            line_top = int(line_bbox["Top"] * page_height)
+            line_right = int((line_bbox["Left"] + line_bbox["Width"]) * page_width)
+            line_bottom = int((line_bbox["Top"] + line_bbox["Height"]) * page_height)
+
+            width_abs = int(line_bbox["Width"] * page_width)
+            height_abs = int(line_bbox["Height"] * page_height)
+
             if text_block['BlockType'] == 'LINE':
+
                 # Extract text and bounding box for the line
                 line_text = text_block.get('Text', '')
-                line_bbox = text_block["Geometry"]["BoundingBox"]
-                line_left = int(line_bbox["Left"] * page_width)
-                line_top = int(line_bbox["Top"] * page_height)
-                line_right = int((line_bbox["Left"] + line_bbox["Width"]) * page_width)
-                line_bottom = int((line_bbox["Top"] + line_bbox["Height"]) * page_height)

                 words = []
                 if 'Relationships' in text_block:

@@ -128,12 +135,12 @@ def json_to_ocrresult(json_data, page_width, page_height):
                         word_bottom = int((word_bbox["Top"] + word_bbox["Height"]) * page_height)

                         # Extract BoundingBox details
+                        word_width = word_bbox["Width"]
+                        word_height = word_bbox["Height"]

                         # Convert proportional coordinates to absolute coordinates
+                        word_width_abs = int(word_width * page_width)
+                        word_height_abs = int(word_height * page_height)

                         words.append({
                             'text': word_text,

@@ -146,18 +153,14 @@ def json_to_ocrresult(json_data, page_width, page_height):
                             is_handwriting = True
                             entity_name = "HANDWRITING"
                             word_end = len(entity_name)
-                            recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= word_text, score= confidence, start=0, end=word_end, left=word_left, top=word_top, width=width_abs, height=height_abs)
+                            recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= word_text, score= confidence, start=0, end=word_end, left=word_left, top=word_top, width=word_width_abs, height=word_height_abs)
                             handwriting.append(recogniser_result)
                             print("Handwriting found:", handwriting[-1])
-                'bounding_box': (line_left, line_top, line_right, line_bottom),
-                'words': words
-                }
+
+                # If handwriting or signature, add to bounding box

             elif (text_block['BlockType'] == 'SIGNATURE'):
                 line_text = "SIGNATURE"

@@ -167,38 +170,26 @@ def json_to_ocrresult(json_data, page_width, page_height):
                 confidence = text_block['Confidence']
                 word_end = len(entity_name)

-                bbox = text_block["Geometry"]["BoundingBox"]
-                left = bbox["Left"]
-                top = bbox["Top"]
-                width = bbox["Width"]
-                height = bbox["Height"]
-
-                # Convert proportional coordinates to absolute coordinates
-                left_abs = int(left * page_width)
-                top_abs = int(top * page_height)
-                width_abs = int(width * page_width)
-                height_abs = int(height * page_height)
-
-                recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= line_text, score= confidence, start=0, end=word_end, left=left_abs, top=top_abs, width=width_abs, height=height_abs)
+                recogniser_result = CustomImageRecognizerResult(entity_type=entity_name, text= line_text, score= confidence, start=0, end=word_end, left=line_left, top=line_top, width=width_abs, height=height_abs)

                 signatures.append(recogniser_result)
                 print("Signature found:", signatures[-1])

+                words = []
+                words.append({
+                    'text': line_text,
+                    'bounding_box': (line_left, line_top, line_right, line_bottom)
+                })
+
+            ocr_results_with_children["text_line_" + str(i)] = {
+                "line": i,
+                'text': line_text,
+                'bounding_box': (line_left, line_top, line_right, line_bottom),
+                'words': words
+            }

             # Create OCRResult with absolute coordinates
-            ocr_result = OCRResult(line_text,
+            ocr_result = OCRResult(line_text, line_left, line_top, width_abs, height_abs)
             all_ocr_results.append(ocr_result)

             is_signature_or_handwriting = is_signature | is_handwriting

@@ -209,5 +200,7 @@ def json_to_ocrresult(json_data, page_width, page_height):

             if is_signature: signature_recogniser_results.append(recogniser_result)
             if is_handwriting: handwriting_recogniser_results.append(recogniser_result)
+
+            i += 1

-    return all_ocr_results, signature_or_handwriting_recogniser_results, signature_recogniser_results, handwriting_recogniser_results,
+    return all_ocr_results, signature_or_handwriting_recogniser_results, signature_recogniser_results, handwriting_recogniser_results, ocr_results_with_children
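For context, the new ocr_results_with_children dictionary returned by json_to_ocrresult (and, for the Tesseract path, by combine_ocr_results) keys each detected line as "text_line_<n>" and stores the line text, its absolute-pixel bounding box, and its word-level children. A minimal sketch of the shape; the field names follow the diff, but the values below are invented rather than taken from a real Textract response:

    # Illustrative only: structure as built in json_to_ocrresult, values made up.
    ocr_results_with_children = {
        "text_line_1": {
            "line": 1,
            "text": "Dear Mr Smith",
            "bounding_box": (55, 120, 310, 138),  # (left, top, right, bottom) in pixels
            "words": [
                {"text": "Dear", "bounding_box": (55, 120, 105, 138)},
                {"text": "Mr", "bounding_box": (112, 120, 145, 138)},
                {"text": "Smith", "bounding_box": (152, 120, 310, 138)},
            ],
        }
    }

    # The word-level boxes are what later lets analyze_text() box only the matched
    # words within a line instead of redacting the whole line.
    print(ocr_results_with_children["text_line_1"]["words"][0]["bounding_box"])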
tools/custom_image_analyser_engine.py
CHANGED

@@ -9,6 +9,7 @@ import PIL
 from PIL import ImageDraw, ImageFont, Image
 from typing import Optional, Tuple, Union
 from copy import deepcopy
+import string # Import string to get a list of common punctuation characters

 @dataclass
 class OCRResult:

@@ -399,6 +400,11 @@ class ContrastSegmentedImageEnhancer(ImagePreprocessor):
        adjusted_contrast = contrast
        return adjusted_image, contrast, adjusted_contrast

+def bounding_boxes_overlap(box1, box2):
+    """Check if two bounding boxes overlap."""
+    return (box1[0] < box2[2] and box2[0] < box1[2] and
+            box1[1] < box2[3] and box2[1] < box1[3])
+
 class CustomImageAnalyzerEngine:
     def __init__(
         self,

@@ -412,13 +418,6 @@ class CustomImageAnalyzerEngine:
        self.tesseract_config = tesseract_config or '--oem 3 --psm 11'

        if not image_preprocessor:
-            # image_preprocessor = ImagePreprocessor(
-            #     c_low_contrast=10,
-            #     c_high_contrast=20,
-            #     contrast_threshold=0.5,
-            #     bg_threshold=128,
-            #     block_size=11
-            # )
            image_preprocessor = ContrastSegmentedImageEnhancer()
        #print(image_preprocessor)
        self.image_preprocessor = image_preprocessor

@@ -432,9 +431,6 @@ class CustomImageAnalyzerEngine:
        image_processed, preprocessing_metadata = self.image_preprocessor.preprocess_image(image)

-        #print("pre-processing metadata:", preprocessing_metadata)
-        #image_processed.save("image_processed.png")
-
        ocr_data = pytesseract.image_to_data(image_processed, output_type=pytesseract.Output.DICT, config=self.tesseract_config)

        if preprocessing_metadata and ("scale_factor" in preprocessing_metadata):

@@ -460,64 +456,95 @@ class CustomImageAnalyzerEngine:
    def analyze_text(
        self,
+        line_level_ocr_results: List[OCRResult],
        ocr_results_with_children: Dict[str, Dict],
        **text_analyzer_kwargs
    ) -> List[CustomImageRecognizerResult]:
        # Define English as default language, if not specified
        if "language" not in text_analyzer_kwargs:
            text_analyzer_kwargs["language"] = "en"
+
+        horizontal_buffer = 0 # add pixels to right of width
+        height_buffer = 2 # add pixels to bounding box height

        allow_list = text_analyzer_kwargs.get('allow_list', [])

        combined_results = []
-        for
+        for i, line_level_ocr_result in enumerate(line_level_ocr_results):
            # Analyze each OCR result (line) individually
            analyzer_result = self.analyzer_engine.analyze(
-                text=
+                text=line_level_ocr_result.text, **text_analyzer_kwargs
            )
+
+            if i < len(ocr_results_with_children): # Check if i is a valid index
+                child_level_key = list(ocr_results_with_children.keys())[i]
+            else:
+                continue
+
+            ocr_results_with_children_line_level = ocr_results_with_children[child_level_key]
+
+            # Go through results to add bounding boxes
            for result in analyzer_result:
                # Extract the relevant portion of text based on start and end
-                relevant_text =
+                relevant_text = line_level_ocr_result.text[result.start:result.end]

                # Find the corresponding entry in ocr_results_with_children
+                child_words = ocr_results_with_children_line_level['words']
+
+                # Initialize bounding box values
+                left, top, bottom = float('inf'), float('inf'), float('-inf')
+                all_words = ""
+                word_num = 0 # Initialize word count
+                total_width = 0 # Initialize total width
+
+                for word_text in relevant_text.split(): # Iterate through each word in relevant_text
+                    print("Looking for word_text:", word_text)
+                    for word in child_words:
+                        #if word['text'].strip(string.punctuation).strip() == word_text.strip(string.punctuation).strip(): # Check for exact match
+                        if word_text in word['text']:
+                            found_word = word
+                            print("found_word:", found_word)
+
+                            if word_num == 0: # First word
+                                left = found_word['bounding_box'][0]
+                                top = found_word['bounding_box'][1]
+                            bottom = max(bottom, found_word['bounding_box'][3]) # Update bottom for all words
+                            all_words += found_word['text'] + " " # Concatenate words
+                            total_width = found_word['bounding_box'][2] - left # Add each word's width
+                            word_num += 1
+                            break # Move to the next word in relevant_text
+
+                width = total_width + horizontal_buffer # Set width to total width of all matched words
+                height = bottom - top if word_num > 0 else 0 # Calculate height
+
+                relevant_line_ocr_result = OCRResult(
+                    text=relevant_text,
+                    left=left,
+                    top=top - height_buffer,
+                    width=width,
+                    height=height + height_buffer
+                )
+
+                if not ocr_results_with_children_line_level:
                    # Fallback to previous method if not found in ocr_results_with_children
-                        text=relevant_text,
-                        left=ocr_result.left + self.estimate_x_offset(relevant_text, result.start),
-                        top=ocr_result.top,
-                        width=self.estimate_width(ocr_result=ocr_result, start=result.start, end=result.end),
-                        height=ocr_result.height
-                    )
+                    print("No child info found")
+                    continue

+                # Reset the word positions indicated in the relevant ocr_result - i.e. it starts from 0 and ends at word length
+                result_reset_pos = result
+                result_reset_pos.start = 0
+                result_reset_pos.end = len(relevant_text)

+                print("result_reset_pos:", result_reset_pos)
+                print("relevant_line_ocr_result:", relevant_line_ocr_result)
+                #print("ocr_results_with_children_line_level:", ocr_results_with_children_line_level)

                # Map the analyzer results to bounding boxes for this line
                line_results = self.map_analyzer_results_to_bounding_boxes(
-                    [
+                    [result_reset_pos], [relevant_line_ocr_result], relevant_line_ocr_result.text, allow_list, ocr_results_with_children_line_level
                )
+
+                print("line_results:", line_results)

                combined_results.extend(line_results)

@@ -526,98 +553,64 @@ class CustomImageAnalyzerEngine:
    @staticmethod
    def map_analyzer_results_to_bounding_boxes(
        text_analyzer_results: List[RecognizerResult],
+        redaction_relevant_ocr_results: List[OCRResult],
        full_text: str,
        allow_list: List[str],
+        ocr_results_with_children_child_info: Dict[str, Dict]
    ) -> List[CustomImageRecognizerResult]:
+        redaction_bboxes = []
        text_position = 0

-        for
-            word_end = text_position + len(
+        for redaction_relevant_ocr_result in redaction_relevant_ocr_results:
+            word_end = text_position + len(redaction_relevant_ocr_result.text)

-            #print("Checking relevant OCR result:",
+            #print("Checking relevant OCR result:", redaction_relevant_ocr_result)

-            for
-                max_of_current_text_pos_or_result_start_pos = max(text_position,
-                min_of_result_end_pos_or_results_end = min(word_end,
+            for redaction_result in text_analyzer_results:
+                max_of_current_text_pos_or_result_start_pos = max(text_position, redaction_result.start)
+                min_of_result_end_pos_or_results_end = min(word_end, redaction_result.end)

+                redaction_result_bounding_box = (redaction_relevant_ocr_result.left, redaction_relevant_ocr_result.top,
+                                                 redaction_relevant_ocr_result.left + redaction_relevant_ocr_result.width,
+                                                 redaction_relevant_ocr_result.top + redaction_relevant_ocr_result.height)

-                if (max_of_current_text_pos_or_result_start_pos < min_of_result_end_pos_or_results_end) and (
-                    print("result",
+                if (max_of_current_text_pos_or_result_start_pos < min_of_result_end_pos_or_results_end) and (redaction_relevant_ocr_result.text not in allow_list):
+                    #print("result", redaction_result, "made it through if statement")
+                    # Find the corresponding entry in ocr_results_with_children that overlap with the redaction result
+                    child_info = ocr_results_with_children_child_info#.get(full_text)

+                    #print("child_info in sub function:", child_info)
+                    #print("redaction_result_bounding_box:", redaction_result_bounding_box)
+                    print("Overlaps?", bounding_boxes_overlap(redaction_result_bounding_box, child_info['bounding_box']))
+
+                    if bounding_boxes_overlap(redaction_result_bounding_box, child_info['bounding_box']):
                        # Use the bounding box from ocr_results_with_children
-                        bbox = child_info['bounding_box']
+                        bbox = redaction_result_bounding_box #child_info['bounding_box']
                        left, top, right, bottom = bbox
                        width = right - left
                        height = bottom - top
                    else:
-                        top = ocr_result.top
-                        width = ocr_result.width
-                        height = ocr_result.height
+                        print("Could not find OCR result")
+                        continue

+                    redaction_bboxes.append(
                        CustomImageRecognizerResult(
-                            entity_type=
-                            start=
-                            end=
-                            score=
+                            entity_type=redaction_result.entity_type,
+                            start=redaction_result.start,
+                            end=redaction_result.end,
+                            score=redaction_result.score,
                            left=left,
                            top=top,
                            width=width,
                            height=height,
-                            text=
+                            text=redaction_relevant_ocr_result.text
                        )
                    )

            text_position = word_end + 1 # +1 for the space between words

-        return
-
-    # @staticmethod
-    # def map_analyzer_results_to_bounding_boxes(
-    #     text_analyzer_results: List[RecognizerResult],
-    #     ocr_results: List[OCRResult],
-    #     full_text: str,
-    #     allow_list: List[str],
-    # ) -> List[CustomImageRecognizerResult]:
-    #     pii_bboxes = []
-    #     text_position = 0
-
-    #     for ocr_result in ocr_results:
-    #         word_end = text_position + len(ocr_result.text)
-
-    #         print("Checking relevant OCR result:", ocr_result)
-
-    #         for result in text_analyzer_results:
-    #             if (max(text_position, result.start) < min(word_end, result.end)) and (ocr_result.text not in allow_list):
-    #                 print("result", result, "made it through if statement")
-
-    #                 pii_bboxes.append(
-    #                     CustomImageRecognizerResult(
-    #                         entity_type=result.entity_type,
-    #                         start=result.start,
-    #                         end=result.end,
-    #                         score=result.score,
-    #                         left=ocr_result.left,
-    #                         top=ocr_result.top,
-    #                         width=ocr_result.width,
-    #                         height=ocr_result.height,
-    #                         text=ocr_result.text
-    #                     )
-    #                 )
-
-    #         text_position = word_end + 1 # +1 for the space between words
-
-    #     return pii_bboxes
+        return redaction_bboxes

    @staticmethod
    def remove_space_boxes(ocr_result: dict) -> dict:

@@ -789,6 +782,21 @@ def combine_ocr_results(ocr_results, x_threshold=50, y_threshold=12):
    current_bbox = None
    line_counter = 1

+    def create_ocr_result_with_children(combined_results, i, current_bbox, current_line):
+        combined_results["text_line_" + str(i)] = {
+            "line": i,
+            'text': current_bbox.text,
+            'bounding_box': (current_bbox.left, current_bbox.top,
+                             current_bbox.left + current_bbox.width,
+                             current_bbox.top + current_bbox.height),
+            'words': [{'text': word.text,
+                       'bounding_box': (word.left, word.top,
+                                        word.left + word.width,
+                                        word.top + word.height)}
+                      for word in current_line]
+        }
+        return combined_results["text_line_" + str(i)]
+
    for result in sorted_results:
        if not current_line:
            # Start a new line

@@ -797,6 +805,7 @@ def combine_ocr_results(ocr_results, x_threshold=50, y_threshold=12):
        else:
            # Check if the result is on the same line (y-axis) and close horizontally (x-axis)
            last_result = current_line[-1]
+
            if abs(result.top - last_result.top) <= y_threshold and \
               (result.left - (last_result.left + last_result.width)) <= x_threshold:
                # Update the bounding box to include the new word

@@ -810,18 +819,22 @@ def combine_ocr_results(ocr_results, x_threshold=50, y_threshold=12):
                )
                current_line.append(result)
            else:
+
                # Commit the current line and start a new one
                combined_results.append(current_bbox)
-                new_format_results[current_bbox.text] = { # f"combined_text_{line_counter}"
-                }
+                # new_format_results[current_bbox.text] = { # f"combined_text_{line_counter}"
+                #     'bounding_box': (current_bbox.left, current_bbox.top,
+                #                      current_bbox.left + current_bbox.width,
+                #                      current_bbox.top + current_bbox.height),
+                #     'words': [{'text': word.text,
+                #                'bounding_box': (word.left, word.top,
+                #                                 word.left + word.width,
+                #                                 word.top + word.height)}
+                #               for word in current_line]
+                # }
+                new_format_results["text_line_" + str(line_counter)] = create_ocr_result_with_children(new_format_results, line_counter, current_bbox, current_line)
+
                line_counter += 1
                current_line = [result]
                current_bbox = result

@@ -829,16 +842,19 @@ def combine_ocr_results(ocr_results, x_threshold=50, y_threshold=12):
    # Append the last line
    if current_bbox:
        combined_results.append(current_bbox)
-        new_format_results[current_bbox.text] = { # f"combined_text_{line_counter}"
-        }
+        # new_format_results[current_bbox.text] = { # f"combined_text_{line_counter}"
+        #     'bounding_box': (current_bbox.left, current_bbox.top,
+        #                      current_bbox.left + current_bbox.width,
+        #                      current_bbox.top + current_bbox.height),
+        #     'words': [{'text': word.text,
+        #                'bounding_box': (word.left, word.top,
+        #                                 word.left + word.width,
+        #                                 word.top + word.height)}
+        #               for word in current_line]
+        # }
+
+        new_format_results["text_line_" + str(line_counter)] = create_ocr_result_with_children(new_format_results, line_counter, current_bbox, current_line)

    return combined_results, new_format_results
tools/file_conversion.py
CHANGED

@@ -219,7 +219,7 @@ def prepare_image_or_pdf(
        print(out_message)
        return out_message, out_file_paths

-    if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis -
+    if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
        # Analyse and redact image-based pdf or image
        if is_pdf_or_image(file_path) == False:
            out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
tools/file_redaction.py
CHANGED
@@ -9,7 +9,7 @@ import pandas as pd
|
|
9 |
|
10 |
#from presidio_image_redactor.entities import ImageRecognizerResult
|
11 |
from pdfminer.high_level import extract_pages
|
12 |
-
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal
|
13 |
from pikepdf import Pdf, Dictionary, Name
|
14 |
import gradio as gr
|
15 |
from gradio import Progress
|
@@ -88,8 +88,11 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
88 |
print("In allow list:", in_allow_list_flat)
|
89 |
else:
|
90 |
in_allow_list_flat = []
|
|
|
|
|
91 |
|
92 |
-
for file in
|
|
|
93 |
file_path = file.name
|
94 |
|
95 |
if file_path:
|
@@ -104,14 +107,14 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
104 |
print(out_message)
|
105 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
106 |
|
107 |
-
if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis -
|
108 |
#Analyse and redact image-based pdf or image
|
109 |
if is_pdf_or_image(file_path) == False:
|
110 |
out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
|
111 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
112 |
|
113 |
print("Redacting file " + file_path_without_ext + " as an image-based file")
|
114 |
-
pdf_images,
|
115 |
|
116 |
# Save file
|
117 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
|
@@ -124,10 +127,14 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
124 |
out_message.append("File '" + file_path_without_ext + "' successfully redacted")
|
125 |
|
126 |
# Save decision making process
|
127 |
-
output_logs_str = str(output_logs)
|
128 |
-
logs_output_file_name = out_image_file_path + "_decision_process_output.txt"
|
129 |
-
with open(logs_output_file_name, "w") as f:
|
130 |
-
|
|
|
|
|
|
|
|
|
131 |
log_files_output_paths.append(logs_output_file_name)
|
132 |
|
133 |
# Save Textract request metadata (if exists)
|
@@ -147,7 +154,7 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
147 |
|
148 |
# Analyse text-based pdf
|
149 |
print('Redacting file as text-based PDF')
|
150 |
-
pdf_text,
|
151 |
out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
|
152 |
pdf_text.save(out_text_file_path)
|
153 |
|
@@ -159,12 +166,19 @@ def choose_and_run_redactor(file_paths:List[str], image_paths:List[str], languag
|
|
159 |
img_output_summary, img_output_file_path = convert_text_pdf_to_img_pdf(file_path, [out_text_file_path])
|
160 |
out_file_paths.extend(img_output_file_path)
|
161 |
|
162 |
-
|
163 |
-
logs_output_file_name = img_output_file_path[0] + "_decision_process_output.txt"
|
164 |
-
with open(logs_output_file_name, "w") as f:
|
165 |
-
|
|
|
|
|
|
|
166 |
log_files_output_paths.append(logs_output_file_name)
|
167 |
|
|
|
|
|
|
|
|
|
168 |
out_message_new = "File '" + file_path_without_ext + "' successfully redacted"
|
169 |
out_message.append(out_message_new)
|
170 |
|
@@ -205,7 +219,7 @@ def bounding_boxes_overlap(box1, box2):
|
|
205 |
return (box1[0] < box2[2] and box2[0] < box1[2] and
|
206 |
box1[1] < box2[3] and box2[1] < box1[3])
|
207 |
|
208 |
-
def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_results=[], handwriting_recogniser_results=[], handwrite_signature_checkbox: List[str]=["Redact all identified handwriting", "Redact all identified signatures"], horizontal_threshold=
|
209 |
merged_bboxes = []
|
210 |
grouped_bboxes = defaultdict(list)
|
211 |
|
@@ -348,9 +362,11 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
348 |
|
349 |
all_ocr_results = []
|
350 |
all_decision_process = []
|
|
|
|
|
351 |
|
352 |
-
if analysis_type == "Quick image analysis - typed text": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".
|
353 |
-
elif analysis_type == "Complex image analysis -
|
354 |
|
355 |
for n in range(0, number_of_pages):
|
356 |
handwriting_or_signature_boxes = []
|
@@ -395,21 +411,21 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
395 |
# Step 1: Perform OCR. Either with Tesseract, or with AWS Textract
|
396 |
if analysis_type == "Quick image analysis - typed text":
|
397 |
|
398 |
-
|
399 |
|
400 |
# Combine OCR results
|
401 |
-
|
402 |
|
403 |
#print("ocr_results after:", ocr_results)
|
404 |
|
405 |
-
# Save
|
406 |
-
ocr_results_with_children_str = str(
|
407 |
logs_output_file_name = output_folder + "ocr_with_children.txt"
|
408 |
with open(logs_output_file_name, "w") as f:
|
409 |
f.write(ocr_results_with_children_str)
|
410 |
|
411 |
# Import results from json and convert
|
412 |
-
if analysis_type == "Complex image analysis -
|
413 |
|
414 |
# Convert the image to bytes using an in-memory buffer
|
415 |
image_buffer = io.BytesIO()
|
@@ -429,18 +445,18 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
429 |
text_blocks = json.load(json_file)
|
430 |
text_blocks = text_blocks['Blocks']
|
431 |
|
432 |
-
|
433 |
|
434 |
-
# Save
|
435 |
-
ocr_results_with_children_str = str(
|
436 |
-
logs_output_file_name = output_folder + "ocr_with_children_textract.txt"
|
437 |
-
with open(logs_output_file_name, "w") as f:
|
438 |
-
|
439 |
|
440 |
# Step 2: Analyze text and identify PII
|
441 |
-
|
442 |
-
|
443 |
-
|
444 |
language=language,
|
445 |
entities=chosen_redact_entities,
|
446 |
allow_list=allow_list,
|
@@ -448,49 +464,80 @@ def redact_image_pdf(file_path:str, image_paths:List[str], language:str, chosen_
|
|
448 |
)
|
449 |
|
450 |
if analysis_type == "Quick image analysis - typed text": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
|
451 |
-
elif analysis_type == "Complex image analysis -
|
452 |
|
453 |
# Save decision making process
|
454 |
-
bboxes_str = str(
|
455 |
with open(interim_results_file_path, "w") as f:
|
456 |
f.write(bboxes_str)
|
457 |
|
458 |
# Merge close bounding boxes
|
459 |
-
|
460 |
|
461 |
-
|
462 |
-
if merged_bboxes:
|
463 |
-
for bbox in merged_bboxes:
|
464 |
-
print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
|
465 |
-
|
466 |
-
|
467 |
-
decision_process_output_str = "Page " + reported_page_number + ":\n" + str(merged_bboxes)
|
468 |
-
all_decision_process.append(decision_process_output_str)
|
469 |
|
470 |
# 3. Draw the merged boxes
|
471 |
draw = ImageDraw.Draw(image)
|
472 |
|
473 |
-
for box in
|
474 |
x0 = box.left
|
475 |
y0 = box.top
|
476 |
x1 = x0 + box.width
|
477 |
y1 = y0 + box.height
|
478 |
draw.rectangle([x0, y0, x1, y1], fill=fill)
|
479 |
|
480 |
-
|
481 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
482 |
|
483 |
images.append(image)
|
484 |
|
485 |
# Write OCR results as a log file
|
486 |
-
|
487 |
-
with open(ocr_results_file_path, "w") as f:
|
488 |
-
|
489 |
-
logging_file_paths.append(ocr_results_file_path)
|
490 |
|
491 |
-
|
|
|
492 |
|
493 |
-
return images,
|
494 |
|
495 |
def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
|
496 |
if isinstance(text_container, LTTextContainer):
|
@@ -512,7 +559,56 @@ def analyze_text_container(text_container, language, chosen_redact_entities, sco
|
|
512 |
return analyzer_results, characters
|
513 |
return [], []
|
514 |
|
515 |
-
def
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
516 |
'''
|
517 |
Merge identified bounding boxes containing PII that are very close to one another
|
518 |
'''
|
@@ -520,15 +616,19 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
|
|
520 |
if len(analyzer_results) > 0 and len(characters) > 0:
|
521 |
# Extract bounding box coordinates for sorting
|
522 |
bounding_boxes = []
|
|
|
523 |
for result in analyzer_results:
|
524 |
char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
|
|
|
525 |
if char_boxes:
|
526 |
# Calculate the bounding box that encompasses all characters
|
527 |
left = min(box[0] for box in char_boxes)
|
528 |
bottom = min(box[1] for box in char_boxes)
|
529 |
right = max(box[2] for box in char_boxes)
|
530 |
top = max(box[3] for box in char_boxes) + vertical_padding
|
531 |
-
bounding_boxes.append((bottom, left, result, [left, bottom, right, top])) # (y, x, result, bbox)
|
|
|
|
|
532 |
|
533 |
# Sort the results by y-coordinate and then by x-coordinate
|
534 |
bounding_boxes.sort()
|
@@ -537,22 +637,24 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
|
|
537 |
current_box = None
|
538 |
current_y = None
|
539 |
current_result = None
|
|
|
540 |
|
541 |
-
for y, x, result, char_box in bounding_boxes:
|
542 |
-
print(f"Considering result: {result}")
|
543 |
-
print(f"Character box: {char_box}")
|
544 |
|
545 |
if current_y is None or current_box is None:
|
546 |
current_box = char_box
|
547 |
current_y = char_box[1]
|
548 |
current_result = result
|
549 |
-
|
|
|
550 |
else:
|
551 |
vertical_diff_bboxes = abs(char_box[1] - current_y)
|
552 |
horizontal_diff_bboxes = abs(char_box[0] - current_box[2])
|
553 |
|
554 |
-
print(f"Comparing boxes: current_box={current_box}, char_box={char_box}")
|
555 |
-
print(f"Vertical diff: {vertical_diff_bboxes}, Horizontal diff: {horizontal_diff_bboxes}")
|
556 |
|
557 |
if (
|
558 |
vertical_diff_bboxes <= 5
|
@@ -561,26 +663,30 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
|
|
561 |
current_box[2] = char_box[2] # Extend the current box horizontally
|
562 |
current_box[3] = max(current_box[3], char_box[3]) # Ensure the top is the highest
|
563 |
current_result.end = max(current_result.end, result.end) # Extend the text range
|
564 |
-
|
|
|
|
|
|
|
565 |
else:
|
566 |
merged_bounding_boxes.append(
|
567 |
-
{"boundingBox": current_box, "result": current_result})
|
568 |
-
print(f"Appending merged box: {current_box}")
|
569 |
|
570 |
# Reset current_box and current_y after appending
|
571 |
current_box = char_box
|
572 |
current_y = char_box[1]
|
573 |
current_result = result
|
574 |
-
|
|
|
575 |
|
576 |
# After finishing with the current result, add the last box for this result
|
577 |
if current_box:
|
578 |
-
merged_bounding_boxes.append({"boundingBox": current_box, "result": current_result})
|
579 |
-
print(f"Appending final box for result: {current_box}")
|
580 |
|
581 |
if not merged_bounding_boxes:
|
582 |
analyzed_bounding_boxes.extend(
|
583 |
-
{"boundingBox": char.bbox, "result": result}
|
584 |
for result in analyzer_results
|
585 |
for char in characters[result.start:result.end]
|
586 |
if isinstance(char, LTChar)
|
@@ -588,7 +694,7 @@ def merge_bounding_boxes(analyzer_results, characters, combine_pixel_dist, verti
|
|
588 |
else:
|
589 |
analyzed_bounding_boxes.extend(merged_bounding_boxes)
|
590 |
|
591 |
-
print("Analyzed bounding boxes:\n\n", analyzed_bounding_boxes)
|
592 |
|
593 |
return analyzed_bounding_boxes
|
594 |
|
@@ -635,7 +741,7 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
635 |
Redact chosen entities from a pdf that is made up of multiple pages that are not images.
|
636 |
'''
|
637 |
annotations_all_pages = []
|
638 |
-
decision_process_table_all_pages =
|
639 |
|
640 |
combine_pixel_dist = 100 # Horizontal distance between PII bounding boxes under/equal they are combined into one
|
641 |
|
@@ -669,7 +775,7 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
669 |
|
670 |
|
671 |
annotations_on_page = []
|
672 |
-
decision_process_table_on_page =
|
673 |
|
674 |
for page_layout in extract_pages(filename, page_numbers = [page_no], maxpages=1):
|
675 |
|
@@ -678,25 +784,41 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
678 |
text_container_analyzer_results = []
|
679 |
text_container_analyzed_bounding_boxes = []
|
680 |
characters = []
|
|
|
681 |
|
682 |
if analysis_type == "Simple text analysis - PDFs with selectable text":
|
683 |
for i, text_container in enumerate(page_layout):
|
684 |
|
685 |
text_container_analyzer_results, characters = analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list)
|
686 |
|
687 |
-
#
|
688 |
-
|
689 |
|
690 |
-
|
|
|
691 |
|
692 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
693 |
|
694 |
-
|
695 |
|
|
|
|
|
696 |
|
697 |
-
|
698 |
page_analyzer_results.extend(text_container_analyzer_results)
|
|
|
699 |
|
|
|
|
|
|
|
700 |
|
701 |
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
|
702 |
|
@@ -705,12 +827,15 @@ def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str]
|
|
705 |
|
706 |
# Make page annotations
|
707 |
page.Annots = pdf.make_indirect(annotations_on_page)
|
708 |
-
|
709 |
annotations_all_pages.extend([annotations_on_page])
|
710 |
-
|
|
|
|
|
|
|
|
|
711 |
|
712 |
print("For page number:", page_no, "there are", len(annotations_all_pages[page_num]), "annotations")
|
713 |
|
714 |
#page_num += 1
|
715 |
|
716 |
-
return pdf, decision_process_table_all_pages
|
|
|
9 |
|
10 |
#from presidio_image_redactor.entities import ImageRecognizerResult
|
11 |
from pdfminer.high_level import extract_pages
|
12 |
+
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine, LTTextLineHorizontal, LTAnno
|
13 |
from pikepdf import Pdf, Dictionary, Name
|
14 |
import gradio as gr
|
15 |
from gradio import Progress
|
|
|
88 |
print("In allow list:", in_allow_list_flat)
|
89 |
else:
|
90 |
in_allow_list_flat = []
|
91 |
+
|
92 |
+
progress(0.5, desc="Redacting file")
|
93 |
|
94 |
+
for file in file_paths_loop:
|
95 |
+
#for file in progress.tqdm(file_paths_loop, desc="Redacting files", unit = "files"):
|
96 |
file_path = file.name
|
97 |
|
98 |
if file_path:
|
|
|
107 |
print(out_message)
|
108 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
109 |
|
110 |
+
if in_redact_method == "Quick image analysis - typed text" or in_redact_method == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
|
111 |
#Analyse and redact image-based pdf or image
|
112 |
if is_pdf_or_image(file_path) == False:
|
113 |
out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
|
114 |
return out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, estimated_time_taken_state, all_request_metadata_str
|
115 |
|
116 |
print("Redacting file " + file_path_without_ext + " as an image-based file")
|
117 |
+
pdf_images, redaction_logs, logging_file_paths, new_request_metadata = redact_image_pdf(file_path, image_paths, language, chosen_redact_entities, in_allow_list_flat, is_a_pdf, page_min, page_max, in_redact_method, handwrite_signature_checkbox)
|
118 |
|
119 |
# Save file
|
120 |
out_image_file_path = output_folder + file_path_without_ext + "_redacted_as_img.pdf"
|
|
|
127 |
out_message.append("File '" + file_path_without_ext + "' successfully redacted")
|
128 |
|
129 |
# Save decision making process
|
130 |
+
# output_logs_str = str(output_logs)
|
131 |
+
# logs_output_file_name = out_image_file_path + "_decision_process_output.txt"
|
132 |
+
# with open(logs_output_file_name, "w") as f:
|
133 |
+
# f.write(output_logs_str)
|
134 |
+
# log_files_output_paths.append(logs_output_file_name)
|
135 |
+
|
136 |
+
logs_output_file_name = out_image_file_path + "_decision_process_output.csv"
|
137 |
+
redaction_logs.to_csv(logs_output_file_name)
|
138 |
log_files_output_paths.append(logs_output_file_name)
|
139 |
|
140 |
# Save Textract request metadata (if exists)
|
|
|
154 |
|
155 |
# Analyse text-based pdf
|
156 |
print('Redacting file as text-based PDF')
|
157 |
+
pdf_text, decision_process_logs, page_text_outputs = redact_text_pdf(file_path, language, chosen_redact_entities, in_allow_list_flat, page_min, page_max, "Simple text analysis - PDFs with selectable text")
|
158 |
out_text_file_path = output_folder + file_path_without_ext + "_text_redacted.pdf"
|
159 |
pdf_text.save(out_text_file_path)
|
160 |
|
|
|
166 |
img_output_summary, img_output_file_path = convert_text_pdf_to_img_pdf(file_path, [out_text_file_path])
|
167 |
out_file_paths.extend(img_output_file_path)
|
168 |
|
169 |
+
#decision_process_logs_str = str(decision_process_logs)
|
170 |
+
#logs_output_file_name = img_output_file_path[0] + "_decision_process_output.txt"
|
171 |
+
#with open(logs_output_file_name, "w") as f:
|
172 |
+
# f.write(output_logs_str)
|
173 |
+
|
174 |
+
logs_output_file_name = img_output_file_path[0] + "_decision_process_output.csv"
|
175 |
+
decision_process_logs.to_csv(logs_output_file_name)
|
176 |
log_files_output_paths.append(logs_output_file_name)
|
177 |
|
178 |
+
all_text_output_file_name = img_output_file_path[0] + "_all_text_output.csv"
|
179 |
+
page_text_outputs.to_csv(all_text_output_file_name)
|
180 |
+
log_files_output_paths.append(all_text_output_file_name)
|
181 |
+
|
182 |
out_message_new = "File '" + file_path_without_ext + "' successfully redacted"
|
183 |
out_message.append(out_message_new)
|
184 |
|
|
|
219 |
return (box1[0] < box2[2] and box2[0] < box1[2] and
|
220 |
box1[1] < box2[3] and box2[1] < box1[3])
|
221 |
|
222 |
+
def merge_img_bboxes(bboxes, combined_results: Dict, signature_recogniser_results=[], handwriting_recogniser_results=[], handwrite_signature_checkbox: List[str]=["Redact all identified handwriting", "Redact all identified signatures"], horizontal_threshold:int=50, vertical_threshold:int=12):
|
223 |
merged_bboxes = []
|
224 |
grouped_bboxes = defaultdict(list)
|
225 |
|
|
|
362 |
|
363 |
all_ocr_results = []
|
364 |
all_decision_process = []
|
365 |
+
all_line_level_ocr_results_df = pd.DataFrame()
|
366 |
+
all_decision_process_table = pd.DataFrame()
|
367 |
|
368 |
+
if analysis_type == "Quick image analysis - typed text": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".csv"
|
369 |
+
elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": ocr_results_file_path = output_folder + "ocr_results_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.csv"
|
370 |
|
371 |
for n in range(0, number_of_pages):
|
372 |
handwriting_or_signature_boxes = []
|
|
|
411 |
# Step 1: Perform OCR. Either with Tesseract, or with AWS Textract
|
412 |
if analysis_type == "Quick image analysis - typed text":
|
413 |
|
414 |
+
word_level_ocr_results = image_analyser.perform_ocr(image)
|
415 |
|
416 |
# Combine OCR results
|
417 |
+
line_level_ocr_results, line_level_ocr_results_with_children = combine_ocr_results(word_level_ocr_results)
|
418 |
|
419 |
#print("ocr_results after:", ocr_results)
|
420 |
|
421 |
+
# Save ocr_with_children_outputs
|
422 |
+
ocr_results_with_children_str = str(line_level_ocr_results_with_children)
|
423 |
logs_output_file_name = output_folder + "ocr_with_children.txt"
|
424 |
with open(logs_output_file_name, "w") as f:
|
425 |
f.write(ocr_results_with_children_str)
|
426 |
|
427 |
# Import results from json and convert
|
428 |
+
if analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)":
|
429 |
|
430 |
# Convert the image to bytes using an in-memory buffer
|
431 |
image_buffer = io.BytesIO()
|
|
|
445 |
text_blocks = json.load(json_file)
|
446 |
text_blocks = text_blocks['Blocks']
|
447 |
|
448 |
+
line_level_ocr_results, handwriting_or_signature_boxes, signature_recogniser_results, handwriting_recogniser_results, line_level_ocr_results_with_children = json_to_ocrresult(text_blocks, page_width, page_height)
|
449 |
|
450 |
+
# Save ocr_with_children_output
|
451 |
+
# ocr_results_with_children_str = str(line_level_ocr_results_with_children)
|
452 |
+
# logs_output_file_name = output_folder + "ocr_with_children_textract.txt"
|
453 |
+
# with open(logs_output_file_name, "w") as f:
|
454 |
+
# f.write(ocr_results_with_children_str)
|
455 |
|
456 |
# Step 2: Analyze text and identify PII
|
457 |
+
redaction_bboxes = image_analyser.analyze_text(
|
458 |
+
line_level_ocr_results,
|
459 |
+
line_level_ocr_results_with_children,
|
460 |
language=language,
|
461 |
entities=chosen_redact_entities,
|
462 |
allow_list=allow_list,
|
|
|
464 |
)
|
465 |
|
466 |
if analysis_type == "Quick image analysis - typed text": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + ".txt"
|
467 |
+
elif analysis_type == "Complex image analysis - docs with handwriting/signatures (AWS Textract)": interim_results_file_path = output_folder + "interim_analyser_bboxes_" + file_name + "_pages_" + str(page_min + 1) + "_" + str(page_max) + "_textract.txt"
|
468 |
|
469 |
# Save decision making process
|
470 |
+
bboxes_str = str(redaction_bboxes)
|
471 |
with open(interim_results_file_path, "w") as f:
|
472 |
f.write(bboxes_str)
|
473 |
|
474 |
# Merge close bounding boxes
|
475 |
+
merged_redaction_bboxes = merge_img_bboxes(redaction_bboxes, line_level_ocr_results_with_children, signature_recogniser_results, handwriting_recogniser_results, handwrite_signature_checkbox)
|
476 |
|
477 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
478 |
|
479 |
# 3. Draw the merged boxes
|
480 |
draw = ImageDraw.Draw(image)
|
481 |
|
482 |
+
for box in merged_redaction_bboxes:
|
483 |
x0 = box.left
|
484 |
y0 = box.top
|
485 |
x1 = x0 + box.width
|
486 |
y1 = y0 + box.height
|
487 |
draw.rectangle([x0, y0, x1, y1], fill=fill)
|
488 |
|
489 |
+
# Log OCR results
|
490 |
+
|
491 |
+
#line_level_ocr_results_str = "Page:" + reported_page_number + "\n" + str(line_level_ocr_results)
|
492 |
+
#all_ocr_results.append(line_level_ocr_results_str)
|
493 |
+
|
494 |
+
# Convert to DataFrame and add to ongoing logging table
|
495 |
+
line_level_ocr_results_df = pd.DataFrame([{
|
496 |
+
'page': reported_page_number,
|
497 |
+
'text': result.text,
|
498 |
+
'left': result.left,
|
499 |
+
'top': result.top,
|
500 |
+
'width': result.width,
|
501 |
+
'height': result.height
|
502 |
+
} for result in line_level_ocr_results])
|
503 |
+
|
504 |
+
all_line_level_ocr_results_df = pd.concat([all_line_level_ocr_results_df, line_level_ocr_results_df])
|
505 |
+
|
506 |
+
# Convert decision process to table
|
507 |
+
# Export the decision making process
|
508 |
+
if merged_redaction_bboxes:
|
509 |
+
# for bbox in merged_redaction_bboxes:
|
510 |
+
# print(f"Entity: {bbox.entity_type}, Text: {bbox.text}, Bbox: ({bbox.left}, {bbox.top}, {bbox.width}, {bbox.height})")
|
511 |
+
|
512 |
+
#decision_process_output_str = "Page " + reported_page_number + ":\n" + str(merged_redaction_bboxes)
|
513 |
+
#all_decision_process.append(decision_process_output_str)
|
514 |
+
|
515 |
+
decision_process_table = pd.DataFrame([{
|
516 |
+
'page': reported_page_number,
|
517 |
+
'entity_type': result.entity_type,
|
518 |
+
'start': result.start,
|
519 |
+
'end': result.end,
|
520 |
+
'score': result.score,
|
521 |
+
'left': result.left,
|
522 |
+
'top': result.top,
|
523 |
+
'width': result.width,
|
524 |
+
'height': result.height,
|
525 |
+
'text': result.text
|
526 |
+
} for result in merged_redaction_bboxes])
|
527 |
+
|
528 |
+
all_decision_process_table = pd.concat([all_decision_process_table, decision_process_table])
|
529 |
|
530 |
images.append(image)
|
531 |
|
532 |
# Write OCR results as a log file
|
533 |
+
# line_level_ocr_results_out = "\n".join(all_ocr_results)
|
534 |
+
# with open(ocr_results_file_path, "w") as f:
|
535 |
+
# f.write(line_level_ocr_results_out)
|
|
|
536 |
|
537 |
+
all_line_level_ocr_results_df.to_csv(ocr_results_file_path)
|
538 |
+
logging_file_paths.append(ocr_results_file_path)
|
539 |
|
540 |
+
return images, all_decision_process_table, logging_file_paths, request_metadata
|
541 |
|
542 |
def analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list):
    if isinstance(text_container, LTTextContainer):
        ...
        return analyzer_results, characters
    return [], []
+def create_text_bounding_boxes_from_characters(char_objects:List[LTChar]) -> OCRResult:
+    '''
+    Create an OCRResult object based on a list of pdfminer LTChar objects.
+    '''
+
+    # Initialize variables
+    full_text = ""
+    overall_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]  # [x0, y0, x1, y1]
+    word_bboxes = []
+
+    # Iterate through the character objects
+    current_word = ""
+    current_word_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]  # [x0, y0, x1, y1]
+
+    for char in char_objects:
+        if isinstance(char, LTAnno):
+            # Handle space separately by finalizing the word
+            full_text += char.get_text()  # Adds space or newline
+            if current_word:  # Only finalize if there is a current word
+                word_bboxes.append((current_word, current_word_bbox))
+                current_word = ""
+                current_word_bbox = [float('inf'), float('inf'), float('-inf'), float('-inf')]  # Reset for next word
+            continue
+
+        # Concatenate text for LTChar
+        full_text += char.get_text()
+
+        # Update overall bounding box
+        x0, y0, x1, y1 = char.bbox
+        overall_bbox[0] = min(overall_bbox[0], x0)  # x0
+        overall_bbox[1] = min(overall_bbox[1], y0)  # y0
+        overall_bbox[2] = max(overall_bbox[2], x1)  # x1
+        overall_bbox[3] = max(overall_bbox[3], y1)  # y1
+
+        # Update current word
+        current_word += char.get_text()
+
+        # Update current word bounding box
+        current_word_bbox[0] = min(current_word_bbox[0], x0)  # x0
+        current_word_bbox[1] = min(current_word_bbox[1], y0)  # y0
+        current_word_bbox[2] = max(current_word_bbox[2], x1)  # x1
+        current_word_bbox[3] = max(current_word_bbox[3], y1)  # y1
+
+    # Finalize the last word if any
+    if current_word:
+        word_bboxes.append((current_word, current_word_bbox))
+
+    return OCRResult(full_text, overall_bbox[0], overall_bbox[1], overall_bbox[2], overall_bbox[3])
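A minimal usage sketch for the helper above, assuming pdfminer.six is installed and an "example.pdf" with selectable text exists (the file name and page choice are hypothetical):

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LTChar, LTAnno

# Collect the character objects of the first text container on page 1, then build an OCRResult
chars = []
for page_layout in extract_pages("example.pdf", page_numbers=[0], maxpages=1):
    for element in page_layout:
        if isinstance(element, LTTextContainer):
            for text_line in element:
                chars.extend(obj for obj in text_line if isinstance(obj, (LTChar, LTAnno)))
            break  # only the first text container for this sketch

line_result = create_text_bounding_boxes_from_characters(chars)
print(line_result.text)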
+def merge_text_bounding_boxes(analyzer_results:CustomImageRecognizerResult, characters:List[LTChar], combine_pixel_dist:int, vertical_padding:int=2):
    '''
    Merge identified bounding boxes containing PII that are very close to one another
    '''
    ...
    if len(analyzer_results) > 0 and len(characters) > 0:
        # Extract bounding box coordinates for sorting
        bounding_boxes = []
+       text_out = []
        for result in analyzer_results:
            char_boxes = [char.bbox for char in characters[result.start:result.end] if isinstance(char, LTChar)]
+           char_text = [char._text for char in characters[result.start:result.end] if isinstance(char, LTChar)]
            if char_boxes:
                # Calculate the bounding box that encompasses all characters
                left = min(box[0] for box in char_boxes)
                bottom = min(box[1] for box in char_boxes)
                right = max(box[2] for box in char_boxes)
                top = max(box[3] for box in char_boxes) + vertical_padding
+               bounding_boxes.append((bottom, left, result, [left, bottom, right, top], char_text))  # (y, x, result, bbox, text)
+
+               char_text = "".join(char_text)

        # Sort the results by y-coordinate and then by x-coordinate
        bounding_boxes.sort()
        ...
        current_box = None
        current_y = None
        current_result = None
+       current_text = []

+       for y, x, result, char_box, text in bounding_boxes:
+           #print(f"Considering result: {result}")
+           #print(f"Character box: {char_box}")

            if current_y is None or current_box is None:
                current_box = char_box
                current_y = char_box[1]
                current_result = result
+               current_text = list(text)
+               #print(f"Starting new box: {current_box}")
            else:
                vertical_diff_bboxes = abs(char_box[1] - current_y)
                horizontal_diff_bboxes = abs(char_box[0] - current_box[2])

+               #print(f"Comparing boxes: current_box={current_box}, char_box={char_box}")
+               #print(f"Vertical diff: {vertical_diff_bboxes}, Horizontal diff: {horizontal_diff_bboxes}")

                if (
                    vertical_diff_bboxes <= 5
                    ...
                    current_box[2] = char_box[2]  # Extend the current box horizontally
                    current_box[3] = max(current_box[3], char_box[3])  # Ensure the top is the highest
                    current_result.end = max(current_result.end, result.end)  # Extend the text range
+                   # Add a space if current_text is not empty
+                   if current_text:
+                       current_text.append(" ")  # Add space between texts
+                   current_text.extend(text)
                else:
                    merged_bounding_boxes.append(
+                       {"text":"".join(current_text),"boundingBox": current_box, "result": current_result})
+                   #print(f"Appending merged box: {current_box}")

                    # Reset current_box and current_y after appending
                    current_box = char_box
                    current_y = char_box[1]
                    current_result = result
+                   current_text = list(text)
+                   #print(f"Starting new box: {current_box}")

        # After finishing with the current result, add the last box for this result
        if current_box:
+           merged_bounding_boxes.append({"text":"".join(current_text), "boundingBox": current_box, "result": current_result})
+           #print(f"Appending final box for result: {current_box}")

        if not merged_bounding_boxes:
            analyzed_bounding_boxes.extend(
+               {"text":text, "boundingBox": char.bbox, "result": result}
                for result in analyzer_results
                for char in characters[result.start:result.end]
                if isinstance(char, LTChar)
            ...
        else:
            analyzed_bounding_boxes.extend(merged_bounding_boxes)

+       #print("Analyzed bounding boxes:\n\n", analyzed_bounding_boxes)

    return analyzed_bounding_boxes
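Each merged entry is a plain dict with "text", "boundingBox" and "result" keys; a hypothetical consumer (not part of this commit) could read them back like this:

# Hypothetical consumer of merge_text_bounding_boxes output; keys mirror the dicts built above
for entry in analyzed_bounding_boxes:
    x0, y0, x1, y1 = entry["boundingBox"]
    result = entry["result"]
    print(f"{result.entity_type} '{entry['text']}' at ({x0:.1f}, {y0:.1f}) to ({x1:.1f}, {y1:.1f})")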
...
    Redact chosen entities from a pdf that is made up of multiple pages that are not images.
    '''
    annotations_all_pages = []
+   decision_process_table_all_pages = pd.DataFrame()

    combine_pixel_dist = 100 # Horizontal distance between PII bounding boxes under/equal they are combined into one

    ...

        annotations_on_page = []
+       decision_process_table_on_page = pd.DataFrame()

        for page_layout in extract_pages(filename, page_numbers = [page_no], maxpages=1):

            ...
            text_container_analyzer_results = []
            text_container_analyzed_bounding_boxes = []
            characters = []
+           page_text_outputs = pd.DataFrame()
if analysis_type == "Simple text analysis - PDFs with selectable text":
|
790 |
for i, text_container in enumerate(page_layout):
|
791 |
|
792 |
text_container_analyzer_results, characters = analyze_text_container(text_container, language, chosen_redact_entities, score_threshold, allow_list)
|
793 |
|
794 |
+
# Create dataframe for all the text on the page
|
795 |
+
line_level_text_results = create_text_bounding_boxes_from_characters(characters)
|
796 |
|
797 |
+
if line_level_text_results.text:
|
798 |
+
line_level_text_results_list = [line_level_text_results]
|
799 |
|
800 |
+
# Convert to DataFrame and add to ongoing logging table
|
801 |
+
line_level_text_results_df = pd.DataFrame([{
|
802 |
+
'page': page_no + 1,
|
803 |
+
'text': result.text,
|
804 |
+
'left': result.left,
|
805 |
+
'top': result.top,
|
806 |
+
'width': result.width,
|
807 |
+
'height': result.height
|
808 |
+
} for result in line_level_text_results_list])
|
809 |
|
810 |
+
page_text_outputs = pd.concat([page_text_outputs, line_level_text_results_df])
|
811 |
|
812 |
+
# Merge bounding boxes if very close together
|
813 |
+
text_container_analyzed_bounding_boxes = merge_text_bounding_boxes(text_container_analyzer_results, characters, combine_pixel_dist, vertical_padding = 2)
|
814 |
|
815 |
+
|
816 |
page_analyzer_results.extend(text_container_analyzer_results)
|
817 |
+
page_analyzed_bounding_boxes.extend(text_container_analyzed_bounding_boxes)
|
818 |
|
819 |
+
|
820 |
+
print("page_analyzer_results:", page_analyzer_results)
|
821 |
+
print("page_analyzed_bounding_boxes:", page_analyzed_bounding_boxes)
|
822 |
|
823 |
decision_process_table_on_page = create_text_redaction_process_results(page_analyzer_results, page_analyzed_bounding_boxes, page_num)
            ...

            # Make page annotations
            page.Annots = pdf.make_indirect(annotations_on_page)

        annotations_all_pages.extend([annotations_on_page])
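The annotation step uses pikepdf; a self-contained sketch of attaching a single square annotation to a page (the file names and annotation values are hypothetical, not this app's) might look like:

import pikepdf
from pikepdf import Array, Dictionary, Name

pdf = pikepdf.open("example.pdf")
page = pdf.pages[0]

# One square annotation covering a redaction box, in PDF user-space coordinates
annotation = Dictionary(
    Type=Name.Annot,
    Subtype=Name.Square,
    Rect=Array([100, 700, 300, 715]),
    IC=Array([0, 0, 0]),  # black interior colour
)
page.Annots = pdf.make_indirect(Array([annotation]))
pdf.save("example_with_annotation.pdf")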
+
+       decision_process_table_all_pages = pd.concat([decision_process_table_all_pages, decision_process_table_on_page])
+
+       page_text_outputs = page_text_outputs.sort_values(["top", "left"], ascending=[False, False]).reset_index(drop=True)
+       #page_text_outputs.to_csv("text_page_text_outputs.csv")

        print("For page number:", page_no, "there are", len(annotations_all_pages[page_num]), "annotations")

        #page_num += 1

+   return pdf, decision_process_table_all_pages, page_text_outputs