Coverage for src / idx_api / routers / embeddings_admin.py: 29%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 11:09 -0700

1"""Embeddings and vector search administration endpoints.""" 

2 

3from fastapi import APIRouter, Depends, HTTPException, Query 

4from pydantic import BaseModel 

5from sqlalchemy.orm import Session 

6 

7from idx_api.auth import AdminUser, RequiredUser 

8from idx_api.database import get_db 

9from idx_api.embeddings import ( 

10 reindex_all_agents, 

11 reindex_all_brokerages, 

12) 

13 

14router = APIRouter() 

15 

16 

17# ===== Response Models ===== 

18 

19 

class ReindexResponse(BaseModel):
    """Payload returned by the combined brokerage + agent reindex endpoint."""

    # Set to True by the endpoint when both reindex steps returned.
    success: bool
    # Count reported by the brokerage reindex step.
    brokerages_indexed: int
    # Count reported by the agent reindex step.
    agents_indexed: int
    # Human-readable summary of the run.
    message: str

27 

28 

class ImageIndexStats(BaseModel):
    """Counters describing how far property-image indexing has progressed.

    Instances are built by unpacking the mapping returned by
    ``get_image_index_stats``; that function defines exactly what each
    counter measures.
    """

    total_properties: int
    properties_with_images: int
    total_images: int
    # Number of description-based embeddings.
    description_embeddings: int
    # Number of visual embeddings.
    visual_embeddings: int

37 

38 

class ImageSearchResult(BaseModel):
    """One image hit from the image search endpoints, with listing details.

    The identifying fields and the similarity score are always present; the
    descriptive and listing fields may be ``None``.
    """

    # --- Image identity (required) ---
    image_id: int
    listing_id: str
    image_url: str
    image_index: int

    # --- Image description (optional) ---
    description: str | None
    room_type: str | None

    # --- Listing details (optional) ---
    city: str | None
    state: str | None
    price: float | None
    beds: int | None
    baths: int | None

    # --- Match score (required) ---
    similarity: float

54 

55 

56# ===== Embeddings Management Endpoints ===== 

57 

58 

@router.post("/embeddings/reindex", response_model=ReindexResponse)
async def reindex_embeddings(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """
    Regenerate the embeddings index for every brokerage and every agent.

    Brokerages are reindexed first, then agents. Typical reasons to run it:
    - a new embedding model has been rolled out
    - records were imported without triggering auto-indexing
    - the index is corrupted or has drifted out of sync with the data

    Requires admin role (enforced through the ``AdminUser`` parameter).

    Returns:
        ReindexResponse carrying both counts and a summary message.

    Raises:
        HTTPException: 500 if anything in the reindex run raises.
    """
    # NOTE(review): both reindex helpers are called synchronously from this
    # async handler; if they block for long they will stall the event loop.
    try:
        n_brokerages = reindex_all_brokerages(db)
        n_agents = reindex_all_agents(db)
        summary = f"Successfully reindexed {n_brokerages} brokerages and {n_agents} agents"
        return ReindexResponse(
            success=True,
            brokerages_indexed=n_brokerages,
            agents_indexed=n_agents,
            message=summary,
        )
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Reindexing failed: {exc!s}",
        )
        raise failure

89 

90 

@router.post("/embeddings/reindex/brokers")
async def reindex_brokers_only(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Regenerate the embeddings index for brokerages and nothing else.

    Returns:
        dict with ``success``, ``count`` and ``message`` keys.

    Raises:
        HTTPException: 500 if the brokerage reindex raises.
    """
    try:
        indexed = reindex_all_brokerages(db)
        payload = {
            "success": True,
            "count": indexed,
            "message": f"Successfully reindexed {indexed} brokerages",
        }
        return payload
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Brokerage reindexing failed: {exc!s}",
        )
        raise failure

109 

110 

@router.post("/embeddings/reindex/agents")
async def reindex_agents_only(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Regenerate the embeddings index for agents and nothing else.

    Returns:
        dict with ``success``, ``count`` and ``message`` keys.

    Raises:
        HTTPException: 500 if the agent reindex raises.
    """
    try:
        indexed = reindex_all_agents(db)
        payload = {
            "success": True,
            "count": indexed,
            "message": f"Successfully reindexed {indexed} agents",
        }
        return payload
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Agent reindexing failed: {exc!s}",
        )
        raise failure

129 

130 

@router.get("/embeddings/stats")
async def get_embeddings_stats(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Get vector index statistics (indexed counts for all content types).

    Each statistic is the result of a ``SELECT COUNT(*)`` against one table.
    Counting is best-effort: a table that cannot be queried is reported as 0
    rather than failing the whole request, and the failure is logged.

    Returns:
        dict mapping each statistic name to its integer row count.
    """
    import logging

    from sqlalchemy import text

    logger = logging.getLogger(__name__)

    # Response key -> table whose row count backs it. The table names are
    # hard-coded constants (never user input), so interpolating them into
    # the SQL text below is safe. Insertion order fixes the response order.
    count_sources = {
        "brokers_indexed": "vec_brokerages",
        "agents_indexed": "vec_agents",
        "properties_indexed": "vec_properties",
        "blog_posts_indexed": "vec_blog_posts",
        "images_indexed": "property_images",
        # Approach A: embeddings of AI-generated image descriptions.
        "image_descriptions_indexed": "vec_image_descriptions",
        # Approach B: visual embeddings of the images themselves.
        "image_visuals_indexed": "vec_image_visuals",
    }

    stats = {}
    for stat_key, table in count_sources.items():
        try:
            result = db.execute(text(f"SELECT COUNT(*) FROM {table}"))
            stats[stat_key] = result.scalar() or 0
        except Exception as exc:
            # Keep the deliberate best-effort behaviour, but leave a trace
            # instead of swallowing the error silently.
            logger.warning("Could not count rows in %s: %s", table, exc)
            stats[stat_key] = 0
            # A failed statement can leave the transaction aborted on some
            # backends, which would make every later count fail as well.
            # Rolling back resets it; rollback failures are logged only.
            try:
                db.rollback()
            except Exception:
                logger.exception("Rollback failed after counting %s", table)

    return stats

199 

200 

@router.post("/embeddings/reindex/properties")
async def reindex_properties(
    user: AdminUser,
    db: Session = Depends(get_db),
    limit: int | None = None,
):
    """Rebuild the semantic-search index for all active properties.

    Args:
        limit: Passed straight through to ``reindex_all_properties``;
            ``None`` (the default) is passed as-is.

    Returns:
        dict with ``success``, ``count`` and ``message`` keys.

    Raises:
        HTTPException: 500 if importing or running the reindex raises.
    """
    try:
        # Imported inside the try so an import failure is also reported as a 500.
        from idx_api.embeddings import reindex_all_properties

        indexed = reindex_all_properties(db, limit=limit)
        payload = {
            "success": True,
            "count": indexed,
            "message": f"Successfully indexed {indexed} properties",
        }
        return payload
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Property reindexing failed: {exc!s}",
        )
        raise failure

222 

223 

@router.post("/embeddings/reindex/blog-posts")
async def reindex_blog_posts(
    user: AdminUser,
    db: Session = Depends(get_db),
    posts: list[dict] | None = None,
):
    """Index the supplied blog posts for semantic search.

    The request body is a list of blog post objects, for example:
    [{"slug": "...", "title": "...", "description": "...", "content": "...", "tags": [...]}]

    Returns:
        dict with ``success``, ``count`` and ``message`` keys.

    Raises:
        HTTPException: 400 when ``posts`` is missing or empty; 500 when
            importing or running the indexer raises.
    """
    # Guard clause: nothing to index.
    if not posts:
        raise HTTPException(status_code=400, detail="No blog posts provided")

    try:
        # Imported inside the try so an import failure is also reported as a 500.
        from idx_api.embeddings import index_blog_posts_from_data

        indexed = index_blog_posts_from_data(db, posts)
        payload = {
            "success": True,
            "count": indexed,
            "message": f"Successfully indexed {indexed} blog posts",
        }
        return payload
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Blog post reindexing failed: {exc!s}",
        )
        raise failure

252 

253 

254# ===== Image Indexing Endpoints (Multi-Modal Search) ===== 

255 

256 

@router.get("/embeddings/images/stats", response_model=ImageIndexStats)
async def get_image_index_stats_endpoint(
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Report how far property-image indexing has progressed.

    Returns:
        ImageIndexStats built from the mapping that
        ``get_image_index_stats`` returns.

    Raises:
        HTTPException: 500 if fetching the stats or building the response
            model raises.
    """
    from idx_api.embeddings import get_image_index_stats

    try:
        raw_stats = get_image_index_stats(db)
        response = ImageIndexStats(**raw_stats)
        return response
    except Exception as exc:
        failure = HTTPException(
            status_code=500,
            detail=f"Failed to get image stats: {exc!s}",
        )
        raise failure

273 

274 

@router.post("/embeddings/reindex/images")
async def reindex_property_images(
    user: AdminUser,
    db: Session = Depends(get_db),
    limit: int = Query(10, ge=1, le=100, description="Max properties to process"),
    background: bool = Query(False, description="Run as background task"),
):
    """
    Index property images using Qwen3-VL vision model.

    This generates AI descriptions of property photos and creates
    embeddings for semantic image search (Approach A). When
    ``settings.siglip_enabled`` is set, a visual embedding is stored for
    each indexed image as well (Approach B).

    The process:
    1. Find properties with photos but no image embeddings
    2. For each of the property's photos:
       - Generate a description with ``describe_image``
       - Derive a room type from that description
       - Store the image and create its description embedding
       - Optionally store a visual embedding
    3. Commit once per property

    This is an expensive operation - start with small limits to test.

    Args:
        limit: Maximum number of properties to process (1-100).
        background: Accepted for API compatibility but not used by this
            handler; processing always runs inline.

    Returns:
        dict with ``success``, ``message``, ``processed``,
        ``images_indexed``, ``visual_embeddings``, ``siglip_enabled`` and
        ``errors`` (at most the first 10 per-image errors).

    Raises:
        HTTPException: 500 if anything outside the per-image handling
            raises (e.g. the property lookup or a commit).
    """
    from idx_api.config import settings
    from idx_api.embeddings import (
        get_unindexed_property_images,
        index_property_image,
        index_property_image_visual,
    )
    from idx_api.vision import (
        describe_image,
        extract_room_type,
        get_visual_embedding,
    )

    try:
        # Get properties needing indexing
        properties = get_unindexed_property_images(db, limit=limit)

        if not properties:
            # Same keys as the normal response so clients can rely on one shape.
            return {
                "success": True,
                "message": "No properties need image indexing",
                "processed": 0,
                "images_indexed": 0,
                "visual_embeddings": 0,
                "siglip_enabled": settings.siglip_enabled,
                "errors": [],
            }

        total_images = 0
        total_visual = 0
        errors = []

        for prop in properties:
            listing_id = prop["listing_id"]
            # NOTE(review): every photo in prop["photos"] is processed here;
            # any "up to 5" cap must come from get_unindexed_property_images
            # - confirm.
            photos = prop["photos"]

            for idx, photo_url in enumerate(photos):
                # A failure on one image is recorded and must not abort the run.
                try:
                    # Generate description using vision model (Approach A)
                    description = await describe_image(photo_url)

                    if not description:
                        errors.append(f"{listing_id}[{idx}]: No description generated")
                        continue

                    room_type = extract_room_type(description)

                    # Store image and create description embedding
                    image_id = index_property_image(
                        db,
                        listing_id=listing_id,
                        image_url=photo_url,
                        image_index=idx,
                        description=description,
                        room_type=room_type,
                    )
                    total_images += 1

                    # Generate visual embedding if SigLIP is enabled (Approach B)
                    if settings.siglip_enabled:
                        visual_embedding = await get_visual_embedding(photo_url)
                        if visual_embedding:
                            index_property_image_visual(db, image_id, visual_embedding)
                            total_visual += 1

                except Exception as e:
                    errors.append(f"{listing_id}[{idx}]: {str(e)}")

            # Commit after each property
            db.commit()

        return {
            "success": True,
            "message": f"Indexed {total_images} images from {len(properties)} properties",
            "processed": len(properties),
            "images_indexed": total_images,
            "visual_embeddings": total_visual,
            "siglip_enabled": settings.siglip_enabled,
            # Slicing an empty list already yields []; cap at the first 10 errors.
            "errors": errors[:10],
        }

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Image indexing failed: {str(e)}",
        ) from e

381 

382 

@router.get("/search/images")
async def search_property_images(
    user: RequiredUser,
    db: Session = Depends(get_db),
    q: str = Query(..., min_length=2, description="Search query"),
    limit: int = Query(20, ge=1, le=50, description="Maximum results"),
):
    """
    Find property photos whose AI-generated description matches the query
    (Approach A, semantic similarity).

    Sample queries:
    - "granite countertops"
    - "modern kitchen with island"
    - "pool with mountain views"
    - "hardwood floors"
    - "walk-in closet"

    The query is matched against stored image descriptions rather than
    against the images themselves.

    Returns:
        dict with ``query``, ``results`` (validated ``ImageSearchResult``
        items), ``total`` and ``search_type``.

    Raises:
        HTTPException: 503 if the search or result validation raises.
    """
    from idx_api.embeddings import search_image_descriptions

    try:
        matches = search_image_descriptions(db, query=q, limit=limit)
        validated = [ImageSearchResult.model_validate(match) for match in matches]
        return {
            "query": q,
            "results": validated,
            "total": len(matches),
            "search_type": "image_description",
        }
    except Exception as exc:
        failure = HTTPException(
            status_code=503,
            detail=f"Image search service unavailable: {exc!s}",
        )
        raise failure

418 

419 

@router.get("/search/images-visual")
async def search_property_images_visual(
    user: RequiredUser,
    db: Session = Depends(get_db),
    q: str = Query(..., min_length=2, description="Search query"),
    limit: int = Query(20, ge=1, le=50, description="Maximum results"),
):
    """
    Find property photos by comparing a SigLIP embedding of the query text
    with stored visual embeddings (Approach B).

    No intermediate text description is involved: the query is embedded
    and matched directly against image embeddings.

    Sample queries:
    - "granite countertops" - finds images showing granite countertops
    - "pool" - finds images showing pools
    - "mountain views" - finds images showing mountain views

    Requires SigLIP service to be enabled.

    Returns:
        dict with ``query``, ``results`` (validated ``ImageSearchResult``
        items), ``total`` and ``search_type``.

    Raises:
        HTTPException: 503 when SigLIP is disabled, when no query embedding
            is obtained, or when the search itself raises.
    """
    from idx_api.config import settings
    from idx_api.embeddings import search_image_visuals
    from idx_api.vision import get_siglip_text_embedding

    # Guard clause: the feature flag must be on before any work is done.
    if not settings.siglip_enabled:
        raise HTTPException(
            status_code=503,
            detail="SigLIP visual search is not enabled. Set SIGLIP_ENABLED=true",
        )

    try:
        embedding = await get_siglip_text_embedding(q)

        if not embedding:
            raise HTTPException(
                status_code=503,
                detail="Failed to get query embedding from SigLIP",
            )

        matches = search_image_visuals(db, query_embedding=embedding, limit=limit)
        validated = [ImageSearchResult.model_validate(match) for match in matches]
        return {
            "query": q,
            "results": validated,
            "total": len(matches),
            "search_type": "image_visual",
        }
    except HTTPException:
        # Let the 503 raised above through instead of re-wrapping it below.
        raise
    except Exception as exc:
        failure = HTTPException(
            status_code=503,
            detail=f"Visual search service unavailable: {exc!s}",
        )
        raise failure