diff --git a/app-hooks/post-installation/install-tables-app.sh b/app-hooks/post-installation/install-tables-app.sh new file mode 100755 index 0000000..53c8583 --- /dev/null +++ b/app-hooks/post-installation/install-tables-app.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +php /var/www/html/occ app:enable tables diff --git a/nextcloud_mcp_server/base_client.py b/nextcloud_mcp_server/base_client.py new file mode 100644 index 0000000..8b769b4 --- /dev/null +++ b/nextcloud_mcp_server/base_client.py @@ -0,0 +1,41 @@ +"""Base client for Nextcloud operations with shared authentication.""" + +from abc import ABC +from httpx import AsyncClient +import logging + +logger = logging.getLogger(__name__) + + +class BaseNextcloudClient(ABC): + """Base class for all Nextcloud app clients.""" + + def __init__(self, http_client: AsyncClient, username: str): + """Initialize with shared HTTP client and username. + + Args: + http_client: Authenticated AsyncClient instance + username: Nextcloud username for WebDAV operations + """ + self._client = http_client + self.username = username + + def _get_webdav_base_path(self) -> str: + """Helper to get the base WebDAV path for the authenticated user.""" + return f"/remote.php/dav/files/{self.username}" + + async def _make_request(self, method: str, url: str, **kwargs): + """Common request wrapper with logging and error handling. + + Args: + method: HTTP method + url: Request URL + **kwargs: Additional request parameters + + Returns: + Response object + """ + logger.debug(f"Making {method} request to {url}") + response = await self._client.request(method, url, **kwargs) + response.raise_for_status() + return response diff --git a/nextcloud_mcp_server/client.py b/nextcloud_mcp_server/client.py index e4bab4c..4257c0d 100644 --- a/nextcloud_mcp_server/client.py +++ b/nextcloud_mcp_server/client.py @@ -1,15 +1,16 @@ import os -import mimetypes from httpx import ( AsyncClient, Auth, BasicAuth, Request, Response, - HTTPStatusError, ) import logging +from .notes_client import NotesClient +from .webdav_client import WebDAVClient +from .controllers.notes_search import NotesSearchController logger = logging.getLogger(__name__) @@ -30,13 +31,22 @@ def log_response(response: Response): class NextcloudClient: + """Main Nextcloud client that orchestrates all app clients.""" + def __init__(self, base_url: str, username: str, auth: Auth | None = None): - self.username = username # Store username + self.username = username self._client = AsyncClient( base_url=base_url, auth=auth, # event_hooks={"request": [log_request], "response": [log_response]}, ) + + # Initialize app clients + self.notes = NotesClient(self._client, username) + self.webdav = WebDAVClient(self._client, username) + + # Initialize controllers + self._notes_search = NotesSearchController() @classmethod def from_env(cls): @@ -57,20 +67,18 @@ class NextcloudClient: return response.json() + # Convenience methods that delegate to subclients async def notes_get_settings(self): - response = await self._client.get("/apps/notes/api/v1/settings") - response.raise_for_status() - return response.json() + """Get Notes app settings.""" + return await self.notes.get_settings() async def notes_get_all(self): - response = await self._client.get("/apps/notes/api/v1/notes") - response.raise_for_status() - return response.json() + """Get all notes.""" + return await self.notes.get_all_notes() async def notes_get_note(self, *, note_id: int): - response = await self._client.get(f"/apps/notes/api/v1/notes/{note_id}") - response.raise_for_status() - return response.json() + """Get a specific note.""" + return await self.notes.get_note(note_id) async def notes_create_note( self, @@ -79,20 +87,8 @@ class NextcloudClient: content: str | None = None, category: str | None = None, ): - body = {} - if title: - body.update({"title": title}) - if content: - body.update({"content": content}) - if category: - body.update({"category": category}) - - response = await self._client.post( - url="/apps/notes/api/v1/notes", - json=body, - ) - response.raise_for_status() - return response.json() + """Create a new note.""" + return await self.notes.create_note(title=title, content=content, category=category) async def notes_update_note( self, @@ -103,372 +99,24 @@ class NextcloudClient: content: str | None = None, category: str | None = None, ): - # First, get the current note details to check for category change - old_note = None - try: - if category is not None: # Only fetch if category might change - old_note = await self.notes_get_note(note_id=note_id) - old_category = old_note.get("category", "") - logger.info(f"Current category for note {note_id}: '{old_category}'") - except Exception as e: - logger.warning( - f"Could not fetch current note {note_id} details before update: {e}" - ) - # Continue with update even if we couldn't fetch current details - old_note = None - - # Prepare update body - body = {} - if title: - body.update({"title": title}) - if content: - body.update({"content": content}) - if category: - body.update({"category": category}) - - logger.info( - "Attempting to update note %s with etag %s. Body: %s", - note_id, - etag, - body, + """Update a note.""" + return await self.notes.update( + note_id=note_id, etag=etag, title=title, content=content, category=category ) - # Ensure conditional PUT using If-Match header is active - response = await self._client.put( - url=f"/apps/notes/api/v1/notes/{note_id}", - json=body, - headers={"If-Match": f'"{etag}"'}, - ) - logger.info( - "Update response for note %s: Status %s, Headers %s", - note_id, - response.status_code, - response.headers, - ) - response.raise_for_status() - updated_note = response.json() - - # Check for category change and clean up old attachment directory if needed - if ( - old_note - and category is not None - and old_note.get("category", "") != category - ): - logger.info( - f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory" - ) - try: - await self._cleanup_old_attachment_directory( - note_id=note_id, old_category=old_note.get("category", "") - ) - except Exception as e: - logger.error( - f"Error cleaning up old attachment directory for note {note_id}: {e}" - ) - # Continue with update even if cleanup failed - - return updated_note async def notes_append_content(self, *, note_id: int, content: str): - """Append content to an existing note. - - The content will be separated by a newline and a delimiter `---`, so - one will not be required in the content provided to this tool - """ - logger.info(f"Appending content to note {note_id}") - - # Get current note - current_note = await self.notes_get_note(note_id=note_id) - - # Use fixed separator for consistency - separator = "\n---\n" - - # Combine content - existing_content = current_note.get("content", "") - if existing_content: - new_content = existing_content + separator + content - else: - new_content = content # No separator needed for empty notes - - logger.info( - f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)" - ) - - # Update with combined content - return await self.notes_update_note( - note_id=note_id, - etag=current_note["etag"], - content=new_content, - title=None, # Keep existing title - category=None, # Keep existing category - ) + """Append content to an existing note with a separator.""" + return await self.notes.append_content(note_id=note_id, content=content) async def notes_search_notes(self, *, query: str): - """ - Search notes using token-based matching with relevance ranking. - Returns notes sorted by relevance score. - """ - all_notes = await self.notes_get_all() - search_results = [] + """Search notes using token-based matching with relevance ranking.""" + all_notes = await self.notes.get_all_notes() + return self._notes_search.search_notes(all_notes, query) - # Process the query - query_tokens = self.process_query(query) - - # If empty query after processing, return empty results - if not query_tokens: - return [] - - # Process and score each note - for note in all_notes: - title_tokens, content_tokens = self.process_note_content(note) - score = self.calculate_score(query_tokens, title_tokens, content_tokens) - - # Only include notes with a non-zero score - if score >= 0.5: - search_results.append( - { - "id": note.get("id"), - "title": note.get("title"), - "category": note.get("category"), - "modified": note.get("modified"), - "_score": score, # Include score for sorting (optional field) - } - ) - - # Sort by score in descending order - search_results.sort(key=lambda x: x["_score"], reverse=True) - - # Keep score field for debugging - # for result in search_results: - # if "_score" in result: - # del result["_score"] - - return search_results - - def process_query(self, query: str) -> list[str]: - """ - Tokenize and normalize the search query. - """ - # Convert to lowercase and split into tokens - tokens = query.lower().split() - # Filter out very short tokens (optional) - tokens = [token for token in tokens if len(token) > 1] - # Could add stop word removal here - return tokens - - def process_note_content(self, note: dict) -> tuple[list[str], list[str]]: - """ - Tokenize and normalize note title and content. - """ - # Process title - title = note.get("title", "").lower() - title_tokens = title.split() - - # Process content - content = note.get("content", "").lower() - content_tokens = content.split() - - return title_tokens, content_tokens - - def calculate_score( - self, - query_tokens: list[str], - title_tokens: list[str], - content_tokens: list[str], - ) -> float: - """ - Calculate a relevance score for a note based on query tokens. - """ - # Constants for weighting - TITLE_WEIGHT = 3.0 - CONTENT_WEIGHT = 1.0 - - score = 0.0 - - # Count matches in title - title_matches = sum(1 for qt in query_tokens if qt in title_tokens) - if query_tokens: # Avoid division by zero - title_match_ratio = title_matches / len(query_tokens) - score += TITLE_WEIGHT * title_match_ratio - - # Count matches in content - content_matches = sum(1 for qt in query_tokens if qt in content_tokens) - if query_tokens: # Avoid division by zero - content_match_ratio = content_matches / len(query_tokens) - score += CONTENT_WEIGHT * content_match_ratio - - # If no tokens matched at all, return zero - if title_matches == 0 and content_matches == 0: - return 0.0 - - return score - - async def _cleanup_old_attachment_directory( - self, *, note_id: int, old_category: str - ): - """ - Clean up the attachment directory for a note in its old category location. - Called after a category change to prevent orphaned directories. - """ - # Construct path to old attachment directory - old_category_path_part = f"{old_category}/" if old_category else "" - old_attachment_dir_path = ( - f"Notes/{old_category_path_part}.attachments.{note_id}/" - ) - - logger.info(f"Cleaning up old attachment directory: {old_attachment_dir_path}") - try: - delete_result = await self.delete_webdav_resource( - path=old_attachment_dir_path - ) - logger.info(f"Cleanup of old attachment directory result: {delete_result}") - return delete_result - except Exception as e: - logger.error(f"Error during cleanup of old attachment directory: {e}") - raise e - - async def delete_webdav_resource(self, *, path: str): - """Delete a resource (file or directory) via WebDAV DELETE.""" - # Ensure path ends with a slash if it's a directory - if not path.endswith("/"): - # This is a heuristic; a more robust solution would check resource type first - # but for the specific case of deleting the attachment directory, this is acceptable. - path_with_slash = f"{path}/" - else: - path_with_slash = path - - webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}" - logger.info("Deleting WebDAV resource: %s", webdav_path) - - headers = {"OCS-APIRequest": "true"} - try: - # First try a PROPFIND to verify resource exists - propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} - try: - propfind_resp = await self._client.request( - "PROPFIND", webdav_path, headers=propfind_headers - ) - logger.info( - f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}" - ) - # If we get here with 2xx, the resource exists - except HTTPStatusError as e: - if e.response.status_code == 404: - logger.info( - f"Resource '{webdav_path}' doesn't exist, no deletion needed." - ) - return {"status_code": 404} - # For other errors, continue with deletion attempt - - # Proceed with deletion - response = await self._client.delete(webdav_path, headers=headers) - response.raise_for_status() # Raises for 4xx/5xx status codes - logger.info( - "Successfully deleted WebDAV resource '%s' (Status: %s)", - webdav_path, - response.status_code, - ) - # DELETE typically returns 204 No Content on success - return {"status_code": response.status_code} - - except HTTPStatusError as e: - logger.warning( - "HTTP error deleting WebDAV resource '%s': %s", - webdav_path, - e, - ) - # It's expected to get a 404 if the resource doesn't exist, which is fine. - # We only re-raise if it's not a 404. - if e.response.status_code != 404: - raise e - else: - logger.info("Resource '%s' not found, no deletion needed.", webdav_path) - return {"status_code": 404} # Indicate resource was not found - except Exception as e: - logger.warning( - "Unexpected error deleting WebDAV resource '%s': %s", - webdav_path, - e, - ) - raise e async def notes_delete_note(self, *, note_id: int): - """Deletes a note via API and attempts to delete its attachment directory via WebDAV.""" - # Fetch note details first to get the category for path construction - try: - note_details = await self.notes_get_note(note_id=note_id) - category = note_details.get("category", "") - - # Check for other potential categories (if any note was moved between categories) - # We can't reliably detect this without a dedicated tracking mechanism, but we can - # implement a basic check for common category names and empty category - potential_categories = [] - if category: - potential_categories.append(category) # Current category first - - # Add empty category (uncategorized notes) - if category != "": - potential_categories.append("") - - # We could add logic here to check for other common categories if needed - - logger.info( - f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}" - ) - except HTTPStatusError as e: - # If note doesn't exist (404), we can't delete attachments anyway. - # Re-raise other errors. - if e.response.status_code == 404: - logger.warning( - f"Note {note_id} not found when attempting delete. Skipping attachment cleanup." - ) - # Still raise the 404 as the primary delete operation failed - raise e - else: - logger.error( - f"Error fetching note {note_id} details before deleting attachments: {e}" - ) - raise e # Re-raise unexpected errors during fetch - - # Proceed with API note deletion - logger.info(f"Deleting note {note_id} via API.") - response = await self._client.delete(f"/apps/notes/api/v1/notes/{note_id}") - response.raise_for_status() # Raise if API deletion fails - logger.info(f"Note {note_id} deleted successfully via API.") - json_response = response.json() # Usually empty on success - - # Now, attempt to delete the associated attachments directory via WebDAV for each potential category - for cat in potential_categories: - cat_path_part = f"{cat}/" if cat else "" - attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/" - - logger.info( - f"Attempting to delete attachment directory for note {note_id} in category '{cat}' via WebDAV: {attachment_dir_path}" - ) - try: - # delete_webdav_resource expects path relative to user's files dir - delete_result = await self.delete_webdav_resource( - path=attachment_dir_path - ) - logger.info( - f"WebDAV deletion for category '{cat}' attachment directory: {delete_result}" - ) - except Exception as e: - # Log the error but don't re-raise, as API note deletion itself was successful - # Also, we want to try other potential categories even if one fails - logger.warning( - f"Failed during WebDAV deletion for category '{cat}' attachment directory: {e}" - ) - - return json_response - - # Removed incorrect get_note_attachment method that used Notes API - - def _get_webdav_base_path(self) -> str: - """Helper to get the base WebDAV path for the authenticated user.""" - # Use the stored username - return f"/remote.php/dav/files/{self.username}" - - # Removed _get_note_attachment_webdav_path helper + """Delete a note and its attachments.""" + return await self.notes.delete_note(note_id) async def add_note_attachment( self, @@ -479,196 +127,27 @@ class NextcloudClient: category: str | None = None, mime_type: str | None = None, ): - """ - Add/Update an attachment to a note via WebDAV PUT. - Requires the caller to provide the note's category. - """ - # Construct paths based on provided category - webdav_base = self._get_webdav_base_path() - category_path_part = f"{category}/" if category else "" - attachment_dir_segment = f".attachments.{note_id}" - parent_dir_webdav_rel_path = ( - f"Notes/{category_path_part}{attachment_dir_segment}" + """Add/Update an attachment to a note via WebDAV PUT.""" + return await self.webdav.add_note_attachment( + note_id=note_id, + filename=filename, + content=content, + category=category, + mime_type=mime_type, ) - parent_dir_path = ( - f"{webdav_base}/{parent_dir_webdav_rel_path}" # Full path for MKCOL - ) - attachment_path = f"{parent_dir_path}/{filename}" # Full path for PUT - - logger.info( - f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}" - ) - - # Log current auth settings to diagnose the issue - logger.info( - "WebDAV auth settings - Username: %s, Auth Type: %s", - self.username, - type(self._client.auth).__name__, - ) - - if not mime_type: - mime_type, _ = mimetypes.guess_type(filename) - if not mime_type: - mime_type = "application/octet-stream" # Default if guessing fails - - headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"} - try: - # First check if we can access WebDAV at all with current credentials - # by checking the Notes directory - notes_dir_path = f"{webdav_base}/Notes" - logger.info("Testing WebDAV access to Notes directory: %s", notes_dir_path) - - # Log details of the auth being used by the client for this specific request - if self._client.auth: - auth_header = ( - self._client.auth.auth_flow( - self._client.build_request("GET", notes_dir_path) - ) - .__next__() - .headers.get("Authorization") - ) - logger.info( - "Authorization header for PROPFIND (Notes dir): %s", - ( - auth_header - if auth_header - else "Not present or generated by auth flow" - ), - ) - else: - logger.info( - "No httpx.Auth object configured on the client for PROPFIND (Notes dir)." - ) - - propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} - logger.info("Headers for PROPFIND (Notes dir): %s", propfind_headers) - notes_dir_response = await self._client.request( - "PROPFIND", notes_dir_path, headers=propfind_headers - ) - - if notes_dir_response.status_code == 401: - logger.error( - "WebDAV authentication failed for Notes directory. Please verify WebDAV permissions." - ) - raise HTTPStatusError( - f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}", - request=notes_dir_response.request, - response=notes_dir_response, - ) - elif notes_dir_response.status_code >= 400: - logger.error( - "Error accessing WebDAV Notes directory: %s", - notes_dir_response.status_code, - ) - notes_dir_response.raise_for_status() - else: - logger.info( - "Successfully accessed WebDAV Notes directory (Status: %s)", - notes_dir_response.status_code, - ) - - # Ensure the parent directory exists using MKCOL - # parent_dir_path is now determined by the helper method - logger.info("Ensuring attachments directory exists: %s", parent_dir_path) - mkcol_headers = {"OCS-APIRequest": "true"} - logger.info("Headers for MKCOL (Attachments dir): %s", mkcol_headers) - mkcol_response = await self._client.request( - "MKCOL", parent_dir_path, headers=mkcol_headers - ) - # MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists) - # We can ignore 405, but raise for other errors - if mkcol_response.status_code not in [201, 405]: - logger.warning( - "Unexpected status code %s when creating attachments directory", - mkcol_response.status_code, - ) - mkcol_response.raise_for_status() - else: - logger.info( - "Created/verified directory: %s (Status: %s)", - parent_dir_path, - mkcol_response.status_code, - ) - - # Proceed with the PUT request - logger.info("Putting attachment file to: %s", attachment_path) - response = await self._client.put( - attachment_path, content=content, headers=headers - ) - response.raise_for_status() # Raises for 4xx/5xx status codes - logger.info( - "Successfully uploaded attachment '%s' to note %s (Status: %s)", - filename, - note_id, - response.status_code, - ) - # PUT typically returns 201 Created or 204 No Content on success - return { - "status_code": response.status_code - } # Return status or relevant info - - except HTTPStatusError as e: - logger.error( - "HTTP error uploading attachment '%s' to note %s: %s", - filename, - note_id, - e, - ) - raise e - except Exception as e: - logger.error( - "Unexpected error uploading attachment '%s' to note %s: %s", - filename, - note_id, - e, - ) - raise e async def get_note_attachment( self, *, note_id: int, filename: str, category: str | None = None ): - """ - Fetch a specific attachment from a note via WebDAV GET. - Requires the caller to provide the note's category. - """ - # Construct path based on provided category - webdav_base = self._get_webdav_base_path() - category_path_part = f"{category}/" if category else "" - attachment_dir_segment = f".attachments.{note_id}" - attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}" - - logger.info( - f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}" + """Fetch a specific attachment from a note via WebDAV GET.""" + return await self.webdav.get_note_attachment( + note_id=note_id, filename=filename, category=category ) - try: - response = await self._client.get(attachment_path) - response.raise_for_status() + def _get_webdav_base_path(self) -> str: + """Helper to get the base WebDAV path for the authenticated user.""" + return f"/remote.php/dav/files/{self.username}" - content = response.content - mime_type = response.headers.get("content-type", "application/octet-stream") - - logger.info( - "Successfully fetched attachment '%s' (%s, %d bytes)", - filename, - mime_type, - len(content), - ) - return content, mime_type - - except HTTPStatusError as e: - logger.error( - "HTTP error fetching attachment '%s' for note %s: %s", - filename, - note_id, - e, - ) - raise e - except Exception as e: - logger.error( - "Unexpected error fetching attachment '%s' for note %s: %s", - filename, - note_id, - e, - ) - raise e + async def close(self): + """Close the HTTP client.""" + await self._client.aclose() diff --git a/nextcloud_mcp_server/controllers/__init__.py b/nextcloud_mcp_server/controllers/__init__.py new file mode 100644 index 0000000..8645805 --- /dev/null +++ b/nextcloud_mcp_server/controllers/__init__.py @@ -0,0 +1 @@ +"""Controllers for utility operations.""" diff --git a/nextcloud_mcp_server/controllers/notes_search.py b/nextcloud_mcp_server/controllers/notes_search.py new file mode 100644 index 0000000..4cb7d40 --- /dev/null +++ b/nextcloud_mcp_server/controllers/notes_search.py @@ -0,0 +1,96 @@ +"""Controller for notes search functionality.""" + +from typing import List, Dict, Any + + +class NotesSearchController: + """Handles notes search logic and scoring.""" + + def search_notes(self, notes: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: + """ + Search notes using token-based matching with relevance ranking. + Returns notes sorted by relevance score. + """ + search_results = [] + query_tokens = self._process_query(query) + + # If empty query after processing, return empty results + if not query_tokens: + return [] + + # Process and score each note + for note in notes: + title_tokens, content_tokens = self._process_note_content(note) + score = self._calculate_score(query_tokens, title_tokens, content_tokens) + + # Only include notes with a non-zero score + if score >= 0.5: + search_results.append({ + "id": note.get("id"), + "title": note.get("title"), + "category": note.get("category"), + "modified": note.get("modified"), + "_score": score, # Include score for sorting + }) + + # Sort by score in descending order + search_results.sort(key=lambda x: x["_score"], reverse=True) + + return search_results + + def _process_query(self, query: str) -> List[str]: + """ + Tokenize and normalize the search query. + """ + # Convert to lowercase and split into tokens + tokens = query.lower().split() + # Filter out very short tokens + tokens = [token for token in tokens if len(token) > 1] + return tokens + + def _process_note_content(self, note: Dict[str, Any]) -> tuple[List[str], List[str]]: + """ + Tokenize and normalize note title and content. + """ + # Process title + title = note.get("title", "").lower() + title_tokens = title.split() + + # Process content + content = note.get("content", "").lower() + content_tokens = content.split() + + return title_tokens, content_tokens + + def _calculate_score( + self, + query_tokens: List[str], + title_tokens: List[str], + content_tokens: List[str], + ) -> float: + """ + Calculate a relevance score for a note based on query tokens. + """ + # Constants for weighting + TITLE_WEIGHT = 3.0 + CONTENT_WEIGHT = 1.0 + + score = 0.0 + + # Count matches in title + title_matches = sum(1 for qt in query_tokens if qt in title_tokens) + if query_tokens: # Avoid division by zero + title_match_ratio = title_matches / len(query_tokens) + score += TITLE_WEIGHT * title_match_ratio + + # Count matches in content + content_matches = sum(1 for qt in query_tokens if qt in content_tokens) + if query_tokens: # Avoid division by zero + content_match_ratio = content_matches / len(query_tokens) + score += CONTENT_WEIGHT * content_match_ratio + + # If no tokens matched at all, return zero + if title_matches == 0 and content_matches == 0: + return 0.0 + + return score diff --git a/nextcloud_mcp_server/notes_client.py b/nextcloud_mcp_server/notes_client.py new file mode 100644 index 0000000..f713066 --- /dev/null +++ b/nextcloud_mcp_server/notes_client.py @@ -0,0 +1,170 @@ +"""Client for Nextcloud Notes app operations.""" + +from typing import Dict, List, Any, Optional +import logging + +from .base_client import BaseNextcloudClient + +logger = logging.getLogger(__name__) + + +class NotesClient(BaseNextcloudClient): + """Client for Nextcloud Notes app operations.""" + + async def get_settings(self) -> Dict[str, Any]: + """Get Notes app settings.""" + response = await self._make_request("GET", "/apps/notes/api/v1/settings") + return response.json() + + async def get_all_notes(self) -> List[Dict[str, Any]]: + """Get all notes.""" + response = await self._make_request("GET", "/apps/notes/api/v1/notes") + return response.json() + + async def get_note(self, note_id: int) -> Dict[str, Any]: + """Get a specific note by ID.""" + response = await self._make_request("GET", f"/apps/notes/api/v1/notes/{note_id}") + return response.json() + + async def create_note( + self, + title: Optional[str] = None, + content: Optional[str] = None, + category: Optional[str] = None, + ) -> Dict[str, Any]: + """Create a new note.""" + body = {} + if title: + body["title"] = title + if content: + body["content"] = content + if category: + body["category"] = category + + response = await self._make_request("POST", "/apps/notes/api/v1/notes", json=body) + return response.json() + + async def update( + self, + note_id: int, + etag: str, + title: Optional[str] = None, + content: Optional[str] = None, + category: Optional[str] = None, + ) -> Dict[str, Any]: + """Update an existing note.""" + # Get current note details to check for category change + old_note = None + try: + if category is not None: + old_note = await self.get_note(note_id) + old_category = old_note.get("category", "") + logger.info(f"Current category for note {note_id}: '{old_category}'") + except Exception as e: + logger.warning(f"Could not fetch current note {note_id} details before update: {e}") + old_note = None + + # Prepare update body + body = {} + if title: + body["title"] = title + if content: + body["content"] = content + if category: + body["category"] = category + + logger.info(f"Attempting to update note {note_id} with etag {etag}. Body: {body}") + + response = await self._make_request( + "PUT", + f"/apps/notes/api/v1/notes/{note_id}", + json=body, + headers={"If-Match": f'"{etag}"'} + ) + + logger.info(f"Update response for note {note_id}: Status {response.status_code}") + updated_note = response.json() + + # Check for category change and cleanup old attachment directory if needed + if old_note and category is not None and old_note.get("category", "") != category: + logger.info(f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory") + try: + # Import here to avoid circular imports + from .webdav_client import WebDAVClient + webdav_client = WebDAVClient(self._client, self.username) + await webdav_client.cleanup_old_attachment_directory( + note_id=note_id, + old_category=old_note.get("category", "") + ) + except Exception as e: + logger.error(f"Error cleaning up old attachment directory for note {note_id}: {e}") + + return updated_note + + async def delete_note(self, note_id: int) -> Dict[str, Any]: + """Delete a note and its attachments.""" + # Fetch note details first to get category for cleanup + try: + note_details = await self.get_note(note_id) + category = note_details.get("category", "") + + # Determine potential categories for cleanup + potential_categories = [] + if category: + potential_categories.append(category) + if category != "": + potential_categories.append("") # Empty category + + logger.info(f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}") + except Exception as e: + logger.warning(f"Could not fetch note {note_id} details before deletion: {e}") + potential_categories = ["", "Unknown"] # Try common categories + + # Delete the note via API + logger.info(f"Deleting note {note_id} via API") + response = await self._make_request("DELETE", f"/apps/notes/api/v1/notes/{note_id}") + logger.info(f"Note {note_id} deleted successfully via API") + json_response = response.json() + + # Clean up attachment directories + try: + from .webdav_client import WebDAVClient + webdav_client = WebDAVClient(self._client, self.username) + + for cat in potential_categories: + try: + await webdav_client.cleanup_note_attachments(note_id, cat) + except Exception as e: + logger.warning(f"Failed to cleanup attachments for category '{cat}': {e}") + except Exception as e: + logger.warning(f"Error during attachment cleanup: {e}") + + return json_response + + async def append_content(self, note_id: int, content: str) -> Dict[str, Any]: + """Append content to an existing note with a separator.""" + logger.info(f"Appending content to note {note_id}") + + # Get current note + current_note = await self.get_note(note_id) + + # Use fixed separator for consistency + separator = "\n---\n" + + # Combine content + existing_content = current_note.get("content", "") + if existing_content: + new_content = existing_content + separator + content + else: + new_content = content # No separator needed for empty notes + + logger.info(f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)") + + # Update with combined content + return await self.update( + note_id=note_id, + etag=current_note["etag"], + content=new_content, + title=None, # Keep existing title + category=None, # Keep existing category + ) diff --git a/nextcloud_mcp_server/server.py b/nextcloud_mcp_server/server.py index 3102ecf..85c4066 100644 --- a/nextcloud_mcp_server/server.py +++ b/nextcloud_mcp_server/server.py @@ -26,7 +26,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: yield AppContext(client=client) finally: # Cleanup on shutdown - await client._client.aclose() + await client.close() # Create an MCP server diff --git a/nextcloud_mcp_server/webdav_client.py b/nextcloud_mcp_server/webdav_client.py new file mode 100644 index 0000000..32a0882 --- /dev/null +++ b/nextcloud_mcp_server/webdav_client.py @@ -0,0 +1,190 @@ +"""WebDAV client for Nextcloud file operations.""" + +import mimetypes +from typing import Tuple, Dict, Any, Optional +import logging +from httpx import HTTPStatusError + +from .base_client import BaseNextcloudClient + +logger = logging.getLogger(__name__) + + +class WebDAVClient(BaseNextcloudClient): + """Client for Nextcloud WebDAV operations.""" + + async def delete_resource(self, path: str) -> Dict[str, Any]: + """Delete a resource (file or directory) via WebDAV DELETE.""" + # Ensure path ends with a slash if it's a directory + if not path.endswith("/"): + path_with_slash = f"{path}/" + else: + path_with_slash = path + + webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}" + logger.info(f"Deleting WebDAV resource: {webdav_path}") + + headers = {"OCS-APIRequest": "true"} + try: + # First try a PROPFIND to verify resource exists + propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} + try: + propfind_resp = await self._client.request( + "PROPFIND", webdav_path, headers=propfind_headers + ) + logger.info(f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}") + except HTTPStatusError as e: + if e.response.status_code == 404: + logger.info(f"Resource '{webdav_path}' doesn't exist, no deletion needed.") + return {"status_code": 404} + # For other errors, continue with deletion attempt + + # Proceed with deletion + response = await self._client.delete(webdav_path, headers=headers) + response.raise_for_status() + logger.info(f"Successfully deleted WebDAV resource '{webdav_path}' (Status: {response.status_code})") + return {"status_code": response.status_code} + + except HTTPStatusError as e: + logger.warning(f"HTTP error deleting WebDAV resource '{webdav_path}': {e}") + if e.response.status_code != 404: + raise e + else: + logger.info(f"Resource '{webdav_path}' not found, no deletion needed.") + return {"status_code": 404} + except Exception as e: + logger.warning(f"Unexpected error deleting WebDAV resource '{webdav_path}': {e}") + raise e + + async def cleanup_old_attachment_directory(self, note_id: int, old_category: str) -> Dict[str, Any]: + """Clean up the attachment directory for a note in its old category location.""" + old_category_path_part = f"{old_category}/" if old_category else "" + old_attachment_dir_path = f"Notes/{old_category_path_part}.attachments.{note_id}/" + + logger.info(f"Cleaning up old attachment directory: {old_attachment_dir_path}") + try: + delete_result = await self.delete_resource(path=old_attachment_dir_path) + logger.info(f"Cleanup of old attachment directory result: {delete_result}") + return delete_result + except Exception as e: + logger.error(f"Error during cleanup of old attachment directory: {e}") + raise e + + async def cleanup_note_attachments(self, note_id: int, category: str) -> Dict[str, Any]: + """Clean up attachment directory for a specific note and category.""" + cat_path_part = f"{category}/" if category else "" + attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/" + + logger.info(f"Attempting to delete attachment directory for note {note_id} in category '{category}' via WebDAV: {attachment_dir_path}") + try: + delete_result = await self.delete_resource(path=attachment_dir_path) + logger.info(f"WebDAV deletion for category '{category}' attachment directory: {delete_result}") + return delete_result + except Exception as e: + logger.warning(f"Failed during WebDAV deletion for category '{category}' attachment directory: {e}") + raise e + + async def add_note_attachment( + self, + note_id: int, + filename: str, + content: bytes, + category: Optional[str] = None, + mime_type: Optional[str] = None, + ) -> Dict[str, Any]: + """Add/Update an attachment to a note via WebDAV PUT.""" + # Construct paths based on provided category + webdav_base = self._get_webdav_base_path() + category_path_part = f"{category}/" if category else "" + attachment_dir_segment = f".attachments.{note_id}" + parent_dir_webdav_rel_path = f"Notes/{category_path_part}{attachment_dir_segment}" + parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}" + attachment_path = f"{parent_dir_path}/{filename}" + + logger.info(f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}") + + # Log current auth settings + logger.info(f"WebDAV auth settings - Username: {self.username}, Auth Type: {type(self._client.auth).__name__}") + + if not mime_type: + mime_type, _ = mimetypes.guess_type(filename) + if not mime_type: + mime_type = "application/octet-stream" + + headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"} + try: + # First check if we can access WebDAV at all + notes_dir_path = f"{webdav_base}/Notes" + logger.info(f"Testing WebDAV access to Notes directory: {notes_dir_path}") + + propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} + notes_dir_response = await self._client.request( + "PROPFIND", notes_dir_path, headers=propfind_headers + ) + + if notes_dir_response.status_code == 401: + logger.error("WebDAV authentication failed for Notes directory. Please verify WebDAV permissions.") + raise HTTPStatusError( + f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}", + request=notes_dir_response.request, + response=notes_dir_response, + ) + elif notes_dir_response.status_code >= 400: + logger.error(f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}") + notes_dir_response.raise_for_status() + else: + logger.info(f"Successfully accessed WebDAV Notes directory (Status: {notes_dir_response.status_code})") + + # Ensure the parent directory exists using MKCOL + logger.info(f"Ensuring attachments directory exists: {parent_dir_path}") + mkcol_headers = {"OCS-APIRequest": "true"} + mkcol_response = await self._client.request("MKCOL", parent_dir_path, headers=mkcol_headers) + + # MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists) + if mkcol_response.status_code not in [201, 405]: + logger.warning(f"Unexpected status code {mkcol_response.status_code} when creating attachments directory") + mkcol_response.raise_for_status() + else: + logger.info(f"Created/verified directory: {parent_dir_path} (Status: {mkcol_response.status_code})") + + # Proceed with the PUT request + logger.info(f"Putting attachment file to: {attachment_path}") + response = await self._client.put(attachment_path, content=content, headers=headers) + response.raise_for_status() + logger.info(f"Successfully uploaded attachment '{filename}' to note {note_id} (Status: {response.status_code})") + return {"status_code": response.status_code} + + except HTTPStatusError as e: + logger.error(f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}") + raise e + except Exception as e: + logger.error(f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}") + raise e + + async def get_note_attachment( + self, note_id: int, filename: str, category: Optional[str] = None + ) -> Tuple[bytes, str]: + """Fetch a specific attachment from a note via WebDAV GET.""" + webdav_base = self._get_webdav_base_path() + category_path_part = f"{category}/" if category else "" + attachment_dir_segment = f".attachments.{note_id}" + attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}" + + logger.info(f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}") + + try: + response = await self._client.get(attachment_path) + response.raise_for_status() + + content = response.content + mime_type = response.headers.get("content-type", "application/octet-stream") + + logger.info(f"Successfully fetched attachment '{filename}' ({mime_type}, {len(content)} bytes)") + return content, mime_type + + except HTTPStatusError as e: + logger.error(f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}") + raise e + except Exception as e: + logger.error(f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}") + raise e diff --git a/tests/conftest.py b/tests/conftest.py index 621e7cc..c166b40 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,7 +2,8 @@ import pytest import os import logging import uuid -from nextcloud_mcp_server.client import NextcloudClient, HTTPStatusError +from nextcloud_mcp_server.client import NextcloudClient +from httpx import HTTPStatusError import asyncio logger = logging.getLogger(__name__)