feat(auth): implement refresh token rotation for Nextcloud OIDC

Add support for one-time use refresh tokens with automatic rotation
to align with Nextcloud OIDC security model.

Changes:
- TokenBrokerService improvements:
  - Add user_id parameter to refresh methods
  - Detect and store rotated refresh tokens
  - Add offline_access scope to token requests
  - Handle refresh token rotation on every use
- Add management API endpoints:
  - /api/v1/webhooks (GET/POST) - List/create webhooks
  - /api/v1/webhooks/{id} (DELETE) - Delete webhook
  - /api/v1/search (POST) - Unified search
  - /api/v1/chunk-context (GET) - Get chunk context
  - /api/v1/apps (GET) - List installed apps
- Update tests for refresh token rotation
- Add --headed flag to pytest for Playwright debugging

Benefits:
- Aligns with Nextcloud OIDC one-time refresh token model
- Prevents refresh token invalidation after first use
- Enables long-lived background operations
- Provides full webhook lifecycle management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-15 21:48:49 +01:00
parent 656214b162
commit 5eec34c17e
4 changed files with 225 additions and 45 deletions
+21 -1
View File
@@ -1707,10 +1707,16 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Add management API endpoints for Nextcloud PHP app (OAuth mode only)
if oauth_enabled:
from nextcloud_mcp_server.api.management import (
create_webhook,
delete_webhook,
get_chunk_context,
get_installed_apps,
get_server_status,
get_user_session,
get_vector_sync_status,
list_webhooks,
revoke_user_access,
unified_search,
vector_search,
)
@@ -1739,9 +1745,23 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
routes.append(
Route("/api/v1/vector-viz/search", vector_search, methods=["POST"])
)
routes.append(
Route("/api/v1/chunk-context", get_chunk_context, methods=["GET"])
)
# ADR-018: Unified search endpoint for Nextcloud PHP app integration
routes.append(Route("/api/v1/search", unified_search, methods=["POST"]))
routes.append(Route("/api/v1/apps", get_installed_apps, methods=["GET"]))
# Webhook management endpoints
routes.append(Route("/api/v1/webhooks", list_webhooks, methods=["GET"]))
routes.append(Route("/api/v1/webhooks", create_webhook, methods=["POST"]))
routes.append(
Route("/api/v1/webhooks/{webhook_id}", delete_webhook, methods=["DELETE"])
)
logger.info(
"Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, "
"/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, /api/v1/vector-viz/search"
"/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, "
"/api/v1/vector-viz/search, /api/v1/search, /api/v1/apps, "
"/api/v1/webhooks"
)
# ADR-016: Add Smithery well-known config endpoint for container runtime discovery
+51 -8
View File
@@ -314,8 +314,9 @@ class TokenBrokerService:
refresh_token = refresh_data["refresh_token"]
# Get token with specific scopes for background operation
# Pass user_id to enable refresh token rotation storage
access_token, expires_in = await self._refresh_access_token_with_scopes(
refresh_token, required_scopes
refresh_token, required_scopes, user_id=user_id
)
# Cache the background token
@@ -335,7 +336,9 @@ class TokenBrokerService:
await self.cache.invalidate(cache_key)
return None
async def _refresh_access_token(self, refresh_token: str) -> Tuple[str, int]:
async def _refresh_access_token(
self, refresh_token: str, user_id: str | None = None
) -> Tuple[str, int]:
"""
Exchange refresh token for new access token.
@@ -343,6 +346,7 @@ class TokenBrokerService:
Args:
refresh_token: The refresh token
user_id: If provided, store the rotated refresh token for this user
Returns:
Tuple of (access_token, expires_in_seconds)
@@ -357,7 +361,7 @@ class TokenBrokerService:
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile email notes:read notes:write calendar:read calendar:write",
"scope": "openid profile email offline_access notes:read notes:write calendar:read calendar:write",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
@@ -378,22 +382,41 @@ class TokenBrokerService:
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600) # Default 1 hour
# Handle refresh token rotation (Nextcloud OIDC rotates on every use)
new_refresh_token = token_data.get("refresh_token")
if user_id and new_refresh_token and new_refresh_token != refresh_token:
# Calculate expiry as Unix timestamp (90 days from now)
expires_at = int(
(datetime.now(timezone.utc) + timedelta(days=90)).timestamp()
)
await self.storage.store_refresh_token(
user_id=user_id,
refresh_token=new_refresh_token,
expires_at=expires_at,
)
logger.info(f"Stored rotated refresh token for user {user_id}")
# Note: Nextcloud validates token audience on API calls - no need to pre-validate here
logger.info(f"Refreshed access token (expires in {expires_in}s)")
return access_token, expires_in
async def _refresh_access_token_with_scopes(
self, refresh_token: str, required_scopes: list[str]
self, refresh_token: str, required_scopes: list[str], user_id: str | None = None
) -> Tuple[str, int]:
"""
Exchange refresh token for new access token with specific scopes.
This method implements scope downscoping for least privilege.
IMPORTANT: Nextcloud OIDC rotates refresh tokens on every use (one-time use).
When user_id is provided, this method stores the new refresh token returned
by Nextcloud to ensure subsequent refresh operations succeed.
Args:
refresh_token: The refresh token
required_scopes: Minimal scopes needed for this operation
user_id: If provided, store the rotated refresh token for this user
Returns:
Tuple of (access_token, expires_in_seconds)
@@ -403,8 +426,10 @@ class TokenBrokerService:
client = await self._get_http_client()
# Always include basic OpenID scopes
scopes = list(set(["openid", "profile", "email"] + required_scopes))
# Always include basic OpenID scopes + offline_access to get new refresh token
scopes = list(
set(["openid", "profile", "email", "offline_access"] + required_scopes)
)
# Request new access token with specific scopes
# Include client credentials as required by most OAuth servers
@@ -437,6 +462,21 @@ class TokenBrokerService:
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600) # Default 1 hour
# Handle refresh token rotation (Nextcloud OIDC rotates on every use)
new_refresh_token = token_data.get("refresh_token")
if user_id and new_refresh_token and new_refresh_token != refresh_token:
# Store the new refresh token for future use
# Calculate expiry as Unix timestamp (90 days from now)
expires_at = int(
(datetime.now(timezone.utc) + timedelta(days=90)).timestamp()
)
await self.storage.store_refresh_token(
user_id=user_id,
refresh_token=new_refresh_token,
expires_at=expires_at,
)
logger.info(f"Stored rotated refresh token for user {user_id}")
# Note: Nextcloud validates token audience on API calls - no need to pre-validate here
logger.info(
@@ -523,11 +563,14 @@ class TokenBrokerService:
if new_refresh_token and new_refresh_token != current_refresh_token:
# storage.store_refresh_token() handles encryption internally
# Convert datetime to Unix timestamp (int) for database storage
expires_at = int(
(datetime.now(timezone.utc) + timedelta(days=90)).timestamp()
)
await self.storage.store_refresh_token(
user_id=user_id,
refresh_token=new_refresh_token,
expires_at=datetime.now(timezone.utc)
+ timedelta(days=90), # 90-day expiry
expires_at=expires_at,
)
logger.info(f"Rotated master refresh token for user {user_id}")
+1 -1
View File
@@ -63,7 +63,7 @@ Changelog = "https://github.com/cbcoutinho/nextcloud-mcp-server/blob/master/CHAN
[tool.pytest.ini_options]
anyio_mode = "auto"
addopts = "-p no:asyncio -x" # Disable pytest-asyncio plugin, use only anyio
addopts = "-p no:asyncio -x --headed" # Disable pytest-asyncio plugin, use only anyio
log_cli = 1
log_cli_level = "ERROR"
log_level = "ERROR"
+152 -35
View File
@@ -47,13 +47,14 @@ def mock_oidc_config():
@pytest.fixture
async def token_broker(mock_storage, encryption_key):
async def token_broker(mock_storage):
"""Create TokenBrokerService instance."""
broker = TokenBrokerService(
storage=mock_storage,
oidc_discovery_url="https://idp.example.com/.well-known/openid-configuration",
nextcloud_host="https://nextcloud.example.com",
encryption_key=encryption_key,
client_id="test_client_id",
client_secret="test_client_secret",
cache_ttl=300,
)
yield broker
@@ -143,14 +144,12 @@ class TestTokenBrokerService:
token_broker.storage.get_refresh_token.assert_not_called()
async def test_get_nextcloud_token_refresh(
self, token_broker, mock_storage, encryption_key, mock_oidc_config
self, token_broker, mock_storage, mock_oidc_config
):
"""Test getting token via refresh when not cached."""
# Setup encrypted refresh token in storage
fernet = Fernet(encryption_key.encode())
encrypted_token = fernet.encrypt(b"test_refresh_token").decode()
# Storage returns already-decrypted refresh token (encryption handled by storage layer)
mock_storage.get_refresh_token.return_value = {
"refresh_token": encrypted_token,
"refresh_token": "test_refresh_token",
"expires_at": datetime.now(timezone.utc) + timedelta(days=30),
}
@@ -187,14 +186,12 @@ class TestTokenBrokerService:
assert token is None
async def test_refresh_master_token(
self, token_broker, mock_storage, encryption_key, mock_oidc_config
self, token_broker, mock_storage, mock_oidc_config
):
"""Test master refresh token rotation."""
# Setup current refresh token
fernet = Fernet(encryption_key.encode())
encrypted_token = fernet.encrypt(b"current_refresh_token").decode()
# Storage returns already-decrypted refresh token
mock_storage.get_refresh_token.return_value = {
"refresh_token": encrypted_token,
"refresh_token": "current_refresh_token",
"expires_at": datetime.now(timezone.utc) + timedelta(days=30),
}
@@ -217,25 +214,19 @@ class TestTokenBrokerService:
success = await token_broker.refresh_master_token("user1")
assert success is True
# Verify new token was stored
# Verify new token was stored (storage handles encryption)
mock_storage.store_refresh_token.assert_called_once()
call_args = mock_storage.store_refresh_token.call_args[1]
assert call_args["user_id"] == "user1"
# Decrypt to verify it's the new token
stored_token = fernet.decrypt(
call_args["refresh_token"].encode()
).decode()
assert stored_token == "new_refresh_token"
assert call_args["refresh_token"] == "new_refresh_token"
async def test_refresh_master_token_no_rotation(
self, token_broker, mock_storage, encryption_key, mock_oidc_config
self, token_broker, mock_storage, mock_oidc_config
):
"""Test when IdP returns same refresh token (no rotation)."""
# Setup current refresh token
fernet = Fernet(encryption_key.encode())
encrypted_token = fernet.encrypt(b"same_refresh_token").decode()
# Storage returns already-decrypted refresh token
mock_storage.get_refresh_token.return_value = {
"refresh_token": encrypted_token,
"refresh_token": "same_refresh_token",
"expires_at": datetime.now(timezone.utc) + timedelta(days=30),
}
@@ -261,14 +252,12 @@ class TestTokenBrokerService:
mock_storage.store_refresh_token.assert_not_called()
async def test_revoke_nextcloud_access(
self, token_broker, mock_storage, encryption_key, mock_oidc_config
self, token_broker, mock_storage, mock_oidc_config
):
"""Test revoking Nextcloud access."""
# Setup refresh token for revocation
fernet = Fernet(encryption_key.encode())
encrypted_token = fernet.encrypt(b"token_to_revoke").decode()
# Storage returns already-decrypted refresh token
mock_storage.get_refresh_token.return_value = {
"refresh_token": encrypted_token,
"refresh_token": "token_to_revoke",
"expires_at": datetime.now(timezone.utc) + timedelta(days=30),
}
@@ -311,15 +300,11 @@ class TestTokenBrokerService:
with pytest.raises(ValueError, match="doesn't include wrong-audience"):
await token_broker._validate_token_audience(test_token, "wrong-audience")
async def test_token_refresh_with_network_error(
self, token_broker, mock_storage, encryption_key
):
async def test_token_refresh_with_network_error(self, token_broker, mock_storage):
"""Test handling network errors during token refresh."""
# Setup encrypted refresh token
fernet = Fernet(encryption_key.encode())
encrypted_token = fernet.encrypt(b"test_refresh_token").decode()
# Storage returns already-decrypted refresh token
mock_storage.get_refresh_token.return_value = {
"refresh_token": encrypted_token,
"refresh_token": "test_refresh_token",
"expires_at": datetime.now(timezone.utc) + timedelta(days=30),
}
@@ -351,3 +336,135 @@ class TestTokenBrokerService:
)
assert results == ["token1", "token2", "token1", "token2"]
class TestRefreshTokenRotation:
"""Test refresh token rotation handling.
Nextcloud OIDC rotates refresh tokens on every use (one-time use).
These tests verify that the token broker stores rotated tokens.
"""
async def test_refresh_access_token_stores_rotated_token(
self, mock_storage, mock_oidc_config
):
"""Verify that new refresh token is stored after successful refresh."""
broker = TokenBrokerService(
storage=mock_storage,
oidc_discovery_url="http://localhost:8080/.well-known/openid-configuration",
nextcloud_host="http://localhost:8080",
client_id="test_client_id",
client_secret="test_client_secret",
)
# Mock HTTP response with rotated refresh token
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"access_token": "new_access_token_abc",
"refresh_token": "new_refresh_token_456", # Rotated token
"expires_in": 900,
"token_type": "Bearer",
}
mock_http_client = AsyncMock()
mock_http_client.post.return_value = mock_response
with patch.object(broker, "_get_oidc_config", return_value=mock_oidc_config):
with patch.object(
broker, "_get_http_client", return_value=mock_http_client
):
(
access_token,
expires_in,
) = await broker._refresh_access_token_with_scopes(
refresh_token="old_refresh_token_123",
required_scopes=["notes:read"],
user_id="admin",
)
assert access_token == "new_access_token_abc"
assert expires_in == 900
# CRITICAL: Verify the new refresh token was stored
mock_storage.store_refresh_token.assert_called_once()
call_args = mock_storage.store_refresh_token.call_args
assert call_args.kwargs["user_id"] == "admin"
assert call_args.kwargs["refresh_token"] == "new_refresh_token_456"
await broker.close()
async def test_no_storage_when_refresh_token_unchanged(
self, mock_storage, mock_oidc_config
):
"""Verify storage is NOT called when refresh token is unchanged."""
broker = TokenBrokerService(
storage=mock_storage,
oidc_discovery_url="http://localhost:8080/.well-known/openid-configuration",
nextcloud_host="http://localhost:8080",
client_id="test_client_id",
client_secret="test_client_secret",
)
# Response returns SAME refresh token (no rotation)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"access_token": "new_access_token_abc",
"refresh_token": "same_refresh_token", # Same as input
"expires_in": 900,
}
mock_http_client = AsyncMock()
mock_http_client.post.return_value = mock_response
with patch.object(broker, "_get_oidc_config", return_value=mock_oidc_config):
with patch.object(
broker, "_get_http_client", return_value=mock_http_client
):
await broker._refresh_access_token_with_scopes(
refresh_token="same_refresh_token",
required_scopes=["notes:read"],
user_id="admin",
)
# Should NOT store since token didn't change
mock_storage.store_refresh_token.assert_not_called()
await broker.close()
async def test_no_storage_without_user_id(self, mock_storage, mock_oidc_config):
"""Verify storage is NOT called when user_id is None."""
broker = TokenBrokerService(
storage=mock_storage,
oidc_discovery_url="http://localhost:8080/.well-known/openid-configuration",
nextcloud_host="http://localhost:8080",
client_id="test_client_id",
client_secret="test_client_secret",
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"access_token": "new_access_token_abc",
"refresh_token": "new_refresh_token_456",
"expires_in": 900,
}
mock_http_client = AsyncMock()
mock_http_client.post.return_value = mock_response
with patch.object(broker, "_get_oidc_config", return_value=mock_oidc_config):
with patch.object(
broker, "_get_http_client", return_value=mock_http_client
):
await broker._refresh_access_token_with_scopes(
refresh_token="old_token",
required_scopes=["notes:read"],
user_id=None, # No user_id
)
# Should NOT store since no user_id
mock_storage.store_refresh_token.assert_not_called()
await broker.close()