feat(vector-sync): enable background sync in OAuth mode

Add multi-user background vector synchronization when running in OAuth
mode with ENABLE_OFFLINE_ACCESS=true. Key changes:

Architecture (oauth_sync.py):
- User Manager task polls RefreshTokenStorage for provisioned users
- Per-user scanner tasks fetch documents using OAuth tokens
- Shared processor pool indexes documents from all users

Token Broker improvements:
- Accept client_id/client_secret instead of encryption_key
- Remove redundant token audience pre-validation (Nextcloud validates)
- Add _rewrite_token_endpoint for Docker internal URL routing
- Remove double-decryption (storage handles encryption internally)

Browser OAuth flow fixes:
- Add 'resource' parameter to request Nextcloud-scoped tokens
- Store and retrieve next_url for proper redirect after consent
- Rewrite token endpoint URLs for internal Docker access

Configuration:
- Add vector_sync_user_poll_interval setting (default: 60s)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-14 20:00:41 +01:00
parent 1a079a41e7
commit a58a14111b
5 changed files with 723 additions and 43 deletions
+258 -5
View File
@@ -676,6 +676,29 @@ async def setup_oauth_config():
logger.info(f"OIDC_JWKS_URI override: {jwks_uri}{jwks_uri_override}")
jwks_uri = jwks_uri_override
# Rewrite discovered endpoint URLs from public issuer to internal host
# This is needed when OIDC discovery returns public URLs (e.g., http://localhost:8080)
# but the server needs to access them via internal docker network (e.g., http://app:80)
from urllib.parse import urlparse
issuer_parsed = urlparse(issuer)
nextcloud_parsed = urlparse(nextcloud_host)
issuer_base = f"{issuer_parsed.scheme}://{issuer_parsed.netloc}"
nextcloud_base = f"{nextcloud_parsed.scheme}://{nextcloud_parsed.netloc}"
if issuer_base != nextcloud_base:
logger.info(f"Rewriting OIDC endpoints: {issuer_base}{nextcloud_base}")
def rewrite_url(url: str | None) -> str | None:
if url and url.startswith(issuer_base):
return url.replace(issuer_base, nextcloud_base, 1)
return url
userinfo_uri = rewrite_url(userinfo_uri) or userinfo_uri
jwks_uri = rewrite_url(jwks_uri)
introspection_uri = rewrite_url(introspection_uri)
registration_endpoint = rewrite_url(registration_endpoint)
logger.info("OIDC endpoints discovered:")
logger.info(f" Issuer: {issuer}")
logger.info(f" Userinfo: {userinfo_uri}")
@@ -687,8 +710,6 @@ async def setup_oauth_config():
# Auto-detect provider mode based on issuer
# External IdP mode: issuer doesn't match Nextcloud host
# Normalize URLs for comparison (handle port differences like :80 for HTTP)
from urllib.parse import urlparse
def normalize_url(url: str) -> str:
"""Normalize URL by removing default ports (80 for HTTP, 443 for HTTPS)."""
parsed = urlparse(url)
@@ -704,7 +725,16 @@ async def setup_oauth_config():
issuer_normalized = normalize_url(issuer)
nextcloud_normalized = normalize_url(nextcloud_host)
is_external_idp = not issuer_normalized.startswith(nextcloud_normalized)
# Use NEXTCLOUD_PUBLIC_ISSUER_URL for IdP detection when set
# This handles the case where MCP server accesses Nextcloud via internal URL (http://app:80)
# but the issuer in OIDC discovery is the public URL (http://localhost:8080)
public_issuer_for_detection = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer_for_detection:
comparison_issuer = normalize_url(public_issuer_for_detection)
else:
comparison_issuer = nextcloud_normalized
is_external_idp = not issuer_normalized.startswith(comparison_issuer)
if is_external_idp:
oauth_provider = "external" # Could be Keycloak, Auth0, Okta, etc.
@@ -716,6 +746,28 @@ async def setup_oauth_config():
oauth_provider = "nextcloud"
logger.info("✓ Detected integrated mode (Nextcloud OIDC app)")
# For integrated mode, rewrite OIDC endpoints to use internal URL
# The discovery document returns external URLs (http://localhost:8080)
# but the MCP server needs internal URLs (http://app:80) for backend requests
if jwks_uri and not os.getenv("OIDC_JWKS_URI"):
internal_jwks_uri = f"{nextcloud_host}/apps/oidc/jwks"
logger.info(
f" Auto-rewriting JWKS URI for internal access: {jwks_uri}{internal_jwks_uri}"
)
jwks_uri = internal_jwks_uri
if introspection_uri and not os.getenv("OIDC_INTROSPECTION_URI"):
internal_introspection_uri = f"{nextcloud_host}/apps/oidc/introspect"
logger.info(
f" Auto-rewriting introspection URI for internal access: {introspection_uri}{internal_introspection_uri}"
)
introspection_uri = internal_introspection_uri
if userinfo_uri:
internal_userinfo_uri = f"{nextcloud_host}/apps/oidc/userinfo"
logger.info(
f" Auto-rewriting userinfo URI for internal access: {userinfo_uri}{internal_userinfo_uri}"
)
userinfo_uri = internal_userinfo_uri
# Check if offline access (refresh tokens) is enabled
enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in (
"true",
@@ -1234,12 +1286,20 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
break
# Start background vector sync tasks for BasicAuth mode (ADR-007)
# Start background vector sync tasks (ADR-007)
# Scanner runs at server-level (once), not per-session
import anyio as anyio_module
settings = get_settings()
if not oauth_enabled and settings.vector_sync_enabled:
# Check if vector sync is enabled and determine the mode
enable_offline_access_for_sync = os.getenv(
"ENABLE_OFFLINE_ACCESS", "false"
).lower() in ("true", "1", "yes")
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if settings.vector_sync_enabled and not oauth_enabled:
# BasicAuth mode - single user sync
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
@@ -1334,8 +1394,161 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
elif (
settings.vector_sync_enabled
and oauth_enabled
and enable_offline_access_for_sync
and refresh_token_storage
and encryption_key
):
# OAuth mode with offline access - multi-user sync
logger.info("Starting background vector sync tasks for OAuth mode")
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.vector.oauth_sync import (
oauth_processor_task,
user_manager_task,
)
# Get OIDC discovery URL (same as used for OAuth setup)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
)
# Get client credentials from oauth_context (set by setup_oauth_config)
# This includes credentials from DCR if dynamic registration was used
# Use different variable names to avoid shadowing client_id/client_secret from outer scope
oauth_ctx = getattr(app.state, "oauth_context", {})
oauth_config = oauth_ctx.get("config", {})
sync_client_id = oauth_config.get("client_id")
sync_client_secret = oauth_config.get("client_secret")
if not sync_client_id or not sync_client_secret:
logger.error(
"Cannot start OAuth vector sync: client credentials not found in oauth_context"
)
raise ValueError("OAuth client credentials required for vector sync")
# Create token broker for background operations
# Note: storage handles encryption internally, no key needed here
# Client credentials are needed for token refresh operations
token_broker = TokenBrokerService(
storage=refresh_token_storage,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
client_id=sync_client_id,
client_secret=sync_client_secret,
)
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio_module.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio_module.Event()
scanner_wake_event = anyio_module.Event()
# User state tracking for user manager
user_states: dict = {}
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Also store in module singleton for FastMCP session lifespans
_vector_sync_state.document_send_stream = send_stream
_vector_sync_state.document_receive_stream = receive_stream
_vector_sync_state.shutdown_event = shutdown_event
_vector_sync_state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state stored in module singleton")
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.document_send_stream = send_stream
route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state shared with browser_app for /app")
break
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start user manager task (supervises per-user scanners)
await tg.start(
user_manager_task,
send_stream,
shutdown_event,
scanner_wake_event,
token_broker,
refresh_token_storage,
nextcloud_host,
user_states,
tg,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
oauth_processor_task,
i,
receive_stream.clone(),
shutdown_event,
token_broker,
nextcloud_host,
)
logger.info(
f"Background sync tasks started: 1 user manager + "
f"{settings.vector_sync_processor_workers} processors"
)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
# Close token broker HTTP client
if token_broker._http_client:
await token_broker._http_client.aclose()
# TaskGroup automatically cancels all tasks on exit
else:
# No vector sync - just run MCP session manager
if settings.vector_sync_enabled:
# Log why vector sync is not starting
if oauth_enabled and not enable_offline_access_for_sync:
logger.warning(
"Vector sync enabled but ENABLE_OFFLINE_ACCESS=false - "
"vector sync requires offline access in OAuth mode"
)
elif oauth_enabled and not refresh_token_storage:
logger.warning(
"Vector sync enabled but refresh token storage not available"
)
elif oauth_enabled and not encryption_key:
logger.warning(
"Vector sync enabled but TOKEN_ENCRYPTION_KEY not set"
)
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
@@ -1491,6 +1704,46 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
logger.info("Test webhook endpoint enabled: /webhooks/nextcloud")
# Add management API endpoints for Nextcloud PHP app (OAuth mode only)
if oauth_enabled:
from nextcloud_mcp_server.api.management import (
get_server_status,
get_user_session,
get_vector_sync_status,
revoke_user_access,
vector_search,
)
routes.append(Route("/api/v1/status", get_server_status, methods=["GET"]))
routes.append(
Route(
"/api/v1/vector-sync/status",
get_vector_sync_status,
methods=["GET"],
)
)
routes.append(
Route(
"/api/v1/users/{user_id}/session",
get_user_session,
methods=["GET"],
)
)
routes.append(
Route(
"/api/v1/users/{user_id}/revoke",
revoke_user_access,
methods=["POST"],
)
)
routes.append(
Route("/api/v1/vector-viz/search", vector_search, methods=["POST"])
)
logger.info(
"Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, "
"/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, /api/v1/vector-viz/search"
)
# ADR-016: Add Smithery well-known config endpoint for container runtime discovery
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
@@ -50,6 +50,10 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
logger.info(f"oauth_login called - client_id: {oauth_config.get('client_id')}")
logger.info(f"oauth_login called - oauth_client: {oauth_client is not None}")
# Get redirect URL from query params (default to /app)
next_url = request.query_params.get("next", "/app")
logger.info(f"oauth_login - next_url: {next_url}")
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
@@ -71,7 +75,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
await storage.store_oauth_session(
session_id=state, # Use state as session ID
client_id="browser-ui",
client_redirect_uri="/app",
client_redirect_uri=next_url, # Store the redirect URL for after auth
state=state,
code_challenge=code_challenge,
code_challenge_method="S256",
@@ -85,6 +89,11 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
if not oauth_client.authorization_endpoint:
await oauth_client.discover()
# Get Nextcloud resource URI for audience (background sync needs Nextcloud-scoped tokens)
nextcloud_resource_uri = oauth_config.get(
"nextcloud_resource_uri", oauth_config.get("nextcloud_host")
)
idp_params = {
"client_id": oauth_client.client_id,
"redirect_uri": callback_uri,
@@ -94,6 +103,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "consent", # Ensure refresh token
"resource": nextcloud_resource_uri, # Request tokens for Nextcloud API access
}
auth_url = f"{oauth_client.authorization_endpoint}?{urlencode(idp_params)}"
@@ -131,6 +141,11 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
f"{public_parsed.scheme}://{public_parsed.netloc}{auth_parsed.path}"
)
# Get Nextcloud resource URI for audience (background sync needs Nextcloud-scoped tokens)
nextcloud_resource_uri = oauth_config.get(
"nextcloud_resource_uri", oauth_config.get("nextcloud_host")
)
idp_params = {
"client_id": oauth_config["client_id"],
"redirect_uri": callback_uri,
@@ -140,6 +155,7 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "consent", # Ensure refresh token
"resource": nextcloud_resource_uri, # Request tokens for Nextcloud API access
}
# Debug: Log full parameters
@@ -214,12 +230,15 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
oauth_client = oauth_ctx["oauth_client"]
oauth_config = oauth_ctx["config"]
# Retrieve code_verifier from session storage (PKCE required for all modes)
# Retrieve code_verifier and redirect URL from session storage
code_verifier = ""
next_url = "/app" # Default redirect
oauth_session = await storage.get_oauth_session(state)
if oauth_session:
# code_verifier was stored in mcp_authorization_code field
code_verifier = oauth_session.get("mcp_authorization_code", "")
# next_url was stored in client_redirect_uri field
next_url = oauth_session.get("client_redirect_uri", "/app")
# Clean up the temporary session
# Note: We don't have delete_oauth_session method, but it will expire after TTL
@@ -262,6 +281,25 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
# Rewrite token_endpoint from public URL to internal Docker URL
# Discovery document returns public URLs (e.g., http://localhost:8080/...)
# but server-side requests must use internal Docker network (e.g., http://app:80/...)
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
from urllib.parse import urlparse as parse_url
internal_host = oauth_config["nextcloud_host"]
internal_parsed = parse_url(internal_host)
token_parsed = parse_url(token_endpoint)
public_parsed = parse_url(public_issuer)
if token_parsed.hostname == public_parsed.hostname:
# Replace public URL with internal Docker URL
token_endpoint = f"{internal_parsed.scheme}://{internal_parsed.netloc}{token_parsed.path}"
logger.info(
f"Rewrote token endpoint to internal URL: {token_endpoint}"
)
token_params = {
"grant_type": "authorization_code",
"code": code,
@@ -383,7 +421,8 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
# Continue anyway - profile cache is optional for browser UI
# Create response and set session cookie
response = RedirectResponse("/app", status_code=302)
# Redirect to stored next_url (from OAuth session) or /app as default
response = RedirectResponse(next_url, status_code=302)
response.set_cookie(
key="mcp_session",
value=user_id,
+67 -35
View File
@@ -21,7 +21,6 @@ from typing import Dict, Optional, Tuple
import anyio
import httpx
import jwt
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_exchange import exchange_token_for_delegation
@@ -104,7 +103,8 @@ class TokenBrokerService:
storage: RefreshTokenStorage,
oidc_discovery_url: str,
nextcloud_host: str,
encryption_key: str,
client_id: str,
client_secret: str,
cache_ttl: int = 300,
cache_early_refresh: int = 30,
):
@@ -112,21 +112,19 @@ class TokenBrokerService:
Initialize the Token Broker Service.
Args:
storage: Database storage for refresh tokens
storage: Database storage for refresh tokens (handles encryption internally)
oidc_discovery_url: OIDC provider discovery URL
nextcloud_host: Nextcloud server URL
encryption_key: Fernet key for token encryption
client_id: OAuth client ID for token operations
client_secret: OAuth client secret for token operations
cache_ttl: Cache TTL in seconds (default: 5 minutes)
cache_early_refresh: Early refresh threshold in seconds (default: 30 seconds)
"""
self.storage = storage
self.oidc_discovery_url = oidc_discovery_url
self.nextcloud_host = nextcloud_host
self.fernet = Fernet(
encryption_key.encode()
if isinstance(encryption_key, str)
else encryption_key
)
self.client_id = client_id
self.client_secret = client_secret
self.cache = TokenCache(cache_ttl, cache_early_refresh)
self._oidc_config = None
self._http_client = None
@@ -148,6 +146,37 @@ class TokenBrokerService:
self._oidc_config = response.json()
return self._oidc_config
def _rewrite_token_endpoint(self, token_endpoint: str) -> str:
"""Rewrite token endpoint from public URL to internal Docker URL.
OIDC discovery documents return public URLs (e.g., http://localhost:8080/...)
but server-side requests must use internal Docker network (e.g., http://app:80/...).
Args:
token_endpoint: Token endpoint URL from discovery document
Returns:
Rewritten URL using internal Docker host
"""
import os
from urllib.parse import urlparse
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if not public_issuer:
return token_endpoint
internal_parsed = urlparse(self.nextcloud_host)
token_parsed = urlparse(token_endpoint)
public_parsed = urlparse(public_issuer)
if token_parsed.hostname == public_parsed.hostname:
# Replace public URL with internal Docker URL
rewritten = f"{internal_parsed.scheme}://{internal_parsed.netloc}{token_parsed.path}"
logger.info(f"Rewrote token endpoint: {token_endpoint} -> {rewritten}")
return rewritten
return token_endpoint
async def get_nextcloud_token(self, user_id: str) -> Optional[str]:
"""
Get a valid Nextcloud access token for the user.
@@ -180,9 +209,8 @@ class TokenBrokerService:
return None
try:
# Decrypt refresh token
encrypted_token = refresh_data["refresh_token"]
refresh_token = self.fernet.decrypt(encrypted_token.encode()).decode()
# storage.get_refresh_token() returns already-decrypted token
refresh_token = refresh_data["refresh_token"]
# Exchange refresh token for new access token
access_token, expires_in = await self._refresh_access_token(refresh_token)
@@ -282,9 +310,8 @@ class TokenBrokerService:
return None
try:
# Decrypt refresh token
encrypted_token = refresh_data["refresh_token"]
refresh_token = self.fernet.decrypt(encrypted_token.encode()).decode()
# storage.get_refresh_token() returns already-decrypted token
refresh_token = refresh_data["refresh_token"]
# Get token with specific scopes for background operation
access_token, expires_in = await self._refresh_access_token_with_scopes(
@@ -301,7 +328,10 @@ class TokenBrokerService:
return access_token
except Exception as e:
logger.error(f"Failed to get background token for user {user_id}: {e}")
logger.error(
f"Failed to get background token for user {user_id}: {e}",
exc_info=True,
)
await self.cache.invalidate(cache_key)
return None
@@ -318,15 +348,18 @@ class TokenBrokerService:
Tuple of (access_token, expires_in_seconds)
"""
config = await self._get_oidc_config()
token_endpoint = config["token_endpoint"]
token_endpoint = self._rewrite_token_endpoint(config["token_endpoint"])
client = await self._get_http_client()
# Request new access token using refresh token
# Include client credentials as required by most OAuth servers
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "openid profile email notes:read notes:write calendar:read calendar:write",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
response = await client.post(
@@ -345,8 +378,7 @@ class TokenBrokerService:
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600) # Default 1 hour
# Validate audience
await self._validate_token_audience(access_token, "nextcloud")
# Note: Nextcloud validates token audience on API calls - no need to pre-validate here
logger.info(f"Refreshed access token (expires in {expires_in}s)")
return access_token, expires_in
@@ -367,7 +399,7 @@ class TokenBrokerService:
Tuple of (access_token, expires_in_seconds)
"""
config = await self._get_oidc_config()
token_endpoint = config["token_endpoint"]
token_endpoint = self._rewrite_token_endpoint(config["token_endpoint"])
client = await self._get_http_client()
@@ -375,12 +407,19 @@ class TokenBrokerService:
scopes = list(set(["openid", "profile", "email"] + required_scopes))
# Request new access token with specific scopes
# Include client credentials as required by most OAuth servers
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": " ".join(scopes),
"client_id": self.client_id,
"client_secret": self.client_secret,
}
logger.info(
f"Token refresh request to {token_endpoint} with client_id={self.client_id[:16]}..."
)
response = await client.post(
token_endpoint,
data=data,
@@ -391,14 +430,14 @@ class TokenBrokerService:
logger.error(
f"Token refresh with scopes failed: {response.status_code} - {response.text}"
)
logger.error(f" client_id used: {self.client_id[:16]}...")
raise Exception(f"Token refresh failed: {response.status_code}")
token_data = response.json()
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600) # Default 1 hour
# Validate audience
await self._validate_token_audience(access_token, "nextcloud")
# Note: Nextcloud validates token audience on API calls - no need to pre-validate here
logger.info(
f"Refreshed access token with scopes {scopes} (expires in {expires_in}s)"
@@ -453,11 +492,8 @@ class TokenBrokerService:
return False
try:
# Decrypt current refresh token
encrypted_token = refresh_data["refresh_token"]
current_refresh_token = self.fernet.decrypt(
encrypted_token.encode()
).decode()
# storage.get_refresh_token() returns already-decrypted token
current_refresh_token = refresh_data["refresh_token"]
# Get OIDC configuration
config = await self._get_oidc_config()
@@ -486,11 +522,10 @@ class TokenBrokerService:
new_refresh_token = token_data.get("refresh_token")
if new_refresh_token and new_refresh_token != current_refresh_token:
# Encrypt and store new refresh token
encrypted_new = self.fernet.encrypt(new_refresh_token.encode()).decode()
# storage.store_refresh_token() handles encryption internally
await self.storage.store_refresh_token(
user_id=user_id,
refresh_token=encrypted_new,
refresh_token=new_refresh_token,
expires_at=datetime.now(timezone.utc)
+ timedelta(days=90), # 90-day expiry
)
@@ -536,11 +571,8 @@ class TokenBrokerService:
refresh_data = await self.storage.get_refresh_token(user_id)
if refresh_data:
try:
# Attempt to revoke at IdP
encrypted_token = refresh_data["refresh_token"]
refresh_token = self.fernet.decrypt(
encrypted_token.encode()
).decode()
# storage.get_refresh_token() returns already-decrypted token
refresh_token = refresh_data["refresh_token"]
await self._revoke_token_at_idp(refresh_token)
except Exception as e:
logger.warning(f"Failed to revoke at IdP: {e}")
+4
View File
@@ -205,6 +205,7 @@ class Settings:
vector_sync_scan_interval: int = 300 # seconds (5 minutes)
vector_sync_processor_workers: int = 3
vector_sync_queue_max_size: int = 10000
vector_sync_user_poll_interval: int = 60 # seconds - OAuth mode user discovery
# Qdrant settings (mutually exclusive modes)
qdrant_url: Optional[str] = None # Network mode: http://qdrant:6333
@@ -391,6 +392,9 @@ def get_settings() -> Settings:
vector_sync_queue_max_size=int(
os.getenv("VECTOR_SYNC_QUEUE_MAX_SIZE", "10000")
),
vector_sync_user_poll_interval=int(
os.getenv("VECTOR_SYNC_USER_POLL_INTERVAL", "60")
),
# Qdrant settings
qdrant_url=os.getenv("QDRANT_URL"),
qdrant_location=os.getenv("QDRANT_LOCATION"),
+352
View File
@@ -0,0 +1,352 @@
"""OAuth mode vector sync orchestration.
Manages multi-user background vector sync when running in OAuth mode
with ENABLE_OFFLINE_ACCESS=true:
- User Manager: Monitors RefreshTokenStorage for user changes
- Per-User Scanners: One scanner task per provisioned user
- Shared Processor Pool: Processes documents from all users
"""
import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
import anyio
from anyio.abc import TaskGroup, TaskStatus
from anyio.streams.memory import (
MemoryObjectReceiveStream,
MemoryObjectSendStream,
)
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents
if TYPE_CHECKING:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
logger = logging.getLogger(__name__)
# Scopes required for vector sync operations
VECTOR_SYNC_SCOPES = [
"notes:read",
"files:read",
"deck:read",
# "news:read", # News app may not be installed
]
class NotProvisionedError(Exception):
"""User has not provisioned offline access or has revoked it."""
pass
@dataclass
class UserSyncState:
"""State for a single user's scanner task."""
user_id: str
cancel_scope: anyio.CancelScope
started_at: float = field(default_factory=time.time)
async def get_user_client(
user_id: str,
token_broker: "TokenBrokerService",
nextcloud_host: str,
) -> NextcloudClient:
"""Get an authenticated NextcloudClient for a user.
Args:
user_id: User identifier
token_broker: Token broker for obtaining access tokens
nextcloud_host: Nextcloud base URL
Returns:
Authenticated NextcloudClient
Raises:
NotProvisionedError: If user has not provisioned offline access
"""
token = await token_broker.get_background_token(user_id, VECTOR_SYNC_SCOPES)
if not token:
raise NotProvisionedError(f"User {user_id} has not provisioned offline access")
return NextcloudClient.from_token(
base_url=nextcloud_host,
token=token,
username=user_id,
)
async def user_scanner_task(
user_id: str,
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
nextcloud_host: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Scanner task for a single user in OAuth mode.
Gets a fresh token at the start of each scan cycle.
Args:
user_id: User to scan
send_stream: Stream to send changed documents to processors
shutdown_event: Event signaling shutdown
wake_event: Event to trigger immediate scan
token_broker: Token broker for obtaining access tokens
nextcloud_host: Nextcloud base URL
task_status: Status object for signaling task readiness
"""
logger.info(f"[OAuth] Scanner started for user: {user_id}")
settings = get_settings()
task_status.started()
while not shutdown_event.is_set():
nc_client = None
try:
# Get fresh token for this scan cycle
nc_client = await get_user_client(user_id, token_broker, nextcloud_host)
# Scan user's documents
await scan_user_documents(
user_id=user_id,
send_stream=send_stream,
nc_client=nc_client,
)
except NotProvisionedError:
logger.warning(
f"[OAuth] User {user_id} no longer provisioned, stopping scanner"
)
break
except Exception as e:
logger.error(f"[OAuth] Scanner error for {user_id}: {e}", exc_info=True)
finally:
if nc_client:
await nc_client.close()
# Sleep until next interval or wake event
try:
with anyio.move_on_after(settings.vector_sync_scan_interval):
await wake_event.wait()
except anyio.get_cancelled_exc_class():
break
logger.info(f"[OAuth] Scanner stopped for user: {user_id}")
async def oauth_processor_task(
worker_id: int,
receive_stream: MemoryObjectReceiveStream[DocumentTask],
shutdown_event: anyio.Event,
token_broker: "TokenBrokerService",
nextcloud_host: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Processor task for OAuth mode.
Handles documents from any user by fetching tokens on-demand.
Args:
worker_id: Worker identifier for logging
receive_stream: Stream to receive documents from
shutdown_event: Event signaling shutdown
token_broker: Token broker for obtaining access tokens
nextcloud_host: Nextcloud base URL
task_status: Status object for signaling task readiness
"""
from nextcloud_mcp_server.vector.processor import process_document
logger.info(f"[OAuth] Processor {worker_id} started")
task_status.started()
while not shutdown_event.is_set():
doc_task = None
nc_client = None
try:
# Get document with timeout
with anyio.fail_after(1.0):
doc_task = await receive_stream.receive()
# Get token for THIS document's user
nc_client = await get_user_client(
doc_task.user_id, token_broker, nextcloud_host
)
# Process the document
await process_document(doc_task, nc_client)
except TimeoutError:
continue
except anyio.EndOfStream:
logger.info(f"[OAuth] Processor {worker_id}: Stream closed, exiting")
break
except NotProvisionedError:
if doc_task:
logger.warning(
f"[OAuth] User {doc_task.user_id} not provisioned, "
f"skipping {doc_task.doc_type}_{doc_task.doc_id}"
)
continue
except Exception as e:
if doc_task:
logger.error(
f"[OAuth] Processor {worker_id} error processing "
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
exc_info=True,
)
else:
logger.error(f"[OAuth] Processor {worker_id} error: {e}", exc_info=True)
finally:
if nc_client:
await nc_client.close()
logger.info(f"[OAuth] Processor {worker_id} stopped")
async def _run_user_scanner_with_scope(
user_id: str,
cancel_scope: anyio.CancelScope,
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
nextcloud_host: str,
user_states: dict[str, UserSyncState],
) -> None:
"""Wrapper to run scanner with cancellation scope.
Cleans up user state on exit.
"""
cloned_stream = send_stream.clone()
try:
with cancel_scope:
await user_scanner_task(
user_id=user_id,
send_stream=cloned_stream,
shutdown_event=shutdown_event,
wake_event=wake_event,
token_broker=token_broker,
nextcloud_host=nextcloud_host,
)
finally:
# Clean up on exit
if user_id in user_states:
del user_states[user_id]
await cloned_stream.aclose()
async def user_manager_task(
send_stream: MemoryObjectSendStream[DocumentTask],
shutdown_event: anyio.Event,
wake_event: anyio.Event,
token_broker: "TokenBrokerService",
refresh_token_storage: "RefreshTokenStorage",
nextcloud_host: str,
user_states: dict[str, UserSyncState],
tg: TaskGroup,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
) -> None:
"""Supervisor task that manages per-user scanners.
Periodically polls RefreshTokenStorage to detect:
- New users who have provisioned offline access -> start scanner
- Users who have revoked access -> cancel their scanner
Args:
send_stream: Stream to send documents to processors
shutdown_event: Event signaling shutdown
wake_event: Event to wake scanners for immediate scan
token_broker: Token broker for obtaining access tokens
refresh_token_storage: Storage for refresh tokens
nextcloud_host: Nextcloud base URL
user_states: Shared dict tracking active user scanners
tg: Task group for spawning scanner tasks
task_status: Status object for signaling task readiness
"""
settings = get_settings()
poll_interval = settings.vector_sync_user_poll_interval
logger.info(f"[OAuth] User manager started (poll interval: {poll_interval}s)")
task_status.started()
while not shutdown_event.is_set():
try:
# Get current provisioned users
provisioned_users = set(await refresh_token_storage.get_all_user_ids())
active_users = set(user_states.keys())
# Start scanners for new users
new_users = provisioned_users - active_users
for user_id in new_users:
logger.info(
f"[OAuth] Starting scanner for newly provisioned user: {user_id}"
)
cancel_scope = anyio.CancelScope()
user_states[user_id] = UserSyncState(
user_id=user_id,
cancel_scope=cancel_scope,
)
# Start scanner in task group
tg.start_soon(
_run_user_scanner_with_scope,
user_id,
cancel_scope,
send_stream,
shutdown_event,
wake_event,
token_broker,
nextcloud_host,
user_states,
)
# Cancel scanners for revoked users
revoked_users = active_users - provisioned_users
for user_id in revoked_users:
logger.info(f"[OAuth] Stopping scanner for revoked user: {user_id}")
state = user_states.get(user_id)
if state:
state.cancel_scope.cancel()
# Note: state will be removed by _run_user_scanner_with_scope on exit
if new_users:
logger.info(f"[OAuth] Started {len(new_users)} new scanner(s)")
if revoked_users:
logger.info(f"[OAuth] Stopped {len(revoked_users)} scanner(s)")
except Exception as e:
logger.error(f"[OAuth] User manager error: {e}", exc_info=True)
# Sleep until next poll
try:
with anyio.move_on_after(poll_interval):
await shutdown_event.wait()
except anyio.get_cancelled_exc_class():
break
# Cancel all remaining scanners on shutdown
logger.info(
f"[OAuth] User manager shutting down, cancelling {len(user_states)} scanner(s)"
)
for state in list(user_states.values()):
state.cancel_scope.cancel()
logger.info("[OAuth] User manager stopped")