File size: 9,012 Bytes
ea58bad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
from __future__ import annotations
import configparser
import os
import threading
import re
from modules import shared, errors, cache, scripts
from modules.gitpython_hack import Repo
from modules.paths_internal import extensions_dir, extensions_builtin_dir, script_path # noqa: F401
os.makedirs(extensions_dir, exist_ok=True)
def active():
if shared.cmd_opts.disable_all_extensions or shared.opts.disable_all_extensions == "all":
return []
elif shared.cmd_opts.disable_extra_extensions or shared.opts.disable_all_extensions == "extra":
return [x for x in extensions if x.enabled and x.is_builtin]
else:
return [x for x in extensions if x.enabled]
class ExtensionMetadata:
filename = "metadata.ini"
config: configparser.ConfigParser
canonical_name: str
requires: list
def __init__(self, path, canonical_name):
self.config = configparser.ConfigParser()
filepath = os.path.join(path, self.filename)
# `self.config.read()` will quietly swallow OSErrors (which FileNotFoundError is),
# so no need to check whether the file exists beforehand.
try:
self.config.read(filepath)
except Exception:
errors.report(f"Error reading {self.filename} for extension {canonical_name}.", exc_info=True)
self.canonical_name = self.config.get("Extension", "Name", fallback=canonical_name)
self.canonical_name = canonical_name.lower().strip()
self.requires = self.get_script_requirements("Requires", "Extension")
def get_script_requirements(self, field, section, extra_section=None):
"""reads a list of requirements from the config; field is the name of the field in the ini file,
like Requires or Before, and section is the name of the [section] in the ini file; additionally,
reads more requirements from [extra_section] if specified."""
x = self.config.get(section, field, fallback='')
if extra_section:
x = x + ', ' + self.config.get(extra_section, field, fallback='')
return self.parse_list(x.lower())
def parse_list(self, text):
"""converts a line from config ("ext1 ext2, ext3 ") into a python list (["ext1", "ext2", "ext3"])"""
if not text:
return []
# both "," and " " are accepted as separator
return [x for x in re.split(r"[,\s]+", text.strip()) if x]
class Extension:
lock = threading.Lock()
cached_fields = ['remote', 'commit_date', 'branch', 'commit_hash', 'version']
metadata: ExtensionMetadata
def __init__(self, name, path, enabled=True, is_builtin=False, metadata=None):
self.name = name
self.path = path
self.enabled = enabled
self.status = ''
self.can_update = False
self.is_builtin = is_builtin
self.commit_hash = ''
self.commit_date = None
self.version = ''
self.branch = None
self.remote = None
self.have_info_from_repo = False
self.metadata = metadata if metadata else ExtensionMetadata(self.path, name.lower())
self.canonical_name = metadata.canonical_name
def to_dict(self):
return {x: getattr(self, x) for x in self.cached_fields}
def from_dict(self, d):
for field in self.cached_fields:
setattr(self, field, d[field])
def read_info_from_repo(self):
if self.is_builtin or self.have_info_from_repo:
return
def read_from_repo():
with self.lock:
if self.have_info_from_repo:
return
self.do_read_info_from_repo()
return self.to_dict()
try:
d = cache.cached_data_for_file('extensions-git', self.name, os.path.join(self.path, ".git"), read_from_repo)
self.from_dict(d)
except FileNotFoundError:
pass
self.status = 'unknown' if self.status == '' else self.status
def do_read_info_from_repo(self):
repo = None
try:
if os.path.exists(os.path.join(self.path, ".git")):
repo = Repo(self.path)
except Exception:
errors.report(f"Error reading github repository info from {self.path}", exc_info=True)
if repo is None or repo.bare:
self.remote = None
else:
try:
self.remote = next(repo.remote().urls, None)
commit = repo.head.commit
self.commit_date = commit.committed_date
if repo.active_branch:
self.branch = repo.active_branch.name
self.commit_hash = commit.hexsha
self.version = self.commit_hash[:8]
except Exception:
errors.report(f"Failed reading extension data from Git repository ({self.name})", exc_info=True)
self.remote = None
self.have_info_from_repo = True
def list_files(self, subdir, extension):
dirpath = os.path.join(self.path, subdir)
if not os.path.isdir(dirpath):
return []
res = []
for filename in sorted(os.listdir(dirpath)):
res.append(scripts.ScriptFile(self.path, filename, os.path.join(dirpath, filename)))
res = [x for x in res if os.path.splitext(x.path)[1].lower() == extension and os.path.isfile(x.path)]
return res
def check_updates(self):
repo = Repo(self.path)
for fetch in repo.remote().fetch(dry_run=True):
if fetch.flags != fetch.HEAD_UPTODATE:
self.can_update = True
self.status = "new commits"
return
try:
origin = repo.rev_parse('origin')
if repo.head.commit != origin:
self.can_update = True
self.status = "behind HEAD"
return
except Exception:
self.can_update = False
self.status = "unknown (remote error)"
return
self.can_update = False
self.status = "latest"
def fetch_and_reset_hard(self, commit='origin'):
repo = Repo(self.path)
# Fix: `error: Your local changes to the following files would be overwritten by merge`,
# because WSL2 Docker set 755 file permissions instead of 644, this results to the error.
repo.git.fetch(all=True)
repo.git.reset(commit, hard=True)
self.have_info_from_repo = False
def list_extensions():
extensions.clear()
if shared.cmd_opts.disable_all_extensions:
print("*** \"--disable-all-extensions\" arg was used, will not load any extensions ***")
elif shared.opts.disable_all_extensions == "all":
print("*** \"Disable all extensions\" option was set, will not load any extensions ***")
elif shared.cmd_opts.disable_extra_extensions:
print("*** \"--disable-extra-extensions\" arg was used, will only load built-in extensions ***")
elif shared.opts.disable_all_extensions == "extra":
print("*** \"Disable all extensions\" option was set, will only load built-in extensions ***")
loaded_extensions = {}
# scan through extensions directory and load metadata
for dirname in [extensions_builtin_dir, extensions_dir]:
if not os.path.isdir(dirname):
continue
for extension_dirname in sorted(os.listdir(dirname)):
path = os.path.join(dirname, extension_dirname)
if not os.path.isdir(path):
continue
canonical_name = extension_dirname
metadata = ExtensionMetadata(path, canonical_name)
# check for duplicated canonical names
already_loaded_extension = loaded_extensions.get(metadata.canonical_name)
if already_loaded_extension is not None:
errors.report(f'Duplicate canonical name "{canonical_name}" found in extensions "{extension_dirname}" and "{already_loaded_extension.name}". Former will be discarded.', exc_info=False)
continue
is_builtin = dirname == extensions_builtin_dir
extension = Extension(name=extension_dirname, path=path, enabled=extension_dirname not in shared.opts.disabled_extensions, is_builtin=is_builtin, metadata=metadata)
extensions.append(extension)
loaded_extensions[canonical_name] = extension
# check for requirements
for extension in extensions:
for req in extension.metadata.requires:
required_extension = loaded_extensions.get(req)
if required_extension is None:
errors.report(f'Extension "{extension.name}" requires "{req}" which is not installed.', exc_info=False)
continue
if not extension.enabled:
errors.report(f'Extension "{extension.name}" requires "{required_extension.name}" which is disabled.', exc_info=False)
continue
extensions: list[Extension] = []
|