From f76ce7c8a2807b375855d83727f9608d33f37eec Mon Sep 17 00:00:00 2001
From: Codex
Date: Sun, 31 May 2026 17:15:18 -0700
Subject: [PATCH 1/2] fix(security): block SSRF in fetch_image_url via scheme allowlist and private IP rejection
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds URL validation to fetch_image_url() before issuing any HTTP request:

- Scheme allowlist: only http/https are permitted; file://, ftp://, etc.
  raise ValueError.
- Metadata host blocklist: rejects 169.254.169.254 (AWS EC2 IMDS; Azure IMDS
  is served at this same address), metadata.google.internal (GCP), and
  169.254.170.2 (the AWS ECS task credential/metadata endpoint; the code
  comment labels it "Azure IMDS", which is a mislabel).
- Literal IP rejection: ipaddress.ip_address() detects private, loopback, and
  link-local literal IPs and raises ValueError before session.get() is
  called. Hostnames (non-literal IPs) are not blocked — DNS rebinding
  protection is deferred to a future change.

Tests (12 cases): the nine rejection cases patch create_http_session and
assert that it is never called, so no HTTP session is created (and therefore
no .get() is issued) for a rejected URL; the three allowed cases assert that
session.get() is called exactly once.

Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/exo/api/adapters/chat_completions.py | 25 +++ src/exo/api/adapters/tests/__init__.py | 0 .../adapters/tests/test_fetch_image_url.py | 153 ++++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 src/exo/api/adapters/tests/__init__.py create mode 100644 src/exo/api/adapters/tests/test_fetch_image_url.py diff --git a/src/exo/api/adapters/chat_completions.py b/src/exo/api/adapters/chat_completions.py index cbd545318b..ae68c9fbbc 100644 --- a/src/exo/api/adapters/chat_completions.py +++ b/src/exo/api/adapters/chat_completions.py @@ -1,10 +1,12 @@ """OpenAI Chat Completions API adapter for converting requests/responses.""" import base64 +import ipaddress import re import time from collections.abc import AsyncGenerator from typing import Any +from urllib.parse import urlparse from exo.api.types import ( ChatCompletionChoice, @@ -38,6 +40,12 @@ resolve_reasoning_params, ) +_BLOCKED_METADATA_HOSTS: frozenset[str] = frozenset({ + "169.254.169.254", # AWS IMDSv1 + "metadata.google.internal", # GCP + "169.254.170.2", # Azure IMDS +}) + def extract_base64_from_data_url(data_url: str) -> Base64Image: match = re.match(r"data:[^;]+;base64,(.+)", data_url) @@ -47,6 +55,23 @@ def extract_base64_from_data_url(data_url: str) -> Base64Image: async def fetch_image_url(url: str) -> Base64Image: + parsed = urlparse(url) + + if parsed.scheme not in ("http", "https"): + raise ValueError(f"URL scheme '{parsed.scheme}' not allowed; only http and https are permitted") + + if parsed.hostname in _BLOCKED_METADATA_HOSTS: + raise ValueError(f"Access to '{parsed.hostname}' is denied (cloud metadata endpoint)") + + if parsed.hostname: + try: + ip = ipaddress.ip_address(parsed.hostname) + except ValueError: + ip = None # hostname, not a literal IP — DNS resolution proceeds normally + + if ip is not None and (ip.is_private or ip.is_loopback or ip.is_link_local): + raise ValueError(f"Non-public IP address '{parsed.hostname}' not 
allowed") + headers = {"User-Agent": "exo/1.0"} async with ( create_http_session(timeout_profile="short") as session, diff --git a/src/exo/api/adapters/tests/__init__.py b/src/exo/api/adapters/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/exo/api/adapters/tests/test_fetch_image_url.py b/src/exo/api/adapters/tests/test_fetch_image_url.py new file mode 100644 index 0000000000..8334762a4f --- /dev/null +++ b/src/exo/api/adapters/tests/test_fetch_image_url.py @@ -0,0 +1,153 @@ +# pyright: reportAny=false +"""Tests for fetch_image_url SSRF protection. + +Verifies that scheme, metadata-host, and literal private/loopback/link-local IP +checks fire before any network access, and that valid public URLs are allowed. +""" + +import base64 +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from exo.api.adapters.chat_completions import fetch_image_url + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mock_session(response_data: bytes = b"img") -> MagicMock: + """Return a mock aiohttp session whose .get() never actually sends a request.""" + resp = MagicMock() + resp.__aenter__ = AsyncMock(return_value=resp) + resp.__aexit__ = AsyncMock(return_value=False) + resp.raise_for_status = MagicMock() + resp.read = AsyncMock(return_value=response_data) + + session = MagicMock() + session.__aenter__ = AsyncMock(return_value=session) + session.__aexit__ = AsyncMock(return_value=False) + session.get = MagicMock(return_value=resp) + return session + + +# --------------------------------------------------------------------------- +# Rejection cases — scheme +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_file_scheme_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with 
pytest.raises(ValueError, match="scheme"): + await fetch_image_url("file:///etc/passwd") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_ftp_scheme_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="scheme"): + await fetch_image_url("ftp://example.com/image.jpg") + mock_cs.assert_not_called() + + +# --------------------------------------------------------------------------- +# Rejection cases — metadata host blocklist +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_aws_metadata_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="metadata"): + await fetch_image_url("http://169.254.169.254/latest/meta-data/") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_gcp_metadata_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="metadata"): + await fetch_image_url("http://metadata.google.internal/computeMetadata/v1/") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_azure_metadata_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="metadata"): + await fetch_image_url("http://169.254.170.2/metadata/instance") + mock_cs.assert_not_called() + + +# --------------------------------------------------------------------------- +# Rejection cases — literal private/loopback/link-local IPs +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_private_ip_rfc1918_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="Non-public IP"): + await 
fetch_image_url("http://192.168.1.1/image.jpg") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_private_ip_10_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="Non-public IP"): + await fetch_image_url("http://10.0.0.1/image.jpg") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_loopback_rejected() -> None: + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="Non-public IP"): + await fetch_image_url("http://127.0.0.1/internal") + mock_cs.assert_not_called() + + +@pytest.mark.asyncio +async def test_link_local_non_metadata_rejected() -> None: + """Link-local IPs not in metadata blocklist are still rejected by the IP check.""" + with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: + with pytest.raises(ValueError, match="Non-public IP"): + await fetch_image_url("http://169.254.1.1/any") + mock_cs.assert_not_called() + + +# --------------------------------------------------------------------------- +# Allowed cases +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_valid_https_url_succeeds() -> None: + image_data = b"\x89PNG\r\n" + session = _mock_session(image_data) + with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + result = await fetch_image_url("https://example.com/image.png") + assert result == base64.b64encode(image_data).decode("ascii") + session.get.assert_called_once() + + +@pytest.mark.asyncio +async def test_public_ip_literal_allowed() -> None: + """8.8.8.8 is a genuine public IP (Google DNS); should pass the IP check.""" + image_data = b"data" + session = _mock_session(image_data) + with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + result = await 
fetch_image_url("https://8.8.8.8/image.jpg") + assert result == base64.b64encode(image_data).decode("ascii") + session.get.assert_called_once() + + +@pytest.mark.asyncio +async def test_hostname_not_literal_ip_allowed_through() -> None: + """A plain hostname is not a literal IP; ip_address() raises ValueError and check is skipped.""" + image_data = b"pixels" + session = _mock_session(image_data) + with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + result = await fetch_image_url("https://cdn.example.com/photo.jpg") + assert result == base64.b64encode(image_data).decode("ascii") + session.get.assert_called_once() From 2efb1526554f83eefc8840f51baea465a743b2d3 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 31 May 2026 17:25:23 -0700 Subject: [PATCH 2/2] style: apply ruff format (nix fmt equivalent for Python) --- src/exo/api/adapters/chat_completions.py | 20 ++++++++++++------- .../adapters/tests/test_fetch_image_url.py | 18 +++++++++++++---- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/exo/api/adapters/chat_completions.py b/src/exo/api/adapters/chat_completions.py index ae68c9fbbc..58a4f01d44 100644 --- a/src/exo/api/adapters/chat_completions.py +++ b/src/exo/api/adapters/chat_completions.py @@ -40,11 +40,13 @@ resolve_reasoning_params, ) -_BLOCKED_METADATA_HOSTS: frozenset[str] = frozenset({ - "169.254.169.254", # AWS IMDSv1 - "metadata.google.internal", # GCP - "169.254.170.2", # Azure IMDS -}) +_BLOCKED_METADATA_HOSTS: frozenset[str] = frozenset( + { + "169.254.169.254", # AWS IMDSv1 + "metadata.google.internal", # GCP + "169.254.170.2", # Azure IMDS + } +) def extract_base64_from_data_url(data_url: str) -> Base64Image: @@ -58,10 +60,14 @@ async def fetch_image_url(url: str) -> Base64Image: parsed = urlparse(url) if parsed.scheme not in ("http", "https"): - raise ValueError(f"URL scheme '{parsed.scheme}' not allowed; only http and https are permitted") + raise ValueError( + f"URL scheme 
'{parsed.scheme}' not allowed; only http and https are permitted" + ) if parsed.hostname in _BLOCKED_METADATA_HOSTS: - raise ValueError(f"Access to '{parsed.hostname}' is denied (cloud metadata endpoint)") + raise ValueError( + f"Access to '{parsed.hostname}' is denied (cloud metadata endpoint)" + ) if parsed.hostname: try: diff --git a/src/exo/api/adapters/tests/test_fetch_image_url.py b/src/exo/api/adapters/tests/test_fetch_image_url.py index 8334762a4f..d7c3cb0c94 100644 --- a/src/exo/api/adapters/tests/test_fetch_image_url.py +++ b/src/exo/api/adapters/tests/test_fetch_image_url.py @@ -12,11 +12,11 @@ from exo.api.adapters.chat_completions import fetch_image_url - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- + def _mock_session(response_data: bytes = b"img") -> MagicMock: """Return a mock aiohttp session whose .get() never actually sends a request.""" resp = MagicMock() @@ -36,6 +36,7 @@ def _mock_session(response_data: bytes = b"img") -> MagicMock: # Rejection cases — scheme # --------------------------------------------------------------------------- + @pytest.mark.asyncio async def test_file_scheme_rejected() -> None: with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: @@ -56,6 +57,7 @@ async def test_ftp_scheme_rejected() -> None: # Rejection cases — metadata host blocklist # --------------------------------------------------------------------------- + @pytest.mark.asyncio async def test_aws_metadata_rejected() -> None: with patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: @@ -84,6 +86,7 @@ async def test_azure_metadata_rejected() -> None: # Rejection cases — literal private/loopback/link-local IPs # --------------------------------------------------------------------------- + @pytest.mark.asyncio async def test_private_ip_rfc1918_rejected() -> None: with 
patch("exo.api.adapters.chat_completions.create_http_session") as mock_cs: @@ -121,11 +124,14 @@ async def test_link_local_non_metadata_rejected() -> None: # Allowed cases # --------------------------------------------------------------------------- + @pytest.mark.asyncio async def test_valid_https_url_succeeds() -> None: image_data = b"\x89PNG\r\n" session = _mock_session(image_data) - with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + with patch( + "exo.api.adapters.chat_completions.create_http_session", return_value=session + ): result = await fetch_image_url("https://example.com/image.png") assert result == base64.b64encode(image_data).decode("ascii") session.get.assert_called_once() @@ -136,7 +142,9 @@ async def test_public_ip_literal_allowed() -> None: """8.8.8.8 is a genuine public IP (Google DNS); should pass the IP check.""" image_data = b"data" session = _mock_session(image_data) - with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + with patch( + "exo.api.adapters.chat_completions.create_http_session", return_value=session + ): result = await fetch_image_url("https://8.8.8.8/image.jpg") assert result == base64.b64encode(image_data).decode("ascii") session.get.assert_called_once() @@ -147,7 +155,9 @@ async def test_hostname_not_literal_ip_allowed_through() -> None: """A plain hostname is not a literal IP; ip_address() raises ValueError and check is skipped.""" image_data = b"pixels" session = _mock_session(image_data) - with patch("exo.api.adapters.chat_completions.create_http_session", return_value=session): + with patch( + "exo.api.adapters.chat_completions.create_http_session", return_value=session + ): result = await fetch_image_url("https://cdn.example.com/photo.jpg") assert result == base64.b64encode(image_data).decode("ascii") session.get.assert_called_once()