diff --git a/src/exo/main.py b/src/exo/main.py index 86a46561be..d901432e4c 100644 --- a/src/exo/main.py +++ b/src/exo/main.py @@ -127,7 +127,8 @@ async def create(cls, args: "Args") -> Self: election = Election( node_id, # If someone manages to assemble 1 MILLION devices into an exo cluster then. well done. good job champ. - seniority=1_000_000 if args.force_master else 0, + is_candidate=not bool(args.bootstrap_peers), + seniority=(2**31 - 1) if args.force_master else 0, # nb: this DOES feedback right now. i have thoughts on how to address this, # but ultimately it seems not worth the complexity election_message_sender=router.sender(topics.ELECTION_MESSAGES), diff --git a/src/exo/shared/election.py b/src/exo/shared/election.py index 958a83d2fa..8f7ccc8199 100644 --- a/src/exo/shared/election.py +++ b/src/exo/shared/election.py @@ -26,10 +26,10 @@ class ElectionMessage(FrozenModel): # Could eventually include a list of neighbour nodes for centrality def __lt__(self, other: Self) -> bool: - if self.clock != other.clock: - return self.clock < other.clock if self.seniority != other.seniority: return self.seniority < other.seniority + if self.clock != other.clock: + return self.clock < other.clock elif self.commands_seen != other.commands_seen: return self.commands_seen < other.commands_seen else: diff --git a/src/exo/shared/models/model_cards.py b/src/exo/shared/models/model_cards.py index 8911b9323c..fd59259372 100644 --- a/src/exo/shared/models/model_cards.py +++ b/src/exo/shared/models/model_cards.py @@ -345,6 +345,17 @@ async def fetch_config_data(model_id: ModelId) -> ConfigData: download_file_with_retry, resolve_model_dir, ) + from exo.shared.constants import EXO_MODELS_READ_ONLY_DIRS, EXO_MODELS_DIRS + # Check local directories first before attempting any HF download + logger.info(f"LOCAL CHECK: {model_id} dirs={[str(d) for d in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS)]}") + _normalized = model_id.normalize() + for _search_dir in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS): + _local_config = _search_dir / _normalized / "config.json" + if _local_config.exists(): + async with aiofiles.open(_local_config, "r") as f: + return ConfigData.model_validate_json( + await f.read(), context={"model_id": str(model_id)} + ) target_dir = await resolve_model_dir(model_id) config_path = await download_file_with_retry( @@ -369,6 +380,18 @@ async def fetch_safetensors_size(model_id: ModelId) -> Memory: resolve_model_dir, ) from exo.shared.types.worker.downloads import ModelSafetensorsIndex + from exo.shared.constants import EXO_MODELS_READ_ONLY_DIRS, EXO_MODELS_DIRS + # Check local directories first before attempting any HF download + logger.info(f"LOCAL CHECK: {model_id} dirs={[str(d) for d in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS)]}") + _normalized = model_id.normalize() + for _search_dir in (*EXO_MODELS_READ_ONLY_DIRS, *EXO_MODELS_DIRS): + _local_index = _search_dir / _normalized / "model.safetensors.index.json" + if _local_index.exists(): + async with aiofiles.open(_local_index, "r") as f: + index_data = ModelSafetensorsIndex.model_validate_json(await f.read()) + metadata = index_data.metadata + if metadata is not None and metadata.total_size is not None: + return Memory.from_bytes(metadata.total_size) target_dir = await resolve_model_dir(model_id) index_path = await download_file_with_retry( diff --git a/src/exo/worker/runner/llm_inference/tool_parsers.py b/src/exo/worker/runner/llm_inference/tool_parsers.py index 26140d9c4a..ec6990bb3e 100644 --- a/src/exo/worker/runner/llm_inference/tool_parsers.py +++ b/src/exo/worker/runner/llm_inference/tool_parsers.py @@ -240,5 +240,32 @@ def make_json_parser() -> ToolParser: def infer_tool_parser(chat_template: str) -> ToolParser | None: """Attempt to auto-infer a tool parser from the chat template.""" if "" in chat_template and "tool_call.name" in chat_template: + # Qwen3 uses v XML format + # inside tags, not JSON + if "function=" in chat_template: + return make_qwen_xml_parser() return make_json_parser() return None + + +def _parse_qwen_xml_calls(text: str) -> list[ToolCallItem] | None: + """Parse Qwen3's v XML format.""" + import re + text = text.removeprefix("").removesuffix("").strip() + fn_match = re.match(r"(.*)", text, re.DOTALL) + if not fn_match: + return None + name = fn_match.group(1) + params_text = fn_match.group(2) + arguments: dict[str, str] = {} + for m in re.finditer(r"\n?(.*?)\n?", params_text, re.DOTALL): + arguments[m.group(1)] = m.group(2).strip() + return [ToolCallItem(name=name, arguments=json.dumps(arguments))] + + +def make_qwen_xml_parser() -> ToolParser: + return ToolParser( + start_parsing="", + end_parsing="", + _inner_parser=_parse_qwen_xml_calls, + )