refactor: enforce PLC0415 (import-outside-top-level) for source code

Enable ruff PLC0415 rule for all source files (tests excluded via
per-file-ignores). Move 136 inline imports to top-level across 33 files.
8 imports suppressed with noqa for legitimate reasons: circular
dependencies (client/__init__.py, context.py), optional dependency
guards (app.py document processors, auth/userinfo_routes.py), and
post-env-setup imports (smithery_main.py).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2026-02-20 08:04:50 +01:00
parent 81efa6e263
commit a11ae9c027
32 changed files with 157 additions and 348 deletions
+6 -13
View File
@@ -20,9 +20,15 @@ import time
from importlib.metadata import version
from typing import Any
from qdrant_client.models import Filter
from starlette.requests import Request
from starlette.responses import JSONResponse
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.config_validators import AuthMode, detect_auth_mode
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -196,16 +202,12 @@ async def get_server_status(request: Request) -> JSONResponse:
# Public endpoint - no authentication required
# Get configuration
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
# Calculate uptime
uptime_seconds = int(time.time() - _server_start_time)
# Determine auth mode using proper mode detection
from nextcloud_mcp_server.config_validators import AuthMode, detect_auth_mode
mode = detect_auth_mode(settings)
# Map deployment mode to auth_mode for API response
@@ -266,8 +268,6 @@ async def get_vector_sync_status(request: Request) -> JSONResponse:
"""
# Public endpoint - no authentication required
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.vector_sync_enabled:
return JSONResponse(
@@ -299,11 +299,6 @@ async def get_vector_sync_status(request: Request) -> JSONResponse:
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from qdrant_client.models import Filter
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
# Count documents in collection, excluding placeholders
@@ -375,8 +370,6 @@ async def get_user_session(request: Request) -> JSONResponse:
# Check if offline access is enabled
# Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new)
# and ENABLE_OFFLINE_ACCESS (deprecated) environment variables
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
enable_offline_access = settings.enable_offline_access
+5 -13
View File
@@ -15,18 +15,16 @@ import logging
import re
import time
from collections import defaultdict
from typing import TYPE_CHECKING
import httpx
from starlette.requests import Request
from starlette.responses import JSONResponse
from ..http import nextcloud_httpx_client
if TYPE_CHECKING:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.api.management import _sanitize_error_for_client
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.config import get_settings
from ..http import nextcloud_httpx_client
logger = logging.getLogger(__name__)
@@ -158,7 +156,7 @@ def _extract_basic_auth(
return username, password, None
async def _get_app_password_storage(request: Request) -> "RefreshTokenStorage":
async def _get_app_password_storage(request: Request) -> RefreshTokenStorage:
"""Get or initialize RefreshTokenStorage for app password operations.
Checks app.state.storage first, then falls back to creating from environment.
@@ -170,8 +168,6 @@ async def _get_app_password_storage(request: Request) -> "RefreshTokenStorage":
Returns:
Initialized RefreshTokenStorage instance
"""
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = getattr(request.app.state, "storage", None)
if not storage:
@@ -202,8 +198,6 @@ async def provision_app_password(request: Request) -> JSONResponse:
- Only the user who owns the password can provision it
- Rate limited to prevent brute-force attacks
"""
from nextcloud_mcp_server.config import get_settings
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
@@ -364,8 +358,6 @@ async def delete_app_password(request: Request) -> JSONResponse:
Requires BasicAuth with the user's credentials.
"""
from nextcloud_mcp_server.config import get_settings
# Get user_id from path
path_user_id = request.path_params.get("user_id")
if not path_user_id:
+13 -50
View File
@@ -11,13 +11,10 @@ All endpoints require OAuth bearer token authentication via UnifiedTokenVerifier
import base64
import logging
from typing import TYPE_CHECKING, Any
from typing import Any
import pymupdf
if TYPE_CHECKING:
pass
from qdrant_client.models import FieldCondition, Filter, MatchValue
from starlette.requests import Request
from starlette.responses import JSONResponse
@@ -29,6 +26,17 @@ from nextcloud_mcp_server.api.management import (
extract_bearer_token,
validate_token_and_get_user,
)
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding.service import get_embedding_service
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
)
from nextcloud_mcp_server.search.context import get_chunk_with_context
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.visualization import compute_pca_coordinates
logger = logging.getLogger(__name__)
@@ -68,8 +76,6 @@ async def unified_search(request: Request) -> JSONResponse:
Requires OAuth bearer token for user filtering.
"""
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.vector_sync_enabled:
return JSONResponse(
@@ -144,12 +150,6 @@ async def unified_search(request: Request) -> JSONResponse:
if fusion not in valid_fusions:
fusion = "rrf"
# Execute search using the appropriate algorithm
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
)
# Select search algorithm
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -254,17 +254,9 @@ async def unified_search(request: Request) -> JSONResponse:
# Optional PCA coordinates
if include_pca and len(paginated_results) >= 2:
try:
from nextcloud_mcp_server.vector.visualization import (
compute_pca_coordinates,
)
if search_algo.query_embedding is not None:
query_embedding = search_algo.query_embedding
else:
from nextcloud_mcp_server.embedding.service import (
get_embedding_service,
)
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
@@ -305,8 +297,6 @@ async def vector_search(request: Request) -> JSONResponse:
Requires OAuth bearer token for user filtering.
"""
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.vector_sync_enabled:
return JSONResponse(
@@ -354,12 +344,6 @@ async def vector_search(request: Request) -> JSONResponse:
if fusion not in valid_fusions:
fusion = "rrf"
# Execute search using the appropriate algorithm
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
)
# Select search algorithm
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -428,18 +412,10 @@ async def vector_search(request: Request) -> JSONResponse:
# Compute PCA coordinates for visualization using shared function
if include_pca and len(all_results) >= 2:
try:
from nextcloud_mcp_server.vector.visualization import (
compute_pca_coordinates,
)
# Get query embedding from search algorithm or generate it
if search_algo.query_embedding is not None:
query_embedding = search_algo.query_embedding
else:
from nextcloud_mcp_server.embedding.service import (
get_embedding_service,
)
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
@@ -549,9 +525,6 @@ async def get_chunk_context(request: Request) -> JSONResponse:
raise ValueError("Nextcloud host not configured")
# Initialize authenticated Nextcloud client
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.search.context import get_chunk_with_context
async with NextcloudClient.from_token(
base_url=nextcloud_host, token=token, username=user_id
) as nc_client:
@@ -581,14 +554,6 @@ async def get_chunk_context(request: Request) -> JSONResponse:
if doc_type == "file":
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.placeholder import (
get_placeholder_filter,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
@@ -735,8 +700,6 @@ async def get_pdf_preview(request: Request) -> JSONResponse:
raise ValueError("Nextcloud host not configured")
# Download PDF via WebDAV using user's token
from nextcloud_mcp_server.client import NextcloudClient
async with NextcloudClient.from_token(
base_url=nextcloud_host, token=token, username=user_id
) as nc_client:
+1 -6
View File
@@ -18,6 +18,7 @@ from nextcloud_mcp_server.api.management import (
extract_bearer_token,
validate_token_and_get_user,
)
from nextcloud_mcp_server.client.webhooks import WebhooksClient
from ..http import nextcloud_httpx_client
@@ -115,8 +116,6 @@ async def list_webhooks(request: Request) -> JSONResponse:
)
try:
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Get Bearer token from request
token = extract_bearer_token(request)
if not token:
@@ -180,8 +179,6 @@ async def create_webhook(request: Request) -> JSONResponse:
)
try:
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Parse request body
body = await request.json()
event = body.get("event")
@@ -256,8 +253,6 @@ async def delete_webhook(request: Request) -> JSONResponse:
)
try:
from nextcloud_mcp_server.client.webhooks import WebhooksClient
# Get webhook_id from path parameter
webhook_id = request.path_params.get("webhook_id")
if not webhook_id:
+60 -91
View File
@@ -10,22 +10,17 @@ from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, cast
from urllib.parse import urlparse
from typing import Optional, cast
from urllib.parse import parse_qs, urlparse
import anyio
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
if TYPE_CHECKING:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
import click
import httpx
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from starlette.middleware.authentication import AuthenticationMiddleware
@@ -36,6 +31,23 @@ from starlette.staticfiles import StaticFiles
from starlette.types import ASGIApp, Receive, Send
from starlette.types import Scope as StarletteScope
from nextcloud_mcp_server.api import (
create_webhook,
delete_app_password,
delete_webhook,
get_app_password_status,
get_chunk_context,
get_installed_apps,
get_pdf_preview,
get_server_status,
get_user_session,
get_vector_sync_status,
list_webhooks,
provision_app_password,
revoke_user_access,
unified_search,
vector_search,
)
from nextcloud_mcp_server.auth import (
InsufficientScopeError,
discover_all_scopes,
@@ -43,7 +55,38 @@ from nextcloud_mcp_server.auth import (
has_required_scopes,
is_jwt_token,
)
from nextcloud_mcp_server.auth.browser_oauth_routes import (
oauth_login,
oauth_login_callback,
oauth_logout,
)
from nextcloud_mcp_server.auth.client_registration import ensure_oauth_client
from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient
from nextcloud_mcp_server.auth.oauth_routes import (
oauth_authorize,
oauth_authorize_nextcloud,
oauth_callback,
oauth_callback_nextcloud,
)
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.auth.userinfo_routes import (
revoke_session,
user_info_html,
vector_sync_status_fragment,
)
from nextcloud_mcp_server.auth.viz_routes import (
chunk_context_endpoint,
vector_visualization_html,
vector_visualization_search,
)
from nextcloud_mcp_server.auth.webhook_routes import (
disable_webhook_preset,
enable_webhook_preset,
webhook_management_pane,
)
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
DeploymentMode,
@@ -82,6 +125,11 @@ from nextcloud_mcp_server.server import (
)
from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools
from nextcloud_mcp_server.vector import processor_task, scanner_task
from nextcloud_mcp_server.vector.oauth_sync import (
oauth_processor_task,
user_manager_task,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
HTTPXClientInstrumentor().instrument()
@@ -106,7 +154,7 @@ def initialize_document_processors():
if "unstructured" in config["processors"]:
unst_config = config["processors"]["unstructured"]
try:
from nextcloud_mcp_server.document_processors.unstructured import (
from nextcloud_mcp_server.document_processors.unstructured import ( # noqa: PLC0415
UnstructuredProcessor,
)
@@ -127,7 +175,7 @@ def initialize_document_processors():
if "tesseract" in config["processors"]:
tess_config = config["processors"]["tesseract"]
try:
from nextcloud_mcp_server.document_processors.tesseract import (
from nextcloud_mcp_server.document_processors.tesseract import ( # noqa: PLC0415
TesseractProcessor,
)
@@ -145,7 +193,7 @@ def initialize_document_processors():
if "pymupdf" in config["processors"]:
pymupdf_config = config["processors"]["pymupdf"]
try:
from nextcloud_mcp_server.document_processors.pymupdf import (
from nextcloud_mcp_server.document_processors.pymupdf import ( # noqa: PLC0415
PyMuPDFProcessor,
)
@@ -165,7 +213,7 @@ def initialize_document_processors():
if "custom" in config["processors"]:
custom_config = config["processors"]["custom"]
try:
from nextcloud_mcp_server.document_processors.custom_http import (
from nextcloud_mcp_server.document_processors.custom_http import ( # noqa: PLC0415
CustomHTTPProcessor,
)
@@ -431,8 +479,6 @@ class SmitheryConfigMiddleware:
) -> None:
if scope["type"] == "http":
# Extract config from query parameters
from urllib.parse import parse_qs
query_string = scope.get("query_string", b"").decode("utf-8")
params = parse_qs(query_string)
@@ -507,8 +553,6 @@ async def load_oauth_client_credentials(
# Try loading from SQLite storage
try:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
@@ -559,9 +603,6 @@ async def load_oauth_client_credentials(
logger.info(f"Requesting token type: {token_type}")
# Ensure OAuth client in SQLite storage
from nextcloud_mcp_server.auth.client_registration import ensure_oauth_client
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
@@ -625,8 +666,6 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
)
# Initialize persistent storage (for webhook tracking and future features)
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
logger.info("Persistent storage initialized (webhook tracking enabled)")
@@ -756,10 +795,6 @@ async def setup_oauth_config():
refresh_token_storage = None
if enable_offline_access:
try:
from nextcloud_mcp_server.auth.storage import (
RefreshTokenStorage,
)
# Validate encryption key before initializing
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key:
@@ -881,8 +916,6 @@ async def setup_oauth_config():
oauth_client = None
if enable_offline_access and refresh_token_storage and is_external_idp:
# For external IdP mode, create generic OIDC client for token operations
from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
# Note: This redirect_uri is for OAuth client initialization, not used for actual redirects
# since this client is used for backend token operations (exchange, refresh)
@@ -1077,8 +1110,6 @@ async def setup_oauth_config_for_multi_user_basic(
refresh_token_storage = None
if settings.enable_offline_access:
try:
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key:
logger.warning(
@@ -1544,8 +1575,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
else:
# BasicAuth mode - initialize storage for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
basic_auth_storage = RefreshTokenStorage.from_env()
await basic_auth_storage.initialize()
logger.info("Initialized refresh token storage for webhook management")
@@ -1653,7 +1682,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
@@ -1745,12 +1773,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
mode_desc = "OAuth mode" if oauth_enabled else "Multi-user BasicAuth mode"
logger.info(f"Starting background vector sync tasks for {mode_desc}")
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.vector.oauth_sync import (
oauth_processor_task,
user_manager_task,
)
# Get nextcloud_host (from settings - already validated)
nextcloud_host_for_sync = settings.nextcloud_host
if not nextcloud_host_for_sync:
@@ -1815,7 +1837,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
@@ -2126,24 +2147,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
settings.enable_multi_user_basic_auth and settings.enable_offline_access
)
if enable_management_apis:
from nextcloud_mcp_server.api import (
create_webhook,
delete_app_password,
delete_webhook,
get_app_password_status,
get_chunk_context,
get_installed_apps,
get_pdf_preview,
get_server_status,
get_user_session,
get_vector_sync_status,
list_webhooks,
provision_app_password,
revoke_user_access,
unified_search,
vector_search,
)
routes.append(Route("/api/v1/status", get_server_status, methods=["GET"]))
routes.append(
Route(
@@ -2251,8 +2254,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
f"OAuth provisioning routes enabled for mode: {mode.value} "
f"(oauth_enabled={oauth_enabled}, hybrid_mode={not oauth_enabled})"
)
# Import OAuth routes (ADR-004 Progressive Consent)
from nextcloud_mcp_server.auth.oauth_routes import oauth_authorize
def oauth_protected_resource_metadata(request):
"""RFC 9728 Protected Resource Metadata endpoint.
@@ -2314,12 +2315,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
)
# Add unified OAuth callback endpoint supporting both flows
from nextcloud_mcp_server.auth.oauth_routes import (
oauth_authorize_nextcloud,
oauth_callback,
oauth_callback_nextcloud,
)
routes.append(Route("/oauth/callback", oauth_callback, methods=["GET"]))
logger.info(
"OAuth unified callback enabled: /oauth/callback?flow={browser|provisioning}"
@@ -2348,8 +2343,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Add OAuth Flow 1 routes (MCP client login) - ONLY for OAuth modes
# Multi-user BasicAuth uses hybrid mode with only Flow 2 (resource provisioning)
if oauth_enabled:
from nextcloud_mcp_server.auth.oauth_routes import oauth_authorize
routes.append(Route("/oauth/authorize", oauth_authorize, methods=["GET"]))
logger.info("OAuth login routes enabled: /oauth/authorize (Flow 1)")
@@ -2357,12 +2350,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Available in OAuth modes AND multi-user BasicAuth with offline access
# (hybrid mode). Separate from MCP tool auth - Management API uses OAuth
if oauth_provisioning_available:
from nextcloud_mcp_server.auth.browser_oauth_routes import (
oauth_login,
oauth_login_callback,
oauth_logout,
)
routes.append(
Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login")
)
@@ -2385,24 +2372,6 @@ def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None =
# Add user info routes (available in both BasicAuth and OAuth modes)
# ADR-016: Skip /app admin UI in Smithery stateless mode (no vector sync, webhooks)
if deployment_mode != DeploymentMode.SMITHERY_STATELESS:
# These require session authentication, so we wrap them in a separate app
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.userinfo_routes import (
revoke_session,
user_info_html,
vector_sync_status_fragment,
)
from nextcloud_mcp_server.auth.viz_routes import (
chunk_context_endpoint,
vector_visualization_html,
vector_visualization_search,
)
from nextcloud_mcp_server.auth.webhook_routes import (
disable_webhook_preset,
enable_webhook_preset,
webhook_management_pane,
)
# Create a separate Starlette app for browser routes that need session auth
# This prevents SessionAuthBackend from interfering with FastMCP's OAuth
browser_routes = [
@@ -11,6 +11,7 @@ import secrets
import time
from base64 import urlsafe_b64encode
from urllib.parse import urlencode
from urllib.parse import urlparse as parse_url
import httpx
import jwt
@@ -153,8 +154,6 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
# Replace internal Docker hostname with public URL
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
from urllib.parse import urlparse as parse_url
internal_parsed = parse_url(oauth_config["nextcloud_host"])
auth_parsed = parse_url(authorization_endpoint)
+1 -2
View File
@@ -10,6 +10,7 @@ import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
@@ -161,8 +162,6 @@ class ClientRegistry:
True if valid, False otherwise
"""
# Parse the redirect URI
from urllib.parse import urlparse
parsed = urlparse(redirect_uri)
# Check against registered patterns
+3 -11
View File
@@ -26,11 +26,13 @@ import secrets
import time
from base64 import urlsafe_b64encode
from urllib.parse import urlencode
from urllib.parse import urlparse as parse_url
import jwt
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse
from nextcloud_mcp_server.auth.browser_oauth_routes import oauth_login_callback
from nextcloud_mcp_server.auth.client_registry import get_client_registry
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
@@ -228,8 +230,6 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
# IMPORTANT: Replace internal Docker hostname with public URL for browser access
# The discovery endpoint returns http://app/apps/oidc/authorize (internal)
# But browsers need http://localhost:8080/apps/oidc/authorize (public)
from urllib.parse import urlparse as parse_url
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
# Parse internal and authorization endpoint to compare hostnames
@@ -364,8 +364,6 @@ async def oauth_authorize_nextcloud(
# Fix internal hostname for browser access
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
from urllib.parse import urlparse as parse_url
internal_parsed = parse_url(oauth_config["nextcloud_host"])
auth_parsed = parse_url(authorization_endpoint)
@@ -567,8 +565,6 @@ async def oauth_callback_nextcloud(request: Request):
</html>
"""
from starlette.responses import HTMLResponse
return HTMLResponse(content=success_html, status_code=200)
@@ -633,10 +629,6 @@ async def oauth_callback(request: Request):
elif flow_type == "browser":
# Browser UI Login - establish browser session for /user/page access
logger.info("Routing to browser login flow")
from nextcloud_mcp_server.auth.browser_oauth_routes import (
oauth_login_callback,
)
return await oauth_login_callback(request)
else:
@@ -15,6 +15,7 @@ from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.config import get_settings
logger = logging.getLogger(__name__)
@@ -66,8 +67,6 @@ def require_provisioning(func: Callable) -> Callable:
# Check if we're in token exchange mode - if so, skip provisioning check
# In token exchange mode, tokens are exchanged per-request (no stored refresh tokens)
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if hasattr(lifespan_ctx, "nextcloud_host") and settings.enable_token_exchange:
# Token exchange mode - per-request exchange, no provisioning needed
@@ -9,6 +9,8 @@ from mcp.server.auth.provider import AccessToken
from mcp.server.fastmcp import Context
from mcp.server.fastmcp.utilities.context_injection import find_context_parameter
from nextcloud_mcp_server.config import get_settings
logger = logging.getLogger(__name__)
@@ -132,8 +134,6 @@ def require_scopes(*required_scopes: str):
# Check if offline access is enabled
# Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new)
# and ENABLE_OFFLINE_ACCESS (deprecated) environment variables
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
enable_offline_access = settings.enable_offline_access
+5 -7
View File
@@ -13,11 +13,13 @@ import traceback
from pathlib import Path
from typing import Any
from httpx import BasicAuth
from jinja2 import Environment, FileSystemLoader
from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
@@ -56,8 +58,6 @@ async def _get_authenticated_client_for_userinfo(request: Request) -> NextcloudC
if not all([nextcloud_host, username, password]):
raise RuntimeError("BasicAuth credentials not configured")
from httpx import BasicAuth
assert nextcloud_host is not None
assert username is not None
assert password is not None
@@ -129,7 +129,9 @@ async def _get_processing_status(request: Request) -> dict[str, Any] | None:
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.qdrant_client import ( # noqa: PLC0415
get_qdrant_client,
)
qdrant_client = await get_qdrant_client()
@@ -431,8 +433,6 @@ async def user_info_html(request: Request) -> HTMLResponse:
# Check if user is admin (for Webhooks tab)
is_admin = False
try:
from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
# Get authenticated Nextcloud client
nc_client = await _get_authenticated_client_for_userinfo(request)
is_admin = await is_nextcloud_admin(request, nc_client._client)
@@ -471,8 +471,6 @@ async def user_info_html(request: Request) -> HTMLResponse:
# Get Nextcloud host for generating links to apps (used by viz tab)
# Use public issuer URL if available (for browser-accessible links),
# otherwise fall back to NEXTCLOUD_HOST from settings
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
nextcloud_host_for_links = (
os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") or settings.nextcloud_host
+6 -13
View File
@@ -18,16 +18,22 @@ from pathlib import Path
import anyio
import numpy as np
from jinja2 import Environment, FileSystemLoader
from qdrant_client.models import FieldCondition, Filter, MatchValue
from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding.service import get_embedding_service
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
)
from nextcloud_mcp_server.search.context import get_chunk_with_context
from nextcloud_mcp_server.vector.pca import PCA
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
@@ -137,10 +143,6 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Get authenticated HTTP client from session
# In BasicAuth mode: uses username/password from session
# In OAuth mode: uses access token from session
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
with trace_operation("vector_viz.get_auth_client"):
auth_client_ctx = await _get_authenticated_client_for_userinfo(request)
@@ -353,8 +355,6 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
else:
# Fallback: generate embedding if not available from search
from nextcloud_mcp_server.embedding.service import get_embedding_service
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
@@ -555,11 +555,6 @@ async def chunk_context_endpoint(request: Request) -> JSONResponse:
doc_id_int = int(doc_id)
# Get authenticated Nextcloud client
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.search.context import get_chunk_with_context
# Use context expansion module to fetch chunk with surrounding context
async with await _get_authenticated_client_for_userinfo(request) as nc_client:
chunk_context = await get_chunk_with_context(
@@ -594,8 +589,6 @@ async def chunk_context_endpoint(request: Request) -> JSONResponse:
page_number = None
if doc_type == "file":
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
settings = get_settings()
qdrant_client = await get_qdrant_client()
username = request.user.display_name
+7 -10
View File
@@ -6,6 +6,13 @@ import uvicorn
from nextcloud_mcp_server.config import (
get_settings,
)
from nextcloud_mcp_server.migrations import (
create_migration,
downgrade_database,
get_current_revision,
show_migration_history,
upgrade_database,
)
from nextcloud_mcp_server.observability import get_uvicorn_logging_config
from .app import get_app
@@ -289,8 +296,6 @@ def upgrade(database_path: str, revision: str):
# Use custom database path
$ nextcloud-mcp-server db upgrade -d /path/to/tokens.db
"""
from nextcloud_mcp_server.migrations import upgrade_database
try:
click.echo(f"Upgrading database to revision: {revision}")
upgrade_database(database_path, revision)
@@ -335,8 +340,6 @@ def downgrade(database_path: str, revision: str):
# Downgrade to base (empty database)
$ nextcloud-mcp-server db downgrade --revision base
"""
from nextcloud_mcp_server.migrations import downgrade_database
try:
click.echo(f"Downgrading database to revision: {revision}")
downgrade_database(database_path, revision)
@@ -362,8 +365,6 @@ def current(database_path: str):
Example:
$ nextcloud-mcp-server db current
"""
from nextcloud_mcp_server.migrations import get_current_revision
try:
revision = get_current_revision(database_path)
if revision:
@@ -397,8 +398,6 @@ def history(database_path: str):
Example:
$ nextcloud-mcp-server db history
"""
from nextcloud_mcp_server.migrations import show_migration_history
try:
click.echo("Migration history:")
show_migration_history(database_path)
@@ -421,8 +420,6 @@ def migrate(message: str):
Note: You must manually edit the generated migration file to add SQL statements.
"""
from nextcloud_mcp_server.migrations import create_migration
try:
click.echo(f"Creating new migration: {message}")
create_migration(message)
+1 -1
View File
@@ -113,7 +113,7 @@ class NextcloudClient:
Returns:
NextcloudClient configured with bearer token authentication
"""
from ..auth import BearerAuth
from ..auth import BearerAuth # noqa: PLC0415
logger.info(f"Creating NC Client for user '{username}' using OAuth token")
return cls(base_url=base_url, username=username, auth=BearerAuth(token))
+4 -19
View File
@@ -6,12 +6,14 @@ import uuid
from typing import Any, Dict, List, Optional
import anyio
from caldav.async_collection import AsyncCalendar
from caldav.async_collection import AsyncCalendar, AsyncEvent
from caldav.async_davclient import AsyncDAVClient
from caldav.elements import cdav, dav
from httpx import Auth
from icalendar import Alarm, Calendar, vRecur
from icalendar import Alarm, Calendar, vDDDTypes, vRecur
from icalendar import Event as ICalEvent
from icalendar import Todo as ICalTodo
from lxml import etree # type: ignore[import-untyped]
from ..config import get_nextcloud_ssl_verify
@@ -103,8 +105,6 @@ class CalendarClient:
# Use custom PROPFIND with CalendarServer namespace (cs:) for calendar-color.
# caldav library's nsmap lacks "CS" namespace, and its CalendarColor uses
# Apple iCal namespace which Nextcloud doesn't recognize.
from lxml import etree # type: ignore[import-untyped]
propfind_body = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:cs="http://calendarserver.org/ns/" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop>
@@ -301,10 +301,6 @@ class CalendarClient:
end_datetime: Optional[dt.datetime] = None,
) -> list:
"""Execute a CalDAV REPORT with time-range filter."""
from caldav.async_collection import AsyncEvent
from caldav.elements import cdav, dav
from lxml import etree # type: ignore[import-untyped]
# Ensure naive datetimes are treated as UTC
if start_datetime and start_datetime.tzinfo is None:
start_datetime = start_datetime.replace(tzinfo=dt.UTC)
@@ -889,8 +885,6 @@ class CalendarClient:
component["DTEND"] = end_dt
# Update timestamps
from icalendar import vDDDTypes
now = dt.datetime.now(dt.UTC)
component["LAST-MODIFIED"] = vDDDTypes(now)
component["DTSTAMP"] = vDDDTypes(now)
@@ -955,24 +949,18 @@ class CalendarClient:
# Due date
due = todo_data.get("due", "")
if due:
from icalendar import vDDDTypes
due_dt = self._ensure_timezone_aware(due)
todo.add("due", vDDDTypes(due_dt))
# Start date
dtstart = todo_data.get("dtstart", "")
if dtstart:
from icalendar import vDDDTypes
start_dt = self._ensure_timezone_aware(dtstart)
todo.add("dtstart", vDDDTypes(start_dt))
# Completed timestamp
completed = todo_data.get("completed", "")
if completed:
from icalendar import vDDDTypes
completed_dt = self._ensure_timezone_aware(completed)
todo.add("completed", vDDDTypes(completed_dt))
@@ -1061,9 +1049,6 @@ class CalendarClient:
component["PERCENT-COMPLETE"] = percent_value
logger.debug(f"Set PERCENT-COMPLETE to {percent_value}")
# Import vDDDTypes at the beginning for datetime formatting
from icalendar import vDDDTypes
# Handle due date
if "due" in todo_data:
due_str = todo_data["due"]
+1 -9
View File
@@ -2,6 +2,7 @@
import logging
from typing import Any, Dict, List
from urllib.parse import quote
from httpx import Timeout
@@ -164,9 +165,6 @@ class CookbookClient(BaseNextcloudClient):
Returns:
List of matching recipe stubs
"""
# URL encode the query
from urllib.parse import quote
encoded_query = quote(query)
response = await self._make_request(
"GET", f"/apps/cookbook/api/v1/search/{encoded_query}"
@@ -193,8 +191,6 @@ class CookbookClient(BaseNextcloudClient):
Returns:
List of recipe stubs in the category
"""
from urllib.parse import quote
encoded_category = quote(category)
response = await self._make_request(
"GET", f"/apps/cookbook/api/v1/category/{encoded_category}"
@@ -211,8 +207,6 @@ class CookbookClient(BaseNextcloudClient):
Returns:
New category name
"""
from urllib.parse import quote
encoded_old_name = quote(old_name)
response = await self._make_request(
"PUT",
@@ -241,8 +235,6 @@ class CookbookClient(BaseNextcloudClient):
Returns:
List of recipe stubs matching the keywords
"""
from urllib.parse import quote
# Join keywords with commas
keywords_str = ",".join(keywords)
encoded_keywords = quote(keywords_str)
+1 -5
View File
@@ -4,6 +4,7 @@ import logging
from typing import Any, AsyncIterator, Dict, Optional
from .base import BaseNextcloudClient
from .webdav import WebDAVClient
logger = logging.getLogger(__name__)
@@ -157,9 +158,6 @@ class NotesClient(BaseNextcloudClient):
f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory"
)
try:
# Import here to avoid circular imports
from .webdav import WebDAVClient
webdav_client = WebDAVClient(self._client, self.username)
await webdav_client.cleanup_old_attachment_directory(
note_id=note_id, old_category=old_note.get("category", "")
@@ -204,8 +202,6 @@ class NotesClient(BaseNextcloudClient):
# Clean up attachment directories
try:
from .webdav import WebDAVClient
webdav_client = WebDAVClient(self._client, self.username)
for cat in potential_categories:
+2 -4
View File
@@ -3,7 +3,9 @@
import logging
import mimetypes
import xml.etree.ElementTree as ET
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote
from httpx import HTTPStatusError
@@ -1259,8 +1261,6 @@ class WebDAVClient(BaseNextcloudClient):
continue
# Decode href path and extract the file path
from urllib.parse import unquote
href_path = unquote(href_elem.text)
# Remove WebDAV prefix to get user-relative path
webdav_prefix = f"/remote.php/dav/files/{self.username}/"
@@ -1269,8 +1269,6 @@ class WebDAVClient(BaseNextcloudClient):
# Parse last modified timestamp
last_modified_timestamp = None
if lastmodified_elem is not None and lastmodified_elem.text:
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(lastmodified_elem.text)
last_modified_timestamp = int(dt.timestamp())
+1 -3
View File
@@ -268,9 +268,7 @@ class Settings:
"This is insecure and should only be used for development/testing."
)
if self.nextcloud_ca_bundle:
import os as _os
if not _os.path.isfile(self.nextcloud_ca_bundle):
if not os.path.isfile(self.nextcloud_ca_bundle):
raise ValueError(
f"NEXTCLOUD_CA_BUNDLE path does not exist: {self.nextcloud_ca_bundle}"
)
+5 -6
View File
@@ -5,6 +5,10 @@ import logging
from httpx import BasicAuth
from mcp.server.fastmcp import Context
from nextcloud_mcp_server.auth.context_helper import (
get_client_from_context,
get_session_client_from_context,
)
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
DeploymentMode,
@@ -80,11 +84,6 @@ async def get_client(ctx: Context) -> NextcloudClient:
# OAuth mode (has 'nextcloud_host' attribute)
if hasattr(lifespan_ctx, "nextcloud_host"):
from nextcloud_mcp_server.auth.context_helper import (
get_client_from_context,
get_session_client_from_context,
)
if settings.enable_token_exchange:
# Mode 2: Exchange MCP token for Nextcloud token
# Token was validated to have MCP audience in UnifiedTokenVerifier
@@ -131,7 +130,7 @@ def _get_client_from_session_config(ctx: Context) -> NextcloudClient:
ValueError: If required session config fields are missing
"""
# ADR-016: Get session config from context variable (set by SmitheryConfigMiddleware)
from nextcloud_mcp_server.app import get_smithery_session_config
from nextcloud_mcp_server.app import get_smithery_session_config # noqa: PLC0415
session_config = get_smithery_session_config()
+1 -2
View File
@@ -11,6 +11,7 @@ from pathlib import Path
from alembic.config import Config
import nextcloud_mcp_server.alembic as alembic_package
from alembic import command
logger = logging.getLogger(__name__)
@@ -30,8 +31,6 @@ def get_alembic_config(database_path: str | Path | None = None) -> Config:
Returns:
Alembic Config object configured for the specified database
"""
from nextcloud_mcp_server import alembic as alembic_package
# Use package location (works in both editable and installed modes)
if alembic_package.__file__ is None:
raise RuntimeError("alembic package __file__ is None")
@@ -25,6 +25,8 @@ from prometheus_client import (
start_http_server,
)
from nextcloud_mcp_server.observability.tracing import trace_operation
logger = logging.getLogger(__name__)
# =============================================================================
@@ -426,8 +428,6 @@ def instrument_tool(func):
Wrapped function with metrics and tracing instrumentation
"""
from nextcloud_mcp_server.observability.tracing import trace_operation
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
+4 -24
View File
@@ -9,8 +9,12 @@ from dataclasses import dataclass
import pymupdf
import pymupdf4llm
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.html_processor import html_to_markdown
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -34,11 +38,6 @@ async def _get_chunk_from_qdrant(
Full chunk text from Qdrant excerpt field, or None if not found
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
@@ -104,11 +103,6 @@ async def _get_chunk_by_index_from_qdrant(
Full chunk text from Qdrant excerpt field, or None if not found
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
@@ -165,11 +159,6 @@ async def _get_file_path_from_qdrant(
File path string, or None if not found in Qdrant
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
@@ -225,11 +214,6 @@ async def _get_deck_metadata_from_qdrant(
Dictionary with board_id and stack_id, or None if not found
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
@@ -355,8 +339,6 @@ async def get_chunk_with_context(
# Fetch adjacent chunks for context expansion
# Get chunk overlap from config to remove duplicate text
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
chunk_overlap = settings.document_chunk_overlap
@@ -587,8 +569,6 @@ async def _fetch_document_text(
return None
elif doc_type == "news_item":
# Fetch news item by ID
from nextcloud_mcp_server.vector.html_processor import html_to_markdown
item = await nc_client.news.get_item(int(doc_id))
# Reconstruct full content as indexed: title + source + URL + body
# This ensures chunk offsets align with indexed content structure
+3 -10
View File
@@ -12,11 +12,14 @@ import logging
import re
import shutil
import tempfile
from collections import defaultdict
from io import BytesIO
from pathlib import Path
from typing import Optional
import pymupdf
import pymupdf4llm
from PIL import Image, ImageDraw
logger = logging.getLogger(__name__)
@@ -682,8 +685,6 @@ class PDFHighlighter:
# Clean up temp directory and PDF file
if temp_pdf_path and temp_pdf_path.parent.exists():
try:
import shutil
shutil.rmtree(temp_pdf_path.parent)
except Exception as e:
logger.warning(
@@ -720,11 +721,6 @@ class PDFHighlighter:
Dict mapping chunk_index to (png_bytes, page_number, highlight_count)
Chunks that fail to highlight are omitted from the result.
"""
import shutil
import tempfile
from collections import defaultdict
from pathlib import Path
results: dict[int, tuple[bytes, int, int]] = {}
if not chunks:
@@ -798,9 +794,6 @@ class PDFHighlighter:
# OPTIMIZATION: Render each page ONCE, then draw highlights using PIL
# This avoids expensive page.get_pixmap() calls per chunk
from io import BytesIO
from PIL import Image, ImageDraw
# PIL color for bounding box (RGB tuple)
rgb = PDFHighlighter.COLORS.get(color, PDFHighlighter.COLORS["yellow"])
+3 -9
View File
@@ -8,6 +8,7 @@ Nextcloud access using the Flow 2 (Resource Provisioning) OAuth flow.
import logging
import os
import secrets
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
@@ -19,9 +20,11 @@ from mcp.types import ToolAnnotations
from pydantic import BaseModel, Field
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.auth.userinfo_routes import _query_idp_userinfo
from nextcloud_mcp_server.config import get_settings
from ..http import nextcloud_httpx_client
@@ -157,11 +160,6 @@ async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningSta
Returns:
ProvisioningStatus with current provisioning state
"""
from datetime import datetime, timezone
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
# Check for app password first (interim solution)
@@ -305,8 +303,6 @@ async def provision_nextcloud_access(
# Get configuration using settings (handles both ENABLE_BACKGROUND_OPERATIONS
# and ENABLE_OFFLINE_ACCESS environment variables)
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.enable_offline_access:
return ProvisioningResult(
@@ -490,8 +486,6 @@ async def check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
# Not logged in - generate OAuth URL for Flow 2
# Use settings (handles both ENABLE_BACKGROUND_OPERATIONS and ENABLE_OFFLINE_ACCESS)
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.enable_offline_access:
return (
+6 -15
View File
@@ -7,15 +7,19 @@ from httpx import RequestError
from mcp.server.fastmcp import Context, FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import (
ClientCapabilities,
ErrorData,
ModelHint,
ModelPreferences,
SamplingCapability,
SamplingMessage,
TextContent,
ToolAnnotations,
)
from qdrant_client.models import Filter
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.context import get_client
from nextcloud_mcp_server.models.semantic import (
SamplingSearchResponse,
@@ -28,6 +32,8 @@ from nextcloud_mcp_server.observability.metrics import (
)
from nextcloud_mcp_server.search.bm25_hybrid import BM25HybridSearchAlgorithm
from nextcloud_mcp_server.search.context import get_chunk_with_context
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -82,8 +88,6 @@ def configure_semantic_tools(mcp: FastMCP):
Returns:
SemanticSearchResponse with matching documents ranked by fusion scores
"""
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
client = await get_client(ctx)
username = client.username
@@ -373,8 +377,6 @@ def configure_semantic_tools(mcp: FastMCP):
)
# 3. Check if client supports sampling
from mcp.types import ClientCapabilities, SamplingCapability
client_has_sampling = ctx.session.check_client_capability(
ClientCapabilities(sampling=SamplingCapability())
)
@@ -658,8 +660,6 @@ def configure_semantic_tools(mcp: FastMCP):
"""
# Check if vector sync is enabled (supports both old and new env var names)
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
if not settings.vector_sync_enabled:
return VectorSyncStatusResponse(
@@ -694,15 +694,6 @@ def configure_semantic_tools(mcp: FastMCP):
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from qdrant_client.models import Filter
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.placeholder import (
get_placeholder_filter,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Count documents in collection, excluding placeholders
+1 -1
View File
@@ -36,7 +36,7 @@ def main():
logger.info("Starting Nextcloud MCP Server in Smithery stateless mode")
# Import app after setting environment variables
from nextcloud_mcp_server.app import get_app
from nextcloud_mcp_server.app import get_app # noqa: PLC0415
# Create the app with streamable-http transport (required for Smithery)
app = get_app(transport="streamable-http")
+1 -2
View File
@@ -21,6 +21,7 @@ import logging
import time
import uuid
from qdrant_client import models
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
from nextcloud_mcp_server.config import get_settings
@@ -82,8 +83,6 @@ async def write_placeholder_point(
# Create empty sparse vector for placeholders
# Use models.SparseVector with empty indices/values
from qdrant_client import models
empty_sparse = models.SparseVector(indices=[], values=[])
# Generate deterministic point ID
+3 -6
View File
@@ -17,6 +17,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.document_processors import get_registry
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import (
record_qdrant_operation,
@@ -24,7 +25,9 @@ from nextcloud_mcp_server.observability.metrics import (
update_vector_sync_queue_size,
)
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search.pdf_highlighter import PDFHighlighter
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
from nextcloud_mcp_server.vector.html_processor import html_to_markdown
from nextcloud_mcp_server.vector.placeholder import delete_placeholder_point
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.scanner import DocumentTask
@@ -275,8 +278,6 @@ async def _index_document(
content_bytes = None # Notes don't have binary content
content_type = None
elif doc_task.doc_type == "news_item":
from nextcloud_mcp_server.vector.html_processor import html_to_markdown
item = await nc_client.news.get_item(int(doc_task.doc_id))
# Convert HTML body to Markdown for better embedding
body_markdown = html_to_markdown(item.get("body", ""))
@@ -437,8 +438,6 @@ async def _index_document(
},
):
# Use document processor registry to extract text
from nextcloud_mcp_server.document_processors import get_registry
registry = get_registry()
try:
@@ -586,8 +585,6 @@ async def _index_document(
"vector_sync.pdf_size": len(content_bytes),
},
):
from nextcloud_mcp_server.search.pdf_highlighter import PDFHighlighter
# Build chunk data for batch processing
# Format: (chunk_index, start_offset, end_offset, page_number, chunk_text)
chunk_data: list[tuple[int, int, int, int | None, str]] = [
+1 -3
View File
@@ -6,6 +6,7 @@ from qdrant_client import AsyncQdrantClient, models
from qdrant_client.models import Distance, VectorParams
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
logger = logging.getLogger(__name__)
@@ -62,9 +63,6 @@ async def get_qdrant_client() -> AsyncQdrantClient:
# Get collection name (auto-generated from deployment ID + model)
collection_name = settings.get_collection_name()
# Import here to avoid circular dependency
from nextcloud_mcp_server.embedding import get_embedding_service
embedding_service = get_embedding_service()
# Detect dimension dynamically (for OllamaEmbeddingProvider)
+2 -4
View File
@@ -8,6 +8,7 @@ import os
import random
import time
from dataclasses import dataclass
from email.utils import parsedate_to_datetime
import anyio
from anyio.abc import TaskStatus
@@ -15,6 +16,7 @@ from anyio.streams.memory import MemoryObjectSendStream
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.client.news import NewsItemType
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.observability.metrics import record_vector_sync_scan
from nextcloud_mcp_server.observability.tracing import trace_operation
@@ -418,8 +420,6 @@ async def scan_user_documents(
modified_at = file_info.get("last_modified_timestamp", int(time.time()))
if isinstance(file_info.get("last_modified"), str):
# Parse RFC 2822 date format if needed
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(file_info["last_modified"])
modified_at = int(dt.timestamp())
@@ -615,8 +615,6 @@ async def scan_news_items(
Returns:
Number of items queued for processing
"""
from nextcloud_mcp_server.client.news import NewsItemType
settings = get_settings()
queued = 0
+4 -1
View File
@@ -106,7 +106,10 @@ changelog_pattern = "^(feat|fix|docs|refactor|perf|test|build|ci|chore)(?!\\((?:
schema_pattern = "^(feat|fix|docs|refactor|perf|test|build|ci|chore)(?!\\((?:helm)\\))(\\([^)]+\\))?(!)?:\\s.+"
[tool.ruff.lint]
extend-select = ["I"]
extend-select = ["I", "PLC0415"]
[tool.ruff.lint.per-file-ignores]
"tests/**" = ["PLC0415"]
[tool.uv.sources]
caldav = { git = "https://github.com/cbcoutinho/caldav", branch = "feature/httpx" }