refactor: Simplify PDF text extraction with single to_markdown call

Replace parallel per-page extraction with a single to_markdown(page_chunks=True)
call. This is more efficient because pymupdf4llm can optimize internally for
full-document processing instead of making N separate calls for N pages.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-22 03:52:02 +01:00
parent 31fade9730
commit ba8a53803a
@@ -120,26 +120,39 @@ class PyMuPDFProcessor(DocumentProcessor):
pdf_image_dir = self.image_dir / pdf_id
pdf_image_dir.mkdir(exist_ok=True, parents=True)
# OPTIMIZATION: Extract pages in parallel using anyio task group
page_texts = await self._extract_pages_parallel(
doc, page_count, pdf_image_dir
# Extract all pages in a single call with page_chunks=True
def do_extract() -> list[dict[str, Any]]:
# When page_chunks=True, to_markdown returns list[dict] not str
return pymupdf4llm.to_markdown( # type: ignore[return-value]
doc,
write_images=self.extract_images,
image_path=pdf_image_dir if self.extract_images else None,
page_chunks=True,
)
page_chunks: list[dict[str, Any]] = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
do_extract
)
if progress_callback:
await progress_callback(90, 100, "Building result")
# Calculate page boundaries (sequential, fast)
page_boundaries = []
# Extract page texts and build boundaries from chunks
page_texts: list[str] = []
page_boundaries: list[dict[str, Any]] = []
current_offset = 0
for page_num, page_md in enumerate(page_texts):
for chunk in page_chunks:
text = chunk.get("text", "")
page_num = chunk.get("metadata", {}).get("page", len(page_texts) + 1)
page_texts.append(text)
page_boundaries.append(
{
"page": page_num + 1,
"page": page_num,
"start_offset": current_offset,
"end_offset": current_offset + len(page_md),
"end_offset": current_offset + len(text),
}
)
current_offset += len(page_md)
current_offset += len(text)
# Collect image paths
image_paths = []
@@ -178,54 +191,6 @@ class PyMuPDFProcessor(DocumentProcessor):
logger.error(error_msg, exc_info=True)
raise ProcessorError(error_msg) from e
async def _extract_pages_parallel(
    self,
    doc: pymupdf.Document,
    page_count: int,
    pdf_image_dir: pathlib.Path | None,
) -> list[str]:
    """Extract markdown text for every page without blocking the event loop.

    One task is scheduled per page and each page is converted with
    ``pymupdf4llm.to_markdown`` in a worker thread. All tasks share the
    same ``doc`` object, and PyMuPDF does not support concurrent use from
    multiple threads, so the worker-thread calls are serialized through a
    single-token capacity limiter: at most one thread touches ``doc`` at
    any moment.

    Args:
        doc: Opened PyMuPDF document
        page_count: Number of pages to extract (pages ``0..page_count-1``)
        pdf_image_dir: Directory for extracted images (or None)

    Returns:
        List of page texts, indexed by 0-based page number

    Raises:
        ProcessorError: If any page slot is still empty after all tasks
            have finished.
    """
    import anyio

    # Slot per page so results stay in page order regardless of the order
    # in which the tasks complete.
    results: list[str | None] = [None] * page_count

    # Only one worker thread may use ``doc`` at a time (see docstring).
    doc_limiter = anyio.CapacityLimiter(1)

    async def extract_one(page_num: int) -> None:
        """Extract a single page in the thread pool."""

        def do_extract() -> str:
            return pymupdf4llm.to_markdown(
                doc,
                pages=[page_num],
                write_images=self.extract_images,
                image_path=pdf_image_dir if self.extract_images else None,
                page_chunks=False,
            )

        results[page_num] = await anyio.to_thread.run_sync(  # type: ignore[attr-defined]
            do_extract, limiter=doc_limiter
        )

    # Schedule every page; the task group waits for all of them and
    # propagates any exception raised by a page extraction.
    async with anyio.create_task_group() as tg:
        for page_num in range(page_count):
            tg.start_soon(extract_one, page_num)

    # Verify all pages extracted
    final_results: list[str] = []
    for i, text in enumerate(results):
        if text is None:
            raise ProcessorError(f"Page {i} extraction failed")
        final_results.append(text)
    return final_results
def _extract_metadata(
self, doc: pymupdf.Document, filename: Optional[str]
) -> dict[str, Any]: