feat(auth): Add support for client registration deletion

This commit is contained in:
Chris Coutinho
2025-10-24 18:54:24 +02:00
parent 72fce189d2
commit 8e0a4d8ce5
2 changed files with 100 additions and 45 deletions
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class ClientInfo:
"""Client registration information."""
"""Client registration information with RFC 7592 support."""
def __init__(
self,
@@ -23,12 +23,16 @@ class ClientInfo:
client_id_issued_at: int,
client_secret_expires_at: int,
redirect_uris: list[str],
registration_access_token: str | None = None,
registration_client_uri: str | None = None,
):
self.client_id = client_id
self.client_secret = client_secret
self.client_id_issued_at = client_id_issued_at
self.client_secret_expires_at = client_secret_expires_at
self.redirect_uris = redirect_uris
self.registration_access_token = registration_access_token
self.registration_client_uri = registration_client_uri
@property
def is_expired(self) -> bool:
@@ -42,13 +46,18 @@ class ClientInfo:
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for storage."""
return {
result = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"client_id_issued_at": self.client_id_issued_at,
"client_secret_expires_at": self.client_secret_expires_at,
"redirect_uris": self.redirect_uris,
}
if self.registration_access_token:
result["registration_access_token"] = self.registration_access_token
if self.registration_client_uri:
result["registration_client_uri"] = self.registration_client_uri
return result
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ClientInfo":
@@ -59,6 +68,8 @@ class ClientInfo:
client_id_issued_at=data["client_id_issued_at"],
client_secret_expires_at=data["client_secret_expires_at"],
redirect_uris=data["redirect_uris"],
registration_access_token=data.get("registration_access_token"),
registration_client_uri=data.get("registration_client_uri"),
)
@@ -125,6 +136,16 @@ async def register_client(
f"(in {client_info.get('client_secret_expires_at', 0) - int(time.time())} seconds)"
)
# Log if RFC 7592 fields are present
has_reg_token = "registration_access_token" in client_info
has_reg_uri = "registration_client_uri" in client_info
if has_reg_token and has_reg_uri:
logger.info(
"RFC 7592 management fields received - client deletion will be supported"
)
else:
logger.warning("RFC 7592 fields missing - client deletion may not work")
return ClientInfo(
client_id=client_info["client_id"],
client_secret=client_info["client_secret"],
@@ -135,6 +156,8 @@ async def register_client(
"client_secret_expires_at", int(time.time()) + 3600
),
redirect_uris=client_info.get("redirect_uris", redirect_uris),
registration_access_token=client_info.get("registration_access_token"),
registration_client_uri=client_info.get("registration_client_uri"),
)
except httpx.HTTPStatusError as e:
@@ -215,44 +238,65 @@ def save_client_to_file(client_info: ClientInfo, storage_path: Path):
async def delete_client(
nextcloud_url: str,
client_id: str,
client_secret: str,
registration_access_token: str | None = None,
client_secret: str | None = None,
registration_client_uri: str | None = None,
) -> bool:
"""
Delete a dynamically registered OAuth client using RFC 7592.
This implements RFC 7592 Section 2.3 (Client Delete Request).
The client authenticates using client_secret_post method and
requests deletion via DELETE to the client configuration endpoint.
Prefers Bearer token authentication (RFC 7592 standard) but falls back
to HTTP Basic Auth if registration_access_token is not available.
Args:
nextcloud_url: Base URL of the Nextcloud instance
client_id: Client identifier to delete
client_secret: Client secret for authentication
registration_access_token: RFC 7592 registration access token (preferred)
client_secret: Client secret for fallback HTTP Basic Auth
registration_client_uri: RFC 7592 client configuration URI (optional)
Returns:
True if deletion successful, False otherwise
Note:
Per RFC 7592, the deletion endpoint is:
{nextcloud_url}/apps/oidc/register/{client_id}
RFC 7592 deletion endpoint: {registration_client_uri} or {nextcloud_url}/apps/oidc/register/{client_id}
Authentication uses HTTP Basic Auth or client_secret_post:
- HTTP Basic Auth: client_id as username, client_secret as password
- client_secret_post: credentials in request body
Authentication methods (in order of preference):
1. Bearer token: Authorization: Bearer {registration_access_token} (RFC 7592 standard)
2. HTTP Basic Auth: client_id as username, client_secret as password (fallback)
"""
deletion_endpoint = f"{nextcloud_url}/apps/oidc/register/{client_id}"
# Determine deletion endpoint
if registration_client_uri:
deletion_endpoint = registration_client_uri
else:
deletion_endpoint = f"{nextcloud_url}/apps/oidc/register/{client_id}"
logger.info(f"Deleting OAuth client: {client_id[:16]}...")
logger.debug(f"Deletion endpoint: {deletion_endpoint}")
async with httpx.AsyncClient(timeout=30.0) as http_client:
try:
# RFC 7592 requires client authentication
# Use HTTP Basic Auth (client_id as username, client_secret as password)
response = await http_client.delete(
deletion_endpoint,
auth=(client_id, client_secret),
)
# Prefer RFC 7592 Bearer token authentication
if registration_access_token:
logger.debug("Using RFC 7592 Bearer token authentication")
response = await http_client.delete(
deletion_endpoint,
headers={"Authorization": f"Bearer {registration_access_token}"},
)
elif client_secret:
logger.debug(
"Falling back to HTTP Basic Auth (registration_access_token not available)"
)
response = await http_client.delete(
deletion_endpoint,
auth=(client_id, client_secret),
)
else:
logger.error(
"Cannot delete client: no registration_access_token or client_secret provided"
)
return False
# RFC 7592: Successful deletion returns 204 No Content
if response.status_code == 204:
+38 -27
View File
@@ -986,21 +986,21 @@ async def shared_oauth_client_credentials(anyio_backend, oauth_callback_server):
# Create opaque token client with allowed_scopes (not JWT)
# This ensures the token has proper scopes even though they're not embedded
client_id, client_secret = await _create_oauth_client_with_scopes(
client_info = await _create_oauth_client_with_scopes(
callback_url=callback_url,
client_name="Pytest - Shared Test Client (Opaque)",
allowed_scopes=DEFAULT_FULL_SCOPES,
token_type="Bearer", # Opaque tokens for port 8001
)
logger.info(f"Shared OAuth client ready: {client_id[:16]}...")
logger.info(f"Shared OAuth client ready: {client_info.client_id[:16]}...")
logger.info(
"This opaque token client with full scopes will be reused for all test user authentications"
)
yield (
client_id,
client_secret,
client_info.client_id,
client_info.client_secret,
callback_url,
token_endpoint,
authorization_endpoint,
@@ -1008,23 +1008,27 @@ async def shared_oauth_client_credentials(anyio_backend, oauth_callback_server):
# Cleanup: Delete OAuth client from Nextcloud using RFC 7592
try:
logger.info(f"Cleaning up shared OAuth client: {client_id[:16]}...")
logger.info(
f"Cleaning up shared OAuth client: {client_info.client_id[:16]}..."
)
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_id,
client_secret=client_secret,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
if success:
logger.info(
f"Successfully deleted shared OAuth client: {client_id[:16]}..."
f"Successfully deleted shared OAuth client: {client_info.client_id[:16]}..."
)
else:
logger.warning(
f"Failed to delete shared OAuth client: {client_id[:16]}..."
f"Failed to delete shared OAuth client: {client_info.client_id[:16]}..."
)
except Exception as e:
logger.warning(
f"Error cleaning up shared OAuth client {client_id[:16]}...: {e}"
f"Error cleaning up shared OAuth client {client_info.client_id[:16]}...: {e}"
)
@@ -1070,21 +1074,21 @@ async def shared_jwt_oauth_client_credentials(anyio_backend, oauth_callback_serv
)
# Create JWT client with full scopes (all app read/write scopes)
client_id, client_secret = await _create_oauth_client_with_scopes(
client_info = await _create_oauth_client_with_scopes(
callback_url=callback_url,
client_name="Pytest - Shared JWT Test Client",
allowed_scopes=DEFAULT_FULL_SCOPES,
token_type="JWT", # Explicitly set JWT token type
)
logger.info(f"Shared JWT OAuth client ready: {client_id[:16]}...")
logger.info(f"Shared JWT OAuth client ready: {client_info.client_id[:16]}...")
logger.info(
"This JWT client with full scopes will be reused for JWT MCP server tests"
)
yield (
client_id,
client_secret,
client_info.client_id,
client_info.client_secret,
callback_url,
token_endpoint,
authorization_endpoint,
@@ -1092,23 +1096,27 @@ async def shared_jwt_oauth_client_credentials(anyio_backend, oauth_callback_serv
# Cleanup: Delete OAuth client from Nextcloud using RFC 7592
try:
logger.info(f"Cleaning up shared JWT OAuth client: {client_id[:16]}...")
logger.info(
f"Cleaning up shared JWT OAuth client: {client_info.client_id[:16]}..."
)
success = await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_id,
client_secret=client_secret,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
if success:
logger.info(
f"Successfully deleted shared JWT OAuth client: {client_id[:16]}..."
f"Successfully deleted shared JWT OAuth client: {client_info.client_id[:16]}..."
)
else:
logger.warning(
f"Failed to delete shared JWT OAuth client: {client_id[:16]}..."
f"Failed to delete shared JWT OAuth client: {client_info.client_id[:16]}..."
)
except Exception as e:
logger.warning(
f"Error cleaning up shared JWT OAuth client {client_id[:16]}...: {e}"
f"Error cleaning up shared JWT OAuth client {client_info.client_id[:16]}...: {e}"
)
@@ -1117,7 +1125,7 @@ async def _create_oauth_client_with_scopes(
client_name: str,
allowed_scopes: str,
token_type: str = "JWT",
) -> tuple[str, str]:
):
"""
Helper function to create an OAuth client with specific allowed_scopes using DCR.
@@ -1128,7 +1136,7 @@ async def _create_oauth_client_with_scopes(
token_type: Either "JWT" or "Bearer" (default: "JWT")
Returns:
Tuple of (client_id, client_secret)
ClientInfo object with full registration details including registration_access_token
"""
from nextcloud_mcp_server.auth.client_registration import register_client
@@ -1162,14 +1170,17 @@ async def _create_oauth_client_with_scopes(
token_type=token_type,
)
client_id = client_info.client_id
client_secret = client_info.client_secret
logger.info(
f"Created OAuth client via DCR: {client_id[:16]}... with scopes: {allowed_scopes}"
f"Created OAuth client via DCR: {client_info.client_id[:16]}... with scopes: {allowed_scopes}"
)
if client_info.registration_access_token:
logger.info(
"RFC 7592 registration_access_token received - client can be deleted"
)
else:
logger.warning("No registration_access_token - client deletion may fail")
return client_id, client_secret
return client_info
@pytest.fixture(scope="session")