Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions packages/apps/src/microsoft_teams/apps/http_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from microsoft_teams.common import EventEmitter

from .plugins.streamer import StreamCancelledError, StreamerEvent, StreamerProtocol
from .utils import RetryOptions, retry
from .utils import RetryOptions, make_limiter, retry

logger = logging.getLogger(__name__)

Expand All @@ -32,27 +32,44 @@ class HttpStream(StreamerProtocol):
Flow:
1. emit() adds activities to a queue
2. _flush() drains the entire queue under a lock.
3. Informative typing updates are sent immediately if no message started.
3. Informative typing updates are sent if no message started.
4. Message texts are combined into a typing chunk.
5. Another flush is scheduled if more items remain.
6. close() waits for queue to empty, then sends final message with stream_type='stream_final'

The timeout cancellation ensures only one flush operation is scheduled at a time.
The delay between flushes ensures we don't hit Microsoft Teams API rate limits.
Every send goes through a per-stream limiter (min_send_interval) so we stay within
the Teams 1 req/s streaming limit. By default a burst of informative updates in one
flush collapses to the latest (coalesce_informative_updates); set it False to pace
out every update instead.
"""

def __init__(self, client: ApiClient, ref: ConversationReference):
def __init__(
self,
client: ApiClient,
ref: ConversationReference,
min_send_interval: float = 1.0,
coalesce_informative_updates: bool = True,
Comment thread
lilyydu marked this conversation as resolved.
Outdated
):
"""
Initialize a new HttpStream instance.

Args:
client (ApiClient): The API client used to send activities to Microsoft Teams.
ref (ConversationReference): Reference to the Teams conversation.
min_send_interval (float): Minimum seconds between sends, including retries and the final
close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing.
coalesce_informative_updates (bool): When True (default), a burst of informative updates in
Comment thread
lilyydu marked this conversation as resolved.
Outdated
one flush collapses to the latest one. Set False to pace out every update (one per min_send_interval)
instead; a long burst then holds the flush lock and can delay or drop close()'s final
message (see _total_wait_timeout).
"""
super().__init__()
self._client = client
self._ref = ref
self._events = EventEmitter[StreamerEvent]()
self._acquire = make_limiter(rate=1, period=min_send_interval)
Comment thread
lilyydu marked this conversation as resolved.
Outdated
self._coalesce_informative_updates = coalesce_informative_updates

self._result: Optional[SentActivity] = None
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -252,7 +269,10 @@ async def _flush(self) -> None:
logger.debug("No activities to flush")
return

# Send informative updates immediately
if self._coalesce_informative_updates and len(informative_updates) > 1:
informative_updates = informative_updates[-1:]

# Send informative updates, paced by the limiter
for typing_update in informative_updates:
await self._send_activity(typing_update)

Expand Down Expand Up @@ -303,6 +323,16 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput])
logger.warning("Teams channel stopped the stream.")
raise StreamCancelledError("Teams channel stopped the stream.")

# Pace every HTTP attempt to the Teams 1 req/s streaming limit. Gating here
# rather than in _send_activity also paces retries and close()'s final send,
# so a retry storm or a final message sent right behind the last chunk can't
# trip the throttle. acquire() only waits when a send would actually be too
# soon (next_slot is in the past after any idle gap), so close() pays no
# latency unless it lands within the interval. While _flush holds self._lock
# this sleeps under it, so a long informative burst still delays close()
# (see _total_wait_timeout). Pass min_send_interval=0 to disable pacing.
await self._acquire()

to_send.from_ = self._ref.bot
to_send.conversation = self._ref.conversation

Expand Down
10 changes: 9 additions & 1 deletion packages/apps/src/microsoft_teams/apps/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@

from .activity_utils import extract_tenant_id
from .graph import create_graph_client
from .limiter import make_limiter
from .retry import RetryOptions, retry
from .thread import to_threaded_conversation_id

__all__ = ["create_graph_client", "extract_tenant_id", "retry", "RetryOptions", "to_threaded_conversation_id"]
__all__ = [
"create_graph_client",
"extract_tenant_id",
"make_limiter",
"retry",
"RetryOptions",
"to_threaded_conversation_id",
]
35 changes: 35 additions & 0 deletions packages/apps/src/microsoft_teams/apps/utils/limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import asyncio
from time import monotonic
from typing import Awaitable, Callable


def make_limiter(rate: int, period: float = 1.0) -> Callable[[], Awaitable[None]]:
    """Build an async pacing gate whose reserved slots are `period / rate` seconds apart.

    Each await of the returned callable reserves the next free slot and sleeps
    until that slot arrives. The slot is reserved (read then write of
    `next_slot`) with no await in between, so reservations are race-free under
    single-threaded asyncio. The first call never waits, and neither does a
    call made once the previously reserved slot is already in the past.

    Args:
        rate: Acquisitions allowed per `period`; must be >= 1.
        period: Window length in seconds; must be finite and >= 0. A period of
            0 is allowed on purpose and means "no pacing" (interval 0), which
            is how callers disable the limiter.

    Returns:
        A zero-argument coroutine function; await it before each paced operation.

    Raises:
        ValueError: If `rate` < 1, or `period` is negative, NaN or infinite.
    """
    if rate < 1:
        raise ValueError("rate must be >= 1")
    if period < 0:
        raise ValueError("period must be >= 0")
    # NaN compares False against everything, so it slips past the check above and
    # would silently disable pacing (max(now, nan) is now); +inf would park the
    # second caller forever. Fail fast on both at construction instead.
    if not period < float("inf"):
        raise ValueError("period must be finite")

    interval = period / rate
    next_slot = monotonic()

    async def acquire() -> None:
        nonlocal next_slot
        now = monotonic()
        # Take whichever is later: right now, or the slot left by the previous caller.
        slot = max(now, next_slot)
        # Publish the following slot before any await so concurrent callers queue behind us.
        next_slot = slot + interval
        wait = slot - now
        if wait > 0:
            await asyncio.sleep(wait)

    return acquire
148 changes: 134 additions & 14 deletions packages/apps/tests/test_http_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pyright: basic

import asyncio
from time import monotonic
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -63,7 +64,7 @@ def conversation_reference(self):

@pytest.fixture
def http_stream(self, mock_api_client, conversation_reference):
return HttpStream(mock_api_client, conversation_reference)
return HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

@pytest.fixture
def patch_loop_call_later(self):
Expand Down Expand Up @@ -116,7 +117,7 @@ async def mock_send_with_timeout(activity):
return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity)

mock_api_client.conversations.activities().create = mock_send_with_timeout
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

stream.emit("Test message with timeout")
await asyncio.sleep(0)
Expand All @@ -141,7 +142,7 @@ async def mock_send_all_timeout(activity):
raise TimeoutError("All operations timed out")

mock_api_client.conversations.activities().create = mock_send_all_timeout
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

stream.emit("Test message with all timeouts")
await asyncio.sleep(0)
Expand All @@ -156,7 +157,7 @@ async def test_stream_update_status_sends_typing_activity(
loop = asyncio.get_running_loop()
patcher, scheduled = patch_loop_call_later(loop)
with patcher:
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)
stream.update("Thinking...")
await asyncio.sleep(0)
await self._run_scheduled_flushes(scheduled)
Expand All @@ -176,7 +177,7 @@ async def test_stream_sequence_of_update_and_emit(
loop = asyncio.get_running_loop()
patcher, scheduled = patch_loop_call_later(loop)
with patcher:
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01)
stream.update("Preparing response...")
stream.emit("Final response message")
await asyncio.sleep(0)
Expand Down Expand Up @@ -214,7 +215,7 @@ async def mock_send(activity):
mock_api_client.conversations.activities().create = mock_send

with patcher:
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

async def emit_task():
stream.emit("Concurrent message")
Expand All @@ -239,7 +240,7 @@ async def mock_send_403(activity):
)

mock_api_client.conversations.activities().create = mock_send_403
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

stream.emit("Test message")
await asyncio.sleep(0)
Expand All @@ -261,7 +262,7 @@ async def mock_send_403(activity):
)

mock_api_client.conversations.activities().create = mock_send_403
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

stream.emit("First message")
await asyncio.sleep(0)
Expand All @@ -275,7 +276,7 @@ async def mock_send_403(activity):

@pytest.mark.asyncio
async def test_send_blocked_after_cancel(self, mock_api_client, conversation_reference):
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)
stream._canceled = True

with pytest.raises(StreamCancelledError, match="Teams channel stopped the stream."):
Expand All @@ -302,7 +303,7 @@ async def mock_send_then_403(activity):
)

mock_api_client.conversations.activities().create = mock_send_then_403
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.01)

# First emit succeeds
stream.emit("First message")
Expand All @@ -326,7 +327,7 @@ async def mock_send_then_403(activity):

@pytest.mark.asyncio
async def test_close_returns_none_when_canceled(self, mock_api_client, conversation_reference):
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)
stream._canceled = True

result = await stream.close()
Expand Down Expand Up @@ -356,7 +357,7 @@ async def mock_send(activity):
mock_api_client.conversations.activities().update = mock_send

with patcher:
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

early_actions = SuggestedActions(
to=[],
Expand Down Expand Up @@ -406,7 +407,7 @@ async def mock_send(activity):
mock_api_client.conversations.activities().update = mock_send

with patcher:
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

actions = SuggestedActions(
to=[],
Expand All @@ -430,7 +431,7 @@ async def mock_send(activity):
@pytest.mark.asyncio
async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversation_reference):
"""close() must not send the final message while a flush is still mid-await."""
stream = HttpStream(mock_api_client, conversation_reference)
stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0)

# Simulate a flush in progress: lock held, _id assigned, text accumulated.
# This mirrors the window after the inner queue drain but before SendActivity awaits resolve.
Expand All @@ -453,3 +454,122 @@ async def test_close_waits_for_flush_to_complete(self, mock_api_client, conversa
assert result is not None
assert mock_api_client.send_call_count == 1
assert mock_api_client.sent_activities[0].text == "Response text"

@pytest.mark.asyncio
async def test_rapid_updates_are_paced_not_dropped_when_coalesce_off(self, mock_api_client, conversation_reference):
    """With coalescing off, a burst of updates is sent in full, in order, one interval apart."""
    interval = 0.05
    sent_at: list[float] = []

    async def record_send(activity):
        # Stamp the send before touching the mock so gaps reflect when each send started.
        sent_at.append(monotonic())
        mock_api_client.send_call_count += 1
        mock_api_client.sent_activities.append(activity)
        activity_id = f"activity-{mock_api_client.send_call_count}"
        return SentActivity(id=activity_id, activity_params=activity)

    mock_api_client.conversations.activities().create = record_send

    stream = HttpStream(
        mock_api_client,
        conversation_reference,
        min_send_interval=interval,
        coalesce_informative_updates=False,
    )
    expected_texts = [f"progress {n}" for n in range(8)]
    for text in expected_texts:
        stream.update(text)

    flush_task = stream._pending
    assert flush_task is not None
    await flush_task

    # Every update went out, in order, with nothing dropped.
    assert [sent.text for sent in mock_api_client.sent_activities] == expected_texts
    # Each send after the first waited roughly one interval.
    gaps = [later - earlier for earlier, later in zip(sent_at, sent_at[1:], strict=False)]
    assert min(gaps) >= interval * 0.9

@pytest.mark.asyncio
@pytest.mark.parametrize("interval", [0.05, 0.15])
async def test_consecutive_emits_are_paced_across_flushes(self, mock_api_client, conversation_reference, interval):
    """Pacing must hold between separate flushes, not only inside a single one."""
    sent_at: list[float] = []

    async def record_send(activity):
        sent_at.append(monotonic())
        mock_api_client.send_call_count += 1
        mock_api_client.sent_activities.append(activity)
        activity_id = f"activity-{mock_api_client.send_call_count}"
        return SentActivity(id=activity_id, activity_params=activity)

    mock_api_client.conversations.activities().create = record_send

    stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval)
    # Await the pending flush after each update so every send comes from its own flush.
    for step in range(3):
        stream.update(f"step {step}")
        flush_task = stream._pending
        assert flush_task is not None
        await flush_task

    sent_texts = [sent.text for sent in mock_api_client.sent_activities]
    assert sent_texts == ["step 0", "step 1", "step 2"]
    gaps = [later - earlier for earlier, later in zip(sent_at, sent_at[1:], strict=False)]
    assert min(gaps) >= interval * 0.9

@pytest.mark.asyncio
async def test_coalesce_drops_intermediate_informative_in_burst_by_default(
    self, mock_api_client, conversation_reference
):
    """By default a burst of informative updates collapses to the most recent one."""
    # No coalesce argument is passed: this exercises the default, which must not
    # pace the burst out one update at a time.
    stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=0.05)
    for n in range(8):
        stream.update(f"progress {n}")

    flush_task = stream._pending
    assert flush_task is not None
    await flush_task

    # Only the latest informative bubble is sent.
    assert mock_api_client.send_call_count == 1
    only_sent = mock_api_client.sent_activities[0]
    assert only_sent.text == "progress 7"

@pytest.mark.asyncio
async def test_coalesce_does_not_drop_text(self, mock_api_client, conversation_reference, patch_loop_call_later):
    """Coalescing applies to informative updates only; emitted text is still sent."""
    patcher, scheduled = patch_loop_call_later(asyncio.get_running_loop())
    with patcher:
        stream = HttpStream(
            mock_api_client,
            conversation_reference,
            min_send_interval=0.01,
            coalesce_informative_updates=True,
        )
        stream.update("Thinking...")
        stream.emit("The answer")
        await asyncio.sleep(0)
        await self._run_scheduled_flushes(scheduled)

    # Both the informative update and the text chunk went out, in that order.
    sent_texts = [sent.text for sent in mock_api_client.sent_activities]
    assert sent_texts == ["Thinking...", "The answer"]

@pytest.mark.asyncio
async def test_close_final_send_is_paced(self, mock_api_client, conversation_reference):
    """close()'s final send passes through the limiter, so it cannot land right behind the last chunk."""
    interval = 0.05
    sent_at: list[float] = []

    async def record_send(activity):
        sent_at.append(monotonic())
        mock_api_client.send_call_count += 1
        mock_api_client.sent_activities.append(activity)
        activity_id = f"activity-{mock_api_client.send_call_count}"
        return SentActivity(id=activity_id, activity_params=activity)

    # Route both create and update through the recorder so the chunk and the final send are both timed.
    activities = mock_api_client.conversations.activities()
    activities.create = record_send
    activities.update = record_send

    stream = HttpStream(mock_api_client, conversation_reference, min_send_interval=interval)
    stream.emit("chunk")
    flush_task = stream._pending
    assert flush_task is not None
    await flush_task
    await stream.close()

    # Exactly two sends: the chunk, then the final message.
    assert len(sent_at) == 2
    chunk_time, final_time = sent_at
    assert final_time - chunk_time >= interval * 0.9
Loading