diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index b1fa436..cf1d095 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -1,5 +1,6 @@ import logging import os +import time from collections.abc import AsyncIterator from contextlib import AsyncExitStack, asynccontextmanager from dataclasses import dataclass @@ -44,6 +45,10 @@ from nextcloud_mcp_server.observability import ( setup_metrics, setup_tracing, ) +from nextcloud_mcp_server.observability.metrics import ( + record_dependency_check, + set_dependency_health, +) from nextcloud_mcp_server.server import ( configure_calendar_tools, configure_contacts_tools, @@ -1205,12 +1210,35 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): checks = {} is_ready = True - # Check Nextcloud host configuration + # Check Nextcloud host configuration and connectivity nextcloud_host = os.getenv("NEXTCLOUD_HOST") if nextcloud_host: checks["nextcloud_configured"] = "ok" + # Try to connect to Nextcloud + start_time = time.time() + try: + async with httpx.AsyncClient(timeout=2.0) as client: + response = await client.get(f"{nextcloud_host}/status.php") + duration = time.time() - start_time + if response.status_code == 200: + checks["nextcloud_reachable"] = "ok" + set_dependency_health("nextcloud", True) + else: + checks["nextcloud_reachable"] = ( + f"error: status {response.status_code}" + ) + set_dependency_health("nextcloud", False) + is_ready = False + record_dependency_check("nextcloud", duration) + except Exception as e: + duration = time.time() - start_time + checks["nextcloud_reachable"] = f"error: {str(e)}" + set_dependency_health("nextcloud", False) + record_dependency_check("nextcloud", duration) + is_ready = False else: checks["nextcloud_configured"] = "error: NEXTCLOUD_HOST not set" + set_dependency_health("nextcloud", False) is_ready = False # Check authentication configuration @@ -1238,20 +1266,29 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): qdrant_url = os.getenv("QDRANT_URL") # Only set in network mode if vector_sync_enabled and qdrant_url: + start_time = time.time() try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get(f"{qdrant_url}/readyz") + duration = time.time() - start_time if response.status_code == 200: checks["qdrant"] = "ok" + set_dependency_health("qdrant", True) else: checks["qdrant"] = f"error: status {response.status_code}" + set_dependency_health("qdrant", False) is_ready = False + record_dependency_check("qdrant", duration) except Exception as e: + duration = time.time() - start_time checks["qdrant"] = f"error: {str(e)}" + set_dependency_health("qdrant", False) + record_dependency_check("qdrant", duration) is_ready = False elif vector_sync_enabled: # Using embedded Qdrant (memory or persistent mode) checks["qdrant"] = "embedded" + set_dependency_health("qdrant", True) status_code = 200 if is_ready else 503 return JSONResponse( diff --git a/nextcloud_mcp_server/auth/storage.py b/nextcloud_mcp_server/auth/storage.py index ce3db76..689d1c9 100644 --- a/nextcloud_mcp_server/auth/storage.py +++ b/nextcloud_mcp_server/auth/storage.py @@ -35,6 +35,8 @@ from typing import Any, Optional import aiosqlite from cryptography.fernet import Fernet +from nextcloud_mcp_server.observability.metrics import record_db_operation + logger = logging.getLogger(__name__) @@ -292,35 +294,43 @@ class RefreshTokenStorage: # For Flow 2, set provisioned_at timestamp provisioned_at = now if flow_type == "flow2" else None - async with aiosqlite.connect(self.db_path) as db: - await db.execute( - """ - INSERT OR REPLACE INTO refresh_tokens - (user_id, encrypted_token, expires_at, created_at, updated_at, - flow_type, token_audience, provisioned_at, provisioning_client_id, scopes) - VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?, - ?, ?, ?, ?, ?) - """, - ( - user_id, - encrypted_token, - expires_at, - user_id, - now, - now, - flow_type, - token_audience, - provisioned_at, - provisioning_client_id, - scopes_json, - ), - ) - await db.commit() + start_time = time.time() + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + """ + INSERT OR REPLACE INTO refresh_tokens + (user_id, encrypted_token, expires_at, created_at, updated_at, + flow_type, token_audience, provisioned_at, provisioning_client_id, scopes) + VALUES (?, ?, ?, COALESCE((SELECT created_at FROM refresh_tokens WHERE user_id = ?), ?), ?, + ?, ?, ?, ?, ?) + """, + ( + user_id, + encrypted_token, + expires_at, + user_id, + now, + now, + flow_type, + token_audience, + provisioned_at, + provisioning_client_id, + scopes_json, + ), + ) + await db.commit() + duration = time.time() - start_time + record_db_operation("sqlite", "insert", duration, "success") - logger.info( - f"Stored refresh token for user {user_id}" - + (f" (expires at {expires_at})" if expires_at else "") - ) + logger.info( + f"Stored refresh token for user {user_id}" + + (f" (expires at {expires_at})" if expires_at else "") + ) + except Exception: + duration = time.time() - start_time + record_db_operation("sqlite", "insert", duration, "error") + raise # Audit log await self._audit_log( diff --git a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py index f89aae5..ceecf5c 100644 --- a/nextcloud_mcp_server/vector/processor.py +++ b/nextcloud_mcp_server/vector/processor.py @@ -18,6 +18,7 @@ from nextcloud_mcp_server.embedding import get_embedding_service from nextcloud_mcp_server.observability.metrics import ( record_qdrant_operation, record_vector_sync_processing, + update_vector_sync_queue_size, ) from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.vector.document_chunker import DocumentChunker @@ -61,11 +62,21 @@ async def processor_task( with anyio.fail_after(1.0): doc_task = await receive_stream.receive() + # Update queue size metric after receiving + stream_stats = receive_stream.statistics() + update_vector_sync_queue_size(stream_stats.current_buffer_used) + # Process document await process_document(doc_task, nc_client) + # Update queue size metric after processing + stream_stats = receive_stream.statistics() + update_vector_sync_queue_size(stream_stats.current_buffer_used) + except TimeoutError: - # No documents available, continue + # No documents available, update metric to show empty queue + stream_stats = receive_stream.statistics() + update_vector_sync_queue_size(stream_stats.current_buffer_used) continue except anyio.EndOfStream: