Compare commits

..

67 Commits

Author SHA1 Message Date
Chris Coutinho 4cf5f2a95a feat(client): Preserve fields when modifying contacts/calendar resources 2025-08-30 19:19:20 +02:00
Chris Coutinho 1cc65f0160 chore: Remove unused model 2025-08-30 18:31:45 +02:00
Chris Coutinho 9b00530e8e feat(server): Add structured output to all tool/resource output
BREAKING CHANGE
2025-08-30 18:27:32 +02:00
Chris Coutinho 938376425b chore: Update CLAUDE.md 2025-08-30 14:34:25 +02:00
Chris Coutinho 0484167a22 refactor: Use _make_request where available 2025-08-30 14:27:53 +02:00
Chris Coutinho 84ad1958af chore: Remove unnecessary logging
Migrate pre-commit tasks to local
2025-08-30 14:25:16 +02:00
Chris Coutinho fa002296ff chore(claude): Initialize CLAUDE.md 2025-08-30 13:23:34 +02:00
github-actions[bot] 464ff2c8b2 bump: version 0.7.1 → 0.7.2 2025-08-30 10:15:06 +00:00
Chris Coutinho 0804ff8d17 Merge pull request #136 from rnivet/fix/get-all-notes-paging
fix(client): Use paging to fetch all notes
2025-08-30 12:14:45 +02:00
Rémi Nivet 4f7023a16e fix(client): Use paging to fetch all notes 2025-08-29 23:46:58 +02:00
Chris Coutinho 8f6656c546 Merge pull request #134 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to 3eaddb0
2025-08-29 12:53:52 +02:00
Chris Coutinho 741c58d9a3 Merge pull request #135 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.14
2025-08-29 12:53:42 +02:00
renovate-bot-cbcoutinho[bot] e7b79d0316 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.14 2025-08-29 10:25:25 +00:00
renovate-bot-cbcoutinho[bot] 0e4cc8e56f chore(deps): update nextcloud:31.0.8 docker digest to 3eaddb0 2025-08-29 10:25:20 +00:00
Chris Coutinho 16da7a9a76 Merge pull request #133 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.13
2025-08-22 13:06:28 +02:00
renovate-bot-cbcoutinho[bot] 520e515f2b chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.13 2025-08-21 22:14:57 +00:00
Chris Coutinho fd6ce7b294 Merge pull request #132 from cbcoutinho/renovate/astral-sh-setup-uv-digest
chore(deps): update astral-sh/setup-uv digest to 4959332
2025-08-21 12:45:28 +02:00
renovate-bot-cbcoutinho[bot] 8063059f5f chore(deps): update astral-sh/setup-uv digest to 4959332 2025-08-21 10:04:51 +00:00
Chris Coutinho 20c5046b20 Merge pull request #130 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to 987c376
2025-08-19 11:50:51 +02:00
Chris Coutinho 68126640d8 Merge pull request #131 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.12
2025-08-19 11:50:10 +02:00
renovate-bot-cbcoutinho[bot] af617e3869 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.12 2025-08-19 04:04:58 +00:00
renovate-bot-cbcoutinho[bot] 04e5f7beca chore(deps): update redis:alpine docker digest to 987c376 2025-08-19 04:04:54 +00:00
Chris Coutinho 6ed1efab24 Merge pull request #129 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to 72abe18
2025-08-17 23:30:34 +02:00
renovate-bot-cbcoutinho[bot] cffa002364 chore(deps): update nextcloud:31.0.8 docker digest to 72abe18 2025-08-17 16:04:16 +00:00
Chris Coutinho 951a7095b2 Merge pull request #127 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.11
2025-08-16 20:04:50 +02:00
Chris Coutinho ee31f33038 Merge pull request #128 from cbcoutinho/renovate/nextcloud-31.x
chore(deps): update nextcloud docker tag to v31.0.8
2025-08-15 14:18:22 +02:00
renovate-bot-cbcoutinho[bot] 0fdbfae198 chore(deps): update nextcloud docker tag to v31.0.8 2025-08-15 04:08:58 +00:00
renovate-bot-cbcoutinho[bot] 315f918d88 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.11 2025-08-14 22:11:23 +00:00
Chris Coutinho 96a8491a4c Merge pull request #123 from cbcoutinho/renovate/astral-sh-setup-uv-digest
chore(deps): update astral-sh/setup-uv digest to d9e0f98
2025-08-13 10:00:32 +02:00
Chris Coutinho 0a311766f2 Merge pull request #124 from cbcoutinho/renovate/mariadb-lts
chore(deps): update mariadb:lts docker digest to 272084c
2025-08-13 09:59:56 +02:00
Chris Coutinho d28c249f8d Merge pull request #125 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to b255a97
2025-08-13 09:59:47 +02:00
renovate-bot-cbcoutinho[bot] ab6cac8799 chore(deps): update nextcloud:31.0.7 docker digest to b255a97 2025-08-13 04:05:37 +00:00
renovate-bot-cbcoutinho[bot] 7127b9953f chore(deps): update mariadb:lts docker digest to 272084c 2025-08-13 04:05:33 +00:00
renovate-bot-cbcoutinho[bot] 49c9af3c76 chore(deps): update astral-sh/setup-uv digest to d9e0f98 2025-08-12 22:08:22 +00:00
Chris Coutinho 823151f42e Merge pull request #122 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.9
2025-08-12 13:31:53 +02:00
renovate-bot-cbcoutinho[bot] 2bbd56e1cd chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.9 2025-08-12 04:05:16 +00:00
Chris Coutinho 8a36a120a7 Merge pull request #121 from cbcoutinho/renovate/actions-checkout-5.x
chore(deps): update actions/checkout action to v5
2025-08-11 22:39:16 +02:00
renovate-bot-cbcoutinho[bot] 9df8cc937d chore(deps): update actions/checkout action to v5 2025-08-11 16:07:14 +00:00
Chris Coutinho 325dcdf654 Merge pull request #118 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.8
2025-08-09 09:09:45 +02:00
renovate-bot-cbcoutinho[bot] 945eb1eb4e chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.8 2025-08-09 04:04:39 +00:00
Chris Coutinho 088343d003 Merge pull request #117 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.7
2025-08-09 01:14:56 +02:00
renovate-bot-cbcoutinho[bot] 94d553985f chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.7 2025-08-08 22:07:52 +00:00
github-actions[bot] 982dbd18ca bump: version 0.7.0 → 0.7.1 2025-08-08 19:04:17 +00:00
Chris Coutinho 054fa38e3a Merge pull request #116 from cbcoutinho/fix/csrf-cookies
Strip cookies from responses to avoid falsely raising CS…
2025-08-08 21:03:56 +02:00
Chris Coutinho 3836534205 fix(client): Strip cookies from responses to avoid falsely raising CSRF errors 2025-08-08 21:03:16 +02:00
Chris Coutinho f852a18b12 Merge pull request #114 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.6
2025-08-08 13:11:56 +02:00
renovate-bot-cbcoutinho[bot] 0450c5cc52 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.6 2025-08-07 16:06:38 +00:00
Chris Coutinho f48fd0be60 Merge pull request #113 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to a834f43
2025-08-07 09:11:06 +02:00
renovate-bot-cbcoutinho[bot] ee29194bc9 chore(deps): update nextcloud:31.0.7 docker digest to a834f43 2025-08-07 04:06:07 +00:00
Chris Coutinho fc32fa2852 Merge pull request #112 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to 7521abd
2025-08-06 20:53:55 +02:00
renovate-bot-cbcoutinho[bot] b7d6548741 chore(deps): update redis:alpine docker digest to 7521abd 2025-08-06 10:05:20 +00:00
Chris Coutinho a9ffd49815 Merge pull request #111 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.5
2025-08-06 02:52:55 +02:00
renovate-bot-cbcoutinho[bot] 538f861414 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.5 2025-08-05 22:09:00 +00:00
Chris Coutinho b784651f7f Merge pull request #110 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to 33c21e8
2025-08-05 18:27:41 +02:00
renovate-bot-cbcoutinho[bot] 6f0baf5fca chore(deps): update nextcloud:31.0.7 docker digest to 33c21e8 2025-08-05 16:04:55 +00:00
Chris Coutinho 664254ed95 Merge pull request #108 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to e716e2f
2025-08-05 14:55:04 +02:00
Chris Coutinho b976494ca2 Merge pull request #109 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to a0fc425
2025-08-05 14:54:55 +02:00
renovate-bot-cbcoutinho[bot] 061f667e00 chore(deps): update redis:alpine docker digest to a0fc425 2025-08-05 10:05:41 +00:00
renovate-bot-cbcoutinho[bot] 3319c35798 chore(deps): update nextcloud:31.0.7 docker digest to e716e2f 2025-08-05 10:05:35 +00:00
Chris Coutinho 52c9293c37 Merge pull request #106 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to fb96127
2025-08-05 08:54:31 +02:00
Chris Coutinho af6863a764 Merge pull request #107 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to 6b268fb
2025-08-05 08:53:01 +02:00
renovate-bot-cbcoutinho[bot] 77181f7c6f chore(deps): update nextcloud:31.0.7 docker digest to 6b268fb 2025-08-05 04:05:19 +00:00
renovate-bot-cbcoutinho[bot] 61f3beac01 chore(deps): update redis:alpine docker digest to fb96127 2025-08-04 22:07:46 +00:00
Chris Coutinho 49aaf24363 Merge pull request #105 from cbcoutinho/renovate/docker-login-action-digest
chore(deps): update docker/login-action digest to 184bdaa
2025-08-04 19:22:12 +02:00
renovate-bot-cbcoutinho[bot] 4edd31ee28 chore(deps): update docker/login-action digest to 184bdaa 2025-08-04 16:05:38 +00:00
github-actions[bot] 9ae2a0fc6f bump: version 0.6.1 → 0.7.0 2025-08-03 12:47:13 +00:00
Chris Coutinho 8386644dfd Merge pull request #104 from cbcoutinho/feature/vcard
Initialize Contacts App
2025-08-03 14:46:48 +02:00
31 changed files with 2299 additions and 105 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ jobs:
packages: write
steps:
- name: Check out
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
with:
fetch-depth: 0
token: "${{ secrets.PERSONAL_ACCESS_TOKEN }}"
+2 -2
View File
@@ -12,7 +12,7 @@ jobs:
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Docker meta
id: meta
@@ -37,7 +37,7 @@ jobs:
- name: Log in to GitHub Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3
uses: docker/login-action@184bdaa0721073962dff0199f1fb9940f07167d1 # v3
with:
registry: ghcr.io
username: ${{ github.actor }}
+4 -4
View File
@@ -9,9 +9,9 @@ jobs:
linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Install the latest version of uv
uses: astral-sh/setup-uv@e92bafb6253dcd438e0484186d7669ea7a8ca1cc # v6
uses: astral-sh/setup-uv@4959332f0f014c5280e7eac8b70c90cb574c9f9b # v6
- name: Check format
run: |
uv run --frozen ruff format --diff
@@ -24,14 +24,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Run docker compose
uses: hoverkraft-tech/compose-action@40041ff1b97dbf152cd2361138c2b03fa29139df # v2.3.0
with:
compose-file: "./docker-compose.yml"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@e92bafb6253dcd438e0484186d7669ea7a8ca1cc # v6
uses: astral-sh/setup-uv@4959332f0f014c5280e7eac8b70c90cb574c9f9b # v6
- name: Wait for service to be ready
run: |
+9 -2
View File
@@ -6,8 +6,15 @@ repos:
- id: commitizen-branch
stages:
- pre-push
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.5
- repo: local
hooks:
- id: ruff-check
name: ruff-check
entry: uv run ruff check
language: system
types: [python]
- id: ruff-format
name: ruff-format
entry: uv run ruff format
language: system
types: [python]
+18
View File
@@ -1,3 +1,21 @@
## v0.7.2 (2025-08-30)
### Fix
- **client**: Use paging to fetch all notes
## v0.7.1 (2025-08-08)
### Fix
- **client**: Strip cookies from responses to avoid falsely raising CSRF errors
## v0.7.0 (2025-08-03)
### Feat
- **contacts**: Initialize Contacts App
## v0.6.1 (2025-08-01)
### Fix
+118
View File
@@ -0,0 +1,118 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Development Commands
### Testing
```bash
# Run all tests
uv run pytest
# Run integration tests only
uv run pytest -m integration
# Run tests with coverage
uv run pytest --cov
# Skip integration tests
uv run pytest -m "not integration"
```
### Code Quality
```bash
# Format and lint code
uv run ruff check
uv run ruff format
# Type checking
# No explicit type checker configured - this is a Python project using ruff for linting
```
### Running the Server
```bash
# Local development - load environment variables and run
export $(grep -v '^#' .env | xargs)
mcp run --transport sse nextcloud_mcp_server.app:mcp
# Docker development environment with Nextcloud instance
docker-compose up
# After code changes, rebuild and restart only the MCP server container
docker-compose up --build -d mcp
# Build Docker image
docker build -t nextcloud-mcp-server .
```
### Environment Setup
```bash
# Install dependencies
uv sync
# Install development dependencies
uv sync --group dev
```
## Architecture Overview
This is a Python MCP (Model Context Protocol) server that provides LLM integration with Nextcloud. The architecture follows a layered pattern:
### Core Components
- **`nextcloud_mcp_server/app.py`** - Main MCP server entry point using FastMCP framework
- **`nextcloud_mcp_server/client/`** - HTTP client implementations for different Nextcloud APIs
- **`nextcloud_mcp_server/server/`** - MCP tool/resource definitions that expose client functionality
- **`nextcloud_mcp_server/controllers/`** - Business logic controllers (e.g., notes search)
### Client Architecture
- **`NextcloudClient`** - Main orchestrating client that manages all app-specific clients
- **`BaseNextcloudClient`** - Abstract base class providing common HTTP functionality and retry logic
- **App-specific clients**: `NotesClient`, `CalendarClient`, `ContactsClient`, `TablesClient`, `WebDAVClient`
### Server Integration
Each Nextcloud app has a corresponding server module that:
1. Defines MCP tools using `@mcp.tool()` decorators
2. Defines MCP resources using `@mcp.resource()` decorators
3. Uses the context pattern to access the `NextcloudClient` instance
### Supported Nextcloud Apps
- **Notes** - Full CRUD operations and search
- **Calendar** - CalDAV integration with events, recurring events, attendees
- **Contacts** - CardDAV integration with address book operations
- **Tables** - Row-level operations on Nextcloud Tables
- **WebDAV** - Complete file system access
### Key Patterns
1. **Environment-based configuration** - Uses `NextcloudClient.from_env()` to load credentials from environment variables
2. **Async/await throughout** - All operations are async using httpx
3. **Retry logic** - `@retry_on_429` decorator handles rate limiting
4. **Context injection** - MCP context provides access to the authenticated client instance
5. **Modular design** - Each Nextcloud app is isolated in its own client/server pair
### Testing Structure
- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions
- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities
- Tests are marked with `@pytest.mark.integration` for selective running
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
#### Testing Best Practices
- **Always restart MCP server** after code changes with `docker-compose up --build -d mcp`
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
- `nc_mcp_client` - MCP client session for tool/resource testing
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
- `temporary_note` - Creates and cleans up test notes automatically
- `temporary_addressbook` - Creates and cleans up test address books
- `temporary_contact` - Creates and cleans up test contacts
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
### Configuration Files
- **`pyproject.toml`** - Python project configuration using uv for dependency management
- **`.env`** (from `env.sample`) - Environment variables for Nextcloud connection
- **`docker-compose.yml`** - Complete development environment with Nextcloud + database
+1 -1
View File
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:0.8.4-python3.11-alpine@sha256:f2c5b953b713f455bcac4429303bb21d7d2547d56a64e1a7b2517cc9f0563f0f
FROM ghcr.io/astral-sh/uv:0.8.14-python3.11-alpine@sha256:7b1463148981d57ed2d9c2950f570fe5fdd88570970f9f56f6e0e5a8829eca95
WORKDIR /app
@@ -11,6 +11,10 @@ php /var/www/html/occ app:enable calendar
echo "Waiting for calendar app to initialize..."
sleep 5
# Increase limits on calendar creation for integration tests (100 in 60s)
php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=100
php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=60
# Ensure maintenance mode is off before calendar operations
php /var/www/html/occ maintenance:mode --off
+3 -3
View File
@@ -3,7 +3,7 @@ services:
# https://hub.docker.com/_/mariadb
db:
# Note: Check the recommend version here: https://docs.nextcloud.com/server/latest/admin_manual/installation/system_requirements.html#server
image: mariadb:lts@sha256:2bcbaec92bd9d4f6591bc8103d3a8e6d0512ee2235506e47a2e129d190444405
image: mariadb:lts@sha256:272084c2dec70619714df329c4ffcb336e3f8c723072c3f56f2e4015997bbf2c
restart: always
command: --transaction-isolation=READ-COMMITTED
volumes:
@@ -17,11 +17,11 @@ services:
# Note: Redis is an external service. You can find more information about the configuration here:
# https://hub.docker.com/_/redis
redis:
image: redis:alpine@sha256:25c0ae32c6c2301798579f5944af53729766a18eff5660bbef196fc2e6214a9c
image: redis:alpine@sha256:987c376c727652f99625c7d205a1cba3cb2c53b92b0b62aade2bd48ee1593232
restart: always
app:
image: nextcloud:31.0.7@sha256:81dc361f8f216d8acff20bd3dea2226fb6cea883c277505cbb2ddd6327c867fa
image: nextcloud:31.0.8@sha256:3eaddb0a9c56e6cf81ad258a5d05b78f747f6434b974f9a44e3f0dd91311b6ef
#user: www-data:www-data
restart: always
#post_start:
+33 -9
View File
@@ -1,7 +1,15 @@
import logging
import os
from httpx import AsyncClient, Auth, BasicAuth, Request, Response
from httpx import (
AsyncClient,
Auth,
BasicAuth,
Request,
Response,
AsyncBaseTransport,
AsyncHTTPTransport,
)
from ..controllers.notes_search import NotesSearchController
from .calendar import CalendarClient
@@ -13,19 +21,34 @@ from .webdav import WebDAVClient
logger = logging.getLogger(__name__)
def log_request(request: Request):
logger.info(
async def log_request(request: Request):
logger.debug(
"Request event hook: %s %s - Waiting for content",
request.method,
request.url,
)
logger.info("Request body: %s", request.content)
logger.info("Headers: %s", request.headers)
logger.debug("Request body: %s", request.content)
logger.debug("Headers: %s", request.headers)
def log_response(response: Response):
response.read() # Explicitly read the stream before accessing .text
logger.info("Response [%s] %s", response.status_code, response.text)
async def log_response(response: Response):
await response.aread()
logger.debug("Response [%s] %s", response.status_code, response.text)
class AsyncDisableCookieTransport(AsyncBaseTransport):
"""This Transport disable cookies from accumulating in the httpx AsyncClient
Thanks to: https://github.com/encode/httpx/issues/2992#issuecomment-2133258994
"""
def __init__(self, transport: AsyncBaseTransport):
self.transport = transport
async def handle_async_request(self, request: Request) -> Response:
response = await self.transport.handle_async_request(request)
response.headers.pop("set-cookie", None)
return response
class NextcloudClient:
@@ -36,7 +59,8 @@ class NextcloudClient:
self._client = AsyncClient(
base_url=base_url,
auth=auth,
# event_hooks={"request": [log_request], "response": [log_response]},
transport=AsyncDisableCookieTransport(AsyncHTTPTransport()),
event_hooks={"request": [log_request], "response": [log_response]},
)
# Initialize app clients
+63 -1
View File
@@ -3,11 +3,72 @@
import logging
from abc import ABC
from httpx import AsyncClient
from functools import wraps
import time
from httpx import HTTPStatusError, codes, RequestError, AsyncClient
logger = logging.getLogger(__name__)
def retry_on_429(func):
"""This decorator handles the 429 response from REST APIs
The `func` is assumed to be a method that is similar to `httpx.Client.get`,
and returns an `httpx.Response` object. In the case of `Too Many Requests` HTTP
response, the function will wait for a couple of seconds and retry the request.
"""
MAX_RETRIES = 5
@wraps(func)
async def wrapper(*args, **kwargs):
retries = 0
while retries < MAX_RETRIES:
try:
# Make GET API call
retries += 1
response = await func(*args, **kwargs)
break
except HTTPStatusError as e:
# If we get a '429 Client Error: Too Many Requests'
# error we wait a couple of seconds and do a retry
if e.response.status_code == codes.TOO_MANY_REQUESTS:
logger.warning(
f"429 Client Error: Too Many Requests, Number of attempts: {retries}"
)
time.sleep(5)
elif e.response.status_code == 404:
# 404 errors are often expected (e.g., checking if attachments exist)
# Log as debug instead of warning
logger.debug(
f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}"
)
raise
else:
logger.warning(
f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}"
)
raise
except RequestError as e:
logger.warning(
f"RequestError {e.request.url}: {e}, Number of attempts: {retries}"
)
raise
# If for loop ends without break statement
else:
logger.warning("All API call retries failed")
raise RuntimeError(
f"Maximum number of retries ({MAX_RETRIES}) exceeded without success"
)
return response
return wrapper
class BaseNextcloudClient(ABC):
"""Base class for all Nextcloud app clients."""
@@ -25,6 +86,7 @@ class BaseNextcloudClient(ABC):
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
@retry_on_429
async def _make_request(self, method: str, url: str, **kwargs):
"""Common request wrapper with logging and error handling.
+136 -11
View File
@@ -238,27 +238,33 @@ class CalendarClient(BaseNextcloudClient):
event_data: Dict[str, Any],
etag: str = "",
) -> Dict[str, Any]:
"""Update an existing calendar event."""
"""Update an existing calendar event while preserving all existing properties."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
# Get existing event data to merge with updates
existing_event_data = {}
# Get raw iCal content to preserve all properties including extended ones
raw_ical_content = ""
if not etag:
try:
existing_event_data, current_etag = await self.get_event(
raw_ical_content, current_etag = await self._get_raw_ical(
calendar_name, event_uid
)
etag = current_etag
except Exception:
# Continue without etag if we can't get it
pass
# Fall back to creating new iCal if we can't get existing
logger.warning(
f"Could not fetch existing iCal for {event_uid}, creating new"
)
raw_ical_content = ""
# Merge existing data with new data (new data takes precedence)
merged_data = {**existing_event_data, **event_data}
# Create updated iCalendar event
ical_content = self._create_ical_event(merged_data, event_uid)
# Create updated iCalendar event preserving existing properties
if raw_ical_content:
ical_content = self._merge_ical_properties(
raw_ical_content, event_data, event_uid
)
else:
# Fallback to creating new iCal if we couldn't get existing
ical_content = self._create_ical_event(event_data, event_uid)
headers = {
"Content-Type": "text/calendar; charset=utf-8",
@@ -949,3 +955,122 @@ class CalendarClient(BaseNextcloudClient):
except Exception as e:
logger.error(f"Error deleting calendar {calendar_name}: {e}")
raise
async def _get_raw_ical(
self, calendar_name: str, event_uid: str
) -> Tuple[str, str]:
"""Get raw iCal content for an event without parsing."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
headers = {"Accept": "text/calendar"}
try:
response = await self._make_request("GET", event_path, headers=headers)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw iCal for {event_uid}: {e}")
raise
def _merge_ical_properties(
self, raw_ical: str, event_data: Dict[str, Any], event_uid: str
) -> str:
"""Merge new event data into existing raw iCal while preserving all properties."""
try:
# Parse existing iCal
cal = Calendar.from_ical(raw_ical)
# Find the VEVENT component
for component in cal.walk():
if component.name == "VEVENT":
# Update only the properties that were provided in event_data
if "title" in event_data:
component["SUMMARY"] = event_data["title"]
if "description" in event_data:
component["DESCRIPTION"] = event_data["description"]
if "location" in event_data:
component["LOCATION"] = event_data["location"]
if "status" in event_data:
component["STATUS"] = event_data["status"].upper()
if "priority" in event_data:
component["PRIORITY"] = event_data["priority"]
if "privacy" in event_data:
component["CLASS"] = event_data["privacy"].upper()
if "url" in event_data:
component["URL"] = event_data["url"]
# Handle dates
if "start_datetime" in event_data:
start_str = event_data["start_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
start_date = dt.datetime.fromisoformat(
start_str.split("T")[0]
).date()
component["DTSTART"] = start_date
else:
start_dt = dt.datetime.fromisoformat(
start_str.replace("Z", "+00:00")
)
component["DTSTART"] = start_dt
if "end_datetime" in event_data:
end_str = event_data["end_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
end_date = dt.datetime.fromisoformat(
end_str.split("T")[0]
).date()
component["DTEND"] = end_date
else:
end_dt = dt.datetime.fromisoformat(
end_str.replace("Z", "+00:00")
)
component["DTEND"] = end_dt
# Handle categories
if "categories" in event_data:
categories = event_data["categories"]
if categories:
component["CATEGORIES"] = categories.split(",")
# Handle recurrence
if "recurring" in event_data:
if event_data["recurring"] and "recurrence_rule" in event_data:
recurrence_rule = event_data["recurrence_rule"]
if recurrence_rule:
component["RRULE"] = vRecur.from_ical(recurrence_rule)
elif not event_data["recurring"]:
# Remove recurrence if set to False
if "RRULE" in component:
del component["RRULE"]
# Handle attendees
if "attendees" in event_data:
attendees = event_data["attendees"]
# Remove existing attendees
component.pop("ATTENDEE", None)
if attendees:
for email in attendees.split(","):
if email.strip():
component.add("ATTENDEE", f"mailto:{email.strip()}")
# Update timestamps in proper iCal format
from icalendar import vDDDTypes
now = dt.datetime.now(dt.UTC)
component["LAST-MODIFIED"] = vDDDTypes(now)
component["DTSTAMP"] = vDDDTypes(now)
# Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.)
# by not touching them - they remain in the component
break
return cal.to_ical().decode("utf-8")
except Exception as e:
logger.error(f"Error merging iCal properties: {e}")
# Fallback to creating new iCal
return self._create_ical_event(event_data, event_uid)
+201
View File
@@ -143,6 +143,50 @@ class ContactsClient(BaseNextcloudClient):
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
await self._make_request("DELETE", url)
async def update_contact(
self, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
# Get raw vCard content to preserve all properties including extended ones
raw_vcard_content = ""
if not etag:
try:
raw_vcard_content, current_etag = await self._get_raw_vcard(
addressbook, uid
)
etag = current_etag
except Exception:
# Fall back to creating new vCard if we can't get existing
logger.warning(
f"Could not fetch existing vCard for {uid}, creating new"
)
raw_vcard_content = ""
# Create updated vCard preserving existing properties
if raw_vcard_content:
vcard_content = self._merge_vcard_properties(
raw_vcard_content, contact_data, uid
)
else:
# Fallback to creating new vCard if we couldn't get existing
contact = Contact(fn=contact_data.get("fn"), uid=uid)
if "email" in contact_data:
contact.email = [{"value": contact_data["email"], "type": ["HOME"]}]
if "tel" in contact_data:
contact.tel = [{"value": contact_data["tel"], "type": ["HOME"]}]
vcard_content = contact.to_vcard()
headers = {
"Content-Type": "text/vcard; charset=utf-8",
}
if etag:
headers["If-Match"] = etag
await self._make_request("PUT", url, content=vcard_content, headers=headers)
async def list_contacts(self, *, addressbook: str):
"""List all available contacts for addressbook."""
@@ -233,3 +277,160 @@ class ContactsClient(BaseNextcloudClient):
logger.debug(f"Found {len(contacts)} contacts")
return contacts
async def _get_raw_vcard(self, addressbook: str, uid: str) -> tuple[str, str]:
"""Get raw vCard content for a contact without parsing."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
try:
response = await self._make_request("GET", url)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw vCard for {uid}: {e}")
raise
def _merge_vcard_properties(
self, raw_vcard: str, contact_data: dict, uid: str
) -> str:
"""Merge new contact data into existing raw vCard while preserving all properties."""
try:
# Instead of using pythonvCard4 which has formatting issues,
# let's do a simple text-based merge to preserve exact formatting
# Start with the original vCard
lines = raw_vcard.strip().split("\n")
updated_lines = []
# Track what we've updated to avoid duplicates
updated_properties = set()
for line in lines:
line = line.strip()
if not line:
continue
# Skip the END:VCARD line for now
if line == "END:VCARD":
continue
property_name = line.split(":")[0].split(";")[0]
# Handle updates for specific properties
if property_name == "FN" and "fn" in contact_data:
updated_lines.append(f"FN:{contact_data['fn']}")
updated_properties.add("fn")
elif property_name == "EMAIL" and "email" in contact_data:
# Replace first email with new one, preserve others
if "email" not in updated_properties:
if isinstance(contact_data["email"], str):
# Try to preserve the original format as much as possible
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"EMAIL;TYPE={type_part}:{contact_data['email']}"
)
else:
updated_lines.append(f"EMAIL:{contact_data['email']}")
updated_properties.add("email")
else:
# Keep additional emails unchanged
updated_lines.append(line)
elif property_name == "TEL" and "tel" in contact_data:
# Similar handling for phone numbers
if "tel" not in updated_properties:
if isinstance(contact_data["tel"], str):
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"TEL;TYPE={type_part}:{contact_data['tel']}"
)
else:
updated_lines.append(f"TEL:{contact_data['tel']}")
updated_properties.add("tel")
else:
# Keep additional phone numbers unchanged
updated_lines.append(line)
elif property_name == "NOTE" and "note" in contact_data:
updated_lines.append(f"NOTE:{contact_data['note']}")
updated_properties.add("note")
elif property_name == "NICKNAME" and "nickname" in contact_data:
nickname_value = contact_data["nickname"]
if isinstance(nickname_value, list):
nickname_value = ",".join(nickname_value)
updated_lines.append(f"NICKNAME:{nickname_value}")
updated_properties.add("nickname")
elif property_name == "BDAY" and "bday" in contact_data:
updated_lines.append(f"BDAY:{contact_data['bday']}")
updated_properties.add("bday")
elif property_name == "CATEGORIES" and "categories" in contact_data:
categories_value = contact_data["categories"]
if isinstance(categories_value, list):
categories_value = ",".join(categories_value)
updated_lines.append(f"CATEGORIES:{categories_value}")
updated_properties.add("categories")
elif property_name == "ORG" and (
"org" in contact_data or "organization" in contact_data
):
org_value = contact_data.get("org") or contact_data.get(
"organization"
)
updated_lines.append(f"ORG:{org_value}")
updated_properties.add("org")
elif property_name == "TITLE" and "title" in contact_data:
updated_lines.append(f"TITLE:{contact_data['title']}")
updated_properties.add("title")
else:
# Keep all other properties unchanged (preserves all extended/custom fields)
updated_lines.append(line)
# Add any new properties that weren't in the original vCard
for key, value in contact_data.items():
if key not in updated_properties:
if key == "fn":
updated_lines.append(f"FN:{value}")
elif key == "email" and isinstance(value, str):
updated_lines.append(f"EMAIL:{value}")
elif key == "tel" and isinstance(value, str):
updated_lines.append(f"TEL:{value}")
elif key == "note":
updated_lines.append(f"NOTE:{value}")
elif key == "nickname":
nickname_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"NICKNAME:{nickname_value}")
elif key == "bday":
updated_lines.append(f"BDAY:{value}")
elif key == "categories":
categories_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"CATEGORIES:{categories_value}")
elif key in ["org", "organization"]:
updated_lines.append(f"ORG:{value}")
elif key == "title":
updated_lines.append(f"TITLE:{value}")
# Add the END:VCARD line
updated_lines.append("END:VCARD")
# Join all lines
return "\n".join(updated_lines)
except Exception as e:
logger.error(f"Error merging vCard properties: {e}")
# Fallback to creating basic vCard matching Nextcloud format
basic_vcard = f"""BEGIN:VCARD
VERSION:3.0
UID:{uid}
FN:{contact_data.get("fn", "Unknown")}"""
if "email" in contact_data:
basic_vcard += f"\nEMAIL:{contact_data['email']}"
if "tel" in contact_data:
basic_vcard += f"\nTEL:{contact_data['tel']}"
basic_vcard += "\nEND:VCARD"
return basic_vcard
+15 -2
View File
@@ -18,8 +18,21 @@ class NotesClient(BaseNextcloudClient):
async def get_all_notes(self) -> List[Dict[str, Any]]:
"""Get all notes."""
response = await self._make_request("GET", "/apps/notes/api/v1/notes")
return response.json()
notes = []
cursor = ""
while True:
response = await self._make_request(
"GET",
"/apps/notes/api/v1/notes",
params={"chunkSize": 50, "chunkCursor": cursor},
)
notes.extend(response.json())
if "X-Notes-Chunk-Cursor" not in response.headers:
break
cursor = response.headers["X-Notes-Chunk-Cursor"]
return notes
async def get_note(self, note_id: int) -> Dict[str, Any]:
"""Get a specific note by ID."""
+12 -13
View File
@@ -31,7 +31,7 @@ class WebDAVClient(BaseNextcloudClient):
# First try a PROPFIND to verify resource exists
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
try:
propfind_resp = await self._client.request(
propfind_resp = await self._make_request(
"PROPFIND", webdav_path, headers=propfind_headers
)
logger.debug(
@@ -44,8 +44,7 @@ class WebDAVClient(BaseNextcloudClient):
# For other errors, continue with deletion attempt
# Proceed with deletion
response = await self._client.delete(webdav_path, headers=headers)
response.raise_for_status()
response = await self._make_request("DELETE", webdav_path, headers=headers)
logger.debug(f"Successfully deleted WebDAV resource '{path}'")
return {"status_code": response.status_code}
@@ -127,7 +126,7 @@ class WebDAVClient(BaseNextcloudClient):
# First check if we can access WebDAV at all
notes_dir_path = f"{webdav_base}/Notes"
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
notes_dir_response = await self._client.request(
notes_dir_response = await self._make_request(
"PROPFIND", notes_dir_path, headers=propfind_headers
)
@@ -146,7 +145,7 @@ class WebDAVClient(BaseNextcloudClient):
# Ensure the parent directory exists using MKCOL
mkcol_headers = {"OCS-APIRequest": "true"}
mkcol_response = await self._client.request(
mkcol_response = await self._make_request(
"MKCOL", parent_dir_path, headers=mkcol_headers
)
@@ -158,8 +157,8 @@ class WebDAVClient(BaseNextcloudClient):
mkcol_response.raise_for_status()
# Proceed with the PUT request
response = await self._client.put(
attachment_path, content=content, headers=headers
response = await self._make_request(
"PUT", attachment_path, content=content, headers=headers
)
response.raise_for_status()
logger.debug(
@@ -190,7 +189,7 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Fetching attachment '{filename}' for note {note_id}")
try:
response = await self._client.get(attachment_path)
response = await self._make_request("GET", attachment_path)
response.raise_for_status()
content = response.content
@@ -237,7 +236,7 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
try:
response = await self._client.request(
response = await self._make_request(
"PROPFIND", webdav_path, content=propfind_body, headers=headers
)
response.raise_for_status()
@@ -320,7 +319,7 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Reading file: {path}")
try:
response = await self._client.get(webdav_path)
response = await self._make_request("GET", webdav_path)
response.raise_for_status()
content = response.content
@@ -354,8 +353,8 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"Content-Type": content_type, "OCS-APIRequest": "true"}
try:
response = await self._client.put(
webdav_path, content=content, headers=headers
response = await self._make_request(
"PUT", webdav_path, content=content, headers=headers
)
response.raise_for_status()
@@ -382,7 +381,7 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"OCS-APIRequest": "true"}
try:
response = await self._client.request("MKCOL", webdav_path, headers=headers)
response = await self._make_request("MKCOL", webdav_path, headers=headers)
response.raise_for_status()
logger.debug(f"Successfully created directory '{path}'")
+144
View File
@@ -0,0 +1,144 @@
"""Pydantic models for structured MCP server responses."""
# Base models
from .base import (
BaseResponse,
ErrorResponse,
SuccessResponse,
IdResponse,
StatusResponse,
)
# Notes models
from .notes import (
Note,
NoteSearchResult,
NotesSettings,
CreateNoteResponse,
UpdateNoteResponse,
DeleteNoteResponse,
AppendContentResponse,
SearchNotesResponse,
)
# Calendar models
from .calendar import (
Calendar,
CalendarEvent,
CalendarEventSummary,
CreateEventResponse,
UpdateEventResponse,
DeleteEventResponse,
ListEventsResponse,
ListCalendarsResponse,
AvailabilitySlot,
FindAvailabilityResponse,
BulkOperationResult,
BulkOperationResponse,
CreateMeetingResponse,
UpcomingEventsResponse,
ManageCalendarResponse,
)
# Contacts models
from .contacts import (
AddressBook,
Contact,
ContactField,
ListAddressBooksResponse,
ListContactsResponse,
CreateContactResponse,
UpdateContactResponse,
DeleteContactResponse,
CreateAddressBookResponse,
DeleteAddressBookResponse,
)
# Tables models
from .tables import (
Table,
TableColumn,
TableRow,
TableView,
TableSchema,
ListTablesResponse,
GetSchemaResponse,
ReadTableResponse,
CreateRowResponse,
UpdateRowResponse,
DeleteRowResponse,
)
# WebDAV models
from .webdav import (
FileInfo,
DirectoryListing,
ReadFileResponse,
WriteFileResponse,
CreateDirectoryResponse,
DeleteResourceResponse,
)
__all__ = [
# Base models
"BaseResponse",
"ErrorResponse",
"SuccessResponse",
"IdResponse",
"StatusResponse",
# Notes models
"Note",
"NoteSearchResult",
"NotesSettings",
"CreateNoteResponse",
"UpdateNoteResponse",
"DeleteNoteResponse",
"AppendContentResponse",
"SearchNotesResponse",
# Calendar models
"Calendar",
"CalendarEvent",
"CalendarEventSummary",
"CreateEventResponse",
"UpdateEventResponse",
"DeleteEventResponse",
"ListEventsResponse",
"ListCalendarsResponse",
"AvailabilitySlot",
"FindAvailabilityResponse",
"BulkOperationResult",
"BulkOperationResponse",
"CreateMeetingResponse",
"UpcomingEventsResponse",
"ManageCalendarResponse",
# Contacts models
"AddressBook",
"Contact",
"ContactField",
"ListAddressBooksResponse",
"ListContactsResponse",
"CreateContactResponse",
"UpdateContactResponse",
"DeleteContactResponse",
"CreateAddressBookResponse",
"DeleteAddressBookResponse",
# Tables models
"Table",
"TableColumn",
"TableRow",
"TableView",
"TableSchema",
"ListTablesResponse",
"GetSchemaResponse",
"ReadTableResponse",
"CreateRowResponse",
"UpdateRowResponse",
"DeleteRowResponse",
# WebDAV models
"FileInfo",
"DirectoryListing",
"ReadFileResponse",
"WriteFileResponse",
"CreateDirectoryResponse",
"DeleteResourceResponse",
]
+50
View File
@@ -0,0 +1,50 @@
"""Base Pydantic models for common response patterns."""
from datetime import datetime
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, Field
class BaseResponse(BaseModel):
"""Base response model for all MCP tool responses."""
model_config = {"json_encoders": {datetime: lambda v: v.isoformat()}}
success: bool = Field(
default=True, description="Whether the operation was successful"
)
timestamp: datetime = Field(
default_factory=datetime.now, description="Response timestamp"
)
class ErrorResponse(BaseResponse):
"""Response model for error cases."""
success: bool = Field(default=False, description="Always False for error responses")
error: str = Field(description="Error message")
error_code: Optional[str] = Field(None, description="Optional error code")
details: Optional[Dict[str, Any]] = Field(
None, description="Additional error details"
)
class SuccessResponse(BaseResponse):
"""Generic success response."""
message: Optional[str] = Field(None, description="Optional success message")
data: Optional[Dict[str, Any]] = Field(None, description="Optional response data")
class IdResponse(BaseResponse):
"""Response model for operations that return a new ID."""
id: Union[int, str] = Field(description="ID of the created or affected resource")
class StatusResponse(BaseResponse):
"""Response model for operations that return just a status."""
status_code: Optional[int] = Field(None, description="HTTP status code")
message: Optional[str] = Field(None, description="Status message")
+182
View File
@@ -0,0 +1,182 @@
"""Pydantic models for Calendar app responses."""
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class Calendar(BaseModel):
"""Model for a Nextcloud calendar."""
name: str = Field(description="Calendar name/ID")
display_name: str = Field(description="Calendar display name")
description: Optional[str] = Field(None, description="Calendar description")
color: Optional[str] = Field(None, description="Calendar color")
href: Optional[str] = Field(None, description="Calendar DAV href")
timezone: Optional[str] = Field(None, description="Calendar timezone")
enabled: bool = Field(default=True, description="Whether calendar is enabled")
ctag: Optional[str] = Field(None, description="Calendar tag for synchronization")
class CalendarEventSummary(BaseModel):
"""Model for calendar event summary (for lists)."""
uid: str = Field(description="Event UID")
summary: str = Field(description="Event summary/title")
start: str = Field(description="Event start datetime (ISO format)")
end: Optional[str] = Field(None, description="Event end datetime (ISO format)")
all_day: bool = Field(default=False, description="Whether event is all-day")
location: Optional[str] = Field(None, description="Event location")
description: Optional[str] = Field(None, description="Event description")
categories: List[str] = Field(default_factory=list, description="Event categories")
status: Optional[str] = Field(
None, description="Event status (CONFIRMED, TENTATIVE, CANCELLED)"
)
class CalendarEvent(CalendarEventSummary):
"""Model for a complete calendar event."""
created: Optional[str] = Field(None, description="Event creation datetime")
last_modified: Optional[str] = Field(None, description="Last modification datetime")
recurring: bool = Field(default=False, description="Whether event is recurring")
recurrence_rule: Optional[str] = Field(None, description="RFC5545 recurrence rule")
recurrence_end: Optional[str] = Field(None, description="Recurrence end date")
attendees: List[str] = Field(
default_factory=list, description="List of attendee email addresses"
)
organizer: Optional[str] = Field(None, description="Event organizer")
priority: Optional[int] = Field(None, description="Event priority (1-9)")
privacy: Optional[str] = Field(None, description="Event privacy level")
url: Optional[str] = Field(None, description="Event URL")
duration_minutes: Optional[int] = Field(
None, description="Event duration in minutes"
)
reminder_minutes: Optional[int] = Field(
None, description="Reminder time in minutes before event"
)
reminder_email: bool = Field(
default=False, description="Whether to send email reminder"
)
color: Optional[str] = Field(None, description="Event color")
etag: Optional[str] = Field(None, description="ETag for versioning")
class CreateEventResponse(BaseResponse):
"""Response model for event creation."""
event: CalendarEvent = Field(description="The created event")
calendar_name: str = Field(
description="Name of the calendar the event was created in"
)
class UpdateEventResponse(BaseResponse):
"""Response model for event updates."""
event: CalendarEvent = Field(description="The updated event")
calendar_name: str = Field(description="Name of the calendar the event belongs to")
class DeleteEventResponse(StatusResponse):
"""Response model for event deletion."""
deleted_uid: str = Field(description="UID of the deleted event")
calendar_name: str = Field(
description="Name of the calendar the event was deleted from"
)
class ListEventsResponse(BaseResponse):
"""Response model for listing events."""
events: List[CalendarEventSummary] = Field(description="List of events")
calendar_name: Optional[str] = Field(
None, description="Calendar name (if filtered to one calendar)"
)
start_date: Optional[str] = Field(None, description="Start date filter applied")
end_date: Optional[str] = Field(None, description="End date filter applied")
total_found: int = Field(description="Total number of events found")
class ListCalendarsResponse(BaseResponse):
"""Response model for listing calendars."""
calendars: List[Calendar] = Field(description="List of available calendars")
total_count: int = Field(description="Total number of calendars")
class AvailabilitySlot(BaseModel):
"""Model for an available time slot."""
start: str = Field(description="Slot start datetime (ISO format)")
end: str = Field(description="Slot end datetime (ISO format)")
duration_minutes: int = Field(description="Slot duration in minutes")
date: str = Field(description="Date of the slot (YYYY-MM-DD)")
class FindAvailabilityResponse(BaseResponse):
"""Response model for finding availability."""
available_slots: List[AvailabilitySlot] = Field(
description="List of available time slots"
)
duration_requested: int = Field(description="Requested duration in minutes")
date_range_start: str = Field(description="Start date of search range")
date_range_end: str = Field(description="End date of search range")
attendees_checked: List[str] = Field(
default_factory=list, description="Attendees checked for availability"
)
business_hours_only: bool = Field(
description="Whether search was limited to business hours"
)
class BulkOperationResult(BaseModel):
"""Model for bulk operation results."""
operation: str = Field(description="Operation performed (update, delete, move)")
events_processed: int = Field(description="Number of events processed")
events_successful: int = Field(
description="Number of events successfully processed"
)
events_failed: int = Field(description="Number of events that failed processing")
failed_events: List[str] = Field(
default_factory=list, description="UIDs of events that failed"
)
errors: List[str] = Field(default_factory=list, description="Error messages")
class BulkOperationResponse(BaseResponse):
"""Response model for bulk operations."""
result: BulkOperationResult = Field(description="Bulk operation result")
class CreateMeetingResponse(CreateEventResponse):
"""Response model for meeting creation (same as event creation)."""
pass
class UpcomingEventsResponse(BaseResponse):
"""Response model for upcoming events."""
events: List[CalendarEventSummary] = Field(description="List of upcoming events")
days_ahead: int = Field(description="Number of days ahead searched")
calendar_name: Optional[str] = Field(
None, description="Calendar name (if filtered to one calendar)"
)
class ManageCalendarResponse(BaseResponse):
"""Response model for calendar management operations."""
action: str = Field(description="Action performed (create, delete, update, list)")
calendar: Optional[Calendar] = Field(None, description="Calendar that was affected")
calendars: Optional[List[Calendar]] = Field(
None, description="List of calendars (for list action)"
)
message: str = Field(description="Success message")
+130
View File
@@ -0,0 +1,130 @@
"""Pydantic models for Contacts app responses."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class AddressBook(BaseModel):
"""Model for a Nextcloud address book."""
uri: str = Field(description="Address book URI")
displayname: str = Field(description="Address book display name")
description: Optional[str] = Field(None, description="Address book description")
ctag: Optional[str] = Field(
None, description="Address book tag for synchronization"
)
class ContactField(BaseModel):
"""Model for a contact field (email, phone, etc.)."""
type: str = Field(description="Field type (e.g., 'email', 'phone', 'address')")
value: str = Field(description="Field value")
label: Optional[str] = Field(None, description="Field label (e.g., 'work', 'home')")
preferred: bool = Field(
default=False, description="Whether this is the preferred field of this type"
)
class Contact(BaseModel):
"""Model for a Nextcloud contact."""
uid: str = Field(description="Contact UID")
fn: str = Field(description="Full name (formatted name)")
given_name: Optional[str] = Field(None, description="Given name")
family_name: Optional[str] = Field(None, description="Family name")
organization: Optional[str] = Field(None, description="Organization")
title: Optional[str] = Field(None, description="Job title")
emails: List[ContactField] = Field(
default_factory=list, description="Email addresses"
)
phones: List[ContactField] = Field(
default_factory=list, description="Phone numbers"
)
addresses: List[ContactField] = Field(default_factory=list, description="Addresses")
urls: List[ContactField] = Field(default_factory=list, description="URLs")
note: Optional[str] = Field(None, description="Notes")
photo: Optional[str] = Field(None, description="Photo URL or base64 data")
birthday: Optional[str] = Field(None, description="Birthday (ISO date format)")
categories: List[str] = Field(
default_factory=list, description="Contact categories"
)
custom_fields: Dict[str, Any] = Field(
default_factory=dict, description="Custom fields"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
@property
def primary_email(self) -> Optional[str]:
"""Get the primary email address."""
if not self.emails:
return None
# Return preferred email if available, otherwise first email
preferred = next(
(email.value for email in self.emails if email.preferred), None
)
return preferred or self.emails[0].value
@property
def primary_phone(self) -> Optional[str]:
"""Get the primary phone number."""
if not self.phones:
return None
# Return preferred phone if available, otherwise first phone
preferred = next(
(phone.value for phone in self.phones if phone.preferred), None
)
return preferred or self.phones[0].value
class ListAddressBooksResponse(BaseResponse):
"""Response model for listing address books."""
addressbooks: List[AddressBook] = Field(
description="List of available address books"
)
total_count: int = Field(description="Total number of address books")
class ListContactsResponse(BaseResponse):
"""Response model for listing contacts."""
contacts: List[Contact] = Field(description="List of contacts")
addressbook: str = Field(description="Address book name")
total_count: int = Field(description="Total number of contacts")
class CreateContactResponse(BaseResponse):
"""Response model for contact creation."""
contact: Contact = Field(description="The created contact")
addressbook: str = Field(description="Address book the contact was created in")
class UpdateContactResponse(BaseResponse):
"""Response model for contact updates."""
contact: Contact = Field(description="The updated contact")
addressbook: str = Field(description="Address book the contact belongs to")
class DeleteContactResponse(StatusResponse):
"""Response model for contact deletion."""
deleted_uid: str = Field(description="UID of the deleted contact")
addressbook: str = Field(description="Address book the contact was deleted from")
class CreateAddressBookResponse(BaseResponse):
"""Response model for address book creation."""
addressbook: AddressBook = Field(description="The created address book")
class DeleteAddressBookResponse(StatusResponse):
"""Response model for address book deletion."""
deleted_name: str = Field(description="Name of the deleted address book")
+77
View File
@@ -0,0 +1,77 @@
"""Pydantic models for Notes app responses."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, IdResponse, StatusResponse
class Note(BaseModel):
"""Model for a Nextcloud note."""
id: int = Field(description="Note ID")
title: str = Field(description="Note title")
content: str = Field(description="Note content in markdown")
category: str = Field(default="", description="Note category")
modified: int = Field(description="Unix timestamp of last modification")
favorite: bool = Field(
default=False, description="Whether note is marked as favorite"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
readonly: bool = Field(default=False, description="Whether note is read-only")
@property
def modified_datetime(self) -> datetime:
"""Convert Unix timestamp to datetime."""
return datetime.fromtimestamp(self.modified)
class NoteSearchResult(BaseModel):
"""Model for note search results (limited fields)."""
id: int = Field(description="Note ID")
title: str = Field(description="Note title")
category: str = Field(default="", description="Note category")
score: Optional[float] = Field(None, description="Search relevance score")
class NotesSettings(BaseModel):
"""Model for Notes app settings."""
notesPath: str = Field(description="Path to notes directory")
fileSuffix: str = Field(description="File suffix for notes")
noteMode: str = Field(description="Note mode setting")
class CreateNoteResponse(IdResponse):
"""Response model for note creation."""
note: Note = Field(description="The created note")
class UpdateNoteResponse(BaseResponse):
"""Response model for note updates."""
note: Note = Field(description="The updated note")
class DeleteNoteResponse(StatusResponse):
"""Response model for note deletion."""
deleted_id: int = Field(description="ID of the deleted note")
class AppendContentResponse(BaseResponse):
"""Response model for appending content to a note."""
note: Note = Field(description="The updated note after appending content")
class SearchNotesResponse(BaseResponse):
"""Response model for note search."""
results: List[NoteSearchResult] = Field(description="Search results")
query: str = Field(description="The search query used")
total_found: int = Field(description="Total number of notes found")
+142
View File
@@ -0,0 +1,142 @@
"""Pydantic models for Tables app responses."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, IdResponse, StatusResponse
class TableColumn(BaseModel):
"""Model for a table column definition."""
id: int = Field(description="Column ID")
title: str = Field(description="Column title")
type: str = Field(description="Column type (text, number, datetime, etc.)")
subtype: Optional[str] = Field(None, description="Column subtype")
mandatory: bool = Field(default=False, description="Whether column is mandatory")
description: Optional[str] = Field(None, description="Column description")
text_default: Optional[str] = Field(None, description="Default text value")
text_allowed_pattern: Optional[str] = Field(
None, description="Allowed text pattern"
)
text_max_length: Optional[int] = Field(None, description="Maximum text length")
number_default: Optional[float] = Field(None, description="Default number value")
number_min: Optional[float] = Field(None, description="Minimum number value")
number_max: Optional[float] = Field(None, description="Maximum number value")
number_decimals: Optional[int] = Field(None, description="Number of decimal places")
datetime_default: Optional[str] = Field(None, description="Default datetime value")
selection_options: List[str] = Field(
default_factory=list, description="Selection options"
)
selection_default: Optional[str] = Field(
None, description="Default selection value"
)
class TableRow(BaseModel):
"""Model for a table row."""
id: int = Field(description="Row ID")
created_by: Optional[str] = Field(None, description="User who created the row")
created_at: Optional[str] = Field(None, description="Row creation timestamp")
last_edit_by: Optional[str] = Field(
None, description="User who last edited the row"
)
last_edit_at: Optional[str] = Field(None, description="Last edit timestamp")
data: Dict[int, Any] = Field(description="Row data keyed by column ID")
class TableView(BaseModel):
"""Model for a table view."""
id: int = Field(description="View ID")
title: str = Field(description="View title")
emoji: Optional[str] = Field(None, description="View emoji")
description: Optional[str] = Field(None, description="View description")
columns: List[int] = Field(
default_factory=list, description="List of column IDs in this view"
)
sort: List[Dict[str, Any]] = Field(
default_factory=list, description="Sort configuration"
)
filter: List[Dict[str, Any]] = Field(
default_factory=list, description="Filter configuration"
)
class Table(BaseModel):
"""Model for a Nextcloud table."""
id: int = Field(description="Table ID")
title: str = Field(description="Table title")
emoji: Optional[str] = Field(None, description="Table emoji")
ownership: str = Field(description="Table ownership")
owner_display_name: str = Field(description="Display name of table owner")
created_by: Optional[str] = Field(None, description="User who created the table")
created_at: Optional[str] = Field(None, description="Table creation timestamp")
last_edit_by: Optional[str] = Field(
None, description="User who last edited the table"
)
last_edit_at: Optional[str] = Field(None, description="Last edit timestamp")
row_count: int = Field(default=0, description="Number of rows in the table")
has_shares: bool = Field(default=False, description="Whether table is shared")
archived: bool = Field(default=False, description="Whether table is archived")
is_shared: bool = Field(
default=False, description="Whether table is shared with current user"
)
on_share_permissions: Optional[Dict[str, Any]] = Field(
None, description="Share permissions"
)
class TableSchema(BaseModel):
"""Model for complete table schema including columns and views."""
table: Table = Field(description="Table information")
columns: List[TableColumn] = Field(description="Table columns")
views: List[TableView] = Field(description="Table views")
class ListTablesResponse(BaseResponse):
"""Response model for listing tables."""
tables: List[Table] = Field(description="List of available tables")
total_count: int = Field(description="Total number of tables")
class GetSchemaResponse(BaseResponse):
"""Response model for getting table schema."""
table_schema: TableSchema = Field(description="Table schema information")
class ReadTableResponse(BaseResponse):
"""Response model for reading table rows."""
rows: List[TableRow] = Field(description="Table rows")
table_id: int = Field(description="Table ID")
total_count: Optional[int] = Field(
None, description="Total number of rows (if known)"
)
offset: Optional[int] = Field(None, description="Offset used for pagination")
limit: Optional[int] = Field(None, description="Limit used for pagination")
class CreateRowResponse(IdResponse):
"""Response model for row creation."""
row: TableRow = Field(description="The created row")
table_id: int = Field(description="Table ID the row was created in")
class UpdateRowResponse(BaseResponse):
"""Response model for row updates."""
row: TableRow = Field(description="The updated row")
class DeleteRowResponse(StatusResponse):
"""Response model for row deletion."""
deleted_id: int = Field(description="ID of the deleted row")
+88
View File
@@ -0,0 +1,88 @@
"""Pydantic models for WebDAV responses."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class FileInfo(BaseModel):
"""Model for file/directory information."""
name: str = Field(description="File/directory name")
path: str = Field(description="Full path")
is_directory: bool = Field(description="Whether this is a directory")
size: Optional[int] = Field(
None, description="File size in bytes (None for directories)"
)
content_type: Optional[str] = Field(None, description="MIME content type")
last_modified: Optional[str] = Field(
None, description="Last modification time (ISO format)"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
@property
def last_modified_datetime(self) -> Optional[datetime]:
"""Convert last modified string to datetime."""
if not self.last_modified:
return None
try:
return datetime.fromisoformat(self.last_modified.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
class DirectoryListing(BaseResponse):
"""Response model for directory listings."""
path: str = Field(description="Directory path")
items: List[FileInfo] = Field(description="Files and directories in the path")
total_count: int = Field(description="Total number of items")
directories_count: int = Field(description="Number of directories")
files_count: int = Field(description="Number of files")
total_size: int = Field(default=0, description="Total size of all files in bytes")
class ReadFileResponse(BaseResponse):
"""Response model for reading file contents."""
path: str = Field(description="File path")
content: str = Field(description="File content (text or base64 for binary)")
content_type: str = Field(description="MIME content type")
size: int = Field(description="File size in bytes")
encoding: Optional[str] = Field(
None, description="Encoding used (e.g., 'base64' for binary files)"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
last_modified: Optional[str] = Field(None, description="Last modification time")
class WriteFileResponse(StatusResponse):
"""Response model for writing files."""
path: str = Field(description="File path that was written")
size: Optional[int] = Field(None, description="Size of the written file")
created: bool = Field(description="Whether a new file was created (vs overwritten)")
class CreateDirectoryResponse(StatusResponse):
"""Response model for directory creation."""
path: str = Field(description="Directory path that was created")
created: bool = Field(
description="Whether directory was created or already existed"
)
class DeleteResourceResponse(StatusResponse):
"""Response model for resource deletion."""
path: str = Field(description="Path that was deleted")
was_directory: bool = Field(
description="Whether the deleted resource was a directory"
)
items_deleted: Optional[int] = Field(
None, description="Number of items deleted (for directories)"
)
+9 -2
View File
@@ -5,6 +5,10 @@ from typing import Optional
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.models.calendar import (
Calendar,
ListCalendarsResponse,
)
logger = logging.getLogger(__name__)
@@ -12,10 +16,13 @@ logger = logging.getLogger(__name__)
def configure_calendar_tools(mcp: FastMCP):
# Calendar tools
@mcp.tool()
async def nc_calendar_list_calendars(ctx: Context):
async def nc_calendar_list_calendars(ctx: Context) -> ListCalendarsResponse:
"""List all available calendars for the user"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.calendar.list_calendars()
calendars_data = await client.calendar.list_calendars()
calendars = [Calendar(**cal_data) for cal_data in calendars_data]
return ListCalendarsResponse(calendars=calendars, total_count=len(calendars))
@mcp.tool()
async def nc_calendar_create_event(
+18 -1
View File
@@ -17,7 +17,7 @@ def configure_contacts_tools(mcp: FastMCP):
@mcp.tool()
async def nc_contacts_list_contacts(ctx: Context, *, addressbook: str):
"""List all addressbooks for the user."""
"""List all contacts in the specified addressbook."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.list_contacts(addressbook=addressbook)
@@ -63,3 +63,20 @@ def configure_contacts_tools(mcp: FastMCP):
"""Delete a contact."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.delete_contact(addressbook=addressbook, uid=uid)
@mcp.tool()
async def nc_contacts_update_contact(
ctx: Context, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties.
Args:
addressbook: The name of the addressbook containing the contact.
uid: The unique ID of the contact to update.
contact_data: A dictionary with the contact's updated details, e.g. {"fn": "Jane Doe", "email": "jane.doe@example.com"}.
etag: Optional ETag for optimistic concurrency control.
"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.update_contact(
addressbook=addressbook, uid=uid, contact_data=contact_data, etag=etag
)
+167 -24
View File
@@ -1,8 +1,20 @@
import logging
from httpx import HTTPStatusError
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.models.base import ErrorResponse
from nextcloud_mcp_server.models.notes import (
Note,
NotesSettings,
CreateNoteResponse,
UpdateNoteResponse,
DeleteNoteResponse,
AppendContentResponse,
SearchNotesResponse,
NoteSearchResult,
)
logger = logging.getLogger(__name__)
@@ -15,7 +27,8 @@ def configure_notes_tools(mcp: FastMCP):
mcp.get_context()
) # https://github.com/modelcontextprotocol/python-sdk/issues/244
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.get_settings()
settings_data = await client.notes.get_settings()
return NotesSettings(**settings_data)
@mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}")
async def nc_notes_get_attachment(note_id: int, attachment_filename: str):
@@ -38,23 +51,61 @@ def configure_notes_tools(mcp: FastMCP):
]
}
@mcp.tool()
async def nc_get_note(note_id: int, ctx: Context):
@mcp.resource("nc://Notes/{note_id}")
async def nc_get_note(note_id: int):
"""Get user note using note id"""
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
ctx: Context = mcp.get_context()
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.get_note(note_id)
try:
note_data = await client.notes.get_note(note_id)
return Note(**note_data)
except HTTPStatusError as e:
if e.response.status_code == 404:
raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
elif e.response.status_code == 403:
raise McpError(
ErrorData(code=-1, message=f"Access denied to note {note_id}")
)
else:
raise McpError(
ErrorData(
code=-1,
message=f"Failed to retrieve note {note_id}: {e.response.reason_phrase}",
)
)
@mcp.tool()
async def nc_notes_create_note(
title: str, content: str, category: str, ctx: Context
):
) -> CreateNoteResponse | ErrorResponse:
"""Create a new note"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.create_note(
title=title,
content=content,
category=category,
)
try:
note_data = await client.notes.create_note(
title=title,
content=content,
category=category,
)
note = Note(**note_data)
return CreateNoteResponse(id=note.id, note=note)
except HTTPStatusError as e:
if e.response.status_code == 403:
return ErrorResponse(
error="Access denied: insufficient permissions to create notes"
)
elif e.response.status_code == 413:
return ErrorResponse(error="Note content too large")
elif e.response.status_code == 409:
return ErrorResponse(
error=f"A note with title '{title}' already exists in this category"
)
else:
return ErrorResponse(
error=f"Failed to create note: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_update_note(
@@ -64,32 +115,124 @@ def configure_notes_tools(mcp: FastMCP):
content: str | None,
category: str | None,
ctx: Context,
):
) -> UpdateNoteResponse | ErrorResponse:
"""Update an existing note's title, content, or category"""
logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.update(
note_id=note_id,
etag=etag,
title=title,
content=content,
category=category,
)
try:
note_data = await client.notes.update(
note_id=note_id,
etag=etag,
title=title,
content=content,
category=category,
)
note = Note(**note_data)
return UpdateNoteResponse(note=note)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 412:
return ErrorResponse(
error=f"Note {note_id} has been modified by someone else. Please refresh and try again."
)
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to update note {note_id}"
)
elif e.response.status_code == 413:
return ErrorResponse(error="Updated note content is too large")
else:
return ErrorResponse(
error=f"Failed to update note {note_id}: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_append_content(note_id: int, content: str, ctx: Context):
async def nc_notes_append_content(
note_id: int, content: str, ctx: Context
) -> AppendContentResponse | ErrorResponse:
"""Append content to an existing note with a clear separator"""
logger.info("Appending content to note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.append_content(note_id=note_id, content=content)
try:
note_data = await client.notes.append_content(
note_id=note_id, content=content
)
note = Note(**note_data)
return AppendContentResponse(note=note)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to modify note {note_id}"
)
elif e.response.status_code == 413:
return ErrorResponse(
error="Content to append would make the note too large"
)
else:
return ErrorResponse(
error=f"Failed to append content to note {note_id}: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_search_notes(query: str, ctx: Context):
async def nc_notes_search_notes(
query: str, ctx: Context
) -> SearchNotesResponse | ErrorResponse:
"""Search notes by title or content, returning only id, title, and category."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_search_notes(query=query)
try:
search_results_raw = await client.notes_search_notes(query=query)
# Convert to NoteSearchResult models, including the _score field
results = [
NoteSearchResult(
id=result["id"],
title=result["title"],
category=result["category"],
score=result.get("_score"), # Include search score if available
)
for result in search_results_raw
]
return SearchNotesResponse(
results=results, query=query, total_found=len(results)
)
except HTTPStatusError as e:
if e.response.status_code == 403:
return ErrorResponse(
error="Access denied: insufficient permissions to search notes"
)
elif e.response.status_code == 400:
return ErrorResponse(error="Invalid search query format")
else:
return ErrorResponse(
error=f"Search failed: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_delete_note(note_id: int, ctx: Context):
async def nc_notes_delete_note(
note_id: int, ctx: Context
) -> DeleteNoteResponse | ErrorResponse:
"""Delete a note permanently"""
logger.info("Deleting note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.delete_note(note_id)
try:
await client.notes.delete_note(note_id)
return DeleteNoteResponse(
status_code=200,
message=f"Note {note_id} deleted successfully",
deleted_id=note_id,
)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to delete note {note_id}"
)
else:
return ErrorResponse(
error=f"Failed to delete note {note_id}: server error ({e.response.status_code})"
)
+2 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.6.1"
version = "0.7.2"
description = ""
authors = [
{name = "Chris Coutinho",email = "chris@coutinho.io"}
@@ -13,6 +13,7 @@ dependencies = [
"pillow (>=11.2.1,<12.0.0)",
"icalendar (>=6.0.0,<7.0.0)",
"pythonvcard4>=0.2.0",
"pydantic>=2.11.4",
]
[tool.pytest.ini_options]
+1 -1
View File
@@ -13,7 +13,7 @@ from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
async def nc_client() -> AsyncGenerator[NextcloudClient, Any]:
"""
Fixture to create a NextcloudClient instance for integration tests.
+186
View File
@@ -0,0 +1,186 @@
"""Test error propagation in the MCP server for various error scenarios."""
import logging
from mcp import ClientSession
from mcp.shared.exceptions import McpError
import pytest
logger = logging.getLogger(__name__)
@pytest.mark.integration
async def test_missing_note_resource_error(nc_mcp_client: ClientSession):
"""Test that accessing a non-existent note resource returns proper error."""
# Try to get a non-existent note via resource - should raise McpError with improved message
with pytest.raises(McpError, match=r"Note 999999 not found"):
await nc_mcp_client.read_resource("nc://Notes/999999")
@pytest.mark.integration
async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession):
"""Test that deleting a non-existent note returns proper error."""
# Try to delete a non-existent note
response = await nc_mcp_client.call_tool(
"nc_notes_delete_note", {"note_id": 999999}
)
logger.info(f"Delete missing note response: {response}")
# Should return structured error response with improved message
assert response is not None
assert (
response.isError is False
) # Tools now return structured responses, not MCP errors
# Check structured content for error
assert "success" in response.structuredContent["result"]
assert response.structuredContent["result"]["success"] is False
assert "Note 999999 not found" in response.structuredContent["result"]["error"]
@pytest.mark.integration
async def test_search_with_empty_query(nc_mcp_client: ClientSession):
"""Test search behavior with empty query."""
# Search with empty query
response = await nc_mcp_client.call_tool("nc_notes_search_notes", {"query": ""})
logger.info(f"Empty search query response: {response}")
# Should return successful response with empty or valid results
assert response is not None
assert response.isError is False
@pytest.mark.integration
async def test_tool_missing_required_parameters(nc_mcp_client: ClientSession):
"""Test calling a tool with missing required parameters."""
# Try to create note with missing parameters
response = await nc_mcp_client.call_tool(
"nc_notes_create_note",
{"title": "Test"}, # Missing content and category
)
logger.info(f"Missing params response: {response}")
# Should return error response for missing required parameters
assert response is not None
assert response.isError is True
assert (
"required" in response.content[0].text.lower()
or "missing" in response.content[0].text.lower()
)
@pytest.mark.integration
async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_client):
"""Test updating a note with invalid ETag."""
# First create a note
note_data = await nc_client.notes.create_note(
title="Test Note for ETag", content="Test content", category=""
)
note_id = note_data["id"]
try:
# Try to update with invalid ETag
response = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": "invalid-etag",
"title": "Updated Title",
"content": None,
"category": None,
},
)
logger.info(f"Invalid ETag response: {response}")
# Should return structured error response with improved message
assert response is not None
assert response.isError is False # Tools now return structured responses
assert "success" in response.structuredContent["result"]
assert response.structuredContent["result"]["success"] is False
assert (
"modified by someone else" in response.structuredContent["result"]["error"]
)
finally:
# Clean up
await nc_client.notes.delete_note(note_id)
@pytest.mark.integration
async def test_calendar_missing_calendar_error(nc_mcp_client: ClientSession):
"""Test calendar operations with non-existent calendar."""
# Try to create event in non-existent calendar
response = await nc_mcp_client.call_tool(
"nc_calendar_create_event",
{
"calendar_name": "non-existent-calendar",
"title": "Test Event",
"start_datetime": "2025-01-15T14:00:00",
},
)
logger.info(f"Non-existent calendar response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@pytest.mark.integration
async def test_webdav_read_missing_file_error(nc_mcp_client: ClientSession):
"""Test WebDAV operations with non-existent file."""
# Try to read a non-existent file
response = await nc_mcp_client.call_tool(
"nc_webdav_read_file", {"path": "non-existent-file.txt"}
)
logger.info(f"Missing file response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@pytest.mark.integration
async def test_tables_missing_table_error(nc_mcp_client: ClientSession):
"""Test Tables operations with non-existent table."""
# Try to get schema of non-existent table
response = await nc_mcp_client.call_tool(
"nc_tables_get_schema", {"table_id": 999999}
)
logger.info(f"Missing table response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@@ -0,0 +1,436 @@
"""Integration tests for CalDAV and CardDAV field preservation.
This test module demonstrates data loss issues when non-supported fields
are present in calendar events and contacts during round-trip operations.
"""
import logging
import pytest
import uuid
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
@pytest.mark.integration
async def test_calendar_event_custom_fields_preservation(nc_client):
"""Test that demonstrates loss of non-supported iCal fields during round-trip operations."""
calendar_name = "personal"
# Create an event with standard fields
event_data = {
"title": "Test Event with Custom Fields",
"description": "Event to test custom field preservation",
"start_datetime": (datetime.now() + timedelta(days=1)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=1, hours=1)).isoformat(),
"location": "Test Location",
}
# Create the event
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Now manually inject a custom iCal property by creating a new version with raw iCal
# This simulates what would happen if the event was created by another CalDAV client
# with extended properties
custom_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Test Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=1)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=1, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Test Event with Custom Fields
DESCRIPTION:Event to test custom field preservation
LOCATION:Test Location
X-CUSTOM-FIELD:This is a custom field that should be preserved
X-VENDOR-SPECIFIC:Vendor specific data
CATEGORIES:work,testing
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Direct CalDAV PUT to inject the custom iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=custom_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
logger.info(f"Injected custom iCal properties into event {event_uid}")
# Retrieve the event to confirm custom fields are present in raw iCal
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_before = response.text
logger.info("Raw iCal before update:")
logger.info(raw_ical_before)
# Verify custom fields exist in raw iCal
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_before
)
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_before
# Now update the event through the MCP client (simulating normal usage)
update_data = {
"title": "Updated Test Event with Custom Fields",
"description": "Updated description - custom fields should be preserved",
}
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
logger.info(f"Updated event {event_uid} through MCP client")
# Retrieve the event again to see if custom fields survived
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_after = response_after.text
logger.info("Raw iCal after update:")
logger.info(raw_ical_after)
# THIS IS THE TEST THAT SHOULD FAIL - custom fields should be preserved but won't be
try:
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_after
), "Custom field X-CUSTOM-FIELD was lost during round-trip update"
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_after, (
"Custom field X-VENDOR-SPECIFIC was lost during round-trip update"
)
logger.info(
"✓ Custom fields were preserved (unexpected - this should fail with current implementation)"
)
except AssertionError as e:
logger.error(f"✗ Custom fields were lost during round-trip update: {e}")
# Re-raise to show the test failure
raise
finally:
# Cleanup
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")
@pytest.mark.integration
async def test_contact_extended_fields_preservation(nc_client):
"""Test that demonstrates loss of extended vCard fields during round-trip operations."""
addressbook_name = f"test_preservation_{uuid.uuid4().hex[:8]}"
# Create a temporary addressbook
await nc_client.contacts.create_addressbook(
name=addressbook_name, display_name="Test Preservation Addressbook"
)
try:
contact_uid = str(uuid.uuid4())
# Create a contact with minimal data first
basic_contact_data = {
"fn": "John Extended Doe",
"email": "john.extended@example.com",
}
await nc_client.contacts.create_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data=basic_contact_data,
)
logger.info(f"Created basic contact {contact_uid}")
# Now inject a rich vCard with extended fields directly via CardDAV
extended_vcard = f"""BEGIN:VCARD
VERSION:4.0
UID:{contact_uid}
FN:John Extended Doe
N:Doe;John;Extended;;
NICKNAME:Johnny,JD
EMAIL;TYPE=work:john.work@company.com
EMAIL;TYPE=home:john.extended@example.com
TEL;TYPE=cell:+1-555-123-4567
TEL;TYPE=work:+1-555-987-6543
ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA
ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA
ORG:Example Corporation
TITLE:Senior Developer
URL;TYPE=work:https://company.com/john
URL;TYPE=personal:https://johndoe.dev
BDAY:1985-06-15
NOTE:This is a note with important information that should be preserved.
CATEGORIES:colleagues,developers,friends
X-CUSTOM-FIELD:This should be preserved
X-SKYPE:john.doe.skype
X-LINKEDIN:https://linkedin.com/in/johndoe
REV:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VCARD"""
# Direct CardDAV PUT to inject the extended vCard
contact_path = f"/remote.php/dav/addressbooks/users/{nc_client.contacts.username}/{addressbook_name}/{contact_uid}.vcf"
await nc_client.contacts._make_request(
"PUT",
contact_path,
content=extended_vcard,
headers={"Content-Type": "text/vcard; charset=utf-8"},
)
logger.info(f"Injected extended vCard for contact {contact_uid}")
# Retrieve the contact to confirm extended fields are present in raw vCard
response = await nc_client.contacts._make_request("GET", contact_path)
raw_vcard_before = response.text
logger.info("Raw vCard before any operations:")
logger.info(raw_vcard_before)
# Verify extended fields exist in raw vCard
assert "TEL;TYPE=cell:+1-555-123-4567" in raw_vcard_before
assert "ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA" in raw_vcard_before
assert "ORG:Example Corporation" in raw_vcard_before
assert "TITLE:Senior Developer" in raw_vcard_before
assert "X-CUSTOM-FIELD:This should be preserved" in raw_vcard_before
assert "X-LINKEDIN:https://linkedin.com/in/johndoe" in raw_vcard_before
assert "NOTE:This is a note with important information" in raw_vcard_before
# List contacts through the MCP client (this will parse and return limited fields)
contacts = await nc_client.contacts.list_contacts(addressbook=addressbook_name)
our_contact = next((c for c in contacts if c["vcard_id"] == contact_uid), None)
assert our_contact is not None
logger.info("Contact as parsed by MCP client:")
logger.info(our_contact)
# Check what fields are accessible through the parsed contact
parsed_contact = our_contact["contact"]
# These should be available (basic fields that are parsed)
assert parsed_contact["fullname"] == "John Extended Doe"
assert parsed_contact["email"] is not None # Some email should be present
# The raw vCard should still be available in addressdata
raw_addressdata = our_contact["addressdata"]
assert "X-CUSTOM-FIELD:This should be preserved" in raw_addressdata
assert "ORG:Example Corporation" in raw_addressdata
# The key test: Can we update this contact without losing extended field data?
logger.info("Testing contact update preservation...")
# Update the contact through the MCP client with a simple change
try:
await nc_client.contacts.update_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data={"email": "john.updated@example.com"},
)
logger.info("✓ Contact updated successfully")
except Exception as e:
logger.error(f"✗ Failed to update contact: {e}")
raise
# Retrieve the contact again to see if extended fields survived
contacts_after = await nc_client.contacts.list_contacts(
addressbook=addressbook_name
)
updated_contact = next(
(c for c in contacts_after if c["vcard_id"] == contact_uid), None
)
assert updated_contact is not None, "Contact not found after update"
updated_addressdata = updated_contact["addressdata"]
logger.info("Raw vCard after contact update:")
logger.info(updated_addressdata)
# THIS IS THE CRITICAL TEST - extended fields should be preserved during updates
extended_field_checks = [
("ORG:Example Corporation", "organization field"),
("TITLE:Senior Developer", "title field"),
("TEL;TYPE=cell:+1-555-123-4567", "cell phone"),
("TEL;TYPE=work:+1-555-987-6543", "work phone"),
("ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA", "home address"),
("ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA", "work address"),
("URL;TYPE=work;VALUE=URI:https://company.com/john", "work URL"),
("NOTE:This is a note with important information", "note field"),
("CATEGORIES:colleagues,developers,friends", "categories"),
("X-CUSTOM-FIELD:This should be preserved", "custom field"),
("X-LINKEDIN:https://linkedin.com/in/johndoe", "LinkedIn custom field"),
("john.updated@example.com", "updated email"),
]
all_preserved = True
for field_pattern, field_name in extended_field_checks:
if field_pattern in updated_addressdata:
logger.info(f"{field_name} preserved")
else:
logger.error(f"{field_name} was lost during update")
all_preserved = False
# The test should PASS - field preservation should work
assert all_preserved, (
"Contact update lost extended field data - this indicates the preservation mechanism failed"
)
logger.info("🎉 SUCCESS: All extended fields preserved during contact update!")
finally:
# Cleanup
try:
await nc_client.contacts.delete_addressbook(name=addressbook_name)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup addressbook {addressbook_name}: {cleanup_error}"
)
@pytest.mark.integration
async def test_calendar_event_roundtrip_data_loss_demonstration(nc_client):
"""Demonstrates specific data loss scenarios in calendar events."""
calendar_name = "personal"
event_data = {
"title": "Roundtrip Test Event",
"description": "Testing data preservation",
"start_datetime": (datetime.now() + timedelta(days=2)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=2, hours=1)).isoformat(),
}
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Inject additional iCal properties that are valid but not supported by our parser
extended_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Extended Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=2)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=2, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Roundtrip Test Event
DESCRIPTION:Testing data preservation
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
SEQUENCE:1
X-MICROSOFT-CDO-ALLDAYEVENT:FALSE
X-MICROSOFT-CDO-IMPORTANCE:1
X-CUSTOM-MEETING-ID:12345-67890
X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890
ORGANIZER;CN=Test Organizer:mailto:organizer@example.com
COMMENT:This is a comment that should be preserved
LOCATION:Conference Room A
GEO:40.7128;-74.0060
TRANSP:OPAQUE
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Inject the extended iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=extended_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
# Verify extended properties are present
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
original_ical = response.text
# Confirm extended properties exist
extended_properties = [
"SEQUENCE:1",
"X-MICROSOFT-CDO-ALLDAYEVENT:FALSE",
"X-CUSTOM-MEETING-ID:12345-67890",
"X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890",
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
"COMMENT:This is a comment that should be preserved",
"GEO:40.7128;-74.0060",
"TRANSP:OPAQUE",
]
# More flexible patterns for properties that might be reformatted
flexible_patterns = {
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com": [
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
'ORGANIZER;CN="Test Organizer":mailto:organizer@example.com',
],
"GEO:40.7128;-74.0060": [
"GEO:40.7128;-74.0060",
"GEO:40.7128;-74.006", # May lose trailing zero
],
}
for prop in extended_properties:
assert prop in original_ical, (
f"Extended property {prop} not found in original iCal"
)
logger.info("✓ All extended properties confirmed in original iCal")
# Now perform a simple update through MCP
update_data = {"location": "Conference Room B"} # Simple location change
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
# Check what survived the round-trip
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
updated_ical = response_after.text
logger.info("Checking which properties survived the update...")
# Check which extended properties survived
survived = []
lost = []
for prop in extended_properties:
# Check if this property has flexible patterns
if prop in flexible_patterns:
# Check if any of the flexible patterns match
found = any(
pattern in updated_ical for pattern in flexible_patterns[prop]
)
if found:
survived.append(prop)
else:
lost.append(prop)
else:
# Standard exact match
if prop in updated_ical:
survived.append(prop)
else:
lost.append(prop)
logger.info(f"Properties that SURVIVED: {survived}")
logger.error(f"Properties that were LOST: {lost}")
# This test should fail - we expect data loss
assert len(lost) == 0, (
f"Round-trip update lost {len(lost)} extended properties: {lost}"
)
finally:
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")
+43 -25
View File
@@ -24,7 +24,6 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession):
# Verify expected tools are present
expected_tools = [
"nc_get_note",
"nc_notes_create_note",
"nc_notes_update_note",
"nc_notes_append_content",
@@ -137,11 +136,9 @@ async def test_mcp_notes_crud_workflow(
# 3. Read note via MCP
logger.info(f"Reading note via MCP: {note_id}")
read_result = await nc_mcp_client.call_tool("nc_get_note", {"note_id": note_id})
assert read_result.isError is False, (
f"MCP note read failed: {read_result.content}"
)
read_note_data = json.loads(read_result.content[0].text)
read_result = await nc_mcp_client.read_resource(f"nc://Notes/{note_id}")
assert len(read_result.contents) == 1, "Expected exactly one content item"
read_note_data = json.loads(read_result.contents[0].text)
assert read_note_data["title"] == test_title
assert read_note_data["content"] == test_content
@@ -199,14 +196,23 @@ async def test_mcp_notes_crud_workflow(
)
search_notes_text = search_result.content[0].text
logger.info(f"Search result text: {search_notes_text}")
search_notes = json.loads(search_notes_text)
search_response = json.loads(search_notes_text)
# Ensure search_notes is a list
if not isinstance(search_notes, list):
logger.warning(
f"Expected search results to be a list, got: {type(search_notes)}"
)
search_notes = [search_notes] if search_notes else []
# Expect structured response with Pydantic format
assert isinstance(search_response, dict), (
f"Expected search response to be a dict with structured format, got: {type(search_response)}"
)
assert "results" in search_response, (
f"Expected 'results' field in search response, got keys: {list(search_response.keys())}"
)
assert "success" in search_response and search_response["success"], (
f"Expected successful search response, got: {search_response}"
)
search_notes = search_response["results"]
assert isinstance(search_notes, list), (
f"Expected results to be a list, got: {type(search_notes)}"
)
# Find our note in search results
found_note = None
@@ -216,7 +222,7 @@ async def test_mcp_notes_crud_workflow(
break
assert found_note is not None, (
f"Created note not found in search results. Search returned: {search_notes}"
f"Created note not found in search results. Search returned: {search_response}"
)
assert found_note["title"] == updated_title
@@ -431,21 +437,33 @@ async def test_mcp_calendar_workflow(
f"MCP calendar listing failed: {calendars_result.content}"
)
calendars_data = json.loads(calendars_result.content[0].text)
calendars_response = json.loads(calendars_result.content[0].text)
# Debug output to understand the structure
logger.info(f"calendars_data type: {type(calendars_data)}")
logger.info(f"calendars_data content: {calendars_data}")
logger.info(f"calendars_response type: {type(calendars_response)}")
logger.info(f"calendars_response content: {calendars_response}")
# Handle the case where MCP tool returns a single dict instead of a list
if isinstance(calendars_data, dict):
# Single calendar returned as dict instead of list
calendar_name = calendars_data["name"]
elif isinstance(calendars_data, list) and calendars_data:
# Normal case - list of calendars
calendar_name = calendars_data[0]["name"]
else:
# Expect structured response with Pydantic format
assert isinstance(calendars_response, dict), (
f"Expected calendar response to be a dict with structured format, got: {type(calendars_response)}"
)
assert "calendars" in calendars_response, (
f"Expected 'calendars' field in response, got keys: {list(calendars_response.keys())}"
)
assert "success" in calendars_response and calendars_response["success"], (
f"Expected successful calendar response, got: {calendars_response}"
)
calendars_list = calendars_response["calendars"]
assert isinstance(calendars_list, list), (
f"Expected calendars to be a list, got: {type(calendars_list)}"
)
if not calendars_list:
pytest.skip("No calendars available for testing")
# Use the first available calendar
calendar_name = calendars_list[0]["name"]
logger.info(f"Using calendar: {calendar_name}")
# 2. Create event via MCP
Generated
+4 -2
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.11"
[[package]]
@@ -505,13 +505,14 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.6.1"
version = "0.7.2"
source = { editable = "." }
dependencies = [
{ name = "httpx" },
{ name = "icalendar" },
{ name = "mcp", extra = ["cli"] },
{ name = "pillow" },
{ name = "pydantic" },
{ name = "pythonvcard4" },
]
@@ -531,6 +532,7 @@ requires-dist = [
{ name = "icalendar", specifier = ">=6.0.0,<7.0.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.10,<1.11" },
{ name = "pillow", specifier = ">=11.2.1,<12.0.0" },
{ name = "pydantic", specifier = ">=2.11.4" },
{ name = "pythonvcard4", specifier = ">=0.2.0" },
]