diff --git a/salt/channel/server.py b/salt/channel/server.py index 9ddf2dbbb9a..81b51edc5cf 100644 --- a/salt/channel/server.py +++ b/salt/channel/server.py @@ -1639,6 +1639,157 @@ def _start_raft_as_learner(self, known_peers): def gen_token(self): return "".join(random.choices(string.ascii_letters + string.digits, k=32)) + async def _run_root_sync_to_peers(self, channels): + """ + Operator-driven push of ``file_roots`` and/or ``pillar_roots`` to + every peer in the cluster. Triggered by the ``cluster.sync_roots`` + runner via the ``cluster/runner/sync_roots`` local event. + + For each peer: + + 1. Allocate a fresh session id. + 2. Emit a ``cluster/peer/sync-roots-begin`` event to the peer so + the receiver pre-registers a state-sync session — same + contract as the join-reply flow, but the receiver's + ``on_complete`` is a no-op (this is an ad-hoc push, not a + Raft-learner bootstrap). + 3. Stream the requested channels to that peer via the standard + state-sync chunk format. + + Errors per-peer are logged but do not stop the fan-out — a + partially-online cluster can still receive the update on + reachable peers. + """ + from salt.cluster.state_sync import ( # pylint: disable=import-outside-toplevel + FILE_ROOTS_CHANNEL, + PILLAR_ROOTS_CHANNEL, + new_session_id, + ) + + roots_for = { + FILE_ROOTS_CHANNEL: self.opts.get("file_roots"), + PILLAR_ROOTS_CHANNEL: self.opts.get("pillar_roots"), + } + # Honour the channel filter — operator may want only one of the + # two trees synced. + active_channels = [ + ch for ch in (FILE_ROOTS_CHANNEL, PILLAR_ROOTS_CHANNEL) if ch in channels + ] + if not active_channels: + log.warning( + "cluster.sync_roots: no valid channels (got %r), skipping", + channels, + ) + return + + crypticle = salt.crypt.Crypticle( + self.opts, + salt.master.SMaster.secrets["cluster_aes"]["secret"].value, + ) + + for pusher in self.pushers: + await self._send_sync_roots_to_peer( + pusher, + active_channels, + roots_for, + crypticle, + new_session_id(), + ) + + async def _send_sync_roots_to_peer( + self, pusher, active_channels, roots_for, crypticle, session_id + ): + """ + Push one ``cluster.sync_roots`` session to a single peer. + + Split out of :meth:`_run_root_sync_to_peers` so the per-peer + loop variables are explicit method arguments — avoids + cell-var-from-loop closure captures on the per-channel send. + """ + from salt.cluster.state_sync import ( # pylint: disable=import-outside-toplevel + iter_root_chunks, + ) + + peer_id = pusher.pull_host + log.info( + "cluster.sync_roots: starting %s to peer %s (session %s)", + active_channels, + peer_id, + session_id, + ) + + # Pre-announce the session to the receiver. Encrypted under + # cluster_aes; the receiver decrypts and registers the session + # with an on_complete that tears down the registry entry (vs. + # join-reply's ``_start_raft_as_learner``). 
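+        # A minimal sketch of the decrypted begin event, with hypothetical
+        # values (the session id comes from new_session_id()):
+        #   tag:  cluster/peer/sync-roots-begin
+        #   data: {"session": "<session id>",
+        #          "channels": ["file_roots", "pillar_roots"]}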
+ begin_payload = {"session": session_id, "channels": active_channels} + begin_event = salt.utils.event.SaltEvent.pack( + salt.utils.event.tagify("sync-roots-begin", "peer", "cluster"), + crypticle.dumps(begin_payload), + ) + try: + await pusher.publish(begin_event) + except Exception: # pylint: disable=broad-except + log.exception( + "cluster.sync_roots: failed to send sync-roots-begin to %s", + peer_id, + ) + return + + for channel in active_channels: + await self._send_sync_roots_channel( + pusher, + crypticle, + session_id, + peer_id, + channel, + iter_root_chunks(roots_for[channel]), + ) + + async def _send_sync_roots_channel( + self, pusher, crypticle, session_id, peer_id, channel, chunks + ): + """ + Stream one state-sync channel's chunks to a single peer for an + operator-driven ``cluster.sync_roots`` session. + """ + chunks = list(chunks) + if not chunks: + chunks = [[]] + total = len(chunks) + for seq, items in enumerate(chunks): + payload = { + "session": session_id, + "channel": channel, + "seq": seq, + "total": total, + "eof": seq == total - 1, + "items": items, + } + chunk_event = salt.utils.event.SaltEvent.pack( + salt.utils.event.tagify("state-sync-chunk", "peer", "cluster"), + crypticle.dumps(payload), + ) + try: + await pusher.publish(chunk_event) + except Exception: # pylint: disable=broad-except + log.exception( + "cluster.sync_roots: chunk %s/%s seq=%d to %s failed", + session_id, + channel, + seq, + peer_id, + ) + return + log.info( + "cluster.sync_roots: %s/%s sent %d chunks (%d items) to %s", + session_id, + channel, + total, + sum(len(c) for c in chunks), + peer_id, + ) + async def _send_state_sync_chunks(self, session_id, peer_id): """ Stream the four state-sync channels (keys, denied_keys, @@ -1866,6 +2017,75 @@ def _on_complete(): session_id, ) + def _begin_root_sync_session(self, session_id, channels): + """ + Pre-register a state-sync session for an operator-driven + ``cluster.sync_roots`` push. + + Mirrors ``_begin_state_sync_session`` but the on-complete is a + plain teardown (no Raft-learner bootstrap, no discover_event to + set). Idempotent: a duplicate ``sync-roots-begin`` for an + already-registered session is logged and ignored. + """ + from salt.cluster.state_sync import ( # pylint: disable=import-outside-toplevel + ALL_CHANNELS, + DEFAULT_RECEIVE_TIMEOUT, + StateSyncSession, + ) + + if not session_id: + log.warning("sync-roots-begin without session_id; dropping") + return + if not hasattr(self, "_state_sync_sessions"): + self._state_sync_sessions = {} + if session_id in self._state_sync_sessions: + log.warning( + "Duplicate sync-roots-begin session id %s; ignoring", session_id + ) + return + + # Defensive: empty channels list defaults to all four (matches + # join-time semantics). + if not channels: + channels = list(ALL_CHANNELS) + + completed_holder = {"done": False} + + def _on_complete(): + if completed_holder["done"]: + return + completed_holder["done"] = True + log.info( + "cluster.sync_roots: session %s complete (channels=%s)", + session_id, + channels, + ) + handle = session.watchdog_handle + if handle is not None: + try: + handle.cancel() + except Exception: # pylint: disable=broad-except + pass + self._state_sync_sessions.pop(session_id, None) + + # The StateSyncSession state machine tracks per-channel eof. We + # tell it about only the channels we expect — when each fires + # eof, the session triggers on_complete. 
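+        # Sketch of one inbound chunk after decryption (shape fixed by the
+        # sender in _send_sync_roots_channel; values hypothetical):
+        #   {"session": "<session id>", "channel": "file_roots",
+        #    "seq": 0, "total": 3, "eof": False, "items": [...]}
+        # A channel is marked done by its eof chunk; the completed_holder
+        # guard below makes the teardown run at most once.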
+        session = StateSyncSession(session_id, _on_complete, channels=channels)
+        session.watchdog_handle = None
+        self._state_sync_sessions[session_id] = session
+
+        try:
+            loop = asyncio.get_running_loop()
+            session.watchdog_handle = loop.call_later(
+                DEFAULT_RECEIVE_TIMEOUT, session.force_complete
+            )
+        except RuntimeError:
+            log.warning(
+                "cluster.sync_roots: no event loop for watchdog on session %s",
+                session_id,
+            )
+
     def _join_sentinel_path(self):
         """
         Return the path to the per-master join sentinel file.
@@ -2240,6 +2460,27 @@ async def handle_pool_publish(self, payload):
                 return
             self._apply_state_sync_chunk(chunk)
             return
+        if tag.startswith("cluster/peer/sync-roots-begin"):
+            # Operator-driven push from a peer (via the
+            # ``cluster.sync_roots`` runner). Pre-register a
+            # state-sync session keyed by the announced session_id so
+            # the subsequent ``state-sync-chunk`` events have somewhere
+            # to land. ``on_complete`` here just removes the registry
+            # entry, because this is an ad-hoc content push, not a
+            # join-time Raft-learner bootstrap.
+            try:
+                crypticle = salt.crypt.Crypticle(
+                    self.opts,
+                    salt.master.SMaster.secrets["cluster_aes"]["secret"].value,
+                )
+                begin = crypticle.loads(data)
+            except Exception:  # pylint: disable=broad-except
+                log.exception("Failed to decrypt sync-roots-begin")
+                return
+            self._begin_root_sync_session(
+                begin.get("session"), begin.get("channels") or []
+            )
+            return
         if tag.startswith("cluster/peer/join-notify"):
             # join-notify is encrypted with the shared cluster AES key.
             try:
@@ -2810,7 +3051,14 @@ def extract_cluster_event(self, peer_id, data):
 
     async def publish_payload(self, load, *args):
         tag, data = salt.utils.event.SaltEvent.unpack(load)
-        # log.warning("Event %s %s %r", len(self.pushers), tag, data)
+        # Operator-triggered cluster operations originate as ``cluster/runner/*``
+        # events fired by the runner subprocess. Intercept them here so the
+        # event is consumed locally rather than broadcast as a regular
+        # cluster event.
+        if tag == "cluster/runner/sync_roots":
+            channels = data.get("channels") or ["file_roots", "pillar_roots"]
+            # Hold a strong reference so the fan-out task cannot be
+            # garbage-collected mid-flight (asyncio keeps only a weak
+            # reference to running tasks).
+            task = asyncio.create_task(self._run_root_sync_to_peers(channels))
+            self._sync_roots_tasks = getattr(self, "_sync_roots_tasks", set())
+            self._sync_roots_tasks.add(task)
+            task.add_done_callback(self._sync_roots_tasks.discard)
+            return
         tasks = []
         if not tag.startswith("cluster/peer"):
             tasks = [
diff --git a/salt/runners/cluster.py b/salt/runners/cluster.py
index 8ed95457e3a..7d777646bc4 100644
--- a/salt/runners/cluster.py
+++ b/salt/runners/cluster.py
@@ -150,6 +150,79 @@ def _read_health_sentinel():
     return {}
 
 
+def sync_roots(roots="both"):
+    """
+    Push this master's ``file_roots`` and/or ``pillar_roots`` to every
+    other cluster master.
+
+    Runs the operator-driven counterpart of the bulk state-sync that
+    fires automatically during a cluster join. Use it when the
+    canonical content on this master has changed and you want every
+    peer to pick up the new files without restarting them or waiting
+    for the next join handshake.
+
+    The runner fires a local event; the master daemon picks it up and
+    fans out chunks to every peer over the encrypted cluster pub bus
+    (same transport as the join-time state-sync). It returns immediately
+    after the event is fired — the actual sync runs asynchronously in
+    the master process. Check each peer's master log for the
+    ``state-sync ... installed N items`` lines to confirm delivery.
+
+    :param roots: ``"file"``, ``"pillar"``, or ``"both"`` (default
+        ``"both"``). Selects which content trees to sync.
+
+    CLI Example:
+
+    .. 
code-block:: bash + + # Push both file_roots and pillar_roots to all peers + salt-run cluster.sync_roots + + # Push only file_roots + salt-run cluster.sync_roots roots=file + + .. versionadded:: 3009.0 + """ + if roots not in ("file", "pillar", "both"): + raise ValueError(f"roots must be 'file', 'pillar', or 'both', got {roots!r}") + # Lazy imports — keep the runner module light. + import salt.utils.event # pylint: disable=import-outside-toplevel + + cluster_id = __opts__.get("cluster_id") + if not cluster_id: + return { + "status": "skipped", + "reason": "no cluster_id configured; this master is not a cluster member", + } + + channels = [] + if roots in ("file", "both"): + channels.append("file_roots") + if roots in ("pillar", "both"): + channels.append("pillar_roots") + + with salt.utils.event.get_event( + "master", + sock_dir=__opts__["sock_dir"], + opts=__opts__, + listen=False, + ) as event: + event.fire_event( + {"channels": channels}, + "cluster/runner/sync_roots", + ) + return { + "status": "fan-out initiated", + "channels": channels, + "cluster_id": cluster_id, + "note": ( + "The sync runs asynchronously inside the master daemon. Tail " + "each peer's master log for the 'state-sync ... installed N items' " + "lines to confirm delivery." + ), + } + + def ring_info(): """ Return a snapshot of this master's ring state. diff --git a/tests/pytests/integration/cluster/test_isolated_cluster.py b/tests/pytests/integration/cluster/test_isolated_cluster.py index 9fbc1aa8e0f..a1ecc4dbc8b 100644 --- a/tests/pytests/integration/cluster/test_isolated_cluster.py +++ b/tests/pytests/integration/cluster/test_isolated_cluster.py @@ -467,3 +467,173 @@ def test_isolated_late_joiner_targets_all_existing_minions( f"Minion {mid!r} did not return True via late-joining master_4 " f"(got {data[mid]!r})" ) + + +# --------------------------------------------------------------------------- +# cluster.sync_roots runner — operator-driven content fan-out +# --------------------------------------------------------------------------- + + +def test_isolated_sync_roots_runner_propagates_content( + cluster_master_1_isolated, + cluster_master_2_isolated, + cluster_master_3_isolated, +): + """ + Content added to master_1's ``file_roots`` and ``pillar_roots`` *after* + the cluster is up must propagate to every peer when the operator runs + ``salt-run cluster.sync_roots``. + + Pins the runner-to-peer fan-out contract: + + * The runner returns immediately with ``status: "fan-out initiated"``. + * The actual sync happens asynchronously inside the master daemon. + * Each peer's content tree gets the new files via the same encrypted + state-sync transport used at join time — no special path, no IPC. + + Distinct from ``test_isolated_late_joiner_receives_file_and_pillar_roots``: + that one tests *join-time* bulk sync (content present before a master + joins). This one tests *post-join* operator-driven sync (content added + after the cluster is steady-state). + """ + src_file_root = pathlib.Path( + cluster_master_1_isolated.config["file_roots"]["base"][0] + ) + src_pillar_root = pathlib.Path( + cluster_master_1_isolated.config["pillar_roots"]["base"][0] + ) + + # Unique marker so we can't be fooled by leftover state from a + # neighbouring test or a previous run. 
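+    # (An integer timestamp is enough: each run writes a fresh marker, so
+    # a stale file left by an earlier run cannot satisfy the content
+    # assertions below.)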
+ marker = f"sync-roots-marker-{int(time.time())}" + sls_body = f"post-join-id:\n test.succeed_without_changes:\n - name: {marker}\n" + pillar_body = f"sync_roots_marker: {marker}\n" + + sls_path = src_file_root / "post_join_synced.sls" + pillar_path = src_pillar_root / "post_join_synced.sls" + sls_path.write_text(sls_body) + pillar_path.write_text(pillar_body) + + # Fire the runner from master_1. Returns immediately; the daemon + # owns the fan-out. + salt_run = cluster_master_1_isolated.salt_run_cli(timeout=60) + ret = salt_run.run("cluster.sync_roots") + assert ret.returncode == 0, ( + f"cluster.sync_roots exited non-zero (stdout={ret.stdout!r}, " + f"stderr={ret.stderr!r})" + ) + # JSON deserialisation may give us a dict or the raw string depending + # on output format; just check the success substring loosely. + assert "fan-out initiated" in (ret.stdout or "") or ( + isinstance(ret.data, dict) and ret.data.get("status") == "fan-out initiated" + ), f"runner output missing fan-out marker: {ret.stdout!r} / {ret.data!r}" + + # Poll for content arrival on master_2 and master_3. Same transport + # as the join-time bulk sync, so 30 s is comfortably above the + # observed live-cluster window (~5-8 s on a healthy LAN). + peers = (cluster_master_2_isolated, cluster_master_3_isolated) + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + all_landed = True + for master in peers: + dst_sls = ( + pathlib.Path(master.config["file_roots"]["base"][0]) + / "post_join_synced.sls" + ) + dst_pillar = ( + pathlib.Path(master.config["pillar_roots"]["base"][0]) + / "post_join_synced.sls" + ) + if not dst_sls.is_file() or not dst_pillar.is_file(): + all_landed = False + break + if all_landed: + break + time.sleep(0.5) + + # Per-peer assertions so a failure points at the right master. + for master in peers: + addr = master.config["interface"] + dst_sls = ( + pathlib.Path(master.config["file_roots"]["base"][0]) + / "post_join_synced.sls" + ) + dst_pillar = ( + pathlib.Path(master.config["pillar_roots"]["base"][0]) + / "post_join_synced.sls" + ) + assert dst_sls.is_file(), ( + f"master {addr} never received post_join_synced.sls " + f"(looked under {dst_sls.parent})" + ) + assert dst_pillar.is_file(), ( + f"master {addr} never received post_join_synced.sls pillar " + f"(looked under {dst_pillar.parent})" + ) + assert ( + marker in dst_sls.read_text() + ), f"master {addr} received the file but marker is wrong" + assert ( + marker in dst_pillar.read_text() + ), f"master {addr} received the pillar but marker is wrong" + + +def test_isolated_sync_roots_runner_file_only( + cluster_master_1_isolated, + cluster_master_2_isolated, + cluster_master_3_isolated, +): + """ + ``cluster.sync_roots roots=file`` syncs only ``file_roots``; pillars + on the peers are unchanged. Pins the channel-filter contract so an + operator who only wanted SLS updates doesn't accidentally fan out + secret pillar data. 
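+
+    The pillar-absence check is necessarily point-in-time: we wait for
+    the file_roots side to land on every peer, then assert the pillar
+    file has not appeared by that moment (see the comment at the poll
+    loop below).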
+ """ + src_file_root = pathlib.Path( + cluster_master_1_isolated.config["file_roots"]["base"][0] + ) + src_pillar_root = pathlib.Path( + cluster_master_1_isolated.config["pillar_roots"]["base"][0] + ) + + marker = f"file-only-{int(time.time())}" + file_path = src_file_root / "file_only.sls" + pillar_path = src_pillar_root / "file_only_pillar.sls" + file_path.write_text(f"id:\n test.nop:\n - name: {marker}\n") + pillar_path.write_text(f"never_fanned_out: {marker}\n") + + salt_run = cluster_master_1_isolated.salt_run_cli(timeout=60) + ret = salt_run.run("cluster.sync_roots", "roots=file") + assert ret.returncode == 0 + + peers = (cluster_master_2_isolated, cluster_master_3_isolated) + + # Wait for the file_roots side to land; we deliberately do not wait + # for the pillar (because it shouldn't land at all). + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + if all( + ( + pathlib.Path(m.config["file_roots"]["base"][0]) / "file_only.sls" + ).is_file() + for m in peers + ): + break + time.sleep(0.5) + + for master in peers: + addr = master.config["interface"] + dst_file = ( + pathlib.Path(master.config["file_roots"]["base"][0]) / "file_only.sls" + ) + dst_pillar = ( + pathlib.Path(master.config["pillar_roots"]["base"][0]) + / "file_only_pillar.sls" + ) + assert ( + dst_file.is_file() + ), f"master {addr} did not receive file_only.sls via roots=file" + assert not dst_pillar.is_file(), ( + f"master {addr} received file_only_pillar.sls despite roots=file " + "(pillar should be excluded)" + ) diff --git a/tests/pytests/unit/runners/test_cluster_runner.py b/tests/pytests/unit/runners/test_cluster_runner.py index 717f7edfc11..9f66167a6fd 100644 --- a/tests/pytests/unit/runners/test_cluster_runner.py +++ b/tests/pytests/unit/runners/test_cluster_runner.py @@ -231,3 +231,99 @@ def test_members_skips_non_config_entries(_runner_opts): # version stamp is from the CONFIG entry, not the trailing # non-membership entries. assert result["membership_version"] == 0 + + +# --------------------------------------------------------------------------- +# cluster.sync_roots — operator-driven content fan-out +# --------------------------------------------------------------------------- + + +def test_sync_roots_rejects_invalid_roots(_runner_opts): + """ + ``roots`` is constrained to ``{"file", "pillar", "both"}``. Anything + else is rejected up-front so the operator doesn't silently fire a + no-op event. + """ + _runner_opts["cluster_id"] = "test_cluster" + with pytest.raises(ValueError, match="roots must be"): + cluster_runner.sync_roots(roots="everything") + + +def test_sync_roots_no_cluster_id_is_skip(_runner_opts): + """ + A non-cluster master returns a structured skip rather than + firing a meaningless event. Lets ops automation call this runner + unconditionally without breaking standalone masters. + """ + _runner_opts["cluster_id"] = None + result = cluster_runner.sync_roots() + assert result["status"] == "skipped" + assert "no cluster_id" in result["reason"] + + +def test_sync_roots_fires_local_event(_runner_opts, monkeypatch): + """ + The happy path: the runner fires a ``cluster/runner/sync_roots`` + event with the resolved channel list. The master daemon (not the + runner subprocess) is responsible for the actual fan-out — the + runner's job is just to make the request loudly enough that the + daemon picks it up. 
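+
+    ``_FakeEvent`` below is a stand-in for the real event bus: it
+    records every ``(tag, data)`` passed to ``fire_event`` so the test
+    can assert on the exact payload without touching IPC.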
+ """ + _runner_opts["cluster_id"] = "test_cluster" + fired = [] + + class _FakeEvent: + def __init__(self, *a, **kw): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def fire_event(self, data, tag): + fired.append((tag, data)) + + import salt.utils.event + + monkeypatch.setattr(salt.utils.event, "get_event", lambda *a, **kw: _FakeEvent()) + + result = cluster_runner.sync_roots(roots="both") + assert result["status"] == "fan-out initiated" + assert result["channels"] == ["file_roots", "pillar_roots"] + assert len(fired) == 1 + tag, data = fired[0] + assert tag == "cluster/runner/sync_roots" + assert data == {"channels": ["file_roots", "pillar_roots"]} + + +def test_sync_roots_file_only_filters_channels(_runner_opts, monkeypatch): + """ + ``roots="file"`` requests only the file_roots channel; pillar_roots + is excluded from the runner's event payload so the daemon doesn't + push pillars when the operator only wanted SLS. + """ + _runner_opts["cluster_id"] = "test_cluster" + fired = [] + + class _FakeEvent: + def __init__(self, *a, **kw): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def fire_event(self, data, tag): + fired.append((tag, data)) + + import salt.utils.event + + monkeypatch.setattr(salt.utils.event, "get_event", lambda *a, **kw: _FakeEvent()) + + result = cluster_runner.sync_roots(roots="file") + assert result["channels"] == ["file_roots"] + assert fired[0][1] == {"channels": ["file_roots"]}