From 4cf5f2a95a70885c53c7d01a750323f5dea33971 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 30 Aug 2025 19:14:26 +0200 Subject: [PATCH] feat(client): Preserve fields when modifying contacts/calendar resources --- nextcloud_mcp_server/client/calendar.py | 147 ++++++- nextcloud_mcp_server/client/contacts.py | 201 +++++++++ nextcloud_mcp_server/server/contacts.py | 17 + tests/integration/test_field_preservation.py | 436 +++++++++++++++++++ 4 files changed, 790 insertions(+), 11 deletions(-) create mode 100644 tests/integration/test_field_preservation.py diff --git a/nextcloud_mcp_server/client/calendar.py b/nextcloud_mcp_server/client/calendar.py index 1057b80..98830d3 100644 --- a/nextcloud_mcp_server/client/calendar.py +++ b/nextcloud_mcp_server/client/calendar.py @@ -238,27 +238,33 @@ class CalendarClient(BaseNextcloudClient): event_data: Dict[str, Any], etag: str = "", ) -> Dict[str, Any]: - """Update an existing calendar event.""" + """Update an existing calendar event while preserving all existing properties.""" event_filename = f"{event_uid}.ics" event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" - # Get existing event data to merge with updates - existing_event_data = {} + # Get raw iCal content to preserve all properties including extended ones + raw_ical_content = "" if not etag: try: - existing_event_data, current_etag = await self.get_event( + raw_ical_content, current_etag = await self._get_raw_ical( calendar_name, event_uid ) etag = current_etag except Exception: - # Continue without etag if we can't get it - pass + # Fall back to creating new iCal if we can't get existing + logger.warning( + f"Could not fetch existing iCal for {event_uid}, creating new" + ) + raw_ical_content = "" - # Merge existing data with new data (new data takes precedence) - merged_data = {**existing_event_data, **event_data} - - # Create updated iCalendar event - ical_content = self._create_ical_event(merged_data, event_uid) + # Create updated iCalendar event preserving existing properties + if raw_ical_content: + ical_content = self._merge_ical_properties( + raw_ical_content, event_data, event_uid + ) + else: + # Fallback to creating new iCal if we couldn't get existing + ical_content = self._create_ical_event(event_data, event_uid) headers = { "Content-Type": "text/calendar; charset=utf-8", @@ -949,3 +955,122 @@ class CalendarClient(BaseNextcloudClient): except Exception as e: logger.error(f"Error deleting calendar {calendar_name}: {e}") raise + + async def _get_raw_ical( + self, calendar_name: str, event_uid: str + ) -> Tuple[str, str]: + """Get raw iCal content for an event without parsing.""" + event_filename = f"{event_uid}.ics" + event_path = f"{self._get_caldav_base_path()}/{calendar_name}/{event_filename}" + + headers = {"Accept": "text/calendar"} + + try: + response = await self._make_request("GET", event_path, headers=headers) + etag = response.headers.get("etag", "") + return response.text, etag + except Exception as e: + logger.error(f"Error getting raw iCal for {event_uid}: {e}") + raise + + def _merge_ical_properties( + self, raw_ical: str, event_data: Dict[str, Any], event_uid: str + ) -> str: + """Merge new event data into existing raw iCal while preserving all properties.""" + try: + # Parse existing iCal + cal = Calendar.from_ical(raw_ical) + + # Find the VEVENT component + for component in cal.walk(): + if component.name == "VEVENT": + # Update only the properties that were provided in event_data + if "title" in event_data: + component["SUMMARY"] = event_data["title"] + if "description" in event_data: + component["DESCRIPTION"] = event_data["description"] + if "location" in event_data: + component["LOCATION"] = event_data["location"] + if "status" in event_data: + component["STATUS"] = event_data["status"].upper() + if "priority" in event_data: + component["PRIORITY"] = event_data["priority"] + if "privacy" in event_data: + component["CLASS"] = event_data["privacy"].upper() + if "url" in event_data: + component["URL"] = event_data["url"] + + # Handle dates + if "start_datetime" in event_data: + start_str = event_data["start_datetime"] + all_day = event_data.get("all_day", False) + if all_day: + start_date = dt.datetime.fromisoformat( + start_str.split("T")[0] + ).date() + component["DTSTART"] = start_date + else: + start_dt = dt.datetime.fromisoformat( + start_str.replace("Z", "+00:00") + ) + component["DTSTART"] = start_dt + + if "end_datetime" in event_data: + end_str = event_data["end_datetime"] + all_day = event_data.get("all_day", False) + if all_day: + end_date = dt.datetime.fromisoformat( + end_str.split("T")[0] + ).date() + component["DTEND"] = end_date + else: + end_dt = dt.datetime.fromisoformat( + end_str.replace("Z", "+00:00") + ) + component["DTEND"] = end_dt + + # Handle categories + if "categories" in event_data: + categories = event_data["categories"] + if categories: + component["CATEGORIES"] = categories.split(",") + + # Handle recurrence + if "recurring" in event_data: + if event_data["recurring"] and "recurrence_rule" in event_data: + recurrence_rule = event_data["recurrence_rule"] + if recurrence_rule: + component["RRULE"] = vRecur.from_ical(recurrence_rule) + elif not event_data["recurring"]: + # Remove recurrence if set to False + if "RRULE" in component: + del component["RRULE"] + + # Handle attendees + if "attendees" in event_data: + attendees = event_data["attendees"] + # Remove existing attendees + component.pop("ATTENDEE", None) + if attendees: + for email in attendees.split(","): + if email.strip(): + component.add("ATTENDEE", f"mailto:{email.strip()}") + + # Update timestamps in proper iCal format + from icalendar import vDDDTypes + + now = dt.datetime.now(dt.UTC) + component["LAST-MODIFIED"] = vDDDTypes(now) + component["DTSTAMP"] = vDDDTypes(now) + + # Preserve all other existing properties (X-*, ORGANIZER, COMMENT, GEO, etc.) + # by not touching them - they remain in the component + + break + + return cal.to_ical().decode("utf-8") + + except Exception as e: + logger.error(f"Error merging iCal properties: {e}") + # Fallback to creating new iCal + return self._create_ical_event(event_data, event_uid) diff --git a/nextcloud_mcp_server/client/contacts.py b/nextcloud_mcp_server/client/contacts.py index 11ecc3f..460a884 100644 --- a/nextcloud_mcp_server/client/contacts.py +++ b/nextcloud_mcp_server/client/contacts.py @@ -143,6 +143,50 @@ class ContactsClient(BaseNextcloudClient): url = f"{carddav_path}/{addressbook}/{uid}.vcf" await self._make_request("DELETE", url) + async def update_contact( + self, *, addressbook: str, uid: str, contact_data: dict, etag: str = "" + ): + """Update an existing contact while preserving all existing properties.""" + carddav_path = self._get_carddav_base_path() + url = f"{carddav_path}/{addressbook}/{uid}.vcf" + + # Get raw vCard content to preserve all properties including extended ones + raw_vcard_content = "" + if not etag: + try: + raw_vcard_content, current_etag = await self._get_raw_vcard( + addressbook, uid + ) + etag = current_etag + except Exception: + # Fall back to creating new vCard if we can't get existing + logger.warning( + f"Could not fetch existing vCard for {uid}, creating new" + ) + raw_vcard_content = "" + + # Create updated vCard preserving existing properties + if raw_vcard_content: + vcard_content = self._merge_vcard_properties( + raw_vcard_content, contact_data, uid + ) + else: + # Fallback to creating new vCard if we couldn't get existing + contact = Contact(fn=contact_data.get("fn"), uid=uid) + if "email" in contact_data: + contact.email = [{"value": contact_data["email"], "type": ["HOME"]}] + if "tel" in contact_data: + contact.tel = [{"value": contact_data["tel"], "type": ["HOME"]}] + vcard_content = contact.to_vcard() + + headers = { + "Content-Type": "text/vcard; charset=utf-8", + } + if etag: + headers["If-Match"] = etag + + await self._make_request("PUT", url, content=vcard_content, headers=headers) + async def list_contacts(self, *, addressbook: str): """List all available contacts for addressbook.""" @@ -233,3 +277,160 @@ class ContactsClient(BaseNextcloudClient): logger.debug(f"Found {len(contacts)} contacts") return contacts + + async def _get_raw_vcard(self, addressbook: str, uid: str) -> tuple[str, str]: + """Get raw vCard content for a contact without parsing.""" + carddav_path = self._get_carddav_base_path() + url = f"{carddav_path}/{addressbook}/{uid}.vcf" + + try: + response = await self._make_request("GET", url) + etag = response.headers.get("etag", "") + return response.text, etag + except Exception as e: + logger.error(f"Error getting raw vCard for {uid}: {e}") + raise + + def _merge_vcard_properties( + self, raw_vcard: str, contact_data: dict, uid: str + ) -> str: + """Merge new contact data into existing raw vCard while preserving all properties.""" + try: + # Instead of using pythonvCard4 which has formatting issues, + # let's do a simple text-based merge to preserve exact formatting + + # Start with the original vCard + lines = raw_vcard.strip().split("\n") + updated_lines = [] + + # Track what we've updated to avoid duplicates + updated_properties = set() + + for line in lines: + line = line.strip() + if not line: + continue + + # Skip the END:VCARD line for now + if line == "END:VCARD": + continue + + property_name = line.split(":")[0].split(";")[0] + + # Handle updates for specific properties + if property_name == "FN" and "fn" in contact_data: + updated_lines.append(f"FN:{contact_data['fn']}") + updated_properties.add("fn") + elif property_name == "EMAIL" and "email" in contact_data: + # Replace first email with new one, preserve others + if "email" not in updated_properties: + if isinstance(contact_data["email"], str): + # Try to preserve the original format as much as possible + if ";TYPE=" in line: + type_part = line.split(";TYPE=")[1].split(":")[0] + updated_lines.append( + f"EMAIL;TYPE={type_part}:{contact_data['email']}" + ) + else: + updated_lines.append(f"EMAIL:{contact_data['email']}") + updated_properties.add("email") + else: + # Keep additional emails unchanged + updated_lines.append(line) + elif property_name == "TEL" and "tel" in contact_data: + # Similar handling for phone numbers + if "tel" not in updated_properties: + if isinstance(contact_data["tel"], str): + if ";TYPE=" in line: + type_part = line.split(";TYPE=")[1].split(":")[0] + updated_lines.append( + f"TEL;TYPE={type_part}:{contact_data['tel']}" + ) + else: + updated_lines.append(f"TEL:{contact_data['tel']}") + updated_properties.add("tel") + else: + # Keep additional phone numbers unchanged + updated_lines.append(line) + elif property_name == "NOTE" and "note" in contact_data: + updated_lines.append(f"NOTE:{contact_data['note']}") + updated_properties.add("note") + elif property_name == "NICKNAME" and "nickname" in contact_data: + nickname_value = contact_data["nickname"] + if isinstance(nickname_value, list): + nickname_value = ",".join(nickname_value) + updated_lines.append(f"NICKNAME:{nickname_value}") + updated_properties.add("nickname") + elif property_name == "BDAY" and "bday" in contact_data: + updated_lines.append(f"BDAY:{contact_data['bday']}") + updated_properties.add("bday") + elif property_name == "CATEGORIES" and "categories" in contact_data: + categories_value = contact_data["categories"] + if isinstance(categories_value, list): + categories_value = ",".join(categories_value) + updated_lines.append(f"CATEGORIES:{categories_value}") + updated_properties.add("categories") + elif property_name == "ORG" and ( + "org" in contact_data or "organization" in contact_data + ): + org_value = contact_data.get("org") or contact_data.get( + "organization" + ) + updated_lines.append(f"ORG:{org_value}") + updated_properties.add("org") + elif property_name == "TITLE" and "title" in contact_data: + updated_lines.append(f"TITLE:{contact_data['title']}") + updated_properties.add("title") + else: + # Keep all other properties unchanged (preserves all extended/custom fields) + updated_lines.append(line) + + # Add any new properties that weren't in the original vCard + for key, value in contact_data.items(): + if key not in updated_properties: + if key == "fn": + updated_lines.append(f"FN:{value}") + elif key == "email" and isinstance(value, str): + updated_lines.append(f"EMAIL:{value}") + elif key == "tel" and isinstance(value, str): + updated_lines.append(f"TEL:{value}") + elif key == "note": + updated_lines.append(f"NOTE:{value}") + elif key == "nickname": + nickname_value = ( + value if isinstance(value, str) else ",".join(value) + ) + updated_lines.append(f"NICKNAME:{nickname_value}") + elif key == "bday": + updated_lines.append(f"BDAY:{value}") + elif key == "categories": + categories_value = ( + value if isinstance(value, str) else ",".join(value) + ) + updated_lines.append(f"CATEGORIES:{categories_value}") + elif key in ["org", "organization"]: + updated_lines.append(f"ORG:{value}") + elif key == "title": + updated_lines.append(f"TITLE:{value}") + + # Add the END:VCARD line + updated_lines.append("END:VCARD") + + # Join all lines + return "\n".join(updated_lines) + + except Exception as e: + logger.error(f"Error merging vCard properties: {e}") + # Fallback to creating basic vCard matching Nextcloud format + basic_vcard = f"""BEGIN:VCARD +VERSION:3.0 +UID:{uid} +FN:{contact_data.get("fn", "Unknown")}""" + + if "email" in contact_data: + basic_vcard += f"\nEMAIL:{contact_data['email']}" + if "tel" in contact_data: + basic_vcard += f"\nTEL:{contact_data['tel']}" + + basic_vcard += "\nEND:VCARD" + return basic_vcard diff --git a/nextcloud_mcp_server/server/contacts.py b/nextcloud_mcp_server/server/contacts.py index 370219e..78a63ef 100644 --- a/nextcloud_mcp_server/server/contacts.py +++ b/nextcloud_mcp_server/server/contacts.py @@ -63,3 +63,20 @@ def configure_contacts_tools(mcp: FastMCP): """Delete a contact.""" client: NextcloudClient = ctx.request_context.lifespan_context.client return await client.contacts.delete_contact(addressbook=addressbook, uid=uid) + + @mcp.tool() + async def nc_contacts_update_contact( + ctx: Context, *, addressbook: str, uid: str, contact_data: dict, etag: str = "" + ): + """Update an existing contact while preserving all existing properties. + + Args: + addressbook: The name of the addressbook containing the contact. + uid: The unique ID of the contact to update. + contact_data: A dictionary with the contact's updated details, e.g. {"fn": "Jane Doe", "email": "jane.doe@example.com"}. + etag: Optional ETag for optimistic concurrency control. + """ + client: NextcloudClient = ctx.request_context.lifespan_context.client + return await client.contacts.update_contact( + addressbook=addressbook, uid=uid, contact_data=contact_data, etag=etag + ) diff --git a/tests/integration/test_field_preservation.py b/tests/integration/test_field_preservation.py new file mode 100644 index 0000000..62bb473 --- /dev/null +++ b/tests/integration/test_field_preservation.py @@ -0,0 +1,436 @@ +"""Integration tests for CalDAV and CardDAV field preservation. + +This test module demonstrates data loss issues when non-supported fields +are present in calendar events and contacts during round-trip operations. +""" + +import logging +import pytest +import uuid +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +async def test_calendar_event_custom_fields_preservation(nc_client): + """Test that demonstrates loss of non-supported iCal fields during round-trip operations.""" + calendar_name = "personal" + + # Create an event with standard fields + event_data = { + "title": "Test Event with Custom Fields", + "description": "Event to test custom field preservation", + "start_datetime": (datetime.now() + timedelta(days=1)).isoformat(), + "end_datetime": (datetime.now() + timedelta(days=1, hours=1)).isoformat(), + "location": "Test Location", + } + + # Create the event + result = await nc_client.calendar.create_event(calendar_name, event_data) + event_uid = result["uid"] + + try: + # Now manually inject a custom iCal property by creating a new version with raw iCal + # This simulates what would happen if the event was created by another CalDAV client + # with extended properties + custom_ical = f"""BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//Test Client//EN +BEGIN:VEVENT +UID:{event_uid} +DTSTART:{(datetime.now() + timedelta(days=1)).strftime("%Y%m%dT%H%M%SZ")} +DTEND:{(datetime.now() + timedelta(days=1, hours=1)).strftime("%Y%m%dT%H%M%SZ")} +SUMMARY:Test Event with Custom Fields +DESCRIPTION:Event to test custom field preservation +LOCATION:Test Location +X-CUSTOM-FIELD:This is a custom field that should be preserved +X-VENDOR-SPECIFIC:Vendor specific data +CATEGORIES:work,testing +STATUS:CONFIRMED +PRIORITY:5 +CLASS:PUBLIC +CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VEVENT +END:VCALENDAR""" + + # Direct CalDAV PUT to inject the custom iCal + event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics" + await nc_client.calendar._make_request( + "PUT", + event_path, + content=custom_ical, + headers={"Content-Type": "text/calendar; charset=utf-8"}, + ) + + logger.info(f"Injected custom iCal properties into event {event_uid}") + + # Retrieve the event to confirm custom fields are present in raw iCal + response = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + raw_ical_before = response.text + + logger.info("Raw iCal before update:") + logger.info(raw_ical_before) + + # Verify custom fields exist in raw iCal + assert ( + "X-CUSTOM-FIELD:This is a custom field that should be preserved" + in raw_ical_before + ) + assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_before + + # Now update the event through the MCP client (simulating normal usage) + update_data = { + "title": "Updated Test Event with Custom Fields", + "description": "Updated description - custom fields should be preserved", + } + + await nc_client.calendar.update_event(calendar_name, event_uid, update_data) + logger.info(f"Updated event {event_uid} through MCP client") + + # Retrieve the event again to see if custom fields survived + response_after = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + raw_ical_after = response_after.text + + logger.info("Raw iCal after update:") + logger.info(raw_ical_after) + + # THIS IS THE TEST THAT SHOULD FAIL - custom fields should be preserved but won't be + try: + assert ( + "X-CUSTOM-FIELD:This is a custom field that should be preserved" + in raw_ical_after + ), "Custom field X-CUSTOM-FIELD was lost during round-trip update" + assert "X-VENDOR-SPECIFIC:Vendor specific data" in raw_ical_after, ( + "Custom field X-VENDOR-SPECIFIC was lost during round-trip update" + ) + logger.info( + "✓ Custom fields were preserved (unexpected - this should fail with current implementation)" + ) + except AssertionError as e: + logger.error(f"✗ Custom fields were lost during round-trip update: {e}") + # Re-raise to show the test failure + raise + + finally: + # Cleanup + try: + await nc_client.calendar.delete_event(calendar_name, event_uid) + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}") + + +@pytest.mark.integration +async def test_contact_extended_fields_preservation(nc_client): + """Test that demonstrates loss of extended vCard fields during round-trip operations.""" + addressbook_name = f"test_preservation_{uuid.uuid4().hex[:8]}" + + # Create a temporary addressbook + await nc_client.contacts.create_addressbook( + name=addressbook_name, display_name="Test Preservation Addressbook" + ) + + try: + contact_uid = str(uuid.uuid4()) + + # Create a contact with minimal data first + basic_contact_data = { + "fn": "John Extended Doe", + "email": "john.extended@example.com", + } + + await nc_client.contacts.create_contact( + addressbook=addressbook_name, + uid=contact_uid, + contact_data=basic_contact_data, + ) + + logger.info(f"Created basic contact {contact_uid}") + + # Now inject a rich vCard with extended fields directly via CardDAV + extended_vcard = f"""BEGIN:VCARD +VERSION:4.0 +UID:{contact_uid} +FN:John Extended Doe +N:Doe;John;Extended;; +NICKNAME:Johnny,JD +EMAIL;TYPE=work:john.work@company.com +EMAIL;TYPE=home:john.extended@example.com +TEL;TYPE=cell:+1-555-123-4567 +TEL;TYPE=work:+1-555-987-6543 +ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA +ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA +ORG:Example Corporation +TITLE:Senior Developer +URL;TYPE=work:https://company.com/john +URL;TYPE=personal:https://johndoe.dev +BDAY:1985-06-15 +NOTE:This is a note with important information that should be preserved. +CATEGORIES:colleagues,developers,friends +X-CUSTOM-FIELD:This should be preserved +X-SKYPE:john.doe.skype +X-LINKEDIN:https://linkedin.com/in/johndoe +REV:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VCARD""" + + # Direct CardDAV PUT to inject the extended vCard + contact_path = f"/remote.php/dav/addressbooks/users/{nc_client.contacts.username}/{addressbook_name}/{contact_uid}.vcf" + await nc_client.contacts._make_request( + "PUT", + contact_path, + content=extended_vcard, + headers={"Content-Type": "text/vcard; charset=utf-8"}, + ) + + logger.info(f"Injected extended vCard for contact {contact_uid}") + + # Retrieve the contact to confirm extended fields are present in raw vCard + response = await nc_client.contacts._make_request("GET", contact_path) + raw_vcard_before = response.text + + logger.info("Raw vCard before any operations:") + logger.info(raw_vcard_before) + + # Verify extended fields exist in raw vCard + assert "TEL;TYPE=cell:+1-555-123-4567" in raw_vcard_before + assert "ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA" in raw_vcard_before + assert "ORG:Example Corporation" in raw_vcard_before + assert "TITLE:Senior Developer" in raw_vcard_before + assert "X-CUSTOM-FIELD:This should be preserved" in raw_vcard_before + assert "X-LINKEDIN:https://linkedin.com/in/johndoe" in raw_vcard_before + assert "NOTE:This is a note with important information" in raw_vcard_before + + # List contacts through the MCP client (this will parse and return limited fields) + contacts = await nc_client.contacts.list_contacts(addressbook=addressbook_name) + our_contact = next((c for c in contacts if c["vcard_id"] == contact_uid), None) + + assert our_contact is not None + logger.info("Contact as parsed by MCP client:") + logger.info(our_contact) + + # Check what fields are accessible through the parsed contact + parsed_contact = our_contact["contact"] + + # These should be available (basic fields that are parsed) + assert parsed_contact["fullname"] == "John Extended Doe" + assert parsed_contact["email"] is not None # Some email should be present + + # The raw vCard should still be available in addressdata + raw_addressdata = our_contact["addressdata"] + assert "X-CUSTOM-FIELD:This should be preserved" in raw_addressdata + assert "ORG:Example Corporation" in raw_addressdata + + # The key test: Can we update this contact without losing extended field data? + logger.info("Testing contact update preservation...") + + # Update the contact through the MCP client with a simple change + try: + await nc_client.contacts.update_contact( + addressbook=addressbook_name, + uid=contact_uid, + contact_data={"email": "john.updated@example.com"}, + ) + logger.info("✓ Contact updated successfully") + except Exception as e: + logger.error(f"✗ Failed to update contact: {e}") + raise + + # Retrieve the contact again to see if extended fields survived + contacts_after = await nc_client.contacts.list_contacts( + addressbook=addressbook_name + ) + updated_contact = next( + (c for c in contacts_after if c["vcard_id"] == contact_uid), None + ) + + assert updated_contact is not None, "Contact not found after update" + updated_addressdata = updated_contact["addressdata"] + + logger.info("Raw vCard after contact update:") + logger.info(updated_addressdata) + + # THIS IS THE CRITICAL TEST - extended fields should be preserved during updates + extended_field_checks = [ + ("ORG:Example Corporation", "organization field"), + ("TITLE:Senior Developer", "title field"), + ("TEL;TYPE=cell:+1-555-123-4567", "cell phone"), + ("TEL;TYPE=work:+1-555-987-6543", "work phone"), + ("ADR;TYPE=home:;;123 Main St;Hometown;ST;12345;USA", "home address"), + ("ADR;TYPE=work:;;456 Work Ave;Worktown;ST;54321;USA", "work address"), + ("URL;TYPE=work;VALUE=URI:https://company.com/john", "work URL"), + ("NOTE:This is a note with important information", "note field"), + ("CATEGORIES:colleagues,developers,friends", "categories"), + ("X-CUSTOM-FIELD:This should be preserved", "custom field"), + ("X-LINKEDIN:https://linkedin.com/in/johndoe", "LinkedIn custom field"), + ("john.updated@example.com", "updated email"), + ] + + all_preserved = True + for field_pattern, field_name in extended_field_checks: + if field_pattern in updated_addressdata: + logger.info(f"✓ {field_name} preserved") + else: + logger.error(f"✗ {field_name} was lost during update") + all_preserved = False + + # The test should PASS - field preservation should work + assert all_preserved, ( + "Contact update lost extended field data - this indicates the preservation mechanism failed" + ) + + logger.info("🎉 SUCCESS: All extended fields preserved during contact update!") + + finally: + # Cleanup + try: + await nc_client.contacts.delete_addressbook(name=addressbook_name) + except Exception as cleanup_error: + logger.warning( + f"Failed to cleanup addressbook {addressbook_name}: {cleanup_error}" + ) + + +@pytest.mark.integration +async def test_calendar_event_roundtrip_data_loss_demonstration(nc_client): + """Demonstrates specific data loss scenarios in calendar events.""" + calendar_name = "personal" + + event_data = { + "title": "Roundtrip Test Event", + "description": "Testing data preservation", + "start_datetime": (datetime.now() + timedelta(days=2)).isoformat(), + "end_datetime": (datetime.now() + timedelta(days=2, hours=1)).isoformat(), + } + + result = await nc_client.calendar.create_event(calendar_name, event_data) + event_uid = result["uid"] + + try: + # Inject additional iCal properties that are valid but not supported by our parser + extended_ical = f"""BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//Extended Client//EN +BEGIN:VEVENT +UID:{event_uid} +DTSTART:{(datetime.now() + timedelta(days=2)).strftime("%Y%m%dT%H%M%SZ")} +DTEND:{(datetime.now() + timedelta(days=2, hours=1)).strftime("%Y%m%dT%H%M%SZ")} +SUMMARY:Roundtrip Test Event +DESCRIPTION:Testing data preservation +STATUS:CONFIRMED +PRIORITY:5 +CLASS:PUBLIC +SEQUENCE:1 +X-MICROSOFT-CDO-ALLDAYEVENT:FALSE +X-MICROSOFT-CDO-IMPORTANCE:1 +X-CUSTOM-MEETING-ID:12345-67890 +X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890 +ORGANIZER;CN=Test Organizer:mailto:organizer@example.com +COMMENT:This is a comment that should be preserved +LOCATION:Conference Room A +GEO:40.7128;-74.0060 +TRANSP:OPAQUE +CREATED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +DTSTAMP:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +LAST-MODIFIED:{datetime.now().strftime("%Y%m%dT%H%M%SZ")} +END:VEVENT +END:VCALENDAR""" + + # Inject the extended iCal + event_path = f"/remote.php/dav/calendars/{nc_client.calendar.username}/{calendar_name}/{event_uid}.ics" + await nc_client.calendar._make_request( + "PUT", + event_path, + content=extended_ical, + headers={"Content-Type": "text/calendar; charset=utf-8"}, + ) + + # Verify extended properties are present + response = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + original_ical = response.text + + # Confirm extended properties exist + extended_properties = [ + "SEQUENCE:1", + "X-MICROSOFT-CDO-ALLDAYEVENT:FALSE", + "X-CUSTOM-MEETING-ID:12345-67890", + "X-ZOOM-MEETING-URL:https://zoom.us/j/1234567890", + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com", + "COMMENT:This is a comment that should be preserved", + "GEO:40.7128;-74.0060", + "TRANSP:OPAQUE", + ] + + # More flexible patterns for properties that might be reformatted + flexible_patterns = { + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com": [ + "ORGANIZER;CN=Test Organizer:mailto:organizer@example.com", + 'ORGANIZER;CN="Test Organizer":mailto:organizer@example.com', + ], + "GEO:40.7128;-74.0060": [ + "GEO:40.7128;-74.0060", + "GEO:40.7128;-74.006", # May lose trailing zero + ], + } + + for prop in extended_properties: + assert prop in original_ical, ( + f"Extended property {prop} not found in original iCal" + ) + + logger.info("✓ All extended properties confirmed in original iCal") + + # Now perform a simple update through MCP + update_data = {"location": "Conference Room B"} # Simple location change + await nc_client.calendar.update_event(calendar_name, event_uid, update_data) + + # Check what survived the round-trip + response_after = await nc_client.calendar._make_request( + "GET", event_path, headers={"Accept": "text/calendar"} + ) + updated_ical = response_after.text + + logger.info("Checking which properties survived the update...") + + # Check which extended properties survived + survived = [] + lost = [] + + for prop in extended_properties: + # Check if this property has flexible patterns + if prop in flexible_patterns: + # Check if any of the flexible patterns match + found = any( + pattern in updated_ical for pattern in flexible_patterns[prop] + ) + if found: + survived.append(prop) + else: + lost.append(prop) + else: + # Standard exact match + if prop in updated_ical: + survived.append(prop) + else: + lost.append(prop) + + logger.info(f"Properties that SURVIVED: {survived}") + logger.error(f"Properties that were LOST: {lost}") + + # This test should fail - we expect data loss + assert len(lost) == 0, ( + f"Round-trip update lost {len(lost)} extended properties: {lost}" + ) + + finally: + try: + await nc_client.calendar.delete_event(calendar_name, event_uid) + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup event {event_uid}: {cleanup_error}")