311 lines
11 KiB
Python
311 lines
11 KiB
Python
"""Document processor using Unstructured.io API."""
|
|
|
|
import io
|
|
import logging
|
|
import time
|
|
from collections.abc import Awaitable, Callable
|
|
from typing import Any, Optional
|
|
|
|
import anyio
|
|
import httpx
|
|
|
|
from .base import DocumentProcessor, ProcessingResult, ProcessorError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class UnstructuredProcessor(DocumentProcessor):
|
|
"""Document processor using Unstructured.io API.
|
|
|
|
The Unstructured API provides document parsing capabilities for various formats
|
|
including PDF, DOCX, images with OCR, and more.
|
|
|
|
API Documentation: https://docs.unstructured.io/api-reference/api-services/api-parameters
|
|
"""
|
|
|
|
# Supported MIME types for Unstructured
|
|
SUPPORTED_TYPES = {
|
|
"application/pdf",
|
|
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
|
"application/msword",
|
|
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
|
"application/vnd.ms-powerpoint",
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
"application/vnd.ms-excel",
|
|
"application/rtf",
|
|
"text/rtf",
|
|
"application/vnd.oasis.opendocument.text",
|
|
"application/epub+zip",
|
|
"message/rfc822",
|
|
"application/vnd.ms-outlook",
|
|
"image/jpeg",
|
|
"image/png",
|
|
"image/tiff",
|
|
"image/bmp",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
api_url: str,
|
|
timeout: int = 120,
|
|
default_strategy: str = "auto",
|
|
default_languages: Optional[list[str]] = None,
|
|
progress_interval: int = 10,
|
|
):
|
|
"""Initialize Unstructured processor.
|
|
|
|
Args:
|
|
api_url: Unstructured API endpoint
|
|
timeout: Request timeout in seconds (default: 120)
|
|
default_strategy: Default parsing strategy - "auto", "fast", or "hi_res"
|
|
default_languages: Default OCR language codes (e.g., ["eng", "deu"])
|
|
progress_interval: Seconds between progress updates (default: 10)
|
|
"""
|
|
self.api_url = api_url
|
|
self.timeout = timeout
|
|
self.default_strategy = default_strategy
|
|
self.default_languages = default_languages or ["eng"]
|
|
self.progress_interval = progress_interval
|
|
|
|
logger.info(
|
|
f"Initialized UnstructuredProcessor: {api_url}, "
|
|
f"strategy={default_strategy}, languages={self.default_languages}, "
|
|
f"progress_interval={progress_interval}s"
|
|
)
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "unstructured"
|
|
|
|
@property
|
|
def supported_mime_types(self) -> set[str]:
|
|
return self.SUPPORTED_TYPES
|
|
|
|
async def _run_progress_poller(
|
|
self,
|
|
stop_event: anyio.Event,
|
|
progress_callback: Callable[
|
|
[float, Optional[float], Optional[str]], Awaitable[None]
|
|
],
|
|
start_time: float,
|
|
):
|
|
"""Run progress poller that reports status every N seconds.
|
|
|
|
Args:
|
|
stop_event: Event to signal when processing is complete
|
|
progress_callback: Async callback to report progress
|
|
start_time: Time when processing started (from time.time())
|
|
"""
|
|
logger.debug("Starting progress poller")
|
|
while not stop_event.is_set():
|
|
try:
|
|
# Wait for the event to be set, with a timeout equal to progress_interval
|
|
with anyio.fail_after(self.progress_interval):
|
|
await stop_event.wait()
|
|
# If wait() finished, the event was set (processing complete)
|
|
break
|
|
except TimeoutError:
|
|
# Timeout occurred - time to send a progress update
|
|
if not stop_event.is_set(): # Double-check in case of race condition
|
|
elapsed = int(time.time() - start_time)
|
|
message = (
|
|
f"Processing document with unstructured... ({elapsed}s elapsed)"
|
|
)
|
|
try:
|
|
await progress_callback( # type: ignore
|
|
progress=float(elapsed), # type: ignore
|
|
total=None, # Unknown total duration # type: ignore
|
|
message=message, # type: ignore
|
|
)
|
|
logger.debug(f"Progress update sent: {elapsed}s elapsed")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to send progress update: {e}")
|
|
logger.debug("Progress poller stopped")
|
|
|
|
async def _make_api_request(
|
|
self,
|
|
content: bytes,
|
|
content_type: str,
|
|
filename: Optional[str],
|
|
strategy: str,
|
|
languages: list[str],
|
|
extract_image_block_types: Optional[list[str]],
|
|
) -> ProcessingResult:
|
|
"""Make the actual API request to Unstructured.
|
|
|
|
Args:
|
|
content: Document bytes
|
|
content_type: MIME type
|
|
filename: Optional filename
|
|
strategy: Processing strategy
|
|
languages: OCR languages
|
|
extract_image_block_types: Image element types to extract
|
|
|
|
Returns:
|
|
ProcessingResult with extracted text and metadata
|
|
|
|
Raises:
|
|
ProcessorError: If processing fails
|
|
"""
|
|
# Prepare multipart request
|
|
files = {
|
|
"files": (
|
|
filename or "document",
|
|
io.BytesIO(content),
|
|
content_type or "application/octet-stream",
|
|
)
|
|
}
|
|
|
|
data = {
|
|
"strategy": strategy,
|
|
"languages": ",".join(languages),
|
|
}
|
|
|
|
if extract_image_block_types:
|
|
data["extract_image_block_types"] = ",".join(extract_image_block_types)
|
|
|
|
logger.debug(
|
|
f"Processing with Unstructured API: strategy={strategy}, languages={languages}"
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
f"{self.api_url}/general/v0/general",
|
|
files=files,
|
|
data=data,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse response
|
|
elements = response.json()
|
|
|
|
# Extract text and metadata
|
|
texts = []
|
|
element_types: dict[str, int] = {}
|
|
|
|
for element in elements:
|
|
if "text" in element and element["text"]:
|
|
texts.append(element["text"])
|
|
|
|
el_type = element.get("type", "unknown")
|
|
element_types[el_type] = element_types.get(el_type, 0) + 1
|
|
|
|
parsed_text = "\n\n".join(texts)
|
|
|
|
metadata = {
|
|
"element_count": len(elements),
|
|
"text_length": len(parsed_text),
|
|
"element_types": element_types,
|
|
"strategy": strategy,
|
|
"languages": languages,
|
|
}
|
|
|
|
logger.debug(
|
|
f"Successfully processed: {len(elements)} elements, "
|
|
f"{len(parsed_text)} characters"
|
|
)
|
|
|
|
return ProcessingResult(
|
|
text=parsed_text,
|
|
metadata=metadata,
|
|
processor=self.name,
|
|
success=True,
|
|
)
|
|
|
|
except httpx.HTTPError as e:
|
|
logger.error(f"Unstructured API HTTP error: {e}")
|
|
raise ProcessorError(f"HTTP error: {str(e)}") from e
|
|
except Exception as e:
|
|
logger.error(f"Unstructured API processing failed: {e}")
|
|
raise ProcessorError(f"Processing failed: {str(e)}") from e
|
|
|
|
async def process(
|
|
self,
|
|
content: bytes,
|
|
content_type: str,
|
|
filename: Optional[str] = None,
|
|
options: Optional[dict[str, Any]] = None,
|
|
progress_callback: Optional[
|
|
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
|
|
] = None,
|
|
) -> ProcessingResult:
|
|
"""Process document via Unstructured API.
|
|
|
|
Args:
|
|
content: Document bytes
|
|
content_type: MIME type
|
|
filename: Optional filename for format detection
|
|
options: Processing options:
|
|
- strategy: "auto", "fast", or "hi_res" (default: from init)
|
|
- languages: List of language codes (default: from init)
|
|
- extract_image_block_types: Types of image elements to extract
|
|
progress_callback: Optional async callback for progress updates
|
|
|
|
Returns:
|
|
ProcessingResult with extracted text and metadata
|
|
|
|
Raises:
|
|
ProcessorError: If processing fails
|
|
"""
|
|
options = options or {}
|
|
|
|
# Extract options with defaults
|
|
strategy = options.get("strategy", self.default_strategy)
|
|
languages = options.get("languages", self.default_languages)
|
|
extract_image_block_types = options.get("extract_image_block_types")
|
|
|
|
# If no progress callback, just make the request directly
|
|
if progress_callback is None:
|
|
return await self._make_api_request(
|
|
content=content,
|
|
content_type=content_type,
|
|
filename=filename,
|
|
strategy=strategy,
|
|
languages=languages,
|
|
extract_image_block_types=extract_image_block_types,
|
|
)
|
|
|
|
# With progress callback: run API request + progress poller concurrently
|
|
stop_event = anyio.Event()
|
|
start_time = time.time()
|
|
result = None
|
|
|
|
async def capture_result():
|
|
nonlocal result
|
|
try:
|
|
result = await self._make_api_request(
|
|
content=content,
|
|
content_type=content_type,
|
|
filename=filename,
|
|
strategy=strategy,
|
|
languages=languages,
|
|
extract_image_block_types=extract_image_block_types,
|
|
)
|
|
finally:
|
|
# Signal poller to stop after API request completes
|
|
stop_event.set()
|
|
|
|
# Run both tasks concurrently using anyio task groups
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(capture_result)
|
|
tg.start_soon(
|
|
self._run_progress_poller, stop_event, progress_callback, start_time
|
|
)
|
|
|
|
return result # type: ignore
|
|
|
|
async def health_check(self) -> bool:
|
|
"""Check if Unstructured API is available.
|
|
|
|
Returns:
|
|
True if API is healthy, False otherwise
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5) as client:
|
|
response = await client.get(f"{self.api_url}/healthcheck")
|
|
return response.status_code == 200
|
|
except Exception as e:
|
|
logger.warning(f"Unstructured health check failed: {e}")
|
|
return False
|