import os
import time

import cv2
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from skimage.exposure import match_histograms
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA


def resize_images(images, resize_factor=1.0):
    """
    Resizes the input and reference images based on the average dimensions of the two images and a resize factor.
    Parameters:
        images (tuple): A tuple containing two images (input_image, reference_image). Both images should be numpy arrays.
        resize_factor (float): A factor by which to resize the images. Default is 1.0, which means the images are resized to the average dimensions of the two images.
    Returns:
        tuple: A tuple containing the resized input and reference images.
    Example:
        >>> input_image = cv2.imread('input.jpg')
        >>> reference_image = cv2.imread('reference.jpg')
        >>> resized_images = resize_images((input_image, reference_image), resize_factor=0.5)
    """
    input_image, reference_image = images
    average_width = (input_image.shape[1] + reference_image.shape[1]) * 0.5
    average_height = (input_image.shape[0] + reference_image.shape[0]) * 0.5
    new_shape = (
        int(resize_factor * average_width),
        int(resize_factor * average_height),
    )  # cv2.resize expects (width, height)
    input_image = cv2.resize(input_image, new_shape, interpolation=cv2.INTER_AREA)
    reference_image = cv2.resize(
        reference_image, new_shape, interpolation=cv2.INTER_AREA
    )
    return input_image, reference_image


def homography(images, debug=False, output_directory=None):
    """
    Apply a homography transformation to align two images.
    Args:
        images (tuple): A tuple containing two images, where the first image is the input image and the second image is the reference image.
        debug (bool, optional): If True, debug images will be generated. Defaults to False.
        output_directory (str, optional): The directory to save the debug images. Defaults to None.
    Returns:
        tuple: A tuple containing the aligned input image and the registered reference image.
    """
    input_image, reference_image = images
    # Initiate SIFT detector
    sift = cv2.SIFT_create()
    # Find the keypoints and descriptors with SIFT
    input_keypoints, input_descriptors = sift.detectAndCompute(input_image, None)
    reference_keypoints, reference_descriptors = sift.detectAndCompute(
        reference_image, None
    )
    # BFMatcher with default params (query = reference, train = input)
    bf = cv2.BFMatcher()
    matches = bf.knnMatch(reference_descriptors, input_descriptors, k=2)
    # Apply Lowe's ratio test
    good_draw = []
    good_without_list = []
    for m, n in matches:
        if m.distance < 0.8 * n.distance:  # 0.8 = ratio suggested by David G. Lowe
            good_draw.append([m])
            good_without_list.append(m)
    # cv2.drawMatchesKnn expects a list of lists as matches.
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        os.makedirs(output_directory, exist_ok=True)
        cv2.imwrite(
            os.path.join(output_directory, "matching.png"),
            cv2.drawMatchesKnn(
                reference_image,
                reference_keypoints,
                input_image,
                input_keypoints,
                good_draw,
                None,
                flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS,
            ),
        )
    # Extract locations of good matches
    # (queryIdx indexes the reference keypoints, trainIdx the input keypoints)
    reference_points = np.zeros((len(good_without_list), 2), dtype=np.float32)
    input_points = reference_points.copy()
    for i, match in enumerate(good_without_list):
        reference_points[i, :] = reference_keypoints[match.queryIdx].pt
        input_points[i, :] = input_keypoints[match.trainIdx].pt
    # Find the homography that maps reference-image coordinates onto the input image
    h, _ = cv2.findHomography(reference_points, input_points, cv2.RANSAC)
    # Use homography
    height, width = reference_image.shape[:2]
    white_reference_image = np.full(reference_image.shape, 255, dtype=np.uint8)
    white_reg = cv2.warpPerspective(white_reference_image, h, (width, height))
    # Pixels the warped reference does not cover are masked out in both images
    blank_pixels_mask = np.any(white_reg != [255, 255, 255], axis=-1)
    reference_image_registered = cv2.warpPerspective(
        reference_image, h, (width, height)
    )
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        cv2.imwrite(
            os.path.join(output_directory, "aligned.png"), reference_image_registered
        )
    input_image[blank_pixels_mask] = [0, 0, 0]
    reference_image_registered[blank_pixels_mask] = [0, 0, 0]
    return input_image, reference_image_registered
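
# Usage sketch (illustrative only; the file names are assumptions, not files in this repo):
#
#     pair = resize_images((cv2.imread("input.jpg"), cv2.imread("reference.jpg")))
#     aligned_input, registered_reference = homography(pair)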


def histogram_matching(images, debug=False, output_directory=None):
    """
    Perform histogram matching between an input image and a reference image.
    Args:
        images (tuple): A tuple containing the input image and the reference image.
        debug (bool, optional): If True, save the histogram-matched image to the output directory. Defaults to False.
        output_directory (str, optional): The directory to save the histogram-matched image. Defaults to None.
    Returns:
        tuple: A tuple containing the input image and the histogram-matched reference image.
    """
    input_image, reference_image = images
    reference_image_matched = match_histograms(
        reference_image, input_image, channel_axis=-1
    )
    # match_histograms returns a float image; convert back to uint8 before saving/returning
    reference_image_matched = np.asarray(reference_image_matched, dtype=np.uint8)
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        cv2.imwrite(
            os.path.join(output_directory, "histogram_matched.jpg"),
            reference_image_matched,
        )
    return input_image, reference_image_matched


def preprocess_images(images, resize_factor=1.0, debug=False, output_directory=None):
    """
    Preprocesses a pair of images by performing the following steps:
    1. Resizes the images based on the given resize factor.
    2. Applies homography to align the resized images.
    3. Performs histogram matching on the aligned images.
    Args:
        images (tuple): A tuple containing the input image and the reference image.
        resize_factor (float, optional): The factor by which to resize the images. Defaults to 1.0.
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        output_directory (str, optional): The directory to save the output images. Defaults to None.
    Returns:
        tuple: The preprocessed images.
    Example:
        >>> images = (input_image, reference_image)
        >>> preprocess_images(images, resize_factor=0.5, debug=True, output_directory='output/')
    """
    start_time = time.time()
    resized_images = resize_images(images, resize_factor)
    aligned_images = homography(
        resized_images, debug=debug, output_directory=output_directory
    )
    matched_images = histogram_matching(
        aligned_images, debug=debug, output_directory=output_directory
    )
    print("--- Preprocessing time - %s seconds ---" % (time.time() - start_time))
    return matched_images


# The returned vector_set goes later to the PCA algorithm which derives the EVS (Eigen Vector Space).
# Therefore, there is a mean normalization of the data.
# jump_size is for iterating non-overlapping windows. This parameter should be equal to the window_size of the system.
def find_vector_set(descriptors, jump_size, shape):
    """
    Find the vector set from the given descriptors.
    Args:
        descriptors (numpy.ndarray): The input descriptors.
        jump_size (int): The jump size for sampling the descriptors.
        shape (tuple): The (height, width) shape of the descriptor grid.
    Returns:
        tuple: A tuple containing the vector set and the mean vector.
    """
    size_0, size_1 = shape
    descriptors_2d = descriptors.reshape((size_0, size_1, descriptors.shape[1]))
    # Sample one descriptor per non-overlapping window
    vector_set = descriptors_2d[::jump_size, ::jump_size]
    vector_set = vector_set.reshape(
        (vector_set.shape[0] * vector_set.shape[1], vector_set.shape[2])
    )
    mean_vec = np.mean(vector_set, axis=0)
    vector_set = vector_set - mean_vec  # mean normalization
    return vector_set, mean_vec
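
# Worked example (dimensions only): for a 100x80 descriptor grid and jump_size=5, the
# [::5, ::5] sampling keeps a 20x16 grid, so vector_set has 320 rows before mean
# normalization. The PCA below is fit on this subsample rather than on every pixel's descriptor.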


# Returns the FVS (Feature Vector Space) which then goes directly to clustering (with k-means).
# Multiply the data with the EVS to get the entire data in the PCA target space.
def find_FVS(descriptors, EVS, mean_vec):
    """
    Calculate the feature vector space (FVS) by projecting the descriptors onto the EVS
    and subtracting the (projected) mean vector from the result.
    Args:
        descriptors (numpy.ndarray): Array of descriptors.
        EVS (numpy.ndarray): Eigenvector matrix (principal components).
        mean_vec (numpy.ndarray): Mean vector, already projected onto the EVS.
    Returns:
        numpy.ndarray: The calculated feature vector space (FVS).
    """
    FVS = np.dot(descriptors, EVS)
    FVS = FVS - mean_vec
    # print("\nfeature vector space size", FVS.shape)
    return FVS


# Assumes descriptors is already flattened.
# Returns descriptors after moving them into the PCA vector space.
def descriptors_to_pca(descriptors, pca_target_dim, window_size, shape):
    """
    Applies Principal Component Analysis (PCA) to a set of descriptors.
    Args:
        descriptors (numpy.ndarray): Flattened array of descriptors.
        pca_target_dim (int): Target dimensionality for PCA.
        window_size (int): Size of the sliding window.
        shape (tuple): The (height, width) shape of the descriptor grid.
    Returns:
        numpy.ndarray: Feature vector set after applying PCA.
    """
    vector_set, mean_vec = find_vector_set(descriptors, window_size, shape)
    pca = PCA(pca_target_dim)
    pca.fit(vector_set)
    EVS = pca.components_
    # Project the mean vector into the PCA space so it can be subtracted after projection
    mean_vec = np.dot(mean_vec, EVS.transpose())
    FVS = find_FVS(descriptors, EVS.transpose(), mean_vec)
    return FVS
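
# Note on the mean handling above: because projection is linear,
#     descriptors @ EVS.T - (mean_vec @ EVS.T) == (descriptors - mean_vec) @ EVS.T,
# so find_FVS effectively centers the descriptors with the same mean used to fit the
# PCA before projecting them onto the principal components.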


def get_descriptors(
    images,
    window_size,
    pca_dim_gray,
    pca_dim_rgb,
    debug=False,
    output_directory=None,
):
    """
    Compute descriptors for input images using a sliding-window technique and PCA.
    Args:
        images (tuple): A tuple containing the input image and reference image.
        window_size (int): The size of the sliding window.
        pca_dim_gray (int): The number of dimensions to keep for the grayscale PCA.
        pca_dim_rgb (int): The number of dimensions to keep for the color PCA.
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        output_directory (str, optional): The directory to save debug images. Required if debug is True.
    Returns:
        numpy.ndarray: The computed descriptors.
    Raises:
        AssertionError: If debug is True but output_directory is not provided.
    """
    input_image, reference_image = images
    diff_image_gray = cv2.cvtColor(
        cv2.absdiff(input_image, reference_image), cv2.COLOR_BGR2GRAY
    )
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        cv2.imwrite(os.path.join(output_directory, "diff.jpg"), diff_image_gray)
    # Padding for windowing
    padded_diff_gray = np.pad(
        diff_image_gray,
        ((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
        mode="constant",
    )
    # Sliding window over the grayscale difference
    shape = (input_image.shape[0], input_image.shape[1], window_size, window_size)
    strides = padded_diff_gray.strides * 2
    windows_gray = np.lib.stride_tricks.as_strided(
        padded_diff_gray, shape=shape, strides=strides
    )
    descriptors_gray_diff = windows_gray.reshape(-1, window_size * window_size)
    # Per-channel differences (OpenCV images are BGR, so channel 0 is blue)
    diff_image_b = cv2.absdiff(input_image[:, :, 0], reference_image[:, :, 0])
    diff_image_g = cv2.absdiff(input_image[:, :, 1], reference_image[:, :, 1])
    diff_image_r = cv2.absdiff(input_image[:, :, 2], reference_image[:, :, 2])
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        cv2.imwrite(
            os.path.join(output_directory, "final_diff.jpg"),
            cv2.absdiff(input_image, reference_image),
        )
        cv2.imwrite(os.path.join(output_directory, "final_diff_b.jpg"), diff_image_b)
        cv2.imwrite(os.path.join(output_directory, "final_diff_g.jpg"), diff_image_g)
        cv2.imwrite(os.path.join(output_directory, "final_diff_r.jpg"), diff_image_r)
    # Padding for windowing the color channels
    padded_diff_b = np.pad(
        diff_image_b,
        ((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
        mode="constant",
    )
    padded_diff_g = np.pad(
        diff_image_g,
        ((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
        mode="constant",
    )
    padded_diff_r = np.pad(
        diff_image_r,
        ((window_size // 2, window_size // 2), (window_size // 2, window_size // 2)),
        mode="constant",
    )
    # Sliding window over each color channel
    windows_b = np.lib.stride_tricks.as_strided(
        padded_diff_b, shape=shape, strides=strides
    )
    windows_g = np.lib.stride_tricks.as_strided(
        padded_diff_g, shape=shape, strides=strides
    )
    windows_r = np.lib.stride_tricks.as_strided(
        padded_diff_r, shape=shape, strides=strides
    )
    descriptors_rgb_diff = np.concatenate(
        [
            windows_b.reshape(-1, window_size * window_size),
            windows_g.reshape(-1, window_size * window_size),
            windows_r.reshape(-1, window_size * window_size),
        ],
        axis=1,
    )
    # PCA on descriptors
    shape = input_image.shape[:2]  # shape = (height, width)
    descriptors_gray_diff = descriptors_to_pca(
        descriptors_gray_diff, pca_dim_gray, window_size, shape
    )
    descriptors_rgb_diff = descriptors_to_pca(
        descriptors_rgb_diff, pca_dim_rgb, window_size, shape
    )
    # Concatenate grayscale and color PCA results
    descriptors = np.concatenate((descriptors_gray_diff, descriptors_rgb_diff), axis=-1)
    return descriptors
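
# Aside: np.lib.stride_tricks.as_strided is easy to misuse (a wrong shape/stride pair
# silently reads out-of-bounds memory). On NumPy >= 1.20, an equivalent and safer way to
# build these windows -- a sketch, assuming an odd window_size so the padded array has
# shape (H + window_size - 1, W + window_size - 1):
#
#     windows_gray = np.lib.stride_tricks.sliding_window_view(
#         padded_diff_gray, (window_size, window_size)
#     )  # shape (H, W, window_size, window_size)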


def k_means_clustering(FVS, components, image_shape):
    """
    Perform k-means clustering on the given feature vectors.
    Args:
        FVS (array-like): The feature vectors to be clustered.
        components (int): The number of clusters (components) to create.
        image_shape (tuple): The shape of the images, used to reshape the change map.
    Returns:
        array-like: The change map obtained from the k-means clustering.
    """
    kmeans = KMeans(n_clusters=components, verbose=0)
    kmeans.fit(FVS)
    flatten_change_map = kmeans.predict(FVS)
    change_map = np.reshape(flatten_change_map, (image_shape[0], image_shape[1]))
    return change_map


def clustering_to_mse_values(change_map, input_image, reference_image, n):
    """
    Compute the normalized mean squared error (MSE) values for each cluster in a change map.
    Args:
        change_map (numpy.ndarray): Array representing the cluster labels for each pixel in the change map.
        input_image (numpy.ndarray): Array representing the input image.
        reference_image (numpy.ndarray): Array representing the reference image.
        n (int): Number of clusters.
    Returns:
        list: Normalized MSE values for each cluster.
    """
    # Ensure the images are in integer format for the calculations
    input_image = input_image.astype(int)
    reference_image = reference_image.astype(int)
    # Compute the per-pixel squared differences, averaged over the channels
    squared_diff = np.mean((input_image - reference_image) ** 2, axis=-1)
    # Initialize arrays to store the MSE and size of each cluster
    mse = np.zeros(n, dtype=float)
    size = np.zeros(n, dtype=int)
    # Compute the summed squared difference and size for each cluster
    for k in range(n):
        mask = change_map == k
        size[k] = np.sum(mask)
        if size[k] > 0:
            mse[k] = np.sum(squared_diff[mask])
    # Normalize by the number of pixels and the maximum possible MSE (255^2);
    # empty clusters get an MSE of 0 instead of a division by zero.
    normalized_mse = (
        np.divide(mse, size, out=np.zeros_like(mse), where=size > 0) / (255**2)
    )
    return normalized_mse.tolist()
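
# Worked example of the normalization: if the pixels of a cluster differ by roughly
# 25 grey levels per channel on average, their squared difference is about 25^2 = 625,
# so the normalized MSE is 625 / 255^2 ≈ 0.0096. Values therefore always fall in [0, 1].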


def compute_change_map(
    images,
    window_size,
    clusters,
    pca_dim_gray,
    pca_dim_rgb,
    debug=False,
    output_directory=None,
):
    """
    Compute the change map and mean squared error (MSE) array for a pair of input and reference images.
    Args:
        images (tuple): A tuple containing the input and reference images.
        window_size (int): The size of the sliding window for feature extraction.
        clusters (int): The number of clusters for k-means clustering.
        pca_dim_gray (int): The number of dimensions to reduce to for grayscale images.
        pca_dim_rgb (int): The number of dimensions to reduce to for RGB images.
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        output_directory (str, optional): The directory to save the output files. Required if debug mode is enabled.
    Returns:
        tuple: A tuple containing the change map and MSE array.
    Raises:
        AssertionError: If debug mode is enabled but output_directory is not provided.
    """
    input_image, reference_image = images
    descriptors = get_descriptors(
        images,
        window_size,
        pca_dim_gray,
        pca_dim_rgb,
        debug=debug,
        output_directory=output_directory,
    )
    # Now we are ready for clustering!
    change_map = k_means_clustering(descriptors, clusters, input_image.shape)
    mse_array = clustering_to_mse_values(
        change_map, input_image, reference_image, clusters
    )
    colormap = mcolors.LinearSegmentedColormap.from_list(
        "custom_jet", plt.cm.jet(np.linspace(0, 1, clusters))
    )
    colors_array = (
        colormap(np.linspace(0, 1, clusters))[:, :3] * 255
    )  # Convert to RGB values
    palette = sns.color_palette("Paired", clusters)
    palette = np.array(palette) * 255  # Convert to RGB values
    # Map every pixel's cluster label to its color in one vectorized lookup
    change_map_flat = change_map.ravel()
    colored_change_map = (
        colors_array[change_map_flat]
        .reshape(change_map.shape[0], change_map.shape[1], 3)
        .astype(np.uint8)
    )
    palette_colored_change_map = (
        palette[change_map_flat]
        .reshape(change_map.shape[0], change_map.shape[1], 3)
        .astype(np.uint8)
    )
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        cv2.imwrite(
            os.path.join(
                output_directory,
                f"window_size_{window_size}_pca_dim_gray{pca_dim_gray}_pca_dim_rgb{pca_dim_rgb}_clusters_{clusters}.jpg",
            ),
            colored_change_map,
        )
        cv2.imwrite(
            os.path.join(
                output_directory,
                f"PALETTE_window_size_{window_size}_pca_dim_gray{pca_dim_gray}_pca_dim_rgb{pca_dim_rgb}_clusters_{clusters}.jpg",
            ),
            palette_colored_change_map,
        )
        # Saving output for later evaluation
        np.savetxt(
            os.path.join(output_directory, "clustering_data.csv"),
            change_map,
            delimiter=",",
        )
    return change_map, mse_array


# Selects the classes to be shown to the user as 'changes'.
# The selection is done by an MSE heuristic using DBSCAN clustering, to separate the
# highest MSE-valued classes from the others.
# The eps density parameter of DBSCAN might differ from system to system.
def find_group_of_accepted_classes_DBSCAN(
    MSE_array, debug=False, output_directory=None
):
    """
    Finds the group of accepted classes using the DBSCAN algorithm.
    Parameters:
    - MSE_array (list): A list of mean squared error values.
    - debug (bool): Flag indicating whether to enable debug mode or not. Default is False.
    - output_directory (str): The directory where the output files will be saved. Default is None.
    Returns:
    - accepted_classes (list): A list of indices of the accepted classes.
    """
    clustering = DBSCAN(eps=0.02, min_samples=1).fit(np.array(MSE_array).reshape(-1, 1))
    number_of_clusters = len(set(clustering.labels_))
    if number_of_clusters == 1:
        print("No significant changes are detected.")
    # print(clustering.labels_)
    classes = [[] for _ in range(number_of_clusters)]
    centers = np.zeros(number_of_clusters)
    np.add.at(centers, clustering.labels_, MSE_array)
    for i in range(len(MSE_array)):
        classes[clustering.labels_[i]].append(i)
    centers /= np.array([len(c) for c in classes])
    # Reject only the DBSCAN cluster with the lowest mean MSE; everything else is accepted
    min_class = np.argmin(centers)
    accepted_classes = np.where(clustering.labels_ != min_class)[0]
    if debug:
        assert output_directory is not None, "Output directory must be provided"
        plt.figure()
        plt.xlabel("Class index")
        plt.ylabel("MSE")
        plt.scatter(range(len(MSE_array)), MSE_array, c="red")
        plt.scatter(
            accepted_classes[:],
            np.array(MSE_array)[np.array(accepted_classes)],
            c="blue",
        )
        plt.title("Accepted classes by MSE (DBSCAN)")
        plt.savefig(os.path.join(output_directory, "mse.png"))
        # Save output for later evaluation
        np.savetxt(
            os.path.join(output_directory, "accepted_classes.csv"),
            accepted_classes,
            delimiter=",",
        )
    return [accepted_classes]
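
# Illustration of the heuristic (made-up numbers): with MSE_array = [0.003, 0.005, 0.004, 0.09]
# and eps=0.02, DBSCAN groups the first three values into one low-MSE cluster and the last value
# into its own cluster; the low-MSE cluster is rejected, so only class 3 is reported as a change.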


def draw_combination_on_transparent_input_image(
    classes_mse, clustering, combination, transparent_input_image
):
    """
    Draws a combination of classes on a transparent input image, colored by their mean squared error (MSE) rank.
    Args:
        classes_mse (numpy.ndarray): Array of mean squared errors for each class.
        clustering (list): Per-class arrays of (row, column) pixel coordinates.
        combination (list): List of classes to be drawn on the image.
        transparent_input_image (numpy.ndarray): Transparent (BGRA) input image.
    Returns:
        numpy.ndarray: Transparent input image with the specified combination of classes drawn on it.
    """
    # Heat map according to the MSE order
    sorted_indexes = np.argsort(classes_mse)
    for class_ in combination:
        index = np.argwhere(sorted_indexes == class_).flatten()[0]
        c = plt.cm.jet(float(index) / (len(classes_mse) - 1))
        for [i, j] in clustering[class_]:
            transparent_input_image[i, j] = (
                c[2] * 255,
                c[1] * 255,
                c[0] * 255,
                255,
            )  # colormap gives RGBA; the image is BGRA
    return transparent_input_image


def detect_changes(
    images,
    output_alpha,
    window_size,
    clusters,
    pca_dim_gray,
    pca_dim_rgb,
    debug=False,
    output_directory=None,
):
    """
    Detects changes between two images using a combination of clustering and image processing techniques.
    Args:
        images (tuple): A tuple containing two input images.
        output_alpha (int): The alpha value for the output image.
        window_size (int): The size of the sliding window used for computing the change map.
        clusters (int): The number of clusters used for clustering pixels.
        pca_dim_gray (int): The number of dimensions to reduce the grayscale image to using PCA.
        pca_dim_rgb (int): The number of dimensions to reduce the RGB image to using PCA.
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        output_directory (str, optional): The output directory for saving intermediate results. Defaults to None.
    Returns:
        numpy.ndarray: The resulting image with detected changes.
    """
    start_time = time.time()
    input_image, _ = images
    clustering_map, mse_array = compute_change_map(
        images,
        window_size=window_size,
        clusters=clusters,
        pca_dim_gray=pca_dim_gray,
        pca_dim_rgb=pca_dim_rgb,
        debug=debug,
        output_directory=output_directory,
    )
    clustering = [np.empty((0, 2), dtype=int) for _ in range(clusters)]
    # Get the (row, col) indices of each element in the clustering_map
    indices = np.indices(clustering_map.shape).transpose(1, 2, 0).reshape(-1, 2)
    flattened_map = clustering_map.flatten()
    for cluster_idx in range(clusters):
        clustering[cluster_idx] = indices[flattened_map == cluster_idx]
    b_channel, g_channel, r_channel = cv2.split(input_image)
    alpha_channel = np.full(b_channel.shape, output_alpha, dtype=b_channel.dtype)
    groups = find_group_of_accepted_classes_DBSCAN(
        mse_array, debug=debug, output_directory=output_directory
    )
    for group in groups:
        transparent_input_image = cv2.merge(
            (b_channel, g_channel, r_channel, alpha_channel)
        )
        result = draw_combination_on_transparent_input_image(
            mse_array, clustering, group, transparent_input_image
        )
    print("--- Detect Changes time - %s seconds ---" % (time.time() - start_time))
    return result


def pipeline(
    images,
    resize_factor=1.0,
    output_alpha=50,
    window_size=5,
    clusters=16,
    pca_dim_gray=3,
    pca_dim_rgb=9,
    debug=False,
    output_directory=None,
):
    """
    Applies a pipeline of image processing steps to detect changes between a pair of images.
    Args:
        images (tuple): A tuple containing the input image and the reference image.
        resize_factor (float, optional): The factor by which to resize the images. Defaults to 1.0.
        output_alpha (int, optional): The alpha value for the output image. Defaults to 50.
        window_size (int, optional): The size of the sliding window for change detection. Defaults to 5.
        clusters (int, optional): The number of clusters for k-means clustering. Defaults to 16.
        pca_dim_gray (int, optional): The number of dimensions to keep for grayscale PCA. Defaults to 3.
        pca_dim_rgb (int, optional): The number of dimensions to keep for RGB PCA. Defaults to 9.
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        output_directory (str, optional): The directory to save the output images. Defaults to None.
    Returns:
        numpy.ndarray: The resulting image with detected changes.
    """
    if output_directory:
        os.makedirs(output_directory, exist_ok=True)
    preprocessed_images = preprocess_images(
        images,
        resize_factor=resize_factor,
        debug=debug,
        output_directory=output_directory,
    )
    result = detect_changes(
        preprocessed_images,
        output_alpha=output_alpha,
        window_size=window_size,
        clusters=clusters,
        pca_dim_gray=pca_dim_gray,
        pca_dim_rgb=pca_dim_rgb,
        debug=debug,
        output_directory=output_directory,
    )
    return result
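

# Minimal end-to-end sketch (assumptions: 'input.jpg' and 'reference.jpg' are two
# photographs of the same scene taken from roughly the same viewpoint; neither file
# ships with this module). The result is a BGRA image in which the accepted change
# classes are painted over a semi-transparent copy of the input.
if __name__ == "__main__":
    input_image = cv2.imread("input.jpg")
    reference_image = cv2.imread("reference.jpg")
    changes = pipeline(
        (input_image, reference_image),
        resize_factor=1.0,
        output_alpha=50,
        window_size=5,
        clusters=16,
        debug=False,
    )
    cv2.imwrite("changes.png", changes)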