From d4ee5a74c25b7ecffc8d65252f33442e35554b7c Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Fri, 24 Oct 2025 00:51:50 +0200 Subject: [PATCH] test: Update default tokens to JWT, add to introspection tests --- .../post-installation/10-install-oidc-app.sh | 1 + .../test_introspection_authorization.py | 480 ++++++++++++++++++ third_party/oidc | 2 +- 3 files changed, 482 insertions(+), 1 deletion(-) create mode 100644 tests/server/test_introspection_authorization.py diff --git a/app-hooks/post-installation/10-install-oidc-app.sh b/app-hooks/post-installation/10-install-oidc-app.sh index 805fb65..40a7fd5 100755 --- a/app-hooks/post-installation/10-install-oidc-app.sh +++ b/app-hooks/post-installation/10-install-oidc-app.sh @@ -33,5 +33,6 @@ fi # Configure OIDC Identity Provider with dynamic client registration enabled php /var/www/html/occ config:app:set oidc dynamic_client_registration --value='true' php /var/www/html/occ config:app:set oidc proof_key_for_code_exchange --value=true --type=boolean +php /var/www/html/occ config:app:set oidc default_token_type --value='jwt' echo "OIDC app installed and configured successfully" diff --git a/tests/server/test_introspection_authorization.py b/tests/server/test_introspection_authorization.py new file mode 100644 index 0000000..f15cbc4 --- /dev/null +++ b/tests/server/test_introspection_authorization.py @@ -0,0 +1,480 @@ +""" +Integration tests for token introspection authorization. + +These tests verify that the introspection endpoint properly enforces +authorization rules: +1. Client authentication is required (401 if missing) +2. Only the token owner can introspect its own tokens +3. Only the designated resource server can introspect tokens +4. Other clients cannot introspect tokens they don't own or aren't the audience for +""" + +import asyncio +import logging +import os +import secrets + +# Import helpers from conftest +import sys +import time +from typing import AsyncGenerator +from urllib.parse import quote + +import httpx +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from conftest import _handle_oauth_consent_screen + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="module") +def nextcloud_host() -> str: + """Get Nextcloud host from environment.""" + host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080") + return host + + +@pytest.fixture(scope="module") +async def oidc_endpoints(nextcloud_host: str) -> dict[str, str]: + """Discover OIDC endpoints.""" + async with httpx.AsyncClient(timeout=30.0) as client: + discovery_url = f"{nextcloud_host}/.well-known/openid-configuration" + response = await client.get(discovery_url) + response.raise_for_status() + config = response.json() + + return { + "token_endpoint": config["token_endpoint"], + "authorization_endpoint": config.get("authorization_endpoint"), + "introspection_endpoint": config.get("introspection_endpoint"), + "registration_endpoint": config.get("registration_endpoint"), + } + + +@pytest.fixture(scope="module") +async def test_oauth_clients( + nextcloud_host: str, oidc_endpoints: dict[str, str], oauth_callback_server +) -> AsyncGenerator[dict[str, tuple[str, str]], None]: + """ + Create multiple OAuth clients for introspection testing. + + Returns a dict mapping client names to (client_id, client_secret) tuples. + """ + from nextcloud_mcp_server.auth.client_registration import register_client + + clients = {} + registration_endpoint = oidc_endpoints["registration_endpoint"] + + # Get the correct callback URL from the oauth_callback_server fixture + auth_states, callback_url = oauth_callback_server + + # Create client A (will be the token owner) + logger.info("Creating OAuth client A for introspection testing") + client_a = await register_client( + nextcloud_url=nextcloud_host, + registration_endpoint=registration_endpoint, + client_name="Introspection Test Client A", + redirect_uris=[callback_url], + scopes="openid profile email", + token_type="Bearer", # Use opaque tokens for this test + ) + clients["clientA"] = (client_a.client_id, client_a.client_secret) + logger.info(f"Created client A: {client_a.client_id[:16]}...") + + # Create client B (will attempt to introspect client A's tokens) + logger.info("Creating OAuth client B for introspection testing") + client_b = await register_client( + nextcloud_url=nextcloud_host, + registration_endpoint=registration_endpoint, + client_name="Introspection Test Client B", + redirect_uris=[callback_url], + scopes="openid profile email", + token_type="Bearer", + ) + clients["clientB"] = (client_b.client_id, client_b.client_secret) + logger.info(f"Created client B: {client_b.client_id[:16]}...") + + # Create client C (third party, should not be able to introspect) + logger.info("Creating OAuth client C for introspection testing") + client_c = await register_client( + nextcloud_url=nextcloud_host, + registration_endpoint=registration_endpoint, + client_name="Introspection Test Client C", + redirect_uris=[callback_url], + scopes="openid profile email", + token_type="Bearer", + ) + clients["clientC"] = (client_c.client_id, client_c.client_secret) + logger.info(f"Created client C: {client_c.client_id[:16]}...") + + yield clients + + # Cleanup is handled by Nextcloud - clients will be removed when tests are done + logger.info("Test OAuth clients fixture complete") + + +@pytest.mark.integration +async def test_introspection_requires_client_authentication( + oidc_endpoints: dict[str, str], +): + """ + Test that the introspection endpoint requires client authentication. + + Expected: 401 UNAUTHORIZED when credentials are missing or invalid. + """ + introspection_endpoint = oidc_endpoints["introspection_endpoint"] + if not introspection_endpoint: + pytest.skip("Introspection endpoint not available") + + async with httpx.AsyncClient(timeout=10.0) as client: + # Test 1: No credentials + response = await client.post( + introspection_endpoint, + data={"token": "some_token"}, + ) + assert response.status_code == 401, "Should return 401 without credentials" + data = response.json() + assert data.get("error") == "invalid_client" + + # Test 2: Invalid credentials + response = await client.post( + introspection_endpoint, + data={"token": "some_token"}, + auth=("invalid_client", "invalid_secret"), + ) + assert response.status_code == 401, "Should return 401 with invalid credentials" + data = response.json() + logger.info(f"Invalid client response: {data}") + # Response may be either {"error": "invalid_client"} or {"message": "..."} + # Both are acceptable as long as we get 401 + assert "error" in data or "message" in data, "Should return error information" + + +async def _obtain_token_for_client( + browser, + oauth_callback_server, + client_id: str, + client_secret: str, + token_endpoint: str, + authorization_endpoint: str, + scope: str = "openid profile email", + resource: str | None = None, +) -> str: + """ + Helper to obtain an OAuth token using existing callback server and playwright automation. + + Reuses the pattern from conftest.py's playwright_oauth_token fixture. + """ + username = os.getenv("NEXTCLOUD_USERNAME", "admin") + password = os.getenv("NEXTCLOUD_PASSWORD", "admin") + + # Get callback server from fixture + auth_states, callback_url = oauth_callback_server + + # Generate unique state parameter + state = secrets.token_urlsafe(32) + + # Construct authorization URL + auth_url_parts = [ + f"{authorization_endpoint}?", + "response_type=code&", + f"client_id={client_id}&", + f"redirect_uri={quote(callback_url, safe='')}&", + f"state={state}&", + f"scope={quote(scope, safe='')}", + ] + + if resource: + auth_url_parts.append(f"&resource={quote(resource, safe='')}") + + auth_url = "".join(auth_url_parts) + + logger.info(f"Obtaining token for client {client_id[:16]}... with scopes={scope}") + if resource: + logger.info(f" Resource parameter: {resource[:16]}...") + + # Browser automation (same pattern as conftest.py) + context = await browser.new_context(ignore_https_errors=True) + page = await context.new_page() + + try: + logger.debug(f"Navigating to: {auth_url[:100]}...") + await page.goto(auth_url, wait_until="networkidle", timeout=60000) + current_url = page.url + logger.debug(f"Current URL after navigation: {current_url}") + + # Handle login if needed + if "/login" in current_url or "/index.php/login" in current_url: + logger.info("Login page detected, filling credentials...") + await page.wait_for_selector('input[name="user"]', timeout=10000) + await page.fill('input[name="user"]', username) + await page.fill('input[name="password"]', password) + await page.click('button[type="submit"]') + await page.wait_for_load_state("networkidle", timeout=60000) + current_url = page.url + logger.info(f"After login: {current_url}") + + # Wait a bit for page to fully render after login + await asyncio.sleep(2) + current_url = page.url + logger.info(f"After waiting, current URL: {current_url}") + + # Check page content for debugging + page_content = await page.content() + has_consent_div = "#oidc-consent" in page_content + logger.info(f"Page has #oidc-consent div: {has_consent_div}") + + # Handle consent screen using the helper from conftest + try: + consent_handled = await _handle_oauth_consent_screen(page, username) + logger.info(f"Consent screen handled: {consent_handled}") + except Exception as e: + logger.warning(f"Error handling consent screen: {e}") + # Take screenshot for debugging + await page.screenshot(path=f"/tmp/consent_error_{state[:8]}.png") + logger.error("Consent error screenshot saved") + raise + + # Wait for callback server to receive auth code + logger.info("Waiting for callback server to receive auth code...") + timeout_seconds = 30 + start_time = time.time() + while state not in auth_states: + if time.time() - start_time > timeout_seconds: + screenshot_path = ( + f"/tmp/oauth_introspection_test_timeout_{state[:8]}.png" + ) + await page.screenshot(path=screenshot_path) + logger.error(f"Timeout! Screenshot saved to {screenshot_path}") + logger.error(f"Current URL: {page.url}") + raise TimeoutError( + f"Timeout waiting for OAuth callback (state={state[:16]}...)" + ) + await asyncio.sleep(0.5) + + auth_code = auth_states[state] + logger.info(f"Successfully received auth code: {auth_code[:20]}...") + + finally: + await context.close() + + # Exchange code for token + logger.debug("Exchanging authorization code for access token...") + async with httpx.AsyncClient(timeout=30.0) as http_client: + token_response = await http_client.post( + token_endpoint, + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": callback_url, + "client_id": client_id, + "client_secret": client_secret, + }, + ) + + token_response.raise_for_status() + token_data = token_response.json() + access_token = token_data.get("access_token") + + if not access_token: + raise ValueError(f"No access_token in response: {token_data}") + + logger.info("Successfully obtained access token") + return access_token + + +@pytest.mark.integration +async def test_client_cannot_introspect_other_clients_tokens( + playwright_oauth_token: str, + shared_oauth_client_credentials: tuple, + test_oauth_clients: dict[str, tuple[str, str]], + oidc_endpoints: dict[str, str], +): + """ + Test that one client cannot introspect tokens owned by another client. + + This test uses a pre-authorized shared OAuth client (with existing token) + and verifies that a different client cannot introspect that token. + + Expected: introspection returns {active: false} to not reveal token existence. + """ + introspection_endpoint = oidc_endpoints["introspection_endpoint"] + if not introspection_endpoint: + pytest.skip("Introspection endpoint not available") + + # Use the shared OAuth client's token (pre-authorized, working) + access_token = playwright_oauth_token + shared_client_id, shared_client_secret, _, _, _ = shared_oauth_client_credentials + + # Get a different client to try to introspect + different_client_id, different_client_secret = test_oauth_clients["clientB"] + + logger.info( + f"Testing introspection with shared client token: {access_token[:16]}..." + ) + logger.info(f"Shared client ID: {shared_client_id[:16]}...") + logger.info(f"Different client ID: {different_client_id[:16]}...") + + async with httpx.AsyncClient(timeout=10.0) as client: + # Test 1: The owning client (shared client) can introspect its own token + response = await client.post( + introspection_endpoint, + data={"token": access_token}, + auth=(shared_client_id, shared_client_secret), + ) + assert response.status_code == 200 + data = response.json() + logger.info(f"Owner client introspection response: {data}") + assert data.get("active") is True, ( + "Owner client should be able to introspect its own token" + ) + + # Test 2: A different client CANNOT introspect the shared client's token + response = await client.post( + introspection_endpoint, + data={"token": access_token}, + auth=(different_client_id, different_client_secret), + ) + assert response.status_code == 200 + data = response.json() + logger.info(f"Different client introspection response: {data}") + assert data.get("active") is False, ( + "Different client should NOT be able to introspect another client's token" + ) + + +@pytest.mark.integration +async def test_introspection_with_resource_parameter( + browser, + oauth_callback_server, + test_oauth_clients: dict[str, tuple[str, str]], + oidc_endpoints: dict[str, str], + nextcloud_host: str, +): + """ + Test that the resource server (specified via 'resource' parameter) can introspect tokens. + + This test verifies that when a token is issued with resource=clientB, + clientB can introspect it even though it's owned by clientA. + + This requires obtaining a token with the 'resource' parameter set via authorization code grant. + + Uses playwright automation to obtain real tokens. + """ + introspection_endpoint = oidc_endpoints["introspection_endpoint"] + if not introspection_endpoint: + pytest.skip("Introspection endpoint not available") + + client_a_id, client_a_secret = test_oauth_clients["clientA"] + client_b_id, client_b_secret = test_oauth_clients["clientB"] + client_c_id, client_c_secret = test_oauth_clients["clientC"] + + token_endpoint = oidc_endpoints["token_endpoint"] + authorization_endpoint = oidc_endpoints.get("authorization_endpoint") + if not authorization_endpoint: + pytest.skip("Authorization endpoint not available") + + # Obtain a token for client A with resource parameter set to client B + try: + access_token = await _obtain_token_for_client( + browser=browser, + oauth_callback_server=oauth_callback_server, + client_id=client_a_id, + client_secret=client_a_secret, + token_endpoint=token_endpoint, + authorization_endpoint=authorization_endpoint, + scope="openid profile email", + resource=client_b_id, # Set client B as the resource server + ) + except Exception as e: + logger.error(f"Failed to obtain token with resource parameter: {e}") + pytest.skip(f"Cannot obtain test token with resource parameter: {e}") + + logger.info( + f"Obtained access token from client A with resource={client_b_id}: {access_token[:16]}..." + ) + + # Test introspection + async with httpx.AsyncClient(timeout=10.0) as client: + # Test 1: Client A (owner) can introspect its own token + response = await client.post( + introspection_endpoint, + data={"token": access_token}, + auth=(client_a_id, client_a_secret), + ) + assert response.status_code == 200 + data = response.json() + logger.info(f"Client A (owner) introspection response: {data}") + assert data.get("active") is True, ( + "Client A (owner) should be able to introspect its own token" + ) + + # Test 2: Client B (resource server) can introspect the token + response = await client.post( + introspection_endpoint, + data={"token": access_token}, + auth=(client_b_id, client_b_secret), + ) + assert response.status_code == 200 + data = response.json() + logger.info(f"Client B (resource server) introspection response: {data}") + assert data.get("active") is True, ( + "Client B (resource server) should be able to introspect token intended for it" + ) + + # Verify the resource field in the response matches client B + logger.info(f"Full introspection response from Client B: {data}") + + # Test 3: Client C CANNOT introspect the token (not owner, not resource server) + response = await client.post( + introspection_endpoint, + data={"token": access_token}, + auth=(client_c_id, client_c_secret), + ) + assert response.status_code == 200 + data = response.json() + logger.info(f"Client C (third party) introspection response: {data}") + assert data.get("active") is False, ( + "Client C should NOT be able to introspect token (not owner or resource server)" + ) + + +@pytest.mark.integration +async def test_introspection_returns_inactive_for_invalid_token( + test_oauth_clients: dict[str, tuple[str, str]], + oidc_endpoints: dict[str, str], +): + """ + Test that introspection returns {active: false} for invalid/unknown tokens. + + This is important for security - we shouldn't reveal whether a token exists or not. + """ + introspection_endpoint = oidc_endpoints["introspection_endpoint"] + if not introspection_endpoint: + pytest.skip("Introspection endpoint not available") + + client_a_id, client_a_secret = test_oauth_clients["clientA"] + + async with httpx.AsyncClient(timeout=10.0) as client: + # Test with a fake token + response = await client.post( + introspection_endpoint, + data={"token": "completely_fake_token_12345"}, + auth=(client_a_id, client_a_secret), + ) + + assert response.status_code == 200 + data = response.json() + logger.info(f"Introspection response for fake token: {data}") + assert data.get("active") is False, ( + "Should return active=false for invalid token" + ) + # Should NOT return any other information + assert len(data) == 1, "Should only return 'active' field for invalid token" + + +if __name__ == "__main__": + # Run with: uv run pytest tests/server/test_introspection_authorization.py -v -s + pytest.main([__file__, "-v", "-s", "-m", "integration"]) diff --git a/third_party/oidc b/third_party/oidc index 6c59725..0842fad 160000 --- a/third_party/oidc +++ b/third_party/oidc @@ -1 +1 @@ -Subproject commit 6c5972540b4a4f100abda7436c8c9bef7166f2d4 +Subproject commit 0842fad479d94548cd9f110faad73dbe44283907