feat(client): Preserve fields when modifying contacts/calendar resources

This commit is contained in:
Chris Coutinho
2025-08-30 19:14:26 +02:00
parent 1cc65f0160
commit 4cf5f2a95a
4 changed files with 790 additions and 11 deletions
+136 -11
View File
@@ -238,27 +238,33 @@ class CalendarClient(BaseNextcloudClient):
event_data: Dict[str, Any],
etag: str = "",
) -> Dict[str, Any]:
"""Update an existing calendar event."""
"""Update an existing calendar event while preserving all existing properties."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
# Get existing event data to merge with updates
existing_event_data = {}
# Get raw iCal content to preserve all properties including extended ones
raw_ical_content = ""
if not etag:
try:
existing_event_data, current_etag = await self.get_event(
raw_ical_content, current_etag = await self._get_raw_ical(
calendar_name, event_uid
)
etag = current_etag
except Exception:
# Continue without etag if we can't get it
pass
# Fall back to creating new iCal if we can't get existing
logger.warning(
f"Could not fetch existing iCal for {event_uid}, creating new"
)
raw_ical_content = ""
# Merge existing data with new data (new data takes precedence)
merged_data = {**existing_event_data, **event_data}
# Create updated iCalendar event
ical_content = self._create_ical_event(merged_data, event_uid)
# Create updated iCalendar event preserving existing properties
if raw_ical_content:
ical_content = self._merge_ical_properties(
raw_ical_content, event_data, event_uid
)
else:
# Fallback to creating new iCal if we couldn't get existing
ical_content = self._create_ical_event(event_data, event_uid)
headers = {
"Content-Type": "text/calendar; charset=utf-8",
@@ -949,3 +955,122 @@ class CalendarClient(BaseNextcloudClient):
except Exception as e:
logger.error(f"Error deleting calendar {calendar_name}: {e}")
raise
async def _get_raw_ical(
self, calendar_name: str, event_uid: str
) -> Tuple[str, str]:
"""Get raw iCal content for an event without parsing."""
event_filename = f"{event_uid}.ics"
event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}"
headers = {"Accept": "text/calendar"}
try:
response = await self._make_request("GET", event_path, headers=headers)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw iCal for {event_uid}: {e}")
raise
def _merge_ical_properties(
self, raw_ical: str, event_data: Dict[str, Any], event_uid: str
) -> str:
"""Merge new event data into existing raw iCal while preserving all properties."""
try:
# Parse existing iCal
cal = Calendar.from_ical(raw_ical)
# Find the VEVENT component
for component in cal.walk():
if component.name == "VEVENT":
# Update only the properties that were provided in event_data
if "title" in event_data:
component["SUMMARY"] = event_data["title"]
if "description" in event_data:
component["DESCRIPTION"] = event_data["description"]
if "location" in event_data:
component["LOCATION"] = event_data["location"]
if "status" in event_data:
component["STATUS"] = event_data["status"].upper()
if "priority" in event_data:
component["PRIORITY"] = event_data["priority"]
if "privacy" in event_data:
component["CLASS"] = event_data["privacy"].upper()
if "url" in event_data:
component["URL"] = event_data["url"]
# Handle dates
if "start_datetime" in event_data:
start_str = event_data["start_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
start_date = dt.datetime.fromisoformat(
start_str.split("T")[0]
).date()
component["DTSTART"] = start_date
else:
start_dt = dt.datetime.fromisoformat(
start_str.replace("Z", "+00:00")
)
component["DTSTART"] = start_dt
if "end_datetime" in event_data:
end_str = event_data["end_datetime"]
all_day = event_data.get("all_day", False)
if all_day:
end_date = dt.datetime.fromisoformat(
end_str.split("T")[0]
).date()
component["DTEND"] = end_date
else:
end_dt = dt.datetime.fromisoformat(
end_str.replace("Z", "+00:00")
)
component["DTEND"] = end_dt
# Handle categories
if "categories" in event_data:
categories = event_data["categories"]
if categories:
component["CATEGORIES"] = categories.split(",")
# Handle recurrence
if "recurring" in event_data:
if event_data["recurring"] and "recurrence_rule" in event_data:
recurrence_rule = event_data["recurrence_rule"]
if recurrence_rule:
component["RRULE"] = vRecur.from_ical(recurrence_rule)
elif not event_data["recurring"]:
# Remove recurrence if set to False
if "RRULE" in component:
del component["RRULE"]
# Handle attendees
if "attendees" in event_data:
attendees = event_data["attendees"]
# Remove existing attendees
component.pop("ATTENDEE", None)
if attendees:
for email in attendees.split(","):
if email.strip():
component.add("ATTENDEE", f"mailto:{email.strip()}")
# Update timestamps in proper iCal format
from icalendar import vDDDTypes
now = dt.datetime.now(dt.UTC)
component["LAST-MODIFIED"] = vDDDTypes(now)
component["DTSTAMP"] = vDDDTypes(now)
# Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.)
# by not touching them - they remain in the component
break
return cal.to_ical().decode("utf-8")
except Exception as e:
logger.error(f"Error merging iCal properties: {e}")
# Fallback to creating new iCal
return self._create_ical_event(event_data, event_uid)
+201
View File
@@ -143,6 +143,50 @@ class ContactsClient(BaseNextcloudClient):
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
await self._make_request("DELETE", url)
async def update_contact(
self, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
# Get raw vCard content to preserve all properties including extended ones
raw_vcard_content = ""
if not etag:
try:
raw_vcard_content, current_etag = await self._get_raw_vcard(
addressbook, uid
)
etag = current_etag
except Exception:
# Fall back to creating new vCard if we can't get existing
logger.warning(
f"Could not fetch existing vCard for {uid}, creating new"
)
raw_vcard_content = ""
# Create updated vCard preserving existing properties
if raw_vcard_content:
vcard_content = self._merge_vcard_properties(
raw_vcard_content, contact_data, uid
)
else:
# Fallback to creating new vCard if we couldn't get existing
contact = Contact(fn=contact_data.get("fn"), uid=uid)
if "email" in contact_data:
contact.email = [{"value": contact_data["email"], "type": ["HOME"]}]
if "tel" in contact_data:
contact.tel = [{"value": contact_data["tel"], "type": ["HOME"]}]
vcard_content = contact.to_vcard()
headers = {
"Content-Type": "text/vcard; charset=utf-8",
}
if etag:
headers["If-Match"] = etag
await self._make_request("PUT", url, content=vcard_content, headers=headers)
async def list_contacts(self, *, addressbook: str):
"""List all available contacts for addressbook."""
@@ -233,3 +277,160 @@ class ContactsClient(BaseNextcloudClient):
logger.debug(f"Found {len(contacts)} contacts")
return contacts
async def _get_raw_vcard(self, addressbook: str, uid: str) -> tuple[str, str]:
"""Get raw vCard content for a contact without parsing."""
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
try:
response = await self._make_request("GET", url)
etag = response.headers.get("etag", "")
return response.text, etag
except Exception as e:
logger.error(f"Error getting raw vCard for {uid}: {e}")
raise
def _merge_vcard_properties(
self, raw_vcard: str, contact_data: dict, uid: str
) -> str:
"""Merge new contact data into existing raw vCard while preserving all properties."""
try:
# Instead of using pythonvCard4 which has formatting issues,
# let's do a simple text-based merge to preserve exact formatting
# Start with the original vCard
lines = raw_vcard.strip().split("\n")
updated_lines = []
# Track what we've updated to avoid duplicates
updated_properties = set()
for line in lines:
line = line.strip()
if not line:
continue
# Skip the END:VCARD line for now
if line == "END:VCARD":
continue
property_name = line.split(":")[0].split(";")[0]
# Handle updates for specific properties
if property_name == "FN" and "fn" in contact_data:
updated_lines.append(f"FN:{contact_data['fn']}")
updated_properties.add("fn")
elif property_name == "EMAIL" and "email" in contact_data:
# Replace first email with new one, preserve others
if "email" not in updated_properties:
if isinstance(contact_data["email"], str):
# Try to preserve the original format as much as possible
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"EMAIL;TYPE={type_part}:{contact_data['email']}"
)
else:
updated_lines.append(f"EMAIL:{contact_data['email']}")
updated_properties.add("email")
else:
# Keep additional emails unchanged
updated_lines.append(line)
elif property_name == "TEL" and "tel" in contact_data:
# Similar handling for phone numbers
if "tel" not in updated_properties:
if isinstance(contact_data["tel"], str):
if ";TYPE=" in line:
type_part = line.split(";TYPE=")[1].split(":")[0]
updated_lines.append(
f"TEL;TYPE={type_part}:{contact_data['tel']}"
)
else:
updated_lines.append(f"TEL:{contact_data['tel']}")
updated_properties.add("tel")
else:
# Keep additional phone numbers unchanged
updated_lines.append(line)
elif property_name == "NOTE" and "note" in contact_data:
updated_lines.append(f"NOTE:{contact_data['note']}")
updated_properties.add("note")
elif property_name == "NICKNAME" and "nickname" in contact_data:
nickname_value = contact_data["nickname"]
if isinstance(nickname_value, list):
nickname_value = ",".join(nickname_value)
updated_lines.append(f"NICKNAME:{nickname_value}")
updated_properties.add("nickname")
elif property_name == "BDAY" and "bday" in contact_data:
updated_lines.append(f"BDAY:{contact_data['bday']}")
updated_properties.add("bday")
elif property_name == "CATEGORIES" and "categories" in contact_data:
categories_value = contact_data["categories"]
if isinstance(categories_value, list):
categories_value = ",".join(categories_value)
updated_lines.append(f"CATEGORIES:{categories_value}")
updated_properties.add("categories")
elif property_name == "ORG" and (
"org" in contact_data or "organization" in contact_data
):
org_value = contact_data.get("org") or contact_data.get(
"organization"
)
updated_lines.append(f"ORG:{org_value}")
updated_properties.add("org")
elif property_name == "TITLE" and "title" in contact_data:
updated_lines.append(f"TITLE:{contact_data['title']}")
updated_properties.add("title")
else:
# Keep all other properties unchanged (preserves all extended/custom fields)
updated_lines.append(line)
# Add any new properties that weren't in the original vCard
for key, value in contact_data.items():
if key not in updated_properties:
if key == "fn":
updated_lines.append(f"FN:{value}")
elif key == "email" and isinstance(value, str):
updated_lines.append(f"EMAIL:{value}")
elif key == "tel" and isinstance(value, str):
updated_lines.append(f"TEL:{value}")
elif key == "note":
updated_lines.append(f"NOTE:{value}")
elif key == "nickname":
nickname_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"NICKNAME:{nickname_value}")
elif key == "bday":
updated_lines.append(f"BDAY:{value}")
elif key == "categories":
categories_value = (
value if isinstance(value, str) else ",".join(value)
)
updated_lines.append(f"CATEGORIES:{categories_value}")
elif key in ["org", "organization"]:
updated_lines.append(f"ORG:{value}")
elif key == "title":
updated_lines.append(f"TITLE:{value}")
# Add the END:VCARD line
updated_lines.append("END:VCARD")
# Join all lines
return "\n".join(updated_lines)
except Exception as e:
logger.error(f"Error merging vCard properties: {e}")
# Fallback to creating basic vCard matching Nextcloud format
basic_vcard = f"""BEGIN:VCARD
VERSION:3.0
UID:{uid}
FN:{contact_data.get("fn", "Unknown")}"""
if "email" in contact_data:
basic_vcard += f"\nEMAIL:{contact_data['email']}"
if "tel" in contact_data:
basic_vcard += f"\nTEL:{contact_data['tel']}"
basic_vcard += "\nEND:VCARD"
return basic_vcard
+17
View File
@@ -63,3 +63,20 @@ def configure_contacts_tools(mcp: FastMCP):
"""Delete a contact."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.delete_contact(addressbook=addressbook, uid=uid)
@mcp.tool()
async def nc_contacts_update_contact(
ctx: Context, *, addressbook: str, uid: str, contact_data: dict, etag: str = ""
):
"""Update an existing contact while preserving all existing properties.
Args:
addressbook: The name of the addressbook containing the contact.
uid: The unique ID of the contact to update.
contact_data: A dictionary with the contact's updated details, e.g. {"fn": "Jane Doe", "email": "jane.doe@example.com"}.
etag: Optional ETag for optimistic concurrency control.
"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.contacts.update_contact(
addressbook=addressbook, uid=uid, contact_data=contact_data, etag=etag
)
@@ -0,0 +1,436 @@
"""Integration tests for CalDAV and CardDAV field preservation.
This test module demonstrates data loss issues when non-supported fields
are present in calendar events and contacts during round-trip operations.
"""
import logging
import pytest
import uuid
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
@pytest.mark.integration
async def test_calendar_event_custom_fields_preservation(nc_client):
"""Test that demonstrates loss of non-supported iCal fields during round-trip operations."""
calendar_name = "personal"
# Create an event with standard fields
event_data = {
"title": "Test Event with Custom Fields",
"description": "Event to test custom field preservation",
"start_datetime": (datetime.now() + timedelta(days=1)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=1, hours=1)).isoformat(),
"location": "Test Location",
}
# Create the event
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Now manually inject a custom iCal property by creating a new version with raw iCal
# This simulates what would happen if the event was created by another CalDAV client
# with extended properties
custom_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Test Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=1)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=1, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Test Event with Custom Fields
DESCRIPTION:Event to test custom field preservation
LOCATION:Test Location
X-CUSTOM-FIELD:This is a custom field that should be preserved
X-VENDOR-SPECIFIC:Vendor specific data
CATEGORIES:work,testing
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Direct CalDAV PUT to inject the custom iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=custom_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
logger.info(f"Injected custom iCal properties into event {event_uid}")
# Retrieve the event to confirm custom fields are present in raw iCal
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_before = response.text
logger.info("Raw iCal before update:")
logger.info(raw_ical_before)
# Verify custom fields exist in raw iCal
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_before
)
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_before
# Now update the event through the MCP client (simulating normal usage)
update_data = {
"title": "Updated Test Event with Custom Fields",
"description": "Updated description - custom fields should be preserved",
}
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
logger.info(f"Updated event {event_uid} through MCP client")
# Retrieve the event again to see if custom fields survived
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
raw_ical_after = response_after.text
logger.info("Raw iCal after update:")
logger.info(raw_ical_after)
# THIS IS THE TEST THAT SHOULD FAIL - custom fields should be preserved but won't be
try:
assert (
"X-CUSTOM-FIELD:This is a custom field that should be preserved"
in raw_ical_after
), "Custom field X-CUSTOM-FIELD was lost during round-trip update"
assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_after, (
"Custom field X-VENDOR-SPECIFIC was lost during round-trip update"
)
logger.info(
"✓ Custom fields were preserved (unexpected - this should fail with current implementation)"
)
except AssertionError as e:
logger.error(f"✗ Custom fields were lost during round-trip update: {e}")
# Re-raise to show the test failure
raise
finally:
# Cleanup
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")
@pytest.mark.integration
async def test_contact_extended_fields_preservation(nc_client):
"""Test that demonstrates loss of extended vCard fields during round-trip operations."""
addressbook_name = f"test_preservation_{uuid.uuid4().hex[:8]}"
# Create a temporary addressbook
await nc_client.contacts.create_addressbook(
name=addressbook_name, display_name="Test Preservation Addressbook"
)
try:
contact_uid = str(uuid.uuid4())
# Create a contact with minimal data first
basic_contact_data = {
"fn": "John Extended Doe",
"email": "john.extended@example.com",
}
await nc_client.contacts.create_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data=basic_contact_data,
)
logger.info(f"Created basic contact {contact_uid}")
# Now inject a rich vCard with extended fields directly via CardDAV
extended_vcard = f"""BEGIN:VCARD
VERSION:4.0
UID:{contact_uid}
FN:John Extended Doe
N:Doe;John;Extended;;
NICKNAME:Johnny,JD
EMAIL;TYPE=work:john.work@company.com
EMAIL;TYPE=home:john.extended@example.com
TEL;TYPE=cell:+1-555-123-4567
TEL;TYPE=work:+1-555-987-6543
ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA
ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA
ORG:Example Corporation
TITLE:Senior Developer
URL;TYPE=work:https://company.com/john
URL;TYPE=personal:https://johndoe.dev
BDAY:1985-06-15
NOTE:This is a note with important information that should be preserved.
CATEGORIES:colleagues,developers,friends
X-CUSTOM-FIELD:This should be preserved
X-SKYPE:john.doe.skype
X-LINKEDIN:https://linkedin.com/in/johndoe
REV:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VCARD"""
# Direct CardDAV PUT to inject the extended vCard
contact_path = f"/remote.php/dav/addressbooks/users/{nc_client.contacts.username}/{addressbook_name}/{contact_uid}.vcf"
await nc_client.contacts._make_request(
"PUT",
contact_path,
content=extended_vcard,
headers={"Content-Type": "text/vcard; charset=utf-8"},
)
logger.info(f"Injected extended vCard for contact {contact_uid}")
# Retrieve the contact to confirm extended fields are present in raw vCard
response = await nc_client.contacts._make_request("GET", contact_path)
raw_vcard_before = response.text
logger.info("Raw vCard before any operations:")
logger.info(raw_vcard_before)
# Verify extended fields exist in raw vCard
assert "TEL;TYPE=cell:+1-555-123-4567" in raw_vcard_before
assert "ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA" in raw_vcard_before
assert "ORG:Example Corporation" in raw_vcard_before
assert "TITLE:Senior Developer" in raw_vcard_before
assert "X-CUSTOM-FIELD:This should be preserved" in raw_vcard_before
assert "X-LINKEDIN:https://linkedin.com/in/johndoe" in raw_vcard_before
assert "NOTE:This is a note with important information" in raw_vcard_before
# List contacts through the MCP client (this will parse and return limited fields)
contacts = await nc_client.contacts.list_contacts(addressbook=addressbook_name)
our_contact = next((c for c in contacts if c["vcard_id"] == contact_uid), None)
assert our_contact is not None
logger.info("Contact as parsed by MCP client:")
logger.info(our_contact)
# Check what fields are accessible through the parsed contact
parsed_contact = our_contact["contact"]
# These should be available (basic fields that are parsed)
assert parsed_contact["fullname"] == "John Extended Doe"
assert parsed_contact["email"] is not None # Some email should be present
# The raw vCard should still be available in addressdata
raw_addressdata = our_contact["addressdata"]
assert "X-CUSTOM-FIELD:This should be preserved" in raw_addressdata
assert "ORG:Example Corporation" in raw_addressdata
# The key test: Can we update this contact without losing extended field data?
logger.info("Testing contact update preservation...")
# Update the contact through the MCP client with a simple change
try:
await nc_client.contacts.update_contact(
addressbook=addressbook_name,
uid=contact_uid,
contact_data={"email": "john.updated@example.com"},
)
logger.info("✓ Contact updated successfully")
except Exception as e:
logger.error(f"✗ Failed to update contact: {e}")
raise
# Retrieve the contact again to see if extended fields survived
contacts_after = await nc_client.contacts.list_contacts(
addressbook=addressbook_name
)
updated_contact = next(
(c for c in contacts_after if c["vcard_id"] == contact_uid), None
)
assert updated_contact is not None, "Contact not found after update"
updated_addressdata = updated_contact["addressdata"]
logger.info("Raw vCard after contact update:")
logger.info(updated_addressdata)
# THIS IS THE CRITICAL TEST - extended fields should be preserved during updates
extended_field_checks = [
("ORG:Example Corporation", "organization field"),
("TITLE:Senior Developer", "title field"),
("TEL;TYPE=cell:+1-555-123-4567", "cell phone"),
("TEL;TYPE=work:+1-555-987-6543", "work phone"),
("ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA", "home address"),
("ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA", "work address"),
("URL;TYPE=work;VALUE=URI:https://company.com/john", "work URL"),
("NOTE:This is a note with important information", "note field"),
("CATEGORIES:colleagues,developers,friends", "categories"),
("X-CUSTOM-FIELD:This should be preserved", "custom field"),
("X-LINKEDIN:https://linkedin.com/in/johndoe", "LinkedIn custom field"),
("john.updated@example.com", "updated email"),
]
all_preserved = True
for field_pattern, field_name in extended_field_checks:
if field_pattern in updated_addressdata:
logger.info(f"{field_name} preserved")
else:
logger.error(f"{field_name} was lost during update")
all_preserved = False
# The test should PASS - field preservation should work
assert all_preserved, (
"Contact update lost extended field data - this indicates the preservation mechanism failed"
)
logger.info("🎉 SUCCESS: All extended fields preserved during contact update!")
finally:
# Cleanup
try:
await nc_client.contacts.delete_addressbook(name=addressbook_name)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup addressbook {addressbook_name}: {cleanup_error}"
)
@pytest.mark.integration
async def test_calendar_event_roundtrip_data_loss_demonstration(nc_client):
"""Demonstrates specific data loss scenarios in calendar events."""
calendar_name = "personal"
event_data = {
"title": "Roundtrip Test Event",
"description": "Testing data preservation",
"start_datetime": (datetime.now() + timedelta(days=2)).isoformat(),
"end_datetime": (datetime.now() + timedelta(days=2, hours=1)).isoformat(),
}
result = await nc_client.calendar.create_event(calendar_name, event_data)
event_uid = result["uid"]
try:
# Inject additional iCal properties that are valid but not supported by our parser
extended_ical = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Extended Client//EN
BEGIN:VEVENT
UID:{event_uid}
DTSTART:{(datetime.now() + timedelta(days=2)).strftime("%Y%m%dT%H%M%SZ")}
DTEND:{(datetime.now() + timedelta(days=2, hours=1)).strftime("%Y%m%dT%H%M%SZ")}
SUMMARY:Roundtrip Test Event
DESCRIPTION:Testing data preservation
STATUS:CONFIRMED
PRIORITY:5
CLASS:PUBLIC
SEQUENCE:1
X-MICROSOFT-CDO-ALLDAYEVENT:FALSE
X-MICROSOFT-CDO-IMPORTANCE:1
X-CUSTOM-MEETING-ID:12345-67890
X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890
ORGANIZER;CN=Test Organizer:mailto:organizer@example.com
COMMENT:This is a comment that should be preserved
LOCATION:Conference Room A
GEO:40.7128;-74.0060
TRANSP:OPAQUE
CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")}
END:VEVENT
END:VCALENDAR"""
# Inject the extended iCal
event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics"
await nc_client.calendar._make_request(
"PUT",
event_path,
content=extended_ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
# Verify extended properties are present
response = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
original_ical = response.text
# Confirm extended properties exist
extended_properties = [
"SEQUENCE:1",
"X-MICROSOFT-CDO-ALLDAYEVENT:FALSE",
"X-CUSTOM-MEETING-ID:12345-67890",
"X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890",
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
"COMMENT:This is a comment that should be preserved",
"GEO:40.7128;-74.0060",
"TRANSP:OPAQUE",
]
# More flexible patterns for properties that might be reformatted
flexible_patterns = {
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com": [
"ORGANIZER;CN=Test Organizer:mailto:organizer@example.com",
'ORGANIZER;CN="Test Organizer":mailto:organizer@example.com',
],
"GEO:40.7128;-74.0060": [
"GEO:40.7128;-74.0060",
"GEO:40.7128;-74.006", # May lose trailing zero
],
}
for prop in extended_properties:
assert prop in original_ical, (
f"Extended property {prop} not found in original iCal"
)
logger.info("✓ All extended properties confirmed in original iCal")
# Now perform a simple update through MCP
update_data = {"location": "Conference Room B"} # Simple location change
await nc_client.calendar.update_event(calendar_name, event_uid, update_data)
# Check what survived the round-trip
response_after = await nc_client.calendar._make_request(
"GET", event_path, headers={"Accept": "text/calendar"}
)
updated_ical = response_after.text
logger.info("Checking which properties survived the update...")
# Check which extended properties survived
survived = []
lost = []
for prop in extended_properties:
# Check if this property has flexible patterns
if prop in flexible_patterns:
# Check if any of the flexible patterns match
found = any(
pattern in updated_ical for pattern in flexible_patterns[prop]
)
if found:
survived.append(prop)
else:
lost.append(prop)
else:
# Standard exact match
if prop in updated_ical:
survived.append(prop)
else:
lost.append(prop)
logger.info(f"Properties that SURVIVED: {survived}")
logger.error(f"Properties that were LOST: {lost}")
# This test should fail - we expect data loss
assert len(lost) == 0, (
f"Round-trip update lost {len(lost)} extended properties: {lost}"
)
finally:
try:
await nc_client.calendar.delete_event(calendar_name, event_uid)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")