| import hashlib |
| from typing import TYPE_CHECKING, BinaryIO, Dict, Iterable, List, Optional |
| |
| from pip._internal.exceptions import HashMismatch, HashMissing, InstallationError |
| from pip._internal.utils.misc import read_chunks |
| |
| if TYPE_CHECKING: |
| from hashlib import _Hash |
| |
| # NoReturn introduced in 3.6.2; imported only for type checking to maintain |
| # pip compatibility with older patch versions of Python 3.6 |
| from typing import NoReturn |
| |
| |
| # The recommended hash algo of the moment. Change this whenever the state of |
| # the art changes; it won't hurt backward compatibility. |
| FAVORITE_HASH = "sha256" |
| |
| |
| # Names of hashlib algorithms allowed by the --hash option and ``pip hash`` |
| # Currently, those are the ones at least as collision-resistant as sha256. |
| STRONG_HASHES = ["sha256", "sha384", "sha512"] |
| |
| |
| class Hashes: |
| """A wrapper that builds multiple hashes at once and checks them against |
| known-good values |
| |
| """ |
| |
| def __init__(self, hashes: Optional[Dict[str, List[str]]] = None) -> None: |
| """ |
| :param hashes: A dict of algorithm names pointing to lists of allowed |
| hex digests |
| """ |
| allowed = {} |
| if hashes is not None: |
| for alg, keys in hashes.items(): |
| # Make sure values are always sorted (to ease equality checks) |
| allowed[alg] = sorted(keys) |
| self._allowed = allowed |
| |
| def __and__(self, other: "Hashes") -> "Hashes": |
| if not isinstance(other, Hashes): |
| return NotImplemented |
| |
| # If either of the Hashes object is entirely empty (i.e. no hash |
| # specified at all), all hashes from the other object are allowed. |
| if not other: |
| return self |
| if not self: |
| return other |
| |
| # Otherwise only hashes that present in both objects are allowed. |
| new = {} |
| for alg, values in other._allowed.items(): |
| if alg not in self._allowed: |
| continue |
| new[alg] = [v for v in values if v in self._allowed[alg]] |
| return Hashes(new) |
| |
| @property |
| def digest_count(self) -> int: |
| return sum(len(digests) for digests in self._allowed.values()) |
| |
| def is_hash_allowed(self, hash_name: str, hex_digest: str) -> bool: |
| """Return whether the given hex digest is allowed.""" |
| return hex_digest in self._allowed.get(hash_name, []) |
| |
| def check_against_chunks(self, chunks: Iterable[bytes]) -> None: |
| """Check good hashes against ones built from iterable of chunks of |
| data. |
| |
| Raise HashMismatch if none match. |
| |
| """ |
| gots = {} |
| for hash_name in self._allowed.keys(): |
| try: |
| gots[hash_name] = hashlib.new(hash_name) |
| except (ValueError, TypeError): |
| raise InstallationError(f"Unknown hash name: {hash_name}") |
| |
| for chunk in chunks: |
| for hash in gots.values(): |
| hash.update(chunk) |
| |
| for hash_name, got in gots.items(): |
| if got.hexdigest() in self._allowed[hash_name]: |
| return |
| self._raise(gots) |
| |
| def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn": |
| raise HashMismatch(self._allowed, gots) |
| |
| def check_against_file(self, file: BinaryIO) -> None: |
| """Check good hashes against a file-like object |
| |
| Raise HashMismatch if none match. |
| |
| """ |
| return self.check_against_chunks(read_chunks(file)) |
| |
| def check_against_path(self, path: str) -> None: |
| with open(path, "rb") as file: |
| return self.check_against_file(file) |
| |
| def __bool__(self) -> bool: |
| """Return whether I know any known-good hashes.""" |
| return bool(self._allowed) |
| |
| def __eq__(self, other: object) -> bool: |
| if not isinstance(other, Hashes): |
| return NotImplemented |
| return self._allowed == other._allowed |
| |
| def __hash__(self) -> int: |
| return hash( |
| ",".join( |
| sorted( |
| ":".join((alg, digest)) |
| for alg, digest_list in self._allowed.items() |
| for digest in digest_list |
| ) |
| ) |
| ) |
| |
| |
| class MissingHashes(Hashes): |
| """A workalike for Hashes used when we're missing a hash for a requirement |
| |
| It computes the actual hash of the requirement and raises a HashMissing |
| exception showing it to the user. |
| |
| """ |
| |
| def __init__(self) -> None: |
| """Don't offer the ``hashes`` kwarg.""" |
| # Pass our favorite hash in to generate a "gotten hash". With the |
| # empty list, it will never match, so an error will always raise. |
| super().__init__(hashes={FAVORITE_HASH: []}) |
| |
| def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn": |
| raise HashMissing(gots[FAVORITE_HASH].hexdigest()) |