Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/EXO/EXO/Services/ClusterStateService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class ClusterStateService: ObservableObject {
/// gain nothing from being cached on disk. Use an ephemeral session
/// with `urlCache = nil` so neither response bodies nor metadata
/// touch disk.
private static func makeNonCachingSession() -> URLSession {
nonisolated private static func makeNonCachingSession() -> URLSession {
let config = URLSessionConfiguration.ephemeral
config.urlCache = nil
config.requestCachePolicy = .reloadIgnoringLocalCacheData
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ package: build-dashboard
rm -rf build

build-app: rust-rebuild sync-clean package
xcodebuild build -project app/EXO/EXO.xcodeproj -scheme EXO -configuration Debug -derivedDataPath app/EXO/build
env -u LD xcodebuild build -project app/EXO/EXO.xcodeproj -scheme EXO -configuration Debug -derivedDataPath app/EXO/build
@echo "\nBuild complete. Run with:\n open {{justfile_directory()}}/app/EXO/build/Build/Products/Debug/EXO.app"

clean:
Expand Down
13 changes: 13 additions & 0 deletions src/exo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from exo.shared.constants import EXO_DEFAULT_MODELS_DIR, EXO_LOG, EXO_PID_FILE
from exo.shared.election import Election, ElectionResult
from exo.shared.logging import logger_cleanup, logger_setup
from exo.shared.telemetry import TelemetryService
from exo.shared.types.common import NodeId, SessionId
from exo.utils import STDIO_FDS
from exo.utils.channels import Receiver, channel
Expand All @@ -42,6 +43,7 @@ class Node:
election_result_receiver: Receiver[ElectionResult]
master: Master | None
api: API | None
telemetry: TelemetryService

node_id: NodeId
offline: bool
Expand Down Expand Up @@ -70,6 +72,7 @@ async def create(cls, args: "Args") -> Self:
external_outbound=router.sender(topics.LOCAL_EVENTS),
external_inbound=router.receiver(topics.GLOBAL_EVENTS),
)
telemetry = TelemetryService.create(dry_run=not args.telemetry)

logger.info(f"Starting node {node_id}")

Expand Down Expand Up @@ -108,6 +111,7 @@ async def create(cls, args: "Args") -> Self:
command_sender=router.sender(topics.COMMANDS),
download_command_sender=router.sender(topics.DOWNLOAD_COMMANDS),
api_port=args.api_port,
telemetry_sink=telemetry.sink(),
)
else:
worker = None
Expand Down Expand Up @@ -146,6 +150,7 @@ async def create(cls, args: "Args") -> Self:
er_recv,
master,
api,
telemetry,
node_id,
args.offline,
args.api_port,
Expand All @@ -157,6 +162,7 @@ async def run(self):
signal.signal(signal.SIGTERM, lambda _, __: self.shutdown())
tg.start_soon(self.router.run)
tg.start_soon(self.event_router.run)
tg.start_soon(self.telemetry.run)
tg.start_soon(self.election.run)
if self.download_coordinator:
tg.start_soon(self.download_coordinator.run)
Expand Down Expand Up @@ -264,6 +270,7 @@ async def _elect_loop(self):
topics.DOWNLOAD_COMMANDS
),
api_port=self._api_port,
telemetry_sink=self.telemetry.sink(),
)
self._tg.start_soon(self.worker.run)
if self.api:
Expand Down Expand Up @@ -384,6 +391,7 @@ class Args(FrozenModel):
no_downloads: bool = False
offline: bool = os.getenv("EXO_OFFLINE", "false").lower() == "true"
no_batch: bool = False
telemetry: bool = False
fast_synch: bool | None = None # None = auto, True = force on, False = force off
legacy_daemon: bool = False
bootstrap_peers: list[str] = []
Expand Down Expand Up @@ -445,6 +453,11 @@ def parse(cls) -> Self:
action="store_true",
help="Disable continuous batching, use sequential generation",
)
parser.add_argument(
"--telemetry",
action="store_true",
help="Enable telemetry uploads. Disabled by default; disabled mode keeps telemetry in dry-run.",
Comment thread
AndreiCravtov marked this conversation as resolved.
)
parser.add_argument(
"--legacy-daemon",
action="store_true",
Expand Down
7 changes: 5 additions & 2 deletions src/exo/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def _get_xdg_dir(env_var: str, fallback: str) -> Path:
EXO_DATA_HOME = _get_xdg_dir("XDG_DATA_HOME", ".local/share")
EXO_CACHE_HOME = _get_xdg_dir("XDG_CACHE_HOME", ".cache")

# Exo telemetry API base URL (overridable via the EXO_TELEMETRY_API_URL environment variable)
EXO_TELEMETRY_API_URL = os.environ.get(
"EXO_TELEMETRY_API_URL", "https://telemetry.exolabs.net/"
)

# Default models directory (always included as first entry in writable dirs)
_EXO_DEFAULT_MODELS_DIR_ENV = os.environ.get("EXO_DEFAULT_MODELS_DIR", None)
EXO_DEFAULT_MODELS_DIR = (
Expand Down Expand Up @@ -69,8 +74,6 @@ def _parse_colon_dirs(env_var: str) -> tuple[Path, ...]:
EXO_LOG_DIR = EXO_CACHE_HOME / "exo_log"
EXO_LOG = EXO_LOG_DIR / "exo.log"
EXO_RUNNER_LOG_DIR = EXO_LOG_DIR / "runner_log"
EXO_RUNNER_STDOUT_LOG = EXO_RUNNER_LOG_DIR / "stdout.log"
EXO_RUNNER_STDERR_LOG = EXO_RUNNER_LOG_DIR / "stderr.log"

EXO_TEST_LOG = EXO_CACHE_HOME / "exo_test.log"
EXO_PID_FILE = EXO_CACHE_HOME / "exo.pid"
Expand Down
176 changes: 176 additions & 0 deletions src/exo/shared/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import contextlib
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Self
from urllib.parse import urlparse

import httpx
from anyio import BrokenResourceError, ClosedResourceError, WouldBlock, to_thread
from loguru import logger

from exo.shared.constants import EXO_TELEMETRY_API_URL
from exo.utils.channels import Receiver, Sender, channel
from exo.utils.pydantic_ext import FrozenModel, TaggedModel
from exo.utils.task_group import TaskGroup

CHANNEL_BOUND_SIZE = 64
TELEMETRY_HTTP_TIMEOUT_SECONDS = 10.0


class BaseTelemetrySubmission(TaggedModel):
    # Common parent of every telemetry submission variant; it declares no
    # fields of its own.
    ...


class TestSubmission(BaseTelemetrySubmission):
    # Field-less submission variant carrying no payload.
    ...


class RunnerStderrSubmission(BaseTelemetrySubmission):
    # Submission variant that points at a runner stderr file on disk.

    # Location of the stderr file this submission refers to.
    path: Path


TelemetrySubmission = TestSubmission | RunnerStderrSubmission


class TelemetryPresignResponse(FrozenModel):
    # Parsed body of the response returned by the presign endpoint.

    # Identifier returned by the presign endpoint (presumably the object key
    # of the upload -- confirm against the server).
    key: str
    # URL the payload is uploaded to.
    upload_url: str
    # NOTE(review): presumably the validity window of `upload_url` in
    # seconds -- the unit is not established in this file.
    expires_in: int
    # NOTE(review): presumably the largest accepted upload in bytes -- the
    # unit is not established in this file.
    max_size: int


@dataclass(eq=False)
class TelemetrySink:
    """
    Fire-and-forget handle for handing submissions to a telemetry receiver.

    It wraps the sending half of a bounded channel. Telemetry is an optional
    diagnostic feature, so this wrapper never waits for capacity and swallows
    the channel errors it expects (full, broken or closed channel) instead of
    letting them reach the caller.
    """

    _send: Sender[TelemetrySubmission]

    @classmethod
    def pair(cls) -> tuple[Self, Receiver[TelemetrySubmission]]:
        """Create a sink together with the receiver that drains it."""
        sender, receiver = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE)
        sink = cls(_send=sender)
        return sink, receiver

    def submit(self, submission: TelemetrySubmission):
        """
        Try to enqueue `submission` without waiting.

        If the channel is full, broken or closed, the submission is dropped
        and a debug message is logged.
        """
        try:
            self._send.send_nowait(submission)
        except (BrokenResourceError, ClosedResourceError):
            logger.debug("Telemetry submission receivers are broken or closed. why??")
        except WouldBlock:
            logger.debug("Telemetry submission would block. why so many submissions??")

    def clone(self) -> "TelemetrySink":
        """Return a new sink built on a clone of this sink's sender."""
        duplicate_sender = self._send.clone()
        return TelemetrySink(_send=duplicate_sender)

    def close(self):
        """Close the underlying sender, ignoring broken/closed channel errors."""
        try:
            self._send.close()
        except (BrokenResourceError, ClosedResourceError):
            pass


@dataclass(eq=False)
class TelemetryService:
dry_run: bool
api_url: str
_send: Sender[TelemetrySubmission]
_recv: Receiver[TelemetrySubmission]
_http_transport: httpx.AsyncBaseTransport | None
_tg: TaskGroup = field(default_factory=TaskGroup, init=False)

@classmethod
def create(
cls,
dry_run: bool,
Comment thread
AndreiCravtov marked this conversation as resolved.
Outdated
api_url: str = EXO_TELEMETRY_API_URL,
http_transport: httpx.AsyncBaseTransport | None = None,
) -> Self:
api_url = urlparse(api_url).geturl().rstrip("/")

send, recv = channel[TelemetrySubmission](CHANNEL_BOUND_SIZE)

return cls(
dry_run=dry_run,
api_url=api_url,
_send=send,
_recv=recv,
_http_transport=http_transport,
)

@classmethod
def dummy(cls) -> Self:
return cls.create(True)

async def run(self):
try:
async with self._tg as tg:
tg.start_soon(self._process)
finally:
self._send.close()
self._recv.close()

async def _process(self):
with self._recv as submissions:
async for submission in submissions:
if not self.dry_run:
try:
await self._process_submission(submission)
except Exception as e:
logger.opt(exception=e).warning(
"Exception when processing telemetry submission"
)

async def _process_submission(self, submission: TelemetrySubmission):
match submission:
case TestSubmission():
pass
case RunnerStderrSubmission(path=path):
await self._submit_runner_stderr(path)

async def _submit_runner_stderr(self, path: Path):
data = await to_thread.run_sync(path.read_bytes)
if not data:
logger.debug(f"Skipping empty runner stderr telemetry file: {path}")
return

sha256 = hashlib.sha256(data).hexdigest()

async with httpx.AsyncClient(
timeout=TELEMETRY_HTTP_TIMEOUT_SECONDS,
transport=self._http_transport,
) as client:
presign_response = await client.post(
f"{self.api_url}/telemetry/runner-log/presign",
json={
"sha256": sha256,
"size": len(data),
},
)
presign_response.raise_for_status()
presign = TelemetryPresignResponse.model_validate_json(
presign_response.text,
)

upload_response = await client.put(
presign.upload_url,
content=data,
)
upload_response.raise_for_status()

def sink(self) -> TelemetrySink:
sink, recv = TelemetrySink.pair()
if self._tg.is_running():
self._tg.start_soon(self._ingest, recv)
else:
self._tg.queue(self._ingest, recv)
return sink

async def _ingest(self, recv: Receiver[TelemetrySubmission]):
try:
with recv as submissions:
async for submission in submissions:
await self._send.send(submission)
except ClosedResourceError:
pass
95 changes: 95 additions & 0 deletions src/exo/shared/tests/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path

import httpx
import pytest

from exo.shared.telemetry import RunnerStderrSubmission, TelemetryService


@dataclass(frozen=True)
class RecordedRequest:
    """Immutable snapshot of one HTTP request seen by the mock transport."""

    # HTTP verb of the request.
    method: str
    # Request URL rendered as a string.
    url: str
    # Raw request body bytes.
    content: bytes


def _queue_submission(
    service: TelemetryService,
    submission: RunnerStderrSubmission,
) -> None:
    """
    Put a single submission on the service's internal channel and then close
    the sender.
    """
    sender = service._send  # pyright: ignore[reportPrivateUsage]
    sender.send_nowait(submission)
    sender.close()


@pytest.mark.anyio
async def test_runner_stderr_upload_hashes_and_uploads_file_bytes(tmp_path: Path):
    """
    Processing a runner-stderr submission issues a presign POST carrying the
    file's SHA-256 and size, then PUTs the file bytes to the returned URL.
    """
    payload = b"runner stderr\nsecond line\n"
    stderr_file = tmp_path / "runner.stderr.log"
    stderr_file.write_bytes(payload)
    seen: list[RecordedRequest] = []

    presign_body = {
        "key": "runner_log/test.stderr.log",
        "uploadUrl": "https://uploads.example/runner.stderr.log",
        "expiresIn": 300,
        "maxSize": 52428800,
    }

    async def handler(request: httpx.Request) -> httpx.Response:
        # Record every request before answering so the assertions below can
        # inspect order, target and body.
        body = await request.aread()
        seen.append(
            RecordedRequest(method=request.method, url=str(request.url), content=body)
        )
        if request.method == "PUT":
            return httpx.Response(200)
        if request.method == "POST":
            return httpx.Response(200, json=presign_body)
        return httpx.Response(404)

    service = TelemetryService.create(
        dry_run=False,
        api_url="https://telemetry.example/",
        http_transport=httpx.MockTransport(handler),
    )
    submission = RunnerStderrSubmission(path=stderr_file)

    await service._process_submission(  # pyright: ignore[reportPrivateUsage]
        submission
    )

    assert [r.method for r in seen] == ["POST", "PUT"]
    presign_request, upload_request = seen

    assert presign_request.url == "https://telemetry.example/telemetry/runner-log/presign"
    assert json.loads(presign_request.content) == {
        "sha256": hashlib.sha256(payload).hexdigest(),
        "size": len(payload),
    }

    assert upload_request.url == "https://uploads.example/runner.stderr.log"
    assert upload_request.content == payload


@pytest.mark.anyio
async def test_runner_stderr_upload_failure_is_swallowed(tmp_path: Path):
    """
    A failing presign call must not escape the processing loop: the loop
    finishes normally after exactly one request was attempted.
    """
    stderr_file = tmp_path / "runner.stderr.log"
    stderr_file.write_text("runner stderr\n")
    seen: list[httpx.Request] = []

    async def handler(request: httpx.Request) -> httpx.Response:
        # Every request fails with a server error.
        seen.append(request)
        return httpx.Response(500)

    transport = httpx.MockTransport(handler)
    service = TelemetryService.create(
        dry_run=False,
        api_url="https://telemetry.example",
        http_transport=transport,
    )
    failing_submission = RunnerStderrSubmission(path=stderr_file)
    _queue_submission(service, failing_submission)

    await service._process()  # pyright: ignore[reportPrivateUsage]

    assert len(seen) == 1
Loading
Loading