# Copyright (C) 2024-present Naver Corporation. All rights reserved. # Licensed under CC BY-NC-SA 4.0 (non-commercial use only). # # -------------------------------------------------------- # MASt3R to colmap export functions # -------------------------------------------------------- import os import torch import copy import numpy as np import torchvision import numpy as np from tqdm import tqdm from scipy.cluster.hierarchy import DisjointSet from scipy.spatial.transform import Rotation as R from mast3r.utils.misc import hash_md5 from mast3r.fast_nn import extract_correspondences_nonsym, bruteforce_reciprocal_nns import mast3r.utils.path_to_dust3r # noqa from dust3r.utils.geometry import find_reciprocal_matches, xy_grid # noqa def convert_im_matches_pairs(img0, img1, image_to_colmap, im_keypoints, matches_im0, matches_im1, viz): if viz: from matplotlib import pyplot as pl image_mean = torch.as_tensor( [0.5, 0.5, 0.5], device='cpu').reshape(1, 3, 1, 1) image_std = torch.as_tensor( [0.5, 0.5, 0.5], device='cpu').reshape(1, 3, 1, 1) rgb0 = img0['img'] * image_std + image_mean rgb0 = torchvision.transforms.functional.to_pil_image(rgb0[0]) rgb0 = np.array(rgb0) rgb1 = img1['img'] * image_std + image_mean rgb1 = torchvision.transforms.functional.to_pil_image(rgb1[0]) rgb1 = np.array(rgb1) imgs = [rgb0, rgb1] # visualize a few matches n_viz = 100 num_matches = matches_im0.shape[0] match_idx_to_viz = np.round(np.linspace( 0, num_matches - 1, n_viz)).astype(int) viz_matches_im0, viz_matches_im1 = matches_im0[match_idx_to_viz], matches_im1[match_idx_to_viz] H0, W0, H1, W1 = *imgs[0].shape[:2], *imgs[1].shape[:2] rgb0 = np.pad(imgs[0], ((0, max(H1 - H0, 0)), (0, 0), (0, 0)), 'constant', constant_values=0) rgb1 = np.pad(imgs[1], ((0, max(H0 - H1, 0)), (0, 0), (0, 0)), 'constant', constant_values=0) img = np.concatenate((rgb0, rgb1), axis=1) pl.figure() pl.imshow(img) cmap = pl.get_cmap('jet') for ii in range(n_viz): (x0, y0), (x1, y1) = viz_matches_im0[ii].T, viz_matches_im1[ii].T pl.plot([x0, x1 + W0], [y0, y1], '-+', color=cmap(ii / (n_viz - 1)), scalex=False, scaley=False) pl.show(block=True) matches = [matches_im0.astype(np.float64), matches_im1.astype(np.float64)] imgs = [img0, img1] imidx0 = img0['idx'] imidx1 = img1['idx'] ravel_matches = [] for j in range(2): H, W = imgs[j]['true_shape'][0] with np.errstate(invalid='ignore'): qx, qy = matches[j].round().astype(np.int32).T ravel_matches_j = qx.clip(min=0, max=W - 1, out=qx) + W * qy.clip(min=0, max=H - 1, out=qy) ravel_matches.append(ravel_matches_j) imidxj = imgs[j]['idx'] for m in ravel_matches_j: if m not in im_keypoints[imidxj]: im_keypoints[imidxj][m] = 0 im_keypoints[imidxj][m] += 1 imid0 = copy.deepcopy(image_to_colmap[imidx0]['colmap_imid']) imid1 = copy.deepcopy(image_to_colmap[imidx1]['colmap_imid']) if imid0 > imid1: colmap_matches = np.stack([ravel_matches[1], ravel_matches[0]], axis=-1) imid0, imid1 = imid1, imid0 imidx0, imidx1 = imidx1, imidx0 else: colmap_matches = np.stack([ravel_matches[0], ravel_matches[1]], axis=-1) colmap_matches = np.unique(colmap_matches, axis=0) return imidx0, imidx1, colmap_matches def get_im_matches(pred1, pred2, pairs, image_to_colmap, im_keypoints, conf_thr, is_sparse=True, subsample=8, pixel_tol=0, viz=False, device='cuda'): im_matches = {} for i in range(len(pred1['pts3d'])): imidx0 = pairs[i][0]['idx'] imidx1 = pairs[i][1]['idx'] if 'desc' in pred1: # mast3r descs = [pred1['desc'][i], pred2['desc'][i]] confidences = [pred1['desc_conf'][i], pred2['desc_conf'][i]] desc_dim = descs[0].shape[-1] if is_sparse: corres = extract_correspondences_nonsym(descs[0], descs[1], confidences[0], confidences[1], device=device, subsample=subsample, pixel_tol=pixel_tol) conf = corres[2] mask = conf >= conf_thr matches_im0 = corres[0][mask].cpu().numpy() matches_im1 = corres[1][mask].cpu().numpy() else: confidence_masks = [confidences[0] >= conf_thr, confidences[1] >= conf_thr] pts2d_list, desc_list = [], [] for j in range(2): conf_j = confidence_masks[j].cpu().numpy().flatten() true_shape_j = pairs[i][j]['true_shape'][0] pts2d_j = xy_grid( true_shape_j[1], true_shape_j[0]).reshape(-1, 2)[conf_j] desc_j = descs[j].detach().cpu( ).numpy().reshape(-1, desc_dim)[conf_j] pts2d_list.append(pts2d_j) desc_list.append(desc_j) if len(desc_list[0]) == 0 or len(desc_list[1]) == 0: continue nn0, nn1 = bruteforce_reciprocal_nns(desc_list[0], desc_list[1], device=device, dist='dot', block_size=2**13) reciprocal_in_P0 = (nn1[nn0] == np.arange(len(nn0))) matches_im1 = pts2d_list[1][nn0][reciprocal_in_P0] matches_im0 = pts2d_list[0][reciprocal_in_P0] else: pts3d = [pred1['pts3d'][i], pred2['pts3d_in_other_view'][i]] confidences = [pred1['conf'][i], pred2['conf'][i]] if is_sparse: corres = extract_correspondences_nonsym(pts3d[0], pts3d[1], confidences[0], confidences[1], device=device, subsample=subsample, pixel_tol=pixel_tol, ptmap_key='3d') conf = corres[2] mask = conf >= conf_thr matches_im0 = corres[0][mask].cpu().numpy() matches_im1 = corres[1][mask].cpu().numpy() else: confidence_masks = [confidences[0] >= conf_thr, confidences[1] >= conf_thr] # find 2D-2D matches between the two images pts2d_list, pts3d_list = [], [] for j in range(2): conf_j = confidence_masks[j].cpu().numpy().flatten() true_shape_j = pairs[i][j]['true_shape'][0] pts2d_j = xy_grid(true_shape_j[1], true_shape_j[0]).reshape(-1, 2)[conf_j] pts3d_j = pts3d[j].detach().cpu().numpy().reshape(-1, 3)[conf_j] pts2d_list.append(pts2d_j) pts3d_list.append(pts3d_j) PQ, PM = pts3d_list[0], pts3d_list[1] if len(PQ) == 0 or len(PM) == 0: continue reciprocal_in_PM, nnM_in_PQ, num_matches = find_reciprocal_matches( PQ, PM) matches_im1 = pts2d_list[1][reciprocal_in_PM] matches_im0 = pts2d_list[0][nnM_in_PQ][reciprocal_in_PM] if len(matches_im0) == 0: continue imidx0, imidx1, colmap_matches = convert_im_matches_pairs(pairs[i][0], pairs[i][1], image_to_colmap, im_keypoints, matches_im0, matches_im1, viz) im_matches[(imidx0, imidx1)] = colmap_matches return im_matches def get_im_matches_from_cache(pairs, cache_path, desc_conf, subsample, image_to_colmap, im_keypoints, conf_thr, viz=False, device='cuda'): im_matches = {} for i in range(len(pairs)): imidx0 = pairs[i][0]['idx'] imidx1 = pairs[i][1]['idx'] corres_idx1 = hash_md5(pairs[i][0]['instance']) corres_idx2 = hash_md5(pairs[i][1]['instance']) path_corres = cache_path + f'/corres_conf={desc_conf}_{subsample=}/{corres_idx1}-{corres_idx2}.pth' if os.path.isfile(path_corres): score, (xy1, xy2, confs) = torch.load(path_corres, map_location=device) else: path_corres = cache_path + f'/corres_conf={desc_conf}_{subsample=}/{corres_idx2}-{corres_idx1}.pth' score, (xy2, xy1, confs) = torch.load(path_corres, map_location=device) mask = confs >= conf_thr matches_im0 = xy1[mask].cpu().numpy() matches_im1 = xy2[mask].cpu().numpy() if len(matches_im0) == 0: continue imidx0, imidx1, colmap_matches = convert_im_matches_pairs(pairs[i][0], pairs[i][1], image_to_colmap, im_keypoints, matches_im0, matches_im1, viz) im_matches[(imidx0, imidx1)] = colmap_matches return im_matches def export_images(db, images, image_paths, focals, ga_world_to_cam, camera_model): # add cameras/images to the db # with the output of ga as prior image_to_colmap = {} im_keypoints = {} for idx in range(len(image_paths)): im_keypoints[idx] = {} H, W = images[idx]["orig_shape"] if focals is None: focal_x = focal_y = 1.2 * max(W, H) prior_focal_length = False cx = W / 2.0 cy = H / 2.0 elif isinstance(focals[idx], np.ndarray) and len(focals[idx].shape) == 2: # intrinsics focal_x = focals[idx][0, 0] focal_y = focals[idx][1, 1] cx = focals[idx][0, 2] * images[idx]["to_orig"][0, 0] cy = focals[idx][1, 2] * images[idx]["to_orig"][1, 1] prior_focal_length = True else: focal_x = focal_y = float(focals[idx]) prior_focal_length = True cx = W / 2.0 cy = H / 2.0 focal_x = focal_x * images[idx]["to_orig"][0, 0] focal_y = focal_y * images[idx]["to_orig"][1, 1] if camera_model == "SIMPLE_PINHOLE": model_id = 0 focal = (focal_x + focal_y) / 2.0 params = np.asarray([focal, cx, cy], np.float64) elif camera_model == "PINHOLE": model_id = 1 params = np.asarray([focal_x, focal_y, cx, cy], np.float64) elif camera_model == "SIMPLE_RADIAL": model_id = 2 focal = (focal_x + focal_y) / 2.0 params = np.asarray([focal, cx, cy, 0.0], np.float64) elif camera_model == "OPENCV": model_id = 4 params = np.asarray([focal_x, focal_y, cx, cy, 0.0, 0.0, 0.0, 0.0], np.float64) else: raise ValueError(f"invalid camera model {camera_model}") H, W = int(H), int(W) # OPENCV camera model camid = db.add_camera( model_id, W, H, params, prior_focal_length=prior_focal_length) if ga_world_to_cam is None: prior_t = np.zeros(3) prior_q = np.zeros(4) else: q = R.from_matrix(ga_world_to_cam[idx][:3, :3]).as_quat() prior_t = ga_world_to_cam[idx][:3, 3] prior_q = np.array([q[-1], q[0], q[1], q[2]]) imid = db.add_image( image_paths[idx], camid, prior_q=prior_q, prior_t=prior_t) image_to_colmap[idx] = { 'colmap_imid': imid, 'colmap_camid': camid } return image_to_colmap, im_keypoints def export_matches(db, images, image_to_colmap, im_keypoints, im_matches, min_len_track, skip_geometric_verification): colmap_image_pairs = [] # 2D-2D are quite dense # we want to remove the very small tracks # and export only kpt for which we have values # build tracks print("building tracks") keypoints_to_track_id = {} track_id_to_kpt_list = [] to_merge = [] for (imidx0, imidx1), colmap_matches in tqdm(im_matches.items()): if imidx0 not in keypoints_to_track_id: keypoints_to_track_id[imidx0] = {} if imidx1 not in keypoints_to_track_id: keypoints_to_track_id[imidx1] = {} for m in colmap_matches: if m[0] not in keypoints_to_track_id[imidx0] and m[1] not in keypoints_to_track_id[imidx1]: # new pair of kpts never seen before track_idx = len(track_id_to_kpt_list) keypoints_to_track_id[imidx0][m[0]] = track_idx keypoints_to_track_id[imidx1][m[1]] = track_idx track_id_to_kpt_list.append( [(imidx0, m[0]), (imidx1, m[1])]) elif m[1] not in keypoints_to_track_id[imidx1]: # 0 has a track, not 1 track_idx = keypoints_to_track_id[imidx0][m[0]] keypoints_to_track_id[imidx1][m[1]] = track_idx track_id_to_kpt_list[track_idx].append((imidx1, m[1])) elif m[0] not in keypoints_to_track_id[imidx0]: # 1 has a track, not 0 track_idx = keypoints_to_track_id[imidx1][m[1]] keypoints_to_track_id[imidx0][m[0]] = track_idx track_id_to_kpt_list[track_idx].append((imidx0, m[0])) else: # both have tracks, merge them track_idx0 = keypoints_to_track_id[imidx0][m[0]] track_idx1 = keypoints_to_track_id[imidx1][m[1]] if track_idx0 != track_idx1: # let's deal with them later to_merge.append((track_idx0, track_idx1)) # regroup merge targets print("merging tracks") unique = np.unique(to_merge) tree = DisjointSet(unique) for track_idx0, track_idx1 in tqdm(to_merge): tree.merge(track_idx0, track_idx1) subsets = tree.subsets() print("applying merge") for setvals in tqdm(subsets): new_trackid = len(track_id_to_kpt_list) kpt_list = [] for track_idx in setvals: kpt_list.extend(track_id_to_kpt_list[track_idx]) for imidx, kpid in track_id_to_kpt_list[track_idx]: keypoints_to_track_id[imidx][kpid] = new_trackid track_id_to_kpt_list.append(kpt_list) # binc = np.bincount([len(v) for v in track_id_to_kpt_list]) # nonzero = np.nonzero(binc) # nonzerobinc = binc[nonzero[0]] # print(nonzero[0].tolist()) # print(nonzerobinc) num_valid_tracks = sum( [1 for v in track_id_to_kpt_list if len(v) >= min_len_track]) keypoints_to_idx = {} print(f"squashing keypoints - {num_valid_tracks} valid tracks") for imidx, keypoints_imid in tqdm(im_keypoints.items()): imid = image_to_colmap[imidx]['colmap_imid'] keypoints_kept = [] keypoints_to_idx[imidx] = {} for kp in keypoints_imid.keys(): if kp not in keypoints_to_track_id[imidx]: continue track_idx = keypoints_to_track_id[imidx][kp] track_length = len(track_id_to_kpt_list[track_idx]) if track_length < min_len_track: continue keypoints_to_idx[imidx][kp] = len(keypoints_kept) keypoints_kept.append(kp) if len(keypoints_kept) == 0: continue keypoints_kept = np.array(keypoints_kept) keypoints_kept = np.unravel_index(keypoints_kept, images[imidx]['true_shape'][0])[ 0].base[:, ::-1].copy().astype(np.float32) # rescale coordinates keypoints_kept[:, 0] += 0.5 keypoints_kept[:, 1] += 0.5 keypoints_kept = geotrf(images[imidx]['to_orig'], keypoints_kept, norm=True) H, W = images[imidx]['orig_shape'] keypoints_kept[:, 0] = keypoints_kept[:, 0].clip(min=0, max=W - 0.01) keypoints_kept[:, 1] = keypoints_kept[:, 1].clip(min=0, max=H - 0.01) db.add_keypoints(imid, keypoints_kept) print("exporting im_matches") for (imidx0, imidx1), colmap_matches in im_matches.items(): imid0, imid1 = image_to_colmap[imidx0]['colmap_imid'], image_to_colmap[imidx1]['colmap_imid'] assert imid0 < imid1 final_matches = np.array([[keypoints_to_idx[imidx0][m[0]], keypoints_to_idx[imidx1][m[1]]] for m in colmap_matches if m[0] in keypoints_to_idx[imidx0] and m[1] in keypoints_to_idx[imidx1]]) if len(final_matches) > 0: colmap_image_pairs.append( (images[imidx0]['instance'], images[imidx1]['instance'])) db.add_matches(imid0, imid1, final_matches) if skip_geometric_verification: db.add_two_view_geometry(imid0, imid1, final_matches) return colmap_image_pairs