feat(news): add Nextcloud News app integration

Add full integration for the Nextcloud News (RSS/Atom reader) app:

- Add NewsClient with complete CRUD operations for folders, feeds, and items
- Add 8 read-only MCP tools for listing/getting folders, feeds, items
- Add Pydantic models for News entities with camelCase alias support
- Add vector sync support for starred + unread items
- Add HTML to Markdown converter using markdownify for better embeddings
- Add Docker post-install hook to enable News app
- Add 25 unit tests for NewsClient API methods

Vector sync indexes starred and unread items, providing a balanced approach
that captures important (starred) and current (unread) content without
indexing the entire article history.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-29 14:30:23 +01:00
parent 1b1667bc2b
commit a33f6a2f15
15 changed files with 2055 additions and 7 deletions
+209 -1
View File
@@ -544,9 +544,217 @@ async def scan_user_documents(
queued += file_queued
# Scan News items (starred + unread)
news_queued = 0
try:
news_queued = await scan_news_items(
user_id=user_id,
send_stream=send_stream,
nc_client=nc_client,
initial_sync=initial_sync,
scan_id=scan_id,
)
queued += news_queued
except Exception as e:
logger.warning(f"Failed to scan news items for {user_id}: {e}")
if queued > 0:
logger.info(
f"Sent {queued} documents ({file_queued} files) for incremental sync: {user_id}"
f"Sent {queued} documents ({file_queued} files, {news_queued} news items) for incremental sync: {user_id}"
)
else:
logger.debug(f"No changes detected for {user_id}")
async def scan_news_items(
user_id: str,
send_stream: MemoryObjectSendStream[DocumentTask],
nc_client: NextcloudClient,
initial_sync: bool,
scan_id: int,
) -> int:
"""
Scan user's News items (starred + unread) and queue changed items.
Indexes starred and unread items for semantic search. This provides
a balanced approach - important items (starred) and current items
(unread) are searchable, while avoiding indexing the entire history.
Args:
user_id: User to scan
send_stream: Stream to send changed documents to processors
nc_client: Authenticated Nextcloud client
initial_sync: If True, send all documents (first-time sync)
scan_id: Scan identifier for logging
Returns:
Number of items queued for processing
"""
from nextcloud_mcp_server.client.news import NewsItemType
settings = get_settings()
queued = 0
# Get indexed news item IDs from Qdrant (for deletion tracking)
indexed_item_ids: set[str] = set()
if not initial_sync:
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="news_item")),
]
),
with_payload=["doc_id"],
with_vectors=False,
limit=10000,
)
indexed_item_ids = {point.payload["doc_id"] for point in scroll_result[0]}
logger.debug(f"Found {len(indexed_item_ids)} indexed news items in Qdrant")
# Fetch starred items (type=STARRED)
starred_items = await nc_client.news.get_items(
batch_size=-1, # Get all
type_=NewsItemType.STARRED,
get_read=True, # Include read starred items
)
logger.debug(f"[SCAN-{scan_id}] Found {len(starred_items)} starred news items")
# Fetch unread items (type=ALL, get_read=False)
unread_items = await nc_client.news.get_items(
batch_size=-1,
type_=NewsItemType.ALL,
get_read=False, # Only unread
)
logger.debug(f"[SCAN-{scan_id}] Found {len(unread_items)} unread news items")
# Combine and deduplicate (an item can be both starred and unread)
items_by_id: dict[int, dict] = {}
for item in starred_items:
items_by_id[item["id"]] = item
for item in unread_items:
items_by_id[item["id"]] = item
item_count = len(items_by_id)
nextcloud_item_ids: set[str] = set()
for item_id, item in items_by_id.items():
doc_id = str(item_id)
nextcloud_item_ids.add(doc_id)
# Use lastModified timestamp (microseconds in News API)
modified_at = item.get("lastModified", 0)
# Convert to seconds if needed (News API uses microseconds)
if modified_at > 10000000000: # > year 2286 in seconds
modified_at = modified_at // 1000000
if initial_sync:
# Send everything on first sync - write placeholder first
await write_placeholder_point(
doc_id=doc_id,
doc_type="news_item",
user_id=user_id,
modified_at=modified_at,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="news_item",
operation="index",
modified_at=modified_at,
)
)
queued += 1
else:
# Incremental sync: check if item exists and compare modified_at
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"News item {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Query Qdrant for existing entry
existing_metadata = await query_document_metadata(
doc_id=doc_id, doc_type="news_item", user_id=user_id
)
needs_indexing = False
if existing_metadata is None:
needs_indexing = True
elif existing_metadata.get("modified_at", 0) < modified_at:
needs_indexing = True
elif existing_metadata.get("is_placeholder", False):
queued_at = existing_metadata.get("queued_at", 0)
placeholder_age = time.time() - queued_at
stale_threshold = settings.vector_sync_scan_interval * 5
if placeholder_age > stale_threshold:
logger.debug(
f"Found stale placeholder for news item {doc_id} "
f"(age={placeholder_age:.1f}s), requeuing"
)
needs_indexing = True
if needs_indexing:
await write_placeholder_point(
doc_id=doc_id,
doc_type="news_item",
user_id=user_id,
modified_at=modified_at,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="news_item",
operation="index",
modified_at=modified_at,
)
)
queued += 1
logger.info(
f"[SCAN-{scan_id}] Found {item_count} news items (starred+unread) for {user_id}"
)
record_vector_sync_scan(item_count)
# Check for deleted items (not initial sync)
# Items become "deleted" when they are no longer starred AND become read
if not initial_sync:
grace_period = settings.vector_sync_scan_interval * 1.5
current_time = time.time()
for doc_id in indexed_item_ids:
if doc_id not in nextcloud_item_ids:
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
first_missing_time = _potentially_deleted[doc_key]
time_missing = current_time - first_missing_time
if time_missing >= grace_period:
logger.info(
f"News item {doc_id} missing for {time_missing:.1f}s "
f"(>{grace_period:.1f}s grace period), sending deletion"
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="news_item",
operation="delete",
modified_at=0,
)
)
queued += 1
del _potentially_deleted[doc_key]
else:
logger.debug(
f"News item {doc_id} missing for first time, starting grace period"
)
_potentially_deleted[doc_key] = current_time
return queued