From 3921d9b982aa1af37faf92ebcebb9317420f71f2 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Wed, 15 Oct 2025 21:15:18 +0200 Subject: [PATCH] test: Refactor test fixtures into a oauth token factory --- tests/conftest.py | 384 +++++++++++++--------------------------------- 1 file changed, 110 insertions(+), 274 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 90a0f84e..7352d4c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -58,6 +58,69 @@ async def wait_for_nextcloud( return False +async def create_mcp_client_session( + url: str, + token: str | None = None, + client_name: str = "MCP", +) -> AsyncGenerator[ClientSession, Any]: + """ + Factory function to create an MCP client session with proper lifecycle management. + + Consolidates the common pattern used by all MCP client fixtures: + - Creates streamable HTTP client with optional OAuth token + - Initializes MCP ClientSession + - Handles cleanup with proper exception handling + + Args: + url: MCP server URL (e.g., "http://127.0.0.1:8000/mcp") + token: Optional OAuth access token for Bearer authentication + client_name: Client name for logging (e.g., "OAuth MCP (Playwright)") + + Yields: + Initialized MCP ClientSession + """ + logger.info(f"Creating Streamable HTTP client for {client_name}") + + # Prepare headers with OAuth token if provided + headers = {"Authorization": f"Bearer {token}"} if token else None + streamable_context = streamablehttp_client(url, headers=headers) + session_context = None + + try: + read_stream, write_stream, _ = await streamable_context.__aenter__() + session_context = ClientSession(read_stream, write_stream) + session = await session_context.__aenter__() + await session.initialize() + logger.info(f"{client_name} client session initialized successfully") + + yield session + + finally: + # Clean up in reverse order, ignoring task scope issues + if session_context is not None: + try: + await session_context.__aexit__(None, None, None) + except RuntimeError as e: + if "cancel scope" in str(e): + logger.debug(f"Ignoring cancel scope teardown issue: {e}") + else: + logger.warning(f"Error closing {client_name} session: {e}") + except Exception as e: + logger.warning(f"Error closing {client_name} session: {e}") + + try: + await streamable_context.__aexit__(None, None, None) + except RuntimeError as e: + if "cancel scope" in str(e): + logger.debug(f"Ignoring cancel scope teardown issue: {e}") + else: + logger.warning( + f"Error closing {client_name} streamable HTTP client: {e}" + ) + except Exception as e: + logger.warning(f"Error closing {client_name} streamable HTTP client: {e}") + + @pytest.fixture(scope="session") async def nc_client() -> AsyncGenerator[NextcloudClient, Any]: """ @@ -98,42 +161,11 @@ async def nc_mcp_client() -> AsyncGenerator[ClientSession, Any]: """ Fixture to create an MCP client session for integration tests using streamable-http. """ - logger.info("Creating Streamable HTTP client") - streamable_context = streamablehttp_client("http://127.0.0.1:8000/mcp") - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("MCP client session initialized successfully") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8000/mcp", client_name="Basic MCP" + ): yield session - finally: - # Clean up in reverse order, ignoring task scope issues - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning(f"Error closing session: {e}") - except Exception as e: - logger.warning(f"Error closing session: {e}") - - try: - await streamable_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning(f"Error closing streamable HTTP client: {e}") - except Exception as e: - logger.warning(f"Error closing streamable HTTP client: {e}") - @pytest.fixture(scope="session") async def nc_mcp_oauth_client_interactive( @@ -148,52 +180,13 @@ async def nc_mcp_oauth_client_interactive( Automatically skips when running in GitHub Actions CI. """ - - logger.info("Creating Streamable HTTP client for OAuth MCP server (Interactive)") - - # Pass OAuth token as Bearer token in headers - headers = {"Authorization": f"Bearer {interactive_oauth_token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("OAuth MCP client session (Interactive) initialized successfully") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=interactive_oauth_token, + client_name="OAuth MCP (Interactive)", + ): yield session - finally: - # Clean up in reverse order, ignoring task scope issues - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning(f"Error closing OAuth session (Interactive): {e}") - except Exception as e: - logger.warning(f"Error closing OAuth session (Interactive): {e}") - - try: - await streamable_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning( - f"Error closing OAuth streamable HTTP client (Interactive): {e}" - ) - except Exception as e: - logger.warning( - f"Error closing OAuth streamable HTTP client (Interactive): {e}" - ) - @pytest.fixture(scope="session") async def nc_mcp_oauth_client( @@ -206,51 +199,13 @@ async def nc_mcp_oauth_client( This is the default OAuth MCP fixture using headless browser automation suitable for CI/CD. For interactive testing with manual browser login, use nc_mcp_oauth_client_interactive instead. """ - logger.info("Creating Streamable HTTP client for OAuth MCP server (Playwright)") - - # Pass OAuth token as Bearer token in headers - headers = {"Authorization": f"Bearer {playwright_oauth_token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("OAuth MCP client session (Playwright) initialized successfully") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=playwright_oauth_token, + client_name="OAuth MCP (Playwright)", + ): yield session - finally: - # Clean up in reverse order, ignoring task scope issues - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning(f"Error closing Playwright OAuth session: {e}") - except Exception as e: - logger.warning(f"Error closing Playwright OAuth session: {e}") - - try: - await streamable_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning( - f"Error closing Playwright OAuth streamable HTTP client: {e}" - ) - except Exception as e: - logger.warning( - f"Error closing Playwright OAuth streamable HTTP client: {e}" - ) - @pytest.fixture async def temporary_note(nc_client: NextcloudClient): @@ -1089,51 +1044,13 @@ async def nc_mcp_oauth_client_playwright( This fixture uses headless browser automation and is suitable for CI/CD pipelines. For interactive testing, use nc_mcp_oauth_client fixture instead. """ - logger.info("Creating Streamable HTTP client for OAuth MCP server (Playwright)") - - # Pass OAuth token as Bearer token in headers - headers = {"Authorization": f"Bearer {playwright_oauth_token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("OAuth MCP client session (Playwright) initialized successfully") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=playwright_oauth_token, + client_name="OAuth MCP (Playwright Alt)", + ): yield session - finally: - # Clean up in reverse order, ignoring task scope issues - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning(f"Error closing Playwright OAuth session: {e}") - except Exception as e: - logger.warning(f"Error closing Playwright OAuth session: {e}") - - try: - await streamable_context.__aexit__(None, None, None) - except RuntimeError as e: - if "cancel scope" in str(e): - logger.debug(f"Ignoring cancel scope teardown issue: {e}") - else: - logger.warning( - f"Error closing Playwright OAuth streamable HTTP client: {e}" - ) - except Exception as e: - logger.warning( - f"Error closing Playwright OAuth streamable HTTP client: {e}" - ) - @pytest.fixture(scope="session") async def test_users_setup(nc_client: NextcloudClient): @@ -1416,7 +1333,7 @@ async def all_oauth_tokens( config["password"], ) - # Create tasks for all users with staggered starts (2.0s apart) + # Create tasks for all users with staggered starts (0.5s apart) tasks = { username: get_token_with_delay(username, config, idx * 0.5) for idx, (username, config) in enumerate(test_users_setup.items()) @@ -1467,133 +1384,52 @@ async def diana_oauth_token(all_oauth_tokens) -> str: @pytest.fixture(scope="session") -async def alice_mcp_client(alice_oauth_token) -> AsyncGenerator[ClientSession, Any]: +async def alice_mcp_client( + alice_oauth_token: str, +) -> AsyncGenerator[ClientSession, Any]: """MCP client authenticated as alice (owner role).""" - token = alice_oauth_token - - # Create MCP client session with proper lifecycle management - headers = {"Authorization": f"Bearer {token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("Alice MCP client session initialized") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=alice_oauth_token, + client_name="Alice MCP", + ): yield session - finally: - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing alice session: {e}") - try: - await streamable_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing alice streamable context: {e}") - @pytest.fixture(scope="session") -async def bob_mcp_client(bob_oauth_token) -> AsyncGenerator[ClientSession, Any]: +async def bob_mcp_client(bob_oauth_token: str) -> AsyncGenerator[ClientSession, Any]: """MCP client authenticated as bob (viewer role).""" - token = bob_oauth_token - - headers = {"Authorization": f"Bearer {token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("Bob MCP client session initialized") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", token=bob_oauth_token, client_name="Bob MCP" + ): yield session - finally: - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing bob session: {e}") - try: - await streamable_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing bob streamable context: {e}") - @pytest.fixture(scope="session") -async def charlie_mcp_client(charlie_oauth_token) -> AsyncGenerator[ClientSession, Any]: +async def charlie_mcp_client( + charlie_oauth_token: str, +) -> AsyncGenerator[ClientSession, Any]: """MCP client authenticated as charlie (editor role, in 'editors' group).""" - token = charlie_oauth_token - - headers = {"Authorization": f"Bearer {token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("Charlie MCP client session initialized") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=charlie_oauth_token, + client_name="Charlie MCP", + ): yield session - finally: - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing charlie session: {e}") - try: - await streamable_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing charlie streamable context: {e}") - @pytest.fixture(scope="session") -async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, Any]: +async def diana_mcp_client( + diana_oauth_token: str, +) -> AsyncGenerator[ClientSession, Any]: """MCP client authenticated as diana (no-access role).""" - token = diana_oauth_token - - headers = {"Authorization": f"Bearer {token}"} - streamable_context = streamablehttp_client( - "http://127.0.0.1:8001/mcp", headers=headers - ) - session_context = None - - try: - read_stream, write_stream, _ = await streamable_context.__aenter__() - session_context = ClientSession(read_stream, write_stream) - session = await session_context.__aenter__() - await session.initialize() - logger.info("Diana MCP client session initialized") - + async for session in create_mcp_client_session( + url="http://127.0.0.1:8001/mcp", + token=diana_oauth_token, + client_name="Diana MCP", + ): yield session - finally: - if session_context is not None: - try: - await session_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing diana session: {e}") - try: - await streamable_context.__aexit__(None, None, None) - except Exception as e: - logger.debug(f"Error closing diana streamable context: {e}") - # Test user/group fixtures for clean test isolation @pytest.fixture