Merge pull request #473 from cbcoutinho/fix/multi-user-basicauth-app-password-storage

fix(auth): Store app passwords locally for multi-user BasicAuth background sync
This commit is contained in:
Chris Coutinho
2026-01-14 20:52:12 +01:00
committed by GitHub
15 changed files with 2141 additions and 200 deletions
@@ -0,0 +1,50 @@
"""Add app_passwords table for multi-user BasicAuth mode
This migration adds support for storing app passwords that are provisioned
via Astrolabe's personal settings. This enables background sync in
multi-user BasicAuth mode without requiring OAuth.
Revision ID: 002
Revises: 001
Create Date: 2026-01-13 12:00:00.000000
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "002"
down_revision = "001"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add app_passwords table for multi-user BasicAuth mode."""
# App passwords table for multi-user BasicAuth background sync
op.execute(
"""
CREATE TABLE IF NOT EXISTS app_passwords (
user_id TEXT PRIMARY KEY,
encrypted_password BLOB NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"""
)
# Index for efficient user lookups
op.execute(
"""
CREATE INDEX IF NOT EXISTS idx_app_passwords_updated
ON app_passwords(updated_at)
"""
)
def downgrade() -> None:
"""Drop app_passwords table."""
op.execute("DROP INDEX IF EXISTS idx_app_passwords_updated")
op.execute("DROP TABLE IF EXISTS app_passwords")
+407 -1
View File
@@ -10,12 +10,18 @@ All endpoints use OAuth bearer token authentication via UnifiedTokenVerifier.
The PHP app obtains tokens through PKCE flow and uses them to access these endpoints.
"""
import base64
import logging
import re
import time
from collections import defaultdict
from importlib.metadata import version
from typing import Any
from typing import TYPE_CHECKING, Any
import httpx
if TYPE_CHECKING:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from starlette.requests import Request
from starlette.responses import JSONResponse
@@ -25,6 +31,23 @@ logger = logging.getLogger(__name__)
# Get package version from metadata
__version__ = version("nextcloud-mcp-server")
# App password format regex (Nextcloud format: xxxxx-xxxxx-xxxxx-xxxxx-xxxxx)
APP_PASSWORD_PATTERN = re.compile(
r"^[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}$"
)
# Timeout for Nextcloud API validation requests (seconds)
NEXTCLOUD_VALIDATION_TIMEOUT = 10.0
# Rate limiting configuration for app password provisioning
# Limits: 5 attempts per user per hour
RATE_LIMIT_MAX_ATTEMPTS = 5
RATE_LIMIT_WINDOW_SECONDS = 3600 # 1 hour
# In-memory rate limiter storage
# Structure: {user_id: [(timestamp, success), ...]}
_rate_limit_attempts: dict[str, list[tuple[float, bool]]] = defaultdict(list)
# Track server start time for uptime calculation
_server_start_time = time.time()
@@ -181,6 +204,141 @@ def _validate_query_string(query: str, max_length: int = 10000) -> None:
raise ValueError(f"Query too long: maximum {max_length} characters")
async def _get_app_password_storage(request: Request) -> "RefreshTokenStorage":
"""Get or initialize RefreshTokenStorage for app password operations.
Checks app.state.storage first, then falls back to creating from environment.
This helper avoids repeated storage initialization logic across endpoints.
Args:
request: Starlette request with app state
Returns:
Initialized RefreshTokenStorage instance
"""
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = getattr(request.app.state, "storage", None)
if not storage:
# Multi-user BasicAuth mode may not have oauth_context
# Initialize storage from environment
storage = RefreshTokenStorage.from_env()
await storage.initialize()
return storage
def _check_rate_limit(user_id: str) -> tuple[bool, int]:
"""Check if user is rate limited for app password operations.
Implements a sliding window rate limiter to prevent brute-force attacks
on the app password provisioning endpoint.
Args:
user_id: User identifier to check
Returns:
Tuple of (is_allowed, seconds_until_retry)
- is_allowed: True if request should be allowed
- seconds_until_retry: Seconds to wait if rate limited (0 if allowed)
"""
current_time = time.time()
window_start = current_time - RATE_LIMIT_WINDOW_SECONDS
# Clean up old attempts outside the window
_rate_limit_attempts[user_id] = [
(ts, success)
for ts, success in _rate_limit_attempts[user_id]
if ts > window_start
]
# Count recent attempts (both successful and failed)
recent_attempts = len(_rate_limit_attempts[user_id])
if recent_attempts >= RATE_LIMIT_MAX_ATTEMPTS:
# Find when the oldest attempt in the window will expire
oldest_attempt = min(ts for ts, _ in _rate_limit_attempts[user_id])
seconds_until_retry = int(
oldest_attempt + RATE_LIMIT_WINDOW_SECONDS - current_time
)
return False, max(1, seconds_until_retry)
return True, 0
def _record_rate_limit_attempt(user_id: str, success: bool) -> None:
"""Record an app password provisioning attempt for rate limiting.
Args:
user_id: User identifier
success: Whether the attempt was successful
"""
_rate_limit_attempts[user_id].append((time.time(), success))
def _extract_basic_auth(
request: Request, path_user_id: str
) -> tuple[str, str, JSONResponse | None]:
"""Extract and validate BasicAuth credentials from request.
Validates:
1. Authorization header is present and valid BasicAuth format
2. Username in credentials matches the path user_id
Args:
request: Starlette request with Authorization header
path_user_id: User ID from the URL path to verify against
Returns:
Tuple of (username, password, error_response)
- If successful: (username, password, None)
- If failed: ("", "", JSONResponse with error)
"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
return (
"",
"",
JSONResponse(
{"success": False, "error": "Missing BasicAuth credentials"},
status_code=401,
),
)
try:
# Decode BasicAuth
encoded = auth_header.split(" ", 1)[1]
decoded = base64.b64decode(encoded).decode("utf-8")
username, password = decoded.split(":", 1)
except Exception:
return (
"",
"",
JSONResponse(
{"success": False, "error": "Invalid BasicAuth format"},
status_code=401,
),
)
# Verify username matches path user_id
if username != path_user_id:
logger.warning(
f"Username mismatch in app password operation for path user {path_user_id}"
)
return (
"",
"",
JSONResponse(
{"success": False, "error": "Username does not match path user_id"},
status_code=403,
),
)
return username, password, None
async def get_server_status(request: Request) -> JSONResponse:
"""GET /api/v1/status - Server status and version.
@@ -510,6 +668,254 @@ async def revoke_user_access(request: Request) -> JSONResponse:
)
async def provision_app_password(request: Request) -> JSONResponse:
"""POST /api/v1/users/{user_id}/app-password - Store app password for background sync.
This endpoint is used by Astrolabe (Nextcloud PHP app) to provision app passwords
for multi-user BasicAuth mode background sync.
The request must include BasicAuth credentials where:
- username: Nextcloud user ID (must match path user_id)
- password: The app password being provisioned
The MCP server validates the app password against Nextcloud before storing it.
This proves the user owns the password and has access to Nextcloud.
Security model:
- User identity is verified via BasicAuth against Nextcloud
- App password is encrypted before storage
- Only the user who owns the password can provision it
- Rate limited to prevent brute-force attacks
"""
from nextcloud_mcp_server.config import get_settings
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Check rate limit before processing
is_allowed, retry_after = _check_rate_limit(path_user_id)
if not is_allowed:
logger.warning(
f"Rate limit exceeded for app password provisioning: {path_user_id}"
)
return JSONResponse(
{
"success": False,
"error": f"Rate limit exceeded. Try again in {retry_after} seconds.",
},
status_code=429,
headers={"Retry-After": str(retry_after)},
)
# Extract and validate BasicAuth credentials
username, app_password, error_response = _extract_basic_auth(request, path_user_id)
if error_response is not None:
_record_rate_limit_attempt(path_user_id, success=False)
return error_response
# Validate app password format
if not APP_PASSWORD_PATTERN.match(app_password):
_record_rate_limit_attempt(path_user_id, success=False)
return JSONResponse(
{"success": False, "error": "Invalid app password format"},
status_code=400,
)
# Get Nextcloud host from settings
settings = get_settings()
nextcloud_host = settings.nextcloud_host
if not nextcloud_host:
logger.error("NEXTCLOUD_HOST not configured")
return JSONResponse(
{"success": False, "error": "Server not configured"},
status_code=500,
)
# Validate app password against Nextcloud
try:
async with httpx.AsyncClient(timeout=NEXTCLOUD_VALIDATION_TIMEOUT) as client:
# Use OCS API to verify credentials
test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user"
response = await client.get(
test_url,
auth=(username, app_password),
params={"format": "json"},
headers={"OCS-APIRequest": "true"},
)
if response.status_code != 200:
logger.warning(
f"App password validation failed for user: HTTP {response.status_code}"
)
_record_rate_limit_attempt(path_user_id, success=False)
return JSONResponse(
{"success": False, "error": "Invalid app password"},
status_code=401,
)
# Verify the user ID from response matches
data = response.json()
ocs_user_id = data.get("ocs", {}).get("data", {}).get("id")
if ocs_user_id != username:
logger.warning("User ID mismatch in OCS response")
_record_rate_limit_attempt(path_user_id, success=False)
return JSONResponse(
{"success": False, "error": "User ID mismatch"},
status_code=403,
)
except httpx.RequestError as e:
logger.error(f"Failed to validate app password: {e}")
return JSONResponse(
{"success": False, "error": "Failed to validate credentials"},
status_code=500,
)
# Store the validated app password
try:
storage = await _get_app_password_storage(request)
await storage.store_app_password(username, app_password)
_record_rate_limit_attempt(path_user_id, success=True)
logger.info(f"Provisioned app password for user: {username}")
return JSONResponse(
{
"success": True,
"message": f"App password stored for {username}",
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "provision_app_password")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def get_app_password_status(request: Request) -> JSONResponse:
"""GET /api/v1/users/{user_id}/app-password - Check if user has provisioned app password.
Returns status of background sync access for multi-user BasicAuth mode.
Requires BasicAuth with the user's app password for authentication.
"""
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Extract and validate BasicAuth credentials
username, _, error_response = _extract_basic_auth(request, path_user_id)
if error_response is not None:
return error_response
try:
storage = await _get_app_password_storage(request)
app_password = await storage.get_app_password(username)
return JSONResponse(
{
"success": True,
"user_id": username,
"has_app_password": app_password is not None,
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "get_app_password_status")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def delete_app_password(request: Request) -> JSONResponse:
"""DELETE /api/v1/users/{user_id}/app-password - Delete stored app password.
Removes the user's app password from MCP server storage.
Requires BasicAuth with the user's credentials.
"""
from nextcloud_mcp_server.config import get_settings
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
return JSONResponse(
{"success": False, "error": "Missing user_id in path"},
status_code=400,
)
# Extract and validate BasicAuth credentials
username, password, error_response = _extract_basic_auth(request, path_user_id)
if error_response is not None:
return error_response
# Validate credentials against Nextcloud
settings = get_settings()
nextcloud_host = settings.nextcloud_host
try:
async with httpx.AsyncClient(timeout=NEXTCLOUD_VALIDATION_TIMEOUT) as client:
test_url = f"{nextcloud_host}/ocs/v1.php/cloud/user"
response = await client.get(
test_url,
auth=(username, password),
params={"format": "json"},
headers={"OCS-APIRequest": "true"},
)
if response.status_code != 200:
return JSONResponse(
{"success": False, "error": "Invalid credentials"},
status_code=401,
)
except httpx.RequestError as e:
logger.error(f"Failed to validate credentials: {e}")
return JSONResponse(
{"success": False, "error": "Failed to validate credentials"},
status_code=500,
)
try:
storage = await _get_app_password_storage(request)
deleted = await storage.delete_app_password(username)
if deleted:
logger.info(f"Deleted app password for user: {username}")
return JSONResponse(
{
"success": True,
"message": f"App password deleted for {username}",
}
)
else:
return JSONResponse(
{
"success": True,
"message": "No app password found to delete",
}
)
except Exception as e:
error_msg = _sanitize_error_for_client(e, "delete_app_password")
return JSONResponse(
{"success": False, "error": error_msg},
status_code=500,
)
async def get_installed_apps(request: Request) -> JSONResponse:
"""GET /api/v1/apps - Get list of installed Nextcloud apps.
+30 -4
View File
@@ -2012,7 +2012,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
checks["auth_mode"] = "multi_user_basic"
checks["auth_configured"] = "ok"
# Indicate if app passwords are supported (when offline_access enabled)
checks["supports_app_passwords"] = settings.enable_offline_access
checks["supports_app_passwords"] = get_settings().enable_offline_access
elif mode == AuthMode.SINGLE_USER_BASIC:
username = os.getenv("NEXTCLOUD_USERNAME")
password = os.getenv("NEXTCLOUD_PASSWORD")
@@ -2029,9 +2029,9 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Check Qdrant status if using network mode (external Qdrant service)
# In-memory and persistent modes use embedded Qdrant, no external service to check
vector_sync_enabled = (
os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
)
# Note: get_settings() supports both ENABLE_SEMANTIC_SEARCH and VECTOR_SYNC_ENABLED
settings = get_settings()
vector_sync_enabled = settings.vector_sync_enabled
qdrant_url = os.getenv("QDRANT_URL") # Only set in network mode
if vector_sync_enabled and qdrant_url:
@@ -2114,13 +2114,16 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
if enable_management_apis:
from nextcloud_mcp_server.api.management import (
create_webhook,
delete_app_password,
delete_webhook,
get_app_password_status,
get_chunk_context,
get_installed_apps,
get_server_status,
get_user_session,
get_vector_sync_status,
list_webhooks,
provision_app_password,
revoke_user_access,
unified_search,
vector_search,
@@ -2148,6 +2151,28 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
methods=["POST"],
)
)
# App password endpoints for multi-user BasicAuth mode
routes.append(
Route(
"/api/v1/users/{user_id}/app-password",
provision_app_password,
methods=["POST"],
)
)
routes.append(
Route(
"/api/v1/users/{user_id}/app-password",
get_app_password_status,
methods=["GET"],
)
)
routes.append(
Route(
"/api/v1/users/{user_id}/app-password",
delete_app_password,
methods=["DELETE"],
)
)
routes.append(
Route("/api/v1/vector-viz/search", vector_search, methods=["POST"])
)
@@ -2166,6 +2191,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
logger.info(
"Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, "
"/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, "
"/api/v1/users/{user_id}/app-password, "
"/api/v1/vector-viz/search, /api/v1/search, /api/v1/apps, "
"/api/v1/webhooks"
)
+174
View File
@@ -1240,6 +1240,180 @@ class RefreshTokenStorage:
return deleted
# ============================================================================
# App Password Storage (multi-user BasicAuth mode)
# ============================================================================
async def store_app_password(
self,
user_id: str,
app_password: str,
) -> None:
"""
Store encrypted app password for background sync (multi-user BasicAuth mode).
Args:
user_id: Nextcloud user ID
app_password: Nextcloud app password to store
"""
if not self._initialized:
await self.initialize()
if not self.cipher:
raise RuntimeError(
"Encryption key not configured. "
"Set TOKEN_ENCRYPTION_KEY for app password storage."
)
encrypted_password = self.cipher.encrypt(app_password.encode())
now = int(time.time())
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO app_passwords
(user_id, encrypted_password, created_at, updated_at)
VALUES (
?,
?,
COALESCE((SELECT created_at FROM app_passwords WHERE user_id = ?), ?),
?
)
""",
(user_id, encrypted_password, user_id, now, now),
)
await db.commit()
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "success")
logger.info(f"Stored app password for user {user_id}")
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "error")
raise
# Audit log
await self._audit_log(
event="store_app_password",
user_id=user_id,
auth_method="app_password",
)
async def get_app_password(self, user_id: str) -> Optional[str]:
"""
Retrieve and decrypt app password for a user.
Args:
user_id: Nextcloud user ID
Returns:
Decrypted app password, or None if not found
"""
if not self._initialized:
await self.initialize()
if not self.cipher:
raise RuntimeError(
"Encryption key not configured. "
"Set TOKEN_ENCRYPTION_KEY for app password retrieval."
)
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT encrypted_password FROM app_passwords WHERE user_id = ?",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(f"No app password found for user {user_id}")
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return None
encrypted_password = row[0]
decrypted_password = self.cipher.decrypt(encrypted_password).decode()
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
logger.debug(f"Retrieved app password for user {user_id}")
return decrypted_password
except Exception as e:
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "error")
logger.error(f"Failed to decrypt app password for user {user_id}: {e}")
return None
async def delete_app_password(self, user_id: str) -> bool:
"""
Delete app password for a user.
Args:
user_id: Nextcloud user ID
Returns:
True if password was deleted, False if not found
"""
if not self._initialized:
await self.initialize()
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM app_passwords WHERE user_id = ?",
(user_id,),
)
await db.commit()
deleted = cursor.rowcount > 0
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "success")
if deleted:
logger.info(f"Deleted app password for user {user_id}")
await self._audit_log(
event="delete_app_password",
user_id=user_id,
auth_method="app_password",
)
else:
logger.debug(f"No app password to delete for user {user_id}")
return deleted
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "error")
raise
async def get_all_app_password_user_ids(self) -> list[str]:
"""
Get list of all user IDs with stored app passwords.
Returns:
List of user IDs
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"SELECT user_id FROM app_passwords ORDER BY updated_at DESC"
) as cursor:
rows = await cursor.fetchall()
user_ids = [row[0] for row in rows]
logger.debug(f"Found {len(user_ids)} users with app passwords")
return user_ids
async def generate_encryption_key() -> str:
"""
+7 -6
View File
@@ -20,6 +20,7 @@ from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
logger = logging.getLogger(__name__)
@@ -106,9 +107,9 @@ async def _get_processing_status(request: Request) -> dict[str, Any] | None:
"status": str, # "syncing" or "idle"
}
"""
# Check if vector sync is enabled
vector_sync_enabled = os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
if not vector_sync_enabled:
# Check if vector sync is enabled (supports both old and new env var names)
settings = get_settings()
if not settings.vector_sync_enabled:
return None
try:
@@ -127,10 +128,8 @@ async def _get_processing_status(request: Request) -> dict[str, Any] | None:
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Count documents in collection
@@ -634,7 +633,9 @@ async def user_info_html(request: Request) -> HTMLResponse:
"""
# Check if vector sync is enabled (needed for Welcome tab)
vector_sync_enabled = os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
# Note: get_settings() supports both ENABLE_SEMANTIC_SEARCH and VECTOR_SYNC_ENABLED
settings = get_settings()
vector_sync_enabled = settings.vector_sync_enabled
# Render template
template = _jinja_env.get_template("user_info.html")
+6 -2
View File
@@ -637,7 +637,9 @@ def configure_deck_tools(mcp: FastMCP):
@mcp.tool(
title="Remove Label from Deck Card",
annotations=ToolAnnotations(idempotentHint=False, openWorldHint=True),
annotations=ToolAnnotations(
destructiveHint=True, idempotentHint=True, openWorldHint=True
),
)
@require_scopes("deck:write")
@instrument_tool
@@ -692,7 +694,9 @@ def configure_deck_tools(mcp: FastMCP):
@mcp.tool(
title="Unassign User from Deck Card",
annotations=ToolAnnotations(idempotentHint=False, openWorldHint=True),
annotations=ToolAnnotations(
destructiveHint=True, idempotentHint=True, openWorldHint=True
),
)
@require_scopes("deck:write")
@instrument_tool
+4 -6
View File
@@ -1,7 +1,6 @@
"""Semantic search MCP tools using vector database."""
import logging
import os
import anyio
from httpx import RequestError
@@ -658,12 +657,11 @@ def configure_semantic_tools(mcp: FastMCP):
after creating or updating content across all indexed apps.
"""
# Check if vector sync is enabled
vector_sync_enabled = (
os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
)
# Check if vector sync is enabled (supports both old and new env var names)
from nextcloud_mcp_server.config import get_settings
if not vector_sync_enabled:
settings = get_settings()
if not settings.vector_sync_enabled:
return VectorSyncStatusResponse(
indexed_count=0,
pending_count=0,
+22 -19
View File
@@ -8,8 +8,8 @@ Manages background vector sync for multi-user deployments:
Authentication strategies are mutually exclusive by deployment mode:
Multi-user BasicAuth mode (ENABLE_MULTI_USER_BASIC_AUTH=true):
- Uses app passwords obtained via Astrolabe Management API
- Users provision via Astrolabe personal settings
- Uses app passwords stored locally in MCP server's database
- Users provision via Astrolabe personal settings, which sends to MCP API
- OAuth is NOT used
OAuth mode (with external IdP like Keycloak):
@@ -33,7 +33,6 @@ from anyio.streams.memory import (
)
from httpx import BasicAuth
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents
@@ -71,15 +70,18 @@ class UserSyncState:
async def get_user_client_basic_auth(
user_id: str,
nextcloud_host: str,
storage: "RefreshTokenStorage | None" = None,
) -> NextcloudClient:
"""Get an authenticated NextcloudClient using app password (BasicAuth mode).
For multi-user BasicAuth deployments where users provision app passwords
via Astrolabe personal settings. OAuth is NOT used in this mode.
via Astrolabe personal settings. The app password is stored locally in the
MCP server's database after being provisioned through the management API.
Args:
user_id: User identifier
nextcloud_host: Nextcloud base URL
storage: Optional RefreshTokenStorage instance (created from env if not provided)
Returns:
Authenticated NextcloudClient with BasicAuth
@@ -87,21 +89,15 @@ async def get_user_client_basic_auth(
Raises:
NotProvisionedError: If user has not provisioned an app password
"""
settings = get_settings()
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
if not settings.oidc_client_id or not settings.oidc_client_secret:
raise NotProvisionedError(
"Astrolabe client credentials not configured. "
"Set OIDC_CLIENT_ID and OIDC_CLIENT_SECRET for app password retrieval."
)
# Get or create storage instance
if storage is None:
storage = RefreshTokenStorage.from_env()
await storage.initialize()
astrolabe = AstrolabeClient(
nextcloud_host=nextcloud_host,
client_id=settings.oidc_client_id,
client_secret=settings.oidc_client_secret,
)
app_password = await astrolabe.get_user_app_password(user_id)
# Retrieve app password from local storage
app_password = await storage.get_app_password(user_id)
if not app_password:
raise NotProvisionedError(
@@ -419,8 +415,15 @@ async def user_manager_task(
while not shutdown_event.is_set():
try:
# Get current provisioned users
provisioned_users = set(await refresh_token_storage.get_all_user_ids())
# Get current provisioned users based on mode
if use_basic_auth:
# BasicAuth mode: query app_passwords table
provisioned_users = set(
await refresh_token_storage.get_all_app_password_user_ids()
)
else:
# OAuth mode: query refresh_tokens table
provisioned_users = set(await refresh_token_storage.get_all_user_ids())
active_users = set(user_states.keys())
# Start scanners for new users