From a6e5f3d8ff66e02fddd4705228a124207d596021 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Mon, 10 Nov 2025 22:48:37 +0100 Subject: [PATCH] refactor: simplify OpenTelemetry tracing configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplifies the OpenTelemetry tracing setup by removing the redundant OTEL_ENABLED flag and using the presence of OTEL_EXPORTER_OTLP_ENDPOINT to determine if tracing should be enabled. This follows the standard OpenTelemetry environment variable conventions more closely. Changes: - Remove OTEL_ENABLED/tracing_enabled flag in favor of checking if OTEL_EXPORTER_OTLP_ENDPOINT is set - Add OTEL_EXPORTER_VERIFY_SSL configuration option for OTLP endpoints with self-signed certificates (defaults to false for development, which keeps the previous insecure=True exporter behavior) - Move HTTPXClientInstrumentor initialization to module level to ensure httpx calls are traced across all Nextcloud API requests - Add tracing spans to vector sync operations (scan_user_documents and process_document) - Set an explicit httpx timeout on NextcloudClient (30s default, 5s connect) - Read service.version from the installed package metadata instead of a hard-coded string - Fix authorization header logging to only warn about missing headers in OAuth mode (BasicAuth mode doesn't use Authorization headers) - Update observability documentation to reflect simplified configuration - Refactor Dockerfile to use --no-editable flag for uv sync Breaking changes: - OTEL_ENABLED environment variable is removed - Tracing is now automatically enabled when OTEL_EXPORTER_OTLP_ENDPOINT is set Migration guide: - Remove OTEL_ENABLED=true from environment configuration - Tracing will be enabled automatically if OTEL_EXPORTER_OTLP_ENDPOINT is configured 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .gitignore | 2 + Dockerfile | 2 +- .../templates/deployment.yaml | 2 - docker-compose.yml | 8 +- docs/observability.md | 6 +- nextcloud_mcp_server/app.py | 22 +- nextcloud_mcp_server/auth/unified_verifier.py | 18 +- nextcloud_mcp_server/client/__init__.py | 2 + nextcloud_mcp_server/config.py | 5 +- 
nextcloud_mcp_server/observability/tracing.py | 16 +- nextcloud_mcp_server/vector/processor.py | 107 ++++---- nextcloud_mcp_server/vector/scanner.py | 255 +++++++++--------- pyproject.toml | 16 +- 13 files changed, 255 insertions(+), 206 deletions(-) diff --git a/.gitignore b/.gitignore index da98098..56429ff 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,7 @@ __pycache__/ .env.local .env.*.local +docker-compose.override.yml + # Generated by pytest used to login users .nextcloud_oauth_*.json diff --git a/Dockerfile b/Dockerfile index 18f3a77..adfaac0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,7 +9,7 @@ WORKDIR /app COPY . . -RUN uv sync --locked --no-dev +RUN uv sync --locked --no-dev --no-editable ENV PYTHONUNBUFFERED=1 ENV VIRTUAL_ENV=/app/.venv diff --git a/charts/nextcloud-mcp-server/templates/deployment.yaml b/charts/nextcloud-mcp-server/templates/deployment.yaml index ce624c0..6471ebb 100644 --- a/charts/nextcloud-mcp-server/templates/deployment.yaml +++ b/charts/nextcloud-mcp-server/templates/deployment.yaml @@ -218,8 +218,6 @@ spec: - name: METRICS_PORT value: {{ .Values.observability.metrics.port | quote }} {{- if .Values.observability.tracing.enabled }} - - name: OTEL_ENABLED - value: "true" - name: OTEL_EXPORTER_OTLP_ENDPOINT value: {{ .Values.observability.tracing.endpoint | quote }} - name: OTEL_SERVICE_NAME diff --git a/docker-compose.yml b/docker-compose.yml index fc75736..e1cfd43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -98,16 +98,20 @@ services: #- QDRANT_URL=http://qdrant:6333 # Uncomment for network mode #- QDRANT_API_KEY=${QDRANT_API_KEY:-my_secret_api_key} # Only for network mode + # Observability + #- OTEL_SERVICE_NAME=nextcloud-mcp-docker-compose + #- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 + # Collection naming: Auto-generated as {deployment-id}-{model-name} # - Deployment ID: OTEL_SERVICE_NAME (if set) or hostname (fallback) # - Model name: OLLAMA_EMBEDDING_MODEL # - Example: 
"nextcloud-mcp-server-nomic-embed-text" # - Changing models creates new collection (requires re-embedding) # - Set QDRANT_COLLECTION to override auto-generation: - - QDRANT_COLLECTION=nextcloud_content + #- QDRANT_COLLECTION=nextcloud_content # Ollama configuration (optional - uses SimpleEmbeddingProvider if not set) - # - OLLAMA_BASE_URL=https://ollama.internal.coutinho.io:443 + # - OLLAMA_BASE_URL=http://ollama:11434 # - OLLAMA_EMBEDDING_MODEL=nomic-embed-text # Changing this creates new collection # - OLLAMA_VERIFY_SSL=false diff --git a/docs/observability.md b/docs/observability.md index 015f8a1..17f49f5 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -16,8 +16,7 @@ The Nextcloud MCP Server includes comprehensive observability features for produ export METRICS_ENABLED=true export METRICS_PORT=9090 -# Enable tracing (optional) -export OTEL_ENABLED=true +# Enable tracing (optional - tracing is enabled when OTEL_EXPORTER_OTLP_ENDPOINT is set) export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 # Start the server @@ -46,8 +45,7 @@ helm install nextcloud-mcp charts/nextcloud-mcp-server \ |----------|---------|-------------| | `METRICS_ENABLED` | `true` | Enable Prometheus metrics | | `METRICS_PORT` | `9090` | Port for metrics endpoint | -| `OTEL_ENABLED` | `false` | Enable OpenTelemetry tracing | -| `OTEL_EXPORTER_OTLP_ENDPOINT` | - | OTLP gRPC endpoint (e.g., `http://otel-collector:4317`) | +| `OTEL_EXPORTER_OTLP_ENDPOINT` | - | OTLP gRPC endpoint (e.g., `http://otel-collector:4317`). Tracing is enabled when this is set. 
| | `OTEL_SERVICE_NAME` | `nextcloud-mcp-server` | Service name in traces | | `OTEL_TRACES_SAMPLER` | `always_on` | Trace sampling strategy | | `OTEL_TRACES_SAMPLER_ARG` | `1.0` | Sampling rate (0.0-1.0) | diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 54434bb..aeb36db 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -5,9 +5,12 @@ from contextlib import AsyncExitStack, asynccontextmanager from dataclasses import dataclass from typing import TYPE_CHECKING, Optional +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor + if TYPE_CHECKING: from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage + import anyio import click import httpx @@ -58,6 +61,7 @@ from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools from nextcloud_mcp_server.vector import processor_task, scanner_task logger = logging.getLogger(__name__) +HTTPXClientInstrumentor().instrument() def initialize_document_processors(): @@ -791,17 +795,20 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): ) # Setup OpenTelemetry tracing (optional) - if settings.tracing_enabled: + if settings.otel_exporter_otlp_endpoint: setup_tracing( service_name=settings.otel_service_name, otlp_endpoint=settings.otel_exporter_otlp_endpoint, + otlp_verify_ssl=settings.otel_exporter_verify_ssl, sampling_rate=settings.otel_traces_sampler_arg, ) logger.info( f"OpenTelemetry tracing enabled (endpoint: {settings.otel_exporter_otlp_endpoint})" ) else: - logger.info("OpenTelemetry tracing disabled (set OTEL_ENABLED=true to enable)") + logger.info( + "OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)" + ) # Determine authentication mode oauth_enabled = is_oauth_mode() @@ -1391,9 +1398,12 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): ) logger.info(f"🔑 /mcp request with Authorization: {token_preview}") else: - logger.warning( - f"⚠️ /mcp 
request WITHOUT Authorization header from {request.client}" - ) + # Only warn about missing Authorization in OAuth mode + # In BasicAuth mode, /mcp requests without Authorization are expected + if oauth_enabled: + logger.warning( + f"⚠️ /mcp request WITHOUT Authorization header from {request.client}" + ) # Log client capabilities on initialize request if request.method == "POST": @@ -1454,7 +1464,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): ) # Add observability middleware (metrics + tracing) - if settings.metrics_enabled or settings.tracing_enabled: + if settings.metrics_enabled or settings.otel_exporter_otlp_endpoint: app.add_middleware(ObservabilityMiddleware) logger.info("Observability middleware enabled (metrics and/or tracing)") diff --git a/nextcloud_mcp_server/auth/unified_verifier.py b/nextcloud_mcp_server/auth/unified_verifier.py index bd7e99b..6ca7129 100644 --- a/nextcloud_mcp_server/auth/unified_verifier.py +++ b/nextcloud_mcp_server/auth/unified_verifier.py @@ -231,17 +231,21 @@ class UnifiedTokenVerifier(TokenVerifier): token, signing_key.key, algorithms=["RS256"], - issuer=self.settings.oidc_issuer - if hasattr(self.settings, "oidc_issuer") - else None, + issuer=( + self.settings.oidc_issuer + if hasattr(self.settings, "oidc_issuer") + else None + ), options={ "verify_signature": True, "verify_exp": True, "verify_iat": True, - "verify_iss": True - if hasattr(self.settings, "oidc_issuer") - and self.settings.oidc_issuer - else False, + "verify_iss": ( + True + if hasattr(self.settings, "oidc_issuer") + and self.settings.oidc_issuer + else False + ), "verify_aud": False, # We handle audience validation separately }, ) diff --git a/nextcloud_mcp_server/client/__init__.py b/nextcloud_mcp_server/client/__init__.py index fd11418..cae6c07 100644 --- a/nextcloud_mcp_server/client/__init__.py +++ b/nextcloud_mcp_server/client/__init__.py @@ -9,6 +9,7 @@ from httpx import ( BasicAuth, Request, Response, + Timeout, ) from 
..controllers.notes_search import NotesSearchController @@ -66,6 +67,7 @@ class NextcloudClient: auth=auth, transport=AsyncDisableCookieTransport(AsyncHTTPTransport()), event_hooks={"request": [log_request], "response": [log_response]}, + timeout=Timeout(timeout=30, connect=5), ) # Initialize app clients diff --git a/nextcloud_mcp_server/config.py b/nextcloud_mcp_server/config.py index 61b4ea0..eb2fd34 100644 --- a/nextcloud_mcp_server/config.py +++ b/nextcloud_mcp_server/config.py @@ -181,8 +181,8 @@ class Settings: # Observability settings metrics_enabled: bool = True metrics_port: int = 9090 - tracing_enabled: bool = False otel_exporter_otlp_endpoint: Optional[str] = None + otel_exporter_verify_ssl: bool = False otel_service_name: str = "nextcloud-mcp-server" otel_traces_sampler: str = "always_on" otel_traces_sampler_arg: float = 1.0 @@ -334,8 +334,9 @@ def get_settings() -> Settings: # Observability settings metrics_enabled=os.getenv("METRICS_ENABLED", "true").lower() == "true", metrics_port=int(os.getenv("METRICS_PORT", "9090")), - tracing_enabled=os.getenv("OTEL_ENABLED", "false").lower() == "true", otel_exporter_otlp_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), + otel_exporter_verify_ssl=os.getenv("OTEL_EXPORTER_VERIFY_SSL", "false").lower() + == "true", otel_service_name=os.getenv("OTEL_SERVICE_NAME", "nextcloud-mcp-server"), otel_traces_sampler=os.getenv("OTEL_TRACES_SAMPLER", "always_on"), otel_traces_sampler_arg=float(os.getenv("OTEL_TRACES_SAMPLER_ARG", "1.0")), diff --git a/nextcloud_mcp_server/observability/tracing.py b/nextcloud_mcp_server/observability/tracing.py index 4b0e5ed..395fa75 100644 --- a/nextcloud_mcp_server/observability/tracing.py +++ b/nextcloud_mcp_server/observability/tracing.py @@ -13,9 +13,9 @@ import logging from contextlib import contextmanager from typing import Any +from importlib_metadata import version from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter 
-from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.instrumentation.logging import LoggingInstrumentor from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider @@ -27,10 +27,13 @@ logger = logging.getLogger(__name__) # Global tracer instance (initialized in setup_tracing) _tracer: Tracer | None = None +# NOTE: httpx auto-instrumentation now happens at import time in nextcloud_mcp_server/app.py + def setup_tracing( service_name: str = "nextcloud-mcp-server", otlp_endpoint: str | None = None, + otlp_verify_ssl: bool = False, sampling_rate: float = 1.0, ) -> Tracer: """ @@ -40,6 +43,8 @@ def setup_tracing( service_name: Service name for traces (default: "nextcloud-mcp-server") otlp_endpoint: OTLP gRPC endpoint (e.g., "http://otel-collector:4317") If None, tracing is initialized but no exporter is configured + otlp_verify_ssl: Whether to use a secure channel for otlp_endpoint. The + exporter is created with `insecure=not otlp_verify_ssl` sampling_rate: Sampling rate (0.0-1.0). Default 1.0 (100% sampling) Returns: @@ -51,7 +56,7 @@ def setup_tracing( resource = Resource.create( { "service.name": service_name, - "service.version": "0.27.2", # TODO: Extract from pyproject.toml + "service.version": version(__package__.split(".")[0]), } ) @@ -61,7 +66,9 @@ def setup_tracing( # Configure OTLP exporter if endpoint is provided if otlp_endpoint: try: - otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True) + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, insecure=not otlp_verify_ssl + ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) logger.info( @@ -79,9 +86,6 @@ def setup_tracing( # Set global tracer provider trace.set_tracer_provider(provider) - # Auto-instrument httpx for Nextcloud API calls - HTTPXClientInstrumentor().instrument() - # Auto-instrument logging to inject trace context LoggingInstrumentor().instrument(set_logging_format=True) diff --git 
a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py index 424e716..5542cce 100644 --- a/nextcloud_mcp_server/vector/processor.py +++ b/nextcloud_mcp_server/vector/processor.py @@ -15,6 +15,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.embedding import get_embedding_service +from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.vector.document_chunker import DocumentChunker from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client from nextcloud_mcp_server.vector.scanner import DocumentTask @@ -94,58 +95,68 @@ async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient): f"for {doc_task.user_id} ({doc_task.operation})" ) - qdrant_client = await get_qdrant_client() - settings = get_settings() + with trace_operation( + "vector_sync.process_document", + attributes={ + "vector_sync.operation": "process", + "vector_sync.user_id": doc_task.user_id, + "vector_sync.doc_id": doc_task.doc_id, + "vector_sync.doc_type": doc_task.doc_type, + "vector_sync.doc_operation": doc_task.operation, + }, + ): + qdrant_client = await get_qdrant_client() + settings = get_settings() - # Handle deletion - if doc_task.operation == "delete": - await qdrant_client.delete( - collection_name=settings.get_collection_name(), - points_selector=Filter( - must=[ - FieldCondition( - key="user_id", - match=MatchValue(value=doc_task.user_id), - ), - FieldCondition( - key="doc_id", - match=MatchValue(value=doc_task.doc_id), - ), - FieldCondition( - key="doc_type", - match=MatchValue(value=doc_task.doc_type), - ), - ] - ), - ) - logger.info( - f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}" - ) - return + # Handle deletion + if doc_task.operation == "delete": + await qdrant_client.delete( + 
collection_name=settings.get_collection_name(), + points_selector=Filter( + must=[ + FieldCondition( + key="user_id", + match=MatchValue(value=doc_task.user_id), + ), + FieldCondition( + key="doc_id", + match=MatchValue(value=doc_task.doc_id), + ), + FieldCondition( + key="doc_type", + match=MatchValue(value=doc_task.doc_type), + ), + ] + ), + ) + logger.info( + f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}" + ) + return - # Handle indexing with retry - max_retries = 3 - retry_delay = 1.0 + # Handle indexing with retry + max_retries = 3 + retry_delay = 1.0 - for attempt in range(max_retries): - try: - await _index_document(doc_task, nc_client, qdrant_client) - return # Success + for attempt in range(max_retries): + try: + await _index_document(doc_task, nc_client, qdrant_client) + return # Success - except (HTTPStatusError, Exception) as e: - if attempt < max_retries - 1: - logger.warning( - f"Retry {attempt + 1}/{max_retries} for " - f"{doc_task.doc_type}_{doc_task.doc_id}: {e}" - ) - await anyio.sleep(retry_delay) - retry_delay *= 2 # Exponential backoff - else: - logger.error( - f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} " - f"after {max_retries} retries: {e}" - ) - raise + except (HTTPStatusError, Exception) as e: + if attempt < max_retries - 1: + logger.warning( + f"Retry {attempt + 1}/{max_retries} for " + f"{doc_task.doc_type}_{doc_task.doc_id}: {e}" + ) + await anyio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + logger.error( + f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} " + f"after {max_retries} retries: {e}" + ) + raise async def _index_document( diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index 0c64426..1eea941 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -13,6 +13,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue from nextcloud_mcp_server.client import 
NextcloudClient from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client logger = logging.getLogger(__name__) @@ -154,134 +155,148 @@ async def scan_user_documents( f"[SCAN-{scan_id}] Starting scan for user: {user_id}, initial_sync={initial_sync}" ) - # Calculate prune timestamp for optimized data transfer - # Only notes modified after this will be sent with full data - prune_before = None if initial_sync else await get_last_indexed_timestamp(user_id) - if prune_before: - logger.info( - f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer" + with trace_operation( + "vector_sync.scan_user_documents", + attributes={ + "vector_sync.operation": "scan", + "vector_sync.user_id": user_id, + "vector_sync.initial_sync": initial_sync, + "vector_sync.scan_id": scan_id, + }, + ): + # Calculate prune timestamp for optimized data transfer + # Only notes modified after this will be sent with full data + prune_before = ( + None if initial_sync else await get_last_indexed_timestamp(user_id) + ) + if prune_before: + logger.info( + f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer" + ) + + # Fetch all notes from Nextcloud + notes = [ + note + async for note in nc_client.notes.get_all_notes(prune_before=prune_before) + ] + logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}") + + if initial_sync: + # Send everything on first sync + for note in notes: + modified_at = note.get("modified", 0) + await send_stream.send( + DocumentTask( + user_id=user_id, + doc_id=str(note["id"]), + doc_type="note", + operation="index", + modified_at=modified_at, + ) + ) + logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}") + return + + # Get indexed state from Qdrant + qdrant_client = await get_qdrant_client() + scroll_result = await qdrant_client.scroll( + 
collection_name=get_settings().get_collection_name(), + scroll_filter=Filter( + must=[ + FieldCondition(key="user_id", match=MatchValue(value=user_id)), + FieldCondition(key="doc_type", match=MatchValue(value="note")), + ] + ), + with_payload=["doc_id", "indexed_at"], + with_vectors=False, + limit=10000, ) - # Fetch all notes from Nextcloud - notes = [ - note async for note in nc_client.notes.get_all_notes(prune_before=prune_before) - ] - logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}") + indexed_docs = { + point.payload["doc_id"]: point.payload["indexed_at"] + for point in scroll_result[0] + } + + logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant") + + # Compare and queue changes + queued = 0 + nextcloud_doc_ids = {str(note["id"]) for note in notes} - if initial_sync: - # Send everything on first sync for note in notes: + doc_id = str(note["id"]) + indexed_at = indexed_docs.get(doc_id) modified_at = note.get("modified", 0) - await send_stream.send( - DocumentTask( - user_id=user_id, - doc_id=str(note["id"]), - doc_type="note", - operation="index", - modified_at=modified_at, - ) - ) - logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}") - return - # Get indexed state from Qdrant - qdrant_client = await get_qdrant_client() - scroll_result = await qdrant_client.scroll( - collection_name=get_settings().get_collection_name(), - scroll_filter=Filter( - must=[ - FieldCondition(key="user_id", match=MatchValue(value=user_id)), - FieldCondition(key="doc_type", match=MatchValue(value="note")), - ] - ), - with_payload=["doc_id", "indexed_at"], - with_vectors=False, - limit=10000, - ) - - indexed_docs = { - point.payload["doc_id"]: point.payload["indexed_at"] - for point in scroll_result[0] - } - - logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant") - - # Compare and queue changes - queued = 0 - nextcloud_doc_ids = {str(note["id"]) for note in notes} - - for note in notes: - doc_id = 
str(note["id"]) - indexed_at = indexed_docs.get(doc_id) - modified_at = note.get("modified", 0) - - # If document reappeared, remove from potentially_deleted - doc_key = (user_id, doc_id) - if doc_key in _potentially_deleted: - logger.debug( - f"Document {doc_id} reappeared, removing from deletion grace period" - ) - del _potentially_deleted[doc_key] - - # Send if never indexed or modified since last index - if indexed_at is None or modified_at > indexed_at: - await send_stream.send( - DocumentTask( - user_id=user_id, - doc_id=doc_id, - doc_type="note", - operation="index", - modified_at=modified_at, - ) - ) - queued += 1 - - # Check for deleted documents (in Qdrant but not in Nextcloud) - # Use grace period: only delete after 2 consecutive scans confirm absence - settings = get_settings() - grace_period = settings.vector_sync_scan_interval * 1.5 # Allow 1.5 scan intervals - current_time = time.time() - - for doc_id in indexed_docs: - if doc_id not in nextcloud_doc_ids: + # If document reappeared, remove from potentially_deleted doc_key = (user_id, doc_id) - if doc_key in _potentially_deleted: - # Already marked as potentially deleted, check if grace period elapsed - first_missing_time = _potentially_deleted[doc_key] - time_missing = current_time - first_missing_time - - if time_missing >= grace_period: - # Grace period elapsed, send for deletion - logger.info( - f"Document {doc_id} missing for {time_missing:.1f}s " - f"(>{grace_period:.1f}s grace period), sending deletion" - ) - await send_stream.send( - DocumentTask( - user_id=user_id, - doc_id=doc_id, - doc_type="note", - operation="delete", - modified_at=0, - ) - ) - queued += 1 - # Remove from tracking after sending deletion - del _potentially_deleted[doc_key] - else: - logger.debug( - f"Document {doc_id} still missing " - f"({time_missing:.1f}s/{grace_period:.1f}s grace period)" - ) - else: - # First time missing, add to grace period tracking logger.debug( - f"Document {doc_id} missing for first time, 
starting grace period" + f"Document {doc_id} reappeared, removing from deletion grace period" ) - _potentially_deleted[doc_key] = current_time + del _potentially_deleted[doc_key] - if queued > 0: - logger.info(f"Sent {queued} documents for incremental sync: {user_id}") - else: - logger.debug(f"No changes detected for {user_id}") + # Send if never indexed or modified since last index + if indexed_at is None or modified_at > indexed_at: + await send_stream.send( + DocumentTask( + user_id=user_id, + doc_id=doc_id, + doc_type="note", + operation="index", + modified_at=modified_at, + ) + ) + queued += 1 + + # Check for deleted documents (in Qdrant but not in Nextcloud) + # Use grace period: only delete after 2 consecutive scans confirm absence + settings = get_settings() + grace_period = ( + settings.vector_sync_scan_interval * 1.5 + ) # Allow 1.5 scan intervals + current_time = time.time() + + for doc_id in indexed_docs: + if doc_id not in nextcloud_doc_ids: + doc_key = (user_id, doc_id) + + if doc_key in _potentially_deleted: + # Already marked as potentially deleted, check if grace period elapsed + first_missing_time = _potentially_deleted[doc_key] + time_missing = current_time - first_missing_time + + if time_missing >= grace_period: + # Grace period elapsed, send for deletion + logger.info( + f"Document {doc_id} missing for {time_missing:.1f}s " + f"(>{grace_period:.1f}s grace period), sending deletion" + ) + await send_stream.send( + DocumentTask( + user_id=user_id, + doc_id=doc_id, + doc_type="note", + operation="delete", + modified_at=0, + ) + ) + queued += 1 + # Remove from tracking after sending deletion + del _potentially_deleted[doc_key] + else: + logger.debug( + f"Document {doc_id} still missing " + f"({time_missing:.1f}s/{grace_period:.1f}s grace period)" + ) + else: + # First time missing, add to grace period tracking + logger.debug( + f"Document {doc_id} missing for first time, starting grace period" + ) + _potentially_deleted[doc_key] = current_time + + 
if queued > 0: + logger.info(f"Sent {queued} documents for incremental sync: {user_id}") + else: + logger.debug(f"No changes detected for {user_id}") diff --git a/pyproject.toml b/pyproject.toml index e9153db..54eec04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,14 +23,14 @@ dependencies = [ "authlib>=1.6.5", "qdrant-client>=1.7.0", # Observability dependencies - "prometheus-client>=0.21.0", # Prometheus metrics - "opentelemetry-api>=1.28.2", # OpenTelemetry API - "opentelemetry-sdk>=1.28.2", # OpenTelemetry SDK - "opentelemetry-instrumentation-asgi>=0.49b2", # Auto-instrument ASGI/Starlette - "opentelemetry-instrumentation-httpx>=0.49b2", # Auto-instrument httpx client - "opentelemetry-instrumentation-logging>=0.49b2", # Logging integration - "opentelemetry-exporter-otlp-proto-grpc>=1.28.2", # OTLP gRPC exporter - "python-json-logger>=3.2.0", # Structured JSON logging + "prometheus-client>=0.21.0", # Prometheus metrics + "opentelemetry-api>=1.28.2", # OpenTelemetry API + "opentelemetry-sdk>=1.28.2", # OpenTelemetry SDK + "opentelemetry-instrumentation-asgi>=0.49b2", # Auto-instrument ASGI/Starlette + "opentelemetry-instrumentation-httpx>=0.49b2", # Auto-instrument httpx client + "opentelemetry-instrumentation-logging>=0.49b2", # Logging integration + "opentelemetry-exporter-otlp-proto-grpc>=1.28.2", # OTLP gRPC exporter + "python-json-logger>=3.2.0", # Structured JSON logging ] classifiers = [ "Development Status :: 4 - Beta",