"""WebDAV client for Nextcloud file operations.""" import logging import mimetypes import xml.etree.ElementTree as ET from email.utils import parsedate_to_datetime from typing import Any, Dict, List, Optional, Tuple from urllib.parse import unquote from httpx import HTTPStatusError from .base import BaseNextcloudClient logger = logging.getLogger(__name__) class WebDAVClient(BaseNextcloudClient): """Client for Nextcloud WebDAV operations.""" app_name = "webdav" async def delete_resource(self, path: str) -> Dict[str, Any]: """Delete a resource (file or directory) via WebDAV DELETE.""" # Ensure path ends with a slash if it's a directory if not path.endswith("/"): path_with_slash = f"{path}/" else: path_with_slash = path webdav_path = f"{self._get_webdav_base_path()}/{path_with_slash.lstrip('/')}" logger.debug(f"Deleting WebDAV resource: {webdav_path}") headers = {"OCS-APIRequest": "true"} try: # First try a PROPFIND to verify resource exists propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} try: propfind_resp = await self._make_request( "PROPFIND", webdav_path, headers=propfind_headers ) logger.debug( f"Resource exists check status: {propfind_resp.status_code}" ) except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Resource '{path}' doesn't exist, no deletion needed") return {"status_code": 404} # For other errors, continue with deletion attempt # Proceed with deletion response = await self._make_request("DELETE", webdav_path, headers=headers) logger.debug(f"Successfully deleted WebDAV resource '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Resource '{path}' not found, no deletion needed") return {"status_code": 404} else: logger.error(f"HTTP error deleting WebDAV resource '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error deleting WebDAV resource '{path}': {e}") raise e async def cleanup_old_attachment_directory( self, note_id: int, old_category: str ) -> Dict[str, Any]: """Clean up the attachment directory for a note in its old category location.""" old_category_path_part = f"{old_category}/" if old_category else "" old_attachment_dir_path = ( f"Notes/{old_category_path_part}.attachments.{note_id}/" ) logger.debug(f"Cleaning up old attachment directory: {old_attachment_dir_path}") try: delete_result = await self.delete_resource(path=old_attachment_dir_path) logger.debug(f"Cleanup result: {delete_result}") return delete_result except Exception as e: logger.error(f"Error during cleanup of old attachment directory: {e}") raise e async def cleanup_note_attachments( self, note_id: int, category: str ) -> Dict[str, Any]: """Clean up attachment directory for a specific note and category.""" cat_path_part = f"{category}/" if category else "" attachment_dir_path = f"Notes/{cat_path_part}.attachments.{note_id}/" logger.debug( f"Cleaning up attachments for note {note_id} in category '{category}'" ) try: delete_result = await self.delete_resource(path=attachment_dir_path) logger.debug(f"Cleanup result for note {note_id}: {delete_result}") return delete_result except Exception as e: logger.error(f"Failed cleaning up attachments for note {note_id}: {e}") raise e async def add_note_attachment( self, note_id: int, filename: str, content: bytes, category: Optional[str] = None, mime_type: Optional[str] = None, ) -> Dict[str, Any]: """Add/Update an attachment to a note via WebDAV PUT.""" # Construct paths based on provided category webdav_base = self._get_webdav_base_path() category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" parent_dir_webdav_rel_path = ( f"Notes/{category_path_part}{attachment_dir_segment}" ) parent_dir_path = f"{webdav_base}/{parent_dir_webdav_rel_path}" attachment_path = f"{parent_dir_path}/{filename}" logger.debug(f"Uploading attachment '{filename}' for note {note_id}") if not mime_type: mime_type, _ = mimetypes.guess_type(filename) if not mime_type: mime_type = "application/octet-stream" headers = {"Content-Type": mime_type, "OCS-APIRequest": "true"} try: # First check if we can access WebDAV at all notes_dir_path = f"{webdav_base}/Notes" propfind_headers = {"Depth": "0", "OCS-APIRequest": "true"} notes_dir_response = await self._make_request( "PROPFIND", notes_dir_path, headers=propfind_headers ) if notes_dir_response.status_code == 401: logger.error("WebDAV authentication failed for Notes directory") raise HTTPStatusError( f"Authentication error accessing WebDAV Notes directory: {notes_dir_response.status_code}", request=notes_dir_response.request, response=notes_dir_response, ) elif notes_dir_response.status_code >= 400: logger.error( f"Error accessing WebDAV Notes directory: {notes_dir_response.status_code}" ) notes_dir_response.raise_for_status() # Ensure the parent directory exists using MKCOL mkcol_headers = {"OCS-APIRequest": "true"} mkcol_response = await self._make_request( "MKCOL", parent_dir_path, headers=mkcol_headers ) # MKCOL should return 201 Created or 405 Method Not Allowed (if directory already exists) if mkcol_response.status_code not in [201, 405]: logger.error( f"Unexpected status code {mkcol_response.status_code} when creating attachments directory" ) mkcol_response.raise_for_status() # Proceed with the PUT request response = await self._make_request( "PUT", attachment_path, content=content, headers=headers ) response.raise_for_status() logger.debug( f"Successfully uploaded attachment '{filename}' to note {note_id}" ) return {"status_code": response.status_code} except HTTPStatusError as e: logger.error( f"HTTP error uploading attachment '{filename}' to note {note_id}: {e}" ) raise e except Exception as e: logger.error( f"Unexpected error uploading attachment '{filename}' to note {note_id}: {e}" ) raise e async def get_note_attachment( self, note_id: int, filename: str, category: Optional[str] = None ) -> Tuple[bytes, str]: """Fetch a specific attachment from a note via WebDAV GET.""" webdav_base = self._get_webdav_base_path() category_path_part = f"{category}/" if category else "" attachment_dir_segment = f".attachments.{note_id}" attachment_path = f"{webdav_base}/Notes/{category_path_part}{attachment_dir_segment}/{filename}" logger.debug(f"Fetching attachment '{filename}' for note {note_id}") try: response = await self._make_request("GET", attachment_path) response.raise_for_status() content = response.content mime_type = response.headers.get("content-type", "application/octet-stream") logger.debug( f"Successfully fetched attachment '{filename}' ({len(content)} bytes)" ) return content, mime_type except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Attachment '{filename}' not found for note {note_id}") else: logger.error( f"HTTP error fetching attachment '{filename}' for note {note_id}: {e}" ) raise e except Exception as e: logger.error( f"Unexpected error fetching attachment '{filename}' for note {note_id}: {e}" ) raise e async def list_directory(self, path: str = "") -> List[Dict[str, Any]]: """List files and directories in the specified path via WebDAV PROPFIND.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" if not webdav_path.endswith("/"): webdav_path += "/" logger.debug(f"Listing directory: {path}") propfind_body = """ """ headers = {"Depth": "1", "Content-Type": "text/xml", "OCS-APIRequest": "true"} try: response = await self._make_request( "PROPFIND", webdav_path, content=propfind_body, headers=headers ) response.raise_for_status() # Parse the XML response root = ET.fromstring(response.content) items = [] # Skip the first response (the directory itself) responses = root.findall(".//{DAV:}response")[1:] for response_elem in responses: href = response_elem.find(".//{DAV:}href") if href is None: continue # Extract file/directory name from href href_text = href.text or "" name = href_text.rstrip("/").split("/")[-1] if not name: continue # Get properties propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: continue prop = propstat.find(".//{DAV:}prop") if prop is None: continue # Determine if it's a directory resourcetype = prop.find(".//{DAV:}resourcetype") is_directory = ( resourcetype is not None and resourcetype.find(".//{DAV:}collection") is not None ) # Get other properties size_elem = prop.find(".//{DAV:}getcontentlength") size = ( int(size_elem.text) if size_elem is not None and size_elem.text else 0 ) content_type_elem = prop.find(".//{DAV:}getcontenttype") content_type = ( content_type_elem.text if content_type_elem is not None else None ) modified_elem = prop.find(".//{DAV:}getlastmodified") modified = modified_elem.text if modified_elem is not None else None items.append( { "name": name, "path": f"{path.rstrip('/')}/{name}" if path else name, "is_directory": is_directory, "size": size if not is_directory else None, "content_type": content_type, "last_modified": modified, } ) logger.debug(f"Found {len(items)} items in directory: {path}") return items except HTTPStatusError as e: logger.error(f"HTTP error listing directory '{webdav_path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error listing directory '{webdav_path}': {e}") raise e async def read_file(self, path: str) -> Tuple[bytes, str]: """Read a file's content via WebDAV GET.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" logger.debug(f"Reading file: {path}") try: response = await self._make_request("GET", webdav_path) response.raise_for_status() content = response.content content_type = response.headers.get( "content-type", "application/octet-stream" ) logger.debug(f"Successfully read file '{path}' ({len(content)} bytes)") return content, content_type except HTTPStatusError as e: logger.error(f"HTTP error reading file '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error reading file '{path}': {e}") raise e async def write_file( self, path: str, content: bytes, content_type: Optional[str] = None ) -> Dict[str, Any]: """Write content to a file via WebDAV PUT.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" logger.debug(f"Writing file: {path}") if not content_type: content_type, _ = mimetypes.guess_type(path) if not content_type: content_type = "application/octet-stream" headers = {"Content-Type": content_type, "OCS-APIRequest": "true"} try: response = await self._make_request( "PUT", webdav_path, content=content, headers=headers ) response.raise_for_status() logger.debug(f"Successfully wrote file '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: logger.error(f"HTTP error writing file '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error writing file '{path}': {e}") raise e async def create_directory( self, path: str, recursive: bool = False ) -> Dict[str, Any]: """Create a directory via WebDAV MKCOL.""" webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" if not webdav_path.endswith("/"): webdav_path += "/" logger.debug(f"Creating directory: {path}") headers = {"OCS-APIRequest": "true"} try: response = await self._make_request("MKCOL", webdav_path, headers=headers) response.raise_for_status() logger.debug(f"Successfully created directory '{path}'") return {"status_code": response.status_code} except HTTPStatusError as e: # Method Not Allowed - directory already exists if e.response.status_code == 405: logger.debug(f"Directory '{path}' already exists") return {"status_code": 405, "message": "Directory already exists"} # File Conflict - parent directory does not exist if e.response.status_code == 409 and recursive: # Extract parent directory path path_parts = path.strip("/").split("/") if len(path_parts) > 1: parent_dir = "/".join(path_parts[:-1]) logger.debug( f"Parent directory '{parent_dir}' doesn't exist, creating recursively" ) await self.create_directory(parent_dir, recursive) # Now try to create the original directory again return await self.create_directory(path, recursive) else: # This shouldn't happen for single-level directories under root logger.error(f"409 conflict for single-level directory '{path}'") raise e logger.error(f"HTTP error creating directory '{path}': {e}") raise e except Exception as e: logger.error(f"Unexpected error creating directory '{path}': {e}") raise e async def move_resource( self, source_path: str, destination_path: str, overwrite: bool = False ) -> Dict[str, Any]: """Move or rename a resource (file or directory) via WebDAV MOVE. Args: source_path: The path of the file or directory to move destination_path: The new path for the file or directory overwrite: Whether to overwrite the destination if it exists Returns: Dict with status_code and optional message """ source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}" destination_webdav_path = ( f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}" ) # Ensure paths have consistent trailing slashes for directories if source_path.endswith("/") and not destination_path.endswith("/"): destination_webdav_path += "/" elif not source_path.endswith("/") and destination_path.endswith("/"): source_webdav_path += "/" logger.debug(f"Moving resource from '{source_path}' to '{destination_path}'") headers = { "OCS-APIRequest": "true", "Destination": destination_webdav_path, "Overwrite": "T" if overwrite else "F", } try: response = await self._make_request( "MOVE", source_webdav_path, headers=headers ) response.raise_for_status() logger.debug( f"Successfully moved resource from '{source_path}' to '{destination_path}'" ) return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Source resource '{source_path}' not found") return {"status_code": 404, "message": "Source resource not found"} elif e.response.status_code == 412: logger.debug( f"Destination '{destination_path}' already exists and overwrite is false" ) return { "status_code": 412, "message": "Destination already exists and overwrite is false", } elif e.response.status_code == 409: logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } else: logger.error( f"HTTP error moving resource from '{source_path}' to '{destination_path}': {e}" ) raise e except Exception as e: logger.error( f"Unexpected error moving resource from '{source_path}' to '{destination_path}': {e}" ) raise e async def copy_resource( self, source_path: str, destination_path: str, overwrite: bool = False ) -> Dict[str, Any]: """Copy a resource (file or directory) via WebDAV COPY. Args: source_path: The path of the file or directory to copy destination_path: The destination path for the copy overwrite: Whether to overwrite the destination if it exists Returns: Dict with status_code and optional message """ source_webdav_path = f"{self._get_webdav_base_path()}/{source_path.lstrip('/')}" destination_webdav_path = ( f"{self._get_webdav_base_path()}/{destination_path.lstrip('/')}" ) # Ensure paths have consistent trailing slashes for directories if source_path.endswith("/") and not destination_path.endswith("/"): destination_webdav_path += "/" elif not source_path.endswith("/") and destination_path.endswith("/"): source_webdav_path += "/" logger.debug(f"Copying resource from '{source_path}' to '{destination_path}'") headers = { "OCS-APIRequest": "true", "Destination": destination_webdav_path, "Overwrite": "T" if overwrite else "F", } try: response = await self._make_request( "COPY", source_webdav_path, headers=headers ) response.raise_for_status() logger.debug( f"Successfully copied resource from '{source_path}' to '{destination_path}'" ) return {"status_code": response.status_code} except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"Source resource '{source_path}' not found") return {"status_code": 404, "message": "Source resource not found"} elif e.response.status_code == 412: logger.debug( f"Destination '{destination_path}' already exists and overwrite is false" ) return { "status_code": 412, "message": "Destination already exists and overwrite is false", } elif e.response.status_code == 409: logger.debug( f"Parent directory of destination '{destination_path}' doesn't exist" ) return { "status_code": 409, "message": "Parent directory of destination doesn't exist", } else: logger.error( f"HTTP error copying resource from '{source_path}' to '{destination_path}': {e}" ) raise e except Exception as e: logger.error( f"Unexpected error copying resource from '{source_path}' to '{destination_path}': {e}" ) raise e async def search_files( self, scope: str = "", where_conditions: Optional[str] = None, properties: Optional[List[str]] = None, order_by: Optional[List[Tuple[str, str]]] = None, limit: Optional[int] = None, ) -> List[Dict[str, Any]]: """Search for files using WebDAV SEARCH method (RFC 5323). Args: scope: Directory path to search in (empty string for user root) where_conditions: XML string for where clause conditions properties: List of property names to retrieve (defaults to basic set) order_by: List of (property, direction) tuples for sorting, e.g. [("getlastmodified", "descending")] limit: Maximum number of results to return Returns: List of file/directory dictionaries with requested properties """ # Default properties if not specified if properties is None: properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", ] # Build the SEARCH request XML search_body = self._build_search_xml( scope=scope, where_conditions=where_conditions, properties=properties, order_by=order_by, limit=limit, ) # The SEARCH endpoint is at the dav root search_path = "/remote.php/dav/" headers = {"Content-Type": "text/xml", "OCS-APIRequest": "true"} logger.debug(f"Searching files in scope: {scope}") try: response = await self._make_request( "SEARCH", search_path, content=search_body, headers=headers ) response.raise_for_status() # Parse the XML response results = self._parse_search_response(response.content, scope) logger.debug(f"Search returned {len(results)} results") return results except HTTPStatusError as e: logger.error(f"HTTP error during search: {e}") raise e except Exception as e: logger.error(f"Unexpected error during search: {e}") raise e def _build_search_xml( self, scope: str, where_conditions: Optional[str], properties: List[str], order_by: Optional[List[Tuple[str, str]]], limit: Optional[int], ) -> str: """Build the XML body for a SEARCH request.""" # Construct the scope path username = self.username scope_path = f"/files/{username}" if scope: scope_path = f"{scope_path}/{scope.lstrip('/')}" # Build property list prop_xml = "\n".join([self._property_to_xml(prop) for prop in properties]) # Build where clause where_xml = where_conditions if where_conditions else "" # Build order by clause orderby_xml = "" if order_by: order_elements = [] for prop, direction in order_by: prop_element = self._property_to_xml(prop) dir_element = ( "" if direction.lower() == "ascending" else "" ) order_elements.append(f"{prop_element}{dir_element}") orderby_xml = "\n".join(order_elements) else: orderby_xml = "" # Build limit clause limit_xml = ( f"{limit}" if limit else "" ) # Construct the full SEARCH XML search_xml = f""" {prop_xml} {scope_path} infinity {where_xml} {orderby_xml} {limit_xml} """ return search_xml def _property_to_xml(self, prop: str) -> str: """Convert a property name to its XML element.""" # Handle properties with namespace prefixes if prop.startswith("{"): # Already a full namespace namespace_end = prop.index("}") namespace = prop[1:namespace_end] local_name = prop[namespace_end + 1 :] # Map namespace URIs to prefixes ns_map = { "DAV:": "d", "http://owncloud.org/ns": "oc", "http://nextcloud.org/ns": "nc", } prefix = ns_map.get(namespace, "d") return f"<{prefix}:{local_name}/>" else: # Guess namespace based on common properties if prop in [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "quota-available-bytes", "quota-used-bytes", ]: return f"" elif prop in [ "fileid", "size", "permissions", "favorite", "tags", "owner-id", "owner-display-name", "share-types", "checksums", "comments-count", "comments-unread", ]: return f"" else: # Assume nc namespace for newer properties return f"" def _parse_search_response( self, xml_content: bytes, scope: str ) -> List[Dict[str, Any]]: """Parse the XML response from a SEARCH request.""" root = ET.fromstring(xml_content) items = [] # Process each response element responses = root.findall(".//{DAV:}response") for response_elem in responses: href = response_elem.find(".//{DAV:}href") if href is None: continue # Extract file/directory path from href href_text = href.text or "" # Remove the /remote.php/dav/files/username/ prefix to get relative path path_parts = href_text.split("/files/") if len(path_parts) > 1: # Get the path after username path_after_user = "/".join(path_parts[1].split("/")[1:]) relative_path = path_after_user.rstrip("/") else: relative_path = href_text.rstrip("/").split("/")[-1] # Get properties propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: continue prop = propstat.find(".//{DAV:}prop") if prop is None: continue # Build item dictionary item = {"path": relative_path, "href": href_text} # Extract all properties for child in prop: tag = child.tag value = child.text # Remove namespace from tag if "}" in tag: tag = tag.split("}", 1)[1] # Handle special properties if tag == "resourcetype": item["is_directory"] = child.find(".//{DAV:}collection") is not None elif tag == "getcontentlength": item["size"] = int(value) if value else 0 elif tag == "displayname": item["name"] = value elif tag == "getcontenttype": item["content_type"] = value elif tag == "getlastmodified": item["last_modified"] = value elif tag == "getetag": item["etag"] = value.strip('"') if value else None elif tag == "fileid": item["file_id"] = int(value) if value else None elif tag == "favorite": item["is_favorite"] = value == "1" elif tag == "tags": # Tags can be comma-separated or have multiple child elements if value: # Handle comma-separated tags item["tags"] = [ t.strip() for t in value.split(",") if t.strip() ] else: # Check for child tag elements (alternative format) tag_elements = child.findall(".//{http://owncloud.org/ns}tag") if tag_elements: item["tags"] = [t.text for t in tag_elements if t.text] else: item["tags"] = [] elif tag == "permissions": item["permissions"] = value elif tag == "size": # oc:size includes folder sizes item["total_size"] = int(value) if value else 0 else: # Store other properties as-is item[tag] = value items.append(item) return items async def find_by_name( self, pattern: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by name pattern using LIKE matching. Args: pattern: Name pattern to search for (supports % wildcard) scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of matching files/directories Examples: # Find all .txt files results = await find_by_name("%.txt") # Find files starting with "report" results = await find_by_name("report%") """ where_conditions = f""" {pattern} """ return await self.search_files( scope=scope, where_conditions=where_conditions, limit=limit ) async def find_by_type( self, mime_type: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by MIME type. Args: mime_type: MIME type to search for (supports % wildcard, e.g., "image/%") scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of matching files Examples: # Find all images results = await find_by_type("image/%") # Find all PDFs results = await find_by_type("application/pdf") """ where_conditions = f""" {mime_type} """ return await self.search_files( scope=scope, where_conditions=where_conditions, limit=limit ) async def list_favorites( self, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """List all favorite files. Args: scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of favorite files/directories Examples: # List all favorites results = await list_favorites() # List favorites in a specific folder results = await list_favorites(scope="Documents") """ # Use REPORT method for favorites as it's more efficient # But we can also use SEARCH as fallback where_conditions = """ 1 """ # Request favorite property properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "fileid", "favorite", ] return await self.search_files( scope=scope, where_conditions=where_conditions, properties=properties, limit=limit, ) async def find_by_tag( self, tag_name: str, scope: str = "", limit: Optional[int] = None ) -> List[Dict[str, Any]]: """Find files by tag name. DEPRECATED: Use NextcloudClient.find_files_by_tag() instead, which uses the proper OCS Tags API rather than WebDAV SEARCH. Args: tag_name: Tag to filter by (e.g., "vector-index") scope: Directory path to search in (empty string for user root) limit: Maximum number of results to return Returns: List of files/directories with the specified tag Examples: # Find all files tagged with "vector-index" results = await find_by_tag("vector-index") # Find tagged files in a specific folder results = await find_by_tag("vector-index", scope="Documents") """ # Use LIKE for tag matching since tags can be comma-separated where_conditions = f""" %{tag_name}% """ # Request tag property along with standard properties properties = [ "displayname", "getcontentlength", "getcontenttype", "getlastmodified", "resourcetype", "getetag", "fileid", "tags", ] return await self.search_files( scope=scope, where_conditions=where_conditions, properties=properties, limit=limit, ) async def _get_file_info_by_id(self, file_id: int) -> Dict[str, Any]: """Get file information by Nextcloud file ID using WebDAV. Args: file_id: Nextcloud internal file ID Returns: File information dictionary with path, size, content_type, etc. Raises: HTTPStatusError: If file not found or request fails """ # Nextcloud allows accessing files by ID via special meta endpoint meta_path = f"/remote.php/dav/meta/{file_id}/" propfind_body = """ """ headers = {"Depth": "0", "Content-Type": "text/xml", "OCS-APIRequest": "true"} response = await self._make_request( "PROPFIND", meta_path, content=propfind_body, headers=headers ) response.raise_for_status() # Parse the XML response root = ET.fromstring(response.content) responses = root.findall(".//{DAV:}response") if not responses: raise RuntimeError(f"File ID {file_id} not found") response_elem = responses[0] href = response_elem.find(".//{DAV:}href") if href is None: raise RuntimeError(f"No href in response for file ID {file_id}") propstat = response_elem.find(".//{DAV:}propstat") if propstat is None: raise RuntimeError(f"No propstat for file ID {file_id}") prop = propstat.find(".//{DAV:}prop") if prop is None: raise RuntimeError(f"No prop for file ID {file_id}") # Extract file path from displayname or construct from file ID displayname_elem = prop.find(".//{DAV:}displayname") name = ( displayname_elem.text if displayname_elem is not None else f"file_{file_id}" ) # Get file properties size_elem = prop.find(".//{DAV:}getcontentlength") size = int(size_elem.text) if size_elem is not None and size_elem.text else 0 content_type_elem = prop.find(".//{DAV:}getcontenttype") content_type = content_type_elem.text if content_type_elem is not None else None modified_elem = prop.find(".//{DAV:}getlastmodified") modified = modified_elem.text if modified_elem is not None else None etag_elem = prop.find(".//{DAV:}getetag") etag = ( etag_elem.text.strip('"') if etag_elem is not None and etag_elem.text else None ) # Check if it's a directory resourcetype = prop.find(".//{DAV:}resourcetype") is_directory = ( resourcetype is not None and resourcetype.find(".//{DAV:}collection") is not None ) # Try to get actual file path - meta endpoint doesn't give us the real path # so we'll construct a reasonable path from the name # The calling code in NextcloudClient will have the context to determine the actual path file_info = { "name": name, "path": f"/{name}", # Placeholder - caller should use WebDAV to get real path if needed "size": size, "content_type": content_type, "last_modified": modified, "etag": etag, "is_directory": is_directory, "file_id": file_id, } logger.debug(f"Retrieved file info for ID {file_id}: {name}") return file_info async def get_tag_by_name(self, tag_name: str) -> dict[str, Any] | None: """Get a system tag by its name via WebDAV. Args: tag_name: Name of the tag to find (case-sensitive) Returns: Tag dictionary if found, None otherwise """ # Use WebDAV PROPFIND to list all systemtags propfind_body = """ """ response = await self._client.request( "PROPFIND", "/remote.php/dav/systemtags/", headers={"Depth": "1"}, content=propfind_body, ) response.raise_for_status() # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } for response_elem in root.findall("d:response", ns): href = response_elem.find("d:href", ns) if href is None or href.text == "/remote.php/dav/systemtags/": # Skip the collection itself continue propstat = response_elem.find("d:propstat", ns) if propstat is None: continue prop = propstat.find("d:prop", ns) if prop is None: continue # Extract tag properties tag_id_elem = prop.find("oc:id", ns) display_name_elem = prop.find("oc:display-name", ns) user_visible_elem = prop.find("oc:user-visible", ns) user_assignable_elem = prop.find("oc:user-assignable", ns) if display_name_elem is not None and display_name_elem.text == tag_name: tag_info = { "id": int(tag_id_elem.text) if tag_id_elem is not None and tag_id_elem.text is not None else None, "name": display_name_elem.text, "userVisible": user_visible_elem.text.lower() == "true" if user_visible_elem is not None and user_visible_elem.text is not None else True, "userAssignable": user_assignable_elem.text.lower() == "true" if user_assignable_elem is not None and user_assignable_elem.text is not None else True, } logger.debug(f"Found tag '{tag_name}' with ID {tag_info['id']}") return tag_info logger.debug(f"Tag '{tag_name}' not found") return None async def get_files_by_tag(self, tag_id: int) -> list[dict[str, Any]]: """Get all files tagged with a specific system tag via WebDAV REPORT. Args: tag_id: Numeric ID of the tag Returns: List of file info dictionaries with path, size, content_type, etc. """ # Use WebDAV REPORT method with systemtag filter, requesting all properties report_body = f""" {tag_id} """ response = await self._client.request( "REPORT", f"{self._get_webdav_base_path()}/", content=report_body, ) response.raise_for_status() # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } files = [] for response_elem in root.findall("d:response", ns): # Extract href (file path) href_elem = response_elem.find("d:href", ns) if href_elem is None or not href_elem.text: continue propstat = response_elem.find("d:propstat", ns) if propstat is None: continue prop = propstat.find("d:prop", ns) if prop is None: continue # Extract all properties fileid_elem = prop.find("oc:fileid", ns) displayname_elem = prop.find("d:displayname", ns) contentlength_elem = prop.find("d:getcontentlength", ns) contenttype_elem = prop.find("d:getcontenttype", ns) lastmodified_elem = prop.find("d:getlastmodified", ns) etag_elem = prop.find("d:getetag", ns) if fileid_elem is None or not fileid_elem.text: continue # Decode href path and extract the file path href_path = unquote(href_elem.text) # Remove WebDAV prefix to get user-relative path webdav_prefix = f"/remote.php/dav/files/{self.username}/" file_path = href_path.replace(webdav_prefix, "/") # Parse last modified timestamp last_modified_timestamp = None if lastmodified_elem is not None and lastmodified_elem.text: try: dt = parsedate_to_datetime(lastmodified_elem.text) last_modified_timestamp = int(dt.timestamp()) except Exception: pass file_info = { "id": int(fileid_elem.text), "path": file_path, "name": displayname_elem.text if displayname_elem is not None else file_path.split("/")[-1], "size": int(contentlength_elem.text) if contentlength_elem is not None and contentlength_elem.text else 0, "content_type": contenttype_elem.text if contenttype_elem is not None else "", "last_modified": lastmodified_elem.text if lastmodified_elem is not None else None, "last_modified_timestamp": last_modified_timestamp, "etag": etag_elem.text if etag_elem is not None else None, } files.append(file_info) logger.debug(f"Found {len(files)} files with tag ID {tag_id}") return files async def get_file_info(self, path: str) -> dict[str, Any] | None: """Get file info including file ID via WebDAV PROPFIND. Args: path: Path to the file (relative to user's files directory) Returns: File info dictionary with id, name, size, content_type, etc. Returns None if file not found. """ webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}" propfind_body = """ """ try: response = await self._client.request( "PROPFIND", webdav_path, headers={"Depth": "0"}, content=propfind_body, ) response.raise_for_status() except HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"File not found: {path}") return None raise # Parse XML response root = ET.fromstring(response.content) ns = { "d": "DAV:", "oc": "http://owncloud.org/ns", } response_elem = root.find("d:response", ns) if response_elem is None: return None propstat = response_elem.find("d:propstat", ns) if propstat is None: return None prop = propstat.find("d:prop", ns) if prop is None: return None # Extract properties fileid_elem = prop.find("oc:fileid", ns) displayname_elem = prop.find("d:displayname", ns) contentlength_elem = prop.find("d:getcontentlength", ns) contenttype_elem = prop.find("d:getcontenttype", ns) lastmodified_elem = prop.find("d:getlastmodified", ns) etag_elem = prop.find("d:getetag", ns) resourcetype_elem = prop.find("d:resourcetype", ns) is_directory = ( resourcetype_elem is not None and resourcetype_elem.find("d:collection", ns) is not None ) file_info = { "id": int(fileid_elem.text) if fileid_elem is not None and fileid_elem.text is not None else None, "path": path, "name": displayname_elem.text if displayname_elem is not None else path.split("/")[-1], "size": int(contentlength_elem.text) if contentlength_elem is not None and contentlength_elem.text else 0, "content_type": contenttype_elem.text if contenttype_elem is not None else "", "last_modified": lastmodified_elem.text if lastmodified_elem is not None else None, "etag": etag_elem.text.strip('"') if etag_elem is not None and etag_elem.text else None, "is_directory": is_directory, } logger.debug(f"Got file info for '{path}': id={file_info['id']}") return file_info async def create_tag( self, name: str, user_visible: bool = True, user_assignable: bool = True, ) -> dict[str, Any]: """Create a system tag via WebDAV. Args: name: Name of the tag to create user_visible: Whether the tag is visible to users user_assignable: Whether users can assign this tag Returns: Tag dictionary with id, name, userVisible, userAssignable Raises: HTTPStatusError: If tag creation fails (409 if already exists) """ # Use WebDAV POST with JSON body to create tag response = await self._client.post( "/remote.php/dav/systemtags/", headers={"Content-Type": "application/json"}, json={ "name": name, "userVisible": user_visible, "userAssignable": user_assignable, }, ) response.raise_for_status() # Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42) content_location = response.headers.get("Content-Location", "") tag_id = None if content_location: # Extract the numeric ID from the path try: tag_id = int(content_location.rstrip("/").split("/")[-1]) except (ValueError, IndexError): pass tag_info = { "id": tag_id, "name": name, "userVisible": user_visible, "userAssignable": user_assignable, } logger.info(f"Created tag '{name}' with ID {tag_info['id']}") return tag_info async def get_or_create_tag( self, name: str, user_visible: bool = True, user_assignable: bool = True, ) -> dict[str, Any]: """Get a tag by name, creating it if it doesn't exist. Args: name: Name of the tag user_visible: Whether the tag is visible to users (for creation) user_assignable: Whether users can assign this tag (for creation) Returns: Tag dictionary with id, name, userVisible, userAssignable """ # First try to get existing tag existing_tag = await self.get_tag_by_name(name) if existing_tag: logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}") return existing_tag # Create new tag try: return await self.create_tag(name, user_visible, user_assignable) except HTTPStatusError as e: if e.response.status_code == 409: # Tag was created between our check and creation, fetch it existing_tag = await self.get_tag_by_name(name) if existing_tag: return existing_tag raise async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool: """Assign a system tag to a file. Args: file_id: Numeric file ID tag_id: Numeric tag ID Returns: True if tag was assigned successfully (or already assigned) Raises: HTTPStatusError: If tag assignment fails """ response = await self._client.request( "PUT", f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}", headers={"Content-Length": "0"}, content=b"", ) # 201 = Created (new assignment), 409 = Conflict (already assigned) if response.status_code in (201, 409): logger.info(f"Tagged file {file_id} with tag {tag_id}") return True response.raise_for_status() return True async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool: """Remove a system tag from a file. Args: file_id: Numeric file ID tag_id: Numeric tag ID Returns: True if tag was removed successfully (or wasn't assigned) Raises: HTTPStatusError: If tag removal fails """ response = await self._client.request( "DELETE", f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}", ) # 204 = No Content (removed), 404 = Not Found (wasn't assigned) if response.status_code in (204, 404): logger.info(f"Removed tag {tag_id} from file {file_id}") return True response.raise_for_status() return True