"""Streamlit mini-app that detects and anonymises PII in free text.

Entities are detected with Presidio's ``AnalyzerEngine`` extended by a Flair
recognizer; the user can additionally force words to be redacted or ignored.
"""

import logging
import re

import spacy
import streamlit as st
from annotated_text import annotated_text
from presidio_analyzer import AnalyzerEngine, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine

from flair_recognizer import FlairRecognizer

# Entity label attached to the words the user explicitly asks to redact.
MANUAL_ENTITY = "MANUAL ADD"
# Number of requests per session after which a "too many requests" error is shown.
MAX_REQUESTS = 50

###############################
#### Render Streamlit page ####
###############################

st.title("Anonymise your text!")
st.markdown(
    "This mini-app anonymises text using Flair. You can find the code in the Files and versions tab above."
)

# Configure logger
logging.basicConfig(format="\n%(asctime)s\n%(message)s", level=logging.INFO, force=True)

##############################
###### Define functions ######
##############################


def _split_words(raw):
    """Split a comma-separated text-input value into stripped, non-empty words."""
    return [word.strip() for word in (raw or "").split(",") if word.strip()]


@st.cache(allow_output_mutation=True, show_spinner=False)
def analyzer_engine():
    """Return a cached AnalyzerEngine with the Flair recognizer registered."""
    analyzer = AnalyzerEngine()
    flair_recognizer = FlairRecognizer()
    analyzer.registry.add_recognizer(flair_recognizer)
    return analyzer


def analyze(**kwargs):
    """Analyze input using the Analyzer engine and input arguments (kwargs).

    ``entities`` may be omitted, empty or contain ``"All"`` to search for every
    supported entity. Words typed into the "Data to be redacted" input are
    matched through a per-request deny-list recognizer.

    Returns the list of recognizer results produced by ``AnalyzerEngine.analyze``.
    """
    # A distinct local name is required: re-binding ``analyzer_engine`` here
    # would make it function-local and the call would raise UnboundLocalError.
    engine = analyzer_engine()

    entities = kwargs.get("entities")
    if not entities or "All" in entities:
        entities = None

    deny_list = _split_words(st.session_state.excluded_words)
    if deny_list:
        manual_recognizer = PatternRecognizer(
            supported_entity=MANUAL_ENTITY,
            deny_list=deny_list,
        )
        # Pass the recognizer for this request only; adding it to the cached
        # engine's registry would accumulate recognizers across reruns/users.
        kwargs["ad_hoc_recognizers"] = [
            *(kwargs.get("ad_hoc_recognizers") or []),
            manual_recognizer,
        ]
        # Results are filtered by the requested entities, so the manual entity
        # has to be part of that filter to be reported.
        if entities is not None and MANUAL_ENTITY not in entities:
            entities = [*entities, MANUAL_ENTITY]

    kwargs["entities"] = entities
    return engine.analyze(**kwargs)


def annotate():
    """Build the token list for ``annotated_text`` from the stored results.

    Returns a list mixing plain strings (unannotated text) and
    ``(text, entity_type)`` tuples, in document order. When several results
    overlap, only the first one (by start offset) is kept so that no piece of
    text is shown twice or dropped.
    """
    text = st.session_state.text
    results = sorted(st.session_state.analyze_results, key=lambda res: res.start)

    tokens = []
    cursor = 0  # index of the first character not yet emitted
    for res in results:
        # Skip results that start inside text already covered by an entity.
        if res.start < cursor:
            continue
        if res.start > cursor:
            tokens.append(text[cursor:res.start])
        tokens.append((text[res.start:res.end], res.entity_type))
        cursor = res.end

    # Add whatever text remains after the last entity.
    if cursor < len(text):
        tokens.append(text[cursor:])
    return tokens


def get_supported_entities():
    """Return supported entities from the Analyzer Engine."""
    return analyzer_engine().get_supported_entities()


def analyze_text():
    """Button callback: analyse the current text and store the results.

    Writes ``analyze_results`` (and ``text_error``) to the session state and
    increments the session request counter.
    """
    if not st.session_state.text:
        st.session_state.text_error = "Please enter your text"
        return

    with text_spinner_placeholder:
        with st.spinner("Please wait while your text is being analysed..."):
            # NOTE(review): this writes the raw, un-anonymised input to the
            # logs — confirm that is acceptable for this deployment.
            logging.info(f"This is the text being analysed: {st.session_state.text}")
            st.session_state.text_error = ""
            st.session_state.n_requests += 1
            analyze_results = analyze(
                text=st.session_state.text,
                entities=st_entities,
                language="en",
                return_decision_process=False,
            )
            if st.session_state.allowed_words:
                analyze_results = exclude_manual_input(analyze_results)
            st.session_state.analyze_results = analyze_results
            logging.info(
                f"analyse results: {st.session_state.analyze_results}\n"
            )


def exclude_manual_input(analyze_results):
    """Drop results whose matched text is one of the user's allowed words.

    The "Data to be ignored" input is read as a comma-separated list; a result
    is removed only when its exact text equals one of those words.
    """
    logging.info(
        f"analyse results before removing allowed words: {analyze_results}\n"
    )
    text = st.session_state.text
    allowed_words = set(_split_words(st.session_state.allowed_words))
    analyze_results_filtered = [
        result
        for result in analyze_results
        if text[result.start:result.end] not in allowed_words
    ]
    logging.info(
        f"analyse results after removing allowed words: {analyze_results_filtered}\n"
    )
    return analyze_results_filtered


@st.cache(allow_output_mutation=True)
def anonymizer_engine():
    """Return a cached AnonymizerEngine."""
    return AnonymizerEngine()


def anonymise_text():
    """Button callback: anonymise the current text and store the result.

    Runs the analysis first when no results are stored yet. Writes
    ``anon_results`` (and ``text_error``) to the session state.
    """
    if st.session_state.n_requests >= MAX_REQUESTS:
        st.session_state.text_error = "Too many requests. Please wait a few seconds before anonymising more text."
        logging.info(f"Session request limit reached: {st.session_state.n_requests}")
        st.session_state.n_requests = 1
        # Stop here, otherwise the error is cleared below and the limit is
        # never enforced.
        return

    st.session_state.text_error = ""
    if not st.session_state.text:
        st.session_state.text_error = "Please enter your text"
        return

    if not st.session_state.analyze_results:
        analyze_text()

    with text_spinner_placeholder:
        with st.spinner("Please wait while your text is being anonymised..."):
            anon_results = anonymizer_engine().anonymize(
                st.session_state.text,
                st.session_state.analyze_results,
            )
            st.session_state.text_error = ""
            st.session_state.n_requests += 1
            st.session_state.anon_results = anon_results
            logging.info(
                f"text anonymised: {st.session_state.anon_results}"
            )


def clear_results():
    """Widget callback: discard stale results when any input changes."""
    st.session_state.anon_results = ""
    st.session_state.analyze_results = ""


#######################################
#### Initialize "global" variables ####
#######################################

if "text_error" not in st.session_state:
    st.session_state.text_error = ""

if "analyze_results" not in st.session_state:
    st.session_state.analyze_results = ""

if "anon_results" not in st.session_state:
    st.session_state.anon_results = ""

if "n_requests" not in st.session_state:
    st.session_state.n_requests = 0

##############################
####### Page arguments #######
##############################

# Every widget with a key is automatically added to Session State as a global variable.
# In Streamlit, interacting with a widget triggers a rerun and variables defined
# in the code get reinitialized after each rerun.
# If a callback function is associated with a widget then a change in the widget
# triggers the following sequence: First the callback function is executed and then
# the app executes from top to bottom.

st.text_input(
    label="Text",
    placeholder="Write your text here",
    key='text',
    on_change=clear_results
)

st.text_input(
    label="Data to be redacted (optional)",
    placeholder="John, Mary, London",
    key='excluded_words',
    on_change=clear_results
)

st.text_input(
    label="Data to be ignored (optional)",
    placeholder="NHS, GEL, Lab",
    key='allowed_words',
    on_change=clear_results
)

# Query the engine once and reuse the answer for both options and default.
supported_entities = get_supported_entities()
st_entities = st.sidebar.multiselect(
    label="Which entities to look for?",
    options=supported_entities,
    default=list(supported_entities),
)

##############################
######## Page buttons ########
##############################

# st.button returns True when the button was clicked
col1, col2 = st.columns(2)
with col1:
    analyze_now = st.button(
        label="Analyse text",
        type="primary",
        on_click=analyze_text,
    )
with col2:
    anonymise_now = st.button(
        label="Anonymise text",
        type="primary",
        on_click=anonymise_text,
    )

##############################
######## Page actions ########
##############################

# Placeholder the callbacks use to show their spinner.
text_spinner_placeholder = st.empty()
if st.session_state.text_error:
    st.error(st.session_state.text_error)

with col1:
    if st.session_state.analyze_results:
        annotated_tokens = annotate()
        annotated_text(*annotated_tokens)
        st.write(st.session_state.analyze_results)
with col2:
    if st.session_state.anon_results:
        st.write(st.session_state.anon_results.text)