diff --git a/nextcloud_mcp_server/client/notes.py b/nextcloud_mcp_server/client/notes.py index ef5609d..601e94f 100644 --- a/nextcloud_mcp_server/client/notes.py +++ b/nextcloud_mcp_server/client/notes.py @@ -18,18 +18,57 @@ class NotesClient(BaseNextcloudClient): response = await self._make_request("GET", "/apps/notes/api/v1/settings") return response.json() - async def get_all_notes(self) -> AsyncIterator[Dict[str, Any]]: - """Get all notes, yielding them one at a time.""" + async def get_all_notes( + self, prune_before: Optional[int] = None + ) -> AsyncIterator[Dict[str, Any]]: + """Get all notes, yielding them one at a time. + + The Notes API returns changed notes with full data in chunks, and ALL note IDs + (with only 'id' field) in the last chunk for deletion detection. This causes + duplicates which we handle by tracking seen IDs (first occurrence with full + data is kept, later pruned duplicates are skipped). + + Args: + prune_before: Optional Unix timestamp. Notes unchanged since this time + are pruned (only 'id' field returned in last chunk). + Reduces data transfer for large note collections. + + Yields: + Note dictionaries with full data (deduplicated). + """ cursor = "" + seen_ids: set[int] = set() while True: + params: Dict[str, Any] = {"chunkSize": 10} + if cursor: + params["chunkCursor"] = cursor + if prune_before is not None: + params["pruneBefore"] = prune_before + response = await self._make_request( "GET", "/apps/notes/api/v1/notes", - params={"chunkSize": 10, "chunkCursor": cursor}, + params=params, ) - for note in response.json(): + response_data = response.json() + + for note in response_data: + note_id = note.get("id") + if note_id is None: + logger.warning(f"Skipping note without ID: {note}") + continue + + # Skip duplicates (API returns all IDs in last chunk for deletion detection) + if note_id in seen_ids: + logger.debug( + f"Skipping duplicate note {note_id} (pruned version in last chunk)" + ) + continue + + seen_ids.add(note_id) yield note + if "X-Notes-Chunk-Cursor" not in response.headers: break cursor = response.headers["X-Notes-Chunk-Cursor"] diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index c625638..0c64426 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -34,6 +34,57 @@ class DocumentTask: _potentially_deleted: dict[tuple[str, str], float] = {} +async def get_last_indexed_timestamp(user_id: str) -> int | None: + """Get the most recent indexed_at timestamp for user's notes in Qdrant. + + This timestamp can be used as pruneBefore parameter to optimize data transfer + when fetching notes - only notes modified after this timestamp will be sent + with full data. + + Args: + user_id: User to query + + Returns: + Unix timestamp of most recently indexed note, or None if no notes indexed yet + """ + try: + qdrant_client = await get_qdrant_client() + + # Query for user's notes, ordered by indexed_at descending, limit 1 + scroll_result = await qdrant_client.scroll( + collection_name=get_settings().get_collection_name(), + scroll_filter=Filter( + must=[ + FieldCondition(key="user_id", match=MatchValue(value=user_id)), + FieldCondition(key="doc_type", match=MatchValue(value="note")), + ] + ), + with_payload=["indexed_at"], + with_vectors=False, + limit=10000, # Get all to find max + ) + + # Find max indexed_at across all results + num_points = len(scroll_result[0]) if scroll_result[0] else 0 + logger.info(f"Found {num_points} indexed notes in Qdrant for user {user_id}") + + if scroll_result[0]: + timestamps = [ + point.payload.get("indexed_at", 0) for point in scroll_result[0] + ] + max_timestamp = max(timestamps) + logger.info( + f"Max indexed_at: {max_timestamp}, timestamps sample: {timestamps[:3]}" + ) + return int(max_timestamp) if max_timestamp > 0 else None + + logger.info(f"No indexed notes found for user {user_id}") + return None + except Exception as e: + logger.warning(f"Failed to get last indexed timestamp: {e}", exc_info=True) + return None + + async def scanner_task( send_stream: MemoryObjectSendStream[DocumentTask], shutdown_event: anyio.Event, @@ -96,22 +147,31 @@ async def scan_user_documents( nc_client: Authenticated Nextcloud client initial_sync: If True, send all documents (first-time sync) """ - logger.debug(f"Scanning documents for user: {user_id}") + import random + + scan_id = random.randint(1000, 9999) + logger.info( + f"[SCAN-{scan_id}] Starting scan for user: {user_id}, initial_sync={initial_sync}" + ) + + # Calculate prune timestamp for optimized data transfer + # Only notes modified after this will be sent with full data + prune_before = None if initial_sync else await get_last_indexed_timestamp(user_id) + if prune_before: + logger.info( + f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer" + ) # Fetch all notes from Nextcloud - notes = [note async for note in nc_client.notes.get_all_notes()] - logger.debug(f"Found {len(notes)} notes for {user_id}") + notes = [ + note async for note in nc_client.notes.get_all_notes(prune_before=prune_before) + ] + logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}") if initial_sync: # Send everything on first sync for note in notes: - # Handle missing 'modified' field (use 0 as fallback) modified_at = note.get("modified", 0) - if modified_at == 0: - logger.warning( - f"Note {note['id']} missing 'modified' field, using 0 as fallback" - ) - await send_stream.send( DocumentTask( user_id=user_id, @@ -153,13 +213,7 @@ async def scan_user_documents( for note in notes: doc_id = str(note["id"]) indexed_at = indexed_docs.get(doc_id) - - # Handle missing 'modified' field (use 0 as fallback) modified_at = note.get("modified", 0) - if modified_at == 0: - logger.warning( - f"Note {doc_id} missing 'modified' field, using 0 as fallback" - ) # If document reappeared, remove from potentially_deleted doc_key = (user_id, doc_id)