diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1896f11..f8b3af0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,13 @@ repos: - repo: https://github.com/commitizen-tools/commitizen - rev: v4.8.2 + rev: v4.8.3 hooks: - id: commitizen - id: commitizen-branch stages: - pre-push - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.12.2 + rev: v0.12.5 hooks: - id: ruff-check - id: ruff-format diff --git a/Dockerfile b/Dockerfile index d98d6d1..90784da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,4 +6,4 @@ COPY . . RUN uv sync --locked --no-dev -CMD ["/app/.venv/bin/mcp", "run", "--transport", "sse", "/app/nextcloud_mcp_server/server.py:mcp"] +CMD ["/app/.venv/bin/mcp", "run", "--transport", "sse", "/app/nextcloud_mcp_server/app.py:mcp"] diff --git a/README.md b/README.md index 0d7f730..779089c 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,7 @@ NEXTCLOUD_PASSWORD=your_nextcloud_app_password_or_login_password * `NEXTCLOUD_HOST`: The full URL of your Nextcloud instance. * `NEXTCLOUD_USERNAME`: Your Nextcloud username. * `NEXTCLOUD_PASSWORD`: **Important:** It is highly recommended to use a dedicated Nextcloud App Password for security. You can generate one in your Nextcloud Security settings. Alternatively, you can use your regular login password, but this is less secure. +* `FASTMCP_HOST`: _Optional:_ By default FastMCP binds to localhost. Use this variable to set a different binding address (e.g. `0.0.0.0`) ## Running the Server @@ -255,10 +256,12 @@ Ensure your environment variables are loaded, then run the server using `mcp run export $(grep -v '^#' .env | xargs) # Run the server -mcp run --transport sse nextcloud_mcp_server.server:mcp +mcp run --transport sse nextcloud_mcp_server.app:mcp ``` -The server will start, typically listening on `http://0.0.0.0:8000`. +The server will start, typically listening on `http://localhost:8000`. + +> NOTE: To make the server bind to a different address, use the FASTMCP_HOST environmental variable ### Using Docker diff --git a/app-hooks/post-installation/install-calendar-app.sh b/app-hooks/post-installation/install-calendar-app.sh new file mode 100755 index 0000000..eb142a1 --- /dev/null +++ b/app-hooks/post-installation/install-calendar-app.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +set -e # Exit on any error + +echo "Installing and configuring Calendar app..." + +# Enable calendar app +php /var/www/html/occ app:enable calendar + +# Wait for calendar app to be fully initialized +echo "Waiting for calendar app to initialize..." +sleep 5 + +# Ensure maintenance mode is off before calendar operations +php /var/www/html/occ maintenance:mode --off + +# Sync DAV system to ensure proper initialization +echo "Syncing DAV system..." +php /var/www/html/occ dav:sync-system-addressbook + +# Repair calendar app to ensure proper setup +echo "Repairing calendar app..." +php /var/www/html/occ maintenance:repair --include-expensive + +# Final wait to ensure CalDAV service is fully ready +echo "Final CalDAV initialization wait..." +sleep 5 + +echo "Calendar app installation complete!" diff --git a/docker-compose.yml b/docker-compose.yml index 03294f8..5a858fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,6 +52,7 @@ services: - NEXTCLOUD_HOST=http://app:80 - NEXTCLOUD_USERNAME=admin - NEXTCLOUD_PASSWORD=admin + - FASTMCP_HOST=0.0.0.0 volumes: nextcloud: diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py new file mode 100644 index 0000000..6f359fe --- /dev/null +++ b/nextcloud_mcp_server/app.py @@ -0,0 +1,62 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from dataclasses import dataclass + +from mcp.server.fastmcp import Context, FastMCP + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.config import setup_logging +from nextcloud_mcp_server.server import ( + configure_calendar_tools, + configure_notes_tools, + configure_tables_tools, + configure_webdav_tools, +) + +setup_logging() + + +@dataclass +class AppContext: + client: NextcloudClient + + +@asynccontextmanager +async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: + """Manage application lifecycle with type-safe context""" + # Initialize on startup + logging.info("Creating Nextcloud client") + client = NextcloudClient.from_env() + logging.info("Client initialization wait complete.") + try: + yield AppContext(client=client) + finally: + # Cleanup on shutdown + await client.close() + + +# Create an MCP server +mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan) + +logger = logging.getLogger(__name__) + + +@mcp.resource("nc://capabilities") +async def nc_get_capabilities(): + """Get the Nextcloud Host capabilities""" + ctx: Context = ( + mcp.get_context() + ) # https://github.com/modelcontextprotocol/python-sdk/issues/244 + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.capabilities() + + +configure_notes_tools(mcp) +configure_tables_tools(mcp) +configure_webdav_tools(mcp) +configure_calendar_tools(mcp) + + +def run(): + mcp.run() diff --git a/nextcloud_mcp_server/client/__init__.py b/nextcloud_mcp_server/client/__init__.py index e6a27d8..d641a4d 100644 --- a/nextcloud_mcp_server/client/__init__.py +++ b/nextcloud_mcp_server/client/__init__.py @@ -1,18 +1,13 @@ -import os -from httpx import ( - AsyncClient, - Auth, - BasicAuth, - Request, - Response, -) import logging +import os + +from httpx import AsyncClient, Auth, BasicAuth, Request, Response -from .notes import NotesClient -from .webdav import WebDAVClient -from .tables import TablesClient -from .calendar import CalendarClient from ..controllers.notes_search import NotesSearchController +from .calendar import CalendarClient +from .notes import NotesClient +from .tables import TablesClient +from .webdav import WebDAVClient logger = logging.getLogger(__name__) diff --git a/nextcloud_mcp_server/client/base.py b/nextcloud_mcp_server/client/base.py index 92add59..22eab6a 100644 --- a/nextcloud_mcp_server/client/base.py +++ b/nextcloud_mcp_server/client/base.py @@ -1,8 +1,9 @@ """Base client for Nextcloud operations with shared authentication.""" -from abc import ABC -from httpx import AsyncClient import logging +from abc import ABC + +from httpx import AsyncClient logger = logging.getLogger(__name__) diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py index c268a67..1057b80 100644 --- a/nextcloud_mcp_server/client/calendar.py +++ b/nextcloud_mcp_server/client/calendar.py @@ -1,13 +1,15 @@ """CalDAV client for NextCloud calendar operations.""" -import xml.etree.ElementTree as ET -from datetime import datetime, date -from typing import Dict, Any, List, Optional, Tuple +import datetime as dt import logging -from httpx import HTTPStatusError -from icalendar import Calendar, Event as ICalEvent, vRecur, Alarm -from datetime import timedelta import uuid +import xml.etree.ElementTree as ET +from typing import Any, Dict, List, Optional, Tuple + +from httpx import HTTPStatusError +from icalendar import Alarm, Calendar +from icalendar import Event as ICalEvent +from icalendar import vRecur from .base import BaseNextcloudClient @@ -46,105 +48,80 @@ class CalendarClient(BaseNextcloudClient): "Accept": "application/xml", } - try: - response = await self._client.request( - "PROPFIND", caldav_path, content=propfind_body, headers=headers + response = await self._make_request( + "PROPFIND", caldav_path, content=propfind_body, headers=headers + ) + + # Parse XML response + root = ET.fromstring(response.content) + calendars = [] + + for response_elem in root.findall(".//{DAV:}response"): + href = response_elem.find(".//{DAV:}href") + if href is None: + continue + + href_text = href.text or "" + if not href_text.endswith("/"): + continue # Skip non-calendar resources + + # Extract calendar name from href + calendar_name = href_text.rstrip("/").split("/")[-1] + if not calendar_name or calendar_name == self.username: + continue + + # Get properties + propstat = response_elem.find(".//{DAV:}propstat") + if propstat is None: + continue + + prop = propstat.find(".//{DAV:}prop") + if prop is None: + continue + + # Check if it's a calendar resource + resourcetype = prop.find(".//{DAV:}resourcetype") + is_calendar = ( + resourcetype is not None + and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar") + is not None ) - response.raise_for_status() - # Parse XML response - root = ET.fromstring(response.content) - calendars = [] + if not is_calendar: + continue - for response_elem in root.findall(".//{DAV:}response"): - href = response_elem.find(".//{DAV:}href") - if href is None: - continue + # Extract calendar properties + displayname_elem = prop.find(".//{DAV:}displayname") + displayname = ( + displayname_elem.text if displayname_elem is not None else calendar_name + ) - href_text = href.text or "" - if not href_text.endswith("/"): - continue # Skip non-calendar resources + description_elem = prop.find( + ".//{urn:ietf:params:xml:ns:caldav}calendar-description" + ) + description = description_elem.text if description_elem is not None else "" - # Extract calendar name from href - calendar_name = href_text.rstrip("/").split("/")[-1] - if not calendar_name or calendar_name == self.username: - continue + color_elem = prop.find(".//{http://calendarserver.org/ns/}calendar-color") + color = color_elem.text if color_elem is not None else "#1976D2" - # Get properties - propstat = response_elem.find(".//{DAV:}propstat") - if propstat is None: - continue + calendars.append( + { + "name": calendar_name, + "display_name": displayname, + "description": description, + "color": color, + "href": href_text, + } + ) - prop = propstat.find(".//{DAV:}prop") - if prop is None: - continue - - # Check if it's a calendar resource - resourcetype = prop.find(".//{DAV:}resourcetype") - is_calendar = ( - resourcetype is not None - and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar") - is not None - ) - - if not is_calendar: - continue - - # Extract calendar properties - displayname_elem = prop.find(".//{DAV:}displayname") - displayname = ( - displayname_elem.text - if displayname_elem is not None - else calendar_name - ) - - description_elem = prop.find( - ".//{urn:ietf:params:xml:ns:caldav}calendar-description" - ) - description = ( - description_elem.text if description_elem is not None else "" - ) - - color_elem = prop.find( - ".//{http://calendarserver.org/ns/}calendar-color" - ) - color = color_elem.text if color_elem is not None else "#1976D2" - - calendars.append( - { - "name": calendar_name, - "display_name": displayname, - "description": description, - "color": color, - "href": href_text, - } - ) - - logger.debug(f"Found {len(calendars)} calendars") - return calendars - - except HTTPStatusError as e: - if e.response.status_code == 401: - logger.warning( - "Authentication failed for CalDAV - Calendar app may not be enabled for this user" - ) - return [] - elif e.response.status_code == 404: - logger.warning( - "CalDAV endpoint not found - Calendar app may not be installed" - ) - return [] - logger.error(f"HTTP error listing calendars: {e}") - raise e - except Exception as e: - logger.error(f"Unexpected error listing calendars: {e}") - raise e + logger.debug(f"Found {len(calendars)} calendars") + return calendars async def get_calendar_events( self, calendar_name: str, - start_date: str = "", - end_date: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, limit: int = 50, ) -> List[Dict[str, Any]]: """List events in a calendar within date range.""" @@ -152,9 +129,18 @@ class CalendarClient(BaseNextcloudClient): # Build time range filter if dates provided time_range_filter = "" - if start_date or end_date: - start_dt = start_date or "19700101T000000Z" - end_dt = end_date or "20301231T235959Z" + if start_datetime or end_datetime: + # Convert datetime objects to CalDAV format (YYYYMMDDTHHMMSSZ) + start_dt = ( + start_datetime.strftime("%Y%m%dT%H%M%SZ") + if start_datetime + else "19700101T000000Z" + ) + end_dt = ( + end_datetime.strftime("%Y%m%dT%H%M%SZ") + if end_datetime + else "20301231T235959Z" + ) time_range_filter = f""" """ @@ -180,55 +166,42 @@ class CalendarClient(BaseNextcloudClient): "Accept": "application/xml", } - try: - response = await self._client.request( - "REPORT", calendar_path, content=report_body, headers=headers - ) - response.raise_for_status() + response = await self._make_request( + "REPORT", calendar_path, content=report_body, headers=headers + ) - # Parse XML response and extract events - root = ET.fromstring(response.content) - events = [] + # Parse XML response and extract events + root = ET.fromstring(response.content) + events = [] - for response_elem in root.findall(".//{DAV:}response"): - href = response_elem.find(".//{DAV:}href") - if href is None: - continue + for response_elem in root.findall(".//{DAV:}response"): + href = response_elem.find(".//{DAV:}href") + if href is None: + continue - propstat = response_elem.find(".//{DAV:}propstat") - if propstat is None: - continue + propstat = response_elem.find(".//{DAV:}propstat") + if propstat is None: + continue - prop = propstat.find(".//{DAV:}prop") - if prop is None: - continue + prop = propstat.find(".//{DAV:}prop") + if prop is None: + continue - calendar_data = prop.find( - ".//{urn:ietf:params:xml:ns:caldav}calendar-data" - ) - etag_elem = prop.find(".//{DAV:}getetag") + calendar_data = prop.find(".//{urn:ietf:params:xml:ns:caldav}calendar-data") + etag_elem = prop.find(".//{DAV:}getetag") - if calendar_data is not None and calendar_data.text: - event_data = self._parse_ical_event(calendar_data.text) - if event_data: - event_data["href"] = href.text - event_data["etag"] = ( - etag_elem.text if etag_elem is not None else "" - ) - events.append(event_data) + if calendar_data is not None and calendar_data.text: + event_data = self._parse_ical_event(calendar_data.text) + if event_data: + event_data["href"] = href.text + event_data["etag"] = etag_elem.text if etag_elem is not None else "" + events.append(event_data) - if len(events) >= limit: - break + if len(events) >= limit: + break - logger.debug(f"Found {len(events)} events") - return events - - except HTTPStatusError as e: - logger.error(f"HTTP error getting calendar events: {e}") - raise e - except Exception as e: - logger.error(f"Unexpected error getting calendar events: {e}") - raise e + logger.debug(f"Found {len(events)} events") + return events async def create_event( self, calendar_name: str, event_data: Dict[str, Any] @@ -246,26 +219,17 @@ class CalendarClient(BaseNextcloudClient): "If-None-Match": "*", # Ensure we're creating, not updating } - try: - response = await self._client.put( - event_path, content=ical_content, headers=headers - ) - response.raise_for_status() + response = await self._make_request( + "PUT", event_path, content=ical_content, headers=headers + ) - logger.debug(f"Created event {event_uid}") - return { - "uid": event_uid, - "href": event_path, - "etag": response.headers.get("etag", ""), - "status_code": response.status_code, - } - - except HTTPStatusError as e: - logger.error(f"HTTP error creating event: {e}") - raise e - except Exception as e: - logger.error(f"Unexpected error creating event: {e}") - raise e + logger.debug(f"Created event {event_uid}") + return { + "uid": event_uid, + "href": event_path, + "etag": response.headers.get("etag", ""), + "status_code": response.status_code, + } async def update_event( self, @@ -303,10 +267,9 @@ class CalendarClient(BaseNextcloudClient): headers["If-Match"] = etag try: - response = await self._client.put( - event_path, content=ical_content, headers=headers + response = await self._make_request( + "PUT", event_path, content=ical_content, headers=headers ) - response.raise_for_status() logger.debug(f"Updated event {event_uid}") return { @@ -329,8 +292,7 @@ class CalendarClient(BaseNextcloudClient): event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" try: - response = await self._client.delete(event_path) - response.raise_for_status() + response = await self._make_request("DELETE", event_path) logger.debug(f"Deleted event {event_uid}") return {"status_code": response.status_code} @@ -355,8 +317,7 @@ class CalendarClient(BaseNextcloudClient): headers = {"Accept": "text/calendar"} try: - response = await self._client.get(event_path, headers=headers) - response.raise_for_status() + response = await self._make_request("GET", event_path, headers=headers) etag = response.headers.get("etag", "") event_data = self._parse_ical_event(response.text) @@ -396,16 +357,16 @@ class CalendarClient(BaseNextcloudClient): if start_str: # Only parse if start_datetime is provided if all_day: - start_date = datetime.fromisoformat(start_str.split("T")[0]).date() + start_date = dt.datetime.fromisoformat(start_str.split("T")[0]).date() event.add("dtstart", start_date) if end_str: - end_date = datetime.fromisoformat(end_str.split("T")[0]).date() + end_date = dt.datetime.fromisoformat(end_str.split("T")[0]).date() event.add("dtend", end_date) else: - start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00")) + start_dt = dt.datetime.fromisoformat(start_str.replace("Z", "+00:00")) event.add("dtstart", start_dt) if end_str: - end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00")) + end_dt = dt.datetime.fromisoformat(end_str.replace("Z", "+00:00")) event.add("dtend", end_dt) # Add categories @@ -442,7 +403,7 @@ class CalendarClient(BaseNextcloudClient): alarm = Alarm() alarm.add("action", "DISPLAY") alarm.add("description", "Event reminder") - alarm.add("trigger", timedelta(minutes=-reminder_minutes)) + alarm.add("trigger", dt.timedelta(minutes=-reminder_minutes)) event.add_component(alarm) # Add attendees @@ -453,7 +414,7 @@ class CalendarClient(BaseNextcloudClient): event.add("attendee", f"mailto:{email.strip()}") # Add timestamps - now = datetime.utcnow() + now = dt.datetime.now(dt.UTC) event.add("created", now) event.add("dtstamp", now) event.add("last-modified", now) @@ -481,8 +442,8 @@ class CalendarClient(BaseNextcloudClient): # Handle dates dtstart = component.get("dtstart") if dtstart: - if isinstance(dtstart.dt, date) and not isinstance( - dtstart.dt, datetime + if isinstance(dtstart.dt, dt.date) and not isinstance( + dtstart.dt, dt.datetime ): event_data["start_datetime"] = dtstart.dt.isoformat() event_data["all_day"] = True @@ -492,8 +453,8 @@ class CalendarClient(BaseNextcloudClient): dtend = component.get("dtend") if dtend: - if isinstance(dtend.dt, date) and not isinstance( - dtend.dt, datetime + if isinstance(dtend.dt, dt.date) and not isinstance( + dtend.dt, dt.datetime ): event_data["end_datetime"] = dtend.dt.isoformat() else: @@ -554,8 +515,8 @@ class CalendarClient(BaseNextcloudClient): async def search_events_across_calendars( self, - start_date: str = "", - end_date: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, filters: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """Search events across all calendars with advanced filtering.""" @@ -566,7 +527,7 @@ class CalendarClient(BaseNextcloudClient): for calendar in calendars: try: events = await self.get_calendar_events( - calendar["name"], start_date, end_date + calendar["name"], start_datetime, end_datetime ) # Apply filters if provided @@ -625,10 +586,12 @@ class CalendarClient(BaseNextcloudClient): end_str = event.get("end_datetime", "") if start_str and end_str: try: - start_dt = datetime.fromisoformat( + start_dt = dt.datetime.fromisoformat( start_str.replace("Z", "+00:00") ) - end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00")) + end_dt = dt.datetime.fromisoformat( + end_str.replace("Z", "+00:00") + ) duration_minutes = (end_dt - start_dt).total_seconds() / 60 if duration_minutes < filters["min_duration_minutes"]: return False @@ -671,22 +634,21 @@ class CalendarClient(BaseNextcloudClient): self, duration_minutes: int, attendees: Optional[List[str]] = None, - date_range_start: str = "", - date_range_end: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, constraints: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """Find available time slots for scheduling.""" try: # Set default date range if not provided - if not date_range_start: - date_range_start = datetime.now().strftime("%Y-%m-%d") - if not date_range_end: - end_date = datetime.now() + timedelta(days=7) - date_range_end = end_date.strftime("%Y-%m-%d") + if not start_datetime: + start_datetime = dt.datetime.now() + if not end_datetime: + end_datetime = dt.datetime.now() + dt.timedelta(days=7) # Get all events in the date range busy_events = await self.search_events_across_calendars( - start_date=date_range_start, end_date=date_range_end + start_datetime=start_datetime, end_datetime=end_datetime ) # Filter events for relevant attendees if specified @@ -710,8 +672,8 @@ class CalendarClient(BaseNextcloudClient): available_slots = self._generate_available_slots( busy_events, duration_minutes, - date_range_start, - date_range_end, + start_datetime, + end_datetime, business_hours_only, exclude_weekends, preferred_times, @@ -727,8 +689,8 @@ class CalendarClient(BaseNextcloudClient): self, busy_events: List[Dict[str, Any]], duration_minutes: int, - start_date: str, - end_date: str, + start_datetime: dt.datetime, + end_datetime: dt.datetime, business_hours_only: bool, exclude_weekends: bool, preferred_times: List[str], @@ -737,13 +699,17 @@ class CalendarClient(BaseNextcloudClient): available_slots = [] try: - current_date = datetime.fromisoformat(start_date) - end_date_dt = datetime.fromisoformat(end_date) + current_date = start_datetime.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + end_date_dt = end_datetime.replace( + hour=23, minute=59, second=59, microsecond=999999 + ) while current_date <= end_date_dt: # Skip weekends if requested if exclude_weekends and current_date.weekday() >= 5: - current_date += timedelta(days=1) + current_date += dt.timedelta(days=1) continue # Generate slots for this day @@ -756,7 +722,7 @@ class CalendarClient(BaseNextcloudClient): ) available_slots.extend(day_slots) - current_date += timedelta(days=1) + current_date += dt.timedelta(days=1) return available_slots[:10] # Limit to 10 slots @@ -766,7 +732,7 @@ class CalendarClient(BaseNextcloudClient): def _generate_day_slots( self, - date: datetime, + date: dt.datetime, busy_events: List[Dict[str, Any]], duration_minutes: int, business_hours_only: bool, @@ -786,10 +752,10 @@ class CalendarClient(BaseNextcloudClient): day_busy_periods = [] for event in busy_events: try: - event_start = datetime.fromisoformat( + event_start = dt.datetime.fromisoformat( event["start_datetime"].replace("Z", "+00:00") ) - event_end = datetime.fromisoformat( + event_end = dt.datetime.fromisoformat( event["end_datetime"].replace("Z", "+00:00") ) @@ -807,7 +773,7 @@ class CalendarClient(BaseNextcloudClient): hour=start_hour, minute=0, second=0, microsecond=0 ) end_time = date.replace(hour=end_hour, minute=0, second=0, microsecond=0) - slot_duration = timedelta(minutes=duration_minutes) + slot_duration = dt.timedelta(minutes=duration_minutes) while current_time + slot_duration <= end_time: slot_end = current_time + slot_duration @@ -829,7 +795,7 @@ class CalendarClient(BaseNextcloudClient): } ) - current_time += timedelta(minutes=30) # 30-minute increments + current_time += dt.timedelta(minutes=30) # 30-minute increments return slots @@ -852,8 +818,8 @@ class CalendarClient(BaseNextcloudClient): for time_range in preferred_times: try: start_str, end_str = time_range.split("-") - pref_start = datetime.strptime(start_str, "%H:%M").time() - pref_end = datetime.strptime(end_str, "%H:%M").time() + pref_start = dt.datetime.strptime(start_str, "%H:%M").time() + pref_end = dt.datetime.strptime(end_str, "%H:%M").time() if pref_start <= slot_start <= pref_end: return True @@ -867,10 +833,20 @@ class CalendarClient(BaseNextcloudClient): ) -> Dict[str, Any]: """Bulk update events matching filter criteria.""" try: + # Convert string dates to datetime objects if present + start_datetime = None + end_datetime = None + if "start_date" in filter_criteria and filter_criteria["start_date"]: + start_datetime = dt.datetime.fromisoformat( + filter_criteria["start_date"] + ) + if "end_date" in filter_criteria and filter_criteria["end_date"]: + end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"]) + # Find events matching criteria events = await self.search_events_across_calendars( - start_date=filter_criteria.get("start_date", ""), - end_date=filter_criteria.get("end_date", ""), + start_datetime=start_datetime, + end_datetime=end_datetime, filters=filter_criteria, ) @@ -943,10 +919,9 @@ class CalendarClient(BaseNextcloudClient): headers = {"Content-Type": "application/xml", "Depth": "0"} - response = await self._client.request( + response = await self._make_request( "MKCALENDAR", calendar_path, content=mkcol_body, headers=headers ) - response.raise_for_status() logger.debug(f"Created calendar: {calendar_name}") return { @@ -966,8 +941,7 @@ class CalendarClient(BaseNextcloudClient): try: calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/" - response = await self._client.delete(calendar_path) - response.raise_for_status() + response = await self._make_request("DELETE", calendar_path) logger.debug(f"Deleted calendar: {calendar_name}") return {"status_code": response.status_code} diff --git a/nextcloud_mcp_server/client/notes.py b/nextcloud_mcp_server/client/notes.py index b8f951a..3ce8cd8 100644 --- a/nextcloud_mcp_server/client/notes.py +++ b/nextcloud_mcp_server/client/notes.py @@ -1,7 +1,7 @@ """Client for Nextcloud Notes app operations.""" -from typing import Dict, List, Any, Optional import logging +from typing import Any, Dict, List, Optional from .base import BaseNextcloudClient diff --git a/nextcloud_mcp_server/client/tables.py b/nextcloud_mcp_server/client/tables.py index 3ee0bc4..1a382bf 100644 --- a/nextcloud_mcp_server/client/tables.py +++ b/nextcloud_mcp_server/client/tables.py @@ -1,7 +1,7 @@ """Client for Nextcloud Tables app operations.""" -from typing import Dict, List, Any, Optional import logging +from typing import Any, Dict, List, Optional from .base import BaseNextcloudClient diff --git a/nextcloud_mcp_server/client/webdav.py b/nextcloud_mcp_server/client/webdav.py index c9b08e9..fbe4f28 100644 --- a/nextcloud_mcp_server/client/webdav.py +++ b/nextcloud_mcp_server/client/webdav.py @@ -1,10 +1,11 @@ """WebDAV client for Nextcloud file operations.""" -import mimetypes -from typing import Tuple, Dict, Any, Optional, List import logging -from httpx import HTTPStatusError +import mimetypes import xml.etree.ElementTree as ET +from typing import Any, Dict, List, Optional, Tuple + +from httpx import HTTPStatusError from .base import BaseNextcloudClient diff --git a/nextcloud_mcp_server/config.py b/nextcloud_mcp_server/config.py index e434344..c37ce94 100644 --- a/nextcloud_mcp_server/config.py +++ b/nextcloud_mcp_server/config.py @@ -21,12 +21,12 @@ LOGGING_CONFIG = { }, "httpx": { "handlers": ["default"], - "level": "DEBUG", + "level": "INFO", "propagate": False, # Prevent propagation to root logger }, "httpcore": { "handlers": ["default"], - "level": "DEBUG", + "level": "INFO", "propagate": False, # Prevent propagation to root logger }, }, diff --git a/nextcloud_mcp_server/controllers/notes_search.py b/nextcloud_mcp_server/controllers/notes_search.py index 7a4e0e8..35f7357 100644 --- a/nextcloud_mcp_server/controllers/notes_search.py +++ b/nextcloud_mcp_server/controllers/notes_search.py @@ -1,6 +1,6 @@ """Controller for notes search functionality.""" -from typing import List, Dict, Any +from typing import Any, Dict, List class NotesSearchController: diff --git a/nextcloud_mcp_server/server.py b/nextcloud_mcp_server/server.py deleted file mode 100644 index a23a170..0000000 --- a/nextcloud_mcp_server/server.py +++ /dev/null @@ -1,1067 +0,0 @@ -# server.py -import logging -from typing import Optional -from datetime import datetime, timedelta -from nextcloud_mcp_server.config import setup_logging -from contextlib import asynccontextmanager -from dataclasses import dataclass -from mcp.server.fastmcp import FastMCP, Context -from collections.abc import AsyncIterator -from nextcloud_mcp_server.client import NextcloudClient - -setup_logging() - - -@dataclass -class AppContext: - client: NextcloudClient - - -@asynccontextmanager -async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: - """Manage application lifecycle with type-safe context""" - # Initialize on startup - logging.info("Creating Nextcloud client") - client = NextcloudClient.from_env() - logging.info("Client initialization wait complete.") - try: - yield AppContext(client=client) - finally: - # Cleanup on shutdown - await client.close() - - -# Create an MCP server -mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan) - -logger = logging.getLogger(__name__) - - -@mcp.resource("nc://capabilities") -async def nc_get_capabilities(): - """Get the Nextcloud Host capabilities""" - ctx: Context = ( - mcp.get_context() - ) # https://github.com/modelcontextprotocol/python-sdk/issues/244 - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.capabilities() - - -@mcp.resource("notes://settings") -async def notes_get_settings(): - """Get the Notes App settings""" - ctx: Context = ( - mcp.get_context() - ) # https://github.com/modelcontextprotocol/python-sdk/issues/244 - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.get_settings() - - -@mcp.tool() -async def nc_get_note(note_id: int, ctx: Context): - """Get user note using note id""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.get_note(note_id) - - -@mcp.tool() -async def nc_notes_create_note(title: str, content: str, category: str, ctx: Context): - """Create a new note""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.create_note( - title=title, - content=content, - category=category, - ) - - -@mcp.tool() -async def nc_notes_update_note( - note_id: int, - etag: str, - title: str | None, - content: str | None, - category: str | None, - ctx: Context, -): - logger.info("Updating note %s", note_id) - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.update( - note_id=note_id, - etag=etag, - title=title, - content=content, - category=category, - ) - - -@mcp.tool() -async def nc_notes_append_content(note_id: int, content: str, ctx: Context): - """Append content to an existing note with a clear separator""" - logger.info("Appending content to note %s", note_id) - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.append_content(note_id=note_id, content=content) - - -@mcp.tool() -async def nc_notes_search_notes(query: str, ctx: Context): - """Search notes by title or content, returning only id, title, and category.""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes_search_notes(query=query) - - -@mcp.tool() -async def nc_notes_delete_note(note_id: int, ctx: Context): - logger.info("Deleting note %s", note_id) - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.notes.delete_note(note_id) - - -# Tables tools -@mcp.tool() -async def nc_tables_list_tables(ctx: Context): - """List all tables available to the user""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.list_tables() - - -@mcp.tool() -async def nc_tables_get_schema(table_id: int, ctx: Context): - """Get the schema/structure of a specific table including columns and views""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.get_table_schema(table_id) - - -@mcp.tool() -async def nc_tables_read_table( - table_id: int, - ctx: Context, - limit: int | None = None, - offset: int | None = None, -): - """Read rows from a table with optional pagination""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.get_table_rows(table_id, limit, offset) - - -@mcp.tool() -async def nc_tables_insert_row(table_id: int, data: dict, ctx: Context): - """Insert a new row into a table. - - Data should be a dictionary mapping column IDs to values, e.g. {1: "text", 2: 42} - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.create_row(table_id, data) - - -@mcp.tool() -async def nc_tables_update_row(row_id: int, data: dict, ctx: Context): - """Update an existing row in a table. - - Data should be a dictionary mapping column IDs to new values, e.g. {1: "new text", 2: 99} - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.update_row(row_id, data) - - -@mcp.tool() -async def nc_tables_delete_row(row_id: int, ctx: Context): - """Delete a row from a table""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.tables.delete_row(row_id) - - -@mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}") -async def nc_notes_get_attachment(note_id: int, attachment_filename: str): - """Get a specific attachment from a note""" - ctx: Context = mcp.get_context() - client: NextcloudClient = ctx.request_context.lifespan_context.client - # Assuming a method get_note_attachment exists in the client - # This method should return the raw content and determine the mime type - content, mime_type = await client.webdav.get_note_attachment( - note_id=note_id, filename=attachment_filename - ) - return { - "contents": [ - { - # Use uppercase 'Notes' to match the decorator - "uri": f"nc://Notes/{note_id}/attachments/{attachment_filename}", - "mimeType": mime_type, # Client needs to determine this - "data": content, # Return raw bytes/data - } - ] - } - - -# WebDAV file system tools -@mcp.tool() -async def nc_webdav_list_directory(ctx: Context, path: str = ""): - """List files and directories in the specified NextCloud path. - - Args: - path: Directory path to list (empty string for root directory) - - Returns: - List of items with metadata including name, path, is_directory, size, content_type, last_modified - - Examples: - # List root directory - await nc_webdav_list_directory("") - - # List a specific folder - await nc_webdav_list_directory("Documents/Projects") - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.webdav.list_directory(path) - - -@mcp.tool() -async def nc_webdav_read_file(path: str, ctx: Context): - """Read the content of a file from NextCloud. - - Args: - path: Full path to the file to read - - Returns: - Dict with path, content, content_type, size, and encoding (if binary) - Text files are decoded to UTF-8, binary files are base64 encoded - - Examples: - # Read a text file - result = await nc_webdav_read_file("Documents/readme.txt") - print(result['content']) # Decoded text content - - # Read a binary file - result = await nc_webdav_read_file("Images/photo.jpg") - print(result['encoding']) # 'base64' - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - content, content_type = await client.webdav.read_file(path) - - # For text files, decode content for easier viewing - if content_type and content_type.startswith("text/"): - try: - decoded_content = content.decode("utf-8") - return { - "path": path, - "content": decoded_content, - "content_type": content_type, - "size": len(content), - } - except UnicodeDecodeError: - pass - - # For binary files, return metadata and base64 encoded content - import base64 - - return { - "path": path, - "content": base64.b64encode(content).decode("ascii"), - "content_type": content_type, - "size": len(content), - "encoding": "base64", - } - - -@mcp.tool() -async def nc_webdav_write_file( - path: str, content: str, ctx: Context, content_type: str | None = None -): - """Write content to a file in NextCloud. - - Args: - path: Full path where to write the file - content: File content (text or base64 for binary) - content_type: MIME type (auto-detected if not provided, use 'type;base64' for binary) - - Returns: - Dict with status_code indicating success - - Examples: - # Write a text file - await nc_webdav_write_file("Documents/notes.md", "# My Notes\nContent here...") - - # Write binary data (base64 encoded) - await nc_webdav_write_file("files/data.bin", base64_content, "application/octet-stream;base64") - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - # Handle base64 encoded content - if content_type and "base64" in content_type.lower(): - import base64 - - content_bytes = base64.b64decode(content) - content_type = content_type.replace(";base64", "") - else: - content_bytes = content.encode("utf-8") - - return await client.webdav.write_file(path, content_bytes, content_type) - - -@mcp.tool() -async def nc_webdav_create_directory(path: str, ctx: Context): - """Create a directory in NextCloud. - - Args: - path: Full path of the directory to create - - Returns: - Dict with status_code (201 for created, 405 if already exists) - - Examples: - # Create a single directory - await nc_webdav_create_directory("NewProject") - - # Create nested directories (parent must exist) - await nc_webdav_create_directory("Projects/MyApp/docs") - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.webdav.create_directory(path) - - -@mcp.tool() -async def nc_webdav_delete_resource(path: str, ctx: Context): - """Delete a file or directory in NextCloud. - - Args: - path: Full path of the file or directory to delete - - Returns: - Dict with status_code indicating result (404 if not found) - - Examples: - # Delete a file - await nc_webdav_delete_resource("old_document.txt") - - # Delete a directory (will delete all contents) - await nc_webdav_delete_resource("temp_folder") - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.webdav.delete_resource(path) - - -# Calendar tools -@mcp.tool() -async def nc_calendar_list_calendars(ctx: Context): - """List all available calendars for the user""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.calendar.list_calendars() - - -@mcp.tool() -async def nc_calendar_create_event( - calendar_name: str, - title: str, - start_datetime: str, - ctx: Context, - end_datetime: str = "", - all_day: bool = False, - description: str = "", - location: str = "", - categories: str = "", - recurring: bool = False, - recurrence_rule: str = "", - recurrence_end_date: str = "", - reminder_minutes: int = 15, - reminder_email: bool = False, - status: str = "CONFIRMED", - priority: int = 5, - privacy: str = "PUBLIC", - attendees: str = "", - url: str = "", - color: str = "", -): - """Create a comprehensive calendar event with full feature support - - Args: - calendar_name: Name of the calendar to create the event in - title: Event title - start_datetime: ISO format: "2025-01-15T14:00:00" or "2025-01-15" for all-day - ctx: MCP context - end_datetime: ISO format end time, empty for all-day events - all_day: Whether this is an all-day event - description: Event description/details - location: Event location - categories: Comma-separated categories (e.g., "work,meeting") - recurring: Whether this is a recurring event - recurrence_rule: RFC5545 RRULE (e.g., "FREQ=WEEKLY;BYDAY=MO,WE,FR") - recurrence_end_date: When to stop recurring - reminder_minutes: Minutes before event to send reminder - reminder_email: Whether to send email notification - status: Event status: CONFIRMED, TENTATIVE, or CANCELLED - priority: Priority level 1-9 (1=highest, 9=lowest, 5=normal) - privacy: Privacy level: PUBLIC, PRIVATE, or CONFIDENTIAL - attendees: Comma-separated email addresses - url: Related URL for the event - color: Event color (hex or name) - - Returns: - Dict with event creation result - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - event_data = { - "title": title, - "start_datetime": start_datetime, - "end_datetime": end_datetime, - "all_day": all_day, - "description": description, - "location": location, - "categories": categories, - "recurring": recurring, - "recurrence_rule": recurrence_rule, - "recurrence_end_date": recurrence_end_date, - "reminder_minutes": reminder_minutes, - "reminder_email": reminder_email, - "status": status, - "priority": priority, - "privacy": privacy, - "attendees": attendees, - "url": url, - "color": color, - } - - return await client.calendar.create_event(calendar_name, event_data) - - -@mcp.tool() -async def nc_calendar_list_events( - calendar_name: str, - ctx: Context, - start_date: str = "", - end_date: str = "", - limit: int = 50, - min_attendees: Optional[int] = None, - min_duration_minutes: Optional[int] = None, - categories: Optional[str] = None, - status: Optional[str] = None, - title_contains: Optional[str] = None, - location_contains: Optional[str] = None, - search_all_calendars: bool = False, -): - """List events in a calendar (or all calendars) within date range with advanced filtering. - - Args: - calendar_name: Name of the calendar to search. Ignored if search_all_calendars=True. - ctx: MCP context - start_date: Start date for search (YYYY-MM-DD format, e.g., "2025-01-01") - end_date: End date for search (YYYY-MM-DD format, e.g., "2025-01-31") - limit: Maximum number of events to return - min_attendees: Filter events with at least this many attendees - min_duration_minutes: Filter events with at least this duration - categories: Filter events containing any of these categories (comma-separated, e.g., "work,meeting") - status: Filter events by status (CONFIRMED, TENTATIVE, or CANCELLED) - title_contains: Filter events where title contains this text - location_contains: Filter events where location contains this text - search_all_calendars: If True, search across all calendars instead of just one - - Returns: - List of events matching the filters - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - # Build filters dictionary - filters = {} - if min_attendees is not None: - filters["min_attendees"] = min_attendees - if min_duration_minutes is not None: - filters["min_duration_minutes"] = min_duration_minutes - if categories is not None: - filters["categories"] = [cat.strip() for cat in categories.split(",")] - if status is not None: - filters["status"] = status - if title_contains is not None: - filters["title_contains"] = title_contains - if location_contains is not None: - filters["location_contains"] = location_contains - - if search_all_calendars: - # Search across all calendars with filters - events = await client.calendar.search_events_across_calendars( - start_date=start_date, - end_date=end_date, - filters=filters if filters else None, - ) - return events[:limit] - else: - # Search in specific calendar - events = await client.calendar.get_calendar_events( - calendar_name=calendar_name, - start_date=start_date, - end_date=end_date, - limit=limit, - ) - - # Apply filters if provided - if filters: - events = client.calendar._apply_event_filters(events, filters) - - return events - - -@mcp.tool() -async def nc_calendar_get_event( - calendar_name: str, - event_uid: str, - ctx: Context, -): - """Get detailed information about a specific event""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - event_data, etag = await client.calendar.get_event(calendar_name, event_uid) - return event_data - - -@mcp.tool() -async def nc_calendar_update_event( - calendar_name: str, - event_uid: str, - ctx: Context, - # All the same parameters as create_event but optional - title: str | None = None, - start_datetime: str | None = None, - end_datetime: str | None = None, - all_day: bool | None = None, - description: str | None = None, - location: str | None = None, - categories: str | None = None, - # Recurrence updates - recurring: bool | None = None, - recurrence_rule: str | None = None, - # Notification updates - reminder_minutes: int | None = None, - reminder_email: bool | None = None, - # Event property updates - status: str | None = None, - priority: int | None = None, - privacy: str | None = None, - attendees: str | None = None, - url: str | None = None, - color: str | None = None, - etag: str = "", -): - """Update any aspect of an existing event""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - - # Build update data with only non-None values - event_data = {} - if title is not None: - event_data["title"] = title - if start_datetime is not None: - event_data["start_datetime"] = start_datetime - if end_datetime is not None: - event_data["end_datetime"] = end_datetime - if all_day is not None: - event_data["all_day"] = all_day - if description is not None: - event_data["description"] = description - if location is not None: - event_data["location"] = location - if categories is not None: - event_data["categories"] = categories - if recurring is not None: - event_data["recurring"] = recurring - if recurrence_rule is not None: - event_data["recurrence_rule"] = recurrence_rule - if reminder_minutes is not None: - event_data["reminder_minutes"] = reminder_minutes - if reminder_email is not None: - event_data["reminder_email"] = reminder_email - if status is not None: - event_data["status"] = status - if priority is not None: - event_data["priority"] = priority - if privacy is not None: - event_data["privacy"] = privacy - if attendees is not None: - event_data["attendees"] = attendees - if url is not None: - event_data["url"] = url - if color is not None: - event_data["color"] = color - - return await client.calendar.update_event( - calendar_name, event_uid, event_data, etag - ) - - -@mcp.tool() -async def nc_calendar_delete_event( - calendar_name: str, - event_uid: str, - ctx: Context, -): - """Delete a calendar event""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - return await client.calendar.delete_event(calendar_name, event_uid) - - -@mcp.tool() -async def nc_calendar_create_meeting( - title: str, - date: str, - time: str, - ctx: Context, - duration_minutes: int = 60, - calendar_name: str = "personal", - attendees: str = "", - location: str = "", - description: str = "", - reminder_minutes: int = 15, -): - """Quick meeting creation with smart defaults - - This is a convenience function for creating events with common meeting defaults. - It automatically: - - Calculates end time based on duration - - Sets status to CONFIRMED - - Adds a reminder - - Uses simpler date/time inputs instead of full ISO format - - For full control over all event properties, use nc_calendar_create_event instead. - - Args: - title: Meeting title - date: Meeting date (YYYY-MM-DD format, e.g., "2025-01-15") - time: Meeting start time (HH:MM format, e.g., "14:00") - ctx: MCP context - duration_minutes: Meeting duration in minutes (default: 60) - calendar_name: Calendar to create the meeting in (default: "personal") - attendees: Comma-separated email addresses of attendees - location: Meeting location - description: Meeting description/agenda - reminder_minutes: Minutes before meeting to send reminder (default: 15) - - Returns: - Dict with meeting creation result - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - # Combine date and time for start_datetime - start_datetime = f"{date}T{time}:00" - - # Calculate end_datetime - - start_dt = datetime.fromisoformat(start_datetime) - end_dt = start_dt + timedelta(minutes=duration_minutes) - end_datetime = end_dt.isoformat() - - event_data = { - "title": title, - "start_datetime": start_datetime, - "end_datetime": end_datetime, - "all_day": False, - "description": description, - "location": location, - "attendees": attendees, - "reminder_minutes": reminder_minutes, - "status": "CONFIRMED", - "priority": 5, - "privacy": "PUBLIC", - } - - return await client.calendar.create_event(calendar_name, event_data) - - -@mcp.tool() -async def nc_calendar_get_upcoming_events( - ctx: Context, - calendar_name: str = "", # Empty = all calendars - days_ahead: int = 7, - limit: int = 10, -): - """Get upcoming events in next N days""" - client: NextcloudClient = ctx.request_context.lifespan_context.client - - now = datetime.now() - end_date = now + timedelta(days=days_ahead) - - start_date_str = now.strftime("%Y%m%dT%H%M%SZ") - end_date_str = end_date.strftime("%Y%m%dT%H%M%SZ") - - if calendar_name: - # Get events from specific calendar - return await client.calendar.get_calendar_events( - calendar_name=calendar_name, - start_date=start_date_str, - end_date=end_date_str, - limit=limit, - ) - else: - # Get events from all calendars - all_calendars = await client.calendar.list_calendars() - all_events = [] - - for calendar in all_calendars: - try: - events = await client.calendar.get_calendar_events( - calendar_name=calendar["name"], - start_date=start_date_str, - end_date=end_date_str, - limit=limit, - ) - # Add calendar info to each event - for event in events: - event["calendar_name"] = calendar["name"] - event["calendar_display_name"] = calendar["display_name"] - all_events.extend(events) - except Exception as e: - logger.warning( - f"Error getting events from calendar {calendar['name']}: {e}" - ) - continue - - # Sort by start time and limit - all_events.sort(key=lambda x: x.get("start_datetime", "")) - return all_events[:limit] - - -@mcp.tool() -async def nc_calendar_find_availability( - duration_minutes: int, - ctx: Context, - attendees: str = "", # Comma-separated email list - date_range_start: str = "", # "2025-07-28" - date_range_end: str = "", # "2025-08-04" - business_hours_only: bool = True, - exclude_weekends: bool = True, - preferred_times: str = "", # Comma-separated time ranges like "09:00-12:00,14:00-17:00" -): - """Find available time slots for scheduling meetings. - - This tool intelligently analyzes existing calendar events to find free time slots - that work for all specified attendees within the given constraints. - - Args: - duration_minutes: Required duration for the meeting in minutes - attendees: Comma-separated list of attendee email addresses to check availability for - date_range_start: Start date for availability search (YYYY-MM-DD) - date_range_end: End date for availability search (YYYY-MM-DD) - business_hours_only: Only suggest slots during business hours (9 AM - 5 PM) - exclude_weekends: Skip weekends when finding availability - preferred_times: Preferred time ranges as "HH:MM-HH:MM" (comma-separated) - - Returns: - List of available time slots with start/end times and duration - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - # Parse attendees - attendee_list = [] - if attendees: - attendee_list = [ - email.strip() for email in attendees.split(",") if email.strip() - ] - - # Parse preferred times - preferred_time_list = [] - if preferred_times: - preferred_time_list = [ - time_range.strip() - for time_range in preferred_times.split(",") - if time_range.strip() - ] - - # Build constraints - constraints = { - "business_hours_only": business_hours_only, - "exclude_weekends": exclude_weekends, - "preferred_times": preferred_time_list, - } - - return await client.calendar.find_availability( - duration_minutes=duration_minutes, - attendees=attendee_list, - date_range_start=date_range_start, - date_range_end=date_range_end, - constraints=constraints, - ) - - -@mcp.tool() -async def nc_calendar_bulk_operations( - operation: str, # "update", "delete", "move" - ctx: Context, - title_contains: Optional[str] = None, - categories: Optional[str] = None, # Comma-separated - calendar_name: Optional[str] = None, - start_date: str = "", # "2025-07-01" - end_date: str = "", # "2025-07-31" - status: Optional[str] = None, - location_contains: Optional[str] = None, - # Update operation parameters - new_title: Optional[str] = None, - new_description: Optional[str] = None, - new_location: Optional[str] = None, - new_categories: Optional[str] = None, - new_priority: Optional[int] = None, - new_reminder_minutes: Optional[int] = None, - # Move operation parameters - target_calendar: Optional[str] = None, -): - """Perform bulk operations (update/delete) on events matching filter criteria. - - This tool allows you to efficiently modify or delete multiple events at once - by applying filters to find matching events and then performing the specified operation. - - Args: - operation: Type of operation - "update" or "delete" - title_contains: Filter events where title contains this text - categories: Filter events containing any of these categories (comma-separated) - calendar_name: Filter events from this specific calendar - start_date: Filter events starting from this date (YYYY-MM-DD) - end_date: Filter events ending before this date (YYYY-MM-DD) - status: Filter events by status (CONFIRMED, TENTATIVE, CANCELLED) - location_contains: Filter events where location contains this text - - # For update operations: - new_title: New title for matching events - new_description: New description for matching events - new_location: New location for matching events - new_categories: New categories for matching events (comma-separated) - new_priority: New priority for matching events (1-9, 5=normal) - new_reminder_minutes: New reminder time in minutes before event - - # For move operations: - target_calendar: Calendar to move events to (requires operation="move") - - Returns: - Summary of operation results including counts and details - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - if operation not in ["update", "delete", "move"]: - raise ValueError("Operation must be 'update', 'delete', or 'move'") - - # Build filter criteria - filter_criteria = {} - if title_contains is not None: - filter_criteria["title_contains"] = title_contains - if categories is not None: - filter_criteria["categories"] = [cat.strip() for cat in categories.split(",")] - if status is not None: - filter_criteria["status"] = status - if location_contains is not None: - filter_criteria["location_contains"] = location_contains - if start_date: - filter_criteria["start_date"] = start_date - if end_date: - filter_criteria["end_date"] = end_date - - if operation == "delete": - # Find matching events and delete them - if calendar_name: - events = await client.calendar.get_calendar_events( - calendar_name=calendar_name, start_date=start_date, end_date=end_date - ) - if filter_criteria: - events = client.calendar._apply_event_filters(events, filter_criteria) - else: - events = await client.calendar.search_events_across_calendars( - start_date=start_date, end_date=end_date, filters=filter_criteria - ) - - deleted_count = 0 - failed_count = 0 - results = [] - - for event in events: - try: - await client.calendar.delete_event( - event.get("calendar_name", calendar_name), event["uid"] - ) - deleted_count += 1 - results.append( - { - "uid": event["uid"], - "status": "deleted", - "title": event.get("title", ""), - } - ) - except Exception as e: - failed_count += 1 - results.append( - { - "uid": event["uid"], - "status": "failed", - "error": str(e), - "title": event.get("title", ""), - } - ) - - return { - "operation": "delete", - "total_found": len(events), - "deleted_count": deleted_count, - "failed_count": failed_count, - "results": results, - } - - elif operation == "update": - # Build update data - update_data = {} - if new_title is not None: - update_data["title"] = new_title - if new_description is not None: - update_data["description"] = new_description - if new_location is not None: - update_data["location"] = new_location - if new_categories is not None: - update_data["categories"] = new_categories - if new_priority is not None: - update_data["priority"] = new_priority - if new_reminder_minutes is not None: - update_data["reminder_minutes"] = new_reminder_minutes - - if not update_data: - raise ValueError("No update data provided for update operation") - - return await client.calendar.bulk_update_events(filter_criteria, update_data) - - elif operation == "move": - if not target_calendar: - raise ValueError("target_calendar is required for move operation") - - # Find matching events - if calendar_name: - events = await client.calendar.get_calendar_events( - calendar_name=calendar_name, start_date=start_date, end_date=end_date - ) - if filter_criteria: - events = client.calendar._apply_event_filters(events, filter_criteria) - else: - events = await client.calendar.search_events_across_calendars( - start_date=start_date, end_date=end_date, filters=filter_criteria - ) - - moved_count = 0 - failed_count = 0 - results = [] - - for event in events: - try: - # Create event in target calendar - event_data = { - k: v - for k, v in event.items() - if k - not in [ - "uid", - "href", - "etag", - "calendar_name", - "calendar_display_name", - ] - } - - await client.calendar.create_event(target_calendar, event_data) - - # Delete from source calendar - await client.calendar.delete_event( - event.get("calendar_name", calendar_name), event["uid"] - ) - - moved_count += 1 - results.append( - { - "uid": event["uid"], - "status": "moved", - "title": event.get("title", ""), - "from_calendar": event.get("calendar_name", calendar_name), - "to_calendar": target_calendar, - } - ) - except Exception as e: - failed_count += 1 - results.append( - { - "uid": event["uid"], - "status": "failed", - "error": str(e), - "title": event.get("title", ""), - } - ) - - return { - "operation": "move", - "total_found": len(events), - "moved_count": moved_count, - "failed_count": failed_count, - "target_calendar": target_calendar, - "results": results, - } - - -@mcp.tool() -async def nc_calendar_manage_calendar( - action: str, # "create", "delete", "update", "list" - ctx: Context, - calendar_name: str = "", - display_name: str = "", - description: str = "", - color: str = "#1976D2", # Default blue color -): - """Manage calendar creation, deletion, and properties. - - This tool provides comprehensive calendar management functionality including - creating new calendars, deleting existing ones, and updating calendar properties. - - Args: - action: Action to perform - "create", "delete", "update", or "list" - calendar_name: Internal name for the calendar (required for create/delete/update) - display_name: Human-readable name for the calendar (used for create/update) - description: Description for the calendar (used for create/update) - color: Hex color code for the calendar (e.g., "#1976D2" for blue) - - Returns: - Result of the calendar management operation - """ - client: NextcloudClient = ctx.request_context.lifespan_context.client - - if action == "list": - return await client.calendar.list_calendars() - - elif action == "create": - if not calendar_name: - raise ValueError("calendar_name is required for create action") - - return await client.calendar.create_calendar( - calendar_name=calendar_name, - display_name=display_name or calendar_name, - description=description, - color=color, - ) - - elif action == "delete": - if not calendar_name: - raise ValueError("calendar_name is required for delete action") - - return await client.calendar.delete_calendar(calendar_name) - - elif action == "update": - if not calendar_name: - raise ValueError("calendar_name is required for update action") - - # Note: Calendar property updates require additional CalDAV PROPPATCH implementation - # For now, return an informative message - return { - "status": "not_implemented", - "message": "Calendar property updates require PROPPATCH implementation", - "calendar_name": calendar_name, - "requested_changes": { - "display_name": display_name, - "description": description, - "color": color, - }, - } - - else: - raise ValueError("Action must be 'create', 'delete', 'update', or 'list'") - - -def run(): - mcp.run() - - -if __name__ == "__main__": - logger.info("Starting now") - mcp.run() diff --git a/nextcloud_mcp_server/server/__init__.py b/nextcloud_mcp_server/server/__init__.py new file mode 100644 index 0000000..925c987 --- /dev/null +++ b/nextcloud_mcp_server/server/__init__.py @@ -0,0 +1,11 @@ +from .calendar import configure_calendar_tools +from .notes import configure_notes_tools +from .tables import configure_tables_tools +from .webdav import configure_webdav_tools + +__all__ = [ + "configure_calendar_tools", + "configure_notes_tools", + "configure_tables_tools", + "configure_webdav_tools", +] diff --git a/nextcloud_mcp_server/server/calendar.py b/nextcloud_mcp_server/server/calendar.py new file mode 100644 index 0000000..5e71a2c --- /dev/null +++ b/nextcloud_mcp_server/server/calendar.py @@ -0,0 +1,794 @@ +import datetime as dt +import logging +from typing import Optional + +from mcp.server.fastmcp import Context, FastMCP + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + + +def configure_calendar_tools(mcp: FastMCP): + # Calendar tools + @mcp.tool() + async def nc_calendar_list_calendars(ctx: Context): + """List all available calendars for the user""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.calendar.list_calendars() + + @mcp.tool() + async def nc_calendar_create_event( + calendar_name: str, + title: str, + start_datetime: str, + ctx: Context, + end_datetime: str = "", + all_day: bool = False, + description: str = "", + location: str = "", + categories: str = "", + recurring: bool = False, + recurrence_rule: str = "", + recurrence_end_date: str = "", + reminder_minutes: int = 15, + reminder_email: bool = False, + status: str = "CONFIRMED", + priority: int = 5, + privacy: str = "PUBLIC", + attendees: str = "", + url: str = "", + color: str = "", + ): + """Create a comprehensive calendar event with full feature support + + Args: + calendar_name: Name of the calendar to create the event in + title: Event title + start_datetime: ISO format: "2025-01-15T14:00:00" or "2025-01-15" for all-day + ctx: MCP context + end_datetime: ISO format end time, empty for all-day events + all_day: Whether this is an all-day event + description: Event description/details + location: Event location + categories: Comma-separated categories (e.g., "work,meeting") + recurring: Whether this is a recurring event + recurrence_rule: RFC5545 RRULE (e.g., "FREQ=WEEKLY;BYDAY=MO,WE,FR") + recurrence_end_date: When to stop recurring + reminder_minutes: Minutes before event to send reminder + reminder_email: Whether to send email notification + status: Event status: CONFIRMED, TENTATIVE, or CANCELLED + priority: Priority level 1-9 (1=highest, 9=lowest, 5=normal) + privacy: Privacy level: PUBLIC, PRIVATE, or CONFIDENTIAL + attendees: Comma-separated email addresses + url: Related URL for the event + color: Event color (hex or name) + + Returns: + Dict with event creation result + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + event_data = { + "title": title, + "start_datetime": start_datetime, + "end_datetime": end_datetime, + "all_day": all_day, + "description": description, + "location": location, + "categories": categories, + "recurring": recurring, + "recurrence_rule": recurrence_rule, + "recurrence_end_date": recurrence_end_date, + "reminder_minutes": reminder_minutes, + "reminder_email": reminder_email, + "status": status, + "priority": priority, + "privacy": privacy, + "attendees": attendees, + "url": url, + "color": color, + } + + return await client.calendar.create_event(calendar_name, event_data) + + @mcp.tool() + async def nc_calendar_list_events( + calendar_name: str, + ctx: Context, + start_date: str = "", + end_date: str = "", + limit: int = 50, + min_attendees: Optional[int] = None, + min_duration_minutes: Optional[int] = None, + categories: Optional[str] = None, + status: Optional[str] = None, + title_contains: Optional[str] = None, + location_contains: Optional[str] = None, + search_all_calendars: bool = False, + ): + """List events in a calendar (or all calendars) within date range with advanced filtering. + + Args: + calendar_name: Name of the calendar to search. Ignored if search_all_calendars=True. + ctx: MCP context + start_date: Start date for search (YYYY-MM-DD format, e.g., "2025-01-01") + end_date: End date for search (YYYY-MM-DD format, e.g., "2025-01-31") + limit: Maximum number of events to return + min_attendees: Filter events with at least this many attendees + min_duration_minutes: Filter events with at least this duration + categories: Filter events containing any of these categories (comma-separated, e.g., "work,meeting") + status: Filter events by status (CONFIRMED, TENTATIVE, or CANCELLED) + title_contains: Filter events where title contains this text + location_contains: Filter events where location contains this text + search_all_calendars: If True, search across all calendars instead of just one + + Returns: + List of events matching the filters + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + # Convert YYYY-MM-DD format dates to datetime objects + start_datetime = None + end_datetime = None + + if start_date: + try: + start_datetime = dt.datetime.strptime(start_date, "%Y-%m-%d") + except ValueError: + # If parsing fails, try to parse as ISO format + try: + start_datetime = dt.datetime.fromisoformat(start_date) + except ValueError: + logger.warning(f"Invalid start_date format: {start_date}") + + if end_date: + try: + # For end date, set to end of day (23:59:59) + end_datetime = dt.datetime.strptime(end_date, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + # If parsing fails, try to parse as ISO format + try: + end_datetime = dt.datetime.fromisoformat(end_date) + except ValueError: + logger.warning(f"Invalid end_date format: {end_date}") + + # Build filters dictionary + filters = {} + if min_attendees is not None: + filters["min_attendees"] = min_attendees + if min_duration_minutes is not None: + filters["min_duration_minutes"] = min_duration_minutes + if categories is not None: + filters["categories"] = [cat.strip() for cat in categories.split(",")] + if status is not None: + filters["status"] = status + if title_contains is not None: + filters["title_contains"] = title_contains + if location_contains is not None: + filters["location_contains"] = location_contains + + if search_all_calendars: + # Search across all calendars with filters + events = await client.calendar.search_events_across_calendars( + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filters if filters else None, + ) + return events[:limit] + else: + # Search in specific calendar + events = await client.calendar.get_calendar_events( + calendar_name=calendar_name, + start_datetime=start_datetime, + end_datetime=end_datetime, + limit=limit, + ) + + # Apply filters if provided + if filters: + events = client.calendar._apply_event_filters(events, filters) + + return events + + @mcp.tool() + async def nc_calendar_get_event( + calendar_name: str, + event_uid: str, + ctx: Context, + ): + """Get detailed information about a specific event""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + event_data, etag = await client.calendar.get_event(calendar_name, event_uid) + return event_data + + @mcp.tool() + async def nc_calendar_update_event( + calendar_name: str, + event_uid: str, + ctx: Context, + # All the same parameters as create_event but optional + title: str | None = None, + start_datetime: str | None = None, + end_datetime: str | None = None, + all_day: bool | None = None, + description: str | None = None, + location: str | None = None, + categories: str | None = None, + # Recurrence updates + recurring: bool | None = None, + recurrence_rule: str | None = None, + # Notification updates + reminder_minutes: int | None = None, + reminder_email: bool | None = None, + # Event property updates + status: str | None = None, + priority: int | None = None, + privacy: str | None = None, + attendees: str | None = None, + url: str | None = None, + color: str | None = None, + etag: str = "", + ): + """Update any aspect of an existing event""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + + # Build update data with only non-None values + event_data = {} + if title is not None: + event_data["title"] = title + if start_datetime is not None: + event_data["start_datetime"] = start_datetime + if end_datetime is not None: + event_data["end_datetime"] = end_datetime + if all_day is not None: + event_data["all_day"] = all_day + if description is not None: + event_data["description"] = description + if location is not None: + event_data["location"] = location + if categories is not None: + event_data["categories"] = categories + if recurring is not None: + event_data["recurring"] = recurring + if recurrence_rule is not None: + event_data["recurrence_rule"] = recurrence_rule + if reminder_minutes is not None: + event_data["reminder_minutes"] = reminder_minutes + if reminder_email is not None: + event_data["reminder_email"] = reminder_email + if status is not None: + event_data["status"] = status + if priority is not None: + event_data["priority"] = priority + if privacy is not None: + event_data["privacy"] = privacy + if attendees is not None: + event_data["attendees"] = attendees + if url is not None: + event_data["url"] = url + if color is not None: + event_data["color"] = color + + return await client.calendar.update_event( + calendar_name, event_uid, event_data, etag + ) + + @mcp.tool() + async def nc_calendar_delete_event( + calendar_name: str, + event_uid: str, + ctx: Context, + ): + """Delete a calendar event""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.calendar.delete_event(calendar_name, event_uid) + + @mcp.tool() + async def nc_calendar_create_meeting( + title: str, + date: str, + time: str, + ctx: Context, + duration_minutes: int = 60, + calendar_name: str = "personal", + attendees: str = "", + location: str = "", + description: str = "", + reminder_minutes: int = 15, + ): + """Quick meeting creation with smart defaults + + This is a convenience function for creating events with common meeting defaults. + It automatically: + - Calculates end time based on duration + - Sets status to CONFIRMED + - Adds a reminder + - Uses simpler date/time inputs instead of full ISO format + + For full control over all event properties, use nc_calendar_create_event instead. + + Args: + title: Meeting title + date: Meeting date (YYYY-MM-DD format, e.g., "2025-01-15") + time: Meeting start time (HH:MM format, e.g., "14:00") + ctx: MCP context + duration_minutes: Meeting duration in minutes (default: 60) + calendar_name: Calendar to create the meeting in (default: "personal") + attendees: Comma-separated email addresses of attendees + location: Meeting location + description: Meeting description/agenda + reminder_minutes: Minutes before meeting to send reminder (default: 15) + + Returns: + Dict with meeting creation result + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + # Combine date and time for start_datetime + start_datetime = f"{date}T{time}:00" + + # Calculate end_datetime + start_dt = dt.datetime.fromisoformat(start_datetime) + end_dt = start_dt + dt.timedelta(minutes=duration_minutes) + end_datetime = end_dt.isoformat() + + event_data = { + "title": title, + "start_datetime": start_datetime, + "end_datetime": end_datetime, + "all_day": False, + "description": description, + "location": location, + "attendees": attendees, + "reminder_minutes": reminder_minutes, + "status": "CONFIRMED", + "priority": 5, + "privacy": "PUBLIC", + } + + return await client.calendar.create_event(calendar_name, event_data) + + @mcp.tool() + async def nc_calendar_get_upcoming_events( + ctx: Context, + calendar_name: str = "", # Empty = all calendars + days_ahead: int = 7, + limit: int = 10, + ): + """Get upcoming events in next N days""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + + now = dt.datetime.now() + end_datetime = now + dt.timedelta(days=days_ahead) + + if calendar_name: + # Get events from specific calendar + return await client.calendar.get_calendar_events( + calendar_name=calendar_name, + start_datetime=now, + end_datetime=end_datetime, + limit=limit, + ) + else: + # Get events from all calendars + all_calendars = await client.calendar.list_calendars() + all_events = [] + + for calendar in all_calendars: + try: + events = await client.calendar.get_calendar_events( + calendar_name=calendar["name"], + start_datetime=now, + end_datetime=end_datetime, + limit=limit, + ) + # Add calendar info to each event + for event in events: + event["calendar_name"] = calendar["name"] + event["calendar_display_name"] = calendar["display_name"] + all_events.extend(events) + except Exception as e: + logger.warning( + f"Error getting events from calendar {calendar['name']}: {e}" + ) + continue + + # Sort by start time and limit + all_events.sort(key=lambda x: x.get("start_datetime", "")) + return all_events[:limit] + + @mcp.tool() + async def nc_calendar_find_availability( + duration_minutes: int, + ctx: Context, + attendees: str = "", # Comma-separated email list + date_range_start: str = "", # "2025-07-28" + date_range_end: str = "", # "2025-08-04" + business_hours_only: bool = True, + exclude_weekends: bool = True, + preferred_times: str = "", # Comma-separated time ranges like "09:00-12:00,14:00-17:00" + ): + """Find available time slots for scheduling meetings. + + This tool intelligently analyzes existing calendar events to find free time slots + that work for all specified attendees within the given constraints. + + Args: + duration_minutes: Required duration for the meeting in minutes + attendees: Comma-separated list of attendee email addresses to check availability for + date_range_start: Start date for availability search (YYYY-MM-DD) + date_range_end: End date for availability search (YYYY-MM-DD) + business_hours_only: Only suggest slots during business hours (9 AM - 5 PM) + exclude_weekends: Skip weekends when finding availability + preferred_times: Preferred time ranges as "HH:MM-HH:MM" (comma-separated) + + Returns: + List of available time slots with start/end times and duration + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + # Parse attendees + attendee_list = [] + if attendees: + attendee_list = [ + email.strip() for email in attendees.split(",") if email.strip() + ] + + # Parse preferred times + preferred_time_list = [] + if preferred_times: + preferred_time_list = [ + time_range.strip() + for time_range in preferred_times.split(",") + if time_range.strip() + ] + + # Convert date strings to datetime objects + start_datetime = None + end_datetime = None + + if date_range_start: + try: + start_datetime = dt.datetime.strptime(date_range_start, "%Y-%m-%d") + except ValueError: + logger.warning(f"Invalid date_range_start format: {date_range_start}") + + if date_range_end: + try: + end_datetime = dt.datetime.strptime(date_range_end, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + logger.warning(f"Invalid date_range_end format: {date_range_end}") + + # Build constraints + constraints = { + "business_hours_only": business_hours_only, + "exclude_weekends": exclude_weekends, + "preferred_times": preferred_time_list, + } + + return await client.calendar.find_availability( + duration_minutes=duration_minutes, + attendees=attendee_list, + start_datetime=start_datetime, + end_datetime=end_datetime, + constraints=constraints, + ) + + @mcp.tool() + async def nc_calendar_bulk_operations( + operation: str, # "update", "delete", "move" + ctx: Context, + title_contains: Optional[str] = None, + categories: Optional[str] = None, # Comma-separated + calendar_name: Optional[str] = None, + start_date: str = "", # "2025-07-01" + end_date: str = "", # "2025-07-31" + status: Optional[str] = None, + location_contains: Optional[str] = None, + # Update operation parameters + new_title: Optional[str] = None, + new_description: Optional[str] = None, + new_location: Optional[str] = None, + new_categories: Optional[str] = None, + new_priority: Optional[int] = None, + new_reminder_minutes: Optional[int] = None, + # Move operation parameters + target_calendar: Optional[str] = None, + ): + """Perform bulk operations (update/delete) on events matching filter criteria. + + This tool allows you to efficiently modify or delete multiple events at once + by applying filters to find matching events and then performing the specified operation. + + Args: + operation: Type of operation - "update" or "delete" + title_contains: Filter events where title contains this text + categories: Filter events containing any of these categories (comma-separated) + calendar_name: Filter events from this specific calendar + start_date: Filter events starting from this date (YYYY-MM-DD) + end_date: Filter events ending before this date (YYYY-MM-DD) + status: Filter events by status (CONFIRMED, TENTATIVE, CANCELLED) + location_contains: Filter events where location contains this text + + # For update operations: + new_title: New title for matching events + new_description: New description for matching events + new_location: New location for matching events + new_categories: New categories for matching events (comma-separated) + new_priority: New priority for matching events (1-9, 5=normal) + new_reminder_minutes: New reminder time in minutes before event + + # For move operations: + target_calendar: Calendar to move events to (requires operation="move") + + Returns: + Summary of operation results including counts and details + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + if operation not in ["update", "delete", "move"]: + raise ValueError("Operation must be 'update', 'delete', or 'move'") + + # Convert date strings to datetime objects + start_datetime = None + end_datetime = None + + if start_date: + try: + start_datetime = dt.datetime.strptime(start_date, "%Y-%m-%d") + except ValueError: + logger.warning(f"Invalid start_date format: {start_date}") + + if end_date: + try: + end_datetime = dt.datetime.strptime(end_date, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + logger.warning(f"Invalid end_date format: {end_date}") + + # Build filter criteria + filter_criteria = {} + if title_contains is not None: + filter_criteria["title_contains"] = title_contains + if categories is not None: + filter_criteria["categories"] = [ + cat.strip() for cat in categories.split(",") + ] + if status is not None: + filter_criteria["status"] = status + if location_contains is not None: + filter_criteria["location_contains"] = location_contains + # Add datetime strings for client compatibility + if start_date: + filter_criteria["start_date"] = start_date + if end_date: + filter_criteria["end_date"] = end_date + + if operation == "delete": + # Find matching events and delete them + if calendar_name: + events = await client.calendar.get_calendar_events( + calendar_name=calendar_name, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + if filter_criteria: + events = client.calendar._apply_event_filters( + events, filter_criteria + ) + else: + events = await client.calendar.search_events_across_calendars( + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filter_criteria, + ) + + deleted_count = 0 + failed_count = 0 + results = [] + + for event in events: + try: + await client.calendar.delete_event( + event.get("calendar_name", calendar_name), event["uid"] + ) + deleted_count += 1 + results.append( + { + "uid": event["uid"], + "status": "deleted", + "title": event.get("title", ""), + } + ) + except Exception as e: + failed_count += 1 + results.append( + { + "uid": event["uid"], + "status": "failed", + "error": str(e), + "title": event.get("title", ""), + } + ) + + return { + "operation": "delete", + "total_found": len(events), + "deleted_count": deleted_count, + "failed_count": failed_count, + "results": results, + } + + elif operation == "update": + # Build update data + update_data = {} + if new_title is not None: + update_data["title"] = new_title + if new_description is not None: + update_data["description"] = new_description + if new_location is not None: + update_data["location"] = new_location + if new_categories is not None: + update_data["categories"] = new_categories + if new_priority is not None: + update_data["priority"] = new_priority + if new_reminder_minutes is not None: + update_data["reminder_minutes"] = new_reminder_minutes + + if not update_data: + raise ValueError("No update data provided for update operation") + + return await client.calendar.bulk_update_events( + filter_criteria, update_data + ) + + elif operation == "move": + if not target_calendar: + raise ValueError("target_calendar is required for move operation") + + # Find matching events + if calendar_name: + events = await client.calendar.get_calendar_events( + calendar_name=calendar_name, + start_datetime=start_datetime, + end_datetime=end_datetime, + ) + if filter_criteria: + events = client.calendar._apply_event_filters( + events, filter_criteria + ) + else: + events = await client.calendar.search_events_across_calendars( + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filter_criteria, + ) + + moved_count = 0 + failed_count = 0 + results = [] + + for event in events: + try: + # Create event in target calendar + event_data = { + k: v + for k, v in event.items() + if k + not in [ + "uid", + "href", + "etag", + "calendar_name", + "calendar_display_name", + ] + } + + await client.calendar.create_event(target_calendar, event_data) + + # Delete from source calendar + await client.calendar.delete_event( + event.get("calendar_name", calendar_name), event["uid"] + ) + + moved_count += 1 + results.append( + { + "uid": event["uid"], + "status": "moved", + "title": event.get("title", ""), + "from_calendar": event.get("calendar_name", calendar_name), + "to_calendar": target_calendar, + } + ) + except Exception as e: + failed_count += 1 + results.append( + { + "uid": event["uid"], + "status": "failed", + "error": str(e), + "title": event.get("title", ""), + } + ) + + return { + "operation": "move", + "total_found": len(events), + "moved_count": moved_count, + "failed_count": failed_count, + "target_calendar": target_calendar, + "results": results, + } + + @mcp.tool() + async def nc_calendar_manage_calendar( + action: str, # "create", "delete", "update", "list" + ctx: Context, + calendar_name: str = "", + display_name: str = "", + description: str = "", + color: str = "#1976D2", # Default blue color + ): + """Manage calendar creation, deletion, and properties. + + This tool provides comprehensive calendar management functionality including + creating new calendars, deleting existing ones, and updating calendar properties. + + Args: + action: Action to perform - "create", "delete", "update", or "list" + calendar_name: Internal name for the calendar (required for create/delete/update) + display_name: Human-readable name for the calendar (used for create/update) + description: Description for the calendar (used for create/update) + color: Hex color code for the calendar (e.g., "#1976D2" for blue) + + Returns: + Result of the calendar management operation + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + if action == "list": + return await client.calendar.list_calendars() + + elif action == "create": + if not calendar_name: + raise ValueError("calendar_name is required for create action") + + return await client.calendar.create_calendar( + calendar_name=calendar_name, + display_name=display_name or calendar_name, + description=description, + color=color, + ) + + elif action == "delete": + if not calendar_name: + raise ValueError("calendar_name is required for delete action") + + return await client.calendar.delete_calendar(calendar_name) + + elif action == "update": + if not calendar_name: + raise ValueError("calendar_name is required for update action") + + # Note: Calendar property updates require additional CalDAV PROPPATCH implementation + # For now, return an informative message + return { + "status": "not_implemented", + "message": "Calendar property updates require PROPPATCH implementation", + "calendar_name": calendar_name, + "requested_changes": { + "display_name": display_name, + "description": description, + "color": color, + }, + } + + else: + raise ValueError("Action must be 'create', 'delete', 'update', or 'list'") diff --git a/nextcloud_mcp_server/server/notes.py b/nextcloud_mcp_server/server/notes.py new file mode 100644 index 0000000..38b3f29 --- /dev/null +++ b/nextcloud_mcp_server/server/notes.py @@ -0,0 +1,95 @@ +import logging + +from mcp.server.fastmcp import Context, FastMCP + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + + +def configure_notes_tools(mcp: FastMCP): + @mcp.resource("notes://settings") + async def notes_get_settings(): + """Get the Notes App settings""" + ctx: Context = ( + mcp.get_context() + ) # https://github.com/modelcontextprotocol/python-sdk/issues/244 + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.get_settings() + + @mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}") + async def nc_notes_get_attachment(note_id: int, attachment_filename: str): + """Get a specific attachment from a note""" + ctx: Context = mcp.get_context() + client: NextcloudClient = ctx.request_context.lifespan_context.client + # Assuming a method get_note_attachment exists in the client + # This method should return the raw content and determine the mime type + content, mime_type = await client.webdav.get_note_attachment( + note_id=note_id, filename=attachment_filename + ) + return { + "contents": [ + { + # Use uppercase 'Notes' to match the decorator + "uri": f"nc://Notes/{note_id}/attachments/{attachment_filename}", + "mimeType": mime_type, # Client needs to determine this + "data": content, # Return raw bytes/data + } + ] + } + + @mcp.tool() + async def nc_get_note(note_id: int, ctx: Context): + """Get user note using note id""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.get_note(note_id) + + @mcp.tool() + async def nc_notes_create_note( + title: str, content: str, category: str, ctx: Context + ): + """Create a new note""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.create_note( + title=title, + content=content, + category=category, + ) + + @mcp.tool() + async def nc_notes_update_note( + note_id: int, + etag: str, + title: str | None, + content: str | None, + category: str | None, + ctx: Context, + ): + logger.info("Updating note %s", note_id) + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.update( + note_id=note_id, + etag=etag, + title=title, + content=content, + category=category, + ) + + @mcp.tool() + async def nc_notes_append_content(note_id: int, content: str, ctx: Context): + """Append content to an existing note with a clear separator""" + logger.info("Appending content to note %s", note_id) + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.append_content(note_id=note_id, content=content) + + @mcp.tool() + async def nc_notes_search_notes(query: str, ctx: Context): + """Search notes by title or content, returning only id, title, and category.""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes_search_notes(query=query) + + @mcp.tool() + async def nc_notes_delete_note(note_id: int, ctx: Context): + logger.info("Deleting note %s", note_id) + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.notes.delete_note(note_id) diff --git a/nextcloud_mcp_server/server/tables.py b/nextcloud_mcp_server/server/tables.py new file mode 100644 index 0000000..f9f7699 --- /dev/null +++ b/nextcloud_mcp_server/server/tables.py @@ -0,0 +1,57 @@ +import logging + +from mcp.server.fastmcp import Context, FastMCP + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + + +def configure_tables_tools(mcp: FastMCP): + # Tables tools + @mcp.tool() + async def nc_tables_list_tables(ctx: Context): + """List all tables available to the user""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.list_tables() + + @mcp.tool() + async def nc_tables_get_schema(table_id: int, ctx: Context): + """Get the schema/structure of a specific table including columns and views""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.get_table_schema(table_id) + + @mcp.tool() + async def nc_tables_read_table( + table_id: int, + ctx: Context, + limit: int | None = None, + offset: int | None = None, + ): + """Read rows from a table with optional pagination""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.get_table_rows(table_id, limit, offset) + + @mcp.tool() + async def nc_tables_insert_row(table_id: int, data: dict, ctx: Context): + """Insert a new row into a table. + + Data should be a dictionary mapping column IDs to values, e.g. {1: "text", 2: 42} + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.create_row(table_id, data) + + @mcp.tool() + async def nc_tables_update_row(row_id: int, data: dict, ctx: Context): + """Update an existing row in a table. + + Data should be a dictionary mapping column IDs to new values, e.g. {1: "new text", 2: 99} + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.update_row(row_id, data) + + @mcp.tool() + async def nc_tables_delete_row(row_id: int, ctx: Context): + """Delete a row from a table""" + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.tables.delete_row(row_id) diff --git a/nextcloud_mcp_server/server/webdav.py b/nextcloud_mcp_server/server/webdav.py new file mode 100644 index 0000000..ae4040a --- /dev/null +++ b/nextcloud_mcp_server/server/webdav.py @@ -0,0 +1,151 @@ +import logging + +from mcp.server.fastmcp import Context, FastMCP + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + + +def configure_webdav_tools(mcp: FastMCP): + # WebDAV file system tools + @mcp.tool() + async def nc_webdav_list_directory(ctx: Context, path: str = ""): + """List files and directories in the specified NextCloud path. + + Args: + path: Directory path to list (empty string for root directory) + + Returns: + List of items with metadata including name, path, is_directory, size, content_type, last_modified + + Examples: + # List root directory + await nc_webdav_list_directory("") + + # List a specific folder + await nc_webdav_list_directory("Documents/Projects") + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.webdav.list_directory(path) + + @mcp.tool() + async def nc_webdav_read_file(path: str, ctx: Context): + """Read the content of a file from NextCloud. + + Args: + path: Full path to the file to read + + Returns: + Dict with path, content, content_type, size, and encoding (if binary) + Text files are decoded to UTF-8, binary files are base64 encoded + + Examples: + # Read a text file + result = await nc_webdav_read_file("Documents/readme.txt") + print(result['content']) # Decoded text content + + # Read a binary file + result = await nc_webdav_read_file("Images/photo.jpg") + print(result['encoding']) # 'base64' + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + content, content_type = await client.webdav.read_file(path) + + # For text files, decode content for easier viewing + if content_type and content_type.startswith("text/"): + try: + decoded_content = content.decode("utf-8") + return { + "path": path, + "content": decoded_content, + "content_type": content_type, + "size": len(content), + } + except UnicodeDecodeError: + pass + + # For binary files, return metadata and base64 encoded content + import base64 + + return { + "path": path, + "content": base64.b64encode(content).decode("ascii"), + "content_type": content_type, + "size": len(content), + "encoding": "base64", + } + + @mcp.tool() + async def nc_webdav_write_file( + path: str, content: str, ctx: Context, content_type: str | None = None + ): + """Write content to a file in NextCloud. + + Args: + path: Full path where to write the file + content: File content (text or base64 for binary) + content_type: MIME type (auto-detected if not provided, use 'type;base64' for binary) + + Returns: + Dict with status_code indicating success + + Examples: + # Write a text file + await nc_webdav_write_file("Documents/notes.md", "# My Notes\nContent here...") + + # Write binary data (base64 encoded) + await nc_webdav_write_file("files/data.bin", base64_content, "application/octet-stream;base64") + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + + # Handle base64 encoded content + if content_type and "base64" in content_type.lower(): + import base64 + + content_bytes = base64.b64decode(content) + content_type = content_type.replace(";base64", "") + else: + content_bytes = content.encode("utf-8") + + return await client.webdav.write_file(path, content_bytes, content_type) + + @mcp.tool() + async def nc_webdav_create_directory(path: str, ctx: Context): + """Create a directory in NextCloud. + + Args: + path: Full path of the directory to create + + Returns: + Dict with status_code (201 for created, 405 if already exists) + + Examples: + # Create a single directory + await nc_webdav_create_directory("NewProject") + + # Create nested directories (parent must exist) + await nc_webdav_create_directory("Projects/MyApp/docs") + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.webdav.create_directory(path) + + @mcp.tool() + async def nc_webdav_delete_resource(path: str, ctx: Context): + """Delete a file or directory in NextCloud. + + Args: + path: Full path of the file or directory to delete + + Returns: + Dict with status_code indicating result (404 if not found) + + Examples: + # Delete a file + await nc_webdav_delete_resource("old_document.txt") + + # Delete a directory (will delete all contents) + await nc_webdav_delete_resource("temp_folder") + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.webdav.delete_resource(path) diff --git a/pyproject.toml b/pyproject.toml index 27e9064..a834e87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,16 +14,13 @@ dependencies = [ "icalendar (>=6.0.0,<7.0.0)" ] -[project.scripts] -nc-mcp-server = "nextcloud_mcp_server.server:run" - [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_test_loop_scope = "session" asyncio_default_fixture_loop_scope = "session" log_cli = 1 -log_cli_level = "WARN" -log_level = "WARN" +log_cli_level = "INFO" +log_level = "INFO" markers = [ "integration: marks tests as slow (deselect with '-m \"not slow\"')" ] diff --git a/tests/conftest.py b/tests/conftest.py index 3664720..ff28f07 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,18 +1,20 @@ -import pytest -import os import logging +import os import uuid -from nextcloud_mcp_server.client import NextcloudClient +from typing import Any, AsyncGenerator + +import pytest from httpx import HTTPStatusError -import asyncio +from mcp import ClientSession +from mcp.client.sse import sse_client + +from nextcloud_mcp_server.client import NextcloudClient logger = logging.getLogger(__name__) -# pytestmark = pytest.mark.asyncio(loop_scope="package") - @pytest.fixture(scope="session") -async def nc_client() -> NextcloudClient: +async def nc_client() -> AsyncGenerator[NextcloudClient, Any]: """ Fixture to create a NextcloudClient instance for integration tests. Uses environment variables for configuration. @@ -29,10 +31,54 @@ async def nc_client() -> NextcloudClient: logger.info( "NextcloudClient session fixture initialized and capabilities checked." ) + yield client except Exception as e: logger.error(f"Failed to initialize NextcloudClient session fixture: {e}") pytest.fail(f"Failed to connect to Nextcloud or get capabilities: {e}") - return client + finally: + await client.close() + + +@pytest.fixture +async def nc_mcp_client() -> AsyncGenerator[ClientSession, Any]: + """ + Fixture to create an MCP client session for integration tests. + """ + logger.info("Creating SSE client") + sse_context = sse_client(url="http://127.0.0.1:8000/sse") + session_context = None + + try: + read, write = await sse_context.__aenter__() + session_context = ClientSession(read, write) + session = await session_context.__aenter__() + await session.initialize() + logger.info("MCP client session initialized successfully") + + yield session + + finally: + # Clean up in reverse order, ignoring task scope issues + if session_context is not None: + try: + await session_context.__aexit__(None, None, None) + except RuntimeError as e: + if "cancel scope" in str(e): + logger.debug(f"Ignoring cancel scope teardown issue: {e}") + else: + logger.warning(f"Error closing session: {e}") + except Exception as e: + logger.warning(f"Error closing session: {e}") + + try: + await sse_context.__aexit__(None, None, None) + except RuntimeError as e: + if "cancel scope" in str(e): + logger.debug(f"Ignoring cancel scope teardown issue: {e}") + else: + logger.warning(f"Error closing SSE client: {e}") + except Exception as e: + logger.warning(f"Error closing SSE client: {e}") @pytest.fixture @@ -41,7 +87,6 @@ async def temporary_note(nc_client: NextcloudClient): Fixture to create a temporary note for a test and ensure its deletion afterward. Yields the created note dictionary. """ - asyncio.new_event_loop() note_id = None unique_suffix = uuid.uuid4().hex[:8] @@ -87,7 +132,6 @@ async def temporary_note_with_attachment( Yields a tuple: (note_data, attachment_filename, attachment_content). Depends on the temporary_note fixture. """ - asyncio.new_event_loop() note_data = temporary_note note_id = note_data["id"] diff --git a/tests/integration/test_attachments.py b/tests/integration/test_attachments.py index a489583..b5ffa36 100644 --- a/tests/integration/test_attachments.py +++ b/tests/integration/test_attachments.py @@ -1,7 +1,8 @@ -import pytest import logging import time import uuid + +import pytest from httpx import HTTPStatusError from nextcloud_mcp_server.client import NextcloudClient diff --git a/tests/integration/test_calendar_operations.py b/tests/integration/test_calendar_operations.py index 2ac0239..1de3d27 100644 --- a/tests/integration/test_calendar_operations.py +++ b/tests/integration/test_calendar_operations.py @@ -1,9 +1,10 @@ """Integration tests for Calendar CalDAV operations.""" -import pytest import logging import uuid from datetime import datetime, timedelta + +import pytest from httpx import HTTPStatusError from nextcloud_mcp_server.client import NextcloudClient @@ -14,6 +15,16 @@ logger = logging.getLogger(__name__) pytestmark = pytest.mark.integration +@pytest.fixture +async def calendar_test_client(): + """Create a new, isolated NextcloudClient for calendar tests.""" + client = NextcloudClient.from_env() + try: + yield client + finally: + await client.close() + + @pytest.fixture def test_calendar_name(): """Unique calendar name for testing.""" @@ -21,31 +32,46 @@ def test_calendar_name(): @pytest.fixture -async def temporary_calendar(nc_client: NextcloudClient, test_calendar_name: str): +async def temporary_calendar( + calendar_test_client: NextcloudClient, test_calendar_name: str +): """Create a temporary calendar for testing and clean up afterward.""" calendar_name = test_calendar_name try: - # Create a test calendar if possible - # Note: Calendar creation might require admin permissions - # For now, we'll use an existing calendar or create events in default calendar + # Create a test calendar + logger.info(f"Creating temporary calendar: {calendar_name}") + result = await calendar_test_client.calendar.create_calendar( + calendar_name=calendar_name, + display_name=f"Test Calendar {calendar_name}", + description="Temporary calendar for integration testing", + color="#FF5722", + ) - # Try to find an existing calendar to use - calendars = await nc_client.calendar.list_calendars() - if calendars: - calendar_name = calendars[0]["name"] - logger.info(f"Using existing calendar: {calendar_name}") - yield calendar_name - else: - pytest.skip("No calendars available for testing") + if result["status_code"] not in [200, 201]: + pytest.skip(f"Failed to create temporary calendar: {result}") + + logger.info(f"Created temporary calendar: {calendar_name}") + yield calendar_name except Exception as e: logger.error(f"Error setting up temporary calendar: {e}") pytest.skip(f"Calendar setup failed: {e}") + finally: + # Cleanup: Delete the temporary calendar + try: + logger.info(f"Cleaning up temporary calendar: {calendar_name}") + await calendar_test_client.calendar.delete_calendar(calendar_name) + logger.info(f"Successfully deleted temporary calendar: {calendar_name}") + except Exception as e: + logger.error(f"Error deleting temporary calendar {calendar_name}: {e}") + @pytest.fixture -async def temporary_event(nc_client: NextcloudClient, temporary_calendar: str): +async def temporary_event( + calendar_test_client: NextcloudClient, temporary_calendar: str +): """Create a temporary event for testing and clean up afterward.""" event_uid = None calendar_name = temporary_calendar @@ -65,7 +91,9 @@ async def temporary_event(nc_client: NextcloudClient, temporary_calendar: str): try: logger.info(f"Creating temporary event in calendar: {calendar_name}") - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) event_uid = result.get("uid") if not event_uid: @@ -79,7 +107,9 @@ async def temporary_event(nc_client: NextcloudClient, temporary_calendar: str): if event_uid: try: logger.info(f"Cleaning up temporary event: {event_uid}") - await nc_client.calendar.delete_event(calendar_name, event_uid) + await calendar_test_client.calendar.delete_event( + calendar_name, event_uid + ) logger.info(f"Successfully deleted temporary event: {event_uid}") except HTTPStatusError as e: if e.response.status_code != 404: @@ -90,9 +120,9 @@ async def temporary_event(nc_client: NextcloudClient, temporary_calendar: str): ) -async def test_list_calendars(nc_client: NextcloudClient): +async def test_list_calendars(calendar_test_client: NextcloudClient): """Test listing available calendars.""" - calendars = await nc_client.calendar.list_calendars() + calendars = await calendar_test_client.calendar.list_calendars() assert isinstance(calendars, list) @@ -114,7 +144,7 @@ async def test_list_calendars(nc_client: NextcloudClient): async def test_create_and_delete_event( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test creating and deleting a basic event.""" calendar_name = temporary_calendar @@ -133,7 +163,9 @@ async def test_create_and_delete_event( } try: - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) assert "uid" in result assert result["status_code"] in [200, 201, 204] @@ -141,7 +173,7 @@ async def test_create_and_delete_event( logger.info(f"Created event with UID: {event_uid}") # Verify event was created by retrieving it - retrieved_event, etag = await nc_client.calendar.get_event( + retrieved_event, etag = await calendar_test_client.calendar.get_event( calendar_name, event_uid ) assert retrieved_event["uid"] == event_uid @@ -149,7 +181,9 @@ async def test_create_and_delete_event( assert retrieved_event["location"] == "Test Room" # Delete event - delete_result = await nc_client.calendar.delete_event(calendar_name, event_uid) + delete_result = await calendar_test_client.calendar.delete_event( + calendar_name, event_uid + ) assert delete_result["status_code"] in [200, 204, 404] logger.info(f"Successfully deleted event: {event_uid}") @@ -160,7 +194,7 @@ async def test_create_and_delete_event( async def test_create_all_day_event( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test creating an all-day event.""" calendar_name = temporary_calendar @@ -175,19 +209,21 @@ async def test_create_all_day_event( } try: - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) event_uid = result["uid"] logger.info(f"Created all-day event with UID: {event_uid}") # Verify event - retrieved_event, _ = await nc_client.calendar.get_event( + retrieved_event, _ = await calendar_test_client.calendar.get_event( calendar_name, event_uid ) assert retrieved_event["title"] == "All Day Test Event" assert retrieved_event.get("all_day") is True # Cleanup - await nc_client.calendar.delete_event(calendar_name, event_uid) + await calendar_test_client.calendar.delete_event(calendar_name, event_uid) except Exception as e: logger.error(f"All-day event test failed: {e}") @@ -195,7 +231,7 @@ async def test_create_all_day_event( async def test_create_recurring_event( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test creating a recurring event.""" calendar_name = temporary_calendar @@ -212,35 +248,42 @@ async def test_create_recurring_event( } try: - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) event_uid = result["uid"] logger.info(f"Created recurring event with UID: {event_uid}") # Verify event - retrieved_event, _ = await nc_client.calendar.get_event( + retrieved_event, _ = await calendar_test_client.calendar.get_event( calendar_name, event_uid ) assert retrieved_event["title"] == "Weekly Recurring Test" assert retrieved_event.get("recurring") is True # Cleanup - await nc_client.calendar.delete_event(calendar_name, event_uid) + await calendar_test_client.calendar.delete_event(calendar_name, event_uid) except Exception as e: logger.error(f"Recurring event test failed: {e}") raise -async def test_list_events_in_range(nc_client: NextcloudClient, temporary_event: dict): +async def test_list_events_in_range( + calendar_test_client: NextcloudClient, temporary_event: dict +): """Test listing events within a date range.""" calendar_name = temporary_event["calendar_name"] # Get events for the next week - start_date = datetime.now().strftime("%Y%m%dT000000Z") - end_date = (datetime.now() + timedelta(days=7)).strftime("%Y%m%dT235959Z") + start_datetime = datetime.now() + end_datetime = datetime.now() + timedelta(days=7) - events = await nc_client.calendar.get_calendar_events( - calendar_name=calendar_name, start_date=start_date, end_date=end_date, limit=50 + events = await calendar_test_client.calendar.get_calendar_events( + calendar_name=calendar_name, + start_datetime=start_datetime, + end_datetime=end_datetime, + limit=50, ) assert isinstance(events, list) @@ -257,7 +300,9 @@ async def test_list_events_in_range(nc_client: NextcloudClient, temporary_event: assert "start_datetime" in event -async def test_update_event(nc_client: NextcloudClient, temporary_event: dict): +async def test_update_event( + calendar_test_client: NextcloudClient, temporary_event: dict +): """Test updating an existing event.""" calendar_name = temporary_event["calendar_name"] event_uid = temporary_event["uid"] @@ -271,13 +316,15 @@ async def test_update_event(nc_client: NextcloudClient, temporary_event: dict): } try: - result = await nc_client.calendar.update_event( + result = await calendar_test_client.calendar.update_event( calendar_name, event_uid, updated_data ) assert result["uid"] == event_uid # Verify updates - updated_event, _ = await nc_client.calendar.get_event(calendar_name, event_uid) + updated_event, _ = await calendar_test_client.calendar.get_event( + calendar_name, event_uid + ) assert updated_event["title"] == "Updated Test Event Title" assert updated_event["description"] == "Updated description for test event" assert updated_event["location"] == "Updated Location" @@ -291,7 +338,7 @@ async def test_update_event(nc_client: NextcloudClient, temporary_event: dict): async def test_create_event_with_attendees( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test creating an event with attendees.""" calendar_name = temporary_calendar @@ -309,12 +356,14 @@ async def test_create_event_with_attendees( } try: - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) event_uid = result["uid"] logger.info(f"Created event with attendees, UID: {event_uid}") # Verify event - retrieved_event, _ = await nc_client.calendar.get_event( + retrieved_event, _ = await calendar_test_client.calendar.get_event( calendar_name, event_uid ) assert retrieved_event["title"] == "Meeting with Attendees" @@ -322,7 +371,7 @@ async def test_create_event_with_attendees( assert retrieved_event["status"] == "TENTATIVE" # Cleanup - await nc_client.calendar.delete_event(calendar_name, event_uid) + await calendar_test_client.calendar.delete_event(calendar_name, event_uid) except Exception as e: logger.error(f"Event with attendees test failed: {e}") @@ -330,33 +379,33 @@ async def test_create_event_with_attendees( async def test_get_nonexistent_event( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test retrieving a non-existent event.""" calendar_name = temporary_calendar fake_uid = f"nonexistent-{uuid.uuid4()}" with pytest.raises(HTTPStatusError) as exc_info: - await nc_client.calendar.get_event(calendar_name, fake_uid) + await calendar_test_client.calendar.get_event(calendar_name, fake_uid) assert exc_info.value.response.status_code == 404 logger.info(f"Correctly got 404 for nonexistent event: {fake_uid}") async def test_delete_nonexistent_event( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test deleting a non-existent event.""" calendar_name = temporary_calendar fake_uid = f"nonexistent-{uuid.uuid4()}" - result = await nc_client.calendar.delete_event(calendar_name, fake_uid) + result = await calendar_test_client.calendar.delete_event(calendar_name, fake_uid) assert result["status_code"] == 404 logger.info(f"Correctly got 404 for deleting nonexistent event: {fake_uid}") async def test_event_with_url_and_categories( - nc_client: NextcloudClient, temporary_calendar: str + calendar_test_client: NextcloudClient, temporary_calendar: str ): """Test creating an event with URL and multiple categories.""" calendar_name = temporary_calendar @@ -374,12 +423,14 @@ async def test_event_with_url_and_categories( } try: - result = await nc_client.calendar.create_event(calendar_name, event_data) + result = await calendar_test_client.calendar.create_event( + calendar_name, event_data + ) event_uid = result["uid"] logger.info(f"Created event with metadata, UID: {event_uid}") # Verify event - retrieved_event, _ = await nc_client.calendar.get_event( + retrieved_event, _ = await calendar_test_client.calendar.get_event( calendar_name, event_uid ) assert retrieved_event["title"] == "Event with URL and Categories" @@ -390,20 +441,22 @@ async def test_event_with_url_and_categories( assert retrieved_event.get("priority") == 2 # Cleanup - await nc_client.calendar.delete_event(calendar_name, event_uid) + await calendar_test_client.calendar.delete_event(calendar_name, event_uid) except Exception as e: logger.error(f"Event with metadata test failed: {e}") raise -async def test_calendar_operations_error_handling(nc_client: NextcloudClient): +async def test_calendar_operations_error_handling( + calendar_test_client: NextcloudClient, +): """Test error handling for calendar operations.""" # Test with non-existent calendar fake_calendar = f"nonexistent_calendar_{uuid.uuid4().hex}" with pytest.raises(HTTPStatusError): - await nc_client.calendar.get_calendar_events(fake_calendar) + await calendar_test_client.calendar.get_calendar_events(fake_calendar) logger.info("Error handling tests completed successfully") diff --git a/tests/integration/test_embedded_images.py b/tests/integration/test_embedded_images.py index 6ab1fa0..506675f 100644 --- a/tests/integration/test_embedded_images.py +++ b/tests/integration/test_embedded_images.py @@ -1,10 +1,11 @@ -import pytest +import logging import time import uuid -import logging -from PIL import Image, ImageDraw from io import BytesIO + +import pytest from httpx import HTTPStatusError # Import if needed for specific error checks +from PIL import Image, ImageDraw from nextcloud_mcp_server.client import NextcloudClient diff --git a/tests/integration/test_mcp.py b/tests/integration/test_mcp.py new file mode 100644 index 0000000..9eba0d4 --- /dev/null +++ b/tests/integration/test_mcp.py @@ -0,0 +1,676 @@ +import json +import logging +import uuid + +import pytest +from mcp import ClientSession + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) +pytestmark = pytest.mark.integration + + +async def test_mcp_connectivity(nc_mcp_client: ClientSession): + """Test basic MCP server connectivity and list available tools/resources.""" + + # List available tools + tools = await nc_mcp_client.list_tools() + logger.info("Available MCP tools:") + tool_names = [] + for tool in tools.tools: + logger.info(f" - {tool.name}: {tool.description}") + tool_names.append(tool.name) + + # Verify expected tools are present + expected_tools = [ + "nc_get_note", + "nc_notes_create_note", + "nc_notes_update_note", + "nc_notes_append_content", + "nc_notes_search_notes", + "nc_notes_delete_note", + "nc_tables_list_tables", + "nc_tables_get_schema", + "nc_tables_read_table", + "nc_tables_insert_row", + "nc_tables_update_row", + "nc_tables_delete_row", + "nc_webdav_list_directory", + "nc_webdav_read_file", + "nc_webdav_write_file", + "nc_webdav_create_directory", + "nc_webdav_delete_resource", + "nc_calendar_list_calendars", + "nc_calendar_create_event", + "nc_calendar_list_events", + "nc_calendar_get_event", + "nc_calendar_update_event", + "nc_calendar_delete_event", + "nc_calendar_create_meeting", + "nc_calendar_get_upcoming_events", + "nc_calendar_find_availability", + "nc_calendar_bulk_operations", + "nc_calendar_manage_calendar", + ] + + for expected_tool in expected_tools: + assert expected_tool in tool_names, ( + f"Expected tool '{expected_tool}' not found in available tools" + ) + + # List available resource templates + templates = await nc_mcp_client.list_resource_templates() + logger.info("\nAvailable resource templates:") + template_uris = [] + for template in templates.resourceTemplates: + logger.info(f" - {template.uriTemplate}") + template_uris.append(template.uriTemplate) + + # Verify expected resource templates + expected_templates = ["nc://Notes/{note_id}/attachments/{attachment_filename}"] + + for expected_template in expected_templates: + assert expected_template in template_uris, ( + f"Expected template '{expected_template}' not found" + ) + + # List available resources + resources = await nc_mcp_client.list_resources() + logger.info("\nAvailable resources:") + resource_uris = [] + for resource in resources.resources: + logger.info(f" - {resource.uri}: {resource.name}") + resource_uris.append(str(resource.uri)) # Convert to string for comparison + + # Verify expected resources + expected_resources = ["nc://capabilities", "notes://settings"] + + for expected_resource in expected_resources: + assert expected_resource in resource_uris, ( + f"Expected resource '{expected_resource}' not found" + ) + + # List available prompts + prompts = await nc_mcp_client.list_prompts() + logger.info("\nAvailable prompts:") + for prompt in prompts.prompts: + logger.info(f" - {prompt.name}") + + +async def test_mcp_notes_crud_workflow( + nc_mcp_client: ClientSession, nc_client: NextcloudClient +): + """Test complete Notes CRUD workflow via MCP tools with verification via NextcloudClient.""" + + unique_suffix = uuid.uuid4().hex[:8] + test_title = f"MCP Test Note {unique_suffix}" + test_content = f"This is test content for note {unique_suffix}" + test_category = "MCPTesting" + + created_note = None + + try: + # 1. Create note via MCP + logger.info(f"Creating note via MCP: {test_title}") + create_result = await nc_mcp_client.call_tool( + "nc_notes_create_note", + {"title": test_title, "content": test_content, "category": test_category}, + ) + + assert create_result.isError is False, ( + f"MCP note creation failed: {create_result.content}" + ) + created_note = create_result.content[0].text + note_data = json.loads(created_note) # Parse the returned JSON + note_id = note_data["id"] + + logger.info(f"Note created via MCP with ID: {note_id}") + + # 2. Verify creation via direct NextcloudClient + direct_note = await nc_client.notes.get_note(note_id) + assert direct_note["title"] == test_title, ( + f"Title mismatch: {direct_note['title']} != {test_title}" + ) + assert direct_note["content"] == test_content, "Content mismatch" + assert direct_note["category"] == test_category, "Category mismatch" + + # 3. Read note via MCP + logger.info(f"Reading note via MCP: {note_id}") + read_result = await nc_mcp_client.call_tool("nc_get_note", {"note_id": note_id}) + assert read_result.isError is False, ( + f"MCP note read failed: {read_result.content}" + ) + read_note_data = json.loads(read_result.content[0].text) + + assert read_note_data["title"] == test_title + assert read_note_data["content"] == test_content + assert read_note_data["category"] == test_category + + # 4. Update note via MCP + updated_title = f"Updated {test_title}" + updated_content = f"Updated content: {test_content}" + etag = read_note_data["etag"] + + logger.info(f"Updating note via MCP: {note_id}") + update_result = await nc_mcp_client.call_tool( + "nc_notes_update_note", + { + "note_id": note_id, + "etag": etag, + "title": updated_title, + "content": updated_content, + "category": test_category, + }, + ) + + assert update_result.isError is False, ( + f"MCP note update failed: {update_result.content}" + ) + + # 5. Verify update via direct NextcloudClient + updated_direct_note = await nc_client.notes.get_note(note_id) + assert updated_direct_note["title"] == updated_title + assert updated_direct_note["content"] == updated_content + + # 6. Append content via MCP + append_content = "\n\nThis is appended content via MCP." + logger.info(f"Appending content to note via MCP: {note_id}") + append_result = await nc_mcp_client.call_tool( + "nc_notes_append_content", {"note_id": note_id, "content": append_content} + ) + + assert append_result.isError is False, ( + f"MCP note append failed: {append_result.content}" + ) + + # 7. Verify append via direct NextcloudClient + appended_direct_note = await nc_client.notes.get_note(note_id) + assert append_content in appended_direct_note["content"] + + # 8. Search for note via MCP + logger.info(f"Searching for note via MCP with query: {unique_suffix}") + search_result = await nc_mcp_client.call_tool( + "nc_notes_search_notes", {"query": unique_suffix} + ) + + assert search_result.isError is False, ( + f"MCP note search failed: {search_result.content}" + ) + search_notes_text = search_result.content[0].text + logger.info(f"Search result text: {search_notes_text}") + search_notes = json.loads(search_notes_text) + + # Ensure search_notes is a list + if not isinstance(search_notes, list): + logger.warning( + f"Expected search results to be a list, got: {type(search_notes)}" + ) + search_notes = [search_notes] if search_notes else [] + + # Find our note in search results + found_note = None + for note in search_notes: + if isinstance(note, dict) and note.get("id") == note_id: + found_note = note + break + + assert found_note is not None, ( + f"Created note not found in search results. Search returned: {search_notes}" + ) + assert found_note["title"] == updated_title + + # 9. Delete note via MCP + logger.info(f"Deleting note via MCP: {note_id}") + delete_result = await nc_mcp_client.call_tool( + "nc_notes_delete_note", {"note_id": note_id} + ) + + assert delete_result.isError is False, ( + f"MCP note deletion failed: {delete_result.content}" + ) + + # 10. Verify deletion via direct NextcloudClient + try: + await nc_client.notes.get_note(note_id) + pytest.fail("Note should have been deleted but was still found") + except Exception: + # Expected - note should be deleted + logger.info(f"Successfully verified note {note_id} was deleted") + created_note = None # Mark as cleaned up + + finally: + # Cleanup in case of test failure + if created_note is not None: + try: + note_data = json.loads(created_note) + await nc_client.notes.delete_note(note_data["id"]) + logger.info(f"Cleaned up note {note_data['id']} after test failure") + except Exception as e: + logger.warning(f"Failed to cleanup note: {e}") + + +async def test_mcp_webdav_workflow( + nc_mcp_client: ClientSession, nc_client: NextcloudClient +): + """Test WebDAV file operations via MCP tools with verification via NextcloudClient.""" + + unique_suffix = uuid.uuid4().hex[:8] + test_dir = f"mcp_test_dir_{unique_suffix}" + test_file = f"mcp_test_file_{unique_suffix}.txt" + test_file_path = f"{test_dir}/{test_file}" + test_content = f"This is test content for MCP WebDAV testing {unique_suffix}" + + try: + # 1. Create directory via MCP + logger.info(f"Creating directory via MCP: {test_dir}") + create_dir_result = await nc_mcp_client.call_tool( + "nc_webdav_create_directory", {"path": test_dir} + ) + + assert create_dir_result.isError is False, ( + f"MCP directory creation failed: {create_dir_result.content}" + ) + + # 2. Verify directory creation via direct WebDAV + dir_listing = await nc_client.webdav.list_directory("") + dir_names = [item["name"] for item in dir_listing if item["is_directory"]] + assert test_dir in dir_names, f"Directory {test_dir} not found in root listing" + + # 3. Write file via MCP + logger.info(f"Writing file via MCP: {test_file_path}") + write_result = await nc_mcp_client.call_tool( + "nc_webdav_write_file", + { + "path": test_file_path, + "content": test_content, + "content_type": "text/plain", + }, + ) + + assert write_result.isError is False, ( + f"MCP file write failed: {write_result.content}" + ) + + # 4. Verify file creation via direct WebDAV + file_listing = await nc_client.webdav.list_directory(test_dir) + file_names = [item["name"] for item in file_listing if not item["is_directory"]] + assert test_file in file_names, ( + f"File {test_file} not found in directory listing" + ) + + # 5. Read file via MCP + logger.info(f"Reading file via MCP: {test_file_path}") + read_result = await nc_mcp_client.call_tool( + "nc_webdav_read_file", {"path": test_file_path} + ) + + assert read_result.isError is False, ( + f"MCP file read failed: {read_result.content}" + ) + read_data = json.loads(read_result.content[0].text) + + assert read_data["content"] == test_content, "File content mismatch" + assert read_data["path"] == test_file_path + assert "text/plain" in read_data["content_type"] + + # 6. Verify file content via direct WebDAV + direct_content, direct_content_type = await nc_client.webdav.read_file( + test_file_path + ) + assert direct_content.decode("utf-8") == test_content + + # 7. List directory via MCP + logger.info(f"Listing directory via MCP: {test_dir}") + list_result = await nc_mcp_client.call_tool( + "nc_webdav_list_directory", {"path": test_dir} + ) + + assert list_result.isError is False, ( + f"MCP directory listing failed: {list_result.content}" + ) + listing_text = list_result.content[0].text + logger.info(f"Directory listing response: {listing_text}") + listing_data = json.loads(listing_text) + + # Ensure listing_data is a list + if not isinstance(listing_data, list): + logger.warning( + f"Expected directory listing to be a list, got: {type(listing_data)}" + ) + listing_data = [listing_data] if listing_data else [] + + # Find our file in the listing + found_file = None + for item in listing_data: + if isinstance(item, dict) and item.get("name") == test_file: + found_file = item + break + + assert found_file is not None, ( + f"File {test_file} not found in MCP directory listing" + ) + assert found_file["is_directory"] is False + assert found_file["size"] == len(test_content.encode("utf-8")) + + finally: + # Cleanup + try: + logger.info(f"Cleaning up test file: {test_file_path}") + await nc_mcp_client.call_tool( + "nc_webdav_delete_resource", {"path": test_file_path} + ) + + logger.info(f"Cleaning up test directory: {test_dir}") + await nc_mcp_client.call_tool( + "nc_webdav_delete_resource", {"path": test_dir} + ) + except Exception as e: + logger.warning(f"Failed to cleanup WebDAV resources: {e}") + + +async def test_mcp_resources_access( + nc_mcp_client: ClientSession, nc_client: NextcloudClient +): + """Test accessing MCP resources and compare with direct API calls.""" + + # 1. Test capabilities resource + logger.info("Testing capabilities resource via MCP") + caps_result = await nc_mcp_client.read_resource("nc://capabilities") + assert len(caps_result.contents) == 1 + mcp_capabilities = json.loads(caps_result.contents[0].text) + + # Compare with direct API call + direct_capabilities = await nc_client.capabilities() + + # Basic validation - both should have similar structure + # Both return full OCS response structure + assert "ocs" in mcp_capabilities + assert "data" in mcp_capabilities["ocs"] + assert "version" in mcp_capabilities["ocs"]["data"] + assert "ocs" in direct_capabilities + assert "data" in direct_capabilities["ocs"] + assert "version" in direct_capabilities["ocs"]["data"] + + # 2. Test notes settings resource + logger.info("Testing notes settings resource via MCP") + settings_result = await nc_mcp_client.read_resource("notes://settings") + assert len(settings_result.contents) == 1 + mcp_settings = json.loads(settings_result.contents[0].text) + + # Compare with direct API call + direct_settings = await nc_client.notes.get_settings() + + # Both should have settings data + assert isinstance(mcp_settings, dict) + assert isinstance(direct_settings, dict) + + logger.info("Successfully verified MCP resources match direct API calls") + + +async def test_mcp_calendar_workflow( + nc_mcp_client: ClientSession, nc_client: NextcloudClient +): + """Test complete Calendar workflow via MCP tools with verification via NextcloudClient.""" + + unique_suffix = uuid.uuid4().hex[:8] + test_event_title = f"MCP Test Event {unique_suffix}" + test_location = f"MCP Test Location {unique_suffix}" + + created_event = None + calendar_name = None + + try: + # 1. List calendars via MCP + logger.info("Listing calendars via MCP") + calendars_result = await nc_mcp_client.call_tool( + "nc_calendar_list_calendars", {} + ) + + assert calendars_result.isError is False, ( + f"MCP calendar listing failed: {calendars_result.content}" + ) + + calendars_data = json.loads(calendars_result.content[0].text) + + # Debug output to understand the structure + logger.info(f"calendars_data type: {type(calendars_data)}") + logger.info(f"calendars_data content: {calendars_data}") + + # Handle the case where MCP tool returns a single dict instead of a list + if isinstance(calendars_data, dict): + # Single calendar returned as dict instead of list + calendar_name = calendars_data["name"] + elif isinstance(calendars_data, list) and calendars_data: + # Normal case - list of calendars + calendar_name = calendars_data[0]["name"] + else: + pytest.skip("No calendars available for testing") + logger.info(f"Using calendar: {calendar_name}") + + # 2. Create event via MCP + from datetime import datetime, timedelta + + tomorrow = datetime.now() + timedelta(days=1) + start_datetime = tomorrow.strftime("%Y-%m-%dT14:00:00") + end_datetime = tomorrow.strftime("%Y-%m-%dT15:00:00") + + event_data = { + "calendar_name": calendar_name, + "title": test_event_title, + "start_datetime": start_datetime, + "end_datetime": end_datetime, + "description": f"Test event created via MCP {unique_suffix}", + "location": test_location, + "categories": "testing,mcp", + "status": "CONFIRMED", + "priority": 5, + } + + logger.info(f"Creating event via MCP: {test_event_title}") + create_result = await nc_mcp_client.call_tool( + "nc_calendar_create_event", event_data + ) + + assert create_result.isError is False, ( + f"MCP event creation failed: {create_result.content}" + ) + + created_event_data = json.loads(create_result.content[0].text) + event_uid = created_event_data["uid"] + created_event = {"uid": event_uid, "calendar_name": calendar_name} + + logger.info(f"Event created via MCP with UID: {event_uid}") + + # 3. Verify creation via direct NextcloudClient + direct_event, _ = await nc_client.calendar.get_event(calendar_name, event_uid) + assert direct_event["title"] == test_event_title + assert direct_event["location"] == test_location + assert "testing" in direct_event.get("categories", "") + + # 4. Get event via MCP + logger.info(f"Getting event via MCP: {event_uid}") + get_result = await nc_mcp_client.call_tool( + "nc_calendar_get_event", + {"calendar_name": calendar_name, "event_uid": event_uid}, + ) + + assert get_result.isError is False, ( + f"MCP event get failed: {get_result.content}" + ) + + get_event_data = json.loads(get_result.content[0].text) + assert get_event_data["title"] == test_event_title + assert get_event_data["location"] == test_location + + # 5. **TEST nc_calendar_list_events - This is the main tool we're testing** + logger.info("Testing nc_calendar_list_events via MCP") + + # Get today and next week for date range + today = datetime.now() + next_week = today + timedelta(days=7) + start_date = today.strftime("%Y-%m-%d") + end_date = next_week.strftime("%Y-%m-%d") + + list_events_data = { + "calendar_name": calendar_name, + "start_date": start_date, + "end_date": end_date, + "limit": 50, + "location_contains": "MCP Test", + "title_contains": unique_suffix, + } + + list_result = await nc_mcp_client.call_tool( + "nc_calendar_list_events", list_events_data + ) + + assert list_result.isError is False, ( + f"MCP list events failed: {list_result.content}" + ) + + events_data = json.loads(list_result.content[0].text) + + # Debug output to understand what nc_calendar_list_events returns + logger.info(f"list_events result type: {type(events_data)}") + logger.info(f"list_events result content: {events_data}") + + # Handle single event returned as dict instead of list (same fix as calendars) + if isinstance(events_data, dict): + # Single event returned as dict instead of list + events_data = [events_data] + + assert isinstance(events_data, list), "Expected events list" + + # Our created event should be in the list + found_event = None + for event in events_data: + if event.get("uid") == event_uid: + found_event = event + break + + assert found_event is not None, ( + f"Created event {event_uid} not found in events list" + ) + assert found_event["title"] == test_event_title + + # 6. Test list events across all calendars + logger.info("Testing nc_calendar_list_events across all calendars") + + all_calendars_data = { + "calendar_name": "", # Will be ignored + "search_all_calendars": True, + "start_date": start_date, + "end_date": end_date, + "title_contains": unique_suffix, + } + + all_list_result = await nc_mcp_client.call_tool( + "nc_calendar_list_events", all_calendars_data + ) + + assert all_list_result.isError is False, ( + f"MCP list all events failed: {all_list_result.content}" + ) + + all_events_data = json.loads(all_list_result.content[0].text) + + # Handle single event returned as dict instead of list (same fix as calendars) + if isinstance(all_events_data, dict): + # Single event returned as dict instead of list + all_events_data = [all_events_data] + + assert isinstance(all_events_data, list), "Expected events list" + + # Our event should still be found when searching all calendars + found_in_all = any(event.get("uid") == event_uid for event in all_events_data) + assert found_in_all, "Event not found when searching all calendars" + + # 7. Update event via MCP + updated_title = f"Updated {test_event_title}" + updated_description = f"Updated description {unique_suffix}" + + update_data = { + "calendar_name": calendar_name, + "event_uid": event_uid, + "title": updated_title, + "description": updated_description, + "priority": 1, + } + + logger.info(f"Updating event via MCP: {event_uid}") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_event", update_data + ) + + assert update_result.isError is False, ( + f"MCP event update failed: {update_result.content}" + ) + + # 8. Verify update via direct NextcloudClient + updated_direct_event, _ = await nc_client.calendar.get_event( + calendar_name, event_uid + ) + assert updated_direct_event["title"] == updated_title + assert updated_direct_event["description"] == updated_description + assert updated_direct_event["priority"] == 1 + + # 9. Test upcoming events via MCP + logger.info("Testing nc_calendar_get_upcoming_events via MCP") + upcoming_result = await nc_mcp_client.call_tool( + "nc_calendar_get_upcoming_events", + {"calendar_name": calendar_name, "days_ahead": 7, "limit": 10}, + ) + + assert upcoming_result.isError is False, ( + f"MCP upcoming events failed: {upcoming_result.content}" + ) + + upcoming_events = json.loads(upcoming_result.content[0].text) + + # Handle single event returned as dict instead of list (same fix as other tools) + if isinstance(upcoming_events, dict): + # Single event returned as dict instead of list + upcoming_events = [upcoming_events] + + assert isinstance(upcoming_events, list), "Expected upcoming events list" + + # 10. Delete event via MCP + logger.info(f"Deleting event via MCP: {event_uid}") + delete_result = await nc_mcp_client.call_tool( + "nc_calendar_delete_event", + {"calendar_name": calendar_name, "event_uid": event_uid}, + ) + + assert delete_result.isError is False, ( + f"MCP event deletion failed: {delete_result.content}" + ) + + # 11. Verify deletion via direct NextcloudClient + try: + await nc_client.calendar.get_event(calendar_name, event_uid) + pytest.fail("Event should have been deleted but was still found") + except Exception: + # Expected - event should be deleted + logger.info(f"Successfully verified event {event_uid} was deleted") + created_event = None # Mark as cleaned up + + except Exception as e: + if "Calendar app may not be enabled" in str( + e + ) or "No calendars available" in str(e): + pytest.skip("Calendar functionality not available for testing") + raise + + finally: + # Cleanup in case of test failure + if created_event is not None: + try: + await nc_client.calendar.delete_event( + created_event["calendar_name"], created_event["uid"] + ) + logger.info( + f"Cleaned up event {created_event['uid']} after test failure" + ) + except Exception as e: + logger.warning(f"Failed to cleanup event: {e}") diff --git a/tests/integration/test_notes_api.py b/tests/integration/test_notes_api.py index 7ffd9e9..0c46ad2 100644 --- a/tests/integration/test_notes_api.py +++ b/tests/integration/test_notes_api.py @@ -1,7 +1,8 @@ -import pytest -import logging import asyncio +import logging import uuid # Keep uuid if needed for generating unique data within tests + +import pytest from httpx import HTTPStatusError from nextcloud_mcp_server.client import NextcloudClient diff --git a/tests/integration/test_tables_api.py b/tests/integration/test_tables_api.py index e03ad68..39a9d05 100644 --- a/tests/integration/test_tables_api.py +++ b/tests/integration/test_tables_api.py @@ -1,9 +1,10 @@ -import pytest -import logging import asyncio +import logging import uuid +from typing import Any, Dict + +import pytest from httpx import HTTPStatusError -from typing import Dict, Any from nextcloud_mcp_server.client import NextcloudClient diff --git a/tests/integration/test_webdav_cleanup.py b/tests/integration/test_webdav_cleanup.py index 3d197d8..84152d7 100644 --- a/tests/integration/test_webdav_cleanup.py +++ b/tests/integration/test_webdav_cleanup.py @@ -1,7 +1,8 @@ -import pytest import logging import time import uuid + +import pytest from httpx import HTTPStatusError from nextcloud_mcp_server.client import NextcloudClient diff --git a/tests/integration/test_webdav_operations.py b/tests/integration/test_webdav_operations.py index 0023869..0897318 100644 --- a/tests/integration/test_webdav_operations.py +++ b/tests/integration/test_webdav_operations.py @@ -1,8 +1,9 @@ """Integration tests for WebDAV operations.""" -import pytest import logging import uuid + +import pytest from httpx import HTTPStatusError from nextcloud_mcp_server.client import NextcloudClient diff --git a/uv.lock b/uv.lock index 3ab5c44..f72f869 100644 --- a/uv.lock +++ b/uv.lock @@ -469,7 +469,7 @@ wheels = [ [[package]] name = "mcp" -version = "1.10.0" +version = "1.10.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -483,9 +483,9 @@ dependencies = [ { name = "starlette" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c8/1a/d90e42be23a7e6dd35c03e35c7c63fe1036f082d3bb88114b66bd0f2467e/mcp-1.10.0.tar.gz", hash = "sha256:91fb1623c3faf14577623d14755d3213db837c5da5dae85069e1b59124cbe0e9", size = 392961, upload-time = "2025-06-26T13:51:19.025Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7c/68/63045305f29ff680a9cd5be360c755270109e6b76f696ea6824547ddbc30/mcp-1.10.1.tar.gz", hash = "sha256:aaa0957d8307feeff180da2d9d359f2b801f35c0c67f1882136239055ef034c2", size = 392969, upload-time = "2025-06-27T12:03:08.982Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0f/52/e1c43c4b5153465fd5d3b4b41bf2d4c7731475e9f668f38d68f848c25c9a/mcp-1.10.0-py3-none-any.whl", hash = "sha256:925c45482d75b1b6f11febddf9736d55edf7739c7ea39b583309f6651cbc9e5c", size = 150894, upload-time = "2025-06-26T13:51:17.342Z" }, + { url = "https://files.pythonhosted.org/packages/d7/3f/435a5b3d10ae242a9d6c2b33175551173c3c61fe637dc893be05c4ed0aaf/mcp-1.10.1-py3-none-any.whl", hash = "sha256:4d08301aefe906dce0fa482289db55ce1db831e3e67212e65b5e23ad8454b3c5", size = 150878, upload-time = "2025-06-27T12:03:07.328Z" }, ] [package.optional-dependencies] @@ -1150,7 +1150,7 @@ wheels = [ [[package]] name = "typer" -version = "0.15.3" +version = "0.16.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "click" }, @@ -1158,9 +1158,9 @@ dependencies = [ { name = "shellingham" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/98/1a/5f36851f439884bcfe8539f6a20ff7516e7b60f319bbaf69a90dc35cc2eb/typer-0.15.3.tar.gz", hash = "sha256:818873625d0569653438316567861899f7e9972f2e6e0c16dab608345ced713c", size = 101641, upload-time = "2025-04-28T21:40:59.204Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c5/8c/7d682431efca5fd290017663ea4588bf6f2c6aad085c7f108c5dbc316e70/typer-0.16.0.tar.gz", hash = "sha256:af377ffaee1dbe37ae9440cb4e8f11686ea5ce4e9bae01b84ae7c63b87f1dd3b", size = 102625, upload-time = "2025-05-26T14:30:31.824Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/48/20/9d953de6f4367163d23ec823200eb3ecb0050a2609691e512c8b95827a9b/typer-0.15.3-py3-none-any.whl", hash = "sha256:c86a65ad77ca531f03de08d1b9cb67cd09ad02ddddf4b34745b5008f43b239bd", size = 45253, upload-time = "2025-04-28T21:40:56.269Z" }, + { url = "https://files.pythonhosted.org/packages/76/42/3efaf858001d2c2913de7f354563e3a3a2f0decae3efe98427125a8f441e/typer-0.16.0-py3-none-any.whl", hash = "sha256:1f79bed11d4d02d4310e3c1b7ba594183bcedb0ac73b27a9e5f28f6fb5b98855", size = 46317, upload-time = "2025-05-26T14:30:30.523Z" }, ] [[package]]