diff --git a/docker-compose.yml b/docker-compose.yml index cbf308a..a03c22b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,19 +14,12 @@ services: - MYSQL_DATABASE=nextcloud - MYSQL_USER=nextcloud - # Note: Redis is an external service. You can find more information about the configuration here: - # https://hub.docker.com/_/redis - redis: - image: docker.io/library/redis:alpine@sha256:59b6e694653476de2c992937ebe1c64182af4728e54bb49e9b7a6c26614d8933 - restart: always - app: image: docker.io/library/nextcloud:32.0.0@sha256:3e70e4dfe882ef44738fdc30d9896fb07c12febb27c4a1177e3d63dc0004a0b4 restart: always ports: - 0.0.0.0:8080:80 depends_on: - - redis - db volumes: - nextcloud:/var/www/html diff --git a/tests/conftest.py b/tests/conftest.py index 49fd7f5..6ab4adb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -854,11 +854,9 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): """ Create test users for multi-user OAuth testing. - Creates four test users: + Creates two test users to reduce CI resource usage: - alice: Owner role, creates resources - bob: Viewer role, read-only access - - charlie: Editor role, can edit (in 'editors' group) - - diana: No-access role, no shares """ test_user_configs = { "alice": { @@ -873,50 +871,12 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): "display_name": "Bob Viewer", "groups": [], }, - "charlie": { - "password": "CharlieSecurePass789!", - "email": "charlie@example.com", - "display_name": "Charlie Editor", - "groups": ["editors"], - }, - "diana": { - "password": "DianaSecurePass012!", - "email": "diana@example.com", - "display_name": "Diana NoAccess", - "groups": [], - }, } logger.info("Creating test users for multi-user OAuth testing...") created_users = [] try: - # Create the 'editors' group first (charlie needs it) - try: - # Use admin nc_client to create the group via User API - # First, try to create it (will fail if exists, but that's okay) - async with httpx.AsyncClient() as http_client: - base_url = str(nc_client._client.base_url) - # Get password from environment since nc_client doesn't expose it - password = os.getenv("NEXTCLOUD_PASSWORD") - response = await http_client.post( - f"{base_url}/ocs/v2.php/cloud/groups", - auth=(nc_client.username, password), - headers={"OCS-APIRequest": "true", "Accept": "application/json"}, - data={"groupid": "editors"}, - ) - if response.status_code in [ - 200, - 409, - ]: # 200 = created, 409 = already exists - logger.info("Editors group ready") - else: - logger.warning( - f"Group creation returned {response.status_code}: {response.text}" - ) - except Exception as e: - logger.warning(f"Error creating editors group (may already exist): {e}") - # Create each test user for username, config in test_user_configs.items(): try: @@ -929,14 +889,6 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): logger.info(f"Created test user: {username}") created_users.append(username) - # Add user to groups if specified - for group in config["groups"]: - try: - await nc_client.users.add_user_to_group(username, group) - logger.info(f"Added {username} to group {group}") - except Exception as e: - logger.warning(f"Error adding {username} to group {group}: {e}") - except Exception as e: # User might already exist, that's okay logger.warning( @@ -1094,7 +1046,7 @@ async def _get_oauth_token_for_user( return access_token -# Parallel token retrieval fixture - fetches all OAuth tokens concurrently +# OAuth token retrieval fixture - parallel locally, sequential in CI @pytest.fixture(scope="session") async def all_oauth_tokens( anyio_backend, @@ -1104,13 +1056,13 @@ async def all_oauth_tokens( oauth_callback_server, ) -> dict[str, str]: """ - Fetch OAuth tokens for all test users in parallel for speed. + Fetch OAuth tokens for all test users. + + In CI (GitHub Actions), fetches sequentially to reduce load on Nextcloud. + Locally, fetches in parallel for speed. Returns a dict mapping username to OAuth access token. - This is significantly faster than fetching tokens sequentially. - - Now uses the real callback server with state parameters for reliable - concurrent token acquisition without race conditions. + Uses the real callback server with state parameters for reliable token acquisition. """ import asyncio import time @@ -1119,47 +1071,68 @@ async def all_oauth_tokens( auth_states, callback_url = oauth_callback_server start_time = time.time() - logger.info("Fetching OAuth tokens for all users in parallel...") + is_ci = os.getenv("GITHUB_ACTIONS") == "true" + mode = "sequentially" if is_ci else "in parallel" + logger.info(f"Fetching OAuth tokens for all users {mode} (CI={is_ci})...") logger.info(f"Using callback server at {callback_url} with state-based correlation") - async def get_token_with_delay(username: str, config: dict, delay: float): - """Get token for a user after a small delay to stagger requests.""" - if delay > 0: - await asyncio.sleep(delay) - return await _get_oauth_token_for_user( - browser, - shared_oauth_client_credentials, - auth_states, - username, - config["password"], - ) - - # Create tasks for all users with staggered starts (0.5s apart) - tasks = { - username: get_token_with_delay(username, config, idx * 0.5) - for idx, (username, config) in enumerate(test_users_setup.items()) - } - - # Run all token fetches concurrently - results = await asyncio.gather(*tasks.values(), return_exceptions=True) - - # Build result dict, handling any errors tokens = {} - for username, result in zip(tasks.keys(), results): - if isinstance(result, Exception): - logger.error(f"Failed to get OAuth token for {username}: {result}") - raise result - tokens[username] = result + + if is_ci: + # Sequential execution in CI to reduce Nextcloud load + logger.info("Running in CI: using sequential OAuth token acquisition") + for username, config in test_users_setup.items(): + logger.info(f"Fetching OAuth token for {username}...") + tokens[username] = await _get_oauth_token_for_user( + browser, + shared_oauth_client_credentials, + auth_states, + username, + config["password"], + ) + # Add delay between users to give Nextcloud breathing room + await asyncio.sleep(1.0) + else: + # Parallel execution locally for speed + logger.info("Running locally: using parallel OAuth token acquisition") + + async def get_token_with_delay(username: str, config: dict, delay: float): + """Get token for a user after a small delay to stagger requests.""" + if delay > 0: + await asyncio.sleep(delay) + return await _get_oauth_token_for_user( + browser, + shared_oauth_client_credentials, + auth_states, + username, + config["password"], + ) + + # Create tasks for all users with staggered starts (0.5s apart) + tasks = { + username: get_token_with_delay(username, config, idx * 0.5) + for idx, (username, config) in enumerate(test_users_setup.items()) + } + + # Run all token fetches concurrently + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + # Build result dict, handling any errors + for username, result in zip(tasks.keys(), results): + if isinstance(result, Exception): + logger.error(f"Failed to get OAuth token for {username}: {result}") + raise result + tokens[username] = result elapsed = time.time() - start_time logger.info( - f"Successfully fetched {len(tokens)} OAuth tokens in parallel " + f"Successfully fetched {len(tokens)} OAuth tokens {mode} " f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)" ) return tokens -# Session-scoped OAuth token fixtures - now use the parallel fixture +# Session-scoped OAuth token fixtures @pytest.fixture(scope="session") async def alice_oauth_token(anyio_backend, all_oauth_tokens) -> str: """OAuth token for alice (cached for session). Uses shared OAuth client.""" @@ -1172,18 +1145,6 @@ async def bob_oauth_token(anyio_backend, all_oauth_tokens) -> str: return all_oauth_tokens["bob"] -@pytest.fixture(scope="session") -async def charlie_oauth_token(anyio_backend, all_oauth_tokens) -> str: - """OAuth token for charlie (cached for session). Uses shared OAuth client.""" - return all_oauth_tokens["charlie"] - - -@pytest.fixture(scope="session") -async def diana_oauth_token(anyio_backend, all_oauth_tokens) -> str: - """OAuth token for diana (cached for session). Uses shared OAuth client.""" - return all_oauth_tokens["diana"] - - @pytest.fixture(scope="session") async def alice_mcp_client( anyio_backend, @@ -1211,34 +1172,6 @@ async def bob_mcp_client( yield session -@pytest.fixture(scope="session") -async def charlie_mcp_client( - anyio_backend, - charlie_oauth_token: str, -) -> AsyncGenerator[ClientSession, Any]: - """MCP client authenticated as charlie (editor role, in 'editors' group).""" - async for session in create_mcp_client_session( - url="http://127.0.0.1:8001/mcp", - token=charlie_oauth_token, - client_name="Charlie MCP", - ): - yield session - - -@pytest.fixture(scope="session") -async def diana_mcp_client( - anyio_backend, - diana_oauth_token: str, -) -> AsyncGenerator[ClientSession, Any]: - """MCP client authenticated as diana (no-access role).""" - async for session in create_mcp_client_session( - url="http://127.0.0.1:8001/mcp", - token=diana_oauth_token, - client_name="Diana MCP", - ): - yield session - - # Test user/group fixtures for clean test isolation @pytest.fixture async def test_user(nc_client: NextcloudClient): diff --git a/tests/load/benchmark.py b/tests/load/benchmark.py index 54adffb..53af736 100644 --- a/tests/load/benchmark.py +++ b/tests/load/benchmark.py @@ -7,7 +7,6 @@ Usage: uv run python -m tests.load.benchmark -c 50 -d 300 --output results.json """ -import asyncio import json import logging import signal @@ -254,7 +253,7 @@ async def wait_for_mcp_server(url: str, max_attempts: int = 10) -> bool: except Exception as e: if attempt < max_attempts: logger.debug(f"Attempt {attempt}/{max_attempts}: {e}") - await asyncio.sleep(2) + await anyio.sleep(2) else: logger.error(f"MCP server not ready after {max_attempts} attempts") return False @@ -267,7 +266,7 @@ async def benchmark_worker( url: str, duration: float, metrics: BenchmarkMetrics, - stop_event: asyncio.Event, + stop_event: anyio.Event, ): """Single worker that runs operations for the specified duration.""" logger.info(f"Worker {worker_id} starting...") @@ -293,7 +292,7 @@ async def benchmark_worker( operation_count += 1 # Small delay to prevent overwhelming the server - await asyncio.sleep(0.01) + await anyio.sleep(0.01) # Cleanup await ops.cleanup() @@ -312,7 +311,7 @@ async def run_benchmark( ) -> BenchmarkMetrics: """Run the benchmark with specified parameters.""" metrics = BenchmarkMetrics() - stop_event = asyncio.Event() + stop_event = anyio.Event() # Setup signal handlers for graceful shutdown def signal_handler(sig, frame): @@ -331,27 +330,22 @@ async def run_benchmark( # Warmup period if warmup > 0: print("Warming up...") - await asyncio.sleep(warmup) + await anyio.sleep(warmup) # Start metrics collection metrics.start() - # Create and run workers - workers = [ - benchmark_worker(i, url, duration, metrics, stop_event) - for i in range(concurrency) - ] + # Create and run workers using anyio task groups + async with anyio.create_task_group() as tg: + # Start all workers + for i in range(concurrency): + tg.start_soon(benchmark_worker, i, url, duration, metrics, stop_event) - # Show progress - progress_task = asyncio.create_task(show_progress(duration, metrics, stop_event)) + # Show progress + tg.start_soon(show_progress, duration, metrics, stop_event) - # Wait for all workers to complete - await asyncio.gather(*workers, return_exceptions=True) - - # Stop metrics and progress + # Stop metrics (tasks already completed when task group exits) metrics.stop() - stop_event.set() - await progress_task return metrics @@ -359,7 +353,7 @@ async def run_benchmark( async def show_progress( duration: float, metrics: BenchmarkMetrics, - stop_event: asyncio.Event, + stop_event: anyio.Event, ): """Show real-time progress during benchmark.""" start_time = time.time() @@ -387,7 +381,7 @@ async def show_progress( flush=True, ) - await asyncio.sleep(0.5) + await anyio.sleep(0.5) print() # New line after progress diff --git a/tests/load/oauth_benchmark.py b/tests/load/oauth_benchmark.py index 4cf3296..2c20b2b 100644 --- a/tests/load/oauth_benchmark.py +++ b/tests/load/oauth_benchmark.py @@ -10,7 +10,6 @@ Usage: uv run python -m tests.load.oauth_benchmark -u 10 -d 300 --workload sharing """ -import asyncio import json import logging import os @@ -223,7 +222,7 @@ async def oauth_benchmark_worker( workload: MixedOAuthWorkload, duration: float, metrics: OAuthBenchmarkMetrics, - stop_event: asyncio.Event, + stop_event: anyio.Event, ): """ Single worker executing operations for one user. @@ -258,13 +257,13 @@ async def oauth_benchmark_worker( operation_count += 1 # Small delay to prevent overwhelming the server - await asyncio.sleep(0.05) + await anyio.sleep(0.05) logger.info( f"Worker for {user_wrapper.username} completed {operation_count} operations" ) - except asyncio.CancelledError: + except anyio.get_cancelled_exc_class(): # Handle task cancellation gracefully (e.g., during benchmark shutdown) logger.info( f"Worker for {user_wrapper.username} was cancelled " @@ -278,7 +277,7 @@ async def oauth_benchmark_worker( async def show_progress( duration: float, metrics: OAuthBenchmarkMetrics, - stop_event: asyncio.Event, + stop_event: anyio.Event, ): """Show real-time progress during benchmark.""" start_time = time.time() @@ -306,7 +305,7 @@ async def show_progress( flush=True, ) - await asyncio.sleep(0.5) + await anyio.sleep(0.5) print() # New line after progress @@ -338,7 +337,7 @@ async def run_oauth_benchmark( OAuthBenchmarkMetrics with results """ metrics = OAuthBenchmarkMetrics() - stop_event = asyncio.Event() + stop_event = anyio.Event() created_users: list[str] = [] callback_server: OAuthCallbackServer | None = None user_pool: OAuthUserPool | None = None @@ -437,12 +436,23 @@ async def run_oauth_benchmark( browser = await browser_launcher.launch(headless=not headed) try: - # Create all users concurrently - tasks = [ - create_user_task(i, browser, callback_server.auth_states) - for i in range(num_users) - ] - results = await asyncio.gather(*tasks, return_exceptions=True) + # Create all users concurrently using anyio task groups + results = [] + + async def run_and_collect(i: int): + """Wrapper to collect results from tasks.""" + try: + result = await create_user_task( + i, browser, callback_server.auth_states + ) + results.append(result) + except Exception as e: + logger.error(f"User creation task failed: {e}") + results.append(e) + + async with anyio.create_task_group() as tg: + for i in range(num_users): + tg.start_soon(run_and_collect, i) # Process results for result in results: @@ -484,13 +494,21 @@ async def run_oauth_benchmark( logger.error(f"Failed to create session for {username}: {e}") return None - # Create all sessions concurrently - session_tasks = [ - create_session_task(username) for username in created_users - ] - session_results = await asyncio.gather( - *session_tasks, return_exceptions=True - ) + # Create all sessions concurrently using anyio task groups + session_results = [] + + async def run_and_collect_session(username: str): + """Wrapper to collect session results from tasks.""" + try: + result = await create_session_task(username) + session_results.append(result) + except Exception as e: + logger.error(f"Session creation task failed: {e}") + session_results.append(e) + + async with anyio.create_task_group() as tg: + for username in created_users: + tg.start_soon(run_and_collect_session, username) # Process results for result in session_results: @@ -508,7 +526,7 @@ async def run_oauth_benchmark( # Warmup period if warmup > 0: print(f"Warmup period: {warmup}s...") - await asyncio.sleep(warmup) + await anyio.sleep(warmup) print() # Start benchmark @@ -518,21 +536,26 @@ async def run_oauth_benchmark( metrics.start() - # Create workload and workers + # Create workload and workers using anyio task groups workload = MixedOAuthWorkload(user_wrappers) - workers = [ - oauth_benchmark_worker(wrapper, workload, duration, metrics, stop_event) - for wrapper in user_wrappers - ] # Run workers with progress display - progress_task = asyncio.create_task( - show_progress(duration, metrics, stop_event) - ) - await asyncio.gather(*workers, return_exceptions=True) - stop_event.set() - await progress_task + async with anyio.create_task_group() as tg: + # Start all workers + for wrapper in user_wrappers: + tg.start_soon( + oauth_benchmark_worker, + wrapper, + workload, + duration, + metrics, + stop_event, + ) + # Show progress + tg.start_soon(show_progress, duration, metrics, stop_event) + + # Tasks already completed when task group exits metrics.stop() print(f"\n{'=' * 80}")