fix: optimize Notes API pagination with pruneBefore parameter
The Nextcloud Notes API intentionally returns all note IDs (with only 'id' field) in the last chunk to enable deletion detection. Without using the pruneBefore parameter, this causes duplicates - all notes appear with full data in chunks, then again with minimal data in the last chunk. This commit implements proper pruneBefore support: - NotesClient.get_all_notes() now accepts prune_before timestamp parameter - Scanner calculates max(indexed_at) from Qdrant to use as prune threshold - Only notes modified after this timestamp are sent with full data - Deduplication logic handles the API's deletion detection pattern - Significantly reduces data transfer for incremental syncs The behavior is documented in Notes API v1 spec - this is not an API bug, but a feature we weren't utilizing correctly. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -18,18 +18,57 @@ class NotesClient(BaseNextcloudClient):
|
||||
response = await self._make_request("GET", "/apps/notes/api/v1/settings")
|
||||
return response.json()
|
||||
|
||||
async def get_all_notes(self) -> AsyncIterator[Dict[str, Any]]:
|
||||
"""Get all notes, yielding them one at a time."""
|
||||
async def get_all_notes(
|
||||
self, prune_before: Optional[int] = None
|
||||
) -> AsyncIterator[Dict[str, Any]]:
|
||||
"""Get all notes, yielding them one at a time.
|
||||
|
||||
The Notes API returns changed notes with full data in chunks, and ALL note IDs
|
||||
(with only 'id' field) in the last chunk for deletion detection. This causes
|
||||
duplicates which we handle by tracking seen IDs (first occurrence with full
|
||||
data is kept, later pruned duplicates are skipped).
|
||||
|
||||
Args:
|
||||
prune_before: Optional Unix timestamp. Notes unchanged since this time
|
||||
are pruned (only 'id' field returned in last chunk).
|
||||
Reduces data transfer for large note collections.
|
||||
|
||||
Yields:
|
||||
Note dictionaries with full data (deduplicated).
|
||||
"""
|
||||
cursor = ""
|
||||
seen_ids: set[int] = set()
|
||||
|
||||
while True:
|
||||
params: Dict[str, Any] = {"chunkSize": 10}
|
||||
if cursor:
|
||||
params["chunkCursor"] = cursor
|
||||
if prune_before is not None:
|
||||
params["pruneBefore"] = prune_before
|
||||
|
||||
response = await self._make_request(
|
||||
"GET",
|
||||
"/apps/notes/api/v1/notes",
|
||||
params={"chunkSize": 10, "chunkCursor": cursor},
|
||||
params=params,
|
||||
)
|
||||
for note in response.json():
|
||||
response_data = response.json()
|
||||
|
||||
for note in response_data:
|
||||
note_id = note.get("id")
|
||||
if note_id is None:
|
||||
logger.warning(f"Skipping note without ID: {note}")
|
||||
continue
|
||||
|
||||
# Skip duplicates (API returns all IDs in last chunk for deletion detection)
|
||||
if note_id in seen_ids:
|
||||
logger.debug(
|
||||
f"Skipping duplicate note {note_id} (pruned version in last chunk)"
|
||||
)
|
||||
continue
|
||||
|
||||
seen_ids.add(note_id)
|
||||
yield note
|
||||
|
||||
if "X-Notes-Chunk-Cursor" not in response.headers:
|
||||
break
|
||||
cursor = response.headers["X-Notes-Chunk-Cursor"]
|
||||
|
||||
@@ -34,6 +34,57 @@ class DocumentTask:
|
||||
_potentially_deleted: dict[tuple[str, str], float] = {}
|
||||
|
||||
|
||||
async def get_last_indexed_timestamp(user_id: str) -> int | None:
|
||||
"""Get the most recent indexed_at timestamp for user's notes in Qdrant.
|
||||
|
||||
This timestamp can be used as pruneBefore parameter to optimize data transfer
|
||||
when fetching notes - only notes modified after this timestamp will be sent
|
||||
with full data.
|
||||
|
||||
Args:
|
||||
user_id: User to query
|
||||
|
||||
Returns:
|
||||
Unix timestamp of most recently indexed note, or None if no notes indexed yet
|
||||
"""
|
||||
try:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
|
||||
# Query for user's notes, ordered by indexed_at descending, limit 1
|
||||
scroll_result = await qdrant_client.scroll(
|
||||
collection_name=get_settings().get_collection_name(),
|
||||
scroll_filter=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value="note")),
|
||||
]
|
||||
),
|
||||
with_payload=["indexed_at"],
|
||||
with_vectors=False,
|
||||
limit=10000, # Get all to find max
|
||||
)
|
||||
|
||||
# Find max indexed_at across all results
|
||||
num_points = len(scroll_result[0]) if scroll_result[0] else 0
|
||||
logger.info(f"Found {num_points} indexed notes in Qdrant for user {user_id}")
|
||||
|
||||
if scroll_result[0]:
|
||||
timestamps = [
|
||||
point.payload.get("indexed_at", 0) for point in scroll_result[0]
|
||||
]
|
||||
max_timestamp = max(timestamps)
|
||||
logger.info(
|
||||
f"Max indexed_at: {max_timestamp}, timestamps sample: {timestamps[:3]}"
|
||||
)
|
||||
return int(max_timestamp) if max_timestamp > 0 else None
|
||||
|
||||
logger.info(f"No indexed notes found for user {user_id}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get last indexed timestamp: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
async def scanner_task(
|
||||
send_stream: MemoryObjectSendStream[DocumentTask],
|
||||
shutdown_event: anyio.Event,
|
||||
@@ -96,22 +147,31 @@ async def scan_user_documents(
|
||||
nc_client: Authenticated Nextcloud client
|
||||
initial_sync: If True, send all documents (first-time sync)
|
||||
"""
|
||||
logger.debug(f"Scanning documents for user: {user_id}")
|
||||
import random
|
||||
|
||||
scan_id = random.randint(1000, 9999)
|
||||
logger.info(
|
||||
f"[SCAN-{scan_id}] Starting scan for user: {user_id}, initial_sync={initial_sync}"
|
||||
)
|
||||
|
||||
# Calculate prune timestamp for optimized data transfer
|
||||
# Only notes modified after this will be sent with full data
|
||||
prune_before = None if initial_sync else await get_last_indexed_timestamp(user_id)
|
||||
if prune_before:
|
||||
logger.info(
|
||||
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
|
||||
)
|
||||
|
||||
# Fetch all notes from Nextcloud
|
||||
notes = [note async for note in nc_client.notes.get_all_notes()]
|
||||
logger.debug(f"Found {len(notes)} notes for {user_id}")
|
||||
notes = [
|
||||
note async for note in nc_client.notes.get_all_notes(prune_before=prune_before)
|
||||
]
|
||||
logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}")
|
||||
|
||||
if initial_sync:
|
||||
# Send everything on first sync
|
||||
for note in notes:
|
||||
# Handle missing 'modified' field (use 0 as fallback)
|
||||
modified_at = note.get("modified", 0)
|
||||
if modified_at == 0:
|
||||
logger.warning(
|
||||
f"Note {note['id']} missing 'modified' field, using 0 as fallback"
|
||||
)
|
||||
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
@@ -153,13 +213,7 @@ async def scan_user_documents(
|
||||
for note in notes:
|
||||
doc_id = str(note["id"])
|
||||
indexed_at = indexed_docs.get(doc_id)
|
||||
|
||||
# Handle missing 'modified' field (use 0 as fallback)
|
||||
modified_at = note.get("modified", 0)
|
||||
if modified_at == 0:
|
||||
logger.warning(
|
||||
f"Note {doc_id} missing 'modified' field, using 0 as fallback"
|
||||
)
|
||||
|
||||
# If document reappeared, remove from potentially_deleted
|
||||
doc_key = (user_id, doc_id)
|
||||
|
||||
Reference in New Issue
Block a user