bdb1ba2051
Since both multi-audience and exchange modes now validate the same thing (MCP audience only per RFC 7519), consolidated the duplicate methods: - Removed duplicate verification methods (_verify_multi_audience_token and _verify_mcp_audience_only) - Created single _verify_mcp_audience() method for all validation - Removed duplicate helper (_validate_multi_audience), kept _has_mcp_audience - Mode only affects logging and what happens AFTER verification The mode distinction is now purely about post-verification behavior: - Multi-audience mode: Use token directly (Nextcloud validates its own) - Exchange mode: Exchange for Nextcloud-audience token via RFC 8693 This makes the code cleaner and clearer about what's actually happening - both modes do identical validation, they just differ in how the validated token is used. All tests pass: unit (65), OAuth integration confirmed working. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
524 lines
19 KiB
Python
524 lines
19 KiB
Python
"""
|
|
Unit tests for UnifiedTokenVerifier (ADR-005).
|
|
|
|
Tests token audience validation for both multi-audience and token exchange modes
|
|
without requiring real network calls or IdP connections.
|
|
"""
|
|
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import jwt
|
|
import pytest
|
|
|
|
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
|
|
from nextcloud_mcp_server.config import Settings
|
|
|
|
pytestmark = pytest.mark.unit
|
|
|
|
|
|
@pytest.fixture
|
|
def base_settings():
|
|
"""Create base settings for testing."""
|
|
return Settings(
|
|
oidc_client_id="test-client-id",
|
|
oidc_client_secret="test-client-secret",
|
|
oidc_issuer="https://idp.example.com",
|
|
nextcloud_host="https://nextcloud.example.com",
|
|
nextcloud_mcp_server_url="http://localhost:8000",
|
|
nextcloud_resource_uri="http://localhost:8080",
|
|
jwks_uri="https://idp.example.com/jwks",
|
|
introspection_uri="https://idp.example.com/introspect",
|
|
enable_token_exchange=False, # Multi-audience mode
|
|
token_exchange_cache_ttl=300,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def exchange_settings(base_settings):
|
|
"""Create settings for token exchange mode."""
|
|
base_settings.enable_token_exchange = True
|
|
return base_settings
|
|
|
|
|
|
class TestUnifiedTokenVerifierInit:
|
|
"""Test UnifiedTokenVerifier initialization."""
|
|
|
|
def test_init_multi_audience_mode(self, base_settings):
|
|
"""Test verifier initialization in multi-audience mode."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
assert verifier.mode == "multi-audience"
|
|
assert verifier.settings == base_settings
|
|
|
|
def test_init_exchange_mode(self, exchange_settings):
|
|
"""Test verifier initialization in token exchange mode."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
assert verifier.mode == "exchange"
|
|
assert verifier.settings == exchange_settings
|
|
|
|
|
|
class TestAudienceValidation:
|
|
"""Test audience validation logic."""
|
|
|
|
def test_validate_multi_audience_both_present(self, base_settings):
|
|
"""Test MCP audience validation with both audiences present."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
payload = {
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_validate_multi_audience_server_url_and_resource(self, base_settings):
|
|
"""Test MCP audience validation with server URL instead of client ID."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
payload = {
|
|
"aud": ["http://localhost:8000", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_validate_multi_audience_missing_mcp(self, base_settings):
|
|
"""Test MCP audience validation fails without MCP audience."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
payload = {
|
|
"aud": ["http://localhost:8080"], # Only Nextcloud
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is False
|
|
|
|
def test_validate_multi_audience_missing_nextcloud(self, base_settings):
|
|
"""Test MCP audience validation succeeds with only MCP audience (RFC 7519 compliant)."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
payload = {
|
|
"aud": ["test-client-id"], # Only MCP
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
# Per RFC 7519, we only validate MCP audience. Nextcloud validates its own.
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_validate_multi_audience_string_audience(self, base_settings):
|
|
"""Test MCP audience validation with string audience works (RFC 7519 compliant)."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
payload = {
|
|
"aud": "test-client-id", # Single audience as string
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
# Should pass - we only validate MCP audience per RFC 7519
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_has_mcp_audience_with_client_id(self, exchange_settings):
|
|
"""Test MCP audience validation with client ID."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
payload = {
|
|
"aud": ["test-client-id"],
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_has_mcp_audience_with_server_url(self, exchange_settings):
|
|
"""Test MCP audience validation with server URL."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
payload = {
|
|
"aud": ["http://localhost:8000"],
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is True
|
|
|
|
def test_has_mcp_audience_missing(self, exchange_settings):
|
|
"""Test MCP audience validation fails without MCP audience."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
payload = {
|
|
"aud": ["http://localhost:8080"], # Wrong audience
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
assert verifier._has_mcp_audience(payload) is False
|
|
|
|
|
|
class TestTokenFormatDetection:
|
|
"""Test JWT format detection."""
|
|
|
|
def test_is_jwt_format_valid(self, base_settings):
|
|
"""Test JWT format detection with valid JWT."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
jwt_token = "eyJhbGc.eyJzdWI.signature"
|
|
assert verifier._is_jwt_format(jwt_token) is True
|
|
|
|
def test_is_jwt_format_opaque(self, base_settings):
|
|
"""Test JWT format detection with opaque token."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
opaque_token = "opaque-token-12345"
|
|
assert verifier._is_jwt_format(opaque_token) is False
|
|
|
|
|
|
class TestTokenCaching:
|
|
"""Test token caching functionality."""
|
|
|
|
async def test_cache_stores_and_retrieves(self, base_settings):
|
|
"""Test token caching stores and retrieves tokens."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Create a valid access token
|
|
payload = {
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
"client_id": "test-client-id",
|
|
}
|
|
test_token = jwt.encode(payload, "secret", algorithm="HS256")
|
|
|
|
# Create AccessToken and cache it
|
|
access_token = verifier._create_access_token(test_token, payload)
|
|
assert access_token is not None
|
|
|
|
# Should retrieve from cache
|
|
cached = verifier._get_cached_token(test_token)
|
|
assert cached is not None
|
|
assert cached.resource == "testuser"
|
|
assert cached.scopes == ["openid", "profile"]
|
|
|
|
async def test_cache_respects_expiry(self, base_settings):
|
|
"""Test that expired tokens are not returned from cache."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Create expired token payload
|
|
payload = {
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() - 100), # Expired 100 seconds ago
|
|
"client_id": "test-client-id",
|
|
}
|
|
test_token = jwt.encode(payload, "secret", algorithm="HS256")
|
|
|
|
# Create and cache
|
|
access_token = verifier._create_access_token(test_token, payload)
|
|
assert access_token is not None
|
|
|
|
# Should not retrieve expired token
|
|
cached = verifier._get_cached_token(test_token)
|
|
assert cached is None
|
|
|
|
async def test_cache_clear(self, base_settings):
|
|
"""Test cache clearing."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Create and cache token
|
|
payload = {
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
test_token = jwt.encode(payload, "secret", algorithm="HS256")
|
|
verifier._create_access_token(test_token, payload)
|
|
|
|
# Clear cache
|
|
verifier.clear_cache()
|
|
|
|
# Should not retrieve after clear
|
|
cached = verifier._get_cached_token(test_token)
|
|
assert cached is None
|
|
|
|
|
|
class TestMultiAudienceVerification:
|
|
"""Test multi-audience token verification."""
|
|
|
|
async def test_verify_multi_audience_with_introspection(self, base_settings):
|
|
"""Test multi-audience verification using introspection."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Mock introspection response
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
"client_id": "test-client-id",
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
opaque_token = "opaque-token-12345"
|
|
result = await verifier._verify_mcp_audience(opaque_token)
|
|
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|
|
assert result.scopes == ["openid", "profile"]
|
|
|
|
async def test_verify_multi_audience_fails_without_both_audiences(
|
|
self, base_settings
|
|
):
|
|
"""Test MCP audience verification succeeds with only MCP audience (RFC 7519 compliant)."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Mock introspection response with only MCP audience
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": [
|
|
"test-client-id"
|
|
], # Only MCP audience (Nextcloud validates its own)
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
opaque_token = "opaque-token-12345"
|
|
result = await verifier._verify_mcp_audience(opaque_token)
|
|
|
|
# Should succeed with only MCP audience per RFC 7519
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|
|
|
|
|
|
class TestExchangeModeVerification:
|
|
"""Test token exchange mode verification."""
|
|
|
|
async def test_verify_mcp_audience_only_success(self, exchange_settings):
|
|
"""Test MCP-only audience verification succeeds with MCP audience."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
|
|
# Mock introspection response with MCP audience only
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["test-client-id"],
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
"client_id": "test-client-id",
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
opaque_token = "opaque-token-12345"
|
|
result = await verifier._verify_mcp_audience(opaque_token)
|
|
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|
|
|
|
async def test_verify_mcp_audience_only_fails_without_mcp(self, exchange_settings):
|
|
"""Test MCP audience verification fails without MCP audience."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
|
|
# Mock introspection response without MCP audience
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["http://localhost:8080"], # Wrong audience
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
opaque_token = "opaque-token-12345"
|
|
result = await verifier._verify_mcp_audience(opaque_token)
|
|
|
|
assert result is None
|
|
|
|
|
|
class TestIntrospection:
|
|
"""Test token introspection."""
|
|
|
|
async def test_introspect_active_token(self, base_settings):
|
|
"""Test introspection of active token."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Mock HTTP response
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
"client_id": "test-client-id",
|
|
}
|
|
|
|
verifier.http_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
result = await verifier._introspect_token("test-token")
|
|
assert result is not None
|
|
assert result["active"] is True
|
|
assert result["sub"] == "testuser"
|
|
|
|
async def test_introspect_inactive_token(self, base_settings):
|
|
"""Test introspection of inactive token."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
# Mock HTTP response
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"active": False}
|
|
|
|
verifier.http_client.post = AsyncMock(return_value=mock_response)
|
|
|
|
result = await verifier._introspect_token("test-token")
|
|
assert result is None
|
|
|
|
async def test_introspect_without_endpoint(self, base_settings):
|
|
"""Test introspection when endpoint not configured."""
|
|
base_settings.introspection_uri = None
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
result = await verifier._introspect_token("test-token")
|
|
assert result is None
|
|
|
|
|
|
class TestAccessTokenCreation:
|
|
"""Test AccessToken object creation."""
|
|
|
|
def test_create_access_token_success(self, base_settings):
|
|
"""Test successful AccessToken creation."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
payload = {
|
|
"sub": "testuser",
|
|
"scope": "openid profile email",
|
|
"exp": int(time.time() + 3600),
|
|
"client_id": "test-client-id",
|
|
}
|
|
token = "test-token-123"
|
|
|
|
result = verifier._create_access_token(token, payload)
|
|
assert result is not None
|
|
assert result.token == token
|
|
assert result.resource == "testuser"
|
|
assert result.scopes == ["openid", "profile", "email"]
|
|
assert result.client_id == "test-client-id"
|
|
|
|
def test_create_access_token_with_preferred_username(self, base_settings):
|
|
"""Test AccessToken creation with preferred_username fallback."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
payload = {
|
|
"preferred_username": "testuser", # No 'sub' claim
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
token = "test-token-123"
|
|
|
|
result = verifier._create_access_token(token, payload)
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|
|
|
|
def test_create_access_token_no_username(self, base_settings):
|
|
"""Test AccessToken creation fails without username."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
payload = {
|
|
# No sub or preferred_username
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
token = "test-token-123"
|
|
|
|
result = verifier._create_access_token(token, payload)
|
|
assert result is None
|
|
|
|
def test_create_access_token_no_expiry(self, base_settings):
|
|
"""Test AccessToken creation uses default TTL without expiry."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
payload = {
|
|
"sub": "testuser",
|
|
"scope": "openid profile",
|
|
# No exp claim
|
|
}
|
|
token = "test-token-123"
|
|
|
|
result = verifier._create_access_token(token, payload)
|
|
assert result is not None
|
|
# Should have set a default expiry
|
|
assert result.expires_at > int(time.time())
|
|
|
|
|
|
class TestVerifyTokenFlow:
|
|
"""Test complete verify_token flow."""
|
|
|
|
async def test_verify_token_from_cache(self, base_settings):
|
|
"""Test verify_token returns cached token."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
payload = {
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"sub": "testuser",
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
token = jwt.encode(payload, "secret", algorithm="HS256")
|
|
|
|
# First call - should cache
|
|
result1 = verifier._create_access_token(token, payload)
|
|
assert result1 is not None
|
|
|
|
# Mock _verify_mcp_audience to ensure it's not called
|
|
with patch.object(verifier, "_verify_mcp_audience") as mock_verify:
|
|
result2 = await verifier.verify_token(token)
|
|
assert result2 is not None
|
|
assert result2.resource == "testuser"
|
|
# Should not call verification since it's cached
|
|
mock_verify.assert_not_called()
|
|
|
|
async def test_verify_token_multi_audience_mode(self, base_settings):
|
|
"""Test verify_token in multi-audience mode."""
|
|
verifier = UnifiedTokenVerifier(base_settings)
|
|
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["test-client-id", "http://localhost:8080"],
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
result = await verifier.verify_token("opaque-token")
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|
|
|
|
async def test_verify_token_exchange_mode(self, exchange_settings):
|
|
"""Test verify_token in exchange mode."""
|
|
verifier = UnifiedTokenVerifier(exchange_settings)
|
|
|
|
introspection_response = {
|
|
"active": True,
|
|
"sub": "testuser",
|
|
"aud": ["test-client-id"], # MCP audience only
|
|
"scope": "openid profile",
|
|
"exp": int(time.time() + 3600),
|
|
}
|
|
|
|
with patch.object(
|
|
verifier, "_introspect_token", return_value=introspection_response
|
|
):
|
|
result = await verifier.verify_token("opaque-token")
|
|
assert result is not None
|
|
assert result.resource == "testuser"
|