anonymise_this / app.py
arogeriogel's picture
including metadata
c1e69f0 unverified
raw
history blame
9.59 kB
import spacy
import streamlit as st
import re
import logging
from presidio_anonymizer import AnonymizerEngine
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerResult
from annotated_text import annotated_text
from flair_recognizer import FlairRecognizer
###############################
#### Render Streamlit page ####
###############################
st.title("Anonymise your text!")
st.markdown(
"This mini-app anonymises text using Flair. You can find the code in the Files and versions tab above."
)
# Configure logger
logging.basicConfig(format="\n%(asctime)s\n%(message)s", level=logging.INFO, force=True)
##############################
###### Define functions ######
##############################
@st.cache(allow_output_mutation=True,show_spinner=False)
def analyzer_engine():
"""Return AnalyzerEngine."""
analyzer = AnalyzerEngine()
flair_recognizer = FlairRecognizer()
analyzer.registry.add_recognizer(flair_recognizer)
return analyzer
def analyze(**kwargs):
"""Analyze input using Analyzer engine and input arguments (kwargs)."""
analyzer_engine_instance = analyzer_engine()
if "entities" not in kwargs or "All" in kwargs["entities"]:
kwargs["entities"] = None
if st.session_state.excluded_words:
logging.info(
f"type of excluded_words_recognizer: {type(st.session_state.excluded_words)}\n"
)
excluded_words_recognizer = PatternRecognizer(supported_entity="MANUAL ADD",
name="Excluded words recognizer",
deny_list=st.session_state.excluded_words.split(','))
analyzer_engine_instance.registry.add_recognizer(excluded_words_recognizer)
return analyzer_engine_instance.analyze(**kwargs)
def annotate():
text = st.session_state.text
analyze_results = st.session_state.analyze_results
tokens = []
starts=[]
# sort by start index
results = sorted(analyze_results, key=lambda x: x.start)
for i, res in enumerate(results):
# if we already have an entity for this token don't add another
if res.start not in starts:
if i == 0:
tokens.append(text[:res.start])
# append entity text and entity type
tokens.append((text[res.start: res.end], res.entity_type))
# if another entity coming i.e. we're not at the last results element, add text up to next entity
if i != len(results) - 1:
tokens.append(text[res.end:results[i+1].start])
# if no more entities coming, add all remaining text
else:
tokens.append(text[res.end:])
# append this token to the list so we don't repeat results per token
starts.append(res.start)
return tokens
def get_supported_entities():
"""Return supported entities from the Analyzer Engine."""
return analyzer_engine().get_supported_entities()
def analyze_text():
if not st.session_state.text:
st.session_state.text_error = "Please enter your text"
return
with text_spinner_placeholder:
with st.spinner("Please wait while your text is being analysed..."):
logging.info(f"This is the text being analysed: {st.session_state.text}")
st.session_state.text_error = ""
st.session_state.n_requests += 1
analyze_results = analyze(
text=st.session_state.text,
entities=st_entities,
language="en",
return_decision_process=False,
)
# if st.session_state.excluded_words:
# analyze_results = include_manual_input(analyze_results)
if st.session_state.allowed_words:
analyze_results = exclude_manual_input(analyze_results)
st.session_state.analyze_results = analyze_results
logging.info(
f"analyse results: {st.session_state.analyze_results}\n"
)
# def include_manual_input(analyze_results):
# analyze_results_extended=analyze_results
# logging.info(
# f"analyse results before adding extra words: {analyze_results}\n"
# )
# for word in st.session_state.excluded_words:
# if word in st.session_state.text:
# r = re.compile(word)
# index_entries = [[m.start(),m.end()] for m in r.finditer(st.session_state.text)]
# for entry in index_entries:
# start=entry[0]
# end=entry[1]
# analyze_results_extended.append("type": "MANUAL ADD", "start": start, "end": end, "score": 1.0})
# logging.info(
# f"analyse results after adding allowed words: {analyze_results_extended}\n"
# )
# logging.info(
# f"type of entries in results: {type(analyze_results[0])}\n"
# )
# return analyze_results_extended
## We might be able to create a new result from json https://github.com/microsoft/presidio/blob/07b854dd7ae247b916aef4d2adbb82f33bba7be8/presidio-analyzer/presidio_analyzer/recognizer_result.py#L72
def exclude_manual_input(analyze_results):
analyze_results_fltered=[]
logging.info(
f"analyse results before removing allowed words: {analyze_results}\n"
)
for token in analyze_results:
if st.session_state.text[token.start:token.end] not in st.session_state.allowed_words:
analyze_results_fltered.append(token)
logging.info(
f"analyse results after removing allowed words: {analyze_results_fltered}\n"
)
return analyze_results_fltered
@st.cache(allow_output_mutation=True)
def anonymizer_engine():
"""Return AnonymizerEngine."""
return AnonymizerEngine()
def anonymise_text():
if st.session_state.n_requests >= 50:
st.session_state.text_error = "Too many requests. Please wait a few seconds before anonymising more text."
logging.info(f"Session request limit reached: {st.session_state.n_requests}")
st.session_state.n_requests = 1
st.session_state.text_error = ""
if not st.session_state.text:
st.session_state.text_error = "Please enter your text"
return
if not st.session_state.analyze_results:
analyze_text()
with text_spinner_placeholder:
with st.spinner("Please wait while your text is being anonymised..."):
anon_results = anonymizer_engine().anonymize(st.session_state.text, st.session_state.analyze_results)
st.session_state.text_error = ""
st.session_state.n_requests += 1
st.session_state.anon_results = anon_results
logging.info(
f"text anonymised: {st.session_state.anon_results}"
)
def clear_results():
st.session_state.anon_results=""
st.session_state.analyze_results=""
# if not st.session_state.excluded_words:
analyzer_engine().registry.remove_recognizer("Excluded words recognizer")
#######################################
#### Initialize "global" variables ####
#######################################
if "text_error" not in st.session_state:
st.session_state.text_error = ""
if "analyze_results" not in st.session_state:
st.session_state.analyze_results = ""
if "anon_results" not in st.session_state:
st.session_state.anon_results = ""
if "n_requests" not in st.session_state:
st.session_state.n_requests = 0
##############################
####### Page arguments #######
##############################
# Every widget with a key is automatically added to Session State as a global variable.
# In Streamlit, interacting with a widget triggers a rerun and variables defined
# in the code get reinitialized after each rerun.
# If a callback function is associated with a widget then a change in the widget
# triggers the following sequence: First the callback function is executed and then
# the app executes from top to bottom.
st.text_input(
label="Text",
placeholder="Write your text here",
key='text',
on_change=clear_results
)
st.text_input(
label="Data to be redacted (optional)",
placeholder="John, Mary, London",
key='excluded_words',
on_change=clear_results
)
st.text_input(
label="Data to be ignored (optional)",
placeholder="NHS, GEL, Lab",
key='allowed_words',
on_change=clear_results
)
st_entities = st.sidebar.multiselect(
label="Which entities to look for?",
options=get_supported_entities(),
default=list(get_supported_entities()),
)
##############################
######## Page buttons ########
##############################
# button return true when clicked
col1, col2 = st.columns(2)
with col1:
analyze_now = st.button(
label="Analyse text",
type="primary",
on_click=analyze_text,
)
with col2:
anonymise_now = st.button(
label="Anonymise text",
type="primary",
on_click=anonymise_text,
)
##############################
######## Page actions ########
##############################
text_spinner_placeholder = st.empty()
if st.session_state.text_error:
st.error(st.session_state.text_error)
with col1:
if st.session_state.analyze_results:
annotated_tokens=annotate()
annotated_text(*annotated_tokens)
st.write(st.session_state.analyze_results)
with col2:
if st.session_state.anon_results:
st.write(st.session_state.anon_results.text)