# Natural Language Toolkit: NKJP Corpus Reader # # Copyright (C) 2001-2023 NLTK Project # Author: Gabriela Kaczka # URL: # For license information, see LICENSE.TXT import functools import os import re import tempfile from nltk.corpus.reader.util import concat from nltk.corpus.reader.xmldocs import XMLCorpusReader, XMLCorpusView def _parse_args(fun): """ Wraps function arguments: if fileids not specified then function set NKJPCorpusReader paths. """ @functools.wraps(fun) def decorator(self, fileids=None, **kwargs): if not fileids: fileids = self._paths return fun(self, fileids, **kwargs) return decorator class NKJPCorpusReader(XMLCorpusReader): WORDS_MODE = 0 SENTS_MODE = 1 HEADER_MODE = 2 RAW_MODE = 3 def __init__(self, root, fileids=".*"): """ Corpus reader designed to work with National Corpus of Polish. See http://nkjp.pl/ for more details about NKJP. use example: import nltk import nkjp from nkjp import NKJPCorpusReader x = NKJPCorpusReader(root='/home/USER/nltk_data/corpora/nkjp/', fileids='') # obtain the whole corpus x.header() x.raw() x.words() x.tagged_words(tags=['subst', 'comp']) #Link to find more tags: nkjp.pl/poliqarp/help/ense2.html x.sents() x = NKJPCorpusReader(root='/home/USER/nltk_data/corpora/nkjp/', fileids='Wilk*') # obtain particular file(s) x.header(fileids=['WilkDom', '/home/USER/nltk_data/corpora/nkjp/WilkWilczy']) x.tagged_words(fileids=['WilkDom', '/home/USER/nltk_data/corpora/nkjp/WilkWilczy'], tags=['subst', 'comp']) """ if isinstance(fileids, str): XMLCorpusReader.__init__(self, root, fileids + ".*/header.xml") else: XMLCorpusReader.__init__( self, root, [fileid + "/header.xml" for fileid in fileids] ) self._paths = self.get_paths() def get_paths(self): return [ os.path.join(str(self._root), f.split("header.xml")[0]) for f in self._fileids ] def fileids(self): """ Returns a list of file identifiers for the fileids that make up this corpus. """ return [f.split("header.xml")[0] for f in self._fileids] def _view(self, filename, tags=None, **kwargs): """ Returns a view specialised for use with particular corpus file. """ mode = kwargs.pop("mode", NKJPCorpusReader.WORDS_MODE) if mode is NKJPCorpusReader.WORDS_MODE: return NKJPCorpus_Morph_View(filename, tags=tags) elif mode is NKJPCorpusReader.SENTS_MODE: return NKJPCorpus_Segmentation_View(filename, tags=tags) elif mode is NKJPCorpusReader.HEADER_MODE: return NKJPCorpus_Header_View(filename, tags=tags) elif mode is NKJPCorpusReader.RAW_MODE: return NKJPCorpus_Text_View( filename, tags=tags, mode=NKJPCorpus_Text_View.RAW_MODE ) else: raise NameError("No such mode!") def add_root(self, fileid): """ Add root if necessary to specified fileid. """ if self.root in fileid: return fileid return self.root + fileid @_parse_args def header(self, fileids=None, **kwargs): """ Returns header(s) of specified fileids. """ return concat( [ self._view( self.add_root(fileid), mode=NKJPCorpusReader.HEADER_MODE, **kwargs ).handle_query() for fileid in fileids ] ) @_parse_args def sents(self, fileids=None, **kwargs): """ Returns sentences in specified fileids. """ return concat( [ self._view( self.add_root(fileid), mode=NKJPCorpusReader.SENTS_MODE, **kwargs ).handle_query() for fileid in fileids ] ) @_parse_args def words(self, fileids=None, **kwargs): """ Returns words in specified fileids. """ return concat( [ self._view( self.add_root(fileid), mode=NKJPCorpusReader.WORDS_MODE, **kwargs ).handle_query() for fileid in fileids ] ) @_parse_args def tagged_words(self, fileids=None, **kwargs): """ Call with specified tags as a list, e.g. tags=['subst', 'comp']. Returns tagged words in specified fileids. """ tags = kwargs.pop("tags", []) return concat( [ self._view( self.add_root(fileid), mode=NKJPCorpusReader.WORDS_MODE, tags=tags, **kwargs ).handle_query() for fileid in fileids ] ) @_parse_args def raw(self, fileids=None, **kwargs): """ Returns words in specified fileids. """ return concat( [ self._view( self.add_root(fileid), mode=NKJPCorpusReader.RAW_MODE, **kwargs ).handle_query() for fileid in fileids ] ) class NKJPCorpus_Header_View(XMLCorpusView): def __init__(self, filename, **kwargs): """ HEADER_MODE A stream backed corpus view specialized for use with header.xml files in NKJP corpus. """ self.tagspec = ".*/sourceDesc$" XMLCorpusView.__init__(self, filename + "header.xml", self.tagspec) def handle_query(self): self._open() header = [] while True: segm = XMLCorpusView.read_block(self, self._stream) if len(segm) == 0: break header.extend(segm) self.close() return header def handle_elt(self, elt, context): titles = elt.findall("bibl/title") title = [] if titles: title = "\n".join(title.text.strip() for title in titles) authors = elt.findall("bibl/author") author = [] if authors: author = "\n".join(author.text.strip() for author in authors) dates = elt.findall("bibl/date") date = [] if dates: date = "\n".join(date.text.strip() for date in dates) publishers = elt.findall("bibl/publisher") publisher = [] if publishers: publisher = "\n".join(publisher.text.strip() for publisher in publishers) idnos = elt.findall("bibl/idno") idno = [] if idnos: idno = "\n".join(idno.text.strip() for idno in idnos) notes = elt.findall("bibl/note") note = [] if notes: note = "\n".join(note.text.strip() for note in notes) return { "title": title, "author": author, "date": date, "publisher": publisher, "idno": idno, "note": note, } class XML_Tool: """ Helper class creating xml file to one without references to nkjp: namespace. That's needed because the XMLCorpusView assumes that one can find short substrings of XML that are valid XML, which is not true if a namespace is declared at top level """ def __init__(self, root, filename): self.read_file = os.path.join(root, filename) self.write_file = tempfile.NamedTemporaryFile(delete=False) def build_preprocessed_file(self): try: fr = open(self.read_file) fw = self.write_file line = " " while len(line): line = fr.readline() x = re.split(r"nkjp:[^ ]* ", line) # in all files ret = " ".join(x) x = re.split("", ret) # in ann_segmentation.xml ret = " ".join(x) x = re.split("", ret) # in ann_segmentation.xml ret = " ".join(x) x = re.split("", ret) # in ann_segmentation.xml ret = " ".join(x) x = re.split("", ret) # in ann_segmentation.xml ret = " ".join(x) fw.write(ret) fr.close() fw.close() return self.write_file.name except Exception as e: self.remove_preprocessed_file() raise Exception from e def remove_preprocessed_file(self): os.remove(self.write_file.name) class NKJPCorpus_Segmentation_View(XMLCorpusView): """ A stream backed corpus view specialized for use with ann_segmentation.xml files in NKJP corpus. """ def __init__(self, filename, **kwargs): self.tagspec = ".*p/.*s" # intersperse NKJPCorpus_Text_View self.text_view = NKJPCorpus_Text_View( filename, mode=NKJPCorpus_Text_View.SENTS_MODE ) self.text_view.handle_query() # xml preprocessing self.xml_tool = XML_Tool(filename, "ann_segmentation.xml") # base class init XMLCorpusView.__init__( self, self.xml_tool.build_preprocessed_file(), self.tagspec ) def get_segm_id(self, example_word): return example_word.split("(")[1].split(",")[0] def get_sent_beg(self, beg_word): # returns index of beginning letter in sentence return int(beg_word.split(",")[1]) def get_sent_end(self, end_word): # returns index of end letter in sentence splitted = end_word.split(")")[0].split(",") return int(splitted[1]) + int(splitted[2]) def get_sentences(self, sent_segm): # returns one sentence id = self.get_segm_id(sent_segm[0]) segm = self.text_view.segm_dict[id] # text segment beg = self.get_sent_beg(sent_segm[0]) end = self.get_sent_end(sent_segm[len(sent_segm) - 1]) return segm[beg:end] def remove_choice(self, segm): ret = [] prev_txt_end = -1 prev_txt_nr = -1 for word in segm: txt_nr = self.get_segm_id(word) # get increasing sequence of ids: in case of choice get first possibility if self.get_sent_beg(word) > prev_txt_end - 1 or prev_txt_nr != txt_nr: ret.append(word) prev_txt_end = self.get_sent_end(word) prev_txt_nr = txt_nr return ret def handle_query(self): try: self._open() sentences = [] while True: sent_segm = XMLCorpusView.read_block(self, self._stream) if len(sent_segm) == 0: break for segm in sent_segm: segm = self.remove_choice(segm) sentences.append(self.get_sentences(segm)) self.close() self.xml_tool.remove_preprocessed_file() return sentences except Exception as e: self.xml_tool.remove_preprocessed_file() raise Exception from e def handle_elt(self, elt, context): ret = [] for seg in elt: ret.append(seg.get("corresp")) return ret class NKJPCorpus_Text_View(XMLCorpusView): """ A stream backed corpus view specialized for use with text.xml files in NKJP corpus. """ SENTS_MODE = 0 RAW_MODE = 1 def __init__(self, filename, **kwargs): self.mode = kwargs.pop("mode", 0) self.tagspec = ".*/div/ab" self.segm_dict = dict() # xml preprocessing self.xml_tool = XML_Tool(filename, "text.xml") # base class init XMLCorpusView.__init__( self, self.xml_tool.build_preprocessed_file(), self.tagspec ) def handle_query(self): try: self._open() x = self.read_block(self._stream) self.close() self.xml_tool.remove_preprocessed_file() return x except Exception as e: self.xml_tool.remove_preprocessed_file() raise Exception from e def read_block(self, stream, tagspec=None, elt_handler=None): """ Returns text as a list of sentences. """ txt = [] while True: segm = XMLCorpusView.read_block(self, stream) if len(segm) == 0: break for part in segm: txt.append(part) return [" ".join([segm for segm in txt])] def get_segm_id(self, elt): for attr in elt.attrib: if attr.endswith("id"): return elt.get(attr) def handle_elt(self, elt, context): # fill dictionary to use later in sents mode if self.mode is NKJPCorpus_Text_View.SENTS_MODE: self.segm_dict[self.get_segm_id(elt)] = elt.text return elt.text class NKJPCorpus_Morph_View(XMLCorpusView): """ A stream backed corpus view specialized for use with ann_morphosyntax.xml files in NKJP corpus. """ def __init__(self, filename, **kwargs): self.tags = kwargs.pop("tags", None) self.tagspec = ".*/seg/fs" self.xml_tool = XML_Tool(filename, "ann_morphosyntax.xml") XMLCorpusView.__init__( self, self.xml_tool.build_preprocessed_file(), self.tagspec ) def handle_query(self): try: self._open() words = [] while True: segm = XMLCorpusView.read_block(self, self._stream) if len(segm) == 0: break for part in segm: if part is not None: words.append(part) self.close() self.xml_tool.remove_preprocessed_file() return words except Exception as e: self.xml_tool.remove_preprocessed_file() raise Exception from e def handle_elt(self, elt, context): word = "" flag = False is_not_interp = True # if tags not specified, then always return word if self.tags is None: flag = True for child in elt: # get word if "name" in child.keys() and child.attrib["name"] == "orth": for symbol in child: if symbol.tag == "string": word = symbol.text elif "name" in child.keys() and child.attrib["name"] == "interps": for symbol in child: if "type" in symbol.keys() and symbol.attrib["type"] == "lex": for symbol2 in symbol: if ( "name" in symbol2.keys() and symbol2.attrib["name"] == "ctag" ): for symbol3 in symbol2: if ( "value" in symbol3.keys() and self.tags is not None and symbol3.attrib["value"] in self.tags ): flag = True elif ( "value" in symbol3.keys() and symbol3.attrib["value"] == "interp" ): is_not_interp = False if flag and is_not_interp: return word