diff --git a/cecli/__init__.py b/cecli/__init__.py index 6ea9f35ef0a..66fb65c8e64 100644 --- a/cecli/__init__.py +++ b/cecli/__init__.py @@ -1,6 +1,6 @@ from packaging import version -__version__ = "0.100.8.dev" +__version__ = "0.100.11.dev" safe_version = __version__ try: diff --git a/cecli/args.py b/cecli/args.py index 83286782492..a9dbab5114c 100644 --- a/cecli/args.py +++ b/cecli/args.py @@ -43,15 +43,10 @@ def get_parser(default_config_files, git_root): # List of valid edit formats for argparse validation & shtab completion. # Dynamically gather them from the registered coder classes so the list # stays in sync if new formats are added. - from cecli import coders as _cecli_coders + from cecli.coders import EDIT_FORMAT_MAP + + edit_format_choices = sorted(EDIT_FORMAT_MAP.keys()) - edit_format_choices = sorted( - { - c.edit_format - for c in _cecli_coders.__all__ - if hasattr(c, "edit_format") and c.edit_format is not None - } - ) group = parser.add_argument_group("Main model") group.add_argument( "files", metavar="FILE", nargs="*", help="files to edit with an LLM (optional)" @@ -1115,6 +1110,16 @@ def get_parser(default_config_files, git_root): " specified, a default command for your OS may be used." ), ) + group.add_argument( + "--notification-bell", + action=argparse.BooleanOptionalAction, + default=True, + help=( + "Allow notification commands to produce an audible bell. When enabled, command" + " output is not suppressed so terminal bell escape sequences can ring through" + " (default: True)" + ), + ) group.add_argument( "--command-prefix", default=None, diff --git a/cecli/coders/__init__.py b/cecli/coders/__init__.py index e5693180774..7c6c6abfc5a 100644 --- a/cecli/coders/__init__.py +++ b/cecli/coders/__init__.py @@ -44,6 +44,26 @@ } +EDIT_FORMAT_MAP = { + "help": "HelpCoder", + "ask": "AskCoder", + "diff": "EditBlockCoder", + "diff-fenced": "EditBlockFencedCoder", + "whole": "WholeFileCoder", + "patch": "PatchCoder", + "udiff": "UnifiedDiffCoder", + "udiff-simple": "UnifiedDiffSimpleCoder", + "architect": "ArchitectCoder", + "editor-diff": "EditorEditBlockCoder", + "editor-whole": "EditorWholeFileCoder", + "editor-diff-fenced": "EditorDiffFencedCoder", + "context": "ContextCoder", + "agent": "AgentCoder", + "hashline": "HashLineCoder", + "subagent": "SubAgentCoder", +} + + def __getattr__(name): if name in _MODULE_MAP: import importlib diff --git a/cecli/coders/base_coder.py b/cecli/coders/base_coder.py index e206fdae6de..b752735b25c 100755 --- a/cecli/coders/base_coder.py +++ b/cecli/coders/base_coder.py @@ -105,28 +105,6 @@ def wrap_fence(name): wrap_fence("sourcecode"), ] -# Map of edit_format values to coder class names. -# Used by Coder.create() to find the right coder class by edit_format -# without importing all coder modules. -EDIT_FORMAT_MAP = { - "help": "HelpCoder", - "ask": "AskCoder", - "diff": "EditBlockCoder", - "diff-fenced": "EditBlockFencedCoder", - "whole": "WholeFileCoder", - "patch": "PatchCoder", - "udiff": "UnifiedDiffCoder", - "udiff-simple": "UnifiedDiffSimpleCoder", - "architect": "ArchitectCoder", - "editor-diff": "EditorEditBlockCoder", - "editor-whole": "EditorWholeFileCoder", - "editor-diff-fenced": "EditorDiffFencedCoder", - "context": "ContextCoder", - "agent": "AgentCoder", - "hashline": "HashLineCoder", - "subagent": "SubAgentCoder", -} - class UsageMeta(type): """Metaclass that provides shared accumulator properties across all Coder subclasses. @@ -336,6 +314,7 @@ async def create( uuid=from_coder.uuid, parent_uuid=from_coder.parent_uuid, repo=from_coder.repo, + summarizer=from_coder.summarizer, ) use_kwargs.update(update) # override to complete the switch use_kwargs.update(kwargs) # override passed kwargs @@ -351,7 +330,7 @@ async def create( res = coders.CopyPasteCoder(main_model, io, args=args, **kwargs) if not res: - coder_name = EDIT_FORMAT_MAP.get(edit_format) + coder_name = coders.EDIT_FORMAT_MAP.get(edit_format) if coder_name: coder_cls = getattr(coders, coder_name) res = coder_cls(main_model, io, args=args, **kwargs) @@ -373,10 +352,14 @@ async def create( await res.initialize_mcp_tools() - res.original_kwargs = dict(kwargs) + # Store only small/primitive kwargs to avoid retaining large object references. + # Large objects (repo, mcp_manager, commands, summarizer, file_watcher, etc.) + # are either overridden during clone() or accessible from instance attributes. + _LARGE_KWARGS = {"repo", "mcp_manager", "commands", "summarizer", "file_watcher"} + res.original_kwargs = {k: v for k, v in kwargs.items() if k not in _LARGE_KWARGS} return res - valid_formats = list(EDIT_FORMAT_MAP.keys()) + valid_formats = list(coders.EDIT_FORMAT_MAP.keys()) raise UnknownEditFormat(edit_format, valid_formats) async def clone(self, **kwargs): diff --git a/cecli/coders/copypaste_coder.py b/cecli/coders/copypaste_coder.py index ec92f522e4b..5df361291d5 100644 --- a/cecli/coders/copypaste_coder.py +++ b/cecli/coders/copypaste_coder.py @@ -7,145 +7,134 @@ from cecli.exceptions import LiteLLMExceptions from cecli.llm import litellm -from .base_coder import Coder +from .base_coder import Coder, EmptyResponseError class CopyPasteCoder(Coder): - """Coder implementation that performs clipboard-driven interactions. + """Fixed marker class for isinstance checks and backward compatibility. - This coder swaps the transport mechanism (clipboard vs API) but must remain compatible with the - base ``Coder`` interface. In particular, many base methods assume ``self.gpt_prompts`` exists. + When ``copy_paste_mode`` is active, instantiating this class automatically + returns an instance of a dynamic subclass that inherits from **both** this + class and the coder matching the selected ``edit_format`` (e.g. + ``AgentCoder``, ``EditBlockCoder``, ``AskCoder``, …). The dynamic subclass + overrides ``send()`` with the clipboard-transport logic while inheriting + all other behaviour from the target coder. - We therefore mirror the prompt pack from the coder that matches the currently selected - ``edit_format``. + This fixed class exists only so that ``isinstance(coder, CopyPasteCoder)`` + works and the name remains importable from ``cecli.coders``. """ - # CopyPasteCoder doesn't have its own prompt format - it dynamically determines - # prompts based on the selected edit_format + # CopyPasteCoder doesn't have its own prompt format. prompt_format = None - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __new__(cls, *args, **kwargs): + """Intercept instantiation to return a dynamic subclass instance. - # Ensure CopyPasteCoder always has a prompt pack. - # We mirror prompts from the coder that matches the active edit format. - self._init_prompts_from_selected_edit_format() + When ``CopyPasteCoder()`` is called, this creates a dynamic subclass + that inherits from **both** ``CopyPasteCoder`` (for isinstance checks) + and the coder class matching the selected ``edit_format``. - @property - def gpt_prompts(self): - """Override gpt_prompts property for CopyPasteCoder. - - CopyPasteCoder dynamically determines prompts based on the selected edit format. - This property returns the prompts that were set by _init_prompts_from_selected_edit_format(). - """ - if not hasattr(self, "_gpt_prompts"): - raise AttributeError( - "CopyPasteCoder must call _init_prompts_from_selected_edit_format() " - "before accessing gpt_prompts" - ) - return self._gpt_prompts - - @gpt_prompts.setter - def gpt_prompts(self, value): - """Setter for gpt_prompts property.""" - self._gpt_prompts = value - - def _init_prompts_from_selected_edit_format(self): - """Initialize ``self.gpt_prompts`` based on the currently selected edit format. - - This prevents AttributeError crashes when base ``Coder`` code assumes ``self.gpt_prompts`` - exists (eg during message formatting, announcements, cancellation/cleanup paths, etc). + Python will automatically call ``__init__`` on the returned instance + because ``isinstance(result, CopyPasteCoder)`` is ``True``. """ - # Determine the selected edit_format the same way Coder.create() does. - selected_edit_format = None - if getattr(self, "args", None) is not None and getattr(self.args, "edit_format", None): - selected_edit_format = self.args.edit_format - else: - selected_edit_format = getattr(self.main_model, "edit_format", None) - - # "code" is treated like None in Coder.create() - if selected_edit_format == "code": - selected_edit_format = None - - # If no edit format is selected, fall back to model default. - if selected_edit_format is None: - selected_edit_format = getattr(self.main_model, "edit_format", None) - - # Find the coder class that would have been selected for this edit_format. - try: - import cecli.coders as coders - except Exception: - coders = None - - target_coder_class = None - if coders is not None and selected_edit_format: - # Use EDIT_FORMAT_MAP from base_coder to find the right coder class by edit_format - from cecli.coders.base_coder import EDIT_FORMAT_MAP - - coder_name = EDIT_FORMAT_MAP.get(selected_edit_format) - if coder_name: - target_coder_class = getattr(coders, coder_name, None) - - # Mirror prompt pack + edit_format where available. - if target_coder_class is not None: - # All coder classes must have prompt_format attribute - if ( - not hasattr(target_coder_class, "prompt_format") - or target_coder_class.prompt_format is None - ): - raise AttributeError( - f"Target coder class {target_coder_class.__name__} must have a 'prompt_format'" - " attribute." - ) + # If already instantiating a dynamic subclass, create normally + if cls is not CopyPasteCoder: + return super().__new__(cls) + + # --- resolve the effective edit format ------------------------------- + main_model = args[0] if args else None + args_obj = kwargs.get("args") + edit_format = None + if args_obj is not None: + edit_format = getattr(args_obj, "edit_format", None) + if not edit_format or edit_format == "code": + edit_format = getattr(main_model, "edit_format", None) + + # --- delegate to the factory function -------------------------------- + dynamic_cls = get_copy_paste_coder_class(edit_format, main_model) + return object.__new__(dynamic_cls) - prompt_name = target_coder_class.prompt_format - - # Get prompts from cache or load them - if prompt_name in Coder._prompt_cache: - self.gpt_prompts = Coder._prompt_cache[prompt_name] - else: - # Create a dummy instance to trigger prompt loading - dummy_instance = target_coder_class.__new__(target_coder_class) - dummy_instance.__class__ = target_coder_class - self.gpt_prompts = dummy_instance.gpt_prompts + prompt_format = None - # Keep announcements/formatting consistent with the selected coder. - self.edit_format = getattr(target_coder_class, "edit_format", self.edit_format) - return - # Last-resort fallback: avoid crashing if we can't determine the prompts. - # Prefer keeping any existing gpt_prompts (if one was set elsewhere). - if not hasattr(self, "_gpt_prompts"): - self.gpt_prompts = None +def get_copy_paste_coder_class(edit_format, main_model): + """Dynamically create a CopyPasteCoder class that inherits from the + coder class matching the selected ``edit_format``. + ``CopyPasteCoder`` is a transport-layer wrapper: it swaps the API + transport for clipboard transport while inheriting all behavioural + traits (editing, prompting, announcements, …) from the target coder + class (e.g. ``AgentCoder``, ``EditBlockCoder``, ``AskCoder``, …). + """ + import cecli.coders as coders + + # --- resolve the effective edit format ----------------------------------- + effective_format = edit_format + if not effective_format or effective_format == "code": + effective_format = getattr(main_model, "edit_format", None) + + # --- find the target base class ------------------------------------------ + if effective_format: + coder_name = coders.EDIT_FORMAT_MAP.get(effective_format) + base_class = getattr(coders, coder_name, Coder) if coder_name else Coder + else: + base_class = Coder + + # ---- create the dynamic class first so methods can reference it ----- + # Use a different local variable name to avoid shadowing the + # module-level CopyPasteCoder (needed for the bases tuple). + DynamicCopyPasteCoder = type( + "CopyPasteCoder", + (CopyPasteCoder, base_class), + { + "__module__": __name__, + "prompt_format": getattr(base_class, "prompt_format", None), + }, + ) + + # ---- clipboard-specific transport method -------------------------------- async def send(self, messages, model=None, functions=None, tools=None): model = model or self.main_model if getattr(model, "copy_paste_transport", "api") == "api": - async for chunk in super().send( - messages, model=model, functions=functions, tools=tools + # Fall through to the base coder's normal send() + async for chunk in base_class.send( + self, messages, model=model, functions=functions, tools=tools ): yield chunk return - if functions: - self.io.tool_warning("copy/paste mode ignores function call requests.") - if tools: - self.io.tool_warning("copy/paste mode ignores tool call requests.") + self.interrupt_event.clear() + self.got_reasoning_content = False + self.ended_reasoning_content = False + self.empty_response = False + self._streaming_buffer_length = 0 self.io.reset_streaming_response() - # Base Coder methods (eg show_send_output/preprocess_response) expect these streaming - # attributes to always exist, even when we bypass the normal API streaming path. + # Base Coder methods (eg show_send_output/preprocess_response) expect these + # streaming attributes to always exist, even when we bypass the normal API + # streaming path. self.partial_response_content = "" - self.partial_response_function_call = None - # preprocess_response() does len(self.partial_response_tool_calls), so it must not be None. + self.partial_response_reasoning_content = "" + self.partial_response_chunks = [] self.partial_response_tool_calls = [] + self.partial_response_function_call = dict() + # preprocess_response() does len(self.partial_response_tool_calls), + # so it must not be None. + self.partial_response_consolidated = None try: hash_object, completion = self.copy_paste_completion(messages, model) self.chat_completion_call_hashes.append(hash_object.hexdigest()) await self.show_send_output(completion) + + if self.empty_response: + raise EmptyResponseError + + response, func_err, content_err = self.consolidate_chunks() + if response: + completion = response self.calculate_and_show_tokens_and_cost(messages, completion) finally: self.preprocess_response() @@ -153,10 +142,12 @@ async def send(self, messages, model=None, functions=None, tools=None): if self.partial_response_content: self.io.ai_output(self.partial_response_content) + # ---- clipboard completion logic ------------------------------------------ def copy_paste_completion(self, messages, model): + try: from cecli.helpers import copypaste - except ImportError: # pragma: no cover - import error path + except ImportError: self.io.tool_error("copy/paste mode requires the pyperclip package.") self.io.tool_output("Install it with: pip install pyperclip") raise @@ -197,9 +188,22 @@ def content_to_text(content): prompt_text = "\n\n".join(lines).strip() + # --- incremental context: strip previously-sent exchange --- + last_prompt = getattr(self, "_last_prompt_text", None) + last_response = getattr(self, "_last_response_text", None) + _full_prompt = prompt_text # Keep the original for storage + if last_prompt is not None and last_response is not None: + expected_prefix = f"{last_prompt}\n\nASSISTANT:\n{last_response}" + if prompt_text.startswith(expected_prefix): + new_only = prompt_text[len(expected_prefix) :].strip() + if new_only: + prompt_text = new_only + + self._last_prompt_text = _full_prompt + try: copypaste.copy_to_clipboard(prompt_text) - except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + except copypaste.ClipboardError as err: self.io.tool_error(f"Unable to copy prompt to clipboard: {err}") raise @@ -209,17 +213,18 @@ def content_to_text(content): try: last_value = copypaste.read_clipboard() - except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + except copypaste.ClipboardError as err: self.io.tool_error(f"Unable to read clipboard: {err}") raise try: response_text = copypaste.wait_for_clipboard_change(initial=last_value) - except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + except copypaste.ClipboardError as err: self.io.tool_error(f"Unable to read clipboard: {err}") raise - # Estimate tokens locally using the model's tokenizer; fallback to heuristic. + self._last_response_text = response_text + def _safe_token_count(text): """Return token count via the model tokenizer, falling back to a heuristic.""" if not text: @@ -229,7 +234,6 @@ def _safe_token_count(text): if isinstance(count, int) and count >= 0: return count except Exception as ex: - # Try to map known LiteLLM exceptions to user-friendly messages, then fall back. try: ex_info = LiteLLMExceptions().get_ex_info(ex) if ex_info and ex_info.description: @@ -237,7 +241,6 @@ def _safe_token_count(text): f"Token count failed: {ex_info.description} Falling back to heuristic." ) except Exception: - # Avoid masking the original issue during error mapping. pass return int(math.ceil(len(text) / 4)) @@ -264,5 +267,10 @@ def _safe_token_count(text): ) kwargs = dict(model=model.name, messages=messages, stream=False) - hash_object = hashlib.sha1(json.dumps(kwargs, sort_keys=True).encode()) # nosec B324 + hash_object = hashlib.sha1(json.dumps(kwargs, sort_keys=True).encode()) return hash_object, completion + + # ---- attach the clipboard methods and return ----------------------------- + DynamicCopyPasteCoder.send = send + DynamicCopyPasteCoder.copy_paste_completion = copy_paste_completion + return DynamicCopyPasteCoder diff --git a/cecli/commands/core.py b/cecli/commands/core.py index 53aabd70e8c..29823d6cddd 100644 --- a/cecli/commands/core.py +++ b/cecli/commands/core.py @@ -1,6 +1,7 @@ import json import re import sys +import weakref from pathlib import Path from cecli.commands.utils.registry import CommandRegistry @@ -55,6 +56,18 @@ def __init__(self, message="Reloading program configuration...", **kwargs): class Commands: scraper = None + def _get_coder(self): + """Return coder via weak reference, or None if collected.""" + if self._coder_ref is not None: + return self._coder_ref() + return None + + def _set_coder(self, value): + """Store coder as weakref to break circular reference chains.""" + self._coder_ref = weakref.ref(value) if value is not None else None + + coder = property(_get_coder, _set_coder) + def clone(self): cloned = Commands( self.io, @@ -87,7 +100,8 @@ def __init__( original_read_only_fnames=None, ): self.io = io - self.coder = coder + # Use weak ref to avoid circular reference chains + self._coder_ref = weakref.ref(coder) if coder else None self.parser = parser self.args = args self.verbose = verbose diff --git a/cecli/helpers/conversation/files.py b/cecli/helpers/conversation/files.py index db52dca4b2e..ca239aeb92f 100644 --- a/cecli/helpers/conversation/files.py +++ b/cecli/helpers/conversation/files.py @@ -117,7 +117,7 @@ def add_file( content = coder.io.read_text(abs_fname, silent=True) if coder.hashlines: content, json_str = hashline_formatted( - content, file_name=abs_fname, partial=False + content, file_name=abs_fname, partial=False, expanded=False ) self._file_json_contents[abs_fname] = json_str except Exception: @@ -237,7 +237,7 @@ def generate_diff(self, fname: str) -> Optional[str]: current_content = coder.io.read_text(abs_fname, silent=True) if coder.hashlines: current_content, _ = hashline_formatted( - current_content, file_name=abs_fname, partial=False + current_content, file_name=abs_fname, partial=False, expanded=False ) except Exception: return None @@ -260,6 +260,7 @@ def generate_diff(self, fname: str) -> Optional[str]: new_content=current_content, fromfile=f"{rel_fname} (snapshot)", tofile=f"{rel_fname} (current)", + context_lines=3, ) # If there's a diff, update the last snapshot with current content @@ -606,7 +607,11 @@ def get_file_context(self, file_path: str, all_ranges=False, check_versions=True continue _, json_str = hashline_formatted( - range_content, file_name=abs_fname, partial=True, start_line=start_line_adj + range_content, + file_name=abs_fname, + partial=True, + expanded=False, + start_line=start_line_adj, ) results.append(json.loads(json_str)) diff --git a/cecli/helpers/conversation/integration.py b/cecli/helpers/conversation/integration.py index 7510976817d..464023f6a3e 100644 --- a/cecli/helpers/conversation/integration.py +++ b/cecli/helpers/conversation/integration.py @@ -104,6 +104,10 @@ def add_system_messages(self) -> None: priority=75 + i, ) + if getattr(coder, "copy_paste_mode", False): + # Add copy-paste mode tool instructions for inline tool calling + self.add_copy_paste_tool_instructions() + if self._cancel_post_message_injections(): return @@ -1035,6 +1039,84 @@ def _cancel_post_message_injections(self, modulus=10): return False + def add_copy_paste_tool_instructions(self) -> None: + """ + Add tool instructions for copy/paste mode. + + When in copy/paste mode, the model cannot use native API tool calling so + we inject instructions on how to embed tool calls inline using XML, JSON, + or bracket formats. These are parsed out by the extract_tools_from_content_* + functions in ``responses.py``. + """ + coder = self.get_coder() + if not coder: + return + + tool_list = coder.get_tool_list() + if not tool_list: + return + + # Prepare a clean, serializable version of the tool list + tool_descriptions = [] + for prefixed_tool in tool_list: + if isinstance(prefixed_tool, dict): + func = prefixed_tool.get("function", {}) + tool_descriptions.append( + { + "name": func.get("name", ""), + "description": func.get("description", ""), + "parameters": func.get("parameters", {}), + } + ) + elif hasattr(prefixed_tool, "function") and hasattr(prefixed_tool.function, "name"): + tool_descriptions.append( + { + "name": getattr(prefixed_tool.function, "name", ""), + "description": getattr(prefixed_tool.function, "description", ""), + "parameters": getattr(prefixed_tool.function, "parameters", {}), + } + ) + + tool_list_json = json.dumps(tool_descriptions, indent=2) + + inline_tool_instructions = ( + '\n' + "You are in copy/paste mode. Since the LLM API transport is unavailable, " + "you MUST embed tool calls directly in your response content using one of the " + "following formats. The system will parse these out and execute them.\n\n" + "Available tools:\n" + f"{tool_list_json}\n\n" + "TOOL CALLING FORMATS (use any of these):\n\n" + "1. XML Function Format:\n" + " \n" + " param_value_json\n" + " \n\n" + " Example:\n" + " \n" + ' [{"file_path": "example.py", "range_start": "def hello",' + ' "range_end": "def goodbye"}]\n' + " \n\n" + "2. JSON Tool-Call Format:\n" + ' Embed a JSON object with "name" and "arguments" keys.\n\n' + " Example:\n" + ' {"name": "Local--ReadRange", "arguments": {"read": [{"file_path": "example.py",' + ' "range_start": "class A", "range_end": "class Z"}]}}\n\n' + "3. Bracket Format:\n" + " [ToolName(key1=value1, key2=value2)]\n\n" + " Example:\n" + ' [Local--ReadRange(read=[{"file_path": "example.py", "range_start": "class A"}])]\n\n' + "IMPORTANT: Use the FULL prefixed tool name " + '(e.g., "Local--ReadRange" not just "ReadRange").\n' + "" + ) + + ConversationService.get_manager(coder).add_message( + message_dict={"role": "system", "content": inline_tool_instructions}, + tag=MessageTag.SYSTEM, + hash_key=("main", "copy_paste_tools"), + force=True, + ) + def _shuffle_reminders(self, content: str) -> str: """ If the string is a critical_reminders block, shuffle all bulleted points diff --git a/cecli/helpers/hashline.py b/cecli/helpers/hashline.py index 359fc2ca3a8..e886558f1d8 100644 --- a/cecli/helpers/hashline.py +++ b/cecli/helpers/hashline.py @@ -1,6 +1,7 @@ import difflib import json import re +from collections import Counter from cecli.helpers.hashpos.hashpos import HashPos @@ -29,7 +30,7 @@ def hashline(text: str, start_line: int = 1) -> str: def hashline_formatted( - text: str, file_name: str, partial: bool, start_line: int = 1 + text: str, file_name: str, partial: bool, expanded: bool, start_line: int = 1 ) -> tuple[str, str]: """ Generate hashline-formatted content and return it as both raw hashline text and a JSON structure. @@ -55,6 +56,7 @@ def hashline_formatted( "start_line": start_line, "end_line": end_line, "partial": partial, + "expanded": expanded, "prefixed_contents": prefixed, } @@ -1427,109 +1429,134 @@ def _apply_closure_safeguard( original_content: str, resolved_ops: list, file_path: str = None, -) -> list: +) -> tuple[list, list]: """ - Use tree-sitter to heal edit boundaries by simulating edits and checking syntax. + Use tree-sitter to heal edit boundaries by simulating edits and scoring by AST health. + Evicts edits that strictly increase the number of syntax errors. - For each replace/delete operation, this simulates applying the edit with - the replacement text, parses the resulting code with tree-sitter, and - checks if the result is syntactically valid (no ERROR/MISSING nodes). - - If the edit produces invalid syntax (e.g., missing closing braces), - it progressively expands/contracts the start and end boundaries by a - step and re-tests until the resulting code parses correctly or the - maximum number of expansion steps is exhausted. + Returns: + tuple: (safe_resolved_ops, rejected_ops) + - safe_resolved_ops: Operations with healed boundaries + - rejected_ops: List of dicts with {"index": int, "error": str} for evicted operations + """ + if not resolved_ops or not file_path: + return resolved_ops, [] - This prevents the common LLM edit error of "eating" outer scope - closing braces, parentheses, or brackets by finding the nearest - syntactically valid edit boundary. + from cecli.helpers.grep_ast.parsers import filename_to_lang + from cecli.helpers.grep_ast.tsl import get_language, get_parser - Args: - original_content: Original source code (without hashlines) - resolved_ops: List of resolved operation dicts with start_idx/end_idx - file_path: File path to determine tree-sitter language + lang = filename_to_lang(file_path) + if not lang: + return resolved_ops, [] - Returns: - Modified resolved_ops with healed boundaries - """ + try: + language = get_language(lang) # noqa + parser = get_parser(lang) + except Exception: + return resolved_ops, [] - def is_syntactically_valid(source_bytes: bytes, parser) -> bool: - try: - tree = parser.parse(source_bytes) - return not tree.root_node.has_error - except Exception: - return False + source_lines = original_content.splitlines() + source_bytes = original_content.encode("utf-8") def apply_edit( - source_lines: list[str], + lines: list[str], start: int, end: int, replacement: list[str], ) -> bytes: """Applies the edit and returns the new source code as bytes.""" - new_lines = source_lines[:start] + replacement + source_lines[end + 1 :] + new_lines = lines[:start] + replacement + lines[end + 1 :] return "\n".join(new_lines).encode("utf-8") - def count_closures(source_bytes: bytes) -> int: - """Counts the total number of brackets, braces, and parenthesis.""" - return sum(source_bytes.count(b) for b in b"{}[]()") + def count_ast_errors(node) -> int: + """ + Recursively counts MISSING or ERROR nodes in the AST. + """ + errors = 0 + if node.is_missing or node.type == "ERROR": + errors += 1 + for child in node.children: + errors += count_ast_errors(child) + return errors + + def _detect_indent_unit(source_lines: list) -> int: + """Detect the common indentation unit from non-empty source lines.""" + indents = [] + for line in source_lines: + if line.strip(): # non-empty + indent = len(line) - len(line.lstrip()) + indents.append(indent) + + if len(indents) < 2: + return 0 + + # Compute differences between consecutive non-empty lines + diffs = [] + for i in range(1, len(indents)): + diff = abs(indents[i] - indents[i - 1]) + if diff > 0: + diffs.append(diff) + + if not diffs: + return 0 + + # Most common non-zero difference is the indentation unit + return Counter(diffs).most_common(1)[0][0] + + def _has_indent_jump(test_lines: list, unit: int) -> bool: + """Check if the edit result creates an indentation jump of 2+ units.""" + if unit == 0: + return False - def get_indentation(line: str) -> int: - """Helper to safely calculate the leading whitespace of a line.""" - return len(line) - len(line.lstrip(" \t")) + prev_indent = None + for line in test_lines: + if line.strip(): # non-empty + indent = len(line) - len(line.lstrip()) + if prev_indent is not None: + diff = abs(indent - prev_indent) + if diff > unit: + return True + prev_indent = indent - if not resolved_ops or not file_path: - return resolved_ops + return False - # Determine language from file path - from cecli.helpers.grep_ast.parsers import filename_to_lang - from cecli.helpers.grep_ast.tsl import get_language, get_parser + # Establish the baseline syntax health of the original file + base_tree = parser.parse(source_bytes) + baseline_error_count = count_ast_errors(base_tree.root_node) - lang = filename_to_lang(file_path) - if not lang: - return resolved_ops - - # Set up tree-sitter parser - try: - language = get_language(lang) # noqa - parser = get_parser(lang) - except Exception: - # Can't determine language, skip safeguard - return resolved_ops + # Detect if the original source follows normal indentation (steps of 1 unit) + indent_unit = _detect_indent_unit(source_lines) + is_normally_indented = indent_unit > 0 - source_lines = original_content.splitlines() + safe_resolved_ops = [] + rejected_ops = [] for resolved in resolved_ops: op = resolved["op"] if op["operation"] not in {"replace", "insert", "delete"}: + safe_resolved_ops.append(resolved) continue llm_start = resolved["start_idx"] llm_end = resolved["end_idx"] - # Clamp to valid bounds if llm_start < 0 or llm_start >= len(source_lines): + safe_resolved_ops.append(resolved) continue if llm_end < llm_start: + safe_resolved_ops.append(resolved) continue - # Get replacement text replacement_text = op.get("text", "") or "" if op["operation"] == "delete": replacement_text = "" repl_lines = replacement_text.splitlines() - # Clamp end to valid range llm_end = min(llm_end, len(source_lines) - 1) - # --- THE HEALING LOOP --- - # Generate all combinations of offsets in {-2, -1, 0, 1, 2} for both - # start and end, then filter to syntactically valid edits and rank - # by cumulative movement (abs(start_shift) + abs(end_shift)) as a - # primary criterion to bias toward minimal range perturbation. - offsets = [0, 1, -1, 2, -2] all_candidates = [] + for start_shift in offsets: for end_shift in offsets: candidate_start = max(0, llm_start - start_shift) @@ -1538,7 +1565,6 @@ def get_indentation(line: str) -> int: if candidate_end < candidate_start: continue - # Skip candidates that overlap with other operations' ranges overlaps = False for other_op in resolved_ops: if other_op is resolved: @@ -1552,87 +1578,61 @@ def get_indentation(line: str) -> int: if overlaps: continue - # Skip candidates that would create duplicate adjacent content at edit boundaries if _would_create_duplicate_content( source_lines, candidate_start, candidate_end, repl_lines ): continue - test_source = apply_edit( + test_source_bytes = apply_edit( source_lines, candidate_start, candidate_end, repl_lines, ) - if is_syntactically_valid(test_source, parser): - # Determine properties for tiebreaking - is_partial = (start_shift == 0) ^ (end_shift == 0) # XOR: exactly one is zero - is_downward = start_shift <= 0 # Negative/zero shift = moving down - is_both = start_shift == end_shift # Whole range expansion/contraction - closures = count_closures(test_source) - start_line_match = source_lines[candidate_start] == source_lines[llm_start] - end_line_match = source_lines[candidate_end] == source_lines[llm_end] - cumulative_movement = abs(start_shift) + abs(end_shift) - - # --- INDENTATION SCORING --- - indent_score = 0 - if repl_lines: - # Safely grab the first and last non-empty replacement lines - first_repl = next((line for line in repl_lines if line.strip()), "") - last_repl = next( - (line for line in reversed(repl_lines) if line.strip()), "" - ) - - # Check Start Boundary Match - if first_repl and candidate_start < len(source_lines): - source_start_line = source_lines[candidate_start] - if source_start_line.strip() and get_indentation( - first_repl - ) == get_indentation(source_start_line): - indent_score += 1 - - # Check End Boundary Match - if last_repl and candidate_end < len(source_lines): - source_end_line = source_lines[candidate_end] - if source_end_line.strip() and get_indentation( - last_repl - ) == get_indentation(source_end_line): - indent_score += 1 - - all_candidates.append( - { - "start_idx": candidate_start, - "end_idx": candidate_end, - "start_line_match": start_line_match, - "end_line_match": end_line_match, - "source_len": len(test_source), - "is_partial": is_partial, - "is_downward": is_downward, - "is_both": is_both, - "closure_count": closures, - "indent_score": indent_score, - "cumulative_movement": cumulative_movement, - "offsets": (start_shift, end_shift), - } - ) + # Parse the candidate to get structural truth + edited_tree = parser.parse(test_source_bytes) + ast_error_count = count_ast_errors(edited_tree.root_node) + + is_partial = (start_shift == 0) ^ (end_shift == 0) + is_downward = start_shift <= 0 + is_both = start_shift == end_shift + start_line_match = source_lines[candidate_start] == source_lines[llm_start] + end_line_match = source_lines[candidate_end] == source_lines[llm_end] + cumulative_movement = abs(start_shift) + abs(end_shift) + + # Check for indentation jumps in the resulting edit + if is_normally_indented: + test_lines = test_source_bytes.decode("utf-8").splitlines() + has_indent_jump = _has_indent_jump(test_lines, indent_unit) + else: + has_indent_jump = False + + all_candidates.append( + { + "start_idx": candidate_start, + "end_idx": candidate_end, + "ast_error_count": ast_error_count, + "start_line_match": start_line_match, + "end_line_match": end_line_match, + "cumulative_movement": cumulative_movement, + "has_indent_jump": has_indent_jump, + "is_partial": is_partial, + "is_downward": is_downward, + "is_both": is_both, + } + ) if all_candidates: - # Sort using the new hierarchy: - # 1. Cumulative movement (ascending — less movement is better) - # 2. Fewest total closures (minimize structural pollution) - # 3. Indentation Score (Descending: 2 is better than 0) - # 4. Longest source (preserve more file content) - # 5. Partial expansions over full range shifts - # 6. Downward changes over upward changes + # 1. AST Health is the absolute highest priority. + # 2. Cumulative movement acts as the tie-breaker for equally valid ASTs. all_candidates.sort( key=lambda r: ( + r["ast_error_count"], # Ascending: fewer syntax errors wins not r["start_line_match"], # Descending: True first not r["end_line_match"], # Descending: True first + r.get("has_indent_jump", False), # Ascending: False (no jump) first r["cumulative_movement"], # Ascending: smaller is better - -r["indent_score"], # Descending: larger score is better (2 > 1 > 0) - -r["source_len"], # Descending: larger is better - r["closure_count"], # Ascending: smaller is better not r["is_partial"], # Booleans: False comes before True not r["is_downward"], # Booleans: False comes before True r["is_both"], # Booleans: False comes before True @@ -1640,10 +1640,30 @@ def get_indentation(line: str) -> int: ) best = all_candidates[0] + + # EVICTION GATE: If the absolute best boundary still introduces NEW + # syntax errors compared to the original file, discard the edit entirely. + if best["ast_error_count"] > baseline_error_count: + + rejected_ops.append( + { + "index": resolved["index"], + "error": ( + f"Edit introduces new syntax error(s) (baseline: {baseline_error_count}, " + f"after edit: {best['ast_error_count']}) and none of the " + f"{len(all_candidates)} candidate boundary adjustments could resolve it" + ), + "operation": op, + } + ) + continue + resolved["start_idx"] = best["start_idx"] resolved["end_idx"] = best["end_idx"] - return resolved_ops + safe_resolved_ops.append(resolved) + + return safe_resolved_ops, rejected_ops def apply_hashline_operations( @@ -1794,7 +1814,16 @@ def apply_hashline_operations( if file_path: # Apply tree-sitter based closure safeguard to snap boundaries to AST nodes - resolved_ops = _apply_closure_safeguard(original_content, resolved_ops, file_path) + safe_ops, rejected_ops = _apply_closure_safeguard(original_content, resolved_ops, file_path) + resolved_ops = safe_ops + for rejected in rejected_ops: + failed_ops.append( + { + "index": rejected["index"], + "error": rejected["error"], + "operation": rejected["operation"], + } + ) # Fix edit boundaries where replacement content duplicates adjacent lines source_lines = original_content.splitlines() diff --git a/cecli/helpers/skills.py b/cecli/helpers/skills.py index 40bf2a3bd45..e5331de7ba8 100644 --- a/cecli/helpers/skills.py +++ b/cecli/helpers/skills.py @@ -6,6 +6,7 @@ """ import re +import weakref from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional @@ -69,7 +70,7 @@ def __init__( self.include_list = set(include_list) if include_list else None self.exclude_list = set(exclude_list) if exclude_list else set() self.git_root = Path(git_root).expanduser().resolve() if git_root else None - self.coder = coder # Weak reference to coder instance + self._coder_ref = weakref.ref(coder) if coder else None # Weak reference to coder instance # Cache for loaded skills self._skills_cache: Dict[str, SkillContent] = {} @@ -105,6 +106,18 @@ def __init__( # Save initial state from config + def _get_coder(self): + """Return coder via weak reference, or None if collected.""" + if self._coder_ref is not None: + return self._coder_ref() + return None + + def _set_coder(self, value): + """Store coder as weakref to break circular reference chains.""" + self._coder_ref = weakref.ref(value) if value is not None else None + + coder = property(_get_coder, _set_coder) + def _save_state(self): """Save current mutable state to the global skill state store. diff --git a/cecli/io.py b/cecli/io.py index a140bd516a9..d7633a52579 100644 --- a/cecli/io.py +++ b/cecli/io.py @@ -367,6 +367,7 @@ def __init__( root=".", notifications=False, notifications_command=None, + notification_bell=True, verbose=False, ): self.console = Console() @@ -384,6 +385,7 @@ def __init__( self.multiline_mode = multiline_mode self.bell_on_next_input = False self.notifications = notifications + self.notification_bell = notification_bell self.verbose = verbose self.profile_start_time = None self.profile_last_time = None @@ -1692,31 +1694,39 @@ def llm_started(self): self.bell_on_next_input = True def get_default_notification_command(self): - """Return a default notification command based on the operating system.""" + """Return a command that triggers a system bell followed by a notification.""" import platform + import shutil system = platform.system() - if system == "Darwin": # macOS - # Check for terminal-notifier first + # 1. Define the Bell component + if system == "Windows": + bell_cmd = "powershell -c [Console]::Beep(1000, 200)" + elif system == "Darwin": + bell_cmd = "osascript -e 'beep'" + else: # Linux/Unix + bell_cmd = "tput bel" if shutil.which("tput") else "echo -e '\\a'" + + # 2. Define the Notification component + notif_cmd = None + if system == "Darwin": if shutil.which("terminal-notifier"): - return f"terminal-notifier -title 'cecli' -message '{NOTIFICATION_MESSAGE}'" - # Fall back to osascript - return ( - f'osascript -e \'display notification "{NOTIFICATION_MESSAGE}" with title "cecli"\'' - ) + notif_cmd = f"terminal-notifier -title 'cecli' -message '{NOTIFICATION_MESSAGE}'" + else: + notif_cmd = f'osascript -e \'display notification "{NOTIFICATION_MESSAGE}" with title "cecli"\'' + elif system == "Linux": - # Check for common Linux notification tools for cmd in ["notify-send", "zenity"]: if shutil.which(cmd): if cmd == "notify-send": - return f"notify-send 'cecli' '{NOTIFICATION_MESSAGE}'" + notif_cmd = f"notify-send 'cecli' '{NOTIFICATION_MESSAGE}'" elif cmd == "zenity": - return f"zenity --notification --text='{NOTIFICATION_MESSAGE}'" - return None # No known notification tool found + notif_cmd = f"zenity --notification --text='{NOTIFICATION_MESSAGE}'" + break + elif system == "Windows": - # PowerShell toast notification - ps_command = ( + ps_body = ( ' "try {{ Add-Type -AssemblyName System.Runtime.WindowsRuntime; $null =' " [Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications," " ContentType = WindowsRuntime] }} catch {{}}; " @@ -1732,9 +1742,15 @@ def get_default_notification_command(self): "[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier('cecli')" '.Show($toast)"' ) - return "powershell -WindowStyle Hidden -Command" + ps_command + notif_cmd = f"powershell -WindowStyle Hidden -Command {ps_body}" - return None # Unknown system + # 3. Concatenate them + if notif_cmd: + # Using ';' as a command separator works for both shell=True on Unix and Windows + return f"{bell_cmd} ; {notif_cmd}" + + # Fallback if no notification tool is found + return bell_cmd def _send_notification(self): # Cooldown to prevent notification spam @@ -1747,11 +1763,20 @@ def _send_notification(self): try: # Use Popen to run the command in the background without waiting for it # and without capturing its output, detaching it from the current terminal session. + + # Determine if this is a terminal bell command that should not be suppressed + is_bell_cmd = ( + "\\a" in self.notifications_command or "\a" in self.notifications_command + ) and re.search(r"(?:echo|print)", self.notifications_command, re.IGNORECASE) + kwargs = { "shell": True, - "stdout": subprocess.DEVNULL, - "stderr": subprocess.DEVNULL, } + + if not (is_bell_cmd or self.notification_bell): + kwargs["stdout"] = subprocess.DEVNULL + kwargs["stderr"] = subprocess.DEVNULL + if platform.system() == "Windows": kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW else: diff --git a/cecli/main.py b/cecli/main.py index 6cb779e7bca..2cd40d7cc9b 100644 --- a/cecli/main.py +++ b/cecli/main.py @@ -273,10 +273,10 @@ def parse_lint_cmds(lint_cmds, io): def register_models(git_root, model_settings_fname, io, verbose=False): from cecli import models - from cecli.helpers.file_searcher import generate_search_path_list + from cecli.helpers.file_searcher import generate_search_path_list, handle_core_files model_settings_files = generate_search_path_list( - ".cecli.model.settings.yml", git_root, model_settings_fname + handle_core_files(".cecli.model.settings.yml"), git_root, model_settings_fname ) try: files_loaded = models.register_models(model_settings_files) @@ -326,13 +326,13 @@ def load_dotenv_files(git_root, dotenv_fname, encoding="utf-8"): def register_litellm_models(git_root, model_metadata_fname, io, verbose=False): from cecli import models - from cecli.helpers.file_searcher import generate_search_path_list + from cecli.helpers.file_searcher import generate_search_path_list, handle_core_files model_metadata_files = [] resource_metadata = importlib_resources.files("cecli.resources").joinpath("model-metadata.json") model_metadata_files.append(str(resource_metadata)) model_metadata_files += generate_search_path_list( - ".cecli.model.metadata.json", git_root, model_metadata_fname + handle_core_files(".cecli.model.metadata.json"), git_root, model_metadata_fname ) try: model_metadata_files_loaded = models.register_litellm_models(model_metadata_files) @@ -697,6 +697,7 @@ def get_io(pretty): multiline_mode=args.multiline, notifications=args.notifications, notifications_command=args.notifications_command, + notification_bell=args.notification_bell, verbose=args.verbose, ) diff --git a/cecli/repo.py b/cecli/repo.py index 9a55146c111..4b42387257e 100644 --- a/cecli/repo.py +++ b/cecli/repo.py @@ -1,5 +1,6 @@ import contextlib import os +import sys import time from pathlib import Path, PurePosixPath @@ -84,7 +85,9 @@ def __init__( self.models = models self.normalized_path = {} - self.tree_files = {} + # Single-entry file cache: (commit_sha, interned_set_of_paths) + # Using a single entry (not per-commit dict) limits unbounded memory growth + self._tree_cache = None self.attribute_author = attribute_author self.attribute_committer = attribute_committer @@ -602,8 +605,8 @@ def get_tracked_files(self): files = set() if commit: - if commit in self.tree_files: - files = self.tree_files[commit] + if self._tree_cache is not None and self._tree_cache[0] == commit.hexsha: + files = self._tree_cache[1] else: try: iterator = commit.tree.traverse() @@ -612,7 +615,8 @@ def get_tracked_files(self): try: blob = next(iterator) if blob.type == "blob": # blob is a file - files.add(blob.path) + # Use sys.intern() to deduplicate path strings in memory + files.add(sys.intern(blob.path)) except IndexError: # Handle potential index error during tree traversal # without relying on potentially unassigned 'blob' @@ -629,7 +633,10 @@ def get_tracked_files(self): self.io.tool_output("Is your git repo corrupted?") return [] files = set(self.normalize_path(path) for path in files) - self.tree_files[commit] = set(files) + # Use single-entry cache (not per-commit dict) to limit memory growth + # Store only the SHA string (not the Commit object) to avoid retaining + # the entire git object graph (tree, blobs, parent commits, etc.) + self._tree_cache = (commit.hexsha, files) # Add staged files index = self.repo.index diff --git a/cecli/resources/model-settings.yml b/cecli/resources/model-settings.yml index 995cc25a2bf..3be10dab32f 100644 --- a/cecli/resources/model-settings.yml +++ b/cecli/resources/model-settings.yml @@ -2115,26 +2115,26 @@ - name: openrouter/anthropic/claude-opus-4.6 edit_format: diff - weak_model_name: openrouter/anthropic/claude-haiku-4-5 + weak_model_name: openrouter/anthropic/claude-haiku-4.5 use_repo_map: true examples_as_sys_msg: false extra_params: max_tokens: 128000 cache_control: true - editor_model_name: openrouter/anthropic/claude-sonnet-4-5 + editor_model_name: openrouter/anthropic/claude-sonnet-4.5 editor_edit_format: editor-diff accepts_settings: ["thinking_tokens"] overeager: true - name: openrouter/anthropic/claude-opus-4.7 edit_format: diff - weak_model_name: openrouter/anthropic/claude-haiku-4-5 + weak_model_name: openrouter/anthropic/claude-haiku-4.5 use_repo_map: true examples_as_sys_msg: false extra_params: max_tokens: 128000 cache_control: true - editor_model_name: openrouter/anthropic/claude-sonnet-4-5 + editor_model_name: openrouter/anthropic/claude-sonnet-4.5 editor_edit_format: editor-diff use_temperature: false overeager: true @@ -2142,20 +2142,20 @@ - name: openrouter/anthropic/claude-sonnet-4.5 edit_format: diff - weak_model_name: openrouter/anthropic/claude-haiku-4-5 + weak_model_name: openrouter/anthropic/claude-haiku-4.5 use_repo_map: true examples_as_sys_msg: false extra_params: max_tokens: 64000 cache_control: true - editor_model_name: openrouter/anthropic/claude-sonnet-4-5 + editor_model_name: openrouter/anthropic/claude-sonnet-4.5 editor_edit_format: editor-diff accepts_settings: ["thinking_tokens"] - name: openrouter/anthropic/claude-haiku-4.5 edit_format: diff - weak_model_name: openrouter/anthropic/claude-haiku-4-5 + weak_model_name: openrouter/anthropic/claude-haiku-4.5 use_repo_map: true examples_as_sys_msg: false extra_params: diff --git a/cecli/tools/edit_text.py b/cecli/tools/edit_text.py index 0e44efc753a..f523241f606 100644 --- a/cecli/tools/edit_text.py +++ b/cecli/tools/edit_text.py @@ -24,6 +24,12 @@ } +USER_EDIT_CATEGORIES = { + "no_changes": "No Changes", + "syntax_errors": "Syntax Errors", +} + + class Tool(BaseTool): NORM_NAME = "edittext" TRACK_INVOCATIONS = False @@ -37,54 +43,58 @@ class Tool(BaseTool): "name": "EditText", "description": ( "Edit text in one or more files using content ID markers. " - "Supports replace and delete operations in a single call. " - "Can handle an array of edits across multiple files. " - "Each edit must include its own file_path and operation type. " - "Use content ID ranges with the start_line and end_line parameters with format " - "`content_id::` (the content id with the :: demarcator). For empty files, use `@000` as the " - "content ID references. " - "Start and end values are inclusive: start and end content IDs both count as " - "part of the range to replace or delete. " - "Edits within a file must not be adjacent or overlapping." + "You can perform multiple 'replace' or 'delete' operations in a single call. " + "CRITICAL RULES: " + "1. Start and end content IDs are INCLUSIVE. Both will be modified or deleted. " + "2. Edits within the same file MUST NOT be adjacent or overlapping. " + "3. For empty files, you MUST use '@000' as the content ID reference." ), "parameters": { "type": "object", "properties": { "edits": { "type": "array", + "description": ( + "List of edit operations to apply. Each edit requires a " + "file path, operation type, start/end IDs, and text." + ), "items": { "type": "object", "properties": { "file_path": { "type": "string", - "description": "Required file path for this specific edit.", + "description": ( + "The absolute or relative path to the file being edited." + ), }, "operation": { "type": "string", "enum": ["replace", "delete"], "description": ( - "The type of operation: 'replace' (replace range with" - " text) or 'delete' (remove range)." + "Choose 'replace' to swap the ID range with new text, " + "or 'delete' to remove the ID range entirely." ), }, "start_line": { "type": "string", "description": ( - "Content ID for start line. Only include the id and demarcator." + "The exact content ID and demarcator for the start of the edit " + "(e.g., 'abc::'). For empty files, use '@000'." ), }, "end_line": { "type": "string", "description": ( - "Content ID for end line. Only include the id and demarcator." + "The exact content ID and demarcator for the end of the edit " + "(e.g., 'xyz::'). For empty files, use '@000'." ), }, "text": { "type": "string", "description": ( - "Text content for replace operations. " - "Empty string for delete operations. " - "Do not include content IDs inside replacement text" + "The exact replacement text. If operation is 'delete', " + 'this MUST be an empty string (""). ' + "NEVER include content IDs in this text." ), }, }, @@ -96,9 +106,11 @@ class Tool(BaseTool): "text", ], }, - "description": "Array of edits to apply.", }, - "change_id": {"type": "string"}, + "change_id": { + "type": "string", + "description": "Optional tracking ID for this batch of edits.", + }, }, "required": ["edits"], }, @@ -255,7 +267,9 @@ def execute( except Exception as e: # Record failed edit but continue with others - file_failed_edits.append(f"Edit {edit_index + 1}: {str(e)}") + file_failed_edits.append( + f"Edit {edit_index + 1} - {cls._categorize_edit_error(str(e))}" + ) continue # Apply all operations in batch @@ -269,13 +283,27 @@ def execute( if new_content != original_content: file_successful_edits += len(successful_ops) else: - raise ToolError("Invalid Edit - Update content ID bounds") + # Be specific about why content didn't change + if failed_ops: + error_details = "; ".join( + f"Edit {op['index'] + 1}: {op['error']}" for op in failed_ops + ) + raise ToolError( + f"Invalid Edit - Update content ID bounds: {error_details}" + ) + else: + raise ToolError( + "Invalid Edit - Update content ID bounds - " + "all edits resulted in unchanged content" + ) if len(failed_ops): for failed_op in failed_ops: op_index = failed_op["index"] op_error = failed_op["error"] - file_failed_edits.append(f"Edit {op_index + 1}: {str(op_error)}") + file_failed_edits.append( + f"Edit {op_index + 1} - {cls._categorize_edit_error(str(op_error))}" + ) except Exception as e: # If batch operation fails, mark all operations as failed for edit_index, _ in file_edits: @@ -339,7 +367,9 @@ def execute( except Exception as e: # Record all edits for this file as failed for edit_index, _ in file_edits: - all_failed_edits.append(f"Edit {edit_index + 1}: {str(e)}") + all_failed_edits.append( + f"Edit {edit_index + 1} - {cls._categorize_edit_error(str(e))}" + ) continue # If dry run, return all results @@ -360,10 +390,6 @@ def execute( raise ToolError(error_msg) # 5. Format and return result - # Log failed edit messages to console for visibility - if all_failed_edits: - for failed_msg in all_failed_edits: - coder.io.tool_error(failed_msg) if files_processed == 1: # Single file case @@ -459,6 +485,9 @@ def format_output(cls, coder, mcp_server, tool_response): original_content = coder.io.read_text(abs_path) if original_content is not None: + start_line, end_line = resolve_content_to_hashline_ids( + original_content, start_line, end_line + ) diff_output = get_hashline_diff( original_content=strip_hashline(original_content), start_line_hash=start_line, @@ -483,3 +512,21 @@ def format_output(cls, coder, mcp_server, tool_response): coder.io.tool_output("") tool_footer(coder=coder, tool_response=tool_response, params=params) + + @classmethod + def _categorize_edit_error(cls, error_msg: str) -> str: + """Categorize an edit error message into a user-friendly display category. + + Maps errors from apply_hashline_operations to simplified category names + for user-facing output instead of displaying full error details. + + Args: + error_msg: The raw error message string. + + Returns: + str: The display category name (e.g., "No Changes", "Syntax Errors"). + """ + error_lower = error_msg.lower() + if "syntax error" in error_lower or "introduces new syntax" in error_lower: + return USER_EDIT_CATEGORIES["syntax_errors"] + return USER_EDIT_CATEGORIES["no_changes"] diff --git a/cecli/tools/read_range.py b/cecli/tools/read_range.py index fe5be2e7228..50cc56abdb0 100644 --- a/cecli/tools/read_range.py +++ b/cecli/tools/read_range.py @@ -511,7 +511,9 @@ def _is_valid_int(s): # output_lines = [f"Displaying context around {found_by} in {rel_path}:"] # Generate hashline for the entire file - hashed_content, _ = hashline_formatted(content, file_name=abs_path, partial=False) + hashed_content, _ = hashline_formatted( + content, file_name=abs_path, partial=False, expanded=False + ) hashed_lines = hashed_content.splitlines() # Extract the context window from hashed lines @@ -632,7 +634,7 @@ def _is_valid_int(s): if already_up_to_details: coder.io.tool_output( ( - "Lines already up to date in context for" + "Earlier contents still valid for" f" {len(already_up_to_details)} operation(s)" ), type="tool-result", @@ -640,10 +642,10 @@ def _is_valid_int(s): detail_str = "\n".join(already_up_to_details) result_parts.append( - "Content up to date and available in history from previous read for " + "Earlier contents still valid from previous read for " f"{len(already_up_to_details)} operation(s):\n\n" f"{detail_str}\n" - "Current contents for these reads available in previous content ID message." + "Relevant contents for these reads available in previous message." ) if already_up_to_date and not new_context_retrieved: result_parts.append( @@ -731,6 +733,7 @@ def format_model_response(cls, coder, rel_path, s_idx, e_idx, hashed_lines, curr "start_line": start_stub_s + 1, "end_line": end_stub_e if has_second_range else start_stub_e, "partial": True, + "expanded": True, "prefixed_contents": prefixed, } return json.dumps(result, ensure_ascii=False) @@ -754,6 +757,7 @@ def format_model_response(cls, coder, rel_path, s_idx, e_idx, hashed_lines, curr "start_line": s_idx + 1, "end_line": e_idx + 1, "partial": True, + "expanded": False, "prefixed_contents": prefixed, } return json.dumps(result, ensure_ascii=False) diff --git a/cecli/tui/worker.py b/cecli/tui/worker.py index a233df24243..4b28fcf7e89 100644 --- a/cecli/tui/worker.py +++ b/cecli/tui/worker.py @@ -156,7 +156,9 @@ async def _handle_switch_coder_signal(self, switch): else: new_coder.show_announcements() - edit_format = getattr(target_coder, "edit_format", "code") or "code" + edit_format = kwargs.get( + "edit_format", getattr(target_coder, "edit_format", "code") or "code" + ) self.output_queue.put( { "type": "mode_change", diff --git a/tests/basic/test_hashline_closure.py b/tests/basic/test_hashline_closure.py index ca2587dc138..cd762903909 100644 --- a/tests/basic/test_hashline_closure.py +++ b/tests/basic/test_hashline_closure.py @@ -252,21 +252,24 @@ def _make_closure_op(operation, text, start_idx, end_idx, index=0): def test_closure_safeguard_no_file_path(): """Without file_path, ops should be returned unchanged.""" ops = [_make_closure_op("replace", "pass", 0, 0)] - result = _apply_closure_safeguard("x = 1", ops, file_path=None) + result, rejected = _apply_closure_safeguard("x = 1", ops, file_path=None) assert result == ops + assert rejected == [] def test_closure_safeguard_unsupported_language(): """File path with unsupported language — ops returned unchanged.""" ops = [_make_closure_op("replace", "xyz", 0, 0)] - result = _apply_closure_safeguard("some content", ops, file_path="test.xyz") + result, rejected = _apply_closure_safeguard("some content", ops, file_path="test.xyz") assert result == ops + assert rejected == [] def test_closure_safeguard_empty_ops(): """Empty ops list — returned as-is.""" - result = _apply_closure_safeguard("x = 1", [], file_path="test.py") + result, rejected = _apply_closure_safeguard("x = 1", [], file_path="test.py") assert result == [] + assert rejected == [] def test_closure_safeguard_valid_code_no_change(): @@ -277,7 +280,7 @@ def test_closure_safeguard_valid_code_no_change(): """ # Replace all 3 lines with 2 valid lines ops = [_make_closure_op("replace", "a = 10\nb = 20", 0, 2)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") assert result[0]["start_idx"] == 0 assert result[0]["end_idx"] == 2 @@ -298,7 +301,7 @@ def test_closure_safeguard_heals_outer_scope(): # which tree-sitter finds as an ERROR. # The safeguard should expand end_idx to include line 3. ops = [_make_closure_op("replace", ' return "value"', 1, 2)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") assert result[0]["start_idx"] == 1 assert result[0]["end_idx"] == 3 # Expanded to consume stray closing brace @@ -311,7 +314,7 @@ def test_closure_safeguard_simple_python_replace(): """ # Replace line 1 ("y = 2") with "y = 99" ops = [_make_closure_op("replace", "y = 99", 1, 1)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") assert result[0]["start_idx"] == 1 assert result[0]["end_idx"] == 1 @@ -324,7 +327,7 @@ def test_closure_safeguard_delete_valid(): """ # Delete line 1 ("y = 2") — result is "x = 1\nz = 3" which is valid ops = [_make_closure_op("delete", None, 1, 1)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") assert result[0]["start_idx"] == 1 assert result[0]["end_idx"] == 1 @@ -343,7 +346,7 @@ def test_closure_safeguard_heals_stray_except(): # Replacing lines 0-1 with 'x = 1' leaves a stray 'except:' on line 2. # The safeguard should expand the boundary to avoid the parse error. ops = [_make_closure_op("replace", "x = 1", 0, 1)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") healed_start = result[0]["start_idx"] healed_end = result[0]["end_idx"] # Boundaries should have changed from original (0, 1) @@ -361,57 +364,78 @@ def test_closure_safeguard_heals_stray_except(): ), f"Healed source still has tree-sitter errors: {new_source!r}" -def test_closure_safeguard_skip_insert(): - """Insert operations should be skipped by the safeguard.""" - source = "x = 1\ny = 2\n" - ops = [_make_closure_op("insert", "z = 3", 1, 1)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") - assert result == ops # Insert ops pass through unchanged +# ============================================================================= +# Tests for eviction gate (edits that introduce syntax errors are rejected) +# ============================================================================= -def test_closure_safeguard_with_outer_function_brace(): +def test_closure_safeguard_evicts_syntax_error_edit(): """ - Test that the safeguard can expand an edit that would 'eat' an outer - scope's closing brace. + Edit that produces invalid Python syntax should be evicted by the gate. + The eviction gate checks if ast_error_count > baseline_error_count and + discards the edit entirely when even the best candidate boundary fails. """ - source = """if True: - if False: - x = 1 + source = """x = 1 y = 2 +z = 3 """ - lines = source.splitlines() # noqa - # Line 0: "if True:" - # Line 1: " if False:" - # Line 2: " x = 1" - # Line 3: "y = 2" - - # Replace lines 1-2 with something that would leave invalid indentation - # Let me test something that should work: replacing just the inner if - ops = [_make_closure_op("replace", " x = 99", 1, 2)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") - # The replacement " x = 99" should be valid, keep original bounds - assert result[0]["start_idx"] == 1 - assert result[0]["end_idx"] == 2 + # Replace "z = 3" with "]][][[[]" — broken syntax that no boundary adjustment can fix. + ops = [_make_closure_op("replace", "]][][[[]", 2, 2)] + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") + # The edit should be evicted (not in result, present in rejected) + assert len(result) == 0, f"Expected no ops to survive, got {len(result)}" + assert len(rejected) == 1, f"Expected 1 rejected op, got {len(rejected)}" + assert "syntax error" in rejected[0]["error"].lower() + assert rejected[0]["index"] == 0 + + +def test_closure_safeguard_evicts_incompatible_replacement(): + """ + An edit that fundamentally breaks the grammar (e.g., replacing a function + body with something syntactically incompatible) should be evicted. + No amount of boundary shifting can fix it. + """ + source = """def foo(): + return 1 + +def bar(): + return 2 +""" + # Replace the entire function + middle with just hard-to-parse junk + ops = [_make_closure_op("replace", "]]]]broken[[[[", 0, 3)] + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") + assert len(result) == 0, f"Expected no ops to survive, got {len(result)}" + assert len(rejected) == 1 + assert rejected[0]["index"] == 0 -def test_closure_safeguard_heals_broken_dict(): +def test_closure_safeguard_edit_with_off_by_one_healing(): """ - A replace that removes the opening brace but keeps the closing brace - should have its boundaries expanded by the safeguard to include the - stray closing brace. + An edit that is slightly off (off-by-one line) should be healed by + expanding boundaries to include the missing context. """ source = """def foo(): return { - "key": "value" + "key": "value", } + +def bar(): + return 42 """ - # Replace lines 1-2 with ' return "value"' — this eats the opening - # brace but leaves the closing '}' on line 3, producing a parse error. - ops = [_make_closure_op("replace", ' return "value"', 1, 2)] - result = _apply_closure_safeguard(source, ops, file_path="test.py") + # Replace just line 1 (" return {") with ' return "value"' — but + # leave lines 2-3 (the dict contents and closing brace) orphaned. + # The safeguard should expand end_idx to consume the orphaned lines. + ops = [_make_closure_op("replace", ' return "value"', 1, 1)] + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") + assert len(result) == 1, f"Expected 1 healed op, got {len(result)}" + assert len(rejected) == 0, f"Expected no rejected ops, got {len(rejected)}" + # Boundaries should have expanded from (1, 1) to include lines 2-3 healed_start = result[0]["start_idx"] healed_end = result[0]["end_idx"] - # Verify the healed result parses correctly via tree-sitter + assert ( + healed_end >= 3 + ), f"Expected end_idx >= 3 (to consume orphaned dict lines), got {healed_end}" + # Verify the healed result parses correctly from cecli.helpers.grep_ast.tsl import get_parser parser = get_parser("python") @@ -424,9 +448,73 @@ def test_closure_safeguard_heals_broken_dict(): ), f"Healed source still has tree-sitter errors: {new_source!r}" +def test_closure_safeguard_off_by_one_start(): + """ + When an edit's start_idx is one line too late (doesn't include a + statement that opens a scope), the safeguard should expand it backward. + """ + source = """def foo(): + return { + "key": "value", + } +""" + # Line 0: "def foo():" + # Line 1: " return {" + # Line 2: ' "key": "value",' + # Line 3: " }" + # Replace lines 2-3 with ' pass' — this leaves line 0 and 1 + # (the function header and the opening brace) but replaces the + # content and closing brace. Result has a stray opening brace + # that cannot be parsed. + # The safeguard should expand start_idx from 2 backward to 1 + # to include the " return {" line. + ops = [_make_closure_op("replace", " pass", 2, 3)] + result, rejected = _apply_closure_safeguard(source, ops, file_path="test.py") + assert len(result) == 1 + assert len(rejected) == 0 + # start_idx should have expanded from 2 to at most 1 + assert result[0]["start_idx"] <= 1, ( + f"Expected start_idx <= 1 (expanded for opening brace), " f"got {result[0]['start_idx']}" + ) + + +def test_apply_closure_safeguard_passes_rejected_ops_to_failed(): + """ + Integration test: apply_hashline_operations should propagate rejected + operations (from _apply_closure_safeguard) into the failed_ops list. + """ + from cecli.helpers.hashline import apply_hashline_operations, hashline + + source = """x = 1 +y = 2 +""" + # Try to replace a line with broken syntax — this should fail via eviction + # Get the correct hashline ID for the first line + hp_content = hashline(source) + first_line_hash = hp_content.splitlines()[0].split("::")[0] + "::" + + ops = [ + { + "start_line_hash": first_line_hash, + "end_line_hash": first_line_hash, + "operation": "replace", + "text": "]][[[[broken", + } + ] + new_content, successful_ops, failed_ops = apply_hashline_operations( + source, ops, file_path="test.py" + ) + # Content should be unchanged since the edit was rejected + assert new_content == source, "Expected content unchanged when all edits are rejected" + assert len(failed_ops) >= 1, f"Expected at least 1 failed_ops from eviction, got {failed_ops}" + # Error message should mention syntax errors + assert any( + "syntax error" in op["error"].lower() for op in failed_ops + ), f"Expected syntax error mention in failed_ops, got {failed_ops}" + + # ============================================================================= # Tests for _merge_replace_operations -# ============================================================================= def _make_merge_op(operation, text, start_idx, end_idx, index=0): diff --git a/tests/coders/test_copypaste_coder.py b/tests/coders/test_copypaste_coder.py index 9a49165c28e..acb9d3414dc 100644 --- a/tests/coders/test_copypaste_coder.py +++ b/tests/coders/test_copypaste_coder.py @@ -1,43 +1,33 @@ import hashlib import json -from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, call, patch import pytest -from cecli.coders.copypaste_coder import CopyPasteCoder +from cecli.coders.copypaste_coder import get_copy_paste_coder_class from cecli.coders.editblock_coder import EditBlockCoder +# Dynamically create a CopyPasteCoder class for testing purposes. +# We use the 'diff' edit format so it inherits EditBlockCoder behavior. +_TestModel = type("TestModel", (), {"edit_format": None, "name": "test"}) +CopyPasteCoder = get_copy_paste_coder_class("diff", _TestModel()) -def test_init_prompts_uses_selected_edit_format(): - coder = CopyPasteCoder.__new__(CopyPasteCoder) - coder.args = SimpleNamespace(edit_format="diff") - coder.main_model = SimpleNamespace(edit_format=None) - coder.edit_format = None - coder.gpt_prompts = None - - coder._init_prompts_from_selected_edit_format() - - assert coder.gpt_prompts is not None - assert hasattr(coder.gpt_prompts, "main_system") - assert coder.edit_format == EditBlockCoder.edit_format - - -def test_init_prompts_preserves_existing_when_no_match(monkeypatch): - coder = CopyPasteCoder.__new__(CopyPasteCoder) - coder.args = SimpleNamespace(edit_format="custom-format") - coder.main_model = SimpleNamespace(edit_format=None) - coder.edit_format = "original-format" - coder.gpt_prompts = "original-prompts" - - import cecli.coders as coders - - monkeypatch.setattr(coders, "__all__", [], raising=False) - coder._init_prompts_from_selected_edit_format() +def test_dynamic_class_inherits_from_target_coder(): + """The dynamic CopyPasteCoder class inherits from the target coder class.""" + # Already created at module level: CopyPasteCoder = get_copy_paste_coder_class("diff", _TestModel()) + assert issubclass( + CopyPasteCoder, EditBlockCoder + ), "CopyPasteCoder should inherit from EditBlockCoder for 'diff' format" + # The fixed CopyPasteCoder marker should also be in the MRO + from cecli.coders.copypaste_coder import CopyPasteCoder as FixedCopyPasteCoder - assert coder.gpt_prompts == "original-prompts" - assert coder.edit_format == "original-format" + assert ( + FixedCopyPasteCoder in CopyPasteCoder.__mro__ + ), "Fixed CopyPasteCoder should be in the MRO for isinstance checks" + # gpt_prompts should resolve correctly via the inherited property + assert CopyPasteCoder.prompt_format is not None + assert CopyPasteCoder.prompt_format == EditBlockCoder.prompt_format @pytest.mark.asyncio @@ -51,7 +41,11 @@ async def test_send_uses_copy_paste_flow(monkeypatch): coder.partial_response_tool_calls = [] coder.partial_response_function_call = None coder.chat_completion_call_hashes = [] - coder.show_send_output = AsyncMock() + coder.interrupt_event = MagicMock() + + coder.show_send_output = AsyncMock( + side_effect=lambda c: coder.partial_response_chunks.append(c) + ) coder.calculate_and_show_tokens_and_cost = MagicMock() def fake_preprocess_response(): diff --git a/tests/scrape/test_scrape.py b/tests/scrape/test_scrape.py index 15db33a33f2..6a90ca18039 100644 --- a/tests/scrape/test_scrape.py +++ b/tests/scrape/test_scrape.py @@ -39,7 +39,10 @@ def get_all_abs_files(self): def get_announcements(self): return [] - return Commands(io, DummyCoder()) + coder = DummyCoder() + cmds = Commands(io, coder) + cmds._test_coder = coder # keep strong reference to prevent GC of weakref + return cmds @patch("cecli.commands.web.install_playwright") @patch("cecli.commands.web.Scraper") diff --git a/tests/tools/test_read_range_execute.py b/tests/tools/test_read_range_execute.py index 2da385d208c..a985a6452b9 100644 --- a/tests/tools/test_read_range_execute.py +++ b/tests/tools/test_read_range_execute.py @@ -125,7 +125,7 @@ def _setup(self, mock_coder, mock_file_context, mock_chunks, mock_manager, file_ # Patch hashline_formatted to return (text, json) hl_patch = patch( "cecli.tools.read_range.hashline_formatted", - side_effect=lambda text, file_name, partial, start_line=1: (text, "{}"), + side_effect=lambda text, file_name, partial, expanded, start_line=1: (text, "{}"), ) hl_patch.start() self.patches.append(hl_patch) @@ -451,7 +451,7 @@ def resolve_side_effect(coder, file_path): hl_patch = patch( "cecli.tools.read_range.hashline_formatted", - side_effect=lambda text, file_name, partial, start_line=1: (text, "{}"), + side_effect=lambda text, file_name, partial, expanded, start_line=1: (text, "{}"), ) hl_patch.start()