import os
import random
import sys
from typing import Sequence, Mapping, Any, Union

import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import spaces
from comfy import model_management

# Download the model weights the workflow depends on into ComfyUI's model folders.
hf_hub_download(repo_id="black-forest-labs/FLUX.1-Redux-dev", filename="flux1-redux-dev.safetensors", local_dir="models/style_models")
hf_hub_download(repo_id="black-forest-labs/FLUX.1-Depth-dev", filename="flux1-depth-dev.safetensors", local_dir="models/diffusion_models")
hf_hub_download(repo_id="Comfy-Org/sigclip_vision_384", filename="sigclip_vision_patch14_384.safetensors", local_dir="models/clip_vision")
hf_hub_download(repo_id="Kijai/DepthAnythingV2-safetensors", filename="depth_anything_v2_vitl_fp32.safetensors", local_dir="models/depthanything")
hf_hub_download(repo_id="black-forest-labs/FLUX.1-dev", filename="ae.safetensors", local_dir="models/vae/FLUX1")
hf_hub_download(repo_id="comfyanonymous/flux_text_encoders", filename="clip_l.safetensors", local_dir="models/text_encoders")
hf_hub_download(repo_id="comfyanonymous/flux_text_encoders", filename="t5xxl_fp16.safetensors", local_dir="models/text_encoders/t5")


def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Returns the value at the given index of a sequence or mapping.

    If the object is a sequence (like list or string), returns the value at the given index.
    If the object is a mapping (like a dictionary), returns the value at the index-th key.

    Some nodes return a dictionary; in those cases, we look for the "result" key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the object is not a mapping.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]


def find_path(name: str, path: str = None) -> str:
    """
    Recursively looks at parent folders starting from the given path until it finds the given name.
    Returns the path as a string if found, or None otherwise.
    """
    # If no path is given, use the current working directory
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # Get the parent directory
    parent_directory = os.path.dirname(path)

    # If the parent directory is the same as the current directory, we've reached the root and stop the search
    if parent_directory == path:
        return None

    # Recursively call the function with the parent directory
    return find_path(name, parent_directory)


def add_comfyui_directory_to_sys_path() -> None:
    """
    Add 'ComfyUI' to the sys.path
    """
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")


def add_extra_model_paths() -> None:
    """
    Parse the optional extra_model_paths.yaml file and register the extra model paths it lists.
    """
    # Ensure the custom_nodes directory exists
    custom_nodes_path = os.path.join(os.getcwd(), "custom_nodes")
    if not os.path.exists(custom_nodes_path):
        os.makedirs(custom_nodes_path)
        print(f"Created custom_nodes directory at: {custom_nodes_path}")

    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")

    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")


add_comfyui_directory_to_sys_path()
add_extra_model_paths()
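# A quick, removable sanity check for get_value_at_index: it indexes plain
# sequences directly and falls back to the "result" key for mappings. The
# values below are made-up illustrations, not real node outputs.
assert get_value_at_index(("model", "clip"), 1) == "clip"
assert get_value_at_index({"result": ("latent",)}, 0) == "latent"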
def import_custom_nodes() -> None:
    """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS

    This function sets up a new asyncio event loop, initializes the PromptServer,
    creates a PromptQueue, and initializes the custom nodes.
    """
    import asyncio

    import execution
    import server
    from nodes import init_extra_nodes

    # Create a new event loop and set it as the default loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Create an instance of PromptServer with the loop
    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Initialize the custom nodes
    init_extra_nodes()


# Initialize nodes before using them
import_custom_nodes()

# Now import and use NODE_CLASS_MAPPINGS
from nodes import NODE_CLASS_MAPPINGS

# Create instances of the nodes we'll use
try:
    # Model loaders
    dualcliploader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
    vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
    unetloader = NODE_CLASS_MAPPINGS["UNETLoader"]()
    clipvisionloader = NODE_CLASS_MAPPINGS["CLIPVisionLoader"]()
    stylemodelloader = NODE_CLASS_MAPPINGS["StyleModelLoader"]()

    # Image processing nodes
    loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
    imagescale = NODE_CLASS_MAPPINGS["ImageScale"]()
    vaedecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
    vaeencode = NODE_CLASS_MAPPINGS["VAEEncode"]()
    saveimage = NODE_CLASS_MAPPINGS["SaveImage"]()

    # Conditioning and sampling nodes
    cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
    ksampler = NODE_CLASS_MAPPINGS["KSampler"]()
    emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
except KeyError as e:
    print(f"Error: Could not find node {e} in NODE_CLASS_MAPPINGS")
    print("Available nodes:", list(NODE_CLASS_MAPPINGS.keys()))
    raise

# Load every model that needs a safetensors file once, at startup.
# Loader names are relative to the matching folder under models/.
loaded_clip = dualcliploader.load_clip(
    clip_name1="t5/t5xxl_fp16.safetensors",
    clip_name2="clip_l.safetensors",
    type="flux",
)
loaded_vae = vaeloader.load_vae(vae_name="FLUX1/ae.safetensors")
loaded_unet = unetloader.load_unet(
    unet_name="flux1-depth-dev.safetensors", weight_dtype="default"
)
loaded_clip_vision = clipvisionloader.load_clip(
    clip_name="sigclip_vision_patch14_384.safetensors"
)
loaded_style_model = stylemodelloader.load_style_model(
    style_model_name="flux1-redux-dev.safetensors"
)

model_loaders = [loaded_clip, loaded_vae, loaded_unet, loaded_clip_vision, loaded_style_model]

# Check which models are valid: keep only the underlying patchers (or the raw
# objects) that model_management can pre-load onto the GPU
valid_models = [
    getattr(loader[0], "patcher", loader[0])
    for loader in model_loaders
    if not isinstance(loader[0], dict)
]
model_management.load_models_gpu(valid_models)
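# Optional smoke test (an assumption of this sketch: each ComfyUI loader
# returns a non-empty tuple on success). Failing fast at startup is cheaper
# than failing inside a GPU-metered request.
for name, loaded in zip(
    ("clip", "vae", "unet", "clip_vision", "style_model"),
    model_loaders,
):
    assert loaded and len(loaded) > 0, f"{name} failed to load"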
@spaces.GPU(duration=60)
def generate_image(prompt, structure_image, style_image, depth_strength, style_strength):
    """Runs the simplified FLUX workflow and returns the path of the saved image.

    NOTE: depth_strength and style_strength are accepted so the UI below can
    pass them, but the depth/style conditioning nodes are not wired into this
    simplified pipeline yet.
    """
    with torch.inference_mode():
        # Set up image dimensions
        width = 1024
        height = 1024

        # Load and process the input images
        loaded_structure = loadimage.load_image(image=structure_image)
        loaded_style = loadimage.load_image(image=style_image)

        # Scale images if needed
        scaled_structure = imagescale.upscale(
            image=get_value_at_index(loaded_structure, 0),
            upscale_method="lanczos",
            width=width,
            height=height,
            crop="center",
        )
        scaled_style = imagescale.upscale(
            image=get_value_at_index(loaded_style, 0),
            upscale_method="lanczos",
            width=width,
            height=height,
            crop="center",
        )

        # Create an empty latent to sample into
        latent = emptylatentimage.generate(width=width, height=height, batch_size=1)

        # Encode the prompt, reusing the CLIP models loaded at startup.
        # KSampler requires a negative conditioning, so encode an empty prompt for it.
        positive = cliptextencode.encode(
            clip=get_value_at_index(loaded_clip, 0), text=prompt
        )
        negative = cliptextencode.encode(
            clip=get_value_at_index(loaded_clip, 0), text=""
        )

        # Sample the image
        sampled = ksampler.sample(
            model=get_value_at_index(loaded_unet, 0),
            positive=get_value_at_index(positive, 0),
            negative=get_value_at_index(negative, 0),
            latent_image=get_value_at_index(latent, 0),
            seed=random.randint(1, 2**32),
            steps=20,
            cfg=7.5,
            sampler_name="euler",
            scheduler="normal",
            denoise=1.0,
        )

        # Decode the latent to an image, reusing the VAE loaded at startup
        decoded = vaedecode.decode(
            samples=get_value_at_index(sampled, 0),
            vae=get_value_at_index(loaded_vae, 0),
        )

        # Save the final image and return its path so Gradio can display it
        saved = saveimage.save_images(images=get_value_at_index(decoded, 0))
        return f"output/{saved['ui']['images'][0]['filename']}"


if __name__ == "__main__":
    # Start your Gradio app
    with gr.Blocks() as app:
        # Add a title
        gr.Markdown("# FLUX Style Shaping")

        with gr.Row():
            with gr.Column():
                # Add an input
                prompt_input = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
                # Add a `Row` to include the groups side by side
                with gr.Row():
                    # First group includes structure image and depth strength
                    with gr.Group():
                        structure_image = gr.Image(label="Structure Image", type="filepath")
                        depth_strength = gr.Slider(minimum=0, maximum=50, value=15, label="Depth Strength")
                    # Second group includes style image and style strength
                    with gr.Group():
                        style_image = gr.Image(label="Style Image", type="filepath")
                        style_strength = gr.Slider(minimum=0, maximum=1, value=0.5, label="Style Strength")
                # The generate button
                generate_btn = gr.Button("Generate")

            with gr.Column():
                # The output image
                output_image = gr.Image(label="Generated Image")

        # When clicking the button, it will trigger the `generate_image` function,
        # with the respective inputs and the output an image
        generate_btn.click(
            fn=generate_image,
            inputs=[prompt_input, structure_image, style_image, depth_strength, style_strength],
            outputs=[output_image],
        )

    app.launch(share=True)
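# Headless usage sketch for local testing without the UI (the example file
# paths and prompt below are hypothetical placeholders; comment out
# app.launch(share=True) above if running this instead):
#
#   saved_path = generate_image(
#       prompt="a futuristic city at dusk",
#       structure_image="examples/structure.png",
#       style_image="examples/style.png",
#       depth_strength=15,
#       style_strength=0.5,
#   )
#   print(saved_path)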