blob: e75feda9ca9477b0ffec1f523f29033e289d6b6a [file] [log] [blame]
""" PEP 610 """
import json
import re
import urllib.parse
from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
__all__ = [
"DirectUrl",
"DirectUrlValidationError",
"DirInfo",
"ArchiveInfo",
"VcsInfo",
]
T = TypeVar("T")
DIRECT_URL_METADATA_NAME = "direct_url.json"
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
class DirectUrlValidationError(Exception):
pass
def _get(
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> Optional[T]:
"""Get value from dictionary and verify expected type."""
if key not in d:
return default
value = d[key]
if not isinstance(value, expected_type):
raise DirectUrlValidationError(
"{!r} has unexpected type for {} (expected {})".format(
value, key, expected_type
)
)
return value
def _get_required(
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> T:
value = _get(d, expected_type, key, default)
if value is None:
raise DirectUrlValidationError(f"{key} must have a value")
return value
def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
infos = [info for info in infos if info is not None]
if not infos:
raise DirectUrlValidationError(
"missing one of archive_info, dir_info, vcs_info"
)
if len(infos) > 1:
raise DirectUrlValidationError(
"more than one of archive_info, dir_info, vcs_info"
)
assert infos[0] is not None
return infos[0]
def _filter_none(**kwargs: Any) -> Dict[str, Any]:
"""Make dict excluding None values."""
return {k: v for k, v in kwargs.items() if v is not None}
class VcsInfo:
name = "vcs_info"
def __init__(
self,
vcs: str,
commit_id: str,
requested_revision: Optional[str] = None,
) -> None:
self.vcs = vcs
self.requested_revision = requested_revision
self.commit_id = commit_id
@classmethod
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
if d is None:
return None
return cls(
vcs=_get_required(d, str, "vcs"),
commit_id=_get_required(d, str, "commit_id"),
requested_revision=_get(d, str, "requested_revision"),
)
def _to_dict(self) -> Dict[str, Any]:
return _filter_none(
vcs=self.vcs,
requested_revision=self.requested_revision,
commit_id=self.commit_id,
)
class ArchiveInfo:
name = "archive_info"
def __init__(
self,
hash: Optional[str] = None,
) -> None:
self.hash = hash
@classmethod
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
if d is None:
return None
return cls(hash=_get(d, str, "hash"))
def _to_dict(self) -> Dict[str, Any]:
return _filter_none(hash=self.hash)
class DirInfo:
name = "dir_info"
def __init__(
self,
editable: bool = False,
) -> None:
self.editable = editable
@classmethod
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
if d is None:
return None
return cls(editable=_get_required(d, bool, "editable", default=False))
def _to_dict(self) -> Dict[str, Any]:
return _filter_none(editable=self.editable or None)
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
class DirectUrl:
def __init__(
self,
url: str,
info: InfoType,
subdirectory: Optional[str] = None,
) -> None:
self.url = url
self.info = info
self.subdirectory = subdirectory
def _remove_auth_from_netloc(self, netloc: str) -> str:
if "@" not in netloc:
return netloc
user_pass, netloc_no_user_pass = netloc.split("@", 1)
if (
isinstance(self.info, VcsInfo)
and self.info.vcs == "git"
and user_pass == "git"
):
return netloc
if ENV_VAR_RE.match(user_pass):
return netloc
return netloc_no_user_pass
@property
def redacted_url(self) -> str:
"""url with user:password part removed unless it is formed with
environment variables as specified in PEP 610, or it is ``git``
in the case of a git URL.
"""
purl = urllib.parse.urlsplit(self.url)
netloc = self._remove_auth_from_netloc(purl.netloc)
surl = urllib.parse.urlunsplit(
(purl.scheme, netloc, purl.path, purl.query, purl.fragment)
)
return surl
def validate(self) -> None:
self.from_dict(self.to_dict())
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
return DirectUrl(
url=_get_required(d, str, "url"),
subdirectory=_get(d, str, "subdirectory"),
info=_exactly_one_of(
[
ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
DirInfo._from_dict(_get(d, dict, "dir_info")),
VcsInfo._from_dict(_get(d, dict, "vcs_info")),
]
),
)
def to_dict(self) -> Dict[str, Any]:
res = _filter_none(
url=self.redacted_url,
subdirectory=self.subdirectory,
)
res[self.info.name] = self.info._to_dict()
return res
@classmethod
def from_json(cls, s: str) -> "DirectUrl":
return cls.from_dict(json.loads(s))
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True)
def is_local_editable(self) -> bool:
return isinstance(self.info, DirInfo) and self.info.editable