From a972339748a55c2b17ddf9fe332ab73b2005052f Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 17:40:13 +0100 Subject: [PATCH 01/18] runner-unique paths --- src/exo/shared/constants.py | 2 - src/exo/worker/runner/supervisor.py | 87 ++++++++++++++--------------- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/src/exo/shared/constants.py b/src/exo/shared/constants.py index fd0869c465..ec89c1a509 100644 --- a/src/exo/shared/constants.py +++ b/src/exo/shared/constants.py @@ -69,8 +69,6 @@ def _parse_colon_dirs(env_var: str) -> tuple[Path, ...]: EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log" EXO_LOG = EXO_LOG_DIR / "exo.log" EXO_RUNNER_LOG_DIR = EXO_LOG_DIR / "runner_log" -EXO_RUNNER_STDOUT_LOG = EXO_RUNNER_LOG_DIR / "stdout.log" -EXO_RUNNER_STDERR_LOG = EXO_RUNNER_LOG_DIR / "stderr.log" EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log" EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid" diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index 9611262473..9d49efcb2b 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -2,8 +2,8 @@ import contextlib import signal from dataclasses import dataclass, field -from os import PathLike -from typing import Callable, Self +from pathlib import Path +from typing import Self import anyio from anyio import ( @@ -12,9 +12,12 @@ CancelScope, ClosedResourceError, ) +from anyio.streams.text import TextReceiveStream from loguru import logger -from exo.shared.constants import EXO_RUNNER_STDERR_LOG, EXO_RUNNER_STDOUT_LOG +from exo.shared.constants import ( + EXO_RUNNER_LOG_DIR, +) from exo.shared.types.chunks import ErrorChunk from exo.shared.types.events import ( ChunkGenerated, @@ -60,9 +63,9 @@ @dataclass(eq=False) class RunnerStdioHandler: + _bound_instance: BoundInstance _stdout_rx: Receiver[bytes] _stderr_rx: Receiver[bytes] - _stdout_log: AsyncFile[str] _stderr_log: AsyncFile[str] diagnostics: RunnerDiagnosticCollector = field( default_factory=RunnerDiagnosticCollector @@ -74,25 +77,24 @@ class RunnerStdioHandler: async def create( cls, *, + bound_instance: BoundInstance, stdout_rx: Receiver[bytes], stderr_rx: Receiver[bytes], - stdout_log_path: PathLike[str] = EXO_RUNNER_STDOUT_LOG, - stderr_log_path: PathLike[str] = EXO_RUNNER_STDERR_LOG, + runner_log_dir: Path = EXO_RUNNER_LOG_DIR, ) -> Self: - # these are append only logs used to gather data for log template mining - # - # TODO: in the future use [Drain3](https://github.com/logpai/Drain3) - # to mine these logs - ensure_parent_directory_exists(stdout_log_path) + # create file in log_dir//.stderr.log + stderr_log_path = ( + runner_log_dir + / bound_instance.instance.instance_id + / f"{bound_instance.bound_runner_id}.stderr.log" + ) ensure_parent_directory_exists(stderr_log_path) - stdout_log = await anyio.open_file(stdout_log_path, "a") - stderr_log = await anyio.open_file(stderr_log_path, "a") + stderr_log = await anyio.open_file(stderr_log_path, "w+") - # instantiate and return self = cls( + _bound_instance=bound_instance, _stdout_rx=stdout_rx, _stderr_rx=stderr_rx, - _stdout_log=stdout_log, _stderr_log=stderr_log, ) return self @@ -100,32 +102,25 @@ async def create( async def run(self): try: async with self._tg as tg: - tg.start_soon( # pyright: ignore[reportUnknownArgumentType] - self._handle_runner_output, - self._stdout_rx, - self._stdout_log, - lambda line: logger.info(f"Runner stdout: {line}"), # pyright: ignore[reportUnknownLambdaType] - lambda _: None, # pyright: ignore[reportUnknownLambdaType] - ) - tg.start_soon( # pyright: ignore[reportUnknownArgumentType] - self._handle_runner_output, - self._stderr_rx, - self._stderr_log, - lambda line: logger.warning(f"Runner stderr: {line}"), # pyright: ignore[reportUnknownLambdaType] - self.diagnostics.record_line, - ) + tg.start_soon(self._handle_stdout) + tg.start_soon(self._handle_stderr) finally: with CancelScope(shield=True): - await self._stdout_log.aclose() await self._stderr_log.aclose() - async def _handle_runner_output( - self, - rx: Receiver[bytes], - logfile: AsyncFile[str], - log_line: Callable[[str], None], - record_diagnostic_line: Callable[[str], None], - ): + async def _handle_stdout(self): + # We don't expect anything in stdout so even reading this at all is going + # to be quite weird; hence handle it by logging error and the received chunk + + rx = TextReceiveStream(self._stdout_rx, encoding="utf-8", errors="replace") + try: + async with rx: + async for chunk in rx: + logger.error(f"Unexpected runner stdout chunk: {chunk}") + except (ClosedResourceError, BrokenResourceError): + logger.warning("Runner stdio stream closed before clean EOF") + + async def _handle_stderr(self): # The diagnostic collector is deliberately line-level for now. It records # bounded stderr context and known failure anchors; the supervisor # correlates those hints with the runner exit status before surfacing an @@ -142,8 +137,8 @@ async def handle_line(line: str): return # Send to logger & error recovery task - log_line(line) - record_diagnostic_line(line) + logger.warning(f"Runner stderr: {line}") + self.diagnostics.record_line(line) async def handle_text(text: str): nonlocal pending_line @@ -151,8 +146,8 @@ async def handle_text(text: str): if not text: return - await logfile.write(text) - await logfile.flush() + await self._stderr_log.write(text) + await self._stderr_log.flush() # newline buffering pending_line += text @@ -163,15 +158,15 @@ async def handle_text(text: str): await handle_line(line) try: - with rx: - async for chunk in rx: + with self._stderr_rx: + async for chunk in self._stderr_rx: await handle_text(decoder.decode(chunk, final=False)) except (ClosedResourceError, BrokenResourceError): logger.warning("Runner stdio stream closed before clean EOF") finally: with CancelScope(shield=True): await handle_text(decoder.decode(b"", final=True)) - await logfile.flush() + await self._stderr_log.flush() if pending_line: await handle_line(pending_line) @@ -223,7 +218,9 @@ async def create( daemon=True, ) runner_stdio_handler = await RunnerStdioHandler.create( - stdout_rx=runner_process.stdout, stderr_rx=runner_process.stderr + bound_instance=bound_instance, + stdout_rx=runner_process.stdout, + stderr_rx=runner_process.stderr, ) shard_metadata = bound_instance.bound_shard From b67e9892844a567df789885ceab52790a9321443 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 19:03:02 +0100 Subject: [PATCH 02/18] telemetry skeleton --- src/exo/shared/telemetry.py | 108 ++++++++++++++++++ src/exo/worker/runner/supervisor.py | 3 + .../test_runner/test_runner_supervisor.py | 10 +- 3 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 src/exo/shared/telemetry.py diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py new file mode 100644 index 0000000000..41910a0ac9 --- /dev/null +++ b/src/exo/shared/telemetry.py @@ -0,0 +1,108 @@ +from dataclasses import dataclass, field +from pathlib import Path +from typing import Self + +from anyio import BrokenResourceError, ClosedResourceError, WouldBlock +from loguru import logger + +from exo.utils.channels import Receiver, Sender, channel +from exo.utils.pydantic_ext import TaggedModel +from exo.utils.task_group import TaskGroup + +CHANNEL_BOUND_SIZE = 64 + + +class BaseTelemetrySubmission(TaggedModel): + pass + + +class TestSubmission(BaseTelemetrySubmission): + pass + + +class RunnerStderrSubmission(BaseTelemetrySubmission): + path: Path + + +TelemetrySubmission = TestSubmission | RunnerStderrSubmission + + +@dataclass(eq=False) +class TelemetrySink: + """ + A non-blocking non-throwing bounded wrapper around sender/receiver channels + to ensure telemetry never blocks or has adverse side-effects, since telemetry + is an optional diagnostic feature and hence should never break the main app. + """ + + _send: Sender[TelemetrySubmission] + + @classmethod + def pair(cls) -> tuple[Self, Receiver[TelemetrySubmission]]: + send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) + return cls(_send=send), recv + + def submit(self, submission: TelemetrySubmission): + try: + self._send.send_nowait(submission) + except WouldBlock: + logger.debug("Telemetry submission would block. why so many submissions??") + except (BrokenResourceError, ClosedResourceError): + logger.debug("Telemetry submission receivers are broken or closed. why??") + + +@dataclass(eq=False) +class TelemetryService: + _send: Sender[TelemetrySubmission] + _recv: Receiver[TelemetrySubmission] + _tg: TaskGroup = field(default_factory=TaskGroup, init=False) + + @classmethod + def create(cls) -> Self: + send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) + + return cls( + _send=send, + _recv=recv, + ) + + async def run(self): + try: + async with self._tg as tg: + tg.start_soon(self._process) + finally: + self._send.close() + self._recv.close() + + async def _process(self): + with self._recv as submissions: + async for submission in submissions: + try: + await self._process_submission(submission) + except Exception as e: + logger.opt(exception=e).warning( + "Exception when processing telemetry submission" + ) + + async def _process_submission(self, submission: TelemetrySubmission): + match submission: + case TestSubmission(): + pass + case RunnerStderrSubmission(): + pass + + def sink(self) -> TelemetrySink: + sink, recv = TelemetrySink.pair() + if self._tg.is_running(): + self._tg.start_soon(self._ingest, recv) + else: + self._tg.queue(self._ingest, recv) + return sink + + async def _ingest(self, recv: Receiver[TelemetrySubmission]): + try: + with recv as submissions: + async for submission in submissions: + await self._send.send(submission) + except ClosedResourceError: + pass diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index 9d49efcb2b..fdb682fd17 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -89,6 +89,9 @@ async def create( / f"{bound_instance.bound_runner_id}.stderr.log" ) ensure_parent_directory_exists(stderr_log_path) + # TODO: rotate an existing stderr log before truncating it. The same + # instance/runner id can be reused if a supervisor is recreated for an + # existing assignment. stderr_log = await anyio.open_file(stderr_log_path, "w+") self = cls( diff --git a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py index 87cb9c7441..c770150063 100644 --- a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py +++ b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import cast import anyio @@ -36,7 +37,9 @@ def is_alive(self) -> bool: @pytest.mark.anyio -async def test_check_runner_emits_error_chunk_for_inflight_text_generation() -> None: +async def test_check_runner_emits_error_chunk_for_inflight_text_generation( + tmp_path: Path, +) -> None: event_sender, event_receiver = channel[Event]() task_sender, _ = mp_channel[Task]() cancel_sender, _ = mp_channel[TaskId]() @@ -51,7 +54,10 @@ async def test_check_runner_emits_error_chunk_for_inflight_text_generation() -> proc = cast(AsyncProcess, cast(object, _DeadProcess())) handler = await RunnerStdioHandler.create( - stdout_rx=proc.stdout, stderr_rx=proc.stderr + bound_instance=bound_instance, + stdout_rx=proc.stdout, + stderr_rx=proc.stderr, + runner_log_dir=tmp_path, ) supervisor = RunnerSupervisor( shard_metadata=bound_instance.bound_shard, From f387f54cc9155d8ae1d1ab5d3637b7c398635689 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 19:42:31 +0100 Subject: [PATCH 03/18] supervisor wiring --- src/exo/shared/telemetry.py | 21 ++++++++++++++------- src/exo/worker/runner/supervisor.py | 27 +++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index 41910a0ac9..c6ad7a0eab 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -53,19 +53,25 @@ def submit(self, submission: TelemetrySubmission): @dataclass(eq=False) class TelemetryService: + dry_run: bool _send: Sender[TelemetrySubmission] _recv: Receiver[TelemetrySubmission] _tg: TaskGroup = field(default_factory=TaskGroup, init=False) @classmethod - def create(cls) -> Self: + def create(cls, dry_run: bool) -> Self: send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) return cls( + dry_run=dry_run, _send=send, _recv=recv, ) + @classmethod + def dummy(cls) -> Self: + return cls.create(True) + async def run(self): try: async with self._tg as tg: @@ -77,12 +83,13 @@ async def run(self): async def _process(self): with self._recv as submissions: async for submission in submissions: - try: - await self._process_submission(submission) - except Exception as e: - logger.opt(exception=e).warning( - "Exception when processing telemetry submission" - ) + if not self.dry_run: + try: + await self._process_submission(submission) + except Exception as e: + logger.opt(exception=e).warning( + "Exception when processing telemetry submission" + ) async def _process_submission(self, submission: TelemetrySubmission): match submission: diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index fdb682fd17..e4ddcf92a7 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -2,6 +2,7 @@ import contextlib import signal from dataclasses import dataclass, field +from datetime import datetime, timezone from pathlib import Path from typing import Self @@ -18,6 +19,7 @@ from exo.shared.constants import ( EXO_RUNNER_LOG_DIR, ) +from exo.shared.telemetry import RunnerStderrSubmission, TelemetrySink from exo.shared.types.chunks import ErrorChunk from exo.shared.types.events import ( ChunkGenerated, @@ -66,7 +68,9 @@ class RunnerStdioHandler: _bound_instance: BoundInstance _stdout_rx: Receiver[bytes] _stderr_rx: Receiver[bytes] + _stderr_log_path: Path _stderr_log: AsyncFile[str] + _telemetry: TelemetrySink diagnostics: RunnerDiagnosticCollector = field( default_factory=RunnerDiagnosticCollector ) @@ -80,13 +84,16 @@ async def create( bound_instance: BoundInstance, stdout_rx: Receiver[bytes], stderr_rx: Receiver[bytes], + telemetry_sink: TelemetrySink, runner_log_dir: Path = EXO_RUNNER_LOG_DIR, ) -> Self: - # create file in log_dir//.stderr.log + # create file in ///.stderr.log + now = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S_%fZ") stderr_log_path = ( runner_log_dir / bound_instance.instance.instance_id - / f"{bound_instance.bound_runner_id}.stderr.log" + / bound_instance.bound_runner_id + / f"{now}.stderr.log" ) ensure_parent_directory_exists(stderr_log_path) # TODO: rotate an existing stderr log before truncating it. The same @@ -98,7 +105,9 @@ async def create( _bound_instance=bound_instance, _stdout_rx=stdout_rx, _stderr_rx=stderr_rx, + _stderr_log_path=stderr_log_path, _stderr_log=stderr_log, + _telemetry=telemetry_sink, ) return self @@ -111,6 +120,18 @@ async def run(self): with CancelScope(shield=True): await self._stderr_log.aclose() + # send off telemetry submission when runner stdio dies; + # it may have been for entirely innocuous reasons or + # the log may have nothing in it, but its submitted regardless + self._telemetry.submit( + RunnerStderrSubmission( + path=self._stderr_log_path, + ) + ) + + def shutdown(self): + self._tg.cancel_tasks() + async def _handle_stdout(self): # We don't expect anything in stdout so even reading this at all is going # to be quite weird; hence handle it by logging error and the received chunk @@ -203,6 +224,7 @@ async def create( *, bound_instance: BoundInstance, event_sender: Sender[Event], + telemetry_sink: TelemetrySink, initialize_timeout: float = 400, ) -> Self: ev_send, ev_recv = mp_channel[Event | RunnerTerminationError]() @@ -224,6 +246,7 @@ async def create( bound_instance=bound_instance, stdout_rx=runner_process.stdout, stderr_rx=runner_process.stderr, + telemetry_sink=telemetry_sink, ) shard_metadata = bound_instance.bound_shard From 354aa1be282e2d690b2ae4a23d29c265db6bae94 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 19:43:24 +0100 Subject: [PATCH 04/18] no more todo --- src/exo/worker/runner/supervisor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index e4ddcf92a7..594a55785f 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -96,10 +96,7 @@ async def create( / f"{now}.stderr.log" ) ensure_parent_directory_exists(stderr_log_path) - # TODO: rotate an existing stderr log before truncating it. The same - # instance/runner id can be reused if a supervisor is recreated for an - # existing assignment. - stderr_log = await anyio.open_file(stderr_log_path, "w+") + stderr_log = await anyio.open_file(stderr_log_path, "w") self = cls( _bound_instance=bound_instance, From 536f69b32d91ff6a9175114fc7bd9207e5ef040e Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 19:47:31 +0100 Subject: [PATCH 05/18] clone --- src/exo/shared/telemetry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index c6ad7a0eab..d2f46ff13e 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -50,6 +50,9 @@ def submit(self, submission: TelemetrySubmission): except (BrokenResourceError, ClosedResourceError): logger.debug("Telemetry submission receivers are broken or closed. why??") + def clone(self) -> "TelemetrySink": + return TelemetrySink(_send=self._send.clone()) + @dataclass(eq=False) class TelemetryService: From 6969c21b907ba6796a0376c61f99fd0fe84680b4 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Thu, 28 May 2026 19:53:14 +0100 Subject: [PATCH 06/18] wired up telemetry --- src/exo/main.py | 13 +++++++++++++ src/exo/shared/telemetry.py | 5 +++++ src/exo/worker/main.py | 5 +++++ src/exo/worker/runner/supervisor.py | 1 + .../unittests/test_runner/test_runner_supervisor.py | 3 +++ 5 files changed, 27 insertions(+) diff --git a/src/exo/main.py b/src/exo/main.py index 86a46561be..e95d3f989e 100644 --- a/src/exo/main.py +++ b/src/exo/main.py @@ -24,6 +24,7 @@ from exo.shared.constants import EXO_DEFAULT_MODELS_DIR, EXO_LOG, EXO_PID_FILE from exo.shared.election import Election, ElectionResult from exo.shared.logging import logger_cleanup, logger_setup +from exo.shared.telemetry import TelemetryService from exo.shared.types.common import NodeId, SessionId from exo.utils import STDIO_FDS from exo.utils.channels import Receiver, channel @@ -42,6 +43,7 @@ class Node: election_result_receiver: Receiver[ElectionResult] master: Master | None api: API | None + telemetry: TelemetryService node_id: NodeId offline: bool @@ -70,6 +72,7 @@ async def create(cls, args: "Args") -> Self: external_outbound=router.sender(topics.LOCAL_EVENTS), external_inbound=router.receiver(topics.GLOBAL_EVENTS), ) + telemetry = TelemetryService.create(dry_run=not args.telemetry) logger.info(f"Starting node {node_id}") @@ -108,6 +111,7 @@ async def create(cls, args: "Args") -> Self: command_sender=router.sender(topics.COMMANDS), download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS), api_port=args.api_port, + telemetry_sink=telemetry.sink(), ) else: worker = None @@ -146,6 +150,7 @@ async def create(cls, args: "Args") -> Self: er_recv, master, api, + telemetry, node_id, args.offline, args.api_port, @@ -157,6 +162,7 @@ async def run(self): signal.signal(signal.SIGTERM, lambda _, __: self.shutdown()) tg.start_soon(self.router.run) tg.start_soon(self.event_router.run) + tg.start_soon(self.telemetry.run) tg.start_soon(self.election.run) if self.download_coordinator: tg.start_soon(self.download_coordinator.run) @@ -264,6 +270,7 @@ async def _elect_loop(self): topics.DOWNLOAD_COMMANDS ), api_port=self._api_port, + telemetry_sink=self.telemetry.sink(), ) self._tg.start_soon(self.worker.run) if self.api: @@ -384,6 +391,7 @@ class Args(FrozenModel): no_downloads: bool = False offline: bool = os.getenv("EXO_OFFLINE", "false").lower() == "true" no_batch: bool = False + telemetry: bool = False fast_synch: bool | None = None # None = auto, True = force on, False = force off legacy_daemon: bool = False bootstrap_peers: list[str] = [] @@ -445,6 +453,11 @@ def parse(cls) -> Self: action="store_true", help="Disable continuous batching, use sequential generation", ) + parser.add_argument( + "--telemetry", + action="store_true", + help="Enable telemetry uploads. Disabled by default; disabled mode keeps telemetry in dry-run.", + ) parser.add_argument( "--legacy-daemon", action="store_true", diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index d2f46ff13e..d66185e115 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -1,3 +1,4 @@ +import contextlib from dataclasses import dataclass, field from pathlib import Path from typing import Self @@ -53,6 +54,10 @@ def submit(self, submission: TelemetrySubmission): def clone(self) -> "TelemetrySink": return TelemetrySink(_send=self._send.clone()) + def close(self): + with contextlib.suppress(BrokenResourceError, ClosedResourceError): + self._send.close() + @dataclass(eq=False) class TelemetryService: diff --git a/src/exo/worker/main.py b/src/exo/worker/main.py index a641bacfb6..3e40d58f1e 100644 --- a/src/exo/worker/main.py +++ b/src/exo/worker/main.py @@ -15,6 +15,7 @@ from exo.shared.apply import apply from exo.shared.constants import EXO_MAX_INSTANCE_RETRIES from exo.shared.models.model_cards import ModelId, card_cache +from exo.shared.telemetry import TelemetrySink from exo.shared.types.chunks import InputImageChunk from exo.shared.types.commands import ( DeleteInstance, @@ -74,6 +75,7 @@ def __init__( command_sender: Sender[ForwarderCommand], download_command_sender: Sender[ForwarderDownloadCommand], api_port: int, + telemetry_sink: TelemetrySink, ): self.node_id: NodeId = node_id self.event_receiver = event_receiver @@ -81,6 +83,7 @@ def __init__( self.command_sender = command_sender self.download_command_sender = download_command_sender self.api_port = api_port + self.telemetry_sink = telemetry_sink self.state: State = State() self.runners: dict[RunnerId, RunnerSupervisor] = {} @@ -122,6 +125,7 @@ async def run(self): self.event_sender.close() self.command_sender.close() self.download_command_sender.close() + self.telemetry_sink.close() for runner in self.runners.values(): runner.shutdown() self._stopped.set() @@ -381,6 +385,7 @@ async def _create_supervisor(self, task: CreateRunner) -> RunnerSupervisor: runner = await RunnerSupervisor.create( bound_instance=task.bound_instance, event_sender=self.event_sender.clone(), + telemetry_sink=self.telemetry_sink.clone(), ) self.runners[task.bound_instance.bound_runner_id] = runner self._tg.start_soon(runner.run) diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index 594a55785f..d54726fa36 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -125,6 +125,7 @@ async def run(self): path=self._stderr_log_path, ) ) + self._telemetry.close() def shutdown(self): self._tg.cancel_tasks() diff --git a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py index c770150063..aeb0d52d8f 100644 --- a/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py +++ b/src/exo/worker/tests/unittests/test_runner/test_runner_supervisor.py @@ -5,6 +5,7 @@ import pytest from exo.shared.models.model_cards import ModelId +from exo.shared.telemetry import TelemetryService from exo.shared.types.chunks import ErrorChunk from exo.shared.types.common import CommandId, NodeId from exo.shared.types.events import ChunkGenerated, Event, RunnerStatusUpdated @@ -53,10 +54,12 @@ async def test_check_runner_emits_error_chunk_for_inflight_text_generation( ) proc = cast(AsyncProcess, cast(object, _DeadProcess())) + telemetry = TelemetryService.dummy() handler = await RunnerStdioHandler.create( bound_instance=bound_instance, stdout_rx=proc.stdout, stderr_rx=proc.stderr, + telemetry_sink=telemetry.sink(), runner_log_dir=tmp_path, ) supervisor = RunnerSupervisor( From a806371f7f5202ff0a849b7bde8d48a7ffad7998 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 11:12:50 +0100 Subject: [PATCH 07/18] plumbing --- src/exo/shared/constants.py | 7 +++++++ src/exo/shared/telemetry.py | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/exo/shared/constants.py b/src/exo/shared/constants.py index ec89c1a509..ea98a3955e 100644 --- a/src/exo/shared/constants.py +++ b/src/exo/shared/constants.py @@ -1,6 +1,7 @@ import os import sys from pathlib import Path +from urllib.parse import urlparse from exo.utils.dashboard_path import find_dashboard, find_resources @@ -26,6 +27,12 @@ def _get_xdg_dir(env_var: str, fallback: str) -> Path: EXO_DATA_HOME = _get_xdg_dir("XDG_DATA_HOME", ".local/share") EXO_CACHE_HOME = _get_xdg_dir("XDG_CACHE_HOME", ".cache") +# Exo website API endpoints +_EXO_TELEMETRY_API_URL_ENV = os.environ.get( + "EXO_TELEMETRY_API_URL", "https://telemetry.exolabs.net/" +) +EXO_TELEMETRY_API_URL = urlparse(_EXO_TELEMETRY_API_URL_ENV).geturl().rstrip("/") + # Default models directory (always included as first entry in writable dirs) _EXO_DEFAULT_MODELS_DIR_ENV = os.environ.get("EXO_DEFAULT_MODELS_DIR", None) EXO_DEFAULT_MODELS_DIR = ( diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index d66185e115..f59430474b 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -6,6 +6,7 @@ from anyio import BrokenResourceError, ClosedResourceError, WouldBlock from loguru import logger +from exo.shared.constants import EXO_TELEMETRY_API_URL from exo.utils.channels import Receiver, Sender, channel from exo.utils.pydantic_ext import TaggedModel from exo.utils.task_group import TaskGroup @@ -62,16 +63,18 @@ def close(self): @dataclass(eq=False) class TelemetryService: dry_run: bool + api_url: str _send: Sender[TelemetrySubmission] _recv: Receiver[TelemetrySubmission] _tg: TaskGroup = field(default_factory=TaskGroup, init=False) @classmethod - def create(cls, dry_run: bool) -> Self: + def create(cls, dry_run: bool, api_url: str = EXO_TELEMETRY_API_URL) -> Self: send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) return cls( dry_run=dry_run, + api_url=api_url, _send=send, _recv=recv, ) From 73dfde5df7fc910eed109eb9ed833ee08cf1843c Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 11:36:04 +0100 Subject: [PATCH 08/18] send runner log --- src/exo/shared/constants.py | 4 +- src/exo/shared/telemetry.py | 53 +++++++++++++-- src/exo/shared/tests/test_telemetry.py | 90 ++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 src/exo/shared/tests/test_telemetry.py diff --git a/src/exo/shared/constants.py b/src/exo/shared/constants.py index ea98a3955e..c7e0a6a53a 100644 --- a/src/exo/shared/constants.py +++ b/src/exo/shared/constants.py @@ -1,7 +1,6 @@ import os import sys from pathlib import Path -from urllib.parse import urlparse from exo.utils.dashboard_path import find_dashboard, find_resources @@ -28,10 +27,9 @@ def _get_xdg_dir(env_var: str, fallback: str) -> Path: EXO_CACHE_HOME = _get_xdg_dir("XDG_CACHE_HOME", ".cache") # Exo website API endpoints -_EXO_TELEMETRY_API_URL_ENV = os.environ.get( +EXO_TELEMETRY_API_URL = os.environ.get( "EXO_TELEMETRY_API_URL", "https://telemetry.exolabs.net/" ) -EXO_TELEMETRY_API_URL = urlparse(_EXO_TELEMETRY_API_URL_ENV).geturl().rstrip("/") # Default models directory (always included as first entry in writable dirs) _EXO_DEFAULT_MODELS_DIR_ENV = os.environ.get("EXO_DEFAULT_MODELS_DIR", None) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index f59430474b..64266f18f1 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -1,17 +1,21 @@ import contextlib +import hashlib from dataclasses import dataclass, field from pathlib import Path from typing import Self +from urllib.parse import urlparse -from anyio import BrokenResourceError, ClosedResourceError, WouldBlock +import httpx +from anyio import BrokenResourceError, ClosedResourceError, WouldBlock, to_thread from loguru import logger from exo.shared.constants import EXO_TELEMETRY_API_URL from exo.utils.channels import Receiver, Sender, channel -from exo.utils.pydantic_ext import TaggedModel +from exo.utils.pydantic_ext import FrozenModel, TaggedModel from exo.utils.task_group import TaskGroup CHANNEL_BOUND_SIZE = 64 +TELEMETRY_HTTP_TIMEOUT_SECONDS = 10.0 class BaseTelemetrySubmission(TaggedModel): @@ -29,6 +33,10 @@ class RunnerStderrSubmission(BaseTelemetrySubmission): TelemetrySubmission = TestSubmission | RunnerStderrSubmission +class TelemetryPresignResponse(FrozenModel): + upload_url: str + + @dataclass(eq=False) class TelemetrySink: """ @@ -66,10 +74,18 @@ class TelemetryService: api_url: str _send: Sender[TelemetrySubmission] _recv: Receiver[TelemetrySubmission] + _http_transport: httpx.AsyncBaseTransport | None _tg: TaskGroup = field(default_factory=TaskGroup, init=False) @classmethod - def create(cls, dry_run: bool, api_url: str = EXO_TELEMETRY_API_URL) -> Self: + def create( + cls, + dry_run: bool, + api_url: str = EXO_TELEMETRY_API_URL, + http_transport: httpx.AsyncBaseTransport | None = None, + ) -> Self: + api_url = urlparse(api_url).geturl().rstrip("/") + send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) return cls( @@ -77,6 +93,7 @@ def create(cls, dry_run: bool, api_url: str = EXO_TELEMETRY_API_URL) -> Self: api_url=api_url, _send=send, _recv=recv, + _http_transport=http_transport, ) @classmethod @@ -106,8 +123,34 @@ async def _process_submission(self, submission: TelemetrySubmission): match submission: case TestSubmission(): pass - case RunnerStderrSubmission(): - pass + case RunnerStderrSubmission(path=path): + await self._submit_runner_stderr(path) + + async def _submit_runner_stderr(self, path: Path): + data = await to_thread.run_sync(path.read_bytes) + sha256 = hashlib.sha256(data).hexdigest() + + async with httpx.AsyncClient( + timeout=TELEMETRY_HTTP_TIMEOUT_SECONDS, + transport=self._http_transport, + ) as client: + presign_response = await client.post( + f"{self.api_url}/telemetry/runner-log/presign", + json={ + "sha256": sha256, + "size": len(data), + }, + ) + presign_response.raise_for_status() + presign = TelemetryPresignResponse.model_validate_json( + presign_response.text, + ) + + upload_response = await client.put( + presign.upload_url, + content=data, + ) + upload_response.raise_for_status() def sink(self) -> TelemetrySink: sink, recv = TelemetrySink.pair() diff --git a/src/exo/shared/tests/test_telemetry.py b/src/exo/shared/tests/test_telemetry.py new file mode 100644 index 0000000000..e9ec8d723c --- /dev/null +++ b/src/exo/shared/tests/test_telemetry.py @@ -0,0 +1,90 @@ +import hashlib +import json +from dataclasses import dataclass +from pathlib import Path + +import httpx +import pytest + +from exo.shared.telemetry import RunnerStderrSubmission, TelemetryService + + +@dataclass(frozen=True) +class RecordedRequest: + method: str + url: str + content: bytes + + +def _queue_submission( + service: TelemetryService, + submission: RunnerStderrSubmission, +) -> None: + service._send.send_nowait(submission) # pyright: ignore[reportPrivateUsage] + service._send.close() # pyright: ignore[reportPrivateUsage] + + +@pytest.mark.anyio +async def test_runner_stderr_upload_hashes_and_uploads_file_bytes(tmp_path: Path): + log_bytes = b"runner stderr\nsecond line\n" + log_path = tmp_path / "runner.stderr.log" + log_path.write_bytes(log_bytes) + requests: list[RecordedRequest] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + requests.append( + RecordedRequest( + method=request.method, + url=str(request.url), + content=await request.aread(), + ) + ) + if request.method == "POST": + return httpx.Response( + 200, + json={"uploadUrl": "https://uploads.example/runner.stderr.log"}, + ) + if request.method == "PUT": + return httpx.Response(200) + return httpx.Response(404) + + service = TelemetryService.create( + dry_run=False, + api_url="https://telemetry.example/", + http_transport=httpx.MockTransport(handler), + ) + + await service._process_submission( # pyright: ignore[reportPrivateUsage] + RunnerStderrSubmission(path=log_path) + ) + + assert [r.method for r in requests] == ["POST", "PUT"] + assert requests[0].url == "https://telemetry.example/telemetry/runner-log/presign" + assert json.loads(requests[0].content) == { + "sha256": hashlib.sha256(log_bytes).hexdigest(), + "size": len(log_bytes), + } + assert requests[1].url == "https://uploads.example/runner.stderr.log" + assert requests[1].content == log_bytes + + +@pytest.mark.anyio +async def test_runner_stderr_upload_failure_is_swallowed(tmp_path: Path): + log_path = tmp_path / "runner.stderr.log" + log_path.write_text("runner stderr\n") + requests: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + requests.append(request) + return httpx.Response(500) + + service = TelemetryService.create( + dry_run=False, + api_url="https://telemetry.example", + http_transport=httpx.MockTransport(handler), + ) + _queue_submission(service, RunnerStderrSubmission(path=log_path)) + + await service._process() # pyright: ignore[reportPrivateUsage] + + assert len(requests) == 1 From ec2509224f13e28589ca4450e508e2babeee6d6a Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 11:53:27 +0100 Subject: [PATCH 09/18] end-to-end runs --- src/exo/shared/telemetry.py | 3 +++ src/exo/shared/tests/test_telemetry.py | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index 64266f18f1..93236d81de 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -34,7 +34,10 @@ class RunnerStderrSubmission(BaseTelemetrySubmission): class TelemetryPresignResponse(FrozenModel): + key: str upload_url: str + expires_in: int + max_size: int @dataclass(eq=False) diff --git a/src/exo/shared/tests/test_telemetry.py b/src/exo/shared/tests/test_telemetry.py index e9ec8d723c..d3efc16e8f 100644 --- a/src/exo/shared/tests/test_telemetry.py +++ b/src/exo/shared/tests/test_telemetry.py @@ -42,7 +42,12 @@ async def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST": return httpx.Response( 200, - json={"uploadUrl": "https://uploads.example/runner.stderr.log"}, + json={ + "key": "runner_log/test.stderr.log", + "uploadUrl": "https://uploads.example/runner.stderr.log", + "expiresIn": 300, + "maxSize": 52428800, + }, ) if request.method == "PUT": return httpx.Response(200) From 56c25367a34eb05d694b11616f6fd9a29e358956 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 13:03:31 +0100 Subject: [PATCH 10/18] skip zero --- src/exo/shared/telemetry.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index 93236d81de..176b68ecdb 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -131,6 +131,10 @@ async def _process_submission(self, submission: TelemetrySubmission): async def _submit_runner_stderr(self, path: Path): data = await to_thread.run_sync(path.read_bytes) + if not data: + logger.debug(f"Skipping empty runner stderr telemetry file: {path}") + return + sha256 = hashlib.sha256(data).hexdigest() async with httpx.AsyncClient( From 97b6953884ce4710a4d87601e897f2629f8abf09 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 13:44:45 +0100 Subject: [PATCH 11/18] test --- src/exo/worker/runner/bootstrap.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/exo/worker/runner/bootstrap.py b/src/exo/worker/runner/bootstrap.py index f981667c0a..f6d043b095 100644 --- a/src/exo/worker/runner/bootstrap.py +++ b/src/exo/worker/runner/bootstrap.py @@ -1,5 +1,6 @@ import os import resource +import sys import traceback from dataclasses import dataclass from typing import Self, cast @@ -47,6 +48,8 @@ def entrypoint( global logger logger = _logger + print("this is a test log in runner stderr", file=sys.stderr) + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (min(max(soft, 2048), hard), hard)) From 2e4b13c75f03021a07cbe8312308540a7a0a7625 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 13:51:18 +0100 Subject: [PATCH 12/18] undo --- src/exo/worker/runner/bootstrap.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/exo/worker/runner/bootstrap.py b/src/exo/worker/runner/bootstrap.py index f6d043b095..f981667c0a 100644 --- a/src/exo/worker/runner/bootstrap.py +++ b/src/exo/worker/runner/bootstrap.py @@ -1,6 +1,5 @@ import os import resource -import sys import traceback from dataclasses import dataclass from typing import Self, cast @@ -48,8 +47,6 @@ def entrypoint( global logger logger = _logger - print("this is a test log in runner stderr", file=sys.stderr) - soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (min(max(soft, 2048), hard), hard)) From e629007c429b57b2948fd3d97ae630900c78641f Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 14:33:33 +0100 Subject: [PATCH 13/18] test --- src/exo/worker/runner/bootstrap.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/exo/worker/runner/bootstrap.py b/src/exo/worker/runner/bootstrap.py index f981667c0a..f6d043b095 100644 --- a/src/exo/worker/runner/bootstrap.py +++ b/src/exo/worker/runner/bootstrap.py @@ -1,5 +1,6 @@ import os import resource +import sys import traceback from dataclasses import dataclass from typing import Self, cast @@ -47,6 +48,8 @@ def entrypoint( global logger logger = _logger + print("this is a test log in runner stderr", file=sys.stderr) + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (min(max(soft, 2048), hard), hard)) From 50d02f3ef80a9a9d785da5cbb405389ef122c335 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 14:43:15 +0100 Subject: [PATCH 14/18] fee --- src/exo/shared/telemetry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index 176b68ecdb..b3c1e8ad64 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -141,6 +141,7 @@ async def _submit_runner_stderr(self, path: Path): timeout=TELEMETRY_HTTP_TIMEOUT_SECONDS, transport=self._http_transport, ) as client: + logger.warning("doing {self.api_url}") presign_response = await client.post( f"{self.api_url}/telemetry/runner-log/presign", json={ @@ -158,6 +159,7 @@ async def _submit_runner_stderr(self, path: Path): content=data, ) upload_response.raise_for_status() + logger.warning(f"done telemetry {upload_response}") def sink(self) -> TelemetrySink: sink, recv = TelemetrySink.pair() From 0df3df6d11c67ffc32378be23bad1a545e47476e Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 14:51:49 +0100 Subject: [PATCH 15/18] undo logging --- src/exo/shared/telemetry.py | 2 -- src/exo/worker/runner/bootstrap.py | 3 --- 2 files changed, 5 deletions(-) diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index b3c1e8ad64..176b68ecdb 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -141,7 +141,6 @@ async def _submit_runner_stderr(self, path: Path): timeout=TELEMETRY_HTTP_TIMEOUT_SECONDS, transport=self._http_transport, ) as client: - logger.warning("doing {self.api_url}") presign_response = await client.post( f"{self.api_url}/telemetry/runner-log/presign", json={ @@ -159,7 +158,6 @@ async def _submit_runner_stderr(self, path: Path): content=data, ) upload_response.raise_for_status() - logger.warning(f"done telemetry {upload_response}") def sink(self) -> TelemetrySink: sink, recv = TelemetrySink.pair() diff --git a/src/exo/worker/runner/bootstrap.py b/src/exo/worker/runner/bootstrap.py index f6d043b095..f981667c0a 100644 --- a/src/exo/worker/runner/bootstrap.py +++ b/src/exo/worker/runner/bootstrap.py @@ -1,6 +1,5 @@ import os import resource -import sys import traceback from dataclasses import dataclass from typing import Self, cast @@ -48,8 +47,6 @@ def entrypoint( global logger logger = _logger - print("this is a test log in runner stderr", file=sys.stderr) - soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) resource.setrlimit(resource.RLIMIT_NOFILE, (min(max(soft, 2048), hard), hard)) From d8b752b355d3aaacf9924ffd47625d02ed509f81 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 18:01:28 +0100 Subject: [PATCH 16/18] push changes --- app/EXO/EXO/Services/ClusterStateService.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/EXO/EXO/Services/ClusterStateService.swift b/app/EXO/EXO/Services/ClusterStateService.swift index c4222cde91..eda7b03eb0 100644 --- a/app/EXO/EXO/Services/ClusterStateService.swift +++ b/app/EXO/EXO/Services/ClusterStateService.swift @@ -37,7 +37,7 @@ final class ClusterStateService: ObservableObject { /// gain nothing from being cached on disk. Use an ephemeral session /// with `urlCache = nil` so neither response bodies nor metadata /// touch disk. - private static func makeNonCachingSession() -> URLSession { + nonisolated private static func makeNonCachingSession() -> URLSession { let config = URLSessionConfiguration.ephemeral config.urlCache = nil config.requestCachePolicy = .reloadIgnoringLocalCacheData From 490cf320552e5ff816a34218450b267edcec93b7 Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 18:16:52 +0100 Subject: [PATCH 17/18] fix env flag --- justfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/justfile b/justfile index 96e26c84b7..6ef6fa4e05 100644 --- a/justfile +++ b/justfile @@ -37,7 +37,7 @@ package: build-dashboard rm -rf build build-app: rust-rebuild sync-clean package - xcodebuild build -project app/EXO/EXO.xcodeproj -scheme EXO -configuration Debug -derivedDataPath app/EXO/build + env -u LD xcodebuild build -project app/EXO/EXO.xcodeproj -scheme EXO -configuration Debug -derivedDataPath app/EXO/build @echo "\nBuild complete. Run with:\n open {{justfile_directory()}}/app/EXO/build/Build/Products/Debug/EXO.app" clean: From a295eb8497f1b52b5f86399ce855326d6f810d7e Mon Sep 17 00:00:00 2001 From: Andrei Cravtov Date: Fri, 29 May 2026 20:16:06 +0100 Subject: [PATCH 18/18] fixes --- src/exo/main.py | 2 +- src/exo/shared/telemetry.py | 8 ++++---- src/exo/shared/tests/test_telemetry.py | 4 ++-- src/exo/worker/runner/supervisor.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/exo/main.py b/src/exo/main.py index e95d3f989e..cdb555005e 100644 --- a/src/exo/main.py +++ b/src/exo/main.py @@ -72,7 +72,7 @@ async def create(cls, args: "Args") -> Self: external_outbound=router.sender(topics.LOCAL_EVENTS), external_inbound=router.receiver(topics.GLOBAL_EVENTS), ) - telemetry = TelemetryService.create(dry_run=not args.telemetry) + telemetry = TelemetryService.create(telemetry_disabled=not args.telemetry) logger.info(f"Starting node {node_id}") diff --git a/src/exo/shared/telemetry.py b/src/exo/shared/telemetry.py index 176b68ecdb..c7d5dd2e73 100644 --- a/src/exo/shared/telemetry.py +++ b/src/exo/shared/telemetry.py @@ -73,7 +73,7 @@ def close(self): @dataclass(eq=False) class TelemetryService: - dry_run: bool + telemetry_disabled: bool api_url: str _send: Sender[TelemetrySubmission] _recv: Receiver[TelemetrySubmission] @@ -83,7 +83,7 @@ class TelemetryService: @classmethod def create( cls, - dry_run: bool, + telemetry_disabled: bool, api_url: str = EXO_TELEMETRY_API_URL, http_transport: httpx.AsyncBaseTransport | None = None, ) -> Self: @@ -92,7 +92,7 @@ def create( send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE) return cls( - dry_run=dry_run, + telemetry_disabled=telemetry_disabled, api_url=api_url, _send=send, _recv=recv, @@ -114,7 +114,7 @@ async def run(self): async def _process(self): with self._recv as submissions: async for submission in submissions: - if not self.dry_run: + if not self.telemetry_disabled: try: await self._process_submission(submission) except Exception as e: diff --git a/src/exo/shared/tests/test_telemetry.py b/src/exo/shared/tests/test_telemetry.py index d3efc16e8f..f86d4c497a 100644 --- a/src/exo/shared/tests/test_telemetry.py +++ b/src/exo/shared/tests/test_telemetry.py @@ -54,7 +54,7 @@ async def handler(request: httpx.Request) -> httpx.Response: return httpx.Response(404) service = TelemetryService.create( - dry_run=False, + telemetry_disabled=False, api_url="https://telemetry.example/", http_transport=httpx.MockTransport(handler), ) @@ -84,7 +84,7 @@ async def handler(request: httpx.Request) -> httpx.Response: return httpx.Response(500) service = TelemetryService.create( - dry_run=False, + telemetry_disabled=False, api_url="https://telemetry.example", http_transport=httpx.MockTransport(handler), ) diff --git a/src/exo/worker/runner/supervisor.py b/src/exo/worker/runner/supervisor.py index d54726fa36..060afc9785 100644 --- a/src/exo/worker/runner/supervisor.py +++ b/src/exo/worker/runner/supervisor.py @@ -138,7 +138,7 @@ async def _handle_stdout(self): try: async with rx: async for chunk in rx: - logger.error(f"Unexpected runner stdout chunk: {chunk}") + logger.warning(f"Unexpected runner stdout chunk: {chunk}") except (ClosedResourceError, BrokenResourceError): logger.warning("Runner stdio stream closed before clean EOF")