feat(agent): add stream_final_turn_only parameter to stream_async #2104
base: main
Changes from 2 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,7 +65,15 @@ | |
| from ..tools.registry import ToolRegistry | ||
| from ..tools.structured_output._structured_output_context import StructuredOutputContext | ||
| from ..tools.watcher import ToolWatcher | ||
| from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent | ||
| from ..types._events import ( | ||
| AgentResultEvent, | ||
| EventLoopStopEvent, | ||
| InitEventLoopEvent, | ||
| ModelStreamChunkEvent, | ||
| StartEventLoopEvent, | ||
| TextStreamEvent, | ||
| TypedEvent, | ||
| ) | ||
| from ..types.agent import AgentInput, ConcurrentInvocationMode | ||
| from ..types.content import ContentBlock, Message, Messages, SystemContentBlock | ||
| from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException | ||
|
|
@@ -776,6 +784,7 @@ async def stream_async( | |
| invocation_state: dict[str, Any] | None = None, | ||
| structured_output_model: type[BaseModel] | None = None, | ||
| structured_output_prompt: str | None = None, | ||
| stream_final_turn_only: bool = False, | ||
| **kwargs: Any, | ||
| ) -> AsyncIterator[Any]: | ||
| """Process a natural language prompt and yield events as an async iterator. | ||
|
|
@@ -795,6 +804,11 @@ async def stream_async( | |
| invocation_state: Additional parameters to pass through the event loop. | ||
| structured_output_model: Pydantic model type(s) for structured output (overrides agent default). | ||
| structured_output_prompt: Custom prompt for forcing structured output (overrides agent default). | ||
| stream_final_turn_only: When True, buffers text events from intermediate turns and only yields | ||
| text events from the final turn (where stop_reason is "end_turn"). Non-text events such as | ||
| lifecycle, tool use, reasoning, and citation events are yielded normally regardless of this | ||
| setting. When False (default), all events are yielded as they are produced with no change | ||
| in behavior. | ||
|
Contributor
Issue: The docstring says "Non-text events such as lifecycle, tool use, reasoning, and citation events are yielded normally regardless of this setting." While accurate, this creates an asymmetry that may confuse users: reasoning text from intermediate turns passes through (it's a
Suggestion: Consider calling this out explicitly in the docstring with a brief note, e.g.:
||
| **kwargs: Additional parameters to pass to the event loop.[Deprecating] | ||
|
|
||
| Yields: | ||
|
|
@@ -811,11 +825,21 @@ async def stream_async( | |
| Exception: Any exceptions from the agent invocation will be propagated to the caller. | ||
|
|
||
| Example: | ||
| Stream all events (default behavior): | ||
|
|
||
| ```python | ||
| async for event in agent.stream_async("Analyze this data"): | ||
| if "data" in event: | ||
| yield event["data"] | ||
| ``` | ||
|
|
||
| Stream only the final answer (skip intermediate tool-use turns): | ||
|
|
||
| ```python | ||
| async for event in agent.stream_async("Analyze this data", stream_final_turn_only=True): | ||
| if "data" in event: | ||
| yield event["data"] # Only receives final turn text | ||
| ``` | ||
| """ | ||
| # Conditionally acquire lock based on concurrent_invocation_mode | ||
| # Using threading.Lock instead of asyncio.Lock because run_async() creates | ||
|
|
@@ -855,9 +879,25 @@ async def stream_async( | |
| try: | ||
| events = self._run_loop(messages, merged_state, structured_output_model, structured_output_prompt) | ||
|
|
||
| text_event_buffer: list[dict[str, Any]] = [] | ||
|
|
||
| async for event in events: | ||
| event.prepare(invocation_state=merged_state) | ||
|
|
||
| if stream_final_turn_only: | ||
| if isinstance(event, StartEventLoopEvent): | ||
| text_event_buffer.clear() | ||
| elif isinstance(event, TextStreamEvent): | ||
| text_event_buffer.append(event.as_dict()) | ||
| continue | ||
| elif isinstance(event, EventLoopStopEvent): | ||
| stop_reason = event["stop"][0] | ||
| if stop_reason == "end_turn": | ||
|
Contributor
Issue: When
Suggestion: Consider flushing buffered text for any stop reason that is not

```python
elif isinstance(event, EventLoopStopEvent):
    stop_reason = event["stop"][0]
    if stop_reason != "tool_use":
        for buffered in text_event_buffer:
            callback_handler(**buffered)
            yield buffered
        text_event_buffer.clear()
```

This way, if the agent is cancelled or hits max_tokens on the final turn, the partial text is still delivered to the caller. If you decide to keep the current behavior, please document explicitly in the docstring that text is only delivered for
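The difference between the two flush conditions in this review comment can be illustrated with a toy model. The sketch below is not the SDK code: `LoopStart`, `Text`, `LoopStop`, and `final_turn_text` are hypothetical stand-ins, and the event sequence is a simplified assumption about how a run truncated by the token limit might look.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the SDK's event classes; not the real types.
@dataclass
class LoopStart:
    pass


@dataclass
class Text:
    data: str


@dataclass
class LoopStop:
    stop_reason: str


def final_turn_text(events, flush_unless_tool_use=False):
    """Return the text chunks a caller would receive under each flush policy."""
    buffer, delivered = [], []
    for event in events:
        if isinstance(event, LoopStart):
            buffer.clear()  # a new turn discards the previous turn's text
        elif isinstance(event, Text):
            buffer.append(event.data)  # hold text until the turn's outcome is known
        elif isinstance(event, LoopStop):
            if flush_unless_tool_use:
                should_flush = event.stop_reason != "tool_use"  # reviewer's proposal
            else:
                should_flush = event.stop_reason == "end_turn"  # behavior in the diff
            if should_flush:
                delivered.extend(buffer)
            buffer.clear()
    return delivered


# Assumed run: one intermediate turn, then a final turn cut off by the token limit.
truncated_run = [
    LoopStart(), Text("Let me check."),
    LoopStart(), Text("The answer is"), LoopStop("max_tokens"),
]

print(final_turn_text(truncated_run))                              # []
print(final_turn_text(truncated_run, flush_unless_tool_use=True))  # ['The answer is']
```

Under the `end_turn`-only condition the truncated final text never reaches the caller, while the `!= "tool_use"` condition delivers the partial text. Both conditions behave identically when the run finishes with `end_turn`.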
||
| for buffered in text_event_buffer: | ||
| callback_handler(**buffered) | ||
| yield buffered | ||
| text_event_buffer.clear() | ||
|
|
||
| if event.is_callback_event: | ||
| as_dict = event.as_dict() | ||
| callback_handler(**as_dict) | ||
|
|
||
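To see what a consumer would observe with the buffering in this hunk, here is a minimal, self-contained sketch. It does not use the SDK: the event dicts, the `kind` field, and the `fake_agent_events` and `final_turn_only` names are assumptions made for illustration, loosely mirroring the roles of `StartEventLoopEvent`, `TextStreamEvent`, and `EventLoopStopEvent` in the diff.

```python
import asyncio
from typing import Any, AsyncIterator


# Hypothetical event stream; "kind" is an illustrative field, not an SDK key.
async def fake_agent_events() -> AsyncIterator[dict[str, Any]]:
    yield {"kind": "loop_start"}
    yield {"kind": "reasoning", "data": "I should call a tool."}
    yield {"kind": "text", "data": "Checking the data..."}
    yield {"kind": "tool_use", "name": "lookup"}
    yield {"kind": "loop_start"}
    yield {"kind": "text", "data": "Final answer."}
    yield {"kind": "loop_stop", "stop_reason": "end_turn"}


async def final_turn_only(
    events: AsyncIterator[dict[str, Any]],
) -> AsyncIterator[dict[str, Any]]:
    """Buffer text events per turn and release them only when the run ends with end_turn."""
    buffer: list[dict[str, Any]] = []
    async for event in events:
        kind = event["kind"]
        if kind == "loop_start":
            buffer.clear()  # text from the previous turn is dropped
        elif kind == "text":
            buffer.append(event)
            continue  # held back, not yielded yet
        elif kind == "loop_stop" and event["stop_reason"] == "end_turn":
            for buffered in buffer:
                yield buffered  # flush the final turn's text before the stop event
            buffer.clear()
        yield event  # every non-text event passes straight through


async def collect() -> list[dict[str, Any]]:
    return [event async for event in final_turn_only(fake_agent_events())]


received = asyncio.run(collect())
kinds = [event["kind"] for event in received]
print(kinds)
```

In this model the intermediate turn's text is dropped while its reasoning and tool-use events still arrive as they happen, so the consumer keeps progress visibility even though the final text arrives as one batch.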
What's the use case for this versus agent.invoke? The events are buffered as is, so I'm not clear why you would use this instead of agent.invoke, which provides the completed message as well.

That is a fair question. You are right that text events for the final turn are buffered until EventLoopStopEvent arrives with end_turn, then flushed as a batch. So for the text content alone, the user experience is similar to invoke(). However, I think the differences are:
With invoke(), the caller is blocked until the entire agent loop completes, so they have no visibility into tool calls, lifecycle events, reasoning, or progress. With stream_async(stream_final_turn_only=True), the consumer still receives: