diff --git a/packages/apps/src/microsoft_teams/apps/http_stream.py b/packages/apps/src/microsoft_teams/apps/http_stream.py index 05f188fa..c6491f2b 100644 --- a/packages/apps/src/microsoft_teams/apps/http_stream.py +++ b/packages/apps/src/microsoft_teams/apps/http_stream.py @@ -62,6 +62,7 @@ def __init__(self, client: ApiClient, ref: ConversationReference): self._state_changed = asyncio.Event() self._canceled = False + self._timed_out = False self._reset_state() def _reset_state(self) -> None: @@ -77,10 +78,18 @@ def _reset_state(self) -> None: def canceled(self) -> bool: """ Whether the stream has been canceled. - For example when the user pressed the Stop button or the 2-minute timeout has exceeded. + For example when the user pressed the Stop button. """ return self._canceled + @property + def timed_out(self) -> bool: + """ + Whether the stream has timed out. + For example when the streaming has exceeded two minutes. + """ + return self._timed_out + @property def closed(self) -> bool: """Whether the final stream message has been sent.""" @@ -194,11 +203,27 @@ async def close(self) -> Optional[SentActivity]: return None # Build final message from the last emitted MessageActivityInput (last wins) - assert self._id is not None, "ID should be set by this point" - activity = self._final_activity or MessageActivityInput() - activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final() - - res = await retry(lambda: self._send(activity), options=RetryOptions()) + if self._timed_out: + activity = self._final_activity or MessageActivityInput() + activity.with_text(self._text) + activity.id = None + res = await retry(lambda: self._send(activity), options=RetryOptions()) + else: + assert self._id is not None, "ID should be set by this point" + activity = self._final_activity or MessageActivityInput() + activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final() + try: + res = await retry(lambda: self._send(activity), options=RetryOptions()) + except StreamCancelledError: + # Reaches this point if the streaming time exceeded 2 minutes on the final request. + if not self._timed_out: + raise + # The final stream send itself tripped the time limit; resend the + # buffered content as a regular message (cleared id -> create path). + final_message = self._final_activity or MessageActivityInput() + final_message.with_text(self._text) + final_message.id = None + res = await self._send(final_message) # Emit close event self._events.emit("close", res) @@ -252,6 +277,9 @@ async def _flush(self) -> None: logger.debug("No activities to flush") return + if self._timed_out: + return + # Send informative updates immediately for typing_update in informative_updates: await self._send_activity(typing_update) @@ -282,10 +310,15 @@ async def _send_activity(self, to_send: TypingActivityInput): to_send = to_send.with_id(self._id) to_send = to_send.add_stream_update(self._index) - res = await retry( - lambda: self._send(to_send), - options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8), - ) + try: + res = await retry( + lambda: self._send(to_send), + options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8), + ) + except StreamCancelledError: + if self._timed_out: + return + raise self._events.emit("chunk", res) self._index += 1 if self._id is None: @@ -315,6 +348,13 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) return SentActivity.merge(to_send, res) except HTTPStatusError as e: if e.response.status_code == 403: + error = e.response.json().get("error", {}) + message = error.get("message", "") + if message != "Content stream was cancelled by user.": + if message == "Content stream finished due to exceeded streaming time.": + self._timed_out = True + logger.warning("Teams encountered an error while streaming. Sending as a regular message.") + raise StreamCancelledError(message) from e self._canceled = True logger.warning("Teams channel stopped the stream.") raise StreamCancelledError("Teams channel stopped the stream.") from e