chore: ruff format

This commit is contained in:
Chris Coutinho
2025-07-06 08:41:02 +02:00
parent 5b512f83bd
commit a57c12591a
5 changed files with 220 additions and 130 deletions
+6 -6
View File
@@ -9,29 +9,29 @@ logger = logging.getLogger(__name__)
class BaseNextcloudClient(ABC):
"""Base class for all Nextcloud app clients."""
def __init__(self, http_client: AsyncClient, username: str):
"""Initialize with shared HTTP client and username.
Args:
http_client: Authenticated AsyncClient instance
username: Nextcloud username for WebDAV operations
"""
self._client = http_client
self.username = username
def _get_webdav_base_path(self) -> str:
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
async def _make_request(self, method: str, url: str, **kwargs):
"""Common request wrapper with logging and error handling.
Args:
method: HTTP method
url: Request URL
**kwargs: Additional request parameters
Returns:
Response object
"""
+6 -5
View File
@@ -32,7 +32,7 @@ def log_response(response: Response):
class NextcloudClient:
"""Main Nextcloud client that orchestrates all app clients."""
def __init__(self, base_url: str, username: str, auth: Auth | None = None):
self.username = username
self._client = AsyncClient(
@@ -40,11 +40,11 @@ class NextcloudClient:
auth=auth,
# event_hooks={"request": [log_request], "response": [log_response]},
)
# Initialize app clients
self.notes = NotesClient(self._client, username)
self.webdav = WebDAVClient(self._client, username)
# Initialize controllers
self._notes_search = NotesSearchController()
@@ -88,7 +88,9 @@ class NextcloudClient:
category: str | None = None,
):
"""Create a new note."""
return await self.notes.create_note(title=title, content=content, category=category)
return await self.notes.create_note(
title=title, content=content, category=category
)
async def notes_update_note(
self,
@@ -113,7 +115,6 @@ class NextcloudClient:
all_notes = await self.notes.get_all_notes()
return self._notes_search.search_notes(all_notes, query)
async def notes_delete_note(self, *, note_id: int):
"""Delete a note and its attachments."""
return await self.notes.delete_note(note_id)
@@ -5,39 +5,43 @@ from typing import List, Dict, Any
class NotesSearchController:
"""Handles notes search logic and scoring."""
def search_notes(self, notes: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
def search_notes(
self, notes: List[Dict[str, Any]], query: str
) -> List[Dict[str, Any]]:
"""
Search notes using token-based matching with relevance ranking.
Returns notes sorted by relevance score.
"""
search_results = []
query_tokens = self._process_query(query)
# If empty query after processing, return empty results
if not query_tokens:
return []
# Process and score each note
for note in notes:
title_tokens, content_tokens = self._process_note_content(note)
score = self._calculate_score(query_tokens, title_tokens, content_tokens)
# Only include notes with a non-zero score
if score >= 0.5:
search_results.append({
"id": note.get("id"),
"title": note.get("title"),
"category": note.get("category"),
"modified": note.get("modified"),
"_score": score, # Include score for sorting
})
search_results.append(
{
"id": note.get("id"),
"title": note.get("title"),
"category": note.get("category"),
"modified": note.get("modified"),
"_score": score, # Include score for sorting
}
)
# Sort by score in descending order
search_results.sort(key=lambda x: x["_score"], reverse=True)
return search_results
def _process_query(self, query: str) -> List[str]:
"""
Tokenize and normalize the search query.
@@ -47,21 +51,23 @@ class NotesSearchController:
# Filter out very short tokens
tokens = [token for token in tokens if len(token) > 1]
return tokens
def _process_note_content(self, note: Dict[str, Any]) -> tuple[List[str], List[str]]:
def _process_note_content(
self, note: Dict[str, Any]
) -> tuple[List[str], List[str]]:
"""
Tokenize and normalize note title and content.
"""
# Process title
title = note.get("title", "").lower()
title_tokens = title.split()
# Process content
content = note.get("content", "").lower()
content_tokens = content.split()
return title_tokens, content_tokens
def _calculate_score(
self,
query_tokens: List[str],
@@ -74,23 +80,23 @@ class NotesSearchController:
# Constants for weighting
TITLE_WEIGHT = 3.0
CONTENT_WEIGHT = 1.0
score = 0.0
# Count matches in title
title_matches = sum(1 for qt in query_tokens if qt in title_tokens)
if query_tokens: # Avoid division by zero
title_match_ratio = title_matches / len(query_tokens)
score += TITLE_WEIGHT * title_match_ratio
# Count matches in content
content_matches = sum(1 for qt in query_tokens if qt in content_tokens)
if query_tokens: # Avoid division by zero
content_match_ratio = content_matches / len(query_tokens)
score += CONTENT_WEIGHT * content_match_ratio
# If no tokens matched at all, return zero
if title_matches == 0 and content_matches == 0:
return 0.0
return score
+70 -41
View File
@@ -10,22 +10,24 @@ logger = logging.getLogger(__name__)
class NotesClient(BaseNextcloudClient):
"""Client for Nextcloud Notes app operations."""
async def get_settings(self) -> Dict[str, Any]:
"""Get Notes app settings."""
response = await self._make_request("GET", "/apps/notes/api/v1/settings")
return response.json()
async def get_all_notes(self) -> List[Dict[str, Any]]:
"""Get all notes."""
response = await self._make_request("GET", "/apps/notes/api/v1/notes")
return response.json()
async def get_note(self, note_id: int) -> Dict[str, Any]:
"""Get a specific note by ID."""
response = await self._make_request("GET", f"/apps/notes/api/v1/notes/{note_id}")
response = await self._make_request(
"GET", f"/apps/notes/api/v1/notes/{note_id}"
)
return response.json()
async def create_note(
self,
title: Optional[str] = None,
@@ -40,10 +42,12 @@ class NotesClient(BaseNextcloudClient):
body["content"] = content
if category:
body["category"] = category
response = await self._make_request("POST", "/apps/notes/api/v1/notes", json=body)
response = await self._make_request(
"POST", "/apps/notes/api/v1/notes", json=body
)
return response.json()
async def update(
self,
note_id: int,
@@ -61,9 +65,11 @@ class NotesClient(BaseNextcloudClient):
old_category = old_note.get("category", "")
logger.info(f"Current category for note {note_id}: '{old_category}'")
except Exception as e:
logger.warning(f"Could not fetch current note {note_id} details before update: {e}")
logger.warning(
f"Could not fetch current note {note_id} details before update: {e}"
)
old_note = None
# Prepare update body
body = {}
if title:
@@ -72,94 +78,117 @@ class NotesClient(BaseNextcloudClient):
body["content"] = content
if category:
body["category"] = category
logger.info(f"Attempting to update note {note_id} with etag {etag}. Body: {body}")
logger.info(
f"Attempting to update note {note_id} with etag {etag}. Body: {body}"
)
response = await self._make_request(
"PUT",
f"/apps/notes/api/v1/notes/{note_id}",
json=body,
headers={"If-Match": f'"{etag}"'}
headers={"If-Match": f'"{etag}"'},
)
logger.info(
f"Update response for note {note_id}: Status {response.status_code}"
)
logger.info(f"Update response for note {note_id}: Status {response.status_code}")
updated_note = response.json()
# Check for category change and cleanup old attachment directory if needed
if old_note and category is not None and old_note.get("category", "") != category:
logger.info(f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory")
if (
old_note
and category is not None
and old_note.get("category", "") != category
):
logger.info(
f"Category changed from '{old_note.get('category', '')}' to '{category}' - cleaning up old attachment directory"
)
try:
# Import here to avoid circular imports
from .webdav_client import WebDAVClient
webdav_client = WebDAVClient(self._client, self.username)
await webdav_client.cleanup_old_attachment_directory(
note_id=note_id,
old_category=old_note.get("category", "")
note_id=note_id, old_category=old_note.get("category", "")
)
except Exception as e:
logger.error(f"Error cleaning up old attachment directory for note {note_id}: {e}")
logger.error(
f"Error cleaning up old attachment directory for note {note_id}: {e}"
)
return updated_note
async def delete_note(self, note_id: int) -> Dict[str, Any]:
"""Delete a note and its attachments."""
# Fetch note details first to get category for cleanup
try:
note_details = await self.get_note(note_id)
category = note_details.get("category", "")
# Determine potential categories for cleanup
potential_categories = []
if category:
potential_categories.append(category)
if category != "":
potential_categories.append("") # Empty category
logger.info(f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}")
logger.info(
f"Note {note_id} has category: '{category}', will check attachment directories in: {potential_categories}"
)
except Exception as e:
logger.warning(f"Could not fetch note {note_id} details before deletion: {e}")
logger.warning(
f"Could not fetch note {note_id} details before deletion: {e}"
)
potential_categories = ["", "Unknown"] # Try common categories
# Delete the note via API
logger.info(f"Deleting note {note_id} via API")
response = await self._make_request("DELETE", f"/apps/notes/api/v1/notes/{note_id}")
response = await self._make_request(
"DELETE", f"/apps/notes/api/v1/notes/{note_id}"
)
logger.info(f"Note {note_id} deleted successfully via API")
json_response = response.json()
# Clean up attachment directories
try:
from .webdav_client import WebDAVClient
webdav_client = WebDAVClient(self._client, self.username)
for cat in potential_categories:
try:
await webdav_client.cleanup_note_attachments(note_id, cat)
except Exception as e:
logger.warning(f"Failed to cleanup attachments for category '{cat}': {e}")
logger.warning(
f"Failed to cleanup attachments for category '{cat}': {e}"
)
except Exception as e:
logger.warning(f"Error during attachment cleanup: {e}")
return json_response
async def append_content(self, note_id: int, content: str) -> Dict[str, Any]:
"""Append content to an existing note with a separator."""
logger.info(f"Appending content to note {note_id}")
# Get current note
current_note = await self.get_note(note_id)
# Use fixed separator for consistency
separator = "\n---\n"
# Combine content
existing_content = current_note.get("content", "")
if existing_content:
new_content = existing_content + separator + content
else:
new_content = content # No separator needed for empty notes
logger.info(f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)")
logger.info(
f"Combining existing content ({len(existing_content)} chars) with new content ({len(content)} chars)"
)
# Update with combined content
return await self.update(
note_id=note_id,
+107 -53
View File
@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
class WebDAVClient(BaseNextcloudClient):
"""Client for Nextcloud WebDAV operations."""
async def delete_resource(self, path: str) -> Dict[str, Any]:
"""Delete a resource (file or directory) via WebDAV DELETE."""
# Ensure path ends with a slash if it's a directory
@@ -20,10 +20,10 @@ class WebDAVClient(BaseNextcloudClient):
path_with_slash = f"{path}/"
else:
path_with_slash = path
webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}"
logger.info(f"Deleting WebDAV resource: {webdav_path}")
headers = {"OCS-APIRequest": "true"}
try:
# First try a PROPFIND to verify resource exists
@@ -32,19 +32,25 @@ class WebDAVClient(BaseNextcloudClient):
propfind_resp = await self._client.request(
"PROPFIND", webdav_path, headers=propfind_headers
)
logger.info(f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}")
logger.info(
f"Resource exists check (PROPFIND) status: {propfind_resp.status_code}"
)
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.info(f"Resource '{webdav_path}' doesn't exist, no deletion needed.")
logger.info(
f"Resource '{webdav_path}' doesn't exist, no deletion needed."
)
return {"status_code": 404}
# For other errors, continue with deletion attempt
# Proceed with deletion
response = await self._client.delete(webdav_path, headers=headers)
response.raise_for_status()
logger.info(f"Successfully deleted WebDAV resource '{webdav_path}' (Status: {response.status_code})")
logger.info(
f"Successfully deleted WebDAV resource '{webdav_path}' (Status: {response.status_code})"
)
return {"status_code": response.status_code}
except HTTPStatusError as e:
logger.warning(f"HTTP error deleting WebDAV resource '{webdav_path}': {e}")
if e.response.status_code != 404:
@@ -53,14 +59,20 @@ class WebDAVClient(BaseNextcloudClient):
logger.info(f"Resource '{webdav_path}' not found, no deletion needed.")
return {"status_code": 404}
except Exception as e:
logger.warning(f"Unexpected error deleting WebDAV resource '{webdav_path}': {e}")
logger.warning(
f"Unexpected error deleting WebDAV resource '{webdav_path}': {e}"
)
raise e
async def cleanup_old_attachment_directory(self, note_id: int, old_category: str) -> Dict[str, Any]:
async def cleanup_old_attachment_directory(
self, note_id: int, old_category: str
) -> Dict[str, Any]:
"""Clean up the attachment directory for a note in its old category location."""
old_category_path_part = f"{old_category}/" if old_category else ""
old_attachment_dir_path = f"Notes/{old_category_path_part}.attachments.{note_id}/"
old_attachment_dir_path = (
f"Notes/{old_category_path_part}.attachments.{note_id}/"
)
logger.info(f"Cleaning up old attachment directory: {old_attachment_dir_path}")
try:
delete_result = await self.delete_resource(path=old_attachment_dir_path)
@@ -69,21 +81,29 @@ class WebDAVClient(BaseNextcloudClient):
except Exception as e:
logger.error(f"Error during cleanup of old attachment directory: {e}")
raise e
async def cleanup_note_attachments(self, note_id: int, category: str) -> Dict[str, Any]:
async def cleanup_note_attachments(
self, note_id: int, category: str
) -> Dict[str, Any]:
"""Clean up attachment directory for a specific note and category."""
cat_path_part = f"{category}/" if category else ""
attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/"
logger.info(f"Attempting to delete attachment directory for note {note_id} in category '{category}' via WebDAV: {attachment_dir_path}")
logger.info(
f"Attempting to delete attachment directory for note {note_id} in category '{category}' via WebDAV: {attachment_dir_path}"
)
try:
delete_result = await self.delete_resource(path=attachment_dir_path)
logger.info(f"WebDAV deletion for category '{category}' attachment directory: {delete_result}")
logger.info(
f"WebDAV deletion for category '{category}' attachment directory: {delete_result}"
)
return delete_result
except Exception as e:
logger.warning(f"Failed during WebDAV deletion for category '{category}' attachment directory: {e}")
logger.warning(
f"Failed during WebDAV deletion for category '{category}' attachment directory: {e}"
)
raise e
async def add_note_attachment(
self,
note_id: int,
@@ -97,70 +117,96 @@ class WebDAVClient(BaseNextcloudClient):
webdav_base = self._get_webdav_base_path()
category_path_part = f"{category}/" if category else ""
attachment_dir_segment = f".attachments.{note_id}"
parent_dir_webdav_rel_path = f"Notes/{category_path_part}{attachment_dir_segment}"
parent_dir_webdav_rel_path = (
f"Notes/{category_path_part}{attachment_dir_segment}"
)
parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}"
attachment_path = f"{parent_dir_path}/{filename}"
logger.info(f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}")
logger.info(
f"Uploading attachment for note {note_id} (category: '{category or ''}') to WebDAV path: {attachment_path}"
)
# Log current auth settings
logger.info(f"WebDAV auth settings - Username: {self.username}, Auth Type: {type(self._client.auth).__name__}")
logger.info(
f"WebDAV auth settings - Username: {self.username}, Auth Type: {type(self._client.auth).__name__}"
)
if not mime_type:
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = "application/octet-stream"
headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"}
try:
# First check if we can access WebDAV at all
notes_dir_path = f"{webdav_base}/Notes"
logger.info(f"Testing WebDAV access to Notes directory: {notes_dir_path}")
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
notes_dir_response = await self._client.request(
"PROPFIND", notes_dir_path, headers=propfind_headers
)
if notes_dir_response.status_code == 401:
logger.error("WebDAV authentication failed for Notes directory. Please verify WebDAV permissions.")
logger.error(
"WebDAV authentication failed for Notes directory. Please verify WebDAV permissions."
)
raise HTTPStatusError(
f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}",
request=notes_dir_response.request,
response=notes_dir_response,
)
elif notes_dir_response.status_code >= 400:
logger.error(f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}")
logger.error(
f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}"
)
notes_dir_response.raise_for_status()
else:
logger.info(f"Successfully accessed WebDAV Notes directory (Status: {notes_dir_response.status_code})")
logger.info(
f"Successfully accessed WebDAV Notes directory (Status: {notes_dir_response.status_code})"
)
# Ensure the parent directory exists using MKCOL
logger.info(f"Ensuring attachments directory exists: {parent_dir_path}")
mkcol_headers = {"OCS-APIRequest": "true"}
mkcol_response = await self._client.request("MKCOL", parent_dir_path, headers=mkcol_headers)
mkcol_response = await self._client.request(
"MKCOL", parent_dir_path, headers=mkcol_headers
)
# MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists)
if mkcol_response.status_code not in [201, 405]:
logger.warning(f"Unexpected status code {mkcol_response.status_code} when creating attachments directory")
logger.warning(
f"Unexpected status code {mkcol_response.status_code} when creating attachments directory"
)
mkcol_response.raise_for_status()
else:
logger.info(f"Created/verified directory: {parent_dir_path} (Status: {mkcol_response.status_code})")
logger.info(
f"Created/verified directory: {parent_dir_path} (Status: {mkcol_response.status_code})"
)
# Proceed with the PUT request
logger.info(f"Putting attachment file to: {attachment_path}")
response = await self._client.put(attachment_path, content=content, headers=headers)
response = await self._client.put(
attachment_path, content=content, headers=headers
)
response.raise_for_status()
logger.info(f"Successfully uploaded attachment '{filename}' to note {note_id} (Status: {response.status_code})")
logger.info(
f"Successfully uploaded attachment '{filename}' to note {note_id} (Status: {response.status_code})"
)
return {"status_code": response.status_code}
except HTTPStatusError as e:
logger.error(f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}")
logger.error(
f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}"
)
raise e
except Exception as e:
logger.error(f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}")
logger.error(
f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}"
)
raise e
async def get_note_attachment(
self, note_id: int, filename: str, category: Optional[str] = None
) -> Tuple[bytes, str]:
@@ -169,22 +215,30 @@ class WebDAVClient(BaseNextcloudClient):
category_path_part = f"{category}/" if category else ""
attachment_dir_segment = f".attachments.{note_id}"
attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}"
logger.info(f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}")
logger.info(
f"Fetching attachment for note {note_id} (category: '{category or ''}') from WebDAV path: {attachment_path}"
)
try:
response = await self._client.get(attachment_path)
response.raise_for_status()
content = response.content
mime_type = response.headers.get("content-type", "application/octet-stream")
logger.info(f"Successfully fetched attachment '{filename}' ({mime_type}, {len(content)} bytes)")
logger.info(
f"Successfully fetched attachment '{filename}' ({mime_type}, {len(content)} bytes)"
)
return content, mime_type
except HTTPStatusError as e:
logger.error(f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}")
logger.error(
f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}"
)
raise e
except Exception as e:
logger.error(f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}")
logger.error(
f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}"
)
raise e