refactor: migrate asyncio to anyio for consistent structured concurrency

Replace asyncio primitives with anyio equivalents throughout the codebase
to establish a single async pattern. This provides better structured
concurrency with automatic cancellation on errors and aligns with the
pytest anyio configuration.

Changes:
- hybrid.py: Replace asyncio.gather() with anyio task groups
- token_broker.py: Replace asyncio.Lock() with anyio.Lock()
- storage.py: Replace asyncio.run() with anyio.run()
- app.py: Replace tg.start_soon() with await tg.start() for task status
- processor.py: Add task_status parameter for structured startup
- scanner.py: Add task_status parameter for structured startup
- CLAUDE.md: Update async/await patterns guidance

The change from start_soon() to await tg.start() enables proper task
initialization signaling, ensuring background tasks are ready before
proceeding. This follows anyio best practices for structured concurrency.

All 118 unit tests pass with the new implementation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-16 03:51:45 +01:00
parent 98d1c2de8e
commit c8d9cc24e0
7 changed files with 86 additions and 29 deletions
+5 -3
View File
@@ -5,11 +5,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Coding Conventions
### async/await Patterns
- **Use anyio + asyncio hybrid** - Both libraries are available
- **Use anyio for all async operations** - Provides structured concurrency
- pytest runs in `anyio` mode (`anyio_mode = "auto"` in pyproject.toml)
- asyncio used in auth modules (refresh_token_storage.py, token_exchange.py, token_broker.py)
- anyio used in calendar.py, client_registration.py, app.py
- Use `anyio.create_task_group()` for concurrent execution (NOT `asyncio.gather()`)
- Use `anyio.Lock()` for synchronization primitives (NOT `asyncio.Lock()`)
- Use `anyio.run()` for entry points (NOT `asyncio.run()`)
- Prefer standard async/await syntax without explicit library imports when possible
- Examples: app.py, search/hybrid.py, search/verification.py, auth/token_broker.py
### Type Hints
- **Use Python 3.10+ union syntax**: `str | None` instead of `Optional[str]`
+4 -4
View File
@@ -446,7 +446,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
tg.start_soon(
await tg.start(
scanner_task,
send_stream,
shutdown_event,
@@ -457,7 +457,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
tg.start_soon(
await tg.start(
processor_task,
i,
receive_stream.clone(),
@@ -1147,7 +1147,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
tg.start_soon(
await tg.start(
scanner_task,
send_stream,
shutdown_event,
@@ -1158,7 +1158,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
tg.start_soon(
await tg.start(
processor_task,
i,
receive_stream.clone(),
+2 -2
View File
@@ -1310,7 +1310,7 @@ async def generate_encryption_key() -> str:
# Example usage
if __name__ == "__main__":
import asyncio
import anyio
async def main():
# Generate a key for testing
@@ -1318,4 +1318,4 @@ if __name__ == "__main__":
print(f"Generated encryption key: {key}")
print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'")
asyncio.run(main())
anyio.run(main)
+2 -2
View File
@@ -14,11 +14,11 @@ The Token Broker provides:
- Session vs background token separation (RFC 8693)
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Tuple
import anyio
import httpx
import jwt
from cryptography.fernet import Fernet
@@ -43,7 +43,7 @@ class TokenCache:
self._cache: Dict[str, Tuple[str, datetime]] = {}
self._ttl = timedelta(seconds=ttl_seconds)
self._early_refresh = timedelta(seconds=early_refresh_seconds)
self._lock = asyncio.Lock()
self._lock = anyio.Lock()
async def get(self, user_id: str) -> Optional[str]:
"""Get cached token if valid."""
+59 -18
View File
@@ -1,10 +1,11 @@
"""Hybrid search algorithm using Reciprocal Rank Fusion (RRF)."""
import asyncio
import logging
from collections import defaultdict
from typing import Any
import anyio
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm
from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm
@@ -105,30 +106,70 @@ class HybridSearchAlgorithm(SearchAlgorithm):
f"fuzzy={self.fuzzy_weight})"
)
# Run algorithms in parallel
tasks = []
algo_names = []
# Prepare algorithm configurations for parallel execution
algo_configs = []
if self.semantic_weight > 0:
tasks.append(
self.semantic.search(query, user_id, limit * 2, doc_type, **kwargs)
algo_configs.append(
(
"semantic",
self.semantic.search,
query,
user_id,
limit * 2,
doc_type,
kwargs,
)
)
algo_names.append("semantic")
if self.keyword_weight > 0:
tasks.append(
self.keyword.search(query, user_id, limit * 2, doc_type, **kwargs)
algo_configs.append(
(
"keyword",
self.keyword.search,
query,
user_id,
limit * 2,
doc_type,
kwargs,
)
)
algo_names.append("keyword")
if self.fuzzy_weight > 0:
tasks.append(
self.fuzzy.search(query, user_id, limit * 2, doc_type, **kwargs)
algo_configs.append(
(
"fuzzy",
self.fuzzy.search,
query,
user_id,
limit * 2,
doc_type,
kwargs,
)
)
algo_names.append("fuzzy")
# Execute searches in parallel
results_list = await asyncio.gather(*tasks)
# Pre-allocate results list and extract algorithm names
results_list = [None] * len(algo_configs)
algo_names = [name for name, *_ in algo_configs]
async def search_one(
index: int,
search_func,
query_arg: str,
user_id_arg: str,
limit_arg: int,
doc_type_arg: str | None,
kwargs_arg: dict,
):
"""Execute one search algorithm and store result at index."""
result = await search_func(
query_arg, user_id_arg, limit_arg, doc_type_arg, **kwargs_arg
)
results_list[index] = result
# Execute searches in parallel using anyio task group
async with anyio.create_task_group() as tg:
for idx, (name, search_func, q, uid, lim, dt, kw) in enumerate(
algo_configs
):
tg.start_soon(search_one, idx, search_func, q, uid, lim, dt, kw)
# Build results dict
algo_results = {}
+7
View File
@@ -8,6 +8,7 @@ import time
import uuid
import anyio
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
@@ -34,6 +35,8 @@ async def processor_task(
shutdown_event: anyio.Event,
nc_client: NextcloudClient,
user_id: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
):
"""
Process documents from stream concurrently.
@@ -53,9 +56,13 @@ async def processor_task(
shutdown_event: Event signaling shutdown
nc_client: Authenticated Nextcloud client
user_id: User being processed
task_status: Status object for signaling task readiness
"""
logger.info(f"Processor {worker_id} started")
# Signal that the task has started and is ready
task_status.started()
while not shutdown_event.is_set():
try:
# Get document with timeout (allows checking shutdown)
+7
View File
@@ -8,6 +8,7 @@ import time
from dataclasses import dataclass
import anyio
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectSendStream
from qdrant_client.models import FieldCondition, Filter, MatchValue
@@ -93,6 +94,8 @@ async def scanner_task(
wake_event: anyio.Event,
nc_client: NextcloudClient,
user_id: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
):
"""
Periodic scanner that detects changed documents for enabled user.
@@ -105,10 +108,14 @@ async def scanner_task(
wake_event: Event to trigger immediate scan
nc_client: Authenticated Nextcloud client
user_id: User to scan
task_status: Status object for signaling task readiness
"""
logger.info(f"Scanner task started for user: {user_id}")
settings = get_settings()
# Signal that the task has started and is ready
task_status.started()
async with send_stream:
while not shutdown_event.is_set():
try: