Spaces:
Running
on
Zero
Running
on
Zero
from __future__ import annotations | |
import torch | |
import os | |
import sys | |
import json | |
import hashlib | |
import traceback | |
import math | |
import time | |
import random | |
import logging | |
from PIL import Image, ImageOps, ImageSequence | |
from PIL.PngImagePlugin import PngInfo | |
import numpy as np | |
import safetensors.torch | |
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy")) | |
import comfy.diffusers_load | |
import comfy.samplers | |
import comfy.sample | |
import comfy.sd | |
import comfy.utils | |
import comfy.controlnet | |
from comfy.comfy_types import IO, ComfyNodeABC, InputTypeDict | |
import comfy.clip_vision | |
import comfy.model_management | |
from comfy.cli_args import args | |
import importlib | |
import folder_paths | |
import latent_preview | |
import node_helpers | |
def before_node_execution(): | |
comfy.model_management.throw_exception_if_processing_interrupted() | |
def interrupt_processing(value=True): | |
comfy.model_management.interrupt_current_processing(value) | |
MAX_RESOLUTION=16384 | |
class CLIPTextEncode(ComfyNodeABC): | |
def INPUT_TYPES(s) -> InputTypeDict: | |
return { | |
"required": { | |
"text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}), | |
"clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."}) | |
} | |
} | |
RETURN_TYPES = (IO.CONDITIONING,) | |
OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",) | |
FUNCTION = "encode" | |
CATEGORY = "conditioning" | |
DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images." | |
def encode(self, clip, text): | |
tokens = clip.tokenize(text) | |
return (clip.encode_from_tokens_scheduled(tokens), ) | |
class ConditioningCombine: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "combine" | |
CATEGORY = "conditioning" | |
def combine(self, conditioning_1, conditioning_2): | |
return (conditioning_1 + conditioning_2, ) | |
class ConditioningAverage : | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ), | |
"conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}) | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "addWeighted" | |
CATEGORY = "conditioning" | |
def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): | |
out = [] | |
if len(conditioning_from) > 1: | |
logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") | |
cond_from = conditioning_from[0][0] | |
pooled_output_from = conditioning_from[0][1].get("pooled_output", None) | |
for i in range(len(conditioning_to)): | |
t1 = conditioning_to[i][0] | |
pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) | |
t0 = cond_from[:,:t1.shape[1]] | |
if t0.shape[1] < t1.shape[1]: | |
t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) | |
tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) | |
t_to = conditioning_to[i][1].copy() | |
if pooled_output_from is not None and pooled_output_to is not None: | |
t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) | |
elif pooled_output_from is not None: | |
t_to["pooled_output"] = pooled_output_from | |
n = [tw, t_to] | |
out.append(n) | |
return (out, ) | |
class ConditioningConcat: | |
def INPUT_TYPES(s): | |
return {"required": { | |
"conditioning_to": ("CONDITIONING",), | |
"conditioning_from": ("CONDITIONING",), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "concat" | |
CATEGORY = "conditioning" | |
def concat(self, conditioning_to, conditioning_from): | |
out = [] | |
if len(conditioning_from) > 1: | |
logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") | |
cond_from = conditioning_from[0][0] | |
for i in range(len(conditioning_to)): | |
t1 = conditioning_to[i][0] | |
tw = torch.cat((t1, cond_from),1) | |
n = [tw, conditioning_to[i][1].copy()] | |
out.append(n) | |
return (out, ) | |
class ConditioningSetArea: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), | |
"height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), | |
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "append" | |
CATEGORY = "conditioning" | |
def append(self, conditioning, width, height, x, y, strength): | |
c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8), | |
"strength": strength, | |
"set_area_to_bounds": False}) | |
return (c, ) | |
class ConditioningSetAreaPercentage: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), | |
"height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), | |
"x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), | |
"y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "append" | |
CATEGORY = "conditioning" | |
def append(self, conditioning, width, height, x, y, strength): | |
c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x), | |
"strength": strength, | |
"set_area_to_bounds": False}) | |
return (c, ) | |
class ConditioningSetAreaStrength: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "append" | |
CATEGORY = "conditioning" | |
def append(self, conditioning, strength): | |
c = node_helpers.conditioning_set_values(conditioning, {"strength": strength}) | |
return (c, ) | |
class ConditioningSetMask: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"mask": ("MASK", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), | |
"set_cond_area": (["default", "mask bounds"],), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "append" | |
CATEGORY = "conditioning" | |
def append(self, conditioning, mask, set_cond_area, strength): | |
set_area_to_bounds = False | |
if set_cond_area != "default": | |
set_area_to_bounds = True | |
if len(mask.shape) < 3: | |
mask = mask.unsqueeze(0) | |
c = node_helpers.conditioning_set_values(conditioning, {"mask": mask, | |
"set_area_to_bounds": set_area_to_bounds, | |
"mask_strength": strength}) | |
return (c, ) | |
class ConditioningZeroOut: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", )}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "zero_out" | |
CATEGORY = "advanced/conditioning" | |
def zero_out(self, conditioning): | |
c = [] | |
for t in conditioning: | |
d = t[1].copy() | |
pooled_output = d.get("pooled_output", None) | |
if pooled_output is not None: | |
d["pooled_output"] = torch.zeros_like(pooled_output) | |
n = [torch.zeros_like(t[0]), d] | |
c.append(n) | |
return (c, ) | |
class ConditioningSetTimestepRange: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), | |
"end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "set_range" | |
CATEGORY = "advanced/conditioning" | |
def set_range(self, conditioning, start, end): | |
c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start, | |
"end_percent": end}) | |
return (c, ) | |
class VAEDecode: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"samples": ("LATENT", {"tooltip": "The latent to be decoded."}), | |
"vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."}) | |
} | |
} | |
RETURN_TYPES = ("IMAGE",) | |
OUTPUT_TOOLTIPS = ("The decoded image.",) | |
FUNCTION = "decode" | |
CATEGORY = "latent" | |
DESCRIPTION = "Decodes latent images back into pixel space images." | |
def decode(self, vae, samples): | |
images = vae.decode(samples["samples"]) | |
if len(images.shape) == 5: #Combine batches | |
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) | |
return (images, ) | |
class VAEDecodeTiled: | |
def INPUT_TYPES(s): | |
return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), | |
"tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}), | |
"overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), | |
"temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}), | |
"temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), | |
}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "decode" | |
CATEGORY = "_for_testing" | |
def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8): | |
if tile_size < overlap * 4: | |
overlap = tile_size // 4 | |
if temporal_size < temporal_overlap * 2: | |
temporal_overlap = temporal_overlap // 2 | |
temporal_compression = vae.temporal_compression_decode() | |
if temporal_compression is not None: | |
temporal_size = max(2, temporal_size // temporal_compression) | |
temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression)) | |
else: | |
temporal_size = None | |
temporal_overlap = None | |
compression = vae.spacial_compression_decode() | |
images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap) | |
if len(images.shape) == 5: #Combine batches | |
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) | |
return (images, ) | |
class VAEEncode: | |
def INPUT_TYPES(s): | |
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "encode" | |
CATEGORY = "latent" | |
def encode(self, vae, pixels): | |
t = vae.encode(pixels[:,:,:,:3]) | |
return ({"samples":t}, ) | |
class VAEEncodeTiled: | |
def INPUT_TYPES(s): | |
return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), | |
"tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}), | |
"overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), | |
"temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}), | |
"temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "encode" | |
CATEGORY = "_for_testing" | |
def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8): | |
t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap) | |
return ({"samples": t}, ) | |
class VAEEncodeForInpaint: | |
def INPUT_TYPES(s): | |
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "encode" | |
CATEGORY = "latent/inpaint" | |
def encode(self, vae, pixels, mask, grow_mask_by=6): | |
x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio | |
y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio | |
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") | |
pixels = pixels.clone() | |
if pixels.shape[1] != x or pixels.shape[2] != y: | |
x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 | |
y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 | |
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] | |
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] | |
#grow mask by a few pixels to keep things seamless in latent space | |
if grow_mask_by == 0: | |
mask_erosion = mask | |
else: | |
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) | |
padding = math.ceil((grow_mask_by - 1) / 2) | |
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) | |
m = (1.0 - mask.round()).squeeze(1) | |
for i in range(3): | |
pixels[:,:,:,i] -= 0.5 | |
pixels[:,:,:,i] *= m | |
pixels[:,:,:,i] += 0.5 | |
t = vae.encode(pixels) | |
return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) | |
class InpaintModelConditioning: | |
def INPUT_TYPES(s): | |
return {"required": {"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"vae": ("VAE", ), | |
"pixels": ("IMAGE", ), | |
"mask": ("MASK", ), | |
"noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}), | |
}} | |
RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") | |
RETURN_NAMES = ("positive", "negative", "latent") | |
FUNCTION = "encode" | |
CATEGORY = "conditioning/inpaint" | |
def encode(self, positive, negative, pixels, vae, mask, noise_mask=True): | |
x = (pixels.shape[1] // 8) * 8 | |
y = (pixels.shape[2] // 8) * 8 | |
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") | |
orig_pixels = pixels | |
pixels = orig_pixels.clone() | |
if pixels.shape[1] != x or pixels.shape[2] != y: | |
x_offset = (pixels.shape[1] % 8) // 2 | |
y_offset = (pixels.shape[2] % 8) // 2 | |
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] | |
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] | |
m = (1.0 - mask.round()).squeeze(1) | |
for i in range(3): | |
pixels[:,:,:,i] -= 0.5 | |
pixels[:,:,:,i] *= m | |
pixels[:,:,:,i] += 0.5 | |
concat_latent = vae.encode(pixels) | |
orig_latent = vae.encode(orig_pixels) | |
out_latent = {} | |
out_latent["samples"] = orig_latent | |
if noise_mask: | |
out_latent["noise_mask"] = mask | |
out = [] | |
for conditioning in [positive, negative]: | |
c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent, | |
"concat_mask": mask}) | |
out.append(c) | |
return (out[0], out[1], out_latent) | |
class SaveLatent: | |
def __init__(self): | |
self.output_dir = folder_paths.get_output_directory() | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT", ), | |
"filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, | |
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, | |
} | |
RETURN_TYPES = () | |
FUNCTION = "save" | |
OUTPUT_NODE = True | |
CATEGORY = "_for_testing" | |
def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): | |
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) | |
# support save metadata for latent sharing | |
prompt_info = "" | |
if prompt is not None: | |
prompt_info = json.dumps(prompt) | |
metadata = None | |
if not args.disable_metadata: | |
metadata = {"prompt": prompt_info} | |
if extra_pnginfo is not None: | |
for x in extra_pnginfo: | |
metadata[x] = json.dumps(extra_pnginfo[x]) | |
file = f"{filename}_{counter:05}_.latent" | |
results = list() | |
results.append({ | |
"filename": file, | |
"subfolder": subfolder, | |
"type": "output" | |
}) | |
file = os.path.join(full_output_folder, file) | |
output = {} | |
output["latent_tensor"] = samples["samples"] | |
output["latent_format_version_0"] = torch.tensor([]) | |
comfy.utils.save_torch_file(output, file, metadata=metadata) | |
return { "ui": { "latents": results } } | |
class LoadLatent: | |
def INPUT_TYPES(s): | |
input_dir = folder_paths.get_input_directory() | |
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] | |
return {"required": {"latent": [sorted(files), ]}, } | |
CATEGORY = "_for_testing" | |
RETURN_TYPES = ("LATENT", ) | |
FUNCTION = "load" | |
def load(self, latent): | |
latent_path = folder_paths.get_annotated_filepath(latent) | |
latent = safetensors.torch.load_file(latent_path, device="cpu") | |
multiplier = 1.0 | |
if "latent_format_version_0" not in latent: | |
multiplier = 1.0 / 0.18215 | |
samples = {"samples": latent["latent_tensor"].float() * multiplier} | |
return (samples, ) | |
def IS_CHANGED(s, latent): | |
image_path = folder_paths.get_annotated_filepath(latent) | |
m = hashlib.sha256() | |
with open(image_path, 'rb') as f: | |
m.update(f.read()) | |
return m.digest().hex() | |
def VALIDATE_INPUTS(s, latent): | |
if not folder_paths.exists_annotated_filepath(latent): | |
return "Invalid latent file: {}".format(latent) | |
return True | |
class CheckpointLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), | |
"ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} | |
RETURN_TYPES = ("MODEL", "CLIP", "VAE") | |
FUNCTION = "load_checkpoint" | |
CATEGORY = "advanced/loaders" | |
DEPRECATED = True | |
def load_checkpoint(self, config_name, ckpt_name): | |
config_path = folder_paths.get_full_path("configs", config_name) | |
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) | |
return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) | |
class CheckpointLoaderSimple: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}), | |
} | |
} | |
RETURN_TYPES = ("MODEL", "CLIP", "VAE") | |
OUTPUT_TOOLTIPS = ("The model used for denoising latents.", | |
"The CLIP model used for encoding text prompts.", | |
"The VAE model used for encoding and decoding images to and from latent space.") | |
FUNCTION = "load_checkpoint" | |
CATEGORY = "loaders" | |
DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents." | |
def load_checkpoint(self, ckpt_name): | |
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) | |
out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) | |
return out[:3] | |
class DiffusersLoader: | |
def INPUT_TYPES(cls): | |
paths = [] | |
for search_path in folder_paths.get_folder_paths("diffusers"): | |
if os.path.exists(search_path): | |
for root, subdir, files in os.walk(search_path, followlinks=True): | |
if "model_index.json" in files: | |
paths.append(os.path.relpath(root, start=search_path)) | |
return {"required": {"model_path": (paths,), }} | |
RETURN_TYPES = ("MODEL", "CLIP", "VAE") | |
FUNCTION = "load_checkpoint" | |
CATEGORY = "advanced/loaders/deprecated" | |
def load_checkpoint(self, model_path, output_vae=True, output_clip=True): | |
for search_path in folder_paths.get_folder_paths("diffusers"): | |
if os.path.exists(search_path): | |
path = os.path.join(search_path, model_path) | |
if os.path.exists(path): | |
model_path = path | |
break | |
return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) | |
class unCLIPCheckpointLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), | |
}} | |
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") | |
FUNCTION = "load_checkpoint" | |
CATEGORY = "loaders" | |
def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): | |
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) | |
out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) | |
return out | |
class CLIPSetLastLayer: | |
def INPUT_TYPES(s): | |
return {"required": { "clip": ("CLIP", ), | |
"stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), | |
}} | |
RETURN_TYPES = ("CLIP",) | |
FUNCTION = "set_last_layer" | |
CATEGORY = "conditioning" | |
def set_last_layer(self, clip, stop_at_clip_layer): | |
clip = clip.clone() | |
clip.clip_layer(stop_at_clip_layer) | |
return (clip,) | |
class LoraLoader: | |
def __init__(self): | |
self.loaded_lora = None | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}), | |
"clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}), | |
"lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}), | |
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}), | |
"strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}), | |
} | |
} | |
RETURN_TYPES = ("MODEL", "CLIP") | |
OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.") | |
FUNCTION = "load_lora" | |
CATEGORY = "loaders" | |
DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together." | |
def load_lora(self, model, clip, lora_name, strength_model, strength_clip): | |
if strength_model == 0 and strength_clip == 0: | |
return (model, clip) | |
lora_path = folder_paths.get_full_path_or_raise("loras", lora_name) | |
lora = None | |
if self.loaded_lora is not None: | |
if self.loaded_lora[0] == lora_path: | |
lora = self.loaded_lora[1] | |
else: | |
self.loaded_lora = None | |
if lora is None: | |
lora = comfy.utils.load_torch_file(lora_path, safe_load=True) | |
self.loaded_lora = (lora_path, lora) | |
model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) | |
return (model_lora, clip_lora) | |
class LoraLoaderModelOnly(LoraLoader): | |
def INPUT_TYPES(s): | |
return {"required": { "model": ("MODEL",), | |
"lora_name": (folder_paths.get_filename_list("loras"), ), | |
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), | |
}} | |
RETURN_TYPES = ("MODEL",) | |
FUNCTION = "load_lora_model_only" | |
def load_lora_model_only(self, model, lora_name, strength_model): | |
return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) | |
class VAELoader: | |
def vae_list(): | |
vaes = folder_paths.get_filename_list("vae") | |
approx_vaes = folder_paths.get_filename_list("vae_approx") | |
sdxl_taesd_enc = False | |
sdxl_taesd_dec = False | |
sd1_taesd_enc = False | |
sd1_taesd_dec = False | |
sd3_taesd_enc = False | |
sd3_taesd_dec = False | |
f1_taesd_enc = False | |
f1_taesd_dec = False | |
for v in approx_vaes: | |
if v.startswith("taesd_decoder."): | |
sd1_taesd_dec = True | |
elif v.startswith("taesd_encoder."): | |
sd1_taesd_enc = True | |
elif v.startswith("taesdxl_decoder."): | |
sdxl_taesd_dec = True | |
elif v.startswith("taesdxl_encoder."): | |
sdxl_taesd_enc = True | |
elif v.startswith("taesd3_decoder."): | |
sd3_taesd_dec = True | |
elif v.startswith("taesd3_encoder."): | |
sd3_taesd_enc = True | |
elif v.startswith("taef1_encoder."): | |
f1_taesd_dec = True | |
elif v.startswith("taef1_decoder."): | |
f1_taesd_enc = True | |
if sd1_taesd_dec and sd1_taesd_enc: | |
vaes.append("taesd") | |
if sdxl_taesd_dec and sdxl_taesd_enc: | |
vaes.append("taesdxl") | |
if sd3_taesd_dec and sd3_taesd_enc: | |
vaes.append("taesd3") | |
if f1_taesd_dec and f1_taesd_enc: | |
vaes.append("taef1") | |
return vaes | |
def load_taesd(name): | |
sd = {} | |
approx_vaes = folder_paths.get_filename_list("vae_approx") | |
encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) | |
decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) | |
enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder)) | |
for k in enc: | |
sd["taesd_encoder.{}".format(k)] = enc[k] | |
dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder)) | |
for k in dec: | |
sd["taesd_decoder.{}".format(k)] = dec[k] | |
if name == "taesd": | |
sd["vae_scale"] = torch.tensor(0.18215) | |
sd["vae_shift"] = torch.tensor(0.0) | |
elif name == "taesdxl": | |
sd["vae_scale"] = torch.tensor(0.13025) | |
sd["vae_shift"] = torch.tensor(0.0) | |
elif name == "taesd3": | |
sd["vae_scale"] = torch.tensor(1.5305) | |
sd["vae_shift"] = torch.tensor(0.0609) | |
elif name == "taef1": | |
sd["vae_scale"] = torch.tensor(0.3611) | |
sd["vae_shift"] = torch.tensor(0.1159) | |
return sd | |
def INPUT_TYPES(s): | |
return {"required": { "vae_name": (s.vae_list(), )}} | |
RETURN_TYPES = ("VAE",) | |
FUNCTION = "load_vae" | |
CATEGORY = "loaders" | |
#TODO: scale factor? | |
def load_vae(self, vae_name): | |
if vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]: | |
sd = self.load_taesd(vae_name) | |
else: | |
vae_path = folder_paths.get_full_path_or_raise("vae", vae_name) | |
sd = comfy.utils.load_torch_file(vae_path) | |
vae = comfy.sd.VAE(sd=sd) | |
return (vae,) | |
class ControlNetLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} | |
RETURN_TYPES = ("CONTROL_NET",) | |
FUNCTION = "load_controlnet" | |
CATEGORY = "loaders" | |
def load_controlnet(self, control_net_name): | |
controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) | |
controlnet = comfy.controlnet.load_controlnet(controlnet_path) | |
return (controlnet,) | |
class DiffControlNetLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "model": ("MODEL",), | |
"control_net_name": (folder_paths.get_filename_list("controlnet"), )}} | |
RETURN_TYPES = ("CONTROL_NET",) | |
FUNCTION = "load_controlnet" | |
CATEGORY = "loaders" | |
def load_controlnet(self, model, control_net_name): | |
controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) | |
controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) | |
return (controlnet,) | |
class ControlNetApply: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"control_net": ("CONTROL_NET", ), | |
"image": ("IMAGE", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "apply_controlnet" | |
DEPRECATED = True | |
CATEGORY = "conditioning/controlnet" | |
def apply_controlnet(self, conditioning, control_net, image, strength): | |
if strength == 0: | |
return (conditioning, ) | |
c = [] | |
control_hint = image.movedim(-1,1) | |
for t in conditioning: | |
n = [t[0], t[1].copy()] | |
c_net = control_net.copy().set_cond_hint(control_hint, strength) | |
if 'control' in t[1]: | |
c_net.set_previous_controlnet(t[1]['control']) | |
n[1]['control'] = c_net | |
n[1]['control_apply_to_uncond'] = True | |
c.append(n) | |
return (c, ) | |
class ControlNetApplyAdvanced: | |
def INPUT_TYPES(s): | |
return {"required": {"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"control_net": ("CONTROL_NET", ), | |
"image": ("IMAGE", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), | |
"start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), | |
"end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) | |
}, | |
"optional": {"vae": ("VAE", ), | |
} | |
} | |
RETURN_TYPES = ("CONDITIONING","CONDITIONING") | |
RETURN_NAMES = ("positive", "negative") | |
FUNCTION = "apply_controlnet" | |
CATEGORY = "conditioning/controlnet" | |
def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]): | |
if strength == 0: | |
return (positive, negative) | |
control_hint = image.movedim(-1,1) | |
cnets = {} | |
out = [] | |
for conditioning in [positive, negative]: | |
c = [] | |
for t in conditioning: | |
d = t[1].copy() | |
prev_cnet = d.get('control', None) | |
if prev_cnet in cnets: | |
c_net = cnets[prev_cnet] | |
else: | |
c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat) | |
c_net.set_previous_controlnet(prev_cnet) | |
cnets[prev_cnet] = c_net | |
d['control'] = c_net | |
d['control_apply_to_uncond'] = False | |
n = [t[0], d] | |
c.append(n) | |
out.append(c) | |
return (out[0], out[1]) | |
class UNETLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ), | |
"weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],) | |
}} | |
RETURN_TYPES = ("MODEL",) | |
FUNCTION = "load_unet" | |
CATEGORY = "advanced/loaders" | |
def load_unet(self, unet_name, weight_dtype): | |
model_options = {} | |
if weight_dtype == "fp8_e4m3fn": | |
model_options["dtype"] = torch.float8_e4m3fn | |
elif weight_dtype == "fp8_e4m3fn_fast": | |
model_options["dtype"] = torch.float8_e4m3fn | |
model_options["fp8_optimizations"] = True | |
elif weight_dtype == "fp8_e5m2": | |
model_options["dtype"] = torch.float8_e5m2 | |
unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name) | |
model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options) | |
return (model,) | |
class CLIPLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ), | |
"type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos"], ), | |
}, | |
"optional": { | |
"device": (["default", "cpu"], {"advanced": True}), | |
}} | |
RETURN_TYPES = ("CLIP",) | |
FUNCTION = "load_clip" | |
CATEGORY = "advanced/loaders" | |
DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 / clip-g / clip-l\nstable_audio: t5\nmochi: t5\ncosmos: old t5 xxl" | |
def load_clip(self, clip_name, type="stable_diffusion", device="default"): | |
if type == "stable_cascade": | |
clip_type = comfy.sd.CLIPType.STABLE_CASCADE | |
elif type == "sd3": | |
clip_type = comfy.sd.CLIPType.SD3 | |
elif type == "stable_audio": | |
clip_type = comfy.sd.CLIPType.STABLE_AUDIO | |
elif type == "mochi": | |
clip_type = comfy.sd.CLIPType.MOCHI | |
elif type == "ltxv": | |
clip_type = comfy.sd.CLIPType.LTXV | |
elif type == "pixart": | |
clip_type = comfy.sd.CLIPType.PIXART | |
else: | |
clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION | |
model_options = {} | |
if device == "cpu": | |
model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") | |
clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name) | |
clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) | |
return (clip,) | |
class DualCLIPLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ), | |
"clip_name2": (folder_paths.get_filename_list("text_encoders"), ), | |
"type": (["sdxl", "sd3", "flux", "hunyuan_video"], ), | |
}, | |
"optional": { | |
"device": (["default", "cpu"], {"advanced": True}), | |
}} | |
RETURN_TYPES = ("CLIP",) | |
FUNCTION = "load_clip" | |
CATEGORY = "advanced/loaders" | |
DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5" | |
def load_clip(self, clip_name1, clip_name2, type, device="default"): | |
clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1) | |
clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2) | |
if type == "sdxl": | |
clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION | |
elif type == "sd3": | |
clip_type = comfy.sd.CLIPType.SD3 | |
elif type == "flux": | |
clip_type = comfy.sd.CLIPType.FLUX | |
elif type == "hunyuan_video": | |
clip_type = comfy.sd.CLIPType.HUNYUAN_VIDEO | |
model_options = {} | |
if device == "cpu": | |
model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") | |
clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) | |
return (clip,) | |
class CLIPVisionLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ), | |
}} | |
RETURN_TYPES = ("CLIP_VISION",) | |
FUNCTION = "load_clip" | |
CATEGORY = "loaders" | |
def load_clip(self, clip_name): | |
clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name) | |
clip_vision = comfy.clip_vision.load(clip_path) | |
return (clip_vision,) | |
class CLIPVisionEncode: | |
def INPUT_TYPES(s): | |
return {"required": { "clip_vision": ("CLIP_VISION",), | |
"image": ("IMAGE",), | |
"crop": (["center", "none"],) | |
}} | |
RETURN_TYPES = ("CLIP_VISION_OUTPUT",) | |
FUNCTION = "encode" | |
CATEGORY = "conditioning" | |
def encode(self, clip_vision, image, crop): | |
crop_image = True | |
if crop != "center": | |
crop_image = False | |
output = clip_vision.encode_image(image, crop=crop_image) | |
return (output,) | |
class StyleModelLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}} | |
RETURN_TYPES = ("STYLE_MODEL",) | |
FUNCTION = "load_style_model" | |
CATEGORY = "loaders" | |
def load_style_model(self, style_model_name): | |
style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name) | |
style_model = comfy.sd.load_style_model(style_model_path) | |
return (style_model,) | |
class StyleModelApply: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"style_model": ("STYLE_MODEL", ), | |
"clip_vision_output": ("CLIP_VISION_OUTPUT", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}), | |
"strength_type": (["multiply", "attn_bias"], ), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "apply_stylemodel" | |
CATEGORY = "conditioning/style_model" | |
def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength, strength_type): | |
cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) | |
if strength_type == "multiply": | |
cond *= strength | |
n = cond.shape[1] | |
c_out = [] | |
for t in conditioning: | |
(txt, keys) = t | |
keys = keys.copy() | |
if strength_type == "attn_bias" and strength != 1.0: | |
# math.log raises an error if the argument is zero | |
# torch.log returns -inf, which is what we want | |
attn_bias = torch.log(torch.Tensor([strength])) | |
# get the size of the mask image | |
mask_ref_size = keys.get("attention_mask_img_shape", (1, 1)) | |
n_ref = mask_ref_size[0] * mask_ref_size[1] | |
n_txt = txt.shape[1] | |
# grab the existing mask | |
mask = keys.get("attention_mask", None) | |
# create a default mask if it doesn't exist | |
if mask is None: | |
mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16) | |
# convert the mask dtype, because it might be boolean | |
# we want it to be interpreted as a bias | |
if mask.dtype == torch.bool: | |
# log(True) = log(1) = 0 | |
# log(False) = log(0) = -inf | |
mask = torch.log(mask.to(dtype=torch.float16)) | |
# now we make the mask bigger to add space for our new tokens | |
new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16) | |
# copy over the old mask, in quandrants | |
new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt] | |
new_mask[:, :n_txt, n_txt+n:] = mask[:, :n_txt, n_txt:] | |
new_mask[:, n_txt+n:, :n_txt] = mask[:, n_txt:, :n_txt] | |
new_mask[:, n_txt+n:, n_txt+n:] = mask[:, n_txt:, n_txt:] | |
# now fill in the attention bias to our redux tokens | |
new_mask[:, :n_txt, n_txt:n_txt+n] = attn_bias | |
new_mask[:, n_txt+n:, n_txt:n_txt+n] = attn_bias | |
keys["attention_mask"] = new_mask.to(txt.device) | |
keys["attention_mask_img_shape"] = mask_ref_size | |
c_out.append([torch.cat((txt, cond), dim=1), keys]) | |
return (c_out,) | |
class unCLIPConditioning: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning": ("CONDITIONING", ), | |
"clip_vision_output": ("CLIP_VISION_OUTPUT", ), | |
"strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), | |
"noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "apply_adm" | |
CATEGORY = "conditioning" | |
def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): | |
if strength == 0: | |
return (conditioning, ) | |
c = [] | |
for t in conditioning: | |
o = t[1].copy() | |
x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation} | |
if "unclip_conditioning" in o: | |
o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x] | |
else: | |
o["unclip_conditioning"] = [x] | |
n = [t[0], o] | |
c.append(n) | |
return (c, ) | |
class GLIGENLoader: | |
def INPUT_TYPES(s): | |
return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}} | |
RETURN_TYPES = ("GLIGEN",) | |
FUNCTION = "load_gligen" | |
CATEGORY = "loaders" | |
def load_gligen(self, gligen_name): | |
gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name) | |
gligen = comfy.sd.load_gligen(gligen_path) | |
return (gligen,) | |
class GLIGENTextBoxApply: | |
def INPUT_TYPES(s): | |
return {"required": {"conditioning_to": ("CONDITIONING", ), | |
"clip": ("CLIP", ), | |
"gligen_textbox_model": ("GLIGEN", ), | |
"text": ("STRING", {"multiline": True, "dynamicPrompts": True}), | |
"width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), | |
"height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), | |
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
}} | |
RETURN_TYPES = ("CONDITIONING",) | |
FUNCTION = "append" | |
CATEGORY = "conditioning/gligen" | |
def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): | |
c = [] | |
cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected") | |
for t in conditioning_to: | |
n = [t[0], t[1].copy()] | |
position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] | |
prev = [] | |
if "gligen" in n[1]: | |
prev = n[1]['gligen'][2] | |
n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) | |
c.append(n) | |
return (c, ) | |
class EmptyLatentImage: | |
def __init__(self): | |
self.device = comfy.model_management.intermediate_device() | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}), | |
"height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}), | |
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."}) | |
} | |
} | |
RETURN_TYPES = ("LATENT",) | |
OUTPUT_TOOLTIPS = ("The empty latent image batch.",) | |
FUNCTION = "generate" | |
CATEGORY = "latent" | |
DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling." | |
def generate(self, width, height, batch_size=1): | |
latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) | |
return ({"samples":latent}, ) | |
class LatentFromBatch: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), | |
"length": ("INT", {"default": 1, "min": 1, "max": 64}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "frombatch" | |
CATEGORY = "latent/batch" | |
def frombatch(self, samples, batch_index, length): | |
s = samples.copy() | |
s_in = samples["samples"] | |
batch_index = min(s_in.shape[0] - 1, batch_index) | |
length = min(s_in.shape[0] - batch_index, length) | |
s["samples"] = s_in[batch_index:batch_index + length].clone() | |
if "noise_mask" in samples: | |
masks = samples["noise_mask"] | |
if masks.shape[0] == 1: | |
s["noise_mask"] = masks.clone() | |
else: | |
if masks.shape[0] < s_in.shape[0]: | |
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] | |
s["noise_mask"] = masks[batch_index:batch_index + length].clone() | |
if "batch_index" not in s: | |
s["batch_index"] = [x for x in range(batch_index, batch_index+length)] | |
else: | |
s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] | |
return (s,) | |
class RepeatLatentBatch: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"amount": ("INT", {"default": 1, "min": 1, "max": 64}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "repeat" | |
CATEGORY = "latent/batch" | |
def repeat(self, samples, amount): | |
s = samples.copy() | |
s_in = samples["samples"] | |
s["samples"] = s_in.repeat((amount, 1,1,1)) | |
if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: | |
masks = samples["noise_mask"] | |
if masks.shape[0] < s_in.shape[0]: | |
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] | |
s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1)) | |
if "batch_index" in s: | |
offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 | |
s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] | |
return (s,) | |
class LatentUpscale: | |
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] | |
crop_methods = ["disabled", "center"] | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), | |
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"crop": (s.crop_methods,)}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "upscale" | |
CATEGORY = "latent" | |
def upscale(self, samples, upscale_method, width, height, crop): | |
if width == 0 and height == 0: | |
s = samples | |
else: | |
s = samples.copy() | |
if width == 0: | |
height = max(64, height) | |
width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2])) | |
elif height == 0: | |
width = max(64, width) | |
height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1])) | |
else: | |
width = max(64, width) | |
height = max(64, height) | |
s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop) | |
return (s,) | |
class LatentUpscaleBy: | |
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), | |
"scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "upscale" | |
CATEGORY = "latent" | |
def upscale(self, samples, upscale_method, scale_by): | |
s = samples.copy() | |
width = round(samples["samples"].shape[-1] * scale_by) | |
height = round(samples["samples"].shape[-2] * scale_by) | |
s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") | |
return (s,) | |
class LatentRotate: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "rotate" | |
CATEGORY = "latent/transform" | |
def rotate(self, samples, rotation): | |
s = samples.copy() | |
rotate_by = 0 | |
if rotation.startswith("90"): | |
rotate_by = 1 | |
elif rotation.startswith("180"): | |
rotate_by = 2 | |
elif rotation.startswith("270"): | |
rotate_by = 3 | |
s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) | |
return (s,) | |
class LatentFlip: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"flip_method": (["x-axis: vertically", "y-axis: horizontally"],), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "flip" | |
CATEGORY = "latent/transform" | |
def flip(self, samples, flip_method): | |
s = samples.copy() | |
if flip_method.startswith("x"): | |
s["samples"] = torch.flip(samples["samples"], dims=[2]) | |
elif flip_method.startswith("y"): | |
s["samples"] = torch.flip(samples["samples"], dims=[3]) | |
return (s,) | |
class LatentComposite: | |
def INPUT_TYPES(s): | |
return {"required": { "samples_to": ("LATENT",), | |
"samples_from": ("LATENT",), | |
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "composite" | |
CATEGORY = "latent" | |
def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): | |
x = x // 8 | |
y = y // 8 | |
feather = feather // 8 | |
samples_out = samples_to.copy() | |
s = samples_to["samples"].clone() | |
samples_to = samples_to["samples"] | |
samples_from = samples_from["samples"] | |
if feather == 0: | |
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] | |
else: | |
samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] | |
mask = torch.ones_like(samples_from) | |
for t in range(feather): | |
if y != 0: | |
mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) | |
if y + samples_from.shape[2] < samples_to.shape[2]: | |
mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) | |
if x != 0: | |
mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) | |
if x + samples_from.shape[3] < samples_to.shape[3]: | |
mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) | |
rev_mask = torch.ones_like(mask) - mask | |
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask | |
samples_out["samples"] = s | |
return (samples_out,) | |
class LatentBlend: | |
def INPUT_TYPES(s): | |
return {"required": { | |
"samples1": ("LATENT",), | |
"samples2": ("LATENT",), | |
"blend_factor": ("FLOAT", { | |
"default": 0.5, | |
"min": 0, | |
"max": 1, | |
"step": 0.01 | |
}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "blend" | |
CATEGORY = "_for_testing" | |
def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): | |
samples_out = samples1.copy() | |
samples1 = samples1["samples"] | |
samples2 = samples2["samples"] | |
if samples1.shape != samples2.shape: | |
samples2.permute(0, 3, 1, 2) | |
samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') | |
samples2.permute(0, 2, 3, 1) | |
samples_blended = self.blend_mode(samples1, samples2, blend_mode) | |
samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) | |
samples_out["samples"] = samples_blended | |
return (samples_out,) | |
def blend_mode(self, img1, img2, mode): | |
if mode == "normal": | |
return img2 | |
else: | |
raise ValueError(f"Unsupported blend mode: {mode}") | |
class LatentCrop: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), | |
"height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), | |
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "crop" | |
CATEGORY = "latent/transform" | |
def crop(self, samples, width, height, x, y): | |
s = samples.copy() | |
samples = samples['samples'] | |
x = x // 8 | |
y = y // 8 | |
#enfonce minimum size of 64 | |
if x > (samples.shape[3] - 8): | |
x = samples.shape[3] - 8 | |
if y > (samples.shape[2] - 8): | |
y = samples.shape[2] - 8 | |
new_height = height // 8 | |
new_width = width // 8 | |
to_x = new_width + x | |
to_y = new_height + y | |
s['samples'] = samples[:,:,y:to_y, x:to_x] | |
return (s,) | |
class SetLatentNoiseMask: | |
def INPUT_TYPES(s): | |
return {"required": { "samples": ("LATENT",), | |
"mask": ("MASK",), | |
}} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "set_mask" | |
CATEGORY = "latent/inpaint" | |
def set_mask(self, samples, mask): | |
s = samples.copy() | |
s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) | |
return (s,) | |
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): | |
latent_image = latent["samples"] | |
latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) | |
if disable_noise: | |
noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") | |
else: | |
batch_inds = latent["batch_index"] if "batch_index" in latent else None | |
noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) | |
noise_mask = None | |
if "noise_mask" in latent: | |
noise_mask = latent["noise_mask"] | |
callback = latent_preview.prepare_callback(model, steps) | |
disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED | |
samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, | |
denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, | |
force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) | |
out = latent.copy() | |
out["samples"] = samples | |
return (out, ) | |
class KSampler: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}), | |
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "The random seed used for creating the noise."}), | |
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}), | |
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt however too high values will negatively impact quality."}), | |
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}), | |
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}), | |
"positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}), | |
"negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}), | |
"latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}), | |
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied, lower values will maintain the structure of the initial image allowing for image to image sampling."}), | |
} | |
} | |
RETURN_TYPES = ("LATENT",) | |
OUTPUT_TOOLTIPS = ("The denoised latent.",) | |
FUNCTION = "sample" | |
CATEGORY = "sampling" | |
DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image." | |
def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): | |
return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) | |
class KSamplerAdvanced: | |
def INPUT_TYPES(s): | |
return {"required": | |
{"model": ("MODEL",), | |
"add_noise": (["enable", "disable"], ), | |
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), | |
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}), | |
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), | |
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), | |
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), | |
"positive": ("CONDITIONING", ), | |
"negative": ("CONDITIONING", ), | |
"latent_image": ("LATENT", ), | |
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), | |
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), | |
"return_with_leftover_noise": (["disable", "enable"], ), | |
} | |
} | |
RETURN_TYPES = ("LATENT",) | |
FUNCTION = "sample" | |
CATEGORY = "sampling" | |
def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): | |
force_full_denoise = True | |
if return_with_leftover_noise == "enable": | |
force_full_denoise = False | |
disable_noise = False | |
if add_noise == "disable": | |
disable_noise = True | |
return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) | |
class SaveImage: | |
def __init__(self): | |
self.output_dir = folder_paths.get_output_directory() | |
self.type = "output" | |
self.prefix_append = "" | |
self.compress_level = 4 | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"images": ("IMAGE", {"tooltip": "The images to save."}), | |
"filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."}) | |
}, | |
"hidden": { | |
"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO" | |
}, | |
} | |
RETURN_TYPES = () | |
FUNCTION = "save_images" | |
OUTPUT_NODE = True | |
CATEGORY = "image" | |
DESCRIPTION = "Saves the input images to your ComfyUI output directory." | |
def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): | |
filename_prefix += self.prefix_append | |
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) | |
results = list() | |
for (batch_number, image) in enumerate(images): | |
i = 255. * image.cpu().numpy() | |
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) | |
metadata = None | |
if not args.disable_metadata: | |
metadata = PngInfo() | |
if prompt is not None: | |
metadata.add_text("prompt", json.dumps(prompt)) | |
if extra_pnginfo is not None: | |
for x in extra_pnginfo: | |
metadata.add_text(x, json.dumps(extra_pnginfo[x])) | |
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) | |
file = f"{filename_with_batch_num}_{counter:05}_.png" | |
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level) | |
results.append({ | |
"filename": file, | |
"subfolder": subfolder, | |
"type": self.type | |
}) | |
counter += 1 | |
return { "ui": { "images": results } } | |
class PreviewImage(SaveImage): | |
def __init__(self): | |
self.output_dir = folder_paths.get_temp_directory() | |
self.type = "temp" | |
self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) | |
self.compress_level = 1 | |
def INPUT_TYPES(s): | |
return {"required": | |
{"images": ("IMAGE", ), }, | |
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, | |
} | |
class LoadImage: | |
def INPUT_TYPES(s): | |
input_dir = folder_paths.get_input_directory() | |
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] | |
return {"required": | |
{"image": (sorted(files), {"image_upload": True})}, | |
} | |
CATEGORY = "image" | |
RETURN_TYPES = ("IMAGE", "MASK") | |
FUNCTION = "load_image" | |
def load_image(self, image): | |
image_path = folder_paths.get_annotated_filepath(image) | |
img = node_helpers.pillow(Image.open, image_path) | |
output_images = [] | |
output_masks = [] | |
w, h = None, None | |
excluded_formats = ['MPO'] | |
for i in ImageSequence.Iterator(img): | |
i = node_helpers.pillow(ImageOps.exif_transpose, i) | |
if i.mode == 'I': | |
i = i.point(lambda i: i * (1 / 255)) | |
image = i.convert("RGB") | |
if len(output_images) == 0: | |
w = image.size[0] | |
h = image.size[1] | |
if image.size[0] != w or image.size[1] != h: | |
continue | |
image = np.array(image).astype(np.float32) / 255.0 | |
image = torch.from_numpy(image)[None,] | |
if 'A' in i.getbands(): | |
mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 | |
mask = 1. - torch.from_numpy(mask) | |
else: | |
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") | |
output_images.append(image) | |
output_masks.append(mask.unsqueeze(0)) | |
if len(output_images) > 1 and img.format not in excluded_formats: | |
output_image = torch.cat(output_images, dim=0) | |
output_mask = torch.cat(output_masks, dim=0) | |
else: | |
output_image = output_images[0] | |
output_mask = output_masks[0] | |
return (output_image, output_mask) | |
def IS_CHANGED(s, image): | |
image_path = folder_paths.get_annotated_filepath(image) | |
m = hashlib.sha256() | |
with open(image_path, 'rb') as f: | |
m.update(f.read()) | |
return m.digest().hex() | |
def VALIDATE_INPUTS(s, image): | |
if not folder_paths.exists_annotated_filepath(image): | |
return "Invalid image file: {}".format(image) | |
return True | |
class LoadImageMask: | |
_color_channels = ["alpha", "red", "green", "blue"] | |
def INPUT_TYPES(s): | |
input_dir = folder_paths.get_input_directory() | |
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] | |
return {"required": | |
{"image": (sorted(files), {"image_upload": True}), | |
"channel": (s._color_channels, ), } | |
} | |
CATEGORY = "mask" | |
RETURN_TYPES = ("MASK",) | |
FUNCTION = "load_image" | |
def load_image(self, image, channel): | |
image_path = folder_paths.get_annotated_filepath(image) | |
i = node_helpers.pillow(Image.open, image_path) | |
i = node_helpers.pillow(ImageOps.exif_transpose, i) | |
if i.getbands() != ("R", "G", "B", "A"): | |
if i.mode == 'I': | |
i = i.point(lambda i: i * (1 / 255)) | |
i = i.convert("RGBA") | |
mask = None | |
c = channel[0].upper() | |
if c in i.getbands(): | |
mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 | |
mask = torch.from_numpy(mask) | |
if c == 'A': | |
mask = 1. - mask | |
else: | |
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") | |
return (mask.unsqueeze(0),) | |
def IS_CHANGED(s, image, channel): | |
image_path = folder_paths.get_annotated_filepath(image) | |
m = hashlib.sha256() | |
with open(image_path, 'rb') as f: | |
m.update(f.read()) | |
return m.digest().hex() | |
def VALIDATE_INPUTS(s, image): | |
if not folder_paths.exists_annotated_filepath(image): | |
return "Invalid image file: {}".format(image) | |
return True | |
class ImageScale: | |
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] | |
crop_methods = ["disabled", "center"] | |
def INPUT_TYPES(s): | |
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), | |
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), | |
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), | |
"crop": (s.crop_methods,)}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "upscale" | |
CATEGORY = "image/upscaling" | |
def upscale(self, image, upscale_method, width, height, crop): | |
if width == 0 and height == 0: | |
s = image | |
else: | |
samples = image.movedim(-1,1) | |
if width == 0: | |
width = max(1, round(samples.shape[3] * height / samples.shape[2])) | |
elif height == 0: | |
height = max(1, round(samples.shape[2] * width / samples.shape[3])) | |
s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) | |
s = s.movedim(1,-1) | |
return (s,) | |
class ImageScaleBy: | |
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] | |
def INPUT_TYPES(s): | |
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), | |
"scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "upscale" | |
CATEGORY = "image/upscaling" | |
def upscale(self, image, upscale_method, scale_by): | |
samples = image.movedim(-1,1) | |
width = round(samples.shape[3] * scale_by) | |
height = round(samples.shape[2] * scale_by) | |
s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") | |
s = s.movedim(1,-1) | |
return (s,) | |
class ImageInvert: | |
def INPUT_TYPES(s): | |
return {"required": { "image": ("IMAGE",)}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "invert" | |
CATEGORY = "image" | |
def invert(self, image): | |
s = 1.0 - image | |
return (s,) | |
class ImageBatch: | |
def INPUT_TYPES(s): | |
return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "batch" | |
CATEGORY = "image" | |
def batch(self, image1, image2): | |
if image1.shape[1:] != image2.shape[1:]: | |
image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) | |
s = torch.cat((image1, image2), dim=0) | |
return (s,) | |
class EmptyImage: | |
def __init__(self, device="cpu"): | |
self.device = device | |
def INPUT_TYPES(s): | |
return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), | |
"height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), | |
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), | |
"color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), | |
}} | |
RETURN_TYPES = ("IMAGE",) | |
FUNCTION = "generate" | |
CATEGORY = "image" | |
def generate(self, width, height, batch_size=1, color=0): | |
r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) | |
g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) | |
b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) | |
return (torch.cat((r, g, b), dim=-1), ) | |
class ImagePadForOutpaint: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"image": ("IMAGE",), | |
"left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), | |
"feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), | |
} | |
} | |
RETURN_TYPES = ("IMAGE", "MASK") | |
FUNCTION = "expand_image" | |
CATEGORY = "image" | |
def expand_image(self, image, left, top, right, bottom, feathering): | |
d1, d2, d3, d4 = image.size() | |
new_image = torch.ones( | |
(d1, d2 + top + bottom, d3 + left + right, d4), | |
dtype=torch.float32, | |
) * 0.5 | |
new_image[:, top:top + d2, left:left + d3, :] = image | |
mask = torch.ones( | |
(d2 + top + bottom, d3 + left + right), | |
dtype=torch.float32, | |
) | |
t = torch.zeros( | |
(d2, d3), | |
dtype=torch.float32 | |
) | |
if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: | |
for i in range(d2): | |
for j in range(d3): | |
dt = i if top != 0 else d2 | |
db = d2 - i if bottom != 0 else d2 | |
dl = j if left != 0 else d3 | |
dr = d3 - j if right != 0 else d3 | |
d = min(dt, db, dl, dr) | |
if d >= feathering: | |
continue | |
v = (feathering - d) / feathering | |
t[i, j] = v * v | |
mask[top:top + d2, left:left + d3] = t | |
return (new_image, mask) | |
NODE_CLASS_MAPPINGS = { | |
"KSampler": KSampler, | |
"CheckpointLoaderSimple": CheckpointLoaderSimple, | |
"CLIPTextEncode": CLIPTextEncode, | |
"CLIPSetLastLayer": CLIPSetLastLayer, | |
"VAEDecode": VAEDecode, | |
"VAEEncode": VAEEncode, | |
"VAEEncodeForInpaint": VAEEncodeForInpaint, | |
"VAELoader": VAELoader, | |
"EmptyLatentImage": EmptyLatentImage, | |
"LatentUpscale": LatentUpscale, | |
"LatentUpscaleBy": LatentUpscaleBy, | |
"LatentFromBatch": LatentFromBatch, | |
"RepeatLatentBatch": RepeatLatentBatch, | |
"SaveImage": SaveImage, | |
"PreviewImage": PreviewImage, | |
"LoadImage": LoadImage, | |
"LoadImageMask": LoadImageMask, | |
"ImageScale": ImageScale, | |
"ImageScaleBy": ImageScaleBy, | |
"ImageInvert": ImageInvert, | |
"ImageBatch": ImageBatch, | |
"ImagePadForOutpaint": ImagePadForOutpaint, | |
"EmptyImage": EmptyImage, | |
"ConditioningAverage": ConditioningAverage , | |
"ConditioningCombine": ConditioningCombine, | |
"ConditioningConcat": ConditioningConcat, | |
"ConditioningSetArea": ConditioningSetArea, | |
"ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, | |
"ConditioningSetAreaStrength": ConditioningSetAreaStrength, | |
"ConditioningSetMask": ConditioningSetMask, | |
"KSamplerAdvanced": KSamplerAdvanced, | |
"SetLatentNoiseMask": SetLatentNoiseMask, | |
"LatentComposite": LatentComposite, | |
"LatentBlend": LatentBlend, | |
"LatentRotate": LatentRotate, | |
"LatentFlip": LatentFlip, | |
"LatentCrop": LatentCrop, | |
"LoraLoader": LoraLoader, | |
"CLIPLoader": CLIPLoader, | |
"UNETLoader": UNETLoader, | |
"DualCLIPLoader": DualCLIPLoader, | |
"CLIPVisionEncode": CLIPVisionEncode, | |
"StyleModelApply": StyleModelApply, | |
"unCLIPConditioning": unCLIPConditioning, | |
"ControlNetApply": ControlNetApply, | |
"ControlNetApplyAdvanced": ControlNetApplyAdvanced, | |
"ControlNetLoader": ControlNetLoader, | |
"DiffControlNetLoader": DiffControlNetLoader, | |
"StyleModelLoader": StyleModelLoader, | |
"CLIPVisionLoader": CLIPVisionLoader, | |
"VAEDecodeTiled": VAEDecodeTiled, | |
"VAEEncodeTiled": VAEEncodeTiled, | |
"unCLIPCheckpointLoader": unCLIPCheckpointLoader, | |
"GLIGENLoader": GLIGENLoader, | |
"GLIGENTextBoxApply": GLIGENTextBoxApply, | |
"InpaintModelConditioning": InpaintModelConditioning, | |
"CheckpointLoader": CheckpointLoader, | |
"DiffusersLoader": DiffusersLoader, | |
"LoadLatent": LoadLatent, | |
"SaveLatent": SaveLatent, | |
"ConditioningZeroOut": ConditioningZeroOut, | |
"ConditioningSetTimestepRange": ConditioningSetTimestepRange, | |
"LoraLoaderModelOnly": LoraLoaderModelOnly, | |
} | |
NODE_DISPLAY_NAME_MAPPINGS = { | |
# Sampling | |
"KSampler": "KSampler", | |
"KSamplerAdvanced": "KSampler (Advanced)", | |
# Loaders | |
"CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", | |
"CheckpointLoaderSimple": "Load Checkpoint", | |
"VAELoader": "Load VAE", | |
"LoraLoader": "Load LoRA", | |
"CLIPLoader": "Load CLIP", | |
"ControlNetLoader": "Load ControlNet Model", | |
"DiffControlNetLoader": "Load ControlNet Model (diff)", | |
"StyleModelLoader": "Load Style Model", | |
"CLIPVisionLoader": "Load CLIP Vision", | |
"UpscaleModelLoader": "Load Upscale Model", | |
"UNETLoader": "Load Diffusion Model", | |
# Conditioning | |
"CLIPVisionEncode": "CLIP Vision Encode", | |
"StyleModelApply": "Apply Style Model", | |
"CLIPTextEncode": "CLIP Text Encode (Prompt)", | |
"CLIPSetLastLayer": "CLIP Set Last Layer", | |
"ConditioningCombine": "Conditioning (Combine)", | |
"ConditioningAverage ": "Conditioning (Average)", | |
"ConditioningConcat": "Conditioning (Concat)", | |
"ConditioningSetArea": "Conditioning (Set Area)", | |
"ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", | |
"ConditioningSetMask": "Conditioning (Set Mask)", | |
"ControlNetApply": "Apply ControlNet (OLD)", | |
"ControlNetApplyAdvanced": "Apply ControlNet", | |
# Latent | |
"VAEEncodeForInpaint": "VAE Encode (for Inpainting)", | |
"SetLatentNoiseMask": "Set Latent Noise Mask", | |
"VAEDecode": "VAE Decode", | |
"VAEEncode": "VAE Encode", | |
"LatentRotate": "Rotate Latent", | |
"LatentFlip": "Flip Latent", | |
"LatentCrop": "Crop Latent", | |
"EmptyLatentImage": "Empty Latent Image", | |
"LatentUpscale": "Upscale Latent", | |
"LatentUpscaleBy": "Upscale Latent By", | |
"LatentComposite": "Latent Composite", | |
"LatentBlend": "Latent Blend", | |
"LatentFromBatch" : "Latent From Batch", | |
"RepeatLatentBatch": "Repeat Latent Batch", | |
# Image | |
"SaveImage": "Save Image", | |
"PreviewImage": "Preview Image", | |
"LoadImage": "Load Image", | |
"LoadImageMask": "Load Image (as Mask)", | |
"ImageScale": "Upscale Image", | |
"ImageScaleBy": "Upscale Image By", | |
"ImageUpscaleWithModel": "Upscale Image (using Model)", | |
"ImageInvert": "Invert Image", | |
"ImagePadForOutpaint": "Pad Image for Outpainting", | |
"ImageBatch": "Batch Images", | |
"ImageCrop": "Image Crop", | |
"ImageBlend": "Image Blend", | |
"ImageBlur": "Image Blur", | |
"ImageQuantize": "Image Quantize", | |
"ImageSharpen": "Image Sharpen", | |
"ImageScaleToTotalPixels": "Scale Image to Total Pixels", | |
# _for_testing | |
"VAEDecodeTiled": "VAE Decode (Tiled)", | |
"VAEEncodeTiled": "VAE Encode (Tiled)", | |
} | |
EXTENSION_WEB_DIRS = {} | |
# Dictionary of successfully loaded module names and associated directories. | |
LOADED_MODULE_DIRS = {} | |
def get_module_name(module_path: str) -> str: | |
""" | |
Returns the module name based on the given module path. | |
Examples: | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node" | |
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes | |
Args: | |
module_path (str): The path of the module. | |
Returns: | |
str: The module name. | |
""" | |
base_path = os.path.basename(module_path) | |
if os.path.isfile(module_path): | |
base_path = os.path.splitext(base_path)[0] | |
return base_path | |
def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool: | |
module_name = os.path.basename(module_path) | |
if os.path.isfile(module_path): | |
sp = os.path.splitext(module_path) | |
module_name = sp[0] | |
try: | |
logging.debug("Trying to load custom node {}".format(module_path)) | |
if os.path.isfile(module_path): | |
module_spec = importlib.util.spec_from_file_location(module_name, module_path) | |
module_dir = os.path.split(module_path)[0] | |
else: | |
module_spec = importlib.util.spec_from_file_location(module_name, os.path.join(module_path, "__init__.py")) | |
module_dir = module_path | |
module = importlib.util.module_from_spec(module_spec) | |
sys.modules[module_name] = module | |
module_spec.loader.exec_module(module) | |
LOADED_MODULE_DIRS[module_name] = os.path.abspath(module_dir) | |
if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: | |
web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) | |
if os.path.isdir(web_dir): | |
EXTENSION_WEB_DIRS[module_name] = web_dir | |
if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: | |
for name, node_cls in module.NODE_CLASS_MAPPINGS.items(): | |
if name not in ignore: | |
NODE_CLASS_MAPPINGS[name] = node_cls | |
node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) | |
if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: | |
NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) | |
return True | |
else: | |
logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.") | |
return False | |
except Exception as e: | |
logging.warning(traceback.format_exc()) | |
logging.warning(f"Cannot import {module_path} module for custom nodes: {e}") | |
return False | |
def init_external_custom_nodes(): | |
""" | |
Initializes the external custom nodes. | |
This function loads custom nodes from the specified folder paths and imports them into the application. | |
It measures the import times for each custom node and logs the results. | |
Returns: | |
None | |
""" | |
base_node_names = set(NODE_CLASS_MAPPINGS.keys()) | |
node_paths = folder_paths.get_folder_paths("custom_nodes") | |
node_import_times = [] | |
for custom_node_path in node_paths: | |
possible_modules = os.listdir(os.path.realpath(custom_node_path)) | |
if "__pycache__" in possible_modules: | |
possible_modules.remove("__pycache__") | |
for possible_module in possible_modules: | |
module_path = os.path.join(custom_node_path, possible_module) | |
if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue | |
if module_path.endswith(".disabled"): continue | |
time_before = time.perf_counter() | |
success = load_custom_node(module_path, base_node_names, module_parent="custom_nodes") | |
node_import_times.append((time.perf_counter() - time_before, module_path, success)) | |
if len(node_import_times) > 0: | |
logging.info("\nImport times for custom nodes:") | |
for n in sorted(node_import_times): | |
if n[2]: | |
import_message = "" | |
else: | |
import_message = " (IMPORT FAILED)" | |
logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1])) | |
logging.info("") | |
def init_builtin_extra_nodes(): | |
""" | |
Initializes the built-in extra nodes in ComfyUI. | |
This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI. | |
If any of the extra node files fail to import, a warning message is logged. | |
Returns: | |
None | |
""" | |
extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") | |
extras_files = [ | |
"nodes_latent.py", | |
"nodes_hypernetwork.py", | |
"nodes_upscale_model.py", | |
"nodes_post_processing.py", | |
"nodes_mask.py", | |
"nodes_compositing.py", | |
"nodes_rebatch.py", | |
"nodes_model_merging.py", | |
"nodes_tomesd.py", | |
"nodes_clip_sdxl.py", | |
"nodes_canny.py", | |
"nodes_freelunch.py", | |
"nodes_custom_sampler.py", | |
"nodes_hypertile.py", | |
"nodes_model_advanced.py", | |
"nodes_model_downscale.py", | |
"nodes_images.py", | |
"nodes_video_model.py", | |
"nodes_sag.py", | |
"nodes_perpneg.py", | |
"nodes_stable3d.py", | |
"nodes_sdupscale.py", | |
"nodes_photomaker.py", | |
"nodes_pixart.py", | |
"nodes_cond.py", | |
"nodes_morphology.py", | |
"nodes_stable_cascade.py", | |
"nodes_differential_diffusion.py", | |
"nodes_ip2p.py", | |
"nodes_model_merging_model_specific.py", | |
"nodes_pag.py", | |
"nodes_align_your_steps.py", | |
"nodes_attention_multiply.py", | |
"nodes_advanced_samplers.py", | |
"nodes_webcam.py", | |
"nodes_audio.py", | |
"nodes_sd3.py", | |
"nodes_gits.py", | |
"nodes_controlnet.py", | |
"nodes_hunyuan.py", | |
"nodes_flux.py", | |
"nodes_lora_extract.py", | |
"nodes_torch_compile.py", | |
"nodes_mochi.py", | |
"nodes_slg.py", | |
"nodes_mahiro.py", | |
"nodes_lt.py", | |
"nodes_hooks.py", | |
"nodes_load_3d.py", | |
"nodes_cosmos.py", | |
] | |
import_failed = [] | |
for node_file in extras_files: | |
if not load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"): | |
import_failed.append(node_file) | |
return import_failed | |
def init_extra_nodes(init_custom_nodes=True): | |
import_failed = init_builtin_extra_nodes() | |
if init_custom_nodes: | |
init_external_custom_nodes() | |
else: | |
logging.info("Skipping loading of custom nodes") | |
if len(import_failed) > 0: | |
logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") | |
for node in import_failed: | |
logging.warning("IMPORT FAILED: {}".format(node)) | |
logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") | |
if args.windows_standalone_build: | |
logging.warning("Please run the update script: update/update_comfyui.bat") | |
else: | |
logging.warning("Please do a: pip install -r requirements.txt") | |
logging.warning("") | |
return import_failed | |