| import collections |
| import math |
| from typing import TYPE_CHECKING, Dict, Iterable, Iterator, Mapping, Sequence, Union |
| |
| from pip._vendor.resolvelib.providers import AbstractProvider |
| |
| from .base import Candidate, Constraint, Requirement |
| from .candidates import REQUIRES_PYTHON_IDENTIFIER |
| from .factory import Factory |
| |
| if TYPE_CHECKING: |
| from pip._vendor.resolvelib.providers import Preference |
| from pip._vendor.resolvelib.resolvers import RequirementInformation |
| |
| PreferenceInformation = RequirementInformation[Requirement, Candidate] |
| |
| _ProviderBase = AbstractProvider[Requirement, Candidate, str] |
| else: |
| _ProviderBase = AbstractProvider |
| |
| # Notes on the relationship between the provider, the factory, and the |
| # candidate and requirement classes. |
| # |
| # The provider is a direct implementation of the resolvelib class. Its role |
| # is to deliver the API that resolvelib expects. |
| # |
| # Rather than work with completely abstract "requirement" and "candidate" |
| # concepts as resolvelib does, pip has concrete classes implementing these two |
| # ideas. The API of Requirement and Candidate objects are defined in the base |
| # classes, but essentially map fairly directly to the equivalent provider |
| # methods. In particular, `find_matches` and `is_satisfied_by` are |
| # requirement methods, and `get_dependencies` is a candidate method. |
| # |
| # The factory is the interface to pip's internal mechanisms. It is stateless, |
| # and is created by the resolver and held as a property of the provider. It is |
| # responsible for creating Requirement and Candidate objects, and provides |
| # services to those objects (access to pip's finder and preparer). |
| |
| |
| class PipProvider(_ProviderBase): |
| """Pip's provider implementation for resolvelib. |
| |
| :params constraints: A mapping of constraints specified by the user. Keys |
| are canonicalized project names. |
| :params ignore_dependencies: Whether the user specified ``--no-deps``. |
| :params upgrade_strategy: The user-specified upgrade strategy. |
| :params user_requested: A set of canonicalized package names that the user |
| supplied for pip to install/upgrade. |
| """ |
| |
| def __init__( |
| self, |
| factory: Factory, |
| constraints: Dict[str, Constraint], |
| ignore_dependencies: bool, |
| upgrade_strategy: str, |
| user_requested: Dict[str, int], |
| ) -> None: |
| self._factory = factory |
| self._constraints = constraints |
| self._ignore_dependencies = ignore_dependencies |
| self._upgrade_strategy = upgrade_strategy |
| self._user_requested = user_requested |
| self._known_depths: Dict[str, float] = collections.defaultdict(lambda: math.inf) |
| |
| def identify(self, requirement_or_candidate: Union[Requirement, Candidate]) -> str: |
| return requirement_or_candidate.name |
| |
| def get_preference( # type: ignore |
| self, |
| identifier: str, |
| resolutions: Mapping[str, Candidate], |
| candidates: Mapping[str, Iterator[Candidate]], |
| information: Mapping[str, Iterable["PreferenceInformation"]], |
| backtrack_causes: Sequence["PreferenceInformation"], |
| ) -> "Preference": |
| """Produce a sort key for given requirement based on preference. |
| |
| The lower the return value is, the more preferred this group of |
| arguments is. |
| |
| Currently pip considers the followings in order: |
| |
| * Prefer if any of the known requirements is "direct", e.g. points to an |
| explicit URL. |
| * If equal, prefer if any requirement is "pinned", i.e. contains |
| operator ``===`` or ``==``. |
| * If equal, calculate an approximate "depth" and resolve requirements |
| closer to the user-specified requirements first. |
| * Order user-specified requirements by the order they are specified. |
| * If equal, prefers "non-free" requirements, i.e. contains at least one |
| operator, such as ``>=`` or ``<``. |
| * If equal, order alphabetically for consistency (helps debuggability). |
| """ |
| lookups = (r.get_candidate_lookup() for r, _ in information[identifier]) |
| candidate, ireqs = zip(*lookups) |
| operators = [ |
| specifier.operator |
| for specifier_set in (ireq.specifier for ireq in ireqs if ireq) |
| for specifier in specifier_set |
| ] |
| |
| direct = candidate is not None |
| pinned = any(op[:2] == "==" for op in operators) |
| unfree = bool(operators) |
| |
| try: |
| requested_order: Union[int, float] = self._user_requested[identifier] |
| except KeyError: |
| requested_order = math.inf |
| parent_depths = ( |
| self._known_depths[parent.name] if parent is not None else 0.0 |
| for _, parent in information[identifier] |
| ) |
| inferred_depth = min(d for d in parent_depths) + 1.0 |
| else: |
| inferred_depth = 1.0 |
| self._known_depths[identifier] = inferred_depth |
| |
| requested_order = self._user_requested.get(identifier, math.inf) |
| |
| # Requires-Python has only one candidate and the check is basically |
| # free, so we always do it first to avoid needless work if it fails. |
| requires_python = identifier == REQUIRES_PYTHON_IDENTIFIER |
| |
| # HACK: Setuptools have a very long and solid backward compatibility |
| # track record, and extremely few projects would request a narrow, |
| # non-recent version range of it since that would break a lot things. |
| # (Most projects specify it only to request for an installer feature, |
| # which does not work, but that's another topic.) Intentionally |
| # delaying Setuptools helps reduce branches the resolver has to check. |
| # This serves as a temporary fix for issues like "apache-airlfow[all]" |
| # while we work on "proper" branch pruning techniques. |
| delay_this = identifier == "setuptools" |
| |
| # Prefer the causes of backtracking on the assumption that the problem |
| # resolving the dependency tree is related to the failures that caused |
| # the backtracking |
| backtrack_cause = self.is_backtrack_cause(identifier, backtrack_causes) |
| |
| return ( |
| not requires_python, |
| delay_this, |
| not direct, |
| not pinned, |
| not backtrack_cause, |
| inferred_depth, |
| requested_order, |
| not unfree, |
| identifier, |
| ) |
| |
| def _get_constraint(self, identifier: str) -> Constraint: |
| if identifier in self._constraints: |
| return self._constraints[identifier] |
| |
| # HACK: Theoratically we should check whether this identifier is a valid |
| # "NAME[EXTRAS]" format, and parse out the name part with packaging or |
| # some regular expression. But since pip's resolver only spits out |
| # three kinds of identifiers: normalized PEP 503 names, normalized names |
| # plus extras, and Requires-Python, we can cheat a bit here. |
| name, open_bracket, _ = identifier.partition("[") |
| if open_bracket and name in self._constraints: |
| return self._constraints[name] |
| |
| return Constraint.empty() |
| |
| def find_matches( |
| self, |
| identifier: str, |
| requirements: Mapping[str, Iterator[Requirement]], |
| incompatibilities: Mapping[str, Iterator[Candidate]], |
| ) -> Iterable[Candidate]: |
| def _eligible_for_upgrade(name: str) -> bool: |
| """Are upgrades allowed for this project? |
| |
| This checks the upgrade strategy, and whether the project was one |
| that the user specified in the command line, in order to decide |
| whether we should upgrade if there's a newer version available. |
| |
| (Note that we don't need access to the `--upgrade` flag, because |
| an upgrade strategy of "to-satisfy-only" means that `--upgrade` |
| was not specified). |
| """ |
| if self._upgrade_strategy == "eager": |
| return True |
| elif self._upgrade_strategy == "only-if-needed": |
| return name in self._user_requested |
| return False |
| |
| return self._factory.find_candidates( |
| identifier=identifier, |
| requirements=requirements, |
| constraint=self._get_constraint(identifier), |
| prefers_installed=(not _eligible_for_upgrade(identifier)), |
| incompatibilities=incompatibilities, |
| ) |
| |
| def is_satisfied_by(self, requirement: Requirement, candidate: Candidate) -> bool: |
| return requirement.is_satisfied_by(candidate) |
| |
| def get_dependencies(self, candidate: Candidate) -> Sequence[Requirement]: |
| with_requires = not self._ignore_dependencies |
| return [r for r in candidate.iter_dependencies(with_requires) if r is not None] |
| |
| @staticmethod |
| def is_backtrack_cause( |
| identifier: str, backtrack_causes: Sequence["PreferenceInformation"] |
| ) -> bool: |
| for backtrack_cause in backtrack_causes: |
| if identifier == backtrack_cause.requirement.name: |
| return True |
| if backtrack_cause.parent and identifier == backtrack_cause.parent.name: |
| return True |
| return False |