fix: address PR #589 review findings

- Fix anyio.Lock() created at module import time; use lazy init in
  get_shared_storage() to avoid instantiation before event loop exists
- Stop get_login_flow_session from silently swallowing DB exceptions;
  re-raise and handle in caller with proper error response
- Update ProvisionAccessResponse and UpdateScopesResponse status field
  docs to include all actual values (declined, cancelled, unchanged)
- Narrow except clause in present_login_url to (AttributeError,
  NotImplementedError) instead of bare Exception
- Add KeyError handling in LoginFlowV2Client.initiate() and poll() for
  clear errors on malformed Nextcloud responses
- Simplify redundant env-var bypass branches in scope_authorization.py
- Extract _maybe_login_flow_cleanup() context manager to replace 4
  inline cleanup loop registrations in app.py; move sleep to end of
  loop body so cleanup runs once at startup
- Replace fragile string replacement in _rewrite_login_flow_url with
  proper urllib.parse URL handling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-03-02 09:10:57 +01:00
parent 1a6ce0fa7d
commit ba597634bd
8 changed files with 73 additions and 55 deletions
+16 -21
View File
@@ -1531,7 +1531,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
async def _login_flow_cleanup_loop() -> None: async def _login_flow_cleanup_loop() -> None:
"""Periodically clean up expired Login Flow v2 sessions.""" """Periodically clean up expired Login Flow v2 sessions."""
while True: while True:
await anyio.sleep(3600) # Every hour
try: try:
storage = await get_shared_storage() storage = await get_shared_storage()
count = await storage.delete_expired_login_flow_sessions() count = await storage.delete_expired_login_flow_sessions()
@@ -1539,6 +1538,18 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
logger.info(f"Cleaned up {count} expired login flow sessions") logger.info(f"Cleaned up {count} expired login flow sessions")
except Exception as e: except Exception as e:
logger.warning(f"Login flow cleanup error: {e}") logger.warning(f"Login flow cleanup error: {e}")
await anyio.sleep(3600) # Every hour
@asynccontextmanager
async def _maybe_login_flow_cleanup():
"""Start Login Flow cleanup task if enabled."""
if settings.enable_login_flow:
async with anyio.create_task_group() as tg:
tg.start_soon(_login_flow_cleanup_loop)
yield
tg.cancel_scope.cancel()
else:
yield
@asynccontextmanager @asynccontextmanager
async def starlette_lifespan(app: Starlette): async def starlette_lifespan(app: Starlette):
@@ -1772,13 +1783,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
f"{settings.vector_sync_processor_workers} processors" f"{settings.vector_sync_processor_workers} processors"
) )
# Start Login Flow cleanup task if enabled
if settings.enable_login_flow:
tg.start_soon(_login_flow_cleanup_loop)
# Run MCP session manager and yield # Run MCP session manager and yield
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run()) await stack.enter_async_context(mcp.session_manager.run())
await stack.enter_async_context(_maybe_login_flow_cleanup())
try: try:
yield yield
finally: finally:
@@ -1959,13 +1967,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
f"{settings.vector_sync_processor_workers} processors" f"{settings.vector_sync_processor_workers} processors"
) )
# Start Login Flow cleanup task if enabled
if settings.enable_login_flow:
tg.start_soon(_login_flow_cleanup_loop)
# Run MCP session manager and yield # Run MCP session manager and yield
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run()) await stack.enter_async_context(mcp.session_manager.run())
await stack.enter_async_context(_maybe_login_flow_cleanup())
try: try:
yield yield
finally: finally:
@@ -1986,12 +1991,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Just run MCP session manager without vector sync # Just run MCP session manager without vector sync
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run()) await stack.enter_async_context(mcp.session_manager.run())
if settings.enable_login_flow: async with _maybe_login_flow_cleanup():
async with anyio.create_task_group() as cleanup_tg:
cleanup_tg.start_soon(_login_flow_cleanup_loop)
yield
cleanup_tg.cancel_scope.cancel()
else:
yield yield
else: else:
@@ -2013,12 +2013,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
) )
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run()) await stack.enter_async_context(mcp.session_manager.run())
if settings.enable_login_flow: async with _maybe_login_flow_cleanup():
async with anyio.create_task_group() as cleanup_tg:
cleanup_tg.start_soon(_login_flow_cleanup_loop)
yield
cleanup_tg.cancel_scope.cancel()
else:
yield yield
# Health check endpoints for Kubernetes probes # Health check endpoints for Kubernetes probes
+5 -3
View File
@@ -70,7 +70,9 @@ async def present_login_url(
logger.info("User cancelled login flow") logger.info("User cancelled login flow")
return "cancelled" return "cancelled"
except Exception as e: except (AttributeError, NotImplementedError) as e:
# Elicitation not supported by this client - fall back to message # Elicitation not supported by this client/SDK - fall back to message
logger.debug(f"Elicitation not available ({e}), returning URL in message") logger.debug(
f"Elicitation not available ({type(e).__name__}: {e}), returning URL in message"
)
return "message_only" return "message_only"
+21 -11
View File
@@ -90,11 +90,16 @@ class LoginFlowV2Client:
poll_data = data.get("poll", {}) poll_data = data.get("poll", {})
result = LoginFlowInitResponse( try:
login_url=data["login"], result = LoginFlowInitResponse(
poll_endpoint=poll_data["endpoint"], login_url=data["login"],
poll_token=poll_data["token"], poll_endpoint=poll_data["endpoint"],
) poll_token=poll_data["token"],
)
except KeyError as e:
raise ValueError(
f"Malformed Login Flow v2 initiate response from Nextcloud (missing key: {e})"
) from e
logger.info(f"Login Flow v2 initiated: login_url={result.login_url[:60]}...") logger.info(f"Login Flow v2 initiated: login_url={result.login_url[:60]}...")
return result return result
@@ -129,12 +134,17 @@ class LoginFlowV2Client:
f"Login Flow v2 completed: server={data.get('server')}, " f"Login Flow v2 completed: server={data.get('server')}, "
f"loginName={data.get('loginName')}" f"loginName={data.get('loginName')}"
) )
return LoginFlowPollResult( try:
status="completed", return LoginFlowPollResult(
server=data["server"], status="completed",
login_name=data["loginName"], server=data["server"],
app_password=data["appPassword"], login_name=data["loginName"],
) app_password=data["appPassword"],
)
except KeyError as e:
raise ValueError(
f"Malformed Login Flow v2 poll response from Nextcloud (missing key: {e})"
) from e
if response.status_code == 404: if response.status_code == 404:
logger.debug("Login Flow v2 still pending") logger.debug("Login Flow v2 still pending")
@@ -128,20 +128,17 @@ def require_scopes(*required_scopes: str):
) )
if access_token is None: if access_token is None:
# Check if single-user BasicAuth mode (env var app password) # No OAuth token — either BasicAuth with env var credentials
# If NEXTCLOUD_APP_PASSWORD or NEXTCLOUD_PASSWORD is set, bypass scope checks # or BasicAuth without explicit credentials. Both bypass scope checks.
settings = get_settings() settings = get_settings()
if settings.nextcloud_app_password or settings.nextcloud_password: if settings.nextcloud_app_password or settings.nextcloud_password:
logger.debug( logger.debug(
f"No access token for {func_name} - allowing (env var app password)" f"No access token for {func_name} - allowing (env var app password)"
) )
return await func(*args, **kwargs) else:
logger.debug(
# Not in OAuth mode (BasicAuth or no auth) f"No access token present for {func_name} - allowing (BasicAuth mode)"
# In BasicAuth mode, all operations are allowed )
logger.debug(
f"No access token present for {func_name} - allowing (BasicAuth mode)"
)
return await func(*args, **kwargs) return await func(*args, **kwargs)
# ── Login Flow v2: Check stored app password scopes ── # ── Login Flow v2: Check stored app password scopes ──
+5 -3
View File
@@ -1779,7 +1779,7 @@ class RefreshTokenStorage:
logger.error( logger.error(
f"Failed to retrieve login flow session for user {user_id}: {e}" f"Failed to retrieve login flow session for user {user_id}: {e}"
) )
return None raise
async def delete_login_flow_session(self, user_id: str) -> bool: async def delete_login_flow_session(self, user_id: str) -> bool:
"""Delete a Login Flow v2 session. """Delete a Login Flow v2 session.
@@ -1861,7 +1861,7 @@ class RefreshTokenStorage:
_shared_instance: RefreshTokenStorage | None = None _shared_instance: RefreshTokenStorage | None = None
_shared_lock = anyio.Lock() _shared_lock: anyio.Lock | None = None
async def get_shared_storage() -> RefreshTokenStorage: async def get_shared_storage() -> RefreshTokenStorage:
@@ -1871,7 +1871,9 @@ async def get_shared_storage() -> RefreshTokenStorage:
creating their own lazy singletons. The lock ensures thread-safe creating their own lazy singletons. The lock ensures thread-safe
initialization on concurrent first-access. initialization on concurrent first-access.
""" """
global _shared_instance global _shared_instance, _shared_lock
if _shared_lock is None:
_shared_lock = anyio.Lock()
async with _shared_lock: async with _shared_lock:
if _shared_instance is None: if _shared_instance is None:
_shared_instance = RefreshTokenStorage.from_env() _shared_instance = RefreshTokenStorage.from_env()
+4 -2
View File
@@ -9,7 +9,7 @@ class ProvisionAccessResponse(BaseResponse):
"""Response from nc_auth_provision_access tool.""" """Response from nc_auth_provision_access tool."""
status: str = Field( status: str = Field(
description="Provisioning status: 'login_required', 'already_provisioned', 'error'" description="Provisioning status: 'login_required', 'already_provisioned', 'declined', 'cancelled', 'error'"
) )
login_url: str | None = Field( login_url: str | None = Field(
None, description="URL to open in browser for Nextcloud login" None, description="URL to open in browser for Nextcloud login"
@@ -38,7 +38,9 @@ class ProvisionStatusResponse(BaseResponse):
class UpdateScopesResponse(BaseResponse): class UpdateScopesResponse(BaseResponse):
"""Response from nc_auth_update_scopes tool.""" """Response from nc_auth_update_scopes tool."""
status: str = Field(description="Status: 'login_required', 'updated', 'error'") status: str = Field(
description="Status: 'login_required', 'unchanged', 'declined', 'cancelled', 'error'"
)
login_url: str | None = Field( login_url: str | None = Field(
None, description="URL for re-provisioning with new scopes" None, description="URL for re-provisioning with new scopes"
) )
+10 -1
View File
@@ -212,7 +212,16 @@ def register_auth_tools(mcp: FastMCP) -> None:
) )
# Check for pending login flow session # Check for pending login flow session
session = await storage.get_login_flow_session(user_id) try:
session = await storage.get_login_flow_session(user_id)
except Exception as e:
logger.error(f"Failed to check login flow session for {user_id}: {e}")
return ProvisionStatusResponse(
status="error",
message=f"Failed to check login flow session: {e}",
user_id=user_id,
success=False,
)
if not session: if not session:
return ProvisionStatusResponse( return ProvisionStatusResponse(
status="not_initiated", status="not_initiated",
+6 -5
View File
@@ -14,7 +14,7 @@ import os
import secrets import secrets
import time import time
from typing import Any, AsyncGenerator from typing import Any, AsyncGenerator
from urllib.parse import quote from urllib.parse import quote, urlparse, urlunparse
import anyio import anyio
import httpx import httpx
@@ -214,10 +214,11 @@ def _rewrite_login_flow_url(login_url: str) -> str:
the host and needs localhost:8080 instead. the host and needs localhost:8080 instead.
""" """
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080") nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
# Replace common internal Docker hostnames target = urlparse(nextcloud_host)
url = login_url.replace("http://app:80", nextcloud_host) parsed = urlparse(login_url)
url = url.replace("http://app", nextcloud_host) if parsed.hostname == "app":
return url parsed = parsed._replace(scheme=target.scheme, netloc=target.netloc)
return urlunparse(parsed)
async def _complete_login_flow_v2(browser, login_url: str) -> None: async def _complete_login_flow_v2(browser, login_url: str) -> None: