Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/agent365/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# agent365

Acquire an Agent 365 agent user token using an agent identity blueprint, an agent identity app ID, and an agent user.

## Run

Set these environment variables or add them to `.env`:

```bash
AGENT365_TENANT_ID=<tenant-id>
AGENT365_BLUEPRINT_CLIENT_ID=<agent-identity-blueprint-app-id>
AGENT365_BLUEPRINT_CLIENT_SECRET=<agent-identity-blueprint-secret>
AGENT365_AGENT_IDENTITY_APP_ID=<agent-identity-app-id>
AGENT365_AGENT_USER_ID=<agent-user-object-id>
```
Comment thread
lilyydu marked this conversation as resolved.

Then run:

```bash
uv run --project examples/agent365 python src/main.py
```

By default this requests a Teams bot API token for `https://botapi.skype.com/.default`.

To request another resource, set `AGENT365_SCOPE`, for example:

```bash
AGENT365_SCOPE=https://graph.microsoft.com/.default
```

You can use `AGENT365_AGENT_USER_UPN` instead of `AGENT365_AGENT_USER_ID`.
13 changes: 13 additions & 0 deletions examples/agent365/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[project]
name = "agent365"
version = "0.1.0"
description = "Agent 365 token example"
readme = "README.md"
requires-python = ">=3.11,<4.0"
dependencies = [
"dotenv>=0.9.9",
"microsoft-teams-apps",
]

[tool.uv.sources]
microsoft-teams-apps = { workspace = true }
53 changes: 53 additions & 0 deletions examples/agent365/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import asyncio
import os

from dotenv import load_dotenv
from microsoft_teams.api import ClientCredentials
from microsoft_teams.apps.token_manager import AGENT_BOT_API_SCOPE, TokenManager


def get_required_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"{name} must be set")

return value


async def main():
load_dotenv()

tenant_id = get_required_env("AGENT365_TENANT_ID")
blueprint_client_id = get_required_env("AGENT365_BLUEPRINT_CLIENT_ID")
blueprint_client_secret = get_required_env("AGENT365_BLUEPRINT_CLIENT_SECRET")
agentic_app_id = get_required_env("AGENT365_AGENTIC_APP_ID")
agentic_user_id = os.getenv("AGENT365_AGENTIC_USER_ID")
agentic_user_upn = os.getenv("AGENT365_AGENTIC_USER_UPN")
scope = os.getenv("AGENT365_SCOPE", AGENT_BOT_API_SCOPE)

credentials = ClientCredentials(
client_id=blueprint_client_id,
client_secret=blueprint_client_secret,
tenant_id=tenant_id,
)
token_manager = TokenManager(credentials=credentials)

token = await token_manager.get_agentic_token(
tenant_id,
agentic_app_id,
scope,
agentic_user_id=agentic_user_id,
agentic_user_upn=agentic_user_upn,
)

print(f"Acquired agent user token for {scope}")
print(f"Token preview: {str(token)[:20]}...")


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion packages/apps/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies = [
"cryptography>=3.4.0",
"pyjwt[crypto]>=2.12.0",
"dependency-injector>=4.48.1",
"msal>=1.33.0",
"msal>=1.37.0",
"python-dotenv>=1.0.0",
"pydantic-settings>=2.11.0",
]
Expand Down
87 changes: 86 additions & 1 deletion packages/apps/src/microsoft_teams/apps/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import logging
from inspect import isawaitable
from typing import Any, Optional
from typing import Any, Callable, Optional

import requests
from microsoft_teams.api import (
Expand All @@ -29,6 +29,8 @@
)

DEFAULT_TENANT_FOR_GRAPH_TOKEN = "common"
TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default"
AGENT_BOT_API_SCOPE = "https://botapi.skype.com/.default"

logger = logging.getLogger(__name__)

Expand All @@ -45,6 +47,7 @@ def __init__(
self._cloud = cloud or PUBLIC
self._confidential_clients_by_tenant: dict[str, ConfidentialClientApplication] = {}
self._federated_identity_clients_by_tenant: dict[str, ConfidentialClientApplication] = {}
self._agent_identity_clients_by_tenant_and_app_id: dict[tuple[str, str], ConfidentialClientApplication] = {}
self._managed_identity_client: Optional[ManagedIdentityClient] = None

async def get_bot_token(self) -> Optional[TokenProtocol]:
Expand All @@ -68,6 +71,70 @@ async def get_graph_token(self, tenant_id: Optional[str] = None) -> Optional[Tok
self._cloud.graph_scope, tenant_id=self._resolve_tenant_id(tenant_id, DEFAULT_TENANT_FOR_GRAPH_TOKEN)
)

async def get_agentic_token(
self,
tenant_id: str,
agentic_app_id: str,
scope: str,
*,
agentic_user_id: str | None = None,
agentic_user_upn: str | None = None,
caller_name: str | None = None,
) -> Optional[TokenProtocol]:
"""Get a resource token for an agentic identity acting through its agentic user."""
if not agentic_user_id and not agentic_user_upn:
raise ValueError("Either agentic_user_id or agentic_user_upn must be provided")
if agentic_user_id and agentic_user_upn:
raise ValueError("agentic_user_id and agentic_user_upn are mutually exclusive")
if self._credentials is None:
if caller_name:
logger.debug(f"No credentials provided for {caller_name}")
return None

credentials = self._credentials
if not isinstance(credentials, ClientCredentials):
raise ValueError("Agent user tokens require ClientCredentials")
confidential_client = self._get_confidential_client(credentials, tenant_id)

def get_t1_assertion(_context: dict[str, Any]) -> str:
t1_raw: dict[str, Any] = confidential_client.acquire_token_for_client(
[TOKEN_EXCHANGE_SCOPE], fmi_path=agentic_app_id
)
return self._get_access_token_or_raise(t1_raw, "Agent token exchange step 1 failed")

# The agent identity app needs its own MSAL client. It uses the Federated Managed
# Identity assertion from step 1 as its client assertion for the next exchanges.
t2_confidential_client = self._get_agent_identity_client(
Comment thread
heyitsaamir marked this conversation as resolved.
tenant_id,
agentic_app_id,
get_t1_assertion,
)

t2_raw: dict[str, Any] = await asyncio.to_thread(
Comment thread
heyitsaamir marked this conversation as resolved.
lambda: t2_confidential_client.acquire_token_for_client([TOKEN_EXCHANGE_SCOPE])
)

t2 = self._get_access_token_or_raise(t2_raw, "Agent token exchange step 2 failed")

t3_raw: dict[str, Any] = await asyncio.to_thread(
lambda: t2_confidential_client.acquire_token_by_user_federated_identity_credential(
[scope],
assertion=t2,
user_object_id=agentic_user_id,
username=agentic_user_upn,
data={"requested_token_use": "on_behalf_of"},
)
)
return self._handle_token_response(t3_raw, caller_name or "get_agentic_token")

def _get_access_token_or_raise(self, token_res: dict[str, Any], error_prefix: str) -> str:
if token_res.get("access_token", None):
return token_res["access_token"]

error_description = token_res.get("error_description") or token_res.get("error") or "Could not acquire token"
logger.error(f"{error_prefix}: {error_description}")
raise ValueError(f"{error_prefix}: {error_description}")

async def _get_token(
self, scope: str, tenant_id: str, *, caller_name: str | None = None
) -> Optional[TokenProtocol]:
Expand Down Expand Up @@ -220,6 +287,24 @@ def _get_federated_identity_client(
self._federated_identity_clients_by_tenant[tenant_id] = client
return client

def _get_agent_identity_client(
self,
tenant_id: str,
agentic_app_id: str,
client_assertion: Callable[[dict[str, Any]], str],
) -> ConfidentialClientApplication:
cached_client = self._agent_identity_clients_by_tenant_and_app_id.get((tenant_id, agentic_app_id))
if cached_client:
return cached_client

client: ConfidentialClientApplication = ConfidentialClientApplication(
agentic_app_id,
client_credential={"client_assertion": client_assertion},
authority=f"{self._cloud.login_endpoint}/{tenant_id}",
)
self._agent_identity_clients_by_tenant_and_app_id[(tenant_id, agentic_app_id)] = client
return client

def _get_managed_identity_client(
self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials
) -> ManagedIdentityClient:
Expand Down
84 changes: 83 additions & 1 deletion packages/apps/tests/test_token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ManagedIdentityCredentials,
)
from microsoft_teams.api.auth.credentials import TokenCredentials
from microsoft_teams.apps.token_manager import TokenManager
from microsoft_teams.apps.token_manager import AGENT_BOT_API_SCOPE, TOKEN_EXCHANGE_SCOPE, TokenManager
from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs]

# Valid JWT-like token for testing (format: header.payload.signature)
Expand All @@ -30,6 +30,88 @@
class TestTokenManager:
"""Test TokenManager functionality."""

@pytest.mark.asyncio
async def test_get_agentic_token_uses_agent_identity_flow(self):
mock_credentials = ClientCredentials(
client_id="blueprint-client-id",
client_secret="blueprint-client-secret",
tenant_id="tenant-id",
)

blueprint_app = MagicMock()
blueprint_app.acquire_token_for_client.return_value = {"access_token": "t1-token"}

agent_app = MagicMock()
agent_app.acquire_token_for_client.side_effect = lambda _scopes: (
mock_confidential_app.call_args_list[1].kwargs["client_credential"]["client_assertion"]({}),
{"access_token": "t2-token"},
)[1]
agent_app.acquire_token_by_user_federated_identity_credential.return_value = {"access_token": VALID_TEST_TOKEN}

with patch("microsoft_teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app:
mock_confidential_app.side_effect = [blueprint_app, agent_app]

manager = TokenManager(credentials=mock_credentials)
token = await manager.get_agentic_token(
"tenant-id",
"agentic-app-id",
AGENT_BOT_API_SCOPE,
agentic_user_id="agentic-user-id",
)

assert token is not None
assert str(token) == VALID_TEST_TOKEN

blueprint_app.acquire_token_for_client.assert_called_once_with(
[TOKEN_EXCHANGE_SCOPE], fmi_path="agentic-app-id"
)
agent_app.acquire_token_for_client.assert_called_once_with([TOKEN_EXCHANGE_SCOPE])
agent_app.acquire_token_by_user_federated_identity_credential.assert_called_once_with(
[AGENT_BOT_API_SCOPE],
assertion="t2-token",
user_object_id="agentic-user-id",
username=None,
data={"requested_token_use": "on_behalf_of"},
)

first_call, second_call = mock_confidential_app.call_args_list
assert first_call.args == ("blueprint-client-id",)
assert first_call.kwargs == {
"client_credential": "blueprint-client-secret",
"authority": "https://login.microsoftonline.com/tenant-id",
}
assert second_call.args == ("agentic-app-id",)
assert second_call.kwargs["authority"] == "https://login.microsoftonline.com/tenant-id"
assert callable(second_call.kwargs["client_credential"]["client_assertion"])

@pytest.mark.asyncio
async def test_get_agentic_token_caches_agent_identity_client(self):
mock_credentials = ClientCredentials(
client_id="blueprint-client-id",
client_secret="blueprint-client-secret",
tenant_id="tenant-id",
)

blueprint_app = MagicMock()
blueprint_app.acquire_token_for_client.return_value = {"access_token": "t1-token"}

agent_app = MagicMock()
agent_app.acquire_token_for_client.return_value = {"access_token": "t2-token"}
agent_app.acquire_token_by_user_federated_identity_credential.return_value = {"access_token": VALID_TEST_TOKEN}

with patch("microsoft_teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app:
mock_confidential_app.side_effect = [blueprint_app, agent_app]

manager = TokenManager(credentials=mock_credentials)
await manager.get_agentic_token(
"tenant-id", "agentic-app-id", AGENT_BOT_API_SCOPE, agentic_user_id="agentic-user-id"
)
await manager.get_agentic_token(
"tenant-id", "agentic-app-id", AGENT_BOT_API_SCOPE, agentic_user_id="agentic-user-id"
)

assert mock_confidential_app.call_count == 2

@pytest.mark.asyncio
async def test_get_bot_token_success(self):
"""Test successful bot token retrieval using MSAL."""
Expand Down
21 changes: 18 additions & 3 deletions stubs/msal/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Type stubs for msal"""

from typing import Any, Callable, TypeAlias
from typing import Any, Callable, Optional, TypeAlias

ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str]] | None
ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str] | Callable[[dict[str, Any]], str]] | None

class ConfidentialClientApplication:
"""MSAL Confidential Client Application"""
Expand All @@ -16,7 +16,22 @@ class ConfidentialClientApplication:
**kwargs: Any,
) -> None: ...
def acquire_token_for_client(
self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any
self,
scopes: list[str] | str,
claims_challenge: str | None = None,
*,
fmi_path: str | None = None,
**kwargs: Any,
) -> dict[str, Any]: ...
def acquire_token_by_user_federated_identity_credential(
self,
scopes: list[str],
assertion: str | Callable[[], str],
*,
username: Optional[str] = None,
user_object_id: Optional[str] = None,
claims_challenge: Optional[None] = None,
**kwargs: Any,
) -> dict[str, Any]: ...

class SystemAssignedManagedIdentity:
Expand Down
Loading
Loading