"""CalDAV client for NextCloud calendar operations."""

import datetime as dt
import logging
import uuid
import xml.etree.ElementTree as ET
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xml.sax.saxutils import escape as xml_escape

from httpx import HTTPStatusError
from icalendar import Alarm, Calendar
from icalendar import Event as ICalEvent
from icalendar import vRecur

from .base import BaseNextcloudClient

logger = logging.getLogger(__name__)

# PROPFIND body asking for exactly the properties that list_calendars() parses.
_PROPFIND_CALENDARS_BODY = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav" xmlns:cs="http://calendarserver.org/ns/">
  <d:prop>
    <d:displayname/>
    <d:resourcetype/>
    <c:calendar-description/>
    <cs:calendar-color/>
  </d:prop>
</d:propfind>"""


class CalendarClient(BaseNextcloudClient):
    """Client for NextCloud CalDAV calendar operations."""

    # ------------------------------------------------------------------
    # Path and formatting helpers
    # ------------------------------------------------------------------

    def _get_caldav_base_path(self) -> str:
        """Helper to get the base CalDAV path for calendars."""
        return f"/remote.php/dav/calendars/{self.username}"

    def _get_principals_path(self) -> str:
        """Helper to get the principals path for the user."""
        return f"/remote.php/dav/principals/users/{self.username}"

    def _event_path(self, calendar_name: str, event_uid: str) -> str:
        """Return the path of the ``<uid>.ics`` resource inside a calendar."""
        return f"{self._get_caldav_base_path()}/{calendar_name}/{event_uid}.ics"

    @staticmethod
    def _format_caldav_utc(value: dt.datetime) -> str:
        """Format *value* as ``YYYYMMDDTHHMMSSZ``.

        Timezone-aware values are converted to UTC first so the trailing
        ``Z`` is truthful. Naive values are formatted unchanged, i.e. they
        are taken to already be in UTC.
        """
        if value.tzinfo is not None and value.utcoffset() is not None:
            value = value.astimezone(dt.timezone.utc)
        return value.strftime("%Y%m%dT%H%M%SZ")

    @staticmethod
    def _parse_iso_datetime(value: str) -> dt.datetime:
        """Parse an ISO-8601 string, accepting a trailing ``Z`` for UTC."""
        return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))

    @staticmethod
    def _align_timezone(value: dt.datetime, reference: dt.datetime) -> dt.datetime:
        """Make *value* comparable with *reference*.

        If *reference* is naive, the timezone of *value* is dropped and its
        wall-clock reading kept. If *reference* is aware, *value* is converted
        to that timezone, or tagged with it when *value* is naive.
        """
        if reference.tzinfo is None:
            return value.replace(tzinfo=None)
        if value.tzinfo is None:
            return value.replace(tzinfo=reference.tzinfo)
        return value.astimezone(reference.tzinfo)

    # ------------------------------------------------------------------
    # Calendar and event CRUD
    # ------------------------------------------------------------------

    async def list_calendars(self) -> List[Dict[str, Any]]:
        """List all available calendars for the user.

        Returns:
            One dict per calendar collection with the keys ``name`` (last
            path segment of the href), ``display_name``, ``description``,
            ``color`` and ``href``.
        """
        caldav_path = self._get_caldav_base_path()
        headers = {
            "Depth": "1",
            "Content-Type": "application/xml",
            "Accept": "application/xml",
        }

        response = await self._make_request(
            "PROPFIND", caldav_path, content=_PROPFIND_CALENDARS_BODY, headers=headers
        )

        # Parse XML response
        root = ET.fromstring(response.content)
        calendars = []

        for response_elem in root.findall(".//{DAV:}response"):
            href = response_elem.find(".//{DAV:}href")
            if href is None:
                continue

            href_text = href.text or ""
            if not href_text.endswith("/"):
                continue  # Skip non-collection resources

            # The calendar name is the last path segment; the user's own
            # home collection is not a calendar.
            calendar_name = href_text.rstrip("/").split("/")[-1]
            if not calendar_name or calendar_name == self.username:
                continue

            propstat = response_elem.find(".//{DAV:}propstat")
            if propstat is None:
                continue
            prop = propstat.find(".//{DAV:}prop")
            if prop is None:
                continue

            # Only keep resources whose resourcetype contains <caldav:calendar/>
            resourcetype = prop.find(".//{DAV:}resourcetype")
            is_calendar = (
                resourcetype is not None
                and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar")
                is not None
            )
            if not is_calendar:
                continue

            displayname_elem = prop.find(".//{DAV:}displayname")
            description_elem = prop.find(
                ".//{urn:ietf:params:xml:ns:caldav}calendar-description"
            )
            color_elem = prop.find(".//{http://calendarserver.org/ns/}calendar-color")

            # An element that is present but empty has text None; fall back
            # to the same defaults as when the element is missing.
            displayname = (
                displayname_elem.text if displayname_elem is not None else None
            ) or calendar_name
            description = (
                description_elem.text if description_elem is not None else None
            ) or ""
            color = (color_elem.text if color_elem is not None else None) or "#1976D2"

            calendars.append(
                {
                    "name": calendar_name,
                    "display_name": displayname,
                    "description": description,
                    "color": color,
                    "href": href_text,
                }
            )

        logger.debug("Found %d calendars", len(calendars))
        return calendars

    async def get_calendar_events(
        self,
        calendar_name: str,
        start_datetime: Optional[dt.datetime] = None,
        end_datetime: Optional[dt.datetime] = None,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """List events in a calendar within date range.

        Args:
            calendar_name: Calendar path segment (as returned in ``name`` by
                :meth:`list_calendars`).
            start_datetime: Optional lower bound of the time range.
            end_datetime: Optional upper bound of the time range.
            limit: Parsing stops once this many events have been collected.

        Returns:
            Parsed event dicts (see :meth:`_parse_ical_event`), each extended
            with ``href`` and ``etag``.
        """
        calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"

        # Build the time-range filter from the bounds that were supplied.
        # Only the given attributes are emitted, so a missing bound leaves
        # the range open-ended instead of clamping it to a fixed date.
        time_range_filter = ""
        if start_datetime or end_datetime:
            range_attrs = []
            if start_datetime:
                range_attrs.append(
                    f'start="{self._format_caldav_utc(start_datetime)}"'
                )
            if end_datetime:
                range_attrs.append(f'end="{self._format_caldav_utc(end_datetime)}"')
            time_range_filter = f"<c:time-range {' '.join(range_attrs)}/>"

        report_body = f"""<?xml version="1.0" encoding="utf-8"?>
<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
  <d:prop>
    <d:getetag/>
    <c:calendar-data/>
  </d:prop>
  <c:filter>
    <c:comp-filter name="VCALENDAR">
      <c:comp-filter name="VEVENT">
        {time_range_filter}
      </c:comp-filter>
    </c:comp-filter>
  </c:filter>
</c:calendar-query>"""

        headers = {
            "Depth": "1",
            "Content-Type": "application/xml",
            "Accept": "application/xml",
        }

        response = await self._make_request(
            "REPORT", calendar_path, content=report_body, headers=headers
        )

        # Parse XML response and extract events
        root = ET.fromstring(response.content)
        events = []

        for response_elem in root.findall(".//{DAV:}response"):
            href = response_elem.find(".//{DAV:}href")
            if href is None:
                continue
            propstat = response_elem.find(".//{DAV:}propstat")
            if propstat is None:
                continue
            prop = propstat.find(".//{DAV:}prop")
            if prop is None:
                continue

            calendar_data = prop.find(".//{urn:ietf:params:xml:ns:caldav}calendar-data")
            etag_elem = prop.find(".//{DAV:}getetag")

            if calendar_data is not None and calendar_data.text:
                event_data = self._parse_ical_event(calendar_data.text)
                if event_data:
                    event_data["href"] = href.text
                    event_data["etag"] = (
                        etag_elem.text if etag_elem is not None else ""
                    )
                    events.append(event_data)
                    if len(events) >= limit:
                        break

        logger.debug("Found %d events", len(events))
        return events

    async def create_event(
        self, calendar_name: str, event_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create a new calendar event with comprehensive features.

        A fresh UUID is used as the event UID and file name. See
        :meth:`_create_ical_event` for the recognised ``event_data`` keys.

        Returns:
            Dict with ``uid``, ``href``, ``etag`` and ``status_code``.
        """
        event_uid = str(uuid.uuid4())
        event_path = self._event_path(calendar_name, event_uid)

        ical_content = self._create_ical_event(event_data, event_uid)

        headers = {
            "Content-Type": "text/calendar; charset=utf-8",
            "If-None-Match": "*",  # Ensure we're creating, not updating
        }

        response = await self._make_request(
            "PUT", event_path, content=ical_content, headers=headers
        )

        logger.debug("Created event %s", event_uid)
        return {
            "uid": event_uid,
            "href": event_path,
            "etag": response.headers.get("etag", ""),
            "status_code": response.status_code,
        }

    async def update_event(
        self,
        calendar_name: str,
        event_uid: str,
        event_data: Dict[str, Any],
        etag: str = "",
    ) -> Dict[str, Any]:
        """Update an existing calendar event.

        The stored event is fetched and merged with ``event_data`` (new
        values win) so that a partial update does not blank the fields it
        omits. This happens whether or not the caller supplied an ``etag``.

        Args:
            calendar_name: Calendar path segment.
            event_uid: UID (file stem) of the event.
            event_data: Fields to change.
            etag: Optional etag sent as ``If-Match``. When empty, the etag
                returned by the fetch is used if the fetch succeeded.

        Returns:
            Dict with ``uid``, ``href``, ``etag`` and ``status_code``.

        Raises:
            HTTPStatusError: Re-raised from the PUT request.
        """
        event_path = self._event_path(calendar_name, event_uid)

        existing_event_data: Dict[str, Any] = {}
        try:
            existing_event_data, current_etag = await self.get_event(
                calendar_name, event_uid
            )
            if not etag:
                etag = current_etag
        except Exception as e:
            # Best effort: proceed with only the caller's data (and without
            # an If-Match header if none was supplied).
            logger.warning(
                "Could not fetch event %s before update; "
                "continuing without merge: %s",
                event_uid,
                e,
            )

        # Merge existing data with new data (new data takes precedence)
        merged_data = {**existing_event_data, **event_data}

        ical_content = self._create_ical_event(merged_data, event_uid)

        headers = {"Content-Type": "text/calendar; charset=utf-8"}
        if etag:
            headers["If-Match"] = etag

        try:
            response = await self._make_request(
                "PUT", event_path, content=ical_content, headers=headers
            )
        except HTTPStatusError as e:
            logger.error("HTTP error updating event: %s", e)
            raise
        except Exception as e:
            logger.error("Unexpected error updating event: %s", e)
            raise

        logger.debug("Updated event %s", event_uid)
        return {
            "uid": event_uid,
            "href": event_path,
            "etag": response.headers.get("etag", ""),
            "status_code": response.status_code,
        }

    async def delete_event(self, calendar_name: str, event_uid: str) -> Dict[str, Any]:
        """Delete a calendar event.

        Returns:
            ``{"status_code": <code>}``. A 404 from the server is reported as
            ``{"status_code": 404}`` instead of raising.
        """
        event_path = self._event_path(calendar_name, event_uid)

        try:
            response = await self._make_request("DELETE", event_path)
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                logger.debug("Event %s not found", event_uid)
                return {"status_code": 404}
            logger.error("HTTP error deleting event: %s", e)
            raise
        except Exception as e:
            logger.error("Unexpected error deleting event: %s", e)
            raise

        logger.debug("Deleted event %s", event_uid)
        return {"status_code": response.status_code}

    async def get_event(
        self, calendar_name: str, event_uid: str
    ) -> Tuple[Dict[str, Any], str]:
        """Get detailed information about a specific event.

        Returns:
            ``(event_data, etag)`` where ``event_data`` is the parsed event
            extended with ``href`` and ``etag``.

        Raises:
            ValueError: If the response body contains no parseable VEVENT.
            HTTPStatusError: Re-raised from the GET request.
        """
        event_path = self._event_path(calendar_name, event_uid)
        headers = {"Accept": "text/calendar"}

        try:
            response = await self._make_request("GET", event_path, headers=headers)
            etag = response.headers.get("etag", "")

            event_data = self._parse_ical_event(response.text)
            if not event_data:
                raise ValueError(f"Failed to parse event data for {event_uid}")

            event_data["href"] = event_path
            event_data["etag"] = etag

            logger.debug("Retrieved event %s", event_uid)
            return event_data, etag
        except HTTPStatusError as e:
            logger.error("HTTP error getting event: %s", e)
            raise
        except Exception as e:
            logger.error("Unexpected error getting event: %s", e)
            raise

    # ------------------------------------------------------------------
    # iCalendar serialisation
    # ------------------------------------------------------------------

    def _create_ical_event(self, event_data: Dict[str, Any], event_uid: str) -> str:
        """Create iCalendar content from event data.

        Recognised keys in ``event_data``: ``title``, ``description``,
        ``location``, ``start_datetime``, ``end_datetime`` (ISO strings),
        ``all_day``, ``categories`` (comma-separated), ``priority``,
        ``status``, ``privacy``, ``url``, ``recurring``, ``recurrence_rule``
        (RRULE text), ``reminder_minutes`` and ``attendees``
        (comma-separated e-mail addresses). Other keys are ignored.

        Returns:
            The serialised VCALENDAR as text.
        """
        cal = Calendar()
        cal.add("prodid", "-//NextCloud MCP Server//EN")
        cal.add("version", "2.0")

        event = ICalEvent()
        event.add("uid", event_uid)
        event.add("summary", event_data.get("title", ""))
        event.add("description", event_data.get("description", ""))
        event.add("location", event_data.get("location", ""))

        # Handle dates/times; nothing is written without a start.
        start_str = event_data.get("start_datetime", "")
        end_str = event_data.get("end_datetime", "")
        all_day = event_data.get("all_day", False)

        if start_str:
            if all_day:
                # All-day events carry DATE values: keep only the date part.
                event.add("dtstart", dt.date.fromisoformat(start_str.split("T")[0]))
                if end_str:
                    event.add("dtend", dt.date.fromisoformat(end_str.split("T")[0]))
            else:
                event.add("dtstart", self._parse_iso_datetime(start_str))
                if end_str:
                    event.add("dtend", self._parse_iso_datetime(end_str))

        # Categories: strip whitespace so the ", "-joined form produced by
        # _extract_categories() round-trips without growing leading spaces.
        categories = event_data.get("categories", "")
        if categories:
            category_names = [c.strip() for c in categories.split(",") if c.strip()]
            if category_names:
                event.add("categories", category_names)

        event.add("priority", event_data.get("priority", 5))
        event.add("status", event_data.get("status", "CONFIRMED"))
        event.add("class", event_data.get("privacy", "PUBLIC"))

        url = event_data.get("url", "")
        if url:
            event.add("url", url)

        # Recurrence is only written when both the flag and a rule are set.
        if event_data.get("recurring", False):
            recurrence_rule = event_data.get("recurrence_rule", "")
            if recurrence_rule:
                event.add("rrule", vRecur.from_ical(recurrence_rule))

        # Reminder: a DISPLAY alarm N minutes before the start. A missing or
        # None value means "no reminder".
        reminder_minutes = int(event_data.get("reminder_minutes") or 0)
        if reminder_minutes > 0:
            alarm = Alarm()
            alarm.add("action", "DISPLAY")
            alarm.add("description", "Event reminder")
            alarm.add("trigger", dt.timedelta(minutes=-reminder_minutes))
            event.add_component(alarm)

        attendees = event_data.get("attendees", "")
        if attendees:
            for email in attendees.split(","):
                email = email.strip()
                if email:
                    event.add("attendee", f"mailto:{email}")

        now = dt.datetime.now(dt.UTC)
        event.add("created", now)
        event.add("dtstamp", now)
        event.add("last-modified", now)

        cal.add_component(event)
        return cal.to_ical().decode("utf-8")

    def _parse_ical_event(self, ical_text: str) -> Optional[Dict[str, Any]]:
        """Parse iCalendar text and extract event data.

        Only the first VEVENT component is read.

        Returns:
            A dict using the same keys :meth:`_create_ical_event` accepts
            (plus ``uid``), or ``None`` when there is no VEVENT or parsing
            fails (the error is logged).
        """
        try:
            cal = Calendar.from_ical(ical_text)
            for component in cal.walk():
                if component.name != "VEVENT":
                    continue

                event_data: Dict[str, Any] = {
                    "uid": str(component.get("uid", "")),
                    "title": str(component.get("summary", "")),
                    "description": str(component.get("description", "")),
                    "location": str(component.get("location", "")),
                    "status": str(component.get("status", "CONFIRMED")),
                    "priority": int(component.get("priority", 5)),
                    "privacy": str(component.get("class", "PUBLIC")),
                    "url": str(component.get("url", "")),
                }

                dtstart = component.get("dtstart")
                if dtstart:
                    start_value = dtstart.dt
                    event_data["start_datetime"] = start_value.isoformat()
                    # A plain date (not a datetime) marks an all-day event.
                    event_data["all_day"] = isinstance(
                        start_value, dt.date
                    ) and not isinstance(start_value, dt.datetime)

                dtend = component.get("dtend")
                if dtend:
                    event_data["end_datetime"] = dtend.dt.isoformat()

                categories = component.get("categories")
                if categories:
                    event_data["categories"] = self._extract_categories(categories)

                rrule = component.get("rrule")
                if rrule:
                    event_data["recurring"] = True
                    # Store RRULE text (e.g. "FREQ=WEEKLY") so that
                    # _create_ical_event() can feed it to vRecur.from_ical().
                    event_data["recurrence_rule"] = self._rrule_to_text(rrule)

                # A single ATTENDEE comes back as one str-like value and
                # several as a list; normalise before iterating so a lone
                # address is not walked character by character.
                raw_attendees = component.get("attendee")
                if raw_attendees is None:
                    raw_attendees = []
                elif not isinstance(raw_attendees, (list, tuple)):
                    raw_attendees = [raw_attendees]
                attendees = [self._strip_mailto(str(a)) for a in raw_attendees]
                if attendees:
                    event_data["attendees"] = ",".join(attendees)

                return event_data

            return None
        except Exception as e:
            logger.error("Error parsing iCalendar: %s", e)
            return None

    @staticmethod
    def _strip_mailto(address: str) -> str:
        """Remove a leading ``mailto:`` scheme (any letter case)."""
        if address[:7].lower() == "mailto:":
            return address[7:]
        return address

    @staticmethod
    def _rrule_to_text(rrule: Any) -> str:
        """Serialise a recurrence value to RRULE text.

        Uses the value's ``to_ical()`` when available; falls back to
        ``str()`` otherwise. If several RRULEs are present only the first
        is used.
        """
        if isinstance(rrule, list):
            rrule = rrule[0]
        to_ical = getattr(rrule, "to_ical", None)
        if to_ical is None:
            return str(rrule)
        raw = to_ical()
        return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)

    def _iter_category_names(self, categories_obj: Any) -> Iterator[str]:
        """Yield category names from a value, a ``cats`` holder or a nesting."""
        if hasattr(categories_obj, "cats"):
            for cat in categories_obj.cats:
                yield str(cat)
        elif isinstance(categories_obj, str):
            yield categories_obj
        elif hasattr(categories_obj, "__iter__"):
            # e.g. a list with one entry per CATEGORIES line
            for item in categories_obj:
                yield from self._iter_category_names(item)
        else:
            yield str(categories_obj)

    def _extract_categories(self, categories_obj) -> str:
        """Extract categories from icalendar object to string.

        Returns:
            The category names joined with ``", "``; ``""`` for a falsy
            input. If extraction raises, ``str(categories_obj)`` is returned.
        """
        if not categories_obj:
            return ""
        try:
            return ", ".join(self._iter_category_names(categories_obj))
        except Exception:
            # Fallback to string conversion
            return str(categories_obj)

    # ------------------------------------------------------------------
    # Search and filtering
    # ------------------------------------------------------------------

    async def search_events_across_calendars(
        self,
        start_datetime: Optional[dt.datetime] = None,
        end_datetime: Optional[dt.datetime] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Search events across all calendars with advanced filtering.

        Each calendar is queried through :meth:`get_calendar_events` with its
        default ``limit``. A calendar whose query fails is logged and
        skipped. Every returned event gets ``calendar_name`` and
        ``calendar_display_name``.

        Raises:
            Exception: Re-raised if listing the calendars fails.
        """
        try:
            calendars = await self.list_calendars()
            all_events: List[Dict[str, Any]] = []

            for calendar in calendars:
                try:
                    events = await self.get_calendar_events(
                        calendar["name"], start_datetime, end_datetime
                    )

                    if filters:
                        events = self._apply_event_filters(events, filters)

                    for event in events:
                        event["calendar_name"] = calendar["name"]
                        event["calendar_display_name"] = calendar.get(
                            "display_name", calendar["name"]
                        )

                    all_events.extend(events)
                except Exception as e:
                    logger.warning(
                        "Error getting events from calendar %s: %s",
                        calendar["name"],
                        e,
                    )
                    continue

            return all_events
        except Exception as e:
            logger.error("Error searching events across calendars: %s", e)
            raise

    def _apply_event_filters(
        self, events: List[Dict[str, Any]], filters: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Apply advanced filters to event list."""
        return [
            event for event in events if self._event_matches_filters(event, filters)
        ]

    def _event_matches_filters(
        self, event: Dict[str, Any], filters: Dict[str, Any]
    ) -> bool:
        """Check if an event matches the provided filters.

        Supported filter keys: ``min_attendees``, ``min_duration_minutes``,
        ``categories`` (list, or comma-separated string; any match
        suffices), ``status``, ``title_contains`` and ``location_contains``.
        Unknown keys are ignored. If evaluating the filters raises, the
        event is treated as matching.
        """
        try:
            if "min_attendees" in filters:
                attendees = event.get("attendees", "")
                attendee_count = len(attendees.split(",")) if attendees else 0
                if attendee_count < filters["min_attendees"]:
                    return False

            if "min_duration_minutes" in filters:
                start_str = event.get("start_datetime", "")
                end_str = event.get("end_datetime", "")
                if start_str and end_str:
                    try:
                        start_dt = self._parse_iso_datetime(start_str)
                        end_dt = self._parse_iso_datetime(end_str)
                        duration_minutes = (end_dt - start_dt).total_seconds() / 60
                        if duration_minutes < filters["min_duration_minutes"]:
                            return False
                    except (ValueError, TypeError, AttributeError):
                        # Unparseable or incomparable dates: the duration
                        # filter is skipped for this event.
                        pass

            if "categories" in filters:
                event_categories = event.get("categories", "").lower()
                wanted = filters["categories"]
                if isinstance(wanted, str):
                    # A bare string would otherwise be matched per character.
                    wanted = [part.strip() for part in wanted.split(",")]
                required_categories = [cat.lower() for cat in wanted]
                if not any(cat in event_categories for cat in required_categories):
                    return False

            if "status" in filters:
                if event.get("status", "").upper() != filters["status"].upper():
                    return False

            if "title_contains" in filters:
                title = event.get("title", "").lower()
                if filters["title_contains"].lower() not in title:
                    return False

            if "location_contains" in filters:
                location = event.get("location", "").lower()
                if filters["location_contains"].lower() not in location:
                    return False

            return True
        except Exception as e:
            # If filtering fails, include the event
            logger.debug("Filter evaluation failed, including event: %s", e)
            return True

    # ------------------------------------------------------------------
    # Availability
    # ------------------------------------------------------------------

    async def find_availability(
        self,
        duration_minutes: int,
        attendees: Optional[List[str]] = None,
        start_datetime: Optional[dt.datetime] = None,
        end_datetime: Optional[dt.datetime] = None,
        constraints: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Find available time slots for scheduling.

        Args:
            duration_minutes: Length of the wanted slot.
            attendees: When given, only events whose attendee string contains
                one of these values (case-insensitive) count as busy.
            start_datetime: Start of the search window; defaults to now.
            end_datetime: End of the search window; defaults to seven days
                after the start.
            constraints: Optional dict with ``business_hours_only``,
                ``exclude_weekends`` and ``preferred_times`` (``"HH:MM-HH:MM"``
                strings).

        Returns:
            Slot dicts as produced by :meth:`_generate_day_slots`.
        """
        try:
            if not start_datetime:
                start_datetime = dt.datetime.now()
            if not end_datetime:
                # Relative to the start so that a start in the future still
                # yields a forward-looking window.
                end_datetime = start_datetime + dt.timedelta(days=7)

            busy_events = await self.search_events_across_calendars(
                start_datetime=start_datetime, end_datetime=end_datetime
            )

            if attendees:
                wanted = [attendee.lower() for attendee in attendees]
                busy_events = [
                    event
                    for event in busy_events
                    if any(w in event.get("attendees", "").lower() for w in wanted)
                ]

            constraints = constraints or {}
            return self._generate_available_slots(
                busy_events,
                duration_minutes,
                start_datetime,
                end_datetime,
                constraints.get("business_hours_only", False),
                constraints.get("exclude_weekends", False),
                constraints.get("preferred_times", []),
            )
        except Exception as e:
            logger.error("Error finding availability: %s", e)
            raise

    def _generate_available_slots(
        self,
        busy_events: List[Dict[str, Any]],
        duration_minutes: int,
        start_datetime: dt.datetime,
        end_datetime: dt.datetime,
        business_hours_only: bool,
        exclude_weekends: bool,
        preferred_times: List[str],
        max_slots: int = 10,
    ) -> List[Dict[str, Any]]:
        """Generate available time slots.

        Walks day by day from the date of ``start_datetime`` through the
        date of ``end_datetime`` and stops as soon as ``max_slots`` slots
        have been collected. Slots that begin before ``start_datetime`` are
        not offered. Returns ``[]`` if generation raises (the error is
        logged).
        """
        available_slots: List[Dict[str, Any]] = []

        try:
            current_date = start_datetime.replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            end_date_dt = end_datetime.replace(
                hour=23, minute=59, second=59, microsecond=999999
            )

            while current_date <= end_date_dt and len(available_slots) < max_slots:
                # weekday() >= 5 is Saturday or Sunday
                if not (exclude_weekends and current_date.weekday() >= 5):
                    available_slots.extend(
                        self._generate_day_slots(
                            current_date,
                            busy_events,
                            duration_minutes,
                            business_hours_only,
                            preferred_times,
                            not_before=start_datetime,
                        )
                    )
                current_date += dt.timedelta(days=1)

            return available_slots[:max_slots]
        except Exception as e:
            logger.error("Error generating available slots: %s", e)
            return []

    def _generate_day_slots(
        self,
        date: dt.datetime,
        busy_events: List[Dict[str, Any]],
        duration_minutes: int,
        business_hours_only: bool,
        preferred_times: List[str],
        not_before: Optional[dt.datetime] = None,
    ) -> List[Dict[str, Any]]:
        """Generate available slots for a specific day.

        Candidate slots start every 30 minutes between 09:00-17:00
        (``business_hours_only``) or 08:00-20:00. A candidate is dropped when
        it overlaps any event that intersects this day, which includes
        all-day and multi-day events. It is also dropped when it starts
        before ``not_before`` or falls outside ``preferred_times``.

        Returns:
            Dicts with ``start_datetime``, ``end_datetime`` (ISO strings),
            ``duration_minutes`` and ``date``. ``[]`` if generation raises.
        """
        slots: List[Dict[str, Any]] = []

        try:
            start_hour, end_hour = (9, 17) if business_hours_only else (8, 20)

            day_start = date.replace(hour=0, minute=0, second=0, microsecond=0)
            next_day = day_start + dt.timedelta(days=1)

            # Busy intervals as full datetimes, made comparable with `date`.
            day_busy_periods = []
            for event in busy_events:
                try:
                    event_start = self._align_timezone(
                        self._parse_iso_datetime(event["start_datetime"]), date
                    )
                    event_end = self._align_timezone(
                        self._parse_iso_datetime(event["end_datetime"]), date
                    )
                except (KeyError, ValueError, TypeError, AttributeError):
                    continue  # Events without usable start/end are ignored
                if event_start < next_day and event_end > day_start:
                    day_busy_periods.append((event_start, event_end))

            day_busy_periods.sort()

            if not_before is not None:
                not_before = self._align_timezone(not_before, date)

            current_time = date.replace(
                hour=start_hour, minute=0, second=0, microsecond=0
            )
            end_time = date.replace(hour=end_hour, minute=0, second=0, microsecond=0)
            slot_duration = dt.timedelta(minutes=duration_minutes)
            step = dt.timedelta(minutes=30)  # 30-minute increments

            while current_time + slot_duration <= end_time:
                slot_end = current_time + slot_duration

                too_early = not_before is not None and current_time < not_before
                if (
                    not too_early
                    and not self._slot_conflicts(
                        current_time, slot_end, day_busy_periods
                    )
                    and (
                        not preferred_times
                        or self._slot_in_preferred_times(
                            current_time.time(), preferred_times
                        )
                    )
                ):
                    slots.append(
                        {
                            "start_datetime": current_time.isoformat(),
                            "end_datetime": slot_end.isoformat(),
                            "duration_minutes": duration_minutes,
                            "date": date.date().isoformat(),
                        }
                    )

                current_time += step

            return slots
        except Exception as e:
            logger.error("Error generating day slots: %s", e)
            return []

    def _slot_conflicts(self, slot_start, slot_end, busy_periods):
        """Check if a time slot conflicts with busy periods.

        Works for any mutually comparable values (times or datetimes). Two
        intervals that merely touch at an endpoint do not conflict.
        """
        return any(
            slot_start < busy_end and slot_end > busy_start
            for busy_start, busy_end in busy_periods
        )

    def _slot_in_preferred_times(self, slot_start, preferred_times):
        """Check if slot falls within preferred time ranges.

        ``preferred_times`` holds ``"HH:MM-HH:MM"`` strings; both bounds are
        inclusive. Malformed entries are skipped. An empty list accepts
        every slot.
        """
        if not preferred_times:
            return True

        for time_range in preferred_times:
            try:
                start_str, end_str = time_range.split("-")
                pref_start = dt.datetime.strptime(start_str.strip(), "%H:%M").time()
                pref_end = dt.datetime.strptime(end_str.strip(), "%H:%M").time()
            except (ValueError, AttributeError, TypeError):
                continue
            if pref_start <= slot_start <= pref_end:
                return True

        return False

    # ------------------------------------------------------------------
    # Bulk operations and calendar management
    # ------------------------------------------------------------------

    async def bulk_update_events(
        self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Bulk update events matching filter criteria.

        ``filter_criteria`` may contain ISO ``start_date`` / ``end_date``
        strings for the search window; the whole dict is also passed on as
        the event filters (see :meth:`_event_matches_filters`).

        Returns:
            Dict with ``total_found``, ``updated_count``, ``failed_count``
            and a per-event ``results`` list. A failed update is recorded
            there and does not abort the run.
        """
        try:
            start_datetime = None
            end_datetime = None
            if filter_criteria.get("start_date"):
                start_datetime = dt.datetime.fromisoformat(
                    filter_criteria["start_date"]
                )
            if filter_criteria.get("end_date"):
                end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"])

            events = await self.search_events_across_calendars(
                start_datetime=start_datetime,
                end_datetime=end_datetime,
                filters=filter_criteria,
            )

            updated_count = 0
            failed_count = 0
            results = []

            for event in events:
                try:
                    await self.update_event(
                        event["calendar_name"], event["uid"], update_data
                    )
                except Exception as e:
                    failed_count += 1
                    results.append(
                        {
                            "uid": event["uid"],
                            "status": "failed",
                            "error": str(e),
                            "title": event.get("title", ""),
                        }
                    )
                else:
                    updated_count += 1
                    results.append(
                        {
                            "uid": event["uid"],
                            "status": "updated",
                            "title": event.get("title", ""),
                        }
                    )

            return {
                "total_found": len(events),
                "updated_count": updated_count,
                "failed_count": failed_count,
                "results": results,
            }
        except Exception as e:
            logger.error("Error in bulk update: %s", e)
            raise

    async def create_calendar(
        self,
        calendar_name: str,
        display_name: str = "",
        description: str = "",
        color: str = "#1976D2",
    ) -> Dict[str, Any]:
        """Create a new calendar.

        Args:
            calendar_name: Path segment of the new calendar collection.
            display_name: Human-readable name; defaults to ``calendar_name``.
            description: Calendar description.
            color: Calendar colour.

        Returns:
            Dict echoing ``name``, ``display_name``, ``description`` and
            ``color`` plus the response ``status_code``.
        """
        try:
            calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"

            # Caller-supplied text is XML-escaped so characters such as
            # "&" or "<" cannot break the request document.
            mkcol_body = f"""<?xml version="1.0" encoding="utf-8"?>
<c:mkcalendar xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav" xmlns:cs="http://calendarserver.org/ns/">
  <d:set>
    <d:prop>
      <d:displayname>{xml_escape(display_name or calendar_name)}</d:displayname>
      <cs:calendar-color>{xml_escape(color)}</cs:calendar-color>
      <c:calendar-description>{xml_escape(description)}</c:calendar-description>
    </d:prop>
  </d:set>
</c:mkcalendar>"""

            headers = {"Content-Type": "application/xml", "Depth": "0"}

            response = await self._make_request(
                "MKCALENDAR", calendar_path, content=mkcol_body, headers=headers
            )

            logger.debug("Created calendar: %s", calendar_name)
            return {
                "name": calendar_name,
                "display_name": display_name or calendar_name,
                "description": description,
                "color": color,
                "status_code": response.status_code,
            }
        except Exception as e:
            logger.error("Error creating calendar %s: %s", calendar_name, e)
            raise

    async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]:
        """Delete a calendar.

        Returns:
            ``{"status_code": <code>}`` of the DELETE response.
        """
        try:
            calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
            response = await self._make_request("DELETE", calendar_path)
            logger.debug("Deleted calendar: %s", calendar_name)
            return {"status_code": response.status_code}
        except Exception as e:
            logger.error("Error deleting calendar %s: %s", calendar_name, e)
            raise