1707b2e6e1
Add NEXTCLOUD_VERIFY_SSL and NEXTCLOUD_CA_BUNDLE env vars to configure TLS certificate verification for all outbound Nextcloud connections. Centralizes SSL config via a new HTTP client factory (http.py) used by all 27 Nextcloud-bound call sites, including API clients, OIDC endpoints, OAuth flows, and health checks. Closes #560 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
714 lines
26 KiB
Python
714 lines
26 KiB
Python
"""
|
|
MCP Tools for OAuth and Provisioning Management (ADR-004 Progressive Consent).
|
|
|
|
This module provides MCP tools that enable users to explicitly provision
|
|
Nextcloud access using the Flow 2 (Resource Provisioning) OAuth flow.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import secrets
|
|
from typing import Optional
|
|
from urllib.parse import urlencode
|
|
|
|
import jwt
|
|
from mcp.server.auth.middleware.auth_context import get_access_token
|
|
from mcp.server.auth.provider import AccessToken
|
|
from mcp.server.fastmcp import Context
|
|
from mcp.types import ToolAnnotations
|
|
from pydantic import BaseModel, Field
|
|
|
|
from nextcloud_mcp_server.auth import require_scopes
|
|
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
|
|
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
|
|
from nextcloud_mcp_server.auth.userinfo_routes import _query_idp_userinfo
|
|
|
|
from ..http import nextcloud_httpx_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def extract_user_id_from_token(ctx: Context) -> str:
|
|
"""Extract user_id from the MCP access token (Flow 1).
|
|
|
|
Handles both JWT and opaque tokens:
|
|
- JWT: Decode and extract 'sub' claim
|
|
- Opaque: Call userinfo endpoint to get 'sub'
|
|
|
|
Args:
|
|
ctx: MCP context with access token
|
|
|
|
Returns:
|
|
user_id extracted from token, or "default_user" as fallback
|
|
"""
|
|
# Use MCP SDK's get_access_token() which uses contextvars
|
|
access_token: AccessToken | None = get_access_token()
|
|
|
|
if not access_token or not access_token.token:
|
|
logger.warning(" ✗ No access token found via get_access_token()")
|
|
return "default_user"
|
|
|
|
token = access_token.token
|
|
is_jwt = "." in token and token.count(".") >= 2
|
|
logger.info(f" Token type: {'JWT' if is_jwt else 'Opaque'}")
|
|
|
|
# Try JWT decode first
|
|
if is_jwt:
|
|
try:
|
|
payload = jwt.decode(token, options={"verify_signature": False})
|
|
user_id = payload.get("sub", "unknown")
|
|
logger.info(f" ✓ JWT decode successful: user_id={user_id}")
|
|
return user_id
|
|
except Exception as e:
|
|
logger.error(f" ✗ JWT decode failed: {type(e).__name__}: {e}")
|
|
|
|
# Opaque token - call userinfo endpoint
|
|
logger.info(" Opaque token detected, calling userinfo endpoint...")
|
|
try:
|
|
# Get userinfo endpoint from OIDC discovery
|
|
oidc_discovery_uri = os.getenv(
|
|
"OIDC_DISCOVERY_URI",
|
|
"http://localhost:8080/.well-known/openid-configuration",
|
|
)
|
|
async with nextcloud_httpx_client() as http_client:
|
|
discovery_response = await http_client.get(oidc_discovery_uri)
|
|
discovery_response.raise_for_status()
|
|
discovery = discovery_response.json()
|
|
userinfo_endpoint = discovery.get("userinfo_endpoint")
|
|
|
|
if userinfo_endpoint:
|
|
userinfo = await _query_idp_userinfo(token, userinfo_endpoint)
|
|
if userinfo:
|
|
user_id = userinfo.get("sub", "unknown")
|
|
logger.info(f" ✓ Userinfo query successful: user_id={user_id}")
|
|
return user_id
|
|
else:
|
|
logger.error(" ✗ Userinfo query failed")
|
|
else:
|
|
logger.error(" ✗ No userinfo_endpoint available")
|
|
except Exception as e:
|
|
logger.error(f" ✗ Userinfo query failed: {type(e).__name__}: {e}")
|
|
|
|
# Fallback
|
|
logger.warning(" Using fallback user_id: default_user")
|
|
return "default_user"
|
|
|
|
|
|
class ProvisioningStatus(BaseModel):
|
|
"""Status of Nextcloud provisioning for a user."""
|
|
|
|
is_provisioned: bool = Field(description="Whether Nextcloud access is provisioned")
|
|
provisioned_at: Optional[str] = Field(
|
|
None, description="ISO timestamp when provisioned"
|
|
)
|
|
credential_type: Optional[str] = Field(
|
|
None, description="Type of credential ('refresh_token' or 'app_password')"
|
|
)
|
|
client_id: Optional[str] = Field(
|
|
None, description="Client ID that initiated the original Flow 1"
|
|
)
|
|
scopes: Optional[list[str]] = Field(None, description="Granted scopes")
|
|
flow_type: Optional[str] = Field(
|
|
None, description="Type of flow used ('hybrid', 'flow1', 'flow2')"
|
|
)
|
|
|
|
|
|
class ProvisioningResult(BaseModel):
|
|
"""Result of provisioning attempt."""
|
|
|
|
success: bool = Field(description="Whether provisioning was initiated")
|
|
provisioning_url: Optional[str] = Field(
|
|
None, description="URL to Astrolabe settings for provisioning background sync"
|
|
)
|
|
message: str = Field(description="Status message for the user")
|
|
already_provisioned: bool = Field(
|
|
False, description="Whether access was already provisioned"
|
|
)
|
|
|
|
|
|
class RevocationResult(BaseModel):
|
|
"""Result of access revocation."""
|
|
|
|
success: bool = Field(description="Whether revocation succeeded")
|
|
message: str = Field(description="Status message for the user")
|
|
|
|
|
|
class LoginConfirmation(BaseModel):
|
|
"""Schema for login confirmation elicitation."""
|
|
|
|
acknowledged: bool = Field(
|
|
default=False,
|
|
description="Check this box after completing login at the provided URL",
|
|
)
|
|
|
|
|
|
async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningStatus:
|
|
"""
|
|
Check the provisioning status for Nextcloud access.
|
|
|
|
Checks for both credential types:
|
|
1. App password from Astrolabe (works today)
|
|
2. OAuth refresh token from storage (for future)
|
|
|
|
Args:
|
|
mcp: MCP context
|
|
user_id: User identifier
|
|
|
|
Returns:
|
|
ProvisioningStatus with current provisioning state
|
|
"""
|
|
from datetime import datetime, timezone
|
|
|
|
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
|
|
from nextcloud_mcp_server.config import get_settings
|
|
|
|
settings = get_settings()
|
|
|
|
# Check for app password first (interim solution)
|
|
if settings.oidc_client_id and settings.oidc_client_secret:
|
|
try:
|
|
astrolabe = AstrolabeClient(
|
|
nextcloud_host=settings.nextcloud_host or "",
|
|
client_id=settings.oidc_client_id,
|
|
client_secret=settings.oidc_client_secret,
|
|
)
|
|
status = await astrolabe.get_background_sync_status(user_id)
|
|
|
|
if status.get("has_access"):
|
|
logger.info(
|
|
f" get_provisioning_status: ✓ App password FOUND for user_id={user_id}"
|
|
)
|
|
provisioned_at_str = status.get("provisioned_at")
|
|
return ProvisioningStatus(
|
|
is_provisioned=True,
|
|
provisioned_at=provisioned_at_str,
|
|
credential_type="app_password",
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f" App password check failed for {user_id}: {e}")
|
|
|
|
# Check for OAuth refresh token (fallback)
|
|
logger.info(
|
|
f" get_provisioning_status: Looking up refresh token for user_id={user_id}"
|
|
)
|
|
storage = RefreshTokenStorage.from_env()
|
|
await storage.initialize()
|
|
|
|
token_data = await storage.get_refresh_token(user_id)
|
|
|
|
if not token_data:
|
|
logger.info(
|
|
f" get_provisioning_status: ✗ No credentials found for user_id={user_id}"
|
|
)
|
|
return ProvisioningStatus(is_provisioned=False)
|
|
|
|
logger.info(
|
|
f" get_provisioning_status: ✓ Refresh token FOUND for user_id={user_id}"
|
|
)
|
|
logger.info(f" flow_type: {token_data.get('flow_type')}")
|
|
logger.info(
|
|
f" provisioning_client_id: {token_data.get('provisioning_client_id', 'N/A')}"
|
|
)
|
|
|
|
# Convert timestamp to ISO format if present
|
|
provisioned_at_str = None
|
|
if token_data.get("provisioned_at"):
|
|
dt = datetime.fromtimestamp(token_data["provisioned_at"], tz=timezone.utc)
|
|
provisioned_at_str = dt.isoformat()
|
|
|
|
return ProvisioningStatus(
|
|
is_provisioned=True,
|
|
provisioned_at=provisioned_at_str,
|
|
credential_type="refresh_token",
|
|
client_id=token_data.get("provisioning_client_id"),
|
|
scopes=token_data.get("scopes"),
|
|
flow_type=token_data.get("flow_type", "hybrid"),
|
|
)
|
|
|
|
|
|
def generate_oauth_url_for_flow2(
|
|
oidc_discovery_url: str,
|
|
server_client_id: str,
|
|
redirect_uri: str,
|
|
state: str,
|
|
scopes: list[str],
|
|
) -> str:
|
|
"""
|
|
Generate OAuth authorization URL for Flow 2 (Resource Provisioning).
|
|
|
|
This returns the MCP server's Flow 2 authorization endpoint, which will:
|
|
1. Generate PKCE parameters (required by Nextcloud OIDC)
|
|
2. Store code_verifier in session
|
|
3. Redirect to Nextcloud IdP with PKCE
|
|
4. Handle the callback with code_verifier for token exchange
|
|
|
|
Args:
|
|
oidc_discovery_url: OIDC provider discovery URL (unused, kept for compatibility)
|
|
server_client_id: MCP server's OAuth client ID (unused, kept for compatibility)
|
|
redirect_uri: Callback URL for the MCP server (unused, kept for compatibility)
|
|
state: CSRF protection state
|
|
scopes: List of scopes to request (unused, kept for compatibility)
|
|
|
|
Returns:
|
|
MCP server's Flow 2 authorization URL with state parameter
|
|
"""
|
|
# Use the MCP server's Flow 2 endpoint which handles PKCE internally
|
|
# This endpoint will:
|
|
# - Generate code_verifier and code_challenge (PKCE)
|
|
# - Store code_verifier in session storage
|
|
# - Redirect to Nextcloud with PKCE parameters
|
|
# - Handle the callback with proper code_verifier
|
|
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
|
|
auth_endpoint = f"{mcp_server_url}/oauth/authorize-nextcloud"
|
|
|
|
# Only pass state parameter - the endpoint handles everything else
|
|
params = {"state": state}
|
|
|
|
return f"{auth_endpoint}?{urlencode(params)}"
|
|
|
|
|
|
async def provision_nextcloud_access(
|
|
ctx: Context, user_id: Optional[str] = None
|
|
) -> ProvisioningResult:
|
|
"""
|
|
MCP Tool: Provision offline access to Nextcloud resources.
|
|
|
|
Returns URL to Astrolabe settings page where users can provision background
|
|
sync access using either:
|
|
- App password (works today, interim solution)
|
|
- OAuth refresh token (future, when Nextcloud supports OAuth for app APIs)
|
|
|
|
Args:
|
|
ctx: MCP context with user's Flow 1 token
|
|
user_id: Optional user identifier (extracted from token if not provided)
|
|
|
|
Returns:
|
|
ProvisioningResult with Astrolabe settings URL or status
|
|
"""
|
|
try:
|
|
# Extract user ID from the MCP access token (Flow 1 token)
|
|
if not user_id:
|
|
user_id = await extract_user_id_from_token(ctx)
|
|
|
|
# Check if already provisioned
|
|
status = await get_provisioning_status(ctx, user_id)
|
|
if status.is_provisioned:
|
|
return ProvisioningResult(
|
|
success=True,
|
|
already_provisioned=True,
|
|
message=(
|
|
f"Nextcloud access is already provisioned (credential_type={status.credential_type}, "
|
|
f"since {status.provisioned_at}). "
|
|
"Use 'revoke_nextcloud_access' if you want to re-provision."
|
|
),
|
|
)
|
|
|
|
# Get configuration using settings (handles both ENABLE_BACKGROUND_OPERATIONS
|
|
# and ENABLE_OFFLINE_ACCESS environment variables)
|
|
from nextcloud_mcp_server.config import get_settings
|
|
|
|
settings = get_settings()
|
|
if not settings.enable_offline_access:
|
|
return ProvisioningResult(
|
|
success=False,
|
|
message=(
|
|
"Offline access is not enabled. "
|
|
"Set ENABLE_BACKGROUND_OPERATIONS=true to use this feature."
|
|
),
|
|
)
|
|
|
|
# Return Astrolabe settings URL for background sync provisioning
|
|
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
|
|
astrolabe_url = f"{nextcloud_host}/settings/user/astrolabe#background-sync"
|
|
|
|
return ProvisioningResult(
|
|
success=True,
|
|
provisioning_url=astrolabe_url,
|
|
message=(
|
|
"Visit Astrolabe settings to provision background sync access.\n\n"
|
|
"You can choose either:\n"
|
|
"- App password (works today, recommended for now)\n"
|
|
"- OAuth refresh token (future, when Nextcloud fully supports OAuth)\n\n"
|
|
"After provisioning, background sync will enable the MCP server to "
|
|
"access Nextcloud resources even when you're not actively connected."
|
|
),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initiate provisioning: {e}")
|
|
return ProvisioningResult(
|
|
success=False,
|
|
message=f"Failed to initiate provisioning: {str(e)}",
|
|
)
|
|
|
|
|
|
async def revoke_nextcloud_access(
|
|
ctx: Context, user_id: Optional[str] = None
|
|
) -> RevocationResult:
|
|
"""
|
|
MCP Tool: Revoke offline access to Nextcloud resources.
|
|
|
|
This tool removes the stored refresh token and revokes access
|
|
that was granted via Flow 2.
|
|
|
|
Args:
|
|
mcp: MCP context
|
|
user_id: Optional user identifier
|
|
|
|
Returns:
|
|
RevocationResult with status
|
|
"""
|
|
try:
|
|
# Get user ID from token if not provided
|
|
if not user_id:
|
|
logger.info("Extracting user_id from access token for revoke...")
|
|
user_id = await extract_user_id_from_token(ctx)
|
|
logger.info(f" Revoke using user_id: {user_id}")
|
|
|
|
# Check current status
|
|
status = await get_provisioning_status(ctx, user_id)
|
|
if not status.is_provisioned:
|
|
return RevocationResult(
|
|
success=True,
|
|
message="No Nextcloud access to revoke.",
|
|
)
|
|
|
|
# Initialize Token Broker to handle revocation
|
|
storage = RefreshTokenStorage.from_env()
|
|
await storage.initialize()
|
|
|
|
# Get OAuth client credentials from storage
|
|
client_creds = await storage.get_oauth_client()
|
|
if not client_creds:
|
|
return RevocationResult(
|
|
success=False,
|
|
message="OAuth client credentials not found in storage.",
|
|
)
|
|
|
|
broker = TokenBrokerService(
|
|
storage=storage,
|
|
oidc_discovery_url=os.getenv(
|
|
"OIDC_DISCOVERY_URL",
|
|
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
|
|
),
|
|
nextcloud_host=os.getenv("NEXTCLOUD_HOST"), # type: ignore
|
|
client_id=client_creds["client_id"],
|
|
client_secret=client_creds["client_secret"],
|
|
)
|
|
|
|
# Revoke access
|
|
success = await broker.revoke_nextcloud_access(user_id)
|
|
|
|
if success:
|
|
return RevocationResult(
|
|
success=True,
|
|
message=(
|
|
"Successfully revoked Nextcloud access. "
|
|
"You can run 'provision_nextcloud_access' again if needed."
|
|
),
|
|
)
|
|
else:
|
|
return RevocationResult(
|
|
success=False,
|
|
message="Failed to revoke access. Please try again.",
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to revoke access: {e}")
|
|
return RevocationResult(
|
|
success=False,
|
|
message=f"Failed to revoke access: {str(e)}",
|
|
)
|
|
|
|
|
|
async def check_provisioning_status(
|
|
ctx: Context, user_id: Optional[str] = None
|
|
) -> ProvisioningStatus:
|
|
"""
|
|
MCP Tool: Check the current provisioning status.
|
|
|
|
This tool allows users to check whether they have provisioned
|
|
Nextcloud access and see details about their current authorization.
|
|
|
|
Args:
|
|
mcp: MCP context
|
|
user_id: Optional user identifier
|
|
|
|
Returns:
|
|
ProvisioningStatus with current state
|
|
"""
|
|
# Get user ID from context if not provided
|
|
if not user_id:
|
|
user_id = (
|
|
ctx.context.get("user_id", "default_user") # type: ignore
|
|
if hasattr(ctx, "context")
|
|
else "default_user"
|
|
)
|
|
|
|
return await get_provisioning_status(ctx, user_id)
|
|
|
|
|
|
async def check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
|
|
"""
|
|
MCP Tool: Check if user is logged in and elicit login if needed.
|
|
|
|
This tool checks whether the user has completed Flow 2 (resource provisioning)
|
|
to grant offline access to Nextcloud. If not logged in, it uses MCP elicitation
|
|
to prompt the user to complete the login flow.
|
|
|
|
Args:
|
|
ctx: MCP context with user's Flow 1 token
|
|
user_id: Optional user identifier (extracted from token if not provided)
|
|
|
|
Returns:
|
|
"yes" if logged in, or elicitation prompting for login
|
|
"""
|
|
try:
|
|
# Extract user ID from the MCP access token (Flow 1 token)
|
|
logger.info("=" * 60)
|
|
logger.info("check_logged_in: Starting user_id extraction")
|
|
logger.info("=" * 60)
|
|
|
|
if not user_id:
|
|
user_id = await extract_user_id_from_token(ctx)
|
|
logger.info(f" Final user_id for check_logged_in: {user_id}")
|
|
else:
|
|
logger.info(f" user_id provided as argument: {user_id}")
|
|
|
|
# Check if already logged in
|
|
logger.info(f"Checking provisioning status for user_id: {user_id}")
|
|
status = await get_provisioning_status(ctx, user_id)
|
|
logger.info(f" Provisioning status: is_provisioned={status.is_provisioned}")
|
|
|
|
if status.is_provisioned:
|
|
logger.info(f"✓ User {user_id} is already logged in - returning 'yes'")
|
|
logger.info("=" * 60)
|
|
return "yes"
|
|
|
|
logger.info(f"✗ User {user_id} is NOT logged in - triggering elicitation")
|
|
logger.info("=" * 60)
|
|
|
|
# Not logged in - generate OAuth URL for Flow 2
|
|
# Use settings (handles both ENABLE_BACKGROUND_OPERATIONS and ENABLE_OFFLINE_ACCESS)
|
|
from nextcloud_mcp_server.config import get_settings
|
|
|
|
settings = get_settings()
|
|
if not settings.enable_offline_access:
|
|
return (
|
|
"Not logged in. Offline access is not enabled. "
|
|
"Set ENABLE_BACKGROUND_OPERATIONS=true to use this feature."
|
|
)
|
|
|
|
# Get MCP server's OAuth client credentials
|
|
# Try environment variable first, then fall back to DCR client_id
|
|
server_client_id = os.getenv("MCP_SERVER_CLIENT_ID")
|
|
if not server_client_id:
|
|
# Try to get from lifespan context (DCR)
|
|
lifespan_ctx = ctx.request_context.lifespan_context
|
|
if hasattr(lifespan_ctx, "server_client_id"):
|
|
server_client_id = lifespan_ctx.server_client_id
|
|
|
|
if not server_client_id:
|
|
return (
|
|
"Not logged in. MCP server OAuth client not configured. "
|
|
"Set MCP_SERVER_CLIENT_ID environment variable or use Dynamic Client Registration."
|
|
)
|
|
|
|
# Generate OAuth URL for Flow 2
|
|
oidc_discovery_url = os.getenv(
|
|
"OIDC_DISCOVERY_URL",
|
|
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
|
|
)
|
|
|
|
# Generate secure state for CSRF protection
|
|
state = secrets.token_urlsafe(32)
|
|
|
|
# Store state in session for validation on callback
|
|
storage = RefreshTokenStorage.from_env()
|
|
await storage.initialize()
|
|
|
|
# Create OAuth session for Flow 2
|
|
session_id = f"flow2_{user_id}_{secrets.token_hex(8)}"
|
|
redirect_uri = f"{os.getenv('NEXTCLOUD_MCP_SERVER_URL', 'http://localhost:8000')}/oauth/callback"
|
|
|
|
await storage.store_oauth_session(
|
|
session_id=session_id,
|
|
client_redirect_uri="", # No client redirect for Flow 2
|
|
state=state,
|
|
flow_type="flow2",
|
|
is_provisioning=True,
|
|
ttl_seconds=600, # 10 minute TTL
|
|
)
|
|
|
|
# Define scopes for Nextcloud access
|
|
scopes = [
|
|
"openid",
|
|
"profile",
|
|
"email",
|
|
"offline_access", # Critical for background operations
|
|
"notes:read",
|
|
"notes:write",
|
|
"calendar:read",
|
|
"calendar:write",
|
|
"contacts:read",
|
|
"contacts:write",
|
|
"files:read",
|
|
"files:write",
|
|
]
|
|
|
|
# Generate authorization URL
|
|
auth_url = generate_oauth_url_for_flow2(
|
|
oidc_discovery_url=oidc_discovery_url,
|
|
server_client_id=server_client_id,
|
|
redirect_uri=redirect_uri,
|
|
state=state,
|
|
scopes=scopes,
|
|
)
|
|
|
|
# Use elicitation to prompt user to login
|
|
logger.info(f"Eliciting login for user {user_id} with URL: {auth_url}")
|
|
|
|
result = await ctx.elicit(
|
|
message=f"Please log in to Nextcloud at the following URL:\n\n{auth_url}\n\nAfter completing the login, check the box below and click OK.",
|
|
schema=LoginConfirmation,
|
|
)
|
|
|
|
if result.action == "accept":
|
|
# Check if login was successful by looking for refresh token
|
|
# Strategy: Try multiple lookup methods to handle both flows
|
|
logger.info("User accepted login prompt, checking for refresh token")
|
|
logger.info(f" State parameter: {state[:16]}...")
|
|
logger.info(f" User ID: {user_id}")
|
|
|
|
# First, try to find token by provisioning_client_id (Flow 2 from elicitation)
|
|
refresh_token_data = (
|
|
await storage.get_refresh_token_by_provisioning_client_id(state)
|
|
)
|
|
|
|
if refresh_token_data:
|
|
logger.info("✓ Refresh token found via provisioning_client_id lookup")
|
|
logger.info(
|
|
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
|
|
)
|
|
logger.info(
|
|
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
|
|
)
|
|
return "yes"
|
|
|
|
# Fallback: Try to find token by user_id (browser login or any other flow)
|
|
logger.info(f"✗ No token found with provisioning_client_id={state[:16]}...")
|
|
logger.info(f" Trying fallback lookup by user_id: {user_id}")
|
|
|
|
refresh_token_data = await storage.get_refresh_token(user_id)
|
|
|
|
if refresh_token_data:
|
|
logger.info("✓ Refresh token found via user_id lookup")
|
|
logger.info(
|
|
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
|
|
)
|
|
logger.info(
|
|
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
|
|
)
|
|
logger.info(
|
|
f" Provisioning client ID: {refresh_token_data.get('provisioning_client_id', 'NULL')}"
|
|
)
|
|
logger.info(
|
|
" Note: This token was created via browser login or different flow"
|
|
)
|
|
return "yes"
|
|
|
|
# No token found by either method
|
|
logger.warning(f"✗ No refresh token found for user {user_id}")
|
|
logger.warning(
|
|
f" Checked provisioning_client_id={state[:16]}... - NOT FOUND"
|
|
)
|
|
logger.warning(f" Checked user_id={user_id} - NOT FOUND")
|
|
logger.warning(
|
|
" This may indicate the user completed login but token wasn't stored"
|
|
)
|
|
|
|
return (
|
|
"Login not detected. Please ensure you completed the login "
|
|
"at the provided URL before clicking OK."
|
|
)
|
|
elif result.action == "decline":
|
|
return "Login declined by user."
|
|
else:
|
|
return "Login cancelled by user."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to check login status: {e}")
|
|
return f"Error checking login status: {str(e)}"
|
|
|
|
|
|
# Register MCP tools
|
|
def register_oauth_tools(mcp):
|
|
"""Register OAuth and provisioning tools with the MCP server."""
|
|
|
|
@mcp.tool(
|
|
name="provision_nextcloud_access",
|
|
title="Grant Server Access to Nextcloud",
|
|
description=(
|
|
"Provision offline access to Nextcloud resources. "
|
|
"This is required before using Nextcloud tools. "
|
|
"You'll need to complete an OAuth authorization in your browser."
|
|
),
|
|
annotations=ToolAnnotations(
|
|
idempotentHint=False, # Creates new OAuth session each time
|
|
openWorldHint=True,
|
|
),
|
|
)
|
|
@require_scopes("openid")
|
|
async def tool_provision_access(
|
|
ctx: Context,
|
|
user_id: Optional[str] = None,
|
|
) -> ProvisioningResult:
|
|
return await provision_nextcloud_access(ctx, user_id)
|
|
|
|
@mcp.tool(
|
|
name="revoke_nextcloud_access",
|
|
title="Revoke Server Access to Nextcloud",
|
|
description="Revoke offline access to Nextcloud resources.",
|
|
annotations=ToolAnnotations(
|
|
destructiveHint=True, # Removes stored access tokens
|
|
idempotentHint=True, # Revoking revoked access = same end state
|
|
openWorldHint=True,
|
|
),
|
|
)
|
|
@require_scopes("openid")
|
|
async def tool_revoke_access(
|
|
ctx: Context, user_id: Optional[str] = None
|
|
) -> RevocationResult:
|
|
return await revoke_nextcloud_access(ctx, user_id)
|
|
|
|
@mcp.tool(
|
|
name="check_provisioning_status",
|
|
title="Check Provisioning Status",
|
|
description="Check whether Nextcloud access is provisioned.",
|
|
annotations=ToolAnnotations(
|
|
readOnlyHint=True, # Only checks status, doesn't modify
|
|
openWorldHint=True,
|
|
),
|
|
)
|
|
@require_scopes("openid")
|
|
async def tool_check_status(
|
|
ctx: Context, user_id: Optional[str] = None
|
|
) -> ProvisioningStatus:
|
|
return await check_provisioning_status(ctx, user_id)
|
|
|
|
@mcp.tool(
|
|
name="check_logged_in",
|
|
title="Check Server Login Status",
|
|
description=(
|
|
"Check if you are logged in to Nextcloud. "
|
|
"If not logged in, this tool will prompt you to complete the login flow."
|
|
),
|
|
annotations=ToolAnnotations(
|
|
readOnlyHint=True, # Checking status doesn't modify state
|
|
openWorldHint=True,
|
|
),
|
|
)
|
|
@require_scopes("openid")
|
|
async def tool_check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
|
|
return await check_logged_in(ctx, user_id)
|