fix(mcp): Move all imports to the top of modules

This commit is contained in:
Chris Coutinho
2025-12-26 10:05:27 -06:00
parent b841407f07
commit 056414752e
41 changed files with 85 additions and 152 deletions
+3 -2
View File
@@ -1,3 +1,5 @@
import json
import httpx
# ============================================================================
@@ -22,14 +24,13 @@ def create_mock_response(
Returns:
Mock httpx.Response object
"""
import json as json_module
if headers is None:
headers = {}
# If json_data is provided, serialize it to content
if json_data is not None:
content = json_module.dumps(json_data).encode("utf-8")
content = json.dumps(json_data).encode("utf-8")
headers.setdefault("content-type", "application/json")
if content is None:
+10 -25
View File
@@ -1,7 +1,17 @@
import base64
import hashlib
import json
import logging
import os
import re
import secrets
import subprocess
import threading
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, AsyncGenerator
from urllib.parse import parse_qs, quote, urlparse
import anyio
import httpx
@@ -257,7 +267,6 @@ async def nc_mcp_basic_auth_client(
Uses anyio pytest plugin for proper async fixture handling.
"""
import base64
credentials = base64.b64encode(b"admin:admin").decode("utf-8")
auth_header = f"Basic {credentials}"
@@ -342,7 +351,6 @@ async def nc_mcp_oauth_client_with_elicitation(
logger.info(f" Schema: {params.schema}")
# Extract OAuth URL from elicitation message
import re
url_pattern = r"https?://[^\s]+"
urls = re.findall(url_pattern, params.message)
@@ -1108,10 +1116,6 @@ def oauth_callback_server():
# "OAuth tests with browser automation not supported in GitHub Actions CI"
# )
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse
# Use a dict to store auth codes keyed by state parameter
# This allows multiple concurrent OAuth flows
auth_states = {}
@@ -1758,9 +1762,6 @@ async def playwright_oauth_token(
- Browser fixture provided by pytest-playwright-asyncio
- See: https://playwright.dev/python/docs/test-runners
"""
import secrets
import time
from urllib.parse import quote
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
username = os.getenv("NEXTCLOUD_USERNAME")
@@ -2047,9 +2048,6 @@ async def _get_oauth_token_with_scopes(
Returns:
OAuth access token string with requested scopes
"""
import secrets
import time
from urllib.parse import quote
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
username = os.getenv("NEXTCLOUD_USERNAME")
@@ -2417,9 +2415,6 @@ async def _get_oauth_token_for_user(
Returns:
OAuth access token string
"""
import secrets
import time
from urllib.parse import quote
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
@@ -2560,7 +2555,6 @@ async def all_oauth_tokens(
Now uses the real callback server with state parameters for reliable
concurrent token acquisition without race conditions.
"""
import time
# Get auth_states dict from callback server
auth_states, callback_url = oauth_callback_server
@@ -2711,7 +2705,6 @@ async def test_user(nc_client: NextcloudClient):
user_config = test_user
await nc_client.users.create_user(**user_config)
"""
import uuid
# Generate unique user ID to avoid conflicts
userid = f"testuser_{uuid.uuid4().hex[:8]}"
@@ -2747,7 +2740,6 @@ async def test_group(nc_client: NextcloudClient):
Returns the group ID.
"""
import uuid
# Generate unique group ID to avoid conflicts
groupid = f"testgroup_{uuid.uuid4().hex[:8]}"
@@ -2882,11 +2874,6 @@ async def _get_keycloak_oauth_token(
Returns:
OAuth access token string from Keycloak
"""
import base64
import hashlib
import secrets
import time
from urllib.parse import quote
# Get auth_states dict from callback server
auth_states, _ = oauth_callback_server
@@ -3252,8 +3239,6 @@ async def configure_astrolabe_for_mcp_server(nc_client):
- mcp_server_public_url: Public URL for OAuth token audience validation
- client_id: Optional OAuth client ID (default: "nextcloudMcpServerUIPublicClient")
"""
import json
import subprocess
async def _configure(
mcp_server_internal_url: str,
@@ -1,6 +1,7 @@
"""Integration tests for document processing with progress notifications."""
import io
import os
import pytest
from PIL import Image
@@ -13,7 +14,6 @@ class TestDocumentProcessingProgress:
async def test_unstructured_processor_with_progress_callback(self, nc_client):
"""Test that UnstructuredProcessor calls progress callback during processing."""
import os
# Skip if unstructured is not enabled
if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true":
@@ -71,7 +71,6 @@ class TestDocumentProcessingProgress:
self, nc_mcp_client, nc_client
):
"""Test that reading a document via WebDAV MCP tool sends progress notifications."""
import os
# Skip if document processing is not enabled
if os.getenv("ENABLE_DOCUMENT_PROCESSING", "false").lower() != "true":
@@ -110,7 +109,6 @@ class TestDocumentProcessingProgress:
async def test_progress_callback_not_required(self, nc_client):
"""Test that processing works without progress callback (backward compatibility)."""
import os
if os.getenv("ENABLE_UNSTRUCTURED", "false").lower() != "true":
pytest.skip("Unstructured processor not enabled")
@@ -13,6 +13,8 @@ app password entry → background sync activation → database verification.
"""
import logging
import re
import subprocess
import anyio
import pytest
@@ -151,7 +153,6 @@ async def generate_app_password(
)
# Validate password format before returning
import re
if not re.match(
r"^[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}-[a-zA-Z0-9]{5}$",
@@ -350,7 +351,6 @@ async def verify_app_password_created(username: str) -> bool:
# Query the database to check for background sync credentials
# Astrolabe stores app passwords in oc_preferences, not oc_authtoken
import subprocess
query = f"""
SELECT userid, configkey, configvalue
+5 -8
View File
@@ -16,6 +16,7 @@ vector database with indexed test data.
import json
from unittest.mock import MagicMock
import anyio
import pytest
from mcp.types import CreateMessageResult, TextContent
@@ -67,7 +68,6 @@ async def test_semantic_search_answer_successful_sampling(
await require_vector_sync_tools(nc_mcp_client)
# Get initial indexed count before creating note
import asyncio
initial_sync = await nc_mcp_client.call_tool(
"nc_get_vector_sync_status", arguments={}
@@ -118,7 +118,7 @@ Avoid blocking operations in async code.""",
)
break
await asyncio.sleep(wait_interval)
await anyio.sleep(wait_interval)
waited += wait_interval
# Verify sync completed
@@ -247,7 +247,6 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f
)
# Wait for vector indexing to complete
import asyncio
max_wait = 30
wait_interval = 1
@@ -262,7 +261,7 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f
if status_data["status"] == "idle" and status_data["pending_count"] == 0:
break
await asyncio.sleep(wait_interval)
await anyio.sleep(wait_interval)
waited += wait_interval
assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds"
@@ -306,7 +305,6 @@ async def test_semantic_search_answer_score_threshold(
)
# Wait for vector indexing to complete
import asyncio
max_wait = 30
wait_interval = 1
@@ -321,7 +319,7 @@ async def test_semantic_search_answer_score_threshold(
if status_data["status"] == "idle" and status_data["pending_count"] == 0:
break
await asyncio.sleep(wait_interval)
await anyio.sleep(wait_interval)
waited += wait_interval
assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds"
@@ -371,7 +369,6 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f
)
# Wait for vector indexing to complete
import asyncio
max_wait = 30
wait_interval = 1
@@ -386,7 +383,7 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f
if status_data["status"] == "idle" and status_data["pending_count"] == 0:
break
await asyncio.sleep(wait_interval)
await anyio.sleep(wait_interval)
waited += wait_interval
assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds"
+1 -2
View File
@@ -10,6 +10,7 @@ Uses SimpleEmbeddingProvider for deterministic, in-process embeddings
without requiring external services like Ollama.
"""
import math
import tempfile
from pathlib import Path
@@ -147,7 +148,6 @@ async def test_simple_embedding_provider_deterministic(simple_embedding_provider
assert len(embedding1) == 384
# Should be normalized (unit length)
import math
norm = math.sqrt(sum(x * x for x in embedding1))
assert abs(norm - 1.0) < 1e-6
@@ -340,7 +340,6 @@ async def test_batch_embedding(simple_embedding_provider: SimpleEmbeddingProvide
assert all(len(emb) == 384 for emb in embeddings)
# Each should be normalized
import math
for emb in embeddings:
norm = math.sqrt(sum(x * x for x in emb))
+1 -2
View File
@@ -6,6 +6,7 @@ workflow completion rates, and cross-user operation latencies.
"""
import statistics
import time
from collections import Counter, defaultdict
from typing import Any
@@ -44,13 +45,11 @@ class OAuthBenchmarkMetrics:
def start(self):
"""Mark the start of the benchmark."""
import time
self.start_time = time.time()
def stop(self):
"""Mark the end of the benchmark."""
import time
self.end_time = time.time()
+4 -4
View File
@@ -5,8 +5,12 @@ Manages multiple OAuth-authenticated users for realistic multi-user load testing
"""
import logging
import secrets
import string
import time
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote
import anyio
import httpx
@@ -333,8 +337,6 @@ class OAuthUserPool:
TimeoutError: If callback not received within timeout
ValueError: If token exchange fails
"""
import time
from urllib.parse import quote
logger.info(f"Starting Playwright OAuth flow for {username}...")
logger.debug(f"Using state: {state[:16]}...")
@@ -478,8 +480,6 @@ class UserSessionWrapper:
def generate_secure_password(length: int = 20) -> str:
"""Generate a secure random password."""
import secrets
import string
alphabet = string.ascii_letters + string.digits + "!@#$%^&*()"
return "".join(secrets.choice(alphabet) for _ in range(length))
+1 -4
View File
@@ -4,6 +4,7 @@ Workload definitions for load testing the MCP server.
Defines realistic operation mixes and individual operation functions.
"""
import json
import logging
import random
import time
@@ -91,8 +92,6 @@ class WorkloadOperations:
if result and len(result.content) > 0:
content = result.content[0]
if hasattr(content, "text"):
import json
note_data = json.loads(content.text)
note_id = note_data.get("id")
if note_id:
@@ -222,8 +221,6 @@ class MixedWorkload:
"nc_notes_get_note", {"note_id": note_id}
)
if get_result and len(get_result.content) > 0:
import json
note_data = json.loads(get_result.content[0].text)
etag = note_data.get("etag", "")
self._warmup_note_ids.append((note_id, etag))
+1 -1
View File
@@ -18,6 +18,7 @@ Usage:
import asyncio
import logging
import os
import re
import sys
# Add parent directory to path
@@ -127,7 +128,6 @@ async def main():
)
# Extract requesttoken from HTML
import re
token_match = re.search(r'data-requesttoken="([^"]+)"', settings_response.text)
if token_match:
+1 -1
View File
@@ -17,6 +17,7 @@ Architecture:
MCP Client Keycloak DCR Keycloak OAuth MCP Server Nextcloud APIs
"""
import json
import logging
import os
import secrets
@@ -623,7 +624,6 @@ async def test_keycloak_dcr_architecture():
}
logger.info("Keycloak DCR Architecture:")
import json
logger.info(json.dumps(architecture, indent=2))
@@ -11,13 +11,15 @@ Note: Tests use JWT OAuth tokens because scopes are embedded in the token payloa
enabling efficient scope-based tool filtering without additional API calls.
"""
import logging
import httpx
import pytest
@pytest.mark.integration
async def test_prm_endpoint():
"""Test that the Protected Resource Metadata endpoint returns correct data."""
import httpx
# Test the PRM endpoint directly (RFC 9728 - path includes /mcp resource)
async with httpx.AsyncClient() as client:
@@ -60,7 +62,6 @@ async def test_basicauth_shows_all_tools(nc_mcp_client):
@pytest.mark.integration
async def test_read_only_token_filters_write_tools(nc_mcp_oauth_client_read_only):
"""Test that a token with only read scopes filters out write tools."""
import logging
logger = logging.getLogger(__name__)
@@ -109,7 +110,6 @@ async def test_read_only_token_filters_write_tools(nc_mcp_oauth_client_read_only
@pytest.mark.integration
async def test_write_only_token_filters_read_tools(nc_mcp_oauth_client_write_only):
"""Test that a token with only write scopes filters out read tools."""
import logging
logger = logging.getLogger(__name__)
@@ -158,7 +158,6 @@ async def test_write_only_token_filters_read_tools(nc_mcp_oauth_client_write_onl
@pytest.mark.integration
async def test_full_access_token_shows_all_tools(nc_mcp_oauth_client_full_access):
"""Test that a token with both read and write scopes scopes can see all tools."""
import logging
logger = logging.getLogger(__name__)
@@ -402,7 +401,6 @@ async def test_jwt_with_no_custom_scopes_returns_zero_tools(
- OAuth provisioning tools (requiring only 'openid') remain visible
so users can provision Nextcloud access after authentication
"""
import logging
logger = logging.getLogger(__name__)
@@ -442,7 +440,6 @@ async def test_jwt_consent_scenarios_read_only(nc_mcp_oauth_client_read_only):
Simulates user granting only read permission during OAuth consent.
Expected: Should see read tools but not write tools.
"""
import logging
logger = logging.getLogger(__name__)
@@ -480,7 +477,6 @@ async def test_jwt_consent_scenarios_write_only(nc_mcp_oauth_client_write_only):
Simulates user granting only write permission during OAuth consent.
Expected: Should see write tools but not read-only tools.
"""
import logging
logger = logging.getLogger(__name__)
@@ -518,7 +514,6 @@ async def test_jwt_consent_scenarios_full_access(nc_mcp_oauth_client_full_access
Simulates user granting both permissions during OAuth consent.
Expected: Should see all 90+ tools (both read and write).
"""
import logging
logger = logging.getLogger(__name__)
+2 -3
View File
@@ -6,10 +6,12 @@ Tests the critical token exchange pattern that separates:
"""
import os
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
import jwt
import pytest
from cryptography.fernet import Fernet
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
@@ -21,9 +23,6 @@ pytestmark = pytest.mark.unit
@pytest.fixture
async def token_storage():
"""Create test token storage."""
import tempfile
from cryptography.fernet import Fernet
# Generate valid Fernet key
encryption_key = Fernet.generate_key()
+1 -8
View File
@@ -1,5 +1,6 @@
"""Integration tests for Calendar VTODO (task) MCP tools."""
import json
import logging
from datetime import datetime, timedelta
@@ -41,7 +42,6 @@ async def test_mcp_todo_complete_workflow(
# Extract UID from the result
result_data = create_result.content[0].text
import json
result_json = json.loads(result_data)
todo_uid = result_json["uid"]
@@ -156,7 +156,6 @@ async def test_mcp_list_todos_with_filters(
{"calendar_name": calendar_name, "status": "NEEDS-ACTION"},
)
assert result.isError is False
import json
data = json.loads(result.content[0].text)
needs_action_todos = [t for t in data["todos"] if t["uid"] in created_uids]
@@ -253,8 +252,6 @@ async def test_mcp_search_todos_across_calendars(
)
assert search_result.isError is False
import json
data = json.loads(search_result.content[0].text)
assert "todos" in data
@@ -388,8 +385,6 @@ async def test_mcp_todo_with_dates(
)
assert create_result.isError is False
import json
result_data = json.loads(create_result.content[0].text)
todo_uid = result_data["uid"]
@@ -432,8 +427,6 @@ async def test_mcp_todo_categories(
)
assert create_result.isError is False
import json
result_data = json.loads(create_result.content[0].text)
todo_uid = result_data["uid"]
+1 -4
View File
@@ -1,5 +1,6 @@
"""Tests for configuration validation."""
import logging
import os
from unittest.mock import patch
@@ -48,7 +49,6 @@ class TestQdrantConfigValidation:
def test_api_key_warning_in_local_mode(self, caplog):
"""Test that API key in local mode triggers warning."""
import logging
caplog.set_level(logging.WARNING, logger="nextcloud_mcp_server.config")
Settings(
@@ -59,7 +59,6 @@ class TestQdrantConfigValidation:
def test_api_key_no_warning_in_network_mode(self, caplog):
"""Test that API key in network mode doesn't trigger warning."""
import logging
caplog.set_level(logging.WARNING, logger="nextcloud_mcp_server.config")
Settings(
@@ -206,7 +205,6 @@ class TestChunkConfigValidation:
def test_small_chunk_size_warning(self, caplog):
"""Test that chunk size < 512 triggers warning."""
import logging
caplog.set_level(logging.WARNING, logger="nextcloud_mcp_server.config")
Settings(
@@ -221,7 +219,6 @@ class TestChunkConfigValidation:
def test_reasonable_chunk_size_no_warning(self, caplog):
"""Test that chunk size >= 512 doesn't trigger warning."""
import logging
caplog.set_level(logging.WARNING, logger="nextcloud_mcp_server.config")
Settings(
+1 -1
View File
@@ -8,6 +8,7 @@ APIs use OAuth.
from unittest.mock import AsyncMock, MagicMock
import httpx
import pytest
from nextcloud_mcp_server.app import setup_oauth_config_for_multi_user_basic
@@ -207,7 +208,6 @@ class TestSetupOAuthConfigForMultiUserBasic:
self, hybrid_auth_settings, mocker
):
"""Test handling of OIDC discovery HTTP errors."""
import httpx
# Create a mock response with a status error
mock_response = MagicMock()