test: Refactor test fixtures into a oauth token factory

This commit is contained in:
Chris Coutinho
2025-10-15 21:15:18 +02:00
parent 9e4c20a4b1
commit 3921d9b982
+110 -274
View File
@@ -58,6 +58,69 @@ async def wait_for_nextcloud(
return False
async def create_mcp_client_session(
url: str,
token: str | None = None,
client_name: str = "MCP",
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session with proper lifecycle management.
Consolidates the common pattern used by all MCP client fixtures:
- Creates streamable HTTP client with optional OAuth token
- Initializes MCP ClientSession
- Handles cleanup with proper exception handling
Args:
url: MCP server URL (e.g., "http://127.0.0.1:8000/mcp")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "OAuth MCP (Playwright)")
Yields:
Initialized MCP ClientSession
"""
logger.info(f"Creating Streamable HTTP client for {client_name}")
# Prepare headers with OAuth token if provided
headers = {"Authorization": f"Bearer {token}"} if token else None
streamable_context = streamablehttp_client(url, headers=headers)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
yield session
finally:
# Clean up in reverse order, ignoring task scope issues
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing {client_name} session: {e}")
except Exception as e:
logger.warning(f"Error closing {client_name} session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(
f"Error closing {client_name} streamable HTTP client: {e}"
)
except Exception as e:
logger.warning(f"Error closing {client_name} streamable HTTP client: {e}")
@pytest.fixture(scope="session")
async def nc_client() -> AsyncGenerator[NextcloudClient, Any]:
"""
@@ -98,42 +161,11 @@ async def nc_mcp_client() -> AsyncGenerator[ClientSession, Any]:
"""
Fixture to create an MCP client session for integration tests using streamable-http.
"""
logger.info("Creating Streamable HTTP client")
streamable_context = streamablehttp_client("http://127.0.0.1:8000/mcp")
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("MCP client session initialized successfully")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8000/mcp", client_name="Basic MCP"
):
yield session
finally:
# Clean up in reverse order, ignoring task scope issues
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing session: {e}")
except Exception as e:
logger.warning(f"Error closing session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing streamable HTTP client: {e}")
except Exception as e:
logger.warning(f"Error closing streamable HTTP client: {e}")
@pytest.fixture(scope="session")
async def nc_mcp_oauth_client_interactive(
@@ -148,52 +180,13 @@ async def nc_mcp_oauth_client_interactive(
Automatically skips when running in GitHub Actions CI.
"""
logger.info("Creating Streamable HTTP client for OAuth MCP server (Interactive)")
# Pass OAuth token as Bearer token in headers
headers = {"Authorization": f"Bearer {interactive_oauth_token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("OAuth MCP client session (Interactive) initialized successfully")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=interactive_oauth_token,
client_name="OAuth MCP (Interactive)",
):
yield session
finally:
# Clean up in reverse order, ignoring task scope issues
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing OAuth session (Interactive): {e}")
except Exception as e:
logger.warning(f"Error closing OAuth session (Interactive): {e}")
try:
await streamable_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(
f"Error closing OAuth streamable HTTP client (Interactive): {e}"
)
except Exception as e:
logger.warning(
f"Error closing OAuth streamable HTTP client (Interactive): {e}"
)
@pytest.fixture(scope="session")
async def nc_mcp_oauth_client(
@@ -206,51 +199,13 @@ async def nc_mcp_oauth_client(
This is the default OAuth MCP fixture using headless browser automation suitable for CI/CD.
For interactive testing with manual browser login, use nc_mcp_oauth_client_interactive instead.
"""
logger.info("Creating Streamable HTTP client for OAuth MCP server (Playwright)")
# Pass OAuth token as Bearer token in headers
headers = {"Authorization": f"Bearer {playwright_oauth_token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("OAuth MCP client session (Playwright) initialized successfully")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=playwright_oauth_token,
client_name="OAuth MCP (Playwright)",
):
yield session
finally:
# Clean up in reverse order, ignoring task scope issues
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing Playwright OAuth session: {e}")
except Exception as e:
logger.warning(f"Error closing Playwright OAuth session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(
f"Error closing Playwright OAuth streamable HTTP client: {e}"
)
except Exception as e:
logger.warning(
f"Error closing Playwright OAuth streamable HTTP client: {e}"
)
@pytest.fixture
async def temporary_note(nc_client: NextcloudClient):
@@ -1089,51 +1044,13 @@ async def nc_mcp_oauth_client_playwright(
This fixture uses headless browser automation and is suitable for CI/CD pipelines.
For interactive testing, use nc_mcp_oauth_client fixture instead.
"""
logger.info("Creating Streamable HTTP client for OAuth MCP server (Playwright)")
# Pass OAuth token as Bearer token in headers
headers = {"Authorization": f"Bearer {playwright_oauth_token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("OAuth MCP client session (Playwright) initialized successfully")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=playwright_oauth_token,
client_name="OAuth MCP (Playwright Alt)",
):
yield session
finally:
# Clean up in reverse order, ignoring task scope issues
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(f"Error closing Playwright OAuth session: {e}")
except Exception as e:
logger.warning(f"Error closing Playwright OAuth session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except RuntimeError as e:
if "cancel scope" in str(e):
logger.debug(f"Ignoring cancel scope teardown issue: {e}")
else:
logger.warning(
f"Error closing Playwright OAuth streamable HTTP client: {e}"
)
except Exception as e:
logger.warning(
f"Error closing Playwright OAuth streamable HTTP client: {e}"
)
@pytest.fixture(scope="session")
async def test_users_setup(nc_client: NextcloudClient):
@@ -1416,7 +1333,7 @@ async def all_oauth_tokens(
config["password"],
)
# Create tasks for all users with staggered starts (2.0s apart)
# Create tasks for all users with staggered starts (0.5s apart)
tasks = {
username: get_token_with_delay(username, config, idx * 0.5)
for idx, (username, config) in enumerate(test_users_setup.items())
@@ -1467,133 +1384,52 @@ async def diana_oauth_token(all_oauth_tokens) -> str:
@pytest.fixture(scope="session")
async def alice_mcp_client(alice_oauth_token) -> AsyncGenerator[ClientSession, Any]:
async def alice_mcp_client(
alice_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as alice (owner role)."""
token = alice_oauth_token
# Create MCP client session with proper lifecycle management
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Alice MCP client session initialized")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=alice_oauth_token,
client_name="Alice MCP",
):
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing alice session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing alice streamable context: {e}")
@pytest.fixture(scope="session")
async def bob_mcp_client(bob_oauth_token) -> AsyncGenerator[ClientSession, Any]:
async def bob_mcp_client(bob_oauth_token: str) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as bob (viewer role)."""
token = bob_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Bob MCP client session initialized")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp", token=bob_oauth_token, client_name="Bob MCP"
):
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing bob session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing bob streamable context: {e}")
@pytest.fixture(scope="session")
async def charlie_mcp_client(charlie_oauth_token) -> AsyncGenerator[ClientSession, Any]:
async def charlie_mcp_client(
charlie_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as charlie (editor role, in 'editors' group)."""
token = charlie_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Charlie MCP client session initialized")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=charlie_oauth_token,
client_name="Charlie MCP",
):
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing charlie session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing charlie streamable context: {e}")
@pytest.fixture(scope="session")
async def diana_mcp_client(diana_oauth_token) -> AsyncGenerator[ClientSession, Any]:
async def diana_mcp_client(
diana_oauth_token: str,
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client authenticated as diana (no-access role)."""
token = diana_oauth_token
headers = {"Authorization": f"Bearer {token}"}
streamable_context = streamablehttp_client(
"http://127.0.0.1:8001/mcp", headers=headers
)
session_context = None
try:
read_stream, write_stream, _ = await streamable_context.__aenter__()
session_context = ClientSession(read_stream, write_stream)
session = await session_context.__aenter__()
await session.initialize()
logger.info("Diana MCP client session initialized")
async for session in create_mcp_client_session(
url="http://127.0.0.1:8001/mcp",
token=diana_oauth_token,
client_name="Diana MCP",
):
yield session
finally:
if session_context is not None:
try:
await session_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing diana session: {e}")
try:
await streamable_context.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error closing diana streamable context: {e}")
# Test user/group fixtures for clean test isolation
@pytest.fixture