feat(search): add file_path metadata and chunk offsets to search results

Changes:
- Add file_path to metadata in semantic and BM25 hybrid search algorithms
  for PDF viewer integration (search/semantic.py:161-163, search/bm25_hybrid.py:230-232)
- Include chunk_start_offset, chunk_end_offset, page_number, and page_count
  in search results for rich chunk display (api/management.py:981-1004)
- Add point_id field to SearchResult for batch retrieval (models/semantic.py)
- Fix type narrowing for chunk context API parameters (api/management.py:1102-1111)
- Fix None-safety in doc_types discovery (search/algorithms.py:114)

This enables the Astroglobe UI to display PDF pages at the correct
location for matched chunks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-15 21:31:10 +01:00
parent a026f2eddb
commit 85db90a2df
5 changed files with 504 additions and 35 deletions
+488 -34
View File
@@ -393,6 +393,275 @@ async def revoke_user_access(request: Request) -> JSONResponse:
)
async def get_installed_apps(request: Request) -> JSONResponse:
"""GET /api/v1/apps - Get list of installed Nextcloud apps.
Returns a list of installed app IDs for filtering webhook presets.
Requires OAuth bearer token for authentication.
"""
try:
# Validate OAuth token and extract user
user_id, validated = await validate_token_and_get_user(request)
except Exception as e:
logger.warning(f"Unauthorized access to /api/v1/apps: {e}")
return JSONResponse(
{"error": "Unauthorized", "message": str(e)},
status_code=401,
)
try:
import httpx
# Get Bearer token from request
token = extract_bearer_token(request)
if not token:
raise ValueError("Missing Authorization header")
# Get Nextcloud host from OAuth context
oauth_ctx = request.app.state.oauth_context
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise ValueError("Nextcloud host not configured")
# Create authenticated HTTP client
async with httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
) as client:
# Get installed apps using OCS API
# Notes, Calendar, Deck, Tables, etc. are apps that support webhooks
# We check which ones are installed and enabled
ocs_url = "/ocs/v1.php/cloud/apps"
params = {"filter": "enabled"}
response = await client.get(
ocs_url,
params=params,
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
if response.status_code != 200:
raise ValueError(f"OCS API returned status {response.status_code}")
data = response.json()
apps = data.get("ocs", {}).get("data", {}).get("apps", [])
return JSONResponse({"apps": apps})
except Exception as e:
logger.error(f"Error getting installed apps for user {user_id}: {e}")
return JSONResponse(
{"error": "Internal error", "message": str(e)},
status_code=500,
)
async def list_webhooks(request: Request) -> JSONResponse:
"""GET /api/v1/webhooks - List all registered webhooks.
Returns list of webhook registrations for the authenticated user.
Requires OAuth bearer token for authentication.
"""
try:
# Validate OAuth token and extract user
user_id, validated = await validate_token_and_get_user(request)
except Exception as e:
logger.warning(f"Unauthorized access to /api/v1/webhooks: {e}")
return JSONResponse(
{"error": "Unauthorized", "message": str(e)},
status_code=401,
)
try:
import httpx
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Get Bearer token from request
token = extract_bearer_token(request)
if not token:
raise ValueError("Missing Authorization header")
# Get Nextcloud host from OAuth context
oauth_ctx = request.app.state.oauth_context
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise ValueError("Nextcloud host not configured")
# Create authenticated HTTP client
async with httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
) as client:
# Use WebhooksClient to list webhooks
webhooks_client = WebhooksClient(client, user_id)
webhooks = await webhooks_client.list_webhooks()
return JSONResponse({"webhooks": webhooks})
except Exception as e:
logger.error(f"Error listing webhooks for user {user_id}: {e}")
return JSONResponse(
{"error": "Internal error", "message": str(e)},
status_code=500,
)
async def create_webhook(request: Request) -> JSONResponse:
"""POST /api/v1/webhooks - Create a new webhook registration.
Request body:
{
"event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
"uri": "http://mcp:8000/webhooks/nextcloud",
"eventFilter": {"event.node.path": "/^\\/.*\\/files\\/Notes\\//"}
}
Returns the created webhook data including the webhook ID.
Requires OAuth bearer token for authentication.
"""
try:
# Validate OAuth token and extract user
user_id, validated = await validate_token_and_get_user(request)
except Exception as e:
logger.warning(f"Unauthorized access to /api/v1/webhooks: {e}")
return JSONResponse(
{"error": "Unauthorized", "message": str(e)},
status_code=401,
)
try:
import httpx
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Parse request body
body = await request.json()
event = body.get("event")
uri = body.get("uri")
# Accept both camelCase (eventFilter) and snake_case (event_filter)
event_filter = body.get("eventFilter") or body.get("event_filter")
if not event or not uri:
return JSONResponse(
{
"error": "Bad request",
"message": "Missing required fields: event, uri",
},
status_code=400,
)
# Get Bearer token from request
token = extract_bearer_token(request)
if not token:
raise ValueError("Missing Authorization header")
# Get Nextcloud host from OAuth context
oauth_ctx = request.app.state.oauth_context
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise ValueError("Nextcloud host not configured")
# Create authenticated HTTP client
async with httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
) as client:
# Use WebhooksClient to create webhook
webhooks_client = WebhooksClient(client, user_id)
webhook_data = await webhooks_client.create_webhook(
event=event, uri=uri, event_filter=event_filter
)
return JSONResponse({"webhook": webhook_data})
except Exception as e:
logger.error(f"Error creating webhook for user {user_id}: {e}")
return JSONResponse(
{"error": "Internal error", "message": str(e)},
status_code=500,
)
async def delete_webhook(request: Request) -> JSONResponse:
"""DELETE /api/v1/webhooks/{webhook_id} - Delete a webhook registration.
Returns success/failure status.
Requires OAuth bearer token for authentication.
"""
try:
# Validate OAuth token and extract user
user_id, validated = await validate_token_and_get_user(request)
except Exception as e:
logger.warning(f"Unauthorized access to /api/v1/webhooks: {e}")
return JSONResponse(
{"error": "Unauthorized", "message": str(e)},
status_code=401,
)
try:
import httpx
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Get webhook_id from path parameter
webhook_id = request.path_params.get("webhook_id")
if not webhook_id:
return JSONResponse(
{"error": "Bad request", "message": "Missing webhook_id"},
status_code=400,
)
try:
webhook_id = int(webhook_id)
except ValueError:
return JSONResponse(
{"error": "Bad request", "message": "Invalid webhook_id"},
status_code=400,
)
# Get Bearer token from request
token = extract_bearer_token(request)
if not token:
raise ValueError("Missing Authorization header")
# Get Nextcloud host from OAuth context
oauth_ctx = request.app.state.oauth_context
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise ValueError("Nextcloud host not configured")
# Create authenticated HTTP client
async with httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
) as client:
# Use WebhooksClient to delete webhook
webhooks_client = WebhooksClient(client, user_id)
await webhooks_client.delete_webhook(webhook_id=webhook_id)
return JSONResponse({"success": True, "message": "Webhook deleted"})
except Exception as e:
logger.error(f"Error deleting webhook for user {user_id}: {e}")
return JSONResponse(
{"error": "Internal error", "message": str(e)},
status_code=500,
)
async def unified_search(request: Request) -> JSONResponse:
"""POST /api/v1/search - Search endpoint for Nextcloud Unified Search.
@@ -513,29 +782,12 @@ async def unified_search(request: Request) -> JSONResponse:
limit=search_limit,
)
# Deduplicate results by document (multiple chunks may come from same doc)
# Keep highest-scoring chunk per document
doc_map: dict[str, Any] = {} # key: "doc_type:id" -> best result
for result in all_results:
# Build document key from type and ID
doc_id = result.id
if result.metadata:
# Use note_id if present (for notes), otherwise use result.id
doc_id = result.metadata.get("note_id", result.id)
doc_key = f"{result.doc_type}:{doc_id}"
# Sort results by score (no deduplication - show all chunks)
sorted_results = sorted(all_results, key=lambda r: r.score, reverse=True)
# Keep only the highest-scoring chunk per document
if doc_key not in doc_map or result.score > doc_map[doc_key].score:
doc_map[doc_key] = result
# Convert back to list and sort by score
deduplicated_results = sorted(
doc_map.values(), key=lambda r: r.score, reverse=True
)
# Calculate total and apply pagination (on deduplicated results)
total_found = len(deduplicated_results)
paginated_results = deduplicated_results[offset : offset + limit]
# Calculate total and apply pagination
total_found = len(sorted_results)
paginated_results = sorted_results[offset : offset + limit]
# Format results for Unified Search
formatted_results = []
@@ -576,6 +828,16 @@ async def unified_search(request: Request) -> JSONResponse:
if "event_uid" in result.metadata:
result_data["event_uid"] = result.metadata["event_uid"]
# Add PDF page metadata
if result.page_number is not None:
result_data["page_number"] = result.page_number
if result.page_count is not None:
result_data["page_count"] = result.page_count
# Add chunk metadata (always present, defaults to 0 and 1)
result_data["chunk_index"] = result.chunk_index
result_data["total_chunks"] = result.total_chunks
formatted_results.append(result_data)
response_data: dict[str, Any] = {
@@ -659,6 +921,8 @@ async def vector_search(request: Request) -> JSONResponse:
body = await request.json()
query = body.get("query", "")
algorithm = body.get("algorithm", "hybrid")
fusion = body.get("fusion", "rrf")
score_threshold = body.get("score_threshold", 0.0)
limit = min(body.get("limit", 10), 50) # Enforce max limit
include_pca = body.get("include_pca", True)
doc_types = body.get("doc_types") # Optional list of document types
@@ -674,6 +938,11 @@ async def vector_search(request: Request) -> JSONResponse:
if algorithm not in valid_algorithms:
algorithm = "hybrid"
# Validate fusion method
valid_fusions = {"rrf", "dbsf"}
if fusion not in valid_fusions:
fusion = "rrf"
# Execute search using the appropriate algorithm
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
@@ -682,11 +951,13 @@ async def vector_search(request: Request) -> JSONResponse:
# Select search algorithm
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=0.0)
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
else:
# Both "hybrid" and "bm25" use the BM25HybridSearchAlgorithm
# which combines dense semantic and sparse BM25 vectors
search_algo = BM25HybridSearchAlgorithm(score_threshold=0.0, fusion="rrf")
search_algo = BM25HybridSearchAlgorithm(
score_threshold=score_threshold, fusion=fusion
)
# Execute search for each doc_type if specified, otherwise search all
all_results = []
@@ -715,16 +986,27 @@ async def vector_search(request: Request) -> JSONResponse:
# Format results for PHP client
formatted_results = []
for result in all_results:
formatted_results.append(
{
"id": result.id,
"doc_type": result.doc_type,
"title": result.title,
"excerpt": result.excerpt[:200] if result.excerpt else "",
"score": result.score,
"metadata": result.metadata,
}
)
formatted_result = {
"id": result.id,
"doc_type": result.doc_type,
"title": result.title,
"excerpt": result.excerpt[:200] if result.excerpt else "",
"score": result.score,
"metadata": result.metadata,
# Chunk information for context display
"chunk_index": result.chunk_index,
"total_chunks": result.total_chunks,
}
# Include optional fields if present
if result.chunk_start_offset is not None:
formatted_result["chunk_start_offset"] = result.chunk_start_offset
if result.chunk_end_offset is not None:
formatted_result["chunk_end_offset"] = result.chunk_end_offset
if result.page_number is not None:
formatted_result["page_number"] = result.page_number
if result.page_count is not None:
formatted_result["page_count"] = result.page_count
formatted_results.append(formatted_result)
response_data: dict[str, Any] = {
"results": formatted_results,
@@ -772,3 +1054,175 @@ async def vector_search(request: Request) -> JSONResponse:
{"error": "Internal error", "message": str(e)},
status_code=500,
)
async def get_chunk_context(request: Request) -> JSONResponse:
"""GET /api/v1/chunk-context - Fetch chunk text with context.
Retrieves the matched chunk along with surrounding text and metadata.
Used by clients to display chunk context and highlighted PDFs.
Query parameters:
doc_type: Document type (e.g., "note")
doc_id: Document ID
start: Chunk start offset (character position)
end: Chunk end offset (character position)
context: Characters of context before/after (default: 500)
Requires OAuth bearer token for authentication.
"""
try:
# Validate OAuth token and extract user
user_id, validated = await validate_token_and_get_user(request)
except Exception as e:
logger.warning(f"Unauthorized access to /api/v1/chunk-context: {e}")
return JSONResponse(
{"error": "Unauthorized", "message": str(e)},
status_code=401,
)
try:
# Get query parameters
doc_type = request.query_params.get("doc_type")
doc_id = request.query_params.get("doc_id")
start_str = request.query_params.get("start")
end_str = request.query_params.get("end")
context_chars = int(request.query_params.get("context", "500"))
# Validate required parameters
if not all([doc_type, doc_id, start_str, end_str]):
return JSONResponse(
{
"success": False,
"error": "Missing required parameters: doc_type, doc_id, start, end",
},
status_code=400,
)
# Type narrowing: we already checked these are not None above
assert start_str is not None
assert end_str is not None
assert doc_id is not None
assert doc_type is not None
start = int(start_str)
end = int(end_str)
# Convert doc_id to int if possible (most IDs are int)
doc_id_val: str | int = int(doc_id) if doc_id.isdigit() else doc_id
# Get bearer token for client initialization
token = extract_bearer_token(request)
if not token:
raise ValueError("Missing token")
# Get Nextcloud host from OAuth context
oauth_ctx = request.app.state.oauth_context
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise ValueError("Nextcloud host not configured")
# Initialize authenticated Nextcloud client
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.search.context import get_chunk_with_context
async with NextcloudClient.from_token(
base_url=nextcloud_host, token=token, username=user_id
) as nc_client:
chunk_context = await get_chunk_with_context(
nc_client=nc_client,
user_id=user_id,
doc_id=doc_id_val,
doc_type=doc_type,
chunk_start=start,
chunk_end=end,
context_chars=context_chars,
)
if chunk_context is None:
return JSONResponse(
{
"success": False,
"error": f"Failed to fetch chunk context for {doc_type} {doc_id}",
},
status_code=404,
)
# For PDF files, also fetch the highlighted page image from Qdrant if available
# This is useful for clients that want to show a pre-rendered image
highlighted_page_image = None
page_number = chunk_context.page_number
if doc_type == "file":
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.placeholder import (
get_placeholder_filter,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Query for this specific chunk's highlighted image
points_response = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
get_placeholder_filter(),
FieldCondition(
key="doc_id", match=MatchValue(value=doc_id_val)
),
FieldCondition(
key="user_id", match=MatchValue(value=user_id)
),
FieldCondition(
key="chunk_start_offset", match=MatchValue(value=start)
),
FieldCondition(
key="chunk_end_offset", match=MatchValue(value=end)
),
]
),
limit=1,
with_vectors=False,
with_payload=["highlighted_page_image", "page_number"],
)
if points_response[0]:
payload = points_response[0][0].payload
if payload:
highlighted_page_image = payload.get("highlighted_page_image")
# Trust Qdrant page number if available (might be more accurate than context expansion logic)
if payload.get("page_number") is not None:
page_number = payload.get("page_number")
except Exception as e:
logger.warning(f"Failed to fetch highlighted image: {e}")
# Build response
response_data = {
"success": True,
"chunk_text": chunk_context.chunk_text,
"before_context": chunk_context.before_context,
"after_context": chunk_context.after_context,
"has_more_before": chunk_context.has_before_truncation,
"has_more_after": chunk_context.has_after_truncation,
"page_number": page_number,
"chunk_index": chunk_context.chunk_index,
"total_chunks": chunk_context.total_chunks,
}
if highlighted_page_image:
response_data["highlighted_page_image"] = highlighted_page_image
return JSONResponse(response_data)
except Exception as e:
logger.error(f"Error getting chunk context: {e}", exc_info=True)
return JSONResponse(
{"error": "Internal error", "message": str(e)},
status_code=500,
)
+3
View File
@@ -38,6 +38,9 @@ class SemanticSearchResult(BaseModel):
page_number: Optional[int] = Field(
default=None, description="Page number for PDF documents"
)
page_count: Optional[int] = Field(
default=None, description="Total number of pages in PDF document"
)
# Context expansion fields (optional, populated when include_context=True)
has_context_expansion: bool = Field(
default=False, description="Whether context expansion was performed"
+3 -1
View File
@@ -111,7 +111,7 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
doc_types: set[str] = {
str(point.payload.get("doc_type"))
for point in scroll_results
if point.payload.get("doc_type")
if point.payload and point.payload.get("doc_type")
}
logger.debug(f"Found indexed document types for user {user_id}: {doc_types}")
@@ -138,6 +138,7 @@ class SearchResult:
chunk_start_offset: Character position where chunk starts (None if not available)
chunk_end_offset: Character position where chunk ends (None if not available)
page_number: Page number for PDF documents (None for other doc types)
page_count: Total number of pages in PDF document (None for other doc types)
chunk_index: Zero-based index of this chunk in the document
total_chunks: Total number of chunks in the document
point_id: Qdrant point ID for batch vector retrieval (None if not from Qdrant)
@@ -152,6 +153,7 @@ class SearchResult:
chunk_start_offset: int | None = None
chunk_end_offset: int | None = None
page_number: int | None = None
page_count: int | None = None
chunk_index: int = 0
total_chunks: int = 1
point_id: str | None = None
@@ -226,6 +226,10 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
"search_method": f"bm25_hybrid_{self.fusion_name}",
}
# Add file-specific metadata for PDF viewer
if doc_type == "file" and (path := result.payload.get("file_path")):
metadata["path"] = path
# Add deck_card-specific metadata for frontend URL construction
if doc_type == "deck_card":
if board_id := result.payload.get("board_id"):
@@ -243,6 +247,7 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
page_count=result.payload.get("page_count"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
+5
View File
@@ -157,6 +157,10 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
"total_chunks": result.payload.get("total_chunks"),
}
# Add file-specific metadata for PDF viewer
if doc_type == "file" and (path := result.payload.get("file_path")):
metadata["path"] = path
# Add deck_card-specific metadata for frontend URL construction
if doc_type == "deck_card":
if board_id := result.payload.get("board_id"):
@@ -174,6 +178,7 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
page_count=result.payload.get("page_count"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval