|
import os |
|
import shutil |
|
|
|
from sklearn.metrics import accuracy_score, f1_score |
|
from sklearn.preprocessing import LabelEncoder |
|
from transformers import AutoConfig, BertConfig, BertModel |
|
|
|
from .imports import * |
|
|
|
|
|
def save_model(model, model_save_directory): |
|
if not os.path.exists(model_save_directory): |
|
os.makedirs(model_save_directory) |
|
|
|
|
|
if isinstance(model, nn.DataParallel): |
|
model_state_dict = ( |
|
model.module.state_dict() |
|
) |
|
else: |
|
model_state_dict = model.state_dict() |
|
|
|
|
|
model_state_dict = { |
|
k.replace("module.", ""): v for k, v in model_state_dict.items() |
|
} |
|
|
|
model_save_path = os.path.join(model_save_directory, "pytorch_model.bin") |
|
torch.save(model_state_dict, model_save_path) |
|
|
|
|
|
if isinstance(model, nn.DataParallel): |
|
model.module.config.to_json_file( |
|
os.path.join(model_save_directory, "config.json") |
|
) |
|
else: |
|
model.config.to_json_file(os.path.join(model_save_directory, "config.json")) |
|
|
|
print(f"Model and configuration saved to {model_save_directory}") |
|
|
|
|
|
def calculate_task_specific_metrics(task_true_labels, task_pred_labels): |
|
task_metrics = {} |
|
for task_name in task_true_labels.keys(): |
|
true_labels = task_true_labels[task_name] |
|
pred_labels = task_pred_labels[task_name] |
|
f1 = f1_score(true_labels, pred_labels, average="macro") |
|
accuracy = accuracy_score(true_labels, pred_labels) |
|
task_metrics[task_name] = {"f1": f1, "accuracy": accuracy} |
|
return task_metrics |
|
|
|
|
|
def calculate_combined_f1(combined_labels, combined_preds): |
|
|
|
le = LabelEncoder() |
|
|
|
|
|
le.fit(combined_labels + combined_preds) |
|
encoded_true_labels = le.transform(combined_labels) |
|
encoded_pred_labels = le.transform(combined_preds) |
|
|
|
|
|
print("\nLabel Encoder Mapping:") |
|
for index, class_label in enumerate(le.classes_): |
|
print(f"'{class_label}': {index}") |
|
|
|
|
|
accuracy = accuracy_score(encoded_true_labels, encoded_pred_labels) |
|
|
|
|
|
f1 = f1_score(encoded_true_labels, encoded_pred_labels, average="macro") |
|
|
|
return f1, accuracy |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_layer_freeze_range(pretrained_path): |
|
""" |
|
Dynamically determines the number of layers to freeze based on the model depth from its configuration. |
|
Args: |
|
pretrained_path (str): Path to the pretrained model directory or model identifier. |
|
Returns: |
|
dict: A dictionary with 'min' and 'max' keys indicating the range of layers to freeze. |
|
""" |
|
if pretrained_path: |
|
config = AutoConfig.from_pretrained(pretrained_path) |
|
total_layers = config.num_hidden_layers |
|
return {"min": 0, "max": total_layers - 1} |
|
else: |
|
return {"min": 0, "max": 0} |
|
|