Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/agent365/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# agent365

Acquire an Agent 365 agent user token using an agent identity blueprint, an agent identity app ID, and an agent user.

## Run

Set these environment variables or add them to `.env`:

```bash
AGENT365_TENANT_ID=<tenant-id>
AGENT365_BLUEPRINT_CLIENT_ID=<agent-identity-blueprint-app-id>
AGENT365_BLUEPRINT_CLIENT_SECRET=<agent-identity-blueprint-secret>
AGENT365_AGENT_IDENTITY_APP_ID=<agent-identity-app-id>
AGENT365_AGENT_USER_ID=<agent-user-object-id>
```
Comment thread
lilyydu marked this conversation as resolved.

Then run:

```bash
uv run --project examples/agent365 python src/main.py
```

By default this requests a Teams bot API token for `https://botapi.skype.com/.default`.

To request another resource, set `AGENT365_SCOPE`, for example:

```bash
AGENT365_SCOPE=https://graph.microsoft.com/.default
```

You can use `AGENT365_AGENT_USER_UPN` instead of `AGENT365_AGENT_USER_ID`.
13 changes: 13 additions & 0 deletions examples/agent365/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[project]
name = "agent365"
version = "0.1.0"
description = "Agent 365 token example"
readme = "README.md"
requires-python = ">=3.11,<4.0"
dependencies = [
"dotenv>=0.9.9",
"microsoft-teams-apps",
]

[tool.uv.sources]
microsoft-teams-apps = { workspace = true }
53 changes: 53 additions & 0 deletions examples/agent365/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import asyncio
import os

from dotenv import load_dotenv
from microsoft_teams.api import ClientCredentials
from microsoft_teams.apps.token_manager import AGENT_BOT_API_SCOPE, TokenManager


def get_required_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"{name} must be set")

return value


async def main():
load_dotenv()

tenant_id = get_required_env("AGENT365_TENANT_ID")
blueprint_client_id = get_required_env("AGENT365_BLUEPRINT_CLIENT_ID")
blueprint_client_secret = get_required_env("AGENT365_BLUEPRINT_CLIENT_SECRET")
agent_identity_app_id = get_required_env("AGENT365_AGENT_IDENTITY_APP_ID")
agent_user_id = os.getenv("AGENT365_AGENT_USER_ID")
agent_user_upn = os.getenv("AGENT365_AGENT_USER_UPN")
scope = os.getenv("AGENT365_SCOPE", AGENT_BOT_API_SCOPE)

credentials = ClientCredentials(
client_id=blueprint_client_id,
client_secret=blueprint_client_secret,
tenant_id=tenant_id,
)
token_manager = TokenManager(credentials=credentials)

token = await token_manager.get_agent_user_token(
tenant_id,
agent_identity_app_id,
scope,
agent_user_id=agent_user_id,
agent_user_upn=agent_user_upn,
)

print(f"Acquired agent user token for {scope}")
print(f"Token preview: {str(token)[:20]}...")


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion packages/apps/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies = [
"cryptography>=3.4.0",
"pyjwt[crypto]>=2.12.0",
"dependency-injector>=4.48.1",
"msal>=1.33.0",
"msal>=1.37.0",
"python-dotenv>=1.0.0",
"pydantic-settings>=2.11.0",
]
Expand Down
106 changes: 105 additions & 1 deletion packages/apps/src/microsoft_teams/apps/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import logging
from inspect import isawaitable
from typing import Any, Optional
from typing import Any, Callable, Optional

import requests
from microsoft_teams.api import (
Expand All @@ -29,6 +29,8 @@
)

DEFAULT_TENANT_FOR_GRAPH_TOKEN = "common"
TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default"
AGENT_BOT_API_SCOPE = "https://botapi.skype.com/.default"

logger = logging.getLogger(__name__)

Expand All @@ -45,6 +47,7 @@ def __init__(
self._cloud = cloud or PUBLIC
self._confidential_clients_by_tenant: dict[str, ConfidentialClientApplication] = {}
self._federated_identity_clients_by_tenant: dict[str, ConfidentialClientApplication] = {}
self._agent_identity_clients_by_tenant_and_app_id: dict[tuple[str, str], ConfidentialClientApplication] = {}
self._managed_identity_client: Optional[ManagedIdentityClient] = None

async def get_bot_token(self) -> Optional[TokenProtocol]:
Expand All @@ -68,6 +71,89 @@ async def get_graph_token(self, tenant_id: Optional[str] = None) -> Optional[Tok
self._cloud.graph_scope, tenant_id=self._resolve_tenant_id(tenant_id, DEFAULT_TENANT_FOR_GRAPH_TOKEN)
)

async def get_agent_bot_token(
self,
tenant_id: str,
agent_identity_app_id: str,
Comment thread
heyitsaamir marked this conversation as resolved.
Outdated
*,
agent_user_id: str | None = None,
agent_user_upn: str | None = None,
Comment thread
heyitsaamir marked this conversation as resolved.
Outdated
Comment thread
heyitsaamir marked this conversation as resolved.
Outdated
caller_name: str | None = None,
) -> Optional[TokenProtocol]:
"""Get a Teams bot API token for an agent identity acting through its agent user."""
return await self.get_agent_user_token(
tenant_id,
agent_identity_app_id,
AGENT_BOT_API_SCOPE,
agent_user_id=agent_user_id,
agent_user_upn=agent_user_upn,
caller_name=caller_name,
)

async def get_agent_user_token(
self,
tenant_id: str,
agent_identity_app_id: str,
scope: str,
*,
agent_user_id: str | None = None,
agent_user_upn: str | None = None,
Comment thread
heyitsaamir marked this conversation as resolved.
Outdated
caller_name: str | None = None,
) -> Optional[TokenProtocol]:
"""Get a resource token for an agent identity acting through its agent user."""
if not agent_user_id and not agent_user_upn:
raise ValueError("Either agent_user_id or agent_user_upn must be provided")
if agent_user_id and agent_user_upn:
raise ValueError("agent_user_id and agent_user_upn are mutually exclusive")
if self._credentials is None:
if caller_name:
logger.debug(f"No credentials provided for {caller_name}")
return None

credentials = self._credentials
if not isinstance(credentials, ClientCredentials):
raise ValueError("Agent user tokens require ClientCredentials")
confidential_client = self._get_confidential_client(credentials, tenant_id)

def get_t1_assertion(_context: dict[str, Any]) -> str:
t1_raw: dict[str, Any] = confidential_client.acquire_token_for_client(
[TOKEN_EXCHANGE_SCOPE], fmi_path=agent_identity_app_id
)
return self._get_access_token_or_raise(t1_raw, "Agent token exchange step 1 failed")

# The agent identity app needs its own MSAL client. It uses the Federated Managed
# Identity assertion from step 1 as its client assertion for the next exchanges.
t2_confidential_client = self._get_agent_identity_client(
Comment thread
heyitsaamir marked this conversation as resolved.
tenant_id,
agent_identity_app_id,
get_t1_assertion,
)

t2_raw: dict[str, Any] = await asyncio.to_thread(
Comment thread
heyitsaamir marked this conversation as resolved.
lambda: t2_confidential_client.acquire_token_for_client([TOKEN_EXCHANGE_SCOPE])
)

t2 = self._get_access_token_or_raise(t2_raw, "Agent token exchange step 2 failed")

t3_raw: dict[str, Any] = await asyncio.to_thread(
lambda: t2_confidential_client.acquire_token_by_user_federated_identity_credential(
[scope],
assertion=t2,
user_object_id=agent_user_id,
username=agent_user_upn,
data={"requested_token_use": "on_behalf_of"},
)
)
return self._handle_token_response(t3_raw, caller_name or "get_agent_user_token")

def _get_access_token_or_raise(self, token_res: dict[str, Any], error_prefix: str) -> str:
if token_res.get("access_token", None):
return token_res["access_token"]

error_description = token_res.get("error_description") or token_res.get("error") or "Could not acquire token"
logger.error(f"{error_prefix}: {error_description}")
raise ValueError(f"{error_prefix}: {error_description}")

async def _get_token(
self, scope: str, tenant_id: str, *, caller_name: str | None = None
) -> Optional[TokenProtocol]:
Expand Down Expand Up @@ -220,6 +306,24 @@ def _get_federated_identity_client(
self._federated_identity_clients_by_tenant[tenant_id] = client
return client

def _get_agent_identity_client(
self,
tenant_id: str,
agent_identity_app_id: str,
client_assertion: Callable[[dict[str, Any]], str],
) -> ConfidentialClientApplication:
cached_client = self._agent_identity_clients_by_tenant_and_app_id.get((tenant_id, agent_identity_app_id))
if cached_client:
return cached_client

client: ConfidentialClientApplication = ConfidentialClientApplication(
agent_identity_app_id,
client_credential={"client_assertion": client_assertion},
authority=f"{self._cloud.login_endpoint}/{tenant_id}",
)
self._agent_identity_clients_by_tenant_and_app_id[(tenant_id, agent_identity_app_id)] = client
return client

def _get_managed_identity_client(
self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials
) -> ManagedIdentityClient:
Expand Down
21 changes: 18 additions & 3 deletions stubs/msal/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Type stubs for msal"""

from typing import Any, Callable, TypeAlias
from typing import Any, Callable, Optional, TypeAlias

ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str]] | None
ClientCredential: TypeAlias = str | dict[str, str | Callable[[], str] | Callable[[dict[str, Any]], str]] | None

class ConfidentialClientApplication:
"""MSAL Confidential Client Application"""
Expand All @@ -16,7 +16,22 @@ class ConfidentialClientApplication:
**kwargs: Any,
) -> None: ...
def acquire_token_for_client(
self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any
self,
scopes: list[str] | str,
claims_challenge: str | None = None,
*,
fmi_path: str | None = None,
**kwargs: Any,
) -> dict[str, Any]: ...
def acquire_token_by_user_federated_identity_credential(
self,
scopes: list[str],
assertion: str | Callable[[], str],
*,
username: Optional[str] = None,
user_object_id: Optional[str] = None,
claims_challenge: Optional[None] = None,
**kwargs: Any,
) -> dict[str, Any]: ...

class SystemAssignedManagedIdentity:
Expand Down
24 changes: 20 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading