feat: Switch to using async client

This commit is contained in:
Chris Coutinho
2025-06-06 18:41:57 +02:00
parent c918284927
commit ee32a1bfe8
12 changed files with 838 additions and 441 deletions
+211 -104
View File
@@ -1,6 +1,8 @@
import os
import datetime as dt
import mimetypes
from httpx import (
AsyncClient,
Client,
Auth,
BasicAuth,
@@ -16,7 +18,7 @@ logger = logging.getLogger(__name__)
def log_request(request: Request):
logger.info(
"Request event hook ****: %s %s - Waiting for content",
"Request event hook: %s %s - Waiting for content",
request.method,
request.url,
)
@@ -30,18 +32,16 @@ def log_response(response: Response):
class NextcloudClient:
def __init__(self, base_url: str, username: str, auth: Auth | None = None):
self.username = username # Store username
self._client = Client(
self.username = username # Store username
self._client = AsyncClient(
base_url=base_url,
auth=auth,
event_hooks={"request": [log_request], "response": [log_response]},
# event_hooks={"request": [log_request], "response": [log_response]},
)
@classmethod
def from_env(cls):
logger.info("Creating NC Client using env vars")
host = os.environ["NEXTCLOUD_HOST"]
@@ -50,9 +50,8 @@ class NextcloudClient:
# Pass username to constructor
return cls(base_url=host, username=username, auth=BasicAuth(username, password))
def capabilities(self):
response = self._client.get(
async def capabilities(self):
response = await self._client.get(
"/ocs/v2.php/cloud/capabilities",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
@@ -60,22 +59,22 @@ class NextcloudClient:
return response.json()
def notes_get_settings(self):
response = self._client.get("/apps/notes/api/v1/settings")
async def notes_get_settings(self):
response = await self._client.get("/apps/notes/api/v1/settings")
response.raise_for_status()
return response.json()
def notes_get_all(self):
response = self._client.get("/apps/notes/api/v1/notes")
async def notes_get_all(self):
response = await self._client.get("/apps/notes/api/v1/notes")
response.raise_for_status()
return response.json()
def notes_get_note(self, *, note_id: int):
response = self._client.get(f"/apps/notes/api/v1/notes/{note_id}")
async def notes_get_note(self, *, note_id: int):
response = await self._client.get(f"/apps/notes/api/v1/notes/{note_id}")
response.raise_for_status()
return response.json()
def notes_create_note(
async def notes_create_note(
self,
*,
title: str | None = None,
@@ -90,14 +89,14 @@ class NextcloudClient:
if category:
body.update({"category": category})
response = self._client.post(
response = await self._client.post(
url="/apps/notes/api/v1/notes",
json=body,
)
response.raise_for_status()
return response.json()
def notes_update_note(
async def notes_update_note(
self,
*,
note_id: int,
@@ -110,11 +109,13 @@ class NextcloudClient:
old_note = None
try:
if category is not None: # Only fetch if category might change
old_note = self.notes_get_note(note_id=note_id)
old_note = await self.notes_get_note(note_id=note_id)
old_category = old_note.get("category", "")
logger.info(f"Current category for note {note_id}: '{old_category}'")
except Exception as e:
logger.warning(f"Could not fetch current note {note_id} details before update: {e}")
logger.warning(
f"Could not fetch current note {note_id} details before update: {e}"
)
# Continue with update even if we couldn't fetch current details
old_note = None
@@ -134,7 +135,7 @@ class NextcloudClient:
body,
)
# Ensure conditional PUT using If-Match header is active
response = self._client.put(
response = await self._client.put(
url=f"/apps/notes/api/v1/notes/{note_id}",
json=body,
headers={"If-Match": f'"{etag}"'},
@@ -149,25 +150,39 @@ class NextcloudClient:
updated_note = response.json()
# Check for category change and clean up old attachment directory if needed
if old_note and category is not None and old_note.get("category", "") != category:
logger.info(f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory")
if (
old_note
and category is not None
and old_note.get("category", "") != category
):
logger.info(
f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory"
)
try:
self._cleanup_old_attachment_directory(note_id=note_id, old_category=old_note.get("category", ""))
await self._cleanup_old_attachment_directory(
note_id=note_id, old_category=old_note.get("category", "")
)
except Exception as e:
logger.error(f"Error cleaning up old attachment directory for note {note_id}: {e}")
logger.error(
f"Error cleaning up old attachment directory for note {note_id}: {e}"
)
# Continue with update even if cleanup failed
return updated_note
def notes_append_content(self, *, note_id: int, content: str):
"""Append content to an existing note with a standard separator"""
async def notes_append_content(self, *, note_id: int, content: str):
"""Append content to an existing note.
The content will be separated by a newline, delimiter `---`, and
timestemp so callers do not need to append metadata themselves.
"""
logger.info(f"Appending content to note {note_id}")
# Get current note
current_note = self.notes_get_note(note_id=note_id)
current_note = await self.notes_get_note(note_id=note_id)
# Use fixed separator for consistency
separator = "\n---\n"
separator = f"\n---\n## Content appended: {dt.datetime.now():%Y-%m-%d %H:%M}\n"
# Combine content
existing_content = current_note.get("content", "")
@@ -176,23 +191,25 @@ class NextcloudClient:
else:
new_content = content # No separator needed for empty notes
logger.info(f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)")
logger.info(
f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)"
)
# Update with combined content
return self.notes_update_note(
return await self.notes_update_note(
note_id=note_id,
etag=current_note["etag"],
content=new_content,
title=None, # Keep existing title
category=None # Keep existing category
category=None, # Keep existing category
)
def notes_search_notes(self, *, query: str):
async def notes_search_notes(self, *, query: str):
"""
Search notes using token-based matching with relevance ranking.
Returns notes sorted by relevance score.
"""
all_notes = self.notes_get_all()
all_notes = await self.notes_get_all()
search_results = []
# Process the query
@@ -209,13 +226,15 @@ class NextcloudClient:
# Only include notes with a non-zero score
if score >= 0.5:
search_results.append({
"id": note.get("id"),
"title": note.get("title"),
"category": note.get("category"),
"modified": note.get("modified"),
"_score": score # Include score for sorting (optional field)
})
search_results.append(
{
"id": note.get("id"),
"title": note.get("title"),
"category": note.get("category"),
"modified": note.get("modified"),
"_score": score, # Include score for sorting (optional field)
}
)
# Sort by score in descending order
search_results.sort(key=lambda x: x["_score"], reverse=True)
@@ -252,7 +271,12 @@ class NextcloudClient:
return title_tokens, content_tokens
def calculate_score(self, query_tokens: list[str], title_tokens: list[str], content_tokens: list[str]) -> float:
def calculate_score(
self,
query_tokens: list[str],
title_tokens: list[str],
content_tokens: list[str],
) -> float:
"""
Calculate a relevance score for a note based on query tokens.
"""
@@ -280,33 +304,35 @@ class NextcloudClient:
return score
def _cleanup_old_attachment_directory(self, *, note_id: int, old_category: str):
async def _cleanup_old_attachment_directory(self, *, note_id: int, old_category: str):
"""
Clean up the attachment directory for a note in its old category location.
Called after a category change to prevent orphaned directories.
"""
# Construct path to old attachment directory
old_category_path_part = f"{old_category}/" if old_category else ""
old_attachment_dir_path = f"Notes/{old_category_path_part}.attachments.{note_id}/"
old_attachment_dir_path = (
f"Notes/{old_category_path_part}.attachments.{note_id}/"
)
logger.info(f"Cleaning up old attachment directory: {old_attachment_dir_path}")
try:
delete_result = self.delete_webdav_resource(path=old_attachment_dir_path)
delete_result = await self.delete_webdav_resource(path=old_attachment_dir_path)
logger.info(f"Cleanup of old attachment directory result: {delete_result}")
return delete_result
except Exception as e:
logger.error(f"Error during cleanup of old attachment directory: {e}")
raise e
def delete_webdav_resource(self, *, path: str):
async def delete_webdav_resource(self, *, path: str):
"""Delete a resource (file or directory) via WebDAV DELETE."""
# Ensure path ends with a slash if it's a directory
if not path.endswith('/'):
# This is a heuristic; a more robust solution would check resource type first
# but for the specific case of deleting the attachment directory, this is acceptable.
path_with_slash = f"{path}/"
if not path.endswith("/"):
# This is a heuristic; a more robust solution would check resource type first
# but for the specific case of deleting the attachment directory, this is acceptable.
path_with_slash = f"{path}/"
else:
path_with_slash = path
path_with_slash = path
webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}"
logger.info("Deleting WebDAV resource: %s", webdav_path)
@@ -316,19 +342,29 @@ class NextcloudClient:
# First try a PROPFIND to verify resource exists
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
try:
propfind_resp = self._client.request("PROPFIND", webdav_path, headers=propfind_headers)
logger.info(f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}")
propfind_resp = await self._client.request(
"PROPFIND", webdav_path, headers=propfind_headers
)
logger.info(
f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}"
)
# If we get here with 2xx, the resource exists
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.info(f"Resource '{webdav_path}' doesn't exist, no deletion needed.")
logger.info(
f"Resource '{webdav_path}' doesn't exist, no deletion needed."
)
return {"status_code": 404}
# For other errors, continue with deletion attempt
# Proceed with deletion
response = self._client.delete(webdav_path, headers=headers)
response.raise_for_status() # Raises for 4xx/5xx status codes
logger.info("Successfully deleted WebDAV resource '%s' (Status: %s)", webdav_path, response.status_code)
response = await self._client.delete(webdav_path, headers=headers)
response.raise_for_status() # Raises for 4xx/5xx status codes
logger.info(
"Successfully deleted WebDAV resource '%s' (Status: %s)",
webdav_path,
response.status_code,
)
# DELETE typically returns 204 No Content on success
return {"status_code": response.status_code}
@@ -344,7 +380,7 @@ class NextcloudClient:
raise e
else:
logger.info("Resource '%s' not found, no deletion needed.", webdav_path)
return {"status_code": 404} # Indicate resource was not found
return {"status_code": 404} # Indicate resource was not found
except Exception as e:
logger.warning(
"Unexpected error deleting WebDAV resource '%s': %s",
@@ -353,11 +389,11 @@ class NextcloudClient:
)
raise e
def notes_delete_note(self, *, note_id: int):
async def notes_delete_note(self, *, note_id: int):
"""Deletes a note via API and attempts to delete its attachment directory via WebDAV."""
# Fetch note details first to get the category for path construction
try:
note_details = self.notes_get_note(note_id=note_id)
note_details = await self.notes_get_note(note_id=note_id)
category = note_details.get("category", "")
# Check for other potential categories (if any note was moved between categories)
@@ -373,39 +409,51 @@ class NextcloudClient:
# We could add logic here to check for other common categories if needed
logger.info(f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}")
logger.info(
f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}"
)
except HTTPStatusError as e:
# If note doesn't exist (404), we can't delete attachments anyway.
# Re-raise other errors.
if e.response.status_code == 404:
logger.warning(f"Note {note_id} not found when attempting delete. Skipping attachment cleanup.")
logger.warning(
f"Note {note_id} not found when attempting delete. Skipping attachment cleanup."
)
# Still raise the 404 as the primary delete operation failed
raise e
else:
logger.error(f"Error fetching note {note_id} details before deleting attachments: {e}")
raise e # Re-raise unexpected errors during fetch
logger.error(
f"Error fetching note {note_id} details before deleting attachments: {e}"
)
raise e # Re-raise unexpected errors during fetch
# Proceed with API note deletion
logger.info(f"Deleting note {note_id} via API.")
response = self._client.delete(f"/apps/notes/api/v1/notes/{note_id}")
response.raise_for_status() # Raise if API deletion fails
response = await self._client.delete(f"/apps/notes/api/v1/notes/{note_id}")
response.raise_for_status() # Raise if API deletion fails
logger.info(f"Note {note_id} deleted successfully via API.")
json_response = response.json() # Usually empty on success
json_response = response.json() # Usually empty on success
# Now, attempt to delete the associated attachments directory via WebDAV for each potential category
for cat in potential_categories:
cat_path_part = f"{cat}/" if cat else ""
attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/"
logger.info(f"Attempting to delete attachment directory for note {note_id} in category '{cat}' via WebDAV: {attachment_dir_path}")
logger.info(
f"Attempting to delete attachment directory for note {note_id} in category '{cat}' via WebDAV: {attachment_dir_path}"
)
try:
# delete_webdav_resource expects path relative to user's files dir
delete_result = self.delete_webdav_resource(path=attachment_dir_path)
logger.info(f"WebDAV deletion for category '{cat}' attachment directory: {delete_result}")
delete_result = await self.delete_webdav_resource(path=attachment_dir_path)
logger.info(
f"WebDAV deletion for category '{cat}' attachment directory: {delete_result}"
)
except Exception as e:
# Log the error but don't re-raise, as API note deletion itself was successful
# Also, we want to try other potential categories even if one fails
logger.warning(f"Failed during WebDAV deletion for category '{cat}' attachment directory: {e}")
logger.warning(
f"Failed during WebDAV deletion for category '{cat}' attachment directory: {e}"
)
return json_response
@@ -418,7 +466,15 @@ class NextcloudClient:
# Removed _get_note_attachment_webdav_path helper
def add_note_attachment(self, *, note_id: int, filename: str, content: bytes, category: str | None = None, mime_type: str | None = None):
async def add_note_attachment(
self,
*,
note_id: int,
filename: str,
content: bytes,
category: str | None = None,
mime_type: str | None = None,
):
"""
Add/Update an attachment to a note via WebDAV PUT.
Requires the caller to provide the note's category.
@@ -427,20 +483,29 @@ class NextcloudClient:
webdav_base = self._get_webdav_base_path()
category_path_part = f"{category}/" if category else ""
attachment_dir_segment = f".attachments.{note_id}"
parent_dir_webdav_rel_path = f"Notes/{category_path_part}{attachment_dir_segment}"
parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}" # Full path for MKCOL
attachment_path = f"{parent_dir_path}/{filename}" # Full path for PUT
parent_dir_webdav_rel_path = (
f"Notes/{category_path_part}{attachment_dir_segment}"
)
parent_dir_path = (
f"{webdav_base}/{parent_dir_webdav_rel_path}" # Full path for MKCOL
)
attachment_path = f"{parent_dir_path}/{filename}" # Full path for PUT
logger.info(f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}")
logger.info(
f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}"
)
# Log current auth settings to diagnose the issue
logger.info("WebDAV auth settings - Username: %s, Auth Type: %s",
self.username, type(self._client.auth).__name__)
logger.info(
"WebDAV auth settings - Username: %s, Auth Type: %s",
self.username,
type(self._client.auth).__name__,
)
if not mime_type:
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = "application/octet-stream" # Default if guessing fails
mime_type = "application/octet-stream" # Default if guessing fails
headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"}
try:
@@ -451,59 +516,92 @@ class NextcloudClient:
# Log details of the auth being used by the client for this specific request
if self._client.auth:
auth_header = self._client.auth.auth_flow(self._client.build_request("GET", notes_dir_path)).__next__().headers.get("Authorization")
logger.info("Authorization header for PROPFIND (Notes dir): %s", auth_header if auth_header else "Not present or generated by auth flow")
auth_header = (
self._client.auth.auth_flow(
self._client.build_request("GET", notes_dir_path)
)
.__next__()
.headers.get("Authorization")
)
logger.info(
"Authorization header for PROPFIND (Notes dir): %s",
(
auth_header
if auth_header
else "Not present or generated by auth flow"
),
)
else:
logger.info("No httpx.Auth object configured on the client for PROPFIND (Notes dir).")
logger.info(
"No httpx.Auth object configured on the client for PROPFIND (Notes dir)."
)
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
logger.info("Headers for PROPFIND (Notes dir): %s", propfind_headers)
notes_dir_response = self._client.request("PROPFIND", notes_dir_path,
headers=propfind_headers)
notes_dir_response = await self._client.request(
"PROPFIND", notes_dir_path, headers=propfind_headers
)
if notes_dir_response.status_code == 401:
logger.error("WebDAV authentication failed for Notes directory. Please verify WebDAV permissions.")
logger.error(
"WebDAV authentication failed for Notes directory. Please verify WebDAV permissions."
)
raise HTTPStatusError(
f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}",
request=notes_dir_response.request,
response=notes_dir_response
response=notes_dir_response,
)
elif notes_dir_response.status_code >= 400:
logger.error("Error accessing WebDAV Notes directory: %s", notes_dir_response.status_code)
logger.error(
"Error accessing WebDAV Notes directory: %s",
notes_dir_response.status_code,
)
notes_dir_response.raise_for_status()
else:
logger.info("Successfully accessed WebDAV Notes directory (Status: %s)",
notes_dir_response.status_code)
logger.info(
"Successfully accessed WebDAV Notes directory (Status: %s)",
notes_dir_response.status_code,
)
# Ensure the parent directory exists using MKCOL
# parent_dir_path is now determined by the helper method
logger.info("Ensuring attachments directory exists: %s", parent_dir_path)
mkcol_headers = {"OCS-APIRequest": "true"}
logger.info("Headers for MKCOL (Attachments dir): %s", mkcol_headers)
mkcol_response = self._client.request("MKCOL", parent_dir_path, headers=mkcol_headers)
mkcol_response = await self._client.request(
"MKCOL", parent_dir_path, headers=mkcol_headers
)
# MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists)
# We can ignore 405, but raise for other errors
if mkcol_response.status_code not in [201, 405]:
logger.warning(
"Unexpected status code %s when creating attachments directory",
mkcol_response.status_code
mkcol_response.status_code,
)
mkcol_response.raise_for_status()
else:
logger.info("Created/verified directory: %s (Status: %s)",
parent_dir_path, mkcol_response.status_code)
logger.info(
"Created/verified directory: %s (Status: %s)",
parent_dir_path,
mkcol_response.status_code,
)
# Proceed with the PUT request
logger.info("Putting attachment file to: %s", attachment_path)
response = self._client.put(
attachment_path,
content=content,
headers=headers
response = await self._client.put(
attachment_path, content=content, headers=headers
)
response.raise_for_status() # Raises for 4xx/5xx status codes
logger.info(
"Successfully uploaded attachment '%s' to note %s (Status: %s)",
filename,
note_id,
response.status_code,
)
response.raise_for_status() # Raises for 4xx/5xx status codes
logger.info("Successfully uploaded attachment '%s' to note %s (Status: %s)", filename, note_id, response.status_code)
# PUT typically returns 201 Created or 204 No Content on success
return {"status_code": response.status_code} # Return status or relevant info
return {
"status_code": response.status_code
} # Return status or relevant info
except HTTPStatusError as e:
logger.error(
@@ -522,7 +620,9 @@ class NextcloudClient:
)
raise e
def get_note_attachment(self, *, note_id: int, filename: str, category: str | None = None):
async def get_note_attachment(
self, *, note_id: int, filename: str, category: str | None = None
):
"""
Fetch a specific attachment from a note via WebDAV GET.
Requires the caller to provide the note's category.
@@ -533,16 +633,23 @@ class NextcloudClient:
attachment_dir_segment = f".attachments.{note_id}"
attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}"
logger.info(f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}")
logger.info(
f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}"
)
try:
response = self._client.get(attachment_path)
response = await self._client.get(attachment_path)
response.raise_for_status()
content = response.content
mime_type = response.headers.get("content-type", "application/octet-stream")
logger.info("Successfully fetched attachment '%s' (%s, %d bytes)", filename, mime_type, len(content))
logger.info(
"Successfully fetched attachment '%s' (%s, %d bytes)",
filename,
mime_type,
len(content),
)
return content, mime_type
except HTTPStatusError as e:
+1 -1
View File
@@ -17,7 +17,7 @@ LOGGING_CONFIG = {
"loggers": {
"": {
"handlers": ["default"],
"level": "DEBUG",
"level": "INFO",
},
"httpx": {
"handlers": ["default"],
+19 -21
View File
@@ -4,10 +4,8 @@ from nextcloud_mcp_server.config import setup_logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from mcp.server.fastmcp import FastMCP, Context
from mcp.server import Server
from collections.abc import AsyncIterator
from nextcloud_mcp_server.client import NextcloudClient
import asyncio # Import asyncio
setup_logging()
@@ -28,7 +26,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
yield AppContext(client=client)
finally:
# Cleanup on shutdown
client._client.close()
await client._client.aclose()
# Create an MCP server
@@ -38,38 +36,38 @@ logger = logging.getLogger(__name__)
@mcp.resource("nc://capabilities")
def nc_get_capabilities():
async def nc_get_capabilities():
"""Get the Nextcloud Host capabilities"""
# client = NextcloudClient.from_env()
ctx = (
mcp.get_context()
) # https://github.com/modelcontextprotocol/python-sdk/issues/244
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.capabilities()
return await client.capabilities()
@mcp.resource("notes://settings")
def notes_get_settings():
async def notes_get_settings():
"""Get the Notes App settings"""
ctx = (
mcp.get_context()
) # https://github.com/modelcontextprotocol/python-sdk/issues/244
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_get_settings()
return await client.notes_get_settings()
@mcp.tool()
def nc_get_note(note_id: int, ctx: Context):
async def nc_get_note(note_id: int, ctx: Context):
"""Get user note using note id"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_get_note(note_id=note_id)
return await client.notes_get_note(note_id=note_id)
@mcp.tool()
def nc_notes_create_note(title: str, content: str, category: str, ctx: Context):
async def nc_notes_create_note(title: str, content: str, category: str, ctx: Context):
"""Create a new note"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_create_note(
return await client.notes_create_note(
title=title,
content=content,
category=category,
@@ -77,7 +75,7 @@ def nc_notes_create_note(title: str, content: str, category: str, ctx: Context):
@mcp.tool()
def nc_notes_update_note(
async def nc_notes_update_note(
note_id: int,
etag: str,
title: str | None,
@@ -87,7 +85,7 @@ def nc_notes_update_note(
):
logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_update_note(
return await client.notes_update_note(
note_id=note_id,
etag=etag,
title=title,
@@ -97,35 +95,35 @@ def nc_notes_update_note(
@mcp.tool()
def nc_notes_append_content(note_id: int, content: str, ctx: Context):
async def nc_notes_append_content(note_id: int, content: str, ctx: Context):
"""Append content to an existing note with a clear separator"""
logger.info("Appending content to note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_append_content(note_id=note_id, content=content)
return await client.notes_append_content(note_id=note_id, content=content)
@mcp.tool()
def nc_notes_search_notes(query: str, ctx: Context):
async def nc_notes_search_notes(query: str, ctx: Context):
"""Search notes by title or content, returning only id, title, and category."""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_search_notes(query=query)
return await client.notes_search_notes(query=query)
@mcp.tool()
def nc_notes_delete_note(note_id: int, ctx: Context):
async def nc_notes_delete_note(note_id: int, ctx: Context):
logger.info("Deleting note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return client.notes_delete_note(note_id=note_id)
return await client.notes_delete_note(note_id=note_id)
@mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}")
def nc_notes_get_attachment(note_id: int, attachment_filename: str):
async def nc_notes_get_attachment(note_id: int, attachment_filename: str):
"""Get a specific attachment from a note"""
ctx = mcp.get_context()
client: NextcloudClient = ctx.request_context.lifespan_context.client
# Assuming a method get_note_attachment exists in the client
# This method should return the raw content and determine the mime type
content, mime_type = client.get_note_attachment(
content, mime_type = await client.get_note_attachment(
note_id=note_id, filename=attachment_filename
)
return {