Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions packages/apps/src/microsoft_teams/apps/auth/token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class JwtValidationOptions:
""" Optional scope that must be present in the token """
clock_tolerance: int = JWT_LEEWAY_SECONDS
""" Allowable clock skew when validating JWTs """
app_id: Optional[str] = None
""" Optional app ID used to select per-token validators for inbound activities """
cloud: CloudEnvironment = PUBLIC
""" Cloud environment used to select issuer and JWKS endpoints """


class TokenValidator:
Expand Down Expand Up @@ -83,9 +87,24 @@ def for_service(
valid_audiences=cls._default_audiences(app_id),
jwks_uri=jwks_keys_uri,
service_url=service_url,
app_id=app_id,
cloud=env,
)
return cls(options)

@classmethod
def for_inbound_activity(
cls,
app_id: str,
cloud: Optional[CloudEnvironment] = None,
) -> TokenValidator:
"""Create a validator for inbound Teams activities.

Classic bot activities use Bot Framework connector tokens. Agent ID activities use
Entra tokens whose audience is the agent identity blueprint app ID.
"""
return cls.for_service(app_id, cloud=cloud)

@classmethod
def for_entra(
cls,
Expand Down Expand Up @@ -128,9 +147,38 @@ def for_entra(
valid_audiences=valid_audiences,
jwks_uri=f"{env.login_endpoint}/{tenant_id}/discovery/v2.0/keys",
scope=scope,
app_id=app_id,
cloud=env,
)
return cls(options)

async def validate_inbound_activity_token(
self, raw_token: str, service_url: Optional[str] = None
) -> Dict[str, Any]:
unverified_payload = jwt.decode(raw_token, options={"verify_signature": False})
issuer = unverified_payload.get("iss", "")
if isinstance(issuer, str) and issuer.startswith(self.options.cloud.login_endpoint):
return await self._validate_entra_inbound_activity_token(raw_token, unverified_payload)

return await self.validate_token(raw_token, service_url)

async def _validate_entra_inbound_activity_token(
self, raw_token: str, unverified_payload: Dict[str, Any]
) -> Dict[str, Any]:
if not self.options.app_id:
raise ValueError("App ID is required for Entra inbound token validation")

tenant_id = unverified_payload.get("tid")
if not tenant_id or not isinstance(tenant_id, str):
raise jwt.InvalidTokenError("Entra inbound token is missing tid")

validator = TokenValidator.for_entra(
self.options.app_id,
tenant_id,
cloud=self.options.cloud,
)
return await validator.validate_token(raw_token)

async def validate_token(
self, raw_token: str, service_url: Optional[str] = None, scope: Optional[str] = None
) -> Dict[str, Any]:
Expand Down
4 changes: 2 additions & 2 deletions packages/apps/src/microsoft_teams/apps/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def initialize(

app_id = getattr(credentials, "client_id", None) if credentials else None
if app_id and not skip_auth:
self._token_validator = TokenValidator.for_service(
self._token_validator = TokenValidator.for_inbound_activity(
app_id,
cloud=self._cloud,
)
Expand Down Expand Up @@ -156,7 +156,7 @@ async def handle_request(self, request: HttpRequest) -> HttpResponse:
service_url = cast(Optional[str], body.get("serviceUrl"))

try:
await self._token_validator.validate_token(raw_token, service_url)
await self._token_validator.validate_inbound_activity_token(raw_token, service_url)
except Exception as e:
logger.warning("JWT token validation failed: %s", e)
return HttpResponse(status=401, body={"error": "Unauthorized"})
Expand Down
Loading