fix: address PR #571 review comments
- Move httpx import to top-level and use anyio task group for concurrent validation in cleanup_invalid_app_passwords (storage.py) - Respect Retry-After header for 429 responses, capped at 300s (oauth_sync.py) - Soften pre-validation exceptions so transient failures don't crash the background sync task (oauth_sync.py) - Replace f-string SQL with blanket DELETE and add returncode checks (conftest.py) - Extract clear_stale_test_state() helper to deduplicate cleanup logic in astrolabe background sync tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -34,8 +34,12 @@ from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import aiosqlite
|
||||
import anyio
|
||||
import httpx
|
||||
from anyio import to_thread
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from nextcloud_mcp_server.migrations import stamp_database, upgrade_database
|
||||
from nextcloud_mcp_server.observability.metrics import record_db_operation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -164,10 +168,6 @@ class RefreshTokenStorage:
|
||||
|
||||
# Run migrations in a worker thread using anyio.to_thread
|
||||
# This allows Alembic to run its own async operations in a separate context
|
||||
from anyio import to_thread
|
||||
|
||||
from nextcloud_mcp_server.migrations import stamp_database, upgrade_database
|
||||
|
||||
if not has_alembic:
|
||||
if has_schema:
|
||||
# Stamp existing database without running migrations
|
||||
@@ -1427,8 +1427,6 @@ class RefreshTokenStorage:
|
||||
Returns:
|
||||
List of user IDs whose app passwords were removed
|
||||
"""
|
||||
import httpx
|
||||
|
||||
if not self._initialized:
|
||||
await self.initialize()
|
||||
|
||||
@@ -1438,10 +1436,10 @@ class RefreshTokenStorage:
|
||||
|
||||
removed: list[str] = []
|
||||
|
||||
for user_id in user_ids:
|
||||
async def _validate_user(user_id: str) -> None:
|
||||
app_password = await self.get_app_password(user_id)
|
||||
if not app_password:
|
||||
continue
|
||||
return
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(
|
||||
@@ -1473,6 +1471,10 @@ class RefreshTokenStorage:
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not validate app password for {user_id}: {e}")
|
||||
|
||||
async with anyio.create_task_group() as tg:
|
||||
for user_id in user_ids:
|
||||
tg.start_soon(_validate_user, user_id)
|
||||
|
||||
return removed
|
||||
|
||||
|
||||
|
||||
@@ -33,12 +33,13 @@ from anyio.streams.memory import (
|
||||
)
|
||||
from httpx import BasicAuth, HTTPStatusError
|
||||
|
||||
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.vector.processor import process_document
|
||||
from nextcloud_mcp_server.vector.scanner import DocumentTask, scan_user_documents
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
|
||||
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -89,8 +90,6 @@ async def get_user_client_basic_auth(
|
||||
Raises:
|
||||
NotProvisionedError: If user has not provisioned an app password
|
||||
"""
|
||||
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
|
||||
|
||||
# Get or create storage instance
|
||||
if storage is None:
|
||||
storage = RefreshTokenStorage.from_env()
|
||||
@@ -237,6 +236,11 @@ async def user_scanner_task(
|
||||
f"[{mode_label}] User {user_id} not provisioned, not starting scan loop"
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[{mode_label}] Pre-validation failed for {user_id}: {e}. "
|
||||
f"Proceeding to scan loop (has its own error handling)."
|
||||
)
|
||||
|
||||
consecutive_errors = 0
|
||||
|
||||
@@ -273,13 +277,16 @@ async def user_scanner_task(
|
||||
)
|
||||
break
|
||||
elif status_code == 429:
|
||||
retry_after = min(int(e.response.headers.get("Retry-After", "60")), 300)
|
||||
logger.warning(
|
||||
f"[{mode_label}] Scanner rate-limited for {user_id}, "
|
||||
f"backing off 60s"
|
||||
f"backing off {retry_after}s"
|
||||
)
|
||||
try:
|
||||
with anyio.move_on_after(60):
|
||||
with anyio.move_on_after(retry_after):
|
||||
await shutdown_event.wait()
|
||||
# anyio.get_cancelled_exc_class() catches task cancellation
|
||||
# (e.g. from task group teardown) so we exit cleanly.
|
||||
except anyio.get_cancelled_exc_class():
|
||||
break
|
||||
continue
|
||||
@@ -343,8 +350,6 @@ async def multi_user_processor_task(
|
||||
use_basic_auth: If True, use app passwords; if False, use OAuth tokens
|
||||
task_status: Status object for signaling task readiness
|
||||
"""
|
||||
from nextcloud_mcp_server.vector.processor import process_document
|
||||
|
||||
mode_label = "BasicAuth" if use_basic_auth else "OAuth"
|
||||
logger.info(f"[{mode_label}] Processor {worker_id} started")
|
||||
task_status.started()
|
||||
|
||||
Reference in New Issue
Block a user