import os
import cv2
import yaml
import numpy as np
import warnings
from skimage import img_as_ubyte
import safetensors
import safetensors.torch
warnings.filterwarnings('ignore')

import torch
import imageio

from src.facerender.modules.keypoint_detector import HEEstimator, KPDetector
from src.facerender.modules.mapping import MappingNet
from src.facerender.modules.generator import OcclusionAwareSPADEGenerator
from src.facerender.modules.make_animation import make_animation

# from pydub import AudioSegment
from src.utils.face_enhancer import enhancer_generator_with_len, enhancer_list
from src.utils.paste_pic import paste_pic
from src.utils.videoio import save_video_with_watermark

try:
    import webui  # in webui
    in_webui = True
except ImportError:
    in_webui = False

def opencv_save_video(path, videos, fps=25, img_size=256):
    """Write a list of frames to an mp4 file with OpenCV.

    Frames are expected as uint8 arrays of shape (img_size, img_size, 3)
    in BGR channel order, which is what cv2.VideoWriter consumes.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(path, fourcc, float(fps), (img_size, img_size))
    # Write each frame to the video file
    for frame in videos:
        video_writer.write(frame)
        # cv2.imshow("Stream", frame)
        # if cv2.waitKey(1) & 0xFF == ord('q'):
        #     break
    # Release the video writer and close the file
    video_writer.release()
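
# Usage sketch (hypothetical frames): opencv_save_video expects uint8 BGR
# frames already sized to (img_size, img_size):
#   frames = [np.zeros((256, 256, 3), dtype=np.uint8) for _ in range(25)]
#   opencv_save_video('out.mp4', frames, fps=25, img_size=256)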

class AnimateFromCoeff():

    def __init__(self, sadtalker_path, device):

        with open(sadtalker_path['facerender_yaml']) as f:
            config = yaml.safe_load(f)

        generator = OcclusionAwareSPADEGenerator(**config['model_params']['generator_params'],
                                                 **config['model_params']['common_params'])
        kp_extractor = KPDetector(**config['model_params']['kp_detector_params'],
                                  **config['model_params']['common_params'])
        he_estimator = HEEstimator(**config['model_params']['he_estimator_params'],
                                   **config['model_params']['common_params'])
        mapping = MappingNet(**config['model_params']['mapping_params'])

        generator.to(device)
        kp_extractor.to(device)
        he_estimator.to(device)
        mapping.to(device)

        # Inference only: freeze all parameters.
        for param in generator.parameters():
            param.requires_grad = False
        for param in kp_extractor.parameters():
            param.requires_grad = False
        for param in he_estimator.parameters():
            param.requires_grad = False
        for param in mapping.parameters():
            param.requires_grad = False

        if sadtalker_path is not None:
            if 'checkpoint' in sadtalker_path:  # use safetensors weights
                self.load_cpk_facevid2vid_safetensor(sadtalker_path['checkpoint'],
                                                     kp_detector=kp_extractor,
                                                     generator=generator,
                                                     he_estimator=None)
            else:
                self.load_cpk_facevid2vid(sadtalker_path['free_view_checkpoint'],
                                          kp_detector=kp_extractor,
                                          generator=generator,
                                          he_estimator=he_estimator)
        else:
            raise AttributeError("Checkpoint should be specified for video head pose estimator.")

        if sadtalker_path['mappingnet_checkpoint'] is not None:
            self.load_cpk_mapping(sadtalker_path['mappingnet_checkpoint'], mapping=mapping)
        else:
            raise AttributeError("Checkpoint should be specified for the mapping network.")

        self.kp_extractor = kp_extractor
        self.generator = generator
        self.he_estimator = he_estimator
        self.mapping = mapping

        self.kp_extractor.eval()
        self.generator.eval()
        self.he_estimator.eval()
        self.mapping.eval()

        self.device = device
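
    # A sketch of the expected `sadtalker_path` dict (the keys mirror what
    # __init__ reads above; the file names are hypothetical):
    #   sadtalker_path = {
    #       'facerender_yaml': 'src/config/facerender.yaml',
    #       'checkpoint': 'checkpoints/sadtalker.safetensors',        # or, instead:
    #       'free_view_checkpoint': 'checkpoints/facevid2vid.pth.tar',
    #       'mappingnet_checkpoint': 'checkpoints/mapping.pth.tar',
    #   }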

    def load_cpk_facevid2vid_safetensor(self, checkpoint_path, generator=None,
                                        kp_detector=None, he_estimator=None,
                                        device="cpu"):

        checkpoint = safetensors.torch.load_file(checkpoint_path)

        # The safetensors file stores all sub-networks in one flat dict;
        # select each network's tensors by prefix and strip the prefix
        # before loading.
        if generator is not None:
            x_generator = {}
            for k, v in checkpoint.items():
                if 'generator' in k:
                    x_generator[k.replace('generator.', '')] = v
            generator.load_state_dict(x_generator)

        if kp_detector is not None:
            x_generator = {}
            for k, v in checkpoint.items():
                if 'kp_extractor' in k:
                    x_generator[k.replace('kp_extractor.', '')] = v
            kp_detector.load_state_dict(x_generator)

        if he_estimator is not None:
            x_generator = {}
            for k, v in checkpoint.items():
                if 'he_estimator' in k:
                    x_generator[k.replace('he_estimator.', '')] = v
            he_estimator.load_state_dict(x_generator)

        return None
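
    # Illustration (hypothetical key names, inferred from the prefix
    # matching above): a flat checkpoint entry such as
    #   'kp_extractor.<param_name>' -> tensor
    # becomes '<param_name>' and is loaded into kp_detector's state_dict;
    # 'generator.*' and 'he_estimator.*' entries are handled the same way.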

    def load_cpk_facevid2vid(self, checkpoint_path, generator=None, discriminator=None,
                             kp_detector=None, he_estimator=None, optimizer_generator=None,
                             optimizer_discriminator=None, optimizer_kp_detector=None,
                             optimizer_he_estimator=None, device="cpu"):

        checkpoint = torch.load(checkpoint_path, map_location=torch.device(device))
        if generator is not None:
            generator.load_state_dict(checkpoint['generator'])
        if kp_detector is not None:
            kp_detector.load_state_dict(checkpoint['kp_detector'])
        if he_estimator is not None:
            he_estimator.load_state_dict(checkpoint['he_estimator'])
        if discriminator is not None:
            try:
                discriminator.load_state_dict(checkpoint['discriminator'])
            except KeyError:
                print('No discriminator in the state-dict. Discriminator will be randomly initialized.')
        if optimizer_generator is not None:
            optimizer_generator.load_state_dict(checkpoint['optimizer_generator'])
        if optimizer_discriminator is not None:
            try:
                optimizer_discriminator.load_state_dict(checkpoint['optimizer_discriminator'])
            except RuntimeError:
                print('No discriminator optimizer in the state-dict. Optimizer will not be initialized.')
        if optimizer_kp_detector is not None:
            optimizer_kp_detector.load_state_dict(checkpoint['optimizer_kp_detector'])
        if optimizer_he_estimator is not None:
            optimizer_he_estimator.load_state_dict(checkpoint['optimizer_he_estimator'])

        return checkpoint['epoch']
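
    # Usage sketch for resuming training (hypothetical path; inference in
    # this module passes only the networks, never the optimizers):
    #   epoch = self.load_cpk_facevid2vid('checkpoints/facevid2vid.pth.tar',
    #                                     generator=generator,
    #                                     kp_detector=kp_extractor,
    #                                     he_estimator=he_estimator)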

    def load_cpk_mapping(self, checkpoint_path, mapping=None, discriminator=None,
                         optimizer_mapping=None, optimizer_discriminator=None, device='cpu'):

        checkpoint = torch.load(checkpoint_path, map_location=torch.device(device))
        if mapping is not None:
            mapping.load_state_dict(checkpoint['mapping'])
        if discriminator is not None:
            discriminator.load_state_dict(checkpoint['discriminator'])
        if optimizer_mapping is not None:
            optimizer_mapping.load_state_dict(checkpoint['optimizer_mapping'])
        if optimizer_discriminator is not None:
            optimizer_discriminator.load_state_dict(checkpoint['optimizer_discriminator'])

        return checkpoint['epoch']
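
    # Like load_cpk_facevid2vid, this returns the stored epoch; __init__
    # above discards the return value since the networks stay frozen.
    # Hypothetical call:
    #   epoch = self.load_cpk_mapping(sadtalker_path['mappingnet_checkpoint'],
    #                                 mapping=mapping)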

    def generate(self, x, video_save_dir, pic_path, crop_info, enhancer=None,
                 background_enhancer=None, preprocess='crop', img_size=256, fps=25):

        source_image = x['source_image'].type(torch.FloatTensor)
        source_semantics = x['source_semantics'].type(torch.FloatTensor)
        target_semantics = x['target_semantics_list'].type(torch.FloatTensor)
        source_image = source_image.to(self.device)
        source_semantics = source_semantics.to(self.device)
        target_semantics = target_semantics.to(self.device)

        frame_num = x['frame_num']

        predictions_video = make_animation(source_image, source_semantics, target_semantics,
                                           self.generator, self.kp_extractor, self.he_estimator, self.mapping,
                                           None, None, None, use_exp=True)

        # Flatten the batch dimension and drop any padded frames beyond the
        # requested frame count.
        predictions_video = predictions_video.reshape((-1,) + predictions_video.shape[2:])
        predictions_video = predictions_video[:frame_num]

        # Convert each frame from CHW float tensors to HWC uint8 arrays.
        video = []
        for idx in range(predictions_video.shape[0]):
            image = predictions_video[idx]
            image = np.transpose(image.data.cpu().numpy(), [1, 2, 0]).astype(np.float32)
            video.append(image)
        result = img_as_ubyte(video)

        ### The generated frames are square (img_size x img_size); resize them
        ### back to the original crop's aspect ratio.
        original_size = crop_info[0]
        if original_size:
            # sadtalker original behavior; a cv2.COLOR_BGR2RGB conversion can
            # be added here if the frames come out with swapped channels.
            result = [cv2.resize(result_i, (img_size, int(img_size * original_size[1] / original_size[0]))) for result_i in result]

        video_name = x['video_name'] + '.mp4'
        path = os.path.join(video_save_dir, 'temp_' + video_name)
        print("fps: ", fps, len(result))
        print(path)

        # Alternative: write the frames with OpenCV instead of imageio:
        # opencv_save_video(path, result, fps, img_size)
        imageio.mimsave(path, result, fps=float(fps))

        av_path = os.path.join(video_save_dir, video_name)
        return_path = av_path

        audio_path = x['audio_path']
        # audio_name = os.path.splitext(os.path.split(audio_path)[-1])[0]
        # new_audio_path = os.path.join(video_save_dir, 'new_answer.wav')
        # start_time = 0
        # # cog will not keep the .mp3 filename
        # sound = AudioSegment.from_file(audio_path)
        # frames = frame_num
        # end_time = start_time + frames * 1 / fps * 1000
        # word1 = sound.set_frame_rate(16000)
        # word = word1[start_time:end_time]
        # word.export(new_audio_path, format="wav")

        # Mux the silent frame video with the driving audio.
        save_video_with_watermark(path, audio_path, av_path, watermark=False)

        if 'full' in preprocess.lower():
            # Only add the full-frame output for full-image preprocessing:
            # paste the animated crop back into the original picture.
            video_name_full = x['video_name'] + '_full.mp4'
            full_video_path = os.path.join(video_save_dir, video_name_full)
            return_path = full_video_path
            paste_pic(path, pic_path, crop_info, audio_path, full_video_path,
                      extended_crop=True if 'ext' in preprocess.lower() else False)
            print(f'The generated video is named {video_save_dir}/{video_name_full}')
        else:
            full_video_path = av_path

        ### Paste back first, then run the face enhancer.
        if enhancer:
            video_name_enhancer = x['video_name'] + '_enhanced.mp4'
            enhanced_path = os.path.join(video_save_dir, 'temp_' + video_name_enhancer)
            av_path_enhancer = os.path.join(video_save_dir, video_name_enhancer)
            return_path = av_path_enhancer

            try:
                enhanced_images_gen_with_len = enhancer_generator_with_len(full_video_path, method=enhancer, bg_upsampler=background_enhancer)
                imageio.mimsave(enhanced_path, enhanced_images_gen_with_len, fps=float(fps))
            except Exception:
                # Fall back to the list-based enhancer if the generator
                # variant fails.
                enhanced_images_gen_with_len = enhancer_list(full_video_path, method=enhancer, bg_upsampler=background_enhancer)
                imageio.mimsave(enhanced_path, enhanced_images_gen_with_len, fps=float(fps))

            save_video_with_watermark(enhanced_path, audio_path, av_path_enhancer, watermark=False)
            # print(f'The generated video is named {video_save_dir}/{video_name_enhancer}')
            os.remove(enhanced_path)

        os.remove(path)

        return return_path
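
# Usage sketch (illustrative; the keys of `x` mirror what generate() reads
# above, while the tensor values and file paths are hypothetical):
#   animator = AnimateFromCoeff(sadtalker_path, device='cuda')
#   x = {
#       'source_image': ...,           # cropped source face, float tensor
#       'source_semantics': ...,       # 3DMM coefficients of the source
#       'target_semantics_list': ...,  # audio-driven coefficient sequence
#       'frame_num': 100,
#       'video_name': 'demo',
#       'audio_path': './examples/driven_audio/demo.wav',
#   }
#   result_path = animator.generate(x, './results', pic_path, crop_info,
#                                   enhancer=None, preprocess='crop',
#                                   img_size=256, fps=25)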