test: Migrate load test benchmark scripts to anyio

Remove unused redis container
This commit is contained in:
Chris Coutinho
2025-10-18 22:40:50 +02:00
parent 6158a890af
commit 2f805e54b7
4 changed files with 128 additions and 185 deletions
-7
View File
@@ -14,19 +14,12 @@ services:
- MYSQL_DATABASE=nextcloud
- MYSQL_USER=nextcloud
# Note: Redis is an external service. You can find more information about the configuration here:
# https://hub.docker.com/_/redis
redis:
image: docker.io/library/redis:alpine@sha256:59b6e694653476de2c992937ebe1c64182af4728e54bb49e9b7a6c26614d8933
restart: always
app:
image: docker.io/library/nextcloud:32.0.0@sha256:3e70e4dfe882ef44738fdc30d9896fb07c12febb27c4a1177e3d63dc0004a0b4
restart: always
ports:
- 0.0.0.0:8080:80
depends_on:
- redis
- db
volumes:
- nextcloud:/var/www/html
+58 -125
View File
@@ -854,11 +854,9 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
"""
Create test users for multi-user OAuth testing.
Creates four test users:
Creates two test users to reduce CI resource usage:
- alice: Owner role, creates resources
- bob: Viewer role, read-only access
- charlie: Editor role, can edit (in 'editors' group)
- diana: No-access role, no shares
"""
test_user_configs = {
"alice": {
@@ -873,50 +871,12 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
"display_name": "Bob Viewer",
"groups": [],
},
"charlie": {
"password": "CharlieSecurePass789!",
"email": "charlie@example.com",
"display_name": "Charlie Editor",
"groups": ["editors"],
},
"diana": {
"password": "DianaSecurePass012!",
"email": "diana@example.com",
"display_name": "Diana NoAccess",
"groups": [],
},
}
logger.info("Creating test users for multi-user OAuth testing...")
created_users = []
try:
# Create the 'editors' group first (charlie needs it)
try:
# Use admin nc_client to create the group via User API
# First, try to create it (will fail if exists, but that's okay)
async with httpx.AsyncClient() as http_client:
base_url = str(nc_client._client.base_url)
# Get password from environment since nc_client doesn't expose it
password = os.getenv("NEXTCLOUD_PASSWORD")
response = await http_client.post(
f"{base_url}/ocs/v2.php/cloud/groups",
auth=(nc_client.username, password),
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={"groupid": "editors"},
)
if response.status_code in [
200,
409,
]: # 200 = created, 409 = already exists
logger.info("Editors group ready")
else:
logger.warning(
f"Group creation returned {response.status_code}: {response.text}"
)
except Exception as e:
logger.warning(f"Error creating editors group (may already exist): {e}")
# Create each test user
for username, config in test_user_configs.items():
try:
@@ -929,14 +889,6 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient):
logger.info(f"Created test user: {username}")
created_users.append(username)
# Add user to groups if specified
for group in config["groups"]:
try:
await nc_client.users.add_user_to_group(username, group)
logger.info(f"Added {username} to group {group}")
except Exception as e:
logger.warning(f"Error adding {username} to group {group}: {e}")
except Exception as e:
# User might already exist, that's okay
logger.warning(
@@ -1094,7 +1046,7 @@ async def _get_oauth_token_for_user(
return access_token
# Parallel token retrieval fixture - fetches all OAuth tokens concurrently
# OAuth token retrieval fixture - parallel locally, sequential in CI
@pytest.fixture(scope="session")
async def all_oauth_tokens(
anyio_backend,
@@ -1104,13 +1056,13 @@ async def all_oauth_tokens(
oauth_callback_server,
) -> dict[str, str]:
"""
Fetch OAuth tokens for all test users in parallel for speed.
Fetch OAuth tokens for all test users.
In CI (GitHub Actions), fetches sequentially to reduce load on Nextcloud.
Locally, fetches in parallel for speed.
Returns a dict mapping username to OAuth access token.
This is significantly faster than fetching tokens sequentially.
Now uses the real callback server with state parameters for reliable
concurrent token acquisition without race conditions.
Uses the real callback server with state parameters for reliable token acquisition.
"""
import asyncio
import time
@@ -1119,47 +1071,68 @@ async def all_oauth_tokens(
auth_states, callback_url = oauth_callback_server
start_time = time.time()
logger.info("Fetching OAuth tokens for all users in parallel...")
is_ci = os.getenv("GITHUB_ACTIONS") == "true"
mode = "sequentially" if is_ci else "in parallel"
logger.info(f"Fetching OAuth tokens for all users {mode} (CI={is_ci})...")
logger.info(f"Using callback server at {callback_url} with state-based correlation")
async def get_token_with_delay(username: str, config: dict, delay: float):
"""Get token for a user after a small delay to stagger requests."""
if delay > 0:
await asyncio.sleep(delay)
return await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
}
# Run all token fetches concurrently
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# Build result dict, handling any errors
tokens = {}
for username, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
logger.error(f"Failed to get OAuth token for {username}: {result}")
raise result
tokens[username] = result
if is_ci:
# Sequential execution in CI to reduce Nextcloud load
logger.info("Running in CI: using sequential OAuth token acquisition")
for username, config in test_users_setup.items():
logger.info(f"Fetching OAuth token for {username}...")
tokens[username] = await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Add delay between users to give Nextcloud breathing room
await asyncio.sleep(1.0)
else:
# Parallel execution locally for speed
logger.info("Running locally: using parallel OAuth token acquisition")
async def get_token_with_delay(username: str, config: dict, delay: float):
"""Get token for a user after a small delay to stagger requests."""
if delay > 0:
await asyncio.sleep(delay)
return await _get_oauth_token_for_user(
browser,
shared_oauth_client_credentials,
auth_states,
username,
config["password"],
)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
}
# Run all token fetches concurrently
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# Build result dict, handling any errors
for username, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
logger.error(f"Failed to get OAuth token for {username}: {result}")
raise result
tokens[username] = result
elapsed = time.time() - start_time
logger.info(
f"Successfully fetched {len(tokens)} OAuth tokens in parallel "
f"Successfully fetched {len(tokens)} OAuth tokens {mode} "
f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)"
)
return tokens
# Session-scoped OAuth token fixtures - now use the parallel fixture
# Session-scoped OAuth token fixtures
@pytest.fixture(scope="session")
async def alice_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for alice (cached for session). Uses shared OAuth client."""
@@ -1172,18 +1145,6 @@ async def bob_oauth_token(anyio_backend, all_oauth_tokens) -> str:
return all_oauth_tokens["bob"]
@pytest.fixture(scope="session")
async def charlie_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for charlie (cached for session). Uses shared OAuth client."""
return all_oauth_tokens["charlie"]
@pytest.fixture(scope="session")
async def diana_oauth_token(anyio_backend, all_oauth_tokens) -> str:
"""OAuth token for diana (cached for session). Uses shared OAuth client."""
return all_oauth_tokens["diana"]
@pytest.fixture(scope="session")
async def alice_mcp_client(
anyio_backend,
@@ -1211,34 +1172,6 @@ async def bob_mcp_client(
yield session
@pytest.fixture(scope="session")
async def charlie_mcp_client(
anyio_backend,
charlie_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=charlie_oauth_token,
client_name="Charlie MCP",
):
yield session
@pytest.fixture(scope="session")
async def diana_mcp_client(
anyio_backend,
diana_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as diana (no-access role)."""
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=diana_oauth_token,
client_name="Diana MCP",
):
yield session
# Test user/group fixtures for clean test isolation
@pytest.fixture
async def test_user(nc_client: NextcloudClient):
+15 -21
View File
@@ -7,7 +7,6 @@ Usage:
uv run python -m tests.load.benchmark -c 50 -d 300 --output results.json
"""
import asyncio
import json
import logging
import signal
@@ -254,7 +253,7 @@ async def wait_for_mcp_server(url: str, max_attempts: int = 10) -> bool:
except Exception as e:
if attempt < max_attempts:
logger.debug(f"Attempt {attempt}/{max_attempts}: {e}")
await asyncio.sleep(2)
await anyio.sleep(2)
else:
logger.error(f"MCP server not ready after {max_attempts} attempts")
return False
@@ -267,7 +266,7 @@ async def benchmark_worker(
url: str,
duration: float,
metrics: BenchmarkMetrics,
stop_event: asyncio.Event,
stop_event: anyio.Event,
):
"""Single worker that runs operations for the specified duration."""
logger.info(f"Worker {worker_id} starting...")
@@ -293,7 +292,7 @@ async def benchmark_worker(
operation_count += 1
# Small delay to prevent overwhelming the server
await asyncio.sleep(0.01)
await anyio.sleep(0.01)
# Cleanup
await ops.cleanup()
@@ -312,7 +311,7 @@ async def run_benchmark(
) -> BenchmarkMetrics:
"""Run the benchmark with specified parameters."""
metrics = BenchmarkMetrics()
stop_event = asyncio.Event()
stop_event = anyio.Event()
# Setup signal handlers for graceful shutdown
def signal_handler(sig, frame):
@@ -331,27 +330,22 @@ async def run_benchmark(
# Warmup period
if warmup > 0:
print("Warming up...")
await asyncio.sleep(warmup)
await anyio.sleep(warmup)
# Start metrics collection
metrics.start()
# Create and run workers
workers = [
benchmark_worker(i, url, duration, metrics, stop_event)
for i in range(concurrency)
]
# Create and run workers using anyio task groups
async with anyio.create_task_group() as tg:
# Start all workers
for i in range(concurrency):
tg.start_soon(benchmark_worker, i, url, duration, metrics, stop_event)
# Show progress
progress_task = asyncio.create_task(show_progress(duration, metrics, stop_event))
# Show progress
tg.start_soon(show_progress, duration, metrics, stop_event)
# Wait for all workers to complete
await asyncio.gather(*workers, return_exceptions=True)
# Stop metrics and progress
# Stop metrics (tasks already completed when task group exits)
metrics.stop()
stop_event.set()
await progress_task
return metrics
@@ -359,7 +353,7 @@ async def run_benchmark(
async def show_progress(
duration: float,
metrics: BenchmarkMetrics,
stop_event: asyncio.Event,
stop_event: anyio.Event,
):
"""Show real-time progress during benchmark."""
start_time = time.time()
@@ -387,7 +381,7 @@ async def show_progress(
flush=True,
)
await asyncio.sleep(0.5)
await anyio.sleep(0.5)
print() # New line after progress
+55 -32
View File
@@ -10,7 +10,6 @@ Usage:
uv run python -m tests.load.oauth_benchmark -u 10 -d 300 --workload sharing
"""
import asyncio
import json
import logging
import os
@@ -223,7 +222,7 @@ async def oauth_benchmark_worker(
workload: MixedOAuthWorkload,
duration: float,
metrics: OAuthBenchmarkMetrics,
stop_event: asyncio.Event,
stop_event: anyio.Event,
):
"""
Single worker executing operations for one user.
@@ -258,13 +257,13 @@ async def oauth_benchmark_worker(
operation_count += 1
# Small delay to prevent overwhelming the server
await asyncio.sleep(0.05)
await anyio.sleep(0.05)
logger.info(
f"Worker for {user_wrapper.username} completed {operation_count} operations"
)
except asyncio.CancelledError:
except anyio.get_cancelled_exc_class():
# Handle task cancellation gracefully (e.g., during benchmark shutdown)
logger.info(
f"Worker for {user_wrapper.username} was cancelled "
@@ -278,7 +277,7 @@ async def oauth_benchmark_worker(
async def show_progress(
duration: float,
metrics: OAuthBenchmarkMetrics,
stop_event: asyncio.Event,
stop_event: anyio.Event,
):
"""Show real-time progress during benchmark."""
start_time = time.time()
@@ -306,7 +305,7 @@ async def show_progress(
flush=True,
)
await asyncio.sleep(0.5)
await anyio.sleep(0.5)
print() # New line after progress
@@ -338,7 +337,7 @@ async def run_oauth_benchmark(
OAuthBenchmarkMetrics with results
"""
metrics = OAuthBenchmarkMetrics()
stop_event = asyncio.Event()
stop_event = anyio.Event()
created_users: list[str] = []
callback_server: OAuthCallbackServer | None = None
user_pool: OAuthUserPool | None = None
@@ -437,12 +436,23 @@ async def run_oauth_benchmark(
browser = await browser_launcher.launch(headless=not headed)
try:
# Create all users concurrently
tasks = [
create_user_task(i, browser, callback_server.auth_states)
for i in range(num_users)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Create all users concurrently using anyio task groups
results = []
async def run_and_collect(i: int):
"""Wrapper to collect results from tasks."""
try:
result = await create_user_task(
i, browser, callback_server.auth_states
)
results.append(result)
except Exception as e:
logger.error(f"User creation task failed: {e}")
results.append(e)
async with anyio.create_task_group() as tg:
for i in range(num_users):
tg.start_soon(run_and_collect, i)
# Process results
for result in results:
@@ -484,13 +494,21 @@ async def run_oauth_benchmark(
logger.error(f"Failed to create session for {username}: {e}")
return None
# Create all sessions concurrently
session_tasks = [
create_session_task(username) for username in created_users
]
session_results = await asyncio.gather(
*session_tasks, return_exceptions=True
)
# Create all sessions concurrently using anyio task groups
session_results = []
async def run_and_collect_session(username: str):
"""Wrapper to collect session results from tasks."""
try:
result = await create_session_task(username)
session_results.append(result)
except Exception as e:
logger.error(f"Session creation task failed: {e}")
session_results.append(e)
async with anyio.create_task_group() as tg:
for username in created_users:
tg.start_soon(run_and_collect_session, username)
# Process results
for result in session_results:
@@ -508,7 +526,7 @@ async def run_oauth_benchmark(
# Warmup period
if warmup > 0:
print(f"Warmup period: {warmup}s...")
await asyncio.sleep(warmup)
await anyio.sleep(warmup)
print()
# Start benchmark
@@ -518,21 +536,26 @@ async def run_oauth_benchmark(
metrics.start()
# Create workload and workers
# Create workload and workers using anyio task groups
workload = MixedOAuthWorkload(user_wrappers)
workers = [
oauth_benchmark_worker(wrapper, workload, duration, metrics, stop_event)
for wrapper in user_wrappers
]
# Run workers with progress display
progress_task = asyncio.create_task(
show_progress(duration, metrics, stop_event)
)
await asyncio.gather(*workers, return_exceptions=True)
stop_event.set()
await progress_task
async with anyio.create_task_group() as tg:
# Start all workers
for wrapper in user_wrappers:
tg.start_soon(
oauth_benchmark_worker,
wrapper,
workload,
duration,
metrics,
stop_event,
)
# Show progress
tg.start_soon(show_progress, duration, metrics, stop_event)
# Tasks already completed when task group exits
metrics.stop()
print(f"\n{'=' * 80}")