|
import os |
|
import random |
|
from traceback import print_exc |
|
from typing import List, Tuple |
|
|
|
import gradio as gr |
|
import numpy as np |
|
try: from moviepy.editor import concatenate_videoclips, ImageClip |
|
except ImportError: print(f"moviepy python module not installed. Will not be able to generate video.") |
|
|
|
import modules.scripts as scripts |
|
from modules.processing import Processed, process_images, StableDiffusionProcessing, get_fixed_seed |
|
from modules.shared import state |
|
from modules.devices import torch_gc |
|
|
|
DEFAULT_MODE = 'simple' |
|
DEFAULT_STEP = 64 |
|
DEFAULT_SIZE = 512 |
|
DEFAULT_VIDEO_SAVE = True |
|
DEFAULT_VIDEO_FPS = 3 |
|
DEFAULT_VIDEO_CONCAT = 'compose' |
|
DEFAULT_DEBUG = True |
|
|
|
HINT_H_OPTS = '<start>:<end>:<step>, e.g.: 512:1024:64' |
|
HINT_W_OPTS = '<start>:<end>:<step>, e.g.: 512:1024:64' |
|
HINT_HW_OPTS = '<h_start>:<h_end>:<h_step>:<w_start>:<w_end>:<w_step>, e.g.: 512:768:768:512:32' |
|
|
|
|
|
def _list_to_int(ls:List[str]): |
|
return [int(x.strip()) for x in ls] |
|
|
|
def hwrange(start, end, step=DEFAULT_STEP): |
|
def _offset(end:int, step:int): |
|
if step > 0: return end + 1 |
|
if step < 0: return end - 1 |
|
|
|
assert start > 0 and end > 0, 'range boundary should be positive' |
|
assert step > 0, 'step size must be postive! (the ascending/descending order is auto inferred from `start` and `end`:)' |
|
|
|
if start > end: step = -step |
|
return list(range(start, _offset(end, step), step)) |
|
|
|
def parse_simple_opts(s:str) -> List[int]: |
|
r = [] |
|
|
|
sect = s.strip() |
|
if ':' in sect: |
|
segs = _list_to_int(sect.split(':')) |
|
if len(segs) == 2: |
|
start, end = segs[0], segs[1] |
|
r.extend(hwrange(start, end)) |
|
elif len(segs) == 3: |
|
start, end, step = segs[0], segs[1], segs[2] |
|
r.extend(hwrange(start, end, step)) |
|
else: raise ValueError(f'unkonw format for sect {sect}') |
|
else: |
|
r.append(int(sect)) |
|
|
|
return r |
|
|
|
def zip_hw(heights:List[int], widths:List[int]) -> List[Tuple[int, int]]: |
|
if not heights or not widths: return [ ] |
|
|
|
maxlen = max(len(heights), len(widths)) |
|
while len(heights) < maxlen: heights.append(heights[-1]) |
|
while len(widths) < maxlen: widths .append(widths[-1]) |
|
|
|
return [(h, w) for h, w in zip(heights, widths)] |
|
|
|
def parse_advance_opts(s:str) -> List[Tuple[int, int]]: |
|
r = [] |
|
|
|
|
|
def _(x, hw): |
|
if x == -1: |
|
if r: return r[-1][hw] |
|
else: return DEFAULT_SIZE |
|
else: return x |
|
def _h(x): return _(x, 0) |
|
def _w(x): return _(x, 1) |
|
|
|
def parse_1_seg(segs): |
|
hw, = segs |
|
r.append((_h(hw), _w(hw))) |
|
|
|
def parse_2_seg(segs): |
|
h, w = segs |
|
r.append((_h(h), _w(w))) |
|
|
|
def parse_3_seg(segs): |
|
hw_start, hw_end, step = segs |
|
hw_start, hw_end = _h(hw_start), _w(hw_end) |
|
r.extend([(hw, hw) for hw in hwrange(hw_start, hw_end, step)]) |
|
|
|
def parse_4_seg(segs): |
|
h_start, h_end, w_start, w_end = segs |
|
h_start, h_end = _h(h_start), _w(h_end) |
|
w_start, w_end = _h(w_start), _w(w_end) |
|
hs = hwrange(h_start, h_end) |
|
ws = hwrange(w_start, w_end) |
|
hws = zip_hw(hs, ws) |
|
r.extend(hws) |
|
|
|
def parse_5_seg(segs): |
|
h_start, h_end, w_start, w_end, step = segs |
|
h_start, h_end = _h(h_start), _w(h_end) |
|
w_start, w_end = _h(w_start), _w(w_end) |
|
hs = hwrange(h_start, h_end, step) |
|
ws = hwrange(w_start, w_end, step) |
|
hws = zip_hw(hs, ws) |
|
r.extend(hws) |
|
|
|
def parse_6_seg(segs): |
|
h_start, h_end, h_step, w_start, w_end, w_step = segs |
|
h_start, h_end = _h(h_start), _w(h_end) |
|
w_start, w_end = _h(w_start), _w(w_end) |
|
hs = hwrange(h_start, h_end, h_step) |
|
ws = hwrange(w_start, w_end, w_step) |
|
hws = zip_hw(hs, ws) |
|
r.extend(hws) |
|
|
|
sects = s.strip().split(',') |
|
for sect in sects: |
|
segs = _list_to_int(sect.strip().split(':')) |
|
locals().get(f'parse_{len(segs)}_seg')(segs) |
|
|
|
if r: |
|
rr = [r[0]] |
|
for hw in r[1:]: |
|
if hw != rr[-1]: |
|
rr.append(hw) |
|
return rr |
|
else: |
|
return r |
|
|
|
|
|
class Script(scripts.Script): |
|
|
|
def title(self): |
|
return 'Size Travel' |
|
|
|
def describe(self): |
|
return "Travel through a series of image sizes and generates a video." |
|
|
|
def show(self, is_img2img): |
|
return True |
|
|
|
def ui(self, is_img2img): |
|
with gr.Row(): |
|
mode = gr.Radio(choices=['simple', 'advance'], value=lambda: DEFAULT_MODE) |
|
|
|
with gr.Row(visible=DEFAULT_MODE=='simple') as tab_simple: |
|
height_opt = gr.Textbox(label='Height Variation', lines=1, placeholder=HINT_H_OPTS) |
|
width_opt = gr.Textbox(label='Width Variation', lines=1, placeholder=HINT_W_OPTS) |
|
|
|
with gr.Row(visible=DEFAULT_MODE=='advance') as tab_advance: |
|
advance_opt = gr.Textbox(label='Height/Width Variation', lines=3, placeholder=HINT_HW_OPTS) |
|
|
|
with gr.Row(): |
|
video_fps = gr.Number(label='Video FPS', value=lambda: DEFAULT_VIDEO_FPS) |
|
video_concat = gr.Radio(label='Video concat method', choices=['compose', 'chain'], value=lambda: DEFAULT_VIDEO_CONCAT) |
|
|
|
show_debug = gr.Checkbox(label='Show verbose debug info at console', value=lambda: DEFAULT_DEBUG) |
|
|
|
def switch_mode(mode): |
|
return [ |
|
{ 'visible': mode == 'simple', '__type__': 'update' }, |
|
{ 'visible': mode == 'advance', '__type__': 'update' }, |
|
] |
|
|
|
mode.change(fn=switch_mode, inputs=[mode], outputs=[tab_simple, tab_advance]) |
|
|
|
return [mode, height_opt, width_opt, advance_opt, video_fps, video_concat, show_debug] |
|
|
|
def get_next_sequence_number(path): |
|
from pathlib import Path |
|
""" |
|
Determines and returns the next sequence number to use when saving an image in the specified directory. |
|
The sequence starts at 0. |
|
""" |
|
result = -1 |
|
dir = Path(path) |
|
for file in dir.iterdir(): |
|
if not file.is_dir(): continue |
|
try: |
|
num = int(file.name) |
|
if num > result: result = num |
|
except ValueError: |
|
pass |
|
return result + 1 |
|
|
|
def run(self, p:StableDiffusionProcessing, mode, height_opt, width_opt, advance_opt, video_fps, video_concat, show_debug): |
|
initial_info = None |
|
images = [] |
|
|
|
if mode == 'simple': |
|
if not height_opt or not width_opt: |
|
return Processed(p, images, p.seed, 'run in simple mode but got empty "height_opt" or "width_opt"') |
|
|
|
hs = parse_simple_opts(height_opt) |
|
ws = parse_simple_opts(width_opt) |
|
hws = zip_hw(hs, ws) |
|
elif mode == 'advance': |
|
if not advance_opt: |
|
return Processed(p, images, p.seed, 'run in advance mode, but get empty "advance_opt"') |
|
|
|
hws = parse_advance_opts(advance_opt) |
|
else: |
|
return Processed(p, images, p.seed, f'unknown size_travel mode {mode}') |
|
|
|
if show_debug: print('[size_travel] hws:', hws) |
|
|
|
|
|
travel_path = os.path.join(p.outpath_samples, 'size_travel') |
|
os.makedirs(travel_path, exist_ok=True) |
|
travel_number = Script.get_next_sequence_number(travel_path) |
|
travel_path = os.path.join(travel_path, f"{travel_number:05}") |
|
p.outpath_samples = travel_path |
|
|
|
|
|
p.n_iter = 1 |
|
p.batch_size = 1 |
|
|
|
|
|
p.seed = get_fixed_seed(p.seed) |
|
self.subseed = p.subseed |
|
if show_debug: |
|
print('seed:', p.seed) |
|
print('subseed:', p.subseed) |
|
|
|
|
|
n_jobs = len(hws) |
|
state.job_count = n_jobs |
|
print(f"Generating {n_jobs} images.") |
|
for h, w in hws: |
|
if state.interrupted: break |
|
torch_gc() |
|
|
|
p.height = h |
|
p.width = w |
|
p.subseed = self.subseed |
|
|
|
try: |
|
proc = process_images(p) |
|
if initial_info is None: initial_info = proc.info |
|
images += proc.images |
|
except: |
|
print(f'>> error gen size ({h}, {w})') |
|
if show_debug: print_exc() |
|
|
|
if video_fps > 0 and len(images) > 1: |
|
try: |
|
imgs = [np.asarray(t) for t in images] |
|
frames = [ImageClip(img, duration=1/video_fps) for img in imgs] |
|
clip = concatenate_videoclips(frames, method=video_concat) |
|
clip.fps = video_fps |
|
clip.write_videofile(os.path.join(travel_path, f"travel-{travel_number:05}.mp4"), verbose=False, audio=False) |
|
except NameError: pass |
|
except: print_exc() |
|
|
|
return Processed(p, images, p.seed, initial_info) |
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
assert parse_simple_opts('512:768:32') == [512, 544, 576, 608, 640, 672, 704, 736, 768] |
|
assert parse_simple_opts('768:512:32') == [768, 736, 704, 672, 640, 608, 576, 544, 512] |
|
assert parse_simple_opts('512:768') == [512, 544, 576, 608, 640, 672, 704, 736, 768] |
|
assert parse_simple_opts('512') == [512] |
|
assert parse_simple_opts('512:768:114514') == [512] |
|
|
|
hs = parse_simple_opts('512:768:128') == [512, 640, 768] |
|
ws = parse_simple_opts('512') == [512] |
|
assert zip_hw(hs, ws) == [(512, 512), (640, 512), (768, 512)] |
|
ws = parse_simple_opts('512:768:256') == [512, 768] |
|
assert zip_hw(hs, ws) == [(512, 512), (640, 768), (768, 768)] |
|
|
|
|
|
hws = parse_advance_opts('512, 512:512:10, 512:512:512:512:10, 512:512:3:512:512:3') |
|
assert hws == [(512, 512)] |
|
|
|
hws = parse_advance_opts('1:9:2:6:2') |
|
assert hws == [(1, 2), (3, 4), (5, 6), (7, 6), (9, 6)] |
|
|
|
hws = parse_advance_opts('1:3:1:30:10:-10') |
|
assert hws == [(1, 30), (2, 20), (3, 10)] |
|
hws = parse_advance_opts('1:3:1:30:10:-20') |
|
assert hws == [(1, 30), (2, 10), (3, 10)] |
|
|
|
hws = parse_advance_opts('512, 384:384, -1:768:128, 768:512:114514, -1:768:-1:512:128') |
|
assert hws == [(512, 512), (384, 384), (512, 512), (640, 640), (768, 768), (768, 640), (768, 512)] |
|
|
|
print('All tests passed.') |
|
|