from pdf2image import convert_from_path, pdfinfo_from_path
from PIL import Image, ImageFile

import os
import re
import time
import json
import shutil
import pymupdf
from pymupdf import Document, Rect
from fitz import Page
import pandas as pd

from tqdm import tqdm
from gradio import Progress
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from scipy.spatial import cKDTree

from tools.config import output_folder, IMAGES_DPI, LOAD_TRUNCATED_IMAGES, MAX_IMAGE_PIXELS, CUSTOM_BOX_COLOUR
from tools.helper_functions import get_file_name_without_type, tesseract_ocr_option, text_ocr_option, textract_option, read_file

image_dpi = float(IMAGES_DPI)

if not MAX_IMAGE_PIXELS:
    Image.MAX_IMAGE_PIXELS = None
else:
    Image.MAX_IMAGE_PIXELS = MAX_IMAGE_PIXELS

ImageFile.LOAD_TRUNCATED_IMAGES = LOAD_TRUNCATED_IMAGES.lower() == "true"


def is_pdf_or_image(filename):
    """
    Check if a file name is a PDF or an image file.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", ".jpg", ".jpeg", or ".png", False otherwise.
    """
    return filename.lower().endswith((".pdf", ".jpg", ".jpeg", ".png"))


def is_pdf(filename):
    """
    Check if a file name is a PDF.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", False otherwise.
    """
    return filename.lower().endswith(".pdf")

def check_image_size_and_reduce(out_path:str, image:Image):
    '''
    Check if a given image file is above around 4.5MB, and reduce its size if necessary. 5MB is the maximum file size possible to submit to AWS Textract.
    '''

    max_size = 4.5 * 1024 * 1024  # 4.5 MB in bytes
    file_size = os.path.getsize(out_path)

    width = image.width
    height = image.height

    # Resize the image, halving the dimensions on each pass, until the file is under the size limit
    if file_size > max_size:
        print(f"Image size before {width}x{height}, original file_size: {file_size}")

        while file_size > max_size:
            new_width = int(width * 0.5)
            new_height = int(height * 0.5)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            image.save(out_path, format="PNG", optimize=True)

            file_size = os.path.getsize(out_path)
            print(f"Resized to {new_width}x{new_height}, new file_size: {file_size}")

            # Carry the reduced dimensions forward so repeated passes keep shrinking the image
            width, height = new_width, new_height
    else:
        new_width = width
        new_height = height

    return new_width, new_height

def process_single_page(pdf_path: str, page_num: int, image_dpi: float, output_dir: str = 'input') -> tuple[int, str, int, int]:
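    '''
    Convert a single PDF page to a PNG image (reusing an existing image file if one is already present), returning the page number, the image file path, and the final image width and height.
    '''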
    try:
        output_dir = os.path.join(os.getcwd(), output_dir)
        out_path = os.path.join(output_dir, f"{os.path.basename(pdf_path)}_{page_num}.png")
        os.makedirs(os.path.dirname(out_path), exist_ok=True)

        if os.path.exists(out_path):
            # Load the existing image rather than re-rendering the page
            image = Image.open(out_path)
        else:
            # Render the page at the requested DPI and convert to greyscale to reduce file size
            image_l = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1,
                                        dpi=image_dpi, use_cropbox=False, use_pdftocairo=False)
            image = image_l[0]
            image = image.convert("L")
            image.save(out_path, format="PNG")

        # Check whether the image is too large (e.g. for AWS Textract) and reduce it if necessary
        width, height = check_image_size_and_reduce(out_path, image)

        return page_num, out_path, width, height

    except Exception as e:
        print(f"Error processing page {page_num + 1}: {e}")
        return page_num, "", 0, 0

def convert_pdf_to_images(pdf_path: str, prepare_for_review:bool=False, page_min: int = 0, image_dpi: float = image_dpi, num_threads: int = 8, output_dir: str = '/input'):
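    '''
    Convert each page of a PDF into a PNG image using a thread pool, returning the image file paths and their widths and heights in page order.
    '''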

    page_count = pdfinfo_from_path(pdf_path)['Pages']
    print(f"Number of pages in PDF: {page_count}")

    results = []
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for page_num in range(page_min, page_count):
            futures.append(executor.submit(process_single_page, pdf_path, page_num, image_dpi))

        for future in tqdm(as_completed(futures), total=len(futures), unit="pages", desc="Converting pages"):
            page_num, result, width, height = future.result()
            if result:
                results.append((page_num, result, width, height))
            else:
                print(f"Page {page_num + 1} failed to process.")

    # Restore page order, as threads can complete out of sequence
    results.sort(key=lambda x: x[0])
    images = [result[1] for result in results]
    widths = [result[2] for result in results]
    heights = [result[3] for result in results]

    print("PDF has been converted to images.")
    return images, widths, heights

def process_file(file_path:str, prepare_for_review:bool=False):
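    '''
    Prepare an image or PDF file for redaction. Image files are size-checked (and reduced if necessary); PDF files are converted into one image per page. Returns the image path(s) and lists of the image widths and heights.
    '''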

    file_extension = os.path.splitext(file_path)[1].lower()

    # If the file is an image, check its size (reducing it if necessary) and record its dimensions
    if file_extension in ['.jpg', '.jpeg', '.png']:
        print(f"{file_path} is an image file.")
        img_object = [file_path]

        image = Image.open(file_path)
        image_width, image_height = check_image_size_and_reduce(file_path, image)
        image_sizes_width = [image_width]
        image_sizes_height = [image_height]

    # If the file is a PDF, convert each page to an image
    elif file_extension == '.pdf':
        print(f"{file_path} is a PDF file. Converting to image set")
        img_object, image_sizes_width, image_sizes_height = convert_pdf_to_images(file_path, prepare_for_review)

    else:
        print(f"{file_path} is not an image or PDF file.")
        img_object = []
        image_sizes_width = []
        image_sizes_height = []

    return img_object, image_sizes_width, image_sizes_height

def get_input_file_names(file_input:List[str]):
    '''
    Get list of input files to report to logs.
    '''

    all_relevant_files = []
    file_name_with_extension = ""
    full_file_name = ""

    if isinstance(file_input, dict):
        file_input = os.path.abspath(file_input["name"])

    if isinstance(file_input, str):
        file_input_list = [file_input]
    else:
        file_input_list = file_input

    for file in file_input_list:
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        file_path_without_ext = get_file_name_without_type(file_path)
        file_extension = os.path.splitext(file_path)[1].lower()

        if (file_extension in ['.jpg', '.jpeg', '.png', '.pdf', '.xlsx', '.csv', '.parquet']) and ("review_file" not in file_path_without_ext):
            all_relevant_files.append(file_path_without_ext)
            file_name_with_extension = file_path_without_ext + file_extension
            full_file_name = file_path

    all_relevant_files_str = ", ".join(all_relevant_files)

    return all_relevant_files_str, file_name_with_extension, full_file_name, all_relevant_files

def convert_color_to_range_0_1(color):
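    '''Convert an RGB colour from the 0-255 range to the 0-1 range used by PyMuPDF.'''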
    return tuple(component / 255 for component in color)

def redact_single_box(pymupdf_page:Page, pymupdf_rect:Rect, img_annotation_box:dict, custom_colours:bool=False):
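    '''
    Add a redaction annotation and a filled cover box to a PyMuPDF page for a single redaction rectangle.
    '''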
    pymupdf_x1 = pymupdf_rect[0]
    pymupdf_y1 = pymupdf_rect[1]
    pymupdf_x2 = pymupdf_rect[2]
    pymupdf_y2 = pymupdf_rect[3]

    # Use a slightly reduced height for the redaction annotation itself, so that text just above or below the box is less likely to be removed when redactions are applied
    redact_bottom_y = pymupdf_y1 + 2
    redact_top_y = pymupdf_y2 - 2

    # If the box is very thin, use a two-point band around its vertical centre instead
    if (redact_top_y - redact_bottom_y) < 1:
        middle_y = (pymupdf_y1 + pymupdf_y2) / 2
        redact_bottom_y = middle_y - 1
        redact_top_y = middle_y + 1

    rect_small_pixel_height = Rect(pymupdf_x1, redact_bottom_y, pymupdf_x2, redact_top_y)

    pymupdf_page.add_redact_annot(rect_small_pixel_height)

    # Draw a filled box over the full rectangle to visually cover the redacted area
    shape = pymupdf_page.new_shape()
    shape.draw_rect(pymupdf_rect)

    if custom_colours == True:
        if img_annotation_box["color"][0] > 1:
            out_colour = convert_color_to_range_0_1(img_annotation_box["color"])
        else:
            out_colour = img_annotation_box["color"]
    else:
        if CUSTOM_BOX_COLOUR == "grey":
            out_colour = (0.5, 0.5, 0.5)
        else:
            out_colour = (0, 0, 0)

    shape.finish(color=out_colour, fill=out_colour)
    shape.commit()

def convert_pymupdf_to_image_coords(pymupdf_page, x1, y1, x2, y2, image: Image):
    '''
    Converts coordinates from pymupdf format to image coordinates,
    accounting for mediabox dimensions and offset.
    '''

    rect = pymupdf_page.rect
    rect_width = rect.width
    rect_height = rect.height

    mediabox = pymupdf_page.mediabox
    mediabox_width = mediabox.width
    mediabox_height = mediabox.height

    image_page_width, image_page_height = image.size

    # Scale factors between the mediabox/rect and the image
    image_to_mediabox_x_scale = image_page_width / mediabox_width
    image_to_mediabox_y_scale = image_page_height / mediabox_height

    image_to_rect_scale_width = image_page_width / rect_width
    image_to_rect_scale_height = image_page_height / rect_height

    # Offset of the visible rect relative to the mediabox
    x_offset = rect.x0 - mediabox.x0
    y_offset = rect.y0 - mediabox.y0

    # Scale the input coordinates from PDF points to image pixels
    x1_image = x1 * image_to_mediabox_x_scale
    x2_image = x2 * image_to_mediabox_x_scale
    y1_image = y1 * image_to_mediabox_y_scale
    y2_image = y2 * image_to_mediabox_y_scale

    # If the mediabox and rect differ in size, adjust for the difference in position and scale
    if mediabox_width != rect_width:

        mediabox_to_rect_x_scale = mediabox_width / rect_width
        mediabox_to_rect_y_scale = mediabox_height / rect_height

        mediabox_rect_x_diff = (mediabox_width - rect_width) * (image_to_mediabox_x_scale / 2)
        mediabox_rect_y_diff = (mediabox_height - rect_height) * (image_to_mediabox_y_scale / 2)

        x1_image -= mediabox_rect_x_diff
        x2_image -= mediabox_rect_x_diff
        y1_image += mediabox_rect_y_diff
        y2_image += mediabox_rect_y_diff

        x1_image *= mediabox_to_rect_x_scale
        x2_image *= mediabox_to_rect_x_scale
        y1_image *= mediabox_to_rect_y_scale
        y2_image *= mediabox_to_rect_y_scale

    return x1_image, y1_image, x2_image, y2_image

def redact_whole_pymupdf_page(rect_height, rect_width, image, page, custom_colours, border = 5):
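    '''
    Apply a redaction covering an entire page (minus a small border) and return the corresponding annotation box in image coordinates.
    '''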

    # Small border to avoid clipping at the page edges
    whole_page_x1, whole_page_y1 = 0 + border, 0 + border
    whole_page_x2, whole_page_y2 = rect_width - border, rect_height - border

    whole_page_image_x1, whole_page_image_y1, whole_page_image_x2, whole_page_image_y2 = convert_pymupdf_to_image_coords(page, whole_page_x1, whole_page_y1, whole_page_x2, whole_page_y2, image)

    # Whole-page box in PDF coordinates
    whole_page_rect = Rect(whole_page_x1, whole_page_y1, whole_page_x2, whole_page_y2)

    # Equivalent annotation box in image coordinates
    whole_page_img_annotation_box = {}
    whole_page_img_annotation_box["xmin"] = whole_page_image_x1
    whole_page_img_annotation_box["ymin"] = whole_page_image_y1
    whole_page_img_annotation_box["xmax"] = whole_page_image_x2
    whole_page_img_annotation_box["ymax"] = whole_page_image_y2
    whole_page_img_annotation_box["color"] = (0, 0, 0)
    whole_page_img_annotation_box["label"] = "Whole page"

    redact_single_box(page, whole_page_rect, whole_page_img_annotation_box, custom_colours)

    return whole_page_img_annotation_box

def create_page_size_objects(pymupdf_doc:Document, image_sizes_width:List[float], image_sizes_height:List[float]):
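    '''
    Build a list of per-page size dictionaries (image, mediabox and cropbox dimensions) and a list of the original cropboxes for a PyMuPDF document.
    '''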
    page_sizes = []
    original_cropboxes = []

    for page_no, page in enumerate(pymupdf_doc):
        reported_page_no = page_no + 1

        pymupdf_page = pymupdf_doc.load_page(page_no)
        original_cropboxes.append(pymupdf_page.cropbox)  # Save original cropbox

        if image_sizes_width and image_sizes_height:
            out_page_image_sizes = {"page":reported_page_no, "image_width":image_sizes_width[page_no], "image_height":image_sizes_height[page_no], "mediabox_width":pymupdf_page.mediabox.width, "mediabox_height": pymupdf_page.mediabox.height, "cropbox_width":pymupdf_page.cropbox.width, "cropbox_height":pymupdf_page.cropbox.height}
        else:
            out_page_image_sizes = {"page":reported_page_no, "image_width":pd.NA, "image_height":pd.NA, "mediabox_width":pymupdf_page.mediabox.width, "mediabox_height": pymupdf_page.mediabox.height, "cropbox_width":pymupdf_page.cropbox.width, "cropbox_height":pymupdf_page.cropbox.height}

        page_sizes.append(out_page_image_sizes)

    return page_sizes, original_cropboxes

def prepare_image_or_pdf(
    file_paths: List[str],
    in_redact_method: str,
    latest_file_completed: int = 0,
    out_message: List[str] = [],
    first_loop_state: bool = False,
    number_of_pages: int = 1,
    all_annotations_object: List = [],
    prepare_for_review: bool = False,
    in_fully_redacted_list: List[int] = [],
    output_folder: str = output_folder,
    prepare_images: bool = True,
    progress: Progress = Progress(track_tqdm=True)
) -> tuple[List[str], List[str]]:
    """
    Prepare and process image or text PDF files for redaction.

    This function takes a list of file paths, processes each file based on the specified redaction method,
    and returns the output messages and processed file paths.

    Args:
        file_paths (List[str]): List of file paths to process.
        in_redact_method (str): The redaction method to use.
        latest_file_completed (optional, int): Index of the last completed file.
        out_message (optional, List[str]): List to store output messages.
        first_loop_state (optional, bool): Flag indicating if this is the first iteration.
        number_of_pages (optional, int): Integer indicating the number of pages in the document.
        all_annotations_object (optional, List of annotation objects): All annotations for the current document.
        prepare_for_review (optional, bool): Is this preparation step preparing PDFs and JSON files to review current redactions?
        in_fully_redacted_list (optional, List of int): A list of pages to fully redact.
        output_folder (optional, str): The output folder for file save.
        prepare_images (optional, bool): A boolean indicating whether to create images for each PDF page. Defaults to True.
        progress (optional, Progress): Progress tracker for the operation.

    Returns:
        tuple[List[str], List[str]]: A tuple containing the output messages and processed file paths.
    """

    tic = time.perf_counter()
    json_from_csv = False
    original_cropboxes = []
    converted_file_paths = []
    image_file_paths = []
    pymupdf_doc = []
    review_file_csv = pd.DataFrame()
    page_sizes = []  # Initialised here so it is always defined when returned

    if isinstance(in_fully_redacted_list, pd.DataFrame):
        if not in_fully_redacted_list.empty:
            in_fully_redacted_list = in_fully_redacted_list.iloc[:,0].tolist()

    # If this is the first time around, reset the file counter and outputs
    if first_loop_state == True:
        print("first_loop_state is True")
        latest_file_completed = 0
        out_message = []
        all_annotations_object = []
    else:
        print("Now attempting file:", str(latest_file_completed))

    # If out_message is a string, convert it to a list so messages can be appended
    if isinstance(out_message, str):
        out_message = [out_message]

    if not file_paths:
        file_paths = []

    if isinstance(file_paths, dict):
        file_paths = os.path.abspath(file_paths["name"])

    if isinstance(file_paths, str):
        file_path_number = 1
    else:
        file_path_number = len(file_paths)

    print("Number of file paths:", file_path_number)
    print("Latest_file_completed:", latest_file_completed)

    latest_file_completed = int(latest_file_completed)

    # If the last file has already been processed, return the existing outputs
    if latest_file_completed >= file_path_number:
        print("Last file reached, returning files:", str(latest_file_completed))
        if isinstance(out_message, list):
            final_out_message = '\n'.join(out_message)
        else:
            final_out_message = out_message
        return final_out_message, converted_file_paths, image_file_paths, number_of_pages, number_of_pages, pymupdf_doc, all_annotations_object, review_file_csv, original_cropboxes, page_sizes

    progress(0.1, desc='Preparing file')

    if isinstance(file_paths, str):
        file_paths_list = [file_paths]
        file_paths_loop = file_paths_list
    else:
        if prepare_for_review == False:
            file_paths_list = file_paths
            file_paths_loop = [file_paths_list[int(latest_file_completed)]]
        else:
            file_paths_list = file_paths
            file_paths_loop = file_paths

    # Sort files so that PDFs are processed before JSON/review files
    file_paths_loop = sorted(file_paths_loop, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))


    for file in file_paths_loop:
        converted_file_path = []
        image_file_path = []

        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name
        file_path_without_ext = get_file_name_without_type(file_path)
        file_name_with_ext = os.path.basename(file_path)

        if not file_path:
            out_message = "Please select a file."
            print(out_message)
            raise Exception(out_message)

        file_extension = os.path.splitext(file_path)[1].lower()

        # If the file is a PDF, open it with pymupdf and optionally create page images
        if is_pdf(file_path):
            pymupdf_doc = pymupdf.open(file_path)
            pymupdf_pages = pymupdf_doc.page_count

            converted_file_path = file_path

            if prepare_images == True:
                image_file_paths, image_sizes_width, image_sizes_height = process_file(file_path, prepare_for_review)
            else:
                print("Skipping image preparation")
                image_file_paths = []
                image_sizes_width = []
                image_sizes_height = []

            page_sizes, original_cropboxes = create_page_size_objects(pymupdf_doc, image_sizes_width, image_sizes_height)

            # Create an annotation object for every page of the document if one does not already exist
            if (not all_annotations_object) and (prepare_for_review == True):
                all_annotations_object = []

                for image_path in image_file_paths:
                    annotation = {}
                    annotation["image"] = image_path
                    annotation["boxes"] = []

                    all_annotations_object.append(annotation)

        elif is_pdf_or_image(file_path):

            # Image files cannot be analysed with the selectable-text method, so switch to OCR
            if file_extension in ['.jpg', '.jpeg', '.png'] and in_redact_method == text_ocr_option:
                in_redact_method = tesseract_ocr_option

            # Create a new empty PDF and insert the image as a single page
            pymupdf_doc = pymupdf.open()

            img = Image.open(file_path)
            rect = pymupdf.Rect(0, 0, img.width, img.height)
            pymupdf_page = pymupdf_doc.new_page(width=img.width, height=img.height)
            pymupdf_page.insert_image(rect, filename=file_path)
            pymupdf_page = pymupdf_doc.load_page(0)

            original_cropboxes.append(pymupdf_page.cropbox)

            file_path_str = str(file_path)

            image_file_paths, image_sizes_width, image_sizes_height = process_file(file_path_str, prepare_for_review)

            out_page_image_sizes = {"page":1, "image_width":image_sizes_width[0], "image_height":image_sizes_height[0], "mediabox_width":pymupdf_page.mediabox.width, "mediabox_height": pymupdf_page.mediabox.height, "cropbox_width":original_cropboxes[-1].width, "cropbox_height":original_cropboxes[-1].height}
            page_sizes.append(out_page_image_sizes)

            converted_file_path = output_folder + file_name_with_ext

            pymupdf_doc.save(converted_file_path)

            print("Inserted image into PDF file")

        elif file_extension in ['.csv']:
            review_file_csv = read_file(file)
            all_annotations_object = convert_review_df_to_annotation_json(review_file_csv, image_file_paths, page_sizes)
            json_from_csv = True
            print("Converted CSV review file to json")

        # If the file is a JSON file (or a review CSV converted to JSON above), load it for review or copy it to the output folder
        if (file_extension in ['.json']) or (json_from_csv == True):

            if (file_extension in ['.json']) and (prepare_for_review == True):
                print("Preparing file for review")
                if isinstance(file_path, str):
                    with open(file_path, 'r') as json_file:
                        all_annotations_object = json.load(json_file)
                else:
                    all_annotations_object = json.loads(file_path)

            # If the JSON file is not for review, simply copy it to the output folder and move on
            elif (file_extension == '.json') and (prepare_for_review is not True):
                out_folder = os.path.join(output_folder, file_path_without_ext + ".json")
                shutil.copy2(file_path, out_folder)
                continue

            # If an annotations object exists, match each annotation to the local image file for its page
            if all_annotations_object:

                # Get the page numbers embedded in the image file names
                image_file_paths_pages = [
                    int(re.search(r'_(\d+)\.png$', os.path.basename(s)).group(1))
                    for s in image_file_paths
                    if re.search(r'_(\d+)\.png$', os.path.basename(s))
                ]
                image_file_paths_pages = [int(i) for i in image_file_paths_pages]

                # If image files exist, point each annotation at the local image for its page
                if image_file_paths:

                    for i, image_file_path in enumerate(image_file_paths):

                        if i < len(all_annotations_object):
                            annotation = all_annotations_object[i]
                        else:
                            annotation = {}
                            all_annotations_object.append(annotation)

                        try:
                            if not annotation:
                                annotation = {"image": "", "boxes": []}
                                annotation_page_number = int(re.search(r'_(\d+)\.png$', image_file_path).group(1))
                            else:
                                annotation_page_number = int(re.search(r'_(\d+)\.png$', annotation["image"]).group(1))
                        except Exception as e:
                            print("Extracting page number from image failed due to:", e)
                            annotation_page_number = 0

                        # Check whether the annotation page number exists among the image file pages
                        if annotation_page_number in image_file_paths_pages:
                            correct_image_page = annotation_page_number
                            annotation["image"] = image_file_paths[correct_image_page]
                        else:
                            print("Page", annotation_page_number, "image file not found.")

                        all_annotations_object[i] = annotation

                # Write the annotations object to a JSON file in the output folder
                out_folder = output_folder + file_path_without_ext + ".json"
                with open(out_folder, 'w') as json_file:
                    json.dump(all_annotations_object, json_file, indent=4)
                continue

        else:
            # The file is not a JSON/CSV review file, so validate its type against the chosen redaction method
            if in_redact_method == tesseract_ocr_option or in_redact_method == textract_option:
                if is_pdf_or_image(file_path) == False:
                    out_message = "Please upload a PDF file or image file (JPG, PNG) for image analysis."
                    print(out_message)
                    raise Exception(out_message)

            elif in_redact_method == text_ocr_option:
                if is_pdf(file_path) == False:
                    out_message = "Please upload a PDF file for text analysis."
                    print(out_message)
                    raise Exception(out_message)

        converted_file_paths.append(converted_file_path)
        image_file_paths.extend(image_file_path)

        toc = time.perf_counter()
        out_time = f"File '{file_path_without_ext}' prepared in {toc - tic:0.1f} seconds."

        print(out_time)

        out_message.append(out_time)
        out_message_out = '\n'.join(out_message)

    number_of_pages = len(image_file_paths)

    return out_message_out, converted_file_paths, image_file_paths, number_of_pages, number_of_pages, pymupdf_doc, all_annotations_object, review_file_csv, original_cropboxes, page_sizes

def convert_text_pdf_to_img_pdf(in_file_path:str, out_text_file_path:List[str], image_dpi:float=image_dpi):
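    '''
    Convert a redacted text-based PDF into an image-based PDF so that the redactions are embedded in the page images, returning a message and the new file path.
    '''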
    file_path_without_ext = get_file_name_without_type(in_file_path)

    out_file_paths = out_text_file_path

    print("Creating image version of redacted PDF to embed redactions.")

    pdf_text_image_paths, image_sizes_width, image_sizes_height = process_file(out_text_file_path[0])
    out_text_image_file_path = output_folder + file_path_without_ext + "_text_redacted_as_img.pdf"

    # process_file returns image file paths, so open them before combining into a single PDF
    pdf_text_images = [Image.open(image_path) for image_path in pdf_text_image_paths]
    pdf_text_images[0].save(out_text_image_file_path, "PDF", resolution=image_dpi, save_all=True, append_images=pdf_text_images[1:])

    out_file_paths = [out_text_image_file_path]

    out_message = "PDF " + file_path_without_ext + " converted to image-based file."
    print(out_message)

    return out_message, out_file_paths

def join_values_within_threshold(df1:pd.DataFrame, df2:pd.DataFrame):
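    '''
    Join two dataframes of bounding boxes where all four coordinates fall within a fixed threshold of each other.
    '''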

    # Threshold for considering two boxes a match, in the same units as the coordinates
    threshold = 5

    # Perform a cross join between the two dataframes
    df1['key'] = 1
    df2['key'] = 1
    merged = pd.merge(df1, df2, on='key').drop(columns=['key'])

    # Keep only pairs where every coordinate is within the threshold
    conditions = (
        (abs(merged['xmin_x'] - merged['xmin_y']) <= threshold) &
        (abs(merged['xmax_x'] - merged['xmax_y']) <= threshold) &
        (abs(merged['ymin_x'] - merged['ymin_y']) <= threshold) &
        (abs(merged['ymax_x'] - merged['ymax_y']) <= threshold)
    )

    filtered = merged[conditions]

    # Drop duplicate matches for each box in df1
    result = filtered.drop_duplicates(subset=['xmin_x', 'xmax_x', 'ymin_x', 'ymax_x'])

    # Merge the matches back onto df1 so unmatched rows are kept
    final_df = pd.merge(df1, result, left_on=['xmin', 'xmax', 'ymin', 'ymax'], right_on=['xmin_x', 'xmax_x', 'ymin_x', 'ymax_x'], how='left')

    final_df = final_df.drop(columns=['key'])
    print(final_df)

    return final_df

def convert_annotation_json_to_review_df(all_annotations:List[dict], redaction_decision_output:pd.DataFrame=pd.DataFrame(), page_sizes:List[dict]=[]) -> pd.DataFrame:
    '''
    Convert the annotation json data to a dataframe format. Add on any text from the initial review_file dataframe by joining on pages/co-ordinates (doesn't work very well currently).
    '''

    flattened_annotation_data = []
    page_sizes_df = pd.DataFrame()

    if not isinstance(redaction_decision_output, pd.DataFrame):
        redaction_decision_output = pd.DataFrame()

    for annotation in all_annotations:
        image_path = annotation["image"]

        # Extract the page number from the image file name
        match = re.search(r'_(\d+)\.png$', image_path)
        if match:
            number = match.group(1)
            reported_number = int(number) + 1
        else:
            print("No number found before .png. Returning page 1.")
            reported_number = 1

        # Ensure the 'boxes' key exists in the annotation
        if 'boxes' not in annotation:
            annotation['boxes'] = []

        for box in annotation["boxes"]:
            if 'text' not in box:
                data_to_add = {"image": image_path, "page": reported_number, **box}
            else:
                data_to_add = {"image": image_path, "page": reported_number, "text": box['text'], **box}

            flattened_annotation_data.append(data_to_add)

    # Convert the flattened annotation data to a dataframe
    review_file_df = pd.DataFrame(flattened_annotation_data)

    if page_sizes:
        page_sizes_df = pd.DataFrame(page_sizes)
        page_sizes_df["page"] = page_sizes_df["page"].astype(int)


    # If coordinates are in absolute pixel values (>= 1), convert them to relative (0-1)
    # coordinates by dividing by the image dimensions
    if "xmin" in review_file_df.columns:
        if review_file_df["xmin"].max() >= 1 and review_file_df["xmax"].max() >= 1 and review_file_df["ymin"].max() >= 1 and review_file_df["ymax"].max() >= 1:

            review_file_df["page"] = review_file_df["page"].astype(int)

            if "image_width" not in review_file_df.columns and not page_sizes_df.empty:
                review_file_df = review_file_df.merge(page_sizes_df, on="page", how="left")

            if "image_width" in review_file_df.columns:
                review_file_df["xmin"] = review_file_df["xmin"] / review_file_df["image_width"]
                review_file_df["xmax"] = review_file_df["xmax"] / review_file_df["image_width"]
                review_file_df["ymin"] = review_file_df["ymin"] / review_file_df["image_height"]
                review_file_df["ymax"] = review_file_df["ymax"] / review_file_df["image_height"]

    # Do the same for the redaction decision output
    if not redaction_decision_output.empty:
        if redaction_decision_output["xmin"].max() >= 1 and redaction_decision_output["xmax"].max() >= 1 and redaction_decision_output["ymin"].max() >= 1 and redaction_decision_output["ymax"].max() >= 1:

            redaction_decision_output["page"] = redaction_decision_output["page"].astype(int)

            if "image_width" not in redaction_decision_output.columns and not page_sizes_df.empty:
                redaction_decision_output = redaction_decision_output.merge(page_sizes_df, on="page", how="left")

            if "image_width" in redaction_decision_output.columns:
                redaction_decision_output["xmin"] = redaction_decision_output["xmin"] / redaction_decision_output["image_width"]
                redaction_decision_output["xmax"] = redaction_decision_output["xmax"] / redaction_decision_output["image_width"]
                redaction_decision_output["ymin"] = redaction_decision_output["ymin"] / redaction_decision_output["image_height"]
                redaction_decision_output["ymax"] = redaction_decision_output["ymax"] / redaction_decision_output["image_height"]


    # Join the 'text' column from the redaction decision output onto the review file
    if not redaction_decision_output.empty:
        if 'text' not in redaction_decision_output.columns:
            redaction_decision_output['text'] = ''

        if 'text' not in review_file_df.columns:
            review_file_df['text'] = ''

        df1 = review_file_df.copy()
        df2 = redaction_decision_output.copy()

        # First, try an exact match on coordinates, label and page
        merge_keys = ['xmin', 'ymin', 'xmax', 'ymax', 'label', 'page']
        df1['key'] = df1[merge_keys].astype(str).agg('_'.join, axis=1)
        df2['key'] = df2[merge_keys].astype(str).agg('_'.join, axis=1)

        merged_df = df1.merge(df2[['key', 'text']], on='key', how='left', suffixes=('', '_duplicate'))

        # Keep the review file's text where it exists, otherwise use the redaction output's text
        merged_df['text'] = merged_df['text'].combine_first(merged_df.pop('text_duplicate'))

        # Second, use a nearest-neighbour search to match boxes whose coordinates differ only slightly
        tree = cKDTree(df2[['xmin', 'ymin', 'xmax', 'ymax']].values)
        query_coords = df1[['xmin', 'ymin', 'xmax', 'ymax']].values

        tolerance = 0.01
        distances, indices = tree.query(query_coords, distance_upper_bound=tolerance)

        for i, (dist, idx) in enumerate(zip(distances, indices)):
            if dist < tolerance and idx < len(df2):
                merged_df.at[i, 'text'] = df2.iloc[idx]['text']

        merged_df.drop(columns=['key'], inplace=True)

        review_file_df = merged_df

    # Ensure all expected columns are present, filling any missing ones with blanks, before selecting them in order
    for col in ["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text"]:
        if col not in review_file_df.columns:
            review_file_df[col] = ''

    review_file_df = review_file_df[["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text"]]

    # Convert colour lists to tuples (lists are not hashable, which can break sorting and grouping)
    review_file_df.loc[:, "color"] = review_file_df.loc[:, "color"].apply(lambda x: tuple(x) if isinstance(x, list) else x)

    review_file_df = review_file_df.sort_values(['page', 'ymin', 'xmin', 'label'])

    return review_file_df

def convert_review_df_to_annotation_json(review_file_df:pd.DataFrame, image_paths:List[Image.Image], page_sizes:List[dict]=[]) -> List[dict]:
    '''
    Convert a review csv to a json file for use by the Gradio Annotation object.
    '''

    if page_sizes:
        page_sizes_df = pd.DataFrame(page_sizes)

        if len(page_sizes_df.loc[page_sizes_df["image_width"].isnull(), "image_width"]) == len(page_sizes_df["image_width"]):
            print("No image dimensions found, converting first page.")

        elif len(page_sizes_df.loc[page_sizes_df["image_width"].isnull(), "image_width"]) == 0:
            if "image_width" not in review_file_df.columns:
                review_file_df = review_file_df.merge(page_sizes_df, how="left", on="page")

            if review_file_df["xmin"].max() <= 1 and review_file_df["xmax"].max() <= 1 and review_file_df["ymin"].max() <= 1 and review_file_df["ymax"].max() <= 1:
                review_file_df["xmin"] = review_file_df["xmin"] * review_file_df["image_width"]
                review_file_df["xmax"] = review_file_df["xmax"] * review_file_df["image_width"]
                review_file_df["ymin"] = review_file_df["ymin"] * review_file_df["image_height"]
                review_file_df["ymax"] = review_file_df["ymax"] * review_file_df["image_height"]

    review_file_df = review_file_df[["image", "page", "xmin", "ymin", "xmax", "ymax", "color", "label"]]

    review_file_df.loc[:, "color"] = review_file_df.loc[:, "color"].apply(lambda x: tuple(x) if isinstance(x, list) else x)

    grouped_csv_pages = review_file_df.groupby('page')

    json_data = []

    for page_no, pdf_image_path in enumerate(image_paths):
        reported_page_number = int(page_no + 1)

        if reported_page_number in review_file_df["page"].values:

            selected_csv_pages = grouped_csv_pages.get_group(reported_page_number)
            annotation_boxes = selected_csv_pages.drop(columns=['image', 'page']).to_dict(orient='records')

            annotation = {
                "image": pdf_image_path,
                "boxes": annotation_boxes
            }

        else:
            annotation = {}
            annotation["image"] = pdf_image_path
            annotation["boxes"] = []

        json_data.append(annotation)

    return json_data
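

# Illustrative usage sketch (not part of the app's call path): shows how a PDF could be
# prepared using the functions above. The file path below is hypothetical; the real
# application calls prepare_image_or_pdf from the Gradio interface instead.
if __name__ == "__main__":
    example_pdf = "example_document.pdf"  # hypothetical input file

    if is_pdf(example_pdf) and os.path.exists(example_pdf):
        # Convert each page to an image and report the number of pages prepared
        image_paths, widths, heights = process_file(example_pdf)
        print(f"Prepared {len(image_paths)} page image(s) from {example_pdf}")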