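# NOTE: "Spaces: Sleeping / Sleeping" was page-header residue from the site
# this file was copied from; it is not part of the module.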
import logging
import os.path
import pathlib
import re
import urllib.parse
import urllib.request
from typing import List, Optional, Tuple

from pip._internal.exceptions import BadCommand, InstallationError
from pip._internal.utils.misc import HiddenText, display_path, hide_url
from pip._internal.utils.subprocess import make_command
from pip._internal.vcs.versioncontrol import (
    AuthInfo,
    RemoteNotFoundError,
    RemoteNotValidError,
    RevOptions,
    VersionControl,
    find_path_to_project_root_from_repo_root,
    vcs,
)
# Short module-level aliases for the URL helpers used further down.
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


# Parses the output of ``git version``. Groups: major, minor, and an optional
# patch component (None when the output has only two numeric parts).
GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

# A full-length (40 hexadecimal characters) commit hash.
HASH_REGEX = re.compile('^[a-fA-F0-9]{40}$')

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
# Groups: optional 'user@', server, server-side path.
SCP_REGEX = re.compile(r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
$""", re.VERBOSE)
def looks_like_hash(sha):
    # type: (str) -> bool
    """Return True if ``sha`` has the form of a full-length commit hash.

    "Full-length" is whatever ``HASH_REGEX`` accepts (40 hexadecimal
    characters); abbreviated hashes return False.
    """
    return bool(HASH_REGEX.match(sha))
class Git(VersionControl):
    """Git implementation of the ``VersionControl`` interface.

    Every Git invocation goes through ``run_command``, which this class does
    not define itself and therefore inherits from ``VersionControl``.
    """

    name = 'git'
    dirname = '.git'
    repo_name = 'clone'
    schemes = (
        'git+http', 'git+https', 'git+ssh', 'git+git', 'git+file',
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ('GIT_DIR', 'GIT_WORK_TREE')
    default_arg_rev = 'HEAD'

    @staticmethod
    def get_base_rev_args(rev):
        # type: (str) -> List[str]
        """Return ``rev`` wrapped in a single-element argument list."""
        return [rev]

    def is_immutable_rev_checkout(self, url, dest):
        # type: (str, str) -> bool
        """
        Return True only if the revision in ``url`` is the commit currently
        checked out in ``dest`` and is not also the name of a tag or branch.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(
            self.get_revision_sha(dest, rev_options.rev)[0]
        )
        return not is_tag_or_branch

    def get_git_version(self) -> Tuple[int, ...]:
        """
        Return the version reported by ``git version`` as a tuple of ints.

        Returns ``()`` if the output does not match ``GIT_VERSION_REGEX``.
        The patch component is optional in that pattern, so a two-part
        version such as ``git version 2.17`` yields ``(2, 17)``.
        """
        version = self.run_command(
            ['version'], show_stdout=False, stdout_only=True
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            return ()
        # An unmatched optional group is reported as None, and int(None)
        # raises TypeError, so skip missing components instead of converting.
        return tuple(int(c) for c in match.groups() if c is not None)

    @classmethod
    def get_current_branch(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ['symbolic-ref', '-q', 'HEAD']
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1, ),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith('refs/heads/'):
            return ref[len('refs/heads/'):]

        return None

    @classmethod
    def get_revision_sha(cls, dest, rev):
        # type: (str, str) -> Tuple[Optional[str], bool]
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        A remote branch (``refs/remotes/origin/<rev>``) takes precedence over
        a tag (``refs/tags/<rev>``) of the same name.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a ``git show-ref`` output line cannot be split into
            exactly a hash and a ref name.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ['show-ref', rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode='ignore',
        )
        refs = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f'unexpected show-ref line: {line!r}')

            refs[ref_name] = ref_sha

        branch_ref = f'refs/remotes/origin/{rev}'
        tag_ref = f'refs/tags/{rev}'

        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest, rev):
        # type: (str, str) -> bool
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(cls, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> RevOptions
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        When the revision names a branch or tag, the returned object also
        gets a ``branch_name`` attribute (the branch name, or None for a
        tag). Otherwise the revision is fetched from ``url`` if
        ``_should_fetch`` says so, and resolved from ``FETCH_HEAD``.

        Args:
          dest: the repository directory.
          url: the remote URL to fetch from when a fetch is needed.
          rev_options: a RevOptions object.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            rev_options.branch_name = rev if is_branch else None

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.warning(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command('fetch', '-q', url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev='FETCH_HEAD')
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest, name):
        # type: (str, Optional[str]) -> bool
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Clone ``url`` into ``dest`` and check out the requested revision.

        If a revision was requested it is resolved first; a branch is checked
        out as a local tracking branch, anything else is checked out only
        when HEAD is not already at that commit. Submodules are updated last.
        """
        rev_display = rev_options.to_display()
        logger.info('Cloning %s%s to %s', url, rev_display, display_path(dest))
        self.run_command(make_command('clone', '-q', url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, 'branch_name', None)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        'checkout', '-q', rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f'origin/{branch_name}'
                cmd_args = [
                    'checkout', '-b', branch_name, '--track', track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: record whatever commit the clone is at.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest)

    def switch(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Point ``remote.origin.url`` of the repository in ``dest`` at ``url``,
        check out the requested revision, then update submodules.
        """
        self.run_command(
            make_command('config', 'remote.origin.url', url),
            cwd=dest,
        )
        cmd_args = make_command('checkout', '-q', rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest)

    def update(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Fetch from the default remote, hard-reset ``dest`` to the resolved
        revision, then update submodules.
        """
        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(['fetch', '-q', '--tags'], cwd=dest)
        else:
            self.run_command(['fetch', '-q'], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command('reset', '--hard', '-q', rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest)

    @classmethod
    def get_remote_url(cls, location):
        # type: (str) -> str
        """
        Return URL of the first remote encountered, preferring ``origin``
        when it is configured.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ['config', '--get-regexp', r'remote\..*\.url'],
            extra_ok_returncodes=(1, ),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        try:
            found_remote = remotes[0]
        except IndexError:
            raise RemoteNotFoundError

        for remote in remotes:
            if remote.startswith('remote.origin.url '):
                found_remote = remote
                break
        # Each line is "<key> <value>". Split only on the first space so a
        # value that itself contains spaces (e.g. a local path) is kept whole.
        url = found_remote.split(' ', 1)[1]
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url):
        # type: (str) -> str
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it though as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location, rev):
        # type: (str, str) -> bool
        """
        Check if rev is a commit that is available in the local repository.

        Returns False when the ``git rev-parse --verify`` call raises
        InstallationError, True otherwise.
        """
        # NOTE(review): "sha^<rev>" does not look like a standard revision
        # expression; "<rev>^{commit}" may have been intended. Left unchanged
        # here -- confirm against the gitrevisions documentation.
        try:
            cls.run_command(
                ['rev-parse', '-q', '--verify', "sha^" + rev],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location, rev=None):
        # type: (str, Optional[str]) -> str
        """
        Return the stripped output of ``git rev-parse <rev>`` run in
        ``location``; ``rev`` defaults to ``HEAD``.
        """
        if rev is None:
            rev = 'HEAD'
        current_rev = cls.run_command(
            ['rev-parse', rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ['rev-parse', '--git-dir'],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            git_dir = os.path.join(location, git_dir)
        # The repo root is taken to be the parent of the git directory.
        repo_root = os.path.abspath(os.path.join(git_dir, '..'))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url):
        # type: (str) -> Tuple[str, Optional[str], AuthInfo]
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith('file'):
            initial_slashes = path[:-len(path.lstrip('/'))]
            newpath = (
                initial_slashes +
                urllib.request.url2pathname(path)
                .replace('\\', '/').lstrip('/')
            )
            after_plus = scheme.find('+') + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if '://' not in url:
            assert 'file:' not in url
            # Add a temporary ssh:// scheme so the base class can parse the
            # stub URL, then strip it again from the result.
            url = url.replace('git+', 'git+ssh://')
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace('ssh://', '')
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location):
        # type: (str) -> None
        """
        Initialise and recursively update submodules in ``location``.

        Does nothing when ``location`` has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, '.gitmodules')):
            return
        cls.run_command(
            ['submodule', 'update', '--init', '--recursive', '-q'],
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the repository root for ``location``, or None if it cannot be
        determined.

        The base-class lookup is tried first; ``git rev-parse
        --show-toplevel`` is the fallback.
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ['rev-parse', '--show-toplevel'],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode='raise',
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug("could not determine if %s is under git control "
                         "because git is not available", location)
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip('\r\n'))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url):
        # type: (str) -> bool
        """In either https or ssh form, requirements must be prefixed with git+.
        """
        return True
# Register the Git backend with the shared ``vcs`` object when this module is
# imported.
vcs.register(Git)