Compare commits

..

73 Commits

Author SHA1 Message Date
Chris Coutinho 949fb7124b fix(notes): Remove note contents from responses to reduce token usage 2025-08-31 11:55:15 +02:00
Chris Coutinho 53b11f7fbb fix(model): Serialize timestamps in RFC3339 format 2025-08-30 22:37:16 +02:00
Chris Coutinho 336bc45637 Merge pull request #138 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to fcf6370
2025-08-30 20:29:17 +02:00
renovate-bot-cbcoutinho[bot] 6c587bb265 chore(deps): update nextcloud:31.0.8 docker digest to fcf6370 2025-08-30 18:19:45 +00:00
github-actions[bot] 6b1f5c12c8 bump: version 0.7.2 → 0.8.0 2025-08-30 17:28:57 +00:00
Chris Coutinho f8dc1f060b Merge pull request #137 from cbcoutinho/feature/claude-code
Feature/claude code
2025-08-30 19:28:33 +02:00
Chris Coutinho 4cf5f2a95a feat(client): Preserve fields when modifying contacts/calendar resources 2025-08-30 19:19:20 +02:00
Chris Coutinho 1cc65f0160 chore: Remove unused model 2025-08-30 18:31:45 +02:00
Chris Coutinho 9b00530e8e feat(server): Add structured output to all tool/resource output
BREAKING CHANGE
2025-08-30 18:27:32 +02:00
Chris Coutinho 938376425b chore: Update CLAUDE.md 2025-08-30 14:34:25 +02:00
Chris Coutinho 0484167a22 refactor: Use _make_request where available 2025-08-30 14:27:53 +02:00
Chris Coutinho 84ad1958af chore: Remove unnecessary logging
Migrate pre-commit tasks to local
2025-08-30 14:25:16 +02:00
Chris Coutinho fa002296ff chore(claude): Initialize CLAUDE.md 2025-08-30 13:23:34 +02:00
github-actions[bot] 464ff2c8b2 bump: version 0.7.1 → 0.7.2 2025-08-30 10:15:06 +00:00
Chris Coutinho 0804ff8d17 Merge pull request #136 from rnivet/fix/get-all-notes-paging
fix(client): Use paging to fetch all notes
2025-08-30 12:14:45 +02:00
Rémi Nivet 4f7023a16e fix(client): Use paging to fetch all notes 2025-08-29 23:46:58 +02:00
Chris Coutinho 8f6656c546 Merge pull request #134 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to 3eaddb0
2025-08-29 12:53:52 +02:00
Chris Coutinho 741c58d9a3 Merge pull request #135 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.14
2025-08-29 12:53:42 +02:00
renovate-bot-cbcoutinho[bot] e7b79d0316 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.14 2025-08-29 10:25:25 +00:00
renovate-bot-cbcoutinho[bot] 0e4cc8e56f chore(deps): update nextcloud:31.0.8 docker digest to 3eaddb0 2025-08-29 10:25:20 +00:00
Chris Coutinho 16da7a9a76 Merge pull request #133 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.13
2025-08-22 13:06:28 +02:00
renovate-bot-cbcoutinho[bot] 520e515f2b chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.13 2025-08-21 22:14:57 +00:00
Chris Coutinho fd6ce7b294 Merge pull request #132 from cbcoutinho/renovate/astral-sh-setup-uv-digest
chore(deps): update astral-sh/setup-uv digest to 4959332
2025-08-21 12:45:28 +02:00
renovate-bot-cbcoutinho[bot] 8063059f5f chore(deps): update astral-sh/setup-uv digest to 4959332 2025-08-21 10:04:51 +00:00
Chris Coutinho 20c5046b20 Merge pull request #130 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to 987c376
2025-08-19 11:50:51 +02:00
Chris Coutinho 68126640d8 Merge pull request #131 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.12
2025-08-19 11:50:10 +02:00
renovate-bot-cbcoutinho[bot] af617e3869 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.12 2025-08-19 04:04:58 +00:00
renovate-bot-cbcoutinho[bot] 04e5f7beca chore(deps): update redis:alpine docker digest to 987c376 2025-08-19 04:04:54 +00:00
Chris Coutinho 6ed1efab24 Merge pull request #129 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to 72abe18
2025-08-17 23:30:34 +02:00
renovate-bot-cbcoutinho[bot] cffa002364 chore(deps): update nextcloud:31.0.8 docker digest to 72abe18 2025-08-17 16:04:16 +00:00
Chris Coutinho 951a7095b2 Merge pull request #127 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.11
2025-08-16 20:04:50 +02:00
Chris Coutinho ee31f33038 Merge pull request #128 from cbcoutinho/renovate/nextcloud-31.x
chore(deps): update nextcloud docker tag to v31.0.8
2025-08-15 14:18:22 +02:00
renovate-bot-cbcoutinho[bot] 0fdbfae198 chore(deps): update nextcloud docker tag to v31.0.8 2025-08-15 04:08:58 +00:00
renovate-bot-cbcoutinho[bot] 315f918d88 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.11 2025-08-14 22:11:23 +00:00
Chris Coutinho 96a8491a4c Merge pull request #123 from cbcoutinho/renovate/astral-sh-setup-uv-digest
chore(deps): update astral-sh/setup-uv digest to d9e0f98
2025-08-13 10:00:32 +02:00
Chris Coutinho 0a311766f2 Merge pull request #124 from cbcoutinho/renovate/mariadb-lts
chore(deps): update mariadb:lts docker digest to 272084c
2025-08-13 09:59:56 +02:00
Chris Coutinho d28c249f8d Merge pull request #125 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to b255a97
2025-08-13 09:59:47 +02:00
renovate-bot-cbcoutinho[bot] ab6cac8799 chore(deps): update nextcloud:31.0.7 docker digest to b255a97 2025-08-13 04:05:37 +00:00
renovate-bot-cbcoutinho[bot] 7127b9953f chore(deps): update mariadb:lts docker digest to 272084c 2025-08-13 04:05:33 +00:00
renovate-bot-cbcoutinho[bot] 49c9af3c76 chore(deps): update astral-sh/setup-uv digest to d9e0f98 2025-08-12 22:08:22 +00:00
Chris Coutinho 823151f42e Merge pull request #122 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.9
2025-08-12 13:31:53 +02:00
renovate-bot-cbcoutinho[bot] 2bbd56e1cd chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.9 2025-08-12 04:05:16 +00:00
Chris Coutinho 8a36a120a7 Merge pull request #121 from cbcoutinho/renovate/actions-checkout-5.x
chore(deps): update actions/checkout action to v5
2025-08-11 22:39:16 +02:00
renovate-bot-cbcoutinho[bot] 9df8cc937d chore(deps): update actions/checkout action to v5 2025-08-11 16:07:14 +00:00
Chris Coutinho 325dcdf654 Merge pull request #118 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.8
2025-08-09 09:09:45 +02:00
renovate-bot-cbcoutinho[bot] 945eb1eb4e chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.8 2025-08-09 04:04:39 +00:00
Chris Coutinho 088343d003 Merge pull request #117 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.7
2025-08-09 01:14:56 +02:00
renovate-bot-cbcoutinho[bot] 94d553985f chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.7 2025-08-08 22:07:52 +00:00
github-actions[bot] 982dbd18ca bump: version 0.7.0 → 0.7.1 2025-08-08 19:04:17 +00:00
Chris Coutinho 054fa38e3a Merge pull request #116 from cbcoutinho/fix/csrf-cookies
Strip cookies from responses to avoid falsely raising CS…
2025-08-08 21:03:56 +02:00
Chris Coutinho 3836534205 fix(client): Strip cookies from responses to avoid falsely raising CSRF errors 2025-08-08 21:03:16 +02:00
Chris Coutinho f852a18b12 Merge pull request #114 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.6
2025-08-08 13:11:56 +02:00
renovate-bot-cbcoutinho[bot] 0450c5cc52 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.6 2025-08-07 16:06:38 +00:00
Chris Coutinho f48fd0be60 Merge pull request #113 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to a834f43
2025-08-07 09:11:06 +02:00
renovate-bot-cbcoutinho[bot] ee29194bc9 chore(deps): update nextcloud:31.0.7 docker digest to a834f43 2025-08-07 04:06:07 +00:00
Chris Coutinho fc32fa2852 Merge pull request #112 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to 7521abd
2025-08-06 20:53:55 +02:00
renovate-bot-cbcoutinho[bot] b7d6548741 chore(deps): update redis:alpine docker digest to 7521abd 2025-08-06 10:05:20 +00:00
Chris Coutinho a9ffd49815 Merge pull request #111 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.5
2025-08-06 02:52:55 +02:00
renovate-bot-cbcoutinho[bot] 538f861414 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.8.5 2025-08-05 22:09:00 +00:00
Chris Coutinho b784651f7f Merge pull request #110 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to 33c21e8
2025-08-05 18:27:41 +02:00
renovate-bot-cbcoutinho[bot] 6f0baf5fca chore(deps): update nextcloud:31.0.7 docker digest to 33c21e8 2025-08-05 16:04:55 +00:00
Chris Coutinho 664254ed95 Merge pull request #108 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to e716e2f
2025-08-05 14:55:04 +02:00
Chris Coutinho b976494ca2 Merge pull request #109 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to a0fc425
2025-08-05 14:54:55 +02:00
renovate-bot-cbcoutinho[bot] 061f667e00 chore(deps): update redis:alpine docker digest to a0fc425 2025-08-05 10:05:41 +00:00
renovate-bot-cbcoutinho[bot] 3319c35798 chore(deps): update nextcloud:31.0.7 docker digest to e716e2f 2025-08-05 10:05:35 +00:00
Chris Coutinho 52c9293c37 Merge pull request #106 from cbcoutinho/renovate/redis-alpine
chore(deps): update redis:alpine docker digest to fb96127
2025-08-05 08:54:31 +02:00
Chris Coutinho af6863a764 Merge pull request #107 from cbcoutinho/renovate/nextcloud-31.0.7
chore(deps): update nextcloud:31.0.7 docker digest to 6b268fb
2025-08-05 08:53:01 +02:00
renovate-bot-cbcoutinho[bot] 77181f7c6f chore(deps): update nextcloud:31.0.7 docker digest to 6b268fb 2025-08-05 04:05:19 +00:00
renovate-bot-cbcoutinho[bot] 61f3beac01 chore(deps): update redis:alpine docker digest to fb96127 2025-08-04 22:07:46 +00:00
Chris Coutinho 49aaf24363 Merge pull request #105 from cbcoutinho/renovate/docker-login-action-digest
chore(deps): update docker/login-action digest to 184bdaa
2025-08-04 19:22:12 +02:00
renovate-bot-cbcoutinho[bot] 4edd31ee28 chore(deps): update docker/login-action digest to 184bdaa 2025-08-04 16:05:38 +00:00
github-actions[bot] 9ae2a0fc6f bump: version 0.6.1 → 0.7.0 2025-08-03 12:47:13 +00:00
Chris Coutinho 8386644dfd Merge pull request #104 from cbcoutinho/feature/vcard
Initialize Contacts App
2025-08-03 14:46:48 +02:00
32 changed files with 2420 additions and 106 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ jobs:
packages: write
steps:
- name: Check out
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
with:
fetch-depth: 0
token: "${{ secrets.PERSONAL_ACCESS_TOKEN }}"
+2 -2
View File
@@ -12,7 +12,7 @@ jobs:
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Docker meta
id: meta
@@ -37,7 +37,7 @@ jobs:
- name: Log in to GitHub Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3
uses: docker/login-action@184bdaa0721073962dff0199f1fb9940f07167d1 # v3
with:
registry: ghcr.io
username: ${{ github.actor }}
+4 -4
View File
@@ -9,9 +9,9 @@ jobs:
linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Install the latest version of uv
uses: astral-sh/setup-uv@e92bafb6253dcd438e0484186d7669ea7a8ca1cc # v6
uses: astral-sh/setup-uv@4959332f0f014c5280e7eac8b70c90cb574c9f9b # v6
- name: Check format
run: |
uv run --frozen ruff format --diff
@@ -24,14 +24,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Run docker compose
uses: hoverkraft-tech/compose-action@40041ff1b97dbf152cd2361138c2b03fa29139df # v2.3.0
with:
compose-file: "./docker-compose.yml"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@e92bafb6253dcd438e0484186d7669ea7a8ca1cc # v6
uses: astral-sh/setup-uv@4959332f0f014c5280e7eac8b70c90cb574c9f9b # v6
- name: Wait for service to be ready
run: |
+9 -2
View File
@@ -6,8 +6,15 @@ repos:
- id: commitizen-branch
stages:
- pre-push
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.5
- repo: local
hooks:
- id: ruff-check
name: ruff-check
entry: uv run ruff check
language: system
types: [python]
- id: ruff-format
name: ruff-format
entry: uv run ruff format
language: system
types: [python]
+29
View File
@@ -1,3 +1,32 @@
## v0.8.0 (2025-08-30)
### Feat
- **client**: Preserve fields when modifying contacts/calendar resources
- **server**: Add structured output to all tool/resource output
### Refactor
- Use _make_request where available
## v0.7.2 (2025-08-30)
### Fix
- **client**: Use paging to fetch all notes
## v0.7.1 (2025-08-08)
### Fix
- **client**: Strip cookies from responses to avoid falsely raising CSRF errors
## v0.7.0 (2025-08-03)
### Feat
- **contacts**: Initialize Contacts App
## v0.6.1 (2025-08-01)
### Fix
+118
View File
@@ -0,0 +1,118 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Development Commands
### Testing
```bash
# Run all tests
uv run pytest
# Run integration tests only
uv run pytest -m integration
# Run tests with coverage
uv run pytest --cov
# Skip integration tests
uv run pytest -m "not integration"
```
### Code Quality
```bash
# Format and lint code
uv run ruff check
uv run ruff format
# Type checking
# No explicit type checker configured - this is a Python project using ruff for linting
```
### Running the Server
```bash
# Local development - load environment variables and run
export $(grep -v '^#' .env | xargs)
mcp run --transport sse nextcloud_mcp_server.app:mcp
# Docker development environment with Nextcloud instance
docker-compose up
# After code changes, rebuild and restart only the MCP server container
docker-compose up --build -d mcp
# Build Docker image
docker build -t nextcloud-mcp-server .
```
### Environment Setup
```bash
# Install dependencies
uv sync
# Install development dependencies
uv sync --group dev
```
## Architecture Overview
This is a Python MCP (Model Context Protocol) server that provides LLM integration with Nextcloud. The architecture follows a layered pattern:
### Core Components
- **`nextcloud_mcp_server/app.py`** - Main MCP server entry point using FastMCP framework
- **`nextcloud_mcp_server/client/`** - HTTP client implementations for different Nextcloud APIs
- **`nextcloud_mcp_server/server/`** - MCP tool/resource definitions that expose client functionality
- **`nextcloud_mcp_server/controllers/`** - Business logic controllers (e.g., notes search)
### Client Architecture
- **`NextcloudClient`** - Main orchestrating client that manages all app-specific clients
- **`BaseNextcloudClient`** - Abstract base class providing common HTTP functionality and retry logic
- **App-specific clients**: `NotesClient`, `CalendarClient`, `ContactsClient`, `TablesClient`, `WebDAVClient`
### Server Integration
Each Nextcloud app has a corresponding server module that:
1. Defines MCP tools using `@mcp.tool()` decorators
2. Defines MCP resources using `@mcp.resource()` decorators
3. Uses the context pattern to access the `NextcloudClient` instance
### Supported Nextcloud Apps
- **Notes** - Full CRUD operations and search
- **Calendar** - CalDAV integration with events, recurring events, attendees
- **Contacts** - CardDAV integration with address book operations
- **Tables** - Row-level operations on Nextcloud Tables
- **WebDAV** - Complete file system access
### Key Patterns
1. **Environment-based configuration** - Uses `NextcloudClient.from_env()` to load credentials from environment variables
2. **Async/await throughout** - All operations are async using httpx
3. **Retry logic** - `@retry_on_429` decorator handles rate limiting
4. **Context injection** - MCP context provides access to the authenticated client instance
5. **Modular design** - Each Nextcloud app is isolated in its own client/server pair
### Testing Structure
- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions
- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities
- Tests are marked with `@pytest.mark.integration` for selective running
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
#### Testing Best Practices
- **Always restart MCP server** after code changes with `docker-compose up --build -d mcp`
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
- `nc_mcp_client` - MCP client session for tool/resource testing
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
- `temporary_note` - Creates and cleans up test notes automatically
- `temporary_addressbook` - Creates and cleans up test address books
- `temporary_contact` - Creates and cleans up test contacts
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
### Configuration Files
- **`pyproject.toml`** - Python project configuration using uv for dependency management
- **`.env`** (from `env.sample`) - Environment variables for Nextcloud connection
- **`docker-compose.yml`** - Complete development environment with Nextcloud + database
+1 -1
View File
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:0.8.4-python3.11-alpine@sha256:f2c5b953b713f455bcac4429303bb21d7d2547d56a64e1a7b2517cc9f0563f0f
FROM ghcr.io/astral-sh/uv:0.8.14-python3.11-alpine@sha256:7b1463148981d57ed2d9c2950f570fe5fdd88570970f9f56f6e0e5a8829eca95
WORKDIR /app
@@ -11,6 +11,10 @@ php /var/www/html/occ app:enable calendar
echo "Waiting for calendar app to initialize..."
sleep 5
# Increase limits on calendar creation for integration tests (100 in 60s)
php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=100
php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=60
# Ensure maintenance mode is off before calendar operations
php /var/www/html/occ maintenance:mode --off
+3 -3
View File
@@ -3,7 +3,7 @@ services:
# https://hub.docker.com/_/mariadb
db:
# Note: Check the recommend version here: https://docs.nextcloud.com/server/latest/admin_manual/installation/system_requirements.html#server
image: mariadb:lts@sha256:2bcbaec92bd9d4f6591bc8103d3a8e6d0512ee2235506e47a2e129d190444405
image: mariadb:lts@sha256:272084c2dec70619714df329c4ffcb336e3f8c723072c3f56f2e4015997bbf2c
restart: always
command: --transaction-isolation=READ-COMMITTED
volumes:
@@ -17,11 +17,11 @@ services:
# Note: Redis is an external service. You can find more information about the configuration here:
# https://hub.docker.com/_/redis
redis:
image: redis:alpine@sha256:25c0ae32c6c2301798579f5944af53729766a18eff5660bbef196fc2e6214a9c
image: redis:alpine@sha256:987c376c727652f99625c7d205a1cba3cb2c53b92b0b62aade2bd48ee1593232
restart: always
app:
image: nextcloud:31.0.7@sha256:81dc361f8f216d8acff20bd3dea2226fb6cea883c277505cbb2ddd6327c867fa
image: nextcloud:31.0.8@sha256:fcf637074755bb1d27644441b938bf39b27dd6c0a8c2326a5752e1b3d5014366
#user: www-data:www-data
restart: always
#post_start:
+33 -9
View File
@@ -1,7 +1,15 @@
import logging
import os
from httpx import AsyncClient, Auth, BasicAuth, Request, Response
from httpx import (
AsyncClient,
Auth,
BasicAuth,
Request,
Response,
AsyncBaseTransport,
AsyncHTTPTransport,
)
from ..controllers.notes_search import NotesSearchController
from .calendar import CalendarClient
@@ -13,19 +21,34 @@ from .webdav import WebDAVClient
logger = logging.getLogger(__name__)
def log_request(request: Request):
logger.info(
async def log_request(request: Request):
logger.debug(
"Request event hook: %s %s - Waiting for content",
request.method,
request.url,
)
logger.info("Request body: %s", request.content)
logger.info("Headers: %s", request.headers)
logger.debug("Request body: %s", request.content)
logger.debug("Headers: %s", request.headers)
def log_response(response: Response):
response.read() # Explicitly read the stream before accessing .text
logger.info("Response [%s] %s", response.status_code, response.text)
async def log_response(response: Response):
await response.aread()
logger.debug("Response [%s] %s", response.status_code, response.text)
class AsyncDisableCookieTransport(AsyncBaseTransport):
"""This Transport disable cookies from accumulating in the httpx AsyncClient
Thanks to: https://github.com/encode/httpx/issues/2992#issuecomment-2133258994
"""
def __init__(self, transport: AsyncBaseTransport):
self.transport = transport
async def handle_async_request(self, request: Request) -> Response:
response = await self.transport.handle_async_request(request)
response.headers.pop("set-cookie", None)
return response
class NextcloudClient:
@@ -36,7 +59,8 @@ class NextcloudClient:
self._client = AsyncClient(
base_url=base_url,
auth=auth,
# event_hooks={"request": [log_request], "response": [log_response]},
transport=AsyncDisableCookieTransport(AsyncHTTPTransport()),
event_hooks={"request": [log_request], "response": [log_response]},
)
# Initialize app clients
+63 -1
View File
@@ -3,11 +3,72 @@
import logging
from abc import ABC
from httpx import AsyncClient
from functools import wraps
import time
from httpx import HTTPStatusError, codes, RequestError, AsyncClient
logger = logging.getLogger(__name__)
def retry_on_429(func):
"""This decorator handles the 429 response from REST APIs
The `func` is assumed to be a method that is similar to `httpx.Client.get`,
and returns an `httpx.Response` object. In the case of `Too Many Requests` HTTP
response, the function will wait for a couple of seconds and retry the request.
"""
MAX_RETRIES = 5
@wraps(func)
async def wrapper(*args, **kwargs):
retries = 0
while retries < MAX_RETRIES:
try:
# Make GET API call
retries += 1
response = await func(*args, **kwargs)
break
except HTTPStatusError as e:
# If we get a '429 Client Error: Too Many Requests'
# error we wait a couple of seconds and do a retry
if e.response.status_code == codes.TOO_MANY_REQUESTS:
logger.warning(
f"429 Client Error: Too Many Requests, Number of attempts: {retries}"
)
time.sleep(5)
elif e.response.status_code == 404:
# 404 errors are often expected (e.g., checking if attachments exist)
# Log as debug instead of warning
logger.debug(
f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}"
)
raise
else:
logger.warning(
f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}"
)
raise
except RequestError as e:
logger.warning(
f"RequestError {e.request.url}: {e}, Number of attempts: {retries}"
)
raise
# If for loop ends without break statement
else:
logger.warning("All API call retries failed")
raise RuntimeError(
f"Maximum number of retries ({MAX_RETRIES}) exceeded without success"
)
return response
return wrapper
class BaseNextcloudClient(ABC):
"""Base class for all Nextcloud app clients."""
@@ -25,6 +86,7 @@ class BaseNextcloudClient(ABC):
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
@retry_on_429
async def _make_request(self, method: str, url: str, **kwargs):
"""Common request wrapper with logging and error handling.
+136 -11
View File
@@ -238,27 +238,33 @@ class CalendarClient(BaseNextcloudClient):
event_data: Dict[str, Any],
etag: str = "",
) -> Dict[str, Any]:
"""Update an existing calendar event."""
"""Update an existing calendar event while preserving all existing properties."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
# Get existing event data to merge with updates
existing_event_data = {}
# Get raw iCal content to preserve all properties including extended ones
raw_ical_content = ""
if not etag:
try:
existing_event_data, current_etag = await self.get_event(
raw_ical_content, current_etag = await self._get_raw_ical(
calendar_name, event_uid
)
etag = current_etag
except Exception:
# Continue without etag if we can't get it
pass
# Fall back to creating new iCal if we can't get existing
logger.warning(
f"Could not fetch existing iCal for {event_uid}, creating new"
)
raw_ical_content = ""
# Merge existing data with new data (new data takes precedence)
merged_data = {**existing_event_data, **event_data}
# Create updated iCalendar event
ical_content = self._create_ical_event(merged_data, event_uid)
# Create updated iCalendar event preserving existing properties
if raw_ical_content:
ical_content = self._merge_ical_properties(
raw_ical_content, event_data, event_uid
)
else:
# Fallback to creating new iCal if we couldn't get existing
ical_content = self._create_ical_event(event_data, event_uid)
headers = {
"Content-Type": "text/calendar; charset=utf-8",
@@ -949,3 +955,122 @@ class CalendarClient(BaseNextcloudClient):
except Exception as e:
logger.error(f"Error deleting calendar {calendar_name}: {e}")
raise
async def _get_raw_ical(
self, calendar_name: str, event_uid: str
) -> Tuple[str, str]:
"""Get raw iCal content for an event without parsing."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
headers = {"Accept": "text/calendar"}
try:
response = await self._make_request("GET", event_path, headers=headers)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw iCal for {event_uid}: {e}")
raise
def _merge_ical_properties(
self, raw_ical: str, event_data: Dict[str, Any], event_uid: str
) -> str:
"""Merge new event data into existing raw iCal while preserving all properties."""
try:
# Parse existing iCal
cal = Calendar.from_ical(raw_ical)
# Find the VEVENT component
for component in cal.walk():
if component.name == "VEVENT":
# Update only the properties that were provided in event_data
if "title" in event_data:
component["SUMMARY"] = event_data["title"]
if "description" in event_data:
component["DESCRIPTION"] = event_data["description"]
if "location" in event_data:
component["LOCATION"] = event_data["location"]
if "status" in event_data:
component["STATUS"] = event_data["status"].upper()
if "priority" in event_data:
component["PRIORITY"] = event_data["priority"]
if "privacy" in event_data:
component["CLASS"] = event_data["privacy"].upper()
if "url" in event_data:
component["URL"] = event_data["url"]
# Handle dates
if "start_datetime" in event_data:
start_str = event_data["start_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
start_date = dt.datetime.fromisoformat(
start_str.split("T")[0]
).date()
component["DTSTART"] = start_date
else:
start_dt = dt.datetime.fromisoformat(
start_str.replace("Z", "+00:00")
)
component["DTSTART"] = start_dt
if "end_datetime" in event_data:
end_str = event_data["end_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
end_date = dt.datetime.fromisoformat(
end_str.split("T")[0]
).date()
component["DTEND"] = end_date
else:
end_dt = dt.datetime.fromisoformat(
end_str.replace("Z", "+00:00")
)
component["DTEND"] = end_dt
# Handle categories
if "categories" in event_data:
categories = event_data["categories"]
if categories:
component["CATEGORIES"] = categories.split(",")
# Handle recurrence
if "recurring" in event_data:
if event_data["recurring"] and "recurrence_rule" in event_data:
recurrence_rule = event_data["recurrence_rule"]
if recurrence_rule:
component["RRULE"] = vRecur.from_ical(recurrence_rule)
elif not event_data["recurring"]:
# Remove recurrence if set to False
if "RRULE" in component:
del component["RRULE"]
# Handle attendees
if "attendees" in event_data:
attendees = event_data["attendees"]
# Remove existing attendees
component.pop("ATTENDEE", None)
if attendees:
for email in attendees.split(","):
if email.strip():
component.add("ATTENDEE", f"mailto:{email.strip()}")
# Update timestamps in proper iCal format
from icalendar import vDDDTypes
now = dt.datetime.now(dt.UTC)
component["LAST-MODIFIED"] = vDDDTypes(now)
component["DTSTAMP"] = vDDDTypes(now)
# Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.)
# by not touching them - they remain in the component
break
return cal.to_ical().decode("utf-8")
except Exception as e:
logger.error(f"Error merging iCal properties: {e}")
# Fallback to creating new iCal
return self._create_ical_event(event_data, event_uid)
+201
View File
@@ -143,6 +143,50 @@ class ContactsClient(BaseNextcloudClient):
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
await self._make_request("DELETE", url)
async def update_contact(
self, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
# Get raw vCard content to preserve all properties including extended ones
raw_vcard_content = ""
if not etag:
try:
raw_vcard_content, current_etag = await self._get_raw_vcard(
addressbook, uid
)
etag = current_etag
except Exception:
# Fall back to creating new vCard if we can't get existing
logger.warning(
f"Could not fetch existing vCard for {uid}, creating new"
)
raw_vcard_content = ""
# Create updated vCard preserving existing properties
if raw_vcard_content:
vcard_content = self._merge_vcard_properties(
raw_vcard_content, contact_data, uid
)
else:
# Fallback to creating new vCard if we couldn't get existing
contact = Contact(fn=contact_data.get("fn"), uid=uid)
if "email" in contact_data:
contact.email = [{"value": contact_data["email"], "type": ["HOME"]}]
if "tel" in contact_data:
contact.tel = [{"value": contact_data["tel"], "type": ["HOME"]}]
vcard_content = contact.to_vcard()
headers = {
"Content-Type": "text/vcard; charset=utf-8",
}
if etag:
headers["If-Match"] = etag
await self._make_request("PUT", url, content=vcard_content, headers=headers)
async def list_contacts(self, *, addressbook: str):
"""List all available contacts for addressbook."""
@@ -233,3 +277,160 @@ class ContactsClient(BaseNextcloudClient):
logger.debug(f"Found {len(contacts)} contacts")
return contacts
async def _get_raw_vcard(self, addressbook: str, uid: str) -> tuple[str, str]:
"""Get raw vCard content for a contact without parsing."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
try:
response = await self._make_request("GET", url)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw vCard for {uid}: {e}")
raise
def _merge_vcard_properties(
self, raw_vcard: str, contact_data: dict, uid: str
) -> str:
"""Merge new contact data into existing raw vCard while preserving all properties."""
try:
# Instead of using pythonvCard4 which has formatting issues,
# let's do a simple text-based merge to preserve exact formatting
# Start with the original vCard
lines = raw_vcard.strip().split("\n")
updated_lines = []
# Track what we've updated to avoid duplicates
updated_properties = set()
for line in lines:
line = line.strip()
if not line:
continue
# Skip the END:VCARD line for now
if line == "END:VCARD":
continue
property_name = line.split(":")[0].split(";")[0]
# Handle updates for specific properties
if property_name == "FN" and "fn" in contact_data:
updated_lines.append(f"FN:{contact_data['fn']}")
updated_properties.add("fn")
elif property_name == "EMAIL" and "email" in contact_data:
# Replace first email with new one, preserve others
if "email" not in updated_properties:
if isinstance(contact_data["email"], str):
# Try to preserve the original format as much as possible
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"EMAIL;TYPE={type_part}:{contact_data['email']}"
)
else:
updated_lines.append(f"EMAIL:{contact_data['email']}")
updated_properties.add("email")
else:
# Keep additional emails unchanged
updated_lines.append(line)
elif property_name == "TEL" and "tel" in contact_data:
# Similar handling for phone numbers
if "tel" not in updated_properties:
if isinstance(contact_data["tel"], str):
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"TEL;TYPE={type_part}:{contact_data['tel']}"
)
else:
updated_lines.append(f"TEL:{contact_data['tel']}")
updated_properties.add("tel")
else:
# Keep additional phone numbers unchanged
updated_lines.append(line)
elif property_name == "NOTE" and "note" in contact_data:
updated_lines.append(f"NOTE:{contact_data['note']}")
updated_properties.add("note")
elif property_name == "NICKNAME" and "nickname" in contact_data:
nickname_value = contact_data["nickname"]
if isinstance(nickname_value, list):
nickname_value = ",".join(nickname_value)
updated_lines.append(f"NICKNAME:{nickname_value}")
updated_properties.add("nickname")
elif property_name == "BDAY" and "bday" in contact_data:
updated_lines.append(f"BDAY:{contact_data['bday']}")
updated_properties.add("bday")
elif property_name == "CATEGORIES" and "categories" in contact_data:
categories_value = contact_data["categories"]
if isinstance(categories_value, list):
categories_value = ",".join(categories_value)
updated_lines.append(f"CATEGORIES:{categories_value}")
updated_properties.add("categories")
elif property_name == "ORG" and (
"org" in contact_data or "organization" in contact_data
):
org_value = contact_data.get("org") or contact_data.get(
"organization"
)
updated_lines.append(f"ORG:{org_value}")
updated_properties.add("org")
elif property_name == "TITLE" and "title" in contact_data:
updated_lines.append(f"TITLE:{contact_data['title']}")
updated_properties.add("title")
else:
# Keep all other properties unchanged (preserves all extended/custom fields)
updated_lines.append(line)
# Add any new properties that weren't in the original vCard
for key, value in contact_data.items():
if key not in updated_properties:
if key == "fn":
updated_lines.append(f"FN:{value}")
elif key == "email" and isinstance(value, str):
updated_lines.append(f"EMAIL:{value}")
elif key == "tel" and isinstance(value, str):
updated_lines.append(f"TEL:{value}")
elif key == "note":
updated_lines.append(f"NOTE:{value}")
elif key == "nickname":
nickname_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"NICKNAME:{nickname_value}")
elif key == "bday":
updated_lines.append(f"BDAY:{value}")
elif key == "categories":
categories_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"CATEGORIES:{categories_value}")
elif key in ["org", "organization"]:
updated_lines.append(f"ORG:{value}")
elif key == "title":
updated_lines.append(f"TITLE:{value}")
# Add the END:VCARD line
updated_lines.append("END:VCARD")
# Join all lines
return "\n".join(updated_lines)
except Exception as e:
logger.error(f"Error merging vCard properties: {e}")
# Fallback to creating basic vCard matching Nextcloud format
basic_vcard = f"""BEGIN:VCARD
VERSION:3.0
UID:{uid}
FN:{contact_data.get("fn", "Unknown")}"""
if "email" in contact_data:
basic_vcard += f"\nEMAIL:{contact_data['email']}"
if "tel" in contact_data:
basic_vcard += f"\nTEL:{contact_data['tel']}"
basic_vcard += "\nEND:VCARD"
return basic_vcard
+15 -2
View File
@@ -18,8 +18,21 @@ class NotesClient(BaseNextcloudClient):
async def get_all_notes(self) -> List[Dict[str, Any]]:
"""Get all notes."""
response = await self._make_request("GET", "/apps/notes/api/v1/notes")
return response.json()
notes = []
cursor = ""
while True:
response = await self._make_request(
"GET",
"/apps/notes/api/v1/notes",
params={"chunkSize": 50, "chunkCursor": cursor},
)
notes.extend(response.json())
if "X-Notes-Chunk-Cursor" not in response.headers:
break
cursor = response.headers["X-Notes-Chunk-Cursor"]
return notes
async def get_note(self, note_id: int) -> Dict[str, Any]:
"""Get a specific note by ID."""
+12 -13
View File
@@ -31,7 +31,7 @@ class WebDAVClient(BaseNextcloudClient):
# First try a PROPFIND to verify resource exists
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
try:
propfind_resp = await self._client.request(
propfind_resp = await self._make_request(
"PROPFIND", webdav_path, headers=propfind_headers
)
logger.debug(
@@ -44,8 +44,7 @@ class WebDAVClient(BaseNextcloudClient):
# For other errors, continue with deletion attempt
# Proceed with deletion
response = await self._client.delete(webdav_path, headers=headers)
response.raise_for_status()
response = await self._make_request("DELETE", webdav_path, headers=headers)
logger.debug(f"Successfully deleted WebDAV resource '{path}'")
return {"status_code": response.status_code}
@@ -127,7 +126,7 @@ class WebDAVClient(BaseNextcloudClient):
# First check if we can access WebDAV at all
notes_dir_path = f"{webdav_base}/Notes"
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
notes_dir_response = await self._client.request(
notes_dir_response = await self._make_request(
"PROPFIND", notes_dir_path, headers=propfind_headers
)
@@ -146,7 +145,7 @@ class WebDAVClient(BaseNextcloudClient):
# Ensure the parent directory exists using MKCOL
mkcol_headers = {"OCS-APIRequest": "true"}
mkcol_response = await self._client.request(
mkcol_response = await self._make_request(
"MKCOL", parent_dir_path, headers=mkcol_headers
)
@@ -158,8 +157,8 @@ class WebDAVClient(BaseNextcloudClient):
mkcol_response.raise_for_status()
# Proceed with the PUT request
response = await self._client.put(
attachment_path, content=content, headers=headers
response = await self._make_request(
"PUT", attachment_path, content=content, headers=headers
)
response.raise_for_status()
logger.debug(
@@ -190,7 +189,7 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Fetching attachment '{filename}' for note {note_id}")
try:
response = await self._client.get(attachment_path)
response = await self._make_request("GET", attachment_path)
response.raise_for_status()
content = response.content
@@ -237,7 +236,7 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
try:
response = await self._client.request(
response = await self._make_request(
"PROPFIND", webdav_path, content=propfind_body, headers=headers
)
response.raise_for_status()
@@ -320,7 +319,7 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Reading file: {path}")
try:
response = await self._client.get(webdav_path)
response = await self._make_request("GET", webdav_path)
response.raise_for_status()
content = response.content
@@ -354,8 +353,8 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"Content-Type": content_type, "OCS-APIRequest": "true"}
try:
response = await self._client.put(
webdav_path, content=content, headers=headers
response = await self._make_request(
"PUT", webdav_path, content=content, headers=headers
)
response.raise_for_status()
@@ -382,7 +381,7 @@ class WebDAVClient(BaseNextcloudClient):
headers = {"OCS-APIRequest": "true"}
try:
response = await self._client.request("MKCOL", webdav_path, headers=headers)
response = await self._make_request("MKCOL", webdav_path, headers=headers)
response.raise_for_status()
logger.debug(f"Successfully created directory '{path}'")
+144
View File
@@ -0,0 +1,144 @@
"""Pydantic models for structured MCP server responses."""
# Base models
from .base import (
BaseResponse,
ErrorResponse,
SuccessResponse,
IdResponse,
StatusResponse,
)
# Notes models
from .notes import (
Note,
NoteSearchResult,
NotesSettings,
CreateNoteResponse,
UpdateNoteResponse,
DeleteNoteResponse,
AppendContentResponse,
SearchNotesResponse,
)
# Calendar models
from .calendar import (
Calendar,
CalendarEvent,
CalendarEventSummary,
CreateEventResponse,
UpdateEventResponse,
DeleteEventResponse,
ListEventsResponse,
ListCalendarsResponse,
AvailabilitySlot,
FindAvailabilityResponse,
BulkOperationResult,
BulkOperationResponse,
CreateMeetingResponse,
UpcomingEventsResponse,
ManageCalendarResponse,
)
# Contacts models
from .contacts import (
AddressBook,
Contact,
ContactField,
ListAddressBooksResponse,
ListContactsResponse,
CreateContactResponse,
UpdateContactResponse,
DeleteContactResponse,
CreateAddressBookResponse,
DeleteAddressBookResponse,
)
# Tables models
from .tables import (
Table,
TableColumn,
TableRow,
TableView,
TableSchema,
ListTablesResponse,
GetSchemaResponse,
ReadTableResponse,
CreateRowResponse,
UpdateRowResponse,
DeleteRowResponse,
)
# WebDAV models
from .webdav import (
FileInfo,
DirectoryListing,
ReadFileResponse,
WriteFileResponse,
CreateDirectoryResponse,
DeleteResourceResponse,
)
__all__ = [
# Base models
"BaseResponse",
"ErrorResponse",
"SuccessResponse",
"IdResponse",
"StatusResponse",
# Notes models
"Note",
"NoteSearchResult",
"NotesSettings",
"CreateNoteResponse",
"UpdateNoteResponse",
"DeleteNoteResponse",
"AppendContentResponse",
"SearchNotesResponse",
# Calendar models
"Calendar",
"CalendarEvent",
"CalendarEventSummary",
"CreateEventResponse",
"UpdateEventResponse",
"DeleteEventResponse",
"ListEventsResponse",
"ListCalendarsResponse",
"AvailabilitySlot",
"FindAvailabilityResponse",
"BulkOperationResult",
"BulkOperationResponse",
"CreateMeetingResponse",
"UpcomingEventsResponse",
"ManageCalendarResponse",
# Contacts models
"AddressBook",
"Contact",
"ContactField",
"ListAddressBooksResponse",
"ListContactsResponse",
"CreateContactResponse",
"UpdateContactResponse",
"DeleteContactResponse",
"CreateAddressBookResponse",
"DeleteAddressBookResponse",
# Tables models
"Table",
"TableColumn",
"TableRow",
"TableView",
"TableSchema",
"ListTablesResponse",
"GetSchemaResponse",
"ReadTableResponse",
"CreateRowResponse",
"UpdateRowResponse",
"DeleteRowResponse",
# WebDAV models
"FileInfo",
"DirectoryListing",
"ReadFileResponse",
"WriteFileResponse",
"CreateDirectoryResponse",
"DeleteResourceResponse",
]
+66
View File
@@ -0,0 +1,66 @@
"""Base Pydantic models for common response patterns."""
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, Field, field_serializer
def _utc_now() -> datetime:
"""Generate UTC timestamp for responses."""
return datetime.now(timezone.utc)
class BaseResponse(BaseModel):
"""Base response model for all MCP tool responses."""
success: bool = Field(
default=True, description="Whether the operation was successful"
)
timestamp: datetime = Field(
default_factory=_utc_now, description="Response timestamp"
)
@field_serializer("timestamp")
def serialize_timestamp(self, timestamp: datetime) -> str:
"""Serialize timestamp to RFC3339 format for MCP compliance."""
if timestamp.tzinfo is None:
# If somehow we get a naive datetime, assume UTC
timestamp = timestamp.replace(tzinfo=timezone.utc)
# Use isoformat() which produces RFC3339 compliant format
# For UTC times, replace '+00:00' with 'Z' as preferred by many systems
iso_string = timestamp.isoformat()
if iso_string.endswith("+00:00"):
return iso_string[:-6] + "Z"
return iso_string
class ErrorResponse(BaseResponse):
"""Response model for error cases."""
success: bool = Field(default=False, description="Always False for error responses")
error: str = Field(description="Error message")
error_code: Optional[str] = Field(None, description="Optional error code")
details: Optional[Dict[str, Any]] = Field(
None, description="Additional error details"
)
class SuccessResponse(BaseResponse):
"""Generic success response."""
message: Optional[str] = Field(None, description="Optional success message")
data: Optional[Dict[str, Any]] = Field(None, description="Optional response data")
class IdResponse(BaseResponse):
"""Response model for operations that return a new ID."""
id: Union[int, str] = Field(description="ID of the created or affected resource")
class StatusResponse(BaseResponse):
"""Response model for operations that return just a status."""
status_code: Optional[int] = Field(None, description="HTTP status code")
message: Optional[str] = Field(None, description="Status message")
+182
View File
@@ -0,0 +1,182 @@
"""Pydantic models for Calendar app responses."""
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class Calendar(BaseModel):
"""Model for a Nextcloud calendar."""
name: str = Field(description="Calendar name/ID")
display_name: str = Field(description="Calendar display name")
description: Optional[str] = Field(None, description="Calendar description")
color: Optional[str] = Field(None, description="Calendar color")
href: Optional[str] = Field(None, description="Calendar DAV href")
timezone: Optional[str] = Field(None, description="Calendar timezone")
enabled: bool = Field(default=True, description="Whether calendar is enabled")
ctag: Optional[str] = Field(None, description="Calendar tag for synchronization")
class CalendarEventSummary(BaseModel):
"""Model for calendar event summary (for lists)."""
uid: str = Field(description="Event UID")
summary: str = Field(description="Event summary/title")
start: str = Field(description="Event start datetime (ISO format)")
end: Optional[str] = Field(None, description="Event end datetime (ISO format)")
all_day: bool = Field(default=False, description="Whether event is all-day")
location: Optional[str] = Field(None, description="Event location")
description: Optional[str] = Field(None, description="Event description")
categories: List[str] = Field(default_factory=list, description="Event categories")
status: Optional[str] = Field(
None, description="Event status (CONFIRMED, TENTATIVE, CANCELLED)"
)
class CalendarEvent(CalendarEventSummary):
"""Model for a complete calendar event."""
created: Optional[str] = Field(None, description="Event creation datetime")
last_modified: Optional[str] = Field(None, description="Last modification datetime")
recurring: bool = Field(default=False, description="Whether event is recurring")
recurrence_rule: Optional[str] = Field(None, description="RFC5545 recurrence rule")
recurrence_end: Optional[str] = Field(None, description="Recurrence end date")
attendees: List[str] = Field(
default_factory=list, description="List of attendee email addresses"
)
organizer: Optional[str] = Field(None, description="Event organizer")
priority: Optional[int] = Field(None, description="Event priority (1-9)")
privacy: Optional[str] = Field(None, description="Event privacy level")
url: Optional[str] = Field(None, description="Event URL")
duration_minutes: Optional[int] = Field(
None, description="Event duration in minutes"
)
reminder_minutes: Optional[int] = Field(
None, description="Reminder time in minutes before event"
)
reminder_email: bool = Field(
default=False, description="Whether to send email reminder"
)
color: Optional[str] = Field(None, description="Event color")
etag: Optional[str] = Field(None, description="ETag for versioning")
class CreateEventResponse(BaseResponse):
"""Response model for event creation."""
event: CalendarEvent = Field(description="The created event")
calendar_name: str = Field(
description="Name of the calendar the event was created in"
)
class UpdateEventResponse(BaseResponse):
"""Response model for event updates."""
event: CalendarEvent = Field(description="The updated event")
calendar_name: str = Field(description="Name of the calendar the event belongs to")
class DeleteEventResponse(StatusResponse):
"""Response model for event deletion."""
deleted_uid: str = Field(description="UID of the deleted event")
calendar_name: str = Field(
description="Name of the calendar the event was deleted from"
)
class ListEventsResponse(BaseResponse):
"""Response model for listing events."""
events: List[CalendarEventSummary] = Field(description="List of events")
calendar_name: Optional[str] = Field(
None, description="Calendar name (if filtered to one calendar)"
)
start_date: Optional[str] = Field(None, description="Start date filter applied")
end_date: Optional[str] = Field(None, description="End date filter applied")
total_found: int = Field(description="Total number of events found")
class ListCalendarsResponse(BaseResponse):
"""Response model for listing calendars."""
calendars: List[Calendar] = Field(description="List of available calendars")
total_count: int = Field(description="Total number of calendars")
class AvailabilitySlot(BaseModel):
"""Model for an available time slot."""
start: str = Field(description="Slot start datetime (ISO format)")
end: str = Field(description="Slot end datetime (ISO format)")
duration_minutes: int = Field(description="Slot duration in minutes")
date: str = Field(description="Date of the slot (YYYY-MM-DD)")
class FindAvailabilityResponse(BaseResponse):
"""Response model for finding availability."""
available_slots: List[AvailabilitySlot] = Field(
description="List of available time slots"
)
duration_requested: int = Field(description="Requested duration in minutes")
date_range_start: str = Field(description="Start date of search range")
date_range_end: str = Field(description="End date of search range")
attendees_checked: List[str] = Field(
default_factory=list, description="Attendees checked for availability"
)
business_hours_only: bool = Field(
description="Whether search was limited to business hours"
)
class BulkOperationResult(BaseModel):
"""Model for bulk operation results."""
operation: str = Field(description="Operation performed (update, delete, move)")
events_processed: int = Field(description="Number of events processed")
events_successful: int = Field(
description="Number of events successfully processed"
)
events_failed: int = Field(description="Number of events that failed processing")
failed_events: List[str] = Field(
default_factory=list, description="UIDs of events that failed"
)
errors: List[str] = Field(default_factory=list, description="Error messages")
class BulkOperationResponse(BaseResponse):
"""Response model for bulk operations."""
result: BulkOperationResult = Field(description="Bulk operation result")
class CreateMeetingResponse(CreateEventResponse):
"""Response model for meeting creation (same as event creation)."""
pass
class UpcomingEventsResponse(BaseResponse):
"""Response model for upcoming events."""
events: List[CalendarEventSummary] = Field(description="List of upcoming events")
days_ahead: int = Field(description="Number of days ahead searched")
calendar_name: Optional[str] = Field(
None, description="Calendar name (if filtered to one calendar)"
)
class ManageCalendarResponse(BaseResponse):
"""Response model for calendar management operations."""
action: str = Field(description="Action performed (create, delete, update, list)")
calendar: Optional[Calendar] = Field(None, description="Calendar that was affected")
calendars: Optional[List[Calendar]] = Field(
None, description="List of calendars (for list action)"
)
message: str = Field(description="Success message")
+130
View File
@@ -0,0 +1,130 @@
"""Pydantic models for Contacts app responses."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class AddressBook(BaseModel):
"""Model for a Nextcloud address book."""
uri: str = Field(description="Address book URI")
displayname: str = Field(description="Address book display name")
description: Optional[str] = Field(None, description="Address book description")
ctag: Optional[str] = Field(
None, description="Address book tag for synchronization"
)
class ContactField(BaseModel):
"""Model for a contact field (email, phone, etc.)."""
type: str = Field(description="Field type (e.g., 'email', 'phone', 'address')")
value: str = Field(description="Field value")
label: Optional[str] = Field(None, description="Field label (e.g., 'work', 'home')")
preferred: bool = Field(
default=False, description="Whether this is the preferred field of this type"
)
class Contact(BaseModel):
"""Model for a Nextcloud contact."""
uid: str = Field(description="Contact UID")
fn: str = Field(description="Full name (formatted name)")
given_name: Optional[str] = Field(None, description="Given name")
family_name: Optional[str] = Field(None, description="Family name")
organization: Optional[str] = Field(None, description="Organization")
title: Optional[str] = Field(None, description="Job title")
emails: List[ContactField] = Field(
default_factory=list, description="Email addresses"
)
phones: List[ContactField] = Field(
default_factory=list, description="Phone numbers"
)
addresses: List[ContactField] = Field(default_factory=list, description="Addresses")
urls: List[ContactField] = Field(default_factory=list, description="URLs")
note: Optional[str] = Field(None, description="Notes")
photo: Optional[str] = Field(None, description="Photo URL or base64 data")
birthday: Optional[str] = Field(None, description="Birthday (ISO date format)")
categories: List[str] = Field(
default_factory=list, description="Contact categories"
)
custom_fields: Dict[str, Any] = Field(
default_factory=dict, description="Custom fields"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
@property
def primary_email(self) -> Optional[str]:
"""Get the primary email address."""
if not self.emails:
return None
# Return preferred email if available, otherwise first email
preferred = next(
(email.value for email in self.emails if email.preferred), None
)
return preferred or self.emails[0].value
@property
def primary_phone(self) -> Optional[str]:
"""Get the primary phone number."""
if not self.phones:
return None
# Return preferred phone if available, otherwise first phone
preferred = next(
(phone.value for phone in self.phones if phone.preferred), None
)
return preferred or self.phones[0].value
class ListAddressBooksResponse(BaseResponse):
"""Response model for listing address books."""
addressbooks: List[AddressBook] = Field(
description="List of available address books"
)
total_count: int = Field(description="Total number of address books")
class ListContactsResponse(BaseResponse):
"""Response model for listing contacts."""
contacts: List[Contact] = Field(description="List of contacts")
addressbook: str = Field(description="Address book name")
total_count: int = Field(description="Total number of contacts")
class CreateContactResponse(BaseResponse):
"""Response model for contact creation."""
contact: Contact = Field(description="The created contact")
addressbook: str = Field(description="Address book the contact was created in")
class UpdateContactResponse(BaseResponse):
"""Response model for contact updates."""
contact: Contact = Field(description="The updated contact")
addressbook: str = Field(description="Address book the contact belongs to")
class DeleteContactResponse(StatusResponse):
"""Response model for contact deletion."""
deleted_uid: str = Field(description="UID of the deleted contact")
addressbook: str = Field(description="Address book the contact was deleted from")
class CreateAddressBookResponse(BaseResponse):
"""Response model for address book creation."""
addressbook: AddressBook = Field(description="The created address book")
class DeleteAddressBookResponse(StatusResponse):
"""Response model for address book deletion."""
deleted_name: str = Field(description="Name of the deleted address book")
+82
View File
@@ -0,0 +1,82 @@
"""Pydantic models for Notes app responses."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, IdResponse, StatusResponse
class Note(BaseModel):
"""Model for a Nextcloud note."""
id: int = Field(description="Note ID")
title: str = Field(description="Note title")
content: str = Field(description="Note content in markdown")
category: str = Field(default="", description="Note category")
modified: int = Field(description="Unix timestamp of last modification")
favorite: bool = Field(
default=False, description="Whether note is marked as favorite"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
readonly: bool = Field(default=False, description="Whether note is read-only")
@property
def modified_datetime(self) -> datetime:
"""Convert Unix timestamp to datetime."""
return datetime.fromtimestamp(self.modified)
class NoteSearchResult(BaseModel):
"""Model for note search results (limited fields)."""
id: int = Field(description="Note ID")
title: str = Field(description="Note title")
category: str = Field(default="", description="Note category")
score: Optional[float] = Field(None, description="Search relevance score")
class NotesSettings(BaseModel):
"""Model for Notes app settings."""
notesPath: str = Field(description="Path to notes directory")
fileSuffix: str = Field(description="File suffix for notes")
noteMode: str = Field(description="Note mode setting")
class CreateNoteResponse(IdResponse):
"""Response model for note creation."""
title: str = Field(description="The created note title")
category: str = Field(description="The created note category")
class UpdateNoteResponse(BaseResponse):
"""Response model for note updates."""
id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
class DeleteNoteResponse(StatusResponse):
"""Response model for note deletion."""
deleted_id: int = Field(description="ID of the deleted note")
class AppendContentResponse(BaseResponse):
"""Response model for appending content to a note."""
id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
class SearchNotesResponse(BaseResponse):
"""Response model for note search."""
results: List[NoteSearchResult] = Field(description="Search results")
query: str = Field(description="The search query used")
total_found: int = Field(description="Total number of notes found")
+142
View File
@@ -0,0 +1,142 @@
"""Pydantic models for Tables app responses."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, IdResponse, StatusResponse
class TableColumn(BaseModel):
"""Model for a table column definition."""
id: int = Field(description="Column ID")
title: str = Field(description="Column title")
type: str = Field(description="Column type (text, number, datetime, etc.)")
subtype: Optional[str] = Field(None, description="Column subtype")
mandatory: bool = Field(default=False, description="Whether column is mandatory")
description: Optional[str] = Field(None, description="Column description")
text_default: Optional[str] = Field(None, description="Default text value")
text_allowed_pattern: Optional[str] = Field(
None, description="Allowed text pattern"
)
text_max_length: Optional[int] = Field(None, description="Maximum text length")
number_default: Optional[float] = Field(None, description="Default number value")
number_min: Optional[float] = Field(None, description="Minimum number value")
number_max: Optional[float] = Field(None, description="Maximum number value")
number_decimals: Optional[int] = Field(None, description="Number of decimal places")
datetime_default: Optional[str] = Field(None, description="Default datetime value")
selection_options: List[str] = Field(
default_factory=list, description="Selection options"
)
selection_default: Optional[str] = Field(
None, description="Default selection value"
)
class TableRow(BaseModel):
"""Model for a table row."""
id: int = Field(description="Row ID")
created_by: Optional[str] = Field(None, description="User who created the row")
created_at: Optional[str] = Field(None, description="Row creation timestamp")
last_edit_by: Optional[str] = Field(
None, description="User who last edited the row"
)
last_edit_at: Optional[str] = Field(None, description="Last edit timestamp")
data: Dict[int, Any] = Field(description="Row data keyed by column ID")
class TableView(BaseModel):
"""Model for a table view."""
id: int = Field(description="View ID")
title: str = Field(description="View title")
emoji: Optional[str] = Field(None, description="View emoji")
description: Optional[str] = Field(None, description="View description")
columns: List[int] = Field(
default_factory=list, description="List of column IDs in this view"
)
sort: List[Dict[str, Any]] = Field(
default_factory=list, description="Sort configuration"
)
filter: List[Dict[str, Any]] = Field(
default_factory=list, description="Filter configuration"
)
class Table(BaseModel):
"""Model for a Nextcloud table."""
id: int = Field(description="Table ID")
title: str = Field(description="Table title")
emoji: Optional[str] = Field(None, description="Table emoji")
ownership: str = Field(description="Table ownership")
owner_display_name: str = Field(description="Display name of table owner")
created_by: Optional[str] = Field(None, description="User who created the table")
created_at: Optional[str] = Field(None, description="Table creation timestamp")
last_edit_by: Optional[str] = Field(
None, description="User who last edited the table"
)
last_edit_at: Optional[str] = Field(None, description="Last edit timestamp")
row_count: int = Field(default=0, description="Number of rows in the table")
has_shares: bool = Field(default=False, description="Whether table is shared")
archived: bool = Field(default=False, description="Whether table is archived")
is_shared: bool = Field(
default=False, description="Whether table is shared with current user"
)
on_share_permissions: Optional[Dict[str, Any]] = Field(
None, description="Share permissions"
)
class TableSchema(BaseModel):
"""Model for complete table schema including columns and views."""
table: Table = Field(description="Table information")
columns: List[TableColumn] = Field(description="Table columns")
views: List[TableView] = Field(description="Table views")
class ListTablesResponse(BaseResponse):
"""Response model for listing tables."""
tables: List[Table] = Field(description="List of available tables")
total_count: int = Field(description="Total number of tables")
class GetSchemaResponse(BaseResponse):
"""Response model for getting table schema."""
table_schema: TableSchema = Field(description="Table schema information")
class ReadTableResponse(BaseResponse):
"""Response model for reading table rows."""
rows: List[TableRow] = Field(description="Table rows")
table_id: int = Field(description="Table ID")
total_count: Optional[int] = Field(
None, description="Total number of rows (if known)"
)
offset: Optional[int] = Field(None, description="Offset used for pagination")
limit: Optional[int] = Field(None, description="Limit used for pagination")
class CreateRowResponse(IdResponse):
"""Response model for row creation."""
row: TableRow = Field(description="The created row")
table_id: int = Field(description="Table ID the row was created in")
class UpdateRowResponse(BaseResponse):
"""Response model for row updates."""
row: TableRow = Field(description="The updated row")
class DeleteRowResponse(StatusResponse):
"""Response model for row deletion."""
deleted_id: int = Field(description="ID of the deleted row")
+88
View File
@@ -0,0 +1,88 @@
"""Pydantic models for WebDAV responses."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse, StatusResponse
class FileInfo(BaseModel):
"""Model for file/directory information."""
name: str = Field(description="File/directory name")
path: str = Field(description="Full path")
is_directory: bool = Field(description="Whether this is a directory")
size: Optional[int] = Field(
None, description="File size in bytes (None for directories)"
)
content_type: Optional[str] = Field(None, description="MIME content type")
last_modified: Optional[str] = Field(
None, description="Last modification time (ISO format)"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
@property
def last_modified_datetime(self) -> Optional[datetime]:
"""Convert last modified string to datetime."""
if not self.last_modified:
return None
try:
return datetime.fromisoformat(self.last_modified.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
class DirectoryListing(BaseResponse):
"""Response model for directory listings."""
path: str = Field(description="Directory path")
items: List[FileInfo] = Field(description="Files and directories in the path")
total_count: int = Field(description="Total number of items")
directories_count: int = Field(description="Number of directories")
files_count: int = Field(description="Number of files")
total_size: int = Field(default=0, description="Total size of all files in bytes")
class ReadFileResponse(BaseResponse):
"""Response model for reading file contents."""
path: str = Field(description="File path")
content: str = Field(description="File content (text or base64 for binary)")
content_type: str = Field(description="MIME content type")
size: int = Field(description="File size in bytes")
encoding: Optional[str] = Field(
None, description="Encoding used (e.g., 'base64' for binary files)"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
last_modified: Optional[str] = Field(None, description="Last modification time")
class WriteFileResponse(StatusResponse):
"""Response model for writing files."""
path: str = Field(description="File path that was written")
size: Optional[int] = Field(None, description="Size of the written file")
created: bool = Field(description="Whether a new file was created (vs overwritten)")
class CreateDirectoryResponse(StatusResponse):
"""Response model for directory creation."""
path: str = Field(description="Directory path that was created")
created: bool = Field(
description="Whether directory was created or already existed"
)
class DeleteResourceResponse(StatusResponse):
"""Response model for resource deletion."""
path: str = Field(description="Path that was deleted")
was_directory: bool = Field(
description="Whether the deleted resource was a directory"
)
items_deleted: Optional[int] = Field(
None, description="Number of items deleted (for directories)"
)
+9 -2
View File
@@ -5,6 +5,10 @@ from typing import Optional
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.models.calendar import (
Calendar,
ListCalendarsResponse,
)
logger = logging.getLogger(__name__)
@@ -12,10 +16,13 @@ logger = logging.getLogger(__name__)
def configure_calendar_tools(mcp: FastMCP):
# Calendar tools
@mcp.tool()
async def nc_calendar_list_calendars(ctx: Context):
async def nc_calendar_list_calendars(ctx: Context) -> ListCalendarsResponse:
"""List all available calendars for the user"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.calendar.list_calendars()
calendars_data = await client.calendar.list_calendars()
calendars = [Calendar(**cal_data) for cal_data in calendars_data]
return ListCalendarsResponse(calendars=calendars, total_count=len(calendars))
@mcp.tool()
async def nc_calendar_create_event(
+18 -1
View File
@@ -17,7 +17,7 @@ def configure_contacts_tools(mcp: FastMCP):
@mcp.tool()
async def nc_contacts_list_contacts(ctx: Context, *, addressbook: str):
"""List all addressbooks for the user."""
"""List all contacts in the specified addressbook."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.list_contacts(addressbook=addressbook)
@@ -63,3 +63,20 @@ def configure_contacts_tools(mcp: FastMCP):
"""Delete a contact."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.delete_contact(addressbook=addressbook, uid=uid)
@mcp.tool()
async def nc_contacts_update_contact(
ctx: Context, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties.
Args:
addressbook: The name of the addressbook containing the contact.
uid: The unique ID of the contact to update.
contact_data: A dictionary with the contact's updated details, e.g. {"fn": "Jane Doe", "email": "jane.doe@example.com"}.
etag: Optional ETag for optimistic concurrency control.
"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.update_contact(
addressbook=addressbook, uid=uid, contact_data=contact_data, etag=etag
)
+174 -25
View File
@@ -1,8 +1,20 @@
import logging
from httpx import HTTPStatusError
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.models.base import ErrorResponse
from nextcloud_mcp_server.models.notes import (
Note,
NotesSettings,
CreateNoteResponse,
UpdateNoteResponse,
DeleteNoteResponse,
AppendContentResponse,
SearchNotesResponse,
NoteSearchResult,
)
logger = logging.getLogger(__name__)
@@ -15,7 +27,8 @@ def configure_notes_tools(mcp: FastMCP):
mcp.get_context()
) # https://github.com/modelcontextprotocol/python-sdk/issues/244
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.get_settings()
settings_data = await client.notes.get_settings()
return NotesSettings(**settings_data)
@mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}")
async def nc_notes_get_attachment(note_id: int, attachment_filename: str):
@@ -38,23 +51,63 @@ def configure_notes_tools(mcp: FastMCP):
]
}
@mcp.tool()
async def nc_get_note(note_id: int, ctx: Context):
@mcp.resource("nc://Notes/{note_id}")
async def nc_get_note(note_id: int):
"""Get user note using note id"""
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
ctx: Context = mcp.get_context()
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.get_note(note_id)
try:
note_data = await client.notes.get_note(note_id)
return Note(**note_data)
except HTTPStatusError as e:
if e.response.status_code == 404:
raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
elif e.response.status_code == 403:
raise McpError(
ErrorData(code=-1, message=f"Access denied to note {note_id}")
)
else:
raise McpError(
ErrorData(
code=-1,
message=f"Failed to retrieve note {note_id}: {e.response.reason_phrase}",
)
)
@mcp.tool()
async def nc_notes_create_note(
title: str, content: str, category: str, ctx: Context
):
) -> CreateNoteResponse | ErrorResponse:
"""Create a new note"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.create_note(
title=title,
content=content,
category=category,
)
try:
note_data = await client.notes.create_note(
title=title,
content=content,
category=category,
)
note = Note(**note_data)
return CreateNoteResponse(
id=note.id, title=note.title, category=note.category
)
except HTTPStatusError as e:
if e.response.status_code == 403:
return ErrorResponse(
error="Access denied: insufficient permissions to create notes"
)
elif e.response.status_code == 413:
return ErrorResponse(error="Note content too large")
elif e.response.status_code == 409:
return ErrorResponse(
error=f"A note with title '{title}' already exists in this category"
)
else:
return ErrorResponse(
error=f"Failed to create note: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_update_note(
@@ -64,32 +117,128 @@ def configure_notes_tools(mcp: FastMCP):
content: str | None,
category: str | None,
ctx: Context,
):
) -> UpdateNoteResponse | ErrorResponse:
"""Update an existing note's title, content, or category"""
logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.update(
note_id=note_id,
etag=etag,
title=title,
content=content,
category=category,
)
try:
note_data = await client.notes.update(
note_id=note_id,
etag=etag,
title=title,
content=content,
category=category,
)
note = Note(**note_data)
return UpdateNoteResponse(
id=note.id, title=note.title, category=note.category
)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 412:
return ErrorResponse(
error=f"Note {note_id} has been modified by someone else. Please refresh and try again."
)
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to update note {note_id}"
)
elif e.response.status_code == 413:
return ErrorResponse(error="Updated note content is too large")
else:
return ErrorResponse(
error=f"Failed to update note {note_id}: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_append_content(note_id: int, content: str, ctx: Context):
"""Append content to an existing note with a clear separator"""
async def nc_notes_append_content(
note_id: int, content: str, ctx: Context
) -> AppendContentResponse | ErrorResponse:
"""Append content to an existing note with a clear separator. The tool automatically adds separators between existing and new content - do not include separators in your content."""
logger.info("Appending content to note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.append_content(note_id=note_id, content=content)
try:
note_data = await client.notes.append_content(
note_id=note_id, content=content
)
note = Note(**note_data)
return AppendContentResponse(
id=note.id, title=note.title, category=note.category
)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to modify note {note_id}"
)
elif e.response.status_code == 413:
return ErrorResponse(
error="Content to append would make the note too large"
)
else:
return ErrorResponse(
error=f"Failed to append content to note {note_id}: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_search_notes(query: str, ctx: Context):
async def nc_notes_search_notes(
query: str, ctx: Context
) -> SearchNotesResponse | ErrorResponse:
"""Search notes by title or content, returning only id, title, and category."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_search_notes(query=query)
try:
search_results_raw = await client.notes_search_notes(query=query)
# Convert to NoteSearchResult models, including the _score field
results = [
NoteSearchResult(
id=result["id"],
title=result["title"],
category=result["category"],
score=result.get("_score"), # Include search score if available
)
for result in search_results_raw
]
return SearchNotesResponse(
results=results, query=query, total_found=len(results)
)
except HTTPStatusError as e:
if e.response.status_code == 403:
return ErrorResponse(
error="Access denied: insufficient permissions to search notes"
)
elif e.response.status_code == 400:
return ErrorResponse(error="Invalid search query format")
else:
return ErrorResponse(
error=f"Search failed: server error ({e.response.status_code})"
)
@mcp.tool()
async def nc_notes_delete_note(note_id: int, ctx: Context):
async def nc_notes_delete_note(
note_id: int, ctx: Context
) -> DeleteNoteResponse | ErrorResponse:
"""Delete a note permanently"""
logger.info("Deleting note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes.delete_note(note_id)
try:
await client.notes.delete_note(note_id)
return DeleteNoteResponse(
status_code=200,
message=f"Note {note_id} deleted successfully",
deleted_id=note_id,
)
except HTTPStatusError as e:
if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found")
elif e.response.status_code == 403:
return ErrorResponse(
error=f"Access denied: insufficient permissions to delete note {note_id}"
)
else:
return ErrorResponse(
error=f"Failed to delete note {note_id}: server error ({e.response.status_code})"
)
+2 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.6.1"
version = "0.8.0"
description = ""
authors = [
{name = "Chris Coutinho",email = "chris@coutinho.io"}
@@ -13,6 +13,7 @@ dependencies = [
"pillow (>=11.2.1,<12.0.0)",
"icalendar (>=6.0.0,<7.0.0)",
"pythonvcard4>=0.2.0",
"pydantic>=2.11.4",
]
[tool.pytest.ini_options]
+1 -1
View File
@@ -13,7 +13,7 @@ from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
async def nc_client() -> AsyncGenerator[NextcloudClient, Any]:
"""
Fixture to create a NextcloudClient instance for integration tests.
+186
View File
@@ -0,0 +1,186 @@
"""Test error propagation in the MCP server for various error scenarios."""
import logging
from mcp import ClientSession
from mcp.shared.exceptions import McpError
import pytest
logger = logging.getLogger(__name__)
@pytest.mark.integration
async def test_missing_note_resource_error(nc_mcp_client: ClientSession):
"""Test that accessing a non-existent note resource returns proper error."""
# Try to get a non-existent note via resource - should raise McpError with improved message
with pytest.raises(McpError, match=r"Note 999999 not found"):
await nc_mcp_client.read_resource("nc://Notes/999999")
@pytest.mark.integration
async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession):
"""Test that deleting a non-existent note returns proper error."""
# Try to delete a non-existent note
response = await nc_mcp_client.call_tool(
"nc_notes_delete_note", {"note_id": 999999}
)
logger.info(f"Delete missing note response: {response}")
# Should return structured error response with improved message
assert response is not None
assert (
response.isError is False
) # Tools now return structured responses, not MCP errors
# Check structured content for error
assert "success" in response.structuredContent["result"]
assert response.structuredContent["result"]["success"] is False
assert "Note 999999 not found" in response.structuredContent["result"]["error"]
@pytest.mark.integration
async def test_search_with_empty_query(nc_mcp_client: ClientSession):
"""Test search behavior with empty query."""
# Search with empty query
response = await nc_mcp_client.call_tool("nc_notes_search_notes", {"query": ""})
logger.info(f"Empty search query response: {response}")
# Should return successful response with empty or valid results
assert response is not None
assert response.isError is False
@pytest.mark.integration
async def test_tool_missing_required_parameters(nc_mcp_client: ClientSession):
"""Test calling a tool with missing required parameters."""
# Try to create note with missing parameters
response = await nc_mcp_client.call_tool(
"nc_notes_create_note",
{"title": "Test"}, # Missing content and category
)
logger.info(f"Missing params response: {response}")
# Should return error response for missing required parameters
assert response is not None
assert response.isError is True
assert (
"required" in response.content[0].text.lower()
or "missing" in response.content[0].text.lower()
)
@pytest.mark.integration
async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_client):
"""Test updating a note with invalid ETag."""
# First create a note
note_data = await nc_client.notes.create_note(
title="Test Note for ETag", content="Test content", category=""
)
note_id = note_data["id"]
try:
# Try to update with invalid ETag
response = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": "invalid-etag",
"title": "Updated Title",
"content": None,
"category": None,
},
)
logger.info(f"Invalid ETag response: {response}")
# Should return structured error response with improved message
assert response is not None
assert response.isError is False # Tools now return structured responses
assert "success" in response.structuredContent["result"]
assert response.structuredContent["result"]["success"] is False
assert (
"modified by someone else" in response.structuredContent["result"]["error"]
)
finally:
# Clean up
await nc_client.notes.delete_note(note_id)
@pytest.mark.integration
async def test_calendar_missing_calendar_error(nc_mcp_client: ClientSession):
"""Test calendar operations with non-existent calendar."""
# Try to create event in non-existent calendar
response = await nc_mcp_client.call_tool(
"nc_calendar_create_event",
{
"calendar_name": "non-existent-calendar",
"title": "Test Event",
"start_datetime": "2025-01-15T14:00:00",
},
)
logger.info(f"Non-existent calendar response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@pytest.mark.integration
async def test_webdav_read_missing_file_error(nc_mcp_client: ClientSession):
"""Test WebDAV operations with non-existent file."""
# Try to read a non-existent file
response = await nc_mcp_client.call_tool(
"nc_webdav_read_file", {"path": "non-existent-file.txt"}
)
logger.info(f"Missing file response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@pytest.mark.integration
async def test_tables_missing_table_error(nc_mcp_client: ClientSession):
"""Test Tables operations with non-existent table."""
# Try to get schema of non-existent table
response = await nc_mcp_client.call_tool(
"nc_tables_get_schema", {"table_id": 999999}
)
logger.info(f"Missing table response: {response}")
# Should return structured error response
assert response is not None
# Note: Some modules may not have improved error handling yet
# Check if we have structured content with success=false or isError=true
if (
hasattr(response, "structuredContent")
and response.structuredContent
and "result" in response.structuredContent
):
assert response.structuredContent["result"]["success"] is False
else:
assert response.isError is True
@@ -0,0 +1,436 @@
"""Integration tests for CalDAV and CardDAV field preservation.
This test module demonstrates data loss issues when non-supported fields
are present in calendar events and contacts during round-trip operations.
"""
import logging
import pytest
import uuid
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
@pytest.mark.integration
async def test_calendar_event_custom_fields_preservation(nc_client):
"""Test that demonstrates loss of non-supported iCal fields during round-trip operations."""
calendar_name = "personal"
# Create an event with standard fields
event_data = {
"title": "Test Event with Custom Fields",
"description": "Event to test custom field preservation",
"start_datetime": (datetime.now() + timedelta(days=1)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=1, hours=1)).isoformat(),
"location": "Test Location",
}
# Create the event
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Now manually inject a custom iCal property by creating a new version with raw iCal
# This simulates what would happen if the event was created by another CalDAV client
# with extended properties
custom_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Test Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=1)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=1, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Test Event with Custom Fields
DESCRIPTION:Event to test custom field preservation
LOCATION:Test Location
X-CUSTOM-FIELD:This is a custom field that should be preserved
X-VENDOR-SPECIFIC:Vendor specific data
CATEGORIES:work,testing
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Direct CalDAV PUT to inject the custom iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=custom_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
logger.info(f"Injected custom iCal properties into event {event_uid}")
# Retrieve the event to confirm custom fields are present in raw iCal
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_before = response.text
logger.info("Raw iCal before update:")
logger.info(raw_ical_before)
# Verify custom fields exist in raw iCal
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_before
)
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_before
# Now update the event through the MCP client (simulating normal usage)
update_data = {
"title": "Updated Test Event with Custom Fields",
"description": "Updated description - custom fields should be preserved",
}
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
logger.info(f"Updated event {event_uid} through MCP client")
# Retrieve the event again to see if custom fields survived
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_after = response_after.text
logger.info("Raw iCal after update:")
logger.info(raw_ical_after)
# THIS IS THE TEST THAT SHOULD FAIL - custom fields should be preserved but won't be
try:
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_after
), "Custom field X-CUSTOM-FIELD was lost during round-trip update"
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_after, (
"Custom field X-VENDOR-SPECIFIC was lost during round-trip update"
)
logger.info(
"✓ Custom fields were preserved (unexpected - this should fail with current implementation)"
)
except AssertionError as e:
logger.error(f"✗ Custom fields were lost during round-trip update: {e}")
# Re-raise to show the test failure
raise
finally:
# Cleanup
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")
@pytest.mark.integration
async def test_contact_extended_fields_preservation(nc_client):
"""Test that demonstrates loss of extended vCard fields during round-trip operations."""
addressbook_name = f"test_preservation_{uuid.uuid4().hex[:8]}"
# Create a temporary addressbook
await nc_client.contacts.create_addressbook(
name=addressbook_name, display_name="Test Preservation Addressbook"
)
try:
contact_uid = str(uuid.uuid4())
# Create a contact with minimal data first
basic_contact_data = {
"fn": "John Extended Doe",
"email": "john.extended@example.com",
}
await nc_client.contacts.create_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data=basic_contact_data,
)
logger.info(f"Created basic contact {contact_uid}")
# Now inject a rich vCard with extended fields directly via CardDAV
extended_vcard = f"""BEGIN:VCARD
VERSION:4.0
UID:{contact_uid}
FN:John Extended Doe
N:Doe;John;Extended;;
NICKNAME:Johnny,JD
EMAIL;TYPE=work:john.work@company.com
EMAIL;TYPE=home:john.extended@example.com
TEL;TYPE=cell:+1-555-123-4567
TEL;TYPE=work:+1-555-987-6543
ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA
ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA
ORG:Example Corporation
TITLE:Senior Developer
URL;TYPE=work:https://company.com/john
URL;TYPE=personal:https://johndoe.dev
BDAY:1985-06-15
NOTE:This is a note with important information that should be preserved.
CATEGORIES:colleagues,developers,friends
X-CUSTOM-FIELD:This should be preserved
X-SKYPE:john.doe.skype
X-LINKEDIN:https://linkedin.com/in/johndoe
REV:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VCARD"""
# Direct CardDAV PUT to inject the extended vCard
contact_path = f"/remote.php/dav/addressbooks/users/{nc_client.contacts.username}/{addressbook_name}/{contact_uid}.vcf"
await nc_client.contacts._make_request(
"PUT",
contact_path,
content=extended_vcard,
headers={"Content-Type": "text/vcard; charset=utf-8"},
)
logger.info(f"Injected extended vCard for contact {contact_uid}")
# Retrieve the contact to confirm extended fields are present in raw vCard
response = await nc_client.contacts._make_request("GET", contact_path)
raw_vcard_before = response.text
logger.info("Raw vCard before any operations:")
logger.info(raw_vcard_before)
# Verify extended fields exist in raw vCard
assert "TEL;TYPE=cell:+1-555-123-4567" in raw_vcard_before
assert "ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA" in raw_vcard_before
assert "ORG:Example Corporation" in raw_vcard_before
assert "TITLE:Senior Developer" in raw_vcard_before
assert "X-CUSTOM-FIELD:This should be preserved" in raw_vcard_before
assert "X-LINKEDIN:https://linkedin.com/in/johndoe" in raw_vcard_before
assert "NOTE:This is a note with important information" in raw_vcard_before
# List contacts through the MCP client (this will parse and return limited fields)
contacts = await nc_client.contacts.list_contacts(addressbook=addressbook_name)
our_contact = next((c for c in contacts if c["vcard_id"] == contact_uid), None)
assert our_contact is not None
logger.info("Contact as parsed by MCP client:")
logger.info(our_contact)
# Check what fields are accessible through the parsed contact
parsed_contact = our_contact["contact"]
# These should be available (basic fields that are parsed)
assert parsed_contact["fullname"] == "John Extended Doe"
assert parsed_contact["email"] is not None # Some email should be present
# The raw vCard should still be available in addressdata
raw_addressdata = our_contact["addressdata"]
assert "X-CUSTOM-FIELD:This should be preserved" in raw_addressdata
assert "ORG:Example Corporation" in raw_addressdata
# The key test: Can we update this contact without losing extended field data?
logger.info("Testing contact update preservation...")
# Update the contact through the MCP client with a simple change
try:
await nc_client.contacts.update_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data={"email": "john.updated@example.com"},
)
logger.info("✓ Contact updated successfully")
except Exception as e:
logger.error(f"✗ Failed to update contact: {e}")
raise
# Retrieve the contact again to see if extended fields survived
contacts_after = await nc_client.contacts.list_contacts(
addressbook=addressbook_name
)
updated_contact = next(
(c for c in contacts_after if c["vcard_id"] == contact_uid), None
)
assert updated_contact is not None, "Contact not found after update"
updated_addressdata = updated_contact["addressdata"]
logger.info("Raw vCard after contact update:")
logger.info(updated_addressdata)
# THIS IS THE CRITICAL TEST - extended fields should be preserved during updates
extended_field_checks = [
("ORG:Example Corporation", "organization field"),
("TITLE:Senior Developer", "title field"),
("TEL;TYPE=cell:+1-555-123-4567", "cell phone"),
("TEL;TYPE=work:+1-555-987-6543", "work phone"),
("ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA", "home address"),
("ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA", "work address"),
("URL;TYPE=work;VALUE=URI:https://company.com/john", "work URL"),
("NOTE:This is a note with important information", "note field"),
("CATEGORIES:colleagues,developers,friends", "categories"),
("X-CUSTOM-FIELD:This should be preserved", "custom field"),
("X-LINKEDIN:https://linkedin.com/in/johndoe", "LinkedIn custom field"),
("john.updated@example.com", "updated email"),
]
all_preserved = True
for field_pattern, field_name in extended_field_checks:
if field_pattern in updated_addressdata:
logger.info(f"{field_name} preserved")
else:
logger.error(f"{field_name} was lost during update")
all_preserved = False
# The test should PASS - field preservation should work
assert all_preserved, (
"Contact update lost extended field data - this indicates the preservation mechanism failed"
)
logger.info("🎉 SUCCESS: All extended fields preserved during contact update!")
finally:
# Cleanup
try:
await nc_client.contacts.delete_addressbook(name=addressbook_name)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup addressbook {addressbook_name}: {cleanup_error}"
)
@pytest.mark.integration
async def test_calendar_event_roundtrip_data_loss_demonstration(nc_client):
"""Demonstrates specific data loss scenarios in calendar events."""
calendar_name = "personal"
event_data = {
"title": "Roundtrip Test Event",
"description": "Testing data preservation",
"start_datetime": (datetime.now() + timedelta(days=2)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=2, hours=1)).isoformat(),
}
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Inject additional iCal properties that are valid but not supported by our parser
extended_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Extended Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=2)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=2, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Roundtrip Test Event
DESCRIPTION:Testing data preservation
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
SEQUENCE:1
X-MICROSOFT-CDO-ALLDAYEVENT:FALSE
X-MICROSOFT-CDO-IMPORTANCE:1
X-CUSTOM-MEETING-ID:12345-67890
X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890
ORGANIZER;CN=Test Organizer:mailto:organizer@example.com
COMMENT:This is a comment that should be preserved
LOCATION:Conference Room A
GEO:40.7128;-74.0060
TRANSP:OPAQUE
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Inject the extended iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=extended_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
# Verify extended properties are present
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
original_ical = response.text
# Confirm extended properties exist
extended_properties = [
"SEQUENCE:1",
"X-MICROSOFT-CDO-ALLDAYEVENT:FALSE",
"X-CUSTOM-MEETING-ID:12345-67890",
"X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890",
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
"COMMENT:This is a comment that should be preserved",
"GEO:40.7128;-74.0060",
"TRANSP:OPAQUE",
]
# More flexible patterns for properties that might be reformatted
flexible_patterns = {
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com": [
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
'ORGANIZER;CN="Test Organizer":mailto:organizer@example.com',
],
"GEO:40.7128;-74.0060": [
"GEO:40.7128;-74.0060",
"GEO:40.7128;-74.006", # May lose trailing zero
],
}
for prop in extended_properties:
assert prop in original_ical, (
f"Extended property {prop} not found in original iCal"
)
logger.info("✓ All extended properties confirmed in original iCal")
# Now perform a simple update through MCP
update_data = {"location": "Conference Room B"} # Simple location change
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
# Check what survived the round-trip
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
updated_ical = response_after.text
logger.info("Checking which properties survived the update...")
# Check which extended properties survived
survived = []
lost = []
for prop in extended_properties:
# Check if this property has flexible patterns
if prop in flexible_patterns:
# Check if any of the flexible patterns match
found = any(
pattern in updated_ical for pattern in flexible_patterns[prop]
)
if found:
survived.append(prop)
else:
lost.append(prop)
else:
# Standard exact match
if prop in updated_ical:
survived.append(prop)
else:
lost.append(prop)
logger.info(f"Properties that SURVIVED: {survived}")
logger.error(f"Properties that were LOST: {lost}")
# This test should fail - we expect data loss
assert len(lost) == 0, (
f"Round-trip update lost {len(lost)} extended properties: {lost}"
)
finally:
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")
+43 -25
View File
@@ -24,7 +24,6 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession):
# Verify expected tools are present
expected_tools = [
"nc_get_note",
"nc_notes_create_note",
"nc_notes_update_note",
"nc_notes_append_content",
@@ -137,11 +136,9 @@ async def test_mcp_notes_crud_workflow(
# 3. Read note via MCP
logger.info(f"Reading note via MCP: {note_id}")
read_result = await nc_mcp_client.call_tool("nc_get_note", {"note_id": note_id})
assert read_result.isError is False, (
f"MCP note read failed: {read_result.content}"
)
read_note_data = json.loads(read_result.content[0].text)
read_result = await nc_mcp_client.read_resource(f"nc://Notes/{note_id}")
assert len(read_result.contents) == 1, "Expected exactly one content item"
read_note_data = json.loads(read_result.contents[0].text)
assert read_note_data["title"] == test_title
assert read_note_data["content"] == test_content
@@ -199,14 +196,23 @@ async def test_mcp_notes_crud_workflow(
)
search_notes_text = search_result.content[0].text
logger.info(f"Search result text: {search_notes_text}")
search_notes = json.loads(search_notes_text)
search_response = json.loads(search_notes_text)
# Ensure search_notes is a list
if not isinstance(search_notes, list):
logger.warning(
f"Expected search results to be a list, got: {type(search_notes)}"
)
search_notes = [search_notes] if search_notes else []
# Expect structured response with Pydantic format
assert isinstance(search_response, dict), (
f"Expected search response to be a dict with structured format, got: {type(search_response)}"
)
assert "results" in search_response, (
f"Expected 'results' field in search response, got keys: {list(search_response.keys())}"
)
assert "success" in search_response and search_response["success"], (
f"Expected successful search response, got: {search_response}"
)
search_notes = search_response["results"]
assert isinstance(search_notes, list), (
f"Expected results to be a list, got: {type(search_notes)}"
)
# Find our note in search results
found_note = None
@@ -216,7 +222,7 @@ async def test_mcp_notes_crud_workflow(
break
assert found_note is not None, (
f"Created note not found in search results. Search returned: {search_notes}"
f"Created note not found in search results. Search returned: {search_response}"
)
assert found_note["title"] == updated_title
@@ -431,21 +437,33 @@ async def test_mcp_calendar_workflow(
f"MCP calendar listing failed: {calendars_result.content}"
)
calendars_data = json.loads(calendars_result.content[0].text)
calendars_response = json.loads(calendars_result.content[0].text)
# Debug output to understand the structure
logger.info(f"calendars_data type: {type(calendars_data)}")
logger.info(f"calendars_data content: {calendars_data}")
logger.info(f"calendars_response type: {type(calendars_response)}")
logger.info(f"calendars_response content: {calendars_response}")
# Handle the case where MCP tool returns a single dict instead of a list
if isinstance(calendars_data, dict):
# Single calendar returned as dict instead of list
calendar_name = calendars_data["name"]
elif isinstance(calendars_data, list) and calendars_data:
# Normal case - list of calendars
calendar_name = calendars_data[0]["name"]
else:
# Expect structured response with Pydantic format
assert isinstance(calendars_response, dict), (
f"Expected calendar response to be a dict with structured format, got: {type(calendars_response)}"
)
assert "calendars" in calendars_response, (
f"Expected 'calendars' field in response, got keys: {list(calendars_response.keys())}"
)
assert "success" in calendars_response and calendars_response["success"], (
f"Expected successful calendar response, got: {calendars_response}"
)
calendars_list = calendars_response["calendars"]
assert isinstance(calendars_list, list), (
f"Expected calendars to be a list, got: {type(calendars_list)}"
)
if not calendars_list:
pytest.skip("No calendars available for testing")
# Use the first available calendar
calendar_name = calendars_list[0]["name"]
logger.info(f"Using calendar: {calendar_name}")
# 2. Create event via MCP
+82
View File
@@ -0,0 +1,82 @@
"""Unit tests for Pydantic models and serialization."""
import json
import re
from datetime import datetime, timezone
from nextcloud_mcp_server.models.base import BaseResponse, SuccessResponse
def test_timestamp_format_validation():
"""Test that timestamps in BaseResponse are RFC3339 compliant for MCP validation.
This test should initially fail, demonstrating the timestamp validation error
seen in MCP inspector. MCP expects RFC3339 format with timezone information.
"""
# Create a response object
response = SuccessResponse(message="Test message")
# Serialize to JSON (mimics what MCP inspector sees)
json_str = response.model_dump_json()
data = json.loads(json_str)
timestamp_str = data["timestamp"]
# RFC3339 regex pattern (what MCP expects)
# Format: YYYY-MM-DDTHH:MM:SS[.ffffff][Z|±HH:MM]
rfc3339_pattern = (
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$"
)
# This assertion should FAIL with current implementation
assert re.match(rfc3339_pattern, timestamp_str), (
f"Timestamp '{timestamp_str}' is not RFC3339 compliant. "
f"MCP expects format like '2025-08-30T19:22:58.862377Z' or '2025-08-30T19:22:58.862377+00:00'"
)
def test_base_response_timestamp_is_utc():
"""Test that BaseResponse timestamps are in UTC timezone."""
response = BaseResponse()
# The timestamp should be timezone-aware and in UTC
assert response.timestamp.tzinfo is not None, (
"Timestamp should have timezone information"
)
assert response.timestamp.tzinfo == timezone.utc, (
"Timestamp should be in UTC timezone"
)
def test_serialized_timestamp_ends_with_z_or_offset():
"""Test that serialized timestamps have proper timezone suffix."""
response = BaseResponse()
json_str = response.model_dump_json()
data = json.loads(json_str)
timestamp_str = data["timestamp"]
# Should end with 'Z' (UTC) or timezone offset like '+00:00'
assert timestamp_str.endswith("Z") or re.search(
r"[+-]\d{2}:\d{2}$", timestamp_str
), (
f"Timestamp '{timestamp_str}' should end with 'Z' or timezone offset like '+00:00'"
)
def test_current_broken_format():
"""Test showing the current broken timestamp format that causes MCP validation errors."""
# This demonstrates what the current code produces
current_naive_dt = datetime.now()
current_format = current_naive_dt.isoformat()
# Show that current format lacks timezone info
assert "Z" not in current_format
assert "+" not in current_format
assert "-" not in current_format[-6:] # Check last 6 chars for timezone
print(f"Current broken format: {current_format}")
print(
"This format causes MCP validation errors because it lacks timezone information"
)
Generated
+4 -2
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.11"
[[package]]
@@ -505,13 +505,14 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.6.1"
version = "0.8.0"
source = { editable = "." }
dependencies = [
{ name = "httpx" },
{ name = "icalendar" },
{ name = "mcp", extra = ["cli"] },
{ name = "pillow" },
{ name = "pydantic" },
{ name = "pythonvcard4" },
]
@@ -531,6 +532,7 @@ requires-dist = [
{ name = "icalendar", specifier = ">=6.0.0,<7.0.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.10,<1.11" },
{ name = "pillow", specifier = ">=11.2.1,<12.0.0" },
{ name = "pydantic", specifier = ">=2.11.4" },
{ name = "pythonvcard4", specifier = ">=0.2.0" },
]