changechip-space / changechip.py
ooferdoodles's picture
yes
242ebc0
import os
import cv2
import numpy as np
from skimage.exposure import match_histograms
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import seaborn as sns
import time
def resize_images(images, resize_factor=1.0):
"""
Resizes the input and reference images based on the average dimensions of the two images and a resize factor.
Parameters:
images (tuple): A tuple containing two images (input_image, reference_image). Both images should be numpy arrays.
resize_factor (float): A factor by which to resize the images. Default is 1.0, which means the images will be resized to the average dimensions of the two images.
Returns:
tuple: A tuple containing the resized input and reference images.
Example:
>>> input_image = cv2.imread('input.jpg')
>>> reference_image = cv2.imread('reference.jpg')
>>> resized_images = resize_images((input_image, reference_image), resize_factor=0.5)
"""
input_image, reference_image = images
average_width = (input_image.shape[1] + reference_image.shape[1]) * 0.5
average_height = (input_image.shape[0] + reference_image.shape[0]) * 0.5
new_shape = (
int(resize_factor * average_width),
int(resize_factor * average_height),
)
input_image = cv2.resize(input_image, new_shape, interpolation=cv2.INTER_AREA)
reference_image = cv2.resize(
reference_image, new_shape, interpolation=cv2.INTER_AREA
)
return input_image, reference_image
def homography(images, debug=False, output_directory=None):
"""
Apply homography transformation to align two images.
Args:
images (tuple): A tuple containing two images, where the first image is the input image and the second image is the reference image.
debug (bool, optional): If True, debug images will be generated. Defaults to False.
output_directory (str, optional): The directory to save the debug images. Defaults to None.
Returns:
tuple: A tuple containing the aligned input image and the reference image.
"""
input_image, reference_image = images
# Initiate SIFT detector
sift = cv2.SIFT_create()
# find the keypoints and descriptors with SIFT
input_keypoints, input_descriptors = sift.detectAndCompute(input_image, None)
reference_keypoints, reference_descriptors = sift.detectAndCompute(
reference_image, None
)
# BFMatcher with default params
bf = cv2.BFMatcher()
matches = bf.knnMatch(reference_descriptors, input_descriptors, k=2)
# Apply ratio test
good_draw = []
good_without_list = []
for m, n in matches:
if m.distance < 0.8 * n.distance: # 0.8 = a value suggested by David G. Lowe.
good_draw.append([m])
good_without_list.append(m)
# cv.drawMatchesKnn expects list of lists as matches.
if debug:
assert output_directory is not None, "Output directory must be provided"
os.makedirs(output_directory, exist_ok=True)
cv2.imwrite(
os.path.join(output_directory, "matching.png"),
cv2.drawMatchesKnn(
reference_image,
reference_keypoints,
input_image,
input_keypoints,
good_draw,
None,
flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS,
),
)
# Extract location of good matches
reference_points = np.zeros((len(good_without_list), 2), dtype=np.float32)
input_points = reference_points.copy()
for i, match in enumerate(good_without_list):
input_points[i, :] = reference_keypoints[match.queryIdx].pt
reference_points[i, :] = input_keypoints[match.trainIdx].pt
# Find homography
h, _ = cv2.findHomography(input_points, reference_points, cv2.RANSAC)
# Use homography
height, width = reference_image.shape[:2]
white_reference_image = 255 - np.zeros(shape=reference_image.shape, dtype=np.uint8)
white_reg = cv2.warpPerspective(white_reference_image, h, (width, height))
blank_pixels_mask = np.any(white_reg != [255, 255, 255], axis=-1)
reference_image_registered = cv2.warpPerspective(
reference_image, h, (width, height)
)
if debug:
assert output_directory is not None, "Output directory must be provided"
cv2.imwrite(
os.path.join(output_directory, "aligned.png"), reference_image_registered
)
input_image[blank_pixels_mask] = [0, 0, 0]
reference_image_registered[blank_pixels_mask] = [0, 0, 0]
return input_image, reference_image_registered
def histogram_matching(images, debug=False, output_directory=None):
"""
Perform histogram matching between an input image and a reference image.
Args:
images (tuple): A tuple containing the input image and the reference image.
debug (bool, optional): If True, save the histogram-matched image to the output directory. Defaults to False.
output_directory (str, optional): The directory to save the histogram-matched image. Defaults to None.
Returns:
tuple: A tuple containing the input image and the histogram-matched reference image.
"""
input_image, reference_image = images
reference_image_matched = match_histograms(
reference_image, input_image, channel_axis=-1
)
if debug:
assert output_directory is not None, "Output directory must be provided"
cv2.imwrite(
os.path.join(output_directory, "histogram_matched.jpg"),
reference_image_matched,
)
reference_image_matched = np.asarray(reference_image_matched, dtype=np.uint8)
return input_image, reference_image_matched
def preprocess_images(images, resize_factor=1.0, debug=False, output_directory=None):
"""
Preprocesses a list of images by performing the following steps:
1. Resizes the images based on the given resize factor.
2. Applies homography to align the resized images.
3. Performs histogram matching on the aligned images.
Args:
images (tuple): A tuple containing the input image and the reference image.
resize_factor (float, optional): The factor by which to resize the images. Defaults to 1.0.
debug (bool, optional): Whether to enable debug mode. Defaults to False.
output_directory (str, optional): The directory to save the output images. Defaults to None.
Returns:
tuple: The preprocessed images.
Example:
>>> images = (input_image, reference_image)
>>> preprocess_images(images, resize_factor=0.5, debug=True, output_directory='output/')
"""
start_time = time.time()
resized_images = resize_images(images, resize_factor)
aligned_images = homography(
resized_images, debug=debug, output_directory=output_directory
)
matched_images = histogram_matching(
aligned_images, debug=debug, output_directory=output_directory
)
print("--- Preprocessing time - %s seconds ---" % (time.time() - start_time))
return matched_images
# The returned vector_set goes later to the PCA algorithm which derives the EVS (Eigen Vector Space).
# Therefore, there is a mean normalization of the data
# jump_size is for iterating non-overlapping windows. This parameter should be eqaul to the window_size of the system
def find_vector_set(descriptors, jump_size, shape):
"""
Find the vector set from the given descriptors.
Args:
descriptors (numpy.ndarray): The input descriptors.
jump_size (int): The jump size for sampling the descriptors.
shape (tuple): The shape of the descriptors.
Returns:
tuple: A tuple containing the vector set and the mean vector.
"""
size_0, size_1 = shape
descriptors_2d = descriptors.reshape((size_0, size_1, descriptors.shape[1]))
vector_set = descriptors_2d[::jump_size, ::jump_size]
vector_set = vector_set.reshape(
(vector_set.shape[0] * vector_set.shape[1], vector_set.shape[2])
)
mean_vec = np.mean(vector_set, axis=0)
vector_set = vector_set - mean_vec # mean normalization
return vector_set, mean_vec
# returns the FSV (Feature Vector Space) which then goes directly to clustering (with Kmeans)
# Multiply the data with the EVS to get the entire data in the PCA target space
def find_FVS(descriptors, EVS, mean_vec):
"""
Calculate the feature vector space (FVS) by performing dot product of descriptors and EVS,
and subtracting the mean vector from the result.
Args:
descriptors (numpy.ndarray): Array of descriptors.
EVS (numpy.ndarray): Eigenvalue matrix.
mean_vec (numpy.ndarray): Mean vector.
Returns:
numpy.ndarray: The calculated feature vector space (FVS).
"""
FVS = np.dot(descriptors, EVS)
FVS = FVS - mean_vec
# print("\nfeature vector space size", FVS.shape)
return FVS
# assumes descriptors is already flattened
# returns descriptors after moving them into the PCA vector space
def descriptors_to_pca(descriptors, pca_target_dim, window_size, shape):
"""
Applies Principal Component Analysis (PCA) to a set of descriptors.
Args:
descriptors (list): List of descriptors.
pca_target_dim (int): Target dimensionality for PCA.
window_size (int): Size of the sliding window.
shape (tuple): Shape of the descriptors.
Returns:
list: Feature vector set after applying PCA.
"""
vector_set, mean_vec = find_vector_set(descriptors, window_size, shape)
pca = PCA(pca_target_dim)
pca.fit(vector_set)
EVS = pca.components_
mean_vec = np.dot(mean_vec, EVS.transpose())
FVS = find_FVS(descriptors, EVS.transpose(), mean_vec)
return FVS
def get_descriptors(
images,
window_size,
pca_dim_gray,
pca_dim_rgb,
debug=False,
output_directory=None,
):
"""
Compute descriptors for input images using sliding window technique and PCA.
Args:
images (tuple): A tuple containing the input image and reference image.
window_size (int): The size of the sliding window.
pca_dim_gray (int): The number of dimensions to keep for grayscale PCA.
pca_dim_rgb (int): The number of dimensions to keep for RGB PCA.
debug (bool, optional): Whether to enable debug mode. Defaults to False.
output_directory (str, optional): The directory to save debug images. Required if debug is True.
Returns:
numpy.ndarray: The computed descriptors.
Raises:
AssertionError: If debug is True but output_directory is not provided.
"""
input_image, reference_image = images
diff_image_gray = cv2.cvtColor(
cv2.absdiff(input_image, reference_image), cv2.COLOR_BGR2GRAY
)
if debug:
assert output_directory is not None, "Output directory must be provided"
cv2.imwrite(os.path.join(output_directory, "diff.jpg"), diff_image_gray)
# Padding for windowing
padded_diff_gray = np.pad(
diff_image_gray,
((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
mode="constant",
)
# Sliding window for gray
shape = (input_image.shape[0], input_image.shape[1], window_size, window_size)
strides = padded_diff_gray.strides * 2
windows_gray = np.lib.stride_tricks.as_strided(
padded_diff_gray, shape=shape, strides=strides
)
descriptors_gray_diff = windows_gray.reshape(-1, window_size * window_size)
# 3-channel RGB differences
diff_image_r = cv2.absdiff(input_image[:, :, 0], reference_image[:, :, 0])
diff_image_g = cv2.absdiff(input_image[:, :, 1], reference_image[:, :, 1])
diff_image_b = cv2.absdiff(input_image[:, :, 2], reference_image[:, :, 2])
if debug:
assert output_directory is not None, "Output directory must be provided"
cv2.imwrite(
os.path.join(output_directory, "final_diff.jpg"),
cv2.absdiff(input_image, reference_image),
)
cv2.imwrite(os.path.join(output_directory, "final_diff_r.jpg"), diff_image_r)
cv2.imwrite(os.path.join(output_directory, "final_diff_g.jpg"), diff_image_g)
cv2.imwrite(os.path.join(output_directory, "final_diff_b.jpg"), diff_image_b)
# Padding for windowing RGB
padded_diff_r = np.pad(
diff_image_r,
((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
mode="constant",
)
padded_diff_g = np.pad(
diff_image_g,
((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
mode="constant",
)
padded_diff_b = np.pad(
diff_image_b,
((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
mode="constant",
)
# Sliding window for RGB
windows_r = np.lib.stride_tricks.as_strided(
padded_diff_r, shape=shape, strides=strides
)
windows_g = np.lib.stride_tricks.as_strided(
padded_diff_g, shape=shape, strides=strides
)
windows_b = np.lib.stride_tricks.as_strided(
padded_diff_b, shape=shape, strides=strides
)
descriptors_rgb_diff = np.concatenate(
[
windows_r.reshape(-1, window_size * window_size),
windows_g.reshape(-1, window_size * window_size),
windows_b.reshape(-1, window_size * window_size),
],
axis=1,
)
# PCA on descriptors
shape = input_image.shape[::-1][1:] # shape = (height, width)
descriptors_gray_diff = descriptors_to_pca(
descriptors_gray_diff, pca_dim_gray, window_size, shape
)
descriptors_rgb_diff = descriptors_to_pca(
descriptors_rgb_diff, pca_dim_rgb, window_size, shape
)
# Concatenate grayscale and RGB PCA results
descriptors = np.concatenate((descriptors_gray_diff, descriptors_rgb_diff), axis=-1)
return descriptors
def k_means_clustering(FVS, components, image_shape):
"""
Perform K-means clustering on the given feature vectors.
Args:
FVS (array-like): The feature vectors to be clustered.
components (int): The number of clusters (components) to create.
image_shape (tuple): The size of the images used to reshape the change map.
Returns:
array-like: The change map obtained from the K-means clustering.
"""
kmeans = KMeans(components, verbose=0)
kmeans.fit(FVS)
flatten_change_map = kmeans.predict(FVS)
change_map = np.reshape(flatten_change_map, (image_shape[0], image_shape[1]))
return change_map
def clustering_to_mse_values(change_map, input_image, reference_image, n):
"""
Compute the normalized mean squared error (MSE) values for each cluster in a change map.
Args:
change_map (numpy.ndarray): Array representing the cluster labels for each pixel in the change map.
input_image (numpy.ndarray): Array representing the input image.
reference_image (numpy.ndarray): Array representing the reference image.
n (int): Number of clusters.
Returns:
list: Normalized MSE values for each cluster.
"""
# Ensure the images are in integer format for calculations
input_image = input_image.astype(int)
reference_image = reference_image.astype(int)
# Compute the squared differences
squared_diff = np.mean((input_image - reference_image) ** 2, axis=-1)
# Initialize arrays to store MSE and size for each cluster
mse = np.zeros(n, dtype=float)
size = np.zeros(n, dtype=int)
# Compute the MSE and size for each cluster
for k in range(n):
mask = change_map == k
size[k] = np.sum(mask)
if size[k] > 0:
mse[k] = np.sum(squared_diff[mask])
# Normalize MSE values by the number of pixels and the maximum possible MSE (255^2)
normalized_mse = (mse / size) / (255**2)
return normalized_mse.tolist()
def compute_change_map(
images,
window_size,
clusters,
pca_dim_gray,
pca_dim_rgb,
debug=False,
output_directory=None,
):
"""
Compute the change map and mean squared error (MSE) array for a pair of input and reference images.
Args:
images (tuple): A tuple containing the input and reference images.
window_size (int): The size of the sliding window for feature extraction.
clusters (int): The number of clusters for k-means clustering.
pca_dim_gray (int): The number of dimensions to reduce to for grayscale images.
pca_dim_rgb (int): The number of dimensions to reduce to for RGB images.
debug (bool, optional): Whether to enable debug mode. Defaults to False.
output_directory (str, optional): The directory to save the output files. Required if debug mode is enabled.
Returns:
tuple: A tuple containing the change map and MSE array.
Raises:
AssertionError: If debug mode is enabled but output_directory is not provided.
"""
input_image, reference_image = images
descriptors = get_descriptors(
images,
window_size,
pca_dim_gray,
pca_dim_rgb,
debug=debug,
output_directory=output_directory,
)
# Now we are ready for clustering!
change_map = k_means_clustering(descriptors, clusters, input_image.shape)
mse_array = clustering_to_mse_values(
change_map, input_image, reference_image, clusters
)
colormap = mcolors.LinearSegmentedColormap.from_list(
"custom_jet", plt.cm.jet(np.linspace(0, 1, clusters))
)
colors_array = (
colormap(np.linspace(0, 1, clusters))[:, :3] * 255
) # Convert to RGB values
palette = sns.color_palette("Paired", clusters)
palette = np.array(palette) * 255 # Convert to RGB values
# Optimized loop
change_map_flat = change_map.ravel()
colored_change_map_flat = (
colors_array[change_map_flat]
.reshape(change_map.shape[0], change_map.shape[1], 3)
.astype(np.uint8)
)
palette_colored_change_map_flat = (
palette[change_map_flat]
.reshape(change_map.shape[0], change_map.shape[1], 3)
.astype(np.uint8)
)
if debug:
assert output_directory is not None, "Output directory must be provided"
cv2.imwrite(
os.path.join(
output_directory,
f"window_size_{window_size}_pca_dim_gray{pca_dim_gray}_pca_dim_rgb{pca_dim_rgb}_clusters_{clusters}.jpg",
),
colored_change_map_flat,
)
cv2.imwrite(
os.path.join(
output_directory,
f"PALETTE_window_size_{window_size}_pca_dim_gray{pca_dim_gray}_pca_dim_rgb{pca_dim_rgb}_clusters_{clusters}.jpg",
),
palette_colored_change_map_flat,
)
if debug:
assert output_directory is not None, "Output directory must be provided"
# Saving Output for later evaluation
np.savetxt(
os.path.join(output_directory, "clustering_data.csv"),
change_map,
delimiter=",",
)
return change_map, mse_array
# selects the classes to be shown to the user as 'changes'.
# this selection is done by an MSE heuristic using DBSCAN clustering, to seperate the highest mse-valued classes from the others.
# the eps density parameter of DBSCAN might differ from system to system
def find_group_of_accepted_classes_DBSCAN(
MSE_array, debug=False, output_directory=None
):
"""
Finds the group of accepted classes using the DBSCAN algorithm.
Parameters:
- MSE_array (list): A list of mean squared error values.
- debug (bool): Flag indicating whether to enable debug mode or not. Default is False.
- output_directory (str): The directory where the output files will be saved. Default is None.
Returns:
- accepted_classes (list): A list of indices of the accepted classes.
"""
clustering = DBSCAN(eps=0.02, min_samples=1).fit(np.array(MSE_array).reshape(-1, 1))
number_of_clusters = len(set(clustering.labels_))
if number_of_clusters == 1:
print("No significant changes are detected.")
# print(clustering.labels_)
classes = [[] for _ in range(number_of_clusters)]
centers = np.zeros(number_of_clusters)
np.add.at(centers, clustering.labels_, MSE_array)
for i in range(len(MSE_array)):
classes[clustering.labels_[i]].append(i)
centers /= np.array([len(c) for c in classes])
min_class = np.argmin(centers)
accepted_classes = np.where(clustering.labels_ != min_class)[0]
if debug:
assert output_directory is not None, "Output directory must be provided"
plt.figure()
plt.xlabel("Index")
plt.ylabel("MSE")
plt.scatter(range(len(MSE_array)), MSE_array, c="red")
plt.scatter(
accepted_classes[:],
np.array(MSE_array)[np.array(accepted_classes)],
c="blue",
)
plt.title("K Mean Classification")
plt.savefig(os.path.join(output_directory, "mse.png"))
# save output for later evaluation
np.savetxt(
os.path.join(output_directory, "accepted_classes.csv"),
accepted_classes,
delimiter=",",
)
return [accepted_classes]
def draw_combination_on_transparent_input_image(
classes_mse, clustering, combination, transparent_input_image
):
"""
Draws a combination of classes on a transparent input image based on their mean squared error (MSE) order.
Args:
classes_mse (numpy.ndarray): Array of mean squared errors for each class.
clustering (dict): Dictionary containing the clustering information for each class.
combination (list): List of classes to be drawn on the image.
transparent_input_image (numpy.ndarray): Transparent input image.
Returns:
numpy.ndarray: Transparent input image with the specified combination of classes drawn on it.
"""
# HEAT MAP ACCORDING TO MSE ORDER
sorted_indexes = np.argsort(classes_mse)
for class_ in combination:
index = np.argwhere(sorted_indexes == class_).flatten()[0]
c = plt.cm.jet(float(index) / (len(classes_mse) - 1))
for [i, j] in clustering[class_]:
transparent_input_image[i, j] = (
c[2] * 255,
c[1] * 255,
c[0] * 255,
255,
) # BGR
return transparent_input_image
def detect_changes(
images,
output_alpha,
window_size,
clusters,
pca_dim_gray,
pca_dim_rgb,
debug=False,
output_directory=None,
):
"""
Detects changes between two images using a combination of clustering and image processing techniques.
Args:
images (tuple): A tuple containing two input images.
output_alpha (int): The alpha value for the output image.
window_size (int): The size of the sliding window used for computing change map.
clusters (int): The number of clusters used for clustering pixels.
pca_dim_gray (int): The number of dimensions to reduce the grayscale image to using PCA.
pca_dim_rgb (int): The number of dimensions to reduce the RGB image to using PCA.
debug (bool, optional): Whether to enable debug mode. Defaults to False.
output_directory (str, optional): The output directory for saving intermediate results. Defaults to None.
Returns:
numpy.ndarray: The resulting image with detected changes.
"""
start_time = time.time()
input_image, _ = images
clustering_map, mse_array = compute_change_map(
images,
window_size=window_size,
clusters=clusters,
pca_dim_gray=pca_dim_gray,
pca_dim_rgb=pca_dim_rgb,
debug=debug,
output_directory=output_directory,
)
clustering = [np.empty((0, 2), dtype=int) for _ in range(clusters)]
# Get the indices of each element in the clustering_map
indices = np.indices(clustering_map.shape).transpose(1, 2, 0).reshape(-1, 2)
flattened_map = clustering_map.flatten()
for cluster_idx in range(clusters):
clustering[cluster_idx] = indices[flattened_map == cluster_idx]
b_channel, g_channel, r_channel = cv2.split(input_image)
alpha_channel = np.ones(b_channel.shape, dtype=b_channel.dtype) * 255
alpha_channel[:, :] = output_alpha
groups = find_group_of_accepted_classes_DBSCAN(mse_array, output_directory)
for group in groups:
transparent_input_image = cv2.merge(
(b_channel, g_channel, r_channel, alpha_channel)
)
result = draw_combination_on_transparent_input_image(
mse_array, clustering, group, transparent_input_image
)
print("--- Detect Changes time - %s seconds ---" % (time.time() - start_time))
return result
def pipeline(
images,
resize_factor=1.0,
output_alpha=50,
window_size=5,
clusters=16,
pca_dim_gray=3,
pca_dim_rgb=9,
debug=False,
output_directory=None,
):
"""
Applies a pipeline of image processing steps to detect changes in a sequence of images.
Args:
images (tuple): A list of input images.
resize_factor (float, optional): The factor by which to resize the images. Defaults to 1.0.
output_alpha (int, optional): The alpha value for the output images. Defaults to 50.
window_size (int, optional): The size of the sliding window for change detection. Defaults to 5.
clusters (int, optional): The number of clusters for color quantization. Defaults to 16.
pca_dim_gray (int, optional): The number of dimensions to keep for grayscale PCA. Defaults to 3.
pca_dim_rgb (int, optional): The number of dimensions to keep for RGB PCA. Defaults to 9.
debug (bool, optional): Whether to enable debug mode. Defaults to False.
output_directory (str, optional): The directory to save the output images. Defaults to None.
Returns:
numpy.ndarray: The resulting image with detected changes.
"""
if output_directory:
os.makedirs(output_directory, exist_ok=True)
preprocessed_images = preprocess_images(
images,
resize_factor=resize_factor,
debug=debug,
output_directory=output_directory,
)
result = detect_changes(
preprocessed_images,
output_alpha=output_alpha,
window_size=window_size,
clusters=clusters,
pca_dim_gray=pca_dim_gray,
pca_dim_rgb=pca_dim_rgb,
debug=debug,
output_directory=output_directory,
)
return result