feat: Add TablesClient and associated tools

This commit is contained in:
Chris Coutinho
2025-07-06 09:18:34 +02:00
parent 57440f845f
commit a1c186aa95
5 changed files with 10855 additions and 80 deletions
+2 -73
View File
@@ -10,6 +10,7 @@ import logging
from .notes_client import NotesClient
from .webdav_client import WebDAVClient
from .tables_client import TablesClient
from .controllers.notes_search import NotesSearchController
logger = logging.getLogger(__name__)
@@ -44,6 +45,7 @@ class NextcloudClient:
# Initialize app clients
self.notes = NotesClient(self._client, username)
self.webdav = WebDAVClient(self._client, username)
self.tables = TablesClient(self._client, username)
# Initialize controllers
self._notes_search = NotesSearchController()
@@ -67,84 +69,11 @@ class NextcloudClient:
return response.json()
# Convenience methods that delegate to subclients
async def notes_get_settings(self):
"""Get Notes app settings."""
return await self.notes.get_settings()
async def notes_get_all(self):
"""Get all notes."""
return await self.notes.get_all_notes()
async def notes_get_note(self, *, note_id: int):
"""Get a specific note."""
return await self.notes.get_note(note_id)
async def notes_create_note(
self,
*,
title: str | None = None,
content: str | None = None,
category: str | None = None,
):
"""Create a new note."""
return await self.notes.create_note(
title=title, content=content, category=category
)
async def notes_update_note(
self,
*,
note_id: int,
etag: str,
title: str | None = None,
content: str | None = None,
category: str | None = None,
):
"""Update a note."""
return await self.notes.update(
note_id=note_id, etag=etag, title=title, content=content, category=category
)
async def notes_append_content(self, *, note_id: int, content: str):
"""Append content to an existing note with a separator."""
return await self.notes.append_content(note_id=note_id, content=content)
async def notes_search_notes(self, *, query: str):
"""Search notes using token-based matching with relevance ranking."""
all_notes = await self.notes.get_all_notes()
return self._notes_search.search_notes(all_notes, query)
async def notes_delete_note(self, *, note_id: int):
"""Delete a note and its attachments."""
return await self.notes.delete_note(note_id)
async def add_note_attachment(
self,
*,
note_id: int,
filename: str,
content: bytes,
category: str | None = None,
mime_type: str | None = None,
):
"""Add/Update an attachment to a note via WebDAV PUT."""
return await self.webdav.add_note_attachment(
note_id=note_id,
filename=filename,
content=content,
category=category,
mime_type=mime_type,
)
async def get_note_attachment(
self, *, note_id: int, filename: str, category: str | None = None
):
"""Fetch a specific attachment from a note via WebDAV GET."""
return await self.webdav.get_note_attachment(
note_id=note_id, filename=filename, category=category
)
def _get_webdav_base_path(self) -> str:
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
+61 -7
View File
@@ -53,21 +53,21 @@ async def notes_get_settings():
mcp.get_context()
) # https://github.com/modelcontextprotocol/python-sdk/issues/244
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_get_settings()
return await client.notes.get_settings()
@mcp.tool()
async def nc_get_note(note_id: int, ctx: Context):
"""Get user note using note id"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_get_note(note_id=note_id)
return await client.notes.get_note(note_id)
@mcp.tool()
async def nc_notes_create_note(title: str, content: str, category: str, ctx: Context):
"""Create a new note"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_create_note(
return await client.notes.create_note(
title=title,
content=content,
category=category,
@@ -85,7 +85,7 @@ async def nc_notes_update_note(
):
logger.info("Updating note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_update_note(
return await client.notes.update(
note_id=note_id,
etag=etag,
title=title,
@@ -99,7 +99,7 @@ async def nc_notes_append_content(note_id: int, content: str, ctx: Context):
"""Append content to an existing note with a clear separator"""
logger.info("Appending content to note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_append_content(note_id=note_id, content=content)
return await client.notes.append_content(note_id=note_id, content=content)
@mcp.tool()
@@ -113,7 +113,61 @@ async def nc_notes_search_notes(query: str, ctx: Context):
async def nc_notes_delete_note(note_id: int, ctx: Context):
logger.info("Deleting note %s", note_id)
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.notes_delete_note(note_id=note_id)
return await client.notes.delete_note(note_id)
# Tables tools
@mcp.tool()
async def nc_tables_list_tables(ctx: Context):
"""List all tables available to the user"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.list_tables()
@mcp.tool()
async def nc_tables_get_schema(table_id: int, ctx: Context):
"""Get the schema/structure of a specific table including columns and views"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.get_table_schema(table_id)
@mcp.tool()
async def nc_tables_read_table(
table_id: int,
limit: int | None = None,
offset: int | None = None,
ctx: Context = None,
):
"""Read rows from a table with optional pagination"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.get_table_rows(table_id, limit, offset)
@mcp.tool()
async def nc_tables_insert_row(table_id: int, data: dict, ctx: Context):
"""Insert a new row into a table.
Data should be a dictionary mapping column IDs to values, e.g. {1: "text", 2: 42}
"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.create_row(table_id, data)
@mcp.tool()
async def nc_tables_update_row(row_id: int, data: dict, ctx: Context):
"""Update an existing row in a table.
Data should be a dictionary mapping column IDs to new values, e.g. {1: "new text", 2: 99}
"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.update_row(row_id, data)
@mcp.tool()
async def nc_tables_delete_row(row_id: int, ctx: Context):
"""Delete a row from a table"""
client: NextcloudClient = ctx.request_context.lifespan_context.client
return await client.tables.delete_row(row_id)
@mcp.resource("nc://Notes/{note_id}/attachments/{attachment_filename}")
@@ -123,7 +177,7 @@ async def nc_notes_get_attachment(note_id: int, attachment_filename: str):
client: NextcloudClient = ctx.request_context.lifespan_context.client
# Assuming a method get_note_attachment exists in the client
# This method should return the raw content and determine the mime type
content, mime_type = await client.get_note_attachment(
content, mime_type = await client.webdav.get_note_attachment(
note_id=note_id, filename=attachment_filename
)
return {
+125
View File
@@ -0,0 +1,125 @@
"""Client for Nextcloud Tables app operations."""
from typing import Dict, List, Any, Optional
import logging
from .base_client import BaseNextcloudClient
logger = logging.getLogger(__name__)
class TablesClient(BaseNextcloudClient):
"""Client for Nextcloud Tables app operations."""
async def list_tables(self) -> List[Dict[str, Any]]:
"""List all tables available to the user."""
response = await self._make_request(
"GET",
"/ocs/v2.php/apps/tables/api/2/tables",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
)
result = response.json()
return result["ocs"]["data"]
async def get_table_schema(self, table_id: int) -> Dict[str, Any]:
"""Get the schema/structure of a specific table including columns and views."""
# Using v1 API as v2 schema endpoint had issues during testing
response = await self._make_request(
"GET", f"/index.php/apps/tables/api/1/tables/{table_id}/scheme"
)
return response.json()
async def get_table_rows(
self, table_id: int, limit: Optional[int] = None, offset: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Read rows from a table with optional pagination."""
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
response = await self._make_request(
"GET", f"/index.php/apps/tables/api/1/tables/{table_id}/rows", params=params
)
return response.json()
async def create_row(self, table_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
"""Insert a new row into a table.
Args:
table_id: ID of the table to insert into
data: Dictionary mapping column IDs to values, e.g. {1: "text", 2: 42}
"""
# Transform data to API format: {"data": {"1": "text", "2": 42}}
api_data = {str(k): v for k, v in data.items()}
response = await self._make_request(
"POST",
f"/ocs/v2.php/apps/tables/api/2/tables/{table_id}/rows",
headers={"OCS-APIRequest": "true", "Accept": "application/json"},
json={"data": api_data},
)
result = response.json()
return result["ocs"]["data"]
async def update_row(self, row_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing row in a table.
Args:
row_id: ID of the row to update
data: Dictionary mapping column IDs to new values, e.g. {1: "new text", 2: 99}
"""
# Transform data to API format for v1 endpoint
api_data = {str(k): v for k, v in data.items()}
response = await self._make_request(
"PUT",
f"/index.php/apps/tables/api/1/rows/{row_id}",
json={"data": api_data},
)
return response.json()
async def delete_row(self, row_id: int) -> Dict[str, Any]:
"""Delete a row from a table."""
response = await self._make_request(
"DELETE", f"/index.php/apps/tables/api/1/rows/{row_id}"
)
return response.json()
def transform_row_data(
self, rows: List[Dict[str, Any]], columns: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Transform raw row data into more readable format using column names.
Args:
rows: Raw row data from the API
columns: Column definitions from table schema
Returns:
List of rows with column names as keys instead of column IDs
"""
# Create mapping from column ID to column title
column_map = {col["id"]: col["title"] for col in columns}
transformed_rows = []
for row in rows:
transformed_row = {
"id": row["id"],
"tableId": row["tableId"],
"createdBy": row["createdBy"],
"createdAt": row["createdAt"],
"lastEditBy": row["lastEditBy"],
"lastEditAt": row["lastEditAt"],
"data": {},
}
# Transform data array to column_name: value mapping
for item in row["data"]:
column_id = item["columnId"]
column_name = column_map.get(column_id, f"column_{column_id}")
transformed_row["data"][column_name] = item["value"]
transformed_rows.append(transformed_row)
return transformed_rows
+10133
View File
File diff suppressed because it is too large Load Diff
+534
View File
@@ -0,0 +1,534 @@
import pytest
import logging
import asyncio
import uuid
from httpx import HTTPStatusError
from typing import Dict, Any
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
# Mark all tests in this module as integration tests
pytestmark = pytest.mark.integration
@pytest.fixture(scope="session")
async def sample_table_info(nc_client: NextcloudClient) -> Dict[str, Any]:
"""
Fixture to get information about the sample table that comes with Nextcloud Tables.
This assumes that the sample table exists in the Nextcloud instance.
"""
logger.info("Looking for sample table in Nextcloud Tables app")
# Get all tables
tables = await nc_client.tables.list_tables()
# Look for a sample table (usually created by default)
sample_table = None
for table in tables:
# Common names for sample tables
if any(
keyword in table.get("title", "").lower()
for keyword in ["sample", "demo", "example", "test"]
):
sample_table = table
break
if not sample_table and tables:
# If no sample table found, use the first available table
sample_table = tables[0]
logger.info(
f"No sample table found, using first available table: {sample_table.get('title')}"
)
if not sample_table:
pytest.skip(
"No tables found in Nextcloud Tables app. Please ensure Tables app is installed and has at least one table."
)
# Get the schema for the sample table
table_id = sample_table["id"]
schema = await nc_client.tables.get_table_schema(table_id)
logger.info(f"Using sample table: {sample_table.get('title')} (ID: {table_id})")
return {
"table": sample_table,
"schema": schema,
"table_id": table_id,
"columns": schema.get("columns", []),
}
@pytest.fixture
async def temporary_table_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Fixture to create a temporary row in the sample table for testing.
Yields the created row data and cleans up afterward.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
# Create test data based on the table schema
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
# Generate test data based on column type
if column_type == "text":
test_data[column_id] = f"Test {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 42
elif column_type == "datetime":
test_data[column_id] = "2024-01-01T12:00:00Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Test Option"
else:
# Default to text for unknown types
test_data[column_id] = f"Test {column_title} {unique_suffix}"
logger.info(f"Creating temporary row in table {table_id} with data: {test_data}")
created_row = None
try:
created_row = await nc_client.tables.create_row(table_id, test_data)
row_id = created_row.get("id")
if not row_id:
pytest.fail("Failed to get ID from created temporary row.")
logger.info(f"Temporary row created with ID: {row_id}")
yield created_row
finally:
if created_row and created_row.get("id"):
row_id = created_row["id"]
logger.info(f"Cleaning up temporary row ID: {row_id}")
try:
await nc_client.tables.delete_row(row_id)
logger.info(f"Successfully deleted temporary row ID: {row_id}")
except HTTPStatusError as e:
# Ignore 404 if row was already deleted by the test itself
if e.response.status_code != 404:
logger.error(f"HTTP error deleting temporary row {row_id}: {e}")
else:
logger.warning(f"Temporary row {row_id} already deleted (404).")
except Exception as e:
logger.error(f"Unexpected error deleting temporary row {row_id}: {e}")
async def test_tables_list_tables(nc_client: NextcloudClient):
"""
Test listing all tables available to the user.
"""
logger.info("Testing list_tables functionality")
tables = await nc_client.tables.list_tables()
assert isinstance(tables, list)
assert len(tables) > 0, "Expected at least one table to be available"
# Check that each table has required fields
for table in tables:
assert "id" in table
assert "title" in table
assert isinstance(table["id"], int)
assert isinstance(table["title"], str)
logger.info(f"Successfully listed {len(tables)} tables")
async def test_tables_get_schema(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test getting the schema/structure of a specific table.
"""
table_id = sample_table_info["table_id"]
logger.info(f"Testing get_table_schema for table ID: {table_id}")
schema = await nc_client.tables.get_table_schema(table_id)
assert isinstance(schema, dict)
assert "columns" in schema
assert isinstance(schema["columns"], list)
assert len(schema["columns"]) > 0, "Expected at least one column in the table"
# Check that each column has required fields
for column in schema["columns"]:
assert "id" in column
assert "title" in column
assert "type" in column
assert isinstance(column["id"], int)
assert isinstance(column["title"], str)
assert isinstance(column["type"], str)
logger.info(f"Successfully retrieved schema with {len(schema['columns'])} columns")
async def test_tables_read_table(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test reading rows from a table.
"""
table_id = sample_table_info["table_id"]
logger.info(f"Testing get_table_rows for table ID: {table_id}")
# Test without pagination
rows = await nc_client.tables.get_table_rows(table_id)
assert isinstance(rows, list)
# Note: The table might be empty, so we don't assert len > 0
# Test with pagination
rows_limited = await nc_client.tables.get_table_rows(table_id, limit=5, offset=0)
assert isinstance(rows_limited, list)
assert len(rows_limited) <= 5
# If there are rows, check their structure
if rows:
row = rows[0]
assert "id" in row
assert "tableId" in row
assert "data" in row
assert isinstance(row["id"], int)
assert isinstance(row["tableId"], int)
assert isinstance(row["data"], list)
logger.info(f"Successfully read {len(rows)} rows from table")
async def test_tables_create_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test creating a new row in a table.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
# Create test data based on the table schema
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
# Generate test data based on column type
if column_type == "text":
test_data[column_id] = f"Test Create {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 123
elif column_type == "datetime":
test_data[column_id] = "2024-01-01T12:00:00Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Test Option"
else:
# Default to text for unknown types
test_data[column_id] = f"Test Create {column_title} {unique_suffix}"
logger.info(f"Testing create_row for table ID: {table_id} with data: {test_data}")
created_row = None
try:
created_row = await nc_client.tables.create_row(table_id, test_data)
assert isinstance(created_row, dict)
assert "id" in created_row
assert "tableId" in created_row
assert isinstance(created_row["id"], int)
assert created_row["tableId"] == table_id
# Verify the row was created by reading it back
await asyncio.sleep(1) # Allow potential propagation delay
rows = await nc_client.tables.get_table_rows(table_id)
created_row_id = created_row["id"]
# Find the created row in the results
found_row = None
for row in rows:
if row["id"] == created_row_id:
found_row = row
break
assert found_row is not None, (
f"Created row with ID {created_row_id} not found in table"
)
logger.info(f"Successfully created row with ID: {created_row_id}")
finally:
# Clean up the created row
if created_row and created_row.get("id"):
try:
await nc_client.tables.delete_row(created_row["id"])
logger.info(f"Cleaned up created row ID: {created_row['id']}")
except Exception as e:
logger.warning(f"Failed to clean up created row: {e}")
async def test_tables_update_row(
nc_client: NextcloudClient,
temporary_table_row: Dict[str, Any],
sample_table_info: Dict[str, Any],
):
"""
Test updating an existing row in a table.
"""
row_id = temporary_table_row["id"]
columns = sample_table_info["columns"]
# Create updated data
update_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
# Generate updated test data based on column type
if column_type == "text":
update_data[column_id] = f"Updated {column_title} {unique_suffix}"
elif column_type == "number":
update_data[column_id] = 456
elif column_type == "datetime":
update_data[column_id] = "2024-12-31T23:59:59Z"
elif column_type == "select":
# For select columns, use the first option if available
options = column.get("selectOptions", [])
if options:
update_data[column_id] = options[0].get("label", "Option 1")
else:
update_data[column_id] = "Updated Option"
else:
# Default to text for unknown types
update_data[column_id] = f"Updated {column_title} {unique_suffix}"
logger.info(f"Testing update_row for row ID: {row_id} with data: {update_data}")
updated_row = await nc_client.tables.update_row(row_id, update_data)
assert isinstance(updated_row, dict)
assert "id" in updated_row
assert updated_row["id"] == row_id
# Verify the row was updated by reading it back
await asyncio.sleep(1) # Allow potential propagation delay
table_id = sample_table_info["table_id"]
rows = await nc_client.tables.get_table_rows(table_id)
# Find the updated row in the results
found_row = None
for row in rows:
if row["id"] == row_id:
found_row = row
break
assert found_row is not None, f"Updated row with ID {row_id} not found in table"
logger.info(f"Successfully updated row with ID: {row_id}")
async def test_tables_delete_row(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test deleting a row from a table.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
# First create a row to delete
test_data = {}
unique_suffix = uuid.uuid4().hex[:8]
for column in columns:
column_id = column["id"]
column_type = column.get("type", "text")
column_title = column.get("title", f"column_{column_id}")
if column_type == "text":
test_data[column_id] = f"Test Delete {column_title} {unique_suffix}"
elif column_type == "number":
test_data[column_id] = 789
elif column_type == "datetime":
test_data[column_id] = "2024-06-15T10:30:00Z"
elif column_type == "select":
options = column.get("selectOptions", [])
if options:
test_data[column_id] = options[0].get("label", "Option 1")
else:
test_data[column_id] = "Delete Option"
else:
test_data[column_id] = f"Test Delete {column_title} {unique_suffix}"
logger.info(f"Creating row for delete test in table ID: {table_id}")
created_row = await nc_client.tables.create_row(table_id, test_data)
row_id = created_row["id"]
logger.info(f"Testing delete_row for row ID: {row_id}")
# Delete the row
delete_result = await nc_client.tables.delete_row(row_id)
assert isinstance(delete_result, dict)
# The delete response might vary, but it should be successful
# Verify the row was deleted by trying to find it
await asyncio.sleep(1) # Allow potential propagation delay
rows = await nc_client.tables.get_table_rows(table_id)
# Ensure the deleted row is not in the results
found_row = None
for row in rows:
if row["id"] == row_id:
found_row = row
break
assert found_row is None, f"Deleted row with ID {row_id} still found in table"
logger.info(f"Successfully deleted row with ID: {row_id}")
async def test_tables_delete_nonexistent_row(nc_client: NextcloudClient):
"""
Test that deleting a non-existent row fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(f"Testing delete_row for non-existent row ID: {non_existent_id}")
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.delete_row(non_existent_id)
# Accept both 404 and 500 as valid error responses for non-existent rows
# The API behavior may vary between Nextcloud versions
assert excinfo.value.response.status_code in [404, 500]
logger.info(
f"Deleting non-existent row ID: {non_existent_id} correctly failed with {excinfo.value.response.status_code}."
)
async def test_tables_transform_row_data(
nc_client: NextcloudClient, sample_table_info: Dict[str, Any]
):
"""
Test the transform_row_data utility method.
"""
table_id = sample_table_info["table_id"]
columns = sample_table_info["columns"]
logger.info(f"Testing transform_row_data for table ID: {table_id}")
# Get some rows to transform
rows = await nc_client.tables.get_table_rows(table_id, limit=5)
if not rows:
logger.info("No rows to transform, skipping transform_row_data test")
return
# Transform the rows
transformed_rows = nc_client.tables.transform_row_data(rows, columns)
assert isinstance(transformed_rows, list)
assert len(transformed_rows) == len(rows)
# Check the structure of transformed rows
for i, transformed_row in enumerate(transformed_rows):
original_row = rows[i]
assert "id" in transformed_row
assert "tableId" in transformed_row
assert "data" in transformed_row
assert transformed_row["id"] == original_row["id"]
assert transformed_row["tableId"] == original_row["tableId"]
assert isinstance(transformed_row["data"], dict)
# Check that column IDs were transformed to column names
for column in columns:
column_title = column["title"]
# The transformed data should have column names as keys
# (though the column might not have data in this row)
if any(item["columnId"] == column["id"] for item in original_row["data"]):
assert column_title in transformed_row["data"]
logger.info(f"Successfully transformed {len(transformed_rows)} rows")
async def test_tables_get_nonexistent_table_schema(nc_client: NextcloudClient):
"""
Test that getting schema for a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(
f"Testing get_table_schema for non-existent table ID: {non_existent_id}"
)
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.get_table_schema(non_existent_id)
assert excinfo.value.response.status_code == 404
logger.info(
f"Getting schema for non-existent table ID: {non_existent_id} correctly failed with 404."
)
async def test_tables_read_nonexistent_table(nc_client: NextcloudClient):
"""
Test that reading from a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
logger.info(f"Testing get_table_rows for non-existent table ID: {non_existent_id}")
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.get_table_rows(non_existent_id)
assert excinfo.value.response.status_code == 404
logger.info(
f"Reading from non-existent table ID: {non_existent_id} correctly failed with 404."
)
async def test_tables_create_row_invalid_table(nc_client: NextcloudClient):
"""
Test that creating a row in a non-existent table fails appropriately.
"""
non_existent_id = 999999999 # Use an ID highly unlikely to exist
test_data = {1: "test value"}
logger.info(f"Testing create_row for non-existent table ID: {non_existent_id}")
with pytest.raises(HTTPStatusError) as excinfo:
await nc_client.tables.create_row(non_existent_id, test_data)
assert excinfo.value.response.status_code == 404
logger.info(
f"Creating row in non-existent table ID: {non_existent_id} correctly failed with 404."
)