diff --git a/docker-compose.yml b/docker-compose.yml
index e1cfd43..5dfecd9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -94,7 +94,7 @@ services:
# 1. Network mode: Set QDRANT_URL=http://qdrant:6333 (requires qdrant service)
# 2. In-memory mode: Set QDRANT_LOCATION=:memory: (default if nothing set)
# 3. Persistent local: Set QDRANT_LOCATION=/app/data/qdrant (stored in mcp-data volume)
- - QDRANT_LOCATION=":memory:" # In-memory mode for CI/testing (no external service required)
+ #- QDRANT_LOCATION=/app/data/qdrant # In-memory mode used if not set
#- QDRANT_URL=http://qdrant:6333 # Uncomment for network mode
#- QDRANT_API_KEY=${QDRANT_API_KEY:-my_secret_api_key} # Only for network mode
diff --git a/docs/ADR-010-webhook-based-vector-sync.md b/docs/ADR-010-webhook-based-vector-sync.md
index d276319..cf8c91b 100644
--- a/docs/ADR-010-webhook-based-vector-sync.md
+++ b/docs/ADR-010-webhook-based-vector-sync.md
@@ -43,7 +43,8 @@ The webhook_listeners app supports events for all Nextcloud apps relevant to thi
**Files/Notes Events** (notes are stored as files):
- `OCP\Files\Events\Node\NodeCreatedEvent`
- `OCP\Files\Events\Node\NodeWrittenEvent`
-- `OCP\Files\Events\Node\NodeDeletedEvent`
+- `OCP\Files\Events\Node\BeforeNodeDeletedEvent` ⭐ **Use this for deletion (includes node.id)**
+- `OCP\Files\Events\Node\NodeDeletedEvent` (missing node.id - file already deleted)
- `OCP\Files\Events\Node\NodeRenamedEvent`
- `OCP\Files\Events\Node\NodeCopiedEvent`
@@ -228,8 +229,9 @@ def extract_document_task(event_class: str, payload: dict) -> DocumentTask | Non
modified_at=event_data["objectData"]["lastmodified"],
)
- # Deletion events
- elif "NodeDeletedEvent" in event_class or \
+ # Deletion events (use BeforeNodeDeletedEvent for files to get node.id)
+ elif "BeforeNodeDeletedEvent" in event_class or \
+ "NodeDeletedEvent" in event_class or \
"CalendarObjectDeletedEvent" in event_class:
# Similar logic for delete operations
...
@@ -455,7 +457,14 @@ Manual validation of Nextcloud webhook schemas and behavior confirmed that webho
**Impact:** The event parser in this ADR's example code assumes `event_data["node"]["id"]` exists for all file events. This will fail for deletions.
-**Required Fix:** Check for `id` existence and fall back to path-based identification:
+**Update (2025-11-11):** Nextcloud maintainer clarified that `BeforeNodeDeletedEvent` should be used instead of `NodeDeletedEvent` to access `node.id` before the file is deleted. See [issue #56371](https://github.com/nextcloud/server/issues/56371#issuecomment-2470896634).
+
+> "Try using the `BeforeNodeDeletedEvent`. The `id` should still be available at that time. The reason `id` is not in `NodeDeletedEvent` is because the file is effectively guaranteed to be gone and, in turn, so is the FileInfo."
+> — Josh Richards, Nextcloud maintainer
+
+**Recommended Solution:** Use `OCP\Files\Events\Node\BeforeNodeDeletedEvent` for file deletion webhooks instead of `NodeDeletedEvent`.
+
+**Alternative Fix (if using NodeDeletedEvent):** Check for `id` existence and fall back to path-based identification:
```python
def extract_document_task(event_class: str, payload: dict) -> DocumentTask | None:
diff --git a/nextcloud_mcp_server/auth/permissions.py b/nextcloud_mcp_server/auth/permissions.py
new file mode 100644
index 0000000..551d201
--- /dev/null
+++ b/nextcloud_mcp_server/auth/permissions.py
@@ -0,0 +1,54 @@
+"""Permission checking utilities for Nextcloud admin operations."""
+
+import logging
+
+from httpx import AsyncClient
+from starlette.requests import Request
+
+from nextcloud_mcp_server.client.users import UsersClient
+
+logger = logging.getLogger(__name__)
+
+
+async def is_nextcloud_admin(request: Request, http_client: AsyncClient) -> bool:
+ """Check if the authenticated user is a Nextcloud administrator.
+
+ This function extracts the username from the session/request context
+ and checks if the user is a member of the "admin" group in Nextcloud.
+
+ Args:
+ request: Starlette request object with authenticated user
+ http_client: Authenticated HTTP client for Nextcloud API calls
+
+ Returns:
+ True if user is admin, False otherwise
+
+ Example:
+ ```python
+ if await is_nextcloud_admin(request, http_client):
+ # Show admin-only features
+ pass
+ ```
+ """
+ try:
+ # Extract username from authenticated session
+ username = request.user.display_name
+ if not username:
+ logger.warning("No username found in authenticated session")
+ return False
+
+ # Query Nextcloud for user's group memberships
+ users_client = UsersClient(http_client, username)
+ user_groups = await users_client.get_user_groups(username)
+
+ # Check if user is in the admin group
+ is_admin = "admin" in user_groups
+ logger.debug(
+ f"Admin check for user '{username}': {is_admin} (groups: {user_groups})"
+ )
+
+ return is_admin
+
+ except Exception as e:
+ logger.error(f"Error checking admin permissions: {e}", exc_info=True)
+ return False
diff --git a/nextcloud_mcp_server/auth/userinfo_routes.py b/nextcloud_mcp_server/auth/userinfo_routes.py
index 09a870c..7f1f5b2 100644
--- a/nextcloud_mcp_server/auth/userinfo_routes.py
+++ b/nextcloud_mcp_server/auth/userinfo_routes.py
@@ -19,6 +19,57 @@ from starlette.responses import HTMLResponse, JSONResponse
logger = logging.getLogger(__name__)
+async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.AsyncClient:
+ """Get an authenticated HTTP client for user info page operations.
+
+ Args:
+ request: Starlette request object
+
+ Returns:
+ Authenticated httpx.AsyncClient
+ """
+ oauth_ctx = getattr(request.app.state, "oauth_context", None)
+
+ # BasicAuth mode - use credentials from environment
+ if not oauth_ctx:
+ nextcloud_host = os.getenv("NEXTCLOUD_HOST")
+ username = os.getenv("NEXTCLOUD_USERNAME")
+ password = os.getenv("NEXTCLOUD_PASSWORD")
+
+ if not all([nextcloud_host, username, password]):
+ raise RuntimeError("BasicAuth credentials not configured")
+
+ assert nextcloud_host is not None # Type narrowing for type checker
+ return httpx.AsyncClient(
+ base_url=nextcloud_host,
+ auth=(username, password),
+ timeout=30.0,
+ )
+
+ # OAuth mode - get token from session
+ storage = oauth_ctx.get("storage")
+ session_id = request.cookies.get("mcp_session")
+
+ if not storage or not session_id:
+ raise RuntimeError("Session not found")
+
+ token_data = await storage.get_refresh_token(session_id)
+ if not token_data or "access_token" not in token_data:
+ raise RuntimeError("No access token found in session")
+
+ access_token = token_data["access_token"]
+ nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
+
+ if not nextcloud_host:
+ raise RuntimeError("Nextcloud host not configured")
+
+ return httpx.AsyncClient(
+ base_url=nextcloud_host,
+ headers={"Authorization": f"Bearer {access_token}"},
+ timeout=30.0,
+ )
+
+
async def _get_processing_status(request: Request) -> dict[str, Any] | None:
"""Get vector sync processing status.
@@ -296,6 +347,19 @@ async def user_info_html(request: Request) -> HTMLResponse:
# Get vector sync processing status
processing_status = await _get_processing_status(request)
+ # Check if user is admin (for Webhooks tab)
+ is_admin = False
+ try:
+ from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
+
+ # Get authenticated HTTP client
+ http_client = await _get_authenticated_client_for_userinfo(request)
+ is_admin = await is_nextcloud_admin(request, http_client)
+ await http_client.aclose()
+ except Exception as e:
+ logger.warning(f"Failed to check admin status: {e}")
+ # Default to not admin if check fails
+
# Check for error
if "error" in user_context and user_context["error"] != "":
# Get login URL dynamically
@@ -506,17 +570,61 @@ async def user_info_html(request: Request) -> HTMLResponse:
-
Nextcloud MCP Server - User Info
+
+
Nextcloud MCP Server
-
Authentication
-
-
- Username
- {username}
-
-
- Authentication Mode
- {auth_mode}
-
-
+
+
+
+ User Info
+
+ {
+ ""
+ if not show_vector_sync_tab
+ else '''
+
+ Vector Sync
+
+ '''
+ }
+ {
+ ""
+ if not show_webhooks_tab
+ else '''
+
+ Webhooks
+
+ '''
+ }
+
- {host_info_html}
- {session_info_html}
- {vector_status_html}
- {idp_profile_html}
+
+
+
+
+ {user_info_tab_html}
+
- {f'
' if auth_mode == "oauth" else ""}
+ {
+ ""
+ if not show_vector_sync_tab
+ else f'''
+
+
+ {vector_sync_tab_html}
+
+ '''
+ }
+
+ {
+ ""
+ if not show_webhooks_tab
+ else f'''
+
+
+ {webhooks_tab_html}
+
+ '''
+ }
+
+
+ {
+ f'
'
+ if auth_mode == "oauth"
+ else ""
+ }
diff --git a/nextcloud_mcp_server/cli.py b/nextcloud_mcp_server/cli.py
new file mode 100644
index 0000000..3b93cae
--- /dev/null
+++ b/nextcloud_mcp_server/cli.py
@@ -0,0 +1,257 @@
+import os
+
+import click
+import uvicorn
+
+from nextcloud_mcp_server.config import (
+ get_settings,
+)
+from nextcloud_mcp_server.observability import get_uvicorn_logging_config
+
+from .app import get_app
+
+
+@click.command()
+@click.option(
+ "--host", "-h", default="127.0.0.1", show_default=True, help="Server host"
+)
+@click.option(
+ "--port", "-p", type=int, default=8000, show_default=True, help="Server port"
+)
+@click.option(
+ "--log-level",
+ "-l",
+ default="info",
+ show_default=True,
+ type=click.Choice(["critical", "error", "warning", "info", "debug", "trace"]),
+ help="Logging level",
+)
+@click.option(
+ "--transport",
+ "-t",
+ default="sse",
+ show_default=True,
+ type=click.Choice(["sse", "streamable-http", "http"]),
+ help="MCP transport protocol",
+)
+@click.option(
+ "--enable-app",
+ "-e",
+ multiple=True,
+ type=click.Choice(
+ ["notes", "tables", "webdav", "calendar", "contacts", "cookbook", "deck"]
+ ),
+ help="Enable specific Nextcloud app APIs. Can be specified multiple times. If not specified, all apps are enabled.",
+)
+@click.option(
+ "--oauth/--no-oauth",
+ default=None,
+ help="Force OAuth mode (if enabled) or BasicAuth mode (if disabled). By default, auto-detected based on environment variables.",
+)
+@click.option(
+ "--oauth-client-id",
+ envvar="NEXTCLOUD_OIDC_CLIENT_ID",
+ help="OAuth client ID (can also use NEXTCLOUD_OIDC_CLIENT_ID env var)",
+)
+@click.option(
+ "--oauth-client-secret",
+ envvar="NEXTCLOUD_OIDC_CLIENT_SECRET",
+ help="OAuth client secret (can also use NEXTCLOUD_OIDC_CLIENT_SECRET env var)",
+)
+@click.option(
+ "--mcp-server-url",
+ envvar="NEXTCLOUD_MCP_SERVER_URL",
+ default="http://localhost:8000",
+ show_default=True,
+ help="MCP server URL for OAuth callbacks (can also use NEXTCLOUD_MCP_SERVER_URL env var)",
+)
+@click.option(
+ "--nextcloud-host",
+ envvar="NEXTCLOUD_HOST",
+ help="Nextcloud instance URL (can also use NEXTCLOUD_HOST env var)",
+)
+@click.option(
+ "--nextcloud-username",
+ envvar="NEXTCLOUD_USERNAME",
+ help="Nextcloud username for BasicAuth (can also use NEXTCLOUD_USERNAME env var)",
+)
+@click.option(
+ "--nextcloud-password",
+ envvar="NEXTCLOUD_PASSWORD",
+ help="Nextcloud password for BasicAuth (can also use NEXTCLOUD_PASSWORD env var)",
+)
+@click.option(
+ "--oauth-scopes",
+ envvar="NEXTCLOUD_OIDC_SCOPES",
+ default="openid profile email notes:read notes:write calendar:read calendar:write todo:read todo:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write",
+ show_default=True,
+ help="OAuth scopes to request during client registration. These define the maximum allowed scopes for the client. Note: Actual supported scopes are discovered dynamically from MCP tools at runtime. (can also use NEXTCLOUD_OIDC_SCOPES env var)",
+)
+@click.option(
+ "--oauth-token-type",
+ envvar="NEXTCLOUD_OIDC_TOKEN_TYPE",
+ default="bearer",
+ show_default=True,
+ type=click.Choice(["bearer", "jwt"], case_sensitive=False),
+ help="OAuth token type (can also use NEXTCLOUD_OIDC_TOKEN_TYPE env var)",
+)
+@click.option(
+ "--public-issuer-url",
+ envvar="NEXTCLOUD_PUBLIC_ISSUER_URL",
+ help="Public issuer URL for OAuth (can also use NEXTCLOUD_PUBLIC_ISSUER_URL env var)",
+)
+def run(
+ host: str,
+ port: int,
+ log_level: str,
+ transport: str,
+ enable_app: tuple[str, ...],
+ oauth: bool | None,
+ oauth_client_id: str | None,
+ oauth_client_secret: str | None,
+ mcp_server_url: str,
+ nextcloud_host: str | None,
+ nextcloud_username: str | None,
+ nextcloud_password: str | None,
+ oauth_scopes: str,
+ oauth_token_type: str,
+ public_issuer_url: str | None,
+):
+ """
+ Run the Nextcloud MCP server.
+
+ \b
+ Authentication Modes:
+ - BasicAuth: Set NEXTCLOUD_USERNAME and NEXTCLOUD_PASSWORD
+ - OAuth: Leave USERNAME/PASSWORD unset (requires OIDC app enabled)
+
+ \b
+ Examples:
+ # BasicAuth mode with CLI options
+ $ nextcloud-mcp-server --nextcloud-host=https://cloud.example.com \\
+ --nextcloud-username=admin --nextcloud-password=secret
+
+ # BasicAuth mode with env vars (recommended for credentials)
+ $ export NEXTCLOUD_HOST=https://cloud.example.com
+ $ export NEXTCLOUD_USERNAME=admin
+ $ export NEXTCLOUD_PASSWORD=secret
+ $ nextcloud-mcp-server --host 0.0.0.0 --port 8000
+
+ # OAuth mode with auto-registration
+ $ nextcloud-mcp-server --nextcloud-host=https://cloud.example.com --oauth
+
+ # OAuth mode with pre-configured client
+ $ nextcloud-mcp-server --nextcloud-host=https://cloud.example.com --oauth \\
+ --oauth-client-id=xxx --oauth-client-secret=yyy
+
+ # OAuth mode with custom scopes and JWT tokens
+ $ nextcloud-mcp-server --nextcloud-host=https://cloud.example.com --oauth \\
+ --oauth-scopes="openid notes:read notes:write" --oauth-token-type=jwt
+
+ # OAuth with public issuer URL (for Docker/proxy setups)
+ $ nextcloud-mcp-server --nextcloud-host=http://app --oauth \\
+ --public-issuer-url=http://localhost:8080
+ """
+ # Set env vars from CLI options if provided
+ if nextcloud_host:
+ os.environ["NEXTCLOUD_HOST"] = nextcloud_host
+ if nextcloud_username:
+ os.environ["NEXTCLOUD_USERNAME"] = nextcloud_username
+ if nextcloud_password:
+ os.environ["NEXTCLOUD_PASSWORD"] = nextcloud_password
+ if oauth_client_id:
+ os.environ["NEXTCLOUD_OIDC_CLIENT_ID"] = oauth_client_id
+ if oauth_client_secret:
+ os.environ["NEXTCLOUD_OIDC_CLIENT_SECRET"] = oauth_client_secret
+ if oauth_scopes:
+ os.environ["NEXTCLOUD_OIDC_SCOPES"] = oauth_scopes
+ if oauth_token_type:
+ os.environ["NEXTCLOUD_OIDC_TOKEN_TYPE"] = oauth_token_type
+ if mcp_server_url:
+ os.environ["NEXTCLOUD_MCP_SERVER_URL"] = mcp_server_url
+ if public_issuer_url:
+ os.environ["NEXTCLOUD_PUBLIC_ISSUER_URL"] = public_issuer_url
+
+ # Force OAuth mode if explicitly requested
+ if oauth is True:
+ # Clear username/password to force OAuth mode
+ if "NEXTCLOUD_USERNAME" in os.environ:
+ click.echo(
+ "Warning: --oauth flag set, ignoring NEXTCLOUD_USERNAME", err=True
+ )
+ del os.environ["NEXTCLOUD_USERNAME"]
+ if "NEXTCLOUD_PASSWORD" in os.environ:
+ click.echo(
+ "Warning: --oauth flag set, ignoring NEXTCLOUD_PASSWORD", err=True
+ )
+ del os.environ["NEXTCLOUD_PASSWORD"]
+
+ # Validate OAuth configuration
+ nextcloud_host = os.getenv("NEXTCLOUD_HOST")
+ if not nextcloud_host:
+ raise click.ClickException(
+ "OAuth mode requires NEXTCLOUD_HOST environment variable to be set"
+ )
+
+ # Check if we have client credentials OR if dynamic registration is possible
+ has_client_creds = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") and os.getenv(
+ "NEXTCLOUD_OIDC_CLIENT_SECRET"
+ )
+
+ if not has_client_creds:
+ # No client credentials - will attempt dynamic registration
+ # Show helpful message before server starts
+ click.echo("", err=True)
+ click.echo("OAuth Configuration:", err=True)
+ click.echo(" Mode: Dynamic Client Registration", err=True)
+ click.echo(" Host: " + nextcloud_host, err=True)
+ click.echo(" Storage: SQLite (TOKEN_STORAGE_DB)", err=True)
+ click.echo("", err=True)
+ click.echo(
+ "Note: Make sure 'Dynamic Client Registration' is enabled", err=True
+ )
+ click.echo(" in your Nextcloud OIDC app settings.", err=True)
+ click.echo("", err=True)
+ else:
+ click.echo("", err=True)
+ click.echo("OAuth Configuration:", err=True)
+ click.echo(" Mode: Pre-configured Client", err=True)
+ click.echo(" Host: " + nextcloud_host, err=True)
+ click.echo(
+ " Client ID: "
+ + os.getenv("NEXTCLOUD_OIDC_CLIENT_ID", "")[:16]
+ + "...",
+ err=True,
+ )
+ click.echo("", err=True)
+
+ elif oauth is False:
+ # Force BasicAuth mode - verify credentials exist
+ if not os.getenv("NEXTCLOUD_USERNAME") or not os.getenv("NEXTCLOUD_PASSWORD"):
+ raise click.ClickException(
+ "--no-oauth flag set but NEXTCLOUD_USERNAME or NEXTCLOUD_PASSWORD not set"
+ )
+
+ enabled_apps = list(enable_app) if enable_app else None
+
+ app = get_app(transport=transport, enabled_apps=enabled_apps)
+
+ # Get observability settings and create uvicorn logging config
+ settings = get_settings()
+ uvicorn_log_config = get_uvicorn_logging_config(
+ log_format=settings.log_format,
+ log_level=settings.log_level,
+ include_trace_context=settings.log_include_trace_context,
+ )
+
+ uvicorn.run(
+ app=app,
+ host=host,
+ port=port,
+ log_level=log_level,
+ log_config=uvicorn_log_config,
+ )
+
+
+if __name__ == "__main__":
+ run()
diff --git a/nextcloud_mcp_server/client/__init__.py b/nextcloud_mcp_server/client/__init__.py
index cae6c07..29dfc36 100644
--- a/nextcloud_mcp_server/client/__init__.py
+++ b/nextcloud_mcp_server/client/__init__.py
@@ -23,6 +23,7 @@ from .sharing import SharingClient
from .tables import TablesClient
from .users import UsersClient
from .webdav import WebDAVClient
+from .webhooks import WebhooksClient
logger = logging.getLogger(__name__)
@@ -83,6 +84,7 @@ class NextcloudClient:
self.users = UsersClient(self._client, username)
self.groups = GroupsClient(self._client, username)
self.sharing = SharingClient(self._client, username)
+ self.webhooks = WebhooksClient(self._client, username)
# Initialize controllers
self._notes_search = NotesSearchController()
diff --git a/nextcloud_mcp_server/client/webhooks.py b/nextcloud_mcp_server/client/webhooks.py
new file mode 100644
index 0000000..e1b206b
--- /dev/null
+++ b/nextcloud_mcp_server/client/webhooks.py
@@ -0,0 +1,109 @@
+"""Client for Nextcloud Webhook Listeners API operations."""
+
+from typing import Any, Dict, List, Optional
+
+from nextcloud_mcp_server.client.base import BaseNextcloudClient
+
+
+class WebhooksClient(BaseNextcloudClient):
+ """Client for Nextcloud webhook_listeners app API operations."""
+
+ app_name = "webhooks"
+
+ def _get_webhook_headers(
+ self, additional_headers: Optional[Dict[str, str]] = None
+ ) -> Dict[str, str]:
+ """Get standard headers required for Webhook Listeners API calls."""
+ headers = {"OCS-APIRequest": "true", "Accept": "application/json"}
+ if additional_headers:
+ headers.update(additional_headers)
+ return headers
+
+ async def list_webhooks(self) -> List[Dict[str, Any]]:
+ """List all registered webhooks for the current user.
+
+ Returns:
+ List of webhook registrations with id, uri, event, filters, etc.
+ """
+ headers = self._get_webhook_headers()
+ response = await self._make_request(
+ "GET",
+ "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks",
+ headers=headers,
+ )
+ data = response.json()["ocs"]["data"]
+ return data if isinstance(data, list) else []
+
+ async def create_webhook(
+ self,
+ event: str,
+ uri: str,
+ http_method: str = "POST",
+ auth_method: str = "none",
+ headers: Optional[Dict[str, str]] = None,
+ event_filter: Optional[Dict[str, Any]] = None,
+ ) -> Dict[str, Any]:
+ """Register a new webhook for the specified event.
+
+ Args:
+ event: Fully qualified event class name (e.g., "OCP\\Files\\Events\\Node\\NodeCreatedEvent")
+ uri: Webhook endpoint URL to receive event notifications
+ http_method: HTTP method for webhook delivery (default: "POST")
+ auth_method: Authentication method ("none", "bearer", etc.)
+ headers: Custom headers to include in webhook requests (e.g., Authorization header)
+ event_filter: JSON object specifying event filters (e.g., {"user.uid": "bob"})
+
+ Returns:
+ Webhook registration details including webhook ID
+ """
+ data: Dict[str, Any] = {
+ "httpMethod": http_method,
+ "uri": uri,
+ "event": event,
+ "authMethod": auth_method,
+ }
+
+ if headers:
+ data["headers"] = headers
+
+ if event_filter:
+ data["eventFilter"] = event_filter
+
+ request_headers = self._get_webhook_headers()
+ response = await self._make_request(
+ "POST",
+ "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks",
+ json=data,
+ headers=request_headers,
+ )
+ return response.json()["ocs"]["data"]
+
+ async def delete_webhook(self, webhook_id: int) -> None:
+ """Delete a webhook registration.
+
+ Args:
+ webhook_id: ID of the webhook to delete
+ """
+ headers = self._get_webhook_headers()
+ await self._make_request(
+ "DELETE",
+ f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{webhook_id}",
+ headers=headers,
+ )
+
+ async def get_webhook(self, webhook_id: int) -> Dict[str, Any]:
+ """Get details of a specific webhook registration.
+
+ Args:
+ webhook_id: ID of the webhook to retrieve
+
+ Returns:
+ Webhook registration details
+ """
+ headers = self._get_webhook_headers()
+ response = await self._make_request(
+ "GET",
+ f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{webhook_id}",
+ headers=headers,
+ )
+ return response.json()["ocs"]["data"]
diff --git a/nextcloud_mcp_server/server/webhook_presets.py b/nextcloud_mcp_server/server/webhook_presets.py
new file mode 100644
index 0000000..11500a4
--- /dev/null
+++ b/nextcloud_mcp_server/server/webhook_presets.py
@@ -0,0 +1,197 @@
+"""Webhook preset configurations for common sync scenarios.
+
+This module defines pre-configured webhook bundles that simplify
+webhook setup for common use cases like Notes sync, Calendar sync, etc.
+"""
+
+from typing import Any, Dict, List, TypedDict
+
+
+class WebhookEventConfig(TypedDict):
+ """Configuration for a single webhook event."""
+
+ event: str # Fully qualified event class name
+ filter: Dict[str, Any] # Event filter (optional)
+
+
+class WebhookPreset(TypedDict):
+ """Definition of a webhook preset."""
+
+ name: str # Display name
+ description: str # User-friendly description
+ events: List[WebhookEventConfig] # List of events to register
+ app: str # Nextcloud app this preset is for
+
+
+# File/Notes webhook events
+FILE_EVENT_CREATED = "OCP\\Files\\Events\\Node\\NodeCreatedEvent"
+FILE_EVENT_WRITTEN = "OCP\\Files\\Events\\Node\\NodeWrittenEvent"
+# Use BeforeNodeDeletedEvent instead of NodeDeletedEvent to get node.id
+# See: https://github.com/nextcloud/server/issues/56371
+FILE_EVENT_DELETED = "OCP\\Files\\Events\\Node\\BeforeNodeDeletedEvent"
+
+# Calendar webhook events
+CALENDAR_EVENT_CREATED = "OCP\\Calendar\\Events\\CalendarObjectCreatedEvent"
+CALENDAR_EVENT_UPDATED = "OCP\\Calendar\\Events\\CalendarObjectUpdatedEvent"
+CALENDAR_EVENT_DELETED = "OCP\\Calendar\\Events\\CalendarObjectDeletedEvent"
+
+# Tables webhook events (Nextcloud 30+)
+TABLES_EVENT_ROW_ADDED = "OCA\\Tables\\Event\\RowAddedEvent"
+TABLES_EVENT_ROW_UPDATED = "OCA\\Tables\\Event\\RowUpdatedEvent"
+TABLES_EVENT_ROW_DELETED = "OCA\\Tables\\Event\\RowDeletedEvent"
+
+# Forms webhook events (Nextcloud 30+)
+FORMS_EVENT_FORM_SUBMITTED = "OCA\\Forms\\Events\\FormSubmittedEvent"
+
+# NOTE: Deck and Contacts do NOT support webhooks
+# Their event classes do not implement IWebhookCompatibleEvent interface.
+# Alternative sync strategies:
+# - Deck: Use polling with ETag-based change detection
+# - Contacts: Use CardDAV sync-token mechanism for efficient syncing
+
+
+WEBHOOK_PRESETS: Dict[str, WebhookPreset] = {
+ "notes_sync": {
+ "name": "Notes Sync",
+ "description": "Real-time synchronization for Notes app (create, update, delete)",
+ "app": "notes",
+ "events": [
+ {
+ "event": FILE_EVENT_CREATED,
+ "filter": {"event.node.path": "/^\\/.*\\/files\\/Notes\\//"},
+ },
+ {
+ "event": FILE_EVENT_WRITTEN,
+ "filter": {"event.node.path": "/^\\/.*\\/files\\/Notes\\//"},
+ },
+ {
+ "event": FILE_EVENT_DELETED,
+ "filter": {"event.node.path": "/^\\/.*\\/files\\/Notes\\//"},
+ },
+ ],
+ },
+ "calendar_sync": {
+ "name": "Calendar Sync",
+ "description": "Real-time synchronization for Calendar events (create, update, delete)",
+ "app": "calendar",
+ "events": [
+ {
+ "event": CALENDAR_EVENT_CREATED,
+ "filter": {},
+ },
+ {
+ "event": CALENDAR_EVENT_UPDATED,
+ "filter": {},
+ },
+ {
+ "event": CALENDAR_EVENT_DELETED,
+ "filter": {},
+ },
+ ],
+ },
+ "tables_sync": {
+ "name": "Tables Sync",
+ "description": "Real-time synchronization for Tables rows (add, update, delete)",
+ "app": "tables",
+ "events": [
+ {
+ "event": TABLES_EVENT_ROW_ADDED,
+ "filter": {},
+ },
+ {
+ "event": TABLES_EVENT_ROW_UPDATED,
+ "filter": {},
+ },
+ {
+ "event": TABLES_EVENT_ROW_DELETED,
+ "filter": {},
+ },
+ ],
+ },
+ "forms_sync": {
+ "name": "Forms Sync",
+ "description": "Real-time synchronization for Forms submissions",
+ "app": "forms",
+ "events": [
+ {
+ "event": FORMS_EVENT_FORM_SUBMITTED,
+ "filter": {},
+ },
+ ],
+ },
+ "files_sync": {
+ "name": "All Files Sync",
+ "description": "Real-time synchronization for all file operations (create, update, delete)",
+ "app": "files",
+ "events": [
+ {
+ "event": FILE_EVENT_CREATED,
+ "filter": {},
+ },
+ {
+ "event": FILE_EVENT_WRITTEN,
+ "filter": {},
+ },
+ {
+ "event": FILE_EVENT_DELETED,
+ "filter": {},
+ },
+ ],
+ },
+}
+
+
+def get_preset(preset_id: str) -> WebhookPreset | None:
+ """Get a webhook preset by ID.
+
+ Args:
+ preset_id: Preset identifier (e.g., "notes_sync", "calendar_sync")
+
+ Returns:
+ Webhook preset configuration or None if not found
+ """
+ return WEBHOOK_PRESETS.get(preset_id)
+
+
+def list_presets() -> List[tuple[str, WebhookPreset]]:
+ """Get all available webhook presets.
+
+ Returns:
+ List of (preset_id, preset_config) tuples
+ """
+ return list(WEBHOOK_PRESETS.items())
+
+
+def get_preset_events(preset_id: str) -> List[str]:
+ """Get list of event class names for a preset.
+
+ Args:
+ preset_id: Preset identifier
+
+ Returns:
+ List of fully qualified event class names
+ """
+ preset = get_preset(preset_id)
+ if not preset:
+ return []
+ return [event_config["event"] for event_config in preset["events"]]
+
+
+def filter_presets_by_installed_apps(
+ installed_apps: list[str],
+) -> List[tuple[str, WebhookPreset]]:
+ """Filter webhook presets to only show those for installed apps.
+
+ Args:
+ installed_apps: List of installed app names (e.g., ["notes", "calendar", "forms"])
+
+ Returns:
+ List of (preset_id, preset_config) tuples for presets whose apps are installed
+ """
+ filtered = []
+ for preset_id, preset in WEBHOOK_PRESETS.items():
+ app_name = preset["app"]
+ # "files" is always available (core functionality)
+ if app_name == "files" or app_name in installed_apps:
+ filtered.append((preset_id, preset))
+ return filtered
diff --git a/pyproject.toml b/pyproject.toml
index c8c7885..dc449f9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -116,7 +116,7 @@ dev = [
]
[project.scripts]
-nextcloud-mcp-server = "nextcloud_mcp_server.app:run"
+nextcloud-mcp-server = "nextcloud_mcp_server.cli:run"
[[tool.uv.index]]
name = "testpypi"
diff --git a/tests/auth/test_permissions.py b/tests/auth/test_permissions.py
new file mode 100644
index 0000000..54de8db
--- /dev/null
+++ b/tests/auth/test_permissions.py
@@ -0,0 +1,112 @@
+"""Unit tests for permission checking."""
+
+import pytest
+from httpx import AsyncClient
+
+from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
+from nextcloud_mcp_server.client.users import UsersClient
+
+
+@pytest.fixture
+def mock_request(mocker):
+ """Create a mock Starlette request."""
+ request = mocker.Mock()
+ request.user = mocker.Mock()
+ request.user.display_name = "testuser"
+ return request
+
+
+@pytest.fixture
+def mock_http_client(mocker):
+ """Create a mock HTTP client."""
+ return mocker.AsyncMock(spec=AsyncClient)
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_true(mock_request, mock_http_client, mocker):
+ """Test checking if user is admin (admin group membership)."""
+ # Mock the get_user_groups method to return admin group
+ mock_get_user_groups = mocker.patch.object(
+ UsersClient, "get_user_groups", return_value=["admin", "users"]
+ )
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ assert is_admin is True
+ mock_get_user_groups.assert_called_once_with("testuser")
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_false(mock_request, mock_http_client, mocker):
+ """Test checking if user is not admin (no admin group membership)."""
+ # Mock the get_user_groups method to return no admin group
+ mock_get_user_groups = mocker.patch.object(
+ UsersClient, "get_user_groups", return_value=["users", "editors"]
+ )
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ assert is_admin is False
+ mock_get_user_groups.assert_called_once_with("testuser")
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_empty_groups(mock_request, mock_http_client, mocker):
+ """Test checking admin status when user has no groups."""
+ # Mock the get_user_groups method to return empty list
+ mock_get_user_groups = mocker.patch.object(
+ UsersClient, "get_user_groups", return_value=[]
+ )
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ assert is_admin is False
+ mock_get_user_groups.assert_called_once_with("testuser")
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_no_username(mock_request, mock_http_client, mocker):
+ """Test checking admin status when username is missing."""
+ # Set username to None
+ mock_request.user.display_name = None
+
+ mock_get_user_groups = mocker.patch.object(UsersClient, "get_user_groups")
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ assert is_admin is False
+ # Ensure get_user_groups was not called
+ mock_get_user_groups.assert_not_called()
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_api_error(mock_request, mock_http_client, mocker):
+ """Test checking admin status when API call fails."""
+ # Mock the get_user_groups method to raise an exception
+ mock_get_user_groups = mocker.patch.object(
+ UsersClient,
+ "get_user_groups",
+ side_effect=Exception("API error"),
+ )
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ assert is_admin is False
+ mock_get_user_groups.assert_called_once_with("testuser")
+
+
+@pytest.mark.unit
+async def test_is_nextcloud_admin_case_sensitive(
+ mock_request, mock_http_client, mocker
+):
+ """Test that admin group check is case-sensitive."""
+ # Mock with "Admin" (capital A) instead of "admin"
+ mock_get_user_groups = mocker.patch.object(
+ UsersClient, "get_user_groups", return_value=["Admin", "users"]
+ )
+
+ is_admin = await is_nextcloud_admin(mock_request, mock_http_client)
+
+ # Should be False because Nextcloud uses lowercase "admin"
+ assert is_admin is False
+ mock_get_user_groups.assert_called_once_with("testuser")
diff --git a/tests/client/test_webhooks_client.py b/tests/client/test_webhooks_client.py
new file mode 100644
index 0000000..6c5022f
--- /dev/null
+++ b/tests/client/test_webhooks_client.py
@@ -0,0 +1,218 @@
+"""Unit tests for WebhooksClient."""
+
+import pytest
+from httpx import AsyncClient
+
+from nextcloud_mcp_server.client.webhooks import WebhooksClient
+
+
+@pytest.fixture
+def webhooks_client(mocker):
+ """Create a WebhooksClient with mocked HTTP client."""
+ mock_http_client = mocker.AsyncMock(spec=AsyncClient)
+ return WebhooksClient(mock_http_client, "testuser")
+
+
+@pytest.mark.unit
+async def test_list_webhooks(webhooks_client, mocker):
+ """Test listing registered webhooks."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {
+ "ocs": {
+ "data": [
+ {
+ "id": 1,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ "httpMethod": "POST",
+ },
+ {
+ "id": 2,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeWrittenEvent",
+ "httpMethod": "POST",
+ },
+ ]
+ }
+ }
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ webhooks = await webhooks_client.list_webhooks()
+
+ assert len(webhooks) == 2
+ assert webhooks[0]["id"] == 1
+ assert webhooks[0]["event"] == "OCP\\Files\\Events\\Node\\NodeCreatedEvent"
+ assert webhooks[1]["id"] == 2
+
+ mock_make_request.assert_called_once_with(
+ "GET",
+ "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks",
+ headers={"OCS-APIRequest": "true", "Accept": "application/json"},
+ )
+
+
+@pytest.mark.unit
+async def test_list_webhooks_empty(webhooks_client, mocker):
+ """Test listing webhooks when none are registered."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {"ocs": {"data": []}}
+
+ mocker.patch.object(WebhooksClient, "_make_request", return_value=mock_response)
+
+ webhooks = await webhooks_client.list_webhooks()
+
+ assert webhooks == []
+
+
+@pytest.mark.unit
+async def test_create_webhook(webhooks_client, mocker):
+ """Test creating a webhook registration."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {
+ "ocs": {
+ "data": {
+ "id": 123,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ "httpMethod": "POST",
+ "authMethod": "none",
+ }
+ }
+ }
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ webhook_data = await webhooks_client.create_webhook(
+ event="OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ uri="http://example.com/webhook",
+ )
+
+ assert webhook_data["id"] == 123
+ assert webhook_data["event"] == "OCP\\Files\\Events\\Node\\NodeCreatedEvent"
+
+ mock_make_request.assert_called_once()
+ call_args = mock_make_request.call_args
+ assert call_args[0][0] == "POST"
+ assert call_args[0][1] == "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks"
+
+
+@pytest.mark.unit
+async def test_create_webhook_with_filter(webhooks_client, mocker):
+ """Test creating a webhook with event filter."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {
+ "ocs": {
+ "data": {
+ "id": 124,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ "eventFilter": {"user.uid": "bob"},
+ }
+ }
+ }
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ webhook_data = await webhooks_client.create_webhook(
+ event="OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ uri="http://example.com/webhook",
+ event_filter={"user.uid": "bob"},
+ )
+
+ assert webhook_data["id"] == 124
+ assert webhook_data["eventFilter"] == {"user.uid": "bob"}
+
+ mock_make_request.assert_called_once()
+ call_args = mock_make_request.call_args
+ assert call_args[1]["json"]["eventFilter"] == {"user.uid": "bob"}
+
+
+@pytest.mark.unit
+async def test_create_webhook_with_auth_headers(webhooks_client, mocker):
+ """Test creating a webhook with authentication headers."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {
+ "ocs": {
+ "data": {
+ "id": 125,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ "authMethod": "bearer",
+ }
+ }
+ }
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ webhook_data = await webhooks_client.create_webhook(
+ event="OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ uri="http://example.com/webhook",
+ auth_method="bearer",
+ headers={"Authorization": "Bearer secret-token"},
+ )
+
+ assert webhook_data["id"] == 125
+ assert webhook_data["authMethod"] == "bearer"
+
+ mock_make_request.assert_called_once()
+ call_args = mock_make_request.call_args
+ assert call_args[1]["json"]["authMethod"] == "bearer"
+ assert call_args[1]["json"]["headers"] == {"Authorization": "Bearer secret-token"}
+
+
+@pytest.mark.unit
+async def test_delete_webhook(webhooks_client, mocker):
+ """Test deleting a webhook registration."""
+ mock_response = mocker.Mock()
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ await webhooks_client.delete_webhook(webhook_id=123)
+
+ mock_make_request.assert_called_once_with(
+ "DELETE",
+ "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/123",
+ headers={"OCS-APIRequest": "true", "Accept": "application/json"},
+ )
+
+
+@pytest.mark.unit
+async def test_get_webhook(webhooks_client, mocker):
+ """Test getting a specific webhook by ID."""
+ mock_response = mocker.Mock()
+ mock_response.json.return_value = {
+ "ocs": {
+ "data": {
+ "id": 123,
+ "uri": "http://example.com/webhook",
+ "event": "OCP\\Files\\Events\\Node\\NodeCreatedEvent",
+ "httpMethod": "POST",
+ }
+ }
+ }
+
+ mock_make_request = mocker.patch.object(
+ WebhooksClient, "_make_request", return_value=mock_response
+ )
+
+ webhook_data = await webhooks_client.get_webhook(webhook_id=123)
+
+ assert webhook_data["id"] == 123
+ assert webhook_data["event"] == "OCP\\Files\\Events\\Node\\NodeCreatedEvent"
+
+ mock_make_request.assert_called_once_with(
+ "GET",
+ "/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/123",
+ headers={"OCS-APIRequest": "true", "Accept": "application/json"},
+ )
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 1763ba4..5c7f77f 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -5,7 +5,7 @@ import os
import pytest
from click.testing import CliRunner
-from nextcloud_mcp_server.app import run
+from nextcloud_mcp_server.cli import run
@pytest.fixture
diff --git a/tests/unit/test_webhook_presets.py b/tests/unit/test_webhook_presets.py
new file mode 100644
index 0000000..4e18b52
--- /dev/null
+++ b/tests/unit/test_webhook_presets.py
@@ -0,0 +1,112 @@
+"""Unit tests for webhook preset filtering."""
+
+import pytest
+
+from nextcloud_mcp_server.server.webhook_presets import (
+ filter_presets_by_installed_apps,
+ get_preset,
+ list_presets,
+)
+
+
+@pytest.mark.unit
+def test_list_all_presets():
+ """Test listing all presets returns 5 presets."""
+ presets = list_presets()
+ assert len(presets) == 5
+ preset_ids = [preset_id for preset_id, _ in presets]
+ assert "notes_sync" in preset_ids
+ assert "calendar_sync" in preset_ids
+ assert "tables_sync" in preset_ids
+ assert "forms_sync" in preset_ids
+ assert "files_sync" in preset_ids
+
+
+@pytest.mark.unit
+def test_get_preset_existing():
+ """Test getting an existing preset."""
+ preset = get_preset("notes_sync")
+ assert preset is not None
+ assert preset["name"] == "Notes Sync"
+ assert preset["app"] == "notes"
+ assert len(preset["events"]) == 3
+
+
+@pytest.mark.unit
+def test_get_preset_nonexistent():
+ """Test getting a nonexistent preset returns None."""
+ preset = get_preset("nonexistent_sync")
+ assert preset is None
+
+
+@pytest.mark.unit
+def test_filter_presets_all_apps_installed():
+ """Test filtering when all apps are installed."""
+ installed_apps = ["notes", "calendar", "tables", "forms"]
+ filtered = filter_presets_by_installed_apps(installed_apps)
+ assert len(filtered) == 5 # All 5 presets (files is always included)
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "notes_sync" in preset_ids
+ assert "calendar_sync" in preset_ids
+ assert "tables_sync" in preset_ids
+ assert "forms_sync" in preset_ids
+ assert "files_sync" in preset_ids
+
+
+@pytest.mark.unit
+def test_filter_presets_subset_installed():
+ """Test filtering when only some apps are installed."""
+ installed_apps = ["notes", "calendar"]
+ filtered = filter_presets_by_installed_apps(installed_apps)
+ assert len(filtered) == 3 # notes, calendar, files
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "notes_sync" in preset_ids
+ assert "calendar_sync" in preset_ids
+ assert "files_sync" in preset_ids
+ assert "tables_sync" not in preset_ids
+ assert "forms_sync" not in preset_ids
+
+
+@pytest.mark.unit
+def test_filter_presets_no_apps_installed():
+ """Test filtering when no optional apps are installed."""
+ installed_apps = []
+ filtered = filter_presets_by_installed_apps(installed_apps)
+ assert len(filtered) == 1 # Only files
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "files_sync" in preset_ids
+ assert "notes_sync" not in preset_ids
+ assert "calendar_sync" not in preset_ids
+
+
+@pytest.mark.unit
+def test_filter_presets_files_always_included():
+ """Test that files preset is always included regardless of installed apps."""
+ # Empty list
+ filtered = filter_presets_by_installed_apps([])
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "files_sync" in preset_ids
+
+ # List with other apps but not explicitly "files"
+ filtered = filter_presets_by_installed_apps(["notes", "calendar"])
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "files_sync" in preset_ids
+
+
+@pytest.mark.unit
+def test_filter_presets_forms_included_when_installed():
+ """Test that forms preset is included when Forms app is installed."""
+ installed_apps = ["forms"]
+ filtered = filter_presets_by_installed_apps(installed_apps)
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "forms_sync" in preset_ids
+ assert len(filtered) == 2 # forms + files
+
+
+@pytest.mark.unit
+def test_filter_presets_forms_excluded_when_not_installed():
+ """Test that forms preset is excluded when Forms app is not installed."""
+ installed_apps = ["notes", "calendar", "tables"]
+ filtered = filter_presets_by_installed_apps(installed_apps)
+ preset_ids = [preset_id for preset_id, _ in filtered]
+ assert "forms_sync" not in preset_ids