diff --git a/Dockerfile b/Dockerfile index 045ef68..f651b87 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,9 @@ FROM ghcr.io/astral-sh/uv:0.9.7-python3.11-alpine@sha256:0006b77df7ebf46e68959fdc8d3af9d19f1adfae8c2e7e77907ad257e5d05be4 -# Install git (required for caldav dependency from git) -RUN apk add --no-cache git +# Install dependencies +# 1. git (required for caldav dependency from git) +# 2. sqlite for development with token db +RUN apk add --no-cache git sqlite WORKDIR /app diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 462bbab..9fedef5 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -15,6 +15,7 @@ from mcp.server.auth.settings import AuthSettings from mcp.server.fastmcp import Context, FastMCP from pydantic import AnyHttpUrl from starlette.applications import Starlette +from starlette.middleware.authentication import AuthenticationMiddleware from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse from starlette.routing import Mount, Route @@ -295,31 +296,19 @@ async def load_oauth_client_credentials( if registration_endpoint: logger.info("Dynamic client registration available") mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") - redirect_uris = [f"{mcp_server_url}/oauth/callback"] + redirect_uris = [ + f"{mcp_server_url}/oauth/callback", # MCP OAuth flow + f"{mcp_server_url}/oauth/login-callback", # Browser OAuth flow for /user/page + ] - # Get scopes from environment or use defaults - # Note: Client registration happens BEFORE tools are registered, so we can't - # dynamically discover scopes here. These scopes define the "maximum allowed" - # scopes for this OAuth client. The actual per-tool scope enforcement happens - # via @require_scopes decorators, and the PRM endpoint advertises the actual - # supported scopes dynamically. + # MCP server DCR: Only request basic OIDC scopes for the server's own authentication + # Note: Nextcloud app scopes (notes:read, calendar:write, etc.) are for MCP *clients* + # that request access tokens. The MCP server itself only needs to authenticate + # as a client application, not request any Nextcloud resource access. # - # IMPORTANT: Keep this list in sync with all @require_scopes decorators - # when adding new apps, or set NEXTCLOUD_OIDC_SCOPES environment variable - # to override. - default_scopes = ( - "openid profile email " - "notes:read notes:write " - "calendar:read calendar:write " - "todo:read todo:write " - "contacts:read contacts:write " - "cookbook:read cookbook:write " - "deck:read deck:write " - "tables:read tables:write " - "files:read files:write " - "sharing:read sharing:write" - ) - scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", default_scopes) + # The PRM endpoint will advertise the full list of supported scopes dynamically + # by discovering all @require_scopes decorators on registered tools. + dcr_scopes = "openid profile email" # Add offline_access scope if refresh tokens are enabled enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in ( @@ -327,11 +316,11 @@ async def load_oauth_client_credentials( "1", "yes", ) - if enable_offline_access and "offline_access" not in scopes: - scopes = f"{scopes} offline_access" + if enable_offline_access: + dcr_scopes = f"{dcr_scopes} offline_access" logger.info("✓ offline_access scope enabled for refresh tokens") - logger.info(f"Requesting OAuth scopes: {scopes}") + logger.info(f"MCP server DCR scopes: {dcr_scopes}") # Get token type from environment (Bearer or jwt) # Note: Must be lowercase "jwt" to match OIDC app's check @@ -354,7 +343,7 @@ async def load_oauth_client_credentials( storage=storage, client_name=f"Nextcloud MCP Server ({token_type})", redirect_uris=redirect_uris, - scopes=scopes, + scopes=dcr_scopes, # Use DCR-specific scopes (basic OIDC only) token_type=token_type, ) @@ -892,6 +881,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): app.state.oauth_context = { "storage": refresh_token_storage, "oauth_client": oauth_client, + "token_verifier": token_verifier, # For querying IdP userinfo endpoint "config": { "mcp_server_url": mcp_server_url, "discovery_url": discovery_url, @@ -1045,9 +1035,55 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): "OAuth login routes enabled: /oauth/authorize, /oauth/callback, /oauth/token" ) + # Add browser OAuth login routes (OAuth mode only) + if oauth_enabled: + from nextcloud_mcp_server.auth.browser_oauth_routes import ( + oauth_login, + oauth_login_callback, + oauth_logout, + ) + + routes.append( + Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login") + ) + routes.append( + Route( + "/oauth/login-callback", + oauth_login_callback, + methods=["GET"], + name="oauth_login_callback", + ) + ) + routes.append( + Route("/oauth/logout", oauth_logout, methods=["GET"], name="oauth_logout") + ) + logger.info( + "Browser OAuth routes enabled: /oauth/login, /oauth/login-callback, /oauth/logout" + ) + + # Add user info routes (available in both BasicAuth and OAuth modes) + from nextcloud_mcp_server.auth.userinfo_routes import ( + user_info_html, + user_info_json, + ) + + routes.append(Route("/user", user_info_json, methods=["GET"])) + routes.append(Route("/user/page", user_info_html, methods=["GET"])) + logger.info("User info routes enabled: /user (JSON), /user/page (HTML)") + routes.append(Mount("/", app=mcp_app)) app = Starlette(routes=routes, lifespan=starlette_lifespan) + # Add authentication middleware for browser-based routes + from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend + + # SessionAuthBackend will look up oauth_context from app.state at runtime + app.add_middleware( + AuthenticationMiddleware, + backend=SessionAuthBackend(oauth_enabled=oauth_enabled), + ) + logger.info("Authentication middleware enabled for browser routes") + # Add CORS middleware to allow browser-based clients like MCP Inspector app.add_middleware( CORSMiddleware, diff --git a/nextcloud_mcp_server/auth/browser_oauth_routes.py b/nextcloud_mcp_server/auth/browser_oauth_routes.py new file mode 100644 index 0000000..1f932f8 --- /dev/null +++ b/nextcloud_mcp_server/auth/browser_oauth_routes.py @@ -0,0 +1,358 @@ +"""Browser-based OAuth login routes for admin UI. + +Separate from MCP OAuth flow - these routes establish browser sessions +for accessing admin UI endpoints like /user/page. +""" + +import logging +import os +import secrets +from urllib.parse import urlencode + +import httpx +import jwt +from starlette.requests import Request +from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse + +logger = logging.getLogger(__name__) + + +async def oauth_login(request: Request) -> RedirectResponse | JSONResponse: + """Browser OAuth login endpoint - redirects to IdP for authentication. + + This is separate from the MCP OAuth flow (/oauth/authorize). + Creates a browser session with refresh token for admin UI access. + + Query parameters: + next: Optional URL to redirect to after login (default: /user/page) + + Returns: + 302 redirect to IdP authorization endpoint + """ + oauth_ctx = request.app.state.oauth_context + if not oauth_ctx: + # BasicAuth mode - no login needed, redirect to user page + return RedirectResponse("/user/page", status_code=302) + + storage = oauth_ctx["storage"] + oauth_client = oauth_ctx["oauth_client"] + oauth_config = oauth_ctx["config"] + + # Debug: Log oauth_config contents + logger.info(f"oauth_login called - oauth_config keys: {oauth_config.keys()}") + logger.info(f"oauth_login called - client_id: {oauth_config.get('client_id')}") + logger.info(f"oauth_login called - oauth_client: {oauth_client is not None}") + + # Generate state for CSRF protection + state = secrets.token_urlsafe(32) + + # Build OAuth authorization URL + mcp_server_url = oauth_config["mcp_server_url"] + callback_uri = f"{mcp_server_url}/oauth/login-callback" + + # Request only basic OIDC scopes for browser session + # Note: Nextcloud app scopes (notes:read, etc.) are for MCP client access tokens, + # not for the MCP server's own browser authentication + scopes = "openid profile email offline_access" + + code_challenge = "" + code_verifier = "" + + if oauth_client: + # External IdP mode (Keycloak) + # Keycloak requires PKCE, so generate code_verifier and code_challenge + if not oauth_client.authorization_endpoint: + await oauth_client.discover() + + # Generate PKCE values + code_verifier, code_challenge = oauth_client.generate_pkce_challenge() + + # Store code_verifier temporarily (using state as key) + # We'll retrieve it in the callback using the state parameter + await storage.store_oauth_session( + session_id=state, # Use state as session ID + client_id="browser-ui", + client_redirect_uri="/user/page", + state=state, + code_challenge=code_challenge, + code_challenge_method="S256", + mcp_authorization_code=code_verifier, # Store code_verifier here temporarily + flow_type="browser", + ttl_seconds=600, # 10 minutes + ) + + idp_params = { + "client_id": oauth_client.client_id, + "redirect_uri": callback_uri, + "response_type": "code", + "scope": scopes, + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "prompt": "consent", # Ensure refresh token + } + + auth_url = f"{oauth_client.authorization_endpoint}?{urlencode(idp_params)}" + logger.info(f"Redirecting to external IdP login: {auth_url.split('?')[0]}") + else: + # Integrated mode (Nextcloud OIDC) + discovery_url = oauth_config.get("discovery_url") + if not discovery_url: + return JSONResponse( + { + "error": "server_error", + "error_description": "OAuth discovery URL not configured", + }, + status_code=500, + ) + + # Fetch authorization endpoint + async with httpx.AsyncClient() as http_client: + response = await http_client.get(discovery_url) + response.raise_for_status() + discovery = response.json() + authorization_endpoint = discovery["authorization_endpoint"] + + # Replace internal Docker hostname with public URL + public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") + if public_issuer: + from urllib.parse import urlparse as parse_url + + internal_parsed = parse_url(oauth_config["nextcloud_host"]) + auth_parsed = parse_url(authorization_endpoint) + + if auth_parsed.hostname == internal_parsed.hostname: + public_parsed = parse_url(public_issuer) + authorization_endpoint = ( + f"{public_parsed.scheme}://{public_parsed.netloc}{auth_parsed.path}" + ) + + idp_params = { + "client_id": oauth_config["client_id"], + "redirect_uri": callback_uri, + "response_type": "code", + "scope": scopes, + "state": state, + "prompt": "consent", # Ensure refresh token + } + + # Debug: Log full parameters + logger.info(f"Building Nextcloud OIDC auth URL with params: {idp_params}") + + auth_url = f"{authorization_endpoint}?{urlencode(idp_params)}" + logger.info(f"Redirecting to Nextcloud OIDC login: {auth_url}") + + return RedirectResponse(auth_url, status_code=302) + + +async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLResponse: + """Browser OAuth callback - IdP redirects here after authentication. + + Exchanges authorization code for tokens, stores refresh token, + sets session cookie, and redirects to original destination. + + Query parameters: + code: Authorization code from IdP + state: State parameter + error: Error code (if authorization failed) + + Returns: + 302 redirect to next URL with session cookie + """ + # Check for errors + error = request.query_params.get("error") + if error: + error_description = request.query_params.get( + "error_description", "Authorization failed" + ) + logger.error(f"OAuth login error: {error} - {error_description}") + login_url = str(request.url_for("oauth_login")) + return HTMLResponse( + f""" + + +
Error: {error}
+{error_description}
+ + + + """, + status_code=400, + ) + + # Extract code and state + code = request.query_params.get("code") + state = request.query_params.get("state") + + if not code or not state: + return HTMLResponse( + """ + + +Missing code or state parameter
+ + + """, + status_code=400, + ) + + # Get OAuth context + oauth_ctx = request.app.state.oauth_context + storage = oauth_ctx["storage"] + oauth_client = oauth_ctx["oauth_client"] + oauth_config = oauth_ctx["config"] + + # Retrieve code_verifier from session storage (if using PKCE) + code_verifier = "" + if oauth_client: + # For Keycloak (external IdP), we stored the code_verifier in the session + oauth_session = await storage.get_oauth_session(state) + if oauth_session: + # code_verifier was stored in mcp_authorization_code field + code_verifier = oauth_session.get("mcp_authorization_code", "") + # Clean up the temporary session + # Note: We don't have delete_oauth_session method, but it will expire after TTL + + # Exchange authorization code for tokens + mcp_server_url = oauth_config["mcp_server_url"] + callback_uri = f"{mcp_server_url}/oauth/login-callback" + + try: + if oauth_client: + # External IdP mode (Keycloak) + # Use PKCE if we have a code_verifier + if not oauth_client.token_endpoint: + await oauth_client.discover() + + token_params = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": callback_uri, + "client_id": oauth_client.client_id, + "client_secret": oauth_client.client_secret, + } + + # Add code_verifier if we have one (PKCE) + if code_verifier: + token_params["code_verifier"] = code_verifier + + async with httpx.AsyncClient() as http_client: + response = await http_client.post( + oauth_client.token_endpoint, + data=token_params, + ) + response.raise_for_status() + token_data = response.json() + else: + # Integrated mode (Nextcloud OIDC) + discovery_url = oauth_config.get("discovery_url") + async with httpx.AsyncClient() as http_client: + response = await http_client.get(discovery_url) + response.raise_for_status() + discovery = response.json() + token_endpoint = discovery["token_endpoint"] + + async with httpx.AsyncClient() as http_client: + response = await http_client.post( + token_endpoint, + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": callback_uri, + "client_id": oauth_config["client_id"], + "client_secret": oauth_config["client_secret"], + }, + ) + response.raise_for_status() + token_data = response.json() + + except Exception as e: + logger.error(f"Token exchange failed: {e}") + return HTMLResponse( + f""" + + +Failed to exchange authorization code for tokens
+Error: {e}
+ + + """, + status_code=500, + ) + + refresh_token = token_data.get("refresh_token") + id_token = token_data.get("id_token") + + logger.info(f"Token exchange response keys: {token_data.keys()}") + logger.info(f"Refresh token present: {refresh_token is not None}") + logger.info(f"ID token present: {id_token is not None}") + + # Decode ID token to get user info + try: + userinfo = jwt.decode(id_token, options={"verify_signature": False}) + user_id = userinfo.get("sub") + username = userinfo.get("preferred_username") or userinfo.get("email") + logger.info(f"Browser login successful: {username} (sub={user_id})") + except Exception as e: + logger.warning(f"Failed to decode ID token: {e}") + user_id = f"user-{secrets.token_hex(8)}" + username = "unknown" + + # Store refresh token + if refresh_token: + logger.info(f"Storing refresh token for user_id: {user_id}") + await storage.store_refresh_token( + user_id=user_id, + refresh_token=refresh_token, + expires_at=None, + flow_type="browser", # Browser-based login flow + ) + logger.info(f"✓ Refresh token stored successfully for user_id: {user_id}") + else: + logger.warning("No refresh token in token response - cannot store session") + + # Create response and set session cookie + response = RedirectResponse("/user/page", status_code=302) + response.set_cookie( + key="mcp_session", + value=user_id, + max_age=86400 * 30, # 30 days + httponly=True, + secure=False, # Set to True in production with HTTPS + samesite="lax", + ) + + logger.info(f"Session cookie set for user: {username}") + return response + + +async def oauth_logout(request: Request) -> RedirectResponse: + """Browser OAuth logout - clears session cookie. + + Query parameters: + next: Optional URL to redirect to after logout (default: /oauth/login) + + Returns: + 302 redirect with cleared session cookie + """ + next_url = request.query_params.get("next", "/oauth/login") + + # TODO: Optionally revoke refresh token from storage + # session_id = request.cookies.get("mcp_session") + # if session_id: + # await storage.delete_refresh_token(session_id) + + response = RedirectResponse(next_url, status_code=302) + response.delete_cookie("mcp_session") + + logger.info("User logged out, session cookie cleared") + return response diff --git a/nextcloud_mcp_server/auth/session_backend.py b/nextcloud_mcp_server/auth/session_backend.py new file mode 100644 index 0000000..f702ee0 --- /dev/null +++ b/nextcloud_mcp_server/auth/session_backend.py @@ -0,0 +1,92 @@ +"""Session-based authentication backend for Starlette routes. + +Provides browser-based authentication for admin UI routes, separate from +MCP's OAuth authentication flow. +""" + +import logging +import os + +from starlette.authentication import ( + AuthCredentials, + AuthenticationBackend, + SimpleUser, +) +from starlette.requests import HTTPConnection + +logger = logging.getLogger(__name__) + + +class SessionAuthBackend(AuthenticationBackend): + """Authentication backend using signed session cookies. + + For BasicAuth mode: Always authenticates as the configured user. + For OAuth mode: Checks for valid session cookie with stored refresh token. + """ + + def __init__(self, oauth_enabled: bool = False): + """Initialize session authentication backend. + + Args: + oauth_enabled: Whether OAuth mode is enabled + """ + self.oauth_enabled = oauth_enabled + + async def authenticate( + self, conn: HTTPConnection + ) -> tuple[AuthCredentials, SimpleUser] | None: + """Authenticate the request based on session cookie or BasicAuth mode. + + Args: + conn: HTTP connection + + Returns: + Tuple of (credentials, user) if authenticated, None otherwise + """ + # BasicAuth mode: Always authenticated as the configured user + if not self.oauth_enabled: + username = os.getenv("NEXTCLOUD_USERNAME", "admin") + return AuthCredentials(["authenticated", "admin"]), SimpleUser(username) + + # OAuth mode: Check for session cookie + session_id = conn.cookies.get("mcp_session") + logger.info( + f"Session authentication check - cookie present: {session_id is not None}, path: {conn.url.path}" + ) + if not session_id: + logger.info("No session cookie found - redirecting to login") + return None + + logger.info(f"Found session cookie: {session_id[:16]}...") + + # Get OAuth context from app state + oauth_context = getattr(conn.app.state, "oauth_context", None) + if not oauth_context: + logger.warning("OAuth context not available in app state") + return None + + # Validate session + storage = oauth_context.get("storage") + if not storage: + logger.warning("OAuth storage not available") + return None + + try: + # Check if user has refresh token (indicates logged-in session) + logger.info(f"Looking up refresh token for session: {session_id[:16]}...") + token_data = await storage.get_refresh_token(session_id) + if not token_data: + logger.warning( + f"No refresh token found for session {session_id[:16]}..." + ) + return None + + # Session is valid - use session_id (which is user_id from ID token) as username + username = session_id + logger.info(f"✓ Session authenticated successfully: {username[:16]}...") + + return AuthCredentials(["authenticated"]), SimpleUser(username) + + except Exception as e: + logger.warning(f"Session validation error: {e}") + return None diff --git a/nextcloud_mcp_server/auth/userinfo_routes.py b/nextcloud_mcp_server/auth/userinfo_routes.py new file mode 100644 index 0000000..5582606 --- /dev/null +++ b/nextcloud_mcp_server/auth/userinfo_routes.py @@ -0,0 +1,442 @@ +"""User info routes for the MCP server admin UI. + +Provides browser-based endpoints to view information about the currently +authenticated user. Uses session-based authentication with OAuth flow. + +For BasicAuth mode: Shows configured user info (no login needed). +For OAuth mode: Requires browser-based OAuth login to establish session. +""" + +import logging +import os +from typing import Any + +import httpx +from starlette.authentication import requires +from starlette.requests import Request +from starlette.responses import HTMLResponse, JSONResponse + +logger = logging.getLogger(__name__) + + +async def _query_idp_userinfo( + access_token_str: str, userinfo_uri: str +) -> dict[str, Any] | None: + """Query the IdP's userinfo endpoint. + + Args: + access_token_str: The access token string + userinfo_uri: The userinfo endpoint URI + + Returns: + User info dictionary from IdP, or None if query fails + """ + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get( + userinfo_uri, + headers={"Authorization": f"Bearer {access_token_str}"}, + ) + response.raise_for_status() + return response.json() + except Exception as e: + logger.warning(f"Failed to query IdP userinfo endpoint: {e}") + return None + + +async def _get_user_info(request: Request) -> dict[str, Any]: + """Get user information for the currently authenticated user. + + Args: + request: Starlette request object (must be authenticated) + + Returns: + Dictionary containing user information + """ + username = request.user.display_name + oauth_ctx = getattr(request.app.state, "oauth_context", None) + + # BasicAuth mode + if not oauth_ctx: + return { + "username": username, + "auth_mode": "basic", + "nextcloud_host": os.getenv("NEXTCLOUD_HOST", "unknown"), + } + + # OAuth mode - get user's refresh token and current access token + storage = oauth_ctx.get("storage") + session_id = request.cookies.get("mcp_session") + + if not storage or not session_id: + return { + "error": "Session not found", + "username": username, + "auth_mode": "oauth", + } + + try: + # Get refresh token data + token_data = await storage.get_refresh_token(session_id) + if not token_data: + return { + "error": "No refresh token found", + "username": username, + "auth_mode": "oauth", + } + + refresh_token = token_data.get("refresh_token") + + # Exchange refresh token for fresh access token + oauth_client = oauth_ctx.get("oauth_client") + oauth_config = oauth_ctx.get("config") + + if oauth_client: + # External IdP mode (Keycloak) + # Create fresh HTTP client to avoid event loop issues + if not oauth_client.token_endpoint: + await oauth_client.discover() + + async with httpx.AsyncClient(timeout=30.0) as http_client: + response = await http_client.post( + oauth_client.token_endpoint, + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + }, + auth=(oauth_client.client_id, oauth_client.client_secret), + ) + response.raise_for_status() + token_response = response.json() + access_token = token_response["access_token"] + else: + # Integrated mode (Nextcloud OIDC) + # Note: This is server-side code, so we use internal Docker hostnames + # (not public URLs) for server-to-server communication + discovery_url = oauth_config.get("discovery_url") + logger.info(f"Querying discovery URL: {discovery_url}") + + async with httpx.AsyncClient() as http_client: + response = await http_client.get(discovery_url) + response.raise_for_status() + discovery = response.json() + token_endpoint = discovery["token_endpoint"] + logger.info( + f"Using token endpoint for server-side refresh: {token_endpoint}" + ) + + async with httpx.AsyncClient() as http_client: + response = await http_client.post( + token_endpoint, + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": oauth_config["client_id"], + "client_secret": oauth_config["client_secret"], + }, + ) + response.raise_for_status() + token_response = response.json() + access_token = token_response["access_token"] + + # Build basic user context + user_context = { + "username": username, # From request.user.display_name + "auth_mode": "oauth", + "session_id": session_id[:16] + "...", # Truncated for security + } + + # Query IdP userinfo for enhanced profile + token_verifier = oauth_ctx.get("token_verifier") + if token_verifier and hasattr(token_verifier, "userinfo_uri"): + idp_profile = await _query_idp_userinfo( + access_token, token_verifier.userinfo_uri + ) + if idp_profile: + user_context["idp_profile"] = idp_profile + else: + user_context["idp_profile_error"] = ( + "Failed to retrieve profile from IdP" + ) + + return user_context + + except Exception as e: + import traceback + + logger.error(f"Error retrieving user info: {e}") + logger.error(f"Traceback: {traceback.format_exc()}") + return { + "error": f"Failed to retrieve user info: {e}", + "username": username, + "auth_mode": "oauth", + } + + +@requires("authenticated", redirect="oauth_login") +async def user_info_json(request: Request) -> JSONResponse: + """User info endpoint - returns JSON with current user information. + + Requires authentication via session cookie (redirects to oauth_login route if not authenticated). + + Args: + request: Starlette request object + + Returns: + JSON response with user information + """ + user_info = await _get_user_info(request) + return JSONResponse(user_info) + + +@requires("authenticated", redirect="oauth_login") +async def user_info_html(request: Request) -> HTMLResponse: + """User info page - returns HTML with current user information. + + Requires authentication via session cookie (redirects to oauth_login route if not authenticated). + + Args: + request: Starlette request object + + Returns: + HTML response with formatted user information + """ + user_context = await _get_user_info(request) + + # Check for error + if "error" in user_context and user_context["error"] != "": + # Get login URL dynamically + oauth_ctx = getattr(request.app.state, "oauth_context", None) + login_url = str(request.url_for("oauth_login")) if oauth_ctx else "/oauth/login" + + error_html = f""" + + + + + +| Nextcloud Host | +{nextcloud_host} | +
| Session ID | +{session_id} |
+
| {key} | +{value_str} | +
| Username | +{username} | +
| Authentication Mode | +{auth_mode} | +