diff --git a/tests/conftest.py b/tests/conftest.py index 6ab4adb..49fd7f5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -854,9 +854,11 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): """ Create test users for multi-user OAuth testing. - Creates two test users to reduce CI resource usage: + Creates four test users: - alice: Owner role, creates resources - bob: Viewer role, read-only access + - charlie: Editor role, can edit (in 'editors' group) + - diana: No-access role, no shares """ test_user_configs = { "alice": { @@ -871,12 +873,50 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): "display_name": "Bob Viewer", "groups": [], }, + "charlie": { + "password": "CharlieSecurePass789!", + "email": "charlie@example.com", + "display_name": "Charlie Editor", + "groups": ["editors"], + }, + "diana": { + "password": "DianaSecurePass012!", + "email": "diana@example.com", + "display_name": "Diana NoAccess", + "groups": [], + }, } logger.info("Creating test users for multi-user OAuth testing...") created_users = [] try: + # Create the 'editors' group first (charlie needs it) + try: + # Use admin nc_client to create the group via User API + # First, try to create it (will fail if exists, but that's okay) + async with httpx.AsyncClient() as http_client: + base_url = str(nc_client._client.base_url) + # Get password from environment since nc_client doesn't expose it + password = os.getenv("NEXTCLOUD_PASSWORD") + response = await http_client.post( + f"{base_url}/ocs/v2.php/cloud/groups", + auth=(nc_client.username, password), + headers={"OCS-APIRequest": "true", "Accept": "application/json"}, + data={"groupid": "editors"}, + ) + if response.status_code in [ + 200, + 409, + ]: # 200 = created, 409 = already exists + logger.info("Editors group ready") + else: + logger.warning( + f"Group creation returned {response.status_code}: {response.text}" + ) + except Exception as e: + logger.warning(f"Error creating editors group (may already exist): {e}") + # Create each test user for username, config in test_user_configs.items(): try: @@ -889,6 +929,14 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): logger.info(f"Created test user: {username}") created_users.append(username) + # Add user to groups if specified + for group in config["groups"]: + try: + await nc_client.users.add_user_to_group(username, group) + logger.info(f"Added {username} to group {group}") + except Exception as e: + logger.warning(f"Error adding {username} to group {group}: {e}") + except Exception as e: # User might already exist, that's okay logger.warning( @@ -1046,7 +1094,7 @@ async def _get_oauth_token_for_user( return access_token -# OAuth token retrieval fixture - parallel locally, sequential in CI +# Parallel token retrieval fixture - fetches all OAuth tokens concurrently @pytest.fixture(scope="session") async def all_oauth_tokens( anyio_backend, @@ -1056,13 +1104,13 @@ async def all_oauth_tokens( oauth_callback_server, ) -> dict[str, str]: """ - Fetch OAuth tokens for all test users. - - In CI (GitHub Actions), fetches sequentially to reduce load on Nextcloud. - Locally, fetches in parallel for speed. + Fetch OAuth tokens for all test users in parallel for speed. Returns a dict mapping username to OAuth access token. - Uses the real callback server with state parameters for reliable token acquisition. + This is significantly faster than fetching tokens sequentially. + + Now uses the real callback server with state parameters for reliable + concurrent token acquisition without race conditions. """ import asyncio import time @@ -1071,68 +1119,47 @@ async def all_oauth_tokens( auth_states, callback_url = oauth_callback_server start_time = time.time() - is_ci = os.getenv("GITHUB_ACTIONS") == "true" - mode = "sequentially" if is_ci else "in parallel" - logger.info(f"Fetching OAuth tokens for all users {mode} (CI={is_ci})...") + logger.info("Fetching OAuth tokens for all users in parallel...") logger.info(f"Using callback server at {callback_url} with state-based correlation") + async def get_token_with_delay(username: str, config: dict, delay: float): + """Get token for a user after a small delay to stagger requests.""" + if delay > 0: + await asyncio.sleep(delay) + return await _get_oauth_token_for_user( + browser, + shared_oauth_client_credentials, + auth_states, + username, + config["password"], + ) + + # Create tasks for all users with staggered starts (0.5s apart) + tasks = { + username: get_token_with_delay(username, config, idx * 0.5) + for idx, (username, config) in enumerate(test_users_setup.items()) + } + + # Run all token fetches concurrently + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + # Build result dict, handling any errors tokens = {} - - if is_ci: - # Sequential execution in CI to reduce Nextcloud load - logger.info("Running in CI: using sequential OAuth token acquisition") - for username, config in test_users_setup.items(): - logger.info(f"Fetching OAuth token for {username}...") - tokens[username] = await _get_oauth_token_for_user( - browser, - shared_oauth_client_credentials, - auth_states, - username, - config["password"], - ) - # Add delay between users to give Nextcloud breathing room - await asyncio.sleep(1.0) - else: - # Parallel execution locally for speed - logger.info("Running locally: using parallel OAuth token acquisition") - - async def get_token_with_delay(username: str, config: dict, delay: float): - """Get token for a user after a small delay to stagger requests.""" - if delay > 0: - await asyncio.sleep(delay) - return await _get_oauth_token_for_user( - browser, - shared_oauth_client_credentials, - auth_states, - username, - config["password"], - ) - - # Create tasks for all users with staggered starts (0.5s apart) - tasks = { - username: get_token_with_delay(username, config, idx * 0.5) - for idx, (username, config) in enumerate(test_users_setup.items()) - } - - # Run all token fetches concurrently - results = await asyncio.gather(*tasks.values(), return_exceptions=True) - - # Build result dict, handling any errors - for username, result in zip(tasks.keys(), results): - if isinstance(result, Exception): - logger.error(f"Failed to get OAuth token for {username}: {result}") - raise result - tokens[username] = result + for username, result in zip(tasks.keys(), results): + if isinstance(result, Exception): + logger.error(f"Failed to get OAuth token for {username}: {result}") + raise result + tokens[username] = result elapsed = time.time() - start_time logger.info( - f"Successfully fetched {len(tokens)} OAuth tokens {mode} " + f"Successfully fetched {len(tokens)} OAuth tokens in parallel " f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)" ) return tokens -# Session-scoped OAuth token fixtures +# Session-scoped OAuth token fixtures - now use the parallel fixture @pytest.fixture(scope="session") async def alice_oauth_token(anyio_backend, all_oauth_tokens) -> str: """OAuth token for alice (cached for session). Uses shared OAuth client.""" @@ -1145,6 +1172,18 @@ async def bob_oauth_token(anyio_backend, all_oauth_tokens) -> str: return all_oauth_tokens["bob"] +@pytest.fixture(scope="session") +async def charlie_oauth_token(anyio_backend, all_oauth_tokens) -> str: + """OAuth token for charlie (cached for session). Uses shared OAuth client.""" + return all_oauth_tokens["charlie"] + + +@pytest.fixture(scope="session") +async def diana_oauth_token(anyio_backend, all_oauth_tokens) -> str: + """OAuth token for diana (cached for session). Uses shared OAuth client.""" + return all_oauth_tokens["diana"] + + @pytest.fixture(scope="session") async def alice_mcp_client( anyio_backend, @@ -1172,6 +1211,34 @@ async def bob_mcp_client( yield session +@pytest.fixture(scope="session") +async def charlie_mcp_client( + anyio_backend, + charlie_oauth_token: str, +) -> AsyncGenerator[ClientSession, Any]: + """MCP client authenticated as charlie (editor role, in 'editors' group).""" + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=charlie_oauth_token, + client_name="Charlie MCP", + ): + yield session + + +@pytest.fixture(scope="session") +async def diana_mcp_client( + anyio_backend, + diana_oauth_token: str, +) -> AsyncGenerator[ClientSession, Any]: + """MCP client authenticated as diana (no-access role).""" + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=diana_oauth_token, + client_name="Diana MCP", + ): + yield session + + # Test user/group fixtures for clean test isolation @pytest.fixture async def test_user(nc_client: NextcloudClient):