test: Add tests for sharing/groups

This commit is contained in:
Chris Coutinho
2025-10-15 03:46:01 +02:00
parent 85f8522085
commit b50e212f05
4 changed files with 572 additions and 222 deletions
+166
View File
@@ -0,0 +1,166 @@
"""Integration tests for Nextcloud Sharing API client."""
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.integration
@pytest.mark.asyncio
async def test_create_and_delete_share(nc_client):
"""Test creating and deleting a file share."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(userid=test_user, password="password123")
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_share_file.txt"
file_content = b"Test file for sharing"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user, # Share with test user
share_type=0, # User share
permissions=1, # Read-only
)
assert share_data is not None
assert "id" in share_data
share_id = share_data["id"]
logger.info(f"Created share: {share_id}")
# Get share info
share_info = await nc_client.sharing.get_share(share_id)
assert share_info["id"] == share_id
assert share_info["path"] == file_path
assert share_info["permissions"] == 1
# List shares
shares = await nc_client.sharing.list_shares(path=file_path)
assert len(shares) > 0
assert any(s["id"] == share_id for s in shares)
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
logger.info(f"Deleted share: {share_id}")
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
@pytest.mark.asyncio
async def test_update_share_permissions(nc_client):
"""Test updating share permissions."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(userid=test_user, password="password123")
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_share_update.txt"
file_content = b"Test file for permission updates"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share with read-only permissions
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user,
share_type=0,
permissions=1, # Read-only
)
share_id = share_data["id"]
# Update to read+write permissions
updated_share = await nc_client.sharing.update_share(
share_id=share_id,
permissions=3, # Read + Write
)
assert updated_share["id"] == share_id
assert updated_share["permissions"] == 3
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
@pytest.mark.asyncio
async def test_list_shares(nc_client):
"""Test listing all shares."""
# Create a test user to share with
test_user = "testuser3"
try:
await nc_client.users.create_user(userid=test_user, password="password123")
except Exception:
pass # User might already exist
# Create a test file
file_path = "/test_list_shares.txt"
file_content = b"Test file for listing shares"
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Create a share
share_data = await nc_client.sharing.create_share(
path=file_path,
share_with=test_user,
share_type=0,
permissions=1,
)
share_id = share_data["id"]
# List all shares
all_shares = await nc_client.sharing.list_shares()
assert len(all_shares) > 0
# List shares for specific file
file_shares = await nc_client.sharing.list_shares(path=file_path)
assert len(file_shares) > 0
assert any(s["id"] == share_id for s in file_shares)
finally:
# Cleanup
if share_id:
await nc_client.sharing.delete_share(share_id)
await nc_client.webdav.delete_resource(file_path)
# Cleanup test user
try:
await nc_client.users.delete_user(test_user)
except Exception:
pass
+159 -30
View File
@@ -1267,16 +1267,19 @@ async def _get_oauth_token_for_user(
logger.info(f"Getting OAuth token for user: {username}...")
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
# Construct authorization URL
# Construct authorization URL with properly encoded redirect_uri
from urllib.parse import quote
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={callback_url}&"
f"redirect_uri={quote(callback_url, safe='')}&"
f"scope=openid%20profile%20email"
)
logger.info(f"Performing browser OAuth flow for {username}...")
logger.debug(f"Authorization URL: {auth_url}")
# Browser automation
context = await browser.new_context(ignore_https_errors=True)
@@ -1354,49 +1357,82 @@ async def _get_oauth_token_for_user(
return access_token
# Session-scoped OAuth token fixtures to avoid re-registering clients
# Parallel token retrieval fixture - fetches all OAuth tokens concurrently
@pytest.fixture(scope="session")
async def alice_oauth_token(
async def all_oauth_tokens(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
) -> dict[str, str]:
"""
Fetch OAuth tokens for all test users in parallel for speed.
Returns a dict mapping username to OAuth access token.
This is significantly faster than fetching tokens sequentially.
Note: We add a small stagger between starting each flow to avoid
race conditions in Nextcloud's OAuth session handling.
"""
import asyncio
import time
start_time = time.time()
logger.info("Fetching OAuth tokens for all users in parallel...")
async def get_token_with_delay(username: str, config: dict, delay: float):
"""Get token for a user after a small delay to stagger requests."""
if delay > 0:
await asyncio.sleep(delay)
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, username, config["password"]
)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
}
# Run all token fetches concurrently
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# Build result dict, handling any errors
tokens = {}
for username, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
logger.error(f"Failed to get OAuth token for {username}: {result}")
raise result
tokens[username] = result
elapsed = time.time() - start_time
logger.info(
f"Successfully fetched {len(tokens)} OAuth tokens in parallel "
f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)"
)
return tokens
# Session-scoped OAuth token fixtures - now use the parallel fixture
@pytest.fixture(scope="session")
async def alice_oauth_token(all_oauth_tokens) -> str:
"""OAuth token for alice (cached for session). Uses shared OAuth client."""
config = test_users_setup["alice"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "alice", config["password"]
)
return all_oauth_tokens["alice"]
@pytest.fixture(scope="session")
async def bob_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
async def bob_oauth_token(all_oauth_tokens) -> str:
"""OAuth token for bob (cached for session). Uses shared OAuth client."""
config = test_users_setup["bob"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "bob", config["password"]
)
return all_oauth_tokens["bob"]
@pytest.fixture(scope="session")
async def charlie_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
async def charlie_oauth_token(all_oauth_tokens) -> str:
"""OAuth token for charlie (cached for session). Uses shared OAuth client."""
config = test_users_setup["charlie"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "charlie", config["password"]
)
return all_oauth_tokens["charlie"]
@pytest.fixture(scope="session")
async def diana_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
async def diana_oauth_token(all_oauth_tokens) -> str:
"""OAuth token for diana (cached for session). Uses shared OAuth client."""
config = test_users_setup["diana"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "diana", config["password"]
)
return all_oauth_tokens["diana"]
@pytest.fixture(scope="session")
@@ -1526,3 +1562,96 @@ async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, A
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing diana streamable context: {e}")
# Test user/group fixtures for clean test isolation
@pytest.fixture
async def test_user(nc_client: NextcloudClient):
"""
Fixture that creates a test user and cleans it up after the test.
Returns a dict with user details that can be customized.
Usage:
async def test_something(test_user):
user_config = test_user
await nc_client.users.create_user(**user_config)
"""
import uuid
# Generate unique user ID to avoid conflicts
userid = f"testuser_{uuid.uuid4().hex[:8]}"
password = "SecureTestPassword123!"
user_config = {
"userid": userid,
"password": password,
"display_name": f"Test User {userid}",
"email": f"{userid}@example.com",
}
# Cleanup before (in case of previous failed run)
try:
await nc_client.users.delete_user(userid)
except Exception:
pass
yield user_config
# Cleanup after test
try:
await nc_client.users.delete_user(userid)
logger.debug(f"Cleaned up test user: {userid}")
except Exception as e:
logger.warning(f"Failed to cleanup test user {userid}: {e}")
@pytest.fixture
async def test_group(nc_client: NextcloudClient):
"""
Fixture that creates a test group and cleans it up after the test.
Returns the group ID.
"""
import uuid
# Generate unique group ID to avoid conflicts
groupid = f"testgroup_{uuid.uuid4().hex[:8]}"
# Cleanup before (in case of previous failed run)
try:
await nc_client.groups.delete_group(groupid)
except Exception:
pass
# Create the group
await nc_client.groups.create_group(groupid)
logger.debug(f"Created test group: {groupid}")
yield groupid
# Cleanup after test
try:
await nc_client.groups.delete_group(groupid)
logger.debug(f"Cleaned up test group: {groupid}")
except Exception as e:
logger.warning(f"Failed to cleanup test group {groupid}: {e}")
@pytest.fixture
async def test_user_in_group(nc_client: NextcloudClient, test_user, test_group):
"""
Fixture that creates a test user and adds them to a test group.
Returns a tuple of (user_config, groupid).
"""
user_config = test_user
groupid = test_group
# Create the user
await nc_client.users.create_user(**user_config)
# Add user to group
await nc_client.users.add_user_to_group(user_config["userid"], groupid)
logger.debug(f"Added user {user_config['userid']} to group {groupid}")
yield (user_config, groupid)
+205 -132
View File
@@ -3,6 +3,9 @@ Multi-user OAuth tests for Nextcloud WebDAV file permissions.
Tests verify that the MCP server respects Nextcloud file sharing permissions
when accessed via OAuth authentication with different users.
All operations (file creation, sharing, access) are performed through MCP tools
to ensure the MCP server properly supports multi-user scenarios.
"""
import json
@@ -15,77 +18,48 @@ logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def create_share(nc_client, path: str, share_with: str, permissions: int = 1):
"""
Helper to create a file share using OCS Sharing API.
Args:
nc_client: Admin NextcloudClient
path: Path to file/folder to share
share_with: Username to share with
permissions: Share permissions (1=read, 15=all, 19=read+write+share)
Returns:
Share ID
"""
# Use the authenticated client's internal HTTP client
response = await nc_client._client.post(
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={
"path": path,
"shareType": 0, # 0 = user share
"shareWith": share_with,
"permissions": permissions,
},
)
response.raise_for_status()
data = response.json()
share_id = data["ocs"]["data"]["id"]
logger.info(
f"Created share {share_id}: {path} -> {share_with} (permissions={permissions})"
)
return share_id
async def delete_share(nc_client, share_id: int):
"""Helper to delete a file share."""
response = await nc_client._client.delete(
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
logger.info(f"Deleted share {share_id}")
@pytest.mark.asyncio
async def test_file_share_read_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that shared files respect read permissions.
Scenario:
1. Admin creates a file as alice
2. Admin shares the file with bob (read-only)
1. Alice creates a file via MCP
2. Alice shares the file with Bob (read-only) via MCP
3. Bob can read the file via MCP tools
4. Diana cannot access the file (no share)
"""
# Create a file as alice
file_path = "/alice_shared_file_read.txt"
file_content = b"This file is shared with Bob for reading only."
file_content = "This file is shared with Bob for reading only."
logger.info(f"Creating file as alice: {file_path}")
# Note: We're using admin client to create file as alice
# In a real scenario, we'd need to impersonate alice or use alice's OAuth client
await nc_client.webdav.write_file(file_path, file_content)
# Alice creates a file
logger.info(f"Alice creating file: {file_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
share_id = None
try:
# Share the file with bob (read-only, permissions=1)
logger.info("Sharing file with bob (read-only)...")
share_id = await create_share(nc_client, file_path, "bob", permissions=1)
# Alice shares the file with bob (read-only, permissions=1)
logger.info("Alice sharing file with bob (read-only)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
logger.info(f"Created share {share_id}")
# Test: Bob reads the file via MCP
logger.info("Bob attempting to read file via MCP...")
@@ -100,6 +74,7 @@ async def test_file_share_read_permissions(
f"Bob successfully read file: {response_data.get('content', '')[:50]}..."
)
assert "content" in response_data
assert file_content in response_data["content"]
else:
logger.warning(f"Bob could not read file: {result.content}")
# This might fail if the share path is different for bob
@@ -117,56 +92,86 @@ async def test_file_share_read_permissions(
logger.warning("Diana unexpectedly could read unshared file")
finally:
# Cleanup
# Cleanup - Alice deletes the share and file
if share_id:
await delete_share(nc_client, share_id)
logger.info(f"Deleting file {file_path}")
await nc_client.webdav.delete_resource(file_path)
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info(f"Alice deleting file {file_path}")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": file_path}
)
@pytest.mark.asyncio
async def test_file_share_write_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that shared files respect write permissions.
Scenario:
1. Admin creates a file as alice
2. Admin shares the file with charlie (edit permission)
3. Admin shares the file with bob (read-only)
1. Alice creates a file via MCP
2. Alice shares the file with Charlie (edit permission) via MCP
3. Alice shares the file with Bob (read-only) via MCP
4. Charlie can edit the file via MCP tools
5. Bob cannot edit the file
"""
# Create a file as alice
file_path = "/alice_shared_file_write.txt"
file_content = b"This file is shared with Charlie for editing."
file_content = "This file is shared with Charlie for editing."
logger.info(f"Creating file as alice: {file_path}")
await nc_client.webdav.write_file(file_path, file_content)
logger.info(f"Alice creating file: {file_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
charlie_share_id = None
bob_share_id = None
try:
# Share with charlie (read+write, permissions=3)
logger.info("Sharing file with charlie (edit permission)...")
charlie_share_id = await create_share(
nc_client, file_path, "charlie", permissions=3
# Alice shares with Charlie (read+write, permissions=3)
logger.info("Alice sharing file with Charlie (edit permission)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "charlie",
"share_type": 0,
"permissions": 3,
},
)
assert not result.isError, (
f"Alice failed to share with Charlie: {result.content}"
)
charlie_share_data = json.loads(result.content[0].text)
charlie_share_id = charlie_share_data["id"]
logger.info(f"Created share {charlie_share_id} for Charlie")
# Share with bob (read-only, permissions=1)
logger.info("Sharing file with bob (read-only)...")
bob_share_id = await create_share(nc_client, file_path, "bob", permissions=1)
# Alice shares with Bob (read-only, permissions=1)
logger.info("Alice sharing file with Bob (read-only)...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": file_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to share with Bob: {result.content}"
bob_share_data = json.loads(result.content[0].text)
bob_share_id = bob_share_data["id"]
logger.info(f"Created share {bob_share_id} for Bob")
# Test: Charlie can write to the file
logger.info("Charlie attempting to write to file via MCP...")
updated_content = (
b"This file is shared with Charlie for editing.\nCharlie added this line."
)
updated_content = f"{file_content}\nCharlie added this line."
result = await charlie_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": updated_content.decode("utf-8")},
arguments={"path": file_path, "content": updated_content},
)
if not result.isError:
@@ -188,46 +193,80 @@ async def test_file_share_write_permissions(
logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)")
finally:
# Cleanup
# Cleanup - Alice deletes shares and file
if charlie_share_id:
await delete_share(nc_client, charlie_share_id)
logger.info(f"Alice deleting Charlie's share {charlie_share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": charlie_share_id}
)
if bob_share_id:
await delete_share(nc_client, bob_share_id)
logger.info(f"Deleting file {file_path}")
await nc_client.webdav.delete_resource(file_path)
logger.info(f"Alice deleting Bob's share {bob_share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": bob_share_id}
)
logger.info(f"Alice deleting file {file_path}")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": file_path}
)
@pytest.mark.asyncio
async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client):
async def test_file_list_permissions(alice_mcp_client, bob_mcp_client):
"""
Test that file listing respects share permissions.
Scenario:
1. Admin creates alice's private file
2. Admin creates bob's private file
3. Admin creates a shared file
4. Alice can only list her own files + shared files
5. Bob can only list his own files + shared files
1. Alice creates her private file via MCP
2. Bob creates his private file via MCP
3. Alice creates a file and shares it with Bob via MCP
4. Alice can list her own files + shared files
5. Bob can list his own files + shared files from Alice
"""
alice_file = "/alice_private_file.txt"
bob_file = "/bob_private_file.txt"
shared_file = "/shared_file.txt"
shared_file = "/alice_shared_with_bob.txt"
logger.info("Creating test files...")
await nc_client.webdav.write_file(alice_file, b"Alice's private file")
await nc_client.webdav.write_file(bob_file, b"Bob's private file")
await nc_client.webdav.write_file(shared_file, b"Shared file content")
# Alice creates her private file
logger.info(f"Alice creating private file: {alice_file}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": alice_file, "content": "Alice's private file"},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
alice_share_id = None
bob_share_id = None
# Bob creates his private file
logger.info(f"Bob creating private file: {bob_file}")
result = await bob_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": bob_file, "content": "Bob's private file"},
)
assert not result.isError, f"Bob failed to create file: {result.content}"
# Alice creates a shared file
logger.info(f"Alice creating shared file: {shared_file}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": shared_file, "content": "Shared file content"},
)
assert not result.isError, f"Alice failed to create shared file: {result.content}"
share_id = None
try:
# Share the shared file with both alice and bob
logger.info("Sharing file with alice and bob...")
alice_share_id = await create_share(
nc_client, shared_file, "alice", permissions=1
# Alice shares the file with Bob
logger.info("Alice sharing file with Bob...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": shared_file,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
bob_share_id = await create_share(nc_client, shared_file, "bob", permissions=1)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
# Test: Alice lists files in root
logger.info("Alice listing files via MCP...")
@@ -237,14 +276,13 @@ async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Alice can see files: {file_names}")
# Alice should see her own file and shared file, but not bob's
# Note: This depends on how Nextcloud handles file ownership
# Alice should see her own files
# Note: Exact assertions depend on test isolation
else:
logger.warning(f"Alice could not list files: {result.content}")
@@ -256,56 +294,86 @@ async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Bob can see files: {file_names}")
# Bob should see his own file and shared file, but not alice's
# Bob should see his own file, but not Alice's private file
# Bob may see shared files in his shared folder or via different path
else:
logger.warning(f"Bob could not list files: {result.content}")
finally:
# Cleanup
if alice_share_id:
await delete_share(nc_client, alice_share_id)
if bob_share_id:
await delete_share(nc_client, bob_share_id)
if share_id:
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info("Cleaning up test files...")
await nc_client.webdav.delete_resource(alice_file)
await nc_client.webdav.delete_resource(bob_file)
await nc_client.webdav.delete_resource(shared_file)
logger.info("Cleaning up Alice's files...")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": alice_file}
)
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": shared_file}
)
logger.info("Cleaning up Bob's files...")
await bob_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": bob_file}
)
@pytest.mark.asyncio
async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_client):
async def test_folder_share_permissions(alice_mcp_client, bob_mcp_client):
"""
Test that folder sharing works correctly.
Scenario:
1. Admin creates a folder as alice
2. Admin creates files in the folder
3. Admin shares the folder with bob
4. Bob can access files in the shared folder
1. Alice creates a folder via MCP
2. Alice creates files in the folder via MCP
3. Alice shares the folder with Bob via MCP
4. Bob can access files in the shared folder via MCP
"""
folder_path = "/alice_shared_folder"
file_in_folder = f"{folder_path}/document.txt"
file_content = b"This is a document in alice's shared folder"
file_content = "This is a document in Alice's shared folder"
logger.info(f"Creating folder: {folder_path}")
await nc_client.webdav.create_directory(folder_path)
# Alice creates folder
logger.info(f"Alice creating folder: {folder_path}")
result = await alice_mcp_client.call_tool(
"nc_webdav_create_directory", arguments={"path": folder_path}
)
assert not result.isError, f"Alice failed to create folder: {result.content}"
logger.info(f"Creating file in folder: {file_in_folder}")
await nc_client.webdav.write_file(file_in_folder, file_content)
# Alice creates file in folder
logger.info(f"Alice creating file in folder: {file_in_folder}")
result = await alice_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_in_folder, "content": file_content},
)
assert not result.isError, f"Alice failed to create file: {result.content}"
share_id = None
try:
# Share the folder with bob
logger.info("Sharing folder with bob...")
share_id = await create_share(nc_client, folder_path, "bob", permissions=1)
# Alice shares the folder with Bob
logger.info("Alice sharing folder with Bob...")
result = await alice_mcp_client.call_tool(
"nc_share_create",
arguments={
"path": folder_path,
"share_with": "bob",
"share_type": 0,
"permissions": 1,
},
)
assert not result.isError, f"Alice failed to create share: {result.content}"
share_data = json.loads(result.content[0].text)
share_id = share_data["id"]
logger.info(f"Created folder share {share_id}")
# Test: Bob lists the shared folder
logger.info("Bob attempting to list shared folder via MCP...")
@@ -315,7 +383,6 @@ async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_cli
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
logger.info(f"Bob can see {len(response_data)} files in shared folder")
@@ -338,15 +405,21 @@ async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_cli
response_data = json.loads(result.content[0].text)
logger.info("Bob successfully read file in shared folder")
assert "content" in response_data
assert file_content in response_data["content"]
else:
logger.warning(
f"Bob could not read file in shared folder: {result.content}"
)
finally:
# Cleanup
# Cleanup - Alice deletes the share and folder
if share_id:
await delete_share(nc_client, share_id)
logger.info(f"Alice deleting share {share_id}")
await alice_mcp_client.call_tool(
"nc_share_delete", arguments={"share_id": share_id}
)
logger.info("Cleaning up test folder...")
await nc_client.webdav.delete_resource(folder_path)
logger.info("Alice cleaning up test folder...")
await alice_mcp_client.call_tool(
"nc_webdav_delete_resource", arguments={"path": folder_path}
)
+42 -60
View File
@@ -3,70 +3,53 @@ from nextcloud_mcp_server.client import NextcloudClient
@pytest.mark.asyncio
async def test_create_and_delete_user(nc_client: NextcloudClient):
userid = "testuser1"
password = "SecureTestPassword123!"
display_name = "Test User One"
email = "test1@example.com"
async def test_create_and_delete_user(nc_client: NextcloudClient, test_user):
"""Test creating a user and verifying deletion (cleanup by fixture)."""
user_config = test_user
# Create user
await nc_client.users.create_user(
userid=userid,
password=password,
display_name=display_name,
email=email,
)
await nc_client.users.create_user(**user_config)
# Verify user exists
users = await nc_client.users.search_users(search=userid)
assert userid in users
users = await nc_client.users.search_users(search=user_config["userid"])
assert user_config["userid"] in users
user_details = await nc_client.users.get_user_details(userid)
assert user_details.id == userid
assert user_details.displayname == display_name
assert user_details.email == email
user_details = await nc_client.users.get_user_details(user_config["userid"])
assert user_details.id == user_config["userid"]
assert user_details.displayname == user_config["display_name"]
assert user_details.email == user_config["email"]
# Delete user
await nc_client.users.delete_user(userid)
# Test deletion explicitly as part of test functionality
await nc_client.users.delete_user(user_config["userid"])
# Verify user is deleted
users = await nc_client.users.search_users(search=userid)
assert userid not in users
users = await nc_client.users.search_users(search=user_config["userid"])
assert user_config["userid"] not in users
# Note: Fixture cleanup will also try to delete but handle 404 gracefully
@pytest.mark.asyncio
async def test_update_user_field(nc_client: NextcloudClient):
userid = "testuser2"
password = "SecureTestPassword123!"
display_name = "Test User Two"
email = "test2@example.com"
async def test_update_user_field(nc_client: NextcloudClient, test_user):
"""Test updating user fields."""
user_config = test_user
await nc_client.users.create_user(
userid=userid,
password=password,
display_name=display_name,
email=email,
)
await nc_client.users.create_user(**user_config)
new_email = "new.test2@example.com"
await nc_client.users.update_user_field(userid, "email", new_email)
new_email = f"new.{user_config['email']}"
await nc_client.users.update_user_field(user_config["userid"], "email", new_email)
user_details = await nc_client.users.get_user_details(userid)
user_details = await nc_client.users.get_user_details(user_config["userid"])
assert user_details.email == new_email
await nc_client.users.delete_user(userid)
# Fixture will handle cleanup
@pytest.mark.asyncio
async def test_user_groups(nc_client: NextcloudClient):
userid = "testuser3"
password = "SecureTestPassword123!"
groupid = "testgroup"
async def test_user_groups(nc_client: NextcloudClient, test_user_in_group):
"""Test adding and removing users from groups."""
user_config, groupid = test_user_in_group
userid = user_config["userid"]
await nc_client.users.create_user(userid=userid, password=password)
# Add user to group
await nc_client.users.add_user_to_group(userid, groupid)
# Verify user is in group
groups = await nc_client.users.get_user_groups(userid)
assert groupid in groups
@@ -74,17 +57,17 @@ async def test_user_groups(nc_client: NextcloudClient):
await nc_client.users.remove_user_from_group(userid, groupid)
groups = await nc_client.users.get_user_groups(userid)
assert groupid not in groups
await nc_client.users.delete_user(userid)
# Fixtures will handle cleanup
@pytest.mark.asyncio
async def test_user_subadmins(nc_client: NextcloudClient):
userid = "testuser4"
password = "SecureTestPassword123!"
groupid = "subadmingroup"
async def test_user_subadmins(nc_client: NextcloudClient, test_user, test_group):
"""Test promoting and demoting subadmins."""
user_config = test_user
groupid = test_group
userid = user_config["userid"]
await nc_client.users.create_user(userid=userid, password=password)
await nc_client.users.create_user(**user_config)
# Promote to subadmin
await nc_client.users.promote_user_to_subadmin(userid, groupid)
@@ -95,16 +78,16 @@ async def test_user_subadmins(nc_client: NextcloudClient):
await nc_client.users.demote_user_from_subadmin(userid, groupid)
subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid)
assert groupid not in subadmin_groups
await nc_client.users.delete_user(userid)
# Fixtures will handle cleanup
@pytest.mark.asyncio
async def test_disable_enable_user(nc_client: NextcloudClient):
userid = "testuser5"
password = "SecureTestPassword123!"
async def test_disable_enable_user(nc_client: NextcloudClient, test_user):
"""Test disabling and enabling users."""
user_config = test_user
userid = user_config["userid"]
await nc_client.users.create_user(userid=userid, password=password)
await nc_client.users.create_user(**user_config)
# Disable user
await nc_client.users.disable_user(userid)
@@ -115,8 +98,7 @@ async def test_disable_enable_user(nc_client: NextcloudClient):
await nc_client.users.enable_user(userid)
user_details = await nc_client.users.get_user_details(userid)
assert user_details.enabled
await nc_client.users.delete_user(userid)
# Fixture will handle cleanup
@pytest.mark.asyncio