#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
分词器
"""
from collections import defaultdict
import json
import logging
import re
from typing import *
import unicodedata
from tqdm import tqdm
from toolbox.string.character import Character, LowerCase, Pattern
logger = logging.getLogger(__file__)
_DEFAULT_SPLITTER_NAME = 'unknown'
class Splitter(object):
def __init__(self, name=_DEFAULT_SPLITTER_NAME):
self.name = name
def split(self, text: str) -> List[str]:
raise NotImplementedError()
def post_process(self, tokens: List[List[str]]):
return tokens
class ByCharSplitterV1(Splitter):
def __init__(self, name=_DEFAULT_SPLITTER_NAME):
super().__init__(name=name)
def split(self, text: str) -> List[str]:
return self._split(text)
@staticmethod
def _split(text: str) -> List[str]:
flag = Character.f_unknown
sep = '[sep]'
ret = ''
for c in text:
if Character.is_hyphens(c):
ret += c
flag = Character.f_is_hyphens
elif Character.is_punctuation(c) or Character.is_cjk_character(c) or Character.is_jap_character(c):
if flag != Character.f_is_hyphens:
c = sep + c
ret += c
flag = Character.f_is_punctuation
elif Character.is_space(c):
                # Consecutive spaces must not be merged into one; otherwise the split tokens
                # would no longer reconstruct the original alias exactly.
if flag != Character.f_is_space:
c = sep + c
ret += c
flag = Character.f_is_space
elif Character.is_alpha(c):
if flag not in (Character.f_is_alpha, Character.f_is_hyphens):
c = sep + c
ret += c
flag = Character.f_is_alpha
elif Character.is_num(c):
if flag not in (Character.f_is_num, Character.f_is_hyphens):
c = sep + c
ret += c
flag = Character.f_is_num
else:
if flag not in (Character.f_unknown, Character.f_is_hyphens):
c = sep + c
ret += c
flag = Character.f_unknown
ret = ret.split(sep)
ret = [ch for ch in ret if ch != '']
if len(''.join(ret)) != len(text):
raise AssertionError('this method should not change the char num. '
'text: {}, ret: {}'.format(text, ''.join(ret)))
return ret
class ByCharSplitterV2(Splitter):
"""
在正则表达式的锚点识别时, `3000-3999` 应能分割出 `000`, 因此, 连续的数字须视作一个 token.
于是定义了此类, 以区别于将连续的数字被识别为多个 token.
ByCharSplitterV1 中, 连续的数字如 `3000` 将被分割为 ['3', '0', '0', '0']
"""
def __init__(self, name=_DEFAULT_SPLITTER_NAME):
super().__init__(name=name)
def split(self, text: str) -> List[str]:
return self._split(text)
@staticmethod
def _split(text: str) -> List[str]:
"""将 text 分割为 token list, 然后再按 token 到 trie 树匹配, 分词. """
flag = Character.f_unknown
sep = '[sep]'
ret = ''
for c in text:
if Character.is_hyphens(c):
                # `3000-3999` must be able to yield `000`, so the hyphen must not glue tokens
                # together: a hyphen always starts a new token here.
c = sep + c
ret += c
flag = Character.f_is_hyphens
elif Character.is_punctuation(c) or Character.is_cjk_character(c) or Character.is_jap_character(c):
if flag != Character.f_is_hyphens:
c = sep + c
ret += c
flag = Character.f_is_punctuation
elif Character.is_space(c):
                # Consecutive spaces must not be merged into one; otherwise the split tokens
                # would no longer reconstruct the original alias exactly.
if flag != Character.f_is_space:
c = sep + c
ret += c
flag = Character.f_is_space
elif Character.is_alpha(c):
if flag not in (Character.f_is_alpha, Character.f_is_hyphens):
c = sep + c
ret += c
flag = Character.f_is_alpha
elif Character.is_num(c):
                # `3000-3999` must be able to yield `000`, so consecutive digits are not merged:
                # every digit starts a new token (unless it directly follows a hyphen).
if flag not in (Character.f_is_hyphens,):
c = sep + c
ret += c
flag = Character.f_is_num
else:
if flag not in (Character.f_unknown, Character.f_is_hyphens):
c = sep + c
ret += c
flag = Character.f_unknown
ret = ret.split(sep)
ret = [ch for ch in ret if ch != '']
if len(''.join(ret)) != len(text):
raise AssertionError('this method should not change the char num. '
'text: {}, ret: {}'.format(text, ''.join(ret)))
return ret
class ListSplitter(Splitter):
def split(self, text: str):
return list(text)
class ListEncodeOneSplitter(Splitter):
def split(self, text: str):
result = list()
for c in text:
dummy = '[{}]'.format(ord(c))
result.append(dummy)
return result
def post_process(self, tokens: List[List[str]]):
tokens_ = list()
for token in tokens:
token_ = list()
for t in token:
idx = t[1:-1]
t = chr(int(idx))
token_.append(t)
tokens_.append(token_)
return tokens_
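# A quick round-trip sketch for ListEncodeOneSplitter (plain ord/chr, traced from the code above):
#   ListEncodeOneSplitter().split('ab')                      -> ['[97]', '[98]']
#   ListEncodeOneSplitter().post_process([['[97]', '[98]']]) -> [['a', 'b']]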
_DEFAULT_SPLITTER_NAME_TO_SPLITTER = {
'by_char_splitter_v1': ByCharSplitterV1(),
'by_char_splitter_v2': ByCharSplitterV2(),
'list_splitter': ListSplitter(),
'list_encoder_one_splitter': ListEncodeOneSplitter(),
}
_DEFAULT_TOKENIZER_NAME = 'unknown'
class Tokenizer(object):
"""Abstract"""
@staticmethod
def lowercase(string: str) -> str:
string = LowerCase.lowercase(string)
return string
def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
self.name = name
self.case_sensitive = case_sensitive
def insert(self, word: str) -> None:
raise NotImplementedError()
def insert_from_list(self, words: Iterable[Any]) -> None:
words = cast(List[Any], words)
if len(words) == 0:
return None
for word in tqdm(words):
self.insert(word)
def insert_black(self, word: str) -> None:
raise NotImplementedError()
def insert_black_from_list(self, words: Iterable[Any]) -> None:
words = cast(List[Any], words)
if len(words) == 0:
return None
for word in tqdm(words):
self.insert_black(word)
def tokenize(self, text: str, full_mode: bool = False) -> Tuple[List[str], List[bool]]:
raise NotImplementedError()
@staticmethod
def _merge_tokens(tokens: List[str], isword_list: List[bool]) -> Tuple[List[str], List[bool]]:
"""
在 tokenize 分词后, 由于应用了黑名单, 有些分割出的词被标记为 False,
这导致结果中出现连续的两个 False.
在 segmenter 中, 多个分词器选后执行, 连续的两个 False 应合并, 以优化后面的分词的效果.
这里, 只合并连续的两个 False, 不处理其它符号.
"""
tokens2, isword_list2 = list(), list()
false_token = ''
for token, isword in zip(tokens, isword_list):
if isword is False:
false_token += str(token)
continue
if false_token != '':
tokens2.append(false_token)
isword_list2.append(False)
tokens2.append(token)
isword_list2.append(isword)
false_token = ''
else:
if false_token != '':
tokens2.append(false_token)
isword_list2.append(False)
return tokens2, isword_list2
class TrieNode(object):
"""建立词典的Trie树节点"""
def __init__(self, t_word=None):
self.t_word = t_word
self.children = dict()
def add_children(self, k, v):
self.children[k] = v
@property
def text(self):
if self.t_word is None:
return None
return ''.join(self.t_word)
@property
def isword(self):
if self.t_word is None:
return False
return True
def __repr__(self):
return '<{}.{} t_word={}>'.format(self.__module__, self.__class__.__name__, self.t_word)
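# Sketch of the trie built by FastTokenizer._insert_node (below) after inserting '我要' and
# '我要退款', each split one character per token (layout inferred from the code):
#   trie.children['我'].children['要'].t_word == ['我', '要']                        -> isword True
#   trie.children['我'].children['要'].children['退'].children['款'].t_word == ['我', '要', '退', '款']
#   Intermediate nodes keep t_word=None, so their isword stays False.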
class FastTokenizer(Tokenizer):
@staticmethod
def demo1():
fast = FastTokenizer()
fast.insert('我要退款')
fast.insert('色彩显示')
fast.insert('我要')
fast.insert('退款')
fast.insert('eid')
fast.insert('手机')
fast.insert('机不')
text = '手机不错我要退款'
c = fast.tokenize(text, full_mode=True)
print(c)
return
@staticmethod
def demo2():
fast = FastTokenizer(splitter=ListEncodeOneSplitter())
# fast.insert('พูดว่')
fast.insert('พูดว่า')
fast.insert('นะ')
fast.insert('พูดถึง')
fast.insert('คำพูด')
fast.insert('บอ')
text = 'พูดว่าอะไรนะ'
c = fast.tokenize(text, full_mode=False)
print(c)
return
@staticmethod
def token_list_to_string_list(token_list: List[List[str]]) -> List[str]:
"""因为 spliter 是将句子分割为 List[str], tokenize 是将列表中的子字符串合并为词. """
ret = list()
for l in token_list:
ret.append(''.join(l))
return ret
def __init__(self, splitter: Optional[Union[Splitter, str]] = None, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
if isinstance(splitter, str):
splitter = _DEFAULT_SPLITTER_NAME_TO_SPLITTER[splitter]
self.splitter = splitter or ByCharSplitterV1()
self.trie = TrieNode()
self._black_list: List[str] = list()
super(FastTokenizer, self).__init__(name=name, case_sensitive=case_sensitive)
def insert(self, word: str) -> None:
word = str(word)
if not self.case_sensitive:
word = self.lowercase(word)
t_word = self.splitter.split(word)
self._insert_node(t_word)
def insert_black(self, word: str) -> None:
"""
黑名单.
如遇到 `watch tv` 时, 不要识别出 `watch`.
注意: 因为是最大匹配, 所以在 `watch` 在黑名单时, `watch tv` 是可以识别到的.
"""
if word not in self._black_list:
self.insert(word)
self._black_list.append(word)
def _insert_node(self, t_word: List[str]) -> None:
now = self.trie
for t in t_word[:-1]:
if t not in now.children:
now.add_children(t, TrieNode())
now = now.children[t]
t = t_word[-1]
if t not in now.children:
now.add_children(t, TrieNode(t_word))
else:
now.children[t].t_word = t_word
def _tokenize(self, t_word: list, full_mode: bool = False):
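        # Forward maximum matching over the trie (a sketch of the control flow):
        #   b_idx scans the token list; from each b_idx the trie is walked as far as possible,
        #   remembering in e_idx the end of the longest complete word seen so far.
        #   When a word is found, tokens between l_idx and b_idx are emitted with False and the
        #   word itself with True. In full_mode, shorter prefix words are also emitted and b_idx
        #   only advances by one (overlapping matches); otherwise b_idx jumps to e_idx.
        #   Remaining tokens past the last match are emitted with False at the end.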
outlst, iswlst = list(), list()
l = len(t_word)
b_idx = 0
l_idx = 0
max_e_idx = 0
while b_idx < l:
now = self.trie
found = False
ptr = b_idx
e_idx = None
while True:
t = t_word[ptr]
if not self.case_sensitive:
t = self.lowercase(t)
if t not in now.children and e_idx is not None:
found = True
break
if t not in now.children and e_idx is None:
break
                if now.isword and full_mode:
                    outlst.append(t_word[b_idx: ptr])
                    iswlst.append(True)
now = now.children[t]
ptr += 1
if now.isword:
e_idx = ptr
if ptr == l and e_idx is None:
break
if ptr == l and e_idx is not None:
found = True
break
if found is True:
if l_idx != b_idx:
outlst.append(t_word[l_idx: b_idx])
iswlst.append(False)
outlst.append(t_word[b_idx: e_idx])
iswlst.append(True)
max_e_idx = max(max_e_idx, e_idx)
if full_mode:
b_idx += 1
else:
b_idx = e_idx
l_idx = b_idx
else:
b_idx += 1
if max_e_idx < l:
outlst.append(t_word[l_idx:l])
iswlst.append(False)
return outlst, iswlst
def tokenize(self, text: Union[str, List[str]], full_mode=False) -> Tuple[List[str], List[bool]]:
if isinstance(text, list):
text_list = text
else:
text_list = [text]
outlst, iswlst = list(), list()
for text in text_list:
t_word = self.splitter.split(text)
outlst_tmp, iswlst_tmp = self._tokenize(t_word, full_mode)
outlst.extend(outlst_tmp)
iswlst.extend(iswlst_tmp)
outlst = self.splitter.post_process(outlst)
outlst = self.token_list_to_string_list(outlst)
        # Apply the blacklist.
for idx, out in enumerate(outlst):
if out in self._black_list:
iswlst[idx] = False
outlst, iswlst = self._merge_tokens(outlst, iswlst)
return outlst, iswlst
class TagTokenizer(FastTokenizer):
def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
super().__init__(name=name, case_sensitive=case_sensitive)
self._word2flags_dict = defaultdict(list)
def insert(self, word: str, tag: str = None) -> None:
if tag is not None:
self._word2flags_dict[word].append(tag)
super().insert(word)
    def tokenize(self, text: Union[str, List[str]], full_mode: bool = False) -> Tuple[List[str], List[Union[bool, List[str]]]]:
        outlst, iswlst = super().tokenize(text, full_mode=full_mode)
iswlst2 = list()
for out, isw in zip(outlst, iswlst):
if isw is True:
iswlst2.append(self._word2flags_dict.get(out, True))
else:
iswlst2.append(False)
return outlst, iswlst2
class RegularTokenizer(Tokenizer):
"""
不同于 FastTokenizer, 此处用正则表示代替词来进行匹配.
优化:
1. 基于正则表达式 index 的快速查找.
2. re.compile. 在遇到无效正则表达式时, 会报错.
"""
@staticmethod
def demo1():
regular = RegularTokenizer()
regular.insert('我要退款')
regular.insert('色彩显示')
regular.insert('我要')
regular.insert('退款')
regular.insert('eid')
regular.insert('手机')
regular.insert('机不')
regular.insert(r'\d+左右')
text = '1500左右的手机不错我要退款'
ret = regular.tokenize(text, full_mode=False)
print(ret)
return
@staticmethod
def _outlst_iswlst_append(token, isword, outlst, iswlst):
if len(token) > 0:
outlst.append(token)
iswlst.append(isword)
return outlst, iswlst
def __init__(self, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
self.regular_quick_find_tokenizer = RegularQuickFindTokenizer()
self._black_list = list()
super(RegularTokenizer, self).__init__(name=name, case_sensitive=case_sensitive)
def insert(self, word: str) -> None:
"""
:param word: 正则表达式.
"""
self.regular_quick_find_tokenizer.insert(pattern=str(word))
def insert_black(self, word: str) -> None:
"""添加黑名单"""
if word not in self._black_list:
self._black_list.append(word)
def tokenize(self, text: str, full_mode: bool = False) -> Tuple[List[str], List[bool]]:
text = str(text)
if not self.case_sensitive:
text_ = self.lowercase(text)
else:
text_ = text
potential_pattern, no_index_pattern = self.regular_quick_find_tokenizer.get_potential_pattern(text=text_)
        # `|` is set union (`&` would be intersection).
pattern_set = potential_pattern | no_index_pattern
span_list = list()
for pattern in pattern_set:
try:
if self.case_sensitive:
pattern = re.compile(pattern)
else:
pattern = re.compile(pattern, re.I)
except re.error as e:
logger.error('{}, pattern: {}'.format(e, pattern))
continue
match_iter = re.finditer(pattern, text_)
for match in match_iter:
match_str = match.group(0).strip()
if len(match_str) >= 2:
span_list.append(match.span())
if full_mode:
span_accept = span_list
else:
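            # Keep leftmost, longest, non-overlapping spans: sort by span length (desc), then by
            # start position (asc, stable), and greedily accept every span that starts at or
            # after the end of the previously accepted one.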
span_list = sorted(span_list, key=lambda x: x[1] - x[0], reverse=True)
span_list = sorted(span_list, key=lambda x: x[0], reverse=False)
span_accept = [(0, 0)]
for span in span_list:
if span[0] >= span_accept[-1][1]:
span_accept.append(span)
outlst, iswlst = list(), list()
last_idx = None
for b, e in span_accept:
if last_idx is None:
outlst, iswlst = self._outlst_iswlst_append(text[:b], False, outlst, iswlst)
else:
outlst, iswlst = self._outlst_iswlst_append(text[last_idx:b], False, outlst, iswlst)
outlst, iswlst = self._outlst_iswlst_append(text[b:e], True, outlst, iswlst)
last_idx = e
outlst, iswlst = self._outlst_iswlst_append(text[last_idx:], False, outlst, iswlst)
        # Apply the blacklist.
for idx, out in enumerate(outlst):
if out in self._black_list:
iswlst[idx] = False
return self._merge_tokens(outlst, iswlst)
class RegularQuickFindTokenizer(FastTokenizer):
"""
根据正则表达式的锚点, 快速查找可能在 text 上成立的正则表达式.
1. insert 正则表达式,
2. 获取索引, 并插入分词器,
3. 使用分词器对句子分词, 匹配到的部分就有可能匹配其正则表达式.
"""
@staticmethod
def demo1():
quick = RegularQuickFindTokenizer()
quick.insert('.*[0-9]000.*到[0-9]999.*')
quick.insert('^(?=.*(华为|苹果).*(手机|手表)).*(电脑|平板).*(?=.*小米(手机|手表)).*$')
quick.insert(r'.*(输入密码)0米(\d{2.10}).*')
quick.insert(r'.*(输入|密码)(\d{2.10}).*')
quick.insert('^(?=.*(华为|苹果).*(电脑|平板|手表).*$')
quick.insert('*0米.*(左|右).*')
quick.insert('.*[0-9].*[0-9].*')
quick.insert(r'\d+左右')
text = '3000-3999 的华为手表, 有没有, 1500左右的也可以. '
ret = quick.tokenize(text)
print(ret)
ret = quick.get_potential_pattern(text)
print(ret)
return
def __init__(self, splitter: Optional[Splitter] = None, name=_DEFAULT_TOKENIZER_NAME, case_sensitive=False):
splitter = splitter or ByCharSplitterV2()
self._no_index_pattern: Set[str] = set()
self._index_to_pattern: Dict[str, Set[str]] = defaultdict(set)
super().__init__(splitter=splitter, name=name, case_sensitive=case_sensitive)
def insert(self, pattern: str) -> None:
indexes: List[str] = RegularIndexParse.get_indexes(pattern)
if indexes is None:
self._no_index_pattern.add(pattern)
else:
for index in indexes:
self._index_to_pattern[index].add(pattern)
super(RegularQuickFindTokenizer, self).insert(index)
def get_potential_pattern(self, text: str) -> Tuple[Set[str], Set[str]]:
"""
:return: 两个集合, 第一个是潜在正则表达式集合, 第二个是 insert 进来的无 index 正则,
"""
pattern = set()
        # Use full_mode=True so that every possible anchor in the text is matched.
outlst, iswlst = self.tokenize(text, full_mode=True)
for out, isw in zip(outlst, iswlst):
if isw is True:
                # The square-bracket lookup here should not raise: _index_to_pattern is a
                # defaultdict, so an unknown key simply yields an empty set.
pattern.update(self._index_to_pattern[out])
return pattern, self._no_index_pattern
class RegularIndexParse(object):
alp_num_ch = re.compile(Pattern.alp_num_ch)
brackets = re.compile(Pattern.brackets)
square_brackets = re.compile(Pattern.square_brackets)
regex_dsw_find = re.compile(Pattern.regex_dsw_find)
@staticmethod
def demo1():
pattern = r'\d+左右'
ret = RegularIndexParse.get_indexes(pattern)
print(ret)
return
def __init__(self):
pass
@classmethod
def _split_by_brackers(cls, text):
        # Split the string at balanced (matching) parentheses.
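        # e.g. _split_by_brackers('ab(cd(e)f)gh') -> ['ab', '(cd(e)f)', 'gh']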
brackets = ['(', ')']
result = []
tmp = ''
flag = 0
for s in text:
if s not in brackets:
tmp += s
elif s == '(':
if tmp and flag == 0:
result.append(tmp)
tmp = ''
tmp += s
flag = flag + 1
else:
tmp += s
flag = flag - 1
if flag == 0:
result.append(tmp)
tmp = ''
if tmp:
result.append(tmp)
return result
@classmethod
def _get_index_in_brackets(cls, text):
        # The text contains parentheses.
        # First look for an anchor outside the parentheses;
        # if there is none, fall back to the index group inside the parentheses.
index = cls._get_index_out_of_brackets(text)
if index:
return [index.group()]
tmps = cls.brackets.findall(text)
index = []
for tmp in tmps:
tmp_index = cls.alp_num_ch.findall(tmp)
if len(index) == 0:
index = tmp_index
elif len(tmp_index) < len(index):
index = tmp_index
return index
@classmethod
def _get_index_out_of_brackets(cls, text):
        # Remove the text inside round brackets from the regular expression.
tmp1 = cls.brackets.sub('', text)
        # Remove the square-bracket parts of the regular expression.
tmp2 = cls.square_brackets.sub('', tmp1)
        # Remove escapes such as \d+, \s+, etc.
tmp3 = cls.regex_dsw_find.sub('', tmp2)
        # Take the first literal of the stripped pattern as the index (anchor).
tmp4 = cls.alp_num_ch.search(tmp3)
return tmp4
@classmethod
def get_indexes(cls, text: str) -> Union[List[str], None]:
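        # e.g. get_indexes('.*[0-9]000.*到[0-9]999.*') is expected to return ['000'] (see demo3):
        # square-bracket classes and \d/\s escapes are stripped first, and the first remaining
        # literal run outside any parentheses becomes the anchor (assuming Pattern.alp_num_ch
        # matches runs of letters, digits and CJK characters).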
indexes = cls._get_index_out_of_brackets(text)
if indexes:
return [indexes.group()]
pieces = cls._split_by_brackers(text)
for p in pieces:
if '(' in p:
if '(' in p[1:-1]:
tmp_index = cls._get_index_in_brackets(p[1:-1])
else:
tmp_index = cls.alp_num_ch.findall(p)
if indexes is None:
indexes = tmp_index
else:
if len(tmp_index) < len(indexes):
indexes = tmp_index
return indexes
class IndivisibleTokenizer(FastTokenizer):
def __init__(self,
indivisible_dict: Dict[str, Tuple[List[str], List[List[str]]]],
case_sensitive=False):
"""
指定分割词 / 不可分割词.
将词分按指定方式分割. 元组中第一项是分词的列表, 第二项是每个子词对应的词性(可以有多个词性).
"""
super(IndivisibleTokenizer, self).__init__(case_sensitive=case_sensitive)
self.word2tags = defaultdict(list)
for word, t_words in indivisible_dict.items():
self.insert(word, t_words)
@classmethod
def from_json_file(cls, filename, case_sensitive=False):
with open(filename, 'r', encoding='utf-8') as f:
indivisible_dict = json.load(f)
return cls(indivisible_dict=indivisible_dict, case_sensitive=case_sensitive)
def insert(self, word: str, tag: Tuple[List[str], List[List[str]]] = None) -> None:
if tag is None:
tag = list()
self.word2tags[word] = tag
super().insert(word)
    def tokenize(self, text: Union[str, List[str]], full_mode: bool = False) -> Tuple[List[str], List[Union[bool, List[str]]]]:
        outlst, iswlst = super().tokenize(text, full_mode=full_mode)
outlst2, iswlst2 = list(), list()
for out, isw in zip(outlst, iswlst):
if isw is True:
word_list, tags_list = self.word2tags[out]
outlst2.extend(word_list)
iswlst2.extend(tags_list)
else:
outlst2.append(out)
iswlst2.append(isw)
return outlst2, iswlst2
def whitespace_tokenize(text):
"""Runs basic whitespace cleaning and splitting on a piece of text."""
text = text.strip()
if not text:
return []
tokens = text.split()
return tokens
def demo1():
text = '我想买一个老人用的, 1500左右, huawei watch gt 感觉还可以, 它性价比高吗, 有优惠活动吗?'
fast = FastTokenizer()
fast.insert_from_list(['huawei watch gt', 'huawei p30系列', 'huawei p30 pro'])
# fast.insert('huawei p30系列')
result = fast.tokenize(text)
print(result)
return
def demo2():
text = '我想买一个老人用的, 1500左右, huawei watch gt 感觉还可以, 它性价比高吗, 有优惠活动吗?'
fast = RegularTokenizer()
fast.insert_from_list([r'\d+'])
result = fast.tokenize(text)
print(result)
return
def demo3():
text = '我想买一个老人用的, 1500左右, huawei watch gt 感觉还可以, 它性价比高吗, 有优惠活动吗?'
RegularIndexParse.get_indexes('')
ret = RegularIndexParse.get_indexes('.*[0-9]000.*到[0-9]999.*')
print(ret)
ret = RegularIndexParse.get_indexes('.*[0-9].*[0-9].*')
print(ret)
# ret = RegularIndexParse.get_indexes('.*[0-9]000.*到[0-9]999.*')
# print(ret)
# ret = RegularIndexParse.get_indexes('.*[0-9]000.*到[0-9]999.*')
# print(ret)
# ret = RegularIndexParse.get_indexes('.*[0-9]000.*到[0-9]999.*')
# print(ret)
# quick.insert('^(?=.*(华为|苹果).*(手机|手表)).*(电脑|平板).*(?=.*小米(手机|手表)).*$')
# quick.insert('.*(输入密码)0米(\d{2.10}).*')
# quick.insert('.*(输入|密码)(\d{2.10}).*')
# quick.insert('^(?=.*(华为|苹果).*(电脑|平板|手表).*$')
# quick.insert('*0米.*(左|右).*')
# quick.insert('.*[0-9].*[0-9].*')
return
def demo4():
FastTokenizer.demo2()
# FastTokenizer.demo3()
# RegularQuickFindTokenizer.demo1()
return
if __name__ == '__main__':
# demo1()
# demo2()
# demo3()
demo4()