import io
import gradio as gr
import cv2
import onnxruntime as rt
import sys
from insightface.app import FaceAnalysis

sys.path.insert(1, './recognition')
from scrfd import SCRFD
from arcface_onnx import ArcFaceONNX

import os.path as osp
import os
from pathlib import Path
from tqdm import tqdm
import ffmpeg
import random
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from insightface.model_zoo.inswapper import INSwapper
import psutil
from enum import Enum
from insightface.app.common import Face
from insightface.utils.storage import ensure_available
import re
import subprocess

class RefacerMode(Enum):
    CPU, CUDA, COREML, TENSORRT = range(1, 5)

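# Refacer ties the pipeline together: SCRFD for face detection, ArcFace for
# identity embeddings, and the inswapper_128 ONNX model for the actual swap,
# applied frame by frame to an input video.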
class Refacer:
    def __init__(self, force_cpu=False, colab_performance=False):
        self.first_face = False
        self.force_cpu = force_cpu
        self.colab_performance = colab_performance
        self.__check_providers()
        self.total_mem = psutil.virtual_memory().total
        self.__init_apps()

    def __check_providers(self):
        if self.force_cpu:
            self.providers = ['CPUExecutionProvider']
        else:
            self.providers = rt.get_available_providers()
        rt.set_default_logger_severity(4)
        self.sess_options = rt.SessionOptions()
        self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL
        self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL

        if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers:
            self.mode = RefacerMode.CPU
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"CPU mode with providers {self.providers}")
        elif self.colab_performance:
            self.mode = RefacerMode.TENSORRT
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"TENSORRT mode with providers {self.providers}")
        elif 'CoreMLExecutionProvider' in self.providers:
            self.mode = RefacerMode.COREML
            self.use_num_cpus = mp.cpu_count() - 1
            self.sess_options.intra_op_num_threads = int(self.use_num_cpus / 3)
            print(f"CoreML mode with providers {self.providers}")
        elif 'CUDAExecutionProvider' in self.providers:
            self.mode = RefacerMode.CUDA
            self.use_num_cpus = 2
            self.sess_options.intra_op_num_threads = 1
            # Drop TensorRT so ONNX Runtime falls back to plain CUDA.
            if 'TensorrtExecutionProvider' in self.providers:
                self.providers.remove('TensorrtExecutionProvider')
            print(f"CUDA mode with providers {self.providers}")

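    # Loads the three ONNX models: det_10g (SCRFD detector) and w600k_r50
    # (ArcFace recognizer) from the buffalo_l bundle, plus inswapper_128,
    # which is expected in the working directory.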
    def __init_apps(self):
        assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface')

        model_path = os.path.join(assets_dir, 'det_10g.onnx')
        sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_detector = SCRFD(model_path, sess_face)
        self.face_detector.prepare(0, input_size=(640, 640))

        model_path = os.path.join(assets_dir, 'w600k_r50.onnx')
        sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.rec_app = ArcFaceONNX(model_path, sess_rec)
        self.rec_app.prepare(0)

        model_path = 'inswapper_128.onnx'
        sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_swapper = INSwapper(model_path, sess_swap)

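    # Builds self.replacement_faces as (origin_embedding, destination_face,
    # threshold) tuples. When a face has no origin image, only the first face
    # found in each frame is swapped (self.first_face mode).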
    def prepare_faces(self, faces):
        self.replacement_faces = []
        for face in faces:
            if "origin" in face:
                face_threshold = face['threshold']
                bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1)
                if len(kpss1) < 1:
                    raise Exception('No face detected on "Face to replace" image')
                feat_original = self.rec_app.get(face['origin'], kpss1[0])
            else:
                face_threshold = 0
                self.first_face = True
                feat_original = None
                print('No origin image: First face change')
            _faces = self.__get_faces(face['destination'], max_num=1)
            if len(_faces) < 1:
                raise Exception('No face detected on "Destination face" image')
            self.replacement_faces.append((feat_original, _faces[0], face_threshold))

    def __get_faces(self, frame, max_num=0):
        bboxes, kpss = self.face_detector.detect(frame, max_num=max_num, metric='default')

        if bboxes.shape[0] == 0:
            return []
        ret = []
        for i in range(bboxes.shape[0]):
            bbox = bboxes[i, 0:4]
            det_score = bboxes[i, 4]
            kps = None
            if kpss is not None:
                kps = kpss[i]
            face = Face(bbox=bbox, kps=kps, det_score=det_score)
            face.embedding = self.rec_app.get(frame, kps)
            ret.append(face)
        return ret

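    # Per-frame workers: process_first_face swaps only the single most
    # prominent face; process_faces matches every detected face against each
    # replacement embedding and swaps those whose cosine similarity clears
    # the configured threshold.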
    def process_first_face(self, frame):
        faces = self.__get_faces(frame, max_num=1)
        if len(faces) != 0:
            frame = self.face_swapper.get(frame, faces[0], self.replacement_faces[0][1], paste_back=True)
        return frame

    def process_faces(self, frame):
        faces = self.__get_faces(frame, max_num=0)
        for rep_face in self.replacement_faces:
            for i in range(len(faces) - 1, -1, -1):
                sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding)
                if sim >= rep_face[2]:
                    frame = self.face_swapper.get(frame, faces[i], rep_face[1], paste_back=True)
                    del faces[i]
                    break
        return frame

    def __check_video_has_audio(self, video_path):
        self.video_has_audio = False
        probe = ffmpeg.probe(video_path)
        audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None)
        if audio_stream is not None:
            self.video_has_audio = True

    def reface_group(self, faces, frames):
        results = []
        with ThreadPoolExecutor(max_workers=self.use_num_cpus) as executor:
            if self.first_face:
                results = list(tqdm(executor.map(self.process_first_face, frames), total=len(frames), desc="Processing frames"))
            else:
                results = list(tqdm(executor.map(self.process_faces, frames), total=len(frames), desc="Processing frames"))
        return results

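    # Full pipeline for one video: read all frames into memory, swap faces in
    # a thread pool, then re-encode the result with OpenCV via a temp file.
    # Note that the audio track detected by __check_video_has_audio is not
    # re-muxed into the output here.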
    def reface(self, video_path, faces):
        self.__check_video_has_audio(video_path)
        self.prepare_faces(faces)

        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total frames: {total_frames}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        frames = []
        with tqdm(total=total_frames, desc="Extracting frames") as pbar:
            while cap.isOpened():
                flag, frame = cap.read()
                if flag and len(frame) > 0:
                    frames.append(frame.copy())
                    pbar.update()
                else:
                    break
        cap.release()

        refaced_frames = self.reface_group(faces, frames)

        video_buffer = io.BytesIO()
        out = cv2.VideoWriter('temp.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
        for frame in refaced_frames:
            out.write(frame)
        out.release()

        with open('temp.mp4', 'rb') as f:
            video_buffer.write(f.read())
        video_buffer.seek(0)

        os.remove('temp.mp4')

        return video_buffer

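# Gradio callback: receives the video path followed by `num_faces` origin
# images, `num_faces` destination images, and `num_faces` thresholds as one
# flat argument list.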
def run(*vars):
    video_path = vars[0]
    origins = vars[1:(num_faces + 1)]
    destinations = vars[(num_faces + 1):(num_faces * 2) + 1]
    thresholds = vars[(num_faces * 2) + 1:]

    faces = []
    for k in range(0, num_faces):
        if origins[k] is not None and destinations[k] is not None:
            faces.append({
                'origin': origins[k],
                'destination': destinations[k],
                'threshold': thresholds[k]
            })
    # `refacer` and `num_faces` are module-level globals set up by the app
    # that drives this script (see the hypothetical driver sketch below).
    return refacer.reface(video_path, faces)
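
# --- Hypothetical driver sketch (not part of the original snippet) ---
# `run()` above expects module-level globals `refacer` and `num_faces`; in the
# full application they are defined alongside the Gradio UI. A minimal
# stand-in that drives the same code path directly is sketched here. The file
# names ('face.png', 'target.png', 'input.mp4', 'refaced.mp4') and the 0.2
# similarity threshold are illustrative assumptions only.
if __name__ == "__main__":
    num_faces = 1
    refacer = Refacer(force_cpu=True)

    origin = cv2.imread('face.png')          # identity to search for in the video
    destination = cv2.imread('target.png')   # face to paste over each match

    # Argument layout mirrors what the UI passes: video path, then origins,
    # destinations, and thresholds.
    buffer = run('input.mp4', origin, destination, 0.2)
    with open('refaced.mp4', 'wb') as fh:
        fh.write(buffer.getvalue())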