|
import copy |
|
import os |
|
import shutil |
|
|
|
import cv2 |
|
import gradio as gr |
|
import modules.scripts as scripts |
|
|
|
from modules import images |
|
from modules.processing import process_images |
|
from modules.shared import opts |
|
from PIL import Image |
|
|
|
import numpy as np |
|
|
|
_BASEDIR = "/controlnet-m2m" |
|
_BASEFILE = "animation" |
|
|
|
def get_all_frames(video_path): |
|
if video_path is None: |
|
return None |
|
cap = cv2.VideoCapture(video_path) |
|
frame_list = [] |
|
if not cap.isOpened(): |
|
return |
|
while True: |
|
ret, frame = cap.read() |
|
if ret: |
|
frame_list.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) |
|
else: |
|
return frame_list |
|
|
|
def get_min_frame_num(video_list): |
|
min_frame_num = -1 |
|
for video in video_list: |
|
if video is None: |
|
continue |
|
else: |
|
frame_num = len(video) |
|
print(frame_num) |
|
if min_frame_num < 0: |
|
min_frame_num = frame_num |
|
elif frame_num < min_frame_num: |
|
min_frame_num = frame_num |
|
return min_frame_num |
|
|
|
def pil2cv(image): |
|
new_image = np.array(image, dtype=np.uint8) |
|
if new_image.ndim == 2: |
|
pass |
|
elif new_image.shape[2] == 3: |
|
new_image = new_image[:, :, ::-1] |
|
elif new_image.shape[2] == 4: |
|
new_image = new_image[:, :, [2, 1, 0, 3]] |
|
return new_image |
|
|
|
|
|
def save_gif(path, image_list, name, duration): |
|
tmp_dir = path + "/tmp/" |
|
if os.path.isdir(tmp_dir): |
|
shutil.rmtree(tmp_dir) |
|
os.mkdir(tmp_dir) |
|
for i, image in enumerate(image_list): |
|
images.save_image(image, tmp_dir, f"output_{i}") |
|
|
|
os.makedirs(f"{path}{_BASEDIR}", exist_ok=True) |
|
|
|
image_list[0].save(f"{path}{_BASEDIR}/{name}.gif", save_all=True, append_images=image_list[1:], optimize=False, duration=duration, loop=0) |
|
|
|
|
|
class Script(scripts.Script): |
|
|
|
def title(self): |
|
return "controlnet m2m" |
|
|
|
def show(self, is_img2img): |
|
return True |
|
|
|
def ui(self, is_img2img): |
|
|
|
|
|
|
|
|
|
|
|
ctrls_group = () |
|
max_models = opts.data.get("control_net_max_models_num", 1) |
|
|
|
with gr.Group(): |
|
with gr.Accordion("ControlNet-M2M", open = False): |
|
duration = gr.Slider(label=f"Duration", value=50.0, minimum=10.0, maximum=200.0, step=10, interactive=True, elem_id='controlnet_movie2movie_duration_slider') |
|
with gr.Tabs(): |
|
for i in range(max_models): |
|
with gr.Tab(f"ControlNet-{i}"): |
|
with gr.TabItem("Movie Input"): |
|
ctrls_group += (gr.Video(format='mp4', source='upload', elem_id = f"video_{i}"), ) |
|
with gr.TabItem("Image Input"): |
|
ctrls_group += (gr.Image(source='upload', brush_radius=20, mirror_webcam=False, type='numpy', tool='sketch', elem_id=f'image_{i}'), ) |
|
ctrls_group += (gr.Checkbox(label=f"Save preprocessed", value=False, elem_id = f"save_pre_{i}"),) |
|
|
|
ctrls_group += (duration,) |
|
|
|
return ctrls_group |
|
|
|
def run(self, p, *args): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
contents_num = opts.data.get("control_net_max_models_num", 1) |
|
arg_num = 3 |
|
item_list = [] |
|
video_list = [] |
|
for input_set in [tuple(args[:contents_num * arg_num][i:i+3]) for i in range(0, len(args[:contents_num * arg_num]), arg_num)]: |
|
if input_set[0] is not None: |
|
item_list.append([get_all_frames(input_set[0]), "video"]) |
|
video_list.append(get_all_frames(input_set[0])) |
|
if input_set[1] is not None: |
|
item_list.append([cv2.cvtColor(pil2cv(input_set[1]["image"]), cv2.COLOR_BGRA2RGB), "image"]) |
|
|
|
save_pre = list(args[2:contents_num * arg_num:3]) |
|
item_num = len(item_list) |
|
video_num = len(video_list) |
|
duration, = args[contents_num * arg_num:] |
|
|
|
frame_num = get_min_frame_num(video_list) |
|
if frame_num > 0: |
|
output_image_list = [] |
|
pre_output_image_list = [] |
|
for i in range(item_num): |
|
pre_output_image_list.append([]) |
|
|
|
for frame in range(frame_num): |
|
copy_p = copy.copy(p) |
|
copy_p.control_net_input_image = [] |
|
for item in item_list: |
|
if item[1] == "video": |
|
copy_p.control_net_input_image.append(item[0][frame]) |
|
elif item[1] == "image": |
|
copy_p.control_net_input_image.append(item[0]) |
|
else: |
|
continue |
|
|
|
proc = process_images(copy_p) |
|
img = proc.images[0] |
|
output_image_list.append(img) |
|
|
|
for i in range(len(save_pre)): |
|
if save_pre[i]: |
|
try: |
|
pre_output_image_list[i].append(proc.images[i + 1]) |
|
except: |
|
print(f"proc.images[{i} failed") |
|
|
|
copy_p.close() |
|
|
|
|
|
|
|
seq = images.get_next_sequence_number(f"{p.outpath_samples}{_BASEDIR}", "") |
|
filename = f"{seq:05}-{proc.seed}-{_BASEFILE}" |
|
save_gif(p.outpath_samples, output_image_list, filename, duration) |
|
proc.images = [f"{p.outpath_samples}{_BASEDIR}/{filename}.gif"] |
|
|
|
|
|
for i in range(len(save_pre)): |
|
if save_pre[i]: |
|
|
|
save_gif(p.outpath_samples, pre_output_image_list[i], f"{filename}-control{i}", duration) |
|
proc.images.append(f"{p.outpath_samples}{_BASEDIR}/{filename}-control{i}.gif") |
|
|
|
else: |
|
proc = process_images(p) |
|
|
|
return proc |
|
|