Spaces:
Running
Running
## Restormer: Efficient Transformer for High-Resolution Image Restoration | |
## Syed Waqas Zamir, Aditya Arora, Salman Khan, Munawar Hayat, Fahad Shahbaz Khan, and Ming-Hsuan Yang | |
## https://arxiv.org/abs/2111.09881 | |
import torch | |
import torch.nn.functional as F | |
import os | |
from skimage import img_as_ubyte | |
import cv2 | |
import argparse | |
import imageio | |
from skimage.transform import resize | |
from scipy.spatial import ConvexHull | |
from tqdm import tqdm | |
import numpy as np | |
import modules.generator as G | |
import modules.keypoint_detector as KPD | |
import yaml | |
from collections import OrderedDict | |
import depth | |
parser = argparse.ArgumentParser(description='Test DaGAN on your own images') | |
parser.add_argument('--source_image', default='./temp/source.jpg', type=str, help='Directory of input source image') | |
parser.add_argument('--driving_video', default='./temp/driving.mp4', type=str, help='Directory for driving video') | |
parser.add_argument('--output', default='./temp/result.mp4', type=str, help='Directory for driving video') | |
args = parser.parse_args() | |
def normalize_kp(kp_source, kp_driving, kp_driving_initial, adapt_movement_scale=False, | |
use_relative_movement=False, use_relative_jacobian=False): | |
if adapt_movement_scale: | |
source_area = ConvexHull(kp_source['value'][0].data.cpu().numpy()).volume | |
driving_area = ConvexHull(kp_driving_initial['value'][0].data.cpu().numpy()).volume | |
adapt_movement_scale = np.sqrt(source_area) / np.sqrt(driving_area) | |
else: | |
adapt_movement_scale = 1 | |
kp_new = {k: v for k, v in kp_driving.items()} | |
if use_relative_movement: | |
kp_value_diff = (kp_driving['value'] - kp_driving_initial['value']) | |
kp_value_diff *= adapt_movement_scale | |
kp_new['value'] = kp_value_diff + kp_source['value'] | |
if use_relative_jacobian: | |
jacobian_diff = torch.matmul(kp_driving['jacobian'], torch.inverse(kp_driving_initial['jacobian'])) | |
kp_new['jacobian'] = torch.matmul(jacobian_diff, kp_source['jacobian']) | |
return kp_new | |
def find_best_frame(source, driving, cpu=False): | |
import face_alignment | |
def normalize_kp(kp): | |
kp = kp - kp.mean(axis=0, keepdims=True) | |
area = ConvexHull(kp[:, :2]).volume | |
area = np.sqrt(area) | |
kp[:, :2] = kp[:, :2] / area | |
return kp | |
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=True, | |
device='cpu' if cpu else 'cuda') | |
kp_source = fa.get_landmarks(255 * source)[0] | |
kp_source = normalize_kp(kp_source) | |
norm = float('inf') | |
frame_num = 0 | |
for i, image in tqdm(enumerate(driving)): | |
kp_driving = fa.get_landmarks(255 * image)[0] | |
kp_driving = normalize_kp(kp_driving) | |
new_norm = (np.abs(kp_source - kp_driving) ** 2).sum() | |
if new_norm < norm: | |
norm = new_norm | |
frame_num = i | |
return frame_num | |
def make_animation(source_image, driving_video, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False): | |
sources = [] | |
drivings = [] | |
with torch.no_grad(): | |
predictions = [] | |
depth_gray = [] | |
source = torch.tensor(source_image[np.newaxis].astype(np.float32)).permute(0, 3, 1, 2) | |
driving = torch.tensor(np.array(driving_video)[np.newaxis].astype(np.float32)).permute(0, 4, 1, 2, 3) | |
if not cpu: | |
source = source.cuda() | |
driving = driving.cuda() | |
outputs = depth_decoder(depth_encoder(source)) | |
depth_source = outputs[("disp", 0)] | |
outputs = depth_decoder(depth_encoder(driving[:, :, 0])) | |
depth_driving = outputs[("disp", 0)] | |
source_kp = torch.cat((source,depth_source),1) | |
driving_kp = torch.cat((driving[:, :, 0],depth_driving),1) | |
kp_source = kp_detector(source_kp) | |
kp_driving_initial = kp_detector(driving_kp) | |
# kp_source = kp_detector(source) | |
# kp_driving_initial = kp_detector(driving[:, :, 0]) | |
for frame_idx in tqdm(range(driving.shape[2])): | |
driving_frame = driving[:, :, frame_idx] | |
if not cpu: | |
driving_frame = driving_frame.cuda() | |
outputs = depth_decoder(depth_encoder(driving_frame)) | |
depth_map = outputs[("disp", 0)] | |
gray_driving = np.transpose(depth_map.data.cpu().numpy(), [0, 2, 3, 1])[0] | |
gray_driving = 1-gray_driving/np.max(gray_driving) | |
frame = torch.cat((driving_frame,depth_map),1) | |
kp_driving = kp_detector(frame) | |
kp_norm = normalize_kp(kp_source=kp_source, kp_driving=kp_driving, | |
kp_driving_initial=kp_driving_initial, use_relative_movement=relative, | |
use_relative_jacobian=relative, adapt_movement_scale=adapt_movement_scale) | |
out = generator(source, kp_source=kp_source, kp_driving=kp_norm,source_depth = depth_source, driving_depth = depth_map) | |
drivings.append(np.transpose(driving_frame.data.cpu().numpy(), [0, 2, 3, 1])[0]) | |
sources.append(np.transpose(source.data.cpu().numpy(), [0, 2, 3, 1])[0]) | |
predictions.append(np.transpose(out['prediction'].data.cpu().numpy(), [0, 2, 3, 1])[0]) | |
depth_gray.append(gray_driving) | |
return sources, drivings, predictions,depth_gray | |
with open("config/vox-adv-256.yaml") as f: | |
config = yaml.load(f) | |
generator = G.SPADEDepthAwareGenerator(**config['model_params']['generator_params'],**config['model_params']['common_params']) | |
config['model_params']['common_params']['num_channels'] = 4 | |
kp_detector = KPD.KPDetector(**config['model_params']['kp_detector_params'],**config['model_params']['common_params']) | |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
g_checkpoint = torch.load("generator.pt", map_location=device) | |
kp_checkpoint = torch.load("kp_detector.pt", map_location=device) | |
ckp_generator = OrderedDict((k.replace('module.',''),v) for k,v in g_checkpoint.items()) | |
generator.load_state_dict(ckp_generator) | |
ckp_kp_detector = OrderedDict((k.replace('module.',''),v) for k,v in kp_checkpoint.items()) | |
kp_detector.load_state_dict(ckp_kp_detector) | |
depth_encoder = depth.ResnetEncoder(18, False) | |
depth_decoder = depth.DepthDecoder(num_ch_enc=depth_encoder.num_ch_enc, scales=range(4)) | |
loaded_dict_enc = torch.load('encoder.pth') | |
loaded_dict_dec = torch.load('depth.pth') | |
filtered_dict_enc = {k: v for k, v in loaded_dict_enc.items() if k in depth_encoder.state_dict()} | |
depth_encoder.load_state_dict(filtered_dict_enc) | |
ckp_depth_decoder= {k: v for k, v in loaded_dict_dec.items() if k in depth_decoder.state_dict()} | |
depth_decoder.load_state_dict(ckp_depth_decoder) | |
depth_encoder.eval() | |
depth_decoder.eval() | |
# device = torch.device('cpu') | |
# stx() | |
generator = generator.to(device) | |
kp_detector = kp_detector.to(device) | |
depth_encoder = depth_encoder.to(device) | |
depth_decoder = depth_decoder.to(device) | |
generator.eval() | |
kp_detector.eval() | |
depth_encoder.eval() | |
depth_decoder.eval() | |
img_multiple_of = 8 | |
with torch.inference_mode(): | |
if torch.cuda.is_available(): | |
torch.cuda.ipc_collect() | |
torch.cuda.empty_cache() | |
source_image = imageio.imread(args.source_image) | |
reader = imageio.get_reader(args.driving_video) | |
fps = reader.get_meta_data()['fps'] | |
driving_video = [] | |
try: | |
for im in reader: | |
driving_video.append(im) | |
except RuntimeError: | |
pass | |
reader.close() | |
source_image = resize(source_image, (256, 256))[..., :3] | |
driving_video = [resize(frame, (256, 256))[..., :3] for frame in driving_video] | |
i = find_best_frame(source_image, driving_video) | |
print ("Best frame: " + str(i)) | |
driving_forward = driving_video[i:] | |
driving_backward = driving_video[:(i+1)][::-1] | |
sources_forward, drivings_forward, predictions_forward,depth_forward = make_animation(source_image, driving_forward, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False) | |
sources_backward, drivings_backward, predictions_backward,depth_backward = make_animation(source_image, driving_backward, generator, kp_detector, relative=True, adapt_movement_scale=True, cpu=False) | |
predictions = predictions_backward[::-1] + predictions_forward[1:] | |
sources = sources_backward[::-1] + sources_forward[1:] | |
drivings = drivings_backward[::-1] + drivings_forward[1:] | |
depth_gray = depth_backward[::-1] + depth_forward[1:] | |
imageio.mimsave(args.output, [np.concatenate((img_as_ubyte(s),img_as_ubyte(d),img_as_ubyte(p)),1) for (s,d,p) in zip(sources, drivings, predictions)], fps=fps) | |
imageio.mimsave("gray.mp4", depth_gray, fps=fps) | |
# merge the gray video | |
animation = np.array(imageio.mimread(args.output,memtest=False)) | |
gray = np.array(imageio.mimread("gray.mp4",memtest=False)) | |
src_dst = animation[:,:,:512,:] | |
animate = animation[:,:,512:,:] | |
merge = np.concatenate((src_dst,gray,animate),2) | |
imageio.mimsave(args.output, merge, fps=fps) | |
# print(f"\nRestored images are saved at {out_dir}") |