From a36038422b7c3de1b78623bf1c7b50f1890438dc Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 25 Oct 2025 19:52:45 +0200 Subject: [PATCH] feat: Add text processing background worker for telling client about progress --- docker-compose.yml | 2 +- env.sample | 5 + nextcloud_mcp_server/app.py | 1 + nextcloud_mcp_server/config.py | 1 + .../document_processors/base.py | 9 + .../document_processors/custom_http.py | 4 + .../document_processors/registry.py | 9 +- .../document_processors/tesseract.py | 4 + .../document_processors/unstructured.py | 151 ++++++++++++++-- nextcloud_mcp_server/server/webdav.py | 5 +- nextcloud_mcp_server/utils/document_parser.py | 10 +- .../test_document_processing_progress.py | 143 +++++++++++++++ tests/unit/test_progress_notification.py | 164 ++++++++++++++++++ 13 files changed, 487 insertions(+), 21 deletions(-) create mode 100644 tests/integration/client/test_document_processing_progress.py create mode 100644 tests/unit/test_progress_notification.py diff --git a/docker-compose.yml b/docker-compose.yml index ad13f60..7bc7e1b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,7 +59,7 @@ services: # Unstructured API runs on port 8000 internally # We expose it on 8002 externally to avoid conflict profiles: - - text-extraction + - unstructured mcp: build: . diff --git a/env.sample b/env.sample index 45dce9b..ef79c31 100644 --- a/env.sample +++ b/env.sample @@ -57,6 +57,11 @@ UNSTRUCTURED_STRATEGY=auto # Common: eng=English, deu=German, fra=French, spa=Spanish UNSTRUCTURED_LANGUAGES=eng,deu +# Progress reporting interval in seconds (default: 10) +# During long-running OCR operations, progress notifications are sent to the MCP client +# at this interval to prevent timeouts and provide status updates +PROGRESS_INTERVAL=10 + # ============================================ # Tesseract Processor (Local OCR) # ============================================ diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 960e9e0..0bb61d3 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -72,6 +72,7 @@ def initialize_document_processors(): timeout=unst_config["timeout"], default_strategy=unst_config["strategy"], default_languages=unst_config["languages"], + progress_interval=unst_config.get("progress_interval", 10), ) registry.register(processor, priority=10) logger.info(f"Registered Unstructured processor: {unst_config['api_url']}") diff --git a/nextcloud_mcp_server/config.py b/nextcloud_mcp_server/config.py index af41ee9..2617e58 100644 --- a/nextcloud_mcp_server/config.py +++ b/nextcloud_mcp_server/config.py @@ -90,6 +90,7 @@ def get_document_processor_config() -> dict[str, Any]: for lang in os.getenv("UNSTRUCTURED_LANGUAGES", "eng,deu").split(",") if lang.strip() ], + "progress_interval": int(os.getenv("PROGRESS_INTERVAL", "10")), } # Tesseract configuration diff --git a/nextcloud_mcp_server/document_processors/base.py b/nextcloud_mcp_server/document_processors/base.py index c43f7e9..f812a26 100644 --- a/nextcloud_mcp_server/document_processors/base.py +++ b/nextcloud_mcp_server/document_processors/base.py @@ -1,6 +1,7 @@ """Abstract base class for document processing plugins.""" from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable from typing import Any, Optional from pydantic import BaseModel @@ -71,6 +72,9 @@ class DocumentProcessor(ABC): content_type: str, filename: Optional[str] = None, options: Optional[dict[str, Any]] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, ) -> ProcessingResult: """Process a document and extract text. @@ -79,6 +83,11 @@ class DocumentProcessor(ABC): content_type: MIME type of the document filename: Optional filename for format detection options: Processor-specific options (e.g., OCR language, strategy) + progress_callback: Optional async callback for progress updates. + Called as: await progress_callback(progress, total, message) + - progress: Current progress value (monotonically increasing) + - total: Optional total value (None if unknown) + - message: Optional human-readable status message Returns: ProcessingResult with extracted text and metadata diff --git a/nextcloud_mcp_server/document_processors/custom_http.py b/nextcloud_mcp_server/document_processors/custom_http.py index e34ae45..c1f5c40 100644 --- a/nextcloud_mcp_server/document_processors/custom_http.py +++ b/nextcloud_mcp_server/document_processors/custom_http.py @@ -1,6 +1,7 @@ """Generic HTTP API processor wrapper for custom document processing services.""" import logging +from collections.abc import Awaitable, Callable from typing import Any, Optional import httpx @@ -70,6 +71,9 @@ class CustomHTTPProcessor(DocumentProcessor): content_type: str, filename: Optional[str] = None, options: Optional[dict[str, Any]] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, ) -> ProcessingResult: """Process via custom HTTP API. diff --git a/nextcloud_mcp_server/document_processors/registry.py b/nextcloud_mcp_server/document_processors/registry.py index 97bcb9c..71ef06b 100644 --- a/nextcloud_mcp_server/document_processors/registry.py +++ b/nextcloud_mcp_server/document_processors/registry.py @@ -1,6 +1,7 @@ """Central registry for document processors.""" import logging +from collections.abc import Awaitable, Callable from typing import Any, Optional from .base import DocumentProcessor, ProcessingResult, ProcessorError @@ -113,6 +114,9 @@ class ProcessorRegistry: filename: Optional[str] = None, processor_name: Optional[str] = None, options: Optional[dict[str, Any]] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, ) -> ProcessingResult: """Process a document using available processors. @@ -122,6 +126,7 @@ class ProcessorRegistry: filename: Optional filename for format detection processor_name: Force specific processor (or None for auto-select) options: Processing options passed to processor + progress_callback: Optional async callback for progress updates Returns: ProcessingResult with extracted text and metadata @@ -148,7 +153,9 @@ class ProcessorRegistry: logger.info(f"Processing with '{processor.name}' processor") # Process - return await processor.process(content, content_type, filename, options) + return await processor.process( + content, content_type, filename, options, progress_callback + ) # Global registry instance diff --git a/nextcloud_mcp_server/document_processors/tesseract.py b/nextcloud_mcp_server/document_processors/tesseract.py index 809fcc2..d912868 100644 --- a/nextcloud_mcp_server/document_processors/tesseract.py +++ b/nextcloud_mcp_server/document_processors/tesseract.py @@ -2,6 +2,7 @@ import logging import shutil +from collections.abc import Awaitable, Callable from typing import Any, Optional from .base import DocumentProcessor, ProcessingResult, ProcessorError @@ -86,6 +87,9 @@ class TesseractProcessor(DocumentProcessor): content_type: str, filename: Optional[str] = None, options: Optional[dict[str, Any]] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, ) -> ProcessingResult: """Process image via Tesseract OCR. diff --git a/nextcloud_mcp_server/document_processors/unstructured.py b/nextcloud_mcp_server/document_processors/unstructured.py index b7ca73c..de434c1 100644 --- a/nextcloud_mcp_server/document_processors/unstructured.py +++ b/nextcloud_mcp_server/document_processors/unstructured.py @@ -2,8 +2,11 @@ import io import logging +import time +from collections.abc import Awaitable, Callable from typing import Any, Optional +import anyio import httpx from .base import DocumentProcessor, ProcessingResult, ProcessorError @@ -47,6 +50,7 @@ class UnstructuredProcessor(DocumentProcessor): timeout: int = 120, default_strategy: str = "auto", default_languages: Optional[list[str]] = None, + progress_interval: int = 10, ): """Initialize Unstructured processor. @@ -55,15 +59,18 @@ class UnstructuredProcessor(DocumentProcessor): timeout: Request timeout in seconds (default: 120) default_strategy: Default parsing strategy - "auto", "fast", or "hi_res" default_languages: Default OCR language codes (e.g., ["eng", "deu"]) + progress_interval: Seconds between progress updates (default: 10) """ self.api_url = api_url self.timeout = timeout self.default_strategy = default_strategy self.default_languages = default_languages or ["eng"] + self.progress_interval = progress_interval logger.info( f"Initialized UnstructuredProcessor: {api_url}, " - f"strategy={default_strategy}, languages={self.default_languages}" + f"strategy={default_strategy}, languages={self.default_languages}, " + f"progress_interval={progress_interval}s" ) @property @@ -74,23 +81,65 @@ class UnstructuredProcessor(DocumentProcessor): def supported_mime_types(self) -> set[str]: return self.SUPPORTED_TYPES - async def process( + async def _run_progress_poller( + self, + stop_event: anyio.Event, + progress_callback: Callable[ + [float, Optional[float], Optional[str]], Awaitable[None] + ], + start_time: float, + ): + """Run progress poller that reports status every N seconds. + + Args: + stop_event: Event to signal when processing is complete + progress_callback: Async callback to report progress + start_time: Time when processing started (from time.time()) + """ + logger.debug("Starting progress poller") + while not stop_event.is_set(): + try: + # Wait for the event to be set, with a timeout equal to progress_interval + with anyio.fail_after(self.progress_interval): + await stop_event.wait() + # If wait() finished, the event was set (processing complete) + break + except TimeoutError: + # Timeout occurred - time to send a progress update + if not stop_event.is_set(): # Double-check in case of race condition + elapsed = int(time.time() - start_time) + message = ( + f"Processing document with unstructured... ({elapsed}s elapsed)" + ) + try: + await progress_callback( + progress=float(elapsed), + total=None, # Unknown total duration + message=message, + ) + logger.debug(f"Progress update sent: {elapsed}s elapsed") + except Exception as e: + logger.warning(f"Failed to send progress update: {e}") + logger.debug("Progress poller stopped") + + async def _make_api_request( self, content: bytes, content_type: str, - filename: Optional[str] = None, - options: Optional[dict[str, Any]] = None, + filename: Optional[str], + strategy: str, + languages: list[str], + extract_image_block_types: Optional[list[str]], ) -> ProcessingResult: - """Process document via Unstructured API. + """Make the actual API request to Unstructured. Args: content: Document bytes content_type: MIME type - filename: Optional filename for format detection - options: Processing options: - - strategy: "auto", "fast", or "hi_res" (default: from init) - - languages: List of language codes (default: from init) - - extract_image_block_types: Types of image elements to extract + filename: Optional filename + strategy: Processing strategy + languages: OCR languages + extract_image_block_types: Image element types to extract Returns: ProcessingResult with extracted text and metadata @@ -98,13 +147,6 @@ class UnstructuredProcessor(DocumentProcessor): Raises: ProcessorError: If processing fails """ - options = options or {} - - # Extract options with defaults - strategy = options.get("strategy", self.default_strategy) - languages = options.get("languages", self.default_languages) - extract_image_block_types = options.get("extract_image_block_types") - # Prepare multipart request files = { "files": ( @@ -178,6 +220,81 @@ class UnstructuredProcessor(DocumentProcessor): logger.error(f"Unstructured API processing failed: {e}") raise ProcessorError(f"Processing failed: {str(e)}") from e + async def process( + self, + content: bytes, + content_type: str, + filename: Optional[str] = None, + options: Optional[dict[str, Any]] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, + ) -> ProcessingResult: + """Process document via Unstructured API. + + Args: + content: Document bytes + content_type: MIME type + filename: Optional filename for format detection + options: Processing options: + - strategy: "auto", "fast", or "hi_res" (default: from init) + - languages: List of language codes (default: from init) + - extract_image_block_types: Types of image elements to extract + progress_callback: Optional async callback for progress updates + + Returns: + ProcessingResult with extracted text and metadata + + Raises: + ProcessorError: If processing fails + """ + options = options or {} + + # Extract options with defaults + strategy = options.get("strategy", self.default_strategy) + languages = options.get("languages", self.default_languages) + extract_image_block_types = options.get("extract_image_block_types") + + # If no progress callback, just make the request directly + if progress_callback is None: + return await self._make_api_request( + content=content, + content_type=content_type, + filename=filename, + strategy=strategy, + languages=languages, + extract_image_block_types=extract_image_block_types, + ) + + # With progress callback: run API request + progress poller concurrently + stop_event = anyio.Event() + start_time = time.time() + result = None + + async def capture_result(): + nonlocal result + try: + result = await self._make_api_request( + content=content, + content_type=content_type, + filename=filename, + strategy=strategy, + languages=languages, + extract_image_block_types=extract_image_block_types, + ) + finally: + # Signal poller to stop after API request completes + stop_event.set() + + # Run both tasks concurrently using anyio task groups + async with anyio.create_task_group() as tg: + tg.start_soon(capture_result) + tg.start_soon( + self._run_progress_poller, stop_event, progress_callback, start_time + ) + + return result + async def health_check(self) -> bool: """Check if Unstructured API is available. diff --git a/nextcloud_mcp_server/server/webdav.py b/nextcloud_mcp_server/server/webdav.py index 670318c..eae3292 100644 --- a/nextcloud_mcp_server/server/webdav.py +++ b/nextcloud_mcp_server/server/webdav.py @@ -85,7 +85,10 @@ def configure_webdav_tools(mcp: FastMCP): try: logger.info(f"Parsing document '{path}' of type '{content_type}'") parsed_text, metadata = await parse_document( - content, content_type, filename=path + content, + content_type, + filename=path, + progress_callback=ctx.report_progress, ) return { "path": path, diff --git a/nextcloud_mcp_server/utils/document_parser.py b/nextcloud_mcp_server/utils/document_parser.py index 0095069..887e96c 100644 --- a/nextcloud_mcp_server/utils/document_parser.py +++ b/nextcloud_mcp_server/utils/document_parser.py @@ -2,6 +2,7 @@ import base64 import logging +from collections.abc import Awaitable, Callable from typing import Optional, Tuple from nextcloud_mcp_server.config import get_document_processor_config @@ -35,7 +36,12 @@ def is_parseable_document(content_type: Optional[str]) -> bool: async def parse_document( - content: bytes, content_type: Optional[str], filename: Optional[str] = None + content: bytes, + content_type: Optional[str], + filename: Optional[str] = None, + progress_callback: Optional[ + Callable[[float, Optional[float], Optional[str]], Awaitable[None]] + ] = None, ) -> Tuple[str, dict]: """Parse a document using registered processors. @@ -46,6 +52,7 @@ async def parse_document( content: The document content as bytes content_type: The MIME type of the document filename: Optional filename to help with format detection + progress_callback: Optional async callback for progress updates during long operations Returns: Tuple of (parsed_text, metadata) where: @@ -73,6 +80,7 @@ async def parse_document( content=content, content_type=content_type, filename=filename, + progress_callback=progress_callback, ) logger.info(f"Successfully parsed document with '{result.processor}' processor") diff --git a/tests/integration/client/test_document_processing_progress.py b/tests/integration/client/test_document_processing_progress.py new file mode 100644 index 0000000..d75827e --- /dev/null +++ b/tests/integration/client/test_document_processing_progress.py @@ -0,0 +1,143 @@ +"""Integration tests for document processing with progress notifications.""" + +import io + +import pytest +from PIL import Image + +pytestmark = pytest.mark.integration + + +class TestDocumentProcessingProgress: + """Test document processing with progress notifications.""" + + async def test_unstructured_processor_with_progress_callback(self, nc_client): + """Test that UnstructuredProcessor calls progress callback during processing.""" + import os + + # Skip if unstructured is not enabled + if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true": + pytest.skip("Unstructured processor not enabled") + + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + # Track progress callback invocations + progress_updates = [] + + async def track_progress(progress: float, total: float | None, message: str): + progress_updates.append( + {"progress": progress, "total": total, "message": message} + ) + + # Create processor configured to use local unstructured service + processor = UnstructuredProcessor( + api_url=os.getenv("UNSTRUCTURED_API_URL", "http://unstructured:8000"), + timeout=120, + progress_interval=2, # 2 second intervals for testing + ) + + # Create a simple test image (which requires OCR processing) + # This should take long enough to trigger at least one progress update + img = Image.new("RGB", (400, 200), color=(73, 109, 137)) + buffer = io.BytesIO() + img.save(buffer, format="PNG") + test_image = buffer.getvalue() + + # Process with progress callback + result = await processor.process( + content=test_image, + content_type="image/png", + filename="test.png", + progress_callback=track_progress, + ) + + # Verify processing succeeded + assert result.success is True + assert result.processor == "unstructured" + assert isinstance(result.text, str) + + # Note: Progress updates may or may not occur depending on processing speed + # If updates occurred, verify their structure + if progress_updates: + for update in progress_updates: + assert isinstance(update["progress"], float) + assert update["total"] is None # Unknown total + assert "Processing document with unstructured" in update["message"] + assert "elapsed" in update["message"] + + async def test_webdav_read_file_sends_progress_notifications( + self, nc_mcp_client, nc_client + ): + """Test that reading a document via WebDAV MCP tool sends progress notifications.""" + import os + + # Skip if document processing is not enabled + if os.getenv("ENABLE_DOCUMENT_PROCESSING", "false").lower() != "true": + pytest.skip("Document processing not enabled") + + # Create a test image file in Nextcloud via WebDAV + from PIL import Image + + img = Image.new("RGB", (400, 200), color=(100, 150, 200)) + buffer = io.BytesIO() + img.save(buffer, format="PNG") + test_image = buffer.getvalue() + + # Upload test file + test_path = "test_progress.png" + await nc_client.webdav.write_file(test_path, test_image, "image/png") + + try: + # Read file via MCP tool (which should trigger document processing) + # The MCP client will automatically track progress notifications + result = await nc_mcp_client.call_tool( + "nc_webdav_read_file", arguments={"path": test_path} + ) + + # Note: FastMCP progress notifications are sent automatically by ctx.report_progress + # We can't easily capture them in this test without mocking the MCP transport layer + # The important thing is that the code path is exercised without errors + assert result.isError is False + + finally: + # Cleanup + try: + await nc_client.webdav.delete_resource(test_path) + except Exception: + pass # Ignore cleanup errors + + async def test_progress_callback_not_required(self, nc_client): + """Test that processing works without progress callback (backward compatibility).""" + import os + + if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true": + pytest.skip("Unstructured processor not enabled") + + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + processor = UnstructuredProcessor( + api_url=os.getenv("UNSTRUCTURED_API_URL", "http://unstructured:8000"), + timeout=120, + ) + + # Create simple test image + img = Image.new("RGB", (200, 100), color=(50, 100, 150)) + buffer = io.BytesIO() + img.save(buffer, format="PNG") + test_image = buffer.getvalue() + + # Process WITHOUT progress callback + result = await processor.process( + content=test_image, + content_type="image/png", + filename="test.png", + progress_callback=None, # Explicitly None + ) + + # Should still work + assert result.success is True + assert result.processor == "unstructured" diff --git a/tests/unit/test_progress_notification.py b/tests/unit/test_progress_notification.py new file mode 100644 index 0000000..f3d07c8 --- /dev/null +++ b/tests/unit/test_progress_notification.py @@ -0,0 +1,164 @@ +"""Unit tests for progress notification system.""" + +import time +from unittest.mock import AsyncMock + +import anyio +import pytest + +pytestmark = pytest.mark.unit + + +class TestProgressNotification: + """Test progress notification in document processors.""" + + async def test_progress_callback_called_during_processing(self): + """Test that progress callback is called at intervals during processing.""" + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + # Mock progress callback to track calls + progress_callback = AsyncMock() + + # Create processor with 1-second interval for faster testing + processor = UnstructuredProcessor( + api_url="http://test:8000", + timeout=10, + progress_interval=1, + ) + + # Create a mock event and start time + stop_event = anyio.Event() + start_time = time.time() + + # Run the poller for 3 seconds, then stop it + async def stop_after_delay(): + await anyio.sleep(3.5) + stop_event.set() + + # Run poller and stopper concurrently + async with anyio.create_task_group() as tg: + tg.start_soon( + processor._run_progress_poller, + stop_event, + progress_callback, + start_time, + ) + tg.start_soon(stop_after_delay) + + # Verify progress callback was called at least 3 times (1s, 2s, 3s) + assert progress_callback.call_count >= 3 + + # Verify each call had correct structure + for call in progress_callback.call_args_list: + # Calls are made with keyword arguments + assert "progress" in call.kwargs + assert "total" in call.kwargs + assert "message" in call.kwargs + + progress = call.kwargs["progress"] + total = call.kwargs["total"] + message = call.kwargs["message"] + + assert isinstance(progress, float) + assert total is None # Unknown total for unstructured + assert "Processing document with unstructured" in message + assert "elapsed" in message + + async def test_progress_poller_stops_when_event_set(self): + """Test that progress poller stops immediately when event is set.""" + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + progress_callback = AsyncMock() + processor = UnstructuredProcessor( + api_url="http://test:8000", + timeout=10, + progress_interval=10, # Long interval + ) + + stop_event = anyio.Event() + start_time = time.time() + + # Set event immediately + stop_event.set() + + # Run poller + await processor._run_progress_poller(stop_event, progress_callback, start_time) + + # Should not call progress callback since event was already set + assert progress_callback.call_count == 0 + + async def test_progress_callback_exception_handled(self): + """Test that exceptions in progress callback don't crash the poller.""" + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + # Mock callback that raises exception + progress_callback = AsyncMock(side_effect=Exception("Callback error")) + + processor = UnstructuredProcessor( + api_url="http://test:8000", + timeout=10, + progress_interval=1, + ) + + stop_event = anyio.Event() + start_time = time.time() + + # Run poller for 2 seconds + async def stop_after_delay(): + await anyio.sleep(2.5) + stop_event.set() + + # Should not raise exception even though callback fails + async with anyio.create_task_group() as tg: + tg.start_soon( + processor._run_progress_poller, + stop_event, + progress_callback, + start_time, + ) + tg.start_soon(stop_after_delay) + + # Callback should have been called (and failed) at least twice + assert progress_callback.call_count >= 2 + + async def test_process_without_progress_callback(self): + """Test that processing works without progress callback (backward compatibility).""" + from nextcloud_mcp_server.document_processors.unstructured import ( + UnstructuredProcessor, + ) + + processor = UnstructuredProcessor( + api_url="http://test:8000", + timeout=10, + progress_interval=1, + ) + + # Mock the _make_api_request method to avoid actual HTTP call + from unittest.mock import patch + + from nextcloud_mcp_server.document_processors.base import ProcessingResult + + mock_result = ProcessingResult( + text="Test content", + metadata={"test": "data"}, + processor="unstructured", + success=True, + ) + + with patch.object( + processor, "_make_api_request", return_value=mock_result + ) as mock_request: + # Call process without progress_callback + result = await processor.process( + content=b"test", content_type="application/pdf", progress_callback=None + ) + + # Should call _make_api_request directly + assert result == mock_result + mock_request.assert_called_once()