refactor(auth): Decouple BasicAuth and OAuth authentication strategies

Completely separates multi-user BasicAuth mode from OAuth mode with no
fallback between them. These are now mutually exclusive authentication
strategies based on deployment configuration.

Changes:
- Create separate functions: get_user_client_basic_auth() and
  get_user_client_oauth() with clear separation of concerns
- Update get_user_client() to dispatch based on use_basic_auth parameter
- Pass use_basic_auth through all background sync tasks
- Update app.py to determine auth mode at startup
- Rewrite integration tests to verify no OAuth fallback in BasicAuth mode
- Fix test assertions for response field names and duplicate title handling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-25 08:27:15 -06:00
parent 804480836e
commit 894bf5f916
5 changed files with 390 additions and 136 deletions
+9 -2
View File
@@ -1841,6 +1841,11 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
break
# Determine authentication mode for background sync
# Multi-user BasicAuth: use app passwords via Astrolabe (NOT OAuth)
# OAuth mode: use OAuth refresh tokens (NOT app passwords)
use_basic_auth = not oauth_enabled
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start user manager task (supervises per-user scanners)
@@ -1849,11 +1854,12 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
send_stream,
shutdown_event,
scanner_wake_event,
token_broker,
token_broker if not use_basic_auth else None,
token_storage, # Use token_storage (works for both OAuth and multi-user BasicAuth)
nextcloud_host_for_sync,
user_states,
tg,
use_basic_auth, # Pass as positional arg (before task_status)
)
# Start processor pool (each gets a cloned receive stream)
@@ -1863,8 +1869,9 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
i,
receive_stream.clone(),
shutdown_event,
token_broker,
token_broker if not use_basic_auth else None,
nextcloud_host_for_sync,
use_basic_auth, # Pass as positional arg (before task_status)
)
logger.info(
+174 -81
View File
@@ -1,14 +1,23 @@
"""OAuth mode vector sync orchestration.
"""Multi-user vector sync orchestration.
Manages multi-user background vector sync when running in OAuth mode
with ENABLE_OFFLINE_ACCESS=true:
- User Manager: Monitors RefreshTokenStorage for user changes
Manages background vector sync for multi-user deployments:
- User Manager: Monitors storage for user changes
- Per-User Scanners: One scanner task per provisioned user
- Shared Processor Pool: Processes documents from all users
Supports dual credential types for background sync:
- App passwords (interim solution, works today)
- OAuth refresh tokens (future, when Nextcloud supports OAuth for app APIs)
Authentication strategies are mutually exclusive by deployment mode:
Multi-user BasicAuth mode (ENABLE_MULTI_USER_BASIC_AUTH=true):
- Uses app passwords obtained via Astrolabe Management API
- Users provision via Astrolabe personal settings
- OAuth is NOT used
OAuth mode (with external IdP like Keycloak):
- Uses OAuth refresh tokens via TokenBrokerService
- Users provision via browser OAuth flow
- App passwords are NOT used
These are separate concerns - no fallback between them.
"""
import logging
@@ -59,16 +68,64 @@ class UserSyncState:
started_at: float = field(default_factory=time.time)
async def get_user_client(
async def get_user_client_basic_auth(
user_id: str,
nextcloud_host: str,
) -> NextcloudClient:
"""Get an authenticated NextcloudClient using app password (BasicAuth mode).
For multi-user BasicAuth deployments where users provision app passwords
via Astrolabe personal settings. OAuth is NOT used in this mode.
Args:
user_id: User identifier
nextcloud_host: Nextcloud base URL
Returns:
Authenticated NextcloudClient with BasicAuth
Raises:
NotProvisionedError: If user has not provisioned an app password
"""
settings = get_settings()
if not settings.oidc_client_id or not settings.oidc_client_secret:
raise NotProvisionedError(
"Astrolabe client credentials not configured. "
"Set OIDC_CLIENT_ID and OIDC_CLIENT_SECRET for app password retrieval."
)
astrolabe = AstrolabeClient(
nextcloud_host=nextcloud_host,
client_id=settings.oidc_client_id,
client_secret=settings.oidc_client_secret,
)
app_password = await astrolabe.get_user_app_password(user_id)
if not app_password:
raise NotProvisionedError(
f"User {user_id} has not provisioned an app password. "
f"User must configure background sync in Astrolabe personal settings."
)
logger.info(f"Using app password for background sync: {user_id}")
return NextcloudClient(
base_url=nextcloud_host,
username=user_id,
auth=BasicAuth(user_id, app_password),
)
async def get_user_client_oauth(
user_id: str,
token_broker: "TokenBrokerService",
nextcloud_host: str,
) -> NextcloudClient:
"""Get an authenticated NextcloudClient for a user.
"""Get an authenticated NextcloudClient using OAuth refresh token.
Supports dual credential types with priority:
1. App password from Astrolabe (works today with BasicAuth)
2. OAuth refresh token from storage (for future when OAuth fully supported)
For OAuth deployments with external IdP where users provision via
browser OAuth flow. App passwords are NOT used in this mode.
Args:
user_id: User identifier
@@ -76,45 +133,19 @@ async def get_user_client(
nextcloud_host: Nextcloud base URL
Returns:
Authenticated NextcloudClient
Authenticated NextcloudClient with Bearer token
Raises:
NotProvisionedError: If user has not provisioned offline access
"""
settings = get_settings()
# Try app password first (interim solution, works today)
if settings.oidc_client_id and settings.oidc_client_secret:
try:
astrolabe = AstrolabeClient(
nextcloud_host=nextcloud_host,
client_id=settings.oidc_client_id,
client_secret=settings.oidc_client_secret,
)
app_password = await astrolabe.get_user_app_password(user_id)
if app_password:
logger.info(
f"Using app password for background sync: {user_id} "
f"(credential_type=app_password)"
)
return NextcloudClient(
base_url=nextcloud_host,
username=user_id,
auth=BasicAuth(user_id, app_password),
)
except Exception as e:
logger.debug(f"App password not available for {user_id}: {e}")
# Fall back to OAuth refresh token
logger.info(
f"Using OAuth refresh token for background sync: {user_id} "
f"(credential_type=refresh_token)"
)
token = await token_broker.get_background_token(user_id, VECTOR_SYNC_SCOPES)
if not token:
raise NotProvisionedError(f"User {user_id} has not provisioned offline access")
raise NotProvisionedError(
f"User {user_id} has not provisioned offline access. "
f"User must complete the OAuth provisioning flow."
)
logger.info(f"Using OAuth refresh token for background sync: {user_id}")
return NextcloudClient.from_token(
base_url=nextcloud_host,
token=token,
@@ -122,30 +153,66 @@ async def get_user_client(
)
async def get_user_client(
user_id: str,
token_broker: "TokenBrokerService | None",
nextcloud_host: str,
*,
use_basic_auth: bool = False,
) -> NextcloudClient:
"""Get an authenticated NextcloudClient for a user.
Dispatches to the appropriate authentication strategy based on mode.
These are mutually exclusive - no fallback between them.
Args:
user_id: User identifier
token_broker: Token broker for OAuth mode (can be None for BasicAuth mode)
nextcloud_host: Nextcloud base URL
use_basic_auth: If True, use app passwords via Astrolabe (BasicAuth mode).
If False, use OAuth refresh tokens (OAuth mode).
Returns:
Authenticated NextcloudClient
Raises:
NotProvisionedError: If user has not provisioned access for the mode
"""
if use_basic_auth:
return await get_user_client_basic_auth(user_id, nextcloud_host)
else:
if token_broker is None:
raise ValueError("token_broker required for OAuth mode")
return await get_user_client_oauth(user_id, token_broker, nextcloud_host)
async def user_scanner_task(
user_id: str,
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
token_broker: "TokenBrokerService | None",
nextcloud_host: str,
*,
use_basic_auth: bool = False,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Scanner task for a single user in OAuth mode.
"""Scanner task for a single user.
Gets a fresh token at the start of each scan cycle.
Gets fresh credentials at the start of each scan cycle.
Args:
user_id: User to scan
send_stream: Stream to send changed documents to processors
shutdown_event: Event signaling shutdown
wake_event: Event to trigger immediate scan
token_broker: Token broker for obtaining access tokens
token_broker: Token broker for OAuth mode (None for BasicAuth mode)
nextcloud_host: Nextcloud base URL
use_basic_auth: If True, use app passwords; if False, use OAuth tokens
task_status: Status object for signaling task readiness
"""
logger.info(f"[OAuth] Scanner started for user: {user_id}")
mode_label = "BasicAuth" if use_basic_auth else "OAuth"
logger.info(f"[{mode_label}] Scanner started for user: {user_id}")
settings = get_settings()
task_status.started()
@@ -153,8 +220,10 @@ async def user_scanner_task(
while not shutdown_event.is_set():
nc_client = None
try:
# Get fresh token for this scan cycle
nc_client = await get_user_client(user_id, token_broker, nextcloud_host)
# Get fresh credentials for this scan cycle
nc_client = await get_user_client(
user_id, token_broker, nextcloud_host, use_basic_auth=use_basic_auth
)
# Scan user's documents
await scan_user_documents(
@@ -165,12 +234,14 @@ async def user_scanner_task(
except NotProvisionedError:
logger.warning(
f"[OAuth] User {user_id} no longer provisioned, stopping scanner"
f"[{mode_label}] User {user_id} no longer provisioned, stopping scanner"
)
break
except Exception as e:
logger.error(f"[OAuth] Scanner error for {user_id}: {e}", exc_info=True)
logger.error(
f"[{mode_label}] Scanner error for {user_id}: {e}", exc_info=True
)
finally:
if nc_client:
@@ -183,33 +254,36 @@ async def user_scanner_task(
except anyio.get_cancelled_exc_class():
break
logger.info(f"[OAuth] Scanner stopped for user: {user_id}")
logger.info(f"[{mode_label}] Scanner stopped for user: {user_id}")
async def oauth_processor_task(
async def multi_user_processor_task(
worker_id: int,
receive_stream: MemoryObjectReceiveStream[DocumentTask],
shutdown_event: anyio.Event,
token_broker: "TokenBrokerService",
token_broker: "TokenBrokerService | None",
nextcloud_host: str,
use_basic_auth: bool = False,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Processor task for OAuth mode.
"""Processor task for multi-user mode.
Handles documents from any user by fetching tokens on-demand.
Handles documents from any user by fetching credentials on-demand.
Args:
worker_id: Worker identifier for logging
receive_stream: Stream to receive documents from
shutdown_event: Event signaling shutdown
token_broker: Token broker for obtaining access tokens
token_broker: Token broker for OAuth mode (None for BasicAuth mode)
nextcloud_host: Nextcloud base URL
use_basic_auth: If True, use app passwords; if False, use OAuth tokens
task_status: Status object for signaling task readiness
"""
from nextcloud_mcp_server.vector.processor import process_document
logger.info(f"[OAuth] Processor {worker_id} started")
mode_label = "BasicAuth" if use_basic_auth else "OAuth"
logger.info(f"[{mode_label}] Processor {worker_id} started")
task_status.started()
while not shutdown_event.is_set():
@@ -220,9 +294,12 @@ async def oauth_processor_task(
with anyio.fail_after(1.0):
doc_task = await receive_stream.receive()
# Get token for THIS document's user
# Get credentials for THIS document's user
nc_client = await get_user_client(
doc_task.user_id, token_broker, nextcloud_host
doc_task.user_id,
token_broker,
nextcloud_host,
use_basic_auth=use_basic_auth,
)
# Process the document
@@ -232,13 +309,13 @@ async def oauth_processor_task(
continue
except anyio.EndOfStream:
logger.info(f"[OAuth] Processor {worker_id}: Stream closed, exiting")
logger.info(f"[{mode_label}] Processor {worker_id}: Stream closed, exiting")
break
except NotProvisionedError:
if doc_task:
logger.warning(
f"[OAuth] User {doc_task.user_id} not provisioned, "
f"[{mode_label}] User {doc_task.user_id} not provisioned, "
f"skipping {doc_task.doc_type}_{doc_task.doc_id}"
)
continue
@@ -246,18 +323,24 @@ async def oauth_processor_task(
except Exception as e:
if doc_task:
logger.error(
f"[OAuth] Processor {worker_id} error processing "
f"[{mode_label}] Processor {worker_id} error processing "
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
exc_info=True,
)
else:
logger.error(f"[OAuth] Processor {worker_id} error: {e}", exc_info=True)
logger.error(
f"[{mode_label}] Processor {worker_id} error: {e}", exc_info=True
)
finally:
if nc_client:
await nc_client.close()
logger.info(f"[OAuth] Processor {worker_id} stopped")
logger.info(f"[{mode_label}] Processor {worker_id} stopped")
# Backward compatibility alias
oauth_processor_task = multi_user_processor_task
async def _run_user_scanner_with_scope(
@@ -266,9 +349,10 @@ async def _run_user_scanner_with_scope(
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
token_broker: "TokenBrokerService | None",
nextcloud_host: str,
user_states: dict[str, UserSyncState],
use_basic_auth: bool = False,
) -> None:
"""Wrapper to run scanner with cancellation scope.
@@ -284,6 +368,7 @@ async def _run_user_scanner_with_scope(
wake_event=wake_event,
token_broker=token_broker,
nextcloud_host=nextcloud_host,
use_basic_auth=use_basic_auth,
)
finally:
# Clean up on exit
@@ -296,35 +381,40 @@ async def user_manager_task(
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
token_broker: "TokenBrokerService | None",
refresh_token_storage: "RefreshTokenStorage",
nextcloud_host: str,
user_states: dict[str, UserSyncState],
tg: TaskGroup,
use_basic_auth: bool = False,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Supervisor task that manages per-user scanners.
Periodically polls RefreshTokenStorage to detect:
- New users who have provisioned offline access -> start scanner
Periodically polls storage to detect:
- New users who have provisioned access -> start scanner
- Users who have revoked access -> cancel their scanner
Args:
send_stream: Stream to send documents to processors
shutdown_event: Event signaling shutdown
wake_event: Event to wake scanners for immediate scan
token_broker: Token broker for obtaining access tokens
refresh_token_storage: Storage for refresh tokens
token_broker: Token broker for OAuth mode (None for BasicAuth mode)
refresh_token_storage: Storage for tracking provisioned users
nextcloud_host: Nextcloud base URL
user_states: Shared dict tracking active user scanners
tg: Task group for spawning scanner tasks
use_basic_auth: If True, use app passwords; if False, use OAuth tokens
task_status: Status object for signaling task readiness
"""
settings = get_settings()
poll_interval = settings.vector_sync_user_poll_interval
mode_label = "BasicAuth" if use_basic_auth else "OAuth"
logger.info(f"[OAuth] User manager started (poll interval: {poll_interval}s)")
logger.info(
f"[{mode_label}] User manager started (poll interval: {poll_interval}s)"
)
task_status.started()
while not shutdown_event.is_set():
@@ -337,7 +427,7 @@ async def user_manager_task(
new_users = provisioned_users - active_users
for user_id in new_users:
logger.info(
f"[OAuth] Starting scanner for newly provisioned user: {user_id}"
f"[{mode_label}] Starting scanner for newly provisioned user: {user_id}"
)
cancel_scope = anyio.CancelScope()
user_states[user_id] = UserSyncState(
@@ -356,24 +446,27 @@ async def user_manager_task(
token_broker,
nextcloud_host,
user_states,
use_basic_auth, # Positional after user_states
)
# Cancel scanners for revoked users
revoked_users = active_users - provisioned_users
for user_id in revoked_users:
logger.info(f"[OAuth] Stopping scanner for revoked user: {user_id}")
logger.info(
f"[{mode_label}] Stopping scanner for revoked user: {user_id}"
)
state = user_states.get(user_id)
if state:
state.cancel_scope.cancel()
# Note: state will be removed by _run_user_scanner_with_scope on exit
if new_users:
logger.info(f"[OAuth] Started {len(new_users)} new scanner(s)")
logger.info(f"[{mode_label}] Started {len(new_users)} new scanner(s)")
if revoked_users:
logger.info(f"[OAuth] Stopped {len(revoked_users)} scanner(s)")
logger.info(f"[{mode_label}] Stopped {len(revoked_users)} scanner(s)")
except Exception as e:
logger.error(f"[OAuth] User manager error: {e}", exc_info=True)
logger.error(f"[{mode_label}] User manager error: {e}", exc_info=True)
# Sleep until next poll
try:
@@ -384,9 +477,9 @@ async def user_manager_task(
# Cancel all remaining scanners on shutdown
logger.info(
f"[OAuth] User manager shutting down, cancelling {len(user_states)} scanner(s)"
f"[{mode_label}] User manager shutting down, cancelling {len(user_states)} scanner(s)"
)
for state in list(user_states.values()):
state.cancel_scope.cancel()
logger.info("[OAuth] User manager stopped")
logger.info(f"[{mode_label}] User manager stopped")