Compare commits

...

20 Commits

Author SHA1 Message Date
github-actions[bot] 9e4c20a4b1 bump: version 0.14.0 → 0.14.1 2025-10-15 15:26:35 +00:00
Chris Coutinho f26bca13f1 Merge pull request #211 from cbcoutinho/feature/docs-oauth
fix(oauth): Remove the option to force_register new clients
2025-10-15 17:26:09 +02:00
Chris Coutinho 46c6f2f294 test: Fix oauth tests by reusing callback server 2025-10-15 17:06:46 +02:00
Chris Coutinho 3ad9198f36 fix(oauth): Remove the option to force_register new clients 2025-10-15 16:27:22 +02:00
Chris Coutinho dafac734e6 docs: Update README 2025-10-15 14:51:36 +02:00
Chris Coutinho 97bbc18121 docs: Update README
Add comparison to the Nextcloud Assistant & Context Agent
2025-10-15 14:47:43 +02:00
github-actions[bot] 46deb0f726 bump: version 0.13.0 → 0.14.0 2025-10-15 09:53:45 +00:00
Chris Coutinho daacf08a54 Merge pull request #208 from cbcoutinho/feature/user-api
Feature/user api
2025-10-15 11:53:20 +02:00
Chris Coutinho cc2a5c9d58 test: Inc delay for alice 2025-10-15 11:36:54 +02:00
Chris Coutinho 26f8deff17 test: Increase stagger delay 0.5 -> 2s 2025-10-15 11:07:06 +02:00
Chris Coutinho fb3063e94e test: Increase callback timeout 10s -> 30s 2025-10-15 10:57:21 +02:00
Chris Coutinho 83f89e9394 chore: Update CLAUDE.md 2025-10-15 10:36:27 +02:00
Chris Coutinho 5db02313a1 test: Update share client to fix test, update passwords 2025-10-15 10:35:22 +02:00
Chris Coutinho b50e212f05 test: Add tests for sharing/groups 2025-10-15 03:46:01 +02:00
Chris Coutinho 85f8522085 feat: Add Groups API client 2025-10-15 03:43:25 +02:00
Chris Coutinho a38c795124 feat: add sharing API client and server tools 2025-10-15 02:59:26 +02:00
Chris Coutinho 7004104873 test: Fix multi-user tests 2025-10-15 02:11:17 +02:00
Chris Coutinho 7a4a31b52d fix: Update user/groups API to OCS v2 2025-10-15 00:05:22 +02:00
Chris Coutinho 898c2e72ae Merge remote-tracking branch 'origin/master' into feature/user-api 2025-10-14 23:43:03 +02:00
Chris Coutinho 961f23b5ea feat(users): Initialize user API client 2025-09-11 09:42:42 +02:00
29 changed files with 4873 additions and 212 deletions
+3 -1
View File
@@ -4,4 +4,6 @@ __pycache__/
*.env
.env.local
.env.*.local
.nextcloud_oauth_test_client.json
# Generated by pytest used to login users
.nextcloud_oauth_shared_test_client.json
+18
View File
@@ -1,3 +1,21 @@
## v0.14.1 (2025-10-15)
### Fix
- **oauth**: Remove the option to force_register new clients
## v0.14.0 (2025-10-15)
### Feat
- Add Groups API client
- add sharing API client and server tools
- **users**: Initialize user API client
### Fix
- Update user/groups API to OCS v2
## v0.13.0 (2025-10-13)
### Feat
+35 -12
View File
@@ -38,13 +38,21 @@ mcp run --transport sse nextcloud_mcp_server.app:mcp
# Docker development environment with Nextcloud instance
docker-compose up
# After code changes, rebuild and restart only the MCP server container
# After code changes, rebuild and restart the appropriate MCP server container:
# For basic auth changes (most common) - uses admin credentials
docker-compose up --build -d mcp
# For OAuth changes - uses OAuth authentication flow
docker-compose up --build -d mcp-oauth
# Build Docker image
docker build -t nextcloud-mcp-server .
```
**Important: Two MCP Server Containers**
- **`mcp`** (port 8000): Uses basic auth with admin credentials. Use this for most development and testing.
- **`mcp-oauth`** (port 8001): Uses OAuth authentication. Only use this when working on OAuth-specific features or tests.
### Environment Setup
```bash
# Install dependencies
@@ -96,18 +104,23 @@ Each Nextcloud app has a corresponding server module that:
### Testing Structure
- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions
- **Integration tests** in `tests/integration/` and `tests/client/`, `tests/server/` - Test real Nextcloud API interactions
- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities
- Tests are marked with `@pytest.mark.integration` for selective running
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
- **Important**: Integration tests run against live Docker containers. After making code changes:
- For basic auth tests: rebuild with `docker-compose up --build -d mcp`
- For OAuth tests: rebuild with `docker-compose up --build -d mcp-oauth`
#### Testing Best Practices
- **MANDATORY: Always run tests after implementing features or fixing bugs**
- Run tests to completion before considering any task complete
- If tests require modifications to pass, ask for permission before proceeding
- Use `docker-compose up --build -d mcp` to rebuild MCP container after code changes
- **Rebuild the correct container** after code changes:
- For basic auth tests (most common): `docker-compose up --build -d mcp`
- For OAuth tests: `docker-compose up --build -d mcp-oauth`
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
- `nc_mcp_client` - MCP client session for tool/resource testing
- `nc_mcp_client` - MCP client session for tool/resource testing (uses `mcp` container)
- `nc_mcp_oauth_client` - MCP client session for OAuth testing (uses `mcp-oauth` container)
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
- `temporary_note` - Creates and cleans up test notes automatically
- `temporary_addressbook` - Creates and cleans up test address books
@@ -115,15 +128,22 @@ Each Nextcloud app has a corresponding server module that:
- **Test specific functionality** after changes:
- For Notes changes: `uv run pytest tests/integration/test_mcp.py -k "notes" -v`
- For specific API changes: `uv run pytest tests/integration/test_notes_api.py -v`
- For OAuth changes: `uv run pytest tests/server/test_oauth*.py -v` (remember to rebuild `mcp-oauth` container)
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
#### OAuth/OIDC Testing
OAuth integration tests support both **automated** (Playwright) and **interactive** authentication flows:
**Automated Testing (Default - Recommended for CI/CD):**
- **Default fixtures**: `nc_oauth_client`, `nc_mcp_oauth_client` now use Playwright automation by default
- **Default fixtures**: `nc_oauth_client`, `nc_mcp_oauth_client` use Playwright automation
- Uses Playwright headless browser automation to complete OAuth flow programmatically
- **Shared OAuth Client**: All test users authenticate using a single OAuth client
- Stored in `.nextcloud_oauth_shared_test_client.json`
- Matches production MCP server behavior
- Each user gets their own unique access token
- Implementation: `shared_oauth_client_credentials` fixture in `tests/conftest.py:812`
- All Playwright fixtures: `playwright_oauth_token`, `nc_oauth_client`, `nc_mcp_oauth_client`, `nc_oauth_client_playwright`, `nc_mcp_oauth_client_playwright`
- Multi-user fixtures: `alice_oauth_token`, `bob_oauth_token`, `charlie_oauth_token`, `diana_oauth_token`
- Requires: `NEXTCLOUD_HOST`, `NEXTCLOUD_USERNAME`, `NEXTCLOUD_PASSWORD` environment variables
- Uses `pytest-playwright-asyncio` for async Playwright fixtures
- Playwright configuration: Use pytest CLI args like `--browser firefox --headed` to customize
@@ -131,13 +151,13 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
- Example:
```bash
# Run all OAuth tests with automated Playwright flow using Firefox
uv run pytest tests/integration/test_oauth*.py --browser firefox -v
uv run pytest tests/server/test_oauth*.py --browser firefox -v
# Run specific Playwright tests with visible browser for debugging
uv run pytest tests/integration/test_oauth_playwright.py --browser firefox --headed -v
uv run pytest tests/server/test_mcp_oauth.py --browser firefox --headed -v
# Run with Chromium (default)
uv run pytest tests/integration/test_oauth.py -v
uv run pytest tests/server/test_oauth*.py -v
```
**Interactive Testing (Manual browser login):**
@@ -149,13 +169,16 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
- Example:
```bash
# Run OAuth tests with interactive flow (will open browser and wait for manual login)
uv run pytest tests/integration/test_oauth_interactive.py -v
uv run pytest tests/client/test_oauth_interactive.py -v
```
**Test Environment Setup:**
- **Two MCP server containers are available:**
- `mcp` (port 8000): Uses basic auth with admin credentials - for most testing
- `mcp-oauth` (port 8001): Uses OAuth authentication - for OAuth-specific testing
- Start OAuth MCP server: `docker-compose up --build -d mcp-oauth`
- OAuth server runs on port 8001 (regular MCP on 8000)
- Both flows register OAuth clients dynamically using Nextcloud's OIDC provider
- **Important**: When working on OAuth functionality, always rebuild `mcp-oauth` container, not `mcp`
- OAuth client credentials cached in `.nextcloud_oauth_shared_test_client.json`
**CI/CD Considerations:**
- Interactive OAuth tests are automatically skipped when `GITHUB_ACTIONS` environment variable is set
+6
View File
@@ -6,6 +6,9 @@
The Nextcloud MCP (Model Context Protocol) server allows Large Language Models like Claude, GPT, and Gemini to interact with your Nextcloud data through a secure API. Create notes, manage calendars, organize contacts, work with files, and more - all through natural language.
> [!NOTE]
> **Nextcloud has two ways to enable AI access:** Nextcloud provides [Context Agent](https://github.com/nextcloud/context_agent), an AI agent backend that powers the [Assistant](https://github.com/nextcloud/assistant) app and allows AI to interact with Nextcloud apps like Calendar, Talk, and Contacts. Context Agent runs as an ExApp inside Nextcloud and also exposes an MCP server endpoint for external LLMs. This project (Nextcloud MCP Server) is a **dedicated standalone MCP server** designed specifically for external MCP clients like Claude Code and IDEs, with deep CRUD operations and OAuth support. See our [detailed comparison](docs/comparison-context-agent.md) to understand which approach fits your use case.
## Features
### Supported Nextcloud Apps
@@ -120,6 +123,9 @@ Or connect from:
- **[Authentication](docs/authentication.md)** - OAuth vs BasicAuth
- **[Running the Server](docs/running.md)** - Start and manage the server
### Architecture
- **[Comparison with Context Agent](docs/comparison-context-agent.md)** - How this MCP server differs from Nextcloud's Context Agent
### OAuth Documentation
- **[OAuth Quick Start](docs/quickstart-oauth.md)** - 5-minute setup guide
- **[OAuth Setup Guide](docs/oauth-setup.md)** - Production deployment
+1 -1
View File
@@ -63,7 +63,7 @@ services:
- 127.0.0.1:8001:8001
environment:
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.01:8001
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8001
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
# No USERNAME/PASSWORD - will use OAuth
volumes:
+698
View File
@@ -0,0 +1,698 @@
# MCP Server Comparison: Nextcloud MCP Server vs Context Agent
This document compares the two MCP server implementations in the Nextcloud ecosystem:
1. **Nextcloud MCP Server** (this project) - Standalone MCP server for external access to Nextcloud
2. **Context Agent MCP Server** - MCP server embedded within Nextcloud as an External App
## Executive Summary
Both projects expose Nextcloud functionality via the Model Context Protocol (MCP), but serve different purposes and audiences:
- **Nextcloud MCP Server**: Brings Nextcloud OUT to external MCP clients (Claude Code, etc.)
- **Context Agent**: Brings external MCP servers IN to Nextcloud's AI Assistant
## Architecture Overview
```mermaid
graph TB
subgraph External["External Clients"]
CC[Claude Code]
IDE[IDEs with MCP]
APP[Other MCP Clients]
end
subgraph NMCP["Nextcloud MCP Server<br/>(This Project)"]
NMCP_Server[FastMCP Server]
NMCP_Client[HTTP Clients]
NMCP_Auth[OAuth/BasicAuth]
end
subgraph NC["Nextcloud Instance"]
subgraph CA["Context Agent ExApp"]
CA_Agent[LangGraph Agent]
CA_MCP[MCP Server /mcp]
CA_Tools[Tool Loader]
end
NC_Apps[Nextcloud Apps<br/>Notes, Calendar, Files, etc.]
NC_Assistant[Assistant App]
end
subgraph ExtMCP["External MCP Servers"]
Weather[Weather MCP]
Other[Other Services]
end
%% External clients connect to standalone MCP server
CC --> NMCP_Server
IDE --> NMCP_Server
APP --> NMCP_Server
%% Standalone MCP server talks to Nextcloud over HTTP
NMCP_Server --> NMCP_Auth
NMCP_Auth --> NMCP_Client
NMCP_Client -->|HTTP/HTTPS| NC_Apps
%% Context Agent is inside Nextcloud
CA_Agent --> CA_Tools
CA_Tools --> NC_Apps
CA_MCP -->|Exposes to| NC_Assistant
NC_Assistant -->|User requests| CA_Agent
%% Context Agent can consume external MCP servers
CA_Tools -->|Consumes| ExtMCP
%% Context Agent could consume Nextcloud MCP Server
CA_Tools -.->|Could consume| NMCP_Server
classDef external fill:#e1f5ff
classDef standalone fill:#fff4e1
classDef internal fill:#e8f5e9
class CC,IDE,APP external
class NMCP_Server,NMCP_Client,NMCP_Auth standalone
class CA_Agent,CA_MCP,CA_Tools,NC_Apps,NC_Assistant internal
```
## Deployment Models
```mermaid
graph LR
subgraph Deploy1["Nextcloud MCP Server Deployment"]
direction TB
D1[Docker Container]
D2[Cloud VM]
D3[Local Machine]
D4[Kubernetes Pod]
end
subgraph Deploy2["Context Agent Deployment"]
direction TB
NC[Nextcloud Instance<br/>with AppAPI]
ExApp[External App Container<br/>Managed by Nextcloud]
end
Deploy1 -.->|HTTP/HTTPS| NC
ExApp -->|Integrated| NC
classDef deploy fill:#fff4e1
classDef integrated fill:#e8f5e9
class D1,D2,D3,D4 deploy
class NC,ExApp integrated
```
### Nextcloud MCP Server
- **Location**: Runs anywhere with network access to Nextcloud
- **Deployment**: Docker, VM, local machine, Kubernetes
- **Connection**: HTTP/HTTPS to Nextcloud APIs
- **Independence**: Fully standalone service
### Context Agent
- **Location**: Runs inside Nextcloud as External App
- **Deployment**: Managed by Nextcloud AppAPI
- **Connection**: Native nc-py-api integration
- **Integration**: Deep Nextcloud integration
## Authentication Architecture
```mermaid
graph TB
subgraph NMCP_Auth["Nextcloud MCP Server Authentication"]
direction TB
Client1[MCP Client]
subgraph BasicAuth["BasicAuth Mode"]
BA_Shared[Shared NextcloudClient]
BA_Creds[Username + Password]
end
subgraph OAuth["OAuth Mode"]
OAuth_Token[OAuth Token]
OAuth_Verify[Token Verifier]
OAuth_OIDC[OIDC Discovery]
OAuth_Client[Per-Request Client]
end
Client1 -->|Basic Auth| BasicAuth
Client1 -->|Bearer Token| OAuth
BA_Creds --> BA_Shared
OAuth_Token --> OAuth_Verify
OAuth_OIDC --> OAuth_Verify
OAuth_Verify --> OAuth_Client
end
subgraph CA_Auth["Context Agent Authentication"]
direction TB
Client2[MCP Client]
CA_Header[Authorization Header]
CA_OCS[OCS API Validation]
CA_User[User Context]
CA_NC[nc-py-api Client]
Client2 --> CA_Header
CA_Header --> CA_OCS
CA_OCS -->|Extract user_id| CA_User
CA_User -->|nc.set_user| CA_NC
end
classDef auth fill:#fff4e1
classDef user fill:#e1f5ff
class BasicAuth,OAuth auth
class CA_User user
```
## Tool Registration & Loading
```mermaid
sequenceDiagram
participant Startup
participant NMCP as Nextcloud MCP<br/>Server
participant CA as Context Agent
participant Request as Client Request
Note over Startup,NMCP: Nextcloud MCP Server (Static)
Startup->>NMCP: Server starts
NMCP->>NMCP: configure_notes_tools(mcp)
NMCP->>NMCP: configure_calendar_tools(mcp)
NMCP->>NMCP: configure_contacts_tools(mcp)
Note over NMCP: Tools registered once<br/>at startup
Request->>NMCP: Call tool
NMCP->>NMCP: Use pre-registered tool
Note over Startup,CA: Context Agent (Dynamic)
Startup->>CA: Server starts
CA->>CA: Install ToolListMiddleware
Request->>CA: List tools (or 60s elapsed)
CA->>CA: get_tools(nc)
CA->>CA: Import all_tools/*.py
CA->>CA: Call module.get_tools(nc)
CA->>CA: Regenerate tool functions
Note over CA: Tools refreshed every 60s<br/>or on demand
Request->>CA: Call tool
CA->>CA: Regenerate with fresh nc
```
## Tool Definition Patterns
### Nextcloud MCP Server
```python
# Static registration at startup
def configure_notes_tools(mcp: FastMCP):
@mcp.tool()
async def nc_notes_create_note(
title: str,
content: str,
category: str,
ctx: Context
) -> CreateNoteResponse:
"""Create a new note"""
client = get_client(ctx) # Auto-detects auth mode
note_data = await client.notes.create_note(
title=title,
content=content,
category=category
)
return CreateNoteResponse(
id=note_data["id"],
title=note_data["title"],
etag=note_data["etag"]
)
# Resources for structured data access
@mcp.resource("nc://Notes/{note_id}")
async def nc_get_note_resource(note_id: int):
"""Get user note using note id"""
ctx = mcp.get_context()
client = get_client(ctx)
note_data = await client.notes.get_note(note_id)
return Note(**note_data)
```
**Key Features**:
- Native FastMCP `@mcp.tool()` decorator
- Pydantic models for type safety
- MCP Resources support
- Comprehensive error handling with McpError
- Context-based client resolution
### Context Agent
```python
# Dynamic loading at runtime
async def get_tools(nc: Nextcloud):
@tool
@safe_tool
def list_calendars():
"""List all existing calendars by name"""
principal = nc.cal.principal()
calendars = principal.calendars()
return ", ".join([cal.name for cal in calendars])
@tool
@dangerous_tool
def schedule_event(
calendar_name: str,
title: str,
description: str,
start_date: str,
end_date: str,
attendees: list[str] | None,
start_time: str | None,
end_time: str | None
):
"""Create a new event or meeting in a calendar"""
# Parse dates and times
start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
# ... event creation logic
principal = nc.cal.principal()
calendar = {cal.name: cal for cal in calendars}[calendar_name]
calendar.add_event(str(c))
return True
return [list_calendars, schedule_event, ...]
def get_category_name():
return "Calendar and Tasks"
def is_available(nc: Nextcloud):
return True # or check capabilities
```
**Key Features**:
- LangChain `@tool` decorator
- `@safe_tool` / `@dangerous_tool` decorators
- Dynamic tool regeneration with fresh context
- Tools returned as list from async function
- Availability checking per module
## Client Architecture
```mermaid
graph TB
subgraph NMCP_Client["Nextcloud MCP Server Clients"]
direction TB
NMCP_Main[NextcloudClient]
NMCP_Base[BaseNextcloudClient]
NMCP_Notes[NotesClient]
NMCP_Cal[CalendarClient]
NMCP_Contacts[ContactsClient]
NMCP_Tables[TablesClient]
NMCP_WebDAV[WebDAVClient]
NMCP_Deck[DeckClient]
NMCP_Main --> NMCP_Notes
NMCP_Main --> NMCP_Cal
NMCP_Main --> NMCP_Contacts
NMCP_Main --> NMCP_Tables
NMCP_Main --> NMCP_WebDAV
NMCP_Main --> NMCP_Deck
NMCP_Notes -.->|extends| NMCP_Base
NMCP_Cal -.->|extends| NMCP_Base
NMCP_Contacts -.->|extends| NMCP_Base
NMCP_Base --> HTTPX["httpx.AsyncClient"]
NMCP_Base --> Retry["@retry_on_429"]
end
subgraph CA_Client["Context Agent Client"]
direction TB
CA_NC["nc-py-api<br/>NextcloudApp"]
CA_NC --> CA_Cal["nc.cal<br/>CalDAV"]
CA_NC --> CA_Talk["nc.talk<br/>Talk API"]
CA_NC --> CA_OCS["nc.ocs<br/>OCS API"]
CA_NC --> CA_Session["nc._session<br/>HTTP Adapter"]
end
HTTPX -->|"HTTP/HTTPS"| NextcloudAPI["Nextcloud APIs"]
CA_Session -->|"HTTP/HTTPS"| NextcloudAPI
classDef custom fill:#fff4e1
classDef native fill:#e8f5e9
class NMCP_Main,NMCP_Base,NMCP_Notes,NMCP_Cal custom
class CA_NC,CA_Cal,CA_Talk,CA_OCS native
```
## Functionality Comparison
### Available Tools & Features
| Feature Category | Nextcloud MCP Server | Context Agent MCP |
|-----------------|---------------------|-------------------|
| **Notes** | ✅ Full CRUD, search, attachments (7 tools) | ❌ Not implemented |
| **Calendar** | ✅ Full CalDAV (events, recurring, attendees) | ✅ Schedule events, list calendars, free/busy, tasks (4 tools) |
| **Contacts** | ✅ Full CardDAV (address books, contacts) | ✅ Find person, current user details (2 tools) |
| **Files** | ✅ Full WebDAV (read, write, directories) | ✅ Get content, folder tree, sharing (3 tools) |
| **Tables** | ✅ Row CRUD operations | ❌ Not implemented |
| **Deck** | ✅ Boards, stacks, cards | ✅ Create board, add card (2 tools) |
| **Talk** | ❌ Not implemented | ✅ List/send messages, create conversation (4 tools) |
| **Mail** | ❌ Not implemented | ✅ Send email, list mailboxes (2 tools) |
| **AI Features** | ❌ Not implemented | ✅ Image gen, audio2text, doc-gen, context_chat (4 tools) |
| **Web Search** | ❌ Not implemented | ✅ DuckDuckGo, YouTube search (2 tools) |
| **Location** | ❌ Not implemented | ✅ OpenStreetMap, HERE transit, weather (3 tools) |
| **OpenProject** | ❌ Not implemented | ✅ Integration (2 tools) |
| **MCP Resources** | ✅ notes://, nc:// URIs | ❌ Not supported |
| **External MCP** | ❌ Pure server only | ✅ Consumes external MCP servers |
| **Sharing** | ✅ Share management API | ❌ Not implemented |
| **Capabilities** | ✅ Server info resource | ❌ Not exposed |
### Tool Count Summary
- **Nextcloud MCP Server**: ~50+ tools and resources
- Deep integration with specific apps
- Full CRUD operations
- MCP Resources for structured data
- **Context Agent**: ~28+ tools
- Broader feature coverage
- Action-oriented (agent tasks)
- Can aggregate external MCP servers
## Tool Safety & Confirmation
### Context Agent Safety Model
```mermaid
graph TD
Request[User Request] --> Agent[LangGraph Agent]
Agent --> Model[LLM generates tool calls]
Model --> Check{Tool type?}
Check -->|"@safe_tool"| Execute[Execute immediately]
Check -->|"@dangerous_tool"| Queue[Queue for confirmation]
Queue --> UserNode[Request user confirmation]
UserNode -->|Approved| Execute
UserNode -->|Denied| Cancel[Cancel with reason]
Execute --> Result[Return result to agent]
Cancel --> Result
Result --> Agent
classDef safe fill:#e8f5e9
classDef danger fill:#ffe8e8
class Execute safe
class Queue,UserNode,Cancel danger
```
**Safe Tools** (read-only):
- `list_calendars`
- `find_person_in_contacts`
- `list_talk_conversations`
- `get_file_content`
- `get_folder_tree`
**Dangerous Tools** (write operations):
- `schedule_event`
- `send_message_to_conversation`
- `create_public_sharing_link`
- `send_email`
### Nextcloud MCP Server Safety
**No built-in safety classification**:
- All tools treated equally
- Relies on MCP client for validation
- OAuth scopes could control permissions
- User must review all actions
## Error Handling
### Nextcloud MCP Server
```python
try:
note_data = await client.notes.create_note(...)
return CreateNoteResponse(...)
except HTTPStatusError as e:
if e.response.status_code == 403:
raise McpError(ErrorData(
code=-1,
message="Access denied: insufficient permissions"
))
elif e.response.status_code == 413:
raise McpError(ErrorData(
code=-1,
message="Note content too large"
))
elif e.response.status_code == 409:
raise McpError(ErrorData(
code=-1,
message="Note with this title already exists"
))
```
**Features**:
- Comprehensive HTTP status code handling
- User-friendly error messages
- Specific error codes
- Guidance on resolution
### Context Agent
```python
def schedule_event(...):
"""Create event"""
# ... implementation
calendar.add_event(str(c))
return True # Simple boolean return
```
**Features**:
- Minimal error handling
- Exceptions propagate to agent
- LangChain handles retries
- Agent interprets failures
## Use Cases
### When to Use Nextcloud MCP Server
```mermaid
graph LR
Root[Nextcloud MCP Server]
Root --> ExtAccess[External Access]
Root --> OAuth[OAuth Security]
Root --> DeepAPI[Deep API Access]
Root --> Deploy[Standalone Deployment]
ExtAccess --> EA1[Claude Code integration]
ExtAccess --> EA2[IDE plugins with MCP]
ExtAccess --> EA3[Custom MCP clients]
ExtAccess --> EA4[Cross-platform tools]
OAuth --> O1[Token-based auth]
OAuth --> O2[OIDC compliance]
OAuth --> O3[Per-user permissions]
OAuth --> O4[Secure external access]
DeepAPI --> DA1[Full CRUD operations]
DeepAPI --> DA2[Notes management]
DeepAPI --> DA3[Calendar CalDAV]
DeepAPI --> DA4[Contacts CardDAV]
DeepAPI --> DA5[File operations]
DeepAPI --> DA6[Table data]
Deploy --> D1[Docker containers]
Deploy --> D2[Cloud VMs]
Deploy --> D3[Kubernetes]
Deploy --> D4[On-premise servers]
classDef rootStyle fill:#4a90e2,stroke:#2e5c8a,color:#fff
classDef categoryStyle fill:#f39c12,stroke:#d68910,color:#fff
classDef itemStyle fill:#e8f5e9,stroke:#81c784
class Root rootStyle
class ExtAccess,OAuth,DeepAPI,Deploy categoryStyle
class EA1,EA2,EA3,EA4,O1,O2,O3,O4,DA1,DA2,DA3,DA4,DA5,DA6,D1,D2,D3,D4 itemStyle
```
**Best for**:
1. External clients accessing Nextcloud (Claude Code, IDEs)
2. OAuth/OIDC authentication requirements
3. Full CRUD on Notes, Calendar, Contacts, Tables
4. WebDAV file system access
5. MCP Resources for structured data
6. Flexible deployment scenarios
7. Building external integrations
### When to Use Context Agent MCP Server
```mermaid
graph LR
Root[Context Agent MCP]
Root --> Assistant[AI Assistant]
Root --> ActionOriented[Action-Oriented]
Root --> MCPAgg[MCP Aggregation]
Root --> Safety[Safety Features]
Assistant --> A1[Nextcloud UI integration]
Assistant --> A2[Task Processing API]
Assistant --> A3[User requests in Assistant]
Assistant --> A4[Human-in-the-loop]
ActionOriented --> AO1[Send emails]
ActionOriented --> AO2[Create calendar events]
ActionOriented --> AO3[Post Talk messages]
ActionOriented --> AO4[Generate images]
ActionOriented --> AO5[Search web]
MCPAgg --> M1[Consume external MCP servers]
MCPAgg --> M2[Weather services]
MCPAgg --> M3[Maps and transit]
MCPAgg --> M4[Custom integrations]
MCPAgg --> M5[Unified tool interface]
Safety --> S1[Read operations auto-execute]
Safety --> S2[Write operations require approval]
Safety --> S3[User confirmation flow]
Safety --> S4[Agent safety]
classDef rootStyle fill:#9b59b6,stroke:#6c3483,color:#fff
classDef categoryStyle fill:#e74c3c,stroke:#c0392b,color:#fff
classDef itemStyle fill:#fff4e1,stroke:#f39c12
class Root rootStyle
class Assistant,ActionOriented,MCPAgg,Safety categoryStyle
class A1,A2,A3,A4,AO1,AO2,AO3,AO4,AO5,M1,M2,M3,M4,M5,S1,S2,S3,S4 itemStyle
```
**Best for**:
1. AI-driven actions inside Nextcloud UI
2. Assistant app integration
3. Safe/dangerous tool distinction
4. Talk, Mail, Deck operations
5. AI features (image gen, audio2text)
6. Web search and maps
7. Aggregating external MCP servers
8. Agent acting on behalf of users
## Complementary Architecture
The two MCP servers can work together in complementary ways:
```mermaid
graph TB
User[User] -->|Requests AI assistance| Assistant[Nextcloud Assistant App]
Assistant --> ContextAgent[Context Agent]
subgraph ContextAgent["Context Agent (Inside Nextcloud)"]
direction TB
Agent[LangGraph Agent]
MCPServer[MCP Server /mcp]
ToolLoader[Tool Loader]
Agent --> ToolLoader
ToolLoader --> InternalTools[Internal Tools<br/>Talk, Mail, Calendar]
end
subgraph ExternalMCP["External MCP Ecosystem"]
NextcloudMCP[Nextcloud MCP Server<br/>This Project]
WeatherMCP[Weather MCP]
CustomMCP[Custom MCP Services]
end
ToolLoader -->|Consumes| NextcloudMCP
ToolLoader -->|Consumes| WeatherMCP
ToolLoader -->|Consumes| CustomMCP
subgraph ExternalClients["External Clients"]
Claude[Claude Code]
IDE[IDEs with MCP]
end
Claude -->|Direct access| NextcloudMCP
IDE -->|Direct access| NextcloudMCP
NextcloudMCP -->|OAuth/HTTP| NextcloudApps[Nextcloud Apps<br/>Notes, Calendar, Files]
InternalTools -->|nc-py-api| NextcloudApps
classDef internal fill:#e8f5e9
classDef external fill:#e1f5ff
classDef mcp fill:#fff4e1
class Assistant,Agent,MCPServer,ToolLoader,InternalTools,NextcloudApps internal
class Claude,IDE external
class NextcloudMCP,WeatherMCP,CustomMCP mcp
```
### Example Workflows
**Workflow 1: External Client → Nextcloud MCP Server**
```
Claude Code → Nextcloud MCP Server → Nextcloud Notes API
```
- User asks Claude Code to search notes
- Claude Code calls `nc_notes_search_notes` tool
- Returns results directly to user
**Workflow 2: Assistant → Context Agent → Internal Tools**
```
User → Assistant → Context Agent → Send Email Tool
```
- User asks Assistant to send an email
- Context Agent identifies "send_email" as dangerous
- Requests user confirmation
- Sends email via nc-py-api
**Workflow 3: Assistant → Context Agent → External MCP**
```
User → Assistant → Context Agent → Nextcloud MCP Server → Notes
```
- User asks Assistant about notes
- Context Agent consumes Nextcloud MCP Server as external MCP
- Gets notes data via MCP protocol
- Returns to user via Assistant
## Technical Comparison Matrix
| Aspect | Nextcloud MCP Server | Context Agent MCP |
|--------|---------------------|-------------------|
| **Framework** | FastMCP (native) | FastMCP + LangChain |
| **Tool Decorator** | `@mcp.tool()` | `@tool` from LangChain |
| **Tool Loading** | Static (startup) | Dynamic (runtime) |
| **Tool Refresh** | No (restart required) | Every 60 seconds |
| **Resources** | Yes (`@mcp.resource()`) | No |
| **Transports** | SSE, HTTP, Streamable-HTTP | Stateless HTTP only |
| **MCP Mode** | Server only | Server + Client (hybrid) |
| **Client Type** | httpx (custom HTTP) | nc-py-api (native) |
| **Deployment** | Standalone external | Inside Nextcloud (ExApp) |
| **Auth** | BasicAuth or OAuth/OIDC | Session-based (ExApp) |
| **User Context** | Shared or per-token | Per-request `nc.set_user()` |
| **Error Handling** | McpError with codes | Basic exceptions |
| **Type Safety** | Pydantic models | Python types |
| **Safety Model** | No built-in | Safe/Dangerous classification |
| **Dependencies** | FastMCP, httpx, Pydantic | nc-py-api, LangChain, LangGraph |
| **Integration** | HTTP APIs | AppAPI + Task Processing |
| **External MCP** | No | Yes (consumes) |
## Summary
Both MCP servers serve important but different roles in the Nextcloud ecosystem:
### Nextcloud MCP Server (This Project)
- **Purpose**: Expose Nextcloud to external MCP clients
- **Strength**: Deep CRUD operations, OAuth security, standalone deployment
- **Audience**: External developers, Claude Code users, integration builders
### Context Agent MCP Server
- **Purpose**: Bring AI agent capabilities to Nextcloud users
- **Strength**: Action-oriented, safe/dangerous tools, MCP aggregation
- **Audience**: Nextcloud users via Assistant app, AI-driven workflows
**Key Insight**: These are complementary, not competing. Context Agent could even consume Nextcloud MCP Server as one of its external MCP sources, creating a unified ecosystem where:
- External clients access Nextcloud via Nextcloud MCP Server
- Internal users leverage Context Agent for AI assistance
- Context Agent aggregates both internal tools and external MCP servers (including Nextcloud MCP Server)
+4 -3
View File
@@ -217,11 +217,12 @@ NEXTCLOUD_HOST=https://nextcloud.example.com
**How it works**:
1. Server checks `/.well-known/openid-configuration` for `registration_endpoint`
2. Calls `/apps/oidc/register` to register new client
2. Calls `/apps/oidc/register` to register a client on first startup
3. Saves credentials to `.nextcloud_oauth_client.json`
4. Re-registers if credentials expire
4. Reuses these credentials on subsequent startups
5. Re-registers only if credentials are missing or expired
**Best for**: Development, testing, short-lived deployments
**Best for**: Development, testing, quick deployments
### Pre-configured Client
+7 -7
View File
@@ -165,23 +165,23 @@ You have two options for managing OAuth clients:
### Mode A: Automatic Registration (Dynamic Client Registration)
**Best for**: Development, testing, short-lived deployments
**Best for**: Development, testing, quick deployments
**How it works**:
- MCP server automatically registers OAuth client at startup
- MCP server automatically registers an OAuth client on first startup
- Uses Nextcloud's dynamic client registration endpoint
- Saves credentials to `.nextcloud_oauth_client.json`
- Reuses stored credentials on subsequent restarts
- Re-registers automatically if credentials expire
**Pros**:
- Zero configuration required
- Quick setup
- No manual client management
- Automatic credential management
**Cons**:
- Clients expire (default: 1 hour, configurable)
- Must re-register on restart if expired
- Not ideal for long-running production
- Must have dynamic client registration enabled on Nextcloud
**Configuration**: Skip to [Step 4](#step-4-configure-mcp-server) with minimal config.
@@ -192,8 +192,8 @@ You have two options for managing OAuth clients:
**Best for**: Production, long-running deployments, stable environments
**How it works**:
- You manually register OAuth client via Nextcloud CLI
- Provide client credentials to MCP server
- You manually register an OAuth client via Nextcloud CLI
- Provide client credentials to MCP server via environment variables
- Credentials don't expire
**Pros**:
+4 -4
View File
@@ -151,11 +151,11 @@ curl https://your.nextcloud.instance.com/.well-known/openid-configuration
This quick start uses **automatic client registration** which is perfect for:
- Development
- Testing
- Short-lived deployments
- Quick deployments
For **production deployments**, you should:
1. Pre-register OAuth clients manually
2. Use dedicated client credentials
For **production deployments**, consider:
1. Pre-registering OAuth client manually
2. Using dedicated client credentials that don't expire
3. See [OAuth Setup Guide](oauth-setup.md) for production configuration
---
+2
View File
@@ -21,6 +21,7 @@ from nextcloud_mcp_server.server import (
configure_contacts_tools,
configure_deck_tools,
configure_notes_tools,
configure_sharing_tools,
configure_tables_tools,
configure_webdav_tools,
)
@@ -375,6 +376,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"notes": configure_notes_tools,
"tables": configure_tables_tools,
"webdav": configure_webdav_tools,
"sharing": configure_sharing_tools,
"calendar": configure_calendar_tools,
"contacts": configure_contacts_tools,
"deck": configure_deck_tools,
@@ -211,7 +211,6 @@ async def load_or_register_client(
storage_path: str | Path,
client_name: str = "Nextcloud MCP Server",
redirect_uris: list[str] | None = None,
force_register: bool = True,
) -> ClientInfo:
"""
Load client from storage or register a new one if not found/expired.
@@ -219,7 +218,7 @@ async def load_or_register_client(
This function:
1. Checks for existing client credentials in storage
2. Validates the credentials are not expired
3. Registers a new client if needed
3. Registers a new client if needed (no stored credentials or expired)
4. Saves the new client credentials
Args:
@@ -228,7 +227,6 @@ async def load_or_register_client(
storage_path: Path to store client credentials
client_name: Name of the client application
redirect_uris: List of redirect URIs
force_register: Force registration even if valid credentials exist
Returns:
ClientInfo with valid credentials
@@ -239,11 +237,10 @@ async def load_or_register_client(
"""
storage_path = Path(storage_path)
# Try to load existing client unless forced to register
if not force_register:
client_info = load_client_from_file(storage_path)
if client_info:
return client_info
# Try to load existing client
client_info = load_client_from_file(storage_path)
if client_info:
return client_info
# Register new client
logger.info("Registering new OAuth client...")
+6
View File
@@ -15,9 +15,12 @@ from ..controllers.notes_search import NotesSearchController
from .calendar import CalendarClient
from .contacts import ContactsClient
from .deck import DeckClient
from .groups import GroupsClient
from .notes import NotesClient
from .sharing import SharingClient
from .tables import TablesClient
from .webdav import WebDAVClient
from .users import UsersClient
logger = logging.getLogger(__name__)
@@ -71,6 +74,9 @@ class NextcloudClient:
self.calendar = CalendarClient(self._client, username)
self.contacts = ContactsClient(self._client, username)
self.deck = DeckClient(self._client, username)
self.users = UsersClient(self._client, username)
self.groups = GroupsClient(self._client, username)
self.sharing = SharingClient(self._client, username)
# Initialize controllers
self._notes_search = NotesSearchController()
+16 -5
View File
@@ -99,7 +99,7 @@ class DeckClient(BaseNextcloudClient):
permission_edit: bool,
permission_share: bool,
permission_manage: bool,
) -> List[DeckACL]:
) -> DeckACL:
json_data = {
"type": type,
"participant": participant,
@@ -107,10 +107,14 @@ class DeckClient(BaseNextcloudClient):
"permissionShare": permission_share,
"permissionManage": permission_manage,
}
headers = self._get_deck_headers()
response = await self._make_request(
"POST", f"/apps/deck/api/v1.0/boards/{board_id}/acl", json=json_data
"POST",
f"/apps/deck/api/v1.0/boards/{board_id}/acl",
json=json_data,
headers=headers,
)
return [DeckACL(**acl) for acl in response.json()]
return DeckACL(**response.json())
async def update_acl_rule(
self,
@@ -127,13 +131,20 @@ class DeckClient(BaseNextcloudClient):
json_data["permissionShare"] = permission_share
if permission_manage is not None:
json_data["permissionManage"] = permission_manage
headers = self._get_deck_headers()
await self._make_request(
"PUT", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}", json=json_data
"PUT",
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
json=json_data,
headers=headers,
)
async def delete_acl_rule(self, board_id: int, acl_id: int) -> None:
headers = self._get_deck_headers()
await self._make_request(
"DELETE", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}"
"DELETE",
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
headers=headers,
)
async def clone_board(
+151
View File
@@ -0,0 +1,151 @@
"""Nextcloud Groups API client."""
import logging
from typing import List
from .base import BaseNextcloudClient, retry_on_429
logger = logging.getLogger(__name__)
class GroupsClient(BaseNextcloudClient):
"""Client for Nextcloud Groups API operations."""
@retry_on_429
async def search_groups(
self,
search: str | None = None,
limit: int | None = None,
offset: int | None = None,
) -> List[str]:
"""
Search for groups on the Nextcloud server.
Args:
search: Optional search string to filter groups
limit: Optional limit for number of results
offset: Optional offset for pagination
Returns:
List of group IDs matching the search criteria
"""
params = {}
if search is not None:
params["search"] = search
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
response = await self._client.get(
"/ocs/v2.php/cloud/groups",
params=params,
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
groups = data["ocs"]["data"].get("groups", [])
return groups
@retry_on_429
async def create_group(self, groupid: str) -> None:
"""
Create a new group.
Args:
groupid: The group ID to create
Raises:
HTTPStatusError: If the request fails (e.g., group already exists)
"""
response = await self._client.post(
"/ocs/v2.php/cloud/groups",
data={"groupid": groupid},
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
logger.info(f"Created group: {groupid}")
@retry_on_429
async def delete_group(self, groupid: str) -> None:
"""
Delete a group.
Args:
groupid: The group ID to delete
Raises:
HTTPStatusError: If the request fails (e.g., group doesn't exist)
"""
response = await self._client.delete(
f"/ocs/v2.php/cloud/groups/{groupid}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
logger.info(f"Deleted group: {groupid}")
@retry_on_429
async def get_group_members(self, groupid: str) -> List[str]:
"""
Get members of a group.
Args:
groupid: The group ID
Returns:
List of usernames in the group
"""
response = await self._client.get(
f"/ocs/v2.php/cloud/groups/{groupid}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
users = data["ocs"]["data"].get("users", [])
return users
@retry_on_429
async def get_group_subadmins(self, groupid: str) -> List[str]:
"""
Get subadmins of a group.
Args:
groupid: The group ID
Returns:
List of usernames who are subadmins of the group
"""
response = await self._client.get(
f"/ocs/v2.php/cloud/groups/{groupid}/subadmins",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
# The API returns data as a list or dict depending on results
subadmins_data = data["ocs"]["data"]
if isinstance(subadmins_data, list):
return subadmins_data
return []
@retry_on_429
async def update_group_displayname(self, groupid: str, displayname: str) -> None:
"""
Update a group's display name.
Args:
groupid: The group ID
displayname: The new display name
Raises:
HTTPStatusError: If the request fails
"""
response = await self._client.put(
f"/ocs/v2.php/cloud/groups/{groupid}",
data={"key": "displayname", "value": displayname},
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
logger.info(f"Updated group {groupid} displayname to: {displayname}")
+208
View File
@@ -0,0 +1,208 @@
"""Nextcloud OCS Sharing API client for file/folder sharing operations."""
import logging
from typing import Any
from .base import BaseNextcloudClient, retry_on_429
logger = logging.getLogger(__name__)
class SharingClient(BaseNextcloudClient):
"""Client for Nextcloud OCS Sharing API operations."""
@retry_on_429
async def create_share(
self,
path: str,
share_with: str,
share_type: int = 0,
permissions: int = 1,
) -> dict[str, Any]:
"""Create a share for a file or folder.
Args:
path: Path to file/folder to share (relative to user's files)
share_with: Username (for user share) or group name (for group share)
share_type: Share type (0=user, 1=group, 3=public link)
permissions: Share permissions:
- 1 = read
- 2 = update
- 4 = create
- 8 = delete
- 16 = share
- 31 = all permissions
Common combinations: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
Returns:
Share data including share ID
Raises:
HTTPStatusError: If the request fails
"""
response = await self._client.post(
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={
"path": path,
"shareType": share_type,
"shareWith": share_with,
"permissions": permissions,
},
)
response.raise_for_status()
data = response.json()
# OCS API v2 uses HTTP-style status codes (200 for success)
# OCS API v1 used custom codes (100 for success)
ocs_status = data["ocs"]["meta"]["statuscode"]
if ocs_status not in (100, 200):
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
raise RuntimeError(f"OCS API error (code {ocs_status}): {ocs_message}")
share_data = data["ocs"]["data"]
# Handle case where data might be an empty list on error
if not share_data or (isinstance(share_data, list) and len(share_data) == 0):
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
raise RuntimeError(
f"Share creation failed: {ocs_message} (status {ocs_status})"
)
logger.info(
f"Created share {share_data['id']}: {path} -> {share_with} "
f"(type={share_type}, permissions={permissions})"
)
return share_data
@retry_on_429
async def delete_share(self, share_id: int) -> None:
"""Delete a share by its ID.
Args:
share_id: The share ID to delete
Raises:
HTTPStatusError: If the request fails
"""
response = await self._client.delete(
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
raise RuntimeError(
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
)
logger.info(f"Deleted share {share_id}")
@retry_on_429
async def get_share(self, share_id: int) -> dict[str, Any]:
"""Get information about a specific share.
Args:
share_id: The share ID
Returns:
Share data
Raises:
HTTPStatusError: If the request fails
"""
response = await self._client.get(
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
raise RuntimeError(
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
)
share_data = data["ocs"]["data"]
# The API returns a list with a single share, extract the first element
if isinstance(share_data, list) and len(share_data) > 0:
return share_data[0]
return share_data
@retry_on_429
async def list_shares(
self, path: str | None = None, shared_with_me: bool = False
) -> list[dict[str, Any]]:
"""List shares.
Args:
path: Optional path to filter shares for a specific file/folder
shared_with_me: If True, list shares shared with the current user
Returns:
List of share data
Raises:
HTTPStatusError: If the request fails
"""
params = {}
if path:
params["path"] = path
if shared_with_me:
params["shared_with_me"] = "true"
response = await self._client.get(
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
params=params,
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
raise RuntimeError(
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
)
# Handle both single share and list of shares
shares_data = data["ocs"]["data"]
if isinstance(shares_data, dict):
return [shares_data]
return shares_data if shares_data else []
@retry_on_429
async def update_share(
self, share_id: int, permissions: int | None = None
) -> dict[str, Any]:
"""Update a share's permissions.
Args:
share_id: The share ID to update
permissions: New permissions value (see create_share for values)
Returns:
Updated share data
Raises:
HTTPStatusError: If the request fails
"""
data = {}
if permissions is not None:
data["permissions"] = permissions
response = await self._client.put(
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data=data,
)
response.raise_for_status()
result = response.json()
if result["ocs"]["meta"]["statuscode"] not in (100, 200):
raise RuntimeError(
f"OCS API error: {result['ocs']['meta'].get('message', 'Unknown error')}"
)
logger.info(f"Updated share {share_id}")
return result["ocs"]["data"]
+222
View File
@@ -0,0 +1,222 @@
from typing import List, Optional, Dict
from nextcloud_mcp_server.client.base import BaseNextcloudClient
from nextcloud_mcp_server.models.users import UserDetails
class UsersClient(BaseNextcloudClient):
"""Client for Nextcloud User API operations."""
def _get_user_headers(
self, additional_headers: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
"""Get standard headers required for User API calls."""
headers = {"OCS-APIRequest": "true", "Accept": "application/json"}
if additional_headers:
headers.update(additional_headers)
return headers
async def create_user(
self,
userid: str,
password: Optional[str] = None,
display_name: Optional[str] = None,
email: Optional[str] = None,
groups: Optional[List[str]] = None,
subadmin_groups: Optional[List[str]] = None,
quota: Optional[str] = None,
language: Optional[str] = None,
) -> None:
"""
Create a new user on the Nextcloud server.
"""
data = {"userid": userid}
if password is not None:
data["password"] = password
if display_name is not None:
data["displayName"] = display_name
if email is not None:
data["email"] = email
if groups is not None:
for i, group in enumerate(groups):
data[f"groups[{i}]"] = group
if subadmin_groups is not None:
for i, group in enumerate(subadmin_groups):
data[f"subadmin[{i}]"] = group
if quota is not None:
data["quota"] = quota
if language is not None:
data["language"] = language
headers = self._get_user_headers()
await self._make_request(
"POST", "/ocs/v2.php/cloud/users", data=data, headers=headers
)
async def search_users(
self,
search: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List[str]:
"""
Retrieves a list of users from the Nextcloud server.
"""
params = {}
if search is not None:
params["search"] = search
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
headers = self._get_user_headers()
response = await self._make_request(
"GET", "/ocs/v2.php/cloud/users", params=params, headers=headers
)
# The v2 API returns JSON with users as a direct list under data.users
data = response.json()["ocs"]["data"]
return data.get("users", [])
async def get_user_details(self, userid: str) -> UserDetails:
"""
Retrieves information about a single user.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
)
return UserDetails(**response.json()["ocs"]["data"])
async def update_user_field(self, userid: str, key: str, value: str) -> None:
"""
Edits attributes related to a user.
"""
data = {"key": key, "value": value}
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v2.php/cloud/users/{userid}", data=data, headers=headers
)
async def get_editable_user_fields(self) -> List[str]:
"""
Gets the list of editable data fields for a user.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", "/ocs/v2.php/cloud/user/fields", headers=headers
)
# The v2 API returns data as a direct list
data = response.json()["ocs"]["data"]
return data if isinstance(data, list) else []
async def disable_user(self, userid: str) -> None:
"""
Disables a user on the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v2.php/cloud/users/{userid}/disable", headers=headers
)
async def enable_user(self, userid: str) -> None:
"""
Enables a user on the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v2.php/cloud/users/{userid}/enable", headers=headers
)
async def delete_user(self, userid: str) -> None:
"""
Deletes a user from the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"DELETE", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
)
async def get_user_groups(self, userid: str) -> List[str]:
"""
Retrieves a list of groups the specified user is a member of.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v2.php/cloud/users/{userid}/groups", headers=headers
)
# The v2 API returns groups as a direct list under data.groups
data = response.json()["ocs"]["data"]
return data.get("groups", [])
async def add_user_to_group(self, userid: str, groupid: str) -> None:
"""
Adds the specified user to the specified group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"POST",
f"/ocs/v2.php/cloud/users/{userid}/groups",
data=data,
headers=headers,
)
async def remove_user_from_group(self, userid: str, groupid: str) -> None:
"""
Removes the specified user from the specified group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"DELETE",
f"/ocs/v2.php/cloud/users/{userid}/groups",
data=data,
headers=headers,
)
async def promote_user_to_subadmin(self, userid: str, groupid: str) -> None:
"""
Makes a user the subadmin of a group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"POST",
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
data=data,
headers=headers,
)
async def demote_user_from_subadmin(self, userid: str, groupid: str) -> None:
"""
Removes the subadmin rights for the user specified from the group specified.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"DELETE",
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
data=data,
headers=headers,
)
async def get_user_subadmin_groups(self, userid: str) -> List[str]:
"""
Returns the groups in which the user is a subadmin.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v2.php/cloud/users/{userid}/subadmins", headers=headers
)
# The v2 API returns data as a direct list
data = response.json()["ocs"]["data"]
return data if isinstance(data, list) else []
async def resend_welcome_email(self, userid: str) -> None:
"""
Triggers the welcome email for this user again.
"""
headers = self._get_user_headers()
await self._make_request(
"POST", f"/ocs/v2.php/cloud/users/{userid}/welcome", headers=headers
)
+40
View File
@@ -0,0 +1,40 @@
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field
class User(BaseModel):
"""Model for creating a new user."""
userid: str
password: Optional[str] = None
displayName: Optional[str] = None
email: Optional[str] = None
groups: Optional[List[str]] = Field(default_factory=list)
subadmin: Optional[List[str]] = Field(default_factory=list)
quota: Optional[str] = None
language: Optional[str] = None
class UserDetails(BaseModel):
"""Model for retrieving detailed user information."""
model_config = ConfigDict(populate_by_name=True)
enabled: bool
id: str
quota: Union[str, Dict[str, Any]] # Can be string or quota object
email: Optional[str] = None # Can be null
displayname: str = Field(
alias="display-name"
) # Handle both displayname and display-name
phone: Optional[str] = None
address: Optional[str] = None
website: Optional[str] = None
twitter: Optional[str] = None
groups: Optional[List[str]] = Field(default_factory=list)
class Group(BaseModel):
"""Model for a user group."""
id: str
+2
View File
@@ -2,6 +2,7 @@ from .calendar import configure_calendar_tools
from .contacts import configure_contacts_tools
from .deck import configure_deck_tools
from .notes import configure_notes_tools
from .sharing import configure_sharing_tools
from .tables import configure_tables_tools
from .webdav import configure_webdav_tools
@@ -10,6 +11,7 @@ __all__ = [
"configure_contacts_tools",
"configure_deck_tools",
"configure_notes_tools",
"configure_sharing_tools",
"configure_tables_tools",
"configure_webdav_tools",
]
+133
View File
@@ -0,0 +1,133 @@
"""MCP tools for Nextcloud file/folder sharing operations."""
import json
from nextcloud_mcp_server.context import get_client
from mcp.server.fastmcp import Context, FastMCP
def configure_sharing_tools(mcp: FastMCP):
"""Configure sharing-related MCP tools.
Args:
mcp: FastMCP server instance
"""
@mcp.tool()
async def nc_share_create(
path: str,
share_with: str,
ctx: Context,
share_type: int = 0,
permissions: int = 1,
) -> str:
"""Create a share for a file or folder in Nextcloud.
Share a file or folder with another user or group. The authenticated user
must own the file/folder being shared.
Args:
path: Path to file/folder to share (relative to your files, e.g., "/document.txt")
share_with: Username (for user share) or group name (for group share)
share_type: Share type - 0 for user (default), 1 for group, 3 for public link
permissions: Share permissions (default: 1 for read-only):
- 1 = read
- 2 = update
- 4 = create
- 8 = delete
- 16 = share
- 31 = all permissions
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
Returns:
JSON string with share information including share ID
"""
client = get_client(ctx)
share_data = await client.sharing.create_share(
path=path,
share_with=share_with,
share_type=share_type,
permissions=permissions,
)
return json.dumps(share_data, indent=2)
@mcp.tool()
async def nc_share_delete(share_id: int, ctx: Context) -> str:
"""Delete a share by its ID.
Remove a share that you created. You must be the owner of the share.
Args:
share_id: The ID of the share to delete
Returns:
JSON string confirming deletion
"""
client = get_client(ctx)
await client.sharing.delete_share(share_id)
return json.dumps(
{"success": True, "message": f"Share {share_id} deleted"}, indent=2
)
@mcp.tool()
async def nc_share_get(share_id: int, ctx: Context) -> str:
"""Get information about a specific share.
Retrieve details about a share by its ID. You must have access to the share
(either as owner or recipient).
Args:
share_id: The ID of the share
Returns:
JSON string with share information
"""
client = get_client(ctx)
share_data = await client.sharing.get_share(share_id)
return json.dumps(share_data, indent=2)
@mcp.tool()
async def nc_share_list(
ctx: Context, path: str | None = None, shared_with_me: bool = False
) -> str:
"""List shares created by you or shared with you.
Args:
path: Optional path to filter shares for a specific file/folder
shared_with_me: If True, list shares that others shared with you.
If False (default), list shares you created.
Returns:
JSON string with list of shares
"""
client = get_client(ctx)
shares = await client.sharing.list_shares(
path=path, shared_with_me=shared_with_me
)
return json.dumps(shares, indent=2)
@mcp.tool()
async def nc_share_update(share_id: int, permissions: int, ctx: Context) -> str:
"""Update the permissions of an existing share.
Modify the permissions for a share you created. You must be the owner.
Args:
share_id: The ID of the share to update
permissions: New permissions value:
- 1 = read
- 2 = update
- 4 = create
- 8 = delete
- 16 = share
- 31 = all permissions
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
Returns:
JSON string with updated share information
"""
client = get_client(ctx)
share_data = await client.sharing.update_share(
share_id=share_id, permissions=permissions
)
return json.dumps(share_data, indent=2)
+3 -3
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.13.0"
version = "0.14.1"
description = ""
authors = [
{name = "Chris Coutinho",email = "chris@coutinho.io"}
@@ -22,8 +22,8 @@ asyncio_mode = "auto"
asyncio_default_test_loop_scope = "session"
asyncio_default_fixture_loop_scope = "session"
log_cli = 1
log_cli_level = "INFO"
log_level = "INFO"
log_cli_level = "WARN"
log_level = "WARN"
markers = [
"integration: marks tests as slow (deselect with '-m \"not slow\"')",
"oauth: marks tests as oauth (deselect with '-m \"not oauth\"')"
+172
View File
@@ -0,0 +1,172 @@
"""Integration tests for Nextcloud Sharing API client."""
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.integration
@pytest.mark.asyncio
async def test_create_and_delete_share(nc_client):
"""Test creating and deleting a file share."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
)
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_share_file.txt"
file_content = b"Test file for sharing"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user, # Share with test user
share_type=0, # User share
permissions=1, # Read-only
)
assert share_data is not None
assert "id" in share_data
share_id = share_data["id"]
logger.info(f"Created share: {share_id}")
# Get share info
share_info = await nc_client.sharing.get_share(share_id)
assert share_info["id"] == share_id
assert share_info["path"] == file_path
assert share_info["permissions"] == 1
# List shares
shares = await nc_client.sharing.list_shares(path=file_path)
assert len(shares) > 0
assert any(s["id"] == share_id for s in shares)
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
logger.info(f"Deleted share: {share_id}")
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
@pytest.mark.asyncio
async def test_update_share_permissions(nc_client):
"""Test updating share permissions."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
)
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_share_update.txt"
file_content = b"Test file for permission updates"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share with read-only permissions
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user,
share_type=0,
permissions=1, # Read-only
)
share_id = share_data["id"]
# Update to read+write permissions
updated_share = await nc_client.sharing.update_share(
share_id=share_id,
permissions=3, # Read + Write
)
assert updated_share["id"] == share_id
assert updated_share["permissions"] == 3
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
@pytest.mark.asyncio
async def test_list_shares(nc_client):
"""Test listing all shares."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
)
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_list_shares.txt"
file_content = b"Test file for listing shares"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user,
share_type=0,
permissions=1,
)
share_id = share_data["id"]
# List all shares
all_shares = await nc_client.sharing.list_shares()
assert len(all_shares) > 0
# List shares for specific file
file_shares = await nc_client.sharing.list_shares(path=file_path)
assert len(file_shares) > 0
assert any(s["id"] == share_id for s in file_shares)
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
+742 -167
View File
File diff suppressed because it is too large Load Diff
+569
View File
@@ -0,0 +1,569 @@
import json
import logging
import uuid
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.integration
# Stack MCP Tools Tests
async def test_deck_stack_mcp_tools(
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
):
"""Test complete deck stack operations via MCP tools."""
board_id = temporary_board["id"]
stack_title = f"MCP Test Stack {uuid.uuid4().hex[:8]}"
stack_order = 1
# 1. Create stack via MCP tool
logger.info(f"Creating stack via MCP: {stack_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_stack",
{"board_id": board_id, "title": stack_title, "order": stack_order},
)
assert create_result.isError is False, (
f"MCP stack creation failed: {create_result.content}"
)
created_stack_response = json.loads(create_result.content[0].text)
stack_id = created_stack_response["id"]
assert created_stack_response["title"] == stack_title
assert created_stack_response["order"] == stack_order
logger.info(f"Stack created via MCP with ID: {stack_id}")
try:
# 2. Get stack via MCP resource
logger.info(f"Getting stack via MCP resource: {stack_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_stack_response = json.loads(get_result.contents[0].text)
assert get_stack_response["title"] == stack_title
logger.info("Stack retrieved via MCP resource successfully")
# 3. Update stack via MCP tool
updated_title = f"Updated {stack_title}"
updated_order = 2
logger.info(f"Updating stack via MCP tool: {stack_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_stack",
{
"board_id": board_id,
"stack_id": stack_id,
"title": updated_title,
"order": updated_order,
},
)
assert update_result.isError is False, (
f"MCP stack update failed: {update_result.content}"
)
logger.info("Stack updated via MCP tool successfully")
# 4. Verify update via direct client
updated_stack = await nc_client.deck.get_stack(board_id, stack_id)
assert updated_stack.title == updated_title
assert updated_stack.order == updated_order
logger.info("Stack update verified via direct client")
# 5. List stacks via MCP resource
logger.info("Listing stacks via MCP resource")
list_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks"
)
assert len(list_result.contents) == 1, "Expected exactly one content item"
stacks_data = json.loads(list_result.contents[0].text)
assert isinstance(stacks_data, list)
# Verify our stack is in the list
stack_ids = [stack["id"] for stack in stacks_data]
assert stack_id in stack_ids, "Updated stack not found in list"
logger.info(f"Stack {stack_id} found in stacks list")
# 6. Read stack via MCP resource
logger.info(f"Reading stack via MCP resource: {stack_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
)
read_stack_data = json.loads(read_result.contents[0].text)
assert read_stack_data["title"] == updated_title
logger.info("Stack read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_stack(board_id, stack_id)
logger.info(f"Cleaned up stack ID: {stack_id}")
# Card MCP Tools Tests
async def test_deck_card_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_stack: tuple,
):
"""Test complete deck card operations via MCP tools."""
board_data, stack_data = temporary_board_with_stack
board_id = board_data["id"]
stack_id = stack_data["id"]
card_title = f"MCP Test Card {uuid.uuid4().hex[:8]}"
card_description = f"Test description for {card_title}"
# 1. Create card via MCP tool
logger.info(f"Creating card via MCP: {card_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_card",
{
"board_id": board_id,
"stack_id": stack_id,
"title": card_title,
"description": card_description,
"type": "plain",
"order": 1,
},
)
assert create_result.isError is False, (
f"MCP card creation failed: {create_result.content}"
)
created_card_response = json.loads(create_result.content[0].text)
card_id = created_card_response["id"]
assert created_card_response["title"] == card_title
assert created_card_response["description"] == card_description
logger.info(f"Card created via MCP with ID: {card_id}")
try:
# 2. Get card via MCP resource
logger.info(f"Getting card via MCP resource: {card_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_card_response = json.loads(get_result.contents[0].text)
assert get_card_response["title"] == card_title
logger.info("Card retrieved via MCP resource successfully")
# 3. Update card via MCP tool
updated_title = f"Updated {card_title}"
updated_description = f"Updated description for {card_title}"
logger.info(f"Updating card via MCP tool: {card_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"title": updated_title,
"description": updated_description,
},
)
assert update_result.isError is False, (
f"MCP card update failed: {update_result.content}"
)
logger.info("Card updated via MCP tool successfully")
# 4. Verify update via direct client
updated_card = await nc_client.deck.get_card(board_id, stack_id, card_id)
assert updated_card.title == updated_title
assert updated_card.description == updated_description
logger.info("Card update verified via direct client")
# 5. Archive/unarchive card via MCP tools
logger.info(f"Archiving card via MCP tool: {card_id}")
archive_result = await nc_mcp_client.call_tool(
"deck_archive_card",
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
)
assert archive_result.isError is False, (
f"MCP card archive failed: {archive_result.content}"
)
logger.info("Card archived via MCP tool successfully")
logger.info(f"Unarchiving card via MCP tool: {card_id}")
unarchive_result = await nc_mcp_client.call_tool(
"deck_unarchive_card",
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
)
assert unarchive_result.isError is False, (
f"MCP card unarchive failed: {unarchive_result.content}"
)
logger.info("Card unarchived via MCP tool successfully")
# 6. Move card to different position via MCP tool
logger.info(f"Reordering card via MCP tool: {card_id}")
reorder_result = await nc_mcp_client.call_tool(
"deck_reorder_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"order": 10,
"target_stack_id": stack_id,
},
)
assert reorder_result.isError is False, (
f"MCP card reorder failed: {reorder_result.content}"
)
logger.info("Card reordered via MCP tool successfully")
# 7. Read card via MCP resource
logger.info(f"Reading card via MCP resource: {card_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
)
read_card_data = json.loads(read_result.contents[0].text)
assert read_card_data["title"] == updated_title
logger.info("Card read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_card(board_id, stack_id, card_id)
logger.info(f"Cleaned up card ID: {card_id}")
# Label MCP Tools Tests
async def test_deck_label_mcp_tools(
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
):
"""Test complete deck label operations via MCP tools."""
board_id = temporary_board["id"]
label_title = f"MCP Test Label {uuid.uuid4().hex[:8]}"
label_color = "FF0000" # Red
# 1. Create label via MCP tool
logger.info(f"Creating label via MCP: {label_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_label",
{"board_id": board_id, "title": label_title, "color": label_color},
)
assert create_result.isError is False, (
f"MCP label creation failed: {create_result.content}"
)
created_label_response = json.loads(create_result.content[0].text)
label_id = created_label_response["id"]
assert created_label_response["title"] == label_title
assert created_label_response["color"] == label_color
logger.info(f"Label created via MCP with ID: {label_id}")
try:
# 2. Get label via MCP resource
logger.info(f"Getting label via MCP resource: {label_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels/{label_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_label_response = json.loads(get_result.contents[0].text)
assert get_label_response["title"] == label_title
logger.info("Label retrieved via MCP resource successfully")
# 3. Update label via MCP tool
updated_title = f"Updated {label_title}"
updated_color = "00FF00" # Green
logger.info(f"Updating label via MCP tool: {label_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_label",
{
"board_id": board_id,
"label_id": label_id,
"title": updated_title,
"color": updated_color,
},
)
assert update_result.isError is False, (
f"MCP label update failed: {update_result.content}"
)
logger.info("Label updated via MCP tool successfully")
# 4. Verify update via direct client
updated_label = await nc_client.deck.get_label(board_id, label_id)
assert updated_label.title == updated_title
assert updated_label.color == updated_color
logger.info("Label update verified via direct client")
# 5. Read label via MCP resource
logger.info(f"Reading label via MCP resource: {label_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels/{label_id}"
)
read_label_data = json.loads(read_result.contents[0].text)
assert read_label_data["title"] == updated_title
logger.info("Label read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_label(board_id, label_id)
logger.info(f"Cleaned up label ID: {label_id}")
# Label-Card Assignment Tests
async def test_deck_card_label_assignment_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_card: tuple,
):
"""Test card-label assignment operations via MCP tools."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
card_id = card_data["id"]
# Create a label for assignment
label = await nc_client.deck.create_label(
board_id, "Assignment Test Label", "0000FF"
)
label_id = label.id
try:
# 1. Assign label to card via MCP tool
logger.info(f"Assigning label {label_id} to card {card_id} via MCP")
assign_result = await nc_mcp_client.call_tool(
"deck_assign_label_to_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"label_id": label_id,
},
)
assert assign_result.isError is False, (
f"MCP label assignment failed: {assign_result.content}"
)
logger.info("Label assigned to card via MCP tool successfully")
# 2. Verify assignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.labels:
label_ids = [label.id for label in card.labels]
assert label_id in label_ids, "Label not found in card labels"
logger.info("Label assignment verified via direct client")
# 3. Remove label from card via MCP tool
logger.info(f"Removing label {label_id} from card {card_id} via MCP")
remove_result = await nc_mcp_client.call_tool(
"deck_remove_label_from_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"label_id": label_id,
},
)
assert remove_result.isError is False, (
f"MCP label removal failed: {remove_result.content}"
)
logger.info("Label removed from card via MCP tool successfully")
# 4. Verify removal via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.labels:
label_ids = [label.id for label in card.labels]
assert label_id not in label_ids, (
"Label still found in card labels after removal"
)
logger.info("Label removal verified via direct client")
finally:
# Clean up
await nc_client.deck.delete_label(board_id, label_id)
logger.info(f"Cleaned up label ID: {label_id}")
# User Assignment Tests
async def test_deck_card_user_assignment_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_card: tuple,
):
"""Test card-user assignment operations via MCP tools."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
card_id = card_data["id"]
# Use the current user ID (admin in most test environments)
user_id = "admin"
# 1. Assign user to card via MCP tool
logger.info(f"Assigning user {user_id} to card {card_id} via MCP")
assign_result = await nc_mcp_client.call_tool(
"deck_assign_user_to_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"user_id": user_id,
},
)
assert assign_result.isError is False, (
f"MCP user assignment failed: {assign_result.content}"
)
logger.info("User assigned to card via MCP tool successfully")
# 2. Verify assignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.assignedUsers:
user_ids = []
for user in card.assignedUsers:
if hasattr(user, "participant"):
# It's a DeckAssignedUser with participant
user_ids.append(user.participant.uid)
elif hasattr(user, "uid"):
# It's a direct DeckUser
user_ids.append(user.uid)
assert user_id in user_ids, "User not found in card assigned users"
logger.info("User assignment verified via direct client")
# 3. Unassign user from card via MCP tool
logger.info(f"Unassigning user {user_id} from card {card_id} via MCP")
unassign_result = await nc_mcp_client.call_tool(
"deck_unassign_user_from_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"user_id": user_id,
},
)
assert unassign_result.isError is False, (
f"MCP user unassignment failed: {unassign_result.content}"
)
logger.info("User unassigned from card via MCP tool successfully")
# 4. Verify unassignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.assignedUsers:
user_ids = []
for user in card.assignedUsers:
if hasattr(user, "participant"):
# It's a DeckAssignedUser with participant
user_ids.append(user.participant.uid)
elif hasattr(user, "uid"):
# It's a direct DeckUser
user_ids.append(user.uid)
assert user_id not in user_ids, (
"User still found in card assigned users after removal"
)
logger.info("User unassignment verified via direct client")
# Error handling tests
async def test_deck_mcp_tools_error_handling(nc_mcp_client: ClientSession):
"""Test error handling for deck MCP tools with invalid parameters."""
non_existent_id = 999999999
# Test stack operations with non-existent board
stack_result = await nc_mcp_client.call_tool(
"deck_create_stack",
{"board_id": non_existent_id, "title": "Should Fail", "order": 1},
)
assert stack_result.isError is True, (
"Expected error for stack creation on non-existent board"
)
# Test card operations with non-existent IDs
card_result = await nc_mcp_client.call_tool(
"deck_create_card",
{
"board_id": non_existent_id,
"stack_id": non_existent_id,
"title": "Should Fail",
"type": "plain",
},
)
assert card_result.isError is True, (
"Expected error for card creation with non-existent IDs"
)
# Test label operations with non-existent board
label_result = await nc_mcp_client.call_tool(
"deck_create_label",
{"board_id": non_existent_id, "title": "Should Fail", "color": "FF0000"},
)
assert label_result.isError is True, (
"Expected error for label creation on non-existent board"
)
logger.info("Error handling tests passed for deck MCP tools")
# Resource template tests
async def test_deck_mcp_resource_templates(nc_mcp_client: ClientSession):
"""Test deck MCP resource templates are properly registered."""
templates = await nc_mcp_client.list_resource_templates()
template_uris = [template.uriTemplate for template in templates.resourceTemplates]
expected_templates = [
"nc://Deck/boards/{board_id}/stacks/{stack_id}",
"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}",
"nc://Deck/boards/{board_id}/labels/{label_id}",
]
for expected_template in expected_templates:
assert expected_template in template_uris, (
f"Expected template '{expected_template}' not found"
)
logger.info(f"Found expected deck resource template: {expected_template}")
# Listing resource tests
async def test_deck_mcp_listing_resources(
nc_mcp_client: ClientSession, temporary_board_with_card: tuple
):
"""Test deck MCP listing resources for stacks and cards."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
# 1. Test listing stacks resource
logger.info(f"Reading stacks list via MCP resource for board {board_id}")
stacks_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks"
)
stacks_resource_data = json.loads(stacks_resource_result.contents[0].text)
assert isinstance(stacks_resource_data, list)
# Verify our stack is in the resource list
stack_ids = [stack["id"] for stack in stacks_resource_data]
assert stack_id in stack_ids, "Stack not found in stacks resource list"
logger.info("Stack found in stacks resource list")
# 2. Test listing cards resource
logger.info(f"Reading cards list via MCP resource for stack {stack_id}")
cards_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards"
)
cards_resource_data = json.loads(cards_resource_result.contents[0].text)
assert isinstance(cards_resource_data, list)
# Verify our card is in the resource list
card_ids = [card["id"] for card in cards_resource_data]
assert card_data["id"] in card_ids, "Card not found in cards resource list"
logger.info("Card found in cards resource list")
# 3. Test listing labels resource
logger.info(f"Reading labels list via MCP resource for board {board_id}")
labels_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels"
)
labels_resource_data = json.loads(labels_resource_result.contents[0].text)
assert isinstance(labels_resource_data, list)
logger.info("Labels resource read successfully")
+358
View File
@@ -0,0 +1,358 @@
"""
Multi-user OAuth tests for Nextcloud Deck board permissions.
Tests verify that the MCP server respects Nextcloud Deck board ACL permissions
when accessed via OAuth authentication with different users.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def add_board_acl(nc_client, board_id: int, user: str, permission_type: int = 0):
"""
Helper to add ACL entry to a Deck board.
Args:
nc_client: Admin NextcloudClient
board_id: Board ID
user: Username to grant access
permission_type: 0=view, 1=edit, 2=manage
Returns:
ACL entry ID
"""
acl = await nc_client.deck.add_acl_rule(
board_id=board_id,
type=0, # 0 = user, 1 = group
participant=user,
permission_edit=permission_type >= 1,
permission_share=permission_type >= 2,
permission_manage=permission_type >= 2,
)
logger.info(f"Added ACL for board {board_id}: {user} (type={permission_type})")
return acl.id
async def delete_board_acl(nc_client, board_id: int, acl_id: int):
"""Helper to delete a board ACL entry."""
await nc_client.deck.delete_acl_rule(board_id, acl_id)
logger.info(f"Deleted ACL {acl_id} from board {board_id}")
@pytest.mark.asyncio
async def test_deck_board_view_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that Deck boards respect view permissions.
Scenario:
1. Admin creates a board as alice
2. Admin adds bob to board with view-only permissions
3. Bob can view the board via MCP tools
4. Diana cannot access the board (no ACL entry)
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - View Test", "FF0000"
)
board_id = board.id
bob_acl_id = None
try:
# Add bob to board with view-only permission
logger.info("Adding bob to board with view permission...")
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
# Test: Bob can view the board via MCP
logger.info("Bob attempting to list boards via MCP...")
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Bob can see {len(response_data)} boards: {board_ids}")
# Bob should see the shared board
if board_id in board_ids:
logger.info(f"Bob can see shared board {board_id}")
else:
logger.warning(f"Bob cannot see shared board {board_id}")
else:
logger.warning(f"Bob could not list boards: {result.content}")
# Test: Diana cannot see the board
logger.info("Diana attempting to list boards via MCP...")
result = await diana_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Diana can see {len(response_data)} boards")
# Diana should NOT see the board
assert board_id not in board_ids, "Diana should not see board without ACL"
logger.info("Diana correctly cannot see board without ACL")
else:
logger.warning(f"Diana could not list boards: {result.content}")
finally:
# Cleanup
if bob_acl_id:
await delete_board_acl(nc_client, board_id, bob_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_board_edit_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that Deck boards respect edit permissions.
Scenario:
1. Admin creates a board as alice with a stack
2. Admin adds charlie with edit permission
3. Admin adds bob with view-only permission
4. Charlie can create cards via MCP tools
5. Bob cannot create cards
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - Edit Test", "00FF00"
)
board_id = board.id
# Create a stack in the board
logger.info("Creating stack in board...")
stack = await nc_client.deck.create_stack(board_id, "Test Stack", 1)
stack_id = stack.id
charlie_acl_id = None
bob_acl_id = None
try:
# Add charlie with edit permission
logger.info("Adding charlie to board with edit permission...")
charlie_acl_id = await add_board_acl(
nc_client, board_id, "charlie", permission_type=1
)
# Add bob with view-only permission
logger.info("Adding bob to board with view permission...")
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
# Test: Charlie can create a card
logger.info("Charlie attempting to create card via MCP...")
result = await charlie_mcp_client.call_tool(
"deck_create_card",
arguments={
"board_id": board_id,
"stack_id": stack_id,
"title": "Charlie's Card",
"description": "Created by Charlie with edit permission",
},
)
if not result.isError:
response_data = json.loads(result.content[0].text)
card_id = response_data.get("id")
logger.info(f"Charlie successfully created card {card_id}")
# Cleanup the card
await nc_client.deck.delete_card(board_id, stack_id, card_id)
else:
logger.warning(f"Charlie could not create card: {result.content}")
# Test: Bob attempts to create a card (should fail)
logger.info("Bob attempting to create card via MCP...")
result = await bob_mcp_client.call_tool(
"deck_create_card",
arguments={
"board_id": board_id,
"stack_id": stack_id,
"title": "Bob's Card",
"description": "Bob trying to create a card",
},
)
if result.isError:
logger.info("Bob correctly denied card creation (view-only)")
else:
logger.warning("Bob unexpectedly succeeded in creating card")
# Cleanup if bob somehow created a card
response_data = json.loads(result.content[0].text)
if "id" in response_data:
await nc_client.deck.delete_card(
board_id, stack_id, response_data["id"]
)
finally:
# Cleanup
if charlie_acl_id:
await delete_board_acl(nc_client, board_id, charlie_acl_id)
if bob_acl_id:
await delete_board_acl(nc_client, board_id, bob_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_board_manage_permissions(
nc_client, alice_mcp_client, charlie_mcp_client
):
"""
Test that Deck boards respect manage permissions.
Scenario:
1. Admin creates a board as alice
2. Admin adds charlie with manage permission
3. Charlie can create stacks and modify board settings
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - Manage Test", "0000FF"
)
board_id = board.id
charlie_acl_id = None
try:
# Add charlie with manage permission
logger.info("Adding charlie to board with manage permission...")
charlie_acl_id = await add_board_acl(
nc_client, board_id, "charlie", permission_type=2
)
# Test: Charlie can create a stack
logger.info("Charlie attempting to create stack via MCP...")
result = await charlie_mcp_client.call_tool(
"deck_create_stack",
arguments={"board_id": board_id, "title": "Charlie's Stack", "order": 1},
)
if not result.isError:
response_data = json.loads(result.content[0].text)
stack_id = response_data.get("id")
logger.info(f"Charlie successfully created stack {stack_id}")
# Cleanup the stack
await nc_client.deck.delete_stack(board_id, stack_id)
else:
logger.warning(f"Charlie could not create stack: {result.content}")
# Test: Charlie can delete a stack (manage permission)
logger.info("Charlie attempting to delete stack via MCP...")
# First create a temporary stack to delete
temp_stack = await nc_client.deck.create_stack(
board_id, "Temp Stack for Deletion", 99
)
result = await charlie_mcp_client.call_tool(
"deck_delete_stack",
arguments={"board_id": board_id, "stack_id": temp_stack.id},
)
if not result.isError:
logger.info("Charlie successfully deleted stack")
else:
logger.warning(f"Charlie could not delete stack: {result.content}")
# Cleanup if deletion via MCP failed
try:
await nc_client.deck.delete_stack(board_id, temp_stack.id)
except Exception:
pass
finally:
# Cleanup
if charlie_acl_id:
await delete_board_acl(nc_client, board_id, charlie_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_user_isolation(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that users can only see their own boards when not shared.
Scenario:
1. Admin creates a board as alice (not shared)
2. Admin creates a board as bob (not shared)
3. Alice can only see her own board
4. Bob can only see his own board
"""
# Create alice's board
logger.info("Creating alice's private board...")
alice_board = await nc_client.deck.create_board("Alice's Private Board", "FF00FF")
alice_board_id = alice_board.id
# Create bob's board
logger.info("Creating bob's private board...")
bob_board = await nc_client.deck.create_board("Bob's Private Board", "00FFFF")
bob_board_id = bob_board.id
try:
# Test: Alice lists boards
logger.info("Alice listing boards via MCP...")
result = await alice_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Alice can see boards: {board_ids}")
# Alice should NOT see Bob's board
assert bob_board_id not in board_ids, (
"Alice should not see Bob's private board"
)
else:
logger.warning(f"Alice could not list boards: {result.content}")
# Test: Bob lists boards
logger.info("Bob listing boards via MCP...")
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Bob can see boards: {board_ids}")
# Bob should NOT see Alice's board
assert alice_board_id not in board_ids, (
"Bob should not see Alice's private board"
)
else:
logger.warning(f"Bob could not list boards: {result.content}")
logger.info("User isolation test passed: users can only see their own boards")
finally:
# Cleanup
logger.info("Cleaning up test boards...")
await nc_client.deck.delete_board(alice_board_id)
await nc_client.deck.delete_board(bob_board_id)
+425
View File
@@ -0,0 +1,425 @@
"""
Multi-user OAuth tests for Nextcloud WebDAV file permissions.
Tests verify that the MCP server respects Nextcloud file sharing permissions
when accessed via OAuth authentication with different users.
All operations (file creation, sharing, access) are performed through MCP tools
to ensure the MCP server properly supports multi-user scenarios.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
@pytest.mark.asyncio
async def test_file_share_read_permissions(
alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that shared files respect read permissions.
Scenario:
1. Alice creates a file via MCP
2. Alice shares the file with Bob (read-only) via MCP
3. Bob can read the file via MCP tools
4. Diana cannot access the file (no share)
"""
file_path = "/alice_shared_file_read.txt"
file_content = "This file is shared with Bob for reading only."
# Alice creates a file
logger.info(f"Alice creating file: {file_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
share_id = None
try:
# Alice shares the file with bob (read-only, permissions=1)
logger.info("Alice sharing file with bob (read-only)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
logger.info(f"Created share {share_id}")
# Test: Bob reads the file via MCP
logger.info("Bob attempting to read file via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_path}
)
# Bob should be able to read the shared file
if not result.isError:
response_data = json.loads(result.content[0].text)
logger.info(
f"Bob successfully read file: {response_data.get('content', '')[:50]}..."
)
assert "content" in response_data
assert file_content in response_data["content"]
else:
logger.warning(f"Bob could not read file: {result.content}")
# This might fail if the share path is different for bob
# Test: Diana attempts to read the file
logger.info("Diana attempting to read file via MCP...")
result = await diana_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_path}
)
# Diana should NOT be able to read (no share)
if result.isError:
logger.info("Diana correctly denied access to unshared file")
else:
logger.warning("Diana unexpectedly could read unshared file")
finally:
# Cleanup - Alice deletes the share and file
if share_id:
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info(f"Alice deleting file {file_path}")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": file_path}
)
@pytest.mark.asyncio
async def test_file_share_write_permissions(
alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that shared files respect write permissions.
Scenario:
1. Alice creates a file via MCP
2. Alice shares the file with Charlie (edit permission) via MCP
3. Alice shares the file with Bob (read-only) via MCP
4. Charlie can edit the file via MCP tools
5. Bob cannot edit the file
"""
file_path = "/alice_shared_file_write.txt"
file_content = "This file is shared with Charlie for editing."
logger.info(f"Alice creating file: {file_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
charlie_share_id = None
bob_share_id = None
try:
# Alice shares with Charlie (read+write, permissions=3)
logger.info("Alice sharing file with Charlie (edit permission)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "charlie",
"share_type": 0,
"permissions": 3,
},
)
assert not result.isError, (
f"Alice failed to share with Charlie: {result.content}"
)
charlie_share_data = json.loads(result.content[0].text)
charlie_share_id = charlie_share_data["id"]
logger.info(f"Created share {charlie_share_id} for Charlie")
# Alice shares with Bob (read-only, permissions=1)
logger.info("Alice sharing file with Bob (read-only)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to share with Bob: {result.content}"
bob_share_data = json.loads(result.content[0].text)
bob_share_id = bob_share_data["id"]
logger.info(f"Created share {bob_share_id} for Bob")
# Test: Charlie can write to the file
logger.info("Charlie attempting to write to file via MCP...")
updated_content = f"{file_content}\nCharlie added this line."
result = await charlie_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": updated_content},
)
if not result.isError:
logger.info("Charlie successfully wrote to file")
else:
logger.warning(f"Charlie could not write to file: {result.content}")
# Test: Bob attempts to write (should fail)
logger.info("Bob attempting to write to file via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": "Bob tries to overwrite this."},
)
# Bob should be denied
if result.isError:
logger.info("Bob correctly denied write access")
else:
logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)")
finally:
# Cleanup - Alice deletes shares and file
if charlie_share_id:
logger.info(f"Alice deleting Charlie's share {charlie_share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": charlie_share_id}
)
if bob_share_id:
logger.info(f"Alice deleting Bob's share {bob_share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": bob_share_id}
)
logger.info(f"Alice deleting file {file_path}")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": file_path}
)
@pytest.mark.asyncio
async def test_file_list_permissions(alice_mcp_client, bob_mcp_client):
"""
Test that file listing respects share permissions.
Scenario:
1. Alice creates her private file via MCP
2. Bob creates his private file via MCP
3. Alice creates a file and shares it with Bob via MCP
4. Alice can list her own files + shared files
5. Bob can list his own files + shared files from Alice
"""
alice_file = "/alice_private_file.txt"
bob_file = "/bob_private_file.txt"
shared_file = "/alice_shared_with_bob.txt"
# Alice creates her private file
logger.info(f"Alice creating private file: {alice_file}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": alice_file, "content": "Alice's private file"},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
# Bob creates his private file
logger.info(f"Bob creating private file: {bob_file}")
result = await bob_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": bob_file, "content": "Bob's private file"},
)
assert not result.isError, f"Bob failed to create file: {result.content}"
# Alice creates a shared file
logger.info(f"Alice creating shared file: {shared_file}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": shared_file, "content": "Shared file content"},
)
assert not result.isError, f"Alice failed to create shared file: {result.content}"
share_id = None
try:
# Alice shares the file with Bob
logger.info("Alice sharing file with Bob...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": shared_file,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
# Test: Alice lists files in root
logger.info("Alice listing files via MCP...")
result = await alice_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": "/"}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Alice can see files: {file_names}")
# Alice should see her own files
# Note: Exact assertions depend on test isolation
else:
logger.warning(f"Alice could not list files: {result.content}")
# Test: Bob lists files in root
logger.info("Bob listing files via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": "/"}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Bob can see files: {file_names}")
# Bob should see his own file, but not Alice's private file
# Bob may see shared files in his shared folder or via different path
else:
logger.warning(f"Bob could not list files: {result.content}")
finally:
# Cleanup
if share_id:
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info("Cleaning up Alice's files...")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": alice_file}
)
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": shared_file}
)
logger.info("Cleaning up Bob's files...")
await bob_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": bob_file}
)
@pytest.mark.asyncio
async def test_folder_share_permissions(alice_mcp_client, bob_mcp_client):
"""
Test that folder sharing works correctly.
Scenario:
1. Alice creates a folder via MCP
2. Alice creates files in the folder via MCP
3. Alice shares the folder with Bob via MCP
4. Bob can access files in the shared folder via MCP
"""
folder_path = "/alice_shared_folder"
file_in_folder = f"{folder_path}/document.txt"
file_content = "This is a document in Alice's shared folder"
# Alice creates folder
logger.info(f"Alice creating folder: {folder_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_create_directory", arguments={"path": folder_path}
)
assert not result.isError, f"Alice failed to create folder: {result.content}"
# Alice creates file in folder
logger.info(f"Alice creating file in folder: {file_in_folder}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_in_folder, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
share_id = None
try:
# Alice shares the folder with Bob
logger.info("Alice sharing folder with Bob...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": folder_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
logger.info(f"Created folder share {share_id}")
# Test: Bob lists the shared folder
logger.info("Bob attempting to list shared folder via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": folder_path}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
logger.info(f"Bob can see {len(response_data)} files in shared folder")
# Bob should see the file in the shared folder
file_names = [f["name"] for f in response_data]
assert "document.txt" in file_names, (
"Bob should see the file in shared folder"
)
else:
logger.warning(f"Bob could not list shared folder: {result.content}")
# Test: Bob reads the file in the shared folder
logger.info("Bob attempting to read file in shared folder via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_in_folder}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
logger.info("Bob successfully read file in shared folder")
assert "content" in response_data
assert file_content in response_data["content"]
else:
logger.warning(
f"Bob could not read file in shared folder: {result.content}"
)
finally:
# Cleanup - Alice deletes the share and folder
if share_id:
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info("Alice cleaning up test folder...")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": folder_path}
)
@@ -0,0 +1,260 @@
"""
Multi-user OAuth tests for Nextcloud Notes permissions.
Tests verify that the MCP server respects Nextcloud Notes sharing permissions
when accessed via OAuth authentication with different users.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
@pytest.mark.asyncio
async def test_notes_share_read_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that shared notes respect read permissions.
Scenario:
1. Admin creates a note as alice
2. Admin shares the note with bob (read-only)
3. Bob can read the note via MCP tools
4. Diana cannot access the note (no share)
"""
# Create a note as alice (using admin client to set up data)
note_title = "Alice's Shared Note - Read Test"
note_content = "This note is shared with Bob for reading only."
note_category = "SharedNotes"
logger.info("Creating note as alice...")
created_note = await nc_client.notes.create_note(
title=note_title, content=note_content, category=note_category
)
note_id = created_note.get("id")
try:
# TODO: Share the note with bob (read-only)
# Note: Nextcloud Notes API doesn't have direct sharing endpoints
# Sharing is typically done at the folder level via WebDAV
# For now, this test documents the expected behavior
# Test: Bob searches for notes via MCP
logger.info("Bob searching for notes via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
)
assert result.isError is False, f"Bob's search failed: {result.content}"
response_data = json.loads(result.content[0].text)
# Bob should see the shared note in search results
# (assuming proper share setup)
assert "results" in response_data
logger.info(f"Bob found {len(response_data['results'])} notes")
# Test: Diana searches for the same note
logger.info("Diana searching for notes via MCP...")
result = await diana_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
# Diana should NOT see the note (no share)
assert "results" in response_data
shared_note_ids = [
n["id"] for n in response_data["results"] if n["id"] == note_id
]
assert len(shared_note_ids) == 0, "Diana should not see unshared note"
logger.info("Diana correctly cannot see unshared note")
finally:
# Cleanup
logger.info(f"Cleaning up note {note_id}")
await nc_client.notes.delete_note(note_id)
@pytest.mark.asyncio
async def test_notes_share_write_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that shared notes respect write permissions.
Scenario:
1. Admin creates a note as alice
2. Admin shares the note with charlie (edit permission)
3. Admin shares the note with bob (read-only)
4. Charlie can edit the note via MCP tools
5. Bob cannot edit the note
"""
# Create a note as alice
note_title = "Alice's Shared Note - Write Test"
note_content = "This note is shared with Charlie for editing."
note_category = "SharedNotes"
logger.info("Creating note as alice...")
created_note = await nc_client.notes.create_note(
title=note_title, content=note_content, category=note_category
)
note_id = created_note.get("id")
try:
# TODO: Share the note with charlie (edit permission) and bob (read-only)
# Note: Nextcloud Notes sharing is folder-based
# Test: Charlie can append content to the note
logger.info("Charlie attempting to append content via MCP...")
result = await charlie_mcp_client.call_tool(
"nc_notes_append_content",
arguments={
"note_id": note_id,
"content": "\n\nCharlie added this content.",
},
)
# If sharing is properly configured, Charlie should succeed
# Without proper sharing setup, this will fail
logger.info(f"Charlie's append result: isError={result.isError}")
if not result.isError:
logger.info("Charlie successfully appended content (shares configured)")
else:
logger.warning("Charlie could not append (shares not yet configured)")
# Test: Bob attempts to append content (should fail)
logger.info("Bob attempting to append content via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_append_content",
arguments={"note_id": note_id, "content": "\n\nBob tried to add this."},
)
# Bob should fail (read-only access)
logger.info(f"Bob's append result: isError={result.isError}")
if result.isError:
logger.info("Bob correctly denied write access")
else:
logger.warning("Bob unexpectedly succeeded (permissions issue?)")
finally:
# Cleanup
logger.info(f"Cleaning up note {note_id}")
await nc_client.notes.delete_note(note_id)
@pytest.mark.asyncio
async def test_user_isolation_notes(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that users can only see their own notes when not shared.
Scenario:
1. Admin creates a note as alice (not shared)
2. Admin creates a note as bob (not shared)
3. Alice can only see her own note
4. Bob can only see his own note
"""
# Create alice's note
logger.info("Creating alice's private note...")
alice_note = await nc_client.notes.create_note(
title="Alice's Private Note",
content="This is Alice's private content.",
category="AlicePrivate",
)
alice_note_id = alice_note.get("id")
# Create bob's note
logger.info("Creating bob's private note...")
bob_note = await nc_client.notes.create_note(
title="Bob's Private Note",
content="This is Bob's private content.",
category="BobPrivate",
)
bob_note_id = bob_note.get("id")
try:
# Test: Alice searches all notes
logger.info("Alice searching all notes via MCP...")
result = await alice_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
alice_notes = response_data.get("results", [])
alice_note_ids = [n["id"] for n in alice_notes]
logger.info(f"Alice can see {len(alice_notes)} notes")
# Alice should NOT see Bob's note
assert bob_note_id not in alice_note_ids, (
"Alice should not see Bob's private note"
)
# Test: Bob searches all notes
logger.info("Bob searching all notes via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
bob_notes = response_data.get("results", [])
bob_note_ids = [n["id"] for n in bob_notes]
logger.info(f"Bob can see {len(bob_notes)} notes")
# Bob should NOT see Alice's note
assert alice_note_id not in bob_note_ids, (
"Bob should not see Alice's private note"
)
logger.info("User isolation test passed: users can only see their own notes")
finally:
# Cleanup
logger.info("Cleaning up test notes...")
await nc_client.notes.delete_note(alice_note_id)
await nc_client.notes.delete_note(bob_note_id)
@pytest.mark.asyncio
async def test_oauth_mcp_clients_initialized(
alice_mcp_client, bob_mcp_client, charlie_mcp_client, diana_mcp_client
):
"""
Smoke test to verify all OAuth MCP clients are properly initialized.
"""
logger.info("Testing alice_mcp_client initialization...")
result = await alice_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Alice MCP client failed: {result.content}"
logger.info("Alice MCP client working")
logger.info("Testing bob_mcp_client initialization...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Bob MCP client failed: {result.content}"
logger.info("Bob MCP client working")
logger.info("Testing charlie_mcp_client initialization...")
result = await charlie_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Charlie MCP client failed: {result.content}"
logger.info("Charlie MCP client working")
logger.info("Testing diana_mcp_client initialization...")
result = await diana_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Diana MCP client failed: {result.content}"
logger.info("Diana MCP client working")
logger.info("All OAuth MCP clients successfully initialized!")
+108
View File
@@ -0,0 +1,108 @@
import pytest
from nextcloud_mcp_server.client import NextcloudClient
@pytest.mark.asyncio
async def test_create_and_delete_user(nc_client: NextcloudClient, test_user):
"""Test creating a user and verifying deletion (cleanup by fixture)."""
user_config = test_user
# Create user
await nc_client.users.create_user(**user_config)
# Verify user exists
users = await nc_client.users.search_users(search=user_config["userid"])
assert user_config["userid"] in users
user_details = await nc_client.users.get_user_details(user_config["userid"])
assert user_details.id == user_config["userid"]
assert user_details.displayname == user_config["display_name"]
assert user_details.email == user_config["email"]
# Test deletion explicitly as part of test functionality
await nc_client.users.delete_user(user_config["userid"])
# Verify user is deleted
users = await nc_client.users.search_users(search=user_config["userid"])
assert user_config["userid"] not in users
# Note: Fixture cleanup will also try to delete but handle 404 gracefully
@pytest.mark.asyncio
async def test_update_user_field(nc_client: NextcloudClient, test_user):
"""Test updating user fields."""
user_config = test_user
await nc_client.users.create_user(**user_config)
new_email = f"new.{user_config['email']}"
await nc_client.users.update_user_field(user_config["userid"], "email", new_email)
user_details = await nc_client.users.get_user_details(user_config["userid"])
assert user_details.email == new_email
# Fixture will handle cleanup
@pytest.mark.asyncio
async def test_user_groups(nc_client: NextcloudClient, test_user_in_group):
"""Test adding and removing users from groups."""
user_config, groupid = test_user_in_group
userid = user_config["userid"]
# Verify user is in group
groups = await nc_client.users.get_user_groups(userid)
assert groupid in groups
# Remove user from group
await nc_client.users.remove_user_from_group(userid, groupid)
groups = await nc_client.users.get_user_groups(userid)
assert groupid not in groups
# Fixtures will handle cleanup
@pytest.mark.asyncio
async def test_user_subadmins(nc_client: NextcloudClient, test_user, test_group):
"""Test promoting and demoting subadmins."""
user_config = test_user
groupid = test_group
userid = user_config["userid"]
await nc_client.users.create_user(**user_config)
# Promote to subadmin
await nc_client.users.promote_user_to_subadmin(userid, groupid)
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
assert groupid in subadmin_groups
# Demote from subadmin
await nc_client.users.demote_user_from_subadmin(userid, groupid)
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
assert groupid not in subadmin_groups
# Fixtures will handle cleanup
@pytest.mark.asyncio
async def test_disable_enable_user(nc_client: NextcloudClient, test_user):
"""Test disabling and enabling users."""
user_config = test_user
userid = user_config["userid"]
await nc_client.users.create_user(**user_config)
# Disable user
await nc_client.users.disable_user(userid)
user_details = await nc_client.users.get_user_details(userid)
assert not user_details.enabled
# Enable user
await nc_client.users.enable_user(userid)
user_details = await nc_client.users.get_user_details(userid)
assert user_details.enabled
# Fixture will handle cleanup
@pytest.mark.asyncio
async def test_get_editable_user_fields(nc_client: NextcloudClient):
editable_fields = await nc_client.users.get_editable_user_fields()
assert "displayname" in editable_fields
assert "email" in editable_fields
+674
View File
@@ -0,0 +1,674 @@
=========================
Instruction set for users
=========================
Add a new user
--------------
Create a new user on the Nextcloud server. Authentication is done by sending a
basic HTTP authentication header.
**Syntax: ocs/v1.php/cloud/users**
* HTTP method: POST
* POST argument: userid - string, the required username for the new user
* POST argument: password - string, the password for the new user, leave empty to send welcome mail
* POST argument: displayName - string, the display name for the new user
* POST argument: email - string, the email for the new user, required if password empty
* POST argument: groups - array, the groups for the new user
* POST argument: subadmin - array, the groups in which the new user is subadmin
* POST argument: quota - string, quota for the new user
* POST argument: language - string, language for the new user
Status codes:
* 101 - invalid argument
* 102 - user already exists
* 103 - cannot create sub-admins for admin group
* 104 - group does not exist
* 105 - insufficient privileges for group
* 106 - no group specified (required for sub-admins)
* 107 - hint exceptions
* 108 - an email address is required, to send a password link to the user.
* 109 - sub-admin group does not exist
* 110 - required email address was not provided
* 111 - could not create non-existing user ID
Example
^^^^^^^
::
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users -d userid="Frank" -d password="frankspassword" -H "OCS-APIRequest: true"
* Creates the user ``Frank`` with password ``frankspassword``
* optionally groups can be specified by one or more ``groups[]`` query parameters:
``URL -d groups[]="admin" -D groups[]="Team1"``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Search/get users
----------------
Retrieves a list of users from the Nextcloud server. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users**
* HTTP method: GET
* url arguments: search - string, optional search string
* url arguments: limit - int, optional limit value
* url arguments: offset - int, optional offset value
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users?search=Frank -H "OCS-APIRequest: true"
* Returns list of users matching the search string.
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<users>
<element>Frank</element>
</users>
</data>
</ocs>
Get data of a single user
-------------------------
Retrieves information about a single user. Authentication is done by sending a
Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: GET
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
* Returns information on the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<enabled>true</enabled>
<id>Frank</id>
<quota>0</quota>
<email>frank@example.org</email>
<displayname>Frank K.</displayname>
<display-name>Frank K.</display-name>
<phone>0123 / 456 789</phone>
<address>Foobar 12, 12345 Town</address>
<website>https://nextcloud.com</website>
<twitter>Nextcloud</twitter>
<groups>
<element>group1</element>
<element>group2</element>
</groups>
</data>
</ocs>
Edit data of a single user
--------------------------
Edits attributes related to a user. Users are able to edit email, displayname
and password; admins can also edit the quota value. Further restrictions may apply,
check the `List of editable data fields`_ endpoint. Authentication
is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: PUT
* PUT argument: key, the field to edit:
+ email
+ quota
+ displayname
+ display (**deprecated** use `displayname` instead)
+ phone
+ address
+ website
+ twitter
+ password
* PUT argument: value, the new value for the field
Status codes:
* 101 - invalid argument
* 107 - password policy (hint exception)
* 112 - Setting the password is not supported by the users backend
* 113 - editing field not allowed / field doesnt exist
Examples
^^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="email" -d value="franksnewemail@example.org" -H "OCS-APIRequest: true"
* Updates the email address for the user ``Frank``
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="quota" -d value="100MB" -H "OCS-APIRequest: true"
* Updates the quota for the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
.. _editable_field_list:
List of editable data fields
----------------------------
Edits attributes related to a user. Users are able to edit email, displayname
and password; admins can also edit the quota value. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/user/fields**
* HTTP method: GET
Status codes:
* 100 - successful
Examples
^^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/user/fields -H "OCS-APIRequest: true"
* Gets the list of fields
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message>OK</message>
</meta>
<data>
<element>displayname</element>
<element>email</element>
<element>phone</element>
<element>address</element>
<element>website</element>
<element>twitter</element>
</data>
</ocs>
Disable a user
--------------
Disables a user on the Nextcloud server so that the user cannot login anymore.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/disable**
* HTTP method: PUT
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/disable -H "OCS-APIRequest: true"
* Disables the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Enable a user
-------------
Enables a user on the Nextcloud server so that the user can login again.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/enable**
* HTTP method: PUT
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/enable -H "OCS-APIRequest: true"
* Enables the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Delete a user
-------------
Deletes a user from the Nextcloud server. Authentication is done by sending a
Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: DELETE
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
* Deletes the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Get user's groups
-----------------
Retrieves a list of groups the specified user is a member of. Authentication is
done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: GET
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -H "OCS-APIRequest: true"
* Retrieves a list of groups of which ``Frank`` is a member
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<groups>
<element>admin</element>
<element>group1</element>
</groups>
</data>
</ocs>
Add user to group
-----------------
Adds the specified user to the specified group. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: POST
* POST argument: groupid, string - the group to add the user to
Status codes:
* 100 - successful
* 101 - no group specified
* 102 - group does not exist
* 103 - user does not exist
* 104 - insufficient privileges
* 105 - failed to add user to group
Example
^^^^^^^
::
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
* Adds the user ``Frank`` to the group ``newgroup``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Remove user from group
----------------------
Removes the specified user from the specified group. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: DELETE
* DELETE argument: groupid, string - the group to remove the user from
Status codes:
* 100 - successful
* 101 - no group specified
* 102 - group does not exist
* 103 - user does not exist
* 104 - insufficient privileges
* 105 - failed to remove user from group
Example
^^^^^^^
::
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
* Removes the user ``Frank`` from the group ``newgroup``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Promote user to subadmin
------------------------
Makes a user the subadmin of a group. Authentication is done by sending a Basic
HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: POST
* POST argument: groupid, string - the group of which to make the user a
subadmin
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - group does not exist
* 103 - unknown failure
Example
^^^^^^^
::
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="group" -H "OCS-APIRequest: true"
* Makes the user ``Frank`` a subadmin of the ``group`` group
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Demote user from subadmin
-------------------------
Removes the subadmin rights for the user specified from the group specified.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: DELETE
* DELETE argument: groupid, string - the group from which to remove the user's
subadmin rights
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - user is not a subadmin of the group / group does not exist
* 103 - unknown failure
Example
^^^^^^^
::
$ curl -X DELETE https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="oldgroup" -H "OCS-APIRequest: true"
* Removes ``Frank's`` subadmin rights from the ``oldgroup`` group
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Get user's subadmin groups
--------------------------
Returns the groups in which the user is a subadmin. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: GET
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - unknown failure
Example
^^^^^^^
::
$ curl -X GET https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -H "OCS-APIRequest: true"
* Returns the groups of which ``Frank`` is a subadmin
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data>
<element>testgroup</element>
</data>
</ocs>
Resend the welcome email
------------------------
The request to this endpoint triggers the welcome email for this user again.
**Syntax: ocs/v1.php/cloud/users/{userid}/welcome**
* HTTP method: POST
Status codes:
* 100 - successful
* 101 - email address not available
* 102 - sending email failed
Example
^^^^^^^
::
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/welcome -H "OCS-APIRequest: true"
* Sends the welcome email to ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Generated
+1 -1
View File
@@ -630,7 +630,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.13.0"
version = "0.14.1"
source = { editable = "." }
dependencies = [
{ name = "click" },