Compare commits

..

16 Commits

Author SHA1 Message Date
github-actions[bot] f034012101 bump: version 0.8.2 → 0.8.3 2025-08-31 19:22:11 +00:00
Chris Coutinho 7c4c0284f3 Merge pull request #140 from cbcoutinho/feature/etag
fix(notes): Include ETags in responses to avoid accidently updates
2025-08-31 21:21:50 +02:00
Chris Coutinho 892340fb66 chore: Remove unused model SuccessResponse 2025-08-31 21:15:43 +02:00
Chris Coutinho f79b957644 test: Update tests with McpError 2025-08-31 21:08:04 +02:00
Chris Coutinho ef1fb9e9aa fix(server): Replace ErrorResponses with standard McpErrors 2025-08-31 20:58:12 +02:00
Chris Coutinho d712b5487c test(notes): Modify tests with updated error handling 2025-08-31 19:32:39 +02:00
Chris Coutinho 892a8d2d23 fix(notes): Include ETags in responses to avoid accidently updates 2025-08-31 19:20:51 +02:00
github-actions[bot] daeb95f3c3 bump: version 0.8.1 → 0.8.2 2025-08-31 10:36:56 +00:00
Chris Coutinho 36d44d1781 Merge pull request #139 from cbcoutinho/feature/notes-no-return-content
fix(notes): Remove note contents from responses to reduce token usage
2025-08-31 12:36:30 +02:00
Chris Coutinho 949fb7124b fix(notes): Remove note contents from responses to reduce token usage 2025-08-31 11:55:15 +02:00
github-actions[bot] 6c4f071d2b bump: version 0.8.0 → 0.8.1 2025-08-30 20:38:13 +00:00
Chris Coutinho 53b11f7fbb fix(model): Serialize timestamps in RFC3339 format 2025-08-30 22:37:16 +02:00
Chris Coutinho 336bc45637 Merge pull request #138 from cbcoutinho/renovate/nextcloud-31.0.8
chore(deps): update nextcloud:31.0.8 docker digest to fcf6370
2025-08-30 20:29:17 +02:00
renovate-bot-cbcoutinho[bot] 6c587bb265 chore(deps): update nextcloud:31.0.8 docker digest to fcf6370 2025-08-30 18:19:45 +00:00
github-actions[bot] 6b1f5c12c8 bump: version 0.7.2 → 0.8.0 2025-08-30 17:28:57 +00:00
Chris Coutinho f8dc1f060b Merge pull request #137 from cbcoutinho/feature/claude-code
Feature/claude code
2025-08-30 19:28:33 +02:00
13 changed files with 365 additions and 109 deletions
+30
View File
@@ -1,3 +1,33 @@
## v0.8.3 (2025-08-31)
### Fix
- **server**: Replace ErrorResponses with standard McpErrors
- **notes**: Include ETags in responses to avoid accidently updates
## v0.8.2 (2025-08-31)
### Fix
- **notes**: Remove note contents from responses to reduce token usage
## v0.8.1 (2025-08-30)
### Fix
- **model**: Serialize timestamps in RFC3339 format
## v0.8.0 (2025-08-30)
### Feat
- **client**: Preserve fields when modifying contacts/calendar resources
- **server**: Add structured output to all tool/resource output
### Refactor
- Use _make_request where available
## v0.7.2 (2025-08-30) ## v0.7.2 (2025-08-30)
### Fix ### Fix
+7 -1
View File
@@ -102,13 +102,19 @@ Each Nextcloud app has a corresponding server module that:
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests - **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
#### Testing Best Practices #### Testing Best Practices
- **Always restart MCP server** after code changes with `docker-compose up --build -d mcp` - **MANDATORY: Always run tests after implementing features or fixing bugs**
- Run tests to completion before considering any task complete
- If tests require modifications to pass, ask for permission before proceeding
- Use `docker-compose up --build -d mcp` to rebuild MCP container after code changes
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work: - **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
- `nc_mcp_client` - MCP client session for tool/resource testing - `nc_mcp_client` - MCP client session for tool/resource testing
- `nc_client` - Direct NextcloudClient for setup/cleanup operations - `nc_client` - Direct NextcloudClient for setup/cleanup operations
- `temporary_note` - Creates and cleans up test notes automatically - `temporary_note` - Creates and cleans up test notes automatically
- `temporary_addressbook` - Creates and cleans up test address books - `temporary_addressbook` - Creates and cleans up test address books
- `temporary_contact` - Creates and cleans up test contacts - `temporary_contact` - Creates and cleans up test contacts
- **Test specific functionality** after changes:
- For Notes changes: `uv run pytest tests/integration/test_mcp.py -k "notes" -v`
- For specific API changes: `uv run pytest tests/integration/test_notes_api.py -v`
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead - **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
### Configuration Files ### Configuration Files
+1 -1
View File
@@ -21,7 +21,7 @@ services:
restart: always restart: always
app: app:
image: nextcloud:31.0.8@sha256:3eaddb0a9c56e6cf81ad258a5d05b78f747f6434b974f9a44e3f0dd91311b6ef image: nextcloud:31.0.8@sha256:fcf637074755bb1d27644441b938bf39b27dd6c0a8c2326a5752e1b3d5014366
#user: www-data:www-data #user: www-data:www-data
restart: always restart: always
#post_start: #post_start:
-4
View File
@@ -3,8 +3,6 @@
# Base models # Base models
from .base import ( from .base import (
BaseResponse, BaseResponse,
ErrorResponse,
SuccessResponse,
IdResponse, IdResponse,
StatusResponse, StatusResponse,
) )
@@ -82,8 +80,6 @@ from .webdav import (
__all__ = [ __all__ = [
# Base models # Base models
"BaseResponse", "BaseResponse",
"ErrorResponse",
"SuccessResponse",
"IdResponse", "IdResponse",
"StatusResponse", "StatusResponse",
# Notes models # Notes models
+21 -23
View File
@@ -1,40 +1,38 @@
"""Base Pydantic models for common response patterns.""" """Base Pydantic models for common response patterns."""
from datetime import datetime from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union from typing import Optional, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, field_serializer
def _utc_now() -> datetime:
"""Generate UTC timestamp for responses."""
return datetime.now(timezone.utc)
class BaseResponse(BaseModel): class BaseResponse(BaseModel):
"""Base response model for all MCP tool responses.""" """Base response model for all MCP tool responses."""
model_config = {"json_encoders": {datetime: lambda v: v.isoformat()}}
success: bool = Field( success: bool = Field(
default=True, description="Whether the operation was successful" default=True, description="Whether the operation was successful"
) )
timestamp: datetime = Field( timestamp: datetime = Field(
default_factory=datetime.now, description="Response timestamp" default_factory=_utc_now, description="Response timestamp"
) )
@field_serializer("timestamp")
class ErrorResponse(BaseResponse): def serialize_timestamp(self, timestamp: datetime) -> str:
"""Response model for error cases.""" """Serialize timestamp to RFC3339 format for MCP compliance."""
if timestamp.tzinfo is None:
success: bool = Field(default=False, description="Always False for error responses") # If somehow we get a naive datetime, assume UTC
error: str = Field(description="Error message") timestamp = timestamp.replace(tzinfo=timezone.utc)
error_code: Optional[str] = Field(None, description="Optional error code") # Use isoformat() which produces RFC3339 compliant format
details: Optional[Dict[str, Any]] = Field( # For UTC times, replace '+00:00' with 'Z' as preferred by many systems
None, description="Additional error details" iso_string = timestamp.isoformat()
) if iso_string.endswith("+00:00"):
return iso_string[:-6] + "Z"
return iso_string
class SuccessResponse(BaseResponse):
"""Generic success response."""
message: Optional[str] = Field(None, description="Optional success message")
data: Optional[Dict[str, Any]] = Field(None, description="Optional response data")
class IdResponse(BaseResponse): class IdResponse(BaseResponse):
+12 -4
View File
@@ -19,7 +19,7 @@ class Note(BaseModel):
favorite: bool = Field( favorite: bool = Field(
default=False, description="Whether note is marked as favorite" default=False, description="Whether note is marked as favorite"
) )
etag: Optional[str] = Field(None, description="ETag for versioning") etag: str = Field(description="ETag for versioning")
readonly: bool = Field(default=False, description="Whether note is read-only") readonly: bool = Field(default=False, description="Whether note is read-only")
@property @property
@@ -48,13 +48,18 @@ class NotesSettings(BaseModel):
class CreateNoteResponse(IdResponse): class CreateNoteResponse(IdResponse):
"""Response model for note creation.""" """Response model for note creation."""
note: Note = Field(description="The created note") title: str = Field(description="The created note title")
category: str = Field(description="The created note category")
etag: str = Field(description="Current ETag for the created note")
class UpdateNoteResponse(BaseResponse): class UpdateNoteResponse(BaseResponse):
"""Response model for note updates.""" """Response model for note updates."""
note: Note = Field(description="The updated note") id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
etag: str = Field(description="Current ETag for the updated note")
class DeleteNoteResponse(StatusResponse): class DeleteNoteResponse(StatusResponse):
@@ -66,7 +71,10 @@ class DeleteNoteResponse(StatusResponse):
class AppendContentResponse(BaseResponse): class AppendContentResponse(BaseResponse):
"""Response model for appending content to a note.""" """Response model for appending content to a note."""
note: Note = Field(description="The updated note after appending content") id: int = Field(description="The updated note ID")
title: str = Field(description="The updated note title")
category: str = Field(description="The updated note category")
etag: str = Field(description="Current ETag for the updated note")
class SearchNotesResponse(BaseResponse): class SearchNotesResponse(BaseResponse):
+100 -49
View File
@@ -1,10 +1,11 @@
import logging import logging
from httpx import HTTPStatusError from httpx import HTTPStatusError
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.models.base import ErrorResponse
from nextcloud_mcp_server.models.notes import ( from nextcloud_mcp_server.models.notes import (
Note, Note,
NotesSettings, NotesSettings,
@@ -54,8 +55,6 @@ def configure_notes_tools(mcp: FastMCP):
@mcp.resource("nc://Notes/{note_id}") @mcp.resource("nc://Notes/{note_id}")
async def nc_get_note(note_id: int): async def nc_get_note(note_id: int):
"""Get user note using note id""" """Get user note using note id"""
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
ctx: Context = mcp.get_context() ctx: Context = mcp.get_context()
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
@@ -80,7 +79,7 @@ def configure_notes_tools(mcp: FastMCP):
@mcp.tool() @mcp.tool()
async def nc_notes_create_note( async def nc_notes_create_note(
title: str, content: str, category: str, ctx: Context title: str, content: str, category: str, ctx: Context
) -> CreateNoteResponse | ErrorResponse: ) -> CreateNoteResponse:
"""Create a new note""" """Create a new note"""
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
try: try:
@@ -90,21 +89,32 @@ def configure_notes_tools(mcp: FastMCP):
category=category, category=category,
) )
note = Note(**note_data) note = Note(**note_data)
return CreateNoteResponse(id=note.id, note=note) return CreateNoteResponse(
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 403: if e.response.status_code == 403:
return ErrorResponse( raise McpError(
error="Access denied: insufficient permissions to create notes" ErrorData(
code=-1,
message="Access denied: insufficient permissions to create notes",
)
) )
elif e.response.status_code == 413: elif e.response.status_code == 413:
return ErrorResponse(error="Note content too large") raise McpError(ErrorData(code=-1, message="Note content too large"))
elif e.response.status_code == 409: elif e.response.status_code == 409:
return ErrorResponse( raise McpError(
error=f"A note with title '{title}' already exists in this category" ErrorData(
code=-1,
message=f"A note with title '{title}' already exists in this category",
)
) )
else: else:
return ErrorResponse( raise McpError(
error=f"Failed to create note: server error ({e.response.status_code})" ErrorData(
code=-1,
message=f"Failed to create note: server error ({e.response.status_code})",
)
) )
@mcp.tool() @mcp.tool()
@@ -115,8 +125,13 @@ def configure_notes_tools(mcp: FastMCP):
content: str | None, content: str | None,
category: str | None, category: str | None,
ctx: Context, ctx: Context,
) -> UpdateNoteResponse | ErrorResponse: ) -> UpdateNoteResponse:
"""Update an existing note's title, content, or category""" """Update an existing note's title, content, or category.
REQUIRED: etag parameter must be provided to prevent overwriting concurrent changes.
Get the current ETag by first retrieving the note using nc://Notes/{note_id} resource.
If the note has been modified by someone else since you retrieved it,
the update will fail with a 412 error."""
logger.info("Updating note %s", note_id) logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
try: try:
@@ -128,30 +143,45 @@ def configure_notes_tools(mcp: FastMCP):
category=category, category=category,
) )
note = Note(**note_data) note = Note(**note_data)
return UpdateNoteResponse(note=note) return UpdateNoteResponse(
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 404: if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found") raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
elif e.response.status_code == 412: elif e.response.status_code == 412:
return ErrorResponse( raise McpError(
error=f"Note {note_id} has been modified by someone else. Please refresh and try again." ErrorData(
code=-1,
message=f"Note {note_id} has been modified by someone else. Please refresh and try again.",
)
) )
elif e.response.status_code == 403: elif e.response.status_code == 403:
return ErrorResponse( raise McpError(
error=f"Access denied: insufficient permissions to update note {note_id}" ErrorData(
code=-1,
message=f"Access denied: insufficient permissions to update note {note_id}",
)
) )
elif e.response.status_code == 413: elif e.response.status_code == 413:
return ErrorResponse(error="Updated note content is too large") raise McpError(
ErrorData(code=-1, message="Updated note content is too large")
)
else: else:
return ErrorResponse( raise McpError(
error=f"Failed to update note {note_id}: server error ({e.response.status_code})" ErrorData(
code=-1,
message=f"Failed to update note {note_id}: server error ({e.response.status_code})",
)
) )
@mcp.tool() @mcp.tool()
async def nc_notes_append_content( async def nc_notes_append_content(
note_id: int, content: str, ctx: Context note_id: int, content: str, ctx: Context
) -> AppendContentResponse | ErrorResponse: ) -> AppendContentResponse:
"""Append content to an existing note with a clear separator""" """Append content to an existing note. The tool adds a `\n---\n`
between the note and what will be appended."""
logger.info("Appending content to note %s", note_id) logger.info("Appending content to note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
try: try:
@@ -159,27 +189,36 @@ def configure_notes_tools(mcp: FastMCP):
note_id=note_id, content=content note_id=note_id, content=content
) )
note = Note(**note_data) note = Note(**note_data)
return AppendContentResponse(note=note) return AppendContentResponse(
id=note.id, title=note.title, category=note.category, etag=note.etag
)
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 404: if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found") raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
elif e.response.status_code == 403: elif e.response.status_code == 403:
return ErrorResponse( raise McpError(
error=f"Access denied: insufficient permissions to modify note {note_id}" ErrorData(
code=-1,
message=f"Access denied: insufficient permissions to modify note {note_id}",
)
) )
elif e.response.status_code == 413: elif e.response.status_code == 413:
return ErrorResponse( raise McpError(
error="Content to append would make the note too large" ErrorData(
code=-1,
message="Content to append would make the note too large",
)
) )
else: else:
return ErrorResponse( raise McpError(
error=f"Failed to append content to note {note_id}: server error ({e.response.status_code})" ErrorData(
code=-1,
message=f"Failed to append content to note {note_id}: server error ({e.response.status_code})",
)
) )
@mcp.tool() @mcp.tool()
async def nc_notes_search_notes( async def nc_notes_search_notes(query: str, ctx: Context) -> SearchNotesResponse:
query: str, ctx: Context
) -> SearchNotesResponse | ErrorResponse:
"""Search notes by title or content, returning only id, title, and category.""" """Search notes by title or content, returning only id, title, and category."""
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
try: try:
@@ -201,20 +240,26 @@ def configure_notes_tools(mcp: FastMCP):
) )
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 403: if e.response.status_code == 403:
return ErrorResponse( raise McpError(
error="Access denied: insufficient permissions to search notes" ErrorData(
code=-1,
message="Access denied: insufficient permissions to search notes",
)
) )
elif e.response.status_code == 400: elif e.response.status_code == 400:
return ErrorResponse(error="Invalid search query format") raise McpError(
ErrorData(code=-1, message="Invalid search query format")
)
else: else:
return ErrorResponse( raise McpError(
error=f"Search failed: server error ({e.response.status_code})" ErrorData(
code=-1,
message=f"Search failed: server error ({e.response.status_code})",
)
) )
@mcp.tool() @mcp.tool()
async def nc_notes_delete_note( async def nc_notes_delete_note(note_id: int, ctx: Context) -> DeleteNoteResponse:
note_id: int, ctx: Context
) -> DeleteNoteResponse | ErrorResponse:
"""Delete a note permanently""" """Delete a note permanently"""
logger.info("Deleting note %s", note_id) logger.info("Deleting note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
@@ -227,12 +272,18 @@ def configure_notes_tools(mcp: FastMCP):
) )
except HTTPStatusError as e: except HTTPStatusError as e:
if e.response.status_code == 404: if e.response.status_code == 404:
return ErrorResponse(error=f"Note {note_id} not found") raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
elif e.response.status_code == 403: elif e.response.status_code == 403:
return ErrorResponse( raise McpError(
error=f"Access denied: insufficient permissions to delete note {note_id}" ErrorData(
code=-1,
message=f"Access denied: insufficient permissions to delete note {note_id}",
)
) )
else: else:
return ErrorResponse( raise McpError(
error=f"Failed to delete note {note_id}: server error ({e.response.status_code})" ErrorData(
code=-1,
message=f"Failed to delete note {note_id}: server error ({e.response.status_code})",
)
) )
+2 -2
View File
@@ -43,11 +43,11 @@ def configure_webdav_tools(mcp: FastMCP):
Examples: Examples:
# Read a text file # Read a text file
result = await nc_webdav_read_file("Documents/readme.txt") result = await nc_webdav_read_file("Documents/readme.txt")
print(result['content']) # Decoded text content logger.info(result['content']) # Decoded text content
# Read a binary file # Read a binary file
result = await nc_webdav_read_file("Images/photo.jpg") result = await nc_webdav_read_file("Images/photo.jpg")
print(result['encoding']) # 'base64' logger.info(result['encoding']) # 'base64'
""" """
client: NextcloudClient = ctx.request_context.lifespan_context.client client: NextcloudClient = ctx.request_context.lifespan_context.client
content, content_type = await client.webdav.read_file(path) content, content_type = await client.webdav.read_file(path)
+1 -1
View File
@@ -1,6 +1,6 @@
[project] [project]
name = "nextcloud-mcp-server" name = "nextcloud-mcp-server"
version = "0.7.2" version = "0.8.3"
description = "" description = ""
authors = [ authors = [
{name = "Chris Coutinho",email = "chris@coutinho.io"} {name = "Chris Coutinho",email = "chris@coutinho.io"}
+8 -22
View File
@@ -20,23 +20,15 @@ async def test_missing_note_resource_error(nc_mcp_client: ClientSession):
@pytest.mark.integration @pytest.mark.integration
async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession): async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession):
"""Test that deleting a non-existent note returns proper error.""" """Test that deleting a non-existent note returns proper error."""
# Try to delete a non-existent note # Try to delete a non-existent note - should return error response
response = await nc_mcp_client.call_tool( response = await nc_mcp_client.call_tool(
"nc_notes_delete_note", {"note_id": 999999} "nc_notes_delete_note", {"note_id": 999999}
) )
logger.info(f"Delete missing note response: {response}") # Should return error response (not raise exception) for tools
# Should return structured error response with improved message
assert response is not None assert response is not None
assert ( assert response.isError is True
response.isError is False assert "Note 999999 not found" in response.content[0].text
) # Tools now return structured responses, not MCP errors
# Check structured content for error
assert "success" in response.structuredContent["result"]
assert response.structuredContent["result"]["success"] is False
assert "Note 999999 not found" in response.structuredContent["result"]["error"]
@pytest.mark.integration @pytest.mark.integration
@@ -81,7 +73,7 @@ async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_cl
note_id = note_data["id"] note_id = note_data["id"]
try: try:
# Try to update with invalid ETag # Try to update with invalid ETag - should return error response
response = await nc_mcp_client.call_tool( response = await nc_mcp_client.call_tool(
"nc_notes_update_note", "nc_notes_update_note",
{ {
@@ -93,16 +85,10 @@ async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_cl
}, },
) )
logger.info(f"Invalid ETag response: {response}") # Should return error response (not raise exception) for tools
# Should return structured error response with improved message
assert response is not None assert response is not None
assert response.isError is False # Tools now return structured responses assert response.isError is True
assert "success" in response.structuredContent["result"] assert "modified by someone else" in response.content[0].text
assert response.structuredContent["result"]["success"] is False
assert (
"modified by someone else" in response.structuredContent["result"]["error"]
)
finally: finally:
# Clean up # Clean up
+98 -1
View File
@@ -123,8 +123,11 @@ async def test_mcp_notes_crud_workflow(
created_note = create_result.content[0].text created_note = create_result.content[0].text
note_data = json.loads(created_note) # Parse the returned JSON note_data = json.loads(created_note) # Parse the returned JSON
note_id = note_data["id"] note_id = note_data["id"]
create_etag = note_data["etag"] # Verify create response includes ETag
logger.info(f"Note created via MCP with ID: {note_id}") logger.info(f"Note created via MCP with ID: {note_id}, ETag: {create_etag}")
assert "etag" in note_data, "Create response should include ETag"
assert create_etag, "Create ETag should not be empty"
# 2. Verify creation via direct NextcloudClient # 2. Verify creation via direct NextcloudClient
direct_note = await nc_client.notes.get_note(note_id) direct_note = await nc_client.notes.get_note(note_id)
@@ -165,6 +168,15 @@ async def test_mcp_notes_crud_workflow(
f"MCP note update failed: {update_result.content}" f"MCP note update failed: {update_result.content}"
) )
# Verify update response includes new ETag
updated_note_data = json.loads(update_result.content[0].text)
update_etag = updated_note_data["etag"]
logger.info(f"Note updated via MCP, new ETag: {update_etag}")
assert "etag" in updated_note_data, "Update response should include ETag"
assert update_etag, "Update ETag should not be empty"
assert update_etag != etag, "ETag should change after update"
# 5. Verify update via direct NextcloudClient # 5. Verify update via direct NextcloudClient
updated_direct_note = await nc_client.notes.get_note(note_id) updated_direct_note = await nc_client.notes.get_note(note_id)
assert updated_direct_note["title"] == updated_title assert updated_direct_note["title"] == updated_title
@@ -181,6 +193,15 @@ async def test_mcp_notes_crud_workflow(
f"MCP note append failed: {append_result.content}" f"MCP note append failed: {append_result.content}"
) )
# Verify append response includes new ETag
appended_note_data = json.loads(append_result.content[0].text)
append_etag = appended_note_data["etag"]
logger.info(f"Content appended via MCP, new ETag: {append_etag}")
assert "etag" in appended_note_data, "Append response should include ETag"
assert append_etag, "Append ETag should not be empty"
assert append_etag != update_etag, "ETag should change after append"
# 7. Verify append via direct NextcloudClient # 7. Verify append via direct NextcloudClient
appended_direct_note = await nc_client.notes.get_note(note_id) appended_direct_note = await nc_client.notes.get_note(note_id)
assert append_content in appended_direct_note["content"] assert append_content in appended_direct_note["content"]
@@ -256,6 +277,82 @@ async def test_mcp_notes_crud_workflow(
logger.warning(f"Failed to cleanup note: {e}") logger.warning(f"Failed to cleanup note: {e}")
async def test_mcp_notes_etag_conflict(
nc_mcp_client: ClientSession, nc_client: NextcloudClient
):
"""Test that MCP note updates fail when using stale ETags."""
unique_suffix = uuid.uuid4().hex[:8]
test_title = f"ETag Test Note {unique_suffix}"
test_content = f"This is test content for ETag testing {unique_suffix}"
test_category = "ETTesting"
created_note = None
try:
# 1. Create note via MCP
logger.info(f"Creating note for ETag conflict test: {test_title}")
create_result = await nc_mcp_client.call_tool(
"nc_notes_create_note",
{"title": test_title, "content": test_content, "category": test_category},
)
assert create_result.isError is False
note_data = json.loads(create_result.content[0].text)
note_id = note_data["id"]
original_etag = note_data["etag"]
created_note = note_data
# 2. Update note to change ETag
first_update_result = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": original_etag,
"title": f"First Update {test_title}",
"content": test_content,
"category": test_category,
},
)
assert first_update_result.isError is False
updated_data = json.loads(first_update_result.content[0].text)
new_etag = updated_data["etag"]
assert new_etag != original_etag, "ETag should have changed after update"
# 3. Try to update with the stale (original) ETag - this should fail
logger.info(f"Attempting update with stale ETag: {original_etag}")
conflict_result = await nc_mcp_client.call_tool(
"nc_notes_update_note",
{
"note_id": note_id,
"etag": original_etag, # Use stale ETag
"title": "This should fail",
"content": "This update should be rejected",
"category": test_category,
},
)
# 4. Verify the update was rejected with a 412 error
# With McpError, tools now return proper error responses
assert conflict_result.isError is True, "Update with stale ETag should fail"
response_content = conflict_result.content[0].text
assert "modified by someone else" in response_content, (
f"Expected conflict error message, got: {response_content}"
)
logger.info("Successfully verified ETag conflict handling via MCP")
finally:
# Cleanup
if created_note is not None:
try:
await nc_client.notes.delete_note(created_note["id"])
logger.info(f"Cleaned up test note {created_note['id']}")
except Exception as e:
logger.warning(f"Failed to cleanup test note: {e}")
async def test_mcp_webdav_workflow( async def test_mcp_webdav_workflow(
nc_mcp_client: ClientSession, nc_client: NextcloudClient nc_mcp_client: ClientSession, nc_client: NextcloudClient
): ):
+84
View File
@@ -0,0 +1,84 @@
"""Unit tests for Pydantic models and serialization."""
from datetime import datetime, timezone
import json
import logging
import re
from nextcloud_mcp_server.models.base import BaseResponse
logger = logging.getLogger(__name__)
def test_timestamp_format_validation():
"""Test that timestamps in BaseResponse are RFC3339 compliant for MCP validation.
This test should initially fail, demonstrating the timestamp validation error
seen in MCP inspector. MCP expects RFC3339 format with timezone information.
"""
# Create a response object
response = BaseResponse()
# Serialize to JSON (mimics what MCP inspector sees)
json_str = response.model_dump_json()
data = json.loads(json_str)
timestamp_str = data["timestamp"]
# RFC3339 regex pattern (what MCP expects)
# Format: YYYY-MM-DDTHH:MM:SS[.ffffff][Z|±HH:MM]
rfc3339_pattern = (
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$"
)
# This assertion should FAIL with current implementation
assert re.match(rfc3339_pattern, timestamp_str), (
f"Timestamp '{timestamp_str}' is not RFC3339 compliant. "
f"MCP expects format like '2025-08-30T19:22:58.862377Z' or '2025-08-30T19:22:58.862377+00:00'"
)
def test_base_response_timestamp_is_utc():
"""Test that BaseResponse timestamps are in UTC timezone."""
response = BaseResponse()
# The timestamp should be timezone-aware and in UTC
assert response.timestamp.tzinfo is not None, (
"Timestamp should have timezone information"
)
assert response.timestamp.tzinfo == timezone.utc, (
"Timestamp should be in UTC timezone"
)
def test_serialized_timestamp_ends_with_z_or_offset():
"""Test that serialized timestamps have proper timezone suffix."""
response = BaseResponse()
json_str = response.model_dump_json()
data = json.loads(json_str)
timestamp_str = data["timestamp"]
# Should end with 'Z' (UTC) or timezone offset like '+00:00'
assert timestamp_str.endswith("Z") or re.search(
r"[+-]\d{2}:\d{2}$", timestamp_str
), (
f"Timestamp '{timestamp_str}' should end with 'Z' or timezone offset like '+00:00'"
)
def test_current_broken_format():
"""Test showing the current broken timestamp format that causes MCP validation errors."""
# This demonstrates what the current code produces
current_naive_dt = datetime.now()
current_format = current_naive_dt.isoformat()
# Show that current format lacks timezone info
assert "Z" not in current_format
assert "+" not in current_format
assert "-" not in current_format[-6:] # Check last 6 chars for timezone
logger.info(f"Current broken format: {current_format}")
logger.info(
"This format causes MCP validation errors because it lacks timezone information"
)
Generated
+1 -1
View File
@@ -505,7 +505,7 @@ wheels = [
[[package]] [[package]]
name = "nextcloud-mcp-server" name = "nextcloud-mcp-server"
version = "0.7.2" version = "0.8.3"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "httpx" }, { name = "httpx" },