Spaces:
Sleeping
Sleeping
# Natural Language Toolkit: Concordance Application | |
# | |
# Copyright (C) 2001-2023 NLTK Project | |
# Author: Sumukh Ghodke <[email protected]> | |
# URL: <https://www.nltk.org/> | |
# For license information, see LICENSE.TXT | |
import queue as q | |
import re | |
import threading | |
from tkinter import ( | |
END, | |
LEFT, | |
SUNKEN, | |
Button, | |
Entry, | |
Frame, | |
IntVar, | |
Label, | |
Menu, | |
OptionMenu, | |
Scrollbar, | |
StringVar, | |
Text, | |
Tk, | |
) | |
from tkinter.font import Font | |
from nltk.corpus import ( | |
alpino, | |
brown, | |
cess_cat, | |
cess_esp, | |
floresta, | |
indian, | |
mac_morpho, | |
nps_chat, | |
sinica_treebank, | |
treebank, | |
) | |
from nltk.draw.util import ShowText | |
from nltk.util import in_idle | |
WORD_OR_TAG = "[^/ ]+" | |
BOUNDARY = r"\b" | |
CORPUS_LOADED_EVENT = "<<CL_EVENT>>" | |
SEARCH_TERMINATED_EVENT = "<<ST_EVENT>>" | |
SEARCH_ERROR_EVENT = "<<SE_EVENT>>" | |
ERROR_LOADING_CORPUS_EVENT = "<<ELC_EVENT>>" | |
POLL_INTERVAL = 50 | |
# NB All corpora must be specified in a lambda expression so as not to be | |
# loaded when the module is imported. | |
_DEFAULT = "English: Brown Corpus (Humor, simplified)" | |
_CORPORA = { | |
"Catalan: CESS-CAT Corpus (simplified)": lambda: cess_cat.tagged_sents( | |
tagset="universal" | |
), | |
"English: Brown Corpus": lambda: brown.tagged_sents(), | |
"English: Brown Corpus (simplified)": lambda: brown.tagged_sents( | |
tagset="universal" | |
), | |
"English: Brown Corpus (Press, simplified)": lambda: brown.tagged_sents( | |
categories=["news", "editorial", "reviews"], tagset="universal" | |
), | |
"English: Brown Corpus (Religion, simplified)": lambda: brown.tagged_sents( | |
categories="religion", tagset="universal" | |
), | |
"English: Brown Corpus (Learned, simplified)": lambda: brown.tagged_sents( | |
categories="learned", tagset="universal" | |
), | |
"English: Brown Corpus (Science Fiction, simplified)": lambda: brown.tagged_sents( | |
categories="science_fiction", tagset="universal" | |
), | |
"English: Brown Corpus (Romance, simplified)": lambda: brown.tagged_sents( | |
categories="romance", tagset="universal" | |
), | |
"English: Brown Corpus (Humor, simplified)": lambda: brown.tagged_sents( | |
categories="humor", tagset="universal" | |
), | |
"English: NPS Chat Corpus": lambda: nps_chat.tagged_posts(), | |
"English: NPS Chat Corpus (simplified)": lambda: nps_chat.tagged_posts( | |
tagset="universal" | |
), | |
"English: Wall Street Journal Corpus": lambda: treebank.tagged_sents(), | |
"English: Wall Street Journal Corpus (simplified)": lambda: treebank.tagged_sents( | |
tagset="universal" | |
), | |
"Chinese: Sinica Corpus": lambda: sinica_treebank.tagged_sents(), | |
"Chinese: Sinica Corpus (simplified)": lambda: sinica_treebank.tagged_sents( | |
tagset="universal" | |
), | |
"Dutch: Alpino Corpus": lambda: alpino.tagged_sents(), | |
"Dutch: Alpino Corpus (simplified)": lambda: alpino.tagged_sents( | |
tagset="universal" | |
), | |
"Hindi: Indian Languages Corpus": lambda: indian.tagged_sents(files="hindi.pos"), | |
"Hindi: Indian Languages Corpus (simplified)": lambda: indian.tagged_sents( | |
files="hindi.pos", tagset="universal" | |
), | |
"Portuguese: Floresta Corpus (Portugal)": lambda: floresta.tagged_sents(), | |
"Portuguese: Floresta Corpus (Portugal, simplified)": lambda: floresta.tagged_sents( | |
tagset="universal" | |
), | |
"Portuguese: MAC-MORPHO Corpus (Brazil)": lambda: mac_morpho.tagged_sents(), | |
"Portuguese: MAC-MORPHO Corpus (Brazil, simplified)": lambda: mac_morpho.tagged_sents( | |
tagset="universal" | |
), | |
"Spanish: CESS-ESP Corpus (simplified)": lambda: cess_esp.tagged_sents( | |
tagset="universal" | |
), | |
} | |
class ConcordanceSearchView: | |
_BACKGROUND_COLOUR = "#FFF" # white | |
# Colour of highlighted results | |
_HIGHLIGHT_WORD_COLOUR = "#F00" # red | |
_HIGHLIGHT_WORD_TAG = "HL_WRD_TAG" | |
_HIGHLIGHT_LABEL_COLOUR = "#C0C0C0" # dark grey | |
_HIGHLIGHT_LABEL_TAG = "HL_LBL_TAG" | |
# Percentage of text left of the scrollbar position | |
_FRACTION_LEFT_TEXT = 0.30 | |
def __init__(self): | |
self.queue = q.Queue() | |
self.model = ConcordanceSearchModel(self.queue) | |
self.top = Tk() | |
self._init_top(self.top) | |
self._init_menubar() | |
self._init_widgets(self.top) | |
self.load_corpus(self.model.DEFAULT_CORPUS) | |
self.after = self.top.after(POLL_INTERVAL, self._poll) | |
def _init_top(self, top): | |
top.geometry("950x680+50+50") | |
top.title("NLTK Concordance Search") | |
top.bind("<Control-q>", self.destroy) | |
top.protocol("WM_DELETE_WINDOW", self.destroy) | |
top.minsize(950, 680) | |
def _init_widgets(self, parent): | |
self.main_frame = Frame( | |
parent, dict(background=self._BACKGROUND_COLOUR, padx=1, pady=1, border=1) | |
) | |
self._init_corpus_select(self.main_frame) | |
self._init_query_box(self.main_frame) | |
self._init_results_box(self.main_frame) | |
self._init_paging(self.main_frame) | |
self._init_status(self.main_frame) | |
self.main_frame.pack(fill="both", expand=True) | |
def _init_menubar(self): | |
self._result_size = IntVar(self.top) | |
self._cntx_bf_len = IntVar(self.top) | |
self._cntx_af_len = IntVar(self.top) | |
menubar = Menu(self.top) | |
filemenu = Menu(menubar, tearoff=0, borderwidth=0) | |
filemenu.add_command( | |
label="Exit", underline=1, command=self.destroy, accelerator="Ctrl-q" | |
) | |
menubar.add_cascade(label="File", underline=0, menu=filemenu) | |
editmenu = Menu(menubar, tearoff=0) | |
rescntmenu = Menu(editmenu, tearoff=0) | |
rescntmenu.add_radiobutton( | |
label="20", | |
variable=self._result_size, | |
underline=0, | |
value=20, | |
command=self.set_result_size, | |
) | |
rescntmenu.add_radiobutton( | |
label="50", | |
variable=self._result_size, | |
underline=0, | |
value=50, | |
command=self.set_result_size, | |
) | |
rescntmenu.add_radiobutton( | |
label="100", | |
variable=self._result_size, | |
underline=0, | |
value=100, | |
command=self.set_result_size, | |
) | |
rescntmenu.invoke(1) | |
editmenu.add_cascade(label="Result Count", underline=0, menu=rescntmenu) | |
cntxmenu = Menu(editmenu, tearoff=0) | |
cntxbfmenu = Menu(cntxmenu, tearoff=0) | |
cntxbfmenu.add_radiobutton( | |
label="60 characters", | |
variable=self._cntx_bf_len, | |
underline=0, | |
value=60, | |
command=self.set_cntx_bf_len, | |
) | |
cntxbfmenu.add_radiobutton( | |
label="80 characters", | |
variable=self._cntx_bf_len, | |
underline=0, | |
value=80, | |
command=self.set_cntx_bf_len, | |
) | |
cntxbfmenu.add_radiobutton( | |
label="100 characters", | |
variable=self._cntx_bf_len, | |
underline=0, | |
value=100, | |
command=self.set_cntx_bf_len, | |
) | |
cntxbfmenu.invoke(1) | |
cntxmenu.add_cascade(label="Before", underline=0, menu=cntxbfmenu) | |
cntxafmenu = Menu(cntxmenu, tearoff=0) | |
cntxafmenu.add_radiobutton( | |
label="70 characters", | |
variable=self._cntx_af_len, | |
underline=0, | |
value=70, | |
command=self.set_cntx_af_len, | |
) | |
cntxafmenu.add_radiobutton( | |
label="90 characters", | |
variable=self._cntx_af_len, | |
underline=0, | |
value=90, | |
command=self.set_cntx_af_len, | |
) | |
cntxafmenu.add_radiobutton( | |
label="110 characters", | |
variable=self._cntx_af_len, | |
underline=0, | |
value=110, | |
command=self.set_cntx_af_len, | |
) | |
cntxafmenu.invoke(1) | |
cntxmenu.add_cascade(label="After", underline=0, menu=cntxafmenu) | |
editmenu.add_cascade(label="Context", underline=0, menu=cntxmenu) | |
menubar.add_cascade(label="Edit", underline=0, menu=editmenu) | |
self.top.config(menu=menubar) | |
def set_result_size(self, **kwargs): | |
self.model.result_count = self._result_size.get() | |
def set_cntx_af_len(self, **kwargs): | |
self._char_after = self._cntx_af_len.get() | |
def set_cntx_bf_len(self, **kwargs): | |
self._char_before = self._cntx_bf_len.get() | |
def _init_corpus_select(self, parent): | |
innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) | |
self.var = StringVar(innerframe) | |
self.var.set(self.model.DEFAULT_CORPUS) | |
Label( | |
innerframe, | |
justify=LEFT, | |
text=" Corpus: ", | |
background=self._BACKGROUND_COLOUR, | |
padx=2, | |
pady=1, | |
border=0, | |
).pack(side="left") | |
other_corpora = list(self.model.CORPORA.keys()).remove( | |
self.model.DEFAULT_CORPUS | |
) | |
om = OptionMenu( | |
innerframe, | |
self.var, | |
self.model.DEFAULT_CORPUS, | |
command=self.corpus_selected, | |
*self.model.non_default_corpora() | |
) | |
om["borderwidth"] = 0 | |
om["highlightthickness"] = 1 | |
om.pack(side="left") | |
innerframe.pack(side="top", fill="x", anchor="n") | |
def _init_status(self, parent): | |
self.status = Label( | |
parent, | |
justify=LEFT, | |
relief=SUNKEN, | |
background=self._BACKGROUND_COLOUR, | |
border=0, | |
padx=1, | |
pady=0, | |
) | |
self.status.pack(side="top", anchor="sw") | |
def _init_query_box(self, parent): | |
innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) | |
another = Frame(innerframe, background=self._BACKGROUND_COLOUR) | |
self.query_box = Entry(another, width=60) | |
self.query_box.pack(side="left", fill="x", pady=25, anchor="center") | |
self.search_button = Button( | |
another, | |
text="Search", | |
command=self.search, | |
borderwidth=1, | |
highlightthickness=1, | |
) | |
self.search_button.pack(side="left", fill="x", pady=25, anchor="center") | |
self.query_box.bind("<KeyPress-Return>", self.search_enter_keypress_handler) | |
another.pack() | |
innerframe.pack(side="top", fill="x", anchor="n") | |
def search_enter_keypress_handler(self, *event): | |
self.search() | |
def _init_results_box(self, parent): | |
innerframe = Frame(parent) | |
i1 = Frame(innerframe) | |
i2 = Frame(innerframe) | |
vscrollbar = Scrollbar(i1, borderwidth=1) | |
hscrollbar = Scrollbar(i2, borderwidth=1, orient="horiz") | |
self.results_box = Text( | |
i1, | |
font=Font(family="courier", size="16"), | |
state="disabled", | |
borderwidth=1, | |
yscrollcommand=vscrollbar.set, | |
xscrollcommand=hscrollbar.set, | |
wrap="none", | |
width="40", | |
height="20", | |
exportselection=1, | |
) | |
self.results_box.pack(side="left", fill="both", expand=True) | |
self.results_box.tag_config( | |
self._HIGHLIGHT_WORD_TAG, foreground=self._HIGHLIGHT_WORD_COLOUR | |
) | |
self.results_box.tag_config( | |
self._HIGHLIGHT_LABEL_TAG, foreground=self._HIGHLIGHT_LABEL_COLOUR | |
) | |
vscrollbar.pack(side="left", fill="y", anchor="e") | |
vscrollbar.config(command=self.results_box.yview) | |
hscrollbar.pack(side="left", fill="x", expand=True, anchor="w") | |
hscrollbar.config(command=self.results_box.xview) | |
# there is no other way of avoiding the overlap of scrollbars while using pack layout manager!!! | |
Label(i2, text=" ", background=self._BACKGROUND_COLOUR).pack( | |
side="left", anchor="e" | |
) | |
i1.pack(side="top", fill="both", expand=True, anchor="n") | |
i2.pack(side="bottom", fill="x", anchor="s") | |
innerframe.pack(side="top", fill="both", expand=True) | |
def _init_paging(self, parent): | |
innerframe = Frame(parent, background=self._BACKGROUND_COLOUR) | |
self.prev = prev = Button( | |
innerframe, | |
text="Previous", | |
command=self.previous, | |
width="10", | |
borderwidth=1, | |
highlightthickness=1, | |
state="disabled", | |
) | |
prev.pack(side="left", anchor="center") | |
self.next = next = Button( | |
innerframe, | |
text="Next", | |
command=self.__next__, | |
width="10", | |
borderwidth=1, | |
highlightthickness=1, | |
state="disabled", | |
) | |
next.pack(side="right", anchor="center") | |
innerframe.pack(side="top", fill="y") | |
self.current_page = 0 | |
def previous(self): | |
self.clear_results_box() | |
self.freeze_editable() | |
self.model.prev(self.current_page - 1) | |
def __next__(self): | |
self.clear_results_box() | |
self.freeze_editable() | |
self.model.next(self.current_page + 1) | |
def about(self, *e): | |
ABOUT = "NLTK Concordance Search Demo\n" | |
TITLE = "About: NLTK Concordance Search Demo" | |
try: | |
from tkinter.messagebox import Message | |
Message(message=ABOUT, title=TITLE, parent=self.main_frame).show() | |
except: | |
ShowText(self.top, TITLE, ABOUT) | |
def _bind_event_handlers(self): | |
self.top.bind(CORPUS_LOADED_EVENT, self.handle_corpus_loaded) | |
self.top.bind(SEARCH_TERMINATED_EVENT, self.handle_search_terminated) | |
self.top.bind(SEARCH_ERROR_EVENT, self.handle_search_error) | |
self.top.bind(ERROR_LOADING_CORPUS_EVENT, self.handle_error_loading_corpus) | |
def _poll(self): | |
try: | |
event = self.queue.get(block=False) | |
except q.Empty: | |
pass | |
else: | |
if event == CORPUS_LOADED_EVENT: | |
self.handle_corpus_loaded(event) | |
elif event == SEARCH_TERMINATED_EVENT: | |
self.handle_search_terminated(event) | |
elif event == SEARCH_ERROR_EVENT: | |
self.handle_search_error(event) | |
elif event == ERROR_LOADING_CORPUS_EVENT: | |
self.handle_error_loading_corpus(event) | |
self.after = self.top.after(POLL_INTERVAL, self._poll) | |
def handle_error_loading_corpus(self, event): | |
self.status["text"] = "Error in loading " + self.var.get() | |
self.unfreeze_editable() | |
self.clear_all() | |
self.freeze_editable() | |
def handle_corpus_loaded(self, event): | |
self.status["text"] = self.var.get() + " is loaded" | |
self.unfreeze_editable() | |
self.clear_all() | |
self.query_box.focus_set() | |
def handle_search_terminated(self, event): | |
# todo: refactor the model such that it is less state sensitive | |
results = self.model.get_results() | |
self.write_results(results) | |
self.status["text"] = "" | |
if len(results) == 0: | |
self.status["text"] = "No results found for " + self.model.query | |
else: | |
self.current_page = self.model.last_requested_page | |
self.unfreeze_editable() | |
self.results_box.xview_moveto(self._FRACTION_LEFT_TEXT) | |
def handle_search_error(self, event): | |
self.status["text"] = "Error in query " + self.model.query | |
self.unfreeze_editable() | |
def corpus_selected(self, *args): | |
new_selection = self.var.get() | |
self.load_corpus(new_selection) | |
def load_corpus(self, selection): | |
if self.model.selected_corpus != selection: | |
self.status["text"] = "Loading " + selection + "..." | |
self.freeze_editable() | |
self.model.load_corpus(selection) | |
def search(self): | |
self.current_page = 0 | |
self.clear_results_box() | |
self.model.reset_results() | |
query = self.query_box.get() | |
if len(query.strip()) == 0: | |
return | |
self.status["text"] = "Searching for " + query | |
self.freeze_editable() | |
self.model.search(query, self.current_page + 1) | |
def write_results(self, results): | |
self.results_box["state"] = "normal" | |
row = 1 | |
for each in results: | |
sent, pos1, pos2 = each[0].strip(), each[1], each[2] | |
if len(sent) != 0: | |
if pos1 < self._char_before: | |
sent, pos1, pos2 = self.pad(sent, pos1, pos2) | |
sentence = sent[pos1 - self._char_before : pos1 + self._char_after] | |
if not row == len(results): | |
sentence += "\n" | |
self.results_box.insert(str(row) + ".0", sentence) | |
word_markers, label_markers = self.words_and_labels(sent, pos1, pos2) | |
for marker in word_markers: | |
self.results_box.tag_add( | |
self._HIGHLIGHT_WORD_TAG, | |
str(row) + "." + str(marker[0]), | |
str(row) + "." + str(marker[1]), | |
) | |
for marker in label_markers: | |
self.results_box.tag_add( | |
self._HIGHLIGHT_LABEL_TAG, | |
str(row) + "." + str(marker[0]), | |
str(row) + "." + str(marker[1]), | |
) | |
row += 1 | |
self.results_box["state"] = "disabled" | |
def words_and_labels(self, sentence, pos1, pos2): | |
search_exp = sentence[pos1:pos2] | |
words, labels = [], [] | |
labeled_words = search_exp.split(" ") | |
index = 0 | |
for each in labeled_words: | |
if each == "": | |
index += 1 | |
else: | |
word, label = each.split("/") | |
words.append( | |
(self._char_before + index, self._char_before + index + len(word)) | |
) | |
index += len(word) + 1 | |
labels.append( | |
(self._char_before + index, self._char_before + index + len(label)) | |
) | |
index += len(label) | |
index += 1 | |
return words, labels | |
def pad(self, sent, hstart, hend): | |
if hstart >= self._char_before: | |
return sent, hstart, hend | |
d = self._char_before - hstart | |
sent = "".join([" "] * d) + sent | |
return sent, hstart + d, hend + d | |
def destroy(self, *e): | |
if self.top is None: | |
return | |
self.top.after_cancel(self.after) | |
self.top.destroy() | |
self.top = None | |
def clear_all(self): | |
self.query_box.delete(0, END) | |
self.model.reset_query() | |
self.clear_results_box() | |
def clear_results_box(self): | |
self.results_box["state"] = "normal" | |
self.results_box.delete("1.0", END) | |
self.results_box["state"] = "disabled" | |
def freeze_editable(self): | |
self.query_box["state"] = "disabled" | |
self.search_button["state"] = "disabled" | |
self.prev["state"] = "disabled" | |
self.next["state"] = "disabled" | |
def unfreeze_editable(self): | |
self.query_box["state"] = "normal" | |
self.search_button["state"] = "normal" | |
self.set_paging_button_states() | |
def set_paging_button_states(self): | |
if self.current_page == 0 or self.current_page == 1: | |
self.prev["state"] = "disabled" | |
else: | |
self.prev["state"] = "normal" | |
if self.model.has_more_pages(self.current_page): | |
self.next["state"] = "normal" | |
else: | |
self.next["state"] = "disabled" | |
def fire_event(self, event): | |
# Firing an event so that rendering of widgets happen in the mainloop thread | |
self.top.event_generate(event, when="tail") | |
def mainloop(self, *args, **kwargs): | |
if in_idle(): | |
return | |
self.top.mainloop(*args, **kwargs) | |
class ConcordanceSearchModel: | |
def __init__(self, queue): | |
self.queue = queue | |
self.CORPORA = _CORPORA | |
self.DEFAULT_CORPUS = _DEFAULT | |
self.selected_corpus = None | |
self.reset_query() | |
self.reset_results() | |
self.result_count = None | |
self.last_sent_searched = 0 | |
def non_default_corpora(self): | |
copy = [] | |
copy.extend(list(self.CORPORA.keys())) | |
copy.remove(self.DEFAULT_CORPUS) | |
copy.sort() | |
return copy | |
def load_corpus(self, name): | |
self.selected_corpus = name | |
self.tagged_sents = [] | |
runner_thread = self.LoadCorpus(name, self) | |
runner_thread.start() | |
def search(self, query, page): | |
self.query = query | |
self.last_requested_page = page | |
self.SearchCorpus(self, page, self.result_count).start() | |
def next(self, page): | |
self.last_requested_page = page | |
if len(self.results) < page: | |
self.search(self.query, page) | |
else: | |
self.queue.put(SEARCH_TERMINATED_EVENT) | |
def prev(self, page): | |
self.last_requested_page = page | |
self.queue.put(SEARCH_TERMINATED_EVENT) | |
def reset_results(self): | |
self.last_sent_searched = 0 | |
self.results = [] | |
self.last_page = None | |
def reset_query(self): | |
self.query = None | |
def set_results(self, page, resultset): | |
self.results.insert(page - 1, resultset) | |
def get_results(self): | |
return self.results[self.last_requested_page - 1] | |
def has_more_pages(self, page): | |
if self.results == [] or self.results[0] == []: | |
return False | |
if self.last_page is None: | |
return True | |
return page < self.last_page | |
class LoadCorpus(threading.Thread): | |
def __init__(self, name, model): | |
threading.Thread.__init__(self) | |
self.model, self.name = model, name | |
def run(self): | |
try: | |
ts = self.model.CORPORA[self.name]() | |
self.model.tagged_sents = [ | |
" ".join(w + "/" + t for (w, t) in sent) for sent in ts | |
] | |
self.model.queue.put(CORPUS_LOADED_EVENT) | |
except Exception as e: | |
print(e) | |
self.model.queue.put(ERROR_LOADING_CORPUS_EVENT) | |
class SearchCorpus(threading.Thread): | |
def __init__(self, model, page, count): | |
self.model, self.count, self.page = model, count, page | |
threading.Thread.__init__(self) | |
def run(self): | |
q = self.processed_query() | |
sent_pos, i, sent_count = [], 0, 0 | |
for sent in self.model.tagged_sents[self.model.last_sent_searched :]: | |
try: | |
m = re.search(q, sent) | |
except re.error: | |
self.model.reset_results() | |
self.model.queue.put(SEARCH_ERROR_EVENT) | |
return | |
if m: | |
sent_pos.append((sent, m.start(), m.end())) | |
i += 1 | |
if i > self.count: | |
self.model.last_sent_searched += sent_count - 1 | |
break | |
sent_count += 1 | |
if self.count >= len(sent_pos): | |
self.model.last_sent_searched += sent_count - 1 | |
self.model.last_page = self.page | |
self.model.set_results(self.page, sent_pos) | |
else: | |
self.model.set_results(self.page, sent_pos[:-1]) | |
self.model.queue.put(SEARCH_TERMINATED_EVENT) | |
def processed_query(self): | |
new = [] | |
for term in self.model.query.split(): | |
term = re.sub(r"\.", r"[^/ ]", term) | |
if re.match("[A-Z]+$", term): | |
new.append(BOUNDARY + WORD_OR_TAG + "/" + term + BOUNDARY) | |
elif "/" in term: | |
new.append(BOUNDARY + term + BOUNDARY) | |
else: | |
new.append(BOUNDARY + term + "/" + WORD_OR_TAG + BOUNDARY) | |
return " ".join(new) | |
def app(): | |
d = ConcordanceSearchView() | |
d.mainloop() | |
if __name__ == "__main__": | |
app() | |
__all__ = ["app"] | |