Coverage for src / idx_api / routers / vision.py: 59%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 11:09 -0700

1"""Vision extraction admin endpoints.""" 

2 

3import asyncio 

4from datetime import datetime 

5from typing import Optional 

6 

7from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException 

8from pydantic import BaseModel 

9from sqlalchemy import desc, select, text 

10from sqlalchemy.orm import Session 

11 

12from idx_api.auth import AdminUser 

13from idx_api.config import settings 

14from idx_api.database import get_db 

15from idx_api.embeddings.images import get_image_index_stats, get_unindexed_property_images 

16from idx_api.models.vision_job import VisionExtractionJob 

17 

# Collects every vision admin route declared in this module; presumably the
# application mounts it (with its URL prefix) elsewhere — confirm in app setup.
router: APIRouter = APIRouter()

19 

20 

21# --- Response Models --- 

22 

23 

class ImageIndexStats(BaseModel):
    """Aggregate counters describing the current state of the image index."""

    # Raw counters, copied from the mapping returned by get_image_index_stats().
    total_properties: int
    properties_with_images: int
    total_images: int
    description_embeddings: int
    visual_embeddings: int

    # properties_with_images / total_properties, expressed as a percentage.
    coverage_percent: float

33 

34 

class TopFeature(BaseModel):
    """A single (feature_type, feature_value) tag pair and how often it appears."""

    feature_type: str
    feature_value: str

    # Number of image_feature_tags rows that carry this exact pair.
    count: int

41 

42 

class RoomTypeStats(BaseModel):
    """Share of room-classified property images that belong to one room type."""

    room_type: str

    # Images tagged with this room_type.
    count: int

    # count as a percentage of all images whose room_type is set.
    percent: float

49 

50 

class VisionStatusResponse(BaseModel):
    """Payload of the ``/status`` route: configuration, index stats and job state."""

    # Configuration echoed back from application settings.
    enabled: bool
    model: str
    ollama_url: str

    # Index statistics and the size of the not-yet-indexed backlog.
    stats: ImageIndexStats
    unindexed_count: int

    # Breakdown of extracted tags and classified room types.
    top_features: list[TopFeature]
    room_types: list[RoomTypeStats]

    # Compact summary of the newest pending/running job; None when idle.
    active_job: Optional[dict] = None

68 

69 

class JobSummary(BaseModel):
    """Read-only view of one VisionExtractionJob row, as returned by the job routes."""

    # Allow validating directly from ORM objects via attribute access. This is
    # the pydantic v2 ``model_config`` form; the nested ``class Config`` style
    # is deprecated in v2 and emits a deprecation warning.
    model_config = {"from_attributes": True}

    id: int
    status: str
    started_at: datetime
    ended_at: Optional[datetime]
    duration_seconds: Optional[float]
    total_properties: int
    processed_properties: int
    total_images: int
    processed_images: int
    failed_images: int
    tags_created: int
    progress_percent: float
    # Identity recorded when the job was created (the start route stores the
    # admin user's email here).
    triggered_by: Optional[str]

89 

90 

class JobHistoryResponse(BaseModel):
    """One page of extraction jobs together with the overall job count."""

    # The requested page of jobs.
    jobs: list[JobSummary]

    # Number of jobs across all pages, for pagination controls.
    total: int

96 

97 

class StartJobRequest(BaseModel):
    """Request body used to create a new vision extraction job."""

    # Optional cap on the number of properties included in the job.
    limit: Optional[int] = None

    # Stored on the job record; presumably scopes processing to one
    # brokerage — confirm against the worker that executes jobs.
    brokerage_id: Optional[int] = None

    # Persisted as the job's skip_visual_embeddings flag.
    skip_visual: bool = True

104 

105 

class StartJobResponse(BaseModel):
    """Acknowledgement returned once a job record has been created."""

    # Primary key of the newly inserted VisionExtractionJob.
    job_id: int

    # Human-readable summary of what was queued.
    message: str

111 

112 

113# --- Endpoints --- 

114 

115 

@router.get("/status", response_model=VisionStatusResponse)
async def get_vision_status(user: AdminUser, db: Session = Depends(get_db)):
    """Get current vision extraction system status.

    Combines configuration from ``settings`` with live data read through
    ``db``:

    * image-index statistics;
    * the backlog of properties still awaiting image indexing;
    * the most common feature tags;
    * the room-type distribution;
    * the most recently started pending/running job, if any.

    Args:
        user: Authenticated admin. It is not read here; declaring it makes
            the ``AdminUser`` dependency enforce access control.
        db: Database session used for every query.

    Returns:
        A fully populated ``VisionStatusResponse``.
    """
    # NOTE(review): this ``async def`` route makes blocking, synchronous
    # Session calls, so the event loop is held for the duration of the
    # queries. A plain ``def`` route would let FastAPI run it in its
    # threadpool; confirm no caller awaits this function directly first.
    stats = _build_index_stats(get_image_index_stats(db))
    unindexed_count = _count_unindexed(db)
    top_features = _fetch_top_features(db)
    room_types = _fetch_room_types(db)
    active_job = _fetch_active_job(db)

    return VisionStatusResponse(
        enabled=settings.vision_enabled,
        model=settings.vision_model,
        ollama_url=settings.ollama_base_url,
        stats=stats,
        unindexed_count=unindexed_count,
        top_features=top_features,
        room_types=room_types,
        active_job=active_job,
    )


# Row limit passed to get_unindexed_property_images() when sizing the backlog
# for the status page. Any backlog larger than this is reported as exactly
# this value.
_UNINDEXED_COUNT_CAP = 1000


def _build_index_stats(stats_raw: dict) -> ImageIndexStats:
    """Convert the raw get_image_index_stats() mapping into an ImageIndexStats."""
    total = stats_raw["total_properties"]
    with_images = stats_raw["properties_with_images"]
    # Guard the division so an empty catalogue reports 0% instead of raising.
    coverage = (with_images / total * 100) if total > 0 else 0
    return ImageIndexStats(
        total_properties=total,
        properties_with_images=with_images,
        total_images=stats_raw["total_images"],
        description_embeddings=stats_raw["description_embeddings"],
        visual_embeddings=stats_raw["visual_embeddings"],
        coverage_percent=round(coverage, 1),
    )


def _count_unindexed(db: Session) -> int:
    """Return the number of unindexed properties, saturating at _UNINDEXED_COUNT_CAP."""
    # NOTE(review): this loads up to _UNINDEXED_COUNT_CAP property records
    # only to count them. A dedicated COUNT query would be cheaper and exact.
    return len(get_unindexed_property_images(db, limit=_UNINDEXED_COUNT_CAP))


def _fetch_top_features(db: Session) -> list[TopFeature]:
    """Return the 15 most frequent (feature_type, feature_value) pairs, most common first."""
    rows = db.execute(
        text("""
            SELECT feature_type, feature_value, COUNT(*) as cnt
            FROM image_feature_tags
            GROUP BY feature_type, feature_value
            ORDER BY cnt DESC
            LIMIT 15
        """)
    ).fetchall()
    return [
        TopFeature(feature_type=row.feature_type, feature_value=row.feature_value, count=row.cnt)
        for row in rows
    ]


def _fetch_room_types(db: Session) -> list[RoomTypeStats]:
    """Return the room-type distribution of images with a room_type, most common first."""
    rows = db.execute(
        text("""
            SELECT room_type, COUNT(*) as cnt
            FROM property_images
            WHERE room_type IS NOT NULL
            GROUP BY room_type
            ORDER BY cnt DESC
        """)
    ).fetchall()
    total_rooms = sum(row.cnt for row in rows)
    return [
        RoomTypeStats(
            room_type=row.room_type,
            count=row.cnt,
            percent=round(row.cnt / total_rooms * 100, 1) if total_rooms > 0 else 0,
        )
        for row in rows
    ]


def _fetch_active_job(db: Session) -> Optional[dict]:
    """Summarise the most recently started pending/running job, or return None."""
    job = db.execute(
        select(VisionExtractionJob)
        .where(VisionExtractionJob.status.in_(["pending", "running"]))
        .order_by(desc(VisionExtractionJob.started_at))
        .limit(1)
    ).scalar_one_or_none()
    if not job:
        return None
    return {
        "id": job.id,
        "status": job.status,
        "progress_percent": job.progress_percent,
        "processed_images": job.processed_images,
        "total_images": job.total_images,
        "started_at": job.started_at.isoformat() if job.started_at else None,
    }

206 

207 

@router.get("/jobs", response_model=JobHistoryResponse)
async def get_job_history(
    user: AdminUser,
    db: Session = Depends(get_db),
    limit: int = 20,
    offset: int = 0,
):
    """Get extraction job history.

    Returns one page of jobs ordered by ``started_at`` (newest first),
    together with the total number of jobs.

    Args:
        user: Authenticated admin. It is not read here; declaring it makes
            the ``AdminUser`` dependency enforce access control.
        db: Database session.
        limit: Maximum number of jobs on the page (default 20).
        offset: Number of jobs to skip before the page starts (default 0).

    Returns:
        JobHistoryResponse with the requested page and the overall total.

    Raises:
        HTTPException: 400 if ``limit`` or ``offset`` is negative.
    """
    from sqlalchemy import func

    # Reject negative paging values up front. Passed through to SQL, they are
    # either a database error (-> 500) or silently mean "no limit",
    # depending on the backend.
    if limit < 0 or offset < 0:
        raise HTTPException(status_code=400, detail="limit and offset must be non-negative")

    # Let the database count rows rather than fetching every job id into
    # Python just to take len() of the list.
    total = db.execute(select(func.count()).select_from(VisionExtractionJob)).scalar_one()

    jobs = (
        db.execute(
            select(VisionExtractionJob)
            .order_by(desc(VisionExtractionJob.started_at))
            .limit(limit)
            .offset(offset)
        )
        .scalars()
        .all()
    )

    return JobHistoryResponse(
        jobs=[
            JobSummary(
                id=job.id,
                status=job.status,
                started_at=job.started_at,
                ended_at=job.ended_at,
                duration_seconds=job.duration_seconds,
                total_properties=job.total_properties,
                processed_properties=job.processed_properties,
                total_images=job.total_images,
                processed_images=job.processed_images,
                failed_images=job.failed_images,
                tags_created=job.tags_created,
                progress_percent=job.progress_percent,
                triggered_by=job.triggered_by,
            )
            for job in jobs
        ],
        total=total,
    )

251 

252 

@router.get("/jobs/{job_id}", response_model=JobSummary)
async def get_job_detail(job_id: int, user: AdminUser, db: Session = Depends(get_db)):
    """Return the summary of a single extraction job.

    Args:
        job_id: Primary key of the job to look up.
        user: Authenticated admin. It is not read here; declaring it makes
            the ``AdminUser`` dependency enforce access control.
        db: Database session.

    Raises:
        HTTPException: 404 when no job with ``job_id`` exists.
    """
    lookup = select(VisionExtractionJob).where(VisionExtractionJob.id == job_id)
    job = db.execute(lookup).scalar_one_or_none()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Attributes of the ORM row exposed on the response, in model field order.
    exposed = (
        "id",
        "status",
        "started_at",
        "ended_at",
        "duration_seconds",
        "total_properties",
        "processed_properties",
        "total_images",
        "processed_images",
        "failed_images",
        "tags_created",
        "progress_percent",
        "triggered_by",
    )
    return JobSummary(**{attr: getattr(job, attr) for attr in exposed})

278 

279 

# Note: Actual job execution would need to be handled by a separate worker process
# This endpoint just creates the job record - a background worker polls for pending jobs
@router.post("/jobs", response_model=StartJobResponse)
async def start_extraction_job(
    request: StartJobRequest,
    user: AdminUser,
    db: Session = Depends(get_db),
):
    """Create a new extraction job.

    Note: This creates the job record. A separate worker process
    should poll for pending jobs and execute them.

    Args:
        request: Job parameters (property limit, brokerage, skip-visual flag).
        user: Authenticated admin; their email is recorded as ``triggered_by``.
        db: Database session.

    Returns:
        StartJobResponse containing the new job id and a summary message.

    Raises:
        HTTPException: 409 if a pending or running job already exists;
            400 if there are no unindexed properties to process.
    """
    # Refuse to queue a second job while one is active. The check and the
    # insert below are not atomic, so concurrent requests can both get past
    # this point and leave two active rows. ``.limit(1)`` keeps that case a
    # clean 409 instead of scalar_one_or_none() raising MultipleResultsFound
    # (a 500 on every subsequent call).
    active = db.execute(
        select(VisionExtractionJob)
        .where(VisionExtractionJob.status.in_(["pending", "running"]))
        .order_by(desc(VisionExtractionJob.started_at))
        .limit(1)
    ).scalar_one_or_none()
    if active:
        raise HTTPException(
            status_code=409, detail=f"Job {active.id} is already {active.status}"
        )

    # Size the job. A falsy limit (None or 0) falls back to a 10,000 ceiling.
    # NOTE(review): brokerage_id is stored on the job but not applied here,
    # so these totals span all brokerages. Confirm whether the worker filters.
    unindexed = get_unindexed_property_images(db, limit=request.limit or 10000)
    total_properties = len(unindexed)
    total_images = sum(len(p["photos"]) for p in unindexed)

    if total_properties == 0:
        raise HTTPException(status_code=400, detail="No unindexed properties found")

    # NOTE(review): started_at is a naive UTC timestamp (datetime.utcnow() is
    # deprecated since Python 3.12). Moving to an aware value requires the
    # column and the worker to agree first.
    job = VisionExtractionJob(
        status="pending",
        started_at=datetime.utcnow(),
        brokerage_id=request.brokerage_id,
        limit_properties=request.limit,
        total_properties=total_properties,
        total_images=total_images,
        vision_model=settings.vision_model,
        skip_visual_embeddings=request.skip_visual,
        triggered_by=user.email,
    )
    db.add(job)
    db.commit()
    # Reload so database-generated values (the primary key) are populated.
    db.refresh(job)

    return StartJobResponse(
        job_id=job.id,
        message=f"Job created: {total_properties} properties, {total_images} images queued",
    )

332 

333 

@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int, user: AdminUser, db: Session = Depends(get_db)):
    """Mark a pending or running extraction job as cancelled.

    Only the job row is updated (``status`` and ``ended_at``, then commit).
    This route does not itself interrupt work already in progress.

    Args:
        job_id: Primary key of the job to cancel.
        user: Authenticated admin. It is not read here; declaring it makes
            the ``AdminUser`` dependency enforce access control.
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 if the job does not exist; 400 if its status is
            neither pending nor running.
    """
    lookup = select(VisionExtractionJob).where(VisionExtractionJob.id == job_id)
    job = db.execute(lookup).scalar_one_or_none()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    cancellable_states = ("pending", "running")
    if job.status in cancellable_states:
        job.status = "cancelled"
        job.ended_at = datetime.utcnow()
        db.commit()
        return {"message": f"Job {job_id} cancelled"}

    raise HTTPException(status_code=400, detail=f"Cannot cancel job with status: {job.status}")