test: Add load testing framework

This commit is contained in:
Chris Coutinho
2025-10-18 13:59:20 +02:00
parent 90b4b2a038
commit 955ad78f13
4 changed files with 839 additions and 0 deletions
+47
View File
@@ -19,6 +19,53 @@ uv run pytest --cov
uv run pytest -m "not integration"
```
### Load Testing
```bash
# Run benchmark with default settings (10 workers, 30 seconds)
uv run python -m tests.load.benchmark
# Quick test with custom concurrency and duration
uv run python -m tests.load.benchmark --concurrency 20 --duration 60
# Extended load test (50 workers for 5 minutes)
uv run python -m tests.load.benchmark -c 50 -d 300
# Export results to JSON for analysis
uv run python -m tests.load.benchmark -c 20 -d 60 --output results.json
# Test OAuth server on port 8001
uv run python -m tests.load.benchmark --url http://127.0.0.1:8001/mcp
# Verbose mode with detailed logging
uv run python -m tests.load.benchmark -c 10 -d 30 --verbose
```
**Load Testing Features:**
- **Mixed workload** simulating realistic MCP usage (40% reads, 20% writes, 15% search, 25% other operations)
- **Real-time progress** bar with live RPS and error counts
- **Detailed metrics**:
- Throughput (requests/second)
- Latency percentiles (p50, p90, p95, p99)
- Per-operation breakdown
- Error rates and types
- **Automatic cleanup** of test data
- **JSON export** for CI/CD integration
- **Server health checks** before starting
**Understanding Results:**
- **Requests/Second (RPS)**: Higher is better. Expected baseline: 50-200 RPS for mixed workload
- **Latency**:
- p50 (median): Should be <100ms for most operations
- p95: Should be <500ms
- p99: Should be <1000ms
- **Error Rate**: Should be <1% under normal load
**Common Bottlenecks:**
1. Nextcloud backend API response times (most common)
2. Database connection limits
3. HTTP client connection pooling
4. Network I/O between containers
### Code Quality
```bash
# Format and lint code
+1
View File
@@ -0,0 +1 @@
"""Load testing utilities for Nextcloud MCP Server."""
+509
View File
@@ -0,0 +1,509 @@
#!/usr/bin/env python3
"""
Load testing benchmark for Nextcloud MCP Server.
Usage:
uv run python -m tests.load.benchmark --concurrency 10 --duration 30
uv run python -m tests.load.benchmark -c 50 -d 300 --output results.json
"""
import asyncio
import json
import logging
import signal
import statistics
import sys
import time
from collections import Counter
from contextlib import asynccontextmanager
from typing import Any
import click
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from tests.load.workloads import MixedWorkload, OperationResult, WorkloadOperations
logging.basicConfig(
level=logging.WARNING, format="%(levelname)s [%(asctime)s] %(name)s - %(message)s"
)
logger = logging.getLogger(__name__)
class BenchmarkMetrics:
"""Collect and analyze benchmark metrics."""
def __init__(self):
self.results: list[OperationResult] = []
self.start_time: float | None = None
self.end_time: float | None = None
self._operation_counts: Counter = Counter()
self._operation_errors: Counter = Counter()
def add_result(self, result: OperationResult):
"""Add a single operation result."""
self.results.append(result)
self._operation_counts[result.operation] += 1
if not result.success:
self._operation_errors[result.operation] += 1
def start(self):
"""Mark the start of the benchmark."""
self.start_time = time.time()
def stop(self):
"""Mark the end of the benchmark."""
self.end_time = time.time()
@property
def duration(self) -> float:
"""Total benchmark duration in seconds."""
if self.start_time is None or self.end_time is None:
return 0.0
return self.end_time - self.start_time
@property
def total_requests(self) -> int:
"""Total number of requests made."""
return len(self.results)
@property
def successful_requests(self) -> int:
"""Number of successful requests."""
return sum(1 for r in self.results if r.success)
@property
def failed_requests(self) -> int:
"""Number of failed requests."""
return sum(1 for r in self.results if not r.success)
@property
def error_rate(self) -> float:
"""Error rate as a percentage."""
if self.total_requests == 0:
return 0.0
return (self.failed_requests / self.total_requests) * 100
@property
def requests_per_second(self) -> float:
"""Average requests per second."""
if self.duration == 0:
return 0.0
return self.total_requests / self.duration
def latency_stats(self) -> dict[str, float]:
"""Calculate latency statistics."""
if not self.results:
return {
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"median": 0.0,
"p90": 0.0,
"p95": 0.0,
"p99": 0.0,
}
durations = [r.duration for r in self.results]
sorted_durations = sorted(durations)
def percentile(data: list[float], p: float) -> float:
k = (len(data) - 1) * p
f = int(k)
c = f + 1
if c >= len(data):
return data[-1]
return data[f] + (k - f) * (data[c] - data[f])
return {
"min": min(durations),
"max": max(durations),
"mean": statistics.mean(durations),
"median": statistics.median(durations),
"p90": percentile(sorted_durations, 0.90),
"p95": percentile(sorted_durations, 0.95),
"p99": percentile(sorted_durations, 0.99),
}
def operation_breakdown(self) -> dict[str, dict[str, Any]]:
"""Get per-operation statistics."""
breakdown = {}
for op_name in self._operation_counts:
op_results = [r for r in self.results if r.operation == op_name]
op_durations = [r.duration for r in op_results if r.success]
if op_durations:
sorted_durations = sorted(op_durations)
p50 = statistics.median(sorted_durations)
p95_idx = int(len(sorted_durations) * 0.95)
p95 = sorted_durations[min(p95_idx, len(sorted_durations) - 1)]
else:
p50 = p95 = 0.0
breakdown[op_name] = {
"count": self._operation_counts[op_name],
"errors": self._operation_errors[op_name],
"success_rate": (
(self._operation_counts[op_name] - self._operation_errors[op_name])
/ self._operation_counts[op_name]
* 100
),
"p50_latency": p50,
"p95_latency": p95,
}
return breakdown
def to_dict(self) -> dict[str, Any]:
"""Convert metrics to dictionary for JSON export."""
return {
"summary": {
"duration": self.duration,
"total_requests": self.total_requests,
"successful_requests": self.successful_requests,
"failed_requests": self.failed_requests,
"error_rate": self.error_rate,
"requests_per_second": self.requests_per_second,
},
"latency": self.latency_stats(),
"operations": self.operation_breakdown(),
}
def print_report(self):
"""Print human-readable benchmark report."""
print("\n" + "=" * 80)
print("BENCHMARK RESULTS")
print("=" * 80)
print(f"\nDuration: {self.duration:.2f}s")
print(f"Total Requests: {self.total_requests}")
print(f"Successful: {self.successful_requests}")
print(f"Failed: {self.failed_requests}")
print(f"Error Rate: {self.error_rate:.2f}%")
print(f"Requests/Second: {self.requests_per_second:.2f}")
print("\n" + "-" * 80)
print("LATENCY (seconds)")
print("-" * 80)
latency = self.latency_stats()
print(f"Min: {latency['min']:.4f}s")
print(f"Mean: {latency['mean']:.4f}s")
print(f"Median: {latency['median']:.4f}s")
print(f"P90: {latency['p90']:.4f}s")
print(f"P95: {latency['p95']:.4f}s")
print(f"P99: {latency['p99']:.4f}s")
print(f"Max: {latency['max']:.4f}s")
print("\n" + "-" * 80)
print("OPERATION BREAKDOWN")
print("-" * 80)
print(
f"{'Operation':<25} {'Count':>8} {'Errors':>8} {'Success':>9} {'P50':>10} {'P95':>10}"
)
print("-" * 80)
breakdown = self.operation_breakdown()
for op_name, stats in sorted(breakdown.items()):
print(
f"{op_name:<25} {stats['count']:>8} {stats['errors']:>8} "
f"{stats['success_rate']:>8.1f}% {stats['p50_latency']:>9.4f}s {stats['p95_latency']:>9.4f}s"
)
print("=" * 80 + "\n")
@asynccontextmanager
async def create_mcp_session(url: str):
"""Create an MCP client session with proper cleanup."""
logger.info(f"Creating MCP client session for {url}")
streamable_context = streamablehttp_client(url)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("MCP client session initialized")
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing streamable context: {e}")
async def wait_for_mcp_server(url: str, max_attempts: int = 10) -> bool:
"""Wait for MCP server to be ready."""
logger.info(f"Waiting for MCP server at {url}...")
for attempt in range(1, max_attempts + 1):
try:
async with create_mcp_session(url) as session:
# Try to get capabilities
await session.read_resource("nc://capabilities")
logger.info("MCP server is ready")
return True
except Exception as e:
if attempt < max_attempts:
logger.debug(f"Attempt {attempt}/{max_attempts}: {e}")
await asyncio.sleep(2)
else:
logger.error(f"MCP server not ready after {max_attempts} attempts")
return False
return False
async def benchmark_worker(
worker_id: int,
url: str,
duration: float,
metrics: BenchmarkMetrics,
stop_event: asyncio.Event,
):
"""Single worker that runs operations for the specified duration."""
logger.info(f"Worker {worker_id} starting...")
try:
async with create_mcp_session(url) as session:
ops = WorkloadOperations(session)
workload = MixedWorkload(ops)
# Warmup
await workload.warmup(count=5)
# Run operations until duration expires or stop event is set
start_time = time.time()
operation_count = 0
while not stop_event.is_set():
if time.time() - start_time >= duration:
break
result = await workload.run_operation()
metrics.add_result(result)
operation_count += 1
# Small delay to prevent overwhelming the server
await asyncio.sleep(0.01)
# Cleanup
await ops.cleanup()
logger.info(f"Worker {worker_id} completed {operation_count} operations")
except Exception as e:
logger.error(f"Worker {worker_id} error: {e}", exc_info=True)
async def run_benchmark(
url: str,
concurrency: int,
duration: float,
warmup: float = 5.0,
) -> BenchmarkMetrics:
"""Run the benchmark with specified parameters."""
metrics = BenchmarkMetrics()
stop_event = asyncio.Event()
# Setup signal handlers for graceful shutdown
def signal_handler(sig, frame):
logger.warning("Received interrupt signal, stopping benchmark...")
stop_event.set()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print(
f"\nStarting benchmark with {concurrency} concurrent workers for {duration}s..."
)
print(f"Target: {url}")
print(f"Warmup period: {warmup}s\n")
# Warmup period
if warmup > 0:
print("Warming up...")
await asyncio.sleep(warmup)
# Start metrics collection
metrics.start()
# Create and run workers
workers = [
benchmark_worker(i, url, duration, metrics, stop_event)
for i in range(concurrency)
]
# Show progress
progress_task = asyncio.create_task(show_progress(duration, metrics, stop_event))
# Wait for all workers to complete
await asyncio.gather(*workers, return_exceptions=True)
# Stop metrics and progress
metrics.stop()
stop_event.set()
await progress_task
return metrics
async def show_progress(
duration: float,
metrics: BenchmarkMetrics,
stop_event: asyncio.Event,
):
"""Show real-time progress during benchmark."""
start_time = time.time()
while not stop_event.is_set():
elapsed = time.time() - start_time
if elapsed >= duration:
break
# Calculate progress
progress = min(elapsed / duration * 100, 100)
rps = metrics.total_requests / max(elapsed, 0.1)
# Print progress bar
bar_length = 40
filled = int(bar_length * progress / 100)
bar = "" * filled + "" * (bar_length - filled)
print(
f"\r[{bar}] {progress:5.1f}% | "
f"Requests: {metrics.total_requests:6d} | "
f"RPS: {rps:6.1f} | "
f"Errors: {metrics.failed_requests:4d}",
end="",
flush=True,
)
await asyncio.sleep(0.5)
print() # New line after progress
@click.command()
@click.option(
"--concurrency",
"-c",
type=int,
default=10,
show_default=True,
help="Number of concurrent workers",
)
@click.option(
"--duration",
"-d",
type=float,
default=30.0,
show_default=True,
help="Test duration in seconds",
)
@click.option(
"--warmup",
"-w",
type=float,
default=5.0,
show_default=True,
help="Warmup duration before collecting metrics (seconds)",
)
@click.option(
"--url",
"-u",
default="http://127.0.0.1:8000/mcp",
show_default=True,
help="MCP server URL",
)
@click.option(
"--output",
"-o",
type=click.Path(),
help="Output file for JSON results (optional)",
)
@click.option(
"--wait-for-server/--no-wait",
default=True,
show_default=True,
help="Wait for MCP server to be ready before starting",
)
@click.option(
"--verbose",
"-v",
is_flag=True,
help="Enable verbose logging",
)
def main(
concurrency: int,
duration: float,
warmup: float,
url: str,
output: str | None,
wait_for_server: bool,
verbose: bool,
):
"""
Load testing benchmark for Nextcloud MCP Server.
Runs a mixed workload of realistic MCP operations against the server
and reports detailed performance metrics.
Examples:
# Quick 30-second test with 10 workers
uv run python -m tests.load.benchmark --concurrency 10 --duration 30
# Extended test with 50 workers for 5 minutes
uv run python -m tests.load.benchmark -c 50 -d 300
# Export results to JSON
uv run python -m tests.load.benchmark -c 20 -d 60 --output results.json
# Test OAuth server on port 8001
uv run python -m tests.load.benchmark --url http://127.0.0.1:8001/mcp
"""
if verbose:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("tests.load").setLevel(logging.DEBUG)
async def run():
# Wait for server if requested
if wait_for_server:
if not await wait_for_mcp_server(url):
print("ERROR: MCP server is not ready", file=sys.stderr)
sys.exit(1)
# Run benchmark
metrics = await run_benchmark(url, concurrency, duration, warmup)
# Print report
metrics.print_report()
# Export to JSON if requested
if output:
with open(output, "w") as f:
json.dump(metrics.to_dict(), f, indent=2)
print(f"Results exported to: {output}")
try:
asyncio.run(run())
except KeyboardInterrupt:
print("\nBenchmark interrupted by user")
sys.exit(130)
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
if verbose:
raise
sys.exit(1)
if __name__ == "__main__":
main()
+282
View File
@@ -0,0 +1,282 @@
"""
Workload definitions for load testing the MCP server.
Defines realistic operation mixes and individual operation functions.
"""
import logging
import random
import time
import uuid
from mcp import ClientSession
logger = logging.getLogger(__name__)
class OperationResult:
"""Result of a single operation execution."""
def __init__(
self,
operation: str,
success: bool,
duration: float,
error: str | None = None,
):
self.operation = operation
self.success = success
self.duration = duration
self.error = error
self.timestamp = time.time()
class WorkloadOperations:
"""Collection of MCP operations for load testing."""
def __init__(self, session: ClientSession):
self.session = session
self._created_notes: list[int] = []
self._created_boards: list[int] = []
async def get_capabilities(self) -> OperationResult:
"""Fetch server capabilities (lightweight operation)."""
start = time.time()
try:
await self.session.read_resource("nc://capabilities")
duration = time.time() - start
return OperationResult("get_capabilities", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("get_capabilities", False, duration, str(e))
async def list_notes(self) -> OperationResult:
"""List all notes (read operation)."""
start = time.time()
try:
await self.session.call_tool("nc_notes_search_notes", {"query": ""})
duration = time.time() - start
return OperationResult("list_notes", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("list_notes", False, duration, str(e))
async def search_notes(self, query: str = "test") -> OperationResult:
"""Search notes by query (read operation with filtering)."""
start = time.time()
try:
await self.session.call_tool("nc_notes_search_notes", {"query": query})
duration = time.time() - start
return OperationResult("search_notes", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("search_notes", False, duration, str(e))
async def create_note(self) -> OperationResult:
"""Create a new note (write operation)."""
start = time.time()
unique_id = uuid.uuid4().hex[:8]
try:
result = await self.session.call_tool(
"nc_notes_create_note",
{
"title": f"Load Test Note {unique_id}",
"content": f"Content for load test note {unique_id}",
"category": "LoadTesting",
},
)
duration = time.time() - start
# Track created note ID for cleanup
if result and len(result.content) > 0:
content = result.content[0]
if hasattr(content, "text"):
import json
note_data = json.loads(content.text)
note_id = note_data.get("id")
if note_id:
self._created_notes.append(note_id)
return OperationResult("create_note", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("create_note", False, duration, str(e))
async def get_note(self, note_id: int) -> OperationResult:
"""Get a specific note by ID (read operation)."""
start = time.time()
try:
await self.session.call_tool("nc_notes_get_note", {"note_id": note_id})
duration = time.time() - start
return OperationResult("get_note", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("get_note", False, duration, str(e))
async def update_note(self, note_id: int, etag: str) -> OperationResult:
"""Update an existing note (write operation)."""
start = time.time()
try:
await self.session.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": etag,
"title": f"Updated Note {note_id}",
"content": f"Updated content at {time.time()}",
"category": "LoadTesting",
},
)
duration = time.time() - start
return OperationResult("update_note", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("update_note", False, duration, str(e))
async def delete_note(self, note_id: int) -> OperationResult:
"""Delete a note (write operation)."""
start = time.time()
try:
await self.session.call_tool("nc_notes_delete_note", {"note_id": note_id})
duration = time.time() - start
# Remove from tracking
if note_id in self._created_notes:
self._created_notes.remove(note_id)
return OperationResult("delete_note", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("delete_note", False, duration, str(e))
async def list_webdav_files(self, path: str = "/") -> OperationResult:
"""List files via WebDAV (read operation)."""
start = time.time()
try:
await self.session.call_tool("nc_webdav_list", {"path": path})
duration = time.time() - start
return OperationResult("list_webdav_files", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("list_webdav_files", False, duration, str(e))
async def list_calendars(self) -> OperationResult:
"""List calendars (read operation)."""
start = time.time()
try:
await self.session.call_tool("nc_calendar_list_calendars", {})
duration = time.time() - start
return OperationResult("list_calendars", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("list_calendars", False, duration, str(e))
async def list_deck_boards(self) -> OperationResult:
"""List deck boards (read operation)."""
start = time.time()
try:
await self.session.call_tool("nc_deck_list_boards", {})
duration = time.time() - start
return OperationResult("list_deck_boards", True, duration)
except Exception as e:
duration = time.time() - start
return OperationResult("list_deck_boards", False, duration, str(e))
async def cleanup(self):
"""Clean up any resources created during testing."""
logger.info(f"Cleaning up {len(self._created_notes)} test notes...")
for note_id in self._created_notes[:]:
try:
await self.delete_note(note_id)
except Exception as e:
logger.warning(f"Failed to delete note {note_id}: {e}")
class MixedWorkload:
"""
Realistic mixed workload simulating typical MCP server usage.
Operation distribution:
- 40% Notes read (list/get/search)
- 20% Notes write (create/update/delete)
- 15% Notes search
- 10% WebDAV operations
- 10% Calendar operations
- 5% Other (capabilities, deck)
"""
def __init__(self, operations: WorkloadOperations):
self.ops = operations
# Pre-create some notes for read/update operations
self._warmup_note_ids: list[tuple[int, str]] = []
async def warmup(self, count: int = 10):
"""Create initial notes for read/update operations."""
logger.info(f"Warming up with {count} test notes...")
for _ in range(count):
result = await self.ops.create_note()
if result.success and self.ops._created_notes:
note_id = self.ops._created_notes[-1]
# Get the note to fetch its etag
try:
get_result = await self.ops.session.call_tool(
"nc_notes_get_note", {"note_id": note_id}
)
if get_result and len(get_result.content) > 0:
import json
note_data = json.loads(get_result.content[0].text)
etag = note_data.get("etag", "")
self._warmup_note_ids.append((note_id, etag))
except Exception as e:
logger.warning(f"Failed to get etag for note {note_id}: {e}")
async def run_operation(self) -> OperationResult:
"""Execute one random operation based on the workload distribution."""
rand = random.random()
# 40% reads (list/get/search)
if rand < 0.40:
op_rand = random.random()
if op_rand < 0.5:
return await self.ops.list_notes()
elif op_rand < 0.8 and self._warmup_note_ids:
note_id, _ = random.choice(self._warmup_note_ids)
return await self.ops.get_note(note_id)
else:
return await self.ops.search_notes()
# 20% writes (create/update/delete)
elif rand < 0.60:
op_rand = random.random()
if op_rand < 0.5:
return await self.ops.create_note()
elif op_rand < 0.8 and self._warmup_note_ids:
note_id, etag = random.choice(self._warmup_note_ids)
return await self.ops.update_note(note_id, etag)
elif self.ops._created_notes and len(self.ops._created_notes) > 5:
# Only delete if we have enough notes
note_id = random.choice(self.ops._created_notes)
return await self.ops.delete_note(note_id)
else:
return await self.ops.create_note()
# 15% search
elif rand < 0.75:
queries = ["test", "load", "note", "content", ""]
return await self.ops.search_notes(random.choice(queries))
# 10% WebDAV
elif rand < 0.85:
return await self.ops.list_webdav_files()
# 10% Calendar
elif rand < 0.95:
return await self.ops.list_calendars()
# 5% Other
else:
op_rand = random.random()
if op_rand < 0.5:
return await self.ops.get_capabilities()
else:
return await self.ops.list_deck_boards()