| from __future__ import absolute_import |
| |
| import re |
| from collections import namedtuple |
| |
| from ..exceptions import LocationParseError |
| from ..packages import six |
| |
| url_attrs = ["scheme", "auth", "host", "port", "path", "query", "fragment"] |
| |
| # We only want to normalize urls with an HTTP(S) scheme. |
| # urllib3 infers URLs without a scheme (None) to be http. |
| NORMALIZABLE_SCHEMES = ("http", "https", None) |
| |
| # Almost all of these patterns were derived from the |
| # 'rfc3986' module: https://github.com/python-hyper/rfc3986 |
| PERCENT_RE = re.compile(r"%[a-fA-F0-9]{2}") |
| SCHEME_RE = re.compile(r"^(?:[a-zA-Z][a-zA-Z0-9+-]*:|/)") |
| URI_RE = re.compile( |
| r"^(?:([a-zA-Z][a-zA-Z0-9+.-]*):)?" |
| r"(?://([^\\/?#]*))?" |
| r"([^?#]*)" |
| r"(?:\?([^#]*))?" |
| r"(?:#(.*))?$", |
| re.UNICODE | re.DOTALL, |
| ) |
| |
| IPV4_PAT = r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}" |
| HEX_PAT = "[0-9A-Fa-f]{1,4}" |
| LS32_PAT = "(?:{hex}:{hex}|{ipv4})".format(hex=HEX_PAT, ipv4=IPV4_PAT) |
| _subs = {"hex": HEX_PAT, "ls32": LS32_PAT} |
| _variations = [ |
| # 6( h16 ":" ) ls32 |
| "(?:%(hex)s:){6}%(ls32)s", |
| # "::" 5( h16 ":" ) ls32 |
| "::(?:%(hex)s:){5}%(ls32)s", |
| # [ h16 ] "::" 4( h16 ":" ) ls32 |
| "(?:%(hex)s)?::(?:%(hex)s:){4}%(ls32)s", |
| # [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 |
| "(?:(?:%(hex)s:)?%(hex)s)?::(?:%(hex)s:){3}%(ls32)s", |
| # [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 |
| "(?:(?:%(hex)s:){0,2}%(hex)s)?::(?:%(hex)s:){2}%(ls32)s", |
| # [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 |
| "(?:(?:%(hex)s:){0,3}%(hex)s)?::%(hex)s:%(ls32)s", |
| # [ *4( h16 ":" ) h16 ] "::" ls32 |
| "(?:(?:%(hex)s:){0,4}%(hex)s)?::%(ls32)s", |
| # [ *5( h16 ":" ) h16 ] "::" h16 |
| "(?:(?:%(hex)s:){0,5}%(hex)s)?::%(hex)s", |
| # [ *6( h16 ":" ) h16 ] "::" |
| "(?:(?:%(hex)s:){0,6}%(hex)s)?::", |
| ] |
| |
| UNRESERVED_PAT = r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._!\-~" |
| IPV6_PAT = "(?:" + "|".join([x % _subs for x in _variations]) + ")" |
| ZONE_ID_PAT = "(?:%25|%)(?:[" + UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+" |
| IPV6_ADDRZ_PAT = r"\[" + IPV6_PAT + r"(?:" + ZONE_ID_PAT + r")?\]" |
| REG_NAME_PAT = r"(?:[^\[\]%:/?#]|%[a-fA-F0-9]{2})*" |
| TARGET_RE = re.compile(r"^(/[^?#]*)(?:\?([^#]*))?(?:#.*)?$") |
| |
| IPV4_RE = re.compile("^" + IPV4_PAT + "$") |
| IPV6_RE = re.compile("^" + IPV6_PAT + "$") |
| IPV6_ADDRZ_RE = re.compile("^" + IPV6_ADDRZ_PAT + "$") |
| BRACELESS_IPV6_ADDRZ_RE = re.compile("^" + IPV6_ADDRZ_PAT[2:-2] + "$") |
| ZONE_ID_RE = re.compile("(" + ZONE_ID_PAT + r")\]$") |
| |
| _HOST_PORT_PAT = ("^(%s|%s|%s)(?::([0-9]{0,5}))?$") % ( |
| REG_NAME_PAT, |
| IPV4_PAT, |
| IPV6_ADDRZ_PAT, |
| ) |
| _HOST_PORT_RE = re.compile(_HOST_PORT_PAT, re.UNICODE | re.DOTALL) |
| |
| UNRESERVED_CHARS = set( |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-~" |
| ) |
| SUB_DELIM_CHARS = set("!$&'()*+,;=") |
| USERINFO_CHARS = UNRESERVED_CHARS | SUB_DELIM_CHARS | {":"} |
| PATH_CHARS = USERINFO_CHARS | {"@", "/"} |
| QUERY_CHARS = FRAGMENT_CHARS = PATH_CHARS | {"?"} |
| |
| |
class Url(namedtuple("Url", url_attrs)):
    """
    Data structure for representing an HTTP URL. Used as a return value for
    :func:`parse_url`.

    On construction the scheme is lower-cased and a non-empty path that does
    not start with ``/`` gets one prepended. All other fields are stored
    exactly as given.
    """

    __slots__ = ()

    def __new__(
        cls,
        scheme=None,
        auth=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
    ):
        # A non-empty path is always stored in absolute form.
        if path and not path.startswith("/"):
            path = "/" + path
        if scheme is not None:
            scheme = scheme.lower()
        return super(Url, cls).__new__(
            cls, scheme, auth, host, port, path, query, fragment
        )

    @property
    def hostname(self):
        """For backwards-compatibility with urlparse. We're nice like that."""
        return self.host

    @property
    def request_uri(self):
        """Absolute path including the query string."""
        uri = self.path or "/"

        if self.query is not None:
            uri += "?" + self.query

        return uri

    @property
    def netloc(self):
        """
        Network location including host and port.

        Returns ``None`` when no host is set. The port is appended whenever
        it is not ``None`` (including port 0), matching how :attr:`url`
        treats the port.
        """
        if self.host is None:
            # Without a host there is no network location; never build "None:<port>".
            return None
        if self.port is not None:
            return "%s:%d" % (self.host, self.port)
        return self.host

    @property
    def url(self):
        """
        Convert self into a url

        This function should more or less round-trip with :func:`.parse_url`. The
        returned url may not be exactly the same as the url inputted to
        :func:`.parse_url`, but it should be equivalent by the RFC (e.g., urls
        with a blank port will have : removed).

        Example: ::

            >>> U = parse_url('http://google.com/mail/')
            >>> U.url
            'http://google.com/mail/'
            >>> Url('http', 'username:password', 'host.com', 80,
            ... '/path', 'query', 'fragment').url
            'http://username:password@host.com:80/path?query#fragment'
        """
        scheme, auth, host, port, path, query, fragment = self
        url = u""

        # We test with "is not None" because empty strings (and a 0 port)
        # must still be emitted.
        if scheme is not None:
            url += scheme + u"://"
        if auth is not None:
            url += auth + u"@"
        if host is not None:
            url += host
        if port is not None:
            url += u":" + str(port)
        if path is not None:
            url += path
        if query is not None:
            url += u"?" + query
        if fragment is not None:
            url += u"#" + fragment

        return url

    def __str__(self):
        return self.url
| |
| |
def split_first(s, delims):
    """
    .. deprecated:: 1.25

    Split ``s`` at whichever of the given delimiters occurs earliest.

    Returns a 3-tuple ``(before, after, delimiter)``. When none of the
    delimiters occurs in ``s`` the result is ``(s, "", None)``.

    Example::

        >>> split_first('foo/bar?baz', '?/=')
        ('foo', 'bar?baz', '/')
        >>> split_first('foo/bar?baz', '123')
        ('foo/bar?baz', '', None)

    Each delimiter costs one scan of ``s``, so this is not well suited to a
    large number of delimiters.
    """
    # Locate every delimiter, keeping only the ones actually present.
    positions = [(s.find(delim), delim) for delim in delims]
    found = [hit for hit in positions if hit[0] >= 0]

    if not found:
        return s, "", None

    # min() keeps the first candidate on ties, so the earliest-listed
    # delimiter wins when two share a position.
    cut, matched = min(found, key=lambda hit: hit[0])
    return s[:cut], s[cut + 1 :], matched
| |
| |
def _encode_invalid_chars(component, allowed_chars, encoding="utf-8"):
    """Percent-encode every byte of ``component`` that is not allowed.

    Existing percent-encodings have their hex digits upper-cased. If every
    ``%`` in the component belongs to such an encoding, ``%`` bytes are
    passed through untouched so nothing gets encoded twice.

    ``None`` is returned unchanged. Otherwise the result is the encoded
    bytes decoded with ``encoding``.
    """
    if component is None:
        return component

    text = six.ensure_text(component)

    # Normalize the case of existing escapes and count how many there are.
    text, escape_count = PERCENT_RE.subn(
        lambda found: found.group(0).upper(), text
    )

    raw = text.encode("utf-8", "surrogatepass")
    # True only when each "%" in the data is part of a valid escape.
    keep_percent = raw.count(b"%") == escape_count
    result = bytearray()

    # Iterating a bytearray yields integers on both Python 2 and 3.
    for code in bytearray(raw):
        if keep_percent and code == 0x25:  # 0x25 is "%"
            result.append(code)
        elif code < 128 and chr(code) in allowed_chars:
            result.append(code)
        else:
            result.extend(("%%%02X" % code).encode())

    return result.decode(encoding)
| |
| |
| def _remove_path_dot_segments(path): |
| # See http://tools.ietf.org/html/rfc3986#section-5.2.4 for pseudo-code |
| segments = path.split("/") # Turn the path into a list of segments |
| output = [] # Initialize the variable to use to store output |
| |
| for segment in segments: |
| # '.' is the current directory, so ignore it, it is superfluous |
| if segment == ".": |
| continue |
| # Anything other than '..', should be appended to the output |
| elif segment != "..": |
| output.append(segment) |
| # In this case segment == '..', if we can, we should pop the last |
| # element |
| elif output: |
| output.pop() |
| |
| # If the path starts with '/' and the output is empty or the first string |
| # is non-empty |
| if path.startswith("/") and (not output or output[0]): |
| output.insert(0, "") |
| |
| # If the path starts with '/.' or '/..' ensure we add one more empty |
| # string to add a trailing '/' |
| if path.endswith(("/.", "/..")): |
| output.append("") |
| |
| return "/".join(output) |
| |
| |
def _normalize_host(host, scheme):
    """Normalize ``host`` when ``scheme`` is one of NORMALIZABLE_SCHEMES.

    - Bracketed IPv6 literals are lower-cased. A zone ID, if present, is
      re-emitted behind a single ``%`` with its invalid characters
      percent-encoded.
    - IPv4 addresses are returned unchanged.
    - Any other host is encoded label by label with ``_idna_encode``.

    An empty or ``None`` host is returned as-is. A bytes host is converted
    with ``six.ensure_str`` before anything else happens.
    """
    if not host:
        return host

    if isinstance(host, six.binary_type):
        host = six.ensure_str(host)

    if scheme not in NORMALIZABLE_SCHEMES:
        return host

    if IPV6_ADDRZ_RE.match(host):
        zone = ZONE_ID_RE.search(host)
        if not zone:
            return host.lower()

        start, end = zone.span(1)
        zone_id = host[start:end]

        # Strip the zone separator: "%25" when something follows it,
        # otherwise just the leading "%".
        if zone_id.startswith("%25") and zone_id != "%25":
            separator_len = 3
        else:
            separator_len = 1
        encoded_zone = _encode_invalid_chars(
            zone_id[separator_len:], UNRESERVED_CHARS
        )
        return host[:start].lower() + "%" + encoded_zone + host[end:]

    if IPV4_RE.match(host):
        return host

    labels = [_idna_encode(label) for label in host.split(".")]
    return six.ensure_str(b".".join(labels))
| |
| |
| def _idna_encode(name): |
| if name and any([ord(x) > 128 for x in name]): |
| try: |
| from pip._vendor import idna |
| except ImportError: |
| six.raise_from( |
| LocationParseError("Unable to parse URL without the 'idna' module"), |
| None, |
| ) |
| try: |
| return idna.encode(name.lower(), strict=True, std3_rules=True) |
| except idna.IDNAError: |
| six.raise_from( |
| LocationParseError(u"Name '%s' is not a valid IDNA label" % name), None |
| ) |
| return name.lower().encode("ascii") |
| |
| |
def _encode_target(target):
    """Percent-encodes a request target so that there are no invalid characters.

    ``target`` is expected to start with ``/``; TARGET_RE only matches such
    strings. Any fragment is dropped, and the result is the encoded path
    followed by ``?`` and the encoded query when a query is present.

    :raises LocationParseError: if ``target`` is not matched by TARGET_RE.
    """
    match = TARGET_RE.match(target)
    if match is None:
        # Fail with a parse error rather than an AttributeError on None.
        raise LocationParseError(target)

    path, query = match.groups()
    encoded = _encode_invalid_chars(path, PATH_CHARS)
    query = _encode_invalid_chars(query, QUERY_CHARS)
    if query is not None:
        encoded += "?" + query
    return encoded
| |
| |
def parse_url(url):
    """
    Parse ``url`` into a :class:`.Url` namedtuple. Incomplete urls are
    parsed on a best-effort basis, and fields that are not present come
    back as None. This parser is RFC 3986 compliant.

    The parser logic and helper functions are based heavily on
    work done in the ``rfc3986`` module.

    :param str url: URL to parse into a :class:`.Url` namedtuple.

    :raises LocationParseError: if the url cannot be parsed or its port is
        outside the range 0-65535.

    Partly backwards-compatible with :mod:`urlparse`.

    Example::

        >>> parse_url('http://google.com/mail/')
        Url(scheme='http', host='google.com', port=None, path='/mail/', ...)
        >>> parse_url('google.com:80')
        Url(scheme=None, host='google.com', port=80, path=None, ...)
        >>> parse_url('/foo?bar')
        Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
    """
    if not url:
        # Nothing to parse.
        return Url()

    source_url = url
    # Input with neither a scheme nor a leading "/" is taken to begin at the
    # authority component.
    if SCHEME_RE.search(url) is None:
        url = "//" + url

    try:
        scheme, authority, path, query, fragment = URI_RE.match(url).groups()
        normalize_uri = scheme is None or scheme.lower() in NORMALIZABLE_SCHEMES

        if scheme:
            scheme = scheme.lower()

        if not authority:
            auth = host = port = None
        else:
            userinfo, _, host_port = authority.rpartition("@")
            auth = userinfo or None
            host, port = _HOST_PORT_RE.match(host_port).groups()
            if auth and normalize_uri:
                auth = _encode_invalid_chars(auth, USERINFO_CHARS)
            # "host:" with nothing after the colon means no port at all.
            if port == "":
                port = None

        if port is not None:
            port = int(port)
            if port < 0 or port > 65535:
                raise LocationParseError(url)

        host = _normalize_host(host, scheme)

        if normalize_uri:
            if path:
                path = _encode_invalid_chars(
                    _remove_path_dot_segments(path), PATH_CHARS
                )
            if query:
                query = _encode_invalid_chars(query, QUERY_CHARS)
            if fragment:
                fragment = _encode_invalid_chars(fragment, FRAGMENT_CHARS)

    except (ValueError, AttributeError):
        return six.raise_from(LocationParseError(source_url), None)

    # For the sake of backwards compatibility an empty path becomes "" when
    # a query or fragment is defined, and None otherwise.
    # TODO: Remove this when we break backwards compatibility.
    if not path:
        has_trailer = query is not None or fragment is not None
        path = "" if has_trailer else None

    # Ensure that each part of the URL is a `str` for
    # backwards compatibility.
    to_native = six.ensure_text if isinstance(url, six.text_type) else six.ensure_str

    scheme, auth, host, path, query, fragment = [
        part if part is None else to_native(part)
        for part in (scheme, auth, host, path, query, fragment)
    ]

    return Url(
        scheme=scheme,
        auth=auth,
        host=host,
        port=port,
        path=path,
        query=query,
        fragment=fragment,
    )
| |
| |
def get_host(url):
    """
    Deprecated. Use :func:`parse_url` instead.

    Returns a ``(scheme, hostname, port)`` tuple for ``url``; the scheme
    falls back to ``"http"`` when the url has none.
    """
    parsed = parse_url(url)
    scheme = parsed.scheme or "http"
    return scheme, parsed.hostname, parsed.port