BikeSaferPA / lib /study_classif.py
etweedy's picture
Upload 22 files
5d396e9
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import shap
from sklearn.feature_selection import chi2, SelectKBest, mutual_info_classif, f_classif
from sklearn.metrics import accuracy_score, log_loss, confusion_matrix, f1_score, fbeta_score, roc_auc_score
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, classification_report, precision_recall_curve
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold, cross_val_score, RandomizedSearchCV, StratifiedKFold
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, FunctionTransformer, SplineTransformer, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import HistGradientBoostingClassifier, GradientBoostingClassifier
# from lightgbm import LGBMClassifier
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils.validation import check_is_fitted
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer
from lib.transform_data import *
class ClassifierStudy():
    """
    A class that contains tools for studying a classifier pipeline

    Parameters:
    -----------
    classifier : a scikit-learn compatible binary classifier
    X : pd.DataFrame
        dataframe of features (a copy is stored)
    y : pd.Series
        series of binary target values corresponding to X (a copy is stored)
    classifier_name : str or None
        if provided, the classifier step of the pipeline is
        named '<classifier_name>_clf'
        if not, will use 'clf' as name
    features : dict
        a dictionary whose keys are the feature types
        'cyc','cat','ord','num','bin' and whose values
        are lists of features of each type.
    random_state : int
        seed passed to the randomized components used by the
        methods (CV splitters, train/test splits, PCA, over-sampler,
        randomized search)

    Methods:
    -------
    set_data, set_features, set_state
        sets or resets attributes of self
    build_pipeline
        builds out pipeline based on supplied specs
    cv_score
        runs repeated stratified k-fold cross validation and reports scores
    randomized_search
        runs randomized search with cross validation
        and reports results
    fit_pipeline
        fits the model pipeline and stores as
        self.pipe_fitted
    predict_proba_pipeline
        uses a fitted pipeline to compute predicted
        probabilities for test or validation set
    score_pipeline
        scores predicted probabilities
    shap_values
        fits a clone of the pipeline and computes SHAP values,
        stored as self.shap_vals
    shap_plot
        plots previously computed SHAP values
    find_best_threshold
        finds the classification threshold maximizing the F_beta
        score on the validation set and stores it as self.best_thresh
    """
def __init__(self, classifier=None, X = None, y = None,
features = None,classifier_name = None,
random_state=42):
self.classifier = classifier
if X is not None:
self.X = X.copy()
if y is not None:
self.y = y.copy()
if features is not None:
self.features = features.copy()
self.random_state=random_state
self.pipe, self.pipe_fitted = None, None
self.classifier_name = classifier_name
self.X_val, self.y_val = None, None
self.y_predict_proba = None
self.best_params, self.best_n_components = None, None
self.shap_vals = None
def set_data(self,X=None,y=None):
    """
    Store copies of the supplied features and/or targets.
    An argument left as None leaves the corresponding attribute untouched.
    """
    for attr_name, new_value in (('X', X), ('y', y)):
        if new_value is None:
            continue
        setattr(self, attr_name, new_value.copy())
def set_features(self,features):
    """
    Replace the feature dictionary with a copy of `features`.
    Passing None is a no-op.
    """
    if features is None:
        return
    self.features = features.copy()
def set_state(self,random_state):
    """
    Replace the stored seed (self.random_state) with `random_state`.
    The new value is stored unconditionally, including None.
    """
    self.random_state = random_state
def build_pipeline(self, cat_method = 'onehot',cyc_method = 'spline',num_ss=True,
                   over_sample = False, pca=False,n_components=None,
                   select_features = False,score_func=None,k='all',
                   poly_features = False, degree=2, interaction_only=False):
    """
    Method to build the model pipeline and store it as self.pipe

    The steps are assembled in this order (bracketed steps are optional):
    ['os'] -> ['gi_hour'] -> ['gi_age'] -> 'col' -> ['pf'] -> ['fs'] -> ['pca'] -> [classifier]

    Parameters:
    -----------
    cat_method : str
        specifies whether to encode categorical
        variables as one-hot vectors or ordinals
        must be either 'onehot' or 'ord'
    cyc_method : str or None
        specifies how to encode the cyclical features
        must be one of 'trig', 'spline', 'interact-trig',
        'interact-spline','onehot', 'ord', or None
        - If 'trig' or 'spline', will set up periodic encoder
          with desired method
        - If 'onehot' or 'ord', will set up appropriate
          categorical encoder
        - If 'interact-{method}', will use <method> encoding for HOUR_OF_DAY,
          encode DAY_OF_WEEK as a binary feature expressing whether
          the day is a weekend day (value 1 or 7), and then include
          interaction features among this set via PolynomialFeatures.
        - If None, will leave out cyclical features altogether
    num_ss : bool
        Whether or not to apply StandardScaler on the numerical features
    over_sample : bool
        set to True to include a RandomOverSampler step
    pca : bool
        set to True to include sklearn.decomposition.PCA step
    n_components : int or None
        number of components for sklearn.decomposition.PCA
    select_features : bool
        set to True to include sklearn.feature_selection.SelectKBest step
    score_func : callable or None
        score function to use for sklearn.feature_selection.SelectKBest
        recommended: chi2, f_classif, or mutual_info_classif
        if None, f_classif (the SelectKBest default) is used
    k : int or 'all'
        number of features for sklearn.feature_selection.SelectKBest
    poly_features : bool
        set to True to include sklearn.preprocessing.PolynomialFeatures step
    degree : int
        max degree for sklearn.preprocessing.PolynomialFeatures
    interaction_only : bool
        whether or not sklearn.preprocessing.PolynomialFeatures will be limited
        to interaction terms only

    Raises:
    -------
    ValueError
        if cat_method or cyc_method is not one of the supported values
    """
    # Validate both encoding choices before building anything
    if cat_method not in ('onehot', 'ord'):
        raise ValueError("cat_method must be either 'onehot' or 'ord'")
    valid_cyc_methods = ('trig', 'spline', 'interact-trig', 'interact-spline',
                         'onehot', 'ord', None)
    if cyc_method not in valid_cyc_methods:
        raise ValueError("cyc_method must be one of 'trig','spline','interact-trig',"
                         "'interact-spline','onehot','ord', or None")
    assert getattr(self, 'features', None) is not None, \
        'No feature dictionary exists. First supply one using set_features.'
    features = self.features

    # Define transformer for categorical features
    if cat_method == 'onehot':
        cat_encoder = ('ohe',OneHotEncoder(handle_unknown='infrequent_if_exist'))
    else:
        cat_encoder = ('oe',OrdinalEncoder(handle_unknown='use_encoded_value',unknown_value=np.nan))
    cat_transform = Pipeline([('si',SimpleImputer(strategy='most_frequent')),cat_encoder])

    # Define transformer for cyclic features
    # NOTE(review): cos_transformer, sin_transformer, periodic_spline_transformer and
    # GroupImputer are presumably provided by `from lib.transform_data import *` - confirm.
    cyc_dict = {'HOUR_OF_DAY':24,'DAY_OF_WEEK':7}
    cyc_feats = features['cyc']
    if cyc_method == 'trig':
        cyc_transform = [(f'{feat}_cos',cos_transformer(cyc_dict[feat]),[feat]) for feat in cyc_feats]+\
                        [(f'{feat}_sin',sin_transformer(cyc_dict[feat]),[feat]) for feat in cyc_feats]
    elif cyc_method == 'spline':
        cyc_transform = [(f'{feat}_cyclic',
                          periodic_spline_transformer(cyc_dict[feat],n_splines=cyc_dict[feat]//2),
                          [feat]) for feat in cyc_feats]
    elif cyc_method == 'onehot':
        cyc_encoder = ('ohe_cyc',OneHotEncoder(handle_unknown='infrequent_if_exist'))
        cyc_transform = [('cyc',Pipeline([cyc_encoder]),cyc_feats)]
    elif cyc_method == 'ord':
        cyc_encoder = ('oe_cyc',OrdinalEncoder(handle_unknown='use_encoded_value',unknown_value=np.nan))
        cyc_transform = [('cyc',Pipeline([cyc_encoder]),cyc_feats)]
    elif cyc_method in ('interact-spline', 'interact-trig'):
        n_hours = cyc_dict['HOUR_OF_DAY']
        if cyc_method == 'interact-spline':
            hour_transform = [('hour_cyc',periodic_spline_transformer(n_hours,n_splines=12),['HOUR_OF_DAY'])]
        else:
            hour_transform = [('HOUR_cos',cos_transformer(n_hours),['HOUR_OF_DAY']),
                              ('HOUR_sin',sin_transformer(n_hours),['HOUR_OF_DAY'])]
        # feature_names_out='one-to-one' lets the column transformer report feature
        # names; without it get_feature_names_out() on the 'col' step fails.
        wkend_transform = ('wkend',
                           FunctionTransformer(lambda x: (x.isin([1,7])).astype(int),
                                               feature_names_out='one-to-one'),
                           ['DAY_OF_WEEK'])
        cyc_transform = [('cyc',Pipeline([('cyc_col',ColumnTransformer(hour_transform+[wkend_transform],
                                                                     remainder='drop',verbose_feature_names_out=False)),
                                          ('cyc_poly',PolynomialFeatures(degree=2,interaction_only=True,
                                                                         include_bias=False))]),
                          cyc_feats)]
    else:
        # cyc_method is None: an empty passthrough leaves cyclical features out
        cyc_transform = [('cyc','passthrough',[])]

    # Define numerical transform
    num_transform = ('num',StandardScaler(),features['num']) if num_ss else\
                    ('num','passthrough',features['num'])

    # Define column transformer
    col_transform = ColumnTransformer([('cat',cat_transform,features['cat']),
                                       ('ord','passthrough',features['ord']),
                                       num_transform,
                                       ('bin',SimpleImputer(strategy='most_frequent'),
                                        features['bin'])]+\
                                      cyc_transform,
                                      remainder='drop',verbose_feature_names_out=False)

    # Steps that run before the column transformer
    steps = []
    if over_sample:
        # NOTE(review): RandomOverSampler is not imported by name in this module;
        # presumably it comes from `from lib.transform_data import *` - confirm.
        steps.append(('os',RandomOverSampler(random_state=self.random_state)))
    if 'HOUR_OF_DAY' in features['cyc']:
        steps.append(('gi_hour',GroupImputer(target = 'HOUR_OF_DAY', group_cols=['ILLUMINATION','CRASH_MONTH'],strategy='mode')))
    if 'AGE' in features['num']:
        steps.append(('gi_age',GroupImputer(target = 'AGE', group_cols=['COUNTY'],strategy='median')))
    steps.append(('col',col_transform))

    # Optional steps that run after the column transformer
    if poly_features:
        steps.append(('pf',PolynomialFeatures(degree=degree,interaction_only=interaction_only)))
    if select_features:
        selector_score = score_func if score_func is not None else f_classif
        steps.append(('fs',SelectKBest(score_func = selector_score, k = k)))
    if pca:
        steps.append(('pca',PCA(n_components=n_components,random_state=self.random_state)))

    # Append classifier if provided
    if self.classifier is not None:
        if self.classifier_name is not None:
            steps.append((f'{self.classifier_name}_clf',self.classifier))
        else:
            steps.append(('clf',self.classifier))

    # Initialize pipeline
    self.pipe = Pipeline(steps)
def cv_score(self, scoring = 'roc_auc', n_splits = 5, n_repeats=3, thresh = 0.5, beta = 1,
             return_mean_score=False,print_mean_score=True,print_scores=False, n_jobs=-1,
             eval_size=0.1,eval_metric='auc'):
    """
    Method for performing cross validation via RepeatedStratifiedKFold

    A fresh clone of self.pipe is fitted on each training fold and scored
    on the corresponding test fold; self.pipe itself is not fitted.

    Parameters:
    -----------
    scoring : str
        scoring function to use. must be one of
        'roc_auc','acc','f1','fb','f1w'
    thresh : float
        the classification threshold for computing y_pred
        from y_pred_proba (not used for 'roc_auc')
    beta : float
        the beta-value to use in the f_beta score, if chosen
    n_splits, n_repeats : int, int
        number of splits and number of repeat iterations
        for sklearn.model_selection.RepeatedStratifiedKFold
    return_mean_score : bool
        whether or not to return the mean score
    print_mean_score : bool
        whether to print out a report of the mean score
    print_scores : bool
        whether to print out a report of CV scores for all folds
    n_jobs : int or None
        currently unused; folds are processed sequentially
    eval_size : float
        currently unused (early stopping is disabled)
    eval_metric : str
        currently unused (early stopping is disabled)

    Returns: None or mean_score, depending on return_mean_score setting
    --------
    """
    # getattr keeps these guards meaningful even if an attribute was never set
    assert getattr(self, 'pipe', None) is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
    assert (getattr(self, 'X', None) is not None) and (getattr(self, 'y', None) is not None), 'X and/or y does not exist. First supply X and y using set_data.'
    assert getattr(self, 'features', None) is not None, 'No feature dictionary exists. First supply one using set_features.'
    assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
    assert scoring in ['roc_auc','acc','f1','fb','f1w'],"scoring must be one of 'roc_auc','acc','f1','fb','f1w'"

    # Initialize CV iterator
    kf = RepeatedStratifiedKFold(n_splits = n_splits, n_repeats=n_repeats,
                                 random_state=self.random_state)

    # Restrict to features supplied in self.features
    feature_cols = [feat for feat_type in self.features for feat in self.features[feat_type]]
    X = self.X[feature_cols]

    # Metrics computed from thresholded predictions ('roc_auc' uses probabilities)
    thresholded_scorers = {
        'acc': accuracy_score,
        'f1': f1_score,
        'f1w': lambda y_true, y_hat: f1_score(y_true, y_hat, average='weighted'),
        'fb': lambda y_true, y_hat: fbeta_score(y_true, y_hat, beta=beta),
    }

    # NOTE: the LightGBM early-stopping path was permanently switched off in
    # this method (its flag was hard-coded to False), so it is not reproduced.
    scores = []
    for train_idx, test_idx in kf.split(X, self.y):
        fold_pipe = clone(self.pipe)
        fold_pipe.fit(X.iloc[train_idx,:], self.y.iloc[train_idx])
        fold_y_test = self.y.iloc[test_idx]
        fold_y_pred_proba = fold_pipe.predict_proba(X.iloc[test_idx,:])[:,1]
        if scoring == 'roc_auc':
            fold_score = roc_auc_score(fold_y_test, fold_y_pred_proba)
        else:
            fold_y_pred = (fold_y_pred_proba >= thresh).astype('int')
            fold_score = thresholded_scorers[scoring](fold_y_test, fold_y_pred)
        scores.append(fold_score)

    # Average and report
    mean_score = np.mean(scores)
    if print_scores:
        print(f'CV scores using {scoring} score: {scores} \nMean score: {mean_score}')
    if print_mean_score:
        print(f'Mean CV {scoring} score: {mean_score}')
    if return_mean_score:
        return mean_score
def randomized_search(self, params, n_components = None, n_iter=10,
                      scoring='roc_auc',cv=5,refit=False,top_n=10, n_jobs=-1):
    """
    Method for performing randomized search with cross validation on a given dictionary of parameter distributions
    Also displays a table of results of the best top_n iterations

    Parameters:
    ----------
    params : dict
        parameter distributions to use for RandomizedSearchCV
        keys are classifier parameter names WITHOUT the step prefix;
        the classifier step name is prepended automatically
    n_components : int, or list, or None
        number of components for sklearn.decomposition.PCA
        - if int, will reset the PCA layer in self.pipe with provided value
        - if list, must be list of ints, which will be included in
          RandomizedSearchCV parameter distribution
    n_iter : int
        number of iterations to use in RandomizedSearchCV
    scoring : str
        scoring passed to RandomizedSearchCV
    cv : int or CV splitter
        cv argument passed to RandomizedSearchCV
    refit : bool
        whether to refit a final classifier with best parameters
        - if False, will only set self.best_params, self.best_n_components
          and self.best_score
        - if True, will set self.best_estimator in addition
    top_n : int or None
        if int, will display results from top_n best iterations only
        if None, will display all results
    n_jobs : int or None
        number of CPU cores to use for parallel processing
        -1 uses all available cores, and None defaults to 1
    """
    # getattr keeps these guards meaningful even if an attribute was never set
    assert getattr(self, 'pipe', None) is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
    assert (getattr(self, 'X', None) is not None) and (getattr(self, 'y', None) is not None), 'X and/or y does not exist. First supply X and y using set_data.'
    assert getattr(self, 'features', None) is not None, 'No feature dictionary exists. First supply one using set_features.'
    assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
    assert (n_components is None) or ('pca' in self.pipe.named_steps), 'Your pipeline has no PCA step. Build a pipeline with PCA first.'
    n_components_is_list = isinstance(n_components, list)
    assert (len(params)>0) or n_components_is_list, 'Either pass a parameter distribution or a list of n_components values.'

    # Add estimator name prefix to hyperparams
    clf_step = self.pipe.steps[-1][0]
    search_space = {clf_step+'__'+key: params[key] for key in params}

    # Process supplied n_components
    if n_components_is_list:
        search_space['pca__n_components'] = n_components
    elif isinstance(n_components, (int, np.integer)) and not isinstance(n_components, bool):
        self.pipe['pca'].set_params(n_components=n_components)

    # Restrict to features supplied in self.features
    X = self.X[[feat for feat_type in self.features for feat in self.features[feat_type]]]

    # Initialize rs and fit
    rs = RandomizedSearchCV(self.pipe, param_distributions = search_space,
                            n_iter=n_iter, scoring = scoring, cv = cv,refit=refit,
                            random_state=self.random_state, n_jobs=n_jobs)
    rs.fit(X,self.y)

    # Build results table: one row per sampled candidate, best score first
    results = rs.cv_results_
    results_df = pd.DataFrame(results['params'])
    param_names = list(results_df.columns)
    score_col = f'mean cv score ({scoring})'
    results_df[score_col] = pd.Series(results['mean_test_score'])
    results_df = results_df.set_index(param_names).sort_values(by=score_col,ascending=False)
    shown = results_df if top_n is None else results_df.head(top_n)

    # `display` is only available when running under IPython/Jupyter;
    # fall back to a plain printout elsewhere instead of raising NameError.
    try:
        show = display
    except NameError:
        show = None
    if show is not None:
        show(shown.style\
             .highlight_max(axis=0, props='color:white; font-weight:bold; background-color:seagreen;'))
    else:
        print(shown)

    if refit:
        self.best_estimator = rs.best_estimator_
    best_params = rs.best_params_
    # Strip the step prefix; PCA's n_components is stored separately
    self.best_params = {key.split('__')[-1]:best_params[key] for key in best_params if key.split('__')[0]!='pca'}
    self.best_n_components = next((best_params[key] for key in best_params if key.split('__')[0]=='pca'), None)
    self.best_score = rs.best_score_
def fit_pipeline(self,split_first=False, eval_size=0.1,eval_metric='auc'):
    """
    Method for fitting self.pipe on self.X,self.y

    self.pipe is fitted in place and also stored as self.pipe_fitted.
    If the pipeline has no classifier step, the transformed training
    features are stored as self.X_transformed.

    Parameters:
    -----------
    split_first : bool
        if True, a stratified 80/20 train_test_split will be performed first
        and the validation set will be stored as self.X_val, self.y_val
    eval_size : float
        currently unused (early stopping is disabled)
    eval_metric : str
        currently unused (early stopping is disabled)
    """
    # Need pipe and X to fit
    assert getattr(self, 'pipe', None) is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
    assert getattr(self, 'X', None) is not None, 'X does not exist. First set X.'
    y = getattr(self, 'y', None)

    # If no y provided, then no pipeline steps should require y.
    # The feature selection step is named 'fs' and the over-sampler 'os'.
    step_list = [step[0] for step in self.pipe.steps]
    has_clf = 'clf' in step_list[-1]
    needs_y = has_clf or ('fs' in step_list) or ('os' in step_list)
    assert (not needs_y) or (y is not None), 'You must provide targets y if pipeline has a classifier step or feature selection step.'
    # Don't need to do a train-test split without a classifier
    assert (not split_first) or has_clf, 'Only need train-test split if you have a classifier.'
    assert getattr(self, 'features', None) is not None, 'No feature dictionary exists. First supply one using set_features.'

    if split_first:
        X_train,X_val,y_train,y_val = train_test_split(self.X,y,stratify=y,
                                                       test_size=0.2,random_state=self.random_state)
        self.X_val = X_val
        self.y_val = y_val
    else:
        X_train = self.X
        y_train = y

    # Restrict to features supplied in self.features
    # (column selection yields a new frame, so self.X is left untouched)
    X_train = X_train[[feat for feat_type in self.features for feat in self.features[feat_type]]]

    # Fit and store fitted pipeline. If no classifier, fit_transform X_train and store transformed version
    pipe = self.pipe
    if has_clf:
        pipe.fit(X_train,y_train)
    else:
        # y_train is passed through so supervised steps (e.g. 'fs') can use it
        self.X_transformed = pipe.fit_transform(X_train,y_train)
    self.pipe_fitted = pipe
def predict_proba_pipeline(self, X_test = None):
    """
    Compute positive-class probabilities with the fitted pipeline
    and store them as self.y_predict_proba.

    Parameters:
    -----------
    X_test : pd.DataFrame or None
        input features to predict on; when None, the stored
        validation features self.X_val are used instead
    """
    assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
    assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'
    assert self.pipe_fitted is not None, 'Pipeline is not fitted. First fit pipeline using fit_pipeline.'
    assert (X_test is not None) or (self.X_val is not None), 'Must either provide X_test and y_test or fit the pipeline with split_first=True.'

    inputs = self.X_val if X_test is None else X_test

    # Keep only the columns named in the feature dictionary, in dictionary order
    wanted_columns = []
    for group in self.features.values():
        wanted_columns.extend(group)

    # Column 1 holds the probability of the positive class
    probabilities = self.pipe_fitted.predict_proba(inputs[wanted_columns])
    self.y_predict_proba = probabilities[:,1]
def score_pipeline(self,y_test=None,scoring='roc_auc',thresh=0.5, beta = 1,
                   normalize = None, print_score = True):
    """
    Method for scoring the stored predicted probabilities
    (self.y_predict_proba) against true targets and reporting the score

    Parameters:
    -----------
    y_test : pd.Series or None
        true binary targets (if None, will use self.y_val)
    scoring : str
        specifies the metric to use for scoring
        must be one of
        'roc_auc', 'roc_plot', 'acc', 'f1', 'f1w', 'fb','mcc','kappa','conf','classif_report'
    thresh : float
        threshold value for computing y_pred
        from y_predict_proba
    beta : float
        the beta parameter in the fb score
    normalize : str or None
        the normalize parameter for the
        confusion matrix ('conf'). must be one of
        'true','pred','all',None
    print_score : bool
        if True, will print a message reporting the score
        if False, will return the score as a float

    Returns:
    --------
    The score if print_score is False and scoring is a numeric metric;
    otherwise None ('roc_plot', 'conf' and 'classif_report' only display output).

    Raises:
    -------
    ValueError
        if scoring is not one of the supported values
    """
    assert (y_test is not None) or (self.y_val is not None), 'Must either provide X_test and y_test or fit the pipeline with split_first=True.'
    assert self.y_predict_proba is not None, 'Predicted probabilities do not exist. Run predict_proba_pipeline first.'
    if y_test is None:
        y_test = self.y_val

    # Probability-based outputs
    if scoring == 'roc_plot':
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        RocCurveDisplay.from_predictions(y_test,self.y_predict_proba,ax=ax)
        plt.show()
        return None
    if scoring == 'roc_auc':
        score = roc_auc_score(y_test, self.y_predict_proba)
    else:
        # Everything else is computed from thresholded class predictions
        y_pred = (self.y_predict_proba >= thresh).astype('int')
        if scoring == 'acc':
            score = accuracy_score(y_test,y_pred)
        elif scoring == 'f1':
            score = f1_score(y_test,y_pred)
        elif scoring == 'f1w':
            score = f1_score(y_test,y_pred,average='weighted')
        elif scoring == 'fb':
            score = fbeta_score(y_test,y_pred,beta=beta)
        elif scoring == 'mcc':
            # Imported locally: not part of this module's top-level imports
            from sklearn.metrics import matthews_corrcoef
            score = matthews_corrcoef(y_test,y_pred)
        elif scoring == 'kappa':
            # Imported locally: not part of this module's top-level imports
            from sklearn.metrics import cohen_kappa_score
            score = cohen_kappa_score(y_test,y_pred)
        elif scoring == 'conf':
            fig = plt.figure(figsize=(3,3))
            ax = fig.add_subplot(111)
            ConfusionMatrixDisplay.from_predictions(y_test,y_pred,ax=ax,colorbar=False,
                                                    normalize=normalize)
            plt.show()
            return None
        elif scoring == 'classif_report':
            target_names=['neither seriously injured nor killed','seriously injured or killed']
            print(classification_report(y_test, y_pred,target_names=target_names))
            return None
        else:
            raise ValueError("scoring must be one of 'roc_auc', 'roc_plot','acc', 'f1', 'f1w', 'fb','mcc','kappa','conf','classif_report'")

    # Numeric metrics: either report or return
    if print_score:
        print(f'The {scoring} score is: {score}')
        return None
    return score
def shap_values(self, X_test = None, eval_size=0.1,eval_metric='auc'):
    """
    Method for computing SHAP values for the classifier's input features

    A clone of self.pipe is fitted on the training data and SHAP values
    are then computed for the test samples. Only the final classifier
    step is explained, so it is given the output of the preceding
    pipeline steps.

    Parameters:
    -----------
    X_test : pd.DataFrame or None
        The test set; if provided, no train/test split is performed
        and the clone is fitted on all of self.X, self.y.
        If None, a stratified 80/20 train/test split of
        self.X, self.y is performed first.
    eval_size : float
        not used by the current implementation
    eval_metric : str
        not used by the current implementation

    Returns: None (stores results in self.shap_vals; the transformed
    training features and training targets are stored as
    self.X_shap and self.y_shap)
    --------
    """
    assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
    assert (self.X is not None) and (self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.'
    assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.'

    # Work on an unfitted clone so self.pipe is left untouched
    fitted = clone(self.pipe)
    train_X, train_y = self.X.copy(), self.y.copy()
    if X_test is None:
        train_X, X_test, train_y, _ = train_test_split(train_X, train_y, stratify=train_y,
                                                       test_size=0.2, random_state=self.random_state)

    # Restrict both sets to the features named in self.features, then fit
    feature_cols = [feat for feat_type in self.features for feat in self.features[feat_type]]
    train_X = train_X[feature_cols]
    X_test = X_test[feature_cols]
    fitted.fit(train_X, train_y)

    # Push both sets through every step except the classifier
    preprocessor = fitted[:-1]
    train_matrix = preprocessor.transform(train_X)
    test_matrix = preprocessor.transform(X_test)

    # Background data for the explainer comes from the transformed training set
    background = shap.maskers.Independent(data=train_matrix)
    explainer = shap.Explainer(fitted[-1], masker = background,
                               feature_names = fitted['col'].get_feature_names_out())

    self.shap_vals = explainer(test_matrix)
    self.X_shap = train_matrix
    self.y_shap = train_y
def shap_plot(self,max_display='all'):
    """
    Method for plotting previously computed SHAP values (self.shap_vals)

    Draws two shap.summary_plot panels side by side:
        - left: a bar plot of mean absolute SHAP values
        - right: a violin plot of the SHAP values

    Parameters:
    -----------
    max_display : int or 'all'
        Number of features passed to shap.summary_plot as max_display.
        If 'all', the second dimension of self.shap_vals is used,
        so every feature is included.

    Returns: None (plots displayed)
    --------
    """
    assert self.shap_vals is not None, 'No shap values exist. First compute shap values.'
    assert isinstance(max_display,int) or (max_display=='all'), "'max_display' must be 'all' or an integer"

    if max_display == 'all':
        max_display = self.shap_vals.shape[1]
        title_add = ', all features'
    else:
        title_add = f', top {max_display} features'

    # Each summary_plot call draws on the subplot created just before it
    fig = plt.figure()
    panels = []
    for position, plot_type in ((121, 'bar'), (122, 'violin')):
        panels.append(fig.add_subplot(position))
        shap.summary_plot(self.shap_vals, plot_type=plot_type, max_display=max_display,
                          show=False, plot_size=0.2)
    bar_ax, violin_ax = panels

    # Figure height grows with the number of displayed features
    fig.set_size_inches(12, max_display/3)

    bar_ax.set_title('Mean absolute SHAP values'+title_add, fontsize='small')
    bar_ax.set_xlabel('mean(|SHAP value|)', fontsize='x-small')
    violin_ax.set_title('SHAP values'+title_add, fontsize='small')
    violin_ax.set_xlabel('SHAP value', fontsize='x-small')
    for panel in panels:
        panel.set_ylabel('feature name', fontsize='x-small')
        panel.tick_params(axis='y', labelsize='xx-small')

    plt.tight_layout()
    plt.show()
def find_best_threshold(self,beta=1,conf=True,report=True, print_result=True):
    """
    Computes the classification threshold which gives the
    best F_beta score from the stored predictions
    (self.y_predict_proba) against the validation targets (self.y_val),
    optionally prints the best threshold and the corresponding F_beta score,
    and optionally displays a confusion matrix and classification report
    corresponding to that threshold

    Parameters:
    -----------
    beta : float
        the desired beta value in the F_beta score
    conf : bool
        whether to display confusion matrix
    report : bool
        whether to display classification report
    print_result : bool
        whether to print a line reporting the best threshold
        and resulting F_beta score

    Returns: None (prints results and stores self.best_thresh)
    --------
    """
    assert getattr(self, 'y_val', None) is not None, 'No validation targets exist. First fit the pipeline using fit_pipeline with split_first=True.'
    assert getattr(self, 'y_predict_proba', None) is not None, 'Predicted probabilities do not exist. Run predict_proba_pipeline first.'

    prec,rec,threshs = precision_recall_curve(self.y_val,
                                              self.y_predict_proba)
    # precision/recall carry one extra final point (precision 1, recall 0)
    # that has no matching threshold; drop it so indices line up with threshs.
    prec, rec = prec[:-1], rec[:-1]

    # The formula gives 0/0 when TP==0; silence the warning and map the
    # resulting np.nan to 0, which is the correct F_beta in that case.
    with np.errstate(divide='ignore', invalid='ignore'):
        F_betas = (1+beta**2)*(prec*rec)/((beta**2*prec)+rec)
    F_betas = np.nan_to_num(F_betas)

    idx = np.argmax(F_betas)
    best_thresh = threshs[idx]
    if print_result:
        print(f'Threshold optimizing F_{beta} score: {best_thresh}\nBest F_{beta} score: {F_betas[idx]}')
    if conf:
        self.score_pipeline(scoring='conf',thresh=best_thresh,beta=beta)
    if report:
        self.score_pipeline(scoring='classif_report',thresh=best_thresh,beta=beta)
    self.best_thresh = best_thresh
class LRStudy(ClassifierStudy):
    """
    A child class of ClassifierStudy which has an additional method specific to logistic regression
    """
    def __init__(self, classifier=None, X = None, y = None,
                 features=None,classifier_name = 'LR',
                 random_state=42):
        """Same as ClassifierStudy.__init__, with classifier_name defaulting to 'LR'."""
        super().__init__(classifier, X, y,features,classifier_name,random_state)

    def plot_coeff(self, print_score = True, print_zero = False, title_add=None):
        """
        Method for doing a train/validation split, fitting the classifier,
        predicting and scoring (ROC AUC) on the validation set, and plotting
        a bar chart of the logistic regression coefficients corresponding
        to various model features.
        Features with coefficient zero and periodic spline features
        will be excluded from the chart.

        Stores the validation score as self.score, the plotted coefficients
        as self.coeff and the zero-coefficient feature names as
        self.coeff_zero_features.

        Parameters:
        -----------
        print_score : bool
            if True, the validation score is printed
        print_zero : bool
            if True, the list of features with zero coefficients is printed
        title_add : str or None
            an addendum that is added to the end of the plot title
        """
        assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.'
        assert isinstance(self.classifier,LogisticRegression),'Your classifier is not an instance of Logistic Regression.'

        # fit and score
        self.fit_pipeline(split_first = True)
        self.predict_proba_pipeline()
        score = roc_auc_score(self.y_val, self.y_predict_proba)

        # Retrieve coeff values from fitted pipeline
        coeff = pd.DataFrame({'feature name':self.pipe_fitted['col'].get_feature_names_out(),
                              'coeff value':self.pipe_fitted[-1].coef_.reshape(-1)})\
                  .sort_values(by='coeff value')

        # Leave the periodic spline features out of the chart
        spline_features = [f'HOUR_OF_DAY_sp_{n}' for n in range(12)]\
                          +[f'DAY_OF_WEEK_sp_{n}' for n in range(3)]
        coeff = coeff[~coeff['feature name'].isin(spline_features)]\
                  .set_index('feature name')

        # Separate features the model zeroed out from those it kept
        coeff_zero_features = coeff[coeff['coeff value']==0].index
        coeff = coeff[coeff['coeff value']!=0]

        # Plot feature coefficients
        fig = plt.figure(figsize=(30,4))
        ax = fig.add_subplot(111)
        coeff['coeff value'].plot(kind='bar',ylabel='coeff value',ax=ax)
        ax.axhline(y=0, color= 'red', linewidth=2,)
        plot_title = 'PA bicycle collisions, 2002-2021\nLogistic regression model log-odds coefficients'
        if title_add is not None:
            plot_title += f': {title_add}'
        ax.set_title(plot_title)
        ax.tick_params(axis='x', labelsize='x-small')
        plt.show()

        if print_score:
            print(f'Score on validation set: {score}')
        if print_zero:
            print(f'Features with zero coefficients in trained model: {list(coeff_zero_features)}')

        self.score = score
        self.coeff = coeff
        self.coeff_zero_features = coeff_zero_features