diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml new file mode 100644 index 000000000..60fb2abbd --- /dev/null +++ b/.github/workflows/unit-tests.yml @@ -0,0 +1,29 @@ +name: Unit Tests + +on: + pull_request: + types: [ opened, edited, reopened, synchronize, ready_for_review ] + +permissions: {} + +jobs: + unit-tests: + runs-on: ubuntu-latest + permissions: + contents: read + strategy: + matrix: + python-version: ["3.12"] + steps: + - name: Checkout repository + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install test dependencies + run: python -m pip install pytest + - name: Run unit tests + run: python -m pytest tests/ -v diff --git a/.gitignore b/.gitignore index af5e6bd82..503386e9d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ # Common editor temp files *.swp .claude + +# Python +__pycache__/ +*.pyc +.pytest_cache/ diff --git a/scripts/detect_interface.py b/scripts/detect_interface.py new file mode 100644 index 000000000..6e7d727a7 --- /dev/null +++ b/scripts/detect_interface.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +"""Network interface detection helpers for ironic. + +Uses ``ip -json -d`` for structured output, which correctly handles +cases where a MAC or IP address appears on multiple interfaces (e.g. a +physical interface enslaved to an OVS or Linux bridge). + +Subcommands +----------- +interface-of-mac [] (default) + Detect the provisioning interface. *macs_csv* is a + comma-separated list of MAC addresses; falls back to the + PROVISIONING_MACS environment variable when omitted. + +interface-of-ip [4|6] + Return the interface that carries *ip_address*. +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from typing import Any + +# Type alias for the dict entries returned by ``ip -json``. +IfaceData = dict[str, Any] + +# (interface_name, is_bridge, has_global_ip) +Candidate = tuple[str, bool, bool] + + +def _ip_json(*args: str) -> list[IfaceData]: + """Run an ``ip -json -d`` command and return the parsed output.""" + result: subprocess.CompletedProcess[str] = subprocess.run( + ["ip", "-json", "-d"] + list(args), + capture_output=True, + text=True, + check=False, + ) + try: + return json.loads(result.stdout) + except (json.JSONDecodeError, ValueError): + return [] + + +def _iface_name(data: IfaceData) -> str: + """Return the base interface name, stripping any ``@link`` suffix.""" + return data.get("ifname", "").split("@")[0] + + +def _is_bridge(data: IfaceData) -> bool: + """Return True if the interface is a bridge (Linux bridge or OVS).""" + kind: str = data.get("linkinfo", {}).get("info_kind", "") + return kind in ("bridge", "openvswitch") + + +def _has_global_address(ifname: str, addr_data: list[IfaceData]) -> bool: + """Return True if *ifname* carries at least one global-scope address.""" + for iface in addr_data: + if _iface_name(iface) != ifname: + continue + for addr_info in iface.get("addr_info", []): + if addr_info.get("scope") == "global": + return True + return False + + +# -- MAC-based detection --------------------------------------------------- + +def find_by_mac(macs_csv: str) -> str | None: + """Return the best UP interface whose MAC matches one in *macs_csv*. + + When a MAC appears on both a physical interface and a bridge (common + with OVN-Kubernetes), the selection prefers: + + 1. The interface that already carries a global IP address (this is + the one dnsmasq should bind to). + 2. Otherwise, the non-bridge (physical) interface. + 3. As a last resort, the first match. + """ + link_data: list[IfaceData] = _ip_json("link", "show", "up") + addr_data: list[IfaceData] = _ip_json("addr", "show") + + for mac in macs_csv.split(","): + mac = mac.strip().lower() + if not mac: + continue + + candidates: list[Candidate] = [] + for iface in link_data: + if iface.get("address", "").lower() == mac: + name: str = _iface_name(iface) + bridge: bool = _is_bridge(iface) + has_ip: bool = _has_global_address(name, addr_data) + candidates.append((name, bridge, has_ip)) + + if not candidates: + continue + + if len(candidates) == 1: + return candidates[0][0] + + with_ip: list[Candidate] = [c for c in candidates if c[2]] + if len(with_ip) == 1: + return with_ip[0][0] + + pool: list[Candidate] = with_ip or candidates + physical: list[Candidate] = [c for c in pool if not c[1]] + if physical: + return physical[0][0] + + return pool[0][0] + + return None + + +def detect_provisioning_interface(macs_csv: str | None = None) -> str: + """Return the name of the provisioning interface. + + *macs_csv* is a comma-separated list of MAC addresses to match. + Falls back to the ``PROVISIONING_MACS`` environment variable when + *macs_csv* is ``None`` or empty. + """ + provisioning_macs: str = ( + macs_csv if macs_csv else os.environ.get("PROVISIONING_MACS", "") + ) + + interface: str = "provisioning" + + if provisioning_macs: + found: str | None = find_by_mac(provisioning_macs) + if found: + interface = found + + return interface + + +# -- IP-based detection ---------------------------------------------------- + +_VALID_IP_VERSIONS: set[str] = {"4", "6"} +_VALID_SUBCOMMANDS: set[str | None] = {None, "interface-of-mac", "interface-of-ip"} + + +def find_by_ip(ip_addr: str, ip_version: str | None = None) -> str: + """Return the first interface carrying *ip_addr*, or empty string. + + *ip_version* can be ``"4"`` or ``"6"`` to restrict the address + family, or ``None`` to search both. + + Raises ``ValueError`` if *ip_version* is not ``None``, ``"4"``, + or ``"6"``. + """ + if ip_version is not None and ip_version not in _VALID_IP_VERSIONS: + raise ValueError( + f"ip_version must be '4', '6', or None, got {ip_version!r}") + + args: list[str] = ["addr", "show"] + if ip_version: + args = [f"-{ip_version}"] + args + + ip_bare: str = ip_addr.split("/")[0].lower() + + for iface in _ip_json(*args): + for addr_info in iface.get("addr_info", []): + if addr_info.get("local", "").lower() == ip_bare: + return _iface_name(iface) + return "" + + +# -- CLI entry point ------------------------------------------------------- + +_USAGE: str = ( + "Usage: detect_interface.py" + " [interface-of-mac [] | interface-of-ip [4|6]]" +) + + +def main() -> None: + subcommand: str | None = sys.argv[1] if len(sys.argv) >= 2 else None + + if subcommand not in _VALID_SUBCOMMANDS: + print(f"ERROR: unknown subcommand {subcommand!r}\n{_USAGE}", + file=sys.stderr) + sys.exit(1) + + if subcommand == "interface-of-ip": + if len(sys.argv) < 3: + print(f"ERROR: interface-of-ip requires an IP address\n{_USAGE}", + file=sys.stderr) + sys.exit(1) + ip_addr: str = sys.argv[2] + ip_version: str | None = sys.argv[3] if len(sys.argv) > 3 else None + print(find_by_ip(ip_addr, ip_version)) + else: + macs_csv: str | None = sys.argv[2] if len(sys.argv) > 2 else None + print(detect_provisioning_interface(macs_csv)) + + +if __name__ == "__main__": + main() diff --git a/scripts/ironic-common.sh b/scripts/ironic-common.sh index 9d71194b5..3bfaa5f07 100644 --- a/scripts/ironic-common.sh +++ b/scripts/ironic-common.sh @@ -41,21 +41,11 @@ export IRONIC_USE_MARIADB=${IRONIC_USE_MARIADB:-false} get_provisioning_interface() { if [[ -n "$PROVISIONING_INTERFACE" ]]; then - # don't override the PROVISIONING_INTERFACE if one is provided echo "$PROVISIONING_INTERFACE" return fi - local interface="provisioning" - - for mac in ${PROVISIONING_MACS//,/ }; do - if ip -br link show up | grep -i "$mac" &>/dev/null; then - interface="$(ip -br link show up | grep -i "$mac" | cut -f 1 -d ' ' | cut -f 1 -d '@')" - break - fi - done - - echo "$interface" + python3.12 /bin/detect_interface.py interface-of-mac "$PROVISIONING_MACS" } PROVISIONING_INTERFACE="$(get_provisioning_interface)" @@ -65,19 +55,14 @@ export LISTEN_ALL_INTERFACES="${LISTEN_ALL_INTERFACES:-true}" get_interface_of_ip() { - local IP_VERS - local IP_ADDR - - if [[ $# -gt 2 ]]; then - echo "ERROR: ${FUNCNAME[0]}: too many parameters" >&2 + if [[ $# -lt 1 ]] || [[ $# -gt 2 ]]; then + echo "ERROR: ${FUNCNAME[0]}: usage: get_interface_of_ip IP_ADDR [4|6]" >&2 exit 1 fi if [[ $# -eq 2 ]]; then case "$2" in - 4|6) - IP_VERS="-$2" - ;; + 4|6) ;; *) echo "ERROR: ${FUNCNAME[0]}: the second parameter should be [4|6] (or missing for both)" >&2 exit 1 @@ -85,11 +70,7 @@ get_interface_of_ip() esac fi - IP_ADDR="$1" - - if ip "${IP_VERS[@]}" -br addr show | grep -F " ${IP_ADDR}/" &>/dev/null; then - ip "${IP_VERS[@]}" -br addr show | grep -F " ${IP_ADDR}/" | cut -f 1 -d ' ' | cut -f 1 -d '@' - fi + python3.12 /bin/detect_interface.py interface-of-ip "$@" } parse_ip_address() diff --git a/tests/test_detect_interface.py b/tests/test_detect_interface.py new file mode 100644 index 000000000..b13f00dbe --- /dev/null +++ b/tests/test_detect_interface.py @@ -0,0 +1,362 @@ +"""Unit tests for detect_interface.py.""" + +import os +import sys +import unittest +from unittest import mock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) + +import detect_interface + + +def _link_entry(ifname, mac, up=True, kind=None): + """Build a minimal ``ip -json -d link show`` entry.""" + entry = { + "ifname": ifname, + "address": mac, + "operstate": "UP" if up else "DOWN", + } + if kind: + entry["linkinfo"] = {"info_kind": kind} + return entry + + +def _addr_entry(ifname, *addrs): + """Build a minimal ``ip -json addr show`` entry. + + Each element in *addrs* is ``(ip, scope)`` — e.g. + ``("192.168.1.10", "global")``. + """ + return { + "ifname": ifname, + "addr_info": [ + {"local": ip, "scope": scope} for ip, scope in addrs + ], + } + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class TestIsBridge(unittest.TestCase): + + def test_linux_bridge(self): + data = {"linkinfo": {"info_kind": "bridge"}} + self.assertTrue(detect_interface._is_bridge(data)) + + def test_ovs_bridge(self): + data = {"linkinfo": {"info_kind": "openvswitch"}} + self.assertTrue(detect_interface._is_bridge(data)) + + def test_physical(self): + data = {"linkinfo": {"info_kind": ""}} + self.assertFalse(detect_interface._is_bridge(data)) + + def test_no_linkinfo(self): + self.assertFalse(detect_interface._is_bridge({})) + + +class TestIfaceName(unittest.TestCase): + + def test_plain_name(self): + self.assertEqual( + detect_interface._iface_name({"ifname": "eth0"}), "eth0") + + def test_at_suffix_stripped(self): + self.assertEqual( + detect_interface._iface_name({"ifname": "eno1@br-ex"}), "eno1") + + +class TestHasGlobalAddress(unittest.TestCase): + + def test_has_global(self): + addr_data = [_addr_entry("eth0", ("10.0.0.1", "global"))] + self.assertTrue( + detect_interface._has_global_address("eth0", addr_data)) + + def test_only_link_local(self): + addr_data = [_addr_entry("eth0", ("fe80::1", "link"))] + self.assertFalse( + detect_interface._has_global_address("eth0", addr_data)) + + def test_wrong_interface(self): + addr_data = [_addr_entry("eth1", ("10.0.0.1", "global"))] + self.assertFalse( + detect_interface._has_global_address("eth0", addr_data)) + + +# --------------------------------------------------------------------------- +# find_by_mac +# --------------------------------------------------------------------------- + +def _patch_ip_json(link_data, addr_data): + """Return a patcher that feeds *link_data* and *addr_data* to _ip_json.""" + def fake_ip_json(*args): + if "link" in args: + return link_data + return addr_data + return mock.patch.object(detect_interface, "_ip_json", side_effect=fake_ip_json) + + +class TestFindByMac(unittest.TestCase): + + def test_single_match(self): + link = [_link_entry("eth0", "aa:bb:cc:dd:ee:ff")] + addr = [_addr_entry("eth0", ("10.0.0.1", "global"))] + with _patch_ip_json(link, addr): + self.assertEqual( + detect_interface.find_by_mac("aa:bb:cc:dd:ee:ff"), "eth0") + + def test_no_match_returns_none(self): + link = [_link_entry("eth0", "aa:bb:cc:dd:ee:ff")] + addr = [] + with _patch_ip_json(link, addr): + self.assertIsNone( + detect_interface.find_by_mac("11:22:33:44:55:66")) + + def test_case_insensitive(self): + link = [_link_entry("eth0", "aa:bb:cc:dd:ee:ff")] + addr = [] + with _patch_ip_json(link, addr): + self.assertEqual( + detect_interface.find_by_mac("AA:BB:CC:DD:EE:FF"), "eth0") + + def test_multiple_macs_first_hit_wins(self): + link = [_link_entry("eth1", "11:22:33:44:55:66")] + addr = [] + with _patch_ip_json(link, addr): + self.assertEqual( + detect_interface.find_by_mac( + "aa:bb:cc:dd:ee:ff,11:22:33:44:55:66"), + "eth1") + + def test_prefers_interface_with_ip(self): + """The bug scenario: eno1 and br-ex share a MAC; br-ex has the IP.""" + mac = "6c:92:cf:0d:03:e6" + link = [ + _link_entry("eno1@br-ex", mac), + _link_entry("br-ex", mac, kind="openvswitch"), + ] + addr = [ + _addr_entry("eno1"), + _addr_entry("br-ex", ("192.168.111.10", "global")), + ] + with _patch_ip_json(link, addr): + self.assertEqual(detect_interface.find_by_mac(mac), "br-ex") + + def test_prefers_physical_when_no_ip(self): + """No interface has an IP yet — prefer physical over bridge.""" + mac = "6c:92:cf:0d:03:e6" + link = [ + _link_entry("eno1@br-ex", mac), + _link_entry("br-ex", mac, kind="bridge"), + ] + addr = [ + _addr_entry("eno1"), + _addr_entry("br-ex"), + ] + with _patch_ip_json(link, addr): + self.assertEqual(detect_interface.find_by_mac(mac), "eno1") + + def test_prefers_physical_with_ip_over_bridge_with_ip(self): + """Both have IPs — prefer physical.""" + mac = "aa:bb:cc:dd:ee:ff" + link = [ + _link_entry("eno1@br-ex", mac), + _link_entry("br-ex", mac, kind="openvswitch"), + ] + addr = [ + _addr_entry("eno1", ("10.0.0.1", "global")), + _addr_entry("br-ex", ("10.0.0.1", "global")), + ] + with _patch_ip_json(link, addr): + self.assertEqual(detect_interface.find_by_mac(mac), "eno1") + + def test_falls_back_to_bridge_if_only_bridges(self): + mac = "aa:bb:cc:dd:ee:ff" + link = [ + _link_entry("br0", mac, kind="bridge"), + _link_entry("br1", mac, kind="openvswitch"), + ] + addr = [_addr_entry("br0"), _addr_entry("br1")] + with _patch_ip_json(link, addr): + self.assertEqual(detect_interface.find_by_mac(mac), "br0") + + def test_empty_macs(self): + link = [_link_entry("eth0", "aa:bb:cc:dd:ee:ff")] + addr = [] + with _patch_ip_json(link, addr): + self.assertIsNone(detect_interface.find_by_mac("")) + self.assertIsNone(detect_interface.find_by_mac(",,,")) + + +# --------------------------------------------------------------------------- +# detect_provisioning_interface +# --------------------------------------------------------------------------- + +class TestDetectProvisioningInterface(unittest.TestCase): + + @mock.patch.object(detect_interface, "find_by_mac", return_value="eth0") + def test_explicit_macs_argument(self, mock_find): + self.assertEqual( + detect_interface.detect_provisioning_interface( + "aa:bb:cc:dd:ee:ff"), "eth0") + mock_find.assert_called_once_with("aa:bb:cc:dd:ee:ff") + + @mock.patch.dict("os.environ", {"PROVISIONING_MACS": "aa:bb:cc:dd:ee:ff"}) + @mock.patch.object(detect_interface, "find_by_mac", return_value="eth0") + def test_falls_back_to_env_var(self, mock_find): + self.assertEqual( + detect_interface.detect_provisioning_interface(), "eth0") + mock_find.assert_called_once_with("aa:bb:cc:dd:ee:ff") + + @mock.patch.dict("os.environ", {"PROVISIONING_MACS": "aa:bb:cc:dd:ee:ff"}) + @mock.patch.object(detect_interface, "find_by_mac", return_value=None) + def test_defaults_to_provisioning(self, _mock): + self.assertEqual( + detect_interface.detect_provisioning_interface(), "provisioning") + + @mock.patch.dict("os.environ", {}, clear=True) + def test_no_macs_defaults_to_provisioning(self): + self.assertEqual( + detect_interface.detect_provisioning_interface(), "provisioning") + + @mock.patch.dict("os.environ", {"PROVISIONING_MACS": "from:env:only"}) + @mock.patch.object(detect_interface, "find_by_mac", return_value="eth1") + def test_explicit_arg_overrides_env(self, mock_find): + self.assertEqual( + detect_interface.detect_provisioning_interface( + "from:cli:arg"), "eth1") + mock_find.assert_called_once_with("from:cli:arg") + + +# --------------------------------------------------------------------------- +# find_by_ip +# --------------------------------------------------------------------------- + +class TestFindByIp(unittest.TestCase): + + def test_match(self): + addr = [_addr_entry("eth0", ("192.168.1.10", "global"))] + with mock.patch.object( + detect_interface, "_ip_json", return_value=addr): + self.assertEqual( + detect_interface.find_by_ip("192.168.1.10"), "eth0") + + def test_no_match_returns_empty(self): + addr = [_addr_entry("eth0", ("192.168.1.10", "global"))] + with mock.patch.object( + detect_interface, "_ip_json", return_value=addr): + self.assertEqual( + detect_interface.find_by_ip("10.0.0.99"), "") + + def test_strips_prefix_length(self): + addr = [_addr_entry("eth0", ("192.168.1.10", "global"))] + with mock.patch.object( + detect_interface, "_ip_json", return_value=addr): + self.assertEqual( + detect_interface.find_by_ip("192.168.1.10/24"), "eth0") + + def test_case_insensitive_ipv6(self): + addr = [_addr_entry("eth0", ("fd00::1", "global"))] + with mock.patch.object( + detect_interface, "_ip_json", return_value=addr): + self.assertEqual( + detect_interface.find_by_ip("FD00::1"), "eth0") + + def test_ip_version_passed_to_ip_json(self): + with mock.patch.object( + detect_interface, "_ip_json", return_value=[]) as m: + detect_interface.find_by_ip("10.0.0.1", ip_version="4") + m.assert_called_once_with("-4", "addr", "show") + + def test_strips_at_suffix(self): + addr = [_addr_entry("eno1@br-ex", ("10.0.0.1", "global"))] + with mock.patch.object( + detect_interface, "_ip_json", return_value=addr): + self.assertEqual( + detect_interface.find_by_ip("10.0.0.1"), "eno1") + + def test_invalid_ip_version_raises(self): + with self.assertRaises(ValueError): + detect_interface.find_by_ip("10.0.0.1", ip_version="5") + + def test_valid_ip_versions_accepted(self): + with mock.patch.object( + detect_interface, "_ip_json", return_value=[]): + detect_interface.find_by_ip("10.0.0.1", ip_version="4") + detect_interface.find_by_ip("10.0.0.1", ip_version="6") + detect_interface.find_by_ip("10.0.0.1", ip_version=None) + + +# --------------------------------------------------------------------------- +# CLI (main) +# --------------------------------------------------------------------------- + +class TestMain(unittest.TestCase): + + @mock.patch.object(detect_interface, "detect_provisioning_interface", + return_value="eth0") + def test_default_subcommand(self, _mock): + with mock.patch("sys.argv", ["detect_interface.py"]): + with mock.patch("builtins.print") as mock_print: + detect_interface.main() + mock_print.assert_called_once_with("eth0") + + @mock.patch.object(detect_interface, "find_by_ip", return_value="eno1") + def test_interface_of_ip_subcommand(self, mock_find): + with mock.patch("sys.argv", + ["detect_interface.py", "interface-of-ip", + "10.0.0.1", "4"]): + with mock.patch("builtins.print") as mock_print: + detect_interface.main() + mock_find.assert_called_once_with("10.0.0.1", "4") + mock_print.assert_called_once_with("eno1") + + def test_interface_of_ip_missing_addr_exits(self): + with mock.patch("sys.argv", + ["detect_interface.py", "interface-of-ip"]): + with self.assertRaises(SystemExit) as ctx: + detect_interface.main() + self.assertEqual(ctx.exception.code, 1) + + @mock.patch.object(detect_interface, "detect_provisioning_interface", + return_value="eth0") + def test_explicit_interface_of_mac_with_arg(self, mock_detect): + with mock.patch("sys.argv", + ["detect_interface.py", "interface-of-mac", + "aa:bb:cc:dd:ee:ff"]): + with mock.patch("builtins.print") as mock_print: + detect_interface.main() + mock_detect.assert_called_once_with("aa:bb:cc:dd:ee:ff") + mock_print.assert_called_once_with("eth0") + + @mock.patch.object(detect_interface, "detect_provisioning_interface", + return_value="eth0") + def test_explicit_interface_of_mac_no_arg(self, mock_detect): + with mock.patch("sys.argv", + ["detect_interface.py", "interface-of-mac"]): + with mock.patch("builtins.print") as mock_print: + detect_interface.main() + mock_detect.assert_called_once_with(None) + mock_print.assert_called_once_with("eth0") + + def test_unknown_subcommand_exits(self): + with mock.patch("sys.argv", + ["detect_interface.py", "interface-of-Ip"]): + with self.assertRaises(SystemExit) as ctx: + detect_interface.main() + self.assertEqual(ctx.exception.code, 1) + + def test_garbage_argument_exits(self): + with mock.patch("sys.argv", + ["detect_interface.py", "foobar"]): + with self.assertRaises(SystemExit) as ctx: + detect_interface.main() + self.assertEqual(ctx.exception.code, 1) + + +if __name__ == "__main__": + unittest.main()