Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/exo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Node:
node_id: NodeId
offline: bool
_api_port: int
_no_master: bool
_tg: TaskGroup = field(init=False, default_factory=TaskGroup)

@classmethod
Expand Down Expand Up @@ -112,22 +113,28 @@ async def create(cls, args: "Args") -> Self:
else:
worker = None

# We start every node with a master
master = Master(
node_id,
session_id,
event_sender=event_router.sender(),
global_event_sender=router.sender(topics.GLOBAL_EVENTS),
local_event_receiver=router.receiver(topics.LOCAL_EVENTS),
command_receiver=router.receiver(topics.COMMANDS),
download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
)
# We start every node with a master, unless --no-master is set
if args.no_master:
master = None
else:
master = Master(
node_id,
session_id,
event_sender=event_router.sender(),
global_event_sender=router.sender(topics.GLOBAL_EVENTS),
local_event_receiver=router.receiver(topics.LOCAL_EVENTS),
command_receiver=router.receiver(topics.COMMANDS),
download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
)

er_send, er_recv = channel[ElectionResult]()
election = Election(
node_id,
# If someone manages to assemble 1 MILLION devices into an exo cluster then. well done. good job champ.
seniority=1_000_000 if args.force_master else 0,
# --no-api nodes (coordinators) must not self-elect as master; any API-bearing node beats them.
# --no-master explicitly prevents self-election even during solo partitions.
is_candidate=args.spawn_api and not args.no_master,
# nb: this DOES feedback right now. i have thoughts on how to address this,
# but ultimately it seems not worth the complexity
election_message_sender=router.sender(topics.ELECTION_MESSAGES),
Expand All @@ -149,6 +156,7 @@ async def create(cls, args: "Args") -> Self:
node_id,
args.offline,
args.api_port,
args.no_master,
)

async def run(self):
Expand Down Expand Up @@ -209,6 +217,14 @@ async def _elect_loop(self):
"cannot be new master if we remain master"
)
logger.info("Node elected Master - maintaining self")
elif (
result.session_id.master_node_id == self.node_id
and self.master is None
and self._no_master
):
logger.warning(
"Node won election but --no-master is set. Refusing promotion."
)
elif (
result.session_id.master_node_id == self.node_id
and self.master is None
Expand Down Expand Up @@ -377,6 +393,7 @@ def main_inner(args: "Args"):
class Args(FrozenModel):
verbosity: int = 0
force_master: bool = False
no_master: bool = False
spawn_api: bool = False
api_port: PositiveInt = 52415
tb_only: bool = False
Expand Down Expand Up @@ -414,6 +431,12 @@ def parse(cls) -> Self:
action="store_true",
dest="force_master",
)
parser.add_argument(
"--no-master",
action="store_true",
dest="no_master",
help="Prevent this node from ever becoming master (worker-only mode)",
)
parser.add_argument(
"--no-api",
action="store_false",
Expand Down
14 changes: 14 additions & 0 deletions src/exo/shared/election.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
# If we aren't a candidate, simply don't increment seniority.
# For reference: This node can be elected master if all nodes are not master candidates
# Any master candidate will automatically win out over this node.
self.is_candidate = is_candidate
self.seniority = seniority if is_candidate else -1
self.clock = 0
self.node_id = node_id
Expand Down Expand Up @@ -253,6 +254,19 @@ async def _campaign(

def _election_status(self, clock: int | None = None) -> ElectionMessage:
c = self.clock if clock is None else clock
# Non-candidate nodes must never propose themselves as master.
# Re-propose the last known master instead. During a solo partition
# this prevents the node from winning by default.
if (
not self.is_candidate
and self.current_session.master_node_id != self.node_id
):
return ElectionMessage(
proposed_session=self.current_session,
clock=c,
seniority=self.seniority,
commands_seen=self.commands_seen,
)
return ElectionMessage(
proposed_session=(
self.current_session
Expand Down
25 changes: 25 additions & 0 deletions src/exo/shared/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
"""Pytest configuration and shared fixtures for shared package tests."""

import asyncio
import sys
import types
from typing import Generator
from unittest.mock import MagicMock

# Stub the exo_rs Rust extension so tests can run without a compiled binary.
# Only installed when the real extension is not already available.
if "exo_rs" not in sys.modules:
_stub = types.ModuleType("exo_rs")

class _FromSwarm:
class Connection:
peer_id: str = ""
connected: bool = False

_stub.FromSwarm = _FromSwarm # type: ignore[attr-defined]
_stub.AllQueuesFullError = type("AllQueuesFullError", (Exception,), {}) # type: ignore[attr-defined]
_stub.MessageTooLargeError = type("MessageTooLargeError", (Exception,), {}) # type: ignore[attr-defined]
_stub.NoPeersSubscribedToTopicError = type(
"NoPeersSubscribedToTopicError", (Exception,), {}
) # type: ignore[attr-defined]
_stub.Keypair = MagicMock # type: ignore[attr-defined]
_stub.NetworkingHandle = MagicMock # type: ignore[attr-defined]
_stub.Pidfile = MagicMock # type: ignore[attr-defined]
_stub.PidfileError = type("PidfileError", (Exception,), {}) # type: ignore[attr-defined]
sys.modules["exo_rs"] = _stub

import pytest
from _pytest.logging import LogCaptureFixture
Expand Down