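# --- Hugging Face file-viewer header captured with the source (not part of app.py) ---
# awacke1's picture
# Update app.py
# 77c02fb verified
# raw
# history blame
# 7.85 kB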
import base64
import datetime
import html
import logging
import os
import textwrap

import dotenv
import pandas as pd
import streamlit as st
from PyPDF2 import PdfReader, PdfWriter
from streamlit_tags import st_tags

from presidio_helpers import (
    analyzer_engine,
    get_supported_entities,
    analyze,
    anonymize,
)
# Page-level Streamlit settings: wide layout, sidebar open by default, and an
# "About" menu entry pointing at the Presidio documentation.
st.set_page_config(
    layout="wide",
    page_title="Presidio PHI De-identification",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
    initial_sidebar_state="expanded",
)

# Load variables from a local .env file (if any) into the environment.
dotenv.load_dotenv()

# Logger shared by the rest of this script.
logger = logging.getLogger("presidio-streamlit")
# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_help_text = "Select Named Entity Recognition (NER) model for PHI detection."

# (selectbox label, model card URL) pairs. The first path segment of the label
# is treated below as the "model package".
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    (
        "HuggingFace/StanfordAIMI/stanford-deidentifier-base",
        "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base",
    ),
]
st_model = st.sidebar.selectbox(
    "NER model package",
    [model[0] for model in model_list],
    index=0,
    help=model_help_text,
)

# Display HuggingFace link for selected model
selected_model_url = next(url for model, url in model_list if model == st_model)
st.sidebar.markdown(f"[View model on HuggingFace]({selected_model_url})")

# Extract model package: the text before the first "/" is the package, the
# remainder is the model path.
st_model_package, _, model_path = st_model.partition("/")
# Exact comparison on purpose: the previous `not in ("huggingface")` tested
# against a plain string (no trailing comma, so not a tuple), which is a
# substring check and would also match packages such as "face" or "".
if st_model_package.lower() == "huggingface":
    # Strip the "HuggingFace/" prefix so only the model path is passed on.
    st_model = model_path
analyzer_params = (st_model_package, st_model)

st.sidebar.warning("Note: Models might take some time to download on first run.")
# How detected PHI is rewritten in the output text.
st_operator = st.sidebar.selectbox(
    label="De-identification approach",
    options=["replace", "redact", "mask"],
    index=0,
    help="Select PHI manipulation method.",
)

# Score threshold forwarded to the analyzer as `score_threshold`.
st_threshold = st.sidebar.slider(
    "Acceptance threshold",
    min_value=0.0,
    max_value=1.0,
    value=0.35,
)

# When ticked, the analyzer is asked to return its decision process and the
# findings table gains the explanation columns.
st_return_decision_process = st.sidebar.checkbox(
    label="Add analysis explanations",
    value=False,
)

# Allow and deny lists
with st.sidebar.expander("Allowlists and denylists", expanded=False):
    st_allow_list = st_tags(
        label="Add words to allowlist",
        text="Enter word and press enter.",
    )
    st_deny_list = st_tags(
        label="Add words to denylist",
        text="Enter word and press enter.",
    )
# PDF processing functions
def get_timestamp_prefix():
    """Return the current US Central time formatted as a filename prefix.

    The format is ``%I%M%p_%d-%m-%y`` upper-cased, i.e. 12-hour clock time,
    AM/PM marker, then day-month-two-digit-year (for example
    ``0230PM_05-11-24``).

    Returns:
        str: The formatted timestamp.
    """
    try:
        # zoneinfo is in the standard library (Python 3.9+), so the
        # never-imported third-party ``pytz`` is not needed.
        from zoneinfo import ZoneInfo

        central = ZoneInfo("America/Chicago")  # canonical key for US/Central
    except (ImportError, KeyError):
        # ImportError: interpreter older than 3.9.
        # KeyError: ZoneInfoNotFoundError (a KeyError subclass) when the host
        # has no time-zone database. Fall back to fixed CST (UTC-6); this
        # ignores daylight saving time but keeps the app working.
        central = datetime.timezone(datetime.timedelta(hours=-6), "CST")
    # The file does ``import datetime`` (the module), so the class must be
    # reached as ``datetime.datetime``.
    now = datetime.datetime.now(central)
    return now.strftime("%I%M%p_%d-%m-%y").upper()
def save_pdf(pdf_input):
    """Save an uploaded PDF into the current working directory.

    Args:
        pdf_input: Uploaded file object exposing a ``name`` attribute and a
            ``read()`` method returning bytes (in this script, the value
            returned by ``st.file_uploader``).

    Returns:
        The path that was written (the base name of the upload), or ``None``
        if saving failed; failures are reported through ``st.error``.
    """
    try:
        # The name is supplied by the client, so keep only its final path
        # component: directory parts such as "../" must never decide where
        # the file lands on disk.
        safe_name = os.path.basename(pdf_input.name)
        if not safe_name:
            raise ValueError("uploaded file has no usable name")
        with open(safe_name, "wb") as f:
            f.write(pdf_input.read())
        return safe_name
    except Exception as e:
        # UI boundary: surface the problem to the user and signal failure
        # with None, which the caller checks for.
        st.error(f"Failed to save PDF: {str(e)}")
        return None
def read_pdf(pdf_path):
    """Extract the text of every page of a PDF using PyPDF2.

    Args:
        pdf_path: Path (or other input accepted by ``PdfReader``) of the PDF.

    Returns:
        The text of all pages, each page followed by a newline, or ``None``
        if reading failed; failures are reported through ``st.error``.
    """
    try:
        pdf_pages = PdfReader(pdf_path).pages
        # A page whose extract_text() result is falsy contributes an empty
        # string, so it still yields its trailing newline.
        return "".join(
            (pdf_page.extract_text() or "") + "\n" for pdf_page in pdf_pages
        )
    except Exception as err:
        st.error(f"Failed to read PDF: {str(err)}")
        return None
def _pdf_literal(line):
    """Return *line* as the escaped byte payload of a PDF literal string."""
    # The font below uses WinAnsiEncoding, which covers cp1252 only; any
    # character outside it is written as "?" rather than failing.
    raw = line.encode("cp1252", errors="replace")
    # Backslash first, so the escapes added for parentheses are not doubled.
    return raw.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)")


def _render_text_pdf(text):
    """Build a minimal multi-page PDF (as bytes) showing *text* in 10pt Courier."""
    page_w, page_h, margin = 612, 792, 54  # US Letter in points, 0.75in margins
    font_size, leading = 10, 12
    # Courier glyphs are 0.6 em wide, so wrapping by character count is exact.
    max_chars = (page_w - 2 * margin) * 10 // (font_size * 6)
    lines_per_page = (page_h - 2 * margin) // leading

    lines = []
    for raw_line in text.splitlines():
        # wrap() returns [] for blank input; keep the blank line instead.
        lines.extend(textwrap.wrap(raw_line, width=max_chars) or [""])
    # Always emit at least one (possibly empty) page so the file is valid.
    pages = [
        lines[i:i + lines_per_page] for i in range(0, len(lines), lines_per_page)
    ] or [[]]

    # Object numbers: 1 = catalog, 2 = page tree, 3 = font, then a
    # (page, content stream) pair per page.
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        b"",  # page tree, filled in once the page object numbers are known
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier"
        b" /Encoding /WinAnsiEncoding >>",
    ]
    kids = []
    for page_lines in pages:
        ops = [
            b"BT",
            b"/F1 %d Tf" % font_size,
            b"%d TL" % leading,
            b"%d %d Td" % (margin, page_h - margin - font_size),
        ]
        # Show each line, then move down one leading with T*.
        ops.extend(b"(" + _pdf_literal(line) + b") Tj T*" for line in page_lines)
        ops.append(b"ET")
        stream = b"\n".join(ops)

        page_number = len(objects) + 1
        kids.append(b"%d 0 R" % page_number)
        objects.append(
            b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 %d %d]"
            b" /Resources << /Font << /F1 3 0 R >> >> /Contents %d 0 R >>"
            % (page_w, page_h, page_number + 1)
        )
        objects.append(
            b"<< /Length %d >>\nstream\n" % len(stream) + stream + b"\nendstream"
        )
    objects[1] = b"<< /Type /Pages /Kids [%s] /Count %d >>" % (
        b" ".join(kids),
        len(kids),
    )

    # Serialise the objects, recording byte offsets for the xref table.
    out = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(objects, start=1):
        offsets.append(len(out))
        out += b"%d 0 obj\n" % number + body + b"\nendobj\n"
    xref_at = len(out)
    out += b"xref\n0 %d\n" % (len(objects) + 1)
    out += b"0000000000 65535 f \n"  # each xref entry is exactly 20 bytes
    for offset in offsets:
        out += b"%010d 00000 n \n" % offset
    out += b"trailer\n<< /Size %d /Root 1 0 R >>\nstartxref\n%d\n%%%%EOF\n" % (
        len(objects) + 1,
        xref_at,
    )
    return bytes(out)


def create_pdf(text, input_path, output_filename):
    """Write a new PDF containing the anonymized *text*.

    The previous implementation ignored ``text`` and copied the pages of the
    original PDF unchanged, so the "de-identified" file still contained every
    piece of PHI. The output is now rendered only from ``text``; the original
    layout, fonts and images are not carried over.

    Args:
        text: Anonymized text to place in the PDF (``None`` is treated as "").
        input_path: Path of the original PDF. Kept for interface
            compatibility; it is deliberately not read, because its pages
            hold the un-redacted content.
        output_filename: Path of the PDF file to create.

    Returns:
        ``output_filename`` on success, or ``None`` if writing failed;
        failures are reported through ``st.error``.
    """
    try:
        pdf_bytes = _render_text_pdf(text or "")
        with open(output_filename, "wb") as f:
            f.write(pdf_bytes)
        return output_filename
    except Exception as e:
        st.error(f"Failed to create PDF: {str(e)}")
        return None
# Main panel
# Two-column layout: upload and download on the left, findings on the right.
col1, col2 = st.columns(2)

with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        # Tracked outside the try block so the finally clause can remove
        # whatever was written, even when processing fails part-way.
        pdf_path = None
        output_filename = None
        try:
            # Save PDF to disk
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("Failed to save PDF")

            # Read PDF
            text = read_pdf(pdf_path)
            # read_pdf appends a newline per page, so a PDF without any
            # extractable text still yields a non-empty, whitespace-only
            # string; treat that as "nothing extracted" too.
            if not text or not text.strip():
                raise ValueError("No text extracted from PDF")

            # Initialize analyzer
            try:
                analyzer = analyzer_engine(*analyzer_params)
            except Exception as e:
                st.error(f"Failed to load model: {str(e)}")
                st.info("Ensure models are downloaded and check network/permissions.")
                raise

            # Analyze
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )

            # Process results
            phi_types = {res.entity_type for res in st_analyze_results}
            if phi_types:
                # Sorted so the message is stable between reruns.
                st.success(f"Removed PHI types: {', '.join(sorted(phi_types))}")
            else:
                st.info("No PHI detected")

            # Anonymize
            anonymized_result = anonymize(
                text=text,
                operator=st_operator,
                analyze_results=st_analyze_results,
            )

            # Generate output filename with timestamp. The upload name comes
            # from the client, so only its final path component is used.
            timestamp = get_timestamp_prefix()
            output_filename = f"{timestamp}_{os.path.basename(uploaded_file.name)}"

            # Create new PDF
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("Failed to generate PDF")

            # Generate base64 download link
            try:
                with open(output_filename, "rb") as f:
                    pdf_bytes = f.read()
                b64 = base64.b64encode(pdf_bytes).decode()
                # The file name ends up inside raw HTML (unsafe_allow_html),
                # so escape it to keep a crafted upload name from injecting
                # markup or extra attributes.
                safe_download_name = html.escape(output_filename, quote=True)
                href = f'<a href="data:application/pdf;base64,{b64}" download="{safe_download_name}">Download de-identified PDF</a>'
                st.markdown(href, unsafe_allow_html=True)
            except Exception as e:
                st.error(f"Error generating download link: {str(e)}")
                raise

            # Display findings
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    # Add the matched substring for each finding.
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {
                            "entity_type": "Entity type",
                            "text": "Text",
                            "start": "Start",
                            "end": "End",
                            "score": "Confidence",
                        },
                        axis=1,
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records(
                            [r.analysis_explanation.to_dict() for r in st_analyze_results]
                        )
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
            # logger.exception records the traceback along with the message.
            logger.exception("Processing error: %s", e)
        finally:
            # Clean up temporary files on every path: the saved upload holds
            # raw PHI, and the generated PDF has already been read into the
            # download link, so neither should stay on disk.
            for leftover in (pdf_path, output_filename):
                if leftover and os.path.exists(leftover):
                    try:
                        os.remove(leftover)
                    except OSError:
                        logger.warning("Could not remove temporary file %s", leftover)