test: Update all test network hosts to respect iss claims from JWTs

This commit is contained in:
Chris Coutinho
2025-10-23 11:09:51 +02:00
parent e9a16c43b5
commit 54e975198f
8 changed files with 271 additions and 28 deletions
+4 -4
View File
@@ -74,8 +74,8 @@ services:
- 127.0.0.1:8001:8001
environment:
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8001
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
- NEXTCLOUD_MCP_SERVER_URL=http://localhost:8001
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8080
- NEXTCLOUD_OIDC_CLIENT_STORAGE=/app/.oauth/nextcloud_oauth_client.json
- NEXTCLOUD_OIDC_SCOPES=openid profile email nc:read nc:write
# No USERNAME/PASSWORD - will use OAuth with Dynamic Client Registration
@@ -93,8 +93,8 @@ services:
- 127.0.0.1:8002:8002
environment:
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8002
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
- NEXTCLOUD_MCP_SERVER_URL=http://localhost:8002
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8080
- NEXTCLOUD_OIDC_CLIENT_STORAGE=/app/.oauth-jwt/nextcloud_oauth_client.json
- NEXTCLOUD_OIDC_SCOPES=openid profile email nc:read nc:write
- NEXTCLOUD_OIDC_TOKEN_TYPE=jwt
+107 -11
View File
@@ -82,7 +82,7 @@ async def create_mcp_client_session(
- Ensures proper cleanup without suppressing errors
Args:
url: MCP server URL (e.g., "http://127.0.0.1:8000/mcp")
url: MCP server URL (e.g., "http://localhost:8000/mcp")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "OAuth MCP (Playwright)")
@@ -159,7 +159,7 @@ async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
Uses anyio pytest plugin for proper async fixture handling.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8000/mcp", client_name="Basic MCP"
url="http://localhost:8000/mcp", client_name="Basic MCP"
):
yield session
@@ -177,7 +177,7 @@ async def nc_mcp_oauth_client(
Uses anyio pytest plugin for proper async fixture handling.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
url="http://localhost:8001/mcp",
token=playwright_oauth_token,
client_name="OAuth MCP (Playwright)",
):
@@ -202,7 +202,7 @@ async def nc_mcp_oauth_jwt_client(
Uses anyio pytest plugin for proper async fixture handling.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8002/mcp",
url="http://localhost:8002/mcp",
token=playwright_oauth_token_jwt,
client_name="OAuth JWT MCP (Playwright)",
):
@@ -225,7 +225,7 @@ async def nc_mcp_oauth_client_read_only(
enabling proper scope-based filtering.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8002/mcp",
url="http://localhost:8002/mcp",
token=playwright_oauth_token_read_only,
client_name="OAuth JWT MCP Read-Only (Playwright)",
):
@@ -248,7 +248,7 @@ async def nc_mcp_oauth_client_write_only(
enabling proper scope-based filtering.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8002/mcp",
url="http://localhost:8002/mcp",
token=playwright_oauth_token_write_only,
client_name="OAuth JWT MCP Write-Only (Playwright)",
):
@@ -270,13 +270,38 @@ async def nc_mcp_oauth_client_full_access(
enabling proper scope-based filtering.
"""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8002/mcp",
url="http://localhost:8002/mcp",
token=playwright_oauth_token_full_access,
client_name="OAuth JWT MCP Full Access (Playwright)",
):
yield session
@pytest.fixture(scope="session")
async def nc_mcp_oauth_client_no_custom_scopes(
anyio_backend,
playwright_oauth_token_no_custom_scopes: str,
) -> AsyncGenerator[ClientSession, Any]:
"""
Fixture to create an MCP client session with NO custom scopes.
Connects to the JWT OAuth-enabled MCP server on port 8002.
This client has only OIDC default scopes (openid, profile, email) without
application-specific scopes (nc:read, nc:write).
Expected behavior: Should see 0 tools (all tools require custom scopes).
Uses JWT MCP server because JWT tokens embed scope information in claims,
enabling proper scope-based filtering.
"""
async for session in create_mcp_client_session(
url="http://localhost:8002/mcp",
token=playwright_oauth_token_no_custom_scopes,
client_name="OAuth JWT MCP No Custom Scopes (Playwright)",
):
yield session
@pytest.fixture
async def temporary_note(nc_client: NextcloudClient):
"""
@@ -1232,6 +1257,51 @@ async def full_access_oauth_client_credentials(anyio_backend, oauth_callback_ser
)
@pytest.fixture(scope="session")
async def no_custom_scopes_oauth_client_credentials(
anyio_backend, oauth_callback_server
):
"""
Fixture for OAuth client with NO custom scopes (only OIDC defaults).
Tests the security behavior when a user grants only the default OIDC scopes
(openid, profile, email) but declines custom application scopes (nc:read, nc:write).
Returns:
Tuple of (client_id, client_secret, callback_url, token_endpoint, authorization_endpoint)
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("No-custom-scopes OAuth client requires NEXTCLOUD_HOST")
auth_states, callback_url = oauth_callback_server
async with httpx.AsyncClient(timeout=30.0) as http_client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await http_client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
token_endpoint = oidc_config.get("token_endpoint")
authorization_endpoint = oidc_config.get("authorization_endpoint")
# Create JWT client with NO custom scopes (only OIDC defaults)
client_id, client_secret = await _create_oauth_client_with_scopes(
callback_url=callback_url,
client_name="Test Client No Custom Scopes",
allowed_scopes="openid profile email", # No nc:read or nc:write
token_type="JWT", # JWT tokens for scope validation
)
return (
client_id,
client_secret,
callback_url,
token_endpoint,
authorization_endpoint,
)
@pytest.fixture(scope="session")
async def playwright_oauth_token(
anyio_backend, browser, shared_oauth_client_credentials, oauth_callback_server
@@ -1711,6 +1781,32 @@ async def playwright_oauth_token_full_access(
)
@pytest.fixture(scope="session")
async def playwright_oauth_token_no_custom_scopes(
anyio_backend,
browser,
no_custom_scopes_oauth_client_credentials,
oauth_callback_server,
) -> str:
"""
Fixture to obtain an OAuth access token with NO custom scopes.
Tests the security behavior when a user grants only default OIDC scopes
(openid, profile, email) but declines application-specific scopes.
Expected: JWT token will contain only default scopes, and all MCP tools
should be filtered out since they all require nc:read or nc:write.
Uses a dedicated JWT OAuth client with allowed_scopes="openid profile email"
"""
return await _get_oauth_token_with_scopes(
browser,
no_custom_scopes_oauth_client_credentials,
oauth_callback_server,
scopes="openid profile email", # Only OIDC defaults, no custom scopes
)
@pytest.fixture(scope="session")
async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
"""
@@ -2047,7 +2143,7 @@ async def alice_mcp_client(
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as alice (owner role)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
url="http://localhost:8001/mcp",
token=alice_oauth_token,
client_name="Alice MCP",
):
@@ -2060,7 +2156,7 @@ async def bob_mcp_client(
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as bob (viewer role)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
url="http://localhost:8001/mcp",
token=bob_oauth_token,
client_name="Bob MCP",
):
@@ -2074,7 +2170,7 @@ async def charlie_mcp_client(
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
url="http://localhost:8001/mcp",
token=charlie_oauth_token,
client_name="Charlie MCP",
):
@@ -2088,7 +2184,7 @@ async def diana_mcp_client(
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as diana (no-access role)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
url="http://localhost:8001/mcp",
token=diana_oauth_token,
client_name="Diana MCP",
):
+1 -1
View File
@@ -145,7 +145,7 @@ uv run python -m tests.load.oauth_benchmark -u 2 -d 30 --verbose
| `--users` | `-u` | 2 | Number of concurrent users (dynamically created) |
| `--duration` | `-d` | 30.0 | Test duration in seconds |
| `--warmup` | `-w` | 5.0 | Warmup period before metrics collection (seconds) |
| `--url` | | `http://127.0.0.1:8001/mcp` | MCP OAuth server URL |
| `--url` | | `http://localhost:8001/mcp` | MCP OAuth server URL |
| `--output` | `-o` | None | JSON output file path |
| `--workload` | | `mixed` | Workload type: mixed, sharing, collaboration, baseline |
| `--user-prefix` | | `loadtest` | Prefix for dynamically created usernames |
+2 -2
View File
@@ -414,7 +414,7 @@ async def show_progress(
@click.option(
"--url",
"-u",
default="http://127.0.0.1:8000/mcp",
default="http://localhost:8000/mcp",
show_default=True,
help="MCP server URL",
)
@@ -463,7 +463,7 @@ def main(
uv run python -m tests.load.benchmark -c 20 -d 60 --output results.json
# Test OAuth server on port 8001
uv run python -m tests.load.benchmark --url http://127.0.0.1:8001/mcp
uv run python -m tests.load.benchmark --url http://localhost:8001/mcp
"""
if verbose:
logging.getLogger().setLevel(logging.DEBUG)
+5 -5
View File
@@ -51,7 +51,7 @@ class OAuthCallbackServer:
correlation, and stores them in a shared dictionary.
"""
def __init__(self, host: str = "127.0.0.1", port: int = 8081):
def __init__(self, host: str = "localhost", port: int = 8081):
self.host = host
self.port = port
self.auth_states: dict[str, str] = {}
@@ -363,13 +363,13 @@ async def run_oauth_benchmark(
try:
# Get environment variables
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
callback_url = "http://127.0.0.1:8081/callback"
callback_url = "http://localhost:8081/callback"
# Step 1: Start OAuth callback server
print("Step 1/6: Starting OAuth callback server...")
callback_server = OAuthCallbackServer(host="127.0.0.1", port=8081)
callback_server = OAuthCallbackServer(host="localhost", port=8081)
callback_server.start()
print("✓ Callback server listening on http://127.0.0.1:8081\n")
print("✓ Callback server listening on http://localhost:8081\n")
# Step 2: Discover OIDC endpoints
print("Step 2/6: Discovering OIDC endpoints...")
@@ -634,7 +634,7 @@ async def run_oauth_benchmark(
)
@click.option(
"--url",
default="http://127.0.0.1:8001/mcp",
default="http://localhost:8001/mcp",
show_default=True,
help="MCP OAuth server URL",
)
+1 -1
View File
@@ -138,7 +138,7 @@ class OAuthUserPool:
return profile
async def create_user_session(
self, username: str, mcp_url: str = "http://127.0.0.1:8001/mcp"
self, username: str, mcp_url: str = "http://localhost:8001/mcp"
) -> ClientSession:
"""
Create an MCP client session for a user.
+1 -1
View File
@@ -55,7 +55,7 @@ async def test_jwt_token_structure_with_custom_client():
pytest.skip("NEXTCLOUD_JWT_CLIENT_ID not set - skipping JWT token test")
_client_secret = os.getenv("NEXTCLOUD_JWT_CLIENT_SECRET")
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://127.0.0.1:8080")
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
# Fetch discovery
async with httpx.AsyncClient() as client:
+150 -3
View File
@@ -19,15 +19,15 @@ async def test_prm_endpoint():
# Test the PRM endpoint directly
async with httpx.AsyncClient() as client:
response = await client.get(
"http://127.0.0.1:8001/.well-known/oauth-protected-resource"
"http://localhost:8001/.well-known/oauth-protected-resource"
)
assert response.status_code == 200
prm_data = response.json()
assert prm_data["resource"] == "http://127.0.0.1:8001"
assert prm_data["resource"] == "http://localhost:8001"
assert "nc:read" in prm_data["scopes_supported"]
assert "nc:write" in prm_data["scopes_supported"]
assert "http://127.0.0.1:8080" in prm_data["authorization_servers"]
assert "http://localhost:8080" in prm_data["authorization_servers"]
assert "header" in prm_data["bearer_methods_supported"]
assert "RS256" in prm_data["resource_signing_alg_values_supported"]
@@ -388,5 +388,152 @@ async def test_scope_metadata_coverage(nc_mcp_client):
assert len(tools_response.tools) >= 90
@pytest.mark.integration
async def test_jwt_with_no_custom_scopes_returns_zero_tools(
nc_mcp_oauth_client_no_custom_scopes,
):
"""
Test that a JWT token with only OIDC default scopes (no nc:read or nc:write) returns 0 tools.
This tests the security behavior when a user declines to grant custom scopes during consent.
Expected: JWT token has scopes=['openid', 'profile', 'email'] but no nc:read or nc:write.
All tools require at least one custom scope, so they should all be filtered out.
"""
import logging
logger = logging.getLogger(__name__)
# Connect with JWT token that has NO custom scopes (only openid, profile, email)
result = await nc_mcp_oauth_client_no_custom_scopes.list_tools()
assert result is not None
tool_names = [tool.name for tool in result.tools]
logger.info(
f"JWT token with no custom scopes sees {len(tool_names)} tools (should be 0)"
)
# All tools require nc:read or nc:write, so should be filtered out
assert len(tool_names) == 0, (
f"Expected 0 tools but got {len(tool_names)}: {tool_names[:10]}"
)
logger.info(
"✅ JWT token without custom scopes correctly returns 0 tools (all filtered out)"
)
@pytest.mark.integration
async def test_jwt_consent_scenarios_read_only(nc_mcp_oauth_client_read_only):
"""
Test JWT with only nc:read scope consented.
Simulates user granting only read permission during OAuth consent.
Expected: Should see read tools but not write tools.
"""
import logging
logger = logging.getLogger(__name__)
result = await nc_mcp_oauth_client_read_only.list_tools()
assert result is not None
assert len(result.tools) > 0
tool_names = [tool.name for tool in result.tools]
logger.info(f"JWT with nc:read consent sees {len(tool_names)} tools")
# Verify read tools are present
read_tools = ["nc_notes_get_note", "nc_notes_search_notes", "nc_webdav_read_file"]
for tool in read_tools:
assert tool in tool_names, f"Expected read tool {tool} not found"
# Verify write tools are filtered out
write_tools = [
"nc_notes_create_note",
"nc_notes_update_note",
"nc_webdav_write_file",
]
for tool in write_tools:
assert tool not in tool_names, f"Write tool {tool} should be filtered out"
logger.info(
f"✅ JWT with nc:read consent: {len(tool_names)} read tools visible, write tools filtered"
)
@pytest.mark.integration
async def test_jwt_consent_scenarios_write_only(nc_mcp_oauth_client_write_only):
"""
Test JWT with only nc:write scope consented.
Simulates user granting only write permission during OAuth consent.
Expected: Should see write tools but not read-only tools.
"""
import logging
logger = logging.getLogger(__name__)
result = await nc_mcp_oauth_client_write_only.list_tools()
assert result is not None
assert len(result.tools) > 0
tool_names = [tool.name for tool in result.tools]
logger.info(f"JWT with nc:write consent sees {len(tool_names)} tools")
# Verify write tools are present
write_tools = [
"nc_notes_create_note",
"nc_notes_update_note",
"nc_webdav_write_file",
]
for tool in write_tools:
assert tool in tool_names, f"Expected write tool {tool} not found"
# Verify read-only tools are filtered out
read_only_tools = ["nc_notes_get_note", "nc_notes_search_notes"]
for tool in read_only_tools:
assert tool not in tool_names, f"Read-only tool {tool} should be filtered out"
logger.info(
f"✅ JWT with nc:write consent: {len(tool_names)} write tools visible, read-only tools filtered"
)
@pytest.mark.integration
async def test_jwt_consent_scenarios_full_access(nc_mcp_oauth_client_full_access):
"""
Test JWT with both nc:read and nc:write scopes consented.
Simulates user granting both permissions during OAuth consent.
Expected: Should see all 90+ tools (both read and write).
"""
import logging
logger = logging.getLogger(__name__)
result = await nc_mcp_oauth_client_full_access.list_tools()
assert result is not None
assert len(result.tools) > 0
tool_names = [tool.name for tool in result.tools]
logger.info(f"JWT with full consent sees {len(tool_names)} tools")
# Verify both read and write tools are present
read_tools = ["nc_notes_get_note", "nc_webdav_read_file"]
write_tools = ["nc_notes_create_note", "nc_webdav_write_file"]
for tool in read_tools:
assert tool in tool_names, f"Expected read tool {tool} not found"
for tool in write_tools:
assert tool in tool_names, f"Expected write tool {tool} not found"
# Should have all tools
assert len(tool_names) >= 90, f"Expected 90+ tools but got {len(tool_names)}"
logger.info(
f"✅ JWT with full consent: {len(tool_names)} tools visible (all read + write)"
)
if __name__ == "__main__":
pytest.main([__file__, "-v"])