import cv2 import numpy as np import os from scipy.optimize import least_squares from tomlkit import boolean from tqdm import tqdm import matplotlib.pyplot as plt class Image_loader(): def __init__(self, img_dir:str, downscale_factor:float): # loading the Camera intrinsic parameters K with open(os.path.join(img_dir,'K.txt')) as f: self.K = np.array(list((map(lambda x:list(map(lambda x:float(x), x.strip().split(' '))),f.read().split('\n'))))) self.image_list = [] # Loading the set of images for image in sorted(os.listdir(img_dir)): if image[-4:].lower() == '.jpg' or image[-5:].lower() == '.png': self.image_list.append(os.path.join(img_dir,image)) self.path = os.getcwd() self.factor = downscale_factor self.downscale() def downscale(self) -> None: ''' Downscales the Image intrinsic parameter acc to the downscale factor ''' self.K[0, 0] /= self.factor self.K[1, 1] /= self.factor self.K[0, 2] /= self.factor self.K[1, 2] /= self.factor def downscale_image(self, image): for _ in range(1,int(self.factor / 2) + 1): image = cv2.pyrDown(image) return image class Sfm(): def __init__(self, img_dir:str, downscale_factor:float = 2.0) -> None: ''' Initialise and Sfm object. ''' self.img_obj = Image_loader(img_dir,downscale_factor) def triangulation(self, point_2d_1, point_2d_2, projection_matrix_1, projection_matrix_2) -> tuple: ''' Triangulates 3d points from 2d vectors and projection matrices returns projection matrix of first camera, projection matrix of second camera, point cloud ''' pt_cloud = cv2.triangulatePoints(point_2d_1, point_2d_2, projection_matrix_1.T, projection_matrix_2.T) return projection_matrix_1.T, projection_matrix_2.T, (pt_cloud / pt_cloud[3]) def PnP(self, obj_point, image_point , K, dist_coeff, rot_vector, initial) -> tuple: ''' Finds an object pose from 3D-2D point correspondences using the RANSAC scheme. returns rotational matrix, translational matrix, image points, object points, rotational vector ''' if initial == 1: obj_point = obj_point[:, 0 ,:] image_point = image_point.T rot_vector = rot_vector.T _, rot_vector_calc, tran_vector, inlier = cv2.solvePnPRansac(obj_point, image_point, K, dist_coeff, cv2.SOLVEPNP_ITERATIVE) # Converts a rotation matrix to a rotation vector or vice versa rot_matrix, _ = cv2.Rodrigues(rot_vector_calc) if inlier is not None: image_point = image_point[inlier[:, 0]] obj_point = obj_point[inlier[:, 0]] rot_vector = rot_vector[inlier[:, 0]] return rot_matrix, tran_vector, image_point, obj_point, rot_vector def reprojection_error(self, obj_points, image_points, transform_matrix, K, homogenity) ->tuple: ''' Calculates the reprojection error ie the distance between the projected points and the actual points. returns total error, object points ''' rot_matrix = transform_matrix[:3, :3] tran_vector = transform_matrix[:3, 3] rot_vector, _ = cv2.Rodrigues(rot_matrix) if homogenity == 1: obj_points = cv2.convertPointsFromHomogeneous(obj_points.T) image_points_calc, _ = cv2.projectPoints(obj_points, rot_vector, tran_vector, K, None) image_points_calc = np.float32(image_points_calc[:, 0, :]) total_error = cv2.norm(image_points_calc, np.float32(image_points.T) if homogenity == 1 else np.float32(image_points), cv2.NORM_L2) return total_error / len(image_points_calc), obj_points def optimal_reprojection_error(self, obj_points) -> np.array: ''' calculates of the reprojection error during bundle adjustment returns error ''' transform_matrix = obj_points[0:12].reshape((3,4)) K = obj_points[12:21].reshape((3,3)) rest = int(len(obj_points[21:]) * 0.4) p = obj_points[21:21 + rest].reshape((2, int(rest/2))).T obj_points = obj_points[21 + rest:].reshape((int(len(obj_points[21 + rest:])/3), 3)) rot_matrix = transform_matrix[:3, :3] tran_vector = transform_matrix[:3, 3] rot_vector, _ = cv2.Rodrigues(rot_matrix) image_points, _ = cv2.projectPoints(obj_points, rot_vector, tran_vector, K, None) image_points = image_points[:, 0, :] error = [ (p[idx] - image_points[idx])**2 for idx in range(len(p))] return np.array(error).ravel()/len(p) def bundle_adjustment(self, _3d_point, opt, transform_matrix_new, K, r_error) -> tuple: ''' Bundle adjustment for the image and object points returns object points, image points, transformation matrix ''' opt_variables = np.hstack((transform_matrix_new.ravel(), K.ravel())) opt_variables = np.hstack((opt_variables, opt.ravel())) opt_variables = np.hstack((opt_variables, _3d_point.ravel())) values_corrected = least_squares(self.optimal_reprojection_error, opt_variables, gtol = r_error).x K = values_corrected[12:21].reshape((3,3)) rest = int(len(values_corrected[21:]) * 0.4) return values_corrected[21 + rest:].reshape((int(len(values_corrected[21 + rest:])/3), 3)), values_corrected[21:21 + rest].reshape((2, int(rest/2))).T, values_corrected[0:12].reshape((3,4)) def to_ply(self, path, point_cloud, colors) -> None: ''' Generates the .ply which can be used to open the point cloud ''' out_points = point_cloud.reshape(-1, 3) * 200 out_colors = colors.reshape(-1, 3) print(out_colors.shape, out_points.shape) verts = np.hstack([out_points, out_colors]) mean = np.mean(verts[:, :3], axis=0) scaled_verts = verts[:, :3] - mean dist = np.sqrt(scaled_verts[:, 0] ** 2 + scaled_verts[:, 1] ** 2 + scaled_verts[:, 2] ** 2) indx = np.where(dist < np.mean(dist) + 300) verts = verts[indx] ply_header = '''ply format ascii 1.0 element vertex %(vert_num)d property float x property float y property float z property uchar blue property uchar green property uchar red end_header ''' # Ensure the 'res' directory exists res_dir = os.path.join(path, 'res') os.makedirs(res_dir, exist_ok=True) # Use os.path.basename to extract the folder name in a platform-independent way folder_name = os.path.basename(os.path.dirname(self.img_obj.image_list[0])) ply_file_path = os.path.join(res_dir, folder_name + '.ply') # Write the .ply file with open(ply_file_path, 'w') as f: f.write(ply_header % dict(vert_num=len(verts))) np.savetxt(f, verts, '%f %f %f %d %d %d') def common_points(self, image_points_1, image_points_2, image_points_3) -> tuple: ''' Finds the common points between image 1 and 2 , image 2 and 3 returns common points of image 1-2, common points of image 2-3, mask of common points 1-2 , mask for common points 2-3 ''' cm_points_1 = [] cm_points_2 = [] for i in range(image_points_1.shape[0]): a = np.where(image_points_2 == image_points_1[i, :]) if a[0].size != 0: cm_points_1.append(i) cm_points_2.append(a[0][0]) mask_array_1 = np.ma.array(image_points_2, mask=False) mask_array_1.mask[cm_points_2] = True mask_array_1 = mask_array_1.compressed() mask_array_1 = mask_array_1.reshape(int(mask_array_1.shape[0] / 2), 2) mask_array_2 = np.ma.array(image_points_3, mask=False) mask_array_2.mask[cm_points_2] = True mask_array_2 = mask_array_2.compressed() mask_array_2 = mask_array_2.reshape(int(mask_array_2.shape[0] / 2), 2) print(" Shape New Array", mask_array_1.shape, mask_array_2.shape) return np.array(cm_points_1), np.array(cm_points_2), mask_array_1, mask_array_2 def find_features(self, image_0, image_1) -> tuple: ''' Feature detection using the sift algorithm and KNN return keypoints(features) of image1 and image2 ''' sift = cv2.xfeatures2d.SIFT_create() key_points_0, desc_0 = sift.detectAndCompute(cv2.cvtColor(image_0, cv2.COLOR_BGR2GRAY), None) key_points_1, desc_1 = sift.detectAndCompute(cv2.cvtColor(image_1, cv2.COLOR_BGR2GRAY), None) bf = cv2.BFMatcher() matches = bf.knnMatch(desc_0, desc_1, k=2) feature = [] for m, n in matches: if m.distance < 0.70 * n.distance: feature.append(m) return np.float32([key_points_0[m.queryIdx].pt for m in feature]), np.float32([key_points_1[m.trainIdx].pt for m in feature]) def __call__(self, enable_bundle_adjustment:boolean=False): # cv2.namedWindow('image', cv2.WINDOW_NORMAL) pose_array = self.img_obj.K.ravel() transform_matrix_0 = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0]]) transform_matrix_1 = np.empty((3, 4)) pose_0 = np.matmul(self.img_obj.K, transform_matrix_0) pose_1 = np.empty((3, 4)) total_points = np.zeros((1, 3)) total_colors = np.zeros((1, 3)) image_0 = self.img_obj.downscale_image(cv2.imread(self.img_obj.image_list[0])) image_1 = self.img_obj.downscale_image(cv2.imread(self.img_obj.image_list[1])) feature_0, feature_1 = self.find_features(image_0, image_1) # Essential matrix essential_matrix, em_mask = cv2.findEssentialMat(feature_0, feature_1, self.img_obj.K, method=cv2.RANSAC, prob=0.999, threshold=0.4, mask=None) feature_0 = feature_0[em_mask.ravel() == 1] feature_1 = feature_1[em_mask.ravel() == 1] _, rot_matrix, tran_matrix, em_mask = cv2.recoverPose(essential_matrix, feature_0, feature_1, self.img_obj.K) feature_0 = feature_0[em_mask.ravel() > 0] feature_1 = feature_1[em_mask.ravel() > 0] transform_matrix_1[:3, :3] = np.matmul(rot_matrix, transform_matrix_0[:3, :3]) transform_matrix_1[:3, 3] = transform_matrix_0[:3, 3] + np.matmul(transform_matrix_0[:3, :3], tran_matrix.ravel()) pose_1 = np.matmul(self.img_obj.K, transform_matrix_1) feature_0, feature_1, points_3d = self.triangulation(pose_0, pose_1, feature_0, feature_1) error, points_3d = self.reprojection_error(points_3d, feature_1, transform_matrix_1, self.img_obj.K, homogenity = 1) #ideally error < 1 print("REPROJECTION ERROR: ", error) _, _, feature_1, points_3d, _ = self.PnP(points_3d, feature_1, self.img_obj.K, np.zeros((5, 1), dtype=np.float32), feature_0, initial=1) total_images = len(self.img_obj.image_list) - 2 pose_array = np.hstack((np.hstack((pose_array, pose_0.ravel())), pose_1.ravel())) threshold = 0.5 for i in tqdm(range(total_images)): image_2 = self.img_obj.downscale_image(cv2.imread(self.img_obj.image_list[i + 2])) features_cur, features_2 = self.find_features(image_1, image_2) if i != 0: feature_0, feature_1, points_3d = self.triangulation(pose_0, pose_1, feature_0, feature_1) feature_1 = feature_1.T points_3d = cv2.convertPointsFromHomogeneous(points_3d.T) points_3d = points_3d[:, 0, :] cm_points_0, cm_points_1, cm_mask_0, cm_mask_1 = self.common_points(feature_1, features_cur, features_2) cm_points_2 = features_2[cm_points_1] cm_points_cur = features_cur[cm_points_1] rot_matrix, tran_matrix, cm_points_2, points_3d, cm_points_cur = self.PnP(points_3d[cm_points_0], cm_points_2, self.img_obj.K, np.zeros((5, 1), dtype=np.float32), cm_points_cur, initial = 0) transform_matrix_1 = np.hstack((rot_matrix, tran_matrix)) pose_2 = np.matmul(self.img_obj.K, transform_matrix_1) error, points_3d = self.reprojection_error(points_3d, cm_points_2, transform_matrix_1, self.img_obj.K, homogenity = 0) cm_mask_0, cm_mask_1, points_3d = self.triangulation(pose_1, pose_2, cm_mask_0, cm_mask_1) error, points_3d = self.reprojection_error(points_3d, cm_mask_1, transform_matrix_1, self.img_obj.K, homogenity = 1) print("Reprojection Error: ", error) pose_array = np.hstack((pose_array, pose_2.ravel())) # takes a long time to run if enable_bundle_adjustment: points_3d, cm_mask_1, transform_matrix_1 = self.bundle_adjustment(points_3d, cm_mask_1, transform_matrix_1, self.img_obj.K, threshold) pose_2 = np.matmul(self.img_obj.K, transform_matrix_1) error, points_3d = self.reprojection_error(points_3d, cm_mask_1, transform_matrix_1, self.img_obj.K, homogenity = 0) print("Bundle Adjusted error: ",error) total_points = np.vstack((total_points, points_3d)) points_left = np.array(cm_mask_1, dtype=np.int32) color_vector = np.array([image_2[l[1], l[0]] for l in points_left]) total_colors = np.vstack((total_colors, color_vector)) else: total_points = np.vstack((total_points, points_3d[:, 0, :])) points_left = np.array(cm_mask_1, dtype=np.int32) color_vector = np.array([image_2[l[1], l[0]] for l in points_left.T]) total_colors = np.vstack((total_colors, color_vector)) transform_matrix_0 = np.copy(transform_matrix_1) pose_0 = np.copy(pose_1) # plt.scatter(i, error) # plt.pause(0.05) image_0 = np.copy(image_1) image_1 = np.copy(image_2) feature_0 = np.copy(features_cur) feature_1 = np.copy(features_2) pose_1 = np.copy(pose_2) # # cv2.imshow(self.img_obj.image_list[0].split('\\')[-2], image_2) # if cv2.waitKey(1) & 0xff == ord('q'): # break # cv2.destroyAllWindows() print("Printing to .ply file") print(total_points.shape, total_colors.shape) self.to_ply(self.img_obj.path, total_points, total_colors) print("Completed Exiting ...") output_dir = os.path.join(self.img_obj.path, 'res') os.makedirs(output_dir, exist_ok=True) # Ensure the 'res' directory exists folder_name = os.path.basename(os.path.dirname(self.img_obj.image_list[0])) output_file = os.path.join(output_dir, f"{folder_name}_pose_array.csv") np.savetxt(output_file, pose_array, delimiter='\n') def run_sfm(path): sfm = Sfm(path) sfm()