Claude-skill-registry authentication
Authentication and authorization including JWT, OAuth2, OIDC, sessions, RBAC, and security analysis. Activate for login, auth flows, security audits, threat modeling, access control, and identity management.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/authentication-lobbi-docs-claude" ~/.claude/skills/majiayu000-claude-skill-registry-authentication && rm -rf "$T"
skills/data/authentication-lobbi-docs-claude/SKILL.mdAuthentication Skill
Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.
When to Use This Skill
Activate this skill when working with:
- User authentication flows
- JWT token management
- OAuth2 integration
- Session management
- Role-based access control (RBAC)
JWT Authentication
Token Generation
```python from jose import jwt from datetime import datetime, timedelta from passlib.context import CryptContext
SECRET_KEY = os.environ["JWT_SECRET"] ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(user_id: str, roles: list[str]) -> str: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = { "sub": user_id, "roles": roles, "exp": expire, "iat": datetime.utcnow(), "type": "access" } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str: expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) payload = { "sub": user_id, "exp": expire, "type": "refresh" } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict: try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.JWTError: raise HTTPException(status_code=401, detail="Invalid token") ```
Password Hashing
```python def hash_password(password: str) -> str: return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password)
Usage
async def authenticate_user(email: str, password: str) -> User | None: user = await get_user_by_email(email) if not user: return None if not verify_password(password, user.hashed_password): return None return user ```
FastAPI Auth Dependencies
```python from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User: payload = verify_token(token) user = await get_user(payload["sub"]) if not user: raise HTTPException(status_code=401, detail="User not found") return user
async def get_current_active_user(user: User = Depends(get_current_user)) -> User: if not user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return user
Role-based access
def require_roles(*roles: str): async def role_checker(user: User = Depends(get_current_user)): if not any(role in user.roles for role in roles): raise HTTPException(status_code=403, detail="Insufficient permissions") return user return role_checker
Usage
@app.get("/admin") async def admin_route(user: User = Depends(require_roles("admin"))): return {"message": "Admin access granted"} ```
OAuth2 Integration
Google OAuth2
```python from authlib.integrations.starlette_client import OAuth
oauth = OAuth() oauth.register( name='google', client_id=os.environ['GOOGLE_CLIENT_ID'], client_secret=os.environ['GOOGLE_CLIENT_SECRET'], server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'} )
@app.get('/auth/google') async def google_login(request: Request): redirect_uri = request.url_for('google_callback') return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google/callback') async def google_callback(request: Request): token = await oauth.google.authorize_access_token(request) user_info = token.get('userinfo')
# Find or create user user = await get_or_create_user( email=user_info['email'], name=user_info['name'], provider='google' ) # Generate JWT access_token = create_access_token(user.id, user.roles) return {"access_token": access_token, "token_type": "bearer"}
```
GitHub OAuth2
```python oauth.register( name='github', client_id=os.environ['GITHUB_CLIENT_ID'], client_secret=os.environ['GITHUB_CLIENT_SECRET'], authorize_url='https://github.com/login/oauth/authorize', access_token_url='https://github.com/login/oauth/access_token', api_base_url='https://api.github.com/', client_kwargs={'scope': 'user:email'} ) ```
Session Management
```python from fastapi import Request, Response import secrets
async def create_session(user_id: str, response: Response) -> str: session_id = secrets.token_urlsafe(32)
# Store in Redis await redis.hset(f"session:{session_id}", mapping={ "user_id": user_id, "created_at": datetime.utcnow().isoformat() }) await redis.expire(f"session:{session_id}", 86400) # 24 hours # Set cookie response.set_cookie( key="session_id", value=session_id, httponly=True, secure=True, samesite="lax", max_age=86400 ) return session_id
async def get_session(request: Request) -> dict | None: session_id = request.cookies.get("session_id") if not session_id: return None
session = await redis.hgetall(f"session:{session_id}") if not session: return None # Refresh TTL await redis.expire(f"session:{session_id}", 86400) return session
async def destroy_session(request: Request, response: Response): session_id = request.cookies.get("session_id") if session_id: await redis.delete(f"session:{session_id}") response.delete_cookie("session_id") ```
RBAC Implementation
```python from enum import Enum from typing import Set
class Permission(str, Enum): READ_AGENTS = "read:agents" WRITE_AGENTS = "write:agents" DELETE_AGENTS = "delete:agents" ADMIN = "admin"
ROLE_PERMISSIONS: dict[str, Set[Permission]] = { "viewer": {Permission.READ_AGENTS}, "operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS}, "admin": {Permission.READ_AGENTS, Permission.WRITE_AGENTS, Permission.DELETE_AGENTS, Permission.ADMIN}, }
def has_permission(user_roles: list[str], required: Permission) -> bool: for role in user_roles: if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]: return True return False
def require_permission(permission: Permission): async def permission_checker(user: User = Depends(get_current_user)): if not has_permission(user.roles, permission): raise HTTPException(status_code=403, detail="Permission denied") return user return permission_checker
Usage
@app.delete("/agents/{id}") async def delete_agent( id: str, user: User = Depends(require_permission(Permission.DELETE_AGENTS)) ): await agent_service.delete(id) return {"status": "deleted"} ```
Security Best Practices
- Use HTTPS always in production
- Hash passwords with bcrypt or argon2
- Short-lived access tokens (15-30 minutes)
- Refresh token rotation on each use
- HttpOnly, Secure cookies for tokens
- Rate limit authentication endpoints
- Log authentication events for auditing
- Implement account lockout after failed attempts
OAuth 2.0 & OIDC Best Practices
OAuth 2.0 Grant Types
Authorization Code Flow (Recommended for Web Apps)
from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( name='custom_provider', client_id=os.environ['OAUTH_CLIENT_ID'], client_secret=os.environ['OAUTH_CLIENT_SECRET'], authorize_url='https://provider.com/oauth/authorize', authorize_params=None, access_token_url='https://provider.com/oauth/token', access_token_params=None, refresh_token_url=None, client_kwargs={'scope': 'openid profile email'} ) @app.get('/auth/login') async def login(request: Request): # Generate state for CSRF protection state = secrets.token_urlsafe(32) await redis.set(f"oauth_state:{state}", "1", ex=600) redirect_uri = request.url_for('auth_callback') return await oauth.custom_provider.authorize_redirect( request, redirect_uri, state=state ) @app.get('/auth/callback') async def auth_callback(request: Request): # Verify state (CSRF protection) state = request.query_params.get('state') if not await redis.get(f"oauth_state:{state}"): raise HTTPException(status_code=400, detail="Invalid state") await redis.delete(f"oauth_state:{state}") # Exchange authorization code for tokens token = await oauth.custom_provider.authorize_access_token(request) user_info = token.get('userinfo') # Create or update user user = await upsert_user(user_info) # Issue application tokens access_token = create_access_token(user.id, user.roles) refresh_token = create_refresh_token(user.id) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer" }
PKCE (Proof Key for Code Exchange) for SPAs
# Frontend (JavaScript/TypeScript) import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce' // Generate PKCE parameters const codeVerifier = generateCodeVerifier() const codeChallenge = await generateCodeChallenge(codeVerifier) // Store verifier for later use sessionStorage.setItem('code_verifier', codeVerifier) // Redirect to authorization endpoint const authUrl = new URL('https://provider.com/oauth/authorize') authUrl.searchParams.set('client_id', CLIENT_ID) authUrl.searchParams.set('redirect_uri', REDIRECT_URI) authUrl.searchParams.set('response_type', 'code') authUrl.searchParams.set('scope', 'openid profile email') authUrl.searchParams.set('code_challenge', codeChallenge) authUrl.searchParams.set('code_challenge_method', 'S256') authUrl.searchParams.set('state', generateState()) window.location.href = authUrl.toString() // In callback handler const code = new URLSearchParams(window.location.search).get('code') const codeVerifier = sessionStorage.getItem('code_verifier') const tokenResponse = await fetch('https://provider.com/oauth/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code: code, redirect_uri: REDIRECT_URI, client_id: CLIENT_ID, code_verifier: codeVerifier }) })
Client Credentials Flow (Service-to-Service)
import httpx from datetime import datetime, timedelta class ServiceAuthClient: def __init__(self): self.token = None self.expires_at = None async def get_token(self) -> str: # Return cached token if still valid if self.token and self.expires_at > datetime.utcnow(): return self.token # Request new token async with httpx.AsyncClient() as client: response = await client.post( 'https://provider.com/oauth/token', data={ 'grant_type': 'client_credentials', 'client_id': os.environ['SERVICE_CLIENT_ID'], 'client_secret': os.environ['SERVICE_CLIENT_SECRET'], 'scope': 'api.read api.write' } ) response.raise_for_status() data = response.json() self.token = data['access_token'] self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60) return self.token # Usage auth_client = ServiceAuthClient() async def call_protected_api(): token = await auth_client.get_token() async with httpx.AsyncClient() as client: response = await client.get( 'https://api.service.com/resource', headers={'Authorization': f'Bearer {token}'} ) return response.json()
OpenID Connect (OIDC)
ID Token Validation
from jose import jwt, jwk from jose.utils import base64url_decode import httpx class OIDCValidator: def __init__(self, issuer: str, client_id: str): self.issuer = issuer self.client_id = client_id self.jwks = None self.jwks_updated_at = None async def get_jwks(self) -> dict: # Refresh JWKS if stale (cache for 24 hours) if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).seconds > 86400: async with httpx.AsyncClient() as client: response = await client.get(f"{self.issuer}/.well-known/jwks.json") response.raise_for_status() self.jwks = response.json() self.jwks_updated_at = datetime.utcnow() return self.jwks async def validate_id_token(self, id_token: str) -> dict: # Decode header to get key ID header = jwt.get_unverified_header(id_token) kid = header['kid'] # Get JWKS and find matching key jwks = await self.get_jwks() key = next((k for k in jwks['keys'] if k['kid'] == kid), None) if not key: raise ValueError("Public key not found in JWKS") # Convert JWK to PEM public_key = jwk.construct(key) # Validate and decode ID token try: claims = jwt.decode( id_token, public_key.to_pem().decode('utf-8'), algorithms=['RS256'], audience=self.client_id, issuer=self.issuer, options={ 'verify_exp': True, 'verify_iat': True, 'verify_aud': True, 'verify_iss': True } ) # Additional validations if claims.get('nonce'): # Verify nonce matches what was sent in auth request pass return claims except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="ID token expired") except jwt.JWTClaimsError as e: raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}") except Exception as e: raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}") # Usage validator = OIDCValidator( issuer="https://provider.com", client_id=os.environ['OIDC_CLIENT_ID'] ) @app.post("/auth/oidc/callback") async def oidc_callback(id_token: str): claims = await validator.validate_id_token(id_token) # Extract user information user = await get_or_create_user( email=claims['email'], name=claims['name'], sub=claims['sub'] ) return {"user": user, "claims": claims}
JWT Security Best Practices
Secure Token Generation
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend import os # Use RS256 (asymmetric) instead of HS256 for public verification PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH') PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH') def load_keys(): with open(PRIVATE_KEY_PATH, 'rb') as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) with open(PUBLIC_KEY_PATH, 'rb') as f: public_key = serialization.load_pem_public_key( f.read(), backend=default_backend() ) return private_key, public_key PRIVATE_KEY, PUBLIC_KEY = load_keys() def create_secure_access_token( user_id: str, roles: list[str], tenant_id: str = None, custom_claims: dict = None ) -> str: now = datetime.utcnow() payload = { # Standard claims (RFC 7519) "iss": "https://api.yourdomain.com", # Issuer "sub": user_id, # Subject (user ID) "aud": ["https://api.yourdomain.com"], # Audience "exp": now + timedelta(minutes=15), # Expiration (short-lived) "nbf": now, # Not before "iat": now, # Issued at "jti": secrets.token_urlsafe(16), # JWT ID (unique token identifier) # Custom claims "roles": roles, "type": "access", } # Add tenant context for multi-tenant applications if tenant_id: payload["tenant_id"] = tenant_id # Add any custom claims if custom_claims: payload.update(custom_claims) return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256") def verify_secure_token(token: str) -> dict: try: payload = jwt.decode( token, PUBLIC_KEY, algorithms=["RS256"], audience=["https://api.yourdomain.com"], issuer="https://api.yourdomain.com", options={ 'verify_exp': True, 'verify_nbf': True, 'verify_iat': True, 'verify_aud': True, 'verify_iss': True, 'require_exp': True, 'require_iat': True, 'require_nbf': True } ) # Validate token type if payload.get('type') != 'access': raise HTTPException(status_code=401, detail="Invalid token type") return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError as e: raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
Token Revocation with Blacklist
class TokenBlacklist: """Redis-based token blacklist for revoked tokens""" def __init__(self, redis_client): self.redis = redis_client async def revoke_token(self, jti: str, exp: int): """Add token to blacklist until expiration""" ttl = exp - int(datetime.utcnow().timestamp()) if ttl > 0: await self.redis.set(f"blacklist:{jti}", "1", ex=ttl) async def is_revoked(self, jti: str) -> bool: """Check if token is blacklisted""" return await self.redis.exists(f"blacklist:{jti}") # Global blacklist instance token_blacklist = TokenBlacklist(redis) async def get_current_user_with_revocation_check( token: str = Depends(oauth2_scheme) ) -> User: payload = verify_secure_token(token) # Check if token has been revoked if await token_blacklist.is_revoked(payload['jti']): raise HTTPException(status_code=401, detail="Token has been revoked") user = await get_user(payload["sub"]) if not user: raise HTTPException(status_code=401, detail="User not found") return user @app.post("/auth/logout") async def logout(token: str = Depends(oauth2_scheme)): payload = verify_secure_token(token) await token_blacklist.revoke_token(payload['jti'], payload['exp']) return {"message": "Logged out successfully"}
Security Analysis with Extended Thinking
When reviewing authentication flows, use extended thinking for comprehensive security analysis.
Authentication Flow Security Review Template
## Authentication Flow Security Review **Flow**: [Login/OAuth/SSO/API Authentication] **Date**: [YYYY-MM-DD] **Reviewer**: [Name/Agent] ### Flow Diagram [Document the authentication flow step-by-step] ### Security Analysis Checklist #### Confidentiality - [ ] Credentials transmitted over HTTPS only - [ ] Passwords hashed with strong algorithm (bcrypt/argon2) - [ ] Tokens encrypted in transit - [ ] Sensitive data not logged - [ ] Secrets stored securely (env vars, secrets manager) #### Integrity - [ ] CSRF protection implemented - [ ] Request tampering prevented - [ ] Token signature validation - [ ] State parameter validated (OAuth) - [ ] Nonce validated (OIDC) #### Availability - [ ] Rate limiting on auth endpoints - [ ] Account lockout after failed attempts - [ ] DDoS protection in place - [ ] Graceful degradation strategy - [ ] Session timeout configured #### Authentication - [ ] MFA available for sensitive accounts - [ ] Password complexity requirements - [ ] Credential stuffing protection - [ ] Brute force mitigation - [ ] Session fixation prevention #### Authorization - [ ] Principle of least privilege - [ ] Role-based access control - [ ] Permission checks on every request - [ ] Token scope validation - [ ] Tenant isolation (multi-tenant apps) #### Session Management - [ ] Secure session tokens - [ ] HttpOnly, Secure, SameSite cookies - [ ] Session timeout implemented - [ ] Logout functionality - [ ] Concurrent session handling ### Extended Thinking Analysis Use Claude with extended thinking for deep security review: ```python import anthropic client = anthropic.Anthropic() security_review_prompt = """ Perform a comprehensive security analysis of this authentication flow: [Paste authentication code/flow description] Analyze for: 1. OWASP Top 10 vulnerabilities 2. Authentication bypass possibilities 3. Token security weaknesses 4. Session management issues 5. Input validation gaps 6. Race conditions 7. Logic flaws Provide specific findings with: - Severity (Critical/High/Medium/Low) - Location (file:line) - Attack vector - Remediation steps """ response = client.messages.create( model="claude-opus-4-5-20250514", max_tokens=32000, thinking={ "type": "enabled", "budget_tokens": 20000 # High budget for security analysis }, messages=[{ "role": "user", "content": security_review_prompt }] ) # Extract thinking and analysis for block in response.content: if block.type == "thinking": print(f"Deep Analysis:\n{block.thinking}\n") elif block.type == "text": print(f"Findings:\n{block.text}")
Threat Modeling for Authentication
STRIDE Threat Model for Auth Systems
Adapted from [[deep-analysis]] skill threat modeling template:
## Authentication Threat Model ### System: [Auth System Name] **Version**: 1.0 **Last Updated**: [Date] ### Trust Boundaries
┌─────────────────────────────────────────┐ │ External (Untrusted) │ │ [End Users] [Credential Stuffers] │ │ [MITM Attackers] │ └──────────────────┬──────────────────────┘ │ TLS/HTTPS ┌──────────────────┴──────────────────────┐ │ Public API (Semi-trusted) │ │ [API Gateway] [Auth Endpoints] │ │ [OAuth Providers] [OIDC IdP] │ └──────────────────┬──────────────────────┘ │ Internal Auth ┌──────────────────┴──────────────────────┐ │ Application Layer (Trusted) │ │ [Business Logic] [User Management] │ └──────────────────┬──────────────────────┘ │ DB Protocol ┌──────────────────┴──────────────────────┐ │ Data Layer (Highly Trusted) │ │ [User DB] [Session Store] [Secrets] │ └─────────────────────────────────────────┘
### STRIDE Analysis #### Spoofing Identity | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | Credential theft via phishing | High | Critical | MFA, user education | ✅ | | Session hijacking | Medium | High | Secure cookies, HTTPS | ✅ | | Token replay attacks | Medium | High | Short token lifetime, JTI tracking | ✅ | | OAuth state manipulation | Low | Medium | Cryptographic state validation | ✅ | | Impersonation via stolen refresh token | Medium | Critical | Refresh token rotation, device binding | ⚠️ | #### Tampering with Data | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | JWT payload manipulation | Low | Critical | Signature verification, RS256 | ✅ | | Cookie tampering | Low | High | Signed cookies, HMAC validation | ✅ | | OAuth callback manipulation | Medium | High | Redirect URI validation | ✅ | | Password reset token tampering | Low | High | Cryptographic tokens, time limits | ✅ | #### Repudiation | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | Deny unauthorized access | Medium | Medium | Comprehensive audit logging | ✅ | | Dispute authentication events | Low | Low | Immutable audit trail, timestamps | ✅ | #### Information Disclosure | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | Credentials in logs | Medium | Critical | Sanitize logs, secret detection | ✅ | | User enumeration via login | High | Medium | Generic error messages | ⚠️ | | Token leakage in URLs | Low | High | Tokens in headers only | ✅ | | Timing attacks on password check | Medium | Medium | Constant-time comparison | ✅ | | JWKS endpoint information leak | Low | Low | Rate limiting, monitoring | ✅ | #### Denial of Service | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | Brute force attacks | High | High | Rate limiting, CAPTCHA, account lockout | ✅ | | Resource exhaustion (bcrypt) | Medium | Medium | Request throttling, async processing | ✅ | | Session store exhaustion | Low | High | Session limits per user, TTL | ✅ | | OAuth callback flooding | Medium | Medium | State validation, rate limiting | ⚠️ | #### Elevation of Privilege | Threat | Likelihood | Impact | Mitigation | Status | |--------|------------|--------|------------|--------| | Role manipulation in JWT | Low | Critical | Server-side role verification | ✅ | | Privilege escalation via API | Medium | Critical | Permission checks on every request | ✅ | | Admin impersonation | Low | Critical | Additional auth for admin actions | ⚠️ | | OAuth scope escalation | Low | High | Strict scope validation | ✅ | ### Risk Matrix | Threat ID | Threat | Likelihood | Impact | Risk Score | Priority | |-----------|--------|------------|--------|------------|----------| | T1 | Credential stuffing attack | High | Critical | 9 | P0 | | T2 | Refresh token theft | Medium | Critical | 8 | P1 | | T3 | User enumeration | High | Medium | 6 | P2 | | T4 | OAuth callback flooding | Medium | Medium | 4 | P2 | | T5 | Admin impersonation | Low | Critical | 7 | P1 | ### Attack Scenarios #### Scenario 1: Credential Stuffing Attack **Attacker Goal**: Gain unauthorized access using leaked credentials **Attack Steps**: 1. Obtain credential database from breach 2. Automate login attempts across accounts 3. Bypass rate limiting with distributed IPs 4. Identify valid credentials 5. Access user accounts **Defenses**: - Rate limiting per IP and per account - CAPTCHA after N failed attempts - Anomaly detection (impossible travel, new device) - Breach database monitoring (HaveIBeenPwned) - Mandatory MFA for sensitive accounts #### Scenario 2: Token Theft via XSS **Attacker Goal**: Steal access token to impersonate user **Attack Steps**: 1. Inject malicious script via vulnerable input 2. Script reads token from localStorage 3. Exfiltrate token to attacker server 4. Use token to access API as victim **Defenses**: - Store tokens in HttpOnly cookies (not accessible to JS) - Content Security Policy (CSP) - Input sanitization and validation - Regular security audits - Short token lifetimes ### Recommendations by Priority #### P0 (Critical - Immediate Action) 1. Implement credential stuffing protection 2. Add device fingerprinting for anomaly detection 3. Enable MFA for all admin accounts #### P1 (High - Within Sprint) 1. Implement refresh token rotation 2. Add additional auth step for admin impersonation 3. Strengthen OAuth callback validation #### P2 (Medium - Next Quarter) 1. Improve user enumeration protection 2. Implement risk-based authentication 3. Add behavioral biometrics
Common Vulnerabilities & Mitigations
OWASP Top 10 for Authentication
A01: Broken Access Control
# VULNERABLE: Client-side role check only @app.get("/admin/users") async def get_users(user: User = Depends(get_current_user)): # No server-side permission check! return await db.users.find_all() # SECURE: Server-side permission enforcement @app.get("/admin/users") async def get_users(user: User = Depends(require_permission(Permission.ADMIN))): # Permission verified on server return await db.users.find_all()
A02: Cryptographic Failures
# VULNERABLE: Weak hashing hashed = hashlib.md5(password.encode()).hexdigest() # SECURE: Strong adaptive hashing from passlib.context import CryptContext pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 # Adjust based on security/performance needs ) hashed = pwd_context.hash(password)
A03: Injection
# VULNERABLE: SQL injection in auth query query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'" # SECURE: Parameterized queries query = "SELECT * FROM users WHERE email = $1" user = await db.fetch_one(query, email) if user and pwd_context.verify(password, user.hashed_password): return user
A07: Identification and Authentication Failures
# VULNERABLE: Weak session management session_id = hashlib.md5(str(time.time()).encode()).hexdigest() # SECURE: Cryptographically secure session tokens import secrets session_id = secrets.token_urlsafe(32) # VULNERABLE: No rate limiting @app.post("/auth/login") async def login(credentials: LoginRequest): return await authenticate(credentials) # SECURE: Rate limiting with slowdown from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) @app.post("/auth/login") @limiter.limit("5/minute") # 5 attempts per minute async def login(request: Request, credentials: LoginRequest): return await authenticate(credentials)
Integration with Keycloak
For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):
from keycloak import KeycloakOpenID # Configure Keycloak client keycloak_openid = KeycloakOpenID( server_url="http://localhost:8080/", client_id="your-client", realm_name="your-realm", client_secret_key="your-secret" ) # Get token token = keycloak_openid.token(username, password) # Validate token token_info = keycloak_openid.introspect(token['access_token']) # Get user info user_info = keycloak_openid.userinfo(token['access_token']) # Decode and verify token locally KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + \ keycloak_openid.public_key() + \ "\n-----END PUBLIC KEY-----" options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} token_info = keycloak_openid.decode_token( token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options )
Related Skills & Resources
Skills
- [[keycloak]] - Enterprise identity and access management
- [[deep-analysis]] - Security audit templates and threat modeling
- [[extended-thinking]] - Enable deep reasoning for security analysis
- [[complex-reasoning]] - Hypothesis-driven debugging for auth issues
Keycloak Agents
- keycloak-realm-admin - Realm and client management
- keycloak-security-auditor - Security review and compliance
- keycloak-auth-flow-designer - Custom authentication flows
- keycloak-identity-specialist - Federation and SSO setup
External Resources
- OAuth 2.0 RFC 6749
- OpenID Connect Core 1.0
- JWT Best Practices RFC 8725
- OWASP Authentication Cheat Sheet
- OWASP Session Management
Troubleshooting
Common Issues
Token Verification Failures
# Debug JWT token python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))" # Verify token signature openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt # Check token expiration date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")
OAuth Flow Issues
# Enable debug logging import logging logging.basicConfig(level=logging.DEBUG) # Log OAuth flow steps logger.debug(f"Redirect URI: {redirect_uri}") logger.debug(f"State: {state}") logger.debug(f"Code: {code}") logger.debug(f"Token response: {token_response}")
Session Issues
# Check Redis session data redis-cli > KEYS session:* > HGETALL session:abc123 > TTL session:abc123
Last Updated: 2025-12-12 Version: 2.0.0 Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices