|
|
|
@@ -9,6 +9,7 @@ import pytest
|
|
|
|
|
from httpx import HTTPStatusError
|
|
|
|
|
from mcp import ClientSession
|
|
|
|
|
from mcp.client.session import RequestContext
|
|
|
|
|
from mcp.client.sse import sse_client
|
|
|
|
|
from mcp.client.streamable_http import streamablehttp_client
|
|
|
|
|
from mcp.types import ElicitRequestParams, ElicitResult, ErrorData
|
|
|
|
|
|
|
|
|
@@ -165,6 +166,51 @@ async def create_mcp_client_session(
|
|
|
|
|
logger.debug(f"{client_name} client session cleaned up successfully")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def create_mcp_client_session_sse(
|
|
|
|
|
url: str,
|
|
|
|
|
token: str | None = None,
|
|
|
|
|
client_name: str = "MCP",
|
|
|
|
|
elicitation_callback: Any = None,
|
|
|
|
|
) -> AsyncGenerator[ClientSession, Any]:
|
|
|
|
|
"""
|
|
|
|
|
Factory function to create an MCP client session using SSE transport.
|
|
|
|
|
|
|
|
|
|
Similar to create_mcp_client_session but uses SSE transport instead of streamable-http.
|
|
|
|
|
Uses native async context managers to ensure correct LIFO cleanup order.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url: MCP server URL (e.g., "http://localhost:8000/sse")
|
|
|
|
|
token: Optional OAuth access token for Bearer authentication
|
|
|
|
|
client_name: Client name for logging (e.g., "Basic MCP (SSE)")
|
|
|
|
|
elicitation_callback: Optional callback for handling elicitation requests
|
|
|
|
|
|
|
|
|
|
Yields:
|
|
|
|
|
Initialized MCP ClientSession
|
|
|
|
|
|
|
|
|
|
Note:
|
|
|
|
|
SSE transport is being deprecated in favor of streamable-http.
|
|
|
|
|
This function exists for compatibility testing only.
|
|
|
|
|
"""
|
|
|
|
|
logger.info(f"Creating SSE client for {client_name}")
|
|
|
|
|
|
|
|
|
|
# Prepare headers with OAuth token if provided
|
|
|
|
|
headers = {"Authorization": f"Bearer {token}"} if token else None
|
|
|
|
|
|
|
|
|
|
# Use native async with - Python ensures LIFO cleanup
|
|
|
|
|
# Cleanup order will be: ClientSession.__aexit__ -> sse_client.__aexit__
|
|
|
|
|
# Note: sse_client yields only (read_stream, write_stream), not 3 values like streamablehttp_client
|
|
|
|
|
async with sse_client(url, headers=headers) as (read_stream, write_stream):
|
|
|
|
|
async with ClientSession(
|
|
|
|
|
read_stream, write_stream, elicitation_callback=elicitation_callback
|
|
|
|
|
) as session:
|
|
|
|
|
await session.initialize()
|
|
|
|
|
logger.info(f"{client_name} client session initialized successfully")
|
|
|
|
|
yield session
|
|
|
|
|
|
|
|
|
|
# Cleanup happens automatically in LIFO order - no exception suppression needed
|
|
|
|
|
logger.debug(f"{client_name} client session cleaned up successfully")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
|
|
|
async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
|
|
|
|
|
"""
|
|
|
|
@@ -203,12 +249,14 @@ async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
|
|
|
async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
|
|
|
|
|
"""
|
|
|
|
|
Fixture to create an MCP client session for integration tests using streamable-http.
|
|
|
|
|
Fixture to create an MCP client session for integration tests using SSE transport.
|
|
|
|
|
|
|
|
|
|
Uses anyio pytest plugin for proper async fixture handling.
|
|
|
|
|
|
|
|
|
|
Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing.
|
|
|
|
|
"""
|
|
|
|
|
async for session in create_mcp_client_session(
|
|
|
|
|
url="http://localhost:8000/mcp", client_name="Basic MCP"
|
|
|
|
|
async for session in create_mcp_client_session_sse(
|
|
|
|
|
url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
|
|
|
|
|
):
|
|
|
|
|
yield session
|
|
|
|
|
|
|
|
|
|