Coverage for src / idx_api / auth.py: 49%
118 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-28 11:09 -0700
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-28 11:09 -0700
1"""Authentication module with dual auth support (OIDC JWT + API keys)."""
3from datetime import datetime, timezone
4from typing import Annotated, Optional
6from fastapi import Depends, HTTPException, Security, status
7from jose import jwt
8from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
9from sqlalchemy.orm import Session
11from idx_api.config import settings
12from idx_api.database import get_db
13from idx_api.models import APIKey, User
14from idx_api.security import verify_api_key
# Security scheme for Bearer tokens (both JWT and API keys).
# auto_error=False is paired with the `if not credentials: return None`
# handling in get_current_user, so a request without a bearer credential is
# treated as anonymous here and rejected (if at all) by the dependencies below.
# NOTE(review): relies on HTTPBearer yielding None instead of raising when
# auto_error is False — confirm against the installed FastAPI version.
security = HTTPBearer(auto_error=False)
# Numeric privilege level for each role name; a higher number outranks a
# lower one when permissions are compared.
# NOTE(review): level 3 is unassigned (admin jumps from 2 to 4) — presumably
# reserved for a future role; confirm before renumbering.
ROLE_HIERARCHY = {"viewer": 0, "agent": 1, "broker": 2, "admin": 4}
async def authenticate_api_key(
    api_key: str, db: Session
) -> Optional[User]:
    """
    Authenticate user via API key.

    The key is looked up by its 12-character prefix among non-revoked keys,
    verified against the stored hash and rejected once expired. A key that is
    linked to a user account authenticates as that user only while the
    account exists and is not disabled. A key with no linked account
    authenticates as a transient "virtual" user built from the key's own
    role and brokerage.

    Args:
        api_key: The plaintext API key (starts with idx_)
        db: Database session

    Returns:
        User object if authenticated, None otherwise
    """
    # Extract prefix for quick lookup (first 12 chars). Slicing already
    # returns the whole string when it is shorter than 12 characters.
    key_prefix = api_key[:12]

    # Find a non-revoked API key by prefix
    api_key_obj = (
        db.query(APIKey)
        .filter(APIKey.key_prefix == key_prefix, APIKey.revoked_at.is_(None))
        .first()
    )
    if not api_key_obj:
        return None

    # Verify the full key against stored hash
    if not verify_api_key(api_key, api_key_obj.key_hash):
        return None

    now = datetime.now(timezone.utc)

    # Check if key is expired
    expires_at = api_key_obj.expires_at
    if expires_at is not None:
        if expires_at.tzinfo is None:
            # Some database backends hand back naive datetimes; comparing
            # one with an aware "now" raises TypeError.
            # NOTE(review): assumes naive values are stored in UTC — confirm.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        if now > expires_at:
            return None

    # Update last used timestamp
    api_key_obj.last_used_at = now
    db.commit()

    if api_key_obj.user_id:
        # The key belongs to a real account: it must never outlive that
        # account. Falling through to the virtual user here would let a
        # disabled (or deleted) user keep authenticating with the key's role.
        user = db.query(User).filter(User.id == api_key_obj.user_id).first()
        if not user or user.disabled_at:
            return None
        return user

    # No user associated: create a virtual user from API key metadata.
    # This allows API keys to work without a full user account.
    return User(
        id=None,  # Virtual user
        email=f"api_key_{api_key_obj.id}@system",
        role=api_key_obj.role,
        brokerage_id=api_key_obj.brokerage_id,
    )
async def authenticate_jwt(token: str, db: Session) -> Optional[User]:
    """
    Authenticate user via OIDC JWT token.

    The token is verified with the configured public key and algorithm
    (audience verification is disabled). The user is identified by the
    ``sub`` claim, falling back to ``preferred_username`` and then ``email``.
    The role is the highest of admin/broker/agent found in
    ``realm_access.roles``, defaulting to viewer. Unknown users are created
    on first login; existing users have their role synchronised with the
    token. Disabled users are rejected.

    Diagnostics go through the ``logging`` module. Token contents and the
    user's email are deliberately not logged.

    Args:
        token: The JWT token from Keycloak
        db: Database session

    Returns:
        User object if authenticated, None otherwise. Every failure,
        including unexpected errors, is reported as None and never raised.
    """
    # Imported at function scope so this block does not depend on a
    # module-level logging setup elsewhere in the file.
    import logging

    logger = logging.getLogger(__name__)

    if not settings.jwt_public_key:
        # JWT public key not configured yet
        logger.debug("JWT public key not configured")
        return None

    try:
        # Decode and verify JWT token (audience is optional for Keycloak)
        payload = jwt.decode(
            token,
            settings.jwt_public_key,
            algorithms=[settings.jwt_algorithm],
            options={"verify_aud": False},  # Keycloak doesn't always include audience
        )
        logger.debug("JWT decoded successfully, payload keys: %s", list(payload.keys()))

        # Extract user info from JWT
        keycloak_user_id = payload.get("sub")
        email = payload.get("email")
        preferred_username = payload.get("preferred_username")

        # Extract roles from Keycloak JWT (realm_access.roles). "or" also
        # covers claims that are present but null, which .get()'s default
        # argument would not.
        realm_access = payload.get("realm_access") or {}
        roles = realm_access.get("roles") or []
        logger.debug("JWT realm roles: %s", roles)

        # Determine highest role (admin > broker > agent > viewer)
        user_role = next(
            (role for role in ("admin", "broker", "agent") if role in roles),
            "viewer",
        )

        # Use sub if available, otherwise fall back to preferred_username or email
        # Some Keycloak configurations don't include sub in the JWT
        if not keycloak_user_id:
            keycloak_user_id = preferred_username or email
            logger.debug("No 'sub' in JWT, using fallback identifier")

        if not keycloak_user_id:
            logger.debug(
                "No user identifier found in JWT (no sub, preferred_username, or email)"
            )
            return None

        # Find or create user (with retry for race conditions)
        user = (
            db.query(User)
            .filter(User.keycloak_user_id == keycloak_user_id)
            .first()
        )

        if not user:
            # Auto-create user on first login with role from JWT.
            # Concurrent first requests may both try to insert the same
            # user; the loser rolls back and reads the winner's row.
            try:
                user = User(
                    keycloak_user_id=keycloak_user_id,
                    email=email or f"{keycloak_user_id}@unknown",
                    role=user_role,
                )
                db.add(user)
                db.commit()
                db.refresh(user)
            except Exception:
                # Kept deliberately broad (best effort): whatever went wrong,
                # restore the session and look for an existing row.
                db.rollback()
                logger.warning(
                    "User creation failed, fetching existing user", exc_info=True
                )
                user = (
                    db.query(User)
                    .filter(User.keycloak_user_id == keycloak_user_id)
                    .first()
                )
                if not user:
                    # Still not found - something else is wrong
                    logger.error("User still not found after race condition handling")
                    return None
        elif user.role != user_role:
            # Update role if it changed in Keycloak
            user.role = user_role
            db.commit()

        # Check if user is disabled
        if user.disabled_at:
            return None

        # Update last login time
        user.last_login_at = datetime.now(timezone.utc)
        db.commit()

        return user

    except jwt.ExpiredSignatureError as e:
        # Token expired
        logger.debug("JWT expired: %s", e)
        return None
    except jwt.JWTError as e:
        # Invalid token or JWT error
        logger.debug("JWT error: %s", e)
        return None
    except Exception:
        # Authentication must fail closed rather than surface a 500, but the
        # traceback is recorded so the failure is not silent.
        logger.exception("Unexpected error in JWT auth")
        return None
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security),
    db: Session = Depends(get_db),
) -> Optional[User]:
    """
    Get current authenticated user (dual auth: API key or JWT).

    This dependency returns None if not authenticated (optional auth).
    Use require_role() for mandatory authentication.

    Tokens starting with ``idx_`` are treated as API keys and are only ever
    checked as API keys; every other token is treated as a JWT.

    Args:
        credentials: HTTP Bearer credentials
        db: Database session

    Returns:
        User object if authenticated, None otherwise
    """
    if not credentials:
        return None

    token = credentials.credentials

    # API key authentication (starts with idx_). A rejected API key is not
    # handed on to the JWT decoder: a compact JWT begins with a
    # base64url-encoded JSON header, so an idx_ token can never be a valid
    # JWT, and passing it on would only expose key material to the JWT
    # path's diagnostics.
    if token.startswith("idx_"):
        return await authenticate_api_key(token, db)

    # Everything else is treated as a JWT
    return await authenticate_jwt(token, db)
async def get_current_user_required(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security),
    db: Session = Depends(get_db),
) -> User:
    """
    Resolve the caller to a user, rejecting anonymous requests.

    Delegates to get_current_user() and turns its "not authenticated"
    result into an HTTP 401 response.

    Args:
        credentials: HTTP Bearer credentials
        db: Database session

    Returns:
        The authenticated User object

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) if
            the request could not be authenticated
    """
    authenticated = await get_current_user(credentials, db)
    if authenticated:
        return authenticated

    # No usable credentials: challenge the client for a bearer token.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )
def require_role(min_role: str):
    """
    Build a dependency that enforces a minimum role.

    Role hierarchy: admin (4) > broker (2) > agent (1) > viewer (0)

    A ``min_role`` that is not in ROLE_HIERARCHY maps to level 999, so no
    user passes the check. A user whose role is not in ROLE_HIERARCHY is
    treated as level 0.

    Args:
        min_role: Minimum required role (viewer, agent, broker, admin)

    Returns:
        Async dependency function that returns the authenticated user, or
        raises HTTPException 403 when the user's role ranks below min_role

    Example:
        @router.get("/admin-only")
        async def admin_endpoint(user: Annotated[User, Depends(require_role("admin"))]):
            ...
    """

    async def check_role(
        user: User = Depends(get_current_user_required),
    ) -> User:
        required_level = ROLE_HIERARCHY.get(min_role, 999)
        granted_level = ROLE_HIERARCHY.get(user.role, 0)

        if granted_level >= required_level:
            return user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient permissions. Required role: {min_role}",
        )

    return check_role
# Type aliases for common dependencies, for use as endpoint parameter
# annotations.
# CurrentUser: optional auth — resolves to None when the request is not
# authenticated.
CurrentUser = Annotated[Optional[User], Depends(get_current_user)]
# RequiredUser: mandatory auth — unauthenticated requests get a 401.
RequiredUser = Annotated[User, Depends(get_current_user_required)]
# Role-gated variants: mandatory auth plus a minimum role; users ranked
# below it get a 403.
AdminUser = Annotated[User, Depends(require_role("admin"))]
BrokerUser = Annotated[User, Depends(require_role("broker"))]
AgentUser = Annotated[User, Depends(require_role("agent"))]