Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 249 additions & 1 deletion salt/channel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,157 @@ def _start_raft_as_learner(self, known_peers):
def gen_token(self):
    """
    Return a fresh 32-character alphanumeric token.

    Uses the ``secrets`` module's CSPRNG instead of ``random.choices``:
    ``random`` is a Mersenne Twister whose future output is predictable
    from past output, which the Python docs explicitly warn against for
    tokens. Output shape is unchanged (32 chars from
    ``ascii_letters + digits``), so callers see no difference.
    """
    import secrets  # pylint: disable=import-outside-toplevel

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(32))

async def _run_root_sync_to_peers(self, channels):
    """
    Operator-driven push of ``file_roots`` and/or ``pillar_roots`` to
    every peer in the cluster. Triggered by the ``cluster.sync_roots``
    runner via the ``cluster/runner/sync_roots`` local event.

    For each peer:

    1. Allocate a fresh session id.
    2. Emit a ``cluster/peer/sync-roots-begin`` event to the peer so
       the receiver pre-registers a state-sync session — same
       contract as the join-reply flow, but the receiver's
       ``on_complete`` is a no-op (this is an ad-hoc push, not a
       Raft-learner bootstrap).
    3. Stream the requested channels to that peer via the standard
       state-sync chunk format.

    Errors per-peer are logged but do not stop the fan-out — a
    partially-online cluster can still receive the update on
    reachable peers.

    :param channels: Iterable of channel names requested by the
        operator; anything other than the two root channels is ignored.
    """
    from salt.cluster.state_sync import (  # pylint: disable=import-outside-toplevel
        FILE_ROOTS_CHANNEL,
        PILLAR_ROOTS_CHANNEL,
        new_session_id,
    )

    roots_for = {
        FILE_ROOTS_CHANNEL: self.opts.get("file_roots"),
        PILLAR_ROOTS_CHANNEL: self.opts.get("pillar_roots"),
    }
    # Honour the channel filter — operator may want only one of the
    # two trees synced.
    active_channels = [
        ch for ch in (FILE_ROOTS_CHANNEL, PILLAR_ROOTS_CHANNEL) if ch in channels
    ]
    if not active_channels:
        log.warning(
            "cluster.sync_roots: no valid channels (got %r), skipping",
            channels,
        )
        return

    crypticle = salt.crypt.Crypticle(
        self.opts,
        salt.master.SMaster.secrets["cluster_aes"]["secret"].value,
    )

    for pusher in self.pushers:
        try:
            await self._send_sync_roots_to_peer(
                pusher,
                active_channels,
                roots_for,
                crypticle,
                new_session_id(),
            )
        except Exception:  # pylint: disable=broad-except
            # The per-peer helper only guards its publish calls; any
            # other failure (chunking, encryption) must not abort the
            # fan-out to the remaining peers — that is the contract
            # promised in the docstring above.
            log.exception(
                "cluster.sync_roots: unexpected error while syncing to peer %s",
                pusher.pull_host,
            )

async def _send_sync_roots_to_peer(
    self, pusher, active_channels, roots_for, crypticle, session_id
):
    """
    Push one ``cluster.sync_roots`` session to a single peer.

    Kept separate from :meth:`_run_root_sync_to_peers` so every
    per-peer value arrives as an explicit argument instead of being
    captured from the surrounding loop (no cell-var-from-loop risk on
    the per-channel sends).
    """
    from salt.cluster.state_sync import (  # pylint: disable=import-outside-toplevel
        iter_root_chunks,
    )

    peer_id = pusher.pull_host
    log.info(
        "cluster.sync_roots: starting %s to peer %s (session %s)",
        active_channels,
        peer_id,
        session_id,
    )

    # Announce the session before any chunks flow. The payload is
    # encrypted under the shared cluster AES key; the receiver
    # registers the session with an on_complete that just tears the
    # registry entry down again (unlike join-reply's
    # ``_start_raft_as_learner``).
    announce = crypticle.dumps({"session": session_id, "channels": active_channels})
    begin_event = salt.utils.event.SaltEvent.pack(
        salt.utils.event.tagify("sync-roots-begin", "peer", "cluster"),
        announce,
    )
    try:
        await pusher.publish(begin_event)
    except Exception:  # pylint: disable=broad-except
        log.exception(
            "cluster.sync_roots: failed to send sync-roots-begin to %s",
            peer_id,
        )
        return

    # Stream each requested channel in turn using the standard
    # state-sync chunk format.
    for name in active_channels:
        chunk_iter = iter_root_chunks(roots_for[name])
        await self._send_sync_roots_channel(
            pusher,
            crypticle,
            session_id,
            peer_id,
            name,
            chunk_iter,
        )

async def _send_sync_roots_channel(
    self, pusher, crypticle, session_id, peer_id, channel, chunks
):
    """
    Stream one state-sync channel's chunks to a single peer for an
    operator-driven ``cluster.sync_roots`` session.
    """
    # Materialise the iterable up front; an empty tree still sends one
    # empty chunk so the receiver observes an eof for this channel.
    batches = list(chunks) or [[]]
    total = len(batches)
    sent_items = 0
    for seq, items in enumerate(batches):
        body = crypticle.dumps(
            {
                "session": session_id,
                "channel": channel,
                "seq": seq,
                "total": total,
                "eof": seq == total - 1,
                "items": items,
            }
        )
        chunk_event = salt.utils.event.SaltEvent.pack(
            salt.utils.event.tagify("state-sync-chunk", "peer", "cluster"),
            body,
        )
        try:
            await pusher.publish(chunk_event)
        except Exception:  # pylint: disable=broad-except
            # Abandon this channel on the first failed publish; the
            # summary log below is only emitted on full delivery.
            log.exception(
                "cluster.sync_roots: chunk %s/%s seq=%d to %s failed",
                session_id,
                channel,
                seq,
                peer_id,
            )
            return
        sent_items += len(items)
    log.info(
        "cluster.sync_roots: %s/%s sent %d chunks (%d items) to %s",
        session_id,
        channel,
        total,
        sent_items,
        peer_id,
    )

async def _send_state_sync_chunks(self, session_id, peer_id):
"""
Stream the four state-sync channels (keys, denied_keys,
Expand Down Expand Up @@ -1866,6 +2017,75 @@ def _on_complete():
session_id,
)

def _begin_root_sync_session(self, session_id, channels):
    """
    Pre-register a state-sync session for an operator-driven
    ``cluster.sync_roots`` push.

    Mirrors ``_begin_state_sync_session`` but the on-complete is a
    plain teardown (no Raft-learner bootstrap, no discover_event to
    set). Idempotent: a duplicate ``sync-roots-begin`` for an
    already-registered session is logged and ignored.

    :param session_id: Sender-allocated id that the subsequent
        ``state-sync-chunk`` events will carry.
    :param channels: Channels the sender announced; an empty list
        defaults to all channels (matches join-time semantics).
    """
    from salt.cluster.state_sync import (  # pylint: disable=import-outside-toplevel
        ALL_CHANNELS,
        DEFAULT_RECEIVE_TIMEOUT,
        StateSyncSession,
    )

    if not session_id:
        log.warning("sync-roots-begin without session_id; dropping")
        return
    if not hasattr(self, "_state_sync_sessions"):
        self._state_sync_sessions = {}
    if session_id in self._state_sync_sessions:
        log.warning(
            "Duplicate sync-roots-begin session id %s; ignoring", session_id
        )
        return

    # Defensive: empty channels list defaults to all four (matches
    # join-time semantics).
    if not channels:
        channels = list(ALL_CHANNELS)

    # Guard so on_complete runs at most once — eof-driven completion
    # can race the watchdog's force_complete.
    completed_holder = {"done": False}

    def _on_complete():
        if completed_holder["done"]:
            return
        completed_holder["done"] = True
        log.info(
            "cluster.sync_roots: session %s complete (channels=%s)",
            session_id,
            channels,
        )
        handle = session.watchdog_handle
        if handle is not None:
            try:
                handle.cancel()
            except Exception:  # pylint: disable=broad-except
                pass
        self._state_sync_sessions.pop(session_id, None)

    # The StateSyncSession state machine tracks per-channel eof. We
    # tell it about only the channels we expect — when each fires
    # eof, the session triggers on_complete.
    session = StateSyncSession(session_id, _on_complete, channels=channels)
    session.watchdog_handle = None
    self._state_sync_sessions[session_id] = session

    try:
        # ``get_running_loop`` rather than the deprecated
        # ``get_event_loop``: outside a running loop the latter may
        # silently create a brand-new loop (so the RuntimeError branch
        # below would never fire and the watchdog would sit on a loop
        # nobody runs). ``get_running_loop`` raises RuntimeError in
        # exactly the case this fallback was written for.
        loop = asyncio.get_running_loop()
        session.watchdog_handle = loop.call_later(
            DEFAULT_RECEIVE_TIMEOUT, session.force_complete
        )
    except RuntimeError:
        log.warning(
            "cluster.sync_roots: no event loop for watchdog on session %s",
            session_id,
        )

def _join_sentinel_path(self):
"""
Return the path to the per-master join sentinel file.
Expand Down Expand Up @@ -2240,6 +2460,27 @@ async def handle_pool_publish(self, payload):
return
self._apply_state_sync_chunk(chunk)
return
if tag.startswith("cluster/peer/sync-roots-begin"):
# Operator-driven push from a peer (via the
# ``cluster.sync_roots`` runner). Pre-register a state-
# sync session keyed by the announced session_id so the
# subsequent ``state-sync-chunk`` events have somewhere
# to land. ``on_complete`` here is a no-op (just removes
# the registry entry) because this is an ad-hoc content
# push, not a join-time Raft-learner bootstrap.
try:
crypticle = salt.crypt.Crypticle(
self.opts,
salt.master.SMaster.secrets["cluster_aes"]["secret"].value,
)
begin = crypticle.loads(data)
except Exception: # pylint: disable=broad-except
log.exception("Failed to decrypt sync-roots-begin")
return
self._begin_root_sync_session(
begin.get("session"), begin.get("channels") or []
)
return
if tag.startswith("cluster/peer/join-notify"):
# join-notify is encrypted with the shared cluster AES key.
try:
Expand Down Expand Up @@ -2810,7 +3051,14 @@ def extract_cluster_event(self, peer_id, data):

async def publish_payload(self, load, *args):
tag, data = salt.utils.event.SaltEvent.unpack(load)
# log.warning("Event %s %s %r", len(self.pushers), tag, data)
# Operator-triggered cluster operations originate as ``cluster/runner/*``
# events fired by the runner subprocess. Intercept them here so the
# event is consumed locally rather than broadcast as a regular
# cluster event.
if tag == "cluster/runner/sync_roots":
channels = data.get("channels") or ["file_roots", "pillar_roots"]
asyncio.create_task(self._run_root_sync_to_peers(channels))
return
tasks = []
if not tag.startswith("cluster/peer"):
tasks = [
Expand Down
73 changes: 73 additions & 0 deletions salt/runners/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,79 @@ def _read_health_sentinel():
return {}


def sync_roots(roots="both"):
    """
    Push this master's ``file_roots`` and/or ``pillar_roots`` to every
    other cluster master.

    Operator-driven counterpart of the bulk state-sync that fires
    automatically during a cluster join. Use it when the canonical
    content on this master has changed and every peer should pick up
    the new files without restarting or waiting for the next join
    handshake.

    The runner only fires a local event; the master daemon consumes it
    and fans out chunks to every peer over the encrypted cluster pub
    bus (the join-time state-sync transport). The call therefore
    returns immediately — the sync itself runs asynchronously in the
    master process. Check each peer's master log for the
    ``state-sync ... installed N items`` lines to confirm delivery.

    :param roots: ``"file"``, ``"pillar"``, or ``"both"`` (default
        ``"both"``). Selects which content trees to sync.

    CLI Example:

    .. code-block:: bash

        # Push both file_roots and pillar_roots to all peers
        salt-run cluster.sync_roots

        # Push only file_roots
        salt-run cluster.sync_roots roots=file

    .. versionadded:: 3009.0
    """
    # Map each accepted ``roots`` value straight to its channel list;
    # membership doubles as argument validation.
    channels_for = {
        "file": ["file_roots"],
        "pillar": ["pillar_roots"],
        "both": ["file_roots", "pillar_roots"],
    }
    if roots not in channels_for:
        raise ValueError(f"roots must be 'file', 'pillar', or 'both', got {roots!r}")
    # Lazy imports — keep the runner module light.
    import salt.utils.event  # pylint: disable=import-outside-toplevel

    cluster_id = __opts__.get("cluster_id")
    if not cluster_id:
        return {
            "status": "skipped",
            "reason": "no cluster_id configured; this master is not a cluster member",
        }

    channels = channels_for[roots]

    with salt.utils.event.get_event(
        "master",
        sock_dir=__opts__["sock_dir"],
        opts=__opts__,
        listen=False,
    ) as event:
        event.fire_event(
            {"channels": channels},
            "cluster/runner/sync_roots",
        )
    return {
        "status": "fan-out initiated",
        "channels": channels,
        "cluster_id": cluster_id,
        "note": (
            "The sync runs asynchronously inside the master daemon. Tail "
            "each peer's master log for the 'state-sync ... installed N items' "
            "lines to confirm delivery."
        ),
    }


def ring_info():
"""
Return a snapshot of this master's ring state.
Expand Down
Loading
Loading