diff --git a/nextcloud_mcp_server/auth/client_registration.py b/nextcloud_mcp_server/auth/client_registration.py index b540827..4c8af78 100644 --- a/nextcloud_mcp_server/auth/client_registration.py +++ b/nextcloud_mcp_server/auth/client_registration.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class ClientInfo: - """Client registration information.""" + """Client registration information with RFC 7592 support.""" def __init__( self, @@ -23,12 +23,16 @@ class ClientInfo: client_id_issued_at: int, client_secret_expires_at: int, redirect_uris: list[str], + registration_access_token: str | None = None, + registration_client_uri: str | None = None, ): self.client_id = client_id self.client_secret = client_secret self.client_id_issued_at = client_id_issued_at self.client_secret_expires_at = client_secret_expires_at self.redirect_uris = redirect_uris + self.registration_access_token = registration_access_token + self.registration_client_uri = registration_client_uri @property def is_expired(self) -> bool: @@ -42,13 +46,18 @@ class ClientInfo: def to_dict(self) -> dict[str, Any]: """Convert to dictionary for storage.""" - return { + result = { "client_id": self.client_id, "client_secret": self.client_secret, "client_id_issued_at": self.client_id_issued_at, "client_secret_expires_at": self.client_secret_expires_at, "redirect_uris": self.redirect_uris, } + if self.registration_access_token: + result["registration_access_token"] = self.registration_access_token + if self.registration_client_uri: + result["registration_client_uri"] = self.registration_client_uri + return result @classmethod def from_dict(cls, data: dict[str, Any]) -> "ClientInfo": @@ -59,6 +68,8 @@ class ClientInfo: client_id_issued_at=data["client_id_issued_at"], client_secret_expires_at=data["client_secret_expires_at"], redirect_uris=data["redirect_uris"], + registration_access_token=data.get("registration_access_token"), + registration_client_uri=data.get("registration_client_uri"), ) @@ -125,6 +136,16 @@ async def register_client( f"(in {client_info.get('client_secret_expires_at', 0) - int(time.time())} seconds)" ) + # Log if RFC 7592 fields are present + has_reg_token = "registration_access_token" in client_info + has_reg_uri = "registration_client_uri" in client_info + if has_reg_token and has_reg_uri: + logger.info( + "RFC 7592 management fields received - client deletion will be supported" + ) + else: + logger.warning("RFC 7592 fields missing - client deletion may not work") + return ClientInfo( client_id=client_info["client_id"], client_secret=client_info["client_secret"], @@ -135,6 +156,8 @@ async def register_client( "client_secret_expires_at", int(time.time()) + 3600 ), redirect_uris=client_info.get("redirect_uris", redirect_uris), + registration_access_token=client_info.get("registration_access_token"), + registration_client_uri=client_info.get("registration_client_uri"), ) except httpx.HTTPStatusError as e: @@ -215,44 +238,65 @@ def save_client_to_file(client_info: ClientInfo, storage_path: Path): async def delete_client( nextcloud_url: str, client_id: str, - client_secret: str, + registration_access_token: str | None = None, + client_secret: str | None = None, + registration_client_uri: str | None = None, ) -> bool: """ Delete a dynamically registered OAuth client using RFC 7592. This implements RFC 7592 Section 2.3 (Client Delete Request). - The client authenticates using client_secret_post method and - requests deletion via DELETE to the client configuration endpoint. + Prefers Bearer token authentication (RFC 7592 standard) but falls back + to HTTP Basic Auth if registration_access_token is not available. Args: nextcloud_url: Base URL of the Nextcloud instance client_id: Client identifier to delete - client_secret: Client secret for authentication + registration_access_token: RFC 7592 registration access token (preferred) + client_secret: Client secret for fallback HTTP Basic Auth + registration_client_uri: RFC 7592 client configuration URI (optional) Returns: True if deletion successful, False otherwise Note: - Per RFC 7592, the deletion endpoint is: - {nextcloud_url}/apps/oidc/register/{client_id} + RFC 7592 deletion endpoint: {registration_client_uri} or {nextcloud_url}/apps/oidc/register/{client_id} - Authentication uses HTTP Basic Auth or client_secret_post: - - HTTP Basic Auth: client_id as username, client_secret as password - - client_secret_post: credentials in request body + Authentication methods (in order of preference): + 1. Bearer token: Authorization: Bearer {registration_access_token} (RFC 7592 standard) + 2. HTTP Basic Auth: client_id as username, client_secret as password (fallback) """ - deletion_endpoint = f"{nextcloud_url}/apps/oidc/register/{client_id}" + # Determine deletion endpoint + if registration_client_uri: + deletion_endpoint = registration_client_uri + else: + deletion_endpoint = f"{nextcloud_url}/apps/oidc/register/{client_id}" logger.info(f"Deleting OAuth client: {client_id[:16]}...") logger.debug(f"Deletion endpoint: {deletion_endpoint}") async with httpx.AsyncClient(timeout=30.0) as http_client: try: - # RFC 7592 requires client authentication - # Use HTTP Basic Auth (client_id as username, client_secret as password) - response = await http_client.delete( - deletion_endpoint, - auth=(client_id, client_secret), - ) + # Prefer RFC 7592 Bearer token authentication + if registration_access_token: + logger.debug("Using RFC 7592 Bearer token authentication") + response = await http_client.delete( + deletion_endpoint, + headers={"Authorization": f"Bearer {registration_access_token}"}, + ) + elif client_secret: + logger.debug( + "Falling back to HTTP Basic Auth (registration_access_token not available)" + ) + response = await http_client.delete( + deletion_endpoint, + auth=(client_id, client_secret), + ) + else: + logger.error( + "Cannot delete client: no registration_access_token or client_secret provided" + ) + return False # RFC 7592: Successful deletion returns 204 No Content if response.status_code == 204: diff --git a/tests/conftest.py b/tests/conftest.py index a245907..63ca105 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -986,21 +986,21 @@ async def shared_oauth_client_credentials(anyio_backend, oauth_callback_server): # Create opaque token client with allowed_scopes (not JWT) # This ensures the token has proper scopes even though they're not embedded - client_id, client_secret = await _create_oauth_client_with_scopes( + client_info = await _create_oauth_client_with_scopes( callback_url=callback_url, client_name="Pytest - Shared Test Client (Opaque)", allowed_scopes=DEFAULT_FULL_SCOPES, token_type="Bearer", # Opaque tokens for port 8001 ) - logger.info(f"Shared OAuth client ready: {client_id[:16]}...") + logger.info(f"Shared OAuth client ready: {client_info.client_id[:16]}...") logger.info( "This opaque token client with full scopes will be reused for all test user authentications" ) yield ( - client_id, - client_secret, + client_info.client_id, + client_info.client_secret, callback_url, token_endpoint, authorization_endpoint, @@ -1008,23 +1008,27 @@ async def shared_oauth_client_credentials(anyio_backend, oauth_callback_server): # Cleanup: Delete OAuth client from Nextcloud using RFC 7592 try: - logger.info(f"Cleaning up shared OAuth client: {client_id[:16]}...") + logger.info( + f"Cleaning up shared OAuth client: {client_info.client_id[:16]}..." + ) success = await delete_client( nextcloud_url=nextcloud_host, - client_id=client_id, - client_secret=client_secret, + client_id=client_info.client_id, + registration_access_token=client_info.registration_access_token, + client_secret=client_info.client_secret, + registration_client_uri=client_info.registration_client_uri, ) if success: logger.info( - f"Successfully deleted shared OAuth client: {client_id[:16]}..." + f"Successfully deleted shared OAuth client: {client_info.client_id[:16]}..." ) else: logger.warning( - f"Failed to delete shared OAuth client: {client_id[:16]}..." + f"Failed to delete shared OAuth client: {client_info.client_id[:16]}..." ) except Exception as e: logger.warning( - f"Error cleaning up shared OAuth client {client_id[:16]}...: {e}" + f"Error cleaning up shared OAuth client {client_info.client_id[:16]}...: {e}" ) @@ -1070,21 +1074,21 @@ async def shared_jwt_oauth_client_credentials(anyio_backend, oauth_callback_serv ) # Create JWT client with full scopes (all app read/write scopes) - client_id, client_secret = await _create_oauth_client_with_scopes( + client_info = await _create_oauth_client_with_scopes( callback_url=callback_url, client_name="Pytest - Shared JWT Test Client", allowed_scopes=DEFAULT_FULL_SCOPES, token_type="JWT", # Explicitly set JWT token type ) - logger.info(f"Shared JWT OAuth client ready: {client_id[:16]}...") + logger.info(f"Shared JWT OAuth client ready: {client_info.client_id[:16]}...") logger.info( "This JWT client with full scopes will be reused for JWT MCP server tests" ) yield ( - client_id, - client_secret, + client_info.client_id, + client_info.client_secret, callback_url, token_endpoint, authorization_endpoint, @@ -1092,23 +1096,27 @@ async def shared_jwt_oauth_client_credentials(anyio_backend, oauth_callback_serv # Cleanup: Delete OAuth client from Nextcloud using RFC 7592 try: - logger.info(f"Cleaning up shared JWT OAuth client: {client_id[:16]}...") + logger.info( + f"Cleaning up shared JWT OAuth client: {client_info.client_id[:16]}..." + ) success = await delete_client( nextcloud_url=nextcloud_host, - client_id=client_id, - client_secret=client_secret, + client_id=client_info.client_id, + registration_access_token=client_info.registration_access_token, + client_secret=client_info.client_secret, + registration_client_uri=client_info.registration_client_uri, ) if success: logger.info( - f"Successfully deleted shared JWT OAuth client: {client_id[:16]}..." + f"Successfully deleted shared JWT OAuth client: {client_info.client_id[:16]}..." ) else: logger.warning( - f"Failed to delete shared JWT OAuth client: {client_id[:16]}..." + f"Failed to delete shared JWT OAuth client: {client_info.client_id[:16]}..." ) except Exception as e: logger.warning( - f"Error cleaning up shared JWT OAuth client {client_id[:16]}...: {e}" + f"Error cleaning up shared JWT OAuth client {client_info.client_id[:16]}...: {e}" ) @@ -1117,7 +1125,7 @@ async def _create_oauth_client_with_scopes( client_name: str, allowed_scopes: str, token_type: str = "JWT", -) -> tuple[str, str]: +): """ Helper function to create an OAuth client with specific allowed_scopes using DCR. @@ -1128,7 +1136,7 @@ async def _create_oauth_client_with_scopes( token_type: Either "JWT" or "Bearer" (default: "JWT") Returns: - Tuple of (client_id, client_secret) + ClientInfo object with full registration details including registration_access_token """ from nextcloud_mcp_server.auth.client_registration import register_client @@ -1162,14 +1170,17 @@ async def _create_oauth_client_with_scopes( token_type=token_type, ) - client_id = client_info.client_id - client_secret = client_info.client_secret - logger.info( - f"Created OAuth client via DCR: {client_id[:16]}... with scopes: {allowed_scopes}" + f"Created OAuth client via DCR: {client_info.client_id[:16]}... with scopes: {allowed_scopes}" ) + if client_info.registration_access_token: + logger.info( + "RFC 7592 registration_access_token received - client can be deleted" + ) + else: + logger.warning("No registration_access_token - client deletion may fail") - return client_id, client_secret + return client_info @pytest.fixture(scope="session")