Ii commited on
Commit
394a698
·
verified ·
1 Parent(s): 938f7cf

Delete refacer.py

Browse files
Files changed (1) hide show
  1. refacer.py +0 -220
refacer.py DELETED
@@ -1,220 +0,0 @@
1
- import cv2
2
- import onnxruntime as rt
3
- import sys
4
- from insightface.app import FaceAnalysis
5
- sys.path.insert(1, './recognition')
6
- from scrfd import SCRFD
7
- from arcface_onnx import ArcFaceONNX
8
- import os.path as osp
9
- import os
10
- from pathlib import Path
11
- from tqdm import tqdm
12
- import ffmpeg
13
- import random
14
- import multiprocessing as mp
15
- from concurrent.futures import ThreadPoolExecutor
16
- from insightface.model_zoo.inswapper import INSwapper
17
- import psutil
18
- from enum import Enum
19
- from insightface.app.common import Face
20
- from insightface.utils.storage import ensure_available
21
- import re
22
- import subprocess
23
- import gradio as gr
24
-
25
- class RefacerMode(Enum):
26
- CPU, CUDA, COREML, TENSORRT = range(1, 5)
27
-
28
- class Refacer:
29
- def __init__(self, force_cpu=False, colab_performance=False):
30
- self.first_face = False
31
- self.force_cpu = force_cpu
32
- self.colab_performance = colab_performance
33
- self.__check_encoders()
34
- self.__check_providers()
35
- self.total_mem = psutil.virtual_memory().total
36
- self.__init_apps()
37
-
38
- def __check_providers(self):
39
- if self.force_cpu:
40
- self.providers = ['CPUExecutionProvider']
41
- else:
42
- self.providers = rt.get_available_providers()
43
- rt.set_default_logger_severity(4)
44
- self.sess_options = rt.SessionOptions()
45
- self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL
46
- self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL
47
-
48
- if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers:
49
- self.mode = RefacerMode.CPU
50
- self.use_num_cpus = mp.cpu_count()-1
51
- self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
52
- print(f"CPU mode with providers {self.providers}")
53
- elif self.colab_performance:
54
- self.mode = RefacerMode.TENSORRT
55
- self.use_num_cpus = mp.cpu_count()-1
56
- self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
57
- print(f"TENSORRT mode with providers {self.providers}")
58
- elif 'CoreMLExecutionProvider' in self.providers:
59
- self.mode = RefacerMode.COREML
60
- self.use_num_cpus = mp.cpu_count()-1
61
- self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
62
- print(f"CoreML mode with providers {self.providers}")
63
- elif 'CUDAExecutionProvider' in self.providers:
64
- self.mode = RefacerMode.CUDA
65
- self.use_num_cpus = 2
66
- self.sess_options.intra_op_num_threads = 1
67
- if 'TensorrtExecutionProvider' in self.providers:
68
- self.providers.remove('TensorrtExecutionProvider')
69
- print(f"CUDA mode with providers {self.providers}")
70
-
71
- def __init_apps(self):
72
- assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface')
73
-
74
- model_path = os.path.join(assets_dir, 'det_10g.onnx')
75
- sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
76
- self.face_detector = SCRFD(model_path, sess_face)
77
- self.face_detector.prepare(0, input_size=(640, 640))
78
-
79
- model_path = os.path.join(assets_dir, 'w600k_r50.onnx')
80
- sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
81
- self.rec_app = ArcFaceONNX(model_path, sess_rec)
82
- self.rec_app.prepare(0)
83
-
84
- model_path = 'inswapper_128.onnx'
85
- sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
86
- self.face_swapper = INSwapper(model_path, sess_swap)
87
-
88
- def prepare_faces(self, faces):
89
- self.replacement_faces = []
90
- for face in faces:
91
- if "origin" in face:
92
- face_threshold = face['threshold']
93
- bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1)
94
- if len(kpss1) < 1:
95
- raise Exception('No face detected on "Face to replace" image')
96
- feat_original = self.rec_app.get(face['origin'], kpss1[0])
97
- else:
98
- face_threshold = 0
99
- self.first_face = True
100
- feat_original = None
101
- print('No origin image: First face change')
102
- _faces = self.__get_faces(face['destination'], max_num=1)
103
- if len(_faces) < 1:
104
- raise Exception('No face detected on "Destination face" image')
105
- self.replacement_faces.append((feat_original, _faces[0], face_threshold))
106
-
107
- def __convert_video(self, video_path, output_video_path):
108
- if self.video_has_audio:
109
- print("Merging audio with the refaced video...")
110
- new_path = output_video_path + str(random.randint(0, 999)) + "_c.mp4"
111
- in1 = ffmpeg.input(output_video_path)
112
- in2 = ffmpeg.input(video_path)
113
- out = ffmpeg.output(in1.video, in2.audio, new_path, video_bitrate=self.ffmpeg_video_bitrate, vcodec=self.ffmpeg_video_encoder)
114
- out.run(overwrite_output=True, quiet=True)
115
- else:
116
- new_path = output_video_path
117
- print("The video doesn't have audio, so post-processing is not necessary")
118
-
119
- print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}")
120
-
121
- # Gradio UI output directly
122
- return new_path
123
-
124
- def __get_faces(self, frame, max_num=0):
125
- bboxes, kpss = self.face_detector.detect(frame, max_num=max_num, metric='default')
126
-
127
- if bboxes.shape[0] == 0:
128
- return []
129
- ret = []
130
- for i in range(bboxes.shape[0]):
131
- bbox = bboxes[i, 0:4]
132
- det_score = bboxes[i, 4]
133
- kps = None
134
- if kpss is not None:
135
- kps = kpss[i]
136
- face = Face(bbox=bbox, kps=kps, det_score=det_score)
137
- face.embedding = self.rec_app.get(frame, kps)
138
- ret.append(face)
139
- return ret
140
-
141
- def process_first_face(self, frame):
142
- faces = self.__get_faces(frame, max_num=1)
143
- if len(faces) != 0:
144
- frame = self.face_swapper.get(frame, faces[0], self.replacement_faces[0][1], paste_back=True)
145
- return frame
146
-
147
- def process_faces(self, frame):
148
- faces = self.__get_faces(frame, max_num=0)
149
- for rep_face in self.replacement_faces:
150
- for i in range(len(faces) - 1, -1, -1):
151
- sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding)
152
- if sim >= rep_face[2]:
153
- frame = self.face_swapper.get(frame, faces[i], rep_face[1], paste_back=True)
154
- del faces[i]
155
- break
156
- return frame
157
-
158
- def __check_video_has_audio(self, video_path):
159
- self.video_has_audio = False
160
- probe = ffmpeg.probe(video_path)
161
- audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None)
162
- if audio_stream is not None:
163
- self.video_has_audio = True
164
-
165
- def reface_group(self, faces, frames, output):
166
- with ThreadPoolExecutor(max_workers=self.use_num_cpus) as executor:
167
- if self.first_face:
168
- results = list(tqdm(executor.map(self.process_first_face, frames), total=len(frames), desc="Processing frames"))
169
- else:
170
- results = list(tqdm(executor.map(self.process_faces, frames), total=len(frames), desc="Processing frames"))
171
- for result in results:
172
- output.write(result)
173
-
174
- def reface(self, video_path, faces):
175
- self.__check_video_has_audio(video_path)
176
- output_video_path = os.path.join('out', Path(video_path).name)
177
- self.prepare_faces(faces)
178
-
179
- cap = cv2.VideoCapture(video_path)
180
- total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
181
- print(f"Total frames: {total_frames}")
182
-
183
- fps = cap.get(cv2.CAP_PROP_FPS)
184
- frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
185
- frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
186
-
187
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
188
- output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
189
-
190
- frames = []
191
- self.k = 1
192
- with tqdm(total=total_frames, desc="Extracting frames") as pbar:
193
- while cap.isOpened():
194
- flag, frame = cap.read()
195
- if flag and len(frame) > 0:
196
- frames.append(frame.copy())
197
- pbar.update()
198
- else:
199
- break
200
- if len(frames) > 1000:
201
- self.reface_group(faces, frames, output)
202
- frames = []
203
-
204
- cap.release()
205
- pbar.close()
206
-
207
- self.reface_group(faces, frames, output)
208
- frames = []
209
- output.release()
210
-
211
- return output_video_path
212
-
213
- def __try_ffmpeg_encoder(self, vcodec):
214
- print(f"Trying FFMPEG {vcodec} encoder")
215
- command = ['ffmpeg', '-y', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=1280x720:rate=30', '-vcodec', vcodec, 'testsrc.mp4']
216
- try:
217
- subprocess.run(command, check=True)
218
- return True
219
- except subprocess.CalledProcessError:
220
- return False