Skip to content
Open
60 changes: 50 additions & 10 deletions packages/apps/src/microsoft_teams/apps/http_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, client: ApiClient, ref: ConversationReference):
self._state_changed = asyncio.Event()

self._canceled = False
self._timed_out = False
self._reset_state()

def _reset_state(self) -> None:
Expand All @@ -77,10 +78,18 @@ def _reset_state(self) -> None:
def canceled(self) -> bool:
"""
Whether the stream has been canceled.
For example when the user pressed the Stop button or the 2-minute timeout has exceeded.
For example when the user pressed the Stop button.
"""
return self._canceled

@property
def timed_out(self) -> bool:
"""
Whether the stream has timed out.
For example when the streaming has exceeded two minutes.
"""
return self._timed_out

@property
def closed(self) -> bool:
"""Whether the final stream message has been sent."""
Expand Down Expand Up @@ -194,11 +203,27 @@ async def close(self) -> Optional[SentActivity]:
return None

# Build final message from the last emitted MessageActivityInput (last wins)
assert self._id is not None, "ID should be set by this point"
activity = self._final_activity or MessageActivityInput()
activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final()

res = await retry(lambda: self._send(activity), options=RetryOptions())
if self._timed_out:
activity = self._final_activity or MessageActivityInput()
activity.with_text(self._text)
activity.id = None
res = await retry(lambda: self._send(activity), options=RetryOptions())
else:
assert self._id is not None, "ID should be set by this point"
activity = self._final_activity or MessageActivityInput()
activity.with_text(self._text).with_id(self._id).with_channel_data(self._channel_data).add_stream_final()
try:
res = await retry(lambda: self._send(activity), options=RetryOptions())
except StreamCancelledError:
# Reaches this point if the streaming time exceeded 2 minutes on the final request.
if not self._timed_out:
raise
# The final stream send itself tripped the time limit; resend the
# buffered content as a regular message (cleared id -> create path).
final_message = self._final_activity or MessageActivityInput()
final_message.with_text(self._text)
final_message.id = None
res = await self._send(final_message)

# Emit close event
self._events.emit("close", res)
Expand Down Expand Up @@ -252,6 +277,9 @@ async def _flush(self) -> None:
logger.debug("No activities to flush")
return

if self._timed_out:
return

# Send informative updates immediately
for typing_update in informative_updates:
await self._send_activity(typing_update)
Expand Down Expand Up @@ -282,10 +310,15 @@ async def _send_activity(self, to_send: TypingActivityInput):
to_send = to_send.with_id(self._id)
to_send = to_send.add_stream_update(self._index)

res = await retry(
lambda: self._send(to_send),
options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8),
)
try:
res = await retry(
lambda: self._send(to_send),
options=RetryOptions(max_delay=4.0, jitter_type="none", max_attempts=8),
)
except StreamCancelledError:
if self._timed_out:
return
raise
self._events.emit("chunk", res)
self._index += 1
if self._id is None:
Expand Down Expand Up @@ -315,6 +348,13 @@ async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput])
return SentActivity.merge(to_send, res)
except HTTPStatusError as e:
if e.response.status_code == 403:
error = e.response.json().get("error", {})
message = error.get("message", "")
if message != "Content stream was cancelled by user.":

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also make sure that the backend has tests for these particular strings

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and maybe a more resilient error could be to just check for phrases like "canceled", "exceeded streaming time" etc.)

if message == "Content stream finished due to exceeded streaming time.":
self._timed_out = True
logger.warning("Teams encountered an error while streaming. Sending as a regular message.")
raise StreamCancelledError(message) from e
Comment on lines +353 to +357

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will raise the StreamCancelled error to the handler. We should probably raise a different error here.

Can we just do a switch statement here for various types of canceled errors? Also point to the learn docs that describe these.

self._canceled = True
logger.warning("Teams channel stopped the stream.")
raise StreamCancelledError("Teams channel stopped the stream.") from e
Expand Down
Loading