Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ce666934f2 | |||
| cdf69b3ea8 | |||
| a6e5f3d8ff |
@@ -5,5 +5,7 @@ __pycache__/
|
||||
.env.local
|
||||
.env.*.local
|
||||
|
||||
docker-compose.override.yml
|
||||
|
||||
# Generated by pytest; used to log in users
|
||||
.nextcloud_oauth_*.json
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
## v0.31.1 (2025-11-10)
|
||||
|
||||
### Refactor
|
||||
|
||||
- simplify OpenTelemetry tracing configuration
|
||||
|
||||
## v0.31.0 (2025-11-10)
|
||||
|
||||
### Feat
|
||||
|
||||
+1
-1
@@ -9,7 +9,7 @@ WORKDIR /app
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN uv sync --locked --no-dev
|
||||
RUN uv sync --locked --no-dev --no-editable
|
||||
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV VIRTUAL_ENV=/app/.venv
|
||||
|
||||
@@ -2,8 +2,8 @@ apiVersion: v2
|
||||
name: nextcloud-mcp-server
|
||||
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
|
||||
type: application
|
||||
version: 0.31.0
|
||||
appVersion: "0.31.0"
|
||||
version: 0.31.1
|
||||
appVersion: "0.31.1"
|
||||
keywords:
|
||||
- nextcloud
|
||||
- mcp
|
||||
|
||||
@@ -218,8 +218,6 @@ spec:
|
||||
- name: METRICS_PORT
|
||||
value: {{ .Values.observability.metrics.port | quote }}
|
||||
{{- if .Values.observability.tracing.enabled }}
|
||||
- name: OTEL_ENABLED
|
||||
value: "true"
|
||||
- name: OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
value: {{ .Values.observability.tracing.endpoint | quote }}
|
||||
- name: OTEL_SERVICE_NAME
|
||||
|
||||
+6
-2
@@ -98,16 +98,20 @@ services:
|
||||
#- QDRANT_URL=http://qdrant:6333 # Uncomment for network mode
|
||||
#- QDRANT_API_KEY=${QDRANT_API_KEY:-my_secret_api_key} # Only for network mode
|
||||
|
||||
# Observability
|
||||
#- OTEL_SERVICE_NAME=nextcloud-mcp-docker-compose
|
||||
#- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
||||
|
||||
# Collection naming: Auto-generated as {deployment-id}-{model-name}
|
||||
# - Deployment ID: OTEL_SERVICE_NAME (if set) or hostname (fallback)
|
||||
# - Model name: OLLAMA_EMBEDDING_MODEL
|
||||
# - Example: "nextcloud-mcp-server-nomic-embed-text"
|
||||
# - Changing the model creates a new collection (requires re-embedding)
|
||||
# - Set QDRANT_COLLECTION to override auto-generation:
|
||||
- QDRANT_COLLECTION=nextcloud_content
|
||||
#- QDRANT_COLLECTION=nextcloud_content
|
||||
|
||||
# Ollama configuration (optional - uses SimpleEmbeddingProvider if not set)
|
||||
# - OLLAMA_BASE_URL=https://ollama.internal.coutinho.io:443
|
||||
# - OLLAMA_BASE_URL=http://ollama:11434
|
||||
# - OLLAMA_EMBEDDING_MODEL=nomic-embed-text # Changing this creates new collection
|
||||
# - OLLAMA_VERIFY_SSL=false
|
||||
|
||||
|
||||
@@ -16,8 +16,7 @@ The Nextcloud MCP Server includes comprehensive observability features for produ
|
||||
export METRICS_ENABLED=true
|
||||
export METRICS_PORT=9090
|
||||
|
||||
# Enable tracing (optional)
|
||||
export OTEL_ENABLED=true
|
||||
# Enable tracing (optional - tracing is enabled when OTEL_EXPORTER_OTLP_ENDPOINT is set)
|
||||
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
|
||||
|
||||
# Start the server
|
||||
@@ -46,8 +45,7 @@ helm install nextcloud-mcp charts/nextcloud-mcp-server \
|
||||
|----------|---------|-------------|
|
||||
| `METRICS_ENABLED` | `true` | Enable Prometheus metrics |
|
||||
| `METRICS_PORT` | `9090` | Port for metrics endpoint |
|
||||
| `OTEL_ENABLED` | `false` | Enable OpenTelemetry tracing |
|
||||
| `OTEL_EXPORTER_OTLP_ENDPOINT` | - | OTLP gRPC endpoint (e.g., `http://otel-collector:4317`) |
|
||||
| `OTEL_EXPORTER_OTLP_ENDPOINT` | - | OTLP gRPC endpoint (e.g., `http://otel-collector:4317`). Tracing is enabled when this is set. |
|
||||
| `OTEL_SERVICE_NAME` | `nextcloud-mcp-server` | Service name in traces |
|
||||
| `OTEL_TRACES_SAMPLER` | `always_on` | Trace sampling strategy |
|
||||
| `OTEL_TRACES_SAMPLER_ARG` | `1.0` | Sampling rate (0.0-1.0) |
|
||||
|
||||
@@ -5,9 +5,12 @@ from contextlib import AsyncExitStack, asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage
|
||||
|
||||
|
||||
import anyio
|
||||
import click
|
||||
import httpx
|
||||
@@ -58,6 +61,7 @@ from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools
|
||||
from nextcloud_mcp_server.vector import processor_task, scanner_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
|
||||
|
||||
def initialize_document_processors():
|
||||
@@ -791,17 +795,20 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
)
|
||||
|
||||
# Setup OpenTelemetry tracing (optional)
|
||||
if settings.tracing_enabled:
|
||||
if settings.otel_exporter_otlp_endpoint:
|
||||
setup_tracing(
|
||||
service_name=settings.otel_service_name,
|
||||
otlp_endpoint=settings.otel_exporter_otlp_endpoint,
|
||||
otlp_verify_ssl=settings.otel_exporter_verify_ssl,
|
||||
sampling_rate=settings.otel_traces_sampler_arg,
|
||||
)
|
||||
logger.info(
|
||||
f"OpenTelemetry tracing enabled (endpoint: {settings.otel_exporter_otlp_endpoint})"
|
||||
)
|
||||
else:
|
||||
logger.info("OpenTelemetry tracing disabled (set OTEL_ENABLED=true to enable)")
|
||||
logger.info(
|
||||
"OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)"
|
||||
)
|
||||
|
||||
# Determine authentication mode
|
||||
oauth_enabled = is_oauth_mode()
|
||||
@@ -1391,9 +1398,12 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
)
|
||||
logger.info(f"🔑 /mcp request with Authorization: {token_preview}")
|
||||
else:
|
||||
logger.warning(
|
||||
f"⚠️ /mcp request WITHOUT Authorization header from {request.client}"
|
||||
)
|
||||
# Only warn about missing Authorization in OAuth mode
|
||||
# In BasicAuth mode, /mcp requests without Authorization are expected
|
||||
if oauth_enabled:
|
||||
logger.warning(
|
||||
f"⚠️ /mcp request WITHOUT Authorization header from {request.client}"
|
||||
)
|
||||
|
||||
# Log client capabilities on initialize request
|
||||
if request.method == "POST":
|
||||
@@ -1454,7 +1464,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
)
|
||||
|
||||
# Add observability middleware (metrics + tracing)
|
||||
if settings.metrics_enabled or settings.tracing_enabled:
|
||||
if settings.metrics_enabled or settings.otel_exporter_otlp_endpoint:
|
||||
app.add_middleware(ObservabilityMiddleware)
|
||||
logger.info("Observability middleware enabled (metrics and/or tracing)")
|
||||
|
||||
|
||||
@@ -231,17 +231,21 @@ class UnifiedTokenVerifier(TokenVerifier):
|
||||
token,
|
||||
signing_key.key,
|
||||
algorithms=["RS256"],
|
||||
issuer=self.settings.oidc_issuer
|
||||
if hasattr(self.settings, "oidc_issuer")
|
||||
else None,
|
||||
issuer=(
|
||||
self.settings.oidc_issuer
|
||||
if hasattr(self.settings, "oidc_issuer")
|
||||
else None
|
||||
),
|
||||
options={
|
||||
"verify_signature": True,
|
||||
"verify_exp": True,
|
||||
"verify_iat": True,
|
||||
"verify_iss": True
|
||||
if hasattr(self.settings, "oidc_issuer")
|
||||
and self.settings.oidc_issuer
|
||||
else False,
|
||||
"verify_iss": (
|
||||
True
|
||||
if hasattr(self.settings, "oidc_issuer")
|
||||
and self.settings.oidc_issuer
|
||||
else False
|
||||
),
|
||||
"verify_aud": False, # We handle audience validation separately
|
||||
},
|
||||
)
|
||||
|
||||
@@ -9,6 +9,7 @@ from httpx import (
|
||||
BasicAuth,
|
||||
Request,
|
||||
Response,
|
||||
Timeout,
|
||||
)
|
||||
|
||||
from ..controllers.notes_search import NotesSearchController
|
||||
@@ -66,6 +67,7 @@ class NextcloudClient:
|
||||
auth=auth,
|
||||
transport=AsyncDisableCookieTransport(AsyncHTTPTransport()),
|
||||
event_hooks={"request": [log_request], "response": [log_response]},
|
||||
timeout=Timeout(timeout=30, connect=5),
|
||||
)
|
||||
|
||||
# Initialize app clients
|
||||
|
||||
@@ -181,8 +181,8 @@ class Settings:
|
||||
# Observability settings
|
||||
metrics_enabled: bool = True
|
||||
metrics_port: int = 9090
|
||||
tracing_enabled: bool = False
|
||||
otel_exporter_otlp_endpoint: Optional[str] = None
|
||||
otel_exporter_verify_ssl: bool = False
|
||||
otel_service_name: str = "nextcloud-mcp-server"
|
||||
otel_traces_sampler: str = "always_on"
|
||||
otel_traces_sampler_arg: float = 1.0
|
||||
@@ -334,8 +334,9 @@ def get_settings() -> Settings:
|
||||
# Observability settings
|
||||
metrics_enabled=os.getenv("METRICS_ENABLED", "true").lower() == "true",
|
||||
metrics_port=int(os.getenv("METRICS_PORT", "9090")),
|
||||
tracing_enabled=os.getenv("OTEL_ENABLED", "false").lower() == "true",
|
||||
otel_exporter_otlp_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"),
|
||||
otel_exporter_verify_ssl=os.getenv("OTEL_EXPORTER_VERIFY_SSL", "false").lower()
|
||||
== "true",
|
||||
otel_service_name=os.getenv("OTEL_SERVICE_NAME", "nextcloud-mcp-server"),
|
||||
otel_traces_sampler=os.getenv("OTEL_TRACES_SAMPLER", "always_on"),
|
||||
otel_traces_sampler_arg=float(os.getenv("OTEL_TRACES_SAMPLER_ARG", "1.0")),
|
||||
|
||||
@@ -13,9 +13,9 @@ import logging
|
||||
from contextlib import contextmanager
|
||||
from typing import Any
|
||||
|
||||
from importlib_metadata import version
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
|
||||
from opentelemetry.instrumentation.logging import LoggingInstrumentor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
@@ -27,10 +27,13 @@ logger = logging.getLogger(__name__)
|
||||
# Global tracer instance (initialized in setup_tracing)
|
||||
_tracer: Tracer | None = None
|
||||
|
||||
# Auto-instrument httpx for Nextcloud API calls
|
||||
|
||||
|
||||
def setup_tracing(
|
||||
service_name: str = "nextcloud-mcp-server",
|
||||
otlp_endpoint: str | None = None,
|
||||
otlp_verify_ssl: bool = False,
|
||||
sampling_rate: float = 1.0,
|
||||
) -> Tracer:
|
||||
"""
|
||||
@@ -40,6 +43,8 @@ def setup_tracing(
|
||||
service_name: Service name for traces (default: "nextcloud-mcp-server")
|
||||
otlp_endpoint: OTLP gRPC endpoint (e.g., "http://otel-collector:4317")
|
||||
If None, tracing is initialized but no exporter is configured
|
||||
otlp_verify_ssl: Enable TLS verification for otlp_endpoint. If True,
|
||||
the OTLP exporter is created with `insecure=False`
|
||||
sampling_rate: Sampling rate (0.0-1.0). Default 1.0 (100% sampling)
|
||||
|
||||
Returns:
|
||||
@@ -51,7 +56,7 @@ def setup_tracing(
|
||||
resource = Resource.create(
|
||||
{
|
||||
"service.name": service_name,
|
||||
"service.version": "0.27.2", # TODO: Extract from pyproject.toml
|
||||
"service.version": version(__package__.split(".")[0]),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -61,7 +66,9 @@ def setup_tracing(
|
||||
# Configure OTLP exporter if endpoint is provided
|
||||
if otlp_endpoint:
|
||||
try:
|
||||
otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
|
||||
otlp_exporter = OTLPSpanExporter(
|
||||
endpoint=otlp_endpoint, insecure=not otlp_verify_ssl
|
||||
)
|
||||
span_processor = BatchSpanProcessor(otlp_exporter)
|
||||
provider.add_span_processor(span_processor)
|
||||
logger.info(
|
||||
@@ -79,9 +86,6 @@ def setup_tracing(
|
||||
# Set global tracer provider
|
||||
trace.set_tracer_provider(provider)
|
||||
|
||||
# Auto-instrument httpx for Nextcloud API calls
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
|
||||
# Auto-instrument logging to inject trace context
|
||||
LoggingInstrumentor().instrument(set_logging_format=True)
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.embedding import get_embedding_service
|
||||
from nextcloud_mcp_server.observability.tracing import trace_operation
|
||||
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
from nextcloud_mcp_server.vector.scanner import DocumentTask
|
||||
@@ -94,58 +95,68 @@ async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
|
||||
f"for {doc_task.user_id} ({doc_task.operation})"
|
||||
)
|
||||
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
with trace_operation(
|
||||
"vector_sync.process_document",
|
||||
attributes={
|
||||
"vector_sync.operation": "process",
|
||||
"vector_sync.user_id": doc_task.user_id,
|
||||
"vector_sync.doc_id": doc_task.doc_id,
|
||||
"vector_sync.doc_type": doc_task.doc_type,
|
||||
"vector_sync.doc_operation": doc_task.operation,
|
||||
},
|
||||
):
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
|
||||
# Handle deletion
|
||||
if doc_task.operation == "delete":
|
||||
await qdrant_client.delete(
|
||||
collection_name=settings.get_collection_name(),
|
||||
points_selector=Filter(
|
||||
must=[
|
||||
FieldCondition(
|
||||
key="user_id",
|
||||
match=MatchValue(value=doc_task.user_id),
|
||||
),
|
||||
FieldCondition(
|
||||
key="doc_id",
|
||||
match=MatchValue(value=doc_task.doc_id),
|
||||
),
|
||||
FieldCondition(
|
||||
key="doc_type",
|
||||
match=MatchValue(value=doc_task.doc_type),
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
logger.info(
|
||||
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
|
||||
)
|
||||
return
|
||||
# Handle deletion
|
||||
if doc_task.operation == "delete":
|
||||
await qdrant_client.delete(
|
||||
collection_name=settings.get_collection_name(),
|
||||
points_selector=Filter(
|
||||
must=[
|
||||
FieldCondition(
|
||||
key="user_id",
|
||||
match=MatchValue(value=doc_task.user_id),
|
||||
),
|
||||
FieldCondition(
|
||||
key="doc_id",
|
||||
match=MatchValue(value=doc_task.doc_id),
|
||||
),
|
||||
FieldCondition(
|
||||
key="doc_type",
|
||||
match=MatchValue(value=doc_task.doc_type),
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
logger.info(
|
||||
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
|
||||
)
|
||||
return
|
||||
|
||||
# Handle indexing with retry
|
||||
max_retries = 3
|
||||
retry_delay = 1.0
|
||||
# Handle indexing with retry
|
||||
max_retries = 3
|
||||
retry_delay = 1.0
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
await _index_document(doc_task, nc_client, qdrant_client)
|
||||
return # Success
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
await _index_document(doc_task, nc_client, qdrant_client)
|
||||
return # Success
|
||||
|
||||
except (HTTPStatusError, Exception) as e:
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
f"Retry {attempt + 1}/{max_retries} for "
|
||||
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
||||
)
|
||||
await anyio.sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
|
||||
f"after {max_retries} retries: {e}"
|
||||
)
|
||||
raise
|
||||
except (HTTPStatusError, Exception) as e:
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
f"Retry {attempt + 1}/{max_retries} for "
|
||||
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
||||
)
|
||||
await anyio.sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
|
||||
f"after {max_retries} retries: {e}"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def _index_document(
|
||||
|
||||
@@ -13,6 +13,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue
|
||||
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.observability.tracing import trace_operation
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -154,134 +155,148 @@ async def scan_user_documents(
|
||||
f"[SCAN-{scan_id}] Starting scan for user: {user_id}, initial_sync={initial_sync}"
|
||||
)
|
||||
|
||||
# Calculate prune timestamp for optimized data transfer
|
||||
# Only notes modified after this will be sent with full data
|
||||
prune_before = None if initial_sync else await get_last_indexed_timestamp(user_id)
|
||||
if prune_before:
|
||||
logger.info(
|
||||
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
|
||||
with trace_operation(
|
||||
"vector_sync.scan_user_documents",
|
||||
attributes={
|
||||
"vector_sync.operation": "scan",
|
||||
"vector_sync.user_id": user_id,
|
||||
"vector_sync.initial_sync": initial_sync,
|
||||
"vector_sync.scan_id": scan_id,
|
||||
},
|
||||
):
|
||||
# Calculate prune timestamp for optimized data transfer
|
||||
# Only notes modified after this will be sent with full data
|
||||
prune_before = (
|
||||
None if initial_sync else await get_last_indexed_timestamp(user_id)
|
||||
)
|
||||
if prune_before:
|
||||
logger.info(
|
||||
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
|
||||
)
|
||||
|
||||
# Fetch all notes from Nextcloud
|
||||
notes = [
|
||||
note
|
||||
async for note in nc_client.notes.get_all_notes(prune_before=prune_before)
|
||||
]
|
||||
logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}")
|
||||
|
||||
if initial_sync:
|
||||
# Send everything on first sync
|
||||
for note in notes:
|
||||
modified_at = note.get("modified", 0)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=str(note["id"]),
|
||||
doc_type="note",
|
||||
operation="index",
|
||||
modified_at=modified_at,
|
||||
)
|
||||
)
|
||||
logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
|
||||
return
|
||||
|
||||
# Get indexed state from Qdrant
|
||||
qdrant_client = await get_qdrant_client()
|
||||
scroll_result = await qdrant_client.scroll(
|
||||
collection_name=get_settings().get_collection_name(),
|
||||
scroll_filter=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value="note")),
|
||||
]
|
||||
),
|
||||
with_payload=["doc_id", "indexed_at"],
|
||||
with_vectors=False,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
# Fetch all notes from Nextcloud
|
||||
notes = [
|
||||
note async for note in nc_client.notes.get_all_notes(prune_before=prune_before)
|
||||
]
|
||||
logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}")
|
||||
indexed_docs = {
|
||||
point.payload["doc_id"]: point.payload["indexed_at"]
|
||||
for point in scroll_result[0]
|
||||
}
|
||||
|
||||
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
|
||||
|
||||
# Compare and queue changes
|
||||
queued = 0
|
||||
nextcloud_doc_ids = {str(note["id"]) for note in notes}
|
||||
|
||||
if initial_sync:
|
||||
# Send everything on first sync
|
||||
for note in notes:
|
||||
doc_id = str(note["id"])
|
||||
indexed_at = indexed_docs.get(doc_id)
|
||||
modified_at = note.get("modified", 0)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=str(note["id"]),
|
||||
doc_type="note",
|
||||
operation="index",
|
||||
modified_at=modified_at,
|
||||
)
|
||||
)
|
||||
logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
|
||||
return
|
||||
|
||||
# Get indexed state from Qdrant
|
||||
qdrant_client = await get_qdrant_client()
|
||||
scroll_result = await qdrant_client.scroll(
|
||||
collection_name=get_settings().get_collection_name(),
|
||||
scroll_filter=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value="note")),
|
||||
]
|
||||
),
|
||||
with_payload=["doc_id", "indexed_at"],
|
||||
with_vectors=False,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
indexed_docs = {
|
||||
point.payload["doc_id"]: point.payload["indexed_at"]
|
||||
for point in scroll_result[0]
|
||||
}
|
||||
|
||||
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
|
||||
|
||||
# Compare and queue changes
|
||||
queued = 0
|
||||
nextcloud_doc_ids = {str(note["id"]) for note in notes}
|
||||
|
||||
for note in notes:
|
||||
doc_id = str(note["id"])
|
||||
indexed_at = indexed_docs.get(doc_id)
|
||||
modified_at = note.get("modified", 0)
|
||||
|
||||
# If document reappeared, remove from potentially_deleted
|
||||
doc_key = (user_id, doc_id)
|
||||
if doc_key in _potentially_deleted:
|
||||
logger.debug(
|
||||
f"Document {doc_id} reappeared, removing from deletion grace period"
|
||||
)
|
||||
del _potentially_deleted[doc_key]
|
||||
|
||||
# Send if never indexed or modified since last index
|
||||
if indexed_at is None or modified_at > indexed_at:
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
operation="index",
|
||||
modified_at=modified_at,
|
||||
)
|
||||
)
|
||||
queued += 1
|
||||
|
||||
# Check for deleted documents (in Qdrant but not in Nextcloud)
|
||||
# Use grace period: only delete after 2 consecutive scans confirm absence
|
||||
settings = get_settings()
|
||||
grace_period = settings.vector_sync_scan_interval * 1.5 # Allow 1.5 scan intervals
|
||||
current_time = time.time()
|
||||
|
||||
for doc_id in indexed_docs:
|
||||
if doc_id not in nextcloud_doc_ids:
|
||||
# If document reappeared, remove from potentially_deleted
|
||||
doc_key = (user_id, doc_id)
|
||||
|
||||
if doc_key in _potentially_deleted:
|
||||
# Already marked as potentially deleted, check if grace period elapsed
|
||||
first_missing_time = _potentially_deleted[doc_key]
|
||||
time_missing = current_time - first_missing_time
|
||||
|
||||
if time_missing >= grace_period:
|
||||
# Grace period elapsed, send for deletion
|
||||
logger.info(
|
||||
f"Document {doc_id} missing for {time_missing:.1f}s "
|
||||
f"(>{grace_period:.1f}s grace period), sending deletion"
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
operation="delete",
|
||||
modified_at=0,
|
||||
)
|
||||
)
|
||||
queued += 1
|
||||
# Remove from tracking after sending deletion
|
||||
del _potentially_deleted[doc_key]
|
||||
else:
|
||||
logger.debug(
|
||||
f"Document {doc_id} still missing "
|
||||
f"({time_missing:.1f}s/{grace_period:.1f}s grace period)"
|
||||
)
|
||||
else:
|
||||
# First time missing, add to grace period tracking
|
||||
logger.debug(
|
||||
f"Document {doc_id} missing for first time, starting grace period"
|
||||
f"Document {doc_id} reappeared, removing from deletion grace period"
|
||||
)
|
||||
_potentially_deleted[doc_key] = current_time
|
||||
del _potentially_deleted[doc_key]
|
||||
|
||||
if queued > 0:
|
||||
logger.info(f"Sent {queued} documents for incremental sync: {user_id}")
|
||||
else:
|
||||
logger.debug(f"No changes detected for {user_id}")
|
||||
# Send if never indexed or modified since last index
|
||||
if indexed_at is None or modified_at > indexed_at:
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
operation="index",
|
||||
modified_at=modified_at,
|
||||
)
|
||||
)
|
||||
queued += 1
|
||||
|
||||
# Check for deleted documents (in Qdrant but not in Nextcloud)
|
||||
# Use grace period: only delete after 2 consecutive scans confirm absence
|
||||
settings = get_settings()
|
||||
grace_period = (
|
||||
settings.vector_sync_scan_interval * 1.5
|
||||
) # Allow 1.5 scan intervals
|
||||
current_time = time.time()
|
||||
|
||||
for doc_id in indexed_docs:
|
||||
if doc_id not in nextcloud_doc_ids:
|
||||
doc_key = (user_id, doc_id)
|
||||
|
||||
if doc_key in _potentially_deleted:
|
||||
# Already marked as potentially deleted, check if grace period elapsed
|
||||
first_missing_time = _potentially_deleted[doc_key]
|
||||
time_missing = current_time - first_missing_time
|
||||
|
||||
if time_missing >= grace_period:
|
||||
# Grace period elapsed, send for deletion
|
||||
logger.info(
|
||||
f"Document {doc_id} missing for {time_missing:.1f}s "
|
||||
f"(>{grace_period:.1f}s grace period), sending deletion"
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
operation="delete",
|
||||
modified_at=0,
|
||||
)
|
||||
)
|
||||
queued += 1
|
||||
# Remove from tracking after sending deletion
|
||||
del _potentially_deleted[doc_key]
|
||||
else:
|
||||
logger.debug(
|
||||
f"Document {doc_id} still missing "
|
||||
f"({time_missing:.1f}s/{grace_period:.1f}s grace period)"
|
||||
)
|
||||
else:
|
||||
# First time missing, add to grace period tracking
|
||||
logger.debug(
|
||||
f"Document {doc_id} missing for first time, starting grace period"
|
||||
)
|
||||
_potentially_deleted[doc_key] = current_time
|
||||
|
||||
if queued > 0:
|
||||
logger.info(f"Sent {queued} documents for incremental sync: {user_id}")
|
||||
else:
|
||||
logger.debug(f"No changes detected for {user_id}")
|
||||
|
||||
+9
-9
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "nextcloud-mcp-server"
|
||||
version = "0.31.0"
|
||||
version = "0.31.1"
|
||||
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
|
||||
authors = [
|
||||
{name = "Chris Coutinho", email = "chris@coutinho.io"}
|
||||
@@ -23,14 +23,14 @@ dependencies = [
|
||||
"authlib>=1.6.5",
|
||||
"qdrant-client>=1.7.0",
|
||||
# Observability dependencies
|
||||
"prometheus-client>=0.21.0", # Prometheus metrics
|
||||
"opentelemetry-api>=1.28.2", # OpenTelemetry API
|
||||
"opentelemetry-sdk>=1.28.2", # OpenTelemetry SDK
|
||||
"opentelemetry-instrumentation-asgi>=0.49b2", # Auto-instrument ASGI/Starlette
|
||||
"opentelemetry-instrumentation-httpx>=0.49b2", # Auto-instrument httpx client
|
||||
"opentelemetry-instrumentation-logging>=0.49b2", # Logging integration
|
||||
"opentelemetry-exporter-otlp-proto-grpc>=1.28.2", # OTLP gRPC exporter
|
||||
"python-json-logger>=3.2.0", # Structured JSON logging
|
||||
"prometheus-client>=0.21.0", # Prometheus metrics
|
||||
"opentelemetry-api>=1.28.2", # OpenTelemetry API
|
||||
"opentelemetry-sdk>=1.28.2", # OpenTelemetry SDK
|
||||
"opentelemetry-instrumentation-asgi>=0.49b2", # Auto-instrument ASGI/Starlette
|
||||
"opentelemetry-instrumentation-httpx>=0.49b2", # Auto-instrument httpx client
|
||||
"opentelemetry-instrumentation-logging>=0.49b2", # Logging integration
|
||||
"opentelemetry-exporter-otlp-proto-grpc>=1.28.2", # OTLP gRPC exporter
|
||||
"python-json-logger>=3.2.0", # Structured JSON logging
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
|
||||
Reference in New Issue
Block a user