feat(astrolabe): add 3D PCA visualization for semantic search
- Add Plotly.js 3D scatter plot showing search results in PCA space - Create shared visualization.py module to avoid code duplication - Pass include_pca parameter through API chain to enable coordinates - Fix OAuth redirects to use /settings/user/astroglobe The visualization shows document embeddings projected to 3D via PCA, with the query point highlighted in red. Uses Viridis colorscale for score visualization, matching the existing vector-viz page. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
"""Management API for Nextcloud MCP Server.
|
||||
|
||||
Provides REST endpoints for the Nextcloud PHP app to query server status,
|
||||
user sessions, and vector sync metrics. All endpoints use OAuth bearer token
|
||||
authentication via the UnifiedTokenVerifier.
|
||||
"""
|
||||
@@ -0,0 +1,549 @@
|
||||
"""Management API endpoints for Nextcloud PHP app integration.
|
||||
|
||||
ADR-018: Provides REST API endpoints for the Nextcloud PHP app to query:
|
||||
- Server status and version
|
||||
- User session information and background access status
|
||||
- Vector sync metrics
|
||||
- Vector search for visualization
|
||||
|
||||
All endpoints use OAuth bearer token authentication via UnifiedTokenVerifier.
|
||||
The PHP app obtains tokens through PKCE flow and uses them to access these endpoints.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from importlib.metadata import version
|
||||
from typing import Any
|
||||
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Get package version from metadata
|
||||
__version__ = version("nextcloud-mcp-server")
|
||||
|
||||
# Track server start time for uptime calculation
|
||||
_server_start_time = time.time()
|
||||
|
||||
|
||||
def extract_bearer_token(request: Request) -> str | None:
|
||||
"""Extract OAuth bearer token from Authorization header.
|
||||
|
||||
Args:
|
||||
request: Starlette request
|
||||
|
||||
Returns:
|
||||
Token string or None if no valid Authorization header
|
||||
"""
|
||||
auth_header = request.headers.get("Authorization")
|
||||
if not auth_header:
|
||||
return None
|
||||
|
||||
# Parse "Bearer <token>"
|
||||
parts = auth_header.split()
|
||||
if len(parts) != 2 or parts[0].lower() != "bearer":
|
||||
return None
|
||||
|
||||
return parts[1]
|
||||
|
||||
|
||||
async def validate_token_and_get_user(
|
||||
request: Request,
|
||||
) -> tuple[str, dict[str, Any]]:
|
||||
"""Validate OAuth bearer token and extract user ID.
|
||||
|
||||
Args:
|
||||
request: Starlette request with Authorization header
|
||||
|
||||
Returns:
|
||||
Tuple of (user_id, validated_token_data)
|
||||
|
||||
Raises:
|
||||
Exception: If token is invalid or missing
|
||||
"""
|
||||
token = extract_bearer_token(request)
|
||||
if not token:
|
||||
raise ValueError("Missing Authorization header")
|
||||
|
||||
# Get token verifier from app state
|
||||
# Note: This is set in app.py starlette_lifespan for OAuth mode
|
||||
token_verifier = request.app.state.oauth_context["token_verifier"]
|
||||
|
||||
# Validate token (handles both JWT and opaque tokens)
|
||||
# verify_token returns AccessToken object or None
|
||||
access_token = await token_verifier.verify_token(token)
|
||||
|
||||
if not access_token:
|
||||
raise ValueError("Token validation failed")
|
||||
|
||||
# Extract user ID from AccessToken.resource field (set during verification)
|
||||
user_id = access_token.resource
|
||||
if not user_id:
|
||||
raise ValueError("Token missing user identifier")
|
||||
|
||||
# Return user_id and a dict with token info for compatibility
|
||||
validated = {
|
||||
"sub": user_id,
|
||||
"client_id": access_token.client_id,
|
||||
"scopes": access_token.scopes,
|
||||
"expires_at": access_token.expires_at,
|
||||
}
|
||||
|
||||
return user_id, validated
|
||||
|
||||
|
||||
async def get_server_status(request: Request) -> JSONResponse:
|
||||
"""GET /api/v1/status - Server status and version.
|
||||
|
||||
Returns basic server information including version, auth mode,
|
||||
vector sync status, and uptime.
|
||||
|
||||
Public endpoint - no authentication required.
|
||||
"""
|
||||
# Public endpoint - no authentication required
|
||||
|
||||
# Get configuration
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
# Calculate uptime
|
||||
uptime_seconds = int(time.time() - _server_start_time)
|
||||
|
||||
# Determine auth mode
|
||||
nextcloud_username = os.getenv("NEXTCLOUD_USERNAME")
|
||||
nextcloud_password = os.getenv("NEXTCLOUD_PASSWORD")
|
||||
|
||||
if nextcloud_username and nextcloud_password:
|
||||
auth_mode = "basic"
|
||||
else:
|
||||
auth_mode = "oauth"
|
||||
|
||||
response_data = {
|
||||
"version": __version__,
|
||||
"auth_mode": auth_mode,
|
||||
"vector_sync_enabled": settings.vector_sync_enabled,
|
||||
"uptime_seconds": uptime_seconds,
|
||||
"management_api_version": "1.0",
|
||||
}
|
||||
|
||||
# Include OIDC configuration if in OAuth mode
|
||||
if auth_mode == "oauth":
|
||||
# Provide IdP discovery information for NC PHP app
|
||||
oidc_config = {}
|
||||
|
||||
if settings.oidc_discovery_url:
|
||||
oidc_config["discovery_url"] = settings.oidc_discovery_url
|
||||
|
||||
if settings.oidc_issuer:
|
||||
oidc_config["issuer"] = settings.oidc_issuer
|
||||
|
||||
if oidc_config:
|
||||
response_data["oidc"] = oidc_config
|
||||
|
||||
return JSONResponse(response_data)
|
||||
|
||||
|
||||
async def get_vector_sync_status(request: Request) -> JSONResponse:
|
||||
"""GET /api/v1/vector-sync/status - Vector sync metrics.
|
||||
|
||||
Returns real-time indexing status and metrics.
|
||||
|
||||
Requires: VECTOR_SYNC_ENABLED=true
|
||||
|
||||
Public endpoint - no authentication required.
|
||||
"""
|
||||
# Public endpoint - no authentication required
|
||||
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
if not settings.vector_sync_enabled:
|
||||
return JSONResponse(
|
||||
{"error": "Vector sync is disabled on this server"},
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
try:
|
||||
# Get document receive stream from app state (set by starlette_lifespan in app.py)
|
||||
document_receive_stream = getattr(
|
||||
request.app.state, "document_receive_stream", None
|
||||
)
|
||||
|
||||
if document_receive_stream is None:
|
||||
logger.debug("document_receive_stream not available in app state")
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": "unknown",
|
||||
"indexed_documents": 0,
|
||||
"pending_documents": 0,
|
||||
"message": "Vector sync stream not initialized",
|
||||
}
|
||||
)
|
||||
|
||||
# Get pending count from stream statistics
|
||||
stream_stats = document_receive_stream.statistics()
|
||||
pending_count = stream_stats.current_buffer_used
|
||||
|
||||
# Get Qdrant client and query indexed count
|
||||
indexed_count = 0
|
||||
try:
|
||||
from qdrant_client.models import Filter
|
||||
|
||||
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
||||
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
||||
|
||||
qdrant_client = await get_qdrant_client()
|
||||
|
||||
# Count documents in collection, excluding placeholders
|
||||
count_result = await qdrant_client.count(
|
||||
collection_name=settings.get_collection_name(),
|
||||
count_filter=Filter(must=[get_placeholder_filter()]),
|
||||
)
|
||||
indexed_count = count_result.count
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to query Qdrant for indexed count: {e}")
|
||||
# Continue with indexed_count = 0
|
||||
|
||||
# Determine status
|
||||
status = "syncing" if pending_count > 0 else "idle"
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"status": status,
|
||||
"indexed_documents": indexed_count,
|
||||
"pending_documents": pending_count,
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting vector sync status: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Internal error", "message": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def get_user_session(request: Request) -> JSONResponse:
|
||||
"""GET /api/v1/users/{user_id}/session - User session details.
|
||||
|
||||
Returns information about the user's MCP session including:
|
||||
- Background access status (offline_access)
|
||||
- IdP profile information
|
||||
|
||||
Requires OAuth bearer token. The user_id in the path must match
|
||||
the user_id in the token.
|
||||
"""
|
||||
try:
|
||||
# Validate OAuth token and extract user
|
||||
token_user_id, validated = await validate_token_and_get_user(request)
|
||||
except Exception as e:
|
||||
logger.warning(f"Unauthorized access to /api/v1/users/{{user_id}}/session: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Unauthorized", "message": str(e)},
|
||||
status_code=401,
|
||||
)
|
||||
|
||||
# Get user_id from path
|
||||
path_user_id = request.path_params.get("user_id")
|
||||
|
||||
# Verify token user matches requested user
|
||||
if token_user_id != path_user_id:
|
||||
logger.warning(
|
||||
f"User {token_user_id} attempted to access session for {path_user_id}"
|
||||
)
|
||||
return JSONResponse(
|
||||
{
|
||||
"error": "Forbidden",
|
||||
"message": "Cannot access another user's session",
|
||||
},
|
||||
status_code=403,
|
||||
)
|
||||
|
||||
# Check if offline access is enabled
|
||||
enable_offline_access = os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() in (
|
||||
"true",
|
||||
"1",
|
||||
"yes",
|
||||
)
|
||||
|
||||
if not enable_offline_access:
|
||||
# Offline access disabled - return minimal session info
|
||||
return JSONResponse(
|
||||
{
|
||||
"session_id": token_user_id,
|
||||
"background_access_granted": False,
|
||||
}
|
||||
)
|
||||
|
||||
# Get refresh token storage from app state
|
||||
storage = request.app.state.oauth_context.get("storage")
|
||||
if not storage:
|
||||
logger.error("Refresh token storage not available in app state")
|
||||
return JSONResponse(
|
||||
{
|
||||
"session_id": token_user_id,
|
||||
"background_access_granted": False,
|
||||
"error": "Storage not configured",
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
# Check if user has refresh token stored
|
||||
refresh_token_data = await storage.get_refresh_token(token_user_id)
|
||||
|
||||
if not refresh_token_data:
|
||||
# No refresh token - user hasn't provisioned background access
|
||||
return JSONResponse(
|
||||
{
|
||||
"session_id": token_user_id,
|
||||
"background_access_granted": False,
|
||||
}
|
||||
)
|
||||
|
||||
# User has background access - get profile info
|
||||
profile = await storage.get_user_profile(token_user_id)
|
||||
|
||||
response_data = {
|
||||
"session_id": token_user_id,
|
||||
"background_access_granted": True,
|
||||
"background_access_details": {
|
||||
"granted_at": refresh_token_data.get("created_at"),
|
||||
"scopes": refresh_token_data.get("scope", "").split(),
|
||||
},
|
||||
}
|
||||
|
||||
if profile:
|
||||
response_data["idp_profile"] = profile
|
||||
|
||||
return JSONResponse(response_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting user session for {token_user_id}: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Internal error", "message": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def revoke_user_access(request: Request) -> JSONResponse:
|
||||
"""POST /api/v1/users/{user_id}/revoke - Revoke user's background access.
|
||||
|
||||
Deletes the user's stored refresh token, removing their offline access.
|
||||
|
||||
Requires OAuth bearer token. The user_id in the path must match
|
||||
the user_id in the token.
|
||||
"""
|
||||
try:
|
||||
# Validate OAuth token and extract user
|
||||
token_user_id, validated = await validate_token_and_get_user(request)
|
||||
except Exception as e:
|
||||
logger.warning(f"Unauthorized access to /api/v1/users/{{user_id}}/revoke: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Unauthorized", "message": str(e)},
|
||||
status_code=401,
|
||||
)
|
||||
|
||||
# Get user_id from path
|
||||
path_user_id = request.path_params.get("user_id")
|
||||
|
||||
# Verify token user matches requested user
|
||||
if token_user_id != path_user_id:
|
||||
logger.warning(
|
||||
f"User {token_user_id} attempted to revoke access for {path_user_id}"
|
||||
)
|
||||
return JSONResponse(
|
||||
{
|
||||
"error": "Forbidden",
|
||||
"message": "Cannot revoke another user's access",
|
||||
},
|
||||
status_code=403,
|
||||
)
|
||||
|
||||
# Get refresh token storage from app state
|
||||
storage = request.app.state.oauth_context.get("storage")
|
||||
if not storage:
|
||||
logger.error("Refresh token storage not available in app state")
|
||||
return JSONResponse(
|
||||
{"error": "Storage not configured"},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
try:
|
||||
# Delete refresh token
|
||||
await storage.delete_refresh_token(token_user_id)
|
||||
logger.info(f"Revoked background access for user: {token_user_id}")
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"success": True,
|
||||
"message": f"Background access revoked for {token_user_id}",
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error revoking access for {token_user_id}: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Internal error", "message": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def vector_search(request: Request) -> JSONResponse:
|
||||
"""POST /api/v1/vector-viz/search - Vector search for visualization.
|
||||
|
||||
Executes semantic search and returns results with optional PCA coordinates
|
||||
for 2D visualization.
|
||||
|
||||
Request body:
|
||||
{
|
||||
"query": "search query",
|
||||
"algorithm": "semantic|bm25|hybrid", // default: hybrid
|
||||
"limit": 10, // max: 50
|
||||
"include_pca": true, // whether to include 2D coordinates
|
||||
"doc_types": ["note", "file"] // optional filter by document types
|
||||
}
|
||||
|
||||
Requires OAuth bearer token for user filtering.
|
||||
"""
|
||||
from nextcloud_mcp_server.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
if not settings.vector_sync_enabled:
|
||||
return JSONResponse(
|
||||
{"error": "Vector sync is disabled on this server"},
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
# Validate OAuth token and extract user
|
||||
try:
|
||||
user_id, _validated = await validate_token_and_get_user(request)
|
||||
except Exception as e:
|
||||
logger.warning(f"Unauthorized access to /api/v1/vector-viz/search: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Unauthorized", "message": str(e)},
|
||||
status_code=401,
|
||||
)
|
||||
|
||||
try:
|
||||
# Parse request body
|
||||
body = await request.json()
|
||||
query = body.get("query", "")
|
||||
algorithm = body.get("algorithm", "hybrid")
|
||||
limit = min(body.get("limit", 10), 50) # Enforce max limit
|
||||
include_pca = body.get("include_pca", True)
|
||||
doc_types = body.get("doc_types") # Optional list of document types
|
||||
|
||||
if not query:
|
||||
return JSONResponse(
|
||||
{"error": "Missing required parameter: query"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
# Validate algorithm
|
||||
valid_algorithms = {"semantic", "bm25", "hybrid"}
|
||||
if algorithm not in valid_algorithms:
|
||||
algorithm = "hybrid"
|
||||
|
||||
# Execute search using the appropriate algorithm
|
||||
from nextcloud_mcp_server.search import (
|
||||
BM25HybridSearchAlgorithm,
|
||||
SemanticSearchAlgorithm,
|
||||
)
|
||||
|
||||
# Select search algorithm
|
||||
if algorithm == "semantic":
|
||||
search_algo = SemanticSearchAlgorithm(score_threshold=0.0)
|
||||
else:
|
||||
# Both "hybrid" and "bm25" use the BM25HybridSearchAlgorithm
|
||||
# which combines dense semantic and sparse BM25 vectors
|
||||
search_algo = BM25HybridSearchAlgorithm(score_threshold=0.0, fusion="rrf")
|
||||
|
||||
# Execute search for each doc_type if specified, otherwise search all
|
||||
all_results = []
|
||||
if doc_types and isinstance(doc_types, list):
|
||||
# Search each doc_type separately and merge results
|
||||
for doc_type in doc_types:
|
||||
if doc_type: # Skip empty strings
|
||||
results = await search_algo.search(
|
||||
query=query,
|
||||
user_id=user_id,
|
||||
limit=limit,
|
||||
doc_type=doc_type,
|
||||
)
|
||||
all_results.extend(results)
|
||||
# Sort merged results by score and limit
|
||||
all_results.sort(key=lambda r: r.score, reverse=True)
|
||||
all_results = all_results[:limit]
|
||||
else:
|
||||
# Search all document types
|
||||
all_results = await search_algo.search(
|
||||
query=query,
|
||||
user_id=user_id,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
# Format results for PHP client
|
||||
formatted_results = []
|
||||
for result in all_results:
|
||||
formatted_results.append(
|
||||
{
|
||||
"id": result.id,
|
||||
"doc_type": result.doc_type,
|
||||
"title": result.title,
|
||||
"excerpt": result.excerpt[:200] if result.excerpt else "",
|
||||
"score": result.score,
|
||||
"metadata": result.metadata,
|
||||
}
|
||||
)
|
||||
|
||||
response_data: dict[str, Any] = {
|
||||
"results": formatted_results,
|
||||
"algorithm_used": algorithm,
|
||||
"total_documents": len(formatted_results),
|
||||
}
|
||||
|
||||
# Compute PCA coordinates for visualization using shared function
|
||||
if include_pca and len(all_results) >= 2:
|
||||
try:
|
||||
from nextcloud_mcp_server.vector.visualization import (
|
||||
compute_pca_coordinates,
|
||||
)
|
||||
|
||||
# Get query embedding from search algorithm or generate it
|
||||
if search_algo.query_embedding is not None:
|
||||
query_embedding = search_algo.query_embedding
|
||||
else:
|
||||
from nextcloud_mcp_server.embedding.service import (
|
||||
get_embedding_service,
|
||||
)
|
||||
|
||||
embedding_service = get_embedding_service()
|
||||
query_embedding = await embedding_service.embed(query)
|
||||
|
||||
pca_data = await compute_pca_coordinates(all_results, query_embedding)
|
||||
response_data["coordinates_3d"] = pca_data["coordinates_3d"]
|
||||
response_data["query_coords"] = pca_data["query_coords"]
|
||||
if "pca_variance" in pca_data:
|
||||
response_data["pca_variance"] = pca_data["pca_variance"]
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to compute PCA coordinates: {e}")
|
||||
response_data["coordinates_3d"] = []
|
||||
response_data["query_coords"] = []
|
||||
elif include_pca:
|
||||
# Not enough results for PCA
|
||||
response_data["coordinates_3d"] = []
|
||||
response_data["query_coords"] = []
|
||||
|
||||
return JSONResponse(response_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing vector search: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Internal error", "message": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
Reference in New Issue
Block a user