Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions packages/apps/src/microsoft_teams/apps/activity_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,25 @@ class ActivitySender:
Separate from transport concerns (HTTP, WebSocket, etc.)
"""

def __init__(self, client: Client):
def __init__(
self,
client: Client,
stream_min_send_interval: float = 1.0,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also don't think we need this.

stream_coalesce_informative_updates: bool = False,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's stream_coalesce_informative_updates should be true always. let's not expose this.

):
"""
Initialize ActivitySender.

Args:
client: HTTP client with token provider configured
stream_min_send_interval: Minimum seconds between sends on streams created by
create_stream() (Teams limits streaming to 1 req/s). Set 0 to disable pacing.
stream_coalesce_informative_updates: When True, a burst of informative updates in one
flush collapses to the latest one instead of pacing out every update.
"""
self._client = client
self._stream_min_send_interval = stream_min_send_interval
self._stream_coalesce_informative_updates = stream_coalesce_informative_updates

async def send(self, activity: ActivityParams, ref: ConversationReference) -> SentActivity:
"""
Expand Down Expand Up @@ -92,4 +103,9 @@ def create_stream(self, ref: ConversationReference) -> StreamerProtocol:
"""
# Create API client for this conversation's service URL
api = ApiClient(ref.service_url, self._client)
return HttpStream(api, ref)
return HttpStream(
api,
ref,
min_send_interval=self._stream_min_send_interval,
coalesce_informative_updates=self._stream_coalesce_informative_updates,
)
6 changes: 5 additions & 1 deletion packages/apps/src/microsoft_teams/apps/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ def __init__(self, **options: Unpack[AppOptions]):
self._initialized = False

# initialize ActivitySender for sending activities
self.activity_sender = ActivitySender(self.http_client.clone(ClientOptions(token=self._get_bot_token)))
self.activity_sender = ActivitySender(
self.http_client.clone(ClientOptions(token=self._get_bot_token)),
stream_min_send_interval=self.options.stream_min_send_interval,
stream_coalesce_informative_updates=self.options.stream_coalesce_informative_updates,
)

# initialize all event, activity, and plugin processors
self.activity_processor = ActivityProcessor(
Expand Down
40 changes: 35 additions & 5 deletions packages/apps/src/microsoft_teams/apps/http_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from microsoft_teams.common import EventEmitter

from .plugins.streamer import StreamCancelledError, StreamerEvent, StreamerProtocol
from .utils import RetryOptions, retry
from .utils import RetryOptions, make_limiter, retry

logger = logging.getLogger(__name__)

Expand All @@ -32,27 +32,44 @@ class HttpStream(StreamerProtocol):
Flow:
1. emit() adds activities to a queue
2. _flush() drains the entire queue under a lock.
3. Informative typing updates are sent immediately if no message started.
3. Informative typing updates are sent if no message started.
4. Message text are combined into a typing chunk.
5. Another flush is scheduled if more items remain.
6. close() waits for queue to empty, then sends final message with stream_type='stream_final'

The timeout cancellation ensures only one flush operation is scheduled at a time.
The delays between flushes is to ensure we dont hit API rate limits with Microsoft Teams.
Every send goes through a per-stream limiter (min_send_interval) so we stay within
the Teams 1 req/s streaming limit. By default every informative update is kept and
paced out; set coalesce_informative_updates=True to collapse a burst of informative
updates in one flush to the latest one instead.
"""

def __init__(self, client: ApiClient, ref: ConversationReference):
def __init__(
self,
client: ApiClient,
ref: ConversationReference,
min_send_interval: float = 1.0,
coalesce_informative_updates: bool = False,
):
"""
Initialize a new HttpStream instance.

Args:
client (ApiClient): The API client used to send activities to Microsoft Teams.
ref (ConversationReference): Reference to the Teams conversation.
min_send_interval (float): Minimum seconds between sends, including retries and the final
close() send (Teams limits streaming to 1 req/s). Set 0 to disable pacing.
coalesce_informative_updates (bool): When False (default), every informative update is
paced out at min_send_interval; a long burst then holds the flush lock and can delay
or drop close()'s final message (see _total_wait_timeout). Set True to collapse a
burst of informative updates in one flush to the latest one instead.
"""
super().__init__()
self._client = client
self._ref = ref
self._events = EventEmitter[StreamerEvent]()
self._acquire = make_limiter(min_send_interval)
self._coalesce_informative_updates = coalesce_informative_updates

self._result: Optional[SentActivity] = None
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -252,7 +269,10 @@ async def _flush(self) -> None:
logger.debug("No activities to flush")
return

# Send informative updates immediately
if self._coalesce_informative_updates and len(informative_updates) > 1:
informative_updates = informative_updates[-1:]

# Send informative updates, paced by the limiter
for typing_update in informative_updates:
await self._send_activity(typing_update)

Expand Down Expand Up @@ -303,6 +323,16 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput])
logger.warning("Teams channel stopped the stream.")
raise StreamCancelledError("Teams channel stopped the stream.")

# Pace every HTTP attempt to the Teams 1 req/s streaming limit. Gating here
# rather than in _send_activity also paces retries and close()'s final send,
# so a retry storm or a final message sent right behind the last chunk can't
# trip the throttle. acquire() only waits when a send would actually be too
# soon (next_slot is in the past after any idle gap), so close() pays no
# latency unless it lands within the interval. While _flush holds self._lock
# this sleeps under it, so a long informative burst still delays close()
# (see _total_wait_timeout). Pass min_send_interval=0 to disable pacing.
await self._acquire()

to_send.from_ = self._ref.bot
to_send.conversation = self._ref.conversation

Expand Down
14 changes: 14 additions & 0 deletions packages/apps/src/microsoft_teams/apps/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class AppOptions(TypedDict, total=False):
Defaults to PUBLIC (commercial cloud).
"""

# Streaming
stream_min_send_interval: Optional[float]
"""Minimum seconds between sends on a stream, including retries and the final close() send.
Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing."""
stream_coalesce_informative_updates: Optional[bool]
"""When True, a burst of informative updates in one flush collapses to the latest one.
Defaults to False: every informative update is kept and paced out at stream_min_send_interval."""


@dataclass
class InternalAppOptions:
Expand Down Expand Up @@ -133,6 +141,12 @@ class InternalAppOptions:
"""URL path for the Teams messaging endpoint. Defaults to '/api/messages'."""
cloud: Optional[CloudEnvironment] = None
"""Cloud environment for sovereign cloud support."""
stream_min_send_interval: float = 1.0
"""Minimum seconds between sends on a stream, including retries and the final close() send.
Defaults to 1.0 (Teams limits streaming to 1 req/s). Set 0 to disable pacing."""
stream_coalesce_informative_updates: bool = False
"""When True, a burst of informative updates in one flush collapses to the latest one.
Defaults to False: every informative update is kept and paced out at stream_min_send_interval."""

@classmethod
def from_typeddict(cls, options: AppOptions) -> "InternalAppOptions":
Expand Down
10 changes: 9 additions & 1 deletion packages/apps/src/microsoft_teams/apps/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@

from .activity_utils import extract_tenant_id
from .graph import create_graph_client
from .limiter import make_limiter
from .retry import RetryOptions, retry
from .thread import to_threaded_conversation_id

__all__ = ["create_graph_client", "extract_tenant_id", "retry", "RetryOptions", "to_threaded_conversation_id"]
__all__ = [
"create_graph_client",
"extract_tenant_id",
"make_limiter",
"retry",
"RetryOptions",
"to_threaded_conversation_id",
]
33 changes: 33 additions & 0 deletions packages/apps/src/microsoft_teams/apps/utils/limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import asyncio
from time import monotonic
from typing import Awaitable, Callable


def make_limiter(interval: float) -> Callable[[], Awaitable[None]]:
"""Fixed-interval gate (a token bucket of size 1): consecutive
acquisitions are spaced at least `interval` seconds apart.

The slot is reserved (read then write of `next_slot`) with no await in
between, so reservations are race-free under single-threaded asyncio. The
first call never waits, and `interval=0` disables pacing.
"""
if interval < 0:
raise ValueError("interval must be >= 0")

next_slot = monotonic()

async def acquire() -> None:
nonlocal next_slot
now = monotonic()
slot = max(now, next_slot)
next_slot = slot + interval
wait = slot - now
if wait > 0:
await asyncio.sleep(wait)

return acquire
Loading
Loading