MaykaGR commited on
Commit
f5996a0
·
verified ·
1 Parent(s): 106955a

Upload folder_paths.py

Browse files
Files changed (1) hide show
  1. folder_paths.py +392 -0
folder_paths.py ADDED
@@ -0,0 +1,392 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import os
4
+ import time
5
+ import mimetypes
6
+ import logging
7
+ from typing import Literal
8
+ from collections.abc import Collection
9
+
10
+ from comfy.cli_args import args
11
+
12
+ supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'}
13
+
14
+ folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {}
15
+
16
+ # --base-directory - Resets all default paths configured in folder_paths with a new base path
17
+ if args.base_directory:
18
+ base_path = os.path.abspath(args.base_directory)
19
+ else:
20
+ base_path = os.path.dirname(os.path.realpath(__file__))
21
+
22
+ models_dir = os.path.join(base_path, "models")
23
+ folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions)
24
+ folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"])
25
+
26
+ folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions)
27
+ folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions)
28
+ folder_names_and_paths["text_encoders"] = ([os.path.join(models_dir, "text_encoders"), os.path.join(models_dir, "clip")], supported_pt_extensions)
29
+ folder_names_and_paths["diffusion_models"] = ([os.path.join(models_dir, "unet"), os.path.join(models_dir, "diffusion_models")], supported_pt_extensions)
30
+ folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions)
31
+ folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions)
32
+ folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions)
33
+ folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"])
34
+ folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions)
35
+
36
+ folder_names_and_paths["controlnet"] = ([os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")], supported_pt_extensions)
37
+ folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions)
38
+
39
+ folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions)
40
+
41
+ folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], set())
42
+
43
+ folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions)
44
+
45
+ folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions)
46
+
47
+ folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
48
+
49
+ output_directory = os.path.join(base_path, "output")
50
+ temp_directory = os.path.join(base_path, "temp")
51
+ input_directory = os.path.join(base_path, "input")
52
+ user_directory = os.path.join(base_path, "user")
53
+
54
+ filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {}
55
+
56
+ class CacheHelper:
57
+ """
58
+ Helper class for managing file list cache data.
59
+ """
60
+ def __init__(self):
61
+ self.cache: dict[str, tuple[list[str], dict[str, float], float]] = {}
62
+ self.active = False
63
+
64
+ def get(self, key: str, default=None) -> tuple[list[str], dict[str, float], float]:
65
+ if not self.active:
66
+ return default
67
+ return self.cache.get(key, default)
68
+
69
+ def set(self, key: str, value: tuple[list[str], dict[str, float], float]) -> None:
70
+ if self.active:
71
+ self.cache[key] = value
72
+
73
+ def clear(self):
74
+ self.cache.clear()
75
+
76
+ def __enter__(self):
77
+ self.active = True
78
+ return self
79
+
80
+ def __exit__(self, exc_type, exc_value, traceback):
81
+ self.active = False
82
+ self.clear()
83
+
84
+ cache_helper = CacheHelper()
85
+
86
+ extension_mimetypes_cache = {
87
+ "webp" : "image",
88
+ }
89
+
90
+ def map_legacy(folder_name: str) -> str:
91
+ legacy = {"unet": "diffusion_models",
92
+ "clip": "text_encoders"}
93
+ return legacy.get(folder_name, folder_name)
94
+
95
+ if not os.path.exists(input_directory):
96
+ try:
97
+ os.makedirs(input_directory)
98
+ except:
99
+ logging.error("Failed to create input directory")
100
+
101
+ def set_output_directory(output_dir: str) -> None:
102
+ global output_directory
103
+ output_directory = output_dir
104
+
105
+ def set_temp_directory(temp_dir: str) -> None:
106
+ global temp_directory
107
+ temp_directory = temp_dir
108
+
109
+ def set_input_directory(input_dir: str) -> None:
110
+ global input_directory
111
+ input_directory = input_dir
112
+
113
+ def get_output_directory() -> str:
114
+ global output_directory
115
+ return output_directory
116
+
117
+ def get_temp_directory() -> str:
118
+ global temp_directory
119
+ return temp_directory
120
+
121
+ def get_input_directory() -> str:
122
+ global input_directory
123
+ return input_directory
124
+
125
+ def get_user_directory() -> str:
126
+ return user_directory
127
+
128
+ def set_user_directory(user_dir: str) -> None:
129
+ global user_directory
130
+ user_directory = user_dir
131
+
132
+
133
+ #NOTE: used in http server so don't put folders that should not be accessed remotely
134
+ def get_directory_by_type(type_name: str) -> str | None:
135
+ if type_name == "output":
136
+ return get_output_directory()
137
+ if type_name == "temp":
138
+ return get_temp_directory()
139
+ if type_name == "input":
140
+ return get_input_directory()
141
+ return None
142
+
143
+ def filter_files_content_types(files: list[str], content_types: Literal["image", "video", "audio"]) -> list[str]:
144
+ """
145
+ Example:
146
+ files = os.listdir(folder_paths.get_input_directory())
147
+ filter_files_content_types(files, ["image", "audio", "video"])
148
+ """
149
+ global extension_mimetypes_cache
150
+ result = []
151
+ for file in files:
152
+ extension = file.split('.')[-1]
153
+ if extension not in extension_mimetypes_cache:
154
+ mime_type, _ = mimetypes.guess_type(file, strict=False)
155
+ if not mime_type:
156
+ continue
157
+ content_type = mime_type.split('/')[0]
158
+ extension_mimetypes_cache[extension] = content_type
159
+ else:
160
+ content_type = extension_mimetypes_cache[extension]
161
+
162
+ if content_type in content_types:
163
+ result.append(file)
164
+ return result
165
+
166
+ # determine base_dir rely on annotation if name is 'filename.ext [annotation]' format
167
+ # otherwise use default_path as base_dir
168
+ def annotated_filepath(name: str) -> tuple[str, str | None]:
169
+ if name.endswith("[output]"):
170
+ base_dir = get_output_directory()
171
+ name = name[:-9]
172
+ elif name.endswith("[input]"):
173
+ base_dir = get_input_directory()
174
+ name = name[:-8]
175
+ elif name.endswith("[temp]"):
176
+ base_dir = get_temp_directory()
177
+ name = name[:-7]
178
+ else:
179
+ return name, None
180
+
181
+ return name, base_dir
182
+
183
+
184
+ def get_annotated_filepath(name: str, default_dir: str | None=None) -> str:
185
+ name, base_dir = annotated_filepath(name)
186
+
187
+ if base_dir is None:
188
+ if default_dir is not None:
189
+ base_dir = default_dir
190
+ else:
191
+ base_dir = get_input_directory() # fallback path
192
+
193
+ return os.path.join(base_dir, name)
194
+
195
+
196
+ def exists_annotated_filepath(name) -> bool:
197
+ name, base_dir = annotated_filepath(name)
198
+
199
+ if base_dir is None:
200
+ base_dir = get_input_directory() # fallback path
201
+
202
+ filepath = os.path.join(base_dir, name)
203
+ return os.path.exists(filepath)
204
+
205
+
206
+ def add_model_folder_path(folder_name: str, full_folder_path: str, is_default: bool = False) -> None:
207
+ global folder_names_and_paths
208
+ folder_name = map_legacy(folder_name)
209
+ if folder_name in folder_names_and_paths:
210
+ paths, _exts = folder_names_and_paths[folder_name]
211
+ if full_folder_path in paths:
212
+ if is_default and paths[0] != full_folder_path:
213
+ # If the path to the folder is not the first in the list, move it to the beginning.
214
+ paths.remove(full_folder_path)
215
+ paths.insert(0, full_folder_path)
216
+ else:
217
+ if is_default:
218
+ paths.insert(0, full_folder_path)
219
+ else:
220
+ paths.append(full_folder_path)
221
+ else:
222
+ folder_names_and_paths[folder_name] = ([full_folder_path], set())
223
+
224
+ def get_folder_paths(folder_name: str) -> list[str]:
225
+ folder_name = map_legacy(folder_name)
226
+ return folder_names_and_paths[folder_name][0][:]
227
+
228
+ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float]]:
229
+ if not os.path.isdir(directory):
230
+ return [], {}
231
+
232
+ if excluded_dir_names is None:
233
+ excluded_dir_names = []
234
+
235
+ result = []
236
+ dirs = {}
237
+
238
+ # Attempt to add the initial directory to dirs with error handling
239
+ try:
240
+ dirs[directory] = os.path.getmtime(directory)
241
+ except FileNotFoundError:
242
+ logging.warning(f"Warning: Unable to access {directory}. Skipping this path.")
243
+
244
+ logging.debug("recursive file list on directory {}".format(directory))
245
+ dirpath: str
246
+ subdirs: list[str]
247
+ filenames: list[str]
248
+
249
+ for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
250
+ subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
251
+ for file_name in filenames:
252
+ try:
253
+ relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory)
254
+ result.append(relative_path)
255
+ except:
256
+ logging.warning(f"Warning: Unable to access {file_name}. Skipping this file.")
257
+ continue
258
+
259
+ for d in subdirs:
260
+ path: str = os.path.join(dirpath, d)
261
+ try:
262
+ dirs[path] = os.path.getmtime(path)
263
+ except FileNotFoundError:
264
+ logging.warning(f"Warning: Unable to access {path}. Skipping this path.")
265
+ continue
266
+ logging.debug("found {} files".format(len(result)))
267
+ return result, dirs
268
+
269
+ def filter_files_extensions(files: Collection[str], extensions: Collection[str]) -> list[str]:
270
+ return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
271
+
272
+
273
+
274
+ def get_full_path(folder_name: str, filename: str) -> str | None:
275
+ global folder_names_and_paths
276
+ folder_name = map_legacy(folder_name)
277
+ if folder_name not in folder_names_and_paths:
278
+ return None
279
+ folders = folder_names_and_paths[folder_name]
280
+ filename = os.path.relpath(os.path.join("/", filename), "/")
281
+ for x in folders[0]:
282
+ full_path = os.path.join(x, filename)
283
+ if os.path.isfile(full_path):
284
+ return full_path
285
+ elif os.path.islink(full_path):
286
+ logging.warning("WARNING path {} exists but doesn't link anywhere, skipping.".format(full_path))
287
+
288
+ return None
289
+
290
+
291
+ def get_full_path_or_raise(folder_name: str, filename: str) -> str:
292
+ full_path = get_full_path(folder_name, filename)
293
+ if full_path is None:
294
+ raise FileNotFoundError(f"Model in folder '{folder_name}' with filename '{filename}' not found.")
295
+ return full_path
296
+
297
+
298
+ def get_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], float]:
299
+ folder_name = map_legacy(folder_name)
300
+ global folder_names_and_paths
301
+ output_list = set()
302
+ folders = folder_names_and_paths[folder_name]
303
+ output_folders = {}
304
+ for x in folders[0]:
305
+ files, folders_all = recursive_search(x, excluded_dir_names=[".git"])
306
+ output_list.update(filter_files_extensions(files, folders[1]))
307
+ output_folders = {**output_folders, **folders_all}
308
+
309
+ return sorted(list(output_list)), output_folders, time.perf_counter()
310
+
311
+ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], float] | None:
312
+ strong_cache = cache_helper.get(folder_name)
313
+ if strong_cache is not None:
314
+ return strong_cache
315
+
316
+ global filename_list_cache
317
+ global folder_names_and_paths
318
+ folder_name = map_legacy(folder_name)
319
+ if folder_name not in filename_list_cache:
320
+ return None
321
+ out = filename_list_cache[folder_name]
322
+
323
+ for x in out[1]:
324
+ time_modified = out[1][x]
325
+ folder = x
326
+ if os.path.getmtime(folder) != time_modified:
327
+ return None
328
+
329
+ folders = folder_names_and_paths[folder_name]
330
+ for x in folders[0]:
331
+ if os.path.isdir(x):
332
+ if x not in out[1]:
333
+ return None
334
+
335
+ return out
336
+
337
+ def get_filename_list(folder_name: str) -> list[str]:
338
+ folder_name = map_legacy(folder_name)
339
+ out = cached_filename_list_(folder_name)
340
+ if out is None:
341
+ out = get_filename_list_(folder_name)
342
+ global filename_list_cache
343
+ filename_list_cache[folder_name] = out
344
+ cache_helper.set(folder_name, out)
345
+ return list(out[0])
346
+
347
+ def get_save_image_path(filename_prefix: str, output_dir: str, image_width=0, image_height=0) -> tuple[str, str, int, str, str]:
348
+ def map_filename(filename: str) -> tuple[int, str]:
349
+ prefix_len = len(os.path.basename(filename_prefix))
350
+ prefix = filename[:prefix_len + 1]
351
+ try:
352
+ digits = int(filename[prefix_len + 1:].split('_')[0])
353
+ except:
354
+ digits = 0
355
+ return digits, prefix
356
+
357
+ def compute_vars(input: str, image_width: int, image_height: int) -> str:
358
+ input = input.replace("%width%", str(image_width))
359
+ input = input.replace("%height%", str(image_height))
360
+ now = time.localtime()
361
+ input = input.replace("%year%", str(now.tm_year))
362
+ input = input.replace("%month%", str(now.tm_mon).zfill(2))
363
+ input = input.replace("%day%", str(now.tm_mday).zfill(2))
364
+ input = input.replace("%hour%", str(now.tm_hour).zfill(2))
365
+ input = input.replace("%minute%", str(now.tm_min).zfill(2))
366
+ input = input.replace("%second%", str(now.tm_sec).zfill(2))
367
+ return input
368
+
369
+ if "%" in filename_prefix:
370
+ filename_prefix = compute_vars(filename_prefix, image_width, image_height)
371
+
372
+ subfolder = os.path.dirname(os.path.normpath(filename_prefix))
373
+ filename = os.path.basename(os.path.normpath(filename_prefix))
374
+
375
+ full_output_folder = os.path.join(output_dir, subfolder)
376
+
377
+ if os.path.commonpath((output_dir, os.path.abspath(full_output_folder))) != output_dir:
378
+ err = "**** ERROR: Saving image outside the output folder is not allowed." + \
379
+ "\n full_output_folder: " + os.path.abspath(full_output_folder) + \
380
+ "\n output_dir: " + output_dir + \
381
+ "\n commonpath: " + os.path.commonpath((output_dir, os.path.abspath(full_output_folder)))
382
+ logging.error(err)
383
+ raise Exception(err)
384
+
385
+ try:
386
+ counter = max(filter(lambda a: os.path.normcase(a[1][:-1]) == os.path.normcase(filename) and a[1][-1] == "_", map(map_filename, os.listdir(full_output_folder))))[0] + 1
387
+ except ValueError:
388
+ counter = 1
389
+ except FileNotFoundError:
390
+ os.makedirs(full_output_folder, exist_ok=True)
391
+ counter = 1
392
+ return full_output_folder, filename, counter, subfolder, filename_prefix