|
import numpy as np |
|
from skimage import transform |
|
import pydicom |
|
from io import BytesIO |
|
from PIL import Image |
|
import nibabel as nib |
|
import SimpleITK as sitk |
|
from skimage import measure |
|
|
|
|
|
""" |
|
This script contains utility functions for reading and processing different imaging modalities. |
|
""" |
|
|
|
|
|
CT_WINDOWS = {'abdomen': [-150, 250], |
|
'lung': [-1000, 1000], |
|
'pelvis': [-55, 200], |
|
'liver': [-25, 230], |
|
'colon': [-68, 187], |
|
'pancreas': [-100, 200]} |
|
|
|
def process_intensity_image(image_data, is_CT, site=None): |
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_CT: |
|
|
|
if site and site in CT_WINDOWS: |
|
window = CT_WINDOWS[site] |
|
else: |
|
raise ValueError(f'Please choose CT site from {CT_WINDOWS.keys()}') |
|
lower_bound, upper_bound = window |
|
else: |
|
|
|
lower_bound, upper_bound = np.percentile( |
|
image_data[image_data > 0], 0.5 |
|
), np.percentile(image_data[image_data > 0], 99.5) |
|
|
|
image_data_pre = np.clip(image_data, lower_bound, upper_bound) |
|
image_data_pre = ( |
|
(image_data_pre - image_data_pre.min()) |
|
/ (image_data_pre.max() - image_data_pre.min()) |
|
* 255.0 |
|
) |
|
|
|
|
|
shape = image_data_pre.shape |
|
if shape[0] > shape[1]: |
|
pad = (shape[0]-shape[1])//2 |
|
pad_width = ((0,0), (pad, pad)) |
|
elif shape[0] < shape[1]: |
|
pad = (shape[1]-shape[0])//2 |
|
pad_width = ((pad, pad), (0,0)) |
|
else: |
|
pad_width = None |
|
|
|
if pad_width is not None: |
|
image_data_pre = np.pad(image_data_pre, pad_width, 'constant', constant_values=0) |
|
|
|
|
|
image_size = 1024 |
|
resize_image = transform.resize(image_data_pre, (image_size, image_size), order=3, |
|
mode='constant', preserve_range=True, anti_aliasing=True) |
|
|
|
|
|
resize_image = np.stack([resize_image]*3, axis=-1) |
|
|
|
return resize_image.astype(np.uint8) |
|
|
|
|
|
|
|
def read_dicom(image_path, is_CT, site=None): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ds = pydicom.dcmread(image_path) |
|
image_array = ds.pixel_array * ds.RescaleSlope + ds.RescaleIntercept |
|
|
|
image_array = process_intensity_image(image_array, is_CT, site) |
|
|
|
return image_array |
|
|
|
|
|
def read_nifti(image_path, is_CT, slice_idx, site=None, HW_index=(0, 1), channel_idx=None): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
nii = nib.load(image_path) |
|
image_array = nii.get_fdata() |
|
|
|
if HW_index != (0, 1): |
|
image_array = np.moveaxis(image_array, HW_index, (0, 1)) |
|
|
|
|
|
if channel_idx is None: |
|
image_array = image_array[:, :, slice_idx] |
|
else: |
|
image_array = image_array[:, :, slice_idx, channel_idx] |
|
|
|
image_array = process_intensity_image(image_array, is_CT, site) |
|
return image_array |
|
|
|
|
|
|
|
def read_rgb(image_path): |
|
|
|
|
|
|
|
|
|
|
|
|
|
image = Image.open(image_path) |
|
image = np.array(image) |
|
if len(image.shape) == 2: |
|
image = np.stack([image]*3, axis=-1) |
|
elif image.shape[2] == 4: |
|
image = image[:,:,:3] |
|
|
|
|
|
shape = image.shape |
|
if shape[0] > shape[1]: |
|
pad = (shape[0]-shape[1])//2 |
|
pad_width = ((0,0), (pad, pad), (0,0)) |
|
elif shape[0] < shape[1]: |
|
pad = (shape[1]-shape[0])//2 |
|
pad_width = ((pad, pad), (0,0), (0,0)) |
|
else: |
|
pad_width = None |
|
|
|
if pad_width is not None: |
|
image = np.pad(image, pad_width, 'constant', constant_values=0) |
|
|
|
|
|
image_size = 1024 |
|
resize_image = np.zeros((image_size, image_size, 3), dtype=np.uint8) |
|
for i in range(3): |
|
resize_image[:,:,i] = transform.resize(image[:,:,i], (image_size, image_size), order=3, |
|
mode='constant', preserve_range=True, anti_aliasing=True) |
|
|
|
return resize_image |
|
|
|
|
|
|
|
def get_instances(mask): |
|
|
|
seg = sitk.GetImageFromArray(mask) |
|
filled = sitk.BinaryFillhole(seg) |
|
d = sitk.SignedMaurerDistanceMap(filled, insideIsPositive=False, squaredDistance=False, useImageSpacing=False) |
|
|
|
ws = sitk.MorphologicalWatershed( d, markWatershedLine=False, level=1) |
|
ws = sitk.Mask( ws, sitk.Cast(seg, ws.GetPixelID())) |
|
ins_mask = sitk.GetArrayFromImage(ws) |
|
|
|
|
|
props = measure.regionprops_table(ins_mask, properties=('label', 'area')) |
|
mean_area = np.mean(props['area']) |
|
std_area = np.std(props['area']) |
|
|
|
threshold = mean_area - 2*std_area - 1 |
|
ins_mask_filtered = ins_mask.copy() |
|
for i, area in zip(props['label'], props['area']): |
|
if area < threshold: |
|
ins_mask_filtered[ins_mask == i] = 0 |
|
|
|
return ins_mask_filtered |
|
|
|
|