From c8d9cc24e013bea08421395e3df2c3ab44c3f537 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 16 Nov 2025 03:51:45 +0100 Subject: [PATCH] refactor: migrate asyncio to anyio for consistent structured concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace asyncio primitives with anyio equivalents throughout the codebase to establish a single async pattern. This provides better structured concurrency with automatic cancellation on errors and aligns with the pytest anyio configuration. Changes: - hybrid.py: Replace asyncio.gather() with anyio task groups - token_broker.py: Replace asyncio.Lock() with anyio.Lock() - storage.py: Replace asyncio.run() with anyio.run() - app.py: Replace tg.start_soon() with await tg.start() for task status - processor.py: Add task_status parameter for structured startup - scanner.py: Add task_status parameter for structured startup - CLAUDE.md: Update async/await patterns guidance The change from start_soon() to await tg.start() enables proper task initialization signaling, ensuring background tasks are ready before proceeding. This follows anyio best practices for structured concurrency. All 118 unit tests pass with the new implementation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- CLAUDE.md | 8 ++- nextcloud_mcp_server/app.py | 8 +-- nextcloud_mcp_server/auth/storage.py | 4 +- nextcloud_mcp_server/auth/token_broker.py | 4 +- nextcloud_mcp_server/search/hybrid.py | 77 +++++++++++++++++------ nextcloud_mcp_server/vector/processor.py | 7 +++ nextcloud_mcp_server/vector/scanner.py | 7 +++ 7 files changed, 86 insertions(+), 29 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 194a9cb..5972633 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -5,11 +5,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Coding Conventions ### async/await Patterns -- **Use anyio + asyncio hybrid** - Both libraries are available +- **Use anyio for all async operations** - Provides structured concurrency - pytest runs in `anyio` mode (`anyio_mode = "auto"` in pyproject.toml) - - asyncio used in auth modules (refresh_token_storage.py, token_exchange.py, token_broker.py) - - anyio used in calendar.py, client_registration.py, app.py + - Use `anyio.create_task_group()` for concurrent execution (NOT `asyncio.gather()`) + - Use `anyio.Lock()` for synchronization primitives (NOT `asyncio.Lock()`) + - Use `anyio.run()` for entry points (NOT `asyncio.run()`) - Prefer standard async/await syntax without explicit library imports when possible + - Examples: app.py, search/hybrid.py, search/verification.py, auth/token_broker.py ### Type Hints - **Use Python 3.10+ union syntax**: `str | None` instead of `Optional[str]` diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 501bc16..b953983 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -446,7 +446,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: # Start background tasks using anyio TaskGroup async with anyio.create_task_group() as tg: # Start scanner task - tg.start_soon( + await tg.start( scanner_task, send_stream, shutdown_event, @@ -457,7 +457,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): - tg.start_soon( + await tg.start( processor_task, i, receive_stream.clone(), @@ -1147,7 +1147,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): # Start background tasks using anyio TaskGroup async with anyio_module.create_task_group() as tg: # Start scanner task - tg.start_soon( + await tg.start( scanner_task, send_stream, shutdown_event, @@ -1158,7 +1158,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): - tg.start_soon( + await tg.start( processor_task, i, receive_stream.clone(), diff --git a/nextcloud_mcp_server/auth/storage.py b/nextcloud_mcp_server/auth/storage.py index 1cf3936..9f4c1ff 100644 --- a/nextcloud_mcp_server/auth/storage.py +++ b/nextcloud_mcp_server/auth/storage.py @@ -1310,7 +1310,7 @@ async def generate_encryption_key() -> str: # Example usage if __name__ == "__main__": - import asyncio + import anyio async def main(): # Generate a key for testing @@ -1318,4 +1318,4 @@ if __name__ == "__main__": print(f"Generated encryption key: {key}") print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'") - asyncio.run(main()) + anyio.run(main) diff --git a/nextcloud_mcp_server/auth/token_broker.py b/nextcloud_mcp_server/auth/token_broker.py index a4d68aa..6b89eef 100644 --- a/nextcloud_mcp_server/auth/token_broker.py +++ b/nextcloud_mcp_server/auth/token_broker.py @@ -14,11 +14,11 @@ The Token Broker provides: - Session vs background token separation (RFC 8693) """ -import asyncio import logging from datetime import datetime, timedelta, timezone from typing import Dict, Optional, Tuple +import anyio import httpx import jwt from cryptography.fernet import Fernet @@ -43,7 +43,7 @@ class TokenCache: self._cache: Dict[str, Tuple[str, datetime]] = {} self._ttl = timedelta(seconds=ttl_seconds) self._early_refresh = timedelta(seconds=early_refresh_seconds) - self._lock = asyncio.Lock() + self._lock = anyio.Lock() async def get(self, user_id: str) -> Optional[str]: """Get cached token if valid.""" diff --git a/nextcloud_mcp_server/search/hybrid.py b/nextcloud_mcp_server/search/hybrid.py index 3dd65b0..287aa94 100644 --- a/nextcloud_mcp_server/search/hybrid.py +++ b/nextcloud_mcp_server/search/hybrid.py @@ -1,10 +1,11 @@ """Hybrid search algorithm using Reciprocal Rank Fusion (RRF).""" -import asyncio import logging from collections import defaultdict from typing import Any +import anyio + from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm @@ -105,30 +106,70 @@ class HybridSearchAlgorithm(SearchAlgorithm): f"fuzzy={self.fuzzy_weight})" ) - # Run algorithms in parallel - tasks = [] - algo_names = [] - + # Prepare algorithm configurations for parallel execution + algo_configs = [] if self.semantic_weight > 0: - tasks.append( - self.semantic.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "semantic", + self.semantic.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("semantic") - if self.keyword_weight > 0: - tasks.append( - self.keyword.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "keyword", + self.keyword.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("keyword") - if self.fuzzy_weight > 0: - tasks.append( - self.fuzzy.search(query, user_id, limit * 2, doc_type, **kwargs) + algo_configs.append( + ( + "fuzzy", + self.fuzzy.search, + query, + user_id, + limit * 2, + doc_type, + kwargs, + ) ) - algo_names.append("fuzzy") - # Execute searches in parallel - results_list = await asyncio.gather(*tasks) + # Pre-allocate results list and extract algorithm names + results_list = [None] * len(algo_configs) + algo_names = [name for name, *_ in algo_configs] + + async def search_one( + index: int, + search_func, + query_arg: str, + user_id_arg: str, + limit_arg: int, + doc_type_arg: str | None, + kwargs_arg: dict, + ): + """Execute one search algorithm and store result at index.""" + result = await search_func( + query_arg, user_id_arg, limit_arg, doc_type_arg, **kwargs_arg + ) + results_list[index] = result + + # Execute searches in parallel using anyio task group + async with anyio.create_task_group() as tg: + for idx, (name, search_func, q, uid, lim, dt, kw) in enumerate( + algo_configs + ): + tg.start_soon(search_one, idx, search_func, q, uid, lim, dt, kw) # Build results dict algo_results = {} diff --git a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py index ceecf5c..fde08d0 100644 --- a/nextcloud_mcp_server/vector/processor.py +++ b/nextcloud_mcp_server/vector/processor.py @@ -8,6 +8,7 @@ import time import uuid import anyio +from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream from httpx import HTTPStatusError from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct @@ -34,6 +35,8 @@ async def processor_task( shutdown_event: anyio.Event, nc_client: NextcloudClient, user_id: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ): """ Process documents from stream concurrently. @@ -53,9 +56,13 @@ async def processor_task( shutdown_event: Event signaling shutdown nc_client: Authenticated Nextcloud client user_id: User being processed + task_status: Status object for signaling task readiness """ logger.info(f"Processor {worker_id} started") + # Signal that the task has started and is ready + task_status.started() + while not shutdown_event.is_set(): try: # Get document with timeout (allows checking shutdown) diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index 953219e..27f7d86 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -8,6 +8,7 @@ import time from dataclasses import dataclass import anyio +from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectSendStream from qdrant_client.models import FieldCondition, Filter, MatchValue @@ -93,6 +94,8 @@ async def scanner_task( wake_event: anyio.Event, nc_client: NextcloudClient, user_id: str, + *, + task_status: TaskStatus = anyio.TASK_STATUS_IGNORED, ): """ Periodic scanner that detects changed documents for enabled user. @@ -105,10 +108,14 @@ async def scanner_task( wake_event: Event to trigger immediate scan nc_client: Authenticated Nextcloud client user_id: User to scan + task_status: Status object for signaling task readiness """ logger.info(f"Scanner task started for user: {user_id}") settings = get_settings() + # Signal that the task has started and is ready + task_status.started() + async with send_stream: while not shutdown_event.is_set(): try: