import logging
import os
import base64
import pytz  # required for the US/Central timestamp helper below
from datetime import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_helpers import (
    analyzer_engine,
    get_supported_entities,
    analyze,
    anonymize,
)
st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)

dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")

# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_help_text = "Select Named Entity Recognition (NER) model for PHI detection."
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox(
    "NER model package",
    [model[0] for model in model_list],
    index=0,
    help=model_help_text,
)

# Display HuggingFace link for selected model
selected_model_url = next(url for model, url in model_list if model == st_model)
st.sidebar.markdown(f"[View model on HuggingFace]({selected_model_url})")

# Extract model package
st_model_package = st_model.split("/")[0]
# Strip the "HuggingFace/" prefix so the helper receives the bare model id,
# e.g. "HuggingFace/obi/deid_roberta_i2b2" -> "obi/deid_roberta_i2b2".
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Note: Models might take some time to download on first run.")

st_operator = st.sidebar.selectbox(
    "De-identification approach",
    ["replace", "redact", "mask"],
    index=0,
    help="Select PHI manipulation method.",
)
st_threshold = st.sidebar.slider(
    label="Acceptance threshold",
    min_value=0.0,
    max_value=1.0,
    value=0.35,
)
st_return_decision_process = st.sidebar.checkbox(
    "Add analysis explanations",
    value=False,
)

# Allow and deny lists
with st.sidebar.expander("Allowlists and denylists", expanded=False):
    st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
    st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")

# PDF processing functions
def get_timestamp_prefix():
    """Return a US/Central timestamp such as "0230PM_25-12-24" for filenames."""
    central = pytz.timezone("US/Central")
    now = datetime.now(central)
    return now.strftime("%I%M%p_%d-%m-%y").upper()
def save_pdf(pdf_input):
    """Save uploaded PDF to disk."""
    try:
        original_name = pdf_input.name
        with open(original_name, "wb") as f:
            f.write(pdf_input.read())
        return original_name
    except Exception as e:
        st.error(f"Failed to save PDF: {str(e)}")
        return None

def read_pdf(pdf_path):
    """Read text from a PDF using PyPDF2."""
    try:
        reader = PdfReader(pdf_path)
        text = ""
        for page in reader.pages:
            page_text = page.extract_text() or ""
            text += page_text + "\n"
        return text
    except Exception as e:
        st.error(f"Failed to read PDF: {str(e)}")
        return None
def create_pdf(text, input_path, output_filename):
    """Copy the input PDF's pages into a new file.

    Note: PyPDF2 cannot re-render page content, so the anonymized `text`
    argument is not drawn into the output; the original pages are copied as-is.
    """
    try:
        reader = PdfReader(input_path)
        writer = PdfWriter()
        for page in reader.pages:
            writer.add_page(page)
        with open(output_filename, "wb") as f:
            writer.write(f)
        return output_filename
    except Exception as e:
        st.error(f"Failed to create PDF: {str(e)}")
        return None
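
# A minimal sketch of how the anonymized text could actually be rendered into
# a fresh PDF, assuming the third-party `reportlab` package is installed. This
# helper is illustrative only and does not preserve the source PDF's layout.
def create_text_pdf(text, output_filename):
    """Write plain text onto letter-sized pages, one line at a time."""
    # Deferred imports so the app still runs when reportlab is not installed.
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas

    pdf_canvas = canvas.Canvas(output_filename, pagesize=letter)
    width, height = letter
    y = height - 40
    for line in text.splitlines():
        if y < 40:  # page is full: start a new one
            pdf_canvas.showPage()
            y = height - 40
        pdf_canvas.drawString(40, y, line)
        y -= 14  # move down one line
    pdf_canvas.save()
    return output_filename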
# Main panel
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        try:
            # Save PDF to disk
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("Failed to save PDF")
            # Read PDF
            text = read_pdf(pdf_path)
            if not text:
                raise ValueError("No text extracted from PDF")
            # Initialize analyzer
            try:
                analyzer = analyzer_engine(*analyzer_params)
            except Exception as e:
                st.error(f"Failed to load model: {str(e)}")
                st.info("Ensure models are downloaded and check network/permissions.")
                raise
            # Analyze
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            # Process results (anonymization has not run yet at this point)
            phi_types = set(res.entity_type for res in st_analyze_results)
            if phi_types:
                st.success(f"Detected PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI detected")
            # Anonymize
            anonymized_result = anonymize(
                text=text,
                operator=st_operator,
                analyze_results=st_analyze_results,
            )
            # Generate output filename with timestamp
            timestamp = get_timestamp_prefix()
            output_filename = f"{timestamp}_{uploaded_file.name}"
            # Create new PDF
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("Failed to generate PDF")
            # Generate base64 download link
            try:
                with open(output_filename, "rb") as f:
                    pdf_bytes = f.read()
                b64 = base64.b64encode(pdf_bytes).decode()
                href = f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>'
                st.markdown(href, unsafe_allow_html=True)
            except Exception as e:
                st.error(f"Error generating download link: {str(e)}")
                raise
            # Display findings
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {
                            "entity_type": "Entity type",
                            "text": "Text",
                            "start": "Start",
                            "end": "End",
                            "score": "Confidence",
                        },
                        axis=1,
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records(
                            [r.analysis_explanation.to_dict() for r in st_analyze_results]
                        )
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
            # Clean up temporary file
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
            logger.error(f"Processing error: {str(e)}")
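
# To try the app locally (assuming this file is saved as app.py and the
# dependencies above, including this repo's presidio_helpers module, are
# installed):
#
#   streamlit run app.py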