# Spaces:
# Runtime error
# Runtime error
import torch | |
import math | |
from backend.misc import image_resize | |
from backend import memory_management, state_dict, utils | |
from backend.nn.cnets import cldm, t2i_adapter | |
from backend.patcher.base import ModelPatcher | |
from backend.operations import using_forge_operations, ForgeOperations, main_stream_worker, weights_manual_cast | |
def apply_controlnet_advanced(
        unet,
        controlnet,
        image_bchw,
        strength,
        start_percent,
        end_percent,
        positive_advanced_weighting=None,
        negative_advanced_weighting=None,
        advanced_frame_weighting=None,
        advanced_sigma_weighting=None,
        advanced_mask_weighting=None
):
    """
    Return a clone of ``unet`` with a configured copy of ``controlnet`` attached.

    The controlnet is copied, given ``image_bchw`` as its hint together with
    ``strength`` and the ``(start_percent, end_percent)`` range, tagged with
    the optional weighting settings below, and registered on ``unet.clone()``
    through ``add_patched_controlnet``. Neither argument is modified.

    # positive_advanced_weighting / negative_advanced_weighting

        Per-layer weights for the input, middle and output blocks of the Unet.
        The example below gives the middle block full weight while ramping the
        other layers, which is helpful for some high-res fix passes.

            positive_advanced_weighting = {
                'input': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2],
                'middle': [1.0],
                'output': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2]
            }

            negative_advanced_weighting = {
                'input': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2],
                'middle': [1.0],
                'output': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2]
            }

    # advanced_frame_weighting

        One weight per image in the batch; the list length must equal the
        batch size. With a batch of 5, ``[0, 0.25, 0.5, 0.75, 1.0]`` makes the
        control progressively stronger if the images are read as video frames.

    # advanced_sigma_weighting

        A callable mapping the diffusion timestep (sigma) to a control weight.
        The example below softly favours the beginning steps over the ending
        steps.

            sigma_max = unet.model.model_sampling.sigma_max
            sigma_min = unet.model.model_sampling.sigma_min
            advanced_sigma_weighting = lambda s: (s - sigma_min) / (sigma_max - sigma_min)

    # advanced_mask_weighting

        A tensor of shape B 1 H W (H and W arbitrary) applied to the control
        signals. It is resized automatically to every injection layer.

    Raises:
        AssertionError: if ``advanced_mask_weighting`` is given but is not a
            ``torch.Tensor`` with a non-empty B 1 H W shape.
    """
    patched_cnet = controlnet.copy().set_cond_hint(
        image_bchw, strength, (start_percent, end_percent)
    )

    # Stored as plain attributes; compute_controlnet_weighting reads them back
    # with getattr at merge time.
    for attr_name, attr_value in (
            ('positive_advanced_weighting', positive_advanced_weighting),
            ('negative_advanced_weighting', negative_advanced_weighting),
            ('advanced_frame_weighting', advanced_frame_weighting),
            ('advanced_sigma_weighting', advanced_sigma_weighting),
    ):
        setattr(patched_cnet, attr_name, attr_value)

    if advanced_mask_weighting is not None:
        assert isinstance(advanced_mask_weighting, torch.Tensor)
        batch, channels, height, width = advanced_mask_weighting.shape
        assert batch > 0 and channels == 1 and height > 0 and width > 0

    patched_cnet.advanced_mask_weighting = advanced_mask_weighting

    patched_unet = unet.clone()
    patched_unet.add_patched_controlnet(patched_cnet)
    return patched_unet
def compute_controlnet_weighting(control, cnet):
    """
    Scale every control tensor in ``control`` by the advanced weights on ``cnet``.

    ``control`` maps block names to lists of control signals. Entries that are
    not ``torch.Tensor`` (for example ``None``) are left untouched. The lists
    are updated in place and the same ``control`` object is returned.

    The optional settings are read from ``cnet`` with ``getattr``; when all of
    them are ``None`` the function returns immediately. Otherwise
    ``cnet.transformer_options`` must provide ``'cond_or_uncond'``,
    ``'sigmas'`` and ``'cond_mark'``.

    Per batch element the weight is
    ``positive * (1 - cond_mark) + negative * cond_mark``, multiplied by the
    sigma weight and the frame weight. A mask, if present, is bilinearly
    resized to each signal's H x W and multiplied in as well.

    Raises:
        AssertionError: if the frame weighting, repeated once per entry of
            ``cond_or_uncond``, does not match the length of ``cond_mark``.
    """
    positive_table = getattr(cnet, 'positive_advanced_weighting', None)
    negative_table = getattr(cnet, 'negative_advanced_weighting', None)
    frame_weights = getattr(cnet, 'advanced_frame_weighting', None)
    sigma_weights = getattr(cnet, 'advanced_sigma_weighting', None)
    mask = getattr(cnet, 'advanced_mask_weighting', None)
    transformer_options = cnet.transformer_options

    settings = (positive_table, negative_table, frame_weights, sigma_weights, mask)
    if all(setting is None for setting in settings):
        return control

    cond_or_uncond = transformer_options['cond_or_uncond']
    sigmas = transformer_options['sigmas']
    cond_mark = transformer_options['cond_mark']
    group_count = len(cond_or_uncond)

    if frame_weights is not None:
        # The per-frame list is tiled once per cond/uncond group, then moved
        # to the dtype/device of `sigmas`.
        frame_weights = torch.Tensor(frame_weights * group_count).to(sigmas)
        assert frame_weights.shape[0] == cond_mark.shape[0], \
            'Frame weighting list length is different from batch size!'

    if sigma_weights is not None:
        # Evaluate the callable once and tile the result per group.
        sigma_weights = torch.cat([sigma_weights(sigmas)] * group_count)

    for block_name, signals in control.items():
        for layer_index, control_signal in enumerate(signals):
            if not isinstance(control_signal, torch.Tensor):
                continue

            _, _, height, width = control_signal.shape

            if positive_table is None:
                positive_weight = 1.0
            else:
                positive_weight = get_at(positive_table.get(block_name, []), layer_index, 1.0)

            if negative_table is None:
                negative_weight = 1.0
            else:
                negative_weight = get_at(negative_table.get(block_name, []), layer_index, 1.0)

            sigma_weight = 1.0 if sigma_weights is None else sigma_weights
            frame_weight = 1.0 if frame_weights is None else frame_weights

            final_weight = positive_weight * (1.0 - cond_mark) + negative_weight * cond_mark
            final_weight = final_weight * sigma_weight * frame_weight

            if isinstance(mask, torch.Tensor):
                mask_batch = mask.shape[0]
                if mask_batch != 1:
                    repeats = int(control_signal.shape[0] // mask_batch)
                    if control_signal.shape[0] == repeats * mask_batch:
                        # The repeated mask is kept for later layers too.
                        mask = mask.repeat(repeats, 1, 1, 1)
                resized_mask = torch.nn.functional.interpolate(
                    mask.to(control_signal), size=(height, width), mode='bilinear'
                )
                control_signal = control_signal * resized_mask

            signals[layer_index] = control_signal * final_weight[:, None, None, None]

    return control
def broadcast_image_to(tensor, target_batch_size, batched_number):
    """
    Repeat ``tensor`` along dim 0 so that it can cover ``target_batch_size``.

    A batch of one is returned unchanged. Otherwise the tensor is cut or
    cyclically extended to ``target_batch_size // batched_number`` entries,
    and that group is concatenated ``batched_number`` times unless it already
    has ``target_batch_size`` entries.
    """
    if tensor.shape[0] == 1:
        return tensor

    group_size = target_batch_size // batched_number
    tensor = tensor[:group_size]

    available = tensor.shape[0]
    if group_size > available:
        # Too few images: tile whole copies, then top up with a partial one.
        whole_copies, leftover = divmod(group_size, available)
        tensor = torch.cat([tensor] * whole_copies + [tensor[:leftover]], dim=0)

    if tensor.shape[0] == target_batch_size:
        return tensor
    return torch.cat([tensor] * batched_number, dim=0)
def get_at(array, index, default=None):
    """Return ``array[index]``, or ``default`` when ``index`` is outside ``0 .. len(array) - 1``."""
    if not 0 <= index < len(array):
        # Negative indices count as out of range here; they do not wrap.
        return default
    return array[index]
class ControlBase:
    """
    Shared state and chaining logic for control models.

    Instances can be linked through ``previous_controlnet``; ``pre_run``,
    ``cleanup``, ``get_models`` and ``inference_memory_requirements`` all
    forward to the previous link when one is set.
    """

    def __init__(self, device=None):
        """Initialise defaults; ``device`` falls back to ``memory_management.get_torch_device()``."""
        self.cond_hint_original = None
        self.cond_hint = None
        self.strength = 1.0
        self.timestep_percent_range = (0.0, 1.0)
        self.global_average_pooling = False
        self.timestep_range = None
        self.transformer_options = {}
        self.previous_controlnet = None
        self.device = memory_management.get_torch_device() if device is None else device

    def set_cond_hint(self, cond_hint, strength=1.0, timestep_percent_range=(0.0, 1.0)):
        """Store the hint, strength and percent range; returns ``self`` for chaining."""
        self.cond_hint_original = cond_hint
        self.strength = strength
        self.timestep_percent_range = timestep_percent_range
        return self

    def pre_run(self, model, percent_to_timestep_function):
        """Convert the percent range into ``timestep_range``, here and on the previous link."""
        range_start = percent_to_timestep_function(self.timestep_percent_range[0])
        range_end = percent_to_timestep_function(self.timestep_percent_range[1])
        self.timestep_range = (range_start, range_end)
        if self.previous_controlnet is not None:
            self.previous_controlnet.pre_run(model, percent_to_timestep_function)

    def set_previous_controlnet(self, controlnet):
        """Link ``controlnet`` as the previous element of the chain; returns ``self``."""
        self.previous_controlnet = controlnet
        return self

    def cleanup(self):
        """Drop the cached hint and timestep range, after cleaning the previous link."""
        if self.previous_controlnet is not None:
            self.previous_controlnet.cleanup()
        self.cond_hint = None
        self.timestep_range = None

    def get_models(self):
        """Return a new list with the models reported by the previous link (empty if none)."""
        if self.previous_controlnet is None:
            return []
        return [] + self.previous_controlnet.get_models()

    def copy_to(self, c):
        """Copy hint, strength, percent range and pooling flag onto ``c``."""
        c.cond_hint_original = self.cond_hint_original
        c.strength = self.strength
        c.timestep_percent_range = self.timestep_percent_range
        c.global_average_pooling = self.global_average_pooling

    def inference_memory_requirements(self, dtype):
        """Return the previous link's requirement, or 0 when there is no previous link."""
        if self.previous_controlnet is None:
            return 0
        return self.previous_controlnet.inference_memory_requirements(dtype)

    def control_merge(self, control_input, control_output, control_prev, output_dtype):
        """
        Build the ``{'input', 'middle', 'output'}`` control dict for this model.

        Tensors in ``control_input`` / ``control_output`` are multiplied by
        ``self.strength`` in place and converted to ``output_dtype`` when
        their dtype differs. ``control_input`` entries end up in reverse
        order under ``'input'``. The last ``control_output`` entry goes to
        ``'middle'`` and the others to ``'output'``. The result is passed
        through ``compute_controlnet_weighting`` and then combined with
        ``control_prev`` when that is not ``None``.
        """
        merged = {'input': [], 'middle': [], 'output': []}

        if control_input is not None:
            for x in control_input:
                if x is not None:
                    x *= self.strength
                    if x.dtype != output_dtype:
                        x = x.to(output_dtype)
                # Inserting at the front reverses the incoming order.
                merged['input'].insert(0, x)

        if control_output is not None:
            last_position = len(control_output) - 1
            for position, x in enumerate(control_output):
                bucket = 'middle' if position == last_position else 'output'
                if x is not None:
                    if self.global_average_pooling:
                        # Replace each channel with its spatial mean, kept at
                        # the original H x W.
                        x = torch.mean(x, dim=(2, 3), keepdim=True).repeat(1, 1, x.shape[2], x.shape[3])
                    x *= self.strength
                    if x.dtype != output_dtype:
                        x = x.to(output_dtype)
                merged[bucket].append(x)

        merged = compute_controlnet_weighting(merged, self)

        if control_prev is not None:
            for key in ['input', 'middle', 'output']:
                current = merged[key]
                for i, prev_val in enumerate(control_prev[key]):
                    if i >= len(current):
                        current.append(prev_val)
                    elif prev_val is not None:
                        if current[i] is None:
                            current[i] = prev_val
                        elif current[i].shape[0] < prev_val.shape[0]:
                            # Smaller batch on our side: add out of place so
                            # the result takes the broadcast shape.
                            current[i] = prev_val + current[i]
                        else:
                            current[i] += prev_val
        return merged
class ControlNet(ControlBase):
    """Control model wrapper that runs ``control_model`` on a resized hint image."""

    def __init__(self, control_model, global_average_pooling=False, device=None, load_device=None, manual_cast_dtype=None):
        """Store the model and wrap it in a ``ModelPatcher`` for ``get_models``."""
        super().__init__(device)
        self.control_model = control_model
        self.load_device = load_device
        self.control_model_wrapped = ModelPatcher(
            self.control_model,
            load_device=load_device,
            offload_device=memory_management.unet_offload_device(),
        )
        self.global_average_pooling = global_average_pooling
        self.model_sampling_current = None
        self.manual_cast_dtype = manual_cast_dtype

    def get_control(self, x_noisy, t, cond, batched_number):
        """
        Compute the control dict for one sampling step.

        Returns the previous link's control (possibly ``None``) when ``t[0]``
        lies outside ``timestep_range``; otherwise the result of
        ``control_merge`` on this model's output.
        """
        options = self.transformer_options

        # Each modifier may rewrite the inputs before anything else sees them.
        for modifier in options.get('controlnet_conditioning_modifiers', []):
            x_noisy, t, cond, batched_number = modifier(self, x_noisy, t, cond, batched_number)

        control_prev = None
        if self.previous_controlnet is not None:
            control_prev = self.previous_controlnet.get_control(x_noisy, t, cond, batched_number)

        if self.timestep_range is not None:
            if t[0] > self.timestep_range[0] or t[0] < self.timestep_range[1]:
                # Outside the configured range: only earlier links contribute.
                return control_prev

        model_dtype = self.control_model.dtype
        dtype = model_dtype if self.manual_cast_dtype is None else self.manual_cast_dtype
        output_dtype = x_noisy.dtype

        hint_height = x_noisy.shape[2] * 8
        hint_width = x_noisy.shape[3] * 8
        hint_is_stale = (
            self.cond_hint is None
            or hint_height != self.cond_hint.shape[2]
            or hint_width != self.cond_hint.shape[3]
        )
        if hint_is_stale:
            # Release the old hint before building its replacement.
            self.cond_hint = None
            self.cond_hint = image_resize.adaptive_resize(
                self.cond_hint_original, hint_width, hint_height, 'nearest-exact', "center"
            ).to(dtype)

        if x_noisy.shape[0] != self.cond_hint.shape[0]:
            self.cond_hint = broadcast_image_to(self.cond_hint, x_noisy.shape[0], batched_number)

        context = cond['c_crossattn']
        y = cond.get('y', None)
        if y is not None:
            y = y.to(dtype)

        timestep = self.model_sampling_current.timestep(t)
        x_noisy = self.model_sampling_current.calculate_input(t, x_noisy)

        wrapper = options.get('controlnet_model_function_wrapper', None)
        if wrapper is None:
            control = self.control_model(
                x=x_noisy.to(dtype),
                hint=self.cond_hint.to(self.device),
                timesteps=timestep.float(),
                context=context.to(dtype),
                y=y,
            )
        else:
            # The wrapper gets the hint as cached (not moved to self.device),
            # plus handles to this object and the raw model.
            control = wrapper(
                x=x_noisy.to(dtype),
                hint=self.cond_hint,
                timesteps=timestep.float(),
                context=context.to(dtype),
                y=y,
                model=self,
                inner_model=self.control_model,
            )

        return self.control_merge(None, control, control_prev, output_dtype)

    def copy(self):
        """Return a new ``ControlNet`` sharing the model, with hint settings copied over."""
        duplicate = ControlNet(
            self.control_model,
            global_average_pooling=self.global_average_pooling,
            load_device=self.load_device,
            manual_cast_dtype=self.manual_cast_dtype,
        )
        self.copy_to(duplicate)
        return duplicate

    def get_models(self):
        """Return the chain's models followed by this model's patcher wrapper."""
        models = super().get_models()
        models.append(self.control_model_wrapped)
        return models

    def pre_run(self, model, percent_to_timestep_function):
        """Run the base preparation and remember ``model.predictor`` for this run."""
        super().pre_run(model, percent_to_timestep_function)
        self.model_sampling_current = model.predictor

    def cleanup(self):
        """Forget the predictor, then run the base cleanup."""
        self.model_sampling_current = None
        super().cleanup()
class ControlLoraOps(ForgeOperations):
    """
    Operation set whose layers can add a low-rank ``up @ down`` term to their weight.

    Both layers start with ``weight``, ``bias``, ``up`` and ``down`` set to
    ``None``; they are expected to be filled in from outside after
    construction.
    """

    class Linear(torch.nn.Module):
        """Linear layer computing ``linear(input, weight [+ up @ down], bias)``."""

        def __init__(self, in_features: int, out_features: int, bias: bool = True, device=None, dtype=None) -> None:
            # `bias`, `device` and `dtype` are accepted but not used here.
            super().__init__()
            self.in_features = in_features
            self.out_features = out_features
            self.weight = None
            self.bias = None
            self.up = None
            self.down = None

        def forward(self, input):
            weight, bias, signal = weights_manual_cast(self, input)
            with main_stream_worker(weight, bias, signal):
                if self.up is None:
                    return torch.nn.functional.linear(input, weight, bias)
                # Low-rank term: flatten both factors to 2-D, multiply, then
                # reshape to the stored weight's shape and the input's dtype.
                low_rank = torch.mm(
                    self.up.flatten(start_dim=1), self.down.flatten(start_dim=1)
                ).reshape(self.weight.shape).type(input.dtype)
                return torch.nn.functional.linear(input, weight + low_rank, bias)

    class Conv2d(torch.nn.Module):
        """2-D convolution computing ``conv2d(input, weight [+ up @ down], bias, ...)``."""

        def __init__(
                self,
                in_channels,
                out_channels,
                kernel_size,
                stride=1,
                padding=0,
                dilation=1,
                groups=1,
                bias=True,
                padding_mode='zeros',
                device=None,
                dtype=None
        ):
            # `bias`, `device` and `dtype` are accepted but not used here.
            super().__init__()
            self.in_channels = in_channels
            self.out_channels = out_channels
            self.kernel_size = kernel_size
            self.stride = stride
            self.padding = padding
            self.dilation = dilation
            self.groups = groups
            self.padding_mode = padding_mode
            self.transposed = False
            self.output_padding = 0
            self.weight = None
            self.bias = None
            self.up = None
            self.down = None

        def forward(self, input):
            weight, bias, signal = weights_manual_cast(self, input)
            with main_stream_worker(weight, bias, signal):
                if self.up is None:
                    return torch.nn.functional.conv2d(
                        input, weight, bias, self.stride, self.padding, self.dilation, self.groups
                    )
                # Same low-rank construction as in Linear.
                low_rank = torch.mm(
                    self.up.flatten(start_dim=1), self.down.flatten(start_dim=1)
                ).reshape(self.weight.shape).type(input.dtype)
                return torch.nn.functional.conv2d(
                    input, weight + low_rank, bias, self.stride, self.padding, self.dilation, self.groups
                )
class ControlLora(ControlNet):
    """
    ControlNet variant whose model is built in ``pre_run`` from the diffusion
    model's config and weights plus the stored ``control_weights``.
    """

    def __init__(self, control_weights, global_average_pooling=False, device=None):
        """Store the weights; the control model itself is created in ``pre_run``."""
        # ControlNet.__init__ is skipped on purpose: there is no model to wrap
        # until pre_run builds one.
        ControlBase.__init__(self, device)
        self.control_weights = control_weights
        self.global_average_pooling = global_average_pooling
        # Attributes that inherited ControlNet methods and cleanup() touch.
        # Defining them here keeps cleanup() safe when pre_run() never ran.
        self.control_model = None
        self.manual_cast_dtype = None
        self.model_sampling_current = None

    def pre_run(self, model, percent_to_timestep_function):
        """
        Build ``self.control_model`` for this run.

        The diffusion model's config is copied, its ``"out_channels"`` entry
        removed, and ``"hint_channels"`` / ``"dtype"`` filled in. A
        ``cldm.ControlNet`` is then created under ``ControlLoraOps``, loaded
        with every diffusion-model weight that can be set on it, and finally
        with every entry of ``control_weights`` except ``"lora_controlnet"``.
        """
        super().pre_run(model, percent_to_timestep_function)

        controlnet_config = model.diffusion_model.config.copy()
        controlnet_config.pop("out_channels")
        controlnet_config["hint_channels"] = self.control_weights["input_hint_block.0.weight"].shape[1]

        dtype = model.storage_dtype
        if dtype in ['nf4', 'fp4', 'gguf']:
            # These storage formats are given as strings; use fp16 instead.
            dtype = torch.float16
        controlnet_config["dtype"] = dtype

        self.manual_cast_dtype = model.computation_dtype

        with using_forge_operations(operations=ControlLoraOps, dtype=dtype):
            self.control_model = cldm.ControlNet(**controlnet_config)

        self.control_model.to(device=memory_management.get_torch_device(), dtype=dtype)

        for key, weight in model.diffusion_model.state_dict().items():
            try:
                utils.set_attr(self.control_model, key, weight)
            except Exception:
                # Best effort: keys that cannot be set on the control model
                # are skipped. `Exception` (not a bare except) lets
                # KeyboardInterrupt and SystemExit propagate.
                continue

        target_device = memory_management.get_torch_device()
        for key in self.control_weights:
            if key != "lora_controlnet":
                utils.set_attr(self.control_model, key, self.control_weights[key].to(dtype).to(target_device))

    def copy(self):
        """Return a new ``ControlLora`` sharing the weights, with hint settings copied over."""
        duplicate = ControlLora(self.control_weights, global_average_pooling=self.global_average_pooling)
        self.copy_to(duplicate)
        return duplicate

    def cleanup(self):
        """Release the per-run control model, then run the inherited cleanup."""
        # Plain assignment instead of `del`: it cannot raise AttributeError
        # when the attribute was never created.
        self.control_model = None
        super().cleanup()

    def get_models(self):
        """Return only the chain's models; this class reports no model of its own."""
        return ControlBase.get_models(self)

    def inference_memory_requirements(self, dtype):
        """Return the parameter count of ``control_weights`` times the dtype size, plus the chain's requirement."""
        own_bytes = utils.calculate_parameters(self.control_weights) * memory_management.dtype_size(dtype)
        return own_bytes + ControlBase.inference_memory_requirements(self, dtype)
class T2IAdapter(ControlBase):
    """Control model wrapper that runs a T2I adapter on the hint and caches its output."""

    def __init__(self, t2i_model, channels_in, device=None):
        """Store the adapter model and its expected input channel count."""
        super().__init__(device)
        self.t2i_model = t2i_model
        self.channels_in = channels_in
        self.control_input = None

    def scale_image_to(self, width, height):
        """Round ``width`` and ``height`` up to multiples of the model's ``unshuffle_amount``."""
        step = self.t2i_model.unshuffle_amount
        rounded_width = math.ceil(width / step) * step
        rounded_height = math.ceil(height / step) * step
        return rounded_width, rounded_height

    def get_control(self, x_noisy, t, cond, batched_number):
        """
        Compute the control dict for one sampling step.

        Returns the previous link's control (possibly ``None``) when ``t[0]``
        lies outside ``timestep_range``. The adapter runs only while
        ``self.control_input`` is ``None``; later calls reuse clones of the
        cached output.
        """
        options = self.transformer_options

        # Each modifier may rewrite the inputs before anything else sees them.
        for modifier in options.get('controlnet_conditioning_modifiers', []):
            x_noisy, t, cond, batched_number = modifier(self, x_noisy, t, cond, batched_number)

        control_prev = None
        if self.previous_controlnet is not None:
            control_prev = self.previous_controlnet.get_control(x_noisy, t, cond, batched_number)

        if self.timestep_range is not None:
            if t[0] > self.timestep_range[0] or t[0] < self.timestep_range[1]:
                # Outside the configured range: only earlier links contribute.
                return control_prev

        hint_height = x_noisy.shape[2] * 8
        hint_width = x_noisy.shape[3] * 8
        hint_is_stale = (
            self.cond_hint is None
            or hint_height != self.cond_hint.shape[2]
            or hint_width != self.cond_hint.shape[3]
        )
        if hint_is_stale:
            # Drop the cached adapter output and the old hint before rebuilding.
            self.control_input = None
            self.cond_hint = None
            width, height = self.scale_image_to(hint_width, hint_height)
            self.cond_hint = image_resize.adaptive_resize(
                self.cond_hint_original, width, height, 'nearest-exact', "center"
            ).float()
            if self.channels_in == 1 and self.cond_hint.shape[1] > 1:
                # Single-channel adapter: average the hint's channels.
                self.cond_hint = torch.mean(self.cond_hint, 1, keepdim=True)

        if x_noisy.shape[0] != self.cond_hint.shape[0]:
            self.cond_hint = broadcast_image_to(self.cond_hint, x_noisy.shape[0], batched_number)

        if self.control_input is None:
            self.t2i_model.to(x_noisy.dtype)
            self.t2i_model.to(self.device)
            wrapper = options.get('controlnet_model_function_wrapper', None)
            if wrapper is None:
                self.control_input = self.t2i_model(self.cond_hint.to(x_noisy))
            else:
                self.control_input = wrapper(
                    hint=self.cond_hint.to(x_noisy.dtype),
                    model=self,
                    inner_model=self.t2i_model,
                    inner_t2i_model=self.t2i_model,
                )
            self.t2i_model.cpu()

        # Clone so that control_merge's in-place scaling leaves the cache intact.
        control_input = [None if a is None else a.clone() for a in self.control_input]

        mid = None
        if self.t2i_model.xl == True:
            # With `xl` set, the last adapter output is passed as the middle control.
            mid = control_input[-1:]
            control_input = control_input[:-1]

        return self.control_merge(control_input, mid, control_prev, x_noisy.dtype)

    def copy(self):
        """Return a new ``T2IAdapter`` sharing the model, with hint settings copied over."""
        duplicate = T2IAdapter(self.t2i_model, self.channels_in)
        self.copy_to(duplicate)
        return duplicate
def load_t2i_adapter(t2i_data):
    """
    Build a ``T2IAdapter`` from a state dict, or return ``None`` if the layout is unknown.

    A nested ``'adapter'`` entry is unwrapped first. Keys in diffusers format
    are renamed to the ``body.N.`` layout. The adapter class is then chosen
    by key: ``"body.0.in_conv.weight"`` selects ``Adapter_light`` and
    ``'conv_in.weight'`` selects ``Adapter``. Missing and unexpected keys
    reported by ``load_state_dict`` are printed.
    """
    if 'adapter' in t2i_data:
        t2i_data = t2i_data['adapter']

    if 'adapter.body.0.resnets.0.block1.weight' in t2i_data:  # diffusers format
        prefix_replace = {}
        for i in range(4):
            # Insertion order is preserved deliberately: the longer
            # "resnets" prefixes come before the shorter "adapter.body.N."
            # one (presumably the replace helper applies them in order).
            for j in range(2):
                prefix_replace["adapter.body.{}.resnets.{}.".format(i, j)] = "body.{}.".format(i * 2 + j)
            prefix_replace["adapter.body.{}.".format(i)] = "body.{}.".format(i * 2)
        prefix_replace["adapter."] = ""
        t2i_data = state_dict.state_dict_prefix_replace(t2i_data, prefix_replace)

    keys = t2i_data.keys()

    if "body.0.in_conv.weight" in keys:
        cin = t2i_data['body.0.in_conv.weight'].shape[1]
        model_ad = t2i_adapter.Adapter_light(cin=cin, channels=[320, 640, 1280, 1280], nums_rb=4)
    elif 'conv_in.weight' in keys:
        conv_in_shape = t2i_data['conv_in.weight'].shape
        cin = conv_in_shape[1]
        channel = conv_in_shape[0]
        ksize = t2i_data['body.0.block2.weight'].shape[2]
        use_conv = any(key.endswith("down_opt.op.weight") for key in keys)
        xl = cin in (256, 768)
        model_ad = t2i_adapter.Adapter(
            cin=cin,
            channels=[channel, channel * 2, channel * 4, channel * 4],
            nums_rb=2,
            ksize=ksize,
            sk=True,
            use_conv=use_conv,
            xl=xl,
        )
    else:
        return None

    missing, unexpected = model_ad.load_state_dict(t2i_data)
    if len(missing) > 0:
        print("t2i missing", missing)
    if len(unexpected) > 0:
        print("t2i unexpected", unexpected)

    return T2IAdapter(model_ad, model_ad.input_channels)