import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import gc
import cv2
import math
import time
import random

import tf_clahe
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy import ndimage
from PIL import Image
# from keras_cv.utils import conv_utils

import streamlit as st
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
import matplotlib
matplotlib.use('Agg')

from skimage import exposure
from skimage.filters import gaussian
from skimage.restoration import denoise_nl_means, estimate_sigma
from sklearn.linear_model import LinearRegression  # needed by getMiddle() below
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.__internal__.layers import BaseRandomLayer
from tensorflow.keras import layers
from tensorflow.keras.layers import (
    Dense, Flatten, Conv2D, Activation, BatchNormalization, MaxPooling2D,
    AveragePooling2D, GlobalAveragePooling2D, Dropout, Input, concatenate, add,
    Conv2DTranspose, Lambda, SpatialDropout2D, Cropping2D, UpSampling2D,
    LeakyReLU, ZeroPadding2D, Reshape, Concatenate, Multiply, Permute, Add
)
from tensorflow.keras.applications import (
    InceptionResNetV2, DenseNet201, ResNet152V2, VGG19, EfficientNetV2M,
    ResNet50V2, Xception, InceptionV3, EfficientNetV2S, EfficientNetV2B3,
    ResNet50, ConvNeXtBase, RegNetX032
)

st.set_option('deprecation.showPyplotGlobalUse', False)

import ultralytics
ultralytics.checks()
from ultralytics import YOLO

IMAGE_SIZE = 224
NUM_CLASSES = 3

yolo_weight = './weights_yolo/oai_s_best4.pt'
seg_model = YOLO(yolo_weight)


def find_boundaries(mask, start, end, top=True, verbose=0):
    # If top=True, trace the topmost boundary from left to right;
    # if top=False, trace the bottommost boundary from left to right.
    height, width = mask.shape
    contours, _ = cv2.findContours(255 * mask.astype(np.uint8),
                                   cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    areas = np.array([cv2.contourArea(cnt) for cnt in contours])
    contour = contours[areas.argmax()]
    contour = contour.reshape(-1, 2)
    org_contour = contour.copy()

    # Crop the contour to the segment running between the two anchor points.
    start_idx = ((start - contour) ** 2).sum(axis=-1).argmin()
    end_idx = ((end - contour) ** 2).sum(axis=-1).argmin()
    if start_idx <= end_idx:
        contour = contour[start_idx:end_idx + 1]
    else:
        contour = np.concatenate([contour[start_idx:], contour[:end_idx + 1]])

    # Keep one point per x-column, then re-sort by x.
    if top:
        sorted_indices = np.argsort(contour[:, 1])[::-1]
    else:
        sorted_indices = np.argsort(contour[:, 1])
    contour = contour[sorted_indices]
    unique_indices = sorted(np.unique(contour[:, 0], return_index=True)[1])
    contour = contour[unique_indices]
    sorted_indices = np.argsort(contour[:, 0])
    contour = contour[sorted_indices]

    if verbose:
        temp = draw_points(127 * mask.astype(np.uint8), contour, thickness=5)
        temp = draw_points(temp, [start, end], color=[155, 155], thickness=15)
        cv2_imshow(temp)
    return np.array(contour), np.array(org_contour)
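# Hedged usage sketch (not part of the pipeline): find_boundaries takes a
# boolean mask plus two (x, y) anchor points and returns the boundary segment
# running between the anchors, together with the full outer contour, e.g.:
#
#   # toy = np.zeros((64, 64), dtype=bool)
#   # toy[20:40, 10:54] = True  # a filled rectangle
#   # crop, full = find_boundaries(toy, np.array([10, 20]), np.array([53, 20]), top=False)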
def get_contours(mask, verbose=0):
    limit_points = detect_limit_points(mask, verbose=verbose)
    upper_contour, full_upper = find_boundaries(mask == 1, limit_points[0], limit_points[1],
                                                top=False, verbose=verbose)
    lower_contour, full_lower = find_boundaries(mask == 2, limit_points[3], limit_points[2],
                                                top=True, verbose=verbose)
    if verbose:
        temp = draw_points(127 * mask, full_upper, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, full_lower, thickness=3)
        cv2_imshow(temp)
        cv2.imwrite('full.png', temp)
        temp = draw_points(temp, limit_points, thickness=7, color=(0, 0, 255))
        cv2_imshow(temp)
        cv2.imwrite('limit_points.png', temp)
    if verbose:
        temp = draw_points(127 * mask, upper_contour, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, lower_contour, thickness=3)
        cv2_imshow(temp)
        cv2.imwrite('cropped.png', temp)
    return upper_contour, lower_contour


def cv2_imshow(images):
    if not isinstance(images, list):
        images = [images]
    num_images = len(images)
    # Show a single image directly with imshow
    if num_images == 1:
        image = images[0]
        if len(image.shape) == 3 and image.shape[2] == 3:  # color (RGB) image
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            plt.imshow(image_rgb)
        else:  # grayscale image
            plt.imshow(image, cmap='gray')
        plt.axis("off")
        plt.show()
    else:
        # Show multiple images stacked in a single column
        fig, ax = plt.subplots(num_images, 1, figsize=(4, 4 * num_images))
        for i in range(num_images):
            image = images[i]
            if len(image.shape) == 3 and image.shape[2] == 3:  # color (RGB) image
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                ax[i].imshow(image_rgb)
            else:  # grayscale image
                ax[i].imshow(image, cmap='gray')
            ax[i].axis("off")
        plt.tight_layout()
        plt.show()


def to_color(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return image
    return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)


def to_gray(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return image


def apply_clahe(image, clip_limit=2.0, tile_grid_size=(8, 8)):
    # Convert the image to grayscale if it's a color image
    if len(image.shape) == 3:
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray_image = image
    # Create a CLAHE object
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    # Apply CLAHE to the grayscale image
    equalized_image = clahe.apply(gray_image)
    return equalized_image


def detect_edge(image, minVal=100, maxVal=200, blur_size=(5, 5)):
    image_gray = to_gray(image)
    blurred_image = cv2.GaussianBlur(image_gray, blur_size, 0)
    # Detect edges with the Canny algorithm
    edges = cv2.Canny(blurred_image, minVal, maxVal)
    return edges


def show_mask2(image, mask, label2color={1: (255, 255, 0), 2: (0, 255, 255)}, alpha=0.1):
    # Build a mask image from the label mask and the label-to-color mapping
    image = to_color(image)
    mask_image = np.zeros_like(image)
    for label, color in label2color.items():
        mask_image[mask == label] = color
    mask_image = cv2.addWeighted(image, 1 - alpha, mask_image, alpha, 0)
    # Display the image and the mask side by side
    fig, ax = plt.subplots(1, 2, figsize=(10, 5))
    ax[0].imshow(image)
    ax[0].set_title("Image")
    ax[0].axis("off")
    ax[1].imshow(mask_image)
    ax[1].set_title("Mask")
    ax[1].axis("off")
    plt.show()


def combine_mask(image, mask, label2color={1: (255, 255, 0), 2: (0, 255, 255)}, alpha=0.1):
    image = to_color(image)
    mask_image = np.zeros_like(image)
    for label, color in label2color.items():
        mask_image[mask == label] = color
    mask_image = cv2.addWeighted(image, 1 - alpha, mask_image, alpha, 0)
    return mask_image


## Helper functions

def draw_points(image, points, color=None, random_color=False, same=True, thickness=1):
    if color is None and not random_color:
        color = (0, 255, 0)  # default color: green (BGR)
    if random_color:
        color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
    image = to_color(image)
    for point in points:
        if random_color and not same:
            color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        x, y = point
        image = cv2.circle(image, (x, y), thickness, color, -1)  # draw the point on the image
    return image
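# Hedged sketch: draw_points above (and draw_lines next) expect (x, y) pixel
# coordinates and return a BGR color copy of the input, e.g. on a blank canvas:
#
#   # canvas = np.zeros((100, 100), dtype=np.uint8)
#   # canvas = draw_points(canvas, [(10, 10), (50, 50)], thickness=3)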
def draw_lines(image, pairs, color=None, random_color=False, same=True, thickness=1):
    image_with_line = to_color(np.copy(image))
    if color is None and not random_color:
        color = (0, 255, 0)  # default color: green (BGR)
    if random_color:
        color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
    # Draw a line segment for each pair of points
    for pair in pairs:
        if random_color and not same:
            color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        start_point = pair[0]
        end_point = pair[1]
        image_with_line = cv2.line(image_with_line, start_point, end_point, color, thickness)
        image_with_line = cv2.circle(image_with_line, start_point, thickness + 1, color, -1)
        image_with_line = cv2.circle(image_with_line, end_point, thickness + 1, color, -1)
    return image_with_line


def detect_limit_points(mask, verbose=0):
    # Find the lateral limit points on both sides of the knee joint.
    h, w = mask.shape
    res = []
    left_slice = slice(0, w // 2)
    right_slice = slice(w // 2, None)
    center_slice = slice(int(0.2 * h), int(0.8 * h))
    left = np.zeros_like(mask)
    left[center_slice, left_slice] = mask[center_slice, left_slice]
    right = np.zeros_like(mask)
    right[center_slice, right_slice] = mask[center_slice, right_slice]
    if verbose:
        cv2_imshow([left, right])

    # For each bone (label 1 and 2) and each side, take the pixel farthest
    # from the opposite image corner; points are returned as (x, y).
    pivot = np.array([0, w])
    coords = np.argwhere(left == 1)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([0, 0])
    coords = np.argwhere(right == 1)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([h, w])
    coords = np.argwhere(left == 2)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([h, 0])
    coords = np.argwhere(right == 2)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    if verbose:
        cv2_imshow(draw_points(127 * mask, res))
    return res


def center(contour):
    # Return the middle point of the contour (by index).
    idx = len(contour) // 2
    return contour[idx]


def pooling_array(array, n, mode='mean'):
    if mode == 'mean':
        pool = np.mean
    elif mode == 'min':
        pool = np.min
    elif mode == 'sum':
        pool = np.sum
    if n == 1:
        return pool(array)
    array_length = len(array)
    if array_length < n:
        return array
    segment_length = array_length // n
    remaining_elements = array_length % n
    if remaining_elements == 0:
        segments = np.split(array, n)
    else:
        # The first `remaining_elements` segments get one extra element.
        mid = remaining_elements * (segment_length + 1)
        segments = np.split(array[:mid], remaining_elements)
        segments += np.split(array[mid:], n - remaining_elements)
    segments = [pool(segment) for segment in segments]
    return np.array(segments)
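# Worked example for pooling_array: six values pooled into n=3 segments of two
# elements each (the numbers follow directly from the definition above):
#
#   # pooling_array(np.array([1, 2, 3, 4, 5, 6]), 3, mode='mean')
#   # -> array([1.5, 3.5, 5.5])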
def distance(mask, upper_contour, lower_contour, p=0.12, verbose=0):
    # Measure the vertical gap between the contours on either side of a
    # central strip of relative width p (the strip itself is excluded).
    x_center = (center(lower_contour)[0] + center(upper_contour)[0]) // 2
    length = (lower_contour[-1, 0] - lower_contour[0, 0] +
              upper_contour[-1, 0] - upper_contour[0, 0]) / 2
    crop_length = int(p * length)
    left = x_center - crop_length // 2
    right = x_center + crop_length // 2
    x_min = max(lower_contour[0, 0], upper_contour[0, 0])
    x_max = min(lower_contour[-1, 0], upper_contour[-1, 0])
    left_lower_contour = lower_contour[(lower_contour[:, 0] <= left) & (lower_contour[:, 0] >= x_min)]
    right_lower_contour = lower_contour[(lower_contour[:, 0] >= right) & (lower_contour[:, 0] <= x_max)][::-1]
    left_upper_contour = upper_contour[(upper_contour[:, 0] <= left) & (upper_contour[:, 0] >= x_min)]
    right_upper_contour = upper_contour[(upper_contour[:, 0] >= right) & (upper_contour[:, 0] <= x_max)][::-1]
    if verbose == 1:
        temp = draw_points(mask * 127, left_lower_contour, color=(0, 255, 0), thickness=3)
        temp = draw_points(temp, right_lower_contour, color=(0, 255, 0), thickness=3)
        temp = draw_points(temp, left_upper_contour, color=(255, 0, 0), thickness=3)
        temp = draw_points(temp, right_upper_contour, color=(255, 0, 0), thickness=3)
        cv2_imshow(temp)
        cv2.imwrite('center_cropped.png', temp)
    links = list(zip(left_upper_contour, left_lower_contour)) + \
            list(zip(right_upper_contour, right_lower_contour))
    temp = left_upper_contour, right_upper_contour, left_lower_contour, right_lower_contour
    return (left_lower_contour[:, 1] - left_upper_contour[:, 1],
            right_lower_contour[:, 1] - right_upper_contour[:, 1],
            links, temp)


def getMiddle(mask, contour, verbose=0):
    X = contour[:, 0].reshape(-1, 1)
    y = contour[:, 1]
    reg = LinearRegression().fit(X, y)
    i_min = np.argmin(y[int(len(y) * 0.2):int(len(y) * 0.8)]) + int(len(y) * 0.2)
    left = i_min - 1
    right = i_min + 1
    left_check = False
    right_check = False
    if verbose == 1:
        cmask = draw_points(mask, contour, thickness=2, color=(255, 0, 0))
        cmask = draw_points(cmask, np.hstack([X, reg.predict(X).reshape(-1, 1).astype('int')]))
        cv2_imshow(cmask)
        plt.show()
    while True:
        # Walk outwards until the contour crosses the regression line on each side.
        while not left_check:
            if y[left] > reg.predict(X[left].reshape(-1, 1)):
                break
            left -= 1
        while not right_check:
            if y[right] > reg.predict(X[right].reshape(-1, 1)):
                break
            right += 1
        if verbose == 1:
            cmask = draw_points(cmask, [contour[left]], thickness=10, color=(255, 255, 0))
            cmask = draw_points(cmask, [contour[right]], thickness=7, color=(255, 0, 255))
            cv2_imshow(cmask)
            plt.show()
        left_min = np.argmin(y[int(len(y) * 0.2):left]) + int(len(y) * 0.2) if int(len(y) * 0.2) < left else left
        right_min = np.argmin(y[right:int(len(y) * 0.8)]) + right if right < int(len(y) * 0.8) else right
        if y[left_min] > reg.predict(X[left_min].reshape(-1, 1)):
            left_check = True
        if y[right_min] > reg.predict(X[right_min].reshape(-1, 1)):
            right_check = True
        if right_check and left_check:
            break
        left = left_min - 1
        right = right_min + 1
    return min(X.flatten()[left], X.flatten()[right]), max(X.flatten()[left], X.flatten()[right])


def get_JSW(mask, dim=None, pool='mean', p=0.3, verbose=0):
    if isinstance(mask, str):
        mask = cv2.imread(mask, 0)
    if mask is None:
        return np.zeros(10), np.zeros(10)
    uc, lc = get_contours(mask, verbose=verbose)
    left_distances, right_distances, links, contours = distance(mask, uc, lc, p=p, verbose=verbose)
    if verbose:
        print('in get_JSW')
        temp = draw_points(mask * 127, contours[0], thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, contours[1], thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, contours[2], thickness=3, color=(0, 255, 0))
        temp = draw_points(temp, contours[3], thickness=3, color=(0, 255, 0))
        temp = draw_lines(temp, links[::6], color=(0, 0, 255))
        cv2_imshow(temp)
        cv2.imwrite("drawn_lines.png", temp)
    if dim:
        left_distances = pooling_array(left_distances, dim, pool)
        right_distances = pooling_array(right_distances, dim, pool)
    return left_distances, right_distances
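# Hedged usage sketch: the per-side joint-space-width profiles can be pooled
# into fixed-length feature vectors. 'mask.png' is a hypothetical label mask
# (0 = background, 1 = upper bone, 2 = lower bone):
#
#   # left_jsw, right_jsw = get_JSW('mask.png', dim=10, pool='min')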
def seg(img_path, model=seg_model, verbose=0, combine=False):
    # Decode the uploaded file buffer into a BGR image, then convert to grayscale.
    img = cv2.imdecode(np.frombuffer(img_path.read(), np.uint8), 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    eimg = cv2.equalizeHist(img)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    eimg = clahe.apply(eimg)
    eimg = to_color(eimg)
    res = model(eimg, verbose=False)
    # Merge the two instance masks into a single label mask (class id + 1).
    mask = res[0].masks.data[0] * (res[0].boxes.cls[0] + 1) + \
           res[0].masks.data[1] * (res[0].boxes.cls[1] + 1)
    mask = mask.cpu().numpy()
    if verbose == 1:
        cv2_imshow(eimg)
        cv2.imwrite('original.png', eimg)
        cv2_imshow(combine_mask(eimg, mask))
        plt.show()
    if combine:
        mask = combine_mask(eimg, mask)
    return mask


def split_img(img):
    # Return the left and right thirds of the image (the middle third is dropped).
    img_size = img.shape
    return img[:, :(img_size[1] // 3), :], img[:, (img_size[1] // 3 * 2):, :]


def combine_mask(image, mask, label2color={1: (255, 255, 0), 2: (0, 255, 255)}, alpha=0.1):
    # Overrides the earlier definition: additionally resizes the image to the
    # mask resolution (cv2.resize expects (width, height)).
    image = to_color(image)
    image = cv2.resize(image, mask.shape[::-1])
    mask_image = np.zeros_like(image)
    for label, color in label2color.items():
        mask_image[mask == label] = color
    mask_image = cv2.addWeighted(image, 1 - alpha, mask_image, alpha, 0)
    return mask_image


def check_outliers(mask):
    pass


def find_boundaries_v2(mask, top=True, verbose=0):
    height, width = mask.shape
    contours, _ = cv2.findContours(255 * mask.astype(np.uint8),
                                   cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    areas = np.array([cv2.contourArea(cnt) for cnt in contours])
    contour = contours[areas.argmax()]
    contour = contour.reshape(-1, 2)
    org_contour = contour.copy()
    # Pick start/end points where the contour crosses its vertical midline.
    pos = (contour[:, 1].max() + contour[:, 1].min()) // 2
    idx = np.where(contour[:, 1] == pos)
    if contour[idx[0][0]][0] < contour[idx[0][1]][0] and not top:
        start = contour[idx[0][0]]
        end = contour[idx[0][1]]
    else:
        end = contour[idx[0][0]]
        start = contour[idx[0][1]]
    start_idx = ((start - contour) ** 2).sum(axis=-1).argmin()
    end_idx = ((end - contour) ** 2).sum(axis=-1).argmin()
    if start_idx <= end_idx:
        contour = contour[start_idx:end_idx + 1]
    else:
        contour = np.concatenate([contour[start_idx:], contour[:end_idx + 1]])
    if verbose:
        temp = draw_points(127 * mask.astype(np.uint8), contour, thickness=5)
        temp = draw_points(temp, [start, end], color=[155, 155], thickness=15)
        cv2_imshow(temp)
    return np.array(contour), np.array(org_contour)


def get_contours_v2(mask, verbose=0):
    upper_contour, full_upper = find_boundaries_v2(mask == 1, top=False, verbose=verbose)
    lower_contour, full_lower = find_boundaries_v2(mask == 2, top=True, verbose=verbose)
    if verbose:
        temp = draw_points(127 * mask, full_upper, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, full_lower, thickness=3)
        plt.imshow(temp)
        plt.title("Segmentation")
        plt.axis('off')
        plt.show()
        st.pyplot()
    if verbose:
        temp = draw_points(127 * mask, upper_contour, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, lower_contour, thickness=3)
        cv2_imshow(temp)
    return upper_contour, lower_contour
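# Hedged sketch of the segmentation step: seg() consumes a binary file-like
# object (e.g. a Streamlit UploadedFile), equalizes it, and returns the YOLO
# label mask at its native 640x640 resolution (class 1 is treated as the upper
# bone, class 2 as the lower). The *_v2 contour helpers then operate directly
# on that mask:
#
#   # with open('knee.png', 'rb') as f:  # hypothetical local file
#   #     mask = seg(f)
#   # upper, lower = get_contours_v2(mask)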
def normalize_tuple(value, n, name, allow_zero=False):
    """Transforms non-negative/positive integer/integers into an integer tuple.

    Args:
        value: The value to validate and convert. Could be an int, or any
            iterable of ints.
        n: The size of the tuple to be returned.
        name: The name of the argument being validated, e.g. "strides" or
            "kernel_size". This is only used to format error messages.
        allow_zero: Defaults to False. A ValueError will be raised if zero is
            received and this param is False.

    Returns:
        A tuple of n integers.

    Raises:
        ValueError: If anything other than an int/long or an iterable thereof,
            or a negative value, is passed.
    """
    error_msg = (
        f"The `{name}` argument must be a tuple of {n} "
        f"integers. Received: {value}"
    )
    if isinstance(value, int):
        value_tuple = (value,) * n
    else:
        try:
            value_tuple = tuple(value)
        except TypeError:
            raise ValueError(error_msg)
        if len(value_tuple) != n:
            raise ValueError(error_msg)
        for single_value in value_tuple:
            try:
                int(single_value)
            except (ValueError, TypeError):
                error_msg += (
                    f"including element {single_value} of "
                    f"type {type(single_value)}"
                )
                raise ValueError(error_msg)
    if allow_zero:
        unqualified_values = {v for v in value_tuple if v < 0}
        req_msg = ">= 0"
    else:
        unqualified_values = {v for v in value_tuple if v <= 0}
        req_msg = "> 0"
    if unqualified_values:
        error_msg += (
            f" including {unqualified_values}"
            f" that does not satisfy the requirement `{req_msg}`."
        )
        raise ValueError(error_msg)
    return value_tuple


def adjust_pretrained_weights(model_cls, input_size, name=None):
    # Collapse the RGB kernels of the first conv layer into one channel so
    # ImageNet weights can be reused with single-channel inputs.
    weights_model = model_cls(weights='imagenet', include_top=False, input_shape=(*input_size, 3))
    target_model = model_cls(weights=None, include_top=False, input_shape=(*input_size, 1))
    weights = weights_model.get_weights()
    weights[0] = np.sum(weights[0], axis=2, keepdims=True)
    target_model.set_weights(weights)
    del weights_model
    tf.keras.backend.clear_session()
    gc.collect()
    if name:
        target_model._name = name
    return target_model


from keras import backend as K


def squeeze_excite_block(input, ratio=16):
    '''Create a channel-wise squeeze-excite block

    Args:
        input: input tensor
        ratio: reduction ratio of the bottleneck Dense layer

    Returns: a Keras tensor

    References
    -   [Squeeze and Excitation Networks](https://arxiv.org/abs/1709.01507)
    '''
    init = input
    channel_axis = 1 if K.image_data_format() == "channels_first" else -1
    filters = int(init.shape[channel_axis])
    se_shape = (1, 1, filters)

    se = GlobalAveragePooling2D()(init)
    se = Reshape(se_shape)(se)
    se = Dense(filters // ratio, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)

    if K.image_data_format() == 'channels_first':
        se = Permute((3, 1, 2))(se)

    x = Multiply()([init, se])
    return x


def spatial_squeeze_excite_block(input):
    '''Create a spatial squeeze-excite block

    Args:
        input: input tensor

    Returns: a Keras tensor

    References
    -   [Concurrent Spatial and Channel Squeeze & Excitation in Fully Convolutional Networks](https://arxiv.org/abs/1803.02579)
    '''
    se = Conv2D(1, (1, 1), activation='sigmoid', use_bias=False,
                kernel_initializer='he_normal')(input)
    x = Multiply()([input, se])
    return x


def channel_spatial_squeeze_excite(input, ratio=16):
    '''Create a concurrent channel and spatial squeeze-excite block

    Args:
        input: input tensor
        ratio: reduction ratio of the channel squeeze-excite branch

    Returns: a Keras tensor

    References
    -   [Squeeze and Excitation Networks](https://arxiv.org/abs/1709.01507)
    -   [Concurrent Spatial and Channel Squeeze & Excitation in Fully Convolutional Networks](https://arxiv.org/abs/1803.02579)
    '''
    cse = squeeze_excite_block(input, ratio)
    sse = spatial_squeeze_excite_block(input)
    x = Add()([cse, sse])
    return x
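# Hedged shape sketch for the squeeze-excite helpers above (only the shapes
# are being illustrated; `se_demo` is a hypothetical model name):
#
#   # inp = Input(shape=(28, 28, 64))
#   # out = channel_spatial_squeeze_excite(inp, ratio=16)  # still (28, 28, 64)
#   # se_demo = Model(inp, out)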
def DoubleConv(filters, kernel_size, initializer='glorot_uniform'):
    def layer(x):
        x = Conv2D(filters, kernel_size, padding='same', kernel_initializer=initializer)(x)
        x = BatchNormalization()(x)
        x = Activation('swish')(x)
        x = Conv2D(filters, kernel_size, padding='same', kernel_initializer=initializer)(x)
        x = BatchNormalization()(x)
        x = Activation('swish')(x)
        return x
    return layer


def UpSampling2D_block(filters, kernel_size=(3, 3), upsample_rate=(2, 2), interpolation='bilinear',
                       initializer='glorot_uniform', skip=None):
    def layer(input_tensor):
        x = UpSampling2D(size=upsample_rate, interpolation=interpolation)(input_tensor)
        if skip is not None:
            x = Concatenate()([x, skip])
        x = DoubleConv(filters, kernel_size, initializer=initializer)(x)
        x = channel_spatial_squeeze_excite(x)
        return x
    return layer


def Conv2DTranspose_block(filters, transpose_kernel_size=(3, 3), upsample_rate=(2, 2),
                          initializer='glorot_uniform', skip=None, met_input=None, sat_input=None):
    def layer(input_tensor):
        x = Conv2DTranspose(filters, transpose_kernel_size, strides=upsample_rate,
                            padding='same')(input_tensor)
        if skip is not None:
            x = Concatenate()([x, skip])
        x = DoubleConv(filters, transpose_kernel_size, initializer=initializer)(x)
        x = channel_spatial_squeeze_excite(x)
        return x
    return layer


def PixelShuffle_block(filters, kernel_size=(3, 3), upsample_rate=2, initializer='glorot_uniform',
                       skip=None, met_input=None, sat_input=None):
    def layer(input_tensor):
        x = Conv2D(filters * (upsample_rate ** 2), kernel_size, padding="same",
                   activation="swish", kernel_initializer='Orthogonal')(input_tensor)
        x = tf.nn.depth_to_space(x, upsample_rate)
        if skip is not None:
            x = Concatenate()([x, skip])
        x = DoubleConv(filters, kernel_size, initializer=initializer)(x)
        x = channel_spatial_squeeze_excite(x)
        return x
    return layer
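# Note on PixelShuffle_block above: the Conv2D first multiplies the channel
# count by upsample_rate**2, then tf.nn.depth_to_space trades those channels
# for spatial resolution, e.g. (1, 7, 7, 32 * 4) -> (1, 14, 14, 32) when
# upsample_rate=2.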
class DropBlockNoise(BaseRandomLayer):
    def __init__(self, rate, block_size, seed=None, **kwargs):
        super().__init__(seed=seed, **kwargs)
        if not 0.0 <= rate <= 1.0:
            raise ValueError(
                f"rate must be a number between 0 and 1. Received: {rate}"
            )
        self._rate = rate
        (
            self._dropblock_height,
            self._dropblock_width,
        ) = normalize_tuple(value=block_size, n=2, name="block_size", allow_zero=False)
        self.seed = seed

    def call(self, x, training=None):
        if not training or self._rate == 0.0:
            return x

        _, height, width, _ = tf.split(tf.shape(x), 4)
        # Unnest scalar values
        height = tf.squeeze(height)
        width = tf.squeeze(width)

        dropblock_height = tf.math.minimum(self._dropblock_height, height)
        dropblock_width = tf.math.minimum(self._dropblock_width, width)

        gamma = (
            self._rate
            * tf.cast(width * height, dtype=tf.float32)
            / tf.cast(dropblock_height * dropblock_width, dtype=tf.float32)
            / tf.cast(
                (width - self._dropblock_width + 1)
                * (height - self._dropblock_height + 1),
                tf.float32,
            )
        )

        # Forces the block to be inside the feature map.
        w_i, h_i = tf.meshgrid(tf.range(width), tf.range(height))
        valid_block = tf.logical_and(
            tf.logical_and(
                w_i >= int(dropblock_width // 2),
                w_i < width - (dropblock_width - 1) // 2,
            ),
            tf.logical_and(
                h_i >= int(dropblock_height // 2),
                h_i < height - (dropblock_height - 1) // 2,
            ),
        )
        valid_block = tf.reshape(valid_block, [1, height, width, 1])

        random_noise = self._random_generator.random_uniform(tf.shape(x), dtype=tf.float32)
        valid_block = tf.cast(valid_block, dtype=tf.float32)
        seed_keep_rate = tf.cast(1 - gamma, dtype=tf.float32)
        block_pattern = (1 - valid_block + seed_keep_rate + random_noise) >= 1
        block_pattern = tf.cast(block_pattern, dtype=tf.float32)

        window_size = [1, self._dropblock_height, self._dropblock_width, 1]
        # Double negation with max_pool is effectively min-pooling.
        block_pattern = -tf.nn.max_pool(
            -block_pattern,
            ksize=window_size,
            strides=[1, 1, 1, 1],
            padding="SAME",
        )
        return x * tf.cast(block_pattern, x.dtype)


def get_efficient_unet(name=None, option='full', input_shape=(IMAGE_SIZE, IMAGE_SIZE, 1),
                       encoder_weights=None, block_type='conv-transpose',
                       output_activation='sigmoid', kernel_initializer='glorot_uniform'):
    if encoder_weights == 'imagenet':
        encoder = adjust_pretrained_weights(EfficientNetV2S, input_shape[:-1], name)
    elif encoder_weights is None:
        encoder = EfficientNetV2S(weights=None, include_top=False, input_shape=input_shape)
        encoder._name = name
    else:
        raise ValueError(encoder_weights)
    if option == 'encoder':
        return encoder

    # Skip connections from the encoder's MBConv stages.
    MBConvBlocks = []
    skip_candidates = ['1b', '2d', '3d', '4f']
    for mbblock_nr in skip_candidates:
        mbblock = encoder.get_layer('block{}_add'.format(mbblock_nr)).output
        MBConvBlocks.append(mbblock)
    head = encoder.get_layer('top_activation').output
    blocks = MBConvBlocks + [head]

    if block_type == 'upsampling':
        UpBlock = UpSampling2D_block
    elif block_type == 'conv-transpose':
        UpBlock = Conv2DTranspose_block
    elif block_type == 'pixel-shuffle':
        UpBlock = PixelShuffle_block
    else:
        raise ValueError(block_type)

    o = blocks.pop()
    o = UpBlock(512, initializer=kernel_initializer, skip=blocks.pop())(o)
    o = UpBlock(256, initializer=kernel_initializer, skip=blocks.pop())(o)
    o = UpBlock(128, initializer=kernel_initializer, skip=blocks.pop())(o)
    o = UpBlock(64, initializer=kernel_initializer, skip=blocks.pop())(o)
    o = UpBlock(32, initializer=kernel_initializer, skip=None)(o)
    o = Conv2D(input_shape[-1], (1, 1), padding='same', activation=output_activation,
               kernel_initializer=kernel_initializer)(o)
    model = Model(encoder.input, o, name=name)
    if option == 'full':
        return model, encoder
    elif option == 'model':
        return model
    else:
        raise ValueError(option)


def acc(y_true, y_pred, threshold=0.5):
    threshold = tf.cast(threshold, y_pred.dtype)
    y_pred = tf.cast(y_pred > threshold, y_pred.dtype)
    return tf.reduce_mean(tf.cast(tf.equal(y_true, y_pred), tf.float32))


def mae(y_true, y_pred):
    return tf.reduce_mean(tf.abs(y_true - y_pred))


def inv_ssim(y_true, y_pred):
    return 1 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, 1.0))


def inv_msssim(y_true, y_pred):
    return 1 - tf.reduce_mean(tf.image.ssim_multiscale(y_true, y_pred, 1.0, filter_size=4))


def inv_msssim_l1(y_true, y_pred, alpha=0.8):
    return alpha * inv_msssim(y_true, y_pred) + (1 - alpha) * mae(y_true, y_pred)


def inv_msssim_gaussian_l1(y_true, y_pred, alpha=0.8):
    l1_diff = tf.abs(y_true - y_pred)
    gaussian_l1 = tfa.image.gaussian_filter2d(l1_diff, filter_shape=(11, 11), sigma=1.5)
    return alpha * inv_msssim(y_true, y_pred) + (1 - alpha) * gaussian_l1


def psnr(y_true, y_pred):
    return tf.reduce_mean(tf.image.psnr(y_true, y_pred, 1.0))
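# Hedged sanity check for the reconstruction metrics above: identical inputs
# give zero loss for mae/inv_ssim (and infinite PSNR), e.g.
#
#   # a = tf.random.uniform((1, 64, 64, 1))
#   # mae(a, a).numpy()       # 0.0
#   # inv_ssim(a, a).numpy()  # 0.0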
class MultipleTrackers:
    # Fan any method call out to every wrapped object (callback lists, metric
    # trackers, ...). __getattr__ is only invoked when normal attribute lookup
    # fails, so it never shadows real attributes.
    def __init__(self, callback_lists: list):
        self.callbacks_list = callback_lists

    def __getattr__(self, attr):
        def helper(*args, **kwargs):
            for cb in self.callbacks_list:
                getattr(cb, attr)(*args, **kwargs)
        return helper
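# Hedged sketch: MultipleTrackers fans any method call out to every wrapped
# object, so one call can drive both Keras CallbackLists at once:
#
#   # trackers = MultipleTrackers([gen_callbacks, disc_callbacks])
#   # trackers.on_epoch_begin(0, logs={})  # forwarded to both lists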
class DCGAN:
    def __init__(self, input_shape=(IMAGE_SIZE, IMAGE_SIZE, 1), architecture='two-stage',
                 pretrain_weights=None, output_activation='sigmoid', block_type='conv-transpose',
                 kernel_initializer='glorot_uniform', noise=None, C=1.):
        self.C = C
        # Build
        kwargs = dict(input_shape=input_shape, output_activation=output_activation,
                      encoder_weights=pretrain_weights, block_type=block_type,
                      kernel_initializer=kernel_initializer)
        if architecture == 'two-stage':
            encoder = get_efficient_unet(name='dcgan_disc', option='encoder', **kwargs)
            self.generator = get_efficient_unet(name='dcgan_gen', option='model', **kwargs)
        elif architecture == 'shared':
            self.generator, encoder = get_efficient_unet(name='dcgan', option='full', **kwargs)
        else:
            raise ValueError(f'Unsupported architecture: {architecture}')
        gpooling = GlobalAveragePooling2D()(encoder.output)
        prediction = Dense(1, activation='sigmoid')(gpooling)
        self.discriminator = Model(encoder.input, prediction, name='dcgan_disc')
        tf.keras.backend.clear_session()
        _ = gc.collect()

        if noise:
            # Corrupt the generator inputs with the given noise layer.
            gen_inputs = self.generator.input
            corrupted_inputs = noise(gen_inputs)
            outputs = self.generator(corrupted_inputs)
            self.generator = Model(gen_inputs, outputs, name='dcgan_gen')
            tf.keras.backend.clear_session()
            _ = gc.collect()

        if output_activation == 'tanh':
            # Map [0, 1] inputs to [-1, 1] and back around both networks.
            self.process_input = layers.Lambda(lambda img: (img * 2. - 1.), name='dcgan_normalize')
            self.process_output = layers.Lambda(lambda img: (img * 0.5 + 0.5), name='dcgan_denormalize')
            gen_inputs = self.generator.input
            process_inputs = self.process_input(gen_inputs)
            process_inputs = self.generator(process_inputs)
            gen_outputs = self.process_output(process_inputs)
            self.generator = Model(gen_inputs, gen_outputs, name='dcgan_gen')
            disc_inputs = self.discriminator.input
            process_inputs = self.process_input(disc_inputs)
            disc_outputs = self.discriminator(process_inputs)
            self.discriminator = Model(disc_inputs, disc_outputs, name='dcgan_disc')
            tf.keras.backend.clear_session()
            _ = gc.collect()

    def summary(self):
        self.generator.summary()
        self.discriminator.summary()

    def compile(self,
                generator_optimizer=Adam(5e-4, 0.5),
                discriminator_optimizer=Adam(5e-4),
                reconstruction_loss=mae,
                discriminative_loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                reconstruction_metrics=[],
                discriminative_metrics=[]):
        self.discriminator_optimizer = discriminator_optimizer
        self.discriminator.compile(optimizer=self.discriminator_optimizer)
        self.generator_optimizer = generator_optimizer
        self.generator.compile(optimizer=self.generator_optimizer)
        self.loss = discriminative_loss
        self.reconstruction_loss = reconstruction_loss
        self.d_loss_tracker = tf.keras.metrics.Mean()
        self.g_loss_tracker = tf.keras.metrics.Mean()
        self.g_recon_tracker = tf.keras.metrics.Mean()
        self.g_disc_tracker = tf.keras.metrics.Mean()
        self.g_metric_trackers = [(tf.keras.metrics.Mean(), metric)
                                  for metric in reconstruction_metrics]
        self.d_metric_trackers = [(tf.keras.metrics.Mean(), tf.keras.metrics.Mean(),
                                   tf.keras.metrics.Mean(), metric)
                                  for metric in discriminative_metrics]
        all_trackers = [self.d_loss_tracker, self.g_loss_tracker,
                        self.g_recon_tracker, self.g_disc_tracker] + \
                       [tracker for tracker, _ in self.g_metric_trackers] + \
                       [tracker for t in self.d_metric_trackers for tracker in t[:-1]]
        self.all_trackers = MultipleTrackers(all_trackers)

    def discriminator_loss(self, real_output, fake_output):
        real_loss = self.loss(tf.ones_like(real_output), real_output)
        fake_loss = self.loss(tf.zeros_like(fake_output), fake_output)
        total_loss = 0.5 * (real_loss + fake_loss)
        return total_loss

    def generator_loss(self, fake_output):
        return self.loss(tf.ones_like(fake_output), fake_output)

    @tf.function
    def train_step(self, images):
        masked, original = images
        n_samples = tf.shape(original)[0]
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            generated_images = self.generator(masked, training=True)
            real_output = self.discriminator(original, training=True)
            fake_output = self.discriminator(generated_images, training=True)
            gen_disc_loss = self.generator_loss(fake_output)
            recon_loss = self.reconstruction_loss(original, generated_images)
            gen_loss = self.C * recon_loss + gen_disc_loss
            disc_loss = self.discriminator_loss(real_output, fake_output)
        gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        self.generator_optimizer.apply_gradients(
            zip(gradients_of_generator, self.generator.trainable_variables))
        self.discriminator_optimizer.apply_gradients(
            zip(gradients_of_discriminator, self.discriminator.trainable_variables))
        self.d_loss_tracker.update_state(tf.repeat([[disc_loss]], repeats=n_samples, axis=0))
        self.g_loss_tracker.update_state(tf.repeat([[gen_loss]], repeats=n_samples, axis=0))
        self.g_recon_tracker.update_state(tf.repeat([[recon_loss]], repeats=n_samples, axis=0))
        self.g_disc_tracker.update_state(tf.repeat([[gen_disc_loss]], repeats=n_samples, axis=0))
        logs = {'d_loss': self.d_loss_tracker.result()}
        for tracker, real_tracker, fake_tracker, metric in self.d_metric_trackers:
            v_real = metric(tf.ones_like(real_output), real_output)
            v_fake = metric(tf.zeros_like(fake_output), fake_output)
            v = 0.5 * (v_real + v_fake)
            tracker.update_state(tf.repeat([[v]], repeats=n_samples, axis=0))
            real_tracker.update_state(tf.repeat([[v_real]], repeats=n_samples, axis=0))
            fake_tracker.update_state(tf.repeat([[v_fake]], repeats=n_samples, axis=0))
            metric_name = metric.__name__
            logs['d_' + metric_name] = tracker.result()
            logs['d_real_' + metric_name] = real_tracker.result()
            logs['d_fake_' + metric_name] = fake_tracker.result()
        logs['g_loss'] = self.g_loss_tracker.result()
        logs['g_recon'] = self.g_recon_tracker.result()
        logs['g_disc'] = self.g_disc_tracker.result()
        for tracker, metric in self.g_metric_trackers:
            v = metric(original, generated_images)
            tracker.update_state(tf.repeat([[v]], repeats=n_samples, axis=0))
            logs['g_' + metric.__name__] = tracker.result()
        return logs
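    # Worked note on the losses used above: with binary cross-entropy,
    #   d_loss = 0.5 * (BCE(1, D(x)) + BCE(0, D(G(x_masked)))),
    # while the generator minimizes C * reconstruction_loss + BCE(1, D(G(x_masked))).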
    @tf.function
    def val_step(self, images):
        masked, original = images
        n_samples = tf.shape(original)[0]
        generated_images = self.generator(masked, training=False)
        real_output = self.discriminator(original, training=False)
        fake_output = self.discriminator(generated_images, training=False)
        gen_disc_loss = self.generator_loss(fake_output)
        recon_loss = self.reconstruction_loss(original, generated_images)
        gen_loss = self.C * recon_loss + gen_disc_loss
        disc_loss = self.discriminator_loss(real_output, fake_output)
        self.d_loss_tracker.update_state(tf.repeat([[disc_loss]], repeats=n_samples, axis=0))
        self.g_loss_tracker.update_state(tf.repeat([[gen_loss]], repeats=n_samples, axis=0))
        self.g_recon_tracker.update_state(tf.repeat([[recon_loss]], repeats=n_samples, axis=0))
        self.g_disc_tracker.update_state(tf.repeat([[gen_disc_loss]], repeats=n_samples, axis=0))
        logs = {'val_d_loss': self.d_loss_tracker.result()}
        for tracker, real_tracker, fake_tracker, metric in self.d_metric_trackers:
            v_real = metric(tf.ones_like(real_output), real_output)
            v_fake = metric(tf.zeros_like(fake_output), fake_output)
            v = 0.5 * (v_real + v_fake)
            tracker.update_state(tf.repeat([[v]], repeats=n_samples, axis=0))
            real_tracker.update_state(tf.repeat([[v_real]], repeats=n_samples, axis=0))
            fake_tracker.update_state(tf.repeat([[v_fake]], repeats=n_samples, axis=0))
            metric_name = metric.__name__
            logs['val_d_' + metric_name] = tracker.result()
            logs['val_d_real_' + metric_name] = real_tracker.result()
            logs['val_d_fake_' + metric_name] = fake_tracker.result()
        logs['val_g_loss'] = self.g_loss_tracker.result()
        logs['val_g_recon'] = self.g_recon_tracker.result()
        logs['val_g_disc'] = self.g_disc_tracker.result()
        for tracker, metric in self.g_metric_trackers:
            v = metric(original, generated_images)
            tracker.update_state(tf.repeat([[v]], repeats=n_samples, axis=0))
            logs['val_g_' + metric.__name__] = tracker.result()
        return logs
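    # Hedged usage sketch for the custom loop below, assuming tf.data pipelines
    # that yield (masked, original) image pairs scaled to [0, 1]:
    #
    #   # dcgan.compile(reconstruction_metrics=[psnr], discriminative_metrics=[acc])
    #   # histories = dcgan.fit(train_ds, val_ds, trainsize=steps_per_epoch, epochs=50)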
    def fit(self, trainset, valset=None, trainsize=-1, valsize=-1, epochs=1, display_per_epochs=5,
            generator_callbacks=[], discriminator_callbacks=[]):
        print('🌊🐉 Start Training 🐉🌊')
        gen_callback_tracker = tf.keras.callbacks.CallbackList(
            generator_callbacks, add_history=True, model=self.generator
        )
        disc_callback_tracker = tf.keras.callbacks.CallbackList(
            discriminator_callbacks, add_history=True, model=self.discriminator
        )
        callbacks_tracker = MultipleTrackers([gen_callback_tracker, disc_callback_tracker])
        logs = {}
        callbacks_tracker.on_train_begin(logs=logs)
        for epoch in range(epochs):
            print(f'Epoch {epoch + 1}/{epochs}:')
            callbacks_tracker.on_epoch_begin(epoch, logs=logs)
            batches = tqdm(trainset, desc="Train", total=trainsize, unit="step", position=0, leave=True)
            for batch, image_batch in enumerate(batches):
                callbacks_tracker.on_batch_begin(batch, logs=logs)
                callbacks_tracker.on_train_batch_begin(batch, logs=logs)
                train_logs = {k: v.numpy() for k, v in self.train_step(image_batch).items()}
                logs.update(train_logs)
                callbacks_tracker.on_train_batch_end(batch, logs=logs)
                callbacks_tracker.on_batch_end(batch, logs=logs)
                batches.set_postfix({'d_loss': train_logs['d_loss'],
                                     'g_loss': train_logs['g_loss']})
            # Presentation
            stats = ", ".join("{}={:.3g}".format(k, v) for k, v in logs.items()
                              if 'val_' not in k and 'loss' not in k)
            print('Train:', stats)
            batches.close()
            if valset:
                self.all_trackers.reset_state()
                batches = tqdm(valset, desc="Valid", total=valsize, unit="step", position=0, leave=True)
                for batch, image_batch in enumerate(batches):
                    callbacks_tracker.on_batch_begin(batch, logs=logs)
                    callbacks_tracker.on_test_batch_begin(batch, logs=logs)
                    val_logs = {k: v.numpy() for k, v in self.val_step(image_batch).items()}
                    logs.update(val_logs)
                    callbacks_tracker.on_test_batch_end(batch, logs=logs)
                    callbacks_tracker.on_batch_end(batch, logs=logs)
                    # Presentation
                    batches.set_postfix({'val_d_loss': val_logs['val_d_loss'],
                                         'val_g_loss': val_logs['val_g_loss']})
                stats = ", ".join("{}={:.3g}".format(k, v) for k, v in logs.items()
                                  if 'val_' in k and 'loss' not in k)
                print('Valid:', stats)
                batches.close()
            if epoch % display_per_epochs == 0:
                print('-' * 128)
                self.visualize_samples((image_batch[0][:2], image_batch[1][:2]))
            self.all_trackers.reset_state()
            callbacks_tracker.on_epoch_end(epoch, logs=logs)
            # tf.keras.backend.clear_session()
            _ = gc.collect()
            if self.generator.stop_training or self.discriminator.stop_training:
                break
            print('-' * 128)
        callbacks_tracker.on_train_end(logs=logs)
        tf.keras.backend.clear_session()
        _ = gc.collect()
        # Split the shared log history into generator and discriminator parts.
        gen_history = None
        for cb in gen_callback_tracker:
            if isinstance(cb, tf.keras.callbacks.History):
                gen_history = cb
                gen_history.history = {k: v for k, v in cb.history.items() if 'd_' not in k}
        disc_history = None
        for cb in disc_callback_tracker:
            if isinstance(cb, tf.keras.callbacks.History):
                disc_history = cb
                disc_history.history = {k: v for k, v in cb.history.items() if 'g_' not in k}
        return {'generator': gen_history, 'discriminator': disc_history}

    def visualize_samples(self, samples, figsize=(12, 2)):
        x, y = samples
        y_pred = self.generator.predict(x[:2], verbose=0)
        fig, axs = plt.subplots(1, 6, figsize=figsize)
        for i in range(2):
            pos = 3 * i
            axs[pos].imshow(x[i], cmap='gray', vmin=0., vmax=1.)
            axs[pos].set_title('Masked')
            axs[pos].axis('off')
            axs[pos + 1].imshow(y[i], cmap='gray', vmin=0., vmax=1.)
            axs[pos + 1].set_title('Original')
            axs[pos + 1].axis('off')
            axs[pos + 2].imshow(y_pred[i], cmap='gray', vmin=0., vmax=1.)
            axs[pos + 2].set_title('Predicted')
            axs[pos + 2].axis('off')
        plt.show()
        # tf.keras.backend.clear_session()
        del y_pred
        _ = gc.collect()


dcgan = DCGAN(input_shape=(IMAGE_SIZE, IMAGE_SIZE, 1),
              architecture='two-stage',
              output_activation='sigmoid',
              noise=DropBlockNoise(rate=0.1, block_size=16),
              pretrain_weights=None,
              block_type='pixel-shuffle',
              kernel_initializer='glorot_uniform',
              C=1.)
restore_model = dcgan.generator
restore_model.load_weights("./weights_gae/gan_efficientunet_full_augment-hist_equal_generator.h5")
restore_model.trainable = False


def show_image(image, title='Image', cmap_type='gray'):
    plt.imshow(image, cmap=cmap_type)
    plt.title(title)
    plt.axis('off')
    plt.show()


# Invert images whose gray levels are reversed (negative-style films).
def remove_negative(img):
    outside = np.mean(img[:, 0])
    inside = np.mean(img[:, int(IMAGE_SIZE / 2)])
    if outside < inside:
        return img
    else:
        return 1 - img


# Preprocessing options: raw image, histogram equalization, CLAHE.
def preprocess(img):
    img = remove_negative(img)
    img = exposure.equalize_hist(img)
    img = exposure.equalize_adapthist(img)
    img = exposure.equalize_hist(img)
    return img


# Dilate the contour (the zero-valued pixels) with a large square kernel.
def dilate(mask_img):
    kernel_size = 2 * 22 + 1
    kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8)
    return ndimage.binary_dilation(mask_img == 0, structure=kernel)
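# Hedged note on the app flow below: the frozen generator is used as an
# inpainter. A dilated band around the detected joint-space contours is blanked
# out of the preprocessed radiograph, the generator reconstructs the band, and
# |original - reconstruction| restricted to that band is displayed as the
# anomaly map. Illustrative call, assuming `img` is a (224, 224) float array
# in [0, 1]:
#
#   # batch = tf.convert_to_tensor(img[None, ..., None], dtype=tf.float32)
#   # recon = restore_model(batch)[0, ..., 0]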
# App title
st.title("Upload and display an image")

# Image file-upload widget
uploaded_file = st.file_uploader("Choose an image file", type=["jpg", "jpeg", "png"])

if uploaded_file is not None:
    # Segment the uploaded radiograph (this consumes the upload buffer)
    mask = seg(uploaded_file)

    # Use Matplotlib to read the image for display/processing; rewind the
    # buffer first, since seg() has already read it to the end.
    uploaded_file.seek(0)
    img = plt.imread(uploaded_file)
    img = np.array(Image.fromarray(img).resize((224, 224)))
    img = preprocess(img)

    # Show the original image
    show_image(img, title="Original image")
    plt.axis('off')
    st.pyplot()

    # Rasterize the detected contours into a 640x640 canvas, then downscale.
    uc, lc = get_contours_v2(mask, verbose=1)
    mask = np.zeros((640, 640)).astype('uint8')
    mask = draw_points(mask, lc, thickness=1, color=(255, 255, 255))
    mask = draw_points(mask, uc, thickness=1, color=(255, 255, 255))
    mask = cv2.resize(mask, (224, 224), interpolation=cv2.INTER_NEAREST)
    mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    mask = mask / 255.

    show_image(mask, title="Contour")
    plt.axis('off')
    st.pyplot()

    # Using histogram equalization
    mask = 1 - mask
    dilated = gaussian(dilate(mask), sigma=50, truncate=0.3)
    im = np.expand_dims(img * (1 - dilated), axis=0)
    im = tf.convert_to_tensor(im, dtype=tf.float32)
    restored_img = restore_model(im[..., None])  # add the channel axis the generator expects
    res = tf.squeeze(tf.squeeze(restored_img, axis=-1), axis=0)

    show_image(im[0], title="Masked image")
    plt.axis('off')
    st.pyplot()

    show_image(res, title="Reconstructed image")
    plt.axis('off')
    st.pyplot()

    show_image(dilated * tf.abs(img - res), title="Anomaly map", cmap_type='turbo')
    plt.axis('off')
    st.pyplot()

    # Overlay the anomaly map on the original image
    plt.imshow(img, cmap='gray')
    plt.imshow(dilated * tf.abs(img - res), cmap='turbo', alpha=0.3)
    plt.axis('off')
    plt.show()
    st.pyplot()
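# To launch the demo (assuming this script is saved as app.py):
#   streamlit run app.py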