feat: Add metrics instrumentation for queue, health, and database operations

Implement Prometheus metrics to populate empty Grafana dashboard panels.

## Phase 1: Queue Size Metrics 
**File**: `processor.py`
- Track vector sync queue depth in real-time
- Update metric after receiving and processing each document
- Update metric during timeout (empty queue)
- Enables: "Processing Queue Depth" panel

## Phase 2: Health Check Metrics 
**File**: `app.py`
- Add Nextcloud connectivity check with timing
- Add Qdrant health check with timing
- Record dependency health status (up/down)
- Record health check duration
- Enables: 4 health status panels + health check duration panel

## Phase 3: Database Operation Metrics (Partial) 
**File**: `storage.py`
- Instrument `store_refresh_token()` method
- Track SQLite INSERT operation timing and success/error status
- Enables: Partial data for database operation latency panel

## Metrics Now Exposed

### Queue Metrics:
- `mcp_vector_sync_queue_size` - Real-time queue depth

### Health Metrics:
- `mcp_dependency_health{dependency="nextcloud"}` - UP/DOWN status
- `mcp_dependency_health{dependency="qdrant"}` - UP/DOWN status
- `mcp_dependency_check_duration_seconds{dependency}` - Health check latency

### Database Metrics:
- `mcp_db_operations_total{db="sqlite",operation="insert"}` - Operation count
- `mcp_db_operation_duration_seconds{db="sqlite",operation="insert"}` - Operation latency

## Dashboard Impact

**Panels Now Populated** (7/34 panels):
-  Processing Queue Depth
-  Nextcloud Health
-  Qdrant Health
-  Health Check Duration
-  Database Operation Latency (partial)
-  Vector sync panels (already working from PR #292)

**Panels Still Empty** (remaining work):
-  OAuth panels (4): Token validations, exchanges, cache hit rate, refresh ops
-  MCP tool panels (3): Call volume, error rates, execution duration
-  Database panel: Needs more SQLite operations instrumented (~29 remaining)

## Testing

Verified metric definitions exist and will be recorded on next deployment.

## Next Steps

Phase 4: OAuth token metrics (unified_verifier.py, context_helper.py, storage.py)
Phase 5: MCP tool metrics (all server/*.py files with @mcp.tool())
Phase 3 completion: Remaining 29 database operations in storage.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-13 16:14:38 +01:00
parent bd76902932
commit a667d7c59c
3 changed files with 88 additions and 30 deletions
+38 -1
View File
@@ -1,5 +1,6 @@
import logging
import os
import time
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
@@ -44,6 +45,10 @@ from nextcloud_mcp_server.observability import (
setup_metrics,
setup_tracing,
)
from nextcloud_mcp_server.observability.metrics import (
record_dependency_check,
set_dependency_health,
)
from nextcloud_mcp_server.server import (
configure_calendar_tools,
configure_contacts_tools,
@@ -1205,12 +1210,35 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
checks = {}
is_ready = True
# Check Nextcloud host configuration
# Check Nextcloud host configuration and connectivity
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if nextcloud_host:
checks["nextcloud_configured"] = "ok"
# Try to connect to Nextcloud
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.get(f"{nextcloud_host}/status.php")
duration = time.time() - start_time
if response.status_code == 200:
checks["nextcloud_reachable"] = "ok"
set_dependency_health("nextcloud", True)
else:
checks["nextcloud_reachable"] = (
f"error: status {response.status_code}"
)
set_dependency_health("nextcloud", False)
is_ready = False
record_dependency_check("nextcloud", duration)
except Exception as e:
duration = time.time() - start_time
checks["nextcloud_reachable"] = f"error: {str(e)}"
set_dependency_health("nextcloud", False)
record_dependency_check("nextcloud", duration)
is_ready = False
else:
checks["nextcloud_configured"] = "error: NEXTCLOUD_HOST not set"
set_dependency_health("nextcloud", False)
is_ready = False
# Check authentication configuration
@@ -1238,20 +1266,29 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
qdrant_url = os.getenv("QDRANT_URL") # Only set in network mode
if vector_sync_enabled and qdrant_url:
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=2.0) as client:
response = await client.get(f"{qdrant_url}/readyz")
duration = time.time() - start_time
if response.status_code == 200:
checks["qdrant"] = "ok"
set_dependency_health("qdrant", True)
else:
checks["qdrant"] = f"error: status {response.status_code}"
set_dependency_health("qdrant", False)
is_ready = False
record_dependency_check("qdrant", duration)
except Exception as e:
duration = time.time() - start_time
checks["qdrant"] = f"error: {str(e)}"
set_dependency_health("qdrant", False)
record_dependency_check("qdrant", duration)
is_ready = False
elif vector_sync_enabled:
# Using embedded Qdrant (memory or persistent mode)
checks["qdrant"] = "embedded"
set_dependency_health("qdrant", True)
status_code = 200 if is_ready else 503
return JSONResponse(
+38 -28
View File
@@ -35,6 +35,8 @@ from typing import Any, Optional
import aiosqlite
from cryptography.fernet import Fernet
from nextcloud_mcp_server.observability.metrics import record_db_operation
logger = logging.getLogger(__name__)
@@ -292,35 +294,43 @@ class RefreshTokenStorage:
# For Flow 2, set provisioned_at timestamp
provisioned_at = now if flow_type == "flow2" else None
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO refresh_tokens
(user_id, encrypted_token, expires_at, created_at, updated_at,
flow_type, token_audience, provisioned_at, provisioning_client_id, scopes)
VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?,
?, ?, ?, ?, ?)
""",
(
user_id,
encrypted_token,
expires_at,
user_id,
now,
now,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
),
)
await db.commit()
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
INSERT OR REPLACE INTO refresh_tokens
(user_id, encrypted_token, expires_at, created_at, updated_at,
flow_type, token_audience, provisioned_at, provisioning_client_id, scopes)
VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?,
?, ?, ?, ?, ?)
""",
(
user_id,
encrypted_token,
expires_at,
user_id,
now,
now,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
),
)
await db.commit()
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "success")
logger.info(
f"Stored refresh token for user {user_id}"
+ (f" (expires at {expires_at})" if expires_at else "")
)
logger.info(
f"Stored refresh token for user {user_id}"
+ (f" (expires at {expires_at})" if expires_at else "")
)
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "insert", duration, "error")
raise
# Audit log
await self._audit_log(
+12 -1
View File
@@ -18,6 +18,7 @@ from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.observability.metrics import (
record_qdrant_operation,
record_vector_sync_processing,
update_vector_sync_queue_size,
)
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
@@ -61,11 +62,21 @@ async def processor_task(
with anyio.fail_after(1.0):
doc_task = await receive_stream.receive()
# Update queue size metric after receiving
stream_stats = receive_stream.statistics()
update_vector_sync_queue_size(stream_stats.current_buffer_used)
# Process document
await process_document(doc_task, nc_client)
# Update queue size metric after processing
stream_stats = receive_stream.statistics()
update_vector_sync_queue_size(stream_stats.current_buffer_used)
except TimeoutError:
# No documents available, continue
# No documents available, update metric to show empty queue
stream_stats = receive_stream.statistics()
update_vector_sync_queue_size(stream_stats.current_buffer_used)
continue
except anyio.EndOfStream: