Files
2025-08-01 12:21:32 +02:00

952 lines
35 KiB
Python

"""CalDAV client for NextCloud calendar operations."""
import datetime as dt
import logging
import uuid
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Tuple
from httpx import HTTPStatusError
from icalendar import Alarm, Calendar
from icalendar import Event as ICalEvent
from icalendar import vRecur
from .base import BaseNextcloudClient
logger = logging.getLogger(__name__)
class CalendarClient(BaseNextcloudClient):
"""Client for NextCloud CalDAV calendar operations."""
def _get_caldav_base_path(self) -> str:
"""Helper to get the base CalDAV path for calendars."""
return f"/remote.php/dav/calendars/{self.username}"
def _get_principals_path(self) -> str:
"""Helper to get the principals path for the user."""
return f"/remote.php/dav/principals/users/{self.username}"
async def list_calendars(self) -> List[Dict[str, Any]]:
"""List all available calendars for the user."""
caldav_path = self._get_caldav_base_path()
propfind_body = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav" xmlns:cs="http://calendarserver.org/ns/">
<d:prop>
<d:displayname/>
<d:resourcetype/>
<c:calendar-description/>
<cs:calendar-color/>
<c:supported-calendar-component-set/>
</d:prop>
</d:propfind>"""
headers = {
"Depth": "1",
"Content-Type": "application/xml",
"Accept": "application/xml",
}
response = await self._make_request(
"PROPFIND", caldav_path, content=propfind_body, headers=headers
)
# Parse XML response
root = ET.fromstring(response.content)
calendars = []
for response_elem in root.findall(".//{DAV:}response"):
href = response_elem.find(".//{DAV:}href")
if href is None:
continue
href_text = href.text or ""
if not href_text.endswith("/"):
continue # Skip non-calendar resources
# Extract calendar name from href
calendar_name = href_text.rstrip("/").split("/")[-1]
if not calendar_name or calendar_name == self.username:
continue
# Get properties
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
continue
prop = propstat.find(".//{DAV:}prop")
if prop is None:
continue
# Check if it's a calendar resource
resourcetype = prop.find(".//{DAV:}resourcetype")
is_calendar = (
resourcetype is not None
and resourcetype.find(".//{urn:ietf:params:xml:ns:caldav}calendar")
is not None
)
if not is_calendar:
continue
# Extract calendar properties
displayname_elem = prop.find(".//{DAV:}displayname")
displayname = (
displayname_elem.text if displayname_elem is not None else calendar_name
)
description_elem = prop.find(
".//{urn:ietf:params:xml:ns:caldav}calendar-description"
)
description = description_elem.text if description_elem is not None else ""
color_elem = prop.find(".//{http://calendarserver.org/ns/}calendar-color")
color = color_elem.text if color_elem is not None else "#1976D2"
calendars.append(
{
"name": calendar_name,
"display_name": displayname,
"description": description,
"color": color,
"href": href_text,
}
)
logger.debug(f"Found {len(calendars)} calendars")
return calendars
async def get_calendar_events(
self,
calendar_name: str,
start_datetime: Optional[dt.datetime] = None,
end_datetime: Optional[dt.datetime] = None,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""List events in a calendar within date range."""
calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
# Build time range filter if dates provided
time_range_filter = ""
if start_datetime or end_datetime:
# Convert datetime objects to CalDAV format (YYYYMMDDTHHMMSSZ)
start_dt = (
start_datetime.strftime("%Y%m%dT%H%M%SZ")
if start_datetime
else "19700101T000000Z"
)
end_dt = (
end_datetime.strftime("%Y%m%dT%H%M%SZ")
if end_datetime
else "20301231T235959Z"
)
time_range_filter = f"""
<c:time-range start="{start_dt}" end="{end_dt}"/>
"""
report_body = f"""<?xml version="1.0" encoding="utf-8"?>
<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop>
<d:getetag/>
<c:calendar-data/>
</d:prop>
<c:filter>
<c:comp-filter name="VCALENDAR">
<c:comp-filter name="VEVENT">
{time_range_filter}
</c:comp-filter>
</c:comp-filter>
</c:filter>
</c:calendar-query>"""
headers = {
"Depth": "1",
"Content-Type": "application/xml",
"Accept": "application/xml",
}
response = await self._make_request(
"REPORT", calendar_path, content=report_body, headers=headers
)
# Parse XML response and extract events
root = ET.fromstring(response.content)
events = []
for response_elem in root.findall(".//{DAV:}response"):
href = response_elem.find(".//{DAV:}href")
if href is None:
continue
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
continue
prop = propstat.find(".//{DAV:}prop")
if prop is None:
continue
calendar_data = prop.find(".//{urn:ietf:params:xml:ns:caldav}calendar-data")
etag_elem = prop.find(".//{DAV:}getetag")
if calendar_data is not None and calendar_data.text:
event_data = self._parse_ical_event(calendar_data.text)
if event_data:
event_data["href"] = href.text
event_data["etag"] = etag_elem.text if etag_elem is not None else ""
events.append(event_data)
if len(events) >= limit:
break
logger.debug(f"Found {len(events)} events")
return events
async def create_event(
self, calendar_name: str, event_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Create a new calendar event with comprehensive features."""
event_uid = str(uuid.uuid4())
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
# Create iCalendar event
ical_content = self._create_ical_event(event_data, event_uid)
headers = {
"Content-Type": "text/calendar; charset=utf-8",
"If-None-Match": "*", # Ensure we're creating, not updating
}
response = await self._make_request(
"PUT", event_path, content=ical_content, headers=headers
)
logger.debug(f"Created event {event_uid}")
return {
"uid": event_uid,
"href": event_path,
"etag": response.headers.get("etag", ""),
"status_code": response.status_code,
}
async def update_event(
self,
calendar_name: str,
event_uid: str,
event_data: Dict[str, Any],
etag: str = "",
) -> Dict[str, Any]:
"""Update an existing calendar event."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
# Get existing event data to merge with updates
existing_event_data = {}
if not etag:
try:
existing_event_data, current_etag = await self.get_event(
calendar_name, event_uid
)
etag = current_etag
except Exception:
# Continue without etag if we can't get it
pass
# Merge existing data with new data (new data takes precedence)
merged_data = {**existing_event_data, **event_data}
# Create updated iCalendar event
ical_content = self._create_ical_event(merged_data, event_uid)
headers = {
"Content-Type": "text/calendar; charset=utf-8",
}
if etag:
headers["If-Match"] = etag
try:
response = await self._make_request(
"PUT", event_path, content=ical_content, headers=headers
)
logger.debug(f"Updated event {event_uid}")
return {
"uid": event_uid,
"href": event_path,
"etag": response.headers.get("etag", ""),
"status_code": response.status_code,
}
except HTTPStatusError as e:
logger.error(f"HTTP error updating event: {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error updating event: {e}")
raise e
async def delete_event(self, calendar_name: str, event_uid: str) -> Dict[str, Any]:
"""Delete a calendar event."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
try:
response = await self._make_request("DELETE", event_path)
logger.debug(f"Deleted event {event_uid}")
return {"status_code": response.status_code}
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"Event {event_uid} not found")
return {"status_code": 404}
logger.error(f"HTTP error deleting event: {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error deleting event: {e}")
raise e
async def get_event(
self, calendar_name: str, event_uid: str
) -> Tuple[Dict[str, Any], str]:
"""Get detailed information about a specific event."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
headers = {"Accept": "text/calendar"}
try:
response = await self._make_request("GET", event_path, headers=headers)
etag = response.headers.get("etag", "")
event_data = self._parse_ical_event(response.text)
if not event_data:
raise ValueError(f"Failed to parse event data for {event_uid}")
event_data["href"] = event_path
event_data["etag"] = etag
logger.debug(f"Retrieved event {event_uid}")
return event_data, etag
except HTTPStatusError as e:
logger.error(f"HTTP error getting event: {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error getting event: {e}")
raise e
def _create_ical_event(self, event_data: Dict[str, Any], event_uid: str) -> str:
"""Create iCalendar content from event data."""
cal = Calendar()
cal.add("prodid", "-//NextCloud MCP Server//EN")
cal.add("version", "2.0")
event = ICalEvent()
event.add("uid", event_uid)
event.add("summary", event_data.get("title", ""))
event.add("description", event_data.get("description", ""))
event.add("location", event_data.get("location", ""))
# Handle dates/times
start_str = event_data.get("start_datetime", "")
end_str = event_data.get("end_datetime", "")
all_day = event_data.get("all_day", False)
if start_str: # Only parse if start_datetime is provided
if all_day:
start_date = dt.datetime.fromisoformat(start_str.split("T")[0]).date()
event.add("dtstart", start_date)
if end_str:
end_date = dt.datetime.fromisoformat(end_str.split("T")[0]).date()
event.add("dtend", end_date)
else:
start_dt = dt.datetime.fromisoformat(start_str.replace("Z", "+00:00"))
event.add("dtstart", start_dt)
if end_str:
end_dt = dt.datetime.fromisoformat(end_str.replace("Z", "+00:00"))
event.add("dtend", end_dt)
# Add categories
categories = event_data.get("categories", "")
if categories:
event.add("categories", categories.split(","))
# Add priority and status
priority = event_data.get("priority", 5)
event.add("priority", priority)
status = event_data.get("status", "CONFIRMED")
event.add("status", status)
# Add privacy classification
privacy = event_data.get("privacy", "PUBLIC")
event.add("class", privacy)
# Add URL
url = event_data.get("url", "")
if url:
event.add("url", url)
# Handle recurrence
recurring = event_data.get("recurring", False)
if recurring:
recurrence_rule = event_data.get("recurrence_rule", "")
if recurrence_rule:
event.add("rrule", vRecur.from_ical(recurrence_rule))
# Add alarms/reminders
reminder_minutes = event_data.get("reminder_minutes", 0)
if reminder_minutes > 0:
alarm = Alarm()
alarm.add("action", "DISPLAY")
alarm.add("description", "Event reminder")
alarm.add("trigger", dt.timedelta(minutes=-reminder_minutes))
event.add_component(alarm)
# Add attendees
attendees = event_data.get("attendees", "")
if attendees:
for email in attendees.split(","):
if email.strip():
event.add("attendee", f"mailto:{email.strip()}")
# Add timestamps
now = dt.datetime.now(dt.UTC)
event.add("created", now)
event.add("dtstamp", now)
event.add("last-modified", now)
cal.add_component(event)
return cal.to_ical().decode("utf-8")
def _parse_ical_event(self, ical_text: str) -> Optional[Dict[str, Any]]:
"""Parse iCalendar text and extract event data."""
try:
cal = Calendar.from_ical(ical_text)
for component in cal.walk():
if component.name == "VEVENT":
event_data = {
"uid": str(component.get("uid", "")),
"title": str(component.get("summary", "")),
"description": str(component.get("description", "")),
"location": str(component.get("location", "")),
"status": str(component.get("status", "CONFIRMED")),
"priority": int(component.get("priority", 5)),
"privacy": str(component.get("class", "PUBLIC")),
"url": str(component.get("url", "")),
}
# Handle dates
dtstart = component.get("dtstart")
if dtstart:
if isinstance(dtstart.dt, dt.date) and not isinstance(
dtstart.dt, dt.datetime
):
event_data["start_datetime"] = dtstart.dt.isoformat()
event_data["all_day"] = True
else:
event_data["start_datetime"] = dtstart.dt.isoformat()
event_data["all_day"] = False
dtend = component.get("dtend")
if dtend:
if isinstance(dtend.dt, dt.date) and not isinstance(
dtend.dt, dt.datetime
):
event_data["end_datetime"] = dtend.dt.isoformat()
else:
event_data["end_datetime"] = dtend.dt.isoformat()
# Handle categories
categories = component.get("categories")
if categories:
event_data["categories"] = self._extract_categories(categories)
# Handle recurrence
rrule = component.get("rrule")
if rrule:
event_data["recurring"] = True
event_data["recurrence_rule"] = str(rrule)
# Handle attendees
attendees = []
for attendee in component.get("attendee", []):
if isinstance(attendee, list):
attendees.extend(
str(a).replace("mailto:", "") for a in attendee
)
else:
attendees.append(str(attendee).replace("mailto:", ""))
if attendees:
event_data["attendees"] = ",".join(attendees)
return event_data
return None
except Exception as e:
logger.error(f"Error parsing iCalendar: {e}")
return None
def _extract_categories(self, categories_obj) -> str:
"""Extract categories from icalendar object to string."""
if not categories_obj:
return ""
try:
# Handle icalendar vCategory objects
if hasattr(categories_obj, "cats"):
# vCategory object has a 'cats' attribute that's a list
return ", ".join(str(cat) for cat in categories_obj.cats)
elif hasattr(categories_obj, "__iter__") and not isinstance(
categories_obj, str
):
# Handle lists or other iterables
return ", ".join(str(cat) for cat in categories_obj)
else:
# Handle strings or other objects
return str(categories_obj)
except Exception:
# Fallback to string conversion
return str(categories_obj)
async def search_events_across_calendars(
self,
start_datetime: Optional[dt.datetime] = None,
end_datetime: Optional[dt.datetime] = None,
filters: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""Search events across all calendars with advanced filtering."""
try:
calendars = await self.list_calendars()
all_events = []
for calendar in calendars:
try:
events = await self.get_calendar_events(
calendar["name"], start_datetime, end_datetime
)
# Apply filters if provided
if filters:
events = self._apply_event_filters(events, filters)
# Add calendar info to each event
for event in events:
event["calendar_name"] = calendar["name"]
event["calendar_display_name"] = calendar.get(
"display_name", calendar["name"]
)
all_events.extend(events)
except Exception as e:
logger.warning(
f"Error getting events from calendar {calendar['name']}: {e}"
)
continue
return all_events
except Exception as e:
logger.error(f"Error searching events across calendars: {e}")
raise
def _apply_event_filters(
self, events: List[Dict[str, Any]], filters: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Apply advanced filters to event list."""
filtered_events = []
for event in events:
# Skip if event doesn't match filters
if not self._event_matches_filters(event, filters):
continue
filtered_events.append(event)
return filtered_events
def _event_matches_filters(
self, event: Dict[str, Any], filters: Dict[str, Any]
) -> bool:
"""Check if an event matches the provided filters."""
try:
# Filter by minimum attendees
if "min_attendees" in filters:
attendees = event.get("attendees", "")
attendee_count = len(attendees.split(",")) if attendees else 0
if attendee_count < filters["min_attendees"]:
return False
# Filter by minimum duration
if "min_duration_minutes" in filters:
start_str = event.get("start_datetime", "")
end_str = event.get("end_datetime", "")
if start_str and end_str:
try:
start_dt = dt.datetime.fromisoformat(
start_str.replace("Z", "+00:00")
)
end_dt = dt.datetime.fromisoformat(
end_str.replace("Z", "+00:00")
)
duration_minutes = (end_dt - start_dt).total_seconds() / 60
if duration_minutes < filters["min_duration_minutes"]:
return False
except Exception:
pass
# Filter by categories
if "categories" in filters:
event_categories = event.get("categories", "").lower()
required_categories = [cat.lower() for cat in filters["categories"]]
if not any(cat in event_categories for cat in required_categories):
return False
# Filter by status
if "status" in filters:
if event.get("status", "").upper() != filters["status"].upper():
return False
# Filter by title contains
if "title_contains" in filters:
title = event.get("title", "").lower()
search_term = filters["title_contains"].lower()
if search_term not in title:
return False
# Filter by location contains
if "location_contains" in filters:
location = event.get("location", "").lower()
search_term = filters["location_contains"].lower()
if search_term not in location:
return False
return True
except Exception:
# If filtering fails, include the event
return True
async def find_availability(
self,
duration_minutes: int,
attendees: Optional[List[str]] = None,
start_datetime: Optional[dt.datetime] = None,
end_datetime: Optional[dt.datetime] = None,
constraints: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""Find available time slots for scheduling."""
try:
# Set default date range if not provided
if not start_datetime:
start_datetime = dt.datetime.now()
if not end_datetime:
end_datetime = dt.datetime.now() + dt.timedelta(days=7)
# Get all events in the date range
busy_events = await self.search_events_across_calendars(
start_datetime=start_datetime, end_datetime=end_datetime
)
# Filter events for relevant attendees if specified
if attendees:
relevant_events = []
for event in busy_events:
event_attendees = event.get("attendees", "").lower()
if any(
attendee.lower() in event_attendees for attendee in attendees
):
relevant_events.append(event)
busy_events = relevant_events
# Apply constraints
constraints = constraints or {}
business_hours_only = constraints.get("business_hours_only", False)
exclude_weekends = constraints.get("exclude_weekends", False)
preferred_times = constraints.get("preferred_times", [])
# Generate time slots
available_slots = self._generate_available_slots(
busy_events,
duration_minutes,
start_datetime,
end_datetime,
business_hours_only,
exclude_weekends,
preferred_times,
)
return available_slots
except Exception as e:
logger.error(f"Error finding availability: {e}")
raise
def _generate_available_slots(
self,
busy_events: List[Dict[str, Any]],
duration_minutes: int,
start_datetime: dt.datetime,
end_datetime: dt.datetime,
business_hours_only: bool,
exclude_weekends: bool,
preferred_times: List[str],
) -> List[Dict[str, Any]]:
"""Generate available time slots."""
available_slots = []
try:
current_date = start_datetime.replace(
hour=0, minute=0, second=0, microsecond=0
)
end_date_dt = end_datetime.replace(
hour=23, minute=59, second=59, microsecond=999999
)
while current_date <= end_date_dt:
# Skip weekends if requested
if exclude_weekends and current_date.weekday() >= 5:
current_date += dt.timedelta(days=1)
continue
# Generate slots for this day
day_slots = self._generate_day_slots(
current_date,
busy_events,
duration_minutes,
business_hours_only,
preferred_times,
)
available_slots.extend(day_slots)
current_date += dt.timedelta(days=1)
return available_slots[:10] # Limit to 10 slots
except Exception as e:
logger.error(f"Error generating available slots: {e}")
return []
def _generate_day_slots(
self,
date: dt.datetime,
busy_events: List[Dict[str, Any]],
duration_minutes: int,
business_hours_only: bool,
preferred_times: List[str],
) -> List[Dict[str, Any]]:
"""Generate available slots for a specific day."""
slots = []
try:
# Define working hours
if business_hours_only:
start_hour, end_hour = 9, 17
else:
start_hour, end_hour = 8, 20
# Get busy periods for this day
day_busy_periods = []
for event in busy_events:
try:
event_start = dt.datetime.fromisoformat(
event["start_datetime"].replace("Z", "+00:00")
)
event_end = dt.datetime.fromisoformat(
event["end_datetime"].replace("Z", "+00:00")
)
# Check if event is on this day
if event_start.date() == date.date():
day_busy_periods.append((event_start.time(), event_end.time()))
except Exception:
continue
# Sort busy periods
day_busy_periods.sort()
# Generate potential slots
current_time = date.replace(
hour=start_hour, minute=0, second=0, microsecond=0
)
end_time = date.replace(hour=end_hour, minute=0, second=0, microsecond=0)
slot_duration = dt.timedelta(minutes=duration_minutes)
while current_time + slot_duration <= end_time:
slot_end = current_time + slot_duration
# Check if slot conflicts with any busy period
if not self._slot_conflicts(
current_time.time(), slot_end.time(), day_busy_periods
):
# Check preferred times if specified
if not preferred_times or self._slot_in_preferred_times(
current_time.time(), preferred_times
):
slots.append(
{
"start_datetime": current_time.isoformat(),
"end_datetime": slot_end.isoformat(),
"duration_minutes": duration_minutes,
"date": date.date().isoformat(),
}
)
current_time += dt.timedelta(minutes=30) # 30-minute increments
return slots
except Exception as e:
logger.error(f"Error generating day slots: {e}")
return []
def _slot_conflicts(self, slot_start, slot_end, busy_periods):
"""Check if a time slot conflicts with busy periods."""
for busy_start, busy_end in busy_periods:
if slot_start < busy_end and slot_end > busy_start:
return True
return False
def _slot_in_preferred_times(self, slot_start, preferred_times):
"""Check if slot falls within preferred time ranges."""
if not preferred_times:
return True
for time_range in preferred_times:
try:
start_str, end_str = time_range.split("-")
pref_start = dt.datetime.strptime(start_str, "%H:%M").time()
pref_end = dt.datetime.strptime(end_str, "%H:%M").time()
if pref_start <= slot_start <= pref_end:
return True
except Exception:
continue
return False
async def bulk_update_events(
self, filter_criteria: Dict[str, Any], update_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Bulk update events matching filter criteria."""
try:
# Convert string dates to datetime objects if present
start_datetime = None
end_datetime = None
if "start_date" in filter_criteria and filter_criteria["start_date"]:
start_datetime = dt.datetime.fromisoformat(
filter_criteria["start_date"]
)
if "end_date" in filter_criteria and filter_criteria["end_date"]:
end_datetime = dt.datetime.fromisoformat(filter_criteria["end_date"])
# Find events matching criteria
events = await self.search_events_across_calendars(
start_datetime=start_datetime,
end_datetime=end_datetime,
filters=filter_criteria,
)
updated_count = 0
failed_count = 0
results = []
for event in events:
try:
# Update the event
await self.update_event(
event["calendar_name"], event["uid"], update_data
)
updated_count += 1
results.append(
{
"uid": event["uid"],
"status": "updated",
"title": event.get("title", ""),
}
)
except Exception as e:
failed_count += 1
results.append(
{
"uid": event["uid"],
"status": "failed",
"error": str(e),
"title": event.get("title", ""),
}
)
return {
"total_found": len(events),
"updated_count": updated_count,
"failed_count": failed_count,
"results": results,
}
except Exception as e:
logger.error(f"Error in bulk update: {e}")
raise
async def create_calendar(
self,
calendar_name: str,
display_name: str = "",
description: str = "",
color: str = "#1976D2",
) -> Dict[str, Any]:
"""Create a new calendar."""
try:
# Calendar creation via CalDAV MKCALENDAR
calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
# Create MKCALENDAR body
mkcol_body = f"""<?xml version="1.0" encoding="utf-8"?>
<mkcalendar xmlns="urn:ietf:params:xml:ns:caldav" xmlns:d="DAV:" xmlns:cs="http://calendarserver.org/ns/">
<d:set>
<d:prop>
<d:displayname>{display_name or calendar_name}</d:displayname>
<cs:calendar-color>{color}</cs:calendar-color>
<caldav:calendar-description xmlns:caldav="urn:ietf:params:xml:ns:caldav">{description}</caldav:calendar-description>
<caldav:supported-calendar-component-set xmlns:caldav="urn:ietf:params:xml:ns:caldav">
<caldav:comp name="VEVENT"/>
</caldav:supported-calendar-component-set>
</d:prop>
</d:set>
</mkcalendar>"""
headers = {"Content-Type": "application/xml", "Depth": "0"}
response = await self._make_request(
"MKCALENDAR", calendar_path, content=mkcol_body, headers=headers
)
logger.debug(f"Created calendar: {calendar_name}")
return {
"name": calendar_name,
"display_name": display_name or calendar_name,
"description": description,
"color": color,
"status_code": response.status_code,
}
except Exception as e:
logger.error(f"Error creating calendar {calendar_name}: {e}")
raise
async def delete_calendar(self, calendar_name: str) -> Dict[str, Any]:
"""Delete a calendar."""
try:
calendar_path = f"{self._get_caldav_base_path()}/{calendar_name}/"
response = await self._make_request("DELETE", calendar_path)
logger.debug(f"Deleted calendar: {calendar_name}")
return {"status_code": response.status_code}
except Exception as e:
logger.error(f"Error deleting calendar {calendar_name}: {e}")
raise