fix(notes): Include ETags in responses to avoid accidently updates

This commit is contained in:
Chris Coutinho
2025-08-31 19:20:51 +02:00
parent daeb95f3c3
commit 892a8d2d23
3 changed files with 109 additions and 5 deletions
+3
View File
@@ -50,6 +50,7 @@ class CreateNoteResponse(IdResponse):
title: str = Field(description="The created note title")
category: str = Field(description="The created note category")
etag: str = Field(description="Current ETag for the created note")
class UpdateNoteResponse(BaseResponse):
@@ -58,6 +59,7 @@ class UpdateNoteResponse(BaseResponse):
id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
etag: str = Field(description="Current ETag for the updated note")
class DeleteNoteResponse(StatusResponse):
@@ -72,6 +74,7 @@ class AppendContentResponse(BaseResponse):
id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
etag: str = Field(description="Current ETag for the updated note")
class SearchNotesResponse(BaseResponse):
+9 -4
View File
@@ -91,7 +91,7 @@ def configure_notes_tools(mcp: FastMCP):
)
note = Note(**note_data)
return CreateNoteResponse(
id=note.id, title=note.title, category=note.category
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e:
if e.response.status_code == 403:
@@ -118,7 +118,12 @@ def configure_notes_tools(mcp: FastMCP):
category: str | None,
ctx: Context,
) -> UpdateNoteResponse | ErrorResponse:
"""Update an existing note's title, content, or category"""
"""Update an existing note's title, content, or category.
REQUIRED: etag parameter must be provided to prevent overwriting concurrent changes.
Get the current ETag by first retrieving the note using nc://Notes/{note_id} resource.
If the note has been modified by someone else since you retrieved it,
the update will fail with a 412 error."""
logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
try:
@@ -131,7 +136,7 @@ def configure_notes_tools(mcp: FastMCP):
)
note = Note(**note_data)
return UpdateNoteResponse(
id=note.id, title=note.title, category=note.category
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e:
if e.response.status_code == 404:
@@ -164,7 +169,7 @@ def configure_notes_tools(mcp: FastMCP):
)
note = Note(**note_data)
return AppendContentResponse(
id=note.id, title=note.title, category=note.category
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e:
if e.response.status_code == 404:
+97 -1
View File
@@ -123,8 +123,11 @@ async def test_mcp_notes_crud_workflow(
created_note = create_result.content[0].text
note_data = json.loads(created_note) # Parse the returned JSON
note_id = note_data["id"]
create_etag = note_data["etag"] # Verify create response includes ETag
logger.info(f"Note created via MCP with ID: {note_id}")
logger.info(f"Note created via MCP with ID: {note_id}, ETag: {create_etag}")
assert "etag" in note_data, "Create response should include ETag"
assert create_etag, "Create ETag should not be empty"
# 2. Verify creation via direct NextcloudClient
direct_note = await nc_client.notes.get_note(note_id)
@@ -165,6 +168,15 @@ async def test_mcp_notes_crud_workflow(
f"MCP note update failed: {update_result.content}"
)
# Verify update response includes new ETag
updated_note_data = json.loads(update_result.content[0].text)
update_etag = updated_note_data["etag"]
logger.info(f"Note updated via MCP, new ETag: {update_etag}")
assert "etag" in updated_note_data, "Update response should include ETag"
assert update_etag, "Update ETag should not be empty"
assert update_etag != etag, "ETag should change after update"
# 5. Verify update via direct NextcloudClient
updated_direct_note = await nc_client.notes.get_note(note_id)
assert updated_direct_note["title"] == updated_title
@@ -181,6 +193,15 @@ async def test_mcp_notes_crud_workflow(
f"MCP note append failed: {append_result.content}"
)
# Verify append response includes new ETag
appended_note_data = json.loads(append_result.content[0].text)
append_etag = appended_note_data["etag"]
logger.info(f"Content appended via MCP, new ETag: {append_etag}")
assert "etag" in appended_note_data, "Append response should include ETag"
assert append_etag, "Append ETag should not be empty"
assert append_etag != update_etag, "ETag should change after append"
# 7. Verify append via direct NextcloudClient
appended_direct_note = await nc_client.notes.get_note(note_id)
assert append_content in appended_direct_note["content"]
@@ -256,6 +277,81 @@ async def test_mcp_notes_crud_workflow(
logger.warning(f"Failed to cleanup note: {e}")
async def test_mcp_notes_etag_conflict(
nc_mcp_client: ClientSession, nc_client: NextcloudClient
):
"""Test that MCP note updates fail when using stale ETags."""
unique_suffix = uuid.uuid4().hex[:8]
test_title = f"ETag Test Note {unique_suffix}"
test_content = f"This is test content for ETag testing {unique_suffix}"
test_category = "ETTesting"
created_note = None
try:
# 1. Create note via MCP
logger.info(f"Creating note for ETag conflict test: {test_title}")
create_result = await nc_mcp_client.call_tool(
"nc_notes_create_note",
{"title": test_title, "content": test_content, "category": test_category},
)
assert create_result.isError is False
note_data = json.loads(create_result.content[0].text)
note_id = note_data["id"]
original_etag = note_data["etag"]
created_note = note_data
# 2. Update note to change ETag
first_update_result = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": original_etag,
"title": f"First Update {test_title}",
"content": test_content,
"category": test_category,
},
)
assert first_update_result.isError is False
updated_data = json.loads(first_update_result.content[0].text)
new_etag = updated_data["etag"]
assert new_etag != original_etag, "ETag should have changed after update"
# 3. Try to update with the stale (original) ETag - this should fail
logger.info(f"Attempting update with stale ETag: {original_etag}")
conflict_result = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": original_etag, # Use stale ETag
"title": "This should fail",
"content": "This update should be rejected",
"category": test_category,
},
)
# 4. Verify the update was rejected with a 412 error
assert conflict_result.isError is True, "Update with stale ETag should fail"
error_content = conflict_result.content[0].text
assert "modified by someone else" in error_content or "412" in error_content, (
f"Expected conflict error message, got: {error_content}"
)
logger.info("Successfully verified ETag conflict handling via MCP")
finally:
# Cleanup
if created_note is not None:
try:
await nc_client.notes.delete_note(created_note["id"])
logger.info(f"Cleaned up test note {created_note['id']}")
except Exception as e:
logger.warning(f"Failed to cleanup test note: {e}")
async def test_mcp_webdav_workflow(
nc_mcp_client: ClientSession, nc_client: NextcloudClient
):