diff --git a/nextcloud_mcp_server/api/management.py b/nextcloud_mcp_server/api/management.py index a21b2ce..4922939 100644 --- a/nextcloud_mcp_server/api/management.py +++ b/nextcloud_mcp_server/api/management.py @@ -10,8 +10,11 @@ All endpoints use OAuth bearer token authentication via UnifiedTokenVerifier. The PHP app obtains tokens through PKCE flow and uses them to access these endpoints. """ +import base64 import logging +import re import time +from collections import defaultdict from importlib.metadata import version from typing import TYPE_CHECKING, Any @@ -28,6 +31,23 @@ logger = logging.getLogger(__name__) # Get package version from metadata __version__ = version("nextcloud-mcp-server") +# App password format regex (Nextcloud format: xxxxx-xxxxx-xxxxx-xxxxx-xxxxx) +APP_PASSWORD_PATTERN = re.compile( + r"^[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}$" +) + +# Timeout for Nextcloud API validation requests (seconds) +NEXTCLOUD_VALIDATION_TIMEOUT = 10.0 + +# Rate limiting configuration for app password provisioning +# Limits: 5 attempts per user per hour +RATE_LIMIT_MAX_ATTEMPTS = 5 +RATE_LIMIT_WINDOW_SECONDS = 3600 # 1 hour + +# In-memory rate limiter storage +# Structure: {user_id: [(timestamp, success), ...]} +_rate_limit_attempts: dict[str, list[tuple[float, bool]]] = defaultdict(list) + # Track server start time for uptime calculation _server_start_time = time.time() @@ -209,6 +229,116 @@ async def _get_app_password_storage(request: Request) -> "RefreshTokenStorage": return storage +def _check_rate_limit(user_id: str) -> tuple[bool, int]: + """Check if user is rate limited for app password operations. + + Implements a sliding window rate limiter to prevent brute-force attacks + on the app password provisioning endpoint. + + Args: + user_id: User identifier to check + + Returns: + Tuple of (is_allowed, seconds_until_retry) + - is_allowed: True if request should be allowed + - seconds_until_retry: Seconds to wait if rate limited (0 if allowed) + """ + current_time = time.time() + window_start = current_time - RATE_LIMIT_WINDOW_SECONDS + + # Clean up old attempts outside the window + _rate_limit_attempts[user_id] = [ + (ts, success) + for ts, success in _rate_limit_attempts[user_id] + if ts > window_start + ] + + # Count recent attempts (both successful and failed) + recent_attempts = len(_rate_limit_attempts[user_id]) + + if recent_attempts >= RATE_LIMIT_MAX_ATTEMPTS: + # Find when the oldest attempt in the window will expire + oldest_attempt = min(ts for ts, _ in _rate_limit_attempts[user_id]) + seconds_until_retry = int( + oldest_attempt + RATE_LIMIT_WINDOW_SECONDS - current_time + ) + return False, max(1, seconds_until_retry) + + return True, 0 + + +def _record_rate_limit_attempt(user_id: str, success: bool) -> None: + """Record an app password provisioning attempt for rate limiting. + + Args: + user_id: User identifier + success: Whether the attempt was successful + """ + _rate_limit_attempts[user_id].append((time.time(), success)) + + +def _extract_basic_auth( + request: Request, path_user_id: str +) -> tuple[str, str, JSONResponse | None]: + """Extract and validate BasicAuth credentials from request. + + Validates: + 1. Authorization header is present and valid BasicAuth format + 2. Username in credentials matches the path user_id + + Args: + request: Starlette request with Authorization header + path_user_id: User ID from the URL path to verify against + + Returns: + Tuple of (username, password, error_response) + - If successful: (username, password, None) + - If failed: ("", "", JSONResponse with error) + """ + auth_header = request.headers.get("Authorization") + + if not auth_header or not auth_header.startswith("Basic "): + return ( + "", + "", + JSONResponse( + {"success": False, "error": "Missing BasicAuth credentials"}, + status_code=401, + ), + ) + + try: + # Decode BasicAuth + encoded = auth_header.split(" ", 1)[1] + decoded = base64.b64decode(encoded).decode("utf-8") + username, password = decoded.split(":", 1) + except Exception: + return ( + "", + "", + JSONResponse( + {"success": False, "error": "Invalid BasicAuth format"}, + status_code=401, + ), + ) + + # Verify username matches path user_id + if username != path_user_id: + logger.warning( + f"Username mismatch in app password operation for path user {path_user_id}" + ) + return ( + "", + "", + JSONResponse( + {"success": False, "error": "Username does not match path user_id"}, + status_code=403, + ), + ) + + return username, password, None + + async def get_server_status(request: Request) -> JSONResponse: """GET /api/v1/status - Server status and version. @@ -555,8 +685,9 @@ async def provision_app_password(request: Request) -> JSONResponse: - User identity is verified via BasicAuth against Nextcloud - App password is encrypted before storage - Only the user who owns the password can provision it + - Rate limited to prevent brute-force attacks """ - import base64 + from nextcloud_mcp_server.config import get_settings # Get user_id from path path_user_id = request.path_params.get("user_id") @@ -566,51 +697,36 @@ async def provision_app_password(request: Request) -> JSONResponse: status_code=400, ) - # Extract BasicAuth credentials - auth_header = request.headers.get("Authorization") - if not auth_header or not auth_header.startswith("Basic "): - return JSONResponse( - {"success": False, "error": "Missing BasicAuth credentials"}, - status_code=401, - ) - - try: - # Decode BasicAuth - encoded = auth_header.split(" ", 1)[1] - decoded = base64.b64decode(encoded).decode("utf-8") - username, app_password = decoded.split(":", 1) - except Exception: - return JSONResponse( - {"success": False, "error": "Invalid BasicAuth format"}, - status_code=401, - ) - - # Verify username matches path user_id - if username != path_user_id: + # Check rate limit before processing + is_allowed, retry_after = _check_rate_limit(path_user_id) + if not is_allowed: logger.warning( - f"Username mismatch in app password provisioning: " - f"path={path_user_id}, auth={username}" + f"Rate limit exceeded for app password provisioning: {path_user_id}" ) return JSONResponse( - {"success": False, "error": "Username does not match path user_id"}, - status_code=403, + { + "success": False, + "error": f"Rate limit exceeded. Try again in {retry_after} seconds.", + }, + status_code=429, + headers={"Retry-After": str(retry_after)}, ) - # Validate app password format (xxxxx-xxxxx-xxxxx-xxxxx-xxxxx) - import re + # Extract and validate BasicAuth credentials + username, app_password, error_response = _extract_basic_auth(request, path_user_id) + if error_response is not None: + _record_rate_limit_attempt(path_user_id, success=False) + return error_response - if not re.match( - r"^[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}$", - app_password, - ): + # Validate app password format + if not APP_PASSWORD_PATTERN.match(app_password): + _record_rate_limit_attempt(path_user_id, success=False) return JSONResponse( {"success": False, "error": "Invalid app password format"}, status_code=400, ) # Get Nextcloud host from settings - from nextcloud_mcp_server.config import get_settings - settings = get_settings() nextcloud_host = settings.nextcloud_host @@ -623,7 +739,7 @@ async def provision_app_password(request: Request) -> JSONResponse: # Validate app password against Nextcloud try: - async with httpx.AsyncClient(timeout=10.0) as client: + async with httpx.AsyncClient(timeout=NEXTCLOUD_VALIDATION_TIMEOUT) as client: # Use OCS API to verify credentials test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user" response = await client.get( @@ -635,9 +751,9 @@ async def provision_app_password(request: Request) -> JSONResponse: if response.status_code != 200: logger.warning( - f"App password validation failed for {username}: " - f"HTTP {response.status_code}" + f"App password validation failed for user: HTTP {response.status_code}" ) + _record_rate_limit_attempt(path_user_id, success=False) return JSONResponse( {"success": False, "error": "Invalid app password"}, status_code=401, @@ -647,9 +763,8 @@ async def provision_app_password(request: Request) -> JSONResponse: data = response.json() ocs_user_id = data.get("ocs", {}).get("data", {}).get("id") if ocs_user_id != username: - logger.warning( - f"User ID mismatch: expected {username}, got {ocs_user_id}" - ) + logger.warning("User ID mismatch in OCS response") + _record_rate_limit_attempt(path_user_id, success=False) return JSONResponse( {"success": False, "error": "User ID mismatch"}, status_code=403, @@ -667,6 +782,7 @@ async def provision_app_password(request: Request) -> JSONResponse: storage = await _get_app_password_storage(request) await storage.store_app_password(username, app_password) + _record_rate_limit_attempt(path_user_id, success=True) logger.info(f"Provisioned app password for user: {username}") return JSONResponse( @@ -691,8 +807,6 @@ async def get_app_password_status(request: Request) -> JSONResponse: Requires BasicAuth with the user's app password for authentication. """ - import base64 - # Get user_id from path path_user_id = request.path_params.get("user_id") if not path_user_id: @@ -701,31 +815,10 @@ async def get_app_password_status(request: Request) -> JSONResponse: status_code=400, ) - # Extract BasicAuth credentials - auth_header = request.headers.get("Authorization") - if not auth_header or not auth_header.startswith("Basic "): - return JSONResponse( - {"success": False, "error": "Missing BasicAuth credentials"}, - status_code=401, - ) - - try: - # Decode BasicAuth - encoded = auth_header.split(" ", 1)[1] - decoded = base64.b64decode(encoded).decode("utf-8") - username, _ = decoded.split(":", 1) - except Exception: - return JSONResponse( - {"success": False, "error": "Invalid BasicAuth format"}, - status_code=401, - ) - - # Verify username matches path user_id - if username != path_user_id: - return JSONResponse( - {"success": False, "error": "Username does not match path user_id"}, - status_code=403, - ) + # Extract and validate BasicAuth credentials + username, _, error_response = _extract_basic_auth(request, path_user_id) + if error_response is not None: + return error_response try: storage = await _get_app_password_storage(request) @@ -754,8 +847,6 @@ async def delete_app_password(request: Request) -> JSONResponse: Requires BasicAuth with the user's credentials. """ - import base64 - from nextcloud_mcp_server.config import get_settings # Get user_id from path @@ -766,38 +857,17 @@ async def delete_app_password(request: Request) -> JSONResponse: status_code=400, ) - # Extract BasicAuth credentials - auth_header = request.headers.get("Authorization") - if not auth_header or not auth_header.startswith("Basic "): - return JSONResponse( - {"success": False, "error": "Missing BasicAuth credentials"}, - status_code=401, - ) - - try: - # Decode BasicAuth - encoded = auth_header.split(" ", 1)[1] - decoded = base64.b64decode(encoded).decode("utf-8") - username, password = decoded.split(":", 1) - except Exception: - return JSONResponse( - {"success": False, "error": "Invalid BasicAuth format"}, - status_code=401, - ) - - # Verify username matches path user_id - if username != path_user_id: - return JSONResponse( - {"success": False, "error": "Username does not match path user_id"}, - status_code=403, - ) + # Extract and validate BasicAuth credentials + username, password, error_response = _extract_basic_auth(request, path_user_id) + if error_response is not None: + return error_response # Validate credentials against Nextcloud settings = get_settings() nextcloud_host = settings.nextcloud_host try: - async with httpx.AsyncClient(timeout=10.0) as client: + async with httpx.AsyncClient(timeout=NEXTCLOUD_VALIDATION_TIMEOUT) as client: test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user" response = await client.get( test_url, diff --git a/tests/unit/test_management_app_password_endpoints.py b/tests/unit/test_management_app_password_endpoints.py index e7d80a7..c7ce9f1 100644 --- a/tests/unit/test_management_app_password_endpoints.py +++ b/tests/unit/test_management_app_password_endpoints.py @@ -18,6 +18,7 @@ from starlette.applications import Starlette from starlette.routing import Route from starlette.testclient import TestClient +from nextcloud_mcp_server.api import management from nextcloud_mcp_server.api.management import ( delete_app_password, get_app_password_status, @@ -28,6 +29,14 @@ from nextcloud_mcp_server.auth.storage import RefreshTokenStorage pytestmark = pytest.mark.unit +@pytest.fixture(autouse=True) +def clear_rate_limit(): + """Clear rate limit state before each test.""" + management._rate_limit_attempts.clear() + yield + management._rate_limit_attempts.clear() + + @pytest.fixture def encryption_key(): """Generate a test encryption key.""" @@ -487,3 +496,129 @@ async def test_delete_app_password_username_mismatch(): ) assert response.status_code == 403 + + +async def test_provision_app_password_rate_limiting(mocker): + """Test that rate limiting blocks excessive provisioning attempts.""" + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client to return 401 (failed validation) + mock_response = MagicMock() + mock_response.status_code = 401 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + + # Make 5 failed attempts (should all return 401) + for i in range(5): + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + assert response.status_code == 401, f"Attempt {i + 1} should return 401" + + # 6th attempt should be rate limited (429) + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + assert response.status_code == 429 + assert "Rate limit exceeded" in response.json()["error"] + assert "Retry-After" in response.headers + + +async def test_rate_limiting_is_per_user(mocker): + """Test that rate limiting is applied per user, not globally.""" + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client to return 401 + mock_response = MagicMock() + mock_response.status_code = 401 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + + # Make 5 failed attempts for user1 (hits rate limit) + for _ in range(5): + client.post( + "/api/v1/users/user1/app-password", + headers={ + "Authorization": create_basic_auth_header( + "user1", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + # user1 should be rate limited + response = client.post( + "/api/v1/users/user1/app-password", + headers={ + "Authorization": create_basic_auth_header( + "user1", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + assert response.status_code == 429 + + # user2 should NOT be rate limited (different user) + response = client.post( + "/api/v1/users/user2/app-password", + headers={ + "Authorization": create_basic_auth_header( + "user2", "bbbbb-ccccc-ddddd-eeeee-fffff" + ) + }, + ) + assert response.status_code == 401 # Fails validation, but not rate limited