Spaces:
Sleeping
Sleeping
import json | |
import os | |
import re | |
import numpy as np | |
import paddle.inference as paddle_infer | |
from paddlenlp.transformers import ErnieTokenizer | |
__all__ = ['PunctuationExecutor'] | |
class PunctuationExecutor: | |
def __init__(self, model_dir, use_gpu=True, gpu_mem=500, num_threads=4): | |
# config | |
model_path = os.path.join(model_dir, 'model.pdmodel') | |
params_path = os.path.join(model_dir, 'model.pdiparams') | |
if not os.path.exists(model_path) or not os.path.exists(params_path): | |
raise Exception("{}{}".format(model_path, params_path)) | |
self.config = paddle_infer.Config(model_path, params_path) | |
# | |
pretrained_token = 'ernie-1.0' | |
if os.path.exists(os.path.join(model_dir, 'info.json')): | |
with open(os.path.join(model_dir, 'info.json'), 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
pretrained_token = data['pretrained_token'] | |
if use_gpu: | |
self.config.enable_use_gpu(gpu_mem, 0) | |
else: | |
self.config.disable_gpu() | |
self.config.set_cpu_math_library_num_threads(num_threads) | |
# enable memory optim | |
self.config.enable_memory_optim() | |
self.config.disable_glog_info() | |
# config predictor | |
self.predictor = paddle_infer.create_predictor(self.config) | |
# | |
self.input_ids_handle = self.predictor.get_input_handle('input_ids') | |
self.token_type_ids_handle = self.predictor.get_input_handle('token_type_ids') | |
# | |
self.output_names = self.predictor.get_output_names() | |
self._punc_list = [] | |
if not os.path.join(model_dir, 'vocab.txt'): | |
raise Exception("{}".format(os.path.join(model_dir, 'vocab.txt'))) | |
with open(os.path.join(model_dir, 'vocab.txt'), 'r', encoding='utf-8') as f: | |
for line in f: | |
self._punc_list.append(line.strip()) | |
self.tokenizer = ErnieTokenizer.from_pretrained(pretrained_token) | |
# | |
self('') | |
def _clean_text(self, text): | |
text = text.lower() | |
text = re.sub('[^A-Za-z0-9\u4e00-\u9fa5]', '', text) | |
text = re.sub(f'[{"".join([p for p in self._punc_list][1:])}]', '', text) | |
return text | |
# | |
def preprocess(self, text: str): | |
clean_text = self._clean_text(text) | |
if len(clean_text) == 0: return None | |
tokenized_input = self.tokenizer(list(clean_text), return_length=True, is_split_into_words=True) | |
input_ids = tokenized_input['input_ids'] | |
seg_ids = tokenized_input['token_type_ids'] | |
seq_len = tokenized_input['seq_len'] | |
return input_ids, seg_ids, seq_len | |
def infer(self, input_ids: list, seg_ids: list): | |
# | |
self.input_ids_handle.reshape([1, len(input_ids)]) | |
self.token_type_ids_handle.reshape([1, len(seg_ids)]) | |
self.input_ids_handle.copy_from_cpu(np.array([input_ids]).astype('int64')) | |
self.token_type_ids_handle.copy_from_cpu(np.array([seg_ids]).astype('int64')) | |
# predictor | |
self.predictor.run() | |
# | |
output_handle = self.predictor.get_output_handle(self.output_names[0]) | |
output_data = output_handle.copy_to_cpu() | |
return output_data | |
# | |
def postprocess(self, input_ids, seq_len, preds): | |
tokens = self.tokenizer.convert_ids_to_tokens(input_ids[1:seq_len - 1]) | |
labels = preds[1:seq_len - 1].tolist() | |
assert len(tokens) == len(labels) | |
text = '' | |
for t, l in zip(tokens, labels): | |
text += t | |
if l != 0: | |
text += self._punc_list[l] | |
return text | |
def __call__(self, text: str) -> str: | |
# | |
input_ids, seg_ids, seq_len = self.preprocess(text) | |
preds = self.infer(input_ids=input_ids, seg_ids=seg_ids) | |
if len(preds.shape) == 2: | |
preds = preds[0] | |
text = self.postprocess(input_ids, seq_len, preds) | |
return text |