import logging
import random
from tqdm import tqdm
import numpy as np
from numpy import fliplr, flipud
import scipy.signal
SEED = 42
np.random.seed(SEED)
__author__ = "Jordan A Caraballo-Vega, Science Data Processing Branch"
__email__ = "[email protected]"
__status__ = "Production"
# ----------------------------------------------------------------------------
# module processing
#
# General functions to standardize images (numpy arrays). Several methods
# are implemented for testing, including global and local standardization
# for neural network input. The data manipulation stage extracts random
# patches for training and stores them in numpy arrays.
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Module Methods
# ---------------------------------------------------------------------------
# --------------------------- Normalization Functions ----------------------- #
def normalize(images, factor=65535.0) -> np.ndarray:
    """
    Normalize a numpy array into the [0, 1] range.
    :param images: numpy array in the format (n,w,h,c).
    :param factor: float divisor for normalization, e.g. 2^16 - 1 for uint16 data.
    :return: numpy array in the [0, 1] range
    """
    return images / factor
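# Usage sketch (hedged): scaling a synthetic batch of 16-bit imagery into
# [0, 1]. The array name `batch` is illustrative only.
# >>> batch = np.random.randint(0, 65536, size=(2, 64, 64, 4)).astype('float32')
# >>> normalize(batch, factor=65535.0).max() <= 1.0
# True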
# ------------------------ Standardization Functions ----------------------- #
def global_standardization(images, strategy='per-batch') -> np.ndarray:
"""
Standardize numpy array using global standardization.
:param images: numpy array in the format (n,w,h,c).
:param strategy: can select between per-image or per-batch.
:return: globally standardized numpy array
"""
if strategy == 'per-batch':
mean = np.mean(images) # global mean of all images
std = np.std(images) # global std of all images
for i in range(images.shape[0]): # for each image in images
images[i, :, :, :] = (images[i, :, :, :] - mean) / std
    elif strategy == 'per-image':
        for i in range(images.shape[0]):  # for each image in images
            mean = np.mean(images[i, :, :, :])  # image mean
            std = np.std(images[i, :, :, :])  # image std
            images[i, :, :, :] = (images[i, :, :, :] - mean) / std
    else:
        raise RuntimeError(f'Standardization <{strategy}> not supported')
    return images
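# Usage sketch (hedged): per-batch shares one mean/std across the whole
# stack, per-image recomputes them for every sample; both modify the
# input array in place. `batch` is illustrative only.
# >>> batch = np.random.rand(2, 64, 64, 4).astype('float32')
# >>> out = global_standardization(batch, strategy='per-image')
# >>> bool(abs(out[0].mean()) < 1e-3)  # each image is now ~zero-mean
# True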
def local_standardization(images, filename='normalization_data',
                          ndata=None, strategy='per-batch'
                          ) -> np.ndarray:
"""
Standardize numpy array using local standardization.
:param images: numpy array in the format (n,w,h,c).
:param filename: filename to store mean and std data.
:param ndata: pandas df with mean and std values for each channel.
:param strategy: can select between per-image or per-batch.
:return: locally standardized numpy array
"""
    if ndata is not None:  # for inference only
for i in range(images.shape[-1]): # for each channel in images
# standardize all images based on given mean and std
images[:, :, :, i] = \
(images[:, :, :, i] - ndata['channel_mean'][i]) / \
ndata['channel_std'][i]
return images
    elif strategy == 'per-batch':  # for all images in batch
        with open(filename + "_norm_data.csv", "w+") as f:
            f.write(
                "i,channel_mean,channel_std,"
                "channel_mean_post,channel_std_post\n"
            )
            for i in range(images.shape[-1]):  # for each channel in images
                channel_mean = np.mean(images[:, :, :, i])  # channel mean
                channel_std = np.std(images[:, :, :, i])  # channel std
                images[:, :, :, i] = \
                    (images[:, :, :, i] - channel_mean) / channel_std
                channel_mean_post = np.mean(images[:, :, :, i])
                channel_std_post = np.std(images[:, :, :, i])
                # write statistics to file for each channel
                f.write('{},{},{},{},{}\n'.format(
                    i, channel_mean, channel_std,
                    channel_mean_post, channel_std_post))
elif strategy == 'per-image': # standardization for each image
for i in range(images.shape[0]): # for each image
for j in range(images.shape[-1]): # for each channel in images
channel_mean = np.mean(images[i, :, :, j])
channel_std = np.std(images[i, :, :, j])
images[i, :, :, j] = \
(images[i, :, :, j] - channel_mean) / channel_std
else:
raise RuntimeError(f'Standardization <{strategy}> not supported')
return images
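# Usage sketch (hedged): at inference time, `ndata` is assumed to be the
# per-channel statistics written by the per-batch branch above, read back
# as a pandas DataFrame:
# >>> import pandas as pd
# >>> ndata = pd.read_csv('normalization_data_norm_data.csv')
# >>> images = local_standardization(images, ndata=ndata)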
def standardize_image(
image,
standardization_type: str,
mean: list = None,
std: list = None,
global_min: list = None,
global_max: list = None
):
"""
Standardize image within parameter, simple scaling of values.
Loca, Global, and Mixed options.
"""
image = image.astype(np.float32)
if standardization_type == 'local':
for i in range(image.shape[-1]):
image[:, :, i] = (image[:, :, i] - np.mean(image[:, :, i])) / \
(np.std(image[:, :, i]) + 1e-8)
    elif standardization_type == 'minmax':
        for i in range(image.shape[-1]):
            # note: assumes a fixed [0, 55] data range
            image[:, :, i] = (image[:, :, i] - 0) / (55 - 0)
    elif standardization_type == 'localminmax':
        for i in range(image.shape[-1]):
            image[:, :, i] = (image[:, :, i] - np.min(image[:, :, i])) / \
                (np.max(image[:, :, i]) - np.min(image[:, :, i]))
elif standardization_type == 'globalminmax':
for i in range(image.shape[-1]):
image[:, :, i] = (image[:, :, i] - global_min) / \
(global_max - global_min)
elif standardization_type == 'global':
for i in range(image.shape[-1]):
image[:, :, i] = (image[:, :, i] - mean[i]) / (std[i] + 1e-8)
elif standardization_type == 'mixed':
raise NotImplementedError
return image
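# Usage sketch (hedged): per-channel 'local' standardization of a single
# (w, h, c) tile; each channel ends up with ~zero mean and ~unit std.
# `tile` is illustrative only.
# >>> tile = np.random.rand(64, 64, 4).astype('float32')
# >>> tile = standardize_image(tile, 'local')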
def standardize_batch(
image_batch,
standardization_type: str,
mean: list = None,
std: list = None
):
"""
Standardize image within parameter, simple scaling of values.
Loca, Global, and Mixed options.
"""
for item in range(image_batch.shape[0]):
image_batch[item, :, :, :] = standardize_image(
image_batch[item, :, :, :], standardization_type, mean, std)
return image_batch
# ------------------------ Data Preparation Functions ----------------------- #
def get_rand_patches_rand_cond(img, mask, n_patches=16000, sz=160, nclasses=6,
                               nodata_ascloud=True, method='rand'
                               ) -> np.ndarray:
    """
    Generate training data.
    :param img: ndarray in the format (w,h,c).
:param mask: integer ndarray with shape (x_sz, y_sz)
:param n_patches: number of patches
:param sz: tile size, will be used for both height and width
:param nclasses: number of classes present in the output data
:param nodata_ascloud: convert no-data values to cloud labels
:param method: choose between rand, cond, cloud
rand - select N number of random patches for each image
cond - select N number of random patches for each image,
with the condition of having 1+ class per tile.
cloud - select tiles that have clouds
    :return: two numpy arrays with data and labels.
"""
    mask = mask.values  # cast to numpy array
    if nodata_ascloud:
        # if no-data is present, change it to the final (cloud) class
        mask[mask > nclasses] = nclasses  # some no-data values are 255 or larger
        mask[mask < 0] = nclasses  # some no-data values are -128 or smaller
patches = [] # list to store data patches
labels = [] # list to store label patches
for i in tqdm(range(n_patches)):
# Generate random integers from image
xc = random.randint(0, img.shape[0] - sz)
yc = random.randint(0, img.shape[1] - sz)
        if method == 'cond':
            # regenerate random ints if the tile has only one class,
            # contains the cloud/no-data class, or has negative pixels
            while len(np.unique(mask[xc:(xc+sz), yc:(yc+sz)])) == 1 or \
                    nclasses in mask[xc:(xc+sz), yc:(yc+sz)] or \
                    img[xc:(xc+sz), yc:(yc+sz), :].values.min() < 0:
                xc = random.randint(0, img.shape[0] - sz)
                yc = random.randint(0, img.shape[1] - sz)
        elif method == 'rand':
            while nclasses in mask[xc:(xc+sz), yc:(yc+sz)] or \
                    img[xc:(xc+sz), yc:(yc+sz), :].values.min() < 0:
                xc = random.randint(0, img.shape[0] - sz)
                yc = random.randint(0, img.shape[1] - sz)
        elif method == 'cloud':
            # keep only tiles with at least 15 cloud pixels
            while np.count_nonzero(
                    mask[xc:(xc+sz), yc:(yc+sz)] == nclasses) < 15:
                xc = random.randint(0, img.shape[0] - sz)
                yc = random.randint(0, img.shape[1] - sz)
# Generate img and mask patches
patch_img = img[xc:(xc + sz), yc:(yc + sz)]
patch_mask = mask[xc:(xc + sz), yc:(yc + sz)]
# Apply some random transformations
random_transformation = np.random.randint(1, 7)
if random_transformation == 1: # flip left and right
patch_img = fliplr(patch_img)
patch_mask = fliplr(patch_mask)
elif random_transformation == 2: # reverse second dimension
patch_img = flipud(patch_img)
patch_mask = flipud(patch_mask)
elif random_transformation == 3: # rotate 90 degrees
patch_img = np.rot90(patch_img, 1)
patch_mask = np.rot90(patch_mask, 1)
elif random_transformation == 4: # rotate 180 degrees
patch_img = np.rot90(patch_img, 2)
patch_mask = np.rot90(patch_mask, 2)
elif random_transformation == 5: # rotate 270 degrees
patch_img = np.rot90(patch_img, 3)
patch_mask = np.rot90(patch_mask, 3)
else: # original image
pass
patches.append(patch_img)
labels.append(patch_mask)
return np.asarray(patches), np.asarray(labels)
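# Usage sketch (hedged): `img` and `mask` are assumed to be xarray
# DataArrays (the function calls `.values` on both), e.g. bands and
# labels loaded from a raster pair:
# >>> data, labels = get_rand_patches_rand_cond(
# ...     img, mask, n_patches=1000, sz=160, method='cond')
# >>> data.shape   # (1000, 160, 160, c)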
def get_rand_patches_aug_augcond(img, mask, n_patches=16000, sz=256,
                                 nclasses=6, over=50, nodata_ascloud=True,
                                 nodata=-9999, method='augcond'
                                 ) -> np.ndarray:
    """
    Generate training data.
    :param img: ndarray in the format (w,h,c).
    :param mask: integer ndarray with shape (x_sz, y_sz)
    :param n_patches: number of patches
    :param sz: tile size, will be used for both height and width
    :param nclasses: number of classes present in the output data
    :param over: number of pixels to overlap between images
    :param nodata_ascloud: convert no-data values to cloud labels
    :param nodata: value used to flag no-data pixels
    :param method: choose between aug and augcond
                   aug - select N * 8 number of random patches for each
                         image after data augmentation.
                   augcond - select N * 8 number of random patches for
                             each image, with the condition of having
                             more than one class per tile, after data
                             augmentation.
    :return: two numpy arrays with data and labels.
    """
    mask = mask.values  # cast to numpy array
    if nodata_ascloud:
        # flag out-of-range values as no-data so those tiles are rejected
        mask[mask > nclasses] = nodata  # some no-data values are 255 or larger
        mask[mask < 0] = nodata  # some no-data values are -128 or smaller
patches = [] # list to store data patches
labels = [] # list to store label patches
for i in tqdm(range(n_patches)):
# Generate random integers from image
xc = random.randint(0, img.shape[0] - sz - sz)
yc = random.randint(0, img.shape[1] - sz - sz)
if method == 'augcond':
# while loop to regenerate random ints if tile has only one class
while len(np.unique(mask[xc:(xc + sz), yc:(yc + sz)])) == 1 or \
nodata in mask[xc:(xc + sz), yc:(yc + sz)] or \
nodata in mask[(xc + sz - over):(xc + sz + sz - over),
(yc + sz - over):(yc + sz + sz - over)] or \
nodata in mask[(xc + sz - over):(xc + sz + sz - over),
yc:(yc + sz)]:
xc = random.randint(0, img.shape[0] - sz - sz)
yc = random.randint(0, img.shape[1] - sz - sz)
elif method == 'aug':
# while loop to regenerate random ints if tile has only one class
while nodata in mask[xc:(xc + sz), yc:(yc + sz)] or \
nodata in mask[(xc + sz - over):(xc + sz + sz - over),
(yc + sz - over):(yc + sz + sz - over)] or \
nodata in mask[(xc + sz - over):(xc + sz + sz - over),
yc:(yc + sz)]:
xc = random.randint(0, img.shape[0] - sz - sz)
yc = random.randint(0, img.shape[1] - sz - sz)
# Generate img and mask patches
patch_img = img[xc:(xc + sz), yc:(yc + sz)] # original image patch
patch_mask = mask[xc:(xc + sz), yc:(yc + sz)] # original mask patch
# Apply transformations for data augmentation
# 1. No augmentation and append to list
patches.append(patch_img)
labels.append(patch_mask)
# 2. Rotate 90 and append to list
patches.append(np.rot90(patch_img, 1))
labels.append(np.rot90(patch_mask, 1))
# 3. Rotate 180 and append to list
patches.append(np.rot90(patch_img, 2))
labels.append(np.rot90(patch_mask, 2))
# 4. Rotate 270
patches.append(np.rot90(patch_img, 3))
labels.append(np.rot90(patch_mask, 3))
        # 5. Flipped up and down
patches.append(flipud(patch_img))
labels.append(flipud(patch_mask))
# 6. Flipped left and right
patches.append(fliplr(patch_img))
labels.append(fliplr(patch_mask))
# 7. overlapping tiles - next tile, down
patches.append(img[(xc + sz - over):(xc + sz + sz - over),
(yc + sz - over):(yc + sz + sz - over)])
labels.append(mask[(xc + sz - over):(xc + sz + sz - over),
(yc + sz - over):(yc + sz + sz - over)])
# 8. overlapping tiles - next tile, side
patches.append(img[(xc + sz - over):(xc + sz + sz - over),
yc:(yc + sz)])
labels.append(mask[(xc + sz - over):(xc + sz + sz - over),
yc:(yc + sz)])
return np.asarray(patches), np.asarray(labels)
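# Usage sketch (hedged): each random draw contributes 8 patches (original,
# three rotations, two flips, two overlapping neighbors), so the returned
# arrays hold n_patches * 8 tiles.
# >>> data, labels = get_rand_patches_aug_augcond(img, mask, n_patches=200)
# >>> data.shape[0]   # 1600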
# ------------------------ Artifact Removal Functions ----------------------- #
def _2d_spline(window_size=128, power=2) -> np.ndarray:
    """
    Window method for boundaries/edge artifacts smoothing.
    :param window_size: size of window/tile to smooth
    :param power: spline polynomial power to use
    :return: smoothing distribution numpy array
    """
    intersection = int(window_size/4)
    tria = scipy.signal.windows.triang(window_size)
wind_outer = (abs(2*(tria)) ** power)/2
wind_outer[intersection:-intersection] = 0
wind_inner = 1 - (abs(2*(tria - 1)) ** power)/2
wind_inner[:intersection] = 0
wind_inner[-intersection:] = 0
wind = wind_inner + wind_outer
wind = wind / np.average(wind)
wind = np.expand_dims(np.expand_dims(wind, 1), 2)
wind = wind * wind.transpose(1, 0, 2)
return wind
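# Usage sketch (hedged): weight overlapping tile predictions with the 2D
# spline window so tile seams average out. `pred` is assumed to be a
# (window_size, window_size, n_classes) model output for one tile.
# >>> spline = _2d_spline(window_size=128, power=2)  # shape (128, 128, 1)
# >>> weighted = pred * spline  # accumulate, then divide by summed weights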
def _hann_matrix(window_size=128, power=2):
    logging.info("Placeholder for next release.")
# -------------------------------------------------------------------------------
# module preprocessing Unit Tests
# -------------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Unit Test #1 - Testing normalization distributions
x = (np.random.randint(65536, size=(10, 128, 128, 6))).astype('float32')
x_norm = normalize(x, factor=65535) # apply static normalization
assert x_norm.max() == 1.0, "Unexpected max value."
logging.info(f"UT #1 PASS: {x_norm.mean()}, {x_norm.std()}")
    # Unit Test #2 - Testing standardization distributions
    # note: the standardization functions operate in place, so each test
    # below transforms the output of the previous one
standardized = global_standardization(x_norm, strategy='per-batch')
assert standardized.max() > 1.731, "Unexpected max value."
logging.info(f"UT #2 PASS: {standardized.mean()}, {standardized.std()}")
# Unit Test #3 - Testing standardization distributions
standardized = global_standardization(x_norm, strategy='per-image')
assert standardized.max() > 1.73, "Unexpected max value."
logging.info(f"UT #3 PASS: {standardized.mean()}, {standardized.std()}")
# Unit Test #4 - Testing standardization distributions
standardized = local_standardization(x_norm, filename='normalization_data',
strategy='per-batch'
)
assert standardized.max() > 1.74, "Unexpected max value."
logging.info(f"UT #4 PASS: {standardized.mean()}, {standardized.std()}")
# Unit Test #5 - Testing standardization distributions
standardized = local_standardization(x_norm, filename='normalization_data',
strategy='per-image'
)
assert standardized.max() > 1.75, "Unexpected max value."
logging.info(f"UT #5 PASS: {standardized.mean()}, {standardized.std()}")