From fa002296ffac33d6fb368569e950abbe72b549a9 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 13:23:34 +0200 Subject: [PATCH 1/7] chore(claude): Initialize CLAUDE.md --- CLAUDE.md | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..69db26a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,104 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Development Commands + +### Testing +```bash +# Run all tests +pytest + +# Run integration tests only +uv run pytest -m integration + +# Run tests with coverage +uv run pytest --cov + +# Skip integration tests +uv run pytest -m "not integration" +``` + +### Code Quality +```bash +# Format and lint code +uv run ruff check +uv run ruff format + +# Type checking +# No explicit type checker configured - this is a Python project using ruff for linting +``` + +### Running the Server +```bash +# Local development - load environment variables and run +export $(grep -v '^#' .env | xargs) +mcp run --transport sse nextcloud_mcp_server.app:mcp + +# Docker development environment with Nextcloud instance +docker-compose up + +# Build Docker image +docker build -t nextcloud-mcp-server . +``` + +### Environment Setup +```bash +# Install dependencies +uv sync + +# Install development dependencies +uv sync --group dev +``` + +## Architecture Overview + +This is a Python MCP (Model Context Protocol) server that provides LLM integration with Nextcloud. The architecture follows a layered pattern: + +### Core Components + +- **`nextcloud_mcp_server/app.py`** - Main MCP server entry point using FastMCP framework +- **`nextcloud_mcp_server/client/`** - HTTP client implementations for different Nextcloud APIs +- **`nextcloud_mcp_server/server/`** - MCP tool/resource definitions that expose client functionality +- **`nextcloud_mcp_server/controllers/`** - Business logic controllers (e.g., notes search) + +### Client Architecture + +- **`NextcloudClient`** - Main orchestrating client that manages all app-specific clients +- **`BaseNextcloudClient`** - Abstract base class providing common HTTP functionality and retry logic +- **App-specific clients**: `NotesClient`, `CalendarClient`, `ContactsClient`, `TablesClient`, `WebDAVClient` + +### Server Integration + +Each Nextcloud app has a corresponding server module that: +1. Defines MCP tools using `@mcp.tool()` decorators +2. Defines MCP resources using `@mcp.resource()` decorators +3. Uses the context pattern to access the `NextcloudClient` instance + +### Supported Nextcloud Apps + +- **Notes** - Full CRUD operations and search +- **Calendar** - CalDAV integration with events, recurring events, attendees +- **Contacts** - CardDAV integration with address book operations +- **Tables** - Row-level operations on Nextcloud Tables +- **WebDAV** - Complete file system access + +### Key Patterns + +1. **Environment-based configuration** - Uses `NextcloudClient.from_env()` to load credentials from environment variables +2. **Async/await throughout** - All operations are async using httpx +3. **Retry logic** - `@retry_on_429` decorator handles rate limiting +4. **Context injection** - MCP context provides access to the authenticated client instance +5. **Modular design** - Each Nextcloud app is isolated in its own client/server pair + +### Testing Structure + +- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions +- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities +- Tests are marked with `@pytest.mark.integration` for selective running + +### Configuration Files + +- **`pyproject.toml`** - Python project configuration using uv for dependency management +- **`.env`** (from `env.sample`) - Environment variables for Nextcloud connection +- **`docker-compose.yml`** - Complete development environment with Nextcloud + database From 84ad1958af64e2c396193be9769d992307738494 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 14:25:16 +0200 Subject: [PATCH 2/7] chore: Remove unnecessary logging Migrate pre-commit tasks to local --- .pre-commit-config.yaml | 11 +++++++++-- CLAUDE.md | 4 ++++ nextcloud_mcp_server/client/base.py | 7 +++++++ nextcloud_mcp_server/client/webdav.py | 5 ++--- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f8b3af0..84a9e6c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,8 +6,15 @@ repos: - id: commitizen-branch stages: - pre-push -- repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.12.5 +- repo: local hooks: - id: ruff-check + name: ruff-check + entry: uv run ruff check + language: system + types: [python] - id: ruff-format + name: ruff-format + entry: uv run ruff format + language: system + types: [python] diff --git a/CLAUDE.md b/CLAUDE.md index 69db26a..f7e5e87 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,6 +38,9 @@ mcp run --transport sse nextcloud_mcp_server.app:mcp # Docker development environment with Nextcloud instance docker-compose up +# After code changes, rebuild and restart only the MCP server container +docker-compose up --build -d mcp + # Build Docker image docker build -t nextcloud-mcp-server . ``` @@ -96,6 +99,7 @@ Each Nextcloud app has a corresponding server module that: - **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions - **Fixtures** in `tests/conftest.py` - Shared test setup and utilities - Tests are marked with `@pytest.mark.integration` for selective running +- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests ### Configuration Files diff --git a/nextcloud_mcp_server/client/base.py b/nextcloud_mcp_server/client/base.py index 664353b..3dbabdf 100644 --- a/nextcloud_mcp_server/client/base.py +++ b/nextcloud_mcp_server/client/base.py @@ -39,6 +39,13 @@ def retry_on_429(func): f"429 Client Error: Too Many Requests, Number of attempts: {retries}" ) time.sleep(5) + elif e.response.status_code == 404: + # 404 errors are often expected (e.g., checking if attachments exist) + # Log as debug instead of warning + logger.debug( + f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}" + ) + raise else: logger.warning( f"HTTPStatusError {e.response.status_code}: {e}, Number of attempts: {retries}" diff --git a/nextcloud_mcp_server/client/webdav.py b/nextcloud_mcp_server/client/webdav.py index fbe4f28..d6f3fd9 100644 --- a/nextcloud_mcp_server/client/webdav.py +++ b/nextcloud_mcp_server/client/webdav.py @@ -31,7 +31,7 @@ class WebDAVClient(BaseNextcloudClient): # First try a PROPFIND to verify resource exists propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} try: - propfind_resp = await self._client.request( + propfind_resp = await self._make_request( "PROPFIND", webdav_path, headers=propfind_headers ) logger.debug( @@ -44,8 +44,7 @@ class WebDAVClient(BaseNextcloudClient): # For other errors, continue with deletion attempt # Proceed with deletion - response = await self._client.delete(webdav_path, headers=headers) - response.raise_for_status() + response = await self._make_request("DELETE", webdav_path, headers=headers) logger.debug(f"Successfully deleted WebDAV resource '{path}'") return {"status_code": response.status_code} From 0484167a220e0b879950ae2e01816888cef82942 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 14:27:53 +0200 Subject: [PATCH 3/7] refactor: Use _make_request where available --- nextcloud_mcp_server/client/webdav.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/nextcloud_mcp_server/client/webdav.py b/nextcloud_mcp_server/client/webdav.py index d6f3fd9..0892dc6 100644 --- a/nextcloud_mcp_server/client/webdav.py +++ b/nextcloud_mcp_server/client/webdav.py @@ -126,7 +126,7 @@ class WebDAVClient(BaseNextcloudClient): # First check if we can access WebDAV at all notes_dir_path = f"{webdav_base}/Notes" propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} - notes_dir_response = await self._client.request( + notes_dir_response = await self._make_request( "PROPFIND", notes_dir_path, headers=propfind_headers ) @@ -145,7 +145,7 @@ class WebDAVClient(BaseNextcloudClient): # Ensure the parent directory exists using MKCOL mkcol_headers = {"OCS-APIRequest": "true"} - mkcol_response = await self._client.request( + mkcol_response = await self._make_request( "MKCOL", parent_dir_path, headers=mkcol_headers ) @@ -157,8 +157,8 @@ class WebDAVClient(BaseNextcloudClient): mkcol_response.raise_for_status() # Proceed with the PUT request - response = await self._client.put( - attachment_path, content=content, headers=headers + response = await self._make_request( + "PUT", attachment_path, content=content, headers=headers ) response.raise_for_status() logger.debug( @@ -189,7 +189,7 @@ class WebDAVClient(BaseNextcloudClient): logger.debug(f"Fetching attachment '{filename}' for note {note_id}") try: - response = await self._client.get(attachment_path) + response = await self._make_request("GET", attachment_path) response.raise_for_status() content = response.content @@ -236,7 +236,7 @@ class WebDAVClient(BaseNextcloudClient): headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"} try: - response = await self._client.request( + response = await self._make_request( "PROPFIND", webdav_path, content=propfind_body, headers=headers ) response.raise_for_status() @@ -319,7 +319,7 @@ class WebDAVClient(BaseNextcloudClient): logger.debug(f"Reading file: {path}") try: - response = await self._client.get(webdav_path) + response = await self._make_request("GET", webdav_path) response.raise_for_status() content = response.content @@ -353,8 +353,8 @@ class WebDAVClient(BaseNextcloudClient): headers = {"Content-Type": content_type, "OCS-APIRequest": "true"} try: - response = await self._client.put( - webdav_path, content=content, headers=headers + response = await self._make_request( + "PUT", webdav_path, content=content, headers=headers ) response.raise_for_status() @@ -381,7 +381,7 @@ class WebDAVClient(BaseNextcloudClient): headers = {"OCS-APIRequest": "true"} try: - response = await self._client.request("MKCOL", webdav_path, headers=headers) + response = await self._make_request("MKCOL", webdav_path, headers=headers) response.raise_for_status() logger.debug(f"Successfully created directory '{path}'") From 938376425b64a20b73792fc488304149d8084162 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 14:34:25 +0200 Subject: [PATCH 4/7] chore: Update CLAUDE.md --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index f7e5e87..f7874c7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,7 +7,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ### Testing ```bash # Run all tests -pytest +uv run pytest # Run integration tests only uv run pytest -m integration From 9b00530e8e8d5f319602a1388ad2d38aff1d7e75 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 18:27:32 +0200 Subject: [PATCH 5/7] feat(server): Add structured output to all tool/resource output BREAKING CHANGE --- CLAUDE.md | 10 + nextcloud_mcp_server/models/__init__.py | 146 +++++++++++++++ nextcloud_mcp_server/models/base.py | 70 +++++++ nextcloud_mcp_server/models/calendar.py | 182 +++++++++++++++++++ nextcloud_mcp_server/models/contacts.py | 130 +++++++++++++ nextcloud_mcp_server/models/notes.py | 77 ++++++++ nextcloud_mcp_server/models/tables.py | 142 +++++++++++++++ nextcloud_mcp_server/models/webdav.py | 88 +++++++++ nextcloud_mcp_server/server/calendar.py | 11 +- nextcloud_mcp_server/server/contacts.py | 2 +- nextcloud_mcp_server/server/notes.py | 191 +++++++++++++++++--- pyproject.toml | 1 + tests/integration/test_error_propagation.py | 186 +++++++++++++++++++ tests/integration/test_mcp.py | 68 ++++--- uv.lock | 4 +- 15 files changed, 1255 insertions(+), 53 deletions(-) create mode 100644 nextcloud_mcp_server/models/__init__.py create mode 100644 nextcloud_mcp_server/models/base.py create mode 100644 nextcloud_mcp_server/models/calendar.py create mode 100644 nextcloud_mcp_server/models/contacts.py create mode 100644 nextcloud_mcp_server/models/notes.py create mode 100644 nextcloud_mcp_server/models/tables.py create mode 100644 nextcloud_mcp_server/models/webdav.py create mode 100644 tests/integration/test_error_propagation.py diff --git a/CLAUDE.md b/CLAUDE.md index f7874c7..ed69773 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -101,6 +101,16 @@ Each Nextcloud app has a corresponding server module that: - Tests are marked with `@pytest.mark.integration` for selective running - **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests +#### Testing Best Practices +- **Always restart MCP server** after code changes with `docker-compose up --build -d mcp` +- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work: + - `nc_mcp_client` - MCP client session for tool/resource testing + - `nc_client` - Direct NextcloudClient for setup/cleanup operations + - `temporary_note` - Creates and cleans up test notes automatically + - `temporary_addressbook` - Creates and cleans up test address books + - `temporary_contact` - Creates and cleans up test contacts +- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead + ### Configuration Files - **`pyproject.toml`** - Python project configuration using uv for dependency management diff --git a/nextcloud_mcp_server/models/__init__.py b/nextcloud_mcp_server/models/__init__.py new file mode 100644 index 0000000..f51ed4f --- /dev/null +++ b/nextcloud_mcp_server/models/__init__.py @@ -0,0 +1,146 @@ +"""Pydantic models for structured MCP server responses.""" + +# Base models +from .base import ( + BaseResponse, + ErrorResponse, + SuccessResponse, + PaginatedResponse, + IdResponse, + StatusResponse, +) + +# Notes models +from .notes import ( + Note, + NoteSearchResult, + NotesSettings, + CreateNoteResponse, + UpdateNoteResponse, + DeleteNoteResponse, + AppendContentResponse, + SearchNotesResponse, +) + +# Calendar models +from .calendar import ( + Calendar, + CalendarEvent, + CalendarEventSummary, + CreateEventResponse, + UpdateEventResponse, + DeleteEventResponse, + ListEventsResponse, + ListCalendarsResponse, + AvailabilitySlot, + FindAvailabilityResponse, + BulkOperationResult, + BulkOperationResponse, + CreateMeetingResponse, + UpcomingEventsResponse, + ManageCalendarResponse, +) + +# Contacts models +from .contacts import ( + AddressBook, + Contact, + ContactField, + ListAddressBooksResponse, + ListContactsResponse, + CreateContactResponse, + UpdateContactResponse, + DeleteContactResponse, + CreateAddressBookResponse, + DeleteAddressBookResponse, +) + +# Tables models +from .tables import ( + Table, + TableColumn, + TableRow, + TableView, + TableSchema, + ListTablesResponse, + GetSchemaResponse, + ReadTableResponse, + CreateRowResponse, + UpdateRowResponse, + DeleteRowResponse, +) + +# WebDAV models +from .webdav import ( + FileInfo, + DirectoryListing, + ReadFileResponse, + WriteFileResponse, + CreateDirectoryResponse, + DeleteResourceResponse, +) + +__all__ = [ + # Base models + "BaseResponse", + "ErrorResponse", + "SuccessResponse", + "PaginatedResponse", + "IdResponse", + "StatusResponse", + # Notes models + "Note", + "NoteSearchResult", + "NotesSettings", + "CreateNoteResponse", + "UpdateNoteResponse", + "DeleteNoteResponse", + "AppendContentResponse", + "SearchNotesResponse", + # Calendar models + "Calendar", + "CalendarEvent", + "CalendarEventSummary", + "CreateEventResponse", + "UpdateEventResponse", + "DeleteEventResponse", + "ListEventsResponse", + "ListCalendarsResponse", + "AvailabilitySlot", + "FindAvailabilityResponse", + "BulkOperationResult", + "BulkOperationResponse", + "CreateMeetingResponse", + "UpcomingEventsResponse", + "ManageCalendarResponse", + # Contacts models + "AddressBook", + "Contact", + "ContactField", + "ListAddressBooksResponse", + "ListContactsResponse", + "CreateContactResponse", + "UpdateContactResponse", + "DeleteContactResponse", + "CreateAddressBookResponse", + "DeleteAddressBookResponse", + # Tables models + "Table", + "TableColumn", + "TableRow", + "TableView", + "TableSchema", + "ListTablesResponse", + "GetSchemaResponse", + "ReadTableResponse", + "CreateRowResponse", + "UpdateRowResponse", + "DeleteRowResponse", + # WebDAV models + "FileInfo", + "DirectoryListing", + "ReadFileResponse", + "WriteFileResponse", + "CreateDirectoryResponse", + "DeleteResourceResponse", +] diff --git a/nextcloud_mcp_server/models/base.py b/nextcloud_mcp_server/models/base.py new file mode 100644 index 0000000..d89b4d2 --- /dev/null +++ b/nextcloud_mcp_server/models/base.py @@ -0,0 +1,70 @@ +"""Base Pydantic models for common response patterns.""" + +from datetime import datetime +from typing import Any, Dict, Generic, List, Optional, TypeVar, Union + +from pydantic import BaseModel, Field + + +class BaseResponse(BaseModel): + """Base response model for all MCP tool responses.""" + + model_config = {"json_encoders": {datetime: lambda v: v.isoformat()}} + + success: bool = Field( + default=True, description="Whether the operation was successful" + ) + timestamp: datetime = Field( + default_factory=datetime.now, description="Response timestamp" + ) + + +class ErrorResponse(BaseResponse): + """Response model for error cases.""" + + success: bool = Field(default=False, description="Always False for error responses") + error: str = Field(description="Error message") + error_code: Optional[str] = Field(None, description="Optional error code") + details: Optional[Dict[str, Any]] = Field( + None, description="Additional error details" + ) + + +class SuccessResponse(BaseResponse): + """Generic success response.""" + + message: Optional[str] = Field(None, description="Optional success message") + data: Optional[Dict[str, Any]] = Field(None, description="Optional response data") + + +T = TypeVar("T") + + +class PaginatedResponse(BaseResponse, Generic[T]): + """Generic paginated response model.""" + + items: List[T] = Field(description="List of items") + total_count: Optional[int] = Field( + None, description="Total number of items available" + ) + page: Optional[int] = Field(None, description="Current page number") + page_size: Optional[int] = Field(None, description="Number of items per page") + has_more: Optional[bool] = Field( + None, description="Whether more items are available" + ) + cursor: Optional[str] = Field( + None, description="Cursor for next page (if using cursor-based pagination)" + ) + + +class IdResponse(BaseResponse): + """Response model for operations that return a new ID.""" + + id: Union[int, str] = Field(description="ID of the created or affected resource") + + +class StatusResponse(BaseResponse): + """Response model for operations that return just a status.""" + + status_code: Optional[int] = Field(None, description="HTTP status code") + message: Optional[str] = Field(None, description="Status message") diff --git a/nextcloud_mcp_server/models/calendar.py b/nextcloud_mcp_server/models/calendar.py new file mode 100644 index 0000000..474db42 --- /dev/null +++ b/nextcloud_mcp_server/models/calendar.py @@ -0,0 +1,182 @@ +"""Pydantic models for Calendar app responses.""" + +from typing import List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse, StatusResponse + + +class Calendar(BaseModel): + """Model for a Nextcloud calendar.""" + + name: str = Field(description="Calendar name/ID") + display_name: str = Field(description="Calendar display name") + description: Optional[str] = Field(None, description="Calendar description") + color: Optional[str] = Field(None, description="Calendar color") + href: Optional[str] = Field(None, description="Calendar DAV href") + timezone: Optional[str] = Field(None, description="Calendar timezone") + enabled: bool = Field(default=True, description="Whether calendar is enabled") + ctag: Optional[str] = Field(None, description="Calendar tag for synchronization") + + +class CalendarEventSummary(BaseModel): + """Model for calendar event summary (for lists).""" + + uid: str = Field(description="Event UID") + summary: str = Field(description="Event summary/title") + start: str = Field(description="Event start datetime (ISO format)") + end: Optional[str] = Field(None, description="Event end datetime (ISO format)") + all_day: bool = Field(default=False, description="Whether event is all-day") + location: Optional[str] = Field(None, description="Event location") + description: Optional[str] = Field(None, description="Event description") + categories: List[str] = Field(default_factory=list, description="Event categories") + status: Optional[str] = Field( + None, description="Event status (CONFIRMED, TENTATIVE, CANCELLED)" + ) + + +class CalendarEvent(CalendarEventSummary): + """Model for a complete calendar event.""" + + created: Optional[str] = Field(None, description="Event creation datetime") + last_modified: Optional[str] = Field(None, description="Last modification datetime") + recurring: bool = Field(default=False, description="Whether event is recurring") + recurrence_rule: Optional[str] = Field(None, description="RFC5545 recurrence rule") + recurrence_end: Optional[str] = Field(None, description="Recurrence end date") + attendees: List[str] = Field( + default_factory=list, description="List of attendee email addresses" + ) + organizer: Optional[str] = Field(None, description="Event organizer") + priority: Optional[int] = Field(None, description="Event priority (1-9)") + privacy: Optional[str] = Field(None, description="Event privacy level") + url: Optional[str] = Field(None, description="Event URL") + duration_minutes: Optional[int] = Field( + None, description="Event duration in minutes" + ) + reminder_minutes: Optional[int] = Field( + None, description="Reminder time in minutes before event" + ) + reminder_email: bool = Field( + default=False, description="Whether to send email reminder" + ) + color: Optional[str] = Field(None, description="Event color") + etag: Optional[str] = Field(None, description="ETag for versioning") + + +class CreateEventResponse(BaseResponse): + """Response model for event creation.""" + + event: CalendarEvent = Field(description="The created event") + calendar_name: str = Field( + description="Name of the calendar the event was created in" + ) + + +class UpdateEventResponse(BaseResponse): + """Response model for event updates.""" + + event: CalendarEvent = Field(description="The updated event") + calendar_name: str = Field(description="Name of the calendar the event belongs to") + + +class DeleteEventResponse(StatusResponse): + """Response model for event deletion.""" + + deleted_uid: str = Field(description="UID of the deleted event") + calendar_name: str = Field( + description="Name of the calendar the event was deleted from" + ) + + +class ListEventsResponse(BaseResponse): + """Response model for listing events.""" + + events: List[CalendarEventSummary] = Field(description="List of events") + calendar_name: Optional[str] = Field( + None, description="Calendar name (if filtered to one calendar)" + ) + start_date: Optional[str] = Field(None, description="Start date filter applied") + end_date: Optional[str] = Field(None, description="End date filter applied") + total_found: int = Field(description="Total number of events found") + + +class ListCalendarsResponse(BaseResponse): + """Response model for listing calendars.""" + + calendars: List[Calendar] = Field(description="List of available calendars") + total_count: int = Field(description="Total number of calendars") + + +class AvailabilitySlot(BaseModel): + """Model for an available time slot.""" + + start: str = Field(description="Slot start datetime (ISO format)") + end: str = Field(description="Slot end datetime (ISO format)") + duration_minutes: int = Field(description="Slot duration in minutes") + date: str = Field(description="Date of the slot (YYYY-MM-DD)") + + +class FindAvailabilityResponse(BaseResponse): + """Response model for finding availability.""" + + available_slots: List[AvailabilitySlot] = Field( + description="List of available time slots" + ) + duration_requested: int = Field(description="Requested duration in minutes") + date_range_start: str = Field(description="Start date of search range") + date_range_end: str = Field(description="End date of search range") + attendees_checked: List[str] = Field( + default_factory=list, description="Attendees checked for availability" + ) + business_hours_only: bool = Field( + description="Whether search was limited to business hours" + ) + + +class BulkOperationResult(BaseModel): + """Model for bulk operation results.""" + + operation: str = Field(description="Operation performed (update, delete, move)") + events_processed: int = Field(description="Number of events processed") + events_successful: int = Field( + description="Number of events successfully processed" + ) + events_failed: int = Field(description="Number of events that failed processing") + failed_events: List[str] = Field( + default_factory=list, description="UIDs of events that failed" + ) + errors: List[str] = Field(default_factory=list, description="Error messages") + + +class BulkOperationResponse(BaseResponse): + """Response model for bulk operations.""" + + result: BulkOperationResult = Field(description="Bulk operation result") + + +class CreateMeetingResponse(CreateEventResponse): + """Response model for meeting creation (same as event creation).""" + + pass + + +class UpcomingEventsResponse(BaseResponse): + """Response model for upcoming events.""" + + events: List[CalendarEventSummary] = Field(description="List of upcoming events") + days_ahead: int = Field(description="Number of days ahead searched") + calendar_name: Optional[str] = Field( + None, description="Calendar name (if filtered to one calendar)" + ) + + +class ManageCalendarResponse(BaseResponse): + """Response model for calendar management operations.""" + + action: str = Field(description="Action performed (create, delete, update, list)") + calendar: Optional[Calendar] = Field(None, description="Calendar that was affected") + calendars: Optional[List[Calendar]] = Field( + None, description="List of calendars (for list action)" + ) + message: str = Field(description="Success message") diff --git a/nextcloud_mcp_server/models/contacts.py b/nextcloud_mcp_server/models/contacts.py new file mode 100644 index 0000000..54230b9 --- /dev/null +++ b/nextcloud_mcp_server/models/contacts.py @@ -0,0 +1,130 @@ +"""Pydantic models for Contacts app responses.""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse, StatusResponse + + +class AddressBook(BaseModel): + """Model for a Nextcloud address book.""" + + uri: str = Field(description="Address book URI") + displayname: str = Field(description="Address book display name") + description: Optional[str] = Field(None, description="Address book description") + ctag: Optional[str] = Field( + None, description="Address book tag for synchronization" + ) + + +class ContactField(BaseModel): + """Model for a contact field (email, phone, etc.).""" + + type: str = Field(description="Field type (e.g., 'email', 'phone', 'address')") + value: str = Field(description="Field value") + label: Optional[str] = Field(None, description="Field label (e.g., 'work', 'home')") + preferred: bool = Field( + default=False, description="Whether this is the preferred field of this type" + ) + + +class Contact(BaseModel): + """Model for a Nextcloud contact.""" + + uid: str = Field(description="Contact UID") + fn: str = Field(description="Full name (formatted name)") + given_name: Optional[str] = Field(None, description="Given name") + family_name: Optional[str] = Field(None, description="Family name") + organization: Optional[str] = Field(None, description="Organization") + title: Optional[str] = Field(None, description="Job title") + emails: List[ContactField] = Field( + default_factory=list, description="Email addresses" + ) + phones: List[ContactField] = Field( + default_factory=list, description="Phone numbers" + ) + addresses: List[ContactField] = Field(default_factory=list, description="Addresses") + urls: List[ContactField] = Field(default_factory=list, description="URLs") + note: Optional[str] = Field(None, description="Notes") + photo: Optional[str] = Field(None, description="Photo URL or base64 data") + birthday: Optional[str] = Field(None, description="Birthday (ISO date format)") + categories: List[str] = Field( + default_factory=list, description="Contact categories" + ) + custom_fields: Dict[str, Any] = Field( + default_factory=dict, description="Custom fields" + ) + etag: Optional[str] = Field(None, description="ETag for versioning") + + @property + def primary_email(self) -> Optional[str]: + """Get the primary email address.""" + if not self.emails: + return None + # Return preferred email if available, otherwise first email + preferred = next( + (email.value for email in self.emails if email.preferred), None + ) + return preferred or self.emails[0].value + + @property + def primary_phone(self) -> Optional[str]: + """Get the primary phone number.""" + if not self.phones: + return None + # Return preferred phone if available, otherwise first phone + preferred = next( + (phone.value for phone in self.phones if phone.preferred), None + ) + return preferred or self.phones[0].value + + +class ListAddressBooksResponse(BaseResponse): + """Response model for listing address books.""" + + addressbooks: List[AddressBook] = Field( + description="List of available address books" + ) + total_count: int = Field(description="Total number of address books") + + +class ListContactsResponse(BaseResponse): + """Response model for listing contacts.""" + + contacts: List[Contact] = Field(description="List of contacts") + addressbook: str = Field(description="Address book name") + total_count: int = Field(description="Total number of contacts") + + +class CreateContactResponse(BaseResponse): + """Response model for contact creation.""" + + contact: Contact = Field(description="The created contact") + addressbook: str = Field(description="Address book the contact was created in") + + +class UpdateContactResponse(BaseResponse): + """Response model for contact updates.""" + + contact: Contact = Field(description="The updated contact") + addressbook: str = Field(description="Address book the contact belongs to") + + +class DeleteContactResponse(StatusResponse): + """Response model for contact deletion.""" + + deleted_uid: str = Field(description="UID of the deleted contact") + addressbook: str = Field(description="Address book the contact was deleted from") + + +class CreateAddressBookResponse(BaseResponse): + """Response model for address book creation.""" + + addressbook: AddressBook = Field(description="The created address book") + + +class DeleteAddressBookResponse(StatusResponse): + """Response model for address book deletion.""" + + deleted_name: str = Field(description="Name of the deleted address book") diff --git a/nextcloud_mcp_server/models/notes.py b/nextcloud_mcp_server/models/notes.py new file mode 100644 index 0000000..975aee7 --- /dev/null +++ b/nextcloud_mcp_server/models/notes.py @@ -0,0 +1,77 @@ +"""Pydantic models for Notes app responses.""" + +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse, IdResponse, StatusResponse + + +class Note(BaseModel): + """Model for a Nextcloud note.""" + + id: int = Field(description="Note ID") + title: str = Field(description="Note title") + content: str = Field(description="Note content in markdown") + category: str = Field(default="", description="Note category") + modified: int = Field(description="Unix timestamp of last modification") + favorite: bool = Field( + default=False, description="Whether note is marked as favorite" + ) + etag: Optional[str] = Field(None, description="ETag for versioning") + readonly: bool = Field(default=False, description="Whether note is read-only") + + @property + def modified_datetime(self) -> datetime: + """Convert Unix timestamp to datetime.""" + return datetime.fromtimestamp(self.modified) + + +class NoteSearchResult(BaseModel): + """Model for note search results (limited fields).""" + + id: int = Field(description="Note ID") + title: str = Field(description="Note title") + category: str = Field(default="", description="Note category") + score: Optional[float] = Field(None, description="Search relevance score") + + +class NotesSettings(BaseModel): + """Model for Notes app settings.""" + + notesPath: str = Field(description="Path to notes directory") + fileSuffix: str = Field(description="File suffix for notes") + noteMode: str = Field(description="Note mode setting") + + +class CreateNoteResponse(IdResponse): + """Response model for note creation.""" + + note: Note = Field(description="The created note") + + +class UpdateNoteResponse(BaseResponse): + """Response model for note updates.""" + + note: Note = Field(description="The updated note") + + +class DeleteNoteResponse(StatusResponse): + """Response model for note deletion.""" + + deleted_id: int = Field(description="ID of the deleted note") + + +class AppendContentResponse(BaseResponse): + """Response model for appending content to a note.""" + + note: Note = Field(description="The updated note after appending content") + + +class SearchNotesResponse(BaseResponse): + """Response model for note search.""" + + results: List[NoteSearchResult] = Field(description="Search results") + query: str = Field(description="The search query used") + total_found: int = Field(description="Total number of notes found") diff --git a/nextcloud_mcp_server/models/tables.py b/nextcloud_mcp_server/models/tables.py new file mode 100644 index 0000000..32e49c2 --- /dev/null +++ b/nextcloud_mcp_server/models/tables.py @@ -0,0 +1,142 @@ +"""Pydantic models for Tables app responses.""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse, IdResponse, StatusResponse + + +class TableColumn(BaseModel): + """Model for a table column definition.""" + + id: int = Field(description="Column ID") + title: str = Field(description="Column title") + type: str = Field(description="Column type (text, number, datetime, etc.)") + subtype: Optional[str] = Field(None, description="Column subtype") + mandatory: bool = Field(default=False, description="Whether column is mandatory") + description: Optional[str] = Field(None, description="Column description") + text_default: Optional[str] = Field(None, description="Default text value") + text_allowed_pattern: Optional[str] = Field( + None, description="Allowed text pattern" + ) + text_max_length: Optional[int] = Field(None, description="Maximum text length") + number_default: Optional[float] = Field(None, description="Default number value") + number_min: Optional[float] = Field(None, description="Minimum number value") + number_max: Optional[float] = Field(None, description="Maximum number value") + number_decimals: Optional[int] = Field(None, description="Number of decimal places") + datetime_default: Optional[str] = Field(None, description="Default datetime value") + selection_options: List[str] = Field( + default_factory=list, description="Selection options" + ) + selection_default: Optional[str] = Field( + None, description="Default selection value" + ) + + +class TableRow(BaseModel): + """Model for a table row.""" + + id: int = Field(description="Row ID") + created_by: Optional[str] = Field(None, description="User who created the row") + created_at: Optional[str] = Field(None, description="Row creation timestamp") + last_edit_by: Optional[str] = Field( + None, description="User who last edited the row" + ) + last_edit_at: Optional[str] = Field(None, description="Last edit timestamp") + data: Dict[int, Any] = Field(description="Row data keyed by column ID") + + +class TableView(BaseModel): + """Model for a table view.""" + + id: int = Field(description="View ID") + title: str = Field(description="View title") + emoji: Optional[str] = Field(None, description="View emoji") + description: Optional[str] = Field(None, description="View description") + columns: List[int] = Field( + default_factory=list, description="List of column IDs in this view" + ) + sort: List[Dict[str, Any]] = Field( + default_factory=list, description="Sort configuration" + ) + filter: List[Dict[str, Any]] = Field( + default_factory=list, description="Filter configuration" + ) + + +class Table(BaseModel): + """Model for a Nextcloud table.""" + + id: int = Field(description="Table ID") + title: str = Field(description="Table title") + emoji: Optional[str] = Field(None, description="Table emoji") + ownership: str = Field(description="Table ownership") + owner_display_name: str = Field(description="Display name of table owner") + created_by: Optional[str] = Field(None, description="User who created the table") + created_at: Optional[str] = Field(None, description="Table creation timestamp") + last_edit_by: Optional[str] = Field( + None, description="User who last edited the table" + ) + last_edit_at: Optional[str] = Field(None, description="Last edit timestamp") + row_count: int = Field(default=0, description="Number of rows in the table") + has_shares: bool = Field(default=False, description="Whether table is shared") + archived: bool = Field(default=False, description="Whether table is archived") + is_shared: bool = Field( + default=False, description="Whether table is shared with current user" + ) + on_share_permissions: Optional[Dict[str, Any]] = Field( + None, description="Share permissions" + ) + + +class TableSchema(BaseModel): + """Model for complete table schema including columns and views.""" + + table: Table = Field(description="Table information") + columns: List[TableColumn] = Field(description="Table columns") + views: List[TableView] = Field(description="Table views") + + +class ListTablesResponse(BaseResponse): + """Response model for listing tables.""" + + tables: List[Table] = Field(description="List of available tables") + total_count: int = Field(description="Total number of tables") + + +class GetSchemaResponse(BaseResponse): + """Response model for getting table schema.""" + + table_schema: TableSchema = Field(description="Table schema information") + + +class ReadTableResponse(BaseResponse): + """Response model for reading table rows.""" + + rows: List[TableRow] = Field(description="Table rows") + table_id: int = Field(description="Table ID") + total_count: Optional[int] = Field( + None, description="Total number of rows (if known)" + ) + offset: Optional[int] = Field(None, description="Offset used for pagination") + limit: Optional[int] = Field(None, description="Limit used for pagination") + + +class CreateRowResponse(IdResponse): + """Response model for row creation.""" + + row: TableRow = Field(description="The created row") + table_id: int = Field(description="Table ID the row was created in") + + +class UpdateRowResponse(BaseResponse): + """Response model for row updates.""" + + row: TableRow = Field(description="The updated row") + + +class DeleteRowResponse(StatusResponse): + """Response model for row deletion.""" + + deleted_id: int = Field(description="ID of the deleted row") diff --git a/nextcloud_mcp_server/models/webdav.py b/nextcloud_mcp_server/models/webdav.py new file mode 100644 index 0000000..bce6174 --- /dev/null +++ b/nextcloud_mcp_server/models/webdav.py @@ -0,0 +1,88 @@ +"""Pydantic models for WebDAV responses.""" + +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse, StatusResponse + + +class FileInfo(BaseModel): + """Model for file/directory information.""" + + name: str = Field(description="File/directory name") + path: str = Field(description="Full path") + is_directory: bool = Field(description="Whether this is a directory") + size: Optional[int] = Field( + None, description="File size in bytes (None for directories)" + ) + content_type: Optional[str] = Field(None, description="MIME content type") + last_modified: Optional[str] = Field( + None, description="Last modification time (ISO format)" + ) + etag: Optional[str] = Field(None, description="ETag for versioning") + + @property + def last_modified_datetime(self) -> Optional[datetime]: + """Convert last modified string to datetime.""" + if not self.last_modified: + return None + try: + return datetime.fromisoformat(self.last_modified.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return None + + +class DirectoryListing(BaseResponse): + """Response model for directory listings.""" + + path: str = Field(description="Directory path") + items: List[FileInfo] = Field(description="Files and directories in the path") + total_count: int = Field(description="Total number of items") + directories_count: int = Field(description="Number of directories") + files_count: int = Field(description="Number of files") + total_size: int = Field(default=0, description="Total size of all files in bytes") + + +class ReadFileResponse(BaseResponse): + """Response model for reading file contents.""" + + path: str = Field(description="File path") + content: str = Field(description="File content (text or base64 for binary)") + content_type: str = Field(description="MIME content type") + size: int = Field(description="File size in bytes") + encoding: Optional[str] = Field( + None, description="Encoding used (e.g., 'base64' for binary files)" + ) + etag: Optional[str] = Field(None, description="ETag for versioning") + last_modified: Optional[str] = Field(None, description="Last modification time") + + +class WriteFileResponse(StatusResponse): + """Response model for writing files.""" + + path: str = Field(description="File path that was written") + size: Optional[int] = Field(None, description="Size of the written file") + created: bool = Field(description="Whether a new file was created (vs overwritten)") + + +class CreateDirectoryResponse(StatusResponse): + """Response model for directory creation.""" + + path: str = Field(description="Directory path that was created") + created: bool = Field( + description="Whether directory was created or already existed" + ) + + +class DeleteResourceResponse(StatusResponse): + """Response model for resource deletion.""" + + path: str = Field(description="Path that was deleted") + was_directory: bool = Field( + description="Whether the deleted resource was a directory" + ) + items_deleted: Optional[int] = Field( + None, description="Number of items deleted (for directories)" + ) diff --git a/nextcloud_mcp_server/server/calendar.py b/nextcloud_mcp_server/server/calendar.py index 5e71a2c..c68c73d 100644 --- a/nextcloud_mcp_server/server/calendar.py +++ b/nextcloud_mcp_server/server/calendar.py @@ -5,6 +5,10 @@ from typing import Optional from mcp.server.fastmcp import Context, FastMCP from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.models.calendar import ( + Calendar, + ListCalendarsResponse, +) logger = logging.getLogger(__name__) @@ -12,10 +16,13 @@ logger = logging.getLogger(__name__) def configure_calendar_tools(mcp: FastMCP): # Calendar tools @mcp.tool() - async def nc_calendar_list_calendars(ctx: Context): + async def nc_calendar_list_calendars(ctx: Context) -> ListCalendarsResponse: """List all available calendars for the user""" client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.calendar.list_calendars() + calendars_data = await client.calendar.list_calendars() + + calendars = [Calendar(**cal_data) for cal_data in calendars_data] + return ListCalendarsResponse(calendars=calendars, total_count=len(calendars)) @mcp.tool() async def nc_calendar_create_event( diff --git a/nextcloud_mcp_server/server/contacts.py b/nextcloud_mcp_server/server/contacts.py index 3ee9844..370219e 100644 --- a/nextcloud_mcp_server/server/contacts.py +++ b/nextcloud_mcp_server/server/contacts.py @@ -17,7 +17,7 @@ def configure_contacts_tools(mcp: FastMCP): @mcp.tool() async def nc_contacts_list_contacts(ctx: Context, *, addressbook: str): - """List all addressbooks for the user.""" + """List all contacts in the specified addressbook.""" client: NextcloudClient = ctx.request_context.lifespan_context.client return await client.contacts.list_contacts(addressbook=addressbook) diff --git a/nextcloud_mcp_server/server/notes.py b/nextcloud_mcp_server/server/notes.py index 38b3f29..3a4beb1 100644 --- a/nextcloud_mcp_server/server/notes.py +++ b/nextcloud_mcp_server/server/notes.py @@ -1,8 +1,20 @@ import logging +from httpx import HTTPStatusError from mcp.server.fastmcp import Context, FastMCP from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.models.base import ErrorResponse +from nextcloud_mcp_server.models.notes import ( + Note, + NotesSettings, + CreateNoteResponse, + UpdateNoteResponse, + DeleteNoteResponse, + AppendContentResponse, + SearchNotesResponse, + NoteSearchResult, +) logger = logging.getLogger(__name__) @@ -15,7 +27,8 @@ def configure_notes_tools(mcp: FastMCP): mcp.get_context() ) # https://github.com/modelcontextprotocol/python-sdk/issues/244 client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.get_settings() + settings_data = await client.notes.get_settings() + return NotesSettings(**settings_data) @mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}") async def nc_notes_get_attachment(note_id: int, attachment_filename: str): @@ -38,23 +51,61 @@ def configure_notes_tools(mcp: FastMCP): ] } - @mcp.tool() - async def nc_get_note(note_id: int, ctx: Context): + @mcp.resource("nc://Notes/{note_id}") + async def nc_get_note(note_id: int): """Get user note using note id""" + from mcp.shared.exceptions import McpError + from mcp.types import ErrorData + + ctx: Context = mcp.get_context() client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.get_note(note_id) + try: + note_data = await client.notes.get_note(note_id) + return Note(**note_data) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found")) + elif e.response.status_code == 403: + raise McpError( + ErrorData(code=-1, message=f"Access denied to note {note_id}") + ) + else: + raise McpError( + ErrorData( + code=-1, + message=f"Failed to retrieve note {note_id}: {e.response.reason_phrase}", + ) + ) @mcp.tool() async def nc_notes_create_note( title: str, content: str, category: str, ctx: Context - ): + ) -> CreateNoteResponse | ErrorResponse: """Create a new note""" client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.create_note( - title=title, - content=content, - category=category, - ) + try: + note_data = await client.notes.create_note( + title=title, + content=content, + category=category, + ) + note = Note(**note_data) + return CreateNoteResponse(id=note.id, note=note) + except HTTPStatusError as e: + if e.response.status_code == 403: + return ErrorResponse( + error="Access denied: insufficient permissions to create notes" + ) + elif e.response.status_code == 413: + return ErrorResponse(error="Note content too large") + elif e.response.status_code == 409: + return ErrorResponse( + error=f"A note with title '{title}' already exists in this category" + ) + else: + return ErrorResponse( + error=f"Failed to create note: server error ({e.response.status_code})" + ) @mcp.tool() async def nc_notes_update_note( @@ -64,32 +115,124 @@ def configure_notes_tools(mcp: FastMCP): content: str | None, category: str | None, ctx: Context, - ): + ) -> UpdateNoteResponse | ErrorResponse: + """Update an existing note's title, content, or category""" logger.info("Updating note %s", note_id) client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.update( - note_id=note_id, - etag=etag, - title=title, - content=content, - category=category, - ) + try: + note_data = await client.notes.update( + note_id=note_id, + etag=etag, + title=title, + content=content, + category=category, + ) + note = Note(**note_data) + return UpdateNoteResponse(note=note) + except HTTPStatusError as e: + if e.response.status_code == 404: + return ErrorResponse(error=f"Note {note_id} not found") + elif e.response.status_code == 412: + return ErrorResponse( + error=f"Note {note_id} has been modified by someone else. Please refresh and try again." + ) + elif e.response.status_code == 403: + return ErrorResponse( + error=f"Access denied: insufficient permissions to update note {note_id}" + ) + elif e.response.status_code == 413: + return ErrorResponse(error="Updated note content is too large") + else: + return ErrorResponse( + error=f"Failed to update note {note_id}: server error ({e.response.status_code})" + ) @mcp.tool() - async def nc_notes_append_content(note_id: int, content: str, ctx: Context): + async def nc_notes_append_content( + note_id: int, content: str, ctx: Context + ) -> AppendContentResponse | ErrorResponse: """Append content to an existing note with a clear separator""" logger.info("Appending content to note %s", note_id) client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.append_content(note_id=note_id, content=content) + try: + note_data = await client.notes.append_content( + note_id=note_id, content=content + ) + note = Note(**note_data) + return AppendContentResponse(note=note) + except HTTPStatusError as e: + if e.response.status_code == 404: + return ErrorResponse(error=f"Note {note_id} not found") + elif e.response.status_code == 403: + return ErrorResponse( + error=f"Access denied: insufficient permissions to modify note {note_id}" + ) + elif e.response.status_code == 413: + return ErrorResponse( + error="Content to append would make the note too large" + ) + else: + return ErrorResponse( + error=f"Failed to append content to note {note_id}: server error ({e.response.status_code})" + ) @mcp.tool() - async def nc_notes_search_notes(query: str, ctx: Context): + async def nc_notes_search_notes( + query: str, ctx: Context + ) -> SearchNotesResponse | ErrorResponse: """Search notes by title or content, returning only id, title, and category.""" client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes_search_notes(query=query) + try: + search_results_raw = await client.notes_search_notes(query=query) + + # Convert to NoteSearchResult models, including the _score field + results = [ + NoteSearchResult( + id=result["id"], + title=result["title"], + category=result["category"], + score=result.get("_score"), # Include search score if available + ) + for result in search_results_raw + ] + + return SearchNotesResponse( + results=results, query=query, total_found=len(results) + ) + except HTTPStatusError as e: + if e.response.status_code == 403: + return ErrorResponse( + error="Access denied: insufficient permissions to search notes" + ) + elif e.response.status_code == 400: + return ErrorResponse(error="Invalid search query format") + else: + return ErrorResponse( + error=f"Search failed: server error ({e.response.status_code})" + ) @mcp.tool() - async def nc_notes_delete_note(note_id: int, ctx: Context): + async def nc_notes_delete_note( + note_id: int, ctx: Context + ) -> DeleteNoteResponse | ErrorResponse: + """Delete a note permanently""" logger.info("Deleting note %s", note_id) client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.delete_note(note_id) + try: + await client.notes.delete_note(note_id) + return DeleteNoteResponse( + status_code=200, + message=f"Note {note_id} deleted successfully", + deleted_id=note_id, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + return ErrorResponse(error=f"Note {note_id} not found") + elif e.response.status_code == 403: + return ErrorResponse( + error=f"Access denied: insufficient permissions to delete note {note_id}" + ) + else: + return ErrorResponse( + error=f"Failed to delete note {note_id}: server error ({e.response.status_code})" + ) diff --git a/pyproject.toml b/pyproject.toml index 9d869ea..09bb01d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "pillow (>=11.2.1,<12.0.0)", "icalendar (>=6.0.0,<7.0.0)", "pythonvcard4>=0.2.0", + "pydantic>=2.11.4", ] [tool.pytest.ini_options] diff --git a/tests/integration/test_error_propagation.py b/tests/integration/test_error_propagation.py new file mode 100644 index 0000000..45d5e33 --- /dev/null +++ b/tests/integration/test_error_propagation.py @@ -0,0 +1,186 @@ +"""Test error propagation in the MCP server for various error scenarios.""" + +import logging +from mcp import ClientSession +from mcp.shared.exceptions import McpError + +import pytest + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +async def test_missing_note_resource_error(nc_mcp_client: ClientSession): + """Test that accessing a non-existent note resource returns proper error.""" + # Try to get a non-existent note via resource - should raise McpError with improved message + with pytest.raises(McpError, match=r"Note 999999 not found"): + await nc_mcp_client.read_resource("nc://Notes/999999") + + +@pytest.mark.integration +async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession): + """Test that deleting a non-existent note returns proper error.""" + # Try to delete a non-existent note + response = await nc_mcp_client.call_tool( + "nc_notes_delete_note", {"note_id": 999999} + ) + + logger.info(f"Delete missing note response: {response}") + + # Should return structured error response with improved message + assert response is not None + assert ( + response.isError is False + ) # Tools now return structured responses, not MCP errors + + # Check structured content for error + assert "success" in response.structuredContent["result"] + assert response.structuredContent["result"]["success"] is False + assert "Note 999999 not found" in response.structuredContent["result"]["error"] + + +@pytest.mark.integration +async def test_search_with_empty_query(nc_mcp_client: ClientSession): + """Test search behavior with empty query.""" + # Search with empty query + response = await nc_mcp_client.call_tool("nc_notes_search_notes", {"query": ""}) + + logger.info(f"Empty search query response: {response}") + + # Should return successful response with empty or valid results + assert response is not None + assert response.isError is False + + +@pytest.mark.integration +async def test_tool_missing_required_parameters(nc_mcp_client: ClientSession): + """Test calling a tool with missing required parameters.""" + # Try to create note with missing parameters + response = await nc_mcp_client.call_tool( + "nc_notes_create_note", + {"title": "Test"}, # Missing content and category + ) + logger.info(f"Missing params response: {response}") + + # Should return error response for missing required parameters + assert response is not None + assert response.isError is True + assert ( + "required" in response.content[0].text.lower() + or "missing" in response.content[0].text.lower() + ) + + +@pytest.mark.integration +async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_client): + """Test updating a note with invalid ETag.""" + # First create a note + note_data = await nc_client.notes.create_note( + title="Test Note for ETag", content="Test content", category="" + ) + note_id = note_data["id"] + + try: + # Try to update with invalid ETag + response = await nc_mcp_client.call_tool( + "nc_notes_update_note", + { + "note_id": note_id, + "etag": "invalid-etag", + "title": "Updated Title", + "content": None, + "category": None, + }, + ) + + logger.info(f"Invalid ETag response: {response}") + + # Should return structured error response with improved message + assert response is not None + assert response.isError is False # Tools now return structured responses + assert "success" in response.structuredContent["result"] + assert response.structuredContent["result"]["success"] is False + assert ( + "modified by someone else" in response.structuredContent["result"]["error"] + ) + + finally: + # Clean up + await nc_client.notes.delete_note(note_id) + + +@pytest.mark.integration +async def test_calendar_missing_calendar_error(nc_mcp_client: ClientSession): + """Test calendar operations with non-existent calendar.""" + # Try to create event in non-existent calendar + response = await nc_mcp_client.call_tool( + "nc_calendar_create_event", + { + "calendar_name": "non-existent-calendar", + "title": "Test Event", + "start_datetime": "2025-01-15T14:00:00", + }, + ) + + logger.info(f"Non-existent calendar response: {response}") + + # Should return structured error response + assert response is not None + # Note: Some modules may not have improved error handling yet + # Check if we have structured content with success=false or isError=true + if ( + hasattr(response, "structuredContent") + and response.structuredContent + and "result" in response.structuredContent + ): + assert response.structuredContent["result"]["success"] is False + else: + assert response.isError is True + + +@pytest.mark.integration +async def test_webdav_read_missing_file_error(nc_mcp_client: ClientSession): + """Test WebDAV operations with non-existent file.""" + # Try to read a non-existent file + response = await nc_mcp_client.call_tool( + "nc_webdav_read_file", {"path": "non-existent-file.txt"} + ) + + logger.info(f"Missing file response: {response}") + + # Should return structured error response + assert response is not None + # Note: Some modules may not have improved error handling yet + # Check if we have structured content with success=false or isError=true + if ( + hasattr(response, "structuredContent") + and response.structuredContent + and "result" in response.structuredContent + ): + assert response.structuredContent["result"]["success"] is False + else: + assert response.isError is True + + +@pytest.mark.integration +async def test_tables_missing_table_error(nc_mcp_client: ClientSession): + """Test Tables operations with non-existent table.""" + # Try to get schema of non-existent table + response = await nc_mcp_client.call_tool( + "nc_tables_get_schema", {"table_id": 999999} + ) + + logger.info(f"Missing table response: {response}") + + # Should return structured error response + assert response is not None + # Note: Some modules may not have improved error handling yet + # Check if we have structured content with success=false or isError=true + if ( + hasattr(response, "structuredContent") + and response.structuredContent + and "result" in response.structuredContent + ): + assert response.structuredContent["result"]["success"] is False + else: + assert response.isError is True diff --git a/tests/integration/test_mcp.py b/tests/integration/test_mcp.py index 9eba0d4..06b89ff 100644 --- a/tests/integration/test_mcp.py +++ b/tests/integration/test_mcp.py @@ -24,7 +24,6 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession): # Verify expected tools are present expected_tools = [ - "nc_get_note", "nc_notes_create_note", "nc_notes_update_note", "nc_notes_append_content", @@ -137,11 +136,9 @@ async def test_mcp_notes_crud_workflow( # 3. Read note via MCP logger.info(f"Reading note via MCP: {note_id}") - read_result = await nc_mcp_client.call_tool("nc_get_note", {"note_id": note_id}) - assert read_result.isError is False, ( - f"MCP note read failed: {read_result.content}" - ) - read_note_data = json.loads(read_result.content[0].text) + read_result = await nc_mcp_client.read_resource(f"nc://Notes/{note_id}") + assert len(read_result.contents) == 1, "Expected exactly one content item" + read_note_data = json.loads(read_result.contents[0].text) assert read_note_data["title"] == test_title assert read_note_data["content"] == test_content @@ -199,14 +196,23 @@ async def test_mcp_notes_crud_workflow( ) search_notes_text = search_result.content[0].text logger.info(f"Search result text: {search_notes_text}") - search_notes = json.loads(search_notes_text) + search_response = json.loads(search_notes_text) - # Ensure search_notes is a list - if not isinstance(search_notes, list): - logger.warning( - f"Expected search results to be a list, got: {type(search_notes)}" - ) - search_notes = [search_notes] if search_notes else [] + # Expect structured response with Pydantic format + assert isinstance(search_response, dict), ( + f"Expected search response to be a dict with structured format, got: {type(search_response)}" + ) + assert "results" in search_response, ( + f"Expected 'results' field in search response, got keys: {list(search_response.keys())}" + ) + assert "success" in search_response and search_response["success"], ( + f"Expected successful search response, got: {search_response}" + ) + + search_notes = search_response["results"] + assert isinstance(search_notes, list), ( + f"Expected results to be a list, got: {type(search_notes)}" + ) # Find our note in search results found_note = None @@ -216,7 +222,7 @@ async def test_mcp_notes_crud_workflow( break assert found_note is not None, ( - f"Created note not found in search results. Search returned: {search_notes}" + f"Created note not found in search results. Search returned: {search_response}" ) assert found_note["title"] == updated_title @@ -431,21 +437,33 @@ async def test_mcp_calendar_workflow( f"MCP calendar listing failed: {calendars_result.content}" ) - calendars_data = json.loads(calendars_result.content[0].text) + calendars_response = json.loads(calendars_result.content[0].text) # Debug output to understand the structure - logger.info(f"calendars_data type: {type(calendars_data)}") - logger.info(f"calendars_data content: {calendars_data}") + logger.info(f"calendars_response type: {type(calendars_response)}") + logger.info(f"calendars_response content: {calendars_response}") - # Handle the case where MCP tool returns a single dict instead of a list - if isinstance(calendars_data, dict): - # Single calendar returned as dict instead of list - calendar_name = calendars_data["name"] - elif isinstance(calendars_data, list) and calendars_data: - # Normal case - list of calendars - calendar_name = calendars_data[0]["name"] - else: + # Expect structured response with Pydantic format + assert isinstance(calendars_response, dict), ( + f"Expected calendar response to be a dict with structured format, got: {type(calendars_response)}" + ) + assert "calendars" in calendars_response, ( + f"Expected 'calendars' field in response, got keys: {list(calendars_response.keys())}" + ) + assert "success" in calendars_response and calendars_response["success"], ( + f"Expected successful calendar response, got: {calendars_response}" + ) + + calendars_list = calendars_response["calendars"] + assert isinstance(calendars_list, list), ( + f"Expected calendars to be a list, got: {type(calendars_list)}" + ) + + if not calendars_list: pytest.skip("No calendars available for testing") + + # Use the first available calendar + calendar_name = calendars_list[0]["name"] logger.info(f"Using calendar: {calendar_name}") # 2. Create event via MCP diff --git a/uv.lock b/uv.lock index 6f5bb55..1df9d10 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" [[package]] @@ -512,6 +512,7 @@ dependencies = [ { name = "icalendar" }, { name = "mcp", extra = ["cli"] }, { name = "pillow" }, + { name = "pydantic" }, { name = "pythonvcard4" }, ] @@ -531,6 +532,7 @@ requires-dist = [ { name = "icalendar", specifier = ">=6.0.0,<7.0.0" }, { name = "mcp", extras = ["cli"], specifier = ">=1.10,<1.11" }, { name = "pillow", specifier = ">=11.2.1,<12.0.0" }, + { name = "pydantic", specifier = ">=2.11.4" }, { name = "pythonvcard4", specifier = ">=0.2.0" }, ] From 1cc65f0160644d4a254d0ebba2abda64305bb21d Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 18:31:45 +0200 Subject: [PATCH 6/7] chore: Remove unused model --- nextcloud_mcp_server/models/__init__.py | 2 -- nextcloud_mcp_server/models/base.py | 22 +--------------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/nextcloud_mcp_server/models/__init__.py b/nextcloud_mcp_server/models/__init__.py index f51ed4f..4a06d95 100644 --- a/nextcloud_mcp_server/models/__init__.py +++ b/nextcloud_mcp_server/models/__init__.py @@ -5,7 +5,6 @@ from .base import ( BaseResponse, ErrorResponse, SuccessResponse, - PaginatedResponse, IdResponse, StatusResponse, ) @@ -85,7 +84,6 @@ __all__ = [ "BaseResponse", "ErrorResponse", "SuccessResponse", - "PaginatedResponse", "IdResponse", "StatusResponse", # Notes models diff --git a/nextcloud_mcp_server/models/base.py b/nextcloud_mcp_server/models/base.py index d89b4d2..d9303a7 100644 --- a/nextcloud_mcp_server/models/base.py +++ b/nextcloud_mcp_server/models/base.py @@ -1,7 +1,7 @@ """Base Pydantic models for common response patterns.""" from datetime import datetime -from typing import Any, Dict, Generic, List, Optional, TypeVar, Union +from typing import Any, Dict, Optional, Union from pydantic import BaseModel, Field @@ -37,26 +37,6 @@ class SuccessResponse(BaseResponse): data: Optional[Dict[str, Any]] = Field(None, description="Optional response data") -T = TypeVar("T") - - -class PaginatedResponse(BaseResponse, Generic[T]): - """Generic paginated response model.""" - - items: List[T] = Field(description="List of items") - total_count: Optional[int] = Field( - None, description="Total number of items available" - ) - page: Optional[int] = Field(None, description="Current page number") - page_size: Optional[int] = Field(None, description="Number of items per page") - has_more: Optional[bool] = Field( - None, description="Whether more items are available" - ) - cursor: Optional[str] = Field( - None, description="Cursor for next page (if using cursor-based pagination)" - ) - - class IdResponse(BaseResponse): """Response model for operations that return a new ID.""" From 4cf5f2a95a70885c53c7d01a750323f5dea33971 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 19:14:26 +0200 Subject: [PATCH 7/7] feat(client): Preserve fields when modifying contacts/calendar resources --- nextcloud_mcp_server/client/calendar.py | 147 ++++++- nextcloud_mcp_server/client/contacts.py | 201 +++++++++ nextcloud_mcp_server/server/contacts.py | 17 + tests/integration/test_field_preservation.py | 436 +++++++++++++++++++ 4 files changed, 790 insertions(+), 11 deletions(-) create mode 100644 tests/integration/test_field_preservation.py diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py index 1057b80..98830d3 100644 --- a/nextcloud_mcp_server/client/calendar.py +++ b/nextcloud_mcp_server/client/calendar.py @@ -238,27 +238,33 @@ class CalendarClient(BaseNextcloudClient): event_data: Dict[str, Any], etag: str = "", ) -> Dict[str, Any]: - """Update an existing calendar event.""" + """Update an existing calendar event while preserving all existing properties.""" event_filename = f"{event_uid}.ics" event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" - # Get existing event data to merge with updates - existing_event_data = {} + # Get raw iCal content to preserve all properties including extended ones + raw_ical_content = "" if not etag: try: - existing_event_data, current_etag = await self.get_event( + raw_ical_content, current_etag = await self._get_raw_ical( calendar_name, event_uid ) etag = current_etag except Exception: - # Continue without etag if we can't get it - pass + # Fall back to creating new iCal if we can't get existing + logger.warning( + f"Could not fetch existing iCal for {event_uid}, creating new" + ) + raw_ical_content = "" - # Merge existing data with new data (new data takes precedence) - merged_data = {**existing_event_data, **event_data} - - # Create updated iCalendar event - ical_content = self._create_ical_event(merged_data, event_uid) + # Create updated iCalendar event preserving existing properties + if raw_ical_content: + ical_content = self._merge_ical_properties( + raw_ical_content, event_data, event_uid + ) + else: + # Fallback to creating new iCal if we couldn't get existing + ical_content = self._create_ical_event(event_data, event_uid) headers = { "Content-Type": "text/calendar; charset=utf-8", @@ -949,3 +955,122 @@ class CalendarClient(BaseNextcloudClient): except Exception as e: logger.error(f"Error deleting calendar {calendar_name}: {e}") raise + + async def _get_raw_ical( + self, calendar_name: str, event_uid: str + ) -> Tuple[str, str]: + """Get raw iCal content for an event without parsing.""" + event_filename = f"{event_uid}.ics" + event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + + headers = {"Accept": "text/calendar"} + + try: + response = await self._make_request("GET", event_path, headers=headers) + etag = response.headers.get("etag", "") + return response.text, etag + except Exception as e: + logger.error(f"Error getting raw iCal for {event_uid}: {e}") + raise + + def _merge_ical_properties( + self, raw_ical: str, event_data: Dict[str, Any], event_uid: str + ) -> str: + """Merge new event data into existing raw iCal while preserving all properties.""" + try: + # Parse existing iCal + cal = Calendar.from_ical(raw_ical) + + # Find the VEVENT component + for component in cal.walk(): + if component.name == "VEVENT": + # Update only the properties that were provided in event_data + if "title" in event_data: + component["SUMMARY"] = event_data["title"] + if "description" in event_data: + component["DESCRIPTION"] = event_data["description"] + if "location" in event_data: + component["LOCATION"] = event_data["location"] + if "status" in event_data: + component["STATUS"] = event_data["status"].upper() + if "priority" in event_data: + component["PRIORITY"] = event_data["priority"] + if "privacy" in event_data: + component["CLASS"] = event_data["privacy"].upper() + if "url" in event_data: + component["URL"] = event_data["url"] + + # Handle dates + if "start_datetime" in event_data: + start_str = event_data["start_datetime"] + all_day = event_data.get("all_day", False) + if all_day: + start_date = dt.datetime.fromisoformat( + start_str.split("T")[0] + ).date() + component["DTSTART"] = start_date + else: + start_dt = dt.datetime.fromisoformat( + start_str.replace("Z", "+00:00") + ) + component["DTSTART"] = start_dt + + if "end_datetime" in event_data: + end_str = event_data["end_datetime"] + all_day = event_data.get("all_day", False) + if all_day: + end_date = dt.datetime.fromisoformat( + end_str.split("T")[0] + ).date() + component["DTEND"] = end_date + else: + end_dt = dt.datetime.fromisoformat( + end_str.replace("Z", "+00:00") + ) + component["DTEND"] = end_dt + + # Handle categories + if "categories" in event_data: + categories = event_data["categories"] + if categories: + component["CATEGORIES"] = categories.split(",") + + # Handle recurrence + if "recurring" in event_data: + if event_data["recurring"] and "recurrence_rule" in event_data: + recurrence_rule = event_data["recurrence_rule"] + if recurrence_rule: + component["RRULE"] = vRecur.from_ical(recurrence_rule) + elif not event_data["recurring"]: + # Remove recurrence if set to False + if "RRULE" in component: + del component["RRULE"] + + # Handle attendees + if "attendees" in event_data: + attendees = event_data["attendees"] + # Remove existing attendees + component.pop("ATTENDEE", None) + if attendees: + for email in attendees.split(","): + if email.strip(): + component.add("ATTENDEE", f"mailto:{email.strip()}") + + # Update timestamps in proper iCal format + from icalendar import vDDDTypes + + now = dt.datetime.now(dt.UTC) + component["LAST-MODIFIED"] = vDDDTypes(now) + component["DTSTAMP"] = vDDDTypes(now) + + # Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.) + # by not touching them - they remain in the component + + break + + return cal.to_ical().decode("utf-8") + + except Exception as e: + logger.error(f"Error merging iCal properties: {e}") + # Fallback to creating new iCal + return self._create_ical_event(event_data, event_uid) diff --git a/nextcloud_mcp_server/client/contacts.py b/nextcloud_mcp_server/client/contacts.py index 11ecc3f..460a884 100644 --- a/nextcloud_mcp_server/client/contacts.py +++ b/nextcloud_mcp_server/client/contacts.py @@ -143,6 +143,50 @@ class ContactsClient(BaseNextcloudClient): url = f"{carddav_path}/{addressbook}/{uid}.vcf" await self._make_request("DELETE", url) + async def update_contact( + self, *, addressbook: str, uid: str, contact_data: dict, etag: str = "" + ): + """Update an existing contact while preserving all existing properties.""" + carddav_path = self._get_carddav_base_path() + url = f"{carddav_path}/{addressbook}/{uid}.vcf" + + # Get raw vCard content to preserve all properties including extended ones + raw_vcard_content = "" + if not etag: + try: + raw_vcard_content, current_etag = await self._get_raw_vcard( + addressbook, uid + ) + etag = current_etag + except Exception: + # Fall back to creating new vCard if we can't get existing + logger.warning( + f"Could not fetch existing vCard for {uid}, creating new" + ) + raw_vcard_content = "" + + # Create updated vCard preserving existing properties + if raw_vcard_content: + vcard_content = self._merge_vcard_properties( + raw_vcard_content, contact_data, uid + ) + else: + # Fallback to creating new vCard if we couldn't get existing + contact = Contact(fn=contact_data.get("fn"), uid=uid) + if "email" in contact_data: + contact.email = [{"value": contact_data["email"], "type": ["HOME"]}] + if "tel" in contact_data: + contact.tel = [{"value": contact_data["tel"], "type": ["HOME"]}] + vcard_content = contact.to_vcard() + + headers = { + "Content-Type": "text/vcard; charset=utf-8", + } + if etag: + headers["If-Match"] = etag + + await self._make_request("PUT", url, content=vcard_content, headers=headers) + async def list_contacts(self, *, addressbook: str): """List all available contacts for addressbook.""" @@ -233,3 +277,160 @@ class ContactsClient(BaseNextcloudClient): logger.debug(f"Found {len(contacts)} contacts") return contacts + + async def _get_raw_vcard(self, addressbook: str, uid: str) -> tuple[str, str]: + """Get raw vCard content for a contact without parsing.""" + carddav_path = self._get_carddav_base_path() + url = f"{carddav_path}/{addressbook}/{uid}.vcf" + + try: + response = await self._make_request("GET", url) + etag = response.headers.get("etag", "") + return response.text, etag + except Exception as e: + logger.error(f"Error getting raw vCard for {uid}: {e}") + raise + + def _merge_vcard_properties( + self, raw_vcard: str, contact_data: dict, uid: str + ) -> str: + """Merge new contact data into existing raw vCard while preserving all properties.""" + try: + # Instead of using pythonvCard4 which has formatting issues, + # let's do a simple text-based merge to preserve exact formatting + + # Start with the original vCard + lines = raw_vcard.strip().split("\n") + updated_lines = [] + + # Track what we've updated to avoid duplicates + updated_properties = set() + + for line in lines: + line = line.strip() + if not line: + continue + + # Skip the END:VCARD line for now + if line == "END:VCARD": + continue + + property_name = line.split(":")[0].split(";")[0] + + # Handle updates for specific properties + if property_name == "FN" and "fn" in contact_data: + updated_lines.append(f"FN:{contact_data['fn']}") + updated_properties.add("fn") + elif property_name == "EMAIL" and "email" in contact_data: + # Replace first email with new one, preserve others + if "email" not in updated_properties: + if isinstance(contact_data["email"], str): + # Try to preserve the original format as much as possible + if ";TYPE=" in line: + type_part = line.split(";TYPE=")[1].split(":")[0] + updated_lines.append( + f"EMAIL;TYPE={type_part}:{contact_data['email']}" + ) + else: + updated_lines.append(f"EMAIL:{contact_data['email']}") + updated_properties.add("email") + else: + # Keep additional emails unchanged + updated_lines.append(line) + elif property_name == "TEL" and "tel" in contact_data: + # Similar handling for phone numbers + if "tel" not in updated_properties: + if isinstance(contact_data["tel"], str): + if ";TYPE=" in line: + type_part = line.split(";TYPE=")[1].split(":")[0] + updated_lines.append( + f"TEL;TYPE={type_part}:{contact_data['tel']}" + ) + else: + updated_lines.append(f"TEL:{contact_data['tel']}") + updated_properties.add("tel") + else: + # Keep additional phone numbers unchanged + updated_lines.append(line) + elif property_name == "NOTE" and "note" in contact_data: + updated_lines.append(f"NOTE:{contact_data['note']}") + updated_properties.add("note") + elif property_name == "NICKNAME" and "nickname" in contact_data: + nickname_value = contact_data["nickname"] + if isinstance(nickname_value, list): + nickname_value = ",".join(nickname_value) + updated_lines.append(f"NICKNAME:{nickname_value}") + updated_properties.add("nickname") + elif property_name == "BDAY" and "bday" in contact_data: + updated_lines.append(f"BDAY:{contact_data['bday']}") + updated_properties.add("bday") + elif property_name == "CATEGORIES" and "categories" in contact_data: + categories_value = contact_data["categories"] + if isinstance(categories_value, list): + categories_value = ",".join(categories_value) + updated_lines.append(f"CATEGORIES:{categories_value}") + updated_properties.add("categories") + elif property_name == "ORG" and ( + "org" in contact_data or "organization" in contact_data + ): + org_value = contact_data.get("org") or contact_data.get( + "organization" + ) + updated_lines.append(f"ORG:{org_value}") + updated_properties.add("org") + elif property_name == "TITLE" and "title" in contact_data: + updated_lines.append(f"TITLE:{contact_data['title']}") + updated_properties.add("title") + else: + # Keep all other properties unchanged (preserves all extended/custom fields) + updated_lines.append(line) + + # Add any new properties that weren't in the original vCard + for key, value in contact_data.items(): + if key not in updated_properties: + if key == "fn": + updated_lines.append(f"FN:{value}") + elif key == "email" and isinstance(value, str): + updated_lines.append(f"EMAIL:{value}") + elif key == "tel" and isinstance(value, str): + updated_lines.append(f"TEL:{value}") + elif key == "note": + updated_lines.append(f"NOTE:{value}") + elif key == "nickname": + nickname_value = ( + value if isinstance(value, str) else ",".join(value) + ) + updated_lines.append(f"NICKNAME:{nickname_value}") + elif key == "bday": + updated_lines.append(f"BDAY:{value}") + elif key == "categories": + categories_value = ( + value if isinstance(value, str) else ",".join(value) + ) + updated_lines.append(f"CATEGORIES:{categories_value}") + elif key in ["org", "organization"]: + updated_lines.append(f"ORG:{value}") + elif key == "title": + updated_lines.append(f"TITLE:{value}") + + # Add the END:VCARD line + updated_lines.append("END:VCARD") + + # Join all lines + return "\n".join(updated_lines) + + except Exception as e: + logger.error(f"Error merging vCard properties: {e}") + # Fallback to creating basic vCard matching Nextcloud format + basic_vcard = f"""BEGIN:VCARD +VERSION:3.0 +UID:{uid} +FN:{contact_data.get("fn", "Unknown")}""" + + if "email" in contact_data: + basic_vcard += f"\nEMAIL:{contact_data['email']}" + if "tel" in contact_data: + basic_vcard += f"\nTEL:{contact_data['tel']}" + + basic_vcard += "\nEND:VCARD" + return basic_vcard diff --git a/nextcloud_mcp_server/server/contacts.py b/nextcloud_mcp_server/server/contacts.py index 370219e..78a63ef 100644 --- a/nextcloud_mcp_server/server/contacts.py +++ b/nextcloud_mcp_server/server/contacts.py @@ -63,3 +63,20 @@ def configure_contacts_tools(mcp: FastMCP): """Delete a contact.""" client: NextcloudClient = ctx.request_context.lifespan_context.client return await client.contacts.delete_contact(addressbook=addressbook, uid=uid) + + @mcp.tool() + async def nc_contacts_update_contact( + ctx: Context, *, addressbook: str, uid: str, contact_data: dict, etag: str = "" + ): + """Update an existing contact while preserving all existing properties. + + Args: + addressbook: The name of the addressbook containing the contact. + uid: The unique ID of the contact to update. + contact_data: A dictionary with the contact's updated details, e.g. {"fn": "Jane Doe", "email": "jane.doe@example.com"}. + etag: Optional ETag for optimistic concurrency control. + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.contacts.update_contact( + addressbook=addressbook, uid=uid, contact_data=contact_data, etag=etag + ) diff --git a/tests/integration/test_field_preservation.py b/tests/integration/test_field_preservation.py new file mode 100644 index 0000000..62bb473 --- /dev/null +++ b/tests/integration/test_field_preservation.py @@ -0,0 +1,436 @@ +"""Integration tests for CalDAV and CardDAV field preservation. + +This test module demonstrates data loss issues when non-supported fields +are present in calendar events and contacts during round-trip operations. +""" + +import logging +import pytest +import uuid +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +async def test_calendar_event_custom_fields_preservation(nc_client): + """Test that demonstrates loss of non-supported iCal fields during round-trip operations.""" + calendar_name = "personal" + + # Create an event with standard fields + event_data = { + "title": "Test Event with Custom Fields", + "description": "Event to test custom field preservation", + "start_datetime": (datetime.now() + timedelta(days=1)).isoformat(), + "end_datetime": (datetime.now() + timedelta(days=1, hours=1)).isoformat(), + "location": "Test Location", + } + + # Create the event + result = await nc_client.calendar.create_event(calendar_name, event_data) + event_uid = result["uid"] + + try: + # Now manually inject a custom iCal property by creating a new version with raw iCal + # This simulates what would happen if the event was created by another CalDAV client + # with extended properties + custom_ical = f"""BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//Test Client//EN +BEGIN:VEVENT +UID:{event_uid} +DTSTART:{(datetime.now() + timedelta(days=1)).strftime("%Y%m%dT%H%M%SZ")} +DTEND:{(datetime.now() + timedelta(days=1, hours=1)).strftime("%Y%m%dT%H%M%SZ")} +SUMMARY:Test Event with Custom Fields +DESCRIPTION:Event to test custom field preservation +LOCATION:Test Location +X-CUSTOM-FIELD:This is a custom field that should be preserved +X-VENDOR-SPECIFIC:Vendor specific data +CATEGORIES:work,testing +STATUS:CONFIRMED +PRIORITY:5 +CLASS:PUBLIC +CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VEVENT +END:VCALENDAR""" + + # Direct CalDAV PUT to inject the custom iCal + event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics" + await nc_client.calendar._make_request( + "PUT", + event_path, + content=custom_ical, + headers={"Content-Type": "text/calendar; charset=utf-8"}, + ) + + logger.info(f"Injected custom iCal properties into event {event_uid}") + + # Retrieve the event to confirm custom fields are present in raw iCal + response = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + raw_ical_before = response.text + + logger.info("Raw iCal before update:") + logger.info(raw_ical_before) + + # Verify custom fields exist in raw iCal + assert ( + "X-CUSTOM-FIELD:This is a custom field that should be preserved" + in raw_ical_before + ) + assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_before + + # Now update the event through the MCP client (simulating normal usage) + update_data = { + "title": "Updated Test Event with Custom Fields", + "description": "Updated description - custom fields should be preserved", + } + + await nc_client.calendar.update_event(calendar_name, event_uid, update_data) + logger.info(f"Updated event {event_uid} through MCP client") + + # Retrieve the event again to see if custom fields survived + response_after = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + raw_ical_after = response_after.text + + logger.info("Raw iCal after update:") + logger.info(raw_ical_after) + + # THIS IS THE TEST THAT SHOULD FAIL - custom fields should be preserved but won't be + try: + assert ( + "X-CUSTOM-FIELD:This is a custom field that should be preserved" + in raw_ical_after + ), "Custom field X-CUSTOM-FIELD was lost during round-trip update" + assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_after, ( + "Custom field X-VENDOR-SPECIFIC was lost during round-trip update" + ) + logger.info( + "✓ Custom fields were preserved (unexpected - this should fail with current implementation)" + ) + except AssertionError as e: + logger.error(f"✗ Custom fields were lost during round-trip update: {e}") + # Re-raise to show the test failure + raise + + finally: + # Cleanup + try: + await nc_client.calendar.delete_event(calendar_name, event_uid) + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}") + + +@pytest.mark.integration +async def test_contact_extended_fields_preservation(nc_client): + """Test that demonstrates loss of extended vCard fields during round-trip operations.""" + addressbook_name = f"test_preservation_{uuid.uuid4().hex[:8]}" + + # Create a temporary addressbook + await nc_client.contacts.create_addressbook( + name=addressbook_name, display_name="Test Preservation Addressbook" + ) + + try: + contact_uid = str(uuid.uuid4()) + + # Create a contact with minimal data first + basic_contact_data = { + "fn": "John Extended Doe", + "email": "john.extended@example.com", + } + + await nc_client.contacts.create_contact( + addressbook=addressbook_name, + uid=contact_uid, + contact_data=basic_contact_data, + ) + + logger.info(f"Created basic contact {contact_uid}") + + # Now inject a rich vCard with extended fields directly via CardDAV + extended_vcard = f"""BEGIN:VCARD +VERSION:4.0 +UID:{contact_uid} +FN:John Extended Doe +N:Doe;John;Extended;; +NICKNAME:Johnny,JD +EMAIL;TYPE=work:john.work@company.com +EMAIL;TYPE=home:john.extended@example.com +TEL;TYPE=cell:+1-555-123-4567 +TEL;TYPE=work:+1-555-987-6543 +ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA +ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA +ORG:Example Corporation +TITLE:Senior Developer +URL;TYPE=work:https://company.com/john +URL;TYPE=personal:https://johndoe.dev +BDAY:1985-06-15 +NOTE:This is a note with important information that should be preserved. +CATEGORIES:colleagues,developers,friends +X-CUSTOM-FIELD:This should be preserved +X-SKYPE:john.doe.skype +X-LINKEDIN:https://linkedin.com/in/johndoe +REV:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VCARD""" + + # Direct CardDAV PUT to inject the extended vCard + contact_path = f"/remote.php/dav/addressbooks/users/{nc_client.contacts.username}/{addressbook_name}/{contact_uid}.vcf" + await nc_client.contacts._make_request( + "PUT", + contact_path, + content=extended_vcard, + headers={"Content-Type": "text/vcard; charset=utf-8"}, + ) + + logger.info(f"Injected extended vCard for contact {contact_uid}") + + # Retrieve the contact to confirm extended fields are present in raw vCard + response = await nc_client.contacts._make_request("GET", contact_path) + raw_vcard_before = response.text + + logger.info("Raw vCard before any operations:") + logger.info(raw_vcard_before) + + # Verify extended fields exist in raw vCard + assert "TEL;TYPE=cell:+1-555-123-4567" in raw_vcard_before + assert "ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA" in raw_vcard_before + assert "ORG:Example Corporation" in raw_vcard_before + assert "TITLE:Senior Developer" in raw_vcard_before + assert "X-CUSTOM-FIELD:This should be preserved" in raw_vcard_before + assert "X-LINKEDIN:https://linkedin.com/in/johndoe" in raw_vcard_before + assert "NOTE:This is a note with important information" in raw_vcard_before + + # List contacts through the MCP client (this will parse and return limited fields) + contacts = await nc_client.contacts.list_contacts(addressbook=addressbook_name) + our_contact = next((c for c in contacts if c["vcard_id"] == contact_uid), None) + + assert our_contact is not None + logger.info("Contact as parsed by MCP client:") + logger.info(our_contact) + + # Check what fields are accessible through the parsed contact + parsed_contact = our_contact["contact"] + + # These should be available (basic fields that are parsed) + assert parsed_contact["fullname"] == "John Extended Doe" + assert parsed_contact["email"] is not None # Some email should be present + + # The raw vCard should still be available in addressdata + raw_addressdata = our_contact["addressdata"] + assert "X-CUSTOM-FIELD:This should be preserved" in raw_addressdata + assert "ORG:Example Corporation" in raw_addressdata + + # The key test: Can we update this contact without losing extended field data? + logger.info("Testing contact update preservation...") + + # Update the contact through the MCP client with a simple change + try: + await nc_client.contacts.update_contact( + addressbook=addressbook_name, + uid=contact_uid, + contact_data={"email": "john.updated@example.com"}, + ) + logger.info("✓ Contact updated successfully") + except Exception as e: + logger.error(f"✗ Failed to update contact: {e}") + raise + + # Retrieve the contact again to see if extended fields survived + contacts_after = await nc_client.contacts.list_contacts( + addressbook=addressbook_name + ) + updated_contact = next( + (c for c in contacts_after if c["vcard_id"] == contact_uid), None + ) + + assert updated_contact is not None, "Contact not found after update" + updated_addressdata = updated_contact["addressdata"] + + logger.info("Raw vCard after contact update:") + logger.info(updated_addressdata) + + # THIS IS THE CRITICAL TEST - extended fields should be preserved during updates + extended_field_checks = [ + ("ORG:Example Corporation", "organization field"), + ("TITLE:Senior Developer", "title field"), + ("TEL;TYPE=cell:+1-555-123-4567", "cell phone"), + ("TEL;TYPE=work:+1-555-987-6543", "work phone"), + ("ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA", "home address"), + ("ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA", "work address"), + ("URL;TYPE=work;VALUE=URI:https://company.com/john", "work URL"), + ("NOTE:This is a note with important information", "note field"), + ("CATEGORIES:colleagues,developers,friends", "categories"), + ("X-CUSTOM-FIELD:This should be preserved", "custom field"), + ("X-LINKEDIN:https://linkedin.com/in/johndoe", "LinkedIn custom field"), + ("john.updated@example.com", "updated email"), + ] + + all_preserved = True + for field_pattern, field_name in extended_field_checks: + if field_pattern in updated_addressdata: + logger.info(f"✓ {field_name} preserved") + else: + logger.error(f"✗ {field_name} was lost during update") + all_preserved = False + + # The test should PASS - field preservation should work + assert all_preserved, ( + "Contact update lost extended field data - this indicates the preservation mechanism failed" + ) + + logger.info("🎉 SUCCESS: All extended fields preserved during contact update!") + + finally: + # Cleanup + try: + await nc_client.contacts.delete_addressbook(name=addressbook_name) + except Exception as cleanup_error: + logger.warning( + f"Failed to cleanup addressbook {addressbook_name}: {cleanup_error}" + ) + + +@pytest.mark.integration +async def test_calendar_event_roundtrip_data_loss_demonstration(nc_client): + """Demonstrates specific data loss scenarios in calendar events.""" + calendar_name = "personal" + + event_data = { + "title": "Roundtrip Test Event", + "description": "Testing data preservation", + "start_datetime": (datetime.now() + timedelta(days=2)).isoformat(), + "end_datetime": (datetime.now() + timedelta(days=2, hours=1)).isoformat(), + } + + result = await nc_client.calendar.create_event(calendar_name, event_data) + event_uid = result["uid"] + + try: + # Inject additional iCal properties that are valid but not supported by our parser + extended_ical = f"""BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//Extended Client//EN +BEGIN:VEVENT +UID:{event_uid} +DTSTART:{(datetime.now() + timedelta(days=2)).strftime("%Y%m%dT%H%M%SZ")} +DTEND:{(datetime.now() + timedelta(days=2, hours=1)).strftime("%Y%m%dT%H%M%SZ")} +SUMMARY:Roundtrip Test Event +DESCRIPTION:Testing data preservation +STATUS:CONFIRMED +PRIORITY:5 +CLASS:PUBLIC +SEQUENCE:1 +X-MICROSOFT-CDO-ALLDAYEVENT:FALSE +X-MICROSOFT-CDO-IMPORTANCE:1 +X-CUSTOM-MEETING-ID:12345-67890 +X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890 +ORGANIZER;CN=Test Organizer:mailto:organizer@example.com +COMMENT:This is a comment that should be preserved +LOCATION:Conference Room A +GEO:40.7128;-74.0060 +TRANSP:OPAQUE +CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VEVENT +END:VCALENDAR""" + + # Inject the extended iCal + event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics" + await nc_client.calendar._make_request( + "PUT", + event_path, + content=extended_ical, + headers={"Content-Type": "text/calendar; charset=utf-8"}, + ) + + # Verify extended properties are present + response = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + original_ical = response.text + + # Confirm extended properties exist + extended_properties = [ + "SEQUENCE:1", + "X-MICROSOFT-CDO-ALLDAYEVENT:FALSE", + "X-CUSTOM-MEETING-ID:12345-67890", + "X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890", + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com", + "COMMENT:This is a comment that should be preserved", + "GEO:40.7128;-74.0060", + "TRANSP:OPAQUE", + ] + + # More flexible patterns for properties that might be reformatted + flexible_patterns = { + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com": [ + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com", + 'ORGANIZER;CN="Test Organizer":mailto:organizer@example.com', + ], + "GEO:40.7128;-74.0060": [ + "GEO:40.7128;-74.0060", + "GEO:40.7128;-74.006", # May lose trailing zero + ], + } + + for prop in extended_properties: + assert prop in original_ical, ( + f"Extended property {prop} not found in original iCal" + ) + + logger.info("✓ All extended properties confirmed in original iCal") + + # Now perform a simple update through MCP + update_data = {"location": "Conference Room B"} # Simple location change + await nc_client.calendar.update_event(calendar_name, event_uid, update_data) + + # Check what survived the round-trip + response_after = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + updated_ical = response_after.text + + logger.info("Checking which properties survived the update...") + + # Check which extended properties survived + survived = [] + lost = [] + + for prop in extended_properties: + # Check if this property has flexible patterns + if prop in flexible_patterns: + # Check if any of the flexible patterns match + found = any( + pattern in updated_ical for pattern in flexible_patterns[prop] + ) + if found: + survived.append(prop) + else: + lost.append(prop) + else: + # Standard exact match + if prop in updated_ical: + survived.append(prop) + else: + lost.append(prop) + + logger.info(f"Properties that SURVIVED: {survived}") + logger.error(f"Properties that were LOST: {lost}") + + # This test should fail - we expect data loss + assert len(lost) == 0, ( + f"Round-trip update lost {len(lost)} extended properties: {lost}" + ) + + finally: + try: + await nc_client.calendar.delete_event(calendar_name, event_uid) + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")