Files
nextcloud-mcp-server/tests/unit/test_unified_verifier.py
T
Chris Coutinho 9fab6cb550 feat: Implement ADR-005 unified token verifier to eliminate token passthrough vulnerability
Replace two non-compliant token verifiers (NextcloudTokenVerifier and
ProgressiveConsentTokenVerifier) with a single UnifiedTokenVerifier that properly
validates token audiences per MCP Security Best Practices specification.

The previous implementation had a critical security vulnerability where tokens
intended for the MCP server were passed directly to Nextcloud APIs without
proper audience validation (token passthrough anti-pattern). This violates
OAuth 2.0 security principles and the MCP specification.

Changes:
- Add UnifiedTokenVerifier supporting two compliant modes:
  * Multi-audience mode (default): Validates tokens contain BOTH MCP and
    Nextcloud audiences, enabling direct use without exchange
  * Token exchange mode (opt-in): Validates MCP audience only, exchanges
    for Nextcloud tokens via RFC 8693 with caching to minimize latency

- Remove token passthrough vulnerability from context.py and context_helper.py
- Implement token exchange caching (5-minute TTL default) to reduce network calls
- Add required environment variables for audience validation:
  * NEXTCLOUD_MCP_SERVER_URL - MCP server URL (used as audience)
  * NEXTCLOUD_RESOURCE_URI - Nextcloud resource identifier
  * TOKEN_EXCHANGE_CACHE_TTL - Cache TTL for exchanged tokens

- Update docker-compose.yml with resource URI configuration for both OAuth modes
- Add comprehensive test suite (29 tests) covering both authentication modes
- Remove legacy NextcloudTokenVerifier and ProgressiveConsentTokenVerifier

Security improvements:
- Eliminates token passthrough anti-pattern
- Enforces proper audience separation between MCP and Nextcloud
- Complies with MCP Security Best Practices and RFC 8707/8693
- Maintains performance with token exchange caching

Test results: 65/65 unit tests passed, 5/5 smoke tests passed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 18:53:14 +01:00

519 lines
18 KiB
Python

"""
Unit tests for UnifiedTokenVerifier (ADR-005).
Tests token audience validation for both multi-audience and token exchange modes
without requiring real network calls or IdP connections.
"""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import jwt
import pytest
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.config import Settings
pytestmark = pytest.mark.unit
@pytest.fixture
def base_settings():
"""Create base settings for testing."""
return Settings(
oidc_client_id="test-client-id",
oidc_client_secret="test-client-secret",
oidc_issuer="https://idp.example.com",
nextcloud_host="https://nextcloud.example.com",
nextcloud_mcp_server_url="http://localhost:8000",
nextcloud_resource_uri="http://localhost:8080",
jwks_uri="https://idp.example.com/jwks",
introspection_uri="https://idp.example.com/introspect",
enable_token_exchange=False, # Multi-audience mode
token_exchange_cache_ttl=300,
)
@pytest.fixture
def exchange_settings(base_settings):
"""Create settings for token exchange mode."""
base_settings.enable_token_exchange = True
return base_settings
class TestUnifiedTokenVerifierInit:
"""Test UnifiedTokenVerifier initialization."""
def test_init_multi_audience_mode(self, base_settings):
"""Test verifier initialization in multi-audience mode."""
verifier = UnifiedTokenVerifier(base_settings)
assert verifier.mode == "multi-audience"
assert verifier.settings == base_settings
def test_init_exchange_mode(self, exchange_settings):
"""Test verifier initialization in token exchange mode."""
verifier = UnifiedTokenVerifier(exchange_settings)
assert verifier.mode == "exchange"
assert verifier.settings == exchange_settings
class TestAudienceValidation:
"""Test audience validation logic."""
def test_validate_multi_audience_both_present(self, base_settings):
"""Test multi-audience validation with both audiences present."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._validate_multi_audience(payload) is True
def test_validate_multi_audience_server_url_and_resource(self, base_settings):
"""Test multi-audience validation with server URL instead of client ID."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["http://localhost:8000", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._validate_multi_audience(payload) is True
def test_validate_multi_audience_missing_mcp(self, base_settings):
"""Test multi-audience validation fails without MCP audience."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["http://localhost:8080"], # Only Nextcloud
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._validate_multi_audience(payload) is False
def test_validate_multi_audience_missing_nextcloud(self, base_settings):
"""Test multi-audience validation fails without Nextcloud audience."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id"], # Only MCP
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._validate_multi_audience(payload) is False
def test_validate_multi_audience_string_audience(self, base_settings):
"""Test multi-audience validation with string audience (should still work)."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": "test-client-id", # Single audience as string
"sub": "testuser",
"exp": int(time.time() + 3600),
}
# Should fail - needs both audiences
assert verifier._validate_multi_audience(payload) is False
def test_has_mcp_audience_with_client_id(self, exchange_settings):
"""Test MCP audience validation with client ID."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["test-client-id"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_has_mcp_audience_with_server_url(self, exchange_settings):
"""Test MCP audience validation with server URL."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["http://localhost:8000"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_has_mcp_audience_missing(self, exchange_settings):
"""Test MCP audience validation fails without MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["http://localhost:8080"], # Wrong audience
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is False
class TestTokenFormatDetection:
"""Test JWT format detection."""
def test_is_jwt_format_valid(self, base_settings):
"""Test JWT format detection with valid JWT."""
verifier = UnifiedTokenVerifier(base_settings)
jwt_token = "eyJhbGc.eyJzdWI.signature"
assert verifier._is_jwt_format(jwt_token) is True
def test_is_jwt_format_opaque(self, base_settings):
"""Test JWT format detection with opaque token."""
verifier = UnifiedTokenVerifier(base_settings)
opaque_token = "opaque-token-12345"
assert verifier._is_jwt_format(opaque_token) is False
class TestTokenCaching:
"""Test token caching functionality."""
async def test_cache_stores_and_retrieves(self, base_settings):
"""Test token caching stores and retrieves tokens."""
verifier = UnifiedTokenVerifier(base_settings)
# Create a valid access token
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
# Create AccessToken and cache it
access_token = verifier._create_access_token(test_token, payload)
assert access_token is not None
# Should retrieve from cache
cached = verifier._get_cached_token(test_token)
assert cached is not None
assert cached.resource == "testuser"
assert cached.scopes == ["openid", "profile"]
async def test_cache_respects_expiry(self, base_settings):
"""Test that expired tokens are not returned from cache."""
verifier = UnifiedTokenVerifier(base_settings)
# Create expired token payload
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() - 100), # Expired 100 seconds ago
"client_id": "test-client-id",
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
# Create and cache
access_token = verifier._create_access_token(test_token, payload)
assert access_token is not None
# Should not retrieve expired token
cached = verifier._get_cached_token(test_token)
assert cached is None
async def test_cache_clear(self, base_settings):
"""Test cache clearing."""
verifier = UnifiedTokenVerifier(base_settings)
# Create and cache token
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
verifier._create_access_token(test_token, payload)
# Clear cache
verifier.clear_cache()
# Should not retrieve after clear
cached = verifier._get_cached_token(test_token)
assert cached is None
class TestMultiAudienceVerification:
"""Test multi-audience token verification."""
async def test_verify_multi_audience_with_introspection(self, base_settings):
"""Test multi-audience verification using introspection."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock introspection response
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_multi_audience_token(opaque_token)
assert result is not None
assert result.resource == "testuser"
assert result.scopes == ["openid", "profile"]
async def test_verify_multi_audience_fails_without_both_audiences(
self, base_settings
):
"""Test multi-audience verification fails without both audiences."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock introspection response with only one audience
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id"], # Missing Nextcloud audience
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_multi_audience_token(opaque_token)
assert result is None
class TestExchangeModeVerification:
"""Test token exchange mode verification."""
async def test_verify_mcp_audience_only_success(self, exchange_settings):
"""Test MCP-only audience verification succeeds with MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
# Mock introspection response with MCP audience only
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience_only(opaque_token)
assert result is not None
assert result.resource == "testuser"
async def test_verify_mcp_audience_only_fails_without_mcp(self, exchange_settings):
"""Test MCP-only audience verification fails without MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
# Mock introspection response without MCP audience
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["http://localhost:8080"], # Wrong audience
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience_only(opaque_token)
assert result is None
class TestIntrospection:
"""Test token introspection."""
async def test_introspect_active_token(self, base_settings):
"""Test introspection of active token."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock HTTP response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
verifier.http_client.post = AsyncMock(return_value=mock_response)
result = await verifier._introspect_token("test-token")
assert result is not None
assert result["active"] is True
assert result["sub"] == "testuser"
async def test_introspect_inactive_token(self, base_settings):
"""Test introspection of inactive token."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock HTTP response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"active": False}
verifier.http_client.post = AsyncMock(return_value=mock_response)
result = await verifier._introspect_token("test-token")
assert result is None
async def test_introspect_without_endpoint(self, base_settings):
"""Test introspection when endpoint not configured."""
base_settings.introspection_uri = None
verifier = UnifiedTokenVerifier(base_settings)
result = await verifier._introspect_token("test-token")
assert result is None
class TestAccessTokenCreation:
"""Test AccessToken object creation."""
def test_create_access_token_success(self, base_settings):
"""Test successful AccessToken creation."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"sub": "testuser",
"scope": "openid profile email",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
assert result.token == token
assert result.resource == "testuser"
assert result.scopes == ["openid", "profile", "email"]
assert result.client_id == "test-client-id"
def test_create_access_token_with_preferred_username(self, base_settings):
"""Test AccessToken creation with preferred_username fallback."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"preferred_username": "testuser", # No 'sub' claim
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
assert result.resource == "testuser"
def test_create_access_token_no_username(self, base_settings):
"""Test AccessToken creation fails without username."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
# No sub or preferred_username
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is None
def test_create_access_token_no_expiry(self, base_settings):
"""Test AccessToken creation uses default TTL without expiry."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"sub": "testuser",
"scope": "openid profile",
# No exp claim
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
# Should have set a default expiry
assert result.expires_at > int(time.time())
class TestVerifyTokenFlow:
"""Test complete verify_token flow."""
async def test_verify_token_from_cache(self, base_settings):
"""Test verify_token returns cached token."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = jwt.encode(payload, "secret", algorithm="HS256")
# First call - should cache
result1 = verifier._create_access_token(token, payload)
assert result1 is not None
# Mock _verify_multi_audience_token to ensure it's not called
with patch.object(verifier, "_verify_multi_audience_token") as mock_verify:
result2 = await verifier.verify_token(token)
assert result2 is not None
assert result2.resource == "testuser"
# Should not call verification since it's cached
mock_verify.assert_not_called()
async def test_verify_token_multi_audience_mode(self, base_settings):
"""Test verify_token in multi-audience mode."""
verifier = UnifiedTokenVerifier(base_settings)
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
result = await verifier.verify_token("opaque-token")
assert result is not None
assert result.resource == "testuser"
async def test_verify_token_exchange_mode(self, exchange_settings):
"""Test verify_token in exchange mode."""
verifier = UnifiedTokenVerifier(exchange_settings)
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id"], # MCP audience only
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
result = await verifier.verify_token("opaque-token")
assert result is not None
assert result.resource == "testuser"