"""Train, evaluate and CPU-benchmark the models from ``efficient_model``.

Each model takes an MFCC tensor and an image tensor and is trained with an
MSE loss against a scalar label.  Run as a script; ``--model`` selects one
architecture or ``all`` of them.
"""
import argparse
import os
import time

import numpy as np
import torch
import torchaudio
import torchvision
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

from efficient_model import MobileNetGRUModel, EfficientNetCNNModel, SqueezeNetTransformerModel

# Print library version information
print(f"\033[92mINFO\033[0m: PyTorch version: {torch.__version__}")
print(f"\033[92mINFO\033[0m: Torchaudio version: {torchaudio.__version__}")
print(f"\033[92mINFO\033[0m: Torchvision version: {torchvision.__version__}")

# Device selection
device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)
print(f"\033[92mINFO\033[0m: Using device: {device}")

# Hyperparameters (using the best configuration from search)
batch_size = 4
epochs = 20
fc_hidden_size = 64
learning_rate = 0.0005
dropout_rate = 0.5

# Model save directory
os.makedirs("./models/", exist_ok=True)


class PreprocessedDataset(Dataset):
    """Dataset over the ``.pt`` files found directly inside ``data_dir``.

    Every file is expected to unpack into ``(mfcc, image, label)``.
    """

    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.samples = [
            os.path.join(data_dir, f)
            for f in os.listdir(data_dir)
            if f.endswith(".pt")
        ]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample_path = self.samples[idx]
        # NOTE(security): torch.load unpickles the file; only point this
        # dataset at preprocessed files you produced yourself.
        mfcc, image, label = torch.load(sample_path)
        return mfcc.float(), image.float(), label


def calculate_mae(outputs, labels):
    """Calculate Mean Absolute Error between outputs and labels"""
    return torch.abs(outputs - labels).mean().item()


def _build_model(model_type):
    """Instantiate the architecture selected by ``model_type``.

    The model is returned as constructed; moving it to a device is left to
    the caller.

    Returns:
        ``(model, model_name)`` where ``model_name`` is the display name.

    Raises:
        ValueError: if ``model_type`` is not one of the known identifiers.
    """
    if model_type == "mobilenet_gru":
        model = MobileNetGRUModel(
            gru_hidden_size=32,
            gru_layers=1,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        return model, "MobileNetGRU"
    if model_type == "efficientnet_cnn":
        model = EfficientNetCNNModel(
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        return model, "EfficientNetCNN"
    if model_type == "squeezenet_transformer":
        model = SqueezeNetTransformerModel(
            nhead=4,
            dim_feedforward=128,
            fc_hidden_size=fc_hidden_size,
            dropout_rate=dropout_rate,
        )
        return model, "SqueezeNetTransformer"
    raise ValueError(f"Unknown model type: {model_type}")


def evaluate_model(model, test_loader, criterion):
    """Evaluate ``model`` on ``test_loader`` without gradient tracking.

    Loss and MAE are averaged per batch (each batch weighs the same).  Up to
    five (prediction, label) pairs are printed for debugging.

    Returns:
        ``(avg_loss, avg_mae, all_predictions, all_labels)``; the last two
        are flat NumPy arrays.  For an empty loader the metrics are NaN and
        the arrays are empty.
    """
    model.eval()
    n_batches = len(test_loader)
    if n_batches == 0:
        # Avoid an opaque ZeroDivisionError; the metrics are undefined here.
        print("\033[91mERR!\033[0m: Test loader is empty, nothing to evaluate")
        empty = np.array([], dtype=np.float32)
        return float("nan"), float("nan"), empty, empty.copy()

    test_loss = 0.0
    mae_sum = 0.0
    prediction_batches = []
    label_batches = []
    debug_samples = []  # For debugging

    with torch.no_grad():
        for mfcc, image, label in test_loader:
            mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
            output = model(mfcc, image)
            label = label.view(-1, 1).float()

            # Keep the first five individual samples across batches.
            for i in range(min(len(output), 5 - len(debug_samples))):
                debug_samples.append((output[i][0].item(), label[i][0].item()))

            test_loss += criterion(output, label).item()
            mae_sum += calculate_mae(output, label)

            # Store predictions and labels for additional analysis
            prediction_batches.append(output.cpu().numpy())
            label_batches.append(label.cpu().numpy())

    avg_loss = test_loss / n_batches
    avg_mae = mae_sum / n_batches

    all_predictions = np.concatenate(prediction_batches).flatten()
    all_labels = np.concatenate(label_batches).flatten()

    print("\nDEBUG SAMPLES (Prediction, Label):")
    for i, (pred, target) in enumerate(debug_samples):
        print(
            f"Sample {i+1}: Prediction = {pred:.4f}, Label = {target:.4f}, "
            f"Difference = {abs(pred-target):.4f}"
        )

    return avg_loss, avg_mae, all_predictions, all_labels


def _train_one_epoch(model, train_loader, criterion, optimizer, writer, global_step):
    """Run one optimisation pass over ``train_loader``.

    Per-step loss and MAE are logged to ``writer``.  An exception raised
    mid-epoch is printed and the rest of the epoch is skipped (best effort),
    unless no batch completed at all, in which case it is re-raised so the
    real cause is reported instead of a later division by zero.

    Returns:
        ``(avg_loss, avg_mae, global_step)`` over the completed batches.
    """
    model.train()
    running_loss = 0.0
    running_mae = 0.0
    n_batches = 0
    try:
        for mfcc, image, label in train_loader:
            mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
            optimizer.zero_grad()
            output = model(mfcc, image)
            label = label.view(-1, 1).float()
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            # Compute each metric once and reuse it for both uses.
            batch_loss = loss.item()
            batch_mae = calculate_mae(output, label)
            running_loss += batch_loss
            running_mae += batch_mae
            n_batches += 1

            writer.add_scalar("Training/Loss", batch_loss, global_step)
            writer.add_scalar("Training/MAE", batch_mae, global_step)
            global_step += 1
    except Exception as e:
        print(f"\033[91mERR!\033[0m: {e}")
        if n_batches == 0:
            raise

    if n_batches == 0:
        raise RuntimeError("Training loader produced no batches")
    return running_loss / n_batches, running_mae / n_batches, global_step


def _validate(model, val_loader, criterion):
    """Compute per-batch-averaged loss and MAE on ``val_loader``.

    Error handling mirrors ``_train_one_epoch``: a mid-loop failure is
    printed and tolerated only if at least one batch completed.

    Returns:
        ``(avg_loss, avg_mae)`` over the completed batches.
    """
    model.eval()
    val_loss = 0.0
    val_mae = 0.0
    val_batches = 0
    with torch.no_grad():
        try:
            for mfcc, image, label in val_loader:
                mfcc, image, label = mfcc.to(device), image.to(device), label.to(device)
                output = model(mfcc, image)
                label = label.view(-1, 1).float()
                val_loss += criterion(output, label).item()
                val_mae += calculate_mae(output, label)
                val_batches += 1
        except Exception as e:
            print(f"\033[91mERR!\033[0m: {e}")
            if val_batches == 0:
                raise

    if val_batches == 0:
        raise RuntimeError("Validation loader produced no batches")
    return val_loss / val_batches, val_mae / val_batches


def _train_and_test(model_type):
    """Train ``model_type``, test its best checkpoint, return the metrics.

    Any failure propagates as an exception; ``train_model`` converts it into
    a placeholder result.
    """
    model, model_name = _build_model(model_type)
    model = model.to(device)

    # Data loading
    data_dir = "./processed/"
    dataset = PreprocessedDataset(data_dir)
    n_samples = len(dataset)
    if n_samples == 0:
        raise ValueError(f"No .pt samples found in {data_dir}")

    # Check label range on the first few samples
    sample_labels = [dataset[i][2] for i in range(min(10, n_samples))]
    print("\nLABEL RANGE CHECK:")
    print(f"Sample labels: {sample_labels}")
    print(f"Min label: {min(sample_labels)}, Max label: {max(sample_labels)}")

    train_size = int(0.7 * n_samples)
    val_size = int(0.2 * n_samples)
    test_size = n_samples - train_size - val_size
    if train_size == 0 or val_size == 0:
        # int() truncation leaves a split empty for very small datasets.
        raise ValueError(
            f"Dataset too small to split ({n_samples} samples): "
            f"train={train_size}, val={val_size}, test={test_size}"
        )

    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, val_size, test_size]
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Loss function and optimizer
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # TensorBoard
    writer = SummaryWriter(f"runs/{model_name}/")
    try:
        global_step = 0

        print(f"\033[92mINFO\033[0m: Training {model_name} model for {epochs} epochs")
        print(f"\033[92mINFO\033[0m: Training samples: {len(train_dataset)}")
        print(f"\033[92mINFO\033[0m: Validation samples: {len(val_dataset)}")
        print(f"\033[92mINFO\033[0m: Test samples: {len(test_dataset)}")
        print(f"\033[92mINFO\033[0m: Batch size: {batch_size}")
        print(f"\033[92mINFO\033[0m: Learning rate: {learning_rate}")
        print(f"\033[92mINFO\033[0m: Dropout rate: {dropout_rate}")

        best_val_loss = float('inf')
        best_model_path = None

        # Calculate model size
        model_size = sum(p.numel() for p in model.parameters()) / 1e6  # in millions
        print(f"\033[92mINFO\033[0m: Model parameters: {model_size:.2f}M")

        # Training loop
        for epoch in range(epochs):
            print(f"\033[92mINFO\033[0m: Training epoch ({epoch+1}/{epochs})")
            start_time = time.time()
            avg_train_loss, avg_train_mae, global_step = _train_one_epoch(
                model, train_loader, criterion, optimizer, writer, global_step
            )
            epoch_time = time.time() - start_time

            # Validation phase
            avg_val_loss, avg_val_mae = _validate(model, val_loader, criterion)

            # Record validation metrics
            writer.add_scalar("Validation/Loss", avg_val_loss, epoch)
            writer.add_scalar("Validation/MAE", avg_val_mae, epoch)

            print(
                f"Epoch [{epoch+1}/{epochs}], Time: {epoch_time:.2f}s, "
                f"Train Loss: {avg_train_loss:.4f}, Train MAE: {avg_train_mae:.4f}, "
                f"Val Loss: {avg_val_loss:.4f}, Val MAE: {avg_val_mae:.4f}"
            )

            # Save model checkpoint
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            model_path = f"models/{model_name}_model_{epoch+1}_{timestamp}.pt"
            torch.save(model.state_dict(), model_path)

            # Track the best checkpoint based on validation loss
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                best_model_path = model_path
                print(
                    f"\033[92mINFO\033[0m: New best model saved with validation loss: "
                    f"{best_val_loss:.4f}"
                )

            print(
                f"\033[92mINFO\033[0m: Model checkpoint epoch [{epoch+1}/{epochs}] "
                f"saved: {model_path}"
            )

        print(f"\033[92mINFO\033[0m: Training complete")

        if best_model_path is None:
            # Happens when epochs == 0 or every validation loss was NaN.
            raise RuntimeError(
                "No checkpoint was selected: validation loss never improved"
            )

        # Load the best model for testing
        print(f"\033[92mINFO\033[0m: Loading best model from {best_model_path} for testing")
        model.load_state_dict(torch.load(best_model_path))

        # Evaluate on test set
        test_loss, test_mae, predictions, labels = evaluate_model(
            model, test_loader, criterion
        )

        # Calculate additional metrics
        abs_errors = np.abs(predictions - labels)
        has_errors = abs_errors.size > 0
        max_error = np.max(abs_errors) if has_errors else float("nan")
        min_error = np.min(abs_errors) if has_errors else float("nan")

        print("\n" + "="*50)
        print(f"TEST RESULTS FOR {model_name}:")
        print(f"Test Loss (MSE): {test_loss:.4f}")
        print(f"Mean Absolute Error: {test_mae:.4f}")
        print(f"Maximum Absolute Error: {max_error:.4f}")
        print(f"Minimum Absolute Error: {min_error:.4f}")

        # Add test results to TensorBoard
        writer.add_scalar("Test/MSE", test_loss, 0)
        writer.add_scalar("Test/MAE", test_mae, 0)
        writer.add_scalar("Test/Max_Error", max_error, 0)
        writer.add_scalar("Test/Min_Error", min_error, 0)

        # Create a histogram of absolute errors
        if has_errors:
            writer.add_histogram("Test/Absolute_Errors", abs_errors, 0)
        print("="*50)

        # Final summary
        print("\nTRAINING SUMMARY:")
        print(f"Model: {model_name}")
        print(f"Model Size: {model_size:.2f}M parameters")
        print(f"Best Validation Loss: {best_val_loss:.4f}")
        print(f"Final Test Loss: {test_loss:.4f}")
        print(f"Final Test MAE: {test_mae:.4f}")
        print(f"Best model saved at: {best_model_path}")
    finally:
        # Flush and release the event file even when training fails.
        writer.close()

    # Return metrics for comparison
    return {
        "model_name": model_name,
        "model_size": model_size,
        "val_loss": best_val_loss,
        "test_loss": test_loss,
        "test_mae": test_mae,
        "model_path": best_model_path,
    }


def train_model(model_type):
    """Train and test one architecture, never raising.

    Returns:
        A metrics dict with keys ``model_name``, ``model_size``, ``val_loss``,
        ``test_loss``, ``test_mae`` and ``model_path``.  On failure the error
        is printed and a placeholder is returned with infinite losses,
        ``model_path`` set to ``None`` and an extra ``error`` message.
    """
    try:
        return _train_and_test(model_type)
    except Exception as e:
        print(f"\033[91mERR!\033[0m: Error training {model_type}: {e}")
        # Return a placeholder result
        return {
            "model_name": model_type,
            "model_size": 0,
            "val_loss": float('inf'),
            "test_loss": float('inf'),
            "test_mae": float('inf'),
            "model_path": None,
            "error": str(e),
        }


def test_cpu_inference(model_path, model_type):
    """Test CPU inference speed for the given model

    Loads the weights at ``model_path`` onto the CPU and times forward passes
    on random dummy inputs.

    Returns:
        Average seconds per forward pass over 100 timed runs.

    Raises:
        ValueError: if ``model_type`` is unknown.
    """
    model, model_name = _build_model(model_type)

    # Load model weights
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()

    # Create dummy input
    dummy_mfcc = torch.randn(1, 10, 376)  # Batch size 1, 10 time steps, 376 features
    dummy_image = torch.randn(1, 3, 224, 224)  # Batch size 1, 3 channels, 224x224 image

    num_runs = 100
    # Gradient tracking would add autograd bookkeeping to every forward pass
    # and distort an inference-only measurement.
    with torch.no_grad():
        # Warm-up
        for _ in range(10):
            _ = model(dummy_mfcc, dummy_image)

        # Measure inference time with a monotonic, high-resolution clock
        start_time = time.perf_counter()
        for _ in range(num_runs):
            _ = model(dummy_mfcc, dummy_image)
        end_time = time.perf_counter()

    avg_time = (end_time - start_time) / num_runs

    print(f"\n{model_name} CPU Inference Time:")
    print(f"Average over {num_runs} runs: {avg_time*1000:.2f} ms")

    return avg_time


def _train_and_benchmark(model_type):
    """Train ``model_type`` and attach ``inference_time`` to its result.

    When training failed there is no checkpoint to load, so the benchmark is
    skipped and ``inference_time`` is NaN.
    """
    result = train_model(model_type)
    if result["model_path"] is None:
        print(
            f"\033[91mERR!\033[0m: Skipping CPU inference test for {model_type}: "
            "no trained model available"
        )
        result["inference_time"] = float("nan")
    else:
        # Test CPU inference
        result["inference_time"] = test_cpu_inference(result["model_path"], model_type)
    return result


def main():
    """Parse the command line, train the selected model(s), print a comparison."""
    parser = argparse.ArgumentParser(description="Train and evaluate efficient models")
    parser.add_argument(
        "--model",
        type=str,
        choices=["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer", "all"],
        default="all",
        help="Model architecture to train",
    )
    args = parser.parse_args()

    results = []

    if args.model == "all":
        # Train all models
        for model_type in ["mobilenet_gru", "efficientnet_cnn", "squeezenet_transformer"]:
            print(f"\n\n{'='*50}")
            print(f"TRAINING {model_type.upper()}")
            print(f"{'='*50}\n")
            results.append(_train_and_benchmark(model_type))
    else:
        # Train specific model
        results.append(_train_and_benchmark(args.model))

    # Compare results
    print("\n\n" + "="*80)
    print("MODEL COMPARISON")
    print("="*80)
    print(
        f"{'Model':<25} {'Size (M)':<10} {'Val Loss':<10} {'Test Loss':<10} "
        f"{'Test MAE':<10} {'CPU Time (ms)':<15}"
    )
    print("-"*80)

    for result in results:
        print(
            f"{result['model_name']:<25} {result['model_size']:<10.2f} "
            f"{result['val_loss']:<10.4f} "
            f"{result['test_loss']:<10.4f} {result['test_mae']:<10.4f} "
            f"{result['inference_time']*1000:<15.2f}"
        )

    print("="*80)

    # Find best model among the runs that actually produced a checkpoint
    trained = [r for r in results if r["model_path"] is not None]
    if not trained:
        print("\n\033[91mERR!\033[0m: No model trained successfully")
        return

    best_model = min(trained, key=lambda x: x["test_mae"])
    print(f"\nBEST MODEL: {best_model['model_name']}")
    print(f"Test MAE: {best_model['test_mae']:.4f}")
    print(f"CPU Inference Time: {best_model['inference_time']*1000:.2f} ms")
    print(f"Model Path: {best_model['model_path']}")


if __name__ == "__main__":
    main()