a11ae9c027
Enable ruff PLC0415 rule for all source files (tests excluded via per-file-ignores). Move 136 inline imports to top-level across 33 files. 8 imports suppressed with noqa for legitimate reasons: circular dependencies (client/__init__.py, context.py), optional dependency guards (app.py document processors, auth/userinfo_routes.py), and post-env-setup imports (smithery_main.py). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1536 lines
55 KiB
Python
1536 lines
55 KiB
Python
"""WebDAV client for Nextcloud file operations."""
|
|
|
|
import logging
|
|
import mimetypes
|
|
import xml.etree.ElementTree as ET
|
|
from email.utils import parsedate_to_datetime
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from urllib.parse import unquote
|
|
|
|
from httpx import HTTPStatusError
|
|
|
|
from .base import BaseNextcloudClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WebDAVClient(BaseNextcloudClient):
|
|
"""Client for Nextcloud WebDAV operations."""
|
|
|
|
app_name = "webdav"
|
|
|
|
async def delete_resource(self, path: str) -> Dict[str, Any]:
|
|
"""Delete a resource (file or directory) via WebDAV DELETE."""
|
|
# Ensure path ends with a slash if it's a directory
|
|
if not path.endswith("/"):
|
|
path_with_slash = f"{path}/"
|
|
else:
|
|
path_with_slash = path
|
|
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}"
|
|
logger.debug(f"Deleting WebDAV resource: {webdav_path}")
|
|
|
|
headers = {"OCS-APIRequest": "true"}
|
|
try:
|
|
# First try a PROPFIND to verify resource exists
|
|
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
|
|
try:
|
|
propfind_resp = await self._make_request(
|
|
"PROPFIND", webdav_path, headers=propfind_headers
|
|
)
|
|
logger.debug(
|
|
f"Resource exists check status: {propfind_resp.status_code}"
|
|
)
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"Resource '{path}' doesn't exist, no deletion needed")
|
|
return {"status_code": 404}
|
|
# For other errors, continue with deletion attempt
|
|
|
|
# Proceed with deletion
|
|
response = await self._make_request("DELETE", webdav_path, headers=headers)
|
|
logger.debug(f"Successfully deleted WebDAV resource '{path}'")
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"Resource '{path}' not found, no deletion needed")
|
|
return {"status_code": 404}
|
|
else:
|
|
logger.error(f"HTTP error deleting WebDAV resource '{path}': {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error deleting WebDAV resource '{path}': {e}")
|
|
raise e
|
|
|
|
async def cleanup_old_attachment_directory(
|
|
self, note_id: int, old_category: str
|
|
) -> Dict[str, Any]:
|
|
"""Clean up the attachment directory for a note in its old category location."""
|
|
old_category_path_part = f"{old_category}/" if old_category else ""
|
|
old_attachment_dir_path = (
|
|
f"Notes/{old_category_path_part}.attachments.{note_id}/"
|
|
)
|
|
|
|
logger.debug(f"Cleaning up old attachment directory: {old_attachment_dir_path}")
|
|
try:
|
|
delete_result = await self.delete_resource(path=old_attachment_dir_path)
|
|
logger.debug(f"Cleanup result: {delete_result}")
|
|
return delete_result
|
|
except Exception as e:
|
|
logger.error(f"Error during cleanup of old attachment directory: {e}")
|
|
raise e
|
|
|
|
async def cleanup_note_attachments(
|
|
self, note_id: int, category: str
|
|
) -> Dict[str, Any]:
|
|
"""Clean up attachment directory for a specific note and category."""
|
|
cat_path_part = f"{category}/" if category else ""
|
|
attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/"
|
|
|
|
logger.debug(
|
|
f"Cleaning up attachments for note {note_id} in category '{category}'"
|
|
)
|
|
try:
|
|
delete_result = await self.delete_resource(path=attachment_dir_path)
|
|
logger.debug(f"Cleanup result for note {note_id}: {delete_result}")
|
|
return delete_result
|
|
except Exception as e:
|
|
logger.error(f"Failed cleaning up attachments for note {note_id}: {e}")
|
|
raise e
|
|
|
|
async def add_note_attachment(
|
|
self,
|
|
note_id: int,
|
|
filename: str,
|
|
content: bytes,
|
|
category: Optional[str] = None,
|
|
mime_type: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Add/Update an attachment to a note via WebDAV PUT."""
|
|
# Construct paths based on provided category
|
|
webdav_base = self._get_webdav_base_path()
|
|
category_path_part = f"{category}/" if category else ""
|
|
attachment_dir_segment = f".attachments.{note_id}"
|
|
parent_dir_webdav_rel_path = (
|
|
f"Notes/{category_path_part}{attachment_dir_segment}"
|
|
)
|
|
parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}"
|
|
attachment_path = f"{parent_dir_path}/{filename}"
|
|
|
|
logger.debug(f"Uploading attachment '{filename}' for note {note_id}")
|
|
|
|
if not mime_type:
|
|
mime_type, _ = mimetypes.guess_type(filename)
|
|
if not mime_type:
|
|
mime_type = "application/octet-stream"
|
|
|
|
headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"}
|
|
try:
|
|
# First check if we can access WebDAV at all
|
|
notes_dir_path = f"{webdav_base}/Notes"
|
|
propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"}
|
|
notes_dir_response = await self._make_request(
|
|
"PROPFIND", notes_dir_path, headers=propfind_headers
|
|
)
|
|
|
|
if notes_dir_response.status_code == 401:
|
|
logger.error("WebDAV authentication failed for Notes directory")
|
|
raise HTTPStatusError(
|
|
f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}",
|
|
request=notes_dir_response.request,
|
|
response=notes_dir_response,
|
|
)
|
|
elif notes_dir_response.status_code >= 400:
|
|
logger.error(
|
|
f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}"
|
|
)
|
|
notes_dir_response.raise_for_status()
|
|
|
|
# Ensure the parent directory exists using MKCOL
|
|
mkcol_headers = {"OCS-APIRequest": "true"}
|
|
mkcol_response = await self._make_request(
|
|
"MKCOL", parent_dir_path, headers=mkcol_headers
|
|
)
|
|
|
|
# MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists)
|
|
if mkcol_response.status_code not in [201, 405]:
|
|
logger.error(
|
|
f"Unexpected status code {mkcol_response.status_code} when creating attachments directory"
|
|
)
|
|
mkcol_response.raise_for_status()
|
|
|
|
# Proceed with the PUT request
|
|
response = await self._make_request(
|
|
"PUT", attachment_path, content=content, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
logger.debug(
|
|
f"Successfully uploaded attachment '{filename}' to note {note_id}"
|
|
)
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
logger.error(
|
|
f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}"
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}"
|
|
)
|
|
raise e
|
|
|
|
async def get_note_attachment(
|
|
self, note_id: int, filename: str, category: Optional[str] = None
|
|
) -> Tuple[bytes, str]:
|
|
"""Fetch a specific attachment from a note via WebDAV GET."""
|
|
webdav_base = self._get_webdav_base_path()
|
|
category_path_part = f"{category}/" if category else ""
|
|
attachment_dir_segment = f".attachments.{note_id}"
|
|
attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}"
|
|
|
|
logger.debug(f"Fetching attachment '{filename}' for note {note_id}")
|
|
|
|
try:
|
|
response = await self._make_request("GET", attachment_path)
|
|
response.raise_for_status()
|
|
|
|
content = response.content
|
|
mime_type = response.headers.get("content-type", "application/octet-stream")
|
|
|
|
logger.debug(
|
|
f"Successfully fetched attachment '{filename}' ({len(content)} bytes)"
|
|
)
|
|
return content, mime_type
|
|
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"Attachment '{filename}' not found for note {note_id}")
|
|
else:
|
|
logger.error(
|
|
f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}"
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}"
|
|
)
|
|
raise e
|
|
|
|
async def list_directory(self, path: str = "") -> List[Dict[str, Any]]:
|
|
"""List files and directories in the specified path via WebDAV PROPFIND."""
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
|
|
if not webdav_path.endswith("/"):
|
|
webdav_path += "/"
|
|
|
|
logger.debug(f"Listing directory: {path}")
|
|
|
|
propfind_body = """<?xml version="1.0"?>
|
|
<d:propfind xmlns:d="DAV:">
|
|
<d:prop>
|
|
<d:displayname/>
|
|
<d:getcontentlength/>
|
|
<d:getcontenttype/>
|
|
<d:getlastmodified/>
|
|
<d:resourcetype/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
|
|
|
|
try:
|
|
response = await self._make_request(
|
|
"PROPFIND", webdav_path, content=propfind_body, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse the XML response
|
|
root = ET.fromstring(response.content)
|
|
items = []
|
|
|
|
# Skip the first response (the directory itself)
|
|
responses = root.findall(".//{DAV:}response")[1:]
|
|
|
|
for response_elem in responses:
|
|
href = response_elem.find(".//{DAV:}href")
|
|
if href is None:
|
|
continue
|
|
|
|
# Extract file/directory name from href
|
|
href_text = href.text or ""
|
|
name = href_text.rstrip("/").split("/")[-1]
|
|
if not name:
|
|
continue
|
|
|
|
# Get properties
|
|
propstat = response_elem.find(".//{DAV:}propstat")
|
|
if propstat is None:
|
|
continue
|
|
|
|
prop = propstat.find(".//{DAV:}prop")
|
|
if prop is None:
|
|
continue
|
|
|
|
# Determine if it's a directory
|
|
resourcetype = prop.find(".//{DAV:}resourcetype")
|
|
is_directory = (
|
|
resourcetype is not None
|
|
and resourcetype.find(".//{DAV:}collection") is not None
|
|
)
|
|
|
|
# Get other properties
|
|
size_elem = prop.find(".//{DAV:}getcontentlength")
|
|
size = (
|
|
int(size_elem.text)
|
|
if size_elem is not None and size_elem.text
|
|
else 0
|
|
)
|
|
|
|
content_type_elem = prop.find(".//{DAV:}getcontenttype")
|
|
content_type = (
|
|
content_type_elem.text if content_type_elem is not None else None
|
|
)
|
|
|
|
modified_elem = prop.find(".//{DAV:}getlastmodified")
|
|
modified = modified_elem.text if modified_elem is not None else None
|
|
|
|
items.append(
|
|
{
|
|
"name": name,
|
|
"path": f"{path.rstrip('/')}/{name}" if path else name,
|
|
"is_directory": is_directory,
|
|
"size": size if not is_directory else None,
|
|
"content_type": content_type,
|
|
"last_modified": modified,
|
|
}
|
|
)
|
|
|
|
logger.debug(f"Found {len(items)} items in directory: {path}")
|
|
return items
|
|
|
|
except HTTPStatusError as e:
|
|
logger.error(f"HTTP error listing directory '{webdav_path}': {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error listing directory '{webdav_path}': {e}")
|
|
raise e
|
|
|
|
async def read_file(self, path: str) -> Tuple[bytes, str]:
|
|
"""Read a file's content via WebDAV GET."""
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
|
|
|
|
logger.debug(f"Reading file: {path}")
|
|
|
|
try:
|
|
response = await self._make_request("GET", webdav_path)
|
|
response.raise_for_status()
|
|
|
|
content = response.content
|
|
content_type = response.headers.get(
|
|
"content-type", "application/octet-stream"
|
|
)
|
|
|
|
logger.debug(f"Successfully read file '{path}' ({len(content)} bytes)")
|
|
return content, content_type
|
|
|
|
except HTTPStatusError as e:
|
|
logger.error(f"HTTP error reading file '{path}': {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error reading file '{path}': {e}")
|
|
raise e
|
|
|
|
async def write_file(
|
|
self, path: str, content: bytes, content_type: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Write content to a file via WebDAV PUT."""
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
|
|
|
|
logger.debug(f"Writing file: {path}")
|
|
|
|
if not content_type:
|
|
content_type, _ = mimetypes.guess_type(path)
|
|
if not content_type:
|
|
content_type = "application/octet-stream"
|
|
|
|
headers = {"Content-Type": content_type, "OCS-APIRequest": "true"}
|
|
|
|
try:
|
|
response = await self._make_request(
|
|
"PUT", webdav_path, content=content, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
logger.debug(f"Successfully wrote file '{path}'")
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
logger.error(f"HTTP error writing file '{path}': {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error writing file '{path}': {e}")
|
|
raise e
|
|
|
|
async def create_directory(
|
|
self, path: str, recursive: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Create a directory via WebDAV MKCOL."""
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
|
|
if not webdav_path.endswith("/"):
|
|
webdav_path += "/"
|
|
|
|
logger.debug(f"Creating directory: {path}")
|
|
|
|
headers = {"OCS-APIRequest": "true"}
|
|
|
|
try:
|
|
response = await self._make_request("MKCOL", webdav_path, headers=headers)
|
|
response.raise_for_status()
|
|
|
|
logger.debug(f"Successfully created directory '{path}'")
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
# Method Not Allowed - directory already exists
|
|
if e.response.status_code == 405:
|
|
logger.debug(f"Directory '{path}' already exists")
|
|
return {"status_code": 405, "message": "Directory already exists"}
|
|
|
|
# File Conflict - parent directory does not exist
|
|
if e.response.status_code == 409 and recursive:
|
|
# Extract parent directory path
|
|
path_parts = path.strip("/").split("/")
|
|
if len(path_parts) > 1:
|
|
parent_dir = "/".join(path_parts[:-1])
|
|
logger.debug(
|
|
f"Parent directory '{parent_dir}' doesn't exist, creating recursively"
|
|
)
|
|
await self.create_directory(parent_dir, recursive)
|
|
# Now try to create the original directory again
|
|
return await self.create_directory(path, recursive)
|
|
else:
|
|
# This shouldn't happen for single-level directories under root
|
|
logger.error(f"409 conflict for single-level directory '{path}'")
|
|
raise e
|
|
|
|
logger.error(f"HTTP error creating directory '{path}': {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error creating directory '{path}': {e}")
|
|
raise e
|
|
|
|
async def move_resource(
|
|
self, source_path: str, destination_path: str, overwrite: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Move or rename a resource (file or directory) via WebDAV MOVE.
|
|
|
|
Args:
|
|
source_path: The path of the file or directory to move
|
|
destination_path: The new path for the file or directory
|
|
overwrite: Whether to overwrite the destination if it exists
|
|
|
|
Returns:
|
|
Dict with status_code and optional message
|
|
"""
|
|
source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}"
|
|
destination_webdav_path = (
|
|
f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}"
|
|
)
|
|
|
|
# Ensure paths have consistent trailing slashes for directories
|
|
if source_path.endswith("/") and not destination_path.endswith("/"):
|
|
destination_webdav_path += "/"
|
|
elif not source_path.endswith("/") and destination_path.endswith("/"):
|
|
source_webdav_path += "/"
|
|
|
|
logger.debug(f"Moving resource from '{source_path}' to '{destination_path}'")
|
|
|
|
headers = {
|
|
"OCS-APIRequest": "true",
|
|
"Destination": destination_webdav_path,
|
|
"Overwrite": "T" if overwrite else "F",
|
|
}
|
|
|
|
try:
|
|
response = await self._make_request(
|
|
"MOVE", source_webdav_path, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
logger.debug(
|
|
f"Successfully moved resource from '{source_path}' to '{destination_path}'"
|
|
)
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"Source resource '{source_path}' not found")
|
|
return {"status_code": 404, "message": "Source resource not found"}
|
|
elif e.response.status_code == 412:
|
|
logger.debug(
|
|
f"Destination '{destination_path}' already exists and overwrite is false"
|
|
)
|
|
return {
|
|
"status_code": 412,
|
|
"message": "Destination already exists and overwrite is false",
|
|
}
|
|
elif e.response.status_code == 409:
|
|
logger.debug(
|
|
f"Parent directory of destination '{destination_path}' doesn't exist"
|
|
)
|
|
return {
|
|
"status_code": 409,
|
|
"message": "Parent directory of destination doesn't exist",
|
|
}
|
|
logger.debug(
|
|
f"Parent directory of destination '{destination_path}' doesn't exist"
|
|
)
|
|
return {
|
|
"status_code": 409,
|
|
"message": "Parent directory of destination doesn't exist",
|
|
}
|
|
else:
|
|
logger.error(
|
|
f"HTTP error moving resource from '{source_path}' to '{destination_path}': {e}"
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Unexpected error moving resource from '{source_path}' to '{destination_path}': {e}"
|
|
)
|
|
raise e
|
|
|
|
async def copy_resource(
|
|
self, source_path: str, destination_path: str, overwrite: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Copy a resource (file or directory) via WebDAV COPY.
|
|
|
|
Args:
|
|
source_path: The path of the file or directory to copy
|
|
destination_path: The destination path for the copy
|
|
overwrite: Whether to overwrite the destination if it exists
|
|
|
|
Returns:
|
|
Dict with status_code and optional message
|
|
"""
|
|
source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}"
|
|
destination_webdav_path = (
|
|
f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}"
|
|
)
|
|
|
|
# Ensure paths have consistent trailing slashes for directories
|
|
if source_path.endswith("/") and not destination_path.endswith("/"):
|
|
destination_webdav_path += "/"
|
|
elif not source_path.endswith("/") and destination_path.endswith("/"):
|
|
source_webdav_path += "/"
|
|
|
|
logger.debug(f"Copying resource from '{source_path}' to '{destination_path}'")
|
|
|
|
headers = {
|
|
"OCS-APIRequest": "true",
|
|
"Destination": destination_webdav_path,
|
|
"Overwrite": "T" if overwrite else "F",
|
|
}
|
|
|
|
try:
|
|
response = await self._make_request(
|
|
"COPY", source_webdav_path, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
logger.debug(
|
|
f"Successfully copied resource from '{source_path}' to '{destination_path}'"
|
|
)
|
|
return {"status_code": response.status_code}
|
|
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"Source resource '{source_path}' not found")
|
|
return {"status_code": 404, "message": "Source resource not found"}
|
|
elif e.response.status_code == 412:
|
|
logger.debug(
|
|
f"Destination '{destination_path}' already exists and overwrite is false"
|
|
)
|
|
return {
|
|
"status_code": 412,
|
|
"message": "Destination already exists and overwrite is false",
|
|
}
|
|
elif e.response.status_code == 409:
|
|
logger.debug(
|
|
f"Parent directory of destination '{destination_path}' doesn't exist"
|
|
)
|
|
return {
|
|
"status_code": 409,
|
|
"message": "Parent directory of destination doesn't exist",
|
|
}
|
|
else:
|
|
logger.error(
|
|
f"HTTP error copying resource from '{source_path}' to '{destination_path}': {e}"
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}"
|
|
)
|
|
raise e
|
|
|
|
async def search_files(
|
|
self,
|
|
scope: str = "",
|
|
where_conditions: Optional[str] = None,
|
|
properties: Optional[List[str]] = None,
|
|
order_by: Optional[List[Tuple[str, str]]] = None,
|
|
limit: Optional[int] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Search for files using WebDAV SEARCH method (RFC 5323).
|
|
|
|
Args:
|
|
scope: Directory path to search in (empty string for user root)
|
|
where_conditions: XML string for where clause conditions
|
|
properties: List of property names to retrieve (defaults to basic set)
|
|
order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")]
|
|
limit: Maximum number of results to return
|
|
|
|
Returns:
|
|
List of file/directory dictionaries with requested properties
|
|
"""
|
|
# Default properties if not specified
|
|
if properties is None:
|
|
properties = [
|
|
"displayname",
|
|
"getcontentlength",
|
|
"getcontenttype",
|
|
"getlastmodified",
|
|
"resourcetype",
|
|
"getetag",
|
|
]
|
|
|
|
# Build the SEARCH request XML
|
|
search_body = self._build_search_xml(
|
|
scope=scope,
|
|
where_conditions=where_conditions,
|
|
properties=properties,
|
|
order_by=order_by,
|
|
limit=limit,
|
|
)
|
|
|
|
# The SEARCH endpoint is at the dav root
|
|
search_path = "/remote.php/dav/"
|
|
|
|
headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"}
|
|
|
|
logger.debug(f"Searching files in scope: {scope}")
|
|
|
|
try:
|
|
response = await self._make_request(
|
|
"SEARCH", search_path, content=search_body, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse the XML response
|
|
results = self._parse_search_response(response.content, scope)
|
|
|
|
logger.debug(f"Search returned {len(results)} results")
|
|
return results
|
|
|
|
except HTTPStatusError as e:
|
|
logger.error(f"HTTP error during search: {e}")
|
|
raise e
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error during search: {e}")
|
|
raise e
|
|
|
|
def _build_search_xml(
|
|
self,
|
|
scope: str,
|
|
where_conditions: Optional[str],
|
|
properties: List[str],
|
|
order_by: Optional[List[Tuple[str, str]]],
|
|
limit: Optional[int],
|
|
) -> str:
|
|
"""Build the XML body for a SEARCH request."""
|
|
# Construct the scope path
|
|
username = self.username
|
|
scope_path = f"/files/{username}"
|
|
if scope:
|
|
scope_path = f"{scope_path}/{scope.lstrip('/')}"
|
|
|
|
# Build property list
|
|
prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties])
|
|
|
|
# Build where clause
|
|
where_xml = where_conditions if where_conditions else ""
|
|
|
|
# Build order by clause
|
|
orderby_xml = ""
|
|
if order_by:
|
|
order_elements = []
|
|
for prop, direction in order_by:
|
|
prop_element = self._property_to_xml(prop)
|
|
dir_element = (
|
|
"<d:ascending/>"
|
|
if direction.lower() == "ascending"
|
|
else "<d:descending/>"
|
|
)
|
|
order_elements.append(f"<d:order>{prop_element}{dir_element}</d:order>")
|
|
orderby_xml = "\n".join(order_elements)
|
|
else:
|
|
orderby_xml = ""
|
|
|
|
# Build limit clause
|
|
limit_xml = (
|
|
f"<d:limit><d:nresults>{limit}</d:nresults></d:limit>" if limit else ""
|
|
)
|
|
|
|
# Construct the full SEARCH XML
|
|
search_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<d:searchrequest xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
|
|
<d:basicsearch>
|
|
<d:select>
|
|
<d:prop>
|
|
{prop_xml}
|
|
</d:prop>
|
|
</d:select>
|
|
<d:from>
|
|
<d:scope>
|
|
<d:href>{scope_path}</d:href>
|
|
<d:depth>infinity</d:depth>
|
|
</d:scope>
|
|
</d:from>
|
|
<d:where>
|
|
{where_xml}
|
|
</d:where>
|
|
<d:orderby>
|
|
{orderby_xml}
|
|
</d:orderby>
|
|
{limit_xml}
|
|
</d:basicsearch>
|
|
</d:searchrequest>"""
|
|
|
|
return search_xml
|
|
|
|
def _property_to_xml(self, prop: str) -> str:
|
|
"""Convert a property name to its XML element."""
|
|
# Handle properties with namespace prefixes
|
|
if prop.startswith("{"):
|
|
# Already a full namespace
|
|
namespace_end = prop.index("}")
|
|
namespace = prop[1:namespace_end]
|
|
local_name = prop[namespace_end + 1 :]
|
|
|
|
# Map namespace URIs to prefixes
|
|
ns_map = {
|
|
"DAV:": "d",
|
|
"http://owncloud.org/ns": "oc",
|
|
"http://nextcloud.org/ns": "nc",
|
|
}
|
|
|
|
prefix = ns_map.get(namespace, "d")
|
|
return f"<{prefix}:{local_name}/>"
|
|
else:
|
|
# Guess namespace based on common properties
|
|
if prop in [
|
|
"displayname",
|
|
"getcontentlength",
|
|
"getcontenttype",
|
|
"getlastmodified",
|
|
"resourcetype",
|
|
"getetag",
|
|
"quota-available-bytes",
|
|
"quota-used-bytes",
|
|
]:
|
|
return f"<d:{prop}/>"
|
|
elif prop in [
|
|
"fileid",
|
|
"size",
|
|
"permissions",
|
|
"favorite",
|
|
"tags",
|
|
"owner-id",
|
|
"owner-display-name",
|
|
"share-types",
|
|
"checksums",
|
|
"comments-count",
|
|
"comments-unread",
|
|
]:
|
|
return f"<oc:{prop}/>"
|
|
else:
|
|
# Assume nc namespace for newer properties
|
|
return f"<nc:{prop}/>"
|
|
|
|
def _parse_search_response(
|
|
self, xml_content: bytes, scope: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Parse the XML response from a SEARCH request."""
|
|
root = ET.fromstring(xml_content)
|
|
items = []
|
|
|
|
# Process each response element
|
|
responses = root.findall(".//{DAV:}response")
|
|
|
|
for response_elem in responses:
|
|
href = response_elem.find(".//{DAV:}href")
|
|
if href is None:
|
|
continue
|
|
|
|
# Extract file/directory path from href
|
|
href_text = href.text or ""
|
|
# Remove the /remote.php/dav/files/username/ prefix to get relative path
|
|
path_parts = href_text.split("/files/")
|
|
if len(path_parts) > 1:
|
|
# Get the path after username
|
|
path_after_user = "/".join(path_parts[1].split("/")[1:])
|
|
relative_path = path_after_user.rstrip("/")
|
|
else:
|
|
relative_path = href_text.rstrip("/").split("/")[-1]
|
|
|
|
# Get properties
|
|
propstat = response_elem.find(".//{DAV:}propstat")
|
|
if propstat is None:
|
|
continue
|
|
|
|
prop = propstat.find(".//{DAV:}prop")
|
|
if prop is None:
|
|
continue
|
|
|
|
# Build item dictionary
|
|
item = {"path": relative_path, "href": href_text}
|
|
|
|
# Extract all properties
|
|
for child in prop:
|
|
tag = child.tag
|
|
value = child.text
|
|
|
|
# Remove namespace from tag
|
|
if "}" in tag:
|
|
tag = tag.split("}", 1)[1]
|
|
|
|
# Handle special properties
|
|
if tag == "resourcetype":
|
|
item["is_directory"] = child.find(".//{DAV:}collection") is not None
|
|
elif tag == "getcontentlength":
|
|
item["size"] = int(value) if value else 0
|
|
elif tag == "displayname":
|
|
item["name"] = value
|
|
elif tag == "getcontenttype":
|
|
item["content_type"] = value
|
|
elif tag == "getlastmodified":
|
|
item["last_modified"] = value
|
|
elif tag == "getetag":
|
|
item["etag"] = value.strip('"') if value else None
|
|
elif tag == "fileid":
|
|
item["file_id"] = int(value) if value else None
|
|
elif tag == "favorite":
|
|
item["is_favorite"] = value == "1"
|
|
elif tag == "tags":
|
|
# Tags can be comma-separated or have multiple child elements
|
|
if value:
|
|
# Handle comma-separated tags
|
|
item["tags"] = [
|
|
t.strip() for t in value.split(",") if t.strip()
|
|
]
|
|
else:
|
|
# Check for child tag elements (alternative format)
|
|
tag_elements = child.findall(".//{http://owncloud.org/ns}tag")
|
|
if tag_elements:
|
|
item["tags"] = [t.text for t in tag_elements if t.text]
|
|
else:
|
|
item["tags"] = []
|
|
elif tag == "permissions":
|
|
item["permissions"] = value
|
|
elif tag == "size":
|
|
# oc:size includes folder sizes
|
|
item["total_size"] = int(value) if value else 0
|
|
else:
|
|
# Store other properties as-is
|
|
item[tag] = value
|
|
|
|
items.append(item)
|
|
|
|
return items
|
|
|
|
async def find_by_name(
|
|
self, pattern: str, scope: str = "", limit: Optional[int] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find files by name pattern using LIKE matching.
|
|
|
|
Args:
|
|
pattern: Name pattern to search for (supports % wildcard)
|
|
scope: Directory path to search in (empty string for user root)
|
|
limit: Maximum number of results to return
|
|
|
|
Returns:
|
|
List of matching files/directories
|
|
|
|
Examples:
|
|
# Find all .txt files
|
|
results = await find_by_name("%.txt")
|
|
|
|
# Find files starting with "report"
|
|
results = await find_by_name("report%")
|
|
"""
|
|
where_conditions = f"""
|
|
<d:like>
|
|
<d:prop>
|
|
<d:displayname/>
|
|
</d:prop>
|
|
<d:literal>{pattern}</d:literal>
|
|
</d:like>
|
|
"""
|
|
|
|
return await self.search_files(
|
|
scope=scope, where_conditions=where_conditions, limit=limit
|
|
)
|
|
|
|
async def find_by_type(
|
|
self, mime_type: str, scope: str = "", limit: Optional[int] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find files by MIME type.
|
|
|
|
Args:
|
|
mime_type: MIME type to search for (supports % wildcard, e.g., "image/%")
|
|
scope: Directory path to search in (empty string for user root)
|
|
limit: Maximum number of results to return
|
|
|
|
Returns:
|
|
List of matching files
|
|
|
|
Examples:
|
|
# Find all images
|
|
results = await find_by_type("image/%")
|
|
|
|
# Find all PDFs
|
|
results = await find_by_type("application/pdf")
|
|
"""
|
|
where_conditions = f"""
|
|
<d:like>
|
|
<d:prop>
|
|
<d:getcontenttype/>
|
|
</d:prop>
|
|
<d:literal>{mime_type}</d:literal>
|
|
</d:like>
|
|
"""
|
|
|
|
return await self.search_files(
|
|
scope=scope, where_conditions=where_conditions, limit=limit
|
|
)
|
|
|
|
async def list_favorites(
|
|
self, scope: str = "", limit: Optional[int] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""List all favorite files.
|
|
|
|
Args:
|
|
scope: Directory path to search in (empty string for user root)
|
|
limit: Maximum number of results to return
|
|
|
|
Returns:
|
|
List of favorite files/directories
|
|
|
|
Examples:
|
|
# List all favorites
|
|
results = await list_favorites()
|
|
|
|
# List favorites in a specific folder
|
|
results = await list_favorites(scope="Documents")
|
|
"""
|
|
# Use REPORT method for favorites as it's more efficient
|
|
# But we can also use SEARCH as fallback
|
|
where_conditions = """
|
|
<d:eq>
|
|
<d:prop>
|
|
<oc:favorite/>
|
|
</d:prop>
|
|
<d:literal>1</d:literal>
|
|
</d:eq>
|
|
"""
|
|
|
|
# Request favorite property
|
|
properties = [
|
|
"displayname",
|
|
"getcontentlength",
|
|
"getcontenttype",
|
|
"getlastmodified",
|
|
"resourcetype",
|
|
"getetag",
|
|
"fileid",
|
|
"favorite",
|
|
]
|
|
|
|
return await self.search_files(
|
|
scope=scope,
|
|
where_conditions=where_conditions,
|
|
properties=properties,
|
|
limit=limit,
|
|
)
|
|
|
|
async def find_by_tag(
|
|
self, tag_name: str, scope: str = "", limit: Optional[int] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find files by tag name.
|
|
|
|
DEPRECATED: Use NextcloudClient.find_files_by_tag() instead, which uses
|
|
the proper OCS Tags API rather than WebDAV SEARCH.
|
|
|
|
Args:
|
|
tag_name: Tag to filter by (e.g., "vector-index")
|
|
scope: Directory path to search in (empty string for user root)
|
|
limit: Maximum number of results to return
|
|
|
|
Returns:
|
|
List of files/directories with the specified tag
|
|
|
|
Examples:
|
|
# Find all files tagged with "vector-index"
|
|
results = await find_by_tag("vector-index")
|
|
|
|
# Find tagged files in a specific folder
|
|
results = await find_by_tag("vector-index", scope="Documents")
|
|
"""
|
|
# Use LIKE for tag matching since tags can be comma-separated
|
|
where_conditions = f"""
|
|
<d:like>
|
|
<d:prop>
|
|
<oc:tags/>
|
|
</d:prop>
|
|
<d:literal>%{tag_name}%</d:literal>
|
|
</d:like>
|
|
"""
|
|
|
|
# Request tag property along with standard properties
|
|
properties = [
|
|
"displayname",
|
|
"getcontentlength",
|
|
"getcontenttype",
|
|
"getlastmodified",
|
|
"resourcetype",
|
|
"getetag",
|
|
"fileid",
|
|
"tags",
|
|
]
|
|
|
|
return await self.search_files(
|
|
scope=scope,
|
|
where_conditions=where_conditions,
|
|
properties=properties,
|
|
limit=limit,
|
|
)
|
|
|
|
async def _get_file_info_by_id(self, file_id: int) -> Dict[str, Any]:
|
|
"""Get file information by Nextcloud file ID using WebDAV.
|
|
|
|
Args:
|
|
file_id: Nextcloud internal file ID
|
|
|
|
Returns:
|
|
File information dictionary with path, size, content_type, etc.
|
|
|
|
Raises:
|
|
HTTPStatusError: If file not found or request fails
|
|
"""
|
|
# Nextcloud allows accessing files by ID via special meta endpoint
|
|
meta_path = f"/remote.php/dav/meta/{file_id}/"
|
|
|
|
propfind_body = """<?xml version="1.0"?>
|
|
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
|
|
<d:prop>
|
|
<d:displayname/>
|
|
<d:getcontentlength/>
|
|
<d:getcontenttype/>
|
|
<d:getlastmodified/>
|
|
<d:resourcetype/>
|
|
<d:getetag/>
|
|
<oc:fileid/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
headers = {"Depth": "0", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
|
|
|
|
response = await self._make_request(
|
|
"PROPFIND", meta_path, content=propfind_body, headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse the XML response
|
|
root = ET.fromstring(response.content)
|
|
responses = root.findall(".//{DAV:}response")
|
|
|
|
if not responses:
|
|
raise RuntimeError(f"File ID {file_id} not found")
|
|
|
|
response_elem = responses[0]
|
|
href = response_elem.find(".//{DAV:}href")
|
|
if href is None:
|
|
raise RuntimeError(f"No href in response for file ID {file_id}")
|
|
|
|
propstat = response_elem.find(".//{DAV:}propstat")
|
|
if propstat is None:
|
|
raise RuntimeError(f"No propstat for file ID {file_id}")
|
|
|
|
prop = propstat.find(".//{DAV:}prop")
|
|
if prop is None:
|
|
raise RuntimeError(f"No prop for file ID {file_id}")
|
|
|
|
# Extract file path from displayname or construct from file ID
|
|
displayname_elem = prop.find(".//{DAV:}displayname")
|
|
name = (
|
|
displayname_elem.text if displayname_elem is not None else f"file_{file_id}"
|
|
)
|
|
|
|
# Get file properties
|
|
size_elem = prop.find(".//{DAV:}getcontentlength")
|
|
size = int(size_elem.text) if size_elem is not None and size_elem.text else 0
|
|
|
|
content_type_elem = prop.find(".//{DAV:}getcontenttype")
|
|
content_type = content_type_elem.text if content_type_elem is not None else None
|
|
|
|
modified_elem = prop.find(".//{DAV:}getlastmodified")
|
|
modified = modified_elem.text if modified_elem is not None else None
|
|
|
|
etag_elem = prop.find(".//{DAV:}getetag")
|
|
etag = (
|
|
etag_elem.text.strip('"')
|
|
if etag_elem is not None and etag_elem.text
|
|
else None
|
|
)
|
|
|
|
# Check if it's a directory
|
|
resourcetype = prop.find(".//{DAV:}resourcetype")
|
|
is_directory = (
|
|
resourcetype is not None
|
|
and resourcetype.find(".//{DAV:}collection") is not None
|
|
)
|
|
|
|
# Try to get actual file path - meta endpoint doesn't give us the real path
|
|
# so we'll construct a reasonable path from the name
|
|
# The calling code in NextcloudClient will have the context to determine the actual path
|
|
file_info = {
|
|
"name": name,
|
|
"path": f"/{name}", # Placeholder - caller should use WebDAV to get real path if needed
|
|
"size": size,
|
|
"content_type": content_type,
|
|
"last_modified": modified,
|
|
"etag": etag,
|
|
"is_directory": is_directory,
|
|
"file_id": file_id,
|
|
}
|
|
|
|
logger.debug(f"Retrieved file info for ID {file_id}: {name}")
|
|
return file_info
|
|
|
|
async def get_tag_by_name(self, tag_name: str) -> dict[str, Any] | None:
|
|
"""Get a system tag by its name via WebDAV.
|
|
|
|
Args:
|
|
tag_name: Name of the tag to find (case-sensitive)
|
|
|
|
Returns:
|
|
Tag dictionary if found, None otherwise
|
|
"""
|
|
# Use WebDAV PROPFIND to list all systemtags
|
|
propfind_body = """<?xml version="1.0"?>
|
|
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
|
|
<d:prop>
|
|
<oc:id/>
|
|
<oc:display-name/>
|
|
<oc:user-visible/>
|
|
<oc:user-assignable/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
response = await self._client.request(
|
|
"PROPFIND",
|
|
"/remote.php/dav/systemtags/",
|
|
headers={"Depth": "1"},
|
|
content=propfind_body,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse XML response
|
|
root = ET.fromstring(response.content)
|
|
ns = {
|
|
"d": "DAV:",
|
|
"oc": "http://owncloud.org/ns",
|
|
}
|
|
|
|
for response_elem in root.findall("d:response", ns):
|
|
href = response_elem.find("d:href", ns)
|
|
if href is None or href.text == "/remote.php/dav/systemtags/":
|
|
# Skip the collection itself
|
|
continue
|
|
|
|
propstat = response_elem.find("d:propstat", ns)
|
|
if propstat is None:
|
|
continue
|
|
|
|
prop = propstat.find("d:prop", ns)
|
|
if prop is None:
|
|
continue
|
|
|
|
# Extract tag properties
|
|
tag_id_elem = prop.find("oc:id", ns)
|
|
display_name_elem = prop.find("oc:display-name", ns)
|
|
user_visible_elem = prop.find("oc:user-visible", ns)
|
|
user_assignable_elem = prop.find("oc:user-assignable", ns)
|
|
|
|
if display_name_elem is not None and display_name_elem.text == tag_name:
|
|
tag_info = {
|
|
"id": int(tag_id_elem.text)
|
|
if tag_id_elem is not None and tag_id_elem.text is not None
|
|
else None,
|
|
"name": display_name_elem.text,
|
|
"userVisible": user_visible_elem.text.lower() == "true"
|
|
if user_visible_elem is not None
|
|
and user_visible_elem.text is not None
|
|
else True,
|
|
"userAssignable": user_assignable_elem.text.lower() == "true"
|
|
if user_assignable_elem is not None
|
|
and user_assignable_elem.text is not None
|
|
else True,
|
|
}
|
|
logger.debug(f"Found tag '{tag_name}' with ID {tag_info['id']}")
|
|
return tag_info
|
|
|
|
logger.debug(f"Tag '{tag_name}' not found")
|
|
return None
|
|
|
|
async def get_files_by_tag(self, tag_id: int) -> list[dict[str, Any]]:
|
|
"""Get all files tagged with a specific system tag via WebDAV REPORT.
|
|
|
|
Args:
|
|
tag_id: Numeric ID of the tag
|
|
|
|
Returns:
|
|
List of file info dictionaries with path, size, content_type, etc.
|
|
"""
|
|
# Use WebDAV REPORT method with systemtag filter, requesting all properties
|
|
report_body = f"""<?xml version="1.0"?>
|
|
<oc:filter-files xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
|
|
<d:prop>
|
|
<oc:fileid/>
|
|
<d:displayname/>
|
|
<d:getcontentlength/>
|
|
<d:getcontenttype/>
|
|
<d:getlastmodified/>
|
|
<d:getetag/>
|
|
</d:prop>
|
|
<oc:filter-rules>
|
|
<oc:systemtag>{tag_id}</oc:systemtag>
|
|
</oc:filter-rules>
|
|
</oc:filter-files>"""
|
|
|
|
response = await self._client.request(
|
|
"REPORT",
|
|
f"{self._get_webdav_base_path()}/",
|
|
content=report_body,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse XML response
|
|
root = ET.fromstring(response.content)
|
|
ns = {
|
|
"d": "DAV:",
|
|
"oc": "http://owncloud.org/ns",
|
|
}
|
|
|
|
files = []
|
|
for response_elem in root.findall("d:response", ns):
|
|
# Extract href (file path)
|
|
href_elem = response_elem.find("d:href", ns)
|
|
if href_elem is None or not href_elem.text:
|
|
continue
|
|
|
|
propstat = response_elem.find("d:propstat", ns)
|
|
if propstat is None:
|
|
continue
|
|
|
|
prop = propstat.find("d:prop", ns)
|
|
if prop is None:
|
|
continue
|
|
|
|
# Extract all properties
|
|
fileid_elem = prop.find("oc:fileid", ns)
|
|
displayname_elem = prop.find("d:displayname", ns)
|
|
contentlength_elem = prop.find("d:getcontentlength", ns)
|
|
contenttype_elem = prop.find("d:getcontenttype", ns)
|
|
lastmodified_elem = prop.find("d:getlastmodified", ns)
|
|
etag_elem = prop.find("d:getetag", ns)
|
|
|
|
if fileid_elem is None or not fileid_elem.text:
|
|
continue
|
|
|
|
# Decode href path and extract the file path
|
|
href_path = unquote(href_elem.text)
|
|
# Remove WebDAV prefix to get user-relative path
|
|
webdav_prefix = f"/remote.php/dav/files/{self.username}/"
|
|
file_path = href_path.replace(webdav_prefix, "/")
|
|
|
|
# Parse last modified timestamp
|
|
last_modified_timestamp = None
|
|
if lastmodified_elem is not None and lastmodified_elem.text:
|
|
try:
|
|
dt = parsedate_to_datetime(lastmodified_elem.text)
|
|
last_modified_timestamp = int(dt.timestamp())
|
|
except Exception:
|
|
pass
|
|
|
|
file_info = {
|
|
"id": int(fileid_elem.text),
|
|
"path": file_path,
|
|
"name": displayname_elem.text
|
|
if displayname_elem is not None
|
|
else file_path.split("/")[-1],
|
|
"size": int(contentlength_elem.text)
|
|
if contentlength_elem is not None and contentlength_elem.text
|
|
else 0,
|
|
"content_type": contenttype_elem.text
|
|
if contenttype_elem is not None
|
|
else "",
|
|
"last_modified": lastmodified_elem.text
|
|
if lastmodified_elem is not None
|
|
else None,
|
|
"last_modified_timestamp": last_modified_timestamp,
|
|
"etag": etag_elem.text if etag_elem is not None else None,
|
|
}
|
|
files.append(file_info)
|
|
|
|
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
|
|
return files
|
|
|
|
async def get_file_info(self, path: str) -> dict[str, Any] | None:
|
|
"""Get file info including file ID via WebDAV PROPFIND.
|
|
|
|
Args:
|
|
path: Path to the file (relative to user's files directory)
|
|
|
|
Returns:
|
|
File info dictionary with id, name, size, content_type, etc.
|
|
Returns None if file not found.
|
|
"""
|
|
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
|
|
|
|
propfind_body = """<?xml version="1.0"?>
|
|
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
|
|
<d:prop>
|
|
<oc:fileid/>
|
|
<d:displayname/>
|
|
<d:getcontentlength/>
|
|
<d:getcontenttype/>
|
|
<d:getlastmodified/>
|
|
<d:getetag/>
|
|
<d:resourcetype/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
try:
|
|
response = await self._client.request(
|
|
"PROPFIND",
|
|
webdav_path,
|
|
headers={"Depth": "0"},
|
|
content=propfind_body,
|
|
)
|
|
response.raise_for_status()
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
logger.debug(f"File not found: {path}")
|
|
return None
|
|
raise
|
|
|
|
# Parse XML response
|
|
root = ET.fromstring(response.content)
|
|
ns = {
|
|
"d": "DAV:",
|
|
"oc": "http://owncloud.org/ns",
|
|
}
|
|
|
|
response_elem = root.find("d:response", ns)
|
|
if response_elem is None:
|
|
return None
|
|
|
|
propstat = response_elem.find("d:propstat", ns)
|
|
if propstat is None:
|
|
return None
|
|
|
|
prop = propstat.find("d:prop", ns)
|
|
if prop is None:
|
|
return None
|
|
|
|
# Extract properties
|
|
fileid_elem = prop.find("oc:fileid", ns)
|
|
displayname_elem = prop.find("d:displayname", ns)
|
|
contentlength_elem = prop.find("d:getcontentlength", ns)
|
|
contenttype_elem = prop.find("d:getcontenttype", ns)
|
|
lastmodified_elem = prop.find("d:getlastmodified", ns)
|
|
etag_elem = prop.find("d:getetag", ns)
|
|
resourcetype_elem = prop.find("d:resourcetype", ns)
|
|
|
|
is_directory = (
|
|
resourcetype_elem is not None
|
|
and resourcetype_elem.find("d:collection", ns) is not None
|
|
)
|
|
|
|
file_info = {
|
|
"id": int(fileid_elem.text)
|
|
if fileid_elem is not None and fileid_elem.text is not None
|
|
else None,
|
|
"path": path,
|
|
"name": displayname_elem.text
|
|
if displayname_elem is not None
|
|
else path.split("/")[-1],
|
|
"size": int(contentlength_elem.text)
|
|
if contentlength_elem is not None and contentlength_elem.text
|
|
else 0,
|
|
"content_type": contenttype_elem.text
|
|
if contenttype_elem is not None
|
|
else "",
|
|
"last_modified": lastmodified_elem.text
|
|
if lastmodified_elem is not None
|
|
else None,
|
|
"etag": etag_elem.text.strip('"')
|
|
if etag_elem is not None and etag_elem.text
|
|
else None,
|
|
"is_directory": is_directory,
|
|
}
|
|
|
|
logger.debug(f"Got file info for '{path}': id={file_info['id']}")
|
|
return file_info
|
|
|
|
async def create_tag(
|
|
self,
|
|
name: str,
|
|
user_visible: bool = True,
|
|
user_assignable: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Create a system tag via WebDAV.
|
|
|
|
Args:
|
|
name: Name of the tag to create
|
|
user_visible: Whether the tag is visible to users
|
|
user_assignable: Whether users can assign this tag
|
|
|
|
Returns:
|
|
Tag dictionary with id, name, userVisible, userAssignable
|
|
|
|
Raises:
|
|
HTTPStatusError: If tag creation fails (409 if already exists)
|
|
"""
|
|
# Use WebDAV POST with JSON body to create tag
|
|
response = await self._client.post(
|
|
"/remote.php/dav/systemtags/",
|
|
headers={"Content-Type": "application/json"},
|
|
json={
|
|
"name": name,
|
|
"userVisible": user_visible,
|
|
"userAssignable": user_assignable,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42)
|
|
content_location = response.headers.get("Content-Location", "")
|
|
tag_id = None
|
|
if content_location:
|
|
# Extract the numeric ID from the path
|
|
try:
|
|
tag_id = int(content_location.rstrip("/").split("/")[-1])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
tag_info = {
|
|
"id": tag_id,
|
|
"name": name,
|
|
"userVisible": user_visible,
|
|
"userAssignable": user_assignable,
|
|
}
|
|
|
|
logger.info(f"Created tag '{name}' with ID {tag_info['id']}")
|
|
return tag_info
|
|
|
|
async def get_or_create_tag(
|
|
self,
|
|
name: str,
|
|
user_visible: bool = True,
|
|
user_assignable: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Get a tag by name, creating it if it doesn't exist.
|
|
|
|
Args:
|
|
name: Name of the tag
|
|
user_visible: Whether the tag is visible to users (for creation)
|
|
user_assignable: Whether users can assign this tag (for creation)
|
|
|
|
Returns:
|
|
Tag dictionary with id, name, userVisible, userAssignable
|
|
"""
|
|
# First try to get existing tag
|
|
existing_tag = await self.get_tag_by_name(name)
|
|
if existing_tag:
|
|
logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}")
|
|
return existing_tag
|
|
|
|
# Create new tag
|
|
try:
|
|
return await self.create_tag(name, user_visible, user_assignable)
|
|
except HTTPStatusError as e:
|
|
if e.response.status_code == 409:
|
|
# Tag was created between our check and creation, fetch it
|
|
existing_tag = await self.get_tag_by_name(name)
|
|
if existing_tag:
|
|
return existing_tag
|
|
raise
|
|
|
|
async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool:
|
|
"""Assign a system tag to a file.
|
|
|
|
Args:
|
|
file_id: Numeric file ID
|
|
tag_id: Numeric tag ID
|
|
|
|
Returns:
|
|
True if tag was assigned successfully (or already assigned)
|
|
|
|
Raises:
|
|
HTTPStatusError: If tag assignment fails
|
|
"""
|
|
response = await self._client.request(
|
|
"PUT",
|
|
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
|
|
headers={"Content-Length": "0"},
|
|
content=b"",
|
|
)
|
|
|
|
# 201 = Created (new assignment), 409 = Conflict (already assigned)
|
|
if response.status_code in (201, 409):
|
|
logger.info(f"Tagged file {file_id} with tag {tag_id}")
|
|
return True
|
|
|
|
response.raise_for_status()
|
|
return True
|
|
|
|
async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool:
|
|
"""Remove a system tag from a file.
|
|
|
|
Args:
|
|
file_id: Numeric file ID
|
|
tag_id: Numeric tag ID
|
|
|
|
Returns:
|
|
True if tag was removed successfully (or wasn't assigned)
|
|
|
|
Raises:
|
|
HTTPStatusError: If tag removal fails
|
|
"""
|
|
response = await self._client.request(
|
|
"DELETE",
|
|
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
|
|
)
|
|
|
|
# 204 = No Content (removed), 404 = Not Found (wasn't assigned)
|
|
if response.status_code in (204, 404):
|
|
logger.info(f"Removed tag {tag_id} from file {file_id}")
|
|
return True
|
|
|
|
response.raise_for_status()
|
|
return True
|