FM4M-demo1 / models /fm4m.py
ipd's picture
init
85ec4af
raw
history blame
34.5 kB
from sklearn.metrics import roc_auc_score, roc_curve
import datetime
import os
import umap
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import json
from xgboost import XGBClassifier, XGBRegressor
import xgboost as xgb
from sklearn.metrics import roc_auc_score, mean_squared_error
import xgboost as xgb
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression
from sklearn.kernel_ridge import KernelRidge
import json
from sklearn.compose import TransformedTargetRegressor
from sklearn.preprocessing import MinMaxScaler
import torch
from transformers import AutoTokenizer, AutoModel
import sys
sys.path.append("models/")
from models.selfies_model.load import SELFIES as bart
from models.mhg_model import load as mhg
from models.smi_ted.smi_ted_light.load import load_smi_ted
datasets = {}
models = {}
downstream_models ={}
def avail_models_data():
global datasets
global models
datasets = [{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv", "Timestamp": "2024-06-26 11:27:37"},
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre", "Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"},
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv", "Timestamp": "2024-06-26 11:33:47"},
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo", "Timestamp": "2024-06-26 11:34:37"},
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace", "Timestamp": "2024-06-26 11:36:40"},
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp", "Timestamp": "2024-06-26 11:39:23"},
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox", "Timestamp": "2024-06-26 11:42:43"}]
models = [{"Name": "bart","Model Name": "SELFIES-TED","Description": "BART model for string based SELFIES modality", "Timestamp": "2024-06-21 12:32:20"},
{"Name": "mol-xl","Model Name": "Molformer", "Description": "MolFormer model for string based SMILES modality", "Timestamp": "2024-06-21 12:35:56"},
{"Name": "mhg", "Model Name": "MHG-GED","Description": "Molecular hypergraph model", "Timestamp": "2024-07-10 00:09:42"},
{"Name": "smi-ted", "Model Name": "SMI-TED","Description": "SMILES based encoder decoder model", "Timestamp": "2024-07-10 00:09:42"}]
def avail_models(raw=False):
global models
models = [{"Name": "smi-ted", "Model Name": "SMI-TED","Description": "SMILES based encoder decoder model"},
{"Name": "bart","Model Name": "SELFIES-TED","Description": "BART model for string based SELFIES modality"},
{"Name": "mol-xl","Model Name": "Molformer", "Description": "MolFormer model for string based SMILES modality"},
{"Name": "mhg", "Model Name": "MHG-GED","Description": "Molecular hypergraph model"},
]
if raw: return models
else:
return pd.DataFrame(models).drop('Name', axis=1)
return models
def avail_downstream_models():
global downstream_models
with open("downstream_models.json", "r") as outfile:
downstream_models = json.load(outfile)
return downstream_models
def avail_datasets():
global datasets
datasets = [{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv",
"Timestamp": "2024-06-26 11:27:37"},
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre",
"Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"},
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv",
"Timestamp": "2024-06-26 11:33:47"},
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo",
"Timestamp": "2024-06-26 11:34:37"},
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace",
"Timestamp": "2024-06-26 11:36:40"},
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp",
"Timestamp": "2024-06-26 11:39:23"},
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox",
"Timestamp": "2024-06-26 11:42:43"}]
return datasets
def reset():
"""datasets = {"esol": ["smiles", "ESOL predicted log solubility in mols per litre", "data/esol", "2024-06-26 11:36:46.509324"],
"freesolv": ["smiles", "expt", "data/freesolv", "2024-06-26 11:37:37.393273"],
"lipo": ["smiles", "y", "data/lipo", "2024-06-26 11:37:37.393273"],
"hiv": ["smiles", "HIV_active", "data/hiv", "2024-06-26 11:37:37.393273"],
"bace": ["smiles", "Class", "data/bace", "2024-06-26 11:38:40.058354"],
"bbbp": ["smiles", "p_np", "data/bbbp","2024-06-26 11:38:40.058354"],
"clintox": ["smiles", "CT_TOX", "data/clintox","2024-06-26 11:38:40.058354"],
"sider": ["smiles","1:", "data/sider","2024-06-26 11:38:40.058354"],
"tox21": ["smiles",":-2", "data/tox21","2024-06-26 11:38:40.058354"]
}"""
datasets = [
{"Dataset": "hiv", "Input": "smiles", "Output": "HIV_active", "Path": "data/hiv", "Timestamp": "2024-06-26 11:27:37"},
{"Dataset": "esol", "Input": "smiles", "Output": "ESOL predicted log solubility in mols per litre", "Path": "data/esol", "Timestamp": "2024-06-26 11:31:46"},
{"Dataset": "freesolv", "Input": "smiles", "Output": "expt", "Path": "data/freesolv", "Timestamp": "2024-06-26 11:33:47"},
{"Dataset": "lipo", "Input": "smiles", "Output": "y", "Path": "data/lipo", "Timestamp": "2024-06-26 11:34:37"},
{"Dataset": "bace", "Input": "smiles", "Output": "Class", "Path": "data/bace", "Timestamp": "2024-06-26 11:36:40"},
{"Dataset": "bbbp", "Input": "smiles", "Output": "p_np", "Path": "data/bbbp", "Timestamp": "2024-06-26 11:39:23"},
{"Dataset": "clintox", "Input": "smiles", "Output": "CT_TOX", "Path": "data/clintox", "Timestamp": "2024-06-26 11:42:43"},
#{"Dataset": "sider", "Input": "smiles", "Output": "1:", "path": "data/sider", "Timestamp": "2024-06-26 11:38:40.058354"},
#{"Dataset": "tox21", "Input": "smiles", "Output": ":-2", "path": "data/tox21", "Timestamp": "2024-06-26 11:38:40.058354"}
]
models = [{"Name": "bart", "Description": "BART model for string based SELFIES modality",
"Timestamp": "2024-06-21 12:32:20"},
{"Name": "mol-xl", "Description": "MolFormer model for string based SMILES modality",
"Timestamp": "2024-06-21 12:35:56"},
{"Name": "mhg", "Description": "MHG", "Timestamp": "2024-07-10 00:09:42"},
{"Name": "spec-gru", "Description": "Spectrum modality with GRU", "Timestamp": "2024-07-10 00:09:42"},
{"Name": "spec-lstm", "Description": "Spectrum modality with LSTM", "Timestamp": "2024-07-10 00:09:54"},
{"Name": "3d-vae", "Description": "VAE model for 3D atom positions", "Timestamp": "2024-07-10 00:10:08"}]
downstream_models = [
{"Name": "XGBClassifier", "Description": "XG Boost Classifier",
"Timestamp": "2024-06-21 12:31:20"},
{"Name": "XGBRegressor", "Description": "XG Boost Regressor",
"Timestamp": "2024-06-21 12:32:56"},
{"Name": "2-FNN", "Description": "A two layer feedforward network",
"Timestamp": "2024-06-24 14:34:16"},
{"Name": "3-FNN", "Description": "A three layer feedforward network",
"Timestamp": "2024-06-24 14:38:37"},
]
with open("datasets.json", "w") as outfile:
json.dump(datasets, outfile)
with open("models.json", "w") as outfile:
json.dump(models, outfile)
with open("downstream_models.json", "w") as outfile:
json.dump(downstream_models, outfile)
def update_data_list(list_data):
#datasets[list_data[0]] = list_data[1:]
with open("datasets.json", "w") as outfile:
json.dump(datasets, outfile)
avail_models_data()
def update_model_list(list_model):
#models[list_model[0]] = list_model[1]
with open("models.json", "w") as outfile:
json.dump(list_model, outfile)
avail_models_data()
def update_downstream_model_list(list_model):
#models[list_model[0]] = list_model[1]
with open("downstream_models.json", "w") as outfile:
json.dump(list_model, outfile)
avail_models_data()
avail_models_data()
def list_models():
#print(*list(models.keys()),sep='\n')
data = avail_models(raw=True)
# Convert data to a pandas DataFrame
df = pd.DataFrame(data)
# Add a column for displaying row numbers starting from 1
df.index += 1
# Create dropdown widget for sorting
sort_dropdown = widgets.Dropdown(
options=['Name', 'Timestamp'],
value='Name',
description='Sort by:',
disabled=False,
)
# Output widget to display the table
output = widgets.Output()
# Define function to update display based on sorting
def update_display(change):
with output:
output.clear_output(wait=True)
sorted_df = df.sort_values(by=sort_dropdown.value)
display(sorted_df.style.set_properties(**{
'text-align': 'left', 'border': '1px solid #ddd',
}))
# Attach the update_display function to the dropdown widget
sort_dropdown.observe(update_display, names='value')
# Display the dropdown and the table initially
display(sort_dropdown, output)
update_display(None)
def list_downstream_models():
#print(*list(models.keys()),sep='\n')
data = avail_downstream_models()
# Convert data to a pandas DataFrame
df = pd.DataFrame(data)
# Add a column for displaying row numbers starting from 1
df.index += 1
# Create dropdown widget for sorting
sort_dropdown = widgets.Dropdown(
options=['Name', 'Timestamp'],
value='Timestamp',
description='Sort by:',
disabled=False,
)
# Output widget to display the table
output = widgets.Output()
# Define function to update display based on sorting
def update_display(change):
with output:
output.clear_output(wait=True)
sorted_df = df.sort_values(by=sort_dropdown.value)
display(sorted_df.style.set_properties(**{
'text-align': 'left', 'border': '1px solid #ddd',
}))
# Attach the update_display function to the dropdown widget
sort_dropdown.observe(update_display, names='value')
# Display the dropdown and the table initially
display(sort_dropdown, output)
update_display(None)
def list_data():
#print(*list(datasets.keys()),sep='\n')
data = avail_datasets()
# Convert data to a pandas DataFrame
df = pd.DataFrame(data)
# Add a column for displaying row numbers starting from 1
df.index += 1
# Create dropdown widget for sorting
sort_dropdown = widgets.Dropdown(
options=['Dataset', 'Input', 'Output', 'Path', 'Timestamp'],
value='Input',
description='Sort by:',
disabled=False,
)
# Output widget to display the table
output = widgets.Output()
# Define function to update display based on sorting
def update_display(change):
with output:
output.clear_output(wait=True)
sorted_df = df.sort_values(by=sort_dropdown.value)
display(sorted_df.style.set_properties(**{
'text-align': 'left', 'border': '1px solid #ddd',
}))
# Attach the update_display function to the dropdown widget
sort_dropdown.observe(update_display, names='value')
# Display the dropdown and the table initially
display(sort_dropdown, output)
update_display(None)
def vizualize(roc_auc,fpr, tpr, features, labels):
#def vizualize(features, labels):
reducer = umap.UMAP(metric="jaccard", n_neighbors=20, n_components=2, low_memory=True, min_dist=0.001, verbose=False)
features_umap = reducer.fit_transform(features)
x = labels.values
index_0 = [index for index in range(len(x)) if x[index] == 0]
index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap[index_0]
class_1 = features_umap[index_1]
# Function to create ROC AUC plot
def plot_roc_auc():
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.4f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc='lower right')
plt.show()
# Function to create scatter plot of the dataset distribution
def plot_distribution():
plt.figure(figsize=(8, 6))
#plt.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.coolwarm, edgecolors='k')
plt.scatter(class_1[:, 0], class_1[:, 1], c='red', label='Class 1')
plt.scatter(class_0[:, 0], class_0[:, 1], c='blue', label='Class 0')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Dataset Distribution')
plt.show()
# Create tabs using ipywidgets
tab_contents = ['ROC AUC', 'Distribution']
children = [widgets.Output(), widgets.Output()]
tab = widgets.Tab()
tab.children = children
for i in range(len(tab_contents)):
tab.set_title(i, tab_contents[i])
# Display plots in their respective tabs
with children[0]:
plot_roc_auc()
with children[1]:
plot_distribution()
# Display the tab widget
display(tab)
def get_representation(train_data,test_data,model_type, return_tensor=True):
alias = {"MHG-GED": "mhg", "SELFIES-TED": "bart", "MolFormer": "mol-xl", "Molformer": "mol-xl", "SMI-TED": "smi-ted"}
if model_type in alias.keys():
model_type = alias[model_type]
if model_type == "mhg":
model = mhg.load("models/mhg_model/pickles/mhggnn_pretrained_model_0724_2023.pickle")
with torch.no_grad():
train_emb = model.encode(train_data)
x_batch = torch.stack(train_emb)
test_emb = model.encode(test_data)
x_batch_test = torch.stack(test_emb)
if not return_tensor:
x_batch = pd.DataFrame(x_batch)
x_batch_test = pd.DataFrame(x_batch_test)
elif model_type == "bart":
model = bart()
model.load()
x_batch = model.encode(train_data, return_tensor=return_tensor)
x_batch_test = model.encode(test_data, return_tensor=return_tensor)
elif model_type == "smi-ted":
model = load_smi_ted(folder='./models/smi_ted/smi_ted_light', ckpt_filename='smi-ted-Light_40.pt')
with torch.no_grad():
x_batch = model.encode(train_data, return_torch=return_tensor)
x_batch_test = model.encode(test_data, return_torch=return_tensor)
elif model_type == "mol-xl":
model = AutoModel.from_pretrained("ibm/MoLFormer-XL-both-10pct", deterministic_eval=True,
trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained("ibm/MoLFormer-XL-both-10pct", trust_remote_code=True)
if type(train_data) == list:
inputs = tokenizer(train_data, padding=True, return_tensors="pt")
else:
inputs = tokenizer(list(train_data.values), padding=True, return_tensors="pt")
with torch.no_grad():
outputs = model(**inputs)
x_batch = outputs.pooler_output
if type(test_data) == list:
inputs = tokenizer(test_data, padding=True, return_tensors="pt")
else:
inputs = tokenizer(list(test_data.values), padding=True, return_tensors="pt")
with torch.no_grad():
outputs = model(**inputs)
x_batch_test = outputs.pooler_output
if not return_tensor:
x_batch = pd.DataFrame(x_batch)
x_batch_test = pd.DataFrame(x_batch_test)
return x_batch, x_batch_test
def single_modal(model,dataset, downstream_model,params):
print(model)
alias = {"MHG-GED":"mhg", "SELFIES-TED": "bart", "MolFormer":"mol-xl", "SMI-TED": "smi-ted"}
data = avail_models(raw=True)
df = pd.DataFrame(data)
print(list(df["Name"].values))
if alias[model] in list(df["Name"].values):
if model in alias.keys():
model_type = alias[model]
else:
model_type = model
else:
print("Model not available")
return
data = avail_datasets()
df = pd.DataFrame(data)
print(list(df["Dataset"].values))
if dataset in list(df["Dataset"].values):
task = dataset
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1:
x_batch, y_batch, x_batch_test, y_batch_test = pickle.load(f1)
print(f" Representation loaded successfully")
else:
print("Custom Dataset")
#return
components = dataset.split(",")
train_data = pd.read_csv(components[0])[components[2]]
test_data = pd.read_csv(components[1])[components[2]]
y_batch = pd.read_csv(components[0])[components[3]]
y_batch_test = pd.read_csv(components[1])[components[3]]
x_batch, x_batch_test = get_representation(train_data,test_data,model_type)
print(f" Representation loaded successfully")
print(f" Calculating ROC AUC Score ...")
if downstream_model == "XGBClassifier":
xgb_predict_concat = XGBClassifier(**params) # n_estimators=5000, learning_rate=0.01, max_depth=10
xgb_predict_concat.fit(x_batch, y_batch)
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1]
roc_auc = roc_auc_score(y_batch_test, y_prob)
fpr, tpr, _ = roc_curve(y_batch_test, y_prob)
print(f"ROC-AUC Score: {roc_auc:.4f}")
try:
with open(f"plot_emb/{task}_{model_type}.pkl", "rb") as f1:
class_0,class_1 = pickle.load(f1)
except:
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
index_0 = [index for index in range(len(x)) if x[index] == 0]
index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap[index_0]
class_1 = features_umap[index_1]
print("Generating latent plots : Done")
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch )
result = f"ROC-AUC Score: {roc_auc:.4f}"
return result, roc_auc,fpr, tpr, class_0, class_1
elif downstream_model == "DefaultClassifier":
xgb_predict_concat = XGBClassifier() # n_estimators=5000, learning_rate=0.01, max_depth=10
xgb_predict_concat.fit(x_batch, y_batch)
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1]
roc_auc = roc_auc_score(y_batch_test, y_prob)
fpr, tpr, _ = roc_curve(y_batch_test, y_prob)
print(f"ROC-AUC Score: {roc_auc:.4f}")
try:
with open(f"plot_emb/{task}_{model_type}.pkl", "rb") as f1:
class_0,class_1 = pickle.load(f1)
except:
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors= 10, n_components=2, low_memory=True, min_dist=0.1, verbose=False)
n_samples = np.minimum(1000,len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
index_0 = [index for index in range(len(x)) if x[index] == 0]
index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap[index_0]
class_1 = features_umap[index_1]
print("Generating latent plots : Done")
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch )
result = f"ROC-AUC Score: {roc_auc:.4f}"
return result, roc_auc,fpr, tpr, class_0, class_1
elif downstream_model == "SVR":
regressor = SVR(**params)
model = TransformedTargetRegressor(regressor= regressor,
transformer = MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch,y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
#index_0 = [index for index in range(len(x)) if x[index] == 0]
#index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap#[index_0]
class_1 = features_umap#[index_1]
print("Generating latent plots : Done")
return result, RMSE_score,y_batch_test, y_prob, class_0, class_1
elif downstream_model == "Kernel Ridge":
regressor = KernelRidge(**params)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
# index_0 = [index for index in range(len(x)) if x[index] == 0]
# index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap#[index_0]
class_1 = features_umap#[index_1]
print("Generating latent plots : Done")
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
elif downstream_model == "Linear Regression":
regressor = LinearRegression(**params)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
# index_0 = [index for index in range(len(x)) if x[index] == 0]
# index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap#[index_0]
class_1 = features_umap#[index_1]
print("Generating latent plots : Done")
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
elif downstream_model == "DefaultRegressor":
regressor = SVR(kernel="rbf", degree=3, C=5, gamma="scale", epsilon=0.01)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
x = y_batch.values[:n_samples]
# index_0 = [index for index in range(len(x)) if x[index] == 0]
# index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap#[index_0]
class_1 = features_umap#[index_1]
print("Generating latent plots : Done")
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
def multi_modal(model_list,dataset, downstream_model,params):
print(model_list)
data = avail_datasets()
df = pd.DataFrame(data)
list(df["Dataset"].values)
if dataset in list(df["Dataset"].values):
task = dataset
predefined = True
else:
predefined = False
components = dataset.split(",")
train_data = pd.read_csv(components[0])[components[2]]
test_data = pd.read_csv(components[1])[components[2]]
y_batch = pd.read_csv(components[0])[components[3]]
y_batch_test = pd.read_csv(components[1])[components[3]]
print("Custom Dataset loaded")
data = avail_models(raw=True)
df = pd.DataFrame(data)
list(df["Name"].values)
alias = {"MHG-GED":"mhg", "SELFIES-TED": "bart", "MolFormer":"mol-xl", "SMI-TED":"smi-ted"}
#if set(model_list).issubset(list(df["Name"].values)):
if set(model_list).issubset(list(alias.keys())):
for i, model in enumerate(model_list):
if model in alias.keys():
model_type = alias[model]
else:
model_type = model
if i == 0:
if predefined:
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1:
x_batch, y_batch, x_batch_test, y_batch_test = pickle.load(f1)
print(f" Loaded representation/{task}_{model_type}.pkl")
else:
x_batch, x_batch_test = get_representation(train_data, test_data, model_type)
x_batch = pd.DataFrame(x_batch)
x_batch_test = pd.DataFrame(x_batch_test)
else:
if predefined:
with open(f"representation/{task}_{model_type}.pkl", "rb") as f1:
x_batch_1, y_batch_1, x_batch_test_1, y_batch_test_1 = pickle.load(f1)
print(f" Loaded representation/{task}_{model_type}.pkl")
else:
x_batch_1, x_batch_test_1 = get_representation(train_data, test_data, model_type)
x_batch_1 = pd.DataFrame(x_batch_1)
x_batch_test_1 = pd.DataFrame(x_batch_test_1)
x_batch = pd.concat([x_batch, x_batch_1], axis=1)
x_batch_test = pd.concat([x_batch_test, x_batch_test_1], axis=1)
else:
print("Model not available")
return
num_columns = x_batch_test.shape[1]
x_batch_test.columns = [f'{i + 1}' for i in range(num_columns)]
num_columns = x_batch.shape[1]
x_batch.columns = [f'{i + 1}' for i in range(num_columns)]
print(f"Representations loaded successfully")
try:
with open(f"plot_emb/{task}_multi.pkl", "rb") as f1:
class_0, class_1 = pickle.load(f1)
except:
print("Generating latent plots")
reducer = umap.UMAP(metric='euclidean', n_neighbors=10, n_components=2, low_memory=True, min_dist=0.1,
verbose=False)
n_samples = np.minimum(1000, len(x_batch))
features_umap = reducer.fit_transform(x_batch[:n_samples])
if "Classifier" in downstream_model:
x = y_batch.values[:n_samples]
index_0 = [index for index in range(len(x)) if x[index] == 0]
index_1 = [index for index in range(len(x)) if x[index] == 1]
class_0 = features_umap[index_0]
class_1 = features_umap[index_1]
else:
class_0 = features_umap
class_1 = features_umap
print("Generating latent plots : Done")
print(f" Calculating ROC AUC Score ...")
if downstream_model == "XGBClassifier":
xgb_predict_concat = XGBClassifier(**params)#n_estimators=5000, learning_rate=0.01, max_depth=10)
xgb_predict_concat.fit(x_batch, y_batch)
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1]
roc_auc = roc_auc_score(y_batch_test, y_prob)
fpr, tpr, _ = roc_curve(y_batch_test, y_prob)
print(f"ROC-AUC Score: {roc_auc:.4f}")
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch )
#vizualize(x_batch_test, y_batch_test)
print(f"ROC-AUC Score: {roc_auc:.4f}")
result = f"ROC-AUC Score: {roc_auc:.4f}"
return result, roc_auc,fpr, tpr, class_0, class_1
elif downstream_model == "DefaultClassifier":
xgb_predict_concat = XGBClassifier()#n_estimators=5000, learning_rate=0.01, max_depth=10)
xgb_predict_concat.fit(x_batch, y_batch)
y_prob = xgb_predict_concat.predict_proba(x_batch_test)[:, 1]
roc_auc = roc_auc_score(y_batch_test, y_prob)
fpr, tpr, _ = roc_curve(y_batch_test, y_prob)
print(f"ROC-AUC Score: {roc_auc:.4f}")
#vizualize(roc_auc,fpr, tpr, x_batch, y_batch )
#vizualize(x_batch_test, y_batch_test)
print(f"ROC-AUC Score: {roc_auc:.4f}")
result = f"ROC-AUC Score: {roc_auc:.4f}"
return result, roc_auc,fpr, tpr, class_0, class_1
elif downstream_model == "SVR":
regressor = SVR(**params)
model = TransformedTargetRegressor(regressor= regressor,
transformer = MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch,y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
return result, RMSE_score,y_batch_test, y_prob, class_0, class_1
elif downstream_model == "Linear Regression":
regressor = LinearRegression(**params)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
elif downstream_model == "Kernel Ridge":
regressor = KernelRidge(**params)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
elif downstream_model == "DefaultRegressor":
regressor = SVR(kernel="rbf", degree=3, C=5, gamma="scale", epsilon=0.01)
model = TransformedTargetRegressor(regressor=regressor,
transformer=MinMaxScaler(feature_range=(-1, 1))
).fit(x_batch, y_batch)
y_prob = model.predict(x_batch_test)
RMSE_score = np.sqrt(mean_squared_error(y_batch_test, y_prob))
print(f"RMSE Score: {RMSE_score:.4f}")
result = f"RMSE Score: {RMSE_score:.4f}"
return result, RMSE_score, y_batch_test, y_prob, class_0, class_1
def finetune_optuna(x_batch,y_batch, x_batch_test, y_test ):
print(f" Finetuning with Optuna and calculating ROC AUC Score ...")
X_train = x_batch.values
y_train = y_batch.values
X_test = x_batch_test.values
y_test = y_test.values
def objective(trial):
# Define parameters to be optimized
params = {
# 'objective': 'binary:logistic',
'eval_metric': 'auc',
'verbosity': 0,
'n_estimators': trial.suggest_int('n_estimators', 1000, 10000),
# 'booster': trial.suggest_categorical('booster', ['gbtree', 'gblinear', 'dart']),
# 'lambda': trial.suggest_loguniform('lambda', 1e-8, 1.0),
'alpha': trial.suggest_loguniform('alpha', 1e-8, 1.0),
'max_depth': trial.suggest_int('max_depth', 1, 12),
# 'eta': trial.suggest_loguniform('eta', 1e-8, 1.0),
# 'gamma': trial.suggest_loguniform('gamma', 1e-8, 1.0),
# 'grow_policy': trial.suggest_categorical('grow_policy', ['depthwise', 'lossguide']),
# "subsample": trial.suggest_float("subsample", 0.05, 1.0),
# "colsample_bytree": trial.suggest_float("colsample_bytree", 0.05, 1.0),
}
# Train XGBoost model
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
model = xgb.train(params, dtrain)
# Predict probabilities
y_pred = model.predict(dtest)
# Calculate ROC AUC score
roc_auc = roc_auc_score(y_test, y_pred)
print("ROC_AUC : ", roc_auc)
return roc_auc