Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 156 additions & 1 deletion packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com).
Copyright (c) 2025-2026, WSO2 LLC. (https://www.wso2.com).
WSO2 LLC. licenses this file to you under the Apache License,
Version 2.0 (the "License"); you may not use this file except
in compliance with the License.
Expand Down Expand Up @@ -228,6 +228,126 @@ def get_authorization_url_with_pkce(
)
return auth_url, state, code_verifier

def get_org_authorization_url(
self,
scopes: List[str],
org_discovery_type: str,
value: str,
state: Optional[str] = None,
resource: Optional[str] = None,
**kwargs: Any,
) -> Tuple[str, str]:
"""Generate authorization URL for organization-specific user authentication.

:param scopes: List of OAuth scopes to request
Comment thread
HasiniSama marked this conversation as resolved.
Outdated
:param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain')
:param value: The value for the discovery type
Comment thread
HasiniSama marked this conversation as resolved.
Outdated
:param state: Optional state parameter (generated if not provided)
:param resource: Optional resource parameter
:param kwargs: Additional parameters for the authorization URL
:return: Tuple of (authorization_url, state)
"""
if not state:
state = generate_state()

auth_params = {
"client_id": self.config.client_id,
"redirect_uri": self.config.redirect_uri,
"scope": " ".join(scopes),
"state": state,
"response_type": "code",
"fidp": "OrganizationSSO",
}

# Switch case to handle each discovery type
if org_discovery_type == "orgID":
auth_params["orgId"] = value
elif org_discovery_type == "orgHandle":
auth_params["orgHandle"] = value
elif org_discovery_type == "org":
auth_params["org"] = value
elif org_discovery_type == "emailDomain":
auth_params["login_hint"] = value
auth_params["orgDiscoveryType"] = "emailDomain"
else:
raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}")
Comment thread
HasiniSama marked this conversation as resolved.
Outdated
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

if resource:
auth_params["resource"] = resource

if self.agent_config:
auth_params["requested_actor"] = self.agent_config.agent_id

auth_params.update(kwargs)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

auth_url = build_authorization_url(
f"{self.config.base_url}/oauth2/authorize",
auth_params
)
return auth_url, state

def get_org_authorization_url_with_pkce(
self,
scopes: List[str],
org_discovery_type: str,
value: str,
state: Optional[str] = None,
resource: Optional[str] = None,
**kwargs: Any,
) -> Tuple[str, str, str]:
"""Generate authorization URL for organization-specific user authentication with PKCE.

:param scopes: List of OAuth scopes to request
:param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain')
:param value: The value for the discovery type
:param state: Optional state parameter (generated if not provided)
:param resource: Optional resource parameter
:param kwargs: Additional parameters for the authorization URL
:return: Tuple of (authorization_url, state, code_verifier)
"""
if not state:
state = generate_state()

code_verifier, code_challenge = generate_pkce_pair()

auth_params = {
"client_id": self.config.client_id,
"redirect_uri": self.config.redirect_uri,
"scope": " ".join(scopes),
"state": state,
"response_type": "code",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"fidp": "OrganizationSSO",
Comment thread
HasiniSama marked this conversation as resolved.
Outdated
}

# Switch case to handle each discovery type
if org_discovery_type == "orgID":
auth_params["orgId"] = value
elif org_discovery_type == "orgHandle":
auth_params["orgHandle"] = value
elif org_discovery_type == "org":
auth_params["org"] = value
elif org_discovery_type == "emailDomain":
auth_params["login_hint"] = value
auth_params["orgDiscoveryType"] = "emailDomain"
else:
raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}")

if resource:
auth_params["resource"] = resource

if self.agent_config:
auth_params["requested_actor"] = self.agent_config.agent_id

auth_params.update(kwargs)

auth_url = build_authorization_url(
f"{self.config.base_url}/oauth2/authorize",
auth_params
)
return auth_url, state, code_verifier

async def get_obo_token(
self,
auth_code: str,
Expand Down Expand Up @@ -388,6 +508,41 @@ async def get_obo_token_with_ciba(
logger.error(f"CIBA OBO token exchange failed: {e}")
raise TokenError(f"CIBA OBO token exchange failed: {e}")

async def switch_token_to_organization(
self,
token: str,
switching_organization: str,
scopes: Optional[List[str]] = None
) -> OAuthToken:
"""Switch token to a sub-organization.

:param token: The current access token to be switched.
:param switching_organization: The ID or UUID of the target organization.
:param scopes: Optional list of scopes to request.
:return: OAuth token for the switched organization.
"""
if not token:
raise ValidationError("Token is required for organization switch.")
if not switching_organization:
raise ValidationError("switching_organization is required.")

scope_str = ' '.join(scopes) if scopes else "add"

try:
switched_token = await self.token_client.get_token(
'organization_switch',
token=token,
switching_organization=switching_organization,
scope=scope_str
)
return switched_token

except (TokenError, ValidationError):
raise
except Exception as e:
logger.error(f"Organization switch failed: {e}")
raise TokenError(f"Organization switch failed: {e}")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

async def revoke_token(
self,
token: str,
Expand Down
12 changes: 12 additions & 0 deletions packages/asgardeo/src/asgardeo/auth/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken:
scope = kwargs.get("scope")
if scope:
data["scope"] = scope
elif grant_type == "organization_switch":
token = kwargs.get("token")
switching_organization = kwargs.get("switching_organization")
if not token or not switching_organization:
raise ValidationError(
"token and switching_organization are required for 'organization_switch' grant type.",
)
data["token"] = token
data["switching_organization"] = switching_organization
scope = kwargs.get("scope")
if scope:
data["scope"] = scope
else:
raise ValidationError(f"Unsupported grant type: {grant_type}")

Expand Down
Loading