Skip to content

perf(event): iteration extension events and general-purpose status events#1889

Open
E2ern1ty wants to merge 3 commits into
agentscope-ai:mainfrom
E2ern1ty:feat/improve-event-system
Open

perf(event): iteration extension events and general-purpose status events#1889
E2ern1ty wants to merge 3 commits into
agentscope-ai:mainfrom
E2ern1ty:feat/improve-event-system

Conversation

@E2ern1ty

Copy link
Copy Markdown

Summary

Closes #1696.

Improves the event system in two ways, as described in the issue.

1. Iteration extension events (HITL loop-limit handling)

  • Add RequireIterationExtensionEvent, emitted when the reasoning-acting loop reaches its iteration limit.
  • Add IterationExtensionResultEvent to capture the user's decision and resume the paused loop.
  • Gated by a new ReActConfig.allow_iteration_extension (default False, fully backward compatible). When disabled, the loop terminates with ExceedMaxItersEvent as before.
  • When enabled, reaching the limit emits RequireIterationExtensionEvent and pauses (mirroring the existing HITL tool-confirmation pause/resume). On resume:
    • Approved → extends the per-reply iteration budget (AgentState.iteration_extension) and continues. Approving without a positive count grants another full max_iters budget so the loop always makes progress.
    • Denied → falls back to the existing ExceedMaxItersEvent termination path.
  • Pause state (AgentState.awaiting_iteration_extension / iteration_extension) lives on AgentState, so the pause/resume cycle survives persistence and process restarts.

2. General-purpose status events

  • Add a customizable StatusEvent (name + status + arbitrary data) and Agent.emit_status(...) so the backend can notify the frontend about time-consuming operations without being tied to a specific operation.
  • Context compaction emits compressing_context start/end status events as the canonical example.
  • The active reply's status queue is exposed through a ContextVar (inspired by LangGraph's get_stream_writer), so emit_status works from any stage of a reply — reasoning, tool execution, compaction, or user middleware — and the events are interleaved promptly into the stream. Internally the reply pipeline runs as a background task whose output is drained by a driver, with exception propagation and early-close cancellation handled.

Wiring

  • Thread the new input event through reply / reply_stream and the chat service; convert the new events in the AG-UI protocol middleware; skip wake-up runs while the agent is awaiting an iteration extension decision.

Tests

  • tests/iteration_extension_test.py — request / approve / deny, default-budget grant, validation errors.
  • tests/status_event_test.py — emit-outside-reply no-op, compaction status events in a real reply, a tool emitting status events during a reply (proves it is not tied to compaction), early-close cancellation, exception propagation.
  • Extended tests/agui_protocol_test.py for the three new events.
  • Local checks: black, flake8, mypy, pylint all clean; relevant test suites pass.

Backward compatibility

  • Iteration extension is opt-in (allow_iteration_extension=False by default); existing behavior is unchanged.
  • compress_context() keeps its -> None signature.
  • New AgentState / ReActConfig fields have defaults, so existing serialized state/config deserialize unchanged.

E2ern1ty added 3 commits June 17, 2026 16:42
… events

Add iteration extension events and a general-purpose status event
mechanism to the agent event system.

Iteration extension events:
- Add RequireIterationExtensionEvent, emitted when the reasoning-acting
  loop reaches its iteration limit (gated by the new
  ReActConfig.allow_iteration_extension, default False for backward
  compatibility).
- Add IterationExtensionResultEvent to capture the user's decision and
  resume the paused loop. Approving extends the per-reply iteration
  budget (tracked via AgentState.iteration_extension); denying falls
  back to the existing ExceedMaxItersEvent termination path.
- Track the paused state via AgentState.awaiting_iteration_extension so
  the pause/resume cycle survives persistence and process restarts.

General status events:
- Add a general-purpose, user-customizable StatusEvent (name + status +
  arbitrary data) plus Agent.emit_status for emitting custom status
  notifications into the active reply stream.
- Wire context compaction to emit start/end status events as the
  canonical example of a time-consuming operation.

Wiring:
- Thread the new input event through reply/reply_stream and the chat
  service; convert the new events in the AG-UI protocol middleware; skip
  wake-up runs while awaiting an iteration extension decision.

Tests:
- Add tests for iteration extension (request/approve/deny, default
  budget, validation) and the status event mechanism (emit, drain,
  early-close cancellation, compaction), and extend the AG-UI tests.
Make the general-purpose status event mechanism work from any stage of a
reply (reasoning, tool execution, context compaction, user middleware),
not just during context compaction.

Inspired by LangGraph's get_stream_writer, the active reply's status
queue is now exposed through a ContextVar so emit_status can be called
from anywhere in the call stack without threading a writer argument.

- Run the reply pipeline (_reply) as a background task that pushes its
  output onto a single queue; a driver concurrently drains the queue and
  re-yields, so status events emitted during a long-running awaited
  operation surface promptly, interleaved in emission order with the
  main events.
- Bind the status queue inside the producer task's own (copied) context
  to avoid the cross-context ContextVar reset pitfall on async-generator
  aclose(); the binding is discarded automatically when the task ends.
- The driver propagates pipeline exceptions to the consumer and cancels
  the background task if the consumer abandons the stream early.
- Remove the previous _drain_status_events helper and the per-instance
  _status_queue; the compaction call site reverts to a plain await.

Rewrite the status event tests to exercise the mechanism through real
reply_stream runs, including a tool that emits status events during the
reply (demonstrating it is no longer tied to compaction), early-close
cancellation, and exception propagation.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

perf(event): improve the current event system

1 participant