import torch from torch.utils import data import numpy as np from os.path import join as pjoin import random import codecs as cs from tqdm import tqdm import utils.paramUtil as paramUtil from torch.utils.data._utils.collate import default_collate def collate_fn(batch): batch.sort(key=lambda x: x[3], reverse=True) return default_collate(batch) '''For use of training text-2-motion generative model''' class Text2MotionDataset(data.Dataset): def __init__(self, dataset_name, is_test, w_vectorizer, feat_bias = 5, max_text_len = 20, unit_length = 4): self.max_length = 20 self.pointer = 0 self.dataset_name = dataset_name self.is_test = is_test self.max_text_len = max_text_len self.unit_length = unit_length self.w_vectorizer = w_vectorizer if dataset_name == 't2m': self.data_root = './dataset/HumanML3D' self.motion_dir = pjoin(self.data_root, 'new_joint_vecs') self.text_dir = pjoin(self.data_root, 'texts') self.joints_num = 22 radius = 4 fps = 20 self.max_motion_length = 196 dim_pose = 263 kinematic_chain = paramUtil.t2m_kinematic_chain self.meta_dir = 'checkpoints/t2m/VQVAEV3_CB1024_CMT_H1024_NRES3/meta' elif dataset_name == 'kit': self.data_root = './dataset/KIT-ML' self.motion_dir = pjoin(self.data_root, 'new_joint_vecs') self.text_dir = pjoin(self.data_root, 'texts') self.joints_num = 21 radius = 240 * 8 fps = 12.5 dim_pose = 251 self.max_motion_length = 196 kinematic_chain = paramUtil.kit_kinematic_chain self.meta_dir = 'checkpoints/kit/VQVAEV3_CB1024_CMT_H1024_NRES3/meta' mean = np.load(pjoin(self.meta_dir, 'mean.npy')) std = np.load(pjoin(self.meta_dir, 'std.npy')) if is_test: split_file = pjoin(self.data_root, 'test.txt') # test.txt else: split_file = pjoin(self.data_root, 'val.txt') min_motion_len = 40 if self.dataset_name =='t2m' else 24 # min_motion_len = 64 joints_num = self.joints_num data_dict = {} id_list = [] with cs.open(split_file, 'r') as f: for line in f.readlines(): id_list.append(line.strip()) new_name_list = [] length_list = [] for name in tqdm(id_list): try: motion = np.load(pjoin(self.motion_dir, name + '.npy')) if (len(motion)) < min_motion_len or (len(motion) >= 200): continue text_data = [] flag = False with cs.open(pjoin(self.text_dir, name + '.txt')) as f: for line in f.readlines(): text_dict = {} line_split = line.strip().split('#') caption = line_split[0] txt_perb = line_split[-1] tokens = line_split[1].split(' ') f_tag = float(line_split[2]) to_tag = float(line_split[3]) f_tag = 0.0 if np.isnan(f_tag) else f_tag to_tag = 0.0 if np.isnan(to_tag) else to_tag text_dict['caption'] = caption text_dict['caption_perb'] = txt_perb text_dict['tokens'] = tokens if f_tag == 0.0 and to_tag == 0.0: flag = True text_data.append(text_dict) else: try: n_motion = motion[int(f_tag*fps) : int(to_tag*fps)] if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200): continue new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name while new_name in data_dict: new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name data_dict[new_name] = {'motion': n_motion, 'length': len(n_motion), 'text':[text_dict]} new_name_list.append(new_name) length_list.append(len(n_motion)) except: print(line_split) print(line_split[2], line_split[3], f_tag, to_tag, name) # break if flag: data_dict[name] = {'motion': motion, 'length': len(motion), 'text': text_data} new_name_list.append(name) length_list.append(len(motion)) except Exception as e: # print(e) pass name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1])) self.mean = mean self.std = std self.length_arr = np.array(length_list) self.data_dict = data_dict self.name_list = name_list self.reset_max_len(self.max_length) def reset_max_len(self, length): assert length <= self.max_motion_length self.pointer = np.searchsorted(self.length_arr, length) print("Pointer Pointing at %d"%self.pointer) self.max_length = length def inv_transform(self, data): return data * self.std + self.mean def forward_transform(self, data): return (data - self.mean) / self.std def __len__(self): return len(self.data_dict) - self.pointer def __getitem__(self, item): idx = self.pointer + item name = self.name_list[idx] data = self.data_dict[name] # data = self.data_dict[self.name_list[idx]] motion, m_length, text_list = data['motion'], data['length'], data['text'] # Randomly select a caption text_data = random.choice(text_list) caption, tokens, caption_perb = text_data['caption'], text_data['tokens'], text_data['caption_perb'] if len(tokens) < self.max_text_len: # pad with "unk" tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] sent_len = len(tokens) tokens = tokens + ['unk/OTHER'] * (self.max_text_len + 2 - sent_len) else: # crop tokens = tokens[:self.max_text_len] tokens = ['sos/OTHER'] + tokens + ['eos/OTHER'] sent_len = len(tokens) pos_one_hots = [] word_embeddings = [] for token in tokens: word_emb, pos_oh = self.w_vectorizer[token] pos_one_hots.append(pos_oh[None, :]) word_embeddings.append(word_emb[None, :]) pos_one_hots = np.concatenate(pos_one_hots, axis=0) word_embeddings = np.concatenate(word_embeddings, axis=0) if self.unit_length < 10: coin2 = np.random.choice(['single', 'single', 'double']) else: coin2 = 'single' if coin2 == 'double': m_length = (m_length // self.unit_length - 1) * self.unit_length elif coin2 == 'single': m_length = (m_length // self.unit_length) * self.unit_length idx = random.randint(0, len(motion) - m_length) motion = motion[idx:idx+m_length] "Z Normalization" motion = (motion - self.mean) / self.std if m_length < self.max_motion_length: motion = np.concatenate([motion, np.zeros((self.max_motion_length - m_length, motion.shape[1])) ], axis=0) return word_embeddings, pos_one_hots, caption, caption_perb, sent_len, motion, m_length, '_'.join(tokens), name def DATALoader(dataset_name, is_test, batch_size, w_vectorizer, num_workers = 8, unit_length = 4) : val_loader = torch.utils.data.DataLoader(Text2MotionDataset(dataset_name, is_test, w_vectorizer, unit_length=unit_length), batch_size, shuffle = True, num_workers=num_workers, collate_fn=collate_fn, drop_last = True) return val_loader from torch.utils.data.distributed import DistributedSampler def DATALoader_ddp(dataset_name, is_test, batch_size, w_vectorizer, num_workers = 8, unit_length = 4): val_dataset = Text2MotionDataset(dataset_name, is_test, w_vectorizer, unit_length=unit_length) val_sampler = DistributedSampler(val_dataset) val_loader = torch.utils.data.DataLoader(val_dataset, batch_size, num_workers=num_workers, collate_fn=collate_fn, drop_last = True, sampler=val_sampler) return val_loader def cycle(iterable): while True: for x in iterable: yield x