Spaces:
Build error
Build error
# (c) City96 || Apache-2.0 (apache.org/licenses/LICENSE-2.0)
import os | |
import torch | |
import gguf # This needs to be the llama.cpp one specifically! | |
import argparse | |
from tqdm import tqdm | |
from safetensors.torch import load_file | |
# float32/bfloat16 tensors with at most this many elements are stored as F32.
QUANTIZATION_THRESHOLD = 1024

# Minimum element count a tensor needs before it is considered for the
# (n, 256) rearrangement applied to ``shape_fix`` architectures.
REARRANGE_THRESHOLD = 512

# Longest tensor name accepted by handle_tensors(); longer names raise ValueError.
MAX_TENSOR_NAME_LENGTH = 127
class ModelTemplate:
    """Base class describing how to recognise one model architecture.

    Subclasses override the class attributes below; nothing is instantiated.
    """

    # String describing the architecture.
    arch = "invalid"

    # Whether tensors of this architecture should be reshaped.
    shape_fix = False

    # List of key groups to look for in the state dict. A group matches when
    # all of its keys are present.
    keys_detect = []

    # Keys that mark a matched model as invalid for conversion.
    keys_banned = []
class ModelFlux(ModelTemplate):
    """Detection rules for the "flux" architecture."""

    arch = "flux"

    # Either single-key group identifies the model.
    keys_detect = [
        ("transformer_blocks.0.attn.norm_added_k.weight",),
        ("double_blocks.0.img_attn.proj.weight",),
    ]

    # The first detection key is also banned, so state dicts that contain it
    # are recognised as flux and then rejected for conversion.
    keys_banned = [
        "transformer_blocks.0.attn.norm_added_k.weight",
    ]
class ModelSD3(ModelTemplate):
    """Detection rules for the "sd3" architecture."""

    arch = "sd3"

    # Either single-key group identifies the model.
    keys_detect = [
        ("transformer_blocks.0.attn.add_q_proj.weight",),
        ("joint_blocks.0.x_block.attn.qkv.weight",),
    ]

    # The first detection key is also banned, so state dicts that contain it
    # are recognised as sd3 and then rejected for conversion.
    keys_banned = [
        "transformer_blocks.0.attn.add_q_proj.weight",
    ]
class ModelAura(ModelTemplate):
    """Detection rules for the "aura" architecture."""

    arch = "aura"

    # Either single-key group identifies the model.
    keys_detect = [
        ("double_layers.3.modX.1.weight",),
        ("joint_transformer_blocks.3.ff_context.out_projection.weight",),
    ]

    # The second detection key is also banned, so state dicts that contain it
    # are recognised as aura and then rejected for conversion.
    keys_banned = [
        "joint_transformer_blocks.3.ff_context.out_projection.weight",
    ]
class ModelLTXV(ModelTemplate):
    """Detection rules for the "ltxv" architecture.

    No ``keys_banned`` override: the empty list from ModelTemplate applies.
    """

    arch = "ltxv"

    # A single group; all three keys must be present for a match.
    keys_detect = [
        (
            "adaln_single.emb.timestep_embedder.linear_2.weight",
            "transformer_blocks.27.scale_shift_table",
            "caption_projection.linear_2.weight",
        )
    ]
class ModelSDXL(ModelTemplate):
    """Detection rules for the "sdxl" architecture (tensors may be reshaped)."""

    arch = "sdxl"
    shape_fix = True

    # Any one of these groups identifies the model.
    keys_detect = [
        (
            "down_blocks.0.downsamplers.0.conv.weight",
            "add_embedding.linear_1.weight",
        ),
        # Non-diffusers
        (
            "input_blocks.3.0.op.weight",
            "input_blocks.6.0.op.weight",
            "output_blocks.2.2.conv.weight",
            "output_blocks.5.2.conv.weight",
        ),
        ("label_emb.0.0.weight",),
    ]
class ModelSD1(ModelTemplate):
    """Detection rules for the "sd1" architecture (tensors may be reshaped)."""

    arch = "sd1"
    shape_fix = True

    # Any one of these groups identifies the model.
    keys_detect = [
        ("down_blocks.0.downsamplers.0.conv.weight",),
        # Non-diffusers
        (
            "input_blocks.3.0.op.weight",
            "input_blocks.6.0.op.weight",
            "input_blocks.9.0.op.weight",
            "output_blocks.2.1.conv.weight",
            "output_blocks.5.2.conv.weight",
            "output_blocks.8.2.conv.weight",
        ),
    ]
# Candidate architectures, tried in this order; the first one that matches
# the state dict ends the search.
arch_list = [
    ModelFlux,
    ModelSD3,
    ModelAura,
    ModelLTXV,
    ModelSDXL,
    ModelSD1,
]
def is_model_arch(model, state_dict):
    """Return whether ``state_dict`` matches the architecture ``model``.

    Args:
        model: Object (normally a ModelTemplate subclass) exposing
            ``keys_detect`` (iterable of key groups) and ``keys_banned``
            (iterable of keys).
        state_dict: Container supporting ``key in state_dict``.

    Returns:
        True when every key of at least one ``keys_detect`` group is present,
        False otherwise.

    Raises:
        AssertionError: the model matched but also contains a banned key.
    """
    matched = any(
        all(key in state_dict for key in match_list)
        for match_list in model.keys_detect
    )
    # Banned keys only matter once the architecture has been recognised.
    # Raised explicitly rather than via `assert` so the check still runs
    # when Python is started with -O.
    if matched and any(key in state_dict for key in model.keys_banned):
        raise AssertionError(
            "Model architecture not allowed for conversion! (i.e. reference VS diffusers format)"
        )
    return matched
def detect_arch(state_dict):
    """Return the first entry of ``arch_list`` that matches ``state_dict``.

    Args:
        state_dict: Container of tensor names, as accepted by is_model_arch().

    Returns:
        The matching architecture class.

    Raises:
        AssertionError: no architecture matched, or is_model_arch() rejected
            a matched architecture because of a banned key.
    """
    for arch in arch_list:
        if is_model_arch(arch, state_dict):
            return arch
    # Raised explicitly rather than via `assert` so that running with -O
    # cannot make this function return None.
    raise AssertionError("Unknown model architecture!")
def parse_args():
    """Parse the command line and check that the source file exists.

    Returns:
        argparse.Namespace with ``src`` (required path to the source model)
        and ``dst`` (optional output path, ``None`` when omitted).

    Exits via ``parser.error`` (status 2) when ``--src`` is missing or does
    not point to an existing file.
    """
    parser = argparse.ArgumentParser(description="Generate F16 GGUF files from single UNET")
    parser.add_argument("--src", required=True, help="Source model ckpt file.")
    parser.add_argument("--dst", help="Output unet gguf file.")
    args = parser.parse_args()

    if not os.path.isfile(args.src):
        # --src is required, so reaching this point means a path was given
        # but it is not a file; say so instead of "no input".
        parser.error(f"Input file not found: {args.src!r}")
    return args
def load_state_dict(path):
    """Load a checkpoint and return only the un-prefixed model tensors.

    Files ending in ``.ckpt``, ``.pt``, ``.bin`` or ``.pth`` are read with
    ``torch.load`` (CPU, ``weights_only=True``); if the loaded object has a
    ``"model"`` entry, that entry is used as the state dict. Every other file
    is read with safetensors' ``load_file``.

    If any key starts with ``"model.diffusion_model."`` (checked first) or
    ``"model."``, only keys starting with that prefix are kept and the
    leading prefix is stripped. Otherwise all keys are returned unchanged.

    Args:
        path: Path to the checkpoint file.

    Returns:
        A new dict mapping tensor name to tensor.
    """
    if path.endswith((".ckpt", ".pt", ".bin", ".pth")):
        state_dict = torch.load(path, map_location="cpu", weights_only=True)
        state_dict = state_dict.get("model", state_dict)
    else:
        state_dict = load_file(path)

    # only keep unet with no prefix!
    prefix = next(
        (
            pfx
            for pfx in ("model.diffusion_model.", "model.")
            if any(key.startswith(pfx) for key in state_dict)
        ),
        None,
    )
    if prefix is None:
        return dict(state_dict)

    # Match and strip the prefix at the start of the key only. A substring
    # test would also keep keys like "first_stage_model.*", and str.replace
    # would remove later occurrences of the prefix inside the name.
    return {
        key[len(prefix):]: value
        for key, value in state_dict.items()
        if key.startswith(prefix)
    }
def load_model(path):
    """Load the checkpoint at ``path`` and prepare a GGUF writer for it.

    Returns:
        A ``(writer, state_dict, model_arch)`` tuple: a ``gguf.GGUFWriter``
        created with ``path=None`` and the detected architecture name, the
        filtered state dict, and the detected architecture class.
    """
    tensors = load_state_dict(path)
    detected = detect_arch(tensors)
    print(f"* Architecture detected from input: {detected.arch}")
    return gguf.GGUFWriter(path=None, arch=detected.arch), tensors, detected
def handle_tensors(args, writer, state_dict, model_arch):
    """Convert every tensor of ``state_dict`` and register it with ``writer``.

    For each tensor the function picks a storage type (BF16 for bfloat16
    input, otherwise F16, upgraded to F32 for selected float32/bfloat16
    tensors), optionally rearranges it to ``(n, 256)`` for ``shape_fix``
    architectures, quantizes it with ``gguf.quants.quantize`` and adds it to
    the writer. A progress line is printed per tensor.

    Args:
        args: Not read by this function; kept so existing callers keep working.
        writer: Object providing ``add_array`` and ``add_tensor``.
        state_dict: Mapping of tensor name to ``torch.Tensor``. An empty
            mapping is a no-op.
        model_arch: Architecture description; only ``shape_fix`` is read.

    Raises:
        ValueError: a tensor name is longer than MAX_TENSOR_NAME_LENGTH.
    """
    if not state_dict:
        return

    max_name_len = max(len(key) for key in state_dict)
    if max_name_len > MAX_TENSOR_NAME_LENGTH:
        # Longest names first, as in the reported list.
        too_long = sorted(
            (key for key in state_dict if len(key) > MAX_TENSOR_NAME_LENGTH),
            key=len,
            reverse=True,
        )
        bad_list = ", ".join(f"{key!r} ({len(key)})" for key in too_long)
        raise ValueError(f"Can only handle tensor names up to {MAX_TENSOR_NAME_LENGTH} characters. Tensors exceeding the limit: {bad_list}")

    # Loop invariants, built once instead of once per tensor.
    # keys to keep as max precision
    blacklist = (
        "time_embedding.",
        "add_embedding.",
        "time_in.",
        "txt_in.",
        "vector_in.",
        "img_in.",
        "guidance_in.",
        "final_layer.",
    )
    # this is so we don't break torch 2.0.X: missing fp8 dtypes become a
    # placeholder string that never compares equal to a real dtype.
    fp8_dtypes = tuple(
        getattr(torch, name, "_invalid") for name in ("float8_e4m3fn", "float8_e5m2")
    )
    qtypes = gguf.GGMLQuantizationType

    for key, data in tqdm(state_dict.items()):
        old_dtype = data.dtype

        if old_dtype == torch.bfloat16:
            # Widened to float32 before the NumPy conversion
            # (presumably because NumPy has no bfloat16 dtype).
            data = data.to(torch.float32).numpy()
        elif old_dtype in fp8_dtypes:
            data = data.to(torch.float16).numpy()
        else:
            data = data.numpy()

        n_dims = data.ndim
        # number of parameters (AKA elements) in this tensor
        n_params = int(data.size)

        data_qtype = qtypes.BF16 if old_dtype == torch.bfloat16 else qtypes.F16

        if old_dtype in (torch.float32, torch.bfloat16) and (
            # one-dimensional tensors should be kept in F32
            # also speeds up inference due to not dequantizing
            n_dims == 1
            # very small tensors
            or n_params <= QUANTIZATION_THRESHOLD
            # weights of the max-precision layers listed above
            or (".weight" in key and any(marker in key for marker in blacklist))
        ):
            data_qtype = qtypes.F32

        if (
            model_arch.shape_fix  # NEVER reshape for models such as flux
            and n_dims > 1  # Skip one-dimensional tensors
            and n_params >= REARRANGE_THRESHOLD  # Only rearrange tensors meeting the size requirement
            and n_params % 256 == 0  # Rearranging only makes sense if total elements is divisible by 256
            and data.shape[-1] % 256 != 0  # Only need to rearrange if the last dimension is not divisible by 256
        ):
            orig_shape = data.shape
            data = data.reshape(n_params // 256, 256)
            # Record the original shape under a per-tensor metadata key.
            writer.add_array(f"comfy.gguf.orig_shape.{key}", tuple(int(dim) for dim in orig_shape))

        try:
            data = gguf.quants.quantize(data, data_qtype)
        except (AttributeError, gguf.QuantError) as e:
            tqdm.write(f"falling back to F16: {e}")
            data_qtype = qtypes.F16
            data = gguf.quants.quantize(data, data_qtype)

        new_name = key  # do we need to rename?

        # Shape is printed with its dimensions reversed, wrapped in braces.
        shape_str = "{" + ", ".join(str(n) for n in reversed(data.shape)) + "}"
        tqdm.write(f"{new_name:<{max_name_len + 4}} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")

        writer.add_tensor(new_name, data, raw_dtype=data_qtype)
if __name__ == "__main__":
    # Script entry point: load the source model, convert its tensors and
    # write the resulting GGUF file.
    args = parse_args()
    path = args.src
    writer, state_dict, model_arch = load_model(path)
    writer.add_quantization_version(gguf.GGML_QUANT_VERSION)

    # The dtype of the first tensor decides both the file-type metadata and
    # the suffix of the default output name.
    stem = os.path.splitext(path)[0]
    first_tensor = next(iter(state_dict.values()))
    if first_tensor.dtype == torch.bfloat16:
        default_out_path = f"{stem}-BF16.gguf"
        file_type = gguf.LlamaFileType.MOSTLY_BF16
    else:
        default_out_path = f"{stem}-F16.gguf"
        file_type = gguf.LlamaFileType.MOSTLY_F16
    writer.add_file_type(file_type)

    # An explicit --dst wins over the derived name.
    out_path = args.dst or default_out_path
    if os.path.isfile(out_path):
        input("Output exists enter to continue or ctrl+c to abort!")

    # The source path is passed in the `args` position here.
    handle_tensors(path, writer, state_dict, model_arch)

    writer.write_header_to_file(path=out_path)
    writer.write_kv_data_to_file()
    writer.write_tensors_to_file(progress=True)
    writer.close()