diff --git a/CLAUDE.md b/CLAUDE.md index 194a9cb..5972633 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -5,11 +5,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Coding Conventions ### async/await Patterns -- **Use anyio + asyncio hybrid** - Both libraries are available +- **Use anyio for all async operations** - Provides structured concurrency - pytest runs in `anyio` mode (`anyio_mode = "auto"` in pyproject.toml) - - asyncio used in auth modules (refresh_token_storage.py, token_exchange.py, token_broker.py) - - anyio used in calendar.py, client_registration.py, app.py + - Use `anyio.create_task_group()` for concurrent execution (NOT `asyncio.gather()`) + - Use `anyio.Lock()` for synchronization primitives (NOT `asyncio.Lock()`) + - Use `anyio.run()` for entry points (NOT `asyncio.run()`) - Prefer standard async/await syntax without explicit library imports when possible + - Examples: app.py, search/hybrid.py, search/verification.py, auth/token_broker.py ### Type Hints - **Use Python 3.10+ union syntax**: `str | None` instead of `Optional[str]` diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 501bc16..b953983 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -446,7 +446,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: # Start background tasks using anyio TaskGroup async with anyio.create_task_group() as tg: # Start scanner task - tg.start_soon( + await tg.start( scanner_task, send_stream, shutdown_event, @@ -457,7 +457,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): - tg.start_soon( + await tg.start( processor_task, i, receive_stream.clone(), @@ -1147,7 +1147,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): # Start background tasks using anyio TaskGroup async with anyio_module.create_task_group() as tg: # Start scanner task - tg.start_soon( + await tg.start( scanner_task, send_stream, shutdown_event, @@ -1158,7 +1158,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): - tg.start_soon( + await tg.start( processor_task, i, receive_stream.clone(), diff --git a/nextcloud_mcp_server/auth/storage.py b/nextcloud_mcp_server/auth/storage.py index 1cf3936..9f4c1ff 100644 --- a/nextcloud_mcp_server/auth/storage.py +++ b/nextcloud_mcp_server/auth/storage.py @@ -1310,7 +1310,7 @@ async def generate_encryption_key() -> str: # Example usage if __name__ == "__main__": - import asyncio + import anyio async def main(): # Generate a key for testing @@ -1318,4 +1318,4 @@ if __name__ == "__main__": print(f"Generated encryption key: {key}") print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'") - asyncio.run(main()) + anyio.run(main) diff --git a/nextcloud_mcp_server/auth/token_broker.py b/nextcloud_mcp_server/auth/token_broker.py index a4d68aa..6b89eef 100644 --- a/nextcloud_mcp_server/auth/token_broker.py +++ b/nextcloud_mcp_server/auth/token_broker.py @@ -14,11 +14,11 @@ The Token Broker provides: - Session vs background token separation (RFC 8693) """ -import asyncio import logging from datetime import datetime, timedelta, timezone from typing import Dict, Optional, Tuple +import anyio import httpx import jwt from cryptography.fernet import Fernet @@ -43,7 +43,7 @@ class TokenCache: self._cache: Dict[str, Tuple[str, datetime]] = {} self._ttl = timedelta(seconds=ttl_seconds) self._early_refresh = timedelta(seconds=early_refresh_seconds) - self._lock = asyncio.Lock() + self._lock = anyio.Lock() async def get(self, user_id: str) -> Optional[str]: """Get cached token if valid.""" diff --git a/nextcloud_mcp_server/search/hybrid.py b/nextcloud_mcp_server/search/hybrid.py index 3dd65b0..287aa94 100644 --- a/nextcloud_mcp_server/search/hybrid.py +++ b/nextcloud_mcp_server/search/hybrid.py @@ -1,10 +1,11 @@ """Hybrid search algorithm using Reciprocal Rank Fusion (RRF).""" -import asyncio import logging from collections import defaultdict from typing import Any +import anyio + from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm @@ -105,30 +106,70 @@ class HybridSearchAlgorithm(SearchAlgorithm): f"fuzzy={self.fuzzy_weight})" ) - # Run algorithms in parallel - tasks = [] - algo_names = [] - + # Prepare algorithm configurations for parallel execution + algo_configs = [] if self.semantic_weight > 0: - tasks.append( - self.semantic.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "semantic", + self.semantic.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("semantic") - if self.keyword_weight > 0: - tasks.append( - self.keyword.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "keyword", + self.keyword.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("keyword") - if self.fuzzy_weight > 0: - tasks.append( - self.fuzzy.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "fuzzy", + self.fuzzy.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("fuzzy") - # Execute searches in parallel - results_list = await asyncio.gather(*tasks) + # Pre-allocate results list and extract algorithm names + results_list = [None] * len(algo_configs) + algo_names = [name for name, *_ in algo_configs] + + async def search_one( + index: int, + search_func, + query_arg: str, + user_id_arg: str, + limit_arg: int, + doc_type_arg: str | None, + kwargs_arg: dict, + ): + """Execute one search algorithm and store result at index.""" + result = await search_func( + query_arg, user_id_arg, limit_arg, doc_type_arg, **kwargs_arg + ) + results_list[index] = result + + # Execute searches in parallel using anyio task group + async with anyio.create_task_group() as tg: + for idx, (name, search_func, q, uid, lim, dt, kw) in enumerate( + algo_configs + ): + tg.start_soon(search_one, idx, search_func, q, uid, lim, dt, kw) # Build results dict algo_results = {} diff --git a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py index ceecf5c..fde08d0 100644 --- a/nextcloud_mcp_server/vector/processor.py +++ b/nextcloud_mcp_server/vector/processor.py @@ -8,6 +8,7 @@ import time import uuid import anyio +from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream from httpx import HTTPStatusError from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct @@ -34,6 +35,8 @@ async def processor_task( shutdown_event: anyio.Event, nc_client: NextcloudClient, user_id: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ): """ Process documents from stream concurrently. @@ -53,9 +56,13 @@ async def processor_task( shutdown_event: Event signaling shutdown nc_client: Authenticated Nextcloud client user_id: User being processed + task_status: Status object for signaling task readiness """ logger.info(f"Processor {worker_id} started") + # Signal that the task has started and is ready + task_status.started() + while not shutdown_event.is_set(): try: # Get document with timeout (allows checking shutdown) diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index 953219e..27f7d86 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -8,6 +8,7 @@ import time from dataclasses import dataclass import anyio +from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectSendStream from qdrant_client.models import FieldCondition, Filter, MatchValue @@ -93,6 +94,8 @@ async def scanner_task( wake_event: anyio.Event, nc_client: NextcloudClient, user_id: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ): """ Periodic scanner that detects changed documents for enabled user. @@ -105,10 +108,14 @@ async def scanner_task( wake_event: Event to trigger immediate scan nc_client: Authenticated Nextcloud client user_id: User to scan + task_status: Status object for signaling task readiness """ logger.info(f"Scanner task started for user: {user_id}") settings = get_settings() + # Signal that the task has started and is ready + task_status.started() + async with send_stream: while not shutdown_event.is_set(): try: