# (c) City96 || Apache-2.0 (apache.org/licenses/LICENSE-2.0)
import torch
import logging
import collections

import comfy.sd
import comfy.utils
import comfy.float
import comfy.model_patcher
import comfy.model_management

import folder_paths
from .ops import GGMLOps, move_patch_to_device
from .loader import gguf_sd_loader, gguf_clip_loader
from .dequant import is_quantized, is_torch_compatible


def update_folder_names_and_paths(key, targets=[]):
    # check for existing key
    base = folder_paths.folder_names_and_paths.get(key, ([], {}))
    base = base[0] if isinstance(base[0], (list, set, tuple)) else []
    # find base key & add w/ fallback, sanity check + warning
    target = next((x for x in targets if x in folder_paths.folder_names_and_paths), targets[0])
    orig, _ = folder_paths.folder_names_and_paths.get(target, ([], {}))
    folder_paths.folder_names_and_paths[key] = (orig or base, {".gguf"})
    if base and base != orig:
        logging.warning(f"Unknown file list already present on key {key}: {base}")

# Add custom keys for files ending in .gguf
update_folder_names_and_paths("unet_gguf", ["diffusion_models", "unet"])
update_folder_names_and_paths("clip_gguf", ["text_encoders", "clip"])


class GGUFModelPatcher(comfy.model_patcher.ModelPatcher):
    patch_on_device = False

    def patch_weight_to_device(self, key, device_to=None, inplace_update=False):
        if key not in self.patches:
            return
        weight = comfy.utils.get_attr(self.model, key)

        try:
            from comfy.lora import calculate_weight
        except Exception:
            calculate_weight = self.calculate_weight

        patches = self.patches[key]
        if is_quantized(weight):
            # keep the weight quantized; store the patches so they can be applied on dequant
            out_weight = weight.to(device_to)
            patches = move_patch_to_device(patches, self.load_device if self.patch_on_device else self.offload_device)
            # TODO: do we ever have legitimate duplicate patches? (i.e. patch on top of patched weight)
            out_weight.patches = [(calculate_weight, patches, key)]
        else:
            inplace_update = self.weight_inplace_update or inplace_update
            if key not in self.backup:
                self.backup[key] = collections.namedtuple('Dimension', ['weight', 'inplace_update'])(
                    weight.to(device=self.offload_device, copy=inplace_update), inplace_update
                )

            if device_to is not None:
                temp_weight = comfy.model_management.cast_to_device(weight, device_to, torch.float32, copy=True)
            else:
                temp_weight = weight.to(torch.float32, copy=True)

            out_weight = calculate_weight(patches, temp_weight, key)
            out_weight = comfy.float.stochastic_rounding(out_weight, weight.dtype)

        if inplace_update:
            comfy.utils.copy_to_param(self.model, key, out_weight)
        else:
            comfy.utils.set_attr_param(self.model, key, out_weight)

    def unpatch_model(self, device_to=None, unpatch_weights=True):
        if unpatch_weights:
            for p in self.model.parameters():
                if is_torch_compatible(p):
                    continue
                patches = getattr(p, "patches", [])
                if len(patches) > 0:
                    p.patches = []
        # TODO: Find another way to not unload after patches
        return super().unpatch_model(device_to=device_to, unpatch_weights=unpatch_weights)

    mmap_released = False

    def load(self, *args, force_patch_weights=False, **kwargs):
        # always call `patch_weight_to_device` even for lowvram
        super().load(*args, force_patch_weights=True, **kwargs)

        # make sure nothing stays linked to mmap after first load
        if not self.mmap_released:
            linked = []
            if kwargs.get("lowvram_model_memory", 0) > 0:
                for n, m in self.model.named_modules():
                    if hasattr(m, "weight"):
                        device = getattr(m.weight, "device", None)
                        if device == self.offload_device:
                            linked.append((n, m))
                            continue
                    if hasattr(m, "bias"):
                        device = getattr(m.bias, "device", None)
                        if device == self.offload_device:
                            linked.append((n, m))
                            continue
            if linked:
                print(f"Attempting to release mmap ({len(linked)})")
                for n, m in linked:
                    # TODO: possible to OOM, find better way to detach
                    m.to(self.load_device).to(self.offload_device)
            self.mmap_released = True

    def clone(self, *args, **kwargs):
        src_cls = self.__class__
        self.__class__ = GGUFModelPatcher
        n = super().clone(*args, **kwargs)
        n.__class__ = GGUFModelPatcher
        self.__class__ = src_cls
        # GGUF specific clone values below
        n.patch_on_device = getattr(self, "patch_on_device", False)
        return n


class UnetLoaderGGUF:
    @classmethod
    def INPUT_TYPES(s):
        unet_names = [x for x in folder_paths.get_filename_list("unet_gguf")]
        return {
            "required": {
                "unet_name": (unet_names,),
            }
        }

    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_unet"
    CATEGORY = "bootleg"
    TITLE = "Unet Loader (GGUF)"

    def load_unet(self, unet_name, dequant_dtype=None, patch_dtype=None, patch_on_device=None):
        ops = GGMLOps()

        if dequant_dtype in ("default", None):
            ops.Linear.dequant_dtype = None
        elif dequant_dtype in ["target"]:
            ops.Linear.dequant_dtype = dequant_dtype
        else:
            ops.Linear.dequant_dtype = getattr(torch, dequant_dtype)

        if patch_dtype in ("default", None):
            ops.Linear.patch_dtype = None
        elif patch_dtype in ["target"]:
            ops.Linear.patch_dtype = patch_dtype
        else:
            ops.Linear.patch_dtype = getattr(torch, patch_dtype)

        # init model
        unet_path = folder_paths.get_full_path("unet", unet_name)
        sd = gguf_sd_loader(unet_path)
        model = comfy.sd.load_diffusion_model_state_dict(
            sd, model_options={"custom_operations": ops}
        )
        if model is None:
            logging.error("ERROR UNSUPPORTED UNET {}".format(unet_path))
            raise RuntimeError("ERROR: Could not detect model type of: {}".format(unet_path))
        model = GGUFModelPatcher.clone(model)
        model.patch_on_device = patch_on_device
        return (model,)


class UnetLoaderGGUFAdvanced(UnetLoaderGGUF):
    @classmethod
    def INPUT_TYPES(s):
        unet_names = [x for x in folder_paths.get_filename_list("unet_gguf")]
        return {
            "required": {
                "unet_name": (unet_names,),
                "dequant_dtype": (["default", "target", "float32", "float16", "bfloat16"], {"default": "default"}),
                "patch_dtype": (["default", "target", "float32", "float16", "bfloat16"], {"default": "default"}),
                "patch_on_device": ("BOOLEAN", {"default": False}),
            }
        }
    TITLE = "Unet Loader (GGUF/Advanced)"


# Mapping from common name to name used in comfy.sd.CLIPType enum
CLIP_ENUM_MAP = {
    "stable_diffusion": "STABLE_DIFFUSION",
    "stable_cascade": "STABLE_CASCADE",
    "stable_audio": "STABLE_AUDIO",
    "sdxl": "STABLE_DIFFUSION",
    "sd3": "SD3",
    "flux": "FLUX",
    "mochi": "MOCHI",
    "ltxv": "LTXV",
    "hunyuan_video": "HUNYUAN_VIDEO",
    "pixart": "PIXART",
}

def get_clip_type(name):
    enum_name = CLIP_ENUM_MAP.get(name, None)
    if enum_name is None:
        raise ValueError(f"Unknown CLIP model type {name}")
    clip_type = getattr(comfy.sd.CLIPType, enum_name, None)
    if clip_type is None:
        raise ValueError(f"Unsupported CLIP model type {name} (Update ComfyUI)")
    return clip_type


class CLIPLoaderGGUF:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "clip_name": (s.get_filename_list(),),
                "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart"],),
            }
        }

    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"
    CATEGORY = "bootleg"
    TITLE = "CLIPLoader (GGUF)"

    @classmethod
    def get_filename_list(s):
        files = []
        files += folder_paths.get_filename_list("clip")
        files += folder_paths.get_filename_list("clip_gguf")
        return sorted(files)

    def load_data(self, ckpt_paths):
        clip_data = []
        for p in ckpt_paths:
            if p.endswith(".gguf"):
                sd = gguf_clip_loader(p)
            else:
                sd = comfy.utils.load_torch_file(p, safe_load=True)
            clip_data.append(sd)
        return clip_data

    def load_patcher(self, clip_paths, clip_type, clip_data):
        clip = comfy.sd.load_text_encoder_state_dicts(
            clip_type=clip_type,
            state_dicts=clip_data,
            model_options={
                "custom_operations": GGMLOps,
                "initial_device": comfy.model_management.text_encoder_offload_device(),
            },
            embedding_directory=folder_paths.get_folder_paths("embeddings"),
        )
        clip.patcher = GGUFModelPatcher.clone(clip.patcher)
        return clip

    def load_clip(self, clip_name, type="stable_diffusion"):
        clip_path = folder_paths.get_full_path("clip", clip_name)
        return (self.load_patcher([clip_path], get_clip_type(type), self.load_data([clip_path])),)


class DualCLIPLoaderGGUF(CLIPLoaderGGUF):
    @classmethod
    def INPUT_TYPES(s):
        file_options = (s.get_filename_list(),)
        return {
            "required": {
                "clip_name1": file_options,
                "clip_name2": file_options,
                "type": (("sdxl", "sd3", "flux", "hunyuan_video"),),
            }
        }

    TITLE = "DualCLIPLoader (GGUF)"

    def load_clip(self, clip_name1, clip_name2, type):
        clip_path1 = folder_paths.get_full_path("clip", clip_name1)
        clip_path2 = folder_paths.get_full_path("clip", clip_name2)
        clip_paths = (clip_path1, clip_path2)
        return (self.load_patcher(clip_paths, get_clip_type(type), self.load_data(clip_paths)),)


class TripleCLIPLoaderGGUF(CLIPLoaderGGUF):
    @classmethod
    def INPUT_TYPES(s):
        file_options = (s.get_filename_list(),)
        return {
            "required": {
                "clip_name1": file_options,
                "clip_name2": file_options,
                "clip_name3": file_options,
            }
        }

    TITLE = "TripleCLIPLoader (GGUF)"

    def load_clip(self, clip_name1, clip_name2, clip_name3, type="sd3"):
        clip_path1 = folder_paths.get_full_path("clip", clip_name1)
        clip_path2 = folder_paths.get_full_path("clip", clip_name2)
        clip_path3 = folder_paths.get_full_path("clip", clip_name3)
        clip_paths = (clip_path1, clip_path2, clip_path3)
        return (self.load_patcher(clip_paths, get_clip_type(type), self.load_data(clip_paths)),)


NODE_CLASS_MAPPINGS = {
    "UnetLoaderGGUF": UnetLoaderGGUF,
    "CLIPLoaderGGUF": CLIPLoaderGGUF,
    "DualCLIPLoaderGGUF": DualCLIPLoaderGGUF,
    "TripleCLIPLoaderGGUF": TripleCLIPLoaderGGUF,
    "UnetLoaderGGUFAdvanced": UnetLoaderGGUFAdvanced,
}