Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion strands-py/src/strands/agent/agent.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a first-class SDK option to stream only the final answer, eliminating the need for consumer-side buffering.

What's the use case for this versus agent.invoke? The events are buffered as is so I'm not clear why you would use this instead of agent.invoke which provides the completed message as well

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a fair question. you are right that text events for the final turn are buffered until EventLoopStopEvent arrives with end_turn, then flushed as a batch. So for the text content alone, the user experience is similar to invoke(). however I think the parts of differences are:

  1. Non-text events still stream in real-time throughout the whole execution

with invoke(), the caller is blocked until the entire agent loop completes, which they won't have the visibility into tool calls, lifecycle events, reasoning, or progress. With stream_async(stream_final_turn_only=True), the consumer still receives:

  • start_event_loop per turn (progress indicator)
  • current_tool_use events
  • etc
  1. users wrapping agents in SSE endpoints already use stream_async for everything else. at least for my project I tend to stick with stream_async pattern (that's whats on the AWS example/ documentation as well). asking them to special-case invoke() for the final answer requires user to do extra research.

Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@
from ..tools.registry import ToolRegistry
from ..tools.structured_output._structured_output_context import StructuredOutputContext
from ..tools.watcher import ToolWatcher
from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent
from ..types._events import (
AgentResultEvent,
EventLoopStopEvent,
InitEventLoopEvent,
ModelStreamChunkEvent,
StartEventLoopEvent,
TextStreamEvent,
TypedEvent,
)
from ..types.agent import AgentInput, ConcurrentInvocationMode, Limits
from ..types.content import ContentBlock, Message, Messages, SystemContentBlock
from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException
Expand Down Expand Up @@ -792,6 +800,7 @@ async def stream_async(
invocation_state: dict[str, Any] | None = None,
structured_output_model: type[BaseModel] | None = None,
structured_output_prompt: str | None = None,
stream_final_turn_only: bool = False,
limits: Limits | None = None,
**kwargs: Any,
) -> AsyncIterator[Any]:
Expand All @@ -812,6 +821,19 @@ async def stream_async(
invocation_state: Additional parameters to pass through the event loop.
structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
stream_final_turn_only: When True, buffers text events from intermediate turns and only yields
text events from the final turn. A turn is considered intermediate when it ends with a
``tool_use`` stop reason; any other stop reason (``end_turn``, ``max_tokens``,
``content_filtered``, ``cancelled``, etc.) flushes the buffered text so partial output
is not silently lost when the model terminates abnormally on the final turn.

Note: This setting only filters ``TextStreamEvent`` instances (events with a ``"data"``
key). Reasoning events from intermediate turns are still yielded because they are a
distinct event type (``ReasoningTextStreamEvent``). Non-text events such as lifecycle,
tool use, reasoning, and citation events are yielded normally regardless of this setting.

When False (default), all events are yielded as they are produced with no change in
behavior.
limits: Per-invocation budget caps (turns / output_tokens / total_tokens).
See :class:`~strands.types.agent.Limits`. When a cap is reached, the loop
terminates gracefully at the next turn boundary with a corresponding
Expand All @@ -835,11 +857,21 @@ async def stream_async(
Exception: Any exceptions from the agent invocation will be propagated to the caller.

Example:
Stream all events (default behavior):

```python
async for event in agent.stream_async("Analyze this data"):
if "data" in event:
yield event["data"]
```

Stream only the final answer (skip intermediate tool-use turns):

```python
async for event in agent.stream_async("Analyze this data", stream_final_turn_only=True):
if "data" in event:
yield event["data"] # Only receives final turn text
```
"""
self._validate_limits(limits)
# Conditionally acquire lock based on concurrent_invocation_mode
Expand Down Expand Up @@ -882,9 +914,31 @@ async def stream_async(
messages, merged_state, structured_output_model, structured_output_prompt, limits
)

text_event_buffer: list[dict[str, Any]] = []

async for event in events:
event.prepare(invocation_state=merged_state)

if stream_final_turn_only:
if isinstance(event, StartEventLoopEvent):
text_event_buffer.clear()
elif isinstance(event, TextStreamEvent):
text_event_buffer.append(event.as_dict())
continue
elif isinstance(event, EventLoopStopEvent):
stop_reason = event["stop"][0]
# Flush buffered text for any stop reason except tool_use.
# tool_use is the only stop reason that means "this is an
# intermediate turn — more model turns will follow". For all
# other stop reasons (end_turn, max_tokens, content_filtered,
# cancelled, etc.) the buffered text represents the model's
# final output and should be delivered to the caller.
if stop_reason != "tool_use":
for buffered in text_event_buffer:
callback_handler(**buffered)
yield buffered
text_event_buffer.clear()

if event.is_callback_event:
as_dict = event.as_dict()
callback_handler(**as_dict)
Expand Down
Loading