import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import shap
from sklearn.feature_selection import chi2, SelectKBest, mutual_info_classif, f_classif
from sklearn.metrics import accuracy_score, log_loss, confusion_matrix, f1_score, fbeta_score, roc_auc_score
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, classification_report, precision_recall_curve
# matthews_corrcoef / cohen_kappa_score are needed by score_pipeline (previously missing imports)
from sklearn.metrics import matthews_corrcoef, cohen_kappa_score
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold, cross_val_score, RandomizedSearchCV, StratifiedKFold
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, FunctionTransformer, SplineTransformer, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import HistGradientBoostingClassifier, GradientBoostingClassifier
# from lightgbm import LGBMClassifier
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils.validation import check_is_fitted
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer
from lib.transform_data import *


class ClassifierStudy():
    """
    A class that contains tools for studying a classifier pipeline.

    Parameters:
    -----------
    classifier : a scikit-learn compatible binary classifier
    X : pd.DataFrame
        dataframe of features
    y : pd.Series
        series of binary target values corresponding to X
    classifier_name : str or None
        if provided, will use as classifier name in pipeline
        if not, will use 'clf' as name
    features : dict
        a dictionary whose keys are the feature types
        'cyc', 'cat', 'ord', 'num', 'bin' and whose values are
        lists of features of each type.

    Methods:
    --------
    set_data, set_features, set_state
        sets or resets attributes of self
    build_pipeline
        builds out pipeline based on supplied specs
    cv_score
        runs k-fold cross validation and reports scores
    randomized_search
        runs randomized search with cross validation and reports results
    fit_pipeline
        fits the model pipeline and stores as self.pipe_fitted
    predict_proba_pipeline
        uses a fitted pipeline to compute predicted probabilities
        for test or validation set
    score_pipeline
        scores predicted probabilities
    """

    def __init__(self, classifier=None, X=None, y=None, features=None, classifier_name=None, random_state=42):
        self.classifier = classifier
        # Always initialize these attributes (to None if absent) so later
        # `assert self.X is not None` checks fail with the intended message
        # rather than an AttributeError.
        self.X = X.copy() if X is not None else None
        self.y = y.copy() if y is not None else None
        self.features = features.copy() if features is not None else None
        self.random_state = random_state
        self.pipe, self.pipe_fitted = None, None
        self.classifier_name = classifier_name
        self.X_val, self.y_val = None, None
        self.y_predict_proba = None
        self.best_params, self.best_n_components = None, None
        self.shap_vals = None

    def set_data(self, X=None, y=None):
        """Method to set or reset feature and/or target data"""
        if X is not None:
            self.X = X.copy()
        if y is not None:
            self.y = y.copy()

    def set_features(self, features):
        """Method to set or reset the feature dictionary"""
        if features is not None:
            self.features = features.copy()

    def set_state(self, random_state):
        """Method to set or reset the random_state"""
        self.random_state = random_state

    def build_pipeline(self, cat_method='onehot', cyc_method='spline', num_ss=True,
                       over_sample=False, pca=False, n_components=None,
                       select_features=False, score_func=None, k='all',
                       poly_features=False, degree=2, interaction_only=False):
        """
        Method to build the model pipeline

        Parameters:
        -----------
        cat_method : str
            specifies whether to encode categorical variables as one-hot
            vectors or ordinals; must be either 'onehot' or 'ord'
        cyc_method : str
            specifies whether to encode cyclical features with sine/cosine
            encoding or periodic splines; must be one of 'trig', 'spline',
            'interact-trig', 'interact-spline', 'onehot', 'ord', or None
            - If 'trig' or 'spline', will set up periodic encoder with
              desired method
            - If 'onehot' or 'ord', will set up appropriate categorical encoder
            - If 'interact-{method}', will use encoding for HOUR_OF_DAY,
              encode DAY_OF_WEEK as a binary feature expressing whether the
              day is a weekend day, and then include interaction features
              among this set via PolynomialFeatures.
            - If None, will leave out cyclical features altogether
        num_ss : bool
            Whether or not to apply StandardScaler on the numerical features
        over_sample : bool
            set to True to include imblearn.over_sampling.RandomOverSampler step
        pca : bool
            set to True to include sklearn.decomposition.PCA step
        n_components : int or None
            number of components for sklearn.decomposition.PCA
        select_features : bool
            set to True to include sklearn.feature_selection.SelectKBest step
        score_func : callable
            score function to use for sklearn.feature_selection.SelectKBest
            recommended: chi2, f_classif, or mutual_info_classif
        k : int or 'all'
            number of features for sklearn.feature_selection.SelectKBest
        poly_features : bool
            set to True to include sklearn.preprocessing.PolynomialFeatures step
        degree : int
            max degree for sklearn.preprocessing.PolynomialFeatures
        interaction_only : bool
            whether or not sklearn.preprocessing.PolynomialFeatures will be
            limited to interaction terms only
        """
        # Define transformer for categorical features
        if cat_method == 'onehot':
            cat_encoder = ('ohe', OneHotEncoder(handle_unknown='infrequent_if_exist'))
        elif cat_method == 'ord':
            cat_encoder = ('oe', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan))
        else:
            raise ValueError("cat_method must be either 'onehot' or 'ord'")
        cat_transform = Pipeline([('si', SimpleImputer(strategy='most_frequent')), cat_encoder])

        # Define transformer for cyclic features; values are the periods
        cyc_dict = {'HOUR_OF_DAY': 24, 'DAY_OF_WEEK': 7}
        if cyc_method == 'trig':
            cyc_transform = [(f'{feat}_cos', cos_transformer(cyc_dict[feat]), [feat])
                             for feat in self.features['cyc']] + \
                            [(f'{feat}_sin', sin_transformer(cyc_dict[feat]), [feat])
                             for feat in self.features['cyc']]
        elif cyc_method == 'spline':
            cyc_transform = [(f'{feat}_cyclic',
                              periodic_spline_transformer(cyc_dict[feat], n_splines=cyc_dict[feat] // 2),
                              [feat])
                             for feat in self.features['cyc']]
        elif cyc_method == 'onehot':
            cyc_encoder = ('ohe_cyc', OneHotEncoder(handle_unknown='infrequent_if_exist'))
            cyc_transform = [('cyc', Pipeline([cyc_encoder]), self.features['cyc'])]
        elif cyc_method == 'ord':
            cyc_encoder = ('oe_cyc', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan))
            cyc_transform = [('cyc', Pipeline([cyc_encoder]), self.features['cyc'])]
        elif cyc_method == 'interact-spline':
            hour_transform = (f'hour_cyc',
                              periodic_spline_transformer(cyc_dict['HOUR_OF_DAY'], n_splines=12),
                              ['HOUR_OF_DAY'])
            # DAY_OF_WEEK in {1, 7} marks a weekend day
            wkend_transform = ('wkend', FunctionTransformer(lambda x: (x.isin([1, 7])).astype(int)), ['DAY_OF_WEEK'])
            cyc_transform = [('cyc',
                              Pipeline([('cyc_col',
                                         ColumnTransformer([hour_transform, wkend_transform],
                                                           remainder='drop',
                                                           verbose_feature_names_out=False)),
                                        ('cyc_poly',
                                         PolynomialFeatures(degree=2, interaction_only=True,
                                                            include_bias=False))]),
                              self.features['cyc'])]
        elif cyc_method == 'interact-trig':
            hour_transform = [(f'HOUR_cos', cos_transformer(cyc_dict['HOUR_OF_DAY']), ['HOUR_OF_DAY']),
                              (f'HOUR_sin', sin_transformer(cyc_dict['HOUR_OF_DAY']), ['HOUR_OF_DAY'])]
            wkend_transform = ('wkend', FunctionTransformer(lambda x: (x.isin([1, 7])).astype(int)), ['DAY_OF_WEEK'])
            cyc_transform = [('cyc',
                              Pipeline([('cyc_col',
                                         ColumnTransformer(hour_transform + [wkend_transform],
                                                           remainder='drop',
                                                           verbose_feature_names_out=False)),
                                        ('cyc_poly',
                                         PolynomialFeatures(degree=2, interaction_only=True,
                                                            include_bias=False))]),
                              self.features['cyc'])]
        elif cyc_method is None:
            cyc_transform = [('cyc', 'passthrough', [])]
        else:
            # Error message corrected: 'interact' alone is not a valid option
            raise ValueError("cyc_method must be one of 'trig','spline','interact-trig','interact-spline','onehot','ord',or None")

        # Define numerical transform
        num_transform = ('num', StandardScaler(), self.features['num']) if num_ss else \
                        ('num', 'passthrough', self.features['num'])

        # Define column transformer over all feature groups
        col_transform = ColumnTransformer([('cat', cat_transform, self.features['cat']),
                                           ('ord', 'passthrough', self.features['ord']),
                                           num_transform,
                                           ('bin', SimpleImputer(strategy='most_frequent'),
                                            self.features['bin'])] + cyc_transform,
                                          remainder='drop', verbose_feature_names_out=False)
        steps = [('col', col_transform)]

        # Group-based imputers must run before the column transformer
        if 'AGE' in self.features['num']:
            steps.insert(0, ('gi_age', GroupImputer(target='AGE', group_cols=['COUNTY'], strategy='median')))
        if 'HOUR_OF_DAY' in self.features['cyc']:
            steps.insert(0, ('gi_hour', GroupImputer(target='HOUR_OF_DAY',
                                                     group_cols=['ILLUMINATION', 'CRASH_MONTH'],
                                                     strategy='mode')))

        # Insert optional steps as needed
        if over_sample:
            # Local import: imblearn is only required when over-sampling is requested
            # (previously RandomOverSampler was referenced without any import).
            from imblearn.over_sampling import RandomOverSampler
            steps.insert(0, ('os', RandomOverSampler(random_state=self.random_state)))
        if poly_features:
            steps.append(('pf', PolynomialFeatures(degree=degree, interaction_only=interaction_only)))
        if select_features:
            steps.append(('fs', SelectKBest(score_func=score_func, k=k)))
        if pca:
            steps.append(('pca', PCA(n_components=n_components, random_state=self.random_state)))

        # Append classifier if provided
        if self.classifier is not None:
            if self.classifier_name is not None:
                steps.append((f'{self.classifier_name}_clf', self.classifier))
            else:
                steps.append(('clf', self.classifier))

        # Initialize pipeline
        self.pipe = Pipeline(steps)

    def cv_score(self, scoring='roc_auc', n_splits=5, n_repeats=3, thresh=0.5, beta=1,
                 return_mean_score=False, print_mean_score=True, print_scores=False,
                 n_jobs=-1, eval_size=0.1, eval_metric='auc'):
        """
        Method for performing cross validation via RepeatedStratifiedKFold

        Parameters:
        -----------
        scoring : str
            scoring function to use.
            must be one of 'roc_auc','acc','f1','fb','f1w'
        thresh : float
            the classification threshold for computing y_pred from y_pred_proba
        beta : float
            the beta-value to use in the f_beta score, if chosen
        n_splits, n_repeats : int, int
            number of splits and number of repeat iterations for
            sklearn.model_selection.RepeatedStratifiedKFold
        return_mean_score : bool
            whether or not to return the mean score
        print_mean_score : bool
            whether to print out a report of the mean score
        print_scores : bool
            whether to print out a report of CV scores for all folds
        n_jobs : int or None
            number of CPU cores to use for parallel processing
            -1 uses all available cores, and None defaults to 1
            NOTE(review): currently unused in the fold loop; kept for
            interface compatibility
        eval_size : float
            Fraction of the training set to use for early stopping eval set
            (only relevant when an early-stopping classifier is re-enabled)
        eval_metric : str
            eval metric to use in early stopping

        Returns: None or mean_score, depending on return_mean_score setting
        --------
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert (self.X is not None) & (self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.'
        assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
        assert scoring in ['roc_auc', 'acc', 'f1', 'fb', 'f1w'], "scoring must be one of 'roc_auc','acc','f1','fb','f1w'"

        # Initialize CV iterator
        kf = RepeatedStratifiedKFold(n_splits=n_splits, n_repeats=n_repeats,
                                     random_state=self.random_state)
        # Restrict to features supplied in self.features
        X = self.X[[feat for feat_type in self.features for feat in self.features[feat_type]]]

        # NOTE(review): LightGBM early-stopping support was disabled (imports
        # commented out upstream); each fold is a plain clone-and-fit.
        scores = []
        # Iterate over folds and train, predict, score
        for train_idx, test_idx in kf.split(X, self.y):
            fold_X_train = X.iloc[train_idx, :]
            fold_X_test = X.iloc[test_idx, :]
            fold_y_train = self.y.iloc[train_idx]
            fold_y_test = self.y.iloc[test_idx]
            pipe = clone(self.pipe)
            pipe.fit(fold_X_train, fold_y_train)
            fold_y_pred_proba = pipe.predict_proba(fold_X_test)[:, 1]
            if scoring == 'roc_auc':
                fold_score = roc_auc_score(fold_y_test, fold_y_pred_proba)
            else:
                # Threshold probabilities for the label-based metrics
                fold_y_pred = (fold_y_pred_proba >= thresh).astype('int')
                if scoring == 'acc':
                    fold_score = accuracy_score(fold_y_test, fold_y_pred)
                elif scoring == 'f1':
                    fold_score = f1_score(fold_y_test, fold_y_pred)
                elif scoring == 'f1w':
                    fold_score = f1_score(fold_y_test, fold_y_pred, average='weighted')
                else:
                    fold_score = fbeta_score(fold_y_test, fold_y_pred, beta=beta)
            scores.append(fold_score)

        # Average and report
        mean_score = np.mean(scores)
        if print_scores:
            print(f'CV scores using {scoring} score: {scores} \nMean score: {mean_score}')
        if print_mean_score:
            print(f'Mean CV {scoring} score: {mean_score}')
        if return_mean_score:
            return mean_score

    def randomized_search(self, params, n_components=None, n_iter=10, scoring='roc_auc',
                          cv=5, refit=False, top_n=10, n_jobs=-1):
        """
        Method for performing randomized search with cross validation on a
        given dictionary of parameter distributions.
        Also displays a table of results the best top_n iterations

        Parameters:
        ----------
        params : dict
            parameter distributions to use for RandomizedSearchCV
        n_components : int, or list, or None
            number of components for sklearn.decomposition.PCA
            - if int, will reset the PCA layer in self.pipe with provided value
            - if list, must be list of ints, which will be included in
              RandomizedSearchCV parameter distribution
        scoring : str
            scoring function for sklearn.model_selection.cross_val_score
        n_iter : int
            number of iterations to use in RandomizedSearchCV
        refit : bool
            whether to refit a final classifier with best parameters
            - if False, will only set self.best_params and self.best_score
            - if True, will set self.best_estimator in addition
        top_n : int or None
            if int, will display results from top_n best iterations only
            if None, will display all results
        n_jobs : int or None
            number of CPU cores to use for parallel processing
            -1 uses all available cores, and None defaults to 1
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert (self.X is not None) & (self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.'
        assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
        assert (n_components is None) | ('pca' in self.pipe.named_steps), 'Your pipeline has no PCA step. Build a pipeline with PCA first.'
        assert (len(params) > 0) | (type(n_components) == list), 'Either pass a parameter distribution or a list of n_components values.'

        # display() is a builtin only inside IPython; fall back to print in scripts
        try:
            from IPython.display import display
        except ImportError:
            display = print

        # Add estimator name prefix to hyperparams
        params = {self.pipe.steps[-1][0] + '__' + key: params[key] for key in params}
        # Process supplied n_components
        if type(n_components) == list:
            params['pca__n_components'] = n_components
        elif type(n_components) == int:
            self.pipe['pca'].set_params(n_components=n_components)

        # Restrict to features supplied in self.features
        X = self.X[[feat for feat_type in self.features for feat in self.features[feat_type]]]

        # Initialize rs and fit
        rs = RandomizedSearchCV(self.pipe, param_distributions=params, n_iter=n_iter,
                                scoring=scoring, cv=cv, refit=refit,
                                random_state=self.random_state, n_jobs=n_jobs)
        rs.fit(X, self.y)

        # Display top n scores
        results = rs.cv_results_
        results_df = pd.DataFrame(results['params'])
        param_names = list(results_df.columns)
        results_df[f'mean cv score ({scoring})'] = pd.Series(results['mean_test_score'])
        results_df = results_df.set_index(param_names).sort_values(by=f'mean cv score ({scoring})',
                                                                   ascending=False)
        if top_n is not None:
            display(results_df.head(top_n).style
                    .highlight_max(axis=0,
                                   props='color:white; font-weight:bold; background-color:seagreen;'))
        else:
            display(results_df.style
                    .highlight_max(axis=0,
                                   props='color:white; font-weight:bold; background-color:seagreen;'))
        if refit:
            self.best_estimator = rs.best_estimator_
            best_params = rs.best_params_
            # Split hyperparams back into classifier params vs. PCA n_components
            self.best_params = {key.split('__')[-1]: best_params[key]
                                for key in best_params if key.split('__')[0] != 'pca'}
            self.best_n_components = next((best_params[key] for key in best_params
                                           if key.split('__')[0] == 'pca'), None)
            self.best_score = rs.best_score_

    def fit_pipeline(self, split_first=False, eval_size=0.1, eval_metric='auc'):
        """
        Method for fitting self.pipeline on self.X, self.y

        Parameters:
        -----------
        split_first : bool
            if True, a train_test_split will be performed first and the
            validation set will be stored
        eval_size : float
            Fraction of the training set to use for early stopping eval set
            (only relevant when an early-stopping classifier is re-enabled)
        eval_metric : str
            eval metric to use in early stopping
        """
        # Need pipe and X to fit
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert self.X is not None, 'X does not exist. First set X.'
        # If no y provided, then no pipeline steps should require y
        step_list = [step[0] for step in self.pipe.steps]
        assert (('clf' not in step_list[-1]) & ('kf' not in step_list)) | (self.y is not None), 'You must provide targets y if pipeline has a classifier step or feature selection step.'
        # Don't need to do a train-test split without a classifier
        assert (split_first == False) | ('clf' in step_list[-1]), 'Only need train-test split if you have a classifier.'

        if split_first:
            X_train, X_val, y_train, y_val = train_test_split(self.X, self.y, stratify=self.y,
                                                              test_size=0.2,
                                                              random_state=self.random_state)
            self.X_val = X_val
            self.y_val = y_val
        else:
            X_train = self.X.copy()
            if self.y is not None:
                y_train = self.y.copy()

        # Restrict to features supplied in self.features
        X_train = X_train[[feat for feat_type in self.features for feat in self.features[feat_type]]]

        # NOTE(review): LightGBM early-stopping fit_params support was disabled
        # (imports commented out upstream); plain fit is used.
        fit_params = {}

        # Fit and store fitted pipeline.
        # If no classifier, fit_transform X_train and store transformed version
        pipe = self.pipe
        if 'clf' in step_list[-1]:
            pipe.fit(X_train, y_train, **fit_params)
        else:
            X_transformed = pipe.fit_transform(X_train)
            self.X_transformed = X_transformed
        self.pipe_fitted = pipe

    def predict_proba_pipeline(self, X_test=None):
        """
        Method for using a fitted pipeline to compute predicted probabilities
        for X_test (if supplied) or self.X_val

        Parameters:
        -----------
        X_test : pd.DataFrame or None
            test data input features (if None, will use self.X_val)
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
        assert self.pipe_fitted is not None, 'Pipeline is not fitted. First fit pipeline using fit_pipeline.'
        assert (X_test is not None) | (self.X_val is not None), 'Must either provide X_test and y_test or fit the pipeline with split_first=True.'

        if X_test is None:
            X_test = self.X_val
        # Restrict to features supplied in self.features
        X_test = X_test[[feat for feat_type in self.features for feat in self.features[feat_type]]]
        # Save prediction (probability of the positive class)
        self.y_predict_proba = self.pipe_fitted.predict_proba(X_test)[:, 1]

    def score_pipeline(self, y_test=None, scoring='roc_auc', thresh=0.5, beta=1,
                       normalize=None, print_score=True):
        """
        Method for scoring self.pipe_fitted on supplied test data and
        reporting score

        Parameters:
        -----------
        y_test : pd.Series or None
            true binary targets (if None, will use self.y_val)
        scoring : str
            specifies the metric to use for scoring
            must be one of 'roc_auc', 'roc_plot', 'acc', 'f1', 'f1w',
            'fb','mcc','kappa','conf','classif_report'
        thresh : float
            threshhold value for computing y_pred from y_predict_proba
        beta : float
            the beta parameter in the fb score
        normalize : str or None
            the normalize parameter for the confusion_matrix.
            must be one of 'true','pred','all',None
        print_score : bool
            if True, will print a message reporting the score
            if False, will return the score as a float
        """
        assert (y_test is not None) | (self.y_val is not None), 'Must either provide X_test and y_test or fit the pipeline with split_first=True.'
        assert self.y_predict_proba is not None, 'Predicted probabilities do not exist. Run predict_proba_pipeline first.'

        if y_test is None:
            y_test = self.y_val

        # Score and report
        if scoring == 'roc_plot':
            fig = plt.figure(figsize=(4, 4))
            ax = fig.add_subplot(111)
            RocCurveDisplay.from_predictions(y_test, self.y_predict_proba, ax=ax)
            plt.show()
        elif scoring == 'roc_auc':
            score = roc_auc_score(y_test, self.y_predict_proba)
        else:
            y_pred = (self.y_predict_proba >= thresh).astype('int')
            if scoring == 'acc':
                score = accuracy_score(y_test, y_pred)
            elif scoring == 'f1':
                score = f1_score(y_test, y_pred)
            elif scoring == 'f1w':
                score = f1_score(y_test, y_pred, average='weighted')
            elif scoring == 'fb':
                score = fbeta_score(y_test, y_pred, beta=beta)
            elif scoring == 'mcc':
                # Fixed: was a typo'd, undefined name (matthews_coffcoeff)
                score = matthews_corrcoef(y_test, y_pred)
            elif scoring == 'kappa':
                score = cohen_kappa_score(y_test, y_pred)
            elif scoring == 'conf':
                fig = plt.figure(figsize=(3, 3))
                ax = fig.add_subplot(111)
                ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax, colorbar=False)
                plt.show()
            elif scoring == 'classif_report':
                target_names = ['neither seriously injured nor killed', 'seriously injured or killed']
                print(classification_report(y_test, y_pred, target_names=target_names))
            else:
                raise ValueError("scoring must be one of 'roc_auc', 'roc_plot','acc', 'f1', 'f1w', 'fb','mcc','kappa','conf','classif_report'")
        if scoring not in ['conf', 'roc_plot', 'classif_report']:
            if print_score:
                print(f'The {scoring} score is: {score}')
            else:
                return score

    def shap_values(self, X_test=None, eval_size=0.1, eval_metric='auc'):
        """
        Method for computing SHAP values for features via a stratified
        train/test split.
        A copy of self.pipe is fitted on the training set and then SHAP
        values are computed on test set samples

        Parameters:
        -----------
        X_test : pd.DataFrame
            The test set; if provided, will not perform a train/test split
            before fitting
        eval_size : float
            Fraction of the training set to use for early stopping eval set
            (only relevant when an early-stopping classifier is re-enabled)
        eval_metric : str
            eval metric to use in early stopping

        Returns: None (stores results in self.shap_vals)
        --------
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert (self.X is not None) & (self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.'
        assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'

        # Clone pipeline, do train/test split if X_test not provided
        pipe = clone(self.pipe)
        X_train = self.X.copy()
        y_train = self.y.copy()
        if X_test is None:
            X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, stratify=y_train,
                                                                test_size=0.2,
                                                                random_state=self.random_state)
        # Restrict to features provided in self.features, and fit
        X_train = X_train[[feat for feat_type in self.features for feat in self.features[feat_type]]]
        X_test = X_test[[feat for feat_type in self.features for feat in self.features[feat_type]]]

        # NOTE(review): LightGBM early-stopping fit_params support was disabled
        # (imports commented out upstream); plain fit is used.
        fit_params = {}
        pipe.fit(X_train, y_train, **fit_params)

        # SHAP will just explain the classifier, so need transformed X_train and X_test
        X_train_trans, X_test_trans = pipe[:-1].transform(X_train), pipe[:-1].transform(X_test)
        # Need masker for linear model
        masker = shap.maskers.Independent(data=X_train_trans)
        # Initialize explainer and compute and store SHAP values as an explainer object
        explainer = shap.Explainer(pipe[-1], masker=masker,
                                   feature_names=pipe['col'].get_feature_names_out())
        self.shap_vals = explainer(X_test_trans)
        self.X_shap = X_train_trans
        self.y_shap = y_train

    def shap_plot(self, max_display='all'):
        """
        Method for generating plots of SHAP value results
        SHAP values should be already computed previously
        Generates two plots side by side:
        - a beeswarm plot of SHAP values of all samples
        - a barplot of mean absolute SHAP values

        Parameters:
        -----------
        max_display : int or 'all'
            The number of features to show in the plot, in descending order
            by mean absolute SHAP value. If 'all', then all features will
            be included.

        Returns: None (plots displayed)
        --------
        """
        assert self.shap_vals is not None, 'No shap values exist. First compute shap values.'
        assert (isinstance(max_display, int)) | (max_display == 'all'), "'max_display' must be 'all' or an integer"

        if max_display == 'all':
            title_add = ', all features'
            max_display = self.shap_vals.shape[1]
        else:
            title_add = f', top {max_display} features'

        # Plot
        fig = plt.figure()
        ax1 = fig.add_subplot(121)
        shap.summary_plot(self.shap_vals, plot_type='bar', max_display=max_display,
                          show=False, plot_size=0.2)
        ax2 = fig.add_subplot(122)
        shap.summary_plot(self.shap_vals, plot_type='violin', max_display=max_display,
                          show=False, plot_size=0.2)
        fig.set_size_inches(12, max_display / 3)
        ax1.set_title(f'Mean absolute SHAP values' + title_add, fontsize='small')
        ax1.set_xlabel('mean(|SHAP value|)', fontsize='x-small')
        ax2.set_title(f'SHAP values' + title_add, fontsize='small')
        ax2.set_xlabel('SHAP value', fontsize='x-small')
        for ax in [ax1, ax2]:
            ax.set_ylabel('feature name', fontsize='x-small')
            ax.tick_params(axis='y', labelsize='xx-small')
        plt.tight_layout()
        plt.show()

    def find_best_threshold(self, beta=1, conf=True, report=True, print_result=True):
        """
        Computes the classification threshold which gives the best F_beta
        score from classifier predictions, prints the best threshold and the
        corresponding F_beta score, and displays a confusion matrix and
        classification report corresponding to that threshold

        Parameters:
        -----------
        beta : float
            the desired beta value in the F_beta score
        conf : bool
            whether to display confusion matrix
        report : bool
            whether to display classification report
        print_result : bool
            whether to print a line reporting the best threshold and
            resulting F_beta score

        Returns: None (prints results and stores self.best_thresh)
        --------
        """
        prec, rec, threshs = precision_recall_curve(self.y_val, self.y_predict_proba)
        F_betas = (1 + beta**2) * (prec * rec) / ((beta**2 * prec) + rec)
        # Above formula is valid when TP!=0. When TP==0
        # it gives np.nan whereas F_beta should be 0
        F_betas = np.nan_to_num(F_betas)
        idx = np.argmax(F_betas)
        best_thresh = threshs[idx]
        if print_result:
            print(f'Threshold optimizing F_{beta} score: {best_thresh}\nBest F_{beta} score: {F_betas[idx]}')
        if conf:
            self.score_pipeline(scoring='conf', thresh=best_thresh, beta=beta)
        if report:
            self.score_pipeline(scoring='classif_report', thresh=best_thresh, beta=beta)
        self.best_thresh = best_thresh


class LRStudy(ClassifierStudy):
    """
    A child class of ClassifierStudy which has an additional method specific
    to logistic regression
    """

    def __init__(self, classifier=None, X=None, y=None, features=None,
                 classifier_name='LR', random_state=42):
        super().__init__(classifier, X, y, features, classifier_name, random_state)

    def plot_coeff(self, print_score=True, print_zero=False, title_add=None):
        """
        Method for doing a train/validation split, fitting the classifier,
        predicting and scoring on the validation set, and plotting a bar
        chart of the logistic regression coefficients corresponding to
        various model features. Features with coefficient zero and periodic
        spline features will be excluded from the chart.

        Parameters:
        -----------
        print_score : bool
            if True, the validation score are printed
        print_zero : bool
            if True, the list of features with zero coefficients are printed
        title_add : str or None
            an addendum that is added to the end of the plot title
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert isinstance(self.classifier, LogisticRegression), 'Your classifier is not an instance of Logistic Regression.'

        # fit and score
        self.fit_pipeline(split_first=True)
        self.predict_proba_pipeline()
        score = roc_auc_score(self.y_val, self.y_predict_proba)

        # Retrieve coeff values from fitted pipeline
        coeff = pd.DataFrame({'feature name': self.pipe_fitted['col'].get_feature_names_out(),
                              'coeff value': self.pipe_fitted[-1].coef_.reshape(-1)})\
            .sort_values(by='coeff value')
        # Drop the periodic spline features; their individual coefficients
        # are not interpretable on their own
        coeff = coeff[~coeff['feature name']
                      .isin([f'HOUR_OF_DAY_sp_{n}' for n in range(12)]
                            + [f'DAY_OF_WEEK_sp_{n}' for n in range(3)])]\
            .set_index('feature name')
        coeff_zero_features = coeff[coeff['coeff value'] == 0].index
        coeff = coeff[coeff['coeff value'] != 0]

        # Plot feature coefficients
        fig = plt.figure(figsize=(30, 4))
        ax = fig.add_subplot(111)
        coeff['coeff value'].plot(kind='bar', ylabel='coeff value', ax=ax)
        ax.axhline(y=0, color='red', linewidth=2,)
        plot_title = 'PA bicycle collisions, 2002-2021\nLogistic regression model log-odds coefficients'
        if title_add is not None:
            plot_title += f': {title_add}'
        ax.set_title(plot_title)
        ax.tick_params(axis='x', labelsize='x-small')
        plt.show()
        if print_score:
            print(f'Score on validation set: {score}')
        if print_zero:
            # Fixed: previously referenced undefined name `coeff_zero`
            print(f'Features with zero coefficients in trained model: {list(coeff_zero_features)}')
        self.score = score
        self.coeff = coeff
        self.coeff_zero_features = coeff_zero_features