Spaces:
Runtime error
Runtime error
import spacy | |
import streamlit as st | |
import re | |
import logging | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_analyzer import AnalyzerEngine, PatternRecognizer | |
from annotated_text import annotated_text | |
from flair_recognizer import FlairRecognizer | |
############################### | |
#### Render Streamlit page #### | |
############################### | |
st.title("Anonymise your text!") | |
st.markdown( | |
"This mini-app anonymises text using Flair. You can find the code in the Files and versions tab above." | |
) | |
# Configure logger | |
logging.basicConfig(format="\n%(asctime)s\n%(message)s", level=logging.INFO, force=True) | |
############################## | |
###### Define functions ###### | |
############################## | |
def analyzer_engine(): | |
"""Return AnalyzerEngine.""" | |
analyzer = AnalyzerEngine() | |
flair_recognizer = FlairRecognizer() | |
analyzer.registry.add_recognizer(flair_recognizer) | |
return analyzer | |
def analyze(**kwargs): | |
"""Analyze input using Analyzer engine and input arguments (kwargs).""" | |
analyzer_engine = analyzer_engine() | |
if "entities" not in kwargs or "All" in kwargs["entities"]: | |
kwargs["entities"] = None | |
if st.session_state.excluded_words: | |
excluded_words_recognizer = PatternRecognizer(supported_entity="MANUAL ADD", | |
deny_list=st.session_state.excluded_words) | |
analyzer_engine.registry.add_recognizer(excluded_words_recognizer) | |
return analyzer_engine.analyze(**kwargs) | |
def annotate(): | |
text = st.session_state.text | |
analyze_results = st.session_state.analyze_results | |
tokens = [] | |
starts=[] | |
# sort by start index | |
results = sorted(analyze_results, key=lambda x: x.start) | |
for i, res in enumerate(results): | |
# if we already have an entity for this token don't add another | |
if res.start not in starts: | |
if i == 0: | |
tokens.append(text[:res.start]) | |
# append entity text and entity type | |
tokens.append((text[res.start: res.end], res.entity_type)) | |
# if another entity coming i.e. we're not at the last results element, add text up to next entity | |
if i != len(results) - 1: | |
tokens.append(text[res.end:results[i+1].start]) | |
# if no more entities coming, add all remaining text | |
else: | |
tokens.append(text[res.end:]) | |
# append this token to the list so we don't repeat results per token | |
starts.append(res.start) | |
return tokens | |
def get_supported_entities(): | |
"""Return supported entities from the Analyzer Engine.""" | |
return analyzer_engine().get_supported_entities() | |
def analyze_text(): | |
if not st.session_state.text: | |
st.session_state.text_error = "Please enter your text" | |
return | |
with text_spinner_placeholder: | |
with st.spinner("Please wait while your text is being analysed..."): | |
logging.info(f"This is the text being analysed: {st.session_state.text}") | |
st.session_state.text_error = "" | |
st.session_state.n_requests += 1 | |
analyze_results = analyze( | |
text=st.session_state.text, | |
entities=st_entities, | |
language="en", | |
return_decision_process=False, | |
) | |
# if st.session_state.excluded_words: | |
# analyze_results = include_manual_input(analyze_results) | |
if st.session_state.allowed_words: | |
analyze_results = exclude_manual_input(analyze_results) | |
st.session_state.analyze_results = analyze_results | |
logging.info( | |
f"analyse results: {st.session_state.analyze_results}\n" | |
) | |
# def include_manual_input(analyze_results): | |
# analyze_results_extended=analyze_results | |
# logging.info( | |
# f"analyse results before adding extra words: {analyze_results}\n" | |
# ) | |
# for word in st.session_state.excluded_words: | |
# if word in st.session_state.text: | |
# r = re.compile(word) | |
# index_entries = [[m.start(),m.end()] for m in r.finditer(st.session_state.text)] | |
# for entry in index_entries: | |
# start=entry[0] | |
# end=entry[1] | |
# analyze_results_extended.append("type": "MANUAL ADD", "start": start, "end": end, "score": 1.0}) | |
# logging.info( | |
# f"analyse results after adding allowed words: {analyze_results_extended}\n" | |
# ) | |
# logging.info( | |
# f"type of entries in results: {type(analyze_results[0])}\n" | |
# ) | |
# return analyze_results_extended | |
def exclude_manual_input(analyze_results): | |
analyze_results_fltered=[] | |
logging.info( | |
f"analyse results before removing allowed words: {analyze_results}\n" | |
) | |
for token in analyze_results: | |
if st.session_state.text[token.start:token.end] not in st.session_state.allowed_words: | |
analyze_results_fltered.append(token) | |
logging.info( | |
f"analyse results after removing allowed words: {analyze_results_fltered}\n" | |
) | |
return analyze_results_fltered | |
def anonymizer_engine(): | |
"""Return AnonymizerEngine.""" | |
return AnonymizerEngine() | |
def anonymise_text(): | |
if st.session_state.n_requests >= 50: | |
st.session_state.text_error = "Too many requests. Please wait a few seconds before anonymising more text." | |
logging.info(f"Session request limit reached: {st.session_state.n_requests}") | |
st.session_state.n_requests = 1 | |
st.session_state.text_error = "" | |
if not st.session_state.text: | |
st.session_state.text_error = "Please enter your text" | |
return | |
if not st.session_state.analyze_results: | |
analyze_text() | |
with text_spinner_placeholder: | |
with st.spinner("Please wait while your text is being anonymised..."): | |
anon_results = anonymizer_engine().anonymize(st.session_state.text, st.session_state.analyze_results) | |
st.session_state.text_error = "" | |
st.session_state.n_requests += 1 | |
st.session_state.anon_results = anon_results | |
logging.info( | |
f"text anonymised: {st.session_state.anon_results}" | |
) | |
def clear_results(): | |
st.session_state.anon_results="" | |
st.session_state.analyze_results="" | |
####################################### | |
#### Initialize "global" variables #### | |
####################################### | |
if "text_error" not in st.session_state: | |
st.session_state.text_error = "" | |
if "analyze_results" not in st.session_state: | |
st.session_state.analyze_results = "" | |
if "anon_results" not in st.session_state: | |
st.session_state.anon_results = "" | |
if "n_requests" not in st.session_state: | |
st.session_state.n_requests = 0 | |
############################## | |
####### Page arguments ####### | |
############################## | |
# Every widget with a key is automatically added to Session State as a global variable. | |
# In Streamlit, interacting with a widget triggers a rerun and variables defined | |
# in the code get reinitialized after each rerun. | |
# If a callback function is associated with a widget then a change in the widget | |
# triggers the following sequence: First the callback function is executed and then | |
# the app executes from top to bottom. | |
st.text_input( | |
label="Text", | |
placeholder="Write your text here", | |
key='text', | |
on_change=clear_results | |
) | |
st.text_input( | |
label="Data to be redacted (optional)", | |
placeholder="John, Mary, London", | |
key='excluded_words', | |
on_change=clear_results | |
) | |
st.text_input( | |
label="Data to be ignored (optional)", | |
placeholder="NHS, GEL, Lab", | |
key='allowed_words', | |
on_change=clear_results | |
) | |
st_entities = st.sidebar.multiselect( | |
label="Which entities to look for?", | |
options=get_supported_entities(), | |
default=list(get_supported_entities()), | |
) | |
############################## | |
######## Page buttons ######## | |
############################## | |
# button return true when clicked | |
col1, col2 = st.columns(2) | |
with col1: | |
analyze_now = st.button( | |
label="Analyse text", | |
type="primary", | |
on_click=analyze_text, | |
) | |
with col2: | |
anonymise_now = st.button( | |
label="Anonymise text", | |
type="primary", | |
on_click=anonymise_text, | |
) | |
############################## | |
######## Page actions ######## | |
############################## | |
text_spinner_placeholder = st.empty() | |
if st.session_state.text_error: | |
st.error(st.session_state.text_error) | |
with col1: | |
if st.session_state.analyze_results: | |
annotated_tokens=annotate() | |
annotated_text(*annotated_tokens) | |
st.write(st.session_state.analyze_results) | |
with col2: | |
if st.session_state.anon_results: | |
st.write(st.session_state.anon_results.text) |