Spaces:
Sleeping
Sleeping
Update presidio_helpers.py
Browse files- presidio_helpers.py +61 -61
presidio_helpers.py
CHANGED
@@ -9,9 +9,7 @@ from presidio_analyzer import (
|
|
9 |
RecognizerResult,
|
10 |
RecognizerRegistry,
|
11 |
PatternRecognizer,
|
12 |
-
Pattern,
|
13 |
)
|
14 |
-
from presidio_analyzer.nlp_engine import NlpEngine
|
15 |
from presidio_anonymizer import AnonymizerEngine
|
16 |
from presidio_anonymizer.entities import OperatorConfig
|
17 |
|
@@ -21,14 +19,13 @@ logger = logging.getLogger("presidio-streamlit")
|
|
21 |
def nlp_engine_and_registry(
|
22 |
model_family: str,
|
23 |
model_path: str,
|
24 |
-
) -> Tuple[
|
25 |
"""Create the NLP Engine instance based on the requested model."""
|
26 |
registry = RecognizerRegistry()
|
27 |
|
28 |
-
|
29 |
-
|
30 |
-
|
31 |
-
try:
|
32 |
nlp = spacy.load(model_path)
|
33 |
registry.load_predefined_recognizers()
|
34 |
registry.add_recognizer_from_dict({
|
@@ -39,13 +36,8 @@ def nlp_engine_and_registry(
|
|
39 |
"package": "spacy",
|
40 |
})
|
41 |
return nlp, registry
|
42 |
-
|
43 |
-
|
44 |
-
raise
|
45 |
-
elif model_family.lower() == "flair":
|
46 |
-
from flair.models import SequenceTagger
|
47 |
-
from flair.data import Sentence
|
48 |
-
try:
|
49 |
tagger = SequenceTagger.load(model_path)
|
50 |
registry.load_predefined_recognizers()
|
51 |
registry.add_recognizer_from_dict({
|
@@ -56,12 +48,8 @@ def nlp_engine_and_registry(
|
|
56 |
"package": "flair",
|
57 |
})
|
58 |
return tagger, registry
|
59 |
-
|
60 |
-
|
61 |
-
raise
|
62 |
-
elif model_family.lower() == "huggingface":
|
63 |
-
from transformers import pipeline
|
64 |
-
try:
|
65 |
nlp = pipeline("ner", model=model_path, tokenizer=model_path)
|
66 |
registry.load_predefined_recognizers()
|
67 |
registry.add_recognizer_from_dict({
|
@@ -72,11 +60,11 @@ def nlp_engine_and_registry(
|
|
72 |
"package": "transformers",
|
73 |
})
|
74 |
return nlp, registry
|
75 |
-
|
76 |
-
|
77 |
-
|
78 |
-
|
79 |
-
raise
|
80 |
|
81 |
@st.cache_resource
|
82 |
def analyzer_engine(
|
@@ -110,24 +98,28 @@ def analyze(
|
|
110 |
deny_list: List[str],
|
111 |
) -> List[RecognizerResult]:
|
112 |
"""Analyze text for PHI entities."""
|
113 |
-
|
114 |
-
|
115 |
-
|
116 |
-
|
117 |
-
|
118 |
-
|
119 |
-
|
120 |
-
|
121 |
-
|
122 |
-
|
123 |
-
|
124 |
-
|
125 |
-
|
126 |
-
|
127 |
-
|
128 |
-
|
129 |
-
|
130 |
-
|
|
|
|
|
|
|
|
|
131 |
|
132 |
def anonymize(
|
133 |
text: str,
|
@@ -137,20 +129,24 @@ def anonymize(
|
|
137 |
number_of_chars: int = 15,
|
138 |
) -> dict:
|
139 |
"""Anonymize detected PHI entities in the text."""
|
140 |
-
|
141 |
-
|
142 |
-
|
143 |
-
|
144 |
-
|
145 |
-
|
146 |
-
"
|
147 |
-
|
148 |
-
|
149 |
-
|
150 |
-
|
151 |
-
|
152 |
-
|
153 |
-
|
|
|
|
|
|
|
|
|
154 |
|
155 |
def create_ad_hoc_deny_list_recognizer(
|
156 |
deny_list: Optional[List[str]] = None,
|
@@ -158,7 +154,11 @@ def create_ad_hoc_deny_list_recognizer(
|
|
158 |
"""Create a recognizer for deny list items."""
|
159 |
if not deny_list:
|
160 |
return None
|
161 |
-
|
162 |
-
|
163 |
-
|
164 |
-
|
|
|
|
|
|
|
|
|
|
9 |
RecognizerResult,
|
10 |
RecognizerRegistry,
|
11 |
PatternRecognizer,
|
|
|
12 |
)
|
|
|
13 |
from presidio_anonymizer import AnonymizerEngine
|
14 |
from presidio_anonymizer.entities import OperatorConfig
|
15 |
|
|
|
19 |
def nlp_engine_and_registry(
|
20 |
model_family: str,
|
21 |
model_path: str,
|
22 |
+
) -> Tuple[object, RecognizerRegistry]:
|
23 |
"""Create the NLP Engine instance based on the requested model."""
|
24 |
registry = RecognizerRegistry()
|
25 |
|
26 |
+
try:
|
27 |
+
if model_family.lower() == "spacy":
|
28 |
+
import spacy
|
|
|
29 |
nlp = spacy.load(model_path)
|
30 |
registry.load_predefined_recognizers()
|
31 |
registry.add_recognizer_from_dict({
|
|
|
36 |
"package": "spacy",
|
37 |
})
|
38 |
return nlp, registry
|
39 |
+
elif model_family.lower() == "flair":
|
40 |
+
from flair.models import SequenceTagger
|
|
|
|
|
|
|
|
|
|
|
41 |
tagger = SequenceTagger.load(model_path)
|
42 |
registry.load_predefined_recognizers()
|
43 |
registry.add_recognizer_from_dict({
|
|
|
48 |
"package": "flair",
|
49 |
})
|
50 |
return tagger, registry
|
51 |
+
elif model_family.lower() == "huggingface":
|
52 |
+
from transformers import pipeline
|
|
|
|
|
|
|
|
|
53 |
nlp = pipeline("ner", model=model_path, tokenizer=model_path)
|
54 |
registry.load_predefined_recognizers()
|
55 |
registry.add_recognizer_from_dict({
|
|
|
60 |
"package": "transformers",
|
61 |
})
|
62 |
return nlp, registry
|
63 |
+
else:
|
64 |
+
raise ValueError(f"Model family {model_family} not supported")
|
65 |
+
except Exception as e:
|
66 |
+
logger.error(f"Error loading model {model_path} for {model_family}: {str(e)}")
|
67 |
+
raise RuntimeError(f"Failed to load model: {str(e)}. Ensure model is downloaded and accessible.")
|
68 |
|
69 |
@st.cache_resource
|
70 |
def analyzer_engine(
|
|
|
98 |
deny_list: List[str],
|
99 |
) -> List[RecognizerResult]:
|
100 |
"""Analyze text for PHI entities."""
|
101 |
+
try:
|
102 |
+
results = analyzer.analyze(
|
103 |
+
text=text,
|
104 |
+
entities=entities,
|
105 |
+
language=language,
|
106 |
+
score_threshold=score_threshold,
|
107 |
+
return_decision_process=return_decision_process,
|
108 |
+
)
|
109 |
+
# Apply allow and deny lists
|
110 |
+
filtered_results = []
|
111 |
+
for result in results:
|
112 |
+
text_snippet = text[result.start:result.end].lower()
|
113 |
+
if any(word.lower() in text_snippet for word in allow_list):
|
114 |
+
continue
|
115 |
+
if any(word.lower() in text_snippet for word in deny_list):
|
116 |
+
filtered_results.append(result)
|
117 |
+
elif not deny_list:
|
118 |
+
filtered_results.append(result)
|
119 |
+
return filtered_results
|
120 |
+
except Exception as e:
|
121 |
+
logger.error(f"Analysis error: {str(e)}")
|
122 |
+
raise
|
123 |
|
124 |
def anonymize(
|
125 |
text: str,
|
|
|
129 |
number_of_chars: int = 15,
|
130 |
) -> dict:
|
131 |
"""Anonymize detected PHI entities in the text."""
|
132 |
+
try:
|
133 |
+
anonymizer = AnonymizerEngine()
|
134 |
+
operator_config = {
|
135 |
+
"DEFAULT": OperatorConfig(operator, {})
|
136 |
+
}
|
137 |
+
if operator == "mask":
|
138 |
+
operator_config["DEFAULT"] = OperatorConfig(operator, {
|
139 |
+
"masking_char": mask_char,
|
140 |
+
"chars_to_mask": number_of_chars,
|
141 |
+
})
|
142 |
+
return anonymizer.anonymize(
|
143 |
+
text=text,
|
144 |
+
analyzer_results=analyze_results,
|
145 |
+
operators=operator_config,
|
146 |
+
)
|
147 |
+
except Exception as e:
|
148 |
+
logger.error(f"Anonymization error: {str(e)}")
|
149 |
+
raise
|
150 |
|
151 |
def create_ad_hoc_deny_list_recognizer(
|
152 |
deny_list: Optional[List[str]] = None,
|
|
|
154 |
"""Create a recognizer for deny list items."""
|
155 |
if not deny_list:
|
156 |
return None
|
157 |
+
try:
|
158 |
+
deny_list_recognizer = PatternRecognizer(
|
159 |
+
supported_entity="GENERIC_PII", deny_list=deny_list
|
160 |
+
)
|
161 |
+
return deny_list_recognizer
|
162 |
+
except Exception as e:
|
163 |
+
logger.error(f"Error creating deny list recognizer: {str(e)}")
|
164 |
+
raise
|