Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 892340fb66 | |||
| f79b957644 | |||
| ef1fb9e9aa | |||
| d712b5487c | |||
| 892a8d2d23 | |||
| daeb95f3c3 | |||
| 36d44d1781 | |||
| 949fb7124b | |||
| 6c4f071d2b | |||
| 53b11f7fbb | |||
| 336bc45637 | |||
| 6c587bb265 | |||
| 6b1f5c12c8 | |||
| f8dc1f060b |
@@ -1,3 +1,26 @@
|
||||
## v0.8.2 (2025-08-31)
|
||||
|
||||
### Fix
|
||||
|
||||
- **notes**: Remove note contents from responses to reduce token usage
|
||||
|
||||
## v0.8.1 (2025-08-30)
|
||||
|
||||
### Fix
|
||||
|
||||
- **model**: Serialize timestamps in RFC3339 format
|
||||
|
||||
## v0.8.0 (2025-08-30)
|
||||
|
||||
### Feat
|
||||
|
||||
- **client**: Preserve fields when modifying contacts/calendar resources
|
||||
- **server**: Add structured output to all tool/resource output
|
||||
|
||||
### Refactor
|
||||
|
||||
- Use _make_request where available
|
||||
|
||||
## v0.7.2 (2025-08-30)
|
||||
|
||||
### Fix
|
||||
|
||||
@@ -102,13 +102,19 @@ Each Nextcloud app has a corresponding server module that:
|
||||
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
|
||||
|
||||
#### Testing Best Practices
|
||||
- **Always restart MCP server** after code changes with `docker-compose up --build -d mcp`
|
||||
- **MANDATORY: Always run tests after implementing features or fixing bugs**
|
||||
- Run tests to completion before considering any task complete
|
||||
- If tests require modifications to pass, ask for permission before proceeding
|
||||
- Use `docker-compose up --build -d mcp` to rebuild MCP container after code changes
|
||||
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
|
||||
- `nc_mcp_client` - MCP client session for tool/resource testing
|
||||
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
|
||||
- `temporary_note` - Creates and cleans up test notes automatically
|
||||
- `temporary_addressbook` - Creates and cleans up test address books
|
||||
- `temporary_contact` - Creates and cleans up test contacts
|
||||
- **Test specific functionality** after changes:
|
||||
- For Notes changes: `uv run pytest tests/integration/test_mcp.py -k "notes" -v`
|
||||
- For specific API changes: `uv run pytest tests/integration/test_notes_api.py -v`
|
||||
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
|
||||
|
||||
### Configuration Files
|
||||
|
||||
+1
-1
@@ -21,7 +21,7 @@ services:
|
||||
restart: always
|
||||
|
||||
app:
|
||||
image: nextcloud:31.0.8@sha256:3eaddb0a9c56e6cf81ad258a5d05b78f747f6434b974f9a44e3f0dd91311b6ef
|
||||
image: nextcloud:31.0.8@sha256:fcf637074755bb1d27644441b938bf39b27dd6c0a8c2326a5752e1b3d5014366
|
||||
#user: www-data:www-data
|
||||
restart: always
|
||||
#post_start:
|
||||
|
||||
@@ -3,8 +3,6 @@
|
||||
# Base models
|
||||
from .base import (
|
||||
BaseResponse,
|
||||
ErrorResponse,
|
||||
SuccessResponse,
|
||||
IdResponse,
|
||||
StatusResponse,
|
||||
)
|
||||
@@ -82,8 +80,6 @@ from .webdav import (
|
||||
__all__ = [
|
||||
# Base models
|
||||
"BaseResponse",
|
||||
"ErrorResponse",
|
||||
"SuccessResponse",
|
||||
"IdResponse",
|
||||
"StatusResponse",
|
||||
# Notes models
|
||||
|
||||
@@ -1,40 +1,38 @@
|
||||
"""Base Pydantic models for common response patterns."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional, Union
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, field_serializer
|
||||
|
||||
|
||||
def _utc_now() -> datetime:
|
||||
"""Generate UTC timestamp for responses."""
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
class BaseResponse(BaseModel):
|
||||
"""Base response model for all MCP tool responses."""
|
||||
|
||||
model_config = {"json_encoders": {datetime: lambda v: v.isoformat()}}
|
||||
|
||||
success: bool = Field(
|
||||
default=True, description="Whether the operation was successful"
|
||||
)
|
||||
timestamp: datetime = Field(
|
||||
default_factory=datetime.now, description="Response timestamp"
|
||||
default_factory=_utc_now, description="Response timestamp"
|
||||
)
|
||||
|
||||
|
||||
class ErrorResponse(BaseResponse):
|
||||
"""Response model for error cases."""
|
||||
|
||||
success: bool = Field(default=False, description="Always False for error responses")
|
||||
error: str = Field(description="Error message")
|
||||
error_code: Optional[str] = Field(None, description="Optional error code")
|
||||
details: Optional[Dict[str, Any]] = Field(
|
||||
None, description="Additional error details"
|
||||
)
|
||||
|
||||
|
||||
class SuccessResponse(BaseResponse):
|
||||
"""Generic success response."""
|
||||
|
||||
message: Optional[str] = Field(None, description="Optional success message")
|
||||
data: Optional[Dict[str, Any]] = Field(None, description="Optional response data")
|
||||
@field_serializer("timestamp")
|
||||
def serialize_timestamp(self, timestamp: datetime) -> str:
|
||||
"""Serialize timestamp to RFC3339 format for MCP compliance."""
|
||||
if timestamp.tzinfo is None:
|
||||
# If somehow we get a naive datetime, assume UTC
|
||||
timestamp = timestamp.replace(tzinfo=timezone.utc)
|
||||
# Use isoformat() which produces RFC3339 compliant format
|
||||
# For UTC times, replace '+00:00' with 'Z' as preferred by many systems
|
||||
iso_string = timestamp.isoformat()
|
||||
if iso_string.endswith("+00:00"):
|
||||
return iso_string[:-6] + "Z"
|
||||
return iso_string
|
||||
|
||||
|
||||
class IdResponse(BaseResponse):
|
||||
|
||||
@@ -19,7 +19,7 @@ class Note(BaseModel):
|
||||
favorite: bool = Field(
|
||||
default=False, description="Whether note is marked as favorite"
|
||||
)
|
||||
etag: Optional[str] = Field(None, description="ETag for versioning")
|
||||
etag: str = Field(description="ETag for versioning")
|
||||
readonly: bool = Field(default=False, description="Whether note is read-only")
|
||||
|
||||
@property
|
||||
@@ -48,13 +48,18 @@ class NotesSettings(BaseModel):
|
||||
class CreateNoteResponse(IdResponse):
|
||||
"""Response model for note creation."""
|
||||
|
||||
note: Note = Field(description="The created note")
|
||||
title: str = Field(description="The created note title")
|
||||
category: str = Field(description="The created note category")
|
||||
etag: str = Field(description="Current ETag for the created note")
|
||||
|
||||
|
||||
class UpdateNoteResponse(BaseResponse):
|
||||
"""Response model for note updates."""
|
||||
|
||||
note: Note = Field(description="The updated note")
|
||||
id: int = Field(description="The updated note ID")
|
||||
title: str = Field(description="The updated note title")
|
||||
category: str = Field(description="The updated note category")
|
||||
etag: str = Field(description="Current ETag for the updated note")
|
||||
|
||||
|
||||
class DeleteNoteResponse(StatusResponse):
|
||||
@@ -66,7 +71,10 @@ class DeleteNoteResponse(StatusResponse):
|
||||
class AppendContentResponse(BaseResponse):
|
||||
"""Response model for appending content to a note."""
|
||||
|
||||
note: Note = Field(description="The updated note after appending content")
|
||||
id: int = Field(description="The updated note ID")
|
||||
title: str = Field(description="The updated note title")
|
||||
category: str = Field(description="The updated note category")
|
||||
etag: str = Field(description="Current ETag for the updated note")
|
||||
|
||||
|
||||
class SearchNotesResponse(BaseResponse):
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import logging
|
||||
from httpx import HTTPStatusError
|
||||
from mcp.shared.exceptions import McpError
|
||||
from mcp.types import ErrorData
|
||||
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
from nextcloud_mcp_server.models.base import ErrorResponse
|
||||
from nextcloud_mcp_server.models.notes import (
|
||||
Note,
|
||||
NotesSettings,
|
||||
@@ -54,8 +55,6 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
@mcp.resource("nc://Notes/{note_id}")
|
||||
async def nc_get_note(note_id: int):
|
||||
"""Get user note using note id"""
|
||||
from mcp.shared.exceptions import McpError
|
||||
from mcp.types import ErrorData
|
||||
|
||||
ctx: Context = mcp.get_context()
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
@@ -80,7 +79,7 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
@mcp.tool()
|
||||
async def nc_notes_create_note(
|
||||
title: str, content: str, category: str, ctx: Context
|
||||
) -> CreateNoteResponse | ErrorResponse:
|
||||
) -> CreateNoteResponse:
|
||||
"""Create a new note"""
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
try:
|
||||
@@ -90,21 +89,32 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
category=category,
|
||||
)
|
||||
note = Note(**note_data)
|
||||
return CreateNoteResponse(id=note.id, note=note)
|
||||
return CreateNoteResponse(
|
||||
id=note.id, title=note.title, category=note.category, etag=note.etag
|
||||
)
|
||||
except HTTPStatusError as e:
|
||||
if e.response.status_code == 403:
|
||||
return ErrorResponse(
|
||||
error="Access denied: insufficient permissions to create notes"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message="Access denied: insufficient permissions to create notes",
|
||||
)
|
||||
)
|
||||
elif e.response.status_code == 413:
|
||||
return ErrorResponse(error="Note content too large")
|
||||
raise McpError(ErrorData(code=-1, message="Note content too large"))
|
||||
elif e.response.status_code == 409:
|
||||
return ErrorResponse(
|
||||
error=f"A note with title '{title}' already exists in this category"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"A note with title '{title}' already exists in this category",
|
||||
)
|
||||
)
|
||||
else:
|
||||
return ErrorResponse(
|
||||
error=f"Failed to create note: server error ({e.response.status_code})"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Failed to create note: server error ({e.response.status_code})",
|
||||
)
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
@@ -115,8 +125,13 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
content: str | None,
|
||||
category: str | None,
|
||||
ctx: Context,
|
||||
) -> UpdateNoteResponse | ErrorResponse:
|
||||
"""Update an existing note's title, content, or category"""
|
||||
) -> UpdateNoteResponse:
|
||||
"""Update an existing note's title, content, or category.
|
||||
|
||||
REQUIRED: etag parameter must be provided to prevent overwriting concurrent changes.
|
||||
Get the current ETag by first retrieving the note using nc://Notes/{note_id} resource.
|
||||
If the note has been modified by someone else since you retrieved it,
|
||||
the update will fail with a 412 error."""
|
||||
logger.info("Updating note %s", note_id)
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
try:
|
||||
@@ -128,30 +143,45 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
category=category,
|
||||
)
|
||||
note = Note(**note_data)
|
||||
return UpdateNoteResponse(note=note)
|
||||
return UpdateNoteResponse(
|
||||
id=note.id, title=note.title, category=note.category, etag=note.etag
|
||||
)
|
||||
except HTTPStatusError as e:
|
||||
if e.response.status_code == 404:
|
||||
return ErrorResponse(error=f"Note {note_id} not found")
|
||||
raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
|
||||
elif e.response.status_code == 412:
|
||||
return ErrorResponse(
|
||||
error=f"Note {note_id} has been modified by someone else. Please refresh and try again."
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Note {note_id} has been modified by someone else. Please refresh and try again.",
|
||||
)
|
||||
)
|
||||
elif e.response.status_code == 403:
|
||||
return ErrorResponse(
|
||||
error=f"Access denied: insufficient permissions to update note {note_id}"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Access denied: insufficient permissions to update note {note_id}",
|
||||
)
|
||||
)
|
||||
elif e.response.status_code == 413:
|
||||
return ErrorResponse(error="Updated note content is too large")
|
||||
raise McpError(
|
||||
ErrorData(code=-1, message="Updated note content is too large")
|
||||
)
|
||||
else:
|
||||
return ErrorResponse(
|
||||
error=f"Failed to update note {note_id}: server error ({e.response.status_code})"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Failed to update note {note_id}: server error ({e.response.status_code})",
|
||||
)
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_notes_append_content(
|
||||
note_id: int, content: str, ctx: Context
|
||||
) -> AppendContentResponse | ErrorResponse:
|
||||
"""Append content to an existing note with a clear separator"""
|
||||
) -> AppendContentResponse:
|
||||
"""Append content to an existing note. The tool adds a `\n---\n`
|
||||
between the note and what will be appended."""
|
||||
|
||||
logger.info("Appending content to note %s", note_id)
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
try:
|
||||
@@ -159,27 +189,36 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
note_id=note_id, content=content
|
||||
)
|
||||
note = Note(**note_data)
|
||||
return AppendContentResponse(note=note)
|
||||
return AppendContentResponse(
|
||||
id=note.id, title=note.title, category=note.category, etag=note.etag
|
||||
)
|
||||
except HTTPStatusError as e:
|
||||
if e.response.status_code == 404:
|
||||
return ErrorResponse(error=f"Note {note_id} not found")
|
||||
raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
|
||||
elif e.response.status_code == 403:
|
||||
return ErrorResponse(
|
||||
error=f"Access denied: insufficient permissions to modify note {note_id}"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Access denied: insufficient permissions to modify note {note_id}",
|
||||
)
|
||||
)
|
||||
elif e.response.status_code == 413:
|
||||
return ErrorResponse(
|
||||
error="Content to append would make the note too large"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message="Content to append would make the note too large",
|
||||
)
|
||||
)
|
||||
else:
|
||||
return ErrorResponse(
|
||||
error=f"Failed to append content to note {note_id}: server error ({e.response.status_code})"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Failed to append content to note {note_id}: server error ({e.response.status_code})",
|
||||
)
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_notes_search_notes(
|
||||
query: str, ctx: Context
|
||||
) -> SearchNotesResponse | ErrorResponse:
|
||||
async def nc_notes_search_notes(query: str, ctx: Context) -> SearchNotesResponse:
|
||||
"""Search notes by title or content, returning only id, title, and category."""
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
try:
|
||||
@@ -201,20 +240,26 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
)
|
||||
except HTTPStatusError as e:
|
||||
if e.response.status_code == 403:
|
||||
return ErrorResponse(
|
||||
error="Access denied: insufficient permissions to search notes"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message="Access denied: insufficient permissions to search notes",
|
||||
)
|
||||
)
|
||||
elif e.response.status_code == 400:
|
||||
return ErrorResponse(error="Invalid search query format")
|
||||
raise McpError(
|
||||
ErrorData(code=-1, message="Invalid search query format")
|
||||
)
|
||||
else:
|
||||
return ErrorResponse(
|
||||
error=f"Search failed: server error ({e.response.status_code})"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Search failed: server error ({e.response.status_code})",
|
||||
)
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_notes_delete_note(
|
||||
note_id: int, ctx: Context
|
||||
) -> DeleteNoteResponse | ErrorResponse:
|
||||
async def nc_notes_delete_note(note_id: int, ctx: Context) -> DeleteNoteResponse:
|
||||
"""Delete a note permanently"""
|
||||
logger.info("Deleting note %s", note_id)
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
@@ -227,12 +272,18 @@ def configure_notes_tools(mcp: FastMCP):
|
||||
)
|
||||
except HTTPStatusError as e:
|
||||
if e.response.status_code == 404:
|
||||
return ErrorResponse(error=f"Note {note_id} not found")
|
||||
raise McpError(ErrorData(code=-1, message=f"Note {note_id} not found"))
|
||||
elif e.response.status_code == 403:
|
||||
return ErrorResponse(
|
||||
error=f"Access denied: insufficient permissions to delete note {note_id}"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Access denied: insufficient permissions to delete note {note_id}",
|
||||
)
|
||||
)
|
||||
else:
|
||||
return ErrorResponse(
|
||||
error=f"Failed to delete note {note_id}: server error ({e.response.status_code})"
|
||||
raise McpError(
|
||||
ErrorData(
|
||||
code=-1,
|
||||
message=f"Failed to delete note {note_id}: server error ({e.response.status_code})",
|
||||
)
|
||||
)
|
||||
|
||||
@@ -43,11 +43,11 @@ def configure_webdav_tools(mcp: FastMCP):
|
||||
Examples:
|
||||
# Read a text file
|
||||
result = await nc_webdav_read_file("Documents/readme.txt")
|
||||
print(result['content']) # Decoded text content
|
||||
logger.info(result['content']) # Decoded text content
|
||||
|
||||
# Read a binary file
|
||||
result = await nc_webdav_read_file("Images/photo.jpg")
|
||||
print(result['encoding']) # 'base64'
|
||||
logger.info(result['encoding']) # 'base64'
|
||||
"""
|
||||
client: NextcloudClient = ctx.request_context.lifespan_context.client
|
||||
content, content_type = await client.webdav.read_file(path)
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "nextcloud-mcp-server"
|
||||
version = "0.7.2"
|
||||
version = "0.8.2"
|
||||
description = ""
|
||||
authors = [
|
||||
{name = "Chris Coutinho",email = "chris@coutinho.io"}
|
||||
|
||||
@@ -20,23 +20,15 @@ async def test_missing_note_resource_error(nc_mcp_client: ClientSession):
|
||||
@pytest.mark.integration
|
||||
async def test_delete_missing_note_tool_error(nc_mcp_client: ClientSession):
|
||||
"""Test that deleting a non-existent note returns proper error."""
|
||||
# Try to delete a non-existent note
|
||||
# Try to delete a non-existent note - should return error response
|
||||
response = await nc_mcp_client.call_tool(
|
||||
"nc_notes_delete_note", {"note_id": 999999}
|
||||
)
|
||||
|
||||
logger.info(f"Delete missing note response: {response}")
|
||||
|
||||
# Should return structured error response with improved message
|
||||
# Should return error response (not raise exception) for tools
|
||||
assert response is not None
|
||||
assert (
|
||||
response.isError is False
|
||||
) # Tools now return structured responses, not MCP errors
|
||||
|
||||
# Check structured content for error
|
||||
assert "success" in response.structuredContent["result"]
|
||||
assert response.structuredContent["result"]["success"] is False
|
||||
assert "Note 999999 not found" in response.structuredContent["result"]["error"]
|
||||
assert response.isError is True
|
||||
assert "Note 999999 not found" in response.content[0].text
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@@ -81,7 +73,7 @@ async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_cl
|
||||
note_id = note_data["id"]
|
||||
|
||||
try:
|
||||
# Try to update with invalid ETag
|
||||
# Try to update with invalid ETag - should return error response
|
||||
response = await nc_mcp_client.call_tool(
|
||||
"nc_notes_update_note",
|
||||
{
|
||||
@@ -93,16 +85,10 @@ async def test_update_note_with_invalid_etag(nc_mcp_client: ClientSession, nc_cl
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Invalid ETag response: {response}")
|
||||
|
||||
# Should return structured error response with improved message
|
||||
# Should return error response (not raise exception) for tools
|
||||
assert response is not None
|
||||
assert response.isError is False # Tools now return structured responses
|
||||
assert "success" in response.structuredContent["result"]
|
||||
assert response.structuredContent["result"]["success"] is False
|
||||
assert (
|
||||
"modified by someone else" in response.structuredContent["result"]["error"]
|
||||
)
|
||||
assert response.isError is True
|
||||
assert "modified by someone else" in response.content[0].text
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
|
||||
@@ -123,8 +123,11 @@ async def test_mcp_notes_crud_workflow(
|
||||
created_note = create_result.content[0].text
|
||||
note_data = json.loads(created_note) # Parse the returned JSON
|
||||
note_id = note_data["id"]
|
||||
create_etag = note_data["etag"] # Verify create response includes ETag
|
||||
|
||||
logger.info(f"Note created via MCP with ID: {note_id}")
|
||||
logger.info(f"Note created via MCP with ID: {note_id}, ETag: {create_etag}")
|
||||
assert "etag" in note_data, "Create response should include ETag"
|
||||
assert create_etag, "Create ETag should not be empty"
|
||||
|
||||
# 2. Verify creation via direct NextcloudClient
|
||||
direct_note = await nc_client.notes.get_note(note_id)
|
||||
@@ -165,6 +168,15 @@ async def test_mcp_notes_crud_workflow(
|
||||
f"MCP note update failed: {update_result.content}"
|
||||
)
|
||||
|
||||
# Verify update response includes new ETag
|
||||
updated_note_data = json.loads(update_result.content[0].text)
|
||||
update_etag = updated_note_data["etag"]
|
||||
|
||||
logger.info(f"Note updated via MCP, new ETag: {update_etag}")
|
||||
assert "etag" in updated_note_data, "Update response should include ETag"
|
||||
assert update_etag, "Update ETag should not be empty"
|
||||
assert update_etag != etag, "ETag should change after update"
|
||||
|
||||
# 5. Verify update via direct NextcloudClient
|
||||
updated_direct_note = await nc_client.notes.get_note(note_id)
|
||||
assert updated_direct_note["title"] == updated_title
|
||||
@@ -181,6 +193,15 @@ async def test_mcp_notes_crud_workflow(
|
||||
f"MCP note append failed: {append_result.content}"
|
||||
)
|
||||
|
||||
# Verify append response includes new ETag
|
||||
appended_note_data = json.loads(append_result.content[0].text)
|
||||
append_etag = appended_note_data["etag"]
|
||||
|
||||
logger.info(f"Content appended via MCP, new ETag: {append_etag}")
|
||||
assert "etag" in appended_note_data, "Append response should include ETag"
|
||||
assert append_etag, "Append ETag should not be empty"
|
||||
assert append_etag != update_etag, "ETag should change after append"
|
||||
|
||||
# 7. Verify append via direct NextcloudClient
|
||||
appended_direct_note = await nc_client.notes.get_note(note_id)
|
||||
assert append_content in appended_direct_note["content"]
|
||||
@@ -256,6 +277,82 @@ async def test_mcp_notes_crud_workflow(
|
||||
logger.warning(f"Failed to cleanup note: {e}")
|
||||
|
||||
|
||||
async def test_mcp_notes_etag_conflict(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient
|
||||
):
|
||||
"""Test that MCP note updates fail when using stale ETags."""
|
||||
|
||||
unique_suffix = uuid.uuid4().hex[:8]
|
||||
test_title = f"ETag Test Note {unique_suffix}"
|
||||
test_content = f"This is test content for ETag testing {unique_suffix}"
|
||||
test_category = "ETTesting"
|
||||
|
||||
created_note = None
|
||||
|
||||
try:
|
||||
# 1. Create note via MCP
|
||||
logger.info(f"Creating note for ETag conflict test: {test_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"nc_notes_create_note",
|
||||
{"title": test_title, "content": test_content, "category": test_category},
|
||||
)
|
||||
|
||||
assert create_result.isError is False
|
||||
note_data = json.loads(create_result.content[0].text)
|
||||
note_id = note_data["id"]
|
||||
original_etag = note_data["etag"]
|
||||
created_note = note_data
|
||||
|
||||
# 2. Update note to change ETag
|
||||
first_update_result = await nc_mcp_client.call_tool(
|
||||
"nc_notes_update_note",
|
||||
{
|
||||
"note_id": note_id,
|
||||
"etag": original_etag,
|
||||
"title": f"First Update {test_title}",
|
||||
"content": test_content,
|
||||
"category": test_category,
|
||||
},
|
||||
)
|
||||
|
||||
assert first_update_result.isError is False
|
||||
updated_data = json.loads(first_update_result.content[0].text)
|
||||
new_etag = updated_data["etag"]
|
||||
assert new_etag != original_etag, "ETag should have changed after update"
|
||||
|
||||
# 3. Try to update with the stale (original) ETag - this should fail
|
||||
logger.info(f"Attempting update with stale ETag: {original_etag}")
|
||||
conflict_result = await nc_mcp_client.call_tool(
|
||||
"nc_notes_update_note",
|
||||
{
|
||||
"note_id": note_id,
|
||||
"etag": original_etag, # Use stale ETag
|
||||
"title": "This should fail",
|
||||
"content": "This update should be rejected",
|
||||
"category": test_category,
|
||||
},
|
||||
)
|
||||
|
||||
# 4. Verify the update was rejected with a 412 error
|
||||
# With McpError, tools now return proper error responses
|
||||
assert conflict_result.isError is True, "Update with stale ETag should fail"
|
||||
response_content = conflict_result.content[0].text
|
||||
assert "modified by someone else" in response_content, (
|
||||
f"Expected conflict error message, got: {response_content}"
|
||||
)
|
||||
|
||||
logger.info("Successfully verified ETag conflict handling via MCP")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if created_note is not None:
|
||||
try:
|
||||
await nc_client.notes.delete_note(created_note["id"])
|
||||
logger.info(f"Cleaned up test note {created_note['id']}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cleanup test note: {e}")
|
||||
|
||||
|
||||
async def test_mcp_webdav_workflow(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient
|
||||
):
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
"""Unit tests for Pydantic models and serialization."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
|
||||
from nextcloud_mcp_server.models.base import BaseResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_timestamp_format_validation():
|
||||
"""Test that timestamps in BaseResponse are RFC3339 compliant for MCP validation.
|
||||
|
||||
This test should initially fail, demonstrating the timestamp validation error
|
||||
seen in MCP inspector. MCP expects RFC3339 format with timezone information.
|
||||
"""
|
||||
# Create a response object
|
||||
response = BaseResponse()
|
||||
|
||||
# Serialize to JSON (mimics what MCP inspector sees)
|
||||
json_str = response.model_dump_json()
|
||||
data = json.loads(json_str)
|
||||
|
||||
timestamp_str = data["timestamp"]
|
||||
|
||||
# RFC3339 regex pattern (what MCP expects)
|
||||
# Format: YYYY-MM-DDTHH:MM:SS[.ffffff][Z|±HH:MM]
|
||||
rfc3339_pattern = (
|
||||
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})$"
|
||||
)
|
||||
|
||||
# This assertion should FAIL with current implementation
|
||||
assert re.match(rfc3339_pattern, timestamp_str), (
|
||||
f"Timestamp '{timestamp_str}' is not RFC3339 compliant. "
|
||||
f"MCP expects format like '2025-08-30T19:22:58.862377Z' or '2025-08-30T19:22:58.862377+00:00'"
|
||||
)
|
||||
|
||||
|
||||
def test_base_response_timestamp_is_utc():
|
||||
"""Test that BaseResponse timestamps are in UTC timezone."""
|
||||
response = BaseResponse()
|
||||
|
||||
# The timestamp should be timezone-aware and in UTC
|
||||
assert response.timestamp.tzinfo is not None, (
|
||||
"Timestamp should have timezone information"
|
||||
)
|
||||
assert response.timestamp.tzinfo == timezone.utc, (
|
||||
"Timestamp should be in UTC timezone"
|
||||
)
|
||||
|
||||
|
||||
def test_serialized_timestamp_ends_with_z_or_offset():
|
||||
"""Test that serialized timestamps have proper timezone suffix."""
|
||||
response = BaseResponse()
|
||||
json_str = response.model_dump_json()
|
||||
data = json.loads(json_str)
|
||||
|
||||
timestamp_str = data["timestamp"]
|
||||
|
||||
# Should end with 'Z' (UTC) or timezone offset like '+00:00'
|
||||
assert timestamp_str.endswith("Z") or re.search(
|
||||
r"[+-]\d{2}:\d{2}$", timestamp_str
|
||||
), (
|
||||
f"Timestamp '{timestamp_str}' should end with 'Z' or timezone offset like '+00:00'"
|
||||
)
|
||||
|
||||
|
||||
def test_current_broken_format():
|
||||
"""Test showing the current broken timestamp format that causes MCP validation errors."""
|
||||
# This demonstrates what the current code produces
|
||||
current_naive_dt = datetime.now()
|
||||
current_format = current_naive_dt.isoformat()
|
||||
|
||||
# Show that current format lacks timezone info
|
||||
assert "Z" not in current_format
|
||||
assert "+" not in current_format
|
||||
assert "-" not in current_format[-6:] # Check last 6 chars for timezone
|
||||
|
||||
logger.info(f"Current broken format: {current_format}")
|
||||
logger.info(
|
||||
"This format causes MCP validation errors because it lacks timezone information"
|
||||
)
|
||||
Reference in New Issue
Block a user