From 402286be20c5b9314cefc8453db6593afe555b87 Mon Sep 17 00:00:00 2001 From: WERCK Ayrton <9287010+Athosone@users.noreply.github.com> Date: Sat, 6 Jun 2026 21:21:41 +0000 Subject: [PATCH 1/7] fix(apps): rate-limit HttpStream to 1 req/s streaming limit HttpStream could send streaming activities faster than the Teams 1 request/second per-stream limit, which gets the stream throttled and returned as 403 ContentStreamNotAllowed (surfaced to the user as a red error toast). Two causes: _flush() sent every queued informative update back to back plus the text chunk, and the only pacing was a 0.5s reschedule that was dropped once the queue drained, so the next emit() fired immediately. Add a per-stream leaky-slot limiter (make_limiter) and gate every chunk send through it in _send_activity. Because the limiter state is shared across flushes, it paces both the in-flush burst and the next post-drain emit. min_send_interval is exposed (default 1.0s) so callers can buffer toward the docs' 1.5-2s advice. A burst of informative updates in one flush coalesces to the latest by default (coalesce_informative_updates, matching the docs' one-reused-message guidance); set it False to pace out every update instead. --- .../src/microsoft_teams/apps/http_stream.py | 32 +++++- .../microsoft_teams/apps/utils/__init__.py | 10 +- .../src/microsoft_teams/apps/utils/limiter.py | 35 +++++++ packages/apps/tests/test_http_stream.py | 98 ++++++++++++++++++- packages/apps/tests/test_limiter.py | 30 ++++++ 5 files changed, 197 insertions(+), 8 deletions(-) create mode 100644 packages/apps/src/microsoft_teams/apps/utils/limiter.py create mode 100644 packages/apps/tests/test_limiter.py diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index 05f188fad..2feda71e7 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -20,7 +20,7 @@ from microsoft_teams.common import EventEmitter from .plugins.streamer import StreamCancelledError, StreamerEvent, StreamerProtocol -from .utils import RetryOptions, retry +from .utils import RetryOptions, make_limiter, retry logger = logging.getLogger(__name__) @@ -32,27 +32,43 @@ class HttpStream(StreamerProtocol): Flow: 1. emit() adds activities to a queue 2. _flush() drains the entire queue under a lock. - 3. Informative typing updates are sent immediately if no message started. + 3. Informative typing updates are sent if no message started. 4. Message text are combined into a typing chunk. 5. Another flush is scheduled if more items remain. 6. close() waits for queue to empty, then sends final message with stream_type='stream_final' The timeout cancellation ensures only one flush operation is scheduled at a time. - The delays between flushes is to ensure we dont hit API rate limits with Microsoft Teams. + Every send goes through a per-stream limiter (min_send_interval) so we stay within + the Teams 1 req/s streaming limit. By default a burst of informative updates in one + flush collapses to the latest (coalesce_informative_updates); set it False to pace + out every update instead. """ - def __init__(self, client: ApiClient, ref: ConversationReference): + def __init__( + self, + client: ApiClient, + ref: ConversationReference, + min_send_interval: float = 1.0, + coalesce_informative_updates: bool = True, + ): """ Initialize a new HttpStream instance. Args: client (ApiClient): The API client used to send activities to Microsoft Teams. ref (ConversationReference): Reference to the Teams conversation. + min_send_interval (float): Minimum seconds between sends (Teams limits streaming to 1 req/s). + coalesce_informative_updates (bool): When True (default), a burst of informative updates in + one flush collapses to the latest one. Set False to pace out every update at 1 req/s + instead; a long burst then holds the flush lock and can delay or drop close()'s final + message (see _total_wait_timeout). """ super().__init__() self._client = client self._ref = ref self._events = EventEmitter[StreamerEvent]() + self._acquire = make_limiter(rate=1, period=min_send_interval) + self._coalesce_informative_updates = coalesce_informative_updates self._result: Optional[SentActivity] = None self._lock = asyncio.Lock() @@ -252,7 +268,10 @@ async def _flush(self) -> None: logger.debug("No activities to flush") return - # Send informative updates immediately + if self._coalesce_informative_updates and len(informative_updates) > 1: + informative_updates = informative_updates[-1:] + + # Send informative updates, paced by the limiter for typing_update in informative_updates: await self._send_activity(typing_update) @@ -278,6 +297,9 @@ async def _send_activity(self, to_send: TypingActivityInput): Args: activity: The activity to send. """ + # Paces sends to the Teams 1 req/s limit. This sleeps while _flush holds + # self._lock, so a long informative burst delays close() (see _total_wait_timeout). + await self._acquire() if self._id: to_send = to_send.with_id(self._id) to_send = to_send.add_stream_update(self._index) diff --git a/packages/apps/src/microsoft_teams/apps/utils/__init__.py b/packages/apps/src/microsoft_teams/apps/utils/__init__.py index 511041a5a..2b26c5ec5 100644 --- a/packages/apps/src/microsoft_teams/apps/utils/__init__.py +++ b/packages/apps/src/microsoft_teams/apps/utils/__init__.py @@ -5,7 +5,15 @@ from .activity_utils import extract_tenant_id from .graph import create_graph_client +from .limiter import make_limiter from .retry import RetryOptions, retry from .thread import to_threaded_conversation_id -__all__ = ["create_graph_client", "extract_tenant_id", "retry", "RetryOptions", "to_threaded_conversation_id"] +__all__ = [ + "create_graph_client", + "extract_tenant_id", + "make_limiter", + "retry", + "RetryOptions", + "to_threaded_conversation_id", +] diff --git a/packages/apps/src/microsoft_teams/apps/utils/limiter.py b/packages/apps/src/microsoft_teams/apps/utils/limiter.py new file mode 100644 index 000000000..89e3325f7 --- /dev/null +++ b/packages/apps/src/microsoft_teams/apps/utils/limiter.py @@ -0,0 +1,35 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + +import asyncio +from time import monotonic +from typing import Awaitable, Callable + + +def make_limiter(rate: int, period: float = 1.0) -> Callable[[], Awaitable[None]]: + """Leaky-slot limiter: at most `rate` acquisitions per `period` seconds. + + The slot is reserved (read then write of `next_slot`) with no await in + between, so reservations are race-free under single-threaded asyncio. The + first call never waits. + """ + if rate < 1: + raise ValueError("rate must be >= 1") + if period < 0: + raise ValueError("period must be >= 0") + + interval = period / rate + next_slot = monotonic() + + async def acquire() -> None: + nonlocal next_slot + now = monotonic() + slot = max(now, next_slot) + next_slot = slot + interval + wait = slot - now + if wait > 0: + await asyncio.sleep(wait) + + return acquire diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index 53173c4cb..87e90c8b7 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -5,6 +5,7 @@ # pyright: basic import asyncio +from time import monotonic from unittest.mock import MagicMock, patch import pytest @@ -176,7 +177,7 @@ async def test_stream_sequence_of_update_and_emit( loop = asyncio.get_running_loop() patcher, scheduled = patch_loop_call_later(loop) with patcher: - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) stream.update("Preparing response...") stream.emit("Final response message") await asyncio.sleep(0) @@ -302,7 +303,7 @@ async def mock_send_then_403(activity): ) mock_api_client.conversations.activities().create = mock_send_then_403 - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) # First emit succeeds stream.emit("First message") @@ -453,3 +454,96 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa assert result is not None assert mock_api_client.send_call_count == 1 assert mock_api_client.sent_activities[0].text == "Response text" + + @pytest.mark.asyncio + async def test_rapid_updates_are_paced_not_dropped_when_coalesce_off(self, mock_api_client, conversation_reference): + interval = 0.05 + send_times: list[float] = [] + + async def mock_send(activity): + send_times.append(monotonic()) + mock_api_client.send_call_count += 1 + mock_api_client.sent_activities.append(activity) + return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) + + mock_api_client.conversations.activities().create = mock_send + + stream = HttpStream( + mock_api_client, + conversation_reference, + min_send_interval=interval, + coalesce_informative_updates=False, + ) + for i in range(8): + stream.update(f"progress {i}") + + task = stream._pending + assert task is not None + await task + + texts = [a.text for a in mock_api_client.sent_activities] + assert texts == [f"progress {i}" for i in range(8)] # all sent, in order, none dropped + gaps = [b - a for a, b in zip(send_times, send_times[1:], strict=False)] + assert min(gaps) >= interval * 0.9 # each subsequent send waited ~one interval + + @pytest.mark.asyncio + @pytest.mark.parametrize("interval", [0.05, 0.15]) + async def test_consecutive_emits_are_paced_across_flushes(self, mock_api_client, conversation_reference, interval): + send_times: list[float] = [] + + async def mock_send(activity): + send_times.append(monotonic()) + mock_api_client.send_call_count += 1 + mock_api_client.sent_activities.append(activity) + return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) + + mock_api_client.conversations.activities().create = mock_send + + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) + # Each update drains its own flush, so pacing must hold across flushes (bug 2). + for i in range(3): + stream.update(f"step {i}") + task = stream._pending + assert task is not None + await task + + texts = [a.text for a in mock_api_client.sent_activities] + assert texts == ["step 0", "step 1", "step 2"] + gaps = [b - a for a, b in zip(send_times, send_times[1:], strict=False)] + assert min(gaps) >= interval * 0.9 + + @pytest.mark.asyncio + async def test_coalesce_drops_intermediate_informative_in_burst_by_default( + self, mock_api_client, conversation_reference + ): + # Coalescing is the default, so a burst must not be paced one-by-one. + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.05) + for i in range(8): + stream.update(f"progress {i}") + + task = stream._pending + assert task is not None + await task + + # A burst collapses to the latest informative bubble. + assert mock_api_client.send_call_count == 1 + assert mock_api_client.sent_activities[0].text == "progress 7" + + @pytest.mark.asyncio + async def test_coalesce_does_not_drop_text(self, mock_api_client, conversation_reference, patch_loop_call_later): + loop = asyncio.get_running_loop() + patcher, scheduled = patch_loop_call_later(loop) + with patcher: + stream = HttpStream( + mock_api_client, + conversation_reference, + min_send_interval=0.01, + coalesce_informative_updates=True, + ) + stream.update("Thinking...") + stream.emit("The answer") + await asyncio.sleep(0) + await self._run_scheduled_flushes(scheduled) + + texts = [a.text for a in mock_api_client.sent_activities] + assert texts == ["Thinking...", "The answer"] # informative + text both sent diff --git a/packages/apps/tests/test_limiter.py b/packages/apps/tests/test_limiter.py new file mode 100644 index 000000000..18f9cf2b1 --- /dev/null +++ b/packages/apps/tests/test_limiter.py @@ -0,0 +1,30 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + +from time import monotonic + +import pytest +from microsoft_teams.apps.utils import make_limiter + + +@pytest.mark.asyncio +async def test_make_limiter_spaces_calls(): + acquire = make_limiter(rate=1, period=0.05) + + start = monotonic() + await acquire() # first call returns immediately + first_done = monotonic() + await acquire() + await acquire() + elapsed = monotonic() - start + + assert first_done - start < 0.05 # leading edge is not delayed + assert elapsed >= 0.10 # two subsequent calls each waited one interval + + +@pytest.mark.parametrize("rate, period", [(0, 1.0), (-1, 1.0), (1, -0.5)]) +def test_make_limiter_rejects_invalid_args(rate, period): + with pytest.raises(ValueError): + make_limiter(rate=rate, period=period) From bd1e0a3cf20020df2bf2a0ce11d77613c91151d4 Mon Sep 17 00:00:00 2001 From: Ayrton WERCK Date: Sat, 6 Jun 2026 22:30:57 +0000 Subject: [PATCH 2/7] fix(apps): pace retries and final close() send through the limiter Acquire the per-stream limiter in _send instead of _send_activity so every HTTP attempt is paced, including retries and close()'s final send. This keeps a retry or the final message from landing within the interval of the last chunk and tripping the Teams 1 req/s throttle. acquire() only waits when a send would actually be too soon, so close() pays no latency after an idle gap; min_send_interval=0 disables pacing. Tests that don't assert pacing construct with min_send_interval=0 to stay fast. Addresses Copilot review feedback on #453. --- .../src/microsoft_teams/apps/http_stream.py | 16 ++++-- packages/apps/tests/test_http_stream.py | 50 ++++++++++++++----- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index 2feda71e7..c1da82416 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -57,7 +57,8 @@ def __init__( Args: client (ApiClient): The API client used to send activities to Microsoft Teams. ref (ConversationReference): Reference to the Teams conversation. - min_send_interval (float): Minimum seconds between sends (Teams limits streaming to 1 req/s). + min_send_interval (float): Minimum seconds between sends, including retries and the final + close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing. coalesce_informative_updates (bool): When True (default), a burst of informative updates in one flush collapses to the latest one. Set False to pace out every update at 1 req/s instead; a long burst then holds the flush lock and can delay or drop close()'s final @@ -297,9 +298,6 @@ async def _send_activity(self, to_send: TypingActivityInput): Args: activity: The activity to send. """ - # Paces sends to the Teams 1 req/s limit. This sleeps while _flush holds - # self._lock, so a long informative burst delays close() (see _total_wait_timeout). - await self._acquire() if self._id: to_send = to_send.with_id(self._id) to_send = to_send.add_stream_update(self._index) @@ -325,6 +323,16 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) logger.warning("Teams channel stopped the stream.") raise StreamCancelledError("Teams channel stopped the stream.") + # Pace every HTTP attempt to the Teams 1 req/s streaming limit. Gating here + # rather than in _send_activity also paces retries and close()'s final send, + # so a retry storm or a final message sent right behind the last chunk can't + # trip the throttle. acquire() only waits when a send would actually be too + # soon (next_slot is in the past after any idle gap), so close() pays no + # latency unless it lands within the interval. While _flush holds self._lock + # this sleeps under it, so a long informative burst still delays close() + # (see _total_wait_timeout). Pass min_send_interval=0 to disable pacing. + await self._acquire() + to_send.from_ = self._ref.bot to_send.conversation = self._ref.conversation diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index 87e90c8b7..36165c47a 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -64,7 +64,7 @@ def conversation_reference(self): @pytest.fixture def http_stream(self, mock_api_client, conversation_reference): - return HttpStream(mock_api_client, conversation_reference) + return HttpStream(mock_api_client, conversation_reference, min_send_interval=0) @pytest.fixture def patch_loop_call_later(self): @@ -117,7 +117,7 @@ async def mock_send_with_timeout(activity): return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity) mock_api_client.conversations.activities().create = mock_send_with_timeout - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("Test message with timeout") await asyncio.sleep(0) @@ -142,7 +142,7 @@ async def mock_send_all_timeout(activity): raise TimeoutError("All operations timed out") mock_api_client.conversations.activities().create = mock_send_all_timeout - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("Test message with all timeouts") await asyncio.sleep(0) @@ -157,7 +157,7 @@ async def test_stream_update_status_sends_typing_activity( loop = asyncio.get_running_loop() patcher, scheduled = patch_loop_call_later(loop) with patcher: - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.update("Thinking...") await asyncio.sleep(0) await self._run_scheduled_flushes(scheduled) @@ -215,7 +215,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().create = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) async def emit_task(): stream.emit("Concurrent message") @@ -240,7 +240,7 @@ async def mock_send_403(activity): ) mock_api_client.conversations.activities().create = mock_send_403 - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("Test message") await asyncio.sleep(0) @@ -262,7 +262,7 @@ async def mock_send_403(activity): ) mock_api_client.conversations.activities().create = mock_send_403 - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("First message") await asyncio.sleep(0) @@ -276,7 +276,7 @@ async def mock_send_403(activity): @pytest.mark.asyncio async def test_send_blocked_after_cancel(self, mock_api_client, conversation_reference): - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream._canceled = True with pytest.raises(StreamCancelledError, match="Teams channel stopped the stream."): @@ -327,7 +327,7 @@ async def mock_send_then_403(activity): @pytest.mark.asyncio async def test_close_returns_none_when_canceled(self, mock_api_client, conversation_reference): - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream._canceled = True result = await stream.close() @@ -357,7 +357,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().update = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) early_actions = SuggestedActions( to=[], @@ -407,7 +407,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().update = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) actions = SuggestedActions( to=[], @@ -431,7 +431,7 @@ async def mock_send(activity): @pytest.mark.asyncio async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversation_reference): """close() must not send the final message while a flush is still mid-await.""" - stream = HttpStream(mock_api_client, conversation_reference) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) # Simulate a flush in progress: lock held, _id assigned, text accumulated. # This mirrors the window after the inner queue drain but before SendActivity awaits resolve. @@ -547,3 +547,29 @@ async def test_coalesce_does_not_drop_text(self, mock_api_client, conversation_r texts = [a.text for a in mock_api_client.sent_activities] assert texts == ["Thinking...", "The answer"] # informative + text both sent + + @pytest.mark.asyncio + async def test_close_final_send_is_paced(self, mock_api_client, conversation_reference): + # The final close() send goes through the limiter too, so it can't land + # right behind the last chunk and trip the throttle. + interval = 0.05 + send_times: list[float] = [] + + async def mock_send(activity): + send_times.append(monotonic()) + mock_api_client.send_call_count += 1 + mock_api_client.sent_activities.append(activity) + return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) + + mock_api_client.conversations.activities().create = mock_send + mock_api_client.conversations.activities().update = mock_send + + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) + stream.emit("chunk") + task = stream._pending + assert task is not None + await task + await stream.close() + + assert len(send_times) == 2 # chunk + final + assert send_times[1] - send_times[0] >= interval * 0.9 From fa6ae49b493b94413a337dc56351a7d921a94565 Mon Sep 17 00:00:00 2001 From: Ayrton WERCK Date: Wed, 10 Jun 2026 15:02:13 +0000 Subject: [PATCH 3/7] =?UTF-8?q?fix(apps):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20plumb=20stream=20options,=20pace-don't-drop=20default,=20sim?= =?UTF-8?q?pler=20limiter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Plumb stream_min_send_interval and stream_coalesce_informative_updates through AppOptions -> ActivitySender -> HttpStream - Flip coalesce_informative_updates default to False so informative updates are paced, not silently dropped; coalescing is now opt-in - Simplify make_limiter(rate, period) to make_limiter(interval) and reword its docstring as a fixed-interval gate (token bucket of size 1) --- .../microsoft_teams/apps/activity_sender.py | 20 +++++++++++++++++-- packages/apps/src/microsoft_teams/apps/app.py | 6 +++++- .../src/microsoft_teams/apps/http_stream.py | 18 ++++++++--------- .../apps/src/microsoft_teams/apps/options.py | 14 +++++++++++++ .../src/microsoft_teams/apps/utils/limiter.py | 14 ++++++------- packages/apps/tests/test_http_stream.py | 20 +++++++++---------- packages/apps/tests/test_limiter.py | 8 ++++---- 7 files changed, 66 insertions(+), 34 deletions(-) diff --git a/packages/apps/src/microsoft_teams/apps/activity_sender.py b/packages/apps/src/microsoft_teams/apps/activity_sender.py index 1ab97c96c..4dda40a02 100644 --- a/packages/apps/src/microsoft_teams/apps/activity_sender.py +++ b/packages/apps/src/microsoft_teams/apps/activity_sender.py @@ -27,14 +27,25 @@ class ActivitySender: Separate from transport concerns (HTTP, WebSocket, etc.) """ - def __init__(self, client: Client): + def __init__( + self, + client: Client, + stream_min_send_interval: float = 1.0, + stream_coalesce_informative_updates: bool = False, + ): """ Initialize ActivitySender. Args: client: HTTP client with token provider configured + stream_min_send_interval: Minimum seconds between sends on streams created by + create_stream() (Teams limits streaming to 1 req/s). Set 0 to disable pacing. + stream_coalesce_informative_updates: When True, a burst of informative updates in one + flush collapses to the latest one instead of pacing out every update. """ self._client = client + self._stream_min_send_interval = stream_min_send_interval + self._stream_coalesce_informative_updates = stream_coalesce_informative_updates async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: """ @@ -92,4 +103,9 @@ def create_stream(self, ref: ConversationReference) -> StreamerProtocol: """ # Create API client for this conversation's service URL api = ApiClient(ref.service_url, self._client) - return HttpStream(api, ref) + return HttpStream( + api, + ref, + min_send_interval=self._stream_min_send_interval, + coalesce_informative_updates=self._stream_coalesce_informative_updates, + ) diff --git a/packages/apps/src/microsoft_teams/apps/app.py b/packages/apps/src/microsoft_teams/apps/app.py index bed082791..872fa93a9 100644 --- a/packages/apps/src/microsoft_teams/apps/app.py +++ b/packages/apps/src/microsoft_teams/apps/app.py @@ -130,7 +130,11 @@ def __init__(self, **options: Unpack[AppOptions]): self._initialized = False # initialize ActivitySender for sending activities - self.activity_sender = ActivitySender(self.http_client.clone(ClientOptions(token=self._get_bot_token))) + self.activity_sender = ActivitySender( + self.http_client.clone(ClientOptions(token=self._get_bot_token)), + stream_min_send_interval=self.options.stream_min_send_interval, + stream_coalesce_informative_updates=self.options.stream_coalesce_informative_updates, + ) # initialize all event, activity, and plugin processors self.activity_processor = ActivityProcessor( diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index c1da82416..3301949dc 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -39,9 +39,9 @@ class HttpStream(StreamerProtocol): The timeout cancellation ensures only one flush operation is scheduled at a time. Every send goes through a per-stream limiter (min_send_interval) so we stay within - the Teams 1 req/s streaming limit. By default a burst of informative updates in one - flush collapses to the latest (coalesce_informative_updates); set it False to pace - out every update instead. + the Teams 1 req/s streaming limit. By default every informative update is kept and + paced out; set coalesce_informative_updates=True to collapse a burst of informative + updates in one flush to the latest one instead. """ def __init__( @@ -49,7 +49,7 @@ def __init__( client: ApiClient, ref: ConversationReference, min_send_interval: float = 1.0, - coalesce_informative_updates: bool = True, + coalesce_informative_updates: bool = False, ): """ Initialize a new HttpStream instance. @@ -59,16 +59,16 @@ def __init__( ref (ConversationReference): Reference to the Teams conversation. min_send_interval (float): Minimum seconds between sends, including retries and the final close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing. - coalesce_informative_updates (bool): When True (default), a burst of informative updates in - one flush collapses to the latest one. Set False to pace out every update at 1 req/s - instead; a long burst then holds the flush lock and can delay or drop close()'s final - message (see _total_wait_timeout). + coalesce_informative_updates (bool): When False (default), every informative update is + paced out at min_send_interval; a long burst then holds the flush lock and can delay + or drop close()'s final message (see _total_wait_timeout). Set True to collapse a + burst of informative updates in one flush to the latest one instead. """ super().__init__() self._client = client self._ref = ref self._events = EventEmitter[StreamerEvent]() - self._acquire = make_limiter(rate=1, period=min_send_interval) + self._acquire = make_limiter(min_send_interval) self._coalesce_informative_updates = coalesce_informative_updates self._result: Optional[SentActivity] = None diff --git a/packages/apps/src/microsoft_teams/apps/options.py b/packages/apps/src/microsoft_teams/apps/options.py index c9d7a363f..fd34b5fca 100644 --- a/packages/apps/src/microsoft_teams/apps/options.py +++ b/packages/apps/src/microsoft_teams/apps/options.py @@ -84,6 +84,14 @@ class AppOptions(TypedDict, total=False): Defaults to PUBLIC (commercial cloud). """ + # Streaming + stream_min_send_interval: Optional[float] + """Minimum seconds between sends on a stream, including retries and the final close() send. + Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing.""" + stream_coalesce_informative_updates: Optional[bool] + """When True, a burst of informative updates in one flush collapses to the latest one. + Defaults to False: every informative update is kept and paced out at stream_min_send_interval.""" + @dataclass class InternalAppOptions: @@ -133,6 +141,12 @@ class InternalAppOptions: """URL path for the Teams messaging endpoint. Defaults to '/api/messages'.""" cloud: Optional[CloudEnvironment] = None """Cloud environment for sovereign cloud support.""" + stream_min_send_interval: float = 1.0 + """Minimum seconds between sends on a stream, including retries and the final close() send. + Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing.""" + stream_coalesce_informative_updates: bool = False + """When True, a burst of informative updates in one flush collapses to the latest one. + Defaults to False: every informative update is kept and paced out at stream_min_send_interval.""" @classmethod def from_typeddict(cls, options: AppOptions) -> "InternalAppOptions": diff --git a/packages/apps/src/microsoft_teams/apps/utils/limiter.py b/packages/apps/src/microsoft_teams/apps/utils/limiter.py index 89e3325f7..c43857762 100644 --- a/packages/apps/src/microsoft_teams/apps/utils/limiter.py +++ b/packages/apps/src/microsoft_teams/apps/utils/limiter.py @@ -8,19 +8,17 @@ from typing import Awaitable, Callable -def make_limiter(rate: int, period: float = 1.0) -> Callable[[], Awaitable[None]]: - """Leaky-slot limiter: at most `rate` acquisitions per `period` seconds. +def make_limiter(interval: float) -> Callable[[], Awaitable[None]]: + """Fixed-interval gate (a token bucket of size 1): consecutive + acquisitions are spaced at least `interval` seconds apart. The slot is reserved (read then write of `next_slot`) with no await in between, so reservations are race-free under single-threaded asyncio. The - first call never waits. + first call never waits, and `interval=0` disables pacing. """ - if rate < 1: - raise ValueError("rate must be >= 1") - if period < 0: - raise ValueError("period must be >= 0") + if interval < 0: + raise ValueError("interval must be >= 0") - interval = period / rate next_slot = monotonic() async def acquire() -> None: diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index 36165c47a..f0a107410 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -456,7 +456,7 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa assert mock_api_client.sent_activities[0].text == "Response text" @pytest.mark.asyncio - async def test_rapid_updates_are_paced_not_dropped_when_coalesce_off(self, mock_api_client, conversation_reference): + async def test_rapid_updates_are_paced_not_dropped_by_default(self, mock_api_client, conversation_reference): interval = 0.05 send_times: list[float] = [] @@ -468,12 +468,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().create = mock_send - stream = HttpStream( - mock_api_client, - conversation_reference, - min_send_interval=interval, - coalesce_informative_updates=False, - ) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) for i in range(8): stream.update(f"progress {i}") @@ -513,11 +508,16 @@ async def mock_send(activity): assert min(gaps) >= interval * 0.9 @pytest.mark.asyncio - async def test_coalesce_drops_intermediate_informative_in_burst_by_default( + async def test_coalesce_drops_intermediate_informative_in_burst_when_opted_in( self, mock_api_client, conversation_reference ): - # Coalescing is the default, so a burst must not be paced one-by-one. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.05) + # Coalescing is opt-in; when enabled, a burst is not paced one-by-one. + stream = HttpStream( + mock_api_client, + conversation_reference, + min_send_interval=0.05, + coalesce_informative_updates=True, + ) for i in range(8): stream.update(f"progress {i}") diff --git a/packages/apps/tests/test_limiter.py b/packages/apps/tests/test_limiter.py index 18f9cf2b1..e987498d4 100644 --- a/packages/apps/tests/test_limiter.py +++ b/packages/apps/tests/test_limiter.py @@ -11,7 +11,7 @@ @pytest.mark.asyncio async def test_make_limiter_spaces_calls(): - acquire = make_limiter(rate=1, period=0.05) + acquire = make_limiter(0.05) start = monotonic() await acquire() # first call returns immediately @@ -24,7 +24,7 @@ async def test_make_limiter_spaces_calls(): assert elapsed >= 0.10 # two subsequent calls each waited one interval -@pytest.mark.parametrize("rate, period", [(0, 1.0), (-1, 1.0), (1, -0.5)]) -def test_make_limiter_rejects_invalid_args(rate, period): +@pytest.mark.parametrize("interval", [-0.5, -1]) +def test_make_limiter_rejects_negative_interval(interval: float): with pytest.raises(ValueError): - make_limiter(rate=rate, period=period) + make_limiter(interval) From bd8eedb6a912485467677ff66c44226ffc4598ad Mon Sep 17 00:00:00 2001 From: Ayrton WERCK Date: Wed, 10 Jun 2026 21:42:11 +0000 Subject: [PATCH 4/7] fix(apps): always coalesce informative updates, add informative/text stream modes Informative updates are status replacements, not cumulative content, so a burst now always collapses to the latest one (skipped count logged at debug level) instead of being paced out one-by-one or gated behind a flag. The stream now has two explicit modes: informative mode sends the latest pending informative update per flush; once text streaming starts the stream permanently switches to text mode and informative updates are dropped. Text landing in the same flush as informative updates supersedes them. Removes the stream_min_send_interval / stream_coalesce_informative_updates plumbing from AppOptions and ActivitySender; pacing stays fixed at the Teams 1 req/s limit and all sends keep going through the same limiter/retry path. --- .../microsoft_teams/apps/activity_sender.py | 20 +-- packages/apps/src/microsoft_teams/apps/app.py | 6 +- .../src/microsoft_teams/apps/http_stream.py | 57 ++++--- .../apps/src/microsoft_teams/apps/options.py | 14 -- packages/apps/tests/test_http_stream.py | 147 +++++++++--------- 5 files changed, 112 insertions(+), 132 deletions(-) diff --git a/packages/apps/src/microsoft_teams/apps/activity_sender.py b/packages/apps/src/microsoft_teams/apps/activity_sender.py index 4dda40a02..1ab97c96c 100644 --- a/packages/apps/src/microsoft_teams/apps/activity_sender.py +++ b/packages/apps/src/microsoft_teams/apps/activity_sender.py @@ -27,25 +27,14 @@ class ActivitySender: Separate from transport concerns (HTTP, WebSocket, etc.) """ - def __init__( - self, - client: Client, - stream_min_send_interval: float = 1.0, - stream_coalesce_informative_updates: bool = False, - ): + def __init__(self, client: Client): """ Initialize ActivitySender. Args: client: HTTP client with token provider configured - stream_min_send_interval: Minimum seconds between sends on streams created by - create_stream() (Teams limits streaming to 1 req/s). Set 0 to disable pacing. - stream_coalesce_informative_updates: When True, a burst of informative updates in one - flush collapses to the latest one instead of pacing out every update. """ self._client = client - self._stream_min_send_interval = stream_min_send_interval - self._stream_coalesce_informative_updates = stream_coalesce_informative_updates async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity: """ @@ -103,9 +92,4 @@ def create_stream(self, ref: ConversationReference) -> StreamerProtocol: """ # Create API client for this conversation's service URL api = ApiClient(ref.service_url, self._client) - return HttpStream( - api, - ref, - min_send_interval=self._stream_min_send_interval, - coalesce_informative_updates=self._stream_coalesce_informative_updates, - ) + return HttpStream(api, ref) diff --git a/packages/apps/src/microsoft_teams/apps/app.py b/packages/apps/src/microsoft_teams/apps/app.py index 872fa93a9..bed082791 100644 --- a/packages/apps/src/microsoft_teams/apps/app.py +++ b/packages/apps/src/microsoft_teams/apps/app.py @@ -130,11 +130,7 @@ def __init__(self, **options: Unpack[AppOptions]): self._initialized = False # initialize ActivitySender for sending activities - self.activity_sender = ActivitySender( - self.http_client.clone(ClientOptions(token=self._get_bot_token)), - stream_min_send_interval=self.options.stream_min_send_interval, - stream_coalesce_informative_updates=self.options.stream_coalesce_informative_updates, - ) + self.activity_sender = ActivitySender(self.http_client.clone(ClientOptions(token=self._get_bot_token))) # initialize all event, activity, and plugin processors self.activity_processor = ActivityProcessor( diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index 3301949dc..e25d42cfc 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -32,16 +32,21 @@ class HttpStream(StreamerProtocol): Flow: 1. emit() adds activities to a queue 2. _flush() drains the entire queue under a lock. - 3. Informative typing updates are sent if no message started. - 4. Message text are combined into a typing chunk. - 5. Another flush is scheduled if more items remain. - 6. close() waits for queue to empty, then sends final message with stream_type='stream_final' + 3. Message text are combined into a typing chunk. + 4. Another flush is scheduled if more items remain. + 5. close() waits for queue to empty, then sends final message with stream_type='stream_final' + + The stream has two modes: + - Informative mode (initial): each flush sends only the latest pending informative + update — they are status replacements, not cumulative content, so intermediate + ones are skipped (count logged at debug level). + - Text mode: entered once text streaming starts, permanently. Informative updates + are no longer sent (count logged at debug level). The timeout cancellation ensures only one flush operation is scheduled at a time. - Every send goes through a per-stream limiter (min_send_interval) so we stay within - the Teams 1 req/s streaming limit. By default every informative update is kept and - paced out; set coalesce_informative_updates=True to collapse a burst of informative - updates in one flush to the latest one instead. + Every send — informative updates, text chunks, retries, and the final close() send — + goes through the same per-stream limiter (min_send_interval) so we stay within the + Teams 1 req/s streaming limit. """ def __init__( @@ -49,7 +54,6 @@ def __init__( client: ApiClient, ref: ConversationReference, min_send_interval: float = 1.0, - coalesce_informative_updates: bool = False, ): """ Initialize a new HttpStream instance. @@ -59,17 +63,12 @@ def __init__( ref (ConversationReference): Reference to the Teams conversation. min_send_interval (float): Minimum seconds between sends, including retries and the final close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing. - coalesce_informative_updates (bool): When False (default), every informative update is - paced out at min_send_interval; a long burst then holds the flush lock and can delay - or drop close()'s final message (see _total_wait_timeout). Set True to collapse a - burst of informative updates in one flush to the latest one instead. """ super().__init__() self._client = client self._ref = ref self._events = EventEmitter[StreamerEvent]() self._acquire = make_limiter(min_send_interval) - self._coalesce_informative_updates = coalesce_informative_updates self._result: Optional[SentActivity] = None self._lock = asyncio.Lock() @@ -86,6 +85,7 @@ def _reset_state(self) -> None: self._index = 1 self._id: Optional[str] = None self._text: str = "" + self._text_mode: bool = False self._channel_data: ChannelData = ChannelData() self._final_activity: Optional[MessageActivityInput] = None self._queue: deque[Union[MessageActivityInput, TypingActivityInput, str]] = deque() @@ -157,6 +157,9 @@ def clear_text(self) -> None: previously-flushed attachments/suggested actions aren't sent if the caller never emits a replacement. The stream id and channel data are kept intact so the new final activity still updates the stream in place. + + Text mode is NOT reset: once text streaming has started, informative + updates stay disabled even if the text buffer is cleared. """ # Safe without the lock: no await points here, so this runs atomically # w.r.t. the event loop and can't interleave with _flush's critical @@ -259,22 +262,29 @@ async def _flush(self) -> None: if ( isinstance(activity, TypingActivityInput) and getattr(activity.channel_data, "stream_type", None) == "informative" - and self._text == "" ): - # If `_text` is not empty then it's possible that streaming has started. - # And so informative updates cannot be sent. informative_updates.append(activity) if start_length == 0: logger.debug("No activities to flush") return - if self._coalesce_informative_updates and len(informative_updates) > 1: + # Any accumulated text — even from this same flush — switches the stream + # to text mode for good: the streamed text replaces the status bubble in + # the Teams UI, so informative updates are pointless from then on. + if self._text: + self._text_mode = True + + if informative_updates and self._text_mode: + logger.debug("Dropped %d informative update(s): text streaming has started", len(informative_updates)) + informative_updates = [] + elif len(informative_updates) > 1: + logger.debug("Coalesced %d informative update(s) into the latest", len(informative_updates) - 1) informative_updates = informative_updates[-1:] - # Send informative updates, paced by the limiter - for typing_update in informative_updates: - await self._send_activity(typing_update) + # Send the surviving informative update (at most one), paced by the limiter + if informative_updates: + await self._send_activity(informative_updates[-1]) # Send the combined text chunk if self._text: @@ -328,9 +338,8 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) # so a retry storm or a final message sent right behind the last chunk can't # trip the throttle. acquire() only waits when a send would actually be too # soon (next_slot is in the past after any idle gap), so close() pays no - # latency unless it lands within the interval. While _flush holds self._lock - # this sleeps under it, so a long informative burst still delays close() - # (see _total_wait_timeout). Pass min_send_interval=0 to disable pacing. + # latency unless it lands within the interval. Pass min_send_interval=0 to + # disable pacing. await self._acquire() to_send.from_ = self._ref.bot diff --git a/packages/apps/src/microsoft_teams/apps/options.py b/packages/apps/src/microsoft_teams/apps/options.py index fd34b5fca..c9d7a363f 100644 --- a/packages/apps/src/microsoft_teams/apps/options.py +++ b/packages/apps/src/microsoft_teams/apps/options.py @@ -84,14 +84,6 @@ class AppOptions(TypedDict, total=False): Defaults to PUBLIC (commercial cloud). """ - # Streaming - stream_min_send_interval: Optional[float] - """Minimum seconds between sends on a stream, including retries and the final close() send. - Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing.""" - stream_coalesce_informative_updates: Optional[bool] - """When True, a burst of informative updates in one flush collapses to the latest one. - Defaults to False: every informative update is kept and paced out at stream_min_send_interval.""" - @dataclass class InternalAppOptions: @@ -141,12 +133,6 @@ class InternalAppOptions: """URL path for the Teams messaging endpoint. Defaults to '/api/messages'.""" cloud: Optional[CloudEnvironment] = None """Cloud environment for sovereign cloud support.""" - stream_min_send_interval: float = 1.0 - """Minimum seconds between sends on a stream, including retries and the final close() send. - Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing.""" - stream_coalesce_informative_updates: bool = False - """When True, a burst of informative updates in one flush collapses to the latest one. - Defaults to False: every informative update is kept and paced out at stream_min_send_interval.""" @classmethod def from_typeddict(cls, options: AppOptions) -> "InternalAppOptions": diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index f0a107410..eeca01127 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -5,6 +5,7 @@ # pyright: basic import asyncio +import logging from time import monotonic from unittest.mock import MagicMock, patch @@ -171,26 +172,28 @@ async def test_stream_update_status_sends_typing_activity( assert stream.sequence >= 2 @pytest.mark.asyncio - async def test_stream_sequence_of_update_and_emit( - self, mock_api_client, conversation_reference, patch_loop_call_later - ): - loop = asyncio.get_running_loop() - patcher, scheduled = patch_loop_call_later(loop) - with patcher: - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) - stream.update("Preparing response...") - stream.emit("Final response message") - await asyncio.sleep(0) - await self._run_scheduled_flushes(scheduled) + async def test_stream_sequence_of_update_and_emit(self, mock_api_client, conversation_reference): + # Informative mode then text mode: the informative update flushes first, + # then text streaming starts in a later flush. + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream.update("Preparing response...") + task = stream._pending + assert task is not None + await task - assert len(mock_api_client.sent_activities) >= 2 - typing_activity = mock_api_client.sent_activities[0] - message_activity = mock_api_client.sent_activities[1] + stream.emit("Final response message") + task = stream._pending + assert task is not None + await task - assert isinstance(typing_activity, TypingActivityInput) - assert typing_activity.text == "Preparing response..." - assert message_activity.text == "Final response message" - assert stream.sequence >= 3 + assert len(mock_api_client.sent_activities) == 2 + typing_activity = mock_api_client.sent_activities[0] + message_activity = mock_api_client.sent_activities[1] + + assert isinstance(typing_activity, TypingActivityInput) + assert typing_activity.text == "Preparing response..." + assert message_activity.text == "Final response message" + assert stream.sequence >= 3 @pytest.mark.asyncio async def test_stream_concurrent_emits_do_not_flush_simultaneously( @@ -456,30 +459,21 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa assert mock_api_client.sent_activities[0].text == "Response text" @pytest.mark.asyncio - async def test_rapid_updates_are_paced_not_dropped_by_default(self, mock_api_client, conversation_reference): - interval = 0.05 - send_times: list[float] = [] + async def test_informative_burst_coalesces_to_latest(self, mock_api_client, conversation_reference, caplog): + # Informative updates are status replacements, not cumulative content: + # a burst in one flush collapses to the latest one. + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.05) + with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): + for i in range(8): + stream.update(f"progress {i}") - async def mock_send(activity): - send_times.append(monotonic()) - mock_api_client.send_call_count += 1 - mock_api_client.sent_activities.append(activity) - return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) - - mock_api_client.conversations.activities().create = mock_send - - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) - for i in range(8): - stream.update(f"progress {i}") - - task = stream._pending - assert task is not None - await task + task = stream._pending + assert task is not None + await task - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == [f"progress {i}" for i in range(8)] # all sent, in order, none dropped - gaps = [b - a for a, b in zip(send_times, send_times[1:], strict=False)] - assert min(gaps) >= interval * 0.9 # each subsequent send waited ~one interval + assert mock_api_client.send_call_count == 1 + assert mock_api_client.sent_activities[0].text == "progress 7" + assert "Coalesced 7 informative update(s) into the latest" in caplog.text @pytest.mark.asyncio @pytest.mark.parametrize("interval", [0.05, 0.15]) @@ -508,45 +502,56 @@ async def mock_send(activity): assert min(gaps) >= interval * 0.9 @pytest.mark.asyncio - async def test_coalesce_drops_intermediate_informative_in_burst_when_opted_in( - self, mock_api_client, conversation_reference - ): - # Coalescing is opt-in; when enabled, a burst is not paced one-by-one. - stream = HttpStream( - mock_api_client, - conversation_reference, - min_send_interval=0.05, - coalesce_informative_updates=True, - ) - for i in range(8): - stream.update(f"progress {i}") + async def test_text_supersedes_informative_in_same_flush(self, mock_api_client, conversation_reference): + # When text lands in the same flush as informative updates, the text wins: + # it replaces the status bubble immediately anyway, so the informative + # update would only burn a paced slot before real content. + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream.update("Thinking...") + stream.emit("The answer") + task = stream._pending + assert task is not None + await task + + texts = [a.text for a in mock_api_client.sent_activities] + assert texts == ["The answer"] + @pytest.mark.asyncio + async def test_informative_dropped_after_text_started(self, mock_api_client, conversation_reference, caplog): + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream.emit("chunk") task = stream._pending assert task is not None await task - # A burst collapses to the latest informative bubble. - assert mock_api_client.send_call_count == 1 - assert mock_api_client.sent_activities[0].text == "progress 7" + with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): + stream.update("status") + task = stream._pending + assert task is not None + await task + + texts = [a.text for a in mock_api_client.sent_activities] + assert "status" not in texts + assert "Dropped 1 informative update(s): text streaming has started" in caplog.text @pytest.mark.asyncio - async def test_coalesce_does_not_drop_text(self, mock_api_client, conversation_reference, patch_loop_call_later): - loop = asyncio.get_running_loop() - patcher, scheduled = patch_loop_call_later(loop) - with patcher: - stream = HttpStream( - mock_api_client, - conversation_reference, - min_send_interval=0.01, - coalesce_informative_updates=True, - ) - stream.update("Thinking...") - stream.emit("The answer") - await asyncio.sleep(0) - await self._run_scheduled_flushes(scheduled) + async def test_text_mode_latch_survives_clear_text(self, mock_api_client, conversation_reference): + # Once text streaming has started, clearing the text buffer must not + # re-enable informative updates. + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream.emit("chunk") + task = stream._pending + assert task is not None + await task + + stream.clear_text() + stream.update("status") + task = stream._pending + assert task is not None + await task - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == ["Thinking...", "The answer"] # informative + text both sent + texts = [a.text for a in mock_api_client.sent_activities] + assert texts == ["chunk"] @pytest.mark.asyncio async def test_close_final_send_is_paced(self, mock_api_client, conversation_reference): From 0fe5331f5f5ce93045de0c0287d927a88c269e30 Mon Sep 17 00:00:00 2001 From: Ayrton WERCK Date: Wed, 10 Jun 2026 22:01:22 +0000 Subject: [PATCH 5/7] refactor(apps): skip re-sending unchanged text, simplify flush bookkeeping An informative-only flush after text streaming starts used to fall through and re-send the unchanged cumulative text, burning a paced limiter slot under the flush lock for a no-op request. The chunk send is now guarded on text actually having been added by this flush. Also from the cleanup pass: track the latest informative update with a scalar and counter instead of building a list that only ever yields its last element, drop the unreachable start_length branch, use the typed ChannelData.stream_type field instead of getattr, trim a comment that restated the limiter docs, and deduplicate test boilerplate behind _await_pending_flush/_track_send_times helpers (tests with no timing assertions now run with pacing disabled). --- .../src/microsoft_teams/apps/http_stream.py | 47 ++++---- packages/apps/tests/test_http_stream.py | 102 ++++++++---------- packages/apps/tests/test_limiter.py | 5 +- 3 files changed, 65 insertions(+), 89 deletions(-) diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index e25d42cfc..e3ebea628 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -247,8 +247,9 @@ async def _flush(self) -> None: self._timeout.cancel() self._timeout = None - informative_updates: list[TypingActivityInput] = [] - start_length = len(self._queue) + last_informative: Optional[TypingActivityInput] = None + informative_count = 0 + text_before = self._text while self._queue: activity = self._queue.popleft() @@ -261,13 +262,11 @@ async def _flush(self) -> None: self._channel_data = ChannelData(**merged) if ( isinstance(activity, TypingActivityInput) - and getattr(activity.channel_data, "stream_type", None) == "informative" + and activity.channel_data is not None + and activity.channel_data.stream_type == "informative" ): - informative_updates.append(activity) - - if start_length == 0: - logger.debug("No activities to flush") - return + last_informative = activity + informative_count += 1 # Any accumulated text — even from this same flush — switches the stream # to text mode for good: the streamed text replaces the status bubble in @@ -275,19 +274,18 @@ async def _flush(self) -> None: if self._text: self._text_mode = True - if informative_updates and self._text_mode: - logger.debug("Dropped %d informative update(s): text streaming has started", len(informative_updates)) - informative_updates = [] - elif len(informative_updates) > 1: - logger.debug("Coalesced %d informative update(s) into the latest", len(informative_updates) - 1) - informative_updates = informative_updates[-1:] - - # Send the surviving informative update (at most one), paced by the limiter - if informative_updates: - await self._send_activity(informative_updates[-1]) - - # Send the combined text chunk - if self._text: + if self._text_mode: + if informative_count: + logger.debug("Dropped %d informative update(s): text streaming has started", informative_count) + elif last_informative is not None: + if informative_count > 1: + logger.debug("Coalesced %d informative update(s) into the latest", informative_count - 1) + await self._send_activity(last_informative) + + # Send the combined text as a chunk, but only when this flush added text — + # an informative-only flush must not re-send (and re-pace) the unchanged + # cumulative text. + if self._text != text_before: to_send = TypingActivityInput(text=self._text) await self._send_activity(to_send) @@ -334,12 +332,7 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) raise StreamCancelledError("Teams channel stopped the stream.") # Pace every HTTP attempt to the Teams 1 req/s streaming limit. Gating here - # rather than in _send_activity also paces retries and close()'s final send, - # so a retry storm or a final message sent right behind the last chunk can't - # trip the throttle. acquire() only waits when a send would actually be too - # soon (next_slot is in the past after any idle gap), so close() pays no - # latency unless it lands within the interval. Pass min_send_interval=0 to - # disable pacing. + # rather than in _send_activity also paces retries and close()'s final send. await self._acquire() to_send.from_ = self._ref.bot diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index eeca01127..c0b42e5c8 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -88,6 +88,25 @@ async def _run_scheduled_flushes(self, scheduled): callback(*args) await asyncio.sleep(0) + async def _await_pending_flush(self, stream): + """Await the flush task created by the last emit/update.""" + task = stream._pending + assert task is not None + await task + + def _track_send_times(self, mock_api_client): + """Wrap the fixture's send mock to record a timestamp per send.""" + send_times: list[float] = [] + original = mock_api_client.conversations.activities().create + + async def timed_send(activity): + send_times.append(monotonic()) + return await original(activity) + + mock_api_client.conversations.activities().create = timed_send + mock_api_client.conversations.activities().update = timed_send + return send_times + @pytest.mark.asyncio async def test_stream_multiple_emits_with_timer(self, http_stream, patch_loop_call_later): loop = asyncio.get_running_loop() @@ -175,16 +194,12 @@ async def test_stream_update_status_sends_typing_activity( async def test_stream_sequence_of_update_and_emit(self, mock_api_client, conversation_reference): # Informative mode then text mode: the informative update flushes first, # then text streaming starts in a later flush. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.update("Preparing response...") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) stream.emit("Final response message") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) assert len(mock_api_client.sent_activities) == 2 typing_activity = mock_api_client.sent_activities[0] @@ -306,7 +321,7 @@ async def mock_send_then_403(activity): ) mock_api_client.conversations.activities().create = mock_send_then_403 - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) # First emit succeeds stream.emit("First message") @@ -462,39 +477,27 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa async def test_informative_burst_coalesces_to_latest(self, mock_api_client, conversation_reference, caplog): # Informative updates are status replacements, not cumulative content: # a burst in one flush collapses to the latest one. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.05) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): for i in range(8): stream.update(f"progress {i}") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) assert mock_api_client.send_call_count == 1 assert mock_api_client.sent_activities[0].text == "progress 7" assert "Coalesced 7 informative update(s) into the latest" in caplog.text @pytest.mark.asyncio - @pytest.mark.parametrize("interval", [0.05, 0.15]) - async def test_consecutive_emits_are_paced_across_flushes(self, mock_api_client, conversation_reference, interval): - send_times: list[float] = [] - - async def mock_send(activity): - send_times.append(monotonic()) - mock_api_client.send_call_count += 1 - mock_api_client.sent_activities.append(activity) - return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) - - mock_api_client.conversations.activities().create = mock_send + async def test_consecutive_emits_are_paced_across_flushes(self, mock_api_client, conversation_reference): + interval = 0.05 + send_times = self._track_send_times(mock_api_client) stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) - # Each update drains its own flush, so pacing must hold across flushes (bug 2). + # Each update drains its own flush, so pacing must hold across flushes. for i in range(3): stream.update(f"step {i}") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) texts = [a.text for a in mock_api_client.sent_activities] assert texts == ["step 0", "step 1", "step 2"] @@ -506,49 +509,41 @@ async def test_text_supersedes_informative_in_same_flush(self, mock_api_client, # When text lands in the same flush as informative updates, the text wins: # it replaces the status bubble immediately anyway, so the informative # update would only burn a paced slot before real content. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.update("Thinking...") stream.emit("The answer") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) texts = [a.text for a in mock_api_client.sent_activities] assert texts == ["The answer"] @pytest.mark.asyncio async def test_informative_dropped_after_text_started(self, mock_api_client, conversation_reference, caplog): - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("chunk") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): stream.update("status") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) + # The informative-only flush sends nothing: the update is dropped and the + # unchanged cumulative text is not re-sent. texts = [a.text for a in mock_api_client.sent_activities] - assert "status" not in texts + assert texts == ["chunk"] assert "Dropped 1 informative update(s): text streaming has started" in caplog.text @pytest.mark.asyncio async def test_text_mode_latch_survives_clear_text(self, mock_api_client, conversation_reference): # Once text streaming has started, clearing the text buffer must not # re-enable informative updates. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01) + stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) stream.emit("chunk") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) stream.clear_text() stream.update("status") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) texts = [a.text for a in mock_api_client.sent_activities] assert texts == ["chunk"] @@ -558,22 +553,11 @@ async def test_close_final_send_is_paced(self, mock_api_client, conversation_ref # The final close() send goes through the limiter too, so it can't land # right behind the last chunk and trip the throttle. interval = 0.05 - send_times: list[float] = [] - - async def mock_send(activity): - send_times.append(monotonic()) - mock_api_client.send_call_count += 1 - mock_api_client.sent_activities.append(activity) - return SentActivity(id=f"activity-{mock_api_client.send_call_count}", activity_params=activity) - - mock_api_client.conversations.activities().create = mock_send - mock_api_client.conversations.activities().update = mock_send + send_times = self._track_send_times(mock_api_client) stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) stream.emit("chunk") - task = stream._pending - assert task is not None - await task + await self._await_pending_flush(stream) await stream.close() assert len(send_times) == 2 # chunk + final diff --git a/packages/apps/tests/test_limiter.py b/packages/apps/tests/test_limiter.py index e987498d4..c2371ee0c 100644 --- a/packages/apps/tests/test_limiter.py +++ b/packages/apps/tests/test_limiter.py @@ -24,7 +24,6 @@ async def test_make_limiter_spaces_calls(): assert elapsed >= 0.10 # two subsequent calls each waited one interval -@pytest.mark.parametrize("interval", [-0.5, -1]) -def test_make_limiter_rejects_negative_interval(interval: float): +def test_make_limiter_rejects_negative_interval(): with pytest.raises(ValueError): - make_limiter(interval) + make_limiter(-0.5) From 8d5e26b21181db0bc2602bc39305f3c990df21b9 Mon Sep 17 00:00:00 2001 From: lilyydu Date: Wed, 24 Jun 2026 15:36:06 -0700 Subject: [PATCH 6/7] Revert HttpStream rate-limit changes from PR #453 Removes the per-stream leaky-bucket limiter and associated changes, resetting the affected files to their main baseline. This provides a clean starting point for a new implementation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/microsoft_teams/apps/http_stream.py | 78 +++----- .../microsoft_teams/apps/utils/__init__.py | 10 +- .../src/microsoft_teams/apps/utils/limiter.py | 33 ---- packages/apps/tests/test_http_stream.py | 171 ++++-------------- packages/apps/tests/test_limiter.py | 29 --- 5 files changed, 55 insertions(+), 266 deletions(-) delete mode 100644 packages/apps/src/microsoft_teams/apps/utils/limiter.py delete mode 100644 packages/apps/tests/test_limiter.py diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index e3ebea628..05f188fad 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -20,7 +20,7 @@ from microsoft_teams.common import EventEmitter from .plugins.streamer import StreamCancelledError, StreamerEvent, StreamerProtocol -from .utils import RetryOptions, make_limiter, retry +from .utils import RetryOptions, retry logger = logging.getLogger(__name__) @@ -32,43 +32,27 @@ class HttpStream(StreamerProtocol): Flow: 1. emit() adds activities to a queue 2. _flush() drains the entire queue under a lock. - 3. Message text are combined into a typing chunk. - 4. Another flush is scheduled if more items remain. - 5. close() waits for queue to empty, then sends final message with stream_type='stream_final' - - The stream has two modes: - - Informative mode (initial): each flush sends only the latest pending informative - update — they are status replacements, not cumulative content, so intermediate - ones are skipped (count logged at debug level). - - Text mode: entered once text streaming starts, permanently. Informative updates - are no longer sent (count logged at debug level). + 3. Informative typing updates are sent immediately if no message started. + 4. Message text are combined into a typing chunk. + 5. Another flush is scheduled if more items remain. + 6. close() waits for queue to empty, then sends final message with stream_type='stream_final' The timeout cancellation ensures only one flush operation is scheduled at a time. - Every send — informative updates, text chunks, retries, and the final close() send — - goes through the same per-stream limiter (min_send_interval) so we stay within the - Teams 1 req/s streaming limit. + The delays between flushes is to ensure we dont hit API rate limits with Microsoft Teams. """ - def __init__( - self, - client: ApiClient, - ref: ConversationReference, - min_send_interval: float = 1.0, - ): + def __init__(self, client: ApiClient, ref: ConversationReference): """ Initialize a new HttpStream instance. Args: client (ApiClient): The API client used to send activities to Microsoft Teams. ref (ConversationReference): Reference to the Teams conversation. - min_send_interval (float): Minimum seconds between sends, including retries and the final - close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing. """ super().__init__() self._client = client self._ref = ref self._events = EventEmitter[StreamerEvent]() - self._acquire = make_limiter(min_send_interval) self._result: Optional[SentActivity] = None self._lock = asyncio.Lock() @@ -85,7 +69,6 @@ def _reset_state(self) -> None: self._index = 1 self._id: Optional[str] = None self._text: str = "" - self._text_mode: bool = False self._channel_data: ChannelData = ChannelData() self._final_activity: Optional[MessageActivityInput] = None self._queue: deque[Union[MessageActivityInput, TypingActivityInput, str]] = deque() @@ -157,9 +140,6 @@ def clear_text(self) -> None: previously-flushed attachments/suggested actions aren't sent if the caller never emits a replacement. The stream id and channel data are kept intact so the new final activity still updates the stream in place. - - Text mode is NOT reset: once text streaming has started, informative - updates stay disabled even if the text buffer is cleared. """ # Safe without the lock: no await points here, so this runs atomically # w.r.t. the event loop and can't interleave with _flush's critical @@ -247,9 +227,8 @@ async def _flush(self) -> None: self._timeout.cancel() self._timeout = None - last_informative: Optional[TypingActivityInput] = None - informative_count = 0 - text_before = self._text + informative_updates: list[TypingActivityInput] = [] + start_length = len(self._queue) while self._queue: activity = self._queue.popleft() @@ -262,30 +241,23 @@ async def _flush(self) -> None: self._channel_data = ChannelData(**merged) if ( isinstance(activity, TypingActivityInput) - and activity.channel_data is not None - and activity.channel_data.stream_type == "informative" + and getattr(activity.channel_data, "stream_type", None) == "informative" + and self._text == "" ): - last_informative = activity - informative_count += 1 + # If `_text` is not empty then it's possible that streaming has started. + # And so informative updates cannot be sent. + informative_updates.append(activity) + + if start_length == 0: + logger.debug("No activities to flush") + return + + # Send informative updates immediately + for typing_update in informative_updates: + await self._send_activity(typing_update) - # Any accumulated text — even from this same flush — switches the stream - # to text mode for good: the streamed text replaces the status bubble in - # the Teams UI, so informative updates are pointless from then on. + # Send the combined text chunk if self._text: - self._text_mode = True - - if self._text_mode: - if informative_count: - logger.debug("Dropped %d informative update(s): text streaming has started", informative_count) - elif last_informative is not None: - if informative_count > 1: - logger.debug("Coalesced %d informative update(s) into the latest", informative_count - 1) - await self._send_activity(last_informative) - - # Send the combined text as a chunk, but only when this flush added text — - # an informative-only flush must not re-send (and re-pace) the unchanged - # cumulative text. - if self._text != text_before: to_send = TypingActivityInput(text=self._text) await self._send_activity(to_send) @@ -331,10 +303,6 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) logger.warning("Teams channel stopped the stream.") raise StreamCancelledError("Teams channel stopped the stream.") - # Pace every HTTP attempt to the Teams 1 req/s streaming limit. Gating here - # rather than in _send_activity also paces retries and close()'s final send. - await self._acquire() - to_send.from_ = self._ref.bot to_send.conversation = self._ref.conversation diff --git a/packages/apps/src/microsoft_teams/apps/utils/__init__.py b/packages/apps/src/microsoft_teams/apps/utils/__init__.py index 2b26c5ec5..511041a5a 100644 --- a/packages/apps/src/microsoft_teams/apps/utils/__init__.py +++ b/packages/apps/src/microsoft_teams/apps/utils/__init__.py @@ -5,15 +5,7 @@ from .activity_utils import extract_tenant_id from .graph import create_graph_client -from .limiter import make_limiter from .retry import RetryOptions, retry from .thread import to_threaded_conversation_id -__all__ = [ - "create_graph_client", - "extract_tenant_id", - "make_limiter", - "retry", - "RetryOptions", - "to_threaded_conversation_id", -] +__all__ = ["create_graph_client", "extract_tenant_id", "retry", "RetryOptions", "to_threaded_conversation_id"] diff --git a/packages/apps/src/microsoft_teams/apps/utils/limiter.py b/packages/apps/src/microsoft_teams/apps/utils/limiter.py deleted file mode 100644 index c43857762..000000000 --- a/packages/apps/src/microsoft_teams/apps/utils/limiter.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -Copyright (c) Microsoft Corporation. All rights reserved. -Licensed under the MIT License. -""" - -import asyncio -from time import monotonic -from typing import Awaitable, Callable - - -def make_limiter(interval: float) -> Callable[[], Awaitable[None]]: - """Fixed-interval gate (a token bucket of size 1): consecutive - acquisitions are spaced at least `interval` seconds apart. - - The slot is reserved (read then write of `next_slot`) with no await in - between, so reservations are race-free under single-threaded asyncio. The - first call never waits, and `interval=0` disables pacing. - """ - if interval < 0: - raise ValueError("interval must be >= 0") - - next_slot = monotonic() - - async def acquire() -> None: - nonlocal next_slot - now = monotonic() - slot = max(now, next_slot) - next_slot = slot + interval - wait = slot - now - if wait > 0: - await asyncio.sleep(wait) - - return acquire diff --git a/packages/apps/tests/test_http_stream.py b/packages/apps/tests/test_http_stream.py index c0b42e5c8..53173c4cb 100644 --- a/packages/apps/tests/test_http_stream.py +++ b/packages/apps/tests/test_http_stream.py @@ -5,8 +5,6 @@ # pyright: basic import asyncio -import logging -from time import monotonic from unittest.mock import MagicMock, patch import pytest @@ -65,7 +63,7 @@ def conversation_reference(self): @pytest.fixture def http_stream(self, mock_api_client, conversation_reference): - return HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + return HttpStream(mock_api_client, conversation_reference) @pytest.fixture def patch_loop_call_later(self): @@ -88,25 +86,6 @@ async def _run_scheduled_flushes(self, scheduled): callback(*args) await asyncio.sleep(0) - async def _await_pending_flush(self, stream): - """Await the flush task created by the last emit/update.""" - task = stream._pending - assert task is not None - await task - - def _track_send_times(self, mock_api_client): - """Wrap the fixture's send mock to record a timestamp per send.""" - send_times: list[float] = [] - original = mock_api_client.conversations.activities().create - - async def timed_send(activity): - send_times.append(monotonic()) - return await original(activity) - - mock_api_client.conversations.activities().create = timed_send - mock_api_client.conversations.activities().update = timed_send - return send_times - @pytest.mark.asyncio async def test_stream_multiple_emits_with_timer(self, http_stream, patch_loop_call_later): loop = asyncio.get_running_loop() @@ -137,7 +116,7 @@ async def mock_send_with_timeout(activity): return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity) mock_api_client.conversations.activities().create = mock_send_with_timeout - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream.emit("Test message with timeout") await asyncio.sleep(0) @@ -162,7 +141,7 @@ async def mock_send_all_timeout(activity): raise TimeoutError("All operations timed out") mock_api_client.conversations.activities().create = mock_send_all_timeout - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream.emit("Test message with all timeouts") await asyncio.sleep(0) @@ -177,7 +156,7 @@ async def test_stream_update_status_sends_typing_activity( loop = asyncio.get_running_loop() patcher, scheduled = patch_loop_call_later(loop) with patcher: - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream.update("Thinking...") await asyncio.sleep(0) await self._run_scheduled_flushes(scheduled) @@ -191,24 +170,26 @@ async def test_stream_update_status_sends_typing_activity( assert stream.sequence >= 2 @pytest.mark.asyncio - async def test_stream_sequence_of_update_and_emit(self, mock_api_client, conversation_reference): - # Informative mode then text mode: the informative update flushes first, - # then text streaming starts in a later flush. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) - stream.update("Preparing response...") - await self._await_pending_flush(stream) - - stream.emit("Final response message") - await self._await_pending_flush(stream) + async def test_stream_sequence_of_update_and_emit( + self, mock_api_client, conversation_reference, patch_loop_call_later + ): + loop = asyncio.get_running_loop() + patcher, scheduled = patch_loop_call_later(loop) + with patcher: + stream = HttpStream(mock_api_client, conversation_reference) + stream.update("Preparing response...") + stream.emit("Final response message") + await asyncio.sleep(0) + await self._run_scheduled_flushes(scheduled) - assert len(mock_api_client.sent_activities) == 2 - typing_activity = mock_api_client.sent_activities[0] - message_activity = mock_api_client.sent_activities[1] + assert len(mock_api_client.sent_activities) >= 2 + typing_activity = mock_api_client.sent_activities[0] + message_activity = mock_api_client.sent_activities[1] - assert isinstance(typing_activity, TypingActivityInput) - assert typing_activity.text == "Preparing response..." - assert message_activity.text == "Final response message" - assert stream.sequence >= 3 + assert isinstance(typing_activity, TypingActivityInput) + assert typing_activity.text == "Preparing response..." + assert message_activity.text == "Final response message" + assert stream.sequence >= 3 @pytest.mark.asyncio async def test_stream_concurrent_emits_do_not_flush_simultaneously( @@ -233,7 +214,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().create = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) async def emit_task(): stream.emit("Concurrent message") @@ -258,7 +239,7 @@ async def mock_send_403(activity): ) mock_api_client.conversations.activities().create = mock_send_403 - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream.emit("Test message") await asyncio.sleep(0) @@ -280,7 +261,7 @@ async def mock_send_403(activity): ) mock_api_client.conversations.activities().create = mock_send_403 - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream.emit("First message") await asyncio.sleep(0) @@ -294,7 +275,7 @@ async def mock_send_403(activity): @pytest.mark.asyncio async def test_send_blocked_after_cancel(self, mock_api_client, conversation_reference): - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream._canceled = True with pytest.raises(StreamCancelledError, match="Teams channel stopped the stream."): @@ -321,7 +302,7 @@ async def mock_send_then_403(activity): ) mock_api_client.conversations.activities().create = mock_send_then_403 - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) # First emit succeeds stream.emit("First message") @@ -345,7 +326,7 @@ async def mock_send_then_403(activity): @pytest.mark.asyncio async def test_close_returns_none_when_canceled(self, mock_api_client, conversation_reference): - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) stream._canceled = True result = await stream.close() @@ -375,7 +356,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().update = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) early_actions = SuggestedActions( to=[], @@ -425,7 +406,7 @@ async def mock_send(activity): mock_api_client.conversations.activities().update = mock_send with patcher: - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) actions = SuggestedActions( to=[], @@ -449,7 +430,7 @@ async def mock_send(activity): @pytest.mark.asyncio async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversation_reference): """close() must not send the final message while a flush is still mid-await.""" - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) + stream = HttpStream(mock_api_client, conversation_reference) # Simulate a flush in progress: lock held, _id assigned, text accumulated. # This mirrors the window after the inner queue drain but before SendActivity awaits resolve. @@ -472,93 +453,3 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa assert result is not None assert mock_api_client.send_call_count == 1 assert mock_api_client.sent_activities[0].text == "Response text" - - @pytest.mark.asyncio - async def test_informative_burst_coalesces_to_latest(self, mock_api_client, conversation_reference, caplog): - # Informative updates are status replacements, not cumulative content: - # a burst in one flush collapses to the latest one. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) - with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): - for i in range(8): - stream.update(f"progress {i}") - - await self._await_pending_flush(stream) - - assert mock_api_client.send_call_count == 1 - assert mock_api_client.sent_activities[0].text == "progress 7" - assert "Coalesced 7 informative update(s) into the latest" in caplog.text - - @pytest.mark.asyncio - async def test_consecutive_emits_are_paced_across_flushes(self, mock_api_client, conversation_reference): - interval = 0.05 - send_times = self._track_send_times(mock_api_client) - - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) - # Each update drains its own flush, so pacing must hold across flushes. - for i in range(3): - stream.update(f"step {i}") - await self._await_pending_flush(stream) - - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == ["step 0", "step 1", "step 2"] - gaps = [b - a for a, b in zip(send_times, send_times[1:], strict=False)] - assert min(gaps) >= interval * 0.9 - - @pytest.mark.asyncio - async def test_text_supersedes_informative_in_same_flush(self, mock_api_client, conversation_reference): - # When text lands in the same flush as informative updates, the text wins: - # it replaces the status bubble immediately anyway, so the informative - # update would only burn a paced slot before real content. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) - stream.update("Thinking...") - stream.emit("The answer") - await self._await_pending_flush(stream) - - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == ["The answer"] - - @pytest.mark.asyncio - async def test_informative_dropped_after_text_started(self, mock_api_client, conversation_reference, caplog): - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) - stream.emit("chunk") - await self._await_pending_flush(stream) - - with caplog.at_level(logging.DEBUG, logger="microsoft_teams.apps.http_stream"): - stream.update("status") - await self._await_pending_flush(stream) - - # The informative-only flush sends nothing: the update is dropped and the - # unchanged cumulative text is not re-sent. - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == ["chunk"] - assert "Dropped 1 informative update(s): text streaming has started" in caplog.text - - @pytest.mark.asyncio - async def test_text_mode_latch_survives_clear_text(self, mock_api_client, conversation_reference): - # Once text streaming has started, clearing the text buffer must not - # re-enable informative updates. - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0) - stream.emit("chunk") - await self._await_pending_flush(stream) - - stream.clear_text() - stream.update("status") - await self._await_pending_flush(stream) - - texts = [a.text for a in mock_api_client.sent_activities] - assert texts == ["chunk"] - - @pytest.mark.asyncio - async def test_close_final_send_is_paced(self, mock_api_client, conversation_reference): - # The final close() send goes through the limiter too, so it can't land - # right behind the last chunk and trip the throttle. - interval = 0.05 - send_times = self._track_send_times(mock_api_client) - - stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval) - stream.emit("chunk") - await self._await_pending_flush(stream) - await stream.close() - - assert len(send_times) == 2 # chunk + final - assert send_times[1] - send_times[0] >= interval * 0.9 diff --git a/packages/apps/tests/test_limiter.py b/packages/apps/tests/test_limiter.py deleted file mode 100644 index c2371ee0c..000000000 --- a/packages/apps/tests/test_limiter.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -Copyright (c) Microsoft Corporation. All rights reserved. -Licensed under the MIT License. -""" - -from time import monotonic - -import pytest -from microsoft_teams.apps.utils import make_limiter - - -@pytest.mark.asyncio -async def test_make_limiter_spaces_calls(): - acquire = make_limiter(0.05) - - start = monotonic() - await acquire() # first call returns immediately - first_done = monotonic() - await acquire() - await acquire() - elapsed = monotonic() - start - - assert first_done - start < 0.05 # leading edge is not delayed - assert elapsed >= 0.10 # two subsequent calls each waited one interval - - -def test_make_limiter_rejects_negative_interval(): - with pytest.raises(ValueError): - make_limiter(-0.5) From 3c1c34c9962ef4834fbb9614da5e9432cfcf8c99 Mon Sep 17 00:00:00 2001 From: lilyydu Date: Fri, 26 Jun 2026 12:25:33 -0700 Subject: [PATCH 7/7] fix stream timeout 403 error Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/microsoft_teams/apps/http_stream.py | 60 +++++++++++++++---- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index 05f188fad..c6491f2b3 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -62,6 +62,7 @@ def __init__(self, client: ApiClient, ref: ConversationReference): self._state_changed = asyncio.Event() self._canceled = False + self._timed_out = False self._reset_state() def _reset_state(self) -> None: @@ -77,10 +78,18 @@ def _reset_state(self) -> None: def canceled(self) -> bool: """ Whether the stream has been canceled. - For example when the user pressed the Stop button or the 2-minute timeout has exceeded. + For example when the user pressed the Stop button. """ return self._canceled + @property + def timed_out(self) -> bool: + """ + Whether the stream has timed out. + For example when the streaming has exceeded two minutes. + """ + return self._timed_out + @property def closed(self) -> bool: """Whether the final stream message has been sent.""" @@ -194,11 +203,27 @@ async def close(self) -> Optional[SentActivity]: return None # Build final message from the last emitted MessageActivityInput (last wins) - assert self._id is not None, "ID should be set by this point" - activity = self._final_activity or MessageActivityInput() - activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final() - - res = await retry(lambda: self._send(activity), options=RetryOptions()) + if self._timed_out: + activity = self._final_activity or MessageActivityInput() + activity.with_text(self._text) + activity.id = None + res = await retry(lambda: self._send(activity), options=RetryOptions()) + else: + assert self._id is not None, "ID should be set by this point" + activity = self._final_activity or MessageActivityInput() + activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final() + try: + res = await retry(lambda: self._send(activity), options=RetryOptions()) + except StreamCancelledError: + # Reaches this point if the streaming time exceeded 2 minutes on the final request. + if not self._timed_out: + raise + # The final stream send itself tripped the time limit; resend the + # buffered content as a regular message (cleared id -> create path). + final_message = self._final_activity or MessageActivityInput() + final_message.with_text(self._text) + final_message.id = None + res = await self._send(final_message) # Emit close event self._events.emit("close", res) @@ -252,6 +277,9 @@ async def _flush(self) -> None: logger.debug("No activities to flush") return + if self._timed_out: + return + # Send informative updates immediately for typing_update in informative_updates: await self._send_activity(typing_update) @@ -282,10 +310,15 @@ async def _send_activity(self, to_send: TypingActivityInput): to_send = to_send.with_id(self._id) to_send = to_send.add_stream_update(self._index) - res = await retry( - lambda: self._send(to_send), - options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8), - ) + try: + res = await retry( + lambda: self._send(to_send), + options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8), + ) + except StreamCancelledError: + if self._timed_out: + return + raise self._events.emit("chunk", res) self._index += 1 if self._id is None: @@ -315,6 +348,13 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) return SentActivity.merge(to_send, res) except HTTPStatusError as e: if e.response.status_code == 403: + error = e.response.json().get("error", {}) + message = error.get("message", "") + if message != "Content stream was cancelled by user.": + if message == "Content stream finished due to exceeded streaming time.": + self._timed_out = True + logger.warning("Teams encountered an error while streaming. Sending as a regular message.") + raise StreamCancelledError(message) from e self._canceled = True logger.warning("Teams channel stopped the stream.") raise StreamCancelledError("Teams channel stopped the stream.") from e