| """ |
| An implementation of `urlparse` that provides URL validation and normalization |
| as described by RFC3986. |
| |
| We rely on this implementation rather than the one in Python's stdlib, because: |
| |
| * It provides more complete URL validation. |
| * It properly differentiates between an empty querystring and an absent querystring, |
| to distinguish URLs with a trailing '?'. |
| * It handles scheme, hostname, port, and path normalization. |
| * It supports IDNA hostnames, normalizing them to their encoded form. |
| * The API supports passing individual components, as well as the complete URL string. |
| |
| Previously we relied on the excellent `rfc3986` package to handle URL parsing and |
| validation, but this module provides a simpler alternative, with less indirection |
| required. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import ipaddress |
| import re |
| import typing |
|
|
| import idna |
|
|
| from ._exceptions import InvalidURL |
|
|
# Upper bound on the length of a URL string, and of any individually
# supplied URL component, accepted by `urlparse`.
MAX_URL_LENGTH = 64 * 1024

# Characters that are never percent-escaped: ASCII letters, digits and "-._~".
# See: https://www.rfc-editor.org/rfc/rfc3986#section-2.3
UNRESERVED_CHARACTERS = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789"
    "-._~"
)
# The RFC 3986 "sub-delims" characters.
SUB_DELIMS = "!$&'()*+,;="

# Matches one existing percent-escape sequence: "%" followed by two hex digits.
PERCENT_ENCODED_REGEX = re.compile(r"%[A-Fa-f0-9]{2}")
|
|
| |
|
|
| |
| |
# Characters left unescaped in the fragment component: every printable ASCII
# character (0x20-0x7E) except space, '"', '<', '>' and '`'.
FRAG_SAFE = "".join(
    char for char in map(chr, range(0x20, 0x7F)) if char not in ' "<>`'
)
|
|
| |
| |
# Characters left unescaped in the query component: every printable ASCII
# character (0x20-0x7E) except space, '"', '#', '<' and '>'.
QUERY_SAFE = "".join(
    char for char in map(chr, range(0x20, 0x7F)) if char not in ' "#<>'
)
|
|
| |
| |
# Characters left unescaped in the path component: the printable ASCII range
# (0x20-0x7E) minus the characters excluded from the query (space, '"', '#',
# '<', '>') and additionally minus '?', '`', '{' and '}'.
PATH_SAFE = "".join(
    char
    for char in map(chr, range(0x20, 0x7F))
    if char not in ' "#<>' and char not in "?`{}"
)
|
|
| |
| |
| |
# Characters left unescaped in a username or password supplied separately:
# the printable ASCII range (0x20-0x7E) minus the path exclusions
# (space, '"', '#', '<', '>', '?', '`', '{', '}') and additionally minus
# '/', ':', ';', '=', '@', '[', '\', ']', '^' and '|'.
USERNAME_SAFE = "".join(
    char
    for char in map(chr, range(0x20, 0x7F))
    if char not in ' "#<>' + "?`{}" + "/:;=@[\\]^|"
)
# The password uses exactly the same set of unescaped characters.
PASSWORD_SAFE = USERNAME_SAFE
| |
| |
| |
# Characters left unescaped in a combined "username:password" userinfo
# component: the printable ASCII range (0x20-0x7E) minus space, '"', '#',
# '<', '>', '?', '`', '{', '}', '/', ';', '=', '@', '[', '\', ']', '^', '|'.
# Unlike the separate username/password sets, ':' stays unescaped here.
USERINFO_SAFE = "".join(
    char
    for char in map(chr, range(0x20, 0x7F))
    if char not in ' "#<>' + "?`{}" + "/;=@[\\]^|"
)
|
|
|
|
| |
| |
| |
| |
| |
# Splits a URL reference into its five top-level components:
#
#   [scheme ":"] ["//" authority] path ["?" query] ["#" fragment]
#
# Every component except the path is optional, and the path may be empty,
# so matching from the start of a string always succeeds. Absent optional
# components are reported as None, which is distinct from an empty string.
URL_REGEX = re.compile(
    r"(?:(?P<scheme>([a-zA-Z][a-zA-Z0-9+.-]*)?):)?"
    r"(?://(?P<authority>[^/?#]*))?"
    r"(?P<path>[^?#]*)"
    r"(?:\?(?P<query>[^#]*))?"
    r"(?:#(?P<fragment>.*))?"
)
|
|
| |
| |
| |
# Splits an authority component into its parts:
#
#   [userinfo "@"] host [":" port]
#
# The host is either a bracketed literal ("[...]") or a run of characters
# containing neither ':' nor '@'. The port is captured loosely here and is
# validated separately.
AUTHORITY_REGEX = re.compile(
    r"(?:(?P<userinfo>.*)@)?"
    r"(?P<host>(\[.*\]|[^:@]*))"
    r":?(?P<port>.*)?"
)
|
|
|
|
| |
| |
| |
# Validation patterns for URL components that are passed individually by
# keyword. Each supplied value must match its pattern in full.
COMPONENT_REGEX = {
    name: re.compile(pattern)
    for name, pattern in (
        ("scheme", r"([a-zA-Z][a-zA-Z0-9+.-]*)?"),
        ("authority", r"[^/?#]*"),
        ("path", r"[^?#]*"),
        ("query", r"[^#]*"),
        ("fragment", r".*"),
        ("userinfo", r"[^@]*"),
        ("host", r"(\[.*\]|[^:]*)"),
        ("port", r".*"),
    )
}
|
|
|
|
| |
| |
# Hostnames made of four dot-separated groups of decimal digits. This only
# recognises the *shape* of an IPv4 address; the value is validated separately.
IPv4_STYLE_HOSTNAME = re.compile(r"^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
# Hostnames wrapped in square brackets, i.e. the shape of an IPv6 literal.
IPv6_STYLE_HOSTNAME = re.compile(r"^\[.*\]$")
|
|
|
|
class ParseResult(typing.NamedTuple):
    """
    The individual components of a parsed URL.

    `query` and `fragment` are `None` when absent, which is distinct from
    being present but empty (a URL with a trailing '?' or '#').
    """

    scheme: str
    userinfo: str
    # Stored without square brackets, even for IPv6 literals.
    host: str
    port: int | None
    path: str
    query: str | None
    fragment: str | None

    @property
    def authority(self) -> str:
        """
        The "userinfo@host:port" portion of the URL.

        The userinfo and port parts are omitted when not set.
        """
        prefix = f"{self.userinfo}@" if self.userinfo else ""
        return prefix + self.netloc

    @property
    def netloc(self) -> str:
        """
        The "host:port" portion of the URL, without any userinfo.

        A host containing ':' is wrapped in square brackets.
        """
        host = self.host
        if ":" in host:
            host = f"[{host}]"
        if self.port is None:
            return host
        return f"{host}:{self.port}"

    def copy_with(self, **kwargs: str | None) -> ParseResult:
        """
        Return a new `ParseResult` with the given components replaced.

        The result is produced by re-running `urlparse` over the current
        components merged with `kwargs`. With no keyword arguments the
        instance itself is returned.
        """
        if not kwargs:
            return self

        components = {
            "scheme": self.scheme,
            "authority": self.authority,
            "path": self.path,
            "query": self.query,
            "fragment": self.fragment,
            **kwargs,
        }
        return urlparse("", **components)

    def __str__(self) -> str:
        """Reassemble the components into a URL string."""
        pieces = []
        if self.scheme:
            pieces.append(f"{self.scheme}:")
        authority = self.authority
        if authority:
            pieces.append(f"//{authority}")
        pieces.append(self.path)
        # Compare against None so that an empty query or fragment
        # still emits its '?' or '#' delimiter.
        if self.query is not None:
            pieces.append(f"?{self.query}")
        if self.fragment is not None:
            pieces.append(f"#{self.fragment}")
        return "".join(pieces)
|
|
|
|
def urlparse(url: str = "", **kwargs: str | None) -> ParseResult:
    """
    Parse, validate and normalize a URL, returning a `ParseResult`.

    The URL may be given as a complete string, as individual components
    passed by keyword, or as a string with some components overridden.

    * `url`: The URL string to parse. Defaults to the empty string.
    * `**kwargs`: Component overrides. The directly validated names are
      "scheme", "authority", "path", "query", "fragment", "userinfo", "host"
      and "port" (the port may also be given as an `int`). The following
      convenience names are translated first: "netloc" is split into host
      and port, "username" / "password" are quoted and merged into userinfo,
      and "raw_path" is split into path and query. Any other name with a
      non-None value fails the `COMPONENT_REGEX` lookup with `KeyError`.

    Raises `InvalidURL` if the URL or a component is longer than
    `MAX_URL_LENGTH`, contains a non-printable ASCII character, does not match
    the expected shape of its component, or fails host, port or path
    validation.
    """
    # Initial basic checks on allowable URLs.
    if len(url) > MAX_URL_LENGTH:
        raise InvalidURL("URL too long")

    # Reject non-printable ASCII (control characters). Non-ASCII text is
    # allowed at this stage; it is escaped or IDNA-encoded further down.
    bad_char = next(
        (char for char in url if char.isascii() and not char.isprintable()), None
    )
    if bad_char is not None:
        idx = url.find(bad_char)
        error = (
            "Invalid non-printable ASCII character in URL, "
            f"{bad_char!r} at position {idx}."
        )
        raise InvalidURL(error)

    # Allow the port to be passed as an integer; validation works on strings.
    if "port" in kwargs:
        port = kwargs["port"]
        kwargs["port"] = str(port) if isinstance(port, int) else port

    # Replace "netloc" with "host" and "port".
    if "netloc" in kwargs:
        netloc = kwargs.pop("netloc") or ""
        if netloc.startswith("["):
            # Bracketed IPv6 literal such as "[::1]:8080". The colons inside
            # the brackets belong to the host, so split after the closing
            # bracket rather than at the first colon.
            literal, closing, remainder = netloc.partition("]")
            kwargs["host"] = literal + closing
            kwargs["port"] = remainder[1:] if remainder.startswith(":") else remainder
        else:
            kwargs["host"], _, kwargs["port"] = netloc.partition(":")

    # Replace "username" and/or "password" with "userinfo".
    if "username" in kwargs or "password" in kwargs:
        username = quote(kwargs.pop("username", "") or "", safe=USERNAME_SAFE)
        password = quote(kwargs.pop("password", "") or "", safe=PASSWORD_SAFE)
        kwargs["userinfo"] = f"{username}:{password}" if password else username

    # Replace "raw_path" with "path" and "query".
    if "raw_path" in kwargs:
        raw_path = kwargs.pop("raw_path") or ""
        kwargs["path"], separator, kwargs["query"] = raw_path.partition("?")
        if not separator:
            # No '?' at all: the query is absent rather than empty.
            kwargs["query"] = None

    # Ensure that IPv6 "host" addresses are always wrapped in "[...]".
    if "host" in kwargs:
        host = kwargs.get("host") or ""
        if ":" in host and not (host.startswith("[") and host.endswith("]")):
            kwargs["host"] = f"[{host}]"

    # Validate each explicitly supplied component.
    for key, value in kwargs.items():
        if value is None:
            continue

        if len(value) > MAX_URL_LENGTH:
            raise InvalidURL(f"URL component '{key}' too long")

        bad_char = next(
            (char for char in value if char.isascii() and not char.isprintable()),
            None,
        )
        if bad_char is not None:
            idx = value.find(bad_char)
            error = (
                f"Invalid non-printable ASCII character in URL {key} component, "
                f"{bad_char!r} at position {idx}."
            )
            raise InvalidURL(error)

        # Ensure that the keyword argument matches as a valid component.
        if not COMPONENT_REGEX[key].fullmatch(value):
            raise InvalidURL(f"Invalid URL component '{key}'")

    # The URL regex has an unanchored, possibly-empty path and only optional
    # parts around it, so it matches any string.
    url_match = URL_REGEX.match(url)
    assert url_match is not None
    url_dict = url_match.groupdict()

    # Keyword arguments take precedence over what was parsed from `url`.
    # Scheme, authority and path are normalized to "" when absent, whereas
    # query and fragment keep None to distinguish "absent" from "empty".
    scheme = kwargs.get("scheme", url_dict["scheme"]) or ""
    authority = kwargs.get("authority", url_dict["authority"]) or ""
    path = kwargs.get("path", url_dict["path"]) or ""
    query = kwargs.get("query", url_dict["query"])
    frag = kwargs.get("fragment", url_dict["fragment"])

    # Split the authority into userinfo, host and port.
    authority_match = AUTHORITY_REGEX.match(authority)
    assert authority_match is not None
    authority_dict = authority_match.groupdict()

    userinfo = kwargs.get("userinfo", authority_dict["userinfo"]) or ""
    host = kwargs.get("host", authority_dict["host"]) or ""
    port = kwargs.get("port", authority_dict["port"])

    # Normalize and validate each component.
    parsed_scheme: str = scheme.lower()
    parsed_userinfo: str = quote(userinfo, safe=USERINFO_SAFE)
    parsed_host: str = encode_host(host)
    # Use the lowercased scheme so that default ports are also dropped for
    # URLs written with an upper- or mixed-case scheme, e.g. "HTTP://host:80".
    parsed_port: int | None = normalize_port(port, parsed_scheme)

    has_scheme = parsed_scheme != ""
    has_authority = (
        parsed_userinfo != "" or parsed_host != "" or parsed_port is not None
    )
    validate_path(path, has_scheme=has_scheme, has_authority=has_authority)
    if has_scheme or has_authority:
        # Dot segments are only resolved for absolute URLs.
        path = normalize_path(path)

    parsed_path: str = quote(path, safe=PATH_SAFE)
    parsed_query: str | None = None if query is None else quote(query, safe=QUERY_SAFE)
    parsed_frag: str | None = None if frag is None else quote(frag, safe=FRAG_SAFE)

    return ParseResult(
        parsed_scheme,
        parsed_userinfo,
        parsed_host,
        parsed_port,
        parsed_path,
        parsed_query,
        parsed_frag,
    )
|
|
|
|
def encode_host(host: str) -> str:
    """
    Validate a hostname and return its normalized form.

    * An empty host is returned as the empty string.
    * A host shaped like an IPv4 address must be a valid IPv4 address,
      and is returned unchanged.
    * A host wrapped in "[...]" must contain a valid IPv6 address,
      and is returned without the surrounding brackets.
    * Any other ASCII host is lowercased and percent-quoted.
    * A non-ASCII host is lowercased and IDNA-encoded.

    Raises `InvalidURL` for an invalid IPv4 address, IPv6 address,
    or IDNA hostname.
    """
    if not host:
        return ""

    if IPv4_STYLE_HOSTNAME.match(host):
        try:
            ipaddress.IPv4Address(host)
        except ipaddress.AddressValueError:
            raise InvalidURL(f"Invalid IPv4 address: {host!r}")
        return host

    if IPv6_STYLE_HOSTNAME.match(host):
        # Strip the enclosing square brackets before validating.
        address = host[1:-1]
        try:
            ipaddress.IPv6Address(address)
        except ipaddress.AddressValueError:
            raise InvalidURL(f"Invalid IPv6 address: {host!r}")
        return address

    if not host.isascii():
        # Internationalized domain name.
        try:
            return idna.encode(host.lower()).decode("ascii")
        except idna.IDNAError:
            raise InvalidURL(f"Invalid IDNA hostname: {host!r}")

    # Regular ASCII hostname: these characters are left unescaped in
    # addition to the unreserved set and the sub-delims.
    extra_safe = '"`{}%|\\'
    return quote(host.lower(), safe=SUB_DELIMS + extra_safe)
|
|
|
|
# Default ports for the schemes that get port normalization.
_DEFAULT_PORTS = {"ftp": 21, "http": 80, "https": 443, "ws": 80, "wss": 443}


def normalize_port(port: str | int | None, scheme: str) -> int | None:
    """
    Convert a port to an integer, dropping it when it is the scheme's default.

    * `port`: The port as a string or integer. `None` and the empty string
      both mean "no port".
    * `scheme`: The URL scheme, used to look up the default port. Schemes are
      case-insensitive, so the lookup ignores case.

    Returns `None` when no port is given or when the port equals the default
    for "ftp", "http", "https", "ws" or "wss"; otherwise returns the port as
    an `int`.

    Raises `InvalidURL` if the port cannot be converted to an integer.
    """
    if port is None or port == "":
        return None

    try:
        port_as_int = int(port)
    except ValueError as exc:
        raise InvalidURL(f"Invalid port: {port!r}") from exc

    # Lowercase the scheme so that e.g. "HTTP" and "http" share a default.
    if port_as_int == _DEFAULT_PORTS.get(scheme.lower()):
        return None
    return port_as_int
|
|
|
|
def validate_path(path: str, has_scheme: bool, has_authority: bool) -> None:
    """
    Check the path against the rules that depend on whether the URL
    has a scheme or an authority component.

    Raises `InvalidURL` when:

    * the URL has an authority and the path is non-empty but does not
      begin with '/'.
    * the URL has neither scheme nor authority and the path begins
      with '//' or with ':'.

    See https://datatracker.ietf.org/doc/html/rfc3986.html#section-3.3
    """
    if has_authority and path and not path.startswith("/"):
        raise InvalidURL("For absolute URLs, path must be empty or begin with '/'")

    # The remaining rules only apply to relative references.
    if has_scheme or has_authority:
        return

    if path.startswith("//"):
        raise InvalidURL("Relative URLs cannot have a path starting with '//'")

    if path.startswith(":"):
        raise InvalidURL("Relative URLs cannot have a path starting with ':'")
|
|
|
|
def normalize_path(path: str) -> str:
    """
    Resolve "." and ".." segments in a URL path.

    A "." segment is dropped. A ".." segment is dropped along with the
    segment before it, except that the leading empty segment of an absolute
    path is never removed.

    For example:

        normalize_path("/path/./to/somewhere/..") == "/path/to"
    """
    # Fast path: without any dot there can be no dot segments.
    if "." not in path:
        return path

    segments = path.split("/")

    # Dots only appear inside ordinary segments, e.g. "/index.html".
    if not ("." in segments or ".." in segments):
        return path

    kept: list[str] = []
    for segment in segments:
        if segment == "..":
            # Step up one level, but never above the root.
            if kept not in ([], [""]):
                kept.pop()
        elif segment != ".":
            kept.append(segment)
    return "/".join(kept)
|
|
|
|
def PERCENT(string: str) -> str:
    """
    Percent-escape every byte of the UTF-8 encoding of `string`,
    using uppercase hexadecimal digits.
    """
    return "".join("%{:02X}".format(octet) for octet in string.encode("utf-8"))
|
|
|
|
def percent_encoded(string: str, safe: str) -> str:
    """
    Percent-escape every character of `string` that is neither an
    unreserved character nor listed in `safe`.
    """
    allowed = UNRESERVED_CHARACTERS + safe

    # Fast path: stripping the allowed characters leaves nothing behind,
    # so there is nothing to escape.
    if string.rstrip(allowed) == "":
        return string

    return "".join(
        char if char in allowed else PERCENT(char) for char in string
    )
|
|
|
|
def quote(string: str, safe: str) -> str:
    """
    Percent-escape a string, leaving any existing '%xx' escape sequences intact.

    See: https://www.rfc-editor.org/rfc/rfc3986#section-2.1

    * `string`: The string to be percent-escaped.
    * `safe`: A string containing characters that may be treated as safe, and do not
        need to be escaped. Unreserved characters are always treated as safe.
        See: https://www.rfc-editor.org/rfc/rfc3986#section-2.3
    """
    pieces = []
    cursor = 0
    for escape in PERCENT_ENCODED_REGEX.finditer(string):
        # Escape the plain text between the previous '%xx' and this one.
        if escape.start() != cursor:
            pieces.append(percent_encoded(string[cursor : escape.start()], safe=safe))

        # Copy the existing '%xx' sequence through untouched.
        pieces.append(escape.group(0))
        cursor = escape.end()

    # Escape whatever follows the final '%xx' sequence.
    if cursor != len(string):
        pieces.append(percent_encoded(string[cursor:], safe=safe))

    return "".join(pieces)
|
|