feat: Add text processing background worker for telling client about progress

This commit is contained in:
Chris Coutinho
2025-10-25 19:52:45 +02:00
parent 2147fc1696
commit a36038422b
13 changed files with 487 additions and 21 deletions
+1 -1
View File
@@ -59,7 +59,7 @@ services:
# Unstructured API runs on port 8000 internally
# We expose it on 8002 externally to avoid conflict
profiles:
- text-extraction
- unstructured
mcp:
build: .
+5
View File
@@ -57,6 +57,11 @@ UNSTRUCTURED_STRATEGY=auto
# Common: eng=English, deu=German, fra=French, spa=Spanish
UNSTRUCTURED_LANGUAGES=eng,deu
# Progress reporting interval in seconds (default: 10)
# During long-running OCR operations, progress notifications are sent to the MCP client
# at this interval to prevent timeouts and provide status updates
PROGRESS_INTERVAL=10
# ============================================
# Tesseract Processor (Local OCR)
# ============================================
+1
View File
@@ -72,6 +72,7 @@ def initialize_document_processors():
timeout=unst_config["timeout"],
default_strategy=unst_config["strategy"],
default_languages=unst_config["languages"],
progress_interval=unst_config.get("progress_interval", 10),
)
registry.register(processor, priority=10)
logger.info(f"Registered Unstructured processor: {unst_config['api_url']}")
+1
View File
@@ -90,6 +90,7 @@ def get_document_processor_config() -> dict[str, Any]:
for lang in os.getenv("UNSTRUCTURED_LANGUAGES", "eng,deu").split(",")
if lang.strip()
],
"progress_interval": int(os.getenv("PROGRESS_INTERVAL", "10")),
}
# Tesseract configuration
@@ -1,6 +1,7 @@
"""Abstract base class for document processing plugins."""
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from pydantic import BaseModel
@@ -71,6 +72,9 @@ class DocumentProcessor(ABC):
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process a document and extract text.
@@ -79,6 +83,11 @@ class DocumentProcessor(ABC):
content_type: MIME type of the document
filename: Optional filename for format detection
options: Processor-specific options (e.g., OCR language, strategy)
progress_callback: Optional async callback for progress updates.
Called as: await progress_callback(progress, total, message)
- progress: Current progress value (monotonically increasing)
- total: Optional total value (None if unknown)
- message: Optional human-readable status message
Returns:
ProcessingResult with extracted text and metadata
@@ -1,6 +1,7 @@
"""Generic HTTP API processor wrapper for custom document processing services."""
import logging
from collections.abc import Awaitable, Callable
from typing import Any, Optional
import httpx
@@ -70,6 +71,9 @@ class CustomHTTPProcessor(DocumentProcessor):
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process via custom HTTP API.
@@ -1,6 +1,7 @@
"""Central registry for document processors."""
import logging
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from .base import DocumentProcessor, ProcessingResult, ProcessorError
@@ -113,6 +114,9 @@ class ProcessorRegistry:
filename: Optional[str] = None,
processor_name: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process a document using available processors.
@@ -122,6 +126,7 @@ class ProcessorRegistry:
filename: Optional filename for format detection
processor_name: Force specific processor (or None for auto-select)
options: Processing options passed to processor
progress_callback: Optional async callback for progress updates
Returns:
ProcessingResult with extracted text and metadata
@@ -148,7 +153,9 @@ class ProcessorRegistry:
logger.info(f"Processing with '{processor.name}' processor")
# Process
return await processor.process(content, content_type, filename, options)
return await processor.process(
content, content_type, filename, options, progress_callback
)
# Global registry instance
@@ -2,6 +2,7 @@
import logging
import shutil
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from .base import DocumentProcessor, ProcessingResult, ProcessorError
@@ -86,6 +87,9 @@ class TesseractProcessor(DocumentProcessor):
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process image via Tesseract OCR.
@@ -2,8 +2,11 @@
import io
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any, Optional
import anyio
import httpx
from .base import DocumentProcessor, ProcessingResult, ProcessorError
@@ -47,6 +50,7 @@ class UnstructuredProcessor(DocumentProcessor):
timeout: int = 120,
default_strategy: str = "auto",
default_languages: Optional[list[str]] = None,
progress_interval: int = 10,
):
"""Initialize Unstructured processor.
@@ -55,15 +59,18 @@ class UnstructuredProcessor(DocumentProcessor):
timeout: Request timeout in seconds (default: 120)
default_strategy: Default parsing strategy - "auto", "fast", or "hi_res"
default_languages: Default OCR language codes (e.g., ["eng", "deu"])
progress_interval: Seconds between progress updates (default: 10)
"""
self.api_url = api_url
self.timeout = timeout
self.default_strategy = default_strategy
self.default_languages = default_languages or ["eng"]
self.progress_interval = progress_interval
logger.info(
f"Initialized UnstructuredProcessor: {api_url}, "
f"strategy={default_strategy}, languages={self.default_languages}"
f"strategy={default_strategy}, languages={self.default_languages}, "
f"progress_interval={progress_interval}s"
)
@property
@@ -74,23 +81,65 @@ class UnstructuredProcessor(DocumentProcessor):
def supported_mime_types(self) -> set[str]:
return self.SUPPORTED_TYPES
async def process(
async def _run_progress_poller(
self,
stop_event: anyio.Event,
progress_callback: Callable[
[float, Optional[float], Optional[str]], Awaitable[None]
],
start_time: float,
):
"""Run progress poller that reports status every N seconds.
Args:
stop_event: Event to signal when processing is complete
progress_callback: Async callback to report progress
start_time: Time when processing started (from time.time())
"""
logger.debug("Starting progress poller")
while not stop_event.is_set():
try:
# Wait for the event to be set, with a timeout equal to progress_interval
with anyio.fail_after(self.progress_interval):
await stop_event.wait()
# If wait() finished, the event was set (processing complete)
break
except TimeoutError:
# Timeout occurred - time to send a progress update
if not stop_event.is_set(): # Double-check in case of race condition
elapsed = int(time.time() - start_time)
message = (
f"Processing document with unstructured... ({elapsed}s elapsed)"
)
try:
await progress_callback(
progress=float(elapsed),
total=None, # Unknown total duration
message=message,
)
logger.debug(f"Progress update sent: {elapsed}s elapsed")
except Exception as e:
logger.warning(f"Failed to send progress update: {e}")
logger.debug("Progress poller stopped")
async def _make_api_request(
self,
content: bytes,
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
filename: Optional[str],
strategy: str,
languages: list[str],
extract_image_block_types: Optional[list[str]],
) -> ProcessingResult:
"""Process document via Unstructured API.
"""Make the actual API request to Unstructured.
Args:
content: Document bytes
content_type: MIME type
filename: Optional filename for format detection
options: Processing options:
- strategy: "auto", "fast", or "hi_res" (default: from init)
- languages: List of language codes (default: from init)
- extract_image_block_types: Types of image elements to extract
filename: Optional filename
strategy: Processing strategy
languages: OCR languages
extract_image_block_types: Image element types to extract
Returns:
ProcessingResult with extracted text and metadata
@@ -98,13 +147,6 @@ class UnstructuredProcessor(DocumentProcessor):
Raises:
ProcessorError: If processing fails
"""
options = options or {}
# Extract options with defaults
strategy = options.get("strategy", self.default_strategy)
languages = options.get("languages", self.default_languages)
extract_image_block_types = options.get("extract_image_block_types")
# Prepare multipart request
files = {
"files": (
@@ -178,6 +220,81 @@ class UnstructuredProcessor(DocumentProcessor):
logger.error(f"Unstructured API processing failed: {e}")
raise ProcessorError(f"Processing failed: {str(e)}") from e
async def process(
self,
content: bytes,
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process document via Unstructured API.
Args:
content: Document bytes
content_type: MIME type
filename: Optional filename for format detection
options: Processing options:
- strategy: "auto", "fast", or "hi_res" (default: from init)
- languages: List of language codes (default: from init)
- extract_image_block_types: Types of image elements to extract
progress_callback: Optional async callback for progress updates
Returns:
ProcessingResult with extracted text and metadata
Raises:
ProcessorError: If processing fails
"""
options = options or {}
# Extract options with defaults
strategy = options.get("strategy", self.default_strategy)
languages = options.get("languages", self.default_languages)
extract_image_block_types = options.get("extract_image_block_types")
# If no progress callback, just make the request directly
if progress_callback is None:
return await self._make_api_request(
content=content,
content_type=content_type,
filename=filename,
strategy=strategy,
languages=languages,
extract_image_block_types=extract_image_block_types,
)
# With progress callback: run API request + progress poller concurrently
stop_event = anyio.Event()
start_time = time.time()
result = None
async def capture_result():
nonlocal result
try:
result = await self._make_api_request(
content=content,
content_type=content_type,
filename=filename,
strategy=strategy,
languages=languages,
extract_image_block_types=extract_image_block_types,
)
finally:
# Signal poller to stop after API request completes
stop_event.set()
# Run both tasks concurrently using anyio task groups
async with anyio.create_task_group() as tg:
tg.start_soon(capture_result)
tg.start_soon(
self._run_progress_poller, stop_event, progress_callback, start_time
)
return result
async def health_check(self) -> bool:
"""Check if Unstructured API is available.
+4 -1
View File
@@ -85,7 +85,10 @@ def configure_webdav_tools(mcp: FastMCP):
try:
logger.info(f"Parsing document '{path}' of type '{content_type}'")
parsed_text, metadata = await parse_document(
content, content_type, filename=path
content,
content_type,
filename=path,
progress_callback=ctx.report_progress,
)
return {
"path": path,
@@ -2,6 +2,7 @@
import base64
import logging
from collections.abc import Awaitable, Callable
from typing import Optional, Tuple
from nextcloud_mcp_server.config import get_document_processor_config
@@ -35,7 +36,12 @@ def is_parseable_document(content_type: Optional[str]) -> bool:
async def parse_document(
content: bytes, content_type: Optional[str], filename: Optional[str] = None
content: bytes,
content_type: Optional[str],
filename: Optional[str] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> Tuple[str, dict]:
"""Parse a document using registered processors.
@@ -46,6 +52,7 @@ async def parse_document(
content: The document content as bytes
content_type: The MIME type of the document
filename: Optional filename to help with format detection
progress_callback: Optional async callback for progress updates during long operations
Returns:
Tuple of (parsed_text, metadata) where:
@@ -73,6 +80,7 @@ async def parse_document(
content=content,
content_type=content_type,
filename=filename,
progress_callback=progress_callback,
)
logger.info(f"Successfully parsed document with '{result.processor}' processor")
@@ -0,0 +1,143 @@
"""Integration tests for document processing with progress notifications."""
import io
import pytest
from PIL import Image
pytestmark = pytest.mark.integration
class TestDocumentProcessingProgress:
"""Test document processing with progress notifications."""
async def test_unstructured_processor_with_progress_callback(self, nc_client):
"""Test that UnstructuredProcessor calls progress callback during processing."""
import os
# Skip if unstructured is not enabled
if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true":
pytest.skip("Unstructured processor not enabled")
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
# Track progress callback invocations
progress_updates = []
async def track_progress(progress: float, total: float | None, message: str):
progress_updates.append(
{"progress": progress, "total": total, "message": message}
)
# Create processor configured to use local unstructured service
processor = UnstructuredProcessor(
api_url=os.getenv("UNSTRUCTURED_API_URL", "http://unstructured:8000"),
timeout=120,
progress_interval=2, # 2 second intervals for testing
)
# Create a simple test image (which requires OCR processing)
# This should take long enough to trigger at least one progress update
img = Image.new("RGB", (400, 200), color=(73, 109, 137))
buffer = io.BytesIO()
img.save(buffer, format="PNG")
test_image = buffer.getvalue()
# Process with progress callback
result = await processor.process(
content=test_image,
content_type="image/png",
filename="test.png",
progress_callback=track_progress,
)
# Verify processing succeeded
assert result.success is True
assert result.processor == "unstructured"
assert isinstance(result.text, str)
# Note: Progress updates may or may not occur depending on processing speed
# If updates occurred, verify their structure
if progress_updates:
for update in progress_updates:
assert isinstance(update["progress"], float)
assert update["total"] is None # Unknown total
assert "Processing document with unstructured" in update["message"]
assert "elapsed" in update["message"]
async def test_webdav_read_file_sends_progress_notifications(
self, nc_mcp_client, nc_client
):
"""Test that reading a document via WebDAV MCP tool sends progress notifications."""
import os
# Skip if document processing is not enabled
if os.getenv("ENABLE_DOCUMENT_PROCESSING", "false").lower() != "true":
pytest.skip("Document processing not enabled")
# Create a test image file in Nextcloud via WebDAV
from PIL import Image
img = Image.new("RGB", (400, 200), color=(100, 150, 200))
buffer = io.BytesIO()
img.save(buffer, format="PNG")
test_image = buffer.getvalue()
# Upload test file
test_path = "test_progress.png"
await nc_client.webdav.write_file(test_path, test_image, "image/png")
try:
# Read file via MCP tool (which should trigger document processing)
# The MCP client will automatically track progress notifications
result = await nc_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": test_path}
)
# Note: FastMCP progress notifications are sent automatically by ctx.report_progress
# We can't easily capture them in this test without mocking the MCP transport layer
# The important thing is that the code path is exercised without errors
assert result.isError is False
finally:
# Cleanup
try:
await nc_client.webdav.delete_resource(test_path)
except Exception:
pass # Ignore cleanup errors
async def test_progress_callback_not_required(self, nc_client):
"""Test that processing works without progress callback (backward compatibility)."""
import os
if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true":
pytest.skip("Unstructured processor not enabled")
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
processor = UnstructuredProcessor(
api_url=os.getenv("UNSTRUCTURED_API_URL", "http://unstructured:8000"),
timeout=120,
)
# Create simple test image
img = Image.new("RGB", (200, 100), color=(50, 100, 150))
buffer = io.BytesIO()
img.save(buffer, format="PNG")
test_image = buffer.getvalue()
# Process WITHOUT progress callback
result = await processor.process(
content=test_image,
content_type="image/png",
filename="test.png",
progress_callback=None, # Explicitly None
)
# Should still work
assert result.success is True
assert result.processor == "unstructured"
+164
View File
@@ -0,0 +1,164 @@
"""Unit tests for progress notification system."""
import time
from unittest.mock import AsyncMock
import anyio
import pytest
pytestmark = pytest.mark.unit
class TestProgressNotification:
"""Test progress notification in document processors."""
async def test_progress_callback_called_during_processing(self):
"""Test that progress callback is called at intervals during processing."""
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
# Mock progress callback to track calls
progress_callback = AsyncMock()
# Create processor with 1-second interval for faster testing
processor = UnstructuredProcessor(
api_url="http://test:8000",
timeout=10,
progress_interval=1,
)
# Create a mock event and start time
stop_event = anyio.Event()
start_time = time.time()
# Run the poller for 3 seconds, then stop it
async def stop_after_delay():
await anyio.sleep(3.5)
stop_event.set()
# Run poller and stopper concurrently
async with anyio.create_task_group() as tg:
tg.start_soon(
processor._run_progress_poller,
stop_event,
progress_callback,
start_time,
)
tg.start_soon(stop_after_delay)
# Verify progress callback was called at least 3 times (1s, 2s, 3s)
assert progress_callback.call_count >= 3
# Verify each call had correct structure
for call in progress_callback.call_args_list:
# Calls are made with keyword arguments
assert "progress" in call.kwargs
assert "total" in call.kwargs
assert "message" in call.kwargs
progress = call.kwargs["progress"]
total = call.kwargs["total"]
message = call.kwargs["message"]
assert isinstance(progress, float)
assert total is None # Unknown total for unstructured
assert "Processing document with unstructured" in message
assert "elapsed" in message
async def test_progress_poller_stops_when_event_set(self):
"""Test that progress poller stops immediately when event is set."""
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
progress_callback = AsyncMock()
processor = UnstructuredProcessor(
api_url="http://test:8000",
timeout=10,
progress_interval=10, # Long interval
)
stop_event = anyio.Event()
start_time = time.time()
# Set event immediately
stop_event.set()
# Run poller
await processor._run_progress_poller(stop_event, progress_callback, start_time)
# Should not call progress callback since event was already set
assert progress_callback.call_count == 0
async def test_progress_callback_exception_handled(self):
"""Test that exceptions in progress callback don't crash the poller."""
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
# Mock callback that raises exception
progress_callback = AsyncMock(side_effect=Exception("Callback error"))
processor = UnstructuredProcessor(
api_url="http://test:8000",
timeout=10,
progress_interval=1,
)
stop_event = anyio.Event()
start_time = time.time()
# Run poller for 2 seconds
async def stop_after_delay():
await anyio.sleep(2.5)
stop_event.set()
# Should not raise exception even though callback fails
async with anyio.create_task_group() as tg:
tg.start_soon(
processor._run_progress_poller,
stop_event,
progress_callback,
start_time,
)
tg.start_soon(stop_after_delay)
# Callback should have been called (and failed) at least twice
assert progress_callback.call_count >= 2
async def test_process_without_progress_callback(self):
"""Test that processing works without progress callback (backward compatibility)."""
from nextcloud_mcp_server.document_processors.unstructured import (
UnstructuredProcessor,
)
processor = UnstructuredProcessor(
api_url="http://test:8000",
timeout=10,
progress_interval=1,
)
# Mock the _make_api_request method to avoid actual HTTP call
from unittest.mock import patch
from nextcloud_mcp_server.document_processors.base import ProcessingResult
mock_result = ProcessingResult(
text="Test content",
metadata={"test": "data"},
processor="unstructured",
success=True,
)
with patch.object(
processor, "_make_api_request", return_value=mock_result
) as mock_request:
# Call process without progress_callback
result = await processor.process(
content=b"test", content_type="application/pdf", progress_callback=None
)
# Should call _make_api_request directly
assert result == mock_result
mock_request.assert_called_once()