ctheodoris's picture
precommit formatting
f07bfd7
raw
history blame
13.1 kB
import os
import random
import numpy as np
import pandas as pd
import torch
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from .imports import *
from .model import GeneformerMultiTask
from .utils import calculate_task_specific_metrics
def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def initialize_wandb(config):
if config.get("use_wandb", False):
import wandb
wandb.init(project=config["wandb_project"], config=config)
print("Weights & Biases (wandb) initialized and will be used for logging.")
else:
print(
"Weights & Biases (wandb) is not enabled. Logging will use other methods."
)
def create_model(config, num_labels_list, device):
model = GeneformerMultiTask(
config["pretrained_path"],
num_labels_list,
dropout_rate=config["dropout_rate"],
use_task_weights=config["use_task_weights"],
task_weights=config["task_weights"],
max_layers_to_freeze=config["max_layers_to_freeze"],
use_attention_pooling=config["use_attention_pooling"],
)
if config["use_data_parallel"]:
model = nn.DataParallel(model)
return model.to(device)
def setup_optimizer_and_scheduler(model, config, total_steps):
optimizer = AdamW(
model.parameters(),
lr=config["learning_rate"],
weight_decay=config["weight_decay"],
)
warmup_steps = int(config["warmup_ratio"] * total_steps)
if config["lr_scheduler_type"] == "linear":
scheduler = get_linear_schedule_with_warmup(
optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
)
elif config["lr_scheduler_type"] == "cosine":
scheduler = get_cosine_schedule_with_warmup(
optimizer,
num_warmup_steps=warmup_steps,
num_training_steps=total_steps,
num_cycles=0.5,
)
return optimizer, scheduler
def train_epoch(
model, train_loader, optimizer, scheduler, device, config, writer, epoch
):
model.train()
progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{config['epochs']}")
for batch_idx, batch in enumerate(progress_bar):
optimizer.zero_grad()
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = [
batch["labels"][task_name].to(device) for task_name in config["task_names"]
]
loss, _, _ = model(input_ids, attention_mask, labels)
loss.backward()
if config["gradient_clipping"]:
torch.nn.utils.clip_grad_norm_(model.parameters(), config["max_grad_norm"])
optimizer.step()
scheduler.step()
writer.add_scalar(
"Training Loss", loss.item(), epoch * len(train_loader) + batch_idx
)
if config.get("use_wandb", False):
import wandb
wandb.log({"Training Loss": loss.item()})
# Update progress bar
progress_bar.set_postfix({"loss": f"{loss.item():.4f}"})
return loss.item() # Return the last batch loss
def validate_model(model, val_loader, device, config):
model.eval()
val_loss = 0.0
task_true_labels = {task_name: [] for task_name in config["task_names"]}
task_pred_labels = {task_name: [] for task_name in config["task_names"]}
task_pred_probs = {task_name: [] for task_name in config["task_names"]}
with torch.no_grad():
for batch in val_loader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = [
batch["labels"][task_name].to(device)
for task_name in config["task_names"]
]
loss, logits, _ = model(input_ids, attention_mask, labels)
val_loss += loss.item()
for sample_idx in range(len(batch["input_ids"])):
for i, task_name in enumerate(config["task_names"]):
true_label = batch["labels"][task_name][sample_idx].item()
pred_label = torch.argmax(logits[i][sample_idx], dim=-1).item()
pred_prob = (
torch.softmax(logits[i][sample_idx], dim=-1).cpu().numpy()
)
task_true_labels[task_name].append(true_label)
task_pred_labels[task_name].append(pred_label)
task_pred_probs[task_name].append(pred_prob)
val_loss /= len(val_loader)
return val_loss, task_true_labels, task_pred_labels, task_pred_probs
def log_metrics(task_metrics, val_loss, config, writer, epochs):
for task_name, metrics in task_metrics.items():
print(
f"{task_name} - Validation F1 Macro: {metrics['f1']:.4f}, Validation Accuracy: {metrics['accuracy']:.4f}"
)
if config.get("use_wandb", False):
import wandb
wandb.log(
{
f"{task_name} Validation F1 Macro": metrics["f1"],
f"{task_name} Validation Accuracy": metrics["accuracy"],
}
)
writer.add_scalar("Validation Loss", val_loss, epochs)
for task_name, metrics in task_metrics.items():
writer.add_scalar(f"{task_name} - Validation F1 Macro", metrics["f1"], epochs)
writer.add_scalar(
f"{task_name} - Validation Accuracy", metrics["accuracy"], epochs
)
def save_validation_predictions(
val_cell_id_mapping,
task_true_labels,
task_pred_labels,
task_pred_probs,
config,
trial_number=None,
):
if trial_number is not None:
trial_results_dir = os.path.join(config["results_dir"], f"trial_{trial_number}")
os.makedirs(trial_results_dir, exist_ok=True)
val_preds_file = os.path.join(trial_results_dir, "val_preds.csv")
else:
val_preds_file = os.path.join(config["results_dir"], "manual_run_val_preds.csv")
rows = []
for sample_idx in range(len(val_cell_id_mapping)):
row = {"Cell ID": val_cell_id_mapping[sample_idx]}
for task_name in config["task_names"]:
row[f"{task_name} True"] = task_true_labels[task_name][sample_idx]
row[f"{task_name} Pred"] = task_pred_labels[task_name][sample_idx]
row[f"{task_name} Probabilities"] = ",".join(
map(str, task_pred_probs[task_name][sample_idx])
)
rows.append(row)
df = pd.DataFrame(rows)
df.to_csv(val_preds_file, index=False)
print(f"Validation predictions saved to {val_preds_file}")
def train_model(
config,
device,
train_loader,
val_loader,
train_cell_id_mapping,
val_cell_id_mapping,
num_labels_list,
):
set_seed(config["seed"])
initialize_wandb(config)
model = create_model(config, num_labels_list, device)
total_steps = len(train_loader) * config["epochs"]
optimizer, scheduler = setup_optimizer_and_scheduler(model, config, total_steps)
log_dir = os.path.join(config["tensorboard_log_dir"], "manual_run")
writer = SummaryWriter(log_dir=log_dir)
epoch_progress = tqdm(range(config["epochs"]), desc="Training Progress")
for epoch in epoch_progress:
last_loss = train_epoch(
model, train_loader, optimizer, scheduler, device, config, writer, epoch
)
epoch_progress.set_postfix({"last_loss": f"{last_loss:.4f}"})
val_loss, task_true_labels, task_pred_labels, task_pred_probs = validate_model(
model, val_loader, device, config
)
task_metrics = calculate_task_specific_metrics(task_true_labels, task_pred_labels)
log_metrics(task_metrics, val_loss, config, writer, config["epochs"])
writer.close()
save_validation_predictions(
val_cell_id_mapping, task_true_labels, task_pred_labels, task_pred_probs, config
)
if config.get("use_wandb", False):
import wandb
wandb.finish()
print(f"\nFinal Validation Loss: {val_loss:.4f}")
return val_loss, model # Return both the validation loss and the trained model
def objective(
trial,
train_loader,
val_loader,
train_cell_id_mapping,
val_cell_id_mapping,
num_labels_list,
config,
device,
):
set_seed(config["seed"]) # Set the seed before each trial
initialize_wandb(config)
# Hyperparameters
config["learning_rate"] = trial.suggest_float(
"learning_rate",
config["hyperparameters"]["learning_rate"]["low"],
config["hyperparameters"]["learning_rate"]["high"],
log=config["hyperparameters"]["learning_rate"]["log"],
)
config["warmup_ratio"] = trial.suggest_float(
"warmup_ratio",
config["hyperparameters"]["warmup_ratio"]["low"],
config["hyperparameters"]["warmup_ratio"]["high"],
)
config["weight_decay"] = trial.suggest_float(
"weight_decay",
config["hyperparameters"]["weight_decay"]["low"],
config["hyperparameters"]["weight_decay"]["high"],
)
config["dropout_rate"] = trial.suggest_float(
"dropout_rate",
config["hyperparameters"]["dropout_rate"]["low"],
config["hyperparameters"]["dropout_rate"]["high"],
)
config["lr_scheduler_type"] = trial.suggest_categorical(
"lr_scheduler_type", config["hyperparameters"]["lr_scheduler_type"]["choices"]
)
config["use_attention_pooling"] = trial.suggest_categorical(
"use_attention_pooling", [True, False]
)
if config["use_task_weights"]:
config["task_weights"] = [
trial.suggest_float(
f"task_weight_{i}",
config["hyperparameters"]["task_weights"]["low"],
config["hyperparameters"]["task_weights"]["high"],
)
for i in range(len(num_labels_list))
]
weight_sum = sum(config["task_weights"])
config["task_weights"] = [
weight / weight_sum for weight in config["task_weights"]
]
else:
config["task_weights"] = None
# Fix for max_layers_to_freeze
if isinstance(config["max_layers_to_freeze"], dict):
config["max_layers_to_freeze"] = trial.suggest_int(
"max_layers_to_freeze",
config["max_layers_to_freeze"]["min"],
config["max_layers_to_freeze"]["max"],
)
elif isinstance(config["max_layers_to_freeze"], int):
# If it's already an int, we don't need to suggest it
pass
else:
raise ValueError("Invalid type for max_layers_to_freeze. Expected dict or int.")
model = create_model(config, num_labels_list, device)
total_steps = len(train_loader) * config["epochs"]
optimizer, scheduler = setup_optimizer_and_scheduler(model, config, total_steps)
log_dir = os.path.join(config["tensorboard_log_dir"], f"trial_{trial.number}")
writer = SummaryWriter(log_dir=log_dir)
for epoch in range(config["epochs"]):
train_epoch(
model, train_loader, optimizer, scheduler, device, config, writer, epoch
)
val_loss, task_true_labels, task_pred_labels, task_pred_probs = validate_model(
model, val_loader, device, config
)
task_metrics = calculate_task_specific_metrics(task_true_labels, task_pred_labels)
log_metrics(task_metrics, val_loss, config, writer, config["epochs"])
writer.close()
save_validation_predictions(
val_cell_id_mapping,
task_true_labels,
task_pred_labels,
task_pred_probs,
config,
trial.number,
)
trial.set_user_attr("model_state_dict", model.state_dict())
trial.set_user_attr("task_weights", config["task_weights"])
trial.report(val_loss, config["epochs"])
if trial.should_prune():
raise optuna.TrialPruned()
if config.get("use_wandb", False):
import wandb
wandb.log(
{
"trial_number": trial.number,
"val_loss": val_loss,
**{
f"{task_name}_f1": metrics["f1"]
for task_name, metrics in task_metrics.items()
},
**{
f"{task_name}_accuracy": metrics["accuracy"]
for task_name, metrics in task_metrics.items()
},
**{
k: v
for k, v in config.items()
if k
in [
"learning_rate",
"warmup_ratio",
"weight_decay",
"dropout_rate",
"lr_scheduler_type",
"use_attention_pooling",
"max_layers_to_freeze",
]
},
}
)
wandb.finish()
return val_loss