build: Add type checking

This commit is contained in:
Chris Coutinho
2025-11-05 15:19:55 +01:00
parent 9dcda0cd6a
commit 6cccd92b3b
17 changed files with 104 additions and 47 deletions
+1 -1
View File
@@ -818,7 +818,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
return allowed_tools
# Replace the tool manager's list_tools method
mcp._tool_manager.list_tools = list_tools_filtered
mcp._tool_manager.list_tools = list_tools_filtered # type: ignore[method-assign]
logger.info(
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
)
@@ -34,7 +34,7 @@ class ProgressiveConsentTokenVerifier:
def __init__(
self,
token_storage: RefreshTokenStorage,
token_storage: RefreshTokenStorage | None,
token_broker: Optional[TokenBrokerService] = None,
oidc_discovery_url: Optional[str] = None,
nextcloud_host: Optional[str] = None,
@@ -80,7 +80,7 @@ class ProgressiveConsentTokenVerifier:
# Create token broker if not provided
if token_broker:
self.token_broker = token_broker
elif self.encryption_key:
elif self.encryption_key and token_storage and self.nextcloud_host:
self.token_broker = TokenBrokerService(
storage=token_storage,
oidc_discovery_url=self.oidc_discovery_url,
@@ -89,7 +89,12 @@ class ProgressiveConsentTokenVerifier:
)
else:
self.token_broker = None
logger.warning("Token broker not available - encryption key missing")
if not self.encryption_key:
logger.warning("Token broker not available - encryption key missing")
elif not token_storage:
logger.warning("Token broker not available - token storage missing")
elif not self.nextcloud_host:
logger.warning("Token broker not available - nextcloud host missing")
async def verify_token(self, token: str) -> Optional[AccessToken]:
"""
@@ -25,7 +25,7 @@ import logging
import os
import time
from pathlib import Path
from typing import Optional
from typing import Any, Optional
import aiosqlite
from cryptography.fernet import Fernet
@@ -283,7 +283,7 @@ class RefreshTokenStorage:
)
async def store_user_profile(
self, user_id: str, profile_data: dict[str, any]
self, user_id: str, profile_data: dict[str, Any]
) -> None:
"""
Store user profile data (cached from IdP userinfo endpoint).
@@ -314,7 +314,7 @@ class RefreshTokenStorage:
logger.debug(f"Cached user profile for {user_id}")
async def get_user_profile(self, user_id: str) -> Optional[dict[str, any]]:
async def get_user_profile(self, user_id: str) -> Optional[dict[str, Any]]:
"""
Retrieve cached user profile data.
@@ -3,7 +3,7 @@
import logging
import os
from functools import wraps
from typing import Callable
from typing import Any, Callable
from mcp.server.auth.middleware.auth_context import get_access_token
from mcp.server.auth.provider import AccessToken
@@ -88,15 +88,18 @@ def require_scopes(*required_scopes: str):
ScopeAuthorizationError: If required scopes are not present in the access token
"""
def decorator(func: Callable):
def decorator(func: Callable) -> Callable:
# Store scope requirements as function metadata for dynamic filtering
func._required_scopes = list(required_scopes) # type: ignore
func._required_scopes = list(required_scopes) # type: ignore[attr-defined]
# Get function name for logging (works for any callable)
func_name = getattr(func, "__name__", repr(func))
# Find which parameter receives the Context (FastMCP injects it by name)
context_param_name = find_context_parameter(func)
@wraps(func)
async def wrapper(*args, **kwargs):
async def wrapper(*args: Any, **kwargs: Any) -> Any:
# Extract context from kwargs (where FastMCP injected it)
ctx: Context | None = (
kwargs.get(context_param_name) if context_param_name else None
@@ -106,7 +109,7 @@ def require_scopes(*required_scopes: str):
# No context parameter found - likely BasicAuth mode
# In BasicAuth mode, all operations are allowed
logger.debug(
f"No context parameter for {func.__name__} - allowing (BasicAuth mode)"
f"No context parameter for {func_name} - allowing (BasicAuth mode)"
)
return await func(*args, **kwargs)
@@ -119,7 +122,7 @@ def require_scopes(*required_scopes: str):
# Not in OAuth mode (BasicAuth or no auth)
# In BasicAuth mode, all operations are allowed
logger.debug(
f"No access token present for {func.__name__} - allowing (BasicAuth mode)"
f"No access token present for {func_name} - allowing (BasicAuth mode)"
)
return await func(*args, **kwargs)
@@ -172,7 +175,7 @@ def require_scopes(*required_scopes: str):
if not has_nextcloud_scopes:
error_msg = (
f"Access denied to {func.__name__}: "
f"Access denied to {func_name}: "
f"Nextcloud resource access not provisioned. "
f"Please run the 'provision_nextcloud_access' tool first."
)
@@ -183,7 +186,7 @@ def require_scopes(*required_scopes: str):
missing_scopes = required_scopes_set - token_scopes
if missing_scopes:
error_msg = (
f"Access denied to {func.__name__}: "
f"Access denied to {func_name}: "
f"Missing required scopes: {', '.join(sorted(missing_scopes))}. "
f"Token has scopes: {', '.join(sorted(token_scopes)) if token_scopes else 'none'}"
)
@@ -192,7 +195,7 @@ def require_scopes(*required_scopes: str):
# All required scopes present - allow execution
logger.debug(
f"Scope authorization passed for {func.__name__}: {required_scopes}"
f"Scope authorization passed for {func_name}: {required_scopes}"
)
return await func(*args, **kwargs)
+1 -1
View File
@@ -68,7 +68,7 @@ class TokenCache:
logger.debug(f"Using cached token for user {user_id}")
return token
async def set(self, user_id: str, token: str, expires_in: int = None):
async def set(self, user_id: str, token: str, expires_in: int | None = None):
"""Store token in cache."""
async with self._lock:
# Use provided expiry or default TTL
+4 -1
View File
@@ -114,7 +114,8 @@ class TokenExchangeService:
if not self.oidc_discovery_url:
# Fallback to Nextcloud OIDC if no discovery URL
self.oidc_discovery_url = urljoin(
self.nextcloud_host, "/.well-known/openid-configuration"
self.nextcloud_host, # type: ignore[arg-type]
"/.well-known/openid-configuration",
)
try:
@@ -363,6 +364,7 @@ class TokenExchangeService:
True if provisioned, False otherwise
"""
await self._ensure_storage()
assert self.storage is not None # _ensure_storage() ensures this
token_data = await self.storage.get_refresh_token(user_id)
return token_data is not None
@@ -376,6 +378,7 @@ class TokenExchangeService:
Refresh token if found, None otherwise
"""
await self._ensure_storage()
assert self.storage is not None # _ensure_storage() ensures this
token_data = await self.storage.get_refresh_token(user_id)
if token_data:
return token_data.get("refresh_token")
+2 -1
View File
@@ -165,6 +165,7 @@ class NextcloudTokenVerifier(TokenVerifier):
"""
try:
# Get signing key from JWKS
assert self._jwks_client is not None # Caller should check before calling
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
# Verify and decode JWT
@@ -257,7 +258,7 @@ class NextcloudTokenVerifier(TokenVerifier):
try:
# Introspection requires client authentication
response = await self._client.post(
self.introspection_uri,
self.introspection_uri, # type: ignore
data={"token": token},
auth=(self.client_id, self.client_secret),
)
+18 -12
View File
@@ -100,7 +100,7 @@ class CalendarClient:
# Use custom PROPFIND with CalendarServer namespace (cs:) for calendar-color.
# caldav library's nsmap lacks "CS" namespace, and its CalendarColor uses
# Apple iCal namespace which Nextcloud doesn't recognize.
from lxml import etree
from lxml import etree # type: ignore[import-untyped]
propfind_body = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:" xmlns:cs="http://calendarserver.org/ns/" xmlns:c="urn:ietf:params:xml:ns:caldav">
@@ -261,11 +261,12 @@ class CalendarClient:
result = []
for event in events:
await event.load(only_if_unloaded=True)
event_dict = self._parse_ical_event(event.data)
if event_dict:
event_dict["href"] = str(event.url)
event_dict["etag"] = ""
result.append(event_dict)
if event.data:
event_dict = self._parse_ical_event(event.data)
if event_dict:
event_dict["href"] = str(event.url)
event_dict["etag"] = ""
result.append(event_dict)
if len(result) >= limit:
break
@@ -314,8 +315,8 @@ class CalendarClient:
await event.load(only_if_unloaded=True)
# Merge updates into existing iCal data
updated_ical = self._merge_ical_properties(event.data, event_data, event_uid)
event.data = updated_ical
updated_ical = self._merge_ical_properties # type: ignore[arg-type](event.data, event_data, event_uid)
event.data = updated_ical # type: ignore[misc]
await event.save()
@@ -349,7 +350,7 @@ class CalendarClient:
event = await calendar.event_by_uid(event_uid)
await event.load(only_if_unloaded=True)
event_data = self._parse_ical_event(event.data)
event_data = self._parse_ical_event(event.data) if event.data else None # type: ignore[arg-type]
if not event_data:
raise ValueError(f"Failed to parse event data for {event_uid}")
@@ -416,7 +417,10 @@ class CalendarClient:
# Only load if data not already present from REPORT response
# This avoids 404 errors for virtual calendars (e.g., Deck boards)
await todo.load(only_if_unloaded=True)
todo_dict = self._parse_ical_todo(todo.data)
if todo.data:
todo_dict = self._parse_ical_todo(todo.data) # type: ignore[arg-type]
else:
continue
if todo_dict:
todo_dict["href"] = str(todo.url)
todo_dict["etag"] = ""
@@ -470,12 +474,14 @@ class CalendarClient:
await todo.load(only_if_unloaded=True)
logger.debug(
f"Loaded todo {todo_uid}, current data length: {len(todo.data)}"
f"Loaded todo {todo_uid}, current data length: {len(todo.data)}" # type: ignore
)
# Merge updates into existing iCal data
updated_ical = self._merge_ical_todo_properties(
todo.data, todo_data, todo_uid
todo.data, # type: ignore[arg-type]
todo_data,
todo_uid,
)
logger.debug(f"Merged iCal data length: {len(updated_ical)}")
logger.debug(f"Updated iCal content:\n{updated_ical}")
+2 -2
View File
@@ -124,7 +124,7 @@ class ContactsClient(BaseNextcloudClient):
carddav_path = self._get_carddav_base_path()
url = f"{carddav_path}/{addressbook}/{uid}.vcf"
contact = Contact(fn=contact_data.get("fn"), uid=uid)
contact = Contact(fn=contact_data.get("fn"), uid=uid) # type: ignore
if "email" in contact_data:
contact.email = [{"value": contact_data["email"], "type": ["HOME"]}]
if "tel" in contact_data:
@@ -174,7 +174,7 @@ class ContactsClient(BaseNextcloudClient):
)
else:
# Fallback to creating new vCard if we couldn't get existing
contact = Contact(fn=contact_data.get("fn"), uid=uid)
contact = Contact(fn=contact_data.get("fn"), uid=uid) # type: ignore
if "email" in contact_data:
contact.email = [{"value": contact_data["email"], "type": ["HOME"]}]
if "tel" in contact_data:
@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
try:
import io
import pytesseract
import pytesseract # type: ignore
from PIL import Image
TESSERACT_AVAILABLE = True
@@ -112,10 +112,10 @@ class UnstructuredProcessor(DocumentProcessor):
f"Processing document with unstructured... ({elapsed}s elapsed)"
)
try:
await progress_callback(
progress=float(elapsed),
total=None, # Unknown total duration
message=message,
await progress_callback( # type: ignore
progress=float(elapsed), # type: ignore
total=None, # Unknown total duration # type: ignore
message=message, # type: ignore
)
logger.debug(f"Progress update sent: {elapsed}s elapsed")
except Exception as e:
@@ -293,7 +293,7 @@ class UnstructuredProcessor(DocumentProcessor):
self._run_progress_poller, stop_event, progress_callback, start_time
)
return result
return result # type: ignore
async def health_check(self) -> bool:
"""Check if Unstructured API is available.
+3 -3
View File
@@ -191,7 +191,7 @@ def configure_cookbook_tools(mcp: FastMCP):
recipe_yield: int | None = None,
category: str | None = None,
keywords: str | None = None,
ctx: Context = None,
ctx: Context = None, # type: ignore
) -> CreateRecipeResponse:
"""Create a new recipe.
@@ -271,7 +271,7 @@ def configure_cookbook_tools(mcp: FastMCP):
recipe_yield: int | None = None,
category: str | None = None,
keywords: str | None = None,
ctx: Context = None,
ctx: Context = None, # type: ignore
) -> UpdateRecipeResponse:
"""Update an existing recipe.
@@ -544,7 +544,7 @@ def configure_cookbook_tools(mcp: FastMCP):
folder: str | None = None,
update_interval: int | None = None,
print_image: bool | None = None,
ctx: Context = None,
ctx: Context = None, # type: ignore
) -> ReindexResponse:
"""Set Cookbook app configuration.
+1 -1
View File
@@ -331,7 +331,7 @@ def configure_notes_tools(mcp: FastMCP):
content, mime_type = await client.webdav.get_note_attachment(
note_id=note_id, filename=attachment_filename
)
return {
return { # type: ignore
"uri": f"nc://Notes/{note_id}/attachments/{attachment_filename}",
"mimeType": mime_type,
"data": content,
+4 -4
View File
@@ -163,7 +163,7 @@ async def provision_nextcloud_access(
if not user_id:
# Get the authorization token from context
if hasattr(ctx, "authorization") and ctx.authorization:
token = ctx.authorization.token
token = ctx.authorization.token # type: ignore
# Decode token to get user info
try:
import jwt
@@ -304,7 +304,7 @@ async def revoke_nextcloud_access(
# Get user ID from context if not provided
if not user_id:
user_id = (
ctx.context.get("user_id", "default_user")
ctx.context.get("user_id", "default_user") # type: ignore
if hasattr(ctx, "context")
else "default_user"
)
@@ -334,7 +334,7 @@ async def revoke_nextcloud_access(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
),
nextcloud_host=os.getenv("NEXTCLOUD_HOST"),
nextcloud_host=os.getenv("NEXTCLOUD_HOST"), # type: ignore
encryption_key=encryption_key,
)
@@ -382,7 +382,7 @@ async def check_provisioning_status(
# Get user ID from context if not provided
if not user_id:
user_id = (
ctx.context.get("user_id", "default_user")
ctx.context.get("user_id", "default_user") # type: ignore
if hasattr(ctx, "context")
else "default_user"
)