CHATBOT / model.py
Marcos12886's picture
Update model.py
d99bc5f verified
raw
history blame
6.93 kB
import os
import json
import random
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from huggingface_hub import login, upload_folder
from transformers.integrations import TensorBoardCallback
from transformers import (
Wav2Vec2FeatureExtractor, HubertConfig, HubertForSequenceClassification,
Trainer, TrainingArguments,
EarlyStoppingCallback
)
# Base checkpoint; to fine-tune a different model it is enough to change this value.
MODEL = "ntu-spml/distilhubert"
# Feature extractor loaded from the same checkpoint; used by AudioDataset.preprocess_audio.
FEATURE_EXTRACTOR = Wav2Vec2FeatureExtractor.from_pretrained(MODEL)
# Seed consumed by seed_everything().
seed = 123
# Multiplied by SAMPLING_RATE to get the max_length (in samples) passed to the feature extractor.
MAX_DURATION = 1.00
SAMPLING_RATE = FEATURE_EXTRACTOR.sampling_rate # 16000
# Value passed to huggingface_hub.login() and Trainer.push_to_hub() in main().
# NOTE(review): it is read from the MODEL_REPO_ID environment variable — confirm that
# variable really holds an access token and not a repository id.
token = os.getenv('MODEL_REPO_ID')
# JSON file opened by load_config().
config_file = "models_config.json"
# Names passed to load_config() to select which entry of the config file to train.
clasificador = "class"
monitor = "mon"
# Batch size handed to both DataLoaders built in create_dataloader().
batch_size = 4096
class AudioDataset(Dataset):
    """Map-style dataset of audio clips stored as ``<dataset_path>/<label>/<file>``.

    Every item is a dict with two entries:

    * ``input_values``: 1-D waveform produced by the module-level feature
      extractor, truncated or padded to ``SAMPLING_RATE * MAX_DURATION`` samples.
    * ``labels``: scalar tensor holding the integer class id.
    """

    def __init__(self, dataset_path, label2id):
        """Index every regular file found inside the label sub-directories.

        Args:
            dataset_path: Root folder containing one sub-directory per label.
            label2id: Mapping from sub-directory name to integer class id.
                Labels whose directory does not exist are skipped.
        """
        self.dataset_path = dataset_path
        self.label2id = label2id
        self.file_paths = []
        self.labels = []
        for label_dir, label_id in self.label2id.items():
            label_path = os.path.join(self.dataset_path, label_dir)
            if not os.path.isdir(label_path):
                continue
            # Sorted so the sample order does not depend on the filesystem's listing order.
            for file_name in sorted(os.listdir(label_path)):
                audio_path = os.path.join(label_path, file_name)
                # Nested directories are not audio clips; loading them would fail later.
                if not os.path.isfile(audio_path):
                    continue
                self.file_paths.append(audio_path)
                self.labels.append(label_id)

    def __len__(self):
        """Return the number of indexed audio files."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load and preprocess the clip at position ``idx``.

        Returns:
            Dict with ``input_values`` (preprocessed waveform) and ``labels``
            (scalar tensor with the class id).
        """
        audio_path = self.file_paths[idx]
        label = self.labels[idx]
        input_values = self.preprocess_audio(audio_path)
        return {
            "input_values": input_values,
            "labels": torch.tensor(label),
        }

    def preprocess_audio(self, audio_path):
        """Load a clip and turn it into a fixed-length model input.

        Steps: load, downmix to mono, resample to ``SAMPLING_RATE``, peak
        normalise, then truncate/pad to ``SAMPLING_RATE * MAX_DURATION`` samples.

        Args:
            audio_path: Path of the audio file to load.

        Returns:
            1-D tensor with the prepared samples.
        """
        waveform, sample_rate = torchaudio.load(
            audio_path,
            normalize=True,  # Converts to float32
            # num_frames=  # TODO: try this so the clips do not need to be trimmed afterwards
        )
        # Always drop the channel axis so the feature extractor receives a 1-D
        # signal for mono and stereo clips alike.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0)
        else:
            waveform = waveform.squeeze(0)
        if sample_rate != SAMPLING_RATE:  # Resample when the file is not at the model's rate
            resampler = torchaudio.transforms.Resample(sample_rate, SAMPLING_RATE)
            waveform = resampler(waveform)
        # Peak normalisation; silent or empty clips are left untouched to avoid 0/0 -> NaN.
        if waveform.numel() > 0:
            peak = torch.max(torch.abs(waveform))
            if peak > 0:
                waveform = waveform / peak
        inputs = FEATURE_EXTRACTOR(
            waveform.numpy(),
            sampling_rate=SAMPLING_RATE,
            return_tensors="pt",
            max_length=int(SAMPLING_RATE * MAX_DURATION),
            truncation=True,
            # Pad short clips up to max_length so every item has the same length
            # and items can be stacked into a batch.
            padding="max_length",
        )
        return inputs.input_values.squeeze(0)
def seed_everything(seed_value=None):
    """Seed every random number generator this script relies on.

    Python's ``random`` module is seeded as well because the train/test split
    is produced with ``random.shuffle``.

    Args:
        seed_value: Seed to use. When omitted, the module-level ``seed`` is used.
    """
    if seed_value is None:
        seed_value = seed
    random.seed(seed_value)
    torch.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)
    # Ask cuDNN for deterministic kernels and disable its auto-tuner.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':16384:8'
def build_label_mappings(dataset_path):
    """Derive the label <-> id mappings from the sub-directories of ``dataset_path``.

    Directory names are sorted before ids are assigned, so the same dataset
    always yields the same mapping regardless of the filesystem's listing order.

    Args:
        dataset_path: Root folder containing one sub-directory per label.

    Returns:
        Tuple ``(label2id, id2label)`` of dicts mapping name -> id and id -> name.
    """
    label_names = sorted(
        entry
        for entry in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, entry))
    )
    label2id = {name: label_id for label_id, name in enumerate(label_names)}
    id2label = {label_id: name for name, label_id in label2id.items()}
    return label2id, id2label
def create_dataloader(dataset_path, test_size=0.2, num_workers=12, shuffle=True, pin_memory=True):
    """Build the train and test DataLoaders for the dataset at ``dataset_path``.

    The files are split at random (via ``random.shuffle``) into a train and a
    test subset.

    Args:
        dataset_path: Root folder containing one sub-directory per label.
        test_size: Fraction of the samples reserved for the test subset, in [0, 1].
        num_workers: Worker processes used by each DataLoader.
        shuffle: Whether the training loader reshuffles its samples each epoch.
            The test loader is never shuffled so evaluation order stays stable.
        pin_memory: Forwarded to both DataLoaders.

    Returns:
        Tuple ``(train_dataloader, test_dataloader, label2id, id2label)``.

    Raises:
        ValueError: If ``test_size`` is outside the [0, 1] interval.
    """
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")
    label2id, id2label = build_label_mappings(dataset_path)
    dataset = AudioDataset(dataset_path, label2id)
    dataset_size = len(dataset)
    indices = list(range(dataset_size))
    random.shuffle(indices)
    split_idx = int(dataset_size * (1 - test_size))
    train_dataset = torch.utils.data.Subset(dataset, indices[:split_idx])
    test_dataset = torch.utils.data.Subset(dataset, indices[split_idx:])
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,  # evaluation does not benefit from a random order
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return train_dataloader, test_dataloader, label2id, id2label
def load_model(num_labels, label2id, id2label):
    """Instantiate the sequence-classification model from the ``MODEL`` checkpoint.

    Args:
        num_labels: Number of output classes.
        label2id: Mapping from label name to class id, stored in the model config.
        id2label: Mapping from class id to label name, stored in the model config.

    Returns:
        The model, moved to CUDA when it is available and to the CPU otherwise.
    """
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    hubert_config = HubertConfig.from_pretrained(
        MODEL,
        num_labels=num_labels,
        label2id=label2id,
        id2label=id2label,
        finetuning_task="audio-classification",
    )
    # TODO: review the remaining parameters for possible optimisations.
    classifier = HubertForSequenceClassification.from_pretrained(
        MODEL,
        config=hubert_config,
        torch_dtype=torch.float32,  # Does not affect the first epoch; better to set it explicitly
    )
    classifier.to(target_device)
    return classifier
def model_params(dataset_path):
    """Create the data loaders and the model for the dataset at ``dataset_path``.

    Returns:
        Tuple ``(model, train_dataloader, test_dataloader, id2label)``.
    """
    loaders_and_maps = create_dataloader(dataset_path)
    train_loader, eval_loader, name_to_id, id_to_name = loaders_and_maps
    classifier = load_model(
        num_labels=len(id_to_name),
        label2id=name_to_id,
        id2label=id_to_name,
    )
    return classifier, train_loader, eval_loader, id_to_name
def compute_metrics(eval_pred):
    """Compute classification accuracy for an evaluation pass.

    Args:
        eval_pred: Object exposing ``predictions`` (logits with the classes on
            the last axis, or a tuple whose first element is those logits) and
            ``label_ids`` (the reference class ids). Arrays and tensors are
            both accepted.

    Returns:
        Dict with a single ``"accuracy"`` entry holding a Python float.
    """
    logits = eval_pred.predictions
    if isinstance(logits, tuple):
        # Some models return extra outputs next to the logits; the logits come first.
        logits = logits[0]
    logits = torch.as_tensor(logits)
    references = torch.as_tensor(eval_pred.label_ids)
    if references.numel() == 0:
        # Nothing to score; avoid the NaN that the mean of an empty tensor produces.
        return {"accuracy": 0.0}
    # Arg-max over the class axis gives one predicted class id per sample.
    predictions = torch.argmax(logits, dim=-1)
    accuracy = (predictions == references).float().mean().item()
    return {
        "accuracy": accuracy,
    }
def main(training_args, output_dir, dataset_path):
    """Fine-tune the model, save it locally and push it to the Hugging Face Hub.

    Args:
        training_args: ``TrainingArguments`` instance driving the ``Trainer``.
        output_dir: Folder where the trained model is saved.
        dataset_path: Root folder containing one sub-directory per label.
    """
    seed_everything()
    model, train_dataloader, test_dataloader, _ = model_params(dataset_path)
    # Make sure the destination exists before anything tries to write to it.
    os.makedirs(output_dir, exist_ok=True)
    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics,
        # The Trainer builds its own loaders, so only the underlying datasets are passed.
        train_dataset=train_dataloader.dataset,
        eval_dataset=test_dataloader.dataset,
        callbacks=[TensorBoardCallback(), EarlyStoppingCallback(early_stopping_patience=3)],
    )
    torch.cuda.empty_cache()  # free GPU memory before training
    trainer.train()  # arguments can be passed here to resume a previous run
    # Save locally first so a failed login or upload cannot lose the trained model.
    trainer.save_model(output_dir)
    login(token, add_to_git_credential=True)
    # Upload the model to the account tied to `token`. The original author notes that
    # both the upload and the local save are needed for prediction to work (reason unknown).
    trainer.push_to_hub(token=token)
    # To upload the model to the organisation instead:
    # upload_folder(repo_id=f"A-POR-LOS-8000/{output_dir}", folder_path=output_dir, token=token)
def load_config(model_name):
    """Read the entry for ``model_name`` from the JSON file named by ``config_file``.

    The entry's ``"training_args"`` mapping is replaced by a
    ``TrainingArguments`` instance built from it.

    Args:
        model_name: Top-level key of the configuration file to load.

    Returns:
        The selected configuration dict, with ``"training_args"`` converted.
    """
    with open(config_file, 'r') as handle:
        all_configs = json.load(handle)
    selected = all_configs[model_name]
    selected["training_args"] = TrainingArguments(**selected["training_args"])
    return selected
if __name__ == "__main__":
    # Choose which model to train: `monitor` is the active one; pass `clasificador`
    # to load_config() instead to train the other configuration.
    selected = load_config(monitor)
    main(
        selected["training_args"],
        selected["output_dir"],
        selected["dataset_path"],
    )