"""upload_docs

Implements a Distutils 'upload_docs' subcommand (upload documentation to
sites other than PyPI such as devpi).
"""
from base64 import standard_b64encode | |
from distutils import log | |
from distutils.errors import DistutilsOptionError | |
import os | |
import socket | |
import zipfile | |
import tempfile | |
import shutil | |
import itertools | |
import functools | |
import http.client | |
import urllib.parse | |
from .._importlib import metadata | |
from ..warnings import SetuptoolsDeprecationWarning | |
from .upload import upload | |
def _encode(s): | |
return s.encode('utf-8', 'surrogateescape') | |
class upload_docs(upload):
    """Distutils ``upload_docs`` command.

    Zips the contents of a documentation directory and submits the archive
    to ``self.repository`` as a ``multipart/form-data`` POST carrying HTTP
    Basic credentials. The command is deprecated; both ``finalize_options``
    and ``run`` emit a deprecation notice.
    """

    # override the default repository as upload_docs isn't
    # supported by Warehouse (and won't be).
    DEFAULT_REPOSITORY = 'https://pypi.python.org/pypi/'

    description = 'Upload documentation to sites other than PyPi such as devpi'

    # NOTE(review): the help text interpolates ``upload.DEFAULT_REPOSITORY``
    # rather than the value overridden above -- confirm this is intended.
    user_options = [
        ('repository=', 'r',
         "url of repository [default: %s]" % upload.DEFAULT_REPOSITORY),
        ('show-response', None,
         'display full response text from server'),
        ('upload-dir=', None, 'directory to upload'),
    ]
    boolean_options = upload.boolean_options

    def has_sphinx(self):
        """Return True when no upload dir is set and ``build_sphinx`` exists.

        ``build_sphinx`` is looked up as an entry point in the
        ``distutils.commands`` group.
        """
        return bool(
            self.upload_dir is None
            and metadata.entry_points(group='distutils.commands', name='build_sphinx')
        )

    # Must be defined after ``has_sphinx`` because the plain function object
    # is referenced here as the sub-command predicate.
    sub_commands = [('build_sphinx', has_sphinx)]

    def initialize_options(self):
        """Reset the inherited options plus ``upload_dir`` and ``target_dir``."""
        upload.initialize_options(self)
        self.upload_dir = None
        self.target_dir = None

    def finalize_options(self):
        """Resolve ``target_dir``, the directory whose contents get uploaded.

        Resolution order:
        1. an explicit ``upload_dir`` option (validated with
           ``ensure_dirname``);
        2. the ``html`` builder target of ``build_sphinx`` when
           ``has_sphinx()`` is true;
        3. ``<build_base>/docs`` from the ``build`` command.
        """
        log.warn(
            "Upload_docs command is deprecated. Use Read the Docs "
            "(https://readthedocs.org) instead.")
        upload.finalize_options(self)
        if self.upload_dir is None:
            if self.has_sphinx():
                build_sphinx = self.get_finalized_command('build_sphinx')
                self.target_dir = dict(build_sphinx.builder_target_dirs)['html']
            else:
                build = self.get_finalized_command('build')
                self.target_dir = os.path.join(build.build_base, 'docs')
        else:
            self.ensure_dirname('upload_dir')
            self.target_dir = self.upload_dir
        self.announce('Using upload directory %s' % self.target_dir)

    def create_zipfile(self, filename):
        """Write every file under ``self.target_dir`` into the zip *filename*.

        Archive member names are relative to ``self.target_dir``.

        Raises:
            DistutilsOptionError: if the top level of ``self.target_dir``
                contains no files.
        """
        # The context manager closes the archive on both success and error.
        with zipfile.ZipFile(filename, "w") as zip_file:
            self.mkpath(self.target_dir)  # just in case
            for root, dirs, files in os.walk(self.target_dir):
                if root == self.target_dir and not files:
                    tmpl = "no files found in upload directory '%s'"
                    raise DistutilsOptionError(tmpl % self.target_dir)
                # Strip the target_dir prefix so members are stored relative
                # to the documentation root.
                relative = root[len(self.target_dir):].lstrip(os.path.sep)
                for name in files:
                    full = os.path.join(root, name)
                    dest = os.path.join(relative, name)
                    zip_file.write(full, dest)

    def run(self):
        """Run sub-commands, zip the docs into a temp dir and upload the zip.

        The temporary directory is always removed, whether or not the upload
        succeeds.
        """
        SetuptoolsDeprecationWarning.emit(
            "Deprecated command",
            """
            upload_docs is deprecated and will be removed in a future version.
            Instead, use tools like devpi and Read the Docs; or lower level tools like
            httpie and curl to interact directly with your hosting service API.
            """,
            due_date=(2023, 9, 26),  # warning introduced in 27 Jul 2022
        )

        # Run sub commands
        for cmd_name in self.get_sub_commands():
            self.run_command(cmd_name)

        # Resolve the name before creating the temp dir so that a failure
        # here cannot leave a stray directory behind.
        name = self.distribution.metadata.get_name()
        tmp_dir = tempfile.mkdtemp()
        try:
            zip_file = os.path.join(tmp_dir, "%s.zip" % name)
            self.create_zipfile(zip_file)
            self.upload_file(zip_file)
        finally:
            shutil.rmtree(tmp_dir)

    @staticmethod
    def _build_part(item, sep_boundary):
        """Yield the byte chunks of the multipart section(s) for one field.

        Args:
            item: a ``(key, values)`` pair. ``values`` is a single value or
                a list of values; each value is either a ``str`` or a
                ``(filename, bytes)`` tuple.
            sep_boundary: the encoded boundary line that starts each part.
        """
        key, values = item
        title = '\nContent-Disposition: form-data; name="%s"' % key
        # handle multiple entries for the same name
        if not isinstance(values, list):
            values = [values]
        for value in values:
            if isinstance(value, tuple):
                # (filename, content): content is emitted as given.
                title += '; filename="%s"' % value[0]
                value = value[1]
            else:
                value = _encode(value)
            yield sep_boundary
            yield _encode(title)
            yield b"\n\n"
            yield value
            if value and value[-1:] == b'\r':
                yield b'\n'  # write an extra newline (lurve Macs)

    @classmethod
    def _build_multipart(cls, data):
        """
        Build up the MIME payload for the POST data

        Args:
            data: mapping of form field name to value(s), in the shape
                accepted by ``_build_part``.

        Returns:
            A ``(body, content_type)`` tuple: the encoded request body and
            the matching ``Content-type`` header value.
        """
        boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
        sep_boundary = b'\n--' + boundary.encode('ascii')
        end_boundary = sep_boundary + b'--'
        end_items = end_boundary, b"\n",
        builder = functools.partial(
            cls._build_part,
            sep_boundary=sep_boundary,
        )
        part_groups = map(builder, data.items())
        parts = itertools.chain.from_iterable(part_groups)
        body_items = itertools.chain(parts, end_items)
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return b''.join(body_items), content_type

    def upload_file(self, filename):
        """POST the zip archive *filename* to ``self.repository``.

        Socket errors while sending are announced at ERROR level and end the
        upload without raising. The server response is reported through
        ``announce``; with ``show_response`` set, the response body is also
        printed.

        Raises:
            AssertionError: if the repository URL carries params, a query or
                a fragment, or uses a scheme other than http/https.
        """
        with open(filename, 'rb') as f:
            content = f.read()
        meta = self.distribution.metadata
        data = {
            ':action': 'doc_upload',
            'name': meta.get_name(),
            'content': (os.path.basename(filename), content),
        }
        # set up the authentication
        credentials = _encode(self.username + ':' + self.password)
        credentials = standard_b64encode(credentials).decode('ascii')
        auth = "Basic " + credentials

        body, ct = self._build_multipart(data)

        msg = "Submitting documentation to %s" % (self.repository)
        self.announce(msg, log.INFO)

        # build the Request
        # We can't use urllib2 since we need to send the Basic
        # auth right with the first request
        schema, netloc, url, params, query, fragments = \
            urllib.parse.urlparse(self.repository)
        # Explicit raise rather than ``assert`` so the check survives -O;
        # the exception type is the one ``assert`` would have raised.
        if params or query or fragments:
            raise AssertionError(
                "repository URL must not contain params, query or fragment: "
                + self.repository
            )
        if schema == 'http':
            conn = http.client.HTTPConnection(netloc)
        elif schema == 'https':
            conn = http.client.HTTPSConnection(netloc)
        else:
            raise AssertionError("unsupported schema " + schema)

        try:
            try:
                conn.connect()
                conn.putrequest("POST", url)
                content_type = ct
                conn.putheader('Content-type', content_type)
                conn.putheader('Content-length', str(len(body)))
                conn.putheader('Authorization', auth)
                conn.endheaders()
                conn.send(body)
            except socket.error as e:
                self.announce(str(e), log.ERROR)
                return

            r = conn.getresponse()
            if r.status == 200:
                msg = 'Server response (%s): %s' % (r.status, r.reason)
                self.announce(msg, log.INFO)
            elif r.status == 301:
                location = r.getheader('Location')
                if location is None:
                    location = 'https://pythonhosted.org/%s/' % meta.get_name()
                msg = 'Upload successful. Visit %s' % location
                self.announce(msg, log.INFO)
            else:
                msg = 'Upload failed (%s): %s' % (r.status, r.reason)
                self.announce(msg, log.ERROR)
            if self.show_response:
                # Read the body before the connection is closed below.
                print('-' * 75, r.read(), '-' * 75)
        finally:
            # Release the socket on every path, including the early return.
            conn.close()