diff --git a/tests/integration/test_app_password_provisioning.py b/tests/integration/test_app_password_provisioning.py index 419cb16..0e6e427 100644 --- a/tests/integration/test_app_password_provisioning.py +++ b/tests/integration/test_app_password_provisioning.py @@ -1,18 +1,21 @@ -"""Integration tests for app password provisioning via Astrolabe. +"""Integration tests for app password provisioning via management API. Tests the complete flow for multi-user BasicAuth mode: -1. User stores app password via Astrolabe API -2. MCP server retrieves it via OAuth client credentials -3. Background sync uses it to access Nextcloud (NOT OAuth refresh tokens) +1. User stores app password via management API endpoint +2. MCP server stores it locally (encrypted) +3. Background sync uses locally stored password to access Nextcloud These tests verify that BasicAuth and OAuth are completely separate concerns with no fallback between them. """ -import pytest +import tempfile +from pathlib import Path -from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient -from nextcloud_mcp_server.config import get_settings +import pytest +from cryptography.fernet import Fernet + +from nextcloud_mcp_server.auth.storage import RefreshTokenStorage from nextcloud_mcp_server.vector.oauth_sync import ( NotProvisionedError, get_user_client, @@ -21,140 +24,60 @@ from nextcloud_mcp_server.vector.oauth_sync import ( ) -@pytest.mark.integration -async def test_astrolabe_client_initialization(): - """Test AstrolabeClient can be instantiated.""" - client = AstrolabeClient( - nextcloud_host="http://localhost:8080", - client_id="test-client", - client_secret="test-secret", - ) +@pytest.fixture +def encryption_key(): + """Generate a test encryption key.""" + return Fernet.generate_key().decode() - assert client is not None - assert client.nextcloud_host == "http://localhost:8080" - assert client.client_id == "test-client" - assert client.client_secret == "test-secret" - assert client._token_cache is None + +@pytest.fixture +async def temp_storage(encryption_key): + """Create temporary storage instance with encryption for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_provisioning.db" + storage = RefreshTokenStorage( + db_path=str(db_path), encryption_key=encryption_key + ) + await storage.initialize() + yield storage @pytest.mark.integration -async def test_astrolabe_client_get_access_token_requires_oidc(): - """Test that getting access token requires OIDC discovery endpoint.""" - client = AstrolabeClient( - nextcloud_host="http://localhost:8080", - client_id="test-client", - client_secret="test-secret", - ) +async def test_basic_auth_mode_uses_local_storage(temp_storage, mocker): + """Test that BasicAuth mode uses locally stored app passwords. - # This will fail without proper OIDC setup, which is expected - # The test verifies the client follows the OAuth client credentials flow - try: - token = await client.get_access_token() - # If we get here, OIDC is configured - assert token is not None - except Exception as e: - # Expected if OIDC not fully configured for test client - # 400/401/403/404 all indicate the flow is working but credentials are invalid - assert any(code in str(e) for code in ["400", "401", "403", "404"]) - - -@pytest.mark.integration -async def test_get_user_app_password_returns_none_for_unconfigured_user(): - """Test that get_user_app_password returns None for users without app passwords.""" - # This requires valid OAuth client credentials - settings = get_settings() - - if not settings.oidc_client_id or not settings.oidc_client_secret: - pytest.skip("OAuth client credentials not configured") - - client = AstrolabeClient( - nextcloud_host=settings.nextcloud_host or "http://localhost:8080", - client_id=settings.oidc_client_id, - client_secret=settings.oidc_client_secret, - ) - - # Try to get app password for a user that hasn't provisioned one - try: - app_password = await client.get_user_app_password("nonexistent_user") - # Should return None for unconfigured user (404 response) - assert app_password is None - except Exception as e: - # May fail with auth error if OAuth not fully configured - assert any(code in str(e) for code in ["400", "401", "403", "404"]) - - -@pytest.mark.integration -async def test_basic_auth_mode_uses_app_password_only(mocker): - """Test that BasicAuth mode uses ONLY app passwords, NOT OAuth tokens. - - In multi-user BasicAuth mode, OAuth refresh tokens are NOT used. - This is a complete separation of concerns. + In multi-user BasicAuth mode, app passwords are stored locally + in the MCP server's database after being provisioned via the API. """ - # Mock settings to have client credentials - mock_settings = mocker.MagicMock() - mock_settings.oidc_client_id = "test-client-id" - mock_settings.oidc_client_secret = "test-client-secret" - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.get_settings", - return_value=mock_settings, - ) + # Store an app password in local storage + await temp_storage.store_app_password("test_user", "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB") - # Mock AstrolabeClient to return an app password - mock_astrolabe = mocker.AsyncMock() - mock_astrolabe.get_user_app_password.return_value = "test-app-password-12345" - - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient", - return_value=mock_astrolabe, - ) - - # Call get_user_client in BasicAuth mode - _client = await get_user_client( + # Call get_user_client_basic_auth with local storage + client = await get_user_client_basic_auth( user_id="test_user", - token_broker=None, # No token broker needed for BasicAuth mode nextcloud_host="http://localhost:8080", - use_basic_auth=True, + storage=temp_storage, ) - # Verify app password was requested - mock_astrolabe.get_user_app_password.assert_called_once_with("test_user") - - # Verify client was created successfully with correct username - assert _client is not None - assert _client.username == "test_user" + # Verify client was created with correct credentials + assert client is not None + assert client.username == "test_user" @pytest.mark.integration -async def test_basic_auth_mode_raises_error_without_app_password(mocker): +async def test_basic_auth_mode_raises_error_without_app_password(temp_storage): """Test that BasicAuth mode raises NotProvisionedError if no app password. There is NO fallback to OAuth - if no app password, user must provision one. """ - # Mock settings to have client credentials - mock_settings = mocker.MagicMock() - mock_settings.oidc_client_id = "test-client-id" - mock_settings.oidc_client_secret = "test-client-secret" - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.get_settings", - return_value=mock_settings, - ) + # Don't store any app password - # Mock AstrolabeClient to return None (no app password) - mock_astrolabe = mocker.AsyncMock() - mock_astrolabe.get_user_app_password.return_value = None - - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient", - return_value=mock_astrolabe, - ) - - # Call get_user_client in BasicAuth mode - should raise NotProvisionedError + # Call get_user_client_basic_auth - should raise NotProvisionedError with pytest.raises(NotProvisionedError) as exc_info: - await get_user_client( + await get_user_client_basic_auth( user_id="test_user", - token_broker=None, nextcloud_host="http://localhost:8080", - use_basic_auth=True, + storage=temp_storage, ) # Verify error message mentions app password provisioning @@ -162,6 +85,33 @@ async def test_basic_auth_mode_raises_error_without_app_password(mocker): assert "test_user" in str(exc_info.value) +@pytest.mark.integration +async def test_get_user_client_dispatches_to_basic_auth(temp_storage, mocker): + """Test that get_user_client dispatches to BasicAuth mode correctly.""" + # Store an app password + await temp_storage.store_app_password("alice", "aaaaa-bbbbb-ccccc-ddddd-eeeee") + + # Mock RefreshTokenStorage.from_env at the source module + mocker.patch( + "nextcloud_mcp_server.auth.storage.RefreshTokenStorage.from_env", + return_value=temp_storage, + ) + # Also mock initialize since from_env returns an uninitialized instance + mocker.patch.object(temp_storage, "initialize", return_value=None) + + # Call get_user_client in BasicAuth mode + client = await get_user_client( + user_id="alice", + token_broker=None, # No token broker needed for BasicAuth mode + nextcloud_host="http://localhost:8080", + use_basic_auth=True, + ) + + # Verify client was created successfully + assert client is not None + assert client.username == "alice" + + @pytest.mark.integration async def test_oauth_mode_uses_refresh_token_only(mocker): """Test that OAuth mode uses ONLY refresh tokens, NOT app passwords. @@ -183,7 +133,7 @@ async def test_oauth_mode_uses_refresh_token_only(mocker): use_basic_auth=False, # OAuth mode ) - # Verify token broker was called (NOT Astrolabe) + # Verify token broker was called mock_token_broker.get_background_token.assert_called_once() @@ -213,38 +163,6 @@ async def test_oauth_mode_raises_error_without_token(mocker): assert "test_user" in str(exc_info.value) -@pytest.mark.integration -async def test_get_user_client_basic_auth_function(mocker): - """Test the dedicated get_user_client_basic_auth function.""" - # Mock settings to have client credentials - mock_settings = mocker.MagicMock() - mock_settings.oidc_client_id = "test-client-id" - mock_settings.oidc_client_secret = "test-client-secret" - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.get_settings", - return_value=mock_settings, - ) - - # Mock AstrolabeClient - mock_astrolabe = mocker.AsyncMock() - mock_astrolabe.get_user_app_password.return_value = "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx" - - mocker.patch( - "nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient", - return_value=mock_astrolabe, - ) - - # Call dedicated function - client = await get_user_client_basic_auth( - user_id="alice", - nextcloud_host="http://localhost:8080", - ) - - assert client is not None - assert client.username == "alice" - mock_astrolabe.get_user_app_password.assert_called_once_with("alice") - - @pytest.mark.integration async def test_get_user_client_oauth_function(mocker): """Test the dedicated get_user_client_oauth function.""" @@ -276,3 +194,69 @@ async def test_oauth_mode_requires_token_broker(): nextcloud_host="http://localhost:8080", use_basic_auth=False, # OAuth mode ) + + +@pytest.mark.integration +async def test_multiple_users_basic_auth_mode(temp_storage, mocker): + """Test that multiple users can be provisioned independently.""" + # Store app passwords for multiple users + users = { + "alice": "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa", + "bob": "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb", + "charlie": "ccccc-ccccc-ccccc-ccccc-ccccc", + } + + for user_id, password in users.items(): + await temp_storage.store_app_password(user_id, password) + + # Verify each user can get a client + for user_id in users.keys(): + client = await get_user_client_basic_auth( + user_id=user_id, + nextcloud_host="http://localhost:8080", + storage=temp_storage, + ) + assert client is not None + assert client.username == user_id + + +@pytest.mark.integration +async def test_get_all_provisioned_users(temp_storage): + """Test that we can list all provisioned users for BasicAuth mode.""" + # Store app passwords for multiple users + await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa") + await temp_storage.store_app_password("bob", "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb") + + # Get all provisioned users + user_ids = await temp_storage.get_all_app_password_user_ids() + + assert len(user_ids) == 2 + assert "alice" in user_ids + assert "bob" in user_ids + + +@pytest.mark.integration +async def test_revoke_app_password(temp_storage): + """Test that deleting app password revokes background access.""" + # Provision user + await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa") + + # Verify user is provisioned + user_ids = await temp_storage.get_all_app_password_user_ids() + assert "alice" in user_ids + + # Revoke access + deleted = await temp_storage.delete_app_password("alice") + assert deleted is True + + # Verify user is no longer provisioned + user_ids = await temp_storage.get_all_app_password_user_ids() + assert "alice" not in user_ids + + # Verify get_user_client now raises NotProvisionedError + with pytest.raises(NotProvisionedError): + await get_user_client_basic_auth( + user_id="alice", + nextcloud_host="http://localhost:8080", + storage=temp_storage, + ) diff --git a/tests/unit/test_app_password_storage.py b/tests/unit/test_app_password_storage.py new file mode 100644 index 0000000..3743197 --- /dev/null +++ b/tests/unit/test_app_password_storage.py @@ -0,0 +1,227 @@ +""" +Unit tests for App Password Storage functionality. + +Tests the app password methods in RefreshTokenStorage for multi-user +BasicAuth mode background sync. +""" + +import tempfile +from pathlib import Path + +import pytest +from cryptography.fernet import Fernet + +from nextcloud_mcp_server.auth.storage import RefreshTokenStorage + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def encryption_key(): + """Generate a test encryption key.""" + return Fernet.generate_key().decode() + + +@pytest.fixture +async def temp_storage(encryption_key): + """Create temporary storage instance with encryption for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_app_passwords.db" + storage = RefreshTokenStorage( + db_path=str(db_path), encryption_key=encryption_key + ) + await storage.initialize() + yield storage + + +async def test_store_app_password(temp_storage): + """Test storing an app password.""" + await temp_storage.store_app_password( + user_id="testuser", + app_password="JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB", + ) + + # Verify it can be retrieved + retrieved = await temp_storage.get_app_password("testuser") + assert retrieved == "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB" + + +async def test_store_app_password_replaces_existing(temp_storage): + """Test that storing a new app password replaces the existing one.""" + await temp_storage.store_app_password( + user_id="testuser", + app_password="aaaaa-bbbbb-ccccc-ddddd-eeeee", + ) + await temp_storage.store_app_password( + user_id="testuser", + app_password="fffff-ggggg-hhhhh-iiiii-jjjjj", + ) + + retrieved = await temp_storage.get_app_password("testuser") + assert retrieved == "fffff-ggggg-hhhhh-iiiii-jjjjj" + + +async def test_get_app_password_nonexistent(temp_storage): + """Test retrieving app password for non-existent user.""" + retrieved = await temp_storage.get_app_password("nonexistent") + assert retrieved is None + + +async def test_delete_app_password(temp_storage): + """Test deleting an app password.""" + await temp_storage.store_app_password( + user_id="testuser", + app_password="JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB", + ) + + deleted = await temp_storage.delete_app_password("testuser") + assert deleted is True + + # Verify it's gone + retrieved = await temp_storage.get_app_password("testuser") + assert retrieved is None + + +async def test_delete_app_password_nonexistent(temp_storage): + """Test deleting non-existent app password.""" + deleted = await temp_storage.delete_app_password("nonexistent") + assert deleted is False + + +async def test_get_all_app_password_user_ids(temp_storage): + """Test listing all users with app passwords.""" + await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa") + await temp_storage.store_app_password("bob", "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb") + await temp_storage.store_app_password("charlie", "ccccc-ccccc-ccccc-ccccc-ccccc") + + user_ids = await temp_storage.get_all_app_password_user_ids() + assert len(user_ids) == 3 + assert "alice" in user_ids + assert "bob" in user_ids + assert "charlie" in user_ids + + +async def test_get_all_app_password_user_ids_empty(temp_storage): + """Test listing users when none have app passwords.""" + user_ids = await temp_storage.get_all_app_password_user_ids() + assert len(user_ids) == 0 + + +async def test_app_password_encryption(encryption_key): + """Test that app passwords are encrypted at rest.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_encryption.db" + storage = RefreshTokenStorage( + db_path=str(db_path), encryption_key=encryption_key + ) + await storage.initialize() + + # Store a password + test_password = "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB" + await storage.store_app_password("testuser", test_password) + + # Read directly from database to verify encryption + import aiosqlite + + async with aiosqlite.connect(str(db_path)) as db: + async with db.execute( + "SELECT encrypted_password FROM app_passwords WHERE user_id = ?", + ("testuser",), + ) as cursor: + row = await cursor.fetchone() + + # The stored value should be encrypted (not plain text) + encrypted_bytes = row[0] + assert encrypted_bytes != test_password.encode() + # Encrypted data should be longer due to Fernet overhead + assert len(encrypted_bytes) > len(test_password) + + +async def test_app_password_requires_encryption_key(): + """Test that app password operations require encryption key.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_no_key.db" + storage = RefreshTokenStorage(db_path=str(db_path), encryption_key=None) + await storage.initialize() + + # Storing should fail without encryption key + with pytest.raises(RuntimeError, match="Encryption key not configured"): + await storage.store_app_password( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + + # Getting should also fail without encryption key + with pytest.raises(RuntimeError, match="Encryption key not configured"): + await storage.get_app_password("testuser") + + +async def test_multiple_users_independence(temp_storage): + """Test that different users maintain independent app passwords.""" + users = ["alice", "bob", "charlie", "diana"] + + # Store unique passwords for each user + for i, user in enumerate(users): + password = ( + f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}-" * 4 + + f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}" + ) + await temp_storage.store_app_password(user, password) + + # Verify each user has their correct password + for user in users: + expected = ( + f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}-" * 4 + + f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}" + ) + retrieved = await temp_storage.get_app_password(user) + assert retrieved == expected + + # Delete one user's password + await temp_storage.delete_app_password("bob") + + # Verify other users unchanged + for user in ["alice", "charlie", "diana"]: + retrieved = await temp_storage.get_app_password(user) + assert retrieved is not None + + # Verify bob's password is gone + assert await temp_storage.get_app_password("bob") is None + + +async def test_app_password_with_special_characters(temp_storage): + """Test storing passwords with various alphanumeric patterns.""" + # Nextcloud app passwords use alphanumeric characters + passwords = [ + "AAAAA-BBBBB-CCCCC-DDDDD-EEEEE", # uppercase + "aaaaa-bbbbb-ccccc-ddddd-eeeee", # lowercase + "12345-67890-12345-67890-12345", # numbers + "aB1cD-eF2gH-iJ3kL-mN4oP-qR5sT", # mixed + ] + + for i, password in enumerate(passwords): + user = f"user{i}" + await temp_storage.store_app_password(user, password) + retrieved = await temp_storage.get_app_password(user) + assert retrieved == password + + +async def test_decryption_with_wrong_key(encryption_key): + """Test that decryption fails with wrong key.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_wrong_key.db" + + # Store with original key + storage1 = RefreshTokenStorage( + db_path=str(db_path), encryption_key=encryption_key + ) + await storage1.initialize() + await storage1.store_app_password("testuser", "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB") + + # Try to read with different key + wrong_key = Fernet.generate_key().decode() + storage2 = RefreshTokenStorage(db_path=str(db_path), encryption_key=wrong_key) + await storage2.initialize() + + # Decryption should fail and return None (graceful handling) + retrieved = await storage2.get_app_password("testuser") + assert retrieved is None diff --git a/tests/unit/test_management_app_password_endpoints.py b/tests/unit/test_management_app_password_endpoints.py new file mode 100644 index 0000000..e7d80a7 --- /dev/null +++ b/tests/unit/test_management_app_password_endpoints.py @@ -0,0 +1,489 @@ +""" +Unit tests for Management API app password endpoints. + +Tests the REST API endpoints for multi-user BasicAuth mode app password management: +- POST /api/v1/users/{user_id}/app-password - Provision app password +- GET /api/v1/users/{user_id}/app-password - Check status +- DELETE /api/v1/users/{user_id}/app-password - Delete app password +""" + +import base64 +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest +from cryptography.fernet import Fernet +from starlette.applications import Starlette +from starlette.routing import Route +from starlette.testclient import TestClient + +from nextcloud_mcp_server.api.management import ( + delete_app_password, + get_app_password_status, + provision_app_password, +) +from nextcloud_mcp_server.auth.storage import RefreshTokenStorage + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def encryption_key(): + """Generate a test encryption key.""" + return Fernet.generate_key().decode() + + +@pytest.fixture +async def temp_storage(encryption_key): + """Create temporary storage instance with encryption for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_management.db" + storage = RefreshTokenStorage( + db_path=str(db_path), encryption_key=encryption_key + ) + await storage.initialize() + yield storage + + +def create_basic_auth_header(username: str, password: str) -> str: + """Create BasicAuth header value.""" + credentials = f"{username}:{password}" + encoded = base64.b64encode(credentials.encode()).decode() + return f"Basic {encoded}" + + +def create_test_app(storage): + """Create a test Starlette app with the management endpoints.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + Route( + "/api/v1/users/{user_id}/app-password", + get_app_password_status, + methods=["GET"], + ), + Route( + "/api/v1/users/{user_id}/app-password", + delete_app_password, + methods=["DELETE"], + ), + ] + ) + app.state.storage = storage + return app + + +async def test_provision_app_password_missing_auth(): + """Test that missing auth returns 401.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + response = client.post("/api/v1/users/testuser/app-password") + + assert response.status_code == 401 + assert "Missing BasicAuth" in response.json()["error"] + + +async def test_provision_app_password_invalid_auth_format(): + """Test that invalid auth format returns 401.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + response = client.post( + "/api/v1/users/testuser/app-password", + headers={"Authorization": "Basic invalid-not-base64!!!"}, + ) + + assert response.status_code == 401 + assert "Invalid BasicAuth" in response.json()["error"] + + +async def test_provision_app_password_username_mismatch(): + """Test that username mismatch returns 403.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + # Try to provision for "testuser" but auth as "otheruser" + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 403 + assert "does not match" in response.json()["error"] + + +async def test_provision_app_password_invalid_format(): + """Test that invalid app password format returns 400.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + # Use invalid password format (not xxxxx-xxxxx-xxxxx-xxxxx-xxxxx) + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header("testuser", "invalid-password") + }, + ) + + assert response.status_code == 400 + assert "Invalid app password format" in response.json()["error"] + + +async def test_provision_app_password_success(temp_storage, mocker): + """Test successful app password provisioning.""" + # Mock settings (imported locally in the function) + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client for Nextcloud validation + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"ocs": {"data": {"id": "testuser"}}} + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + # Create app with storage + app = create_test_app(temp_storage) + + client = TestClient(app) + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert "stored" in data["message"].lower() + + # Verify password was stored + stored_password = await temp_storage.get_app_password("testuser") + assert stored_password == "aaaaa-bbbbb-ccccc-ddddd-eeeee" + + +async def test_provision_app_password_nextcloud_validation_fails(mocker): + """Test that failed Nextcloud validation returns 401.""" + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client to return 401 + mock_response = MagicMock() + mock_response.status_code = 401 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + provision_app_password, + methods=["POST"], + ), + ] + ) + + client = TestClient(app) + response = client.post( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 401 + assert "Invalid app password" in response.json()["error"] + + +async def test_get_app_password_status_provisioned(temp_storage, mocker): + """Test checking status when app password is provisioned.""" + # Store an app password + await temp_storage.store_app_password("testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee") + + app = create_test_app(temp_storage) + + client = TestClient(app) + response = client.get( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["user_id"] == "testuser" + assert data["has_app_password"] is True + + +async def test_get_app_password_status_not_provisioned(temp_storage, mocker): + """Test checking status when app password is not provisioned.""" + app = create_test_app(temp_storage) + + client = TestClient(app) + response = client.get( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["user_id"] == "testuser" + assert data["has_app_password"] is False + + +async def test_get_app_password_status_username_mismatch(): + """Test that username mismatch returns 403 for status check.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + get_app_password_status, + methods=["GET"], + ), + ] + ) + + client = TestClient(app) + response = client.get( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 403 + + +async def test_delete_app_password_success(temp_storage, mocker): + """Test successful app password deletion.""" + # Store an app password + await temp_storage.store_app_password("testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee") + + # Mock settings (imported locally in the function) + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client for Nextcloud validation + mock_response = MagicMock() + mock_response.status_code = 200 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = create_test_app(temp_storage) + + client = TestClient(app) + response = client.delete( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert "deleted" in data["message"].lower() + + # Verify password was removed + stored_password = await temp_storage.get_app_password("testuser") + assert stored_password is None + + +async def test_delete_app_password_not_found(temp_storage, mocker): + """Test deleting non-existent app password.""" + # Mock settings (imported locally in the function) + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client for Nextcloud validation + mock_response = MagicMock() + mock_response.status_code = 200 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = create_test_app(temp_storage) + + client = TestClient(app) + response = client.delete( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert "no app password found" in data["message"].lower() + + +async def test_delete_app_password_invalid_credentials(mocker): + """Test that invalid credentials returns 401 for deletion.""" + mocker.patch( + "nextcloud_mcp_server.config.get_settings", + return_value=MagicMock(nextcloud_host="http://localhost:8080"), + ) + + # Mock httpx client to return 401 + mock_response = MagicMock() + mock_response.status_code = 401 + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + + mocker.patch( + "nextcloud_mcp_server.api.management.httpx.AsyncClient", + return_value=mock_client, + ) + + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + delete_app_password, + methods=["DELETE"], + ), + ] + ) + + client = TestClient(app) + response = client.delete( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "testuser", "wrong-password-xxxxx" + ) + }, + ) + + assert response.status_code == 401 + assert "Invalid credentials" in response.json()["error"] + + +async def test_delete_app_password_username_mismatch(): + """Test that username mismatch returns 403 for deletion.""" + app = Starlette( + routes=[ + Route( + "/api/v1/users/{user_id}/app-password", + delete_app_password, + methods=["DELETE"], + ), + ] + ) + + client = TestClient(app) + response = client.delete( + "/api/v1/users/testuser/app-password", + headers={ + "Authorization": create_basic_auth_header( + "otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee" + ) + }, + ) + + assert response.status_code == 403