test: Add comprehensive tests for app password storage and provisioning

- Add 12 unit tests for RefreshTokenStorage app password methods
  - Basic CRUD operations (store, get, delete)
  - Encryption verification (passwords encrypted at rest)
  - Error handling (missing encryption key, wrong key)
  - Multi-user independence

- Add 13 unit tests for Management API endpoints
  - POST /api/v1/users/{user_id}/app-password provisioning
  - GET /api/v1/users/{user_id}/app-password status
  - DELETE /api/v1/users/{user_id}/app-password deletion
  - Auth validation (BasicAuth, username matching)
  - Nextcloud credential validation

- Rewrite 10 integration tests for new architecture
  - Remove AstrolabeClient/OAuth dependency
  - Use local RefreshTokenStorage for app passwords
  - Test BasicAuth and OAuth mode separation
  - Test NotProvisionedError scenarios

Addresses reviewer feedback on PR #473 requiring test coverage.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-01-13 21:44:23 +01:00
parent e486e92f91
commit 370c3ff444
3 changed files with 852 additions and 152 deletions
@@ -1,18 +1,21 @@
"""Integration tests for app password provisioning via Astrolabe.
"""Integration tests for app password provisioning via management API.
Tests the complete flow for multi-user BasicAuth mode:
1. User stores app password via Astrolabe API
2. MCP server retrieves it via OAuth client credentials
3. Background sync uses it to access Nextcloud (NOT OAuth refresh tokens)
1. User stores app password via management API endpoint
2. MCP server stores it locally (encrypted)
3. Background sync uses locally stored password to access Nextcloud
These tests verify that BasicAuth and OAuth are completely separate concerns
with no fallback between them.
"""
import pytest
import tempfile
from pathlib import Path
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
from nextcloud_mcp_server.config import get_settings
import pytest
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.vector.oauth_sync import (
NotProvisionedError,
get_user_client,
@@ -21,140 +24,60 @@ from nextcloud_mcp_server.vector.oauth_sync import (
)
@pytest.mark.integration
async def test_astrolabe_client_initialization():
"""Test AstrolabeClient can be instantiated."""
client = AstrolabeClient(
nextcloud_host="http://localhost:8080",
client_id="test-client",
client_secret="test-secret",
)
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
return Fernet.generate_key().decode()
assert client is not None
assert client.nextcloud_host == "http://localhost:8080"
assert client.client_id == "test-client"
assert client.client_secret == "test-secret"
assert client._token_cache is None
@pytest.fixture
async def temp_storage(encryption_key):
"""Create temporary storage instance with encryption for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_provisioning.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
yield storage
@pytest.mark.integration
async def test_astrolabe_client_get_access_token_requires_oidc():
"""Test that getting access token requires OIDC discovery endpoint."""
client = AstrolabeClient(
nextcloud_host="http://localhost:8080",
client_id="test-client",
client_secret="test-secret",
)
async def test_basic_auth_mode_uses_local_storage(temp_storage, mocker):
"""Test that BasicAuth mode uses locally stored app passwords.
# This will fail without proper OIDC setup, which is expected
# The test verifies the client follows the OAuth client credentials flow
try:
token = await client.get_access_token()
# If we get here, OIDC is configured
assert token is not None
except Exception as e:
# Expected if OIDC not fully configured for test client
# 400/401/403/404 all indicate the flow is working but credentials are invalid
assert any(code in str(e) for code in ["400", "401", "403", "404"])
@pytest.mark.integration
async def test_get_user_app_password_returns_none_for_unconfigured_user():
"""Test that get_user_app_password returns None for users without app passwords."""
# This requires valid OAuth client credentials
settings = get_settings()
if not settings.oidc_client_id or not settings.oidc_client_secret:
pytest.skip("OAuth client credentials not configured")
client = AstrolabeClient(
nextcloud_host=settings.nextcloud_host or "http://localhost:8080",
client_id=settings.oidc_client_id,
client_secret=settings.oidc_client_secret,
)
# Try to get app password for a user that hasn't provisioned one
try:
app_password = await client.get_user_app_password("nonexistent_user")
# Should return None for unconfigured user (404 response)
assert app_password is None
except Exception as e:
# May fail with auth error if OAuth not fully configured
assert any(code in str(e) for code in ["400", "401", "403", "404"])
@pytest.mark.integration
async def test_basic_auth_mode_uses_app_password_only(mocker):
"""Test that BasicAuth mode uses ONLY app passwords, NOT OAuth tokens.
In multi-user BasicAuth mode, OAuth refresh tokens are NOT used.
This is a complete separation of concerns.
In multi-user BasicAuth mode, app passwords are stored locally
in the MCP server's database after being provisioned via the API.
"""
# Mock settings to have client credentials
mock_settings = mocker.MagicMock()
mock_settings.oidc_client_id = "test-client-id"
mock_settings.oidc_client_secret = "test-client-secret"
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.get_settings",
return_value=mock_settings,
)
# Store an app password in local storage
await temp_storage.store_app_password("test_user", "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB")
# Mock AstrolabeClient to return an app password
mock_astrolabe = mocker.AsyncMock()
mock_astrolabe.get_user_app_password.return_value = "test-app-password-12345"
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient",
return_value=mock_astrolabe,
)
# Call get_user_client in BasicAuth mode
_client = await get_user_client(
# Call get_user_client_basic_auth with local storage
client = await get_user_client_basic_auth(
user_id="test_user",
token_broker=None, # No token broker needed for BasicAuth mode
nextcloud_host="http://localhost:8080",
use_basic_auth=True,
storage=temp_storage,
)
# Verify app password was requested
mock_astrolabe.get_user_app_password.assert_called_once_with("test_user")
# Verify client was created successfully with correct username
assert _client is not None
assert _client.username == "test_user"
# Verify client was created with correct credentials
assert client is not None
assert client.username == "test_user"
@pytest.mark.integration
async def test_basic_auth_mode_raises_error_without_app_password(mocker):
async def test_basic_auth_mode_raises_error_without_app_password(temp_storage):
"""Test that BasicAuth mode raises NotProvisionedError if no app password.
There is NO fallback to OAuth - if no app password, user must provision one.
"""
# Mock settings to have client credentials
mock_settings = mocker.MagicMock()
mock_settings.oidc_client_id = "test-client-id"
mock_settings.oidc_client_secret = "test-client-secret"
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.get_settings",
return_value=mock_settings,
)
# Don't store any app password
# Mock AstrolabeClient to return None (no app password)
mock_astrolabe = mocker.AsyncMock()
mock_astrolabe.get_user_app_password.return_value = None
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient",
return_value=mock_astrolabe,
)
# Call get_user_client in BasicAuth mode - should raise NotProvisionedError
# Call get_user_client_basic_auth - should raise NotProvisionedError
with pytest.raises(NotProvisionedError) as exc_info:
await get_user_client(
await get_user_client_basic_auth(
user_id="test_user",
token_broker=None,
nextcloud_host="http://localhost:8080",
use_basic_auth=True,
storage=temp_storage,
)
# Verify error message mentions app password provisioning
@@ -162,6 +85,33 @@ async def test_basic_auth_mode_raises_error_without_app_password(mocker):
assert "test_user" in str(exc_info.value)
@pytest.mark.integration
async def test_get_user_client_dispatches_to_basic_auth(temp_storage, mocker):
"""Test that get_user_client dispatches to BasicAuth mode correctly."""
# Store an app password
await temp_storage.store_app_password("alice", "aaaaa-bbbbb-ccccc-ddddd-eeeee")
# Mock RefreshTokenStorage.from_env at the source module
mocker.patch(
"nextcloud_mcp_server.auth.storage.RefreshTokenStorage.from_env",
return_value=temp_storage,
)
# Also mock initialize since from_env returns an uninitialized instance
mocker.patch.object(temp_storage, "initialize", return_value=None)
# Call get_user_client in BasicAuth mode
client = await get_user_client(
user_id="alice",
token_broker=None, # No token broker needed for BasicAuth mode
nextcloud_host="http://localhost:8080",
use_basic_auth=True,
)
# Verify client was created successfully
assert client is not None
assert client.username == "alice"
@pytest.mark.integration
async def test_oauth_mode_uses_refresh_token_only(mocker):
"""Test that OAuth mode uses ONLY refresh tokens, NOT app passwords.
@@ -183,7 +133,7 @@ async def test_oauth_mode_uses_refresh_token_only(mocker):
use_basic_auth=False, # OAuth mode
)
# Verify token broker was called (NOT Astrolabe)
# Verify token broker was called
mock_token_broker.get_background_token.assert_called_once()
@@ -213,38 +163,6 @@ async def test_oauth_mode_raises_error_without_token(mocker):
assert "test_user" in str(exc_info.value)
@pytest.mark.integration
async def test_get_user_client_basic_auth_function(mocker):
"""Test the dedicated get_user_client_basic_auth function."""
# Mock settings to have client credentials
mock_settings = mocker.MagicMock()
mock_settings.oidc_client_id = "test-client-id"
mock_settings.oidc_client_secret = "test-client-secret"
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.get_settings",
return_value=mock_settings,
)
# Mock AstrolabeClient
mock_astrolabe = mocker.AsyncMock()
mock_astrolabe.get_user_app_password.return_value = "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx"
mocker.patch(
"nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient",
return_value=mock_astrolabe,
)
# Call dedicated function
client = await get_user_client_basic_auth(
user_id="alice",
nextcloud_host="http://localhost:8080",
)
assert client is not None
assert client.username == "alice"
mock_astrolabe.get_user_app_password.assert_called_once_with("alice")
@pytest.mark.integration
async def test_get_user_client_oauth_function(mocker):
"""Test the dedicated get_user_client_oauth function."""
@@ -276,3 +194,69 @@ async def test_oauth_mode_requires_token_broker():
nextcloud_host="http://localhost:8080",
use_basic_auth=False, # OAuth mode
)
@pytest.mark.integration
async def test_multiple_users_basic_auth_mode(temp_storage, mocker):
"""Test that multiple users can be provisioned independently."""
# Store app passwords for multiple users
users = {
"alice": "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa",
"bob": "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb",
"charlie": "ccccc-ccccc-ccccc-ccccc-ccccc",
}
for user_id, password in users.items():
await temp_storage.store_app_password(user_id, password)
# Verify each user can get a client
for user_id in users.keys():
client = await get_user_client_basic_auth(
user_id=user_id,
nextcloud_host="http://localhost:8080",
storage=temp_storage,
)
assert client is not None
assert client.username == user_id
@pytest.mark.integration
async def test_get_all_provisioned_users(temp_storage):
"""Test that we can list all provisioned users for BasicAuth mode."""
# Store app passwords for multiple users
await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa")
await temp_storage.store_app_password("bob", "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb")
# Get all provisioned users
user_ids = await temp_storage.get_all_app_password_user_ids()
assert len(user_ids) == 2
assert "alice" in user_ids
assert "bob" in user_ids
@pytest.mark.integration
async def test_revoke_app_password(temp_storage):
"""Test that deleting app password revokes background access."""
# Provision user
await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa")
# Verify user is provisioned
user_ids = await temp_storage.get_all_app_password_user_ids()
assert "alice" in user_ids
# Revoke access
deleted = await temp_storage.delete_app_password("alice")
assert deleted is True
# Verify user is no longer provisioned
user_ids = await temp_storage.get_all_app_password_user_ids()
assert "alice" not in user_ids
# Verify get_user_client now raises NotProvisionedError
with pytest.raises(NotProvisionedError):
await get_user_client_basic_auth(
user_id="alice",
nextcloud_host="http://localhost:8080",
storage=temp_storage,
)
+227
View File
@@ -0,0 +1,227 @@
"""
Unit tests for App Password Storage functionality.
Tests the app password methods in RefreshTokenStorage for multi-user
BasicAuth mode background sync.
"""
import tempfile
from pathlib import Path
import pytest
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
pytestmark = pytest.mark.unit
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
return Fernet.generate_key().decode()
@pytest.fixture
async def temp_storage(encryption_key):
"""Create temporary storage instance with encryption for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_app_passwords.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
yield storage
async def test_store_app_password(temp_storage):
"""Test storing an app password."""
await temp_storage.store_app_password(
user_id="testuser",
app_password="JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB",
)
# Verify it can be retrieved
retrieved = await temp_storage.get_app_password("testuser")
assert retrieved == "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB"
async def test_store_app_password_replaces_existing(temp_storage):
"""Test that storing a new app password replaces the existing one."""
await temp_storage.store_app_password(
user_id="testuser",
app_password="aaaaa-bbbbb-ccccc-ddddd-eeeee",
)
await temp_storage.store_app_password(
user_id="testuser",
app_password="fffff-ggggg-hhhhh-iiiii-jjjjj",
)
retrieved = await temp_storage.get_app_password("testuser")
assert retrieved == "fffff-ggggg-hhhhh-iiiii-jjjjj"
async def test_get_app_password_nonexistent(temp_storage):
"""Test retrieving app password for non-existent user."""
retrieved = await temp_storage.get_app_password("nonexistent")
assert retrieved is None
async def test_delete_app_password(temp_storage):
"""Test deleting an app password."""
await temp_storage.store_app_password(
user_id="testuser",
app_password="JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB",
)
deleted = await temp_storage.delete_app_password("testuser")
assert deleted is True
# Verify it's gone
retrieved = await temp_storage.get_app_password("testuser")
assert retrieved is None
async def test_delete_app_password_nonexistent(temp_storage):
"""Test deleting non-existent app password."""
deleted = await temp_storage.delete_app_password("nonexistent")
assert deleted is False
async def test_get_all_app_password_user_ids(temp_storage):
"""Test listing all users with app passwords."""
await temp_storage.store_app_password("alice", "aaaaa-aaaaa-aaaaa-aaaaa-aaaaa")
await temp_storage.store_app_password("bob", "bbbbb-bbbbb-bbbbb-bbbbb-bbbbb")
await temp_storage.store_app_password("charlie", "ccccc-ccccc-ccccc-ccccc-ccccc")
user_ids = await temp_storage.get_all_app_password_user_ids()
assert len(user_ids) == 3
assert "alice" in user_ids
assert "bob" in user_ids
assert "charlie" in user_ids
async def test_get_all_app_password_user_ids_empty(temp_storage):
"""Test listing users when none have app passwords."""
user_ids = await temp_storage.get_all_app_password_user_ids()
assert len(user_ids) == 0
async def test_app_password_encryption(encryption_key):
"""Test that app passwords are encrypted at rest."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_encryption.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
# Store a password
test_password = "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB"
await storage.store_app_password("testuser", test_password)
# Read directly from database to verify encryption
import aiosqlite
async with aiosqlite.connect(str(db_path)) as db:
async with db.execute(
"SELECT encrypted_password FROM app_passwords WHERE user_id = ?",
("testuser",),
) as cursor:
row = await cursor.fetchone()
# The stored value should be encrypted (not plain text)
encrypted_bytes = row[0]
assert encrypted_bytes != test_password.encode()
# Encrypted data should be longer due to Fernet overhead
assert len(encrypted_bytes) > len(test_password)
async def test_app_password_requires_encryption_key():
"""Test that app password operations require encryption key."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_no_key.db"
storage = RefreshTokenStorage(db_path=str(db_path), encryption_key=None)
await storage.initialize()
# Storing should fail without encryption key
with pytest.raises(RuntimeError, match="Encryption key not configured"):
await storage.store_app_password(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
# Getting should also fail without encryption key
with pytest.raises(RuntimeError, match="Encryption key not configured"):
await storage.get_app_password("testuser")
async def test_multiple_users_independence(temp_storage):
"""Test that different users maintain independent app passwords."""
users = ["alice", "bob", "charlie", "diana"]
# Store unique passwords for each user
for i, user in enumerate(users):
password = (
f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}-" * 4
+ f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}"
)
await temp_storage.store_app_password(user, password)
# Verify each user has their correct password
for user in users:
expected = (
f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}-" * 4
+ f"{user[0]}{user[0]}{user[0]}{user[0]}{user[0]}"
)
retrieved = await temp_storage.get_app_password(user)
assert retrieved == expected
# Delete one user's password
await temp_storage.delete_app_password("bob")
# Verify other users unchanged
for user in ["alice", "charlie", "diana"]:
retrieved = await temp_storage.get_app_password(user)
assert retrieved is not None
# Verify bob's password is gone
assert await temp_storage.get_app_password("bob") is None
async def test_app_password_with_special_characters(temp_storage):
"""Test storing passwords with various alphanumeric patterns."""
# Nextcloud app passwords use alphanumeric characters
passwords = [
"AAAAA-BBBBB-CCCCC-DDDDD-EEEEE", # uppercase
"aaaaa-bbbbb-ccccc-ddddd-eeeee", # lowercase
"12345-67890-12345-67890-12345", # numbers
"aB1cD-eF2gH-iJ3kL-mN4oP-qR5sT", # mixed
]
for i, password in enumerate(passwords):
user = f"user{i}"
await temp_storage.store_app_password(user, password)
retrieved = await temp_storage.get_app_password(user)
assert retrieved == password
async def test_decryption_with_wrong_key(encryption_key):
"""Test that decryption fails with wrong key."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_wrong_key.db"
# Store with original key
storage1 = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage1.initialize()
await storage1.store_app_password("testuser", "JHWzB-ZYgLZ-3qBDj-ZQe5o-LdKpB")
# Try to read with different key
wrong_key = Fernet.generate_key().decode()
storage2 = RefreshTokenStorage(db_path=str(db_path), encryption_key=wrong_key)
await storage2.initialize()
# Decryption should fail and return None (graceful handling)
retrieved = await storage2.get_app_password("testuser")
assert retrieved is None
@@ -0,0 +1,489 @@
"""
Unit tests for Management API app password endpoints.
Tests the REST API endpoints for multi-user BasicAuth mode app password management:
- POST /api/v1/users/{user_id}/app-password - Provision app password
- GET /api/v1/users/{user_id}/app-password - Check status
- DELETE /api/v1/users/{user_id}/app-password - Delete app password
"""
import base64
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from cryptography.fernet import Fernet
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
from nextcloud_mcp_server.api.management import (
delete_app_password,
get_app_password_status,
provision_app_password,
)
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
pytestmark = pytest.mark.unit
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
return Fernet.generate_key().decode()
@pytest.fixture
async def temp_storage(encryption_key):
"""Create temporary storage instance with encryption for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_management.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
yield storage
def create_basic_auth_header(username: str, password: str) -> str:
"""Create BasicAuth header value."""
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded}"
def create_test_app(storage):
"""Create a test Starlette app with the management endpoints."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
Route(
"/api/v1/users/{user_id}/app-password",
get_app_password_status,
methods=["GET"],
),
Route(
"/api/v1/users/{user_id}/app-password",
delete_app_password,
methods=["DELETE"],
),
]
)
app.state.storage = storage
return app
async def test_provision_app_password_missing_auth():
"""Test that missing auth returns 401."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
response = client.post("/api/v1/users/testuser/app-password")
assert response.status_code == 401
assert "Missing BasicAuth" in response.json()["error"]
async def test_provision_app_password_invalid_auth_format():
"""Test that invalid auth format returns 401."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
response = client.post(
"/api/v1/users/testuser/app-password",
headers={"Authorization": "Basic invalid-not-base64!!!"},
)
assert response.status_code == 401
assert "Invalid BasicAuth" in response.json()["error"]
async def test_provision_app_password_username_mismatch():
"""Test that username mismatch returns 403."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
# Try to provision for "testuser" but auth as "otheruser"
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 403
assert "does not match" in response.json()["error"]
async def test_provision_app_password_invalid_format():
"""Test that invalid app password format returns 400."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
# Use invalid password format (not xxxxx-xxxxx-xxxxx-xxxxx-xxxxx)
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header("testuser", "invalid-password")
},
)
assert response.status_code == 400
assert "Invalid app password format" in response.json()["error"]
async def test_provision_app_password_success(temp_storage, mocker):
"""Test successful app password provisioning."""
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client for Nextcloud validation
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"ocs": {"data": {"id": "testuser"}}}
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
# Create app with storage
app = create_test_app(temp_storage)
client = TestClient(app)
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "stored" in data["message"].lower()
# Verify password was stored
stored_password = await temp_storage.get_app_password("testuser")
assert stored_password == "aaaaa-bbbbb-ccccc-ddddd-eeeee"
async def test_provision_app_password_nextcloud_validation_fails(mocker):
"""Test that failed Nextcloud validation returns 401."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client to return 401
mock_response = MagicMock()
mock_response.status_code = 401
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
),
]
)
client = TestClient(app)
response = client.post(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 401
assert "Invalid app password" in response.json()["error"]
async def test_get_app_password_status_provisioned(temp_storage, mocker):
"""Test checking status when app password is provisioned."""
# Store an app password
await temp_storage.store_app_password("testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee")
app = create_test_app(temp_storage)
client = TestClient(app)
response = client.get(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["user_id"] == "testuser"
assert data["has_app_password"] is True
async def test_get_app_password_status_not_provisioned(temp_storage, mocker):
"""Test checking status when app password is not provisioned."""
app = create_test_app(temp_storage)
client = TestClient(app)
response = client.get(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["user_id"] == "testuser"
assert data["has_app_password"] is False
async def test_get_app_password_status_username_mismatch():
"""Test that username mismatch returns 403 for status check."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
get_app_password_status,
methods=["GET"],
),
]
)
client = TestClient(app)
response = client.get(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 403
async def test_delete_app_password_success(temp_storage, mocker):
"""Test successful app password deletion."""
# Store an app password
await temp_storage.store_app_password("testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee")
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client for Nextcloud validation
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = create_test_app(temp_storage)
client = TestClient(app)
response = client.delete(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "deleted" in data["message"].lower()
# Verify password was removed
stored_password = await temp_storage.get_app_password("testuser")
assert stored_password is None
async def test_delete_app_password_not_found(temp_storage, mocker):
"""Test deleting non-existent app password."""
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client for Nextcloud validation
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = create_test_app(temp_storage)
client = TestClient(app)
response = client.delete(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "no app password found" in data["message"].lower()
async def test_delete_app_password_invalid_credentials(mocker):
"""Test that invalid credentials returns 401 for deletion."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
return_value=MagicMock(nextcloud_host="http://localhost:8080"),
)
# Mock httpx client to return 401
mock_response = MagicMock()
mock_response.status_code = 401
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.management.httpx.AsyncClient",
return_value=mock_client,
)
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
delete_app_password,
methods=["DELETE"],
),
]
)
client = TestClient(app)
response = client.delete(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"testuser", "wrong-password-xxxxx"
)
},
)
assert response.status_code == 401
assert "Invalid credentials" in response.json()["error"]
async def test_delete_app_password_username_mismatch():
"""Test that username mismatch returns 403 for deletion."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/app-password",
delete_app_password,
methods=["DELETE"],
),
]
)
client = TestClient(app)
response = client.delete(
"/api/v1/users/testuser/app-password",
headers={
"Authorization": create_basic_auth_header(
"otheruser", "aaaaa-bbbbb-ccccc-ddddd-eeeee"
)
},
)
assert response.status_code == 403