From a58a14111b283dfbd90b6c8bb915b05e7c4a11ca Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 14 Dec 2025 20:00:41 +0100 Subject: [PATCH] feat(vector-sync): enable background sync in OAuth mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add multi-user background vector synchronization when running in OAuth mode with ENABLE_OFFLINE_ACCESS=true. Key changes: Architecture (oauth_sync.py): - User Manager task polls RefreshTokenStorage for provisioned users - Per-user scanner tasks fetch documents using OAuth tokens - Shared processor pool indexes documents from all users Token Broker improvements: - Accept client_id/client_secret instead of encryption_key - Remove redundant token audience pre-validation (Nextcloud validates) - Add _rewrite_token_endpoint for Docker internal URL routing - Remove double-decryption (storage handles encryption internally) Browser OAuth flow fixes: - Add 'resource' parameter to request Nextcloud-scoped tokens - Store and retrieve next_url for proper redirect after consent - Rewrite token endpoint URLs for internal Docker access Configuration: - Add vector_sync_user_poll_interval setting (default: 60s) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- nextcloud_mcp_server/app.py | 263 ++++++++++++- .../auth/browser_oauth_routes.py | 45 ++- nextcloud_mcp_server/auth/token_broker.py | 102 +++-- nextcloud_mcp_server/config.py | 4 + nextcloud_mcp_server/vector/oauth_sync.py | 352 ++++++++++++++++++ 5 files changed, 723 insertions(+), 43 deletions(-) create mode 100644 nextcloud_mcp_server/vector/oauth_sync.py diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 9ac513c..2757eb6 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -676,6 +676,29 @@ async def setup_oauth_config(): logger.info(f"OIDC_JWKS_URI override: {jwks_uri} → {jwks_uri_override}") jwks_uri = jwks_uri_override + # Rewrite discovered endpoint URLs from public issuer to internal host + # This is needed when OIDC discovery returns public URLs (e.g., http://localhost:8080) + # but the server needs to access them via internal docker network (e.g., http://app:80) + from urllib.parse import urlparse + + issuer_parsed = urlparse(issuer) + nextcloud_parsed = urlparse(nextcloud_host) + issuer_base = f"{issuer_parsed.scheme}://{issuer_parsed.netloc}" + nextcloud_base = f"{nextcloud_parsed.scheme}://{nextcloud_parsed.netloc}" + + if issuer_base != nextcloud_base: + logger.info(f"Rewriting OIDC endpoints: {issuer_base} → {nextcloud_base}") + + def rewrite_url(url: str | None) -> str | None: + if url and url.startswith(issuer_base): + return url.replace(issuer_base, nextcloud_base, 1) + return url + + userinfo_uri = rewrite_url(userinfo_uri) or userinfo_uri + jwks_uri = rewrite_url(jwks_uri) + introspection_uri = rewrite_url(introspection_uri) + registration_endpoint = rewrite_url(registration_endpoint) + logger.info("OIDC endpoints discovered:") logger.info(f" Issuer: {issuer}") logger.info(f" Userinfo: {userinfo_uri}") @@ -687,8 +710,6 @@ async def setup_oauth_config(): # Auto-detect provider mode based on issuer # External IdP mode: issuer doesn't match Nextcloud host # Normalize URLs for comparison (handle port differences like :80 for HTTP) - from urllib.parse import urlparse - def normalize_url(url: str) -> str: """Normalize URL by removing default ports (80 for HTTP, 443 for HTTPS).""" parsed = urlparse(url) @@ -704,7 +725,16 @@ async def setup_oauth_config(): issuer_normalized = normalize_url(issuer) nextcloud_normalized = normalize_url(nextcloud_host) - is_external_idp = not issuer_normalized.startswith(nextcloud_normalized) + # Use NEXTCLOUD_PUBLIC_ISSUER_URL for IdP detection when set + # This handles the case where MCP server accesses Nextcloud via internal URL (http://app:80) + # but the issuer in OIDC discovery is the public URL (http://localhost:8080) + public_issuer_for_detection = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") + if public_issuer_for_detection: + comparison_issuer = normalize_url(public_issuer_for_detection) + else: + comparison_issuer = nextcloud_normalized + + is_external_idp = not issuer_normalized.startswith(comparison_issuer) if is_external_idp: oauth_provider = "external" # Could be Keycloak, Auth0, Okta, etc. @@ -716,6 +746,28 @@ async def setup_oauth_config(): oauth_provider = "nextcloud" logger.info("✓ Detected integrated mode (Nextcloud OIDC app)") + # For integrated mode, rewrite OIDC endpoints to use internal URL + # The discovery document returns external URLs (http://localhost:8080) + # but the MCP server needs internal URLs (http://app:80) for backend requests + if jwks_uri and not os.getenv("OIDC_JWKS_URI"): + internal_jwks_uri = f"{nextcloud_host}/apps/oidc/jwks" + logger.info( + f" Auto-rewriting JWKS URI for internal access: {jwks_uri} → {internal_jwks_uri}" + ) + jwks_uri = internal_jwks_uri + if introspection_uri and not os.getenv("OIDC_INTROSPECTION_URI"): + internal_introspection_uri = f"{nextcloud_host}/apps/oidc/introspect" + logger.info( + f" Auto-rewriting introspection URI for internal access: {introspection_uri} → {internal_introspection_uri}" + ) + introspection_uri = internal_introspection_uri + if userinfo_uri: + internal_userinfo_uri = f"{nextcloud_host}/apps/oidc/userinfo" + logger.info( + f" Auto-rewriting userinfo URI for internal access: {userinfo_uri} → {internal_userinfo_uri}" + ) + userinfo_uri = internal_userinfo_uri + # Check if offline access (refresh tokens) is enabled enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in ( "true", @@ -1234,12 +1286,20 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = ) break - # Start background vector sync tasks for BasicAuth mode (ADR-007) + # Start background vector sync tasks (ADR-007) # Scanner runs at server-level (once), not per-session import anyio as anyio_module settings = get_settings() - if not oauth_enabled and settings.vector_sync_enabled: + + # Check if vector sync is enabled and determine the mode + enable_offline_access_for_sync = os.getenv( + "ENABLE_OFFLINE_ACCESS", "false" + ).lower() in ("true", "1", "yes") + encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") + + if settings.vector_sync_enabled and not oauth_enabled: + # BasicAuth mode - single user sync logger.info("Starting background vector sync tasks for BasicAuth mode") # Get username from environment @@ -1334,8 +1394,161 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = shutdown_event.set() await client.close() # TaskGroup automatically cancels all tasks on exit + + elif ( + settings.vector_sync_enabled + and oauth_enabled + and enable_offline_access_for_sync + and refresh_token_storage + and encryption_key + ): + # OAuth mode with offline access - multi-user sync + logger.info("Starting background vector sync tasks for OAuth mode") + + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + from nextcloud_mcp_server.vector.oauth_sync import ( + oauth_processor_task, + user_manager_task, + ) + + # Get OIDC discovery URL (same as used for OAuth setup) + discovery_url = os.getenv( + "OIDC_DISCOVERY_URL", + f"{nextcloud_host}/.well-known/openid-configuration", + ) + + # Get client credentials from oauth_context (set by setup_oauth_config) + # This includes credentials from DCR if dynamic registration was used + # Use different variable names to avoid shadowing client_id/client_secret from outer scope + oauth_ctx = getattr(app.state, "oauth_context", {}) + oauth_config = oauth_ctx.get("config", {}) + sync_client_id = oauth_config.get("client_id") + sync_client_secret = oauth_config.get("client_secret") + + if not sync_client_id or not sync_client_secret: + logger.error( + "Cannot start OAuth vector sync: client credentials not found in oauth_context" + ) + raise ValueError("OAuth client credentials required for vector sync") + + # Create token broker for background operations + # Note: storage handles encryption internally, no key needed here + # Client credentials are needed for token refresh operations + token_broker = TokenBrokerService( + storage=refresh_token_storage, + oidc_discovery_url=discovery_url, + nextcloud_host=nextcloud_host, + client_id=sync_client_id, + client_secret=sync_client_secret, + ) + + # Initialize Qdrant collection before starting background tasks + logger.info("Initializing Qdrant collection...") + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + try: + await get_qdrant_client() # Triggers collection creation if needed + logger.info("Qdrant collection ready") + except Exception as e: + logger.error(f"Failed to initialize Qdrant collection: {e}") + raise RuntimeError( + f"Cannot start vector sync - Qdrant initialization failed: {e}" + ) from e + + # Initialize shared state + send_stream, receive_stream = anyio_module.create_memory_object_stream( + max_buffer_size=settings.vector_sync_queue_max_size + ) + shutdown_event = anyio_module.Event() + scanner_wake_event = anyio_module.Event() + + # User state tracking for user manager + user_states: dict = {} + + # Store in app state for access from routes (ADR-007) + app.state.document_send_stream = send_stream + app.state.document_receive_stream = receive_stream + app.state.shutdown_event = shutdown_event + app.state.scanner_wake_event = scanner_wake_event + + # Also store in module singleton for FastMCP session lifespans + _vector_sync_state.document_send_stream = send_stream + _vector_sync_state.document_receive_stream = receive_stream + _vector_sync_state.shutdown_event = shutdown_event + _vector_sync_state.scanner_wake_event = scanner_wake_event + logger.info("Vector sync state stored in module singleton") + + # Also share with browser_app for /app route + for route in app.routes: + if isinstance(route, Mount) and route.path == "/app": + route.app.state.document_send_stream = send_stream + route.app.state.document_receive_stream = receive_stream + route.app.state.shutdown_event = shutdown_event + route.app.state.scanner_wake_event = scanner_wake_event + logger.info("Vector sync state shared with browser_app for /app") + break + + # Start background tasks using anyio TaskGroup + async with anyio_module.create_task_group() as tg: + # Start user manager task (supervises per-user scanners) + await tg.start( + user_manager_task, + send_stream, + shutdown_event, + scanner_wake_event, + token_broker, + refresh_token_storage, + nextcloud_host, + user_states, + tg, + ) + + # Start processor pool (each gets a cloned receive stream) + for i in range(settings.vector_sync_processor_workers): + await tg.start( + oauth_processor_task, + i, + receive_stream.clone(), + shutdown_event, + token_broker, + nextcloud_host, + ) + + logger.info( + f"Background sync tasks started: 1 user manager + " + f"{settings.vector_sync_processor_workers} processors" + ) + + # Run MCP session manager and yield + async with AsyncExitStack() as stack: + await stack.enter_async_context(mcp.session_manager.run()) + try: + yield + finally: + # Shutdown signal + logger.info("Shutting down background sync tasks") + shutdown_event.set() + # Close token broker HTTP client + if token_broker._http_client: + await token_broker._http_client.aclose() + # TaskGroup automatically cancels all tasks on exit else: # No vector sync - just run MCP session manager + if settings.vector_sync_enabled: + # Log why vector sync is not starting + if oauth_enabled and not enable_offline_access_for_sync: + logger.warning( + "Vector sync enabled but ENABLE_OFFLINE_ACCESS=false - " + "vector sync requires offline access in OAuth mode" + ) + elif oauth_enabled and not refresh_token_storage: + logger.warning( + "Vector sync enabled but refresh token storage not available" + ) + elif oauth_enabled and not encryption_key: + logger.warning( + "Vector sync enabled but TOKEN_ENCRYPTION_KEY not set" + ) async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) yield @@ -1491,6 +1704,46 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = ) logger.info("Test webhook endpoint enabled: /webhooks/nextcloud") + # Add management API endpoints for Nextcloud PHP app (OAuth mode only) + if oauth_enabled: + from nextcloud_mcp_server.api.management import ( + get_server_status, + get_user_session, + get_vector_sync_status, + revoke_user_access, + vector_search, + ) + + routes.append(Route("/api/v1/status", get_server_status, methods=["GET"])) + routes.append( + Route( + "/api/v1/vector-sync/status", + get_vector_sync_status, + methods=["GET"], + ) + ) + routes.append( + Route( + "/api/v1/users/{user_id}/session", + get_user_session, + methods=["GET"], + ) + ) + routes.append( + Route( + "/api/v1/users/{user_id}/revoke", + revoke_user_access, + methods=["POST"], + ) + ) + routes.append( + Route("/api/v1/vector-viz/search", vector_search, methods=["POST"]) + ) + logger.info( + "Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, " + "/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, /api/v1/vector-viz/search" + ) + # ADR-016: Add Smithery well-known config endpoint for container runtime discovery if deployment_mode == DeploymentMode.SMITHERY_STATELESS: diff --git a/nextcloud_mcp_server/auth/browser_oauth_routes.py b/nextcloud_mcp_server/auth/browser_oauth_routes.py index 7bc2d00..a86673e 100644 --- a/nextcloud_mcp_server/auth/browser_oauth_routes.py +++ b/nextcloud_mcp_server/auth/browser_oauth_routes.py @@ -50,6 +50,10 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: logger.info(f"oauth_login called - client_id: {oauth_config.get('client_id')}") logger.info(f"oauth_login called - oauth_client: {oauth_client is not None}") + # Get redirect URL from query params (default to /app) + next_url = request.query_params.get("next", "/app") + logger.info(f"oauth_login - next_url: {next_url}") + # Generate state for CSRF protection state = secrets.token_urlsafe(32) @@ -71,7 +75,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: await storage.store_oauth_session( session_id=state, # Use state as session ID client_id="browser-ui", - client_redirect_uri="/app", + client_redirect_uri=next_url, # Store the redirect URL for after auth state=state, code_challenge=code_challenge, code_challenge_method="S256", @@ -85,6 +89,11 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: if not oauth_client.authorization_endpoint: await oauth_client.discover() + # Get Nextcloud resource URI for audience (background sync needs Nextcloud-scoped tokens) + nextcloud_resource_uri = oauth_config.get( + "nextcloud_resource_uri", oauth_config.get("nextcloud_host") + ) + idp_params = { "client_id": oauth_client.client_id, "redirect_uri": callback_uri, @@ -94,6 +103,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: "code_challenge": code_challenge, "code_challenge_method": "S256", "prompt": "consent", # Ensure refresh token + "resource": nextcloud_resource_uri, # Request tokens for Nextcloud API access } auth_url = f"{oauth_client.authorization_endpoint}?{urlencode(idp_params)}" @@ -131,6 +141,11 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: f"{public_parsed.scheme}://{public_parsed.netloc}{auth_parsed.path}" ) + # Get Nextcloud resource URI for audience (background sync needs Nextcloud-scoped tokens) + nextcloud_resource_uri = oauth_config.get( + "nextcloud_resource_uri", oauth_config.get("nextcloud_host") + ) + idp_params = { "client_id": oauth_config["client_id"], "redirect_uri": callback_uri, @@ -140,6 +155,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: "code_challenge": code_challenge, "code_challenge_method": "S256", "prompt": "consent", # Ensure refresh token + "resource": nextcloud_resource_uri, # Request tokens for Nextcloud API access } # Debug: Log full parameters @@ -214,12 +230,15 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo oauth_client = oauth_ctx["oauth_client"] oauth_config = oauth_ctx["config"] - # Retrieve code_verifier from session storage (PKCE required for all modes) + # Retrieve code_verifier and redirect URL from session storage code_verifier = "" + next_url = "/app" # Default redirect oauth_session = await storage.get_oauth_session(state) if oauth_session: # code_verifier was stored in mcp_authorization_code field code_verifier = oauth_session.get("mcp_authorization_code", "") + # next_url was stored in client_redirect_uri field + next_url = oauth_session.get("client_redirect_uri", "/app") # Clean up the temporary session # Note: We don't have delete_oauth_session method, but it will expire after TTL @@ -262,6 +281,25 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo discovery = response.json() token_endpoint = discovery["token_endpoint"] + # Rewrite token_endpoint from public URL to internal Docker URL + # Discovery document returns public URLs (e.g., http://localhost:8080/...) + # but server-side requests must use internal Docker network (e.g., http://app:80/...) + public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") + if public_issuer: + from urllib.parse import urlparse as parse_url + + internal_host = oauth_config["nextcloud_host"] + internal_parsed = parse_url(internal_host) + token_parsed = parse_url(token_endpoint) + public_parsed = parse_url(public_issuer) + + if token_parsed.hostname == public_parsed.hostname: + # Replace public URL with internal Docker URL + token_endpoint = f"{internal_parsed.scheme}://{internal_parsed.netloc}{token_parsed.path}" + logger.info( + f"Rewrote token endpoint to internal URL: {token_endpoint}" + ) + token_params = { "grant_type": "authorization_code", "code": code, @@ -383,7 +421,8 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo # Continue anyway - profile cache is optional for browser UI # Create response and set session cookie - response = RedirectResponse("/app", status_code=302) + # Redirect to stored next_url (from OAuth session) or /app as default + response = RedirectResponse(next_url, status_code=302) response.set_cookie( key="mcp_session", value=user_id, diff --git a/nextcloud_mcp_server/auth/token_broker.py b/nextcloud_mcp_server/auth/token_broker.py index 6b89eef..4e5d783 100644 --- a/nextcloud_mcp_server/auth/token_broker.py +++ b/nextcloud_mcp_server/auth/token_broker.py @@ -21,7 +21,6 @@ from typing import Dict, Optional, Tuple import anyio import httpx import jwt -from cryptography.fernet import Fernet from nextcloud_mcp_server.auth.storage import RefreshTokenStorage from nextcloud_mcp_server.auth.token_exchange import exchange_token_for_delegation @@ -104,7 +103,8 @@ class TokenBrokerService: storage: RefreshTokenStorage, oidc_discovery_url: str, nextcloud_host: str, - encryption_key: str, + client_id: str, + client_secret: str, cache_ttl: int = 300, cache_early_refresh: int = 30, ): @@ -112,21 +112,19 @@ class TokenBrokerService: Initialize the Token Broker Service. Args: - storage: Database storage for refresh tokens + storage: Database storage for refresh tokens (handles encryption internally) oidc_discovery_url: OIDC provider discovery URL nextcloud_host: Nextcloud server URL - encryption_key: Fernet key for token encryption + client_id: OAuth client ID for token operations + client_secret: OAuth client secret for token operations cache_ttl: Cache TTL in seconds (default: 5 minutes) cache_early_refresh: Early refresh threshold in seconds (default: 30 seconds) """ self.storage = storage self.oidc_discovery_url = oidc_discovery_url self.nextcloud_host = nextcloud_host - self.fernet = Fernet( - encryption_key.encode() - if isinstance(encryption_key, str) - else encryption_key - ) + self.client_id = client_id + self.client_secret = client_secret self.cache = TokenCache(cache_ttl, cache_early_refresh) self._oidc_config = None self._http_client = None @@ -148,6 +146,37 @@ class TokenBrokerService: self._oidc_config = response.json() return self._oidc_config + def _rewrite_token_endpoint(self, token_endpoint: str) -> str: + """Rewrite token endpoint from public URL to internal Docker URL. + + OIDC discovery documents return public URLs (e.g., http://localhost:8080/...) + but server-side requests must use internal Docker network (e.g., http://app:80/...). + + Args: + token_endpoint: Token endpoint URL from discovery document + + Returns: + Rewritten URL using internal Docker host + """ + import os + from urllib.parse import urlparse + + public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") + if not public_issuer: + return token_endpoint + + internal_parsed = urlparse(self.nextcloud_host) + token_parsed = urlparse(token_endpoint) + public_parsed = urlparse(public_issuer) + + if token_parsed.hostname == public_parsed.hostname: + # Replace public URL with internal Docker URL + rewritten = f"{internal_parsed.scheme}://{internal_parsed.netloc}{token_parsed.path}" + logger.info(f"Rewrote token endpoint: {token_endpoint} -> {rewritten}") + return rewritten + + return token_endpoint + async def get_nextcloud_token(self, user_id: str) -> Optional[str]: """ Get a valid Nextcloud access token for the user. @@ -180,9 +209,8 @@ class TokenBrokerService: return None try: - # Decrypt refresh token - encrypted_token = refresh_data["refresh_token"] - refresh_token = self.fernet.decrypt(encrypted_token.encode()).decode() + # storage.get_refresh_token() returns already-decrypted token + refresh_token = refresh_data["refresh_token"] # Exchange refresh token for new access token access_token, expires_in = await self._refresh_access_token(refresh_token) @@ -282,9 +310,8 @@ class TokenBrokerService: return None try: - # Decrypt refresh token - encrypted_token = refresh_data["refresh_token"] - refresh_token = self.fernet.decrypt(encrypted_token.encode()).decode() + # storage.get_refresh_token() returns already-decrypted token + refresh_token = refresh_data["refresh_token"] # Get token with specific scopes for background operation access_token, expires_in = await self._refresh_access_token_with_scopes( @@ -301,7 +328,10 @@ class TokenBrokerService: return access_token except Exception as e: - logger.error(f"Failed to get background token for user {user_id}: {e}") + logger.error( + f"Failed to get background token for user {user_id}: {e}", + exc_info=True, + ) await self.cache.invalidate(cache_key) return None @@ -318,15 +348,18 @@ class TokenBrokerService: Tuple of (access_token, expires_in_seconds) """ config = await self._get_oidc_config() - token_endpoint = config["token_endpoint"] + token_endpoint = self._rewrite_token_endpoint(config["token_endpoint"]) client = await self._get_http_client() # Request new access token using refresh token + # Include client credentials as required by most OAuth servers data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "scope": "openid profile email notes:read notes:write calendar:read calendar:write", + "client_id": self.client_id, + "client_secret": self.client_secret, } response = await client.post( @@ -345,8 +378,7 @@ class TokenBrokerService: access_token = token_data["access_token"] expires_in = token_data.get("expires_in", 3600) # Default 1 hour - # Validate audience - await self._validate_token_audience(access_token, "nextcloud") + # Note: Nextcloud validates token audience on API calls - no need to pre-validate here logger.info(f"Refreshed access token (expires in {expires_in}s)") return access_token, expires_in @@ -367,7 +399,7 @@ class TokenBrokerService: Tuple of (access_token, expires_in_seconds) """ config = await self._get_oidc_config() - token_endpoint = config["token_endpoint"] + token_endpoint = self._rewrite_token_endpoint(config["token_endpoint"]) client = await self._get_http_client() @@ -375,12 +407,19 @@ class TokenBrokerService: scopes = list(set(["openid", "profile", "email"] + required_scopes)) # Request new access token with specific scopes + # Include client credentials as required by most OAuth servers data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "scope": " ".join(scopes), + "client_id": self.client_id, + "client_secret": self.client_secret, } + logger.info( + f"Token refresh request to {token_endpoint} with client_id={self.client_id[:16]}..." + ) + response = await client.post( token_endpoint, data=data, @@ -391,14 +430,14 @@ class TokenBrokerService: logger.error( f"Token refresh with scopes failed: {response.status_code} - {response.text}" ) + logger.error(f" client_id used: {self.client_id[:16]}...") raise Exception(f"Token refresh failed: {response.status_code}") token_data = response.json() access_token = token_data["access_token"] expires_in = token_data.get("expires_in", 3600) # Default 1 hour - # Validate audience - await self._validate_token_audience(access_token, "nextcloud") + # Note: Nextcloud validates token audience on API calls - no need to pre-validate here logger.info( f"Refreshed access token with scopes {scopes} (expires in {expires_in}s)" @@ -453,11 +492,8 @@ class TokenBrokerService: return False try: - # Decrypt current refresh token - encrypted_token = refresh_data["refresh_token"] - current_refresh_token = self.fernet.decrypt( - encrypted_token.encode() - ).decode() + # storage.get_refresh_token() returns already-decrypted token + current_refresh_token = refresh_data["refresh_token"] # Get OIDC configuration config = await self._get_oidc_config() @@ -486,11 +522,10 @@ class TokenBrokerService: new_refresh_token = token_data.get("refresh_token") if new_refresh_token and new_refresh_token != current_refresh_token: - # Encrypt and store new refresh token - encrypted_new = self.fernet.encrypt(new_refresh_token.encode()).decode() + # storage.store_refresh_token() handles encryption internally await self.storage.store_refresh_token( user_id=user_id, - refresh_token=encrypted_new, + refresh_token=new_refresh_token, expires_at=datetime.now(timezone.utc) + timedelta(days=90), # 90-day expiry ) @@ -536,11 +571,8 @@ class TokenBrokerService: refresh_data = await self.storage.get_refresh_token(user_id) if refresh_data: try: - # Attempt to revoke at IdP - encrypted_token = refresh_data["refresh_token"] - refresh_token = self.fernet.decrypt( - encrypted_token.encode() - ).decode() + # storage.get_refresh_token() returns already-decrypted token + refresh_token = refresh_data["refresh_token"] await self._revoke_token_at_idp(refresh_token) except Exception as e: logger.warning(f"Failed to revoke at IdP: {e}") diff --git a/nextcloud_mcp_server/config.py b/nextcloud_mcp_server/config.py index 1eacb28..0803fe2 100644 --- a/nextcloud_mcp_server/config.py +++ b/nextcloud_mcp_server/config.py @@ -205,6 +205,7 @@ class Settings: vector_sync_scan_interval: int = 300 # seconds (5 minutes) vector_sync_processor_workers: int = 3 vector_sync_queue_max_size: int = 10000 + vector_sync_user_poll_interval: int = 60 # seconds - OAuth mode user discovery # Qdrant settings (mutually exclusive modes) qdrant_url: Optional[str] = None # Network mode: http://qdrant:6333 @@ -391,6 +392,9 @@ def get_settings() -> Settings: vector_sync_queue_max_size=int( os.getenv("VECTOR_SYNC_QUEUE_MAX_SIZE", "10000") ), + vector_sync_user_poll_interval=int( + os.getenv("VECTOR_SYNC_USER_POLL_INTERVAL", "60") + ), # Qdrant settings qdrant_url=os.getenv("QDRANT_URL"), qdrant_location=os.getenv("QDRANT_LOCATION"), diff --git a/nextcloud_mcp_server/vector/oauth_sync.py b/nextcloud_mcp_server/vector/oauth_sync.py new file mode 100644 index 0000000..75dcf91 --- /dev/null +++ b/nextcloud_mcp_server/vector/oauth_sync.py @@ -0,0 +1,352 @@ +"""OAuth mode vector sync orchestration. + +Manages multi-user background vector sync when running in OAuth mode +with ENABLE_OFFLINE_ACCESS=true: +- User Manager: Monitors RefreshTokenStorage for user changes +- Per-User Scanners: One scanner task per provisioned user +- Shared Processor Pool: Processes documents from all users +""" + +import logging +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import anyio +from anyio.abc import TaskGroup, TaskStatus +from anyio.streams.memory import ( + MemoryObjectReceiveStream, + MemoryObjectSendStream, +) + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents + +if TYPE_CHECKING: + from nextcloud_mcp_server.auth.storage import RefreshTokenStorage + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + +logger = logging.getLogger(__name__) + +# Scopes required for vector sync operations +VECTOR_SYNC_SCOPES = [ + "notes:read", + "files:read", + "deck:read", + # "news:read", # News app may not be installed +] + + +class NotProvisionedError(Exception): + """User has not provisioned offline access or has revoked it.""" + + pass + + +@dataclass +class UserSyncState: + """State for a single user's scanner task.""" + + user_id: str + cancel_scope: anyio.CancelScope + started_at: float = field(default_factory=time.time) + + +async def get_user_client( + user_id: str, + token_broker: "TokenBrokerService", + nextcloud_host: str, +) -> NextcloudClient: + """Get an authenticated NextcloudClient for a user. + + Args: + user_id: User identifier + token_broker: Token broker for obtaining access tokens + nextcloud_host: Nextcloud base URL + + Returns: + Authenticated NextcloudClient + + Raises: + NotProvisionedError: If user has not provisioned offline access + """ + token = await token_broker.get_background_token(user_id, VECTOR_SYNC_SCOPES) + if not token: + raise NotProvisionedError(f"User {user_id} has not provisioned offline access") + + return NextcloudClient.from_token( + base_url=nextcloud_host, + token=token, + username=user_id, + ) + + +async def user_scanner_task( + user_id: str, + send_stream: MemoryObjectSendStream[DocumentTask], + shutdown_event: anyio.Event, + wake_event: anyio.Event, + token_broker: "TokenBrokerService", + nextcloud_host: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, +) -> None: + """Scanner task for a single user in OAuth mode. + + Gets a fresh token at the start of each scan cycle. + + Args: + user_id: User to scan + send_stream: Stream to send changed documents to processors + shutdown_event: Event signaling shutdown + wake_event: Event to trigger immediate scan + token_broker: Token broker for obtaining access tokens + nextcloud_host: Nextcloud base URL + task_status: Status object for signaling task readiness + """ + logger.info(f"[OAuth] Scanner started for user: {user_id}") + settings = get_settings() + + task_status.started() + + while not shutdown_event.is_set(): + nc_client = None + try: + # Get fresh token for this scan cycle + nc_client = await get_user_client(user_id, token_broker, nextcloud_host) + + # Scan user's documents + await scan_user_documents( + user_id=user_id, + send_stream=send_stream, + nc_client=nc_client, + ) + + except NotProvisionedError: + logger.warning( + f"[OAuth] User {user_id} no longer provisioned, stopping scanner" + ) + break + + except Exception as e: + logger.error(f"[OAuth] Scanner error for {user_id}: {e}", exc_info=True) + + finally: + if nc_client: + await nc_client.close() + + # Sleep until next interval or wake event + try: + with anyio.move_on_after(settings.vector_sync_scan_interval): + await wake_event.wait() + except anyio.get_cancelled_exc_class(): + break + + logger.info(f"[OAuth] Scanner stopped for user: {user_id}") + + +async def oauth_processor_task( + worker_id: int, + receive_stream: MemoryObjectReceiveStream[DocumentTask], + shutdown_event: anyio.Event, + token_broker: "TokenBrokerService", + nextcloud_host: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, +) -> None: + """Processor task for OAuth mode. + + Handles documents from any user by fetching tokens on-demand. + + Args: + worker_id: Worker identifier for logging + receive_stream: Stream to receive documents from + shutdown_event: Event signaling shutdown + token_broker: Token broker for obtaining access tokens + nextcloud_host: Nextcloud base URL + task_status: Status object for signaling task readiness + """ + from nextcloud_mcp_server.vector.processor import process_document + + logger.info(f"[OAuth] Processor {worker_id} started") + task_status.started() + + while not shutdown_event.is_set(): + doc_task = None + nc_client = None + try: + # Get document with timeout + with anyio.fail_after(1.0): + doc_task = await receive_stream.receive() + + # Get token for THIS document's user + nc_client = await get_user_client( + doc_task.user_id, token_broker, nextcloud_host + ) + + # Process the document + await process_document(doc_task, nc_client) + + except TimeoutError: + continue + + except anyio.EndOfStream: + logger.info(f"[OAuth] Processor {worker_id}: Stream closed, exiting") + break + + except NotProvisionedError: + if doc_task: + logger.warning( + f"[OAuth] User {doc_task.user_id} not provisioned, " + f"skipping {doc_task.doc_type}_{doc_task.doc_id}" + ) + continue + + except Exception as e: + if doc_task: + logger.error( + f"[OAuth] Processor {worker_id} error processing " + f"{doc_task.doc_type}_{doc_task.doc_id}: {e}", + exc_info=True, + ) + else: + logger.error(f"[OAuth] Processor {worker_id} error: {e}", exc_info=True) + + finally: + if nc_client: + await nc_client.close() + + logger.info(f"[OAuth] Processor {worker_id} stopped") + + +async def _run_user_scanner_with_scope( + user_id: str, + cancel_scope: anyio.CancelScope, + send_stream: MemoryObjectSendStream[DocumentTask], + shutdown_event: anyio.Event, + wake_event: anyio.Event, + token_broker: "TokenBrokerService", + nextcloud_host: str, + user_states: dict[str, UserSyncState], +) -> None: + """Wrapper to run scanner with cancellation scope. + + Cleans up user state on exit. + """ + cloned_stream = send_stream.clone() + try: + with cancel_scope: + await user_scanner_task( + user_id=user_id, + send_stream=cloned_stream, + shutdown_event=shutdown_event, + wake_event=wake_event, + token_broker=token_broker, + nextcloud_host=nextcloud_host, + ) + finally: + # Clean up on exit + if user_id in user_states: + del user_states[user_id] + await cloned_stream.aclose() + + +async def user_manager_task( + send_stream: MemoryObjectSendStream[DocumentTask], + shutdown_event: anyio.Event, + wake_event: anyio.Event, + token_broker: "TokenBrokerService", + refresh_token_storage: "RefreshTokenStorage", + nextcloud_host: str, + user_states: dict[str, UserSyncState], + tg: TaskGroup, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, +) -> None: + """Supervisor task that manages per-user scanners. + + Periodically polls RefreshTokenStorage to detect: + - New users who have provisioned offline access -> start scanner + - Users who have revoked access -> cancel their scanner + + Args: + send_stream: Stream to send documents to processors + shutdown_event: Event signaling shutdown + wake_event: Event to wake scanners for immediate scan + token_broker: Token broker for obtaining access tokens + refresh_token_storage: Storage for refresh tokens + nextcloud_host: Nextcloud base URL + user_states: Shared dict tracking active user scanners + tg: Task group for spawning scanner tasks + task_status: Status object for signaling task readiness + """ + settings = get_settings() + poll_interval = settings.vector_sync_user_poll_interval + + logger.info(f"[OAuth] User manager started (poll interval: {poll_interval}s)") + task_status.started() + + while not shutdown_event.is_set(): + try: + # Get current provisioned users + provisioned_users = set(await refresh_token_storage.get_all_user_ids()) + active_users = set(user_states.keys()) + + # Start scanners for new users + new_users = provisioned_users - active_users + for user_id in new_users: + logger.info( + f"[OAuth] Starting scanner for newly provisioned user: {user_id}" + ) + cancel_scope = anyio.CancelScope() + user_states[user_id] = UserSyncState( + user_id=user_id, + cancel_scope=cancel_scope, + ) + + # Start scanner in task group + tg.start_soon( + _run_user_scanner_with_scope, + user_id, + cancel_scope, + send_stream, + shutdown_event, + wake_event, + token_broker, + nextcloud_host, + user_states, + ) + + # Cancel scanners for revoked users + revoked_users = active_users - provisioned_users + for user_id in revoked_users: + logger.info(f"[OAuth] Stopping scanner for revoked user: {user_id}") + state = user_states.get(user_id) + if state: + state.cancel_scope.cancel() + # Note: state will be removed by _run_user_scanner_with_scope on exit + + if new_users: + logger.info(f"[OAuth] Started {len(new_users)} new scanner(s)") + if revoked_users: + logger.info(f"[OAuth] Stopped {len(revoked_users)} scanner(s)") + + except Exception as e: + logger.error(f"[OAuth] User manager error: {e}", exc_info=True) + + # Sleep until next poll + try: + with anyio.move_on_after(poll_interval): + await shutdown_event.wait() + except anyio.get_cancelled_exc_class(): + break + + # Cancel all remaining scanners on shutdown + logger.info( + f"[OAuth] User manager shutting down, cancelling {len(user_states)} scanner(s)" + ) + for state in list(user_states.values()): + state.cancel_scope.cancel() + + logger.info("[OAuth] User manager stopped")