From 13221ad1a775bca60d536b4cf81742bf42b40a62 Mon Sep 17 00:00:00 2001 From: "Michiel W. Beijen" Date: Mon, 18 May 2026 21:54:04 +0200 Subject: [PATCH] Add IPNetPattern for accurate CIDR proxy/mount matching Setting no_proxy patterns matched by hostname string comparison. So 192.168.0.1 would not match all://192.168.0.0/16. Add IPNetPattern, backed by ipaddress.ip_network, and a build_url_pattern() factory that returns IPNetPattern for all:// patterns containing a slash and WildcardURLPattern otherwise. Introduce a Pattern protocol so Client and AsyncClient can hold either kind in their _mounts dict. URLPattern is kept as a backward-compatible alias for WildcardURLPattern. Together with https://github.com/pydantic/httpx2/pull/967 will close - https://github.com/pydantic/httpx2/issues/829 - https://github.com/pydantic/httpx2/issues/899 Ported from https://github.com/encode/httpx/pull/3741. Co-Authored-By: Ondrej Filip Co-Authored-By: Bill Peck --- src/httpx2/httpx2/_client.py | 14 +++---- src/httpx2/httpx2/_utils.py | 81 ++++++++++++++++++++++++++++++++---- tests/httpx2/test_utils.py | 59 +++++++++++++++++++++----- 3 files changed, 128 insertions(+), 26 deletions(-) diff --git a/src/httpx2/httpx2/_client.py b/src/httpx2/httpx2/_client.py index f7535d33..68f492d2 100644 --- a/src/httpx2/httpx2/_client.py +++ b/src/httpx2/httpx2/_client.py @@ -46,7 +46,7 @@ TimeoutTypes, ) from ._urls import URL, QueryParams -from ._utils import URLPattern, get_environment_proxies +from ._utils import Pattern, build_url_pattern, get_environment_proxies if typing.TYPE_CHECKING: import ssl # pragma: no cover @@ -665,8 +665,8 @@ def __init__( limits=limits, transport=transport, ) - self._mounts: dict[URLPattern, BaseTransport | None] = { - URLPattern(key): None + self._mounts: dict[Pattern, BaseTransport | None] = { + build_url_pattern(key): None if proxy is None else self._init_proxy_transport( proxy, @@ -680,7 +680,7 @@ def __init__( for key, proxy in proxy_map.items() } if mounts is not None: - self._mounts.update({URLPattern(key): transport for key, transport in mounts.items()}) + self._mounts.update({build_url_pattern(key): transport for key, transport in mounts.items()}) self._mounts = dict(sorted(self._mounts.items())) @@ -1368,8 +1368,8 @@ def __init__( transport=transport, ) - self._mounts: dict[URLPattern, AsyncBaseTransport | None] = { - URLPattern(key): None + self._mounts: dict[Pattern, AsyncBaseTransport | None] = { + build_url_pattern(key): None if proxy is None else self._init_proxy_transport( proxy, @@ -1383,7 +1383,7 @@ def __init__( for key, proxy in proxy_map.items() } if mounts is not None: - self._mounts.update({URLPattern(key): transport for key, transport in mounts.items()}) + self._mounts.update({build_url_pattern(key): transport for key, transport in mounts.items()}) self._mounts = dict(sorted(self._mounts.items())) def _init_transport( diff --git a/src/httpx2/httpx2/_utils.py b/src/httpx2/httpx2/_utils.py index eef7b6dc..14130267 100644 --- a/src/httpx2/httpx2/_utils.py +++ b/src/httpx2/httpx2/_utils.py @@ -4,6 +4,7 @@ import os import re import typing +from abc import abstractmethod from urllib.request import getproxies from ._types import PrimitiveData @@ -115,24 +116,41 @@ def peek_filelike_length(stream: typing.Any) -> int | None: return length -class URLPattern: +class Pattern(typing.Protocol): + @abstractmethod + def matches(self, other: URL) -> bool: + """this method should never be accessed""" + + @property + @abstractmethod + def priority(self) -> tuple[int, int, int]: + """this property should never be accessed""" + + def __lt__(self, other: Pattern) -> bool: + """this method should never be accessed""" + + def __eq__(self, other: typing.Any) -> bool: + """this method should never be accessed""" + + +class WildcardURLPattern(Pattern): """ A utility class currently used for making lookups against proxy keys... # Wildcard matching... - >>> pattern = URLPattern("all://") + >>> pattern = WildcardURLPattern("all://") >>> pattern.matches(httpx2.URL("http://example.com")) True # Witch scheme matching... - >>> pattern = URLPattern("https://") + >>> pattern = WildcardURLPattern("https://") >>> pattern.matches(httpx2.URL("https://example.com")) True >>> pattern.matches(httpx2.URL("http://example.com")) False # With domain matching... - >>> pattern = URLPattern("https://example.com") + >>> pattern = WildcardURLPattern("https://example.com") >>> pattern.matches(httpx2.URL("https://example.com")) True >>> pattern.matches(httpx2.URL("http://example.com")) @@ -141,7 +159,7 @@ class URLPattern: False # Wildcard scheme, with domain matching... - >>> pattern = URLPattern("all://example.com") + >>> pattern = WildcardURLPattern("all://example.com") >>> pattern.matches(httpx2.URL("https://example.com")) True >>> pattern.matches(httpx2.URL("http://example.com")) @@ -150,7 +168,7 @@ class URLPattern: False # With port matching... - >>> pattern = URLPattern("https://example.com:1234") + >>> pattern = WildcardURLPattern("https://example.com:1234") >>> pattern.matches(httpx2.URL("https://example.com:1234")) True >>> pattern.matches(httpx2.URL("https://example.com")) @@ -199,7 +217,7 @@ def matches(self, other: URL) -> bool: @property def priority(self) -> tuple[int, int, int]: """ - The priority allows URLPattern instances to be sortable, so that + The priority allows WildcardURLPattern instances to be sortable, so that we can match from most specific to least specific. """ # URLs with a port should take priority over URLs without a port. @@ -213,11 +231,56 @@ def priority(self) -> tuple[int, int, int]: def __hash__(self) -> int: return hash(self.pattern) - def __lt__(self, other: URLPattern) -> bool: + def __lt__(self, other: Pattern) -> bool: + return self.priority < other.priority + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, WildcardURLPattern) and self.pattern == other.pattern + + +class IPNetPattern(Pattern): + def __init__(self, ip_net: str) -> None: + try: + addr, range = ip_net.split("/", 1) + if addr[0] == "[" and addr[-1] == "]": + addr = addr[1:-1] + ip_net = f"{addr}/{range}" + except ValueError: + pass # not a range + self.net = ipaddress.ip_network(ip_net) + + def matches(self, other: URL) -> bool: + try: + return ipaddress.ip_address(other.host) in self.net + except ValueError: + return False + + @property + def priority(self) -> tuple[int, int, int]: + return -1, 0, 0 # higher priority than WildcardURLPatterns + + def __hash__(self) -> int: + return hash(self.net) + + def __lt__(self, other: Pattern) -> bool: return self.priority < other.priority def __eq__(self, other: typing.Any) -> bool: - return isinstance(other, URLPattern) and self.pattern == other.pattern + return isinstance(other, IPNetPattern) and self.net == other.net + + +# Backward-compatible alias so existing code using URLPattern("...") keeps working. +URLPattern = WildcardURLPattern + + +def build_url_pattern(pattern: str) -> Pattern: + try: + proto, rest = pattern.split("://", 1) + if proto == "all" and "/" in rest: + return IPNetPattern(rest) + except ValueError: # covers .split() and IPNetPattern + pass + return WildcardURLPattern(pattern) def is_ipv4_hostname(hostname: str) -> bool: diff --git a/tests/httpx2/test_utils.py b/tests/httpx2/test_utils.py index f4bbd1fa..30aa4091 100644 --- a/tests/httpx2/test_utils.py +++ b/tests/httpx2/test_utils.py @@ -6,7 +6,12 @@ import pytest import httpx2 -from httpx2._utils import URLPattern, get_environment_proxies +from httpx2._utils import ( + IPNetPattern, + WildcardURLPattern, + build_url_pattern, + get_environment_proxies, +) @pytest.mark.parametrize( @@ -126,24 +131,58 @@ def test_get_environment_proxies(environment, proxies): ("http://", "https://example.com", False), ("all://", "https://example.com:123", True), ("", "https://example.com:123", True), + ("all://192.168.0.0/24", "http://192.168.0.1", True), + ("all://192.168.0.0/24", "https://192.168.1.1", False), + ("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:12::1]", True), + ("all://[2001:db8:abcd:0012::]/64", "http://[2001:db8:abcd:13::1]:8080", False), ], ) def test_url_matches(pattern, url, expected): - pattern = URLPattern(pattern) + pattern = build_url_pattern(pattern) assert pattern.matches(httpx2.URL(url)) == expected +@pytest.mark.parametrize( + ["pattern", "url", "expected"], + [ + ("all://192.168.0.0/24", "http://192.168.0.1", True), + ("all://192.168.0.1", "http://192.168.0.1", True), + ("all://192.168.0.0/24", "foobar", False), + ], +) +def test_IPNetPattern(pattern, url, expected): + proto, rest = pattern.split("://", 1) + pattern = IPNetPattern(rest) + assert pattern.matches(httpx2.URL(url)) == expected + + +def test_build_url_pattern(): + pattern1 = build_url_pattern("all://192.168.0.0/16") + pattern2 = build_url_pattern("all://192.168.0.0/16") + pattern3 = build_url_pattern("all://192.168.0.1") + assert isinstance(pattern1, IPNetPattern) + assert isinstance(pattern2, IPNetPattern) + assert isinstance(pattern3, WildcardURLPattern) + assert pattern1 == pattern2 + assert pattern2 != pattern3 + assert pattern1 < pattern3 + assert hash(pattern1) == hash(pattern2) + assert hash(pattern2) != hash(pattern3) + + def test_pattern_priority(): matchers = [ - URLPattern("all://"), - URLPattern("http://"), - URLPattern("http://example.com"), - URLPattern("http://example.com:123"), + build_url_pattern("all://"), + build_url_pattern("http://"), + build_url_pattern("http://example.com"), + build_url_pattern("http://example.com:123"), + build_url_pattern("all://192.168.0.0/16"), ] random.shuffle(matchers) assert sorted(matchers) == [ - URLPattern("http://example.com:123"), - URLPattern("http://example.com"), - URLPattern("http://"), - URLPattern("all://"), + build_url_pattern("all://192.168.0.0/16"), + build_url_pattern("http://example.com:123"), + build_url_pattern("http://example.com"), + build_url_pattern("http://"), + build_url_pattern("all://"), ]