From a57c12591aa90abb37bca55609d75ce0786f57d7 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 6 Jul 2025 08:41:02 +0200 Subject: [PATCH] chore: ruff format --- nextcloud_mcp_server/base_client.py | 12 +- nextcloud_mcp_server/client.py | 11 +- .../controllers/notes_search.py | 56 +++--- nextcloud_mcp_server/notes_client.py | 111 +++++++----- nextcloud_mcp_server/webdav_client.py | 160 ++++++++++++------ 5 files changed, 220 insertions(+), 130 deletions(-) diff --git a/nextcloud_mcp_server/base_client.py b/nextcloud_mcp_server/base_client.py index 8b769b4..92add59 100644 --- a/nextcloud_mcp_server/base_client.py +++ b/nextcloud_mcp_server/base_client.py @@ -9,29 +9,29 @@ logger = logging.getLogger(__name__) class BaseNextcloudClient(ABC): """Base class for all Nextcloud app clients.""" - + def __init__(self, http_client: AsyncClient, username: str): """Initialize with shared HTTP client and username. - + Args: http_client: Authenticated AsyncClient instance username: Nextcloud username for WebDAV operations """ self._client = http_client self.username = username - + def _get_webdav_base_path(self) -> str: """Helper to get the base WebDAV path for the authenticated user.""" return f"/remote.php/dav/files/{self.username}" - + async def _make_request(self, method: str, url: str, **kwargs): """Common request wrapper with logging and error handling. - + Args: method: HTTP method url: Request URL **kwargs: Additional request parameters - + Returns: Response object """ diff --git a/nextcloud_mcp_server/client.py b/nextcloud_mcp_server/client.py index 4257c0d..e2ed09d 100644 --- a/nextcloud_mcp_server/client.py +++ b/nextcloud_mcp_server/client.py @@ -32,7 +32,7 @@ def log_response(response: Response): class NextcloudClient: """Main Nextcloud client that orchestrates all app clients.""" - + def __init__(self, base_url: str, username: str, auth: Auth | None = None): self.username = username self._client = AsyncClient( @@ -40,11 +40,11 @@ class NextcloudClient: auth=auth, # event_hooks={"request": [log_request], "response": [log_response]}, ) - + # Initialize app clients self.notes = NotesClient(self._client, username) self.webdav = WebDAVClient(self._client, username) - + # Initialize controllers self._notes_search = NotesSearchController() @@ -88,7 +88,9 @@ class NextcloudClient: category: str | None = None, ): """Create a new note.""" - return await self.notes.create_note(title=title, content=content, category=category) + return await self.notes.create_note( + title=title, content=content, category=category + ) async def notes_update_note( self, @@ -113,7 +115,6 @@ class NextcloudClient: all_notes = await self.notes.get_all_notes() return self._notes_search.search_notes(all_notes, query) - async def notes_delete_note(self, *, note_id: int): """Delete a note and its attachments.""" return await self.notes.delete_note(note_id) diff --git a/nextcloud_mcp_server/controllers/notes_search.py b/nextcloud_mcp_server/controllers/notes_search.py index 4cb7d40..7a4e0e8 100644 --- a/nextcloud_mcp_server/controllers/notes_search.py +++ b/nextcloud_mcp_server/controllers/notes_search.py @@ -5,39 +5,43 @@ from typing import List, Dict, Any class NotesSearchController: """Handles notes search logic and scoring.""" - - def search_notes(self, notes: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: + + def search_notes( + self, notes: List[Dict[str, Any]], query: str + ) -> List[Dict[str, Any]]: """ Search notes using token-based matching with relevance ranking. Returns notes sorted by relevance score. """ search_results = [] query_tokens = self._process_query(query) - + # If empty query after processing, return empty results if not query_tokens: return [] - + # Process and score each note for note in notes: title_tokens, content_tokens = self._process_note_content(note) score = self._calculate_score(query_tokens, title_tokens, content_tokens) - + # Only include notes with a non-zero score if score >= 0.5: - search_results.append({ - "id": note.get("id"), - "title": note.get("title"), - "category": note.get("category"), - "modified": note.get("modified"), - "_score": score, # Include score for sorting - }) - + search_results.append( + { + "id": note.get("id"), + "title": note.get("title"), + "category": note.get("category"), + "modified": note.get("modified"), + "_score": score, # Include score for sorting + } + ) + # Sort by score in descending order search_results.sort(key=lambda x: x["_score"], reverse=True) - + return search_results - + def _process_query(self, query: str) -> List[str]: """ Tokenize and normalize the search query. @@ -47,21 +51,23 @@ class NotesSearchController: # Filter out very short tokens tokens = [token for token in tokens if len(token) > 1] return tokens - - def _process_note_content(self, note: Dict[str, Any]) -> tuple[List[str], List[str]]: + + def _process_note_content( + self, note: Dict[str, Any] + ) -> tuple[List[str], List[str]]: """ Tokenize and normalize note title and content. """ # Process title title = note.get("title", "").lower() title_tokens = title.split() - + # Process content content = note.get("content", "").lower() content_tokens = content.split() - + return title_tokens, content_tokens - + def _calculate_score( self, query_tokens: List[str], @@ -74,23 +80,23 @@ class NotesSearchController: # Constants for weighting TITLE_WEIGHT = 3.0 CONTENT_WEIGHT = 1.0 - + score = 0.0 - + # Count matches in title title_matches = sum(1 for qt in query_tokens if qt in title_tokens) if query_tokens: # Avoid division by zero title_match_ratio = title_matches / len(query_tokens) score += TITLE_WEIGHT * title_match_ratio - + # Count matches in content content_matches = sum(1 for qt in query_tokens if qt in content_tokens) if query_tokens: # Avoid division by zero content_match_ratio = content_matches / len(query_tokens) score += CONTENT_WEIGHT * content_match_ratio - + # If no tokens matched at all, return zero if title_matches == 0 and content_matches == 0: return 0.0 - + return score diff --git a/nextcloud_mcp_server/notes_client.py b/nextcloud_mcp_server/notes_client.py index f713066..4dee62c 100644 --- a/nextcloud_mcp_server/notes_client.py +++ b/nextcloud_mcp_server/notes_client.py @@ -10,22 +10,24 @@ logger = logging.getLogger(__name__) class NotesClient(BaseNextcloudClient): """Client for Nextcloud Notes app operations.""" - + async def get_settings(self) -> Dict[str, Any]: """Get Notes app settings.""" response = await self._make_request("GET", "/apps/notes/api/v1/settings") return response.json() - + async def get_all_notes(self) -> List[Dict[str, Any]]: """Get all notes.""" response = await self._make_request("GET", "/apps/notes/api/v1/notes") return response.json() - + async def get_note(self, note_id: int) -> Dict[str, Any]: """Get a specific note by ID.""" - response = await self._make_request("GET", f"/apps/notes/api/v1/notes/{note_id}") + response = await self._make_request( + "GET", f"/apps/notes/api/v1/notes/{note_id}" + ) return response.json() - + async def create_note( self, title: Optional[str] = None, @@ -40,10 +42,12 @@ class NotesClient(BaseNextcloudClient): body["content"] = content if category: body["category"] = category - - response = await self._make_request("POST", "/apps/notes/api/v1/notes", json=body) + + response = await self._make_request( + "POST", "/apps/notes/api/v1/notes", json=body + ) return response.json() - + async def update( self, note_id: int, @@ -61,9 +65,11 @@ class NotesClient(BaseNextcloudClient): old_category = old_note.get("category", "") logger.info(f"Current category for note {note_id}: '{old_category}'") except Exception as e: - logger.warning(f"Could not fetch current note {note_id} details before update: {e}") + logger.warning( + f"Could not fetch current note {note_id} details before update: {e}" + ) old_note = None - + # Prepare update body body = {} if title: @@ -72,94 +78,117 @@ class NotesClient(BaseNextcloudClient): body["content"] = content if category: body["category"] = category - - logger.info(f"Attempting to update note {note_id} with etag {etag}. Body: {body}") - + + logger.info( + f"Attempting to update note {note_id} with etag {etag}. Body: {body}" + ) + response = await self._make_request( "PUT", f"/apps/notes/api/v1/notes/{note_id}", json=body, - headers={"If-Match": f'"{etag}"'} + headers={"If-Match": f'"{etag}"'}, + ) + + logger.info( + f"Update response for note {note_id}: Status {response.status_code}" ) - - logger.info(f"Update response for note {note_id}: Status {response.status_code}") updated_note = response.json() - + # Check for category change and cleanup old attachment directory if needed - if old_note and category is not None and old_note.get("category", "") != category: - logger.info(f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory") + if ( + old_note + and category is not None + and old_note.get("category", "") != category + ): + logger.info( + f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory" + ) try: # Import here to avoid circular imports from .webdav_client import WebDAVClient + webdav_client = WebDAVClient(self._client, self.username) await webdav_client.cleanup_old_attachment_directory( - note_id=note_id, - old_category=old_note.get("category", "") + note_id=note_id, old_category=old_note.get("category", "") ) except Exception as e: - logger.error(f"Error cleaning up old attachment directory for note {note_id}: {e}") - + logger.error( + f"Error cleaning up old attachment directory for note {note_id}: {e}" + ) + return updated_note - + async def delete_note(self, note_id: int) -> Dict[str, Any]: """Delete a note and its attachments.""" # Fetch note details first to get category for cleanup try: note_details = await self.get_note(note_id) category = note_details.get("category", "") - + # Determine potential categories for cleanup potential_categories = [] if category: potential_categories.append(category) if category != "": potential_categories.append("") # Empty category - - logger.info(f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}") + + logger.info( + f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}" + ) except Exception as e: - logger.warning(f"Could not fetch note {note_id} details before deletion: {e}") + logger.warning( + f"Could not fetch note {note_id} details before deletion: {e}" + ) potential_categories = ["", "Unknown"] # Try common categories - + # Delete the note via API logger.info(f"Deleting note {note_id} via API") - response = await self._make_request("DELETE", f"/apps/notes/api/v1/notes/{note_id}") + response = await self._make_request( + "DELETE", f"/apps/notes/api/v1/notes/{note_id}" + ) logger.info(f"Note {note_id} deleted successfully via API") json_response = response.json() - + # Clean up attachment directories try: from .webdav_client import WebDAVClient + webdav_client = WebDAVClient(self._client, self.username) - + for cat in potential_categories: try: await webdav_client.cleanup_note_attachments(note_id, cat) except Exception as e: - logger.warning(f"Failed to cleanup attachments for category '{cat}': {e}") + logger.warning( + f"Failed to cleanup attachments for category '{cat}': {e}" + ) except Exception as e: logger.warning(f"Error during attachment cleanup: {e}") - + return json_response - + async def append_content(self, note_id: int, content: str) -> Dict[str, Any]: """Append content to an existing note with a separator.""" logger.info(f"Appending content to note {note_id}") - + # Get current note current_note = await self.get_note(note_id) - + # Use fixed separator for consistency separator = "\n---\n" - + # Combine content existing_content = current_note.get("content", "") if existing_content: new_content = existing_content + separator + content else: new_content = content # No separator needed for empty notes - - logger.info(f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)") - + + logger.info( + f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)" + ) + # Update with combined content return await self.update( note_id=note_id, diff --git a/nextcloud_mcp_server/webdav_client.py b/nextcloud_mcp_server/webdav_client.py index 32a0882..5dac2d3 100644 --- a/nextcloud_mcp_server/webdav_client.py +++ b/nextcloud_mcp_server/webdav_client.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) class WebDAVClient(BaseNextcloudClient): """Client for Nextcloud WebDAV operations.""" - + async def delete_resource(self, path: str) -> Dict[str, Any]: """Delete a resource (file or directory) via WebDAV DELETE.""" # Ensure path ends with a slash if it's a directory @@ -20,10 +20,10 @@ class WebDAVClient(BaseNextcloudClient): path_with_slash = f"{path}/" else: path_with_slash = path - + webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}" logger.info(f"Deleting WebDAV resource: {webdav_path}") - + headers = {"OCS-APIRequest": "true"} try: # First try a PROPFIND to verify resource exists @@ -32,19 +32,25 @@ class WebDAVClient(BaseNextcloudClient): propfind_resp = await self._client.request( "PROPFIND", webdav_path, headers=propfind_headers ) - logger.info(f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}") + logger.info( + f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}" + ) except HTTPStatusError as e: if e.response.status_code == 404: - logger.info(f"Resource '{webdav_path}' doesn't exist, no deletion needed.") + logger.info( + f"Resource '{webdav_path}' doesn't exist, no deletion needed." + ) return {"status_code": 404} # For other errors, continue with deletion attempt - + # Proceed with deletion response = await self._client.delete(webdav_path, headers=headers) response.raise_for_status() - logger.info(f"Successfully deleted WebDAV resource '{webdav_path}' (Status: {response.status_code})") + logger.info( + f"Successfully deleted WebDAV resource '{webdav_path}' (Status: {response.status_code})" + ) return {"status_code": response.status_code} - + except HTTPStatusError as e: logger.warning(f"HTTP error deleting WebDAV resource '{webdav_path}': {e}") if e.response.status_code != 404: @@ -53,14 +59,20 @@ class WebDAVClient(BaseNextcloudClient): logger.info(f"Resource '{webdav_path}' not found, no deletion needed.") return {"status_code": 404} except Exception as e: - logger.warning(f"Unexpected error deleting WebDAV resource '{webdav_path}': {e}") + logger.warning( + f"Unexpected error deleting WebDAV resource '{webdav_path}': {e}" + ) raise e - - async def cleanup_old_attachment_directory(self, note_id: int, old_category: str) -> Dict[str, Any]: + + async def cleanup_old_attachment_directory( + self, note_id: int, old_category: str + ) -> Dict[str, Any]: """Clean up the attachment directory for a note in its old category location.""" old_category_path_part = f"{old_category}/" if old_category else "" - old_attachment_dir_path = f"Notes/{old_category_path_part}.attachments.{note_id}/" - + old_attachment_dir_path = ( + f"Notes/{old_category_path_part}.attachments.{note_id}/" + ) + logger.info(f"Cleaning up old attachment directory: {old_attachment_dir_path}") try: delete_result = await self.delete_resource(path=old_attachment_dir_path) @@ -69,21 +81,29 @@ class WebDAVClient(BaseNextcloudClient): except Exception as e: logger.error(f"Error during cleanup of old attachment directory: {e}") raise e - - async def cleanup_note_attachments(self, note_id: int, category: str) -> Dict[str, Any]: + + async def cleanup_note_attachments( + self, note_id: int, category: str + ) -> Dict[str, Any]: """Clean up attachment directory for a specific note and category.""" cat_path_part = f"{category}/" if category else "" attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/" - - logger.info(f"Attempting to delete attachment directory for note {note_id} in category '{category}' via WebDAV: {attachment_dir_path}") + + logger.info( + f"Attempting to delete attachment directory for note {note_id} in category '{category}' via WebDAV: {attachment_dir_path}" + ) try: delete_result = await self.delete_resource(path=attachment_dir_path) - logger.info(f"WebDAV deletion for category '{category}' attachment directory: {delete_result}") + logger.info( + f"WebDAV deletion for category '{category}' attachment directory: {delete_result}" + ) return delete_result except Exception as e: - logger.warning(f"Failed during WebDAV deletion for category '{category}' attachment directory: {e}") + logger.warning( + f"Failed during WebDAV deletion for category '{category}' attachment directory: {e}" + ) raise e - + async def add_note_attachment( self, note_id: int, @@ -97,70 +117,96 @@ class WebDAVClient(BaseNextcloudClient): webdav_base = self._get_webdav_base_path() category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" - parent_dir_webdav_rel_path = f"Notes/{category_path_part}{attachment_dir_segment}" + parent_dir_webdav_rel_path = ( + f"Notes/{category_path_part}{attachment_dir_segment}" + ) parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}" attachment_path = f"{parent_dir_path}/{filename}" - - logger.info(f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}") - + + logger.info( + f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}" + ) + # Log current auth settings - logger.info(f"WebDAV auth settings - Username: {self.username}, Auth Type: {type(self._client.auth).__name__}") - + logger.info( + f"WebDAV auth settings - Username: {self.username}, Auth Type: {type(self._client.auth).__name__}" + ) + if not mime_type: mime_type, _ = mimetypes.guess_type(filename) if not mime_type: mime_type = "application/octet-stream" - + headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"} try: # First check if we can access WebDAV at all notes_dir_path = f"{webdav_base}/Notes" logger.info(f"Testing WebDAV access to Notes directory: {notes_dir_path}") - + propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} notes_dir_response = await self._client.request( "PROPFIND", notes_dir_path, headers=propfind_headers ) - + if notes_dir_response.status_code == 401: - logger.error("WebDAV authentication failed for Notes directory. Please verify WebDAV permissions.") + logger.error( + "WebDAV authentication failed for Notes directory. Please verify WebDAV permissions." + ) raise HTTPStatusError( f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}", request=notes_dir_response.request, response=notes_dir_response, ) elif notes_dir_response.status_code >= 400: - logger.error(f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}") + logger.error( + f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}" + ) notes_dir_response.raise_for_status() else: - logger.info(f"Successfully accessed WebDAV Notes directory (Status: {notes_dir_response.status_code})") - + logger.info( + f"Successfully accessed WebDAV Notes directory (Status: {notes_dir_response.status_code})" + ) + # Ensure the parent directory exists using MKCOL logger.info(f"Ensuring attachments directory exists: {parent_dir_path}") mkcol_headers = {"OCS-APIRequest": "true"} - mkcol_response = await self._client.request("MKCOL", parent_dir_path, headers=mkcol_headers) - + mkcol_response = await self._client.request( + "MKCOL", parent_dir_path, headers=mkcol_headers + ) + # MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists) if mkcol_response.status_code not in [201, 405]: - logger.warning(f"Unexpected status code {mkcol_response.status_code} when creating attachments directory") + logger.warning( + f"Unexpected status code {mkcol_response.status_code} when creating attachments directory" + ) mkcol_response.raise_for_status() else: - logger.info(f"Created/verified directory: {parent_dir_path} (Status: {mkcol_response.status_code})") - + logger.info( + f"Created/verified directory: {parent_dir_path} (Status: {mkcol_response.status_code})" + ) + # Proceed with the PUT request logger.info(f"Putting attachment file to: {attachment_path}") - response = await self._client.put(attachment_path, content=content, headers=headers) + response = await self._client.put( + attachment_path, content=content, headers=headers + ) response.raise_for_status() - logger.info(f"Successfully uploaded attachment '{filename}' to note {note_id} (Status: {response.status_code})") + logger.info( + f"Successfully uploaded attachment '{filename}' to note {note_id} (Status: {response.status_code})" + ) return {"status_code": response.status_code} - + except HTTPStatusError as e: - logger.error(f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}") + logger.error( + f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}" + ) raise e except Exception as e: - logger.error(f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}") + logger.error( + f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}" + ) raise e - + async def get_note_attachment( self, note_id: int, filename: str, category: Optional[str] = None ) -> Tuple[bytes, str]: @@ -169,22 +215,30 @@ class WebDAVClient(BaseNextcloudClient): category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}" - - logger.info(f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}") - + + logger.info( + f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}" + ) + try: response = await self._client.get(attachment_path) response.raise_for_status() - + content = response.content mime_type = response.headers.get("content-type", "application/octet-stream") - - logger.info(f"Successfully fetched attachment '{filename}' ({mime_type}, {len(content)} bytes)") + + logger.info( + f"Successfully fetched attachment '{filename}' ({mime_type}, {len(content)} bytes)" + ) return content, mime_type - + except HTTPStatusError as e: - logger.error(f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}") + logger.error( + f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}" + ) raise e except Exception as e: - logger.error(f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}") + logger.error( + f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}" + ) raise e