"""Audio-task API route: chainsaw / environment classification from DSP features.

The module extracts hand-crafted librosa features from each test clip,
feeds the selected feature columns to a model loaded with joblib, and reports
accuracy together with the tracked energy/emissions figures.
"""

import os
import random
from datetime import datetime

import joblib
import librosa
import numpy as np
import pandas as pd
from datasets import Audio, load_dataset
from dotenv import load_dotenv
from fastapi import APIRouter
from sklearn.metrics import accuracy_score
from tqdm import tqdm

from .utils.emissions import clean_emissions_data, get_space_info, tracker
from .utils.evaluation import AudioEvaluationRequest

load_dotenv()

router = APIRouter()

# NOTE(review): this string is published as the route description and as
# "model_description" in the results, but the route below runs a saved model,
# not a random baseline. Left unchanged because it is runtime output.
DESCRIPTION = "Random Baseline"
ROUTE = "/audio"

# Sampling rate the test audio is resampled to before feature extraction.
TARGET_SAMPLE_RATE = 4000

# Serialized artifacts loaded during inference (paths are relative to the
# process working directory).
MODEL_PATH = 'tasks/assets/chainsaw_model.joblib'
SELECTED_FEATURES_PATH = 'tasks/assets/selected_features.joblib'


def is_valid_duration(example):
    """Return True when the decoded audio array of *example* is non-empty.

    Filter function to remove samples with decoding errors.
    To be used with datasets.filter().
    """
    return len(example["audio"]["array"]) > 0


def enhanced_dsp_pipeline(y, sr, n_fft=80, hop_length=40):
    """Extract summary statistics of spectral and MFCC features for one signal.

    Args:
        y: 1-D audio signal.
        sr: Sampling rate of ``y`` in Hz.
        n_fft: FFT window size used for the spectrograms and zero-crossing
            frames (the MFCCs use ``n_fft * 2``).
        hop_length: Hop between successive frames, in samples.

    Returns:
        dict mapping ``"<feature>_max"``, ``"<feature>_min"``,
        ``"<feature>_mean"`` and ``"<feature>_std"`` to scalars, one quadruple
        per extracted feature.

    The feature keys (including the ``roloff`` spelling) are column names the
    saved feature list is selected by, so they must not be renamed.

    NOTE(review): ``librosa.feature.delta`` is called with its default width;
    inputs that produce only a handful of frames may be rejected by it —
    confirm how very short clips should be handled.
    """
    features = {}

    # Peak-normalize: with norm=np.inf the maximum absolute amplitude becomes 1.
    y = librosa.util.normalize(y, norm=np.inf)

    # Apply pre-emphasis to enhance high frequencies.
    y_pre = librosa.effects.preemphasis(y, coef=0.97)

    # Magnitude spectrograms of the original and the pre-emphasized signal.
    S = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))
    S_pre = np.abs(librosa.stft(y_pre, n_fft=n_fft, hop_length=hop_length))

    # Core spectral features from the original signal.
    features['centroid'] = librosa.feature.spectral_centroid(S=S, sr=sr).ravel()
    features['roloff'] = librosa.feature.spectral_rolloff(
        S=S, sr=sr, roll_percent=0.85
    ).ravel()
    features['zcr'] = librosa.feature.zero_crossing_rate(
        y, frame_length=n_fft, hop_length=hop_length
    ).ravel()
    features['rmse'] = librosa.feature.rms(S=S, frame_length=n_fft).ravel()
    # onset_strength is called with librosa's default framing, not with
    # n_fft/hop_length; only its summary statistics are kept, so the
    # differing frame count does not matter.
    features['flux'] = librosa.onset.onset_strength(y=y, sr=sr).ravel()

    # Additional features from the pre-emphasized signal.
    features['pre_centroid'] = librosa.feature.spectral_centroid(
        S=S_pre, sr=sr
    ).ravel()
    features['pre_roloff'] = librosa.feature.spectral_rolloff(
        S=S_pre, sr=sr, roll_percent=0.85
    ).ravel()
    features['pre_contrast'] = librosa.feature.spectral_contrast(
        S=S_pre, sr=sr, n_bands=2
    ).ravel()

    # Spectral bandwidth with two different exponents. In librosa `p` is the
    # power applied to the deviation from the centroid, not a percentage
    # cutoff; the "_80"/"_90" key names are kept for compatibility.
    features['bandwidth_80'] = librosa.feature.spectral_bandwidth(
        S=S, sr=sr, p=0.8
    ).ravel()
    features['bandwidth_90'] = librosa.feature.spectral_bandwidth(
        S=S, sr=sr, p=0.9
    ).ravel()

    # MFCCs on the pre-emphasized signal, with more coefficients and a
    # restricted mel band.
    mfcc = librosa.feature.mfcc(
        y=y_pre,                # Use pre-emphasized signal
        sr=sr,
        n_fft=n_fft * 2,        # Increased frequency resolution
        hop_length=hop_length,
        n_mfcc=20,              # Increased from 13 to 20
        fmin=50,                # Lower edge of the targeted chainsaw band
        fmax=2000,              # Upper limit for chainsaw harmonics
        n_mels=40,              # Increased mel bands
    )

    # Deltas and double-deltas (acceleration coefficients).
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    # One time series per coefficient: static first, then delta, then
    # double-delta (same insertion order as the original three loops).
    coefficient_sets = (
        ('mfcc', mfcc),
        ('mfcc_delta', mfcc_delta),
        ('mfcc_delta2', mfcc_delta2),
    )
    for prefix, coefficients in coefficient_sets:
        for idx, series in enumerate(coefficients):
            features[f'{prefix}_{idx}'] = series.ravel()

    # Covariance between consecutive MFCC coefficients. These are scalars,
    # so their max/min/mean below all equal the value and their std is 0.
    for i in range(mfcc.shape[0] - 1):
        features[f'mfcc_cov_{i}_{i+1}'] = np.cov(mfcc[i], mfcc[i + 1])[0, 1]

    # Reduce every feature to four summary statistics.
    stats_dict = {}
    for name, values in features.items():
        stats_dict[f'{name}_max'] = np.max(values)
        stats_dict[f'{name}_min'] = np.min(values)
        stats_dict[f'{name}_mean'] = np.mean(values)
        stats_dict[f'{name}_std'] = np.std(values)

    return stats_dict


def segment_features(y, sr, segment_duration=0.5):
    """Extract features per fixed-length segment and aggregate across segments.

    The signal is cut into consecutive chunks of ``segment_duration`` seconds.
    Chunks shorter than half a segment are skipped. Each remaining chunk goes
    through ``enhanced_dsp_pipeline``; the result holds, for every statistic
    ``key``, its mean across segments under ``key`` and its variance under
    ``f"{key}_var"``.

    If no chunk qualifies (the signal is shorter than half a segment), the
    whole signal is treated as a single segment. The returned dict therefore
    always has the same keys — the variances are 0.0 in that case — instead of
    missing the ``_var`` entries, which would surface as NaN columns once the
    rows are assembled into a DataFrame.

    Args:
        y: 1-D audio signal.
        sr: Sampling rate of ``y`` in Hz.
        segment_duration: Segment length in seconds.

    Returns:
        dict of aggregated feature values.

    Raises:
        ValueError: if ``segment_duration * sr`` truncates to a non-positive
            number of samples.
    """
    segment_length = int(segment_duration * sr)
    if segment_length <= 0:
        raise ValueError(
            "segment_duration * sr must be at least one sample, "
            f"got {segment_length}"
        )

    min_length = segment_length // 2
    all_features = []
    for start in range(0, len(y), segment_length):
        segment = y[start:start + segment_length]
        if len(segment) >= min_length:
            all_features.append(enhanced_dsp_pipeline(segment, sr))

    if not all_features:
        # Too short for any segment: use the full signal as the only segment
        # so the aggregation below still emits the "<key>_var" entries.
        all_features = [enhanced_dsp_pipeline(y, sr)]

    # Aggregate features across segments.
    aggregated_features = {}
    for key in all_features[0]:
        values = [segment_stats[key] for segment_stats in all_features]
        aggregated_features[key] = np.mean(values)
        aggregated_features[f"{key}_var"] = np.var(values)

    return aggregated_features


def process_dataset(dataset, sr=TARGET_SAMPLE_RATE):
    """Build the feature table and label vector for *dataset*.

    Args:
        dataset: Iterable of examples exposing ``example["audio"]["array"]``
            and ``example["label"]``.
        sr: Sampling rate passed to the feature extractor. The audio is not
            resampled here, so this must match the rate the arrays were
            decoded at.

    Returns:
        Tuple ``(X, y)``: a DataFrame with one row of aggregated features per
        example, and a NumPy array of the corresponding labels.
    """
    rows = []
    labels = []

    for example in tqdm(dataset):
        rows.append(segment_features(example["audio"]["array"], sr=sr))
        labels.append(example["label"])

    return pd.DataFrame(rows), np.array(labels)


def evaluate_model(model, X_test, selected_features):
    """Predict with *model* using only the *selected_features* columns.

    Args:
        model: Object exposing ``predict``.
        X_test: DataFrame of extracted features.
        selected_features: Column labels to keep, in the order passed to the
            model.

    Returns:
        Whatever ``model.predict`` returns for the selected columns.
    """
    return model.predict(X_test[selected_features])


@router.post(ROUTE, tags=["Audio Task"], description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    """Evaluate audio classification for rainforest sound detection.

    Loads the dataset named in the request, extracts DSP features from its
    ``test`` split, predicts with the model stored in ``MODEL_PATH`` and
    returns accuracy plus the emissions tracked for the inference task.
    In the original label mapping, ``chainsaw`` is 0 and ``environment`` is 1.

    Args:
        request: Evaluation request. ``dataset_name`` selects the dataset;
            ``test_size`` and ``test_seed`` are only echoed back in the
            results.

    Returns:
        dict with submission metadata, accuracy, energy/emissions figures and
        the dataset configuration.
    """
    # Get space info.
    username, space_url = get_space_info()

    # Load the dataset. Because the dataset is gated, the HF_TOKEN environment
    # variable is used to authenticate.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["test"]

    # Start tracking emissions.
    tracker.start()
    tracker.start_task("inference")

    # ------------------------------------------------------------------------
    # MODEL INFERENCE STARTS HERE
    # Everything up to the matching marker below runs inside the tracked
    # "inference" task, so its energy consumption and emissions are measured.
    # ------------------------------------------------------------------------

    # Drop undecodable samples, then resample to the rate the features expect.
    test_dataset = test_dataset.filter(is_valid_duration)
    test_dataset = test_dataset.cast_column(
        "audio", Audio(sampling_rate=TARGET_SAMPLE_RATE)
    )

    X_test, true_labels = process_dataset(test_dataset)

    # joblib files are pickle-based: only load artifacts shipped with the app.
    model = joblib.load(MODEL_PATH)
    selected_features = joblib.load(SELECTED_FEATURES_PATH)

    predictions = evaluate_model(model, X_test, selected_features)

    # ------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    # ------------------------------------------------------------------------

    # Stop tracking emissions.
    emissions_data = tracker.stop_task()

    # Calculate accuracy.
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary.
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return results