diff --git a/nextcloud_mcp_server/auth/storage.py b/nextcloud_mcp_server/auth/storage.py index fe42cc6..364eb4f 100644 --- a/nextcloud_mcp_server/auth/storage.py +++ b/nextcloud_mcp_server/auth/storage.py @@ -34,8 +34,12 @@ from pathlib import Path from typing import Any, Optional import aiosqlite +import anyio +import httpx +from anyio import to_thread from cryptography.fernet import Fernet +from nextcloud_mcp_server.migrations import stamp_database, upgrade_database from nextcloud_mcp_server.observability.metrics import record_db_operation logger = logging.getLogger(__name__) @@ -164,10 +168,6 @@ class RefreshTokenStorage: # Run migrations in a worker thread using anyio.to_thread # This allows Alembic to run its own async operations in a separate context - from anyio import to_thread - - from nextcloud_mcp_server.migrations import stamp_database, upgrade_database - if not has_alembic: if has_schema: # Stamp existing database without running migrations @@ -1427,8 +1427,6 @@ class RefreshTokenStorage: Returns: List of user IDs whose app passwords were removed """ - import httpx - if not self._initialized: await self.initialize() @@ -1438,10 +1436,10 @@ class RefreshTokenStorage: removed: list[str] = [] - for user_id in user_ids: + async def _validate_user(user_id: str) -> None: app_password = await self.get_app_password(user_id) if not app_password: - continue + return try: async with httpx.AsyncClient( @@ -1473,6 +1471,10 @@ class RefreshTokenStorage: except Exception as e: logger.warning(f"Could not validate app password for {user_id}: {e}") + async with anyio.create_task_group() as tg: + for user_id in user_ids: + tg.start_soon(_validate_user, user_id) + return removed diff --git a/nextcloud_mcp_server/vector/oauth_sync.py b/nextcloud_mcp_server/vector/oauth_sync.py index f69b747..c4c56f7 100644 --- a/nextcloud_mcp_server/vector/oauth_sync.py +++ b/nextcloud_mcp_server/vector/oauth_sync.py @@ -33,12 +33,13 @@ from anyio.streams.memory import ( ) from httpx import BasicAuth, HTTPStatusError +from nextcloud_mcp_server.auth.storage import RefreshTokenStorage from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.vector.processor import process_document from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents if TYPE_CHECKING: - from nextcloud_mcp_server.auth.storage import RefreshTokenStorage from nextcloud_mcp_server.auth.token_broker import TokenBrokerService logger = logging.getLogger(__name__) @@ -89,8 +90,6 @@ async def get_user_client_basic_auth( Raises: NotProvisionedError: If user has not provisioned an app password """ - from nextcloud_mcp_server.auth.storage import RefreshTokenStorage - # Get or create storage instance if storage is None: storage = RefreshTokenStorage.from_env() @@ -237,6 +236,11 @@ async def user_scanner_task( f"[{mode_label}] User {user_id} not provisioned, not starting scan loop" ) return + except Exception as e: + logger.warning( + f"[{mode_label}] Pre-validation failed for {user_id}: {e}. " + f"Proceeding to scan loop (has its own error handling)." + ) consecutive_errors = 0 @@ -273,13 +277,16 @@ async def user_scanner_task( ) break elif status_code == 429: + retry_after = min(int(e.response.headers.get("Retry-After", "60")), 300) logger.warning( f"[{mode_label}] Scanner rate-limited for {user_id}, " - f"backing off 60s" + f"backing off {retry_after}s" ) try: - with anyio.move_on_after(60): + with anyio.move_on_after(retry_after): await shutdown_event.wait() + # anyio.get_cancelled_exc_class() catches task cancellation + # (e.g. from task group teardown) so we exit cleanly. except anyio.get_cancelled_exc_class(): break continue @@ -343,8 +350,6 @@ async def multi_user_processor_task( use_basic_auth: If True, use app passwords; if False, use OAuth tokens task_status: Status object for signaling task readiness """ - from nextcloud_mcp_server.vector.processor import process_document - mode_label = "BasicAuth" if use_basic_auth else "OAuth" logger.info(f"[{mode_label}] Processor {worker_id} started") task_status.started() diff --git a/tests/conftest.py b/tests/conftest.py index 868051e..53f37e6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2400,29 +2400,31 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): except Exception as e: logger.warning(f"Error deleting test user {username}: {e}") - # Clean up app passwords from MCP server to prevent stale scanners - for username in created_users: - try: - import subprocess + # Clean up all app passwords from MCP server to prevent stale scanners + import subprocess - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "mcp-multi-user-basic", - "sqlite3", - "/app/data/tokens.db", - f"DELETE FROM app_passwords WHERE user_id = '{username}';", - ], - capture_output=True, - text=True, - timeout=10, - ) - logger.info(f"Cleaned up app password for {username}") - except Exception as e: - logger.debug(f"App password cleanup for {username}: {e}") + result = subprocess.run( + [ + "docker", + "compose", + "exec", + "-T", + "mcp-multi-user-basic", + "sqlite3", + "/app/data/tokens.db", + "DELETE FROM app_passwords;", + ], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + logger.warning( + f"Failed to clean up app passwords (rc={result.returncode}): " + f"{result.stderr}" + ) + else: + logger.info("Cleaned up all test app passwords") async def _get_oauth_token_for_user( diff --git a/tests/integration/test_astrolabe_multi_user_background_sync.py b/tests/integration/test_astrolabe_multi_user_background_sync.py index cc70293..ef9be9f 100644 --- a/tests/integration/test_astrolabe_multi_user_background_sync.py +++ b/tests/integration/test_astrolabe_multi_user_background_sync.py @@ -834,6 +834,70 @@ async def verify_app_password_created(username: str) -> bool: return False +def clear_stale_test_state(clear_preferences: bool = False) -> None: + """Clear stale app passwords, bruteforce entries, and optionally Astrolabe preferences.""" + commands: list[tuple[list[str], str]] = [ + ( + [ + "docker", + "compose", + "exec", + "-T", + "mcp-multi-user-basic", + "sqlite3", + "/app/data/tokens.db", + "DELETE FROM app_passwords;", + ], + "app passwords", + ), + ( + [ + "docker", + "compose", + "exec", + "-T", + "db", + "mariadb", + "-u", + "root", + "-ppassword", + "nextcloud", + "-e", + "DELETE FROM oc_bruteforce_attempts;", + ], + "bruteforce entries", + ), + ] + if clear_preferences: + commands.append( + ( + [ + "docker", + "compose", + "exec", + "-T", + "db", + "mariadb", + "-u", + "root", + "-ppassword", + "nextcloud", + "-e", + "DELETE FROM oc_preferences WHERE appid = 'astrolabe';", + ], + "Astrolabe preferences", + ), + ) + for cmd, label in commands: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode != 0: + logger.warning( + f"Failed to clear {label} (rc={result.returncode}): {result.stderr}" + ) + else: + logger.debug(f"Cleared {label}") + + @pytest.mark.integration @pytest.mark.oauth async def test_multi_user_astrolabe_background_sync_enablement( @@ -863,40 +927,7 @@ async def test_multi_user_astrolabe_background_sync_enablement( """ # Clear stale state from previous test runs logger.info("Clearing stale app passwords and bruteforce entries...") - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "mcp-multi-user-basic", - "sqlite3", - "/app/data/tokens.db", - "DELETE FROM app_passwords;", - ], - capture_output=True, - text=True, - timeout=10, - ) - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "db", - "mariadb", - "-u", - "root", - "-ppassword", - "nextcloud", - "-e", - "DELETE FROM oc_bruteforce_attempts;", - ], - capture_output=True, - text=True, - timeout=10, - ) + clear_stale_test_state() # Configure Astrolabe to point to the mcp-multi-user-basic server logger.info("Configuring Astrolabe for mcp-multi-user-basic server...") @@ -1239,59 +1270,7 @@ async def test_revoke_background_sync_access( logger.info( "Clearing stale app passwords, bruteforce entries, and Astrolabe preferences..." ) - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "mcp-multi-user-basic", - "sqlite3", - "/app/data/tokens.db", - "DELETE FROM app_passwords;", - ], - capture_output=True, - text=True, - timeout=10, - ) - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "db", - "mariadb", - "-u", - "root", - "-ppassword", - "nextcloud", - "-e", - "DELETE FROM oc_bruteforce_attempts;", - ], - capture_output=True, - text=True, - timeout=10, - ) - subprocess.run( - [ - "docker", - "compose", - "exec", - "-T", - "db", - "mariadb", - "-u", - "root", - "-ppassword", - "nextcloud", - "-e", - "DELETE FROM oc_preferences WHERE appid = 'astrolabe';", - ], - capture_output=True, - text=True, - timeout=10, - ) + clear_stale_test_state(clear_preferences=True) # Configure Astrolabe to point to the mcp-multi-user-basic server logger.info("Configuring Astrolabe for mcp-multi-user-basic server...")