feat(webdav): Add search and list favorite response tools

This commit is contained in:
Chris Coutinho
2025-10-18 21:57:36 +02:00
parent 240ceb3808
commit 6158a890af
8 changed files with 1200 additions and 63 deletions
+18
View File
@@ -149,6 +149,24 @@ Each Nextcloud app has a corresponding server module that:
4. **Context injection** - MCP context provides access to the authenticated client instance
5. **Modular design** - Each Nextcloud app is isolated in its own client/server pair
### MCP Response Patterns
**CRITICAL: Never return raw `List[Dict]` from MCP tools - always wrap in Pydantic response models**
FastMCP serialization issue: raw lists get mangled into dicts with numeric string keys.
**Pattern:**
1. Client methods return `List[Dict]` (raw data)
2. MCP tools convert to Pydantic models and wrap in response object
3. Response models inherit from `BaseResponse`, include `results` field + metadata
**Reference implementations:**
- `SearchNotesResponse` in `nextcloud_mcp_server/models/notes.py:80`
- `SearchFilesResponse` in `nextcloud_mcp_server/models/webdav.py:113`
- Tool examples: `nextcloud_mcp_server/server/{notes,webdav}.py`
**Testing:** Extract `data["results"]` from MCP responses, not `data` directly.
### Testing Structure
- **Integration tests** in `tests/client/` and `tests/server/` - Test real Nextcloud API interactions
+376
View File
@@ -570,3 +570,379 @@ class WebDAVClient(BaseNextcloudClient):
f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}"
)
raise e
async def search_files(
self,
scope: str = "",
where_conditions: Optional[str] = None,
properties: Optional[List[str]] = None,
order_by: Optional[List[Tuple[str, str]]] = None,
limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Search for files using WebDAV SEARCH method (RFC 5323).
Args:
scope: Directory path to search in (empty string for user root)
where_conditions: XML string for where clause conditions
properties: List of property names to retrieve (defaults to basic set)
order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")]
limit: Maximum number of results to return
Returns:
List of file/directory dictionaries with requested properties
"""
# Default properties if not specified
if properties is None:
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
]
# Build the SEARCH request XML
search_body = self._build_search_xml(
scope=scope,
where_conditions=where_conditions,
properties=properties,
order_by=order_by,
limit=limit,
)
# The SEARCH endpoint is at the dav root
search_path = "/remote.php/dav/"
headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"}
logger.debug(f"Searching files in scope: {scope}")
try:
response = await self._make_request(
"SEARCH", search_path, content=search_body, headers=headers
)
response.raise_for_status()
# Parse the XML response
results = self._parse_search_response(response.content, scope)
logger.debug(f"Search returned {len(results)} results")
return results
except HTTPStatusError as e:
logger.error(f"HTTP error during search: {e}")
raise e
except Exception as e:
logger.error(f"Unexpected error during search: {e}")
raise e
def _build_search_xml(
self,
scope: str,
where_conditions: Optional[str],
properties: List[str],
order_by: Optional[List[Tuple[str, str]]],
limit: Optional[int],
) -> str:
"""Build the XML body for a SEARCH request."""
# Construct the scope path
username = self.username
scope_path = f"/files/{username}"
if scope:
scope_path = f"{scope_path}/{scope.lstrip('/')}"
# Build property list
prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties])
# Build where clause
where_xml = where_conditions if where_conditions else ""
# Build order by clause
orderby_xml = ""
if order_by:
order_elements = []
for prop, direction in order_by:
prop_element = self._property_to_xml(prop)
dir_element = (
"<d:ascending/>"
if direction.lower() == "ascending"
else "<d:descending/>"
)
order_elements.append(f"<d:order>{prop_element}{dir_element}</d:order>")
orderby_xml = "\n".join(order_elements)
else:
orderby_xml = ""
# Build limit clause
limit_xml = (
f"<d:limit><d:nresults>{limit}</d:nresults></d:limit>" if limit else ""
)
# Construct the full SEARCH XML
search_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<d:searchrequest xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:basicsearch>
<d:select>
<d:prop>
{prop_xml}
</d:prop>
</d:select>
<d:from>
<d:scope>
<d:href>{scope_path}</d:href>
<d:depth>infinity</d:depth>
</d:scope>
</d:from>
<d:where>
{where_xml}
</d:where>
<d:orderby>
{orderby_xml}
</d:orderby>
{limit_xml}
</d:basicsearch>
</d:searchrequest>"""
return search_xml
def _property_to_xml(self, prop: str) -> str:
"""Convert a property name to its XML element."""
# Handle properties with namespace prefixes
if prop.startswith("{"):
# Already a full namespace
namespace_end = prop.index("}")
namespace = prop[1:namespace_end]
local_name = prop[namespace_end + 1 :]
# Map namespace URIs to prefixes
ns_map = {
"DAV:": "d",
"http://owncloud.org/ns": "oc",
"http://nextcloud.org/ns": "nc",
}
prefix = ns_map.get(namespace, "d")
return f"<{prefix}:{local_name}/>"
else:
# Guess namespace based on common properties
if prop in [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"quota-available-bytes",
"quota-used-bytes",
]:
return f"<d:{prop}/>"
elif prop in [
"fileid",
"size",
"permissions",
"favorite",
"tags",
"owner-id",
"owner-display-name",
"share-types",
"checksums",
"comments-count",
"comments-unread",
]:
return f"<oc:{prop}/>"
else:
# Assume nc namespace for newer properties
return f"<nc:{prop}/>"
def _parse_search_response(
self, xml_content: bytes, scope: str
) -> List[Dict[str, Any]]:
"""Parse the XML response from a SEARCH request."""
root = ET.fromstring(xml_content)
items = []
# Process each response element
responses = root.findall(".//{DAV:}response")
for response_elem in responses:
href = response_elem.find(".//{DAV:}href")
if href is None:
continue
# Extract file/directory path from href
href_text = href.text or ""
# Remove the /remote.php/dav/files/username/ prefix to get relative path
path_parts = href_text.split("/files/")
if len(path_parts) > 1:
# Get the path after username
path_after_user = "/".join(path_parts[1].split("/")[1:])
relative_path = path_after_user.rstrip("/")
else:
relative_path = href_text.rstrip("/").split("/")[-1]
# Get properties
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
continue
prop = propstat.find(".//{DAV:}prop")
if prop is None:
continue
# Build item dictionary
item = {"path": relative_path, "href": href_text}
# Extract all properties
for child in prop:
tag = child.tag
value = child.text
# Remove namespace from tag
if "}" in tag:
tag = tag.split("}", 1)[1]
# Handle special properties
if tag == "resourcetype":
item["is_directory"] = child.find(".//{DAV:}collection") is not None
elif tag == "getcontentlength":
item["size"] = int(value) if value else 0
elif tag == "displayname":
item["name"] = value
elif tag == "getcontenttype":
item["content_type"] = value
elif tag == "getlastmodified":
item["last_modified"] = value
elif tag == "getetag":
item["etag"] = value.strip('"') if value else None
elif tag == "fileid":
item["file_id"] = int(value) if value else None
elif tag == "favorite":
item["is_favorite"] = value == "1"
elif tag == "permissions":
item["permissions"] = value
elif tag == "size":
# oc:size includes folder sizes
item["total_size"] = int(value) if value else 0
else:
# Store other properties as-is
item[tag] = value
items.append(item)
return items
async def find_by_name(
self, pattern: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by name pattern using LIKE matching.
Args:
pattern: Name pattern to search for (supports % wildcard)
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of matching files/directories
Examples:
# Find all .txt files
results = await find_by_name("%.txt")
# Find files starting with "report"
results = await find_by_name("report%")
"""
where_conditions = f"""
<d:like>
<d:prop>
<d:displayname/>
</d:prop>
<d:literal>{pattern}</d:literal>
</d:like>
"""
return await self.search_files(
scope=scope, where_conditions=where_conditions, limit=limit
)
async def find_by_type(
self, mime_type: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by MIME type.
Args:
mime_type: MIME type to search for (supports % wildcard, e.g., "image/%")
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of matching files
Examples:
# Find all images
results = await find_by_type("image/%")
# Find all PDFs
results = await find_by_type("application/pdf")
"""
where_conditions = f"""
<d:like>
<d:prop>
<d:getcontenttype/>
</d:prop>
<d:literal>{mime_type}</d:literal>
</d:like>
"""
return await self.search_files(
scope=scope, where_conditions=where_conditions, limit=limit
)
async def list_favorites(
self, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""List all favorite files.
Args:
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of favorite files/directories
Examples:
# List all favorites
results = await list_favorites()
# List favorites in a specific folder
results = await list_favorites(scope="Documents")
"""
# Use REPORT method for favorites as it's more efficient
# But we can also use SEARCH as fallback
where_conditions = """
<d:eq>
<d:prop>
<oc:favorite/>
</d:prop>
<d:literal>1</d:literal>
</d:eq>
"""
# Request favorite property
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"fileid",
"favorite",
]
return await self.search_files(
scope=scope,
where_conditions=where_conditions,
properties=properties,
limit=limit,
)
+6
View File
@@ -65,11 +65,14 @@ from .tables import (
# WebDAV models
from .webdav import (
CopyResourceResponse,
CreateDirectoryResponse,
DeleteResourceResponse,
DirectoryListing,
FileInfo,
MoveResourceResponse,
ReadFileResponse,
SearchFilesResponse,
WriteFileResponse,
)
@@ -133,4 +136,7 @@ __all__ = [
"WriteFileResponse",
"CreateDirectoryResponse",
"DeleteResourceResponse",
"MoveResourceResponse",
"CopyResourceResponse",
"SearchFilesResponse",
]
+13
View File
@@ -22,6 +22,8 @@ class FileInfo(BaseModel):
None, description="Last modification time (ISO format)"
)
etag: Optional[str] = Field(None, description="ETag for versioning")
file_id: Optional[int] = Field(None, description="Nextcloud file ID")
is_favorite: Optional[bool] = Field(None, description="Whether file is favorited")
@property
def last_modified_datetime(self) -> Optional[datetime]:
@@ -106,3 +108,14 @@ class CopyResourceResponse(StatusResponse):
overwrite: bool = Field(
description="Whether the destination was overwritten if it existed"
)
class SearchFilesResponse(BaseResponse):
"""Response model for WebDAV search operations."""
results: List[FileInfo] = Field(description="Search results")
total_found: int = Field(description="Total number of files found")
scope: str = Field(description="The scope/path that was searched")
filters_applied: Optional[dict] = Field(
None, description="Filters that were applied to the search"
)
+191 -63
View File
@@ -3,6 +3,7 @@ import logging
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.context import get_client
from nextcloud_mcp_server.models import FileInfo, SearchFilesResponse
logger = logging.getLogger(__name__)
@@ -18,13 +19,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
List of items with metadata including name, path, is_directory, size, content_type, last_modified
Examples:
# List root directory
await nc_webdav_list_directory("")
# List a specific folder
await nc_webdav_list_directory("Documents/Projects")
"""
client = get_client(ctx)
return await client.webdav.list_directory(path)
@@ -39,15 +33,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with path, content, content_type, size, and encoding (if binary)
Text files are decoded to UTF-8, binary files are base64 encoded
Examples:
# Read a text file
result = await nc_webdav_read_file("Documents/readme.txt")
logger.info(result['content']) # Decoded text content
# Read a binary file
result = await nc_webdav_read_file("Images/photo.jpg")
logger.info(result['encoding']) # 'base64'
"""
client = get_client(ctx)
content, content_type = await client.webdav.read_file(path)
@@ -89,13 +74,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with status_code indicating success
Examples:
# Write a text file
await nc_webdav_write_file("Documents/notes.md", "# My Notes\nContent here...")
# Write binary data (base64 encoded)
await nc_webdav_write_file("files/data.bin", base64_content, "application/octet-stream;base64")
"""
client = get_client(ctx)
@@ -119,13 +97,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with status_code (201 for created, 405 if already exists)
Examples:
# Create a single directory
await nc_webdav_create_directory("NewProject")
# Create nested directories (parent must exist)
await nc_webdav_create_directory("Projects/MyApp/docs")
"""
client = get_client(ctx)
return await client.webdav.create_directory(path)
@@ -139,13 +110,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with status_code indicating result (404 if not found)
Examples:
# Delete a file
await nc_webdav_delete_resource("old_document.txt")
# Delete a directory (will delete all contents)
await nc_webdav_delete_resource("temp_folder")
"""
client = get_client(ctx)
return await client.webdav.delete_resource(path)
@@ -163,19 +127,6 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with status_code indicating result (404 if source not found, 412 if destination exists and overwrite is False)
Examples:
# Rename a file
await nc_webdav_move_resource("document.txt", "new_name.txt")
# Move a file to another directory
await nc_webdav_move_resource("document.txt", "Archive/document.txt")
# Move a directory
await nc_webdav_move_resource("Projects/OldProject", "Projects/NewProject")
# Move and overwrite if destination exists
await nc_webdav_move_resource("document.txt", "Archive/document.txt", overwrite=True)
"""
client = get_client(ctx)
return await client.webdav.move_resource(
@@ -195,21 +146,198 @@ def configure_webdav_tools(mcp: FastMCP):
Returns:
Dict with status_code indicating result (404 if source not found, 412 if destination exists and overwrite is False)
Examples:
# Copy a file
await nc_webdav_copy_resource("document.txt", "document_copy.txt")
# Copy a file to another directory
await nc_webdav_copy_resource("document.txt", "Backup/document.txt")
# Copy a directory
await nc_webdav_copy_resource("Projects/ProjectA", "Projects/ProjectA_Backup")
# Copy and overwrite if destination exists
await nc_webdav_copy_resource("document.txt", "Backup/document.txt", overwrite=True)
"""
client = get_client(ctx)
return await client.webdav.copy_resource(
source_path, destination_path, overwrite
)
@mcp.tool()
async def nc_webdav_search_files(
ctx: Context,
scope: str = "",
name_pattern: str | None = None,
mime_type: str | None = None,
only_favorites: bool = False,
limit: int | None = None,
) -> SearchFilesResponse:
"""Search for files in NextCloud using WebDAV SEARCH.
This is a high-level search tool that supports common search patterns.
For more complex queries, use the specific search tools.
Args:
scope: Directory path to search in (empty string for user root)
name_pattern: File name pattern (supports % wildcard, e.g., "%.txt" for all text files)
mime_type: MIME type to filter by (supports % wildcard, e.g., "image/%" for all images)
only_favorites: If True, only return favorited files
limit: Maximum number of results to return
Returns:
SearchFilesResponse with list of matching files
"""
client = get_client(ctx)
# Build where conditions based on filters
conditions = []
if name_pattern:
conditions.append(
f"""
<d:like>
<d:prop>
<d:displayname/>
</d:prop>
<d:literal>{name_pattern}</d:literal>
</d:like>
"""
)
if mime_type:
conditions.append(
f"""
<d:like>
<d:prop>
<d:getcontenttype/>
</d:prop>
<d:literal>{mime_type}</d:literal>
</d:like>
"""
)
if only_favorites:
conditions.append(
"""
<d:eq>
<d:prop>
<oc:favorite/>
</d:prop>
<d:literal>1</d:literal>
</d:eq>
"""
)
# Combine conditions with AND if multiple
if len(conditions) > 1:
where_conditions = f"""
<d:and>
{"".join(conditions)}
</d:and>
"""
elif len(conditions) == 1:
where_conditions = conditions[0]
else:
where_conditions = None
# Include extended properties
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"fileid",
"favorite",
]
results = await client.webdav.search_files(
scope=scope,
where_conditions=where_conditions,
properties=properties,
limit=limit,
)
# Convert to FileInfo models
file_infos = [FileInfo(**result) for result in results]
# Build filters applied dict
filters = {}
if name_pattern:
filters["name_pattern"] = name_pattern
if mime_type:
filters["mime_type"] = mime_type
if only_favorites:
filters["only_favorites"] = True
return SearchFilesResponse(
results=file_infos,
total_found=len(file_infos),
scope=scope,
filters_applied=filters if filters else None,
)
@mcp.tool()
async def nc_webdav_find_by_name(
pattern: str, ctx: Context, scope: str = "", limit: int | None = None
) -> SearchFilesResponse:
"""Find files by name pattern in NextCloud.
Args:
pattern: Name pattern to search for (supports % wildcard)
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
SearchFilesResponse with list of matching files
"""
client = get_client(ctx)
results = await client.webdav.find_by_name(
pattern=pattern, scope=scope, limit=limit
)
file_infos = [FileInfo(**result) for result in results]
return SearchFilesResponse(
results=file_infos,
total_found=len(file_infos),
scope=scope,
filters_applied={"name_pattern": pattern},
)
@mcp.tool()
async def nc_webdav_find_by_type(
mime_type: str, ctx: Context, scope: str = "", limit: int | None = None
) -> SearchFilesResponse:
"""Find files by MIME type in NextCloud.
Args:
mime_type: MIME type to search for (supports % wildcard)
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
SearchFilesResponse with list of matching files
"""
client = get_client(ctx)
results = await client.webdav.find_by_type(
mime_type=mime_type, scope=scope, limit=limit
)
file_infos = [FileInfo(**result) for result in results]
return SearchFilesResponse(
results=file_infos,
total_found=len(file_infos),
scope=scope,
filters_applied={"mime_type": mime_type},
)
@mcp.tool()
async def nc_webdav_list_favorites(
ctx: Context, scope: str = "", limit: int | None = None
) -> SearchFilesResponse:
"""List all favorite files in NextCloud.
Args:
scope: Directory path to search in (empty string for all favorites)
limit: Maximum number of results to return
Returns:
SearchFilesResponse with list of favorite files
"""
client = get_client(ctx)
results = await client.webdav.list_favorites(scope=scope, limit=limit)
file_infos = [FileInfo(**result) for result in results]
return SearchFilesResponse(
results=file_infos,
total_found=len(file_infos),
scope=scope,
filters_applied={"only_favorites": True},
)
+268
View File
@@ -0,0 +1,268 @@
"""Integration tests for WebDAV search operations."""
import logging
import uuid
import pytest
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
# Mark all tests in this module as integration tests
pytestmark = pytest.mark.integration
@pytest.fixture
async def test_search_setup(nc_client: NextcloudClient):
"""Create test files and directories for search testing."""
test_dir = f"mcp_search_test_{uuid.uuid4().hex[:8]}"
# Create base directory
await nc_client.webdav.create_directory(test_dir)
# Create various test files
test_files = [
# Text files
(f"{test_dir}/document1.txt", b"Sample document content", "text/plain"),
(f"{test_dir}/document2.txt", b"Another document", "text/plain"),
(f"{test_dir}/report.txt", b"Report content", "text/plain"),
# Markdown files
(f"{test_dir}/readme.md", b"# README\nMarkdown content", "text/markdown"),
(f"{test_dir}/notes.md", b"# Notes\nSome notes here", "text/markdown"),
# PDF (simulated as binary)
(
f"{test_dir}/presentation.pdf",
b"%PDF-1.4 fake pdf content",
"application/pdf",
),
# Subdirectory with files
(f"{test_dir}/subdir/nested.txt", b"Nested file content", "text/plain"),
]
# Create subdirectory
await nc_client.webdav.create_directory(f"{test_dir}/subdir")
# Write all test files
for file_path, content, content_type in test_files:
await nc_client.webdav.write_file(file_path, content, content_type)
logger.info(f"Created test directory with {len(test_files)} files: {test_dir}")
yield test_dir
# Cleanup
try:
await nc_client.webdav.delete_resource(test_dir)
logger.info(f"Cleaned up test directory: {test_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup test directory {test_dir}: {e}")
async def test_find_by_name_exact(nc_client: NextcloudClient, test_search_setup: str):
"""Test finding files by exact name."""
results = await nc_client.webdav.find_by_name("readme.md", scope=test_search_setup)
assert len(results) >= 1, "Should find at least one readme.md file"
# Check that we found the right file
readme_files = [r for r in results if r.get("name") == "readme.md"]
assert len(readme_files) >= 1, "Should find readme.md"
logger.info(f"Found {len(results)} files matching 'readme.md'")
async def test_find_by_name_wildcard_extension(
nc_client: NextcloudClient, test_search_setup: str
):
"""Test finding files by extension using wildcard."""
# Find all .txt files
results = await nc_client.webdav.find_by_name("%.txt", scope=test_search_setup)
assert len(results) >= 3, "Should find at least 3 .txt files"
# Verify all results are .txt files
for result in results:
name = result.get("name", "")
assert name.endswith(".txt"), f"Expected .txt file, got {name}"
logger.info(f"Found {len(results)} .txt files")
async def test_find_by_name_wildcard_prefix(
nc_client: NextcloudClient, test_search_setup: str
):
"""Test finding files by name prefix using wildcard."""
# Find all files starting with "document"
results = await nc_client.webdav.find_by_name("document%", scope=test_search_setup)
assert len(results) >= 2, "Should find at least 2 files starting with 'document'"
# Verify all results start with "document"
for result in results:
name = result.get("name", "")
assert name.startswith("document"), (
f"Expected name to start with 'document', got {name}"
)
logger.info(f"Found {len(results)} files starting with 'document'")
async def test_find_by_type_text(nc_client: NextcloudClient, test_search_setup: str):
"""Test finding files by MIME type (text files)."""
# Find all text files
results = await nc_client.webdav.find_by_type("text/%", scope=test_search_setup)
assert len(results) >= 5, "Should find at least 5 text files"
# Verify all results are text files
for result in results:
content_type = result.get("content_type", "")
assert content_type.startswith("text/"), (
f"Expected text/* type, got {content_type}"
)
logger.info(f"Found {len(results)} text files")
async def test_find_by_type_specific(
nc_client: NextcloudClient, test_search_setup: str
):
"""Test finding files by specific MIME type."""
# Find PDF files
results = await nc_client.webdav.find_by_type(
"application/pdf", scope=test_search_setup
)
assert len(results) >= 1, "Should find at least 1 PDF file"
# Verify result is PDF
for result in results:
content_type = result.get("content_type", "")
assert content_type == "application/pdf", (
f"Expected application/pdf, got {content_type}"
)
logger.info(f"Found {len(results)} PDF files")
async def test_search_with_limit(nc_client: NextcloudClient, test_search_setup: str):
"""Test search with result limit."""
# Search for .txt files with limit of 2
results = await nc_client.webdav.find_by_name(
"%.txt", scope=test_search_setup, limit=2
)
# Should return at most 2 results
assert len(results) <= 2, f"Should return at most 2 results, got {len(results)}"
assert len(results) > 0, "Should return at least 1 result"
logger.info(f"Found {len(results)} files with limit=2")
async def test_search_files_combined_filters(
nc_client: NextcloudClient, test_search_setup: str
):
"""Test search with multiple filters combined."""
# This test uses the search_files method directly to test combined conditions
# Search for .txt files that match a specific pattern
where_conditions = """
<d:and>
<d:like>
<d:prop>
<d:displayname/>
</d:prop>
<d:literal>%.txt</d:literal>
</d:like>
<d:like>
<d:prop>
<d:displayname/>
</d:prop>
<d:literal>document%</d:literal>
</d:like>
</d:and>
"""
results = await nc_client.webdav.search_files(
scope=test_search_setup, where_conditions=where_conditions
)
# Should find document1.txt and document2.txt
assert len(results) >= 2, "Should find at least 2 files matching both conditions"
# Verify results match both conditions
for result in results:
name = result.get("name", "")
assert name.endswith(".txt"), f"Expected .txt file, got {name}"
assert name.startswith("document"), (
f"Expected name to start with 'document', got {name}"
)
logger.info(f"Found {len(results)} files matching combined filters")
async def test_search_empty_scope(nc_client: NextcloudClient, test_search_setup: str):
"""Test search in empty scope (user root)."""
# Search entire user root for a unique filename
unique_name = "readme.md"
results = await nc_client.webdav.find_by_name(unique_name, scope="")
# Should find at least the one we created
assert len(results) >= 1, f"Should find at least 1 file named {unique_name}"
logger.info(f"Found {len(results)} files in root scope")
async def test_search_subdirectory(nc_client: NextcloudClient, test_search_setup: str):
"""Test search within a subdirectory."""
# Search in the subdir for the nested file
results = await nc_client.webdav.find_by_name(
"nested.txt", scope=f"{test_search_setup}/subdir"
)
assert len(results) >= 1, "Should find nested.txt in subdirectory"
# Verify the file path
nested_file = results[0]
assert "nested.txt" in nested_file.get("name", ""), "Should find nested.txt"
logger.info(f"Found file in subdirectory: {nested_file.get('name')}")
async def test_search_no_results(nc_client: NextcloudClient, test_search_setup: str):
"""Test search that returns no results."""
# Search for a non-existent pattern
results = await nc_client.webdav.find_by_name(
"nonexistent_file_xyz123.txt", scope=test_search_setup
)
assert len(results) == 0, "Should return empty results for non-existent file"
logger.info("Search correctly returned no results for non-existent file")
async def test_search_properties_returned(
nc_client: NextcloudClient, test_search_setup: str
):
"""Test that search returns expected properties."""
results = await nc_client.webdav.find_by_name("readme.md", scope=test_search_setup)
assert len(results) >= 1, "Should find at least one file"
result = results[0]
# Check for expected properties
assert "name" in result, "Should include name property"
assert "path" in result, "Should include path property"
assert "is_directory" in result, "Should include is_directory property"
assert result["is_directory"] is False, "readme.md should not be a directory"
# Optional properties that may be present
optional_props = ["size", "content_type", "last_modified", "etag"]
logger.info(f"Result properties: {list(result.keys())}")
# At least some optional properties should be present
present_optional = [prop for prop in optional_props if prop in result]
assert len(present_optional) > 0, f"Should have at least one of {optional_props}"
logger.info(f"Search returned properties: {list(result.keys())}")
+6
View File
@@ -40,6 +40,12 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession):
"nc_webdav_write_file",
"nc_webdav_create_directory",
"nc_webdav_delete_resource",
"nc_webdav_move_resource",
"nc_webdav_copy_resource",
"nc_webdav_search_files",
"nc_webdav_find_by_name",
"nc_webdav_find_by_type",
"nc_webdav_list_favorites",
"nc_calendar_list_calendars",
"nc_calendar_create_event",
"nc_calendar_list_events",
+322
View File
@@ -0,0 +1,322 @@
"""Integration tests for WebDAV search MCP tools."""
import json
import logging
import uuid
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.integration
def normalize_search_response(data):
"""Extract results list from SearchFilesResponse.
The response is a SearchFilesResponse with a 'results' field containing the list of files.
"""
if isinstance(data, dict) and "results" in data:
return data["results"]
else:
# Fallback for unexpected format
return []
@pytest.fixture
async def search_test_files(nc_client: NextcloudClient):
"""Create test files for WebDAV search testing via MCP."""
test_dir = f"mcp_webdav_search_{uuid.uuid4().hex[:8]}"
# Create base directory
await nc_client.webdav.create_directory(test_dir)
# Create various test files
test_files = [
# Text files
(f"{test_dir}/search_test1.txt", b"Sample document", "text/plain"),
(f"{test_dir}/search_test2.txt", b"Another document", "text/plain"),
(f"{test_dir}/search_report.txt", b"Report content", "text/plain"),
# Markdown files
(f"{test_dir}/search_readme.md", b"# README", "text/markdown"),
(f"{test_dir}/search_notes.md", b"# Notes", "text/markdown"),
# Images (simulated)
(f"{test_dir}/search_image.jpg", b"\xff\xd8\xff fake jpg", "image/jpeg"),
(f"{test_dir}/search_photo.png", b"\x89PNG fake png", "image/png"),
# PDF (simulated)
(f"{test_dir}/search_presentation.pdf", b"%PDF-1.4", "application/pdf"),
]
# Write all test files
for file_path, content, content_type in test_files:
await nc_client.webdav.write_file(file_path, content, content_type)
logger.info(f"Created {len(test_files)} test files in {test_dir}")
yield test_dir
# Cleanup
try:
await nc_client.webdav.delete_resource(test_dir)
logger.info(f"Cleaned up test directory: {test_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup {test_dir}: {e}")
async def test_nc_webdav_find_by_name(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_find_by_name MCP tool."""
# Find all .txt files in the test directory
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_name",
arguments={
"pattern": "search_%.txt",
"scope": search_test_files,
},
)
# Parse the result
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} files matching 'search_%.txt'")
# Should find at least 3 .txt files
assert len(files) >= 3, f"Expected at least 3 .txt files, got {len(files)}"
# Verify all results end with .txt
for file in files:
name = file.get("name", "")
assert name.endswith(".txt"), f"Expected .txt file, got {name}"
assert name.startswith("search_"), (
f"Expected name to start with 'search_', got {name}"
)
async def test_nc_webdav_find_by_name_with_limit(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_find_by_name with limit parameter."""
# Find files with limit
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_name",
arguments={
"pattern": "search_%.txt",
"scope": search_test_files,
"limit": 2,
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} files with limit=2")
# Should return at most 2 results
assert len(files) <= 2, f"Expected at most 2 files, got {len(files)}"
assert len(files) > 0, "Expected at least 1 file"
async def test_nc_webdav_find_by_type_images(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_find_by_type for images."""
# Find all images
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_type",
arguments={
"mime_type": "image/%",
"scope": search_test_files,
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} image files")
# Should find at least 2 image files (jpg and png)
assert len(files) >= 2, f"Expected at least 2 image files, got {len(files)}"
# Verify all results are images
for file in files:
content_type = file.get("content_type", "")
assert content_type.startswith("image/"), (
f"Expected image/* type, got {content_type}"
)
async def test_nc_webdav_find_by_type_specific(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_find_by_type for specific MIME type."""
# Find PDF files
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_type",
arguments={
"mime_type": "application/pdf",
"scope": search_test_files,
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} PDF files")
# Should find at least 1 PDF
assert len(files) >= 1, f"Expected at least 1 PDF file, got {len(files)}"
# Verify result is PDF
for file in files:
content_type = file.get("content_type", "")
assert content_type == "application/pdf", (
f"Expected application/pdf, got {content_type}"
)
async def test_nc_webdav_search_files_basic(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_search_files with basic filters."""
# Search for markdown files
result = await nc_mcp_client.call_tool(
"nc_webdav_search_files",
arguments={
"scope": search_test_files,
"name_pattern": "%.md",
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} markdown files")
# Should find at least 2 .md files
assert len(files) >= 2, f"Expected at least 2 .md files, got {len(files)}"
# Verify all results are .md files
for file in files:
name = file.get("name", "")
assert name.endswith(".md"), f"Expected .md file, got {name}"
async def test_nc_webdav_search_files_combined(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_search_files with combined filters."""
# Search for text files with specific name pattern
result = await nc_mcp_client.call_tool(
"nc_webdav_search_files",
arguments={
"scope": search_test_files,
"name_pattern": "search_test%.txt",
"mime_type": "text/plain",
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} files matching combined filters")
# Should find search_test1.txt and search_test2.txt
assert len(files) >= 2, f"Expected at least 2 files, got {len(files)}"
# Verify all results match both conditions
for file in files:
name = file.get("name", "")
content_type = file.get("content_type", "")
assert name.endswith(".txt"), f"Expected .txt file, got {name}"
assert name.startswith("search_test"), (
f"Expected 'search_test' prefix, got {name}"
)
assert content_type == "text/plain", f"Expected text/plain, got {content_type}"
async def test_nc_webdav_search_files_with_limit(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test nc_webdav_search_files with result limit."""
# Search with limit
result = await nc_mcp_client.call_tool(
"nc_webdav_search_files",
arguments={
"scope": search_test_files,
"name_pattern": "search_%",
"limit": 3,
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
logger.info(f"Found {len(files)} files with limit=3")
# Should return at most 3 results
assert len(files) <= 3, f"Expected at most 3 files, got {len(files)}"
assert len(files) > 0, "Expected at least 1 file"
async def test_nc_webdav_search_no_results(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test search that returns no results."""
# Search for non-existent pattern
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_name",
arguments={
"pattern": "nonexistent_xyz123.txt",
"scope": search_test_files,
},
)
# Handle case where empty results might return empty content
if result.content and len(result.content) > 0:
content = result.content[0].text
files = normalize_search_response(json.loads(content))
else:
files = []
logger.info("Search correctly returned no results")
# Should return empty array
assert len(files) == 0, f"Expected no results, got {len(files)}"
async def test_search_result_properties(
nc_mcp_client: ClientSession, search_test_files: str
):
"""Test that search results include expected properties."""
# Search for a specific file
result = await nc_mcp_client.call_tool(
"nc_webdav_find_by_name",
arguments={
"pattern": "search_readme.md",
"scope": search_test_files,
},
)
content = result.content[0].text
files = normalize_search_response(json.loads(content))
assert len(files) >= 1, "Should find at least one file"
file = files[0]
# Check for expected properties
assert "name" in file, "Should include name property"
assert "path" in file, "Should include path property"
assert "is_directory" in file, "Should include is_directory property"
assert file["is_directory"] is False, "File should not be a directory"
# Check for extended properties from search
extended_props = ["file_id", "etag", "size", "content_type", "last_modified"]
present_props = [prop for prop in extended_props if prop in file]
logger.info(f"Search result properties: {list(file.keys())}")
assert len(present_props) > 0, f"Should have at least one of {extended_props}"