feat: Add rate limiting and extract helpers for app password endpoints

Security improvements:
- Add in-memory rate limiter for app password provisioning (5 attempts/hour/user)
- Returns 429 Too Many Requests with Retry-After header when limit exceeded
- Rate limiting is per-user to prevent cross-user DoS

Code quality improvements:
- Extract _extract_basic_auth() helper to reduce duplication across 3 endpoints
- Move base64, re imports to module level
- Add APP_PASSWORD_PATTERN constant for regex validation
- Add NEXTCLOUD_VALIDATION_TIMEOUT constant (10s)

Test coverage:
- Add test_provision_app_password_rate_limiting
- Add test_rate_limiting_is_per_user
- Add autouse fixture to clear rate limit state between tests
- Total: 15 tests for management API endpoints

Addresses reviewer feedback on PR #473.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-01-14 14:02:00 +01:00
parent 6affad1c8b
commit f15baefe7e
2 changed files with 300 additions and 95 deletions
@@ -18,6 +18,7 @@ from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
from nextcloud_mcp_server.api import management
from nextcloud_mcp_server.api.management import (
delete_app_password,
get_app_password_status,
@@ -28,6 +29,14 @@ from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
pytestmark = pytest.mark.unit
@pytest.fixture(autouse=True)
def clear_rate_limit():
"""Clear rate limit state before each test."""
management._rate_limit_attempts.clear()
yield
management._rate_limit_attempts.clear()
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
@@ -487,3 +496,129 @@ async def test_delete_app_password_username_mismatch():
)
assert response.status_code == 403
async def test_provision_app_password_rate_limiting(mocker):
"""Test that rate limiting blocks excessive provisioning attempts."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client to return 401 (failed validation)
mock_response = MagicMock()
mock_response.status_code = 401
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
# Make 5 failed attempts (should all return 401)
for i in range(5):
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 401, f"Attempt {i + 1} should return 401"
# 6th attempt should be rate limited (429)
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 429
assert "Rate limit exceeded" in response.json()["error"]
assert "Retry-After" in response.headers
async def test_rate_limiting_is_per_user(mocker):
"""Test that rate limiting is applied per user, not globally."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client to return 401
mock_response = MagicMock()
mock_response.status_code = 401
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
# Make 5 failed attempts for user1 (hits rate limit)
for _ in range(5):
client.post(
"/api/v1/users/user1/app-password",
headers={
"Authorization": create_basic_auth_header(
"user1", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
# user1 should be rate limited
response = client.post(
"/api/v1/users/user1/app-password",
headers={
"Authorization": create_basic_auth_header(
"user1", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 429
# user2 should NOT be rate limited (different user)
response = client.post(
"/api/v1/users/user2/app-password",
headers={
"Authorization": create_basic_auth_header(
"user2", "bbbbb-ccccc-ddddd-eeeee-fffff"
)
},
)
assert response.status_code == 401 # Fails validation, but not rate limited