Merge pull request #589 from cbcoutinho/feat/docker-compose-profiles-login-flow

feat: Docker Compose profiles and Login Flow v2 integration tests
This commit is contained in:
Chris Coutinho
2026-03-03 09:41:48 +01:00
committed by GitHub
58 changed files with 5251 additions and 389 deletions
-7
View File
@@ -1109,13 +1109,6 @@ def oauth_callback_server():
The server automatically shuts down when the fixture is torn down.
"""
# Skip OAuth tests in GitHub Actions - Playwright browser automation
# has issues with localhost callback server in CI environment
# if os.getenv("GITHUB_ACTIONS"):
# pytest.skip(
# "OAuth tests with browser automation not supported in GitHub Actions CI"
# )
# Use a dict to store auth codes keyed by state parameter
# This allows multiple concurrent OAuth flows
auth_states = {}
@@ -27,7 +27,7 @@ from playwright.async_api import Page
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
async def login_to_nextcloud(page: Page, username: str, password: str):
@@ -899,7 +899,7 @@ def clear_stale_test_state(clear_preferences: bool = False) -> None:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_multi_user_astrolabe_background_sync_enablement(
browser,
nc_client,
@@ -1246,7 +1246,7 @@ async def verify_app_password_deleted(username: str) -> bool:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_revoke_background_sync_access(
browser,
nc_client,
@@ -35,7 +35,7 @@ from tests.integration.test_astrolabe_multi_user_background_sync import (
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
async def wait_for_vector_sync(
@@ -101,7 +101,7 @@ async def navigate_to_astrolabe_main(page: Page):
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
@pytest.mark.timeout(
300
) # 5 minutes - this test involves OAuth, app password, and vector sync
@@ -30,7 +30,7 @@ import anyio
import pytest
from playwright.async_api import Page
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
logger = logging.getLogger(__name__)
@@ -334,7 +334,7 @@ def delete_user_credentials(username: str) -> bool:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_app_password_storage_and_cleanup(
browser,
nc_client,
@@ -440,7 +440,7 @@ async def test_app_password_storage_and_cleanup(
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_credential_isolation_between_users(
browser,
nc_client,
@@ -549,7 +549,7 @@ async def test_credential_isolation_between_users(
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_credential_revoke_and_reprovision(
browser,
nc_client,
@@ -10,6 +10,7 @@ import pytest
@pytest.mark.integration
@pytest.mark.multi_user_basic
async def test_basic_auth_pass_through_notes_search(nc_mcp_basic_auth_client):
"""Test BasicAuth pass-through with notes search tool."""
# Call tool - BasicAuth header is set at connection level by fixture
@@ -27,6 +28,7 @@ async def test_basic_auth_pass_through_notes_search(nc_mcp_basic_auth_client):
@pytest.mark.integration
@pytest.mark.multi_user_basic
async def test_basic_auth_pass_through_notes_create(nc_mcp_basic_auth_client):
"""Test BasicAuth pass-through with notes create tool."""
# Create a note using BasicAuth
@@ -47,6 +49,7 @@ async def test_basic_auth_pass_through_notes_create(nc_mcp_basic_auth_client):
@pytest.mark.integration
@pytest.mark.multi_user_basic
async def test_basic_auth_pass_through_get_note(nc_mcp_basic_auth_client):
"""Test BasicAuth pass-through with get note tool."""
# First create a note to get
+2 -2
View File
@@ -34,7 +34,7 @@ async def test_query_idp_userinfo_success(mocker):
mock_client.__aexit__.return_value = None
mocker.patch(
"nextcloud_mcp_server.auth.userinfo_routes.httpx.AsyncClient",
"nextcloud_mcp_server.auth.userinfo_routes.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -59,7 +59,7 @@ async def test_query_idp_userinfo_failure(mocker):
mock_client.__aexit__.return_value = None
mocker.patch(
"nextcloud_mcp_server.auth.userinfo_routes.httpx.AsyncClient",
"nextcloud_mcp_server.auth.userinfo_routes.nextcloud_httpx_client",
return_value=mock_client,
)
View File
+417
View File
@@ -0,0 +1,417 @@
"""Fixtures for Login Flow v2 integration tests.
These fixtures handle the complete provisioning flow:
1. Create OAuth client for the login-flow MCP server (port 8004)
2. Obtain OAuth token via Playwright browser automation
3. Connect MCP client session with OAuth token
4. Complete Login Flow v2 provisioning (browser login → app password)
5. Run MCP tools against the provisioned session
"""
import json
import logging
import os
import secrets
import time
from typing import Any, AsyncGenerator
from urllib.parse import quote, urlparse, urlunparse
import anyio
import httpx
import pytest
from mcp import ClientSession
from mcp.types import ElicitRequestParams, ElicitResult
from tests.conftest import (
DEFAULT_FULL_SCOPES,
_handle_oauth_consent_screen,
create_mcp_client_session,
get_mcp_server_resource_metadata,
)
logger = logging.getLogger(__name__)
LOGIN_FLOW_MCP_URL = "http://localhost:8004/mcp"
LOGIN_FLOW_MCP_BASE_URL = "http://localhost:8004"
@pytest.fixture(scope="session")
async def login_flow_oauth_client_credentials(anyio_backend, oauth_callback_server):
"""Create OAuth client credentials for the login-flow MCP server (port 8004).
Uses Dynamic Client Registration against Nextcloud's OIDC endpoint.
The client only needs openid/profile/email scopes since Login Flow v2
uses app passwords for Nextcloud API access, not OAuth tokens.
"""
from nextcloud_mcp_server.auth.client_registration import (
delete_client,
register_client,
)
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Login Flow tests require NEXTCLOUD_HOST")
auth_states, callback_url = oauth_callback_server
logger.info("Setting up OAuth client for login-flow MCP server (port 8004)...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await http_client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
token_endpoint = oidc_config["token_endpoint"]
authorization_endpoint = oidc_config["authorization_endpoint"]
registration_endpoint = oidc_config["registration_endpoint"]
# Login flow only needs identity scopes for the MCP session;
# we also request resource scopes so the token passes the MCP server's
# scope validation (the server advertises these scopes).
client_info = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="Pytest - Login Flow Test Client",
redirect_uris=[callback_url],
scopes=DEFAULT_FULL_SCOPES,
token_type="Bearer",
)
logger.info(f"Login Flow OAuth client ready: {client_info.client_id[:16]}...")
yield (
client_info.client_id,
client_info.client_secret,
callback_url,
token_endpoint,
authorization_endpoint,
)
# Cleanup
try:
await delete_client(
nextcloud_url=nextcloud_host,
client_id=client_info.client_id,
registration_access_token=client_info.registration_access_token,
client_secret=client_info.client_secret,
registration_client_uri=client_info.registration_client_uri,
)
logger.info(
f"Cleaned up Login Flow OAuth client: {client_info.client_id[:16]}..."
)
except Exception as e:
logger.warning(f"Failed to clean up Login Flow OAuth client: {e}")
@pytest.fixture(scope="session")
async def login_flow_oauth_token(
anyio_backend, browser, login_flow_oauth_client_credentials, oauth_callback_server
) -> str:
"""Obtain OAuth token for the login-flow MCP server.
Uses Playwright browser automation to complete the OAuth flow against
Nextcloud, obtaining a token suitable for the port 8004 MCP session.
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
username = os.getenv("NEXTCLOUD_USERNAME")
password = os.getenv("NEXTCLOUD_PASSWORD")
if not all([nextcloud_host, username, password]):
pytest.skip(
"Login Flow OAuth requires NEXTCLOUD_HOST, NEXTCLOUD_USERNAME, NEXTCLOUD_PASSWORD"
)
auth_states, _ = oauth_callback_server
client_id, client_secret, callback_url, token_endpoint, authorization_endpoint = (
login_flow_oauth_client_credentials
)
# Fetch resource metadata from port 8004 for audience
try:
resource_metadata = await get_mcp_server_resource_metadata(
LOGIN_FLOW_MCP_BASE_URL
)
resource_id = resource_metadata.get("resource")
except Exception as e:
logger.warning(f"Failed to fetch resource metadata from port 8004: {e}")
resource_id = None
state = secrets.token_urlsafe(32)
scopes_encoded = quote(DEFAULT_FULL_SCOPES, safe="")
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={quote(callback_url, safe='')}&"
f"state={state}&"
f"scope={scopes_encoded}"
)
if resource_id:
auth_url += f"&resource={quote(resource_id, safe='')}"
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
await page.goto(auth_url, wait_until="networkidle", timeout=60000)
current_url = page.url
if "/login" in current_url or "/index.php/login" in current_url:
await page.wait_for_selector('input[name="user"]', timeout=10000)
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
await page.click('button[type="submit"]')
await page.wait_for_load_state("networkidle", timeout=60000)
try:
await _handle_oauth_consent_screen(page, username)
except Exception:
pass
start_time = time.time()
while state not in auth_states:
if time.time() - start_time > 30:
raise TimeoutError("Timeout waiting for OAuth callback")
await anyio.sleep(0.5)
auth_code = auth_states[state]
finally:
await context.close()
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": callback_url,
"client_id": client_id,
"client_secret": client_secret,
},
)
token_response.raise_for_status()
token_data = token_response.json()
access_token = token_data["access_token"]
logger.info("Successfully obtained OAuth token for login-flow MCP server")
return access_token
def _rewrite_login_flow_url(login_url: str) -> str:
"""Rewrite internal Docker URLs to host-accessible URLs.
The MCP server runs inside Docker with NEXTCLOUD_HOST=http://app:80,
so Login Flow v2 URLs use the internal hostname. Playwright runs on
the host and needs localhost:8080 instead.
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
target = urlparse(nextcloud_host)
parsed = urlparse(login_url)
if parsed.hostname == "app":
parsed = parsed._replace(scheme=target.scheme, netloc=target.netloc)
return urlunparse(parsed)
async def _complete_login_flow_v2(browser, login_url: str) -> None:
"""Complete Nextcloud Login Flow v2 in a browser.
The full Nextcloud Login Flow v2 has these steps:
1. "Connect to your account" page → click "Log in" button
2. Login form → fill username/password, submit
(if already logged in via session cookie, this step is skipped)
3. "Account access" grant page → click "Grant access" button
4. Password confirmation dialog → enter password, click "Confirm"
5. "Account connected" success page
Args:
browser: Playwright browser instance
login_url: URL from Login Flow v2 initiation (e.g., /login/v2/flow/...)
"""
username = os.getenv("NEXTCLOUD_USERNAME", "admin")
password = os.getenv("NEXTCLOUD_PASSWORD", "admin")
# Rewrite internal Docker URL to host-accessible URL
login_url = _rewrite_login_flow_url(login_url)
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
logger.info(f"Opening Login Flow v2 URL: {login_url[:80]}...")
await page.goto(login_url, wait_until="networkidle", timeout=60000)
logger.info(f"Step 1 - Current URL: {page.url}")
# Step 1: "Connect to your account" page - click "Log in"
login_btn = page.get_by_role("button", name="Log in")
try:
await login_btn.wait_for(timeout=10000)
await login_btn.click()
logger.info("Clicked 'Log in' on Connect page")
await page.wait_for_load_state("networkidle", timeout=30000)
except Exception:
logger.info("No 'Log in' button - may already be on login/grant page")
logger.info(f"Step 2 - Current URL: {page.url}")
# Step 2: Login form (only if not already logged in)
# If the user has an active session, they skip straight to the grant page.
user_field = page.locator('input[name="user"]')
if await user_field.count() > 0:
logger.info("Login form detected, filling credentials...")
await user_field.fill(username)
await page.locator('input[name="password"]').fill(password)
await page.get_by_role("button", name="Log in", exact=True).click()
await page.wait_for_load_state("networkidle", timeout=60000)
logger.info(f"After login: {page.url}")
else:
logger.info("No login form - already logged in via session")
# Step 3: "Account access" grant page - click "Grant access"
grant_btn = page.get_by_role("button", name="Grant access")
try:
await grant_btn.wait_for(timeout=15000)
await grant_btn.click()
logger.info("Clicked 'Grant access'")
except Exception as e:
logger.warning(f"No Grant access button: {e}")
await page.screenshot(path="/tmp/login_flow_no_grant.png")
# Step 4: Password confirmation dialog
# Nextcloud shows "Authentication required" dialog after clicking Grant access
confirm_password = page.get_by_role("dialog").get_by_role(
"textbox", name="Password"
)
try:
await confirm_password.wait_for(timeout=10000)
logger.info("Password confirmation dialog detected")
await confirm_password.fill(password)
# Wait for Confirm button to become enabled after filling password
confirm_btn = page.get_by_role("dialog").get_by_role(
"button", name="Confirm"
)
await confirm_btn.wait_for(timeout=5000)
await confirm_btn.click()
logger.info("Clicked 'Confirm' in password dialog")
except Exception:
logger.info(
"No password confirmation dialog (may have been auto-confirmed)"
)
# Step 5: Wait for "Account connected" success page
try:
await page.get_by_text("Account connected").wait_for(timeout=15000)
logger.info("Login Flow v2 completed: Account connected!")
except Exception:
# The grant may have completed without the success page being visible
await page.wait_for_load_state("networkidle", timeout=10000)
logger.info(f"Login Flow v2 done. Final URL: {page.url}")
finally:
await context.close()
@pytest.fixture(scope="session")
async def nc_mcp_login_flow_client(
anyio_backend,
login_flow_oauth_token: str,
browser,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client session connected to the login-flow server (port 8004).
This fixture:
1. Connects to the MCP server with an OAuth token
2. Calls nc_auth_provision_access to start Login Flow v2
3. Completes the browser login to get an app password
4. Calls nc_auth_check_status to finalize provisioning
5. Yields the provisioned MCP client session
All subsequent tool calls will use the stored app password.
"""
# Create an elicitation callback that extracts the login URL
# and completes the Login Flow v2 in the browser
login_url_holder: dict[str, str] = {}
async def elicitation_callback(
context: Any,
params: ElicitRequestParams,
) -> ElicitResult:
"""Handle elicitation from nc_auth_provision_access.
Extracts the login URL from the elicitation message and
completes the Login Flow v2 browser login.
"""
message = params.message
logger.info(f"Elicitation received: {message[:100]}...")
# Extract login URL from elicitation message
for line in message.split("\n"):
stripped = line.strip()
if stripped.startswith("http") and "/login/v2/" in stripped:
login_url_holder["url"] = stripped
logger.info(f"Extracted login URL: {stripped[:80]}...")
break
if "url" in login_url_holder:
# Complete the Login Flow v2 in the browser
await _complete_login_flow_v2(browser, login_url_holder["url"])
# Return acceptance
return ElicitResult(
action="accept",
content={"acknowledged": True},
)
async for session in create_mcp_client_session(
url=LOGIN_FLOW_MCP_URL,
token=login_flow_oauth_token,
client_name="Login Flow MCP",
elicitation_callback=elicitation_callback,
):
# Step 1: Provision access via Login Flow v2
logger.info("Starting Login Flow v2 provisioning...")
provision_result = await session.call_tool(
"nc_auth_provision_access",
{"scopes": None}, # Request all scopes
)
provision_data = json.loads(provision_result.content[0].text)
logger.info(f"Provision result: {provision_data.get('status')}")
# If elicitation didn't fire (client doesn't support it),
# extract URL from the response and complete flow manually
if provision_data.get("status") == "login_required":
login_url = provision_data.get("login_url")
if login_url and "url" not in login_url_holder:
logger.info("Completing Login Flow v2 from response URL...")
await _complete_login_flow_v2(browser, login_url)
# Step 2: Poll for completion
logger.info("Polling Login Flow v2 status...")
max_attempts = 15
for attempt in range(max_attempts):
status_result = await session.call_tool("nc_auth_check_status", {})
status_data = json.loads(status_result.content[0].text)
status = status_data.get("status")
logger.info(f"Status check {attempt + 1}/{max_attempts}: {status}")
if status == "provisioned":
logger.info(
f"Login Flow v2 provisioned! Username: {status_data.get('username')}"
)
break
if status in ("not_initiated", "error"):
raise RuntimeError(
f"Login Flow v2 failed: {status_data.get('message')}"
)
await anyio.sleep(2)
else:
raise TimeoutError(
f"Login Flow v2 did not complete after {max_attempts} attempts"
)
yield session
@@ -0,0 +1,652 @@
"""Integration tests for Login Flow v2 (ADR-022).
Tests the complete Login Flow v2 provisioning and verifies all MCP tools
work through the stored app password. This validates the end-to-end flow:
OAuth token (MCP session) → Login Flow v2 (browser) → App password → Nextcloud API
Test categories:
1. Auth tools: provision, check status, scope management
2. Notes: CRUD operations
3. Calendar: events and todos
4. Contacts: address book and contact operations
5. Files (WebDAV): directory listing, file operations
6. Deck: board management
7. Cookbook: recipe operations
8. Tables: table operations
"""
import json
import logging
import uuid
import pytest
from mcp import ClientSession
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.login_flow, pytest.mark.integration]
# ---------------------------------------------------------------------------
# Auth tools
# ---------------------------------------------------------------------------
class TestLoginFlowAuthTools:
"""Test Login Flow v2 auth tools."""
async def test_check_status_provisioned(
self, nc_mcp_login_flow_client: ClientSession
):
"""After fixture setup, status should be 'provisioned'."""
result = await nc_mcp_login_flow_client.call_tool("nc_auth_check_status", {})
data = json.loads(result.content[0].text)
assert data["status"] == "provisioned"
assert data["username"] is not None
assert data["scopes"] is not None
logger.info(f"Provisioned as: {data['username']}, scopes: {data['scopes']}")
async def test_provision_access_already_provisioned(
self, nc_mcp_login_flow_client: ClientSession
):
"""Calling provision when already provisioned returns 'already_provisioned'."""
result = await nc_mcp_login_flow_client.call_tool(
"nc_auth_provision_access", {}
)
data = json.loads(result.content[0].text)
assert data["status"] == "already_provisioned"
assert "already provisioned" in data["message"].lower()
async def test_list_tools_includes_auth_tools(
self, nc_mcp_login_flow_client: ClientSession
):
"""Login Flow server should expose auth tools."""
tools = await nc_mcp_login_flow_client.list_tools()
tool_names = [t.name for t in tools.tools]
assert "nc_auth_provision_access" in tool_names
assert "nc_auth_check_status" in tool_names
assert "nc_auth_update_scopes" in tool_names
# ---------------------------------------------------------------------------
# Notes
# ---------------------------------------------------------------------------
class TestLoginFlowNotes:
"""Test Notes CRUD via Login Flow v2 app password."""
async def test_notes_crud(self, nc_mcp_login_flow_client: ClientSession):
"""Full Notes CRUD: create → read → update → search → delete."""
suffix = uuid.uuid4().hex[:8]
title = f"LoginFlow Test {suffix}"
content = f"Content for {suffix}"
category = "LoginFlowTest"
# Create
create_result = await nc_mcp_login_flow_client.call_tool(
"nc_notes_create_note",
{"title": title, "content": content, "category": category},
)
assert create_result.isError is False, (
f"Create failed: {create_result.content[0].text}"
)
note = json.loads(create_result.content[0].text)
note_id = note["id"]
etag = note["etag"]
logger.info(f"Created note {note_id}")
try:
# Read
read_result = await nc_mcp_login_flow_client.call_tool(
"nc_notes_get_note", {"note_id": note_id}
)
assert read_result.isError is False
read_data = json.loads(read_result.content[0].text)
assert read_data["title"] == title
assert read_data["content"] == content
# Update (title, content, category are all required params)
updated_content = f"Updated content for {suffix}"
update_result = await nc_mcp_login_flow_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"title": title,
"content": updated_content,
"category": category,
"etag": etag,
},
)
assert update_result.isError is False, (
f"Update failed: {update_result.content[0].text}"
)
updated = json.loads(update_result.content[0].text)
# UpdateNoteResponse returns id, title, category, etag (no content)
assert updated["title"] == title
assert "etag" in updated
# Append
append_result = await nc_mcp_login_flow_client.call_tool(
"nc_notes_append_content",
{"note_id": note_id, "content": "\n\nAppended text"},
)
assert append_result.isError is False
# Search
search_result = await nc_mcp_login_flow_client.call_tool(
"nc_notes_search_notes", {"query": suffix}
)
assert search_result.isError is False
search_data = json.loads(search_result.content[0].text)
assert search_data["total_found"] >= 1
finally:
# Delete
await nc_mcp_login_flow_client.call_tool(
"nc_notes_delete_note", {"note_id": note_id}
)
logger.info(f"Deleted note {note_id}")
# ---------------------------------------------------------------------------
# Calendar Events
# ---------------------------------------------------------------------------
class TestLoginFlowCalendarEvents:
"""Test Calendar event operations via Login Flow v2."""
async def test_calendar_events_workflow(
self, nc_mcp_login_flow_client: ClientSession
):
"""List calendars → create event → get event → delete event."""
# List calendars
cal_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_list_calendars", {}
)
assert cal_result.isError is False
cal_data = json.loads(cal_result.content[0].text)
calendars = cal_data.get("calendars", [])
assert len(calendars) > 0
calendar_name = calendars[0].get("name", "personal")
logger.info(f"Using calendar: {calendar_name}")
suffix = uuid.uuid4().hex[:8]
event_title = f"LoginFlow Event {suffix}"
# Create event (uses start_datetime/end_datetime)
create_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_create_event",
{
"calendar_name": calendar_name,
"title": event_title,
"start_datetime": "2026-03-01T10:00:00",
"end_datetime": "2026-03-01T11:00:00",
"description": f"Test event for login flow {suffix}",
},
)
assert create_result.isError is False, (
f"Create event failed: {create_result.content[0].text}"
)
event_data = json.loads(create_result.content[0].text)
event_uid = event_data.get("uid") or event_data.get("event_uid")
logger.info(f"Created event: {event_uid}")
try:
# Get event
get_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_get_event",
{"calendar_name": calendar_name, "event_uid": event_uid},
)
assert get_result.isError is False
finally:
# Delete event
await nc_mcp_login_flow_client.call_tool(
"nc_calendar_delete_event",
{"calendar_name": calendar_name, "event_uid": event_uid},
)
logger.info(f"Deleted event {event_uid}")
# ---------------------------------------------------------------------------
# Calendar Todos
# ---------------------------------------------------------------------------
class TestLoginFlowCalendarTodos:
"""Test Calendar todo (VTODO) operations via Login Flow v2."""
async def test_todo_workflow(self, nc_mcp_login_flow_client: ClientSession):
"""Create todo → list todos → update todo → delete todo."""
cal_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_list_calendars", {}
)
cal_data = json.loads(cal_result.content[0].text)
calendars = cal_data.get("calendars", [])
calendar_name = calendars[0].get("name", "personal")
suffix = uuid.uuid4().hex[:8]
todo_title = f"LoginFlow Todo {suffix}"
# Create todo (uses 'summary', not 'title')
create_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_create_todo",
{
"calendar_name": calendar_name,
"summary": todo_title,
"description": f"Test todo {suffix}",
},
)
if create_result.isError:
error_text = create_result.content[0].text
if "AuthorizationError" in error_text:
pytest.skip(
f"Calendar '{calendar_name}' does not support VTODO: {error_text}"
)
raise AssertionError(f"Create todo failed: {error_text}")
todo_data = json.loads(create_result.content[0].text)
todo_uid = todo_data.get("uid") or todo_data.get("todo_uid")
logger.info(f"Created todo: {todo_uid}")
try:
# List todos
list_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_list_todos",
{"calendar_name": calendar_name},
)
assert list_result.isError is False
# Update todo
update_result = await nc_mcp_login_flow_client.call_tool(
"nc_calendar_update_todo",
{
"calendar_name": calendar_name,
"todo_uid": todo_uid,
"percent_complete": 50,
},
)
assert update_result.isError is False
finally:
await nc_mcp_login_flow_client.call_tool(
"nc_calendar_delete_todo",
{"calendar_name": calendar_name, "todo_uid": todo_uid},
)
logger.info(f"Deleted todo {todo_uid}")
# ---------------------------------------------------------------------------
# Contacts
# ---------------------------------------------------------------------------
class TestLoginFlowContacts:
"""Test Contacts (CardDAV) operations via Login Flow v2."""
async def test_contacts_workflow(self, nc_mcp_login_flow_client: ClientSession):
"""Create addressbook → create contact → list contacts → cleanup."""
suffix = uuid.uuid4().hex[:8]
ab_name = f"lf-test-{suffix}"
contact_uid = f"login-flow-test-{suffix}"
contact_fn = f"LoginFlow Contact {suffix}"
# List address books (basic smoke test)
ab_result = await nc_mcp_login_flow_client.call_tool(
"nc_contacts_list_addressbooks", {}
)
assert ab_result.isError is False
# Create a temporary address book for isolation
create_ab_result = await nc_mcp_login_flow_client.call_tool(
"nc_contacts_create_addressbook",
{"name": ab_name, "display_name": f"Login Flow Test {suffix}"},
)
assert create_ab_result.isError is False, (
f"Create addressbook failed: {create_ab_result.content[0].text}"
)
logger.info(f"Created address book: {ab_name}")
try:
# Create contact (requires addressbook, uid, contact_data dict)
create_result = await nc_mcp_login_flow_client.call_tool(
"nc_contacts_create_contact",
{
"addressbook": ab_name,
"uid": contact_uid,
"contact_data": {
"fn": contact_fn,
"email": f"test-{suffix}@example.com",
},
},
)
assert create_result.isError is False, (
f"Create contact failed: {create_result.content[0].text}"
)
logger.info(f"Created contact: {contact_uid}")
# List contacts in our clean addressbook
# Note: may fail due to server-side Pydantic bug where ContactField.value
# is a dict (structured email) but model expects string
list_result = await nc_mcp_login_flow_client.call_tool(
"nc_contacts_list_contacts",
{"addressbook": ab_name},
)
if list_result.isError:
error_text = list_result.content[0].text
if "ContactField" in error_text:
logger.warning(
f"Known server bug: ContactField validation: {error_text}"
)
else:
raise AssertionError(f"List contacts failed: {error_text}")
else:
list_data = json.loads(list_result.content[0].text)
contacts = list_data.get("contacts", [])
contact_uids = [c.get("uid", "") for c in contacts]
assert contact_uid in contact_uids, (
f"Created contact {contact_uid} not found in list"
)
# Delete contact
await nc_mcp_login_flow_client.call_tool(
"nc_contacts_delete_contact",
{"addressbook": ab_name, "uid": contact_uid},
)
logger.info(f"Deleted contact {contact_uid}")
finally:
# Always clean up the temporary address book
await nc_mcp_login_flow_client.call_tool(
"nc_contacts_delete_addressbook",
{"name": ab_name},
)
logger.info(f"Deleted address book {ab_name}")
# ---------------------------------------------------------------------------
# Files (WebDAV)
# ---------------------------------------------------------------------------
class TestLoginFlowFiles:
"""Test WebDAV file operations via Login Flow v2."""
async def test_file_operations(self, nc_mcp_login_flow_client: ClientSession):
"""Create dir → write file → read file → list dir → delete."""
suffix = uuid.uuid4().hex[:8]
dir_path = f"/LoginFlowTest_{suffix}"
file_path = f"{dir_path}/test_file.txt"
file_content = f"Hello from Login Flow v2 test {suffix}"
# Create directory
mkdir_result = await nc_mcp_login_flow_client.call_tool(
"nc_webdav_create_directory", {"path": dir_path}
)
assert mkdir_result.isError is False, (
f"Create dir failed: {mkdir_result.content[0].text}"
)
logger.info(f"Created directory: {dir_path}")
try:
# Write file
write_result = await nc_mcp_login_flow_client.call_tool(
"nc_webdav_write_file",
{"path": file_path, "content": file_content},
)
assert write_result.isError is False
# Read file
read_result = await nc_mcp_login_flow_client.call_tool(
"nc_webdav_read_file", {"path": file_path}
)
assert read_result.isError is False
read_data = json.loads(read_result.content[0].text)
assert file_content in read_data.get("content", "")
# List directory (response uses 'files' field, each with 'name')
list_result = await nc_mcp_login_flow_client.call_tool(
"nc_webdav_list_directory", {"path": dir_path}
)
assert list_result.isError is False
list_data = json.loads(list_result.content[0].text)
files = list_data.get("files", [])
file_names = [f.get("name", "") for f in files]
assert "test_file.txt" in file_names
# Find files by name (uses 'pattern' and 'scope')
search_result = await nc_mcp_login_flow_client.call_tool(
"nc_webdav_find_by_name",
{"pattern": "test_file.txt", "scope": dir_path},
)
assert search_result.isError is False
finally:
# Clean up: delete file then directory
await nc_mcp_login_flow_client.call_tool(
"nc_webdav_delete_resource", {"path": file_path}
)
await nc_mcp_login_flow_client.call_tool(
"nc_webdav_delete_resource", {"path": dir_path}
)
logger.info(f"Cleaned up {dir_path}")
# ---------------------------------------------------------------------------
# Deck
# ---------------------------------------------------------------------------
class TestLoginFlowDeck:
"""Test Deck (Kanban) operations via Login Flow v2."""
async def test_deck_board_workflow(self, nc_mcp_login_flow_client: ClientSession):
"""Create board → list boards → get board details."""
import os
import httpx
suffix = uuid.uuid4().hex[:8]
board_title = f"LoginFlow Board {suffix}"
board_id = None
try:
# Create board (requires title and color)
create_result = await nc_mcp_login_flow_client.call_tool(
"deck_create_board", {"title": board_title, "color": "0076D1"}
)
assert create_result.isError is False, (
f"Create board failed: {create_result.content[0].text}"
)
board_data = json.loads(create_result.content[0].text)
board_id = board_data.get("id") or board_data.get("board_id")
logger.info(f"Created board: {board_id}")
# List boards (tool name is deck_get_boards)
list_result = await nc_mcp_login_flow_client.call_tool(
"deck_get_boards", {}
)
assert list_result.isError is False
boards_data = json.loads(list_result.content[0].text)
boards = boards_data.get("boards", [])
board_ids = [b.get("id") for b in boards]
assert board_id in board_ids
# Get board details
detail_result = await nc_mcp_login_flow_client.call_tool(
"deck_get_board", {"board_id": board_id}
)
assert detail_result.isError is False
finally:
# Clean up board via Deck REST API (no MCP delete_board tool exists)
if board_id is not None:
nc_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
nc_user = os.getenv("NEXTCLOUD_USERNAME", "admin")
nc_pass = os.getenv("NEXTCLOUD_PASSWORD", "admin")
try:
async with httpx.AsyncClient(
base_url=nc_host,
auth=httpx.BasicAuth(nc_user, nc_pass),
headers={"OCS-APIREQUEST": "true"},
) as client:
resp = await client.delete(
f"/apps/deck/api/v1.0/boards/{board_id}"
)
logger.info(f"Board cleanup: {board_id}{resp.status_code}")
except Exception as e:
logger.warning(f"Board cleanup failed: {e}")
# ---------------------------------------------------------------------------
# Tables
# ---------------------------------------------------------------------------
class TestLoginFlowTables:
"""Test Tables operations via Login Flow v2."""
@pytest.mark.xfail(
reason="Server-side Pydantic bug: Table.owner_display_name required but missing from API",
strict=False,
)
async def test_tables_list(self, nc_mcp_login_flow_client: ClientSession):
"""List tables (may be empty but should not error)."""
result = await nc_mcp_login_flow_client.call_tool("nc_tables_list_tables", {})
assert result.isError is False, f"List tables failed: {result.content[0].text}"
data = json.loads(result.content[0].text)
logger.info(f"Tables: {data}")
# ---------------------------------------------------------------------------
# Cookbook
# ---------------------------------------------------------------------------
class TestLoginFlowCookbook:
"""Test Cookbook operations via Login Flow v2."""
async def test_cookbook_list_and_categories(
self, nc_mcp_login_flow_client: ClientSession
):
"""List recipes and categories (may be empty but should not error)."""
# List recipes
list_result = await nc_mcp_login_flow_client.call_tool(
"nc_cookbook_list_recipes", {}
)
assert list_result.isError is False
# List categories
cat_result = await nc_mcp_login_flow_client.call_tool(
"nc_cookbook_list_categories", {}
)
assert cat_result.isError is False
async def test_cookbook_create_and_delete(
self, nc_mcp_login_flow_client: ClientSession
):
"""Create recipe → get recipe → delete recipe."""
suffix = uuid.uuid4().hex[:8]
create_result = await nc_mcp_login_flow_client.call_tool(
"nc_cookbook_create_recipe",
{
"name": f"LoginFlow Recipe {suffix}",
"description": f"Test recipe {suffix}",
"ingredients": ["flour", "sugar", "butter"],
"instructions": ["Mix ingredients", "Bake at 350F"],
"keywords": "test,login-flow", # keywords is a string, not list
},
)
assert create_result.isError is False, (
f"Create recipe failed: {create_result.content[0].text}"
)
recipe_data = json.loads(create_result.content[0].text)
recipe_id = recipe_data.get("id") or recipe_data.get("recipe_id")
logger.info(f"Created recipe: {recipe_id}")
try:
# Get recipe (may fail due to server-side Pydantic bug with recipeYield=None)
get_result = await nc_mcp_login_flow_client.call_tool(
"nc_cookbook_get_recipe", {"recipe_id": recipe_id}
)
if get_result.isError:
error_text = get_result.content[0].text
if "recipeYield" in error_text:
logger.warning(
f"Known server bug: Recipe.recipeYield validation: {error_text}"
)
else:
raise AssertionError(f"Get recipe failed: {error_text}")
finally:
if recipe_id:
await nc_mcp_login_flow_client.call_tool(
"nc_cookbook_delete_recipe", {"recipe_id": recipe_id}
)
logger.info(f"Deleted recipe {recipe_id}")
# ---------------------------------------------------------------------------
# Connectivity & Tool Listing
# ---------------------------------------------------------------------------
class TestLoginFlowConnectivity:
"""Basic connectivity and tool listing tests."""
async def test_list_tools(self, nc_mcp_login_flow_client: ClientSession):
"""Verify key tools are available."""
tools = await nc_mcp_login_flow_client.list_tools()
tool_names = [t.name for t in tools.tools]
# Auth tools (Login Flow v2 specific)
assert "nc_auth_provision_access" in tool_names
assert "nc_auth_check_status" in tool_names
assert "nc_auth_update_scopes" in tool_names
# Standard Nextcloud tools (verified against server/test_mcp.py)
expected = [
"nc_notes_create_note",
"nc_notes_search_notes",
"nc_notes_get_note",
"nc_notes_update_note",
"nc_notes_delete_note",
"nc_notes_append_content",
"nc_calendar_list_calendars",
"nc_calendar_create_event",
"nc_calendar_list_events",
"nc_calendar_get_event",
"nc_calendar_delete_event",
"nc_calendar_list_todos",
"nc_calendar_create_todo",
"nc_calendar_update_todo",
"nc_calendar_delete_todo",
"nc_contacts_list_addressbooks",
"nc_contacts_create_contact",
"nc_contacts_list_contacts",
"nc_contacts_delete_contact",
"nc_webdav_list_directory",
"nc_webdav_read_file",
"nc_webdav_write_file",
"nc_webdav_create_directory",
"nc_webdav_delete_resource",
"nc_webdav_find_by_name",
"deck_create_board",
"deck_get_boards",
"deck_get_board",
"nc_tables_list_tables",
"nc_cookbook_list_recipes",
"nc_cookbook_create_recipe",
"nc_cookbook_get_recipe",
"nc_cookbook_delete_recipe",
"nc_cookbook_list_categories",
]
for tool in expected:
assert tool in tool_names, f"Expected tool '{tool}' not found"
async def test_list_resources(self, nc_mcp_login_flow_client: ClientSession):
"""Verify resource templates are available."""
templates = await nc_mcp_login_flow_client.list_resource_templates()
logger.info(f"Resource templates: {len(templates.resourceTemplates)}")
@@ -27,6 +27,8 @@ from ...conftest import _handle_oauth_consent_screen
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
@pytest.fixture(scope="module")
def nextcloud_host() -> str:
@@ -114,7 +116,6 @@ async def test_oauth_clients(
logger.info("Test OAuth clients fixture complete")
@pytest.mark.integration
async def test_introspection_requires_client_authentication(
oidc_endpoints: dict[str, str],
):
@@ -284,7 +285,6 @@ async def _obtain_token_for_client(
return access_token
@pytest.mark.integration
async def test_client_cannot_introspect_other_clients_tokens(
playwright_oauth_token: str,
shared_oauth_client_credentials: tuple,
@@ -344,7 +344,6 @@ async def test_client_cannot_introspect_other_clients_tokens(
)
@pytest.mark.integration
async def test_introspection_with_resource_parameter(
browser,
oauth_callback_server,
@@ -440,7 +439,6 @@ async def test_introspection_with_resource_parameter(
)
@pytest.mark.integration
async def test_introspection_returns_inactive_for_invalid_token(
test_oauth_clients: dict[str, tuple[str, str]],
oidc_endpoints: dict[str, str],
@@ -18,6 +18,7 @@ import pytest
@pytest.mark.integration
@pytest.mark.oauth
async def test_prm_endpoint():
"""Test that the Protected Resource Metadata endpoint returns correct data."""
@@ -32,7 +33,7 @@ async def test_prm_endpoint():
assert prm_data["resource"] == "http://localhost:8001/mcp"
assert "notes:read" in prm_data["scopes_supported"]
assert "notes:write" in prm_data["scopes_supported"]
assert "http://localhost:8080" in prm_data["authorization_servers"]
assert "http://localhost:8001" in prm_data["authorization_servers"]
assert "header" in prm_data["bearer_methods_supported"]
assert "RS256" in prm_data["resource_signing_alg_values_supported"]
@@ -60,6 +61,7 @@ async def test_basicauth_shows_all_tools(nc_mcp_client):
@pytest.mark.integration
@pytest.mark.oauth
async def test_read_only_token_filters_write_tools(nc_mcp_oauth_client_read_only):
"""Test that a token with only read scopes filters out write tools."""
@@ -108,6 +110,7 @@ async def test_read_only_token_filters_write_tools(nc_mcp_oauth_client_read_only
@pytest.mark.integration
@pytest.mark.oauth
async def test_write_only_token_filters_read_tools(nc_mcp_oauth_client_write_only):
"""Test that a token with only write scopes filters out read tools."""
@@ -156,6 +159,7 @@ async def test_write_only_token_filters_read_tools(nc_mcp_oauth_client_write_onl
@pytest.mark.integration
@pytest.mark.oauth
async def test_full_access_token_shows_all_tools(nc_mcp_oauth_client_full_access):
"""Test that a token with both read and write scopes scopes can see all tools."""
@@ -389,6 +393,7 @@ async def test_scope_metadata_coverage(nc_mcp_client):
@pytest.mark.integration
@pytest.mark.oauth
async def test_jwt_with_no_custom_scopes_returns_zero_tools(
nc_mcp_oauth_client_no_custom_scopes,
):
@@ -433,6 +438,7 @@ async def test_jwt_with_no_custom_scopes_returns_zero_tools(
@pytest.mark.integration
@pytest.mark.oauth
async def test_jwt_consent_scenarios_read_only(nc_mcp_oauth_client_read_only):
"""
Test JWT with only nc:read scope consented.
@@ -470,6 +476,7 @@ async def test_jwt_consent_scenarios_read_only(nc_mcp_oauth_client_read_only):
@pytest.mark.integration
@pytest.mark.oauth
async def test_jwt_consent_scenarios_write_only(nc_mcp_oauth_client_write_only):
"""
Test JWT with only nc:write scope consented.
@@ -507,6 +514,7 @@ async def test_jwt_consent_scenarios_write_only(nc_mcp_oauth_client_write_only):
@pytest.mark.integration
@pytest.mark.oauth
async def test_jwt_consent_scenarios_full_access(nc_mcp_oauth_client_full_access):
"""
Test JWT with both nc:read and nc:write scopes consented.
+2 -4
View File
@@ -61,14 +61,12 @@ async def token_exchange_service(token_storage):
@pytest.fixture
async def token_broker(token_storage):
"""Create test token broker service."""
# Use the same encryption key as storage
encryption_key = token_storage._test_encryption_key
broker = TokenBrokerService(
storage=token_storage,
oidc_discovery_url="http://test-idp/.well-known/openid-configuration",
nextcloud_host="http://test-nextcloud",
encryption_key=encryption_key,
client_id="test-client",
client_secret="test-secret",
cache_ttl=300,
cache_early_refresh=30,
)
View File
+243
View File
@@ -0,0 +1,243 @@
"""Unit tests for access.py REST API endpoints.
Tests the REST API endpoints for user access and scope management:
- GET /api/v1/users/{user_id}/access - Get user's provisioned access and scopes
- PATCH /api/v1/users/{user_id}/scopes - Update user's application-level scopes
- GET /api/v1/scopes - List all supported scopes
"""
import base64
import tempfile
from pathlib import Path
import pytest
from cryptography.fernet import Fernet
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
from nextcloud_mcp_server.api.access import (
get_user_access,
list_supported_scopes,
update_user_scopes,
)
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.models.auth import ALL_SUPPORTED_SCOPES
pytestmark = pytest.mark.unit
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
return Fernet.generate_key().decode()
@pytest.fixture
async def temp_storage(encryption_key):
"""Create temporary storage instance with encryption for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_access.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
yield storage
def create_basic_auth_header(username: str, password: str) -> str:
"""Create BasicAuth header value."""
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded}"
def create_test_app(storage):
"""Create a test Starlette app with the access endpoints."""
app = Starlette(
routes=[
Route(
"/api/v1/users/{user_id}/access",
get_user_access,
methods=["GET"],
),
Route(
"/api/v1/users/{user_id}/scopes",
update_user_scopes,
methods=["PATCH"],
),
Route(
"/api/v1/scopes",
list_supported_scopes,
methods=["GET"],
),
],
)
app.state.storage = storage
return app
class TestGetUserAccess:
"""Tests for GET /api/v1/users/{user_id}/access."""
async def test_not_provisioned(self, temp_storage):
"""Returns provisioned=False when no app password stored."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.get(
"/api/v1/users/alice/access",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["provisioned"] is False
assert data["scopes"] is None
async def test_provisioned_with_scopes(self, temp_storage):
"""Returns provisioned=True with scopes when app password exists."""
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="test-app-pw",
scopes=["notes:read", "calendar:write"],
username="alice_nc",
)
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.get(
"/api/v1/users/alice/access",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["provisioned"] is True
assert set(data["scopes"]) == {"notes:read", "calendar:write"}
assert data["username"] == "alice_nc"
async def test_missing_auth_header(self, temp_storage):
"""Returns 401 when no Authorization header."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.get("/api/v1/users/alice/access")
assert resp.status_code == 401
async def test_user_id_mismatch(self, temp_storage):
"""Returns 403 when path user_id doesn't match auth credentials."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.get(
"/api/v1/users/alice/access",
headers={"Authorization": create_basic_auth_header("bob", "pw")},
)
assert resp.status_code == 403
class TestUpdateUserScopes:
"""Tests for PATCH /api/v1/users/{user_id}/scopes."""
async def test_update_valid_scopes(self, temp_storage):
"""Successfully updates scopes for a provisioned user."""
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="test-app-pw",
scopes=["notes:read"],
username="alice_nc",
)
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.patch(
"/api/v1/users/alice/scopes",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
json={"scopes": ["notes:read", "notes:write", "calendar:read"]},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert set(data["scopes"]) == {"notes:read", "notes:write", "calendar:read"}
async def test_invalid_scopes(self, temp_storage):
"""Returns 400 for invalid scope names."""
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="test-app-pw",
scopes=["notes:read"],
)
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.patch(
"/api/v1/users/alice/scopes",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
json={"scopes": ["notes:read", "invalid:scope"]},
)
assert resp.status_code == 400
data = resp.json()
assert data["success"] is False
assert "invalid:scope" in data["error"]
async def test_user_not_provisioned(self, temp_storage):
"""Returns 404 when user has no app password."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.patch(
"/api/v1/users/alice/scopes",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
json={"scopes": ["notes:read"]},
)
assert resp.status_code == 404
data = resp.json()
assert data["success"] is False
async def test_missing_scopes_field(self, temp_storage):
"""Returns 400 when scopes field is missing from body."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.patch(
"/api/v1/users/alice/scopes",
headers={"Authorization": create_basic_auth_header("alice", "pw")},
json={"something_else": True},
)
assert resp.status_code == 400
async def test_invalid_json_body(self, temp_storage):
"""Returns 400 for invalid JSON body."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.patch(
"/api/v1/users/alice/scopes",
headers={
"Authorization": create_basic_auth_header("alice", "pw"),
"Content-Type": "application/json",
},
content=b"not json",
)
assert resp.status_code == 400
class TestListSupportedScopes:
"""Tests for GET /api/v1/scopes."""
async def test_returns_all_scopes(self, temp_storage):
"""Returns all supported scopes sorted."""
app = create_test_app(temp_storage)
client = TestClient(app)
resp = client.get("/api/v1/scopes")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert set(data["scopes"]) == ALL_SUPPORTED_SCOPES
# Verify it's sorted
assert data["scopes"] == sorted(data["scopes"])
+198
View File
@@ -0,0 +1,198 @@
"""Unit tests for Login Flow v2 MCP auth tools.
Tests the auth tools logic with mocked storage and Login Flow client.
"""
import tempfile
from pathlib import Path
import pytest
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.models.auth import ALL_SUPPORTED_SCOPES
pytestmark = pytest.mark.unit
@pytest.fixture
def encryption_key():
"""Generate a test encryption key."""
return Fernet.generate_key().decode()
@pytest.fixture
async def temp_storage(encryption_key):
"""Create temporary storage with encryption for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_auth_tools.db"
storage = RefreshTokenStorage(
db_path=str(db_path), encryption_key=encryption_key
)
await storage.initialize()
yield storage
async def test_store_app_password_with_scopes(temp_storage):
"""Test storing app password with scopes."""
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="aaaaa-bbbbb-ccccc-ddddd-eeeee",
scopes=["notes:read", "notes:write"],
username="alice_nc",
)
data = await temp_storage.get_app_password_with_scopes("alice")
assert data is not None
assert data["app_password"] == "aaaaa-bbbbb-ccccc-ddddd-eeeee"
assert data["scopes"] == ["notes:read", "notes:write"]
assert data["username"] == "alice_nc"
assert data["created_at"] is not None
assert data["updated_at"] is not None
async def test_store_app_password_null_scopes(temp_storage):
"""Test storing app password with NULL scopes (all allowed)."""
await temp_storage.store_app_password_with_scopes(
user_id="bob",
app_password="fffff-ggggg-hhhhh-iiiii-jjjjj",
scopes=None,
)
data = await temp_storage.get_app_password_with_scopes("bob")
assert data is not None
assert data["scopes"] is None # NULL = all scopes allowed
assert data["username"] is None
async def test_store_app_password_with_scopes_replaces(temp_storage):
"""Test that storing replaces existing record."""
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="aaaaa-bbbbb-ccccc-ddddd-eeeee",
scopes=["notes:read"],
)
await temp_storage.store_app_password_with_scopes(
user_id="alice",
app_password="xxxxx-yyyyy-zzzzz-aaaaa-bbbbb",
scopes=["notes:read", "calendar:read"],
username="alice_nc",
)
data = await temp_storage.get_app_password_with_scopes("alice")
assert data["app_password"] == "xxxxx-yyyyy-zzzzz-aaaaa-bbbbb"
assert data["scopes"] == ["notes:read", "calendar:read"]
async def test_get_app_password_with_scopes_nonexistent(temp_storage):
"""Test getting scoped password for non-existent user."""
data = await temp_storage.get_app_password_with_scopes("nonexistent")
assert data is None
# ── Login Flow Session Tests ──
async def test_store_and_get_login_flow_session(temp_storage):
"""Test storing and retrieving a login flow session."""
await temp_storage.store_login_flow_session(
user_id="alice",
poll_token="secret-poll-token",
poll_endpoint="https://cloud.example.com/login/v2/poll",
requested_scopes=["notes:read", "notes:write"],
)
session = await temp_storage.get_login_flow_session("alice")
assert session is not None
assert session["poll_token"] == "secret-poll-token"
assert session["poll_endpoint"] == "https://cloud.example.com/login/v2/poll"
assert session["requested_scopes"] == ["notes:read", "notes:write"]
assert session["created_at"] is not None
assert session["expires_at"] is not None
async def test_get_login_flow_session_nonexistent(temp_storage):
"""Test getting session for user with no pending flow."""
session = await temp_storage.get_login_flow_session("nonexistent")
assert session is None
async def test_get_login_flow_session_expired(temp_storage):
"""Test that expired sessions are not returned."""
await temp_storage.store_login_flow_session(
user_id="alice",
poll_token="expired-token",
poll_endpoint="https://cloud.example.com/login/v2/poll",
expires_at=1, # Expired long ago
)
session = await temp_storage.get_login_flow_session("alice")
assert session is None
async def test_delete_login_flow_session(temp_storage):
"""Test deleting a login flow session."""
await temp_storage.store_login_flow_session(
user_id="alice",
poll_token="token",
poll_endpoint="https://cloud.example.com/poll",
)
deleted = await temp_storage.delete_login_flow_session("alice")
assert deleted is True
# Verify it's gone
session = await temp_storage.get_login_flow_session("alice")
assert session is None
async def test_delete_login_flow_session_nonexistent(temp_storage):
"""Test deleting a non-existent session returns False."""
deleted = await temp_storage.delete_login_flow_session("nonexistent")
assert deleted is False
async def test_delete_expired_login_flow_sessions(temp_storage):
"""Test cleanup of expired sessions."""
# Store 2 expired and 1 valid session
await temp_storage.store_login_flow_session(
user_id="expired1",
poll_token="t1",
poll_endpoint="https://cloud.example.com/poll",
expires_at=1,
)
await temp_storage.store_login_flow_session(
user_id="expired2",
poll_token="t2",
poll_endpoint="https://cloud.example.com/poll",
expires_at=2,
)
await temp_storage.store_login_flow_session(
user_id="valid",
poll_token="t3",
poll_endpoint="https://cloud.example.com/poll",
# Default expiry = 20 minutes from now
)
count = await temp_storage.delete_expired_login_flow_sessions()
assert count == 2
# Valid session should still exist
session = await temp_storage.get_login_flow_session("valid")
assert session is not None
# ── Response Model Tests ──
def test_all_supported_scopes():
"""Test that ALL_SUPPORTED_SCOPES contains expected scopes."""
assert "notes:read" in ALL_SUPPORTED_SCOPES
assert "notes:write" in ALL_SUPPORTED_SCOPES
assert "calendar:read" in ALL_SUPPORTED_SCOPES
assert "files:read" in ALL_SUPPORTED_SCOPES
assert "deck:read" in ALL_SUPPORTED_SCOPES
# Scopes should be in pairs (read/write)
read_scopes = [s for s in ALL_SUPPORTED_SCOPES if s.endswith(":read")]
write_scopes = [s for s in ALL_SUPPORTED_SCOPES if s.endswith(":write")]
assert len(read_scopes) == len(write_scopes)
+1 -1
View File
@@ -32,7 +32,7 @@ def mock_metrics():
def mock_tracer():
"""Mock OpenTelemetry tracer."""
with patch(
"nextcloud_mcp_server.observability.tracing.trace_operation"
"nextcloud_mcp_server.observability.metrics.trace_operation"
) as mock_trace:
# Configure mock to act as a context manager that allows exceptions to propagate
mock_trace.return_value.__enter__ = MagicMock(return_value=None)
+210
View File
@@ -0,0 +1,210 @@
"""Unit tests for Login Flow v2 HTTP client.
Tests the LoginFlowV2Client with mocked HTTP responses for:
- Flow initiation (POST /index.php/login/v2)
- Flow polling (completed, pending, expired)
- Error handling
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nextcloud_mcp_server.auth.login_flow import (
LoginFlowInitResponse,
LoginFlowPollResult,
LoginFlowV2Client,
)
pytestmark = pytest.mark.unit
@pytest.fixture
def flow_client():
"""Create a LoginFlowV2Client for testing."""
return LoginFlowV2Client(
nextcloud_host="https://cloud.example.com",
verify_ssl=False,
)
def _mock_response(status_code: int, json_data: dict) -> MagicMock:
"""Create a mock httpx response."""
response = MagicMock()
response.status_code = status_code
response.json.return_value = json_data
response.raise_for_status = MagicMock()
if status_code >= 400:
from httpx import HTTPStatusError
response.raise_for_status.side_effect = HTTPStatusError(
"error", request=MagicMock(), response=response
)
return response
async def test_initiate_success(flow_client):
"""Test successful Login Flow v2 initiation."""
mock_response = _mock_response(
200,
{
"login": "https://cloud.example.com/login/v2/grant?token=abc123",
"poll": {
"endpoint": "https://cloud.example.com/login/v2/poll",
"token": "secret-poll-token",
},
},
)
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch(
"nextcloud_mcp_server.auth.login_flow.nextcloud_httpx_client",
return_value=mock_client,
):
result = await flow_client.initiate()
assert isinstance(result, LoginFlowInitResponse)
assert result.login_url == "https://cloud.example.com/login/v2/grant?token=abc123"
assert result.poll_endpoint == "https://cloud.example.com/login/v2/poll"
assert result.poll_token == "secret-poll-token"
async def test_poll_completed(flow_client):
"""Test polling when user has completed login."""
mock_response = _mock_response(
200,
{
"server": "https://cloud.example.com",
"loginName": "alice",
"appPassword": "aaaaa-bbbbb-ccccc-ddddd-eeeee",
},
)
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch(
"nextcloud_mcp_server.auth.login_flow.nextcloud_httpx_client",
return_value=mock_client,
):
result = await flow_client.poll(
poll_endpoint="https://cloud.example.com/login/v2/poll",
poll_token="secret-poll-token",
)
assert isinstance(result, LoginFlowPollResult)
assert result.status == "completed"
assert result.server == "https://cloud.example.com"
assert result.login_name == "alice"
assert result.app_password == "aaaaa-bbbbb-ccccc-ddddd-eeeee"
async def test_poll_pending(flow_client):
"""Test polling when login is still pending."""
mock_response = _mock_response(404, {})
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch(
"nextcloud_mcp_server.auth.login_flow.nextcloud_httpx_client",
return_value=mock_client,
):
result = await flow_client.poll(
poll_endpoint="https://cloud.example.com/login/v2/poll",
poll_token="secret-poll-token",
)
assert result.status == "pending"
assert result.server is None
assert result.app_password is None
async def test_poll_expired(flow_client):
"""Test polling when flow has expired."""
mock_response = _mock_response(403, {})
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch(
"nextcloud_mcp_server.auth.login_flow.nextcloud_httpx_client",
return_value=mock_client,
):
result = await flow_client.poll(
poll_endpoint="https://cloud.example.com/login/v2/poll",
poll_token="expired-token",
)
assert result.status == "expired"
assert result.app_password is None
async def test_initiate_with_custom_user_agent(flow_client):
"""Test that custom user agent is passed in the request."""
mock_response = _mock_response(
200,
{
"login": "https://cloud.example.com/login/v2/grant?token=abc",
"poll": {
"endpoint": "https://cloud.example.com/login/v2/poll",
"token": "tok",
},
},
)
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch(
"nextcloud_mcp_server.auth.login_flow.nextcloud_httpx_client",
return_value=mock_client,
):
await flow_client.initiate(user_agent="my-custom-agent")
# Verify the user agent was passed
call_kwargs = mock_client.post.call_args
assert call_kwargs.kwargs["headers"]["User-Agent"] == "my-custom-agent"
async def test_login_flow_init_response_model():
"""Test LoginFlowInitResponse Pydantic model validation."""
resp = LoginFlowInitResponse(
login_url="https://cloud.example.com/login",
poll_endpoint="https://cloud.example.com/poll",
poll_token="token123",
)
assert resp.login_url == "https://cloud.example.com/login"
assert resp.poll_endpoint == "https://cloud.example.com/poll"
assert resp.poll_token == "token123"
async def test_login_flow_poll_result_model():
"""Test LoginFlowPollResult Pydantic model validation."""
# Completed result
completed = LoginFlowPollResult(
status="completed",
server="https://cloud.example.com",
login_name="bob",
app_password="xxxxx-yyyyy-zzzzz-aaaaa-bbbbb",
)
assert completed.status == "completed"
assert completed.login_name == "bob"
# Pending result
pending = LoginFlowPollResult(status="pending")
assert pending.status == "pending"
assert pending.server is None
assert pending.app_password is None
@@ -184,7 +184,7 @@ async def test_provision_app_password_success(temp_storage, mocker):
"""Test successful app password provisioning."""
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -203,7 +203,7 @@ async def test_provision_app_password_success(temp_storage, mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -233,7 +233,7 @@ async def test_provision_app_password_success(temp_storage, mocker):
async def test_provision_app_password_nextcloud_validation_fails(mocker):
"""Test that failed Nextcloud validation returns 401."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -251,7 +251,7 @@ async def test_provision_app_password_nextcloud_validation_fails(mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -356,7 +356,7 @@ async def test_delete_app_password_success(temp_storage, mocker):
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -374,7 +374,7 @@ async def test_delete_app_password_success(temp_storage, mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -404,7 +404,7 @@ async def test_delete_app_password_not_found(temp_storage, mocker):
"""Test deleting non-existent app password."""
# Mock settings (imported locally in the function)
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -422,7 +422,7 @@ async def test_delete_app_password_not_found(temp_storage, mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -447,7 +447,7 @@ async def test_delete_app_password_not_found(temp_storage, mocker):
async def test_delete_app_password_invalid_credentials(mocker):
"""Test that invalid credentials returns 401 for deletion."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -465,7 +465,7 @@ async def test_delete_app_password_invalid_credentials(mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -521,7 +521,7 @@ async def test_delete_app_password_username_mismatch():
async def test_provision_app_password_rate_limiting(mocker):
"""Test that rate limiting blocks excessive provisioning attempts."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -539,7 +539,7 @@ async def test_provision_app_password_rate_limiting(mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
@@ -584,7 +584,7 @@ async def test_provision_app_password_rate_limiting(mocker):
async def test_rate_limiting_is_per_user(mocker):
"""Test that rate limiting is applied per user, not globally."""
mocker.patch(
"nextcloud_mcp_server.config.get_settings",
"nextcloud_mcp_server.api.passwords.get_settings",
return_value=MagicMock(
nextcloud_host="http://localhost:8080",
nextcloud_verify_ssl=True,
@@ -602,7 +602,7 @@ async def test_rate_limiting_is_per_user(mocker):
mock_client.__aexit__ = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.api.passwords.httpx.AsyncClient",
"nextcloud_mcp_server.api.passwords.nextcloud_httpx_client",
return_value=mock_client,
)
+27 -18
View File
@@ -70,10 +70,11 @@ class TestStatusEndpointOidcConfig:
# get_settings and detect_auth_mode are imported inside the function
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.MULTI_USER_BASIC,
),
):
@@ -107,10 +108,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.MULTI_USER_BASIC,
),
):
@@ -135,10 +137,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.MULTI_USER_BASIC,
),
):
@@ -167,10 +170,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.OAUTH_SINGLE_AUDIENCE,
),
):
@@ -202,10 +206,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.SINGLE_USER_BASIC,
),
):
@@ -235,10 +240,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.MULTI_USER_BASIC,
),
):
@@ -267,10 +273,11 @@ class TestStatusEndpointOidcConfig:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.MULTI_USER_BASIC,
),
):
@@ -295,10 +302,11 @@ class TestStatusEndpointBasicResponse:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.SINGLE_USER_BASIC,
),
):
@@ -320,10 +328,11 @@ class TestStatusEndpointBasicResponse:
with (
patch(
"nextcloud_mcp_server.config.get_settings", return_value=mock_settings
"nextcloud_mcp_server.api.management.get_settings",
return_value=mock_settings,
),
patch(
"nextcloud_mcp_server.config_validators.detect_auth_mode",
"nextcloud_mcp_server.api.management.detect_auth_mode",
return_value=AuthMode.SINGLE_USER_BASIC,
),
):
@@ -0,0 +1,93 @@
"""Unit tests for @require_scopes with stored app passwords (Login Flow v2).
Tests the third enforcement mode in scope_authorization.py that checks
application-level scopes stored alongside app passwords.
"""
from unittest.mock import AsyncMock, patch
import pytest
from nextcloud_mcp_server.auth.scope_authorization import (
_get_stored_scopes,
_scope_cache,
)
pytestmark = pytest.mark.unit
@pytest.fixture(autouse=True)
def clear_scope_cache():
"""Clear scope cache before each test."""
_scope_cache.clear()
yield
_scope_cache.clear()
async def test_get_stored_scopes_with_scopes():
"""Test getting specific scopes from storage."""
mock_storage = AsyncMock()
mock_storage.get_app_password_with_scopes.return_value = {
"app_password": "xxxxx",
"scopes": ["notes:read", "calendar:read"],
"username": "alice",
"created_at": 1000,
"updated_at": 1000,
}
with patch(
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("alice")
assert result == ["notes:read", "calendar:read"]
async def test_get_stored_scopes_null_scopes():
"""Test that NULL scopes returns 'all'."""
mock_storage = AsyncMock()
mock_storage.get_app_password_with_scopes.return_value = {
"app_password": "xxxxx",
"scopes": None,
"username": "bob",
"created_at": 1000,
"updated_at": 1000,
}
with patch(
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("bob")
assert result == "all"
async def test_get_stored_scopes_no_password():
"""Test that missing app password returns None."""
mock_storage = AsyncMock()
mock_storage.get_app_password_with_scopes.return_value = None
with patch(
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("nobody")
assert result is None
async def test_get_stored_scopes_storage_error():
"""Test that storage errors propagate to the caller."""
mock_storage = AsyncMock()
mock_storage.get_app_password_with_scopes.side_effect = RuntimeError("DB error")
with (
patch(
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
),
pytest.raises(RuntimeError, match="DB error"),
):
await _get_stored_scopes("alice")