test: Move client integration tests to mocked unit tests

This commit is contained in:
Chris Coutinho
2025-10-24 05:50:25 +02:00
parent d452684535
commit 2f1bd1bbe9
7 changed files with 1792 additions and 1289 deletions
+263 -472
View File
@@ -1,535 +1,326 @@
import logging
import uuid
from typing import Any, Dict
import anyio
import httpx
import pytest
from httpx import HTTPStatusError
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.client.tables import TablesClient
from tests.client.conftest import (
create_mock_error_response,
create_mock_response,
create_mock_table_row_ocs_response,
create_mock_table_row_response,
create_mock_table_schema_response,
create_mock_tables_list_response,
)
logger = logging.getLogger(__name__)
# Mark all tests in this module as integration tests
pytestmark = pytest.mark.integration
# Mark all tests in this module as unit tests
pytestmark = pytest.mark.unit
@pytest.fixture(scope="module")
async def sample_table_info(nc_client: NextcloudClient) -> Dict[str, Any]:
"""
Fixture to get information about the sample table that comes with Nextcloud Tables.
This assumes that the sample table exists in the Nextcloud instance.
"""
logger.info("Looking for sample table in Nextcloud Tables app")
async def test_tables_list_tables(mocker):
"""Test that list_tables correctly parses the API response (OCS format)."""
mock_response = create_mock_tables_list_response(
tables=[
{"id": 1, "title": "Table 1"},
{"id": 2, "title": "Table 2"},
]
)
# Get all tables
tables = await nc_client.tables.list_tables()
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
# Look for a sample table (usually created by default)
sample_table = None
for table in tables:
# Common names for sample tables
if any(
keyword in table.get("title", "").lower()
for keyword in ["sample", "demo", "example", "test"]
):
sample_table = table
break
if not sample_table and tables:
# If no sample table found, use the first available table
sample_table = tables[0]
logger.info(
f"No sample table found, using first available table: {sample_table.get('title')}"
)
if not sample_table:
pytest.skip(
"No tables found in Nextcloud Tables app. Please ensure Tables app is installed and has at least one table."
)
# Get the schema for the sample table
table_id = sample_table["id"]
schema = await nc_client.tables.get_table_schema(table_id)
logger.info(f"Using sample table: {sample_table.get('title')} (ID: {table_id})")
return {
"table": sample_table,
"schema": schema,
"table_id": table_id,
"columns": schema.get("columns", []),
}
@pytest.fixture
async def temporary_table_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Fixture to create a temporary row in the sample table for testing.
Yields the created row data and cleans up afterward.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
# Create test data based on the table schema
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
# Generate test data based on column type
if column_type == "text":
test_data[column_id] = f"Test {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 42
elif column_type == "datetime":
test_data[column_id] = "2024-01-01T12:00:00Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Test Option"
else:
# Default to text for unknown types
test_data[column_id] = f"Test {column_title} {unique_suffix}"
logger.info(f"Creating temporary row in table {table_id} with data: {test_data}")
created_row = None
try:
created_row = await nc_client.tables.create_row(table_id, test_data)
row_id = created_row.get("id")
if not row_id:
pytest.fail("Failed to get ID from created temporary row.")
logger.info(f"Temporary row created with ID: {row_id}")
yield created_row
finally:
if created_row and created_row.get("id"):
row_id = created_row["id"]
logger.info(f"Cleaning up temporary row ID: {row_id}")
try:
await nc_client.tables.delete_row(row_id)
logger.info(f"Successfully deleted temporary row ID: {row_id}")
except HTTPStatusError as e:
# Ignore 404 if row was already deleted by the test itself
if e.response.status_code != 404:
logger.error(f"HTTP error deleting temporary row {row_id}: {e}")
else:
logger.warning(f"Temporary row {row_id} already deleted (404).")
except Exception as e:
logger.error(f"Unexpected error deleting temporary row {row_id}: {e}")
async def test_tables_list_tables(nc_client: NextcloudClient):
"""
Test listing all tables available to the user.
"""
logger.info("Testing list_tables functionality")
tables = await nc_client.tables.list_tables()
client = TablesClient(mock_client, "testuser")
tables = await client.list_tables()
assert isinstance(tables, list)
assert len(tables) > 0, "Expected at least one table to be available"
assert len(tables) == 2
assert tables[0]["id"] == 1
assert tables[0]["title"] == "Table 1"
# Check that each table has required fields
for table in tables:
assert "id" in table
assert "title" in table
assert isinstance(table["id"], int)
assert isinstance(table["title"], str)
logger.info(f"Successfully listed {len(tables)} tables")
mock_make_request.assert_called_once()
async def test_tables_get_schema(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test getting the schema/structure of a specific table.
"""
table_id = sample_table_info["table_id"]
async def test_tables_get_schema(mocker):
"""Test that get_table_schema correctly parses the API response."""
mock_response = create_mock_table_schema_response(
table_id=123,
columns=[
{"id": 1, "title": "Name", "type": "text"},
{"id": 2, "title": "Age", "type": "number"},
{"id": 3, "title": "Email", "type": "text"},
],
)
logger.info(f"Testing get_table_schema for table ID: {table_id}")
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
schema = await nc_client.tables.get_table_schema(table_id)
client = TablesClient(mock_client, "testuser")
schema = await client.get_table_schema(table_id=123)
assert isinstance(schema, dict)
assert "columns" in schema
assert isinstance(schema["columns"], list)
assert len(schema["columns"]) > 0, "Expected at least one column in the table"
assert len(schema["columns"]) == 3
assert schema["columns"][0]["title"] == "Name"
# Check that each column has required fields
for column in schema["columns"]:
assert "id" in column
assert "title" in column
assert "type" in column
assert isinstance(column["id"], int)
assert isinstance(column["title"], str)
assert isinstance(column["type"], str)
logger.info(f"Successfully retrieved schema with {len(schema['columns'])} columns")
mock_make_request.assert_called_once()
assert "/tables/123/scheme" in mock_make_request.call_args[0][1]
async def test_tables_read_table(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test reading rows from a table.
"""
table_id = sample_table_info["table_id"]
async def test_tables_get_rows(mocker):
"""Test that get_table_rows correctly parses the API response."""
mock_response = create_mock_response(
status_code=200,
json_data=[
{
"id": 1,
"tableId": 123,
"data": [
{"columnId": 1, "value": "John"},
{"columnId": 2, "value": 30},
],
"createdBy": "testuser",
"createdAt": "2024-01-01T00:00:00+00:00",
"lastEditBy": "testuser",
"lastEditAt": "2024-01-01T00:00:00+00:00",
},
{
"id": 2,
"tableId": 123,
"data": [
{"columnId": 1, "value": "Jane"},
{"columnId": 2, "value": 25},
],
"createdBy": "testuser",
"createdAt": "2024-01-01T00:00:00+00:00",
"lastEditBy": "testuser",
"lastEditAt": "2024-01-01T00:00:00+00:00",
},
],
)
logger.info(f"Testing get_table_rows for table ID: {table_id}")
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
# Test without pagination
rows = await nc_client.tables.get_table_rows(table_id)
client = TablesClient(mock_client, "testuser")
rows = await client.get_table_rows(table_id=123)
assert isinstance(rows, list)
# Note: The table might be empty, so we don't assert len > 0
assert len(rows) == 2
assert rows[0]["id"] == 1
assert rows[0]["tableId"] == 123
# Test with pagination
rows_limited = await nc_client.tables.get_table_rows(table_id, limit=5, offset=0)
assert isinstance(rows_limited, list)
assert len(rows_limited) <= 5
# If there are rows, check their structure
if rows:
row = rows[0]
assert "id" in row
assert "tableId" in row
assert "data" in row
assert isinstance(row["id"], int)
assert isinstance(row["tableId"], int)
assert isinstance(row["data"], list)
logger.info(f"Successfully read {len(rows)} rows from table")
mock_make_request.assert_called_once()
async def test_tables_create_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test creating a new row in a table.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
async def test_tables_get_rows_with_pagination(mocker):
"""Test that get_table_rows correctly handles pagination parameters."""
mock_response = create_mock_response(
status_code=200,
json_data=[
{
"id": 1,
"tableId": 123,
"data": [],
"createdBy": "testuser",
"createdAt": "2024-01-01T00:00:00+00:00",
"lastEditBy": "testuser",
"lastEditAt": "2024-01-01T00:00:00+00:00",
},
],
)
# Create test data based on the table schema
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
client = TablesClient(mock_client, "testuser")
rows = await client.get_table_rows(table_id=123, limit=5, offset=10)
# Generate test data based on column type
if column_type == "text":
test_data[column_id] = f"Test Create {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 123
elif column_type == "datetime":
test_data[column_id] = "2024-01-01T12:00:00Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Test Option"
else:
# Default to text for unknown types
test_data[column_id] = f"Test Create {column_title} {unique_suffix}"
assert isinstance(rows, list)
logger.info(f"Testing create_row for table ID: {table_id} with data: {test_data}")
created_row = None
try:
created_row = await nc_client.tables.create_row(table_id, test_data)
assert isinstance(created_row, dict)
assert "id" in created_row
assert "tableId" in created_row
assert isinstance(created_row["id"], int)
assert created_row["tableId"] == table_id
# Verify the row was created by reading it back
await anyio.sleep(1) # Allow potential propagation delay
rows = await nc_client.tables.get_table_rows(table_id)
created_row_id = created_row["id"]
# Find the created row in the results
found_row = None
for row in rows:
if row["id"] == created_row_id:
found_row = row
break
assert found_row is not None, (
f"Created row with ID {created_row_id} not found in table"
)
logger.info(f"Successfully created row with ID: {created_row_id}")
finally:
# Clean up the created row
if created_row and created_row.get("id"):
try:
await nc_client.tables.delete_row(created_row["id"])
logger.info(f"Cleaned up created row ID: {created_row['id']}")
except Exception as e:
logger.warning(f"Failed to clean up created row: {e}")
# Verify pagination parameters were passed
call_args = mock_make_request.call_args
assert call_args[1]["params"]["limit"] == 5
assert call_args[1]["params"]["offset"] == 10
async def test_tables_update_row(
nc_client: NextcloudClient,
temporary_table_row: Dict[str, Any],
sample_table_info: Dict[str, Any],
):
"""
Test updating an existing row in a table.
"""
row_id = temporary_table_row["id"]
columns = sample_table_info["columns"]
async def test_tables_create_row(mocker):
"""Test that create_row correctly parses the API response (OCS format)."""
mock_response = create_mock_table_row_ocs_response(
row_id=456,
table_id=123,
data=[
{"columnId": 1, "value": "Test Name"},
{"columnId": 2, "value": 99},
],
)
# Create updated data
update_data = {}
unique_suffix = uuid.uuid4().hex[:8]
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
client = TablesClient(mock_client, "testuser")
test_data = {1: "Test Name", 2: 99}
created_row = await client.create_row(table_id=123, data=test_data)
# Generate updated test data based on column type
if column_type == "text":
update_data[column_id] = f"Updated {column_title} {unique_suffix}"
elif column_type == "number":
update_data[column_id] = 456
elif column_type == "datetime":
update_data[column_id] = "2024-12-31T23:59:59Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
update_data[column_id] = options[0].get("label", "Option 1")
else:
update_data[column_id] = "Updated Option"
else:
# Default to text for unknown types
update_data[column_id] = f"Updated {column_title} {unique_suffix}"
assert isinstance(created_row, dict)
assert created_row["id"] == 456
assert created_row["tableId"] == 123
logger.info(f"Testing update_row for row ID: {row_id} with data: {update_data}")
# Verify the data was transformed to string keys
call_args = mock_make_request.call_args
assert call_args[1]["json"]["data"]["1"] == "Test Name"
assert call_args[1]["json"]["data"]["2"] == 99
updated_row = await nc_client.tables.update_row(row_id, update_data)
async def test_tables_update_row(mocker):
"""Test that update_row correctly parses the API response."""
mock_response = create_mock_table_row_response(
row_id=456,
table_id=123,
data=[
{"columnId": 1, "value": "Updated Name"},
{"columnId": 2, "value": 100},
],
)
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
client = TablesClient(mock_client, "testuser")
update_data = {1: "Updated Name", 2: 100}
updated_row = await client.update_row(row_id=456, data=update_data)
assert isinstance(updated_row, dict)
assert "id" in updated_row
assert updated_row["id"] == row_id
assert updated_row["id"] == 456
# Verify the row was updated by reading it back
await anyio.sleep(1) # Allow potential propagation delay
table_id = sample_table_info["table_id"]
rows = await nc_client.tables.get_table_rows(table_id)
# Find the updated row in the results
found_row = None
for row in rows:
if row["id"] == row_id:
found_row = row
break
assert found_row is not None, f"Updated row with ID {row_id} not found in table"
logger.info(f"Successfully updated row with ID: {row_id}")
# Verify the PUT request was made
call_args = mock_make_request.call_args
assert call_args[0][0] == "PUT"
assert "/rows/456" in call_args[0][1]
async def test_tables_delete_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test deleting a row from a table.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
# First create a row to delete
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
if column_type == "text":
test_data[column_id] = f"Test Delete {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 789
elif column_type == "datetime":
test_data[column_id] = "2024-06-15T10:30:00Z"
elif column_type == "select":
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Delete Option"
else:
test_data[column_id] = f"Test Delete {column_title} {unique_suffix}"
logger.info(f"Creating row for delete test in table ID: {table_id}")
created_row = await nc_client.tables.create_row(table_id, test_data)
row_id = created_row["id"]
logger.info(f"Testing delete_row for row ID: {row_id}")
# Delete the row
delete_result = await nc_client.tables.delete_row(row_id)
assert isinstance(delete_result, dict)
# The delete response might vary, but it should be successful
# Verify the row was deleted by trying to find it
await anyio.sleep(1) # Allow potential propagation delay
rows = await nc_client.tables.get_table_rows(table_id)
# Ensure the deleted row is not in the results
found_row = None
for row in rows:
if row["id"] == row_id:
found_row = row
break
assert found_row is None, f"Deleted row with ID {row_id} still found in table"
logger.info(f"Successfully deleted row with ID: {row_id}")
async def test_tables_delete_nonexistent_row(nc_client: NextcloudClient):
"""
Test that deleting a non-existent row fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(f"Testing delete_row for non-existent row ID: {non_existent_id}")
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.delete_row(non_existent_id)
# Accept both 404 and 500 as valid error responses for non-existent rows
# The API behavior may vary between Nextcloud versions
assert excinfo.value.response.status_code in [404, 500]
logger.info(
f"Deleting non-existent row ID: {non_existent_id} correctly failed with {excinfo.value.response.status_code}."
async def test_tables_delete_row(mocker):
"""Test that delete_row correctly parses the API response."""
mock_response = create_mock_response(
status_code=200, json_data={"message": "Row deleted"}
)
async def test_tables_transform_row_data(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test the transform_row_data utility method.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
logger.info(f"Testing transform_row_data for table ID: {table_id}")
# Get some rows to transform
rows = await nc_client.tables.get_table_rows(table_id, limit=5)
if not rows:
logger.info("No rows to transform, skipping transform_row_data test")
return
# Transform the rows
transformed_rows = nc_client.tables.transform_row_data(rows, columns)
assert isinstance(transformed_rows, list)
assert len(transformed_rows) == len(rows)
# Check the structure of transformed rows
for i, transformed_row in enumerate(transformed_rows):
original_row = rows[i]
assert "id" in transformed_row
assert "tableId" in transformed_row
assert "data" in transformed_row
assert transformed_row["id"] == original_row["id"]
assert transformed_row["tableId"] == original_row["tableId"]
assert isinstance(transformed_row["data"], dict)
# Check that column IDs were transformed to column names
for column in columns:
column_title = column["title"]
# The transformed data should have column names as keys
# (though the column might not have data in this row)
if any(item["columnId"] == column["id"] for item in original_row["data"]):
assert column_title in transformed_row["data"]
logger.info(f"Successfully transformed {len(transformed_rows)} rows")
async def test_tables_get_nonexistent_table_schema(nc_client: NextcloudClient):
"""
Test that getting schema for a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(
f"Testing get_table_schema for non-existent table ID: {non_existent_id}"
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(
TablesClient, "_make_request", return_value=mock_response
)
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.get_table_schema(non_existent_id)
client = TablesClient(mock_client, "testuser")
result = await client.delete_row(row_id=456)
assert isinstance(result, dict)
# Verify the DELETE request was made
call_args = mock_make_request.call_args
assert call_args[0][0] == "DELETE"
assert "/rows/456" in call_args[0][1]
async def test_tables_delete_nonexistent_row(mocker):
"""Test that deleting a non-existent row raises HTTPStatusError."""
error_response = create_mock_error_response(404, "Row not found")
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(TablesClient, "_make_request")
mock_make_request.side_effect = httpx.HTTPStatusError(
"404 Not Found",
request=httpx.Request("DELETE", "http://test.local"),
response=error_response,
)
client = TablesClient(mock_client, "testuser")
with pytest.raises(httpx.HTTPStatusError) as excinfo:
await client.delete_row(row_id=999999999)
assert excinfo.value.response.status_code == 404
logger.info(
f"Getting schema for non-existent table ID: {non_existent_id} correctly failed with 404."
async def test_tables_get_nonexistent_schema(mocker):
"""Test that getting schema for non-existent table raises HTTPStatusError."""
error_response = create_mock_error_response(404, "Table not found")
mock_client = mocker.AsyncMock(spec=httpx.AsyncClient)
mock_make_request = mocker.patch.object(TablesClient, "_make_request")
mock_make_request.side_effect = httpx.HTTPStatusError(
"404 Not Found",
request=httpx.Request("GET", "http://test.local"),
response=error_response,
)
client = TablesClient(mock_client, "testuser")
async def test_tables_read_nonexistent_table(nc_client: NextcloudClient):
"""
Test that reading from a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(f"Testing get_table_rows for non-existent table ID: {non_existent_id}")
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.get_table_rows(non_existent_id)
with pytest.raises(httpx.HTTPStatusError) as excinfo:
await client.get_table_schema(table_id=999999999)
assert excinfo.value.response.status_code == 404
logger.info(
f"Reading from non-existent table ID: {non_existent_id} correctly failed with 404."
)
async def test_tables_create_row_invalid_table(nc_client: NextcloudClient):
"""
Test that creating a row in a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
test_data = {1: "test value"}
def test_tables_transform_row_data():
"""Test the transform_row_data utility method (synchronous)."""
# This is a pure function, no mocking needed
client = TablesClient(None, "testuser") # Client not used for this method
logger.info(f"Testing create_row for non-existent table ID: {non_existent_id}")
raw_rows = [
{
"id": 1,
"tableId": 123,
"createdBy": "testuser",
"createdAt": "2024-01-01T00:00:00+00:00",
"lastEditBy": "testuser",
"lastEditAt": "2024-01-01T00:00:00+00:00",
"data": [
{"columnId": 1, "value": "John Doe"},
{"columnId": 2, "value": 30},
{"columnId": 3, "value": "john@example.com"},
],
},
{
"id": 2,
"tableId": 123,
"createdBy": "testuser",
"createdAt": "2024-01-01T00:00:00+00:00",
"lastEditBy": "testuser",
"lastEditAt": "2024-01-01T00:00:00+00:00",
"data": [
{"columnId": 1, "value": "Jane Smith"},
{"columnId": 2, "value": 25},
{"columnId": 3, "value": "jane@example.com"},
],
},
]
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.create_row(non_existent_id, test_data)
columns = [
{"id": 1, "title": "Name", "type": "text"},
{"id": 2, "title": "Age", "type": "number"},
{"id": 3, "title": "Email", "type": "text"},
]
assert excinfo.value.response.status_code == 404
logger.info(
f"Creating row in non-existent table ID: {non_existent_id} correctly failed with 404."
)
transformed = client.transform_row_data(raw_rows, columns)
assert len(transformed) == 2
assert transformed[0]["id"] == 1
assert transformed[0]["data"]["Name"] == "John Doe"
assert transformed[0]["data"]["Age"] == 30
assert transformed[0]["data"]["Email"] == "john@example.com"
assert transformed[1]["data"]["Name"] == "Jane Smith"
assert transformed[1]["data"]["Age"] == 25