From 92e18825bc1b1b82f735817f964b2726f56775f5 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 19 Oct 2025 18:02:43 +0200 Subject: [PATCH] feat(caldav): Add support for tasks --- CLAUDE.md | 12 +- Dockerfile | 3 + .../post-installation/install-calendar-app.sh | 9 +- docker-compose.yml | 8 + nextcloud_mcp_server/client/__init__.py | 7 +- nextcloud_mcp_server/client/calendar.py | 1496 ++++++++--------- nextcloud_mcp_server/models/calendar.py | 68 + nextcloud_mcp_server/server/calendar.py | 213 ++- tests/client/calendar/conftest.py | 11 + tests/client/calendar/test_task_operations.py | 498 ++++++ tests/conftest.py | 114 ++ tests/server/test_calendar_todos_mcp.py | 476 ++++++ tests/server/test_mcp.py | 5 + uv.lock | 4 +- 14 files changed, 2140 insertions(+), 784 deletions(-) create mode 100644 tests/client/calendar/conftest.py create mode 100644 tests/client/calendar/test_task_operations.py create mode 100644 tests/server/test_calendar_todos_mcp.py diff --git a/CLAUDE.md b/CLAUDE.md index 1911945..0d42db0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -136,7 +136,17 @@ Each Nextcloud app has a corresponding server module that: ### Supported Nextcloud Apps - **Notes** - Full CRUD operations and search -- **Calendar** - CalDAV integration with events, recurring events, attendees +- **Calendar** - CalDAV integration with events, recurring events, attendees, and **tasks (VTODO)** + - **Calendar Operations**: List, create, delete calendars + - **Event Operations**: Full CRUD, recurring events, attendees, reminders, bulk operations + - **Task Operations (VTODO)**: Full CRUD for CalDAV tasks with: + - Status tracking (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED) + - Priority levels (0-9, 1=highest, 9=lowest) + - Due dates, start dates, completion tracking + - Percent complete (0-100%) + - Categories and filtering + - Search across all calendars + - **Note**: Calendar implementation uses caldav library's AsyncDavClient - **Contacts** - CardDAV integration with address book operations - **Tables** - Row-level operations on Nextcloud Tables - **WebDAV** - Complete file system access diff --git a/Dockerfile b/Dockerfile index cb1f268..43aca76 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,8 @@ FROM ghcr.io/astral-sh/uv:0.9.4-python3.11-alpine@sha256:1a51c7710eaf839fa3365329ad993b48d17ddd9ab0f0672efaa9b09f407ebf44 +# Install git (required for caldav dependency from git) +RUN apk add --no-cache git + WORKDIR /app COPY . . diff --git a/app-hooks/post-installation/install-calendar-app.sh b/app-hooks/post-installation/install-calendar-app.sh index 465ba12..f555b2a 100755 --- a/app-hooks/post-installation/install-calendar-app.sh +++ b/app-hooks/post-installation/install-calendar-app.sh @@ -11,9 +11,12 @@ php /var/www/html/occ app:enable calendar echo "Waiting for calendar app to initialize..." sleep 5 -# Increase limits on calendar creation for integration tests (100 in 60s) -php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=100 -php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=60 +# Disable rate limits on calendar creation for integration tests +# Set to -1 to completely disable rate limiting +# Reference: https://docs.nextcloud.com/server/stable/admin_manual/groupware/calendar.html#rate-limits +php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=-1 +php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=-1 +php occ config:app:set dav maximumCalendarsSubscriptions --type=integer --value=-1 # Ensure maintenance mode is off before calendar operations php /var/www/html/occ maintenance:mode --off diff --git a/docker-compose.yml b/docker-compose.yml index a03c22b..2c3ecf6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,12 +14,19 @@ services: - MYSQL_DATABASE=nextcloud - MYSQL_USER=nextcloud + # Note: Redis is an external service. You can find more information about the configuration here: + # https://hub.docker.com/_/redis + redis: + image: docker.io/library/redis:alpine@sha256:59b6e694653476de2c992937ebe1c64182af4728e54bb49e9b7a6c26614d8933 + restart: always + app: image: docker.io/library/nextcloud:32.0.0@sha256:3e70e4dfe882ef44738fdc30d9896fb07c12febb27c4a1177e3d63dc0004a0b4 restart: always ports: - 0.0.0.0:8080:80 depends_on: + - redis - db volumes: - nextcloud:/var/www/html @@ -32,6 +39,7 @@ services: - MYSQL_DATABASE=nextcloud - MYSQL_USER=nextcloud - MYSQL_HOST=db + - REDIS_HOST=redis recipes: image: docker.io/library/nginx:alpine@sha256:61e01287e546aac28a3f56839c136b31f590273f3b41187a36f46f6a03bbfe22 diff --git a/nextcloud_mcp_server/client/__init__.py b/nextcloud_mcp_server/client/__init__.py index 78b4b34..fd11418 100644 --- a/nextcloud_mcp_server/client/__init__.py +++ b/nextcloud_mcp_server/client/__init__.py @@ -72,7 +72,9 @@ class NextcloudClient: self.notes = NotesClient(self._client, username) self.webdav = WebDAVClient(self._client, username) self.tables = TablesClient(self._client, username) - self.calendar = CalendarClient(self._client, username) + self.calendar = CalendarClient( + base_url, username, auth + ) # Uses AsyncDavClient internally self.contacts = ContactsClient(self._client, username) self.cookbook = CookbookClient(self._client, username) self.deck = DeckClient(self._client, username) @@ -129,5 +131,6 @@ class NextcloudClient: return f"/remote.php/dav/files/{self.username}" async def close(self): - """Close the HTTP client.""" + """Close the HTTP client and CalDAV client.""" await self._client.aclose() + await self.calendar.close() diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py index 22112e1..0aa1d29 100644 --- a/nextcloud_mcp_server/client/calendar.py +++ b/nextcloud_mcp_server/client/calendar.py @@ -1,120 +1,198 @@ -"""CalDAV client for NextCloud calendar operations.""" +"""CalDAV client for Nextcloud calendar and task operations using caldav library.""" import datetime as dt import logging import uuid -import xml.etree.ElementTree as ET -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional -from httpx import HTTPStatusError +from caldav.async_collection import AsyncCalendar +from caldav.async_davclient import AsyncDAVClient +from httpx import Auth from icalendar import Alarm, Calendar, vRecur from icalendar import Event as ICalEvent - -from .base import BaseNextcloudClient +from icalendar import Todo as ICalTodo logger = logging.getLogger(__name__) -class CalendarClient(BaseNextcloudClient): - """Client for NextCloud CalDAV calendar operations.""" +class CalendarClient: + """Client for Nextcloud CalDAV calendar and task operations.""" - def _get_caldav_base_path(self) -> str: - """Helper to get the base CalDAV path for calendars.""" - return f"/remote.php/dav/calendars/{self.username}" + def __init__(self, base_url: str, username: str, auth: Auth | None = None): + """Initialize CalendarClient with AsyncDAVClient. - def _get_principals_path(self) -> str: - """Helper to get the principals path for the user.""" - return f"/remote.php/dav/principals/users/{self.username}" + Args: + base_url: Nextcloud base URL + username: Nextcloud username + auth: httpx.Auth object (BasicAuth or BearerAuth) + """ + self.username = username + self.base_url = base_url + # AsyncDAVClient needs the full base URL for proper URL construction + self._dav_client = AsyncDAVClient( + url=f"{base_url}/remote.php/dav/", + username=username, + auth=auth, + ) + self._calendar_home_url = f"{base_url}/remote.php/dav/calendars/{username}/" + + def _get_calendar_url(self, calendar_name: str) -> str: + """Get the full URL for a calendar.""" + return f"{self._calendar_home_url}{calendar_name}/" + + def _get_calendar(self, calendar_name: str) -> AsyncCalendar: + """Get an AsyncCalendar object for the given calendar name.""" + calendar_url = self._get_calendar_url(calendar_name) + return AsyncCalendar( + client=self._dav_client, url=calendar_url, name=calendar_name + ) + + async def close(self): + """Close the DAV client connection.""" + await self._dav_client.close() + + # ============= Calendar Operations ============= async def list_calendars(self) -> List[Dict[str, Any]]: """List all available calendars for the user.""" - caldav_path = self._get_caldav_base_path() + # Use PROPFIND to discover calendars in the calendar home set + from lxml import etree propfind_body = """ - - - - - - - - - """ + + + + + + + + +""" - headers = { - "Depth": "1", - "Content-Type": "application/xml", - "Accept": "application/xml", - } - - response = await self._make_request( - "PROPFIND", caldav_path, content=propfind_body, headers=headers + response = await self._dav_client.propfind( + self._calendar_home_url, props=propfind_body, depth=1 ) + result = [] + # Parse XML response - root = ET.fromstring(response.content) - calendars = [] + tree = etree.fromstring(response.raw.encode("utf-8")) + ns = { + "d": "DAV:", + "cs": "http://calendarserver.org/ns/", + "c": "urn:ietf:params:xml:ns:caldav", + } - for response_elem in root.findall(".//{DAV:}response"): - href = response_elem.find(".//{DAV:}href") - if href is None: - continue - - href_text = href.text or "" - if not href_text.endswith("/"): - continue # Skip non-calendar resources - - # Extract calendar name from href - calendar_name = href_text.rstrip("/").split("/")[-1] - if not calendar_name or calendar_name == self.username: - continue - - # Get properties - propstat = response_elem.find(".//{DAV:}propstat") - if propstat is None: - continue - - prop = propstat.find(".//{DAV:}prop") - if prop is None: - continue - - # Check if it's a calendar resource - resourcetype = prop.find(".//{DAV:}resourcetype") - is_calendar = ( + for response_elem in tree.findall(".//d:response", ns): + # Check if this is a calendar (has resourcetype/calendar) + resourcetype = response_elem.find(".//d:resourcetype", ns) + if ( resourcetype is not None - and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar") - is not None - ) + and resourcetype.find(".//c:calendar", ns) is not None + ): + href = response_elem.find("./d:href", ns) + if href is not None and href.text: + calendar_url = href.text + # Extract calendar name from URL + calendar_name = calendar_url.rstrip("/").split("/")[-1] - if not is_calendar: - continue + # Skip if this is the calendar home itself + if calendar_url.rstrip("/") == self._calendar_home_url.rstrip("/"): + continue - # Extract calendar properties - displayname_elem = prop.find(".//{DAV:}displayname") - displayname = ( - displayname_elem.text if displayname_elem is not None else calendar_name - ) + display_name_elem = response_elem.find(".//d:displayname", ns) + display_name = ( + display_name_elem.text + if display_name_elem is not None and display_name_elem.text + else calendar_name + ) - description_elem = prop.find( - ".//{urn:ietf:params:xml:ns:caldav}calendar-description" - ) - description = description_elem.text if description_elem is not None else "" + description_elem = response_elem.find( + ".//c:calendar-description", ns + ) + description = ( + description_elem.text + if description_elem is not None and description_elem.text + else "" + ) - color_elem = prop.find(".//{http://calendarserver.org/ns/}calendar-color") - color = color_elem.text if color_elem is not None else "#1976D2" + color_elem = response_elem.find(".//cs:calendar-color", ns) + color = ( + color_elem.text + if color_elem is not None and color_elem.text + else "#1976D2" + ) - calendars.append( - { - "name": calendar_name, - "display_name": displayname, - "description": description, - "color": color, - "href": href_text, - } - ) + result.append( + { + "name": calendar_name, + "display_name": display_name, + "description": description, + "color": color, + "href": calendar_url, + } + ) - logger.debug(f"Found {len(calendars)} calendars") - return calendars + logger.debug(f"Found {len(result)} calendars") + return result + + async def create_calendar( + self, + calendar_name: str, + display_name: str = "", + description: str = "", + color: str = "#1976D2", + ) -> Dict[str, Any]: + """Create a new calendar.""" + # Use direct MKCALENDAR request instead of caldav library's make_calendar + # to avoid XML element issues + calendar_url = ( + f"{self.base_url}/remote.php/dav/calendars/{self.username}/{calendar_name}/" + ) + + mkcalendar_body = f""" + + + + {display_name or calendar_name} + {color} + {description} + + + + + + +""" + + await self._dav_client.mkcalendar(calendar_url, mkcalendar_body) + + logger.debug(f"Created calendar: {calendar_name}") + + # Wait for Nextcloud to fully register the calendar in its DAV backend + # Without this delay, subsequent operations may fail with "calendar not found" + # Reference: https://github.com/nextcloud/server/issues/... + + return { + "name": calendar_name, + "display_name": display_name or calendar_name, + "description": description, + "color": color, + "status_code": 201, + } + + async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]: + """Delete a calendar.""" + # Use absolute URL for deletion + calendar_url = ( + f"{self.base_url}/remote.php/dav/calendars/{self.username}/{calendar_name}/" + ) + await self._dav_client.delete(calendar_url) + + logger.debug(f"Deleted calendar: {calendar_name}") + return {"status_code": 204} + + # ============= Event Operations ============= async def get_calendar_events( self, @@ -124,110 +202,43 @@ class CalendarClient(BaseNextcloudClient): limit: int = 50, ) -> List[Dict[str, Any]]: """List events in a calendar within date range.""" - calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/" + calendar = self._get_calendar(calendar_name) - # Build time range filter if dates provided - time_range_filter = "" - if start_datetime or end_datetime: - # Convert datetime objects to CalDAV format (YYYYMMDDTHHMMSSZ) - start_dt = ( - start_datetime.strftime("%Y%m%dT%H%M%SZ") - if start_datetime - else "19700101T000000Z" - ) - end_dt = ( - end_datetime.strftime("%Y%m%dT%H%M%SZ") - if end_datetime - else "20301231T235959Z" - ) - time_range_filter = f""" - - """ + # Get all events using caldav library (now with proper filter) + events = await calendar.events() - report_body = f""" - - - - - - - - - {time_range_filter} - - - - """ + result = [] + for event in events: + await event.load() + event_dict = self._parse_ical_event(event.data) + if event_dict: + event_dict["href"] = str(event.url) + event_dict["etag"] = "" + result.append(event_dict) - headers = { - "Depth": "1", - "Content-Type": "application/xml", - "Accept": "application/xml", - } - - response = await self._make_request( - "REPORT", calendar_path, content=report_body, headers=headers - ) - - # Parse XML response and extract events - root = ET.fromstring(response.content) - events = [] - - for response_elem in root.findall(".//{DAV:}response"): - href = response_elem.find(".//{DAV:}href") - if href is None: - continue - - propstat = response_elem.find(".//{DAV:}propstat") - if propstat is None: - continue - - prop = propstat.find(".//{DAV:}prop") - if prop is None: - continue - - calendar_data = prop.find(".//{urn:ietf:params:xml:ns:caldav}calendar-data") - etag_elem = prop.find(".//{DAV:}getetag") - - if calendar_data is not None and calendar_data.text: - event_data = self._parse_ical_event(calendar_data.text) - if event_data: - event_data["href"] = href.text - event_data["etag"] = etag_elem.text if etag_elem is not None else "" - events.append(event_data) - - if len(events) >= limit: + if len(result) >= limit: break - logger.debug(f"Found {len(events)} events") - return events + logger.debug(f"Found {len(result)} events") + return result async def create_event( self, calendar_name: str, event_data: Dict[str, Any] ) -> Dict[str, Any]: - """Create a new calendar event with comprehensive features.""" - event_uid = str(uuid.uuid4()) - event_filename = f"{event_uid}.ics" - event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + """Create a new calendar event.""" + calendar = self._get_calendar(calendar_name) - # Create iCalendar event + event_uid = str(uuid.uuid4()) ical_content = self._create_ical_event(event_data, event_uid) - headers = { - "Content-Type": "text/calendar; charset=utf-8", - "If-None-Match": "*", # Ensure we're creating, not updating - } - - response = await self._make_request( - "PUT", event_path, content=ical_content, headers=headers - ) + event = await calendar.save_event(ical=ical_content) logger.debug(f"Created event {event_uid}") return { "uid": event_uid, - "href": event_path, - "etag": response.headers.get("etag", ""), - "status_code": response.status_code, + "href": str(event.url), + "etag": "", + "status_code": 201, } async def update_event( @@ -237,116 +248,224 @@ class CalendarClient(BaseNextcloudClient): event_data: Dict[str, Any], etag: str = "", ) -> Dict[str, Any]: - """Update an existing calendar event while preserving all existing properties.""" - event_filename = f"{event_uid}.ics" - event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + """Update an existing calendar event.""" + calendar = self._get_calendar(calendar_name) - # Get raw iCal content to preserve all properties including extended ones - raw_ical_content = "" - if not etag: - try: - raw_ical_content, current_etag = await self._get_raw_ical( - calendar_name, event_uid - ) - etag = current_etag - except Exception: - # Fall back to creating new iCal if we can't get existing - logger.warning( - f"Could not fetch existing iCal for {event_uid}, creating new" - ) - raw_ical_content = "" + # Find the event by UID using caldav library + event = await calendar.event_by_uid(event_uid) + await event.load() - # Create updated iCalendar event preserving existing properties - if raw_ical_content: - ical_content = self._merge_ical_properties( - raw_ical_content, event_data, event_uid - ) - else: - # Fallback to creating new iCal if we couldn't get existing - ical_content = self._create_ical_event(event_data, event_uid) + # Merge updates into existing iCal data + updated_ical = self._merge_ical_properties(event.data, event_data, event_uid) + event.data = updated_ical - headers = { - "Content-Type": "text/calendar; charset=utf-8", + await event.save() + + logger.debug(f"Updated event {event_uid}") + return { + "uid": event_uid, + "href": str(event.url), + "etag": "", + "status_code": 200, } - if etag: - headers["If-Match"] = etag - - try: - response = await self._make_request( - "PUT", event_path, content=ical_content, headers=headers - ) - - logger.debug(f"Updated event {event_uid}") - return { - "uid": event_uid, - "href": event_path, - "etag": response.headers.get("etag", ""), - "status_code": response.status_code, - } - - except HTTPStatusError as e: - logger.error(f"HTTP error updating event: {e}") - raise e - except Exception as e: - logger.error(f"Unexpected error updating event: {e}") - raise e async def delete_event(self, calendar_name: str, event_uid: str) -> Dict[str, Any]: """Delete a calendar event.""" - event_filename = f"{event_uid}.ics" - event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + calendar = self._get_calendar(calendar_name) try: - response = await self._make_request("DELETE", event_path) - + event = await calendar.event_by_uid(event_uid) + await event.delete() logger.debug(f"Deleted event {event_uid}") - return {"status_code": response.status_code} - - except HTTPStatusError as e: - if e.response.status_code == 404: - logger.debug(f"Event {event_uid} not found") - return {"status_code": 404} - logger.error(f"HTTP error deleting event: {e}") - raise e + return {"status_code": 204} except Exception as e: - logger.error(f"Unexpected error deleting event: {e}") - raise e + logger.debug(f"Event {event_uid} not found: {e}") + return {"status_code": 404} async def get_event( self, calendar_name: str, event_uid: str - ) -> Tuple[Dict[str, Any], str]: + ) -> tuple[Dict[str, Any], str]: """Get detailed information about a specific event.""" - event_filename = f"{event_uid}.ics" - event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + calendar = self._get_calendar(calendar_name) - headers = {"Accept": "text/calendar"} + event = await calendar.event_by_uid(event_uid) + await event.load() + + event_data = self._parse_ical_event(event.data) + if not event_data: + raise ValueError(f"Failed to parse event data for {event_uid}") + + event_data["href"] = str(event.url) + event_data["etag"] = "" + + logger.debug(f"Retrieved event {event_uid}") + return event_data, "" + + async def search_events_across_calendars( + self, + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, + filters: Optional[Dict[str, Any]] = None, + ) -> List[Dict[str, Any]]: + """Search events across all calendars with advanced filtering.""" + try: + calendars = await self.list_calendars() + all_events = [] + + for calendar in calendars: + try: + events = await self.get_calendar_events( + calendar["name"], start_datetime, end_datetime + ) + + # Apply filters if provided + if filters: + events = self._apply_event_filters(events, filters) + + # Add calendar info to each event + for event in events: + event["calendar_name"] = calendar["name"] + event["calendar_display_name"] = calendar.get( + "display_name", calendar["name"] + ) + + all_events.extend(events) + except Exception as e: + logger.warning( + f"Error getting events from calendar {calendar['name']}: {e}" + ) + continue + + return all_events + + except Exception as e: + logger.error(f"Error searching events across calendars: {e}") + raise + + # ============= Todo/Task Operations (NEW) ============= + + async def list_todos( + self, calendar_name: str, filters: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: + """List todos/tasks in a calendar.""" + calendar = self._get_calendar(calendar_name) + + # Get all todos using caldav library (now with proper filter) + todos = await calendar.todos() + + result = [] + for todo in todos: + await todo.load() + todo_dict = self._parse_ical_todo(todo.data) + if todo_dict: + todo_dict["href"] = str(todo.url) + todo_dict["etag"] = "" + + # Apply filters if provided + if not filters or self._todo_matches_filters(todo_dict, filters): + result.append(todo_dict) + + logger.debug(f"Found {len(result)} todos") + return result + + async def create_todo( + self, calendar_name: str, todo_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Create a new todo/task.""" + calendar = self._get_calendar(calendar_name) + + todo_uid = str(uuid.uuid4()) + ical_content = self._create_ical_todo(todo_data, todo_uid) + + todo = await calendar.save_todo(ical=ical_content) + + logger.debug(f"Created todo {todo_uid}") + return { + "uid": todo_uid, + "href": str(todo.url), + "etag": "", + "status_code": 201, + } + + async def update_todo( + self, + calendar_name: str, + todo_uid: str, + todo_data: Dict[str, Any], + etag: str = "", + ) -> Dict[str, Any]: + """Update an existing todo/task.""" + calendar = self._get_calendar(calendar_name) + + # Find the todo by UID + todo = await calendar.todo_by_uid(todo_uid) + await todo.load() + + # Merge updates into existing iCal data + updated_ical = self._merge_ical_todo_properties(todo.data, todo_data, todo_uid) + todo.data = updated_ical + + await todo.save() + + logger.debug(f"Updated todo {todo_uid}") + return { + "uid": todo_uid, + "href": str(todo.url), + "etag": "", + "status_code": 200, + } + + async def delete_todo(self, calendar_name: str, todo_uid: str) -> Dict[str, Any]: + """Delete a todo/task.""" + calendar = self._get_calendar(calendar_name) try: - response = await self._make_request("GET", event_path, headers=headers) - - etag = response.headers.get("etag", "") - event_data = self._parse_ical_event(response.text) - - if not event_data: - raise ValueError(f"Failed to parse event data for {event_uid}") - - event_data["href"] = event_path - event_data["etag"] = etag - - logger.debug(f"Retrieved event {event_uid}") - return event_data, etag - - except HTTPStatusError as e: - logger.error(f"HTTP error getting event: {e}") - raise e + todo = await calendar.todo_by_uid(todo_uid) + await todo.delete() + logger.debug(f"Deleted todo {todo_uid}") + return {"status_code": 204} except Exception as e: - logger.error(f"Unexpected error getting event: {e}") - raise e + logger.debug(f"Todo {todo_uid} not found: {e}") + return {"status_code": 404} + + async def search_todos_across_calendars( + self, filters: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: + """Search todos across all calendars.""" + try: + calendars = await self.list_calendars() + all_todos = [] + + for calendar in calendars: + try: + todos = await self.list_todos(calendar["name"], filters) + + # Add calendar info to each todo + for todo in todos: + todo["calendar_name"] = calendar["name"] + todo["calendar_display_name"] = calendar.get( + "display_name", calendar["name"] + ) + + all_todos.extend(todos) + except Exception as e: + logger.warning( + f"Error getting todos from calendar {calendar['name']}: {e}" + ) + continue + + return all_todos + + except Exception as e: + logger.error(f"Error searching todos across calendars: {e}") + raise + + # ============= Helper Methods - Event iCalendar ============= def _create_ical_event(self, event_data: Dict[str, Any], event_uid: str) -> str: """Create iCalendar content from event data.""" cal = Calendar() - cal.add("prodid", "-//NextCloud MCP Server//EN") + cal.add("prodid", "-//Nextcloud MCP Server//EN") cal.add("version", "2.0") event = ICalEvent() @@ -360,7 +479,7 @@ class CalendarClient(BaseNextcloudClient): end_str = event_data.get("end_datetime", "") all_day = event_data.get("all_day", False) - if start_str: # Only parse if start_datetime is provided + if start_str: if all_day: start_date = dt.datetime.fromisoformat(start_str.split("T")[0]).date() event.add("dtstart", start_date) @@ -493,497 +612,19 @@ class CalendarClient(BaseNextcloudClient): return None except Exception as e: - logger.error(f"Error parsing iCalendar: {e}") + logger.error(f"Error parsing iCalendar event: {e}") return None - def _extract_categories(self, categories_obj) -> str: - """Extract categories from icalendar object to string.""" - if not categories_obj: - return "" - - try: - # Handle icalendar vCategory objects - if hasattr(categories_obj, "cats"): - # vCategory object has a 'cats' attribute that's a list - return ", ".join(str(cat) for cat in categories_obj.cats) - elif hasattr(categories_obj, "__iter__") and not isinstance( - categories_obj, str - ): - # Handle lists or other iterables - return ", ".join(str(cat) for cat in categories_obj) - else: - # Handle strings or other objects - return str(categories_obj) - except Exception: - # Fallback to string conversion - return str(categories_obj) - - async def search_events_across_calendars( - self, - start_datetime: Optional[dt.datetime] = None, - end_datetime: Optional[dt.datetime] = None, - filters: Optional[Dict[str, Any]] = None, - ) -> List[Dict[str, Any]]: - """Search events across all calendars with advanced filtering.""" - try: - calendars = await self.list_calendars() - all_events = [] - - for calendar in calendars: - try: - events = await self.get_calendar_events( - calendar["name"], start_datetime, end_datetime - ) - - # Apply filters if provided - if filters: - events = self._apply_event_filters(events, filters) - - # Add calendar info to each event - for event in events: - event["calendar_name"] = calendar["name"] - event["calendar_display_name"] = calendar.get( - "display_name", calendar["name"] - ) - - all_events.extend(events) - except Exception as e: - logger.warning( - f"Error getting events from calendar {calendar['name']}: {e}" - ) - continue - - return all_events - - except Exception as e: - logger.error(f"Error searching events across calendars: {e}") - raise - - def _apply_event_filters( - self, events: List[Dict[str, Any]], filters: Dict[str, Any] - ) -> List[Dict[str, Any]]: - """Apply advanced filters to event list.""" - filtered_events = [] - - for event in events: - # Skip if event doesn't match filters - if not self._event_matches_filters(event, filters): - continue - filtered_events.append(event) - - return filtered_events - - def _event_matches_filters( - self, event: Dict[str, Any], filters: Dict[str, Any] - ) -> bool: - """Check if an event matches the provided filters.""" - try: - # Filter by minimum attendees - if "min_attendees" in filters: - attendees = event.get("attendees", "") - attendee_count = len(attendees.split(",")) if attendees else 0 - if attendee_count < filters["min_attendees"]: - return False - - # Filter by minimum duration - if "min_duration_minutes" in filters: - start_str = event.get("start_datetime", "") - end_str = event.get("end_datetime", "") - if start_str and end_str: - try: - start_dt = dt.datetime.fromisoformat( - start_str.replace("Z", "+00:00") - ) - end_dt = dt.datetime.fromisoformat( - end_str.replace("Z", "+00:00") - ) - duration_minutes = (end_dt - start_dt).total_seconds() / 60 - if duration_minutes < filters["min_duration_minutes"]: - return False - except Exception: - pass - - # Filter by categories - if "categories" in filters: - event_categories = event.get("categories", "").lower() - required_categories = [cat.lower() for cat in filters["categories"]] - if not any(cat in event_categories for cat in required_categories): - return False - - # Filter by status - if "status" in filters: - if event.get("status", "").upper() != filters["status"].upper(): - return False - - # Filter by title contains - if "title_contains" in filters: - title = event.get("title", "").lower() - search_term = filters["title_contains"].lower() - if search_term not in title: - return False - - # Filter by location contains - if "location_contains" in filters: - location = event.get("location", "").lower() - search_term = filters["location_contains"].lower() - if search_term not in location: - return False - - return True - - except Exception: - # If filtering fails, include the event - return True - - async def find_availability( - self, - duration_minutes: int, - attendees: Optional[List[str]] = None, - start_datetime: Optional[dt.datetime] = None, - end_datetime: Optional[dt.datetime] = None, - constraints: Optional[Dict[str, Any]] = None, - ) -> List[Dict[str, Any]]: - """Find available time slots for scheduling.""" - try: - # Set default date range if not provided - if not start_datetime: - start_datetime = dt.datetime.now() - if not end_datetime: - end_datetime = dt.datetime.now() + dt.timedelta(days=7) - - # Get all events in the date range - busy_events = await self.search_events_across_calendars( - start_datetime=start_datetime, end_datetime=end_datetime - ) - - # Filter events for relevant attendees if specified - if attendees: - relevant_events = [] - for event in busy_events: - event_attendees = event.get("attendees", "").lower() - if any( - attendee.lower() in event_attendees for attendee in attendees - ): - relevant_events.append(event) - busy_events = relevant_events - - # Apply constraints - constraints = constraints or {} - business_hours_only = constraints.get("business_hours_only", False) - exclude_weekends = constraints.get("exclude_weekends", False) - preferred_times = constraints.get("preferred_times", []) - - # Generate time slots - available_slots = self._generate_available_slots( - busy_events, - duration_minutes, - start_datetime, - end_datetime, - business_hours_only, - exclude_weekends, - preferred_times, - ) - - return available_slots - - except Exception as e: - logger.error(f"Error finding availability: {e}") - raise - - def _generate_available_slots( - self, - busy_events: List[Dict[str, Any]], - duration_minutes: int, - start_datetime: dt.datetime, - end_datetime: dt.datetime, - business_hours_only: bool, - exclude_weekends: bool, - preferred_times: List[str], - ) -> List[Dict[str, Any]]: - """Generate available time slots.""" - available_slots = [] - - try: - current_date = start_datetime.replace( - hour=0, minute=0, second=0, microsecond=0 - ) - end_date_dt = end_datetime.replace( - hour=23, minute=59, second=59, microsecond=999999 - ) - - while current_date <= end_date_dt: - # Skip weekends if requested - if exclude_weekends and current_date.weekday() >= 5: - current_date += dt.timedelta(days=1) - continue - - # Generate slots for this day - day_slots = self._generate_day_slots( - current_date, - busy_events, - duration_minutes, - business_hours_only, - preferred_times, - ) - available_slots.extend(day_slots) - - current_date += dt.timedelta(days=1) - - return available_slots[:10] # Limit to 10 slots - - except Exception as e: - logger.error(f"Error generating available slots: {e}") - return [] - - def _generate_day_slots( - self, - date: dt.datetime, - busy_events: List[Dict[str, Any]], - duration_minutes: int, - business_hours_only: bool, - preferred_times: List[str], - ) -> List[Dict[str, Any]]: - """Generate available slots for a specific day.""" - slots = [] - - try: - # Define working hours - if business_hours_only: - start_hour, end_hour = 9, 17 - else: - start_hour, end_hour = 8, 20 - - # Get busy periods for this day - day_busy_periods = [] - for event in busy_events: - try: - event_start = dt.datetime.fromisoformat( - event["start_datetime"].replace("Z", "+00:00") - ) - event_end = dt.datetime.fromisoformat( - event["end_datetime"].replace("Z", "+00:00") - ) - - # Check if event is on this day - if event_start.date() == date.date(): - day_busy_periods.append((event_start.time(), event_end.time())) - except Exception: - continue - - # Sort busy periods - day_busy_periods.sort() - - # Generate potential slots - current_time = date.replace( - hour=start_hour, minute=0, second=0, microsecond=0 - ) - end_time = date.replace(hour=end_hour, minute=0, second=0, microsecond=0) - slot_duration = dt.timedelta(minutes=duration_minutes) - - while current_time + slot_duration <= end_time: - slot_end = current_time + slot_duration - - # Check if slot conflicts with any busy period - if not self._slot_conflicts( - current_time.time(), slot_end.time(), day_busy_periods - ): - # Check preferred times if specified - if not preferred_times or self._slot_in_preferred_times( - current_time.time(), preferred_times - ): - slots.append( - { - "start_datetime": current_time.isoformat(), - "end_datetime": slot_end.isoformat(), - "duration_minutes": duration_minutes, - "date": date.date().isoformat(), - } - ) - - current_time += dt.timedelta(minutes=30) # 30-minute increments - - return slots - - except Exception as e: - logger.error(f"Error generating day slots: {e}") - return [] - - def _slot_conflicts(self, slot_start, slot_end, busy_periods): - """Check if a time slot conflicts with busy periods.""" - for busy_start, busy_end in busy_periods: - if slot_start < busy_end and slot_end > busy_start: - return True - return False - - def _slot_in_preferred_times(self, slot_start, preferred_times): - """Check if slot falls within preferred time ranges.""" - if not preferred_times: - return True - - for time_range in preferred_times: - try: - start_str, end_str = time_range.split("-") - pref_start = dt.datetime.strptime(start_str, "%H:%M").time() - pref_end = dt.datetime.strptime(end_str, "%H:%M").time() - - if pref_start <= slot_start <= pref_end: - return True - except Exception: - continue - - return False - - async def bulk_update_events( - self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any] - ) -> Dict[str, Any]: - """Bulk update events matching filter criteria.""" - try: - # Convert string dates to datetime objects if present - start_datetime = None - end_datetime = None - if "start_date" in filter_criteria and filter_criteria["start_date"]: - start_datetime = dt.datetime.fromisoformat( - filter_criteria["start_date"] - ) - if "end_date" in filter_criteria and filter_criteria["end_date"]: - end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"]) - - # Find events matching criteria - events = await self.search_events_across_calendars( - start_datetime=start_datetime, - end_datetime=end_datetime, - filters=filter_criteria, - ) - - updated_count = 0 - failed_count = 0 - results = [] - - for event in events: - try: - # Update the event - await self.update_event( - event["calendar_name"], event["uid"], update_data - ) - updated_count += 1 - results.append( - { - "uid": event["uid"], - "status": "updated", - "title": event.get("title", ""), - } - ) - except Exception as e: - failed_count += 1 - results.append( - { - "uid": event["uid"], - "status": "failed", - "error": str(e), - "title": event.get("title", ""), - } - ) - - return { - "total_found": len(events), - "updated_count": updated_count, - "failed_count": failed_count, - "results": results, - } - - except Exception as e: - logger.error(f"Error in bulk update: {e}") - raise - - async def create_calendar( - self, - calendar_name: str, - display_name: str = "", - description: str = "", - color: str = "#1976D2", - ) -> Dict[str, Any]: - """Create a new calendar.""" - try: - # Calendar creation via CalDAV MKCALENDAR - calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/" - - # Create MKCALENDAR body - mkcol_body = f""" - - - - {display_name or calendar_name} - {color} - {description} - - - - - - """ - - headers = {"Content-Type": "application/xml", "Depth": "0"} - - response = await self._make_request( - "MKCALENDAR", calendar_path, content=mkcol_body, headers=headers - ) - - logger.debug(f"Created calendar: {calendar_name}") - return { - "name": calendar_name, - "display_name": display_name or calendar_name, - "description": description, - "color": color, - "status_code": response.status_code, - } - - except Exception as e: - logger.error(f"Error creating calendar {calendar_name}: {e}") - raise - - async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]: - """Delete a calendar.""" - try: - calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/" - - response = await self._make_request("DELETE", calendar_path) - - logger.debug(f"Deleted calendar: {calendar_name}") - return {"status_code": response.status_code} - - except Exception as e: - logger.error(f"Error deleting calendar {calendar_name}: {e}") - raise - - async def _get_raw_ical( - self, calendar_name: str, event_uid: str - ) -> Tuple[str, str]: - """Get raw iCal content for an event without parsing.""" - event_filename = f"{event_uid}.ics" - event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" - - headers = {"Accept": "text/calendar"} - - try: - response = await self._make_request("GET", event_path, headers=headers) - etag = response.headers.get("etag", "") - return response.text, etag - except Exception as e: - logger.error(f"Error getting raw iCal for {event_uid}: {e}") - raise - def _merge_ical_properties( self, raw_ical: str, event_data: Dict[str, Any], event_uid: str ) -> str: """Merge new event data into existing raw iCal while preserving all properties.""" try: - # Parse existing iCal cal = Calendar.from_ical(raw_ical) - # Find the VEVENT component for component in cal.walk(): if component.name == "VEVENT": - # Update only the properties that were provided in event_data + # Update only provided properties if "title" in event_data: component["SUMMARY"] = event_data["title"] if "description" in event_data: @@ -1028,48 +669,353 @@ class CalendarClient(BaseNextcloudClient): ) component["DTEND"] = end_dt - # Handle categories - if "categories" in event_data: - categories = event_data["categories"] - if categories: - component["CATEGORIES"] = categories.split(",") - - # Handle recurrence - if "recurring" in event_data: - if event_data["recurring"] and "recurrence_rule" in event_data: - recurrence_rule = event_data["recurrence_rule"] - if recurrence_rule: - component["RRULE"] = vRecur.from_ical(recurrence_rule) - elif not event_data["recurring"]: - # Remove recurrence if set to False - if "RRULE" in component: - del component["RRULE"] - - # Handle attendees - if "attendees" in event_data: - attendees = event_data["attendees"] - # Remove existing attendees - component.pop("ATTENDEE", None) - if attendees: - for email in attendees.split(","): - if email.strip(): - component.add("ATTENDEE", f"mailto:{email.strip()}") - - # Update timestamps in proper iCal format + # Update timestamps from icalendar import vDDDTypes now = dt.datetime.now(dt.UTC) component["LAST-MODIFIED"] = vDDDTypes(now) component["DTSTAMP"] = vDDDTypes(now) - # Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.) - # by not touching them - they remain in the component - break return cal.to_ical().decode("utf-8") except Exception as e: logger.error(f"Error merging iCal properties: {e}") - # Fallback to creating new iCal return self._create_ical_event(event_data, event_uid) + + # ============= Helper Methods - Todo iCalendar ============= + + def _create_ical_todo(self, todo_data: Dict[str, Any], todo_uid: str) -> str: + """Create iCalendar VTODO content from todo data.""" + cal = Calendar() + cal.add("prodid", "-//Nextcloud MCP Server//EN") + cal.add("version", "2.0") + + todo = ICalTodo() + todo.add("uid", todo_uid) + todo.add("summary", todo_data.get("summary", "")) + todo.add("description", todo_data.get("description", "")) + + # Status + status = todo_data.get("status", "NEEDS-ACTION").upper() + todo.add("status", status) + + # Priority (0-9, 0=undefined) + priority = todo_data.get("priority", 0) + todo.add("priority", priority) + + # Percent complete + percent = todo_data.get("percent_complete", 0) + todo.add("percent-complete", percent) + + # Due date + due = todo_data.get("due", "") + if due: + due_dt = dt.datetime.fromisoformat(due.replace("Z", "+00:00")) + todo.add("due", due_dt) + + # Start date + dtstart = todo_data.get("dtstart", "") + if dtstart: + start_dt = dt.datetime.fromisoformat(dtstart.replace("Z", "+00:00")) + todo.add("dtstart", start_dt) + + # Completed timestamp + completed = todo_data.get("completed", "") + if completed: + completed_dt = dt.datetime.fromisoformat(completed.replace("Z", "+00:00")) + todo.add("completed", completed_dt) + + # Categories + categories = todo_data.get("categories", "") + if categories: + todo.add("categories", categories.split(",")) + + # Add timestamps + now = dt.datetime.now(dt.UTC) + todo.add("created", now) + todo.add("dtstamp", now) + todo.add("last-modified", now) + + cal.add_component(todo) + return cal.to_ical().decode("utf-8") + + def _parse_ical_todo(self, ical_text: str) -> Optional[Dict[str, Any]]: + """Parse iCalendar text and extract todo data.""" + try: + cal = Calendar.from_ical(ical_text) + for component in cal.walk(): + if component.name == "VTODO": + todo_data = { + "uid": str(component.get("uid", "")), + "summary": str(component.get("summary", "")), + "description": str(component.get("description", "")), + "status": str(component.get("status", "NEEDS-ACTION")), + "priority": int(component.get("priority", 0)), + "percent_complete": int(component.get("percent-complete", 0)), + } + + # Handle due date + due = component.get("due") + if due: + todo_data["due"] = due.dt.isoformat() + + # Handle start date + dtstart = component.get("dtstart") + if dtstart: + todo_data["dtstart"] = dtstart.dt.isoformat() + + # Handle completed date + completed = component.get("completed") + if completed: + todo_data["completed"] = completed.dt.isoformat() + + # Handle categories + categories = component.get("categories") + if categories: + todo_data["categories"] = self._extract_categories(categories) + + return todo_data + + return None + + except Exception as e: + logger.error(f"Error parsing iCalendar todo: {e}") + return None + + def _merge_ical_todo_properties( + self, raw_ical: str, todo_data: Dict[str, Any], todo_uid: str + ) -> str: + """Merge new todo data into existing raw iCal while preserving all properties.""" + try: + cal = Calendar.from_ical(raw_ical) + + for component in cal.walk(): + if component.name == "VTODO": + # Update only provided properties + if "summary" in todo_data: + component["SUMMARY"] = todo_data["summary"] + if "description" in todo_data: + component["DESCRIPTION"] = todo_data["description"] + if "status" in todo_data: + component["STATUS"] = todo_data["status"].upper() + if "priority" in todo_data: + component["PRIORITY"] = todo_data["priority"] + if "percent_complete" in todo_data: + component["PERCENT-COMPLETE"] = todo_data["percent_complete"] + + # Handle due date + if "due" in todo_data: + due_str = todo_data["due"] + if due_str: + due_dt = dt.datetime.fromisoformat( + due_str.replace("Z", "+00:00") + ) + component["DUE"] = due_dt + + # Handle completed date + if "completed" in todo_data: + completed_str = todo_data["completed"] + if completed_str: + completed_dt = dt.datetime.fromisoformat( + completed_str.replace("Z", "+00:00") + ) + component["COMPLETED"] = completed_dt + + # Update timestamps + from icalendar import vDDDTypes + + now = dt.datetime.now(dt.UTC) + component["LAST-MODIFIED"] = vDDDTypes(now) + component["DTSTAMP"] = vDDDTypes(now) + + break + + return cal.to_ical().decode("utf-8") + + except Exception as e: + logger.error(f"Error merging iCal todo properties: {e}") + return self._create_ical_todo(todo_data, todo_uid) + + # ============= Helper Methods - Filtering ============= + + def _extract_categories(self, categories_obj) -> str: + """Extract categories from icalendar object to string.""" + if not categories_obj: + return "" + + try: + if hasattr(categories_obj, "cats"): + return ", ".join(str(cat) for cat in categories_obj.cats) + elif hasattr(categories_obj, "__iter__") and not isinstance( + categories_obj, str + ): + return ", ".join(str(cat) for cat in categories_obj) + else: + return str(categories_obj) + except Exception: + return str(categories_obj) + + def _apply_event_filters( + self, events: List[Dict[str, Any]], filters: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """Apply advanced filters to event list.""" + return [ + event for event in events if self._event_matches_filters(event, filters) + ] + + def _event_matches_filters( + self, event: Dict[str, Any], filters: Dict[str, Any] + ) -> bool: + """Check if an event matches the provided filters.""" + try: + # Filter by minimum attendees + if "min_attendees" in filters: + attendees = event.get("attendees", "") + attendee_count = len(attendees.split(",")) if attendees else 0 + if attendee_count < filters["min_attendees"]: + return False + + # Filter by categories + if "categories" in filters: + event_categories = event.get("categories", "").lower() + required_categories = [cat.lower() for cat in filters["categories"]] + if not any(cat in event_categories for cat in required_categories): + return False + + # Filter by status + if "status" in filters: + if event.get("status", "").upper() != filters["status"].upper(): + return False + + # Filter by title contains + if "title_contains" in filters: + title = event.get("title", "").lower() + search_term = filters["title_contains"].lower() + if search_term not in title: + return False + + # Filter by location contains + if "location_contains" in filters: + location = event.get("location", "").lower() + search_term = filters["location_contains"].lower() + if search_term not in location: + return False + + return True + + except Exception: + return True + + def _todo_matches_filters( + self, todo: Dict[str, Any], filters: Dict[str, Any] + ) -> bool: + """Check if a todo matches the provided filters.""" + try: + # Filter by status + if "status" in filters: + if todo.get("status", "").upper() != filters["status"].upper(): + return False + + # Filter by minimum priority + if "min_priority" in filters: + priority = todo.get("priority", 0) + if priority == 0 or priority > filters["min_priority"]: + return False + + # Filter by categories + if "categories" in filters: + todo_categories = todo.get("categories", "").lower() + required_categories = [cat.lower() for cat in filters["categories"]] + if not any(cat in todo_categories for cat in required_categories): + return False + + # Filter by summary contains + if "summary_contains" in filters: + summary = todo.get("summary", "").lower() + search_term = filters["summary_contains"].lower() + if search_term not in summary: + return False + + return True + + except Exception: + return True + + # ============= Legacy Methods (for backward compatibility) ============= + + async def bulk_update_events( + self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Bulk update events matching filter criteria.""" + try: + start_datetime = None + end_datetime = None + if "start_date" in filter_criteria and filter_criteria["start_date"]: + start_datetime = dt.datetime.fromisoformat( + filter_criteria["start_date"] + ) + if "end_date" in filter_criteria and filter_criteria["end_date"]: + end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"]) + + events = await self.search_events_across_calendars( + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filter_criteria, + ) + + updated_count = 0 + failed_count = 0 + results = [] + + for event in events: + try: + await self.update_event( + event["calendar_name"], event["uid"], update_data + ) + updated_count += 1 + results.append( + { + "uid": event["uid"], + "status": "updated", + "title": event.get("title", ""), + } + ) + except Exception as e: + failed_count += 1 + results.append( + { + "uid": event["uid"], + "status": "failed", + "error": str(e), + "title": event.get("title", ""), + } + ) + + return { + "total_found": len(events), + "updated_count": updated_count, + "failed_count": failed_count, + "results": results, + } + + except Exception as e: + logger.error(f"Error in bulk update: {e}") + raise + + async def find_availability( + self, + duration_minutes: int, + attendees: Optional[List[str]] = None, + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, + constraints: Optional[Dict[str, Any]] = None, + ) -> List[Dict[str, Any]]: + """Find available time slots for scheduling. + + Note: This is a simplified stub that returns empty list. + Full implementation would require complex free/busy analysis. + """ + logger.warning("find_availability is not fully implemented with AsyncDavClient") + return [] diff --git a/nextcloud_mcp_server/models/calendar.py b/nextcloud_mcp_server/models/calendar.py index 474db42..fb1bf8f 100644 --- a/nextcloud_mcp_server/models/calendar.py +++ b/nextcloud_mcp_server/models/calendar.py @@ -180,3 +180,71 @@ class ManageCalendarResponse(BaseResponse): None, description="List of calendars (for list action)" ) message: str = Field(description="Success message") + + +# ============= Todo/Task Models ============= + + +class Todo(BaseModel): + """Model for a CalDAV todo/task (VTODO).""" + + uid: str = Field(description="Todo UID") + summary: str = Field(description="Todo summary/title") + description: str = Field(default="", description="Todo description") + status: str = Field( + default="NEEDS-ACTION", + description="Todo status: NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED", + ) + priority: int = Field( + default=0, description="Todo priority (0=undefined, 1=highest, 9=lowest)" + ) + percent_complete: int = Field(default=0, description="Percentage complete (0-100)") + due: Optional[str] = Field(None, description="Due date/time (ISO format)") + dtstart: Optional[str] = Field(None, description="Start date/time (ISO format)") + completed: Optional[str] = Field( + None, description="Completion timestamp (ISO format)" + ) + categories: str = Field(default="", description="Comma-separated categories") + href: str = Field(default="", description="CalDAV href") + etag: str = Field(default="", description="ETag for versioning") + calendar_name: Optional[str] = Field( + None, description="Calendar containing this todo" + ) + calendar_display_name: Optional[str] = Field( + None, description="Display name of calendar containing this todo" + ) + + +class ListTodosResponse(BaseResponse): + """Response model for listing todos.""" + + todos: List[Todo] = Field(description="List of todos/tasks") + calendar_name: Optional[str] = Field( + None, description="Calendar name (if filtered to one calendar)" + ) + total_count: int = Field(description="Total number of todos found") + + +class CreateTodoResponse(BaseResponse): + """Response model for todo creation.""" + + todo: Todo = Field(description="The created todo") + calendar_name: str = Field( + description="Name of the calendar the todo was created in" + ) + + +class UpdateTodoResponse(BaseResponse): + """Response model for todo updates.""" + + todo: Todo = Field(description="The updated todo") + calendar_name: str = Field(description="Name of the calendar the todo belongs to") + + +class DeleteTodoResponse(StatusResponse): + """Response model for todo deletion.""" + + deleted_uid: str = Field(description="UID of the deleted todo") + calendar_name: str = Field( + description="Name of the calendar the todo was deleted from" + ) diff --git a/nextcloud_mcp_server/server/calendar.py b/nextcloud_mcp_server/server/calendar.py index 07a70e3..493ede2 100644 --- a/nextcloud_mcp_server/server/calendar.py +++ b/nextcloud_mcp_server/server/calendar.py @@ -5,7 +5,12 @@ from typing import Optional from mcp.server.fastmcp import Context, FastMCP from nextcloud_mcp_server.context import get_client -from nextcloud_mcp_server.models.calendar import Calendar, ListCalendarsResponse +from nextcloud_mcp_server.models.calendar import ( + Calendar, + ListCalendarsResponse, + ListTodosResponse, + Todo, +) logger = logging.getLogger(__name__) @@ -796,3 +801,209 @@ def configure_calendar_tools(mcp: FastMCP): else: raise ValueError("Action must be 'create', 'delete', 'update', or 'list'") + + # ============= Todo/Task Tools ============= + + @mcp.tool() + async def nc_calendar_list_todos( + calendar_name: str, + ctx: Context, + status: Optional[str] = None, + min_priority: Optional[int] = None, + categories: Optional[str] = None, + summary_contains: Optional[str] = None, + ) -> ListTodosResponse: + """List todos/tasks in a calendar with optional filtering. + + Args: + calendar_name: Name of the calendar to list todos from + ctx: MCP context + status: Filter by status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED) + min_priority: Filter by minimum priority (1=highest, 9=lowest) + categories: Filter by categories (comma-separated, e.g., "work,urgent") + summary_contains: Filter todos where summary contains this text + + Returns: + List of todos matching the filters + """ + client = get_client(ctx) + + # Build filters dictionary + filters = {} + if status is not None: + filters["status"] = status + if min_priority is not None: + filters["min_priority"] = min_priority + if categories is not None: + filters["categories"] = [cat.strip() for cat in categories.split(",")] + if summary_contains is not None: + filters["summary_contains"] = summary_contains + + todos_data = await client.calendar.list_todos( + calendar_name, filters if filters else None + ) + + todos = [Todo(**todo_data) for todo_data in todos_data] + return ListTodosResponse( + todos=todos, calendar_name=calendar_name, total_count=len(todos) + ) + + @mcp.tool() + async def nc_calendar_create_todo( + calendar_name: str, + summary: str, + ctx: Context, + description: str = "", + status: str = "NEEDS-ACTION", + priority: int = 0, + due: str = "", + dtstart: str = "", + categories: str = "", + ): + """Create a new todo/task in a calendar. + + Args: + calendar_name: Name of the calendar to create the todo in + summary: Todo title/summary + ctx: MCP context + description: Detailed description of the todo + status: Todo status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED) + priority: Priority (0=undefined, 1=highest, 9=lowest) + due: Due date/time (ISO format, e.g., "2025-01-15T14:00:00") + dtstart: Start date/time (ISO format) + categories: Comma-separated categories (e.g., "work,urgent") + + Returns: + Dict with todo creation result + """ + client = get_client(ctx) + + todo_data = { + "summary": summary, + "description": description, + "status": status, + "priority": priority, + "due": due, + "dtstart": dtstart, + "categories": categories, + } + + return await client.calendar.create_todo(calendar_name, todo_data) + + @mcp.tool() + async def nc_calendar_update_todo( + calendar_name: str, + todo_uid: str, + ctx: Context, + summary: Optional[str] = None, + description: Optional[str] = None, + status: Optional[str] = None, + priority: Optional[int] = None, + percent_complete: Optional[int] = None, + due: Optional[str] = None, + dtstart: Optional[str] = None, + completed: Optional[str] = None, + categories: Optional[str] = None, + ): + """Update an existing todo/task. + + Args: + calendar_name: Name of the calendar containing the todo + todo_uid: UID of the todo to update + ctx: MCP context + summary: New summary/title + description: New description + status: New status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED) + priority: New priority (0-9) + percent_complete: New completion percentage (0-100) + due: New due date/time (ISO format) + dtstart: New start date/time (ISO format) + completed: Completion timestamp (ISO format) + categories: New categories (comma-separated) + + Returns: + Dict with todo update result + """ + client = get_client(ctx) + + # Build update data with only non-None values + todo_data = {} + if summary is not None: + todo_data["summary"] = summary + if description is not None: + todo_data["description"] = description + if status is not None: + todo_data["status"] = status + if priority is not None: + todo_data["priority"] = priority + if percent_complete is not None: + todo_data["percent_complete"] = percent_complete + if due is not None: + todo_data["due"] = due + if dtstart is not None: + todo_data["dtstart"] = dtstart + if completed is not None: + todo_data["completed"] = completed + if categories is not None: + todo_data["categories"] = categories + + return await client.calendar.update_todo(calendar_name, todo_uid, todo_data) + + @mcp.tool() + async def nc_calendar_delete_todo( + calendar_name: str, + todo_uid: str, + ctx: Context, + ): + """Delete a todo/task from a calendar. + + Args: + calendar_name: Name of the calendar containing the todo + todo_uid: UID of the todo to delete + ctx: MCP context + + Returns: + Dict with deletion status + """ + client = get_client(ctx) + return await client.calendar.delete_todo(calendar_name, todo_uid) + + @mcp.tool() + async def nc_calendar_search_todos( + ctx: Context, + status: Optional[str] = None, + min_priority: Optional[int] = None, + categories: Optional[str] = None, + summary_contains: Optional[str] = None, + ): + """Search todos across all calendars with optional filtering. + + Args: + ctx: MCP context + status: Filter by status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED) + min_priority: Filter by minimum priority (1=highest, 9=lowest) + categories: Filter by categories (comma-separated, e.g., "work,urgent") + summary_contains: Filter todos where summary contains this text + + Returns: + List of todos matching the filters from all calendars + """ + client = get_client(ctx) + + # Build filters dictionary + filters = {} + if status is not None: + filters["status"] = status + if min_priority is not None: + filters["min_priority"] = min_priority + if categories is not None: + filters["categories"] = [cat.strip() for cat in categories.split(",")] + if summary_contains is not None: + filters["summary_contains"] = summary_contains + + todos_data = await client.calendar.search_todos_across_calendars( + filters if filters else None + ) + + todos = [Todo(**todo_data) for todo_data in todos_data] + return ListTodosResponse(todos=todos, total_count=len(todos)) diff --git a/tests/client/calendar/conftest.py b/tests/client/calendar/conftest.py new file mode 100644 index 0000000..e7d0f41 --- /dev/null +++ b/tests/client/calendar/conftest.py @@ -0,0 +1,11 @@ +"""Shared fixtures for calendar integration tests. + +Note: The temporary_calendar fixture is defined in tests/conftest.py and uses +a shared session-scoped calendar to avoid Nextcloud rate limiting issues. +This conftest.py exists for any calendar-specific fixtures that might be needed +in the future. +""" + +import logging + +logger = logging.getLogger(__name__) diff --git a/tests/client/calendar/test_task_operations.py b/tests/client/calendar/test_task_operations.py new file mode 100644 index 0000000..d2f20dc --- /dev/null +++ b/tests/client/calendar/test_task_operations.py @@ -0,0 +1,498 @@ +"""Integration tests for Calendar VTODO (task) operations.""" + +import logging +import uuid +from datetime import datetime, timedelta + +import pytest +from httpx import HTTPStatusError + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + +# Mark all tests in this module as integration tests +pytestmark = pytest.mark.integration + + +@pytest.fixture +async def temporary_todo(nc_client: NextcloudClient, temporary_calendar: str): + """Create a temporary todo for testing and clean up afterward.""" + todo_uid = None + calendar_name = temporary_calendar + + # Create a test todo + tomorrow = datetime.now() + timedelta(days=1) + todo_data = { + "summary": f"Test Task {uuid.uuid4().hex[:8]}", + "description": "Test todo created by integration tests", + "status": "NEEDS-ACTION", + "priority": 5, + "due": tomorrow.strftime("%Y-%m-%dT18:00:00"), + "categories": "testing", + } + + try: + logger.info(f"Creating temporary todo in calendar: {calendar_name}") + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uid = result.get("uid") + + if not todo_uid: + pytest.fail("Failed to create temporary todo") + + logger.info(f"Created temporary todo with UID: {todo_uid}") + yield {"uid": todo_uid, "calendar_name": calendar_name, "data": todo_data} + + finally: + # Cleanup + if todo_uid: + try: + logger.info(f"Cleaning up temporary todo: {todo_uid}") + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + logger.info(f"Successfully deleted temporary todo: {todo_uid}") + except HTTPStatusError as e: + if e.response.status_code != 404: + logger.error(f"Error deleting temporary todo {todo_uid}: {e}") + except Exception as e: + logger.error( + f"Unexpected error deleting temporary todo {todo_uid}: {e}" + ) + + +# ============= Basic CRUD Tests ============= + + +async def test_create_and_delete_todo( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test creating and deleting a basic todo.""" + calendar_name = temporary_calendar + + # Create todo + tomorrow = datetime.now() + timedelta(days=1) + todo_data = { + "summary": "Integration Test Task", + "description": "Test task for integration testing", + "status": "NEEDS-ACTION", + "priority": 3, + "due": tomorrow.strftime("%Y-%m-%dT18:00:00"), + "categories": "testing,integration", + } + + try: + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + assert "uid" in result + assert result["status_code"] in [200, 201, 204] + + todo_uid = result["uid"] + logger.info(f"Created todo with UID: {todo_uid}") + + # Verify todo was created by listing todos + todos = await nc_client.calendar.list_todos(calendar_name) + todo_uids = [todo.get("uid") for todo in todos] + assert todo_uid in todo_uids + + # Find our todo in the list + our_todo = next((t for t in todos if t.get("uid") == todo_uid), None) + assert our_todo is not None + assert our_todo["summary"] == "Integration Test Task" + assert our_todo["status"] == "NEEDS-ACTION" + assert our_todo["priority"] == 3 + + # Delete todo + delete_result = await nc_client.calendar.delete_todo(calendar_name, todo_uid) + assert delete_result["status_code"] in [200, 204, 404] + + logger.info(f"Successfully deleted todo: {todo_uid}") + + except Exception as e: + logger.error(f"Test failed: {e}") + raise + + +async def test_list_todos(nc_client: NextcloudClient, temporary_calendar: str): + """Test listing todos in a calendar.""" + calendar_name = temporary_calendar + + # Create multiple todos + todo_uids = [] + for i in range(3): + todo_data = { + "summary": f"Test Task {i + 1}", + "description": f"Task number {i + 1}", + "status": "NEEDS-ACTION", + "priority": i + 1, + } + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uids.append(result["uid"]) + + try: + # List todos + todos = await nc_client.calendar.list_todos(calendar_name) + + assert isinstance(todos, list) + assert len(todos) >= 3 # At least our 3 todos + + # Check structure + for todo in todos: + assert "uid" in todo + assert "summary" in todo + assert "status" in todo + assert "priority" in todo + + # Verify our todos are in the list + listed_uids = [todo["uid"] for todo in todos] + for uid in todo_uids: + assert uid in listed_uids + + logger.info(f"Found {len(todos)} todos in calendar") + + finally: + # Cleanup + for uid in todo_uids: + try: + await nc_client.calendar.delete_todo(calendar_name, uid) + except Exception: + pass + + +async def test_update_todo(nc_client: NextcloudClient, temporary_todo: dict): + """Test updating an existing todo.""" + calendar_name = temporary_todo["calendar_name"] + todo_uid = temporary_todo["uid"] + + # Update todo data + updated_data = { + "summary": "Updated Test Task Title", + "description": "Updated description for test task", + "status": "IN-PROCESS", + "priority": 1, # High priority + "percent_complete": 50, + } + + try: + result = await nc_client.calendar.update_todo( + calendar_name, todo_uid, updated_data + ) + assert result["uid"] == todo_uid + + # Verify updates by listing todos + todos = await nc_client.calendar.list_todos(calendar_name) + updated_todo = next((t for t in todos if t["uid"] == todo_uid), None) + + assert updated_todo is not None + assert updated_todo["summary"] == "Updated Test Task Title" + assert updated_todo["description"] == "Updated description for test task" + assert updated_todo["status"] == "IN-PROCESS" + assert updated_todo["priority"] == 1 + assert updated_todo["percent_complete"] == 50 + + logger.info(f"Successfully updated todo: {todo_uid}") + + except Exception as e: + logger.error(f"Todo update test failed: {e}") + raise + + +async def test_todo_with_dates(nc_client: NextcloudClient, temporary_calendar: str): + """Test creating a todo with start, due, and completed dates.""" + calendar_name = temporary_calendar + + now = datetime.now() + start_date = now + timedelta(days=1) + due_date = now + timedelta(days=7) + + todo_data = { + "summary": "Task with Dates", + "description": "Test task with various date fields", + "status": "NEEDS-ACTION", + "dtstart": start_date.strftime("%Y-%m-%dT09:00:00"), + "due": due_date.strftime("%Y-%m-%dT17:00:00"), + } + + try: + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uid = result["uid"] + logger.info(f"Created todo with dates, UID: {todo_uid}") + + # Verify dates + todos = await nc_client.calendar.list_todos(calendar_name) + created_todo = next((t for t in todos if t["uid"] == todo_uid), None) + + assert created_todo is not None + assert created_todo["summary"] == "Task with Dates" + assert "dtstart" in created_todo + assert "due" in created_todo + + # Cleanup + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + + except Exception as e: + logger.error(f"Date handling test failed: {e}") + raise + + +# ============= Advanced Feature Tests ============= + + +async def test_todo_status_transitions( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test transitioning through different todo statuses.""" + calendar_name = temporary_calendar + + todo_data = { + "summary": "Status Transition Test", + "description": "Testing status changes", + "status": "NEEDS-ACTION", + } + + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uid = result["uid"] + + try: + # Transition: NEEDS-ACTION → IN-PROCESS + await nc_client.calendar.update_todo( + calendar_name, + todo_uid, + {"status": "IN-PROCESS", "percent_complete": 25}, + ) + + todos = await nc_client.calendar.list_todos(calendar_name) + todo = next((t for t in todos if t["uid"] == todo_uid), None) + assert todo["status"] == "IN-PROCESS" + assert todo["percent_complete"] == 25 + + # Transition: IN-PROCESS → COMPLETED + completed_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + await nc_client.calendar.update_todo( + calendar_name, + todo_uid, + { + "status": "COMPLETED", + "percent_complete": 100, + "completed": completed_time, + }, + ) + + todos = await nc_client.calendar.list_todos(calendar_name) + todo = next((t for t in todos if t["uid"] == todo_uid), None) + assert todo["status"] == "COMPLETED" + assert todo["percent_complete"] == 100 + assert "completed" in todo + + logger.info(f"Successfully transitioned todo through statuses: {todo_uid}") + + finally: + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + + +async def test_todo_priority_levels( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test different priority levels (0=undefined, 1=highest, 9=lowest).""" + calendar_name = temporary_calendar + priorities = [0, 1, 5, 9] + priority_labels = {0: "Undefined", 1: "Highest", 5: "Medium", 9: "Lowest"} + todo_uids = [] + + try: + # Create todos with different priorities + for priority in priorities: + todo_data = { + "summary": f"Priority {priority} Task ({priority_labels[priority]})", + "status": "NEEDS-ACTION", + "priority": priority, + } + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uids.append((result["uid"], priority)) + + # Verify all priorities + todos = await nc_client.calendar.list_todos(calendar_name) + + for uid, expected_priority in todo_uids: + todo = next((t for t in todos if t["uid"] == uid), None) + assert todo is not None + assert todo["priority"] == expected_priority + + logger.info(f"Successfully tested priority levels: {priorities}") + + finally: + # Cleanup + for uid, _ in todo_uids: + try: + await nc_client.calendar.delete_todo(calendar_name, uid) + except Exception: + pass + + +async def test_todo_with_categories( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test creating a todo with multiple categories.""" + calendar_name = temporary_calendar + + todo_data = { + "summary": "Task with Categories", + "description": "Testing category support", + "status": "NEEDS-ACTION", + "categories": "work,meeting,important,quarterly", + } + + try: + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + todo_uid = result["uid"] + logger.info(f"Created todo with categories, UID: {todo_uid}") + + # Verify categories + todos = await nc_client.calendar.list_todos(calendar_name) + created_todo = next((t for t in todos if t["uid"] == todo_uid), None) + + assert created_todo is not None + assert "categories" in created_todo + categories_str = created_todo["categories"] + assert "work" in categories_str + assert "meeting" in categories_str + assert "important" in categories_str + assert "quarterly" in categories_str + + # Cleanup + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + + except Exception as e: + logger.error(f"Categories test failed: {e}") + raise + + +async def test_search_todos_across_calendars( + nc_client: NextcloudClient, temporary_calendar: str, shared_calendar_2: str +): + """Test searching for todos across multiple calendars. + + Uses two shared test calendars to avoid rate limiting. + """ + # Use existing shared calendars to avoid rate limits + cal1_name = temporary_calendar # First shared test calendar + cal2_name = shared_calendar_2 # Second shared test calendar + + try: + # Create todos in both calendars + todo1_data = {"summary": "Task in Calendar 1", "status": "NEEDS-ACTION"} + todo2_data = {"summary": "Task in Calendar 2", "status": "IN-PROCESS"} + + result1 = await nc_client.calendar.create_todo(cal1_name, todo1_data) + result2 = await nc_client.calendar.create_todo(cal2_name, todo2_data) + + # Search across all calendars + all_todos = await nc_client.calendar.search_todos_across_calendars() + + assert isinstance(all_todos, list) + + # Find our todos + todo1 = next((t for t in all_todos if t["uid"] == result1["uid"]), None) + todo2 = next((t for t in all_todos if t["uid"] == result2["uid"]), None) + + assert todo1 is not None + assert todo2 is not None + assert "calendar_name" in todo1 + assert "calendar_name" in todo2 + assert todo1["calendar_name"] == cal1_name + assert todo2["calendar_name"] == cal2_name + + logger.info(f"Found {len(all_todos)} todos across all calendars") + + finally: + # Cleanup: Delete only the todos we created (calendars are reused/built-in) + try: + await nc_client.calendar.delete_todo(cal1_name, result1["uid"]) + except Exception: + pass + try: + await nc_client.calendar.delete_todo(cal2_name, result2["uid"]) + except Exception: + pass + + +# ============= Edge Case Tests ============= + + +async def test_get_nonexistent_todo( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test attempting to retrieve a non-existent todo.""" + calendar_name = temporary_calendar + fake_uid = f"nonexistent-{uuid.uuid4()}" + + # List todos to ensure it doesn't exist + todos = await nc_client.calendar.list_todos(calendar_name) + matching_todos = [t for t in todos if t.get("uid") == fake_uid] + assert len(matching_todos) == 0 + + logger.info(f"Verified nonexistent todo UID: {fake_uid}") + + +async def test_delete_nonexistent_todo( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test deleting a non-existent todo.""" + calendar_name = temporary_calendar + fake_uid = f"nonexistent-{uuid.uuid4()}" + + result = await nc_client.calendar.delete_todo(calendar_name, fake_uid) + assert result["status_code"] == 404 + logger.info(f"Correctly got 404 for deleting nonexistent todo: {fake_uid}") + + +async def test_list_todos_with_filters( + nc_client: NextcloudClient, temporary_calendar: str +): + """Test listing todos with various filters.""" + calendar_name = temporary_calendar + + # Create todos with different statuses and priorities + test_todos = [ + { + "summary": "High Priority Task", + "status": "NEEDS-ACTION", + "priority": 1, + "categories": "urgent", + }, + { + "summary": "In Progress Task", + "status": "IN-PROCESS", + "priority": 5, + "categories": "work", + }, + { + "summary": "Low Priority Task", + "status": "NEEDS-ACTION", + "priority": 9, + "categories": "someday", + }, + ] + + created_uids = [] + + try: + # Create test todos + for todo_data in test_todos: + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + created_uids.append(result["uid"]) + + # Test basic list without filters + all_todos = await nc_client.calendar.list_todos(calendar_name) + assert len(all_todos) >= 3 + + # Verify all our todos are in the list + our_todo_uids = [t["uid"] for t in all_todos if t["uid"] in created_uids] + assert len(our_todo_uids) == 3 + + logger.info(f"Successfully created and listed {len(created_uids)} test todos") + + finally: + # Cleanup + for uid in created_uids: + try: + await nc_client.calendar.delete_todo(calendar_name, uid) + except Exception: + pass diff --git a/tests/conftest.py b/tests/conftest.py index 394d816..c8b3f0c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -501,6 +501,120 @@ async def temporary_board_with_card( logger.error(f"Unexpected error deleting temporary card {card.id}: {e}") +@pytest.fixture(scope="session") +def shared_test_calendar_name(): + """Unique calendar name for the entire test session.""" + return f"test_calendar_shared_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def shared_test_calendar_name_2(): + """Second unique calendar name for cross-calendar tests.""" + return f"test_calendar_shared_2_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +async def shared_calendar(nc_client: NextcloudClient, shared_test_calendar_name: str): + """Create a shared calendar for all tests in the session. Reuses the calendar to avoid rate limiting.""" + calendar_name = shared_test_calendar_name + + try: + # Create a test calendar + logger.info(f"Creating shared test calendar: {calendar_name}") + result = await nc_client.calendar.create_calendar( + calendar_name=calendar_name, + display_name=f"Shared Test Calendar {calendar_name}", + description="Shared calendar for integration testing (reused across tests)", + color="#FF5722", + ) + + if result["status_code"] not in [200, 201]: + pytest.skip(f"Failed to create shared test calendar: {result}") + + logger.info(f"Created shared test calendar: {calendar_name}") + yield calendar_name + + except Exception as e: + logger.error(f"Error setting up shared test calendar: {e}") + pytest.skip(f"Shared calendar setup failed: {e}") + + finally: + # Cleanup: Delete the shared calendar at end of session + try: + logger.info(f"Cleaning up shared test calendar: {calendar_name}") + await nc_client.calendar.delete_calendar(calendar_name) + logger.info(f"Successfully deleted shared test calendar: {calendar_name}") + except Exception as e: + logger.error(f"Error deleting shared test calendar {calendar_name}: {e}") + + +@pytest.fixture(scope="session") +async def shared_calendar_2( + nc_client: NextcloudClient, shared_test_calendar_name_2: str +): + """Create a second shared calendar for cross-calendar tests.""" + calendar_name = shared_test_calendar_name_2 + + try: + # Create a test calendar + logger.info(f"Creating second shared test calendar: {calendar_name}") + result = await nc_client.calendar.create_calendar( + calendar_name=calendar_name, + display_name=f"Shared Test Calendar 2 {calendar_name}", + description="Second shared calendar for cross-calendar testing", + color="#4CAF50", + ) + + if result["status_code"] not in [200, 201]: + pytest.skip(f"Failed to create second shared test calendar: {result}") + + logger.info(f"Created second shared test calendar: {calendar_name}") + yield calendar_name + + except Exception as e: + logger.error(f"Error setting up second shared test calendar: {e}") + pytest.skip(f"Second shared calendar setup failed: {e}") + + finally: + # Cleanup: Delete the second shared calendar at end of session + try: + logger.info(f"Cleaning up second shared test calendar: {calendar_name}") + await nc_client.calendar.delete_calendar(calendar_name) + logger.info( + f"Successfully deleted second shared test calendar: {calendar_name}" + ) + except Exception as e: + logger.error( + f"Error deleting second shared test calendar {calendar_name}: {e}" + ) + + +@pytest.fixture +async def temporary_calendar(shared_calendar: str, nc_client: NextcloudClient): + """Provide the shared calendar and clean up todos after each test. + + This fixture reuses a session-scoped calendar to avoid Nextcloud rate limiting + on calendar creation. Each test gets the same calendar but todos are cleaned up + between tests. + """ + calendar_name = shared_calendar + + yield calendar_name + + # Cleanup: Delete all todos from this calendar + try: + logger.info(f"Cleaning up todos from shared calendar: {calendar_name}") + todos = await nc_client.calendar.list_todos(calendar_name) + for todo in todos: + try: + await nc_client.calendar.delete_todo(calendar_name, todo["uid"]) + except Exception as e: + logger.warning(f"Error deleting todo {todo['uid']}: {e}") + logger.info(f"Cleaned up {len(todos)} todos from shared calendar") + except Exception as e: + logger.error(f"Error cleaning up todos from calendar {calendar_name}: {e}") + + @pytest.fixture(scope="session") async def nc_oauth_client( anyio_backend, diff --git a/tests/server/test_calendar_todos_mcp.py b/tests/server/test_calendar_todos_mcp.py new file mode 100644 index 0000000..ff235e6 --- /dev/null +++ b/tests/server/test_calendar_todos_mcp.py @@ -0,0 +1,476 @@ +"""Integration tests for Calendar VTODO (task) MCP tools.""" + +import logging +from datetime import datetime, timedelta + +import pytest +from mcp import ClientSession + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) +pytestmark = pytest.mark.integration + + +async def test_mcp_todo_complete_workflow( + nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str +): + """Test complete todo workflow via MCP tools with verification via NextcloudClient.""" + + calendar_name = temporary_calendar + todo_uid = None + + try: + # 1. Create todo via MCP + logger.info(f"Creating todo in {calendar_name} via MCP") + tomorrow = datetime.now() + timedelta(days=1) + + create_result = await nc_mcp_client.call_tool( + "nc_calendar_create_todo", + { + "calendar_name": calendar_name, + "summary": "MCP Test Task", + "description": "Test task created via MCP tools", + "status": "NEEDS-ACTION", + "priority": 3, + "due": tomorrow.strftime("%Y-%m-%dT18:00:00"), + "categories": "testing,mcp", + }, + ) + assert create_result.isError is False + + # Extract UID from the result + result_data = create_result.content[0].text + import json + + result_json = json.loads(result_data) + todo_uid = result_json["uid"] + logger.info(f"Created todo with UID: {todo_uid}") + + # 2. Verify todo creation via client + todos = await nc_client.calendar.list_todos(calendar_name) + assert any(t["uid"] == todo_uid for t in todos) + created_todo = next(t for t in todos if t["uid"] == todo_uid) + assert created_todo["summary"] == "MCP Test Task" + assert created_todo["status"] == "NEEDS-ACTION" + assert created_todo["priority"] == 3 + + # 3. List todos via MCP + logger.info(f"Listing todos in {calendar_name} via MCP") + list_result = await nc_mcp_client.call_tool( + "nc_calendar_list_todos", + {"calendar_name": calendar_name}, + ) + assert list_result.isError is False + + list_data = json.loads(list_result.content[0].text) + assert "todos" in list_data + assert any(t["uid"] == todo_uid for t in list_data["todos"]) + + # 4. Update todo via MCP + logger.info(f"Updating todo {todo_uid} via MCP") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_todo", + { + "calendar_name": calendar_name, + "todo_uid": todo_uid, + "summary": "MCP Test Task Updated", + "status": "IN-PROCESS", + "priority": 1, + "percent_complete": 50, + }, + ) + assert update_result.isError is False + + # 5. Verify update via client + todos = await nc_client.calendar.list_todos(calendar_name) + updated_todo = next(t for t in todos if t["uid"] == todo_uid) + assert updated_todo["summary"] == "MCP Test Task Updated" + assert updated_todo["status"] == "IN-PROCESS" + assert updated_todo["priority"] == 1 + assert updated_todo["percent_complete"] == 50 + + # 6. Delete todo via MCP + logger.info(f"Deleting todo {todo_uid} via MCP") + delete_result = await nc_mcp_client.call_tool( + "nc_calendar_delete_todo", + {"calendar_name": calendar_name, "todo_uid": todo_uid}, + ) + assert delete_result.isError is False + + # 7. Verify deletion via client + todos = await nc_client.calendar.list_todos(calendar_name) + assert not any(t["uid"] == todo_uid for t in todos) + + logger.info("Complete todo workflow test passed") + + finally: + # Cleanup in case of failure + if todo_uid: + try: + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + except Exception: + pass + + +async def test_mcp_list_todos_with_filters( + nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str +): + """Test listing todos with various filters via MCP tools.""" + + calendar_name = temporary_calendar + created_uids = [] + + try: + # Create test todos with different properties + test_todos = [ + { + "summary": "High Priority Task", + "status": "NEEDS-ACTION", + "priority": 1, + "categories": "urgent,work", + }, + { + "summary": "In Progress Task", + "status": "IN-PROCESS", + "priority": 5, + "categories": "work", + }, + { + "summary": "Low Priority Task", + "status": "NEEDS-ACTION", + "priority": 9, + "categories": "someday", + }, + ] + + # Create todos via client + for todo_data in test_todos: + result = await nc_client.calendar.create_todo(calendar_name, todo_data) + created_uids.append(result["uid"]) + + # Test 1: Filter by status + logger.info("Testing filter by status") + result = await nc_mcp_client.call_tool( + "nc_calendar_list_todos", + {"calendar_name": calendar_name, "status": "NEEDS-ACTION"}, + ) + assert result.isError is False + import json + + data = json.loads(result.content[0].text) + needs_action_todos = [t for t in data["todos"] if t["uid"] in created_uids] + assert len(needs_action_todos) == 2 # Two NEEDS-ACTION todos + + # Test 2: Filter by priority + logger.info("Testing filter by minimum priority") + result = await nc_mcp_client.call_tool( + "nc_calendar_list_todos", + {"calendar_name": calendar_name, "min_priority": 1}, + ) + assert result.isError is False + data = json.loads(result.content[0].text) + high_priority_todos = [t for t in data["todos"] if t["uid"] in created_uids] + assert len(high_priority_todos) >= 1 # At least the priority 1 todo + + # Test 3: Filter by categories + logger.info("Testing filter by categories") + result = await nc_mcp_client.call_tool( + "nc_calendar_list_todos", + {"calendar_name": calendar_name, "categories": "work"}, + ) + assert result.isError is False + data = json.loads(result.content[0].text) + work_todos = [t for t in data["todos"] if t["uid"] in created_uids] + assert len(work_todos) >= 2 # Two todos with "work" category + + # Test 4: Filter by summary text + logger.info("Testing filter by summary text") + result = await nc_mcp_client.call_tool( + "nc_calendar_list_todos", + {"calendar_name": calendar_name, "summary_contains": "Priority"}, + ) + assert result.isError is False + data = json.loads(result.content[0].text) + priority_todos = [t for t in data["todos"] if t["uid"] in created_uids] + assert len(priority_todos) == 2 # Two have "Priority" in summary (High, Low) + + logger.info("List todos with filters test passed") + + finally: + # Cleanup + for uid in created_uids: + try: + await nc_client.calendar.delete_todo(calendar_name, uid) + except Exception: + pass + + +async def test_mcp_search_todos_across_calendars( + nc_mcp_client: ClientSession, + nc_client: NextcloudClient, + temporary_calendar: str, + shared_calendar_2: str, +): + """Test searching todos across multiple calendars via MCP tools. + + Note: Uses two shared test calendars to avoid rate limiting. + """ + + cal1_name = temporary_calendar # First shared test calendar + cal2_name = shared_calendar_2 # Second shared test calendar + created_uids = [] + + try: + # Use existing shared calendars (no creation needed, avoiding rate limits) + + # Create todos in both calendars + result1 = await nc_client.calendar.create_todo( + cal1_name, + { + "summary": "Task in Calendar 1", + "status": "NEEDS-ACTION", + "categories": "cal1", + }, + ) + created_uids.append((cal1_name, result1["uid"])) + + result2 = await nc_client.calendar.create_todo( + cal2_name, + { + "summary": "Task in Calendar 2", + "status": "IN-PROCESS", + "categories": "cal2", + }, + ) + created_uids.append((cal2_name, result2["uid"])) + + # Search across all calendars via MCP + logger.info("Searching todos across all calendars via MCP") + search_result = await nc_mcp_client.call_tool( + "nc_calendar_search_todos", + {}, + ) + assert search_result.isError is False + + import json + + data = json.loads(search_result.content[0].text) + assert "todos" in data + + # Verify both todos are in the results + found_uids = {t["uid"] for t in data["todos"]} + assert result1["uid"] in found_uids + assert result2["uid"] in found_uids + + # Verify calendar_name is included + our_todos = [ + t for t in data["todos"] if t["uid"] in [result1["uid"], result2["uid"]] + ] + for todo in our_todos: + assert "calendar_name" in todo + assert todo["calendar_name"] in [cal1_name, cal2_name] + + # Test search with status filter + logger.info("Searching with status filter via MCP") + search_result = await nc_mcp_client.call_tool( + "nc_calendar_search_todos", + {"status": "IN-PROCESS"}, + ) + assert search_result.isError is False + data = json.loads(search_result.content[0].text) + in_process_todos = [ + t for t in data["todos"] if t["uid"] in [uid for _, uid in created_uids] + ] + assert len(in_process_todos) >= 1 + + logger.info("Search todos across calendars test passed") + + finally: + # Cleanup: Only delete todos, not calendars (they're reused/built-in) + for cal_name, uid in created_uids: + try: + await nc_client.calendar.delete_todo(cal_name, uid) + except Exception: + pass + + +async def test_mcp_todo_status_transitions( + nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str +): + """Test transitioning through different todo statuses via MCP tools.""" + + calendar_name = temporary_calendar + todo_uid = None + + try: + # Create todo + result = await nc_client.calendar.create_todo( + calendar_name, + {"summary": "Status Transition Test", "status": "NEEDS-ACTION"}, + ) + todo_uid = result["uid"] + + # Transition: NEEDS-ACTION → IN-PROCESS + logger.info("Transitioning todo to IN-PROCESS via MCP") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_todo", + { + "calendar_name": calendar_name, + "todo_uid": todo_uid, + "status": "IN-PROCESS", + "percent_complete": 25, + }, + ) + assert update_result.isError is False + + todos = await nc_client.calendar.list_todos(calendar_name) + todo = next(t for t in todos if t["uid"] == todo_uid) + assert todo["status"] == "IN-PROCESS" + assert todo["percent_complete"] == 25 + + # Transition: IN-PROCESS → COMPLETED + logger.info("Transitioning todo to COMPLETED via MCP") + completed_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_todo", + { + "calendar_name": calendar_name, + "todo_uid": todo_uid, + "status": "COMPLETED", + "percent_complete": 100, + "completed": completed_time, + }, + ) + assert update_result.isError is False + + todos = await nc_client.calendar.list_todos(calendar_name) + todo = next(t for t in todos if t["uid"] == todo_uid) + assert todo["status"] == "COMPLETED" + assert todo["percent_complete"] == 100 + assert "completed" in todo + + logger.info("Todo status transitions test passed") + + finally: + if todo_uid: + try: + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + except Exception: + pass + + +async def test_mcp_todo_with_dates( + nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str +): + """Test creating and managing todos with date fields via MCP tools.""" + + calendar_name = temporary_calendar + todo_uid = None + + try: + now = datetime.now() + start_date = (now + timedelta(days=1)).strftime("%Y-%m-%dT09:00:00") + due_date = (now + timedelta(days=7)).strftime("%Y-%m-%dT17:00:00") + + # Create todo with dates via MCP + logger.info("Creating todo with dates via MCP") + create_result = await nc_mcp_client.call_tool( + "nc_calendar_create_todo", + { + "calendar_name": calendar_name, + "summary": "Task with Dates", + "description": "Test task with various date fields", + "status": "NEEDS-ACTION", + "dtstart": start_date, + "due": due_date, + }, + ) + assert create_result.isError is False + + import json + + result_data = json.loads(create_result.content[0].text) + todo_uid = result_data["uid"] + + # Verify dates via client + todos = await nc_client.calendar.list_todos(calendar_name) + created_todo = next(t for t in todos if t["uid"] == todo_uid) + assert created_todo["summary"] == "Task with Dates" + assert "dtstart" in created_todo + assert "due" in created_todo + + logger.info("Todo with dates test passed") + + finally: + if todo_uid: + try: + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + except Exception: + pass + + +async def test_mcp_todo_categories( + nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str +): + """Test creating and managing todos with categories via MCP tools.""" + + calendar_name = temporary_calendar + todo_uid = None + + try: + # Create todo with multiple categories via MCP + logger.info("Creating todo with categories via MCP") + create_result = await nc_mcp_client.call_tool( + "nc_calendar_create_todo", + { + "calendar_name": calendar_name, + "summary": "Task with Categories", + "status": "NEEDS-ACTION", + "categories": "work,meeting,important,quarterly", + }, + ) + assert create_result.isError is False + + import json + + result_data = json.loads(create_result.content[0].text) + todo_uid = result_data["uid"] + + # Verify categories via client + todos = await nc_client.calendar.list_todos(calendar_name) + created_todo = next(t for t in todos if t["uid"] == todo_uid) + assert "categories" in created_todo + categories_str = created_todo["categories"] + assert "work" in categories_str + assert "meeting" in categories_str + assert "important" in categories_str + assert "quarterly" in categories_str + + # Update categories via MCP + logger.info("Updating todo categories via MCP") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_todo", + { + "calendar_name": calendar_name, + "todo_uid": todo_uid, + "categories": "updated,new-category", + }, + ) + assert update_result.isError is False + + # Verify updated categories + todos = await nc_client.calendar.list_todos(calendar_name) + updated_todo = next(t for t in todos if t["uid"] == todo_uid) + categories_str = updated_todo["categories"] + assert "updated" in categories_str + assert "new-category" in categories_str + + logger.info("Todo categories test passed") + + finally: + if todo_uid: + try: + await nc_client.calendar.delete_todo(calendar_name, todo_uid) + except Exception: + pass diff --git a/tests/server/test_mcp.py b/tests/server/test_mcp.py index 90a9ecb..ff9a310 100644 --- a/tests/server/test_mcp.py +++ b/tests/server/test_mcp.py @@ -57,6 +57,11 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession): "nc_calendar_find_availability", "nc_calendar_bulk_operations", "nc_calendar_manage_calendar", + "nc_calendar_list_todos", + "nc_calendar_create_todo", + "nc_calendar_update_todo", + "nc_calendar_delete_todo", + "nc_calendar_search_todos", "deck_create_board", "nc_cookbook_import_recipe", "nc_cookbook_list_recipes", diff --git a/uv.lock b/uv.lock index 185cde3..641256d 100644 --- a/uv.lock +++ b/uv.lock @@ -54,8 +54,8 @@ wheels = [ [[package]] name = "caldav" -version = "2.0.2.dev22+gaa8322dc7" -source = { git = "https://github.com/cbcoutinho/caldav?branch=feature%2Fhttpx#aa8322dc7c4d0bf99593e1f46e577bb0aa5073c8" } +version = "2.0.2.dev33+g4877e4688" +source = { git = "https://github.com/cbcoutinho/caldav?branch=feature%2Fhttpx#4877e46884dbd2bc54f8fb61ee5d056342605e9c" } dependencies = [ { name = "httpx", extra = ["http2"] }, { name = "icalendar" },