diff --git a/CLAUDE.md b/CLAUDE.md index bea9f60..1911945 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -149,6 +149,24 @@ Each Nextcloud app has a corresponding server module that: 4. **Context injection** - MCP context provides access to the authenticated client instance 5. **Modular design** - Each Nextcloud app is isolated in its own client/server pair +### MCP Response Patterns + +**CRITICAL: Never return raw `List[Dict]` from MCP tools - always wrap in Pydantic response models** + +FastMCP serialization issue: raw lists get mangled into dicts with numeric string keys. + +**Pattern:** +1. Client methods return `List[Dict]` (raw data) +2. MCP tools convert to Pydantic models and wrap in response object +3. Response models inherit from `BaseResponse`, include `results` field + metadata + +**Reference implementations:** +- `SearchNotesResponse` in `nextcloud_mcp_server/models/notes.py:80` +- `SearchFilesResponse` in `nextcloud_mcp_server/models/webdav.py:113` +- Tool examples: `nextcloud_mcp_server/server/{notes,webdav}.py` + +**Testing:** Extract `data["results"]` from MCP responses, not `data` directly. + ### Testing Structure - **Integration tests** in `tests/client/` and `tests/server/` - Test real Nextcloud API interactions diff --git a/nextcloud_mcp_server/client/webdav.py b/nextcloud_mcp_server/client/webdav.py index 6907286..b2755ce 100644 --- a/nextcloud_mcp_server/client/webdav.py +++ b/nextcloud_mcp_server/client/webdav.py @@ -570,3 +570,379 @@ class WebDAVClient(BaseNextcloudClient): f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}" ) raise e + + async def search_files( + self, + scope: str = "", + where_conditions: Optional[str] = None, + properties: Optional[List[str]] = None, + order_by: Optional[List[Tuple[str, str]]] = None, + limit: Optional[int] = None, + ) -> List[Dict[str, Any]]: + """Search for files using WebDAV SEARCH method (RFC 5323). + + Args: + scope: Directory path to search in (empty string for user root) + where_conditions: XML string for where clause conditions + properties: List of property names to retrieve (defaults to basic set) + order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")] + limit: Maximum number of results to return + + Returns: + List of file/directory dictionaries with requested properties + """ + # Default properties if not specified + if properties is None: + properties = [ + "displayname", + "getcontentlength", + "getcontenttype", + "getlastmodified", + "resourcetype", + "getetag", + ] + + # Build the SEARCH request XML + search_body = self._build_search_xml( + scope=scope, + where_conditions=where_conditions, + properties=properties, + order_by=order_by, + limit=limit, + ) + + # The SEARCH endpoint is at the dav root + search_path = "/remote.php/dav/" + + headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"} + + logger.debug(f"Searching files in scope: {scope}") + + try: + response = await self._make_request( + "SEARCH", search_path, content=search_body, headers=headers + ) + response.raise_for_status() + + # Parse the XML response + results = self._parse_search_response(response.content, scope) + + logger.debug(f"Search returned {len(results)} results") + return results + + except HTTPStatusError as e: + logger.error(f"HTTP error during search: {e}") + raise e + except Exception as e: + logger.error(f"Unexpected error during search: {e}") + raise e + + def _build_search_xml( + self, + scope: str, + where_conditions: Optional[str], + properties: List[str], + order_by: Optional[List[Tuple[str, str]]], + limit: Optional[int], + ) -> str: + """Build the XML body for a SEARCH request.""" + # Construct the scope path + username = self.username + scope_path = f"/files/{username}" + if scope: + scope_path = f"{scope_path}/{scope.lstrip('/')}" + + # Build property list + prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties]) + + # Build where clause + where_xml = where_conditions if where_conditions else "" + + # Build order by clause + orderby_xml = "" + if order_by: + order_elements = [] + for prop, direction in order_by: + prop_element = self._property_to_xml(prop) + dir_element = ( + "" + if direction.lower() == "ascending" + else "" + ) + order_elements.append(f"{prop_element}{dir_element}") + orderby_xml = "\n".join(order_elements) + else: + orderby_xml = "" + + # Build limit clause + limit_xml = ( + f"{limit}" if limit else "" + ) + + # Construct the full SEARCH XML + search_xml = f""" + + + + + {prop_xml} + + + + + {scope_path} + infinity + + + + {where_xml} + + + {orderby_xml} + + {limit_xml} + +""" + + return search_xml + + def _property_to_xml(self, prop: str) -> str: + """Convert a property name to its XML element.""" + # Handle properties with namespace prefixes + if prop.startswith("{"): + # Already a full namespace + namespace_end = prop.index("}") + namespace = prop[1:namespace_end] + local_name = prop[namespace_end + 1 :] + + # Map namespace URIs to prefixes + ns_map = { + "DAV:": "d", + "http://owncloud.org/ns": "oc", + "http://nextcloud.org/ns": "nc", + } + + prefix = ns_map.get(namespace, "d") + return f"<{prefix}:{local_name}/>" + else: + # Guess namespace based on common properties + if prop in [ + "displayname", + "getcontentlength", + "getcontenttype", + "getlastmodified", + "resourcetype", + "getetag", + "quota-available-bytes", + "quota-used-bytes", + ]: + return f"" + elif prop in [ + "fileid", + "size", + "permissions", + "favorite", + "tags", + "owner-id", + "owner-display-name", + "share-types", + "checksums", + "comments-count", + "comments-unread", + ]: + return f"" + else: + # Assume nc namespace for newer properties + return f"" + + def _parse_search_response( + self, xml_content: bytes, scope: str + ) -> List[Dict[str, Any]]: + """Parse the XML response from a SEARCH request.""" + root = ET.fromstring(xml_content) + items = [] + + # Process each response element + responses = root.findall(".//{DAV:}response") + + for response_elem in responses: + href = response_elem.find(".//{DAV:}href") + if href is None: + continue + + # Extract file/directory path from href + href_text = href.text or "" + # Remove the /remote.php/dav/files/username/ prefix to get relative path + path_parts = href_text.split("/files/") + if len(path_parts) > 1: + # Get the path after username + path_after_user = "/".join(path_parts[1].split("/")[1:]) + relative_path = path_after_user.rstrip("/") + else: + relative_path = href_text.rstrip("/").split("/")[-1] + + # Get properties + propstat = response_elem.find(".//{DAV:}propstat") + if propstat is None: + continue + + prop = propstat.find(".//{DAV:}prop") + if prop is None: + continue + + # Build item dictionary + item = {"path": relative_path, "href": href_text} + + # Extract all properties + for child in prop: + tag = child.tag + value = child.text + + # Remove namespace from tag + if "}" in tag: + tag = tag.split("}", 1)[1] + + # Handle special properties + if tag == "resourcetype": + item["is_directory"] = child.find(".//{DAV:}collection") is not None + elif tag == "getcontentlength": + item["size"] = int(value) if value else 0 + elif tag == "displayname": + item["name"] = value + elif tag == "getcontenttype": + item["content_type"] = value + elif tag == "getlastmodified": + item["last_modified"] = value + elif tag == "getetag": + item["etag"] = value.strip('"') if value else None + elif tag == "fileid": + item["file_id"] = int(value) if value else None + elif tag == "favorite": + item["is_favorite"] = value == "1" + elif tag == "permissions": + item["permissions"] = value + elif tag == "size": + # oc:size includes folder sizes + item["total_size"] = int(value) if value else 0 + else: + # Store other properties as-is + item[tag] = value + + items.append(item) + + return items + + async def find_by_name( + self, pattern: str, scope: str = "", limit: Optional[int] = None + ) -> List[Dict[str, Any]]: + """Find files by name pattern using LIKE matching. + + Args: + pattern: Name pattern to search for (supports % wildcard) + scope: Directory path to search in (empty string for user root) + limit: Maximum number of results to return + + Returns: + List of matching files/directories + + Examples: + # Find all .txt files + results = await find_by_name("%.txt") + + # Find files starting with "report" + results = await find_by_name("report%") + """ + where_conditions = f""" + + + + + {pattern} + + """ + + return await self.search_files( + scope=scope, where_conditions=where_conditions, limit=limit + ) + + async def find_by_type( + self, mime_type: str, scope: str = "", limit: Optional[int] = None + ) -> List[Dict[str, Any]]: + """Find files by MIME type. + + Args: + mime_type: MIME type to search for (supports % wildcard, e.g., "image/%") + scope: Directory path to search in (empty string for user root) + limit: Maximum number of results to return + + Returns: + List of matching files + + Examples: + # Find all images + results = await find_by_type("image/%") + + # Find all PDFs + results = await find_by_type("application/pdf") + """ + where_conditions = f""" + + + + + {mime_type} + + """ + + return await self.search_files( + scope=scope, where_conditions=where_conditions, limit=limit + ) + + async def list_favorites( + self, scope: str = "", limit: Optional[int] = None + ) -> List[Dict[str, Any]]: + """List all favorite files. + + Args: + scope: Directory path to search in (empty string for user root) + limit: Maximum number of results to return + + Returns: + List of favorite files/directories + + Examples: + # List all favorites + results = await list_favorites() + + # List favorites in a specific folder + results = await list_favorites(scope="Documents") + """ + # Use REPORT method for favorites as it's more efficient + # But we can also use SEARCH as fallback + where_conditions = """ + + + + + 1 + + """ + + # Request favorite property + properties = [ + "displayname", + "getcontentlength", + "getcontenttype", + "getlastmodified", + "resourcetype", + "getetag", + "fileid", + "favorite", + ] + + return await self.search_files( + scope=scope, + where_conditions=where_conditions, + properties=properties, + limit=limit, + ) diff --git a/nextcloud_mcp_server/models/__init__.py b/nextcloud_mcp_server/models/__init__.py index 55bf208..7af6e4a 100644 --- a/nextcloud_mcp_server/models/__init__.py +++ b/nextcloud_mcp_server/models/__init__.py @@ -65,11 +65,14 @@ from .tables import ( # WebDAV models from .webdav import ( + CopyResourceResponse, CreateDirectoryResponse, DeleteResourceResponse, DirectoryListing, FileInfo, + MoveResourceResponse, ReadFileResponse, + SearchFilesResponse, WriteFileResponse, ) @@ -133,4 +136,7 @@ __all__ = [ "WriteFileResponse", "CreateDirectoryResponse", "DeleteResourceResponse", + "MoveResourceResponse", + "CopyResourceResponse", + "SearchFilesResponse", ] diff --git a/nextcloud_mcp_server/models/webdav.py b/nextcloud_mcp_server/models/webdav.py index c85e2a8..1008429 100644 --- a/nextcloud_mcp_server/models/webdav.py +++ b/nextcloud_mcp_server/models/webdav.py @@ -22,6 +22,8 @@ class FileInfo(BaseModel): None, description="Last modification time (ISO format)" ) etag: Optional[str] = Field(None, description="ETag for versioning") + file_id: Optional[int] = Field(None, description="Nextcloud file ID") + is_favorite: Optional[bool] = Field(None, description="Whether file is favorited") @property def last_modified_datetime(self) -> Optional[datetime]: @@ -106,3 +108,14 @@ class CopyResourceResponse(StatusResponse): overwrite: bool = Field( description="Whether the destination was overwritten if it existed" ) + + +class SearchFilesResponse(BaseResponse): + """Response model for WebDAV search operations.""" + + results: List[FileInfo] = Field(description="Search results") + total_found: int = Field(description="Total number of files found") + scope: str = Field(description="The scope/path that was searched") + filters_applied: Optional[dict] = Field( + None, description="Filters that were applied to the search" + ) diff --git a/nextcloud_mcp_server/server/webdav.py b/nextcloud_mcp_server/server/webdav.py index 6241ef6..2a2fd08 100644 --- a/nextcloud_mcp_server/server/webdav.py +++ b/nextcloud_mcp_server/server/webdav.py @@ -3,6 +3,7 @@ import logging from mcp.server.fastmcp import Context, FastMCP from nextcloud_mcp_server.context import get_client +from nextcloud_mcp_server.models import FileInfo, SearchFilesResponse logger = logging.getLogger(__name__) @@ -18,13 +19,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: List of items with metadata including name, path, is_directory, size, content_type, last_modified - - Examples: - # List root directory - await nc_webdav_list_directory("") - - # List a specific folder - await nc_webdav_list_directory("Documents/Projects") """ client = get_client(ctx) return await client.webdav.list_directory(path) @@ -39,15 +33,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with path, content, content_type, size, and encoding (if binary) Text files are decoded to UTF-8, binary files are base64 encoded - - Examples: - # Read a text file - result = await nc_webdav_read_file("Documents/readme.txt") - logger.info(result['content']) # Decoded text content - - # Read a binary file - result = await nc_webdav_read_file("Images/photo.jpg") - logger.info(result['encoding']) # 'base64' """ client = get_client(ctx) content, content_type = await client.webdav.read_file(path) @@ -89,13 +74,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with status_code indicating success - - Examples: - # Write a text file - await nc_webdav_write_file("Documents/notes.md", "# My Notes\nContent here...") - - # Write binary data (base64 encoded) - await nc_webdav_write_file("files/data.bin", base64_content, "application/octet-stream;base64") """ client = get_client(ctx) @@ -119,13 +97,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with status_code (201 for created, 405 if already exists) - - Examples: - # Create a single directory - await nc_webdav_create_directory("NewProject") - - # Create nested directories (parent must exist) - await nc_webdav_create_directory("Projects/MyApp/docs") """ client = get_client(ctx) return await client.webdav.create_directory(path) @@ -139,13 +110,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with status_code indicating result (404 if not found) - - Examples: - # Delete a file - await nc_webdav_delete_resource("old_document.txt") - - # Delete a directory (will delete all contents) - await nc_webdav_delete_resource("temp_folder") """ client = get_client(ctx) return await client.webdav.delete_resource(path) @@ -163,19 +127,6 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with status_code indicating result (404 if source not found, 412 if destination exists and overwrite is False) - - Examples: - # Rename a file - await nc_webdav_move_resource("document.txt", "new_name.txt") - - # Move a file to another directory - await nc_webdav_move_resource("document.txt", "Archive/document.txt") - - # Move a directory - await nc_webdav_move_resource("Projects/OldProject", "Projects/NewProject") - - # Move and overwrite if destination exists - await nc_webdav_move_resource("document.txt", "Archive/document.txt", overwrite=True) """ client = get_client(ctx) return await client.webdav.move_resource( @@ -195,21 +146,198 @@ def configure_webdav_tools(mcp: FastMCP): Returns: Dict with status_code indicating result (404 if source not found, 412 if destination exists and overwrite is False) - - Examples: - # Copy a file - await nc_webdav_copy_resource("document.txt", "document_copy.txt") - - # Copy a file to another directory - await nc_webdav_copy_resource("document.txt", "Backup/document.txt") - - # Copy a directory - await nc_webdav_copy_resource("Projects/ProjectA", "Projects/ProjectA_Backup") - - # Copy and overwrite if destination exists - await nc_webdav_copy_resource("document.txt", "Backup/document.txt", overwrite=True) """ client = get_client(ctx) return await client.webdav.copy_resource( source_path, destination_path, overwrite ) + + @mcp.tool() + async def nc_webdav_search_files( + ctx: Context, + scope: str = "", + name_pattern: str | None = None, + mime_type: str | None = None, + only_favorites: bool = False, + limit: int | None = None, + ) -> SearchFilesResponse: + """Search for files in NextCloud using WebDAV SEARCH. + + This is a high-level search tool that supports common search patterns. + For more complex queries, use the specific search tools. + + Args: + scope: Directory path to search in (empty string for user root) + name_pattern: File name pattern (supports % wildcard, e.g., "%.txt" for all text files) + mime_type: MIME type to filter by (supports % wildcard, e.g., "image/%" for all images) + only_favorites: If True, only return favorited files + limit: Maximum number of results to return + + Returns: + SearchFilesResponse with list of matching files + """ + client = get_client(ctx) + + # Build where conditions based on filters + conditions = [] + + if name_pattern: + conditions.append( + f""" + + + + + {name_pattern} + + """ + ) + + if mime_type: + conditions.append( + f""" + + + + + {mime_type} + + """ + ) + + if only_favorites: + conditions.append( + """ + + + + + 1 + + """ + ) + + # Combine conditions with AND if multiple + if len(conditions) > 1: + where_conditions = f""" + + {"".join(conditions)} + + """ + elif len(conditions) == 1: + where_conditions = conditions[0] + else: + where_conditions = None + + # Include extended properties + properties = [ + "displayname", + "getcontentlength", + "getcontenttype", + "getlastmodified", + "resourcetype", + "getetag", + "fileid", + "favorite", + ] + + results = await client.webdav.search_files( + scope=scope, + where_conditions=where_conditions, + properties=properties, + limit=limit, + ) + + # Convert to FileInfo models + file_infos = [FileInfo(**result) for result in results] + + # Build filters applied dict + filters = {} + if name_pattern: + filters["name_pattern"] = name_pattern + if mime_type: + filters["mime_type"] = mime_type + if only_favorites: + filters["only_favorites"] = True + + return SearchFilesResponse( + results=file_infos, + total_found=len(file_infos), + scope=scope, + filters_applied=filters if filters else None, + ) + + @mcp.tool() + async def nc_webdav_find_by_name( + pattern: str, ctx: Context, scope: str = "", limit: int | None = None + ) -> SearchFilesResponse: + """Find files by name pattern in NextCloud. + + Args: + pattern: Name pattern to search for (supports % wildcard) + scope: Directory path to search in (empty string for user root) + limit: Maximum number of results to return + + Returns: + SearchFilesResponse with list of matching files + """ + client = get_client(ctx) + results = await client.webdav.find_by_name( + pattern=pattern, scope=scope, limit=limit + ) + file_infos = [FileInfo(**result) for result in results] + return SearchFilesResponse( + results=file_infos, + total_found=len(file_infos), + scope=scope, + filters_applied={"name_pattern": pattern}, + ) + + @mcp.tool() + async def nc_webdav_find_by_type( + mime_type: str, ctx: Context, scope: str = "", limit: int | None = None + ) -> SearchFilesResponse: + """Find files by MIME type in NextCloud. + + Args: + mime_type: MIME type to search for (supports % wildcard) + scope: Directory path to search in (empty string for user root) + limit: Maximum number of results to return + + Returns: + SearchFilesResponse with list of matching files + """ + client = get_client(ctx) + results = await client.webdav.find_by_type( + mime_type=mime_type, scope=scope, limit=limit + ) + file_infos = [FileInfo(**result) for result in results] + return SearchFilesResponse( + results=file_infos, + total_found=len(file_infos), + scope=scope, + filters_applied={"mime_type": mime_type}, + ) + + @mcp.tool() + async def nc_webdav_list_favorites( + ctx: Context, scope: str = "", limit: int | None = None + ) -> SearchFilesResponse: + """List all favorite files in NextCloud. + + Args: + scope: Directory path to search in (empty string for all favorites) + limit: Maximum number of results to return + + Returns: + SearchFilesResponse with list of favorite files + """ + client = get_client(ctx) + results = await client.webdav.list_favorites(scope=scope, limit=limit) + file_infos = [FileInfo(**result) for result in results] + return SearchFilesResponse( + results=file_infos, + total_found=len(file_infos), + scope=scope, + filters_applied={"only_favorites": True}, + ) diff --git a/tests/client/webdav/test_webdav_search.py b/tests/client/webdav/test_webdav_search.py new file mode 100644 index 0000000..81cd83e --- /dev/null +++ b/tests/client/webdav/test_webdav_search.py @@ -0,0 +1,268 @@ +"""Integration tests for WebDAV search operations.""" + +import logging +import uuid + +import pytest + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) + +# Mark all tests in this module as integration tests +pytestmark = pytest.mark.integration + + +@pytest.fixture +async def test_search_setup(nc_client: NextcloudClient): + """Create test files and directories for search testing.""" + test_dir = f"mcp_search_test_{uuid.uuid4().hex[:8]}" + + # Create base directory + await nc_client.webdav.create_directory(test_dir) + + # Create various test files + test_files = [ + # Text files + (f"{test_dir}/document1.txt", b"Sample document content", "text/plain"), + (f"{test_dir}/document2.txt", b"Another document", "text/plain"), + (f"{test_dir}/report.txt", b"Report content", "text/plain"), + # Markdown files + (f"{test_dir}/readme.md", b"# README\nMarkdown content", "text/markdown"), + (f"{test_dir}/notes.md", b"# Notes\nSome notes here", "text/markdown"), + # PDF (simulated as binary) + ( + f"{test_dir}/presentation.pdf", + b"%PDF-1.4 fake pdf content", + "application/pdf", + ), + # Subdirectory with files + (f"{test_dir}/subdir/nested.txt", b"Nested file content", "text/plain"), + ] + + # Create subdirectory + await nc_client.webdav.create_directory(f"{test_dir}/subdir") + + # Write all test files + for file_path, content, content_type in test_files: + await nc_client.webdav.write_file(file_path, content, content_type) + + logger.info(f"Created test directory with {len(test_files)} files: {test_dir}") + + yield test_dir + + # Cleanup + try: + await nc_client.webdav.delete_resource(test_dir) + logger.info(f"Cleaned up test directory: {test_dir}") + except Exception as e: + logger.warning(f"Failed to cleanup test directory {test_dir}: {e}") + + +async def test_find_by_name_exact(nc_client: NextcloudClient, test_search_setup: str): + """Test finding files by exact name.""" + results = await nc_client.webdav.find_by_name("readme.md", scope=test_search_setup) + + assert len(results) >= 1, "Should find at least one readme.md file" + + # Check that we found the right file + readme_files = [r for r in results if r.get("name") == "readme.md"] + assert len(readme_files) >= 1, "Should find readme.md" + + logger.info(f"Found {len(results)} files matching 'readme.md'") + + +async def test_find_by_name_wildcard_extension( + nc_client: NextcloudClient, test_search_setup: str +): + """Test finding files by extension using wildcard.""" + # Find all .txt files + results = await nc_client.webdav.find_by_name("%.txt", scope=test_search_setup) + + assert len(results) >= 3, "Should find at least 3 .txt files" + + # Verify all results are .txt files + for result in results: + name = result.get("name", "") + assert name.endswith(".txt"), f"Expected .txt file, got {name}" + + logger.info(f"Found {len(results)} .txt files") + + +async def test_find_by_name_wildcard_prefix( + nc_client: NextcloudClient, test_search_setup: str +): + """Test finding files by name prefix using wildcard.""" + # Find all files starting with "document" + results = await nc_client.webdav.find_by_name("document%", scope=test_search_setup) + + assert len(results) >= 2, "Should find at least 2 files starting with 'document'" + + # Verify all results start with "document" + for result in results: + name = result.get("name", "") + assert name.startswith("document"), ( + f"Expected name to start with 'document', got {name}" + ) + + logger.info(f"Found {len(results)} files starting with 'document'") + + +async def test_find_by_type_text(nc_client: NextcloudClient, test_search_setup: str): + """Test finding files by MIME type (text files).""" + # Find all text files + results = await nc_client.webdav.find_by_type("text/%", scope=test_search_setup) + + assert len(results) >= 5, "Should find at least 5 text files" + + # Verify all results are text files + for result in results: + content_type = result.get("content_type", "") + assert content_type.startswith("text/"), ( + f"Expected text/* type, got {content_type}" + ) + + logger.info(f"Found {len(results)} text files") + + +async def test_find_by_type_specific( + nc_client: NextcloudClient, test_search_setup: str +): + """Test finding files by specific MIME type.""" + # Find PDF files + results = await nc_client.webdav.find_by_type( + "application/pdf", scope=test_search_setup + ) + + assert len(results) >= 1, "Should find at least 1 PDF file" + + # Verify result is PDF + for result in results: + content_type = result.get("content_type", "") + assert content_type == "application/pdf", ( + f"Expected application/pdf, got {content_type}" + ) + + logger.info(f"Found {len(results)} PDF files") + + +async def test_search_with_limit(nc_client: NextcloudClient, test_search_setup: str): + """Test search with result limit.""" + # Search for .txt files with limit of 2 + results = await nc_client.webdav.find_by_name( + "%.txt", scope=test_search_setup, limit=2 + ) + + # Should return at most 2 results + assert len(results) <= 2, f"Should return at most 2 results, got {len(results)}" + assert len(results) > 0, "Should return at least 1 result" + + logger.info(f"Found {len(results)} files with limit=2") + + +async def test_search_files_combined_filters( + nc_client: NextcloudClient, test_search_setup: str +): + """Test search with multiple filters combined.""" + # This test uses the search_files method directly to test combined conditions + # Search for .txt files that match a specific pattern + where_conditions = """ + + + + + + %.txt + + + + + + document% + + + """ + + results = await nc_client.webdav.search_files( + scope=test_search_setup, where_conditions=where_conditions + ) + + # Should find document1.txt and document2.txt + assert len(results) >= 2, "Should find at least 2 files matching both conditions" + + # Verify results match both conditions + for result in results: + name = result.get("name", "") + assert name.endswith(".txt"), f"Expected .txt file, got {name}" + assert name.startswith("document"), ( + f"Expected name to start with 'document', got {name}" + ) + + logger.info(f"Found {len(results)} files matching combined filters") + + +async def test_search_empty_scope(nc_client: NextcloudClient, test_search_setup: str): + """Test search in empty scope (user root).""" + # Search entire user root for a unique filename + unique_name = "readme.md" + results = await nc_client.webdav.find_by_name(unique_name, scope="") + + # Should find at least the one we created + assert len(results) >= 1, f"Should find at least 1 file named {unique_name}" + + logger.info(f"Found {len(results)} files in root scope") + + +async def test_search_subdirectory(nc_client: NextcloudClient, test_search_setup: str): + """Test search within a subdirectory.""" + # Search in the subdir for the nested file + results = await nc_client.webdav.find_by_name( + "nested.txt", scope=f"{test_search_setup}/subdir" + ) + + assert len(results) >= 1, "Should find nested.txt in subdirectory" + + # Verify the file path + nested_file = results[0] + assert "nested.txt" in nested_file.get("name", ""), "Should find nested.txt" + + logger.info(f"Found file in subdirectory: {nested_file.get('name')}") + + +async def test_search_no_results(nc_client: NextcloudClient, test_search_setup: str): + """Test search that returns no results.""" + # Search for a non-existent pattern + results = await nc_client.webdav.find_by_name( + "nonexistent_file_xyz123.txt", scope=test_search_setup + ) + + assert len(results) == 0, "Should return empty results for non-existent file" + + logger.info("Search correctly returned no results for non-existent file") + + +async def test_search_properties_returned( + nc_client: NextcloudClient, test_search_setup: str +): + """Test that search returns expected properties.""" + results = await nc_client.webdav.find_by_name("readme.md", scope=test_search_setup) + + assert len(results) >= 1, "Should find at least one file" + + result = results[0] + + # Check for expected properties + assert "name" in result, "Should include name property" + assert "path" in result, "Should include path property" + assert "is_directory" in result, "Should include is_directory property" + assert result["is_directory"] is False, "readme.md should not be a directory" + + # Optional properties that may be present + optional_props = ["size", "content_type", "last_modified", "etag"] + logger.info(f"Result properties: {list(result.keys())}") + + # At least some optional properties should be present + present_optional = [prop for prop in optional_props if prop in result] + assert len(present_optional) > 0, f"Should have at least one of {optional_props}" + + logger.info(f"Search returned properties: {list(result.keys())}") diff --git a/tests/server/test_mcp.py b/tests/server/test_mcp.py index 5cbc1a7..90a9ecb 100644 --- a/tests/server/test_mcp.py +++ b/tests/server/test_mcp.py @@ -40,6 +40,12 @@ async def test_mcp_connectivity(nc_mcp_client: ClientSession): "nc_webdav_write_file", "nc_webdav_create_directory", "nc_webdav_delete_resource", + "nc_webdav_move_resource", + "nc_webdav_copy_resource", + "nc_webdav_search_files", + "nc_webdav_find_by_name", + "nc_webdav_find_by_type", + "nc_webdav_list_favorites", "nc_calendar_list_calendars", "nc_calendar_create_event", "nc_calendar_list_events", diff --git a/tests/server/test_webdav_search_mcp.py b/tests/server/test_webdav_search_mcp.py new file mode 100644 index 0000000..25f0900 --- /dev/null +++ b/tests/server/test_webdav_search_mcp.py @@ -0,0 +1,322 @@ +"""Integration tests for WebDAV search MCP tools.""" + +import json +import logging +import uuid + +import pytest +from mcp import ClientSession + +from nextcloud_mcp_server.client import NextcloudClient + +logger = logging.getLogger(__name__) +pytestmark = pytest.mark.integration + + +def normalize_search_response(data): + """Extract results list from SearchFilesResponse. + + The response is a SearchFilesResponse with a 'results' field containing the list of files. + """ + if isinstance(data, dict) and "results" in data: + return data["results"] + else: + # Fallback for unexpected format + return [] + + +@pytest.fixture +async def search_test_files(nc_client: NextcloudClient): + """Create test files for WebDAV search testing via MCP.""" + test_dir = f"mcp_webdav_search_{uuid.uuid4().hex[:8]}" + + # Create base directory + await nc_client.webdav.create_directory(test_dir) + + # Create various test files + test_files = [ + # Text files + (f"{test_dir}/search_test1.txt", b"Sample document", "text/plain"), + (f"{test_dir}/search_test2.txt", b"Another document", "text/plain"), + (f"{test_dir}/search_report.txt", b"Report content", "text/plain"), + # Markdown files + (f"{test_dir}/search_readme.md", b"# README", "text/markdown"), + (f"{test_dir}/search_notes.md", b"# Notes", "text/markdown"), + # Images (simulated) + (f"{test_dir}/search_image.jpg", b"\xff\xd8\xff fake jpg", "image/jpeg"), + (f"{test_dir}/search_photo.png", b"\x89PNG fake png", "image/png"), + # PDF (simulated) + (f"{test_dir}/search_presentation.pdf", b"%PDF-1.4", "application/pdf"), + ] + + # Write all test files + for file_path, content, content_type in test_files: + await nc_client.webdav.write_file(file_path, content, content_type) + + logger.info(f"Created {len(test_files)} test files in {test_dir}") + + yield test_dir + + # Cleanup + try: + await nc_client.webdav.delete_resource(test_dir) + logger.info(f"Cleaned up test directory: {test_dir}") + except Exception as e: + logger.warning(f"Failed to cleanup {test_dir}: {e}") + + +async def test_nc_webdav_find_by_name( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_find_by_name MCP tool.""" + # Find all .txt files in the test directory + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_name", + arguments={ + "pattern": "search_%.txt", + "scope": search_test_files, + }, + ) + + # Parse the result + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} files matching 'search_%.txt'") + + # Should find at least 3 .txt files + assert len(files) >= 3, f"Expected at least 3 .txt files, got {len(files)}" + + # Verify all results end with .txt + for file in files: + name = file.get("name", "") + assert name.endswith(".txt"), f"Expected .txt file, got {name}" + assert name.startswith("search_"), ( + f"Expected name to start with 'search_', got {name}" + ) + + +async def test_nc_webdav_find_by_name_with_limit( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_find_by_name with limit parameter.""" + # Find files with limit + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_name", + arguments={ + "pattern": "search_%.txt", + "scope": search_test_files, + "limit": 2, + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} files with limit=2") + + # Should return at most 2 results + assert len(files) <= 2, f"Expected at most 2 files, got {len(files)}" + assert len(files) > 0, "Expected at least 1 file" + + +async def test_nc_webdav_find_by_type_images( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_find_by_type for images.""" + # Find all images + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_type", + arguments={ + "mime_type": "image/%", + "scope": search_test_files, + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} image files") + + # Should find at least 2 image files (jpg and png) + assert len(files) >= 2, f"Expected at least 2 image files, got {len(files)}" + + # Verify all results are images + for file in files: + content_type = file.get("content_type", "") + assert content_type.startswith("image/"), ( + f"Expected image/* type, got {content_type}" + ) + + +async def test_nc_webdav_find_by_type_specific( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_find_by_type for specific MIME type.""" + # Find PDF files + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_type", + arguments={ + "mime_type": "application/pdf", + "scope": search_test_files, + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} PDF files") + + # Should find at least 1 PDF + assert len(files) >= 1, f"Expected at least 1 PDF file, got {len(files)}" + + # Verify result is PDF + for file in files: + content_type = file.get("content_type", "") + assert content_type == "application/pdf", ( + f"Expected application/pdf, got {content_type}" + ) + + +async def test_nc_webdav_search_files_basic( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_search_files with basic filters.""" + # Search for markdown files + result = await nc_mcp_client.call_tool( + "nc_webdav_search_files", + arguments={ + "scope": search_test_files, + "name_pattern": "%.md", + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} markdown files") + + # Should find at least 2 .md files + assert len(files) >= 2, f"Expected at least 2 .md files, got {len(files)}" + + # Verify all results are .md files + for file in files: + name = file.get("name", "") + assert name.endswith(".md"), f"Expected .md file, got {name}" + + +async def test_nc_webdav_search_files_combined( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_search_files with combined filters.""" + # Search for text files with specific name pattern + result = await nc_mcp_client.call_tool( + "nc_webdav_search_files", + arguments={ + "scope": search_test_files, + "name_pattern": "search_test%.txt", + "mime_type": "text/plain", + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} files matching combined filters") + + # Should find search_test1.txt and search_test2.txt + assert len(files) >= 2, f"Expected at least 2 files, got {len(files)}" + + # Verify all results match both conditions + for file in files: + name = file.get("name", "") + content_type = file.get("content_type", "") + assert name.endswith(".txt"), f"Expected .txt file, got {name}" + assert name.startswith("search_test"), ( + f"Expected 'search_test' prefix, got {name}" + ) + assert content_type == "text/plain", f"Expected text/plain, got {content_type}" + + +async def test_nc_webdav_search_files_with_limit( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test nc_webdav_search_files with result limit.""" + # Search with limit + result = await nc_mcp_client.call_tool( + "nc_webdav_search_files", + arguments={ + "scope": search_test_files, + "name_pattern": "search_%", + "limit": 3, + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + logger.info(f"Found {len(files)} files with limit=3") + + # Should return at most 3 results + assert len(files) <= 3, f"Expected at most 3 files, got {len(files)}" + assert len(files) > 0, "Expected at least 1 file" + + +async def test_nc_webdav_search_no_results( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test search that returns no results.""" + # Search for non-existent pattern + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_name", + arguments={ + "pattern": "nonexistent_xyz123.txt", + "scope": search_test_files, + }, + ) + + # Handle case where empty results might return empty content + if result.content and len(result.content) > 0: + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + else: + files = [] + + logger.info("Search correctly returned no results") + + # Should return empty array + assert len(files) == 0, f"Expected no results, got {len(files)}" + + +async def test_search_result_properties( + nc_mcp_client: ClientSession, search_test_files: str +): + """Test that search results include expected properties.""" + # Search for a specific file + result = await nc_mcp_client.call_tool( + "nc_webdav_find_by_name", + arguments={ + "pattern": "search_readme.md", + "scope": search_test_files, + }, + ) + + content = result.content[0].text + files = normalize_search_response(json.loads(content)) + + assert len(files) >= 1, "Should find at least one file" + + file = files[0] + + # Check for expected properties + assert "name" in file, "Should include name property" + assert "path" in file, "Should include path property" + assert "is_directory" in file, "Should include is_directory property" + assert file["is_directory"] is False, "File should not be a directory" + + # Check for extended properties from search + extended_props = ["file_id", "etag", "size", "content_type", "last_modified"] + present_props = [prop for prop in extended_props if prop in file] + + logger.info(f"Search result properties: {list(file.keys())}") + assert len(present_props) > 0, f"Should have at least one of {extended_props}"