feat(users): Initialize user API client

This commit is contained in:
Chris Coutinho
2025-09-11 09:42:42 +02:00
parent 6fa59621bf
commit 961f23b5ea
6 changed files with 1653 additions and 0 deletions
+2
View File
@@ -18,6 +18,7 @@ from .deck import DeckClient
from .notes import NotesClient
from .tables import TablesClient
from .webdav import WebDAVClient
from .users import UsersClient
logger = logging.getLogger(__name__)
@@ -71,6 +72,7 @@ class NextcloudClient:
self.calendar = CalendarClient(self._client, username)
self.contacts = ContactsClient(self._client, username)
self.deck = DeckClient(self._client, username)
self.users = UsersClient(self._client, username)
# Initialize controllers
self._notes_search = NotesSearchController()
+244
View File
@@ -0,0 +1,244 @@
from typing import List, Optional, Dict
from nextcloud_mcp_server.client.base import BaseNextcloudClient
from nextcloud_mcp_server.models.users import UserDetails
class UsersClient(BaseNextcloudClient):
"""Client for Nextcloud User API operations."""
def _get_user_headers(
self, additional_headers: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
"""Get standard headers required for User API calls."""
headers = {"OCS-APIRequest": "true"}
if additional_headers:
headers.update(additional_headers)
return headers
async def create_user(
self,
userid: str,
password: Optional[str] = None,
display_name: Optional[str] = None,
email: Optional[str] = None,
groups: Optional[List[str]] = None,
subadmin_groups: Optional[List[str]] = None,
quota: Optional[str] = None,
language: Optional[str] = None,
) -> None:
"""
Create a new user on the Nextcloud server.
"""
data = {"userid": userid}
if password is not None:
data["password"] = password
if display_name is not None:
data["displayName"] = display_name
if email is not None:
data["email"] = email
if groups is not None:
for i, group in enumerate(groups):
data[f"groups[{i}]"] = group
if subadmin_groups is not None:
for i, group in enumerate(subadmin_groups):
data[f"subadmin[{i}]"] = group
if quota is not None:
data["quota"] = quota
if language is not None:
data["language"] = language
headers = self._get_user_headers()
await self._make_request(
"POST", "/ocs/v1.php/cloud/users", data=data, headers=headers
)
async def search_users(
self,
search: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List[str]:
"""
Retrieves a list of users from the Nextcloud server.
"""
params = {}
if search is not None:
params["search"] = search
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
headers = self._get_user_headers()
response = await self._make_request(
"GET", "/ocs/v1.php/cloud/users", params=params, headers=headers
)
# The API returns XML, which is parsed into a dict.
# The user IDs are under ocs.data.users.element (can be a list or a single string)
data = response.json()["ocs"]["data"]
if "users" in data and "element" in data["users"]:
elements = data["users"]["element"]
if isinstance(elements, list):
return elements
elif isinstance(elements, str):
return [elements]
return []
async def get_user_details(self, userid: str) -> UserDetails:
"""
Retrieves information about a single user.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v1.php/cloud/users/{userid}", headers=headers
)
return UserDetails(**response.json()["ocs"]["data"])
async def update_user_field(self, userid: str, key: str, value: str) -> None:
"""
Edits attributes related to a user.
"""
data = {"key": key, "value": value}
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v1.php/cloud/users/{userid}", data=data, headers=headers
)
async def get_editable_user_fields(self) -> List[str]:
"""
Gets the list of editable data fields for a user.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", "/ocs/v1.php/cloud/user/fields", headers=headers
)
data = response.json()["ocs"]["data"]
if "element" in data:
elements = data["element"]
if isinstance(elements, list):
return elements
elif isinstance(elements, str):
return [elements]
return []
async def disable_user(self, userid: str) -> None:
"""
Disables a user on the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v1.php/cloud/users/{userid}/disable", headers=headers
)
async def enable_user(self, userid: str) -> None:
"""
Enables a user on the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"PUT", f"/ocs/v1.php/cloud/users/{userid}/enable", headers=headers
)
async def delete_user(self, userid: str) -> None:
"""
Deletes a user from the Nextcloud server.
"""
headers = self._get_user_headers()
await self._make_request(
"DELETE", f"/ocs/v1.php/cloud/users/{userid}", headers=headers
)
async def get_user_groups(self, userid: str) -> List[str]:
"""
Retrieves a list of groups the specified user is a member of.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v1.php/cloud/users/{userid}/groups", headers=headers
)
data = response.json()["ocs"]["data"]
if "groups" in data and "element" in data["groups"]:
elements = data["groups"]["element"]
if isinstance(elements, list):
return elements
elif isinstance(elements, str):
return [elements]
return []
async def add_user_to_group(self, userid: str, groupid: str) -> None:
"""
Adds the specified user to the specified group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"POST",
f"/ocs/v1.php/cloud/users/{userid}/groups",
data=data,
headers=headers,
)
async def remove_user_from_group(self, userid: str, groupid: str) -> None:
"""
Removes the specified user from the specified group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"DELETE",
f"/ocs/v1.php/cloud/users/{userid}/groups",
data=data,
headers=headers,
)
async def promote_user_to_subadmin(self, userid: str, groupid: str) -> None:
"""
Makes a user the subadmin of a group.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"POST",
f"/ocs/v1.php/cloud/users/{userid}/subadmins",
data=data,
headers=headers,
)
async def demote_user_from_subadmin(self, userid: str, groupid: str) -> None:
"""
Removes the subadmin rights for the user specified from the group specified.
"""
data = {"groupid": groupid}
headers = self._get_user_headers()
await self._make_request(
"DELETE",
f"/ocs/v1.php/cloud/users/{userid}/subadmins",
data=data,
headers=headers,
)
async def get_user_subadmin_groups(self, userid: str) -> List[str]:
"""
Returns the groups in which the user is a subadmin.
"""
headers = self._get_user_headers()
response = await self._make_request(
"GET", f"/ocs/v1.php/cloud/users/{userid}/subadmins", headers=headers
)
data = response.json()["ocs"]["data"]
if "element" in data:
elements = data["element"]
if isinstance(elements, list):
return elements
elif isinstance(elements, str):
return [elements]
return []
async def resend_welcome_email(self, userid: str) -> None:
"""
Triggers the welcome email for this user again.
"""
headers = self._get_user_headers()
await self._make_request(
"POST", f"/ocs/v1.php/cloud/users/{userid}/welcome", headers=headers
)
+38
View File
@@ -0,0 +1,38 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class User(BaseModel):
"""Model for creating a new user."""
userid: str
password: Optional[str] = None
displayName: Optional[str] = None
email: Optional[str] = None
groups: Optional[List[str]] = Field(default_factory=list)
subadmin: Optional[List[str]] = Field(default_factory=list)
quota: Optional[str] = None
language: Optional[str] = None
class UserDetails(BaseModel):
"""Model for retrieving detailed user information."""
enabled: bool
id: str
quota: str
email: str
displayname: str = Field(
alias="display-name"
) # Handle both displayname and display-name
phone: Optional[str] = None
address: Optional[str] = None
website: Optional[str] = None
twitter: Optional[str] = None
groups: Optional[List[str]] = Field(default_factory=list)
class Group(BaseModel):
"""Model for a user group."""
id: str
@@ -0,0 +1,569 @@
import json
import logging
import uuid
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.integration
# Stack MCP Tools Tests
async def test_deck_stack_mcp_tools(
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
):
"""Test complete deck stack operations via MCP tools."""
board_id = temporary_board["id"]
stack_title = f"MCP Test Stack {uuid.uuid4().hex[:8]}"
stack_order = 1
# 1. Create stack via MCP tool
logger.info(f"Creating stack via MCP: {stack_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_stack",
{"board_id": board_id, "title": stack_title, "order": stack_order},
)
assert create_result.isError is False, (
f"MCP stack creation failed: {create_result.content}"
)
created_stack_response = json.loads(create_result.content[0].text)
stack_id = created_stack_response["id"]
assert created_stack_response["title"] == stack_title
assert created_stack_response["order"] == stack_order
logger.info(f"Stack created via MCP with ID: {stack_id}")
try:
# 2. Get stack via MCP resource
logger.info(f"Getting stack via MCP resource: {stack_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_stack_response = json.loads(get_result.contents[0].text)
assert get_stack_response["title"] == stack_title
logger.info("Stack retrieved via MCP resource successfully")
# 3. Update stack via MCP tool
updated_title = f"Updated {stack_title}"
updated_order = 2
logger.info(f"Updating stack via MCP tool: {stack_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_stack",
{
"board_id": board_id,
"stack_id": stack_id,
"title": updated_title,
"order": updated_order,
},
)
assert update_result.isError is False, (
f"MCP stack update failed: {update_result.content}"
)
logger.info("Stack updated via MCP tool successfully")
# 4. Verify update via direct client
updated_stack = await nc_client.deck.get_stack(board_id, stack_id)
assert updated_stack.title == updated_title
assert updated_stack.order == updated_order
logger.info("Stack update verified via direct client")
# 5. List stacks via MCP resource
logger.info("Listing stacks via MCP resource")
list_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks"
)
assert len(list_result.contents) == 1, "Expected exactly one content item"
stacks_data = json.loads(list_result.contents[0].text)
assert isinstance(stacks_data, list)
# Verify our stack is in the list
stack_ids = [stack["id"] for stack in stacks_data]
assert stack_id in stack_ids, "Updated stack not found in list"
logger.info(f"Stack {stack_id} found in stacks list")
# 6. Read stack via MCP resource
logger.info(f"Reading stack via MCP resource: {stack_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}"
)
read_stack_data = json.loads(read_result.contents[0].text)
assert read_stack_data["title"] == updated_title
logger.info("Stack read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_stack(board_id, stack_id)
logger.info(f"Cleaned up stack ID: {stack_id}")
# Card MCP Tools Tests
async def test_deck_card_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_stack: tuple,
):
"""Test complete deck card operations via MCP tools."""
board_data, stack_data = temporary_board_with_stack
board_id = board_data["id"]
stack_id = stack_data["id"]
card_title = f"MCP Test Card {uuid.uuid4().hex[:8]}"
card_description = f"Test description for {card_title}"
# 1. Create card via MCP tool
logger.info(f"Creating card via MCP: {card_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_card",
{
"board_id": board_id,
"stack_id": stack_id,
"title": card_title,
"description": card_description,
"type": "plain",
"order": 1,
},
)
assert create_result.isError is False, (
f"MCP card creation failed: {create_result.content}"
)
created_card_response = json.loads(create_result.content[0].text)
card_id = created_card_response["id"]
assert created_card_response["title"] == card_title
assert created_card_response["description"] == card_description
logger.info(f"Card created via MCP with ID: {card_id}")
try:
# 2. Get card via MCP resource
logger.info(f"Getting card via MCP resource: {card_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_card_response = json.loads(get_result.contents[0].text)
assert get_card_response["title"] == card_title
logger.info("Card retrieved via MCP resource successfully")
# 3. Update card via MCP tool
updated_title = f"Updated {card_title}"
updated_description = f"Updated description for {card_title}"
logger.info(f"Updating card via MCP tool: {card_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"title": updated_title,
"description": updated_description,
},
)
assert update_result.isError is False, (
f"MCP card update failed: {update_result.content}"
)
logger.info("Card updated via MCP tool successfully")
# 4. Verify update via direct client
updated_card = await nc_client.deck.get_card(board_id, stack_id, card_id)
assert updated_card.title == updated_title
assert updated_card.description == updated_description
logger.info("Card update verified via direct client")
# 5. Archive/unarchive card via MCP tools
logger.info(f"Archiving card via MCP tool: {card_id}")
archive_result = await nc_mcp_client.call_tool(
"deck_archive_card",
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
)
assert archive_result.isError is False, (
f"MCP card archive failed: {archive_result.content}"
)
logger.info("Card archived via MCP tool successfully")
logger.info(f"Unarchiving card via MCP tool: {card_id}")
unarchive_result = await nc_mcp_client.call_tool(
"deck_unarchive_card",
{"board_id": board_id, "stack_id": stack_id, "card_id": card_id},
)
assert unarchive_result.isError is False, (
f"MCP card unarchive failed: {unarchive_result.content}"
)
logger.info("Card unarchived via MCP tool successfully")
# 6. Move card to different position via MCP tool
logger.info(f"Reordering card via MCP tool: {card_id}")
reorder_result = await nc_mcp_client.call_tool(
"deck_reorder_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"order": 10,
"target_stack_id": stack_id,
},
)
assert reorder_result.isError is False, (
f"MCP card reorder failed: {reorder_result.content}"
)
logger.info("Card reordered via MCP tool successfully")
# 7. Read card via MCP resource
logger.info(f"Reading card via MCP resource: {card_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}"
)
read_card_data = json.loads(read_result.contents[0].text)
assert read_card_data["title"] == updated_title
logger.info("Card read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_card(board_id, stack_id, card_id)
logger.info(f"Cleaned up card ID: {card_id}")
# Label MCP Tools Tests
async def test_deck_label_mcp_tools(
nc_mcp_client: ClientSession, nc_client: NextcloudClient, temporary_board: dict
):
"""Test complete deck label operations via MCP tools."""
board_id = temporary_board["id"]
label_title = f"MCP Test Label {uuid.uuid4().hex[:8]}"
label_color = "FF0000" # Red
# 1. Create label via MCP tool
logger.info(f"Creating label via MCP: {label_title}")
create_result = await nc_mcp_client.call_tool(
"deck_create_label",
{"board_id": board_id, "title": label_title, "color": label_color},
)
assert create_result.isError is False, (
f"MCP label creation failed: {create_result.content}"
)
created_label_response = json.loads(create_result.content[0].text)
label_id = created_label_response["id"]
assert created_label_response["title"] == label_title
assert created_label_response["color"] == label_color
logger.info(f"Label created via MCP with ID: {label_id}")
try:
# 2. Get label via MCP resource
logger.info(f"Getting label via MCP resource: {label_id}")
get_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels/{label_id}"
)
assert len(get_result.contents) == 1, "Expected exactly one content item"
get_label_response = json.loads(get_result.contents[0].text)
assert get_label_response["title"] == label_title
logger.info("Label retrieved via MCP resource successfully")
# 3. Update label via MCP tool
updated_title = f"Updated {label_title}"
updated_color = "00FF00" # Green
logger.info(f"Updating label via MCP tool: {label_id}")
update_result = await nc_mcp_client.call_tool(
"deck_update_label",
{
"board_id": board_id,
"label_id": label_id,
"title": updated_title,
"color": updated_color,
},
)
assert update_result.isError is False, (
f"MCP label update failed: {update_result.content}"
)
logger.info("Label updated via MCP tool successfully")
# 4. Verify update via direct client
updated_label = await nc_client.deck.get_label(board_id, label_id)
assert updated_label.title == updated_title
assert updated_label.color == updated_color
logger.info("Label update verified via direct client")
# 5. Read label via MCP resource
logger.info(f"Reading label via MCP resource: {label_id}")
read_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels/{label_id}"
)
read_label_data = json.loads(read_result.contents[0].text)
assert read_label_data["title"] == updated_title
logger.info("Label read via MCP resource successfully")
finally:
# Clean up
await nc_client.deck.delete_label(board_id, label_id)
logger.info(f"Cleaned up label ID: {label_id}")
# Label-Card Assignment Tests
async def test_deck_card_label_assignment_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_card: tuple,
):
"""Test card-label assignment operations via MCP tools."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
card_id = card_data["id"]
# Create a label for assignment
label = await nc_client.deck.create_label(
board_id, "Assignment Test Label", "0000FF"
)
label_id = label.id
try:
# 1. Assign label to card via MCP tool
logger.info(f"Assigning label {label_id} to card {card_id} via MCP")
assign_result = await nc_mcp_client.call_tool(
"deck_assign_label_to_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"label_id": label_id,
},
)
assert assign_result.isError is False, (
f"MCP label assignment failed: {assign_result.content}"
)
logger.info("Label assigned to card via MCP tool successfully")
# 2. Verify assignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.labels:
label_ids = [label.id for label in card.labels]
assert label_id in label_ids, "Label not found in card labels"
logger.info("Label assignment verified via direct client")
# 3. Remove label from card via MCP tool
logger.info(f"Removing label {label_id} from card {card_id} via MCP")
remove_result = await nc_mcp_client.call_tool(
"deck_remove_label_from_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"label_id": label_id,
},
)
assert remove_result.isError is False, (
f"MCP label removal failed: {remove_result.content}"
)
logger.info("Label removed from card via MCP tool successfully")
# 4. Verify removal via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.labels:
label_ids = [label.id for label in card.labels]
assert label_id not in label_ids, (
"Label still found in card labels after removal"
)
logger.info("Label removal verified via direct client")
finally:
# Clean up
await nc_client.deck.delete_label(board_id, label_id)
logger.info(f"Cleaned up label ID: {label_id}")
# User Assignment Tests
async def test_deck_card_user_assignment_mcp_tools(
nc_mcp_client: ClientSession,
nc_client: NextcloudClient,
temporary_board_with_card: tuple,
):
"""Test card-user assignment operations via MCP tools."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
card_id = card_data["id"]
# Use the current user ID (admin in most test environments)
user_id = "admin"
# 1. Assign user to card via MCP tool
logger.info(f"Assigning user {user_id} to card {card_id} via MCP")
assign_result = await nc_mcp_client.call_tool(
"deck_assign_user_to_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"user_id": user_id,
},
)
assert assign_result.isError is False, (
f"MCP user assignment failed: {assign_result.content}"
)
logger.info("User assigned to card via MCP tool successfully")
# 2. Verify assignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.assignedUsers:
user_ids = []
for user in card.assignedUsers:
if hasattr(user, "participant"):
# It's a DeckAssignedUser with participant
user_ids.append(user.participant.uid)
elif hasattr(user, "uid"):
# It's a direct DeckUser
user_ids.append(user.uid)
assert user_id in user_ids, "User not found in card assigned users"
logger.info("User assignment verified via direct client")
# 3. Unassign user from card via MCP tool
logger.info(f"Unassigning user {user_id} from card {card_id} via MCP")
unassign_result = await nc_mcp_client.call_tool(
"deck_unassign_user_from_card",
{
"board_id": board_id,
"stack_id": stack_id,
"card_id": card_id,
"user_id": user_id,
},
)
assert unassign_result.isError is False, (
f"MCP user unassignment failed: {unassign_result.content}"
)
logger.info("User unassigned from card via MCP tool successfully")
# 4. Verify unassignment via direct client
card = await nc_client.deck.get_card(board_id, stack_id, card_id)
if card.assignedUsers:
user_ids = []
for user in card.assignedUsers:
if hasattr(user, "participant"):
# It's a DeckAssignedUser with participant
user_ids.append(user.participant.uid)
elif hasattr(user, "uid"):
# It's a direct DeckUser
user_ids.append(user.uid)
assert user_id not in user_ids, (
"User still found in card assigned users after removal"
)
logger.info("User unassignment verified via direct client")
# Error handling tests
async def test_deck_mcp_tools_error_handling(nc_mcp_client: ClientSession):
"""Test error handling for deck MCP tools with invalid parameters."""
non_existent_id = 999999999
# Test stack operations with non-existent board
stack_result = await nc_mcp_client.call_tool(
"deck_create_stack",
{"board_id": non_existent_id, "title": "Should Fail", "order": 1},
)
assert stack_result.isError is True, (
"Expected error for stack creation on non-existent board"
)
# Test card operations with non-existent IDs
card_result = await nc_mcp_client.call_tool(
"deck_create_card",
{
"board_id": non_existent_id,
"stack_id": non_existent_id,
"title": "Should Fail",
"type": "plain",
},
)
assert card_result.isError is True, (
"Expected error for card creation with non-existent IDs"
)
# Test label operations with non-existent board
label_result = await nc_mcp_client.call_tool(
"deck_create_label",
{"board_id": non_existent_id, "title": "Should Fail", "color": "FF0000"},
)
assert label_result.isError is True, (
"Expected error for label creation on non-existent board"
)
logger.info("Error handling tests passed for deck MCP tools")
# Resource template tests
async def test_deck_mcp_resource_templates(nc_mcp_client: ClientSession):
"""Test deck MCP resource templates are properly registered."""
templates = await nc_mcp_client.list_resource_templates()
template_uris = [template.uriTemplate for template in templates.resourceTemplates]
expected_templates = [
"nc://Deck/boards/{board_id}/stacks/{stack_id}",
"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards/{card_id}",
"nc://Deck/boards/{board_id}/labels/{label_id}",
]
for expected_template in expected_templates:
assert expected_template in template_uris, (
f"Expected template '{expected_template}' not found"
)
logger.info(f"Found expected deck resource template: {expected_template}")
# Listing resource tests
async def test_deck_mcp_listing_resources(
nc_mcp_client: ClientSession, temporary_board_with_card: tuple
):
"""Test deck MCP listing resources for stacks and cards."""
board_data, stack_data, card_data = temporary_board_with_card
board_id = board_data["id"]
stack_id = stack_data["id"]
# 1. Test listing stacks resource
logger.info(f"Reading stacks list via MCP resource for board {board_id}")
stacks_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks"
)
stacks_resource_data = json.loads(stacks_resource_result.contents[0].text)
assert isinstance(stacks_resource_data, list)
# Verify our stack is in the resource list
stack_ids = [stack["id"] for stack in stacks_resource_data]
assert stack_id in stack_ids, "Stack not found in stacks resource list"
logger.info("Stack found in stacks resource list")
# 2. Test listing cards resource
logger.info(f"Reading cards list via MCP resource for stack {stack_id}")
cards_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/stacks/{stack_id}/cards"
)
cards_resource_data = json.loads(cards_resource_result.contents[0].text)
assert isinstance(cards_resource_data, list)
# Verify our card is in the resource list
card_ids = [card["id"] for card in cards_resource_data]
assert card_data["id"] in card_ids, "Card not found in cards resource list"
logger.info("Card found in cards resource list")
# 3. Test listing labels resource
logger.info(f"Reading labels list via MCP resource for board {board_id}")
labels_resource_result = await nc_mcp_client.read_resource(
f"nc://Deck/boards/{board_id}/labels"
)
labels_resource_data = json.loads(labels_resource_result.contents[0].text)
assert isinstance(labels_resource_data, list)
logger.info("Labels resource read successfully")
+126
View File
@@ -0,0 +1,126 @@
import pytest
from nextcloud_mcp_server.client import NextcloudClient
@pytest.mark.asyncio
async def test_create_and_delete_user(nc_client: NextcloudClient):
userid = "testuser1"
password = "testpassword1"
display_name = "Test User One"
email = "test1@example.com"
# Create user
await nc_client.users.create_user(
userid=userid,
password=password,
display_name=display_name,
email=email,
)
# Verify user exists
users = await nc_client.users.search_users(search=userid)
assert userid in users
user_details = await nc_client.users.get_user_details(userid)
assert user_details.id == userid
assert user_details.displayname == display_name
assert user_details.email == email
# Delete user
await nc_client.users.delete_user(userid)
# Verify user is deleted
users = await nc_client.users.search_users(search=userid)
assert userid not in users
@pytest.mark.asyncio
async def test_update_user_field(nc_client: NextcloudClient):
userid = "testuser2"
password = "testpassword2"
display_name = "Test User Two"
email = "test2@example.com"
await nc_client.users.create_user(
userid=userid,
password=password,
display_name=display_name,
email=email,
)
new_email = "new.test2@example.com"
await nc_client.users.update_user_field(userid, "email", new_email)
user_details = await nc_client.users.get_user_details(userid)
assert user_details.email == new_email
await nc_client.users.delete_user(userid)
@pytest.mark.asyncio
async def test_user_groups(nc_client: NextcloudClient):
userid = "testuser3"
password = "testpassword3"
groupid = "testgroup"
await nc_client.users.create_user(userid=userid, password=password)
# Add user to group
await nc_client.users.add_user_to_group(userid, groupid)
groups = await nc_client.users.get_user_groups(userid)
assert groupid in groups
# Remove user from group
await nc_client.users.remove_user_from_group(userid, groupid)
groups = await nc_client.users.get_user_groups(userid)
assert groupid not in groups
await nc_client.users.delete_user(userid)
@pytest.mark.asyncio
async def test_user_subadmins(nc_client: NextcloudClient):
userid = "testuser4"
password = "testpassword4"
groupid = "subadmingroup"
await nc_client.users.create_user(userid=userid, password=password)
# Promote to subadmin
await nc_client.users.promote_user_to_subadmin(userid, groupid)
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
assert groupid in subadmin_groups
# Demote from subadmin
await nc_client.users.demote_user_from_subadmin(userid, groupid)
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
assert groupid not in subadmin_groups
await nc_client.users.delete_user(userid)
@pytest.mark.asyncio
async def test_disable_enable_user(nc_client: NextcloudClient):
userid = "testuser5"
password = "testpassword5"
await nc_client.users.create_user(userid=userid, password=password)
# Disable user
await nc_client.users.disable_user(userid)
user_details = await nc_client.users.get_user_details(userid)
assert not user_details.enabled
# Enable user
await nc_client.users.enable_user(userid)
user_details = await nc_client.users.get_user_details(userid)
assert user_details.enabled
await nc_client.users.delete_user(userid)
@pytest.mark.asyncio
async def test_get_editable_user_fields(nc_client: NextcloudClient):
editable_fields = await nc_client.users.get_editable_user_fields()
assert "displayname" in editable_fields
assert "email" in editable_fields
+674
View File
@@ -0,0 +1,674 @@
=========================
Instruction set for users
=========================
Add a new user
--------------
Create a new user on the Nextcloud server. Authentication is done by sending a
basic HTTP authentication header.
**Syntax: ocs/v1.php/cloud/users**
* HTTP method: POST
* POST argument: userid - string, the required username for the new user
* POST argument: password - string, the password for the new user, leave empty to send welcome mail
* POST argument: displayName - string, the display name for the new user
* POST argument: email - string, the email for the new user, required if password empty
* POST argument: groups - array, the groups for the new user
* POST argument: subadmin - array, the groups in which the new user is subadmin
* POST argument: quota - string, quota for the new user
* POST argument: language - string, language for the new user
Status codes:
* 101 - invalid argument
* 102 - user already exists
* 103 - cannot create sub-admins for admin group
* 104 - group does not exist
* 105 - insufficient privileges for group
* 106 - no group specified (required for sub-admins)
* 107 - hint exceptions
* 108 - an email address is required, to send a password link to the user.
* 109 - sub-admin group does not exist
* 110 - required email address was not provided
* 111 - could not create non-existing user ID
Example
^^^^^^^
::
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users -d userid="Frank" -d password="frankspassword" -H "OCS-APIRequest: true"
* Creates the user ``Frank`` with password ``frankspassword``
* optionally groups can be specified by one or more ``groups[]`` query parameters:
``URL -d groups[]="admin" -D groups[]="Team1"``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Search/get users
----------------
Retrieves a list of users from the Nextcloud server. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users**
* HTTP method: GET
* url arguments: search - string, optional search string
* url arguments: limit - int, optional limit value
* url arguments: offset - int, optional offset value
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users?search=Frank -H "OCS-APIRequest: true"
* Returns list of users matching the search string.
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<users>
<element>Frank</element>
</users>
</data>
</ocs>
Get data of a single user
-------------------------
Retrieves information about a single user. Authentication is done by sending a
Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: GET
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
* Returns information on the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<enabled>true</enabled>
<id>Frank</id>
<quota>0</quota>
<email>frank@example.org</email>
<displayname>Frank K.</displayname>
<display-name>Frank K.</display-name>
<phone>0123 / 456 789</phone>
<address>Foobar 12, 12345 Town</address>
<website>https://nextcloud.com</website>
<twitter>Nextcloud</twitter>
<groups>
<element>group1</element>
<element>group2</element>
</groups>
</data>
</ocs>
Edit data of a single user
--------------------------
Edits attributes related to a user. Users are able to edit email, displayname
and password; admins can also edit the quota value. Further restrictions may apply,
check the `List of editable data fields`_ endpoint. Authentication
is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: PUT
* PUT argument: key, the field to edit:
+ email
+ quota
+ displayname
+ display (**deprecated** use `displayname` instead)
+ phone
+ address
+ website
+ twitter
+ password
* PUT argument: value, the new value for the field
Status codes:
* 101 - invalid argument
* 107 - password policy (hint exception)
* 112 - Setting the password is not supported by the users backend
* 113 - editing field not allowed / field doesnt exist
Examples
^^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="email" -d value="franksnewemail@example.org" -H "OCS-APIRequest: true"
* Updates the email address for the user ``Frank``
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -d key="quota" -d value="100MB" -H "OCS-APIRequest: true"
* Updates the quota for the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
.. _editable_field_list:
List of editable data fields
----------------------------
Edits attributes related to a user. Users are able to edit email, displayname
and password; admins can also edit the quota value. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/user/fields**
* HTTP method: GET
Status codes:
* 100 - successful
Examples
^^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/user/fields -H "OCS-APIRequest: true"
* Gets the list of fields
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message>OK</message>
</meta>
<data>
<element>displayname</element>
<element>email</element>
<element>phone</element>
<element>address</element>
<element>website</element>
<element>twitter</element>
</data>
</ocs>
Disable a user
--------------
Disables a user on the Nextcloud server so that the user cannot login anymore.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/disable**
* HTTP method: PUT
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/disable -H "OCS-APIRequest: true"
* Disables the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Enable a user
-------------
Enables a user on the Nextcloud server so that the user can login again.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/enable**
* HTTP method: PUT
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X PUT http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/enable -H "OCS-APIRequest: true"
* Enables the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>
Delete a user
-------------
Deletes a user from the Nextcloud server. Authentication is done by sending a
Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}**
* HTTP method: DELETE
Statuscodes:
* 100 - successful
* 101 - failure
Example
^^^^^^^
::
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank -H "OCS-APIRequest: true"
* Deletes the user ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Get user's groups
-----------------
Retrieves a list of groups the specified user is a member of. Authentication is
done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: GET
Status codes:
* 100 - successful
Example
^^^^^^^
::
$ curl -X GET http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -H "OCS-APIRequest: true"
* Retrieves a list of groups of which ``Frank`` is a member
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data>
<groups>
<element>admin</element>
<element>group1</element>
</groups>
</data>
</ocs>
Add user to group
-----------------
Adds the specified user to the specified group. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: POST
* POST argument: groupid, string - the group to add the user to
Status codes:
* 100 - successful
* 101 - no group specified
* 102 - group does not exist
* 103 - user does not exist
* 104 - insufficient privileges
* 105 - failed to add user to group
Example
^^^^^^^
::
$ curl -X POST http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
* Adds the user ``Frank`` to the group ``newgroup``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Remove user from group
----------------------
Removes the specified user from the specified group. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/groups**
* HTTP method: DELETE
* DELETE argument: groupid, string - the group to remove the user from
Status codes:
* 100 - successful
* 101 - no group specified
* 102 - group does not exist
* 103 - user does not exist
* 104 - insufficient privileges
* 105 - failed to remove user from group
Example
^^^^^^^
::
$ curl -X DELETE http://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/groups -d groupid="newgroup" -H "OCS-APIRequest: true"
* Removes the user ``Frank`` from the group ``newgroup``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Promote user to subadmin
------------------------
Makes a user the subadmin of a group. Authentication is done by sending a Basic
HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: POST
* POST argument: groupid, string - the group of which to make the user a
subadmin
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - group does not exist
* 103 - unknown failure
Example
^^^^^^^
::
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="group" -H "OCS-APIRequest: true"
* Makes the user ``Frank`` a subadmin of the ``group`` group
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Demote user from subadmin
-------------------------
Removes the subadmin rights for the user specified from the group specified.
Authentication is done by sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: DELETE
* DELETE argument: groupid, string - the group from which to remove the user's
subadmin rights
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - user is not a subadmin of the group / group does not exist
* 103 - unknown failure
Example
^^^^^^^
::
$ curl -X DELETE https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -d groupid="oldgroup" -H "OCS-APIRequest: true"
* Removes ``Frank's`` subadmin rights from the ``oldgroup`` group
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<statuscode>100</statuscode>
<status>ok</status>
</meta>
<data/>
</ocs>
Get user's subadmin groups
--------------------------
Returns the groups in which the user is a subadmin. Authentication is done by
sending a Basic HTTP Authorization header.
**Syntax: ocs/v1.php/cloud/users/{userid}/subadmins**
* HTTP method: GET
Status codes:
* 100 - successful
* 101 - user does not exist
* 102 - unknown failure
Example
^^^^^^^
::
$ curl -X GET https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/subadmins -H "OCS-APIRequest: true"
* Returns the groups of which ``Frank`` is a subadmin
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data>
<element>testgroup</element>
</data>
</ocs>
Resend the welcome email
------------------------
The request to this endpoint triggers the welcome email for this user again.
**Syntax: ocs/v1.php/cloud/users/{userid}/welcome**
* HTTP method: POST
Status codes:
* 100 - successful
* 101 - email address not available
* 102 - sending email failed
Example
^^^^^^^
::
$ curl -X POST https://admin:secret@example.com/ocs/v1.php/cloud/users/Frank/welcome -H "OCS-APIRequest: true"
* Sends the welcome email to ``Frank``
XML output
^^^^^^^^^^
.. code-block:: xml
<?xml version="1.0"?>
<ocs>
<meta>
<status>ok</status>
<statuscode>100</statuscode>
<message/>
</meta>
<data/>
</ocs>