muPPIt / predict_2.py
AlienChen's picture
Upload 139 files
65bd8af verified
import torch
from torch.utils.data import DataLoader, Subset
from torch.optim import AdamW
import torch.nn.functional as F
import torch.nn as nn
from datasets import load_from_disk
import esm
import numpy as np
import math
import os
from transformers import AutoTokenizer
from torch.optim.lr_scheduler import CosineAnnealingLR
from transformers import get_linear_schedule_with_warmup
from tqdm import tqdm
from torch.cuda.amp import autocast, GradScaler
import gc
import pdb
import pandas as pd
from collections import defaultdict
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
##################### Hyper-parameters #############################################
binder = 'STKKDEREYKSPAEIAEFLF'
wildtype = 'LAAVSVDCSEYPKPACTLEYRPLCGSDNKTYGNKCNFCNAVVESNGTLTLSHFGKC'
mutant = 'LAAVSVDCSEYPKPACTLEYRPLCGSDNKTYRNKCNFCNAVVESNGTLTLSHFGKC'
max_epochs = 30
batch_size = 4
lr = 1e-4
dropout = 0.1
margin = 10
accumulation_steps = 16
num_heads = 4
checkpoint_path = '/home/tc415/muPPIt_embedding/checkpoints/improved_train_5/epoch=28_acc=0.59'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('''
max_epochs = 30
batch_size = 4
lr = 1e-4
dropout = 0.1
margin = 10
accumulation_steps = 16
''')
####################################################################################
vhse8_values = {
'A': [0.15, -1.11, -1.35, -0.92, 0.02, -0.91, 0.36, -0.48],
'R': [-1.47, 1.45, 1.24, 1.27, 1.55, 1.47, 1.30, 0.83],
'N': [-0.99, 0.00, 0.69, -0.37, -0.55, 0.85, 0.73, -0.80],
'D': [-1.15, 0.67, -0.41, -0.01, -2.68, 1.31, 0.03, 0.56],
'C': [0.18, -1.67, -0.21, 0.00, 1.20, -1.61, -0.19, -0.41],
'Q': [-0.96, 0.12, 0.18, 0.16, 0.09, 0.42, -0.20, -0.41],
'E': [-1.18, 0.40, 0.10, 0.36, -2.16, -0.17, 0.91, 0.36],
'G': [-0.20, -1.53, -2.63, 2.28, -0.53, -1.18, -1.34, 1.10],
'H': [-0.43, -0.25, 0.37, 0.19, 0.51, 1.28, 0.93, 0.65],
'I': [1.27, 0.14, 0.30, -1.80, 0.30, -1.61, -0.16, -0.13],
'L': [1.36, 0.07, 0.26, -0.80, 0.22, -1.37, 0.08, -0.62],
'K': [-1.17, 0.70, 0.80, 1.64, 0.67, 1.63, 0.13, -0.01],
'M': [1.01, -0.53, 0.43, 0.00, 0.23, 0.10, -0.86, -0.68],
'F': [1.52, 0.61, 0.95, -0.16, 0.25, 0.28, -1.33, -0.65],
'P': [0.22, -0.17, -0.50, -0.05, 0.01, -1.34, 0.19, 3.56],
'S': [-0.67, -0.86, -1.07, -0.41, -0.32, 0.27, -0.64, 0.11],
'T': [-0.34, -0.51, -0.55, -1.06, 0.01, -0.01, -0.79, 0.39],
'W': [1.50, 2.06, 1.79, 0.75, 0.75, 0.13, -1.06, -0.85],
'Y': [0.61, 1.60, 1.17, 0.73, 0.53, 0.25, -0.96, -0.52],
'V': [0.76, -0.92, 0.17, -1.91, 0.22, -1.40, -0.24, -0.03],
}
aa_to_idx = {'A': 5, 'R': 10, 'N': 17, 'D': 13, 'C': 23, 'Q': 16, 'E': 9, 'G': 6, 'H': 21, 'I': 12, 'L': 4, 'K': 15, 'M': 20, 'F': 18, 'P': 14, 'S': 8, 'T': 11, 'W': 22, 'Y': 19, 'V': 7}
vhse8_tensor = torch.zeros(33, 8)
for aa, values in vhse8_values.items():
aa_index = aa_to_idx[aa]
vhse8_tensor[aa_index] = torch.tensor(values)
vhse8_tensor = vhse8_tensor.to(device)
vhse8_tensor.requires_grad = False
class muPPIt(torch.nn.Module):
def __init__(self, d_node, num_heads, margin, lr, device):
super(muPPIt, self).__init__()
self.esm, self.alphabet = esm.pretrained.esm2_t33_650M_UR50D()
for param in self.esm.parameters():
param.requires_grad = False
self.attention = torch.nn.MultiheadAttention(embed_dim=d_node, num_heads=num_heads)
self.layer_norm = torch.nn.LayerNorm(d_node)
self.map = torch.nn.Sequential(
torch.nn.Linear(d_node, d_node // 2),
torch.nn.SiLU(),
torch.nn.Linear(d_node // 2, 1)
)
for layer in self.map:
if isinstance(layer, nn.Linear):
nn.init.kaiming_uniform_(layer.weight, a=0, mode='fan_in', nonlinearity='leaky_relu')
if layer.bias is not None:
nn.init.zeros_(layer.bias)
self.margin = margin
self.learning_rate = lr
self.loss_threshold = 20 # Set a threshold for identifying hard examples
self.device = device
# Easy and hard example tracking
self.easy_example_indices = np.load('/home/tc415/muPPIt_embedding/dataset/ppiref_index.npy').tolist()
self.hard_example_indices = np.load('/home/tc415/muPPIt_embedding/dataset/skempi_index.npy').tolist()
def forward(self, binder_tokens, wt_tokens, mut_tokens):
device = self.device
global vhse8_tensor
with torch.no_grad():
binder_pad_mask = (binder_tokens != self.alphabet.padding_idx).int()
binder_embed = self.esm(binder_tokens, repr_layers=[33], return_contacts=False)["representations"][33] * binder_pad_mask.unsqueeze(-1)
binder_vhse8 = vhse8_tensor[binder_tokens]
binder_embed = torch.concat([binder_embed, binder_vhse8], dim=-1)
mut_pad_mask = (mut_tokens != self.alphabet.padding_idx).int()
mut_embed = self.esm(mut_tokens, repr_layers=[33], return_contacts=False)["representations"][33] * mut_pad_mask.unsqueeze(-1)
mut_vhse8 = vhse8_tensor[mut_tokens]
mut_embed = torch.concat([mut_embed, mut_vhse8], dim=-1)
wt_pad_mask = (wt_tokens != self.alphabet.padding_idx).int()
wt_embed = self.esm(wt_tokens, repr_layers=[33], return_contacts=False)["representations"][33] * wt_pad_mask.unsqueeze(-1)
wt_vhse8 = vhse8_tensor[wt_tokens]
wt_embed = torch.concat([wt_embed, wt_vhse8], dim=-1)
binder_wt = torch.concat([binder_embed, wt_embed], dim=1)
binder_mut = torch.concat([binder_embed, mut_embed], dim=1)
binder_wt = binder_wt.transpose(0,1)
binder_mut = binder_mut.transpose(0,1)
binder_wt_attn, _ = self.attention(binder_wt, binder_wt, binder_wt)
binder_mut_attn, _ = self.attention(binder_mut, binder_mut, binder_mut)
binder_wt_attn = binder_wt + binder_wt_attn
binder_mut_attn = binder_mut + binder_mut_attn
binder_wt_attn = binder_wt_attn.transpose(0, 1)
binder_mut_attn = binder_mut_attn.transpose(0, 1)
binder_wt_attn = self.layer_norm(binder_wt_attn)
binder_mut_attn = self.layer_norm(binder_mut_attn)
mapped_binder_wt = self.map(binder_wt_attn).squeeze(-1) # B*(L1+L2)
mapped_binder_mut = self.map(binder_mut_attn).squeeze(-1) # B*(L1+L2)
distance = torch.sqrt(torch.sum((mapped_binder_wt - mapped_binder_mut) ** 2, dim=-1))
return distance
tokenizer = AutoTokenizer.from_pretrained("facebook/esm2_t33_650M_UR50D")
def predict(model, binder, wildtype, mutant):
global tokenizer
binder_tokens = torch.tensor(tokenizer(binder)['input_ids']).unsqueeze(0).to(device)
wt_tokens = torch.tensor(tokenizer(wildtype)['input_ids']).unsqueeze(0).to(device)
mut_tokens = torch.tensor(tokenizer(mutant)['input_ids']).unsqueeze(0).to(device)
distance = model.forward(binder_tokens, wt_tokens, mut_tokens)
return distance
def compute_mean(tuple_list):
sum_count_dict = defaultdict(lambda: [0, 0]) # [sum, count]
# Iterate through the list and update the sum and count
for key, value in tuple_list:
sum_count_dict[key][0] += value # Sum of tuple[1] for the same tuple[0]
sum_count_dict[key][1] += 1 # Count the occurrences
# Calculate the mean for each unique tuple[0]
mean_dict = {key: round(sum_value[0] / sum_value[1],2) for key, sum_value in sum_count_dict.items()}
print(dict(sorted(mean_dict.items())))
model = muPPIt(d_node=1288, num_heads=num_heads, margin=margin, lr=lr, device=device).to(device)
model.load_state_dict(torch.load(checkpoint_path))
model.eval()
# df = pd.read_csv('/home/tc415/muPPIt_embedding/dataset/skempi_test.csv')
# results = []
# for index, row in tqdm(df.iterrows(), total=len(df)):
# binder = row['binder']
# wildtype = row['wt']
# mutant = row['mut']
# mut_aff = np.log10(row['mut_affinity'])
# wt_aff = np.log10(row['wt_affinity'])
# with torch.no_grad():
# distance = predict(model, binder, wildtype, mutant)
# results.append((int(abs(wt_aff - mut_aff)), distance.item()))
# compute_mean(results)
binders = ['LCVECMATRVQLECNLCSNV', 'STKKDEREYKSPAEIAEFLF', 'RVIYVQSKIKLSKSQKKSKS', 'GMKQKROLVSAVVKAPAMTA', 'GRDRKQVSESPEYSLKSRKK', 'NEFIVIDTSIDIGPPRSRQA']
for binder in binders:
distance = predict(model, binder, wildtype, mutant)
print(f"Distance = {distance}")