| import configparser |
| import logging |
| import os |
| from typing import List, Optional |
| |
| from pip._internal.exceptions import BadCommand, InstallationError |
| from pip._internal.utils.misc import HiddenText, display_path |
| from pip._internal.utils.subprocess import make_command |
| from pip._internal.utils.urls import path_to_url |
| from pip._internal.vcs.versioncontrol import ( |
| RevOptions, |
| VersionControl, |
| find_path_to_project_root_from_repo_root, |
| vcs, |
| ) |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class Mercurial(VersionControl): |
| name = "hg" |
| dirname = ".hg" |
| repo_name = "clone" |
| schemes = ( |
| "hg+file", |
| "hg+http", |
| "hg+https", |
| "hg+ssh", |
| "hg+static-http", |
| ) |
| |
| @staticmethod |
| def get_base_rev_args(rev: str) -> List[str]: |
| return [rev] |
| |
| def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: |
| rev_display = rev_options.to_display() |
| logger.info( |
| "Cloning hg %s%s to %s", |
| url, |
| rev_display, |
| display_path(dest), |
| ) |
| self.run_command(make_command("clone", "--noupdate", "-q", url, dest)) |
| self.run_command( |
| make_command("update", "-q", rev_options.to_args()), |
| cwd=dest, |
| ) |
| |
| def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: |
| repo_config = os.path.join(dest, self.dirname, "hgrc") |
| config = configparser.RawConfigParser() |
| try: |
| config.read(repo_config) |
| config.set("paths", "default", url.secret) |
| with open(repo_config, "w") as config_file: |
| config.write(config_file) |
| except (OSError, configparser.NoSectionError) as exc: |
| logger.warning("Could not switch Mercurial repository to %s: %s", url, exc) |
| else: |
| cmd_args = make_command("update", "-q", rev_options.to_args()) |
| self.run_command(cmd_args, cwd=dest) |
| |
| def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: |
| self.run_command(["pull", "-q"], cwd=dest) |
| cmd_args = make_command("update", "-q", rev_options.to_args()) |
| self.run_command(cmd_args, cwd=dest) |
| |
| @classmethod |
| def get_remote_url(cls, location: str) -> str: |
| url = cls.run_command( |
| ["showconfig", "paths.default"], |
| show_stdout=False, |
| stdout_only=True, |
| cwd=location, |
| ).strip() |
| if cls._is_local_repository(url): |
| url = path_to_url(url) |
| return url.strip() |
| |
| @classmethod |
| def get_revision(cls, location: str) -> str: |
| """ |
| Return the repository-local changeset revision number, as an integer. |
| """ |
| current_revision = cls.run_command( |
| ["parents", "--template={rev}"], |
| show_stdout=False, |
| stdout_only=True, |
| cwd=location, |
| ).strip() |
| return current_revision |
| |
| @classmethod |
| def get_requirement_revision(cls, location: str) -> str: |
| """ |
| Return the changeset identification hash, as a 40-character |
| hexadecimal string |
| """ |
| current_rev_hash = cls.run_command( |
| ["parents", "--template={node}"], |
| show_stdout=False, |
| stdout_only=True, |
| cwd=location, |
| ).strip() |
| return current_rev_hash |
| |
| @classmethod |
| def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool: |
| """Always assume the versions don't match""" |
| return False |
| |
| @classmethod |
| def get_subdirectory(cls, location: str) -> Optional[str]: |
| """ |
| Return the path to Python project root, relative to the repo root. |
| Return None if the project root is in the repo root. |
| """ |
| # find the repo root |
| repo_root = cls.run_command( |
| ["root"], show_stdout=False, stdout_only=True, cwd=location |
| ).strip() |
| if not os.path.isabs(repo_root): |
| repo_root = os.path.abspath(os.path.join(location, repo_root)) |
| return find_path_to_project_root_from_repo_root(location, repo_root) |
| |
| @classmethod |
| def get_repository_root(cls, location: str) -> Optional[str]: |
| loc = super().get_repository_root(location) |
| if loc: |
| return loc |
| try: |
| r = cls.run_command( |
| ["root"], |
| cwd=location, |
| show_stdout=False, |
| stdout_only=True, |
| on_returncode="raise", |
| log_failed_cmd=False, |
| ) |
| except BadCommand: |
| logger.debug( |
| "could not determine if %s is under hg control " |
| "because hg is not available", |
| location, |
| ) |
| return None |
| except InstallationError: |
| return None |
| return os.path.normpath(r.rstrip("\r\n")) |
| |
| |
| vcs.register(Mercurial) |