feat: Add OAuth token and database metrics (Phases 3-4)

Complete Prometheus instrumentation for OAuth token operations
and additional database operations to populate empty dashboard panels.

OAuth Token Metrics (Phase 4):
- unified_verifier.py:
  * Token validation cache hits/misses
  * JWT verification success/failure/error
  * Introspection validation results
  * Audience validation failures
- context_helper.py:
  * Token exchange cache hits/misses
  * RFC 8693 exchange success/error

Database Metrics (Phase 3 completion):
- storage.py:
  * get_refresh_token() with timing
  * delete_refresh_token() with timing
  * All operations record duration and success/error status

These metrics populate the following dashboard panels:
- Token Validations (by method and result)
- Token Cache Hit Rate
- Token Exchange Operations
- Database Operations (refresh token CRUD)
- Database Operation Duration

Part of PR #295 - Complete metrics instrumentation
This commit is contained in:
Chris Coutinho
2025-11-13 16:23:00 +01:00
parent a667d7c59c
commit c97f12d47e
3 changed files with 109 additions and 57 deletions
+19 -7
View File
@@ -12,6 +12,10 @@ from mcp.server.fastmcp import Context
from ..client import NextcloudClient
from ..config import get_settings
from ..observability.metrics import (
oauth_token_cache_hits_total,
oauth_token_exchange_total,
)
from .token_exchange import exchange_token_for_audience
logger = logging.getLogger(__name__)
@@ -138,6 +142,7 @@ async def get_session_client_from_context(
logger.debug(
f"Using cached exchanged token (expires in {expiry - time.time():.1f}s)"
)
oauth_token_cache_hits_total.labels(hit="true").inc()
return NextcloudClient.from_token(
base_url=base_url, token=cached_token, username=username
)
@@ -145,17 +150,24 @@ async def get_session_client_from_context(
logger.debug("Cached token expired, removing from cache")
del _exchange_cache[cache_key]
oauth_token_cache_hits_total.labels(hit="false").inc()
# Perform RFC 8693 token exchange
logger.info(f"Exchanging MCP token for Nextcloud API token (user: {username})")
# Exchange for Nextcloud resource URI audience
exchanged_token, expires_in = await exchange_token_for_audience(
subject_token=mcp_token,
requested_audience=settings.nextcloud_resource_uri or "nextcloud",
requested_scopes=None, # Nextcloud doesn't support scopes
)
try:
# Exchange for Nextcloud resource URI audience
exchanged_token, expires_in = await exchange_token_for_audience(
subject_token=mcp_token,
requested_audience=settings.nextcloud_resource_uri or "nextcloud",
requested_scopes=None, # Nextcloud doesn't support scopes
)
oauth_token_exchange_total.labels(status="success").inc()
logger.info(f"Token exchange successful. Token expires in {expires_in}s")
logger.info(f"Token exchange successful. Token expires in {expires_in}s")
except Exception:
oauth_token_exchange_total.labels(status="error").inc()
raise
# Cache the exchanged token
# Use the minimum of exchange TTL and configured cache TTL
+69 -50
View File
@@ -432,40 +432,45 @@ class RefreshTokenStorage:
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT encrypted_token, expires_at, flow_type, token_audience,
provisioned_at, provisioning_client_id, scopes
FROM refresh_tokens WHERE user_id = ?
""",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(f"No refresh token found for user {user_id}")
return None
(
encrypted_token,
expires_at,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
) = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for user {user_id} has expired (expired at {expires_at})"
)
await self.delete_refresh_token(user_id)
return None
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT encrypted_token, expires_at, flow_type, token_audience,
provisioned_at, provisioning_client_id, scopes
FROM refresh_tokens WHERE user_id = ?
""",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(f"No refresh token found for user {user_id}")
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return None
(
encrypted_token,
expires_at,
flow_type,
token_audience,
provisioned_at,
provisioning_client_id,
scopes_json,
) = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for user {user_id} has expired (expired at {expires_at})"
)
await self.delete_refresh_token(user_id)
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return None
decrypted_token = self.cipher.decrypt(encrypted_token).decode()
scopes = json.loads(scopes_json) if scopes_json else None
@@ -473,6 +478,9 @@ class RefreshTokenStorage:
f"Retrieved refresh token for user {user_id} (flow_type: {flow_type})"
)
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "success")
return {
"refresh_token": decrypted_token,
"expires_at": expires_at,
@@ -484,6 +492,8 @@ class RefreshTokenStorage:
"scopes": scopes,
}
except Exception as e:
duration = time.time() - start_time
record_db_operation("sqlite", "select", duration, "error")
logger.error(f"Failed to decrypt refresh token for user {user_id}: {e}")
return None
@@ -578,25 +588,34 @@ class RefreshTokenStorage:
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE user_id = ?",
(user_id,),
)
await db.commit()
deleted = cursor.rowcount > 0
start_time = time.time()
try:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM refresh_tokens WHERE user_id = ?",
(user_id,),
)
await db.commit()
deleted = cursor.rowcount > 0
if deleted:
logger.info(f"Deleted refresh token for user {user_id}")
await self._audit_log(
event="delete_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
else:
logger.debug(f"No refresh token to delete for user {user_id}")
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "success")
return deleted
if deleted:
logger.info(f"Deleted refresh token for user {user_id}")
await self._audit_log(
event="delete_refresh_token",
user_id=user_id,
auth_method="offline_access",
)
else:
logger.debug(f"No refresh token to delete for user {user_id}")
return deleted
except Exception:
duration = time.time() - start_time
record_db_operation("sqlite", "delete", duration, "error")
raise
async def get_all_user_ids(self) -> list[str]:
"""
@@ -26,6 +26,10 @@ from jwt import PyJWKClient
from mcp.server.auth.provider import AccessToken, TokenVerifier
from nextcloud_mcp_server.config import Settings
from nextcloud_mcp_server.observability.metrics import (
oauth_token_cache_hits_total,
record_oauth_token_validation,
)
logger = logging.getLogger(__name__)
@@ -105,8 +109,11 @@ class UnifiedTokenVerifier(TokenVerifier):
cached = self._get_cached_token(token)
if cached:
logger.debug("Token found in cache")
oauth_token_cache_hits_total.labels(hit="true").inc()
return cached
oauth_token_cache_hits_total.labels(hit="false").inc()
# Both modes do the same validation (MCP audience only)
return await self._verify_mcp_audience(token)
@@ -124,13 +131,24 @@ class UnifiedTokenVerifier(TokenVerifier):
Returns:
AccessToken if valid with MCP audience, None otherwise
"""
validation_method = "unknown"
try:
# Attempt JWT verification first
if self._is_jwt_format(token) and self.jwks_client:
validation_method = "jwt"
payload = await self._verify_jwt_signature(token)
if payload:
record_oauth_token_validation("jwt", "valid")
else:
record_oauth_token_validation("jwt", "invalid")
else:
# Fall back to introspection for opaque tokens
validation_method = "introspect"
payload = await self._introspect_token(token)
if payload:
record_oauth_token_validation("introspect", "valid")
else:
record_oauth_token_validation("introspect", "invalid")
if not payload:
return None
@@ -146,6 +164,8 @@ class UnifiedTokenVerifier(TokenVerifier):
f"Got {audiences}, need MCP ({self.settings.oidc_client_id} or "
f"{self.settings.nextcloud_mcp_server_url})"
)
# Record as invalid due to audience mismatch
record_oauth_token_validation(validation_method, "invalid")
return None
# Log based on mode for clarity
@@ -163,6 +183,7 @@ class UnifiedTokenVerifier(TokenVerifier):
except Exception as e:
logger.error(f"Token verification failed: {e}")
record_oauth_token_validation(validation_method, "error")
return None
def _has_mcp_audience(self, payload: dict[str, Any]) -> bool: