SmallCapDemo / src /utils.py
RitaParadaRamos's picture
Upload 13 files
28cac5b
from torch.utils.data import Dataset
from PIL import Image
import torch
import json
import h5py
import bisect
CAPTION_LENGTH = 25
SIMPLE_PREFIX = "This image shows "
def prep_strings(text, tokenizer, template=None, retrieved_caps=None, k=None, is_test=False, max_length=None):
if is_test:
padding = False
truncation = False
else:
padding = True
truncation = True
if retrieved_caps is not None:
infix = '\n\n'.join(retrieved_caps[:k]) + '.'
prefix = template.replace('||', infix)
else:
prefix = SIMPLE_PREFIX
prefix_ids = tokenizer.encode(prefix)
len_prefix = len(prefix_ids)
text_ids = tokenizer.encode(text, add_special_tokens=False)
if truncation:
text_ids = text_ids[:CAPTION_LENGTH]
input_ids = prefix_ids + text_ids if not is_test else prefix_ids
# we ignore the prefix (minus one as the first subtoken in the prefix is not predicted)
label_ids = [-100] * (len_prefix - 1) + text_ids + [tokenizer.eos_token_id]
if padding:
input_ids += [tokenizer.pad_token_id] * (max_length - len(input_ids))
label_ids += [-100] * (max_length - len(label_ids))
if is_test:
return input_ids
else:
return input_ids, label_ids
def postprocess_preds(pred, tokenizer):
pred = pred.split(SIMPLE_PREFIX)[-1]
pred = pred.replace(tokenizer.pad_token, '')
if pred.startswith(tokenizer.bos_token):
pred = pred[len(tokenizer.bos_token):]
if pred.endswith(tokenizer.eos_token):
pred = pred[:-len(tokenizer.eos_token)]
return pred
class TrainDataset(Dataset):
def __init__(self, df, features_path, tokenizer, rag=False, template_path=None, k=None, max_caption_length=25):
self.df = df
self.tokenizer = tokenizer
self.features = h5py.File(features_path, 'r')
if rag:
self.template = open(template_path).read().strip() + ' '
self.max_target_length = (max_caption_length # target caption
+ max_caption_length * k # retrieved captions
+ len(tokenizer.encode(self.template)) # template
+ len(tokenizer.encode('\n\n')) * (k-1) # separator between captions
)
assert k is not None
self.k = k
self.rag = rag
def __len__(self):
return len(self.df)
def __getitem__(self, idx):
text = self.df['text'][idx]
if self.rag:
caps = self.df['caps'][idx]
decoder_input_ids, labels = prep_strings(text, self.tokenizer, template=self.template,
retrieved_caps=caps, k=self.k, max_length=self.max_target_length)
else:
decoder_input_ids, labels = prep_strings(text, self.tokenizer, max_length=self.max_target_length)
# load precomputed features
encoder_outputs = self.features[self.df['cocoid'][idx]][()]
encoding = {"encoder_outputs": torch.tensor(encoder_outputs),
"decoder_input_ids": torch.tensor(decoder_input_ids),
"labels": torch.tensor(labels)}
return encoding
def load_data_for_training(annot_path, caps_path=None):
annotations = json.load(open(annot_path))['images']
if caps_path is not None:
retrieved_caps = json.load(open(caps_path))
data = {'train': [], 'val': []}
for item in annotations:
file_name = item['filename'].split('_')[-1]
if caps_path is not None:
caps = retrieved_caps[str(item['cocoid'])]
else:
caps = None
samples = []
for sentence in item['sentences']:
samples.append({'file_name': file_name, 'cocoid': str(item['cocoid']), 'caps': caps, 'text': ' '.join(sentence['tokens'])})
if item['split'] == 'train' or item['split'] == 'restval':
data['train'] += samples
elif item['split'] == 'val':
data['val'] += samples
return data
def load_data_for_inference(annot_path, caps_path=None):
annotations = json.load(open(annot_path))['images']
if caps_path is not None:
retrieved_caps = json.load(open(caps_path))
data = {'test': [], 'val': []}
for item in annotations:
file_name = item['filename'].split('_')[-1]
if caps_path is not None:
caps = retrieved_caps[str(item['cocoid'])]
else:
caps = None
image = {'file_name': file_name, 'caps': caps, 'image_id': str(item['cocoid'])}
if item['split'] == 'test':
data['test'].append(image)
elif item['split'] == 'val':
data['val'].append(image)
return data