|
from typing import List, Tuple, Any |
|
|
|
import os |
|
from functools import lru_cache |
|
|
|
from pyarabic.araby import tokenize, strip_tashkeel |
|
|
|
import numpy as np |
|
import torch as T |
|
from torch.utils.data import Dataset |
|
|
|
try: |
|
from transformers import PreTrainedTokenizer |
|
except: |
|
from typing import Any as PreTrainedTokenizer |
|
|
|
from data_utils import DatasetUtils |
|
import diac_utils as du |
|
|
|
class DataRetriever(Dataset): |
|
def __init__( |
|
self, |
|
lines, |
|
data_utils: DatasetUtils, |
|
is_test: bool = False, |
|
*, |
|
tokenizer: PreTrainedTokenizer, |
|
lines_mode: bool = False, |
|
**kwargs, |
|
): |
|
super(DataRetriever).__init__() |
|
|
|
self.data_utils = data_utils |
|
self.is_test = is_test |
|
self.tokenizer = tokenizer |
|
|
|
self.stride = data_utils.test_stride |
|
|
|
self.data_points = lines |
|
|
|
self.bos_token_id = int(self.tokenizer.bos_token_id or self.tokenizer.cls_token_id) |
|
self.eos_token_id = int(self.tokenizer.eos_token_id or self.tokenizer.sep_token_id) |
|
|
|
self.max_tokens = self.data_utils.max_token_count |
|
self.max_slen = self.data_utils.max_sent_len |
|
self.max_wlen = self.data_utils.max_word_len |
|
|
|
self.p_val = self.tokenizer.pad_token_id |
|
self.pc_val = self.data_utils.pad_char_id |
|
self.pt_val = self.data_utils.pad_target_val |
|
|
|
self.char_x_padding = [self.pc_val] * self.max_wlen |
|
self.diac_x_padding = [[self.pc_val]*8] * self.max_wlen |
|
self.diac_y_padding = [self.pt_val] * self.max_wlen |
|
|
|
def preprocess(self, data, dtype=T.long): |
|
return [T.tensor(np.array(x), dtype=dtype) for x in data] |
|
|
|
def __len__(self): |
|
return len(self.data_points) |
|
|
|
@lru_cache(maxsize=1024 * 2) |
|
def __getitem__(self, idx: int) -> Tuple[List[T.Tensor], T.Tensor, T.Tensor]: |
|
word_x, char_x, diac_x, diac_y, subword_lengths = self.create_sentence(idx) |
|
return ( |
|
self.preprocess([word_x, char_x, diac_x]), |
|
T.tensor(diac_y, dtype=T.long), |
|
T.tensor(subword_lengths, dtype=T.long) |
|
) |
|
|
|
def create_sentence(self, idx): |
|
line = self.data_points[idx] |
|
|
|
words: List[str] = tokenize(line.strip()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subwords_x = [self.bos_token_id] |
|
subword_lengths = [] |
|
|
|
char_x = [] |
|
diac_x = [] |
|
diac_y = [] |
|
diac_y_tmp = [] |
|
|
|
for i_word, word in enumerate(words): |
|
word = du.strip_unknown_tashkeel(word) |
|
word_chars = du.split_word_on_characters_with_diacritics(word) |
|
cx, cy, cy_3head = du.create_label_for_word(word_chars) |
|
|
|
word_strip = strip_tashkeel(word) |
|
|
|
|
|
word_sub_ids = self.tokenizer(word_strip)['input_ids'][1:-1] |
|
subword_lengths += [len(word_sub_ids)] |
|
|
|
subwords_x += word_sub_ids |
|
|
|
|
|
char_x += [self.data_utils.pad_and_truncate_sequence(cx, self.max_wlen)] |
|
|
|
diac_y += [self.data_utils.pad_and_truncate_sequence(cy, self.max_wlen, pad=self.data_utils.pad_target_val)] |
|
diac_y_tmp += [self.data_utils.pad_and_truncate_sequence(cy_3head, self.max_wlen, pad=[self.data_utils.pad_target_val]*3)] |
|
|
|
assert len(char_x) == len(subword_lengths), f"{char_x=}; {subword_lengths=} ;;" |
|
assert len(char_x) == len(words) |
|
|
|
diac_x = self.data_utils.create_decoder_input(diac_y_tmp) |
|
|
|
subwords_x += [self.eos_token_id] |
|
|
|
assert len(subword_lengths) == len(words) |
|
subwords_x = self.data_utils.pad_and_truncate_sequence(subwords_x, self.max_tokens, pad=self.p_val) |
|
subword_lengths = self.data_utils.pad_and_truncate_sequence(subword_lengths, self.max_slen, pad=0) |
|
|
|
char_x = self.data_utils.pad_and_truncate_sequence(char_x, self.max_slen, pad=self.char_x_padding) |
|
diac_x = self.data_utils.pad_and_truncate_sequence(diac_x, self.max_slen, pad=self.diac_x_padding) |
|
diac_y = self.data_utils.pad_and_truncate_sequence(diac_y, self.max_slen, pad=self.diac_y_padding) |
|
|
|
return subwords_x, char_x, diac_x, diac_y, subword_lengths |