Files
Chris Coutinho 8b5c2395b5 feat: add Docker Compose profiles and Login Flow v2 service
Add selective service startup via Docker Compose profiles so each MCP
deployment mode runs independently. Also add the new mcp-login-flow
service (port 8004) for Login Flow v2 authentication (ADR-022).

Profile assignments:
- single-user: mcp (port 8000)
- multi-user-basic: mcp-multi-user-basic (port 8003)
- oauth: mcp-oauth (port 8001)
- keycloak: keycloak + mcp-keycloak (port 8002)
- login-flow: mcp-login-flow (port 8004)

Infrastructure services (db, redis, app, recipes) always start.

Integration tests cover the full Login Flow v2 provisioning flow:
OAuth → browser login → app password → Nextcloud API access for
notes, calendar, contacts, files, deck, and cookbook operations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 20:33:54 +01:00

199 lines
6.6 KiB
Python

"""Unit tests for Login Flow v2 MCP auth tools.
Tests the auth tools logic with mocked storage and Login Flow client.
"""
import tempfile
from pathlib import Path
import pytest
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.models.auth import ALL_SUPPORTED_SCOPES
# Module-level marker: applies the ``unit`` mark to every test in this file.
pytestmark = pytest.mark.unit
@pytest.fixture
def encryption_key():
    """Provide a freshly generated Fernet key, decoded to ``str``, for tests."""
    raw_key = Fernet.generate_key()
    return raw_key.decode()
@pytest.fixture
async def temp_storage(encryption_key):
    """Yield an initialized ``RefreshTokenStorage`` backed by a throwaway database.

    The database file is placed inside a temporary directory; that directory is
    cleaned up when the fixture resumes after the ``yield`` and leaves the
    ``with`` block.
    """
    with tempfile.TemporaryDirectory() as workdir:
        database_file = Path(workdir) / "test_auth_tools.db"
        store = RefreshTokenStorage(
            db_path=str(database_file),
            encryption_key=encryption_key,
        )
        await store.initialize()
        yield store
async def test_store_app_password_with_scopes(temp_storage):
    """A scoped app password round-trips through storage with its metadata."""
    secret = "aaaaa-bbbbb-ccccc-ddddd-eeeee"
    granted = ("notes:read", "notes:write")

    await temp_storage.store_app_password_with_scopes(
        user_id="alice",
        app_password=secret,
        scopes=list(granted),
        username="alice_nc",
    )

    record = await temp_storage.get_app_password_with_scopes("alice")

    assert record is not None
    assert record["app_password"] == secret
    assert record["scopes"] == list(granted)
    assert record["username"] == "alice_nc"
    # Neither timestamp was supplied above, yet both are expected to be set.
    assert record["created_at"] is not None
    assert record["updated_at"] is not None
async def test_store_app_password_null_scopes(temp_storage):
    """Storing with ``scopes=None`` and no username reads both back as ``None``."""
    await temp_storage.store_app_password_with_scopes(
        user_id="bob", app_password="fffff-ggggg-hhhhh-iiiii-jjjjj", scopes=None
    )

    record = await temp_storage.get_app_password_with_scopes("bob")

    assert record is not None
    # NULL scopes are the stored representation of "all scopes allowed".
    stored_scopes = record["scopes"]
    assert stored_scopes is None
    # No username was passed to the store call, so none should come back.
    stored_username = record["username"]
    assert stored_username is None
async def test_store_app_password_with_scopes_replaces(temp_storage):
    """Test that storing a second time for the same user replaces the record.

    Two writes are issued for ``alice``; the lookup afterwards must reflect
    only the password and scopes from the second write.
    """
    await temp_storage.store_app_password_with_scopes(
        user_id="alice",
        app_password="aaaaa-bbbbb-ccccc-ddddd-eeeee",
        scopes=["notes:read"],
    )
    await temp_storage.store_app_password_with_scopes(
        user_id="alice",
        app_password="xxxxx-yyyyy-zzzzz-aaaaa-bbbbb",
        scopes=["notes:read", "calendar:read"],
        username="alice_nc",
    )
    data = await temp_storage.get_app_password_with_scopes("alice")
    # Guard before subscripting, as the sibling tests do: a missing record then
    # fails as a clear assertion instead of a TypeError on ``None[...]``.
    assert data is not None
    assert data["app_password"] == "xxxxx-yyyyy-zzzzz-aaaaa-bbbbb"
    assert data["scopes"] == ["notes:read", "calendar:read"]
async def test_get_app_password_with_scopes_nonexistent(temp_storage):
    """Looking up a user with no stored app password yields ``None``."""
    assert await temp_storage.get_app_password_with_scopes("nonexistent") is None
# ── Login Flow Session Tests ──
async def test_store_and_get_login_flow_session(temp_storage):
    """A stored login flow session can be read back with all of its fields."""
    token = "secret-poll-token"
    endpoint = "https://cloud.example.com/login/v2/poll"

    await temp_storage.store_login_flow_session(
        user_id="alice",
        poll_token=token,
        poll_endpoint=endpoint,
        requested_scopes=["notes:read", "notes:write"],
    )

    pending = await temp_storage.get_login_flow_session("alice")

    assert pending is not None
    assert pending["poll_token"] == token
    assert pending["poll_endpoint"] == endpoint
    assert pending["requested_scopes"] == ["notes:read", "notes:write"]
    # Neither timestamp was supplied above, yet both are expected to be set.
    assert pending["created_at"] is not None
    assert pending["expires_at"] is not None
async def test_get_login_flow_session_nonexistent(temp_storage):
    """A user without a pending login flow has no session to return."""
    assert await temp_storage.get_login_flow_session("nonexistent") is None
async def test_get_login_flow_session_expired(temp_storage):
    """A session stored with a long-past expiry is not returned on lookup."""
    long_past_expiry = 1  # Expired long ago

    await temp_storage.store_login_flow_session(
        user_id="alice",
        poll_token="expired-token",
        poll_endpoint="https://cloud.example.com/login/v2/poll",
        expires_at=long_past_expiry,
    )

    lookup = await temp_storage.get_login_flow_session("alice")
    assert lookup is None
async def test_delete_login_flow_session(temp_storage):
    """Deleting an existing session reports ``True`` and removes the session."""
    await temp_storage.store_login_flow_session(
        user_id="alice", poll_token="token", poll_endpoint="https://cloud.example.com/poll"
    )

    was_removed = await temp_storage.delete_login_flow_session("alice")
    assert was_removed is True

    # A follow-up lookup must come back empty now that the session is deleted.
    assert await temp_storage.get_login_flow_session("alice") is None
async def test_delete_login_flow_session_nonexistent(temp_storage):
    """Deleting a session that was never stored reports ``False``."""
    was_removed = await temp_storage.delete_login_flow_session("nonexistent")
    assert was_removed is False
async def test_delete_expired_login_flow_sessions(temp_storage):
    """Cleanup removes only the expired sessions and reports how many it removed."""
    shared_endpoint = "https://cloud.example.com/poll"

    # Two sessions whose expiry values lie far in the past.
    already_expired = (
        ("expired1", "t1", 1),
        ("expired2", "t2", 2),
    )
    for user, token, expiry in already_expired:
        await temp_storage.store_login_flow_session(
            user_id=user,
            poll_token=token,
            poll_endpoint=shared_endpoint,
            expires_at=expiry,
        )

    # One session without an explicit expiry, so the storage default applies
    # (default expiry = 20 minutes from now).
    await temp_storage.store_login_flow_session(
        user_id="valid",
        poll_token="t3",
        poll_endpoint=shared_endpoint,
    )

    removed = await temp_storage.delete_expired_login_flow_sessions()
    assert removed == 2

    # The non-expired session must survive the cleanup.
    survivor = await temp_storage.get_login_flow_session("valid")
    assert survivor is not None
# ── Response Model Tests ──
def test_all_supported_scopes():
    """``ALL_SUPPORTED_SCOPES`` lists the core scopes with balanced read/write counts."""
    required = (
        "notes:read",
        "notes:write",
        "calendar:read",
        "files:read",
        "deck:read",
    )
    for scope_name in required:
        assert scope_name in ALL_SUPPORTED_SCOPES

    # Scopes should come in read/write pairs; this compares the counts only.
    read_total = sum(1 for scope in ALL_SUPPORTED_SCOPES if scope.endswith(":read"))
    write_total = sum(1 for scope in ALL_SUPPORTED_SCOPES if scope.endswith(":write"))
    assert read_total == write_total