import gradio as gr import cv2 import numpy as np import tensorflow as tf import tensorflow_addons as tfa # Imported # Copyright 2019 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Implements Weighted kappa loss.""" from typing import Optional import tensorflow as tf from typeguard import typechecked from tensorflow_addons.utils.types import Number @tf.keras.utils.register_keras_serializable(package="Addons") class WeightedKappaLoss(tf.keras.losses.Loss): r"""Implements the Weighted Kappa loss function. Weighted Kappa loss was introduced in the [Weighted kappa loss function for multi-class classification of ordinal data in deep learning] (https://www.sciencedirect.com/science/article/abs/pii/S0167865517301666). Weighted Kappa is widely used in Ordinal Classification Problems. The loss value lies in $ [-\infty, \log 2] $, where $ \log 2 $ means the random prediction. Usage: >>> kappa_loss = tfa.losses.WeightedKappaLoss(num_classes=4) >>> y_true = tf.constant([[0, 0, 1, 0], [0, 1, 0, 0], ... [1, 0, 0, 0], [0, 0, 0, 1]]) >>> y_pred = tf.constant([[0.1, 0.2, 0.6, 0.1], [0.1, 0.5, 0.3, 0.1], ... [0.8, 0.05, 0.05, 0.1], [0.01, 0.09, 0.1, 0.8]]) >>> loss = kappa_loss(y_true, y_pred) >>> loss Usage with `tf.keras` API: >>> model = tf.keras.Model() >>> model.compile('sgd', loss=tfa.losses.WeightedKappaLoss(num_classes=4)) <... outputs should be softmax results if you want to weight the samples, just multiply the outputs by the sample weight ...> """ @typechecked def __init__( self, num_classes: int, weightage: Optional[str] = "quadratic", name: Optional[str] = "cohen_kappa_loss", epsilon: Optional[Number] = 1e-6, reduction: str = tf.keras.losses.Reduction.NONE, ): r"""Creates a `WeightedKappaLoss` instance. Args: num_classes: Number of unique classes in your dataset. weightage: (Optional) Weighting to be considered for calculating kappa statistics. A valid value is one of ['linear', 'quadratic']. Defaults to 'quadratic'. name: (Optional) String name of the metric instance. epsilon: (Optional) increment to avoid log zero, so the loss will be $ \log(1 - k + \epsilon) $, where $ k $ lies in $ [-1, 1] $. Defaults to 1e-6. Raises: ValueError: If the value passed for `weightage` is invalid i.e. not any one of ['linear', 'quadratic'] """ super().__init__(name=name, reduction=reduction) if weightage not in ("linear", "quadratic"): raise ValueError("Unknown kappa weighting type.") self.weightage = weightage self.num_classes = num_classes self.epsilon = epsilon or tf.keras.backend.epsilon() label_vec = tf.range(num_classes, dtype=tf.keras.backend.floatx()) self.row_label_vec = tf.reshape(label_vec, [1, num_classes]) self.col_label_vec = tf.reshape(label_vec, [num_classes, 1]) col_mat = tf.tile(self.col_label_vec, [1, num_classes]) row_mat = tf.tile(self.row_label_vec, [num_classes, 1]) if weightage == "linear": self.weight_mat = tf.abs(col_mat - row_mat) else: self.weight_mat = (col_mat - row_mat) ** 2 def call(self, y_true, y_pred): y_true = tf.cast(y_true, dtype=self.col_label_vec.dtype) y_pred = tf.cast(y_pred, dtype=self.weight_mat.dtype) batch_size = tf.shape(y_true)[0] cat_labels = tf.matmul(y_true, self.col_label_vec) cat_label_mat = tf.tile(cat_labels, [1, self.num_classes]) row_label_mat = tf.tile(self.row_label_vec, [batch_size, 1]) if self.weightage == "linear": weight = tf.abs(cat_label_mat - row_label_mat) else: weight = (cat_label_mat - row_label_mat) ** 2 numerator = tf.reduce_sum(weight * y_pred) label_dist = tf.reduce_sum(y_true, axis=0, keepdims=True) pred_dist = tf.reduce_sum(y_pred, axis=0, keepdims=True) w_pred_dist = tf.matmul(self.weight_mat, pred_dist, transpose_b=True) denominator = tf.reduce_sum(tf.matmul(label_dist, w_pred_dist)) denominator /= tf.cast(batch_size, dtype=denominator.dtype) loss = tf.math.divide_no_nan(numerator, denominator) return tf.math.log(loss + self.epsilon) def get_config(self): config = { "num_classes": self.num_classes, "weightage": self.weightage, "epsilon": self.epsilon, } base_config = super().get_config() return {**base_config, **config} # Copyright 2019 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Implements Cohen's Kappa.""" import tensorflow as tf import numpy as np import tensorflow.keras.backend as K from tensorflow.keras.metrics import Metric from typing import Union FloatTensorLike = Union[tf.Tensor, float, np.float16, np.float32, np.float64] AcceptableDTypes = Union[tf.DType, np.dtype, type, int, str, None] from typeguard import typechecked from typing import Optional @tf.keras.utils.register_keras_serializable(package="Addons") class CohenKappa(Metric): """Computes Kappa score between two raters. The score lies in the range `[-1, 1]`. A score of -1 represents complete disagreement between two raters whereas a score of 1 represents complete agreement between the two raters. A score of 0 means agreement by chance. Note: As of now, this implementation considers all labels while calculating the Cohen's Kappa score. Args: num_classes: Number of unique classes in your dataset. weightage: (optional) Weighting to be considered for calculating kappa statistics. A valid value is one of [None, 'linear', 'quadratic']. Defaults to `None` sparse_labels: (bool) Valid only for multi-class scenario. If True, ground truth labels are expected to be integers and not one-hot encoded. regression: (bool) If set, that means the problem is being treated as a regression problem where you are regressing the predictions. **Note:** If you are regressing for the values, the the output layer should contain a single unit. name: (optional) String name of the metric instance dtype: (optional) Data type of the metric result. Defaults to `None`. Raises: ValueError: If the value passed for `weightage` is invalid i.e. not any one of [None, 'linear', 'quadratic']. Usage: >>> y_true = np.array([4, 4, 3, 4, 2, 4, 1, 1], dtype=np.int32) >>> y_pred = np.array([4, 4, 3, 4, 4, 2, 1, 1], dtype=np.int32) >>> weights = np.array([1, 1, 2, 5, 10, 2, 3, 3], dtype=np.int32) >>> metric = tfa.metrics.CohenKappa(num_classes=5, sparse_labels=True) >>> metric.update_state(y_true , y_pred) >>> result = metric.result() >>> result.numpy() 0.61904764 >>> # To use this with weights, sample_weight argument can be used. >>> metric = tfa.metrics.CohenKappa(num_classes=5, sparse_labels=True) >>> metric.update_state(y_true , y_pred , sample_weight=weights) >>> result = metric.result() >>> result.numpy() 0.37209308 Usage with `tf.keras` API: >>> inputs = tf.keras.Input(shape=(10,)) >>> x = tf.keras.layers.Dense(10)(inputs) >>> outputs = tf.keras.layers.Dense(1)(x) >>> model = tf.keras.models.Model(inputs=inputs, outputs=outputs) >>> model.compile('sgd', loss='mse', metrics=[tfa.metrics.CohenKappa(num_classes=3, sparse_labels=True)]) """ @typechecked def __init__( self, num_classes: FloatTensorLike, name: str = "cohen_kappa", weightage: Optional[str] = None, sparse_labels: bool = False, regression: bool = False, dtype: AcceptableDTypes = None, ): """Creates a `CohenKappa` instance.""" super().__init__(name=name, dtype=dtype) if weightage not in (None, "linear", "quadratic"): raise ValueError("Unknown kappa weighting type.") if num_classes == 2: self._update = self._update_binary_class_model elif num_classes > 2: self._update = self._update_multi_class_model else: raise ValueError( """Number of classes must be greater than or euqal to two""" ) self.weightage = weightage self.num_classes = num_classes self.regression = regression self.sparse_labels = sparse_labels self.conf_mtx = self.add_weight( "conf_mtx", shape=(self.num_classes, self.num_classes), initializer=tf.keras.initializers.zeros, dtype=tf.float32, ) def update_state(self, y_true, y_pred, sample_weight=None): """Accumulates the confusion matrix condition statistics. Args: y_true: Labels assigned by the first annotator with shape `[num_samples,]`. y_pred: Labels assigned by the second annotator with shape `[num_samples,]`. The kappa statistic is symmetric, so swapping `y_true` and `y_pred` doesn't change the value. sample_weight (optional): for weighting labels in confusion matrix Defaults to `None`. The dtype for weights should be the same as the dtype for confusion matrix. For more details, please check `tf.math.confusion_matrix`. Returns: Update op. """ return self._update(y_true, y_pred, sample_weight) def _update_binary_class_model(self, y_true, y_pred, sample_weight=None): y_true = tf.cast(y_true, dtype=tf.int64) y_pred = tf.cast(y_pred, dtype=tf.float32) y_pred = tf.cast(y_pred > 0.5, dtype=tf.int64) return self._update_confusion_matrix(y_true, y_pred, sample_weight) @tf.function def _update_multi_class_model(self, y_true, y_pred, sample_weight=None): v = tf.argmax(y_true, axis=1) if not self.sparse_labels else y_true y_true = tf.cast(v, dtype=tf.int64) y_pred = self._cast_ypred(y_pred) return self._update_confusion_matrix(y_true, y_pred, sample_weight) @tf.function def _cast_ypred(self, y_pred): if tf.rank(y_pred) > 1: if not self.regression: y_pred = tf.cast(tf.argmax(y_pred, axis=-1), dtype=tf.int64) else: y_pred = tf.math.round(tf.math.abs(y_pred)) y_pred = tf.cast(y_pred, dtype=tf.int64) else: y_pred = tf.cast(y_pred, dtype=tf.int64) return y_pred @tf.function def _safe_squeeze(self, y): y = tf.squeeze(y) # Check for scalar result if tf.rank(y) == 0: y = tf.expand_dims(y, 0) return y def _update_confusion_matrix(self, y_true, y_pred, sample_weight): y_true = self._safe_squeeze(y_true) y_pred = self._safe_squeeze(y_pred) new_conf_mtx = tf.math.confusion_matrix( labels=y_true, predictions=y_pred, num_classes=self.num_classes, weights=sample_weight, dtype=tf.float32, ) return self.conf_mtx.assign_add(new_conf_mtx) def result(self): nb_ratings = tf.shape(self.conf_mtx)[0] weight_mtx = tf.ones([nb_ratings, nb_ratings], dtype=tf.float32) # 2. Create a weight matrix if self.weightage is None: diagonal = tf.zeros([nb_ratings], dtype=tf.float32) weight_mtx = tf.linalg.set_diag(weight_mtx, diagonal=diagonal) else: weight_mtx += tf.cast(tf.range(nb_ratings), dtype=tf.float32) weight_mtx = tf.cast(weight_mtx, dtype=self.dtype) if self.weightage == "linear": weight_mtx = tf.abs(weight_mtx - tf.transpose(weight_mtx)) else: weight_mtx = tf.pow((weight_mtx - tf.transpose(weight_mtx)), 2) weight_mtx = tf.cast(weight_mtx, dtype=self.dtype) # 3. Get counts actual_ratings_hist = tf.reduce_sum(self.conf_mtx, axis=1) pred_ratings_hist = tf.reduce_sum(self.conf_mtx, axis=0) # 4. Get the outer product out_prod = pred_ratings_hist[..., None] * actual_ratings_hist[None, ...] # 5. Normalize the confusion matrix and outer product conf_mtx = self.conf_mtx / tf.reduce_sum(self.conf_mtx) out_prod = out_prod / tf.reduce_sum(out_prod) conf_mtx = tf.cast(conf_mtx, dtype=self.dtype) out_prod = tf.cast(out_prod, dtype=self.dtype) # 6. Calculate Kappa score numerator = tf.reduce_sum(conf_mtx * weight_mtx) denominator = tf.reduce_sum(out_prod * weight_mtx) return tf.cond( tf.math.is_nan(denominator), true_fn=lambda: 0.0, false_fn=lambda: 1 - (numerator / denominator), ) def get_config(self): """Returns the serializable config of the metric.""" config = { "num_classes": self.num_classes, "weightage": self.weightage, "sparse_labels": self.sparse_labels, "regression": self.regression, } base_config = super().get_config() return {**base_config, **config} def reset_state(self): """Resets all of the metric state variables.""" for v in self.variables: K.set_value( v, np.zeros((self.num_classes, self.num_classes), v.dtype.as_numpy_dtype), ) def reset_states(self): # Backwards compatibility alias of `reset_state`. New classes should # only implement `reset_state`. # Required in Tensorflow < 2.5.0 return self.reset_state() # Copyright 2019 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Implements Multi-label confusion matrix scores.""" import warnings import tensorflow as tf from tensorflow.keras import backend as K from tensorflow.keras.metrics import Metric import numpy as np from typeguard import typechecked class MultiLabelConfusionMatrix(Metric): """Computes Multi-label confusion matrix. Class-wise confusion matrix is computed for the evaluation of classification. If multi-class input is provided, it will be treated as multilabel data. Consider classification problem with two classes (i.e num_classes=2). Resultant matrix `M` will be in the shape of `(num_classes, 2, 2)`. Every class `i` has a dedicated matrix of shape `(2, 2)` that contains: - true negatives for class `i` in `M(0,0)` - false positives for class `i` in `M(0,1)` - false negatives for class `i` in `M(1,0)` - true positives for class `i` in `M(1,1)` Args: num_classes: `int`, the number of labels the prediction task can have. name: (Optional) string name of the metric instance. dtype: (Optional) data type of the metric result. Usage: >>> # multilabel confusion matrix >>> y_true = np.array([[1, 0, 1], [0, 1, 0]], dtype=np.int32) >>> y_pred = np.array([[1, 0, 0], [0, 1, 1]], dtype=np.int32) >>> metric = tfa.metrics.MultiLabelConfusionMatrix(num_classes=3) >>> metric.update_state(y_true, y_pred) >>> result = metric.result() >>> result.numpy() #doctest: -DONT_ACCEPT_BLANKLINE array([[[1., 0.], [0., 1.]], [[1., 0.], [0., 1.]], [[0., 1.], [1., 0.]]], dtype=float32) >>> # if multiclass input is provided >>> y_true = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.int32) >>> y_pred = np.array([[1, 0, 0], [0, 0, 1]], dtype=np.int32) >>> metric = tfa.metrics.MultiLabelConfusionMatrix(num_classes=3) >>> metric.update_state(y_true, y_pred) >>> result = metric.result() >>> result.numpy() #doctest: -DONT_ACCEPT_BLANKLINE array([[[1., 0.], [0., 1.]], [[1., 0.], [1., 0.]], [[1., 1.], [0., 0.]]], dtype=float32) """ @typechecked def __init__( self, num_classes: FloatTensorLike, name: str = "Multilabel_confusion_matrix", dtype: AcceptableDTypes = None, **kwargs, ): super().__init__(name=name, dtype=dtype) self.num_classes = num_classes self.true_positives = self.add_weight( "true_positives", shape=[self.num_classes], initializer="zeros", dtype=self.dtype, ) self.false_positives = self.add_weight( "false_positives", shape=[self.num_classes], initializer="zeros", dtype=self.dtype, ) self.false_negatives = self.add_weight( "false_negatives", shape=[self.num_classes], initializer="zeros", dtype=self.dtype, ) self.true_negatives = self.add_weight( "true_negatives", shape=[self.num_classes], initializer="zeros", dtype=self.dtype, ) def update_state(self, y_true, y_pred, sample_weight=None): if sample_weight is not None: warnings.warn( "`sample_weight` is not None. Be aware that MultiLabelConfusionMatrix " "does not take `sample_weight` into account when computing the metric " "value." ) y_true = tf.cast(y_true, tf.int32) y_pred = tf.cast(y_pred, tf.int32) # true positive true_positive = tf.math.count_nonzero(y_true * y_pred, 0) # predictions sum pred_sum = tf.math.count_nonzero(y_pred, 0) # true labels sum true_sum = tf.math.count_nonzero(y_true, 0) false_positive = pred_sum - true_positive false_negative = true_sum - true_positive y_true_negative = tf.math.not_equal(y_true, 1) y_pred_negative = tf.math.not_equal(y_pred, 1) true_negative = tf.math.count_nonzero( tf.math.logical_and(y_true_negative, y_pred_negative), axis=0 ) # true positive state update self.true_positives.assign_add(tf.cast(true_positive, self.dtype)) # false positive state update self.false_positives.assign_add(tf.cast(false_positive, self.dtype)) # false negative state update self.false_negatives.assign_add(tf.cast(false_negative, self.dtype)) # true negative state update self.true_negatives.assign_add(tf.cast(true_negative, self.dtype)) def result(self): flat_confusion_matrix = tf.convert_to_tensor( [ self.true_negatives, self.false_positives, self.false_negatives, self.true_positives, ] ) # reshape into 2*2 matrix confusion_matrix = tf.reshape(tf.transpose(flat_confusion_matrix), [-1, 2, 2]) return confusion_matrix def get_config(self): """Returns the serializable config of the metric.""" config = { "num_classes": self.num_classes, } base_config = super().get_config() return {**base_config, **config} def reset_state(self): reset_value = np.zeros(self.num_classes, dtype=np.int32) K.batch_set_value([(v, reset_value) for v in self.variables]) def reset_states(self): # Backwards compatibility alias of `reset_state`. New classes should # only implement `reset_state`. # Required in Tensorflow < 2.5.0 return self.reset_state() ##### IMAGE_SIZE = 128 NUM_CLASSES = 3 def preprocess_image(image, target_size, add_clahe=True, clip_limit=4, tile_grid_size=(40,40), all_clahe=True): """ Preprocess the images to remove black borders and improve contrast using Y channel and CLAHE """ try: # Crop the image to remove black borders cropped_rgb_image = remove_black_borders(image) if add_clahe: equalised_image = apply_clahe( cropped_rgb_image, clip_limit, tile_grid_size, all_clahe) # Resize the image to target size resized_image = cv2.resize(equalised_image, (target_size, target_size)) else: # Resize the image to target size resized_image = cv2.resize(cropped_rgb_image, (target_size, target_size)) return resized_image except Exception as e: print("Error processing image: ") print(e) def remove_black_borders(image): """ Crop the image to remove black borders """ green_channel_image = image[:, :, 1] # Find the contours in the green channel contours, _ = cv2.findContours( green_channel_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) # Find the largest contour largest_contour = max(contours, key=cv2.contourArea) # Get the bounding rectangle of the largest contour x, y, w, h = cv2.boundingRect(largest_contour) # Create a mask with the same size as the bounding rectangle mask = np.zeros((h, w), np.uint8) # Draw the largest contour on the mask cv2.drawContours(mask, [largest_contour - [x, y]], 0, 255, -1) # Convert the mask to a 3 channel image mask_3_channel = cv2.merge([mask, mask, mask]) # Crop the image using the mask cropped_image = cv2.bitwise_and(image[y:y+h, x:x+w], mask_3_channel) return cropped_image def apply_clahe(image, clip_limit=4, tile_grid_size=(40,40), all_clahe=False): """ Preprocess the image """ try: # Extract the y channel from the cropped rgb image yuv_image = cv2.cvtColor(image, cv2.COLOR_RGB2YUV) cropped_y_channel_image = yuv_image[:, :, 0] # Apply CLAHE to the y channel of the cropped image clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size) enhanced_y_channel = clahe.apply(cropped_y_channel_image) if all_clahe: cropped_u_channel = yuv_image[:, :, 1] enhanced_u_channel = clahe.apply(cropped_u_channel) cropped_v_channel = yuv_image[:, :, 2] enhanced_v_channel = clahe.apply(cropped_v_channel) enhanced_yuv_image = np.stack([enhanced_y_channel, enhanced_u_channel, enhanced_v_channel], axis=-1) # Convert YUV to RGB enhanced_rgb_image = cv2.cvtColor(enhanced_yuv_image, cv2.COLOR_YUV2RGB) return enhanced_rgb_image else: # Convert the y channel image to grayscale enhanced_grayscale_image = cv2.convertScaleAbs(enhanced_y_channel, alpha=(255/219)) # Repeat the equalised grayscale for all 3-channels enhanced_grayscale_3_channels = np.stack((enhanced_grayscale_image,) * 3, axis=-1) return enhanced_grayscale_3_channels except Exception as e: print("Error processing image: ", e) def load_vgg16_model(): vgg16_DR = tf.keras.models.load_model('./VGG16_32_128_CLAHE_4_40_trainableFalse.h5', custom_objects={ 'WeightedKappaLoss': tfa.losses.WeightedKappaLoss, 'CohenKappa': tfa.metrics.CohenKappa, 'F1Score': tf.keras.metrics.F1Score, 'MultiLabelConfusionMatrix': tfa.metrics.MultiLabelConfusionMatrix }) return vgg16_DR def predict_input_image(img): img = preprocess_image(img, IMAGE_SIZE, add_clahe=True, clip_limit=4, tile_grid_size=(40,40), all_clahe=False) img_4d=img.reshape(-1, IMAGE_SIZE, IMAGE_SIZE, 3) model = load_vgg16_model() prediction=model.predict([img_4d])[0] dr_classes = ['No DR', 'Mild DR', 'Referable DR'] return {dr_classes[i]: float(prediction[i]) for i in range(3)} image = gr.inputs.Image(shape=(IMAGE_SIZE, IMAGE_SIZE)) label = gr.outputs.Label(num_top_classes=NUM_CLASSES) examples_dir = './example_images' iface = gr.Interface( fn=predict_input_image, inputs=image, outputs=label, title="Diabetic Retinopathy (DR) Screener", description="Submit a retinal image to classify it into No DR, Mild DR, or Referable DR (Moderate/Severe/Proliferative DR)\n- No DR: Rescreen in 12 months.\n- Mild DR: Rescreen in 6 months but be mindful of your sugar level to delay symptoms.\n- Referable DR: Please visit an eye specialist immediately!", article="Note that this Deep Neural Network (DNN) model has an 89% Quadratic Weighted Kappa (QWK) score and 87% Sensitivity and as such, may not always be correct.\nPlease consult an eye specialist to validate the results. If you're an eyecare professional, please help improve the model by flagging incorrect predictions for future model retraining. Thank you!", examples=examples_dir, allow_flagging="manual", flagging_options=["Incorrect! Should be Referable DR", "Incorrect! Should be Mild DR", "Incorrect! Should be No DR", "Ambiguous"] ) iface.launch()