|
import os
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.utils.data import Dataset, DataLoader
|
|
from torchvision import transforms
|
|
from transformers import ViTModel, BertTokenizerFast, BertConfig, BertLMHeadModel, AdamW
|
|
from PIL import Image, ImageFile
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
|
|
|
|
Image.MAX_IMAGE_PIXELS = None
|
|
|
|
ImageFile.LOAD_TRUNCATED_IMAGES = True
|
|
|
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
print(f"Using device: {device}")
|
|
|
|
|
|
VIT_MODEL_NAME = "google/vit-base-patch16-224"
|
|
BERT_MODEL_NAME = "dbmdz/bert-base-turkish-cased"
|
|
model = "TeLVE_v1.0.pth"
|
|
MAX_LENGTH = 128
|
|
BATCH_SIZE = 8
|
|
EPOCHS = 5
|
|
LEARNING_RATE = 2e-5
|
|
|
|
class ImageCaptioningDataset(Dataset):
|
|
def __init__(self, dataframe, img_dir, tokenizer):
|
|
self.dataframe = dataframe
|
|
self.img_dir = img_dir
|
|
self.tokenizer = tokenizer
|
|
self.transform = transforms.Compose([
|
|
transforms.Resize((224, 224)),
|
|
transforms.ToTensor(),
|
|
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
|
|
])
|
|
|
|
def __len__(self):
|
|
return len(self.dataframe)
|
|
|
|
def __getitem__(self, idx):
|
|
row = self.dataframe.iloc[idx]
|
|
img_path = os.path.join(self.img_dir, row['photo_id'] + ".jpg")
|
|
|
|
try:
|
|
image = Image.open(img_path).convert('RGB')
|
|
image = self.transform(image)
|
|
except (FileNotFoundError, IOError):
|
|
|
|
return None
|
|
|
|
caption = row['ai_description']
|
|
|
|
|
|
if not isinstance(caption, str):
|
|
return None
|
|
|
|
encoding = self.tokenizer(
|
|
caption,
|
|
add_special_tokens=True,
|
|
max_length=MAX_LENGTH,
|
|
padding='max_length',
|
|
truncation=True,
|
|
return_attention_mask=True,
|
|
return_tensors='pt'
|
|
)
|
|
|
|
return {
|
|
'pixel_values': image,
|
|
'input_ids': encoding['input_ids'].squeeze(),
|
|
'attention_mask': encoding['attention_mask'].squeeze(),
|
|
'labels': encoding['input_ids'].squeeze()
|
|
}
|
|
|
|
|
|
class ImageCaptioningModel(nn.Module):
|
|
def __init__(self, vit_model, bert_model):
|
|
super(ImageCaptioningModel, self).__init__()
|
|
self.vit = vit_model
|
|
self.bert = bert_model
|
|
self.linear = nn.Linear(self.vit.config.hidden_size, self.bert.config.hidden_size)
|
|
|
|
def forward(self, pixel_values, input_ids, attention_mask, labels=None):
|
|
image_features = self.vit(pixel_values).last_hidden_state
|
|
image_features = self.linear(image_features)
|
|
|
|
outputs = self.bert(input_ids=input_ids,
|
|
attention_mask=attention_mask,
|
|
encoder_hidden_states=image_features,
|
|
labels=labels,
|
|
return_dict=True)
|
|
|
|
return outputs.loss, outputs.logits
|
|
|
|
def collate_fn(batch):
|
|
|
|
batch = list(filter(lambda x: x is not None, batch))
|
|
if len(batch) == 0:
|
|
return None
|
|
return {key: torch.stack([item[key] for item in batch]) for key in batch[0]}
|
|
|
|
def train_vlm_model():
|
|
|
|
encodings = ['utf-8', 'iso-8859-9', 'windows-1254']
|
|
for encoding in encodings:
|
|
try:
|
|
df = pd.read_csv('./datasets/' + model + '.tsv000', sep='\t', encoding=encoding)
|
|
print(f"Successfully read the file with {encoding} encoding.")
|
|
break
|
|
except UnicodeDecodeError:
|
|
print(f"Failed to read with {encoding} encoding. Trying next...")
|
|
else:
|
|
raise ValueError("Could not read the file with any of the specified encodings.")
|
|
|
|
|
|
tokenizer = BertTokenizerFast.from_pretrained(BERT_MODEL_NAME)
|
|
|
|
|
|
dataset = ImageCaptioningDataset(df, '../download/images', tokenizer)
|
|
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
|
|
|
|
|
|
vit_model = ViTModel.from_pretrained(VIT_MODEL_NAME)
|
|
bert_config = BertConfig.from_pretrained(BERT_MODEL_NAME)
|
|
bert_config.is_decoder = True
|
|
bert_config.add_cross_attention = True
|
|
bert_model = BertLMHeadModel.from_pretrained(BERT_MODEL_NAME, config=bert_config)
|
|
|
|
|
|
model = ImageCaptioningModel(vit_model, bert_model)
|
|
model.to(device)
|
|
|
|
|
|
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
|
|
|
|
|
|
model.train()
|
|
for epoch in range(EPOCHS):
|
|
total_loss = 0
|
|
progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{EPOCHS}")
|
|
for batch in progress_bar:
|
|
if batch is None:
|
|
continue
|
|
|
|
pixel_values = batch['pixel_values'].to(device)
|
|
input_ids = batch['input_ids'].to(device)
|
|
attention_mask = batch['attention_mask'].to(device)
|
|
labels = batch['labels'].to(device)
|
|
|
|
optimizer.zero_grad()
|
|
loss, _ = model(pixel_values, input_ids, attention_mask, labels)
|
|
loss.backward()
|
|
optimizer.step()
|
|
|
|
total_loss += loss.item()
|
|
progress_bar.set_postfix({'loss': loss.item()})
|
|
|
|
print(f"Epoch {epoch+1}/{EPOCHS}, Average Loss: {total_loss/len(dataloader)}")
|
|
|
|
|
|
torch.save(model.state_dict(), "./models/" + model)
|
|
tokenizer.save_pretrained("./tokenizer")
|
|
|
|
if __name__ == "__main__":
|
|
train_vlm_model() |