diff --git a/docs/ADR-002-vector-sync-authentication.md b/docs/ADR-002-vector-sync-authentication.md index 5322651..42b5b2f 100644 --- a/docs/ADR-002-vector-sync-authentication.md +++ b/docs/ADR-002-vector-sync-authentication.md @@ -1,7 +1,7 @@ # ADR-002: Vector Database Background Sync Authentication ## Status -Proposed +Accepted - Tier 2 (Token Exchange) Implemented ## Context @@ -39,27 +39,77 @@ We need an OAuth-native solution that maintains security while enabling backgrou ## Decision -We will implement a **tiered authentication strategy** that leverages OAuth standards with graceful fallback: +We will implement a **tiered OAuth authentication strategy** for background operations in OAuth mode. When OAuth authentication is not configured or available, the background sync feature is not available. -### Primary Strategy: OAuth-Based Authentication +**Note**: This ADR applies only to **OAuth mode**. In BasicAuth mode (single-user deployments), credentials are already available via environment variables, and background operations work without additional configuration. -**Tier 1: Offline Access with Refresh Tokens** (Preferred) -- Request `offline_access` scope during OAuth client registration -- Receive and securely store user refresh tokens -- Background worker exchanges refresh tokens for access tokens as needed -- Respects per-user permissions and provides full audit trail +### Tier 1: Service Account Token (client_credentials) ✅ **IMPLEMENTED** -**Tier 2: Token Exchange (RFC 8693)** (If supported) -- Service account exchanges its token for user-scoped tokens on-demand -- No token storage required -- Only available if OIDC provider implements RFC 8693 +**Most Compatible Option** - Works with all OIDC providers supporting `client_credentials` -### Fallback Strategy: Admin Credentials +- MCP server obtains service account token via `client_credentials` grant +- Background worker uses service account token directly +- No user-specific delegation or impersonation +- **Implementation**: `KeycloakOAuthClient.get_service_account_token()` (keycloak_oauth.py:341-395) +- **Testing**: Manual test in `tests/manual/test_token_exchange.py` +- **TODO**: Automated integration tests needed for both Keycloak and Nextcloud OIDC app -**Tier 3: Admin BasicAuth** (Development/Simple Deployments) -- Dedicated sync account with read-only permissions -- Clear documentation of security implications -- Recommended only for trusted environments +**Trade-offs**: +- ✅ Works with nearly all OIDC providers +- ✅ Simple implementation and configuration +- ✅ No additional provider features required +- ❌ Service account needs broad permissions across users +- ❌ Less granular audit trail (all actions attributed to service account) +- ❌ No per-user permission enforcement + +### Tier 2: Token Exchange with Impersonation (RFC 8693) ⚠️ **NOT IMPLEMENTED** + +**Better Security** - Requires provider support for user impersonation + +- Service account exchanges token to impersonate specific users +- Each background operation runs as the target user +- Uses `requested_subject` parameter in token exchange +- Per-user permission enforcement at API level + +**Requirements**: +- OIDC provider supports RFC 8693 token exchange +- Provider supports user impersonation (rare - requires Legacy Keycloak V1 with preview features) +- Service account has impersonation permissions + +**Status**: ⚠️ Not implemented - Keycloak Standard V2 doesn't support impersonation +**Reference**: See `docs/oauth-impersonation-findings.md` for investigation details + +### Tier 3: Token Exchange with Delegation (RFC 8693) ✅ **IMPLEMENTED** + +**Best Security** - Requires provider support for delegation with `act` claim + +- Service account exchanges token on behalf of users (delegation, not impersonation) +- Token includes `act` claim showing service account as actor +- API sees both the user (`sub`) and actor (`act`) in token +- Full audit trail of delegated operations +- **Implementation**: `KeycloakOAuthClient.exchange_token_for_user()` (keycloak_oauth.py:397-495) +- **Testing**: Manual test in `tests/manual/test_token_exchange.py` +- **Limitation**: Keycloak doesn't support `act` claim yet - [Issue #38279](https://github.com/keycloak/keycloak/issues/38279) + +**Requirements**: +- OIDC provider supports RFC 8693 token exchange +- Provider supports delegation with `act` claim (very rare) +- Proper token exchange permissions configured + +**Current Implementation**: Internal-to-internal token exchange with audience modification (without `act` claim) + +### ❌ Will Not Implement + +**1. Offline Access with Refresh Tokens** +- **MCP Protocol Architecture**: FastMCP SDK manages OAuth where MCP Client handles refresh tokens +- **Security Model**: Refresh tokens must never be shared between client and server (OAuth best practice) +- **Technical Impossibility**: MCP Server has no access to refresh tokens from the OAuth callback +- **Alternative**: Token exchange provides similar benefits without violating OAuth security model + +**2. Admin Credentials Fallback** +- **Out of Scope**: This ADR focuses on OAuth mode only +- **Not Appropriate**: Admin credentials bypass OAuth security model +- **BasicAuth Mode**: For single-user deployments needing background operations, use BasicAuth mode instead ### Key Architectural Principles @@ -72,169 +122,9 @@ We will implement a **tiered authentication strategy** that leverages OAuth stan ## Implementation Details -### 1. Offline Access Flow (Tier 1) +### 1. Service Account Token (Tier 1 - Primary) ✅ IMPLEMENTED -#### 1.1 Client Registration -```python -# During OAuth client registration -client_metadata = { - "client_name": "Nextcloud MCP Server", - "redirect_uris": ["http://localhost:8000/oauth/callback"], - "grant_types": ["authorization_code", "refresh_token"], - "scope": "openid profile email offline_access notes:read files:read ...", - "token_type": "Bearer" # or "jwt" -} -``` - -#### 1.2 Token Storage -```python -# Encrypted token storage -class RefreshTokenStorage: - """Securely store and manage user refresh tokens""" - - def __init__(self, db_path: str, encryption_key: bytes): - self.db = Database(db_path) - self.cipher = Fernet(encryption_key) - - async def store_refresh_token( - self, - user_id: str, - refresh_token: str, - expires_at: int | None = None - ): - """Store encrypted refresh token for user""" - encrypted_token = self.cipher.encrypt(refresh_token.encode()) - await self.db.execute( - "INSERT OR REPLACE INTO refresh_tokens VALUES (?, ?, ?, ?)", - (user_id, encrypted_token, expires_at, int(time.time())) - ) - - async def get_refresh_token(self, user_id: str) -> str | None: - """Retrieve and decrypt refresh token""" - row = await self.db.fetch_one( - "SELECT encrypted_token FROM refresh_tokens WHERE user_id = ?", - (user_id,) - ) - if row: - return self.cipher.decrypt(row[0]).decode() - return None -``` - -#### 1.3 Token Refresh Flow -```python -async def get_user_access_token(user_id: str) -> str: - """Exchange refresh token for fresh access token""" - - # Retrieve stored refresh token - refresh_token = await token_storage.get_refresh_token(user_id) - if not refresh_token: - raise ValueError(f"No refresh token for user {user_id}") - - # Exchange for access token - async with httpx.AsyncClient() as client: - response = await client.post( - token_endpoint, - data={ - "grant_type": "refresh_token", - "refresh_token": refresh_token - }, - auth=(client_id, client_secret) - ) - response.raise_for_status() - token_data = response.json() - - # Store new refresh token if rotated - if "refresh_token" in token_data: - await token_storage.store_refresh_token( - user_id, - token_data["refresh_token"], - token_data.get("refresh_expires_in") - ) - - return token_data["access_token"] -``` - -#### 1.4 Capturing Refresh Tokens - -**Challenge**: MCP protocol doesn't expose refresh tokens to server - -**Solution**: Intercept OAuth callback -```python -# Add route to MCP server -@app.route("/oauth/callback") -async def oauth_callback(request): - """Capture OAuth callback and store refresh token""" - - code = request.query_params.get("code") - state = request.query_params.get("state") - - # Exchange authorization code for tokens - token_response = await exchange_authorization_code(code) - - # Extract user info - userinfo = await get_userinfo(token_response["access_token"]) - user_id = userinfo["sub"] - - # Store refresh token (if present) - if "refresh_token" in token_response: - await token_storage.store_refresh_token( - user_id, - token_response["refresh_token"], - expires_at=token_response.get("refresh_expires_in") - ) - logger.info(f"Stored refresh token for user: {user_id}") - - # Continue MCP OAuth flow - return redirect_to_mcp_client(state, token_response) -``` - -### 2. Token Exchange Flow (Tier 2) - -#### 2.1 Capability Detection -```python -async def check_token_exchange_support(discovery_url: str) -> bool: - """Check if OIDC provider supports RFC 8693 token exchange""" - - async with httpx.AsyncClient() as client: - response = await client.get(discovery_url) - discovery = response.json() - - # Check for token exchange grant type - grant_types = discovery.get("grant_types_supported", []) - return "urn:ietf:params:oauth:grant-type:token-exchange" in grant_types -``` - -#### 2.2 Token Exchange Implementation -```python -async def exchange_for_user_token( - service_token: str, - user_id: str, - scopes: list[str] -) -> str: - """Exchange service token for user-scoped token""" - - async with httpx.AsyncClient() as client: - response = await client.post( - token_endpoint, - data={ - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "subject_token": service_token, - "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", - "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", - "resource": f"user:{user_id}", - "scope": " ".join(scopes) - }, - auth=(client_id, client_secret) - ) - - if response.status_code != 200: - logger.warning(f"Token exchange failed: {response.status_code}") - raise TokenExchangeNotSupportedError() - - return response.json()["access_token"] -``` - -#### 2.3 Service Account Token +#### 1.1 Service Account Token Acquisition ```python async def get_service_token() -> str: """Get token for MCP server's service account""" @@ -252,7 +142,117 @@ async def get_service_token() -> str: return response.json()["access_token"] ``` -### 3. Sync Worker with Tiered Authentication +**Implementation**: `KeycloakOAuthClient.get_service_account_token()` (keycloak_oauth.py:341-395) + +**Usage**: +```python +# Background worker uses service account token directly +service_token_data = await oauth_client.get_service_account_token( + scopes=["notes:read", "files:read", "calendar:read"] +) + +client = NextcloudClient.from_token( + base_url=nextcloud_host, + token=service_token_data["access_token"], + username="service-account" +) + +# All operations are performed as the service account +notes = await client.notes.list_notes() +``` + +### 2. Token Exchange with Impersonation (Tier 2) ⚠️ NOT IMPLEMENTED + +This tier is documented for completeness but is not currently implemented due to lack of provider support. + +#### 2.1 Impersonation Flow (Conceptual) + +```python +async def exchange_for_impersonated_user_token( + service_token: str, + target_user_id: str, + scopes: list[str] +) -> str: + """Exchange service token to impersonate specific user (NOT IMPLEMENTED)""" + + async with httpx.AsyncClient() as client: + response = await client.post( + token_endpoint, + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": service_token, + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_subject": target_user_id, # Impersonate this user + "audience": "nextcloud", + "scope": " ".join(scopes) + }, + auth=(client_id, client_secret) + ) + + response.raise_for_status() + return response.json()["access_token"] +``` + +**Why Not Implemented**: +- Keycloak Standard V2 doesn't support `requested_subject` parameter +- Requires Legacy Keycloak V1 with preview features (not production-ready) +- Very few OIDC providers support user impersonation via token exchange + +**See**: `docs/oauth-impersonation-findings.md` for detailed investigation + +### 3. Token Exchange with Delegation (Tier 3) ✅ IMPLEMENTED + +#### 3.1 Capability Detection +```python +async def check_token_exchange_support(discovery_url: str) -> bool: + """Check if OIDC provider supports RFC 8693 token exchange""" + + async with httpx.AsyncClient() as client: + response = await client.get(discovery_url) + discovery = response.json() + + # Check for token exchange grant type + grant_types = discovery.get("grant_types_supported", []) + return "urn:ietf:params:oauth:grant-type:token-exchange" in grant_types +``` + +#### 3.2 Delegation Token Exchange +```python +async def exchange_for_user_token( + service_token: str, + target_user_id: str, + audience: str, + scopes: list[str] +) -> str: + """Exchange service token for user-scoped token via RFC 8693""" + + async with httpx.AsyncClient() as client: + response = await client.post( + token_endpoint, + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "subject_token": service_token, + "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "audience": audience, # Target resource server (e.g., "nextcloud") + "scope": " ".join(scopes) + }, + auth=(client_id, client_secret) + ) + + if response.status_code != 200: + logger.warning(f"Token exchange failed: {response.status_code}") + raise TokenExchangeNotSupportedError() + + return response.json()["access_token"] +``` + +**Implementation**: `KeycloakOAuthClient.exchange_token_for_user()` (keycloak_oauth.py:397-495) + +**Note**: Full delegation with `act` claim requires provider support that is currently very rare. Keycloak tracking: [Issue #38279](https://github.com/keycloak/keycloak/issues/38279) + +### 4. Sync Worker with Tiered Authentication ```python # nextcloud_mcp_server/sync_worker.py @@ -261,76 +261,72 @@ class VectorSyncWorker: def __init__(self): self.auth_method = None - self.token_storage = None + self.oauth_client = None # KeycloakOAuthClient or similar self.vector_service = None async def initialize(self): """Detect and configure authentication method""" - # Try Tier 1: Offline Access - if os.getenv("ENABLE_OFFLINE_ACCESS") == "true": - try: - encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") - self.token_storage = RefreshTokenStorage( - db_path="tokens.db", - encryption_key=base64.b64decode(encryption_key) - ) - self.auth_method = "offline_access" - logger.info("✓ Using offline_access authentication") - return - except Exception as e: - logger.warning(f"Offline access unavailable: {e}") + from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient - # Try Tier 2: Token Exchange try: - if await check_token_exchange_support(discovery_url): - self.auth_method = "token_exchange" - logger.info("✓ Using token exchange authentication (RFC 8693)") - return + self.oauth_client = KeycloakOAuthClient.from_env() + await self.oauth_client.discover() + + # Verify service account access (Tier 1) + service_token = await self.oauth_client.get_service_account_token() + logger.info("✓ Service account token acquired") + + # Check if token exchange is supported (Tier 2/3) + if await check_token_exchange_support(self.oauth_client.discovery_url): + self.auth_method = "token_exchange_delegation" + logger.info( + "✓ Token exchange supported (RFC 8693) - will use delegation for user-scoped operations" + ) + else: + self.auth_method = "service_account" + logger.info( + "ℹ Token exchange not supported - using service account token for all operations" + ) + except Exception as e: - logger.warning(f"Token exchange unavailable: {e}") - - # Fallback: Admin Credentials - if os.getenv("NEXTCLOUD_USERNAME") and os.getenv("NEXTCLOUD_PASSWORD"): - self.auth_method = "admin_basic" - logger.warning( - "⚠ Using admin BasicAuth authentication. " - "Consider enabling offline_access for production." - ) - return - - raise RuntimeError("No authentication method available for sync worker") + logger.error(f"Failed to initialize OAuth authentication: {e}") + raise RuntimeError( + "OAuth authentication is required for background sync. " + "Either configure OIDC_CLIENT_ID/OIDC_CLIENT_SECRET with service account enabled, " + "or use BasicAuth mode for single-user deployments." + ) from e async def get_user_client(self, user_id: str) -> NextcloudClient: """Get authenticated client for user based on auth method""" - if self.auth_method == "offline_access": - # Exchange refresh token for access token - access_token = await get_user_access_token(user_id) + if self.auth_method == "token_exchange_delegation": + # Tier 2/3: Get service token and exchange for user-scoped token + service_token_data = await self.oauth_client.get_service_account_token() + + user_token_data = await self.oauth_client.exchange_token_for_user( + subject_token=service_token_data["access_token"], + target_user_id=user_id, + audience="nextcloud", + scopes=["notes:read", "files:read", "calendar:read"] + ) + return NextcloudClient.from_token( base_url=nextcloud_host, - token=access_token, + token=user_token_data["access_token"], username=user_id ) - elif self.auth_method == "token_exchange": - # Get service token and exchange for user token - service_token = await get_service_token() - user_token = await exchange_for_user_token( - service_token, - user_id, - scopes=["notes:read", "files:read"] - ) + elif self.auth_method == "service_account": + # Tier 1: Use service account token directly (no user scoping) + service_token_data = await self.oauth_client.get_service_account_token() + return NextcloudClient.from_token( base_url=nextcloud_host, - token=user_token, - username=user_id + token=service_token_data["access_token"], + username="service-account" ) - elif self.auth_method == "admin_basic": - # Use admin credentials (fallback) - return NextcloudClient.from_env() - raise RuntimeError(f"Unknown auth method: {self.auth_method}") async def sync_user_content(self, user_id: str): @@ -370,14 +366,13 @@ class VectorSyncWorker: while True: try: # Get list of users to sync - if self.auth_method == "admin_basic": - # Admin can list all users - admin_client = NextcloudClient.from_env() - users = await admin_client.users.list_users() - user_ids = [u.id for u in users] - else: - # OAuth methods: only sync users with stored tokens - user_ids = await self.token_storage.get_all_user_ids() + # Implementation depends on how you track authenticated users + # Options: + # - Audit logs of MCP authentication events + # - MCP session history + # - Configured user list + # - If using service account with broad permissions: list all users + user_ids = await self.get_active_users() logger.info(f"Syncing content for {len(user_ids)} users") @@ -452,40 +447,59 @@ async def nc_notes_semantic_search( ### 5. Security Implementation -#### 5.1 Token Encryption +#### 5.1 Service Account Credentials Protection ```python -# Generate encryption key (store securely) -from cryptography.fernet import Fernet - -# On first setup -encryption_key = Fernet.generate_key() -# Store in environment or secrets manager +# Store OAuth client credentials securely # NEVER commit to source control -# In production -encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") # Base64-encoded Fernet key +# Option 1: Environment variables (for development) +export OIDC_CLIENT_ID="nextcloud-mcp-server" +export OIDC_CLIENT_SECRET="" + +# Option 2: Secrets manager (for production) +import boto3 +secrets = boto3.client('secretsmanager') +secret = secrets.get_secret_value(SecretId='nextcloud-mcp-oauth') +client_secret = json.loads(secret['SecretString'])['client_secret'] + +# Option 3: Encrypted storage (for self-hosted) +from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage + +storage = RefreshTokenStorage.from_env() +await storage.initialize() + +# Client credentials are encrypted at rest using Fernet +client_data = await storage.get_oauth_client() ``` -#### 5.2 Token Rotation +#### 5.2 Token Lifecycle Management ```python -async def rotate_refresh_token(user_id: str): - """Handle refresh token rotation""" +async def manage_service_token_lifecycle(): + """Cache and refresh service account tokens""" - old_refresh_token = await token_storage.get_refresh_token(user_id) + # Cache service token (avoid repeated requests) + cached_token = None + token_expires_at = 0 - # Exchange for new tokens - response = await exchange_refresh_token(old_refresh_token) + async def get_fresh_service_token() -> str: + nonlocal cached_token, token_expires_at - if "refresh_token" in response: - # Store new refresh token - await token_storage.store_refresh_token( - user_id, - response["refresh_token"], - expires_at=response.get("refresh_expires_in") - ) + now = time.time() - # Securely delete old token - await token_storage.delete_refresh_token(user_id, old_refresh_token) + # Return cached token if still valid (with 5-minute buffer) + if cached_token and now < (token_expires_at - 300): + return cached_token + + # Request new token + token_data = await oauth_client.get_service_account_token() + + cached_token = token_data["access_token"] + token_expires_at = now + token_data.get("expires_in", 3600) + + logger.info("Service account token refreshed") + return cached_token + + return get_fresh_service_token ``` #### 5.3 Audit Logging @@ -517,17 +531,16 @@ async def audit_log( #### 6.1 Environment Variables ```bash -# Tier 1: Offline Access -ENABLE_OFFLINE_ACCESS=true -TOKEN_ENCRYPTION_KEY= -TOKEN_STORAGE_DB=/app/data/tokens.db +# OAuth Configuration (Required for Background Sync in OAuth Mode) +# Requires external OIDC provider with client_credentials support +OIDC_DISCOVERY_URL=http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration +OIDC_CLIENT_ID=nextcloud-mcp-server +OIDC_CLIENT_SECRET= +NEXTCLOUD_HOST=http://app:80 -# Tier 2: Token Exchange (auto-detected) -# No configuration needed - detected via OIDC discovery - -# Tier 3: Admin Fallback -NEXTCLOUD_USERNAME=sync-bot -NEXTCLOUD_PASSWORD= +# Tier selection is automatic: +# - Tier 1 (service_account): Always available if client has service account enabled +# - Tier 2/3 (token_exchange): Used if provider supports RFC 8693 token exchange # Vector Database QDRANT_URL=http://qdrant:6333 @@ -536,9 +549,31 @@ QDRANT_API_KEY= # Sync Configuration SYNC_INTERVAL_SECONDS=300 SYNC_BATCH_SIZE=100 + +# Note: For BasicAuth mode (single-user), background sync uses NEXTCLOUD_USERNAME/NEXTCLOUD_PASSWORD +# This ADR focuses on OAuth mode only ``` -#### 6.2 Docker Compose +#### 6.2 Keycloak Configuration (for Token Exchange) + +**Client Settings** (`nextcloud-mcp-server`): +```json +{ + "clientId": "nextcloud-mcp-server", + "serviceAccountsEnabled": true, + "authorizationServicesEnabled": false, + "attributes": { + "token.exchange.grant.enabled": "true", + "client.token.exchange.standard.enabled": "true" + } +} +``` + +**Service Account Roles**: +- Assign appropriate Nextcloud roles/scopes to the service account +- Configure token exchange permissions + +#### 6.3 Docker Compose ```yaml services: mcp-sync: @@ -546,20 +581,24 @@ services: command: ["python", "-m", "nextcloud_mcp_server.sync_worker"] environment: - NEXTCLOUD_HOST=http://app:80 - - ENABLE_OFFLINE_ACCESS=true - - TOKEN_ENCRYPTION_KEY=${TOKEN_ENCRYPTION_KEY} + + # External OIDC provider (Keycloak) + - OIDC_DISCOVERY_URL=http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration + - OIDC_CLIENT_ID=nextcloud-mcp-server + - OIDC_CLIENT_SECRET=${OIDC_CLIENT_SECRET} + + # Vector database - QDRANT_URL=http://qdrant:6333 - # OAuth client credentials (for token refresh) - - NEXTCLOUD_OIDC_CLIENT_ID=${NEXTCLOUD_OIDC_CLIENT_ID} - - NEXTCLOUD_OIDC_CLIENT_SECRET=${NEXTCLOUD_OIDC_CLIENT_SECRET} + - QDRANT_API_KEY=${QDRANT_API_KEY} volumes: - - sync-tokens:/app/data + - sync-data:/app/data # For OAuth client credential storage depends_on: - app + - keycloak - qdrant volumes: - sync-tokens: # Persistent storage for encrypted tokens + sync-data: # Persistent storage for encrypted OAuth client credentials ``` ## Consequences @@ -641,32 +680,37 @@ volumes: #### Security Best Practices -1. **Token Encryption Key Management** +1. **OAuth Client Secret Management** ```bash - # Generate secure key - python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" - # Store in secrets manager (Vault, AWS Secrets Manager, etc.) # Or use environment variable with restricted permissions + + # For self-hosted: Use encrypted storage + # OAuth client credentials stored in SQLite with Fernet encryption + # Encryption key: TOKEN_ENCRYPTION_KEY environment variable + + # Generate encryption key: + python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" ``` -2. **Token Storage Permissions** +2. **Service Account Token Lifecycle** + - Cache service tokens to minimize requests (with expiry buffer) + - Automatically refresh expired tokens + - Use short-lived tokens (provider default, typically 1 hour) + - Monitor token request rates and failures + +3. **Database Permissions (for Client Credential Storage)** ```bash # Restrict database file permissions chmod 600 /app/data/tokens.db chown mcp-server:mcp-server /app/data/tokens.db ``` -3. **Token Rotation Schedule** - - Refresh access tokens every 5 minutes (or token expiry) - - Rotate refresh tokens on each use (if provider supports) - - Revoke tokens on user logout/deauthorization - 4. **Monitoring and Alerting** - - Alert on token refresh failures + - Alert on token exchange failures - Monitor for unusual access patterns - - Track token age and rotation - - Audit sync operations per user + - Track service account token usage + - Audit sync operations per user (if delegation supported) ### Future Enhancements