From 2bcfd3d7eee12ba190738ea0d80525097045b178 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Fri, 1 Aug 2025 08:34:51 +0200 Subject: [PATCH] fix(calendar): Fix iCalendar date vs datetime format --- nextcloud_mcp_server/client/calendar.py | 70 ++++-- nextcloud_mcp_server/server/calendar.py | 106 +++++++-- pyproject.toml | 4 +- tests/integration/test_mcp.py | 279 ++++++++++++++++++++++++ 4 files changed, 412 insertions(+), 47 deletions(-) diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py index d01d5be..f75278e 100644 --- a/nextcloud_mcp_server/client/calendar.py +++ b/nextcloud_mcp_server/client/calendar.py @@ -118,8 +118,8 @@ class CalendarClient(BaseNextcloudClient): async def get_calendar_events( self, calendar_name: str, - start_date: str = "", - end_date: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, limit: int = 50, ) -> List[Dict[str, Any]]: """List events in a calendar within date range.""" @@ -127,9 +127,18 @@ class CalendarClient(BaseNextcloudClient): # Build time range filter if dates provided time_range_filter = "" - if start_date or end_date: - start_dt = start_date or "19700101T000000Z" - end_dt = end_date or "20301231T235959Z" + if start_datetime or end_datetime: + # Convert datetime objects to CalDAV format (YYYYMMDDTHHMMSSZ) + start_dt = ( + start_datetime.strftime("%Y%m%dT%H%M%SZ") + if start_datetime + else "19700101T000000Z" + ) + end_dt = ( + end_datetime.strftime("%Y%m%dT%H%M%SZ") + if end_datetime + else "20301231T235959Z" + ) time_range_filter = f""" """ @@ -504,8 +513,8 @@ class CalendarClient(BaseNextcloudClient): async def search_events_across_calendars( self, - start_date: str = "", - end_date: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, filters: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """Search events across all calendars with advanced filtering.""" @@ -516,7 +525,7 @@ class CalendarClient(BaseNextcloudClient): for calendar in calendars: try: events = await self.get_calendar_events( - calendar["name"], start_date, end_date + calendar["name"], start_datetime, end_datetime ) # Apply filters if provided @@ -623,22 +632,21 @@ class CalendarClient(BaseNextcloudClient): self, duration_minutes: int, attendees: Optional[List[str]] = None, - date_range_start: str = "", - date_range_end: str = "", + start_datetime: Optional[dt.datetime] = None, + end_datetime: Optional[dt.datetime] = None, constraints: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """Find available time slots for scheduling.""" try: # Set default date range if not provided - if not date_range_start: - date_range_start = dt.datetime.now().strftime("%Y-%m-%d") - if not date_range_end: - end_date = dt.datetime.now() + dt.timedelta(days=7) - date_range_end = end_date.strftime("%Y-%m-%d") + if not start_datetime: + start_datetime = dt.datetime.now() + if not end_datetime: + end_datetime = dt.datetime.now() + dt.timedelta(days=7) # Get all events in the date range busy_events = await self.search_events_across_calendars( - start_date=date_range_start, end_date=date_range_end + start_datetime=start_datetime, end_datetime=end_datetime ) # Filter events for relevant attendees if specified @@ -662,8 +670,8 @@ class CalendarClient(BaseNextcloudClient): available_slots = self._generate_available_slots( busy_events, duration_minutes, - date_range_start, - date_range_end, + start_datetime, + end_datetime, business_hours_only, exclude_weekends, preferred_times, @@ -679,8 +687,8 @@ class CalendarClient(BaseNextcloudClient): self, busy_events: List[Dict[str, Any]], duration_minutes: int, - start_date: str, - end_date: str, + start_datetime: dt.datetime, + end_datetime: dt.datetime, business_hours_only: bool, exclude_weekends: bool, preferred_times: List[str], @@ -689,8 +697,12 @@ class CalendarClient(BaseNextcloudClient): available_slots = [] try: - current_date = dt.datetime.fromisoformat(start_date) - end_date_dt = dt.datetime.fromisoformat(end_date) + current_date = start_datetime.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + end_date_dt = end_datetime.replace( + hour=23, minute=59, second=59, microsecond=999999 + ) while current_date <= end_date_dt: # Skip weekends if requested @@ -819,10 +831,20 @@ class CalendarClient(BaseNextcloudClient): ) -> Dict[str, Any]: """Bulk update events matching filter criteria.""" try: + # Convert string dates to datetime objects if present + start_datetime = None + end_datetime = None + if "start_date" in filter_criteria and filter_criteria["start_date"]: + start_datetime = dt.datetime.fromisoformat( + filter_criteria["start_date"] + ) + if "end_date" in filter_criteria and filter_criteria["end_date"]: + end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"]) + # Find events matching criteria events = await self.search_events_across_calendars( - start_date=filter_criteria.get("start_date", ""), - end_date=filter_criteria.get("end_date", ""), + start_datetime=start_datetime, + end_datetime=end_datetime, filters=filter_criteria, ) diff --git a/nextcloud_mcp_server/server/calendar.py b/nextcloud_mcp_server/server/calendar.py index dd1d26a..7893095 100644 --- a/nextcloud_mcp_server/server/calendar.py +++ b/nextcloud_mcp_server/server/calendar.py @@ -126,6 +126,33 @@ def configure_calendar_tools(mcp: FastMCP): """ client: NextcloudClient = ctx.request_context.lifespan_context.client + # Convert YYYY-MM-DD format dates to datetime objects + start_datetime = None + end_datetime = None + + if start_date: + try: + start_datetime = dt.datetime.strptime(start_date, "%Y-%m-%d") + except ValueError: + # If parsing fails, try to parse as ISO format + try: + start_datetime = dt.datetime.fromisoformat(start_date) + except ValueError: + logger.warning(f"Invalid start_date format: {start_date}") + + if end_date: + try: + # For end date, set to end of day (23:59:59) + end_datetime = dt.datetime.strptime(end_date, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + # If parsing fails, try to parse as ISO format + try: + end_datetime = dt.datetime.fromisoformat(end_date) + except ValueError: + logger.warning(f"Invalid end_date format: {end_date}") + # Build filters dictionary filters = {} if min_attendees is not None: @@ -144,8 +171,8 @@ def configure_calendar_tools(mcp: FastMCP): if search_all_calendars: # Search across all calendars with filters events = await client.calendar.search_events_across_calendars( - start_date=start_date, - end_date=end_date, + start_datetime=start_datetime, + end_datetime=end_datetime, filters=filters if filters else None, ) return events[:limit] @@ -153,8 +180,8 @@ def configure_calendar_tools(mcp: FastMCP): # Search in specific calendar events = await client.calendar.get_calendar_events( calendar_name=calendar_name, - start_date=start_date, - end_date=end_date, + start_datetime=start_datetime, + end_datetime=end_datetime, limit=limit, ) @@ -302,7 +329,6 @@ def configure_calendar_tools(mcp: FastMCP): start_datetime = f"{date}T{time}:00" # Calculate end_datetime - start_dt = dt.datetime.fromisoformat(start_datetime) end_dt = start_dt + dt.timedelta(minutes=duration_minutes) end_datetime = end_dt.isoformat() @@ -334,17 +360,14 @@ def configure_calendar_tools(mcp: FastMCP): client: NextcloudClient = ctx.request_context.lifespan_context.client now = dt.datetime.now() - end_date = now + dt.timedelta(days=days_ahead) - - start_date_str = now.strftime("%Y%m%dT%H%M%SZ") - end_date_str = end_date.strftime("%Y%m%dT%H%M%SZ") + end_datetime = now + dt.timedelta(days=days_ahead) if calendar_name: # Get events from specific calendar return await client.calendar.get_calendar_events( calendar_name=calendar_name, - start_date=start_date_str, - end_date=end_date_str, + start_datetime=now, + end_datetime=end_datetime, limit=limit, ) else: @@ -356,8 +379,8 @@ def configure_calendar_tools(mcp: FastMCP): try: events = await client.calendar.get_calendar_events( calendar_name=calendar["name"], - start_date=start_date_str, - end_date=end_date_str, + start_datetime=now, + end_datetime=end_datetime, limit=limit, ) # Add calendar info to each event @@ -421,6 +444,24 @@ def configure_calendar_tools(mcp: FastMCP): if time_range.strip() ] + # Convert date strings to datetime objects + start_datetime = None + end_datetime = None + + if date_range_start: + try: + start_datetime = dt.datetime.strptime(date_range_start, "%Y-%m-%d") + except ValueError: + logger.warning(f"Invalid date_range_start format: {date_range_start}") + + if date_range_end: + try: + end_datetime = dt.datetime.strptime(date_range_end, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + logger.warning(f"Invalid date_range_end format: {date_range_end}") + # Build constraints constraints = { "business_hours_only": business_hours_only, @@ -431,8 +472,8 @@ def configure_calendar_tools(mcp: FastMCP): return await client.calendar.find_availability( duration_minutes=duration_minutes, attendees=attendee_list, - date_range_start=date_range_start, - date_range_end=date_range_end, + start_datetime=start_datetime, + end_datetime=end_datetime, constraints=constraints, ) @@ -491,6 +532,24 @@ def configure_calendar_tools(mcp: FastMCP): if operation not in ["update", "delete", "move"]: raise ValueError("Operation must be 'update', 'delete', or 'move'") + # Convert date strings to datetime objects + start_datetime = None + end_datetime = None + + if start_date: + try: + start_datetime = dt.datetime.strptime(start_date, "%Y-%m-%d") + except ValueError: + logger.warning(f"Invalid start_date format: {start_date}") + + if end_date: + try: + end_datetime = dt.datetime.strptime(end_date, "%Y-%m-%d").replace( + hour=23, minute=59, second=59 + ) + except ValueError: + logger.warning(f"Invalid end_date format: {end_date}") + # Build filter criteria filter_criteria = {} if title_contains is not None: @@ -503,6 +562,7 @@ def configure_calendar_tools(mcp: FastMCP): filter_criteria["status"] = status if location_contains is not None: filter_criteria["location_contains"] = location_contains + # Add datetime strings for client compatibility if start_date: filter_criteria["start_date"] = start_date if end_date: @@ -513,8 +573,8 @@ def configure_calendar_tools(mcp: FastMCP): if calendar_name: events = await client.calendar.get_calendar_events( calendar_name=calendar_name, - start_date=start_date, - end_date=end_date, + start_datetime=start_datetime, + end_datetime=end_datetime, ) if filter_criteria: events = client.calendar._apply_event_filters( @@ -522,7 +582,9 @@ def configure_calendar_tools(mcp: FastMCP): ) else: events = await client.calendar.search_events_across_calendars( - start_date=start_date, end_date=end_date, filters=filter_criteria + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filter_criteria, ) deleted_count = 0 @@ -592,8 +654,8 @@ def configure_calendar_tools(mcp: FastMCP): if calendar_name: events = await client.calendar.get_calendar_events( calendar_name=calendar_name, - start_date=start_date, - end_date=end_date, + start_datetime=start_datetime, + end_datetime=end_datetime, ) if filter_criteria: events = client.calendar._apply_event_filters( @@ -601,7 +663,9 @@ def configure_calendar_tools(mcp: FastMCP): ) else: events = await client.calendar.search_events_across_calendars( - start_date=start_date, end_date=end_date, filters=filter_criteria + start_datetime=start_datetime, + end_datetime=end_datetime, + filters=filter_criteria, ) moved_count = 0 diff --git a/pyproject.toml b/pyproject.toml index d1f7406..a834e87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,8 +19,8 @@ asyncio_mode = "auto" asyncio_default_test_loop_scope = "session" asyncio_default_fixture_loop_scope = "session" log_cli = 1 -log_cli_level = "WARN" -log_level = "WARN" +log_cli_level = "INFO" +log_level = "INFO" markers = [ "integration: marks tests as slow (deselect with '-m \"not slow\"')" ] diff --git a/tests/integration/test_mcp.py b/tests/integration/test_mcp.py index 52c9fc8..3a0f165 100644 --- a/tests/integration/test_mcp.py +++ b/tests/integration/test_mcp.py @@ -41,6 +41,17 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession): "nc_webdav_write_file", "nc_webdav_create_directory", "nc_webdav_delete_resource", + "nc_calendar_list_calendars", + "nc_calendar_create_event", + "nc_calendar_list_events", + "nc_calendar_get_event", + "nc_calendar_update_event", + "nc_calendar_delete_event", + "nc_calendar_create_meeting", + "nc_calendar_get_upcoming_events", + "nc_calendar_find_availability", + "nc_calendar_bulk_operations", + "nc_calendar_manage_calendar", ] for expected_tool in expected_tools: @@ -395,3 +406,271 @@ async def test_mcp_resources_access( assert isinstance(direct_settings, dict) logger.info("Successfully verified MCP resources match direct API calls") + + +async def test_mcp_calendar_workflow( + nc_mcp_client: ClientSession, nc_client: NextcloudClient +): + """Test complete Calendar workflow via MCP tools with verification via NextcloudClient.""" + + unique_suffix = uuid.uuid4().hex[:8] + test_event_title = f"MCP Test Event {unique_suffix}" + test_location = f"MCP Test Location {unique_suffix}" + + created_event = None + calendar_name = None + + try: + # 1. List calendars via MCP + logger.info("Listing calendars via MCP") + calendars_result = await nc_mcp_client.call_tool( + "nc_calendar_list_calendars", {} + ) + + assert calendars_result.isError is False, ( + f"MCP calendar listing failed: {calendars_result.content}" + ) + + calendars_data = json.loads(calendars_result.content[0].text) + + # Debug output to understand the structure + logger.info(f"calendars_data type: {type(calendars_data)}") + logger.info(f"calendars_data content: {calendars_data}") + + # Handle the case where MCP tool returns a single dict instead of a list + if isinstance(calendars_data, dict): + # Single calendar returned as dict instead of list + calendar_name = calendars_data["name"] + elif isinstance(calendars_data, list) and calendars_data: + # Normal case - list of calendars + calendar_name = calendars_data[0]["name"] + else: + pytest.skip("No calendars available for testing") + logger.info(f"Using calendar: {calendar_name}") + + # 2. Create event via MCP + from datetime import datetime, timedelta + + tomorrow = datetime.now() + timedelta(days=1) + start_datetime = tomorrow.strftime("%Y-%m-%dT14:00:00") + end_datetime = tomorrow.strftime("%Y-%m-%dT15:00:00") + + event_data = { + "calendar_name": calendar_name, + "title": test_event_title, + "start_datetime": start_datetime, + "end_datetime": end_datetime, + "description": f"Test event created via MCP {unique_suffix}", + "location": test_location, + "categories": "testing,mcp", + "status": "CONFIRMED", + "priority": 5, + } + + logger.info(f"Creating event via MCP: {test_event_title}") + create_result = await nc_mcp_client.call_tool( + "nc_calendar_create_event", event_data + ) + + assert create_result.isError is False, ( + f"MCP event creation failed: {create_result.content}" + ) + + created_event_data = json.loads(create_result.content[0].text) + event_uid = created_event_data["uid"] + created_event = {"uid": event_uid, "calendar_name": calendar_name} + + logger.info(f"Event created via MCP with UID: {event_uid}") + + # 3. Verify creation via direct NextcloudClient + direct_event, _ = await nc_client.calendar.get_event(calendar_name, event_uid) + assert direct_event["title"] == test_event_title + assert direct_event["location"] == test_location + assert "testing" in direct_event.get("categories", "") + + # 4. Get event via MCP + logger.info(f"Getting event via MCP: {event_uid}") + get_result = await nc_mcp_client.call_tool( + "nc_calendar_get_event", + {"calendar_name": calendar_name, "event_uid": event_uid}, + ) + + assert get_result.isError is False, ( + f"MCP event get failed: {get_result.content}" + ) + + get_event_data = json.loads(get_result.content[0].text) + assert get_event_data["title"] == test_event_title + assert get_event_data["location"] == test_location + + # 5. **TEST nc_calendar_list_events - This is the main tool we're testing** + logger.info("Testing nc_calendar_list_events via MCP") + + # Get today and next week for date range + today = datetime.now() + next_week = today + timedelta(days=7) + start_date = today.strftime("%Y-%m-%d") + end_date = next_week.strftime("%Y-%m-%d") + + list_events_data = { + "calendar_name": calendar_name, + "start_date": start_date, + "end_date": end_date, + "limit": 50, + "location_contains": "MCP Test", + "title_contains": unique_suffix, + } + + list_result = await nc_mcp_client.call_tool( + "nc_calendar_list_events", list_events_data + ) + + assert list_result.isError is False, ( + f"MCP list events failed: {list_result.content}" + ) + + events_data = json.loads(list_result.content[0].text) + + # Debug output to understand what nc_calendar_list_events returns + logger.info(f"list_events result type: {type(events_data)}") + logger.info(f"list_events result content: {events_data}") + + # Handle single event returned as dict instead of list (same fix as calendars) + if isinstance(events_data, dict): + # Single event returned as dict instead of list + events_data = [events_data] + + assert isinstance(events_data, list), "Expected events list" + + # Our created event should be in the list + found_event = None + for event in events_data: + if event.get("uid") == event_uid: + found_event = event + break + + assert found_event is not None, ( + f"Created event {event_uid} not found in events list" + ) + assert found_event["title"] == test_event_title + + # 6. Test list events across all calendars + logger.info("Testing nc_calendar_list_events across all calendars") + + all_calendars_data = { + "calendar_name": "", # Will be ignored + "search_all_calendars": True, + "start_date": start_date, + "end_date": end_date, + "title_contains": unique_suffix, + } + + all_list_result = await nc_mcp_client.call_tool( + "nc_calendar_list_events", all_calendars_data + ) + + assert all_list_result.isError is False, ( + f"MCP list all events failed: {all_list_result.content}" + ) + + all_events_data = json.loads(all_list_result.content[0].text) + + # Handle single event returned as dict instead of list (same fix as calendars) + if isinstance(all_events_data, dict): + # Single event returned as dict instead of list + all_events_data = [all_events_data] + + assert isinstance(all_events_data, list), "Expected events list" + + # Our event should still be found when searching all calendars + found_in_all = any(event.get("uid") == event_uid for event in all_events_data) + assert found_in_all, "Event not found when searching all calendars" + + # 7. Update event via MCP + updated_title = f"Updated {test_event_title}" + updated_description = f"Updated description {unique_suffix}" + + update_data = { + "calendar_name": calendar_name, + "event_uid": event_uid, + "title": updated_title, + "description": updated_description, + "priority": 1, + } + + logger.info(f"Updating event via MCP: {event_uid}") + update_result = await nc_mcp_client.call_tool( + "nc_calendar_update_event", update_data + ) + + assert update_result.isError is False, ( + f"MCP event update failed: {update_result.content}" + ) + + # 8. Verify update via direct NextcloudClient + updated_direct_event, _ = await nc_client.calendar.get_event( + calendar_name, event_uid + ) + assert updated_direct_event["title"] == updated_title + assert updated_direct_event["description"] == updated_description + assert updated_direct_event["priority"] == 1 + + # 9. Test upcoming events via MCP + logger.info("Testing nc_calendar_get_upcoming_events via MCP") + upcoming_result = await nc_mcp_client.call_tool( + "nc_calendar_get_upcoming_events", + {"calendar_name": calendar_name, "days_ahead": 7, "limit": 10}, + ) + + assert upcoming_result.isError is False, ( + f"MCP upcoming events failed: {upcoming_result.content}" + ) + + upcoming_events = json.loads(upcoming_result.content[0].text) + + # Handle single event returned as dict instead of list (same fix as other tools) + if isinstance(upcoming_events, dict): + # Single event returned as dict instead of list + upcoming_events = [upcoming_events] + + assert isinstance(upcoming_events, list), "Expected upcoming events list" + + # 10. Delete event via MCP + logger.info(f"Deleting event via MCP: {event_uid}") + delete_result = await nc_mcp_client.call_tool( + "nc_calendar_delete_event", + {"calendar_name": calendar_name, "event_uid": event_uid}, + ) + + assert delete_result.isError is False, ( + f"MCP event deletion failed: {delete_result.content}" + ) + + # 11. Verify deletion via direct NextcloudClient + try: + await nc_client.calendar.get_event(calendar_name, event_uid) + pytest.fail("Event should have been deleted but was still found") + except Exception: + # Expected - event should be deleted + logger.info(f"Successfully verified event {event_uid} was deleted") + created_event = None # Mark as cleaned up + + except Exception as e: + if "Calendar app may not be enabled" in str( + e + ) or "No calendars available" in str(e): + pytest.skip("Calendar functionality not available for testing") + raise + + finally: + # Cleanup in case of test failure + if created_event is not None: + try: + await nc_client.calendar.delete_event( + created_event["calendar_name"], created_event["uid"] + ) + logger.info( + f"Cleaned up event {created_event['uid']} after test failure" + ) + except Exception as e: + logger.warning(f"Failed to cleanup event: {e}")