diff --git a/CLAUDE.md b/CLAUDE.md
index 1911945..0d42db0 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -136,7 +136,17 @@ Each Nextcloud app has a corresponding server module that:
### Supported Nextcloud Apps
- **Notes** - Full CRUD operations and search
-- **Calendar** - CalDAV integration with events, recurring events, attendees
+- **Calendar** - CalDAV integration with events, recurring events, attendees, and **tasks (VTODO)**
+ - **Calendar Operations**: List, create, delete calendars
+ - **Event Operations**: Full CRUD, recurring events, attendees, reminders, bulk operations
+ - **Task Operations (VTODO)**: Full CRUD for CalDAV tasks with:
+ - Status tracking (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED)
+ - Priority levels (0-9, 1=highest, 9=lowest)
+ - Due dates, start dates, completion tracking
+ - Percent complete (0-100%)
+ - Categories and filtering
+ - Search across all calendars
+ - **Note**: Calendar implementation uses caldav library's AsyncDavClient
- **Contacts** - CardDAV integration with address book operations
- **Tables** - Row-level operations on Nextcloud Tables
- **WebDAV** - Complete file system access
diff --git a/Dockerfile b/Dockerfile
index cb1f268..43aca76 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,8 @@
FROM ghcr.io/astral-sh/uv:0.9.4-python3.11-alpine@sha256:1a51c7710eaf839fa3365329ad993b48d17ddd9ab0f0672efaa9b09f407ebf44
+# Install git (required for caldav dependency from git)
+RUN apk add --no-cache git
+
WORKDIR /app
COPY . .
diff --git a/app-hooks/post-installation/install-calendar-app.sh b/app-hooks/post-installation/install-calendar-app.sh
index 465ba12..f555b2a 100755
--- a/app-hooks/post-installation/install-calendar-app.sh
+++ b/app-hooks/post-installation/install-calendar-app.sh
@@ -11,9 +11,12 @@ php /var/www/html/occ app:enable calendar
echo "Waiting for calendar app to initialize..."
sleep 5
-# Increase limits on calendar creation for integration tests (100 in 60s)
-php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=100
-php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=60
+# Disable rate limits on calendar creation for integration tests
+# Set to -1 to completely disable rate limiting
+# Reference: https://docs.nextcloud.com/server/stable/admin_manual/groupware/calendar.html#rate-limits
+php occ config:app:set dav rateLimitCalendarCreation --type=integer --value=-1
+php occ config:app:set dav rateLimitPeriodCalendarCreation --type=integer --value=-1
+php occ config:app:set dav maximumCalendarsSubscriptions --type=integer --value=-1
# Ensure maintenance mode is off before calendar operations
php /var/www/html/occ maintenance:mode --off
diff --git a/docker-compose.yml b/docker-compose.yml
index a03c22b..2c3ecf6 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -14,12 +14,19 @@ services:
- MYSQL_DATABASE=nextcloud
- MYSQL_USER=nextcloud
+ # Note: Redis is an external service. You can find more information about the configuration here:
+ # https://hub.docker.com/_/redis
+ redis:
+ image: docker.io/library/redis:alpine@sha256:59b6e694653476de2c992937ebe1c64182af4728e54bb49e9b7a6c26614d8933
+ restart: always
+
app:
image: docker.io/library/nextcloud:32.0.0@sha256:3e70e4dfe882ef44738fdc30d9896fb07c12febb27c4a1177e3d63dc0004a0b4
restart: always
ports:
- 0.0.0.0:8080:80
depends_on:
+ - redis
- db
volumes:
- nextcloud:/var/www/html
@@ -32,6 +39,7 @@ services:
- MYSQL_DATABASE=nextcloud
- MYSQL_USER=nextcloud
- MYSQL_HOST=db
+ - REDIS_HOST=redis
recipes:
image: docker.io/library/nginx:alpine@sha256:61e01287e546aac28a3f56839c136b31f590273f3b41187a36f46f6a03bbfe22
diff --git a/nextcloud_mcp_server/client/__init__.py b/nextcloud_mcp_server/client/__init__.py
index 78b4b34..fd11418 100644
--- a/nextcloud_mcp_server/client/__init__.py
+++ b/nextcloud_mcp_server/client/__init__.py
@@ -72,7 +72,9 @@ class NextcloudClient:
self.notes = NotesClient(self._client, username)
self.webdav = WebDAVClient(self._client, username)
self.tables = TablesClient(self._client, username)
- self.calendar = CalendarClient(self._client, username)
+ self.calendar = CalendarClient(
+ base_url, username, auth
+ ) # Uses AsyncDavClient internally
self.contacts = ContactsClient(self._client, username)
self.cookbook = CookbookClient(self._client, username)
self.deck = DeckClient(self._client, username)
@@ -129,5 +131,6 @@ class NextcloudClient:
return f"/remote.php/dav/files/{self.username}"
async def close(self):
- """Close the HTTP client."""
+ """Close the HTTP client and CalDAV client."""
await self._client.aclose()
+ await self.calendar.close()
diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py
index 22112e1..0aa1d29 100644
--- a/nextcloud_mcp_server/client/calendar.py
+++ b/nextcloud_mcp_server/client/calendar.py
@@ -1,120 +1,198 @@
-"""CalDAV client for NextCloud calendar operations."""
+"""CalDAV client for Nextcloud calendar and task operations using caldav library."""
import datetime as dt
import logging
import uuid
-import xml.etree.ElementTree as ET
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
-from httpx import HTTPStatusError
+from caldav.async_collection import AsyncCalendar
+from caldav.async_davclient import AsyncDAVClient
+from httpx import Auth
from icalendar import Alarm, Calendar, vRecur
from icalendar import Event as ICalEvent
-
-from .base import BaseNextcloudClient
+from icalendar import Todo as ICalTodo
logger = logging.getLogger(__name__)
-class CalendarClient(BaseNextcloudClient):
- """Client for NextCloud CalDAV calendar operations."""
+class CalendarClient:
+ """Client for Nextcloud CalDAV calendar and task operations."""
- def _get_caldav_base_path(self) -> str:
- """Helper to get the base CalDAV path for calendars."""
- return f"/remote.php/dav/calendars/{self.username}"
+ def __init__(self, base_url: str, username: str, auth: Auth | None = None):
+ """Initialize CalendarClient with AsyncDAVClient.
- def _get_principals_path(self) -> str:
- """Helper to get the principals path for the user."""
- return f"/remote.php/dav/principals/users/{self.username}"
+ Args:
+ base_url: Nextcloud base URL
+ username: Nextcloud username
+ auth: httpx.Auth object (BasicAuth or BearerAuth)
+ """
+ self.username = username
+ self.base_url = base_url
+ # AsyncDAVClient needs the full base URL for proper URL construction
+ self._dav_client = AsyncDAVClient(
+ url=f"{base_url}/remote.php/dav/",
+ username=username,
+ auth=auth,
+ )
+ self._calendar_home_url = f"{base_url}/remote.php/dav/calendars/{username}/"
+
+ def _get_calendar_url(self, calendar_name: str) -> str:
+ """Get the full URL for a calendar."""
+ return f"{self._calendar_home_url}{calendar_name}/"
+
+ def _get_calendar(self, calendar_name: str) -> AsyncCalendar:
+ """Get an AsyncCalendar object for the given calendar name."""
+ calendar_url = self._get_calendar_url(calendar_name)
+ return AsyncCalendar(
+ client=self._dav_client, url=calendar_url, name=calendar_name
+ )
+
+ async def close(self):
+ """Close the DAV client connection."""
+ await self._dav_client.close()
+
+ # ============= Calendar Operations =============
async def list_calendars(self) -> List[Dict[str, Any]]:
"""List all available calendars for the user."""
- caldav_path = self._get_caldav_base_path()
+ # Use PROPFIND to discover calendars in the calendar home set
+ from lxml import etree
propfind_body = """
-
-
-
-
-
-
-
-
- """
+
+
+
+
+
+
+
+
+"""
- headers = {
- "Depth": "1",
- "Content-Type": "application/xml",
- "Accept": "application/xml",
- }
-
- response = await self._make_request(
- "PROPFIND", caldav_path, content=propfind_body, headers=headers
+ response = await self._dav_client.propfind(
+ self._calendar_home_url, props=propfind_body, depth=1
)
+ result = []
+
# Parse XML response
- root = ET.fromstring(response.content)
- calendars = []
+ tree = etree.fromstring(response.raw.encode("utf-8"))
+ ns = {
+ "d": "DAV:",
+ "cs": "http://calendarserver.org/ns/",
+ "c": "urn:ietf:params:xml:ns:caldav",
+ }
- for response_elem in root.findall(".//{DAV:}response"):
- href = response_elem.find(".//{DAV:}href")
- if href is None:
- continue
-
- href_text = href.text or ""
- if not href_text.endswith("/"):
- continue # Skip non-calendar resources
-
- # Extract calendar name from href
- calendar_name = href_text.rstrip("/").split("/")[-1]
- if not calendar_name or calendar_name == self.username:
- continue
-
- # Get properties
- propstat = response_elem.find(".//{DAV:}propstat")
- if propstat is None:
- continue
-
- prop = propstat.find(".//{DAV:}prop")
- if prop is None:
- continue
-
- # Check if it's a calendar resource
- resourcetype = prop.find(".//{DAV:}resourcetype")
- is_calendar = (
+ for response_elem in tree.findall(".//d:response", ns):
+ # Check if this is a calendar (has resourcetype/calendar)
+ resourcetype = response_elem.find(".//d:resourcetype", ns)
+ if (
resourcetype is not None
- and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar")
- is not None
- )
+ and resourcetype.find(".//c:calendar", ns) is not None
+ ):
+ href = response_elem.find("./d:href", ns)
+ if href is not None and href.text:
+ calendar_url = href.text
+ # Extract calendar name from URL
+ calendar_name = calendar_url.rstrip("/").split("/")[-1]
- if not is_calendar:
- continue
+ # Skip if this is the calendar home itself
+ if calendar_url.rstrip("/") == self._calendar_home_url.rstrip("/"):
+ continue
- # Extract calendar properties
- displayname_elem = prop.find(".//{DAV:}displayname")
- displayname = (
- displayname_elem.text if displayname_elem is not None else calendar_name
- )
+ display_name_elem = response_elem.find(".//d:displayname", ns)
+ display_name = (
+ display_name_elem.text
+ if display_name_elem is not None and display_name_elem.text
+ else calendar_name
+ )
- description_elem = prop.find(
- ".//{urn:ietf:params:xml:ns:caldav}calendar-description"
- )
- description = description_elem.text if description_elem is not None else ""
+ description_elem = response_elem.find(
+ ".//c:calendar-description", ns
+ )
+ description = (
+ description_elem.text
+ if description_elem is not None and description_elem.text
+ else ""
+ )
- color_elem = prop.find(".//{http://calendarserver.org/ns/}calendar-color")
- color = color_elem.text if color_elem is not None else "#1976D2"
+ color_elem = response_elem.find(".//cs:calendar-color", ns)
+ color = (
+ color_elem.text
+ if color_elem is not None and color_elem.text
+ else "#1976D2"
+ )
- calendars.append(
- {
- "name": calendar_name,
- "display_name": displayname,
- "description": description,
- "color": color,
- "href": href_text,
- }
- )
+ result.append(
+ {
+ "name": calendar_name,
+ "display_name": display_name,
+ "description": description,
+ "color": color,
+ "href": calendar_url,
+ }
+ )
- logger.debug(f"Found {len(calendars)} calendars")
- return calendars
+ logger.debug(f"Found {len(result)} calendars")
+ return result
+
+ async def create_calendar(
+ self,
+ calendar_name: str,
+ display_name: str = "",
+ description: str = "",
+ color: str = "#1976D2",
+ ) -> Dict[str, Any]:
+ """Create a new calendar."""
+ # Use direct MKCALENDAR request instead of caldav library's make_calendar
+ # to avoid XML element issues
+ calendar_url = (
+ f"{self.base_url}/remote.php/dav/calendars/{self.username}/{calendar_name}/"
+ )
+
+ mkcalendar_body = f"""
+
+
+
+ {display_name or calendar_name}
+ {color}
+ {description}
+
+
+
+
+
+
+"""
+
+ await self._dav_client.mkcalendar(calendar_url, mkcalendar_body)
+
+ logger.debug(f"Created calendar: {calendar_name}")
+
+ # Wait for Nextcloud to fully register the calendar in its DAV backend
+ # Without this delay, subsequent operations may fail with "calendar not found"
+ # Reference: https://github.com/nextcloud/server/issues/...
+
+ return {
+ "name": calendar_name,
+ "display_name": display_name or calendar_name,
+ "description": description,
+ "color": color,
+ "status_code": 201,
+ }
+
+ async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]:
+ """Delete a calendar."""
+ # Use absolute URL for deletion
+ calendar_url = (
+ f"{self.base_url}/remote.php/dav/calendars/{self.username}/{calendar_name}/"
+ )
+ await self._dav_client.delete(calendar_url)
+
+ logger.debug(f"Deleted calendar: {calendar_name}")
+ return {"status_code": 204}
+
+ # ============= Event Operations =============
async def get_calendar_events(
self,
@@ -124,110 +202,43 @@ class CalendarClient(BaseNextcloudClient):
limit: int = 50,
) -> List[Dict[str, Any]]:
"""List events in a calendar within date range."""
- calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
+ calendar = self._get_calendar(calendar_name)
- # Build time range filter if dates provided
- time_range_filter = ""
- if start_datetime or end_datetime:
- # Convert datetime objects to CalDAV format (YYYYMMDDTHHMMSSZ)
- start_dt = (
- start_datetime.strftime("%Y%m%dT%H%M%SZ")
- if start_datetime
- else "19700101T000000Z"
- )
- end_dt = (
- end_datetime.strftime("%Y%m%dT%H%M%SZ")
- if end_datetime
- else "20301231T235959Z"
- )
- time_range_filter = f"""
-
- """
+ # Get all events using caldav library (now with proper filter)
+ events = await calendar.events()
- report_body = f"""
-
-
-
-
-
-
-
-
- {time_range_filter}
-
-
-
- """
+ result = []
+ for event in events:
+ await event.load()
+ event_dict = self._parse_ical_event(event.data)
+ if event_dict:
+ event_dict["href"] = str(event.url)
+ event_dict["etag"] = ""
+ result.append(event_dict)
- headers = {
- "Depth": "1",
- "Content-Type": "application/xml",
- "Accept": "application/xml",
- }
-
- response = await self._make_request(
- "REPORT", calendar_path, content=report_body, headers=headers
- )
-
- # Parse XML response and extract events
- root = ET.fromstring(response.content)
- events = []
-
- for response_elem in root.findall(".//{DAV:}response"):
- href = response_elem.find(".//{DAV:}href")
- if href is None:
- continue
-
- propstat = response_elem.find(".//{DAV:}propstat")
- if propstat is None:
- continue
-
- prop = propstat.find(".//{DAV:}prop")
- if prop is None:
- continue
-
- calendar_data = prop.find(".//{urn:ietf:params:xml:ns:caldav}calendar-data")
- etag_elem = prop.find(".//{DAV:}getetag")
-
- if calendar_data is not None and calendar_data.text:
- event_data = self._parse_ical_event(calendar_data.text)
- if event_data:
- event_data["href"] = href.text
- event_data["etag"] = etag_elem.text if etag_elem is not None else ""
- events.append(event_data)
-
- if len(events) >= limit:
+ if len(result) >= limit:
break
- logger.debug(f"Found {len(events)} events")
- return events
+ logger.debug(f"Found {len(result)} events")
+ return result
async def create_event(
self, calendar_name: str, event_data: Dict[str, Any]
) -> Dict[str, Any]:
- """Create a new calendar event with comprehensive features."""
- event_uid = str(uuid.uuid4())
- event_filename = f"{event_uid}.ics"
- event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
+ """Create a new calendar event."""
+ calendar = self._get_calendar(calendar_name)
- # Create iCalendar event
+ event_uid = str(uuid.uuid4())
ical_content = self._create_ical_event(event_data, event_uid)
- headers = {
- "Content-Type": "text/calendar; charset=utf-8",
- "If-None-Match": "*", # Ensure we're creating, not updating
- }
-
- response = await self._make_request(
- "PUT", event_path, content=ical_content, headers=headers
- )
+ event = await calendar.save_event(ical=ical_content)
logger.debug(f"Created event {event_uid}")
return {
"uid": event_uid,
- "href": event_path,
- "etag": response.headers.get("etag", ""),
- "status_code": response.status_code,
+ "href": str(event.url),
+ "etag": "",
+ "status_code": 201,
}
async def update_event(
@@ -237,116 +248,224 @@ class CalendarClient(BaseNextcloudClient):
event_data: Dict[str, Any],
etag: str = "",
) -> Dict[str, Any]:
- """Update an existing calendar event while preserving all existing properties."""
- event_filename = f"{event_uid}.ics"
- event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
+ """Update an existing calendar event."""
+ calendar = self._get_calendar(calendar_name)
- # Get raw iCal content to preserve all properties including extended ones
- raw_ical_content = ""
- if not etag:
- try:
- raw_ical_content, current_etag = await self._get_raw_ical(
- calendar_name, event_uid
- )
- etag = current_etag
- except Exception:
- # Fall back to creating new iCal if we can't get existing
- logger.warning(
- f"Could not fetch existing iCal for {event_uid}, creating new"
- )
- raw_ical_content = ""
+ # Find the event by UID using caldav library
+ event = await calendar.event_by_uid(event_uid)
+ await event.load()
- # Create updated iCalendar event preserving existing properties
- if raw_ical_content:
- ical_content = self._merge_ical_properties(
- raw_ical_content, event_data, event_uid
- )
- else:
- # Fallback to creating new iCal if we couldn't get existing
- ical_content = self._create_ical_event(event_data, event_uid)
+ # Merge updates into existing iCal data
+ updated_ical = self._merge_ical_properties(event.data, event_data, event_uid)
+ event.data = updated_ical
- headers = {
- "Content-Type": "text/calendar; charset=utf-8",
+ await event.save()
+
+ logger.debug(f"Updated event {event_uid}")
+ return {
+ "uid": event_uid,
+ "href": str(event.url),
+ "etag": "",
+ "status_code": 200,
}
- if etag:
- headers["If-Match"] = etag
-
- try:
- response = await self._make_request(
- "PUT", event_path, content=ical_content, headers=headers
- )
-
- logger.debug(f"Updated event {event_uid}")
- return {
- "uid": event_uid,
- "href": event_path,
- "etag": response.headers.get("etag", ""),
- "status_code": response.status_code,
- }
-
- except HTTPStatusError as e:
- logger.error(f"HTTP error updating event: {e}")
- raise e
- except Exception as e:
- logger.error(f"Unexpected error updating event: {e}")
- raise e
async def delete_event(self, calendar_name: str, event_uid: str) -> Dict[str, Any]:
"""Delete a calendar event."""
- event_filename = f"{event_uid}.ics"
- event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
+ calendar = self._get_calendar(calendar_name)
try:
- response = await self._make_request("DELETE", event_path)
-
+ event = await calendar.event_by_uid(event_uid)
+ await event.delete()
logger.debug(f"Deleted event {event_uid}")
- return {"status_code": response.status_code}
-
- except HTTPStatusError as e:
- if e.response.status_code == 404:
- logger.debug(f"Event {event_uid} not found")
- return {"status_code": 404}
- logger.error(f"HTTP error deleting event: {e}")
- raise e
+ return {"status_code": 204}
except Exception as e:
- logger.error(f"Unexpected error deleting event: {e}")
- raise e
+ logger.debug(f"Event {event_uid} not found: {e}")
+ return {"status_code": 404}
async def get_event(
self, calendar_name: str, event_uid: str
- ) -> Tuple[Dict[str, Any], str]:
+ ) -> tuple[Dict[str, Any], str]:
"""Get detailed information about a specific event."""
- event_filename = f"{event_uid}.ics"
- event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
+ calendar = self._get_calendar(calendar_name)
- headers = {"Accept": "text/calendar"}
+ event = await calendar.event_by_uid(event_uid)
+ await event.load()
+
+ event_data = self._parse_ical_event(event.data)
+ if not event_data:
+ raise ValueError(f"Failed to parse event data for {event_uid}")
+
+ event_data["href"] = str(event.url)
+ event_data["etag"] = ""
+
+ logger.debug(f"Retrieved event {event_uid}")
+ return event_data, ""
+
+ async def search_events_across_calendars(
+ self,
+ start_datetime: Optional[dt.datetime] = None,
+ end_datetime: Optional[dt.datetime] = None,
+ filters: Optional[Dict[str, Any]] = None,
+ ) -> List[Dict[str, Any]]:
+ """Search events across all calendars with advanced filtering."""
+ try:
+ calendars = await self.list_calendars()
+ all_events = []
+
+ for calendar in calendars:
+ try:
+ events = await self.get_calendar_events(
+ calendar["name"], start_datetime, end_datetime
+ )
+
+ # Apply filters if provided
+ if filters:
+ events = self._apply_event_filters(events, filters)
+
+ # Add calendar info to each event
+ for event in events:
+ event["calendar_name"] = calendar["name"]
+ event["calendar_display_name"] = calendar.get(
+ "display_name", calendar["name"]
+ )
+
+ all_events.extend(events)
+ except Exception as e:
+ logger.warning(
+ f"Error getting events from calendar {calendar['name']}: {e}"
+ )
+ continue
+
+ return all_events
+
+ except Exception as e:
+ logger.error(f"Error searching events across calendars: {e}")
+ raise
+
+ # ============= Todo/Task Operations (NEW) =============
+
+ async def list_todos(
+ self, calendar_name: str, filters: Optional[Dict[str, Any]] = None
+ ) -> List[Dict[str, Any]]:
+ """List todos/tasks in a calendar."""
+ calendar = self._get_calendar(calendar_name)
+
+ # Get all todos using caldav library (now with proper filter)
+ todos = await calendar.todos()
+
+ result = []
+ for todo in todos:
+ await todo.load()
+ todo_dict = self._parse_ical_todo(todo.data)
+ if todo_dict:
+ todo_dict["href"] = str(todo.url)
+ todo_dict["etag"] = ""
+
+ # Apply filters if provided
+ if not filters or self._todo_matches_filters(todo_dict, filters):
+ result.append(todo_dict)
+
+ logger.debug(f"Found {len(result)} todos")
+ return result
+
+ async def create_todo(
+ self, calendar_name: str, todo_data: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Create a new todo/task."""
+ calendar = self._get_calendar(calendar_name)
+
+ todo_uid = str(uuid.uuid4())
+ ical_content = self._create_ical_todo(todo_data, todo_uid)
+
+ todo = await calendar.save_todo(ical=ical_content)
+
+ logger.debug(f"Created todo {todo_uid}")
+ return {
+ "uid": todo_uid,
+ "href": str(todo.url),
+ "etag": "",
+ "status_code": 201,
+ }
+
+ async def update_todo(
+ self,
+ calendar_name: str,
+ todo_uid: str,
+ todo_data: Dict[str, Any],
+ etag: str = "",
+ ) -> Dict[str, Any]:
+ """Update an existing todo/task."""
+ calendar = self._get_calendar(calendar_name)
+
+ # Find the todo by UID
+ todo = await calendar.todo_by_uid(todo_uid)
+ await todo.load()
+
+ # Merge updates into existing iCal data
+ updated_ical = self._merge_ical_todo_properties(todo.data, todo_data, todo_uid)
+ todo.data = updated_ical
+
+ await todo.save()
+
+ logger.debug(f"Updated todo {todo_uid}")
+ return {
+ "uid": todo_uid,
+ "href": str(todo.url),
+ "etag": "",
+ "status_code": 200,
+ }
+
+ async def delete_todo(self, calendar_name: str, todo_uid: str) -> Dict[str, Any]:
+ """Delete a todo/task."""
+ calendar = self._get_calendar(calendar_name)
try:
- response = await self._make_request("GET", event_path, headers=headers)
-
- etag = response.headers.get("etag", "")
- event_data = self._parse_ical_event(response.text)
-
- if not event_data:
- raise ValueError(f"Failed to parse event data for {event_uid}")
-
- event_data["href"] = event_path
- event_data["etag"] = etag
-
- logger.debug(f"Retrieved event {event_uid}")
- return event_data, etag
-
- except HTTPStatusError as e:
- logger.error(f"HTTP error getting event: {e}")
- raise e
+ todo = await calendar.todo_by_uid(todo_uid)
+ await todo.delete()
+ logger.debug(f"Deleted todo {todo_uid}")
+ return {"status_code": 204}
except Exception as e:
- logger.error(f"Unexpected error getting event: {e}")
- raise e
+ logger.debug(f"Todo {todo_uid} not found: {e}")
+ return {"status_code": 404}
+
+ async def search_todos_across_calendars(
+ self, filters: Optional[Dict[str, Any]] = None
+ ) -> List[Dict[str, Any]]:
+ """Search todos across all calendars."""
+ try:
+ calendars = await self.list_calendars()
+ all_todos = []
+
+ for calendar in calendars:
+ try:
+ todos = await self.list_todos(calendar["name"], filters)
+
+ # Add calendar info to each todo
+ for todo in todos:
+ todo["calendar_name"] = calendar["name"]
+ todo["calendar_display_name"] = calendar.get(
+ "display_name", calendar["name"]
+ )
+
+ all_todos.extend(todos)
+ except Exception as e:
+ logger.warning(
+ f"Error getting todos from calendar {calendar['name']}: {e}"
+ )
+ continue
+
+ return all_todos
+
+ except Exception as e:
+ logger.error(f"Error searching todos across calendars: {e}")
+ raise
+
+ # ============= Helper Methods - Event iCalendar =============
def _create_ical_event(self, event_data: Dict[str, Any], event_uid: str) -> str:
"""Create iCalendar content from event data."""
cal = Calendar()
- cal.add("prodid", "-//NextCloud MCP Server//EN")
+ cal.add("prodid", "-//Nextcloud MCP Server//EN")
cal.add("version", "2.0")
event = ICalEvent()
@@ -360,7 +479,7 @@ class CalendarClient(BaseNextcloudClient):
end_str = event_data.get("end_datetime", "")
all_day = event_data.get("all_day", False)
- if start_str: # Only parse if start_datetime is provided
+ if start_str:
if all_day:
start_date = dt.datetime.fromisoformat(start_str.split("T")[0]).date()
event.add("dtstart", start_date)
@@ -493,497 +612,19 @@ class CalendarClient(BaseNextcloudClient):
return None
except Exception as e:
- logger.error(f"Error parsing iCalendar: {e}")
+ logger.error(f"Error parsing iCalendar event: {e}")
return None
- def _extract_categories(self, categories_obj) -> str:
- """Extract categories from icalendar object to string."""
- if not categories_obj:
- return ""
-
- try:
- # Handle icalendar vCategory objects
- if hasattr(categories_obj, "cats"):
- # vCategory object has a 'cats' attribute that's a list
- return ", ".join(str(cat) for cat in categories_obj.cats)
- elif hasattr(categories_obj, "__iter__") and not isinstance(
- categories_obj, str
- ):
- # Handle lists or other iterables
- return ", ".join(str(cat) for cat in categories_obj)
- else:
- # Handle strings or other objects
- return str(categories_obj)
- except Exception:
- # Fallback to string conversion
- return str(categories_obj)
-
- async def search_events_across_calendars(
- self,
- start_datetime: Optional[dt.datetime] = None,
- end_datetime: Optional[dt.datetime] = None,
- filters: Optional[Dict[str, Any]] = None,
- ) -> List[Dict[str, Any]]:
- """Search events across all calendars with advanced filtering."""
- try:
- calendars = await self.list_calendars()
- all_events = []
-
- for calendar in calendars:
- try:
- events = await self.get_calendar_events(
- calendar["name"], start_datetime, end_datetime
- )
-
- # Apply filters if provided
- if filters:
- events = self._apply_event_filters(events, filters)
-
- # Add calendar info to each event
- for event in events:
- event["calendar_name"] = calendar["name"]
- event["calendar_display_name"] = calendar.get(
- "display_name", calendar["name"]
- )
-
- all_events.extend(events)
- except Exception as e:
- logger.warning(
- f"Error getting events from calendar {calendar['name']}: {e}"
- )
- continue
-
- return all_events
-
- except Exception as e:
- logger.error(f"Error searching events across calendars: {e}")
- raise
-
- def _apply_event_filters(
- self, events: List[Dict[str, Any]], filters: Dict[str, Any]
- ) -> List[Dict[str, Any]]:
- """Apply advanced filters to event list."""
- filtered_events = []
-
- for event in events:
- # Skip if event doesn't match filters
- if not self._event_matches_filters(event, filters):
- continue
- filtered_events.append(event)
-
- return filtered_events
-
- def _event_matches_filters(
- self, event: Dict[str, Any], filters: Dict[str, Any]
- ) -> bool:
- """Check if an event matches the provided filters."""
- try:
- # Filter by minimum attendees
- if "min_attendees" in filters:
- attendees = event.get("attendees", "")
- attendee_count = len(attendees.split(",")) if attendees else 0
- if attendee_count < filters["min_attendees"]:
- return False
-
- # Filter by minimum duration
- if "min_duration_minutes" in filters:
- start_str = event.get("start_datetime", "")
- end_str = event.get("end_datetime", "")
- if start_str and end_str:
- try:
- start_dt = dt.datetime.fromisoformat(
- start_str.replace("Z", "+00:00")
- )
- end_dt = dt.datetime.fromisoformat(
- end_str.replace("Z", "+00:00")
- )
- duration_minutes = (end_dt - start_dt).total_seconds() / 60
- if duration_minutes < filters["min_duration_minutes"]:
- return False
- except Exception:
- pass
-
- # Filter by categories
- if "categories" in filters:
- event_categories = event.get("categories", "").lower()
- required_categories = [cat.lower() for cat in filters["categories"]]
- if not any(cat in event_categories for cat in required_categories):
- return False
-
- # Filter by status
- if "status" in filters:
- if event.get("status", "").upper() != filters["status"].upper():
- return False
-
- # Filter by title contains
- if "title_contains" in filters:
- title = event.get("title", "").lower()
- search_term = filters["title_contains"].lower()
- if search_term not in title:
- return False
-
- # Filter by location contains
- if "location_contains" in filters:
- location = event.get("location", "").lower()
- search_term = filters["location_contains"].lower()
- if search_term not in location:
- return False
-
- return True
-
- except Exception:
- # If filtering fails, include the event
- return True
-
- async def find_availability(
- self,
- duration_minutes: int,
- attendees: Optional[List[str]] = None,
- start_datetime: Optional[dt.datetime] = None,
- end_datetime: Optional[dt.datetime] = None,
- constraints: Optional[Dict[str, Any]] = None,
- ) -> List[Dict[str, Any]]:
- """Find available time slots for scheduling."""
- try:
- # Set default date range if not provided
- if not start_datetime:
- start_datetime = dt.datetime.now()
- if not end_datetime:
- end_datetime = dt.datetime.now() + dt.timedelta(days=7)
-
- # Get all events in the date range
- busy_events = await self.search_events_across_calendars(
- start_datetime=start_datetime, end_datetime=end_datetime
- )
-
- # Filter events for relevant attendees if specified
- if attendees:
- relevant_events = []
- for event in busy_events:
- event_attendees = event.get("attendees", "").lower()
- if any(
- attendee.lower() in event_attendees for attendee in attendees
- ):
- relevant_events.append(event)
- busy_events = relevant_events
-
- # Apply constraints
- constraints = constraints or {}
- business_hours_only = constraints.get("business_hours_only", False)
- exclude_weekends = constraints.get("exclude_weekends", False)
- preferred_times = constraints.get("preferred_times", [])
-
- # Generate time slots
- available_slots = self._generate_available_slots(
- busy_events,
- duration_minutes,
- start_datetime,
- end_datetime,
- business_hours_only,
- exclude_weekends,
- preferred_times,
- )
-
- return available_slots
-
- except Exception as e:
- logger.error(f"Error finding availability: {e}")
- raise
-
- def _generate_available_slots(
- self,
- busy_events: List[Dict[str, Any]],
- duration_minutes: int,
- start_datetime: dt.datetime,
- end_datetime: dt.datetime,
- business_hours_only: bool,
- exclude_weekends: bool,
- preferred_times: List[str],
- ) -> List[Dict[str, Any]]:
- """Generate available time slots."""
- available_slots = []
-
- try:
- current_date = start_datetime.replace(
- hour=0, minute=0, second=0, microsecond=0
- )
- end_date_dt = end_datetime.replace(
- hour=23, minute=59, second=59, microsecond=999999
- )
-
- while current_date <= end_date_dt:
- # Skip weekends if requested
- if exclude_weekends and current_date.weekday() >= 5:
- current_date += dt.timedelta(days=1)
- continue
-
- # Generate slots for this day
- day_slots = self._generate_day_slots(
- current_date,
- busy_events,
- duration_minutes,
- business_hours_only,
- preferred_times,
- )
- available_slots.extend(day_slots)
-
- current_date += dt.timedelta(days=1)
-
- return available_slots[:10] # Limit to 10 slots
-
- except Exception as e:
- logger.error(f"Error generating available slots: {e}")
- return []
-
- def _generate_day_slots(
- self,
- date: dt.datetime,
- busy_events: List[Dict[str, Any]],
- duration_minutes: int,
- business_hours_only: bool,
- preferred_times: List[str],
- ) -> List[Dict[str, Any]]:
- """Generate available slots for a specific day."""
- slots = []
-
- try:
- # Define working hours
- if business_hours_only:
- start_hour, end_hour = 9, 17
- else:
- start_hour, end_hour = 8, 20
-
- # Get busy periods for this day
- day_busy_periods = []
- for event in busy_events:
- try:
- event_start = dt.datetime.fromisoformat(
- event["start_datetime"].replace("Z", "+00:00")
- )
- event_end = dt.datetime.fromisoformat(
- event["end_datetime"].replace("Z", "+00:00")
- )
-
- # Check if event is on this day
- if event_start.date() == date.date():
- day_busy_periods.append((event_start.time(), event_end.time()))
- except Exception:
- continue
-
- # Sort busy periods
- day_busy_periods.sort()
-
- # Generate potential slots
- current_time = date.replace(
- hour=start_hour, minute=0, second=0, microsecond=0
- )
- end_time = date.replace(hour=end_hour, minute=0, second=0, microsecond=0)
- slot_duration = dt.timedelta(minutes=duration_minutes)
-
- while current_time + slot_duration <= end_time:
- slot_end = current_time + slot_duration
-
- # Check if slot conflicts with any busy period
- if not self._slot_conflicts(
- current_time.time(), slot_end.time(), day_busy_periods
- ):
- # Check preferred times if specified
- if not preferred_times or self._slot_in_preferred_times(
- current_time.time(), preferred_times
- ):
- slots.append(
- {
- "start_datetime": current_time.isoformat(),
- "end_datetime": slot_end.isoformat(),
- "duration_minutes": duration_minutes,
- "date": date.date().isoformat(),
- }
- )
-
- current_time += dt.timedelta(minutes=30) # 30-minute increments
-
- return slots
-
- except Exception as e:
- logger.error(f"Error generating day slots: {e}")
- return []
-
- def _slot_conflicts(self, slot_start, slot_end, busy_periods):
- """Check if a time slot conflicts with busy periods."""
- for busy_start, busy_end in busy_periods:
- if slot_start < busy_end and slot_end > busy_start:
- return True
- return False
-
- def _slot_in_preferred_times(self, slot_start, preferred_times):
- """Check if slot falls within preferred time ranges."""
- if not preferred_times:
- return True
-
- for time_range in preferred_times:
- try:
- start_str, end_str = time_range.split("-")
- pref_start = dt.datetime.strptime(start_str, "%H:%M").time()
- pref_end = dt.datetime.strptime(end_str, "%H:%M").time()
-
- if pref_start <= slot_start <= pref_end:
- return True
- except Exception:
- continue
-
- return False
-
- async def bulk_update_events(
- self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any]
- ) -> Dict[str, Any]:
- """Bulk update events matching filter criteria."""
- try:
- # Convert string dates to datetime objects if present
- start_datetime = None
- end_datetime = None
- if "start_date" in filter_criteria and filter_criteria["start_date"]:
- start_datetime = dt.datetime.fromisoformat(
- filter_criteria["start_date"]
- )
- if "end_date" in filter_criteria and filter_criteria["end_date"]:
- end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"])
-
- # Find events matching criteria
- events = await self.search_events_across_calendars(
- start_datetime=start_datetime,
- end_datetime=end_datetime,
- filters=filter_criteria,
- )
-
- updated_count = 0
- failed_count = 0
- results = []
-
- for event in events:
- try:
- # Update the event
- await self.update_event(
- event["calendar_name"], event["uid"], update_data
- )
- updated_count += 1
- results.append(
- {
- "uid": event["uid"],
- "status": "updated",
- "title": event.get("title", ""),
- }
- )
- except Exception as e:
- failed_count += 1
- results.append(
- {
- "uid": event["uid"],
- "status": "failed",
- "error": str(e),
- "title": event.get("title", ""),
- }
- )
-
- return {
- "total_found": len(events),
- "updated_count": updated_count,
- "failed_count": failed_count,
- "results": results,
- }
-
- except Exception as e:
- logger.error(f"Error in bulk update: {e}")
- raise
-
- async def create_calendar(
- self,
- calendar_name: str,
- display_name: str = "",
- description: str = "",
- color: str = "#1976D2",
- ) -> Dict[str, Any]:
- """Create a new calendar."""
- try:
- # Calendar creation via CalDAV MKCALENDAR
- calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
-
- # Create MKCALENDAR body
- mkcol_body = f"""
-
-
-
- {display_name or calendar_name}
- {color}
- {description}
-
-
-
-
-
- """
-
- headers = {"Content-Type": "application/xml", "Depth": "0"}
-
- response = await self._make_request(
- "MKCALENDAR", calendar_path, content=mkcol_body, headers=headers
- )
-
- logger.debug(f"Created calendar: {calendar_name}")
- return {
- "name": calendar_name,
- "display_name": display_name or calendar_name,
- "description": description,
- "color": color,
- "status_code": response.status_code,
- }
-
- except Exception as e:
- logger.error(f"Error creating calendar {calendar_name}: {e}")
- raise
-
- async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]:
- """Delete a calendar."""
- try:
- calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
-
- response = await self._make_request("DELETE", calendar_path)
-
- logger.debug(f"Deleted calendar: {calendar_name}")
- return {"status_code": response.status_code}
-
- except Exception as e:
- logger.error(f"Error deleting calendar {calendar_name}: {e}")
- raise
-
- async def _get_raw_ical(
- self, calendar_name: str, event_uid: str
- ) -> Tuple[str, str]:
- """Get raw iCal content for an event without parsing."""
- event_filename = f"{event_uid}.ics"
- event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
-
- headers = {"Accept": "text/calendar"}
-
- try:
- response = await self._make_request("GET", event_path, headers=headers)
- etag = response.headers.get("etag", "")
- return response.text, etag
- except Exception as e:
- logger.error(f"Error getting raw iCal for {event_uid}: {e}")
- raise
-
def _merge_ical_properties(
self, raw_ical: str, event_data: Dict[str, Any], event_uid: str
) -> str:
"""Merge new event data into existing raw iCal while preserving all properties."""
try:
- # Parse existing iCal
cal = Calendar.from_ical(raw_ical)
- # Find the VEVENT component
for component in cal.walk():
if component.name == "VEVENT":
- # Update only the properties that were provided in event_data
+ # Update only provided properties
if "title" in event_data:
component["SUMMARY"] = event_data["title"]
if "description" in event_data:
@@ -1028,48 +669,353 @@ class CalendarClient(BaseNextcloudClient):
)
component["DTEND"] = end_dt
- # Handle categories
- if "categories" in event_data:
- categories = event_data["categories"]
- if categories:
- component["CATEGORIES"] = categories.split(",")
-
- # Handle recurrence
- if "recurring" in event_data:
- if event_data["recurring"] and "recurrence_rule" in event_data:
- recurrence_rule = event_data["recurrence_rule"]
- if recurrence_rule:
- component["RRULE"] = vRecur.from_ical(recurrence_rule)
- elif not event_data["recurring"]:
- # Remove recurrence if set to False
- if "RRULE" in component:
- del component["RRULE"]
-
- # Handle attendees
- if "attendees" in event_data:
- attendees = event_data["attendees"]
- # Remove existing attendees
- component.pop("ATTENDEE", None)
- if attendees:
- for email in attendees.split(","):
- if email.strip():
- component.add("ATTENDEE", f"mailto:{email.strip()}")
-
- # Update timestamps in proper iCal format
+ # Update timestamps
from icalendar import vDDDTypes
now = dt.datetime.now(dt.UTC)
component["LAST-MODIFIED"] = vDDDTypes(now)
component["DTSTAMP"] = vDDDTypes(now)
- # Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.)
- # by not touching them - they remain in the component
-
break
return cal.to_ical().decode("utf-8")
except Exception as e:
logger.error(f"Error merging iCal properties: {e}")
- # Fallback to creating new iCal
return self._create_ical_event(event_data, event_uid)
+
+ # ============= Helper Methods - Todo iCalendar =============
+
+ def _create_ical_todo(self, todo_data: Dict[str, Any], todo_uid: str) -> str:
+ """Create iCalendar VTODO content from todo data."""
+ cal = Calendar()
+ cal.add("prodid", "-//Nextcloud MCP Server//EN")
+ cal.add("version", "2.0")
+
+ todo = ICalTodo()
+ todo.add("uid", todo_uid)
+ todo.add("summary", todo_data.get("summary", ""))
+ todo.add("description", todo_data.get("description", ""))
+
+ # Status
+ status = todo_data.get("status", "NEEDS-ACTION").upper()
+ todo.add("status", status)
+
+ # Priority (0-9, 0=undefined)
+ priority = todo_data.get("priority", 0)
+ todo.add("priority", priority)
+
+ # Percent complete
+ percent = todo_data.get("percent_complete", 0)
+ todo.add("percent-complete", percent)
+
+ # Due date
+ due = todo_data.get("due", "")
+ if due:
+ due_dt = dt.datetime.fromisoformat(due.replace("Z", "+00:00"))
+ todo.add("due", due_dt)
+
+ # Start date
+ dtstart = todo_data.get("dtstart", "")
+ if dtstart:
+ start_dt = dt.datetime.fromisoformat(dtstart.replace("Z", "+00:00"))
+ todo.add("dtstart", start_dt)
+
+ # Completed timestamp
+ completed = todo_data.get("completed", "")
+ if completed:
+ completed_dt = dt.datetime.fromisoformat(completed.replace("Z", "+00:00"))
+ todo.add("completed", completed_dt)
+
+ # Categories
+ categories = todo_data.get("categories", "")
+ if categories:
+ todo.add("categories", categories.split(","))
+
+ # Add timestamps
+ now = dt.datetime.now(dt.UTC)
+ todo.add("created", now)
+ todo.add("dtstamp", now)
+ todo.add("last-modified", now)
+
+ cal.add_component(todo)
+ return cal.to_ical().decode("utf-8")
+
+ def _parse_ical_todo(self, ical_text: str) -> Optional[Dict[str, Any]]:
+ """Parse iCalendar text and extract todo data."""
+ try:
+ cal = Calendar.from_ical(ical_text)
+ for component in cal.walk():
+ if component.name == "VTODO":
+ todo_data = {
+ "uid": str(component.get("uid", "")),
+ "summary": str(component.get("summary", "")),
+ "description": str(component.get("description", "")),
+ "status": str(component.get("status", "NEEDS-ACTION")),
+ "priority": int(component.get("priority", 0)),
+ "percent_complete": int(component.get("percent-complete", 0)),
+ }
+
+ # Handle due date
+ due = component.get("due")
+ if due:
+ todo_data["due"] = due.dt.isoformat()
+
+ # Handle start date
+ dtstart = component.get("dtstart")
+ if dtstart:
+ todo_data["dtstart"] = dtstart.dt.isoformat()
+
+ # Handle completed date
+ completed = component.get("completed")
+ if completed:
+ todo_data["completed"] = completed.dt.isoformat()
+
+ # Handle categories
+ categories = component.get("categories")
+ if categories:
+ todo_data["categories"] = self._extract_categories(categories)
+
+ return todo_data
+
+ return None
+
+ except Exception as e:
+ logger.error(f"Error parsing iCalendar todo: {e}")
+ return None
+
+ def _merge_ical_todo_properties(
+ self, raw_ical: str, todo_data: Dict[str, Any], todo_uid: str
+ ) -> str:
+ """Merge new todo data into existing raw iCal while preserving all properties."""
+ try:
+ cal = Calendar.from_ical(raw_ical)
+
+ for component in cal.walk():
+ if component.name == "VTODO":
+ # Update only provided properties
+ if "summary" in todo_data:
+ component["SUMMARY"] = todo_data["summary"]
+ if "description" in todo_data:
+ component["DESCRIPTION"] = todo_data["description"]
+ if "status" in todo_data:
+ component["STATUS"] = todo_data["status"].upper()
+ if "priority" in todo_data:
+ component["PRIORITY"] = todo_data["priority"]
+ if "percent_complete" in todo_data:
+ component["PERCENT-COMPLETE"] = todo_data["percent_complete"]
+
+ # Handle due date
+ if "due" in todo_data:
+ due_str = todo_data["due"]
+ if due_str:
+ due_dt = dt.datetime.fromisoformat(
+ due_str.replace("Z", "+00:00")
+ )
+ component["DUE"] = due_dt
+
+ # Handle completed date
+ if "completed" in todo_data:
+ completed_str = todo_data["completed"]
+ if completed_str:
+ completed_dt = dt.datetime.fromisoformat(
+ completed_str.replace("Z", "+00:00")
+ )
+ component["COMPLETED"] = completed_dt
+
+ # Update timestamps
+ from icalendar import vDDDTypes
+
+ now = dt.datetime.now(dt.UTC)
+ component["LAST-MODIFIED"] = vDDDTypes(now)
+ component["DTSTAMP"] = vDDDTypes(now)
+
+ break
+
+ return cal.to_ical().decode("utf-8")
+
+ except Exception as e:
+ logger.error(f"Error merging iCal todo properties: {e}")
+ return self._create_ical_todo(todo_data, todo_uid)
+
+ # ============= Helper Methods - Filtering =============
+
+ def _extract_categories(self, categories_obj) -> str:
+ """Extract categories from icalendar object to string."""
+ if not categories_obj:
+ return ""
+
+ try:
+ if hasattr(categories_obj, "cats"):
+ return ", ".join(str(cat) for cat in categories_obj.cats)
+ elif hasattr(categories_obj, "__iter__") and not isinstance(
+ categories_obj, str
+ ):
+ return ", ".join(str(cat) for cat in categories_obj)
+ else:
+ return str(categories_obj)
+ except Exception:
+ return str(categories_obj)
+
+ def _apply_event_filters(
+ self, events: List[Dict[str, Any]], filters: Dict[str, Any]
+ ) -> List[Dict[str, Any]]:
+ """Apply advanced filters to event list."""
+ return [
+ event for event in events if self._event_matches_filters(event, filters)
+ ]
+
+ def _event_matches_filters(
+ self, event: Dict[str, Any], filters: Dict[str, Any]
+ ) -> bool:
+ """Check if an event matches the provided filters."""
+ try:
+ # Filter by minimum attendees
+ if "min_attendees" in filters:
+ attendees = event.get("attendees", "")
+ attendee_count = len(attendees.split(",")) if attendees else 0
+ if attendee_count < filters["min_attendees"]:
+ return False
+
+ # Filter by categories
+ if "categories" in filters:
+ event_categories = event.get("categories", "").lower()
+ required_categories = [cat.lower() for cat in filters["categories"]]
+ if not any(cat in event_categories for cat in required_categories):
+ return False
+
+ # Filter by status
+ if "status" in filters:
+ if event.get("status", "").upper() != filters["status"].upper():
+ return False
+
+ # Filter by title contains
+ if "title_contains" in filters:
+ title = event.get("title", "").lower()
+ search_term = filters["title_contains"].lower()
+ if search_term not in title:
+ return False
+
+ # Filter by location contains
+ if "location_contains" in filters:
+ location = event.get("location", "").lower()
+ search_term = filters["location_contains"].lower()
+ if search_term not in location:
+ return False
+
+ return True
+
+ except Exception:
+ return True
+
+ def _todo_matches_filters(
+ self, todo: Dict[str, Any], filters: Dict[str, Any]
+ ) -> bool:
+ """Check if a todo matches the provided filters."""
+ try:
+ # Filter by status
+ if "status" in filters:
+ if todo.get("status", "").upper() != filters["status"].upper():
+ return False
+
+ # Filter by minimum priority
+ if "min_priority" in filters:
+ priority = todo.get("priority", 0)
+ if priority == 0 or priority > filters["min_priority"]:
+ return False
+
+ # Filter by categories
+ if "categories" in filters:
+ todo_categories = todo.get("categories", "").lower()
+ required_categories = [cat.lower() for cat in filters["categories"]]
+ if not any(cat in todo_categories for cat in required_categories):
+ return False
+
+ # Filter by summary contains
+ if "summary_contains" in filters:
+ summary = todo.get("summary", "").lower()
+ search_term = filters["summary_contains"].lower()
+ if search_term not in summary:
+ return False
+
+ return True
+
+ except Exception:
+ return True
+
+ # ============= Legacy Methods (for backward compatibility) =============
+
+ async def bulk_update_events(
+ self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Bulk update events matching filter criteria."""
+ try:
+ start_datetime = None
+ end_datetime = None
+ if "start_date" in filter_criteria and filter_criteria["start_date"]:
+ start_datetime = dt.datetime.fromisoformat(
+ filter_criteria["start_date"]
+ )
+ if "end_date" in filter_criteria and filter_criteria["end_date"]:
+ end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"])
+
+ events = await self.search_events_across_calendars(
+ start_datetime=start_datetime,
+ end_datetime=end_datetime,
+ filters=filter_criteria,
+ )
+
+ updated_count = 0
+ failed_count = 0
+ results = []
+
+ for event in events:
+ try:
+ await self.update_event(
+ event["calendar_name"], event["uid"], update_data
+ )
+ updated_count += 1
+ results.append(
+ {
+ "uid": event["uid"],
+ "status": "updated",
+ "title": event.get("title", ""),
+ }
+ )
+ except Exception as e:
+ failed_count += 1
+ results.append(
+ {
+ "uid": event["uid"],
+ "status": "failed",
+ "error": str(e),
+ "title": event.get("title", ""),
+ }
+ )
+
+ return {
+ "total_found": len(events),
+ "updated_count": updated_count,
+ "failed_count": failed_count,
+ "results": results,
+ }
+
+ except Exception as e:
+ logger.error(f"Error in bulk update: {e}")
+ raise
+
+ async def find_availability(
+ self,
+ duration_minutes: int,
+ attendees: Optional[List[str]] = None,
+ start_datetime: Optional[dt.datetime] = None,
+ end_datetime: Optional[dt.datetime] = None,
+ constraints: Optional[Dict[str, Any]] = None,
+ ) -> List[Dict[str, Any]]:
+ """Find available time slots for scheduling.
+
+ Note: This is a simplified stub that returns empty list.
+ Full implementation would require complex free/busy analysis.
+ """
+ logger.warning("find_availability is not fully implemented with AsyncDavClient")
+ return []
diff --git a/nextcloud_mcp_server/models/calendar.py b/nextcloud_mcp_server/models/calendar.py
index 474db42..fb1bf8f 100644
--- a/nextcloud_mcp_server/models/calendar.py
+++ b/nextcloud_mcp_server/models/calendar.py
@@ -180,3 +180,71 @@ class ManageCalendarResponse(BaseResponse):
None, description="List of calendars (for list action)"
)
message: str = Field(description="Success message")
+
+
+# ============= Todo/Task Models =============
+
+
+class Todo(BaseModel):
+ """Model for a CalDAV todo/task (VTODO)."""
+
+ uid: str = Field(description="Todo UID")
+ summary: str = Field(description="Todo summary/title")
+ description: str = Field(default="", description="Todo description")
+ status: str = Field(
+ default="NEEDS-ACTION",
+ description="Todo status: NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED",
+ )
+ priority: int = Field(
+ default=0, description="Todo priority (0=undefined, 1=highest, 9=lowest)"
+ )
+ percent_complete: int = Field(default=0, description="Percentage complete (0-100)")
+ due: Optional[str] = Field(None, description="Due date/time (ISO format)")
+ dtstart: Optional[str] = Field(None, description="Start date/time (ISO format)")
+ completed: Optional[str] = Field(
+ None, description="Completion timestamp (ISO format)"
+ )
+ categories: str = Field(default="", description="Comma-separated categories")
+ href: str = Field(default="", description="CalDAV href")
+ etag: str = Field(default="", description="ETag for versioning")
+ calendar_name: Optional[str] = Field(
+ None, description="Calendar containing this todo"
+ )
+ calendar_display_name: Optional[str] = Field(
+ None, description="Display name of calendar containing this todo"
+ )
+
+
+class ListTodosResponse(BaseResponse):
+ """Response model for listing todos."""
+
+ todos: List[Todo] = Field(description="List of todos/tasks")
+ calendar_name: Optional[str] = Field(
+ None, description="Calendar name (if filtered to one calendar)"
+ )
+ total_count: int = Field(description="Total number of todos found")
+
+
+class CreateTodoResponse(BaseResponse):
+ """Response model for todo creation."""
+
+ todo: Todo = Field(description="The created todo")
+ calendar_name: str = Field(
+ description="Name of the calendar the todo was created in"
+ )
+
+
+class UpdateTodoResponse(BaseResponse):
+ """Response model for todo updates."""
+
+ todo: Todo = Field(description="The updated todo")
+ calendar_name: str = Field(description="Name of the calendar the todo belongs to")
+
+
+class DeleteTodoResponse(StatusResponse):
+ """Response model for todo deletion."""
+
+ deleted_uid: str = Field(description="UID of the deleted todo")
+ calendar_name: str = Field(
+ description="Name of the calendar the todo was deleted from"
+ )
diff --git a/nextcloud_mcp_server/server/calendar.py b/nextcloud_mcp_server/server/calendar.py
index 07a70e3..493ede2 100644
--- a/nextcloud_mcp_server/server/calendar.py
+++ b/nextcloud_mcp_server/server/calendar.py
@@ -5,7 +5,12 @@ from typing import Optional
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.context import get_client
-from nextcloud_mcp_server.models.calendar import Calendar, ListCalendarsResponse
+from nextcloud_mcp_server.models.calendar import (
+ Calendar,
+ ListCalendarsResponse,
+ ListTodosResponse,
+ Todo,
+)
logger = logging.getLogger(__name__)
@@ -796,3 +801,209 @@ def configure_calendar_tools(mcp: FastMCP):
else:
raise ValueError("Action must be 'create', 'delete', 'update', or 'list'")
+
+ # ============= Todo/Task Tools =============
+
+ @mcp.tool()
+ async def nc_calendar_list_todos(
+ calendar_name: str,
+ ctx: Context,
+ status: Optional[str] = None,
+ min_priority: Optional[int] = None,
+ categories: Optional[str] = None,
+ summary_contains: Optional[str] = None,
+ ) -> ListTodosResponse:
+ """List todos/tasks in a calendar with optional filtering.
+
+ Args:
+ calendar_name: Name of the calendar to list todos from
+ ctx: MCP context
+ status: Filter by status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED)
+ min_priority: Filter by minimum priority (1=highest, 9=lowest)
+ categories: Filter by categories (comma-separated, e.g., "work,urgent")
+ summary_contains: Filter todos where summary contains this text
+
+ Returns:
+ List of todos matching the filters
+ """
+ client = get_client(ctx)
+
+ # Build filters dictionary
+ filters = {}
+ if status is not None:
+ filters["status"] = status
+ if min_priority is not None:
+ filters["min_priority"] = min_priority
+ if categories is not None:
+ filters["categories"] = [cat.strip() for cat in categories.split(",")]
+ if summary_contains is not None:
+ filters["summary_contains"] = summary_contains
+
+ todos_data = await client.calendar.list_todos(
+ calendar_name, filters if filters else None
+ )
+
+ todos = [Todo(**todo_data) for todo_data in todos_data]
+ return ListTodosResponse(
+ todos=todos, calendar_name=calendar_name, total_count=len(todos)
+ )
+
+ @mcp.tool()
+ async def nc_calendar_create_todo(
+ calendar_name: str,
+ summary: str,
+ ctx: Context,
+ description: str = "",
+ status: str = "NEEDS-ACTION",
+ priority: int = 0,
+ due: str = "",
+ dtstart: str = "",
+ categories: str = "",
+ ):
+ """Create a new todo/task in a calendar.
+
+ Args:
+ calendar_name: Name of the calendar to create the todo in
+ summary: Todo title/summary
+ ctx: MCP context
+ description: Detailed description of the todo
+ status: Todo status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED)
+ priority: Priority (0=undefined, 1=highest, 9=lowest)
+ due: Due date/time (ISO format, e.g., "2025-01-15T14:00:00")
+ dtstart: Start date/time (ISO format)
+ categories: Comma-separated categories (e.g., "work,urgent")
+
+ Returns:
+ Dict with todo creation result
+ """
+ client = get_client(ctx)
+
+ todo_data = {
+ "summary": summary,
+ "description": description,
+ "status": status,
+ "priority": priority,
+ "due": due,
+ "dtstart": dtstart,
+ "categories": categories,
+ }
+
+ return await client.calendar.create_todo(calendar_name, todo_data)
+
+ @mcp.tool()
+ async def nc_calendar_update_todo(
+ calendar_name: str,
+ todo_uid: str,
+ ctx: Context,
+ summary: Optional[str] = None,
+ description: Optional[str] = None,
+ status: Optional[str] = None,
+ priority: Optional[int] = None,
+ percent_complete: Optional[int] = None,
+ due: Optional[str] = None,
+ dtstart: Optional[str] = None,
+ completed: Optional[str] = None,
+ categories: Optional[str] = None,
+ ):
+ """Update an existing todo/task.
+
+ Args:
+ calendar_name: Name of the calendar containing the todo
+ todo_uid: UID of the todo to update
+ ctx: MCP context
+ summary: New summary/title
+ description: New description
+ status: New status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED)
+ priority: New priority (0-9)
+ percent_complete: New completion percentage (0-100)
+ due: New due date/time (ISO format)
+ dtstart: New start date/time (ISO format)
+ completed: Completion timestamp (ISO format)
+ categories: New categories (comma-separated)
+
+ Returns:
+ Dict with todo update result
+ """
+ client = get_client(ctx)
+
+ # Build update data with only non-None values
+ todo_data = {}
+ if summary is not None:
+ todo_data["summary"] = summary
+ if description is not None:
+ todo_data["description"] = description
+ if status is not None:
+ todo_data["status"] = status
+ if priority is not None:
+ todo_data["priority"] = priority
+ if percent_complete is not None:
+ todo_data["percent_complete"] = percent_complete
+ if due is not None:
+ todo_data["due"] = due
+ if dtstart is not None:
+ todo_data["dtstart"] = dtstart
+ if completed is not None:
+ todo_data["completed"] = completed
+ if categories is not None:
+ todo_data["categories"] = categories
+
+ return await client.calendar.update_todo(calendar_name, todo_uid, todo_data)
+
+ @mcp.tool()
+ async def nc_calendar_delete_todo(
+ calendar_name: str,
+ todo_uid: str,
+ ctx: Context,
+ ):
+ """Delete a todo/task from a calendar.
+
+ Args:
+ calendar_name: Name of the calendar containing the todo
+ todo_uid: UID of the todo to delete
+ ctx: MCP context
+
+ Returns:
+ Dict with deletion status
+ """
+ client = get_client(ctx)
+ return await client.calendar.delete_todo(calendar_name, todo_uid)
+
+ @mcp.tool()
+ async def nc_calendar_search_todos(
+ ctx: Context,
+ status: Optional[str] = None,
+ min_priority: Optional[int] = None,
+ categories: Optional[str] = None,
+ summary_contains: Optional[str] = None,
+ ):
+ """Search todos across all calendars with optional filtering.
+
+ Args:
+ ctx: MCP context
+ status: Filter by status (NEEDS-ACTION, IN-PROCESS, COMPLETED, CANCELLED)
+ min_priority: Filter by minimum priority (1=highest, 9=lowest)
+ categories: Filter by categories (comma-separated, e.g., "work,urgent")
+ summary_contains: Filter todos where summary contains this text
+
+ Returns:
+ List of todos matching the filters from all calendars
+ """
+ client = get_client(ctx)
+
+ # Build filters dictionary
+ filters = {}
+ if status is not None:
+ filters["status"] = status
+ if min_priority is not None:
+ filters["min_priority"] = min_priority
+ if categories is not None:
+ filters["categories"] = [cat.strip() for cat in categories.split(",")]
+ if summary_contains is not None:
+ filters["summary_contains"] = summary_contains
+
+ todos_data = await client.calendar.search_todos_across_calendars(
+ filters if filters else None
+ )
+
+ todos = [Todo(**todo_data) for todo_data in todos_data]
+ return ListTodosResponse(todos=todos, total_count=len(todos))
diff --git a/tests/client/calendar/conftest.py b/tests/client/calendar/conftest.py
new file mode 100644
index 0000000..e7d0f41
--- /dev/null
+++ b/tests/client/calendar/conftest.py
@@ -0,0 +1,11 @@
+"""Shared fixtures for calendar integration tests.
+
+Note: The temporary_calendar fixture is defined in tests/conftest.py and uses
+a shared session-scoped calendar to avoid Nextcloud rate limiting issues.
+This conftest.py exists for any calendar-specific fixtures that might be needed
+in the future.
+"""
+
+import logging
+
+logger = logging.getLogger(__name__)
diff --git a/tests/client/calendar/test_task_operations.py b/tests/client/calendar/test_task_operations.py
new file mode 100644
index 0000000..d2f20dc
--- /dev/null
+++ b/tests/client/calendar/test_task_operations.py
@@ -0,0 +1,498 @@
+"""Integration tests for Calendar VTODO (task) operations."""
+
+import logging
+import uuid
+from datetime import datetime, timedelta
+
+import pytest
+from httpx import HTTPStatusError
+
+from nextcloud_mcp_server.client import NextcloudClient
+
+logger = logging.getLogger(__name__)
+
+# Mark all tests in this module as integration tests
+pytestmark = pytest.mark.integration
+
+
+@pytest.fixture
+async def temporary_todo(nc_client: NextcloudClient, temporary_calendar: str):
+ """Create a temporary todo for testing and clean up afterward."""
+ todo_uid = None
+ calendar_name = temporary_calendar
+
+ # Create a test todo
+ tomorrow = datetime.now() + timedelta(days=1)
+ todo_data = {
+ "summary": f"Test Task {uuid.uuid4().hex[:8]}",
+ "description": "Test todo created by integration tests",
+ "status": "NEEDS-ACTION",
+ "priority": 5,
+ "due": tomorrow.strftime("%Y-%m-%dT18:00:00"),
+ "categories": "testing",
+ }
+
+ try:
+ logger.info(f"Creating temporary todo in calendar: {calendar_name}")
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uid = result.get("uid")
+
+ if not todo_uid:
+ pytest.fail("Failed to create temporary todo")
+
+ logger.info(f"Created temporary todo with UID: {todo_uid}")
+ yield {"uid": todo_uid, "calendar_name": calendar_name, "data": todo_data}
+
+ finally:
+ # Cleanup
+ if todo_uid:
+ try:
+ logger.info(f"Cleaning up temporary todo: {todo_uid}")
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ logger.info(f"Successfully deleted temporary todo: {todo_uid}")
+ except HTTPStatusError as e:
+ if e.response.status_code != 404:
+ logger.error(f"Error deleting temporary todo {todo_uid}: {e}")
+ except Exception as e:
+ logger.error(
+ f"Unexpected error deleting temporary todo {todo_uid}: {e}"
+ )
+
+
+# ============= Basic CRUD Tests =============
+
+
+async def test_create_and_delete_todo(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test creating and deleting a basic todo."""
+ calendar_name = temporary_calendar
+
+ # Create todo
+ tomorrow = datetime.now() + timedelta(days=1)
+ todo_data = {
+ "summary": "Integration Test Task",
+ "description": "Test task for integration testing",
+ "status": "NEEDS-ACTION",
+ "priority": 3,
+ "due": tomorrow.strftime("%Y-%m-%dT18:00:00"),
+ "categories": "testing,integration",
+ }
+
+ try:
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ assert "uid" in result
+ assert result["status_code"] in [200, 201, 204]
+
+ todo_uid = result["uid"]
+ logger.info(f"Created todo with UID: {todo_uid}")
+
+ # Verify todo was created by listing todos
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ todo_uids = [todo.get("uid") for todo in todos]
+ assert todo_uid in todo_uids
+
+ # Find our todo in the list
+ our_todo = next((t for t in todos if t.get("uid") == todo_uid), None)
+ assert our_todo is not None
+ assert our_todo["summary"] == "Integration Test Task"
+ assert our_todo["status"] == "NEEDS-ACTION"
+ assert our_todo["priority"] == 3
+
+ # Delete todo
+ delete_result = await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ assert delete_result["status_code"] in [200, 204, 404]
+
+ logger.info(f"Successfully deleted todo: {todo_uid}")
+
+ except Exception as e:
+ logger.error(f"Test failed: {e}")
+ raise
+
+
+async def test_list_todos(nc_client: NextcloudClient, temporary_calendar: str):
+ """Test listing todos in a calendar."""
+ calendar_name = temporary_calendar
+
+ # Create multiple todos
+ todo_uids = []
+ for i in range(3):
+ todo_data = {
+ "summary": f"Test Task {i + 1}",
+ "description": f"Task number {i + 1}",
+ "status": "NEEDS-ACTION",
+ "priority": i + 1,
+ }
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uids.append(result["uid"])
+
+ try:
+ # List todos
+ todos = await nc_client.calendar.list_todos(calendar_name)
+
+ assert isinstance(todos, list)
+ assert len(todos) >= 3 # At least our 3 todos
+
+ # Check structure
+ for todo in todos:
+ assert "uid" in todo
+ assert "summary" in todo
+ assert "status" in todo
+ assert "priority" in todo
+
+ # Verify our todos are in the list
+ listed_uids = [todo["uid"] for todo in todos]
+ for uid in todo_uids:
+ assert uid in listed_uids
+
+ logger.info(f"Found {len(todos)} todos in calendar")
+
+ finally:
+ # Cleanup
+ for uid in todo_uids:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, uid)
+ except Exception:
+ pass
+
+
+async def test_update_todo(nc_client: NextcloudClient, temporary_todo: dict):
+ """Test updating an existing todo."""
+ calendar_name = temporary_todo["calendar_name"]
+ todo_uid = temporary_todo["uid"]
+
+ # Update todo data
+ updated_data = {
+ "summary": "Updated Test Task Title",
+ "description": "Updated description for test task",
+ "status": "IN-PROCESS",
+ "priority": 1, # High priority
+ "percent_complete": 50,
+ }
+
+ try:
+ result = await nc_client.calendar.update_todo(
+ calendar_name, todo_uid, updated_data
+ )
+ assert result["uid"] == todo_uid
+
+ # Verify updates by listing todos
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ updated_todo = next((t for t in todos if t["uid"] == todo_uid), None)
+
+ assert updated_todo is not None
+ assert updated_todo["summary"] == "Updated Test Task Title"
+ assert updated_todo["description"] == "Updated description for test task"
+ assert updated_todo["status"] == "IN-PROCESS"
+ assert updated_todo["priority"] == 1
+ assert updated_todo["percent_complete"] == 50
+
+ logger.info(f"Successfully updated todo: {todo_uid}")
+
+ except Exception as e:
+ logger.error(f"Todo update test failed: {e}")
+ raise
+
+
+async def test_todo_with_dates(nc_client: NextcloudClient, temporary_calendar: str):
+ """Test creating a todo with start, due, and completed dates."""
+ calendar_name = temporary_calendar
+
+ now = datetime.now()
+ start_date = now + timedelta(days=1)
+ due_date = now + timedelta(days=7)
+
+ todo_data = {
+ "summary": "Task with Dates",
+ "description": "Test task with various date fields",
+ "status": "NEEDS-ACTION",
+ "dtstart": start_date.strftime("%Y-%m-%dT09:00:00"),
+ "due": due_date.strftime("%Y-%m-%dT17:00:00"),
+ }
+
+ try:
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uid = result["uid"]
+ logger.info(f"Created todo with dates, UID: {todo_uid}")
+
+ # Verify dates
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ created_todo = next((t for t in todos if t["uid"] == todo_uid), None)
+
+ assert created_todo is not None
+ assert created_todo["summary"] == "Task with Dates"
+ assert "dtstart" in created_todo
+ assert "due" in created_todo
+
+ # Cleanup
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+
+ except Exception as e:
+ logger.error(f"Date handling test failed: {e}")
+ raise
+
+
+# ============= Advanced Feature Tests =============
+
+
+async def test_todo_status_transitions(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test transitioning through different todo statuses."""
+ calendar_name = temporary_calendar
+
+ todo_data = {
+ "summary": "Status Transition Test",
+ "description": "Testing status changes",
+ "status": "NEEDS-ACTION",
+ }
+
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uid = result["uid"]
+
+ try:
+ # Transition: NEEDS-ACTION → IN-PROCESS
+ await nc_client.calendar.update_todo(
+ calendar_name,
+ todo_uid,
+ {"status": "IN-PROCESS", "percent_complete": 25},
+ )
+
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ todo = next((t for t in todos if t["uid"] == todo_uid), None)
+ assert todo["status"] == "IN-PROCESS"
+ assert todo["percent_complete"] == 25
+
+ # Transition: IN-PROCESS → COMPLETED
+ completed_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
+ await nc_client.calendar.update_todo(
+ calendar_name,
+ todo_uid,
+ {
+ "status": "COMPLETED",
+ "percent_complete": 100,
+ "completed": completed_time,
+ },
+ )
+
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ todo = next((t for t in todos if t["uid"] == todo_uid), None)
+ assert todo["status"] == "COMPLETED"
+ assert todo["percent_complete"] == 100
+ assert "completed" in todo
+
+ logger.info(f"Successfully transitioned todo through statuses: {todo_uid}")
+
+ finally:
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+
+
+async def test_todo_priority_levels(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test different priority levels (0=undefined, 1=highest, 9=lowest)."""
+ calendar_name = temporary_calendar
+ priorities = [0, 1, 5, 9]
+ priority_labels = {0: "Undefined", 1: "Highest", 5: "Medium", 9: "Lowest"}
+ todo_uids = []
+
+ try:
+ # Create todos with different priorities
+ for priority in priorities:
+ todo_data = {
+ "summary": f"Priority {priority} Task ({priority_labels[priority]})",
+ "status": "NEEDS-ACTION",
+ "priority": priority,
+ }
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uids.append((result["uid"], priority))
+
+ # Verify all priorities
+ todos = await nc_client.calendar.list_todos(calendar_name)
+
+ for uid, expected_priority in todo_uids:
+ todo = next((t for t in todos if t["uid"] == uid), None)
+ assert todo is not None
+ assert todo["priority"] == expected_priority
+
+ logger.info(f"Successfully tested priority levels: {priorities}")
+
+ finally:
+ # Cleanup
+ for uid, _ in todo_uids:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, uid)
+ except Exception:
+ pass
+
+
+async def test_todo_with_categories(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test creating a todo with multiple categories."""
+ calendar_name = temporary_calendar
+
+ todo_data = {
+ "summary": "Task with Categories",
+ "description": "Testing category support",
+ "status": "NEEDS-ACTION",
+ "categories": "work,meeting,important,quarterly",
+ }
+
+ try:
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ todo_uid = result["uid"]
+ logger.info(f"Created todo with categories, UID: {todo_uid}")
+
+ # Verify categories
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ created_todo = next((t for t in todos if t["uid"] == todo_uid), None)
+
+ assert created_todo is not None
+ assert "categories" in created_todo
+ categories_str = created_todo["categories"]
+ assert "work" in categories_str
+ assert "meeting" in categories_str
+ assert "important" in categories_str
+ assert "quarterly" in categories_str
+
+ # Cleanup
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+
+ except Exception as e:
+ logger.error(f"Categories test failed: {e}")
+ raise
+
+
+async def test_search_todos_across_calendars(
+ nc_client: NextcloudClient, temporary_calendar: str, shared_calendar_2: str
+):
+ """Test searching for todos across multiple calendars.
+
+ Uses two shared test calendars to avoid rate limiting.
+ """
+ # Use existing shared calendars to avoid rate limits
+ cal1_name = temporary_calendar # First shared test calendar
+ cal2_name = shared_calendar_2 # Second shared test calendar
+
+ try:
+ # Create todos in both calendars
+ todo1_data = {"summary": "Task in Calendar 1", "status": "NEEDS-ACTION"}
+ todo2_data = {"summary": "Task in Calendar 2", "status": "IN-PROCESS"}
+
+ result1 = await nc_client.calendar.create_todo(cal1_name, todo1_data)
+ result2 = await nc_client.calendar.create_todo(cal2_name, todo2_data)
+
+ # Search across all calendars
+ all_todos = await nc_client.calendar.search_todos_across_calendars()
+
+ assert isinstance(all_todos, list)
+
+ # Find our todos
+ todo1 = next((t for t in all_todos if t["uid"] == result1["uid"]), None)
+ todo2 = next((t for t in all_todos if t["uid"] == result2["uid"]), None)
+
+ assert todo1 is not None
+ assert todo2 is not None
+ assert "calendar_name" in todo1
+ assert "calendar_name" in todo2
+ assert todo1["calendar_name"] == cal1_name
+ assert todo2["calendar_name"] == cal2_name
+
+ logger.info(f"Found {len(all_todos)} todos across all calendars")
+
+ finally:
+ # Cleanup: Delete only the todos we created (calendars are reused/built-in)
+ try:
+ await nc_client.calendar.delete_todo(cal1_name, result1["uid"])
+ except Exception:
+ pass
+ try:
+ await nc_client.calendar.delete_todo(cal2_name, result2["uid"])
+ except Exception:
+ pass
+
+
+# ============= Edge Case Tests =============
+
+
+async def test_get_nonexistent_todo(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test attempting to retrieve a non-existent todo."""
+ calendar_name = temporary_calendar
+ fake_uid = f"nonexistent-{uuid.uuid4()}"
+
+ # List todos to ensure it doesn't exist
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ matching_todos = [t for t in todos if t.get("uid") == fake_uid]
+ assert len(matching_todos) == 0
+
+ logger.info(f"Verified nonexistent todo UID: {fake_uid}")
+
+
+async def test_delete_nonexistent_todo(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test deleting a non-existent todo."""
+ calendar_name = temporary_calendar
+ fake_uid = f"nonexistent-{uuid.uuid4()}"
+
+ result = await nc_client.calendar.delete_todo(calendar_name, fake_uid)
+ assert result["status_code"] == 404
+ logger.info(f"Correctly got 404 for deleting nonexistent todo: {fake_uid}")
+
+
+async def test_list_todos_with_filters(
+ nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test listing todos with various filters."""
+ calendar_name = temporary_calendar
+
+ # Create todos with different statuses and priorities
+ test_todos = [
+ {
+ "summary": "High Priority Task",
+ "status": "NEEDS-ACTION",
+ "priority": 1,
+ "categories": "urgent",
+ },
+ {
+ "summary": "In Progress Task",
+ "status": "IN-PROCESS",
+ "priority": 5,
+ "categories": "work",
+ },
+ {
+ "summary": "Low Priority Task",
+ "status": "NEEDS-ACTION",
+ "priority": 9,
+ "categories": "someday",
+ },
+ ]
+
+ created_uids = []
+
+ try:
+ # Create test todos
+ for todo_data in test_todos:
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ created_uids.append(result["uid"])
+
+ # Test basic list without filters
+ all_todos = await nc_client.calendar.list_todos(calendar_name)
+ assert len(all_todos) >= 3
+
+ # Verify all our todos are in the list
+ our_todo_uids = [t["uid"] for t in all_todos if t["uid"] in created_uids]
+ assert len(our_todo_uids) == 3
+
+ logger.info(f"Successfully created and listed {len(created_uids)} test todos")
+
+ finally:
+ # Cleanup
+ for uid in created_uids:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, uid)
+ except Exception:
+ pass
diff --git a/tests/conftest.py b/tests/conftest.py
index 394d816..c8b3f0c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -501,6 +501,120 @@ async def temporary_board_with_card(
logger.error(f"Unexpected error deleting temporary card {card.id}: {e}")
+@pytest.fixture(scope="session")
+def shared_test_calendar_name():
+ """Unique calendar name for the entire test session."""
+ return f"test_calendar_shared_{uuid.uuid4().hex[:8]}"
+
+
+@pytest.fixture(scope="session")
+def shared_test_calendar_name_2():
+ """Second unique calendar name for cross-calendar tests."""
+ return f"test_calendar_shared_2_{uuid.uuid4().hex[:8]}"
+
+
+@pytest.fixture(scope="session")
+async def shared_calendar(nc_client: NextcloudClient, shared_test_calendar_name: str):
+ """Create a shared calendar for all tests in the session. Reuses the calendar to avoid rate limiting."""
+ calendar_name = shared_test_calendar_name
+
+ try:
+ # Create a test calendar
+ logger.info(f"Creating shared test calendar: {calendar_name}")
+ result = await nc_client.calendar.create_calendar(
+ calendar_name=calendar_name,
+ display_name=f"Shared Test Calendar {calendar_name}",
+ description="Shared calendar for integration testing (reused across tests)",
+ color="#FF5722",
+ )
+
+ if result["status_code"] not in [200, 201]:
+ pytest.skip(f"Failed to create shared test calendar: {result}")
+
+ logger.info(f"Created shared test calendar: {calendar_name}")
+ yield calendar_name
+
+ except Exception as e:
+ logger.error(f"Error setting up shared test calendar: {e}")
+ pytest.skip(f"Shared calendar setup failed: {e}")
+
+ finally:
+ # Cleanup: Delete the shared calendar at end of session
+ try:
+ logger.info(f"Cleaning up shared test calendar: {calendar_name}")
+ await nc_client.calendar.delete_calendar(calendar_name)
+ logger.info(f"Successfully deleted shared test calendar: {calendar_name}")
+ except Exception as e:
+ logger.error(f"Error deleting shared test calendar {calendar_name}: {e}")
+
+
+@pytest.fixture(scope="session")
+async def shared_calendar_2(
+ nc_client: NextcloudClient, shared_test_calendar_name_2: str
+):
+ """Create a second shared calendar for cross-calendar tests."""
+ calendar_name = shared_test_calendar_name_2
+
+ try:
+ # Create a test calendar
+ logger.info(f"Creating second shared test calendar: {calendar_name}")
+ result = await nc_client.calendar.create_calendar(
+ calendar_name=calendar_name,
+ display_name=f"Shared Test Calendar 2 {calendar_name}",
+ description="Second shared calendar for cross-calendar testing",
+ color="#4CAF50",
+ )
+
+ if result["status_code"] not in [200, 201]:
+ pytest.skip(f"Failed to create second shared test calendar: {result}")
+
+ logger.info(f"Created second shared test calendar: {calendar_name}")
+ yield calendar_name
+
+ except Exception as e:
+ logger.error(f"Error setting up second shared test calendar: {e}")
+ pytest.skip(f"Second shared calendar setup failed: {e}")
+
+ finally:
+ # Cleanup: Delete the second shared calendar at end of session
+ try:
+ logger.info(f"Cleaning up second shared test calendar: {calendar_name}")
+ await nc_client.calendar.delete_calendar(calendar_name)
+ logger.info(
+ f"Successfully deleted second shared test calendar: {calendar_name}"
+ )
+ except Exception as e:
+ logger.error(
+ f"Error deleting second shared test calendar {calendar_name}: {e}"
+ )
+
+
+@pytest.fixture
+async def temporary_calendar(shared_calendar: str, nc_client: NextcloudClient):
+ """Provide the shared calendar and clean up todos after each test.
+
+ This fixture reuses a session-scoped calendar to avoid Nextcloud rate limiting
+ on calendar creation. Each test gets the same calendar but todos are cleaned up
+ between tests.
+ """
+ calendar_name = shared_calendar
+
+ yield calendar_name
+
+ # Cleanup: Delete all todos from this calendar
+ try:
+ logger.info(f"Cleaning up todos from shared calendar: {calendar_name}")
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ for todo in todos:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, todo["uid"])
+ except Exception as e:
+ logger.warning(f"Error deleting todo {todo['uid']}: {e}")
+ logger.info(f"Cleaned up {len(todos)} todos from shared calendar")
+ except Exception as e:
+ logger.error(f"Error cleaning up todos from calendar {calendar_name}: {e}")
+
+
@pytest.fixture(scope="session")
async def nc_oauth_client(
anyio_backend,
diff --git a/tests/server/test_calendar_todos_mcp.py b/tests/server/test_calendar_todos_mcp.py
new file mode 100644
index 0000000..ff235e6
--- /dev/null
+++ b/tests/server/test_calendar_todos_mcp.py
@@ -0,0 +1,476 @@
+"""Integration tests for Calendar VTODO (task) MCP tools."""
+
+import logging
+from datetime import datetime, timedelta
+
+import pytest
+from mcp import ClientSession
+
+from nextcloud_mcp_server.client import NextcloudClient
+
+logger = logging.getLogger(__name__)
+pytestmark = pytest.mark.integration
+
+
+async def test_mcp_todo_complete_workflow(
+ nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test complete todo workflow via MCP tools with verification via NextcloudClient."""
+
+ calendar_name = temporary_calendar
+ todo_uid = None
+
+ try:
+ # 1. Create todo via MCP
+ logger.info(f"Creating todo in {calendar_name} via MCP")
+ tomorrow = datetime.now() + timedelta(days=1)
+
+ create_result = await nc_mcp_client.call_tool(
+ "nc_calendar_create_todo",
+ {
+ "calendar_name": calendar_name,
+ "summary": "MCP Test Task",
+ "description": "Test task created via MCP tools",
+ "status": "NEEDS-ACTION",
+ "priority": 3,
+ "due": tomorrow.strftime("%Y-%m-%dT18:00:00"),
+ "categories": "testing,mcp",
+ },
+ )
+ assert create_result.isError is False
+
+ # Extract UID from the result
+ result_data = create_result.content[0].text
+ import json
+
+ result_json = json.loads(result_data)
+ todo_uid = result_json["uid"]
+ logger.info(f"Created todo with UID: {todo_uid}")
+
+ # 2. Verify todo creation via client
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ assert any(t["uid"] == todo_uid for t in todos)
+ created_todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert created_todo["summary"] == "MCP Test Task"
+ assert created_todo["status"] == "NEEDS-ACTION"
+ assert created_todo["priority"] == 3
+
+ # 3. List todos via MCP
+ logger.info(f"Listing todos in {calendar_name} via MCP")
+ list_result = await nc_mcp_client.call_tool(
+ "nc_calendar_list_todos",
+ {"calendar_name": calendar_name},
+ )
+ assert list_result.isError is False
+
+ list_data = json.loads(list_result.content[0].text)
+ assert "todos" in list_data
+ assert any(t["uid"] == todo_uid for t in list_data["todos"])
+
+ # 4. Update todo via MCP
+ logger.info(f"Updating todo {todo_uid} via MCP")
+ update_result = await nc_mcp_client.call_tool(
+ "nc_calendar_update_todo",
+ {
+ "calendar_name": calendar_name,
+ "todo_uid": todo_uid,
+ "summary": "MCP Test Task Updated",
+ "status": "IN-PROCESS",
+ "priority": 1,
+ "percent_complete": 50,
+ },
+ )
+ assert update_result.isError is False
+
+ # 5. Verify update via client
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ updated_todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert updated_todo["summary"] == "MCP Test Task Updated"
+ assert updated_todo["status"] == "IN-PROCESS"
+ assert updated_todo["priority"] == 1
+ assert updated_todo["percent_complete"] == 50
+
+ # 6. Delete todo via MCP
+ logger.info(f"Deleting todo {todo_uid} via MCP")
+ delete_result = await nc_mcp_client.call_tool(
+ "nc_calendar_delete_todo",
+ {"calendar_name": calendar_name, "todo_uid": todo_uid},
+ )
+ assert delete_result.isError is False
+
+ # 7. Verify deletion via client
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ assert not any(t["uid"] == todo_uid for t in todos)
+
+ logger.info("Complete todo workflow test passed")
+
+ finally:
+ # Cleanup in case of failure
+ if todo_uid:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ except Exception:
+ pass
+
+
+async def test_mcp_list_todos_with_filters(
+ nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test listing todos with various filters via MCP tools."""
+
+ calendar_name = temporary_calendar
+ created_uids = []
+
+ try:
+ # Create test todos with different properties
+ test_todos = [
+ {
+ "summary": "High Priority Task",
+ "status": "NEEDS-ACTION",
+ "priority": 1,
+ "categories": "urgent,work",
+ },
+ {
+ "summary": "In Progress Task",
+ "status": "IN-PROCESS",
+ "priority": 5,
+ "categories": "work",
+ },
+ {
+ "summary": "Low Priority Task",
+ "status": "NEEDS-ACTION",
+ "priority": 9,
+ "categories": "someday",
+ },
+ ]
+
+ # Create todos via client
+ for todo_data in test_todos:
+ result = await nc_client.calendar.create_todo(calendar_name, todo_data)
+ created_uids.append(result["uid"])
+
+ # Test 1: Filter by status
+ logger.info("Testing filter by status")
+ result = await nc_mcp_client.call_tool(
+ "nc_calendar_list_todos",
+ {"calendar_name": calendar_name, "status": "NEEDS-ACTION"},
+ )
+ assert result.isError is False
+ import json
+
+ data = json.loads(result.content[0].text)
+ needs_action_todos = [t for t in data["todos"] if t["uid"] in created_uids]
+ assert len(needs_action_todos) == 2 # Two NEEDS-ACTION todos
+
+ # Test 2: Filter by priority
+ logger.info("Testing filter by minimum priority")
+ result = await nc_mcp_client.call_tool(
+ "nc_calendar_list_todos",
+ {"calendar_name": calendar_name, "min_priority": 1},
+ )
+ assert result.isError is False
+ data = json.loads(result.content[0].text)
+ high_priority_todos = [t for t in data["todos"] if t["uid"] in created_uids]
+ assert len(high_priority_todos) >= 1 # At least the priority 1 todo
+
+ # Test 3: Filter by categories
+ logger.info("Testing filter by categories")
+ result = await nc_mcp_client.call_tool(
+ "nc_calendar_list_todos",
+ {"calendar_name": calendar_name, "categories": "work"},
+ )
+ assert result.isError is False
+ data = json.loads(result.content[0].text)
+ work_todos = [t for t in data["todos"] if t["uid"] in created_uids]
+ assert len(work_todos) >= 2 # Two todos with "work" category
+
+ # Test 4: Filter by summary text
+ logger.info("Testing filter by summary text")
+ result = await nc_mcp_client.call_tool(
+ "nc_calendar_list_todos",
+ {"calendar_name": calendar_name, "summary_contains": "Priority"},
+ )
+ assert result.isError is False
+ data = json.loads(result.content[0].text)
+ priority_todos = [t for t in data["todos"] if t["uid"] in created_uids]
+ assert len(priority_todos) == 2 # Two have "Priority" in summary (High, Low)
+
+ logger.info("List todos with filters test passed")
+
+ finally:
+ # Cleanup
+ for uid in created_uids:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, uid)
+ except Exception:
+ pass
+
+
+async def test_mcp_search_todos_across_calendars(
+ nc_mcp_client: ClientSession,
+ nc_client: NextcloudClient,
+ temporary_calendar: str,
+ shared_calendar_2: str,
+):
+ """Test searching todos across multiple calendars via MCP tools.
+
+ Note: Uses two shared test calendars to avoid rate limiting.
+ """
+
+ cal1_name = temporary_calendar # First shared test calendar
+ cal2_name = shared_calendar_2 # Second shared test calendar
+ created_uids = []
+
+ try:
+ # Use existing shared calendars (no creation needed, avoiding rate limits)
+
+ # Create todos in both calendars
+ result1 = await nc_client.calendar.create_todo(
+ cal1_name,
+ {
+ "summary": "Task in Calendar 1",
+ "status": "NEEDS-ACTION",
+ "categories": "cal1",
+ },
+ )
+ created_uids.append((cal1_name, result1["uid"]))
+
+ result2 = await nc_client.calendar.create_todo(
+ cal2_name,
+ {
+ "summary": "Task in Calendar 2",
+ "status": "IN-PROCESS",
+ "categories": "cal2",
+ },
+ )
+ created_uids.append((cal2_name, result2["uid"]))
+
+ # Search across all calendars via MCP
+ logger.info("Searching todos across all calendars via MCP")
+ search_result = await nc_mcp_client.call_tool(
+ "nc_calendar_search_todos",
+ {},
+ )
+ assert search_result.isError is False
+
+ import json
+
+ data = json.loads(search_result.content[0].text)
+ assert "todos" in data
+
+ # Verify both todos are in the results
+ found_uids = {t["uid"] for t in data["todos"]}
+ assert result1["uid"] in found_uids
+ assert result2["uid"] in found_uids
+
+ # Verify calendar_name is included
+ our_todos = [
+ t for t in data["todos"] if t["uid"] in [result1["uid"], result2["uid"]]
+ ]
+ for todo in our_todos:
+ assert "calendar_name" in todo
+ assert todo["calendar_name"] in [cal1_name, cal2_name]
+
+ # Test search with status filter
+ logger.info("Searching with status filter via MCP")
+ search_result = await nc_mcp_client.call_tool(
+ "nc_calendar_search_todos",
+ {"status": "IN-PROCESS"},
+ )
+ assert search_result.isError is False
+ data = json.loads(search_result.content[0].text)
+ in_process_todos = [
+ t for t in data["todos"] if t["uid"] in [uid for _, uid in created_uids]
+ ]
+ assert len(in_process_todos) >= 1
+
+ logger.info("Search todos across calendars test passed")
+
+ finally:
+ # Cleanup: Only delete todos, not calendars (they're reused/built-in)
+ for cal_name, uid in created_uids:
+ try:
+ await nc_client.calendar.delete_todo(cal_name, uid)
+ except Exception:
+ pass
+
+
+async def test_mcp_todo_status_transitions(
+ nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test transitioning through different todo statuses via MCP tools."""
+
+ calendar_name = temporary_calendar
+ todo_uid = None
+
+ try:
+ # Create todo
+ result = await nc_client.calendar.create_todo(
+ calendar_name,
+ {"summary": "Status Transition Test", "status": "NEEDS-ACTION"},
+ )
+ todo_uid = result["uid"]
+
+ # Transition: NEEDS-ACTION → IN-PROCESS
+ logger.info("Transitioning todo to IN-PROCESS via MCP")
+ update_result = await nc_mcp_client.call_tool(
+ "nc_calendar_update_todo",
+ {
+ "calendar_name": calendar_name,
+ "todo_uid": todo_uid,
+ "status": "IN-PROCESS",
+ "percent_complete": 25,
+ },
+ )
+ assert update_result.isError is False
+
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert todo["status"] == "IN-PROCESS"
+ assert todo["percent_complete"] == 25
+
+ # Transition: IN-PROCESS → COMPLETED
+ logger.info("Transitioning todo to COMPLETED via MCP")
+ completed_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
+ update_result = await nc_mcp_client.call_tool(
+ "nc_calendar_update_todo",
+ {
+ "calendar_name": calendar_name,
+ "todo_uid": todo_uid,
+ "status": "COMPLETED",
+ "percent_complete": 100,
+ "completed": completed_time,
+ },
+ )
+ assert update_result.isError is False
+
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert todo["status"] == "COMPLETED"
+ assert todo["percent_complete"] == 100
+ assert "completed" in todo
+
+ logger.info("Todo status transitions test passed")
+
+ finally:
+ if todo_uid:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ except Exception:
+ pass
+
+
+async def test_mcp_todo_with_dates(
+ nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test creating and managing todos with date fields via MCP tools."""
+
+ calendar_name = temporary_calendar
+ todo_uid = None
+
+ try:
+ now = datetime.now()
+ start_date = (now + timedelta(days=1)).strftime("%Y-%m-%dT09:00:00")
+ due_date = (now + timedelta(days=7)).strftime("%Y-%m-%dT17:00:00")
+
+ # Create todo with dates via MCP
+ logger.info("Creating todo with dates via MCP")
+ create_result = await nc_mcp_client.call_tool(
+ "nc_calendar_create_todo",
+ {
+ "calendar_name": calendar_name,
+ "summary": "Task with Dates",
+ "description": "Test task with various date fields",
+ "status": "NEEDS-ACTION",
+ "dtstart": start_date,
+ "due": due_date,
+ },
+ )
+ assert create_result.isError is False
+
+ import json
+
+ result_data = json.loads(create_result.content[0].text)
+ todo_uid = result_data["uid"]
+
+ # Verify dates via client
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ created_todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert created_todo["summary"] == "Task with Dates"
+ assert "dtstart" in created_todo
+ assert "due" in created_todo
+
+ logger.info("Todo with dates test passed")
+
+ finally:
+ if todo_uid:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ except Exception:
+ pass
+
+
+async def test_mcp_todo_categories(
+ nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_calendar: str
+):
+ """Test creating and managing todos with categories via MCP tools."""
+
+ calendar_name = temporary_calendar
+ todo_uid = None
+
+ try:
+ # Create todo with multiple categories via MCP
+ logger.info("Creating todo with categories via MCP")
+ create_result = await nc_mcp_client.call_tool(
+ "nc_calendar_create_todo",
+ {
+ "calendar_name": calendar_name,
+ "summary": "Task with Categories",
+ "status": "NEEDS-ACTION",
+ "categories": "work,meeting,important,quarterly",
+ },
+ )
+ assert create_result.isError is False
+
+ import json
+
+ result_data = json.loads(create_result.content[0].text)
+ todo_uid = result_data["uid"]
+
+ # Verify categories via client
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ created_todo = next(t for t in todos if t["uid"] == todo_uid)
+ assert "categories" in created_todo
+ categories_str = created_todo["categories"]
+ assert "work" in categories_str
+ assert "meeting" in categories_str
+ assert "important" in categories_str
+ assert "quarterly" in categories_str
+
+ # Update categories via MCP
+ logger.info("Updating todo categories via MCP")
+ update_result = await nc_mcp_client.call_tool(
+ "nc_calendar_update_todo",
+ {
+ "calendar_name": calendar_name,
+ "todo_uid": todo_uid,
+ "categories": "updated,new-category",
+ },
+ )
+ assert update_result.isError is False
+
+ # Verify updated categories
+ todos = await nc_client.calendar.list_todos(calendar_name)
+ updated_todo = next(t for t in todos if t["uid"] == todo_uid)
+ categories_str = updated_todo["categories"]
+ assert "updated" in categories_str
+ assert "new-category" in categories_str
+
+ logger.info("Todo categories test passed")
+
+ finally:
+ if todo_uid:
+ try:
+ await nc_client.calendar.delete_todo(calendar_name, todo_uid)
+ except Exception:
+ pass
diff --git a/tests/server/test_mcp.py b/tests/server/test_mcp.py
index 90a9ecb..ff9a310 100644
--- a/tests/server/test_mcp.py
+++ b/tests/server/test_mcp.py
@@ -57,6 +57,11 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession):
"nc_calendar_find_availability",
"nc_calendar_bulk_operations",
"nc_calendar_manage_calendar",
+ "nc_calendar_list_todos",
+ "nc_calendar_create_todo",
+ "nc_calendar_update_todo",
+ "nc_calendar_delete_todo",
+ "nc_calendar_search_todos",
"deck_create_board",
"nc_cookbook_import_recipe",
"nc_cookbook_list_recipes",
diff --git a/uv.lock b/uv.lock
index 185cde3..641256d 100644
--- a/uv.lock
+++ b/uv.lock
@@ -54,8 +54,8 @@ wheels = [
[[package]]
name = "caldav"
-version = "2.0.2.dev22+gaa8322dc7"
-source = { git = "https://github.com/cbcoutinho/caldav?branch=feature%2Fhttpx#aa8322dc7c4d0bf99593e1f46e577bb0aa5073c8" }
+version = "2.0.2.dev33+g4877e4688"
+source = { git = "https://github.com/cbcoutinho/caldav?branch=feature%2Fhttpx#4877e46884dbd2bc54f8fb61ee5d056342605e9c" }
dependencies = [
{ name = "httpx", extra = ["http2"] },
{ name = "icalendar" },