import os
import random
import sys
from typing import Sequence, Mapping, Any, Union

import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import spaces
from comfy import model_management

hf_hub_download(
    repo_id="Madespace/clip",
    filename="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
    local_dir="models/clip",
)
hf_hub_download(
    repo_id="ezioruan/inswapper_128.onnx",
    filename="inswapper_128.onnx",
    local_dir="models/insightface",
)
hf_hub_download(
    repo_id="gmk123/GFPGAN",
    filename="GFPGANv1.4.pth",
    local_dir="models/facerestore_models",
)
hf_hub_download(
    repo_id="gemasai/4x_NMKD-Superscale-SP_178000_G",
    filename="4x_NMKD-Superscale-SP_178000_G.pth",
    local_dir="models/upscale_models",
)


def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Returns the value at the given index of a sequence or mapping.

    If the object is a sequence (like a list or string), returns the value at the given index.
    If the object is a mapping (like a dictionary), returns the value at the index-th key.

    Some nodes return a dictionary; in those cases, we look for the "result" key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the object is not a mapping.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]


def find_path(name: str, path: str = None) -> str:
    """
    Recursively looks at parent folders starting from the given path until it finds the given name.
    Returns the path as a string if found, or None otherwise.
    """
    # If no path is given, use the current working directory
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # Get the parent directory
    parent_directory = os.path.dirname(path)

    # If the parent directory is the same as the current directory, we've reached the root and stop the search
    if parent_directory == path:
        return None

    # Recursively call the function with the parent directory
    return find_path(name, parent_directory)


def add_comfyui_directory_to_sys_path() -> None:
    """
    Add 'ComfyUI' to the sys.path
    """
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")


def add_extra_model_paths() -> None:
    """
    Parse the optional extra_model_paths.yaml file and add the parsed paths to the sys.path.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")

    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")


add_comfyui_directory_to_sys_path()
add_extra_model_paths()


def import_custom_nodes() -> None:
    """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS

    This function sets up a new asyncio event loop, initializes the PromptServer,
    creates a PromptQueue, and initializes the custom nodes.
""" import asyncio import execution from nodes import init_extra_nodes import server # Creating a new event loop and setting it as the default loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Creating an instance of PromptServer with the loop server_instance = server.PromptServer(loop) execution.PromptQueue(server_instance) # Initializing custom nodes init_extra_nodes() from nodes import NODE_CLASS_MAPPINGS #TO be added to "model_loaders" as it loads a model # downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[ # "DownloadAndLoadCogVideoModel" # ]() # downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel( # model="THUDM/CogVideoX-5b", # precision="bf16", # quantization="disabled", # enable_sequential_cpu_offload=True, # attention_mode="sdpa", # load_device="main_device", # ) # loadimage = NODE_CLASS_MAPPINGS["LoadImage"]() # cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]() # cliploader_20 = cliploader.load_clip( # clip_name="t5/google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors", # type="sd3", # device="default", # ) # emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]() # cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]() # cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]() # cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]() # reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]() # cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]() # vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]() # #Add all the models that load a safetensors file # model_loaders = [downloadandloadcogvideomodel_1, cliploader_20] # # Check which models are valid and how to best load them # valid_models = [ # getattr(loader[0], 'patcher', loader[0]) # for loader in model_loaders # if not isinstance(loader[0], dict) and not isinstance(getattr(loader[0], 'patcher', None), dict) # ] # #Finally loads the models # model_management.load_models_gpu(valid_models) #Run ComfyUI Workflow @spaces.GPU(duration=800) def generate_video(positive_prompt, num_frames, input_image): print("Positive Prompt:", positive_prompt) print("Number of Frames:", num_frames) print("Input Image:", input_image) progress = gr.Progress(track_tqdm=True) import_custom_nodes() with torch.inference_mode(): downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[ "DownloadAndLoadCogVideoModel" ]() downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel( model="THUDM/CogVideoX-5b", precision="bf16", quantization="disabled", enable_sequential_cpu_offload=True, attention_mode="sdpa", load_device="main_device", ) loadimage = NODE_CLASS_MAPPINGS["LoadImage"]() loadimage_8 = loadimage.load_image(image=input_image) cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]() cliploader_20 = cliploader.load_clip( clip_name="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors", type="sd3", device="default", ) emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]() emptylatentimage_161 = emptylatentimage.generate( width=480, #reduce this to avoid OOM error height=480, #reduce this to avoid OOM error batch_size=1 #reduce this to avoid OOM error ) cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]() cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]() cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]() reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]() cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]() vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]() for q in range(1): cogvideotextencode_30 = 
            cogvideotextencode_30 = cogvideotextencode.process(
                prompt=positive_prompt,
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cliploader_20, 0),
            )
            cogvideotextencode_31 = cogvideotextencode.process(
                prompt="",
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cogvideotextencode_30, 1),
            )
            cogvideosampler_155 = cogvideosampler.process(
                num_frames=num_frames,
                steps=30,  # reduce this to avoid OOM error
                cfg=6,
                seed=random.randint(1, 2**64),
                scheduler="CogVideoXDDIM",
                denoise_strength=1,
                model=get_value_at_index(downloadandloadcogvideomodel_1, 0),
                positive=get_value_at_index(cogvideotextencode_30, 0),
                negative=get_value_at_index(cogvideotextencode_31, 0),
                samples=get_value_at_index(emptylatentimage_161, 0),
            )
            cogvideodecode_11 = cogvideodecode.decode(
                enable_vae_tiling=False,
                tile_sample_min_height=240,  # reduce this to avoid OOM error
                tile_sample_min_width=240,  # reduce this to avoid OOM error
                tile_overlap_factor_height=0.2,
                tile_overlap_factor_width=0.2,
                auto_tile_size=True,
                vae=get_value_at_index(downloadandloadcogvideomodel_1, 1),
                samples=get_value_at_index(cogvideosampler_155, 0),
            )
            reactorfaceswap_3 = reactorfaceswap.execute(
                enabled=True,
                swap_model="inswapper_128.onnx",
                facedetection="retinaface_resnet50",
                face_restore_model="GFPGANv1.4.pth",
                face_restore_visibility=1,
                codeformer_weight=0.75,
                detect_gender_input="no",
                detect_gender_source="no",
                input_faces_index="0",
                source_faces_index="0",
                console_log_level=1,
                input_image=get_value_at_index(cogvideodecode_11, 0),
                source_image=get_value_at_index(loadimage_8, 0),
            )
            cr_upscale_image_151 = cr_upscale_image.upscale(
                upscale_model="4x_NMKD-Superscale-SP_178000_G.pth",
                mode="rescale",
                rescale_factor=4,
                resize_width=720,
                resampling_method="lanczos",
                supersample="true",
                rounding_modulus=16,
                image=get_value_at_index(reactorfaceswap_3, 0),
            )
            vhs_videocombine_154 = vhs_videocombine.combine_video(
                frame_rate=8,
                loop_count=0,
                filename_prefix="AnimateDiff",
                format="video/h264-mp4",
                pix_fmt="yuv420p",
                crf=19,
                save_metadata=True,
                trim_to_audio=False,
                pingpong=True,
                save_output=True,
                images=get_value_at_index(cr_upscale_image_151, 0),
                unique_id=7214086815220268849,
            )

        video_path = f"output/{vhs_videocombine_154['ui']['gifs'][0]['filename']}"
        image_path = f"output/{vhs_videocombine_154['result'][0][1][0].split('/')[-1]}"
        print(vhs_videocombine_154)
        print(video_path, image_path)
        return video_path, image_path


if __name__ == "__main__":
    with gr.Blocks() as app:
        with gr.Row():
            positive_prompt = gr.Textbox(
                label="Positive Prompt",
                value="A young Asian man with shoulder-length black hair, wearing a stylish black outfit, playing an acoustic guitar on a dimly lit stage. His full face is visible, showing a calm and focused expression as he strums the guitar. A microphone stand is positioned near him, and a music stand with sheet music is in front of him. The stage lighting casts a soft, warm glow on his face, and the background features an intimate live music setting with visible metal beams and soft blue ambient lighting. The scene captures the artistic mood of a live performance, emphasizing the details of the guitar, the musician’s fingers on the strings, and the relaxed yet passionate vibe of the moment.",
                lines=2,
            )
        with gr.Row():
            num_frames = gr.Number(label="Number of Frames", value=10)
        with gr.Row():
            input_image = gr.Image(label="Input Image", type="filepath")
        submit = gr.Button("Submit")
        output_video = gr.Video(label="Output Video")
        output_image = gr.Image(label="Output Image")

        submit.click(
            fn=generate_video,
            inputs=[positive_prompt, num_frames, input_image],
            outputs=[output_video, output_image],
        )

    app.launch(share=True)
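
# A minimal usage sketch (commented out, hypothetical): calling the workflow
# directly without the Gradio UI, e.g. for local testing. The file path below is
# a placeholder; generate_video takes a prompt string, a frame count, and a path
# to an input image, and returns (video_path, image_path) as produced above.
#
# video_path, image_path = generate_video(
#     positive_prompt="A man playing an acoustic guitar on a dimly lit stage",
#     num_frames=10,
#     input_image="input/face.png",  # placeholder path to a source face image
# )
# print(video_path, image_path)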