diff --git a/examples/agent365/README.md b/examples/agent365/README.md new file mode 100644 index 000000000..c3164c58a --- /dev/null +++ b/examples/agent365/README.md @@ -0,0 +1,31 @@ +# agent365 + +Acquire an Agent 365 agent user token using an agent identity blueprint, an agent identity app ID, and an agent user. + +## Run + +Set these environment variables or add them to `.env`: + +```bash +AGENT365_TENANT_ID= +AGENT365_BLUEPRINT_CLIENT_ID= +AGENT365_BLUEPRINT_CLIENT_SECRET= +AGENT365_AGENT_IDENTITY_APP_ID= +AGENT365_AGENT_USER_ID= +``` + +Then run: + +```bash +uv run --project examples/agent365 python src/main.py +``` + +By default this requests a Teams bot API token for `https://botapi.skype.com/.default`. + +To request another resource, set `AGENT365_SCOPE`, for example: + +```bash +AGENT365_SCOPE=https://graph.microsoft.com/.default +``` + +You can use `AGENT365_AGENT_USER_UPN` instead of `AGENT365_AGENT_USER_ID`. diff --git a/examples/agent365/pyproject.toml b/examples/agent365/pyproject.toml new file mode 100644 index 000000000..43fc1ee3d --- /dev/null +++ b/examples/agent365/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "agent365" +version = "0.1.0" +description = "Agent 365 token example" +readme = "README.md" +requires-python = ">=3.11,<4.0" +dependencies = [ + "dotenv>=0.9.9", + "microsoft-teams-apps", +] + +[tool.uv.sources] +microsoft-teams-apps = { workspace = true } diff --git a/examples/agent365/src/main.py b/examples/agent365/src/main.py new file mode 100644 index 000000000..358f108c4 --- /dev/null +++ b/examples/agent365/src/main.py @@ -0,0 +1,53 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + +import asyncio +import os + +from dotenv import load_dotenv +from microsoft_teams.api import ClientCredentials +from microsoft_teams.apps.token_manager import AGENT_BOT_API_SCOPE, TokenManager + + +def get_required_env(name: str) -> str: + value = os.getenv(name) + if not value: + raise ValueError(f"{name} must be set") + + return value + + +async def main(): + load_dotenv() + + tenant_id = get_required_env("AGENT365_TENANT_ID") + blueprint_client_id = get_required_env("AGENT365_BLUEPRINT_CLIENT_ID") + blueprint_client_secret = get_required_env("AGENT365_BLUEPRINT_CLIENT_SECRET") + agentic_app_id = get_required_env("AGENT365_AGENTIC_APP_ID") + agentic_user_id = os.getenv("AGENT365_AGENTIC_USER_ID") + agentic_user_upn = os.getenv("AGENT365_AGENTIC_USER_UPN") + scope = os.getenv("AGENT365_SCOPE", AGENT_BOT_API_SCOPE) + + credentials = ClientCredentials( + client_id=blueprint_client_id, + client_secret=blueprint_client_secret, + tenant_id=tenant_id, + ) + token_manager = TokenManager(credentials=credentials) + + token = await token_manager.get_agentic_token( + tenant_id, + agentic_app_id, + scope, + agentic_user_id=agentic_user_id, + agentic_user_upn=agentic_user_upn, + ) + + print(f"Acquired agent user token for {scope}") + print(f"Token preview: {str(token)[:20]}...") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/apps/pyproject.toml b/packages/apps/pyproject.toml index c27b9db69..6a2b33c7f 100644 --- a/packages/apps/pyproject.toml +++ b/packages/apps/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "cryptography>=3.4.0", "pyjwt[crypto]>=2.12.0", "dependency-injector>=4.48.1", - "msal>=1.33.0", + "msal>=1.37.0", "python-dotenv>=1.0.0", "pydantic-settings>=2.11.0", ] diff --git a/packages/apps/src/microsoft_teams/apps/token_manager.py b/packages/apps/src/microsoft_teams/apps/token_manager.py index e4c3525f1..18eb00f30 100644 --- a/packages/apps/src/microsoft_teams/apps/token_manager.py +++ b/packages/apps/src/microsoft_teams/apps/token_manager.py @@ -6,7 +6,7 @@ import asyncio import logging from inspect import isawaitable -from typing import Any, Optional +from typing import Any, Callable, Optional import requests from microsoft_teams.api import ( @@ -29,6 +29,8 @@ ) DEFAULT_TENANT_FOR_GRAPH_TOKEN = "common" +TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default" +AGENT_BOT_API_SCOPE = "https://botapi.skype.com/.default" logger = logging.getLogger(__name__) @@ -45,6 +47,7 @@ def __init__( self._cloud = cloud or PUBLIC self._confidential_clients_by_tenant: dict[str, ConfidentialClientApplication] = {} self._federated_identity_clients_by_tenant: dict[str, ConfidentialClientApplication] = {} + self._agent_identity_clients_by_tenant_and_app_id: dict[tuple[str, str], ConfidentialClientApplication] = {} self._managed_identity_client: Optional[ManagedIdentityClient] = None async def get_bot_token(self) -> Optional[TokenProtocol]: @@ -68,6 +71,70 @@ async def get_graph_token(self, tenant_id: Optional[str] = None) -> Optional[Tok self._cloud.graph_scope, tenant_id=self._resolve_tenant_id(tenant_id, DEFAULT_TENANT_FOR_GRAPH_TOKEN) ) + async def get_agentic_token( + self, + tenant_id: str, + agentic_app_id: str, + scope: str, + *, + agentic_user_id: str | None = None, + agentic_user_upn: str | None = None, + caller_name: str | None = None, + ) -> Optional[TokenProtocol]: + """Get a resource token for an agentic identity acting through its agentic user.""" + if not agentic_user_id and not agentic_user_upn: + raise ValueError("Either agentic_user_id or agentic_user_upn must be provided") + if agentic_user_id and agentic_user_upn: + raise ValueError("agentic_user_id and agentic_user_upn are mutually exclusive") + if self._credentials is None: + if caller_name: + logger.debug(f"No credentials provided for {caller_name}") + return None + + credentials = self._credentials + if not isinstance(credentials, ClientCredentials): + raise ValueError("Agent user tokens require ClientCredentials") + confidential_client = self._get_confidential_client(credentials, tenant_id) + + def get_t1_assertion(_context: dict[str, Any]) -> str: + t1_raw: dict[str, Any] = confidential_client.acquire_token_for_client( + [TOKEN_EXCHANGE_SCOPE], fmi_path=agentic_app_id + ) + return self._get_access_token_or_raise(t1_raw, "Agent token exchange step 1 failed") + + # The agent identity app needs its own MSAL client. It uses the Federated Managed + # Identity assertion from step 1 as its client assertion for the next exchanges. + t2_confidential_client = self._get_agent_identity_client( + tenant_id, + agentic_app_id, + get_t1_assertion, + ) + + t2_raw: dict[str, Any] = await asyncio.to_thread( + lambda: t2_confidential_client.acquire_token_for_client([TOKEN_EXCHANGE_SCOPE]) + ) + + t2 = self._get_access_token_or_raise(t2_raw, "Agent token exchange step 2 failed") + + t3_raw: dict[str, Any] = await asyncio.to_thread( + lambda: t2_confidential_client.acquire_token_by_user_federated_identity_credential( + [scope], + assertion=t2, + user_object_id=agentic_user_id, + username=agentic_user_upn, + data={"requested_token_use": "on_behalf_of"}, + ) + ) + return self._handle_token_response(t3_raw, caller_name or "get_agentic_token") + + def _get_access_token_or_raise(self, token_res: dict[str, Any], error_prefix: str) -> str: + if token_res.get("access_token", None): + return token_res["access_token"] + + error_description = token_res.get("error_description") or token_res.get("error") or "Could not acquire token" + logger.error(f"{error_prefix}: {error_description}") + raise ValueError(f"{error_prefix}: {error_description}") + async def _get_token( self, scope: str, tenant_id: str, *, caller_name: str | None = None ) -> Optional[TokenProtocol]: @@ -220,6 +287,24 @@ def _get_federated_identity_client( self._federated_identity_clients_by_tenant[tenant_id] = client return client + def _get_agent_identity_client( + self, + tenant_id: str, + agentic_app_id: str, + client_assertion: Callable[[dict[str, Any]], str], + ) -> ConfidentialClientApplication: + cached_client = self._agent_identity_clients_by_tenant_and_app_id.get((tenant_id, agentic_app_id)) + if cached_client: + return cached_client + + client: ConfidentialClientApplication = ConfidentialClientApplication( + agentic_app_id, + client_credential={"client_assertion": client_assertion}, + authority=f"{self._cloud.login_endpoint}/{tenant_id}", + ) + self._agent_identity_clients_by_tenant_and_app_id[(tenant_id, agentic_app_id)] = client + return client + def _get_managed_identity_client( self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials ) -> ManagedIdentityClient: diff --git a/packages/apps/tests/test_token_manager.py b/packages/apps/tests/test_token_manager.py index dd75d3601..a60e54030 100644 --- a/packages/apps/tests/test_token_manager.py +++ b/packages/apps/tests/test_token_manager.py @@ -16,7 +16,7 @@ ManagedIdentityCredentials, ) from microsoft_teams.api.auth.credentials import TokenCredentials -from microsoft_teams.apps.token_manager import TokenManager +from microsoft_teams.apps.token_manager import AGENT_BOT_API_SCOPE, TOKEN_EXCHANGE_SCOPE, TokenManager from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs] # Valid JWT-like token for testing (format: header.payload.signature) @@ -30,6 +30,88 @@ class TestTokenManager: """Test TokenManager functionality.""" + @pytest.mark.asyncio + async def test_get_agentic_token_uses_agent_identity_flow(self): + mock_credentials = ClientCredentials( + client_id="blueprint-client-id", + client_secret="blueprint-client-secret", + tenant_id="tenant-id", + ) + + blueprint_app = MagicMock() + blueprint_app.acquire_token_for_client.return_value = {"access_token": "t1-token"} + + agent_app = MagicMock() + agent_app.acquire_token_for_client.side_effect = lambda _scopes: ( + mock_confidential_app.call_args_list[1].kwargs["client_credential"]["client_assertion"]({}), + {"access_token": "t2-token"}, + )[1] + agent_app.acquire_token_by_user_federated_identity_credential.return_value = {"access_token": VALID_TEST_TOKEN} + + with patch("microsoft_teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_confidential_app.side_effect = [blueprint_app, agent_app] + + manager = TokenManager(credentials=mock_credentials) + token = await manager.get_agentic_token( + "tenant-id", + "agentic-app-id", + AGENT_BOT_API_SCOPE, + agentic_user_id="agentic-user-id", + ) + + assert token is not None + assert str(token) == VALID_TEST_TOKEN + + blueprint_app.acquire_token_for_client.assert_called_once_with( + [TOKEN_EXCHANGE_SCOPE], fmi_path="agentic-app-id" + ) + agent_app.acquire_token_for_client.assert_called_once_with([TOKEN_EXCHANGE_SCOPE]) + agent_app.acquire_token_by_user_federated_identity_credential.assert_called_once_with( + [AGENT_BOT_API_SCOPE], + assertion="t2-token", + user_object_id="agentic-user-id", + username=None, + data={"requested_token_use": "on_behalf_of"}, + ) + + first_call, second_call = mock_confidential_app.call_args_list + assert first_call.args == ("blueprint-client-id",) + assert first_call.kwargs == { + "client_credential": "blueprint-client-secret", + "authority": "https://login.microsoftonline.com/tenant-id", + } + assert second_call.args == ("agentic-app-id",) + assert second_call.kwargs["authority"] == "https://login.microsoftonline.com/tenant-id" + assert callable(second_call.kwargs["client_credential"]["client_assertion"]) + + @pytest.mark.asyncio + async def test_get_agentic_token_caches_agent_identity_client(self): + mock_credentials = ClientCredentials( + client_id="blueprint-client-id", + client_secret="blueprint-client-secret", + tenant_id="tenant-id", + ) + + blueprint_app = MagicMock() + blueprint_app.acquire_token_for_client.return_value = {"access_token": "t1-token"} + + agent_app = MagicMock() + agent_app.acquire_token_for_client.return_value = {"access_token": "t2-token"} + agent_app.acquire_token_by_user_federated_identity_credential.return_value = {"access_token": VALID_TEST_TOKEN} + + with patch("microsoft_teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_confidential_app.side_effect = [blueprint_app, agent_app] + + manager = TokenManager(credentials=mock_credentials) + await manager.get_agentic_token( + "tenant-id", "agentic-app-id", AGENT_BOT_API_SCOPE, agentic_user_id="agentic-user-id" + ) + await manager.get_agentic_token( + "tenant-id", "agentic-app-id", AGENT_BOT_API_SCOPE, agentic_user_id="agentic-user-id" + ) + + assert mock_confidential_app.call_count == 2 + @pytest.mark.asyncio async def test_get_bot_token_success(self): """Test successful bot token retrieval using MSAL.""" diff --git a/stubs/msal/__init__.pyi b/stubs/msal/__init__.pyi index 0ec244167..5e5272ced 100644 --- a/stubs/msal/__init__.pyi +++ b/stubs/msal/__init__.pyi @@ -1,8 +1,8 @@ """Type stubs for msal""" -from typing import Any, Callable, TypeAlias +from typing import Any, Callable, Optional, TypeAlias -ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str]] | None +ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str] | Callable[[dict[str, Any]], str]] | None class ConfidentialClientApplication: """MSAL Confidential Client Application""" @@ -16,7 +16,22 @@ class ConfidentialClientApplication: **kwargs: Any, ) -> None: ... def acquire_token_for_client( - self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any + self, + scopes: list[str] | str, + claims_challenge: str | None = None, + *, + fmi_path: str | None = None, + **kwargs: Any, + ) -> dict[str, Any]: ... + def acquire_token_by_user_federated_identity_credential( + self, + scopes: list[str], + assertion: str | Callable[[], str], + *, + username: Optional[str] = None, + user_object_id: Optional[str] = None, + claims_challenge: Optional[None] = None, + **kwargs: Any, ) -> dict[str, Any]: ... class SystemAssignedManagedIdentity: diff --git a/uv.lock b/uv.lock index 3ef2ba7ac..061ce1f51 100644 --- a/uv.lock +++ b/uv.lock @@ -9,6 +9,7 @@ resolution-markers = [ [manifest] members = [ "a2a", + "agent365", "ai-agentframework", "botbuilder", "cards", @@ -133,6 +134,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1c/58/1878b9ac4db703f40c687129c0bccdbf88c94d557b64f0172f3c4e954558/agent_framework_openai-1.1.0-py3-none-any.whl", hash = "sha256:8fbcdb87fbc3fb6aa6f3d781a61a2c48fbc308d2ee8165454f94e53e026a787b", size = 50280, upload-time = "2026-04-21T06:20:11.864Z" }, ] +[[package]] +name = "agent365" +version = "0.1.0" +source = { virtual = "examples/agent365" } +dependencies = [ + { name = "dotenv" }, + { name = "microsoft-teams-apps" }, +] + +[package.metadata] +requires-dist = [ + { name = "dotenv", specifier = ">=0.9.9" }, + { name = "microsoft-teams-apps", editable = "packages/apps" }, +] + [[package]] name = "ai-agentframework" version = "0.1.0" @@ -1822,7 +1838,7 @@ requires-dist = [ { name = "microsoft-teams-api", editable = "packages/api" }, { name = "microsoft-teams-common", editable = "packages/common" }, { name = "microsoft-teams-graph", marker = "extra == 'graph'", editable = "packages/graph" }, - { name = "msal", specifier = ">=1.33.0" }, + { name = "msal", specifier = ">=1.37.0" }, { name = "pydantic-settings", specifier = ">=2.11.0" }, { name = "pyjwt", extras = ["crypto"], specifier = ">=2.12.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, @@ -1935,16 +1951,16 @@ dev = [ [[package]] name = "msal" -version = "1.34.0" +version = "1.37.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cryptography" }, { name = "pyjwt", extra = ["crypto"] }, { name = "requests" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/cf/0e/c857c46d653e104019a84f22d4494f2119b4fe9f896c92b4b864b3b045cc/msal-1.34.0.tar.gz", hash = "sha256:76ba83b716ea5a6d75b0279c0ac353a0e05b820ca1f6682c0eb7f45190c43c2f", size = 153961, upload-time = "2025-09-22T23:05:48.989Z" } +sdist = { url = "https://files.pythonhosted.org/packages/9a/99/d840198ecf6e8057bbc937f129ae940404485d736cda73253bbff9537f01/msal-1.37.0.tar.gz", hash = "sha256:1b1672a33ee467c1d70b341bb16cafd51bb3c817147a95b93263794b03971bec", size = 182444, upload-time = "2026-05-29T19:49:05.561Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c2/dc/18d48843499e278538890dc709e9ee3dea8375f8be8e82682851df1b48b5/msal-1.34.0-py3-none-any.whl", hash = "sha256:f669b1644e4950115da7a176441b0e13ec2975c29528d8b9e81316023676d6e1", size = 116987, upload-time = "2025-09-22T23:05:47.294Z" }, + { url = "https://files.pythonhosted.org/packages/94/b0/d807279f4b55d16d1f120d5ac4344c6e39b56732e2a224d40bded7fd67ad/msal-1.37.0-py3-none-any.whl", hash = "sha256:dd17e95a7c71bce75e8108113438ba7c4a086b3bcad4f57a8c09b7af3d753c2d", size = 123725, upload-time = "2026-05-29T19:49:04.335Z" }, ] [[package]]