From b50e212f0508a9ca146a307fb906e8c9abae024b Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Wed, 15 Oct 2025 03:46:01 +0200 Subject: [PATCH] test: Add tests for sharing/groups --- tests/client/test_sharing_api.py | 166 ++++++++++ tests/conftest.py | 189 +++++++++-- tests/server/test_oauth_file_permissions.py | 337 ++++++++++++-------- tests/server/test_users_api.py | 102 +++--- 4 files changed, 572 insertions(+), 222 deletions(-) create mode 100644 tests/client/test_sharing_api.py diff --git a/tests/client/test_sharing_api.py b/tests/client/test_sharing_api.py new file mode 100644 index 0000000..4f916b5 --- /dev/null +++ b/tests/client/test_sharing_api.py @@ -0,0 +1,166 @@ +"""Integration tests for Nextcloud Sharing API client.""" + +import logging + +import pytest + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.integration + + +@pytest.mark.asyncio +async def test_create_and_delete_share(nc_client): + """Test creating and deleting a file share.""" + # Create a test user to share with + test_user = "testuser3" + try: + await nc_client.users.create_user(userid=test_user, password="password123") + except Exception: + pass # User might already exist + + # Create a test file + file_path = "/test_share_file.txt" + file_content = b"Test file for sharing" + + await nc_client.webdav.write_file(file_path, file_content) + + share_id = None + try: + # Create a share + share_data = await nc_client.sharing.create_share( + path=file_path, + share_with=test_user, # Share with test user + share_type=0, # User share + permissions=1, # Read-only + ) + + assert share_data is not None + assert "id" in share_data + share_id = share_data["id"] + logger.info(f"Created share: {share_id}") + + # Get share info + share_info = await nc_client.sharing.get_share(share_id) + assert share_info["id"] == share_id + assert share_info["path"] == file_path + assert share_info["permissions"] == 1 + + # List shares + shares = await nc_client.sharing.list_shares(path=file_path) + assert len(shares) > 0 + assert any(s["id"] == share_id for s in shares) + + finally: + # Cleanup + if share_id: + await nc_client.sharing.delete_share(share_id) + logger.info(f"Deleted share: {share_id}") + + await nc_client.webdav.delete_resource(file_path) + + # Cleanup test user + try: + await nc_client.users.delete_user(test_user) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_update_share_permissions(nc_client): + """Test updating share permissions.""" + # Create a test user to share with + test_user = "testuser3" + try: + await nc_client.users.create_user(userid=test_user, password="password123") + except Exception: + pass # User might already exist + + # Create a test file + file_path = "/test_share_update.txt" + file_content = b"Test file for permission updates" + + await nc_client.webdav.write_file(file_path, file_content) + + share_id = None + try: + # Create a share with read-only permissions + share_data = await nc_client.sharing.create_share( + path=file_path, + share_with=test_user, + share_type=0, + permissions=1, # Read-only + ) + share_id = share_data["id"] + + # Update to read+write permissions + updated_share = await nc_client.sharing.update_share( + share_id=share_id, + permissions=3, # Read + Write + ) + + assert updated_share["id"] == share_id + assert updated_share["permissions"] == 3 + + finally: + # Cleanup + if share_id: + await nc_client.sharing.delete_share(share_id) + + await nc_client.webdav.delete_resource(file_path) + + # Cleanup test user + try: + await nc_client.users.delete_user(test_user) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_list_shares(nc_client): + """Test listing all shares.""" + # Create a test user to share with + test_user = "testuser3" + try: + await nc_client.users.create_user(userid=test_user, password="password123") + except Exception: + pass # User might already exist + + # Create a test file + file_path = "/test_list_shares.txt" + file_content = b"Test file for listing shares" + + await nc_client.webdav.write_file(file_path, file_content) + + share_id = None + try: + # Create a share + share_data = await nc_client.sharing.create_share( + path=file_path, + share_with=test_user, + share_type=0, + permissions=1, + ) + share_id = share_data["id"] + + # List all shares + all_shares = await nc_client.sharing.list_shares() + assert len(all_shares) > 0 + + # List shares for specific file + file_shares = await nc_client.sharing.list_shares(path=file_path) + assert len(file_shares) > 0 + assert any(s["id"] == share_id for s in file_shares) + + finally: + # Cleanup + if share_id: + await nc_client.sharing.delete_share(share_id) + + await nc_client.webdav.delete_resource(file_path) + + # Cleanup test user + try: + await nc_client.users.delete_user(test_user) + except Exception: + pass diff --git a/tests/conftest.py b/tests/conftest.py index 0105928..1e599e8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1267,16 +1267,19 @@ async def _get_oauth_token_for_user( logger.info(f"Getting OAuth token for user: {username}...") logger.info(f"Using shared OAuth client: {client_id[:16]}...") - # Construct authorization URL + # Construct authorization URL with properly encoded redirect_uri + from urllib.parse import quote + auth_url = ( f"{authorization_endpoint}?" f"response_type=code&" f"client_id={client_id}&" - f"redirect_uri={callback_url}&" + f"redirect_uri={quote(callback_url, safe='')}&" f"scope=openid%20profile%20email" ) logger.info(f"Performing browser OAuth flow for {username}...") + logger.debug(f"Authorization URL: {auth_url}") # Browser automation context = await browser.new_context(ignore_https_errors=True) @@ -1354,49 +1357,82 @@ async def _get_oauth_token_for_user( return access_token -# Session-scoped OAuth token fixtures to avoid re-registering clients +# Parallel token retrieval fixture - fetches all OAuth tokens concurrently @pytest.fixture(scope="session") -async def alice_oauth_token( +async def all_oauth_tokens( browser, shared_oauth_client_credentials, test_users_setup -) -> str: +) -> dict[str, str]: + """ + Fetch OAuth tokens for all test users in parallel for speed. + + Returns a dict mapping username to OAuth access token. + This is significantly faster than fetching tokens sequentially. + + Note: We add a small stagger between starting each flow to avoid + race conditions in Nextcloud's OAuth session handling. + """ + import asyncio + import time + + start_time = time.time() + logger.info("Fetching OAuth tokens for all users in parallel...") + + async def get_token_with_delay(username: str, config: dict, delay: float): + """Get token for a user after a small delay to stagger requests.""" + if delay > 0: + await asyncio.sleep(delay) + return await _get_oauth_token_for_user( + browser, shared_oauth_client_credentials, username, config["password"] + ) + + # Create tasks for all users with staggered starts (0.5s apart) + tasks = { + username: get_token_with_delay(username, config, idx * 0.5) + for idx, (username, config) in enumerate(test_users_setup.items()) + } + + # Run all token fetches concurrently + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + # Build result dict, handling any errors + tokens = {} + for username, result in zip(tasks.keys(), results): + if isinstance(result, Exception): + logger.error(f"Failed to get OAuth token for {username}: {result}") + raise result + tokens[username] = result + + elapsed = time.time() - start_time + logger.info( + f"Successfully fetched {len(tokens)} OAuth tokens in parallel " + f"in {elapsed:.1f}s (~{elapsed / len(tokens):.1f}s per user)" + ) + return tokens + + +# Session-scoped OAuth token fixtures - now use the parallel fixture +@pytest.fixture(scope="session") +async def alice_oauth_token(all_oauth_tokens) -> str: """OAuth token for alice (cached for session). Uses shared OAuth client.""" - config = test_users_setup["alice"] - return await _get_oauth_token_for_user( - browser, shared_oauth_client_credentials, "alice", config["password"] - ) + return all_oauth_tokens["alice"] @pytest.fixture(scope="session") -async def bob_oauth_token( - browser, shared_oauth_client_credentials, test_users_setup -) -> str: +async def bob_oauth_token(all_oauth_tokens) -> str: """OAuth token for bob (cached for session). Uses shared OAuth client.""" - config = test_users_setup["bob"] - return await _get_oauth_token_for_user( - browser, shared_oauth_client_credentials, "bob", config["password"] - ) + return all_oauth_tokens["bob"] @pytest.fixture(scope="session") -async def charlie_oauth_token( - browser, shared_oauth_client_credentials, test_users_setup -) -> str: +async def charlie_oauth_token(all_oauth_tokens) -> str: """OAuth token for charlie (cached for session). Uses shared OAuth client.""" - config = test_users_setup["charlie"] - return await _get_oauth_token_for_user( - browser, shared_oauth_client_credentials, "charlie", config["password"] - ) + return all_oauth_tokens["charlie"] @pytest.fixture(scope="session") -async def diana_oauth_token( - browser, shared_oauth_client_credentials, test_users_setup -) -> str: +async def diana_oauth_token(all_oauth_tokens) -> str: """OAuth token for diana (cached for session). Uses shared OAuth client.""" - config = test_users_setup["diana"] - return await _get_oauth_token_for_user( - browser, shared_oauth_client_credentials, "diana", config["password"] - ) + return all_oauth_tokens["diana"] @pytest.fixture(scope="session") @@ -1526,3 +1562,96 @@ async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, A await streamable_context.__aexit__(None, None, None) except Exception as e: logger.debug(f"Error closing diana streamable context: {e}") + + +# Test user/group fixtures for clean test isolation +@pytest.fixture +async def test_user(nc_client: NextcloudClient): + """ + Fixture that creates a test user and cleans it up after the test. + + Returns a dict with user details that can be customized. + Usage: + async def test_something(test_user): + user_config = test_user + await nc_client.users.create_user(**user_config) + """ + import uuid + + # Generate unique user ID to avoid conflicts + userid = f"testuser_{uuid.uuid4().hex[:8]}" + password = "SecureTestPassword123!" + + user_config = { + "userid": userid, + "password": password, + "display_name": f"Test User {userid}", + "email": f"{userid}@example.com", + } + + # Cleanup before (in case of previous failed run) + try: + await nc_client.users.delete_user(userid) + except Exception: + pass + + yield user_config + + # Cleanup after test + try: + await nc_client.users.delete_user(userid) + logger.debug(f"Cleaned up test user: {userid}") + except Exception as e: + logger.warning(f"Failed to cleanup test user {userid}: {e}") + + +@pytest.fixture +async def test_group(nc_client: NextcloudClient): + """ + Fixture that creates a test group and cleans it up after the test. + + Returns the group ID. + """ + import uuid + + # Generate unique group ID to avoid conflicts + groupid = f"testgroup_{uuid.uuid4().hex[:8]}" + + # Cleanup before (in case of previous failed run) + try: + await nc_client.groups.delete_group(groupid) + except Exception: + pass + + # Create the group + await nc_client.groups.create_group(groupid) + logger.debug(f"Created test group: {groupid}") + + yield groupid + + # Cleanup after test + try: + await nc_client.groups.delete_group(groupid) + logger.debug(f"Cleaned up test group: {groupid}") + except Exception as e: + logger.warning(f"Failed to cleanup test group {groupid}: {e}") + + +@pytest.fixture +async def test_user_in_group(nc_client: NextcloudClient, test_user, test_group): + """ + Fixture that creates a test user and adds them to a test group. + + Returns a tuple of (user_config, groupid). + """ + user_config = test_user + groupid = test_group + + # Create the user + await nc_client.users.create_user(**user_config) + + # Add user to group + await nc_client.users.add_user_to_group(user_config["userid"], groupid) + logger.debug(f"Added user {user_config['userid']} to group {groupid}") + + yield (user_config, groupid) diff --git a/tests/server/test_oauth_file_permissions.py b/tests/server/test_oauth_file_permissions.py index 5c1c322..3d78a0f 100644 --- a/tests/server/test_oauth_file_permissions.py +++ b/tests/server/test_oauth_file_permissions.py @@ -3,6 +3,9 @@ Multi-user OAuth tests for Nextcloud WebDAV file permissions. Tests verify that the MCP server respects Nextcloud file sharing permissions when accessed via OAuth authentication with different users. + +All operations (file creation, sharing, access) are performed through MCP tools +to ensure the MCP server properly supports multi-user scenarios. """ import json @@ -15,77 +18,48 @@ logger = logging.getLogger(__name__) pytestmark = [pytest.mark.integration, pytest.mark.oauth] -async def create_share(nc_client, path: str, share_with: str, permissions: int = 1): - """ - Helper to create a file share using OCS Sharing API. - - Args: - nc_client: Admin NextcloudClient - path: Path to file/folder to share - share_with: Username to share with - permissions: Share permissions (1=read, 15=all, 19=read+write+share) - - Returns: - Share ID - """ - # Use the authenticated client's internal HTTP client - response = await nc_client._client.post( - "/ocs/v2.php/apps/files_sharing/api/v1/shares", - headers={"OCS-APIRequest": "true", "Accept": "application/json"}, - data={ - "path": path, - "shareType": 0, # 0 = user share - "shareWith": share_with, - "permissions": permissions, - }, - ) - response.raise_for_status() - data = response.json() - share_id = data["ocs"]["data"]["id"] - logger.info( - f"Created share {share_id}: {path} -> {share_with} (permissions={permissions})" - ) - return share_id - - -async def delete_share(nc_client, share_id: int): - """Helper to delete a file share.""" - response = await nc_client._client.delete( - f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}", - headers={"OCS-APIRequest": "true", "Accept": "application/json"}, - ) - response.raise_for_status() - logger.info(f"Deleted share {share_id}") - - @pytest.mark.asyncio async def test_file_share_read_permissions( - nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client + alice_mcp_client, bob_mcp_client, diana_mcp_client ): """ Test that shared files respect read permissions. Scenario: - 1. Admin creates a file as alice - 2. Admin shares the file with bob (read-only) + 1. Alice creates a file via MCP + 2. Alice shares the file with Bob (read-only) via MCP 3. Bob can read the file via MCP tools 4. Diana cannot access the file (no share) """ - # Create a file as alice file_path = "/alice_shared_file_read.txt" - file_content = b"This file is shared with Bob for reading only." + file_content = "This file is shared with Bob for reading only." - logger.info(f"Creating file as alice: {file_path}") - # Note: We're using admin client to create file as alice - # In a real scenario, we'd need to impersonate alice or use alice's OAuth client - await nc_client.webdav.write_file(file_path, file_content) + # Alice creates a file + logger.info(f"Alice creating file: {file_path}") + result = await alice_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": file_path, "content": file_content}, + ) + assert not result.isError, f"Alice failed to create file: {result.content}" share_id = None try: - # Share the file with bob (read-only, permissions=1) - logger.info("Sharing file with bob (read-only)...") - share_id = await create_share(nc_client, file_path, "bob", permissions=1) + # Alice shares the file with bob (read-only, permissions=1) + logger.info("Alice sharing file with bob (read-only)...") + result = await alice_mcp_client.call_tool( + "nc_share_create", + arguments={ + "path": file_path, + "share_with": "bob", + "share_type": 0, + "permissions": 1, + }, + ) + assert not result.isError, f"Alice failed to create share: {result.content}" + share_data = json.loads(result.content[0].text) + share_id = share_data["id"] + logger.info(f"Created share {share_id}") # Test: Bob reads the file via MCP logger.info("Bob attempting to read file via MCP...") @@ -100,6 +74,7 @@ async def test_file_share_read_permissions( f"Bob successfully read file: {response_data.get('content', '')[:50]}..." ) assert "content" in response_data + assert file_content in response_data["content"] else: logger.warning(f"Bob could not read file: {result.content}") # This might fail if the share path is different for bob @@ -117,56 +92,86 @@ async def test_file_share_read_permissions( logger.warning("Diana unexpectedly could read unshared file") finally: - # Cleanup + # Cleanup - Alice deletes the share and file if share_id: - await delete_share(nc_client, share_id) - logger.info(f"Deleting file {file_path}") - await nc_client.webdav.delete_resource(file_path) + logger.info(f"Alice deleting share {share_id}") + await alice_mcp_client.call_tool( + "nc_share_delete", arguments={"share_id": share_id} + ) + logger.info(f"Alice deleting file {file_path}") + await alice_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": file_path} + ) @pytest.mark.asyncio async def test_file_share_write_permissions( - nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client + alice_mcp_client, charlie_mcp_client, bob_mcp_client ): """ Test that shared files respect write permissions. Scenario: - 1. Admin creates a file as alice - 2. Admin shares the file with charlie (edit permission) - 3. Admin shares the file with bob (read-only) + 1. Alice creates a file via MCP + 2. Alice shares the file with Charlie (edit permission) via MCP + 3. Alice shares the file with Bob (read-only) via MCP 4. Charlie can edit the file via MCP tools 5. Bob cannot edit the file """ - # Create a file as alice file_path = "/alice_shared_file_write.txt" - file_content = b"This file is shared with Charlie for editing." + file_content = "This file is shared with Charlie for editing." - logger.info(f"Creating file as alice: {file_path}") - await nc_client.webdav.write_file(file_path, file_content) + logger.info(f"Alice creating file: {file_path}") + result = await alice_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": file_path, "content": file_content}, + ) + assert not result.isError, f"Alice failed to create file: {result.content}" charlie_share_id = None bob_share_id = None try: - # Share with charlie (read+write, permissions=3) - logger.info("Sharing file with charlie (edit permission)...") - charlie_share_id = await create_share( - nc_client, file_path, "charlie", permissions=3 + # Alice shares with Charlie (read+write, permissions=3) + logger.info("Alice sharing file with Charlie (edit permission)...") + result = await alice_mcp_client.call_tool( + "nc_share_create", + arguments={ + "path": file_path, + "share_with": "charlie", + "share_type": 0, + "permissions": 3, + }, ) + assert not result.isError, ( + f"Alice failed to share with Charlie: {result.content}" + ) + charlie_share_data = json.loads(result.content[0].text) + charlie_share_id = charlie_share_data["id"] + logger.info(f"Created share {charlie_share_id} for Charlie") - # Share with bob (read-only, permissions=1) - logger.info("Sharing file with bob (read-only)...") - bob_share_id = await create_share(nc_client, file_path, "bob", permissions=1) + # Alice shares with Bob (read-only, permissions=1) + logger.info("Alice sharing file with Bob (read-only)...") + result = await alice_mcp_client.call_tool( + "nc_share_create", + arguments={ + "path": file_path, + "share_with": "bob", + "share_type": 0, + "permissions": 1, + }, + ) + assert not result.isError, f"Alice failed to share with Bob: {result.content}" + bob_share_data = json.loads(result.content[0].text) + bob_share_id = bob_share_data["id"] + logger.info(f"Created share {bob_share_id} for Bob") # Test: Charlie can write to the file logger.info("Charlie attempting to write to file via MCP...") - updated_content = ( - b"This file is shared with Charlie for editing.\nCharlie added this line." - ) + updated_content = f"{file_content}\nCharlie added this line." result = await charlie_mcp_client.call_tool( "nc_webdav_write_file", - arguments={"path": file_path, "content": updated_content.decode("utf-8")}, + arguments={"path": file_path, "content": updated_content}, ) if not result.isError: @@ -188,46 +193,80 @@ async def test_file_share_write_permissions( logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)") finally: - # Cleanup + # Cleanup - Alice deletes shares and file if charlie_share_id: - await delete_share(nc_client, charlie_share_id) + logger.info(f"Alice deleting Charlie's share {charlie_share_id}") + await alice_mcp_client.call_tool( + "nc_share_delete", arguments={"share_id": charlie_share_id} + ) if bob_share_id: - await delete_share(nc_client, bob_share_id) - logger.info(f"Deleting file {file_path}") - await nc_client.webdav.delete_resource(file_path) + logger.info(f"Alice deleting Bob's share {bob_share_id}") + await alice_mcp_client.call_tool( + "nc_share_delete", arguments={"share_id": bob_share_id} + ) + logger.info(f"Alice deleting file {file_path}") + await alice_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": file_path} + ) @pytest.mark.asyncio -async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client): +async def test_file_list_permissions(alice_mcp_client, bob_mcp_client): """ Test that file listing respects share permissions. Scenario: - 1. Admin creates alice's private file - 2. Admin creates bob's private file - 3. Admin creates a shared file - 4. Alice can only list her own files + shared files - 5. Bob can only list his own files + shared files + 1. Alice creates her private file via MCP + 2. Bob creates his private file via MCP + 3. Alice creates a file and shares it with Bob via MCP + 4. Alice can list her own files + shared files + 5. Bob can list his own files + shared files from Alice """ alice_file = "/alice_private_file.txt" bob_file = "/bob_private_file.txt" - shared_file = "/shared_file.txt" + shared_file = "/alice_shared_with_bob.txt" - logger.info("Creating test files...") - await nc_client.webdav.write_file(alice_file, b"Alice's private file") - await nc_client.webdav.write_file(bob_file, b"Bob's private file") - await nc_client.webdav.write_file(shared_file, b"Shared file content") + # Alice creates her private file + logger.info(f"Alice creating private file: {alice_file}") + result = await alice_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": alice_file, "content": "Alice's private file"}, + ) + assert not result.isError, f"Alice failed to create file: {result.content}" - alice_share_id = None - bob_share_id = None + # Bob creates his private file + logger.info(f"Bob creating private file: {bob_file}") + result = await bob_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": bob_file, "content": "Bob's private file"}, + ) + assert not result.isError, f"Bob failed to create file: {result.content}" + + # Alice creates a shared file + logger.info(f"Alice creating shared file: {shared_file}") + result = await alice_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": shared_file, "content": "Shared file content"}, + ) + assert not result.isError, f"Alice failed to create shared file: {result.content}" + + share_id = None try: - # Share the shared file with both alice and bob - logger.info("Sharing file with alice and bob...") - alice_share_id = await create_share( - nc_client, shared_file, "alice", permissions=1 + # Alice shares the file with Bob + logger.info("Alice sharing file with Bob...") + result = await alice_mcp_client.call_tool( + "nc_share_create", + arguments={ + "path": shared_file, + "share_with": "bob", + "share_type": 0, + "permissions": 1, + }, ) - bob_share_id = await create_share(nc_client, shared_file, "bob", permissions=1) + assert not result.isError, f"Alice failed to create share: {result.content}" + share_data = json.loads(result.content[0].text) + share_id = share_data["id"] # Test: Alice lists files in root logger.info("Alice listing files via MCP...") @@ -237,14 +276,13 @@ async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client if not result.isError: response_data = json.loads(result.content[0].text) - # The response is directly a list, not wrapped in a dict if not isinstance(response_data, list): response_data = [response_data] if response_data else [] file_names = [f["name"] for f in response_data] logger.info(f"Alice can see files: {file_names}") - # Alice should see her own file and shared file, but not bob's - # Note: This depends on how Nextcloud handles file ownership + # Alice should see her own files + # Note: Exact assertions depend on test isolation else: logger.warning(f"Alice could not list files: {result.content}") @@ -256,56 +294,86 @@ async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client if not result.isError: response_data = json.loads(result.content[0].text) - # The response is directly a list, not wrapped in a dict if not isinstance(response_data, list): response_data = [response_data] if response_data else [] file_names = [f["name"] for f in response_data] logger.info(f"Bob can see files: {file_names}") - # Bob should see his own file and shared file, but not alice's + # Bob should see his own file, but not Alice's private file + # Bob may see shared files in his shared folder or via different path else: logger.warning(f"Bob could not list files: {result.content}") finally: # Cleanup - if alice_share_id: - await delete_share(nc_client, alice_share_id) - if bob_share_id: - await delete_share(nc_client, bob_share_id) + if share_id: + logger.info(f"Alice deleting share {share_id}") + await alice_mcp_client.call_tool( + "nc_share_delete", arguments={"share_id": share_id} + ) - logger.info("Cleaning up test files...") - await nc_client.webdav.delete_resource(alice_file) - await nc_client.webdav.delete_resource(bob_file) - await nc_client.webdav.delete_resource(shared_file) + logger.info("Cleaning up Alice's files...") + await alice_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": alice_file} + ) + await alice_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": shared_file} + ) + + logger.info("Cleaning up Bob's files...") + await bob_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": bob_file} + ) @pytest.mark.asyncio -async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_client): +async def test_folder_share_permissions(alice_mcp_client, bob_mcp_client): """ Test that folder sharing works correctly. Scenario: - 1. Admin creates a folder as alice - 2. Admin creates files in the folder - 3. Admin shares the folder with bob - 4. Bob can access files in the shared folder + 1. Alice creates a folder via MCP + 2. Alice creates files in the folder via MCP + 3. Alice shares the folder with Bob via MCP + 4. Bob can access files in the shared folder via MCP """ folder_path = "/alice_shared_folder" file_in_folder = f"{folder_path}/document.txt" - file_content = b"This is a document in alice's shared folder" + file_content = "This is a document in Alice's shared folder" - logger.info(f"Creating folder: {folder_path}") - await nc_client.webdav.create_directory(folder_path) + # Alice creates folder + logger.info(f"Alice creating folder: {folder_path}") + result = await alice_mcp_client.call_tool( + "nc_webdav_create_directory", arguments={"path": folder_path} + ) + assert not result.isError, f"Alice failed to create folder: {result.content}" - logger.info(f"Creating file in folder: {file_in_folder}") - await nc_client.webdav.write_file(file_in_folder, file_content) + # Alice creates file in folder + logger.info(f"Alice creating file in folder: {file_in_folder}") + result = await alice_mcp_client.call_tool( + "nc_webdav_write_file", + arguments={"path": file_in_folder, "content": file_content}, + ) + assert not result.isError, f"Alice failed to create file: {result.content}" share_id = None try: - # Share the folder with bob - logger.info("Sharing folder with bob...") - share_id = await create_share(nc_client, folder_path, "bob", permissions=1) + # Alice shares the folder with Bob + logger.info("Alice sharing folder with Bob...") + result = await alice_mcp_client.call_tool( + "nc_share_create", + arguments={ + "path": folder_path, + "share_with": "bob", + "share_type": 0, + "permissions": 1, + }, + ) + assert not result.isError, f"Alice failed to create share: {result.content}" + share_data = json.loads(result.content[0].text) + share_id = share_data["id"] + logger.info(f"Created folder share {share_id}") # Test: Bob lists the shared folder logger.info("Bob attempting to list shared folder via MCP...") @@ -315,7 +383,6 @@ async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_cli if not result.isError: response_data = json.loads(result.content[0].text) - # The response is directly a list, not wrapped in a dict if not isinstance(response_data, list): response_data = [response_data] if response_data else [] logger.info(f"Bob can see {len(response_data)} files in shared folder") @@ -338,15 +405,21 @@ async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_cli response_data = json.loads(result.content[0].text) logger.info("Bob successfully read file in shared folder") assert "content" in response_data + assert file_content in response_data["content"] else: logger.warning( f"Bob could not read file in shared folder: {result.content}" ) finally: - # Cleanup + # Cleanup - Alice deletes the share and folder if share_id: - await delete_share(nc_client, share_id) + logger.info(f"Alice deleting share {share_id}") + await alice_mcp_client.call_tool( + "nc_share_delete", arguments={"share_id": share_id} + ) - logger.info("Cleaning up test folder...") - await nc_client.webdav.delete_resource(folder_path) + logger.info("Alice cleaning up test folder...") + await alice_mcp_client.call_tool( + "nc_webdav_delete_resource", arguments={"path": folder_path} + ) diff --git a/tests/server/test_users_api.py b/tests/server/test_users_api.py index e3360f6..172aa15 100644 --- a/tests/server/test_users_api.py +++ b/tests/server/test_users_api.py @@ -3,70 +3,53 @@ from nextcloud_mcp_server.client import NextcloudClient @pytest.mark.asyncio -async def test_create_and_delete_user(nc_client: NextcloudClient): - userid = "testuser1" - password = "SecureTestPassword123!" - display_name = "Test User One" - email = "test1@example.com" +async def test_create_and_delete_user(nc_client: NextcloudClient, test_user): + """Test creating a user and verifying deletion (cleanup by fixture).""" + user_config = test_user # Create user - await nc_client.users.create_user( - userid=userid, - password=password, - display_name=display_name, - email=email, - ) + await nc_client.users.create_user(**user_config) # Verify user exists - users = await nc_client.users.search_users(search=userid) - assert userid in users + users = await nc_client.users.search_users(search=user_config["userid"]) + assert user_config["userid"] in users - user_details = await nc_client.users.get_user_details(userid) - assert user_details.id == userid - assert user_details.displayname == display_name - assert user_details.email == email + user_details = await nc_client.users.get_user_details(user_config["userid"]) + assert user_details.id == user_config["userid"] + assert user_details.displayname == user_config["display_name"] + assert user_details.email == user_config["email"] - # Delete user - await nc_client.users.delete_user(userid) + # Test deletion explicitly as part of test functionality + await nc_client.users.delete_user(user_config["userid"]) # Verify user is deleted - users = await nc_client.users.search_users(search=userid) - assert userid not in users + users = await nc_client.users.search_users(search=user_config["userid"]) + assert user_config["userid"] not in users + # Note: Fixture cleanup will also try to delete but handle 404 gracefully @pytest.mark.asyncio -async def test_update_user_field(nc_client: NextcloudClient): - userid = "testuser2" - password = "SecureTestPassword123!" - display_name = "Test User Two" - email = "test2@example.com" +async def test_update_user_field(nc_client: NextcloudClient, test_user): + """Test updating user fields.""" + user_config = test_user - await nc_client.users.create_user( - userid=userid, - password=password, - display_name=display_name, - email=email, - ) + await nc_client.users.create_user(**user_config) - new_email = "new.test2@example.com" - await nc_client.users.update_user_field(userid, "email", new_email) + new_email = f"new.{user_config['email']}" + await nc_client.users.update_user_field(user_config["userid"], "email", new_email) - user_details = await nc_client.users.get_user_details(userid) + user_details = await nc_client.users.get_user_details(user_config["userid"]) assert user_details.email == new_email - - await nc_client.users.delete_user(userid) + # Fixture will handle cleanup @pytest.mark.asyncio -async def test_user_groups(nc_client: NextcloudClient): - userid = "testuser3" - password = "SecureTestPassword123!" - groupid = "testgroup" +async def test_user_groups(nc_client: NextcloudClient, test_user_in_group): + """Test adding and removing users from groups.""" + user_config, groupid = test_user_in_group + userid = user_config["userid"] - await nc_client.users.create_user(userid=userid, password=password) - - # Add user to group - await nc_client.users.add_user_to_group(userid, groupid) + # Verify user is in group groups = await nc_client.users.get_user_groups(userid) assert groupid in groups @@ -74,17 +57,17 @@ async def test_user_groups(nc_client: NextcloudClient): await nc_client.users.remove_user_from_group(userid, groupid) groups = await nc_client.users.get_user_groups(userid) assert groupid not in groups - - await nc_client.users.delete_user(userid) + # Fixtures will handle cleanup @pytest.mark.asyncio -async def test_user_subadmins(nc_client: NextcloudClient): - userid = "testuser4" - password = "SecureTestPassword123!" - groupid = "subadmingroup" +async def test_user_subadmins(nc_client: NextcloudClient, test_user, test_group): + """Test promoting and demoting subadmins.""" + user_config = test_user + groupid = test_group + userid = user_config["userid"] - await nc_client.users.create_user(userid=userid, password=password) + await nc_client.users.create_user(**user_config) # Promote to subadmin await nc_client.users.promote_user_to_subadmin(userid, groupid) @@ -95,16 +78,16 @@ async def test_user_subadmins(nc_client: NextcloudClient): await nc_client.users.demote_user_from_subadmin(userid, groupid) subadmin_groups = await nc_client.users.get_user_subadmin_groups(userid) assert groupid not in subadmin_groups - - await nc_client.users.delete_user(userid) + # Fixtures will handle cleanup @pytest.mark.asyncio -async def test_disable_enable_user(nc_client: NextcloudClient): - userid = "testuser5" - password = "SecureTestPassword123!" +async def test_disable_enable_user(nc_client: NextcloudClient, test_user): + """Test disabling and enabling users.""" + user_config = test_user + userid = user_config["userid"] - await nc_client.users.create_user(userid=userid, password=password) + await nc_client.users.create_user(**user_config) # Disable user await nc_client.users.disable_user(userid) @@ -115,8 +98,7 @@ async def test_disable_enable_user(nc_client: NextcloudClient): await nc_client.users.enable_user(userid) user_details = await nc_client.users.get_user_details(userid) assert user_details.enabled - - await nc_client.users.delete_user(userid) + # Fixture will handle cleanup @pytest.mark.asyncio