Spaces:
Runtime error
Runtime error
import gradio as gr | |
import pandas as pd | |
import json | |
from collections import defaultdict | |
# Create tokenizer for biomed model | |
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification | |
tokenizer = AutoTokenizer.from_pretrained("d4data/biomedical-ner-all") # https://huggingface.co/d4data/biomedical-ner-all?text=asthma | |
model = AutoModelForTokenClassification.from_pretrained("d4data/biomedical-ner-all") | |
pipe = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple") | |
# Matplotlib for entity graph | |
import matplotlib.pyplot as plt | |
plt.switch_backend("Agg") | |
# Load examples from JSON | |
import os | |
# Load terminology datasets: | |
basedir = os.path.dirname(__file__) | |
dataLOINC = pd.read_csv(basedir + "\\" + f'LoincTableCore.csv') | |
dataPanels = pd.read_csv(basedir + "\\" + f'PanelsAndForms-ACW1208Labeled.csv') | |
dataSNOMED = pd.read_csv(basedir + "\\" + f'sct2_TextDefinition_Full-en_US1000124_20220901.txt',sep='\t') | |
dataOMS = pd.read_csv(basedir + "\\" + f'SnomedOMS.csv') | |
dataICD10 = pd.read_csv(basedir + "\\" + f'ICD10Diagnosis.csv') | |
dir_path = os.path.dirname(os.path.realpath(__file__)) | |
EXAMPLES = {} | |
with open(dir_path + "\\" + "examples.json", "r") as f: | |
example_json = json.load(f) | |
EXAMPLES = {x["text"]: x["label"] for x in example_json} | |
def MatchLOINC(name): | |
#basedir = os.path.dirname(__file__) | |
pd.set_option("display.max_rows", None) | |
#data = pd.read_csv(basedir + "\\" + f'LoincTableCore.csv') | |
data = dataLOINC | |
swith=data.loc[data['COMPONENT'].str.contains(name, case=False, na=False)] | |
return swith | |
def MatchLOINCPanelsandForms(name): | |
#basedir = os.path.dirname(__file__) | |
#data = pd.read_csv(basedir + "\\" + f'PanelsAndForms-ACW1208Labeled.csv') | |
data = dataPanels | |
# Assessment Name: | |
#swith=data.loc[data['ParentName'].str.contains(name, case=False, na=False)] | |
# Assessment Question: | |
swith=data.loc[data['LoincName'].str.contains(name, case=False, na=False)] | |
return swith | |
def MatchSNOMED(name): | |
#basedir = os.path.dirname(__file__) | |
#data = pd.read_csv(basedir + "\\" + f'sct2_TextDefinition_Full-en_US1000124_20220901.txt',sep='\t') | |
data = dataSNOMED | |
swith=data.loc[data['term'].str.contains(name, case=False, na=False)] | |
return swith | |
def MatchOMS(name): | |
#basedir = os.path.dirname(__file__) | |
#data = pd.read_csv(basedir + "\\" + f'SnomedOMS.csv') | |
data = dataOMS | |
swith=data.loc[data['SNOMED CT'].str.contains(name, case=False, na=False)] | |
return swith | |
def MatchICD10(name): | |
#basedir = os.path.dirname(__file__) | |
#data = pd.read_csv(basedir + "\\" + f'ICD10Diagnosis.csv') | |
data = dataICD10 | |
swith=data.loc[data['Description'].str.contains(name, case=False, na=False)] | |
return swith | |
def SaveResult(text, outputfileName): | |
#try: | |
basedir = os.path.dirname(__file__) | |
savePath = outputfileName | |
print("Saving: " + text + " to " + savePath) | |
from os.path import exists | |
file_exists = exists(savePath) | |
if file_exists: | |
with open(outputfileName, "a") as f: #append | |
#for line in text: | |
f.write(str(text.replace("\n"," "))) | |
f.write('\n') | |
else: | |
with open(outputfileName, "w") as f: #write | |
#for line in text: | |
f.write(str(text.replace("\n"," "))) | |
f.write('\n') | |
#except ValueError as err: | |
# raise ValueError("File Save Error in SaveResult \n" + format_tb(err.__traceback__)[0] + err.args[0] + "\nEnd of error message.") from None | |
return | |
def loadFile(filename): | |
try: | |
basedir = os.path.dirname(__file__) | |
loadPath = basedir + "\\" + filename | |
print("Loading: " + loadPath) | |
from os.path import exists | |
file_exists = exists(loadPath) | |
if file_exists: | |
with open(loadPath, "r") as f: #read | |
contents = f.read() | |
print(contents) | |
return contents | |
except ValueError as err: | |
raise ValueError("File Save Error in SaveResult \n" + format_tb(err.__traceback__)[0] + err.args[0] + "\nEnd of error message.") from None | |
return "" | |
def get_today_filename(): | |
from datetime import datetime | |
date = datetime.now().strftime("%Y_%m_%d-%I.%M.%S.%p") | |
#print(f"filename_{date}") 'filename_2023_01_12-03-29-22_AM' | |
return f"MedNER_{date}.csv" | |
def get_base(filename): | |
basedir = os.path.dirname(__file__) | |
loadPath = basedir + "\\" + filename | |
#print("Loading: " + loadPath) | |
return loadPath | |
def group_by_entity(raw): | |
outputFile = get_base(get_today_filename()) | |
out = defaultdict(int) | |
for ent in raw: | |
out[ent["entity_group"]] += 1 | |
myEntityGroup = ent["entity_group"] | |
print("Found entity group type: " + myEntityGroup) | |
if (myEntityGroup in ['Sign_symptom', 'Detailed_description', 'History', 'Activity', 'Medication' ]): | |
eterm = ent["word"].replace('#','') | |
minlength = 3 | |
if len(eterm) > minlength: | |
print("Found eterm: " + eterm) | |
eterm.replace("#","") | |
g1=MatchLOINC(eterm) | |
g2=MatchLOINCPanelsandForms(eterm) | |
g3=MatchSNOMED(eterm) | |
g4=MatchOMS(eterm) | |
g5=MatchICD10(eterm) | |
sAll = "" | |
print("Saving to output file " + outputFile) | |
# Create harmonisation output format of input to output code, name, Text | |
try: # 18 fields, output to labeled CSV dataset for results teaching on scored regret changes to action plan with data inputs | |
col = " 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19" | |
#LOINC | |
g11 = g1['LOINC_NUM'].to_string().replace(","," ") | |
g12 = g1['COMPONENT'].to_string().replace(","," ") | |
s1 = ("LOINC Terms of entity ," + myEntityGroup + ", with term ," + eterm + ", LOINC codes of ," + g11 + ", and LOINC questions of ," + g12 + ", Label,Value, Label,Value, Label,Value ") | |
SaveResult(s1, outputFile) | |
#LOINC Panels | |
g21 = g2['Loinc'].to_string().replace(","," ") | |
g22 = g2['LoincName'].to_string().replace(","," ") | |
g23 = g2['ParentLoinc'].to_string().replace(","," ") | |
g24 = g2['ParentName'].to_string().replace(","," ") | |
s2 = ("LOINC Panels of entity ," + myEntityGroup + ", with term ," + eterm + ", LOINC codes of ," + g21 + ", and LOINC name of ," + g22 + ", and Parent codes of ," + g23 + ", with Parent names of ," + g24 + ", Label,Value ") | |
SaveResult(s2, outputFile) | |
#SNOMED | |
g31 = g3['conceptId'].to_string().replace(","," ") | |
g32 = g3['term'].to_string().replace(","," ") | |
s3 = ("SNOMED Terms of entity ," + myEntityGroup + ", with term ," + eterm + ", SNOMED concepts of ," + g31 + ", and SNOMED terms of ," + g32 + ", Label,Value, Label,Value, Label,Value ") | |
SaveResult(s3, outputFile) | |
#OMS | |
g41 = g4['Omaha Code'].to_string().replace(","," ") | |
g42 = g4['SNOMED CT concept ID'].to_string().replace(","," ") | |
g43 = g4['SNOMED CT'].to_string().replace(","," ") | |
g44 = g4['PR'].to_string().replace(","," ") | |
g45 = g4['S&S'].to_string().replace(","," ") | |
s4 = ("OMS Terms of entity ," + myEntityGroup + ", with term ," + eterm + ", Omaha codes of ," + g41 + ", and SNOMED concepts of ," + g42 + ", and SNOMED codes of ," + g43 + ", and OMS problem of ," + g44 + ", and OMS Sign Symptom of ," + g45) | |
if g41 != 'Series([] )': SaveResult(s4, outputFile) | |
#ICD10 | |
g51 = g5['Code'].to_string().replace(","," ") | |
g52 = g5['Description'].to_string().replace(","," ") | |
s5 = ("ICD10 matches of entity ," + myEntityGroup + ", with term ," + eterm + ", ICD10 codes of ," + g51 + ", and descriptions of ," + g52 + ", Label,Value, Label,Value, Label,Value ") | |
if g51 != 'Series([] )':SaveResult(s5, outputFile) | |
except ValueError as err: | |
raise ValueError("Error in group by entity \n" + format_tb(err.__traceback__)[0] + err.args[0] + "\nEnd of error message.") from None | |
#print(sAll) | |
#return out; | |
#break; | |
# out["total"] = sum(out.values()) | |
# return out | |
return outputFile | |
def plot_to_figure(grouped): | |
fig = plt.figure() | |
plt.bar(x=list(grouped.keys()), height=list(grouped.values())) | |
plt.margins(0.2) | |
plt.subplots_adjust(bottom=0.4) | |
plt.xticks(rotation=90) | |
return fig | |
def ner(text): | |
raw = pipe(text) | |
ner_content = { | |
"text": text, | |
"entities": [ | |
{ | |
"entity": x["entity_group"], | |
"word": x["word"], | |
"score": x["score"], | |
"start": x["start"], | |
"end": x["end"], | |
} | |
for x in raw | |
], | |
} | |
#grouped = group_by_entity(raw) | |
outputFile = group_by_entity(raw) | |
#figure = plot_to_figure(grouped) | |
label = EXAMPLES.get(text, "Unknown") | |
#meta = { | |
# "entity_counts": grouped, | |
# "entities": len(set(grouped.keys())), | |
# "counts": sum(grouped.values()), | |
# } | |
#return (ner_content, meta, label, figure) | |
outputDataframe = pd.read_csv(outputFile) | |
outputFile = outputFile.replace(os.path.dirname(__file__) + "\\","") # Just filename for File download UI output element | |
#return (ner_content, meta, label, figure, outputDataframe, outputFile) | |
return (ner_content, outputDataframe, outputFile) | |
# New way = Gradio Blocks: | |
demo = gr.Blocks() | |
with demo: | |
gr.Markdown( | |
""" | |
# 🩺⚕️NLP UI for AI Biomedical Entity Type Recognition with Clinical Terminology Harmonisation | |
""" | |
) | |
input = gr.Textbox(label="Note text", value="") | |
output=[ | |
gr.HighlightedText(label="NER", combine_adjacent=True) | |
] | |
with gr.Tab("Biomedical Entity Recognition"): | |
output=[ | |
gr.HighlightedText(label="NER", combine_adjacent=True), | |
#gr.JSON(label="Entity Counts"), | |
#gr.Label(label="Rating"), | |
#gr.Plot(label="Bar"), | |
gr.Dataframe(label="Dataframe"), | |
gr.File(label="File"), | |
] | |
examples=list(EXAMPLES.keys()) | |
gr.Examples(examples, inputs=input) | |
input.change(fn=ner, inputs=input, outputs=output) | |
with gr.Tab("Clinical Terminology Resolution"): | |
#output=[ | |
# gr.Textbox(placeholder="CT Match Results", lines=10) | |
#] | |
with gr.Row(variant="compact"): | |
btnLOINC = gr.Button("LOINC") | |
btnPanels = gr.Button("Panels") | |
btnSNOMED = gr.Button("SNOMED") | |
btnOMS = gr.Button("OMS") | |
btnICD10 = gr.Button("ICD10") | |
output=[ | |
gr.HighlightedText(label="NER", combine_adjacent=True), | |
gr.File(label="File"), # add download link here | |
gr.Dataframe(label="Dataframe", headers=["LOINC", "Panels", "SNOMED", "OMS", "ICD10"]), # add harmonised output for input corpus here as a dataframe to UI | |
gr.Textbox(placeholder="CT Match Results", lines=10) # add matched text scratchpad here | |
] | |
#textCT = gr.Textbox(placeholder="CT Match Results", lines=10) | |
#btnLOINC.click(loadFile, inputs=["LOINCTerms.txt"], outputs=output) | |
#btnPanels.click(loadFile, "LOINCPanelsandForms.txt", output) | |
#btnSNOMED.click(loadFile, "SNOMEDTerms.txt", output) | |
#btnOMS.click(loadFile, "OMSTerms.txt", output) | |
#btnICD10.click(loadFile, "ICD10Terms.txt", output) | |
examples=list(EXAMPLES.keys()) | |
gr.Examples(examples, inputs=input) | |
input.change(fn=ner, inputs=input, outputs=output) | |
#with gr.Tab("Examples Page 1"): | |
# gr.Examples(["a", "b", "c"], inputs=input) | |
#with gr.Tab("Examples Page 2"): | |
# gr.Examples(["d", "e", "f"], inputs=input) | |
#with gr.Tab("Examples Page 2"): | |
# gr.Examples(["g", "h", "i"], inputs=input) | |
demo.launch(debug=True) | |
# Old Way - Interface Load | |
#interface = gr.Interface( | |
# ner, | |
# inputs=gr.Textbox(label="Note text", value=""), | |
# outputs=[ | |
# gr.HighlightedText(label="NER", combine_adjacent=True), | |
# gr.JSON(label="Entity Counts"), | |
# gr.Label(label="Rating"), | |
# gr.Plot(label="Bar"), | |
# ], | |
# examples=list(EXAMPLES.keys()), | |
# allow_flagging="never", | |
#) | |
#interface.launch() |