chore: revert conftest.py

This commit is contained in:
Chris Coutinho
2025-10-18 22:44:51 +02:00
parent 2f805e54b7
commit ead298c132
+125 -58
View File
@@ -854,9 +854,11 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
"""
Create test users for multi-user OAuth testing.
Creates two test users to reduce CI resource usage:
Creates four test users:
- alice: Owner role, creates resources
- bob: Viewer role, read-only access
- charlie: Editor role, can edit (in 'editors' group)
- diana: No-access role, no shares
"""
test_user_configs = {
"alice": {
@@ -871,12 +873,50 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
"display_name": "Bob Viewer",
"groups": [],
},
"charlie": {
"password": "CharlieSecurePass789!",
"email": "charlie@example.com",
"display_name": "Charlie Editor",
"groups": ["editors"],
},
"diana": {
"password": "DianaSecurePass012!",
"email": "diana@example.com",
"display_name": "Diana NoAccess",
"groups": [],
},
}
logger.info("Creating test users for multi-user OAuth testing...")
created_users = []
try:
# Create the 'editors' group first (charlie needs it)
try:
# Use admin nc_client to create the group via User API
# First, try to create it (will fail if exists, but that's okay)
async with httpx.AsyncClient() as http_client:
base_url = str(nc_client._client.base_url)
# Get password from environment since nc_client doesn't expose it
password = os.getenv("NEXTCLOUD_PASSWORD")
response = await http_client.post(
f"{base_url}/ocs/v2.php/cloud/groups",
auth=(nc_client.username, password),
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={"groupid": "editors"},
)
if response.status_code in [
200,
409,
]: # 200 = created, 409 = already exists
logger.info("Editors group ready")
else:
logger.warning(
f"Group creation returned {response.status_code}: {response.text}"
)
except Exception as e:
logger.warning(f"Error creating editors group (may already exist): {e}")
# Create each test user
for username, config in test_user_configs.items():
try:
@@ -889,6 +929,14 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
logger.info(f"Created test user: {username}")
created_users.append(username)
# Add user to groups if specified
for group in config["groups"]:
try:
await nc_client.users.add_user_to_group(username, group)
logger.info(f"Added {username} to group {group}")
except Exception as e:
logger.warning(f"Error adding {username} to group {group}: {e}")
except Exception as e:
# User might already exist, that's okay
logger.warning(
@@ -1046,7 +1094,7 @@ async def _get_oauth_token_for_user(
return access_token
# OAuth token retrieval fixture - parallel locally, sequential in CI
# Parallel token retrieval fixture - fetches all OAuth tokens concurrently
@pytest.fixture(scope="session")
async def all_oauth_tokens(
anyio_backend,
@@ -1056,13 +1104,13 @@ async def all_oauth_tokens(
oauth_callback_server,
) -> dict[str, str]:
"""
Fetch OAuth tokens for all test users.
In CI (GitHub Actions), fetches sequentially to reduce load on Nextcloud.
Locally, fetches in parallel for speed.
Fetch OAuth tokens for all test users in parallel for speed.
Returns a dict mapping username to OAuth access token.
Uses the real callback server with state parameters for reliable token acquisition.
This is significantly faster than fetching tokens sequentially.
Now uses the real callback server with state parameters for reliable
concurrent token acquisition without race conditions.
"""
import asyncio
import time
@@ -1071,68 +1119,47 @@ async def all_oauth_tokens(
auth_states, callback_url = oauth_callback_server
start_time = time.time()
is_ci = os.getenv("GITHUB_ACTIONS") == "true"
mode = "sequentially" if is_ci else "in parallel"
logger.info(f"Fetching OAuth tokens for all users {mode} (CI={is_ci})...")
logger.info("Fetching OAuth tokens for all users in parallel...")
logger.info(f"Using callback server at {callback_url} with state-based correlation")
async def get_token_with_delay(username: str, config: dict, delay: float):
"""Get token for a user after a small delay to stagger requests."""
if delay > 0:
await asyncio.sleep(delay)
return await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
}
# Run all token fetches concurrently
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# Build result dict, handling any errors
tokens = {}
if is_ci:
# Sequential execution in CI to reduce Nextcloud load
logger.info("Running in CI: using sequential OAuth token acquisition")
for username, config in test_users_setup.items():
logger.info(f"Fetching OAuth token for {username}...")
tokens[username] = await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Add delay between users to give Nextcloud breathing room
await asyncio.sleep(1.0)
else:
# Parallel execution locally for speed
logger.info("Running locally: using parallel OAuth token acquisition")
async def get_token_with_delay(username: str, config: dict, delay: float):
"""Get token for a user after a small delay to stagger requests."""
if delay > 0:
await asyncio.sleep(delay)
return await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
}
# Run all token fetches concurrently
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# Build result dict, handling any errors
for username, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
logger.error(f"Failed to get OAuth token for {username}: {result}")
raise result
tokens[username] = result
for username, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
logger.error(f"Failed to get OAuth token for {username}: {result}")
raise result
tokens[username] = result
elapsed = time.time() - start_time
logger.info(
f"Successfully fetched {len(tokens)} OAuth tokens {mode} "
f"Successfully fetched {len(tokens)} OAuth tokens in parallel "
f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)"
)
return tokens
# Session-scoped OAuth token fixtures
# Session-scoped OAuth token fixtures - now use the parallel fixture
@pytest.fixture(scope="session")
async def alice_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for alice (cached for session). Uses shared OAuth client."""
@@ -1145,6 +1172,18 @@ async def bob_oauth_token(anyio_backend, all_oauth_tokens) -> str:
return all_oauth_tokens["bob"]
@pytest.fixture(scope="session")
async def charlie_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for charlie (cached for session). Uses shared OAuth client."""
return all_oauth_tokens["charlie"]
@pytest.fixture(scope="session")
async def diana_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for diana (cached for session). Uses shared OAuth client."""
return all_oauth_tokens["diana"]
@pytest.fixture(scope="session")
async def alice_mcp_client(
anyio_backend,
@@ -1172,6 +1211,34 @@ async def bob_mcp_client(
yield session
@pytest.fixture(scope="session")
async def charlie_mcp_client(
anyio_backend,
charlie_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=charlie_oauth_token,
client_name="Charlie MCP",
):
yield session
@pytest.fixture(scope="session")
async def diana_mcp_client(
anyio_backend,
diana_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as diana (no-access role)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=diana_oauth_token,
client_name="Diana MCP",
):
yield session
# Test user/group fixtures for clean test isolation
@pytest.fixture
async def test_user(nc_client: NextcloudClient):