Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 46deb0f726 | |||
| daacf08a54 | |||
| cc2a5c9d58 | |||
| 26f8deff17 | |||
| fb3063e94e | |||
| 83f89e9394 | |||
| 5db02313a1 | |||
| b50e212f05 | |||
| 85f8522085 | |||
| a38c795124 | |||
| 7004104873 | |||
| 7a4a31b52d | |||
| 898c2e72ae | |||
| 961f23b5ea |
+3
-1
@@ -4,4 +4,6 @@ __pycache__/
|
||||
*.env
|
||||
.env.local
|
||||
.env.*.local
|
||||
.nextcloud_oauth_test_client.json
|
||||
|
||||
# Generated by pytest used to login users
|
||||
.nextcloud_oauth_shared_test_client.json
|
||||
|
||||
@@ -1,3 +1,15 @@
|
||||
## v0.14.0 (2025-10-15)
|
||||
|
||||
### Feat
|
||||
|
||||
- Add Groups API client
|
||||
- add sharing API client and server tools
|
||||
- **users**: Initialize user API client
|
||||
|
||||
### Fix
|
||||
|
||||
- Update user/groups API to OCS v2
|
||||
|
||||
## v0.13.0 (2025-10-13)
|
||||
|
||||
### Feat
|
||||
|
||||
@@ -38,13 +38,21 @@ mcp run --transport sse nextcloud_mcp_server.app:mcp
|
||||
# Docker development environment with Nextcloud instance
|
||||
docker-compose up
|
||||
|
||||
# After code changes, rebuild and restart only the MCP server container
|
||||
# After code changes, rebuild and restart the appropriate MCP server container:
|
||||
# For basic auth changes (most common) - uses admin credentials
|
||||
docker-compose up --build -d mcp
|
||||
|
||||
# For OAuth changes - uses OAuth authentication flow
|
||||
docker-compose up --build -d mcp-oauth
|
||||
|
||||
# Build Docker image
|
||||
docker build -t nextcloud-mcp-server .
|
||||
```
|
||||
|
||||
**Important: Two MCP Server Containers**
|
||||
- **`mcp`** (port 8000): Uses basic auth with admin credentials. Use this for most development and testing.
|
||||
- **`mcp-oauth`** (port 8001): Uses OAuth authentication. Only use this when working on OAuth-specific features or tests.
|
||||
|
||||
### Environment Setup
|
||||
```bash
|
||||
# Install dependencies
|
||||
@@ -96,18 +104,23 @@ Each Nextcloud app has a corresponding server module that:
|
||||
|
||||
### Testing Structure
|
||||
|
||||
- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions
|
||||
- **Integration tests** in `tests/integration/` and `tests/client/`, `tests/server/` - Test real Nextcloud API interactions
|
||||
- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities
|
||||
- Tests are marked with `@pytest.mark.integration` for selective running
|
||||
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
|
||||
- **Important**: Integration tests run against live Docker containers. After making code changes:
|
||||
- For basic auth tests: rebuild with `docker-compose up --build -d mcp`
|
||||
- For OAuth tests: rebuild with `docker-compose up --build -d mcp-oauth`
|
||||
|
||||
#### Testing Best Practices
|
||||
- **MANDATORY: Always run tests after implementing features or fixing bugs**
|
||||
- Run tests to completion before considering any task complete
|
||||
- If tests require modifications to pass, ask for permission before proceeding
|
||||
- Use `docker-compose up --build -d mcp` to rebuild MCP container after code changes
|
||||
- **Rebuild the correct container** after code changes:
|
||||
- For basic auth tests (most common): `docker-compose up --build -d mcp`
|
||||
- For OAuth tests: `docker-compose up --build -d mcp-oauth`
|
||||
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
|
||||
- `nc_mcp_client` - MCP client session for tool/resource testing
|
||||
- `nc_mcp_client` - MCP client session for tool/resource testing (uses `mcp` container)
|
||||
- `nc_mcp_oauth_client` - MCP client session for OAuth testing (uses `mcp-oauth` container)
|
||||
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
|
||||
- `temporary_note` - Creates and cleans up test notes automatically
|
||||
- `temporary_addressbook` - Creates and cleans up test address books
|
||||
@@ -115,6 +128,7 @@ Each Nextcloud app has a corresponding server module that:
|
||||
- **Test specific functionality** after changes:
|
||||
- For Notes changes: `uv run pytest tests/integration/test_mcp.py -k "notes" -v`
|
||||
- For specific API changes: `uv run pytest tests/integration/test_notes_api.py -v`
|
||||
- For OAuth changes: `uv run pytest tests/server/test_oauth*.py -v` (remember to rebuild `mcp-oauth` container)
|
||||
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
|
||||
|
||||
#### OAuth/OIDC Testing
|
||||
@@ -123,7 +137,14 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
**Automated Testing (Default - Recommended for CI/CD):**
|
||||
- **Default fixtures**: `nc_oauth_client`, `nc_mcp_oauth_client` now use Playwright automation by default
|
||||
- Uses Playwright headless browser automation to complete OAuth flow programmatically
|
||||
- **Shared OAuth Client**: All test users authenticate using a single OAuth client (matching MCP server behavior)
|
||||
- Single `client_id`/`client_secret` pair is registered and reused for all test users
|
||||
- Stored in `.nextcloud_oauth_shared_test_client.json` with `force_register=False` for reuse
|
||||
- Reduces OAuth client registrations and matches production MCP server architecture
|
||||
- All Playwright fixtures: `playwright_oauth_token`, `nc_oauth_client`, `nc_mcp_oauth_client`, `nc_oauth_client_playwright`, `nc_mcp_oauth_client_playwright`
|
||||
- Multi-user fixtures: `alice_oauth_token`, `bob_oauth_token`, `charlie_oauth_token`, `diana_oauth_token`
|
||||
- All use `shared_oauth_client_credentials` fixture for consistent client credentials
|
||||
- Each user gets unique access tokens via same OAuth client (like multiple users using the MCP server)
|
||||
- Requires: `NEXTCLOUD_HOST`, `NEXTCLOUD_USERNAME`, `NEXTCLOUD_PASSWORD` environment variables
|
||||
- Uses `pytest-playwright-asyncio` for async Playwright fixtures
|
||||
- Playwright configuration: Use pytest CLI args like `--browser firefox --headed` to customize
|
||||
@@ -131,13 +152,13 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
- Example:
|
||||
```bash
|
||||
# Run all OAuth tests with automated Playwright flow using Firefox
|
||||
uv run pytest tests/integration/test_oauth*.py --browser firefox -v
|
||||
uv run pytest tests/server/test_oauth*.py --browser firefox -v
|
||||
|
||||
# Run specific Playwright tests with visible browser for debugging
|
||||
uv run pytest tests/integration/test_oauth_playwright.py --browser firefox --headed -v
|
||||
uv run pytest tests/server/test_mcp_oauth.py --browser firefox --headed -v
|
||||
|
||||
# Run with Chromium (default)
|
||||
uv run pytest tests/integration/test_oauth.py -v
|
||||
uv run pytest tests/server/test_oauth*.py -v
|
||||
```
|
||||
|
||||
**Interactive Testing (Manual browser login):**
|
||||
@@ -149,18 +170,23 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
- Example:
|
||||
```bash
|
||||
# Run OAuth tests with interactive flow (will open browser and wait for manual login)
|
||||
uv run pytest tests/integration/test_oauth_interactive.py -v
|
||||
uv run pytest tests/client/test_oauth_interactive.py -v
|
||||
```
|
||||
|
||||
**Test Environment Setup:**
|
||||
- **Two MCP server containers are available:**
|
||||
- `mcp` (port 8000): Uses basic auth with admin credentials - for most testing
|
||||
- `mcp-oauth` (port 8001): Uses OAuth authentication - for OAuth-specific testing
|
||||
- Start OAuth MCP server: `docker-compose up --build -d mcp-oauth`
|
||||
- OAuth server runs on port 8001 (regular MCP on 8000)
|
||||
- Both flows register OAuth clients dynamically using Nextcloud's OIDC provider
|
||||
- **Important**: When working on OAuth functionality, always rebuild `mcp-oauth` container, not `mcp`
|
||||
- Shared OAuth client is registered once and reused across test runs
|
||||
- Client credentials cached in `.nextcloud_oauth_shared_test_client.json`
|
||||
|
||||
**CI/CD Considerations:**
|
||||
- Interactive OAuth tests are automatically skipped when `GITHUB_ACTIONS` environment variable is set
|
||||
- Automated Playwright tests will run in CI/CD environments
|
||||
- Use Firefox browser in CI: `--browser firefox` (Chromium may have issues with localhost redirects)
|
||||
- Shared client approach reduces test time and API calls to Nextcloud
|
||||
|
||||
### Configuration Files
|
||||
|
||||
|
||||
+1
-1
@@ -63,7 +63,7 @@ services:
|
||||
- 127.0.0.1:8001:8001
|
||||
environment:
|
||||
- NEXTCLOUD_HOST=http://app:80
|
||||
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.01:8001
|
||||
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8001
|
||||
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
|
||||
# No USERNAME/PASSWORD - will use OAuth
|
||||
volumes:
|
||||
|
||||
@@ -21,6 +21,7 @@ from nextcloud_mcp_server.server import (
|
||||
configure_contacts_tools,
|
||||
configure_deck_tools,
|
||||
configure_notes_tools,
|
||||
configure_sharing_tools,
|
||||
configure_tables_tools,
|
||||
configure_webdav_tools,
|
||||
)
|
||||
@@ -375,6 +376,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
"notes": configure_notes_tools,
|
||||
"tables": configure_tables_tools,
|
||||
"webdav": configure_webdav_tools,
|
||||
"sharing": configure_sharing_tools,
|
||||
"calendar": configure_calendar_tools,
|
||||
"contacts": configure_contacts_tools,
|
||||
"deck": configure_deck_tools,
|
||||
|
||||
@@ -15,9 +15,12 @@ from ..controllers.notes_search import NotesSearchController
|
||||
from .calendar import CalendarClient
|
||||
from .contacts import ContactsClient
|
||||
from .deck import DeckClient
|
||||
from .groups import GroupsClient
|
||||
from .notes import NotesClient
|
||||
from .sharing import SharingClient
|
||||
from .tables import TablesClient
|
||||
from .webdav import WebDAVClient
|
||||
from .users import UsersClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -71,6 +74,9 @@ class NextcloudClient:
|
||||
self.calendar = CalendarClient(self._client, username)
|
||||
self.contacts = ContactsClient(self._client, username)
|
||||
self.deck = DeckClient(self._client, username)
|
||||
self.users = UsersClient(self._client, username)
|
||||
self.groups = GroupsClient(self._client, username)
|
||||
self.sharing = SharingClient(self._client, username)
|
||||
|
||||
# Initialize controllers
|
||||
self._notes_search = NotesSearchController()
|
||||
|
||||
@@ -99,7 +99,7 @@ class DeckClient(BaseNextcloudClient):
|
||||
permission_edit: bool,
|
||||
permission_share: bool,
|
||||
permission_manage: bool,
|
||||
) -> List[DeckACL]:
|
||||
) -> DeckACL:
|
||||
json_data = {
|
||||
"type": type,
|
||||
"participant": participant,
|
||||
@@ -107,10 +107,14 @@ class DeckClient(BaseNextcloudClient):
|
||||
"permissionShare": permission_share,
|
||||
"permissionManage": permission_manage,
|
||||
}
|
||||
headers = self._get_deck_headers()
|
||||
response = await self._make_request(
|
||||
"POST", f"/apps/deck/api/v1.0/boards/{board_id}/acl", json=json_data
|
||||
"POST",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl",
|
||||
json=json_data,
|
||||
headers=headers,
|
||||
)
|
||||
return [DeckACL(**acl) for acl in response.json()]
|
||||
return DeckACL(**response.json())
|
||||
|
||||
async def update_acl_rule(
|
||||
self,
|
||||
@@ -127,13 +131,20 @@ class DeckClient(BaseNextcloudClient):
|
||||
json_data["permissionShare"] = permission_share
|
||||
if permission_manage is not None:
|
||||
json_data["permissionManage"] = permission_manage
|
||||
headers = self._get_deck_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}", json=json_data
|
||||
"PUT",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
|
||||
json=json_data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def delete_acl_rule(self, board_id: int, acl_id: int) -> None:
|
||||
headers = self._get_deck_headers()
|
||||
await self._make_request(
|
||||
"DELETE", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}"
|
||||
"DELETE",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def clone_board(
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
"""Nextcloud Groups API client."""
|
||||
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from .base import BaseNextcloudClient, retry_on_429
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GroupsClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud Groups API operations."""
|
||||
|
||||
@retry_on_429
|
||||
async def search_groups(
|
||||
self,
|
||||
search: str | None = None,
|
||||
limit: int | None = None,
|
||||
offset: int | None = None,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Search for groups on the Nextcloud server.
|
||||
|
||||
Args:
|
||||
search: Optional search string to filter groups
|
||||
limit: Optional limit for number of results
|
||||
offset: Optional offset for pagination
|
||||
|
||||
Returns:
|
||||
List of group IDs matching the search criteria
|
||||
"""
|
||||
params = {}
|
||||
if search is not None:
|
||||
params["search"] = search
|
||||
if limit is not None:
|
||||
params["limit"] = limit
|
||||
if offset is not None:
|
||||
params["offset"] = offset
|
||||
|
||||
response = await self._client.get(
|
||||
"/ocs/v2.php/cloud/groups",
|
||||
params=params,
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
groups = data["ocs"]["data"].get("groups", [])
|
||||
return groups
|
||||
|
||||
@retry_on_429
|
||||
async def create_group(self, groupid: str) -> None:
|
||||
"""
|
||||
Create a new group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID to create
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails (e.g., group already exists)
|
||||
"""
|
||||
response = await self._client.post(
|
||||
"/ocs/v2.php/cloud/groups",
|
||||
data={"groupid": groupid},
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Created group: {groupid}")
|
||||
|
||||
@retry_on_429
|
||||
async def delete_group(self, groupid: str) -> None:
|
||||
"""
|
||||
Delete a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID to delete
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails (e.g., group doesn't exist)
|
||||
"""
|
||||
response = await self._client.delete(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Deleted group: {groupid}")
|
||||
|
||||
@retry_on_429
|
||||
async def get_group_members(self, groupid: str) -> List[str]:
|
||||
"""
|
||||
Get members of a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
|
||||
Returns:
|
||||
List of usernames in the group
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
users = data["ocs"]["data"].get("users", [])
|
||||
return users
|
||||
|
||||
@retry_on_429
|
||||
async def get_group_subadmins(self, groupid: str) -> List[str]:
|
||||
"""
|
||||
Get subadmins of a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
|
||||
Returns:
|
||||
List of usernames who are subadmins of the group
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}/subadmins",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# The API returns data as a list or dict depending on results
|
||||
subadmins_data = data["ocs"]["data"]
|
||||
if isinstance(subadmins_data, list):
|
||||
return subadmins_data
|
||||
return []
|
||||
|
||||
@retry_on_429
|
||||
async def update_group_displayname(self, groupid: str, displayname: str) -> None:
|
||||
"""
|
||||
Update a group's display name.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
displayname: The new display name
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.put(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
data={"key": "displayname", "value": displayname},
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Updated group {groupid} displayname to: {displayname}")
|
||||
@@ -0,0 +1,208 @@
|
||||
"""Nextcloud OCS Sharing API client for file/folder sharing operations."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from .base import BaseNextcloudClient, retry_on_429
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SharingClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud OCS Sharing API operations."""
|
||||
|
||||
@retry_on_429
|
||||
async def create_share(
|
||||
self,
|
||||
path: str,
|
||||
share_with: str,
|
||||
share_type: int = 0,
|
||||
permissions: int = 1,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a share for a file or folder.
|
||||
|
||||
Args:
|
||||
path: Path to file/folder to share (relative to user's files)
|
||||
share_with: Username (for user share) or group name (for group share)
|
||||
share_type: Share type (0=user, 1=group, 3=public link)
|
||||
permissions: Share permissions:
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common combinations: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
Share data including share ID
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.post(
|
||||
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
data={
|
||||
"path": path,
|
||||
"shareType": share_type,
|
||||
"shareWith": share_with,
|
||||
"permissions": permissions,
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# OCS API v2 uses HTTP-style status codes (200 for success)
|
||||
# OCS API v1 used custom codes (100 for success)
|
||||
ocs_status = data["ocs"]["meta"]["statuscode"]
|
||||
if ocs_status not in (100, 200):
|
||||
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
|
||||
raise RuntimeError(f"OCS API error (code {ocs_status}): {ocs_message}")
|
||||
|
||||
share_data = data["ocs"]["data"]
|
||||
|
||||
# Handle case where data might be an empty list on error
|
||||
if not share_data or (isinstance(share_data, list) and len(share_data) == 0):
|
||||
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
|
||||
raise RuntimeError(
|
||||
f"Share creation failed: {ocs_message} (status {ocs_status})"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Created share {share_data['id']}: {path} -> {share_with} "
|
||||
f"(type={share_type}, permissions={permissions})"
|
||||
)
|
||||
return share_data
|
||||
|
||||
@retry_on_429
|
||||
async def delete_share(self, share_id: int) -> None:
|
||||
"""Delete a share by its ID.
|
||||
|
||||
Args:
|
||||
share_id: The share ID to delete
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.delete(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
logger.info(f"Deleted share {share_id}")
|
||||
|
||||
@retry_on_429
|
||||
async def get_share(self, share_id: int) -> dict[str, Any]:
|
||||
"""Get information about a specific share.
|
||||
|
||||
Args:
|
||||
share_id: The share ID
|
||||
|
||||
Returns:
|
||||
Share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
share_data = data["ocs"]["data"]
|
||||
# The API returns a list with a single share, extract the first element
|
||||
if isinstance(share_data, list) and len(share_data) > 0:
|
||||
return share_data[0]
|
||||
return share_data
|
||||
|
||||
@retry_on_429
|
||||
async def list_shares(
|
||||
self, path: str | None = None, shared_with_me: bool = False
|
||||
) -> list[dict[str, Any]]:
|
||||
"""List shares.
|
||||
|
||||
Args:
|
||||
path: Optional path to filter shares for a specific file/folder
|
||||
shared_with_me: If True, list shares shared with the current user
|
||||
|
||||
Returns:
|
||||
List of share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
params = {}
|
||||
if path:
|
||||
params["path"] = path
|
||||
if shared_with_me:
|
||||
params["shared_with_me"] = "true"
|
||||
|
||||
response = await self._client.get(
|
||||
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
|
||||
params=params,
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
# Handle both single share and list of shares
|
||||
shares_data = data["ocs"]["data"]
|
||||
if isinstance(shares_data, dict):
|
||||
return [shares_data]
|
||||
return shares_data if shares_data else []
|
||||
|
||||
@retry_on_429
|
||||
async def update_share(
|
||||
self, share_id: int, permissions: int | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""Update a share's permissions.
|
||||
|
||||
Args:
|
||||
share_id: The share ID to update
|
||||
permissions: New permissions value (see create_share for values)
|
||||
|
||||
Returns:
|
||||
Updated share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
data = {}
|
||||
if permissions is not None:
|
||||
data["permissions"] = permissions
|
||||
|
||||
response = await self._client.put(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
data=data,
|
||||
)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
if result["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {result['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
logger.info(f"Updated share {share_id}")
|
||||
return result["ocs"]["data"]
|
||||
@@ -0,0 +1,222 @@
|
||||
from typing import List, Optional, Dict
|
||||
from nextcloud_mcp_server.client.base import BaseNextcloudClient
|
||||
from nextcloud_mcp_server.models.users import UserDetails
|
||||
|
||||
|
||||
class UsersClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud User API operations."""
|
||||
|
||||
def _get_user_headers(
|
||||
self, additional_headers: Optional[Dict[str, str]] = None
|
||||
) -> Dict[str, str]:
|
||||
"""Get standard headers required for User API calls."""
|
||||
headers = {"OCS-APIRequest": "true", "Accept": "application/json"}
|
||||
if additional_headers:
|
||||
headers.update(additional_headers)
|
||||
return headers
|
||||
|
||||
async def create_user(
|
||||
self,
|
||||
userid: str,
|
||||
password: Optional[str] = None,
|
||||
display_name: Optional[str] = None,
|
||||
email: Optional[str] = None,
|
||||
groups: Optional[List[str]] = None,
|
||||
subadmin_groups: Optional[List[str]] = None,
|
||||
quota: Optional[str] = None,
|
||||
language: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Create a new user on the Nextcloud server.
|
||||
"""
|
||||
data = {"userid": userid}
|
||||
if password is not None:
|
||||
data["password"] = password
|
||||
if display_name is not None:
|
||||
data["displayName"] = display_name
|
||||
if email is not None:
|
||||
data["email"] = email
|
||||
if groups is not None:
|
||||
for i, group in enumerate(groups):
|
||||
data[f"groups[{i}]"] = group
|
||||
if subadmin_groups is not None:
|
||||
for i, group in enumerate(subadmin_groups):
|
||||
data[f"subadmin[{i}]"] = group
|
||||
if quota is not None:
|
||||
data["quota"] = quota
|
||||
if language is not None:
|
||||
data["language"] = language
|
||||
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST", "/ocs/v2.php/cloud/users", data=data, headers=headers
|
||||
)
|
||||
|
||||
async def search_users(
|
||||
self,
|
||||
search: Optional[str] = None,
|
||||
limit: Optional[int] = None,
|
||||
offset: Optional[int] = None,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Retrieves a list of users from the Nextcloud server.
|
||||
"""
|
||||
params = {}
|
||||
if search is not None:
|
||||
params["search"] = search
|
||||
if limit is not None:
|
||||
params["limit"] = limit
|
||||
if offset is not None:
|
||||
params["offset"] = offset
|
||||
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", "/ocs/v2.php/cloud/users", params=params, headers=headers
|
||||
)
|
||||
# The v2 API returns JSON with users as a direct list under data.users
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data.get("users", [])
|
||||
|
||||
async def get_user_details(self, userid: str) -> UserDetails:
|
||||
"""
|
||||
Retrieves information about a single user.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
|
||||
)
|
||||
return UserDetails(**response.json()["ocs"]["data"])
|
||||
|
||||
async def update_user_field(self, userid: str, key: str, value: str) -> None:
|
||||
"""
|
||||
Edits attributes related to a user.
|
||||
"""
|
||||
data = {"key": key, "value": value}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}", data=data, headers=headers
|
||||
)
|
||||
|
||||
async def get_editable_user_fields(self) -> List[str]:
|
||||
"""
|
||||
Gets the list of editable data fields for a user.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", "/ocs/v2.php/cloud/user/fields", headers=headers
|
||||
)
|
||||
# The v2 API returns data as a direct list
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
async def disable_user(self, userid: str) -> None:
|
||||
"""
|
||||
Disables a user on the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}/disable", headers=headers
|
||||
)
|
||||
|
||||
async def enable_user(self, userid: str) -> None:
|
||||
"""
|
||||
Enables a user on the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}/enable", headers=headers
|
||||
)
|
||||
|
||||
async def delete_user(self, userid: str) -> None:
|
||||
"""
|
||||
Deletes a user from the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
|
||||
)
|
||||
|
||||
async def get_user_groups(self, userid: str) -> List[str]:
|
||||
"""
|
||||
Retrieves a list of groups the specified user is a member of.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}/groups", headers=headers
|
||||
)
|
||||
# The v2 API returns groups as a direct list under data.groups
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data.get("groups", [])
|
||||
|
||||
async def add_user_to_group(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Adds the specified user to the specified group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/groups",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def remove_user_from_group(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Removes the specified user from the specified group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/groups",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def promote_user_to_subadmin(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Makes a user the subadmin of a group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def demote_user_from_subadmin(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Removes the subadmin rights for the user specified from the group specified.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def get_user_subadmin_groups(self, userid: str) -> List[str]:
|
||||
"""
|
||||
Returns the groups in which the user is a subadmin.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}/subadmins", headers=headers
|
||||
)
|
||||
# The v2 API returns data as a direct list
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
async def resend_welcome_email(self, userid: str) -> None:
|
||||
"""
|
||||
Triggers the welcome email for this user again.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST", f"/ocs/v2.php/cloud/users/{userid}/welcome", headers=headers
|
||||
)
|
||||
@@ -0,0 +1,40 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
"""Model for creating a new user."""
|
||||
|
||||
userid: str
|
||||
password: Optional[str] = None
|
||||
displayName: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
groups: Optional[List[str]] = Field(default_factory=list)
|
||||
subadmin: Optional[List[str]] = Field(default_factory=list)
|
||||
quota: Optional[str] = None
|
||||
language: Optional[str] = None
|
||||
|
||||
|
||||
class UserDetails(BaseModel):
|
||||
"""Model for retrieving detailed user information."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
enabled: bool
|
||||
id: str
|
||||
quota: Union[str, Dict[str, Any]] # Can be string or quota object
|
||||
email: Optional[str] = None # Can be null
|
||||
displayname: str = Field(
|
||||
alias="display-name"
|
||||
) # Handle both displayname and display-name
|
||||
phone: Optional[str] = None
|
||||
address: Optional[str] = None
|
||||
website: Optional[str] = None
|
||||
twitter: Optional[str] = None
|
||||
groups: Optional[List[str]] = Field(default_factory=list)
|
||||
|
||||
|
||||
class Group(BaseModel):
|
||||
"""Model for a user group."""
|
||||
|
||||
id: str
|
||||
@@ -2,6 +2,7 @@ from .calendar import configure_calendar_tools
|
||||
from .contacts import configure_contacts_tools
|
||||
from .deck import configure_deck_tools
|
||||
from .notes import configure_notes_tools
|
||||
from .sharing import configure_sharing_tools
|
||||
from .tables import configure_tables_tools
|
||||
from .webdav import configure_webdav_tools
|
||||
|
||||
@@ -10,6 +11,7 @@ __all__ = [
|
||||
"configure_contacts_tools",
|
||||
"configure_deck_tools",
|
||||
"configure_notes_tools",
|
||||
"configure_sharing_tools",
|
||||
"configure_tables_tools",
|
||||
"configure_webdav_tools",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
"""MCP tools for Nextcloud file/folder sharing operations."""
|
||||
|
||||
import json
|
||||
|
||||
from nextcloud_mcp_server.context import get_client
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
|
||||
|
||||
def configure_sharing_tools(mcp: FastMCP):
|
||||
"""Configure sharing-related MCP tools.
|
||||
|
||||
Args:
|
||||
mcp: FastMCP server instance
|
||||
"""
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_create(
|
||||
path: str,
|
||||
share_with: str,
|
||||
ctx: Context,
|
||||
share_type: int = 0,
|
||||
permissions: int = 1,
|
||||
) -> str:
|
||||
"""Create a share for a file or folder in Nextcloud.
|
||||
|
||||
Share a file or folder with another user or group. The authenticated user
|
||||
must own the file/folder being shared.
|
||||
|
||||
Args:
|
||||
path: Path to file/folder to share (relative to your files, e.g., "/document.txt")
|
||||
share_with: Username (for user share) or group name (for group share)
|
||||
share_type: Share type - 0 for user (default), 1 for group, 3 for public link
|
||||
permissions: Share permissions (default: 1 for read-only):
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
JSON string with share information including share ID
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.create_share(
|
||||
path=path,
|
||||
share_with=share_with,
|
||||
share_type=share_type,
|
||||
permissions=permissions,
|
||||
)
|
||||
return json.dumps(share_data, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_delete(share_id: int, ctx: Context) -> str:
|
||||
"""Delete a share by its ID.
|
||||
|
||||
Remove a share that you created. You must be the owner of the share.
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share to delete
|
||||
|
||||
Returns:
|
||||
JSON string confirming deletion
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
await client.sharing.delete_share(share_id)
|
||||
return json.dumps(
|
||||
{"success": True, "message": f"Share {share_id} deleted"}, indent=2
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_get(share_id: int, ctx: Context) -> str:
|
||||
"""Get information about a specific share.
|
||||
|
||||
Retrieve details about a share by its ID. You must have access to the share
|
||||
(either as owner or recipient).
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share
|
||||
|
||||
Returns:
|
||||
JSON string with share information
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.get_share(share_id)
|
||||
return json.dumps(share_data, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_list(
|
||||
ctx: Context, path: str | None = None, shared_with_me: bool = False
|
||||
) -> str:
|
||||
"""List shares created by you or shared with you.
|
||||
|
||||
Args:
|
||||
path: Optional path to filter shares for a specific file/folder
|
||||
shared_with_me: If True, list shares that others shared with you.
|
||||
If False (default), list shares you created.
|
||||
|
||||
Returns:
|
||||
JSON string with list of shares
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
shares = await client.sharing.list_shares(
|
||||
path=path, shared_with_me=shared_with_me
|
||||
)
|
||||
return json.dumps(shares, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_update(share_id: int, permissions: int, ctx: Context) -> str:
|
||||
"""Update the permissions of an existing share.
|
||||
|
||||
Modify the permissions for a share you created. You must be the owner.
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share to update
|
||||
permissions: New permissions value:
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
JSON string with updated share information
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.update_share(
|
||||
share_id=share_id, permissions=permissions
|
||||
)
|
||||
return json.dumps(share_data, indent=2)
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "nextcloud-mcp-server"
|
||||
version = "0.13.0"
|
||||
version = "0.14.0"
|
||||
description = ""
|
||||
authors = [
|
||||
{name = "Chris Coutinho",email = "chris@coutinho.io"}
|
||||
|
||||
@@ -0,0 +1,172 @@
|
||||
"""Integration tests for Nextcloud Sharing API client."""
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_and_delete_share(nc_client):
|
||||
"""Test creating and deleting a file share."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_share_file.txt"
|
||||
file_content = b"Test file for sharing"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user, # Share with test user
|
||||
share_type=0, # User share
|
||||
permissions=1, # Read-only
|
||||
)
|
||||
|
||||
assert share_data is not None
|
||||
assert "id" in share_data
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created share: {share_id}")
|
||||
|
||||
# Get share info
|
||||
share_info = await nc_client.sharing.get_share(share_id)
|
||||
assert share_info["id"] == share_id
|
||||
assert share_info["path"] == file_path
|
||||
assert share_info["permissions"] == 1
|
||||
|
||||
# List shares
|
||||
shares = await nc_client.sharing.list_shares(path=file_path)
|
||||
assert len(shares) > 0
|
||||
assert any(s["id"] == share_id for s in shares)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
logger.info(f"Deleted share: {share_id}")
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_share_permissions(nc_client):
|
||||
"""Test updating share permissions."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_share_update.txt"
|
||||
file_content = b"Test file for permission updates"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share with read-only permissions
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user,
|
||||
share_type=0,
|
||||
permissions=1, # Read-only
|
||||
)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# Update to read+write permissions
|
||||
updated_share = await nc_client.sharing.update_share(
|
||||
share_id=share_id,
|
||||
permissions=3, # Read + Write
|
||||
)
|
||||
|
||||
assert updated_share["id"] == share_id
|
||||
assert updated_share["permissions"] == 3
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_shares(nc_client):
|
||||
"""Test listing all shares."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_list_shares.txt"
|
||||
file_content = b"Test file for listing shares"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user,
|
||||
share_type=0,
|
||||
permissions=1,
|
||||
)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# List all shares
|
||||
all_shares = await nc_client.sharing.list_shares()
|
||||
assert len(all_shares) > 0
|
||||
|
||||
# List shares for specific file
|
||||
file_shares = await nc_client.sharing.list_shares(path=file_path)
|
||||
assert len(file_shares) > 0
|
||||
assert any(s["id"] == share_id for s in file_shares)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
+686
-142
@@ -760,9 +760,9 @@ async def interactive_oauth_token(oauth_callback_server) -> str:
|
||||
client_info = await load_or_register_client(
|
||||
nextcloud_url=nextcloud_host,
|
||||
registration_endpoint=registration_endpoint,
|
||||
storage_path=".nextcloud_oauth_test_client.json",
|
||||
storage_path=".nextcloud_oauth_shared_test_client.json",
|
||||
redirect_uris=[callback_url],
|
||||
force_register=True,
|
||||
force_register=False, # Reuse existing credentials if valid
|
||||
)
|
||||
|
||||
# First, open Nextcloud login page to establish session
|
||||
@@ -810,18 +810,75 @@ async def interactive_oauth_token(oauth_callback_server) -> str:
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def playwright_oauth_token(browser) -> str:
|
||||
async def shared_oauth_client_credentials():
|
||||
"""
|
||||
Fixture to obtain shared OAuth client credentials that will be reused for all users.
|
||||
|
||||
This registers a single OAuth client with Nextcloud that matches the MCP server's
|
||||
registration, allowing all test users to authenticate using the same client_id/secret.
|
||||
|
||||
Returns:
|
||||
Tuple of (client_id, client_secret, callback_url, token_endpoint, authorization_endpoint)
|
||||
"""
|
||||
from nextcloud_mcp_server.auth.client_registration import load_or_register_client
|
||||
|
||||
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
|
||||
if not nextcloud_host:
|
||||
pytest.skip("Shared OAuth client requires NEXTCLOUD_HOST")
|
||||
|
||||
logger.info("Setting up shared OAuth client credentials for all test users...")
|
||||
|
||||
async with httpx.AsyncClient(timeout=30.0) as http_client:
|
||||
# OIDC Discovery
|
||||
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
|
||||
discovery_response = await http_client.get(discovery_url)
|
||||
discovery_response.raise_for_status()
|
||||
oidc_config = discovery_response.json()
|
||||
|
||||
token_endpoint = oidc_config.get("token_endpoint")
|
||||
registration_endpoint = oidc_config.get("registration_endpoint")
|
||||
authorization_endpoint = oidc_config.get("authorization_endpoint")
|
||||
|
||||
if not all([token_endpoint, registration_endpoint, authorization_endpoint]):
|
||||
raise ValueError("OIDC discovery missing required endpoints")
|
||||
|
||||
# Use callback URL that won't actually be used (we extract code from browser URL)
|
||||
callback_url = "http://localhost:9999/oauth/callback"
|
||||
|
||||
# Register or load shared OAuth client (matches MCP server registration)
|
||||
client_info = await load_or_register_client(
|
||||
nextcloud_url=nextcloud_host,
|
||||
registration_endpoint=registration_endpoint,
|
||||
storage_path=".nextcloud_oauth_shared_test_client.json",
|
||||
client_name="Nextcloud MCP Server - Shared Test Client",
|
||||
redirect_uris=[callback_url],
|
||||
force_register=False, # Reuse existing credentials if valid
|
||||
)
|
||||
|
||||
logger.info(f"Shared OAuth client ready: {client_info.client_id[:16]}...")
|
||||
logger.info("This client will be reused for all test user authentications")
|
||||
|
||||
return (
|
||||
client_info.client_id,
|
||||
client_info.client_secret,
|
||||
callback_url,
|
||||
token_endpoint,
|
||||
authorization_endpoint,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def playwright_oauth_token(browser, shared_oauth_client_credentials) -> str:
|
||||
"""
|
||||
Fixture to obtain an OAuth access token using Playwright headless browser automation.
|
||||
|
||||
This fully automates the OAuth flow by:
|
||||
1. Discovering OIDC endpoints
|
||||
2. Registering an OAuth client
|
||||
3. Navigating to authorization URL in headless browser
|
||||
4. Programmatically filling in login form
|
||||
5. Handling OAuth consent
|
||||
6. Extracting auth code from redirect
|
||||
7. Exchanging code for access token
|
||||
1. Using shared OAuth client credentials (reused across all users)
|
||||
2. Navigating to authorization URL in headless browser
|
||||
3. Programmatically filling in login form
|
||||
4. Handling OAuth consent
|
||||
5. Extracting auth code from redirect
|
||||
6. Exchanging code for access token
|
||||
|
||||
Environment variables required:
|
||||
- NEXTCLOUD_HOST: Nextcloud instance URL
|
||||
@@ -844,154 +901,110 @@ async def playwright_oauth_token(browser) -> str:
|
||||
"Playwright OAuth requires NEXTCLOUD_HOST, NEXTCLOUD_USERNAME, and NEXTCLOUD_PASSWORD"
|
||||
)
|
||||
|
||||
logger.info("Starting Playwright-based OAuth flow...")
|
||||
# Unpack shared client credentials
|
||||
client_id, client_secret, callback_url, token_endpoint, authorization_endpoint = (
|
||||
shared_oauth_client_credentials
|
||||
)
|
||||
|
||||
# Use async httpx for all HTTP operations
|
||||
async with httpx.AsyncClient(timeout=30.0) as http_client:
|
||||
# OIDC Discovery
|
||||
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
|
||||
logger.debug(f"Fetching OIDC discovery from: {discovery_url}")
|
||||
logger.info(f"Starting Playwright-based OAuth flow for {username}...")
|
||||
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
|
||||
|
||||
discovery_response = await http_client.get(discovery_url)
|
||||
discovery_response.raise_for_status()
|
||||
oidc_config = discovery_response.json()
|
||||
# Construct authorization URL
|
||||
auth_url = (
|
||||
f"{authorization_endpoint}?"
|
||||
f"response_type=code&"
|
||||
f"client_id={client_id}&"
|
||||
f"redirect_uri={callback_url}&"
|
||||
f"scope=openid%20profile%20email"
|
||||
)
|
||||
|
||||
token_endpoint = oidc_config.get("token_endpoint")
|
||||
registration_endpoint = oidc_config.get("registration_endpoint")
|
||||
authorization_endpoint = oidc_config.get("authorization_endpoint")
|
||||
# Async browser automation using pytest-playwright's browser fixture
|
||||
context = await browser.new_context(ignore_https_errors=True)
|
||||
page = await context.new_page()
|
||||
|
||||
if not all([token_endpoint, registration_endpoint, authorization_endpoint]):
|
||||
raise ValueError("OIDC discovery missing required endpoints")
|
||||
try:
|
||||
# Navigate to authorization URL
|
||||
logger.debug(f"Navigating to: {auth_url}")
|
||||
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
|
||||
|
||||
logger.debug(f"Authorization endpoint: {authorization_endpoint}")
|
||||
logger.debug(f"Token endpoint: {token_endpoint}")
|
||||
# Check if we need to login first
|
||||
current_url = page.url
|
||||
logger.debug(f"Current URL after navigation: {current_url}")
|
||||
|
||||
# Register OAuth client with a callback that won't actually be used
|
||||
# (we'll extract the code from the browser URL instead)
|
||||
callback_url = "http://localhost:9999/oauth/callback"
|
||||
# If we're on a login page, fill in credentials
|
||||
if "/login" in current_url or "/index.php/login" in current_url:
|
||||
logger.info("Login page detected, filling in credentials...")
|
||||
|
||||
# Register client asynchronously
|
||||
client_metadata = {
|
||||
"client_name": "Nextcloud MCP Server - Playwright Tests",
|
||||
"redirect_uris": [callback_url],
|
||||
"token_endpoint_auth_method": "client_secret_post",
|
||||
"grant_types": ["authorization_code", "refresh_token"],
|
||||
"response_types": ["code"],
|
||||
"scope": "openid profile email",
|
||||
}
|
||||
# Wait for login form
|
||||
await page.wait_for_selector('input[name="user"]', timeout=10000)
|
||||
|
||||
reg_response = await http_client.post(
|
||||
registration_endpoint,
|
||||
json=client_metadata,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
reg_response.raise_for_status()
|
||||
client_info_dict = reg_response.json()
|
||||
# Fill in username and password
|
||||
await page.fill('input[name="user"]', username)
|
||||
await page.fill('input[name="password"]', password)
|
||||
|
||||
client_id = client_info_dict["client_id"]
|
||||
client_secret = client_info_dict["client_secret"]
|
||||
logger.debug("Credentials filled, submitting login form...")
|
||||
|
||||
# Construct authorization URL
|
||||
auth_url = (
|
||||
f"{authorization_endpoint}?"
|
||||
f"response_type=code&"
|
||||
f"client_id={client_id}&"
|
||||
f"redirect_uri={callback_url}&"
|
||||
f"scope=openid%20profile%20email"
|
||||
)
|
||||
# Submit the form
|
||||
await page.click('button[type="submit"]')
|
||||
|
||||
logger.info("Opening browser for OAuth authorization...")
|
||||
|
||||
# Async browser automation using pytest-playwright's browser fixture
|
||||
context = await browser.new_context(ignore_https_errors=True)
|
||||
page = await context.new_page()
|
||||
|
||||
try:
|
||||
# Navigate to authorization URL
|
||||
logger.debug(f"Navigating to: {auth_url}")
|
||||
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
|
||||
|
||||
# Check if we need to login first
|
||||
# Wait for navigation after login
|
||||
await page.wait_for_load_state("networkidle", timeout=30000)
|
||||
current_url = page.url
|
||||
logger.debug(f"Current URL after navigation: {current_url}")
|
||||
logger.info(f"After login, current URL: {current_url}")
|
||||
|
||||
# If we're on a login page, fill in credentials
|
||||
if "/login" in current_url or "/index.php/login" in current_url:
|
||||
logger.info("Login page detected, filling in credentials...")
|
||||
|
||||
# Wait for login form
|
||||
await page.wait_for_selector('input[name="user"]', timeout=10000)
|
||||
|
||||
# Fill in username and password
|
||||
await page.fill('input[name="user"]', username)
|
||||
await page.fill('input[name="password"]', password)
|
||||
|
||||
logger.debug("Credentials filled, submitting login form...")
|
||||
|
||||
# Submit the form
|
||||
await page.click('button[type="submit"]')
|
||||
|
||||
# Wait for navigation after login
|
||||
await page.wait_for_load_state("networkidle", timeout=30000)
|
||||
current_url = page.url
|
||||
logger.info(f"After login, current URL: {current_url}")
|
||||
|
||||
# Now we should be on the OAuth authorization/consent page or already redirected
|
||||
# Check if there's an authorize button to click
|
||||
try:
|
||||
# Look for common authorization button patterns
|
||||
authorize_button = await page.query_selector(
|
||||
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
|
||||
)
|
||||
|
||||
if authorize_button:
|
||||
logger.info(
|
||||
"Authorization consent page detected, clicking authorize..."
|
||||
)
|
||||
await authorize_button.click()
|
||||
await page.wait_for_load_state("networkidle", timeout=10000)
|
||||
current_url = page.url
|
||||
logger.debug(f"After authorization, current_url: {current_url}")
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"No authorization button found or already authorized: {e}"
|
||||
)
|
||||
|
||||
# Wait for redirect to callback URL (which will fail to load, but we just need the URL)
|
||||
try:
|
||||
# The redirect might fail since localhost:9999 isn't actually running
|
||||
# But we can still extract the code from the URL
|
||||
await page.wait_for_url(f"{callback_url}*", timeout=10000)
|
||||
except Exception as e:
|
||||
# Expected - the callback URL won't load, but we should have the URL
|
||||
logger.debug(f"Callback redirect (expected to fail): {e}")
|
||||
|
||||
# Extract auth code from URL
|
||||
final_url = page.url
|
||||
logger.debug(f"Final URL: {final_url}")
|
||||
|
||||
parsed_url = urlparse(final_url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
auth_code = query_params.get("code", [None])[0]
|
||||
|
||||
if not auth_code:
|
||||
# Take a screenshot for debugging
|
||||
screenshot_path = "/tmp/playwright_oauth_error.png"
|
||||
await page.screenshot(path=screenshot_path)
|
||||
logger.error(f"Screenshot saved to {screenshot_path}")
|
||||
raise ValueError(
|
||||
f"No authorization code found in redirect URL: {final_url}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Successfully extracted authorization code: {auth_code[:20]}..."
|
||||
# Now we should be on the OAuth authorization/consent page or already redirected
|
||||
# Check if there's an authorize button to click
|
||||
try:
|
||||
# Look for common authorization button patterns
|
||||
authorize_button = await page.query_selector(
|
||||
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
|
||||
)
|
||||
|
||||
finally:
|
||||
await context.close()
|
||||
if authorize_button:
|
||||
logger.info(
|
||||
"Authorization consent page detected, clicking authorize..."
|
||||
)
|
||||
await authorize_button.click()
|
||||
await page.wait_for_load_state("networkidle", timeout=10000)
|
||||
current_url = page.url
|
||||
logger.debug(f"After authorization, current_url: {current_url}")
|
||||
except Exception as e:
|
||||
logger.debug(f"No authorization button found or already authorized: {e}")
|
||||
|
||||
# Exchange authorization code for access token
|
||||
logger.info("Exchanging authorization code for access token...")
|
||||
# Wait for redirect to callback URL (which will fail to load, but we just need the URL)
|
||||
try:
|
||||
# The redirect might fail since localhost:9999 isn't actually running
|
||||
# But we can still extract the code from the URL
|
||||
await page.wait_for_url(f"{callback_url}*", timeout=10000)
|
||||
except Exception as e:
|
||||
# Expected - the callback URL won't load, but we should have the URL
|
||||
logger.debug(f"Callback redirect (expected to fail): {e}")
|
||||
|
||||
# Extract auth code from URL
|
||||
final_url = page.url
|
||||
logger.debug(f"Final URL: {final_url}")
|
||||
|
||||
parsed_url = urlparse(final_url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
auth_code = query_params.get("code", [None])[0]
|
||||
|
||||
if not auth_code:
|
||||
# Take a screenshot for debugging
|
||||
screenshot_path = "/tmp/playwright_oauth_error.png"
|
||||
await page.screenshot(path=screenshot_path)
|
||||
logger.error(f"Screenshot saved to {screenshot_path}")
|
||||
raise ValueError(
|
||||
f"No authorization code found in redirect URL: {final_url}"
|
||||
)
|
||||
|
||||
logger.info(f"Successfully extracted authorization code: {auth_code[:20]}...")
|
||||
|
||||
finally:
|
||||
await context.close()
|
||||
|
||||
# Exchange authorization code for access token
|
||||
logger.info("Exchanging authorization code for access token...")
|
||||
async with httpx.AsyncClient(timeout=30.0) as http_client:
|
||||
token_response = await http_client.post(
|
||||
token_endpoint,
|
||||
data={
|
||||
@@ -1111,3 +1124,534 @@ async def nc_mcp_oauth_client_playwright(
|
||||
logger.warning(
|
||||
f"Error closing Playwright OAuth streamable HTTP client: {e}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def test_users_setup(nc_client: NextcloudClient):
|
||||
"""
|
||||
Create test users for multi-user OAuth testing.
|
||||
|
||||
Creates four test users:
|
||||
- alice: Owner role, creates resources
|
||||
- bob: Viewer role, read-only access
|
||||
- charlie: Editor role, can edit (in 'editors' group)
|
||||
- diana: No-access role, no shares
|
||||
"""
|
||||
test_user_configs = {
|
||||
"alice": {
|
||||
"password": "AliceSecurePass123!",
|
||||
"email": "alice@example.com",
|
||||
"display_name": "Alice Owner",
|
||||
"groups": [],
|
||||
},
|
||||
"bob": {
|
||||
"password": "BobSecurePass456!",
|
||||
"email": "bob@example.com",
|
||||
"display_name": "Bob Viewer",
|
||||
"groups": [],
|
||||
},
|
||||
"charlie": {
|
||||
"password": "CharlieSecurePass789!",
|
||||
"email": "charlie@example.com",
|
||||
"display_name": "Charlie Editor",
|
||||
"groups": ["editors"],
|
||||
},
|
||||
"diana": {
|
||||
"password": "DianaSecurePass012!",
|
||||
"email": "diana@example.com",
|
||||
"display_name": "Diana NoAccess",
|
||||
"groups": [],
|
||||
},
|
||||
}
|
||||
|
||||
logger.info("Creating test users for multi-user OAuth testing...")
|
||||
created_users = []
|
||||
|
||||
try:
|
||||
# Create the 'editors' group first (charlie needs it)
|
||||
try:
|
||||
# Use admin nc_client to create the group via User API
|
||||
# First, try to create it (will fail if exists, but that's okay)
|
||||
async with httpx.AsyncClient() as http_client:
|
||||
base_url = str(nc_client._client.base_url)
|
||||
# Get password from environment since nc_client doesn't expose it
|
||||
password = os.getenv("NEXTCLOUD_PASSWORD")
|
||||
response = await http_client.post(
|
||||
f"{base_url}/ocs/v2.php/cloud/groups",
|
||||
auth=(nc_client.username, password),
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
data={"groupid": "editors"},
|
||||
)
|
||||
if response.status_code in [
|
||||
200,
|
||||
409,
|
||||
]: # 200 = created, 409 = already exists
|
||||
logger.info("Editors group ready")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Group creation returned {response.status_code}: {response.text}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error creating editors group (may already exist): {e}")
|
||||
|
||||
# Create each test user
|
||||
for username, config in test_user_configs.items():
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=username,
|
||||
password=config["password"],
|
||||
display_name=config["display_name"],
|
||||
email=config["email"],
|
||||
)
|
||||
logger.info(f"Created test user: {username}")
|
||||
created_users.append(username)
|
||||
|
||||
# Add user to groups if specified
|
||||
for group in config["groups"]:
|
||||
try:
|
||||
await nc_client.users.add_user_to_group(username, group)
|
||||
logger.info(f"Added {username} to group {group}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error adding {username} to group {group}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
# User might already exist, that's okay
|
||||
logger.warning(
|
||||
f"Could not create user {username} (may already exist): {e}"
|
||||
)
|
||||
created_users.append(username) # Add to list anyway for cleanup
|
||||
|
||||
logger.info(f"Test users setup complete: {created_users}")
|
||||
yield test_user_configs
|
||||
|
||||
finally:
|
||||
# Cleanup: delete test users
|
||||
logger.info("Cleaning up test users...")
|
||||
for username in created_users:
|
||||
try:
|
||||
await nc_client.users.delete_user(username)
|
||||
logger.info(f"Deleted test user: {username}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error deleting test user {username}: {e}")
|
||||
|
||||
|
||||
async def _get_oauth_token_for_user(
|
||||
browser, shared_oauth_client_credentials, username: str, password: str
|
||||
) -> str:
|
||||
"""
|
||||
Helper function to get OAuth access token for a user via Playwright.
|
||||
|
||||
Uses shared OAuth client credentials to authenticate multiple users with the same client.
|
||||
|
||||
Args:
|
||||
browser: Playwright browser instance
|
||||
shared_oauth_client_credentials: Tuple of (client_id, client_secret, callback_url, token_endpoint, authorization_endpoint)
|
||||
username: Username to authenticate as
|
||||
password: Password for the user
|
||||
|
||||
Returns:
|
||||
OAuth access token string
|
||||
"""
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
|
||||
|
||||
if not nextcloud_host:
|
||||
pytest.skip("OAuth requires NEXTCLOUD_HOST")
|
||||
|
||||
# Unpack shared client credentials
|
||||
client_id, client_secret, callback_url, token_endpoint, authorization_endpoint = (
|
||||
shared_oauth_client_credentials
|
||||
)
|
||||
|
||||
logger.info(f"Getting OAuth token for user: {username}...")
|
||||
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
|
||||
|
||||
# Construct authorization URL with properly encoded redirect_uri
|
||||
from urllib.parse import quote
|
||||
|
||||
auth_url = (
|
||||
f"{authorization_endpoint}?"
|
||||
f"response_type=code&"
|
||||
f"client_id={client_id}&"
|
||||
f"redirect_uri={quote(callback_url, safe='')}&"
|
||||
f"scope=openid%20profile%20email"
|
||||
)
|
||||
|
||||
logger.info(f"Performing browser OAuth flow for {username}...")
|
||||
logger.debug(f"Authorization URL: {auth_url}")
|
||||
|
||||
# Browser automation
|
||||
context = await browser.new_context(ignore_https_errors=True)
|
||||
page = await context.new_page()
|
||||
|
||||
try:
|
||||
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
|
||||
current_url = page.url
|
||||
|
||||
# Login if needed
|
||||
if "/login" in current_url or "/index.php/login" in current_url:
|
||||
logger.info(f"Logging in as {username}...")
|
||||
await page.wait_for_selector('input[name="user"]', timeout=10000)
|
||||
await page.fill('input[name="user"]', username)
|
||||
await page.fill('input[name="password"]', password)
|
||||
await page.click('button[type="submit"]')
|
||||
await page.wait_for_load_state("networkidle", timeout=30000)
|
||||
current_url = page.url
|
||||
|
||||
# Handle OAuth consent if present
|
||||
try:
|
||||
authorize_button = await page.query_selector(
|
||||
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
|
||||
)
|
||||
if authorize_button:
|
||||
logger.info(f"Authorizing for {username}...")
|
||||
await authorize_button.click()
|
||||
await page.wait_for_load_state("networkidle", timeout=10000)
|
||||
except Exception as e:
|
||||
logger.debug(f"No authorization needed for {username}: {e}")
|
||||
|
||||
# Wait for redirect and extract auth code
|
||||
try:
|
||||
await page.wait_for_url(f"{callback_url}*", timeout=30000)
|
||||
except Exception:
|
||||
pass # Expected - callback won't load
|
||||
|
||||
final_url = page.url
|
||||
parsed_url = urlparse(final_url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
auth_code = query_params.get("code", [None])[0]
|
||||
|
||||
if not auth_code:
|
||||
raise ValueError(
|
||||
f"No authorization code found for {username} in URL: {final_url}"
|
||||
)
|
||||
|
||||
logger.info(f"Got auth code for {username}: {auth_code[:20]}...")
|
||||
|
||||
finally:
|
||||
await context.close()
|
||||
|
||||
# Exchange code for token
|
||||
logger.info(f"Exchanging auth code for access token ({username})...")
|
||||
async with httpx.AsyncClient(timeout=30.0) as http_client:
|
||||
token_response = await http_client.post(
|
||||
token_endpoint,
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": auth_code,
|
||||
"redirect_uri": callback_url,
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
},
|
||||
)
|
||||
|
||||
token_response.raise_for_status()
|
||||
token_data = token_response.json()
|
||||
access_token = token_data.get("access_token")
|
||||
|
||||
if not access_token:
|
||||
raise ValueError(f"No access_token for {username}: {token_data}")
|
||||
|
||||
logger.info(f"Successfully obtained OAuth token for {username}")
|
||||
return access_token
|
||||
|
||||
|
||||
# Parallel token retrieval fixture - fetches all OAuth tokens concurrently
|
||||
@pytest.fixture(scope="session")
|
||||
async def all_oauth_tokens(
|
||||
browser, shared_oauth_client_credentials, test_users_setup
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
Fetch OAuth tokens for all test users in parallel for speed.
|
||||
|
||||
Returns a dict mapping username to OAuth access token.
|
||||
This is significantly faster than fetching tokens sequentially.
|
||||
|
||||
Note: We add a small stagger between starting each flow to avoid
|
||||
race conditions in Nextcloud's OAuth session handling.
|
||||
"""
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
logger.info("Fetching OAuth tokens for all users in parallel...")
|
||||
|
||||
async def get_token_with_delay(username: str, config: dict, delay: float):
|
||||
"""Get token for a user after a small delay to stagger requests."""
|
||||
if delay > 0:
|
||||
await asyncio.sleep(delay)
|
||||
return await _get_oauth_token_for_user(
|
||||
browser, shared_oauth_client_credentials, username, config["password"]
|
||||
)
|
||||
|
||||
# Create tasks for all users with staggered starts (2.0s apart)
|
||||
tasks = {
|
||||
username: get_token_with_delay(username, config, (idx + 1) * 2.0)
|
||||
for idx, (username, config) in enumerate(test_users_setup.items())
|
||||
}
|
||||
|
||||
# Run all token fetches concurrently
|
||||
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
|
||||
|
||||
# Build result dict, handling any errors
|
||||
tokens = {}
|
||||
for username, result in zip(tasks.keys(), results):
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Failed to get OAuth token for {username}: {result}")
|
||||
raise result
|
||||
tokens[username] = result
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
logger.info(
|
||||
f"Successfully fetched {len(tokens)} OAuth tokens in parallel "
|
||||
f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)"
|
||||
)
|
||||
return tokens
|
||||
|
||||
|
||||
# Session-scoped OAuth token fixtures - now use the parallel fixture
|
||||
@pytest.fixture(scope="session")
|
||||
async def alice_oauth_token(all_oauth_tokens) -> str:
|
||||
"""OAuth token for alice (cached for session). Uses shared OAuth client."""
|
||||
return all_oauth_tokens["alice"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def bob_oauth_token(all_oauth_tokens) -> str:
|
||||
"""OAuth token for bob (cached for session). Uses shared OAuth client."""
|
||||
return all_oauth_tokens["bob"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def charlie_oauth_token(all_oauth_tokens) -> str:
|
||||
"""OAuth token for charlie (cached for session). Uses shared OAuth client."""
|
||||
return all_oauth_tokens["charlie"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def diana_oauth_token(all_oauth_tokens) -> str:
|
||||
"""OAuth token for diana (cached for session). Uses shared OAuth client."""
|
||||
return all_oauth_tokens["diana"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def alice_mcp_client(alice_oauth_token) -> AsyncGenerator[ClientSession, Any]:
|
||||
"""MCP client authenticated as alice (owner role)."""
|
||||
token = alice_oauth_token
|
||||
|
||||
# Create MCP client session with proper lifecycle management
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
streamable_context = streamablehttp_client(
|
||||
"http://127.0.0.1:8001/mcp", headers=headers
|
||||
)
|
||||
session_context = None
|
||||
|
||||
try:
|
||||
read_stream, write_stream, _ = await streamable_context.__aenter__()
|
||||
session_context = ClientSession(read_stream, write_stream)
|
||||
session = await session_context.__aenter__()
|
||||
await session.initialize()
|
||||
logger.info("Alice MCP client session initialized")
|
||||
|
||||
yield session
|
||||
|
||||
finally:
|
||||
if session_context is not None:
|
||||
try:
|
||||
await session_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing alice session: {e}")
|
||||
try:
|
||||
await streamable_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing alice streamable context: {e}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def bob_mcp_client(bob_oauth_token) -> AsyncGenerator[ClientSession, Any]:
|
||||
"""MCP client authenticated as bob (viewer role)."""
|
||||
token = bob_oauth_token
|
||||
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
streamable_context = streamablehttp_client(
|
||||
"http://127.0.0.1:8001/mcp", headers=headers
|
||||
)
|
||||
session_context = None
|
||||
|
||||
try:
|
||||
read_stream, write_stream, _ = await streamable_context.__aenter__()
|
||||
session_context = ClientSession(read_stream, write_stream)
|
||||
session = await session_context.__aenter__()
|
||||
await session.initialize()
|
||||
logger.info("Bob MCP client session initialized")
|
||||
|
||||
yield session
|
||||
|
||||
finally:
|
||||
if session_context is not None:
|
||||
try:
|
||||
await session_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing bob session: {e}")
|
||||
try:
|
||||
await streamable_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing bob streamable context: {e}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def charlie_mcp_client(charlie_oauth_token) -> AsyncGenerator[ClientSession, Any]:
|
||||
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
|
||||
token = charlie_oauth_token
|
||||
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
streamable_context = streamablehttp_client(
|
||||
"http://127.0.0.1:8001/mcp", headers=headers
|
||||
)
|
||||
session_context = None
|
||||
|
||||
try:
|
||||
read_stream, write_stream, _ = await streamable_context.__aenter__()
|
||||
session_context = ClientSession(read_stream, write_stream)
|
||||
session = await session_context.__aenter__()
|
||||
await session.initialize()
|
||||
logger.info("Charlie MCP client session initialized")
|
||||
|
||||
yield session
|
||||
|
||||
finally:
|
||||
if session_context is not None:
|
||||
try:
|
||||
await session_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing charlie session: {e}")
|
||||
try:
|
||||
await streamable_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing charlie streamable context: {e}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, Any]:
|
||||
"""MCP client authenticated as diana (no-access role)."""
|
||||
token = diana_oauth_token
|
||||
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
streamable_context = streamablehttp_client(
|
||||
"http://127.0.0.1:8001/mcp", headers=headers
|
||||
)
|
||||
session_context = None
|
||||
|
||||
try:
|
||||
read_stream, write_stream, _ = await streamable_context.__aenter__()
|
||||
session_context = ClientSession(read_stream, write_stream)
|
||||
session = await session_context.__aenter__()
|
||||
await session.initialize()
|
||||
logger.info("Diana MCP client session initialized")
|
||||
|
||||
yield session
|
||||
|
||||
finally:
|
||||
if session_context is not None:
|
||||
try:
|
||||
await session_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing diana session: {e}")
|
||||
try:
|
||||
await streamable_context.__aexit__(None, None, None)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing diana streamable context: {e}")
|
||||
|
||||
|
||||
# Test user/group fixtures for clean test isolation
|
||||
@pytest.fixture
|
||||
async def test_user(nc_client: NextcloudClient):
|
||||
"""
|
||||
Fixture that creates a test user and cleans it up after the test.
|
||||
|
||||
Returns a dict with user details that can be customized.
|
||||
Usage:
|
||||
async def test_something(test_user):
|
||||
user_config = test_user
|
||||
await nc_client.users.create_user(**user_config)
|
||||
"""
|
||||
import uuid
|
||||
|
||||
# Generate unique user ID to avoid conflicts
|
||||
userid = f"testuser_{uuid.uuid4().hex[:8]}"
|
||||
password = "SecureTestPassword123!"
|
||||
|
||||
user_config = {
|
||||
"userid": userid,
|
||||
"password": password,
|
||||
"display_name": f"Test User {userid}",
|
||||
"email": f"{userid}@example.com",
|
||||
}
|
||||
|
||||
# Cleanup before (in case of previous failed run)
|
||||
try:
|
||||
await nc_client.users.delete_user(userid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
yield user_config
|
||||
|
||||
# Cleanup after test
|
||||
try:
|
||||
await nc_client.users.delete_user(userid)
|
||||
logger.debug(f"Cleaned up test user: {userid}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cleanup test user {userid}: {e}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def test_group(nc_client: NextcloudClient):
|
||||
"""
|
||||
Fixture that creates a test group and cleans it up after the test.
|
||||
|
||||
Returns the group ID.
|
||||
"""
|
||||
import uuid
|
||||
|
||||
# Generate unique group ID to avoid conflicts
|
||||
groupid = f"testgroup_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Cleanup before (in case of previous failed run)
|
||||
try:
|
||||
await nc_client.groups.delete_group(groupid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Create the group
|
||||
await nc_client.groups.create_group(groupid)
|
||||
logger.debug(f"Created test group: {groupid}")
|
||||
|
||||
yield groupid
|
||||
|
||||
# Cleanup after test
|
||||
try:
|
||||
await nc_client.groups.delete_group(groupid)
|
||||
logger.debug(f"Cleaned up test group: {groupid}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cleanup test group {groupid}: {e}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def test_user_in_group(nc_client: NextcloudClient, test_user, test_group):
|
||||
"""
|
||||
Fixture that creates a test user and adds them to a test group.
|
||||
|
||||
Returns a tuple of (user_config, groupid).
|
||||
"""
|
||||
user_config = test_user
|
||||
groupid = test_group
|
||||
|
||||
# Create the user
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Add user to group
|
||||
await nc_client.users.add_user_to_group(user_config["userid"], groupid)
|
||||
logger.debug(f"Added user {user_config['userid']} to group {groupid}")
|
||||
|
||||
yield (user_config, groupid)
|
||||
|
||||
@@ -0,0 +1,569 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from mcp import ClientSession
|
||||
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
# Stack MCP Tools Tests
|
||||
async def test_deck_stack_mcp_tools(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
|
||||
):
|
||||
"""Test complete deck stack operations via MCP tools."""
|
||||
board_id = temporary_board["id"]
|
||||
stack_title = f"MCP Test Stack {uuid.uuid4().hex[:8]}"
|
||||
stack_order = 1
|
||||
|
||||
# 1. Create stack via MCP tool
|
||||
logger.info(f"Creating stack via MCP: {stack_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
{"board_id": board_id, "title": stack_title, "order": stack_order},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP stack creation failed: {create_result.content}"
|
||||
)
|
||||
created_stack_response = json.loads(create_result.content[0].text)
|
||||
stack_id = created_stack_response["id"]
|
||||
assert created_stack_response["title"] == stack_title
|
||||
assert created_stack_response["order"] == stack_order
|
||||
logger.info(f"Stack created via MCP with ID: {stack_id}")
|
||||
|
||||
try:
|
||||
# 2. Get stack via MCP resource
|
||||
logger.info(f"Getting stack via MCP resource: {stack_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_stack_response = json.loads(get_result.contents[0].text)
|
||||
assert get_stack_response["title"] == stack_title
|
||||
logger.info("Stack retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update stack via MCP tool
|
||||
updated_title = f"Updated {stack_title}"
|
||||
updated_order = 2
|
||||
logger.info(f"Updating stack via MCP tool: {stack_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_stack",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": updated_title,
|
||||
"order": updated_order,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP stack update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Stack updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_stack = await nc_client.deck.get_stack(board_id, stack_id)
|
||||
assert updated_stack.title == updated_title
|
||||
assert updated_stack.order == updated_order
|
||||
logger.info("Stack update verified via direct client")
|
||||
|
||||
# 5. List stacks via MCP resource
|
||||
logger.info("Listing stacks via MCP resource")
|
||||
list_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks"
|
||||
)
|
||||
|
||||
assert len(list_result.contents) == 1, "Expected exactly one content item"
|
||||
stacks_data = json.loads(list_result.contents[0].text)
|
||||
assert isinstance(stacks_data, list)
|
||||
|
||||
# Verify our stack is in the list
|
||||
stack_ids = [stack["id"] for stack in stacks_data]
|
||||
assert stack_id in stack_ids, "Updated stack not found in list"
|
||||
logger.info(f"Stack {stack_id} found in stacks list")
|
||||
|
||||
# 6. Read stack via MCP resource
|
||||
logger.info(f"Reading stack via MCP resource: {stack_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
|
||||
)
|
||||
read_stack_data = json.loads(read_result.contents[0].text)
|
||||
assert read_stack_data["title"] == updated_title
|
||||
logger.info("Stack read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_stack(board_id, stack_id)
|
||||
logger.info(f"Cleaned up stack ID: {stack_id}")
|
||||
|
||||
|
||||
# Card MCP Tools Tests
|
||||
async def test_deck_card_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_stack: tuple,
|
||||
):
|
||||
"""Test complete deck card operations via MCP tools."""
|
||||
board_data, stack_data = temporary_board_with_stack
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_title = f"MCP Test Card {uuid.uuid4().hex[:8]}"
|
||||
card_description = f"Test description for {card_title}"
|
||||
|
||||
# 1. Create card via MCP tool
|
||||
logger.info(f"Creating card via MCP: {card_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": card_title,
|
||||
"description": card_description,
|
||||
"type": "plain",
|
||||
"order": 1,
|
||||
},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP card creation failed: {create_result.content}"
|
||||
)
|
||||
created_card_response = json.loads(create_result.content[0].text)
|
||||
card_id = created_card_response["id"]
|
||||
assert created_card_response["title"] == card_title
|
||||
assert created_card_response["description"] == card_description
|
||||
logger.info(f"Card created via MCP with ID: {card_id}")
|
||||
|
||||
try:
|
||||
# 2. Get card via MCP resource
|
||||
logger.info(f"Getting card via MCP resource: {card_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_card_response = json.loads(get_result.contents[0].text)
|
||||
assert get_card_response["title"] == card_title
|
||||
logger.info("Card retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update card via MCP tool
|
||||
updated_title = f"Updated {card_title}"
|
||||
updated_description = f"Updated description for {card_title}"
|
||||
logger.info(f"Updating card via MCP tool: {card_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"title": updated_title,
|
||||
"description": updated_description,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP card update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Card updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
assert updated_card.title == updated_title
|
||||
assert updated_card.description == updated_description
|
||||
logger.info("Card update verified via direct client")
|
||||
|
||||
# 5. Archive/unarchive card via MCP tools
|
||||
logger.info(f"Archiving card via MCP tool: {card_id}")
|
||||
archive_result = await nc_mcp_client.call_tool(
|
||||
"deck_archive_card",
|
||||
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
|
||||
)
|
||||
|
||||
assert archive_result.isError is False, (
|
||||
f"MCP card archive failed: {archive_result.content}"
|
||||
)
|
||||
logger.info("Card archived via MCP tool successfully")
|
||||
|
||||
logger.info(f"Unarchiving card via MCP tool: {card_id}")
|
||||
unarchive_result = await nc_mcp_client.call_tool(
|
||||
"deck_unarchive_card",
|
||||
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
|
||||
)
|
||||
|
||||
assert unarchive_result.isError is False, (
|
||||
f"MCP card unarchive failed: {unarchive_result.content}"
|
||||
)
|
||||
logger.info("Card unarchived via MCP tool successfully")
|
||||
|
||||
# 6. Move card to different position via MCP tool
|
||||
logger.info(f"Reordering card via MCP tool: {card_id}")
|
||||
reorder_result = await nc_mcp_client.call_tool(
|
||||
"deck_reorder_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"order": 10,
|
||||
"target_stack_id": stack_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert reorder_result.isError is False, (
|
||||
f"MCP card reorder failed: {reorder_result.content}"
|
||||
)
|
||||
logger.info("Card reordered via MCP tool successfully")
|
||||
|
||||
# 7. Read card via MCP resource
|
||||
logger.info(f"Reading card via MCP resource: {card_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
|
||||
)
|
||||
read_card_data = json.loads(read_result.contents[0].text)
|
||||
assert read_card_data["title"] == updated_title
|
||||
logger.info("Card read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_card(board_id, stack_id, card_id)
|
||||
logger.info(f"Cleaned up card ID: {card_id}")
|
||||
|
||||
|
||||
# Label MCP Tools Tests
|
||||
async def test_deck_label_mcp_tools(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
|
||||
):
|
||||
"""Test complete deck label operations via MCP tools."""
|
||||
board_id = temporary_board["id"]
|
||||
label_title = f"MCP Test Label {uuid.uuid4().hex[:8]}"
|
||||
label_color = "FF0000" # Red
|
||||
|
||||
# 1. Create label via MCP tool
|
||||
logger.info(f"Creating label via MCP: {label_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_label",
|
||||
{"board_id": board_id, "title": label_title, "color": label_color},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP label creation failed: {create_result.content}"
|
||||
)
|
||||
created_label_response = json.loads(create_result.content[0].text)
|
||||
label_id = created_label_response["id"]
|
||||
assert created_label_response["title"] == label_title
|
||||
assert created_label_response["color"] == label_color
|
||||
logger.info(f"Label created via MCP with ID: {label_id}")
|
||||
|
||||
try:
|
||||
# 2. Get label via MCP resource
|
||||
logger.info(f"Getting label via MCP resource: {label_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels/{label_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_label_response = json.loads(get_result.contents[0].text)
|
||||
assert get_label_response["title"] == label_title
|
||||
logger.info("Label retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update label via MCP tool
|
||||
updated_title = f"Updated {label_title}"
|
||||
updated_color = "00FF00" # Green
|
||||
logger.info(f"Updating label via MCP tool: {label_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_label",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"label_id": label_id,
|
||||
"title": updated_title,
|
||||
"color": updated_color,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP label update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Label updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_label = await nc_client.deck.get_label(board_id, label_id)
|
||||
assert updated_label.title == updated_title
|
||||
assert updated_label.color == updated_color
|
||||
logger.info("Label update verified via direct client")
|
||||
|
||||
# 5. Read label via MCP resource
|
||||
logger.info(f"Reading label via MCP resource: {label_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels/{label_id}"
|
||||
)
|
||||
read_label_data = json.loads(read_result.contents[0].text)
|
||||
assert read_label_data["title"] == updated_title
|
||||
logger.info("Label read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_label(board_id, label_id)
|
||||
logger.info(f"Cleaned up label ID: {label_id}")
|
||||
|
||||
|
||||
# Label-Card Assignment Tests
|
||||
async def test_deck_card_label_assignment_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_card: tuple,
|
||||
):
|
||||
"""Test card-label assignment operations via MCP tools."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_id = card_data["id"]
|
||||
|
||||
# Create a label for assignment
|
||||
label = await nc_client.deck.create_label(
|
||||
board_id, "Assignment Test Label", "0000FF"
|
||||
)
|
||||
label_id = label.id
|
||||
|
||||
try:
|
||||
# 1. Assign label to card via MCP tool
|
||||
logger.info(f"Assigning label {label_id} to card {card_id} via MCP")
|
||||
assign_result = await nc_mcp_client.call_tool(
|
||||
"deck_assign_label_to_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"label_id": label_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert assign_result.isError is False, (
|
||||
f"MCP label assignment failed: {assign_result.content}"
|
||||
)
|
||||
logger.info("Label assigned to card via MCP tool successfully")
|
||||
|
||||
# 2. Verify assignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.labels:
|
||||
label_ids = [label.id for label in card.labels]
|
||||
assert label_id in label_ids, "Label not found in card labels"
|
||||
logger.info("Label assignment verified via direct client")
|
||||
|
||||
# 3. Remove label from card via MCP tool
|
||||
logger.info(f"Removing label {label_id} from card {card_id} via MCP")
|
||||
remove_result = await nc_mcp_client.call_tool(
|
||||
"deck_remove_label_from_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"label_id": label_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert remove_result.isError is False, (
|
||||
f"MCP label removal failed: {remove_result.content}"
|
||||
)
|
||||
logger.info("Label removed from card via MCP tool successfully")
|
||||
|
||||
# 4. Verify removal via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.labels:
|
||||
label_ids = [label.id for label in card.labels]
|
||||
assert label_id not in label_ids, (
|
||||
"Label still found in card labels after removal"
|
||||
)
|
||||
logger.info("Label removal verified via direct client")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_label(board_id, label_id)
|
||||
logger.info(f"Cleaned up label ID: {label_id}")
|
||||
|
||||
|
||||
# User Assignment Tests
|
||||
async def test_deck_card_user_assignment_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_card: tuple,
|
||||
):
|
||||
"""Test card-user assignment operations via MCP tools."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_id = card_data["id"]
|
||||
|
||||
# Use the current user ID (admin in most test environments)
|
||||
user_id = "admin"
|
||||
|
||||
# 1. Assign user to card via MCP tool
|
||||
logger.info(f"Assigning user {user_id} to card {card_id} via MCP")
|
||||
assign_result = await nc_mcp_client.call_tool(
|
||||
"deck_assign_user_to_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert assign_result.isError is False, (
|
||||
f"MCP user assignment failed: {assign_result.content}"
|
||||
)
|
||||
logger.info("User assigned to card via MCP tool successfully")
|
||||
|
||||
# 2. Verify assignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.assignedUsers:
|
||||
user_ids = []
|
||||
for user in card.assignedUsers:
|
||||
if hasattr(user, "participant"):
|
||||
# It's a DeckAssignedUser with participant
|
||||
user_ids.append(user.participant.uid)
|
||||
elif hasattr(user, "uid"):
|
||||
# It's a direct DeckUser
|
||||
user_ids.append(user.uid)
|
||||
assert user_id in user_ids, "User not found in card assigned users"
|
||||
logger.info("User assignment verified via direct client")
|
||||
|
||||
# 3. Unassign user from card via MCP tool
|
||||
logger.info(f"Unassigning user {user_id} from card {card_id} via MCP")
|
||||
unassign_result = await nc_mcp_client.call_tool(
|
||||
"deck_unassign_user_from_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert unassign_result.isError is False, (
|
||||
f"MCP user unassignment failed: {unassign_result.content}"
|
||||
)
|
||||
logger.info("User unassigned from card via MCP tool successfully")
|
||||
|
||||
# 4. Verify unassignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.assignedUsers:
|
||||
user_ids = []
|
||||
for user in card.assignedUsers:
|
||||
if hasattr(user, "participant"):
|
||||
# It's a DeckAssignedUser with participant
|
||||
user_ids.append(user.participant.uid)
|
||||
elif hasattr(user, "uid"):
|
||||
# It's a direct DeckUser
|
||||
user_ids.append(user.uid)
|
||||
assert user_id not in user_ids, (
|
||||
"User still found in card assigned users after removal"
|
||||
)
|
||||
logger.info("User unassignment verified via direct client")
|
||||
|
||||
|
||||
# Error handling tests
|
||||
async def test_deck_mcp_tools_error_handling(nc_mcp_client: ClientSession):
|
||||
"""Test error handling for deck MCP tools with invalid parameters."""
|
||||
non_existent_id = 999999999
|
||||
|
||||
# Test stack operations with non-existent board
|
||||
stack_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
{"board_id": non_existent_id, "title": "Should Fail", "order": 1},
|
||||
)
|
||||
assert stack_result.isError is True, (
|
||||
"Expected error for stack creation on non-existent board"
|
||||
)
|
||||
|
||||
# Test card operations with non-existent IDs
|
||||
card_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
{
|
||||
"board_id": non_existent_id,
|
||||
"stack_id": non_existent_id,
|
||||
"title": "Should Fail",
|
||||
"type": "plain",
|
||||
},
|
||||
)
|
||||
assert card_result.isError is True, (
|
||||
"Expected error for card creation with non-existent IDs"
|
||||
)
|
||||
|
||||
# Test label operations with non-existent board
|
||||
label_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_label",
|
||||
{"board_id": non_existent_id, "title": "Should Fail", "color": "FF0000"},
|
||||
)
|
||||
assert label_result.isError is True, (
|
||||
"Expected error for label creation on non-existent board"
|
||||
)
|
||||
|
||||
logger.info("Error handling tests passed for deck MCP tools")
|
||||
|
||||
|
||||
# Resource template tests
|
||||
async def test_deck_mcp_resource_templates(nc_mcp_client: ClientSession):
|
||||
"""Test deck MCP resource templates are properly registered."""
|
||||
templates = await nc_mcp_client.list_resource_templates()
|
||||
template_uris = [template.uriTemplate for template in templates.resourceTemplates]
|
||||
|
||||
expected_templates = [
|
||||
"nc://Deck/boards/{board_id}/stacks/{stack_id}",
|
||||
"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}",
|
||||
"nc://Deck/boards/{board_id}/labels/{label_id}",
|
||||
]
|
||||
|
||||
for expected_template in expected_templates:
|
||||
assert expected_template in template_uris, (
|
||||
f"Expected template '{expected_template}' not found"
|
||||
)
|
||||
logger.info(f"Found expected deck resource template: {expected_template}")
|
||||
|
||||
|
||||
# Listing resource tests
|
||||
async def test_deck_mcp_listing_resources(
|
||||
nc_mcp_client: ClientSession, temporary_board_with_card: tuple
|
||||
):
|
||||
"""Test deck MCP listing resources for stacks and cards."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
|
||||
# 1. Test listing stacks resource
|
||||
logger.info(f"Reading stacks list via MCP resource for board {board_id}")
|
||||
stacks_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks"
|
||||
)
|
||||
stacks_resource_data = json.loads(stacks_resource_result.contents[0].text)
|
||||
assert isinstance(stacks_resource_data, list)
|
||||
|
||||
# Verify our stack is in the resource list
|
||||
stack_ids = [stack["id"] for stack in stacks_resource_data]
|
||||
assert stack_id in stack_ids, "Stack not found in stacks resource list"
|
||||
logger.info("Stack found in stacks resource list")
|
||||
|
||||
# 2. Test listing cards resource
|
||||
logger.info(f"Reading cards list via MCP resource for stack {stack_id}")
|
||||
cards_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards"
|
||||
)
|
||||
cards_resource_data = json.loads(cards_resource_result.contents[0].text)
|
||||
assert isinstance(cards_resource_data, list)
|
||||
|
||||
# Verify our card is in the resource list
|
||||
card_ids = [card["id"] for card in cards_resource_data]
|
||||
assert card_data["id"] in card_ids, "Card not found in cards resource list"
|
||||
logger.info("Card found in cards resource list")
|
||||
|
||||
# 3. Test listing labels resource
|
||||
logger.info(f"Reading labels list via MCP resource for board {board_id}")
|
||||
labels_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels"
|
||||
)
|
||||
labels_resource_data = json.loads(labels_resource_result.contents[0].text)
|
||||
assert isinstance(labels_resource_data, list)
|
||||
logger.info("Labels resource read successfully")
|
||||
@@ -0,0 +1,358 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud Deck board permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud Deck board ACL permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
async def add_board_acl(nc_client, board_id: int, user: str, permission_type: int = 0):
|
||||
"""
|
||||
Helper to add ACL entry to a Deck board.
|
||||
|
||||
Args:
|
||||
nc_client: Admin NextcloudClient
|
||||
board_id: Board ID
|
||||
user: Username to grant access
|
||||
permission_type: 0=view, 1=edit, 2=manage
|
||||
|
||||
Returns:
|
||||
ACL entry ID
|
||||
"""
|
||||
acl = await nc_client.deck.add_acl_rule(
|
||||
board_id=board_id,
|
||||
type=0, # 0 = user, 1 = group
|
||||
participant=user,
|
||||
permission_edit=permission_type >= 1,
|
||||
permission_share=permission_type >= 2,
|
||||
permission_manage=permission_type >= 2,
|
||||
)
|
||||
logger.info(f"Added ACL for board {board_id}: {user} (type={permission_type})")
|
||||
return acl.id
|
||||
|
||||
|
||||
async def delete_board_acl(nc_client, board_id: int, acl_id: int):
|
||||
"""Helper to delete a board ACL entry."""
|
||||
await nc_client.deck.delete_acl_rule(board_id, acl_id)
|
||||
logger.info(f"Deleted ACL {acl_id} from board {board_id}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_view_permissions(
|
||||
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect view permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice
|
||||
2. Admin adds bob to board with view-only permissions
|
||||
3. Bob can view the board via MCP tools
|
||||
4. Diana cannot access the board (no ACL entry)
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - View Test", "FF0000"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
bob_acl_id = None
|
||||
|
||||
try:
|
||||
# Add bob to board with view-only permission
|
||||
logger.info("Adding bob to board with view permission...")
|
||||
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
|
||||
|
||||
# Test: Bob can view the board via MCP
|
||||
logger.info("Bob attempting to list boards via MCP...")
|
||||
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Bob can see {len(response_data)} boards: {board_ids}")
|
||||
|
||||
# Bob should see the shared board
|
||||
if board_id in board_ids:
|
||||
logger.info(f"Bob can see shared board {board_id}")
|
||||
else:
|
||||
logger.warning(f"Bob cannot see shared board {board_id}")
|
||||
else:
|
||||
logger.warning(f"Bob could not list boards: {result.content}")
|
||||
|
||||
# Test: Diana cannot see the board
|
||||
logger.info("Diana attempting to list boards via MCP...")
|
||||
result = await diana_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Diana can see {len(response_data)} boards")
|
||||
|
||||
# Diana should NOT see the board
|
||||
assert board_id not in board_ids, "Diana should not see board without ACL"
|
||||
logger.info("Diana correctly cannot see board without ACL")
|
||||
else:
|
||||
logger.warning(f"Diana could not list boards: {result.content}")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if bob_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, bob_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_edit_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect edit permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice with a stack
|
||||
2. Admin adds charlie with edit permission
|
||||
3. Admin adds bob with view-only permission
|
||||
4. Charlie can create cards via MCP tools
|
||||
5. Bob cannot create cards
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - Edit Test", "00FF00"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
# Create a stack in the board
|
||||
logger.info("Creating stack in board...")
|
||||
stack = await nc_client.deck.create_stack(board_id, "Test Stack", 1)
|
||||
stack_id = stack.id
|
||||
|
||||
charlie_acl_id = None
|
||||
bob_acl_id = None
|
||||
|
||||
try:
|
||||
# Add charlie with edit permission
|
||||
logger.info("Adding charlie to board with edit permission...")
|
||||
charlie_acl_id = await add_board_acl(
|
||||
nc_client, board_id, "charlie", permission_type=1
|
||||
)
|
||||
|
||||
# Add bob with view-only permission
|
||||
logger.info("Adding bob to board with view permission...")
|
||||
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
|
||||
|
||||
# Test: Charlie can create a card
|
||||
logger.info("Charlie attempting to create card via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
arguments={
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": "Charlie's Card",
|
||||
"description": "Created by Charlie with edit permission",
|
||||
},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
card_id = response_data.get("id")
|
||||
logger.info(f"Charlie successfully created card {card_id}")
|
||||
|
||||
# Cleanup the card
|
||||
await nc_client.deck.delete_card(board_id, stack_id, card_id)
|
||||
else:
|
||||
logger.warning(f"Charlie could not create card: {result.content}")
|
||||
|
||||
# Test: Bob attempts to create a card (should fail)
|
||||
logger.info("Bob attempting to create card via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
arguments={
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": "Bob's Card",
|
||||
"description": "Bob trying to create a card",
|
||||
},
|
||||
)
|
||||
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied card creation (view-only)")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded in creating card")
|
||||
# Cleanup if bob somehow created a card
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if "id" in response_data:
|
||||
await nc_client.deck.delete_card(
|
||||
board_id, stack_id, response_data["id"]
|
||||
)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if charlie_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, charlie_acl_id)
|
||||
if bob_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, bob_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_manage_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect manage permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice
|
||||
2. Admin adds charlie with manage permission
|
||||
3. Charlie can create stacks and modify board settings
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - Manage Test", "0000FF"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
charlie_acl_id = None
|
||||
|
||||
try:
|
||||
# Add charlie with manage permission
|
||||
logger.info("Adding charlie to board with manage permission...")
|
||||
charlie_acl_id = await add_board_acl(
|
||||
nc_client, board_id, "charlie", permission_type=2
|
||||
)
|
||||
|
||||
# Test: Charlie can create a stack
|
||||
logger.info("Charlie attempting to create stack via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
arguments={"board_id": board_id, "title": "Charlie's Stack", "order": 1},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
stack_id = response_data.get("id")
|
||||
logger.info(f"Charlie successfully created stack {stack_id}")
|
||||
|
||||
# Cleanup the stack
|
||||
await nc_client.deck.delete_stack(board_id, stack_id)
|
||||
else:
|
||||
logger.warning(f"Charlie could not create stack: {result.content}")
|
||||
|
||||
# Test: Charlie can delete a stack (manage permission)
|
||||
logger.info("Charlie attempting to delete stack via MCP...")
|
||||
# First create a temporary stack to delete
|
||||
temp_stack = await nc_client.deck.create_stack(
|
||||
board_id, "Temp Stack for Deletion", 99
|
||||
)
|
||||
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_delete_stack",
|
||||
arguments={"board_id": board_id, "stack_id": temp_stack.id},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully deleted stack")
|
||||
else:
|
||||
logger.warning(f"Charlie could not delete stack: {result.content}")
|
||||
# Cleanup if deletion via MCP failed
|
||||
try:
|
||||
await nc_client.deck.delete_stack(board_id, temp_stack.id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if charlie_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, charlie_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_user_isolation(nc_client, alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that users can only see their own boards when not shared.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice (not shared)
|
||||
2. Admin creates a board as bob (not shared)
|
||||
3. Alice can only see her own board
|
||||
4. Bob can only see his own board
|
||||
"""
|
||||
# Create alice's board
|
||||
logger.info("Creating alice's private board...")
|
||||
alice_board = await nc_client.deck.create_board("Alice's Private Board", "FF00FF")
|
||||
alice_board_id = alice_board.id
|
||||
|
||||
# Create bob's board
|
||||
logger.info("Creating bob's private board...")
|
||||
bob_board = await nc_client.deck.create_board("Bob's Private Board", "00FFFF")
|
||||
bob_board_id = bob_board.id
|
||||
|
||||
try:
|
||||
# Test: Alice lists boards
|
||||
logger.info("Alice listing boards via MCP...")
|
||||
result = await alice_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Alice can see boards: {board_ids}")
|
||||
|
||||
# Alice should NOT see Bob's board
|
||||
assert bob_board_id not in board_ids, (
|
||||
"Alice should not see Bob's private board"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Alice could not list boards: {result.content}")
|
||||
|
||||
# Test: Bob lists boards
|
||||
logger.info("Bob listing boards via MCP...")
|
||||
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Bob can see boards: {board_ids}")
|
||||
|
||||
# Bob should NOT see Alice's board
|
||||
assert alice_board_id not in board_ids, (
|
||||
"Bob should not see Alice's private board"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Bob could not list boards: {result.content}")
|
||||
|
||||
logger.info("User isolation test passed: users can only see their own boards")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info("Cleaning up test boards...")
|
||||
await nc_client.deck.delete_board(alice_board_id)
|
||||
await nc_client.deck.delete_board(bob_board_id)
|
||||
@@ -0,0 +1,425 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud WebDAV file permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud file sharing permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
|
||||
All operations (file creation, sharing, access) are performed through MCP tools
|
||||
to ensure the MCP server properly supports multi-user scenarios.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_share_read_permissions(
|
||||
alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared files respect read permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a file via MCP
|
||||
2. Alice shares the file with Bob (read-only) via MCP
|
||||
3. Bob can read the file via MCP tools
|
||||
4. Diana cannot access the file (no share)
|
||||
"""
|
||||
file_path = "/alice_shared_file_read.txt"
|
||||
file_content = "This file is shared with Bob for reading only."
|
||||
|
||||
# Alice creates a file
|
||||
logger.info(f"Alice creating file: {file_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the file with bob (read-only, permissions=1)
|
||||
logger.info("Alice sharing file with bob (read-only)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created share {share_id}")
|
||||
|
||||
# Test: Bob reads the file via MCP
|
||||
logger.info("Bob attempting to read file via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
# Bob should be able to read the shared file
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
logger.info(
|
||||
f"Bob successfully read file: {response_data.get('content', '')[:50]}..."
|
||||
)
|
||||
assert "content" in response_data
|
||||
assert file_content in response_data["content"]
|
||||
else:
|
||||
logger.warning(f"Bob could not read file: {result.content}")
|
||||
# This might fail if the share path is different for bob
|
||||
|
||||
# Test: Diana attempts to read the file
|
||||
logger.info("Diana attempting to read file via MCP...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
# Diana should NOT be able to read (no share)
|
||||
if result.isError:
|
||||
logger.info("Diana correctly denied access to unshared file")
|
||||
else:
|
||||
logger.warning("Diana unexpectedly could read unshared file")
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes the share and file
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
logger.info(f"Alice deleting file {file_path}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_share_write_permissions(
|
||||
alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared files respect write permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a file via MCP
|
||||
2. Alice shares the file with Charlie (edit permission) via MCP
|
||||
3. Alice shares the file with Bob (read-only) via MCP
|
||||
4. Charlie can edit the file via MCP tools
|
||||
5. Bob cannot edit the file
|
||||
"""
|
||||
file_path = "/alice_shared_file_write.txt"
|
||||
file_content = "This file is shared with Charlie for editing."
|
||||
|
||||
logger.info(f"Alice creating file: {file_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
charlie_share_id = None
|
||||
bob_share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares with Charlie (read+write, permissions=3)
|
||||
logger.info("Alice sharing file with Charlie (edit permission)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "charlie",
|
||||
"share_type": 0,
|
||||
"permissions": 3,
|
||||
},
|
||||
)
|
||||
assert not result.isError, (
|
||||
f"Alice failed to share with Charlie: {result.content}"
|
||||
)
|
||||
charlie_share_data = json.loads(result.content[0].text)
|
||||
charlie_share_id = charlie_share_data["id"]
|
||||
logger.info(f"Created share {charlie_share_id} for Charlie")
|
||||
|
||||
# Alice shares with Bob (read-only, permissions=1)
|
||||
logger.info("Alice sharing file with Bob (read-only)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to share with Bob: {result.content}"
|
||||
bob_share_data = json.loads(result.content[0].text)
|
||||
bob_share_id = bob_share_data["id"]
|
||||
logger.info(f"Created share {bob_share_id} for Bob")
|
||||
|
||||
# Test: Charlie can write to the file
|
||||
logger.info("Charlie attempting to write to file via MCP...")
|
||||
updated_content = f"{file_content}\nCharlie added this line."
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": updated_content},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully wrote to file")
|
||||
else:
|
||||
logger.warning(f"Charlie could not write to file: {result.content}")
|
||||
|
||||
# Test: Bob attempts to write (should fail)
|
||||
logger.info("Bob attempting to write to file via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": "Bob tries to overwrite this."},
|
||||
)
|
||||
|
||||
# Bob should be denied
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied write access")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)")
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes shares and file
|
||||
if charlie_share_id:
|
||||
logger.info(f"Alice deleting Charlie's share {charlie_share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": charlie_share_id}
|
||||
)
|
||||
if bob_share_id:
|
||||
logger.info(f"Alice deleting Bob's share {bob_share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": bob_share_id}
|
||||
)
|
||||
logger.info(f"Alice deleting file {file_path}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_list_permissions(alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that file listing respects share permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates her private file via MCP
|
||||
2. Bob creates his private file via MCP
|
||||
3. Alice creates a file and shares it with Bob via MCP
|
||||
4. Alice can list her own files + shared files
|
||||
5. Bob can list his own files + shared files from Alice
|
||||
"""
|
||||
alice_file = "/alice_private_file.txt"
|
||||
bob_file = "/bob_private_file.txt"
|
||||
shared_file = "/alice_shared_with_bob.txt"
|
||||
|
||||
# Alice creates her private file
|
||||
logger.info(f"Alice creating private file: {alice_file}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": alice_file, "content": "Alice's private file"},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
# Bob creates his private file
|
||||
logger.info(f"Bob creating private file: {bob_file}")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": bob_file, "content": "Bob's private file"},
|
||||
)
|
||||
assert not result.isError, f"Bob failed to create file: {result.content}"
|
||||
|
||||
# Alice creates a shared file
|
||||
logger.info(f"Alice creating shared file: {shared_file}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": shared_file, "content": "Shared file content"},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create shared file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the file with Bob
|
||||
logger.info("Alice sharing file with Bob...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": shared_file,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# Test: Alice lists files in root
|
||||
logger.info("Alice listing files via MCP...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": "/"}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
file_names = [f["name"] for f in response_data]
|
||||
logger.info(f"Alice can see files: {file_names}")
|
||||
|
||||
# Alice should see her own files
|
||||
# Note: Exact assertions depend on test isolation
|
||||
else:
|
||||
logger.warning(f"Alice could not list files: {result.content}")
|
||||
|
||||
# Test: Bob lists files in root
|
||||
logger.info("Bob listing files via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": "/"}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
file_names = [f["name"] for f in response_data]
|
||||
logger.info(f"Bob can see files: {file_names}")
|
||||
|
||||
# Bob should see his own file, but not Alice's private file
|
||||
# Bob may see shared files in his shared folder or via different path
|
||||
else:
|
||||
logger.warning(f"Bob could not list files: {result.content}")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
|
||||
logger.info("Cleaning up Alice's files...")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": alice_file}
|
||||
)
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": shared_file}
|
||||
)
|
||||
|
||||
logger.info("Cleaning up Bob's files...")
|
||||
await bob_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": bob_file}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_folder_share_permissions(alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that folder sharing works correctly.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a folder via MCP
|
||||
2. Alice creates files in the folder via MCP
|
||||
3. Alice shares the folder with Bob via MCP
|
||||
4. Bob can access files in the shared folder via MCP
|
||||
"""
|
||||
folder_path = "/alice_shared_folder"
|
||||
file_in_folder = f"{folder_path}/document.txt"
|
||||
file_content = "This is a document in Alice's shared folder"
|
||||
|
||||
# Alice creates folder
|
||||
logger.info(f"Alice creating folder: {folder_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_create_directory", arguments={"path": folder_path}
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create folder: {result.content}"
|
||||
|
||||
# Alice creates file in folder
|
||||
logger.info(f"Alice creating file in folder: {file_in_folder}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_in_folder, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the folder with Bob
|
||||
logger.info("Alice sharing folder with Bob...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": folder_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created folder share {share_id}")
|
||||
|
||||
# Test: Bob lists the shared folder
|
||||
logger.info("Bob attempting to list shared folder via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": folder_path}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
logger.info(f"Bob can see {len(response_data)} files in shared folder")
|
||||
|
||||
# Bob should see the file in the shared folder
|
||||
file_names = [f["name"] for f in response_data]
|
||||
assert "document.txt" in file_names, (
|
||||
"Bob should see the file in shared folder"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Bob could not list shared folder: {result.content}")
|
||||
|
||||
# Test: Bob reads the file in the shared folder
|
||||
logger.info("Bob attempting to read file in shared folder via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_in_folder}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
logger.info("Bob successfully read file in shared folder")
|
||||
assert "content" in response_data
|
||||
assert file_content in response_data["content"]
|
||||
else:
|
||||
logger.warning(
|
||||
f"Bob could not read file in shared folder: {result.content}"
|
||||
)
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes the share and folder
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
|
||||
logger.info("Alice cleaning up test folder...")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": folder_path}
|
||||
)
|
||||
@@ -0,0 +1,260 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud Notes permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud Notes sharing permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notes_share_read_permissions(
|
||||
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared notes respect read permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice
|
||||
2. Admin shares the note with bob (read-only)
|
||||
3. Bob can read the note via MCP tools
|
||||
4. Diana cannot access the note (no share)
|
||||
"""
|
||||
# Create a note as alice (using admin client to set up data)
|
||||
note_title = "Alice's Shared Note - Read Test"
|
||||
note_content = "This note is shared with Bob for reading only."
|
||||
note_category = "SharedNotes"
|
||||
|
||||
logger.info("Creating note as alice...")
|
||||
created_note = await nc_client.notes.create_note(
|
||||
title=note_title, content=note_content, category=note_category
|
||||
)
|
||||
note_id = created_note.get("id")
|
||||
|
||||
try:
|
||||
# TODO: Share the note with bob (read-only)
|
||||
# Note: Nextcloud Notes API doesn't have direct sharing endpoints
|
||||
# Sharing is typically done at the folder level via WebDAV
|
||||
# For now, this test documents the expected behavior
|
||||
|
||||
# Test: Bob searches for notes via MCP
|
||||
logger.info("Bob searching for notes via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
|
||||
)
|
||||
|
||||
assert result.isError is False, f"Bob's search failed: {result.content}"
|
||||
response_data = json.loads(result.content[0].text)
|
||||
|
||||
# Bob should see the shared note in search results
|
||||
# (assuming proper share setup)
|
||||
assert "results" in response_data
|
||||
logger.info(f"Bob found {len(response_data['results'])} notes")
|
||||
|
||||
# Test: Diana searches for the same note
|
||||
logger.info("Diana searching for notes via MCP...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
|
||||
# Diana should NOT see the note (no share)
|
||||
assert "results" in response_data
|
||||
shared_note_ids = [
|
||||
n["id"] for n in response_data["results"] if n["id"] == note_id
|
||||
]
|
||||
assert len(shared_note_ids) == 0, "Diana should not see unshared note"
|
||||
logger.info("Diana correctly cannot see unshared note")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info(f"Cleaning up note {note_id}")
|
||||
await nc_client.notes.delete_note(note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notes_share_write_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared notes respect write permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice
|
||||
2. Admin shares the note with charlie (edit permission)
|
||||
3. Admin shares the note with bob (read-only)
|
||||
4. Charlie can edit the note via MCP tools
|
||||
5. Bob cannot edit the note
|
||||
"""
|
||||
# Create a note as alice
|
||||
note_title = "Alice's Shared Note - Write Test"
|
||||
note_content = "This note is shared with Charlie for editing."
|
||||
note_category = "SharedNotes"
|
||||
|
||||
logger.info("Creating note as alice...")
|
||||
created_note = await nc_client.notes.create_note(
|
||||
title=note_title, content=note_content, category=note_category
|
||||
)
|
||||
note_id = created_note.get("id")
|
||||
|
||||
try:
|
||||
# TODO: Share the note with charlie (edit permission) and bob (read-only)
|
||||
# Note: Nextcloud Notes sharing is folder-based
|
||||
|
||||
# Test: Charlie can append content to the note
|
||||
logger.info("Charlie attempting to append content via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_notes_append_content",
|
||||
arguments={
|
||||
"note_id": note_id,
|
||||
"content": "\n\nCharlie added this content.",
|
||||
},
|
||||
)
|
||||
|
||||
# If sharing is properly configured, Charlie should succeed
|
||||
# Without proper sharing setup, this will fail
|
||||
logger.info(f"Charlie's append result: isError={result.isError}")
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully appended content (shares configured)")
|
||||
else:
|
||||
logger.warning("Charlie could not append (shares not yet configured)")
|
||||
|
||||
# Test: Bob attempts to append content (should fail)
|
||||
logger.info("Bob attempting to append content via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_append_content",
|
||||
arguments={"note_id": note_id, "content": "\n\nBob tried to add this."},
|
||||
)
|
||||
|
||||
# Bob should fail (read-only access)
|
||||
logger.info(f"Bob's append result: isError={result.isError}")
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied write access")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded (permissions issue?)")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info(f"Cleaning up note {note_id}")
|
||||
await nc_client.notes.delete_note(note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_isolation_notes(nc_client, alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that users can only see their own notes when not shared.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice (not shared)
|
||||
2. Admin creates a note as bob (not shared)
|
||||
3. Alice can only see her own note
|
||||
4. Bob can only see his own note
|
||||
"""
|
||||
# Create alice's note
|
||||
logger.info("Creating alice's private note...")
|
||||
alice_note = await nc_client.notes.create_note(
|
||||
title="Alice's Private Note",
|
||||
content="This is Alice's private content.",
|
||||
category="AlicePrivate",
|
||||
)
|
||||
alice_note_id = alice_note.get("id")
|
||||
|
||||
# Create bob's note
|
||||
logger.info("Creating bob's private note...")
|
||||
bob_note = await nc_client.notes.create_note(
|
||||
title="Bob's Private Note",
|
||||
content="This is Bob's private content.",
|
||||
category="BobPrivate",
|
||||
)
|
||||
bob_note_id = bob_note.get("id")
|
||||
|
||||
try:
|
||||
# Test: Alice searches all notes
|
||||
logger.info("Alice searching all notes via MCP...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
alice_notes = response_data.get("results", [])
|
||||
alice_note_ids = [n["id"] for n in alice_notes]
|
||||
|
||||
logger.info(f"Alice can see {len(alice_notes)} notes")
|
||||
# Alice should NOT see Bob's note
|
||||
assert bob_note_id not in alice_note_ids, (
|
||||
"Alice should not see Bob's private note"
|
||||
)
|
||||
|
||||
# Test: Bob searches all notes
|
||||
logger.info("Bob searching all notes via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
bob_notes = response_data.get("results", [])
|
||||
bob_note_ids = [n["id"] for n in bob_notes]
|
||||
|
||||
logger.info(f"Bob can see {len(bob_notes)} notes")
|
||||
# Bob should NOT see Alice's note
|
||||
assert alice_note_id not in bob_note_ids, (
|
||||
"Bob should not see Alice's private note"
|
||||
)
|
||||
|
||||
logger.info("User isolation test passed: users can only see their own notes")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info("Cleaning up test notes...")
|
||||
await nc_client.notes.delete_note(alice_note_id)
|
||||
await nc_client.notes.delete_note(bob_note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_oauth_mcp_clients_initialized(
|
||||
alice_mcp_client, bob_mcp_client, charlie_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Smoke test to verify all OAuth MCP clients are properly initialized.
|
||||
"""
|
||||
logger.info("Testing alice_mcp_client initialization...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Alice MCP client failed: {result.content}"
|
||||
logger.info("Alice MCP client working")
|
||||
|
||||
logger.info("Testing bob_mcp_client initialization...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Bob MCP client failed: {result.content}"
|
||||
logger.info("Bob MCP client working")
|
||||
|
||||
logger.info("Testing charlie_mcp_client initialization...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Charlie MCP client failed: {result.content}"
|
||||
logger.info("Charlie MCP client working")
|
||||
|
||||
logger.info("Testing diana_mcp_client initialization...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Diana MCP client failed: {result.content}"
|
||||
logger.info("Diana MCP client working")
|
||||
|
||||
logger.info("All OAuth MCP clients successfully initialized!")
|
||||
@@ -0,0 +1,108 @@
|
||||
import pytest
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_and_delete_user(nc_client: NextcloudClient, test_user):
|
||||
"""Test creating a user and verifying deletion (cleanup by fixture)."""
|
||||
user_config = test_user
|
||||
|
||||
# Create user
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Verify user exists
|
||||
users = await nc_client.users.search_users(search=user_config["userid"])
|
||||
assert user_config["userid"] in users
|
||||
|
||||
user_details = await nc_client.users.get_user_details(user_config["userid"])
|
||||
assert user_details.id == user_config["userid"]
|
||||
assert user_details.displayname == user_config["display_name"]
|
||||
assert user_details.email == user_config["email"]
|
||||
|
||||
# Test deletion explicitly as part of test functionality
|
||||
await nc_client.users.delete_user(user_config["userid"])
|
||||
|
||||
# Verify user is deleted
|
||||
users = await nc_client.users.search_users(search=user_config["userid"])
|
||||
assert user_config["userid"] not in users
|
||||
# Note: Fixture cleanup will also try to delete but handle 404 gracefully
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_field(nc_client: NextcloudClient, test_user):
|
||||
"""Test updating user fields."""
|
||||
user_config = test_user
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
new_email = f"new.{user_config['email']}"
|
||||
await nc_client.users.update_user_field(user_config["userid"], "email", new_email)
|
||||
|
||||
user_details = await nc_client.users.get_user_details(user_config["userid"])
|
||||
assert user_details.email == new_email
|
||||
# Fixture will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_groups(nc_client: NextcloudClient, test_user_in_group):
|
||||
"""Test adding and removing users from groups."""
|
||||
user_config, groupid = test_user_in_group
|
||||
userid = user_config["userid"]
|
||||
|
||||
# Verify user is in group
|
||||
groups = await nc_client.users.get_user_groups(userid)
|
||||
assert groupid in groups
|
||||
|
||||
# Remove user from group
|
||||
await nc_client.users.remove_user_from_group(userid, groupid)
|
||||
groups = await nc_client.users.get_user_groups(userid)
|
||||
assert groupid not in groups
|
||||
# Fixtures will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_subadmins(nc_client: NextcloudClient, test_user, test_group):
|
||||
"""Test promoting and demoting subadmins."""
|
||||
user_config = test_user
|
||||
groupid = test_group
|
||||
userid = user_config["userid"]
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Promote to subadmin
|
||||
await nc_client.users.promote_user_to_subadmin(userid, groupid)
|
||||
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
|
||||
assert groupid in subadmin_groups
|
||||
|
||||
# Demote from subadmin
|
||||
await nc_client.users.demote_user_from_subadmin(userid, groupid)
|
||||
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
|
||||
assert groupid not in subadmin_groups
|
||||
# Fixtures will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disable_enable_user(nc_client: NextcloudClient, test_user):
|
||||
"""Test disabling and enabling users."""
|
||||
user_config = test_user
|
||||
userid = user_config["userid"]
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Disable user
|
||||
await nc_client.users.disable_user(userid)
|
||||
user_details = await nc_client.users.get_user_details(userid)
|
||||
assert not user_details.enabled
|
||||
|
||||
# Enable user
|
||||
await nc_client.users.enable_user(userid)
|
||||
user_details = await nc_client.users.get_user_details(userid)
|
||||
assert user_details.enabled
|
||||
# Fixture will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_editable_user_fields(nc_client: NextcloudClient):
|
||||
editable_fields = await nc_client.users.get_editable_user_fields()
|
||||
assert "displayname" in editable_fields
|
||||
assert "email" in editable_fields
|
||||
+674
@@ -0,0 +1,674 @@
|
||||
=========================
|
||||
Instruction set for users
|
||||
=========================
|
||||
|
||||
Add a new user
|
||||
--------------
|
||||
|
||||
Create a new user on the Nextcloud server. Authentication is done by sending a
|
||||
basic HTTP authentication header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: userid - string, the required username for the new user
|
||||
* POST argument: password - string, the password for the new user, leave empty to send welcome mail
|
||||
* POST argument: displayName - string, the display name for the new user
|
||||
* POST argument: email - string, the email for the new user, required if password empty
|
||||
* POST argument: groups - array, the groups for the new user
|
||||
* POST argument: subadmin - array, the groups in which the new user is subadmin
|
||||
* POST argument: quota - string, quota for the new user
|
||||
* POST argument: language - string, language for the new user
|
||||
|
||||
Status codes:
|
||||
|
||||
* 101 - invalid argument
|
||||
* 102 - user already exists
|
||||
* 103 - cannot create sub-admins for admin group
|
||||
* 104 - group does not exist
|
||||
* 105 - insufficient privileges for group
|
||||
* 106 - no group specified (required for sub-admins)
|
||||
* 107 - hint exceptions
|
||||
* 108 - an email address is required, to send a password link to the user.
|
||||
* 109 - sub-admin group does not exist
|
||||
* 110 - required email address was not provided
|
||||
* 111 - could not create non-existing user ID
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
::
|
||||
|
||||
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users -d userid="Frank" -d password="frankspassword" -H "OCS-APIRequest: true"
|
||||
|
||||
* Creates the user ``Frank`` with password ``frankspassword``
|
||||
* optionally groups can be specified by one or more ``groups[]`` query parameters:
|
||||
``URL -d groups[]="admin" -D groups[]="Team1"``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Search/get users
|
||||
----------------
|
||||
|
||||
Retrieves a list of users from the Nextcloud server. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users**
|
||||
|
||||
* HTTP method: GET
|
||||
* url arguments: search - string, optional search string
|
||||
* url arguments: limit - int, optional limit value
|
||||
* url arguments: offset - int, optional offset value
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users?search=Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns list of users matching the search string.
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<users>
|
||||
<element>Frank</element>
|
||||
</users>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Get data of a single user
|
||||
-------------------------
|
||||
|
||||
Retrieves information about a single user. Authentication is done by sending a
|
||||
Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns information on the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<enabled>true</enabled>
|
||||
<id>Frank</id>
|
||||
<quota>0</quota>
|
||||
<email>frank@example.org</email>
|
||||
<displayname>Frank K.</displayname>
|
||||
<display-name>Frank K.</display-name>
|
||||
<phone>0123 / 456 789</phone>
|
||||
<address>Foobar 12, 12345 Town</address>
|
||||
<website>https://nextcloud.com</website>
|
||||
<twitter>Nextcloud</twitter>
|
||||
<groups>
|
||||
<element>group1</element>
|
||||
<element>group2</element>
|
||||
</groups>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Edit data of a single user
|
||||
--------------------------
|
||||
|
||||
Edits attributes related to a user. Users are able to edit email, displayname
|
||||
and password; admins can also edit the quota value. Further restrictions may apply,
|
||||
check the `List of editable data fields`_ endpoint. Authentication
|
||||
is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: PUT
|
||||
* PUT argument: key, the field to edit:
|
||||
|
||||
+ email
|
||||
+ quota
|
||||
+ displayname
|
||||
+ display (**deprecated** use `displayname` instead)
|
||||
+ phone
|
||||
+ address
|
||||
+ website
|
||||
+ twitter
|
||||
+ password
|
||||
|
||||
* PUT argument: value, the new value for the field
|
||||
|
||||
Status codes:
|
||||
|
||||
* 101 - invalid argument
|
||||
* 107 - password policy (hint exception)
|
||||
* 112 - Setting the password is not supported by the users backend
|
||||
* 113 - editing field not allowed / field doesn’t exist
|
||||
|
||||
Examples
|
||||
^^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="email" -d value="franksnewemail@example.org" -H "OCS-APIRequest: true"
|
||||
|
||||
* Updates the email address for the user ``Frank``
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="quota" -d value="100MB" -H "OCS-APIRequest: true"
|
||||
|
||||
* Updates the quota for the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
.. _editable_field_list:
|
||||
|
||||
List of editable data fields
|
||||
----------------------------
|
||||
|
||||
Edits attributes related to a user. Users are able to edit email, displayname
|
||||
and password; admins can also edit the quota value. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/user/fields**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Examples
|
||||
^^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/user/fields -H "OCS-APIRequest: true"
|
||||
|
||||
* Gets the list of fields
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message>OK</message>
|
||||
</meta>
|
||||
<data>
|
||||
<element>displayname</element>
|
||||
<element>email</element>
|
||||
<element>phone</element>
|
||||
<element>address</element>
|
||||
<element>website</element>
|
||||
<element>twitter</element>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
|
||||
Disable a user
|
||||
--------------
|
||||
|
||||
Disables a user on the Nextcloud server so that the user cannot login anymore.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/disable**
|
||||
|
||||
* HTTP method: PUT
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/disable -H "OCS-APIRequest: true"
|
||||
|
||||
* Disables the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Enable a user
|
||||
-------------
|
||||
|
||||
Enables a user on the Nextcloud server so that the user can login again.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/enable**
|
||||
|
||||
* HTTP method: PUT
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/enable -H "OCS-APIRequest: true"
|
||||
|
||||
* Enables the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Delete a user
|
||||
-------------
|
||||
|
||||
Deletes a user from the Nextcloud server. Authentication is done by sending a
|
||||
Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: DELETE
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Deletes the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Get user's groups
|
||||
-----------------
|
||||
|
||||
Retrieves a list of groups the specified user is a member of. Authentication is
|
||||
done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -H "OCS-APIRequest: true"
|
||||
|
||||
* Retrieves a list of groups of which ``Frank`` is a member
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<groups>
|
||||
<element>admin</element>
|
||||
<element>group1</element>
|
||||
</groups>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Add user to group
|
||||
-----------------
|
||||
|
||||
Adds the specified user to the specified group. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: groupid, string - the group to add the user to
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - no group specified
|
||||
* 102 - group does not exist
|
||||
* 103 - user does not exist
|
||||
* 104 - insufficient privileges
|
||||
* 105 - failed to add user to group
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Adds the user ``Frank`` to the group ``newgroup``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Remove user from group
|
||||
----------------------
|
||||
|
||||
Removes the specified user from the specified group. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: DELETE
|
||||
* DELETE argument: groupid, string - the group to remove the user from
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - no group specified
|
||||
* 102 - group does not exist
|
||||
* 103 - user does not exist
|
||||
* 104 - insufficient privileges
|
||||
* 105 - failed to remove user from group
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Removes the user ``Frank`` from the group ``newgroup``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Promote user to subadmin
|
||||
------------------------
|
||||
|
||||
Makes a user the subadmin of a group. Authentication is done by sending a Basic
|
||||
HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: groupid, string - the group of which to make the user a
|
||||
subadmin
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - group does not exist
|
||||
* 103 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="group" -H "OCS-APIRequest: true"
|
||||
|
||||
* Makes the user ``Frank`` a subadmin of the ``group`` group
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Demote user from subadmin
|
||||
-------------------------
|
||||
|
||||
Removes the subadmin rights for the user specified from the group specified.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: DELETE
|
||||
* DELETE argument: groupid, string - the group from which to remove the user's
|
||||
subadmin rights
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - user is not a subadmin of the group / group does not exist
|
||||
* 103 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="oldgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Removes ``Frank's`` subadmin rights from the ``oldgroup`` group
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Get user's subadmin groups
|
||||
--------------------------
|
||||
|
||||
Returns the groups in which the user is a subadmin. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns the groups of which ``Frank`` is a subadmin
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data>
|
||||
<element>testgroup</element>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Resend the welcome email
|
||||
------------------------
|
||||
|
||||
The request to this endpoint triggers the welcome email for this user again.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/welcome**
|
||||
|
||||
* HTTP method: POST
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - email address not available
|
||||
* 102 - sending email failed
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/welcome -H "OCS-APIRequest: true"
|
||||
|
||||
* Sends the welcome email to ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
Reference in New Issue
Block a user