"""Crop mouth regions from video frames laid out in a GRID-style
speaker/sentence directory structure.

dlib's 68-point facial landmark predictor locates the mouth (landmarks
48-67) in each frame, and a fixed-aspect-ratio crop of the mouth region
is written out for every sentence that has an alignment (.align) file.
"""
import sys
sys.path.append('..')  # so the project-level `options` module resolves

import os
from multiprocessing import Pool

import cv2
import dlib
import numpy as np
from tqdm.auto import tqdm

import options

predictor_path = '../pretrain/shape_predictor_68_face_landmarks.dat'
predictor = dlib.shape_predictor(predictor_path)
detector = dlib.get_frontal_face_detector()

RUN_PARALLEL = True
FORCE_RATIO = True  # force every crop to a 2:1 width:height ratio
BORDER = 10  # margin (in pixels) added around the detected mouth

base = os.path.abspath('..')
image_dir = os.path.join(base, options.images_dir)
anno_dir = os.path.join(base, options.alignments_dir)
crop_dir = os.path.join(base, options.crop_images_dir)


def get_mouth_marks(shape):
    """Return the mouth bounding box as (x_min, y_min, x_max, y_max)."""
    marks = np.zeros((2, 20))
    # Landmarks 48-67 of dlib's 68-point model are the mouth; store the
    # x and y coordinates in separate rows.
    for co, ii in enumerate(range(48, 68)):
        point = shape.part(ii)
        marks[0, co] = point.x
        marks[1, co] = point.y
    # The extreme points give the top-left and bottom-right corners.
    x_min, y_min = np.amin(marks, axis=1)
    x_max, y_max = np.amax(marks, axis=1)
    return int(x_min), int(y_min), int(x_max), int(y_max)


# Collect every (speaker, sentence) pair that has both extracted frames
# and an alignment file.
translate_pairs = []
for speaker_no in range(1, 35):
    speaker_name = f's{speaker_no}'
    speaker_image_dir = os.path.join(image_dir, speaker_name)
    speaker_crop_dir = os.path.join(crop_dir, speaker_name)
    speaker_anno_dir = os.path.join(anno_dir, speaker_name)
    if not os.path.exists(speaker_image_dir):
        continue
    os.makedirs(speaker_crop_dir, exist_ok=True)
    for sentence in os.listdir(speaker_image_dir):
        anno_filepath = os.path.join(speaker_anno_dir, f'{sentence}.align')
        if not os.path.exists(anno_filepath):
            continue
        translate_pairs.append((speaker_no, sentence))

print('PAIRS', len(translate_pairs))


def extract_mouth_image(speaker_no, sentence):
    speaker_name = f's{speaker_no}'
    img_sentence_dir = os.path.join(image_dir, speaker_name, sentence)
    crop_sentence_dir = os.path.join(crop_dir, speaker_name, sentence)
    os.makedirs(crop_sentence_dir, exist_ok=True)

    for filename in os.listdir(img_sentence_dir):
        img_filepath = os.path.join(img_sentence_dir, filename)
        if not img_filepath.endswith('.jpg'):
            continue
        crop_filepath = os.path.join(crop_sentence_dir, filename)
        image = cv2.imread(img_filepath)

        detections = detector(image, 1)
        if not detections:
            # No face found in this frame; skip it rather than crash.
            continue
        shape = predictor(image, detections[0])
        X_left, Y_left, X_right, Y_right = get_mouth_marks(shape)

        # Center of the mouth.
        X_center = (X_left + X_right) / 2.0
        Y_center = (Y_left + Y_right) / 2.0

        # Add a border margin around the mouth, then round the crop
        # width and height up to even values so they halve cleanly.
        width_crop_max = (X_right + BORDER) - (X_left - BORDER)
        height_crop_max = (Y_right + BORDER) - (Y_left - BORDER)
        if width_crop_max % 2 == 1:
            width_crop_max += 1
        if height_crop_max % 2 == 1:
            height_crop_max += 1

        # Enforce the 2:1 width:height ratio by growing the smaller side.
        if FORCE_RATIO:
            if width_crop_max < height_crop_max * 2:
                width_crop_max = height_crop_max * 2
            else:
                height_crop_max = width_crop_max // 2

        # Cropping points (top-left and bottom-right), clamped at the
        # image origin; numpy slicing clips the far edges automatically.
        X_left_crop = max(int(X_center - width_crop_max / 2.0), 0)
        Y_left_crop = max(int(Y_center - height_crop_max / 2.0), 0)
        X_right_crop = int(X_center + width_crop_max / 2.0)
        Y_right_crop = int(Y_center + height_crop_max / 2.0)
        mouth = image[Y_left_crop:Y_right_crop, X_left_crop:X_right_crop, :]

        # Clamping can shrink the crop, so resize back to the exact
        # ratio if needed; note cv2's dsize is (width, height).
        if FORCE_RATIO:
            height, width, _ = mouth.shape
            if width != height * 2:
                mouth = cv2.resize(
                    mouth,
                    dsize=(height * 2, height),
                    interpolation=cv2.INTER_CUBIC
                )

        cv2.imwrite(crop_filepath, mouth)
    return speaker_no, sentence


if RUN_PARALLEL:
    # NOTE: running Pool at module level relies on the fork start method
    # (the Linux default); guard with `if __name__ == '__main__'` elsewhere.
    pbar = tqdm(total=len(translate_pairs))
    pool = Pool(processes=12)
    jobs = []

    def callback(resp):
        pbar.set_description(str(resp))
        pbar.update(1)

    for speaker_no, sentence in translate_pairs:
        job = pool.apply_async(
            extract_mouth_image,
            kwds={'speaker_no': speaker_no, 'sentence': sentence},
            callback=callback
        )
        jobs.append(job)

    # Wait for all tasks; .get() (unlike .wait()) re-raises any
    # exception that occurred in a worker.
    for job in jobs:
        job.get()
    pool.close()
    pool.join()
else:
    for translate_pair in tqdm(translate_pairs):
        extract_mouth_image(*translate_pair)
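

# Optional post-run sanity check: a minimal sketch, not part of the original
# pipeline. It walks crop_dir (defined above) and reports crops that are
# unreadable or that break the 2:1 width:height ratio, assuming FORCE_RATIO
# was left enabled. The `verify_crops` name is an illustrative helper.
def verify_crops():
    bad = []
    for root, _, files in os.walk(crop_dir):
        for filename in files:
            if not filename.endswith('.jpg'):
                continue
            path = os.path.join(root, filename)
            crop = cv2.imread(path)
            # A crop is "bad" if it failed to load or lost the 2:1 ratio.
            if crop is None or crop.shape[1] != crop.shape[0] * 2:
                bad.append(path)
    print('BAD CROPS', len(bad))
    return bad

# verify_crops()  # uncomment to run the check after extraction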