Spaces:
Runtime error
Runtime error
Harisreedhar
committed on
Commit
•
226cc7a
1
Parent(s):
cf144f1
update
Browse files- app.py +28 -27
- face_analyser.py +90 -25
- face_enhancer.py +32 -19
- face_swapper.py +53 -91
- nsfw_detector.py +9 -5
- swapper.py +0 -106
- utils.py +129 -5
app.py
CHANGED
@@ -17,13 +17,12 @@ import concurrent.futures
|
|
17 |
from moviepy.editor import VideoFileClip
|
18 |
|
19 |
from nsfw_detector import get_nsfw_detector
|
20 |
-
from face_swapper import Inswapper, paste_to_whole
|
21 |
from face_analyser import detect_conditions, get_analysed_data, swap_options_list
|
22 |
-
from face_enhancer import
|
23 |
from face_parsing import init_parser, swap_regions, mask_regions, mask_regions_to_list, SoftErosion
|
24 |
from utils import trim_video, StreamerThread, ProcessBar, open_directory, split_list_by_lengths, merge_img_sequence_from_ref
|
25 |
|
26 |
-
|
27 |
## ------------------------------ USER ARGS ------------------------------
|
28 |
|
29 |
parser = argparse.ArgumentParser(description="Swap-Mukham Face Swapper")
|
@@ -69,9 +68,12 @@ FACE_ANALYSER = None
|
|
69 |
FACE_ENHANCER = None
|
70 |
FACE_PARSER = None
|
71 |
NSFW_DETECTOR = None
|
|
|
|
|
|
|
72 |
|
73 |
## ------------------------------ SET EXECUTION PROVIDER ------------------------------
|
74 |
-
# Note:
|
75 |
|
76 |
PROVIDER = ["CPUExecutionProvider"]
|
77 |
|
@@ -88,7 +90,7 @@ else:
|
|
88 |
print("\n********** Running on CPU **********\n")
|
89 |
|
90 |
device = "cuda" if USE_CUDA else "cpu"
|
91 |
-
|
92 |
|
93 |
## ------------------------------ LOAD MODELS ------------------------------
|
94 |
|
@@ -223,7 +225,7 @@ def process(
|
|
223 |
yield f"### \n 🔞 {message}", *ui_before()
|
224 |
assert not nsfw, message
|
225 |
return False
|
226 |
-
|
227 |
|
228 |
yield "### \n ⌛ Analysing face data...", *ui_before()
|
229 |
if condition != "Specific Face":
|
@@ -241,26 +243,24 @@ def process(
|
|
241 |
|
242 |
yield "### \n ⌛ Swapping faces...", *ui_before()
|
243 |
preds, aimgs, matrs = FACE_SWAPPER.batch_forward(whole_frame_list, analysed_targets, analysed_sources)
|
244 |
-
|
245 |
|
246 |
if enable_face_parser:
|
247 |
yield "### \n ⌛ Applying face-parsing mask...", *ui_before()
|
248 |
for idx, (pred, aimg) in tqdm(enumerate(zip(preds, aimgs)), total=len(preds), desc="Face parsing"):
|
249 |
preds[idx] = swap_regions(pred, aimg, FACE_PARSER, smooth_mask, includes=includes, blur=int(blur_amount))
|
250 |
-
|
251 |
|
252 |
if face_enhancer_name != "NONE":
|
253 |
yield f"### \n ⌛ Enhancing faces with {face_enhancer_name}...", *ui_before()
|
254 |
for idx, pred in tqdm(enumerate(preds), total=len(preds), desc=f"{face_enhancer_name}"):
|
255 |
-
|
256 |
-
|
257 |
-
elif face_enhancer_name.startswith("REAL-ESRGAN"):
|
258 |
-
pred = realesrgan_enhance(pred, FACE_ENHANCER)
|
259 |
-
|
260 |
preds[idx] = cv2.resize(pred, (512,512))
|
261 |
aimgs[idx] = cv2.resize(aimgs[idx], (512,512))
|
262 |
matrs[idx] /= 0.25
|
263 |
-
|
|
|
264 |
|
265 |
split_preds = split_list_by_lengths(preds, num_faces_per_frame)
|
266 |
del preds
|
@@ -270,19 +270,19 @@ def process(
|
|
270 |
del matrs
|
271 |
|
272 |
yield "### \n ⌛ Post-processing...", *ui_before()
|
273 |
-
def
|
274 |
whole_img_path = frame_img
|
275 |
whole_img = cv2.imread(whole_img_path)
|
276 |
for p, a, m in zip(split_preds[frame_idx], split_aimgs[frame_idx], split_matrs[frame_idx]):
|
277 |
whole_img = paste_to_whole(p, a, m, whole_img, laplacian_blend=enable_laplacian_blend, crop_mask=(crop_top, crop_bott, crop_left, crop_right))
|
278 |
cv2.imwrite(whole_img_path, whole_img)
|
279 |
|
280 |
-
def
|
281 |
with concurrent.futures.ThreadPoolExecutor() as executor:
|
282 |
futures = []
|
283 |
for idx, frame_img in enumerate(image_sequence):
|
284 |
future = executor.submit(
|
285 |
-
|
286 |
idx,
|
287 |
frame_img,
|
288 |
split_preds,
|
@@ -302,8 +302,7 @@ def process(
|
|
302 |
except Exception as e:
|
303 |
print(f"An error occurred: {e}")
|
304 |
|
305 |
-
|
306 |
-
optimize_processing(
|
307 |
image_sequence,
|
308 |
split_preds,
|
309 |
split_aimgs,
|
@@ -432,13 +431,13 @@ def update_radio(value):
|
|
432 |
|
433 |
|
434 |
def swap_option_changed(value):
|
435 |
-
if value
|
436 |
return (
|
437 |
gr.update(visible=True),
|
438 |
gr.update(visible=False),
|
439 |
gr.update(visible=True),
|
440 |
)
|
441 |
-
elif value ==
|
442 |
return (
|
443 |
gr.update(visible=False),
|
444 |
gr.update(visible=True),
|
@@ -497,7 +496,7 @@ def stop_running():
|
|
497 |
if hasattr(STREAMER, "stop"):
|
498 |
STREAMER.stop()
|
499 |
STREAMER = None
|
500 |
-
|
501 |
|
502 |
|
503 |
def slider_changed(show_frame, video_path, frame_index):
|
@@ -538,8 +537,10 @@ with gr.Blocks(css=css) as interface:
|
|
538 |
with gr.Row():
|
539 |
with gr.Column(scale=0.4):
|
540 |
with gr.Tab("📄 Swap Condition"):
|
541 |
-
swap_option = gr.
|
542 |
swap_options_list,
|
|
|
|
|
543 |
show_label=False,
|
544 |
value=swap_options_list[0],
|
545 |
interactive=True,
|
@@ -636,7 +637,7 @@ with gr.Blocks(css=css) as interface:
|
|
636 |
)
|
637 |
|
638 |
face_enhancer_name = gr.Dropdown(
|
639 |
-
|
640 |
)
|
641 |
|
642 |
source_image_input = gr.Image(
|
@@ -675,8 +676,8 @@ with gr.Blocks(css=css) as interface:
|
|
675 |
)
|
676 |
|
677 |
with gr.Box(visible=True) as input_video_group:
|
678 |
-
|
679 |
-
video_input =
|
680 |
label="Target Video Path", interactive=True
|
681 |
)
|
682 |
with gr.Accordion("✂️ Trim video", open=False):
|
@@ -837,7 +838,7 @@ with gr.Blocks(css=css) as interface:
|
|
837 |
]
|
838 |
|
839 |
swap_event = swap_button.click(
|
840 |
-
fn=process, inputs=swap_inputs, outputs=swap_outputs, show_progress=True
|
841 |
)
|
842 |
|
843 |
cancel_button.click(
|
|
|
17 |
from moviepy.editor import VideoFileClip
|
18 |
|
19 |
from nsfw_detector import get_nsfw_detector
|
20 |
+
from face_swapper import Inswapper, paste_to_whole, place_foreground_on_background
|
21 |
from face_analyser import detect_conditions, get_analysed_data, swap_options_list
|
22 |
+
from face_enhancer import get_available_enhancer_names, load_face_enhancer_model
|
23 |
from face_parsing import init_parser, swap_regions, mask_regions, mask_regions_to_list, SoftErosion
|
24 |
from utils import trim_video, StreamerThread, ProcessBar, open_directory, split_list_by_lengths, merge_img_sequence_from_ref
|
25 |
|
|
|
26 |
## ------------------------------ USER ARGS ------------------------------
|
27 |
|
28 |
parser = argparse.ArgumentParser(description="Swap-Mukham Face Swapper")
|
|
|
68 |
FACE_ENHANCER = None
|
69 |
FACE_PARSER = None
|
70 |
NSFW_DETECTOR = None
|
71 |
+
FACE_ENHANCER_LIST = ["NONE"]
|
72 |
+
FACE_ENHANCER_LIST.extend(get_available_enhancer_names())
|
73 |
+
|
74 |
|
75 |
## ------------------------------ SET EXECUTION PROVIDER ------------------------------
|
76 |
+
# Note: Non CUDA users may change settings here
|
77 |
|
78 |
PROVIDER = ["CPUExecutionProvider"]
|
79 |
|
|
|
90 |
print("\n********** Running on CPU **********\n")
|
91 |
|
92 |
device = "cuda" if USE_CUDA else "cpu"
|
93 |
+
EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None
|
94 |
|
95 |
## ------------------------------ LOAD MODELS ------------------------------
|
96 |
|
|
|
225 |
yield f"### \n 🔞 {message}", *ui_before()
|
226 |
assert not nsfw, message
|
227 |
return False
|
228 |
+
EMPTY_CACHE()
|
229 |
|
230 |
yield "### \n ⌛ Analysing face data...", *ui_before()
|
231 |
if condition != "Specific Face":
|
|
|
243 |
|
244 |
yield "### \n ⌛ Swapping faces...", *ui_before()
|
245 |
preds, aimgs, matrs = FACE_SWAPPER.batch_forward(whole_frame_list, analysed_targets, analysed_sources)
|
246 |
+
EMPTY_CACHE()
|
247 |
|
248 |
if enable_face_parser:
|
249 |
yield "### \n ⌛ Applying face-parsing mask...", *ui_before()
|
250 |
for idx, (pred, aimg) in tqdm(enumerate(zip(preds, aimgs)), total=len(preds), desc="Face parsing"):
|
251 |
preds[idx] = swap_regions(pred, aimg, FACE_PARSER, smooth_mask, includes=includes, blur=int(blur_amount))
|
252 |
+
EMPTY_CACHE()
|
253 |
|
254 |
if face_enhancer_name != "NONE":
|
255 |
yield f"### \n ⌛ Enhancing faces with {face_enhancer_name}...", *ui_before()
|
256 |
for idx, pred in tqdm(enumerate(preds), total=len(preds), desc=f"{face_enhancer_name}"):
|
257 |
+
enhancer_model, enhancer_model_runner = FACE_ENHANCER
|
258 |
+
pred = enhancer_model_runner(pred, enhancer_model)
|
|
|
|
|
|
|
259 |
preds[idx] = cv2.resize(pred, (512,512))
|
260 |
aimgs[idx] = cv2.resize(aimgs[idx], (512,512))
|
261 |
matrs[idx] /= 0.25
|
262 |
+
|
263 |
+
EMPTY_CACHE()
|
264 |
|
265 |
split_preds = split_list_by_lengths(preds, num_faces_per_frame)
|
266 |
del preds
|
|
|
270 |
del matrs
|
271 |
|
272 |
yield "### \n ⌛ Post-processing...", *ui_before()
|
273 |
+
def post_process(frame_idx, frame_img, split_preds, split_aimgs, split_matrs, enable_laplacian_blend, crop_top, crop_bott, crop_left, crop_right):
|
274 |
whole_img_path = frame_img
|
275 |
whole_img = cv2.imread(whole_img_path)
|
276 |
for p, a, m in zip(split_preds[frame_idx], split_aimgs[frame_idx], split_matrs[frame_idx]):
|
277 |
whole_img = paste_to_whole(p, a, m, whole_img, laplacian_blend=enable_laplacian_blend, crop_mask=(crop_top, crop_bott, crop_left, crop_right))
|
278 |
cv2.imwrite(whole_img_path, whole_img)
|
279 |
|
280 |
+
def concurrent_post_process(image_sequence, split_preds, split_aimgs, split_matrs, enable_laplacian_blend, crop_top, crop_bott, crop_left, crop_right):
|
281 |
with concurrent.futures.ThreadPoolExecutor() as executor:
|
282 |
futures = []
|
283 |
for idx, frame_img in enumerate(image_sequence):
|
284 |
future = executor.submit(
|
285 |
+
post_process,
|
286 |
idx,
|
287 |
frame_img,
|
288 |
split_preds,
|
|
|
302 |
except Exception as e:
|
303 |
print(f"An error occurred: {e}")
|
304 |
|
305 |
+
concurrent_post_process(
|
|
|
306 |
image_sequence,
|
307 |
split_preds,
|
308 |
split_aimgs,
|
|
|
431 |
|
432 |
|
433 |
def swap_option_changed(value):
|
434 |
+
if value.startswith("Age"):
|
435 |
return (
|
436 |
gr.update(visible=True),
|
437 |
gr.update(visible=False),
|
438 |
gr.update(visible=True),
|
439 |
)
|
440 |
+
elif value == "Specific Face":
|
441 |
return (
|
442 |
gr.update(visible=False),
|
443 |
gr.update(visible=True),
|
|
|
496 |
if hasattr(STREAMER, "stop"):
|
497 |
STREAMER.stop()
|
498 |
STREAMER = None
|
499 |
+
yield "cancelled !"
|
500 |
|
501 |
|
502 |
def slider_changed(show_frame, video_path, frame_index):
|
|
|
537 |
with gr.Row():
|
538 |
with gr.Column(scale=0.4):
|
539 |
with gr.Tab("📄 Swap Condition"):
|
540 |
+
swap_option = gr.Dropdown(
|
541 |
swap_options_list,
|
542 |
+
info="Choose which face or faces in the target image to swap.",
|
543 |
+
multiselect=False,
|
544 |
show_label=False,
|
545 |
value=swap_options_list[0],
|
546 |
interactive=True,
|
|
|
637 |
)
|
638 |
|
639 |
face_enhancer_name = gr.Dropdown(
|
640 |
+
FACE_ENHANCER_LIST, label="Face Enhancer", value="NONE", multiselect=False, interactive=True
|
641 |
)
|
642 |
|
643 |
source_image_input = gr.Image(
|
|
|
676 |
)
|
677 |
|
678 |
with gr.Box(visible=True) as input_video_group:
|
679 |
+
vid_widget = gr.Video if USE_COLAB else gr.Text
|
680 |
+
video_input = vid_widget(
|
681 |
label="Target Video Path", interactive=True
|
682 |
)
|
683 |
with gr.Accordion("✂️ Trim video", open=False):
|
|
|
838 |
]
|
839 |
|
840 |
swap_event = swap_button.click(
|
841 |
+
fn=process, inputs=swap_inputs, outputs=swap_outputs, show_progress=True,
|
842 |
)
|
843 |
|
844 |
cancel_button.click(
|
face_analyser.py
CHANGED
@@ -5,24 +5,58 @@ from tqdm import tqdm
|
|
5 |
from utils import scale_bbox_from_center
|
6 |
|
7 |
detect_conditions = [
|
|
|
8 |
"left most",
|
9 |
"right most",
|
10 |
"top most",
|
11 |
"bottom most",
|
12 |
-
"
|
13 |
-
"
|
14 |
-
"
|
15 |
]
|
16 |
|
17 |
swap_options_list = [
|
18 |
-
"All
|
|
|
19 |
"Age less than",
|
20 |
"Age greater than",
|
21 |
"All Male",
|
22 |
"All Female",
|
23 |
-
"
|
|
|
|
|
|
|
|
|
|
|
|
|
24 |
]
|
25 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
26 |
def analyse_face(image, model, return_single_face=True, detect_condition="best detection", scale=1.0):
|
27 |
faces = model.get(image)
|
28 |
if scale != 1: # landmark-scale
|
@@ -35,25 +69,7 @@ def analyse_face(image, model, return_single_face=True, detect_condition="best d
|
|
35 |
if not return_single_face:
|
36 |
return faces
|
37 |
|
38 |
-
|
39 |
-
if total_faces == 1:
|
40 |
-
return faces[0]
|
41 |
-
|
42 |
-
print(f"{total_faces} face detected. Using {detect_condition} face.")
|
43 |
-
if detect_condition == "left most":
|
44 |
-
return sorted(faces, key=lambda face: face["bbox"][0])[0]
|
45 |
-
elif detect_condition == "right most":
|
46 |
-
return sorted(faces, key=lambda face: face["bbox"][0])[-1]
|
47 |
-
elif detect_condition == "top most":
|
48 |
-
return sorted(faces, key=lambda face: face["bbox"][1])[0]
|
49 |
-
elif detect_condition == "bottom most":
|
50 |
-
return sorted(faces, key=lambda face: face["bbox"][1])[-1]
|
51 |
-
elif detect_condition == "most width":
|
52 |
-
return sorted(faces, key=lambda face: face["bbox"][2])[-1]
|
53 |
-
elif detect_condition == "most height":
|
54 |
-
return sorted(faces, key=lambda face: face["bbox"][3])[-1]
|
55 |
-
elif detect_condition == "best detection":
|
56 |
-
return sorted(faces, key=lambda face: face["det_score"])[-1]
|
57 |
|
58 |
|
59 |
def cosine_distance(a, b):
|
@@ -90,7 +106,7 @@ def get_analysed_data(face_analyser, image_sequence, source_data, swap_condition
|
|
90 |
|
91 |
n_faces = 0
|
92 |
for analysed_face in analysed_faces:
|
93 |
-
if swap_condition == "All
|
94 |
analysed_target_list.append(analysed_face)
|
95 |
analysed_source_list.append(analysed_source)
|
96 |
whole_frame_eql_list.append(frame_path)
|
@@ -124,6 +140,55 @@ def get_analysed_data(face_analyser, image_sequence, source_data, swap_condition
|
|
124 |
whole_frame_eql_list.append(frame_path)
|
125 |
n_faces += 1
|
126 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
127 |
num_faces_per_frame.append(n_faces)
|
128 |
|
129 |
return analysed_target_list, analysed_source_list, whole_frame_eql_list, num_faces_per_frame
|
|
|
5 |
from utils import scale_bbox_from_center
|
6 |
|
7 |
detect_conditions = [
|
8 |
+
"best detection",
|
9 |
"left most",
|
10 |
"right most",
|
11 |
"top most",
|
12 |
"bottom most",
|
13 |
+
"middle",
|
14 |
+
"biggest",
|
15 |
+
"smallest",
|
16 |
]
|
17 |
|
18 |
swap_options_list = [
|
19 |
+
"All Face",
|
20 |
+
"Specific Face",
|
21 |
"Age less than",
|
22 |
"Age greater than",
|
23 |
"All Male",
|
24 |
"All Female",
|
25 |
+
"Left Most",
|
26 |
+
"Right Most",
|
27 |
+
"Top Most",
|
28 |
+
"Bottom Most",
|
29 |
+
"Middle",
|
30 |
+
"Biggest",
|
31 |
+
"Smallest",
|
32 |
]
|
33 |
|
34 |
+
def get_single_face(faces, method="best detection"):
    """Pick one face out of ``faces`` according to ``method``.

    Args:
        faces: sequence of face records indexable by ``"bbox"`` and
            ``"det_score"``. ``bbox[0]``/``bbox[1]`` are used as the left/top
            edge and ``bbox[2]``/``bbox[3]`` as the right/bottom edge.
        method: one of "best detection", "left most", "right most",
            "top most", "bottom most", "middle", "biggest", "smallest".

    Returns:
        The selected face. A single-face input is returned as-is regardless
        of ``method``. An unrecognised ``method`` yields ``None``.

    Raises:
        IndexError: if ``faces`` is empty and ``method`` is recognised.
    """
    total_faces = len(faces)
    if total_faces == 1:
        return faces[0]

    print(f"{total_faces} face detected. Using {method} face.")

    def bbox_area(face):
        box = face["bbox"]
        return (box[2] - box[0]) * (box[3] - box[1])

    def center_offset(face):
        # Squared distance of the bbox centre from the point (0.5, 0.5).
        # NOTE(review): the 0.5 offset looks like it assumes normalised
        # coordinates; confirm against what the detector actually returns.
        box = face["bbox"]
        return ((box[0] + box[2]) / 2 - 0.5) ** 2 + ((box[1] + box[3]) / 2 - 0.5) ** 2

    # method -> (sort key, index of the chosen face in ascending key order).
    # A stable sort plus index keeps tie-breaking identical for every method.
    rules = {
        "best detection": (lambda face: face["det_score"], -1),
        "left most": (lambda face: face["bbox"][0], 0),
        "right most": (lambda face: face["bbox"][0], -1),
        "top most": (lambda face: face["bbox"][1], 0),
        "bottom most": (lambda face: face["bbox"][1], -1),
        "middle": (center_offset, total_faces // 2),
        "biggest": (bbox_area, -1),
        "smallest": (bbox_area, 0),
    }

    rule = rules.get(method)
    if rule is None:
        # Unknown selection method: nothing is selected.
        return None
    if total_faces == 0:
        # Same exception type as indexing an empty list, but with a message
        # that names the actual problem.
        raise IndexError(f"cannot pick the {method} face: no faces were given")

    key, position = rule
    return sorted(faces, key=key)[position]
|
58 |
+
|
59 |
+
|
60 |
def analyse_face(image, model, return_single_face=True, detect_condition="best detection", scale=1.0):
|
61 |
faces = model.get(image)
|
62 |
if scale != 1: # landmark-scale
|
|
|
69 |
if not return_single_face:
|
70 |
return faces
|
71 |
|
72 |
+
return get_single_face(faces, method=detect_condition)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
73 |
|
74 |
|
75 |
def cosine_distance(a, b):
|
|
|
106 |
|
107 |
n_faces = 0
|
108 |
for analysed_face in analysed_faces:
|
109 |
+
if swap_condition == "All Face":
|
110 |
analysed_target_list.append(analysed_face)
|
111 |
analysed_source_list.append(analysed_source)
|
112 |
whole_frame_eql_list.append(frame_path)
|
|
|
140 |
whole_frame_eql_list.append(frame_path)
|
141 |
n_faces += 1
|
142 |
|
143 |
+
if swap_condition == "Left Most":
|
144 |
+
analysed_face = get_single_face(analysed_faces, method="left most")
|
145 |
+
analysed_target_list.append(analysed_face)
|
146 |
+
analysed_source_list.append(analysed_source)
|
147 |
+
whole_frame_eql_list.append(frame_path)
|
148 |
+
n_faces += 1
|
149 |
+
|
150 |
+
elif swap_condition == "Right Most":
|
151 |
+
analysed_face = get_single_face(analysed_faces, method="right most")
|
152 |
+
analysed_target_list.append(analysed_face)
|
153 |
+
analysed_source_list.append(analysed_source)
|
154 |
+
whole_frame_eql_list.append(frame_path)
|
155 |
+
n_faces += 1
|
156 |
+
|
157 |
+
elif swap_condition == "Top Most":
|
158 |
+
analysed_face = get_single_face(analysed_faces, method="top most")
|
159 |
+
analysed_target_list.append(analysed_face)
|
160 |
+
analysed_source_list.append(analysed_source)
|
161 |
+
whole_frame_eql_list.append(frame_path)
|
162 |
+
n_faces += 1
|
163 |
+
|
164 |
+
elif swap_condition == "Bottom Most":
|
165 |
+
analysed_face = get_single_face(analysed_faces, method="bottom most")
|
166 |
+
analysed_target_list.append(analysed_face)
|
167 |
+
analysed_source_list.append(analysed_source)
|
168 |
+
whole_frame_eql_list.append(frame_path)
|
169 |
+
n_faces += 1
|
170 |
+
|
171 |
+
elif swap_condition == "Middle":
|
172 |
+
analysed_face = get_single_face(analysed_faces, method="middle")
|
173 |
+
analysed_target_list.append(analysed_face)
|
174 |
+
analysed_source_list.append(analysed_source)
|
175 |
+
whole_frame_eql_list.append(frame_path)
|
176 |
+
n_faces += 1
|
177 |
+
|
178 |
+
elif swap_condition == "Biggest":
|
179 |
+
analysed_face = get_single_face(analysed_faces, method="biggest")
|
180 |
+
analysed_target_list.append(analysed_face)
|
181 |
+
analysed_source_list.append(analysed_source)
|
182 |
+
whole_frame_eql_list.append(frame_path)
|
183 |
+
n_faces += 1
|
184 |
+
|
185 |
+
elif swap_condition == "Smallest":
|
186 |
+
analysed_face = get_single_face(analysed_faces, method="smallest")
|
187 |
+
analysed_target_list.append(analysed_face)
|
188 |
+
analysed_source_list.append(analysed_source)
|
189 |
+
whole_frame_eql_list.append(frame_path)
|
190 |
+
n_faces += 1
|
191 |
+
|
192 |
num_faces_per_frame.append(n_faces)
|
193 |
|
194 |
return analysed_target_list, analysed_source_list, whole_frame_eql_list, num_faces_per_frame
|
face_enhancer.py
CHANGED
@@ -4,36 +4,49 @@ import gfpgan
|
|
4 |
from PIL import Image
|
5 |
from upscaler.RealESRGAN import RealESRGAN
|
6 |
|
7 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
8 |
|
9 |
def load_face_enhancer_model(name='GFPGAN', device="cpu"):
|
|
|
|
|
|
|
10 |
if name == 'GFPGAN':
|
11 |
-
model_path =
|
12 |
-
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path)
|
13 |
-
model = gfpgan.GFPGANer(model_path=model_path, upscale=1)
|
14 |
elif name == 'REAL-ESRGAN 2x':
|
15 |
-
model_path = "./assets/pretrained_models/RealESRGAN_x2.pth"
|
16 |
-
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path)
|
17 |
model = RealESRGAN(device, scale=2)
|
18 |
model.load_weights(model_path, download=False)
|
19 |
elif name == 'REAL-ESRGAN 4x':
|
20 |
-
model_path = "./assets/pretrained_models/RealESRGAN_x4.pth"
|
21 |
-
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path)
|
22 |
model = RealESRGAN(device, scale=4)
|
23 |
model.load_weights(model_path, download=False)
|
24 |
elif name == 'REAL-ESRGAN 8x':
|
25 |
-
model_path = "./assets/pretrained_models/RealESRGAN_x8.pth"
|
26 |
-
model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path)
|
27 |
model = RealESRGAN(device, scale=8)
|
28 |
model.load_weights(model_path, download=False)
|
29 |
else:
|
30 |
model = None
|
31 |
-
return model
|
32 |
-
|
33 |
-
def gfpgan_enhance(img, model, has_aligned=True):
|
34 |
-
_, imgs, _ = model.enhance(img, paste_back=True, has_aligned=has_aligned)
|
35 |
-
return imgs[0]
|
36 |
-
|
37 |
-
def realesrgan_enhance(img, model):
|
38 |
-
img = model.predict(img)
|
39 |
-
return img
|
|
|
4 |
from PIL import Image
|
5 |
from upscaler.RealESRGAN import RealESRGAN
|
6 |
|
7 |
+
|
8 |
+
def gfpgan_runner(img, model):
    """Enhance ``img`` with a GFPGAN-style ``model``.

    Calls ``model.enhance(img, paste_back=True, has_aligned=True)``, which
    must return exactly three values, and returns the first element of the
    second one.
    """
    _first, restored_faces, _third = model.enhance(img, paste_back=True, has_aligned=True)
    return restored_faces[0]
|
11 |
+
|
12 |
+
|
13 |
+
def realesrgan_runner(img, model):
    """Return ``model.predict(img)``; same call shape as ``gfpgan_runner``."""
    return model.predict(img)
|
16 |
+
|
17 |
+
|
18 |
+
supported_enhancers = {
|
19 |
+
"GFPGAN": ("./assets/pretrained_models/GFPGANv1.4.pth", gfpgan_runner),
|
20 |
+
"REAL-ESRGAN 2x": ("./assets/pretrained_models/RealESRGAN_x2.pth", realesrgan_runner),
|
21 |
+
"REAL-ESRGAN 4x": ("./assets/pretrained_models/RealESRGAN_x4.pth", realesrgan_runner),
|
22 |
+
"REAL-ESRGAN 8x": ("./assets/pretrained_models/RealESRGAN_x8.pth", realesrgan_runner)
|
23 |
+
}
|
24 |
+
|
25 |
+
|
26 |
+
def get_available_enhancer_names():
    """List the names in ``supported_enhancers`` whose weight file exists.

    Each entry's first element is a path relative to this module's
    directory; only names whose resolved path exists are returned, in the
    dictionary's iteration order.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    return [
        enhancer_name
        for enhancer_name, entry in supported_enhancers.items()
        if os.path.exists(os.path.join(module_dir, entry[0]))
    ]
|
33 |
+
|
34 |
|
35 |
def load_face_enhancer_model(name='GFPGAN', device="cpu"):
    """Load the enhancer called ``name`` and pair it with its runner.

    Args:
        name: a key of ``supported_enhancers`` whose weight file is present.
        device: device passed through to the model constructor.

    Returns:
        ``(model, model_runner)``, where ``model_runner`` is the callable
        registered for ``name`` in ``supported_enhancers``.

    Raises:
        AssertionError: if ``name`` is not currently available.
    """
    # NOTE(review): validation via assert is skipped when Python runs with -O.
    assert name in get_available_enhancer_names(), f"Face enhancer {name} unavailable."
    relative_path, model_runner = supported_enhancers.get(name)
    model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), relative_path)

    # Upscale factor for each REAL-ESRGAN variant.
    realesrgan_scales = {
        'REAL-ESRGAN 2x': 2,
        'REAL-ESRGAN 4x': 4,
        'REAL-ESRGAN 8x': 8,
    }

    if name == 'GFPGAN':
        model = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=device)
    elif name in realesrgan_scales:
        model = RealESRGAN(device, scale=realesrgan_scales[name])
        model.load_weights(model_path, download=False)
    else:
        model = None
    return (model, model_runner)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
face_swapper.py
CHANGED
@@ -7,6 +7,8 @@ import numpy as np
|
|
7 |
from tqdm import tqdm
|
8 |
from onnx import numpy_helper
|
9 |
from skimage import transform as trans
|
|
|
|
|
10 |
|
11 |
arcface_dst = np.array(
|
12 |
[[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
|
@@ -62,33 +64,44 @@ class Inswapper():
|
|
62 |
self.input_size = tuple(input_shape[2:4][::-1])
|
63 |
|
64 |
def forward(self, imgs, latents):
|
65 |
-
|
66 |
for img, latent in zip(imgs, latents):
|
67 |
img = (img - self.input_mean) / self.input_std
|
68 |
pred = self.session.run(self.output_names, {self.input_names[0]: img, self.input_names[1]: latent})[0]
|
69 |
-
|
70 |
-
return batch_preds
|
71 |
|
72 |
def get(self, imgs, target_faces, source_faces):
|
73 |
-
|
74 |
-
|
75 |
-
|
76 |
-
|
77 |
-
|
78 |
-
|
79 |
-
|
80 |
-
|
81 |
-
|
82 |
-
|
83 |
-
latent = np.dot(latent, self.emap)
|
84 |
-
latent /= np.linalg.norm(latent)
|
85 |
pred = self.session.run(self.output_names, {self.input_names[0]: blob, self.input_names[1]: latent})[0]
|
86 |
pred = pred.transpose((0, 2, 3, 1))[0]
|
87 |
pred = np.clip(255 * pred, 0, 255).astype(np.uint8)[:, :, ::-1]
|
88 |
-
|
89 |
-
|
90 |
-
|
91 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
92 |
|
93 |
def batch_forward(self, img_list, target_f_list, source_f_list):
|
94 |
num_samples = len(img_list)
|
@@ -96,8 +109,9 @@ class Inswapper():
|
|
96 |
|
97 |
preds = []
|
98 |
aimgs = []
|
99 |
-
|
100 |
-
|
|
|
101 |
start_idx = i * self.batch_size
|
102 |
end_idx = min((i + 1) * self.batch_size, num_samples)
|
103 |
|
@@ -105,86 +119,26 @@ class Inswapper():
|
|
105 |
batch_target_f = target_f_list[start_idx:end_idx]
|
106 |
batch_source_f = source_f_list[start_idx:end_idx]
|
107 |
|
108 |
-
batch_pred, batch_aimg,
|
109 |
preds.extend(batch_pred)
|
110 |
aimgs.extend(batch_aimg)
|
111 |
-
|
112 |
-
|
113 |
-
|
114 |
-
|
115 |
-
def laplacian_blending(A, B, m, num_levels=4):
|
116 |
-
assert A.shape == B.shape
|
117 |
-
assert B.shape == m.shape
|
118 |
-
height = m.shape[0]
|
119 |
-
width = m.shape[1]
|
120 |
-
size_list = np.array([4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096])
|
121 |
-
size = size_list[np.where(size_list > max(height, width))][0]
|
122 |
-
GA = np.zeros((size, size, 3), dtype=np.float32)
|
123 |
-
GA[:height, :width, :] = A
|
124 |
-
GB = np.zeros((size, size, 3), dtype=np.float32)
|
125 |
-
GB[:height, :width, :] = B
|
126 |
-
GM = np.zeros((size, size, 3), dtype=np.float32)
|
127 |
-
GM[:height, :width, :] = m
|
128 |
-
gpA = [GA]
|
129 |
-
gpB = [GB]
|
130 |
-
gpM = [GM]
|
131 |
-
for i in range(num_levels):
|
132 |
-
GA = cv2.pyrDown(GA)
|
133 |
-
GB = cv2.pyrDown(GB)
|
134 |
-
GM = cv2.pyrDown(GM)
|
135 |
-
gpA.append(np.float32(GA))
|
136 |
-
gpB.append(np.float32(GB))
|
137 |
-
gpM.append(np.float32(GM))
|
138 |
-
lpA = [gpA[num_levels-1]]
|
139 |
-
lpB = [gpB[num_levels-1]]
|
140 |
-
gpMr = [gpM[num_levels-1]]
|
141 |
-
for i in range(num_levels-1,0,-1):
|
142 |
-
LA = np.subtract(gpA[i-1], cv2.pyrUp(gpA[i]))
|
143 |
-
LB = np.subtract(gpB[i-1], cv2.pyrUp(gpB[i]))
|
144 |
-
lpA.append(LA)
|
145 |
-
lpB.append(LB)
|
146 |
-
gpMr.append(gpM[i-1])
|
147 |
-
LS = []
|
148 |
-
for la,lb,gm in zip(lpA,lpB,gpMr):
|
149 |
-
ls = la * gm + lb * (1.0 - gm)
|
150 |
-
LS.append(ls)
|
151 |
-
ls_ = LS[0]
|
152 |
-
for i in range(1,num_levels):
|
153 |
-
ls_ = cv2.pyrUp(ls_)
|
154 |
-
ls_ = cv2.add(ls_, LS[i])
|
155 |
-
ls_ = np.clip(ls_[:height, :width, :], 0, 255)
|
156 |
-
return ls_
|
157 |
|
158 |
|
159 |
def paste_to_whole(bgr_fake, aimg, M, whole_img, laplacian_blend=True, crop_mask=(0,0,0,0)):
|
160 |
IM = cv2.invertAffineTransform(M)
|
161 |
|
162 |
-
img_white =
|
163 |
-
|
164 |
-
|
165 |
-
|
166 |
-
|
167 |
-
if top > 0: img_white[:top, :] = 0
|
168 |
-
if bottom > 0: img_white[-bottom:, :] = 0
|
169 |
-
|
170 |
-
left = int(crop_mask[2])
|
171 |
-
right = int(crop_mask[3])
|
172 |
-
if left + right < aimg.shape[0]:
|
173 |
-
if left > 0: img_white[:, :left] = 0
|
174 |
-
if right > 0: img_white[:, -right:] = 0
|
175 |
-
|
176 |
-
bgr_fake = cv2.warpAffine(
|
177 |
-
bgr_fake, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0
|
178 |
-
)
|
179 |
-
img_white = cv2.warpAffine(
|
180 |
-
img_white, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0
|
181 |
-
)
|
182 |
img_white[img_white > 20] = 255
|
183 |
img_mask = img_white
|
184 |
mask_h_inds, mask_w_inds = np.where(img_mask == 255)
|
185 |
-
|
186 |
-
mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
|
187 |
-
mask_size = int(np.sqrt(mask_h * mask_w))
|
188 |
|
189 |
k = max(mask_size // 10, 10)
|
190 |
img_mask = cv2.erode(img_mask, np.ones((k, k), np.uint8), iterations=1)
|
@@ -201,3 +155,11 @@ def paste_to_whole(bgr_fake, aimg, M, whole_img, laplacian_blend=True, crop_mask
|
|
201 |
|
202 |
fake_merged = img_mask * bgr_fake + (1 - img_mask) * whole_img.astype(np.float32)
|
203 |
return fake_merged.astype("uint8")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
7 |
from tqdm import tqdm
|
8 |
from onnx import numpy_helper
|
9 |
from skimage import transform as trans
|
10 |
+
import torchvision.transforms.functional as F
|
11 |
+
from utils import make_white_image, laplacian_blending
|
12 |
|
13 |
arcface_dst = np.array(
|
14 |
[[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
|
|
|
64 |
self.input_size = tuple(input_shape[2:4][::-1])
|
65 |
|
66 |
def forward(self, imgs, latents):
    """Run the session once per (image, latent) pair.

    Each image is normalised as ``(img - self.input_mean) / self.input_std``
    and fed to ``self.session`` under ``self.input_names[0]``, with the
    matching latent under ``self.input_names[1]``.

    Returns:
        list: the first session output for every pair, in input order.
    """
    preds = []
    for img, latent in zip(imgs, latents):
        img = (img - self.input_mean) / self.input_std
        feed = {self.input_names[0]: img, self.input_names[1]: latent}
        preds.append(self.session.run(self.output_names, feed)[0])
    # Without this return the collected predictions were discarded and the
    # method always evaluated to None.
    return preds
|
|
|
72 |
|
73 |
def get(self, imgs, target_faces, source_faces):
    """Swap one face per (image, target face, source face) triple.

    Returns:
        tuple: ``(preds, aimgs, matrs)``, three lists of length
        ``len(imgs)``. Slots with no matching target/source face (shorter
        inputs) stay ``None``.
    """
    frames = list(imgs)
    slots = len(frames)
    preds, aimgs, matrs = ([None] * slots for _ in range(3))

    triples = zip(frames, target_faces, source_faces)
    for slot, (frame, target, source) in enumerate(triples):
        aligned, matrix, blob, latent = self.prepare_data(frame, target, source)
        aimgs[slot] = aligned
        matrs[slot] = matrix

        feed = {self.input_names[0]: blob, self.input_names[1]: latent}
        raw = self.session.run(self.output_names, feed)[0]
        # Move axis 1 to the end and keep the first item of the batch.
        first = raw.transpose((0, 2, 3, 1))[0]
        # Scale to 0..255, cast to uint8, then reverse the last axis
        # (presumably RGB -> BGR for OpenCV; confirm against the model).
        preds[slot] = np.clip(255 * first, 0, 255).astype(np.uint8)[:, :, ::-1]

    return (preds, aimgs, matrs)
|
90 |
+
|
91 |
+
def prepare_data(self, img, target_face, source_face):
    """Build the model inputs for one face swap.

    Args:
        img: an image array, or a path string that is read with ``cv2.imread``.
        target_face: object whose ``kps`` are passed to ``norm_crop2``.
        source_face: object providing ``normed_embedding``.

    Returns:
        tuple: ``(aimg, M, blob, latent)``. ``aimg`` and ``M`` come from
        ``norm_crop2``, ``blob`` is the ``cv2.dnn.blobFromImage`` tensor of
        ``aimg``, and ``latent`` is the source embedding multiplied by
        ``self.emap`` and divided by its norm.
    """
    frame = cv2.imread(img) if isinstance(img, str) else img

    aimg, M = norm_crop2(frame, target_face.kps, self.input_size[0])

    mean = self.input_mean
    blob = cv2.dnn.blobFromImage(
        aimg,
        1.0 / self.input_std,
        self.input_size,
        (mean, mean, mean),
        swapRB=True,
    )

    embedding = source_face.normed_embedding.reshape((1, -1))
    latent = np.dot(embedding, self.emap)
    latent /= np.linalg.norm(latent)

    return (aimg, M, blob, latent)
|
105 |
|
106 |
def batch_forward(self, img_list, target_f_list, source_f_list):
|
107 |
num_samples = len(img_list)
|
|
|
109 |
|
110 |
preds = []
|
111 |
aimgs = []
|
112 |
+
matrs = []
|
113 |
+
|
114 |
+
for i in tqdm(range(num_batches), desc="Swapping face"):
|
115 |
start_idx = i * self.batch_size
|
116 |
end_idx = min((i + 1) * self.batch_size, num_samples)
|
117 |
|
|
|
119 |
batch_target_f = target_f_list[start_idx:end_idx]
|
120 |
batch_source_f = source_f_list[start_idx:end_idx]
|
121 |
|
122 |
+
batch_pred, batch_aimg, batch_matr = self.get(batch_img, batch_target_f, batch_source_f)
|
123 |
preds.extend(batch_pred)
|
124 |
aimgs.extend(batch_aimg)
|
125 |
+
matrs.extend(batch_matr)
|
126 |
+
|
127 |
+
return (preds, aimgs, matrs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
128 |
|
129 |
|
130 |
def paste_to_whole(bgr_fake, aimg, M, whole_img, laplacian_blend=True, crop_mask=(0,0,0,0)):
|
131 |
IM = cv2.invertAffineTransform(M)
|
132 |
|
133 |
+
img_white = make_white_image(aimg.shape[:2], crop=crop_mask, white_value=255)
|
134 |
+
|
135 |
+
bgr_fake = cv2.warpAffine(bgr_fake, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0)
|
136 |
+
img_white = cv2.warpAffine(img_white, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0)
|
137 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
138 |
img_white[img_white > 20] = 255
|
139 |
img_mask = img_white
|
140 |
mask_h_inds, mask_w_inds = np.where(img_mask == 255)
|
141 |
+
mask_size = int(np.sqrt(np.ptp(mask_h_inds) * np.ptp(mask_w_inds)))
|
|
|
|
|
142 |
|
143 |
k = max(mask_size // 10, 10)
|
144 |
img_mask = cv2.erode(img_mask, np.ones((k, k), np.uint8), iterations=1)
|
|
|
155 |
|
156 |
fake_merged = img_mask * bgr_fake + (1 - img_mask) * whole_img.astype(np.float32)
|
157 |
return fake_merged.astype("uint8")
|
158 |
+
|
159 |
+
def place_foreground_on_background(foreground, background, matrix):
    """Warp ``foreground`` onto ``background`` and composite the two.

    ``matrix`` is inverted with ``cv2.invertAffineTransform`` and the inverse
    is used to warp both ``foreground`` and an all-ones mask of the same
    shape onto a canvas with the background's width and height. Areas the
    warped foreground does not cover are filled with 0, so the background
    shows through there.

    NOTE(review): presumably ``matrix`` is the 2x3 affine that produced the
    aligned crop from the full frame -- confirm against the caller.

    Returns the blended array ``mask * foreground + (1 - mask) * background``;
    no dtype conversion is applied to the result.
    """
    inverse = cv2.invertAffineTransform(matrix)
    # cv2.warpAffine takes the output size as (width, height).
    canvas_size = (background.shape[1], background.shape[0])

    coverage = cv2.warpAffine(
        np.ones(foreground.shape, dtype="float32"),
        inverse,
        canvas_size,
        borderValue=0.0,
    )
    warped = cv2.warpAffine(foreground, inverse, canvas_size, borderValue=0.0)

    return coverage * warped + (1 - coverage) * background
|
nsfw_detector.py
CHANGED
@@ -7,6 +7,7 @@ import torch
|
|
7 |
import timm
|
8 |
from tqdm import tqdm
|
9 |
|
|
|
10 |
normalize_t = Normalize((0.4814, 0.4578, 0.4082), (0.2686, 0.2613, 0.2757))
|
11 |
|
12 |
#nsfw classifier
|
@@ -28,7 +29,7 @@ class NSFWClassifier(nn.Module):
|
|
28 |
x = nsfw_model.linear_probe(x)
|
29 |
return x
|
30 |
|
31 |
-
def is_nsfw(self, img_paths, threshold = 0.
|
32 |
skip_step = 1
|
33 |
total_len = len(img_paths)
|
34 |
if total_len < 100: skip_step = 1
|
@@ -37,16 +38,19 @@ class NSFWClassifier(nn.Module):
|
|
37 |
if total_len > 1000 and total_len < 10000: skip_step = 50
|
38 |
if total_len > 10000: skip_step = 100
|
39 |
|
40 |
-
for idx in tqdm(range(0, total_len, skip_step), total=total_len, desc="Checking for NSFW contents"):
|
41 |
-
|
42 |
-
img =
|
43 |
img = np.array(img)/255
|
44 |
img = T.ToTensor()(img).unsqueeze(0).float()
|
45 |
if next(self.parameters()).is_cuda:
|
46 |
img = img.cuda()
|
47 |
with torch.no_grad():
|
48 |
score = self.forward(img).sigmoid()[0].item()
|
49 |
-
if score > threshold:
|
|
|
|
|
|
|
50 |
return False
|
51 |
|
52 |
def get_nsfw_detector(model_path='nsfwmodel_281.pth', device="cpu"):
|
|
|
7 |
import timm
|
8 |
from tqdm import tqdm
|
9 |
|
10 |
+
# https://github.com/Whiax/NSFW-Classifier/raw/main/nsfwmodel_281.pth
|
11 |
normalize_t = Normalize((0.4814, 0.4578, 0.4082), (0.2686, 0.2613, 0.2757))
|
12 |
|
13 |
#nsfw classifier
|
|
|
29 |
x = nsfw_model.linear_probe(x)
|
30 |
return x
|
31 |
|
32 |
+
def is_nsfw(self, img_paths, threshold = 0.98):
|
33 |
skip_step = 1
|
34 |
total_len = len(img_paths)
|
35 |
if total_len < 100: skip_step = 1
|
|
|
38 |
if total_len > 1000 and total_len < 10000: skip_step = 50
|
39 |
if total_len > 10000: skip_step = 100
|
40 |
|
41 |
+
for idx in tqdm(range(0, total_len, skip_step), total=int(total_len // skip_step), desc="Checking for NSFW contents"):
|
42 |
+
_img = Image.open(img_paths[idx]).convert('RGB')
|
43 |
+
img = _img.resize((224, 224))
|
44 |
img = np.array(img)/255
|
45 |
img = T.ToTensor()(img).unsqueeze(0).float()
|
46 |
if next(self.parameters()).is_cuda:
|
47 |
img = img.cuda()
|
48 |
with torch.no_grad():
|
49 |
score = self.forward(img).sigmoid()[0].item()
|
50 |
+
if score > threshold:
|
51 |
+
print(f"Detected nsfw score:{score}")
|
52 |
+
_img.save("nsfw.jpg")
|
53 |
+
return True
|
54 |
return False
|
55 |
|
56 |
def get_nsfw_detector(model_path='nsfwmodel_281.pth', device="cpu"):
|
swapper.py
DELETED
@@ -1,106 +0,0 @@
|
|
1 |
-
import cv2
|
2 |
-
import numpy as np
|
3 |
-
from insightface.utils import face_align
|
4 |
-
from face_parsing.swap import swap_regions
|
5 |
-
from utils import add_logo_to_image
|
6 |
-
|
7 |
-
swap_options_list = [
|
8 |
-
"All face",
|
9 |
-
"Age less than",
|
10 |
-
"Age greater than",
|
11 |
-
"All Male",
|
12 |
-
"All Female",
|
13 |
-
"Specific Face",
|
14 |
-
]
|
15 |
-
|
16 |
-
|
17 |
-
def swap_face(whole_img, target_face, source_face, models):
|
18 |
-
inswapper = models.get("swap")
|
19 |
-
face_enhancer = models.get("enhance", None)
|
20 |
-
face_parser = models.get("face_parser", None)
|
21 |
-
fe_enable = models.get("enhance_sett", False)
|
22 |
-
|
23 |
-
bgr_fake, M = inswapper.get(whole_img, target_face, source_face, paste_back=False)
|
24 |
-
image_size = 128 if not fe_enable else 512
|
25 |
-
aimg, _ = face_align.norm_crop2(whole_img, target_face.kps, image_size=image_size)
|
26 |
-
|
27 |
-
if face_parser is not None:
|
28 |
-
fp_enable, includes, smooth_mask, blur_amount = models.get("face_parser_sett")
|
29 |
-
if fp_enable:
|
30 |
-
bgr_fake = swap_regions(
|
31 |
-
bgr_fake, aimg, face_parser, smooth_mask, includes=includes, blur=blur_amount
|
32 |
-
)
|
33 |
-
|
34 |
-
if fe_enable:
|
35 |
-
_, bgr_fake, _ = face_enhancer.enhance(
|
36 |
-
bgr_fake, paste_back=True, has_aligned=True
|
37 |
-
)
|
38 |
-
bgr_fake = bgr_fake[0]
|
39 |
-
M /= 0.25
|
40 |
-
|
41 |
-
IM = cv2.invertAffineTransform(M)
|
42 |
-
|
43 |
-
img_white = np.full((aimg.shape[0], aimg.shape[1]), 255, dtype=np.float32)
|
44 |
-
bgr_fake = cv2.warpAffine(
|
45 |
-
bgr_fake, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0
|
46 |
-
)
|
47 |
-
img_white = cv2.warpAffine(
|
48 |
-
img_white, IM, (whole_img.shape[1], whole_img.shape[0]), borderValue=0.0
|
49 |
-
)
|
50 |
-
img_white[img_white > 20] = 255
|
51 |
-
img_mask = img_white
|
52 |
-
mask_h_inds, mask_w_inds = np.where(img_mask == 255)
|
53 |
-
mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
|
54 |
-
mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
|
55 |
-
mask_size = int(np.sqrt(mask_h * mask_w))
|
56 |
-
|
57 |
-
k = max(mask_size // 10, 10)
|
58 |
-
img_mask = cv2.erode(img_mask, np.ones((k, k), np.uint8), iterations=1)
|
59 |
-
|
60 |
-
k = max(mask_size // 20, 5)
|
61 |
-
kernel_size = (k, k)
|
62 |
-
blur_size = tuple(2 * i + 1 for i in kernel_size)
|
63 |
-
img_mask = cv2.GaussianBlur(img_mask, blur_size, 0) / 255
|
64 |
-
|
65 |
-
img_mask = np.reshape(img_mask, [img_mask.shape[0], img_mask.shape[1], 1])
|
66 |
-
fake_merged = img_mask * bgr_fake + (1 - img_mask) * whole_img.astype(np.float32)
|
67 |
-
fake_merged = add_logo_to_image(fake_merged.astype("uint8"))
|
68 |
-
return fake_merged
|
69 |
-
|
70 |
-
|
71 |
-
def swap_face_with_condition(
|
72 |
-
whole_img, target_faces, source_face, condition, age, models
|
73 |
-
):
|
74 |
-
swapped = whole_img.copy()
|
75 |
-
|
76 |
-
for target_face in target_faces:
|
77 |
-
if condition == "All face":
|
78 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
79 |
-
elif condition == "Age less than" and target_face["age"] < age:
|
80 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
81 |
-
elif condition == "Age greater than" and target_face["age"] > age:
|
82 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
83 |
-
elif condition == "All Male" and target_face["gender"] == 1:
|
84 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
85 |
-
elif condition == "All Female" and target_face["gender"] == 0:
|
86 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
87 |
-
|
88 |
-
return swapped
|
89 |
-
|
90 |
-
|
91 |
-
def swap_specific(source_specifics, target_faces, whole_img, models, threshold=0.6):
|
92 |
-
swapped = whole_img.copy()
|
93 |
-
|
94 |
-
for source_face, specific_face in source_specifics:
|
95 |
-
specific_embed = specific_face["embedding"]
|
96 |
-
specific_embed /= np.linalg.norm(specific_embed)
|
97 |
-
|
98 |
-
for target_face in target_faces:
|
99 |
-
target_embed = target_face["embedding"]
|
100 |
-
target_embed /= np.linalg.norm(target_embed)
|
101 |
-
cosine_distance = 1 - np.dot(specific_embed, target_embed)
|
102 |
-
if cosine_distance > threshold:
|
103 |
-
continue
|
104 |
-
swapped = swap_face(swapped, target_face, source_face, models)
|
105 |
-
|
106 |
-
return swapped
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
utils.py
CHANGED
@@ -2,13 +2,64 @@ import os
|
|
2 |
import cv2
|
3 |
import time
|
4 |
import glob
|
|
|
5 |
import shutil
|
6 |
import platform
|
7 |
import datetime
|
8 |
import subprocess
|
|
|
9 |
from threading import Thread
|
10 |
from moviepy.editor import VideoFileClip, ImageSequenceClip
|
11 |
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
12 |
|
13 |
|
14 |
def trim_video(video_path, output_path, start_frame, stop_frame):
|
@@ -23,9 +74,11 @@ def trim_video(video_path, output_path, start_frame, stop_frame):
|
|
23 |
start_time = start_frame / fps
|
24 |
duration = (stop_frame - start_frame) / fps
|
25 |
|
|
|
|
|
26 |
trimmed_video = video.subclip(start_time, start_time + duration)
|
27 |
trimmed_video.write_videofile(
|
28 |
-
trimmed_video_file_path, codec="libx264", audio_codec="aac"
|
29 |
)
|
30 |
trimmed_video.close()
|
31 |
video.close()
|
@@ -91,9 +144,6 @@ class ProcessBar:
|
|
91 |
return info_text
|
92 |
|
93 |
|
94 |
-
logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED)
|
95 |
-
|
96 |
-
|
97 |
def add_logo_to_image(img, logo=logo_image):
|
98 |
logo_size = int(img.shape[1] * 0.1)
|
99 |
logo = cv2.resize(logo, (logo_size, logo_size))
|
@@ -111,6 +161,7 @@ def add_logo_to_image(img, logo=logo_image):
|
|
111 |
]
|
112 |
return img
|
113 |
|
|
|
114 |
def split_list_by_lengths(data, length_list):
|
115 |
split_data = []
|
116 |
start_idx = 0
|
@@ -121,6 +172,7 @@ def split_list_by_lengths(data, length_list):
|
|
121 |
start_idx = end_idx
|
122 |
return split_data
|
123 |
|
|
|
124 |
def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name):
|
125 |
video_clip = VideoFileClip(ref_video_path)
|
126 |
fps = video_clip.fps
|
@@ -132,12 +184,15 @@ def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name
|
|
132 |
if audio_clip is not None:
|
133 |
edited_video_clip = edited_video_clip.set_audio(audio_clip)
|
134 |
|
|
|
|
|
135 |
edited_video_clip.set_duration(duration).write_videofile(
|
136 |
-
output_file_name, codec="libx264"
|
137 |
)
|
138 |
edited_video_clip.close()
|
139 |
video_clip.close()
|
140 |
|
|
|
141 |
def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_height):
|
142 |
# Extract the coordinates of the bbox
|
143 |
x1, y1, x2, y2 = bbox
|
@@ -167,3 +222,72 @@ def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_h
|
|
167 |
# Return the scaled bbox coordinates
|
168 |
scaled_bbox = [new_x1, new_y1, new_x2, new_y2]
|
169 |
return scaled_bbox
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2 |
import cv2
|
3 |
import time
|
4 |
import glob
|
5 |
+
import torch
|
6 |
import shutil
|
7 |
import platform
|
8 |
import datetime
|
9 |
import subprocess
|
10 |
+
import numpy as np
|
11 |
from threading import Thread
|
12 |
from moviepy.editor import VideoFileClip, ImageSequenceClip
|
13 |
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
|
14 |
+
from face_parsing import init_parser, swap_regions, mask_regions, mask_regions_to_list, SoftErosion
|
15 |
+
|
16 |
+
|
17 |
+
logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED)
|
18 |
+
|
19 |
+
|
20 |
+
# Quality presets, ordered from lowest to highest.
quality_types = ["poor", "low", "medium", "high", "best"]


# Target video bitrate per quality preset, keyed by resolution.
# NOTE(review): callers pass min(width, height) as the resolution.
bitrate_quality_by_resolution = {
    240: {"poor": "300k", "low": "500k", "medium": "800k", "high": "1000k", "best": "1200k"},
    360: {"poor": "500k", "low": "800k", "medium": "1200k", "high": "1500k", "best": "2000k"},
    480: {"poor": "800k", "low": "1200k", "medium": "2000k", "high": "2500k", "best": "3000k"},
    720: {"poor": "1500k", "low": "2500k", "medium": "4000k", "high": "5000k", "best": "6000k"},
    1080: {"poor": "2500k", "low": "4000k", "medium": "6000k", "high": "7000k", "best": "8000k"},
    1440: {"poor": "4000k", "low": "6000k", "medium": "8000k", "high": "10000k", "best": "12000k"},
    2160: {"poor": "8000k", "low": "10000k", "medium": "12000k", "high": "15000k", "best": "20000k"},
}


# CRF value per quality preset, keyed by the same resolutions as above.
crf_quality_by_resolution = {
    240: {"poor": 45, "low": 35, "medium": 28, "high": 23, "best": 20},
    360: {"poor": 35, "low": 28, "medium": 23, "high": 20, "best": 18},
    480: {"poor": 28, "low": 23, "medium": 20, "high": 18, "best": 16},
    720: {"poor": 23, "low": 20, "medium": 18, "high": 16, "best": 14},
    1080: {"poor": 20, "low": 18, "medium": 16, "high": 14, "best": 12},
    1440: {"poor": 18, "low": 16, "medium": 14, "high": 12, "best": 10},
    2160: {"poor": 16, "low": 14, "medium": 12, "high": 10, "best": 8},
}


def _closest_resolution(table, resolution):
    """Return the key of ``table`` numerically nearest to ``resolution``.

    Ties resolve to the key that appears first in ``table`` (the lower
    resolution, given the tables above), because ``min`` keeps the first
    minimum it encounters.
    """
    return min(table, key=lambda known: abs(known - resolution))


def get_bitrate_for_resolution(resolution, quality):
    """Return the bitrate string (e.g. ``"5000k"``) for a resolution/quality.

    ``resolution`` is snapped to the nearest key of
    ``bitrate_quality_by_resolution``; ``quality`` must be one of
    ``quality_types``, otherwise ``KeyError`` is raised.
    """
    nearest = _closest_resolution(bitrate_quality_by_resolution, resolution)
    return bitrate_quality_by_resolution[nearest][quality]


def get_crf_for_resolution(resolution, quality):
    """Return the integer CRF value for a resolution/quality.

    ``resolution`` is snapped to the nearest key of
    ``crf_quality_by_resolution``; ``quality`` must be one of
    ``quality_types``, otherwise ``KeyError`` is raised.
    """
    nearest = _closest_resolution(crf_quality_by_resolution, resolution)
    return crf_quality_by_resolution[nearest][quality]
|
55 |
+
|
56 |
+
|
57 |
+
def get_video_bitrate(video_file):
    """Return the first video stream's bitrate as a string such as ``"2500k"``.

    Runs ``ffprobe`` on ``video_file`` and reads the ``bit_rate`` entry of
    stream ``v:0``. The value is converted from bits/s to kbit/s and clamped
    to a minimum of 10.

    Raises:
        ValueError: if ffprobe's output is not an integer (e.g. it is empty
            or a placeholder such as ``N/A``).
        FileNotFoundError: propagated from ``subprocess.run`` when the
            ``ffprobe`` executable cannot be found.
    """
    ffprobe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries',
                   'stream=bit_rate', '-of', 'default=noprint_wrappers=1:nokey=1', video_file]
    result = subprocess.run(ffprobe_cmd, stdout=subprocess.PIPE)

    # Decode and strip explicitly so a bad value yields a readable error
    # instead of an opaque failure of int() on raw bytes.
    reported = result.stdout.decode("utf-8", errors="replace").strip()
    try:
        bits_per_second = int(reported)
    except ValueError:
        raise ValueError(
            f"ffprobe returned no usable bit_rate for {video_file!r}: {reported!r}"
        ) from None

    kbps = max(bits_per_second // 1000, 10)
    return str(kbps) + 'k'
|
63 |
|
64 |
|
65 |
def trim_video(video_path, output_path, start_frame, stop_frame):
|
|
|
74 |
start_time = start_frame / fps
|
75 |
duration = (stop_frame - start_frame) / fps
|
76 |
|
77 |
+
bitrate = get_bitrate_for_resolution(min(*video.size), "high")
|
78 |
+
|
79 |
trimmed_video = video.subclip(start_time, start_time + duration)
|
80 |
trimmed_video.write_videofile(
|
81 |
+
trimmed_video_file_path, codec="libx264", audio_codec="aac", bitrate=bitrate,
|
82 |
)
|
83 |
trimmed_video.close()
|
84 |
video.close()
|
|
|
144 |
return info_text
|
145 |
|
146 |
|
|
|
|
|
|
|
147 |
def add_logo_to_image(img, logo=logo_image):
|
148 |
logo_size = int(img.shape[1] * 0.1)
|
149 |
logo = cv2.resize(logo, (logo_size, logo_size))
|
|
|
161 |
]
|
162 |
return img
|
163 |
|
164 |
+
|
165 |
def split_list_by_lengths(data, length_list):
|
166 |
split_data = []
|
167 |
start_idx = 0
|
|
|
172 |
start_idx = end_idx
|
173 |
return split_data
|
174 |
|
175 |
+
|
176 |
def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name):
|
177 |
video_clip = VideoFileClip(ref_video_path)
|
178 |
fps = video_clip.fps
|
|
|
184 |
if audio_clip is not None:
|
185 |
edited_video_clip = edited_video_clip.set_audio(audio_clip)
|
186 |
|
187 |
+
bitrate = get_bitrate_for_resolution(min(*edited_video_clip.size), "high")
|
188 |
+
|
189 |
edited_video_clip.set_duration(duration).write_videofile(
|
190 |
+
output_file_name, codec="libx264", bitrate=bitrate,
|
191 |
)
|
192 |
edited_video_clip.close()
|
193 |
video_clip.close()
|
194 |
|
195 |
+
|
196 |
def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_height):
|
197 |
# Extract the coordinates of the bbox
|
198 |
x1, y1, x2, y2 = bbox
|
|
|
222 |
# Return the scaled bbox coordinates
|
223 |
scaled_bbox = [new_x1, new_y1, new_x2, new_y2]
|
224 |
return scaled_bbox
|
225 |
+
|
226 |
+
|
227 |
+
def laplacian_blending(A, B, m, num_levels=4):
    """Blend ``A`` over ``B`` under mask ``m`` using Laplacian pyramids.

    All three inputs must have the same shape and three channels, since they
    are copied into ``(size, size, 3)`` float32 canvases. Each pyramid level
    is combined as ``A * mask + B * (1 - mask)``, so the mask is used as a
    per-pixel weight.
    NOTE(review): presumably ``m`` is expected in [0, 1] -- confirm at callers.

    Args:
        A: foreground image, shape ``(H, W, 3)``.
        B: background image, same shape as ``A``.
        m: blend mask, same shape as ``A``.
        num_levels: number of pyramid levels used for blending.

    Returns:
        float32 array of shape ``(H, W, 3)`` clipped to ``[0, 255]``.
    """
    assert A.shape == B.shape
    assert B.shape == m.shape
    height = m.shape[0]
    width = m.shape[1]

    # Work on a square power-of-two canvas strictly larger than the longest
    # side (minimum 4). Computed rather than looked up in a fixed table so
    # inputs with a side >= 4096 no longer fail with IndexError.
    longest_side = max(height, width)
    size = 4
    while size <= longest_side:
        size *= 2

    def _padded(src):
        # Zero-pad ``src`` into the top-left corner of the square canvas.
        canvas = np.zeros((size, size, 3), dtype=np.float32)
        canvas[:height, :width, :] = src
        return canvas

    # Gaussian pyramids. Only levels 0 .. num_levels - 1 are read below, so
    # exactly that many are built.
    gpA = [_padded(A)]
    gpB = [_padded(B)]
    gpM = [_padded(m)]
    for _ in range(num_levels - 1):
        gpA.append(np.float32(cv2.pyrDown(gpA[-1])))
        gpB.append(np.float32(cv2.pyrDown(gpB[-1])))
        gpM.append(np.float32(cv2.pyrDown(gpM[-1])))

    # Laplacian pyramids, ordered coarsest first, with the matching mask
    # level kept alongside each entry.
    lpA = [gpA[num_levels - 1]]
    lpB = [gpB[num_levels - 1]]
    gpMr = [gpM[num_levels - 1]]
    for i in range(num_levels - 1, 0, -1):
        lpA.append(np.subtract(gpA[i - 1], cv2.pyrUp(gpA[i])))
        lpB.append(np.subtract(gpB[i - 1], cv2.pyrUp(gpB[i])))
        gpMr.append(gpM[i - 1])

    # Blend each level with its mask.
    LS = [la * gm + lb * (1.0 - gm) for la, lb, gm in zip(lpA, lpB, gpMr)]

    # Collapse the blended pyramid from coarse to fine.
    ls_ = LS[0]
    for i in range(1, num_levels):
        ls_ = cv2.pyrUp(ls_)
        ls_ = cv2.add(ls_, LS[i])

    # Crop the padding away and clamp to the 8-bit value range.
    ls_ = np.clip(ls_[:height, :width, :], 0, 255)
    return ls_
|
269 |
+
|
270 |
+
|
271 |
+
def make_white_image(shape, crop=None, white_value=255):
    """Create a 2-D float32 image filled with ``white_value``, optionally cropped.

    Args:
        shape: sequence whose first two items are ``(height, width)``.
        crop: optional ``(top, bottom, left, right)`` margins in pixels. The
            given number of rows/columns at each edge is set to 0. A pair of
            margins is ignored when together they would cover the whole axis.
        white_value: fill value for the untouched area.

    Returns:
        ``np.ndarray`` of shape ``(height, width)`` and dtype float32.
    """
    height, width = shape[0], shape[1]
    img_white = np.full((height, width), white_value, dtype=np.float32)
    if crop is None:
        return img_white

    # Top/bottom margins remove rows, so they are bounded by the height.
    # The original compared them with the width, and left/right with the
    # height, which gave wrong results for non-square shapes.
    top = int(crop[0])
    bottom = int(crop[1])
    if top + bottom < height:
        if top > 0:
            img_white[:top, :] = 0
        if bottom > 0:
            img_white[-bottom:, :] = 0

    # Left/right margins remove columns, so they are bounded by the width.
    left = int(crop[2])
    right = int(crop[3])
    if left + right < width:
        if left > 0:
            img_white[:, :left] = 0
        if right > 0:
            img_white[:, -right:] = 0

    return img_white
|
287 |
+
|
288 |
+
|
289 |
+
def remove_hair(img, model=None):
|
290 |
+
if model is None:
|
291 |
+
path = "./assets/pretrained_models/79999_iter.pth"
|
292 |
+
model = init_parser(path, mode="cuda" if torch.cuda.is_available() else "cpu")
|
293 |
+
|