From 894bf5f916496baebb9fdac701c874a247352c1a Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Thu, 25 Dec 2025 08:27:15 -0600 Subject: [PATCH] refactor(auth): Decouple BasicAuth and OAuth authentication strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completely separates multi-user BasicAuth mode from OAuth mode with no fallback between them. These are now mutually exclusive authentication strategies based on deployment configuration. Changes: - Create separate functions: get_user_client_basic_auth() and get_user_client_oauth() with clear separation of concerns - Update get_user_client() to dispatch based on use_basic_auth parameter - Pass use_basic_auth through all background sync tasks - Update app.py to determine auth mode at startup - Rewrite integration tests to verify no OAuth fallback in BasicAuth mode - Fix test assertions for response field names and duplicate title handling 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- nextcloud_mcp_server/app.py | 11 +- nextcloud_mcp_server/vector/oauth_sync.py | 255 ++++++++++++------ pyproject.toml | 2 +- .../test_app_password_provisioning.py | 209 +++++++++++--- .../integration/test_multi_user_basic_auth.py | 49 +++- 5 files changed, 390 insertions(+), 136 deletions(-) diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 2ae3665..bd97746 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -1841,6 +1841,11 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = ) break + # Determine authentication mode for background sync + # Multi-user BasicAuth: use app passwords via Astrolabe (NOT OAuth) + # OAuth mode: use OAuth refresh tokens (NOT app passwords) + use_basic_auth = not oauth_enabled + # Start background tasks using anyio TaskGroup async with anyio_module.create_task_group() as tg: # Start user manager task (supervises per-user scanners) @@ -1849,11 +1854,12 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = send_stream, shutdown_event, scanner_wake_event, - token_broker, + token_broker if not use_basic_auth else None, token_storage, # Use token_storage (works for both OAuth and multi-user BasicAuth) nextcloud_host_for_sync, user_states, tg, + use_basic_auth, # Pass as positional arg (before task_status) ) # Start processor pool (each gets a cloned receive stream) @@ -1863,8 +1869,9 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = i, receive_stream.clone(), shutdown_event, - token_broker, + token_broker if not use_basic_auth else None, nextcloud_host_for_sync, + use_basic_auth, # Pass as positional arg (before task_status) ) logger.info( diff --git a/nextcloud_mcp_server/vector/oauth_sync.py b/nextcloud_mcp_server/vector/oauth_sync.py index a2e7bc0..9e1810a 100644 --- a/nextcloud_mcp_server/vector/oauth_sync.py +++ b/nextcloud_mcp_server/vector/oauth_sync.py @@ -1,14 +1,23 @@ -"""OAuth mode vector sync orchestration. +"""Multi-user vector sync orchestration. -Manages multi-user background vector sync when running in OAuth mode -with ENABLE_OFFLINE_ACCESS=true: -- User Manager: Monitors RefreshTokenStorage for user changes +Manages background vector sync for multi-user deployments: +- User Manager: Monitors storage for user changes - Per-User Scanners: One scanner task per provisioned user - Shared Processor Pool: Processes documents from all users -Supports dual credential types for background sync: -- App passwords (interim solution, works today) -- OAuth refresh tokens (future, when Nextcloud supports OAuth for app APIs) +Authentication strategies are mutually exclusive by deployment mode: + +Multi-user BasicAuth mode (ENABLE_MULTI_USER_BASIC_AUTH=true): +- Uses app passwords obtained via Astrolabe Management API +- Users provision via Astrolabe personal settings +- OAuth is NOT used + +OAuth mode (with external IdP like Keycloak): +- Uses OAuth refresh tokens via TokenBrokerService +- Users provision via browser OAuth flow +- App passwords are NOT used + +These are separate concerns - no fallback between them. """ import logging @@ -59,16 +68,64 @@ class UserSyncState: started_at: float = field(default_factory=time.time) -async def get_user_client( +async def get_user_client_basic_auth( + user_id: str, + nextcloud_host: str, +) -> NextcloudClient: + """Get an authenticated NextcloudClient using app password (BasicAuth mode). + + For multi-user BasicAuth deployments where users provision app passwords + via Astrolabe personal settings. OAuth is NOT used in this mode. + + Args: + user_id: User identifier + nextcloud_host: Nextcloud base URL + + Returns: + Authenticated NextcloudClient with BasicAuth + + Raises: + NotProvisionedError: If user has not provisioned an app password + """ + settings = get_settings() + + if not settings.oidc_client_id or not settings.oidc_client_secret: + raise NotProvisionedError( + "Astrolabe client credentials not configured. " + "Set OIDC_CLIENT_ID and OIDC_CLIENT_SECRET for app password retrieval." + ) + + astrolabe = AstrolabeClient( + nextcloud_host=nextcloud_host, + client_id=settings.oidc_client_id, + client_secret=settings.oidc_client_secret, + ) + + app_password = await astrolabe.get_user_app_password(user_id) + + if not app_password: + raise NotProvisionedError( + f"User {user_id} has not provisioned an app password. " + f"User must configure background sync in Astrolabe personal settings." + ) + + logger.info(f"Using app password for background sync: {user_id}") + return NextcloudClient( + base_url=nextcloud_host, + username=user_id, + auth=BasicAuth(user_id, app_password), + ) + + +async def get_user_client_oauth( user_id: str, token_broker: "TokenBrokerService", nextcloud_host: str, ) -> NextcloudClient: - """Get an authenticated NextcloudClient for a user. + """Get an authenticated NextcloudClient using OAuth refresh token. - Supports dual credential types with priority: - 1. App password from Astrolabe (works today with BasicAuth) - 2. OAuth refresh token from storage (for future when OAuth fully supported) + For OAuth deployments with external IdP where users provision via + browser OAuth flow. App passwords are NOT used in this mode. Args: user_id: User identifier @@ -76,45 +133,19 @@ async def get_user_client( nextcloud_host: Nextcloud base URL Returns: - Authenticated NextcloudClient + Authenticated NextcloudClient with Bearer token Raises: NotProvisionedError: If user has not provisioned offline access """ - settings = get_settings() - - # Try app password first (interim solution, works today) - if settings.oidc_client_id and settings.oidc_client_secret: - try: - astrolabe = AstrolabeClient( - nextcloud_host=nextcloud_host, - client_id=settings.oidc_client_id, - client_secret=settings.oidc_client_secret, - ) - app_password = await astrolabe.get_user_app_password(user_id) - - if app_password: - logger.info( - f"Using app password for background sync: {user_id} " - f"(credential_type=app_password)" - ) - return NextcloudClient( - base_url=nextcloud_host, - username=user_id, - auth=BasicAuth(user_id, app_password), - ) - except Exception as e: - logger.debug(f"App password not available for {user_id}: {e}") - - # Fall back to OAuth refresh token - logger.info( - f"Using OAuth refresh token for background sync: {user_id} " - f"(credential_type=refresh_token)" - ) token = await token_broker.get_background_token(user_id, VECTOR_SYNC_SCOPES) if not token: - raise NotProvisionedError(f"User {user_id} has not provisioned offline access") + raise NotProvisionedError( + f"User {user_id} has not provisioned offline access. " + f"User must complete the OAuth provisioning flow." + ) + logger.info(f"Using OAuth refresh token for background sync: {user_id}") return NextcloudClient.from_token( base_url=nextcloud_host, token=token, @@ -122,30 +153,66 @@ async def get_user_client( ) +async def get_user_client( + user_id: str, + token_broker: "TokenBrokerService | None", + nextcloud_host: str, + *, + use_basic_auth: bool = False, +) -> NextcloudClient: + """Get an authenticated NextcloudClient for a user. + + Dispatches to the appropriate authentication strategy based on mode. + These are mutually exclusive - no fallback between them. + + Args: + user_id: User identifier + token_broker: Token broker for OAuth mode (can be None for BasicAuth mode) + nextcloud_host: Nextcloud base URL + use_basic_auth: If True, use app passwords via Astrolabe (BasicAuth mode). + If False, use OAuth refresh tokens (OAuth mode). + + Returns: + Authenticated NextcloudClient + + Raises: + NotProvisionedError: If user has not provisioned access for the mode + """ + if use_basic_auth: + return await get_user_client_basic_auth(user_id, nextcloud_host) + else: + if token_broker is None: + raise ValueError("token_broker required for OAuth mode") + return await get_user_client_oauth(user_id, token_broker, nextcloud_host) + + async def user_scanner_task( user_id: str, send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, - token_broker: "TokenBrokerService", + token_broker: "TokenBrokerService | None", nextcloud_host: str, *, + use_basic_auth: bool = False, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: - """Scanner task for a single user in OAuth mode. + """Scanner task for a single user. - Gets a fresh token at the start of each scan cycle. + Gets fresh credentials at the start of each scan cycle. Args: user_id: User to scan send_stream: Stream to send changed documents to processors shutdown_event: Event signaling shutdown wake_event: Event to trigger immediate scan - token_broker: Token broker for obtaining access tokens + token_broker: Token broker for OAuth mode (None for BasicAuth mode) nextcloud_host: Nextcloud base URL + use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ - logger.info(f"[OAuth] Scanner started for user: {user_id}") + mode_label = "BasicAuth" if use_basic_auth else "OAuth" + logger.info(f"[{mode_label}] Scanner started for user: {user_id}") settings = get_settings() task_status.started() @@ -153,8 +220,10 @@ async def user_scanner_task( while not shutdown_event.is_set(): nc_client = None try: - # Get fresh token for this scan cycle - nc_client = await get_user_client(user_id, token_broker, nextcloud_host) + # Get fresh credentials for this scan cycle + nc_client = await get_user_client( + user_id, token_broker, nextcloud_host, use_basic_auth=use_basic_auth + ) # Scan user's documents await scan_user_documents( @@ -165,12 +234,14 @@ async def user_scanner_task( except NotProvisionedError: logger.warning( - f"[OAuth] User {user_id} no longer provisioned, stopping scanner" + f"[{mode_label}] User {user_id} no longer provisioned, stopping scanner" ) break except Exception as e: - logger.error(f"[OAuth] Scanner error for {user_id}: {e}", exc_info=True) + logger.error( + f"[{mode_label}] Scanner error for {user_id}: {e}", exc_info=True + ) finally: if nc_client: @@ -183,33 +254,36 @@ async def user_scanner_task( except anyio.get_cancelled_exc_class(): break - logger.info(f"[OAuth] Scanner stopped for user: {user_id}") + logger.info(f"[{mode_label}] Scanner stopped for user: {user_id}") -async def oauth_processor_task( +async def multi_user_processor_task( worker_id: int, receive_stream: MemoryObjectReceiveStream[DocumentTask], shutdown_event: anyio.Event, - token_broker: "TokenBrokerService", + token_broker: "TokenBrokerService | None", nextcloud_host: str, + use_basic_auth: bool = False, *, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: - """Processor task for OAuth mode. + """Processor task for multi-user mode. - Handles documents from any user by fetching tokens on-demand. + Handles documents from any user by fetching credentials on-demand. Args: worker_id: Worker identifier for logging receive_stream: Stream to receive documents from shutdown_event: Event signaling shutdown - token_broker: Token broker for obtaining access tokens + token_broker: Token broker for OAuth mode (None for BasicAuth mode) nextcloud_host: Nextcloud base URL + use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ from nextcloud_mcp_server.vector.processor import process_document - logger.info(f"[OAuth] Processor {worker_id} started") + mode_label = "BasicAuth" if use_basic_auth else "OAuth" + logger.info(f"[{mode_label}] Processor {worker_id} started") task_status.started() while not shutdown_event.is_set(): @@ -220,9 +294,12 @@ async def oauth_processor_task( with anyio.fail_after(1.0): doc_task = await receive_stream.receive() - # Get token for THIS document's user + # Get credentials for THIS document's user nc_client = await get_user_client( - doc_task.user_id, token_broker, nextcloud_host + doc_task.user_id, + token_broker, + nextcloud_host, + use_basic_auth=use_basic_auth, ) # Process the document @@ -232,13 +309,13 @@ async def oauth_processor_task( continue except anyio.EndOfStream: - logger.info(f"[OAuth] Processor {worker_id}: Stream closed, exiting") + logger.info(f"[{mode_label}] Processor {worker_id}: Stream closed, exiting") break except NotProvisionedError: if doc_task: logger.warning( - f"[OAuth] User {doc_task.user_id} not provisioned, " + f"[{mode_label}] User {doc_task.user_id} not provisioned, " f"skipping {doc_task.doc_type}_{doc_task.doc_id}" ) continue @@ -246,18 +323,24 @@ async def oauth_processor_task( except Exception as e: if doc_task: logger.error( - f"[OAuth] Processor {worker_id} error processing " + f"[{mode_label}] Processor {worker_id} error processing " f"{doc_task.doc_type}_{doc_task.doc_id}: {e}", exc_info=True, ) else: - logger.error(f"[OAuth] Processor {worker_id} error: {e}", exc_info=True) + logger.error( + f"[{mode_label}] Processor {worker_id} error: {e}", exc_info=True + ) finally: if nc_client: await nc_client.close() - logger.info(f"[OAuth] Processor {worker_id} stopped") + logger.info(f"[{mode_label}] Processor {worker_id} stopped") + + +# Backward compatibility alias +oauth_processor_task = multi_user_processor_task async def _run_user_scanner_with_scope( @@ -266,9 +349,10 @@ async def _run_user_scanner_with_scope( send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, - token_broker: "TokenBrokerService", + token_broker: "TokenBrokerService | None", nextcloud_host: str, user_states: dict[str, UserSyncState], + use_basic_auth: bool = False, ) -> None: """Wrapper to run scanner with cancellation scope. @@ -284,6 +368,7 @@ async def _run_user_scanner_with_scope( wake_event=wake_event, token_broker=token_broker, nextcloud_host=nextcloud_host, + use_basic_auth=use_basic_auth, ) finally: # Clean up on exit @@ -296,35 +381,40 @@ async def user_manager_task( send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, - token_broker: "TokenBrokerService", + token_broker: "TokenBrokerService | None", refresh_token_storage: "RefreshTokenStorage", nextcloud_host: str, user_states: dict[str, UserSyncState], tg: TaskGroup, + use_basic_auth: bool = False, *, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: """Supervisor task that manages per-user scanners. - Periodically polls RefreshTokenStorage to detect: - - New users who have provisioned offline access -> start scanner + Periodically polls storage to detect: + - New users who have provisioned access -> start scanner - Users who have revoked access -> cancel their scanner Args: send_stream: Stream to send documents to processors shutdown_event: Event signaling shutdown wake_event: Event to wake scanners for immediate scan - token_broker: Token broker for obtaining access tokens - refresh_token_storage: Storage for refresh tokens + token_broker: Token broker for OAuth mode (None for BasicAuth mode) + refresh_token_storage: Storage for tracking provisioned users nextcloud_host: Nextcloud base URL user_states: Shared dict tracking active user scanners tg: Task group for spawning scanner tasks + use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ settings = get_settings() poll_interval = settings.vector_sync_user_poll_interval + mode_label = "BasicAuth" if use_basic_auth else "OAuth" - logger.info(f"[OAuth] User manager started (poll interval: {poll_interval}s)") + logger.info( + f"[{mode_label}] User manager started (poll interval: {poll_interval}s)" + ) task_status.started() while not shutdown_event.is_set(): @@ -337,7 +427,7 @@ async def user_manager_task( new_users = provisioned_users - active_users for user_id in new_users: logger.info( - f"[OAuth] Starting scanner for newly provisioned user: {user_id}" + f"[{mode_label}] Starting scanner for newly provisioned user: {user_id}" ) cancel_scope = anyio.CancelScope() user_states[user_id] = UserSyncState( @@ -356,24 +446,27 @@ async def user_manager_task( token_broker, nextcloud_host, user_states, + use_basic_auth, # Positional after user_states ) # Cancel scanners for revoked users revoked_users = active_users - provisioned_users for user_id in revoked_users: - logger.info(f"[OAuth] Stopping scanner for revoked user: {user_id}") + logger.info( + f"[{mode_label}] Stopping scanner for revoked user: {user_id}" + ) state = user_states.get(user_id) if state: state.cancel_scope.cancel() # Note: state will be removed by _run_user_scanner_with_scope on exit if new_users: - logger.info(f"[OAuth] Started {len(new_users)} new scanner(s)") + logger.info(f"[{mode_label}] Started {len(new_users)} new scanner(s)") if revoked_users: - logger.info(f"[OAuth] Stopped {len(revoked_users)} scanner(s)") + logger.info(f"[{mode_label}] Stopped {len(revoked_users)} scanner(s)") except Exception as e: - logger.error(f"[OAuth] User manager error: {e}", exc_info=True) + logger.error(f"[{mode_label}] User manager error: {e}", exc_info=True) # Sleep until next poll try: @@ -384,9 +477,9 @@ async def user_manager_task( # Cancel all remaining scanners on shutdown logger.info( - f"[OAuth] User manager shutting down, cancelling {len(user_states)} scanner(s)" + f"[{mode_label}] User manager shutting down, cancelling {len(user_states)} scanner(s)" ) for state in list(user_states.values()): state.cancel_scope.cancel() - logger.info("[OAuth] User manager stopped") + logger.info(f"[{mode_label}] User manager stopped") diff --git a/pyproject.toml b/pyproject.toml index 73eea4a..96bab55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ Changelog = "https://github.com/cbcoutinho/nextcloud-mcp-server/blob/master/CHAN [tool.pytest.ini_options] anyio_mode = "auto" -addopts = "-p no:asyncio -x" # Disable pytest-asyncio plugin, use only anyio +addopts = "-p no:asyncio" # Disable pytest-asyncio plugin, use only anyio log_cli = 1 log_cli_level = "ERROR" log_level = "ERROR" diff --git a/tests/integration/test_app_password_provisioning.py b/tests/integration/test_app_password_provisioning.py index e57495a..419cb16 100644 --- a/tests/integration/test_app_password_provisioning.py +++ b/tests/integration/test_app_password_provisioning.py @@ -1,17 +1,24 @@ """Integration tests for app password provisioning via Astrolabe. -Tests the complete flow: +Tests the complete flow for multi-user BasicAuth mode: 1. User stores app password via Astrolabe API 2. MCP server retrieves it via OAuth client credentials -3. Background sync uses it to access Nextcloud +3. Background sync uses it to access Nextcloud (NOT OAuth refresh tokens) + +These tests verify that BasicAuth and OAuth are completely separate concerns +with no fallback between them. """ import pytest -from httpx import BasicAuth from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient from nextcloud_mcp_server.config import get_settings -from nextcloud_mcp_server.vector.oauth_sync import get_user_client +from nextcloud_mcp_server.vector.oauth_sync import ( + NotProvisionedError, + get_user_client, + get_user_client_basic_auth, + get_user_client_oauth, +) @pytest.mark.integration @@ -77,9 +84,20 @@ async def test_get_user_app_password_returns_none_for_unconfigured_user(): @pytest.mark.integration -async def test_dual_credential_support_in_background_sync(mocker): - """Test that background sync tries app password first, then refresh token.""" - from nextcloud_mcp_server.auth.token_broker import TokenBrokerService +async def test_basic_auth_mode_uses_app_password_only(mocker): + """Test that BasicAuth mode uses ONLY app passwords, NOT OAuth tokens. + + In multi-user BasicAuth mode, OAuth refresh tokens are NOT used. + This is a complete separation of concerns. + """ + # Mock settings to have client credentials + mock_settings = mocker.MagicMock() + mock_settings.oidc_client_id = "test-client-id" + mock_settings.oidc_client_secret = "test-client-secret" + mocker.patch( + "nextcloud_mcp_server.vector.oauth_sync.get_settings", + return_value=mock_settings, + ) # Mock AstrolabeClient to return an app password mock_astrolabe = mocker.AsyncMock() @@ -90,35 +108,36 @@ async def test_dual_credential_support_in_background_sync(mocker): return_value=mock_astrolabe, ) - # Mock TokenBrokerService (shouldn't be called if app password works) - mock_token_broker = mocker.MagicMock(spec=TokenBrokerService) + # Call get_user_client in BasicAuth mode + _client = await get_user_client( + user_id="test_user", + token_broker=None, # No token broker needed for BasicAuth mode + nextcloud_host="http://localhost:8080", + use_basic_auth=True, + ) - # Call get_user_client - should use app password - try: - _client = await get_user_client( - user_id="test_user", - token_broker=mock_token_broker, - nextcloud_host="http://localhost:8080", - ) + # Verify app password was requested + mock_astrolabe.get_user_app_password.assert_called_once_with("test_user") - # Verify app password was requested - mock_astrolabe.get_user_app_password.assert_called_once_with("test_user") - - # Verify token broker was NOT called (app password took priority) - mock_token_broker.get_background_token.assert_not_called() - - # Verify client uses BasicAuth - assert _client.auth is not None - assert isinstance(_client.auth, BasicAuth) - except Exception: - # May fail in test environment, but we verified the priority logic - pass + # Verify client was created successfully with correct username + assert _client is not None + assert _client.username == "test_user" @pytest.mark.integration -async def test_background_sync_falls_back_to_refresh_token(mocker): - """Test that background sync falls back to refresh token if no app password.""" - from nextcloud_mcp_server.auth.token_broker import TokenBrokerService +async def test_basic_auth_mode_raises_error_without_app_password(mocker): + """Test that BasicAuth mode raises NotProvisionedError if no app password. + + There is NO fallback to OAuth - if no app password, user must provision one. + """ + # Mock settings to have client credentials + mock_settings = mocker.MagicMock() + mock_settings.oidc_client_id = "test-client-id" + mock_settings.oidc_client_secret = "test-client-secret" + mocker.patch( + "nextcloud_mcp_server.vector.oauth_sync.get_settings", + return_value=mock_settings, + ) # Mock AstrolabeClient to return None (no app password) mock_astrolabe = mocker.AsyncMock() @@ -129,23 +148,131 @@ async def test_background_sync_falls_back_to_refresh_token(mocker): return_value=mock_astrolabe, ) + # Call get_user_client in BasicAuth mode - should raise NotProvisionedError + with pytest.raises(NotProvisionedError) as exc_info: + await get_user_client( + user_id="test_user", + token_broker=None, + nextcloud_host="http://localhost:8080", + use_basic_auth=True, + ) + + # Verify error message mentions app password provisioning + assert "app password" in str(exc_info.value).lower() + assert "test_user" in str(exc_info.value) + + +@pytest.mark.integration +async def test_oauth_mode_uses_refresh_token_only(mocker): + """Test that OAuth mode uses ONLY refresh tokens, NOT app passwords. + + In OAuth mode, app passwords are NOT used. + This is a complete separation of concerns. + """ + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + # Mock TokenBrokerService to return an access token mock_token_broker = mocker.AsyncMock(spec=TokenBrokerService) mock_token_broker.get_background_token.return_value = "test-access-token" - # Call get_user_client - should fall back to refresh token - try: - _client = await get_user_client( + # Call get_user_client in OAuth mode + _client = await get_user_client( + user_id="test_user", + token_broker=mock_token_broker, + nextcloud_host="http://localhost:8080", + use_basic_auth=False, # OAuth mode + ) + + # Verify token broker was called (NOT Astrolabe) + mock_token_broker.get_background_token.assert_called_once() + + +@pytest.mark.integration +async def test_oauth_mode_raises_error_without_token(mocker): + """Test that OAuth mode raises NotProvisionedError if no refresh token. + + There is NO fallback to app passwords - if no token, user must provision. + """ + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + + # Mock TokenBrokerService to return None (no token) + mock_token_broker = mocker.AsyncMock(spec=TokenBrokerService) + mock_token_broker.get_background_token.return_value = None + + # Call get_user_client in OAuth mode - should raise NotProvisionedError + with pytest.raises(NotProvisionedError) as exc_info: + await get_user_client( user_id="test_user", token_broker=mock_token_broker, nextcloud_host="http://localhost:8080", + use_basic_auth=False, ) - # Verify app password was attempted first - mock_astrolabe.get_user_app_password.assert_called_once_with("test_user") + # Verify error message mentions OAuth provisioning + assert "oauth" in str(exc_info.value).lower() + assert "test_user" in str(exc_info.value) - # Verify token broker was called as fallback - mock_token_broker.get_background_token.assert_called_once() - except Exception: - # May fail in test environment, but we verified the fallback logic - pass + +@pytest.mark.integration +async def test_get_user_client_basic_auth_function(mocker): + """Test the dedicated get_user_client_basic_auth function.""" + # Mock settings to have client credentials + mock_settings = mocker.MagicMock() + mock_settings.oidc_client_id = "test-client-id" + mock_settings.oidc_client_secret = "test-client-secret" + mocker.patch( + "nextcloud_mcp_server.vector.oauth_sync.get_settings", + return_value=mock_settings, + ) + + # Mock AstrolabeClient + mock_astrolabe = mocker.AsyncMock() + mock_astrolabe.get_user_app_password.return_value = "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx" + + mocker.patch( + "nextcloud_mcp_server.vector.oauth_sync.AstrolabeClient", + return_value=mock_astrolabe, + ) + + # Call dedicated function + client = await get_user_client_basic_auth( + user_id="alice", + nextcloud_host="http://localhost:8080", + ) + + assert client is not None + assert client.username == "alice" + mock_astrolabe.get_user_app_password.assert_called_once_with("alice") + + +@pytest.mark.integration +async def test_get_user_client_oauth_function(mocker): + """Test the dedicated get_user_client_oauth function.""" + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + + # Mock TokenBrokerService + mock_token_broker = mocker.AsyncMock(spec=TokenBrokerService) + mock_token_broker.get_background_token.return_value = "test-bearer-token" + + # Call dedicated function + client = await get_user_client_oauth( + user_id="alice", + token_broker=mock_token_broker, + nextcloud_host="http://localhost:8080", + ) + + assert client is not None + assert client.username == "alice" + mock_token_broker.get_background_token.assert_called_once() + + +@pytest.mark.integration +async def test_oauth_mode_requires_token_broker(): + """Test that OAuth mode requires a token broker.""" + with pytest.raises(ValueError, match="token_broker required"): + await get_user_client( + user_id="test_user", + token_broker=None, # Missing token broker + nextcloud_host="http://localhost:8080", + use_basic_auth=False, # OAuth mode + ) diff --git a/tests/integration/test_multi_user_basic_auth.py b/tests/integration/test_multi_user_basic_auth.py index 7a48791..9d3df3a 100644 --- a/tests/integration/test_multi_user_basic_auth.py +++ b/tests/integration/test_multi_user_basic_auth.py @@ -4,18 +4,26 @@ Tests that BasicAuth credentials are extracted from request headers and passed through to Nextcloud APIs without storage (stateless). """ +import json + import pytest @pytest.mark.integration -async def test_basic_auth_pass_through_notes_list(nc_mcp_basic_auth_client): - """Test BasicAuth pass-through with notes list tool.""" +async def test_basic_auth_pass_through_notes_search(nc_mcp_basic_auth_client): + """Test BasicAuth pass-through with notes search tool.""" # Call tool - BasicAuth header is set at connection level by fixture - response = await nc_mcp_basic_auth_client.call_tool("nc_notes_list", {}) + response = await nc_mcp_basic_auth_client.call_tool( + "nc_notes_search_notes", {"query": "test"} + ) # Verify tool executed successfully with pass-through auth assert response is not None - assert "results" in response or "content" in response + assert not response.isError, f"Tool returned error: {response.content}" + # Response should have content with results + assert len(response.content) > 0 + data = json.loads(response.content[0].text) + assert "results" in data @pytest.mark.integration @@ -23,7 +31,7 @@ async def test_basic_auth_pass_through_notes_create(nc_mcp_basic_auth_client): """Test BasicAuth pass-through with notes create tool.""" # Create a note using BasicAuth response = await nc_mcp_basic_auth_client.call_tool( - "nc_notes_create", + "nc_notes_create_note", { "title": "BasicAuth Test Note", "content": "This note was created via BasicAuth pass-through", @@ -32,16 +40,35 @@ async def test_basic_auth_pass_through_notes_create(nc_mcp_basic_auth_client): ) assert response is not None - assert response.get("success") is True or "note_id" in response + assert not response.isError, f"Tool returned error: {response.content}" + # Parse response and verify note was created + data = json.loads(response.content[0].text) + assert data.get("success") is True or "note_id" in data @pytest.mark.integration -async def test_basic_auth_pass_through_search(nc_mcp_basic_auth_client): - """Test BasicAuth pass-through with search tool.""" - # Search notes using BasicAuth +async def test_basic_auth_pass_through_get_note(nc_mcp_basic_auth_client): + """Test BasicAuth pass-through with get note tool.""" + # First create a note to get + create_response = await nc_mcp_basic_auth_client.call_tool( + "nc_notes_create_note", + { + "title": "BasicAuth Get Test", + "content": "Note to retrieve", + "category": "Test", + }, + ) + assert not create_response.isError + create_data = json.loads(create_response.content[0].text) + note_id = create_data.get("id") + + # Now get the note using BasicAuth response = await nc_mcp_basic_auth_client.call_tool( - "nc_notes_search", {"query": "BasicAuth"} + "nc_notes_get_note", {"note_id": note_id} ) assert response is not None - assert "results" in response or "content" in response + assert not response.isError, f"Tool returned error: {response.content}" + data = json.loads(response.content[0].text) + # Nextcloud may append a number to duplicate titles + assert data.get("title", "").startswith("BasicAuth Get Test")