"""Multi-user vector sync orchestration. Manages background vector sync for multi-user deployments: - User Manager: Monitors storage for user changes - Per-User Scanners: One scanner task per provisioned user - Shared Processor Pool: Processes documents from all users Authentication strategies are mutually exclusive by deployment mode: Multi-user BasicAuth mode (ENABLE_MULTI_USER_BASIC_AUTH=true): - Uses app passwords obtained via Astrolabe Management API - Users provision via Astrolabe personal settings - OAuth is NOT used OAuth mode (with external IdP like Keycloak): - Uses OAuth refresh tokens via TokenBrokerService - Users provision via browser OAuth flow - App passwords are NOT used These are separate concerns - no fallback between them. """ import logging import time from dataclasses import dataclass, field from typing import TYPE_CHECKING import anyio from anyio.abc import TaskGroup, TaskStatus from anyio.streams.memory import ( MemoryObjectReceiveStream, MemoryObjectSendStream, ) from httpx import BasicAuth from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents if TYPE_CHECKING: from nextcloud_mcp_server.auth.storage import RefreshTokenStorage from nextcloud_mcp_server.auth.token_broker import TokenBrokerService logger = logging.getLogger(__name__) # Scopes required for vector sync operations VECTOR_SYNC_SCOPES = [ "notes:read", "files:read", "deck:read", # "news:read", # News app may not be installed ] class NotProvisionedError(Exception): """User has not provisioned offline access or has revoked it.""" pass @dataclass class UserSyncState: """State for a single user's scanner task.""" user_id: str cancel_scope: anyio.CancelScope started_at: float = field(default_factory=time.time) async def get_user_client_basic_auth( user_id: str, nextcloud_host: str, ) -> NextcloudClient: """Get an authenticated NextcloudClient using app password (BasicAuth mode). For multi-user BasicAuth deployments where users provision app passwords via Astrolabe personal settings. OAuth is NOT used in this mode. Args: user_id: User identifier nextcloud_host: Nextcloud base URL Returns: Authenticated NextcloudClient with BasicAuth Raises: NotProvisionedError: If user has not provisioned an app password """ settings = get_settings() if not settings.oidc_client_id or not settings.oidc_client_secret: raise NotProvisionedError( "Astrolabe client credentials not configured. " "Set OIDC_CLIENT_ID and OIDC_CLIENT_SECRET for app password retrieval." ) astrolabe = AstrolabeClient( nextcloud_host=nextcloud_host, client_id=settings.oidc_client_id, client_secret=settings.oidc_client_secret, ) app_password = await astrolabe.get_user_app_password(user_id) if not app_password: raise NotProvisionedError( f"User {user_id} has not provisioned an app password. " f"User must configure background sync in Astrolabe personal settings." ) logger.info(f"Using app password for background sync: {user_id}") return NextcloudClient( base_url=nextcloud_host, username=user_id, auth=BasicAuth(user_id, app_password), ) async def get_user_client_oauth( user_id: str, token_broker: "TokenBrokerService", nextcloud_host: str, ) -> NextcloudClient: """Get an authenticated NextcloudClient using OAuth refresh token. For OAuth deployments with external IdP where users provision via browser OAuth flow. App passwords are NOT used in this mode. Args: user_id: User identifier token_broker: Token broker for obtaining access tokens nextcloud_host: Nextcloud base URL Returns: Authenticated NextcloudClient with Bearer token Raises: NotProvisionedError: If user has not provisioned offline access """ token = await token_broker.get_background_token(user_id, VECTOR_SYNC_SCOPES) if not token: raise NotProvisionedError( f"User {user_id} has not provisioned offline access. " f"User must complete the OAuth provisioning flow." ) logger.info(f"Using OAuth refresh token for background sync: {user_id}") return NextcloudClient.from_token( base_url=nextcloud_host, token=token, username=user_id, ) async def get_user_client( user_id: str, token_broker: "TokenBrokerService | None", nextcloud_host: str, *, use_basic_auth: bool = False, ) -> NextcloudClient: """Get an authenticated NextcloudClient for a user. Dispatches to the appropriate authentication strategy based on mode. These are mutually exclusive - no fallback between them. Args: user_id: User identifier token_broker: Token broker for OAuth mode (can be None for BasicAuth mode) nextcloud_host: Nextcloud base URL use_basic_auth: If True, use app passwords via Astrolabe (BasicAuth mode). If False, use OAuth refresh tokens (OAuth mode). Returns: Authenticated NextcloudClient Raises: NotProvisionedError: If user has not provisioned access for the mode """ if use_basic_auth: return await get_user_client_basic_auth(user_id, nextcloud_host) else: if token_broker is None: raise ValueError("token_broker required for OAuth mode") return await get_user_client_oauth(user_id, token_broker, nextcloud_host) async def user_scanner_task( user_id: str, send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, token_broker: "TokenBrokerService | None", nextcloud_host: str, *, use_basic_auth: bool = False, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: """Scanner task for a single user. Gets fresh credentials at the start of each scan cycle. Args: user_id: User to scan send_stream: Stream to send changed documents to processors shutdown_event: Event signaling shutdown wake_event: Event to trigger immediate scan token_broker: Token broker for OAuth mode (None for BasicAuth mode) nextcloud_host: Nextcloud base URL use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ mode_label = "BasicAuth" if use_basic_auth else "OAuth" logger.info(f"[{mode_label}] Scanner started for user: {user_id}") settings = get_settings() task_status.started() while not shutdown_event.is_set(): nc_client = None try: # Get fresh credentials for this scan cycle nc_client = await get_user_client( user_id, token_broker, nextcloud_host, use_basic_auth=use_basic_auth ) # Scan user's documents await scan_user_documents( user_id=user_id, send_stream=send_stream, nc_client=nc_client, ) except NotProvisionedError: logger.warning( f"[{mode_label}] User {user_id} no longer provisioned, stopping scanner" ) break except Exception as e: logger.error( f"[{mode_label}] Scanner error for {user_id}: {e}", exc_info=True ) finally: if nc_client: await nc_client.close() # Sleep until next interval or wake event try: with anyio.move_on_after(settings.vector_sync_scan_interval): await wake_event.wait() except anyio.get_cancelled_exc_class(): break logger.info(f"[{mode_label}] Scanner stopped for user: {user_id}") async def multi_user_processor_task( worker_id: int, receive_stream: MemoryObjectReceiveStream[DocumentTask], shutdown_event: anyio.Event, token_broker: "TokenBrokerService | None", nextcloud_host: str, use_basic_auth: bool = False, *, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: """Processor task for multi-user mode. Handles documents from any user by fetching credentials on-demand. Args: worker_id: Worker identifier for logging receive_stream: Stream to receive documents from shutdown_event: Event signaling shutdown token_broker: Token broker for OAuth mode (None for BasicAuth mode) nextcloud_host: Nextcloud base URL use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ from nextcloud_mcp_server.vector.processor import process_document mode_label = "BasicAuth" if use_basic_auth else "OAuth" logger.info(f"[{mode_label}] Processor {worker_id} started") task_status.started() while not shutdown_event.is_set(): doc_task = None nc_client = None try: # Get document with timeout with anyio.fail_after(1.0): doc_task = await receive_stream.receive() # Get credentials for THIS document's user nc_client = await get_user_client( doc_task.user_id, token_broker, nextcloud_host, use_basic_auth=use_basic_auth, ) # Process the document await process_document(doc_task, nc_client) except TimeoutError: continue except anyio.EndOfStream: logger.info(f"[{mode_label}] Processor {worker_id}: Stream closed, exiting") break except NotProvisionedError: if doc_task: logger.warning( f"[{mode_label}] User {doc_task.user_id} not provisioned, " f"skipping {doc_task.doc_type}_{doc_task.doc_id}" ) continue except Exception as e: if doc_task: logger.error( f"[{mode_label}] Processor {worker_id} error processing " f"{doc_task.doc_type}_{doc_task.doc_id}: {e}", exc_info=True, ) else: logger.error( f"[{mode_label}] Processor {worker_id} error: {e}", exc_info=True ) finally: if nc_client: await nc_client.close() logger.info(f"[{mode_label}] Processor {worker_id} stopped") # Backward compatibility alias oauth_processor_task = multi_user_processor_task async def _run_user_scanner_with_scope( user_id: str, cancel_scope: anyio.CancelScope, send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, token_broker: "TokenBrokerService | None", nextcloud_host: str, user_states: dict[str, UserSyncState], use_basic_auth: bool = False, ) -> None: """Wrapper to run scanner with cancellation scope. Cleans up user state on exit. """ cloned_stream = send_stream.clone() try: with cancel_scope: await user_scanner_task( user_id=user_id, send_stream=cloned_stream, shutdown_event=shutdown_event, wake_event=wake_event, token_broker=token_broker, nextcloud_host=nextcloud_host, use_basic_auth=use_basic_auth, ) finally: # Clean up on exit if user_id in user_states: del user_states[user_id] await cloned_stream.aclose() async def user_manager_task( send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, wake_event: anyio.Event, token_broker: "TokenBrokerService | None", refresh_token_storage: "RefreshTokenStorage", nextcloud_host: str, user_states: dict[str, UserSyncState], tg: TaskGroup, use_basic_auth: bool = False, *, task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ) -> None: """Supervisor task that manages per-user scanners. Periodically polls storage to detect: - New users who have provisioned access -> start scanner - Users who have revoked access -> cancel their scanner Args: send_stream: Stream to send documents to processors shutdown_event: Event signaling shutdown wake_event: Event to wake scanners for immediate scan token_broker: Token broker for OAuth mode (None for BasicAuth mode) refresh_token_storage: Storage for tracking provisioned users nextcloud_host: Nextcloud base URL user_states: Shared dict tracking active user scanners tg: Task group for spawning scanner tasks use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ settings = get_settings() poll_interval = settings.vector_sync_user_poll_interval mode_label = "BasicAuth" if use_basic_auth else "OAuth" logger.info( f"[{mode_label}] User manager started (poll interval: {poll_interval}s)" ) task_status.started() while not shutdown_event.is_set(): try: # Get current provisioned users provisioned_users = set(await refresh_token_storage.get_all_user_ids()) active_users = set(user_states.keys()) # Start scanners for new users new_users = provisioned_users - active_users for user_id in new_users: logger.info( f"[{mode_label}] Starting scanner for newly provisioned user: {user_id}" ) cancel_scope = anyio.CancelScope() user_states[user_id] = UserSyncState( user_id=user_id, cancel_scope=cancel_scope, ) # Start scanner in task group tg.start_soon( _run_user_scanner_with_scope, user_id, cancel_scope, send_stream, shutdown_event, wake_event, token_broker, nextcloud_host, user_states, use_basic_auth, # Positional after user_states ) # Cancel scanners for revoked users revoked_users = active_users - provisioned_users for user_id in revoked_users: logger.info( f"[{mode_label}] Stopping scanner for revoked user: {user_id}" ) state = user_states.get(user_id) if state: state.cancel_scope.cancel() # Note: state will be removed by _run_user_scanner_with_scope on exit if new_users: logger.info(f"[{mode_label}] Started {len(new_users)} new scanner(s)") if revoked_users: logger.info(f"[{mode_label}] Stopped {len(revoked_users)} scanner(s)") except Exception as e: logger.error(f"[{mode_label}] User manager error: {e}", exc_info=True) # Sleep until next poll try: with anyio.move_on_after(poll_interval): await shutdown_event.wait() except anyio.get_cancelled_exc_class(): break # Cancel all remaining scanners on shutdown logger.info( f"[{mode_label}] User manager shutting down, cancelling {len(user_states)} scanner(s)" ) for state in list(user_states.values()): state.cancel_scope.cancel() logger.info(f"[{mode_label}] User manager stopped")