Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4d70a62
persist node ids in .cache
Evanev7 Feb 24, 2026
0c813b4
feat: peer-to-peer model downloads over LAN
ecohash-co Mar 11, 2026
4740f9d
refactor: decouple peer discovery from worker state, add link priorit…
ecohash-co Mar 12, 2026
66e0c35
feat: EXO_KV_CACHE_BITS env var + step=16384 to keep QuantizedKVCache…
adurham Apr 26, 2026
8996e7d
fix: skip KV cache quantization in single-node BatchGenerator mode
Apr 26, 2026
1cc0be2
feat: add --trust-remote-code CLI flag for custom model tokenizers
AlexCheema Feb 23, 2026
392bc6c
fix: keep TRUST_REMOTE_CODE=True for built-in models
AlexCheema Feb 23, 2026
a8ea158
Reconcile worker instance backoff from state
jw-wcv May 7, 2026
0955b1c
Tune cluster liveness polling cadence
jw-wcv May 7, 2026
d18c00b
Gate RDMA placement on rdma_ctl state
jw-wcv May 7, 2026
47b0ccb
Fix upstream port compatibility issues
jw-wcv May 7, 2026
34455fd
Harden peer download port for current typing
jw-wcv May 7, 2026
1fe31b5
Use current model directory for peer file server
jw-wcv May 7, 2026
a8dcc32
fix(download): harden peer file serving
jw-wcv May 7, 2026
70e502d
fix: make darwin mdns discovery reliable
AlexCheema May 9, 2026
3f21786
Guard quantized cache + integrity-check peer downloads
jw-wcv May 9, 2026
3cf6bbf
Mirror download_shard ignore_patterns and offline flag in peer path
jw-wcv May 9, 2026
7ef79bc
PR #16 R3: skip HF integrity check in offline mode + per-process peer…
jw-wcv May 9, 2026
714c610
Pick RDMA edges and serve from every model dir for peer downloads
jw-wcv May 9, 2026
09420bd
Reject oversized peer partials; relocate node-ID keypair to config dir
jw-wcv May 9, 2026
755cce7
Scope node-ID keypair per process and migrate inside the file lock
jw-wcv May 9, 2026
5cc987e
Combine listening ports for keypair scope; restart on 200-on-resume
jw-wcv May 9, 2026
fe348e9
PR #16 R(N+8) P1: address libp2p-port=0 scope collision and oversized…
jw-wcv May 9, 2026
78031c4
PR #16 R(N+9) P1+P2: restore safer node timeout and search all model …
jw-wcv May 9, 2026
9336bf2
PR #16 R(N+10) P2: keep peer file server task alive for cleanup
jw-wcv May 9, 2026
effabd9
PR #16 R(N+10) P2: materialize zero-byte marker files in peer transfer
jw-wcv May 9, 2026
363e695
PR #16 R(N+11) P1: distinguish unknown-size from zero-byte in peer tr…
jw-wcv May 9, 2026
6fcc743
PR #16 R(N+13) P1: serialize legacy keypair adoption across scopes
jw-wcv May 9, 2026
19d6ce1
PR #16 R(N+14) P2: mark zero-byte peer files complete in progress map
jw-wcv May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/exo/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ async def get_placement(
topology=self.state.topology,
current_instances=self.state.instances,
download_status=self.state.downloads,
node_rdma_ctl=self.state.node_rdma_ctl,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
Expand Down Expand Up @@ -794,6 +795,7 @@ async def get_placement_previews(
allowed_nodes=allowed_nodes,
allow_single_node_total_memory=allowed_nodes is not None,
download_status=self.state.downloads,
node_rdma_ctl=self.state.node_rdma_ctl,
)
except ValueError as exc:
if (model_card.model_id, sharding, instance_meta, 0) not in seen:
Expand Down
11 changes: 10 additions & 1 deletion src/exo/download/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
map_repo_download_progress_to_download_progress_data,
resolve_existing_model,
)
from exo.download.peer_shard_downloader import PeerAwareShardDownloader
from exo.download.shard_downloader import ShardDownloader
from exo.shared.constants import EXO_DEFAULT_MODELS_DIR, EXO_MODELS_READ_ONLY_DIRS
from exo.shared.models.model_cards import ModelCard, ModelId, get_model_cards
Expand Down Expand Up @@ -225,7 +226,15 @@ async def _command_processor(self) -> None:
continue

match cmd.command:
case StartDownload(shard_metadata=shard):
case StartDownload(shard_metadata=shard, available_peers=peers):
# Pass peer endpoints to the shard downloader if it supports it
if isinstance(self.shard_downloader, PeerAwareShardDownloader):
self.shard_downloader.set_available_peers(shard, peers)
elif hasattr(self.shard_downloader, "shard_downloader") and isinstance(
self.shard_downloader.shard_downloader, PeerAwareShardDownloader # type: ignore[union-attr]
):
# Unwrap SingletonShardDownloader
self.shard_downloader.shard_downloader.set_available_peers(shard, peers) # type: ignore[union-attr]
await self._start_download(shard)
Comment on lines +229 to 238
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Bypass offline short-circuit when peer endpoints are provided

This new StartDownload branch wires available_peers into PeerAwareShardDownloader, but it still calls _start_download() unchanged, and that method exits early with DownloadFailed whenever self.offline is true before ensure_shard() is ever invoked. In --offline/air-gapped deployments, peer sync is therefore never attempted even when LAN peers were discovered, so models that exist only on peers will always fail. The offline failure path needs to be conditioned so peer-backed downloads can proceed.

Useful? React with 👍 / 👎.

Comment on lines +232 to 238
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not enqueue peer candidates before start eligibility check

set_available_peers() is called before _start_download() decides whether this request will actually run; if the model is already DownloadOngoing, DownloadCompleted, or DownloadFailed, _start_download() returns immediately and the queued peer list is never consumed. On the next real retry for the same shard, _pop_available_peers() will use that stale list first, which can point to outdated/unreachable peers and trigger unnecessary fallback behavior. Only queue peers once a download is confirmed to start, or clear queued entries on skipped starts.

Useful? React with 👍 / 👎.

case DeleteDownload(model_id=model_id):
await self._delete_download(model_id)
Expand Down
25 changes: 25 additions & 0 deletions src/exo/download/download_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import hashlib
import json
import os
import random
import shutil
Expand Down Expand Up @@ -777,6 +778,9 @@ async def _download_file(
) as f:
while chunk := await r.content.read(8 * 1024 * 1024):
n_read = n_read + (await f.write(chunk))
await f.flush()
# Write companion metadata for peer download streaming
await _write_partial_meta(partial_path, n_read, length, remote_hash)
on_progress(n_read, length, False)

final_hash = await calc_hash(
Expand All @@ -792,10 +796,31 @@ async def _download_file(
f"Downloaded file {target_dir / path} has hash {final_hash} but remote hash is {remote_hash}"
)
await aios.rename(partial_path, target_dir / path)
# Clean up companion metadata file
meta_path = Path(f"{partial_path}.meta")
if await aios.path.exists(meta_path):
await aios.remove(meta_path)
on_progress(length, length, True)
return target_dir / path


async def _write_partial_meta(
    partial_path: Path, safe_bytes: int, total: int, etag: str
) -> None:
    """Write the companion ``.partial.meta`` file used for peer streaming.

    The file is a small JSON object with the keys ``safe_bytes``, ``total``
    and ``etag``. It tells the peer file server how many bytes of the
    ``.partial`` file have been flushed to disk and are safe to serve.

    Args:
        partial_path: Path of the ``.partial`` file being downloaded; the
            metadata is written next to it as ``<partial_path>.meta``.
        safe_bytes: Number of bytes of the partial file flushed so far.
        total: Total expected size of the file in bytes.
        etag: Remote hash/etag of the file being downloaded.
    """
    meta_path = Path(f"{partial_path}.meta")
    tmp_path = Path(f"{partial_path}.meta.tmp")
    meta = json.dumps({"safe_bytes": safe_bytes, "total": total, "etag": etag})
    # Write to a temp file first so readers never observe a half-written
    # JSON document.
    async with aiofiles.open(tmp_path, "w") as f:
        await f.write(meta)
    # This function runs once per downloaded chunk, so the destination
    # usually already exists. ``replace`` overwrites it atomically on every
    # platform, whereas ``rename`` raises on Windows when the target exists.
    await aios.replace(tmp_path, meta_path)


def calculate_repo_progress(
shard: ShardMetadata,
model_id: ModelId,
Expand Down
12 changes: 9 additions & 3 deletions src/exo/download/impl_shard_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RepoDownloadProgress,
download_shard,
)
from exo.download.peer_shard_downloader import PeerAwareShardDownloader
from exo.download.shard_downloader import ShardDownloader
from exo.shared.models.model_cards import (
ModelCard,
Expand All @@ -25,11 +26,16 @@


def exo_shard_downloader(
max_parallel_downloads: int = 8, offline: bool = False
max_parallel_downloads: int = 8,
offline: bool = False,
peer_download_enabled: bool = False,
) -> ShardDownloader:
return SingletonShardDownloader(
ResumableShardDownloader(max_parallel_downloads, offline=offline)
inner: ShardDownloader = ResumableShardDownloader(
max_parallel_downloads, offline=offline
)
if peer_download_enabled:
inner = PeerAwareShardDownloader(inner, offline=offline)
return SingletonShardDownloader(inner)


async def build_base_shard(model_id: ModelId) -> ShardMetadata:
Expand Down
271 changes: 271 additions & 0 deletions src/exo/download/peer_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
"""HTTP client for downloading model files from peer nodes.

Instead of downloading from HuggingFace, nodes can fetch model files from
peers on the same LAN that already have them (or are still downloading them).
Falls back gracefully if the peer is unreachable or the transfer fails.
"""

import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, cast

import aiofiles
import aiofiles.os as aios
import aiohttp
from loguru import logger


@dataclass(frozen=True)
class PeerFileInfo:
    """Immutable record describing one model file as reported by a peer."""

    # File path string exactly as the peer reported it.
    path: str
    # File size reported by the peer (presumably bytes -- confirm against
    # the peer file server).
    size: int
    # True when the peer reports the file as fully available.
    complete: bool
    # Byte count reported by the peer as safe to serve (presumably the
    # flushed prefix of an in-progress download -- confirm against server).
    safe_bytes: int


def _as_int(value: object) -> int:
return value if isinstance(value, int) else 0


async def get_peer_file_status(
    peer_host: str,
    peer_port: int,
    model_id_normalized: str,
    timeout: float = 5.0,
) -> list[PeerFileInfo] | None:
    """Ask a peer's file server which files it holds for a model.

    Issues ``GET /status/<model_id_normalized>`` and converts each
    well-formed entry of the response's ``"files"`` list into a
    ``PeerFileInfo``. Entries that are not dicts or that lack any of the
    four expected keys are skipped.

    Returns:
        The parsed file list; an empty list when ``"files"`` is present but
        not a list; ``None`` when the peer answers with a non-200 status or
        any exception occurs (unreachable peer, timeout, bad payload).
    """
    url = f"http://{peer_host}:{peer_port}/status/{model_id_normalized}"
    expected_keys = ("path", "size", "complete", "safe_bytes")
    try:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=timeout)
        ) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    return None
                payload = cast(dict[str, object], await response.json())
                listing = payload.get("files", [])
                if not isinstance(listing, list):
                    return []
                parsed: list[PeerFileInfo] = []
                for entry in cast(list[object], listing):
                    if not isinstance(entry, dict):
                        continue
                    record = cast(dict[str, object], entry)
                    # Skip entries missing any required field.
                    if any(key not in record for key in expected_keys):
                        continue
                    parsed.append(
                        PeerFileInfo(
                            path=str(record["path"]),
                            size=_as_int(record["size"]),
                            complete=bool(record["complete"]),
                            safe_bytes=_as_int(record["safe_bytes"]),
                        )
                    )
                return parsed
    except Exception as e:
        # Best-effort probe: any failure just means "this peer is unusable".
        logger.debug(f"Could not reach peer {peer_host}:{peer_port}: {e}")
        return None


async def download_file_from_peer(
    peer_host: str,
    peer_port: int,
    model_id_normalized: str,
    file_path: str,
    target_dir: Path,
    expected_size: int,
    on_progress: Callable[[int, int, bool], None] = lambda _a, _b, _c: None,
    max_poll_attempts: int = 60,
    poll_interval: float = 3.0,
    max_oversized_restarts: int = 3,
) -> Path | None:
    """Download a single file from a peer's file server.

    Supports streaming relay: if the peer is still downloading the file,
    we fetch available bytes, wait, and poll for more until the file is
    complete. Bytes are accumulated in ``<file_path>.partial`` under
    ``target_dir`` and renamed to the final name once ``expected_size``
    bytes have been received.

    Args:
        peer_host: Host of the peer's file server.
        peer_port: Port of the peer's file server.
        model_id_normalized: Model identifier used in the peer URL.
        file_path: File path, used both in the URL and under ``target_dir``.
        target_dir: Local directory the file is written into.
        expected_size: Exact number of bytes the finished file must have.
        on_progress: Called as ``(bytes_so_far, expected_size, done)``.
        max_poll_attempts: Consecutive no-progress requests tolerated before
            giving up.
        poll_interval: Seconds to sleep after a no-progress request.
        max_oversized_restarts: How many times an oversized peer response
            may trigger a restart from zero before this peer is abandoned.

    Returns:
        The final file path on success, or None on failure (caller
        should fall back to HuggingFace).
    """
    target_path = target_dir / file_path
    partial_path = target_dir / f"{file_path}.partial"

    # Check if already complete locally
    if await aios.path.exists(target_path):
        local_size = (await aios.stat(target_path)).st_size
        if local_size == expected_size:
            on_progress(expected_size, expected_size, True)
            return target_path

    await aios.makedirs((target_dir / file_path).parent, exist_ok=True)

    url = f"http://{peer_host}:{peer_port}/files/{model_id_normalized}/{file_path}"
    n_read = 0

    # Resume from an existing partial. A stale ``.partial`` from a previous
    # run can be larger than ``expected_size``; resuming would skip the
    # download loop entirely and rename a too-large file as the result.
    # Offline mode skips hash verification, so nothing downstream would
    # catch it. Drop the stale partial and restart from zero instead.
    if await aios.path.exists(partial_path):
        existing_size = (await aios.stat(partial_path)).st_size
        if existing_size > expected_size:
            logger.warning(
                f"Discarding stale oversized peer partial for {file_path} "
                f"({existing_size} > expected {expected_size}); "
                "restarting download from zero"
            )
            await aios.remove(partial_path)
            n_read = 0
        else:
            n_read = existing_size

    poll_count = 0
    oversized_restarts = 0
    chunk_size = 8 * 1024 * 1024  # 8MB, matching HF download

    try:
        while n_read < expected_size and poll_count < max_poll_attempts:
            headers: dict[str, str] = {}
            if n_read > 0:
                headers["Range"] = f"bytes={n_read}-"

            got_bytes = False
            range_was_requested = n_read > 0
            # NOTE(review): ``total=300`` caps the whole request, including
            # the body read; a very large file on a slow link may hit it and
            # fail this peer -- confirm this is intended.
            async with (
                aiohttp.ClientSession(
                    timeout=aiohttp.ClientTimeout(total=300, sock_read=60)
                ) as session,
                session.get(url, headers=headers) as r,
            ):
                if r.status == 416:
                    # Range not satisfiable - peer doesn't have more yet
                    pass
                elif range_was_requested and r.status == 200:
                    # We sent a ``Range`` header but the peer ignored it
                    # and returned the full content. Appending the body
                    # would duplicate the already-downloaded prefix, so
                    # drop the partial and restart from zero on the next
                    # iteration.
                    logger.warning(
                        f"Peer {peer_host} ignored Range header for "
                        f"{file_path} (returned 200 instead of 206); "
                        "discarding partial and restarting from zero"
                    )
                    await aios.remove(partial_path)
                    n_read = 0
                elif r.status in (200, 206):
                    # Bound the read by ``expected_size - n_read`` and treat
                    # any extra bytes as a peer protocol violation, so an
                    # oversized response is never renamed into the cache.
                    oversized_response = False
                    async with aiofiles.open(
                        partial_path, "ab" if n_read > 0 else "wb"
                    ) as f:
                        while True:
                            remaining = expected_size - n_read
                            if remaining <= 0:
                                # We have everything we need. Read one more
                                # byte: if the stream isn't at EOF, the peer
                                # is sending more than ``expected_size``.
                                tail = await r.content.read(1)
                                if tail:
                                    oversized_response = True
                                break
                            chunk = await r.content.read(min(chunk_size, remaining))
                            if not chunk:
                                break
                            written = await f.write(chunk)
                            n_read += written
                            got_bytes = True
                            on_progress(n_read, expected_size, False)
                    if oversized_response:
                        # We cannot trust any bytes from a peer that
                        # violates the advertised file size, especially in
                        # offline mode where hash verification is skipped.
                        logger.warning(
                            f"Peer {peer_host} returned oversized response for "
                            f"{file_path} (advertised {expected_size} bytes, "
                            "stream still had data when budget was exhausted); "
                            "discarding partial and restarting from zero"
                        )
                        await aios.remove(partial_path)
                        n_read = 0
                        # The bytes just discarded are not progress. Without
                        # this reset ``poll_count`` would be cleared below
                        # and a peer that always over-supplies would make
                        # this loop re-download the file forever.
                        got_bytes = False
                        oversized_restarts += 1
                        if oversized_restarts > max_oversized_restarts:
                            logger.warning(
                                f"Peer {peer_host} repeatedly returned oversized "
                                f"responses for {file_path}; giving up on this peer"
                            )
                            return None
                elif r.status == 404:
                    logger.debug(f"File {file_path} not found on peer {peer_host}")
                    return None
                else:
                    logger.warning(
                        f"Unexpected status {r.status} from peer {peer_host}"
                    )
                    return None

            # Check if we're done
            if n_read >= expected_size:
                break

            # If we got no new bytes, the peer might still be downloading
            if not got_bytes:
                poll_count += 1
                logger.debug(
                    f"Waiting for peer {peer_host} to download more of {file_path} "
                    f"({n_read}/{expected_size}, poll {poll_count}/{max_poll_attempts})"
                )
                await asyncio.sleep(poll_interval)
            else:
                # Got data, reset poll counter
                poll_count = 0

        if n_read < expected_size:
            logger.warning(
                f"Peer download incomplete for {file_path}: {n_read}/{expected_size}"
            )
            return None

        # Rename partial to final
        await aios.rename(partial_path, target_path)
        on_progress(expected_size, expected_size, True)
        logger.info(
            f"Downloaded {file_path} from peer {peer_host} ({expected_size} bytes)"
        )
        return target_path

    except Exception as e:
        # Best-effort: any failure means "fall back to another source".
        logger.warning(f"Peer download failed for {file_path} from {peer_host}: {e}")
        return None
Loading