document_redaction / tools /redaction_review.py
seanpedrickcase's picture
Fixed issues with gradio version 5.16. Fixed fuzzy search error with pages with no data.
3cecbfa
import gradio as gr
import pandas as pd
import numpy as np
from xml.etree.ElementTree import Element, SubElement, tostring, parse
from xml.dom import minidom
import uuid
from typing import List
from gradio_image_annotation import image_annotator
from gradio_image_annotation.image_annotator import AnnotatedImageData
from tools.file_conversion import is_pdf, convert_review_json_to_pandas_df, CUSTOM_BOX_COLOUR
from tools.helper_functions import get_file_name_without_type, output_folder, detect_file_type
from tools.file_redaction import redact_page_with_pymupdf
import json
import os
import pymupdf
from fitz import Document
from PIL import ImageDraw, Image
from collections import defaultdict
Image.MAX_IMAGE_PIXELS = None
def decrease_page(number:int):
'''
Decrease page number for review redactions page.
'''
#print("number:", str(number))
if number > 1:
return number - 1, number - 1
else:
return 1, 1
def increase_page(number:int, image_annotator_object:AnnotatedImageData):
'''
Increase page number for review redactions page.
'''
if not image_annotator_object:
return 1, 1
max_pages = len(image_annotator_object)
if number < max_pages:
return number + 1, number + 1
else:
return max_pages, max_pages
def update_zoom(current_zoom_level:int, annotate_current_page:int, decrease:bool=True):
if decrease == False:
if current_zoom_level >= 70:
current_zoom_level -= 10
else:
if current_zoom_level < 110:
current_zoom_level += 10
return current_zoom_level, annotate_current_page
def remove_duplicate_images_with_blank_boxes(data: List[dict]) -> List[dict]:
'''
Remove items from the annotator object where the same page exists twice.
'''
# Group items by 'image'
image_groups = defaultdict(list)
for item in data:
image_groups[item['image']].append(item)
# Process each group to prioritize items with non-empty boxes
result = []
for image, items in image_groups.items():
# Filter items with non-empty boxes
non_empty_boxes = [item for item in items if item.get('boxes')]
# Remove 'text' elements from boxes
for item in non_empty_boxes:
if 'boxes' in item:
item['boxes'] = [{k: v for k, v in box.items() if k != 'text'} for box in item['boxes']]
if non_empty_boxes:
# Keep the first entry with non-empty boxes
result.append(non_empty_boxes[0])
else:
# If all items have empty or missing boxes, keep the first item
result.append(items[0])
return result
def get_recogniser_dataframe_out(image_annotator_object, recogniser_dataframe_gr):
recogniser_entities_list = ["Redaction"]
recogniser_entities_drop = gr.Dropdown(value="", choices=[""], allow_custom_value=True, interactive=True)
recogniser_dataframe_out = recogniser_dataframe_gr
try:
review_dataframe = convert_review_json_to_pandas_df(image_annotator_object)[["page", "label"]]
recogniser_entities = review_dataframe["label"].unique().tolist()
recogniser_entities.append("ALL")
recogniser_entities_for_drop = sorted(recogniser_entities)
recogniser_dataframe_out = gr.Dataframe(review_dataframe)
recogniser_entities_drop = gr.Dropdown(value=recogniser_entities_for_drop[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True)
recogniser_entities_list = [entity for entity in recogniser_entities_for_drop if entity != 'Redaction' and entity != 'ALL'] # Remove any existing 'Redaction'
recogniser_entities_list.insert(0, 'Redaction') # Add 'Redaction' to the start of the list
except Exception as e:
print("Could not extract recogniser information:", e)
recogniser_dataframe_out = recogniser_dataframe_gr
recogniser_entities_drop = gr.Dropdown(value="", choices=[""], allow_custom_value=True, interactive=True)
recogniser_entities_list = ["Redaction"]
return recogniser_dataframe_out, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list
def update_annotator(image_annotator_object:AnnotatedImageData, page_num:int, recogniser_entities_drop=gr.Dropdown(value="ALL", allow_custom_value=True), recogniser_dataframe_gr=gr.Dataframe(pd.DataFrame(data={"page":[], "label":[]})), zoom:int=100):
'''
Update a gradio_image_annotation object with new annotation data
'''
recogniser_entities_list = ["Redaction"]
recogniser_dataframe_out = pd.DataFrame()
if recogniser_dataframe_gr.empty:
recogniser_dataframe_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list = get_recogniser_dataframe_out(image_annotator_object, recogniser_dataframe_gr)
elif recogniser_dataframe_gr.iloc[0,0] == "":
recogniser_dataframe_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list = get_recogniser_dataframe_out(image_annotator_object, recogniser_dataframe_gr)
else:
review_dataframe = update_entities_df(recogniser_entities_drop, recogniser_dataframe_gr)
recogniser_dataframe_out = gr.Dataframe(review_dataframe)
recogniser_entities_list = recogniser_dataframe_gr["label"].unique().tolist()
recogniser_entities_list = sorted(recogniser_entities_list)
recogniser_entities_list = [entity for entity in recogniser_entities_list if entity != 'Redaction'] # Remove any existing 'Redaction'
recogniser_entities_list.insert(0, 'Redaction') # Add 'Redaction' to the start of the list
zoom_str = str(zoom) + '%'
recogniser_colour_list = [(0, 0, 0) for _ in range(len(recogniser_entities_list))]
if not image_annotator_object:
page_num_reported = 1
out_image_annotator = image_annotator(
None,
boxes_alpha=0.1,
box_thickness=1,
label_list=recogniser_entities_list,
label_colors=recogniser_colour_list,
show_label=False,
height=zoom_str,
width=zoom_str,
box_min_size=1,
box_selected_thickness=2,
handle_size=4,
sources=None,#["upload"],
show_clear_button=False,
show_share_button=False,
show_remove_button=False,
handles_cursor=True,
interactive=True
)
number_reported = gr.Number(label = "Page (press enter to change)", value=page_num_reported, precision=0)
return out_image_annotator, number_reported, number_reported, page_num_reported, recogniser_entities_drop, recogniser_dataframe_out, recogniser_dataframe_gr
#print("page_num at start of update_annotator function:", page_num)
if page_num is None:
page_num = 0
# Check bounding values for current page and page max
if page_num > 0:
page_num_reported = page_num
elif page_num == 0: page_num_reported = 1
else:
page_num = 0
page_num_reported = 1
page_max_reported = len(image_annotator_object)
if page_num_reported > page_max_reported:
page_num_reported = page_max_reported
image_annotator_object = remove_duplicate_images_with_blank_boxes(image_annotator_object)
out_image_annotator = image_annotator(
value = image_annotator_object[page_num_reported - 1],
boxes_alpha=0.1,
box_thickness=1,
label_list=recogniser_entities_list,
label_colors=recogniser_colour_list,
show_label=False,
height=zoom_str,
width=zoom_str,
box_min_size=1,
box_selected_thickness=2,
handle_size=4,
sources=None,#["upload"],
show_clear_button=False,
show_share_button=False,
show_remove_button=False,
handles_cursor=True,
interactive=True
)
number_reported = gr.Number(label = "Page (press enter to change)", value=page_num_reported, precision=0)
return out_image_annotator, number_reported, number_reported, page_num_reported, recogniser_entities_drop, recogniser_dataframe_out, recogniser_dataframe_gr
def modify_existing_page_redactions(image_annotated:AnnotatedImageData, current_page:int, previous_page:int, all_image_annotations:List[AnnotatedImageData], recogniser_entities_drop=gr.Dropdown(value="ALL", allow_custom_value=True),recogniser_dataframe=gr.Dataframe(pd.DataFrame(data={"page":[], "label":[]})), clear_all:bool=False):
'''
Overwrite current image annotations with modifications
'''
if not current_page:
current_page = 1
#If no previous page or is 0, i.e. first time run, then rewrite current page
#if not previous_page:
# previous_page = current_page
#print("image_annotated:", image_annotated)
image_annotated['image'] = all_image_annotations[previous_page - 1]["image"]
if clear_all == False:
all_image_annotations[previous_page - 1] = image_annotated
else:
all_image_annotations[previous_page - 1]["boxes"] = []
#print("all_image_annotations:", all_image_annotations)
# Rewrite all_image_annotations search dataframe with latest updates
try:
review_dataframe = convert_review_json_to_pandas_df(all_image_annotations)[["page", "label"]]
#print("review_dataframe['label']", review_dataframe["label"])
recogniser_entities = review_dataframe["label"].unique().tolist()
recogniser_entities.append("ALL")
recogniser_entities = sorted(recogniser_entities)
recogniser_dataframe_out = gr.Dataframe(review_dataframe)
#recogniser_dataframe_gr = gr.Dataframe(review_dataframe)
recogniser_entities_drop = gr.Dropdown(value=recogniser_entities_drop, choices=recogniser_entities, allow_custom_value=True, interactive=True)
except Exception as e:
print("Could not extract recogniser information:", e)
recogniser_dataframe_out = recogniser_dataframe
return all_image_annotations, current_page, current_page, recogniser_entities_drop, recogniser_dataframe_out
def apply_redactions(image_annotated:AnnotatedImageData, file_paths:List[str], doc:Document, all_image_annotations:List[AnnotatedImageData], current_page:int, review_file_state, save_pdf:bool=True, progress=gr.Progress(track_tqdm=True)):
'''
Apply modified redactions to a pymupdf and export review files
'''
#print("all_image_annotations:", all_image_annotations)
output_files = []
output_log_files = []
pdf_doc = []
#print("File paths in apply_redactions:", file_paths)
image_annotated['image'] = all_image_annotations[current_page - 1]["image"]
all_image_annotations[current_page - 1] = image_annotated
if not image_annotated:
print("No image annotations found")
return doc, all_image_annotations
if isinstance(file_paths, str):
file_paths = [file_paths]
for file_path in file_paths:
#print("file_path:", file_path)
file_name_without_ext = get_file_name_without_type(file_path)
file_name_with_ext = os.path.basename(file_path)
file_extension = os.path.splitext(file_path)[1].lower()
if save_pdf == True:
# If working with image docs
if (is_pdf(file_path) == False) & (file_extension not in '.csv'):
image = Image.open(file_paths[-1])
#image = pdf_doc
draw = ImageDraw.Draw(image)
for img_annotation_box in image_annotated['boxes']:
coords = [img_annotation_box["xmin"],
img_annotation_box["ymin"],
img_annotation_box["xmax"],
img_annotation_box["ymax"]]
fill = img_annotation_box["color"]
draw.rectangle(coords, fill=fill)
output_image_path = output_folder + file_name_without_ext + "_redacted.png"
image.save(output_folder + file_name_without_ext + "_redacted.png")
output_files.append(output_image_path)
print("Redactions saved to image file")
doc = [image]
elif file_extension in '.csv':
print("This is a csv")
pdf_doc = []
# If working with pdfs
elif is_pdf(file_path) == True:
pdf_doc = pymupdf.open(file_path)
orig_pdf_file_path = file_path
output_files.append(orig_pdf_file_path)
number_of_pages = pdf_doc.page_count
print("Saving pages to file.")
for i in progress.tqdm(range(0, number_of_pages), desc="Saving redactions to file", unit = "pages"):
#print("Saving page", str(i))
image_loc = all_image_annotations[i]['image']
#print("Image location:", image_loc)
# Load in image object
if isinstance(image_loc, np.ndarray):
image = Image.fromarray(image_loc.astype('uint8'))
#all_image_annotations[i]['image'] = image_loc.tolist()
elif isinstance(image_loc, Image.Image):
image = image_loc
#image_out_folder = output_folder + file_name_without_ext + "_page_" + str(i) + ".png"
#image_loc.save(image_out_folder)
#all_image_annotations[i]['image'] = image_out_folder
elif isinstance(image_loc, str):
image = Image.open(image_loc)
pymupdf_page = pdf_doc.load_page(i) #doc.load_page(current_page -1)
pymupdf_page = redact_page_with_pymupdf(pymupdf_page, all_image_annotations[i], image)
else:
print("File type not recognised.")
#try:
if pdf_doc:
out_pdf_file_path = output_folder + file_name_without_ext + "_redacted.pdf"
pdf_doc.save(out_pdf_file_path)
output_files.append(out_pdf_file_path)
else:
print("PDF input not found. Outputs not saved to PDF.")
# If save_pdf is not true, then add the original pdf to the output files
else:
if is_pdf(file_path) == True:
orig_pdf_file_path = file_path
output_files.append(orig_pdf_file_path)
try:
#print("Saving annotations to JSON")
out_annotation_file_path = output_folder + file_name_with_ext + '_review_file.json'
with open(out_annotation_file_path, 'w') as f:
json.dump(all_image_annotations, f)
output_log_files.append(out_annotation_file_path)
#print("Saving annotations to CSV review file")
#print("review_file_state:", review_file_state)
# Convert json to csv and also save this
review_df = convert_review_json_to_pandas_df(all_image_annotations, review_file_state)
out_review_file_file_path = output_folder + file_name_with_ext + '_review_file.csv'
review_df.to_csv(out_review_file_file_path, index=None)
output_files.append(out_review_file_file_path)
except Exception as e:
print("Could not save annotations to json or csv file:", e)
return doc, all_image_annotations, output_files, output_log_files
def get_boxes_json(annotations:AnnotatedImageData):
return annotations["boxes"]
def update_entities_df(choice:str, df:pd.DataFrame):
if choice=="ALL":
return df
else:
return df.loc[df["label"]==choice,:]
def df_select_callback(df: pd.DataFrame, evt: gr.SelectData):
row_value_page = evt.row_value[0] # This is the page number value
return row_value_page
def convert_image_coords_to_adobe(pdf_page_width, pdf_page_height, image_width, image_height, x1, y1, x2, y2):
'''
Converts coordinates from image space to Adobe PDF space.
Parameters:
- pdf_page_width: Width of the PDF page
- pdf_page_height: Height of the PDF page
- image_width: Width of the source image
- image_height: Height of the source image
- x1, y1, x2, y2: Coordinates in image space
Returns:
- Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space
'''
# Calculate scaling factors
scale_width = pdf_page_width / image_width
scale_height = pdf_page_height / image_height
# Convert coordinates
pdf_x1 = x1 * scale_width
pdf_x2 = x2 * scale_width
# Convert Y coordinates (flip vertical axis)
# Adobe coordinates start from bottom-left
pdf_y1 = pdf_page_height - (y1 * scale_height)
pdf_y2 = pdf_page_height - (y2 * scale_height)
# Make sure y1 is always less than y2 for Adobe's coordinate system
if pdf_y1 > pdf_y2:
pdf_y1, pdf_y2 = pdf_y2, pdf_y1
return pdf_x1, pdf_y1, pdf_x2, pdf_y2
def create_xfdf(df, pdf_path, pymupdf_doc, image_paths):
'''
Create an xfdf file from a review csv file and a pdf
'''
# Create root element
xfdf = Element('xfdf', xmlns="http://ns.adobe.com/xfdf/", xml_space="preserve")
# Add header
header = SubElement(xfdf, 'header')
header.set('pdf-filepath', pdf_path)
# Add annots
annots = SubElement(xfdf, 'annots')
for _, row in df.iterrows():
page_python_format = int(row["page"])-1
pymupdf_page = pymupdf_doc.load_page(page_python_format)
pdf_page_height = pymupdf_page.rect.height
pdf_page_width = pymupdf_page.rect.width
image = image_paths[page_python_format]
#print("image:", image)
if isinstance(image, str):
image = Image.open(image)
image_page_width, image_page_height = image.size
# Create redaction annotation
redact_annot = SubElement(annots, 'redact')
# Generate unique ID
annot_id = str(uuid.uuid4())
redact_annot.set('name', annot_id)
# Set page number (subtract 1 as PDF pages are 0-based)
redact_annot.set('page', str(int(row['page']) - 1))
# Convert coordinates
x1, y1, x2, y2 = convert_image_coords_to_adobe(
pdf_page_width,
pdf_page_height,
image_page_width,
image_page_height,
row['xmin'],
row['ymin'],
row['xmax'],
row['ymax']
)
if CUSTOM_BOX_COLOUR == "grey":
colour_str = "0.5,0.5,0.5"
else:
colour_str = row['color'].strip('()').replace(' ', '')
# Set coordinates
redact_annot.set('rect', f"{x1:.2f},{y1:.2f},{x2:.2f},{y2:.2f}")
# Set redaction properties
redact_annot.set('title', row['label']) # The type of redaction (e.g., "PERSON")
redact_annot.set('contents', row['text']) # The redacted text
redact_annot.set('subject', row['label']) # The redacted text
redact_annot.set('mimetype', "Form")
# Set appearance properties
redact_annot.set('border-color', colour_str) # Black border
redact_annot.set('repeat', 'false')
redact_annot.set('interior-color', colour_str)
#redact_annot.set('fill-color', colour_str)
#redact_annot.set('outline-color', colour_str)
#redact_annot.set('overlay-color', colour_str)
#redact_annot.set('overlay-text', row['label'])
redact_annot.set('opacity', "0.5")
# Add appearance dictionary
# appearanceDict = SubElement(redact_annot, 'appearancedict')
# # Normal appearance
# normal = SubElement(appearanceDict, 'normal')
# #normal.set('appearance', 'redact')
# # Color settings for the mark (before applying redaction)
# markAppearance = SubElement(redact_annot, 'markappearance')
# markAppearance.set('stroke-color', colour_str) # Red outline
# markAppearance.set('fill-color', colour_str) # Light red fill
# markAppearance.set('opacity', '0.5') # 50% opacity
# # Final redaction appearance (after applying)
# redactAppearance = SubElement(redact_annot, 'redactAppearance')
# redactAppearance.set('fillColor', colour_str) # Black fill
# redactAppearance.set('fontName', 'Helvetica')
# redactAppearance.set('fontSize', '12')
# redactAppearance.set('textAlignment', 'left')
# redactAppearance.set('textColor', colour_str) # White text
# Convert to pretty XML string
xml_str = minidom.parseString(tostring(xfdf)).toprettyxml(indent=" ")
return xml_str
def convert_df_to_xfdf(input_files:List[str], pdf_doc, image_paths):
'''
Load in files to convert a review file into an Adobe comment file format
'''
output_paths = []
pdf_name = ""
if isinstance(input_files, str):
file_paths_list = [input_files]
else:
file_paths_list = input_files
# Sort the file paths so that the pdfs come first
file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))
for file in file_paths_list:
if isinstance(file, str):
file_path = file
else:
file_path = file.name
file_path_name = get_file_name_without_type(file_path)
file_path_end = detect_file_type(file_path)
if file_path_end == "pdf":
pdf_name = os.path.basename(file_path)
if file_path_end == "csv":
# If no pdf name, just get the name of the file path
if not pdf_name:
pdf_name = file_path_name
# Read CSV file
df = pd.read_csv(file_path)
df.fillna('', inplace=True) # Replace NaN with an empty string
xfdf_content = create_xfdf(df, pdf_name, pdf_doc, image_paths)
output_path = output_folder + file_path_name + "_adobe.xfdf"
with open(output_path, 'w', encoding='utf-8') as f:
f.write(xfdf_content)
output_paths.append(output_path)
return output_paths
### Convert xfdf coordinates back to image for app
def convert_adobe_coords_to_image(pdf_page_width, pdf_page_height, image_width, image_height, x1, y1, x2, y2):
'''
Converts coordinates from Adobe PDF space to image space.
Parameters:
- pdf_page_width: Width of the PDF page
- pdf_page_height: Height of the PDF page
- image_width: Width of the source image
- image_height: Height of the source image
- x1, y1, x2, y2: Coordinates in Adobe PDF space
Returns:
- Tuple of converted coordinates (x1, y1, x2, y2) in image space
'''
# Calculate scaling factors
scale_width = image_width / pdf_page_width
scale_height = image_height / pdf_page_height
# Convert coordinates
image_x1 = x1 * scale_width
image_x2 = x2 * scale_width
# Convert Y coordinates (flip vertical axis)
# Adobe coordinates start from bottom-left
image_y1 = (pdf_page_height - y1) * scale_height
image_y2 = (pdf_page_height - y2) * scale_height
# Make sure y1 is always less than y2 for image's coordinate system
if image_y1 > image_y2:
image_y1, image_y2 = image_y2, image_y1
return image_x1, image_y1, image_x2, image_y2
def parse_xfdf(xfdf_path):
'''
Parse the XFDF file and extract redaction annotations.
Parameters:
- xfdf_path: Path to the XFDF file
Returns:
- List of dictionaries containing redaction information
'''
tree = parse(xfdf_path)
root = tree.getroot()
# Define the namespace
namespace = {'xfdf': 'http://ns.adobe.com/xfdf/'}
redactions = []
# Find all redact elements using the namespace
for redact in root.findall('.//xfdf:redact', namespaces=namespace):
#print("redact:", redact)
redaction_info = {
'image': '', # Image will be filled in later
'page': int(redact.get('page')) + 1, # Convert to 1-based index
'xmin': float(redact.get('rect').split(',')[0]),
'ymin': float(redact.get('rect').split(',')[1]),
'xmax': float(redact.get('rect').split(',')[2]),
'ymax': float(redact.get('rect').split(',')[3]),
'label': redact.get('title'),
'text': redact.get('contents'),
'color': redact.get('border-color', '(0, 0, 0)') # Default to black if not specified
}
redactions.append(redaction_info)
print("redactions:", redactions)
return redactions
def convert_xfdf_to_dataframe(file_paths_list, pymupdf_doc, image_paths):
'''
Convert redaction annotations from XFDF and associated images into a DataFrame.
Parameters:
- xfdf_path: Path to the XFDF file
- pdf_doc: PyMuPDF document object
- image_paths: List of PIL Image objects corresponding to PDF pages
Returns:
- DataFrame containing redaction information
'''
output_paths = []
xfdf_paths = []
df = pd.DataFrame()
#print("Image paths:", image_paths)
# Sort the file paths so that the pdfs come first
file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))
for file in file_paths_list:
if isinstance(file, str):
file_path = file
else:
file_path = file.name
file_path_name = get_file_name_without_type(file_path)
file_path_end = detect_file_type(file_path)
if file_path_end == "pdf":
pdf_name = os.path.basename(file_path)
#print("pymupdf_doc:", pymupdf_doc)
# Add pdf to outputs
output_paths.append(file_path)
if file_path_end == "xfdf":
if not pdf_name:
message = "Original PDF needed to convert from .xfdf format"
print(message)
raise ValueError(message)
xfdf_path = file
# if isinstance(xfdf_paths, str):
# xfdf_path = xfdf_paths.name
# else:
# xfdf_path = xfdf_paths[0].name
file_path_name = get_file_name_without_type(xfdf_path)
#print("file_path_name:", file_path_name)
# Parse the XFDF file
redactions = parse_xfdf(xfdf_path)
# Create a DataFrame from the redaction information
df = pd.DataFrame(redactions)
df.fillna('', inplace=True) # Replace NaN with an empty string
for _, row in df.iterrows():
page_python_format = int(row["page"])-1
pymupdf_page = pymupdf_doc.load_page(page_python_format)
pdf_page_height = pymupdf_page.rect.height
pdf_page_width = pymupdf_page.rect.width
image_path = image_paths[page_python_format]
#print("image_path:", image_path)
if isinstance(image_path, str):
image = Image.open(image_path)
image_page_width, image_page_height = image.size
# Convert to image coordinates
image_x1, image_y1, image_x2, image_y2 = convert_adobe_coords_to_image(pdf_page_width, pdf_page_height, image_page_width, image_page_height, row['xmin'], row['ymin'], row['xmax'], row['ymax'])
df.loc[_, ['xmin', 'ymin', 'xmax', 'ymax']] = [image_x1, image_y1, image_x2, image_y2]
# Optionally, you can add the image path or other relevant information
#print("Image path:", image_path)
df.loc[_, 'image'] = image_path
#print('row:', row)
out_file_path = output_folder + file_path_name + "_review_file.csv"
df.to_csv(out_file_path, index=None)
output_paths.append(out_file_path)
return output_paths