diff --git a/src/exo/main.py b/src/exo/main.py index 3b27418365..2da181b3ee 100644 --- a/src/exo/main.py +++ b/src/exo/main.py @@ -46,6 +46,7 @@ class Node: node_id: NodeId offline: bool _api_port: int + _no_master: bool _tg: TaskGroup = field(init=False, default_factory=TaskGroup) @classmethod @@ -112,22 +113,28 @@ async def create(cls, args: "Args") -> Self: else: worker = None - # We start every node with a master - master = Master( - node_id, - session_id, - event_sender=event_router.sender(), - global_event_sender=router.sender(topics.GLOBAL_EVENTS), - local_event_receiver=router.receiver(topics.LOCAL_EVENTS), - command_receiver=router.receiver(topics.COMMANDS), - download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS), - ) + # We start every node with a master, unless --no-master is set + if args.no_master: + master = None + else: + master = Master( + node_id, + session_id, + event_sender=event_router.sender(), + global_event_sender=router.sender(topics.GLOBAL_EVENTS), + local_event_receiver=router.receiver(topics.LOCAL_EVENTS), + command_receiver=router.receiver(topics.COMMANDS), + download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS), + ) er_send, er_recv = channel[ElectionResult]() election = Election( node_id, # If someone manages to assemble 1 MILLION devices into an exo cluster then. well done. good job champ. seniority=1_000_000 if args.force_master else 0, + # --no-api nodes (coordinators) must not self-elect as master; any API-bearing node beats them. + # --no-master explicitly prevents self-election even during solo partitions. + is_candidate=args.spawn_api and not args.no_master, # nb: this DOES feedback right now. 
i have thoughts on how to address this, # but ultimately it seems not worth the complexity election_message_sender=router.sender(topics.ELECTION_MESSAGES), @@ -149,6 +156,7 @@ async def create(cls, args: "Args") -> Self: node_id, args.offline, args.api_port, + args.no_master, ) async def run(self): @@ -209,6 +217,14 @@ async def _elect_loop(self): "cannot be new master if we remain master" ) logger.info("Node elected Master - maintaining self") + elif ( + result.session_id.master_node_id == self.node_id + and self.master is None + and self._no_master + ): + logger.warning( + "Node won election but --no-master is set. Refusing promotion." + ) elif ( result.session_id.master_node_id == self.node_id and self.master is None @@ -377,6 +393,7 @@ def main_inner(args: "Args"): class Args(FrozenModel): verbosity: int = 0 force_master: bool = False + no_master: bool = False spawn_api: bool = False api_port: PositiveInt = 52415 tb_only: bool = False @@ -414,6 +431,12 @@ def parse(cls) -> Self: action="store_true", dest="force_master", ) + parser.add_argument( + "--no-master", + action="store_true", + dest="no_master", + help="Prevent this node from ever becoming master (worker-only mode)", + ) parser.add_argument( "--no-api", action="store_false", diff --git a/src/exo/shared/election.py b/src/exo/shared/election.py index 958a83d2fa..dcd930c7b2 100644 --- a/src/exo/shared/election.py +++ b/src/exo/shared/election.py @@ -61,6 +61,7 @@ def __init__( # If we aren't a candidate, simply don't increment seniority. # For reference: This node can be elected master if all nodes are not master candidates # Any master candidate will automatically win out over this node. 
+ self.is_candidate = is_candidate self.seniority = seniority if is_candidate else -1 self.clock = 0 self.node_id = node_id @@ -253,6 +254,19 @@ async def _campaign( def _election_status(self, clock: int | None = None) -> ElectionMessage: c = self.clock if clock is None else clock + # Non-candidate nodes must never propose themselves as master. + # Re-propose the last known master instead. During a solo partition + # this prevents the node from winning by default. + if ( + not self.is_candidate + and self.current_session.master_node_id != self.node_id + ): + return ElectionMessage( + proposed_session=self.current_session, + clock=c, + seniority=self.seniority, + commands_seen=self.commands_seen, + ) return ElectionMessage( proposed_session=( self.current_session diff --git a/src/exo/shared/tests/conftest.py b/src/exo/shared/tests/conftest.py index b0ffb08d82..f9ccc3c90e 100644 --- a/src/exo/shared/tests/conftest.py +++ b/src/exo/shared/tests/conftest.py @@ -1,7 +1,32 @@ """Pytest configuration and shared fixtures for shared package tests.""" import asyncio +import sys +import types from typing import Generator +from unittest.mock import MagicMock + +# Stub the exo_rs Rust extension so tests can run without a compiled binary. +# Only installed when the real extension is not already available. 
# Install a stand-in for the exo_rs Rust extension only when the real one
# cannot be found. A bare `"exo_rs" not in sys.modules` check is not enough:
# it is also true when the compiled extension is installed but simply has not
# been imported yet, in which case the stub would shadow the real module.
import importlib.util

# The sys.modules test goes first so find_spec() is never called for a module
# that is already registered (find_spec raises ValueError when such a module
# has `__spec__` set to None).
_exo_rs_missing = (
    "exo_rs" not in sys.modules and importlib.util.find_spec("exo_rs") is None
)

if _exo_rs_missing:
    _stub = types.ModuleType("exo_rs")

    class _FromSwarm:
        """Placeholder for `exo_rs.FromSwarm`, exposing a nested `Connection`."""

        class Connection:
            # Class-level defaults only; no behaviour is emulated.
            peer_id: str = ""
            connected: bool = False

    _stub.FromSwarm = _FromSwarm  # type: ignore[attr-defined]

    # Exception names are real Exception subclasses so `except` clauses
    # written against them stay valid.
    _stub.AllQueuesFullError = type("AllQueuesFullError", (Exception,), {})  # type: ignore[attr-defined]
    _stub.MessageTooLargeError = type("MessageTooLargeError", (Exception,), {})  # type: ignore[attr-defined]
    _stub.NoPeersSubscribedToTopicError = type(
        "NoPeersSubscribedToTopicError", (Exception,), {}
    )  # type: ignore[attr-defined]
    _stub.PidfileError = type("PidfileError", (Exception,), {})  # type: ignore[attr-defined]

    # The remaining names are bound to the MagicMock class itself, so calling
    # them yields a fresh mock each time.
    _stub.Keypair = MagicMock  # type: ignore[attr-defined]
    _stub.NetworkingHandle = MagicMock  # type: ignore[attr-defined]
    _stub.Pidfile = MagicMock  # type: ignore[attr-defined]

    sys.modules["exo_rs"] = _stub

# These imports intentionally follow the stub installation above.
import pytest
from _pytest.logging import LogCaptureFixture