fix(auth): Skip issuer validation for management API tokens

Fixes NC PHP app (Astrolabe) OAuth integration by making token validation
more lenient for management API access.

Problem:
- Astrolabe calls Nextcloud OIDC token endpoint via internal URL (http://localhost)
- Tokens are issued with iss: http://localhost (internal)
- MCP server expects iss: http://localhost:8080 (external)
- Token validation failed with "Invalid issuer"

Solution:
- Add skip_issuer_check parameter to _verify_jwt_signature()
- verify_token_for_management_api() now skips both audience and issuer checks
- Security maintained: signature still verified, authorization checked by API

Also includes related fixes from previous session:
- Update test selectors for Vue 3 UI ("Enable Semantic Search")
- Fix OIDC discovery URL transformation in OAuthController.php
- Add overwrite.cli.url to setup hook for proper external URLs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-24 17:25:48 -06:00
parent 5e2ef5f35b
commit 804480836e
6 changed files with 249 additions and 78 deletions
+8 -3
View File
@@ -54,6 +54,10 @@ async def validate_token_and_get_user(
) -> tuple[str, dict[str, Any]]:
"""Validate OAuth bearer token and extract user ID.
Uses verify_token_for_management_api which accepts any valid Nextcloud OIDC
token (not just MCP-audience tokens). This is needed because Astrolabe
(NC PHP app) uses its own OAuth client, separate from MCP server's client.
Args:
request: Starlette request with Authorization header
@@ -71,9 +75,10 @@ async def validate_token_and_get_user(
# Note: This is set in app.py starlette_lifespan for OAuth mode
token_verifier = request.app.state.oauth_context["token_verifier"]
# Validate token (handles both JWT and opaque tokens)
# verify_token returns AccessToken object or None
access_token = await token_verifier.verify_token(token)
# Validate token for management API (handles both JWT and opaque tokens)
# Uses verify_token_for_management_api which accepts any valid Nextcloud token
# without requiring MCP audience - needed for Astrolabe integration (ADR-018)
access_token = await token_verifier.verify_token_for_management_api(token)
if not access_token:
raise ValueError("Token validation failed")
+142 -15
View File
@@ -117,6 +117,51 @@ class UnifiedTokenVerifier(TokenVerifier):
# Both modes do the same validation (MCP audience only)
return await self._verify_mcp_audience(token)
async def verify_token_for_management_api(self, token: str) -> AccessToken | None:
"""
Verify token for management API access (ADR-018 NC PHP app integration).
This is a more lenient verification that accepts ANY valid Nextcloud OIDC
token, not just tokens with MCP server audience. This is needed because:
- Astrolabe (NC PHP app) uses its own OAuth client with Nextcloud OIDC
- Tokens from Astrolabe have Astrolabe's client_id as audience
- MCP server's management API should accept these tokens
Security model:
- Authentication: Token is valid (issued by Nextcloud, not expired)
- Authorization: Token's user == requested resource (checked by management API)
Args:
token: Bearer token to verify
Returns:
AccessToken if valid (regardless of audience), None otherwise
"""
# Check cache first (using separate cache key to avoid mixing with MCP tokens)
cache_key = f"mgmt:{hashlib.sha256(token.encode()).hexdigest()}"
if cache_key in self._token_cache:
userinfo, expiry = self._token_cache[cache_key]
if time.time() < expiry:
logger.debug("Management API token found in cache")
oauth_token_cache_hits_total.labels(hit="true").inc()
username = userinfo.get("sub") or userinfo.get("preferred_username")
scope_string = userinfo.get("scope", "")
scopes = scope_string.split() if scope_string else []
return AccessToken(
token=token,
client_id=userinfo.get("client_id", ""),
scopes=scopes,
expires_at=int(expiry),
resource=username,
)
else:
del self._token_cache[cache_key]
oauth_token_cache_hits_total.labels(hit="false").inc()
# Verify token without audience check
return await self._verify_without_audience_check(token, cache_key)
async def _verify_mcp_audience(self, token: str) -> AccessToken | None:
"""
Validate token has MCP audience.
@@ -186,6 +231,71 @@ class UnifiedTokenVerifier(TokenVerifier):
record_oauth_token_validation(validation_method, "error")
return None
async def _verify_without_audience_check(
self, token: str, cache_key: str
) -> AccessToken | None:
"""
Verify token validity without checking MCP audience or issuer.
Used for management API where tokens from Astrolabe (NC PHP app) need to
be accepted. These tokens are issued by Nextcloud OIDC to Astrolabe's
OAuth client, not MCP server's client.
Security model:
- We skip audience check (token may have Astrolabe's audience, not MCP's)
- We skip issuer check (token may have internal Nextcloud URL as issuer)
- We still verify signature (token is authentically from Nextcloud OIDC)
- We still verify expiration (token is not expired)
- Authorization is checked by management API (user == requested resource)
Args:
token: Bearer token to verify
cache_key: Cache key for storing validation result
Returns:
AccessToken if valid, None otherwise
"""
validation_method = "unknown"
try:
# Attempt JWT verification first
# Skip issuer check for management API tokens (may have internal URL)
if self._is_jwt_format(token) and self.jwks_client:
validation_method = "jwt"
payload = await self._verify_jwt_signature(
token, skip_issuer_check=True
)
if payload:
record_oauth_token_validation("jwt", "valid")
else:
record_oauth_token_validation("jwt", "invalid")
return None
else:
# Fall back to introspection for opaque tokens
validation_method = "introspect"
payload = await self._introspect_token(token)
if payload:
record_oauth_token_validation("introspect", "valid")
else:
record_oauth_token_validation("introspect", "invalid")
return None
# Check payload is valid
if not payload:
return None
# Skip audience validation - any valid Nextcloud token is accepted
logger.debug(
f"Management API token validated (no audience check) for user: {payload.get('sub')}"
)
# Cache and return the token
return self._create_access_token_with_cache_key(token, payload, cache_key)
except Exception as e:
logger.error(f"Management API token verification failed: {e}")
record_oauth_token_validation(validation_method, "error")
return None
def _has_mcp_audience(self, payload: dict[str, Any]) -> bool:
"""
Check if token has MCP audience.
@@ -230,12 +340,15 @@ class UnifiedTokenVerifier(TokenVerifier):
"""
return "." in token and token.count(".") == 2
async def _verify_jwt_signature(self, token: str) -> dict[str, Any] | None:
async def _verify_jwt_signature(
self, token: str, skip_issuer_check: bool = False
) -> dict[str, Any] | None:
"""
Verify JWT token with signature validation using JWKS.
Args:
token: JWT token to verify
skip_issuer_check: If True, skip issuer validation (for management API tokens)
Returns:
Decoded payload if valid, None if invalid
@@ -248,25 +361,22 @@ class UnifiedTokenVerifier(TokenVerifier):
# Verify and decode JWT
# Note: We don't validate audience here - that's done separately based on mode
# Issuer validation can be skipped for management API tokens (from Astrolabe)
should_verify_issuer = (
not skip_issuer_check
and hasattr(self.settings, "oidc_issuer")
and self.settings.oidc_issuer
)
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
issuer=(
self.settings.oidc_issuer
if hasattr(self.settings, "oidc_issuer")
else None
),
issuer=(self.settings.oidc_issuer if should_verify_issuer else None),
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_iss": (
True
if hasattr(self.settings, "oidc_issuer")
and self.settings.oidc_issuer
else False
),
"verify_iss": should_verify_issuer,
"verify_aud": False, # We handle audience validation separately
},
)
@@ -358,6 +468,24 @@ class UnifiedTokenVerifier(TokenVerifier):
token: The bearer token
payload: Validated token payload
Returns:
AccessToken object or None if required fields missing
"""
# Use default cache key (hash of token)
cache_key = hashlib.sha256(token.encode()).hexdigest()
return self._create_access_token_with_cache_key(token, payload, cache_key)
def _create_access_token_with_cache_key(
self, token: str, payload: dict[str, Any], cache_key: str
) -> AccessToken | None:
"""
Create AccessToken object from validated token payload with custom cache key.
Args:
token: The bearer token
payload: Validated token payload
cache_key: Key to use for caching (allows separate caches for MCP vs management API)
Returns:
AccessToken object or None if required fields missing
"""
@@ -382,14 +510,13 @@ class UnifiedTokenVerifier(TokenVerifier):
logger.warning("No 'exp' claim in token, using default TTL")
exp = int(time.time() + self.cache_ttl)
# Cache the result
token_hash = hashlib.sha256(token.encode()).hexdigest()
# Cache the result with the provided key
userinfo = {
"sub": username,
"scope": scope_string,
**{k: v for k, v in payload.items() if k not in ["sub", "scope"]},
}
self._token_cache[token_hash] = (userinfo, exp)
self._token_cache[cache_key] = (userinfo, exp)
return AccessToken(
token=token,