diff --git a/apps/api/plane/app/views/webhook/base.py b/apps/api/plane/app/views/webhook/base.py
index c874f0a4225..24496c742bf 100644
--- a/apps/api/plane/app/views/webhook/base.py
+++ b/apps/api/plane/app/views/webhook/base.py
@@ -81,7 +81,7 @@ def patch(self, request, slug, pk):
serializer = WebhookSerializer(
webhook,
data=request.data,
- context={request: request},
+ context={"request": request},
partial=True,
fields=(
"id",
diff --git a/apps/api/plane/authentication/adapter/base.py b/apps/api/plane/authentication/adapter/base.py
index b80dfa5002f..1b906c4a8d4 100644
--- a/apps/api/plane/authentication/adapter/base.py
+++ b/apps/api/plane/authentication/adapter/base.py
@@ -8,10 +8,10 @@
import uuid
from io import BytesIO
-import requests
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
+from plane.utils.url_security import pinned_fetch_following_redirects
# Django imports
from django.utils import timezone
@@ -146,48 +146,62 @@ def download_and_upload_avatar(self, avatar_url, user):
try:
headers = self.get_avatar_download_headers()
- # Download the avatar image
- response = requests.get(avatar_url, timeout=10, headers=headers)
- response.raise_for_status()
-
- # Check content length before downloading
- content_length = response.headers.get("Content-Length")
- max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
- if content_length and int(content_length) > max_size:
- return None
+ # Download the avatar image over an SSRF-safe client: the avatar URL
+ # comes from the OAuth provider's (attacker-influenceable) profile
+ # data, so it must not be allowed to reach internal addresses. The
+ # connection is pinned to the validated IP (defeats DNS rebinding)
+ # and every redirect hop is re-validated, so a public URL cannot
+ # bounce the fetch to an internal target — GHSA-cv9p-325g-wmv5 /
+ # GHSA-hx79-5pj5-qh42 (avatar hop).
+ # stream=True so the body is read incrementally and the size cap
+ # below actually bounds memory (without it, requests buffers the
+ # whole body before any check runs).
+ response, _ = pinned_fetch_following_redirects(
+ "GET", avatar_url, headers=headers, timeout=10, max_redirects=5, stream=True
+ )
+ try:
+ response.raise_for_status()
- # Get content type and determine file extension
- content_type = response.headers.get("Content-Type", "image/jpeg")
- extension_map = {
- "image/jpeg": "jpg",
- "image/jpg": "jpg",
- "image/png": "png",
- "image/gif": "gif",
- "image/webp": "webp",
- }
- extension = extension_map.get(content_type)
-
- if not extension:
- return None
+ # Check content length before downloading
+ content_length = response.headers.get("Content-Length")
+ max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
+ if content_length and int(content_length) > max_size:
+ return None
- # Download with size limit
- chunks = []
- total_size = 0
- for chunk in response.iter_content(chunk_size=8192):
- total_size += len(chunk)
- if total_size > max_size:
+ # Get content type and determine file extension
+ content_type = response.headers.get("Content-Type", "image/jpeg")
+ extension_map = {
+ "image/jpeg": "jpg",
+ "image/jpg": "jpg",
+ "image/png": "png",
+ "image/gif": "gif",
+ "image/webp": "webp",
+ }
+ extension = extension_map.get(content_type)
+
+ if not extension:
return None
- chunks.append(chunk)
- content = b"".join(chunks)
- file_size = len(content)
+
+ # Download with size limit
+ chunks = []
+ total_size = 0
+ for chunk in response.iter_content(chunk_size=8192):
+ total_size += len(chunk)
+ if total_size > max_size:
+ return None
+ chunks.append(chunk)
+ content = b"".join(chunks)
+ file_size = len(content)
+ finally:
+ response.close()
# Generate unique filename
filename = f"{uuid.uuid4().hex}-user-avatar.{extension}"
storage = S3Storage(request=self.request)
- # Create file-like object
- file_obj = BytesIO(response.content)
+ # Create file-like object from the size-bounded buffer
+ file_obj = BytesIO(content)
file_obj.seek(0)
# Upload using boto3 directly
diff --git a/apps/api/plane/bgtasks/webhook_task.py b/apps/api/plane/bgtasks/webhook_task.py
index 02e6915d4e3..9fa6c704f62 100644
--- a/apps/api/plane/bgtasks/webhook_task.py
+++ b/apps/api/plane/bgtasks/webhook_task.py
@@ -52,7 +52,7 @@
from plane.license.utils.instance_value import get_email_configuration
from plane.utils.email import generate_plain_text_from_html
from plane.utils.exception_logger import log_exception
-from plane.utils.ip_address import validate_url
+from plane.utils.url_security import pinned_fetch
SERIALIZER_MAPPER = {
@@ -307,22 +307,20 @@ def webhook_send_task(
return
try:
- # Re-validate the webhook URL at send time to prevent DNS-rebinding attacks
- validate_url(
+ # Resolve + validate the webhook URL and pin the connection to the
+ # validated IP. Pinning closes the DNS-rebinding TOCTOU (validating the
+ # name then letting requests re-resolve it lets an attacker swap in an
+ # internal IP between the two lookups). Redirects are never followed, so
+ # a 3xx Location cannot bounce the request to an internal address
+ # (GHSA-mq87-52pf-hm3h / cluster C).
+ response = pinned_fetch(
+ "POST",
webhook.url,
allowed_ips=settings.WEBHOOK_ALLOWED_IPS,
allowed_hosts=settings.WEBHOOK_ALLOWED_HOSTS,
- )
-
- # Send the webhook event
- # allow_redirects=False prevents SSRF via 3xx hops to internal addresses
- # bypassing the validate_url() check above (GHSA-mq87-52pf-hm3h).
- response = requests.post(
- webhook.url,
headers=headers,
json=payload,
timeout=30,
- allow_redirects=False,
)
# Log the webhook request
@@ -366,6 +364,25 @@ def webhook_send_task(
return
raise requests.RequestException()
+ except ValueError as e:
+ # SSRF validation failure (blocked/internal target or unresolvable host).
+ # Not retryable — record it so the failure is visible to the admin, but
+ # do not raise (no Celery retry) and do not auto-deactivate (the cause
+ # may be transient DNS).
+ save_webhook_log(
+ webhook=webhook,
+ request_method=action,
+ request_headers=headers,
+ request_body=payload,
+ response_status=400,
+ response_headers="",
+ response_body=f"Webhook URL rejected: {e}",
+ retry_count=self.request.retries,
+ event_type=event,
+ )
+ logger.warning(f"Webhook {webhook.id} URL rejected: {e}")
+ return
+
except Exception as e:
log_exception(e)
return
diff --git a/apps/api/plane/bgtasks/work_item_link_task.py b/apps/api/plane/bgtasks/work_item_link_task.py
index cae6a4feb4f..b15d0af9df6 100644
--- a/apps/api/plane/bgtasks/work_item_link_task.py
+++ b/apps/api/plane/bgtasks/work_item_link_task.py
@@ -17,6 +17,8 @@
from typing import Optional
from plane.db.models import IssueLink
from plane.utils.exception_logger import log_exception
+from plane.utils.ip_address import is_blocked_ip
+from plane.utils.url_security import pinned_fetch, pinned_fetch_following_redirects
logger = logging.getLogger("plane.worker")
@@ -50,16 +52,19 @@ def validate_url_ip(url: str) -> None:
try:
addr_info = socket.getaddrinfo(hostname, None)
- except socket.gaierror:
+ except (socket.gaierror, UnicodeError):
+ # UnicodeError covers IDNA failures raised before the address lookup.
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
- # Check every resolved IP against blocked ranges to prevent SSRF
+ # Check every resolved IP against blocked ranges to prevent SSRF. The
+ # actual fetch is pinned to the validated IP (see safe_get), so this acts
+ # as an early, fail-closed pre-filter.
for addr in addr_info:
- ip = ipaddress.ip_address(addr[4][0])
- if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
+ ip = ipaddress.ip_address(addr[4][0].split("%")[0])
+ if is_blocked_ip(ip):
raise ValueError("Access to private/internal networks is not allowed")
@@ -72,8 +77,9 @@ def safe_get(
timeout: int = 1,
) -> Tuple[requests.Response, str]:
"""
- Perform a GET request that validates every redirect hop against private IPs.
- Prevents SSRF by ensuring no redirect lands on a private/internal address.
+ Perform a GET request that resolves, validates and pins every hop to its
+ validated IP. Prevents SSRF via private/internal targets, DNS rebinding
+ (TOCTOU) and redirects that bounce to internal addresses.
Args:
url: The URL to fetch
@@ -85,32 +91,16 @@ def safe_get(
Raises:
ValueError: If any URL in the redirect chain points to a private IP
- requests.RequestException: On network errors
- RuntimeError: If max redirects exceeded
+ requests.RequestException: On network errors (incl. TooManyRedirects)
"""
- validate_url_ip(url)
-
- current_url = url
- response = requests.get(
- current_url, headers=headers, timeout=timeout, allow_redirects=False
+ return pinned_fetch_following_redirects(
+ "GET",
+ url,
+ headers=headers,
+ timeout=timeout,
+ max_redirects=MAX_REDIRECTS,
)
- redirect_count = 0
- while response.is_redirect:
- if redirect_count >= MAX_REDIRECTS:
- raise RuntimeError(f"Too many redirects for URL: {url}")
- redirect_url = response.headers.get("Location")
- if not redirect_url:
- break
- current_url = urljoin(current_url, redirect_url)
- validate_url_ip(current_url)
- redirect_count += 1
- response = requests.get(
- current_url, headers=headers, timeout=timeout, allow_redirects=False
- )
-
- return response, current_url
-
def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
"""
@@ -199,14 +189,13 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
parsed_url = urlparse(base_url)
fallback_url = f"{parsed_url.scheme}://{parsed_url.netloc}/favicon.ico"
- # Check if fallback exists
+ # Check if fallback exists (pinned to the validated IP).
try:
- validate_url_ip(fallback_url)
- response = requests.head(fallback_url, timeout=2, allow_redirects=False)
+ response = pinned_fetch("HEAD", fallback_url, timeout=2)
if response.status_code == 200:
return fallback_url
- except requests.RequestException as e:
+ except (requests.RequestException, ValueError) as e:
log_exception(e, warning=True)
return None
diff --git a/apps/api/plane/tests/unit/bg_tasks/test_ssrf_advisories.py b/apps/api/plane/tests/unit/bg_tasks/test_ssrf_advisories.py
new file mode 100644
index 00000000000..ea4adbc1c20
--- /dev/null
+++ b/apps/api/plane/tests/unit/bg_tasks/test_ssrf_advisories.py
@@ -0,0 +1,289 @@
+# Copyright (c) 2023-present Plane Software, Inc. and contributors
+# SPDX-License-Identifier: AGPL-3.0-only
+# See the LICENSE file for details.
+
+"""
+Per-advisory SSRF regression tests.
+
+Each test reproduces a published / reported SSRF advisory scenario and asserts
+the current code blocks it. This file is the auditable map of "which advisory is
+covered where"; the lower-level mechanics (IP classification, pinning, redirect
+re-validation) are exercised in detail in ``test_url_security.py`` and
+``test_work_item_link_task.py``.
+
+Advisory coverage
+-----------------
+Webhook delivery
+ * GHSA-m3f8-q4wj-9grv / CVE-2026-30242 / GHSA-75vf-hh93-h7mx
+ webhook URL resolves to a private/metadata/loopback IP -> TestWebhookUrlValidation
+ * GHSA-75fg-f8qg-23wv CGNAT(100.64/10), 6to4, multicast missed -> TestWebhookUrlValidation
+ * GHSA-6485-m23r-fx8q PATCH serializer context-key bypass -> TestWebhookPatchContextGuard
+ * GHSA-whh3-5g95-4qhc / -4mjx-q738-87cf / -6p39-x6q9-h3g5 /
+ -9292-pvg4-7hvm / -fgcv-6h3f-xcx9 webhook DNS-rebinding TOCTOU -> TestWebhookRebinding
+ * GHSA-6v37-328w-j2wv / -jw6g-h7h5-rfc6 / -mq87-52pf-hm3h
+ webhook SSRF via HTTP redirect following -> TestWebhookRedirect
+
+Work-item link unfurling / favicon
+ * GHSA-8wvv-p676-hcw4 / -fv24-3845-646g / -9292-pvg4-7hvm link rebinding
+ * GHSA-9fr2-pprw-pp9j / CVE-2026-39843 favicon redirect SSRF -> TestFaviconRedirect
+ * GHSA-3856-6mgg-rx84 favicon DNS-rebinding -> TestFaviconRebinding
+
+OAuth avatar (the still-unresolved family this change adds)
+ * GHSA-cv9p-325g-wmv5 OAuth avatar redirect SSRF -> static-asset exfil
+ * GHSA-hx79-5pj5-qh42 Gitea OAuth SSRF (avatar hop) -> TestOAuthAvatarSSRF
+"""
+
+import pytest
+import requests
+from unittest.mock import MagicMock, patch
+
+from bs4 import BeautifulSoup
+
+from plane.utils.ip_address import validate_url
+from plane.bgtasks.work_item_link_task import fetch_and_encode_favicon, DEFAULT_FAVICON
+from plane.authentication.adapter.base import Adapter
+
+
+def _addr(ip):
+ family = 6 if ":" in ip else 2
+ return (family, None, None, None, (ip, 0))
+
+
+def _resp(status_code=200, headers=None, content=b"OK"):
+ resp = MagicMock(spec=requests.Response)
+ resp.status_code = status_code
+ resp.headers = headers or {}
+ resp.content = content
+ return resp
+
+
+_BLOCKED = "Access to private/internal networks is not allowed"
+
+
+# ---------------------------------------------------------------------------
+# Webhook URL validation (creation/update-time defense in depth)
+# GHSA-m3f8-q4wj-9grv / CVE-2026-30242 / GHSA-75vf-hh93-h7mx / GHSA-75fg-f8qg-23wv
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestWebhookUrlValidation:
+ @pytest.mark.parametrize(
+ "ip",
+ [
+ "169.254.169.254", # AWS/GCP metadata (CVE-2026-30242 PoC)
+ "127.0.0.1", # loopback
+ "10.0.0.1", # private
+ "172.16.0.1", # private
+ "192.168.0.1", # private
+ "::1", # IPv6 loopback
+ "100.64.0.1", # CGNAT / RFC 6598 (GHSA-75fg)
+ "2002:7f00:1::", # 6to4 -> 127.0.0.1 (GHSA-75fg)
+ "224.0.0.1", # multicast (GHSA-75fg)
+ "::ffff:169.254.169.254", # IPv4-mapped metadata
+ ],
+ )
+ def test_webhook_url_to_internal_is_rejected(self, ip):
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr(ip)]
+ with pytest.raises(ValueError, match="private/internal"):
+ validate_url(
+ "https://attacker.example.com/hook",
+ allowed_ips=[],
+ allowed_hosts=[],
+ )
+
+ def test_legitimate_public_webhook_url_passes(self):
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("93.184.216.34")]
+ # Should not raise
+ validate_url("https://hooks.example.com/x", allowed_ips=[], allowed_hosts=[])
+
+
+# ---------------------------------------------------------------------------
+# GHSA-6485-m23r-fx8q — PATCH serializer context-key bypass
+# The PATCH view now passes context={"request": request}; with the request in
+# context the disallowed-domain / request-host loop-back guard runs on update.
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestWebhookPatchContextGuard:
+ def _serializer_with_request(self, host):
+ from plane.app.serializers import WebhookSerializer
+
+ request = MagicMock()
+ request.get_host.return_value = host
+ return WebhookSerializer(context={"request": request})
+
+ def test_request_host_is_blocked_when_context_present(self):
+ # A webhook pointed at the instance's own host must be rejected — this
+ # is the guard the PATCH endpoint silently skipped with the wrong key.
+ ser = self._serializer_with_request("myplane.example.com")
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("93.184.216.34")] # public, so only the host guard can block
+ with pytest.raises(Exception, match="not allowed"):
+ ser._validate_webhook_url("https://myplane.example.com/hook")
+
+ def test_unrelated_public_host_passes_with_context(self):
+ ser = self._serializer_with_request("myplane.example.com")
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("93.184.216.34")]
+ ser._validate_webhook_url("https://hooks.partner.com/x") # should not raise
+
+
+# ---------------------------------------------------------------------------
+# Webhook DNS-rebinding TOCTOU
+# GHSA-whh3-5g95-4qhc / -4mjx-q738-87cf / -6p39-x6q9-h3g5 / -9292 / -fgcv
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestWebhookRebinding:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_connection_pinned_to_validated_ip(self, mock_resolve, mock_session_cls):
+ from plane.utils.url_security import pinned_fetch
+
+ # The validator resolves to a public IP; the connection must go to THAT
+ # IP literal, so a rebind to an internal IP after validation is moot.
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("POST", "https://rebinder.example.com/hook", json={})
+
+ _, url = session.request.call_args.args
+ assert url == "https://93.184.216.34:443/hook" # IP literal -> no 2nd DNS lookup
+
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_rebind_to_internal_is_blocked(self, mock_resolve):
+ from plane.utils.url_security import pinned_fetch
+
+ mock_resolve.side_effect = ValueError(_BLOCKED)
+ with pytest.raises(ValueError, match="private/internal"):
+ pinned_fetch("POST", "https://rebinder.example.com/hook", json={})
+
+
+# ---------------------------------------------------------------------------
+# Webhook SSRF via HTTP redirect following
+# GHSA-6v37-328w-j2wv / GHSA-jw6g-h7h5-rfc6 / GHSA-mq87-52pf-hm3h
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestWebhookRedirect:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_webhook_does_not_follow_redirects(self, mock_resolve, mock_session_cls):
+ from plane.utils.url_security import pinned_fetch
+
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ # The endpoint replies 302 -> internal; the webhook client must NOT follow.
+ session.request.return_value = _resp(
+ 302, headers={"Location": "http://169.254.169.254/latest/meta-data/"}
+ )
+
+ resp = pinned_fetch("POST", "https://hooks.example.com/x", json={})
+
+ # The 3xx is returned as-is and only ONE request was made (no follow).
+ assert resp.status_code == 302
+ assert session.request.call_count == 1
+ assert session.request.call_args.kwargs["allow_redirects"] is False
+
+
+# ---------------------------------------------------------------------------
+# Favicon redirect SSRF — GHSA-9fr2-pprw-pp9j / CVE-2026-39843
+# A whose href is public but 30x-redirects to a private IP must
+# NOT exfiltrate internal content; the favicon falls back to the default icon.
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestFaviconRedirect:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ @patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo")
+ def test_favicon_redirect_to_private_returns_default(
+ self, mock_pre_dns, mock_resolve, mock_session_cls
+ ):
+ # validate_url_ip pre-check (work_item_link_task.socket) sees a public IP.
+ mock_pre_dns.return_value = [_addr("93.184.216.34")]
+ # safe_get: hop0 public, hop1 (redirect target) blocked.
+ mock_resolve.side_effect = [["93.184.216.34"], ValueError(_BLOCKED)]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(
+ 302, headers={"Location": "http://192.168.8.14:8081/"}
+ )
+
+ soup = BeautifulSoup(
+ '',
+ "html.parser",
+ )
+ result = fetch_and_encode_favicon({}, soup, "https://attacker.example.com")
+
+ # Blocked -> default icon, NOT the internal response body.
+ assert result["favicon_base64"] == f"data:image/svg+xml;base64,{DEFAULT_FAVICON}"
+
+
+# ---------------------------------------------------------------------------
+# Favicon DNS rebinding — GHSA-3856-6mgg-rx84
+# The favicon host passes the pre-check (public) but resolves to a private IP at
+# fetch time; the pinned client re-resolves+validates and blocks it.
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestFaviconRebinding:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ @patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo")
+ def test_favicon_rebind_to_private_returns_default(
+ self, mock_pre_dns, mock_resolve, mock_session_cls
+ ):
+ mock_pre_dns.return_value = [_addr("93.184.216.34")] # pre-check: public
+ mock_resolve.side_effect = ValueError(_BLOCKED) # fetch-time: rebound -> blocked
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ soup = BeautifulSoup(
+ '',
+ "html.parser",
+ )
+ result = fetch_and_encode_favicon({}, soup, "https://attacker.example.com")
+ assert result["favicon_base64"] == f"data:image/svg+xml;base64,{DEFAULT_FAVICON}"
+
+
+# ---------------------------------------------------------------------------
+# OAuth avatar SSRF — GHSA-cv9p-325g-wmv5 / GHSA-hx79-5pj5-qh42 (avatar hop)
+# download_and_upload_avatar must reject avatar URLs that point at, or redirect
+# to, internal addresses, returning None (no fetch stored as an asset).
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestOAuthAvatarSSRF:
+ def _adapter(self):
+ return Adapter(request=MagicMock(), provider="gitea")
+
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_avatar_to_internal_ip_is_blocked(self, mock_resolve):
+ mock_resolve.side_effect = ValueError(_BLOCKED)
+ result = self._adapter().download_and_upload_avatar(
+ "http://169.254.169.254/latest/meta-data/", user=MagicMock()
+ )
+ assert result is None
+ mock_resolve.assert_called() # SSRF validation was actually attempted
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_avatar_redirect_to_internal_is_blocked(self, mock_resolve, mock_session_cls):
+ # Public avatar URL that 302-redirects to the metadata service.
+ mock_resolve.side_effect = [["93.184.216.34"], ValueError(_BLOCKED)]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(
+ 302, headers={"Location": "http://169.254.169.254/imds"}
+ )
+ result = self._adapter().download_and_upload_avatar(
+ "https://evil.example.com/avatar", user=MagicMock()
+ )
+ assert result is None
+
+ @patch("plane.authentication.adapter.base.pinned_fetch_following_redirects")
+ def test_avatar_uses_ssrf_safe_client(self, mock_fetch):
+ # Wiring guard: the avatar path must go through the pinned client, never
+ # a raw requests.get (which would re-resolve + follow redirects freely).
+ mock_fetch.side_effect = ValueError(_BLOCKED)
+ result = self._adapter().download_and_upload_avatar(
+ "https://cdn.example.com/a.png", user=MagicMock()
+ )
+ assert result is None
+ assert mock_fetch.call_args.args[0] == "GET"
+ assert mock_fetch.call_args.args[1] == "https://cdn.example.com/a.png"
diff --git a/apps/api/plane/tests/unit/bg_tasks/test_url_security.py b/apps/api/plane/tests/unit/bg_tasks/test_url_security.py
new file mode 100644
index 00000000000..3a1e8d3dba1
--- /dev/null
+++ b/apps/api/plane/tests/unit/bg_tasks/test_url_security.py
@@ -0,0 +1,395 @@
+# Copyright (c) 2023-present Plane Software, Inc. and contributors
+# SPDX-License-Identifier: AGPL-3.0-only
+# See the LICENSE file for details.
+
+"""
+SSRF-protection tests for the webhook + link-unfurling clusters (advisories A/B/C):
+
+ A — incomplete private-IP validation -> is_blocked_ip hardening
+ B — DNS-rebinding TOCTOU -> connection pinned to the validated IP
+ C — SSRF via HTTP redirect following -> redirects re-resolved/re-validated/re-pinned
+"""
+
+import ipaddress
+
+import pytest
+import requests
+from unittest.mock import MagicMock, patch
+
+from plane.utils.ip_address import is_blocked_ip, resolve_and_validate, validate_url
+from plane.utils.url_security import (
+ PinnedIPAdapter,
+ pinned_fetch,
+ pinned_fetch_following_redirects,
+)
+
+
+def _addr(ip):
+ """Build a single getaddrinfo-style result tuple for an IP string."""
+ family = 6 if ":" in ip else 2
+ return (family, None, None, None, (ip, 0))
+
+
+def _resp(status_code=200, headers=None, content=b"OK"):
+ resp = MagicMock(spec=requests.Response)
+ resp.status_code = status_code
+ resp.headers = headers or {}
+ resp.content = content
+ return resp
+
+
+# ---------------------------------------------------------------------------
+# Cluster A — robust IP classification (verified on Python 3.12 semantics)
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestIsBlockedIp:
+ @pytest.mark.parametrize(
+ "ip",
+ [
+ "127.0.0.1", # loopback
+ "10.0.0.1", # private
+ "192.168.1.1", # private
+ "172.16.0.1", # private
+ "169.254.169.254", # link-local / cloud metadata
+ "0.0.0.0", # unspecified
+ "100.64.0.1", # CGNAT / shared (NOT is_private on py3.12!)
+ "224.0.0.1", # multicast
+ "239.255.255.250", # SSDP multicast
+ "255.255.255.255", # limited broadcast
+ "::1", # IPv6 loopback
+ "fe80::1", # IPv6 link-local
+ "fc00::1", # IPv6 unique-local
+ "ff02::1", # IPv6 multicast
+ "::ffff:127.0.0.1", # IPv4-mapped loopback
+ "::ffff:169.254.169.254", # IPv4-mapped metadata
+ "::ffff:10.0.0.1", # IPv4-mapped private
+ "64:ff9b::7f00:1", # NAT64 well-known prefix embedding 127.0.0.1
+ "64:ff9b::a9fe:a9fe", # NAT64 well-known prefix embedding 169.254.169.254
+ "64:ff9b:1::7f00:1", # NAT64 local-use prefix (RFC 8215, /48)
+ "64:ff9b:1:0100::1", # NAT64 local-use prefix, outside the /96 subset
+ "2002:7f00:1::", # 6to4 embedding 127.0.0.1
+ "2002:a00:1::", # 6to4 embedding 10.0.0.1
+ ],
+ )
+ def test_blocks_internal(self, ip):
+ assert is_blocked_ip(ipaddress.ip_address(ip)) is True
+
+ @pytest.mark.parametrize(
+ "ip",
+ [
+ "8.8.8.8",
+ "93.184.216.34",
+ "1.1.1.1",
+ "2606:4700:4700::1111", # public IPv6 (Cloudflare)
+ "2001:4860:4860::8888", # public IPv6 (Google)
+ ],
+ )
+ def test_allows_public(self, ip):
+ assert is_blocked_ip(ipaddress.ip_address(ip)) is False
+
+
+# ---------------------------------------------------------------------------
+# resolve_and_validate — resolution + validation, returns IPs to pin
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestResolveAndValidate:
+ def test_returns_public_ips(self):
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("93.184.216.34")]
+ assert resolve_and_validate("example.com") == ["93.184.216.34"]
+
+ def test_raises_on_private(self):
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("10.0.0.1")]
+ with pytest.raises(ValueError, match="private/internal"):
+ resolve_and_validate("internal.example.com")
+
+ def test_raises_if_any_resolved_ip_is_private(self):
+ # A hostname that resolves to BOTH a public and a private IP must fail
+ # closed — an attacker could otherwise steer the connection to the
+ # private one.
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("93.184.216.34"), _addr("127.0.0.1")]
+ with pytest.raises(ValueError, match="private/internal"):
+ resolve_and_validate("rebinder.example.com")
+
+ def test_allowlist_permits_private(self):
+ allowed = [ipaddress.ip_network("10.0.0.0/8")]
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr("10.0.0.5")]
+ assert resolve_and_validate("internal", allowed_ips=allowed) == ["10.0.0.5"]
+
+ def test_unresolvable_raises(self):
+ import socket as _socket
+
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.side_effect = _socket.gaierror()
+ with pytest.raises(ValueError, match="could not be resolved"):
+ resolve_and_validate("nope.invalid")
+
+
+# ---------------------------------------------------------------------------
+# Cluster B — connection pinned to the validated IP (DNS-rebinding TOCTOU)
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestPinnedFetch:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_connects_to_validated_ip_not_hostname(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("POST", "https://example.com/hook", json={"a": 1})
+
+ # The socket target is the validated IP literal — there is no second
+ # DNS lookup, so a rebind between validation and connection is
+ # impossible.
+ method, url = session.request.call_args.args
+ kwargs = session.request.call_args.kwargs
+ assert method == "POST"
+ assert url == "https://93.184.216.34:443/hook"
+ # Host header + TLS SNI still target the real hostname.
+ assert kwargs["headers"]["Host"] == "example.com"
+ assert kwargs["allow_redirects"] is False
+ assert kwargs["verify"] is True
+ assert kwargs["json"] == {"a": 1}
+ # Ambient proxy/env must not be honoured (would bypass pinning).
+ assert session.trust_env is False
+ assert kwargs["proxies"] == {"http": None, "https": None}
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_non_default_port_in_host_header(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("GET", "http://example.com:8080/x")
+
+ _, url = session.request.call_args.args
+ kwargs = session.request.call_args.kwargs
+ assert url == "http://93.184.216.34:8080/x"
+ assert kwargs["headers"]["Host"] == "example.com:8080"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_ipv6_validated_ip_is_bracketed(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["2606:4700:4700::1111"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("GET", "https://example.com/x")
+
+ _, url = session.request.call_args.args
+ assert url == "https://[2606:4700:4700::1111]:443/x"
+
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_blocked_target_raises_before_any_request(self, mock_resolve):
+ mock_resolve.side_effect = ValueError(
+ "Access to private/internal networks is not allowed"
+ )
+ with pytest.raises(ValueError, match="private/internal"):
+ pinned_fetch("POST", "https://attacker.com/hook")
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_tries_next_ip_on_connection_error(self, mock_resolve, mock_session_cls):
+ # Dual-stack host: first validated IP is unreachable, second works.
+ mock_resolve.return_value = ["93.184.216.34", "93.184.216.35"]
+ session = mock_session_cls.return_value
+ session.request.side_effect = [
+ requests.ConnectionError("down"),
+ _resp(200),
+ ]
+ resp = pinned_fetch("GET", "https://example.com/x")
+ assert resp.status_code == 200
+ assert session.request.call_count == 2
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_allowed_host_skips_block_check_but_still_pins(self, mock_resolve, mock_session_cls):
+ # Trusted host (e.g. internal docker service) whose IP is private: the
+ # block check is skipped, but the connection is STILL pinned to the
+ # resolved IP so it cannot be rebound to a different internal target.
+ mock_resolve.return_value = ["172.18.0.5"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch(
+ "POST",
+ "http://silo:3000/hook",
+ allowed_hosts=["silo"],
+ json={"x": 1},
+ )
+
+ # Resolution happens with require_safe=False (trusted, skip block check).
+ assert mock_resolve.call_args.kwargs.get("require_safe") is False
+ # ...but the connection is pinned to the resolved IP literal, Host=silo.
+ _, url = session.request.call_args.args
+ assert url == "http://172.18.0.5:3000/hook"
+ assert session.request.call_args.kwargs["headers"]["Host"] == "silo:3000"
+ assert session.request.call_args.kwargs["allow_redirects"] is False
+
+
+# ---------------------------------------------------------------------------
+# Cluster C — redirects re-resolved / re-validated / re-pinned each hop
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestPinnedFetchRedirects:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_no_redirect_returns_response(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ resp, final = pinned_fetch_following_redirects("GET", "https://example.com/a")
+ assert resp.status_code == 200
+ assert final == "https://example.com/a"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_follows_and_revalidates_each_hop(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.side_effect = [
+ _resp(301, headers={"Location": "https://other.com/page"}),
+ _resp(200),
+ ]
+
+ resp, final = pinned_fetch_following_redirects("GET", "https://example.com/a")
+ assert resp.status_code == 200
+ assert final == "https://other.com/page"
+ # Re-resolved (and thus re-validated + re-pinned) on each hop.
+ assert mock_resolve.call_count == 2
+ assert mock_resolve.call_args_list[0].args[0] == "example.com"
+ assert mock_resolve.call_args_list[1].args[0] == "other.com"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_blocks_redirect_to_private_ip(self, mock_resolve, mock_session_cls):
+ # First hop resolves public; redirect target resolves private -> blocked
+ mock_resolve.side_effect = [
+ ["93.184.216.34"],
+ ValueError("Access to private/internal networks is not allowed"),
+ ]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(
+ 302, headers={"Location": "http://169.254.169.254/latest/meta-data/"}
+ )
+ with pytest.raises(ValueError, match="private/internal"):
+ pinned_fetch_following_redirects("GET", "https://evil.com/r")
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_too_many_redirects(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(
+ 302, headers={"Location": "https://example.com/loop"}
+ )
+ with pytest.raises(requests.TooManyRedirects):
+ pinned_fetch_following_redirects(
+ "GET", "https://example.com/start", max_redirects=3
+ )
+
+
+# ---------------------------------------------------------------------------
+# PinnedIPAdapter — TLS server_hostname injection (cert verified vs hostname)
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestPinnedIPAdapter:
+ def test_injects_server_hostname_into_pool(self):
+ adapter = PinnedIPAdapter(server_hostname="example.com")
+ adapter.build_connection_pool_key_attributes = MagicMock(
+ return_value=({"scheme": "https", "host": "93.184.216.34", "port": 443}, {})
+ )
+ adapter.poolmanager = MagicMock()
+
+ request = MagicMock()
+ adapter.get_connection_with_tls_context(request, verify=True)
+
+ _, kwargs = adapter.poolmanager.connection_from_host.call_args
+ assert kwargs["pool_kwargs"]["server_hostname"] == "example.com"
+
+
+# ---------------------------------------------------------------------------
+# validate_url — create/update-time defense in depth still rejects bypasses
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestValidateUrlHardening:
+ @pytest.mark.parametrize("ip", ["100.64.0.1", "224.0.0.1", "0.0.0.0"])
+ def test_rejects_newly_covered_ranges(self, ip):
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.return_value = [_addr(ip)]
+ with pytest.raises(ValueError, match="private/internal"):
+ validate_url("http://attacker.example.com")
+
+
+# ---------------------------------------------------------------------------
+# Review-feedback fixes (PR #9163)
+# ---------------------------------------------------------------------------
+@pytest.mark.unit
+class TestReviewFixes:
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_url_embedded_credentials_become_basic_auth(self, mock_resolve, mock_session_cls):
+ # user:pass@host -> Basic Auth preserved as auth=, userinfo stripped from URL
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("GET", "https://user:p%40ss@example.com/hook")
+
+ _, url = session.request.call_args.args
+ kwargs = session.request.call_args.kwargs
+ assert url == "https://93.184.216.34:443/hook" # no userinfo in the IP URL
+ assert kwargs["auth"] == ("user", "p@ss") # percent-decoded
+ assert kwargs["headers"]["Host"] == "example.com"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_no_credentials_passes_auth_none(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+ pinned_fetch("GET", "https://example.com/x")
+ assert session.request.call_args.kwargs["auth"] is None
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_ipv6_literal_host_header_is_bracketed(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["2606:4700:4700::1111"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _resp(200)
+
+ pinned_fetch("GET", "https://[2606:4700:4700::1111]/x")
+
+ kwargs = session.request.call_args.kwargs
+ assert kwargs["headers"]["Host"] == "[2606:4700:4700::1111]"
+
+ def test_idna_unicode_error_is_treated_as_unresolvable(self):
+ # getaddrinfo can raise UnicodeError (IDNA) before any lookup; it must
+ # surface as ValueError so webhook_send_task records a URL rejection.
+ with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
+ dns.side_effect = UnicodeError("label empty or too long")
+ with pytest.raises(ValueError, match="could not be resolved"):
+ resolve_and_validate("xn--bad-name")
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_stream_defers_session_close_until_response_close(self, mock_resolve, mock_session_cls):
+ # With stream=True the size cap can bound memory only if the session
+ # stays open until the body is read; closing the response closes it.
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ resp = _resp(200)
+ session.request.return_value = resp
+
+ out = pinned_fetch("GET", "https://cdn.example.com/a.png", stream=True)
+
+ assert session.request.call_args.kwargs["stream"] is True
+ session.close.assert_not_called() # deferred
+ out.close()
+ session.close.assert_called_once()
diff --git a/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py
index f2209e67e6d..2599126ff49 100644
--- a/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py
+++ b/apps/api/plane/tests/unit/bg_tasks/test_work_item_link_task.py
@@ -5,6 +5,7 @@
import ipaddress
import pytest
+import requests
from unittest.mock import patch, MagicMock
from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip
from plane.utils.ip_address import validate_url
@@ -45,6 +46,22 @@ def test_allows_public_ip(self):
mock_dns.return_value = [(None, None, None, None, ("93.184.216.34", 0))]
validate_url_ip("https://example.com") # Should not raise
+ @pytest.mark.parametrize(
+ "ip",
+ [
+ "100.64.0.1", # CGNAT / shared address space (not is_private on 3.12)
+ "224.0.0.1", # multicast
+ "0.0.0.0", # unspecified
+ "::ffff:169.254.169.254", # IPv4-mapped cloud metadata
+ "64:ff9b::a9fe:a9fe", # NAT64 embedding 169.254.169.254
+ ],
+ )
+ def test_rejects_hardened_bypass_ranges(self, ip):
+ with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns:
+ mock_dns.return_value = [(None, None, None, None, (ip, 0))]
+ with pytest.raises(ValueError, match="private/internal"):
+ validate_url_ip("http://attacker.example.com")
+
@pytest.mark.unit
class TestValidateUrlAllowlist:
@@ -133,82 +150,81 @@ def test_allowed_hosts_empty_does_not_bypass(self):
@pytest.mark.unit
class TestSafeGet:
- """Test safe_get follows redirects safely and blocks SSRF."""
+ """safe_get now delegates to the pinned SSRF-safe client; assert it resolves,
+ validates, pins to the validated IP and follows redirects safely. Network is
+ mocked at the requests.Session boundary inside plane.utils.url_security."""
- @patch("plane.bgtasks.work_item_link_task.requests.get")
- @patch("plane.bgtasks.work_item_link_task.validate_url_ip")
- def test_returns_response_for_non_redirect(self, mock_validate, mock_get):
- final_resp = _make_response(status_code=200, content=b"OK")
- mock_get.return_value = final_resp
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_returns_response_for_non_redirect(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _make_response(status_code=200, content=b"OK")
response, final_url = safe_get("https://example.com")
- assert response is final_resp
+ assert response.status_code == 200
assert final_url == "https://example.com"
- mock_validate.assert_called_once_with("https://example.com")
-
- @patch("plane.bgtasks.work_item_link_task.requests.get")
- @patch("plane.bgtasks.work_item_link_task.validate_url_ip")
- def test_follows_redirect_and_validates_each_hop(self, mock_validate, mock_get):
- redirect_resp = _make_response(
- status_code=301,
- is_redirect=True,
- headers={"Location": "https://other.com/page"},
- )
- final_resp = _make_response(status_code=200, content=b"OK")
- mock_get.side_effect = [redirect_resp, final_resp]
+ # Pinned to the validated IP literal, not the hostname.
+ _, url = session.request.call_args.args
+ assert url == "https://93.184.216.34:443/"
+ assert session.request.call_args.kwargs["headers"]["Host"] == "example.com"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_follows_redirect_and_validates_each_hop(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.side_effect = [
+ _make_response(status_code=301, headers={"Location": "https://other.com/page"}),
+ _make_response(status_code=200, content=b"OK"),
+ ]
response, final_url = safe_get("https://example.com")
- assert response is final_resp
+ assert response.status_code == 200
assert final_url == "https://other.com/page"
- # Should validate both the initial URL and the redirect target
- assert mock_validate.call_count == 2
- mock_validate.assert_any_call("https://example.com")
- mock_validate.assert_any_call("https://other.com/page")
-
- @patch("plane.bgtasks.work_item_link_task.requests.get")
- @patch("plane.bgtasks.work_item_link_task.validate_url_ip")
- def test_blocks_redirect_to_private_ip(self, mock_validate, mock_get):
- redirect_resp = _make_response(
- status_code=302,
- is_redirect=True,
- headers={"Location": "http://192.168.1.1:8080"},
+ assert mock_resolve.call_count == 2
+ assert mock_resolve.call_args_list[0].args[0] == "example.com"
+ assert mock_resolve.call_args_list[1].args[0] == "other.com"
+
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_blocks_redirect_to_private_ip(self, mock_resolve, mock_session_cls):
+ mock_resolve.side_effect = [
+ ["93.184.216.34"],
+ ValueError("Access to private/internal networks is not allowed"),
+ ]
+ session = mock_session_cls.return_value
+ session.request.return_value = _make_response(
+ status_code=302, headers={"Location": "http://192.168.1.1:8080"}
)
- mock_get.return_value = redirect_resp
- # First call (initial URL) succeeds, second call (redirect target) fails
- mock_validate.side_effect = [None, ValueError("Access to private/internal networks is not allowed")]
with pytest.raises(ValueError, match="private/internal"):
safe_get("https://evil.com/redirect")
- @patch("plane.bgtasks.work_item_link_task.requests.get")
- @patch("plane.bgtasks.work_item_link_task.validate_url_ip")
- def test_raises_on_too_many_redirects(self, mock_validate, mock_get):
- redirect_resp = _make_response(
- status_code=302,
- is_redirect=True,
- headers={"Location": "https://example.com/loop"},
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_raises_on_too_many_redirects(self, mock_resolve, mock_session_cls):
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.return_value = _make_response(
+ status_code=302, headers={"Location": "https://example.com/loop"}
)
- mock_get.return_value = redirect_resp
- with pytest.raises(RuntimeError, match="Too many redirects"):
+ with pytest.raises(requests.TooManyRedirects):
safe_get("https://example.com/start")
- @patch("plane.bgtasks.work_item_link_task.requests.get")
- @patch("plane.bgtasks.work_item_link_task.validate_url_ip")
- def test_succeeds_at_exact_max_redirects(self, mock_validate, mock_get):
- """After exactly MAX_REDIRECTS hops, if the final response is 200, it should succeed."""
- redirect_resp = _make_response(
- status_code=302,
- is_redirect=True,
- headers={"Location": "https://example.com/next"},
- )
- final_resp = _make_response(status_code=200, content=b"OK")
- # 5 redirects then a 200
- mock_get.side_effect = [redirect_resp] * 5 + [final_resp]
+ @patch("plane.utils.url_security.requests.Session")
+ @patch("plane.utils.url_security.resolve_and_validate")
+ def test_succeeds_at_exact_max_redirects(self, mock_resolve, mock_session_cls):
+ """5 redirects then a 200 must succeed (MAX_REDIRECTS == 5)."""
+ mock_resolve.return_value = ["93.184.216.34"]
+ session = mock_session_cls.return_value
+ session.request.side_effect = [
+ _make_response(status_code=302, headers={"Location": "https://example.com/next"})
+ ] * 5 + [_make_response(status_code=200, content=b"OK")]
response, final_url = safe_get("https://example.com/start")
- assert response is final_resp
- assert not response.is_redirect
+ assert response.status_code == 200
diff --git a/apps/api/plane/utils/ip_address.py b/apps/api/plane/utils/ip_address.py
index ce1612a57aa..c4b0d331287 100644
--- a/apps/api/plane/utils/ip_address.py
+++ b/apps/api/plane/utils/ip_address.py
@@ -8,10 +8,162 @@
from urllib.parse import urlparse
+# Networks that must never be reachable as an outbound request target but which
+# the stdlib ``ipaddress`` flags (is_private/is_loopback/...) do NOT reliably
+# classify on every Python version. Listed explicitly so the verdict is
+# identical and fail-closed across Python 3.9 – 3.14 (Plane ships on 3.12,
+# where e.g. 100.64.0.0/10 is neither is_private nor is_global).
+_BLOCKED_NETWORKS = [
+ ipaddress.ip_network(cidr)
+ for cidr in (
+ "0.0.0.0/8", # "this host on this network" (RFC 1122) / unspecified block
+ "100.64.0.0/10", # carrier-grade NAT / shared address space (RFC 6598)
+ "169.254.0.0/16", # link-local (incl. cloud metadata 169.254.169.254)
+ "255.255.255.255/32", # limited broadcast
+ "::ffff:0:0/96", # IPv4-mapped IPv6
+ "64:ff9b::/96", # NAT64 well-known prefix (RFC 6052)
+ "64:ff9b:1::/48", # NAT64 local-use prefix (RFC 8215)
+ "2002::/16", # 6to4
+ "2001::/32", # Teredo
+ "fec0::/10", # deprecated IPv6 site-local
+ )
+]
+
+
+def _embedded_ipv4(ip):
+ """
+ Yield any IPv4 address embedded inside an IPv6 transition address.
+
+ An attacker who controls a hostname's AAAA record can point it at an IPv6
+ address that the network transparently translates to an internal IPv4
+ target (e.g. ``::ffff:169.254.169.254``, ``64:ff9b::7f00:1`` → 127.0.0.1,
+ 6to4, Teredo). The embedded IPv4 is what the packet ultimately reaches, so
+ it must be validated too — we cannot trust the interpreter to classify the
+ outer IPv6 address consistently across versions.
+ """
+ if ip.version != 6:
+ return
+
+ if ip.ipv4_mapped is not None:
+ yield ip.ipv4_mapped
+
+ if ip.sixtofour is not None:
+ yield ip.sixtofour
+
+ teredo = ip.teredo
+ if teredo is not None:
+ # (server_ipv4, client_ipv4)
+ yield teredo[0]
+ yield teredo[1]
+
+ # NAT64 well-known prefix (64:ff9b::/96): the low 32 bits embed the IPv4.
+ # The local-use prefix 64:ff9b:1::/48 uses a different (length-dependent)
+ # embedding per RFC 6052, so it is not decoded here — it is blocked wholesale
+ # via _BLOCKED_NETWORKS instead.
+ if ip in ipaddress.ip_network("64:ff9b::/96"):
+ yield ipaddress.ip_address(int(ip) & 0xFFFFFFFF)
+
+
+def is_blocked_ip(ip):
+ """
+ Return ``True`` if ``ip`` (an ``ipaddress`` address object) should never be
+ used as an outbound request target (SSRF protection).
+
+ Blocks private, loopback, reserved, link-local, multicast and unspecified
+ ranges; an explicit deny-list of networks the stdlib misclassifies on some
+ Python versions; and recurses into IPv4 addresses embedded in IPv6
+ transition formats. Fails closed: anything it cannot positively clear is
+ treated as blocked.
+ """
+ if (
+ ip.is_private
+ or ip.is_loopback
+ or ip.is_reserved
+ or ip.is_link_local
+ or ip.is_multicast
+ or ip.is_unspecified
+ ):
+ return True
+
+ if any(ip.version == net.version and ip in net for net in _BLOCKED_NETWORKS):
+ return True
+
+ for embedded in _embedded_ipv4(ip):
+ if is_blocked_ip(embedded):
+ return True
+
+ return False
+
+
+def _is_allowed_ip(ip, allowed_ips):
+ """Return True if ``ip`` falls inside an operator-trusted allowlist network."""
+ return bool(allowed_ips) and any(
+ net.version == ip.version and ip in net for net in allowed_ips
+ )
+
+
+def resolve_and_validate(hostname, allowed_ips=None, require_safe=True):
+ """
+ Resolve ``hostname`` and (when ``require_safe``) ensure every resolved
+ address is a safe outbound target, returning the list of resolved IP
+ strings (in resolver order, de-duplicated).
+
+ The returned list is intended to be *pinned* for the actual connection
+ (connect to the IP literal so no second DNS lookup occurs), which is what
+ closes the DNS-rebinding TOCTOU.
+
+ Args:
+ hostname: The hostname (or IP literal) to resolve.
+ allowed_ips: Optional list of ``ipaddress.ip_network`` objects. IPs
+ inside these networks are permitted even if otherwise
+ blocked (operator-trusted internal targets).
+ require_safe: When ``True`` (default) every resolved IP is checked and a
+ blocked/internal address raises. When ``False`` the host is
+ already operator-trusted (e.g. a WEBHOOK_ALLOWED_HOSTS
+ entry) so the block check is skipped — but resolution still
+ happens so the connection can be pinned (pinning prevents
+ rebinding even for trusted hosts).
+
+ Returns:
+ list[str]: The resolved IP addresses to which a connection may be
+ pinned.
+
+ Raises:
+ ValueError: If the hostname cannot be resolved or (when
+ ``require_safe``) any resolved address is a blocked/internal target not
+ covered by ``allowed_ips``.
+ """
+ try:
+ addr_info = socket.getaddrinfo(hostname, None)
+ except (socket.gaierror, UnicodeError):
+ # UnicodeError covers IDNA encoding/normalisation failures, which
+ # getaddrinfo raises before the address lookup for malformed hostnames.
+ raise ValueError("Hostname could not be resolved")
+
+ if not addr_info:
+ raise ValueError("No IP addresses found for the hostname")
+
+ validated = []
+ for addr in addr_info:
+ # Strip any IPv6 zone id (e.g. ``fe80::1%eth0``) before parsing.
+ ip_str = addr[4][0].split("%")[0]
+ ip = ipaddress.ip_address(ip_str)
+ if require_safe and not _is_allowed_ip(ip, allowed_ips) and is_blocked_ip(ip):
+ raise ValueError("Access to private/internal networks is not allowed")
+ if ip_str not in validated:
+ validated.append(ip_str)
+
+ return validated
+
+
def validate_url(url, allowed_ips=None, allowed_hosts=None):
"""
Validate that a URL doesn't resolve to a private/internal IP address (SSRF protection).
+ Note: this validates at a point in time. To defeat DNS-rebinding (TOCTOU),
+ the actual request must be pinned to the validated IP — see
+ ``plane.utils.url_security.pinned_fetch``.
+
Args:
url: The URL to validate.
allowed_ips: Optional list of ipaddress.ip_network objects. IPs falling within
@@ -41,22 +193,7 @@ def validate_url(url, allowed_ips=None, allowed_hosts=None):
}:
return
- try:
- addr_info = socket.getaddrinfo(hostname, None)
- except socket.gaierror:
- raise ValueError("Hostname could not be resolved")
-
- if not addr_info:
- raise ValueError("No IP addresses found for the hostname")
-
- for addr in addr_info:
- ip = ipaddress.ip_address(addr[4][0])
- if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
- if allowed_ips and any(
- network.version == ip.version and ip in network for network in allowed_ips
- ):
- continue
- raise ValueError("Access to private/internal networks is not allowed")
+ resolve_and_validate(hostname, allowed_ips=allowed_ips)
def get_client_ip(request):
diff --git a/apps/api/plane/utils/url_security.py b/apps/api/plane/utils/url_security.py
new file mode 100644
index 00000000000..37f4d0636a1
--- /dev/null
+++ b/apps/api/plane/utils/url_security.py
@@ -0,0 +1,272 @@
+# Copyright (c) 2023-present Plane Software, Inc. and contributors
+# SPDX-License-Identifier: AGPL-3.0-only
+# See the LICENSE file for details.
+
+"""
+SSRF-safe outbound HTTP client.
+
+The validators in :mod:`plane.utils.ip_address` resolve a hostname and confirm
+that none of its addresses point at internal infrastructure. On their own they
+are vulnerable to DNS rebinding (TOCTOU): the validator resolves the name, but
+``requests`` resolves it a *second* time when it actually connects, and an
+attacker who controls DNS can return a public IP to the validator and an
+internal IP to the connection.
+
+``pinned_fetch`` closes that window by resolving + validating once and then
+connecting to the *validated IP literal* — urllib3 performs no second DNS
+lookup, so the address that was checked is exactly the address that is reached.
+The original hostname is still used for the ``Host`` header, TLS SNI and
+certificate verification, so virtual-hosting and HTTPS continue to work.
+
+Redirects are never auto-followed (``requests`` would re-resolve each hop and
+reopen the rebinding window, and a ``Location`` can point at a new internal
+host). ``pinned_fetch_following_redirects`` follows them manually, re-resolving,
+re-validating and re-pinning every hop.
+"""
+
+# Python imports
+import ipaddress
+from urllib.parse import unquote, urljoin, urlsplit
+
+# Third party imports
+import requests
+from requests.adapters import HTTPAdapter
+
+# Module imports
+from plane.utils.ip_address import resolve_and_validate
+
+# 3xx status codes that carry a Location we may follow.
+_REDIRECT_STATUSES = {301, 302, 303, 307, 308}
+
+# Never route through an ambient proxy — a CONNECT to a proxy would tunnel to
+# the original hostname and bypass the IP pinning entirely.
+_NO_PROXIES = {"http": None, "https": None}
+
+
+class PinnedIPAdapter(HTTPAdapter):
+ """
+ A ``requests`` transport adapter that connects to whatever IP literal is in
+ the request URL while presenting ``server_hostname`` for TLS SNI and
+ certificate verification.
+
+ The IP literal in the URL means urllib3 opens the socket to that exact IP
+ with no DNS resolution. Injecting ``server_hostname`` (and leaving
+ ``assert_hostname`` at its ``None`` default so ``SSLContext.check_hostname``
+ stays ``True``) makes the standard library verify the presented certificate
+ against the real hostname rather than the IP.
+
+ Instances hold no global state — one is mounted on a throwaway
+ :class:`requests.Session` per request, so this is safe under any Celery pool
+ (prefork / threads / gevent).
+ """
+
+ def __init__(self, server_hostname, *args, **kwargs):
+ self._server_hostname = server_hostname
+ super().__init__(*args, **kwargs)
+
+ def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
+ # requests >= 2.32 calls this (it replaced get_connection() as part of
+ # the CVE-2024-35195 fix). requests is pinned to 2.33 in base.txt.
+ host_params, pool_kwargs = self.build_connection_pool_key_attributes(
+ request, verify, cert
+ )
+ # server_hostname is a recognised urllib3 SSL pool-key field, so pools
+ # for different hostnames don't collide.
+ pool_kwargs["server_hostname"] = self._server_hostname
+ return self.poolmanager.connection_from_host(**host_params, pool_kwargs=pool_kwargs)
+
+
+def _split_target(url):
+ """Parse a URL into the pieces needed to build a pinned request.
+
+ Returns ``(scheme, hostname, port, path, auth)`` where ``auth`` carries any
+ URL-embedded credentials (``user:pass@host``) as a ``(user, pass)`` tuple so
+ HTTP Basic Auth still works once the URL is rewritten to an IP literal.
+ """
+ parts = urlsplit(url)
+ scheme = parts.scheme
+ if scheme not in ("http", "https"):
+ raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
+ hostname = parts.hostname
+ if not hostname:
+ raise ValueError("Invalid URL: No hostname found")
+ port = parts.port or (443 if scheme == "https" else 80)
+ path = parts.path or "/"
+ if parts.query:
+ path = f"{path}?{parts.query}"
+ auth = None
+ if parts.username is not None or parts.password is not None:
+ auth = (unquote(parts.username or ""), unquote(parts.password or ""))
+ return scheme, hostname, port, path, auth
+
+
+def _request_to_ip(method, scheme, hostname, ip, port, path, *, headers, timeout, auth=None, **kwargs):
+ """Issue a single request whose socket is pinned to ``ip``.
+
+ With ``stream=True`` the session is kept open until the caller closes the
+ response (closing the response also closes the session), so a streamed body
+ can be read with a real size cap; otherwise the session is closed eagerly.
+ """
+ ip_obj = ipaddress.ip_address(ip)
+ host_for_url = f"[{ip}]" if ip_obj.version == 6 else ip
+ url = f"{scheme}://{host_for_url}:{port}{path}"
+
+ request_headers = dict(headers or {})
+ default_port = 443 if scheme == "https" else 80
+ # Host header (and TLS) carry the ORIGINAL hostname, not the IP literal.
+ # An IPv6-literal hostname must be bracketed in the Host header.
+ host_label = f"[{hostname}]" if ":" in hostname else hostname
+ request_headers["Host"] = host_label if port == default_port else f"{host_label}:{port}"
+
+ session = requests.Session()
+ session.trust_env = False # ignore ambient proxy / netrc / env (see _NO_PROXIES)
+ if scheme == "https":
+ session.mount("https://", PinnedIPAdapter(server_hostname=hostname))
+
+ try:
+ response = session.request(
+ method,
+ url,
+ headers=request_headers,
+ timeout=timeout,
+ allow_redirects=False,
+ verify=True,
+ proxies=_NO_PROXIES,
+ auth=auth,
+ **kwargs,
+ )
+ except BaseException:
+ session.close()
+ raise
+
+ if kwargs.get("stream"):
+ # Defer closing the session until the response is closed, so the
+ # streamed body remains readable. response.close() now also closes
+ # the session.
+ _orig_close = response.close
+
+ def _close_all(_orig=_orig_close, _sess=session):
+ try:
+ _orig()
+ finally:
+ _sess.close()
+
+ response.close = _close_all
+ else:
+ session.close()
+ return response
+
+
+def _fetch_validated_hop(method, url, *, allowed_ips, allowed_hosts, headers, timeout, **kwargs):
+ """
+ Resolve ``url``'s host, validate it, then issue a single (non-redirecting)
+ request pinned to a resolved IP. Returns ``(response, normalized_host)``.
+
+ Hosts in ``allowed_hosts`` are operator-trusted (e.g. internal service DNS
+ whose IPs are dynamic): they skip the private-IP *block* check, but the
+ connection is STILL pinned to the resolved IP so a trusted hostname cannot
+ be rebound to a different internal target between validation and connect.
+ """
+ scheme, hostname, port, path, auth = _split_target(url)
+
+ normalized_host = hostname.rstrip(".").lower()
+ trusted = bool(allowed_hosts) and normalized_host in {
+ (h or "").rstrip(".").lower() for h in allowed_hosts if h
+ }
+
+ # Resolve once (and validate unless the host is operator-trusted), then pin
+ # the connection to a resolved IP literal — urllib3 performs no second DNS
+ # lookup, so the address validated here is exactly the one reached.
+ ips = resolve_and_validate(hostname, allowed_ips=allowed_ips, require_safe=not trusted)
+
+ last_exc = None
+ for ip in ips:
+ try:
+ response = _request_to_ip(
+ method, scheme, hostname, ip, port, path,
+ headers=headers, timeout=timeout, auth=auth, **kwargs,
+ )
+ return response, normalized_host
+ except requests.RequestException as exc:
+ # Try the next resolved address (dual-stack / round-robin hosts).
+ last_exc = exc
+ if last_exc is not None:
+ raise last_exc
+ raise requests.ConnectionError(f"No reachable address for host: {hostname}")
+
+
+def pinned_fetch(
+ method,
+ url,
+ *,
+ allowed_ips=None,
+ allowed_hosts=None,
+ headers=None,
+ timeout=30,
+ **kwargs,
+):
+ """
+ SSRF-safe single request. Resolves + validates the target host and pins the
+ connection to a validated IP (defeating DNS rebinding). Does NOT follow
+ redirects.
+
+ Raises:
+ ValueError: if the URL is invalid or resolves to a blocked address.
+ requests.RequestException: on network/transport errors.
+ """
+ response, _ = _fetch_validated_hop(
+ method, url,
+ allowed_ips=allowed_ips, allowed_hosts=allowed_hosts,
+ headers=headers, timeout=timeout, **kwargs,
+ )
+ return response
+
+
+def pinned_fetch_following_redirects(
+ method,
+ url,
+ *,
+ allowed_ips=None,
+ allowed_hosts=None,
+ headers=None,
+ timeout=30,
+ max_redirects=5,
+ **kwargs,
+):
+ """
+ SSRF-safe request that follows redirects manually, re-resolving,
+ re-validating and re-pinning every hop. Returns ``(response, final_url)``.
+
+ Raises:
+ ValueError: if any URL in the chain is invalid or resolves to a blocked
+ address.
+ requests.TooManyRedirects: if the hop limit is exceeded.
+ requests.RequestException: on network/transport errors.
+ """
+ current_url = url
+ redirects = 0
+ while True:
+ response, _ = _fetch_validated_hop(
+ method, current_url,
+ allowed_ips=allowed_ips, allowed_hosts=allowed_hosts,
+ headers=headers, timeout=timeout, **kwargs,
+ )
+
+ if response.status_code not in _REDIRECT_STATUSES:
+ return response, current_url
+
+ location = response.headers.get("Location")
+ if not location:
+ return response, current_url
+
+ if redirects >= max_redirects:
+ response.close()
+ raise requests.TooManyRedirects(
+ f"Exceeded {max_redirects} redirects for URL: {url}"
+ )
+ redirects += 1
+ # Release the intermediate hop's connection/session before following.
+ response.close()
+ # Resolve the redirect target against the current URL; the next loop
+ # iteration re-validates and re-pins it.
+ current_url = urljoin(current_url, location)
diff --git a/apps/api/requirements/base.txt b/apps/api/requirements/base.txt
index 3aca207b684..252b0209149 100644
--- a/apps/api/requirements/base.txt
+++ b/apps/api/requirements/base.txt
@@ -56,6 +56,9 @@ lxml==6.1.0
boto3==1.34.96
# http client (pinned to address CVE-2026-44431 and CVE-2026-44432)
urllib3>=2.7.0
+# requests — used directly for webhook delivery & link unfurling; pinned to
+# >=2.32 for the get_connection_with_tls_context adapter hook (SSRF IP pinning)
+requests==2.33.0
# password validator
zxcvbn==4.4.28
# timezone
diff --git a/apps/api/requirements/test.txt b/apps/api/requirements/test.txt
index bed38224a2f..0b655f99466 100644
--- a/apps/api/requirements/test.txt
+++ b/apps/api/requirements/test.txt
@@ -8,5 +8,4 @@ pytest-mock==3.11.1
factory-boy==3.3.0
freezegun==1.2.2
coverage==7.2.7
-httpx==0.24.1
-requests==2.33.0
\ No newline at end of file
+httpx==0.24.1
\ No newline at end of file