Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/exo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ async def create(cls, args: "Args") -> Self:
election = Election(
node_id,
# If someone manages to assemble 1 MILLION devices into an exo cluster then. well done. good job champ.
seniority=1_000_000 if args.force_master else 0,
is_candidate=not bool(args.bootstrap_peers),
seniority=(2**31 - 1) if args.force_master else 0,
# nb: this DOES feedback right now. i have thoughts on how to address this,
# but ultimately it seems not worth the complexity
election_message_sender=router.sender(topics.ELECTION_MESSAGES),
Expand Down
4 changes: 2 additions & 2 deletions src/exo/shared/election.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class ElectionMessage(FrozenModel):

# Could eventually include a list of neighbour nodes for centrality
def __lt__(self, other: Self) -> bool:
if self.clock != other.clock:
return self.clock < other.clock
if self.seniority != other.seniority:
return self.seniority < other.seniority
if self.clock != other.clock:
return self.clock < other.clock
elif self.commands_seen != other.commands_seen:
return self.commands_seen < other.commands_seen
else:
Expand Down
23 changes: 23 additions & 0 deletions src/exo/shared/models/model_cards.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,17 @@ async def fetch_config_data(model_id: ModelId) -> ConfigData:
download_file_with_retry,
resolve_model_dir,
)
from exo.shared.constants import EXO_MODELS_READ_ONLY_DIRS, EXO_MODELS_DIRS
# Check local directories first before attempting any HF download
logger.info(f"LOCAL CHECK: {model_id} dirs={[str(d) for d in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS)]}")
_normalized = model_id.normalize()
for _search_dir in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS):
_local_config = _search_dir / _normalized / "config.json"
if _local_config.exists():
async with aiofiles.open(_local_config, "r") as f:
return ConfigData.model_validate_json(
await f.read(), context={"model_id": str(model_id)}
)

target_dir = await resolve_model_dir(model_id)
config_path = await download_file_with_retry(
Expand All @@ -369,6 +380,18 @@ async def fetch_safetensors_size(model_id: ModelId) -> Memory:
resolve_model_dir,
)
from exo.shared.types.worker.downloads import ModelSafetensorsIndex
from exo.shared.constants import EXO_MODELS_READ_ONLY_DIRS, EXO_MODELS_DIRS
# Check local directories first before attempting any HF download
logger.info(f"LOCAL CHECK: {model_id} dirs={[str(d) for d in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS)]}")
_normalized = model_id.normalize()
for _search_dir in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS):
_local_index = _search_dir / _normalized / "model.safetensors.index.json"
if _local_index.exists():
async with aiofiles.open(_local_index, "r") as f:
index_data = ModelSafetensorsIndex.model_validate_json(await f.read())
metadata = index_data.metadata
if metadata is not None and metadata.total_size is not None:
return Memory.from_bytes(metadata.total_size)

target_dir = await resolve_model_dir(model_id)
index_path = await download_file_with_retry(
Expand Down
27 changes: 27 additions & 0 deletions src/exo/worker/runner/llm_inference/tool_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,5 +240,32 @@ def make_json_parser() -> ToolParser:
def infer_tool_parser(chat_template: str) -> ToolParser | None:
"""Attempt to auto-infer a tool parser from the chat template."""
if "<tool_call>" in chat_template and "tool_call.name" in chat_template:
# Qwen3 uses <function=Name><parameter=k>v</parameter></function> XML format
# inside <tool_call> tags, not JSON
if "function=" in chat_template:
return make_qwen_xml_parser()
return make_json_parser()
return None


def _parse_qwen_xml_calls(text: str) -> list[ToolCallItem] | None:
"""Parse Qwen3's <function=Name><parameter=k>v</parameter></function> XML format."""
import re
text = text.removeprefix("<tool_call>").removesuffix("</tool_call>").strip()
fn_match = re.match(r"<function=(\w+)>(.*)</function>", text, re.DOTALL)
if not fn_match:
return None
name = fn_match.group(1)
params_text = fn_match.group(2)
arguments: dict[str, str] = {}
for m in re.finditer(r"<parameter=(\w+)>\n?(.*?)\n?</parameter>", params_text, re.DOTALL):
arguments[m.group(1)] = m.group(2).strip()
return [ToolCallItem(name=name, arguments=json.dumps(arguments))]


def make_qwen_xml_parser() -> ToolParser:
return ToolParser(
start_parsing="<tool_call>",
end_parsing="</tool_call>",
_inner_parser=_parse_qwen_xml_calls,
)