fix(auth): Store app passwords locally for multi-user BasicAuth background sync

Previously, the multi-user BasicAuth mode attempted to retrieve app passwords
via OAuth client_credentials grant, which Nextcloud OIDC doesn't support.

This fix implements local storage for app passwords:
- Add app_passwords table via Alembic migration (002)
- Add store/get/delete methods to RefreshTokenStorage
- Add management API endpoints for app password provisioning:
  - POST /api/v1/users/{user_id}/app-password
  - GET /api/v1/users/{user_id}/app-password
  - DELETE /api/v1/users/{user_id}/app-password
- Update oauth_sync.py to read from local storage
- Update Astrolabe to send app passwords to MCP server after validation
- Add app-hook to configure mcp_server_url in Nextcloud

The flow is now:
1. User creates app password in Nextcloud Security settings
2. User enters it in Astrolabe Personal Settings
3. Astrolabe validates against Nextcloud, then sends to MCP server
4. MCP server stores encrypted app password locally
5. Background sync uses locally stored password

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-01-13 15:44:11 +01:00
parent 546f0c0674
commit e486e92f91
7 changed files with 691 additions and 32 deletions
+336
View File
@@ -510,6 +510,342 @@ async def revoke_user_access(request: Request) -> JSONResponse:
)
async def provision_app_password(request: Request) -> JSONResponse:
"""POST /api/v1/users/{user_id}/app-password - Store app password for background sync.
This endpoint is used by Astrolabe (Nextcloud PHP app) to provision app passwords
for multi-user BasicAuth mode background sync.
The request must include BasicAuth credentials where:
- username: Nextcloud user ID (must match path user_id)
- password: The app password being provisioned
The MCP server validates the app password against Nextcloud before storing it.
This proves the user owns the password and has access to Nextcloud.
Security model:
- User identity is verified via BasicAuth against Nextcloud
- App password is encrypted before storage
- Only the user who owns the password can provision it
"""
import base64
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Extract BasicAuth credentials
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
return JSONResponse(
{"success": False, "error": "Missing BasicAuth credentials"},
status_code=401,
)
try:
# Decode BasicAuth
encoded = auth_header.split(" ", 1)[1]
decoded = base64.b64decode(encoded).decode("utf-8")
username, app_password = decoded.split(":", 1)
except Exception:
return JSONResponse(
{"success": False, "error": "Invalid BasicAuth format"},
status_code=401,
)
# Verify username matches path user_id
if username != path_user_id:
logger.warning(
f"Username mismatch in app password provisioning: "
f"path={path_user_id}, auth={username}"
)
return JSONResponse(
{"success": False, "error": "Username does not match path user_id"},
status_code=403,
)
# Validate app password format (xxxxx-xxxxx-xxxxx-xxxxx-xxxxx)
import re
if not re.match(
r"^[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}$",
app_password,
):
return JSONResponse(
{"success": False, "error": "Invalid app password format"},
status_code=400,
)
# Get Nextcloud host from settings
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
nextcloud_host = settings.nextcloud_host
if not nextcloud_host:
logger.error("NEXTCLOUD_HOST not configured")
return JSONResponse(
{"success": False, "error": "Server not configured"},
status_code=500,
)
# Validate app password against Nextcloud
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Use OCS API to verify credentials
test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user"
response = await client.get(
test_url,
auth=(username, app_password),
params={"format": "json"},
headers={"OCS-APIRequest": "true"},
)
if response.status_code != 200:
logger.warning(
f"App password validation failed for {username}: "
f"HTTP {response.status_code}"
)
return JSONResponse(
{"success": False, "error": "Invalid app password"},
status_code=401,
)
# Verify the user ID from response matches
data = response.json()
ocs_user_id = data.get("ocs", {}).get("data", {}).get("id")
if ocs_user_id != username:
logger.warning(
f"User ID mismatch: expected {username}, got {ocs_user_id}"
)
return JSONResponse(
{"success": False, "error": "User ID mismatch"},
status_code=403,
)
except httpx.RequestError as e:
logger.error(f"Failed to validate app password: {e}")
return JSONResponse(
{"success": False, "error": "Failed to validate credentials"},
status_code=500,
)
# Store the validated app password
try:
# Get storage from app state or create from env
storage = getattr(request.app.state, "storage", None)
if not storage:
# Multi-user BasicAuth mode may not have oauth_context
# Initialize storage from environment
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
await storage.store_app_password(username, app_password)
logger.info(f"Provisioned app password for user: {username}")
return JSONResponse(
{
"success": True,
"message": f"App password stored for {username}",
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "provision_app_password")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def get_app_password_status(request: Request) -> JSONResponse:
"""GET /api/v1/users/{user_id}/app-password - Check if user has provisioned app password.
Returns status of background sync access for multi-user BasicAuth mode.
Requires BasicAuth with the user's app password for authentication.
"""
import base64
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Extract BasicAuth credentials
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
return JSONResponse(
{"success": False, "error": "Missing BasicAuth credentials"},
status_code=401,
)
try:
# Decode BasicAuth
encoded = auth_header.split(" ", 1)[1]
decoded = base64.b64decode(encoded).decode("utf-8")
username, _ = decoded.split(":", 1)
except Exception:
return JSONResponse(
{"success": False, "error": "Invalid BasicAuth format"},
status_code=401,
)
# Verify username matches path user_id
if username != path_user_id:
return JSONResponse(
{"success": False, "error": "Username does not match path user_id"},
status_code=403,
)
try:
# Get storage
storage = getattr(request.app.state, "storage", None)
if not storage:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Check if app password exists
app_password = await storage.get_app_password(username)
return JSONResponse(
{
"success": True,
"user_id": username,
"has_app_password": app_password is not None,
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "get_app_password_status")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def delete_app_password(request: Request) -> JSONResponse:
"""DELETE /api/v1/users/{user_id}/app-password - Delete stored app password.
Removes the user's app password from MCP server storage.
Requires BasicAuth with the user's credentials.
"""
import base64
from nextcloud_mcp_server.config import get_settings
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Extract BasicAuth credentials
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
return JSONResponse(
{"success": False, "error": "Missing BasicAuth credentials"},
status_code=401,
)
try:
# Decode BasicAuth
encoded = auth_header.split(" ", 1)[1]
decoded = base64.b64decode(encoded).decode("utf-8")
username, password = decoded.split(":", 1)
except Exception:
return JSONResponse(
{"success": False, "error": "Invalid BasicAuth format"},
status_code=401,
)
# Verify username matches path user_id
if username != path_user_id:
return JSONResponse(
{"success": False, "error": "Username does not match path user_id"},
status_code=403,
)
# Validate credentials against Nextcloud
settings = get_settings()
nextcloud_host = settings.nextcloud_host
try:
async with httpx.AsyncClient(timeout=10.0) as client:
test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user"
response = await client.get(
test_url,
auth=(username, password),
params={"format": "json"},
headers={"OCS-APIRequest": "true"},
)
if response.status_code != 200:
return JSONResponse(
{"success": False, "error": "Invalid credentials"},
status_code=401,
)
except httpx.RequestError as e:
logger.error(f"Failed to validate credentials: {e}")
return JSONResponse(
{"success": False, "error": "Failed to validate credentials"},
status_code=500,
)
try:
# Get storage
storage = getattr(request.app.state, "storage", None)
if not storage:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Delete app password
deleted = await storage.delete_app_password(username)
if deleted:
logger.info(f"Deleted app password for user: {username}")
return JSONResponse(
{
"success": True,
"message": f"App password deleted for {username}",
}
)
else:
return JSONResponse(
{
"success": True,
"message": "No app password found to delete",
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "delete_app_password")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def get_installed_apps(request: Request) -> JSONResponse:
"""GET /api/v1/apps - Get list of installed Nextcloud apps.