test: Fix multi-user tests

This commit is contained in:
Chris Coutinho
2025-10-15 02:11:17 +02:00
parent 7a4a31b52d
commit 7004104873
8 changed files with 1559 additions and 152 deletions
+3 -1
View File
@@ -4,4 +4,6 @@ __pycache__/
*.env
.env.local
.env.*.local
.nextcloud_oauth_test_client.json
# Generated by pytest used to login users
.nextcloud_oauth_shared_test_client.json
+14 -5
View File
@@ -123,7 +123,14 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
**Automated Testing (Default - Recommended for CI/CD):**
- **Default fixtures**: `nc_oauth_client`, `nc_mcp_oauth_client` now use Playwright automation by default
- Uses Playwright headless browser automation to complete OAuth flow programmatically
- **Shared OAuth Client**: All test users authenticate using a single OAuth client (matching MCP server behavior)
- Single `client_id`/`client_secret` pair is registered and reused for all test users
- Stored in `.nextcloud_oauth_shared_test_client.json` with `force_register=False` for reuse
- Reduces OAuth client registrations and matches production MCP server architecture
- All Playwright fixtures: `playwright_oauth_token`, `nc_oauth_client`, `nc_mcp_oauth_client`, `nc_oauth_client_playwright`, `nc_mcp_oauth_client_playwright`
- Multi-user fixtures: `alice_oauth_token`, `bob_oauth_token`, `charlie_oauth_token`, `diana_oauth_token`
- All use `shared_oauth_client_credentials` fixture for consistent client credentials
- Each user gets unique access tokens via same OAuth client (like multiple users using the MCP server)
- Requires: `NEXTCLOUD_HOST`, `NEXTCLOUD_USERNAME`, `NEXTCLOUD_PASSWORD` environment variables
- Uses `pytest-playwright-asyncio` for async Playwright fixtures
- Playwright configuration: Use pytest CLI args like `--browser firefox --headed` to customize
@@ -131,13 +138,13 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
- Example:
```bash
# Run all OAuth tests with automated Playwright flow using Firefox
uv run pytest tests/integration/test_oauth*.py --browser firefox -v
uv run pytest tests/server/test_oauth*.py --browser firefox -v
# Run specific Playwright tests with visible browser for debugging
uv run pytest tests/integration/test_oauth_playwright.py --browser firefox --headed -v
uv run pytest tests/server/test_mcp_oauth.py --browser firefox --headed -v
# Run with Chromium (default)
uv run pytest tests/integration/test_oauth.py -v
uv run pytest tests/server/test_oauth*.py -v
```
**Interactive Testing (Manual browser login):**
@@ -149,18 +156,20 @@ OAuth integration tests support both **automated** (Playwright) and **interactiv
- Example:
```bash
# Run OAuth tests with interactive flow (will open browser and wait for manual login)
uv run pytest tests/integration/test_oauth_interactive.py -v
uv run pytest tests/client/test_oauth_interactive.py -v
```
**Test Environment Setup:**
- Start OAuth MCP server: `docker-compose up --build -d mcp-oauth`
- OAuth server runs on port 8001 (regular MCP on 8000)
- Both flows register OAuth clients dynamically using Nextcloud's OIDC provider
- Shared OAuth client is registered once and reused across test runs
- Client credentials cached in `.nextcloud_oauth_shared_test_client.json`
**CI/CD Considerations:**
- Interactive OAuth tests are automatically skipped when `GITHUB_ACTIONS` environment variable is set
- Automated Playwright tests will run in CI/CD environments
- Use Firefox browser in CI: `--browser firefox` (Chromium may have issues with localhost redirects)
- Shared client approach reduces test time and API calls to Nextcloud
### Configuration Files
+1 -1
View File
@@ -63,7 +63,7 @@ services:
- 127.0.0.1:8001:8001
environment:
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.01:8001
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8001
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
# No USERNAME/PASSWORD - will use OAuth
volumes:
+16 -5
View File
@@ -99,7 +99,7 @@ class DeckClient(BaseNextcloudClient):
permission_edit: bool,
permission_share: bool,
permission_manage: bool,
) -> List[DeckACL]:
) -> DeckACL:
json_data = {
"type": type,
"participant": participant,
@@ -107,10 +107,14 @@ class DeckClient(BaseNextcloudClient):
"permissionShare": permission_share,
"permissionManage": permission_manage,
}
headers = self._get_deck_headers()
response = await self._make_request(
"POST", f"/apps/deck/api/v1.0/boards/{board_id}/acl", json=json_data
"POST",
f"/apps/deck/api/v1.0/boards/{board_id}/acl",
json=json_data,
headers=headers,
)
return [DeckACL(**acl) for acl in response.json()]
return DeckACL(**response.json())
async def update_acl_rule(
self,
@@ -127,13 +131,20 @@ class DeckClient(BaseNextcloudClient):
json_data["permissionShare"] = permission_share
if permission_manage is not None:
json_data["permissionManage"] = permission_manage
headers = self._get_deck_headers()
await self._make_request(
"PUT", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}", json=json_data
"PUT",
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
json=json_data,
headers=headers,
)
async def delete_acl_rule(self, board_id: int, acl_id: int) -> None:
headers = self._get_deck_headers()
await self._make_request(
"DELETE", f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}"
"DELETE",
f"/apps/deck/api/v1.0/boards/{board_id}/acl/{acl_id}",
headers=headers,
)
async def clone_board(
+555 -140
View File
@@ -810,18 +810,75 @@ async def interactive_oauth_token(oauth_callback_server) -> str:
@pytest.fixture(scope="session")
async def playwright_oauth_token(browser) -> str:
async def shared_oauth_client_credentials():
"""
Fixture to obtain shared OAuth client credentials that will be reused for all users.
This registers a single OAuth client with Nextcloud that matches the MCP server's
registration, allowing all test users to authenticate using the same client_id/secret.
Returns:
Tuple of (client_id, client_secret, callback_url, token_endpoint, authorization_endpoint)
"""
from nextcloud_mcp_server.auth.client_registration import load_or_register_client
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("Shared OAuth client requires NEXTCLOUD_HOST")
logger.info("Setting up shared OAuth client credentials for all test users...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
# OIDC Discovery
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
discovery_response = await http_client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
token_endpoint = oidc_config.get("token_endpoint")
registration_endpoint = oidc_config.get("registration_endpoint")
authorization_endpoint = oidc_config.get("authorization_endpoint")
if not all([token_endpoint, registration_endpoint, authorization_endpoint]):
raise ValueError("OIDC discovery missing required endpoints")
# Use callback URL that won't actually be used (we extract code from browser URL)
callback_url = "http://localhost:9999/oauth/callback"
# Register or load shared OAuth client (matches MCP server registration)
client_info = await load_or_register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
storage_path=".nextcloud_oauth_shared_test_client.json",
client_name="Nextcloud MCP Server - Shared Test Client",
redirect_uris=[callback_url],
force_register=False, # Reuse existing credentials if valid
)
logger.info(f"Shared OAuth client ready: {client_info.client_id[:16]}...")
logger.info("This client will be reused for all test user authentications")
return (
client_info.client_id,
client_info.client_secret,
callback_url,
token_endpoint,
authorization_endpoint,
)
@pytest.fixture(scope="session")
async def playwright_oauth_token(browser, shared_oauth_client_credentials) -> str:
"""
Fixture to obtain an OAuth access token using Playwright headless browser automation.
This fully automates the OAuth flow by:
1. Discovering OIDC endpoints
2. Registering an OAuth client
3. Navigating to authorization URL in headless browser
4. Programmatically filling in login form
5. Handling OAuth consent
6. Extracting auth code from redirect
7. Exchanging code for access token
1. Using shared OAuth client credentials (reused across all users)
2. Navigating to authorization URL in headless browser
3. Programmatically filling in login form
4. Handling OAuth consent
5. Extracting auth code from redirect
6. Exchanging code for access token
Environment variables required:
- NEXTCLOUD_HOST: Nextcloud instance URL
@@ -844,154 +901,110 @@ async def playwright_oauth_token(browser) -> str:
"Playwright OAuth requires NEXTCLOUD_HOST, NEXTCLOUD_USERNAME, and NEXTCLOUD_PASSWORD"
)
logger.info("Starting Playwright-based OAuth flow...")
# Unpack shared client credentials
client_id, client_secret, callback_url, token_endpoint, authorization_endpoint = (
shared_oauth_client_credentials
)
# Use async httpx for all HTTP operations
async with httpx.AsyncClient(timeout=30.0) as http_client:
# OIDC Discovery
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
logger.debug(f"Fetching OIDC discovery from: {discovery_url}")
logger.info(f"Starting Playwright-based OAuth flow for {username}...")
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
discovery_response = await http_client.get(discovery_url)
discovery_response.raise_for_status()
oidc_config = discovery_response.json()
# Construct authorization URL
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={callback_url}&"
f"scope=openid%20profile%20email"
)
token_endpoint = oidc_config.get("token_endpoint")
registration_endpoint = oidc_config.get("registration_endpoint")
authorization_endpoint = oidc_config.get("authorization_endpoint")
# Async browser automation using pytest-playwright's browser fixture
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
if not all([token_endpoint, registration_endpoint, authorization_endpoint]):
raise ValueError("OIDC discovery missing required endpoints")
try:
# Navigate to authorization URL
logger.debug(f"Navigating to: {auth_url}")
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
logger.debug(f"Authorization endpoint: {authorization_endpoint}")
logger.debug(f"Token endpoint: {token_endpoint}")
# Check if we need to login first
current_url = page.url
logger.debug(f"Current URL after navigation: {current_url}")
# Register OAuth client with a callback that won't actually be used
# (we'll extract the code from the browser URL instead)
callback_url = "http://localhost:9999/oauth/callback"
# If we're on a login page, fill in credentials
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("Login page detected, filling in credentials...")
# Register client asynchronously
client_metadata = {
"client_name": "Nextcloud MCP Server - Playwright Tests",
"redirect_uris": [callback_url],
"token_endpoint_auth_method": "client_secret_post",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"scope": "openid profile email",
}
# Wait for login form
await page.wait_for_selector('input[name="user"]', timeout=10000)
reg_response = await http_client.post(
registration_endpoint,
json=client_metadata,
headers={"Content-Type": "application/json"},
)
reg_response.raise_for_status()
client_info_dict = reg_response.json()
# Fill in username and password
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
client_id = client_info_dict["client_id"]
client_secret = client_info_dict["client_secret"]
logger.debug("Credentials filled, submitting login form...")
# Construct authorization URL
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={callback_url}&"
f"scope=openid%20profile%20email"
)
# Submit the form
await page.click('button[type="submit"]')
logger.info("Opening browser for OAuth authorization...")
# Async browser automation using pytest-playwright's browser fixture
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
# Navigate to authorization URL
logger.debug(f"Navigating to: {auth_url}")
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
# Check if we need to login first
# Wait for navigation after login
await page.wait_for_load_state("networkidle", timeout=30000)
current_url = page.url
logger.debug(f"Current URL after navigation: {current_url}")
logger.info(f"After login, current URL: {current_url}")
# If we're on a login page, fill in credentials
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("Login page detected, filling in credentials...")
# Wait for login form
await page.wait_for_selector('input[name="user"]', timeout=10000)
# Fill in username and password
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
logger.debug("Credentials filled, submitting login form...")
# Submit the form
await page.click('button[type="submit"]')
# Wait for navigation after login
await page.wait_for_load_state("networkidle", timeout=30000)
current_url = page.url
logger.info(f"After login, current URL: {current_url}")
# Now we should be on the OAuth authorization/consent page or already redirected
# Check if there's an authorize button to click
try:
# Look for common authorization button patterns
authorize_button = await page.query_selector(
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
)
if authorize_button:
logger.info(
"Authorization consent page detected, clicking authorize..."
)
await authorize_button.click()
await page.wait_for_load_state("networkidle", timeout=10000)
current_url = page.url
logger.debug(f"After authorization, current_url: {current_url}")
except Exception as e:
logger.debug(
f"No authorization button found or already authorized: {e}"
)
# Wait for redirect to callback URL (which will fail to load, but we just need the URL)
try:
# The redirect might fail since localhost:9999 isn't actually running
# But we can still extract the code from the URL
await page.wait_for_url(f"{callback_url}*", timeout=10000)
except Exception as e:
# Expected - the callback URL won't load, but we should have the URL
logger.debug(f"Callback redirect (expected to fail): {e}")
# Extract auth code from URL
final_url = page.url
logger.debug(f"Final URL: {final_url}")
parsed_url = urlparse(final_url)
query_params = parse_qs(parsed_url.query)
auth_code = query_params.get("code", [None])[0]
if not auth_code:
# Take a screenshot for debugging
screenshot_path = "/tmp/playwright_oauth_error.png"
await page.screenshot(path=screenshot_path)
logger.error(f"Screenshot saved to {screenshot_path}")
raise ValueError(
f"No authorization code found in redirect URL: {final_url}"
)
logger.info(
f"Successfully extracted authorization code: {auth_code[:20]}..."
# Now we should be on the OAuth authorization/consent page or already redirected
# Check if there's an authorize button to click
try:
# Look for common authorization button patterns
authorize_button = await page.query_selector(
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
)
finally:
await context.close()
if authorize_button:
logger.info(
"Authorization consent page detected, clicking authorize..."
)
await authorize_button.click()
await page.wait_for_load_state("networkidle", timeout=10000)
current_url = page.url
logger.debug(f"After authorization, current_url: {current_url}")
except Exception as e:
logger.debug(f"No authorization button found or already authorized: {e}")
# Exchange authorization code for access token
logger.info("Exchanging authorization code for access token...")
# Wait for redirect to callback URL (which will fail to load, but we just need the URL)
try:
# The redirect might fail since localhost:9999 isn't actually running
# But we can still extract the code from the URL
await page.wait_for_url(f"{callback_url}*", timeout=10000)
except Exception as e:
# Expected - the callback URL won't load, but we should have the URL
logger.debug(f"Callback redirect (expected to fail): {e}")
# Extract auth code from URL
final_url = page.url
logger.debug(f"Final URL: {final_url}")
parsed_url = urlparse(final_url)
query_params = parse_qs(parsed_url.query)
auth_code = query_params.get("code", [None])[0]
if not auth_code:
# Take a screenshot for debugging
screenshot_path = "/tmp/playwright_oauth_error.png"
await page.screenshot(path=screenshot_path)
logger.error(f"Screenshot saved to {screenshot_path}")
raise ValueError(
f"No authorization code found in redirect URL: {final_url}"
)
logger.info(f"Successfully extracted authorization code: {auth_code[:20]}...")
finally:
await context.close()
# Exchange authorization code for access token
logger.info("Exchanging authorization code for access token...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
token_endpoint,
data={
@@ -1111,3 +1124,405 @@ async def nc_mcp_oauth_client_playwright(
logger.warning(
f"Error closing Playwright OAuth streamable HTTP client: {e}"
)
@pytest.fixture(scope="session")
async def test_users_setup(nc_client: NextcloudClient):
"""
Create test users for multi-user OAuth testing.
Creates four test users:
- alice: Owner role, creates resources
- bob: Viewer role, read-only access
- charlie: Editor role, can edit (in 'editors' group)
- diana: No-access role, no shares
"""
test_user_configs = {
"alice": {
"password": "AliceSecurePass123!",
"email": "alice@example.com",
"display_name": "Alice Owner",
"groups": [],
},
"bob": {
"password": "BobSecurePass456!",
"email": "bob@example.com",
"display_name": "Bob Viewer",
"groups": [],
},
"charlie": {
"password": "CharlieSecurePass789!",
"email": "charlie@example.com",
"display_name": "Charlie Editor",
"groups": ["editors"],
},
"diana": {
"password": "DianaSecurePass012!",
"email": "diana@example.com",
"display_name": "Diana NoAccess",
"groups": [],
},
}
logger.info("Creating test users for multi-user OAuth testing...")
created_users = []
try:
# Create the 'editors' group first (charlie needs it)
try:
# Use admin nc_client to create the group via User API
# First, try to create it (will fail if exists, but that's okay)
async with httpx.AsyncClient() as http_client:
base_url = str(nc_client._client.base_url)
# Get password from environment since nc_client doesn't expose it
password = os.getenv("NEXTCLOUD_PASSWORD")
response = await http_client.post(
f"{base_url}/ocs/v2.php/cloud/groups",
auth=(nc_client.username, password),
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={"groupid": "editors"},
)
if response.status_code in [
200,
409,
]: # 200 = created, 409 = already exists
logger.info("Editors group ready")
else:
logger.warning(
f"Group creation returned {response.status_code}: {response.text}"
)
except Exception as e:
logger.warning(f"Error creating editors group (may already exist): {e}")
# Create each test user
for username, config in test_user_configs.items():
try:
await nc_client.users.create_user(
userid=username,
password=config["password"],
display_name=config["display_name"],
email=config["email"],
)
logger.info(f"Created test user: {username}")
created_users.append(username)
# Add user to groups if specified
for group in config["groups"]:
try:
await nc_client.users.add_user_to_group(username, group)
logger.info(f"Added {username} to group {group}")
except Exception as e:
logger.warning(f"Error adding {username} to group {group}: {e}")
except Exception as e:
# User might already exist, that's okay
logger.warning(
f"Could not create user {username} (may already exist): {e}"
)
created_users.append(username) # Add to list anyway for cleanup
logger.info(f"Test users setup complete: {created_users}")
yield test_user_configs
finally:
# Cleanup: delete test users
logger.info("Cleaning up test users...")
for username in created_users:
try:
await nc_client.users.delete_user(username)
logger.info(f"Deleted test user: {username}")
except Exception as e:
logger.warning(f"Error deleting test user {username}: {e}")
async def _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, username: str, password: str
) -> str:
"""
Helper function to get OAuth access token for a user via Playwright.
Uses shared OAuth client credentials to authenticate multiple users with the same client.
Args:
browser: Playwright browser instance
shared_oauth_client_credentials: Tuple of (client_id, client_secret, callback_url, token_endpoint, authorization_endpoint)
username: Username to authenticate as
password: Password for the user
Returns:
OAuth access token string
"""
from urllib.parse import parse_qs, urlparse
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
if not nextcloud_host:
pytest.skip("OAuth requires NEXTCLOUD_HOST")
# Unpack shared client credentials
client_id, client_secret, callback_url, token_endpoint, authorization_endpoint = (
shared_oauth_client_credentials
)
logger.info(f"Getting OAuth token for user: {username}...")
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
# Construct authorization URL
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={callback_url}&"
f"scope=openid%20profile%20email"
)
logger.info(f"Performing browser OAuth flow for {username}...")
# Browser automation
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
await page.goto(auth_url, wait_until="networkidle", timeout=30000)
current_url = page.url
# Login if needed
if "/login" in current_url or "/index.php/login" in current_url:
logger.info(f"Logging in as {username}...")
await page.wait_for_selector('input[name="user"]', timeout=10000)
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
await page.click('button[type="submit"]')
await page.wait_for_load_state("networkidle", timeout=30000)
current_url = page.url
# Handle OAuth consent if present
try:
authorize_button = await page.query_selector(
'button:has-text("Authorize"), button:has-text("Allow"), input[type="submit"][value*="uthoriz"]'
)
if authorize_button:
logger.info(f"Authorizing for {username}...")
await authorize_button.click()
await page.wait_for_load_state("networkidle", timeout=10000)
except Exception as e:
logger.debug(f"No authorization needed for {username}: {e}")
# Wait for redirect and extract auth code
try:
await page.wait_for_url(f"{callback_url}*", timeout=10000)
except Exception:
pass # Expected - callback won't load
final_url = page.url
parsed_url = urlparse(final_url)
query_params = parse_qs(parsed_url.query)
auth_code = query_params.get("code", [None])[0]
if not auth_code:
raise ValueError(
f"No authorization code found for {username} in URL: {final_url}"
)
logger.info(f"Got auth code for {username}: {auth_code[:20]}...")
finally:
await context.close()
# Exchange code for token
logger.info(f"Exchanging auth code for access token ({username})...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": callback_url,
"client_id": client_id,
"client_secret": client_secret,
},
)
token_response.raise_for_status()
token_data = token_response.json()
access_token = token_data.get("access_token")
if not access_token:
raise ValueError(f"No access_token for {username}: {token_data}")
logger.info(f"Successfully obtained OAuth token for {username}")
return access_token
# Session-scoped OAuth token fixtures to avoid re-registering clients
@pytest.fixture(scope="session")
async def alice_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
"""OAuth token for alice (cached for session). Uses shared OAuth client."""
config = test_users_setup["alice"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "alice", config["password"]
)
@pytest.fixture(scope="session")
async def bob_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
"""OAuth token for bob (cached for session). Uses shared OAuth client."""
config = test_users_setup["bob"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "bob", config["password"]
)
@pytest.fixture(scope="session")
async def charlie_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
"""OAuth token for charlie (cached for session). Uses shared OAuth client."""
config = test_users_setup["charlie"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "charlie", config["password"]
)
@pytest.fixture(scope="session")
async def diana_oauth_token(
browser, shared_oauth_client_credentials, test_users_setup
) -> str:
"""OAuth token for diana (cached for session). Uses shared OAuth client."""
config = test_users_setup["diana"]
return await _get_oauth_token_for_user(
browser, shared_oauth_client_credentials, "diana", config["password"]
)
@pytest.fixture(scope="session")
async def alice_mcp_client(alice_oauth_token) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as alice (owner role)."""
token = alice_oauth_token
# Create MCP client session with proper lifecycle management
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Alice MCP client session initialized")
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing alice session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing alice streamable context: {e}")
@pytest.fixture(scope="session")
async def bob_mcp_client(bob_oauth_token) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as bob (viewer role)."""
token = bob_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Bob MCP client session initialized")
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing bob session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing bob streamable context: {e}")
@pytest.fixture(scope="session")
async def charlie_mcp_client(charlie_oauth_token) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
token = charlie_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Charlie MCP client session initialized")
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing charlie session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing charlie streamable context: {e}")
@pytest.fixture(scope="session")
async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as diana (no-access role)."""
token = diana_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Diana MCP client session initialized")
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing diana session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing diana streamable context: {e}")
+358
View File
@@ -0,0 +1,358 @@
"""
Multi-user OAuth tests for Nextcloud Deck board permissions.
Tests verify that the MCP server respects Nextcloud Deck board ACL permissions
when accessed via OAuth authentication with different users.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def add_board_acl(nc_client, board_id: int, user: str, permission_type: int = 0):
"""
Helper to add ACL entry to a Deck board.
Args:
nc_client: Admin NextcloudClient
board_id: Board ID
user: Username to grant access
permission_type: 0=view, 1=edit, 2=manage
Returns:
ACL entry ID
"""
acl = await nc_client.deck.add_acl_rule(
board_id=board_id,
type=0, # 0 = user, 1 = group
participant=user,
permission_edit=permission_type >= 1,
permission_share=permission_type >= 2,
permission_manage=permission_type >= 2,
)
logger.info(f"Added ACL for board {board_id}: {user} (type={permission_type})")
return acl.id
async def delete_board_acl(nc_client, board_id: int, acl_id: int):
"""Helper to delete a board ACL entry."""
await nc_client.deck.delete_acl_rule(board_id, acl_id)
logger.info(f"Deleted ACL {acl_id} from board {board_id}")
@pytest.mark.asyncio
async def test_deck_board_view_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that Deck boards respect view permissions.
Scenario:
1. Admin creates a board as alice
2. Admin adds bob to board with view-only permissions
3. Bob can view the board via MCP tools
4. Diana cannot access the board (no ACL entry)
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - View Test", "FF0000"
)
board_id = board.id
bob_acl_id = None
try:
# Add bob to board with view-only permission
logger.info("Adding bob to board with view permission...")
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
# Test: Bob can view the board via MCP
logger.info("Bob attempting to list boards via MCP...")
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Bob can see {len(response_data)} boards: {board_ids}")
# Bob should see the shared board
if board_id in board_ids:
logger.info(f"Bob can see shared board {board_id}")
else:
logger.warning(f"Bob cannot see shared board {board_id}")
else:
logger.warning(f"Bob could not list boards: {result.content}")
# Test: Diana cannot see the board
logger.info("Diana attempting to list boards via MCP...")
result = await diana_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Diana can see {len(response_data)} boards")
# Diana should NOT see the board
assert board_id not in board_ids, "Diana should not see board without ACL"
logger.info("Diana correctly cannot see board without ACL")
else:
logger.warning(f"Diana could not list boards: {result.content}")
finally:
# Cleanup
if bob_acl_id:
await delete_board_acl(nc_client, board_id, bob_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_board_edit_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that Deck boards respect edit permissions.
Scenario:
1. Admin creates a board as alice with a stack
2. Admin adds charlie with edit permission
3. Admin adds bob with view-only permission
4. Charlie can create cards via MCP tools
5. Bob cannot create cards
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - Edit Test", "00FF00"
)
board_id = board.id
# Create a stack in the board
logger.info("Creating stack in board...")
stack = await nc_client.deck.create_stack(board_id, "Test Stack", 1)
stack_id = stack.id
charlie_acl_id = None
bob_acl_id = None
try:
# Add charlie with edit permission
logger.info("Adding charlie to board with edit permission...")
charlie_acl_id = await add_board_acl(
nc_client, board_id, "charlie", permission_type=1
)
# Add bob with view-only permission
logger.info("Adding bob to board with view permission...")
bob_acl_id = await add_board_acl(nc_client, board_id, "bob", permission_type=0)
# Test: Charlie can create a card
logger.info("Charlie attempting to create card via MCP...")
result = await charlie_mcp_client.call_tool(
"deck_create_card",
arguments={
"board_id": board_id,
"stack_id": stack_id,
"title": "Charlie's Card",
"description": "Created by Charlie with edit permission",
},
)
if not result.isError:
response_data = json.loads(result.content[0].text)
card_id = response_data.get("id")
logger.info(f"Charlie successfully created card {card_id}")
# Cleanup the card
await nc_client.deck.delete_card(board_id, stack_id, card_id)
else:
logger.warning(f"Charlie could not create card: {result.content}")
# Test: Bob attempts to create a card (should fail)
logger.info("Bob attempting to create card via MCP...")
result = await bob_mcp_client.call_tool(
"deck_create_card",
arguments={
"board_id": board_id,
"stack_id": stack_id,
"title": "Bob's Card",
"description": "Bob trying to create a card",
},
)
if result.isError:
logger.info("Bob correctly denied card creation (view-only)")
else:
logger.warning("Bob unexpectedly succeeded in creating card")
# Cleanup if bob somehow created a card
response_data = json.loads(result.content[0].text)
if "id" in response_data:
await nc_client.deck.delete_card(
board_id, stack_id, response_data["id"]
)
finally:
# Cleanup
if charlie_acl_id:
await delete_board_acl(nc_client, board_id, charlie_acl_id)
if bob_acl_id:
await delete_board_acl(nc_client, board_id, bob_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_board_manage_permissions(
nc_client, alice_mcp_client, charlie_mcp_client
):
"""
Test that Deck boards respect manage permissions.
Scenario:
1. Admin creates a board as alice
2. Admin adds charlie with manage permission
3. Charlie can create stacks and modify board settings
"""
# Create a board as alice
logger.info("Creating Deck board as alice...")
board = await nc_client.deck.create_board(
"Alice's Shared Board - Manage Test", "0000FF"
)
board_id = board.id
charlie_acl_id = None
try:
# Add charlie with manage permission
logger.info("Adding charlie to board with manage permission...")
charlie_acl_id = await add_board_acl(
nc_client, board_id, "charlie", permission_type=2
)
# Test: Charlie can create a stack
logger.info("Charlie attempting to create stack via MCP...")
result = await charlie_mcp_client.call_tool(
"deck_create_stack",
arguments={"board_id": board_id, "title": "Charlie's Stack", "order": 1},
)
if not result.isError:
response_data = json.loads(result.content[0].text)
stack_id = response_data.get("id")
logger.info(f"Charlie successfully created stack {stack_id}")
# Cleanup the stack
await nc_client.deck.delete_stack(board_id, stack_id)
else:
logger.warning(f"Charlie could not create stack: {result.content}")
# Test: Charlie can delete a stack (manage permission)
logger.info("Charlie attempting to delete stack via MCP...")
# First create a temporary stack to delete
temp_stack = await nc_client.deck.create_stack(
board_id, "Temp Stack for Deletion", 99
)
result = await charlie_mcp_client.call_tool(
"deck_delete_stack",
arguments={"board_id": board_id, "stack_id": temp_stack.id},
)
if not result.isError:
logger.info("Charlie successfully deleted stack")
else:
logger.warning(f"Charlie could not delete stack: {result.content}")
# Cleanup if deletion via MCP failed
try:
await nc_client.deck.delete_stack(board_id, temp_stack.id)
except Exception:
pass
finally:
# Cleanup
if charlie_acl_id:
await delete_board_acl(nc_client, board_id, charlie_acl_id)
logger.info(f"Deleting board {board_id}")
await nc_client.deck.delete_board(board_id)
@pytest.mark.asyncio
async def test_deck_user_isolation(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that users can only see their own boards when not shared.
Scenario:
1. Admin creates a board as alice (not shared)
2. Admin creates a board as bob (not shared)
3. Alice can only see her own board
4. Bob can only see his own board
"""
# Create alice's board
logger.info("Creating alice's private board...")
alice_board = await nc_client.deck.create_board("Alice's Private Board", "FF00FF")
alice_board_id = alice_board.id
# Create bob's board
logger.info("Creating bob's private board...")
bob_board = await nc_client.deck.create_board("Bob's Private Board", "00FFFF")
bob_board_id = bob_board.id
try:
# Test: Alice lists boards
logger.info("Alice listing boards via MCP...")
result = await alice_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Alice can see boards: {board_ids}")
# Alice should NOT see Bob's board
assert bob_board_id not in board_ids, (
"Alice should not see Bob's private board"
)
else:
logger.warning(f"Alice could not list boards: {result.content}")
# Test: Bob lists boards
logger.info("Bob listing boards via MCP...")
result = await bob_mcp_client.call_tool("deck_get_boards", arguments={})
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list of boards
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
board_ids = [b["id"] for b in response_data]
logger.info(f"Bob can see boards: {board_ids}")
# Bob should NOT see Alice's board
assert alice_board_id not in board_ids, (
"Bob should not see Alice's private board"
)
else:
logger.warning(f"Bob could not list boards: {result.content}")
logger.info("User isolation test passed: users can only see their own boards")
finally:
# Cleanup
logger.info("Cleaning up test boards...")
await nc_client.deck.delete_board(alice_board_id)
await nc_client.deck.delete_board(bob_board_id)
+352
View File
@@ -0,0 +1,352 @@
"""
Multi-user OAuth tests for Nextcloud WebDAV file permissions.
Tests verify that the MCP server respects Nextcloud file sharing permissions
when accessed via OAuth authentication with different users.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def create_share(nc_client, path: str, share_with: str, permissions: int = 1):
"""
Helper to create a file share using OCS Sharing API.
Args:
nc_client: Admin NextcloudClient
path: Path to file/folder to share
share_with: Username to share with
permissions: Share permissions (1=read, 15=all, 19=read+write+share)
Returns:
Share ID
"""
# Use the authenticated client's internal HTTP client
response = await nc_client._client.post(
"/ocs/v2.php/apps/files_sharing/api/v1/shares",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
data={
"path": path,
"shareType": 0, # 0 = user share
"shareWith": share_with,
"permissions": permissions,
},
)
response.raise_for_status()
data = response.json()
share_id = data["ocs"]["data"]["id"]
logger.info(
f"Created share {share_id}: {path} -> {share_with} (permissions={permissions})"
)
return share_id
async def delete_share(nc_client, share_id: int):
"""Helper to delete a file share."""
response = await nc_client._client.delete(
f"/ocs/v2.php/apps/files_sharing/api/v1/shares/{share_id}",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
response.raise_for_status()
logger.info(f"Deleted share {share_id}")
@pytest.mark.asyncio
async def test_file_share_read_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that shared files respect read permissions.
Scenario:
1. Admin creates a file as alice
2. Admin shares the file with bob (read-only)
3. Bob can read the file via MCP tools
4. Diana cannot access the file (no share)
"""
# Create a file as alice
file_path = "/alice_shared_file_read.txt"
file_content = b"This file is shared with Bob for reading only."
logger.info(f"Creating file as alice: {file_path}")
# Note: We're using admin client to create file as alice
# In a real scenario, we'd need to impersonate alice or use alice's OAuth client
await nc_client.webdav.write_file(file_path, file_content)
share_id = None
try:
# Share the file with bob (read-only, permissions=1)
logger.info("Sharing file with bob (read-only)...")
share_id = await create_share(nc_client, file_path, "bob", permissions=1)
# Test: Bob reads the file via MCP
logger.info("Bob attempting to read file via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_path}
)
# Bob should be able to read the shared file
if not result.isError:
response_data = json.loads(result.content[0].text)
logger.info(
f"Bob successfully read file: {response_data.get('content', '')[:50]}..."
)
assert "content" in response_data
else:
logger.warning(f"Bob could not read file: {result.content}")
# This might fail if the share path is different for bob
# Test: Diana attempts to read the file
logger.info("Diana attempting to read file via MCP...")
result = await diana_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_path}
)
# Diana should NOT be able to read (no share)
if result.isError:
logger.info("Diana correctly denied access to unshared file")
else:
logger.warning("Diana unexpectedly could read unshared file")
finally:
# Cleanup
if share_id:
await delete_share(nc_client, share_id)
logger.info(f"Deleting file {file_path}")
await nc_client.webdav.delete_resource(file_path)
@pytest.mark.asyncio
async def test_file_share_write_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that shared files respect write permissions.
Scenario:
1. Admin creates a file as alice
2. Admin shares the file with charlie (edit permission)
3. Admin shares the file with bob (read-only)
4. Charlie can edit the file via MCP tools
5. Bob cannot edit the file
"""
# Create a file as alice
file_path = "/alice_shared_file_write.txt"
file_content = b"This file is shared with Charlie for editing."
logger.info(f"Creating file as alice: {file_path}")
await nc_client.webdav.write_file(file_path, file_content)
charlie_share_id = None
bob_share_id = None
try:
# Share with charlie (read+write, permissions=3)
logger.info("Sharing file with charlie (edit permission)...")
charlie_share_id = await create_share(
nc_client, file_path, "charlie", permissions=3
)
# Share with bob (read-only, permissions=1)
logger.info("Sharing file with bob (read-only)...")
bob_share_id = await create_share(nc_client, file_path, "bob", permissions=1)
# Test: Charlie can write to the file
logger.info("Charlie attempting to write to file via MCP...")
updated_content = (
b"This file is shared with Charlie for editing.\nCharlie added this line."
)
result = await charlie_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": updated_content.decode("utf-8")},
)
if not result.isError:
logger.info("Charlie successfully wrote to file")
else:
logger.warning(f"Charlie could not write to file: {result.content}")
# Test: Bob attempts to write (should fail)
logger.info("Bob attempting to write to file via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_write_file",
arguments={"path": file_path, "content": "Bob tries to overwrite this."},
)
# Bob should be denied
if result.isError:
logger.info("Bob correctly denied write access")
else:
logger.warning("Bob unexpectedly succeeded in writing (permissions issue?)")
finally:
# Cleanup
if charlie_share_id:
await delete_share(nc_client, charlie_share_id)
if bob_share_id:
await delete_share(nc_client, bob_share_id)
logger.info(f"Deleting file {file_path}")
await nc_client.webdav.delete_resource(file_path)
@pytest.mark.asyncio
async def test_file_list_permissions(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that file listing respects share permissions.
Scenario:
1. Admin creates alice's private file
2. Admin creates bob's private file
3. Admin creates a shared file
4. Alice can only list her own files + shared files
5. Bob can only list his own files + shared files
"""
alice_file = "/alice_private_file.txt"
bob_file = "/bob_private_file.txt"
shared_file = "/shared_file.txt"
logger.info("Creating test files...")
await nc_client.webdav.write_file(alice_file, b"Alice's private file")
await nc_client.webdav.write_file(bob_file, b"Bob's private file")
await nc_client.webdav.write_file(shared_file, b"Shared file content")
alice_share_id = None
bob_share_id = None
try:
# Share the shared file with both alice and bob
logger.info("Sharing file with alice and bob...")
alice_share_id = await create_share(
nc_client, shared_file, "alice", permissions=1
)
bob_share_id = await create_share(nc_client, shared_file, "bob", permissions=1)
# Test: Alice lists files in root
logger.info("Alice listing files via MCP...")
result = await alice_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": "/"}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Alice can see files: {file_names}")
# Alice should see her own file and shared file, but not bob's
# Note: This depends on how Nextcloud handles file ownership
else:
logger.warning(f"Alice could not list files: {result.content}")
# Test: Bob lists files in root
logger.info("Bob listing files via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": "/"}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
file_names = [f["name"] for f in response_data]
logger.info(f"Bob can see files: {file_names}")
# Bob should see his own file and shared file, but not alice's
else:
logger.warning(f"Bob could not list files: {result.content}")
finally:
# Cleanup
if alice_share_id:
await delete_share(nc_client, alice_share_id)
if bob_share_id:
await delete_share(nc_client, bob_share_id)
logger.info("Cleaning up test files...")
await nc_client.webdav.delete_resource(alice_file)
await nc_client.webdav.delete_resource(bob_file)
await nc_client.webdav.delete_resource(shared_file)
@pytest.mark.asyncio
async def test_folder_share_permissions(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that folder sharing works correctly.
Scenario:
1. Admin creates a folder as alice
2. Admin creates files in the folder
3. Admin shares the folder with bob
4. Bob can access files in the shared folder
"""
folder_path = "/alice_shared_folder"
file_in_folder = f"{folder_path}/document.txt"
file_content = b"This is a document in alice's shared folder"
logger.info(f"Creating folder: {folder_path}")
await nc_client.webdav.create_directory(folder_path)
logger.info(f"Creating file in folder: {file_in_folder}")
await nc_client.webdav.write_file(file_in_folder, file_content)
share_id = None
try:
# Share the folder with bob
logger.info("Sharing folder with bob...")
share_id = await create_share(nc_client, folder_path, "bob", permissions=1)
# Test: Bob lists the shared folder
logger.info("Bob attempting to list shared folder via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_list_directory", arguments={"path": folder_path}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
# The response is directly a list, not wrapped in a dict
if not isinstance(response_data, list):
response_data = [response_data] if response_data else []
logger.info(f"Bob can see {len(response_data)} files in shared folder")
# Bob should see the file in the shared folder
file_names = [f["name"] for f in response_data]
assert "document.txt" in file_names, (
"Bob should see the file in shared folder"
)
else:
logger.warning(f"Bob could not list shared folder: {result.content}")
# Test: Bob reads the file in the shared folder
logger.info("Bob attempting to read file in shared folder via MCP...")
result = await bob_mcp_client.call_tool(
"nc_webdav_read_file", arguments={"path": file_in_folder}
)
if not result.isError:
response_data = json.loads(result.content[0].text)
logger.info("Bob successfully read file in shared folder")
assert "content" in response_data
else:
logger.warning(
f"Bob could not read file in shared folder: {result.content}"
)
finally:
# Cleanup
if share_id:
await delete_share(nc_client, share_id)
logger.info("Cleaning up test folder...")
await nc_client.webdav.delete_resource(folder_path)
@@ -0,0 +1,260 @@
"""
Multi-user OAuth tests for Nextcloud Notes permissions.
Tests verify that the MCP server respects Nextcloud Notes sharing permissions
when accessed via OAuth authentication with different users.
"""
import json
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
@pytest.mark.asyncio
async def test_notes_share_read_permissions(
nc_client, alice_mcp_client, bob_mcp_client, diana_mcp_client
):
"""
Test that shared notes respect read permissions.
Scenario:
1. Admin creates a note as alice
2. Admin shares the note with bob (read-only)
3. Bob can read the note via MCP tools
4. Diana cannot access the note (no share)
"""
# Create a note as alice (using admin client to set up data)
note_title = "Alice's Shared Note - Read Test"
note_content = "This note is shared with Bob for reading only."
note_category = "SharedNotes"
logger.info("Creating note as alice...")
created_note = await nc_client.notes.create_note(
title=note_title, content=note_content, category=note_category
)
note_id = created_note.get("id")
try:
# TODO: Share the note with bob (read-only)
# Note: Nextcloud Notes API doesn't have direct sharing endpoints
# Sharing is typically done at the folder level via WebDAV
# For now, this test documents the expected behavior
# Test: Bob searches for notes via MCP
logger.info("Bob searching for notes via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
)
assert result.isError is False, f"Bob's search failed: {result.content}"
response_data = json.loads(result.content[0].text)
# Bob should see the shared note in search results
# (assuming proper share setup)
assert "results" in response_data
logger.info(f"Bob found {len(response_data['results'])} notes")
# Test: Diana searches for the same note
logger.info("Diana searching for notes via MCP...")
result = await diana_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": "Alice's Shared"}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
# Diana should NOT see the note (no share)
assert "results" in response_data
shared_note_ids = [
n["id"] for n in response_data["results"] if n["id"] == note_id
]
assert len(shared_note_ids) == 0, "Diana should not see unshared note"
logger.info("Diana correctly cannot see unshared note")
finally:
# Cleanup
logger.info(f"Cleaning up note {note_id}")
await nc_client.notes.delete_note(note_id)
@pytest.mark.asyncio
async def test_notes_share_write_permissions(
nc_client, alice_mcp_client, charlie_mcp_client, bob_mcp_client
):
"""
Test that shared notes respect write permissions.
Scenario:
1. Admin creates a note as alice
2. Admin shares the note with charlie (edit permission)
3. Admin shares the note with bob (read-only)
4. Charlie can edit the note via MCP tools
5. Bob cannot edit the note
"""
# Create a note as alice
note_title = "Alice's Shared Note - Write Test"
note_content = "This note is shared with Charlie for editing."
note_category = "SharedNotes"
logger.info("Creating note as alice...")
created_note = await nc_client.notes.create_note(
title=note_title, content=note_content, category=note_category
)
note_id = created_note.get("id")
try:
# TODO: Share the note with charlie (edit permission) and bob (read-only)
# Note: Nextcloud Notes sharing is folder-based
# Test: Charlie can append content to the note
logger.info("Charlie attempting to append content via MCP...")
result = await charlie_mcp_client.call_tool(
"nc_notes_append_content",
arguments={
"note_id": note_id,
"content": "\n\nCharlie added this content.",
},
)
# If sharing is properly configured, Charlie should succeed
# Without proper sharing setup, this will fail
logger.info(f"Charlie's append result: isError={result.isError}")
if not result.isError:
logger.info("Charlie successfully appended content (shares configured)")
else:
logger.warning("Charlie could not append (shares not yet configured)")
# Test: Bob attempts to append content (should fail)
logger.info("Bob attempting to append content via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_append_content",
arguments={"note_id": note_id, "content": "\n\nBob tried to add this."},
)
# Bob should fail (read-only access)
logger.info(f"Bob's append result: isError={result.isError}")
if result.isError:
logger.info("Bob correctly denied write access")
else:
logger.warning("Bob unexpectedly succeeded (permissions issue?)")
finally:
# Cleanup
logger.info(f"Cleaning up note {note_id}")
await nc_client.notes.delete_note(note_id)
@pytest.mark.asyncio
async def test_user_isolation_notes(nc_client, alice_mcp_client, bob_mcp_client):
"""
Test that users can only see their own notes when not shared.
Scenario:
1. Admin creates a note as alice (not shared)
2. Admin creates a note as bob (not shared)
3. Alice can only see her own note
4. Bob can only see his own note
"""
# Create alice's note
logger.info("Creating alice's private note...")
alice_note = await nc_client.notes.create_note(
title="Alice's Private Note",
content="This is Alice's private content.",
category="AlicePrivate",
)
alice_note_id = alice_note.get("id")
# Create bob's note
logger.info("Creating bob's private note...")
bob_note = await nc_client.notes.create_note(
title="Bob's Private Note",
content="This is Bob's private content.",
category="BobPrivate",
)
bob_note_id = bob_note.get("id")
try:
# Test: Alice searches all notes
logger.info("Alice searching all notes via MCP...")
result = await alice_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
alice_notes = response_data.get("results", [])
alice_note_ids = [n["id"] for n in alice_notes]
logger.info(f"Alice can see {len(alice_notes)} notes")
# Alice should NOT see Bob's note
assert bob_note_id not in alice_note_ids, (
"Alice should not see Bob's private note"
)
# Test: Bob searches all notes
logger.info("Bob searching all notes via MCP...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False
response_data = json.loads(result.content[0].text)
bob_notes = response_data.get("results", [])
bob_note_ids = [n["id"] for n in bob_notes]
logger.info(f"Bob can see {len(bob_notes)} notes")
# Bob should NOT see Alice's note
assert alice_note_id not in bob_note_ids, (
"Bob should not see Alice's private note"
)
logger.info("User isolation test passed: users can only see their own notes")
finally:
# Cleanup
logger.info("Cleaning up test notes...")
await nc_client.notes.delete_note(alice_note_id)
await nc_client.notes.delete_note(bob_note_id)
@pytest.mark.asyncio
async def test_oauth_mcp_clients_initialized(
alice_mcp_client, bob_mcp_client, charlie_mcp_client, diana_mcp_client
):
"""
Smoke test to verify all OAuth MCP clients are properly initialized.
"""
logger.info("Testing alice_mcp_client initialization...")
result = await alice_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Alice MCP client failed: {result.content}"
logger.info("Alice MCP client working")
logger.info("Testing bob_mcp_client initialization...")
result = await bob_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Bob MCP client failed: {result.content}"
logger.info("Bob MCP client working")
logger.info("Testing charlie_mcp_client initialization...")
result = await charlie_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Charlie MCP client failed: {result.content}"
logger.info("Charlie MCP client working")
logger.info("Testing diana_mcp_client initialization...")
result = await diana_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Diana MCP client failed: {result.content}"
logger.info("Diana MCP client working")
logger.info("All OAuth MCP clients successfully initialized!")