from predictions import get_embeddings, get_cosine_distance
from utils.pt_util import restore_objects, save_model, save_objects, restore_model
from utils.preprocessing import extract_fbanks
from models.cross_entropy_model import FBankCrossEntropyNetV2
from trainer.cross_entropy_train import test, train
from data_proc.cross_entropy_dataset import FBanksCrossEntropyDataset, DataLoader

import asyncio
import json
import multiprocessing
import os

import numpy as np
import torch
from torch import optim

# Allow duplicate OpenMP runtimes; avoids a common MKL/libiomp crash with PyTorch.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


async def train_auth(
        train_dataset_path: str = 'dataset-speaker-csf/fbanks-train',
        test_dataset_path: str = 'dataset-speaker-csf/fbanks-test',
        model_name: str = 'fbanks-net-auth',
        model_layers: int = 4,
        epochs: int = 2,
        lr: float = 0.0005,
        batch_size: int = 16,
        labId: str = '',
):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Worker processes and pinned memory only pay off when a GPU is present.
    kwargs = {'num_workers': multiprocessing.cpu_count(),
              'pin_memory': True} if torch.cuda.is_available() else {}
    try:
        train_dataset = FBanksCrossEntropyDataset(train_dataset_path)
        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
        test_dataset = FBanksCrossEntropyDataset(test_dataset_path)
        test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=True, **kwargs)
    except Exception:
        return {'message': 'train or test dataset path does not exist'}

    if model_name == 'fbanks-net-auth':
        model = FBankCrossEntropyNetV2(
            num_layers=model_layers, reduction='mean').to(device)
    else:
        return {'message': 'model does not exist in this lab'}

    # Restore the latest checkpoint and training history if they exist;
    # otherwise start from epoch 0 with empty histories.
    model_path = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    model = restore_model(model, model_path)
    last_epoch, max_accuracy, train_losses, test_losses, train_accuracies, test_accuracies = restore_objects(
        model_path, (0, 0, [], [], [], []))
    start = last_epoch + 1 if max_accuracy > 0 else 0

    models_path = []
    optimizer = optim.Adam(model.parameters(), lr=lr)
    for epoch in range(start, epochs):
        train_loss, train_accuracy = train(
            model, device, train_loader, optimizer, epoch, 500)
        test_loss, test_accuracy = test(model, device, test_loader)
        print('After epoch: {}, train loss: {}, test loss: {}, train accuracy: {}, '
              'test accuracy: {}'.format(epoch, train_loss, test_loss,
                                         train_accuracy, test_accuracy))

        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accuracies.append(train_accuracy)
        test_accuracies.append(test_accuracy)
        # Checkpoint only when the test accuracy improves.
        if test_accuracy > max_accuracy:
            max_accuracy = test_accuracy
            model_path = save_model(model, epoch, model_path)
            models_path.append(model_path)
            save_objects((epoch, max_accuracy, train_losses, test_losses,
                          train_accuracies, test_accuracies), epoch, model_path)
            print('saved epoch: {} as checkpoint'.format(epoch))

    train_history = {
        "train_accuracies": train_accuracies,
        "test_accuracies": test_accuracies,
        "train_losses": train_losses,
        "test_losses": test_losses,
        "model_path": models_path
    }
    return {
        'history': json.dumps(train_history)
    }
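

# The JSON history returned by train_auth can be post-processed by callers.
# A minimal helper sketch (not part of the original pipeline; it assumes only
# the keys that train_auth itself writes):
def summarize_history(history_json: str) -> dict:
    history = json.loads(history_json)
    test_accuracies = history["test_accuracies"]
    if not test_accuracies:
        return {'message': 'no epochs were run'}
    # Index is relative to the recorded history, which may span resumed runs.
    best_epoch = int(np.argmax(test_accuracies))
    return {
        'best_epoch': best_epoch,
        'best_test_accuracy': test_accuracies[best_epoch],
        'final_train_loss': history["train_losses"][-1],
    }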


async def test_auth(
        test_dataset_path: str = 'dataset-speaker-csf/fbanks-test',
        model_name: str = 'fbanks-net-auth',
        model_layers: int = 4,
        batch_size: int = 2,
        labId: str = '',
):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    kwargs = {'num_workers': multiprocessing.cpu_count(),
              'pin_memory': True} if torch.cuda.is_available() else {}
    try:
        test_dataset = FBanksCrossEntropyDataset(test_dataset_path)
        test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=True, **kwargs)
    except Exception:
        return {'message': 'test dataset path does not exist'}

    # Take the last checkpoint file found in the model folder.
    model_folder_path = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    model_path = None
    for file in os.listdir(model_folder_path):
        if file.endswith(".pth"):
            model_path = os.path.join(model_folder_path, file)
    if model_path is None:
        return {'message': 'no .pth checkpoint found for this model'}

    if model_name == 'fbanks-net-auth':
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)
    else:
        return {'message': 'model does not exist in this lab'}

    test_loss, accuracy_mean = test(model, device, test_loader)
    return {
        'test_loss': test_loss,
        'test_accuracy': accuracy_mean
    }
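

# Note: os.listdir returns entries in arbitrary order, so the checkpoint loops
# in test_auth and infer_auth load whichever .pth file happens to come last.
# A deterministic alternative (a sketch, assuming only the .pth suffix and
# that the newest file is the best one, since checkpoints are saved only when
# accuracy improves) would pick by modification time:
#
#     candidates = [os.path.join(model_folder_path, f)
#                   for f in os.listdir(model_folder_path) if f.endswith('.pth')]
#     model_path = max(candidates, key=os.path.getmtime)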


async def infer_auth(
        speech_file_path: str = 'sample.wav',
        model_name: str = 'fbanks-net-auth',
        model_layers: int = 4,
        name_speaker: str = 'Hưng Phạm',
        threshold: float = 0.1,
        labId: str = '',
):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    speaker_path = f'./modelDir/{labId}/speaker/'
    dir_ = speaker_path + name_speaker
    if not os.path.exists(dir_):
        return {'message': 'speaker name does not exist, please add the speaker first'}

    # Take the last checkpoint file found in the model folder.
    model_folder_path = f'./modelDir/{labId}/log_train/{model_name}/{model_layers}/'
    model_path = None
    for file in os.listdir(model_folder_path):
        if file.endswith(".pth"):
            model_path = os.path.join(model_folder_path, file)
    if model_path is None:
        return {'message': 'no .pth checkpoint found for this model'}

    if model_name == 'fbanks-net-auth':
        model = FBankCrossEntropyNetV2(num_layers=model_layers, reduction="mean")
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        cpkt = torch.load(model_path, map_location=device)
        model.load_state_dict(cpkt)
        model.to(device)
    else:
        return {'message': 'model does not exist in this lab'}

    # Compare the utterance's embeddings against the speaker's stored embeddings.
    fbanks = extract_fbanks(speech_file_path)
    embeddings = get_embeddings(fbanks, model)
    stored_embeddings = np.load(
        speaker_path + name_speaker + '/embeddings.npy')
    stored_embeddings = stored_embeddings.reshape((1, -1))
    distances = get_cosine_distance(embeddings, stored_embeddings)
    print('mean distances', np.mean(distances), flush=True)
    # A frame counts as a "positive" when its cosine distance is below the
    # threshold; authentication passes when the fraction of positives reaches
    # that same threshold.
    positives = distances < threshold
    positives_mean = np.mean(positives)
    return {
        "positives_mean": positives_mean,
        "name_speaker": name_speaker,
        "auth": bool(positives_mean >= threshold),
    }
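

def enroll_speaker(speech_file_path, name_speaker, model, labId=''):
    # Hypothetical enrollment sketch: infer_auth reads
    # `<speaker dir>/embeddings.npy` but this module never shows how that file
    # is written, so this helper only mirrors the load path above. The function
    # name and its exact behaviour are assumptions, not part of the original repo.
    speaker_dir = f'./modelDir/{labId}/speaker/{name_speaker}'
    os.makedirs(speaker_dir, exist_ok=True)
    fbanks = extract_fbanks(speech_file_path)
    embeddings = get_embeddings(fbanks, model)
    # Store the mean embedding so infer_auth can reshape it to (1, -1).
    np.save(os.path.join(speaker_dir, 'embeddings.npy'),
            np.mean(embeddings, axis=0))
    return speaker_dir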


if __name__ == '__main__':
    # train_auth is a coroutine, so it must be driven by an event loop;
    # calling it directly would only create a coroutine object.
    result = asyncio.run(train_auth())
    print(result)
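    # The other entry points can be driven the same way; the arguments shown
    # are this module's own defaults:
    # print(asyncio.run(test_auth()))
    # print(asyncio.run(infer_auth(speech_file_path='sample.wav',
    #                              name_speaker='Hưng Phạm')))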