Coverage for src / idx_api / auth.py: 49%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 11:09 -0700

1"""Authentication module with dual auth support (OIDC JWT + API keys).""" 

2 

3from datetime import datetime, timezone 

4from typing import Annotated, Optional 

5 

6from fastapi import Depends, HTTPException, Security, status 

7from jose import jwt 

8from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

9from sqlalchemy.orm import Session 

10 

11from idx_api.config import settings 

12from idx_api.database import get_db 

13from idx_api.models import APIKey, User 

14from idx_api.security import verify_api_key 

15 

# Security scheme for Bearer tokens (both JWT and API keys).
# auto_error=False is passed so that the dependencies below receive
# credentials=None for a request without a Bearer header and can decide for
# themselves whether that is an error (see get_current_user, which returns
# None in that case, and get_current_user_required, which raises 401).
security = HTTPBearer(auto_error=False)

18 

# Numeric rank of each role; a higher number means more privilege.
# require_role() compares these ranks, so only their ordering matters
# (note that admin is 4, not 3).
ROLE_HIERARCHY = dict(
    viewer=0,
    agent=1,
    broker=2,
    admin=4,
)

26 

27 

async def authenticate_api_key(
    api_key: str, db: Session
) -> Optional[User]:
    """
    Authenticate user via API key.

    The key is looked up by its 12-character prefix among non-revoked keys,
    verified against the stored hash and checked for expiry. On success the
    key's ``last_used_at`` timestamp is updated and committed.

    Args:
        api_key: The plaintext API key (starts with idx_)
        db: Database session

    Returns:
        - The linked User when the key is bound to an existing, enabled user.
        - A transient (never added to the session) "virtual" User carrying
          the key's role and brokerage when the key has no linked user.
        - None when the key is unknown, revoked, fails verification, is
          expired, or is bound to a user that is missing or disabled.
    """
    # Extract prefix for quick lookup (first 12 chars). Slicing already copes
    # with shorter inputs: they simply yield the whole string.
    key_prefix = api_key[:12]

    # Find API key by prefix, ignoring revoked keys
    api_key_obj = (
        db.query(APIKey)
        .filter(APIKey.key_prefix == key_prefix, APIKey.revoked_at.is_(None))
        .first()
    )

    if not api_key_obj:
        return None

    # Verify the full key against stored hash
    if not verify_api_key(api_key, api_key_obj.key_hash):
        return None

    now = datetime.now(timezone.utc)

    # Check if key is expired
    expires_at = api_key_obj.expires_at
    if expires_at:
        if expires_at.tzinfo is None:
            # Some backends hand back naive datetimes, and comparing those
            # with an aware "now" raises TypeError.
            # NOTE(review): assumes naive values are stored in UTC, matching
            # the UTC timestamps this module writes - confirm.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        if now > expires_at:
            return None

    # Update last used timestamp
    api_key_obj.last_used_at = now
    db.commit()

    if api_key_obj.user_id:
        # The key is bound to a real account: it is only valid while that
        # account exists and is enabled. Falling through to the virtual user
        # here would let a disabled user keep the key's role.
        user = db.query(User).filter(User.id == api_key_obj.user_id).first()
        if user is None or user.disabled_at:
            return None
        return user

    # No user associated: create a virtual user from the API key metadata.
    # This allows API keys to work without a full user account.
    return User(
        id=None,  # Virtual user
        email=f"api_key_{api_key_obj.id}@system",
        role=api_key_obj.role,
        brokerage_id=api_key_obj.brokerage_id,
    )

81 

82 

async def authenticate_jwt(token: str, db: Session) -> Optional[User]:
    """
    Authenticate user via OIDC JWT token.

    The token is verified with ``settings.jwt_public_key`` and
    ``settings.jwt_algorithm``. The user is then looked up by the token's
    identifier (``sub``, falling back to ``preferred_username`` or ``email``)
    and created on first login. The stored role is kept in sync with the
    highest role found in ``realm_access.roles``, and ``last_login_at`` is
    updated on success.

    Args:
        token: The JWT token from Keycloak
        db: Database session

    Returns:
        User object if authenticated, None otherwise (no public key
        configured, invalid/expired token, no usable identifier, disabled
        user, or any unexpected error).
    """
    # Imported locally so this function does not depend on a module-level
    # logger being defined elsewhere in the file.
    import logging

    logger = logging.getLogger(__name__)

    if not settings.jwt_public_key:
        # JWT public key not configured yet
        logger.debug("JWT public key not configured")
        return None

    try:
        # Decode and verify JWT token (audience is optional for Keycloak).
        # NOTE(review): with verify_aud disabled, a token issued for a
        # different client but signed with the same key is accepted -
        # confirm this is intended.
        payload = jwt.decode(
            token,
            settings.jwt_public_key,
            algorithms=[settings.jwt_algorithm],
            options={"verify_aud": False},  # Keycloak doesn't always include audience
        )
        logger.debug("JWT decoded successfully, payload keys: %s", list(payload.keys()))

        # Extract user info from JWT
        keycloak_user_id = payload.get("sub")
        email = payload.get("email")
        preferred_username = payload.get("preferred_username")
        logger.debug(
            "sub=%s, email=%s, preferred_username=%s",
            keycloak_user_id,
            email,
            preferred_username,
        )

        # Extract roles from Keycloak JWT (realm_access.roles). Tolerate a
        # null claim, and ignore a non-list "roles" value so that the "in"
        # checks below can never degrade into substring matching.
        realm_access = payload.get("realm_access") or {}
        roles = realm_access.get("roles") if isinstance(realm_access, dict) else None
        if not isinstance(roles, (list, tuple)):
            roles = []
        logger.debug("realm_access=%s, roles=%s", realm_access, roles)

        # Determine highest role (admin > broker > agent > viewer)
        user_role = next(
            (role for role in ("admin", "broker", "agent") if role in roles),
            "viewer",  # Default
        )

        # Use sub if available, otherwise fall back to preferred_username or email
        # Some Keycloak configurations don't include sub in the JWT
        if not keycloak_user_id:
            keycloak_user_id = preferred_username or email
            logger.debug("No 'sub' in JWT, using fallback identifier: %s", keycloak_user_id)

        if not keycloak_user_id:
            logger.debug("No user identifier found in JWT (no sub, preferred_username, or email)")
            return None

        # Find or create user (with retry for race conditions)
        user = (
            db.query(User)
            .filter(User.keycloak_user_id == keycloak_user_id)
            .first()
        )

        if not user:
            # Auto-create user on first login with role from JWT.
            # Concurrent first requests may try to create the same user, so a
            # failed insert is followed by a rollback and a second lookup.
            try:
                user = User(
                    keycloak_user_id=keycloak_user_id,
                    email=email or f"{keycloak_user_id}@unknown",
                    role=user_role,
                )
                db.add(user)
                db.commit()
                db.refresh(user)
            except Exception as create_error:
                # Presumably another request created the user first:
                # roll back and fetch the existing row.
                db.rollback()
                logger.debug("User creation race condition, fetching existing: %s", create_error)
                user = (
                    db.query(User)
                    .filter(User.keycloak_user_id == keycloak_user_id)
                    .first()
                )
                if not user:
                    # Still not found - something else is wrong
                    logger.debug("User still not found after race condition handling")
                    return None
        elif user.role != user_role:
            # Update role if it changed in Keycloak
            user.role = user_role
            db.commit()

        # Check if user is disabled
        if user.disabled_at:
            return None

        # Update last login time
        user.last_login_at = datetime.now(timezone.utc)
        db.commit()

        return user

    except jwt.ExpiredSignatureError as e:
        # Token expired
        logger.debug("JWT expired: %s", e)
        return None
    except jwt.JWTError as e:
        # Invalid token or JWT error
        logger.debug("JWT error: %s", e)
        return None
    except Exception:
        # Other errors (e.g. a failed commit). Roll back so the session,
        # which the request handler may share, is not left unusable.
        db.rollback()
        logger.exception("Unexpected error in JWT auth")
        return None

204 

205 

async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security),
    db: Session = Depends(get_db),
) -> Optional[User]:
    """
    Resolve the caller from a Bearer credential (API key or JWT).

    Authentication is optional here: a missing or unrecognised credential
    yields None rather than an error. Use require_role() (or
    get_current_user_required) when authentication is mandatory.

    Args:
        credentials: HTTP Bearer credentials, or None when absent
        db: Database session

    Returns:
        User object if authenticated, None otherwise
    """
    if not credentials:
        return None

    bearer = credentials.credentials

    # Tokens carrying the idx_ prefix are tried as API keys first
    if bearer.startswith("idx_"):
        api_key_user = await authenticate_api_key(bearer, db)
        if api_key_user:
            return api_key_user

    # Everything else (and API keys that did not authenticate) is tried as a JWT
    jwt_user = await authenticate_jwt(bearer, db)

    # Neither method succeeding results in None
    return jwt_user if jwt_user else None

241 

242 

async def get_current_user_required(
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security),
    db: Session = Depends(get_db),
) -> User:
    """
    Resolve the caller and insist that one exists.

    Delegates to get_current_user() and converts a missing user into an
    HTTP 401 response.

    Args:
        credentials: HTTP Bearer credentials
        db: Database session

    Returns:
        User object

    Raises:
        HTTPException: 401 if not authenticated
    """
    authenticated = await get_current_user(credentials, db)
    if authenticated:
        return authenticated

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )

272 

273 

def require_role(min_role: str):
    """
    Build a dependency that enforces a minimum role.

    Ranks come from ROLE_HIERARCHY: admin (4) > broker (2) > agent (1) > viewer (0).
    A min_role that is not in the hierarchy is ranked 999, so no user can
    satisfy it; a user whose role is not in the hierarchy is ranked 0.

    Args:
        min_role: Minimum required role (viewer, agent, broker, admin)

    Returns:
        Dependency function that returns the authenticated user when the
        role is sufficient and raises HTTP 403 otherwise

    Example:
        @router.get("/admin-only")
        async def admin_endpoint(user: Annotated[User, Depends(require_role("admin"))]):
            ...
    """

    async def check_role(
        user: User = Depends(get_current_user_required),
    ) -> User:
        required_rank = ROLE_HIERARCHY.get(min_role, 999)
        granted_rank = ROLE_HIERARCHY.get(user.role, 0)

        if granted_rank >= required_rank:
            return user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient permissions. Required role: {min_role}",
        )

    return check_role

307 

308 

# Type aliases for common dependencies
# CurrentUser: optional authentication - None when the request is unauthenticated.
CurrentUser = Annotated[Optional[User], Depends(get_current_user)]
# RequiredUser: any authenticated user; unauthenticated requests get a 401.
RequiredUser = Annotated[User, Depends(get_current_user_required)]
# AdminUser / BrokerUser / AgentUser: authenticated user whose role ranks at
# least "admin" / "broker" / "agent" in ROLE_HIERARCHY; otherwise a 403.
AdminUser = Annotated[User, Depends(require_role("admin"))]
BrokerUser = Annotated[User, Depends(require_role("broker"))]
AgentUser = Annotated[User, Depends(require_role("agent"))]