Spaces:
Sleeping
Sleeping
import logging | |
import shutil | |
import sys | |
import textwrap | |
import xmlrpc.client | |
from collections import OrderedDict | |
from optparse import Values | |
from typing import TYPE_CHECKING, Dict, List, Optional | |
from pip._vendor.packaging.version import parse as parse_version | |
from pip._internal.cli.base_command import Command | |
from pip._internal.cli.req_command import SessionCommandMixin | |
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS | |
from pip._internal.exceptions import CommandError | |
from pip._internal.metadata import get_default_environment | |
from pip._internal.models.index import PyPI | |
from pip._internal.network.xmlrpc import PipXmlrpcTransport | |
from pip._internal.utils.logging import indent_log | |
from pip._internal.utils.misc import write_output | |
if TYPE_CHECKING:
    # Imported and declared for static type checkers only; the annotations in
    # this module refer to TransformedHit by its quoted name.
    from typing import TypedDict

    # One package entry as built by transform_hits().
    class TransformedHit(TypedDict):
        name: str  # package name
        summary: str  # summary text shown next to the name
        versions: List[str]  # every version string collected for the package


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
%prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the -i/--index option and attach the option group to the parser."""
        index_option = {
            'dest': 'index',
            'metavar': 'URL',
            'default': PyPI.pypi_url,
            'help': 'Base URL of Python Package Index (default %default)',
        }
        self.cmd_opts.add_option('-i', '--index', **index_option)
        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """
        Run the search and print the results.

        Returns SUCCESS when the index returned at least one hit and
        NO_MATCHES_FOUND otherwise.  Raises CommandError when no query
        terms were given.
        """
        if not args:
            raise CommandError('Missing required argument (search query).')

        raw_hits = self.search(args, options)
        packages = transform_hits(raw_hits)

        # Only ask for a terminal width when stdout is a terminal; passing
        # None leaves summaries unwrapped.
        columns = shutil.get_terminal_size()[0] if sys.stdout.isatty() else None
        print_results(packages, terminal_width=columns)

        return SUCCESS if raw_hits else NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """
        Call the index's XML-RPC ``search`` method with *query* applied to
        both the name and the summary field, combined with 'or'.

        Returns the list of hits as received.  An XML-RPC fault is re-raised
        as a CommandError carrying the fault code and fault string.
        """
        index_url = options.index
        transport = PipXmlrpcTransport(
            index_url, self.get_default_session(options)
        )
        proxy = xmlrpc.client.ServerProxy(index_url, transport)
        criteria = {'name': query, 'summary': query}
        try:
            results = proxy.search(criteria, 'or')
        except xmlrpc.client.Fault as fault:
            template = "XMLRPC request failed [code: {code}]\n{string}"
            raise CommandError(
                template.format(
                    code=fault.faultCode,
                    string=fault.faultString,
                )
            )
        # The proxy call is untyped; check that the reply is a list before
        # handing it back under the declared return type.
        assert isinstance(results, list)
        return results
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    Group per-version search hits into one entry per package.

    Each incoming hit describes a single version of a package.  The result
    has one dict per package name, in order of first appearance, holding
    the name, a summary and the list of all versions seen.  The summary is
    taken from the hit carrying the highest version seen so far.
    """
    by_name: Dict[str, "TransformedHit"] = OrderedDict()
    for entry in hits:
        pkg_name, pkg_summary, pkg_version = (
            entry['name'],
            entry['summary'],
            entry['version'],
        )

        known = by_name.get(pkg_name)
        if known is None:
            # First time this package shows up: start its entry.
            by_name[pkg_name] = {
                'name': pkg_name,
                'summary': pkg_summary,
                'versions': [pkg_version],
            }
            continue

        known['versions'].append(pkg_version)
        # If this hit is now the highest version, its summary wins.
        if highest_version(known['versions']) == pkg_version:
            known['summary'] = pkg_summary

    return list(by_name.values())
def print_dist_installation_info(name: str, latest: str) -> None:
    """
    Write the installed version of *name*, if there is one, and how it
    relates to *latest*.

    Nothing is written when the default environment has no distribution
    called *name*.  Otherwise the lines are emitted inside an indent_log()
    block: the installed version, and, when it differs from *latest*, the
    latest version (with a hint about --pre when *latest* is a pre-release).
    """
    installed = get_default_environment().get_distribution(name)
    if installed is None:
        return

    with indent_log():
        # NOTE(review): if `installed.version` is a parsed version object
        # rather than a str, this equality may never hold — confirm.
        if installed.version == latest:
            write_output('INSTALLED: %s (latest)', installed.version)
            return

        write_output('INSTALLED: %s', installed.version)
        if parse_version(latest).pre:
            write_output('LATEST: %s (pre-release; install'
                         ' with "pip install --pre")', latest)
        else:
            write_output('LATEST: %s', latest)
def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """
    Write one "name (latest) - summary" line per hit, each followed by the
    installation info for that package.

    hits: aggregated search results; nothing is written when empty.
    name_column_width: width the "name (latest)" column is padded to.  When
        None it is the longest name-plus-latest-version length, plus 4.
    terminal_width: when given, and more than 10 columns remain after the
        name column, summaries are wrapped to that remaining width and
        continuation lines are indented by name_column_width + 3 spaces.
    """
    if not hits:
        return

    def newest(entry: "TransformedHit") -> str:
        # A hit without a 'versions' key is shown with '-' as its version.
        return highest_version(entry.get('versions', ['-']))

    if name_column_width is None:
        widest = max(len(entry['name']) + len(newest(entry)) for entry in hits)
        name_column_width = widest + 4

    # The wrap width and hanging indent depend only on the two widths, so
    # they are worked out once rather than per hit.
    wrap_at = None
    hanging_indent = ''
    if terminal_width is not None:
        room = terminal_width - name_column_width - 5
        if room > 10:
            wrap_at = room
            hanging_indent = '\n' + ' ' * (name_column_width + 3)

    for entry in hits:
        pkg_name = entry['name']
        blurb = entry['summary'] or ''
        latest = newest(entry)
        if wrap_at is not None:
            # wrap and indent summary to fit terminal
            blurb = hanging_indent.join(textwrap.wrap(blurb, wrap_at))

        label = f'{pkg_name} ({latest})'
        line = f'{label:{name_column_width}} - {blurb}'
        try:
            write_output(line)
            print_dist_installation_info(pkg_name, latest)
        except UnicodeEncodeError:
            # A UnicodeEncodeError raised while writing this entry is
            # deliberately ignored; the loop moves on to the next hit.
            pass
def highest_version(versions: List[str]) -> str:
    """
    Return the string in *versions* whose parsed version compares greatest.

    The strings are ranked by parse_version() but the original string is
    returned unchanged.  An empty list makes max() raise ValueError.
    """
    newest = max(versions, key=parse_version)
    return newest