docs(auth): Update docstrings of management api auth handling

This commit is contained in:
Chris Coutinho
2025-12-26 09:04:39 -06:00
parent 779d474aaa
commit 0a23e484e9
5 changed files with 115 additions and 26 deletions
+1 -2
View File
@@ -102,5 +102,4 @@ jobs:
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
run: |
# NOTE: Temporarily run all tests until merged
uv run pytest -v --log-cli-level=WARN #-m unit -m smoke
uv run pytest -v --log-cli-level=WARN -m unit -m smoke
+12
View File
@@ -58,6 +58,18 @@ async def validate_token_and_get_user(
token (not just MCP-audience tokens). This is needed because Astrolabe
(NC PHP app) uses its own OAuth client, separate from MCP server's client.
Security Model:
~~~~~~~~~~~~~~~
- **Authentication** (this function): Verifies token is cryptographically valid
and extracts user identity from the `sub` claim.
- **Authorization** (calling endpoints): Each endpoint MUST verify that the
authenticated user owns the requested resource. For example:
- GET /users/{user_id}/session: Checks token_user_id == path_user_id (403 if mismatch)
- POST /users/{user_id}/revoke: Checks token_user_id == path_user_id (403 if mismatch)
This separation ensures that even without audience validation, users can only
access their own resources. Cross-user access is blocked at the authorization layer.
Args:
request: Starlette request with Authorization header
+26 -4
View File
@@ -993,10 +993,32 @@ async def setup_oauth_config_for_multi_user_basic(
)
# Perform OIDC discovery
async with httpx.AsyncClient() as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
try:
async with httpx.AsyncClient(timeout=30.0) as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
except httpx.HTTPStatusError as e:
logger.error(
f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}"
)
raise ValueError(
f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}. "
"Ensure Nextcloud OIDC (user_oidc app) is installed and configured."
) from e
except httpx.RequestError as e:
logger.error(f"OIDC discovery failed: {e}")
raise ValueError(
f"OIDC discovery failed: Cannot connect to {discovery_url}. Error: {e}"
) from e
except (KeyError, ValueError) as e:
logger.error(
f"OIDC discovery failed: Invalid response from {discovery_url}: {e}"
)
raise ValueError(
f"OIDC discovery failed: Invalid response from {discovery_url}. "
"The endpoint did not return valid OIDC configuration."
) from e
logger.info("✓ OIDC discovery successful (multi-user BasicAuth)")
+38 -11
View File
@@ -121,15 +121,35 @@ class UnifiedTokenVerifier(TokenVerifier):
"""
Verify token for management API access (ADR-018 NC PHP app integration).
This is a more lenient verification that accepts ANY valid Nextcloud OIDC
token, not just tokens with MCP server audience. This is needed because:
This verification accepts ANY valid Nextcloud OIDC token, not just tokens
with MCP server audience. This is needed because:
- Astrolabe (NC PHP app) uses its own OAuth client with Nextcloud OIDC
- Tokens from Astrolabe have Astrolabe's client_id as audience
- MCP server's management API should accept these tokens
Security model:
- Authentication: Token is valid (issued by Nextcloud, not expired)
- Authorization: Token's user == requested resource (checked by management API)
Security Model:
~~~~~~~~~~~~~~~~
This relaxed audience validation is secure because:
1. **Authentication layer** (this method):
- Verifies token signature against Nextcloud's JWKS (cryptographic proof)
- Verifies token is not expired
- Extracts user identity from validated token claims
2. **Authorization layer** (management API endpoints):
- EVERY endpoint verifies: token.sub == requested_resource_owner
- Example: GET /users/{user_id}/session checks token_user_id == path_user_id
- Users can ONLY access their own resources, never another user's
3. **Attack scenario analysis**:
- Attacker with stolen token for App A cannot access user B's data
- Token's `sub` claim is cryptographically bound to a specific user
- Authorization layer rejects cross-user access attempts (403 Forbidden)
4. **Why audience validation isn't needed here**:
- Audience validation prevents token confusion attacks across services
- But management API authorization already gates access per-user
- A token valid for "astrolabe" is still bound to user X, not user Y
Args:
token: Bearer token to verify
@@ -241,12 +261,19 @@ class UnifiedTokenVerifier(TokenVerifier):
be accepted. These tokens are issued by Nextcloud OIDC to Astrolabe's
OAuth client, not MCP server's client.
Security model:
- We skip audience check (token may have Astrolabe's audience, not MCP's)
- We skip issuer check (token may have internal Nextcloud URL as issuer)
- We still verify signature (token is authentically from Nextcloud OIDC)
- We still verify expiration (token is not expired)
- Authorization is checked by management API (user == requested resource)
What we verify:
- ✓ Token signature (cryptographic proof token is from Nextcloud OIDC)
- ✓ Token expiration (not expired)
- ✓ Token structure (valid JWT format)
What we skip:
- ✗ Audience check (token may have Astrolabe's audience, not MCP's)
- ✗ Issuer check (token may have internal Nextcloud URL as issuer)
Security guarantee:
- Authorization is enforced by management API endpoints
- Each endpoint verifies: token.sub == requested_resource_owner
- See verify_token_for_management_api() docstring for full security model
Args:
token: Bearer token to verify
+38 -9
View File
@@ -203,26 +203,55 @@ class TestSetupOAuthConfigForMultiUserBasic:
# Verify issuer is used directly for JWT validation
assert hybrid_auth_settings.oidc_issuer == oidc_discovery_response["issuer"]
async def test_oidc_discovery_failure(self, hybrid_auth_settings, mocker):
"""Test handling of OIDC discovery failure."""
# Mock httpx.AsyncClient to raise an HTTP error
async def test_oidc_discovery_failure_http_error(
self, hybrid_auth_settings, mocker
):
"""Test handling of OIDC discovery HTTP errors."""
import httpx
# Create a mock response with a status error
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = httpx.HTTPError(
"Connection failed"
mock_response.status_code = 404
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"Not Found",
request=MagicMock(),
response=MagicMock(status_code=404),
)
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = AsyncMock()
# Return None to propagate exceptions (not suppress them)
mock_client.__aexit__ = AsyncMock(return_value=None)
mocker.patch("httpx.AsyncClient", return_value=mock_client)
# Call function and expect exception (currently raises UnboundLocalError
# due to exception in async with block - this is a known issue)
with pytest.raises((httpx.HTTPError, UnboundLocalError)):
# Should raise ValueError with helpful message (not UnboundLocalError)
with pytest.raises(ValueError, match="OIDC discovery failed"):
await setup_oauth_config_for_multi_user_basic(
settings=hybrid_auth_settings,
client_id="test-client-id",
client_secret="test-client-secret",
)
async def test_oidc_discovery_failure_connection_error(
self, hybrid_auth_settings, mocker
):
"""Test handling of OIDC discovery connection errors."""
import httpx
mock_client = AsyncMock()
mock_client.get = AsyncMock(
side_effect=httpx.ConnectError("Connection refused")
)
mock_client.__aenter__.return_value = mock_client
# Return None to propagate exceptions (not suppress them)
mock_client.__aexit__ = AsyncMock(return_value=None)
mocker.patch("httpx.AsyncClient", return_value=mock_client)
# Should raise ValueError with helpful message
with pytest.raises(ValueError, match="Cannot connect to"):
await setup_oauth_config_for_multi_user_basic(
settings=hybrid_auth_settings,
client_id="test-client-id",