import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import shap | |
from sklearn.feature_selection import chi2, SelectKBest, mutual_info_classif, f_classif | |
from sklearn.metrics import accuracy_score, log_loss, confusion_matrix, f1_score, fbeta_score, roc_auc_score
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, classification_report, precision_recall_curve
from sklearn.metrics import matthews_corrcoef, cohen_kappa_score
from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold, cross_val_score, RandomizedSearchCV, StratifiedKFold | |
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, FunctionTransformer, SplineTransformer, PolynomialFeatures | |
from sklearn.decomposition import PCA | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.ensemble import HistGradientBoostingClassifier, GradientBoostingClassifier | |
# from lightgbm import LGBMClassifier | |
from sklearn.base import BaseEstimator, TransformerMixin, clone | |
from sklearn.utils.validation import check_is_fitted | |
from sklearn.impute import SimpleImputer | |
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer
from imblearn.over_sampling import RandomOverSampler
from IPython.display import display
# lib.transform_data supplies the custom helpers used below:
# cos_transformer, sin_transformer, periodic_spline_transformer, GroupImputer
from lib.transform_data import *
class ClassifierStudy(): | |
""" | |
A class that contains tools for studying a classifier pipeline | |
Parameters: | |
----------- | |
classifier : a scikit-learn compatible binary classifier | |
X : pd.DataFrame | |
dataframe of features | |
y : pd.Series | |
series of binary target values corresponding to X | |
classifier_name : str or None | |
if provided, will use as classifier name in pipeline | |
if not, will use 'clf' as name | |
features : dict | |
a dictionary whose keys are the feature types | |
'cyc','cat','ord','num','bin' and whose values | |
are lists of features of each type. | |
Methods: | |
------- | |
set_data, set_features, set_state | |
sets or resets attributes of self | |
build_pipeline | |
builds out pipeline based on supplied specs | |
cv_score | |
runs k-fold cross validation and reports scores | |
randomized_search | |
runs randomized search with cross validation | |
and reports results | |
fit_pipeline | |
fits the model pipeline and stores as | |
self.pipe_fitted | |
predict_proba_pipeline | |
uses a fitted pipeline to compute predicted | |
probabilities for test or validation set | |
score_pipeline | |
scores predicted probabilities | |
""" | |
def __init__(self, classifier=None, X = None, y = None, | |
features = None,classifier_name = None, | |
random_state=42): | |
self.classifier = classifier | |
if X is not None: | |
self.X = X.copy() | |
if y is not None: | |
self.y = y.copy() | |
if features is not None: | |
self.features = features.copy() | |
self.random_state=random_state | |
self.pipe, self.pipe_fitted = None, None | |
self.classifier_name = classifier_name | |
self.X_val, self.y_val = None, None | |
self.y_predict_proba = None | |
self.best_params, self.best_n_components = None, None | |
self.shap_vals = None | |
def set_data(self,X=None,y=None): | |
"""Method to set or reset feature and/or target data""" | |
if X is not None: | |
self.X = X.copy() | |
if y is not None: | |
self.y = y.copy() | |
def set_features(self,features): | |
"""Method to set or reset the feature dictionary""" | |
if features is not None: | |
self.features = features.copy() | |
def set_state(self,random_state): | |
"""Method to set or reset the random_state""" | |
self.random_state = random_state | |
def build_pipeline(self, cat_method = 'onehot',cyc_method = 'spline',num_ss=True, | |
over_sample = False, pca=False,n_components=None, | |
select_features = False,score_func=None,k='all', | |
poly_features = False, degree=2, interaction_only=False): | |
""" | |
Method to build the model pipeline | |
Parameters: | |
----------- | |
cat_method : str | |
specifies whether to encode categorical | |
variables as one-hot vectors or ordinals | |
must be either 'onehot' or 'ord' | |
cyc_method : str | |
specifies whether to encode cyclical features | |
with sine/cosine encoding or periodic splines | |
must be one of 'trig', 'spline', 'interact-trig', | |
'interact-spline','onehot', 'ord', or None | |
- If 'trig' or 'spline', will set up periodic encoder | |
with desired method | |
- If 'onehot' or 'ord', will set up appropriate | |
categorical encoder | |
- If 'interact-{method}', will use <method> encoding for HOUR_OF_DAY, | |
encode DAY_OF_WEEK as a binary feature expressing whether | |
the day is a weekend day, and then include interaction | |
features among this set via PolynomialFeatures. | |
- If None, will leave out cyclical features altogether | |
num_ss : bool | |
Whether or not to apply StandardScaler on the numerical features | |
over_sample : bool | |
set to True to include imblearn.over_sampling.RandomOverSampler step | |
pca : bool | |
set to True to include sklearn.decomposition.PCA step | |
n_components : int or None | |
number of components for sklearn.decomposition.PCA | |
select_features : bool | |
set to True to include sklearn.feature_selection.SelectKBest step | |
score_func : callable | |
score function to use for sklearn.feature_selection.SelectKBest | |
recommended: chi2, f_classif, or mutual_info_classif | |
k : int or 'all' | |
number of features for sklearn.feature_selection.SelectKBest | |
poly_features : bool | |
set to True to include sklearn.preprocessing.PolynomialFeatures step | |
degree : int | |
max degree for sklearn.preprocessing.PolynomialFeatures | |
interaction_only : bool | |
whether or not sklearn.preprocessing.PolynomialFeatures will be limited | |
to interaction terms only | |
""" | |
# Define transformer for categorical features | |
if cat_method == 'onehot': | |
cat_encoder = ('ohe',OneHotEncoder(handle_unknown='infrequent_if_exist')) | |
elif cat_method == 'ord': | |
cat_encoder = ('oe',OrdinalEncoder(handle_unknown='use_encoded_value',unknown_value=np.nan)) | |
else: | |
raise ValueError("cat_method must be either 'onehot' or 'ord'") | |
cat_transform = Pipeline([('si',SimpleImputer(strategy='most_frequent')),cat_encoder]) | |
# Define transformer for cyclic features | |
cyc_dict = {'HOUR_OF_DAY':24,'DAY_OF_WEEK':7} | |
if cyc_method == 'trig': | |
cyc_transform = [(f'{feat}_cos',cos_transformer(cyc_dict[feat]),[feat]) for feat in self.features['cyc']]+\ | |
[(f'{feat}_sin',sin_transformer(cyc_dict[feat]),[feat]) for feat in self.features['cyc']] | |
elif cyc_method =='spline': | |
cyc_transform = [(f'{feat}_cyclic', | |
periodic_spline_transformer(cyc_dict[feat],n_splines=cyc_dict[feat]//2), | |
[feat]) for feat in self.features['cyc']] | |
elif cyc_method == 'onehot': | |
cyc_encoder = ('ohe_cyc',OneHotEncoder(handle_unknown='infrequent_if_exist')) | |
cyc_transform = [('cyc',Pipeline([cyc_encoder]),self.features['cyc'])] | |
elif cyc_method == 'ord': | |
cyc_encoder = ('oe_cyc',OrdinalEncoder(handle_unknown='use_encoded_value',unknown_value=np.nan)) | |
cyc_transform = [('cyc',Pipeline([cyc_encoder]),self.features['cyc'])] | |
elif cyc_method == 'interact-spline': | |
hour_transform = (f'hour_cyc',periodic_spline_transformer(cyc_dict['HOUR_OF_DAY'],n_splines=12),['HOUR_OF_DAY']) | |
wkend_transform = ('wkend',FunctionTransformer(lambda x: (x.isin([1,7])).astype(int)),['DAY_OF_WEEK']) | |
cyc_transform = [('cyc',Pipeline([('cyc_col',ColumnTransformer([hour_transform, wkend_transform], | |
remainder='drop',verbose_feature_names_out=False)), | |
('cyc_poly',PolynomialFeatures(degree=2,interaction_only=True, | |
include_bias=False))]), | |
self.features['cyc'])] | |
elif cyc_method == 'interact-trig': | |
hour_transform = [(f'HOUR_cos',cos_transformer(cyc_dict['HOUR_OF_DAY']),['HOUR_OF_DAY']), | |
(f'HOUR_sin',sin_transformer(cyc_dict['HOUR_OF_DAY']),['HOUR_OF_DAY'])] | |
wkend_transform = ('wkend',FunctionTransformer(lambda x: (x.isin([1,7])).astype(int)),['DAY_OF_WEEK']) | |
cyc_transform = [('cyc',Pipeline([('cyc_col',ColumnTransformer(hour_transform+[wkend_transform], | |
remainder='drop',verbose_feature_names_out=False)), | |
('cyc_poly',PolynomialFeatures(degree=2,interaction_only=True, | |
include_bias=False))]), | |
self.features['cyc'])] | |
elif cyc_method is None: | |
cyc_transform = [('cyc','passthrough',[])] | |
else: | |
raise ValueError("cyc_method must be one of 'trig','spline','interact','onehot','ord',or None") | |
# Define numerical transform | |
num_transform = ('num',StandardScaler(),self.features['num']) if num_ss else\ | |
('num','passthrough',self.features['num']) | |
# Define column transformer | |
col_transform = ColumnTransformer([('cat',cat_transform,self.features['cat']), | |
('ord','passthrough',self.features['ord']), | |
num_transform, | |
('bin',SimpleImputer(strategy='most_frequent'), | |
self.features['bin'])]+\ | |
cyc_transform, | |
remainder='drop',verbose_feature_names_out=False) | |
steps = [('col',col_transform)] | |
if 'AGE' in self.features['num']: | |
steps.insert(0,('gi_age',GroupImputer(target = 'AGE', group_cols=['COUNTY'],strategy='median'))) | |
if 'HOUR_OF_DAY' in self.features['cyc']: | |
steps.insert(0,('gi_hour',GroupImputer(target = 'HOUR_OF_DAY', group_cols=['ILLUMINATION','CRASH_MONTH'],strategy='mode'))) | |
# Insert optional steps as needed | |
        if over_sample:
            # Note: RandomOverSampler is a sampler, not a transformer, so this step requires
            # an imblearn.pipeline.Pipeline rather than the sklearn Pipeline used below.
            steps.insert(0,('os',RandomOverSampler(random_state=self.random_state)))
if poly_features: | |
steps.append(('pf',PolynomialFeatures(degree=degree,interaction_only=interaction_only))) | |
if select_features: | |
steps.append(('fs',SelectKBest(score_func = score_func, k = k))) | |
if pca: | |
steps.append(('pca',PCA(n_components=n_components,random_state=self.random_state))) | |
# Append classifier if provided | |
if self.classifier is not None: | |
if self.classifier_name is not None: | |
steps.append((f'{self.classifier_name}_clf',self.classifier)) | |
else: | |
steps.append(('clf',self.classifier)) | |
# Initialize pipeline | |
self.pipe = Pipeline(steps) | |
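    # A few build_pipeline configurations (sketches continuing the example above; the option
    # combinations shown are illustrative, not prescriptive):
    #
    #   # ordinal-encoded categoricals, sine/cosine cyclical encoding, PCA to 10 components
    #   study.build_pipeline(cat_method='ord', cyc_method='trig', pca=True, n_components=10)
    #
    #   # one-hot categoricals, hour/weekend interaction features, ANOVA-F feature selection
    #   study.build_pipeline(cat_method='onehot', cyc_method='interact-spline',
    #                        select_features=True, score_func=f_classif, k=20)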
def cv_score(self, scoring = 'roc_auc', n_splits = 5, n_repeats=3, thresh = 0.5, beta = 1, | |
return_mean_score=False,print_mean_score=True,print_scores=False, n_jobs=-1, | |
eval_size=0.1,eval_metric='auc'): | |
""" | |
Method for performing cross validation via RepeatedStratifiedKFold | |
Parameters: | |
----------- | |
scoring : str | |
scoring function to use. must be one of | |
            'roc_auc','acc','f1','fb','f1w'
thresh : float | |
the classification threshold for computing y_pred | |
from y_pred_proba | |
beta : float | |
the beta-value to use in the f_beta score, if chosen | |
n_splits, n_repeats : int, int | |
number of splits and number of repeat iterations | |
for sklearn.model_selection.RepeatedStratifiedKFold | |
return_mean_score : bool | |
whether or not to return the mean score | |
print_mean_score : bool | |
whether to print out a report of the mean score | |
print_scores : bool | |
whether to print out a report of CV scores for all folds | |
n_jobs : int or None | |
number of CPU cores to use for parallel processing | |
-1 uses all available cores, and None defaults to 1 | |
eval_size : float | |
Fraction of the training set to use for early stopping eval set | |
eval_metric : str | |
eval metric to use in early stopping | |
Returns: None or mean_score, depending on return_mean_score setting | |
-------- | |
""" | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert (self.X is not None)&(self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.' | |
assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.' | |
assert scoring in ['roc_auc','acc','f1','fb','f1w'],"scoring must be one of 'roc_auc','acc','f1','fb','f1w'" | |
# Initialize CV iterator | |
kf = RepeatedStratifiedKFold(n_splits = n_splits, n_repeats=n_repeats, | |
random_state=self.random_state) | |
# Restrict to features supplied in self.features | |
X = self.X[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
lgb_es=False | |
# if isinstance(self.pipe[-1],LGBMClassifier): | |
# if 'early_stopping_round' in self.pipe[-1].get_params(): | |
# if self.pipe[-1].get_params()['early_stopping_rounds'] is not None: | |
# lgb_es=True | |
scores = [] | |
# Iterate over folds and train, predict, score | |
for i,(train_idx,test_idx) in enumerate(kf.split(X,self.y)): | |
fold_X_train = X.iloc[train_idx,:] | |
fold_X_test = X.iloc[test_idx,:] | |
fold_y_train = self.y.iloc[train_idx] | |
fold_y_test = self.y.iloc[test_idx] | |
pipe=clone(self.pipe) | |
if lgb_es: | |
fold_X_train,fold_X_es,fold_y_train,fold_y_es = train_test_split(fold_X_train,fold_y_train, | |
stratify=fold_y_train,test_size=eval_size, | |
random_state=self.random_state) | |
trans_pipe = pipe[:-1] | |
trans_pipe.fit_transform(fold_X_train) | |
fold_X_es = trans_pipe.transform(fold_X_es) | |
clf_name = pipe.steps[-1][0] | |
fit_params = {f'{clf_name}__eval_set':[(fold_X_es,fold_y_es)], | |
f'{clf_name}__eval_metric':eval_metric, | |
f'{clf_name}__verbose':0} | |
else: | |
fit_params = {} | |
pipe.fit(fold_X_train,fold_y_train,**fit_params) | |
fold_y_pred_proba = pipe.predict_proba(fold_X_test)[:,1] | |
if scoring == 'roc_auc': | |
fold_score = roc_auc_score(fold_y_test, fold_y_pred_proba) | |
else: | |
fold_y_pred = (fold_y_pred_proba >= thresh).astype('int') | |
if scoring == 'acc': | |
fold_score = accuracy_score(fold_y_test,fold_y_pred) | |
elif scoring == 'f1': | |
fold_score = f1_score(fold_y_test,fold_y_pred) | |
elif scoring == 'f1w': | |
fold_score = f1_score(fold_y_test,fold_y_pred,average='weighted') | |
else: | |
fold_score = fbeta_score(fold_y_test,fold_y_pred,beta=beta) | |
scores.append(fold_score) | |
# Average and report | |
mean_score = np.mean(scores) | |
if print_scores: | |
print(f'CV scores using {scoring} score: {scores} \nMean score: {mean_score}') | |
if print_mean_score: | |
print(f'Mean CV {scoring} score: {mean_score}') | |
if return_mean_score: | |
return mean_score | |
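    # cv_score sketch (assumes a pipeline ending in a classifier has been built):
    #
    #   # 5x3 repeated stratified CV on ROC AUC, returning the mean for later comparison
    #   mean_auc = study.cv_score(scoring='roc_auc', n_splits=5, n_repeats=3,
    #                             return_mean_score=True, print_mean_score=False)
    #
    #   # F_2 score at a 0.3 classification threshold, with per-fold scores printed
    #   study.cv_score(scoring='fb', beta=2, thresh=0.3, print_scores=True)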
def randomized_search(self, params, n_components = None, n_iter=10, | |
scoring='roc_auc',cv=5,refit=False,top_n=10, n_jobs=-1): | |
""" | |
Method for performing randomized search with cross validation on a given dictionary of parameter distributions | |
        Also displays a table of results for the best top_n iterations
Parameters: | |
---------- | |
params : dict | |
parameter distributions to use for RandomizedSearchCV | |
n_components : int, or list, or None | |
number of components for sklearn.decomposition.PCA | |
- if int, will reset the PCA layer in self.pipe with provided value | |
- if list, must be list of ints, which will be included in | |
RandomizedSearchCV parameter distribution | |
scoring : str | |
            scoring function for sklearn.model_selection.RandomizedSearchCV
n_iter : int | |
number of iterations to use in RandomizedSearchCV | |
refit : bool | |
whether to refit a final classifier with best parameters | |
- if False, will only set self.best_params and self.best_score | |
- if True, will set self.best_estimator in addition | |
top_n : int or None | |
if int, will display results from top_n best iterations only | |
if None, will display all results | |
n_jobs : int or None | |
number of CPU cores to use for parallel processing | |
-1 uses all available cores, and None defaults to 1 | |
""" | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert (self.X is not None)&(self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.' | |
assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.' | |
assert (n_components is None)|('pca' in self.pipe.named_steps), 'Your pipeline has no PCA step. Build a pipeline with PCA first.' | |
assert (len(params)>0)|(type(n_components)==list), 'Either pass a parameter distribution or a list of n_components values.' | |
# Add estimator name prefix to hyperparams | |
params = {self.pipe.steps[-1][0]+'__'+key:params[key] for key in params} | |
# Process supplied n_components | |
if type(n_components)==list: | |
params['pca__n_components']=n_components | |
elif type(n_components)==int: | |
self.pipe['pca'].set_params(n_components=n_components) | |
# Restrict to features supplied in self.features | |
X = self.X[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
# Initialize rs and fit | |
rs = RandomizedSearchCV(self.pipe, param_distributions = params, | |
n_iter=n_iter, scoring = scoring, cv = cv,refit=refit, | |
random_state=self.random_state, n_jobs=n_jobs) | |
rs.fit(X,self.y) | |
# Display top n scores | |
results = rs.cv_results_ | |
results_df = pd.DataFrame(results['params']) | |
param_names = list(results_df.columns) | |
results_df[f'mean cv score ({scoring})']=pd.Series(results['mean_test_score']) | |
results_df = results_df.set_index(param_names).sort_values(by=f'mean cv score ({scoring})',ascending=False) | |
if top_n is not None: | |
display(results_df.head(top_n).style\ | |
.highlight_max(axis=0, props='color:white; font-weight:bold; background-color:seagreen;')) | |
else: | |
display(results_df.style\ | |
.highlight_max(axis=0, props='color:white; font-weight:bold; background-color:seagreen;')) | |
if refit: | |
self.best_estimator = rs.best_estimator_ | |
best_params = rs.best_params_ | |
self.best_params = {key.split('__')[-1]:best_params[key] for key in best_params if key.split('__')[0]!='pca'} | |
self.best_n_components = next((best_params[key] for key in best_params if key.split('__')[0]=='pca'), None) | |
self.best_score = rs.best_score_ | |
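    # randomized_search sketch: keys are bare hyperparameter names of the supplied classifier
    # (the classifier step prefix is added automatically); the HistGradientBoostingClassifier
    # parameters and scipy distributions below are illustrative assumptions.
    #
    #   from scipy.stats import loguniform
    #   params = {'learning_rate': loguniform(1e-3, 1e0),
    #             'max_depth': [3, 5, 7, None],
    #             'l2_regularization': loguniform(1e-3, 1e1)}
    #   study.randomized_search(params, n_iter=20, scoring='roc_auc', cv=5, refit=True)
    #   print(study.best_params, study.best_score)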
def fit_pipeline(self,split_first=False, eval_size=0.1,eval_metric='auc'): | |
""" | |
Method for fitting self.pipeline on self.X,self.y | |
Parameters: | |
----------- | |
split_first : bool | |
if True, a train_test_split will be performed first | |
and the validation set will be stored | |
eval_size : float | |
Fraction of the training set to use for early stopping eval set | |
eval_metric : str | |
eval metric to use in early stopping | |
""" | |
# Need pipe and X to fit | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert self.X is not None, 'X does not exist. First set X.' | |
# If no y provided, then no pipeline steps should require y | |
step_list = [step[0] for step in self.pipe.steps] | |
        assert (('clf' not in step_list[-1])&('fs' not in step_list))|(self.y is not None), 'You must provide targets y if the pipeline has a classifier or feature selection step.'
# Don't need to do a train-test split without a classifier | |
assert (split_first==False)|('clf' in step_list[-1]), 'Only need train-test split if you have a classifier.' | |
if split_first: | |
X_train,X_val,y_train,y_val = train_test_split(self.X,self.y,stratify=self.y, | |
test_size=0.2,random_state=self.random_state) | |
self.X_val = X_val | |
self.y_val = y_val | |
else: | |
X_train = self.X.copy() | |
if self.y is not None: | |
y_train = self.y.copy() | |
# Restrict to features supplied in self.features | |
X_train = X_train[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
# If LGBM early stopping, then need to split off eval_set and define fit_params | |
# if isinstance(self.pipe[-1],LGBMClassifier): | |
# if self.pipe[-1].get_params()['early_stopping_rounds'] is not None: | |
# X_train,X_es,y_train,y_es = train_test_split(X_train,y_train, | |
# test_size=eval_size, | |
# stratify=y_train, | |
# random_state=self.random_state) | |
# trans_pipe = self.pipe[:-1] | |
# trans_pipe.fit_transform(X_train) | |
# X_es = trans_pipe.transform(X_es) | |
# clf_name = self.pipe.steps[-1][0] | |
# fit_params = {f'{clf_name}__eval_set':[(X_es,y_es)], | |
# f'{clf_name}__eval_metric':eval_metric, | |
# f'{clf_name}__verbose':0} | |
# else: | |
# fit_params = {} | |
# else: | |
# fit_params = {} | |
fit_params = {} | |
# Fit and store fitted pipeline. If no classifier, fit_transform X_train and store transformed version | |
pipe = self.pipe | |
if 'clf' in step_list[-1]: | |
pipe.fit(X_train,y_train,**fit_params) | |
else: | |
X_transformed = pipe.fit_transform(X_train) | |
# X_transformed = pd.DataFrame(X_transformed,columns=pipe[-1].get_column_names_out()) | |
self.X_transformed = X_transformed | |
self.pipe_fitted = pipe | |
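    # Typical fit/predict/score flow (sketch): fit with an internal stratified train/validation
    # split, then predict and score against the stored validation split.
    #
    #   study.fit_pipeline(split_first=True)
    #   study.predict_proba_pipeline()           # populates study.y_predict_proba from study.X_val
    #   study.score_pipeline(scoring='roc_auc')  # scores against study.y_val
    #   study.score_pipeline(scoring='conf', thresh=0.4)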
def predict_proba_pipeline(self, X_test = None): | |
""" | |
Method for using a fitted pipeline to compute predicted | |
probabilities for X_test (if supplied) or self.X_val | |
Parameters: | |
----------- | |
X_test : pd.DataFrame or None | |
test data input features (if None, will use self.X_val) | |
""" | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.' | |
assert self.pipe_fitted is not None, 'Pipeline is not fitted. First fit pipeline using fit_pipeline.' | |
assert (X_test is not None)|(self.X_val is not None), 'Must either provide X_test and y_test or fit the pipeline with split_first=True.' | |
if X_test is None: | |
X_test = self.X_val | |
# Restrict to features supplied in self.features | |
X_test = X_test[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
# Save prediction | |
self.y_predict_proba = self.pipe_fitted.predict_proba(X_test)[:,1] | |
def score_pipeline(self,y_test=None,scoring='roc_auc',thresh=0.5, beta = 1, | |
normalize = None, print_score = True): | |
""" | |
Method for scoring self.pipe_fitted on supplied test data and reporting score | |
Parameters: | |
----------- | |
y_test : pd.Series or None | |
true binary targets (if None, will use self.y_val) | |
scoring : str | |
specifies the metric to use for scoring | |
must be one of | |
'roc_auc', 'roc_plot', 'acc', 'f1', 'f1w', 'fb','mcc','kappa','conf','classif_report' | |
thresh : float | |
            threshold value for computing y_pred
from y_predict_proba | |
beta : float | |
the beta parameter in the fb score | |
normalize : str or None | |
the normalize parameter for the | |
confusion_matrix. must be one of | |
'true','pred','all',None | |
print_score : bool | |
if True, will print a message reporting the score | |
if False, will return the score as a float | |
""" | |
        assert (y_test is not None)|(self.y_val is not None), 'Must either provide y_test or fit the pipeline with split_first=True.'
assert self.y_predict_proba is not None, 'Predicted probabilities do not exist. Run predict_proba_pipeline first.' | |
if y_test is None: | |
y_test = self.y_val | |
# Score and report | |
if scoring == 'roc_plot': | |
fig = plt.figure(figsize=(4,4)) | |
ax = fig.add_subplot(111) | |
RocCurveDisplay.from_predictions(y_test,self.y_predict_proba,ax=ax) | |
plt.show() | |
elif scoring == 'roc_auc': | |
score = roc_auc_score(y_test, self.y_predict_proba) | |
else: | |
y_pred = (self.y_predict_proba >= thresh).astype('int') | |
if scoring == 'acc': | |
score = accuracy_score(y_test,y_pred) | |
elif scoring == 'f1': | |
score = f1_score(y_test,y_pred) | |
elif scoring == 'f1w': | |
score = f1_score(y_test,y_pred,average='weighted') | |
elif scoring == 'fb': | |
score = fbeta_score(y_test,y_pred,beta=beta) | |
elif scoring == 'mcc': | |
                score = matthews_corrcoef(y_test,y_pred)
elif scoring == 'kappa': | |
score = cohen_kappa_score(y_test,y_pred) | |
elif scoring == 'conf': | |
fig = plt.figure(figsize=(3,3)) | |
ax = fig.add_subplot(111) | |
ConfusionMatrixDisplay.from_predictions(y_test,y_pred,ax=ax,colorbar=False) | |
plt.show() | |
elif scoring == 'classif_report': | |
target_names=['neither seriously injured nor killed','seriously injured or killed'] | |
print(classification_report(y_test, y_pred,target_names=target_names)) | |
else: | |
raise ValueError("scoring must be one of 'roc_auc', 'roc_plot','acc', 'f1', 'f1w', 'fb','mcc','kappa','conf','classif_report'") | |
if scoring not in ['conf','roc_plot','classif_report']: | |
if print_score: | |
print(f'The {scoring} score is: {score}') | |
else: | |
return score | |
def shap_values(self, X_test = None, eval_size=0.1,eval_metric='auc'): | |
""" | |
        Method for computing SHAP values for model features
        via a stratified train/test split.
A copy of self.pipe is fitted on the training set | |
and then SHAP values are computed on test set samples | |
Parameters: | |
----------- | |
X_test : pd.DataFrame | |
The test set; if provided, will not perform | |
a train/test split before fitting | |
eval_size : float | |
Fraction of the training set to use for early stopping eval set | |
eval_metric : str | |
eval metric to use in early stopping | |
Returns: None (stores results in self.shap_vals) | |
-------- | |
""" | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert (self.X is not None)&(self.y is not None), 'X and/or y does not exist. First supply X and y using set_data.' | |
assert 'clf' in self.pipe.steps[-1][0], 'The pipeline has no classifier. Build a pipeline with a classifier first.' | |
# Clone pipeline, do train/test split if X_test not provided | |
pipe = clone(self.pipe) | |
X_train = self.X.copy() | |
y_train = self.y.copy() | |
if X_test is None: | |
X_train,X_test,y_train,y_test = train_test_split(X_train,y_train,stratify=y_train, | |
test_size=0.2,random_state=self.random_state) | |
# Restrict to features provided in self.features, and fit | |
X_train = X_train[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
X_test = X_test[[feat for feat_type in self.features for feat in self.features[feat_type]]] | |
# If LGBM early stopping, then need to split off eval_set and define fit_params | |
# if isinstance(self.pipe[-1],LGBMClassifier): | |
# if 'early_stopping_round' in self.pipe[-1].get_params(): | |
# if self.pipe[-1].get_params()['early_stopping_rounds'] is not None: | |
# X_train,X_es,y_train,y_es = train_test_split(X_train,y_train, | |
# test_size=eval_size, | |
# stratify=y_train, | |
# random_state=self.random_state) | |
# trans_pipe = self.pipe[:-1] | |
# trans_pipe.fit_transform(X_train) | |
# X_es = trans_pipe.transform(X_es) | |
# clf_name = self.pipe.steps[-1][0] | |
# fit_params = {f'{clf_name}__eval_set':[(X_es,y_es)], | |
# f'{clf_name}__eval_metric':eval_metric, | |
# f'{clf_name}__verbose':0} | |
# else: | |
# fit_params = {} | |
# else: | |
# fit_params = {} | |
fit_params = {} | |
pipe.fit(X_train,y_train,**fit_params) | |
# SHAP will just explain classifier, so need transformed X_train and X_test | |
X_train_trans, X_test_trans = pipe[:-1].transform(X_train), pipe[:-1].transform(X_test) | |
# Need masker for linear model | |
masker = shap.maskers.Independent(data=X_train_trans) | |
# Initialize explainer and compute and store SHAP values as an explainer object | |
explainer = shap.Explainer(pipe[-1], masker = masker, feature_names = pipe['col'].get_feature_names_out()) | |
self.shap_vals = explainer(X_test_trans) | |
self.X_shap = X_train_trans | |
self.y_shap = y_train | |
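    # SHAP sketch (assumes the final step is a model shap.Explainer can handle, e.g. a linear
    # or tree-based classifier):
    #
    #   study.shap_values()              # fits a clone of the pipeline, stores an Explanation object
    #   study.shap_plot(max_display=15)  # violin summary + mean |SHAP| bar plot, top 15 features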
def shap_plot(self,max_display='all'): | |
""" | |
Method for generating plots of SHAP value results | |
SHAP values should be already computed previously | |
Generates two plots side by side: | |
- a beeswarm plot of SHAP values of all samples | |
- a barplot of mean absolute SHAP values | |
Parameters: | |
----------- | |
max_display : int or 'all' | |
The number of features to show in the plot, in descending | |
order by mean absolute SHAP value. If 'all', then | |
all features will be included. | |
Returns: None (plots displayed) | |
-------- | |
""" | |
assert self.shap_vals is not None, 'No shap values exist. First compute shap values.' | |
assert (isinstance(max_display,int))|(max_display=='all'), "'max_display' must be 'all' or an integer" | |
if max_display=='all': | |
title_add = ', all features' | |
max_display = self.shap_vals.shape[1] | |
else: | |
title_add = f', top {max_display} features' | |
# Plot | |
fig=plt.figure() | |
ax1=fig.add_subplot(121) | |
shap.summary_plot(self.shap_vals,plot_type='bar',max_display=max_display, | |
show=False,plot_size=0.2) | |
ax2=fig.add_subplot(122) | |
shap.summary_plot(self.shap_vals,plot_type='violin',max_display=max_display, | |
show=False,plot_size=0.2) | |
fig.set_size_inches(12,max_display/3) | |
ax1.set_title(f'Mean absolute SHAP values'+title_add,fontsize='small') | |
ax1.set_xlabel('mean(|SHAP value|)',fontsize='x-small') | |
ax2.set_title(f'SHAP values'+title_add,fontsize='small') | |
ax2.set_xlabel('SHAP value', fontsize='x-small') | |
for ax in [ax1,ax2]: | |
ax.set_ylabel('feature name',fontsize='x-small') | |
ax.tick_params(axis='y', labelsize='xx-small') | |
plt.tight_layout() | |
plt.show() | |
def find_best_threshold(self,beta=1,conf=True,report=True, print_result=True): | |
""" | |
Computes the classification threshold which gives the | |
best F_beta score from classifier predictions, | |
prints the best threshold and the corresponding F_beta score, | |
and displays a confusion matrix and classification report | |
corresponding to that threshold | |
Parameters: | |
----------- | |
beta : float | |
the desired beta value in the F_beta score | |
conf : bool | |
whether to display confusion matrix | |
report : bool | |
whether to display classification report | |
print_result : bool | |
whether to print a line reporting the best threshold | |
and resulting F_beta score | |
Returns: None (prints results and stores self.best_thresh) | |
-------- | |
""" | |
prec,rec,threshs = precision_recall_curve(self.y_val, | |
self.y_predict_proba) | |
F_betas = (1+beta**2)*(prec*rec)/((beta**2*prec)+rec) | |
# Above formula is valid when TP!=0. When TP==0 | |
# it gives np.nan whereas F_beta should be 0 | |
F_betas = np.nan_to_num(F_betas) | |
idx = np.argmax(F_betas) | |
best_thresh = threshs[idx] | |
if print_result: | |
print(f'Threshold optimizing F_{beta} score: {best_thresh}\nBest F_{beta} score: {F_betas[idx]}') | |
if conf: | |
self.score_pipeline(scoring='conf',thresh=best_thresh,beta=beta) | |
if report: | |
self.score_pipeline(scoring='classif_report',thresh=best_thresh,beta=beta) | |
self.best_thresh = best_thresh | |
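    # Threshold-tuning sketch: requires validation-set predicted probabilities, so run
    # fit_pipeline(split_first=True) and predict_proba_pipeline first.
    #
    #   study.fit_pipeline(split_first=True)
    #   study.predict_proba_pipeline()
    #   study.find_best_threshold(beta=2)  # prints results and stores study.best_thresh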
class LRStudy(ClassifierStudy): | |
""" | |
A child class of ClassifierStudy which has an additional method specific to logistic regression | |
""" | |
def __init__(self, classifier=None, X = None, y = None, | |
features=None,classifier_name = 'LR', | |
random_state=42): | |
super().__init__(classifier, X, y,features,classifier_name,random_state) | |
def plot_coeff(self, print_score = True, print_zero = False, title_add=None): | |
""" | |
Method for doing a train/validation split, fitting the classifier, | |
predicting and scoring on the validation set, and plotting | |
a bar chart of the logistic regression coefficients corresponding | |
to various model features. | |
Features with coefficient zero and periodic spline features | |
will be excluded from the chart. | |
Parameters: | |
----------- | |
print_score : bool | |
            if True, the validation score is printed
        print_zero : bool
            if True, the list of features with zero coefficients is printed
title_add : str or None | |
an addendum that is added to the end of the plot title | |
""" | |
assert self.pipe is not None, 'No pipeline exists; first build a pipeline using build_pipeline.' | |
assert isinstance(self.classifier,LogisticRegression),'Your classifier is not an instance of Logistic Regression.' | |
# fit and score | |
self.fit_pipeline(split_first = True) | |
self.predict_proba_pipeline() | |
score = roc_auc_score(self.y_val, self.y_predict_proba) | |
# Retrieve coeff values from fitted pipeline | |
coeff = pd.DataFrame({'feature name':self.pipe_fitted['col'].get_feature_names_out(), | |
'coeff value':self.pipe_fitted[-1].coef_.reshape(-1)})\ | |
.sort_values(by='coeff value') | |
coeff = coeff[~coeff['feature name']\ | |
.isin([f'HOUR_OF_DAY_sp_{n}' for n in range(12)]\ | |
+[f'DAY_OF_WEEK_sp_{n}' for n in range(3)])]\ | |
.set_index('feature name') | |
coeff_zero_features = coeff[coeff['coeff value']==0].index | |
coeff = coeff[coeff['coeff value']!=0] | |
# Plot feature coefficients | |
fig = plt.figure(figsize=(30,4)) | |
ax = fig.add_subplot(111) | |
coeff['coeff value'].plot(kind='bar',ylabel='coeff value',ax=ax) | |
ax.axhline(y=0, color= 'red', linewidth=2,) | |
plot_title = 'PA bicycle collisions, 2002-2021\nLogistic regression model log-odds coefficients' | |
if title_add is not None: | |
plot_title += f': {title_add}' | |
ax.set_title(plot_title) | |
ax.tick_params(axis='x', labelsize='x-small') | |
plt.show() | |
if print_score: | |
print(f'Score on validation set: {score}') | |
if print_zero: | |
            print(f'Features with zero coefficients in trained model: {list(coeff_zero_features)}')
self.score = score | |
self.coeff = coeff | |
self.coeff_zero_features = coeff_zero_features | |
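    # LRStudy sketch (reusing the X, y, features placeholders from above): plot_coeff performs
    # its own split/fit/score, so only a built pipeline is needed; the L1 settings are illustrative.
    #
    #   lr_study = LRStudy(classifier=LogisticRegression(penalty='l1', solver='liblinear', C=0.1),
    #                      X=X, y=y, features=features)
    #   lr_study.build_pipeline(cat_method='onehot', cyc_method='spline')
    #   lr_study.plot_coeff(print_zero=True, title_add='L1 regularization')
    #   lr_study.coeff.head()  # surviving nonzero coefficients, sorted by value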