From e42cabb6ed4c958f62d9eac0252791c05f8f5fe2 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Mon, 13 Oct 2025 18:07:56 +0200 Subject: [PATCH] chore: logging --- scripts/verify_oidc.py | 290 ----------------------------------------- tests/conftest.py | 4 +- 2 files changed, 2 insertions(+), 292 deletions(-) delete mode 100755 scripts/verify_oidc.py diff --git a/scripts/verify_oidc.py b/scripts/verify_oidc.py deleted file mode 100755 index fff4c5e..0000000 --- a/scripts/verify_oidc.py +++ /dev/null @@ -1,290 +0,0 @@ -#!/usr/bin/env python3 -""" -Verification script for Nextcloud OIDC implementation. - -This script tests the OIDC endpoints to understand token format and capabilities. -Usage: python scripts/verify_oidc.py -""" - -import asyncio -import json -import sys - -import httpx - - -class NextcloudOIDCVerifier: - """Verify Nextcloud OIDC implementation details.""" - - def __init__(self, base_url: str): - self.base_url = base_url.rstrip("/") - self.client = httpx.AsyncClient(follow_redirects=True, timeout=30.0) - - async def close(self): - await self.client.aclose() - - async def get_discovery(self) -> dict: - """Fetch OIDC discovery document.""" - print(f"\n{'=' * 60}") - print("1. OIDC Discovery Endpoint") - print(f"{'=' * 60}") - - url = f"{self.base_url}/.well-known/openid-configuration" - print(f"URL: {url}") - - try: - response = await self.client.get(url) - response.raise_for_status() - discovery = response.json() - - print("\n✓ Discovery endpoint successful") - print(f"\nIssuer: {discovery.get('issuer')}") - print(f"Authorization endpoint: {discovery.get('authorization_endpoint')}") - print(f"Token endpoint: {discovery.get('token_endpoint')}") - print(f"Userinfo endpoint: {discovery.get('userinfo_endpoint')}") - print(f"JWKS URI: {discovery.get('jwks_uri')}") - print( - f"Registration endpoint: {discovery.get('registration_endpoint', 'NOT AVAILABLE')}" - ) - - print( - f"\nSupported scopes: {', '.join(discovery.get('scopes_supported', []))}" - ) - print( - f"Response types: {', '.join(discovery.get('response_types_supported', []))}" - ) - print( - f"Grant types: {', '.join(discovery.get('grant_types_supported', []))}" - ) - - return discovery - - except httpx.HTTPStatusError as e: - print(f"\n✗ Discovery failed: HTTP {e.response.status_code}") - print(f"Response: {e.response.text}") - sys.exit(1) - except Exception as e: - print(f"\n✗ Discovery failed: {e}") - sys.exit(1) - - async def get_jwks(self, jwks_uri: str) -> dict: - """Fetch JWKS to check if JWT tokens are supported.""" - print(f"\n{'=' * 60}") - print("2. JWKS Endpoint (JWT Support)") - print(f"{'=' * 60}") - - print(f"URL: {jwks_uri}") - - try: - response = await self.client.get(jwks_uri) - response.raise_for_status() - jwks = response.json() - - print("\n✓ JWKS endpoint successful") - print(f"Number of keys: {len(jwks.get('keys', []))}") - - for idx, key in enumerate(jwks.get("keys", []), 1): - print(f"\nKey {idx}:") - print(f" - Key type: {key.get('kty')}") - print(f" - Algorithm: {key.get('alg')}") - print(f" - Use: {key.get('use', 'N/A')}") - print(f" - Key ID: {key.get('kid', 'N/A')}") - - return jwks - - except Exception as e: - print(f"\n✗ JWKS failed: {e}") - return {} - - async def test_dynamic_registration( - self, registration_endpoint: str | None - ) -> dict | None: - """Test dynamic client registration.""" - print(f"\n{'=' * 60}") - print("3. Dynamic Client Registration") - print(f"{'=' * 60}") - - if not registration_endpoint: - print("✗ Dynamic registration not available (not in discovery)") - return None - - print(f"URL: {registration_endpoint}") - - client_metadata = { - "client_name": "Nextcloud MCP Server Test", - "redirect_uris": ["http://localhost:8000/oauth/callback"], - "token_endpoint_auth_method": "client_secret_post", - "grant_types": ["authorization_code", "refresh_token"], - "response_types": ["code"], - "scope": "openid profile email roles groups", - } - - print("\nRegistration payload:") - print(json.dumps(client_metadata, indent=2)) - - try: - response = await self.client.post( - registration_endpoint, - json=client_metadata, - headers={"Content-Type": "application/json"}, - ) - response.raise_for_status() - client_info = response.json() - - print("\n✓ Dynamic registration successful") - print(f"\nClient ID: {client_info.get('client_id')}") - print(f"Client Secret: {client_info.get('client_secret', 'N/A')[:20]}...") - print( - f"Client ID issued at: {client_info.get('client_id_issued_at', 'N/A')}" - ) - print( - f"Client secret expires at: {client_info.get('client_secret_expires_at', 'Never')}" - ) - - # Save for later use - with open("/tmp/nextcloud_oidc_client.json", "w") as f: - json.dump(client_info, f, indent=2) - print("\n✓ Client credentials saved to /tmp/nextcloud_oidc_client.json") - - return client_info - - except httpx.HTTPStatusError as e: - print(f"\n✗ Dynamic registration failed: HTTP {e.response.status_code}") - print(f"Response: {e.response.text}") - return None - except Exception as e: - print(f"\n✗ Dynamic registration failed: {e}") - return None - - async def check_introspection_endpoint(self, discovery: dict) -> bool: - """Check if token introspection endpoint exists.""" - print(f"\n{'=' * 60}") - print("4. Token Introspection Endpoint") - print(f"{'=' * 60}") - - introspection_endpoint = discovery.get("introspection_endpoint") - - if introspection_endpoint: - print(f"URL: {introspection_endpoint}") - print("✓ Introspection endpoint available") - return True - else: - print("✗ Introspection endpoint NOT available") - print("Note: Will need to use userinfo endpoint for token validation") - return False - - def print_summary( - self, discovery: dict, jwks_available: bool, registration_available: bool - ): - """Print implementation summary.""" - print(f"\n{'=' * 60}") - print("IMPLEMENTATION SUMMARY") - print(f"{'=' * 60}") - - print("\n📋 Nextcloud OIDC Capabilities:") - print(" ✓ Discovery endpoint: Available") - print( - f" {'✓' if jwks_available else '✗'} JWKS endpoint: {'Available' if jwks_available else 'Not Available'}" - ) - print( - f" {'✓' if registration_available else '✗'} Dynamic registration: {'Available' if registration_available else 'Not Available'}" - ) - print(f" {'✗'} Token introspection: Not Available (use userinfo)") - - print("\n🔑 Token Format:") - if jwks_available: - print(" ✓ JWT access tokens: SUPPORTED (RFC 9068)") - print(" - Must be enabled per-client in OIDC settings") - print(" - Default: Opaque tokens") - else: - print(" - Opaque tokens only") - - print("\n🔐 Authentication Strategy:") - print(" Primary: Userinfo endpoint validation") - print(" Alternative: JWT validation (if enabled per-client)") - - print("\n📦 Required Scopes:") - scopes = discovery.get("scopes_supported", []) - print(f" Available: {', '.join(scopes)}") - print(" Recommended for MCP: openid profile email") - - print("\n👤 User Context Extraction:") - print(" - Username: 'sub' or 'preferred_username' claim") - print(" - From: JWT claims OR userinfo endpoint") - print(" - Groups: Available via 'roles' or 'groups' scope") - - print("\n⚙️ Configuration Requirements:") - if registration_available: - print(" ✓ Dynamic registration enabled - zero-config deployment possible") - print(" - Clients expire after 3600s (1 hour)") - print(" - Max 100 dynamic clients per instance") - print(" - BruteForce protection enabled") - else: - print(" ✗ Dynamic registration disabled - manual client setup required") - print(" Admin must create client via: occ oidc:create") - - print("\n📝 Endpoints:") - print(f" Authorization: {discovery.get('authorization_endpoint')}") - print(f" Token: {discovery.get('token_endpoint')}") - print(f" Userinfo: {discovery.get('userinfo_endpoint')}") - print(f" JWKS: {discovery.get('jwks_uri')}") - - -async def main(): - """Run verification tests.""" - print("=" * 60) - print("Nextcloud OIDC Verification Script") - print("=" * 60) - - # Get Nextcloud URL - nextcloud_url = input( - "\nEnter Nextcloud URL (e.g., https://cloud.coutinho.io): " - ).strip() - if not nextcloud_url: - nextcloud_url = "https://cloud.coutinho.io" - - verifier = NextcloudOIDCVerifier(nextcloud_url) - - try: - # 1. Get discovery document - discovery = await verifier.get_discovery() - - # 2. Check JWKS - jwks_uri = discovery.get("jwks_uri") - jwks_available = False - if jwks_uri: - jwks = await verifier.get_jwks(jwks_uri) - jwks_available = len(jwks.get("keys", [])) > 0 - - # 3. Test dynamic registration - registration_endpoint = discovery.get("registration_endpoint") - if registration_endpoint: - print("\nTest dynamic registration? (y/n): ", end="") - test_reg = input().strip().lower() - if test_reg == "y": - client_info = await verifier.test_dynamic_registration( - registration_endpoint - ) - registration_available = client_info is not None - else: - registration_available = True - print("Skipping dynamic registration test") - else: - registration_available = False - - # 4. Check introspection - await verifier.check_introspection_endpoint(discovery) - - # 5. Print summary - verifier.print_summary(discovery, jwks_available, registration_available) - - print(f"\n{'=' * 60}") - print("Verification complete!") - print(f"{'=' * 60}\n") - - finally: - await verifier.close() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/conftest.py b/tests/conftest.py index f5fcdb7..3775d2e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -698,9 +698,9 @@ async def interactive_oauth_token() -> str: }, ) - logger.info(f"Token response: {token_response.text}") + logger.debug(f"Token response: {token_response.text}") token_data = token_response.json() - logger.info(f"Token data: {token_data}") + logger.debug(f"Token data: {token_data}") access_token = token_data.get("access_token") # Shut down the server