Spaces:
Build error
Build error
import os | |
import sys | |
import traceback | |
import json | |
import asyncio | |
import concurrent | |
import threading | |
from typing import Optional | |
import typer | |
from rich import print | |
from typing_extensions import List, Annotated | |
import re | |
import git | |
import importlib | |
sys.path.append(os.path.dirname(__file__)) | |
sys.path.append(os.path.join(os.path.dirname(__file__), "glob")) | |
import manager_util | |
comfy_path = os.environ.get('COMFYUI_PATH') | |
if comfy_path is None: | |
try: | |
import folder_paths | |
comfy_path = os.path.join(os.path.dirname(folder_paths.__file__)) | |
except: | |
comfy_path = os.path.abspath(os.path.join(manager_util.comfyui_manager_path, '..', '..')) | |
sys.path.append(comfy_path) | |
import utils.extra_config | |
import cm_global | |
import manager_core as core | |
from manager_core import unified_manager | |
import cnr_utils | |
comfyui_manager_path = os.path.abspath(os.path.dirname(__file__)) | |
comfy_path = os.environ.get('COMFYUI_PATH') | |
if comfy_path is None: | |
print("\n[bold yellow]WARN: The `COMFYUI_PATH` environment variable is not set. Assuming `custom_nodes/ComfyUI-Manager/../../` as the ComfyUI path.[/bold yellow]", file=sys.stderr) | |
comfy_path = os.path.abspath(os.path.join(comfyui_manager_path, '..', '..')) | |
cm_global.pip_blacklist = ['torch', 'torchsde', 'torchvision'] | |
cm_global.pip_downgrade_blacklist = ['torch', 'torchsde', 'torchvision', 'transformers', 'safetensors', 'kornia'] | |
cm_global.pip_overrides = {'numpy': 'numpy<2'} | |
if os.path.exists(os.path.join(manager_util.comfyui_manager_path, "pip_overrides.json")): | |
with open(os.path.join(manager_util.comfyui_manager_path, "pip_overrides.json"), 'r', encoding="UTF-8", errors="ignore") as json_file: | |
cm_global.pip_overrides = json.load(json_file) | |
def check_comfyui_hash(): | |
repo = git.Repo(comfy_path) | |
core.comfy_ui_revision = len(list(repo.iter_commits('HEAD'))) | |
cm_global.variables['comfyui.revision'] = core.comfy_ui_revision | |
core.comfy_ui_commit_datetime = repo.head.commit.committed_datetime | |
check_comfyui_hash() # This is a preparation step for manager_core | |
core.check_invalid_nodes() | |
def read_downgrade_blacklist(): | |
try: | |
import configparser | |
config = configparser.ConfigParser() | |
config.read(core.manager_config.path) | |
default_conf = config['default'] | |
if 'downgrade_blacklist' in default_conf: | |
items = default_conf['downgrade_blacklist'].split(',') | |
items = [x.strip() for x in items if x != ''] | |
cm_global.pip_downgrade_blacklist += items | |
cm_global.pip_downgrade_blacklist = list(set(cm_global.pip_downgrade_blacklist)) | |
except: | |
pass | |
read_downgrade_blacklist() # This is a preparation step for manager_core | |
class Ctx: | |
folder_paths = None | |
def __init__(self): | |
self.channel = 'default' | |
self.no_deps = False | |
self.mode = 'cache' | |
self.user_directory = None | |
self.custom_nodes_paths = [os.path.join(core.comfy_path, 'custom_nodes')] | |
self.manager_files_directory = os.path.dirname(__file__) | |
if Ctx.folder_paths is None: | |
try: | |
Ctx.folder_paths = importlib.import_module('folder_paths') | |
except ImportError: | |
print("Warning: Unable to import folder_paths module") | |
def set_channel_mode(self, channel, mode): | |
if mode is not None: | |
self.mode = mode | |
valid_modes = ["remote", "local", "cache"] | |
if mode and mode.lower() not in valid_modes: | |
typer.echo( | |
f"Invalid mode: {mode}. Allowed modes are 'remote', 'local', 'cache'.", | |
err=True, | |
) | |
exit(1) | |
if channel is not None: | |
self.channel = channel | |
asyncio.run(unified_manager.reload(cache_mode=self.mode, dont_wait=False)) | |
asyncio.run(unified_manager.load_nightly(self.channel, self.mode)) | |
def set_no_deps(self, no_deps): | |
self.no_deps = no_deps | |
def set_user_directory(self, user_directory): | |
if user_directory is None: | |
return | |
extra_model_paths_yaml = os.path.join(user_directory, 'extra_model_paths.yaml') | |
if os.path.exists(extra_model_paths_yaml): | |
utils.extra_config.load_extra_path_config(extra_model_paths_yaml) | |
core.update_user_directory(user_directory) | |
if os.path.exists(core.manager_pip_overrides_path): | |
with open(core.manager_pip_overrides_path, 'r', encoding="UTF-8", errors="ignore") as json_file: | |
cm_global.pip_overrides = json.load(json_file) | |
cm_global.pip_overrides = {'numpy': 'numpy<2'} | |
def get_startup_scripts_path(): | |
return os.path.join(core.manager_startup_script_path, "install-scripts.txt") | |
def get_restore_snapshot_path(): | |
return os.path.join(core.manager_startup_script_path, "restore-snapshot.json") | |
def get_snapshot_path(): | |
return core.manager_snapshot_path | |
def get_custom_nodes_paths(): | |
if Ctx.folder_paths is None: | |
print("Error: folder_paths module is not available") | |
return [] | |
return Ctx.folder_paths.get_folder_paths('custom_nodes') | |
cmd_ctx = Ctx() | |
def install_node(node_spec_str, is_all=False, cnt_msg=''): | |
if core.is_valid_url(node_spec_str): | |
# install via urls | |
res = asyncio.run(core.gitclone_install(node_spec_str, no_deps=cmd_ctx.no_deps)) | |
if not res.result: | |
print(res.msg) | |
print(f"[bold red]ERROR: An error occurred while installing '{node_spec_str}'.[/bold red]") | |
else: | |
print(f"{cnt_msg} [INSTALLED] {node_spec_str:50}") | |
else: | |
node_spec = unified_manager.resolve_node_spec(node_spec_str) | |
if node_spec is None: | |
return | |
node_name, version_spec, is_specified = node_spec | |
# NOTE: install node doesn't allow update if version is not specified | |
if not is_specified: | |
version_spec = None | |
res = asyncio.run(unified_manager.install_by_id(node_name, version_spec, cmd_ctx.channel, cmd_ctx.mode, instant_execution=True, no_deps=cmd_ctx.no_deps)) | |
if res.action == 'skip': | |
print(f"{cnt_msg} [ SKIP ] {node_name:50} => Already installed") | |
elif res.action == 'enable': | |
print(f"{cnt_msg} [ ENABLED ] {node_name:50}") | |
elif res.action == 'install-git' and res.target == 'nightly': | |
print(f"{cnt_msg} [INSTALLED] {node_name:50}[NIGHTLY]") | |
elif res.action == 'install-git' and res.target == 'unknown': | |
print(f"{cnt_msg} [INSTALLED] {node_name:50}[UNKNOWN]") | |
elif res.action == 'install-cnr' and res.result: | |
print(f"{cnt_msg} [INSTALLED] {node_name:50}[{res.target}]") | |
elif res.action == 'switch-cnr' and res.result: | |
print(f"{cnt_msg} [INSTALLED] {node_name:50}[{res.target}]") | |
elif (res.action == 'switch-cnr' or res.action == 'install-cnr') and not res.result and node_name in unified_manager.cnr_map: | |
print(f"\nAvailable version of '{node_name}'") | |
show_versions(node_name) | |
print("") | |
else: | |
print(f"[bold red]ERROR: An error occurred while installing '{node_name}'.\n{res.msg}[/bold red]") | |
def reinstall_node(node_spec_str, is_all=False, cnt_msg=''): | |
node_spec = unified_manager.resolve_node_spec(node_spec_str) | |
node_name, version_spec, _ = node_spec | |
unified_manager.unified_uninstall(node_name, version_spec == 'unknown') | |
install_node(node_name, is_all=is_all, cnt_msg=cnt_msg) | |
def fix_node(node_spec_str, is_all=False, cnt_msg=''): | |
node_spec = unified_manager.resolve_node_spec(node_spec_str, guess_mode='active') | |
if node_spec is None: | |
if not is_all: | |
if unified_manager.resolve_node_spec(node_spec_str, guess_mode='inactive') is not None: | |
print(f"{cnt_msg} [ SKIPPED ]: {node_spec_str:50} => Disabled") | |
else: | |
print(f"{cnt_msg} [ SKIPPED ]: {node_spec_str:50} => Not installed") | |
return | |
node_name, version_spec, _ = node_spec | |
print(f"{cnt_msg} [ FIXING ]: {node_name:50}[{version_spec}]") | |
res = unified_manager.unified_fix(node_name, version_spec, no_deps=cmd_ctx.no_deps) | |
if not res.result: | |
print(f"ERROR: f{res.msg}") | |
def uninstall_node(node_spec_str: str, is_all: bool = False, cnt_msg: str = ''): | |
spec = node_spec_str.split('@') | |
if len(spec) == 2 and spec[1] == 'unknown': | |
node_name = spec[0] | |
is_unknown = True | |
else: | |
node_name = spec[0] | |
is_unknown = False | |
res = unified_manager.unified_uninstall(node_name, is_unknown) | |
if len(spec) == 1 and res.action == 'skip' and not is_unknown: | |
res = unified_manager.unified_uninstall(node_name, True) | |
if res.action == 'skip': | |
print(f"{cnt_msg} [ SKIPPED ]: {node_name:50} => Not installed") | |
elif res.result: | |
print(f"{cnt_msg} [UNINSTALLED] {node_name:50}") | |
else: | |
print(f"ERROR: An error occurred while uninstalling '{node_name}'.") | |
def update_node(node_spec_str, is_all=False, cnt_msg=''): | |
node_spec = unified_manager.resolve_node_spec(node_spec_str, 'active') | |
if node_spec is None: | |
if unified_manager.resolve_node_spec(node_spec_str, 'inactive'): | |
print(f"{cnt_msg} [ SKIPPED ]: {node_spec_str:50} => Disabled") | |
else: | |
print(f"{cnt_msg} [ SKIPPED ]: {node_spec_str:50} => Not installed") | |
return None | |
node_name, version_spec, _ = node_spec | |
res = unified_manager.unified_update(node_name, version_spec, no_deps=cmd_ctx.no_deps, return_postinstall=True) | |
if not res.result: | |
print(f"ERROR: An error occurred while updating '{node_name}'.") | |
elif res.action == 'skip': | |
print(f"{cnt_msg} [ SKIPPED ]: {node_name:50} => {res.msg}") | |
else: | |
print(f"{cnt_msg} [ UPDATED ]: {node_name:50} => ({version_spec} -> {res.target})") | |
return res.with_target(f'{node_name}@{res.target}') | |
def update_parallel(nodes): | |
is_all = False | |
if 'all' in nodes: | |
is_all = True | |
nodes = [] | |
for x in unified_manager.active_nodes.keys(): | |
nodes.append(x) | |
for x in unified_manager.unknown_active_nodes.keys(): | |
nodes.append(x+"@unknown") | |
else: | |
nodes = [x for x in nodes if x.lower() not in ['comfy', 'comfyui']] | |
total = len(nodes) | |
lock = threading.Lock() | |
processed = [] | |
i = 0 | |
def process_custom_node(x): | |
nonlocal i | |
nonlocal processed | |
with lock: | |
i += 1 | |
try: | |
res = update_node(x, is_all=is_all, cnt_msg=f'{i}/{total}') | |
with lock: | |
processed.append(res) | |
except Exception as e: | |
print(f"ERROR: {e}") | |
traceback.print_exc() | |
with concurrent.futures.ThreadPoolExecutor(4) as executor: | |
for item in nodes: | |
executor.submit(process_custom_node, item) | |
i = 1 | |
for res in processed: | |
if res is not None: | |
print(f"[{i}/{total}] Post update: {res.target}") | |
if res.postinstall is not None: | |
res.postinstall() | |
i += 1 | |
def update_comfyui(): | |
res = core.update_path(comfy_path, instant_execution=True) | |
if res == 'fail': | |
print("Updating ComfyUI has failed.") | |
elif res == 'updated': | |
print("ComfyUI is updated.") | |
else: | |
print("ComfyUI is already up to date.") | |
def enable_node(node_spec_str, is_all=False, cnt_msg=''): | |
if unified_manager.resolve_node_spec(node_spec_str, guess_mode='active') is not None: | |
print(f"{cnt_msg} [ SKIP ] {node_spec_str:50} => Already enabled") | |
return | |
node_spec = unified_manager.resolve_node_spec(node_spec_str, guess_mode='inactive') | |
if node_spec is None: | |
print(f"{cnt_msg} [ SKIP ] {node_spec_str:50} => Not found") | |
return | |
node_name, version_spec, _ = node_spec | |
res = unified_manager.unified_enable(node_name, version_spec) | |
if res.action == 'skip': | |
print(f"{cnt_msg} [ SKIP ] {node_name:50} => {res.msg}") | |
elif res.result: | |
print(f"{cnt_msg} [ENABLED] {node_name:50}") | |
else: | |
print(f"{cnt_msg} [ FAIL ] {node_name:50} => {res.msg}") | |
def disable_node(node_spec_str: str, is_all=False, cnt_msg=''): | |
if 'comfyui-manager' in node_spec_str.lower(): | |
return | |
node_spec = unified_manager.resolve_node_spec(node_spec_str, guess_mode='active') | |
if node_spec is None: | |
if unified_manager.resolve_node_spec(node_spec_str, guess_mode='inactive') is not None: | |
print(f"{cnt_msg} [ SKIP ] {node_spec_str:50} => Already disabled") | |
else: | |
print(f"{cnt_msg} [ SKIP ] {node_spec_str:50} => Not found") | |
return | |
node_name, version_spec, _ = node_spec | |
res = unified_manager.unified_disable(node_name, version_spec == 'unknown') | |
if res.action == 'skip': | |
print(f"{cnt_msg} [ SKIP ] {node_name:50} => {res.msg}") | |
elif res.result: | |
print(f"{cnt_msg} [DISABLED] {node_name:50}") | |
else: | |
print(f"{cnt_msg} [ FAIL ] {node_name:50} => {res.msg}") | |
def show_list(kind, simple=False): | |
custom_nodes = asyncio.run(unified_manager.get_custom_nodes(channel=cmd_ctx.channel, mode=cmd_ctx.mode)) | |
# collect not-installed unknown nodes | |
not_installed_unknown_nodes = [] | |
repo_unknown = {} | |
for k, v in custom_nodes.items(): | |
if 'cnr_latest' not in v: | |
if len(v['files']) == 1: | |
repo_url = v['files'][0] | |
node_name = repo_url.split('/')[-1] | |
if node_name not in unified_manager.unknown_inactive_nodes and node_name not in unified_manager.unknown_active_nodes: | |
not_installed_unknown_nodes.append(v) | |
else: | |
repo_unknown[node_name] = v | |
processed = {} | |
unknown_processed = [] | |
flag = kind in ['all', 'cnr', 'installed', 'enabled'] | |
for k, v in unified_manager.active_nodes.items(): | |
if flag: | |
cnr = unified_manager.cnr_map[k] | |
processed[k] = "[ ENABLED ] ", cnr['name'], k, cnr['publisher']['name'], v[0] | |
else: | |
processed[k] = None | |
if flag and kind != 'cnr': | |
for k, v in unified_manager.unknown_active_nodes.items(): | |
item = repo_unknown.get(k) | |
if item is None: | |
continue | |
log_item = "[ ENABLED ] ", item['title'], k, item['author'] | |
unknown_processed.append(log_item) | |
flag = kind in ['all', 'cnr', 'installed', 'disabled'] | |
for k, v in unified_manager.cnr_inactive_nodes.items(): | |
if k in processed: | |
continue | |
if flag: | |
cnr = unified_manager.cnr_map[k] | |
processed[k] = "[ DISABLED ] ", cnr['name'], k, cnr['publisher']['name'], ", ".join(list(v.keys())) | |
else: | |
processed[k] = None | |
for k, v in unified_manager.nightly_inactive_nodes.items(): | |
if k in processed: | |
continue | |
if flag: | |
cnr = unified_manager.cnr_map[k] | |
processed[k] = "[ DISABLED ] ", cnr['name'], k, cnr['publisher']['name'], 'nightly' | |
else: | |
processed[k] = None | |
if flag and kind != 'cnr': | |
for k, v in unified_manager.unknown_inactive_nodes.items(): | |
item = repo_unknown.get(k) | |
if item is None: | |
continue | |
log_item = "[ DISABLED ] ", item['title'], k, item['author'] | |
unknown_processed.append(log_item) | |
flag = kind in ['all', 'cnr', 'not-installed'] | |
for k, v in unified_manager.cnr_map.items(): | |
if k in processed: | |
continue | |
if flag: | |
cnr = unified_manager.cnr_map[k] | |
ver_spec = v['latest_version']['version'] if 'latest_version' in v else '0.0.0' | |
processed[k] = "[ NOT INSTALLED ] ", cnr['name'], k, cnr['publisher']['name'], ver_spec | |
else: | |
processed[k] = None | |
if flag and kind != 'cnr': | |
for x in not_installed_unknown_nodes: | |
if len(x['files']) == 1: | |
node_id = os.path.basename(x['files'][0]) | |
log_item = "[ NOT INSTALLED ] ", x['title'], node_id, x['author'] | |
unknown_processed.append(log_item) | |
for x in processed.values(): | |
if x is None: | |
continue | |
prefix, title, short_id, author, ver_spec = x | |
if simple: | |
print(title+'@'+ver_spec) | |
else: | |
print(f"{prefix} {title:50} {short_id:30} (author: {author:20}) \\[{ver_spec}]") | |
for x in unknown_processed: | |
prefix, title, short_id, author = x | |
if simple: | |
print(title+'@unknown') | |
else: | |
print(f"{prefix} {title:50} {short_id:30} (author: {author:20}) [UNKNOWN]") | |
async def show_snapshot(simple_mode=False): | |
json_obj = await core.get_current_snapshot() | |
if simple_mode: | |
print(f"[{json_obj['comfyui']}] comfyui") | |
for k, v in json_obj['git_custom_nodes'].items(): | |
print(f"[{v['hash']}] {k}") | |
for v in json_obj['file_custom_nodes']: | |
print(f"[ N/A ] {v['filename']}") | |
else: | |
formatted_json = json.dumps(json_obj, ensure_ascii=False, indent=4) | |
print(formatted_json) | |
def show_snapshot_list(simple_mode=False): | |
snapshot_path = cmd_ctx.get_snapshot_path() | |
files = os.listdir(snapshot_path) | |
json_files = [x for x in files if x.endswith('.json')] | |
for x in sorted(json_files): | |
print(x) | |
def cancel(): | |
if os.path.exists(cmd_ctx.get_startup_scripts_path()): | |
os.remove(cmd_ctx.get_startup_scripts_path()) | |
if os.path.exists(cmd_ctx.get_restore_snapshot_path()): | |
os.remove(cmd_ctx.get_restore_snapshot_path()) | |
async def auto_save_snapshot(): | |
path = await core.save_snapshot_with_postfix('cli-autosave') | |
print(f"Current snapshot is saved as `{path}`") | |
def get_all_installed_node_specs(): | |
res = [] | |
processed = set() | |
for k, v in unified_manager.active_nodes.items(): | |
node_spec_str = f"{k}@{v[0]}" | |
res.append(node_spec_str) | |
processed.add(k) | |
for k in unified_manager.cnr_inactive_nodes.keys(): | |
if k in processed: | |
continue | |
latest = unified_manager.get_from_cnr_inactive_nodes(k) | |
if latest is not None: | |
node_spec_str = f"{k}@{str(latest[0])}" | |
res.append(node_spec_str) | |
for k in unified_manager.nightly_inactive_nodes.keys(): | |
if k in processed: | |
continue | |
node_spec_str = f"{k}@nightly" | |
res.append(node_spec_str) | |
for k in unified_manager.unknown_active_nodes.keys(): | |
node_spec_str = f"{k}@unknown" | |
res.append(node_spec_str) | |
for k in unified_manager.unknown_inactive_nodes.keys(): | |
node_spec_str = f"{k}@unknown" | |
res.append(node_spec_str) | |
return res | |
def for_each_nodes(nodes, act, allow_all=True): | |
is_all = False | |
if allow_all and 'all' in nodes: | |
is_all = True | |
nodes = get_all_installed_node_specs() | |
else: | |
nodes = [x for x in nodes if x.lower() not in ['comfy', 'comfyui', 'all']] | |
total = len(nodes) | |
i = 1 | |
for x in nodes: | |
try: | |
act(x, is_all=is_all, cnt_msg=f'{i}/{total}') | |
except Exception as e: | |
print(f"ERROR: {e}") | |
traceback.print_exc() | |
i += 1 | |
app = typer.Typer() | |
def help(ctx: typer.Context): | |
print(ctx.find_root().get_help()) | |
ctx.exit(0) | |
def install( | |
nodes: List[str] = typer.Argument( | |
..., help="List of custom nodes to install" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
no_deps: Annotated[ | |
Optional[bool], | |
typer.Option( | |
"--no-deps", | |
show_default=False, | |
help="Skip installing any Python dependencies", | |
), | |
] = False, | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
cmd_ctx.set_no_deps(no_deps) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for_each_nodes(nodes, act=install_node) | |
pip_fixer.fix_broken() | |
def reinstall( | |
nodes: List[str] = typer.Argument( | |
..., help="List of custom nodes to reinstall" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
no_deps: Annotated[ | |
Optional[bool], | |
typer.Option( | |
"--no-deps", | |
show_default=False, | |
help="Skip installing any Python dependencies", | |
), | |
] = False, | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
cmd_ctx.set_no_deps(no_deps) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for_each_nodes(nodes, act=reinstall_node) | |
pip_fixer.fix_broken() | |
def uninstall( | |
nodes: List[str] = typer.Argument( | |
..., help="List of custom nodes to uninstall" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
): | |
cmd_ctx.set_channel_mode(channel, mode) | |
for_each_nodes(nodes, act=uninstall_node) | |
def update( | |
nodes: List[str] = typer.Argument( | |
..., | |
help="[all|List of custom nodes to update]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if 'all' in nodes: | |
asyncio.run(auto_save_snapshot()) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for x in nodes: | |
if x.lower() in ['comfyui', 'comfy', 'all']: | |
update_comfyui() | |
break | |
update_parallel(nodes) | |
pip_fixer.fix_broken() | |
def disable( | |
nodes: List[str] = typer.Argument( | |
..., | |
help="[all|List of custom nodes to disable]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if 'all' in nodes: | |
asyncio.run(auto_save_snapshot()) | |
for_each_nodes(nodes, disable_node, allow_all=True) | |
def enable( | |
nodes: List[str] = typer.Argument( | |
..., | |
help="[all|List of custom nodes to enable]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if 'all' in nodes: | |
asyncio.run(auto_save_snapshot()) | |
for_each_nodes(nodes, enable_node, allow_all=True) | |
def fix( | |
nodes: List[str] = typer.Argument( | |
..., | |
help="[all|List of custom nodes to fix]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if 'all' in nodes: | |
asyncio.run(auto_save_snapshot()) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for_each_nodes(nodes, fix_node, allow_all=True) | |
pip_fixer.fix_broken() | |
def show_versions(node_name: str): | |
versions = cnr_utils.all_versions_of_node(node_name) | |
if versions is None: | |
print(f"Node not found in Comfy Registry: {node_name}") | |
for x in versions: | |
print(f"[{x['createdAt'][:10]}] {x['version']} -- {x['changelog']}") | |
def show( | |
arg: str = typer.Argument( | |
help="[installed|enabled|not-installed|disabled|all|cnr|snapshot|snapshot-list]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
valid_commands = [ | |
"installed", | |
"enabled", | |
"not-installed", | |
"disabled", | |
"all", | |
"cnr", | |
"snapshot", | |
"snapshot-list", | |
] | |
if arg not in valid_commands: | |
typer.echo(f"Invalid command: `show {arg}`", err=True) | |
exit(1) | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if arg == 'snapshot': | |
show_snapshot() | |
elif arg == 'snapshot-list': | |
show_snapshot_list() | |
else: | |
show_list(arg) | |
def simple_show( | |
arg: str = typer.Argument( | |
help="[installed|enabled|not-installed|disabled|all|snapshot|snapshot-list]" | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
valid_commands = [ | |
"installed", | |
"enabled", | |
"not-installed", | |
"disabled", | |
"all", | |
"snapshot", | |
"snapshot-list", | |
] | |
if arg not in valid_commands: | |
typer.echo(f"[bold red]Invalid command: `show {arg}`[/bold red]", err=True) | |
exit(1) | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
if arg == 'snapshot': | |
show_snapshot(True) | |
elif arg == 'snapshot-list': | |
show_snapshot_list(True) | |
else: | |
show_list(arg, True) | |
def cli_only_mode( | |
mode: str = typer.Argument( | |
..., help="[enable|disable]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cli_mode_flag = os.path.join(cmd_ctx.manager_files_directory, '.enable-cli-only-mode') | |
if mode.lower() == 'enable': | |
with open(cli_mode_flag, 'w'): | |
pass | |
print("\nINFO: `cli-only-mode` is enabled\n") | |
elif mode.lower() == 'disable': | |
if os.path.exists(cli_mode_flag): | |
os.remove(cli_mode_flag) | |
print("\nINFO: `cli-only-mode` is disabled\n") | |
else: | |
print(f"\n[bold red]Invalid value for cli-only-mode: {mode}[/bold red]\n") | |
exit(1) | |
def deps_in_workflow( | |
workflow: Annotated[ | |
str, typer.Option(show_default=False, help="Workflow file (.json/.png)") | |
], | |
output: Annotated[ | |
str, typer.Option(show_default=False, help="Output file (.json)") | |
], | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
input_path = workflow | |
output_path = output | |
if not os.path.exists(input_path): | |
print(f"[bold red]File not found: {input_path}[/bold red]") | |
exit(1) | |
used_exts, unknown_nodes = asyncio.run(core.extract_nodes_from_workflow(input_path, mode=cmd_ctx.mode, channel_url=cmd_ctx.channel)) | |
custom_nodes = {} | |
for x in used_exts: | |
custom_nodes[x] = {'state': core.simple_check_custom_node(x), | |
'hash': '-' | |
} | |
res = { | |
'custom_nodes': custom_nodes, | |
'unknown_nodes': list(unknown_nodes) | |
} | |
with open(output_path, "w", encoding='utf-8') as output_file: | |
json.dump(res, output_file, indent=4) | |
print(f"Workflow dependencies are being saved into {output_path}.") | |
def save_snapshot( | |
output: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, help="Specify the output file path. (.json/.yaml)" | |
), | |
] = None, | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
path = asyncio.run(core.save_snapshot_with_postfix('snapshot', output)) | |
print(f"Current snapshot is saved as `{path}`") | |
def restore_snapshot( | |
snapshot_name: str, | |
pip_non_url: Optional[bool] = typer.Option( | |
default=None, | |
show_default=False, | |
is_flag=True, | |
help="Restore for pip packages registered on PyPI.", | |
), | |
pip_non_local_url: Optional[bool] = typer.Option( | |
default=None, | |
show_default=False, | |
is_flag=True, | |
help="Restore for pip packages registered at web URLs.", | |
), | |
pip_local_url: Optional[bool] = typer.Option( | |
default=None, | |
show_default=False, | |
is_flag=True, | |
help="Restore for pip packages specified by local paths.", | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
extras = [] | |
if pip_non_url: | |
extras.append('--pip-non-url') | |
if pip_non_local_url: | |
extras.append('--pip-non-local-url') | |
if pip_local_url: | |
extras.append('--pip-local-url') | |
print(f"PIPs restore mode: {extras}") | |
if os.path.exists(snapshot_name): | |
snapshot_path = os.path.abspath(snapshot_name) | |
else: | |
snapshot_path = os.path.join(cmd_ctx.get_snapshot_path(), snapshot_name) | |
if not os.path.exists(snapshot_path): | |
print(f"[bold red]ERROR: `{snapshot_path}` is not exists.[/bold red]") | |
exit(1) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
try: | |
asyncio.run(core.restore_snapshot(snapshot_path, extras)) | |
except Exception: | |
print("[bold red]ERROR: Failed to restore snapshot.[/bold red]") | |
traceback.print_exc() | |
raise typer.Exit(code=1) | |
pip_fixer.fix_broken() | |
def restore_dependencies( | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
node_paths = [] | |
for base_path in cmd_ctx.get_custom_nodes_paths(): | |
for name in os.listdir(base_path): | |
target = os.path.join(base_path, name) | |
if os.path.isdir(target) and not name.endswith('.disabled'): | |
node_paths.append(target) | |
total = len(node_paths) | |
i = 1 | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for x in node_paths: | |
print("----------------------------------------------------------------------------------------------------") | |
print(f"Restoring [{i}/{total}]: {x}") | |
unified_manager.execute_install_script('', x, instant_execution=True) | |
i += 1 | |
pip_fixer.fix_broken() | |
def post_install( | |
path: str = typer.Argument( | |
help="path to custom node", | |
) | |
): | |
path = os.path.expanduser(path) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
unified_manager.execute_install_script('', path, instant_execution=True) | |
pip_fixer.fix_broken() | |
def install_deps( | |
deps: str = typer.Argument( | |
help="Dependency spec file (.json)", | |
), | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
asyncio.run(auto_save_snapshot()) | |
if not os.path.exists(deps): | |
print(f"[bold red]File not found: {deps}[/bold red]") | |
exit(1) | |
else: | |
with open(deps, 'r', encoding="UTF-8", errors="ignore") as json_file: | |
try: | |
json_obj = json.load(json_file) | |
except: | |
print(f"[bold red]Invalid json file: {deps}[/bold red]") | |
exit(1) | |
pip_fixer = manager_util.PIPFixer(manager_util.get_installed_packages()) | |
for k in json_obj['custom_nodes'].keys(): | |
state = core.simple_check_custom_node(k) | |
if state == 'installed': | |
continue | |
elif state == 'not-installed': | |
asyncio.run(core.gitclone_install(k, instant_execution=True)) | |
else: # disabled | |
core.gitclone_set_active([k], False) | |
pip_fixer.fix_broken() | |
print("Dependency installation and activation complete.") | |
def clear(): | |
cancel() | |
def export_custom_node_ids( | |
path: str, | |
channel: Annotated[ | |
str, | |
typer.Option( | |
show_default=False, | |
help="Specify the operation mode" | |
), | |
] = None, | |
mode: str = typer.Option( | |
None, | |
help="[remote|local|cache]" | |
), | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
), | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
cmd_ctx.set_channel_mode(channel, mode) | |
with open(path, "w", encoding='utf-8') as output_file: | |
for x in unified_manager.cnr_map.keys(): | |
print(x, file=output_file) | |
custom_nodes = asyncio.run(unified_manager.get_custom_nodes(channel=cmd_ctx.channel, mode=cmd_ctx.mode)) | |
for x in custom_nodes.values(): | |
if 'cnr_latest' not in x: | |
if len(x['files']) == 1: | |
repo_url = x['files'][0] | |
node_id = repo_url.split('/')[-1] | |
print(f"{node_id}@unknown", file=output_file) | |
if 'id' in x: | |
print(f"{x['id']}@unknown", file=output_file) | |
def migrate( | |
user_directory: str = typer.Option( | |
None, | |
help="user directory" | |
) | |
): | |
cmd_ctx.set_user_directory(user_directory) | |
asyncio.run(unified_manager.migrate_unmanaged_nodes()) | |
if __name__ == '__main__': | |
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) | |
sys.exit(app()) | |
print("") | |