fix: address PR #589 review findings
- Fix anyio.Lock() created at module import time; use lazy init in get_shared_storage() to avoid instantiation before event loop exists - Stop get_login_flow_session from silently swallowing DB exceptions; re-raise and handle in caller with proper error response - Update ProvisionAccessResponse and UpdateScopesResponse status field docs to include all actual values (declined, cancelled, unchanged) - Narrow except clause in present_login_url to (AttributeError, NotImplementedError) instead of bare Exception - Add KeyError handling in LoginFlowV2Client.initiate() and poll() for clear errors on malformed Nextcloud responses - Simplify redundant env-var bypass branches in scope_authorization.py - Extract _maybe_login_flow_cleanup() context manager to replace 4 inline cleanup loop registrations in app.py; move sleep to end of loop body so cleanup runs once at startup - Replace fragile string replacement in _rewrite_login_flow_url with proper urllib.parse URL handling Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+16
-21
@@ -1531,7 +1531,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
async def _login_flow_cleanup_loop() -> None:
|
async def _login_flow_cleanup_loop() -> None:
|
||||||
"""Periodically clean up expired Login Flow v2 sessions."""
|
"""Periodically clean up expired Login Flow v2 sessions."""
|
||||||
while True:
|
while True:
|
||||||
await anyio.sleep(3600) # Every hour
|
|
||||||
try:
|
try:
|
||||||
storage = await get_shared_storage()
|
storage = await get_shared_storage()
|
||||||
count = await storage.delete_expired_login_flow_sessions()
|
count = await storage.delete_expired_login_flow_sessions()
|
||||||
@@ -1539,6 +1538,18 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
logger.info(f"Cleaned up {count} expired login flow sessions")
|
logger.info(f"Cleaned up {count} expired login flow sessions")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Login flow cleanup error: {e}")
|
logger.warning(f"Login flow cleanup error: {e}")
|
||||||
|
await anyio.sleep(3600) # Every hour
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _maybe_login_flow_cleanup():
|
||||||
|
"""Start Login Flow cleanup task if enabled."""
|
||||||
|
if settings.enable_login_flow:
|
||||||
|
async with anyio.create_task_group() as tg:
|
||||||
|
tg.start_soon(_login_flow_cleanup_loop)
|
||||||
|
yield
|
||||||
|
tg.cancel_scope.cancel()
|
||||||
|
else:
|
||||||
|
yield
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def starlette_lifespan(app: Starlette):
|
async def starlette_lifespan(app: Starlette):
|
||||||
@@ -1772,13 +1783,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
f"{settings.vector_sync_processor_workers} processors"
|
f"{settings.vector_sync_processor_workers} processors"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start Login Flow cleanup task if enabled
|
|
||||||
if settings.enable_login_flow:
|
|
||||||
tg.start_soon(_login_flow_cleanup_loop)
|
|
||||||
|
|
||||||
# Run MCP session manager and yield
|
# Run MCP session manager and yield
|
||||||
async with AsyncExitStack() as stack:
|
async with AsyncExitStack() as stack:
|
||||||
await stack.enter_async_context(mcp.session_manager.run())
|
await stack.enter_async_context(mcp.session_manager.run())
|
||||||
|
await stack.enter_async_context(_maybe_login_flow_cleanup())
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
@@ -1959,13 +1967,10 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
f"{settings.vector_sync_processor_workers} processors"
|
f"{settings.vector_sync_processor_workers} processors"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start Login Flow cleanup task if enabled
|
|
||||||
if settings.enable_login_flow:
|
|
||||||
tg.start_soon(_login_flow_cleanup_loop)
|
|
||||||
|
|
||||||
# Run MCP session manager and yield
|
# Run MCP session manager and yield
|
||||||
async with AsyncExitStack() as stack:
|
async with AsyncExitStack() as stack:
|
||||||
await stack.enter_async_context(mcp.session_manager.run())
|
await stack.enter_async_context(mcp.session_manager.run())
|
||||||
|
await stack.enter_async_context(_maybe_login_flow_cleanup())
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
@@ -1986,12 +1991,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
# Just run MCP session manager without vector sync
|
# Just run MCP session manager without vector sync
|
||||||
async with AsyncExitStack() as stack:
|
async with AsyncExitStack() as stack:
|
||||||
await stack.enter_async_context(mcp.session_manager.run())
|
await stack.enter_async_context(mcp.session_manager.run())
|
||||||
if settings.enable_login_flow:
|
async with _maybe_login_flow_cleanup():
|
||||||
async with anyio.create_task_group() as cleanup_tg:
|
|
||||||
cleanup_tg.start_soon(_login_flow_cleanup_loop)
|
|
||||||
yield
|
|
||||||
cleanup_tg.cancel_scope.cancel()
|
|
||||||
else:
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -2013,12 +2013,7 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
|
|||||||
)
|
)
|
||||||
async with AsyncExitStack() as stack:
|
async with AsyncExitStack() as stack:
|
||||||
await stack.enter_async_context(mcp.session_manager.run())
|
await stack.enter_async_context(mcp.session_manager.run())
|
||||||
if settings.enable_login_flow:
|
async with _maybe_login_flow_cleanup():
|
||||||
async with anyio.create_task_group() as cleanup_tg:
|
|
||||||
cleanup_tg.start_soon(_login_flow_cleanup_loop)
|
|
||||||
yield
|
|
||||||
cleanup_tg.cancel_scope.cancel()
|
|
||||||
else:
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# Health check endpoints for Kubernetes probes
|
# Health check endpoints for Kubernetes probes
|
||||||
|
|||||||
@@ -70,7 +70,9 @@ async def present_login_url(
|
|||||||
logger.info("User cancelled login flow")
|
logger.info("User cancelled login flow")
|
||||||
return "cancelled"
|
return "cancelled"
|
||||||
|
|
||||||
except Exception as e:
|
except (AttributeError, NotImplementedError) as e:
|
||||||
# Elicitation not supported by this client - fall back to message
|
# Elicitation not supported by this client/SDK - fall back to message
|
||||||
logger.debug(f"Elicitation not available ({e}), returning URL in message")
|
logger.debug(
|
||||||
|
f"Elicitation not available ({type(e).__name__}: {e}), returning URL in message"
|
||||||
|
)
|
||||||
return "message_only"
|
return "message_only"
|
||||||
|
|||||||
@@ -90,11 +90,16 @@ class LoginFlowV2Client:
|
|||||||
|
|
||||||
poll_data = data.get("poll", {})
|
poll_data = data.get("poll", {})
|
||||||
|
|
||||||
result = LoginFlowInitResponse(
|
try:
|
||||||
login_url=data["login"],
|
result = LoginFlowInitResponse(
|
||||||
poll_endpoint=poll_data["endpoint"],
|
login_url=data["login"],
|
||||||
poll_token=poll_data["token"],
|
poll_endpoint=poll_data["endpoint"],
|
||||||
)
|
poll_token=poll_data["token"],
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise ValueError(
|
||||||
|
f"Malformed Login Flow v2 initiate response from Nextcloud (missing key: {e})"
|
||||||
|
) from e
|
||||||
|
|
||||||
logger.info(f"Login Flow v2 initiated: login_url={result.login_url[:60]}...")
|
logger.info(f"Login Flow v2 initiated: login_url={result.login_url[:60]}...")
|
||||||
return result
|
return result
|
||||||
@@ -129,12 +134,17 @@ class LoginFlowV2Client:
|
|||||||
f"Login Flow v2 completed: server={data.get('server')}, "
|
f"Login Flow v2 completed: server={data.get('server')}, "
|
||||||
f"loginName={data.get('loginName')}"
|
f"loginName={data.get('loginName')}"
|
||||||
)
|
)
|
||||||
return LoginFlowPollResult(
|
try:
|
||||||
status="completed",
|
return LoginFlowPollResult(
|
||||||
server=data["server"],
|
status="completed",
|
||||||
login_name=data["loginName"],
|
server=data["server"],
|
||||||
app_password=data["appPassword"],
|
login_name=data["loginName"],
|
||||||
)
|
app_password=data["appPassword"],
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
raise ValueError(
|
||||||
|
f"Malformed Login Flow v2 poll response from Nextcloud (missing key: {e})"
|
||||||
|
) from e
|
||||||
|
|
||||||
if response.status_code == 404:
|
if response.status_code == 404:
|
||||||
logger.debug("Login Flow v2 still pending")
|
logger.debug("Login Flow v2 still pending")
|
||||||
|
|||||||
@@ -128,20 +128,17 @@ def require_scopes(*required_scopes: str):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if access_token is None:
|
if access_token is None:
|
||||||
# Check if single-user BasicAuth mode (env var app password)
|
# No OAuth token — either BasicAuth with env var credentials
|
||||||
# If NEXTCLOUD_APP_PASSWORD or NEXTCLOUD_PASSWORD is set, bypass scope checks
|
# or BasicAuth without explicit credentials. Both bypass scope checks.
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
if settings.nextcloud_app_password or settings.nextcloud_password:
|
if settings.nextcloud_app_password or settings.nextcloud_password:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"No access token for {func_name} - allowing (env var app password)"
|
f"No access token for {func_name} - allowing (env var app password)"
|
||||||
)
|
)
|
||||||
return await func(*args, **kwargs)
|
else:
|
||||||
|
logger.debug(
|
||||||
# Not in OAuth mode (BasicAuth or no auth)
|
f"No access token present for {func_name} - allowing (BasicAuth mode)"
|
||||||
# In BasicAuth mode, all operations are allowed
|
)
|
||||||
logger.debug(
|
|
||||||
f"No access token present for {func_name} - allowing (BasicAuth mode)"
|
|
||||||
)
|
|
||||||
return await func(*args, **kwargs)
|
return await func(*args, **kwargs)
|
||||||
|
|
||||||
# ── Login Flow v2: Check stored app password scopes ──
|
# ── Login Flow v2: Check stored app password scopes ──
|
||||||
|
|||||||
@@ -1779,7 +1779,7 @@ class RefreshTokenStorage:
|
|||||||
logger.error(
|
logger.error(
|
||||||
f"Failed to retrieve login flow session for user {user_id}: {e}"
|
f"Failed to retrieve login flow session for user {user_id}: {e}"
|
||||||
)
|
)
|
||||||
return None
|
raise
|
||||||
|
|
||||||
async def delete_login_flow_session(self, user_id: str) -> bool:
|
async def delete_login_flow_session(self, user_id: str) -> bool:
|
||||||
"""Delete a Login Flow v2 session.
|
"""Delete a Login Flow v2 session.
|
||||||
@@ -1861,7 +1861,7 @@ class RefreshTokenStorage:
|
|||||||
|
|
||||||
|
|
||||||
_shared_instance: RefreshTokenStorage | None = None
|
_shared_instance: RefreshTokenStorage | None = None
|
||||||
_shared_lock = anyio.Lock()
|
_shared_lock: anyio.Lock | None = None
|
||||||
|
|
||||||
|
|
||||||
async def get_shared_storage() -> RefreshTokenStorage:
|
async def get_shared_storage() -> RefreshTokenStorage:
|
||||||
@@ -1871,7 +1871,9 @@ async def get_shared_storage() -> RefreshTokenStorage:
|
|||||||
creating their own lazy singletons. The lock ensures thread-safe
|
creating their own lazy singletons. The lock ensures thread-safe
|
||||||
initialization on concurrent first-access.
|
initialization on concurrent first-access.
|
||||||
"""
|
"""
|
||||||
global _shared_instance
|
global _shared_instance, _shared_lock
|
||||||
|
if _shared_lock is None:
|
||||||
|
_shared_lock = anyio.Lock()
|
||||||
async with _shared_lock:
|
async with _shared_lock:
|
||||||
if _shared_instance is None:
|
if _shared_instance is None:
|
||||||
_shared_instance = RefreshTokenStorage.from_env()
|
_shared_instance = RefreshTokenStorage.from_env()
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ class ProvisionAccessResponse(BaseResponse):
|
|||||||
"""Response from nc_auth_provision_access tool."""
|
"""Response from nc_auth_provision_access tool."""
|
||||||
|
|
||||||
status: str = Field(
|
status: str = Field(
|
||||||
description="Provisioning status: 'login_required', 'already_provisioned', 'error'"
|
description="Provisioning status: 'login_required', 'already_provisioned', 'declined', 'cancelled', 'error'"
|
||||||
)
|
)
|
||||||
login_url: str | None = Field(
|
login_url: str | None = Field(
|
||||||
None, description="URL to open in browser for Nextcloud login"
|
None, description="URL to open in browser for Nextcloud login"
|
||||||
@@ -38,7 +38,9 @@ class ProvisionStatusResponse(BaseResponse):
|
|||||||
class UpdateScopesResponse(BaseResponse):
|
class UpdateScopesResponse(BaseResponse):
|
||||||
"""Response from nc_auth_update_scopes tool."""
|
"""Response from nc_auth_update_scopes tool."""
|
||||||
|
|
||||||
status: str = Field(description="Status: 'login_required', 'updated', 'error'")
|
status: str = Field(
|
||||||
|
description="Status: 'login_required', 'unchanged', 'declined', 'cancelled', 'error'"
|
||||||
|
)
|
||||||
login_url: str | None = Field(
|
login_url: str | None = Field(
|
||||||
None, description="URL for re-provisioning with new scopes"
|
None, description="URL for re-provisioning with new scopes"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -212,7 +212,16 @@ def register_auth_tools(mcp: FastMCP) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Check for pending login flow session
|
# Check for pending login flow session
|
||||||
session = await storage.get_login_flow_session(user_id)
|
try:
|
||||||
|
session = await storage.get_login_flow_session(user_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to check login flow session for {user_id}: {e}")
|
||||||
|
return ProvisionStatusResponse(
|
||||||
|
status="error",
|
||||||
|
message=f"Failed to check login flow session: {e}",
|
||||||
|
user_id=user_id,
|
||||||
|
success=False,
|
||||||
|
)
|
||||||
if not session:
|
if not session:
|
||||||
return ProvisionStatusResponse(
|
return ProvisionStatusResponse(
|
||||||
status="not_initiated",
|
status="not_initiated",
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import os
|
|||||||
import secrets
|
import secrets
|
||||||
import time
|
import time
|
||||||
from typing import Any, AsyncGenerator
|
from typing import Any, AsyncGenerator
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote, urlparse, urlunparse
|
||||||
|
|
||||||
import anyio
|
import anyio
|
||||||
import httpx
|
import httpx
|
||||||
@@ -214,10 +214,11 @@ def _rewrite_login_flow_url(login_url: str) -> str:
|
|||||||
the host and needs localhost:8080 instead.
|
the host and needs localhost:8080 instead.
|
||||||
"""
|
"""
|
||||||
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
|
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
|
||||||
# Replace common internal Docker hostnames
|
target = urlparse(nextcloud_host)
|
||||||
url = login_url.replace("http://app:80", nextcloud_host)
|
parsed = urlparse(login_url)
|
||||||
url = url.replace("http://app", nextcloud_host)
|
if parsed.hostname == "app":
|
||||||
return url
|
parsed = parsed._replace(scheme=target.scheme, netloc=target.netloc)
|
||||||
|
return urlunparse(parsed)
|
||||||
|
|
||||||
|
|
||||||
async def _complete_login_flow_v2(browser, login_url: str) -> None:
|
async def _complete_login_flow_v2(browser, login_url: str) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user