fix: address PR #589 review feedback (round 2)

Consolidate three independent RefreshTokenStorage lazy singletons into a
single lock-protected get_shared_storage() function, eliminating race
conditions on concurrent first-access. Remove blanket try/except in
_get_stored_scopes so storage errors propagate as proper MCP errors
instead of silently triggering "please provision" messages. Handle
declined/cancelled elicitation results in Login Flow tools by cleaning up
sessions and returning clear status. Add update_app_password_scopes() to
avoid unnecessary decrypt/re-encrypt when only scopes change. Add
unprovisioned-user early exit and no-op detection to nc_auth_update_scopes.
Remove four dead config fields and misleading NEXTCLOUD_PASSWORD deprecation
warning. Add periodic login flow session cleanup task. Generate separate
Fernet keys per service. Add board cleanup in deck integration test. Gate
CI unit tests on linting and skip Astrolabe build for single-user profile.
Fix test markers from oauth to multi_user_basic for astrolabe integration
tests. Update login_flow.py docstrings to document outbound HTTP calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-03-01 16:35:31 +01:00
parent 33cf0fee9b
commit db1e0606ad
15 changed files with 248 additions and 131 deletions
+4
View File
@@ -21,6 +21,7 @@ jobs:
unit-test:
runs-on: ubuntu-latest
needs: [linting]
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install the latest version of uv
@@ -79,6 +80,7 @@ jobs:
submodules: 'true'
- name: Set up PHP 8.4
if: matrix.mode != 'single-user'
uses: shivammathur/setup-php@44454db4f0199b8b9685a5d763dc37cbf79108e1 # 2.36.0
with:
php-version: 8.4
@@ -87,11 +89,13 @@ jobs:
# OIDC app installed from app store (dev mount removed from docker-compose.yml)
- name: Set up Node.js
if: matrix.mode != 'single-user'
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: 22
- name: Build Astrolabe app
if: matrix.mode != 'single-user'
run: |
cd third_party/astrolabe
composer install --no-dev --optimize-autoloader
+5 -3
View File
@@ -148,7 +148,7 @@ services:
# Token storage (required for middleware initialization)
# DEVELOPMENT ONLY - generate a fresh key for production:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
- TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo=
- TOKEN_ENCRYPTION_KEY=fqqI4G51yBCOcu9cvv6wCUJB7sf_CK2za5ClC6b86yY=
- TOKEN_STORAGE_DB=/app/data/tokens.db
- ENABLE_SEMANTIC_SEARCH=true
@@ -187,7 +187,7 @@ services:
# Refresh token storage (ADR-002 Tier 1)
- ENABLE_BACKGROUND_OPERATIONS=true
- TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo=
- TOKEN_ENCRYPTION_KEY=Qh60VwZQsM7CLtSMunzC0gIGPBT948S6VSawUkODtvU=
- TOKEN_STORAGE_DB=/app/data/tokens.db
# ADR-005: Multi-audience mode (default - ENABLE_TOKEN_EXCHANGE=false)
@@ -307,7 +307,9 @@ services:
- ENABLE_LOGIN_FLOW=true
# Token storage (required for app password + session persistence)
- TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo=
# DEVELOPMENT ONLY - generate a fresh key for production:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
- TOKEN_ENCRYPTION_KEY=rxJvkBf7ZBjZZDL4a1sSqjhmjawhmbRMSOGfK8HDyKU=
- TOKEN_STORAGE_DB=/app/data/tokens.db
# Semantic search
+2 -4
View File
@@ -131,12 +131,10 @@ async def update_user_scopes(request: Request) -> JSONResponse:
status_code=404,
)
# Re-store with updated scopes (password and username unchanged)
await storage.store_app_password_with_scopes(
# Update scopes only (no decrypt/re-encrypt of the password)
await storage.update_app_password_scopes(
user_id=username,
app_password=existing["app_password"],
scopes=scopes,
username=existing.get("username"),
)
return JSONResponse(
+35 -3
View File
@@ -72,7 +72,7 @@ from nextcloud_mcp_server.auth.oauth_routes import (
oauth_callback_nextcloud,
)
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage, get_shared_storage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.auth.userinfo_routes import (
@@ -1528,6 +1528,18 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
mcp_app = mcp.streamable_http_app()
async def _login_flow_cleanup_loop() -> None:
"""Periodically clean up expired Login Flow v2 sessions."""
while True:
await anyio.sleep(3600) # Every hour
try:
storage = await get_shared_storage()
count = await storage.delete_expired_login_flow_sessions()
if count:
logger.info(f"Cleaned up {count} expired login flow sessions")
except Exception as e:
logger.warning(f"Login flow cleanup error: {e}")
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
@@ -1760,6 +1772,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
f"{settings.vector_sync_processor_workers} processors"
)
# Start Login Flow cleanup task if enabled
if settings.enable_login_flow:
tg.start_soon(_login_flow_cleanup_loop)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
@@ -1943,6 +1959,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
f"{settings.vector_sync_processor_workers} processors"
)
# Start Login Flow cleanup task if enabled
if settings.enable_login_flow:
tg.start_soon(_login_flow_cleanup_loop)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
@@ -1966,7 +1986,13 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Just run MCP session manager without vector sync
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
if settings.enable_login_flow:
async with anyio.create_task_group() as cleanup_tg:
cleanup_tg.start_soon(_login_flow_cleanup_loop)
yield
cleanup_tg.cancel_scope.cancel()
else:
yield
else:
# No vector sync - just run MCP session manager
@@ -1987,7 +2013,13 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
if settings.enable_login_flow:
async with anyio.create_task_group() as cleanup_tg:
cleanup_tg.start_soon(_login_flow_cleanup_loop)
yield
cleanup_tg.cancel_scope.cancel()
else:
yield
# Health check endpoints for Kubernetes probes
def health_live(request):
+6 -4
View File
@@ -62,9 +62,10 @@ class LoginFlowV2Client:
async def initiate(
self, user_agent: str = "nextcloud-mcp-server"
) -> LoginFlowInitResponse:
"""Initiate Login Flow v2.
"""Initiate Login Flow v2 by sending an HTTP POST to the Nextcloud instance.
Posts to /index.php/login/v2 to start a new login flow.
Makes an outbound HTTP request to POST /index.php/login/v2 on the
configured Nextcloud server to start a new login flow.
Args:
user_agent: User-Agent string for the app password name
@@ -99,9 +100,10 @@ class LoginFlowV2Client:
return result
async def poll(self, poll_endpoint: str, poll_token: str) -> LoginFlowPollResult:
"""Poll for Login Flow v2 completion.
"""Poll for Login Flow v2 completion by sending an HTTP POST to the Nextcloud instance.
Posts to the poll endpoint with the token. Nextcloud returns:
Makes an outbound HTTP request to the poll endpoint provided by the
initiate response. Nextcloud returns:
- 200 with credentials when the user completes login
- 404 when still pending
- Other errors for expired/invalid flows
@@ -9,7 +9,7 @@ from mcp.server.auth.provider import AccessToken
from mcp.server.fastmcp import Context
from mcp.server.fastmcp.utilities.context_injection import find_context_parameter
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.storage import get_shared_storage
from nextcloud_mcp_server.config import get_settings
logger = logging.getLogger(__name__)
@@ -478,18 +478,6 @@ def discover_all_scopes(mcp) -> list[str]:
# ── Login Flow v2 helpers ────────────────────────────────────────────────
_scope_storage_instance = None
async def _get_scope_storage():
"""Get initialized storage instance for scope checks (lazy singleton)."""
global _scope_storage_instance
if _scope_storage_instance is None:
_scope_storage_instance = RefreshTokenStorage.from_env()
await _scope_storage_instance.initialize()
return _scope_storage_instance
async def _get_stored_scopes(user_id: str) -> list[str] | str | None:
"""Look up stored app password scopes for a user.
@@ -497,16 +485,16 @@ async def _get_stored_scopes(user_id: str) -> list[str] | str | None:
- list[str]: Specific scopes granted
- "all": NULL scopes in DB (legacy = all allowed)
- None: No stored app password (provisioning required)
"""
try:
storage = await _get_scope_storage()
data = await storage.get_app_password_with_scopes(user_id)
if data is None:
return None
if data["scopes"] is None:
return "all"
return data["scopes"]
except Exception as e:
logger.error(f"Failed to check stored scopes for {user_id}: {e}")
Raises:
Storage/infrastructure exceptions propagate to the caller
(require_scopes decorator) for proper MCP error responses.
"""
storage = await get_shared_storage()
data = await storage.get_app_password_with_scopes(user_id)
if data is None:
return None
if data["scopes"] is None:
return "all"
return data["scopes"]
+53
View File
@@ -1614,6 +1614,40 @@ class RefreshTokenStorage:
)
return None
async def update_app_password_scopes(self, user_id: str, scopes: list[str]) -> bool:
"""Update only the scopes for an existing app password (no decrypt/re-encrypt).
Args:
user_id: MCP user ID
scopes: New scope list
Returns:
True if a row was updated, False if user not found
"""
if not self._initialized:
await self.initialize()
scopes_json = json.dumps(scopes)
now = int(time.time())
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"UPDATE app_passwords SET scopes = ?, updated_at = ? WHERE user_id = ?",
(scopes_json, now, user_id),
)
await db.commit()
updated = cursor.rowcount > 0
duration = time.time() - start_time
record_db_operation("sqlite", "update", duration, "success")
return updated
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "update", duration, "error")
raise
# ── Login Flow v2: Session Tracking ──────────────────────────────────
async def store_login_flow_session(
@@ -1811,6 +1845,25 @@ class RefreshTokenStorage:
raise
_shared_instance: RefreshTokenStorage | None = None
_shared_lock = anyio.Lock()
async def get_shared_storage() -> RefreshTokenStorage:
"""Get the process-wide RefreshTokenStorage singleton (lock-protected).
All modules that need storage should use this function instead of
creating their own lazy singletons. The lock ensures thread-safe
initialization on concurrent first-access.
"""
global _shared_instance
async with _shared_lock:
if _shared_instance is None:
_shared_instance = RefreshTokenStorage.from_env()
await _shared_instance.initialize()
return _shared_instance
async def generate_encryption_key() -> str:
"""
Generate a new Fernet encryption key.
-18
View File
@@ -207,10 +207,6 @@ class Settings:
# Login Flow v2 settings (ADR-022)
enable_login_flow: bool = False
login_flow_poll_interval: int = 2 # seconds between polls
login_flow_poll_timeout: int = 300 # max seconds to wait for completion
login_flow_cleanup_interval: int = 3600 # seconds between expired session cleanup
app_password_max_age_days: int = 0 # 0 = no expiration
# Token exchange cache settings
token_exchange_cache_ttl: int = 300 # seconds (5 minutes default)
@@ -268,14 +264,6 @@ class Settings:
"""Validate configuration and set defaults."""
logger = logging.getLogger(__name__)
# Deprecation warning: NEXTCLOUD_PASSWORD without NEXTCLOUD_APP_PASSWORD
if self.nextcloud_password and not self.nextcloud_app_password:
logger.warning(
"NEXTCLOUD_PASSWORD is deprecated for app password usage. "
"Please use NEXTCLOUD_APP_PASSWORD instead. "
"Support for NEXTCLOUD_PASSWORD as app password will be removed in v1.0.0."
)
# Validate SSL/TLS configuration
if not self.nextcloud_verify_ssl:
logger.warning(
@@ -563,12 +551,6 @@ def get_settings() -> Settings:
),
# Login Flow v2 settings (ADR-022)
enable_login_flow=(os.getenv("ENABLE_LOGIN_FLOW", "false").lower() == "true"),
login_flow_poll_interval=int(os.getenv("LOGIN_FLOW_POLL_INTERVAL", "2")),
login_flow_poll_timeout=int(os.getenv("LOGIN_FLOW_POLL_TIMEOUT", "300")),
login_flow_cleanup_interval=int(
os.getenv("LOGIN_FLOW_CLEANUP_INTERVAL", "3600")
),
app_password_max_age_days=int(os.getenv("APP_PASSWORD_MAX_AGE_DAYS", "0")),
# Token exchange cache settings
token_exchange_cache_ttl=int(os.getenv("TOKEN_EXCHANGE_CACHE_TTL", "300")),
# Token and webhook storage settings (encryption key optional for webhook-only usage)
+2 -14
View File
@@ -10,7 +10,7 @@ from nextcloud_mcp_server.auth.context_helper import (
get_session_client_from_context,
)
from nextcloud_mcp_server.auth.scope_authorization import ProvisioningRequiredError
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.storage import get_shared_storage
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
DeploymentMode,
@@ -254,18 +254,6 @@ def _get_client_from_basic_auth(ctx: Context) -> NextcloudClient:
)
_login_flow_storage_instance = None
async def _get_login_flow_storage():
"""Get initialized storage instance for login flow (lazy singleton)."""
global _login_flow_storage_instance
if _login_flow_storage_instance is None:
_login_flow_storage_instance = RefreshTokenStorage.from_env()
await _login_flow_storage_instance.initialize()
return _login_flow_storage_instance
async def _get_client_from_login_flow(
ctx: Context, nextcloud_host: str
) -> NextcloudClient:
@@ -294,7 +282,7 @@ async def _get_client_from_login_flow(
"Cannot determine user identity from MCP token."
)
storage = await _get_login_flow_storage()
storage = await get_shared_storage()
app_data = await storage.get_app_password_with_scopes(user_id)
if not app_data:
+61 -18
View File
@@ -15,7 +15,7 @@ from mcp.types import ToolAnnotations
from nextcloud_mcp_server.auth.elicitation import present_login_url
from nextcloud_mcp_server.auth.login_flow import LoginFlowV2Client
from nextcloud_mcp_server.auth.scope_authorization import require_scopes
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.storage import get_shared_storage
from nextcloud_mcp_server.config import get_nextcloud_ssl_verify, get_settings
from nextcloud_mcp_server.models.auth import (
ALL_SUPPORTED_SCOPES,
@@ -28,18 +28,6 @@ from nextcloud_mcp_server.server.oauth_tools import extract_user_id_from_token
logger = logging.getLogger(__name__)
_storage_instance: RefreshTokenStorage | None = None
async def _get_storage() -> RefreshTokenStorage:
"""Get initialized storage instance (lazy singleton)."""
global _storage_instance
if _storage_instance is None:
_storage_instance = RefreshTokenStorage.from_env()
await _storage_instance.initialize()
return _storage_instance
def register_auth_tools(mcp) -> None:
"""Register Login Flow v2 auth tools with the MCP server."""
@@ -79,7 +67,7 @@ def register_auth_tools(mcp) -> None:
success=False,
)
storage = await _get_storage()
storage = await get_shared_storage()
# Check if already provisioned
existing = await storage.get_app_password_with_scopes(user_id)
@@ -143,6 +131,24 @@ def register_auth_tools(mcp) -> None:
# Present login URL to user via elicitation
elicitation_result = await present_login_url(ctx, init_response.login_url)
if elicitation_result == "declined":
await storage.delete_login_flow_session(user_id)
return ProvisionAccessResponse(
status="declined",
message="Login flow declined. Call nc_auth_provision_access again to retry.",
user_id=user_id,
success=False,
)
if elicitation_result == "cancelled":
await storage.delete_login_flow_session(user_id)
return ProvisionAccessResponse(
status="cancelled",
message="Login flow cancelled. Call nc_auth_provision_access again to retry.",
user_id=user_id,
success=False,
)
message = (
f"Please open this URL in your browser to log in to Nextcloud:\n\n"
f"{init_response.login_url}\n\n"
@@ -192,7 +198,7 @@ def register_auth_tools(mcp) -> None:
success=False,
)
storage = await _get_storage()
storage = await get_shared_storage()
# Check for existing app password
existing = await storage.get_app_password_with_scopes(user_id)
@@ -336,11 +342,18 @@ def register_auth_tools(mcp) -> None:
success=False,
)
storage = await _get_storage()
storage = await get_shared_storage()
# Get current state
# Get current state - require existing provisioning
existing = await storage.get_app_password_with_scopes(user_id)
previous_scopes = existing["scopes"] if existing else None
if existing is None:
return UpdateScopesResponse(
status="error",
message="Not provisioned. Call nc_auth_provision_access first.",
success=False,
)
previous_scopes = existing["scopes"]
# Compute new scope set
current_set = (
@@ -367,6 +380,16 @@ def register_auth_tools(mcp) -> None:
success=False,
)
# No-op detection: skip Login Flow if scopes are unchanged
previous_scopes_set = set(previous_scopes) if previous_scopes else set()
if set(new_scopes) == previous_scopes_set:
return UpdateScopesResponse(
status="unchanged",
message="Requested scopes match current scopes. No changes needed.",
previous_scopes=previous_scopes,
new_scopes=new_scopes,
)
# Initiate new Login Flow v2
# Note: existing app password stays valid until the new flow completes.
# store_app_password_with_scopes() does an upsert, so the old password
@@ -405,6 +428,26 @@ def register_auth_tools(mcp) -> None:
# Present login URL
elicitation_result = await present_login_url(ctx, init_response.login_url)
if elicitation_result == "declined":
await storage.delete_login_flow_session(user_id)
return UpdateScopesResponse(
status="declined",
message="Scope update declined. Call nc_auth_update_scopes again to retry.",
previous_scopes=previous_scopes if previous_scopes else None,
new_scopes=new_scopes,
success=False,
)
if elicitation_result == "cancelled":
await storage.delete_login_flow_session(user_id)
return UpdateScopesResponse(
status="cancelled",
message="Scope update cancelled. Call nc_auth_update_scopes again to retry.",
previous_scopes=previous_scopes if previous_scopes else None,
new_scopes=new_scopes,
success=False,
)
message = (
f"Scope update requires re-authentication.\n\n"
f"Please open this URL to log in:\n{init_response.login_url}\n\n"
@@ -27,7 +27,7 @@ from playwright.async_api import Page
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
async def login_to_nextcloud(page: Page, username: str, password: str):
@@ -899,7 +899,7 @@ def clear_stale_test_state(clear_preferences: bool = False) -> None:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_multi_user_astrolabe_background_sync_enablement(
browser,
nc_client,
@@ -1246,7 +1246,7 @@ async def verify_app_password_deleted(username: str) -> bool:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_revoke_background_sync_access(
browser,
nc_client,
@@ -35,7 +35,7 @@ from tests.integration.test_astrolabe_multi_user_background_sync import (
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
async def wait_for_vector_sync(
@@ -101,7 +101,7 @@ async def navigate_to_astrolabe_main(page: Page):
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
@pytest.mark.timeout(
300
) # 5 minutes - this test involves OAuth, app password, and vector sync
@@ -30,7 +30,7 @@ import anyio
import pytest
from playwright.async_api import Page
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
pytestmark = [pytest.mark.integration, pytest.mark.multi_user_basic]
logger = logging.getLogger(__name__)
@@ -334,7 +334,7 @@ def delete_user_credentials(username: str) -> bool:
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_app_password_storage_and_cleanup(
browser,
nc_client,
@@ -440,7 +440,7 @@ async def test_app_password_storage_and_cleanup(
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_credential_isolation_between_users(
browser,
nc_client,
@@ -549,7 +549,7 @@ async def test_credential_isolation_between_users(
@pytest.mark.integration
@pytest.mark.oauth
@pytest.mark.multi_user_basic
async def test_credential_revoke_and_reprovision(
browser,
nc_client,
@@ -443,35 +443,59 @@ class TestLoginFlowDeck:
async def test_deck_board_workflow(self, nc_mcp_login_flow_client: ClientSession):
"""Create board → list boards → get board details."""
import os
import httpx
suffix = uuid.uuid4().hex[:8]
board_title = f"LoginFlow Board {suffix}"
board_id = None
# Create board (requires title and color)
create_result = await nc_mcp_login_flow_client.call_tool(
"deck_create_board", {"title": board_title, "color": "0076D1"}
)
assert create_result.isError is False, (
f"Create board failed: {create_result.content[0].text}"
)
board_data = json.loads(create_result.content[0].text)
board_id = board_data.get("id") or board_data.get("board_id")
logger.info(f"Created board: {board_id}")
try:
# Create board (requires title and color)
create_result = await nc_mcp_login_flow_client.call_tool(
"deck_create_board", {"title": board_title, "color": "0076D1"}
)
assert create_result.isError is False, (
f"Create board failed: {create_result.content[0].text}"
)
board_data = json.loads(create_result.content[0].text)
board_id = board_data.get("id") or board_data.get("board_id")
logger.info(f"Created board: {board_id}")
# List boards (tool name is deck_get_boards)
list_result = await nc_mcp_login_flow_client.call_tool("deck_get_boards", {})
assert list_result.isError is False
boards_data = json.loads(list_result.content[0].text)
boards = boards_data.get("boards", [])
board_ids = [b.get("id") for b in boards]
assert board_id in board_ids
# List boards (tool name is deck_get_boards)
list_result = await nc_mcp_login_flow_client.call_tool(
"deck_get_boards", {}
)
assert list_result.isError is False
boards_data = json.loads(list_result.content[0].text)
boards = boards_data.get("boards", [])
board_ids = [b.get("id") for b in boards]
assert board_id in board_ids
# Get board details
detail_result = await nc_mcp_login_flow_client.call_tool(
"deck_get_board", {"board_id": board_id}
)
assert detail_result.isError is False
# Note: no deck_delete_board tool exists, board cleanup is manual
# Get board details
detail_result = await nc_mcp_login_flow_client.call_tool(
"deck_get_board", {"board_id": board_id}
)
assert detail_result.isError is False
finally:
# Clean up board via Deck REST API (no MCP delete_board tool exists)
if board_id is not None:
nc_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
nc_user = os.getenv("NEXTCLOUD_USERNAME", "admin")
nc_pass = os.getenv("NEXTCLOUD_PASSWORD", "admin")
try:
async with httpx.AsyncClient(
base_url=nc_host,
auth=httpx.BasicAuth(nc_user, nc_pass),
headers={"OCS-APIREQUEST": "true"},
) as client:
resp = await client.delete(
f"/apps/deck/api/v1.0/boards/{board_id}"
)
logger.info(f"Board cleanup: {board_id}{resp.status_code}")
except Exception as e:
logger.warning(f"Board cleanup failed: {e}")
# ---------------------------------------------------------------------------
+11 -10
View File
@@ -27,7 +27,7 @@ async def test_get_stored_scopes_with_scopes():
}
with patch(
"nextcloud_mcp_server.auth.scope_authorization._get_scope_storage",
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("alice")
@@ -47,7 +47,7 @@ async def test_get_stored_scopes_null_scopes():
}
with patch(
"nextcloud_mcp_server.auth.scope_authorization._get_scope_storage",
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("bob")
@@ -61,7 +61,7 @@ async def test_get_stored_scopes_no_password():
mock_storage.get_app_password_with_scopes.return_value = None
with patch(
"nextcloud_mcp_server.auth.scope_authorization._get_scope_storage",
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
):
result = await _get_stored_scopes("nobody")
@@ -70,14 +70,15 @@ async def test_get_stored_scopes_no_password():
async def test_get_stored_scopes_storage_error():
"""Test that storage errors return None (fail-closed)."""
"""Test that storage errors propagate to the caller."""
mock_storage = AsyncMock()
mock_storage.get_app_password_with_scopes.side_effect = RuntimeError("DB error")
with patch(
"nextcloud_mcp_server.auth.scope_authorization._get_scope_storage",
return_value=mock_storage,
with (
patch(
"nextcloud_mcp_server.auth.scope_authorization.get_shared_storage",
return_value=mock_storage,
),
pytest.raises(RuntimeError, match="DB error"),
):
result = await _get_stored_scopes("alice")
assert result is None
await _get_stored_scopes("alice")