feat: Implement Qdrant placeholder state management
Introduces a placeholder-based state tracking system to prevent duplicate document processing during the gap between scanner queuing and processor completion. **Key Changes:** 1. **Placeholder Helper Functions** (`vector/placeholder.py`): - `write_placeholder_point()` - Creates zero-vector placeholder when queuing - `query_document_metadata()` - Queries for existing entry (placeholder or real) - `delete_placeholder_point()` - Removes placeholder before writing real vectors - `get_placeholder_filter()` - Filters placeholders from user-facing queries 2. **Scanner Updates** (`vector/scanner.py`): - Replace `indexed_at` comparison with `modified_at` comparison - Write placeholder before queuing each document - Query per-document metadata instead of bulk-querying indexed_at - Fixes bug where files were resubmitted every scan cycle 3. **Processor Updates** (`vector/processor.py`): - Delete placeholder before upserting real vectors - Ensures no duplicate points in Qdrant 4. **Query Filters** (all search files): - Add `get_placeholder_filter()` to all user-facing queries - Ensures placeholders never appear in search results or visualizations - Applied to: bm25_hybrid.py, semantic.py, viz_routes.py, algorithms.py **Architecture:** - Placeholders use zero vectors with dimension from embedding service - Payload includes `is_placeholder: True` flag for filtering - Status field tracks: "pending", "processing", "completed", "failed" - Deterministic UUIDs using uuid5 for consistent point IDs **Impact:** - Eliminates duplicate processing of same documents - Fixes race condition where long-running documents get queued multiple times - Prevents scanner from resubmitting files every scan cycle - Maintains clean separation between in-flight and indexed documents 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ COPY --from=ghcr.io/astral-sh/uv:0.9.10@sha256:29bd45092ea8902c0bbb7f0a338f0494a
|
||||
# 2. sqlite for development with token db
|
||||
RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
|
||||
git \
|
||||
tesseract-ocr \
|
||||
sqlite3 && apt clean
|
||||
|
||||
WORKDIR /app
|
||||
@@ -17,5 +18,6 @@ RUN uv sync --locked --no-dev --no-editable --no-cache
|
||||
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV VIRTUAL_ENV=/app/.venv
|
||||
ENV TESSDATA_PREFIX=/usr/share/tesseract-ocr/5/tessdata
|
||||
|
||||
ENTRYPOINT ["/app/.venv/bin/nextcloud-mcp-server", "--host", "0.0.0.0"]
|
||||
|
||||
@@ -27,6 +27,7 @@ from nextcloud_mcp_server.search import (
|
||||
SemanticSearchAlgorithm,
|
||||
)
|
||||
from nextcloud_mcp_server.vector.pca import PCA
|
||||
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -233,6 +234,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
|
||||
|
||||
# Build filter for this specific chunk
|
||||
must_conditions = [
|
||||
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
|
||||
FieldCondition(
|
||||
key="doc_id",
|
||||
match=MatchValue(value=result.id),
|
||||
|
||||
@@ -83,6 +83,7 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
|
||||
from qdrant_client.models import FieldCondition, Filter, MatchValue
|
||||
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -97,7 +98,10 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
|
||||
scroll_results, _next_offset = await qdrant_client.scroll(
|
||||
collection_name=collection,
|
||||
scroll_filter=Filter(
|
||||
must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))]
|
||||
must=[
|
||||
get_placeholder_filter(), # Exclude placeholders from doc_type discovery
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
]
|
||||
),
|
||||
limit=1000, # Sample size to discover types
|
||||
with_payload=["doc_type"],
|
||||
|
||||
@@ -10,6 +10,7 @@ from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
|
||||
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
|
||||
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
|
||||
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -112,10 +113,11 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
|
||||
|
||||
# Build Qdrant filter
|
||||
filter_conditions = [
|
||||
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
|
||||
FieldCondition(
|
||||
key="user_id",
|
||||
match=MatchValue(value=user_id),
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
# Add doc_type filter if specified
|
||||
|
||||
@@ -9,6 +9,7 @@ from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.embedding import get_embedding_service
|
||||
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
|
||||
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
|
||||
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -83,10 +84,11 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
|
||||
|
||||
# Build Qdrant filter
|
||||
filter_conditions = [
|
||||
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
|
||||
FieldCondition(
|
||||
key="user_id",
|
||||
match=MatchValue(value=user_id),
|
||||
)
|
||||
),
|
||||
]
|
||||
|
||||
# Add doc_type filter if specified
|
||||
|
||||
@@ -0,0 +1,300 @@
|
||||
"""Placeholder point management for Qdrant state tracking.
|
||||
|
||||
Placeholders are zero-vector points stored in Qdrant to track document processing
|
||||
state. They prevent duplicate work by marking documents as "in-flight" during the
|
||||
gap between scanner queuing and processor completion.
|
||||
|
||||
Architecture:
|
||||
- Scanner writes placeholders when queuing documents for processing
|
||||
- Processor deletes placeholders and writes real vectors after processing
|
||||
- All user-facing queries filter out placeholders (is_placeholder: False)
|
||||
|
||||
Placeholders contain:
|
||||
- Zero vectors (dimension from embedding service)
|
||||
- is_placeholder: True flag (for filtering)
|
||||
- status: "pending", "processing", "completed", "failed"
|
||||
- modified_at, etag from source document
|
||||
- queued_at timestamp
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
||||
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.embedding import get_embedding_service
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _generate_placeholder_id(doc_type: str, doc_id: str | int) -> str:
|
||||
"""Generate deterministic UUID for placeholder point.
|
||||
|
||||
Args:
|
||||
doc_type: Document type (note, file, etc.)
|
||||
doc_id: Document ID
|
||||
|
||||
Returns:
|
||||
UUID string for point ID
|
||||
"""
|
||||
point_name = f"{doc_type}:{doc_id}:placeholder"
|
||||
return str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
|
||||
|
||||
|
||||
async def write_placeholder_point(
|
||||
doc_id: str | int,
|
||||
doc_type: str,
|
||||
user_id: str,
|
||||
modified_at: int,
|
||||
etag: str = "",
|
||||
file_path: str | None = None,
|
||||
) -> None:
|
||||
"""Write a placeholder point to Qdrant to mark document as queued.
|
||||
|
||||
This should be called by the scanner BEFORE queuing a document for processing.
|
||||
The placeholder prevents duplicate work if the scanner runs again before
|
||||
processing completes.
|
||||
|
||||
Args:
|
||||
doc_id: Document ID (int for notes/files)
|
||||
doc_type: Document type (note, file, etc.)
|
||||
user_id: User ID who owns the document
|
||||
modified_at: Document modification timestamp
|
||||
etag: Document ETag (if available)
|
||||
file_path: File path (for files only)
|
||||
|
||||
Raises:
|
||||
Exception: If Qdrant write fails
|
||||
"""
|
||||
try:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
embedding_service = get_embedding_service()
|
||||
|
||||
# Get dimension dynamically (never hardcode)
|
||||
dimension = embedding_service.get_dimension()
|
||||
|
||||
# Create zero vectors
|
||||
zero_dense = [0.0] * dimension
|
||||
|
||||
# Generate deterministic point ID
|
||||
point_id = _generate_placeholder_id(doc_type, doc_id)
|
||||
|
||||
# Build payload
|
||||
payload = {
|
||||
"user_id": user_id,
|
||||
"doc_id": doc_id,
|
||||
"doc_type": doc_type,
|
||||
"is_placeholder": True,
|
||||
"status": "pending",
|
||||
"modified_at": modified_at,
|
||||
"etag": etag,
|
||||
"queued_at": int(time.time()),
|
||||
}
|
||||
|
||||
# Add file_path for files
|
||||
if doc_type == "file" and file_path:
|
||||
payload["file_path"] = file_path
|
||||
|
||||
# Create placeholder point
|
||||
point = PointStruct(
|
||||
id=point_id,
|
||||
vector={
|
||||
"dense": zero_dense,
|
||||
"sparse": None, # No sparse vector for placeholders
|
||||
},
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
# Upsert to Qdrant
|
||||
await qdrant_client.upsert(
|
||||
collection_name=settings.get_collection_name(),
|
||||
points=[point],
|
||||
wait=True,
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Wrote placeholder for {doc_type}_{doc_id} (user={user_id}, "
|
||||
f"modified_at={modified_at})"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to write placeholder for {doc_type}_{doc_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def query_document_metadata(
|
||||
doc_id: str | int,
|
||||
doc_type: str,
|
||||
user_id: str,
|
||||
) -> dict | None:
|
||||
"""Query Qdrant for existing document entry (placeholder or real).
|
||||
|
||||
Returns the payload of the first matching point, which could be:
|
||||
- A placeholder (is_placeholder: True)
|
||||
- A real indexed document (is_placeholder: False or missing)
|
||||
- None if document not in Qdrant
|
||||
|
||||
Args:
|
||||
doc_id: Document ID
|
||||
doc_type: Document type
|
||||
user_id: User ID
|
||||
|
||||
Returns:
|
||||
Payload dict if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
|
||||
# Query for any entry matching doc_id, doc_type, user_id
|
||||
scroll_result = await qdrant_client.scroll(
|
||||
collection_name=settings.get_collection_name(),
|
||||
scroll_filter=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
|
||||
]
|
||||
),
|
||||
limit=1,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
|
||||
if scroll_result[0]:
|
||||
point = scroll_result[0][0]
|
||||
return dict(point.payload)
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error querying document metadata for {doc_type}_{doc_id}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def delete_placeholder_point(
|
||||
doc_id: str | int,
|
||||
doc_type: str,
|
||||
user_id: str,
|
||||
) -> None:
|
||||
"""Delete a placeholder point from Qdrant.
|
||||
|
||||
This should be called by the processor BEFORE writing real vectors.
|
||||
We delete the placeholder to avoid duplicates, then write the real chunks.
|
||||
|
||||
Args:
|
||||
doc_id: Document ID
|
||||
doc_type: Document type
|
||||
user_id: User ID
|
||||
|
||||
Raises:
|
||||
Exception: If Qdrant delete fails
|
||||
"""
|
||||
try:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
|
||||
# Delete by filter (in case there are multiple chunks from old indexing)
|
||||
await qdrant_client.delete(
|
||||
collection_name=settings.get_collection_name(),
|
||||
points_selector=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
|
||||
FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
logger.debug(f"Deleted placeholder for {doc_type}_{doc_id} (user={user_id})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to delete placeholder for {doc_type}_{doc_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
async def update_placeholder_status(
|
||||
doc_id: str | int,
|
||||
doc_type: str,
|
||||
user_id: str,
|
||||
status: str,
|
||||
) -> None:
|
||||
"""Update the status field of a placeholder point.
|
||||
|
||||
Status values:
|
||||
- "pending": Queued for processing
|
||||
- "processing": Currently being processed
|
||||
- "completed": Processing completed successfully
|
||||
- "failed": Processing failed
|
||||
|
||||
Args:
|
||||
doc_id: Document ID
|
||||
doc_type: Document type
|
||||
user_id: User ID
|
||||
status: New status value
|
||||
|
||||
Raises:
|
||||
Exception: If Qdrant update fails
|
||||
"""
|
||||
try:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
settings = get_settings()
|
||||
|
||||
# Update payload using set_payload
|
||||
await qdrant_client.set_payload(
|
||||
collection_name=settings.get_collection_name(),
|
||||
payload={"status": status},
|
||||
points=Filter(
|
||||
must=[
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
|
||||
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
|
||||
FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Updated placeholder status for {doc_type}_{doc_id} to '{status}' "
|
||||
f"(user={user_id})"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to update placeholder status for {doc_type}_{doc_id}: {e}"
|
||||
)
|
||||
# Don't raise - status updates are non-critical
|
||||
|
||||
|
||||
def get_placeholder_filter() -> FieldCondition:
|
||||
"""Get a filter condition to exclude placeholders from queries.
|
||||
|
||||
Add this to all user-facing search/visualization queries to ensure
|
||||
placeholders are never returned to users.
|
||||
|
||||
Returns:
|
||||
FieldCondition that filters out is_placeholder: True
|
||||
|
||||
Example:
|
||||
Filter(
|
||||
must=[
|
||||
get_placeholder_filter(), # Exclude placeholders
|
||||
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
|
||||
]
|
||||
)
|
||||
"""
|
||||
return FieldCondition(
|
||||
key="is_placeholder",
|
||||
match=MatchValue(value=False),
|
||||
)
|
||||
@@ -23,6 +23,7 @@ from nextcloud_mcp_server.observability.metrics import (
|
||||
)
|
||||
from nextcloud_mcp_server.observability.tracing import trace_operation
|
||||
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
|
||||
from nextcloud_mcp_server.vector.placeholder import delete_placeholder_point
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
from nextcloud_mcp_server.vector.scanner import DocumentTask
|
||||
|
||||
@@ -418,6 +419,20 @@ async def _index_document(
|
||||
)
|
||||
)
|
||||
|
||||
# Delete placeholder before writing real vectors
|
||||
# This prevents duplicates and cleans up the placeholder state
|
||||
try:
|
||||
await delete_placeholder_point(
|
||||
doc_id=doc_task.doc_id,
|
||||
doc_type=doc_task.doc_type,
|
||||
user_id=doc_task.user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
# Log but don't fail indexing if placeholder deletion fails
|
||||
logger.warning(
|
||||
f"Failed to delete placeholder for {doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
||||
)
|
||||
|
||||
# Upsert to Qdrant
|
||||
await qdrant_client.upsert(
|
||||
collection_name=settings.get_collection_name(),
|
||||
|
||||
@@ -17,6 +17,10 @@ from nextcloud_mcp_server.client import NextcloudClient
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
from nextcloud_mcp_server.observability.metrics import record_vector_sync_scan
|
||||
from nextcloud_mcp_server.observability.tracing import trace_operation
|
||||
from nextcloud_mcp_server.vector.placeholder import (
|
||||
query_document_metadata,
|
||||
write_placeholder_point,
|
||||
)
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -184,8 +188,9 @@ async def scan_user_documents(
|
||||
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
|
||||
)
|
||||
|
||||
# Get indexed state from Qdrant first (for incremental sync)
|
||||
indexed_docs = {}
|
||||
# For deletion tracking, get all doc_ids in Qdrant (for incremental sync)
|
||||
# Note: We no longer bulk-query indexed_at, instead check per-document
|
||||
indexed_doc_ids = set()
|
||||
if not initial_sync:
|
||||
qdrant_client = await get_qdrant_client()
|
||||
scroll_result = await qdrant_client.scroll(
|
||||
@@ -196,17 +201,14 @@ async def scan_user_documents(
|
||||
FieldCondition(key="doc_type", match=MatchValue(value="note")),
|
||||
]
|
||||
),
|
||||
with_payload=["doc_id", "indexed_at"],
|
||||
with_payload=["doc_id"],
|
||||
with_vectors=False,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
indexed_docs = {
|
||||
point.payload["doc_id"]: point.payload["indexed_at"]
|
||||
for point in scroll_result[0]
|
||||
}
|
||||
indexed_doc_ids = {point.payload["doc_id"] for point in scroll_result[0]}
|
||||
|
||||
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
|
||||
logger.debug(f"Found {len(indexed_doc_ids)} indexed documents in Qdrant")
|
||||
|
||||
# Stream notes from Nextcloud and process immediately
|
||||
note_count = 0
|
||||
@@ -220,7 +222,14 @@ async def scan_user_documents(
|
||||
modified_at = note.get("modified", 0)
|
||||
|
||||
if initial_sync:
|
||||
# Send everything on first sync
|
||||
# Send everything on first sync - write placeholder first
|
||||
await write_placeholder_point(
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
user_id=user_id,
|
||||
modified_at=modified_at,
|
||||
etag=note.get("etag", ""),
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
@@ -232,9 +241,7 @@ async def scan_user_documents(
|
||||
)
|
||||
queued += 1
|
||||
else:
|
||||
# Incremental sync: compare with indexed state
|
||||
indexed_at = indexed_docs.get(doc_id)
|
||||
|
||||
# Incremental sync: check if document exists and compare modified_at
|
||||
# If document reappeared, remove from potentially_deleted
|
||||
doc_key = (user_id, doc_id)
|
||||
if doc_key in _potentially_deleted:
|
||||
@@ -243,8 +250,36 @@ async def scan_user_documents(
|
||||
)
|
||||
del _potentially_deleted[doc_key]
|
||||
|
||||
# Query Qdrant for existing entry (placeholder or real)
|
||||
existing_metadata = await query_document_metadata(
|
||||
doc_id=doc_id, doc_type="note", user_id=user_id
|
||||
)
|
||||
|
||||
# Send if never indexed or modified since last index
|
||||
if indexed_at is None or modified_at > indexed_at:
|
||||
# Compare against stored modified_at (not indexed_at!)
|
||||
needs_indexing = False
|
||||
if existing_metadata is None:
|
||||
# Never seen before
|
||||
needs_indexing = True
|
||||
elif existing_metadata.get("modified_at", 0) < modified_at:
|
||||
# Document modified since last indexing
|
||||
needs_indexing = True
|
||||
elif existing_metadata.get("is_placeholder", False):
|
||||
# Placeholder exists but processing may have failed - requeue
|
||||
logger.debug(
|
||||
f"Found existing placeholder for note {doc_id}, requeuing"
|
||||
)
|
||||
needs_indexing = True
|
||||
|
||||
if needs_indexing:
|
||||
# Write placeholder before queuing
|
||||
await write_placeholder_point(
|
||||
doc_id=doc_id,
|
||||
doc_type="note",
|
||||
user_id=user_id,
|
||||
modified_at=modified_at,
|
||||
etag=note.get("etag", ""),
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
@@ -272,7 +307,7 @@ async def scan_user_documents(
|
||||
) # Allow 1.5 scan intervals
|
||||
current_time = time.time()
|
||||
|
||||
for doc_id in indexed_docs:
|
||||
for doc_id in indexed_doc_ids:
|
||||
if doc_id not in nextcloud_doc_ids:
|
||||
doc_key = (user_id, doc_id)
|
||||
|
||||
@@ -312,8 +347,8 @@ async def scan_user_documents(
|
||||
_potentially_deleted[doc_key] = current_time
|
||||
|
||||
# Scan tagged PDF files (after notes)
|
||||
# Get indexed files from Qdrant (separate query for doc_type="file")
|
||||
indexed_files = {}
|
||||
# Get indexed file IDs from Qdrant (for deletion tracking)
|
||||
indexed_file_ids = set()
|
||||
if not initial_sync:
|
||||
file_scroll_result = await qdrant_client.scroll(
|
||||
collection_name=settings.get_collection_name(),
|
||||
@@ -324,16 +359,15 @@ async def scan_user_documents(
|
||||
]
|
||||
),
|
||||
limit=10000, # Reasonable limit for file count
|
||||
with_payload=["doc_id", "indexed_at"],
|
||||
with_payload=["doc_id"],
|
||||
with_vectors=False,
|
||||
)
|
||||
|
||||
indexed_files = {
|
||||
point.payload["doc_id"]: point.payload["indexed_at"]
|
||||
for point in file_scroll_result[0]
|
||||
indexed_file_ids = {
|
||||
point.payload["doc_id"] for point in file_scroll_result[0]
|
||||
}
|
||||
|
||||
logger.debug(f"Found {len(indexed_files)} indexed files in Qdrant")
|
||||
logger.debug(f"Found {len(indexed_file_ids)} indexed files in Qdrant")
|
||||
|
||||
# Scan for tagged PDF files
|
||||
file_count = 0
|
||||
@@ -370,7 +404,14 @@ async def scan_user_documents(
|
||||
pass
|
||||
|
||||
if initial_sync:
|
||||
# Send everything on first sync
|
||||
# Send everything on first sync - write placeholder first
|
||||
await write_placeholder_point(
|
||||
doc_id=file_id,
|
||||
doc_type="file",
|
||||
user_id=user_id,
|
||||
modified_at=modified_at,
|
||||
file_path=file_path,
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
@@ -383,9 +424,7 @@ async def scan_user_documents(
|
||||
)
|
||||
file_queued += 1
|
||||
else:
|
||||
# Incremental sync: compare with indexed state
|
||||
indexed_at = indexed_files.get(file_id)
|
||||
|
||||
# Incremental sync: check if file exists and compare modified_at
|
||||
# If file reappeared, remove from potentially_deleted
|
||||
file_key = (user_id, file_id)
|
||||
if file_key in _potentially_deleted:
|
||||
@@ -394,8 +433,36 @@ async def scan_user_documents(
|
||||
)
|
||||
del _potentially_deleted[file_key]
|
||||
|
||||
# Query Qdrant for existing entry (placeholder or real)
|
||||
existing_metadata = await query_document_metadata(
|
||||
doc_id=file_id, doc_type="file", user_id=user_id
|
||||
)
|
||||
|
||||
# Send if never indexed or modified since last index
|
||||
if indexed_at is None or modified_at > indexed_at:
|
||||
# Compare against stored modified_at (not indexed_at!)
|
||||
needs_indexing = False
|
||||
if existing_metadata is None:
|
||||
# Never seen before
|
||||
needs_indexing = True
|
||||
elif existing_metadata.get("modified_at", 0) < modified_at:
|
||||
# File modified since last indexing
|
||||
needs_indexing = True
|
||||
elif existing_metadata.get("is_placeholder", False):
|
||||
# Placeholder exists but processing may have failed - requeue
|
||||
logger.debug(
|
||||
f"Found existing placeholder for file {file_path} (ID: {file_id}), requeuing"
|
||||
)
|
||||
needs_indexing = True
|
||||
|
||||
if needs_indexing:
|
||||
# Write placeholder before queuing
|
||||
await write_placeholder_point(
|
||||
doc_id=file_id,
|
||||
doc_type="file",
|
||||
user_id=user_id,
|
||||
modified_at=modified_at,
|
||||
file_path=file_path,
|
||||
)
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
@@ -415,7 +482,7 @@ async def scan_user_documents(
|
||||
|
||||
# Check for deleted files (not initial sync)
|
||||
if not initial_sync:
|
||||
for file_id in indexed_files:
|
||||
for file_id in indexed_file_ids:
|
||||
if file_id not in nextcloud_file_ids:
|
||||
file_key = (user_id, file_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user