feat: Add Keycloak OAuth provider support with refresh token storage

Implements Keycloak as an external OIDC provider following ADR-002
architecture for background job authentication using offline_access.

## Features

- Keycloak OAuth provider with PKCE and offline_access support
- Refresh token storage with Fernet encryption
- Token verifier for both JWT and opaque tokens
- Multi-client validation (realm-level trust)
- Sample configuration for Keycloak integration

## Implementation

### OAuth Provider (keycloak_oauth.py)
- Authorization Code Flow with PKCE
- Refresh token exchange
- OIDC discovery endpoint support
- Token validation with JWKS

### Token Storage (refresh_token_storage.py)
- Encrypted storage using Fernet symmetric encryption
- SQLite backend for persistence
- Token rotation support
- Per-user token management

### Token Verifier Updates
- Support both JWT (self-encoded) and opaque tokens
- JWKS-based JWT signature verification
- Introspection endpoint fallback for opaque tokens
- Scope extraction from both token types

### Configuration
- .env.keycloak.sample: Example configuration with Keycloak URLs
- docs/keycloak-multi-client-validation.md: Realm-level validation documentation
- app-hooks/post-installation/10-install-user_oidc-app.sh: Updated dependencies

## Architecture Notes

- MCP Server is a protected resource (requires OAuth)
- MCP Client initiates OAuth flow and shares refresh tokens
- Refresh tokens enable background operations without admin credentials
- Supports future token exchange delegation when Keycloak implements it

## References

- ADR-002: Vector Database Background Sync Authentication
- RFC 6749: OAuth 2.0 (offline_access, refresh tokens)
- RFC 7517: JSON Web Key (JWK)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-10-31 03:03:58 +01:00
parent 529dc4616b
commit f34366a260
8 changed files with 1484 additions and 53 deletions
+141
View File
@@ -0,0 +1,141 @@
# Keycloak OAuth Configuration for Nextcloud MCP Server
#
# This configuration uses Keycloak as the OAuth/OIDC identity provider
# while still accessing Nextcloud APIs. Nextcloud's user_oidc app validates
# Keycloak bearer tokens and provisions users automatically.
#
# Architecture: Client → Keycloak (OAuth) → MCP Server → Nextcloud (user_oidc validates) → APIs
#
# This enables ADR-002 authentication patterns without admin credentials!
# ==============================================================================
# OAUTH PROVIDER SELECTION
# ==============================================================================
# OAuth provider: "keycloak" or "nextcloud" (default)
OAUTH_PROVIDER=keycloak
# ==============================================================================
# KEYCLOAK CONFIGURATION
# ==============================================================================
# Keycloak base URL (accessible from MCP server container)
KEYCLOAK_URL=http://keycloak:8080
# Keycloak realm name
KEYCLOAK_REALM=nextcloud-mcp
# OAuth client credentials (from Keycloak realm export or manual configuration)
KEYCLOAK_CLIENT_ID=nextcloud-mcp-server
KEYCLOAK_CLIENT_SECRET=mcp-secret-change-in-production
# OIDC discovery URL (auto-constructed from URL + realm, or specify explicitly)
KEYCLOAK_DISCOVERY_URL=http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration
# ==============================================================================
# NEXTCLOUD CONFIGURATION
# ==============================================================================
# Nextcloud URL (accessible from MCP server container)
# Used for API access - Keycloak tokens are validated by user_oidc app
NEXTCLOUD_HOST=http://app:80
# MCP server URL (for OAuth redirect URIs)
# This is the publicly accessible URL that OAuth clients connect to
NEXTCLOUD_MCP_SERVER_URL=http://localhost:8002
# Public Keycloak issuer URL (accessible from OAuth clients)
# If clients access Keycloak via a different URL than the internal one,
# set this to the public URL for OAuth flows
NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8888
# ==============================================================================
# REFRESH TOKEN STORAGE (ADR-002 Tier 1: Offline Access)
# ==============================================================================
# Enable offline_access scope to get refresh tokens
ENABLE_OFFLINE_ACCESS=true
# Encryption key for storing refresh tokens (generate with instructions below)
# IMPORTANT: Keep this secret! Tokens are encrypted at rest using this key.
#
# Generate a key:
# python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
#
# Example (DO NOT use this in production!):
# TOKEN_ENCRYPTION_KEY=your-base64-encoded-fernet-key-here
# Path to SQLite database for token storage
TOKEN_STORAGE_DB=/app/data/tokens.db
# OAuth client storage (for MCP internal use)
NEXTCLOUD_OIDC_CLIENT_STORAGE=/app/.oauth/keycloak_oauth_client.json
# ==============================================================================
# DOCKER COMPOSE NOTES
# ==============================================================================
# When running via docker-compose, the mcp-keycloak service is pre-configured
# with these environment variables. See docker-compose.yml for the full config.
#
# Start services:
# docker-compose up -d keycloak app mcp-keycloak
#
# View logs:
# docker-compose logs -f mcp-keycloak
#
# Check Keycloak realm:
# curl http://localhost:8888/realms/nextcloud-mcp/.well-known/openid-configuration
#
# Check user_oidc provider:
# docker compose exec app php occ user_oidc:provider keycloak
# ==============================================================================
# KEYCLOAK SETUP VERIFICATION
# ==============================================================================
# 1. Verify Keycloak is running and realm is imported:
# curl http://localhost:8888/realms/nextcloud-mcp/.well-known/openid-configuration
#
# 2. Verify Nextcloud user_oidc provider is configured:
# docker compose exec app php occ user_oidc:provider keycloak
#
# 3. Test OAuth flow manually:
# - Get token from Keycloak:
# curl -X POST "http://localhost:8888/realms/nextcloud-mcp/protocol/openid-connect/token" \
# -d "grant_type=password" \
# -d "client_id=nextcloud-mcp-server" \
# -d "client_secret=mcp-secret-change-in-production" \
# -d "username=admin" \
# -d "password=admin" \
# -d "scope=openid profile email offline_access"
#
# - Use token with Nextcloud API:
# curl -H "Authorization: Bearer <access_token>" \
# http://localhost:8080/ocs/v2.php/cloud/capabilities
#
# 4. Connect MCP client to server:
# - Point your MCP client to http://localhost:8002
# - Complete OAuth flow via Keycloak (credentials: admin/admin)
# - Client should receive access token and be able to call MCP tools
# ==============================================================================
# TROUBLESHOOTING
# ==============================================================================
# If OAuth flow fails:
# - Check that Keycloak is accessible: curl http://localhost:8888
# - Check that user_oidc provider is configured: docker compose exec app php occ user_oidc:provider keycloak
# - Check MCP server logs: docker-compose logs mcp-keycloak
# - Verify redirect URIs match in Keycloak client configuration
#
# If token validation fails:
# - Verify user_oidc has bearer validation enabled (--check-bearer=1)
# - Check Nextcloud logs: docker compose exec app tail -f /var/www/html/data/nextcloud.log
# - Verify Keycloak discovery URL is accessible from Nextcloud container:
# docker compose exec app curl http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration
#
# If offline_access/refresh tokens not working:
# - Verify TOKEN_ENCRYPTION_KEY is set and valid
# - Check token storage database: ls -lah /app/data/tokens.db (inside container)
# - Check that offline_access scope is requested in realm configuration
+119
View File
@@ -395,6 +395,125 @@ uv run pytest -m oauth -v
- Playwright tests run in CI/CD environments
- Use Firefox browser in CI: `--browser firefox` (Chromium may have issues with localhost redirects)
#### Keycloak OAuth/OIDC Testing (ADR-002 Integration)
The MCP server supports using **Keycloak as an external OAuth/OIDC identity provider** instead of Nextcloud's built-in OIDC app. This validates the ADR-002 architecture for background jobs and external identity providers.
**Architecture:**
```
MCP Client → Keycloak (OAuth) → MCP Server → Nextcloud user_oidc (validates token) → APIs
```
**Key Benefits:**
-**No admin credentials needed** - All API access uses user's Keycloak token
-**External identity provider** - Demonstrates integration with enterprise IdPs
-**ADR-002 validation** - Tests offline_access and refresh token patterns
-**User provisioning** - Nextcloud automatically provisions users from Keycloak
**Setup and Testing:**
```bash
# 1. Start Keycloak and MCP server with Keycloak OAuth
docker-compose up -d keycloak app mcp-keycloak
# 2. Verify Keycloak realm is available
curl http://localhost:8888/realms/nextcloud-mcp/.well-known/openid-configuration
# 3. Verify user_oidc provider is configured
docker compose exec app php occ user_oidc:provider keycloak
# 4. Generate encryption key for refresh token storage (optional, for ADR-002 Tier 1)
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Set in environment: export TOKEN_ENCRYPTION_KEY='<key>'
# 5. Test OAuth flow manually
# Get token from Keycloak:
TOKEN=$(curl -s -X POST "http://localhost:8888/realms/nextcloud-mcp/protocol/openid-connect/token" \
-d "grant_type=password" \
-d "client_id=mcp-client" \
-d "client_secret=mcp-secret-change-in-production" \
-d "username=admin" \
-d "password=admin" \
-d "scope=openid profile email offline_access" | jq -r .access_token)
# Use token with Nextcloud API (validated by user_oidc):
curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/ocs/v2.php/cloud/capabilities
# 6. Connect MCP client
# Point client to: http://localhost:8002
# Complete OAuth flow using Keycloak credentials: admin/admin
```
**Three MCP Server Containers:**
- **`mcp`** (port 8000): Basic auth with admin credentials
- **`mcp-oauth`** (port 8001): Nextcloud OIDC provider (JWT tokens)
- **`mcp-keycloak`** (port 8002): Keycloak OIDC provider (external IdP)
**Keycloak Configuration:**
- **Realm**: `nextcloud-mcp` (auto-imported from `keycloak/realm-export.json`)
- **Client**: `mcp-client` (pre-configured with PKCE, offline_access)
- **Admin user**: `admin/admin` (created in realm export)
- **Redirect URIs**: `http://localhost:*/callback`, `http://127.0.0.1:*/callback`
**Environment Variables** (see `.env.keycloak.sample`):
```bash
OAUTH_PROVIDER=keycloak # Use Keycloak instead of Nextcloud
KEYCLOAK_URL=http://keycloak:8080 # Keycloak base URL
KEYCLOAK_REALM=nextcloud-mcp # Realm name
KEYCLOAK_CLIENT_ID=mcp-client # OAuth client ID
KEYCLOAK_CLIENT_SECRET=mcp-secret-... # OAuth client secret
KEYCLOAK_DISCOVERY_URL=http://... # OIDC discovery URL
NEXTCLOUD_HOST=http://app:80 # Nextcloud API (token validation)
ENABLE_OFFLINE_ACCESS=true # Enable refresh tokens (ADR-002)
TOKEN_ENCRYPTION_KEY=<fernet-key> # Encrypt refresh tokens
```
**Nextcloud user_oidc Configuration:**
The `user_oidc` app is automatically configured by `app-hooks/post-installation/15-setup-keycloak-provider.sh`:
```bash
# Configured with:
--check-bearer=1 # Validate bearer tokens
--bearer-provisioning=1 # Auto-provision users
--unique-uid=1 # Hash user IDs
--scope="openid profile email offline_access"
```
**Troubleshooting:**
```bash
# Check Keycloak is running
docker-compose ps keycloak
docker-compose logs keycloak
# Check user_oidc provider configuration
docker compose exec app php occ user_oidc:provider keycloak
# Check MCP server logs
docker-compose logs -f mcp-keycloak
# Check Nextcloud logs for token validation
docker compose exec app tail -f /var/www/html/data/nextcloud.log
# Verify Keycloak is accessible from Nextcloud container
docker compose exec app curl http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration
```
**ADR-002 Offline Access Testing:**
The Keycloak integration enables testing ADR-002 Tier 1 (offline access with refresh tokens):
1. **Refresh token storage**: Tokens stored encrypted in SQLite (`/app/data/tokens.db`)
2. **Token refresh**: Access tokens refreshed automatically when expired
3. **Background workers**: Can access APIs using stored refresh tokens
4. **No admin credentials**: All operations use user's OAuth tokens
See `docs/ADR-002-vector-sync-authentication.md` for architectural details.
**Audience Validation:**
Tokens include `aud: ["mcp-server", "nextcloud"]` claims for proper security:
- MCP server validates tokens are intended for it
- Nextcloud validates tokens include it as audience
- Prevents token misuse across services
See `docs/audience-validation-setup.md` for configuration details and `docs/keycloak-multi-client-validation.md` for realm-level validation behavior.
### Configuration Files
- **`pyproject.toml`** - Python project configuration using uv for dependency management
@@ -9,5 +9,6 @@ php /var/www/html/occ app:enable user_oidc
# Configure user_oidc to validate bearer tokens from the OIDC Identity Provider
php /var/www/html/occ config:system:set user_oidc oidc_provider_bearer_validation --value=true --type=boolean
php /var/www/html/occ config:system:set user_oidc httpclient.allowselfsigned --value=true --type=boolean
patch -u /var/www/html/custom_apps/user_oidc/lib/User/Backend.php -i /docker-entrypoint-hooks.d/post-installation/0001-Fix-Bearer-token-authentication-causing-session-logo.patch
+298
View File
@@ -0,0 +1,298 @@
# Keycloak Multi-Client Token Validation
## Executive Summary
**Question**: Can Nextcloud's `user_oidc` app (configured with client A) validate bearer tokens from client B in the same Keycloak realm?
**Answer**: ✅ **YES** - user_oidc validates tokens at the **realm level**, not per-client.
## Test Results
### Setup
- **Keycloak Realm**: `nextcloud-mcp`
- **Provider in user_oidc**: Configured with `mcp-client` credentials
- **Test**: Get token from `test-client-b`, validate via Nextcloud API
### Result
```bash
# Token from test-client-b (client B)
$ TOKEN=$(curl -X POST ".../token" -d "client_id=test-client-b" ...)
# Validated successfully by Nextcloud (configured with mcp-client = client A)
$ curl -H "Authorization: Bearer $TOKEN" "http://nextcloud/ocs/.../capabilities"
HTTP/1.1 200 OK
{"ocs":{"meta":{"status":"ok"}}}
```
**Token from client B validated successfully!**
## How It Works
### Token Structure from Keycloak
**Access Token** (password grant):
```json
{
"iss": "http://keycloak/realms/nextcloud-mcp",
"azp": "test-client-b", // Authorized party = client B
"typ": "Bearer",
"exp": 1234567890,
// NO "sub" claim
// NO "aud" claim
"scope": "openid profile email"
}
```
**ID Token** (for comparison):
```json
{
"iss": "http://keycloak/realms/nextcloud-mcp",
"aud": "test-client-b", // Audience = client B
"sub": "923da741-7ebe-4cf9-baf2-37fcf2ecc95d",
"azp": "test-client-b"
}
```
**Key Observation**: Access tokens from Keycloak's password grant **do not contain** `sub` or `aud` claims!
### Validation Flow in user_oidc
From source code analysis (`~/Software/user_oidc/lib/User/Backend.php`):
```
1. Request with Bearer token arrives
2. user_oidc loops through providers with checkBearer=true
3. Try SelfEncodedValidator (JWT/JWKS validation):
- Validates JWT signature using Keycloak's JWKS
- Tries to extract 'sub' claim → FAILS (no sub in access token)
4. Fallback to UserInfoValidator:
- Calls Keycloak userinfo endpoint with bearer token
- Keycloak validates token server-side
- Returns userinfo with 'sub' claim
→ SUCCESS!
5. User identified, request authorized
```
### Why This Works
**Realm-Level Trust**:
- Keycloak's userinfo endpoint validates ANY valid token from the realm
- It doesn't matter which client issued the token
- The token is validated by Keycloak itself (via userinfo call)
**No Audience Check**:
- Access tokens have no `aud` claim
- SelfEncodedValidator's audience check is bypassed (no audience to validate)
- UserInfoValidator doesn't check audience (delegates to Keycloak)
**Client Credentials Role**:
- The configured `client_id`/`client_secret` in user_oidc are **NOT used** for bearer token validation
- They're only used for OAuth login flows (authorization code exchange)
- Userinfo endpoint doesn't require client authentication
## Source Code Evidence
### SelfEncodedValidator - Audience Check
```php
// ~/Software/user_oidc/lib/User/Validator/SelfEncodedValidator.php:64-76
$checkAudience = !isset($oidcSystemConfig['selfencoded_bearer_validation_audience_check'])
|| !in_array($oidcSystemConfig['selfencoded_bearer_validation_audience_check'],
[false, 'false', 0, '0'], true);
if ($checkAudience) {
$tokenAudience = $payload->aud ?? null;
if ((is_string($tokenAudience) && $tokenAudience !== $providerClientId)
|| (is_array($tokenAudience) && !in_array($providerClientId, $tokenAudience))) {
$this->logger->debug('Audience does not match client ID');
return null; // REJECT
}
}
// If $tokenAudience is null (our case), both conditions are false → validation continues
```
### UserInfoValidator - No Client Auth
```php
// ~/Software/user_oidc/lib/Service/OIDCService.php:28-45
public function userinfo(Provider $provider, string $accessToken): array {
$url = $this->discoveryService->obtainDiscovery($provider)['userinfo_endpoint'];
// Bearer token passed directly - NO client credentials used
$options = ['headers' => ['Authorization' => 'Bearer ' . $accessToken]];
return json_decode($this->clientService->get($url, [], $options), true);
}
```
### Keycloak Userinfo Response
```bash
$ curl -H "Authorization: Bearer $TOKEN_FROM_CLIENT_B" \
"http://keycloak/realms/nextcloud-mcp/protocol/openid-connect/userinfo"
{
"sub": "923da741-7ebe-4cf9-baf2-37fcf2ecc95d",
"email_verified": true,
"name": "Admin User",
"email": "admin@example.com"
}
```
Keycloak validates the token **regardless of which client issued it**, as long as it's from the same realm.
## Implications for Your Architecture
### Desired Architecture
```
MCP Server (client A) ← DCR with Keycloak
MCP Clients (client B, C, D...) ← DCR with Keycloak
Nextcloud user_oidc ← configured once with any client from realm
```
### What This Means
**You can do exactly what you want!**
1. **Configure user_oidc once** with any client from the Keycloak realm (e.g., a dedicated `nextcloud-validator` client)
2. **MCP Server registers via DCR** as a unique client (e.g., `mcp-server-abc123`)
- Gets its own client credentials
- Issues tokens with `azp: "mcp-server-abc123"`
- These tokens will be validated by user_oidc!
3. **MCP Clients also use DCR** (each gets unique identity)
- Client A: `client-123`
- Client B: `client-456`
- Tokens from all clients validated by user_oidc!
4. **Tokens from ANY client** in the realm can access Nextcloud APIs
- user_oidc validates via Keycloak userinfo endpoint
- Realm-level trust (not per-client)
### Configuration
**Step 1: Configure user_oidc Provider**
```bash
php occ user_oidc:provider keycloak-realm \
--clientid="nextcloud-validator" \
--clientsecret="***" \
--discoveryuri="https://keycloak/realms/my-realm/.well-known/openid-configuration" \
--check-bearer=1 \
--bearer-provisioning=1
```
**Step 2: MCP Server Registers with Keycloak (DCR)**
```python
# MCP server startup
registration_response = await keycloak_client.register_client(
client_name="MCP Server Instance",
redirect_uris=["http://mcp-server/oauth/callback"]
)
# Store: client_id, client_secret
```
**Step 3: Issue Tokens to Users**
- Users authenticate via Keycloak
- MCP server gets tokens issued to its `client_id`
- These tokens validated by user_oidc!
**Step 4: Background Operations (ADR-002)**
- Store user refresh tokens (encrypted)
- Refresh access tokens as needed
- All tokens validated by user_oidc regardless of issuing client
## Important Notes
### Token Grant Types Matter
**Password Grant** (what we tested):
- Access tokens have NO `sub` or `aud`
- Forces validation via userinfo endpoint
- Works with any client in realm
**Authorization Code Grant** (production):
- Tokens MAY include `aud` claim
- Need to verify behavior with real OAuth flows
- May require disabling audience check
### Recommendation for Production
**Option 1: Disable Audience Check (Simplest)**
```php
// config.php
'user_oidc' => [
'selfencoded_bearer_validation_audience_check' => false,
],
```
**Option 2: Rely on UserInfo Validation**
```php
// config.php
'user_oidc' => [
'userinfo_bearer_validation' => true, // Enable userinfo validation
],
```
**Option 3: Configure Keycloak to Not Include aud in Access Tokens**
- Keep default behavior (works as tested)
- Tokens validated via userinfo endpoint
## Testing Script
```bash
#!/bin/bash
# Test multi-client validation
# Create second client in Keycloak
curl -X POST "http://keycloak/admin/realms/my-realm/clients" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-d '{
"clientId": "test-client-b",
"secret": "test-secret-b",
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true
}'
# Get token from client B
TOKEN=$(curl -X POST "http://keycloak/realms/my-realm/protocol/openid-connect/token" \
-d "grant_type=password" \
-d "client_id=test-client-b" \
-d "client_secret=test-secret-b" \
-d "username=testuser" \
-d "password=password" | jq -r '.access_token')
# Test with Nextcloud (configured with client A)
curl -H "Authorization: Bearer $TOKEN" \
"http://nextcloud/ocs/v2.php/cloud/capabilities"
# Should return 200 OK!
```
## Conclusion
**Your proposed architecture is fully supported!**
- user_oidc configured once with ANY client from Keycloak realm
- MCP server registers dynamically via DCR
- MCP clients also register dynamically
- ALL tokens from realm validated successfully
- No per-client configuration needed
The key insight: **user_oidc validates tokens at the realm level** (via Keycloak's userinfo endpoint), not at the client level.
## References
- Source code: `~/Software/user_oidc/lib/User/Backend.php:260-343`
- SelfEncodedValidator: `~/Software/user_oidc/lib/User/Validator/SelfEncodedValidator.php`
- UserInfoValidator: `~/Software/user_oidc/lib/User/Validator/UserInfoValidator.php`
- Test setup: `docker-compose.yml` (mcp-keycloak service)
- Configuration: `.env.keycloak.sample`
+141 -52
View File
@@ -440,6 +440,10 @@ async def setup_oauth_config():
"""
Setup OAuth configuration by performing OIDC discovery and client registration.
Supports two OAuth providers (via OAUTH_PROVIDER environment variable):
- "nextcloud" (default): Nextcloud OIDC app as both IdP and API server
- "keycloak": Keycloak as IdP, Nextcloud user_oidc validates tokens
This is done synchronously before FastMCP initialization because FastMCP
requires token_verifier at construction time.
@@ -453,70 +457,155 @@ async def setup_oauth_config():
)
nextcloud_host = nextcloud_host.rstrip("/")
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
logger.info(f"Performing OIDC discovery: {discovery_url}")
# Determine OAuth provider
oauth_provider = os.getenv("OAUTH_PROVIDER", "nextcloud").lower()
logger.info(f"OAuth provider: {oauth_provider}")
# Fetch OIDC discovery
async with httpx.AsyncClient() as client:
response = await client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
if oauth_provider == "keycloak":
# Keycloak mode: Use Keycloak for OAuth, Nextcloud for token validation
logger.info("Using Keycloak as OAuth identity provider")
logger.info("OIDC discovery successful")
keycloak_discovery_url = os.getenv("KEYCLOAK_DISCOVERY_URL")
if not keycloak_discovery_url:
raise ValueError(
"KEYCLOAK_DISCOVERY_URL environment variable is required for Keycloak mode. "
"Example: http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration"
)
# Validate PKCE support
validate_pkce_support(discovery, discovery_url)
logger.info(f"Performing OIDC discovery: {keycloak_discovery_url}")
# Extract endpoints
issuer = discovery["issuer"]
userinfo_uri = discovery["userinfo_endpoint"]
jwks_uri = discovery.get("jwks_uri")
introspection_uri = discovery.get("introspection_endpoint")
registration_endpoint = discovery.get("registration_endpoint")
# Fetch Keycloak OIDC discovery
async with httpx.AsyncClient() as client:
response = await client.get(keycloak_discovery_url)
response.raise_for_status()
discovery = response.json()
logger.info("OIDC endpoints discovered:")
logger.info(f" Issuer: {issuer}")
logger.info(f" Userinfo: {userinfo_uri}")
logger.info(f" JWKS: {jwks_uri}")
if introspection_uri:
logger.info(f" Introspection: {introspection_uri}")
logger.info("Keycloak OIDC discovery successful")
# Allow override of public issuer URL for both client configuration and JWT validation
# When clients access Nextcloud via a public URL (e.g., http://127.0.0.1:8080),
# the OIDC app issues JWT tokens with that public URL in the 'iss' claim,
# even though the MCP server accesses Nextcloud via an internal URL (e.g., http://app).
# Therefore, we must validate JWT tokens against the public issuer, not the internal one.
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
public_issuer = public_issuer.rstrip("/")
logger.info(
f"Using public issuer URL for clients and JWT validation: {public_issuer}"
# Validate PKCE support
validate_pkce_support(discovery, keycloak_discovery_url)
# Extract Keycloak endpoints (for OAuth flows)
issuer = discovery["issuer"]
keycloak_userinfo_uri = discovery["userinfo_endpoint"]
keycloak_jwks_uri = discovery.get("jwks_uri")
logger.info("Keycloak OIDC endpoints discovered:")
logger.info(f" Issuer: {issuer}")
logger.info(f" Userinfo: {keycloak_userinfo_uri}")
logger.info(f" JWKS: {keycloak_jwks_uri}")
# Get static client credentials from environment
client_id = os.getenv("KEYCLOAK_CLIENT_ID")
client_secret = os.getenv("KEYCLOAK_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError(
"KEYCLOAK_CLIENT_ID and KEYCLOAK_CLIENT_SECRET environment variables "
"are required for Keycloak mode"
)
logger.info(f"Using Keycloak client: {client_id}")
# Token validation: Use Nextcloud's userinfo endpoint
# Nextcloud's user_oidc app validates Keycloak tokens and provisions users
nextcloud_userinfo_uri = f"{nextcloud_host}/apps/user_oidc/userinfo"
# Override issuer for public access if needed
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
public_issuer = public_issuer.rstrip("/")
logger.info(
f"Using public issuer URL for client configuration: {public_issuer}"
)
issuer = public_issuer
# Create token verifier pointing to Nextcloud (validates via user_oidc)
token_verifier = NextcloudTokenVerifier(
nextcloud_host=nextcloud_host,
userinfo_uri=nextcloud_userinfo_uri, # Nextcloud validates Keycloak tokens
jwks_uri=keycloak_jwks_uri, # Keycloak's JWKS for JWT validation
issuer=issuer, # Keycloak issuer
introspection_uri=None, # Not used in Keycloak mode
client_id=client_id,
client_secret=client_secret,
)
# Use public issuer for both client configuration AND JWT validation
issuer = public_issuer
jwt_validation_issuer = public_issuer
logger.info(
"✓ Keycloak OAuth configured - tokens validated by Nextcloud user_oidc app"
)
else:
# Use discovered issuer for both
jwt_validation_issuer = issuer
# Nextcloud mode (default): Use Nextcloud for both OAuth and validation
logger.info("Using Nextcloud OIDC app as OAuth provider")
# Load OAuth client credentials
client_id, client_secret = await load_oauth_client_credentials(
nextcloud_host=nextcloud_host, registration_endpoint=registration_endpoint
)
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
# Create token verifier with JWT support and introspection
token_verifier = NextcloudTokenVerifier(
nextcloud_host=nextcloud_host,
userinfo_uri=userinfo_uri,
jwks_uri=jwks_uri, # Enable JWT verification if available
issuer=jwt_validation_issuer, # Use original issuer for JWT validation
introspection_uri=introspection_uri, # Enable introspection for opaque tokens
client_id=client_id,
client_secret=client_secret,
)
logger.info(f"Performing OIDC discovery: {discovery_url}")
# Create auth settings
# Fetch OIDC discovery
async with httpx.AsyncClient() as client:
response = await client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
logger.info("OIDC discovery successful")
# Validate PKCE support
validate_pkce_support(discovery, discovery_url)
# Extract endpoints
issuer = discovery["issuer"]
userinfo_uri = discovery["userinfo_endpoint"]
jwks_uri = discovery.get("jwks_uri")
introspection_uri = discovery.get("introspection_endpoint")
registration_endpoint = discovery.get("registration_endpoint")
logger.info("OIDC endpoints discovered:")
logger.info(f" Issuer: {issuer}")
logger.info(f" Userinfo: {userinfo_uri}")
logger.info(f" JWKS: {jwks_uri}")
if introspection_uri:
logger.info(f" Introspection: {introspection_uri}")
# Allow override of public issuer URL for both client configuration and JWT validation
# When clients access Nextcloud via a public URL (e.g., http://127.0.0.1:8080),
# the OIDC app issues JWT tokens with that public URL in the 'iss' claim,
# even though the MCP server accesses Nextcloud via an internal URL (e.g., http://app).
# Therefore, we must validate JWT tokens against the public issuer, not the internal one.
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
public_issuer = public_issuer.rstrip("/")
logger.info(
f"Using public issuer URL for clients and JWT validation: {public_issuer}"
)
# Use public issuer for both client configuration AND JWT validation
issuer = public_issuer
jwt_validation_issuer = public_issuer
else:
# Use discovered issuer for both
jwt_validation_issuer = issuer
# Load OAuth client credentials (dynamic registration or environment)
client_id, client_secret = await load_oauth_client_credentials(
nextcloud_host=nextcloud_host, registration_endpoint=registration_endpoint
)
# Create token verifier with JWT support and introspection
token_verifier = NextcloudTokenVerifier(
nextcloud_host=nextcloud_host,
userinfo_uri=userinfo_uri,
jwks_uri=jwks_uri, # Enable JWT verification if available
issuer=jwt_validation_issuer, # Use original issuer for JWT validation
introspection_uri=introspection_uri, # Enable introspection for opaque tokens
client_id=client_id,
client_secret=client_secret,
)
logger.info("✓ Nextcloud OAuth configured")
# Create auth settings (same for both modes)
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
# Note: We don't set required_scopes here anymore.
+383
View File
@@ -0,0 +1,383 @@
"""
Keycloak OAuth 2.0 / OIDC Client
Handles OAuth flows with Keycloak as the identity provider, including:
- OIDC Discovery
- Authorization Code Flow with PKCE
- Token refresh using refresh tokens (ADR-002 Tier 1)
- Integration with RefreshTokenStorage
"""
import hashlib
import logging
import os
import secrets
from typing import Optional
from urllib.parse import urlencode, urlparse
import httpx
from nextcloud_mcp_server.auth.client_registration import generate_state, verify_state
logger = logging.getLogger(__name__)
class KeycloakOAuthClient:
"""OAuth 2.0 client for Keycloak integration"""
def __init__(
self,
keycloak_url: str,
realm: str,
client_id: str,
client_secret: str,
redirect_uri: str,
scopes: Optional[list[str]] = None,
):
"""
Initialize Keycloak OAuth client.
Args:
keycloak_url: Base URL of Keycloak (e.g., http://keycloak:8080)
realm: Keycloak realm name
client_id: OAuth client ID
client_secret: OAuth client secret
redirect_uri: OAuth redirect URI
scopes: List of scopes to request (default: openid, profile, email, offline_access)
"""
self.keycloak_url = keycloak_url.rstrip("/")
self.realm = realm
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scopes = scopes or ["openid", "profile", "email", "offline_access"]
# Discovered endpoints (populated by discover())
self.authorization_endpoint: Optional[str] = None
self.token_endpoint: Optional[str] = None
self.userinfo_endpoint: Optional[str] = None
self.jwks_uri: Optional[str] = None
self.end_session_endpoint: Optional[str] = None
self._http_client: Optional[httpx.AsyncClient] = None
@classmethod
def from_env(cls) -> "KeycloakOAuthClient":
"""
Create client from environment variables.
Environment variables:
KEYCLOAK_URL: Keycloak base URL
KEYCLOAK_REALM: Realm name
KEYCLOAK_CLIENT_ID: Client ID
KEYCLOAK_CLIENT_SECRET: Client secret
NEXTCLOUD_MCP_SERVER_URL: MCP server URL (for redirect URI)
Returns:
KeycloakOAuthClient instance
Raises:
ValueError: If required environment variables are missing
"""
keycloak_url = os.getenv("KEYCLOAK_URL")
realm = os.getenv("KEYCLOAK_REALM")
client_id = os.getenv("KEYCLOAK_CLIENT_ID")
client_secret = os.getenv("KEYCLOAK_CLIENT_SECRET")
server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
if not all([keycloak_url, realm, client_id, client_secret]):
raise ValueError(
"Missing required environment variables: "
"KEYCLOAK_URL, KEYCLOAK_REALM, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET"
)
# Parse server URL to construct redirect URI
parsed_url = urlparse(server_url)
redirect_uri = f"{parsed_url.scheme}://{parsed_url.netloc}/oauth/callback"
return cls(
keycloak_url=keycloak_url,
realm=realm,
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
)
async def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client"""
if self._http_client is None:
self._http_client = httpx.AsyncClient(timeout=30.0)
return self._http_client
async def close(self) -> None:
"""Close HTTP client"""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
async def discover(self) -> None:
"""
Perform OIDC discovery to get endpoint URLs.
Raises:
httpx.HTTPError: If discovery fails
"""
discovery_url = (
f"{self.keycloak_url}/realms/{self.realm}/.well-known/openid-configuration"
)
logger.info(f"Discovering Keycloak endpoints at {discovery_url}")
client = await self._get_http_client()
response = await client.get(discovery_url)
response.raise_for_status()
discovery_data = response.json()
self.authorization_endpoint = discovery_data["authorization_endpoint"]
self.token_endpoint = discovery_data["token_endpoint"]
self.userinfo_endpoint = discovery_data["userinfo_endpoint"]
self.jwks_uri = discovery_data.get("jwks_uri")
self.end_session_endpoint = discovery_data.get("end_session_endpoint")
logger.info(
f"✓ Discovered Keycloak endpoints:\n"
f" Authorization: {self.authorization_endpoint}\n"
f" Token: {self.token_endpoint}\n"
f" Userinfo: {self.userinfo_endpoint}\n"
f" JWKS: {self.jwks_uri}"
)
def generate_pkce_challenge(self) -> tuple[str, str]:
"""
Generate PKCE code verifier and challenge.
Returns:
Tuple of (code_verifier, code_challenge)
"""
import base64
# Generate code verifier (43-128 characters)
code_verifier = secrets.token_urlsafe(32)
# Generate code challenge using S256 method (base64url-encoded SHA256)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
return code_verifier, code_challenge
async def get_authorization_url(
self,
state: str,
code_challenge: str,
extra_params: Optional[dict[str, str]] = None,
) -> str:
"""
Build authorization URL for OAuth flow.
Args:
state: CSRF protection state parameter
code_challenge: PKCE code challenge
extra_params: Additional query parameters
Returns:
Authorization URL
Raises:
RuntimeError: If discover() hasn't been called
"""
if not self.authorization_endpoint:
await self.discover()
if not self.authorization_endpoint:
raise RuntimeError("Authorization endpoint not discovered")
params = {
"client_id": self.client_id,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"scope": " ".join(self.scopes),
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
if extra_params:
params.update(extra_params)
return f"{self.authorization_endpoint}?{urlencode(params)}"
async def exchange_authorization_code(
self,
code: str,
code_verifier: str,
) -> dict:
"""
Exchange authorization code for tokens.
Args:
code: Authorization code from OAuth callback
code_verifier: PKCE code verifier
Returns:
Token response dictionary with keys:
- access_token: Access token
- refresh_token: Refresh token (if offline_access scope requested)
- id_token: ID token (JWT)
- expires_in: Access token lifetime in seconds
- refresh_expires_in: Refresh token lifetime in seconds (optional)
- token_type: Token type (Bearer)
Raises:
httpx.HTTPError: If token exchange fails
"""
if not self.token_endpoint:
await self.discover()
if not self.token_endpoint:
raise RuntimeError("Token endpoint not discovered")
logger.debug(
f"Exchanging authorization code for tokens at {self.token_endpoint}"
)
client = await self._get_http_client()
response = await client.post(
self.token_endpoint,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri,
"code_verifier": code_verifier,
},
auth=(self.client_id, self.client_secret),
)
response.raise_for_status()
token_data = response.json()
logger.info("✓ Successfully exchanged authorization code for tokens")
if "refresh_token" in token_data:
logger.info(" Received refresh token (offline_access granted)")
return token_data
async def refresh_access_token(self, refresh_token: str) -> dict:
"""
Refresh access token using refresh token.
Args:
refresh_token: Refresh token
Returns:
Token response dictionary (same format as exchange_authorization_code)
Raises:
httpx.HTTPError: If token refresh fails
"""
if not self.token_endpoint:
await self.discover()
if not self.token_endpoint:
raise RuntimeError("Token endpoint not discovered")
logger.debug("Refreshing access token")
client = await self._get_http_client()
response = await client.post(
self.token_endpoint,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
auth=(self.client_id, self.client_secret),
)
response.raise_for_status()
token_data = response.json()
logger.debug("✓ Successfully refreshed access token")
return token_data
async def get_userinfo(self, access_token: str) -> dict:
"""
Get user information using access token.
Args:
access_token: Access token
Returns:
Userinfo response dictionary with claims like:
- sub: Subject (user ID)
- name: Full name
- preferred_username: Username
- email: Email address
- email_verified: Email verification status
Raises:
httpx.HTTPError: If userinfo request fails
"""
if not self.userinfo_endpoint:
await self.discover()
if not self.userinfo_endpoint:
raise RuntimeError("Userinfo endpoint not discovered")
logger.debug("Fetching user info")
client = await self._get_http_client()
response = await client.get(
self.userinfo_endpoint,
headers={"Authorization": f"Bearer {access_token}"},
)
response.raise_for_status()
userinfo = response.json()
logger.debug(f"✓ Retrieved user info for subject: {userinfo.get('sub')}")
return userinfo
async def check_token_exchange_support(self) -> bool:
"""
Check if Keycloak supports RFC 8693 token exchange.
Returns:
True if token exchange is supported
Note:
This is ADR-002 Tier 2. Most Keycloak installations don't
have token exchange enabled by default.
"""
if not self.token_endpoint:
await self.discover()
# Try to get discovery document and check for token exchange grant
discovery_url = (
f"{self.keycloak_url}/realms/{self.realm}/.well-known/openid-configuration"
)
try:
client = await self._get_http_client()
response = await client.get(discovery_url)
response.raise_for_status()
discovery_data = response.json()
grant_types = discovery_data.get("grant_types_supported", [])
supported = "urn:ietf:params:oauth:grant-type:token-exchange" in grant_types
if supported:
logger.info("✓ Token exchange (RFC 8693) is supported")
else:
logger.info("Token exchange (RFC 8693) is not supported")
return supported
except Exception as e:
logger.warning(f"Failed to check token exchange support: {e}")
return False
__all__ = ["KeycloakOAuthClient", "generate_state", "verify_state"]
@@ -0,0 +1,394 @@
"""
Refresh Token Storage for ADR-002 Tier 1: Offline Access
Securely stores and manages user refresh tokens for background operations.
Tokens are encrypted at rest using Fernet symmetric encryption.
"""
import base64
import logging
import os
import time
from pathlib import Path
from typing import Optional
import aiosqlite
from cryptography.fernet import Fernet
logger = logging.getLogger(__name__)
class RefreshTokenStorage:
"""Securely store and manage user refresh tokens"""
def __init__(self, db_path: str, encryption_key: bytes):
"""
Initialize refresh token storage.
Args:
db_path: Path to SQLite database file
encryption_key: Fernet encryption key (32 bytes, base64-encoded)
"""
self.db_path = db_path
self.cipher = Fernet(encryption_key)
self._initialized = False
@classmethod
def from_env(cls) -> "RefreshTokenStorage":
"""
Create storage instance from environment variables.
Environment variables:
TOKEN_STORAGE_DB: Path to database file (default: /app/data/tokens.db)
TOKEN_ENCRYPTION_KEY: Base64-encoded Fernet key
Returns:
RefreshTokenStorage instance
Raises:
ValueError: If TOKEN_ENCRYPTION_KEY is not set
"""
db_path = os.getenv("TOKEN_STORAGE_DB", "/app/data/tokens.db")
encryption_key_b64 = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key_b64:
raise ValueError(
"TOKEN_ENCRYPTION_KEY environment variable is required. "
"Generate one with: python -c 'from cryptography.fernet import Fernet; "
"print(Fernet.generate_key().decode())'"
)
try:
encryption_key = base64.b64decode(encryption_key_b64)
except Exception as e:
raise ValueError(
f"Invalid TOKEN_ENCRYPTION_KEY: {e}. "
"Must be a base64-encoded Fernet key."
) from e
return cls(db_path=db_path, encryption_key=encryption_key)
async def initialize(self) -> None:
"""Initialize database schema"""
if self._initialized:
return
# Ensure directory exists
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
# Set restrictive permissions on database file
if Path(self.db_path).exists():
os.chmod(self.db_path, 0o600)
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
CREATE TABLE IF NOT EXISTS refresh_tokens (
user_id TEXT PRIMARY KEY,
encrypted_token BLOB NOT NULL,
expires_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"""
)
await db.execute(
"""
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
event TEXT NOT NULL,
user_id TEXT NOT NULL,
resource_type TEXT,
resource_id TEXT,
auth_method TEXT,
hostname TEXT
)
"""
)
# Create index on audit logs for efficient queries
await db.execute(
"CREATE INDEX IF NOT EXISTS idx_audit_user_timestamp "
"ON audit_logs(user_id, timestamp)"
)
await db.commit()
# Set restrictive permissions after creation
os.chmod(self.db_path, 0o600)
self._initialized = True
logger.info(f"Initialized refresh token storage at {self.db_path}")
async def store_refresh_token(
self,
user_id: str,
refresh_token: str,
expires_at: Optional[int] = None,
) -> None:
"""
Store encrypted refresh token for user.
Args:
user_id: User identifier (from OIDC 'sub' claim)
refresh_token: Refresh token to store
expires_at: Token expiration timestamp (Unix epoch), if known
"""
if not self._initialized:
await self.initialize()
encrypted_token = self.cipher.encrypt(refresh_token.encode())
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO refresh_tokens
(user_id, encrypted_token, expires_at, created_at, updated_at)
VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?)
""",
(user_id, encrypted_token, expires_at, user_id, now, now),
)
await db.commit()
logger.info(
f"Stored refresh token for user {user_id}"
+ (f" (expires at {expires_at})" if expires_at else "")
)
# Audit log
await self._audit_log(
event="store_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
async def get_refresh_token(self, user_id: str) -> Optional[str]:
"""
Retrieve and decrypt refresh token for user.
Args:
user_id: User identifier
Returns:
Decrypted refresh token, or None if not found or expired
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT encrypted_token, expires_at FROM refresh_tokens WHERE user_id = ?",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(f"No refresh token found for user {user_id}")
return None
encrypted_token, expires_at = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for user {user_id} has expired (expired at {expires_at})"
)
await self.delete_refresh_token(user_id)
return None
try:
decrypted_token = self.cipher.decrypt(encrypted_token).decode()
logger.debug(f"Retrieved refresh token for user {user_id}")
return decrypted_token
except Exception as e:
logger.error(f"Failed to decrypt refresh token for user {user_id}: {e}")
return None
async def delete_refresh_token(self, user_id: str) -> bool:
"""
Delete refresh token for user.
Args:
user_id: User identifier
Returns:
True if token was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE user_id = ?",
(user_id,),
)
await db.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.info(f"Deleted refresh token for user {user_id}")
await self._audit_log(
event="delete_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
else:
logger.debug(f"No refresh token to delete for user {user_id}")
return deleted
async def get_all_user_ids(self) -> list[str]:
"""
Get list of all user IDs with stored refresh tokens.
Returns:
List of user IDs
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT user_id FROM refresh_tokens ORDER BY updated_at DESC"
) as cursor:
rows = await cursor.fetchall()
user_ids = [row[0] for row in rows]
logger.debug(f"Found {len(user_ids)} users with refresh tokens")
return user_ids
async def cleanup_expired_tokens(self) -> int:
"""
Remove expired refresh tokens from storage.
Returns:
Number of tokens deleted
"""
if not self._initialized:
await self.initialize()
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE expires_at IS NOT NULL AND expires_at < ?",
(now,),
)
await db.commit()
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"Cleaned up {deleted} expired refresh token(s)")
return deleted
async def _audit_log(
self,
event: str,
user_id: str,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
auth_method: Optional[str] = None,
) -> None:
"""
Log operation to audit log.
Args:
event: Event name (e.g., "store_refresh_token", "token_refresh")
user_id: User identifier
resource_type: Resource type (e.g., "note", "file")
resource_id: Resource identifier
auth_method: Authentication method used
"""
import socket
hostname = socket.gethostname()
timestamp = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT INTO audit_logs
(timestamp, event, user_id, resource_type, resource_id, auth_method, hostname)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
timestamp,
event,
user_id,
resource_type,
resource_id,
auth_method,
hostname,
),
)
await db.commit()
async def get_audit_logs(
self,
user_id: Optional[str] = None,
since: Optional[int] = None,
limit: int = 100,
) -> list[dict]:
"""
Retrieve audit logs.
Args:
user_id: Filter by user ID (optional)
since: Filter by timestamp (Unix epoch, optional)
limit: Maximum number of logs to return
Returns:
List of audit log entries
"""
if not self._initialized:
await self.initialize()
query = "SELECT * FROM audit_logs WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
params.append(user_id)
if since:
query += " AND timestamp >= ?"
params.append(since)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def generate_encryption_key() -> str:
"""
Generate a new Fernet encryption key.
Returns:
Base64-encoded encryption key suitable for TOKEN_ENCRYPTION_KEY env var
"""
return Fernet.generate_key().decode()
# Example usage
if __name__ == "__main__":
import asyncio
async def main():
# Generate a key for testing
key = await generate_encryption_key()
print(f"Generated encryption key: {key}")
print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'")
asyncio.run(main())
+7 -1
View File
@@ -168,17 +168,23 @@ class NextcloudTokenVerifier(TokenVerifier):
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
# Verify and decode JWT
# Accept tokens with audience: "mcp-server" or ["mcp-server", "nextcloud"]
# This allows:
# 1. Tokens from MCP clients (aud: "mcp-server")
# 2. Tokens for Nextcloud APIs (aud: "nextcloud")
# 3. Tokens for both (aud: ["mcp-server", "nextcloud"])
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
issuer=self.issuer,
audience=["mcp-server", "nextcloud"], # Accept either audience
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_iss": True if self.issuer else False,
"verify_aud": False, # Skip audience validation for Bearer tokens
"verify_aud": True, # Enable audience validation
},
)