Added first version of the new document_parser utility and added it to the WebDAV file retrieval logic

This commit is contained in:
yuisheaven
2025-10-04 04:28:24 +02:00
parent 642108ee91
commit 76dce41ed9
5 changed files with 332 additions and 2 deletions
@@ -0,0 +1,154 @@
"""HTTP client for Unstructured API."""
import io
import logging
from typing import Optional, Tuple
import httpx
from nextcloud_mcp_server.config import get_unstructured_api_url
logger = logging.getLogger(__name__)
class UnstructuredClient:
    """Client for interacting with the Unstructured API.

    The Unstructured API provides document parsing capabilities for various
    formats including PDF, DOCX, images with OCR, and more.

    API Documentation:
    https://docs.unstructured.io/api-reference/api-services/api-parameters
    """

    def __init__(self, api_url: Optional[str] = None, timeout: int = 120):
        """Initialize the Unstructured API client.

        Args:
            api_url: Base URL of the Unstructured API. If None or empty, the
                value returned by get_unstructured_api_url() is used.
            timeout: Request timeout in seconds (default: 120 for large documents)

        Raises:
            ValueError: If no API URL was passed and none is configured.
        """
        resolved_url = api_url or get_unstructured_api_url()
        if not resolved_url:
            raise ValueError(
                "Unstructured API URL not configured. "
                "Set ENABLE_UNSTRUCTURED_PARSING=true and UNSTRUCTURED_API_URL in environment."
            )
        # Endpoint paths are appended with a leading "/", so strip trailing
        # slashes here to avoid requesting ".../​/general/v0/general".
        self.api_url = resolved_url.rstrip("/")
        self.timeout = timeout
        logger.info(f"Initialized UnstructuredClient with API URL: {self.api_url}")

    async def partition_document(
        self,
        content: bytes,
        filename: str,
        content_type: Optional[str] = None,
        strategy: str = "auto",
        languages: Optional[list[str]] = None,
        extract_image_block_types: Optional[list[str]] = None,
    ) -> Tuple[str, dict]:
        """Parse a document using the Unstructured API.

        Args:
            content: The document content as bytes
            filename: The filename (used for format detection)
            content_type: Optional MIME type; "application/octet-stream" is
                sent when it is missing
            strategy: Parsing strategy - "auto", "fast", or "hi_res" (OCR-based)
            languages: List of language codes for OCR (e.g., ["eng", "deu"]);
                defaults to ["eng"]
            extract_image_block_types: Types of elements to extract from images

        Returns:
            Tuple of (parsed_text, metadata) where:
            - parsed_text: The non-empty element texts joined by blank lines
            - metadata: Element count, text length, per-type element counts,
              and the strategy/languages that were used

        Raises:
            Exception: If the HTTP request fails, the API answers with an error
                status, or the response cannot be processed. The underlying
                error is chained as ``__cause__``.
        """
        if languages is None:
            languages = ["eng"]  # Default to English

        # Multipart payload: the document itself ...
        files = {
            "files": (
                filename,
                io.BytesIO(content),
                content_type or "application/octet-stream",
            )
        }
        # ... plus the partitioning options as plain form fields.
        data = {
            "strategy": strategy,
            "languages": ",".join(languages),
        }
        if extract_image_block_types:
            data["extract_image_block_types"] = ",".join(extract_image_block_types)

        logger.debug(
            f"Partitioning document '{filename}' with strategy '{strategy}', "
            f"languages: {languages}"
        )

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.api_url}/general/v0/general",
                    files=files,
                    data=data,
                )
                response.raise_for_status()

            elements = response.json()
            # A successful call yields a JSON array of elements. Anything else
            # (e.g. an error object) must not be iterated as if it were one.
            if not isinstance(elements, list):
                raise ValueError(
                    "Unexpected response from Unstructured API: expected a list "
                    f"of elements, got {type(elements).__name__}"
                )

            texts = []
            element_types = {}
            for element in elements:
                text = element.get("text")
                if text:
                    texts.append(text)
                # Count every element by type, including those without text.
                el_type = element.get("type", "unknown")
                element_types[el_type] = element_types.get(el_type, 0) + 1

            parsed_text = "\n\n".join(texts)
            metadata = {
                "element_count": len(elements),
                "text_length": len(parsed_text),
                "element_types": element_types,
                "strategy": strategy,
                "languages": languages,
                "parsing_method": "unstructured_api",
            }
            logger.debug(
                f"Successfully parsed document: {len(elements)} elements, "
                f"{len(parsed_text)} characters"
            )
            return parsed_text, metadata
        except httpx.HTTPError as e:
            logger.error(f"HTTP error calling Unstructured API: {e}")
            raise Exception(f"Failed to parse document via Unstructured API: {str(e)}") from e
        except Exception as e:
            logger.error(f"Unexpected error parsing document: {e}")
            raise Exception(f"Failed to parse document: {str(e)}") from e

    async def health_check(self) -> bool:
        """Check if the Unstructured API is available.

        Returns:
            True if the healthcheck endpoint answers with HTTP 200, False on any
            other status or if the request itself fails.
        """
        try:
            # Short fixed timeout: a health probe should not wait as long as
            # a document upload.
            async with httpx.AsyncClient(timeout=5) as client:
                response = await client.get(f"{self.api_url}/healthcheck")
                return response.status_code == 200
        except Exception as e:
            logger.warning(f"Unstructured API health check failed: {e}")
            return False
+25
View File
@@ -1,4 +1,6 @@
import logging.config
import os
from typing import Optional
LOGGING_CONFIG = {
"version": 1,
@@ -35,3 +37,26 @@ LOGGING_CONFIG = {
def setup_logging():
    """Configure the logging system from the module-level LOGGING_CONFIG dict."""
    logging.config.dictConfig(config=LOGGING_CONFIG)
# Document Parsing Configuration
def get_unstructured_api_url() -> Optional[str]:
    """Resolve the Unstructured API base URL from the environment.

    Parsing counts as enabled when ENABLE_UNSTRUCTURED_PARSING equals "true"
    (case-insensitive); the variable defaults to "true" when unset.

    Returns:
        The value of UNSTRUCTURED_API_URL (default "http://unstructured:8000")
        when parsing is enabled, None otherwise.
    """
    flag = os.environ.get("ENABLE_UNSTRUCTURED_PARSING", "true")
    if flag.lower() != "true":
        return None
    return os.environ.get("UNSTRUCTURED_API_URL", "http://unstructured:8000")
def is_unstructured_parsing_enabled() -> bool:
    """Report whether document parsing via the Unstructured API is switched on.

    Returns:
        True when ENABLE_UNSTRUCTURED_PARSING equals "true" (case-insensitive)
        or is unset, False for any other value.
    """
    raw_flag = os.environ.get("ENABLE_UNSTRUCTURED_PARSING", "true")
    return raw_flag.lower() == "true"
+32 -2
View File
@@ -3,6 +3,8 @@ import logging
from mcp.server.fastmcp import Context, FastMCP
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.utils.document_parser import is_parseable_document, parse_document
from nextcloud_mcp_server.config import is_unstructured_parsing_enabled
logger = logging.getLogger(__name__)
@@ -37,14 +39,21 @@ def configure_webdav_tools(mcp: FastMCP):
path: Full path to the file to read
Returns:
Dict with path, content, content_type, size, and encoding (if binary)
Text files are decoded to UTF-8, binary files are base64 encoded
Dict with path, content, content_type, size, and optional parsing metadata
- Text files are decoded to UTF-8
- Documents (PDF, DOCX, etc.) are parsed and text is extracted
- Other binary files are base64 encoded
Examples:
# Read a text file
result = await nc_webdav_read_file("Documents/readme.txt")
logger.info(result['content']) # Decoded text content
# Read a PDF document (automatically parsed)
result = await nc_webdav_read_file("Documents/report.pdf")
logger.info(result['content']) # Extracted text from PDF
logger.info(result['parsing_metadata']) # Document parsing info
# Read a binary file
result = await nc_webdav_read_file("Images/photo.jpg")
logger.info(result['encoding']) # 'base64'
@@ -52,6 +61,27 @@ def configure_webdav_tools(mcp: FastMCP):
client: NextcloudClient = ctx.request_context.lifespan_context.client
content, content_type = await client.webdav.read_file(path)
# Check if this is a parseable document (PDF, DOCX, etc.)
if (is_unstructured_parsing_enabled() and is_parseable_document(content_type)):
try:
logger.info(f"Parsing document '{path}' of type '{content_type}'")
parsed_text, metadata = await parse_document(
content, content_type, filename=path
)
return {
"path": path,
"content": parsed_text,
"content_type": content_type,
"size": len(content),
"parsed": True,
"parsing_metadata": metadata,
}
except Exception as e:
logger.warning(
f"Failed to parse document '{path}', falling back to base64: {e}"
)
# Fall through to base64 encoding on parse failure
# For text files, decode content for easier viewing
if content_type and content_type.startswith("text/"):
try:
+1
View File
@@ -0,0 +1 @@
"""Utility functions for the Nextcloud MCP server."""
@@ -0,0 +1,120 @@
"""Document parsing utilities based on the "unstructured" microservice"""
import logging
from typing import Optional, Tuple
from nextcloud_mcp_server.config import is_unstructured_parsing_enabled
logger = logging.getLogger(__name__)
# Supported MIME types mapped to a short document-type label. The label is
# reported in parsing metadata and used as the extension of the placeholder
# filename when the caller supplies none.
PARSEABLE_MIME_TYPES = {
    # PDF
    "application/pdf": "pdf",
    # Word
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/msword": "doc",
    # PowerPoint
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": "pptx",
    "application/vnd.ms-powerpoint": "ppt",
    # Excel
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
    "application/vnd.ms-excel": "xls",
    # Rich text, OpenDocument and e-books
    "application/rtf": "rtf",
    "text/rtf": "rtf",
    "application/vnd.oasis.opendocument.text": "odt",
    "application/epub+zip": "epub",
    # E-mail messages
    "message/rfc822": "eml",
    "application/vnd.ms-outlook": "msg",
    # Raster images (handled via OCR); all share the generic "image" label
    **{f"image/{subtype}": "image" for subtype in ("jpeg", "png", "tiff", "bmp")},
}
def is_parseable_document(content_type: Optional[str]) -> bool:
    """Tell whether a MIME type is one of the supported document types.

    Args:
        content_type: The MIME type of the document; may carry parameters
            (e.g., "application/pdf; charset=utf-8") and may be None or empty

    Returns:
        True if the bare MIME type is a key of PARSEABLE_MIME_TYPES,
        False otherwise (including for a missing content type)
    """
    if content_type:
        # Keep only the part before the first ";" and normalise it before
        # the lookup, so parameters and letter case do not matter.
        mime_type, _, _parameters = content_type.partition(";")
        return mime_type.strip().lower() in PARSEABLE_MIME_TYPES
    return False
async def parse_document(
    content: bytes,
    content_type: Optional[str],
    filename: Optional[str] = None,
) -> Tuple[str, dict]:
    """Parse a document using the Unstructured API.

    Args:
        content: The document content as bytes
        content_type: The MIME type of the document
        filename: Optional filename to help with format detection; when
            omitted, "document.<doc_type>" is sent instead

    Returns:
        Tuple of (parsed_text, metadata) where:
        - parsed_text: The extracted text content
        - metadata: Additional metadata about the parsing

        When Unstructured parsing is disabled, no extraction is attempted:
        parsed_text is a placeholder containing the first 200 characters of
        the base64-encoded content and metadata["parsing_method"] is
        "fallback_base64".

    Raises:
        ValueError: If the document type is not supported
        Exception: If parsing is enabled but the Unstructured API call fails
    """
    if not is_parseable_document(content_type):
        raise ValueError(f"Document type '{content_type}' is not supported for parsing")

    base_content_type = content_type.split(";")[0].strip().lower() if content_type else ""
    doc_type = PARSEABLE_MIME_TYPES.get(base_content_type, "unknown")
    logger.debug(f"Parsing document of type '{doc_type}' (MIME: {content_type})")

    if not is_unstructured_parsing_enabled():
        logger.debug("Unstructured parsing is disabled, returning base64 encoded content as fallback")
        import base64

        parsed_text = f"Document could not be parsed. Base64 content: {base64.b64encode(content).decode('ascii')[:200]}..."
        metadata = {
            "document_type": doc_type,
            "mime_type": content_type,
            "element_count": 0,
            "text_length": len(parsed_text),
            "parsing_method": "fallback_base64",
        }
        return parsed_text, metadata

    logger.debug("Using Unstructured API for parsing")
    try:
        # Imported lazily so the HTTP client is only loaded when parsing is
        # actually enabled.
        from nextcloud_mcp_server.client.unstructured_client import UnstructuredClient

        client = UnstructuredClient()
        return await client.partition_document(
            content=content,
            filename=filename or f"document.{doc_type}",
            content_type=content_type,
            strategy="auto",
        )
    except Exception as e:
        # Propagate the failure instead of returning a truncated base64 stub
        # as if it were extracted text: callers need the exception to fall
        # back to their own handling of the raw bytes.
        logger.error(f"Unstructured API parsing failed: {e}")
        raise