505 lines
15 KiB
Python
505 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Load testing benchmark for Nextcloud MCP Server.
|
|
|
|
Usage:
|
|
uv run python -m tests.load.benchmark --concurrency 10 --duration 30
|
|
uv run python -m tests.load.benchmark -c 50 -d 300 --output results.json
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import signal
|
|
import statistics
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any
|
|
|
|
import anyio
|
|
import click
|
|
from mcp import ClientSession
|
|
from mcp.client.streamable_http import streamablehttp_client
|
|
|
|
from tests.load.workloads import MixedWorkload, OperationResult, WorkloadOperations
|
|
|
|
logging.basicConfig(
|
|
level=logging.WARNING, format="%(levelname)s [%(asctime)s] %(name)s - %(message)s"
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BenchmarkMetrics:
|
|
"""Collect and analyze benchmark metrics."""
|
|
|
|
def __init__(self):
|
|
self.results: list[OperationResult] = []
|
|
self.start_time: float | None = None
|
|
self.end_time: float | None = None
|
|
self._operation_counts: Counter = Counter()
|
|
self._operation_errors: Counter = Counter()
|
|
|
|
def add_result(self, result: OperationResult):
|
|
"""Add a single operation result."""
|
|
self.results.append(result)
|
|
self._operation_counts[result.operation] += 1
|
|
if not result.success:
|
|
self._operation_errors[result.operation] += 1
|
|
|
|
def start(self):
|
|
"""Mark the start of the benchmark."""
|
|
self.start_time = time.time()
|
|
|
|
def stop(self):
|
|
"""Mark the end of the benchmark."""
|
|
self.end_time = time.time()
|
|
|
|
@property
|
|
def duration(self) -> float:
|
|
"""Total benchmark duration in seconds."""
|
|
if self.start_time is None or self.end_time is None:
|
|
return 0.0
|
|
return self.end_time - self.start_time
|
|
|
|
@property
|
|
def total_requests(self) -> int:
|
|
"""Total number of requests made."""
|
|
return len(self.results)
|
|
|
|
@property
|
|
def successful_requests(self) -> int:
|
|
"""Number of successful requests."""
|
|
return sum(1 for r in self.results if r.success)
|
|
|
|
@property
|
|
def failed_requests(self) -> int:
|
|
"""Number of failed requests."""
|
|
return sum(1 for r in self.results if not r.success)
|
|
|
|
@property
|
|
def error_rate(self) -> float:
|
|
"""Error rate as a percentage."""
|
|
if self.total_requests == 0:
|
|
return 0.0
|
|
return (self.failed_requests / self.total_requests) * 100
|
|
|
|
@property
|
|
def requests_per_second(self) -> float:
|
|
"""Average requests per second."""
|
|
if self.duration == 0:
|
|
return 0.0
|
|
return self.total_requests / self.duration
|
|
|
|
def latency_stats(self) -> dict[str, float]:
|
|
"""Calculate latency statistics."""
|
|
if not self.results:
|
|
return {
|
|
"min": 0.0,
|
|
"max": 0.0,
|
|
"mean": 0.0,
|
|
"median": 0.0,
|
|
"p90": 0.0,
|
|
"p95": 0.0,
|
|
"p99": 0.0,
|
|
}
|
|
|
|
durations = [r.duration for r in self.results]
|
|
sorted_durations = sorted(durations)
|
|
|
|
def percentile(data: list[float], p: float) -> float:
|
|
k = (len(data) - 1) * p
|
|
f = int(k)
|
|
c = f + 1
|
|
if c >= len(data):
|
|
return data[-1]
|
|
return data[f] + (k - f) * (data[c] - data[f])
|
|
|
|
return {
|
|
"min": min(durations),
|
|
"max": max(durations),
|
|
"mean": statistics.mean(durations),
|
|
"median": statistics.median(durations),
|
|
"p90": percentile(sorted_durations, 0.90),
|
|
"p95": percentile(sorted_durations, 0.95),
|
|
"p99": percentile(sorted_durations, 0.99),
|
|
}
|
|
|
|
def operation_breakdown(self) -> dict[str, dict[str, Any]]:
|
|
"""Get per-operation statistics."""
|
|
breakdown = {}
|
|
for op_name in self._operation_counts:
|
|
op_results = [r for r in self.results if r.operation == op_name]
|
|
op_durations = [r.duration for r in op_results if r.success]
|
|
|
|
if op_durations:
|
|
sorted_durations = sorted(op_durations)
|
|
p50 = statistics.median(sorted_durations)
|
|
p95_idx = int(len(sorted_durations) * 0.95)
|
|
p95 = sorted_durations[min(p95_idx, len(sorted_durations) - 1)]
|
|
else:
|
|
p50 = p95 = 0.0
|
|
|
|
breakdown[op_name] = {
|
|
"count": self._operation_counts[op_name],
|
|
"errors": self._operation_errors[op_name],
|
|
"success_rate": (
|
|
(self._operation_counts[op_name] - self._operation_errors[op_name])
|
|
/ self._operation_counts[op_name]
|
|
* 100
|
|
),
|
|
"p50_latency": p50,
|
|
"p95_latency": p95,
|
|
}
|
|
|
|
return breakdown
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""Convert metrics to dictionary for JSON export."""
|
|
return {
|
|
"summary": {
|
|
"duration": self.duration,
|
|
"total_requests": self.total_requests,
|
|
"successful_requests": self.successful_requests,
|
|
"failed_requests": self.failed_requests,
|
|
"error_rate": self.error_rate,
|
|
"requests_per_second": self.requests_per_second,
|
|
},
|
|
"latency": self.latency_stats(),
|
|
"operations": self.operation_breakdown(),
|
|
}
|
|
|
|
def print_report(self):
|
|
"""Print human-readable benchmark report."""
|
|
print("\n" + "=" * 80)
|
|
print("BENCHMARK RESULTS")
|
|
print("=" * 80)
|
|
|
|
print(f"\nDuration: {self.duration:.2f}s")
|
|
print(f"Total Requests: {self.total_requests}")
|
|
print(f"Successful: {self.successful_requests}")
|
|
print(f"Failed: {self.failed_requests}")
|
|
print(f"Error Rate: {self.error_rate:.2f}%")
|
|
print(f"Requests/Second: {self.requests_per_second:.2f}")
|
|
|
|
print("\n" + "-" * 80)
|
|
print("LATENCY (seconds)")
|
|
print("-" * 80)
|
|
latency = self.latency_stats()
|
|
print(f"Min: {latency['min']:.4f}s")
|
|
print(f"Mean: {latency['mean']:.4f}s")
|
|
print(f"Median: {latency['median']:.4f}s")
|
|
print(f"P90: {latency['p90']:.4f}s")
|
|
print(f"P95: {latency['p95']:.4f}s")
|
|
print(f"P99: {latency['p99']:.4f}s")
|
|
print(f"Max: {latency['max']:.4f}s")
|
|
|
|
print("\n" + "-" * 80)
|
|
print("OPERATION BREAKDOWN")
|
|
print("-" * 80)
|
|
print(
|
|
f"{'Operation':<25} {'Count':>8} {'Errors':>8} {'Success':>9} {'P50':>10} {'P95':>10}"
|
|
)
|
|
print("-" * 80)
|
|
|
|
breakdown = self.operation_breakdown()
|
|
for op_name, stats in sorted(breakdown.items()):
|
|
print(
|
|
f"{op_name:<25} {stats['count']:>8} {stats['errors']:>8} "
|
|
f"{stats['success_rate']:>8.1f}% {stats['p50_latency']:>9.4f}s {stats['p95_latency']:>9.4f}s"
|
|
)
|
|
|
|
print("=" * 80 + "\n")
|
|
|
|
|
|
@asynccontextmanager
|
|
async def create_mcp_session(url: str):
|
|
"""Create an MCP client session with proper cleanup."""
|
|
logger.info(f"Creating MCP client session for {url}")
|
|
streamable_context = streamablehttp_client(url)
|
|
session_context = None
|
|
|
|
try:
|
|
read_stream, write_stream, _ = await streamable_context.__aenter__()
|
|
session_context = ClientSession(read_stream, write_stream)
|
|
session = await session_context.__aenter__()
|
|
await session.initialize()
|
|
logger.info("MCP client session initialized")
|
|
yield session
|
|
finally:
|
|
if session_context is not None:
|
|
try:
|
|
await session_context.__aexit__(None, None, None)
|
|
except Exception as e:
|
|
logger.debug(f"Error closing session: {e}")
|
|
|
|
try:
|
|
await streamable_context.__aexit__(None, None, None)
|
|
except Exception as e:
|
|
logger.debug(f"Error closing streamable context: {e}")
|
|
|
|
|
|
async def wait_for_mcp_server(url: str, max_attempts: int = 10) -> bool:
|
|
"""Wait for MCP server to be ready."""
|
|
logger.info(f"Waiting for MCP server at {url}...")
|
|
|
|
for attempt in range(1, max_attempts + 1):
|
|
try:
|
|
async with create_mcp_session(url) as session:
|
|
# Try to get capabilities
|
|
await session.read_resource("nc://capabilities")
|
|
logger.info("MCP server is ready")
|
|
return True
|
|
except Exception as e:
|
|
if attempt < max_attempts:
|
|
logger.debug(f"Attempt {attempt}/{max_attempts}: {e}")
|
|
await anyio.sleep(2)
|
|
else:
|
|
logger.error(f"MCP server not ready after {max_attempts} attempts")
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
async def benchmark_worker(
|
|
worker_id: int,
|
|
url: str,
|
|
duration: float,
|
|
metrics: BenchmarkMetrics,
|
|
stop_event: anyio.Event,
|
|
):
|
|
"""Single worker that runs operations for the specified duration."""
|
|
logger.info(f"Worker {worker_id} starting...")
|
|
|
|
try:
|
|
async with create_mcp_session(url) as session:
|
|
ops = WorkloadOperations(session)
|
|
workload = MixedWorkload(ops)
|
|
|
|
# Warmup
|
|
await workload.warmup(count=5)
|
|
|
|
# Run operations until duration expires or stop event is set
|
|
start_time = time.time()
|
|
operation_count = 0
|
|
|
|
while not stop_event.is_set():
|
|
if time.time() - start_time >= duration:
|
|
break
|
|
|
|
result = await workload.run_operation()
|
|
metrics.add_result(result)
|
|
operation_count += 1
|
|
|
|
# Small delay to prevent overwhelming the server
|
|
await anyio.sleep(0.01)
|
|
|
|
# Cleanup
|
|
await ops.cleanup()
|
|
|
|
logger.info(f"Worker {worker_id} completed {operation_count} operations")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Worker {worker_id} error: {e}", exc_info=True)
|
|
|
|
|
|
async def run_benchmark(
|
|
url: str,
|
|
concurrency: int,
|
|
duration: float,
|
|
warmup: float = 5.0,
|
|
) -> BenchmarkMetrics:
|
|
"""Run the benchmark with specified parameters."""
|
|
metrics = BenchmarkMetrics()
|
|
stop_event = anyio.Event()
|
|
|
|
# Setup signal handlers for graceful shutdown
|
|
def signal_handler(sig, frame):
|
|
logger.warning("Received interrupt signal, stopping benchmark...")
|
|
stop_event.set()
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
print(
|
|
f"\nStarting benchmark with {concurrency} concurrent workers for {duration}s..."
|
|
)
|
|
print(f"Target: {url}")
|
|
print(f"Warmup period: {warmup}s\n")
|
|
|
|
# Warmup period
|
|
if warmup > 0:
|
|
print("Warming up...")
|
|
await anyio.sleep(warmup)
|
|
|
|
# Start metrics collection
|
|
metrics.start()
|
|
|
|
# Create and run workers using anyio task groups
|
|
async with anyio.create_task_group() as tg:
|
|
# Start all workers
|
|
for i in range(concurrency):
|
|
tg.start_soon(benchmark_worker, i, url, duration, metrics, stop_event)
|
|
|
|
# Show progress
|
|
tg.start_soon(show_progress, duration, metrics, stop_event)
|
|
|
|
# Stop metrics (tasks already completed when task group exits)
|
|
metrics.stop()
|
|
|
|
return metrics
|
|
|
|
|
|
async def show_progress(
|
|
duration: float,
|
|
metrics: BenchmarkMetrics,
|
|
stop_event: anyio.Event,
|
|
):
|
|
"""Show real-time progress during benchmark."""
|
|
start_time = time.time()
|
|
|
|
while not stop_event.is_set():
|
|
elapsed = time.time() - start_time
|
|
if elapsed >= duration:
|
|
break
|
|
|
|
# Calculate progress
|
|
progress = min(elapsed / duration * 100, 100)
|
|
rps = metrics.total_requests / max(elapsed, 0.1)
|
|
|
|
# Print progress bar
|
|
bar_length = 40
|
|
filled = int(bar_length * progress / 100)
|
|
bar = "█" * filled + "░" * (bar_length - filled)
|
|
|
|
print(
|
|
f"\r[{bar}] {progress:5.1f}% | "
|
|
f"Requests: {metrics.total_requests:6d} | "
|
|
f"RPS: {rps:6.1f} | "
|
|
f"Errors: {metrics.failed_requests:4d}",
|
|
end="",
|
|
flush=True,
|
|
)
|
|
|
|
await anyio.sleep(0.5)
|
|
|
|
print() # New line after progress
|
|
|
|
|
|
@click.command()
|
|
@click.option(
|
|
"--concurrency",
|
|
"-c",
|
|
type=int,
|
|
default=10,
|
|
show_default=True,
|
|
help="Number of concurrent workers",
|
|
)
|
|
@click.option(
|
|
"--duration",
|
|
"-d",
|
|
type=float,
|
|
default=30.0,
|
|
show_default=True,
|
|
help="Test duration in seconds",
|
|
)
|
|
@click.option(
|
|
"--warmup",
|
|
"-w",
|
|
type=float,
|
|
default=5.0,
|
|
show_default=True,
|
|
help="Warmup duration before collecting metrics (seconds)",
|
|
)
|
|
@click.option(
|
|
"--url",
|
|
"-u",
|
|
default="http://localhost:8000/mcp",
|
|
show_default=True,
|
|
help="MCP server URL",
|
|
)
|
|
@click.option(
|
|
"--output",
|
|
"-o",
|
|
type=click.Path(),
|
|
help="Output file for JSON results (optional)",
|
|
)
|
|
@click.option(
|
|
"--wait-for-server/--no-wait",
|
|
default=True,
|
|
show_default=True,
|
|
help="Wait for MCP server to be ready before starting",
|
|
)
|
|
@click.option(
|
|
"--verbose",
|
|
"-v",
|
|
is_flag=True,
|
|
help="Enable verbose logging",
|
|
)
|
|
def main(
|
|
concurrency: int,
|
|
duration: float,
|
|
warmup: float,
|
|
url: str,
|
|
output: str | None,
|
|
wait_for_server: bool,
|
|
verbose: bool,
|
|
):
|
|
"""
|
|
Load testing benchmark for Nextcloud MCP Server.
|
|
|
|
Runs a mixed workload of realistic MCP operations against the server
|
|
and reports detailed performance metrics.
|
|
|
|
Examples:
|
|
|
|
# Quick 30-second test with 10 workers
|
|
uv run python -m tests.load.benchmark --concurrency 10 --duration 30
|
|
|
|
# Extended test with 50 workers for 5 minutes
|
|
uv run python -m tests.load.benchmark -c 50 -d 300
|
|
|
|
# Export results to JSON
|
|
uv run python -m tests.load.benchmark -c 20 -d 60 --output results.json
|
|
|
|
# Test OAuth server on port 8001
|
|
uv run python -m tests.load.benchmark --url http://localhost:8001/mcp
|
|
"""
|
|
if verbose:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
logging.getLogger("tests.load").setLevel(logging.DEBUG)
|
|
|
|
async def run():
|
|
# Wait for server if requested
|
|
if wait_for_server:
|
|
if not await wait_for_mcp_server(url):
|
|
print("ERROR: MCP server is not ready", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Run benchmark
|
|
metrics = await run_benchmark(url, concurrency, duration, warmup)
|
|
|
|
# Print report
|
|
metrics.print_report()
|
|
|
|
# Export to JSON if requested
|
|
if output:
|
|
with open(output, "w") as f:
|
|
json.dump(metrics.to_dict(), f, indent=2)
|
|
print(f"Results exported to: {output}")
|
|
|
|
try:
|
|
anyio.run(run)
|
|
except KeyboardInterrupt:
|
|
print("\nBenchmark interrupted by user")
|
|
sys.exit(130)
|
|
except Exception as e:
|
|
print(f"ERROR: {e}", file=sys.stderr)
|
|
if verbose:
|
|
raise
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|