diff --git a/tests/conftest.py b/tests/conftest.py index 5a5b026..61d953d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2351,32 +2351,41 @@ async def test_users_setup(anyio_backend, nc_client: NextcloudClient): except Exception as e: logger.warning(f"Error creating editors group (may already exist): {e}") - # Create each test user + # Create each test user (idempotent - check if exists first) for username, config in test_user_configs.items(): + # Check if user already exists + user_exists = False try: - await nc_client.users.create_user( - userid=username, - password=config["password"], - display_name=config["display_name"], - email=config["email"], - ) - logger.info(f"Created test user: {username}") - created_users.append(username) + await nc_client.users.get_user_details(username) + user_exists = True + logger.info(f"Test user {username} already exists, skipping creation") + except Exception: + # User doesn't exist, proceed with creation + pass - # Add user to groups if specified - for group in config["groups"]: - try: - await nc_client.users.add_user_to_group(username, group) - logger.info(f"Added {username} to group {group}") - except Exception as e: - logger.warning(f"Error adding {username} to group {group}: {e}") + if not user_exists: + try: + await nc_client.users.create_user( + userid=username, + password=config["password"], + display_name=config["display_name"], + email=config["email"], + ) + logger.info(f"Created test user: {username}") + created_users.append(username) # Only track users WE created - except Exception as e: - # User might already exist, that's okay - logger.warning( - f"Could not create user {username} (may already exist): {e}" - ) - created_users.append(username) # Add to list anyway for cleanup + # Add user to groups if specified + for group in config["groups"]: + try: + await nc_client.users.add_user_to_group(username, group) + logger.info(f"Added {username} to group {group}") + except Exception as e: + logger.warning( + f"Error adding {username} to group {group}: {e}" + ) + + except Exception as e: + logger.warning(f"Could not create user {username}: {e}") logger.info(f"Test users setup complete: {created_users}") yield test_user_configs diff --git a/tests/integration/test_astrolabe_multi_user_background_sync.py b/tests/integration/test_astrolabe_multi_user_background_sync.py index 7045aa1..5640f25 100644 --- a/tests/integration/test_astrolabe_multi_user_background_sync.py +++ b/tests/integration/test_astrolabe_multi_user_background_sync.py @@ -43,8 +43,19 @@ async def login_to_nextcloud(page: Page, username: str, password: str): await page.fill('input[name="user"]', username) await page.fill('input[name="password"]', password) - # Submit form - await page.click('button[type="submit"]') + # Submit form - use force=True to bypass stability check (CSS transitions) + submit_button = page.locator('button[type="submit"]') + try: + await submit_button.click(force=True, timeout=10000) + except Exception: + # Fallback: JavaScript click + logger.info("Using JavaScript click for login button...") + await page.evaluate( + """ + const btn = document.querySelector('button[type="submit"]'); + if (btn) btn.click(); + """ + ) await page.wait_for_load_state("networkidle", timeout=30000) # Verify logged in (should redirect away from login page) @@ -75,6 +86,289 @@ async def navigate_to_astrolabe_settings(page: Page): logger.info("✓ Successfully loaded Astrolabe settings page") +async def authorize_search_access(page: Page, username: str) -> bool: + """Complete Step 1: OAuth Authorization for Astrolabe. + + Handles the OAuth flow: + 1. Check if already authorized (Step 1 shows "Complete") + 2. Click "Authorize" link + 3. Handle Nextcloud OIDC consent screen + 4. Wait for redirect back to Astrolabe settings + 5. Verify "Complete" badge appears on Step 1 + + Args: + page: Playwright page instance (must be on Astrolabe settings page) + username: Username for logging + + Returns: + True if authorization completed successfully + """ + nextcloud_url = "http://localhost:8080" + + logger.info(f"Authorizing search access (Step 1) for {username}...") + + # Check if already on Astrolabe settings page, if not navigate there + if "/settings/user/astrolabe" not in page.url: + await navigate_to_astrolabe_settings(page) + + # Wait for page to fully render + await anyio.sleep(1) + + # Check if already authorized (either "Active" badge or Step 1 "Complete" badge) + try: + # Check for "Active" badge (fully configured state) + active_badge = page.get_by_text("Active", exact=True) + if await active_badge.count() > 0 and await active_badge.is_visible(): + logger.info(f"✓ Already fully authorized for {username} (Active badge)") + return True + except Exception: + pass + + try: + step1_section = page.locator('h4:has-text("Step 1")') + if await step1_section.count() > 0: + # Look for "Complete" text in the Step 1 section's parent + step1_parent = step1_section.locator("..") + complete_badge = step1_parent.get_by_text("Complete", exact=True) + if await complete_badge.count() > 0 and await complete_badge.is_visible(): + logger.info(f"✓ Step 1 already complete for {username}") + return True + except Exception: + pass + + # Find and click the "Authorize" button + authorize_button = page.locator('a.button.primary:has-text("Authorize")') + + try: + await authorize_button.wait_for(timeout=5000, state="visible") + logger.info(f"Found Authorize button for {username}") + except Exception: + # Take screenshot for debugging + screenshot_path = f"/tmp/astrolabe_no_authorize_button_{username}.png" + await page.screenshot(path=screenshot_path) + logger.error( + f"Could not find Authorize button for {username}. Screenshot: {screenshot_path}" + ) + raise ValueError(f"Authorize button not found for {username}") + + # Click the Authorize button - this will redirect to OAuth provider + # Use force=True to bypass stability check which can timeout due to CSS transitions + await authorize_button.click(force=True) + logger.info(f"Clicked Authorize button for {username}") + + # Wait for OAuth redirect to complete + await page.wait_for_load_state("networkidle", timeout=30000) + logger.info(f"After networkidle, current URL: {page.url}") + + # Take screenshot to see current state + await page.screenshot(path=f"/tmp/astrolabe_after_authorize_{username}.png") + logger.info(f"Screenshot saved: /tmp/astrolabe_after_authorize_{username}.png") + + # Handle OIDC consent screen if present + consent_handled = await _handle_oauth_consent_screen(page, username) + if consent_handled: + logger.info(f"✓ OAuth consent granted for {username}") + else: + logger.info( + f"No consent screen required for {username} (may be previously authorized)" + ) + + # Wait for redirect back to Astrolabe settings + # The OAuth callback will redirect back to /settings/user/astrolabe + try: + await page.wait_for_url( + f"**{nextcloud_url}/settings/user/astrolabe**", timeout=30000 + ) + logger.info(f"Redirected back to Astrolabe settings for {username}") + except Exception: + # Check if we're already on settings page + if "/settings/user/astrolabe" not in page.url: + logger.warning( + f"Not redirected to Astrolabe settings, current URL: {page.url}" + ) + # Navigate manually + await page.goto( + f"{nextcloud_url}/settings/user/astrolabe", wait_until="networkidle" + ) + + # Wait for page to reload and render + await anyio.sleep(2) + + # Verify authorization completed - check for various success indicators + # When fully configured, shows "Active" badge; when only Step 1 done, shows "Complete" + try: + # First check if "Active" badge is shown (fully configured state) + active_badge = page.get_by_text("Active", exact=True) + if await active_badge.count() > 0 and await active_badge.is_visible(): + logger.info(f"✓ OAuth authorization complete for {username} (Active badge)") + return True + except Exception: + pass + + try: + # Check for Step 1 "Complete" badge (partial configuration) + step1_section = page.locator('h4:has-text("Step 1")') + if await step1_section.count() > 0: + step1_parent = step1_section.locator("..") + complete_badge = step1_parent.get_by_text("Complete", exact=True) + await complete_badge.wait_for(timeout=5000, state="visible") + logger.info(f"✓ Step 1 OAuth authorization complete for {username}") + return True + except Exception: + pass + + # Neither badge found - authorization failed + screenshot_path = f"/tmp/astrolabe_step1_not_complete_{username}.png" + await page.screenshot(path=screenshot_path) + logger.error( + f"Authorization badge not visible for {username}. Screenshot: {screenshot_path}" + ) + raise ValueError(f"OAuth authorization did not complete for {username}") + + +async def _handle_oauth_consent_screen(page: Page, username: str) -> bool: + """Handle the OIDC consent screen during OAuth flow. + + Reuses the proven pattern from tests/conftest.py. + + Args: + page: Playwright page instance + username: Username for logging + + Returns: + True if consent was handled, False if no consent screen was found + """ + try: + logger.info(f"Checking for consent screen at URL: {page.url}") + + # Check if consent screen is present - try multiple selectors + # The consent screen may be #oidc-consent or use a different format + consent_div = await page.query_selector("#oidc-consent") + + if consent_div: + logger.info(f"Consent screen detected via #oidc-consent for {username}") + # Get consent screen data attributes for logging + client_name = await consent_div.get_attribute("data-client-name") + scopes_attr = await consent_div.get_attribute("data-scopes") + logger.info(f" Client: {client_name}") + logger.info(f" Requested scopes: {scopes_attr}") + else: + # Check for Allow button directly (different consent screen format) + allow_button = page.locator('button:has-text("Allow")') + if await allow_button.count() > 0: + logger.info(f"Consent screen detected via Allow button for {username}") + else: + logger.info(f"No consent screen found for {username} at {page.url}") + await page.screenshot(path=f"/tmp/no_consent_screen_{username}.png") + logger.info(f"Screenshot: /tmp/no_consent_screen_{username}.png") + return False + + # Wait for Vue.js to render the Allow button + try: + await page.wait_for_selector('button:has-text("Allow")', timeout=10000) + logger.info(" Allow button rendered by Vue.js") + except Exception as e: + screenshot_path = f"/tmp/consent_no_allow_button_{username}.png" + await page.screenshot(path=screenshot_path) + logger.error(f" Timeout waiting for Allow button: {e}") + raise + + # Check all scope checkboxes + scope_checkboxes = await page.query_selector_all('input[type="checkbox"]') + if scope_checkboxes: + logger.info(f" Found {len(scope_checkboxes)} scope checkboxes") + for i, checkbox in enumerate(scope_checkboxes): + is_checked = await checkbox.is_checked() + is_disabled = await checkbox.is_disabled() + if not is_checked and not is_disabled: + await checkbox.check() + logger.info(f" ✓ Checked scope checkbox {i + 1}") + + # Click the Allow button using JavaScript (handles viewport issues) + allow_button_locator = page.locator('button:has-text("Allow")') + + # Debug: take screenshot before clicking Allow + await page.screenshot(path=f"/tmp/consent_before_allow_{username}.png") + logger.info( + f" Screenshot before Allow: /tmp/consent_before_allow_{username}.png" + ) + + button_count = await allow_button_locator.count() + logger.info(f" Found {button_count} Allow button(s)") + + if button_count > 0: + current_url = page.url + logger.info(f" Current URL: {current_url}") + logger.info(f" Clicking Allow button for {username}...") + + # Use JavaScript click to handle consent buttons (proven pattern from conftest.py) + # This is more reliable than Playwright's click for Vue.js rendered buttons + await page.evaluate( + """ + const buttons = document.querySelectorAll('button'); + for (const btn of buttons) { + if (btn.textContent.trim() === 'Allow') { + btn.click(); + break; + } + } + """ + ) + + # Wait for URL to change (Vue.js uses window.location.href after fetch) + # networkidle doesn't detect fetch-based redirects + try: + await page.wait_for_url( + lambda url: url != current_url, + timeout=30000, + ) + logger.info(f" URL changed to: {page.url}") + except Exception as wait_error: + # If URL didn't change, check console for errors + logger.warning(f" URL didn't change after click: {wait_error}") + await page.screenshot(path=f"/tmp/consent_after_allow_{username}.png") + + # Try alternative: manually POST consent and navigate + logger.info(" Trying manual consent submission...") + try: + redirect_url = await page.evaluate( + """ + async () => { + const selectedScopes = Array.from(document.querySelectorAll('input[type="checkbox"]:checked')) + .map(cb => cb.value).join(' '); + + const response = await fetch('/index.php/apps/oidc/consent/grant', { + method: 'POST', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'requesttoken': OC.requestToken, + }, + body: 'scopes=' + encodeURIComponent(selectedScopes), + redirect: 'follow', + }); + + return response.url || '/index.php/apps/oidc/authorize'; + } + """ + ) + logger.info(f" Manual consent returned URL: {redirect_url}") + await page.goto(redirect_url, wait_until="networkidle") + except Exception as manual_error: + logger.error(f" Manual consent also failed: {manual_error}") + raise + + await page.screenshot(path=f"/tmp/consent_after_allow_{username}.png") + logger.info(f" Consent granted for {username}") + return True + else: + logger.error(f" Allow button not found for {username}") + return False + + except Exception as e: + logger.error(f"Error handling consent screen for {username}: {e}") + raise + + async def generate_app_password( page: Page, username: str, app_name: str = "Astrolabe Background Sync" ) -> str: @@ -105,16 +399,32 @@ async def generate_app_password( await anyio.sleep(1.0) logger.info("Waited for Vue.js to process input and enable button") - # Click the create button + # Click the create button - use force=True to bypass stability check (CSS transitions) create_button = page.locator( 'button[type="submit"]:has-text("Create new app password")' ) - await create_button.click() + try: + await create_button.click(force=True, timeout=10000) + except Exception: + # Fallback: JavaScript click + logger.info("Using JavaScript click for create button...") + await page.evaluate( + """ + const btn = document.querySelector('button[type="submit"]'); + if (btn) btn.click(); + """ + ) logger.info("Clicked create app password button") # Wait for app password to be generated and displayed in the dialog await anyio.sleep(3) # Give it more time to generate and display + # Debug screenshot after clicking create + await page.screenshot(path=f"/tmp/app_password_after_create_{username}.png") + logger.info( + f"Screenshot after create: /tmp/app_password_after_create_{username}.png" + ) + # Find the Login input field which should have the username value # Then find the Password input field which is in the same form app_password = None @@ -172,11 +482,11 @@ async def generate_app_password( f"✓ Generated app password for {username}: {app_password[:10]}... (validated)" ) - # Close the dialog by clicking the Close button - close_button = page.get_by_role("button", name="Close") - await close_button.click() + # Close dialog with Escape key (bypasses CSS layout issues with h2 intercepting clicks) + logger.info("Closing app password dialog with Escape key...") + await page.keyboard.press("Escape") + await anyio.sleep(0.5) # Wait for dialog close animation logger.info("Closed app password dialog") - await anyio.sleep(0.5) return app_password @@ -226,9 +536,9 @@ async def enable_background_sync_via_app_password( # Wait for page to load await anyio.sleep(1) - # Check if already active (look for "Active" text in the Background Sync Access section) + # Check if already complete (look for Step 2 "Complete" badge or overall "Active" state) try: - # The "Active" badge appears as a with text "Active" + # First check for overall "Active" badge (both steps complete) active_text = page.get_by_text("Active", exact=True) if await active_text.is_visible(timeout=2000): logger.info(f"✓ Background sync already active for {username}") @@ -236,6 +546,18 @@ async def enable_background_sync_via_app_password( except Exception: pass + try: + # Check for Step 2 "Complete" badge (app password already set) + step2_section = page.locator('h4:has-text("Step 2")') + if await step2_section.count() > 0: + step2_parent = step2_section.locator("..") + complete_badge = step2_parent.get_by_text("Complete", exact=True) + if await complete_badge.count() > 0 and await complete_badge.is_visible(): + logger.info(f"✓ Step 2 (app password) already complete for {username}") + return True + except Exception: + pass + # Find the app password input field using the placeholder text # Based on manual testing: textbox with placeholder "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx" app_password_input = page.get_by_placeholder("xxxxx-xxxxx-xxxxx-xxxxx-xxxxx") @@ -319,21 +641,120 @@ async def enable_background_sync_via_app_password( except Exception: pass - # Verify "Active" text appears after reload + # Verify Step 2 "Complete" badge or overall "Active" badge appears after reload + try: + # First try to find "Active" badge (both steps complete) + active_text = page.get_by_text("Active", exact=True) + if await active_text.count() > 0: + await active_text.wait_for(timeout=5000, state="visible") + logger.info( + f"✓ Background sync enabled for {username} - Active badge visible" + ) + return True + except Exception: + pass + + try: + # Check for Step 2 "Complete" badge + step2_section = page.locator('h4:has-text("Step 2")') + if await step2_section.count() > 0: + step2_parent = step2_section.locator("..") + complete_badge = step2_parent.get_by_text("Complete", exact=True) + await complete_badge.wait_for(timeout=5000, state="visible") + logger.info( + f"✓ Step 2 (app password) enabled for {username} - Complete badge visible" + ) + return True + except Exception: + pass + + # If neither badge found, raise error + screenshot_path = f"/tmp/astrolabe_after_password_{username}.png" + await page.screenshot(path=screenshot_path) + logger.error( + f"Neither Active nor Complete badge appeared for {username}. " + f"Screenshot: {screenshot_path}" + ) + raise ValueError(f"Background sync setup did not complete for {username}") + + +async def complete_astrolabe_authorization( + page: Page, username: str, password: str +) -> dict: + """Complete full Astrolabe two-step authorization. + + Performs the complete authorization flow: + 1. Navigate to Astrolabe settings + 2. OAuth authorization (Step 1) if needed + 3. Generate app password in Security settings + 4. App password entry (Step 2) if needed + + Args: + page: Playwright page instance (must be logged in) + username: Nextcloud username + password: Nextcloud password (for reference, not used directly) + + Returns: + Dict with {"step1": bool, "step2": bool, "app_password": str | None} + """ + logger.info(f"Starting full Astrolabe authorization for {username}...") + + result = {"step1": False, "step2": False, "app_password": None} + + # Navigate to Astrolabe settings + await navigate_to_astrolabe_settings(page) + + # Step 1: OAuth authorization + try: + result["step1"] = await authorize_search_access(page, username) + logger.info(f"✓ Step 1 complete for {username}") + except Exception as e: + logger.error(f"Step 1 failed for {username}: {e}") + raise + + # Navigate back to settings if needed (OAuth might have redirected elsewhere) + if "/settings/user/astrolabe" not in page.url: + await navigate_to_astrolabe_settings(page) + + # Check if Step 2 is already complete + try: + step2_section = page.locator('h4:has-text("Step 2")') + if await step2_section.count() > 0: + step2_parent = step2_section.locator("..") + complete_badge = step2_parent.get_by_text("Complete", exact=True) + if await complete_badge.count() > 0 and await complete_badge.is_visible(): + logger.info(f"✓ Step 2 already complete for {username}") + result["step2"] = True + return result + except Exception: + pass + + # Also check for overall "Active" badge try: active_text = page.get_by_text("Active", exact=True) - await active_text.wait_for(timeout=5000, state="visible") - logger.info(f"✓ Background sync enabled for {username} - Active badge visible") - return True + if await active_text.count() > 0 and await active_text.is_visible(): + logger.info(f"✓ Authorization already fully active for {username}") + result["step2"] = True + return result except Exception: - # Take screenshot for debugging - screenshot_path = f"/tmp/astrolabe_after_password_{username}.png" - await page.screenshot(path=screenshot_path) - logger.error( - f"Active badge did not appear for {username}. Screenshot: {screenshot_path}" + pass + + # Step 2: Generate app password and enter it + app_password = await generate_app_password(page, username) + result["app_password"] = app_password + + try: + result["step2"] = await enable_background_sync_via_app_password( + page, username, app_password ) + logger.info(f"✓ Step 2 complete for {username}") + except Exception as e: + logger.error(f"Step 2 failed for {username}: {e}") raise + logger.info(f"✓ Full Astrolabe authorization complete for {username}") + return result + async def verify_app_password_created(username: str) -> bool: """Verify that background sync app password was stored for the user. diff --git a/tests/integration/test_astrolabe_plotly_visualization.py b/tests/integration/test_astrolabe_plotly_visualization.py new file mode 100644 index 0000000..2a9a93b --- /dev/null +++ b/tests/integration/test_astrolabe_plotly_visualization.py @@ -0,0 +1,371 @@ +"""Integration test for Astrolabe Plotly 3D visualization with multi-user BasicAuth mode. + +This test verifies that: +1. User can provision background sync access via app password +2. Content created via MCP tools is indexed by vector sync +3. Semantic search via Astrolabe UI returns results +4. Plotly 3D visualization container renders correctly + +Requires: +- docker-compose up -d app db mcp-multi-user-basic +- ENABLE_SEMANTIC_SEARCH=true on the mcp-multi-user-basic container +""" + +import base64 +import json +import logging +import re +import uuid + +import anyio +import pytest +from playwright.async_api import Page + +# Import helper functions from existing test +from tests.conftest import create_mcp_client_session +from tests.integration.test_astrolabe_multi_user_background_sync import ( + complete_astrolabe_authorization, + login_to_nextcloud, +) + +logger = logging.getLogger(__name__) + +pytestmark = [pytest.mark.integration, pytest.mark.oauth] + + +async def wait_for_vector_sync( + mcp_client, initial_indexed_count: int, timeout_seconds: int = 60 +) -> tuple[bool, dict | None]: + """Wait for vector sync to index new content. + + Args: + mcp_client: MCP client session + initial_indexed_count: Initial indexed document count before creating content + timeout_seconds: Maximum time to wait for sync + + Returns: + Tuple of (success, status_data) + """ + wait_interval = 2 + waited = 0 + status_data = None + + while waited < timeout_seconds: + sync_status = await mcp_client.call_tool("nc_get_vector_sync_status", {}) + if sync_status.isError: + logger.warning(f"Vector sync status error: {sync_status}") + return False, None + + status_data = json.loads(sync_status.content[0].text) + indexed_count = status_data.get("indexed_count", 0) + pending_count = status_data.get("pending_count", 1) + + logger.info( + f"Sync status at {waited}s: indexed={indexed_count}, " + f"pending={pending_count}, status={status_data.get('status')}" + ) + + if indexed_count > initial_indexed_count and pending_count == 0: + logger.info( + f"✓ Sync complete: {indexed_count} documents indexed " + f"(was {initial_indexed_count})" + ) + return True, status_data + + await anyio.sleep(wait_interval) + waited += wait_interval + + return False, status_data + + +async def navigate_to_astrolabe_main(page: Page): + """Navigate to Astrolabe main app page (Semantic Search section). + + Args: + page: Playwright page instance (must be authenticated) + """ + nextcloud_url = "http://localhost:8080" + + logger.info("Navigating to Astrolabe main app...") + await page.goto(f"{nextcloud_url}/apps/astrolabe", wait_until="networkidle") + + # Wait for the app to load + await anyio.sleep(1) + + logger.info("✓ Successfully loaded Astrolabe main app") + + +@pytest.mark.integration +@pytest.mark.oauth +@pytest.mark.timeout( + 300 +) # 5 minutes - this test involves OAuth, app password, and vector sync +async def test_astrolabe_plotly_visualization_with_basic_auth( + browser, + test_users_setup, + configure_astrolabe_for_mcp_server, +): + """Test Plotly 3D visualization in Astrolabe with multi-user BasicAuth mode. + + This test: + 1. Configures Astrolabe for the mcp-multi-user-basic service + 2. Provisions background sync access for alice via app password + 3. Creates a note with unique searchable content (as alice) + 4. Waits for vector sync to index the note + 5. Performs semantic search in Astrolabe UI + 6. Verifies the Plotly visualization renders and results are displayed + """ + # Phase 1: Configure Astrolabe for mcp-multi-user-basic + await configure_astrolabe_for_mcp_server( + mcp_server_internal_url="http://mcp-multi-user-basic:8000", + mcp_server_public_url="http://localhost:8003", + ) + + username = "alice" + password = test_users_setup[username]["password"] + note_id = None + unique_term = None + + # Create MCP client with alice's credentials for the multi-user BasicAuth server + credentials = base64.b64encode(f"{username}:{password}".encode()).decode("utf-8") + auth_header = f"Basic {credentials}" + + context = await browser.new_context(ignore_https_errors=True) + page = await context.new_page() + + try: + # Phase 2: Complete full Astrolabe authorization (OAuth + app password) + await login_to_nextcloud(page, username, password) + auth_result = await complete_astrolabe_authorization(page, username, password) + logger.info(f"Authorization result: {auth_result}") + + # Create MCP client session as alice - all MCP operations inside this block + async for alice_mcp_client in create_mcp_client_session( + url="http://localhost:8003/mcp", + headers={"Authorization": auth_header}, + client_name="Alice BasicAuth MCP", + ): + # Phase 3: Get initial indexed count + initial_sync = await alice_mcp_client.call_tool( + "nc_get_vector_sync_status", {} + ) + + if initial_sync.isError: + pytest.skip("Vector sync not enabled on mcp-multi-user-basic") + + initial_data = json.loads(initial_sync.content[0].text) + initial_count = initial_data.get("indexed_count", 0) + logger.info(f"Initial indexed count: {initial_count}") + + # Create note with unique searchable term + unique_term = f"plotly_viz_test_{uuid.uuid4().hex[:8]}" + note_response = await alice_mcp_client.call_tool( + "nc_notes_create_note", + { + "title": f"Visualization Test Note {unique_term}", + "content": f"""# Testing Plotly Visualization + +This note contains the unique term: {unique_term} + +It is used to test the 3D vector space visualization in the Astrolabe app. +The visualization should show this document as a point in PCA-reduced space. + +## Key Features +- Semantic search with embeddings +- PCA dimension reduction to 3D +- Interactive Plotly scatter3d plot +""", + "category": "Test", + }, + ) + + if note_response.isError: + pytest.fail(f"Failed to create test note: {note_response}") + + note_data = json.loads(note_response.content[0].text) + note_id = note_data.get("id") + logger.info(f"Created test note ID: {note_id}") + + # Phase 4: Wait for vector indexing + sync_complete, status = await wait_for_vector_sync( + alice_mcp_client, initial_count, timeout_seconds=90 + ) + assert sync_complete, f"Vector sync did not complete in time: {status}" + + # Phase 5: Navigate to Astrolabe and perform search + await navigate_to_astrolabe_main(page) + + # Fill search query - find the Astrolabe search input specifically + # The NcTextField component wraps the input in a div with class mcp-search-input + search_input = page.locator(".mcp-search-input input") + await search_input.wait_for(timeout=10000, state="visible") + await search_input.fill(unique_term) + logger.info(f"Entered search query: {unique_term}") + + # Trigger search by pressing Enter on the input field + # This is wired to performSearch via @keyup.enter in the Vue component + await search_input.press("Enter") + logger.info("Pressed Enter to trigger search") + + # Wait for loading to complete - watch for loading indicator to disappear + loading_indicator = page.locator(".mcp-loading") + try: + # If loading indicator appears, wait for it to disappear + if await loading_indicator.count() > 0: + await loading_indicator.wait_for(state="hidden", timeout=30000) + logger.info("Loading completed") + except Exception: + # Loading might be too fast to catch + pass + + # Brief wait for UI to settle + await anyio.sleep(1) + + # Take diagnostic screenshot + await page.screenshot(path="/tmp/astrolabe_search_after_click.png") + logger.info( + "Took diagnostic screenshot: /tmp/astrolabe_search_after_click.png" + ) + + # Wait for search results using text-based detection + # This is more reliable than class-based selectors + # The UI shows "N results" when search completes successfully + results_text_pattern = page.get_by_text(re.compile(r"\d+ results?")) + no_results_text = page.get_by_text("No results found") + error_note = page.locator(".mcp-error") + + # Wait for one of: results count, no results message, or error + try: + # Poll for results or error states (don't rely on Nextcloud core CSS classes) + found_state = False + for attempt in range(60): # 60 attempts, 500ms each = 30s total + if await error_note.count() > 0: + error_text = await error_note.text_content() + logger.error(f"Search error: {error_text}") + pytest.fail(f"Search failed with error: {error_text}") + + if await no_results_text.count() > 0: + logger.warning( + "No results found - vector sync may not have completed" + ) + await page.screenshot(path="/tmp/astrolabe_no_results.png") + pytest.fail( + f"Search returned no results for '{unique_term}'. " + "Check if vector sync completed for alice's content." + ) + + if await results_text_pattern.count() > 0: + results_text = await results_text_pattern.first.text_content() + logger.info(f"Found results: {results_text}") + found_state = True + break + + if attempt % 10 == 0: + logger.info( + f"Waiting for results... (attempt {attempt + 1}/60)" + ) + + await anyio.sleep(0.5) + + if not found_state: + await page.screenshot(path="/tmp/astrolabe_search_timeout.png") + page_content = await page.content() + logger.error(f"Search state not resolved. Page URL: {page.url}") + logger.error(f"Page content snippet: {page_content[:2000]}") + raise AssertionError("Search did not complete within timeout") + + except AssertionError: + raise # Re-raise AssertionError as-is + except Exception as e: + # Take another screenshot and get page content for debugging + await page.screenshot(path="/tmp/astrolabe_search_timeout.png") + page_content = await page.content() + logger.error(f"Search state not resolved. Page URL: {page.url}") + logger.error(f"Page content snippet: {page_content[:2000]}") + raise AssertionError(f"Search did not complete: {e}") + + logger.info("Results loaded") + + # Phase 6: Verify visualization + # Check Plotly container is visible + viz_plot = page.locator("#viz-plot") + await viz_plot.wait_for(timeout=15000, state="visible") + logger.info("Plotly container is visible") + + # Verify Plotly has rendered content (SVG/canvas elements inside) + has_viz_content = await page.evaluate( + """ + () => { + const plot = document.getElementById('viz-plot'); + if (!plot) return false; + // Plotly creates .plotly class, canvas, or svg elements + return plot.children.length > 0 || + plot.querySelector('.plotly, canvas, svg, .main-svg') !== null; + } + """ + ) + assert has_viz_content, "Plotly visualization did not render any content" + logger.info("✓ Plotly visualization rendered content") + + # Verify results are displayed + result_items = page.locator(".mcp-result-item") + result_count = await result_items.count() + assert result_count > 0, "No search results displayed" + logger.info(f"✓ Found {result_count} search result(s)") + + # Verify our note appears in results + found_note = False + for i in range(result_count): + item = result_items.nth(i) + title_elem = item.locator(".mcp-result-title") + title_text = await title_elem.text_content() + if title_text and unique_term in title_text: + found_note = True + logger.info(f"✓ Found test note in results: {title_text}") + break + + assert found_note, f"Created note with '{unique_term}' not found in results" + + # Optional: Take screenshot for verification + await page.screenshot(path="/tmp/astrolabe_plotly_test_success.png") + logger.info("✓ All Plotly visualization assertions passed") + + # Cleanup: delete the created note (inside the MCP client context) + if note_id: + try: + delete_response = await alice_mcp_client.call_tool( + "nc_notes_delete_note", {"note_id": note_id} + ) + if not delete_response.isError: + logger.info(f"✓ Cleaned up test note {note_id}") + note_id = None # Mark as cleaned + else: + logger.warning( + f"Failed to delete note {note_id}: {delete_response}" + ) + except Exception as e: + logger.warning(f"Cleanup failed for note {note_id}: {e}") + + finally: + # Cleanup note if not already cleaned (create new client for cleanup) + if note_id: + try: + async for cleanup_client in create_mcp_client_session( + url="http://localhost:8003/mcp", + headers={"Authorization": auth_header}, + client_name="Cleanup MCP", + ): + delete_response = await cleanup_client.call_tool( + "nc_notes_delete_note", {"note_id": note_id} + ) + if not delete_response.isError: + logger.info(f"✓ Cleaned up test note {note_id} (finally)") + else: + logger.warning( + f"Failed to delete note {note_id}: {delete_response}" + ) + except Exception as e: + logger.warning(f"Cleanup failed for note {note_id}: {e}") + + # Close browser context + await context.close()