Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/apps/src/microsoft_teams/apps/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"""

from .remote_function_jwt_middleware import validate_remote_function_request
from .token_validator import TokenValidator
from .token_validator import InboundActivityTokenValidator, TokenValidator

__all__ = ["TokenValidator", "validate_remote_function_request"]
__all__ = ["InboundActivityTokenValidator", "TokenValidator", "validate_remote_function_request"]
62 changes: 61 additions & 1 deletion packages/apps/src/microsoft_teams/apps/auth/token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from microsoft_teams.api.auth.cloud_environment import PUBLIC, CloudEnvironment

JWT_LEEWAY_SECONDS = 300 # Allowable clock skew when validating JWTs
_MAX_ENTRA_VALIDATOR_CACHE_SIZE = 100
ENTRA_V1_ISSUER_PREFIX = "https://sts.windows.net/"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -113,7 +115,7 @@ def for_entra(
# are still issued with the v1 issuer.
# See: https://learn.microsoft.com/en-us/entra/identity-platform/access-tokens
valid_issuers.append(f"{env.login_endpoint}/{tenant_id}/v2.0")
valid_issuers.append(f"https://sts.windows.net/{tenant_id}/")
valid_issuers.append(f"{ENTRA_V1_ISSUER_PREFIX}{tenant_id}/")
else:
logger.warning(
"No tenant_id provided for Entra token validation. "
Expand Down Expand Up @@ -222,3 +224,61 @@ def _validate_scope(self, payload: Dict[str, Any], required_scope: str) -> None:
if required_scope not in scope_set:
logger.error(f"Token missing required scope: {required_scope}")
raise jwt.InvalidTokenError(f"Token missing required scope: {required_scope}")


class InboundActivityTokenValidator:
"""Validator for inbound Teams activities.

Classic bot activities use Bot Framework connector tokens. Agent ID activities use
Entra tokens whose audience is the agent identity blueprint app ID.
"""

def __init__(self, app_id: str, cloud: Optional[CloudEnvironment] = None):
self._app_id = app_id
self._cloud = cloud or PUBLIC
self._service_validator = TokenValidator.for_service(app_id, cloud=self._cloud)
self._entra_validators_by_tenant: dict[str, TokenValidator] = {}

async def validate_token(self, raw_token: str, service_url: Optional[str] = None) -> Dict[str, Any]:
if not raw_token:
logger.error("No token provided")
raise jwt.InvalidTokenError("No token provided")

unverified_payload = jwt.decode(raw_token, algorithms=["RS256"], options={"verify_signature": False})
issuer = unverified_payload.get("iss", "")
Comment thread
Copilot marked this conversation as resolved.
if self._is_entra_issuer(issuer):
return await self._validate_entra_token(raw_token, unverified_payload)

return await self._service_validator.validate_token(raw_token, service_url)

def _is_entra_issuer(self, issuer: Any) -> bool:
if not isinstance(issuer, str):
return False

return issuer.startswith(self._cloud.login_endpoint) or issuer.startswith(ENTRA_V1_ISSUER_PREFIX)

async def _validate_entra_token(self, raw_token: str, unverified_payload: Dict[str, Any]) -> Dict[str, Any]:
tenant_id = unverified_payload.get("tid")
if not tenant_id or not isinstance(tenant_id, str):
raise jwt.InvalidTokenError("Entra inbound token is missing tid")

validator = self._get_entra_validator(tenant_id)
# TODO: Agent ID inbound Entra tokens currently do not include serviceurl. Revisit service URL
# validation for this path once the platform defines a signed service URL claim or equivalent.
return await validator.validate_token(raw_token)

def _get_entra_validator(self, tenant_id: str) -> TokenValidator:
cached_validator = self._entra_validators_by_tenant.get(tenant_id)
if cached_validator:
return cached_validator

validator = TokenValidator.for_entra(
self._app_id,
tenant_id,
cloud=self._cloud,
)
self._entra_validators_by_tenant[tenant_id] = validator
if len(self._entra_validators_by_tenant) > _MAX_ENTRA_VALIDATOR_CACHE_SIZE:
oldest_tenant_id = next(iter(self._entra_validators_by_tenant))
self._entra_validators_by_tenant.pop(oldest_tenant_id)
return validator
Comment thread
heyitsaamir marked this conversation as resolved.
6 changes: 3 additions & 3 deletions packages/apps/src/microsoft_teams/apps/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from microsoft_teams.api.auth.json_web_token import JsonWebToken
from pydantic import BaseModel

from ..auth import TokenValidator
from ..auth import InboundActivityTokenValidator
from ..events import ActivityEvent, CoreActivity
from .adapter import HttpRequest, HttpResponse, HttpServerAdapter

Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(self, adapter: HttpServerAdapter, messaging_endpoint: str = "/api/m
raise ValueError("messaging_endpoint must be a non-empty path starting with '/'.")
self._messaging_endpoint = normalized_endpoint
self._on_request: Optional[Callable[[ActivityEvent], Awaitable[InvokeResponse[Any]]]] = None
self._token_validator: Optional[TokenValidator] = None
self._token_validator: Optional[InboundActivityTokenValidator] = None
self._skip_auth: bool = False
self._cloud: CloudEnvironment = PUBLIC
self._initialized: bool = False
Expand Down Expand Up @@ -89,7 +89,7 @@ def initialize(

app_id = getattr(credentials, "client_id", None) if credentials else None
if app_id and not skip_auth:
self._token_validator = TokenValidator.for_service(
self._token_validator = InboundActivityTokenValidator(
app_id,
cloud=self._cloud,
)
Expand Down
96 changes: 94 additions & 2 deletions packages/apps/tests/test_token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
Licensed under the MIT License.
"""

from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch

import jwt
import pytest
from microsoft_teams.apps.auth.token_validator import TokenValidator
from microsoft_teams.apps.auth import token_validator as token_validator_module
from microsoft_teams.apps.auth.token_validator import InboundActivityTokenValidator, TokenValidator

# pyright: basic

Expand Down Expand Up @@ -407,3 +408,94 @@ async def test_validate_entra_token_v1_sts_issuer(self, mock_jwks_client):
result = await validator.validate_token("v1.entra.token")
assert result["iss"] == "https://sts.windows.net/test-tenant-id/"
assert result["ver"] == "1.0"


class TestInboundActivityTokenValidator:
@pytest.mark.asyncio
async def test_validate_token_uses_service_validator_for_bot_framework_tokens(self):
validator = InboundActivityTokenValidator("test-app-id")
validator._service_validator.validate_token = AsyncMock(return_value={"iss": "https://api.botframework.com"})

with patch("jwt.decode", return_value={"iss": "https://api.botframework.com"}) as decode:
result = await validator.validate_token("bot-token", "https://service.example")

assert result == {"iss": "https://api.botframework.com"}
decode.assert_called_once_with("bot-token", algorithms=["RS256"], options={"verify_signature": False})
validator._service_validator.validate_token.assert_called_once_with("bot-token", "https://service.example")

@pytest.mark.asyncio
async def test_validate_token_uses_entra_validator_for_v2_issuer(self):
validator = InboundActivityTokenValidator("test-app-id")
validator._service_validator.validate_token = AsyncMock()
entra_validator = MagicMock()
entra_validator.validate_token = AsyncMock(return_value={"tid": "tenant-id"})

with patch.object(validator, "_get_entra_validator", return_value=entra_validator) as get_validator:
with patch(
"jwt.decode",
return_value={"iss": "https://login.microsoftonline.com/tenant-id/v2.0", "tid": "tenant-id"},
):
result = await validator.validate_token("entra-token", "https://service.example")

assert result == {"tid": "tenant-id"}
get_validator.assert_called_once_with("tenant-id")
entra_validator.validate_token.assert_called_once_with("entra-token")
validator._service_validator.validate_token.assert_not_called()

@pytest.mark.asyncio
async def test_validate_token_uses_entra_validator_for_v1_sts_issuer(self):
validator = InboundActivityTokenValidator("test-app-id")
entra_validator = MagicMock()
entra_validator.validate_token = AsyncMock(return_value={"tid": "tenant-id"})

with patch.object(validator, "_get_entra_validator", return_value=entra_validator) as get_validator:
with patch("jwt.decode", return_value={"iss": "https://sts.windows.net/tenant-id/", "tid": "tenant-id"}):
result = await validator.validate_token("entra-v1-token")

assert result == {"tid": "tenant-id"}
get_validator.assert_called_once_with("tenant-id")
entra_validator.validate_token.assert_called_once_with("entra-v1-token")

@pytest.mark.asyncio
async def test_validate_token_rejects_entra_token_without_tid(self):
validator = InboundActivityTokenValidator("test-app-id")

with patch("jwt.decode", return_value={"iss": "https://login.microsoftonline.com/tenant-id/v2.0"}):
with pytest.raises(jwt.InvalidTokenError, match="missing tid"):
await validator.validate_token("entra-token")

@pytest.mark.asyncio
async def test_validate_token_rejects_empty_token_before_routing_decode(self):
validator = InboundActivityTokenValidator("test-app-id")

with patch("jwt.decode") as decode:
with pytest.raises(jwt.InvalidTokenError, match="No token provided"):
await validator.validate_token("")

decode.assert_not_called()

def test_get_entra_validator_caches_by_tenant(self):
validator = InboundActivityTokenValidator("test-app-id")

with patch("microsoft_teams.apps.auth.token_validator.TokenValidator.for_entra") as for_entra:
for_entra.return_value = MagicMock()

first = validator._get_entra_validator("tenant-id")
second = validator._get_entra_validator("tenant-id")

assert first is second
for_entra.assert_called_once_with("test-app-id", "tenant-id", cloud=validator._cloud)

def test_get_entra_validator_cache_is_bounded(self):
validator = InboundActivityTokenValidator("test-app-id")

with patch("microsoft_teams.apps.auth.token_validator.TokenValidator.for_entra") as for_entra:
for_entra.side_effect = lambda _app_id, tenant_id, **_kwargs: MagicMock(name=tenant_id)

for index in range(token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE + 1):
validator._get_entra_validator(f"tenant-{index}")

assert len(validator._entra_validators_by_tenant) == token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE
assert "tenant-0" not in validator._entra_validators_by_tenant
last_tenant_id = f"tenant-{token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE}"
assert last_tenant_id in validator._entra_validators_by_tenant
Loading