| # -*- coding: utf-8 -*- |
| """upload_docs |
| |
| Implements a Distutils 'upload_docs' subcommand (upload documentation to |
| PyPI's pythonhosted.org). |
| """ |
| |
| from base64 import standard_b64encode |
| from distutils import log |
| from distutils.errors import DistutilsOptionError |
| import os |
| import socket |
| import zipfile |
| import tempfile |
| import shutil |
| import itertools |
| import functools |
| |
| from setuptools.extern import six |
| from setuptools.extern.six.moves import http_client, urllib |
| |
| from pkg_resources import iter_entry_points |
| from .upload import upload |
| |
| |
| def _encode(s): |
| errors = 'strict' if six.PY2 else 'surrogateescape' |
| return s.encode('utf-8', errors) |
| |
| |
| class upload_docs(upload): |
| # override the default repository as upload_docs isn't |
| # supported by Warehouse (and won't be). |
| DEFAULT_REPOSITORY = 'https://pypi.python.org/pypi/' |
| |
| description = 'Upload documentation to PyPI' |
| |
| user_options = [ |
| ('repository=', 'r', |
| "url of repository [default: %s]" % upload.DEFAULT_REPOSITORY), |
| ('show-response', None, |
| 'display full response text from server'), |
| ('upload-dir=', None, 'directory to upload'), |
| ] |
| boolean_options = upload.boolean_options |
| |
| def has_sphinx(self): |
| if self.upload_dir is None: |
| for ep in iter_entry_points('distutils.commands', 'build_sphinx'): |
| return True |
| |
| sub_commands = [('build_sphinx', has_sphinx)] |
| |
| def initialize_options(self): |
| upload.initialize_options(self) |
| self.upload_dir = None |
| self.target_dir = None |
| |
| def finalize_options(self): |
| upload.finalize_options(self) |
| if self.upload_dir is None: |
| if self.has_sphinx(): |
| build_sphinx = self.get_finalized_command('build_sphinx') |
| self.target_dir = build_sphinx.builder_target_dir |
| else: |
| build = self.get_finalized_command('build') |
| self.target_dir = os.path.join(build.build_base, 'docs') |
| else: |
| self.ensure_dirname('upload_dir') |
| self.target_dir = self.upload_dir |
| if 'pypi.python.org' in self.repository: |
| log.warn("Upload_docs command is deprecated. Use RTD instead.") |
| self.announce('Using upload directory %s' % self.target_dir) |
| |
| def create_zipfile(self, filename): |
| zip_file = zipfile.ZipFile(filename, "w") |
| try: |
| self.mkpath(self.target_dir) # just in case |
| for root, dirs, files in os.walk(self.target_dir): |
| if root == self.target_dir and not files: |
| tmpl = "no files found in upload directory '%s'" |
| raise DistutilsOptionError(tmpl % self.target_dir) |
| for name in files: |
| full = os.path.join(root, name) |
| relative = root[len(self.target_dir):].lstrip(os.path.sep) |
| dest = os.path.join(relative, name) |
| zip_file.write(full, dest) |
| finally: |
| zip_file.close() |
| |
| def run(self): |
| # Run sub commands |
| for cmd_name in self.get_sub_commands(): |
| self.run_command(cmd_name) |
| |
| tmp_dir = tempfile.mkdtemp() |
| name = self.distribution.metadata.get_name() |
| zip_file = os.path.join(tmp_dir, "%s.zip" % name) |
| try: |
| self.create_zipfile(zip_file) |
| self.upload_file(zip_file) |
| finally: |
| shutil.rmtree(tmp_dir) |
| |
| @staticmethod |
| def _build_part(item, sep_boundary): |
| key, values = item |
| title = '\nContent-Disposition: form-data; name="%s"' % key |
| # handle multiple entries for the same name |
| if not isinstance(values, list): |
| values = [values] |
| for value in values: |
| if isinstance(value, tuple): |
| title += '; filename="%s"' % value[0] |
| value = value[1] |
| else: |
| value = _encode(value) |
| yield sep_boundary |
| yield _encode(title) |
| yield b"\n\n" |
| yield value |
| if value and value[-1:] == b'\r': |
| yield b'\n' # write an extra newline (lurve Macs) |
| |
| @classmethod |
| def _build_multipart(cls, data): |
| """ |
| Build up the MIME payload for the POST data |
| """ |
| boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254' |
| sep_boundary = b'\n--' + boundary |
| end_boundary = sep_boundary + b'--' |
| end_items = end_boundary, b"\n", |
| builder = functools.partial( |
| cls._build_part, |
| sep_boundary=sep_boundary, |
| ) |
| part_groups = map(builder, data.items()) |
| parts = itertools.chain.from_iterable(part_groups) |
| body_items = itertools.chain(parts, end_items) |
| content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii') |
| return b''.join(body_items), content_type |
| |
| def upload_file(self, filename): |
| with open(filename, 'rb') as f: |
| content = f.read() |
| meta = self.distribution.metadata |
| data = { |
| ':action': 'doc_upload', |
| 'name': meta.get_name(), |
| 'content': (os.path.basename(filename), content), |
| } |
| # set up the authentication |
| credentials = _encode(self.username + ':' + self.password) |
| credentials = standard_b64encode(credentials) |
| if not six.PY2: |
| credentials = credentials.decode('ascii') |
| auth = "Basic " + credentials |
| |
| body, ct = self._build_multipart(data) |
| |
| msg = "Submitting documentation to %s" % (self.repository) |
| self.announce(msg, log.INFO) |
| |
| # build the Request |
| # We can't use urllib2 since we need to send the Basic |
| # auth right with the first request |
| schema, netloc, url, params, query, fragments = \ |
| urllib.parse.urlparse(self.repository) |
| assert not params and not query and not fragments |
| if schema == 'http': |
| conn = http_client.HTTPConnection(netloc) |
| elif schema == 'https': |
| conn = http_client.HTTPSConnection(netloc) |
| else: |
| raise AssertionError("unsupported schema " + schema) |
| |
| data = '' |
| try: |
| conn.connect() |
| conn.putrequest("POST", url) |
| content_type = ct |
| conn.putheader('Content-type', content_type) |
| conn.putheader('Content-length', str(len(body))) |
| conn.putheader('Authorization', auth) |
| conn.endheaders() |
| conn.send(body) |
| except socket.error as e: |
| self.announce(str(e), log.ERROR) |
| return |
| |
| r = conn.getresponse() |
| if r.status == 200: |
| msg = 'Server response (%s): %s' % (r.status, r.reason) |
| self.announce(msg, log.INFO) |
| elif r.status == 301: |
| location = r.getheader('Location') |
| if location is None: |
| location = 'https://pythonhosted.org/%s/' % meta.get_name() |
| msg = 'Upload successful. Visit %s' % location |
| self.announce(msg, log.INFO) |
| else: |
| msg = 'Upload failed (%s): %s' % (r.status, r.reason) |
| self.announce(msg, log.ERROR) |
| if self.show_response: |
| print('-' * 75, r.read(), '-' * 75) |