Spaces:
Running
Running
File size: 6,759 Bytes
e7cae83 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from typing import List, Optional
import os
import subprocess
import filetype
import facefusion.globals
from facefusion import logger, process_manager
from facefusion.typing import OutputVideoPreset, Fps, AudioBuffer
from facefusion.filesystem import get_temp_frames_pattern, get_temp_output_video_path
from facefusion.vision import restrict_video_fps
def run_ffmpeg(args : List[str]) -> bool:
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
commands.extend(args)
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
while process_manager.is_processing():
try:
if facefusion.globals.log_level == 'debug':
log_debug(process)
return process.wait(timeout = 0.5) == 0
except subprocess.TimeoutExpired:
continue
return process.returncode == 0
def open_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'quiet' ]
commands.extend(args)
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
def log_debug(process : subprocess.Popen[bytes]) -> None:
_, stderr = process.communicate()
errors = stderr.decode().split(os.linesep)
for error in errors:
if error.strip():
logger.debug(error.strip(), __name__.upper())
def extract_frames(target_path : str, temp_video_resolution : str, temp_video_fps : Fps) -> bool:
trim_frame_start = facefusion.globals.trim_frame_start
trim_frame_end = facefusion.globals.trim_frame_end
temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
commands = [ '-i', target_path, '-s', str(temp_video_resolution), '-q:v', '0' ]
if trim_frame_start is not None and trim_frame_end is not None:
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ':end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ])
elif trim_frame_start is not None:
commands.extend([ '-vf', 'trim=start_frame=' + str(trim_frame_start) + ',fps=' + str(temp_video_fps) ])
elif trim_frame_end is not None:
commands.extend([ '-vf', 'trim=end_frame=' + str(trim_frame_end) + ',fps=' + str(temp_video_fps) ])
else:
commands.extend([ '-vf', 'fps=' + str(temp_video_fps) ])
commands.extend([ '-vsync', '0', temp_frames_pattern ])
return run_ffmpeg(commands)
def merge_video(target_path : str, output_video_resolution : str, output_video_fps : Fps) -> bool:
temp_video_fps = restrict_video_fps(target_path, output_video_fps)
temp_output_video_path = get_temp_output_video_path(target_path)
temp_frames_pattern = get_temp_frames_pattern(target_path, '%04d')
commands = [ '-r', str(temp_video_fps), '-i', temp_frames_pattern, '-s', str(output_video_resolution), '-c:v', facefusion.globals.output_video_encoder ]
if facefusion.globals.output_video_encoder in [ 'libx264', 'libx265' ]:
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.51))
commands.extend([ '-crf', str(output_video_compression), '-preset', facefusion.globals.output_video_preset ])
if facefusion.globals.output_video_encoder in [ 'libvpx-vp9' ]:
output_video_compression = round(63 - (facefusion.globals.output_video_quality * 0.63))
commands.extend([ '-crf', str(output_video_compression) ])
if facefusion.globals.output_video_encoder in [ 'h264_nvenc', 'hevc_nvenc' ]:
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.51))
commands.extend([ '-cq', str(output_video_compression), '-preset', map_nvenc_preset(facefusion.globals.output_video_preset) ])
if facefusion.globals.output_video_encoder in [ 'h264_amf', 'hevc_amf' ]:
output_video_compression = round(51 - (facefusion.globals.output_video_quality * 0.51))
commands.extend([ '-qp_i', str(output_video_compression), '-qp_p', str(output_video_compression), '-quality', map_amf_preset(facefusion.globals.output_video_preset) ])
commands.extend([ '-vf', 'framerate=fps=' + str(output_video_fps), '-pix_fmt', 'yuv420p', '-colorspace', 'bt709', '-y', temp_output_video_path ])
return run_ffmpeg(commands)
def copy_image(target_path : str, output_path : str, temp_image_resolution : str) -> bool:
is_webp = filetype.guess_mime(target_path) == 'image/webp'
temp_image_compression = 100 if is_webp else 0
commands = [ '-i', target_path, '-s', str(temp_image_resolution), '-q:v', str(temp_image_compression), '-y', output_path ]
return run_ffmpeg(commands)
def finalize_image(output_path : str, output_image_resolution : str) -> bool:
output_image_compression = round(31 - (facefusion.globals.output_image_quality * 0.31))
commands = [ '-i', output_path, '-s', str(output_image_resolution), '-q:v', str(output_image_compression), '-y', output_path ]
return run_ffmpeg(commands)
def read_audio_buffer(target_path : str, sample_rate : int, channel_total : int) -> Optional[AudioBuffer]:
commands = [ '-i', target_path, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sample_rate), '-ac', str(channel_total), '-']
process = open_ffmpeg(commands)
audio_buffer, _ = process.communicate()
if process.returncode == 0:
return audio_buffer
return None
def restore_audio(target_path : str, output_path : str, output_video_fps : Fps) -> bool:
trim_frame_start = facefusion.globals.trim_frame_start
trim_frame_end = facefusion.globals.trim_frame_end
temp_output_video_path = get_temp_output_video_path(target_path)
commands = [ '-i', temp_output_video_path ]
if trim_frame_start is not None:
start_time = trim_frame_start / output_video_fps
commands.extend([ '-ss', str(start_time) ])
if trim_frame_end is not None:
end_time = trim_frame_end / output_video_fps
commands.extend([ '-to', str(end_time) ])
commands.extend([ '-i', target_path, '-c', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-shortest', '-y', output_path ])
return run_ffmpeg(commands)
def replace_audio(target_path : str, audio_path : str, output_path : str) -> bool:
temp_output_path = get_temp_output_video_path(target_path)
commands = [ '-i', temp_output_path, '-i', audio_path, '-af', 'apad', '-shortest', '-y', output_path ]
return run_ffmpeg(commands)
def map_nvenc_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast', 'faster', 'fast' ]:
return 'fast'
if output_video_preset == 'medium':
return 'medium'
if output_video_preset in [ 'slow', 'slower', 'veryslow' ]:
return 'slow'
return None
def map_amf_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]:
return 'speed'
if output_video_preset in [ 'faster', 'fast', 'medium' ]:
return 'balanced'
if output_video_preset in [ 'slow', 'slower', 'veryslow' ]:
return 'quality'
return None
|