f43343356e
- Add 429 retry with exponential backoff to register_client() (fixes CI oauth matrix failures from parallel DCR requests) - Make client_id, redirect_uri, and PKCE mandatory at token endpoint - Add null-checks for discovery_url and OAuth credentials in proxy flows - Add OIDC discovery document caching with 5-min TTL - Add per-IP rate limiting on /oauth/register DCR proxy - Discover DCR endpoint from OIDC discovery instead of hardcoding - Extract extract_user_id_from_token to auth/token_utils.py (breaks circular imports between server/ and auth/ layers) - Add TTL scope cache in scope_authorization.py (avoids DB hit per tool) - Add defense-in-depth scope validation in storage layer - Broaden elicitation exception handling with graceful fallback - Add idempotentHint to nc_auth_check_status, return "pending" status after accepted elicitation, add polling interval to description - Change ALL_SUPPORTED_SCOPES from tuple to frozenset for O(1) lookups - Replace Optional[str] with str | None throughout config.py - Use default_factory for ProxyCodeEntry/ASProxySession dataclasses - Add proxy code/session cleanup to background loop - Fix OIDC verification CI step to only run for oauth/login-flow modes - Add unit tests for access.py REST endpoints (10 tests) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
174 lines
5.4 KiB
Python
174 lines
5.4 KiB
Python
"""Access and scope management API endpoints.
|
|
|
|
Provides REST API endpoints for querying and managing user access status
|
|
and application-level scopes for Login Flow v2 mode.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse
|
|
|
|
from nextcloud_mcp_server.api.management import _sanitize_error_for_client
|
|
from nextcloud_mcp_server.api.passwords import (
|
|
_extract_basic_auth,
|
|
_get_app_password_storage,
|
|
)
|
|
from nextcloud_mcp_server.auth.scope_authorization import invalidate_scope_cache
|
|
from nextcloud_mcp_server.models.auth import ALL_SUPPORTED_SCOPES
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_user_access(request: Request) -> JSONResponse:
    """GET /api/v1/users/{user_id}/access - Report a user's provisioning state.

    Looks up the stored app-password record for the authenticated user and
    reports whether one exists, together with its scopes and metadata.
    Requires BasicAuth with the user's credentials.

    Returns:
        - 400 when the route carries no ``user_id`` path parameter.
        - Whatever error response ``_extract_basic_auth`` produces, unchanged.
        - 200 with ``provisioned: False`` and null ``scopes``/``username``
          when storage holds no record for the user.
        - 200 with ``provisioned: True`` plus ``scopes``, ``username``,
          ``created_at`` and ``updated_at`` when a record exists.
        - 500 with a sanitized message if anything in the lookup raises.
    """
    requested_user = request.path_params.get("user_id")
    if not requested_user:
        return JSONResponse(
            {"success": False, "error": "Missing user_id in path"},
            status_code=400,
        )

    username, _, auth_failure = _extract_basic_auth(request, requested_user)
    if auth_failure is not None:
        return auth_failure

    try:
        store = await _get_app_password_storage(request)
        record = await store.get_app_password_with_scopes(username)

        if record is not None:
            payload = {
                "success": True,
                "user_id": username,
                "provisioned": True,
                "scopes": record["scopes"],
                "username": record.get("username"),
                "created_at": record.get("created_at"),
                "updated_at": record.get("updated_at"),
            }
        else:
            # No stored record: the user exists but has nothing provisioned.
            payload = {
                "success": True,
                "user_id": username,
                "provisioned": False,
                "scopes": None,
                "username": None,
            }

        # Built inside the try so serialization failures are sanitized too.
        return JSONResponse(payload)

    except Exception as e:
        sanitized = _sanitize_error_for_client(e, "get_user_access")
        return JSONResponse(
            {"success": False, "error": sanitized},
            status_code=500,
        )
|
|
|
|
|
|
async def update_user_scopes(request: Request) -> JSONResponse:
    """PATCH /api/v1/users/{user_id}/scopes - Update user's application-level scopes.

    Accepts a JSON object body with:
    - scopes: list[str] - New scope set to apply

    This only updates the stored scopes, not the app password itself.
    The app password remains valid; scope enforcement is application-level.

    Security note: This endpoint allows direct scope modification without
    re-authenticating via Login Flow. The caller must authenticate with
    valid BasicAuth credentials (user_id + app_password), which serves
    as the authorization check.

    Returns:
        - 400 when ``user_id`` is missing from the path, the body is not
          valid JSON, the body is not a JSON object, ``scopes`` is not a
          list of strings, or any scope is not in ``ALL_SUPPORTED_SCOPES``
          (the response then also lists ``valid_scopes``).
        - Whatever error response ``_extract_basic_auth`` produces, unchanged.
        - 404 when no app password is stored for the user.
        - 200 with the applied ``scopes`` on success.
        - 500 with a sanitized message if the storage update raises.
    """
    path_user_id = request.path_params.get("user_id")
    if not path_user_id:
        return JSONResponse(
            {"success": False, "error": "Missing user_id in path"},
            status_code=400,
        )

    username, _, error_response = _extract_basic_auth(request, path_user_id)
    if error_response is not None:
        return error_response

    try:
        body = await request.json()
    except Exception:
        return JSONResponse(
            {"success": False, "error": "Invalid JSON body"},
            status_code=400,
        )

    # Valid JSON is not necessarily an object: a top-level list, string,
    # number or null has no .get(), so reject it as a client error instead
    # of letting an AttributeError escape as a 500.
    if not isinstance(body, dict):
        return JSONResponse(
            {"success": False, "error": "Request body must be a JSON object"},
            status_code=400,
        )

    scopes = body.get("scopes")
    # Check the element types as well as the container: unhashable items
    # (nested lists/objects) would break the frozenset membership test below,
    # and non-string items would break the ', '.join() in the error message.
    if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
        return JSONResponse(
            {"success": False, "error": "scopes must be a list of strings"},
            status_code=400,
        )

    # Validate scopes
    invalid = [s for s in scopes if s not in ALL_SUPPORTED_SCOPES]
    if invalid:
        return JSONResponse(
            {
                "success": False,
                "error": f"Invalid scopes: {', '.join(invalid)}",
                "valid_scopes": sorted(ALL_SUPPORTED_SCOPES),
            },
            status_code=400,
        )

    try:
        storage = await _get_app_password_storage(request)
        existing = await storage.get_app_password_with_scopes(username)

        if existing is None:
            return JSONResponse(
                {
                    "success": False,
                    "error": "No app password provisioned for this user",
                },
                status_code=404,
            )

        # Update scopes only (no decrypt/re-encrypt of the password)
        await storage.update_app_password_scopes(
            user_id=username,
            scopes=scopes,
        )

        # Invalidate scope cache so subsequent tool calls see updated scopes
        invalidate_scope_cache(username)

        return JSONResponse(
            {
                "success": True,
                "user_id": username,
                "scopes": scopes,
                "message": "Scopes updated successfully",
            }
        )

    except Exception as e:
        error_msg = _sanitize_error_for_client(e, "update_user_scopes")
        return JSONResponse(
            {"success": False, "error": error_msg},
            status_code=500,
        )
|
|
|
|
|
|
async def list_supported_scopes(_: Request) -> JSONResponse:
    """GET /api/v1/scopes - Enumerate the supported application-level scopes.

    The incoming request is not inspected. The response carries every entry
    of ``ALL_SUPPORTED_SCOPES`` in sorted order under the ``scopes`` key.
    """
    scope_names = list(ALL_SUPPORTED_SCOPES)
    scope_names.sort()  # sorted so the response order is deterministic
    payload = {
        "success": True,
        "scopes": scope_names,
    }
    return JSONResponse(payload)
|