db1e0606ad
Consolidate three independent RefreshTokenStorage lazy singletons into a single lock-protected get_shared_storage() function, eliminating race conditions on concurrent first-access. Remove blanket try/except in _get_stored_scopes so storage errors propagate as proper MCP errors instead of silently triggering "please provision" messages. Handle declined/cancelled elicitation results in Login Flow tools by cleaning up sessions and returning clear status. Add update_app_password_scopes() to avoid unnecessary decrypt/re-encrypt when only scopes change. Add unprovisioned-user early exit and no-op detection to nc_auth_update_scopes. Remove four dead config fields and misleading NEXTCLOUD_PASSWORD deprecation warning. Add periodic login flow session cleanup task. Generate separate Fernet keys per service. Add board cleanup in deck integration test. Gate CI unit tests on linting and skip Astrolabe build for single-user profile. Fix test markers from oauth to multi_user_basic for astrolabe integration tests. Update login_flow.py docstrings to document outbound HTTP calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
165 lines
5.0 KiB
Python
165 lines
5.0 KiB
Python
"""Access and scope management API endpoints.
|
|
|
|
Provides REST API endpoints for querying and managing user access status
|
|
and application-level scopes for Login Flow v2 mode.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse
|
|
|
|
from nextcloud_mcp_server.api.management import _sanitize_error_for_client
|
|
from nextcloud_mcp_server.api.passwords import (
|
|
_extract_basic_auth,
|
|
_get_app_password_storage,
|
|
)
|
|
from nextcloud_mcp_server.models.auth import ALL_SUPPORTED_SCOPES
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_user_access(request: Request) -> JSONResponse:
|
|
"""GET /api/v1/users/{user_id}/access - Get user's provisioned access and scopes.
|
|
|
|
Returns the user's current provisioning status, granted scopes, and metadata.
|
|
Requires BasicAuth with the user's credentials.
|
|
"""
|
|
path_user_id = request.path_params.get("user_id")
|
|
if not path_user_id:
|
|
return JSONResponse(
|
|
{"success": False, "error": "Missing user_id in path"},
|
|
status_code=400,
|
|
)
|
|
|
|
username, _, error_response = _extract_basic_auth(request, path_user_id)
|
|
if error_response is not None:
|
|
return error_response
|
|
|
|
try:
|
|
storage = await _get_app_password_storage(request)
|
|
data = await storage.get_app_password_with_scopes(username)
|
|
|
|
if data is None:
|
|
return JSONResponse(
|
|
{
|
|
"success": True,
|
|
"user_id": username,
|
|
"provisioned": False,
|
|
"scopes": None,
|
|
"username": None,
|
|
}
|
|
)
|
|
|
|
return JSONResponse(
|
|
{
|
|
"success": True,
|
|
"user_id": username,
|
|
"provisioned": True,
|
|
"scopes": data["scopes"],
|
|
"username": data.get("username"),
|
|
"created_at": data.get("created_at"),
|
|
"updated_at": data.get("updated_at"),
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
error_msg = _sanitize_error_for_client(e, "get_user_access")
|
|
return JSONResponse(
|
|
{"success": False, "error": error_msg},
|
|
status_code=500,
|
|
)
|
|
|
|
|
|
async def update_user_scopes(request: Request) -> JSONResponse:
|
|
"""PATCH /api/v1/users/{user_id}/scopes - Update user's application-level scopes.
|
|
|
|
Accepts JSON body with:
|
|
- scopes: list[str] - New scope set to apply
|
|
|
|
This only updates the stored scopes, not the app password itself.
|
|
The app password remains valid; scope enforcement is application-level.
|
|
"""
|
|
path_user_id = request.path_params.get("user_id")
|
|
if not path_user_id:
|
|
return JSONResponse(
|
|
{"success": False, "error": "Missing user_id in path"},
|
|
status_code=400,
|
|
)
|
|
|
|
username, _, error_response = _extract_basic_auth(request, path_user_id)
|
|
if error_response is not None:
|
|
return error_response
|
|
|
|
try:
|
|
body = await request.json()
|
|
except Exception:
|
|
return JSONResponse(
|
|
{"success": False, "error": "Invalid JSON body"},
|
|
status_code=400,
|
|
)
|
|
|
|
scopes = body.get("scopes")
|
|
if scopes is None or not isinstance(scopes, list):
|
|
return JSONResponse(
|
|
{"success": False, "error": "scopes must be a list of strings"},
|
|
status_code=400,
|
|
)
|
|
|
|
# Validate scopes
|
|
invalid = [s for s in scopes if s not in ALL_SUPPORTED_SCOPES]
|
|
if invalid:
|
|
return JSONResponse(
|
|
{
|
|
"success": False,
|
|
"error": f"Invalid scopes: {', '.join(invalid)}",
|
|
"valid_scopes": ALL_SUPPORTED_SCOPES,
|
|
},
|
|
status_code=400,
|
|
)
|
|
|
|
try:
|
|
storage = await _get_app_password_storage(request)
|
|
existing = await storage.get_app_password_with_scopes(username)
|
|
|
|
if existing is None:
|
|
return JSONResponse(
|
|
{
|
|
"success": False,
|
|
"error": "No app password provisioned for this user",
|
|
},
|
|
status_code=404,
|
|
)
|
|
|
|
# Update scopes only (no decrypt/re-encrypt of the password)
|
|
await storage.update_app_password_scopes(
|
|
user_id=username,
|
|
scopes=scopes,
|
|
)
|
|
|
|
return JSONResponse(
|
|
{
|
|
"success": True,
|
|
"user_id": username,
|
|
"scopes": scopes,
|
|
"message": "Scopes updated successfully",
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
error_msg = _sanitize_error_for_client(e, "update_user_scopes")
|
|
return JSONResponse(
|
|
{"success": False, "error": error_msg},
|
|
status_code=500,
|
|
)
|
|
|
|
|
|
async def list_supported_scopes(request: Request) -> JSONResponse:
|
|
"""GET /api/v1/scopes - List all supported application-level scopes."""
|
|
return JSONResponse(
|
|
{
|
|
"success": True,
|
|
"scopes": ALL_SUPPORTED_SCOPES,
|
|
}
|
|
)
|