Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b50fa9b824 |
+1
-3
@@ -4,6 +4,4 @@ __pycache__/
|
||||
*.env
|
||||
.env.local
|
||||
.env.*.local
|
||||
|
||||
# Generated by pytest used to login users
|
||||
.nextcloud_oauth_shared_test_client.json
|
||||
.nextcloud_oauth_test_client.json
|
||||
|
||||
@@ -1,15 +1,3 @@
|
||||
## v0.14.0 (2025-10-15)
|
||||
|
||||
### Feat
|
||||
|
||||
- Add Groups API client
|
||||
- add sharing API client and server tools
|
||||
- **users**: Initialize user API client
|
||||
|
||||
### Fix
|
||||
|
||||
- Update user/groups API to OCS v2
|
||||
|
||||
## v0.13.0 (2025-10-13)
|
||||
|
||||
### Feat
|
||||
|
||||
@@ -38,21 +38,13 @@ mcp run --transport sse nextcloud_mcp_server.app:mcp
|
||||
# Docker development environment with Nextcloud instance
|
||||
docker-compose up
|
||||
|
||||
# After code changes, rebuild and restart the appropriate MCP server container:
|
||||
# For basic auth changes (most common) - uses admin credentials
|
||||
# After code changes, rebuild and restart only the MCP server container
|
||||
docker-compose up --build -d mcp
|
||||
|
||||
# For OAuth changes - uses OAuth authentication flow
|
||||
docker-compose up --build -d mcp-oauth
|
||||
|
||||
# Build Docker image
|
||||
docker build -t nextcloud-mcp-server .
|
||||
```
|
||||
|
||||
**Important: Two MCP Server Containers**
|
||||
- **`mcp`** (port 8000): Uses basic auth with admin credentials. Use this for most development and testing.
|
||||
- **`mcp-oauth`** (port 8001): Uses OAuth authentication. Only use this when working on OAuth-specific features or tests.
|
||||
|
||||
### Environment Setup
|
||||
```bash
|
||||
# Install dependencies
|
||||
@@ -104,23 +96,18 @@ Each Nextcloud app has a corresponding server module that:
|
||||
|
||||
### Testing Structure
|
||||
|
||||
- **Integration tests** in `tests/integration/` and `tests/client/`, `tests/server/` - Test real Nextcloud API interactions
|
||||
- **Integration tests** in `tests/integration/` - Test real Nextcloud API interactions
|
||||
- **Fixtures** in `tests/conftest.py` - Shared test setup and utilities
|
||||
- Tests are marked with `@pytest.mark.integration` for selective running
|
||||
- **Important**: Integration tests run against live Docker containers. After making code changes:
|
||||
- For basic auth tests: rebuild with `docker-compose up --build -d mcp`
|
||||
- For OAuth tests: rebuild with `docker-compose up --build -d mcp-oauth`
|
||||
- **Important**: Integration tests run against live Docker containers. After making code changes to the MCP server, rebuild only the MCP container with `docker-compose up --build -d mcp` before running tests
|
||||
|
||||
#### Testing Best Practices
|
||||
- **MANDATORY: Always run tests after implementing features or fixing bugs**
|
||||
- Run tests to completion before considering any task complete
|
||||
- If tests require modifications to pass, ask for permission before proceeding
|
||||
- **Rebuild the correct container** after code changes:
|
||||
- For basic auth tests (most common): `docker-compose up --build -d mcp`
|
||||
- For OAuth tests: `docker-compose up --build -d mcp-oauth`
|
||||
- Use `docker-compose up --build -d mcp` to rebuild MCP container after code changes
|
||||
- **Use existing fixtures** from `tests/conftest.py` to avoid duplicate setup work:
|
||||
- `nc_mcp_client` - MCP client session for tool/resource testing (uses `mcp` container)
|
||||
- `nc_mcp_oauth_client` - MCP client session for OAuth testing (uses `mcp-oauth` container)
|
||||
- `nc_mcp_client` - MCP client session for tool/resource testing
|
||||
- `nc_client` - Direct NextcloudClient for setup/cleanup operations
|
||||
- `temporary_note` - Creates and cleans up test notes automatically
|
||||
- `temporary_addressbook` - Creates and cleans up test address books
|
||||
@@ -128,7 +115,6 @@ Each Nextcloud app has a corresponding server module that:
|
||||
- **Test specific functionality** after changes:
|
||||
- For Notes changes: `uv run pytest tests/integration/test_mcp.py -k "notes" -v`
|
||||
- For specific API changes: `uv run pytest tests/integration/test_notes_api.py -v`
|
||||
- For OAuth changes: `uv run pytest tests/server/test_oauth*.py -v` (remember to rebuild `mcp-oauth` container)
|
||||
- **Avoid creating standalone test scripts** - use pytest with proper fixtures instead
|
||||
|
||||
#### OAuth/OIDC Testing
|
||||
@@ -137,14 +123,7 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
**Automated Testing (Default - Recommended for CI/CD):**
|
||||
- **Default fixtures**: `nc_oauth_client`, `nc_mcp_oauth_client` now use Playwright automation by default
|
||||
- Uses Playwright headless browser automation to complete OAuth flow programmatically
|
||||
- **Shared OAuth Client**: All test users authenticate using a single OAuth client (matching MCP server behavior)
|
||||
- Single `client_id`/`client_secret` pair is registered and reused for all test users
|
||||
- Stored in `.nextcloud_oauth_shared_test_client.json` with `force_register=False` for reuse
|
||||
- Reduces OAuth client registrations and matches production MCP server architecture
|
||||
- All Playwright fixtures: `playwright_oauth_token`, `nc_oauth_client`, `nc_mcp_oauth_client`, `nc_oauth_client_playwright`, `nc_mcp_oauth_client_playwright`
|
||||
- Multi-user fixtures: `alice_oauth_token`, `bob_oauth_token`, `charlie_oauth_token`, `diana_oauth_token`
|
||||
- All use `shared_oauth_client_credentials` fixture for consistent client credentials
|
||||
- Each user gets unique access tokens via same OAuth client (like multiple users using the MCP server)
|
||||
- Requires: `NEXTCLOUD_HOST`, `NEXTCLOUD_USERNAME`, `NEXTCLOUD_PASSWORD` environment variables
|
||||
- Uses `pytest-playwright-asyncio` for async Playwright fixtures
|
||||
- Playwright configuration: Use pytest CLI args like `--browser firefox --headed` to customize
|
||||
@@ -152,13 +131,13 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
- Example:
|
||||
```bash
|
||||
# Run all OAuth tests with automated Playwright flow using Firefox
|
||||
uv run pytest tests/server/test_oauth*.py --browser firefox -v
|
||||
uv run pytest tests/integration/test_oauth*.py --browser firefox -v
|
||||
|
||||
# Run specific Playwright tests with visible browser for debugging
|
||||
uv run pytest tests/server/test_mcp_oauth.py --browser firefox --headed -v
|
||||
uv run pytest tests/integration/test_oauth_playwright.py --browser firefox --headed -v
|
||||
|
||||
# Run with Chromium (default)
|
||||
uv run pytest tests/server/test_oauth*.py -v
|
||||
uv run pytest tests/integration/test_oauth.py -v
|
||||
```
|
||||
|
||||
**Interactive Testing (Manual browser login):**
|
||||
@@ -170,23 +149,18 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
|
||||
- Example:
|
||||
```bash
|
||||
# Run OAuth tests with interactive flow (will open browser and wait for manual login)
|
||||
uv run pytest tests/client/test_oauth_interactive.py -v
|
||||
uv run pytest tests/integration/test_oauth_interactive.py -v
|
||||
```
|
||||
|
||||
**Test Environment Setup:**
|
||||
- **Two MCP server containers are available:**
|
||||
- `mcp` (port 8000): Uses basic auth with admin credentials - for most testing
|
||||
- `mcp-oauth` (port 8001): Uses OAuth authentication - for OAuth-specific testing
|
||||
- Start OAuth MCP server: `docker-compose up --build -d mcp-oauth`
|
||||
- **Important**: When working on OAuth functionality, always rebuild `mcp-oauth` container, not `mcp`
|
||||
- Shared OAuth client is registered once and reused across test runs
|
||||
- Client credentials cached in `.nextcloud_oauth_shared_test_client.json`
|
||||
- OAuth server runs on port 8001 (regular MCP on 8000)
|
||||
- Both flows register OAuth clients dynamically using Nextcloud's OIDC provider
|
||||
|
||||
**CI/CD Considerations:**
|
||||
- Interactive OAuth tests are automatically skipped when `GITHUB_ACTIONS` environment variable is set
|
||||
- Automated Playwright tests will run in CI/CD environments
|
||||
- Use Firefox browser in CI: `--browser firefox` (Chromium may have issues with localhost redirects)
|
||||
- Shared client approach reduces test time and API calls to Nextcloud
|
||||
|
||||
### Configuration Files
|
||||
|
||||
|
||||
@@ -17,6 +17,11 @@ patch -u /var/www/html/custom_apps/oidc/lib/Util/DiscoveryGenerator.php -i /dock
|
||||
php /var/www/html/occ config:app:set oidc dynamic_client_registration --value='true'
|
||||
php /var/www/html/occ config:app:set oidc proof_key_for_code_exchange --value=true --type=boolean
|
||||
|
||||
# Set the OIDC issuer URL (defaults to http://localhost:8080 if not provided)
|
||||
OIDC_ISSUER="${NEXTCLOUD_PUBLIC_ISSUER_URL:-http://localhost:8080}"
|
||||
php /var/www/html/occ config:app:set oidc issuer --value="${OIDC_ISSUER}"
|
||||
echo "OIDC issuer set to: ${OIDC_ISSUER}"
|
||||
|
||||
# Configure user_oidc to validate bearer tokens from the OIDC Identity Provider
|
||||
php /var/www/html/occ config:system:set user_oidc oidc_provider_bearer_validation --value=true --type=boolean
|
||||
|
||||
|
||||
+1
-1
@@ -63,7 +63,7 @@ services:
|
||||
- 127.0.0.1:8001:8001
|
||||
environment:
|
||||
- NEXTCLOUD_HOST=http://app:80
|
||||
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8001
|
||||
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.01:8001
|
||||
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
|
||||
# No USERNAME/PASSWORD - will use OAuth
|
||||
volumes:
|
||||
|
||||
@@ -21,7 +21,6 @@ from nextcloud_mcp_server.server import (
|
||||
configure_contacts_tools,
|
||||
configure_deck_tools,
|
||||
configure_notes_tools,
|
||||
configure_sharing_tools,
|
||||
configure_tables_tools,
|
||||
configure_webdav_tools,
|
||||
)
|
||||
@@ -376,7 +375,6 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
"notes": configure_notes_tools,
|
||||
"tables": configure_tables_tools,
|
||||
"webdav": configure_webdav_tools,
|
||||
"sharing": configure_sharing_tools,
|
||||
"calendar": configure_calendar_tools,
|
||||
"contacts": configure_contacts_tools,
|
||||
"deck": configure_deck_tools,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Dynamic client registration for Nextcloud OIDC."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -205,6 +206,65 @@ def save_client_to_file(client_info: ClientInfo, storage_path: Path):
|
||||
raise
|
||||
|
||||
|
||||
async def wait_for_client_propagation(
|
||||
nextcloud_url: str,
|
||||
client_id: str,
|
||||
max_retries: int = 10,
|
||||
initial_delay: float = 0.5,
|
||||
max_delay: float = 5.0,
|
||||
) -> None:
|
||||
"""
|
||||
Wait for the registered OAuth client to be fully propagated in Nextcloud.
|
||||
|
||||
This function attempts to verify the client is ready by checking if we can
|
||||
access OIDC-related endpoints. Uses exponential backoff for retries.
|
||||
|
||||
Args:
|
||||
nextcloud_url: Base URL of the Nextcloud instance
|
||||
client_id: The registered client ID
|
||||
max_retries: Maximum number of retry attempts
|
||||
initial_delay: Initial delay in seconds before first verification
|
||||
max_delay: Maximum delay between retries
|
||||
|
||||
Note:
|
||||
This is a best-effort approach to mitigate race conditions between
|
||||
client registration and first use. Nextcloud's OIDC provider may need
|
||||
time to propagate newly registered clients to its cache/database.
|
||||
"""
|
||||
# Always wait at least the initial delay to give Nextcloud time to propagate
|
||||
logger.debug(
|
||||
f"Waiting {initial_delay}s for OAuth client {client_id[:16]}... to propagate"
|
||||
)
|
||||
await asyncio.sleep(initial_delay)
|
||||
|
||||
# Verify the client is accessible by checking OIDC discovery again
|
||||
# (this gives Nextcloud additional time to complete any async operations)
|
||||
discovery_url = f"{nextcloud_url}/.well-known/openid-configuration"
|
||||
delay = initial_delay
|
||||
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
response = await client.get(discovery_url)
|
||||
response.raise_for_status()
|
||||
logger.debug(
|
||||
f"OAuth client propagation verification successful (attempt {attempt})"
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
if attempt < max_retries:
|
||||
delay = min(delay * 1.5, max_delay)
|
||||
logger.debug(
|
||||
f"Verification attempt {attempt} failed: {e}. Retrying in {delay:.1f}s..."
|
||||
)
|
||||
await asyncio.sleep(delay)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Could not verify client propagation after {max_retries} attempts. "
|
||||
"Continuing anyway - first authorization may fail."
|
||||
)
|
||||
|
||||
|
||||
async def load_or_register_client(
|
||||
nextcloud_url: str,
|
||||
registration_endpoint: str,
|
||||
@@ -212,6 +272,7 @@ async def load_or_register_client(
|
||||
client_name: str = "Nextcloud MCP Server",
|
||||
redirect_uris: list[str] | None = None,
|
||||
force_register: bool = True,
|
||||
wait_for_propagation: bool = True,
|
||||
) -> ClientInfo:
|
||||
"""
|
||||
Load client from storage or register a new one if not found/expired.
|
||||
@@ -220,7 +281,8 @@ async def load_or_register_client(
|
||||
1. Checks for existing client credentials in storage
|
||||
2. Validates the credentials are not expired
|
||||
3. Registers a new client if needed
|
||||
4. Saves the new client credentials
|
||||
4. Waits for the client to propagate (if newly registered)
|
||||
5. Saves the new client credentials
|
||||
|
||||
Args:
|
||||
nextcloud_url: Base URL of the Nextcloud instance
|
||||
@@ -229,6 +291,7 @@ async def load_or_register_client(
|
||||
client_name: Name of the client application
|
||||
redirect_uris: List of redirect URIs
|
||||
force_register: Force registration even if valid credentials exist
|
||||
wait_for_propagation: Wait for newly registered clients to propagate (default: True)
|
||||
|
||||
Returns:
|
||||
ClientInfo with valid credentials
|
||||
@@ -254,6 +317,15 @@ async def load_or_register_client(
|
||||
redirect_uris=redirect_uris,
|
||||
)
|
||||
|
||||
# Wait for client to propagate in Nextcloud's OIDC provider
|
||||
# This mitigates race conditions where the client is used immediately after registration
|
||||
if wait_for_propagation:
|
||||
logger.info("Waiting for OAuth client to propagate in Nextcloud...")
|
||||
await wait_for_client_propagation(
|
||||
nextcloud_url=nextcloud_url,
|
||||
client_id=client_info.client_id,
|
||||
)
|
||||
|
||||
# Save to storage
|
||||
save_client_to_file(client_info, storage_path)
|
||||
|
||||
|
||||
@@ -15,12 +15,9 @@ from ..controllers.notes_search import NotesSearchController
|
||||
from .calendar import CalendarClient
|
||||
from .contacts import ContactsClient
|
||||
from .deck import DeckClient
|
||||
from .groups import GroupsClient
|
||||
from .notes import NotesClient
|
||||
from .sharing import SharingClient
|
||||
from .tables import TablesClient
|
||||
from .webdav import WebDAVClient
|
||||
from .users import UsersClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -74,9 +71,6 @@ class NextcloudClient:
|
||||
self.calendar = CalendarClient(self._client, username)
|
||||
self.contacts = ContactsClient(self._client, username)
|
||||
self.deck = DeckClient(self._client, username)
|
||||
self.users = UsersClient(self._client, username)
|
||||
self.groups = GroupsClient(self._client, username)
|
||||
self.sharing = SharingClient(self._client, username)
|
||||
|
||||
# Initialize controllers
|
||||
self._notes_search = NotesSearchController()
|
||||
|
||||
@@ -99,7 +99,7 @@ class DeckClient(BaseNextcloudClient):
|
||||
permission_edit: bool,
|
||||
permission_share: bool,
|
||||
permission_manage: bool,
|
||||
) -> DeckACL:
|
||||
) -> List[DeckACL]:
|
||||
json_data = {
|
||||
"type": type,
|
||||
"participant": participant,
|
||||
@@ -107,14 +107,10 @@ class DeckClient(BaseNextcloudClient):
|
||||
"permissionShare": permission_share,
|
||||
"permissionManage": permission_manage,
|
||||
}
|
||||
headers = self._get_deck_headers()
|
||||
response = await self._make_request(
|
||||
"POST",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl",
|
||||
json=json_data,
|
||||
headers=headers,
|
||||
"POST", f"/apps/deck/api/v1.0/boards/{board_id}/acl", json=json_data
|
||||
)
|
||||
return DeckACL(**response.json())
|
||||
return [DeckACL(**acl) for acl in response.json()]
|
||||
|
||||
async def update_acl_rule(
|
||||
self,
|
||||
@@ -131,20 +127,13 @@ class DeckClient(BaseNextcloudClient):
|
||||
json_data["permissionShare"] = permission_share
|
||||
if permission_manage is not None:
|
||||
json_data["permissionManage"] = permission_manage
|
||||
headers = self._get_deck_headers()
|
||||
await self._make_request(
|
||||
"PUT",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
|
||||
json=json_data,
|
||||
headers=headers,
|
||||
"PUT", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}", json=json_data
|
||||
)
|
||||
|
||||
async def delete_acl_rule(self, board_id: int, acl_id: int) -> None:
|
||||
headers = self._get_deck_headers()
|
||||
await self._make_request(
|
||||
"DELETE",
|
||||
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
|
||||
headers=headers,
|
||||
"DELETE", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}"
|
||||
)
|
||||
|
||||
async def clone_board(
|
||||
|
||||
@@ -1,151 +0,0 @@
|
||||
"""Nextcloud Groups API client."""
|
||||
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from .base import BaseNextcloudClient, retry_on_429
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GroupsClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud Groups API operations."""
|
||||
|
||||
@retry_on_429
|
||||
async def search_groups(
|
||||
self,
|
||||
search: str | None = None,
|
||||
limit: int | None = None,
|
||||
offset: int | None = None,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Search for groups on the Nextcloud server.
|
||||
|
||||
Args:
|
||||
search: Optional search string to filter groups
|
||||
limit: Optional limit for number of results
|
||||
offset: Optional offset for pagination
|
||||
|
||||
Returns:
|
||||
List of group IDs matching the search criteria
|
||||
"""
|
||||
params = {}
|
||||
if search is not None:
|
||||
params["search"] = search
|
||||
if limit is not None:
|
||||
params["limit"] = limit
|
||||
if offset is not None:
|
||||
params["offset"] = offset
|
||||
|
||||
response = await self._client.get(
|
||||
"/ocs/v2.php/cloud/groups",
|
||||
params=params,
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
groups = data["ocs"]["data"].get("groups", [])
|
||||
return groups
|
||||
|
||||
@retry_on_429
|
||||
async def create_group(self, groupid: str) -> None:
|
||||
"""
|
||||
Create a new group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID to create
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails (e.g., group already exists)
|
||||
"""
|
||||
response = await self._client.post(
|
||||
"/ocs/v2.php/cloud/groups",
|
||||
data={"groupid": groupid},
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Created group: {groupid}")
|
||||
|
||||
@retry_on_429
|
||||
async def delete_group(self, groupid: str) -> None:
|
||||
"""
|
||||
Delete a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID to delete
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails (e.g., group doesn't exist)
|
||||
"""
|
||||
response = await self._client.delete(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Deleted group: {groupid}")
|
||||
|
||||
@retry_on_429
|
||||
async def get_group_members(self, groupid: str) -> List[str]:
|
||||
"""
|
||||
Get members of a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
|
||||
Returns:
|
||||
List of usernames in the group
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
users = data["ocs"]["data"].get("users", [])
|
||||
return users
|
||||
|
||||
@retry_on_429
|
||||
async def get_group_subadmins(self, groupid: str) -> List[str]:
|
||||
"""
|
||||
Get subadmins of a group.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
|
||||
Returns:
|
||||
List of usernames who are subadmins of the group
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}/subadmins",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# The API returns data as a list or dict depending on results
|
||||
subadmins_data = data["ocs"]["data"]
|
||||
if isinstance(subadmins_data, list):
|
||||
return subadmins_data
|
||||
return []
|
||||
|
||||
@retry_on_429
|
||||
async def update_group_displayname(self, groupid: str, displayname: str) -> None:
|
||||
"""
|
||||
Update a group's display name.
|
||||
|
||||
Args:
|
||||
groupid: The group ID
|
||||
displayname: The new display name
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.put(
|
||||
f"/ocs/v2.php/cloud/groups/{groupid}",
|
||||
data={"key": "displayname", "value": displayname},
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Updated group {groupid} displayname to: {displayname}")
|
||||
@@ -1,208 +0,0 @@
|
||||
"""Nextcloud OCS Sharing API client for file/folder sharing operations."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from .base import BaseNextcloudClient, retry_on_429
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SharingClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud OCS Sharing API operations."""
|
||||
|
||||
@retry_on_429
|
||||
async def create_share(
|
||||
self,
|
||||
path: str,
|
||||
share_with: str,
|
||||
share_type: int = 0,
|
||||
permissions: int = 1,
|
||||
) -> dict[str, Any]:
|
||||
"""Create a share for a file or folder.
|
||||
|
||||
Args:
|
||||
path: Path to file/folder to share (relative to user's files)
|
||||
share_with: Username (for user share) or group name (for group share)
|
||||
share_type: Share type (0=user, 1=group, 3=public link)
|
||||
permissions: Share permissions:
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common combinations: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
Share data including share ID
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.post(
|
||||
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
data={
|
||||
"path": path,
|
||||
"shareType": share_type,
|
||||
"shareWith": share_with,
|
||||
"permissions": permissions,
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
# OCS API v2 uses HTTP-style status codes (200 for success)
|
||||
# OCS API v1 used custom codes (100 for success)
|
||||
ocs_status = data["ocs"]["meta"]["statuscode"]
|
||||
if ocs_status not in (100, 200):
|
||||
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
|
||||
raise RuntimeError(f"OCS API error (code {ocs_status}): {ocs_message}")
|
||||
|
||||
share_data = data["ocs"]["data"]
|
||||
|
||||
# Handle case where data might be an empty list on error
|
||||
if not share_data or (isinstance(share_data, list) and len(share_data) == 0):
|
||||
ocs_message = data["ocs"]["meta"].get("message", "Unknown error")
|
||||
raise RuntimeError(
|
||||
f"Share creation failed: {ocs_message} (status {ocs_status})"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Created share {share_data['id']}: {path} -> {share_with} "
|
||||
f"(type={share_type}, permissions={permissions})"
|
||||
)
|
||||
return share_data
|
||||
|
||||
@retry_on_429
|
||||
async def delete_share(self, share_id: int) -> None:
|
||||
"""Delete a share by its ID.
|
||||
|
||||
Args:
|
||||
share_id: The share ID to delete
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.delete(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
logger.info(f"Deleted share {share_id}")
|
||||
|
||||
@retry_on_429
|
||||
async def get_share(self, share_id: int) -> dict[str, Any]:
|
||||
"""Get information about a specific share.
|
||||
|
||||
Args:
|
||||
share_id: The share ID
|
||||
|
||||
Returns:
|
||||
Share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
response = await self._client.get(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
share_data = data["ocs"]["data"]
|
||||
# The API returns a list with a single share, extract the first element
|
||||
if isinstance(share_data, list) and len(share_data) > 0:
|
||||
return share_data[0]
|
||||
return share_data
|
||||
|
||||
@retry_on_429
|
||||
async def list_shares(
|
||||
self, path: str | None = None, shared_with_me: bool = False
|
||||
) -> list[dict[str, Any]]:
|
||||
"""List shares.
|
||||
|
||||
Args:
|
||||
path: Optional path to filter shares for a specific file/folder
|
||||
shared_with_me: If True, list shares shared with the current user
|
||||
|
||||
Returns:
|
||||
List of share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
params = {}
|
||||
if path:
|
||||
params["path"] = path
|
||||
if shared_with_me:
|
||||
params["shared_with_me"] = "true"
|
||||
|
||||
response = await self._client.get(
|
||||
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
|
||||
params=params,
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {data['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
# Handle both single share and list of shares
|
||||
shares_data = data["ocs"]["data"]
|
||||
if isinstance(shares_data, dict):
|
||||
return [shares_data]
|
||||
return shares_data if shares_data else []
|
||||
|
||||
@retry_on_429
|
||||
async def update_share(
|
||||
self, share_id: int, permissions: int | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""Update a share's permissions.
|
||||
|
||||
Args:
|
||||
share_id: The share ID to update
|
||||
permissions: New permissions value (see create_share for values)
|
||||
|
||||
Returns:
|
||||
Updated share data
|
||||
|
||||
Raises:
|
||||
HTTPStatusError: If the request fails
|
||||
"""
|
||||
data = {}
|
||||
if permissions is not None:
|
||||
data["permissions"] = permissions
|
||||
|
||||
response = await self._client.put(
|
||||
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
|
||||
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
|
||||
data=data,
|
||||
)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
if result["ocs"]["meta"]["statuscode"] not in (100, 200):
|
||||
raise RuntimeError(
|
||||
f"OCS API error: {result['ocs']['meta'].get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
logger.info(f"Updated share {share_id}")
|
||||
return result["ocs"]["data"]
|
||||
@@ -1,222 +0,0 @@
|
||||
from typing import List, Optional, Dict
|
||||
from nextcloud_mcp_server.client.base import BaseNextcloudClient
|
||||
from nextcloud_mcp_server.models.users import UserDetails
|
||||
|
||||
|
||||
class UsersClient(BaseNextcloudClient):
|
||||
"""Client for Nextcloud User API operations."""
|
||||
|
||||
def _get_user_headers(
|
||||
self, additional_headers: Optional[Dict[str, str]] = None
|
||||
) -> Dict[str, str]:
|
||||
"""Get standard headers required for User API calls."""
|
||||
headers = {"OCS-APIRequest": "true", "Accept": "application/json"}
|
||||
if additional_headers:
|
||||
headers.update(additional_headers)
|
||||
return headers
|
||||
|
||||
async def create_user(
|
||||
self,
|
||||
userid: str,
|
||||
password: Optional[str] = None,
|
||||
display_name: Optional[str] = None,
|
||||
email: Optional[str] = None,
|
||||
groups: Optional[List[str]] = None,
|
||||
subadmin_groups: Optional[List[str]] = None,
|
||||
quota: Optional[str] = None,
|
||||
language: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Create a new user on the Nextcloud server.
|
||||
"""
|
||||
data = {"userid": userid}
|
||||
if password is not None:
|
||||
data["password"] = password
|
||||
if display_name is not None:
|
||||
data["displayName"] = display_name
|
||||
if email is not None:
|
||||
data["email"] = email
|
||||
if groups is not None:
|
||||
for i, group in enumerate(groups):
|
||||
data[f"groups[{i}]"] = group
|
||||
if subadmin_groups is not None:
|
||||
for i, group in enumerate(subadmin_groups):
|
||||
data[f"subadmin[{i}]"] = group
|
||||
if quota is not None:
|
||||
data["quota"] = quota
|
||||
if language is not None:
|
||||
data["language"] = language
|
||||
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST", "/ocs/v2.php/cloud/users", data=data, headers=headers
|
||||
)
|
||||
|
||||
async def search_users(
|
||||
self,
|
||||
search: Optional[str] = None,
|
||||
limit: Optional[int] = None,
|
||||
offset: Optional[int] = None,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Retrieves a list of users from the Nextcloud server.
|
||||
"""
|
||||
params = {}
|
||||
if search is not None:
|
||||
params["search"] = search
|
||||
if limit is not None:
|
||||
params["limit"] = limit
|
||||
if offset is not None:
|
||||
params["offset"] = offset
|
||||
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", "/ocs/v2.php/cloud/users", params=params, headers=headers
|
||||
)
|
||||
# The v2 API returns JSON with users as a direct list under data.users
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data.get("users", [])
|
||||
|
||||
async def get_user_details(self, userid: str) -> UserDetails:
|
||||
"""
|
||||
Retrieves information about a single user.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
|
||||
)
|
||||
return UserDetails(**response.json()["ocs"]["data"])
|
||||
|
||||
async def update_user_field(self, userid: str, key: str, value: str) -> None:
|
||||
"""
|
||||
Edits attributes related to a user.
|
||||
"""
|
||||
data = {"key": key, "value": value}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}", data=data, headers=headers
|
||||
)
|
||||
|
||||
async def get_editable_user_fields(self) -> List[str]:
|
||||
"""
|
||||
Gets the list of editable data fields for a user.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", "/ocs/v2.php/cloud/user/fields", headers=headers
|
||||
)
|
||||
# The v2 API returns data as a direct list
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
async def disable_user(self, userid: str) -> None:
|
||||
"""
|
||||
Disables a user on the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}/disable", headers=headers
|
||||
)
|
||||
|
||||
async def enable_user(self, userid: str) -> None:
|
||||
"""
|
||||
Enables a user on the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"PUT", f"/ocs/v2.php/cloud/users/{userid}/enable", headers=headers
|
||||
)
|
||||
|
||||
async def delete_user(self, userid: str) -> None:
|
||||
"""
|
||||
Deletes a user from the Nextcloud server.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE", f"/ocs/v2.php/cloud/users/{userid}", headers=headers
|
||||
)
|
||||
|
||||
async def get_user_groups(self, userid: str) -> List[str]:
|
||||
"""
|
||||
Retrieves a list of groups the specified user is a member of.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}/groups", headers=headers
|
||||
)
|
||||
# The v2 API returns groups as a direct list under data.groups
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data.get("groups", [])
|
||||
|
||||
async def add_user_to_group(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Adds the specified user to the specified group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/groups",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def remove_user_from_group(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Removes the specified user from the specified group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/groups",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def promote_user_to_subadmin(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Makes a user the subadmin of a group.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def demote_user_from_subadmin(self, userid: str, groupid: str) -> None:
|
||||
"""
|
||||
Removes the subadmin rights for the user specified from the group specified.
|
||||
"""
|
||||
data = {"groupid": groupid}
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"DELETE",
|
||||
f"/ocs/v2.php/cloud/users/{userid}/subadmins",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def get_user_subadmin_groups(self, userid: str) -> List[str]:
|
||||
"""
|
||||
Returns the groups in which the user is a subadmin.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
response = await self._make_request(
|
||||
"GET", f"/ocs/v2.php/cloud/users/{userid}/subadmins", headers=headers
|
||||
)
|
||||
# The v2 API returns data as a direct list
|
||||
data = response.json()["ocs"]["data"]
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
async def resend_welcome_email(self, userid: str) -> None:
|
||||
"""
|
||||
Triggers the welcome email for this user again.
|
||||
"""
|
||||
headers = self._get_user_headers()
|
||||
await self._make_request(
|
||||
"POST", f"/ocs/v2.php/cloud/users/{userid}/welcome", headers=headers
|
||||
)
|
||||
@@ -1,40 +0,0 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
"""Model for creating a new user."""
|
||||
|
||||
userid: str
|
||||
password: Optional[str] = None
|
||||
displayName: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
groups: Optional[List[str]] = Field(default_factory=list)
|
||||
subadmin: Optional[List[str]] = Field(default_factory=list)
|
||||
quota: Optional[str] = None
|
||||
language: Optional[str] = None
|
||||
|
||||
|
||||
class UserDetails(BaseModel):
|
||||
"""Model for retrieving detailed user information."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
enabled: bool
|
||||
id: str
|
||||
quota: Union[str, Dict[str, Any]] # Can be string or quota object
|
||||
email: Optional[str] = None # Can be null
|
||||
displayname: str = Field(
|
||||
alias="display-name"
|
||||
) # Handle both displayname and display-name
|
||||
phone: Optional[str] = None
|
||||
address: Optional[str] = None
|
||||
website: Optional[str] = None
|
||||
twitter: Optional[str] = None
|
||||
groups: Optional[List[str]] = Field(default_factory=list)
|
||||
|
||||
|
||||
class Group(BaseModel):
|
||||
"""Model for a user group."""
|
||||
|
||||
id: str
|
||||
@@ -2,7 +2,6 @@ from .calendar import configure_calendar_tools
|
||||
from .contacts import configure_contacts_tools
|
||||
from .deck import configure_deck_tools
|
||||
from .notes import configure_notes_tools
|
||||
from .sharing import configure_sharing_tools
|
||||
from .tables import configure_tables_tools
|
||||
from .webdav import configure_webdav_tools
|
||||
|
||||
@@ -11,7 +10,6 @@ __all__ = [
|
||||
"configure_contacts_tools",
|
||||
"configure_deck_tools",
|
||||
"configure_notes_tools",
|
||||
"configure_sharing_tools",
|
||||
"configure_tables_tools",
|
||||
"configure_webdav_tools",
|
||||
]
|
||||
|
||||
@@ -1,133 +0,0 @@
|
||||
"""MCP tools for Nextcloud file/folder sharing operations."""
|
||||
|
||||
import json
|
||||
|
||||
from nextcloud_mcp_server.context import get_client
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
|
||||
|
||||
def configure_sharing_tools(mcp: FastMCP):
|
||||
"""Configure sharing-related MCP tools.
|
||||
|
||||
Args:
|
||||
mcp: FastMCP server instance
|
||||
"""
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_create(
|
||||
path: str,
|
||||
share_with: str,
|
||||
ctx: Context,
|
||||
share_type: int = 0,
|
||||
permissions: int = 1,
|
||||
) -> str:
|
||||
"""Create a share for a file or folder in Nextcloud.
|
||||
|
||||
Share a file or folder with another user or group. The authenticated user
|
||||
must own the file/folder being shared.
|
||||
|
||||
Args:
|
||||
path: Path to file/folder to share (relative to your files, e.g., "/document.txt")
|
||||
share_with: Username (for user share) or group name (for group share)
|
||||
share_type: Share type - 0 for user (default), 1 for group, 3 for public link
|
||||
permissions: Share permissions (default: 1 for read-only):
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
JSON string with share information including share ID
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.create_share(
|
||||
path=path,
|
||||
share_with=share_with,
|
||||
share_type=share_type,
|
||||
permissions=permissions,
|
||||
)
|
||||
return json.dumps(share_data, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_delete(share_id: int, ctx: Context) -> str:
|
||||
"""Delete a share by its ID.
|
||||
|
||||
Remove a share that you created. You must be the owner of the share.
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share to delete
|
||||
|
||||
Returns:
|
||||
JSON string confirming deletion
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
await client.sharing.delete_share(share_id)
|
||||
return json.dumps(
|
||||
{"success": True, "message": f"Share {share_id} deleted"}, indent=2
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_get(share_id: int, ctx: Context) -> str:
|
||||
"""Get information about a specific share.
|
||||
|
||||
Retrieve details about a share by its ID. You must have access to the share
|
||||
(either as owner or recipient).
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share
|
||||
|
||||
Returns:
|
||||
JSON string with share information
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.get_share(share_id)
|
||||
return json.dumps(share_data, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_list(
|
||||
ctx: Context, path: str | None = None, shared_with_me: bool = False
|
||||
) -> str:
|
||||
"""List shares created by you or shared with you.
|
||||
|
||||
Args:
|
||||
path: Optional path to filter shares for a specific file/folder
|
||||
shared_with_me: If True, list shares that others shared with you.
|
||||
If False (default), list shares you created.
|
||||
|
||||
Returns:
|
||||
JSON string with list of shares
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
shares = await client.sharing.list_shares(
|
||||
path=path, shared_with_me=shared_with_me
|
||||
)
|
||||
return json.dumps(shares, indent=2)
|
||||
|
||||
@mcp.tool()
|
||||
async def nc_share_update(share_id: int, permissions: int, ctx: Context) -> str:
|
||||
"""Update the permissions of an existing share.
|
||||
|
||||
Modify the permissions for a share you created. You must be the owner.
|
||||
|
||||
Args:
|
||||
share_id: The ID of the share to update
|
||||
permissions: New permissions value:
|
||||
- 1 = read
|
||||
- 2 = update
|
||||
- 4 = create
|
||||
- 8 = delete
|
||||
- 16 = share
|
||||
- 31 = all permissions
|
||||
Common: 1 (read-only), 3 (read+update), 15 (read+update+create+delete)
|
||||
|
||||
Returns:
|
||||
JSON string with updated share information
|
||||
"""
|
||||
client = get_client(ctx)
|
||||
share_data = await client.sharing.update_share(
|
||||
share_id=share_id, permissions=permissions
|
||||
)
|
||||
return json.dumps(share_data, indent=2)
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "nextcloud-mcp-server"
|
||||
version = "0.14.0"
|
||||
version = "0.13.0"
|
||||
description = ""
|
||||
authors = [
|
||||
{name = "Chris Coutinho",email = "chris@coutinho.io"}
|
||||
|
||||
@@ -1,172 +0,0 @@
|
||||
"""Integration tests for Nextcloud Sharing API client."""
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_and_delete_share(nc_client):
|
||||
"""Test creating and deleting a file share."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_share_file.txt"
|
||||
file_content = b"Test file for sharing"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user, # Share with test user
|
||||
share_type=0, # User share
|
||||
permissions=1, # Read-only
|
||||
)
|
||||
|
||||
assert share_data is not None
|
||||
assert "id" in share_data
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created share: {share_id}")
|
||||
|
||||
# Get share info
|
||||
share_info = await nc_client.sharing.get_share(share_id)
|
||||
assert share_info["id"] == share_id
|
||||
assert share_info["path"] == file_path
|
||||
assert share_info["permissions"] == 1
|
||||
|
||||
# List shares
|
||||
shares = await nc_client.sharing.list_shares(path=file_path)
|
||||
assert len(shares) > 0
|
||||
assert any(s["id"] == share_id for s in shares)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
logger.info(f"Deleted share: {share_id}")
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_share_permissions(nc_client):
|
||||
"""Test updating share permissions."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_share_update.txt"
|
||||
file_content = b"Test file for permission updates"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share with read-only permissions
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user,
|
||||
share_type=0,
|
||||
permissions=1, # Read-only
|
||||
)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# Update to read+write permissions
|
||||
updated_share = await nc_client.sharing.update_share(
|
||||
share_id=share_id,
|
||||
permissions=3, # Read + Write
|
||||
)
|
||||
|
||||
assert updated_share["id"] == share_id
|
||||
assert updated_share["permissions"] == 3
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_shares(nc_client):
|
||||
"""Test listing all shares."""
|
||||
# Create a test user to share with
|
||||
test_user = "testuser3"
|
||||
try:
|
||||
await nc_client.users.create_user(
|
||||
userid=test_user, password="SecureP@ssw0rd!2024TestUser"
|
||||
)
|
||||
except Exception:
|
||||
pass # User might already exist
|
||||
|
||||
# Create a test file
|
||||
file_path = "/test_list_shares.txt"
|
||||
file_content = b"Test file for listing shares"
|
||||
|
||||
await nc_client.webdav.write_file(file_path, file_content)
|
||||
|
||||
share_id = None
|
||||
try:
|
||||
# Create a share
|
||||
share_data = await nc_client.sharing.create_share(
|
||||
path=file_path,
|
||||
share_with=test_user,
|
||||
share_type=0,
|
||||
permissions=1,
|
||||
)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# List all shares
|
||||
all_shares = await nc_client.sharing.list_shares()
|
||||
assert len(all_shares) > 0
|
||||
|
||||
# List shares for specific file
|
||||
file_shares = await nc_client.sharing.list_shares(path=file_path)
|
||||
assert len(file_shares) > 0
|
||||
assert any(s["id"] == share_id for s in file_shares)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
await nc_client.sharing.delete_share(share_id)
|
||||
|
||||
await nc_client.webdav.delete_resource(file_path)
|
||||
|
||||
# Cleanup test user
|
||||
try:
|
||||
await nc_client.users.delete_user(test_user)
|
||||
except Exception:
|
||||
pass
|
||||
+221
-712
File diff suppressed because it is too large
Load Diff
@@ -1,569 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from mcp import ClientSession
|
||||
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
# Stack MCP Tools Tests
|
||||
async def test_deck_stack_mcp_tools(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
|
||||
):
|
||||
"""Test complete deck stack operations via MCP tools."""
|
||||
board_id = temporary_board["id"]
|
||||
stack_title = f"MCP Test Stack {uuid.uuid4().hex[:8]}"
|
||||
stack_order = 1
|
||||
|
||||
# 1. Create stack via MCP tool
|
||||
logger.info(f"Creating stack via MCP: {stack_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
{"board_id": board_id, "title": stack_title, "order": stack_order},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP stack creation failed: {create_result.content}"
|
||||
)
|
||||
created_stack_response = json.loads(create_result.content[0].text)
|
||||
stack_id = created_stack_response["id"]
|
||||
assert created_stack_response["title"] == stack_title
|
||||
assert created_stack_response["order"] == stack_order
|
||||
logger.info(f"Stack created via MCP with ID: {stack_id}")
|
||||
|
||||
try:
|
||||
# 2. Get stack via MCP resource
|
||||
logger.info(f"Getting stack via MCP resource: {stack_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_stack_response = json.loads(get_result.contents[0].text)
|
||||
assert get_stack_response["title"] == stack_title
|
||||
logger.info("Stack retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update stack via MCP tool
|
||||
updated_title = f"Updated {stack_title}"
|
||||
updated_order = 2
|
||||
logger.info(f"Updating stack via MCP tool: {stack_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_stack",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": updated_title,
|
||||
"order": updated_order,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP stack update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Stack updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_stack = await nc_client.deck.get_stack(board_id, stack_id)
|
||||
assert updated_stack.title == updated_title
|
||||
assert updated_stack.order == updated_order
|
||||
logger.info("Stack update verified via direct client")
|
||||
|
||||
# 5. List stacks via MCP resource
|
||||
logger.info("Listing stacks via MCP resource")
|
||||
list_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks"
|
||||
)
|
||||
|
||||
assert len(list_result.contents) == 1, "Expected exactly one content item"
|
||||
stacks_data = json.loads(list_result.contents[0].text)
|
||||
assert isinstance(stacks_data, list)
|
||||
|
||||
# Verify our stack is in the list
|
||||
stack_ids = [stack["id"] for stack in stacks_data]
|
||||
assert stack_id in stack_ids, "Updated stack not found in list"
|
||||
logger.info(f"Stack {stack_id} found in stacks list")
|
||||
|
||||
# 6. Read stack via MCP resource
|
||||
logger.info(f"Reading stack via MCP resource: {stack_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
|
||||
)
|
||||
read_stack_data = json.loads(read_result.contents[0].text)
|
||||
assert read_stack_data["title"] == updated_title
|
||||
logger.info("Stack read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_stack(board_id, stack_id)
|
||||
logger.info(f"Cleaned up stack ID: {stack_id}")
|
||||
|
||||
|
||||
# Card MCP Tools Tests
|
||||
async def test_deck_card_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_stack: tuple,
|
||||
):
|
||||
"""Test complete deck card operations via MCP tools."""
|
||||
board_data, stack_data = temporary_board_with_stack
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_title = f"MCP Test Card {uuid.uuid4().hex[:8]}"
|
||||
card_description = f"Test description for {card_title}"
|
||||
|
||||
# 1. Create card via MCP tool
|
||||
logger.info(f"Creating card via MCP: {card_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": card_title,
|
||||
"description": card_description,
|
||||
"type": "plain",
|
||||
"order": 1,
|
||||
},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP card creation failed: {create_result.content}"
|
||||
)
|
||||
created_card_response = json.loads(create_result.content[0].text)
|
||||
card_id = created_card_response["id"]
|
||||
assert created_card_response["title"] == card_title
|
||||
assert created_card_response["description"] == card_description
|
||||
logger.info(f"Card created via MCP with ID: {card_id}")
|
||||
|
||||
try:
|
||||
# 2. Get card via MCP resource
|
||||
logger.info(f"Getting card via MCP resource: {card_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_card_response = json.loads(get_result.contents[0].text)
|
||||
assert get_card_response["title"] == card_title
|
||||
logger.info("Card retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update card via MCP tool
|
||||
updated_title = f"Updated {card_title}"
|
||||
updated_description = f"Updated description for {card_title}"
|
||||
logger.info(f"Updating card via MCP tool: {card_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"title": updated_title,
|
||||
"description": updated_description,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP card update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Card updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
assert updated_card.title == updated_title
|
||||
assert updated_card.description == updated_description
|
||||
logger.info("Card update verified via direct client")
|
||||
|
||||
# 5. Archive/unarchive card via MCP tools
|
||||
logger.info(f"Archiving card via MCP tool: {card_id}")
|
||||
archive_result = await nc_mcp_client.call_tool(
|
||||
"deck_archive_card",
|
||||
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
|
||||
)
|
||||
|
||||
assert archive_result.isError is False, (
|
||||
f"MCP card archive failed: {archive_result.content}"
|
||||
)
|
||||
logger.info("Card archived via MCP tool successfully")
|
||||
|
||||
logger.info(f"Unarchiving card via MCP tool: {card_id}")
|
||||
unarchive_result = await nc_mcp_client.call_tool(
|
||||
"deck_unarchive_card",
|
||||
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
|
||||
)
|
||||
|
||||
assert unarchive_result.isError is False, (
|
||||
f"MCP card unarchive failed: {unarchive_result.content}"
|
||||
)
|
||||
logger.info("Card unarchived via MCP tool successfully")
|
||||
|
||||
# 6. Move card to different position via MCP tool
|
||||
logger.info(f"Reordering card via MCP tool: {card_id}")
|
||||
reorder_result = await nc_mcp_client.call_tool(
|
||||
"deck_reorder_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"order": 10,
|
||||
"target_stack_id": stack_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert reorder_result.isError is False, (
|
||||
f"MCP card reorder failed: {reorder_result.content}"
|
||||
)
|
||||
logger.info("Card reordered via MCP tool successfully")
|
||||
|
||||
# 7. Read card via MCP resource
|
||||
logger.info(f"Reading card via MCP resource: {card_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
|
||||
)
|
||||
read_card_data = json.loads(read_result.contents[0].text)
|
||||
assert read_card_data["title"] == updated_title
|
||||
logger.info("Card read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_card(board_id, stack_id, card_id)
|
||||
logger.info(f"Cleaned up card ID: {card_id}")
|
||||
|
||||
|
||||
# Label MCP Tools Tests
|
||||
async def test_deck_label_mcp_tools(
|
||||
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
|
||||
):
|
||||
"""Test complete deck label operations via MCP tools."""
|
||||
board_id = temporary_board["id"]
|
||||
label_title = f"MCP Test Label {uuid.uuid4().hex[:8]}"
|
||||
label_color = "FF0000" # Red
|
||||
|
||||
# 1. Create label via MCP tool
|
||||
logger.info(f"Creating label via MCP: {label_title}")
|
||||
create_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_label",
|
||||
{"board_id": board_id, "title": label_title, "color": label_color},
|
||||
)
|
||||
|
||||
assert create_result.isError is False, (
|
||||
f"MCP label creation failed: {create_result.content}"
|
||||
)
|
||||
created_label_response = json.loads(create_result.content[0].text)
|
||||
label_id = created_label_response["id"]
|
||||
assert created_label_response["title"] == label_title
|
||||
assert created_label_response["color"] == label_color
|
||||
logger.info(f"Label created via MCP with ID: {label_id}")
|
||||
|
||||
try:
|
||||
# 2. Get label via MCP resource
|
||||
logger.info(f"Getting label via MCP resource: {label_id}")
|
||||
get_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels/{label_id}"
|
||||
)
|
||||
|
||||
assert len(get_result.contents) == 1, "Expected exactly one content item"
|
||||
get_label_response = json.loads(get_result.contents[0].text)
|
||||
assert get_label_response["title"] == label_title
|
||||
logger.info("Label retrieved via MCP resource successfully")
|
||||
|
||||
# 3. Update label via MCP tool
|
||||
updated_title = f"Updated {label_title}"
|
||||
updated_color = "00FF00" # Green
|
||||
logger.info(f"Updating label via MCP tool: {label_id}")
|
||||
update_result = await nc_mcp_client.call_tool(
|
||||
"deck_update_label",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"label_id": label_id,
|
||||
"title": updated_title,
|
||||
"color": updated_color,
|
||||
},
|
||||
)
|
||||
|
||||
assert update_result.isError is False, (
|
||||
f"MCP label update failed: {update_result.content}"
|
||||
)
|
||||
logger.info("Label updated via MCP tool successfully")
|
||||
|
||||
# 4. Verify update via direct client
|
||||
updated_label = await nc_client.deck.get_label(board_id, label_id)
|
||||
assert updated_label.title == updated_title
|
||||
assert updated_label.color == updated_color
|
||||
logger.info("Label update verified via direct client")
|
||||
|
||||
# 5. Read label via MCP resource
|
||||
logger.info(f"Reading label via MCP resource: {label_id}")
|
||||
read_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels/{label_id}"
|
||||
)
|
||||
read_label_data = json.loads(read_result.contents[0].text)
|
||||
assert read_label_data["title"] == updated_title
|
||||
logger.info("Label read via MCP resource successfully")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_label(board_id, label_id)
|
||||
logger.info(f"Cleaned up label ID: {label_id}")
|
||||
|
||||
|
||||
# Label-Card Assignment Tests
|
||||
async def test_deck_card_label_assignment_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_card: tuple,
|
||||
):
|
||||
"""Test card-label assignment operations via MCP tools."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_id = card_data["id"]
|
||||
|
||||
# Create a label for assignment
|
||||
label = await nc_client.deck.create_label(
|
||||
board_id, "Assignment Test Label", "0000FF"
|
||||
)
|
||||
label_id = label.id
|
||||
|
||||
try:
|
||||
# 1. Assign label to card via MCP tool
|
||||
logger.info(f"Assigning label {label_id} to card {card_id} via MCP")
|
||||
assign_result = await nc_mcp_client.call_tool(
|
||||
"deck_assign_label_to_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"label_id": label_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert assign_result.isError is False, (
|
||||
f"MCP label assignment failed: {assign_result.content}"
|
||||
)
|
||||
logger.info("Label assigned to card via MCP tool successfully")
|
||||
|
||||
# 2. Verify assignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.labels:
|
||||
label_ids = [label.id for label in card.labels]
|
||||
assert label_id in label_ids, "Label not found in card labels"
|
||||
logger.info("Label assignment verified via direct client")
|
||||
|
||||
# 3. Remove label from card via MCP tool
|
||||
logger.info(f"Removing label {label_id} from card {card_id} via MCP")
|
||||
remove_result = await nc_mcp_client.call_tool(
|
||||
"deck_remove_label_from_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"label_id": label_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert remove_result.isError is False, (
|
||||
f"MCP label removal failed: {remove_result.content}"
|
||||
)
|
||||
logger.info("Label removed from card via MCP tool successfully")
|
||||
|
||||
# 4. Verify removal via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.labels:
|
||||
label_ids = [label.id for label in card.labels]
|
||||
assert label_id not in label_ids, (
|
||||
"Label still found in card labels after removal"
|
||||
)
|
||||
logger.info("Label removal verified via direct client")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
await nc_client.deck.delete_label(board_id, label_id)
|
||||
logger.info(f"Cleaned up label ID: {label_id}")
|
||||
|
||||
|
||||
# User Assignment Tests
|
||||
async def test_deck_card_user_assignment_mcp_tools(
|
||||
nc_mcp_client: ClientSession,
|
||||
nc_client: NextcloudClient,
|
||||
temporary_board_with_card: tuple,
|
||||
):
|
||||
"""Test card-user assignment operations via MCP tools."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
card_id = card_data["id"]
|
||||
|
||||
# Use the current user ID (admin in most test environments)
|
||||
user_id = "admin"
|
||||
|
||||
# 1. Assign user to card via MCP tool
|
||||
logger.info(f"Assigning user {user_id} to card {card_id} via MCP")
|
||||
assign_result = await nc_mcp_client.call_tool(
|
||||
"deck_assign_user_to_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert assign_result.isError is False, (
|
||||
f"MCP user assignment failed: {assign_result.content}"
|
||||
)
|
||||
logger.info("User assigned to card via MCP tool successfully")
|
||||
|
||||
# 2. Verify assignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.assignedUsers:
|
||||
user_ids = []
|
||||
for user in card.assignedUsers:
|
||||
if hasattr(user, "participant"):
|
||||
# It's a DeckAssignedUser with participant
|
||||
user_ids.append(user.participant.uid)
|
||||
elif hasattr(user, "uid"):
|
||||
# It's a direct DeckUser
|
||||
user_ids.append(user.uid)
|
||||
assert user_id in user_ids, "User not found in card assigned users"
|
||||
logger.info("User assignment verified via direct client")
|
||||
|
||||
# 3. Unassign user from card via MCP tool
|
||||
logger.info(f"Unassigning user {user_id} from card {card_id} via MCP")
|
||||
unassign_result = await nc_mcp_client.call_tool(
|
||||
"deck_unassign_user_from_card",
|
||||
{
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"card_id": card_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
|
||||
assert unassign_result.isError is False, (
|
||||
f"MCP user unassignment failed: {unassign_result.content}"
|
||||
)
|
||||
logger.info("User unassigned from card via MCP tool successfully")
|
||||
|
||||
# 4. Verify unassignment via direct client
|
||||
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
|
||||
if card.assignedUsers:
|
||||
user_ids = []
|
||||
for user in card.assignedUsers:
|
||||
if hasattr(user, "participant"):
|
||||
# It's a DeckAssignedUser with participant
|
||||
user_ids.append(user.participant.uid)
|
||||
elif hasattr(user, "uid"):
|
||||
# It's a direct DeckUser
|
||||
user_ids.append(user.uid)
|
||||
assert user_id not in user_ids, (
|
||||
"User still found in card assigned users after removal"
|
||||
)
|
||||
logger.info("User unassignment verified via direct client")
|
||||
|
||||
|
||||
# Error handling tests
|
||||
async def test_deck_mcp_tools_error_handling(nc_mcp_client: ClientSession):
|
||||
"""Test error handling for deck MCP tools with invalid parameters."""
|
||||
non_existent_id = 999999999
|
||||
|
||||
# Test stack operations with non-existent board
|
||||
stack_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
{"board_id": non_existent_id, "title": "Should Fail", "order": 1},
|
||||
)
|
||||
assert stack_result.isError is True, (
|
||||
"Expected error for stack creation on non-existent board"
|
||||
)
|
||||
|
||||
# Test card operations with non-existent IDs
|
||||
card_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
{
|
||||
"board_id": non_existent_id,
|
||||
"stack_id": non_existent_id,
|
||||
"title": "Should Fail",
|
||||
"type": "plain",
|
||||
},
|
||||
)
|
||||
assert card_result.isError is True, (
|
||||
"Expected error for card creation with non-existent IDs"
|
||||
)
|
||||
|
||||
# Test label operations with non-existent board
|
||||
label_result = await nc_mcp_client.call_tool(
|
||||
"deck_create_label",
|
||||
{"board_id": non_existent_id, "title": "Should Fail", "color": "FF0000"},
|
||||
)
|
||||
assert label_result.isError is True, (
|
||||
"Expected error for label creation on non-existent board"
|
||||
)
|
||||
|
||||
logger.info("Error handling tests passed for deck MCP tools")
|
||||
|
||||
|
||||
# Resource template tests
|
||||
async def test_deck_mcp_resource_templates(nc_mcp_client: ClientSession):
|
||||
"""Test deck MCP resource templates are properly registered."""
|
||||
templates = await nc_mcp_client.list_resource_templates()
|
||||
template_uris = [template.uriTemplate for template in templates.resourceTemplates]
|
||||
|
||||
expected_templates = [
|
||||
"nc://Deck/boards/{board_id}/stacks/{stack_id}",
|
||||
"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}",
|
||||
"nc://Deck/boards/{board_id}/labels/{label_id}",
|
||||
]
|
||||
|
||||
for expected_template in expected_templates:
|
||||
assert expected_template in template_uris, (
|
||||
f"Expected template '{expected_template}' not found"
|
||||
)
|
||||
logger.info(f"Found expected deck resource template: {expected_template}")
|
||||
|
||||
|
||||
# Listing resource tests
|
||||
async def test_deck_mcp_listing_resources(
|
||||
nc_mcp_client: ClientSession, temporary_board_with_card: tuple
|
||||
):
|
||||
"""Test deck MCP listing resources for stacks and cards."""
|
||||
board_data, stack_data, card_data = temporary_board_with_card
|
||||
board_id = board_data["id"]
|
||||
stack_id = stack_data["id"]
|
||||
|
||||
# 1. Test listing stacks resource
|
||||
logger.info(f"Reading stacks list via MCP resource for board {board_id}")
|
||||
stacks_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks"
|
||||
)
|
||||
stacks_resource_data = json.loads(stacks_resource_result.contents[0].text)
|
||||
assert isinstance(stacks_resource_data, list)
|
||||
|
||||
# Verify our stack is in the resource list
|
||||
stack_ids = [stack["id"] for stack in stacks_resource_data]
|
||||
assert stack_id in stack_ids, "Stack not found in stacks resource list"
|
||||
logger.info("Stack found in stacks resource list")
|
||||
|
||||
# 2. Test listing cards resource
|
||||
logger.info(f"Reading cards list via MCP resource for stack {stack_id}")
|
||||
cards_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards"
|
||||
)
|
||||
cards_resource_data = json.loads(cards_resource_result.contents[0].text)
|
||||
assert isinstance(cards_resource_data, list)
|
||||
|
||||
# Verify our card is in the resource list
|
||||
card_ids = [card["id"] for card in cards_resource_data]
|
||||
assert card_data["id"] in card_ids, "Card not found in cards resource list"
|
||||
logger.info("Card found in cards resource list")
|
||||
|
||||
# 3. Test listing labels resource
|
||||
logger.info(f"Reading labels list via MCP resource for board {board_id}")
|
||||
labels_resource_result = await nc_mcp_client.read_resource(
|
||||
f"nc://Deck/boards/{board_id}/labels"
|
||||
)
|
||||
labels_resource_data = json.loads(labels_resource_result.contents[0].text)
|
||||
assert isinstance(labels_resource_data, list)
|
||||
logger.info("Labels resource read successfully")
|
||||
@@ -1,358 +0,0 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud Deck board permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud Deck board ACL permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
async def add_board_acl(nc_client, board_id: int, user: str, permission_type: int = 0):
|
||||
"""
|
||||
Helper to add ACL entry to a Deck board.
|
||||
|
||||
Args:
|
||||
nc_client: Admin NextcloudClient
|
||||
board_id: Board ID
|
||||
user: Username to grant access
|
||||
permission_type: 0=view, 1=edit, 2=manage
|
||||
|
||||
Returns:
|
||||
ACL entry ID
|
||||
"""
|
||||
acl = await nc_client.deck.add_acl_rule(
|
||||
board_id=board_id,
|
||||
type=0, # 0 = user, 1 = group
|
||||
participant=user,
|
||||
permission_edit=permission_type >= 1,
|
||||
permission_share=permission_type >= 2,
|
||||
permission_manage=permission_type >= 2,
|
||||
)
|
||||
logger.info(f"Added ACL for board {board_id}: {user} (type={permission_type})")
|
||||
return acl.id
|
||||
|
||||
|
||||
async def delete_board_acl(nc_client, board_id: int, acl_id: int):
|
||||
"""Helper to delete a board ACL entry."""
|
||||
await nc_client.deck.delete_acl_rule(board_id, acl_id)
|
||||
logger.info(f"Deleted ACL {acl_id} from board {board_id}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_view_permissions(
|
||||
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect view permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice
|
||||
2. Admin adds bob to board with view-only permissions
|
||||
3. Bob can view the board via MCP tools
|
||||
4. Diana cannot access the board (no ACL entry)
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - View Test", "FF0000"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
bob_acl_id = None
|
||||
|
||||
try:
|
||||
# Add bob to board with view-only permission
|
||||
logger.info("Adding bob to board with view permission...")
|
||||
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
|
||||
|
||||
# Test: Bob can view the board via MCP
|
||||
logger.info("Bob attempting to list boards via MCP...")
|
||||
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Bob can see {len(response_data)} boards: {board_ids}")
|
||||
|
||||
# Bob should see the shared board
|
||||
if board_id in board_ids:
|
||||
logger.info(f"Bob can see shared board {board_id}")
|
||||
else:
|
||||
logger.warning(f"Bob cannot see shared board {board_id}")
|
||||
else:
|
||||
logger.warning(f"Bob could not list boards: {result.content}")
|
||||
|
||||
# Test: Diana cannot see the board
|
||||
logger.info("Diana attempting to list boards via MCP...")
|
||||
result = await diana_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Diana can see {len(response_data)} boards")
|
||||
|
||||
# Diana should NOT see the board
|
||||
assert board_id not in board_ids, "Diana should not see board without ACL"
|
||||
logger.info("Diana correctly cannot see board without ACL")
|
||||
else:
|
||||
logger.warning(f"Diana could not list boards: {result.content}")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if bob_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, bob_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_edit_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect edit permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice with a stack
|
||||
2. Admin adds charlie with edit permission
|
||||
3. Admin adds bob with view-only permission
|
||||
4. Charlie can create cards via MCP tools
|
||||
5. Bob cannot create cards
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - Edit Test", "00FF00"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
# Create a stack in the board
|
||||
logger.info("Creating stack in board...")
|
||||
stack = await nc_client.deck.create_stack(board_id, "Test Stack", 1)
|
||||
stack_id = stack.id
|
||||
|
||||
charlie_acl_id = None
|
||||
bob_acl_id = None
|
||||
|
||||
try:
|
||||
# Add charlie with edit permission
|
||||
logger.info("Adding charlie to board with edit permission...")
|
||||
charlie_acl_id = await add_board_acl(
|
||||
nc_client, board_id, "charlie", permission_type=1
|
||||
)
|
||||
|
||||
# Add bob with view-only permission
|
||||
logger.info("Adding bob to board with view permission...")
|
||||
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
|
||||
|
||||
# Test: Charlie can create a card
|
||||
logger.info("Charlie attempting to create card via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
arguments={
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": "Charlie's Card",
|
||||
"description": "Created by Charlie with edit permission",
|
||||
},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
card_id = response_data.get("id")
|
||||
logger.info(f"Charlie successfully created card {card_id}")
|
||||
|
||||
# Cleanup the card
|
||||
await nc_client.deck.delete_card(board_id, stack_id, card_id)
|
||||
else:
|
||||
logger.warning(f"Charlie could not create card: {result.content}")
|
||||
|
||||
# Test: Bob attempts to create a card (should fail)
|
||||
logger.info("Bob attempting to create card via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"deck_create_card",
|
||||
arguments={
|
||||
"board_id": board_id,
|
||||
"stack_id": stack_id,
|
||||
"title": "Bob's Card",
|
||||
"description": "Bob trying to create a card",
|
||||
},
|
||||
)
|
||||
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied card creation (view-only)")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded in creating card")
|
||||
# Cleanup if bob somehow created a card
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if "id" in response_data:
|
||||
await nc_client.deck.delete_card(
|
||||
board_id, stack_id, response_data["id"]
|
||||
)
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if charlie_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, charlie_acl_id)
|
||||
if bob_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, bob_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_board_manage_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that Deck boards respect manage permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice
|
||||
2. Admin adds charlie with manage permission
|
||||
3. Charlie can create stacks and modify board settings
|
||||
"""
|
||||
# Create a board as alice
|
||||
logger.info("Creating Deck board as alice...")
|
||||
board = await nc_client.deck.create_board(
|
||||
"Alice's Shared Board - Manage Test", "0000FF"
|
||||
)
|
||||
board_id = board.id
|
||||
|
||||
charlie_acl_id = None
|
||||
|
||||
try:
|
||||
# Add charlie with manage permission
|
||||
logger.info("Adding charlie to board with manage permission...")
|
||||
charlie_acl_id = await add_board_acl(
|
||||
nc_client, board_id, "charlie", permission_type=2
|
||||
)
|
||||
|
||||
# Test: Charlie can create a stack
|
||||
logger.info("Charlie attempting to create stack via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_create_stack",
|
||||
arguments={"board_id": board_id, "title": "Charlie's Stack", "order": 1},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
stack_id = response_data.get("id")
|
||||
logger.info(f"Charlie successfully created stack {stack_id}")
|
||||
|
||||
# Cleanup the stack
|
||||
await nc_client.deck.delete_stack(board_id, stack_id)
|
||||
else:
|
||||
logger.warning(f"Charlie could not create stack: {result.content}")
|
||||
|
||||
# Test: Charlie can delete a stack (manage permission)
|
||||
logger.info("Charlie attempting to delete stack via MCP...")
|
||||
# First create a temporary stack to delete
|
||||
temp_stack = await nc_client.deck.create_stack(
|
||||
board_id, "Temp Stack for Deletion", 99
|
||||
)
|
||||
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"deck_delete_stack",
|
||||
arguments={"board_id": board_id, "stack_id": temp_stack.id},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully deleted stack")
|
||||
else:
|
||||
logger.warning(f"Charlie could not delete stack: {result.content}")
|
||||
# Cleanup if deletion via MCP failed
|
||||
try:
|
||||
await nc_client.deck.delete_stack(board_id, temp_stack.id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if charlie_acl_id:
|
||||
await delete_board_acl(nc_client, board_id, charlie_acl_id)
|
||||
logger.info(f"Deleting board {board_id}")
|
||||
await nc_client.deck.delete_board(board_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deck_user_isolation(nc_client, alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that users can only see their own boards when not shared.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a board as alice (not shared)
|
||||
2. Admin creates a board as bob (not shared)
|
||||
3. Alice can only see her own board
|
||||
4. Bob can only see his own board
|
||||
"""
|
||||
# Create alice's board
|
||||
logger.info("Creating alice's private board...")
|
||||
alice_board = await nc_client.deck.create_board("Alice's Private Board", "FF00FF")
|
||||
alice_board_id = alice_board.id
|
||||
|
||||
# Create bob's board
|
||||
logger.info("Creating bob's private board...")
|
||||
bob_board = await nc_client.deck.create_board("Bob's Private Board", "00FFFF")
|
||||
bob_board_id = bob_board.id
|
||||
|
||||
try:
|
||||
# Test: Alice lists boards
|
||||
logger.info("Alice listing boards via MCP...")
|
||||
result = await alice_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Alice can see boards: {board_ids}")
|
||||
|
||||
# Alice should NOT see Bob's board
|
||||
assert bob_board_id not in board_ids, (
|
||||
"Alice should not see Bob's private board"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Alice could not list boards: {result.content}")
|
||||
|
||||
# Test: Bob lists boards
|
||||
logger.info("Bob listing boards via MCP...")
|
||||
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
# The response is directly a list of boards
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
board_ids = [b["id"] for b in response_data]
|
||||
logger.info(f"Bob can see boards: {board_ids}")
|
||||
|
||||
# Bob should NOT see Alice's board
|
||||
assert alice_board_id not in board_ids, (
|
||||
"Bob should not see Alice's private board"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Bob could not list boards: {result.content}")
|
||||
|
||||
logger.info("User isolation test passed: users can only see their own boards")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info("Cleaning up test boards...")
|
||||
await nc_client.deck.delete_board(alice_board_id)
|
||||
await nc_client.deck.delete_board(bob_board_id)
|
||||
@@ -1,425 +0,0 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud WebDAV file permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud file sharing permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
|
||||
All operations (file creation, sharing, access) are performed through MCP tools
|
||||
to ensure the MCP server properly supports multi-user scenarios.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_share_read_permissions(
|
||||
alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared files respect read permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a file via MCP
|
||||
2. Alice shares the file with Bob (read-only) via MCP
|
||||
3. Bob can read the file via MCP tools
|
||||
4. Diana cannot access the file (no share)
|
||||
"""
|
||||
file_path = "/alice_shared_file_read.txt"
|
||||
file_content = "This file is shared with Bob for reading only."
|
||||
|
||||
# Alice creates a file
|
||||
logger.info(f"Alice creating file: {file_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the file with bob (read-only, permissions=1)
|
||||
logger.info("Alice sharing file with bob (read-only)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created share {share_id}")
|
||||
|
||||
# Test: Bob reads the file via MCP
|
||||
logger.info("Bob attempting to read file via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
# Bob should be able to read the shared file
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
logger.info(
|
||||
f"Bob successfully read file: {response_data.get('content', '')[:50]}..."
|
||||
)
|
||||
assert "content" in response_data
|
||||
assert file_content in response_data["content"]
|
||||
else:
|
||||
logger.warning(f"Bob could not read file: {result.content}")
|
||||
# This might fail if the share path is different for bob
|
||||
|
||||
# Test: Diana attempts to read the file
|
||||
logger.info("Diana attempting to read file via MCP...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
# Diana should NOT be able to read (no share)
|
||||
if result.isError:
|
||||
logger.info("Diana correctly denied access to unshared file")
|
||||
else:
|
||||
logger.warning("Diana unexpectedly could read unshared file")
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes the share and file
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
logger.info(f"Alice deleting file {file_path}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_share_write_permissions(
|
||||
alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared files respect write permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a file via MCP
|
||||
2. Alice shares the file with Charlie (edit permission) via MCP
|
||||
3. Alice shares the file with Bob (read-only) via MCP
|
||||
4. Charlie can edit the file via MCP tools
|
||||
5. Bob cannot edit the file
|
||||
"""
|
||||
file_path = "/alice_shared_file_write.txt"
|
||||
file_content = "This file is shared with Charlie for editing."
|
||||
|
||||
logger.info(f"Alice creating file: {file_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
charlie_share_id = None
|
||||
bob_share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares with Charlie (read+write, permissions=3)
|
||||
logger.info("Alice sharing file with Charlie (edit permission)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "charlie",
|
||||
"share_type": 0,
|
||||
"permissions": 3,
|
||||
},
|
||||
)
|
||||
assert not result.isError, (
|
||||
f"Alice failed to share with Charlie: {result.content}"
|
||||
)
|
||||
charlie_share_data = json.loads(result.content[0].text)
|
||||
charlie_share_id = charlie_share_data["id"]
|
||||
logger.info(f"Created share {charlie_share_id} for Charlie")
|
||||
|
||||
# Alice shares with Bob (read-only, permissions=1)
|
||||
logger.info("Alice sharing file with Bob (read-only)...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": file_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to share with Bob: {result.content}"
|
||||
bob_share_data = json.loads(result.content[0].text)
|
||||
bob_share_id = bob_share_data["id"]
|
||||
logger.info(f"Created share {bob_share_id} for Bob")
|
||||
|
||||
# Test: Charlie can write to the file
|
||||
logger.info("Charlie attempting to write to file via MCP...")
|
||||
updated_content = f"{file_content}\nCharlie added this line."
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": updated_content},
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully wrote to file")
|
||||
else:
|
||||
logger.warning(f"Charlie could not write to file: {result.content}")
|
||||
|
||||
# Test: Bob attempts to write (should fail)
|
||||
logger.info("Bob attempting to write to file via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_path, "content": "Bob tries to overwrite this."},
|
||||
)
|
||||
|
||||
# Bob should be denied
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied write access")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)")
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes shares and file
|
||||
if charlie_share_id:
|
||||
logger.info(f"Alice deleting Charlie's share {charlie_share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": charlie_share_id}
|
||||
)
|
||||
if bob_share_id:
|
||||
logger.info(f"Alice deleting Bob's share {bob_share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": bob_share_id}
|
||||
)
|
||||
logger.info(f"Alice deleting file {file_path}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": file_path}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_list_permissions(alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that file listing respects share permissions.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates her private file via MCP
|
||||
2. Bob creates his private file via MCP
|
||||
3. Alice creates a file and shares it with Bob via MCP
|
||||
4. Alice can list her own files + shared files
|
||||
5. Bob can list his own files + shared files from Alice
|
||||
"""
|
||||
alice_file = "/alice_private_file.txt"
|
||||
bob_file = "/bob_private_file.txt"
|
||||
shared_file = "/alice_shared_with_bob.txt"
|
||||
|
||||
# Alice creates her private file
|
||||
logger.info(f"Alice creating private file: {alice_file}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": alice_file, "content": "Alice's private file"},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
# Bob creates his private file
|
||||
logger.info(f"Bob creating private file: {bob_file}")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": bob_file, "content": "Bob's private file"},
|
||||
)
|
||||
assert not result.isError, f"Bob failed to create file: {result.content}"
|
||||
|
||||
# Alice creates a shared file
|
||||
logger.info(f"Alice creating shared file: {shared_file}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": shared_file, "content": "Shared file content"},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create shared file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the file with Bob
|
||||
logger.info("Alice sharing file with Bob...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": shared_file,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
|
||||
# Test: Alice lists files in root
|
||||
logger.info("Alice listing files via MCP...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": "/"}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
file_names = [f["name"] for f in response_data]
|
||||
logger.info(f"Alice can see files: {file_names}")
|
||||
|
||||
# Alice should see her own files
|
||||
# Note: Exact assertions depend on test isolation
|
||||
else:
|
||||
logger.warning(f"Alice could not list files: {result.content}")
|
||||
|
||||
# Test: Bob lists files in root
|
||||
logger.info("Bob listing files via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": "/"}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
file_names = [f["name"] for f in response_data]
|
||||
logger.info(f"Bob can see files: {file_names}")
|
||||
|
||||
# Bob should see his own file, but not Alice's private file
|
||||
# Bob may see shared files in his shared folder or via different path
|
||||
else:
|
||||
logger.warning(f"Bob could not list files: {result.content}")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
|
||||
logger.info("Cleaning up Alice's files...")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": alice_file}
|
||||
)
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": shared_file}
|
||||
)
|
||||
|
||||
logger.info("Cleaning up Bob's files...")
|
||||
await bob_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": bob_file}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_folder_share_permissions(alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that folder sharing works correctly.
|
||||
|
||||
Scenario:
|
||||
1. Alice creates a folder via MCP
|
||||
2. Alice creates files in the folder via MCP
|
||||
3. Alice shares the folder with Bob via MCP
|
||||
4. Bob can access files in the shared folder via MCP
|
||||
"""
|
||||
folder_path = "/alice_shared_folder"
|
||||
file_in_folder = f"{folder_path}/document.txt"
|
||||
file_content = "This is a document in Alice's shared folder"
|
||||
|
||||
# Alice creates folder
|
||||
logger.info(f"Alice creating folder: {folder_path}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_create_directory", arguments={"path": folder_path}
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create folder: {result.content}"
|
||||
|
||||
# Alice creates file in folder
|
||||
logger.info(f"Alice creating file in folder: {file_in_folder}")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_webdav_write_file",
|
||||
arguments={"path": file_in_folder, "content": file_content},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create file: {result.content}"
|
||||
|
||||
share_id = None
|
||||
|
||||
try:
|
||||
# Alice shares the folder with Bob
|
||||
logger.info("Alice sharing folder with Bob...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_share_create",
|
||||
arguments={
|
||||
"path": folder_path,
|
||||
"share_with": "bob",
|
||||
"share_type": 0,
|
||||
"permissions": 1,
|
||||
},
|
||||
)
|
||||
assert not result.isError, f"Alice failed to create share: {result.content}"
|
||||
share_data = json.loads(result.content[0].text)
|
||||
share_id = share_data["id"]
|
||||
logger.info(f"Created folder share {share_id}")
|
||||
|
||||
# Test: Bob lists the shared folder
|
||||
logger.info("Bob attempting to list shared folder via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_list_directory", arguments={"path": folder_path}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
if not isinstance(response_data, list):
|
||||
response_data = [response_data] if response_data else []
|
||||
logger.info(f"Bob can see {len(response_data)} files in shared folder")
|
||||
|
||||
# Bob should see the file in the shared folder
|
||||
file_names = [f["name"] for f in response_data]
|
||||
assert "document.txt" in file_names, (
|
||||
"Bob should see the file in shared folder"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Bob could not list shared folder: {result.content}")
|
||||
|
||||
# Test: Bob reads the file in the shared folder
|
||||
logger.info("Bob attempting to read file in shared folder via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_webdav_read_file", arguments={"path": file_in_folder}
|
||||
)
|
||||
|
||||
if not result.isError:
|
||||
response_data = json.loads(result.content[0].text)
|
||||
logger.info("Bob successfully read file in shared folder")
|
||||
assert "content" in response_data
|
||||
assert file_content in response_data["content"]
|
||||
else:
|
||||
logger.warning(
|
||||
f"Bob could not read file in shared folder: {result.content}"
|
||||
)
|
||||
|
||||
finally:
|
||||
# Cleanup - Alice deletes the share and folder
|
||||
if share_id:
|
||||
logger.info(f"Alice deleting share {share_id}")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_share_delete", arguments={"share_id": share_id}
|
||||
)
|
||||
|
||||
logger.info("Alice cleaning up test folder...")
|
||||
await alice_mcp_client.call_tool(
|
||||
"nc_webdav_delete_resource", arguments={"path": folder_path}
|
||||
)
|
||||
@@ -1,260 +0,0 @@
|
||||
"""
|
||||
Multi-user OAuth tests for Nextcloud Notes permissions.
|
||||
|
||||
Tests verify that the MCP server respects Nextcloud Notes sharing permissions
|
||||
when accessed via OAuth authentication with different users.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notes_share_read_permissions(
|
||||
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared notes respect read permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice
|
||||
2. Admin shares the note with bob (read-only)
|
||||
3. Bob can read the note via MCP tools
|
||||
4. Diana cannot access the note (no share)
|
||||
"""
|
||||
# Create a note as alice (using admin client to set up data)
|
||||
note_title = "Alice's Shared Note - Read Test"
|
||||
note_content = "This note is shared with Bob for reading only."
|
||||
note_category = "SharedNotes"
|
||||
|
||||
logger.info("Creating note as alice...")
|
||||
created_note = await nc_client.notes.create_note(
|
||||
title=note_title, content=note_content, category=note_category
|
||||
)
|
||||
note_id = created_note.get("id")
|
||||
|
||||
try:
|
||||
# TODO: Share the note with bob (read-only)
|
||||
# Note: Nextcloud Notes API doesn't have direct sharing endpoints
|
||||
# Sharing is typically done at the folder level via WebDAV
|
||||
# For now, this test documents the expected behavior
|
||||
|
||||
# Test: Bob searches for notes via MCP
|
||||
logger.info("Bob searching for notes via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
|
||||
)
|
||||
|
||||
assert result.isError is False, f"Bob's search failed: {result.content}"
|
||||
response_data = json.loads(result.content[0].text)
|
||||
|
||||
# Bob should see the shared note in search results
|
||||
# (assuming proper share setup)
|
||||
assert "results" in response_data
|
||||
logger.info(f"Bob found {len(response_data['results'])} notes")
|
||||
|
||||
# Test: Diana searches for the same note
|
||||
logger.info("Diana searching for notes via MCP...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
|
||||
# Diana should NOT see the note (no share)
|
||||
assert "results" in response_data
|
||||
shared_note_ids = [
|
||||
n["id"] for n in response_data["results"] if n["id"] == note_id
|
||||
]
|
||||
assert len(shared_note_ids) == 0, "Diana should not see unshared note"
|
||||
logger.info("Diana correctly cannot see unshared note")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info(f"Cleaning up note {note_id}")
|
||||
await nc_client.notes.delete_note(note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_notes_share_write_permissions(
|
||||
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
|
||||
):
|
||||
"""
|
||||
Test that shared notes respect write permissions.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice
|
||||
2. Admin shares the note with charlie (edit permission)
|
||||
3. Admin shares the note with bob (read-only)
|
||||
4. Charlie can edit the note via MCP tools
|
||||
5. Bob cannot edit the note
|
||||
"""
|
||||
# Create a note as alice
|
||||
note_title = "Alice's Shared Note - Write Test"
|
||||
note_content = "This note is shared with Charlie for editing."
|
||||
note_category = "SharedNotes"
|
||||
|
||||
logger.info("Creating note as alice...")
|
||||
created_note = await nc_client.notes.create_note(
|
||||
title=note_title, content=note_content, category=note_category
|
||||
)
|
||||
note_id = created_note.get("id")
|
||||
|
||||
try:
|
||||
# TODO: Share the note with charlie (edit permission) and bob (read-only)
|
||||
# Note: Nextcloud Notes sharing is folder-based
|
||||
|
||||
# Test: Charlie can append content to the note
|
||||
logger.info("Charlie attempting to append content via MCP...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_notes_append_content",
|
||||
arguments={
|
||||
"note_id": note_id,
|
||||
"content": "\n\nCharlie added this content.",
|
||||
},
|
||||
)
|
||||
|
||||
# If sharing is properly configured, Charlie should succeed
|
||||
# Without proper sharing setup, this will fail
|
||||
logger.info(f"Charlie's append result: isError={result.isError}")
|
||||
if not result.isError:
|
||||
logger.info("Charlie successfully appended content (shares configured)")
|
||||
else:
|
||||
logger.warning("Charlie could not append (shares not yet configured)")
|
||||
|
||||
# Test: Bob attempts to append content (should fail)
|
||||
logger.info("Bob attempting to append content via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_append_content",
|
||||
arguments={"note_id": note_id, "content": "\n\nBob tried to add this."},
|
||||
)
|
||||
|
||||
# Bob should fail (read-only access)
|
||||
logger.info(f"Bob's append result: isError={result.isError}")
|
||||
if result.isError:
|
||||
logger.info("Bob correctly denied write access")
|
||||
else:
|
||||
logger.warning("Bob unexpectedly succeeded (permissions issue?)")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info(f"Cleaning up note {note_id}")
|
||||
await nc_client.notes.delete_note(note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_isolation_notes(nc_client, alice_mcp_client, bob_mcp_client):
|
||||
"""
|
||||
Test that users can only see their own notes when not shared.
|
||||
|
||||
Scenario:
|
||||
1. Admin creates a note as alice (not shared)
|
||||
2. Admin creates a note as bob (not shared)
|
||||
3. Alice can only see her own note
|
||||
4. Bob can only see his own note
|
||||
"""
|
||||
# Create alice's note
|
||||
logger.info("Creating alice's private note...")
|
||||
alice_note = await nc_client.notes.create_note(
|
||||
title="Alice's Private Note",
|
||||
content="This is Alice's private content.",
|
||||
category="AlicePrivate",
|
||||
)
|
||||
alice_note_id = alice_note.get("id")
|
||||
|
||||
# Create bob's note
|
||||
logger.info("Creating bob's private note...")
|
||||
bob_note = await nc_client.notes.create_note(
|
||||
title="Bob's Private Note",
|
||||
content="This is Bob's private content.",
|
||||
category="BobPrivate",
|
||||
)
|
||||
bob_note_id = bob_note.get("id")
|
||||
|
||||
try:
|
||||
# Test: Alice searches all notes
|
||||
logger.info("Alice searching all notes via MCP...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
alice_notes = response_data.get("results", [])
|
||||
alice_note_ids = [n["id"] for n in alice_notes]
|
||||
|
||||
logger.info(f"Alice can see {len(alice_notes)} notes")
|
||||
# Alice should NOT see Bob's note
|
||||
assert bob_note_id not in alice_note_ids, (
|
||||
"Alice should not see Bob's private note"
|
||||
)
|
||||
|
||||
# Test: Bob searches all notes
|
||||
logger.info("Bob searching all notes via MCP...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
|
||||
assert result.isError is False
|
||||
response_data = json.loads(result.content[0].text)
|
||||
bob_notes = response_data.get("results", [])
|
||||
bob_note_ids = [n["id"] for n in bob_notes]
|
||||
|
||||
logger.info(f"Bob can see {len(bob_notes)} notes")
|
||||
# Bob should NOT see Alice's note
|
||||
assert alice_note_id not in bob_note_ids, (
|
||||
"Bob should not see Alice's private note"
|
||||
)
|
||||
|
||||
logger.info("User isolation test passed: users can only see their own notes")
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
logger.info("Cleaning up test notes...")
|
||||
await nc_client.notes.delete_note(alice_note_id)
|
||||
await nc_client.notes.delete_note(bob_note_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_oauth_mcp_clients_initialized(
|
||||
alice_mcp_client, bob_mcp_client, charlie_mcp_client, diana_mcp_client
|
||||
):
|
||||
"""
|
||||
Smoke test to verify all OAuth MCP clients are properly initialized.
|
||||
"""
|
||||
logger.info("Testing alice_mcp_client initialization...")
|
||||
result = await alice_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Alice MCP client failed: {result.content}"
|
||||
logger.info("Alice MCP client working")
|
||||
|
||||
logger.info("Testing bob_mcp_client initialization...")
|
||||
result = await bob_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Bob MCP client failed: {result.content}"
|
||||
logger.info("Bob MCP client working")
|
||||
|
||||
logger.info("Testing charlie_mcp_client initialization...")
|
||||
result = await charlie_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Charlie MCP client failed: {result.content}"
|
||||
logger.info("Charlie MCP client working")
|
||||
|
||||
logger.info("Testing diana_mcp_client initialization...")
|
||||
result = await diana_mcp_client.call_tool(
|
||||
"nc_notes_search_notes", arguments={"query": ""}
|
||||
)
|
||||
assert result.isError is False, f"Diana MCP client failed: {result.content}"
|
||||
logger.info("Diana MCP client working")
|
||||
|
||||
logger.info("All OAuth MCP clients successfully initialized!")
|
||||
@@ -1,108 +0,0 @@
|
||||
import pytest
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_and_delete_user(nc_client: NextcloudClient, test_user):
|
||||
"""Test creating a user and verifying deletion (cleanup by fixture)."""
|
||||
user_config = test_user
|
||||
|
||||
# Create user
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Verify user exists
|
||||
users = await nc_client.users.search_users(search=user_config["userid"])
|
||||
assert user_config["userid"] in users
|
||||
|
||||
user_details = await nc_client.users.get_user_details(user_config["userid"])
|
||||
assert user_details.id == user_config["userid"]
|
||||
assert user_details.displayname == user_config["display_name"]
|
||||
assert user_details.email == user_config["email"]
|
||||
|
||||
# Test deletion explicitly as part of test functionality
|
||||
await nc_client.users.delete_user(user_config["userid"])
|
||||
|
||||
# Verify user is deleted
|
||||
users = await nc_client.users.search_users(search=user_config["userid"])
|
||||
assert user_config["userid"] not in users
|
||||
# Note: Fixture cleanup will also try to delete but handle 404 gracefully
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_field(nc_client: NextcloudClient, test_user):
|
||||
"""Test updating user fields."""
|
||||
user_config = test_user
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
new_email = f"new.{user_config['email']}"
|
||||
await nc_client.users.update_user_field(user_config["userid"], "email", new_email)
|
||||
|
||||
user_details = await nc_client.users.get_user_details(user_config["userid"])
|
||||
assert user_details.email == new_email
|
||||
# Fixture will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_groups(nc_client: NextcloudClient, test_user_in_group):
|
||||
"""Test adding and removing users from groups."""
|
||||
user_config, groupid = test_user_in_group
|
||||
userid = user_config["userid"]
|
||||
|
||||
# Verify user is in group
|
||||
groups = await nc_client.users.get_user_groups(userid)
|
||||
assert groupid in groups
|
||||
|
||||
# Remove user from group
|
||||
await nc_client.users.remove_user_from_group(userid, groupid)
|
||||
groups = await nc_client.users.get_user_groups(userid)
|
||||
assert groupid not in groups
|
||||
# Fixtures will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_subadmins(nc_client: NextcloudClient, test_user, test_group):
|
||||
"""Test promoting and demoting subadmins."""
|
||||
user_config = test_user
|
||||
groupid = test_group
|
||||
userid = user_config["userid"]
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Promote to subadmin
|
||||
await nc_client.users.promote_user_to_subadmin(userid, groupid)
|
||||
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
|
||||
assert groupid in subadmin_groups
|
||||
|
||||
# Demote from subadmin
|
||||
await nc_client.users.demote_user_from_subadmin(userid, groupid)
|
||||
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
|
||||
assert groupid not in subadmin_groups
|
||||
# Fixtures will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disable_enable_user(nc_client: NextcloudClient, test_user):
|
||||
"""Test disabling and enabling users."""
|
||||
user_config = test_user
|
||||
userid = user_config["userid"]
|
||||
|
||||
await nc_client.users.create_user(**user_config)
|
||||
|
||||
# Disable user
|
||||
await nc_client.users.disable_user(userid)
|
||||
user_details = await nc_client.users.get_user_details(userid)
|
||||
assert not user_details.enabled
|
||||
|
||||
# Enable user
|
||||
await nc_client.users.enable_user(userid)
|
||||
user_details = await nc_client.users.get_user_details(userid)
|
||||
assert user_details.enabled
|
||||
# Fixture will handle cleanup
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_editable_user_fields(nc_client: NextcloudClient):
|
||||
editable_fields = await nc_client.users.get_editable_user_fields()
|
||||
assert "displayname" in editable_fields
|
||||
assert "email" in editable_fields
|
||||
-674
@@ -1,674 +0,0 @@
|
||||
=========================
|
||||
Instruction set for users
|
||||
=========================
|
||||
|
||||
Add a new user
|
||||
--------------
|
||||
|
||||
Create a new user on the Nextcloud server. Authentication is done by sending a
|
||||
basic HTTP authentication header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: userid - string, the required username for the new user
|
||||
* POST argument: password - string, the password for the new user, leave empty to send welcome mail
|
||||
* POST argument: displayName - string, the display name for the new user
|
||||
* POST argument: email - string, the email for the new user, required if password empty
|
||||
* POST argument: groups - array, the groups for the new user
|
||||
* POST argument: subadmin - array, the groups in which the new user is subadmin
|
||||
* POST argument: quota - string, quota for the new user
|
||||
* POST argument: language - string, language for the new user
|
||||
|
||||
Status codes:
|
||||
|
||||
* 101 - invalid argument
|
||||
* 102 - user already exists
|
||||
* 103 - cannot create sub-admins for admin group
|
||||
* 104 - group does not exist
|
||||
* 105 - insufficient privileges for group
|
||||
* 106 - no group specified (required for sub-admins)
|
||||
* 107 - hint exceptions
|
||||
* 108 - an email address is required, to send a password link to the user.
|
||||
* 109 - sub-admin group does not exist
|
||||
* 110 - required email address was not provided
|
||||
* 111 - could not create non-existing user ID
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
::
|
||||
|
||||
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users -d userid="Frank" -d password="frankspassword" -H "OCS-APIRequest: true"
|
||||
|
||||
* Creates the user ``Frank`` with password ``frankspassword``
|
||||
* optionally groups can be specified by one or more ``groups[]`` query parameters:
|
||||
``URL -d groups[]="admin" -D groups[]="Team1"``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Search/get users
|
||||
----------------
|
||||
|
||||
Retrieves a list of users from the Nextcloud server. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users**
|
||||
|
||||
* HTTP method: GET
|
||||
* url arguments: search - string, optional search string
|
||||
* url arguments: limit - int, optional limit value
|
||||
* url arguments: offset - int, optional offset value
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users?search=Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns list of users matching the search string.
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<users>
|
||||
<element>Frank</element>
|
||||
</users>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Get data of a single user
|
||||
-------------------------
|
||||
|
||||
Retrieves information about a single user. Authentication is done by sending a
|
||||
Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns information on the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<enabled>true</enabled>
|
||||
<id>Frank</id>
|
||||
<quota>0</quota>
|
||||
<email>frank@example.org</email>
|
||||
<displayname>Frank K.</displayname>
|
||||
<display-name>Frank K.</display-name>
|
||||
<phone>0123 / 456 789</phone>
|
||||
<address>Foobar 12, 12345 Town</address>
|
||||
<website>https://nextcloud.com</website>
|
||||
<twitter>Nextcloud</twitter>
|
||||
<groups>
|
||||
<element>group1</element>
|
||||
<element>group2</element>
|
||||
</groups>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Edit data of a single user
|
||||
--------------------------
|
||||
|
||||
Edits attributes related to a user. Users are able to edit email, displayname
|
||||
and password; admins can also edit the quota value. Further restrictions may apply,
|
||||
check the `List of editable data fields`_ endpoint. Authentication
|
||||
is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: PUT
|
||||
* PUT argument: key, the field to edit:
|
||||
|
||||
+ email
|
||||
+ quota
|
||||
+ displayname
|
||||
+ display (**deprecated** use `displayname` instead)
|
||||
+ phone
|
||||
+ address
|
||||
+ website
|
||||
+ twitter
|
||||
+ password
|
||||
|
||||
* PUT argument: value, the new value for the field
|
||||
|
||||
Status codes:
|
||||
|
||||
* 101 - invalid argument
|
||||
* 107 - password policy (hint exception)
|
||||
* 112 - Setting the password is not supported by the users backend
|
||||
* 113 - editing field not allowed / field doesn’t exist
|
||||
|
||||
Examples
|
||||
^^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="email" -d value="franksnewemail@example.org" -H "OCS-APIRequest: true"
|
||||
|
||||
* Updates the email address for the user ``Frank``
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="quota" -d value="100MB" -H "OCS-APIRequest: true"
|
||||
|
||||
* Updates the quota for the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
.. _editable_field_list:
|
||||
|
||||
List of editable data fields
|
||||
----------------------------
|
||||
|
||||
Edits attributes related to a user. Users are able to edit email, displayname
|
||||
and password; admins can also edit the quota value. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/user/fields**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Examples
|
||||
^^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/user/fields -H "OCS-APIRequest: true"
|
||||
|
||||
* Gets the list of fields
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message>OK</message>
|
||||
</meta>
|
||||
<data>
|
||||
<element>displayname</element>
|
||||
<element>email</element>
|
||||
<element>phone</element>
|
||||
<element>address</element>
|
||||
<element>website</element>
|
||||
<element>twitter</element>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
|
||||
Disable a user
|
||||
--------------
|
||||
|
||||
Disables a user on the Nextcloud server so that the user cannot login anymore.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/disable**
|
||||
|
||||
* HTTP method: PUT
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/disable -H "OCS-APIRequest: true"
|
||||
|
||||
* Disables the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Enable a user
|
||||
-------------
|
||||
|
||||
Enables a user on the Nextcloud server so that the user can login again.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/enable**
|
||||
|
||||
* HTTP method: PUT
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/enable -H "OCS-APIRequest: true"
|
||||
|
||||
* Enables the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Delete a user
|
||||
-------------
|
||||
|
||||
Deletes a user from the Nextcloud server. Authentication is done by sending a
|
||||
Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}**
|
||||
|
||||
* HTTP method: DELETE
|
||||
|
||||
Statuscodes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
|
||||
|
||||
* Deletes the user ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Get user's groups
|
||||
-----------------
|
||||
|
||||
Retrieves a list of groups the specified user is a member of. Authentication is
|
||||
done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -H "OCS-APIRequest: true"
|
||||
|
||||
* Retrieves a list of groups of which ``Frank`` is a member
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data>
|
||||
<groups>
|
||||
<element>admin</element>
|
||||
<element>group1</element>
|
||||
</groups>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Add user to group
|
||||
-----------------
|
||||
|
||||
Adds the specified user to the specified group. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: groupid, string - the group to add the user to
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - no group specified
|
||||
* 102 - group does not exist
|
||||
* 103 - user does not exist
|
||||
* 104 - insufficient privileges
|
||||
* 105 - failed to add user to group
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Adds the user ``Frank`` to the group ``newgroup``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Remove user from group
|
||||
----------------------
|
||||
|
||||
Removes the specified user from the specified group. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
|
||||
|
||||
* HTTP method: DELETE
|
||||
* DELETE argument: groupid, string - the group to remove the user from
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - no group specified
|
||||
* 102 - group does not exist
|
||||
* 103 - user does not exist
|
||||
* 104 - insufficient privileges
|
||||
* 105 - failed to remove user from group
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Removes the user ``Frank`` from the group ``newgroup``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Promote user to subadmin
|
||||
------------------------
|
||||
|
||||
Makes a user the subadmin of a group. Authentication is done by sending a Basic
|
||||
HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: POST
|
||||
* POST argument: groupid, string - the group of which to make the user a
|
||||
subadmin
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - group does not exist
|
||||
* 103 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="group" -H "OCS-APIRequest: true"
|
||||
|
||||
* Makes the user ``Frank`` a subadmin of the ``group`` group
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Demote user from subadmin
|
||||
-------------------------
|
||||
|
||||
Removes the subadmin rights for the user specified from the group specified.
|
||||
Authentication is done by sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: DELETE
|
||||
* DELETE argument: groupid, string - the group from which to remove the user's
|
||||
subadmin rights
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - user is not a subadmin of the group / group does not exist
|
||||
* 103 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X DELETE https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="oldgroup" -H "OCS-APIRequest: true"
|
||||
|
||||
* Removes ``Frank's`` subadmin rights from the ``oldgroup`` group
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<statuscode>100</statuscode>
|
||||
<status>ok</status>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
|
||||
Get user's subadmin groups
|
||||
--------------------------
|
||||
|
||||
Returns the groups in which the user is a subadmin. Authentication is done by
|
||||
sending a Basic HTTP Authorization header.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
|
||||
|
||||
* HTTP method: GET
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - user does not exist
|
||||
* 102 - unknown failure
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X GET https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -H "OCS-APIRequest: true"
|
||||
|
||||
* Returns the groups of which ``Frank`` is a subadmin
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data>
|
||||
<element>testgroup</element>
|
||||
</data>
|
||||
</ocs>
|
||||
|
||||
Resend the welcome email
|
||||
------------------------
|
||||
|
||||
The request to this endpoint triggers the welcome email for this user again.
|
||||
|
||||
**Syntax: ocs/v1.php/cloud/users/{userid}/welcome**
|
||||
|
||||
* HTTP method: POST
|
||||
|
||||
Status codes:
|
||||
|
||||
* 100 - successful
|
||||
* 101 - email address not available
|
||||
* 102 - sending email failed
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
::
|
||||
|
||||
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/welcome -H "OCS-APIRequest: true"
|
||||
|
||||
* Sends the welcome email to ``Frank``
|
||||
|
||||
XML output
|
||||
^^^^^^^^^^
|
||||
|
||||
.. code-block:: xml
|
||||
|
||||
<?xml version="1.0"?>
|
||||
<ocs>
|
||||
<meta>
|
||||
<status>ok</status>
|
||||
<statuscode>100</statuscode>
|
||||
<message/>
|
||||
</meta>
|
||||
<data/>
|
||||
</ocs>
|
||||
Reference in New Issue
Block a user