Prevent redirects to loopback/internal IPs (SSRF)

This commit is contained in:
Talmaj Marinc 2026-06-30 17:35:06 +02:00
parent 7690c52a34
commit 312b282ca8
3 changed files with 68 additions and 17 deletions

View File

@ -53,7 +53,7 @@ async def _resolve_final_response(
current = url
hops = 0
while True:
check_redirect_hop(current)
check_redirect_hop(current, is_initial_url=(hops == 0))
parts = urlsplit(current)
auth = await resolve_auth_for_hop(
parts.hostname or "", parts.scheme, explicit_credential_id=credential_id

View File

@ -9,14 +9,18 @@ Two cooperating layers:
resolve and the connect happen together inside the connector, there is no
check-then-connect window for DNS rebinding to exploit.
2. :func:`check_redirect_hop` re-validates every redirect hop. The host
allowlist gates only the *initial* user-supplied URL (anti-SSRF for
arbitrary input); legitimate downloads from allowlisted origins redirect
to presigned CDN hosts that are deliberately NOT on the allowlist (HF ->
2. :func:`check_redirect_hop` re-validates every hop. The host allowlist gates
only the *initial* user-supplied URL (anti-SSRF for arbitrary input);
legitimate downloads from allowlisted origins redirect to presigned CDN
hosts that are deliberately NOT on the allowlist (HF ->
``cdn-lfs*.huggingface.co``, Civitai -> signed Cloudflare/S3), so hops are
instead screened for scheme, embedded credentials, and via the resolver
above private IPs. Credentials are only ever attached when a hop's host
exactly matches a stored credential, so they are dropped on the CDN hop.
Loopback (the "download a local model" feature) is exempt from IP filtering
only for the initial URL: a *redirect* may never target a loopback host or
a blocked IP-literal, which the resolver alone can't enforce (it exempts
loopback literals and never sees IP literals through DNS).
"""
from __future__ import annotations
@ -109,28 +113,51 @@ class ValidatingResolver(AbstractResolver):
await self._inner.close()
def check_redirect_hop(url: str) -> str:
"""Validate one redirect hop's URL.
def check_redirect_hop(url: str, *, is_initial_url: bool = False) -> str:
"""Validate one hop's URL.
Returns the URL unchanged on success; raises :class:`SSRFError` otherwise.
Requires https for external hosts (http only for loopback/approved dev
hosts) and forbids credentials-in-URL. The host is NOT re-checked against
the allowlist (CDN redirect targets are off-list by design); private-IP
protection is provided by the connector's resolver, and credential leakage
is prevented by exact host matching at attach time. The landing filename's
extension is gated separately by the caller.
the allowlist (CDN redirect targets are off-list by design); credential
leakage is prevented by exact host matching at attach time, and the landing
filename's extension is gated separately by the caller.
Loopback/blocked-IP screening: the connector's resolver filters resolvable
hostnames but exempts literal loopback hosts (``localhost``/``127.0.0.1``/
``::1``) and never sees IP literals through DNS. That loopback exemption is
legitimate only for the *initial* user-supplied URL (``is_initial_url``);
on a redirect hop we reject loopback hosts and any blocked IP-literal here,
so a 30x can't steer a server-side GET at loopback/internal services.
"""
try:
parsed = urlparse(url)
except ValueError as e:
raise SSRFError(f"unparseable redirect URL {url!r}: {e}") from e
if not parsed.hostname:
host = parsed.hostname
if not host:
raise SSRFError(f"redirect URL has no host: {url!r}")
if not is_scheme_allowed(parsed.scheme, parsed.hostname):
if not is_scheme_allowed(parsed.scheme, host):
raise SSRFError(
f"redirect to disallowed scheme {parsed.scheme!r} for host "
f"{parsed.hostname!r} (https required for external hosts)"
f"{host!r} (https required for external hosts)"
)
if parsed.username or parsed.password:
raise SSRFError("credentials-in-URL are not allowed")
host_is_loopback = host.lower() in LOOPBACK_HOSTS
if not is_initial_url and host_is_loopback:
raise SSRFError(f"redirect to loopback host {host!r} is not allowed")
# IP-literal targets never go through DNS, so the connector's resolver can't
# screen them — check them directly. The only blocked IP allowed through is
# a loopback literal on the initial URL (handled by the exemption above).
try:
ipaddress.ip_address(host)
except ValueError:
is_ip_literal = False
else:
is_ip_literal = True
if is_ip_literal and is_blocked_ip(host) and not (
is_initial_url and host_is_loopback
):
raise SSRFError(f"redirect to blocked internal address {host!r}")
return url

View File

@ -89,9 +89,33 @@ def test_check_redirect_hop_http_only_for_loopback():
# Plain http to an external host is rejected (no plaintext downgrade).
with pytest.raises(SSRFError):
check_redirect_hop("http://cdn-lfs.huggingface.co/abc")
# http is still honored for loopback/approved dev hosts.
assert check_redirect_hop("http://localhost/x.safetensors") is not None
assert check_redirect_hop("http://127.0.0.1/x.safetensors") is not None
# http is honored for loopback only on the initial user-supplied URL (the
# "download a local model" feature).
assert (
check_redirect_hop("http://localhost/x.safetensors", is_initial_url=True)
is not None
)
assert (
check_redirect_hop("http://127.0.0.1/x.safetensors", is_initial_url=True)
is not None
)
def test_check_redirect_hop_blocks_loopback_and_ip_literals_on_redirect():
# A redirect (is_initial_url=False, the default) must never reach loopback,
# whether by hostname or by IP literal, nor any other internal IP literal.
for target in (
"http://localhost/x.safetensors",
"http://127.0.0.1/x.safetensors",
"https://[::1]/x.safetensors",
"https://169.254.169.254/x.safetensors", # cloud metadata
"https://10.0.0.5/x.safetensors", # RFC1918
):
with pytest.raises(SSRFError):
check_redirect_hop(target)
# Off-allowlist public CDN hosts (hostnames) remain valid redirect targets;
# their resolved IPs are screened by the connector's resolver.
assert check_redirect_hop("https://cdn-lfs.huggingface.co/abc") is not None
# ----- path safety -----