Claude-skill-registry Keycloak FastAPI Integration
This skill should be used when the user asks to "add Keycloak authentication", "implement OIDC", "configure SSO", "validate JWT token", "add role-based access", "protect API endpoint", or mentions Keycloak, OAuth2, OpenID Connect, identity provider, or authentication in FastAPI. Provides Keycloak/OIDC integration patterns.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/keycloak-fastapi" ~/.claude/skills/majiayu000-claude-skill-registry-keycloak-fastapi-integration && rm -rf "$T"
manifest:
skills/data/keycloak-fastapi/SKILL.mdsource content
Keycloak Integration for FastAPI
This skill provides patterns for integrating Keycloak as an identity provider with FastAPI applications using OIDC/OAuth2.
Configuration
Settings
from pydantic_settings import BaseSettings class KeycloakSettings(BaseSettings): keycloak_url: str = "https://auth.example.com" keycloak_realm: str = "my-realm" keycloak_client_id: str = "my-api" keycloak_client_secret: str = "" @property def openid_config_url(self) -> str: return f"{self.keycloak_url}/realms/{self.keycloak_realm}/.well-known/openid-configuration" @property def jwks_url(self) -> str: return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/certs" @property def token_url(self) -> str: return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/token" class Config: env_file = ".env"
JWT Token Validation
Token Validator
import httpx from jose import jwt, JWTError from jose.jwk import construct from functools import lru_cache from typing import Optional, Dict, Any class KeycloakTokenValidator: def __init__(self, settings: KeycloakSettings): self.settings = settings self._jwks: Optional[Dict] = None async def get_jwks(self) -> Dict: if self._jwks is None: async with httpx.AsyncClient() as client: response = await client.get(self.settings.jwks_url) response.raise_for_status() self._jwks = response.json() return self._jwks async def validate_token(self, token: str) -> Dict[str, Any]: try: jwks = await self.get_jwks() # Get key ID from token header unverified_header = jwt.get_unverified_header(token) kid = unverified_header.get("kid") # Find matching key key = None for jwk in jwks.get("keys", []): if jwk.get("kid") == kid: key = jwk break if not key: raise JWTError("Key not found") # Decode and validate payload = jwt.decode( token, key, algorithms=["RS256"], audience=self.settings.keycloak_client_id, issuer=f"{self.settings.keycloak_url}/realms/{self.settings.keycloak_realm}" ) return payload except JWTError as e: raise ValueError(f"Invalid token: {str(e)}")
FastAPI Dependencies
Current User Dependency
from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional security = HTTPBearer(auto_error=False) class TokenUser: def __init__(self, payload: Dict[str, Any]): self.sub: str = payload.get("sub", "") self.email: str = payload.get("email", "") self.name: str = payload.get("name", "") self.preferred_username: str = payload.get("preferred_username", "") self.roles: list = self._extract_roles(payload) self.raw_payload = payload def _extract_roles(self, payload: Dict) -> list: # Realm roles realm_roles = payload.get("realm_access", {}).get("roles", []) # Client roles resource_access = payload.get("resource_access", {}) client_roles = resource_access.get( settings.keycloak_client_id, {} ).get("roles", []) return list(set(realm_roles + client_roles)) async def get_token_validator() -> KeycloakTokenValidator: return KeycloakTokenValidator(get_settings()) async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), validator: KeycloakTokenValidator = Depends(get_token_validator) ) -> TokenUser: if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer"} ) try: payload = await validator.validate_token(credentials.credentials) return TokenUser(payload) except ValueError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e), headers={"WWW-Authenticate": "Bearer"} ) async def get_current_user_optional( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), validator: KeycloakTokenValidator = Depends(get_token_validator) ) -> Optional[TokenUser]: if not credentials: return None try: payload = await validator.validate_token(credentials.credentials) return TokenUser(payload) except ValueError: return None
Role-Based Access Control
from functools import wraps from typing import List def require_roles(*required_roles: str): """Dependency that checks for required roles.""" async def role_checker( user: TokenUser = Depends(get_current_user) ) -> TokenUser: if not any(role in user.roles for role in required_roles): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Required roles: {', '.join(required_roles)}" ) return user return role_checker def require_all_roles(*required_roles: str): """Dependency that checks user has ALL required roles.""" async def role_checker( user: TokenUser = Depends(get_current_user) ) -> TokenUser: missing = [r for r in required_roles if r not in user.roles] if missing: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing roles: {', '.join(missing)}" ) return user return role_checker # Usage in routes @router.get("/admin/users") async def list_users(user: TokenUser = Depends(require_roles("admin", "user-manager"))): """Only admins or user-managers can access.""" return {"users": []} @router.delete("/admin/system") async def system_action(user: TokenUser = Depends(require_all_roles("admin", "super-admin"))): """Requires BOTH admin AND super-admin roles.""" return {"status": "ok"}
Protected Routes
from fastapi import APIRouter, Depends router = APIRouter(prefix="/api/v1", tags=["Protected"]) @router.get("/profile") async def get_profile(user: TokenUser = Depends(get_current_user)): """Get current user's profile.""" return { "sub": user.sub, "email": user.email, "name": user.name, "roles": user.roles } @router.get("/public") async def public_endpoint(): """Public endpoint - no auth required.""" return {"message": "Public data"} @router.get("/optional-auth") async def optional_auth(user: Optional[TokenUser] = Depends(get_current_user_optional)): """Returns different data based on auth status.""" if user: return {"message": f"Hello, {user.name}!", "authenticated": True} return {"message": "Hello, guest!", "authenticated": False}
Token Refresh Flow
import httpx from typing import Tuple class KeycloakAuthService: def __init__(self, settings: KeycloakSettings): self.settings = settings async def refresh_token(self, refresh_token: str) -> Tuple[str, str]: """Exchange refresh token for new access token.""" async with httpx.AsyncClient() as client: response = await client.post( self.settings.token_url, data={ "grant_type": "refresh_token", "client_id": self.settings.keycloak_client_id, "client_secret": self.settings.keycloak_client_secret, "refresh_token": refresh_token } ) if response.status_code != 200: raise ValueError("Failed to refresh token") data = response.json() return data["access_token"], data["refresh_token"] async def exchange_code(self, code: str, redirect_uri: str) -> dict: """Exchange authorization code for tokens.""" async with httpx.AsyncClient() as client: response = await client.post( self.settings.token_url, data={ "grant_type": "authorization_code", "client_id": self.settings.keycloak_client_id, "client_secret": self.settings.keycloak_client_secret, "code": code, "redirect_uri": redirect_uri } ) if response.status_code != 200: raise ValueError("Failed to exchange code") return response.json()
Middleware for Token Refresh
from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response class TokenRefreshMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request) # Check if token is about to expire (from custom header) token_exp = request.state.get("token_exp") if token_exp and token_exp - time.time() < 300: # 5 min # Token expires soon - add header to signal frontend response.headers["X-Token-Expiring"] = "true" return response
Additional Resources
Reference Files
For detailed configuration and advanced patterns:
- Keycloak realm/client configurationreferences/keycloak-setup.md
- Multi-tenant authentication patternsreferences/multi-tenant.md
- Testing authenticated endpointsreferences/testing.md
Example Files
Working examples in
examples/:
- Complete auth dependenciesexamples/auth_dependencies.py
- Protected route examplesexamples/protected_router.py
- Full Keycloak serviceexamples/keycloak_service.py